AHeise commented on code in PR #27488:
URL: https://github.com/apache/flink/pull/27488#discussion_r2741077991
##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperation.java:
##########
@@ -77,24 +158,343 @@ public String asSummaryString() {
}
private static String toString(TableChange tableChange) {
- if (tableChange instanceof TableChange.ModifyRefreshStatus) {
- TableChange.ModifyRefreshStatus refreshStatus =
- (TableChange.ModifyRefreshStatus) tableChange;
+ if (tableChange instanceof ModifyRefreshStatus) {
+ ModifyRefreshStatus refreshStatus = (ModifyRefreshStatus)
tableChange;
return String.format(
" MODIFY REFRESH STATUS TO '%s'",
refreshStatus.getRefreshStatus());
- } else if (tableChange instanceof TableChange.ModifyRefreshHandler) {
- TableChange.ModifyRefreshHandler refreshHandler =
- (TableChange.ModifyRefreshHandler) tableChange;
+ } else if (tableChange instanceof ModifyRefreshHandler) {
+ ModifyRefreshHandler refreshHandler = (ModifyRefreshHandler)
tableChange;
return String.format(
" MODIFY REFRESH HANDLER DESCRIPTION TO '%s'",
refreshHandler.getRefreshHandlerDesc());
- } else if (tableChange instanceof TableChange.ModifyDefinitionQuery) {
- TableChange.ModifyDefinitionQuery definitionQuery =
- (TableChange.ModifyDefinitionQuery) tableChange;
+ } else if (tableChange instanceof ModifyDefinitionQuery) {
+ ModifyDefinitionQuery definitionQuery = (ModifyDefinitionQuery)
tableChange;
return String.format(
" MODIFY DEFINITION QUERY TO '%s'",
definitionQuery.getDefinitionQuery());
} else {
return AlterTableChangeOperation.toString(tableChange);
}
}
+
+ private static class MTContext {
+ private boolean isQueryChange;
+ private @Nullable TableDistribution distribution;
+ private RefreshStatus refreshStatus;
+ private @Nullable String refreshHandlerDesc;
+ private byte[] refreshHandlerBytes;
+ private List<UnresolvedColumn> columns;
+ private List<UnresolvedWatermarkSpec> watermarkSpecs;
+ private String primaryKeyName = null;
+ private List<String> primaryKeyColumns = null;
+ private int droppedPersistedCnt = 0;
+ private String originalQuery;
+ private String expandedQuery;
+ private CatalogMaterializedTable oldTable;
+
+ public MTContext(CatalogMaterializedTable oldTable) {
+ this.distribution = oldTable.getDistribution().orElse(null);
+ this.refreshStatus = oldTable.getRefreshStatus();
+ this.refreshHandlerDesc =
oldTable.getRefreshHandlerDescription().orElse(null);
+ this.refreshHandlerBytes = oldTable.getSerializedRefreshHandler();
+ this.watermarkSpecs =
oldTable.getUnresolvedSchema().getWatermarkSpecs();
+ this.columns = new
LinkedList<>(oldTable.getUnresolvedSchema().getColumns());
+ Schema.UnresolvedPrimaryKey primaryKey =
+
oldTable.getUnresolvedSchema().getPrimaryKey().orElse(null);
+ if (primaryKey != null) {
+ this.primaryKeyName = primaryKey.getConstraintName();
+ this.primaryKeyColumns = primaryKey.getColumnNames();
+ }
+ originalQuery = oldTable.getOriginalQuery();
+ expandedQuery = oldTable.getExpandedQuery();
+ this.oldTable = oldTable;
+ }
+
+ private void setColumnAtPosition(UnresolvedColumn column,
ColumnPosition position) {
+ if (position == null) {
+ columns.add(column);
+ } else if (position == ColumnPosition.first()) {
+ columns.add(0, column);
+ } else {
+ String after = ((After) position).column();
+ int index = getColumnIndex(after);
+
+ columns.add(index + 1, column);
+ }
+ }
+
+ private int getColumnIndex(String name) {
+ for (int i = 0; i < columns.size(); i++) {
+ if (Objects.equals(name, columns.get(i).getName())) {
+ return i;
+ }
+ }
+ return -1;
+ }
+
+ private void applyTableChanges(List<TableChange> tableChanges) {
+ isQueryChange = tableChanges.stream().anyMatch(t -> t instanceof
ModifyDefinitionQuery);
+ Schema oldSchema = oldTable.getUnresolvedSchema();
+ if (isQueryChange) {
+ checkForChangedPositionByQuery(tableChanges, oldSchema);
+ }
+
+ for (TableChange tableChange : tableChanges) {
+ handleColumns(tableChange);
+ handleDistribution(tableChange);
+ handleWatermark(tableChange);
+ handleUniqueConstraint(tableChange);
+ handleRefreshStatus(tableChange);
+ handleRefreshHandler(tableChange);
+ handleModifyDefinitionQuery(tableChange);
Review Comment:
We could return true to short cut the handles on the first successful
invocation.
Alterantively, we could build nested Handler classes that provide a list of
supported change classes. Then we can have an index from type -> handler.
Going further we could have just 1 handler per change class, where we use a
generic type to capture the handled change and automatically extract the change
class to builld the inex.
WDYT?
##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperation.java:
##########
@@ -77,24 +158,343 @@ public String asSummaryString() {
}
private static String toString(TableChange tableChange) {
- if (tableChange instanceof TableChange.ModifyRefreshStatus) {
- TableChange.ModifyRefreshStatus refreshStatus =
- (TableChange.ModifyRefreshStatus) tableChange;
+ if (tableChange instanceof ModifyRefreshStatus) {
+ ModifyRefreshStatus refreshStatus = (ModifyRefreshStatus)
tableChange;
return String.format(
" MODIFY REFRESH STATUS TO '%s'",
refreshStatus.getRefreshStatus());
- } else if (tableChange instanceof TableChange.ModifyRefreshHandler) {
- TableChange.ModifyRefreshHandler refreshHandler =
- (TableChange.ModifyRefreshHandler) tableChange;
+ } else if (tableChange instanceof ModifyRefreshHandler) {
+ ModifyRefreshHandler refreshHandler = (ModifyRefreshHandler)
tableChange;
return String.format(
" MODIFY REFRESH HANDLER DESCRIPTION TO '%s'",
refreshHandler.getRefreshHandlerDesc());
- } else if (tableChange instanceof TableChange.ModifyDefinitionQuery) {
- TableChange.ModifyDefinitionQuery definitionQuery =
- (TableChange.ModifyDefinitionQuery) tableChange;
+ } else if (tableChange instanceof ModifyDefinitionQuery) {
+ ModifyDefinitionQuery definitionQuery = (ModifyDefinitionQuery)
tableChange;
return String.format(
" MODIFY DEFINITION QUERY TO '%s'",
definitionQuery.getDefinitionQuery());
} else {
return AlterTableChangeOperation.toString(tableChange);
}
}
+
+ private static class MTContext {
+ private boolean isQueryChange;
+ private @Nullable TableDistribution distribution;
+ private RefreshStatus refreshStatus;
+ private @Nullable String refreshHandlerDesc;
+ private byte[] refreshHandlerBytes;
+ private List<UnresolvedColumn> columns;
+ private List<UnresolvedWatermarkSpec> watermarkSpecs;
+ private String primaryKeyName = null;
+ private List<String> primaryKeyColumns = null;
+ private int droppedPersistedCnt = 0;
+ private String originalQuery;
+ private String expandedQuery;
+ private CatalogMaterializedTable oldTable;
+
+ public MTContext(CatalogMaterializedTable oldTable) {
+ this.distribution = oldTable.getDistribution().orElse(null);
+ this.refreshStatus = oldTable.getRefreshStatus();
+ this.refreshHandlerDesc =
oldTable.getRefreshHandlerDescription().orElse(null);
+ this.refreshHandlerBytes = oldTable.getSerializedRefreshHandler();
+ this.watermarkSpecs =
oldTable.getUnresolvedSchema().getWatermarkSpecs();
+ this.columns = new
LinkedList<>(oldTable.getUnresolvedSchema().getColumns());
+ Schema.UnresolvedPrimaryKey primaryKey =
+
oldTable.getUnresolvedSchema().getPrimaryKey().orElse(null);
+ if (primaryKey != null) {
+ this.primaryKeyName = primaryKey.getConstraintName();
+ this.primaryKeyColumns = primaryKey.getColumnNames();
+ }
+ originalQuery = oldTable.getOriginalQuery();
+ expandedQuery = oldTable.getExpandedQuery();
+ this.oldTable = oldTable;
+ }
+
+ private void setColumnAtPosition(UnresolvedColumn column,
ColumnPosition position) {
+ if (position == null) {
+ columns.add(column);
+ } else if (position == ColumnPosition.first()) {
+ columns.add(0, column);
+ } else {
+ String after = ((After) position).column();
+ int index = getColumnIndex(after);
+
+ columns.add(index + 1, column);
+ }
+ }
+
+ private int getColumnIndex(String name) {
+ for (int i = 0; i < columns.size(); i++) {
+ if (Objects.equals(name, columns.get(i).getName())) {
+ return i;
+ }
+ }
+ return -1;
+ }
+
+ private void applyTableChanges(List<TableChange> tableChanges) {
+ isQueryChange = tableChanges.stream().anyMatch(t -> t instanceof
ModifyDefinitionQuery);
+ Schema oldSchema = oldTable.getUnresolvedSchema();
+ if (isQueryChange) {
+ checkForChangedPositionByQuery(tableChanges, oldSchema);
+ }
+
+ for (TableChange tableChange : tableChanges) {
+ handleColumns(tableChange);
+ handleDistribution(tableChange);
+ handleWatermark(tableChange);
+ handleUniqueConstraint(tableChange);
+ handleRefreshStatus(tableChange);
+ handleRefreshHandler(tableChange);
+ handleModifyDefinitionQuery(tableChange);
+ }
+
+ if (droppedPersistedCnt > 0 && isQueryChange) {
+ final int schemaSize = oldSchema.getColumns().size();
+ throw new ValidationException(
+ String.format(
+ "Failed to modify query because drop column is
unsupported. "
+ + "When modifying a query, you can
only append new columns at the end of original schema. "
+ + "The original schema has %d columns,
but the newly derived schema from the query has %d columns.",
+ schemaSize, schemaSize - droppedPersistedCnt));
+ }
+ }
+
+ private Schema retrieveSchema() {
+ Schema.Builder schemaToApply =
Schema.newBuilder().fromColumns(columns);
+ if (primaryKeyColumns != null) {
+ if (primaryKeyName == null) {
+ schemaToApply.primaryKey(primaryKeyColumns);
+ } else {
+ schemaToApply.primaryKeyNamed(primaryKeyName,
primaryKeyColumns);
+ }
+ }
+
+ for (UnresolvedWatermarkSpec spec : watermarkSpecs) {
+ schemaToApply.watermark(spec.getColumnName(),
spec.getWatermarkExpression());
+ }
+ return schemaToApply.build();
+ }
+
+ private void handleColumns(TableChange tableChange) {
+ if (tableChange instanceof AddColumn) {
+ AddColumn addColumn = (AddColumn) tableChange;
+ Column column = addColumn.getColumn();
+ ColumnPosition position = addColumn.getPosition();
+ UnresolvedColumn columnToAdd = toUnresolvedColumn(column);
+ setColumnAtPosition(columnToAdd, position);
+ } else if (tableChange instanceof ModifyColumnPosition) {
+ // ModifyColumnPosition extends ModifyColumn so it should be
before ModifyColumn
+ ModifyColumnPosition columnWithChangedPosition =
(ModifyColumnPosition) tableChange;
+ Column column = columnWithChangedPosition.getOldColumn();
+ int oldPosition = getColumnIndex(column.getName());
+ if (isQueryChange) {
+ throw new ValidationException(
+ String.format(
+ "When modifying the query of a
materialized table, "
+ + "currently only support
appending columns at the end of original schema, dropping, renaming, and
reordering columns are not supported.\n"
+ + "Column mismatch at position %d:
Original column is [%s], but new column is [%s].",
+ oldPosition, column, column));
+ }
+
+ ColumnPosition position =
columnWithChangedPosition.getNewPosition();
+ UnresolvedColumn changedPositionColumn =
columns.get(oldPosition);
+ columns.remove(oldPosition);
+ setColumnAtPosition(changedPositionColumn, position);
+ } else if (tableChange instanceof DropColumn) {
+ DropColumn dropColumn = (DropColumn) tableChange;
+ String droppedColumnName = dropColumn.getColumnName();
+ int index = getColumnIndex(droppedColumnName);
+ UnresolvedColumn column = columns.get(index);
+ if (isQueryChange && isNonPersistedColumn(column)) {
+ // noop
+ } else {
+ columns.remove(index);
+ droppedPersistedCnt++;
+ }
+ } else if (tableChange instanceof ModifyColumn) {
+ ModifyColumn type = (ModifyColumn) tableChange;
+ Column column = type.getOldColumn();
+ Column newColumn = type.getNewColumn();
+ int index = getColumnIndex(column.getName());
+ UnresolvedColumn newColumn1 = toUnresolvedColumn(newColumn);
+ columns.set(index, newColumn1);
+ }
+ }
+
+ private void handleModifyDefinitionQuery(TableChange tableChange) {
+ if (tableChange instanceof ModifyDefinitionQuery) {
+ ModifyDefinitionQuery queryChange = (ModifyDefinitionQuery)
tableChange;
+ expandedQuery = queryChange.getDefinitionQuery();
+ originalQuery = queryChange.getOriginalQuery();
+ }
+ }
+
+ private boolean isNonPersistedColumn(UnresolvedColumn column) {
+ return column instanceof UnresolvedComputedColumn
+ || column instanceof UnresolvedMetadataColumn
+ && ((UnresolvedMetadataColumn) column).isVirtual();
+ }
+
+ private void handleUniqueConstraint(TableChange tableChange) {
+ if (tableChange instanceof DropConstraint) {
+ primaryKeyName = null;
+ primaryKeyColumns = null;
Review Comment:
Do you need to verify if the drop constraint is actually for that particular
primary key? Or is this always the case?
##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperation.java:
##########
@@ -36,30 +78,69 @@
public class AlterMaterializedTableChangeOperation extends
AlterMaterializedTableOperation {
private final List<TableChange> tableChanges;
- private final CatalogMaterializedTable catalogMaterializedTable;
+ private final CatalogMaterializedTable oldTable;
+ private CatalogMaterializedTable materializedTableWithAppliedChanges;
public AlterMaterializedTableChangeOperation(
ObjectIdentifier tableIdentifier,
List<TableChange> tableChanges,
+ CatalogMaterializedTable oldTable) {
+ this(tableIdentifier, tableChanges, oldTable, null);
+ }
+
+ private AlterMaterializedTableChangeOperation(
+ ObjectIdentifier tableIdentifier,
+ List<TableChange> tableChanges,
+ CatalogMaterializedTable oldTable,
CatalogMaterializedTable catalogMaterializedTable) {
super(tableIdentifier);
this.tableChanges = tableChanges;
- this.catalogMaterializedTable = catalogMaterializedTable;
+ this.oldTable = oldTable;
+ this.materializedTableWithAppliedChanges = catalogMaterializedTable;
}
public List<TableChange> getTableChanges() {
return tableChanges;
}
- public CatalogMaterializedTable getCatalogMaterializedTable() {
- return catalogMaterializedTable;
+ public AlterMaterializedTableChangeOperation copyAsTableChangeOperation() {
+ return new AlterMaterializedTableChangeOperation(
+ tableIdentifier, tableChanges, oldTable,
materializedTableWithAppliedChanges);
+ }
+
+ public CatalogMaterializedTable getMaterializedTableWithAppliedChanges() {
+ if (oldTable == null) {
+ return materializedTableWithAppliedChanges;
+ }
+
+ MTContext mtContext = new MTContext(oldTable);
+ mtContext.applyTableChanges(tableChanges);
+
+ materializedTableWithAppliedChanges =
+ CatalogMaterializedTable.newBuilder()
+ .schema(mtContext.retrieveSchema())
+ .comment(oldTable.getComment())
+ .partitionKeys(oldTable.getPartitionKeys())
+ .options(oldTable.getOptions())
Review Comment:
Does that mean that we currently don't support changing these fields?
(I'm fine if this is also the current behavior)
##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperation.java:
##########
@@ -77,24 +158,343 @@ public String asSummaryString() {
}
private static String toString(TableChange tableChange) {
- if (tableChange instanceof TableChange.ModifyRefreshStatus) {
- TableChange.ModifyRefreshStatus refreshStatus =
- (TableChange.ModifyRefreshStatus) tableChange;
+ if (tableChange instanceof ModifyRefreshStatus) {
+ ModifyRefreshStatus refreshStatus = (ModifyRefreshStatus)
tableChange;
return String.format(
" MODIFY REFRESH STATUS TO '%s'",
refreshStatus.getRefreshStatus());
- } else if (tableChange instanceof TableChange.ModifyRefreshHandler) {
- TableChange.ModifyRefreshHandler refreshHandler =
- (TableChange.ModifyRefreshHandler) tableChange;
+ } else if (tableChange instanceof ModifyRefreshHandler) {
+ ModifyRefreshHandler refreshHandler = (ModifyRefreshHandler)
tableChange;
return String.format(
" MODIFY REFRESH HANDLER DESCRIPTION TO '%s'",
refreshHandler.getRefreshHandlerDesc());
- } else if (tableChange instanceof TableChange.ModifyDefinitionQuery) {
- TableChange.ModifyDefinitionQuery definitionQuery =
- (TableChange.ModifyDefinitionQuery) tableChange;
+ } else if (tableChange instanceof ModifyDefinitionQuery) {
+ ModifyDefinitionQuery definitionQuery = (ModifyDefinitionQuery)
tableChange;
return String.format(
" MODIFY DEFINITION QUERY TO '%s'",
definitionQuery.getDefinitionQuery());
} else {
return AlterTableChangeOperation.toString(tableChange);
}
}
+
+ private static class MTContext {
Review Comment:
```suggestion
private static class ChangeContext {
```
?
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java:
##########
@@ -1188,7 +1207,7 @@ private static Collection<TestSpec>
alterQuerySuccessCase() {
"ALTER MATERIALIZED TABLE base_mtbl_with_non_persisted
AS SELECT 1",
"(\n"
+ " `m` STRING METADATA VIRTUAL,\n"
- + " `calc` AS 'a' || 'b',\n"
+ + " `calc` AS ['a' || 'b'],\n"
Review Comment:
Why are these changes necessary?
##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperation.java:
##########
@@ -77,24 +158,343 @@ public String asSummaryString() {
}
private static String toString(TableChange tableChange) {
- if (tableChange instanceof TableChange.ModifyRefreshStatus) {
- TableChange.ModifyRefreshStatus refreshStatus =
- (TableChange.ModifyRefreshStatus) tableChange;
+ if (tableChange instanceof ModifyRefreshStatus) {
+ ModifyRefreshStatus refreshStatus = (ModifyRefreshStatus)
tableChange;
return String.format(
" MODIFY REFRESH STATUS TO '%s'",
refreshStatus.getRefreshStatus());
- } else if (tableChange instanceof TableChange.ModifyRefreshHandler) {
- TableChange.ModifyRefreshHandler refreshHandler =
- (TableChange.ModifyRefreshHandler) tableChange;
+ } else if (tableChange instanceof ModifyRefreshHandler) {
+ ModifyRefreshHandler refreshHandler = (ModifyRefreshHandler)
tableChange;
return String.format(
" MODIFY REFRESH HANDLER DESCRIPTION TO '%s'",
refreshHandler.getRefreshHandlerDesc());
- } else if (tableChange instanceof TableChange.ModifyDefinitionQuery) {
- TableChange.ModifyDefinitionQuery definitionQuery =
- (TableChange.ModifyDefinitionQuery) tableChange;
+ } else if (tableChange instanceof ModifyDefinitionQuery) {
+ ModifyDefinitionQuery definitionQuery = (ModifyDefinitionQuery)
tableChange;
return String.format(
" MODIFY DEFINITION QUERY TO '%s'",
definitionQuery.getDefinitionQuery());
} else {
return AlterTableChangeOperation.toString(tableChange);
}
}
+
+ private static class MTContext {
+ private boolean isQueryChange;
+ private @Nullable TableDistribution distribution;
+ private RefreshStatus refreshStatus;
+ private @Nullable String refreshHandlerDesc;
+ private byte[] refreshHandlerBytes;
+ private List<UnresolvedColumn> columns;
+ private List<UnresolvedWatermarkSpec> watermarkSpecs;
+ private String primaryKeyName = null;
+ private List<String> primaryKeyColumns = null;
+ private int droppedPersistedCnt = 0;
+ private String originalQuery;
+ private String expandedQuery;
+ private CatalogMaterializedTable oldTable;
+
+ public MTContext(CatalogMaterializedTable oldTable) {
+ this.distribution = oldTable.getDistribution().orElse(null);
+ this.refreshStatus = oldTable.getRefreshStatus();
+ this.refreshHandlerDesc =
oldTable.getRefreshHandlerDescription().orElse(null);
+ this.refreshHandlerBytes = oldTable.getSerializedRefreshHandler();
+ this.watermarkSpecs =
oldTable.getUnresolvedSchema().getWatermarkSpecs();
+ this.columns = new
LinkedList<>(oldTable.getUnresolvedSchema().getColumns());
+ Schema.UnresolvedPrimaryKey primaryKey =
+
oldTable.getUnresolvedSchema().getPrimaryKey().orElse(null);
+ if (primaryKey != null) {
+ this.primaryKeyName = primaryKey.getConstraintName();
+ this.primaryKeyColumns = primaryKey.getColumnNames();
+ }
+ originalQuery = oldTable.getOriginalQuery();
+ expandedQuery = oldTable.getExpandedQuery();
+ this.oldTable = oldTable;
+ }
+
+ private void setColumnAtPosition(UnresolvedColumn column,
ColumnPosition position) {
+ if (position == null) {
+ columns.add(column);
+ } else if (position == ColumnPosition.first()) {
+ columns.add(0, column);
+ } else {
+ String after = ((After) position).column();
+ int index = getColumnIndex(after);
+
+ columns.add(index + 1, column);
+ }
+ }
+
+ private int getColumnIndex(String name) {
+ for (int i = 0; i < columns.size(); i++) {
+ if (Objects.equals(name, columns.get(i).getName())) {
+ return i;
+ }
+ }
+ return -1;
+ }
+
+ private void applyTableChanges(List<TableChange> tableChanges) {
+ isQueryChange = tableChanges.stream().anyMatch(t -> t instanceof
ModifyDefinitionQuery);
+ Schema oldSchema = oldTable.getUnresolvedSchema();
+ if (isQueryChange) {
+ checkForChangedPositionByQuery(tableChanges, oldSchema);
+ }
+
+ for (TableChange tableChange : tableChanges) {
+ handleColumns(tableChange);
+ handleDistribution(tableChange);
+ handleWatermark(tableChange);
+ handleUniqueConstraint(tableChange);
+ handleRefreshStatus(tableChange);
+ handleRefreshHandler(tableChange);
+ handleModifyDefinitionQuery(tableChange);
+ }
+
+ if (droppedPersistedCnt > 0 && isQueryChange) {
+ final int schemaSize = oldSchema.getColumns().size();
+ throw new ValidationException(
+ String.format(
+ "Failed to modify query because drop column is
unsupported. "
+ + "When modifying a query, you can
only append new columns at the end of original schema. "
+ + "The original schema has %d columns,
but the newly derived schema from the query has %d columns.",
+ schemaSize, schemaSize - droppedPersistedCnt));
+ }
+ }
+
+ private Schema retrieveSchema() {
+ Schema.Builder schemaToApply =
Schema.newBuilder().fromColumns(columns);
+ if (primaryKeyColumns != null) {
+ if (primaryKeyName == null) {
+ schemaToApply.primaryKey(primaryKeyColumns);
+ } else {
+ schemaToApply.primaryKeyNamed(primaryKeyName,
primaryKeyColumns);
+ }
+ }
+
+ for (UnresolvedWatermarkSpec spec : watermarkSpecs) {
+ schemaToApply.watermark(spec.getColumnName(),
spec.getWatermarkExpression());
+ }
+ return schemaToApply.build();
+ }
+
+ private void handleColumns(TableChange tableChange) {
+ if (tableChange instanceof AddColumn) {
+ AddColumn addColumn = (AddColumn) tableChange;
+ Column column = addColumn.getColumn();
+ ColumnPosition position = addColumn.getPosition();
+ UnresolvedColumn columnToAdd = toUnresolvedColumn(column);
+ setColumnAtPosition(columnToAdd, position);
+ } else if (tableChange instanceof ModifyColumnPosition) {
+ // ModifyColumnPosition extends ModifyColumn so it should be
before ModifyColumn
+ ModifyColumnPosition columnWithChangedPosition =
(ModifyColumnPosition) tableChange;
+ Column column = columnWithChangedPosition.getOldColumn();
+ int oldPosition = getColumnIndex(column.getName());
+ if (isQueryChange) {
+ throw new ValidationException(
+ String.format(
+ "When modifying the query of a
materialized table, "
+ + "currently only support
appending columns at the end of original schema, dropping, renaming, and
reordering columns are not supported.\n"
+ + "Column mismatch at position %d:
Original column is [%s], but new column is [%s].",
+ oldPosition, column, column));
+ }
+
+ ColumnPosition position =
columnWithChangedPosition.getNewPosition();
+ UnresolvedColumn changedPositionColumn =
columns.get(oldPosition);
+ columns.remove(oldPosition);
+ setColumnAtPosition(changedPositionColumn, position);
+ } else if (tableChange instanceof DropColumn) {
+ DropColumn dropColumn = (DropColumn) tableChange;
+ String droppedColumnName = dropColumn.getColumnName();
+ int index = getColumnIndex(droppedColumnName);
+ UnresolvedColumn column = columns.get(index);
+ if (isQueryChange && isNonPersistedColumn(column)) {
+ // noop
+ } else {
+ columns.remove(index);
+ droppedPersistedCnt++;
+ }
+ } else if (tableChange instanceof ModifyColumn) {
+ ModifyColumn type = (ModifyColumn) tableChange;
+ Column column = type.getOldColumn();
+ Column newColumn = type.getNewColumn();
+ int index = getColumnIndex(column.getName());
+ UnresolvedColumn newColumn1 = toUnresolvedColumn(newColumn);
+ columns.set(index, newColumn1);
+ }
+ }
+
+ private void handleModifyDefinitionQuery(TableChange tableChange) {
+ if (tableChange instanceof ModifyDefinitionQuery) {
+ ModifyDefinitionQuery queryChange = (ModifyDefinitionQuery)
tableChange;
+ expandedQuery = queryChange.getDefinitionQuery();
+ originalQuery = queryChange.getOriginalQuery();
+ }
+ }
+
+ private boolean isNonPersistedColumn(UnresolvedColumn column) {
+ return column instanceof UnresolvedComputedColumn
+ || column instanceof UnresolvedMetadataColumn
+ && ((UnresolvedMetadataColumn) column).isVirtual();
+ }
+
+ private void handleUniqueConstraint(TableChange tableChange) {
+ if (tableChange instanceof DropConstraint) {
+ primaryKeyName = null;
+ primaryKeyColumns = null;
Review Comment:
Do we need to sequence the changes somehow? If I change the PK from pk1 to
pk2, do I get a DropConstraint and an AddUniqueConstraint. If so, we need to
drop first before readding, right?
Can this also happen for other changes?
Since changes is a list, do we already have the sequence? If so, please
document that.
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/MaterializedTableUtilsTest.java:
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.utils;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.TableChange;
+import org.apache.flink.table.catalog.TableChange.ColumnPosition;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.Collection;
+import java.util.List;
+
+import static org.apache.flink.table.catalog.Column.physical;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+/** Tests for {@link MaterializedTableUtils}. */
+class MaterializedTableUtilsTest {
+ @ParameterizedTest
+ @MethodSource("input")
+ void test(TestSpec spec) {
+
assertThat(MaterializedTableUtils.buildSchemaTableChanges(spec.oldSchema,
spec.newSchema))
+ .isEqualTo(spec.expected);
+ }
+
+ private static Collection<TestSpec> input() {
+ return List.of(
+ TestSpec.of(
+ schema(physical("a", DataTypes.INT())),
+ schema(physical("a", DataTypes.INT())),
+ List.of()),
+ TestSpec.of(
+ schema(physical("a", DataTypes.INT())),
+ schema(physical("a",
DataTypes.INT()).withComment("comment")),
+ List.of(
+ TableChange.modifyColumnComment(
+ physical("a", DataTypes.INT()),
"comment"))),
+ TestSpec.of(
+ schema(physical("a",
DataTypes.INT()).withComment("comment")),
+ schema(physical("a",
DataTypes.INT()).withComment("comment")),
+ List.of()),
+ TestSpec.of(
+ schema(physical("a",
DataTypes.TIMESTAMP()).withComment("comment")),
+ schema(physical("a",
DataTypes.TIMESTAMP()).withComment("comment 2")),
+ List.of(
+ TableChange.modifyColumnComment(
+ physical("a",
DataTypes.TIMESTAMP()).withComment("comment"),
+ "comment 2"))),
+ TestSpec.of(
+ schema(physical("a",
DataTypes.FLOAT()).withComment("comment")),
+ schema(physical("a", DataTypes.FLOAT())),
+ List.of(
+ TableChange.modifyColumnComment(
+ physical("a",
DataTypes.FLOAT()).withComment("comment"),
+ null))),
+ TestSpec.of(
+ schema(physical("a",
DataTypes.INT()).withComment("comment")),
+ schema(physical("a2",
DataTypes.STRING()).withComment("comment 2")),
+ List.of(
+ TableChange.add(
+ physical("a2", DataTypes.STRING())
+ .withComment("comment 2")),
+ TableChange.dropColumn("a"))),
+ TestSpec.of(
+ schema(physical("a", DataTypes.INT())),
+ schema(physical("b", DataTypes.INT())),
+ List.of(
+ TableChange.add(physical("b",
DataTypes.INT())),
+ TableChange.dropColumn("a"))),
+ TestSpec.of(
Review Comment:
Do we also need tests for column type changes (e.g. widening)? If that's not
supported yet, ignore this comment.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]