This is an automated email from the ASF dual-hosted git repository.
shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 75a92efd7b3 [FLINK-30493][table-api] Introduce TableChange to
SET/RESET options (#21554)
75a92efd7b3 is described below
commit 75a92efd7b35501698e5de253e5231d680830c16
Author: Shengkai <[email protected]>
AuthorDate: Tue Dec 27 20:36:08 2022 +0800
[FLINK-30493][table-api] Introduce TableChange to SET/RESET options (#21554)
This closes #21554
---
.../hive/parse/HiveParserDDLSemanticAnalyzer.java | 10 +-
.../table/api/internal/TableEnvironmentImpl.java | 9 ++
.../apache/flink/table/catalog/CatalogManager.java | 24 +++
.../operations/ddl/AlterTableChangeOperation.java | 77 ++++++++++
.../operations/ddl/AlterTableOptionsOperation.java | 7 +-
.../org/apache/flink/table/catalog/Catalog.java | 27 ++++
.../apache/flink/table/catalog/TableChange.java | 164 +++++++++++++++++++++
.../operations/SqlToOperationConverter.java | 20 ++-
.../operations/SqlToOperationConverterTest.java | 30 +++-
9 files changed, 352 insertions(+), 16 deletions(-)
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
index ad73f164020..b386a46b62b 100644
---
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
@@ -47,6 +47,7 @@ import org.apache.flink.table.catalog.FunctionCatalog;
import org.apache.flink.table.catalog.FunctionLanguage;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.TableChange;
import org.apache.flink.table.catalog.UnresolvedIdentifier;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
@@ -73,7 +74,7 @@ import org.apache.flink.table.operations.UseDatabaseOperation;
import org.apache.flink.table.operations.ddl.AddPartitionsOperation;
import org.apache.flink.table.operations.ddl.AlterDatabaseOperation;
import org.apache.flink.table.operations.ddl.AlterPartitionPropertiesOperation;
-import org.apache.flink.table.operations.ddl.AlterTableOptionsOperation;
+import org.apache.flink.table.operations.ddl.AlterTableChangeOperation;
import org.apache.flink.table.operations.ddl.AlterTableRenameOperation;
import org.apache.flink.table.operations.ddl.AlterTableSchemaOperation;
import org.apache.flink.table.operations.ddl.AlterViewAsOperation;
@@ -1492,7 +1493,12 @@ public class HiveParserDDLSemanticAnalyzer {
} else {
props.putAll(oldTable.getOptions());
props.putAll(newProps);
- return new AlterTableOptionsOperation(tableIdentifier,
oldTable.copy(props));
+ return new AlterTableChangeOperation(
+ tableIdentifier,
+ newProps.entrySet().stream()
+ .map(entry -> TableChange.set(entry.getKey(),
entry.getValue()))
+ .collect(Collectors.toList()),
+ oldTable.copy(props));
}
}
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index d3e58d0cb19..bffbef65c1c 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -124,6 +124,7 @@ import
org.apache.flink.table.operations.ddl.AddPartitionsOperation;
import org.apache.flink.table.operations.ddl.AlterCatalogFunctionOperation;
import org.apache.flink.table.operations.ddl.AlterDatabaseOperation;
import org.apache.flink.table.operations.ddl.AlterPartitionPropertiesOperation;
+import org.apache.flink.table.operations.ddl.AlterTableChangeOperation;
import org.apache.flink.table.operations.ddl.AlterTableDropConstraintOperation;
import org.apache.flink.table.operations.ddl.AlterTableOperation;
import org.apache.flink.table.operations.ddl.AlterTableOptionsOperation;
@@ -1047,6 +1048,14 @@ public class TableEnvironmentImpl implements
TableEnvironmentInternal {
for (CatalogPartitionSpec spec :
dropPartitionsOperation.getPartitionSpecs()) {
catalog.dropPartition(tablePath, spec, ifExists);
}
+ } else if (alterTableOperation instanceof
AlterTableChangeOperation) {
+ AlterTableChangeOperation alterTableChangeOperation =
+ (AlterTableChangeOperation) alterTableOperation;
+ catalogManager.alterTable(
+ alterTableChangeOperation.getNewTable(),
+ alterTableChangeOperation.getTableChanges(),
+ alterTableChangeOperation.getTableIdentifier(),
+ false);
}
return TableResultImpl.TABLE_RESULT_OK;
} catch (TableAlreadyExistException | TableNotExistException e) {
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
index 66b6dc2b4d8..09fbb880e85 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
@@ -806,6 +806,30 @@ public final class CatalogManager {
"AlterTable");
}
+ /**
+ * Alters a table in a given fully qualified path with table changes.
+ *
+ * @param table The table to put in the given path
+ * @param changes The table changes from the original table to the new
table.
+ * @param objectIdentifier The fully qualified path where to alter the
table.
+ * @param ignoreIfNotExists If false exception will be thrown if the table
or database or
+ * catalog to be altered does not exist.
+ */
+ public void alterTable(
+ CatalogBaseTable table,
+ List<TableChange> changes,
+ ObjectIdentifier objectIdentifier,
+ boolean ignoreIfNotExists) {
+ execute(
+ (catalog, path) -> {
+ final CatalogBaseTable resolvedTable =
resolveCatalogBaseTable(table);
+ catalog.alterTable(path, resolvedTable, changes,
ignoreIfNotExists);
+ },
+ objectIdentifier,
+ ignoreIfNotExists,
+ "AlterTable");
+ }
+
/**
* Drops a table in a given fully qualified path.
*
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableChangeOperation.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableChangeOperation.java
new file mode 100644
index 00000000000..009d7e9a071
--- /dev/null
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableChangeOperation.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.ddl;
+
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.TableChange;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** Alter table with new table definition and table changes represents the
modification. */
+public class AlterTableChangeOperation extends AlterTableOperation {
+
+ private final List<TableChange> tableChanges;
+ private final CatalogTable newTable;
+
+ public AlterTableChangeOperation(
+ ObjectIdentifier tableIdentifier,
+ List<TableChange> tableChanges,
+ CatalogTable newTable) {
+ super(tableIdentifier);
+ this.tableChanges = Collections.unmodifiableList(tableChanges);
+ this.newTable = newTable;
+ }
+
+ public List<TableChange> getTableChanges() {
+ return tableChanges;
+ }
+
+ public CatalogTable getNewTable() {
+ return newTable;
+ }
+
+ @Override
+ public String asSummaryString() {
+ String changes =
+ tableChanges.stream()
+ .map(
+ tableChange -> {
+ if (tableChange instanceof
TableChange.SetOption) {
+ TableChange.SetOption setChange =
+ (TableChange.SetOption)
tableChange;
+ return String.format(
+ " SET '%s' = '%s'",
+ setChange.getKey(),
setChange.getValue());
+ } else if (tableChange instanceof
TableChange.ResetOption) {
+ TableChange.ResetOption resetChange =
+ (TableChange.ResetOption)
tableChange;
+ return String.format(" RESET '%s'",
resetChange.getKey());
+ } else {
+ throw new
UnsupportedOperationException(
+ String.format(
+ "Unknown table change:
%s.", tableChange));
+ }
+ })
+ .collect(Collectors.joining(",\n"));
+ return String.format("ALTER TABLE %s\n%s",
tableIdentifier.asSummaryString(), changes);
+ }
+}
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableOptionsOperation.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableOptionsOperation.java
index b3385f2b049..4b505d1b1ab 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableOptionsOperation.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableOptionsOperation.java
@@ -24,7 +24,12 @@ import org.apache.flink.table.operations.OperationUtils;
import java.util.stream.Collectors;
-/** Operation to describe a ALTER TABLE .. SET .. statement. */
+/**
+ * Operation to describe a ALTER TABLE .. SET .. statement.
+ *
+ * @deprecated Please use {@link AlterTableChangeOperation} instead.
+ */
+@Deprecated
public class AlterTableOptionsOperation extends AlterTableOperation {
private final CatalogTable catalogTable;
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
index 31a65776e6c..420391b6e82 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
@@ -314,6 +314,33 @@ public interface Catalog {
void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean
ignoreIfNotExists)
throws TableNotExistException, CatalogException;
+ /**
+ * Modifies an existing table or view. Note that the new and old {@link
CatalogBaseTable} must
+ * be of the same kind. For example, this doesn't allow altering a regular
table to partitioned
+ * table, or altering a view to a table, and vice versa.
+ *
+ * <p>The framework will make sure to call this method with fully
validated {@link
+ * ResolvedCatalogTable} or {@link ResolvedCatalogView}. Those instances
are easy to serialize
+ * for a durable catalog implementation.
+ *
+ * @param tablePath path of the table or view to be modified
+ * @param newTable the new table definition
+ * @param tableChanges change to describe the modification between the
newTable and the original
+ * table.
+ * @param ignoreIfNotExists flag to specify behavior when the table or
view does not exist: if
+ * set to false, throw an exception, if set to true, do nothing.
+ * @throws TableNotExistException if the table does not exist
+ * @throws CatalogException in case of any runtime exception
+ */
+ default void alterTable(
+ ObjectPath tablePath,
+ CatalogBaseTable newTable,
+ List<TableChange> tableChanges,
+ boolean ignoreIfNotExists)
+ throws TableNotExistException, CatalogException {
+ alterTable(tablePath, newTable, ignoreIfNotExists);
+ }
+
/** If true, tables which do not specify a connector will be translated to
managed tables. */
default boolean supportsManagedTable() {
return false;
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/TableChange.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/TableChange.java
new file mode 100644
index 00000000000..afb3bc13813
--- /dev/null
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/TableChange.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.Objects;
+
+/** {@link TableChange} represents the modification of the table. */
+@PublicEvolving
+public interface TableChange {
+
+ /**
+ * A table change to set the table option.
+ *
+ * <p>It is equal to the following statement:
+ *
+ * <pre>
+ * ALTER TABLE <table_name> SET '<key>' = '<value>';
+ * </pre>
+ *
+ * @param key the option name to set.
+ * @param value the option value to set.
+ * @return a TableChange represents the modification.
+ */
+ static SetOption set(String key, String value) {
+ return new SetOption(key, value);
+ }
+
+ /**
+ * A table change to reset the table option.
+ *
+ * <p>It is equal to the following statement:
+ *
+ * <pre>
+ * ALTER TABLE <table_name> RESET '<key>'
+ * </pre>
+ *
+ * @param key the option name to reset.
+ * @return a TableChange represents the modification.
+ */
+ static ResetOption reset(String key) {
+ return new ResetOption(key);
+ }
+
+ //
--------------------------------------------------------------------------------------------
+ // Property change
+ //
--------------------------------------------------------------------------------------------
+
+ /**
+ * A table change to set the table option.
+ *
+ * <p>It is equal to the following statement:
+ *
+ * <pre>
+ * ALTER TABLE <table_name> SET '<key>' = '<value>';
+ * </pre>
+ */
+ @PublicEvolving
+ class SetOption implements TableChange {
+
+ private final String key;
+ private final String value;
+
+ private SetOption(String key, String value) {
+ this.key = key;
+ this.value = value;
+ }
+
+ /** Returns the Option key to set. */
+ public String getKey() {
+ return key;
+ }
+
+ /** Returns the Option value to set. */
+ public String getValue() {
+ return value;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof SetOption)) {
+ return false;
+ }
+ SetOption setOption = (SetOption) o;
+ return Objects.equals(key, setOption.key) && Objects.equals(value,
setOption.value);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(key, value);
+ }
+
+ @Override
+ public String toString() {
+ return "SetOption{" + "key='" + key + '\'' + ", value='" + value +
'\'' + '}';
+ }
+ }
+
+ /**
+ * A table change to reset the table option.
+ *
+ * <p>It is equal to the following statement:
+ *
+ * <pre>
+ * ALTER TABLE <table_name> RESET '<key>'
+ * </pre>
+ */
+ @PublicEvolving
+ class ResetOption implements TableChange {
+
+ private final String key;
+
+ public ResetOption(String key) {
+ this.key = key;
+ }
+
+ /** Returns the Option key to reset. */
+ public String getKey() {
+ return key;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof ResetOption)) {
+ return false;
+ }
+ ResetOption that = (ResetOption) o;
+ return Objects.equals(key, that.key);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(key);
+ }
+
+ @Override
+ public String toString() {
+ return "ResetOption{" + "key='" + key + '\'' + '}';
+ }
+ }
+}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
index 3fe2c32a324..237977c049c 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
@@ -109,6 +109,7 @@ import org.apache.flink.table.catalog.ManagedTableListener;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.TableChange;
import org.apache.flink.table.catalog.UnresolvedIdentifier;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
@@ -160,8 +161,8 @@ import
org.apache.flink.table.operations.ddl.AddPartitionsOperation;
import org.apache.flink.table.operations.ddl.AlterCatalogFunctionOperation;
import org.apache.flink.table.operations.ddl.AlterDatabaseOperation;
import org.apache.flink.table.operations.ddl.AlterPartitionPropertiesOperation;
+import org.apache.flink.table.operations.ddl.AlterTableChangeOperation;
import org.apache.flink.table.operations.ddl.AlterTableDropConstraintOperation;
-import org.apache.flink.table.operations.ddl.AlterTableOptionsOperation;
import org.apache.flink.table.operations.ddl.AlterTableRenameOperation;
import org.apache.flink.table.operations.ddl.AlterTableSchemaOperation;
import org.apache.flink.table.operations.ddl.AlterViewAsOperation;
@@ -595,10 +596,16 @@ public class SqlToOperationConverter {
new CatalogPartitionImpl(newProps,
catalogPartition.getComment()));
} else {
// it's altering a table
+ Map<String, String> changeOptions =
+
OperationConverterUtils.extractProperties(alterTableOptions.getPropertyList());
Map<String, String> newOptions = new
HashMap<>(oldTable.getOptions());
- newOptions.putAll(
-
OperationConverterUtils.extractProperties(alterTableOptions.getPropertyList()));
- return new AlterTableOptionsOperation(tableIdentifier,
oldTable.copy(newOptions));
+ newOptions.putAll(changeOptions);
+ return new AlterTableChangeOperation(
+ tableIdentifier,
+ changeOptions.entrySet().stream()
+ .map(entry -> TableChange.set(entry.getKey(),
entry.getValue()))
+ .collect(Collectors.toList()),
+ oldTable.copy(newOptions));
}
}
@@ -618,7 +625,10 @@ public class SqlToOperationConverter {
}
// reset table option keys
resetKeys.forEach(newOptions::remove);
- return new AlterTableOptionsOperation(tableIdentifier,
oldTable.copy(newOptions));
+ return new AlterTableChangeOperation(
+ tableIdentifier,
+
resetKeys.stream().map(TableChange::reset).collect(Collectors.toList()),
+ oldTable.copy(newOptions));
}
/**
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
index b2547bb4622..6d3bd447c65 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
@@ -46,6 +46,7 @@ import org.apache.flink.table.catalog.FunctionLanguage;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.TableChange;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
@@ -79,8 +80,8 @@ import
org.apache.flink.table.operations.command.ResetOperation;
import org.apache.flink.table.operations.command.SetOperation;
import org.apache.flink.table.operations.command.ShowJarsOperation;
import org.apache.flink.table.operations.ddl.AlterDatabaseOperation;
+import org.apache.flink.table.operations.ddl.AlterTableChangeOperation;
import org.apache.flink.table.operations.ddl.AlterTableDropConstraintOperation;
-import org.apache.flink.table.operations.ddl.AlterTableOptionsOperation;
import org.apache.flink.table.operations.ddl.AlterTableRenameOperation;
import org.apache.flink.table.operations.ddl.AlterTableSchemaOperation;
import org.apache.flink.table.operations.ddl.CreateCatalogFunctionOperation;
@@ -1243,12 +1244,21 @@ public class SqlToOperationConverterTest {
expectedOptions.put("k1", "v1");
expectedOptions.put("K2", "V2");
- assertAlterTableOptions(operation, expectedIdentifier,
expectedOptions);
+ assertAlterTableOptions(
+ operation,
+ expectedIdentifier,
+ expectedOptions,
+ Arrays.asList(TableChange.set("k1", "v1"),
TableChange.set("K2", "V2")),
+ "ALTER TABLE cat1.db1.tb1\n SET 'k1' = 'v1',\n SET 'K2' =
'V2'");
// test alter table reset
operation = parse("alter table cat1.db1.tb1 reset ('k')");
assertAlterTableOptions(
- operation, expectedIdentifier,
Collections.singletonMap("connector", "dummy"));
+ operation,
+ expectedIdentifier,
+ Collections.singletonMap("connector", "dummy"),
+ Collections.singletonList(TableChange.reset("k")),
+ "ALTER TABLE cat1.db1.tb1\n RESET 'k'");
assertThatThrownBy(() -> parse("alter table cat1.db1.tb1 reset
('connector')"))
.isInstanceOf(ValidationException.class)
.hasMessageContaining("ALTER TABLE RESET does not support
changing 'connector'");
@@ -2228,13 +2238,17 @@ public class SqlToOperationConverterTest {
private void assertAlterTableOptions(
Operation operation,
ObjectIdentifier expectedIdentifier,
- Map<String, String> expectedOptions) {
- assertThat(operation).isInstanceOf(AlterTableOptionsOperation.class);
- final AlterTableOptionsOperation alterTableOptionsOperation =
- (AlterTableOptionsOperation) operation;
+ Map<String, String> expectedOptions,
+ List<TableChange> expectedChanges,
+ String expectedSummary) {
+ assertThat(operation).isInstanceOf(AlterTableChangeOperation.class);
+ final AlterTableChangeOperation alterTableOptionsOperation =
+ (AlterTableChangeOperation) operation;
assertThat(alterTableOptionsOperation.getTableIdentifier()).isEqualTo(expectedIdentifier);
- assertThat(alterTableOptionsOperation.getCatalogTable().getOptions())
+ assertThat(alterTableOptionsOperation.getNewTable().getOptions())
.isEqualTo(expectedOptions);
+
assertThat(expectedChanges).isEqualTo(alterTableOptionsOperation.getTableChanges());
+
assertThat(alterTableOptionsOperation.asSummaryString()).isEqualTo(expectedSummary);
}
private void assertAlterTableSchema(