This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 75a92efd7b3 [FLINK-30493][table-api] Introduce TableChange to 
SET/RESET options (#21554)
75a92efd7b3 is described below

commit 75a92efd7b35501698e5de253e5231d680830c16
Author: Shengkai <[email protected]>
AuthorDate: Tue Dec 27 20:36:08 2022 +0800

    [FLINK-30493][table-api] Introduce TableChange to SET/RESET options (#21554)
    
    This closes #21554
---
 .../hive/parse/HiveParserDDLSemanticAnalyzer.java  |  10 +-
 .../table/api/internal/TableEnvironmentImpl.java   |   9 ++
 .../apache/flink/table/catalog/CatalogManager.java |  24 +++
 .../operations/ddl/AlterTableChangeOperation.java  |  77 ++++++++++
 .../operations/ddl/AlterTableOptionsOperation.java |   7 +-
 .../org/apache/flink/table/catalog/Catalog.java    |  27 ++++
 .../apache/flink/table/catalog/TableChange.java    | 164 +++++++++++++++++++++
 .../operations/SqlToOperationConverter.java        |  20 ++-
 .../operations/SqlToOperationConverterTest.java    |  30 +++-
 9 files changed, 352 insertions(+), 16 deletions(-)

diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
index ad73f164020..b386a46b62b 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
@@ -47,6 +47,7 @@ import org.apache.flink.table.catalog.FunctionCatalog;
 import org.apache.flink.table.catalog.FunctionLanguage;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.TableChange;
 import org.apache.flink.table.catalog.UnresolvedIdentifier;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
@@ -73,7 +74,7 @@ import org.apache.flink.table.operations.UseDatabaseOperation;
 import org.apache.flink.table.operations.ddl.AddPartitionsOperation;
 import org.apache.flink.table.operations.ddl.AlterDatabaseOperation;
 import org.apache.flink.table.operations.ddl.AlterPartitionPropertiesOperation;
-import org.apache.flink.table.operations.ddl.AlterTableOptionsOperation;
+import org.apache.flink.table.operations.ddl.AlterTableChangeOperation;
 import org.apache.flink.table.operations.ddl.AlterTableRenameOperation;
 import org.apache.flink.table.operations.ddl.AlterTableSchemaOperation;
 import org.apache.flink.table.operations.ddl.AlterViewAsOperation;
@@ -1492,7 +1493,12 @@ public class HiveParserDDLSemanticAnalyzer {
         } else {
             props.putAll(oldTable.getOptions());
             props.putAll(newProps);
-            return new AlterTableOptionsOperation(tableIdentifier, 
oldTable.copy(props));
+            return new AlterTableChangeOperation(
+                    tableIdentifier,
+                    newProps.entrySet().stream()
+                            .map(entry -> TableChange.set(entry.getKey(), 
entry.getValue()))
+                            .collect(Collectors.toList()),
+                    oldTable.copy(props));
         }
     }
 
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index d3e58d0cb19..bffbef65c1c 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -124,6 +124,7 @@ import 
org.apache.flink.table.operations.ddl.AddPartitionsOperation;
 import org.apache.flink.table.operations.ddl.AlterCatalogFunctionOperation;
 import org.apache.flink.table.operations.ddl.AlterDatabaseOperation;
 import org.apache.flink.table.operations.ddl.AlterPartitionPropertiesOperation;
+import org.apache.flink.table.operations.ddl.AlterTableChangeOperation;
 import org.apache.flink.table.operations.ddl.AlterTableDropConstraintOperation;
 import org.apache.flink.table.operations.ddl.AlterTableOperation;
 import org.apache.flink.table.operations.ddl.AlterTableOptionsOperation;
@@ -1047,6 +1048,14 @@ public class TableEnvironmentImpl implements 
TableEnvironmentInternal {
                     for (CatalogPartitionSpec spec : 
dropPartitionsOperation.getPartitionSpecs()) {
                         catalog.dropPartition(tablePath, spec, ifExists);
                     }
+                } else if (alterTableOperation instanceof 
AlterTableChangeOperation) {
+                    AlterTableChangeOperation alterTableChangeOperation =
+                            (AlterTableChangeOperation) alterTableOperation;
+                    catalogManager.alterTable(
+                            alterTableChangeOperation.getNewTable(),
+                            alterTableChangeOperation.getTableChanges(),
+                            alterTableChangeOperation.getTableIdentifier(),
+                            false);
                 }
                 return TableResultImpl.TABLE_RESULT_OK;
             } catch (TableAlreadyExistException | TableNotExistException e) {
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
index 66b6dc2b4d8..09fbb880e85 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
@@ -806,6 +806,30 @@ public final class CatalogManager {
                 "AlterTable");
     }
 
+    /**
+     * Alters a table in a given fully qualified path with table changes.
+     *
+     * @param table The table to put in the given path
+     * @param changes The table changes from the original table to the new 
table.
+     * @param objectIdentifier The fully qualified path where to alter the 
table.
+     * @param ignoreIfNotExists If false exception will be thrown if the table 
or database or
+     *     catalog to be altered does not exist.
+     */
+    public void alterTable(
+            CatalogBaseTable table,
+            List<TableChange> changes,
+            ObjectIdentifier objectIdentifier,
+            boolean ignoreIfNotExists) {
+        execute(
+                (catalog, path) -> {
+                    final CatalogBaseTable resolvedTable = 
resolveCatalogBaseTable(table);
+                    catalog.alterTable(path, resolvedTable, changes, 
ignoreIfNotExists);
+                },
+                objectIdentifier,
+                ignoreIfNotExists,
+                "AlterTable");
+    }
+
     /**
      * Drops a table in a given fully qualified path.
      *
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableChangeOperation.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableChangeOperation.java
new file mode 100644
index 00000000000..009d7e9a071
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableChangeOperation.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.ddl;
+
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.TableChange;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** Alter table with new table definition and table changes represents the 
modification. */
+public class AlterTableChangeOperation extends AlterTableOperation {
+
+    private final List<TableChange> tableChanges;
+    private final CatalogTable newTable;
+
+    public AlterTableChangeOperation(
+            ObjectIdentifier tableIdentifier,
+            List<TableChange> tableChanges,
+            CatalogTable newTable) {
+        super(tableIdentifier);
+        this.tableChanges = Collections.unmodifiableList(tableChanges);
+        this.newTable = newTable;
+    }
+
+    public List<TableChange> getTableChanges() {
+        return tableChanges;
+    }
+
+    public CatalogTable getNewTable() {
+        return newTable;
+    }
+
+    @Override
+    public String asSummaryString() {
+        String changes =
+                tableChanges.stream()
+                        .map(
+                                tableChange -> {
+                                    if (tableChange instanceof 
TableChange.SetOption) {
+                                        TableChange.SetOption setChange =
+                                                (TableChange.SetOption) 
tableChange;
+                                        return String.format(
+                                                "  SET '%s' = '%s'",
+                                                setChange.getKey(), 
setChange.getValue());
+                                    } else if (tableChange instanceof 
TableChange.ResetOption) {
+                                        TableChange.ResetOption resetChange =
+                                                (TableChange.ResetOption) 
tableChange;
+                                        return String.format("  RESET '%s'", 
resetChange.getKey());
+                                    } else {
+                                        throw new 
UnsupportedOperationException(
+                                                String.format(
+                                                        "Unknown table change: 
%s.", tableChange));
+                                    }
+                                })
+                        .collect(Collectors.joining(",\n"));
+        return String.format("ALTER TABLE %s\n%s", 
tableIdentifier.asSummaryString(), changes);
+    }
+}
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableOptionsOperation.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableOptionsOperation.java
index b3385f2b049..4b505d1b1ab 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableOptionsOperation.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableOptionsOperation.java
@@ -24,7 +24,12 @@ import org.apache.flink.table.operations.OperationUtils;
 
 import java.util.stream.Collectors;
 
-/** Operation to describe a ALTER TABLE .. SET .. statement. */
+/**
+ * Operation to describe a ALTER TABLE .. SET .. statement.
+ *
+ * @deprecated Please use {@link AlterTableChangeOperation} instead.
+ */
+@Deprecated
 public class AlterTableOptionsOperation extends AlterTableOperation {
     private final CatalogTable catalogTable;
 
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
index 31a65776e6c..420391b6e82 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
@@ -314,6 +314,33 @@ public interface Catalog {
     void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean 
ignoreIfNotExists)
             throws TableNotExistException, CatalogException;
 
+    /**
+     * Modifies an existing table or view. Note that the new and old {@link 
CatalogBaseTable} must
+     * be of the same kind. For example, this doesn't allow altering a regular 
table to partitioned
+     * table, or altering a view to a table, and vice versa.
+     *
+     * <p>The framework will make sure to call this method with fully 
validated {@link
+     * ResolvedCatalogTable} or {@link ResolvedCatalogView}. Those instances 
are easy to serialize
+     * for a durable catalog implementation.
+     *
+     * @param tablePath path of the table or view to be modified
+     * @param newTable the new table definition
+     * @param tableChanges change to describe the modification between the 
newTable and the original
+     *     table.
+     * @param ignoreIfNotExists flag to specify behavior when the table or 
view does not exist: if
+     *     set to false, throw an exception, if set to true, do nothing.
+     * @throws TableNotExistException if the table does not exist
+     * @throws CatalogException in case of any runtime exception
+     */
+    default void alterTable(
+            ObjectPath tablePath,
+            CatalogBaseTable newTable,
+            List<TableChange> tableChanges,
+            boolean ignoreIfNotExists)
+            throws TableNotExistException, CatalogException {
+        alterTable(tablePath, newTable, ignoreIfNotExists);
+    }
+
     /** If true, tables which do not specify a connector will be translated to 
managed tables. */
     default boolean supportsManagedTable() {
         return false;
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/TableChange.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/TableChange.java
new file mode 100644
index 00000000000..afb3bc13813
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/TableChange.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.Objects;
+
+/** {@link TableChange} represents the modification of the table. */
+@PublicEvolving
+public interface TableChange {
+
+    /**
+     * A table change to set the table option.
+     *
+     * <p>It is equal to the following statement:
+     *
+     * <pre>
+     *    ALTER TABLE &lt;table_name&gt; SET '&lt;key&gt;' = '&lt;value&gt;';
+     * </pre>
+     *
+     * @param key the option name to set.
+     * @param value the option value to set.
+     * @return a TableChange represents the modification.
+     */
+    static SetOption set(String key, String value) {
+        return new SetOption(key, value);
+    }
+
+    /**
+     * A table change to reset the table option.
+     *
+     * <p>It is equal to the following statement:
+     *
+     * <pre>
+     *    ALTER TABLE &lt;table_name&gt; RESET '&lt;key&gt;'
+     * </pre>
+     *
+     * @param key the option name to reset.
+     * @return a TableChange represents the modification.
+     */
+    static ResetOption reset(String key) {
+        return new ResetOption(key);
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+    // Property change
+    // 
--------------------------------------------------------------------------------------------
+
+    /**
+     * A table change to set the table option.
+     *
+     * <p>It is equal to the following statement:
+     *
+     * <pre>
+     *    ALTER TABLE &lt;table_name&gt; SET '&lt;key&gt;' = '&lt;value&gt;';
+     * </pre>
+     */
+    @PublicEvolving
+    class SetOption implements TableChange {
+
+        private final String key;
+        private final String value;
+
+        private SetOption(String key, String value) {
+            this.key = key;
+            this.value = value;
+        }
+
+        /** Returns the Option key to set. */
+        public String getKey() {
+            return key;
+        }
+
+        /** Returns the Option value to set. */
+        public String getValue() {
+            return value;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (!(o instanceof SetOption)) {
+                return false;
+            }
+            SetOption setOption = (SetOption) o;
+            return Objects.equals(key, setOption.key) && Objects.equals(value, 
setOption.value);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(key, value);
+        }
+
+        @Override
+        public String toString() {
+            return "SetOption{" + "key='" + key + '\'' + ", value='" + value + 
'\'' + '}';
+        }
+    }
+
+    /**
+     * A table change to reset the table option.
+     *
+     * <p>It is equal to the following statement:
+     *
+     * <pre>
+     *    ALTER TABLE &lt;table_name&gt; RESET '&lt;key&gt;'
+     * </pre>
+     */
+    @PublicEvolving
+    class ResetOption implements TableChange {
+
+        private final String key;
+
+        public ResetOption(String key) {
+            this.key = key;
+        }
+
+        /** Returns the Option key to reset. */
+        public String getKey() {
+            return key;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (!(o instanceof ResetOption)) {
+                return false;
+            }
+            ResetOption that = (ResetOption) o;
+            return Objects.equals(key, that.key);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(key);
+        }
+
+        @Override
+        public String toString() {
+            return "ResetOption{" + "key='" + key + '\'' + '}';
+        }
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
index 3fe2c32a324..237977c049c 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
@@ -109,6 +109,7 @@ import org.apache.flink.table.catalog.ManagedTableListener;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ResolvedCatalogTable;
 import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.TableChange;
 import org.apache.flink.table.catalog.UnresolvedIdentifier;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
@@ -160,8 +161,8 @@ import 
org.apache.flink.table.operations.ddl.AddPartitionsOperation;
 import org.apache.flink.table.operations.ddl.AlterCatalogFunctionOperation;
 import org.apache.flink.table.operations.ddl.AlterDatabaseOperation;
 import org.apache.flink.table.operations.ddl.AlterPartitionPropertiesOperation;
+import org.apache.flink.table.operations.ddl.AlterTableChangeOperation;
 import org.apache.flink.table.operations.ddl.AlterTableDropConstraintOperation;
-import org.apache.flink.table.operations.ddl.AlterTableOptionsOperation;
 import org.apache.flink.table.operations.ddl.AlterTableRenameOperation;
 import org.apache.flink.table.operations.ddl.AlterTableSchemaOperation;
 import org.apache.flink.table.operations.ddl.AlterViewAsOperation;
@@ -595,10 +596,16 @@ public class SqlToOperationConverter {
                     new CatalogPartitionImpl(newProps, 
catalogPartition.getComment()));
         } else {
             // it's altering a table
+            Map<String, String> changeOptions =
+                    
OperationConverterUtils.extractProperties(alterTableOptions.getPropertyList());
             Map<String, String> newOptions = new 
HashMap<>(oldTable.getOptions());
-            newOptions.putAll(
-                    
OperationConverterUtils.extractProperties(alterTableOptions.getPropertyList()));
-            return new AlterTableOptionsOperation(tableIdentifier, 
oldTable.copy(newOptions));
+            newOptions.putAll(changeOptions);
+            return new AlterTableChangeOperation(
+                    tableIdentifier,
+                    changeOptions.entrySet().stream()
+                            .map(entry -> TableChange.set(entry.getKey(), 
entry.getValue()))
+                            .collect(Collectors.toList()),
+                    oldTable.copy(newOptions));
         }
     }
 
@@ -618,7 +625,10 @@ public class SqlToOperationConverter {
         }
         // reset table option keys
         resetKeys.forEach(newOptions::remove);
-        return new AlterTableOptionsOperation(tableIdentifier, 
oldTable.copy(newOptions));
+        return new AlterTableChangeOperation(
+                tableIdentifier,
+                
resetKeys.stream().map(TableChange::reset).collect(Collectors.toList()),
+                oldTable.copy(newOptions));
     }
 
     /**
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
index b2547bb4622..6d3bd447c65 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
@@ -46,6 +46,7 @@ import org.apache.flink.table.catalog.FunctionLanguage;
 import org.apache.flink.table.catalog.GenericInMemoryCatalog;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.TableChange;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
 import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
@@ -79,8 +80,8 @@ import 
org.apache.flink.table.operations.command.ResetOperation;
 import org.apache.flink.table.operations.command.SetOperation;
 import org.apache.flink.table.operations.command.ShowJarsOperation;
 import org.apache.flink.table.operations.ddl.AlterDatabaseOperation;
+import org.apache.flink.table.operations.ddl.AlterTableChangeOperation;
 import org.apache.flink.table.operations.ddl.AlterTableDropConstraintOperation;
-import org.apache.flink.table.operations.ddl.AlterTableOptionsOperation;
 import org.apache.flink.table.operations.ddl.AlterTableRenameOperation;
 import org.apache.flink.table.operations.ddl.AlterTableSchemaOperation;
 import org.apache.flink.table.operations.ddl.CreateCatalogFunctionOperation;
@@ -1243,12 +1244,21 @@ public class SqlToOperationConverterTest {
         expectedOptions.put("k1", "v1");
         expectedOptions.put("K2", "V2");
 
-        assertAlterTableOptions(operation, expectedIdentifier, 
expectedOptions);
+        assertAlterTableOptions(
+                operation,
+                expectedIdentifier,
+                expectedOptions,
+                Arrays.asList(TableChange.set("k1", "v1"), 
TableChange.set("K2", "V2")),
+                "ALTER TABLE cat1.db1.tb1\n  SET 'k1' = 'v1',\n  SET 'K2' = 
'V2'");
 
         // test alter table reset
         operation = parse("alter table cat1.db1.tb1 reset ('k')");
         assertAlterTableOptions(
-                operation, expectedIdentifier, 
Collections.singletonMap("connector", "dummy"));
+                operation,
+                expectedIdentifier,
+                Collections.singletonMap("connector", "dummy"),
+                Collections.singletonList(TableChange.reset("k")),
+                "ALTER TABLE cat1.db1.tb1\n  RESET 'k'");
         assertThatThrownBy(() -> parse("alter table cat1.db1.tb1 reset 
('connector')"))
                 .isInstanceOf(ValidationException.class)
                 .hasMessageContaining("ALTER TABLE RESET does not support 
changing 'connector'");
@@ -2228,13 +2238,17 @@ public class SqlToOperationConverterTest {
     private void assertAlterTableOptions(
             Operation operation,
             ObjectIdentifier expectedIdentifier,
-            Map<String, String> expectedOptions) {
-        assertThat(operation).isInstanceOf(AlterTableOptionsOperation.class);
-        final AlterTableOptionsOperation alterTableOptionsOperation =
-                (AlterTableOptionsOperation) operation;
+            Map<String, String> expectedOptions,
+            List<TableChange> expectedChanges,
+            String expectedSummary) {
+        assertThat(operation).isInstanceOf(AlterTableChangeOperation.class);
+        final AlterTableChangeOperation alterTableOptionsOperation =
+                (AlterTableChangeOperation) operation;
         
assertThat(alterTableOptionsOperation.getTableIdentifier()).isEqualTo(expectedIdentifier);
-        assertThat(alterTableOptionsOperation.getCatalogTable().getOptions())
+        assertThat(alterTableOptionsOperation.getNewTable().getOptions())
                 .isEqualTo(expectedOptions);
+        
assertThat(expectedChanges).isEqualTo(alterTableOptionsOperation.getTableChanges());
+        
assertThat(alterTableOptionsOperation.asSummaryString()).isEqualTo(expectedSummary);
     }
 
     private void assertAlterTableSchema(

Reply via email to