LadyForest commented on code in PR #24763:
URL: https://github.com/apache/flink/pull/24763#discussion_r1636497861
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java:
##########
@@ -124,6 +127,26 @@ public void testAlterCatalog() {
"cat2",
"ALTER CATALOG cat2\n SET 'K1' = 'V1',\n SET 'k2' = 'v2_new'",
expectedOptions);
+
+ // test alter catalog reset
+ final Set<String> expectedResetKeys = new HashSet<>();
+ expectedResetKeys.add("K1");
Review Comment:
Nit: `final Set<String> expectedResetKeys = Collections.singleton("K1");`
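As a minimal sketch of why the one-liner is a safe swap in an assertion: `Set.equals` compares contents rather than implementation classes, so a singleton set equals a `HashSet` holding the same element. The class and method names below are hypothetical and exist only for illustration; they are not part of the PR.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

class SingletonSetSketch {
    /** Builds the expected key set the way the test currently does. */
    public static Set<String> viaHashSet() {
        Set<String> keys = new HashSet<>();
        keys.add("K1");
        return keys;
    }

    /** Builds the same set in one expression; the returned set is immutable. */
    public static Set<String> viaSingleton() {
        return Collections.singleton("K1");
    }

    public static void main(String[] args) {
        // Equality holds in both directions because both sets contain exactly "K1".
        System.out.println(viaSingleton().equals(viaHashSet()));
        System.out.println(viaHashSet().equals(viaSingleton()));
    }
}
```

One caveat: the singleton set rejects later `add` calls with `UnsupportedOperationException`, so it only fits when the expected set is never mutated afterwards.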
##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java:
##########
@@ -342,6 +342,15 @@ public void alterCatalog(String catalogName, CatalogDescriptor catalogDescriptor
if (catalogStore.contains(catalogName) && oldCatalogDescriptor.isPresent()) {
Configuration conf = oldCatalogDescriptor.get().getConfiguration();
conf.addAll(catalogDescriptor.getConfiguration());
+ catalogDescriptor
Review Comment:
It'd be better to express the configuration change as an updater rather than
passing the catalog descriptor directly here.
E.g.
```java
/**
 * Alters a catalog under the given name. The catalog name must be unique.
 *
 * @param catalogName the given catalog name under which to alter the given catalog
 * @param catalogUpdater catalog configuration updater to alter catalog
 * @throws CatalogException If the catalog neither exists in the catalog store nor in the
 *     initialized catalogs, or if an error occurs while creating the catalog or storing the
 *     {@link CatalogDescriptor}
 */
public void alterCatalog(String catalogName, Consumer<Configuration> catalogUpdater)
        throws CatalogException {
    checkArgument(
            !StringUtils.isNullOrWhitespaceOnly(catalogName),
            "Catalog name cannot be null or empty.");
    checkNotNull(catalogUpdater, "Catalog configuration updater cannot be null.");
    CatalogStore catalogStore = catalogStoreHolder.catalogStore();
    Optional<CatalogDescriptor> oldCatalogDescriptor = getCatalogDescriptor(catalogName);
    if (catalogStore.contains(catalogName) && oldCatalogDescriptor.isPresent()) {
        Configuration conf = oldCatalogDescriptor.get().getConfiguration();
        catalogUpdater.accept(conf);
        CatalogDescriptor newCatalogDescriptor = CatalogDescriptor.of(catalogName, conf);
        Catalog newCatalog = initCatalog(catalogName, newCatalogDescriptor);
        catalogStore.removeCatalog(catalogName, false);
        if (catalogs.containsKey(catalogName)) {
            catalogs.get(catalogName).close();
        }
        newCatalog.open();
        catalogs.put(catalogName, newCatalog);
        catalogStoreHolder.catalogStore().storeCatalog(catalogName, newCatalogDescriptor);
    } else {
        throw new CatalogException(
                String.format("Catalog %s does not exist in the catalog store.", catalogName));
    }
}
```
For `ALTER CATALOG SET`
```java
@Override
public TableResultInternal execute(Context ctx) {
    try {
        ctx.getCatalogManager()
                .alterCatalog(
                        catalogName, conf -> conf.addAll(Configuration.fromMap(properties)));
        return TableResultImpl.TABLE_RESULT_OK;
    } catch (CatalogException e) {
        throw new ValidationException(
                String.format("Could not execute %s", asSummaryString()), e);
    }
}
```
For `ALTER CATALOG RESET`
```java
@Override
public TableResultInternal execute(Context ctx) {
    try {
        ctx.getCatalogManager()
                .alterCatalog(catalogName, conf -> resetKeys.forEach(conf::removeKey));
        return TableResultImpl.TABLE_RESULT_OK;
    } catch (CatalogException e) {
        throw new ValidationException(
                String.format("Could not execute %s", asSummaryString()), e);
    }
}
```
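To make the shape of the proposal easier to see outside Flink, here is a self-contained sketch of the same updater pattern. It assumes a plain `Map<String, String>` as a stand-in for `Configuration` and an in-memory map as a stand-in for the catalog store; `CatalogUpdaterSketch`, `createCatalog`, and `options` are hypothetical names for illustration and are not Flink APIs.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

class CatalogUpdaterSketch {
    // Hypothetical stand-in for the catalog store: catalog name -> options.
    private final Map<String, Map<String, String>> store = new HashMap<>();

    public void createCatalog(String name, Map<String, String> options) {
        store.put(name, new HashMap<>(options));
    }

    /** Applies the updater to a copy of the stored options, then stores the result. */
    public void alterCatalog(String name, Consumer<Map<String, String>> updater) {
        Map<String, String> current = store.get(name);
        if (current == null) {
            throw new IllegalArgumentException("Catalog " + name + " does not exist.");
        }
        Map<String, String> updated = new HashMap<>(current);
        updater.accept(updated);
        store.put(name, updated);
    }

    public Map<String, String> options(String name) {
        return store.get(name);
    }

    public static void main(String[] args) {
        CatalogUpdaterSketch manager = new CatalogUpdaterSketch();
        manager.createCatalog("cat2", Map.of("type", "generic_in_memory", "K1", "V1"));

        // SET: merge new options into the existing ones.
        manager.alterCatalog("cat2", conf -> conf.putAll(Map.of("k2", "v2_new")));
        // RESET: drop the listed keys.
        manager.alterCatalog("cat2", conf -> List.of("K1").forEach(conf::remove));

        System.out.println(manager.options("cat2"));
    }
}
```

The point of the design is that SET and RESET differ only in the lambda they pass, so the manager needs a single `alterCatalog` entry point instead of one overload per operation.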
##########
flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q:
##########
@@ -769,3 +769,28 @@ desc catalog extended cat2;
+-------------------------+-------------------+
4 rows in set
!ok
+
Review Comment:
ditto
##########
flink-table/flink-sql-gateway/src/test/resources/sql/catalog_database.q:
##########
@@ -911,3 +911,35 @@ desc catalog extended cat2;
+-------------------------+-------------------+
4 rows in set
!ok
+
+alter catalog cat2 reset ('default-database', 'k1');
Review Comment:
Nit: a section comment called `# test catalog` already exists at L#47, so
it'd be better to gather all catalog-related tests together to improve
readability.