LadyForest commented on code in PR #208:
URL: https://github.com/apache/flink-table-store/pull/208#discussion_r918611551


##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java:
##########
@@ -178,40 +206,75 @@ public void createTable(ObjectPath tablePath, 
CatalogBaseTable table, boolean ig
     public void alterTable(
             ObjectPath tablePath, CatalogBaseTable newTable, boolean 
ignoreIfNotExists)
             throws TableNotExistException, CatalogException {
+        if (ignoreIfNotExists && !tableExists(tablePath)) {
+            return;
+        }
+
+        CatalogTable table = getTable(tablePath);
+
+        // Currently, Flink SQL only support altering table properties.
+        validateAlterTable(table, (CatalogTable) newTable);
+
+        List<SchemaChange> changes = new ArrayList<>();
+        Map<String, String> oldProperties = table.getOptions();
+        for (Map.Entry<String, String> entry : 
newTable.getOptions().entrySet()) {
+            String key = entry.getKey();
+            String value = entry.getValue();
+
+            if (Objects.equals(value, oldProperties.get(key))) {
+                continue;
+            }
+
+            if (PATH.key().equalsIgnoreCase(key)) {
+                throw new IllegalArgumentException("Illegal table path in 
table options: " + value);
+            }
+
+            changes.add(SchemaChange.setOption(key, value));
+        }
+
+        oldProperties
+                .keySet()
+                .forEach(
+                        k -> {
+                            if (!newTable.getOptions().containsKey(k)) {
+                                changes.add(SchemaChange.removeOption(k));
+                            }
+                        });
+
         try {
-            catalog.alterTable(
-                    tablePath, convertTableToSchema(tablePath, newTable), 
ignoreIfNotExists);
+            catalog.alterTable(tablePath, changes, ignoreIfNotExists);
         } catch (Catalog.TableNotExistException e) {
             throw new TableNotExistException(getName(), e.tablePath());
         }
     }
 
-    private UpdateSchema convertTableToSchema(ObjectPath tablePath, 
CatalogBaseTable baseTable) {
-        if (!(baseTable instanceof CatalogTable)) {
-            throw new UnsupportedOperationException(
-                    "Only support CatalogTable, but is: " + 
baseTable.getClass());
-        }
-        CatalogTable table = (CatalogTable) baseTable;
-        Map<String, String> options = table.getOptions();
-        if (options.containsKey(CONNECTOR.key())) {
-            throw new CatalogException(
-                    String.format(
-                            "Table Store Catalog only supports table store 
tables, not '%s' connector."
-                                    + " You can create TEMPORARY table 
instead.",
-                            options.get(CONNECTOR.key())));
+    private static void validateAlterTable(CatalogTable ct1, CatalogTable ct2) 
{
+        org.apache.flink.table.api.TableSchema ts1 = ct1.getSchema();
+        org.apache.flink.table.api.TableSchema ts2 = ct2.getSchema();
+        boolean equalsPrimary = false;

Review Comment:
   Nit: rename `equalsPrimary` to `pkEquals`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to