LadyForest commented on code in PR #24735:
URL: https://github.com/apache/flink/pull/24735#discussion_r1591751966
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java:
##########
@@ -102,6 +103,23 @@
/** Test cases for the DDL statements for {@link
SqlNodeToOperationConversion}. */
public class SqlDdlToOperationConverterTest extends
SqlNodeToOperationConversionTestBase {
+ @Test
+ public void testAlterCatalog() {
+ // test alter catalog options
+ final String sql1 = "ALTER CATALOG cat2 SET ('K1' = 'V1', 'k2' = 'v2',
'k2' = 'v2_new')";
+ Operation operation = parse(sql1);
+ assertThat(operation).isInstanceOf(AlterCatalogOptionsOperation.class);
+ assertThat(((AlterCatalogOptionsOperation)
operation).getCatalogName()).isEqualTo("cat2");
+ assertThat(operation.asSummaryString())
+ .isEqualTo("ALTER CATALOG cat2\n SET 'K1' = 'V1',\n SET 'k2'
= 'v2_new'");
+
+ final Map<String, String> expectedOptions = new HashMap<>();
+ expectedOptions.put("K1", "V1");
+ expectedOptions.put("k2", "v2_new");
+ assertThat(((AlterCatalogOptionsOperation) operation).getProperties())
+ .isEqualTo(expectedOptions);
Review Comment:
Nit: can be simplified as
```java
// test alter catalog options
final String sql1 = "ALTER CATALOG cat2 SET ('K1' = 'V1', 'k2' =
'v2', 'k2' = 'v2_new')";
final Map<String, String> expectedOptions = new HashMap<>();
expectedOptions.put("K1", "V1");
expectedOptions.put("k2", "v2_new");
Operation operation = parse(sql1);
assertThat(operation)
.isInstanceOf(AlterCatalogOptionsOperation.class)
.asInstanceOf(type(AlterCatalogOptionsOperation.class))
.extracting(
AlterCatalogOptionsOperation::getCatalogName,
AlterCatalogOptionsOperation::asSummaryString,
AlterCatalogOptionsOperation::getProperties)
.containsExactly(
"cat2",
"ALTER CATALOG cat2\n SET 'K1' = 'V1',\n SET 'k2'
= 'v2_new'",
expectedOptions);
```
##########
flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl:
##########
@@ -147,6 +147,28 @@ SqlDrop SqlDropCatalog(Span s, boolean replace) :
}
}
+/**
+* Parses a alter catalog statement.
Review Comment:
Nit: an alter catalog statement
##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java:
##########
@@ -321,6 +322,42 @@ public void createCatalog(String catalogName,
CatalogDescriptor catalogDescripto
catalogStoreHolder.catalogStore().storeCatalog(catalogName,
catalogDescriptor);
}
+ /**
+ * Alters a catalog under the given name. The catalog name must be unique.
+ *
+ * @param catalogName the given catalog name under which to alter the
given catalog
+ * @param catalogDescriptor catalog descriptor for altering catalog
+ * @throws CatalogException If the catalog neither exists in the catalog
store nor in the
+ * initialized catalogs, or if an error occurs while creating the
catalog or storing the
+ * {@link CatalogDescriptor}
+ */
+ public void alterCatalog(String catalogName, CatalogDescriptor
catalogDescriptor)
+ throws CatalogException {
+ checkArgument(
+ !StringUtils.isNullOrWhitespaceOnly(catalogName),
+ "Catalog name cannot be null or empty.");
+ checkNotNull(catalogDescriptor, "Catalog descriptor cannot be null");
+ CatalogStore catalogStore = catalogStoreHolder.catalogStore();
+ Optional<CatalogDescriptor> oldCatalogDescriptor =
getCatalogDescriptor(catalogName);
+ if (catalogStore.contains(catalogName) &&
oldCatalogDescriptor.isPresent()) {
+ Map<String, String> props =
oldCatalogDescriptor.get().getConfiguration().toMap();
Review Comment:
Nit: no need to convert to a map and back again.
```java
Configuration conf = oldCatalogDescriptor.get().getConfiguration();
conf.addAll(catalogDescriptor.getConfiguration());
CatalogDescriptor newCatalogDescriptor = CatalogDescriptor.of(catalogName,
conf);
```
##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java:
##########
@@ -321,6 +322,42 @@ public void createCatalog(String catalogName,
CatalogDescriptor catalogDescripto
catalogStoreHolder.catalogStore().storeCatalog(catalogName,
catalogDescriptor);
}
+ /**
+ * Alters a catalog under the given name. The catalog name must be unique.
+ *
+ * @param catalogName the given catalog name under which to alter the
given catalog
+ * @param catalogDescriptor catalog descriptor for altering catalog
+ * @throws CatalogException If the catalog neither exists in the catalog
store nor in the
+ * initialized catalogs, or if an error occurs while creating the
catalog or storing the
+ * {@link CatalogDescriptor}
+ */
+ public void alterCatalog(String catalogName, CatalogDescriptor
catalogDescriptor)
+ throws CatalogException {
+ checkArgument(
+ !StringUtils.isNullOrWhitespaceOnly(catalogName),
+ "Catalog name cannot be null or empty.");
+ checkNotNull(catalogDescriptor, "Catalog descriptor cannot be null");
+ CatalogStore catalogStore = catalogStoreHolder.catalogStore();
+ Optional<CatalogDescriptor> oldCatalogDescriptor =
getCatalogDescriptor(catalogName);
+ if (catalogStore.contains(catalogName) &&
oldCatalogDescriptor.isPresent()) {
+ Map<String, String> props =
oldCatalogDescriptor.get().getConfiguration().toMap();
+ props.putAll(catalogDescriptor.getConfiguration().toMap());
+ CatalogDescriptor newCatalogDescriptor =
+ CatalogDescriptor.of(catalogName,
Configuration.fromMap(props));
+ Catalog catalog = initCatalog(catalogName, newCatalogDescriptor);
Review Comment:
Nit: if `newCatalogDescriptor` is used to differentiate it from
`catalogDescriptor`, what about `newCatalog` instead of `catalog` to keep
the naming consistent?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]