LadyForest commented on code in PR #24763:
URL: https://github.com/apache/flink/pull/24763#discussion_r1636497861


##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java:
##########
@@ -124,6 +127,26 @@ public void testAlterCatalog() {
                         "cat2",
                         "ALTER CATALOG cat2\n  SET 'K1' = 'V1',\n  SET 'k2' = 
'v2_new'",
                         expectedOptions);
+
+        // test alter catalog reset
+        final Set<String> expectedResetKeys = new HashSet<>();
+        expectedResetKeys.add("K1");

Review Comment:
   Nit: `final Set<String> expectedResetKeys = Collections.singleton("K1");`



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java:
##########
@@ -342,6 +342,15 @@ public void alterCatalog(String catalogName, 
CatalogDescriptor catalogDescriptor
         if (catalogStore.contains(catalogName) && 
oldCatalogDescriptor.isPresent()) {
             Configuration conf = oldCatalogDescriptor.get().getConfiguration();
             conf.addAll(catalogDescriptor.getConfiguration());
+            catalogDescriptor

Review Comment:
   It'd be better to make the configuration change as an updater rather than 
directly passing the catalog descriptor here.
   
   E.g.
   
   ```java
       /**
        * Alters a catalog under the given name. The catalog name must be 
unique.
        *
        * @param catalogName the given catalog name under which to alter the 
given catalog
        * @param catalogUpdater catalog configuration updater to alter catalog
        * @throws CatalogException If the catalog neither exists in the catalog 
store nor in the
        *     initialized catalogs, or if an error occurs while creating the 
catalog or storing the
        *     {@link CatalogDescriptor}
        */
       public void alterCatalog(String catalogName, Consumer<Configuration> 
catalogUpdater)
               throws CatalogException {
           checkArgument(
                   !StringUtils.isNullOrWhitespaceOnly(catalogName),
                   "Catalog name cannot be null or empty.");
           checkNotNull(catalogUpdater, "Catalog configuration updater cannot 
be null.");
   
           CatalogStore catalogStore = catalogStoreHolder.catalogStore();
           Optional<CatalogDescriptor> oldCatalogDescriptor = 
getCatalogDescriptor(catalogName);
   
           if (catalogStore.contains(catalogName) && 
oldCatalogDescriptor.isPresent()) {
               Configuration conf = 
oldCatalogDescriptor.get().getConfiguration();
               catalogUpdater.accept(conf);
               CatalogDescriptor newCatalogDescriptor = 
CatalogDescriptor.of(catalogName, conf);
               Catalog newCatalog = initCatalog(catalogName, 
newCatalogDescriptor);
               catalogStore.removeCatalog(catalogName, false);
               if (catalogs.containsKey(catalogName)) {
                   catalogs.get(catalogName).close();
               }
               newCatalog.open();
               catalogs.put(catalogName, newCatalog);
               catalogStoreHolder.catalogStore().storeCatalog(catalogName, 
newCatalogDescriptor);
           } else {
               throw new CatalogException(
                       String.format("Catalog %s does not exist in the catalog 
store.", catalogName));
           }
       }
   ```
   
   For `ALTER CATALOG SET`
   ```java
       @Override
       public TableResultInternal execute(Context ctx) {
           try {
               ctx.getCatalogManager()
                       .alterCatalog(
                               catalogName, conf -> 
conf.addAll(Configuration.fromMap(properties)));
   
               return TableResultImpl.TABLE_RESULT_OK;
           } catch (CatalogException e) {
               throw new ValidationException(
                       String.format("Could not execute %s", 
asSummaryString()), e);
           }
       }
   ```
   
   For `ALTER CATALOG RESET`
   ```java
       @Override
       public TableResultInternal execute(Context ctx) {
           try {
               ctx.getCatalogManager()
                       .alterCatalog(catalogName, conf -> 
resetKeys.forEach(conf::removeKey));
   
               return TableResultImpl.TABLE_RESULT_OK;
           } catch (CatalogException e) {
               throw new ValidationException(
                       String.format("Could not execute %s", 
asSummaryString()), e);
           }
       }
   ```
   



##########
flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q:
##########
@@ -769,3 +769,28 @@ desc catalog extended cat2;
 +-------------------------+-------------------+
 4 rows in set
 !ok
+

Review Comment:
   ditto



##########
flink-table/flink-sql-gateway/src/test/resources/sql/catalog_database.q:
##########
@@ -911,3 +911,35 @@ desc catalog extended cat2;
 +-------------------------+-------------------+
 4 rows in set
 !ok
+
+alter catalog cat2 reset ('default-database', 'k1');

Review Comment:
   Nit: I found that a section comment called `# test catalog` already exists 
at L#47, and it'd be better to gather all catalog-related tests together to 
improve readability.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to