This is an automated email from the ASF dual-hosted git repository.
snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 7204c8fd7c8 [FLINK-39077][table] Throw for materialized table change
query operation in case of non persisted columns in schema
7204c8fd7c8 is described below
commit 7204c8fd7c8cb428c2d4cf45f616e5a4d9cf6295
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Thu Mar 5 22:19:29 2026 +0100
[FLINK-39077][table] Throw for materialized table change query operation in
case of non persisted columns in schema
---
.../MaterializedTableManager.java | 48 +-
.../AlterMaterializedTableAsQueryOperation.java | 12 +-
.../AlterMaterializedTableChangeOperation.java | 535 ++------------------
.../FullAlterMaterializedTableOperation.java | 47 ++
.../MaterializedTableChangeHandler.java | 552 +++++++++++++++++++++
.../AbstractAlterMaterializedTableConverter.java | 33 +-
...rMaterializedTableAddDistributionConverter.java | 15 +-
.../SqlAlterMaterializedTableAsQueryConverter.java | 58 ++-
...MaterializedTableDropDistributionConverter.java | 9 +-
...lAlterMaterializedTableDropSchemaConverter.java | 17 +-
...terializedTableModifyDistributionConverter.java | 15 +-
.../SqlAlterMaterializedTableSchemaConverter.java | 35 +-
...SqlCreateOrAlterMaterializedTableConverter.java | 67 +--
.../planner/utils/MaterializedTableUtils.java | 51 +-
.../operations/SqlDdlToOperationConverterTest.java | 10 +-
...erializedTableNodeToOperationConverterTest.java | 71 ++-
.../planner/utils/MaterializedTableUtilsTest.java | 20 -
17 files changed, 915 insertions(+), 680 deletions(-)
diff --git
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java
index efafb8a3a8c..7279639ce3d 100644
---
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java
+++
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java
@@ -325,11 +325,11 @@ public class MaterializedTableManager {
return ResultFetcher.fromTableResult(handle, TABLE_RESULT_OK, false);
}
- private CatalogMaterializedTable suspendContinuousRefreshJob(
+ private ResolvedCatalogMaterializedTable suspendContinuousRefreshJob(
OperationExecutor operationExecutor,
OperationHandle handle,
ObjectIdentifier tableIdentifier,
- CatalogMaterializedTable materializedTable) {
+ ResolvedCatalogMaterializedTable materializedTable) {
try {
ContinuousRefreshHandler refreshHandler =
deserializeContinuousHandler(materializedTable.getSerializedRefreshHandler());
@@ -371,7 +371,7 @@ public class MaterializedTableManager {
OperationExecutor operationExecutor,
OperationHandle handle,
ObjectIdentifier tableIdentifier,
- CatalogMaterializedTable materializedTable) {
+ ResolvedCatalogMaterializedTable materializedTable) {
if (RefreshStatus.SUSPENDED == materializedTable.getRefreshStatus()) {
throw new SqlExecutionException(
String.format(
@@ -489,7 +489,7 @@ public class MaterializedTableManager {
OperationExecutor operationExecutor,
OperationHandle handle,
ObjectIdentifier tableIdentifier,
- CatalogMaterializedTable catalogMaterializedTable,
+ ResolvedCatalogMaterializedTable catalogMaterializedTable,
Map<String, String> dynamicOptions) {
// Repeated resume refresh workflow is not supported
if (RefreshStatus.ACTIVATED ==
catalogMaterializedTable.getRefreshStatus()) {
@@ -577,7 +577,7 @@ public class MaterializedTableManager {
operationExecutor,
handle,
materializedTableIdentifier,
- catalogMaterializedTable,
+ resolveCatalogMaterializedTable(operationExecutor,
catalogMaterializedTable),
RefreshStatus.ACTIVATED,
continuousRefreshHandler.asSummaryString(),
serializedBytes);
@@ -822,7 +822,7 @@ public class MaterializedTableManager {
if (RefreshStatus.ACTIVATED ==
oldMaterializedTable.getRefreshStatus()) {
// 1. suspend the materialized table
- CatalogMaterializedTable suspendMaterializedTable =
+ ResolvedCatalogMaterializedTable suspendMaterializedTable =
suspendContinuousRefreshJob(
operationExecutor, handle, tableIdentifier,
oldMaterializedTable);
@@ -830,7 +830,7 @@ public class MaterializedTableManager {
AlterMaterializedTableChangeOperation
alterMaterializedTableChangeOperation =
new AlterMaterializedTableChangeOperation(
op.getTableIdentifier(),
- op.getTableChanges(),
+ oldTable -> op.getTableChanges(),
suspendMaterializedTable);
operationExecutor.callExecutableOperation(
handle, alterMaterializedTableChangeOperation);
@@ -840,8 +840,7 @@ public class MaterializedTableManager {
executeContinuousRefreshJob(
operationExecutor,
handle,
- alterMaterializedTableChangeOperation
- .getMaterializedTableWithAppliedChanges(),
+ alterMaterializedTableChangeOperation.getNewTable(),
tableIdentifier,
Collections.emptyMap(),
Optional.empty());
@@ -851,7 +850,7 @@ public class MaterializedTableManager {
LOG.warn(
"Failed to start the continuous refresh job for
materialized table {} using new query {}, rollback to origin query {}.",
tableIdentifier,
-
op.getMaterializedTableWithAppliedChanges().getExpandedQuery(),
+ op.getNewTable().getExpandedQuery(),
suspendMaterializedTable.getExpandedQuery(),
e);
@@ -874,8 +873,7 @@ public class MaterializedTableManager {
throw new SqlExecutionException(
String.format(
"Failed to start the continuous refresh job
using new query %s when altering materialized table %s select query.",
-
op.getMaterializedTableWithAppliedChanges().getExpandedQuery(),
- tableIdentifier),
+ op.getNewTable().getExpandedQuery(),
tableIdentifier),
e);
}
} else if (RefreshStatus.SUSPENDED ==
oldMaterializedTable.getRefreshStatus()) {
@@ -889,7 +887,7 @@ public class MaterializedTableManager {
AlterMaterializedTableChangeOperation
alterMaterializedTableChangeOperation =
new AlterMaterializedTableChangeOperation(
- tableIdentifier, tableChanges,
oldMaterializedTable);
+ tableIdentifier, oldTable -> tableChanges,
oldMaterializedTable);
operationExecutor.callExecutableOperation(
handle, alterMaterializedTableChangeOperation);
@@ -904,11 +902,11 @@ public class MaterializedTableManager {
}
private AlterMaterializedTableChangeOperation
generateRollbackAlterMaterializedTableOperation(
- CatalogMaterializedTable oldMaterializedTable,
+ ResolvedCatalogMaterializedTable oldMaterializedTable,
AlterMaterializedTableChangeOperation op) {
return new AlterMaterializedTableChangeOperation(
- op.getTableIdentifier(), List.of(), oldMaterializedTable);
+ op.getTableIdentifier(), oldTable -> List.of(),
oldMaterializedTable);
}
private TableChange.ModifyRefreshHandler generateResetSavepointTableChange(
@@ -1158,11 +1156,20 @@ public class MaterializedTableManager {
return (ResolvedCatalogMaterializedTable) resolvedCatalogBaseTable;
}
- private CatalogMaterializedTable updateRefreshHandler(
+ private ResolvedCatalogMaterializedTable resolveCatalogMaterializedTable(
+ OperationExecutor operationExecutor, CatalogMaterializedTable
materializedTable) {
+ return operationExecutor
+ .getSessionContext()
+ .getSessionState()
+ .catalogManager
+ .resolveCatalogMaterializedTable(materializedTable);
+ }
+
+ private ResolvedCatalogMaterializedTable updateRefreshHandler(
OperationExecutor operationExecutor,
OperationHandle operationHandle,
ObjectIdentifier materializedTableIdentifier,
- CatalogMaterializedTable catalogMaterializedTable,
+ ResolvedCatalogMaterializedTable catalogMaterializedTable,
RefreshStatus refreshStatus,
String refreshHandlerSummary,
byte[] serializedRefreshHandler) {
@@ -1172,12 +1179,15 @@ public class MaterializedTableManager {
TableChange.modifyRefreshHandler(refreshHandlerSummary,
serializedRefreshHandler));
AlterMaterializedTableChangeOperation
alterMaterializedTableChangeOperation =
new AlterMaterializedTableChangeOperation(
- materializedTableIdentifier, tableChanges,
catalogMaterializedTable);
+ materializedTableIdentifier,
+ oldTable -> tableChanges,
+ catalogMaterializedTable);
// update RefreshHandler to Catalog
operationExecutor.callExecutableOperation(
operationHandle, alterMaterializedTableChangeOperation);
- return
alterMaterializedTableChangeOperation.getMaterializedTableWithAppliedChanges();
+ return resolveCatalogMaterializedTable(
+ operationExecutor,
alterMaterializedTableChangeOperation.getNewTable());
}
/** Generate insert statement for materialized table. */
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableAsQueryOperation.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableAsQueryOperation.java
index 7ed81b58665..264fc752bfa 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableAsQueryOperation.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableAsQueryOperation.java
@@ -20,11 +20,12 @@ package org.apache.flink.table.operations.materializedtable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.internal.TableResultInternal;
-import org.apache.flink.table.catalog.CatalogMaterializedTable;
import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
import org.apache.flink.table.catalog.TableChange;
import java.util.List;
+import java.util.function.Function;
/**
* Operation to describe an ALTER MATERIALIZED TABLE AS query operation. The
operation is not
@@ -38,9 +39,9 @@ public class AlterMaterializedTableAsQueryOperation extends
AlterMaterializedTab
public AlterMaterializedTableAsQueryOperation(
ObjectIdentifier tableIdentifier,
- List<TableChange> tableChanges,
- CatalogMaterializedTable oldTable) {
- super(tableIdentifier, tableChanges, oldTable);
+ Function<ResolvedCatalogMaterializedTable, List<TableChange>>
tableChangesForTable,
+ ResolvedCatalogMaterializedTable oldTable) {
+ super(tableIdentifier, tableChangesForTable, oldTable);
}
@Override
@@ -53,7 +54,6 @@ public class AlterMaterializedTableAsQueryOperation extends
AlterMaterializedTab
public String asSummaryString() {
return String.format(
"ALTER MATERIALIZED TABLE %s AS %s",
- tableIdentifier.asSummaryString(),
- getMaterializedTableWithAppliedChanges().getExpandedQuery());
+ tableIdentifier.asSummaryString(),
getNewTable().getExpandedQuery());
}
}
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperation.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperation.java
index b480cc00083..0930b69f1ae 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperation.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperation.java
@@ -19,59 +19,19 @@
package org.apache.flink.table.operations.materializedtable;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.api.Schema;
-import org.apache.flink.table.api.Schema.UnresolvedColumn;
-import org.apache.flink.table.api.Schema.UnresolvedComputedColumn;
-import org.apache.flink.table.api.Schema.UnresolvedMetadataColumn;
-import org.apache.flink.table.api.Schema.UnresolvedPhysicalColumn;
-import org.apache.flink.table.api.Schema.UnresolvedWatermarkSpec;
-import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.internal.TableResultImpl;
import org.apache.flink.table.api.internal.TableResultInternal;
import org.apache.flink.table.catalog.CatalogMaterializedTable;
-import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshStatus;
-import org.apache.flink.table.catalog.Column;
-import org.apache.flink.table.catalog.Column.ComputedColumn;
-import org.apache.flink.table.catalog.Column.MetadataColumn;
-import org.apache.flink.table.catalog.Column.PhysicalColumn;
import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
import org.apache.flink.table.catalog.TableChange;
-import org.apache.flink.table.catalog.TableChange.AddColumn;
-import org.apache.flink.table.catalog.TableChange.AddDistribution;
-import org.apache.flink.table.catalog.TableChange.AddUniqueConstraint;
-import org.apache.flink.table.catalog.TableChange.AddWatermark;
-import org.apache.flink.table.catalog.TableChange.After;
-import org.apache.flink.table.catalog.TableChange.ColumnPosition;
-import org.apache.flink.table.catalog.TableChange.DropColumn;
-import org.apache.flink.table.catalog.TableChange.DropConstraint;
-import org.apache.flink.table.catalog.TableChange.DropDistribution;
-import org.apache.flink.table.catalog.TableChange.DropWatermark;
-import org.apache.flink.table.catalog.TableChange.ModifyColumn;
-import org.apache.flink.table.catalog.TableChange.ModifyColumnComment;
-import org.apache.flink.table.catalog.TableChange.ModifyColumnPosition;
import org.apache.flink.table.catalog.TableChange.ModifyDefinitionQuery;
-import org.apache.flink.table.catalog.TableChange.ModifyDistribution;
-import org.apache.flink.table.catalog.TableChange.ModifyPhysicalColumnType;
import org.apache.flink.table.catalog.TableChange.ModifyRefreshHandler;
import org.apache.flink.table.catalog.TableChange.ModifyRefreshStatus;
-import org.apache.flink.table.catalog.TableChange.ModifyUniqueConstraint;
-import org.apache.flink.table.catalog.TableChange.ModifyWatermark;
-import org.apache.flink.table.catalog.TableDistribution;
-import org.apache.flink.table.catalog.UniqueConstraint;
-import org.apache.flink.table.catalog.WatermarkSpec;
-import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.operations.ddl.AlterTableChangeOperation;
-import org.apache.flink.table.types.DataType;
-import javax.annotation.Nullable;
-
-import java.util.HashMap;
-import java.util.IdentityHashMap;
-import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.function.BiConsumer;
+import java.util.function.Function;
import java.util.stream.Collectors;
/**
@@ -80,90 +40,70 @@ import java.util.stream.Collectors;
@Internal
public class AlterMaterializedTableChangeOperation extends
AlterMaterializedTableOperation {
- private final List<TableChange> tableChanges;
- private final CatalogMaterializedTable oldTable;
- private CatalogMaterializedTable materializedTableWithAppliedChanges;
+ private final Function<ResolvedCatalogMaterializedTable,
List<TableChange>> tableChangeForTable;
+ private final ResolvedCatalogMaterializedTable oldTable;
+ private MaterializedTableChangeHandler handler;
+ private CatalogMaterializedTable newTable;
+ private List<TableChange> tableChanges;
- /**
- * The original order of table changes should be kept as is in some
situations different order
- * might lead to different results.
- */
public AlterMaterializedTableChangeOperation(
ObjectIdentifier tableIdentifier,
- List<TableChange> tableChanges,
- CatalogMaterializedTable oldTable) {
- this(tableIdentifier, tableChanges, oldTable, null);
- }
-
- private AlterMaterializedTableChangeOperation(
- ObjectIdentifier tableIdentifier,
- List<TableChange> tableChanges,
- CatalogMaterializedTable oldTable,
- CatalogMaterializedTable catalogMaterializedTable) {
+ Function<ResolvedCatalogMaterializedTable, List<TableChange>>
tableChangeForTable,
+ ResolvedCatalogMaterializedTable oldTable) {
super(tableIdentifier);
- this.tableChanges = tableChanges;
+ this.tableChangeForTable = tableChangeForTable;
this.oldTable = oldTable;
- this.materializedTableWithAppliedChanges = catalogMaterializedTable;
}
public List<TableChange> getTableChanges() {
+ if (tableChanges == null) {
+ tableChanges = tableChangeForTable.apply(oldTable);
+ }
return tableChanges;
}
public AlterMaterializedTableChangeOperation copyAsTableChangeOperation() {
return new AlterMaterializedTableChangeOperation(
- tableIdentifier, tableChanges, oldTable,
materializedTableWithAppliedChanges);
+ tableIdentifier, tableChangeForTable, oldTable);
}
- public CatalogMaterializedTable getMaterializedTableWithAppliedChanges() {
- // The only case when materializedTableWithAppliedChanges is not null
from the beginning
- // is copyAsTableChangeOperation where it copies already evaluated
materialized table
- if (oldTable == null || materializedTableWithAppliedChanges != null) {
- return materializedTableWithAppliedChanges;
+ public CatalogMaterializedTable getNewTable() {
+ if (newTable == null) {
+ newTable =
+ MaterializedTableChangeHandler.buildNewMaterializedTable(
+ getHandlerWithChanges());
}
+ return newTable;
+ }
- ChangeContext changeContext = new ChangeContext(oldTable);
- changeContext.applyTableChanges(tableChanges);
-
- materializedTableWithAppliedChanges =
- CatalogMaterializedTable.newBuilder()
- .schema(changeContext.retrieveSchema())
- .comment(oldTable.getComment())
- .partitionKeys(oldTable.getPartitionKeys())
- .options(changeContext.options)
- .originalQuery(changeContext.originalQuery)
- .expandedQuery(changeContext.expandedQuery)
- .distribution(changeContext.distribution)
- .freshness(oldTable.getDefinitionFreshness())
- .logicalRefreshMode(oldTable.getLogicalRefreshMode())
- .refreshMode(oldTable.getRefreshMode())
- .refreshStatus(changeContext.refreshStatus)
-
.refreshHandlerDescription(changeContext.refreshHandlerDesc)
-
.serializedRefreshHandler(changeContext.refreshHandlerBytes)
- .build();
-
- return materializedTableWithAppliedChanges;
+ protected MaterializedTableChangeHandler getHandlerWithChanges() {
+ if (handler == null) {
+ handler =
+ MaterializedTableChangeHandler.getHandlerWithChanges(
+ oldTable, getTableChanges());
+ }
+ return handler;
}
@Override
public TableResultInternal execute(Context ctx) {
ctx.getCatalogManager()
- .alterTable(
- getMaterializedTableWithAppliedChanges(),
- getTableChanges(),
- getTableIdentifier(),
- false);
+ .alterTable(getNewTable(), getTableChanges(),
getTableIdentifier(), false);
return TableResultImpl.TABLE_RESULT_OK;
}
@Override
public String asSummaryString() {
String changes =
- tableChanges.stream()
+ getTableChanges().stream()
.map(AlterMaterializedTableChangeOperation::toString)
.collect(Collectors.joining(",\n"));
return String.format(
- "ALTER MATERIALIZED TABLE %s\n%s",
tableIdentifier.asSummaryString(), changes);
+ "%s %s\n%s", getOperationName(),
tableIdentifier.asSummaryString(), changes);
+ }
+
+ protected String getOperationName() {
+ return "ALTER MATERIALIZED TABLE";
}
private static String toString(TableChange tableChange) {
@@ -184,409 +124,4 @@ public class AlterMaterializedTableChangeOperation
extends AlterMaterializedTabl
return AlterTableChangeOperation.toString(tableChange);
}
}
-
- private static class ChangeContext {
- private static final HandlerRegistry HANDLER_REGISTRY =
createHandlerRegistry();
-
- private final List<UnresolvedColumn> columns;
- private final CatalogMaterializedTable oldTable;
- private boolean isQueryChange;
- private @Nullable TableDistribution distribution;
- private RefreshStatus refreshStatus;
- private @Nullable String refreshHandlerDesc;
- private byte[] refreshHandlerBytes;
- private List<UnresolvedWatermarkSpec> watermarkSpecs;
- private String primaryKeyName = null;
- private List<String> primaryKeyColumns = null;
- private int droppedPersistedCnt = 0;
- private String originalQuery;
- private String expandedQuery;
- private final Map<String, String> options;
-
- public ChangeContext(CatalogMaterializedTable oldTable) {
- this.distribution = oldTable.getDistribution().orElse(null);
- this.refreshStatus = oldTable.getRefreshStatus();
- this.refreshHandlerDesc =
oldTable.getRefreshHandlerDescription().orElse(null);
- this.refreshHandlerBytes = oldTable.getSerializedRefreshHandler();
- this.watermarkSpecs =
oldTable.getUnresolvedSchema().getWatermarkSpecs();
- this.columns = new
LinkedList<>(oldTable.getUnresolvedSchema().getColumns());
- Schema.UnresolvedPrimaryKey primaryKey =
-
oldTable.getUnresolvedSchema().getPrimaryKey().orElse(null);
- if (primaryKey != null) {
- this.primaryKeyName = primaryKey.getConstraintName();
- this.primaryKeyColumns = primaryKey.getColumnNames();
- }
- originalQuery = oldTable.getOriginalQuery();
- expandedQuery = oldTable.getExpandedQuery();
- this.oldTable = oldTable;
- this.options = new HashMap<>(oldTable.getOptions());
- }
-
- private static final class HandlerRegistry {
- private static final Map<Class<? extends TableChange>,
HandlerWrapper<?>> HANDLERS =
- new IdentityHashMap<>();
-
- private <T extends TableChange> void register(
- Class<T> type, BiConsumer<ChangeContext, T> handler) {
- HANDLERS.put(type, new HandlerWrapper<>(handler));
- }
-
- private void apply(ChangeContext context, TableChange change) {
- HandlerWrapper<?> wrapper = HANDLERS.get(change.getClass());
- if (wrapper == null) {
- throw new ValidationException("Unknown table change " +
change.getClass());
- }
- wrapper.accept(context, change);
- }
-
- private static final class HandlerWrapper<T extends TableChange> {
- private final BiConsumer<ChangeContext, T> handler;
-
- private HandlerWrapper(BiConsumer<ChangeContext, T> handler) {
- this.handler = handler;
- }
-
- private void accept(ChangeContext context, TableChange change)
{
- handler.accept(context, (T) change);
- }
- }
- }
-
- private static HandlerRegistry createHandlerRegistry() {
- HandlerRegistry registry = new HandlerRegistry();
-
- // Column operations
- registry.register(AddColumn.class, ChangeContext::addColumn);
- registry.register(ModifyColumn.class, ChangeContext::modifyColumn);
- registry.register(DropColumn.class, ChangeContext::dropColumn);
- registry.register(
- ModifyPhysicalColumnType.class,
ChangeContext::modifyPhysicalColumnType);
- registry.register(ModifyColumnComment.class,
ChangeContext::modifyColumnComment);
- registry.register(ModifyColumnPosition.class,
ChangeContext::modifyColumnPosition);
-
- // Query operations
- registry.register(ModifyDefinitionQuery.class,
ChangeContext::modifyDefinitionQuery);
-
- // Constraint operations
- registry.register(AddUniqueConstraint.class,
ChangeContext::addUniqueConstraint);
- registry.register(ModifyUniqueConstraint.class,
ChangeContext::modifyUniqueConstraint);
- registry.register(DropConstraint.class,
ChangeContext::dropConstraint);
-
- // Watermark operations
- registry.register(AddWatermark.class, ChangeContext::addWatermark);
- registry.register(ModifyWatermark.class,
ChangeContext::modifyWatermark);
- registry.register(DropWatermark.class,
ChangeContext::dropWatermark);
-
- // Refresh operations
- registry.register(ModifyRefreshHandler.class,
ChangeContext::modifyRefreshHandler);
- registry.register(ModifyRefreshStatus.class,
ChangeContext::modifyRefreshStatus);
-
- // Distribution operations
- registry.register(AddDistribution.class,
ChangeContext::addDistribution);
- registry.register(ModifyDistribution.class,
ChangeContext::modifyDistribution);
- registry.register(DropDistribution.class,
ChangeContext::dropDistribution);
-
- // Options
- registry.register(TableChange.SetOption.class,
ChangeContext::setTableOption);
- registry.register(TableChange.ResetOption.class,
ChangeContext::resetTableOption);
-
- return registry;
- }
-
- private void applyTableChanges(List<TableChange> tableChanges) {
- isQueryChange = tableChanges.stream().anyMatch(t -> t instanceof
ModifyDefinitionQuery);
- Schema oldSchema = oldTable.getUnresolvedSchema();
- if (isQueryChange) {
- checkForChangedPositionByQuery(tableChanges, oldSchema);
- }
-
- for (TableChange tableChange : tableChanges) {
- HANDLER_REGISTRY.apply(this, tableChange);
- }
-
- if (droppedPersistedCnt > 0 && isQueryChange) {
- final int schemaSize = oldSchema.getColumns().size();
- throw new ValidationException(
- String.format(
- "Failed to modify query because drop column is
unsupported. "
- + "When modifying a query, you can
only append new columns at the end of original schema. "
- + "The original schema has %d columns,
but the newly derived schema from the query has %d columns.",
- schemaSize, schemaSize - droppedPersistedCnt));
- }
- }
-
- private Schema retrieveSchema() {
- Schema.Builder schemaToApply =
Schema.newBuilder().fromColumns(columns);
- if (primaryKeyColumns != null) {
- if (primaryKeyName == null) {
- schemaToApply.primaryKey(primaryKeyColumns);
- } else {
- schemaToApply.primaryKeyNamed(primaryKeyName,
primaryKeyColumns);
- }
- }
-
- for (UnresolvedWatermarkSpec spec : watermarkSpecs) {
- schemaToApply.watermark(spec.getColumnName(),
spec.getWatermarkExpression());
- }
- return schemaToApply.build();
- }
-
- private void addColumn(AddColumn addColumn) {
- Column column = addColumn.getColumn();
- ColumnPosition position = addColumn.getPosition();
- UnresolvedColumn columnToAdd = toUnresolvedColumn(column);
- setColumnAtPosition(columnToAdd, position);
- }
-
- private void modifyColumn(ModifyColumn modifyColumn) {
- Column column = modifyColumn.getOldColumn();
- Column newColumn = modifyColumn.getNewColumn();
- int index = getColumnIndex(column.getName());
- UnresolvedColumn newColumn1 = toUnresolvedColumn(newColumn);
- columns.set(index, newColumn1);
- }
-
- private void dropColumn(DropColumn dropColumn) {
- String droppedColumnName = dropColumn.getColumnName();
- int index = getColumnIndex(droppedColumnName);
- UnresolvedColumn column = columns.get(index);
- if (isQueryChange && isNonPersistedColumn(column)) {
- // noop
- } else {
- columns.remove(index);
- droppedPersistedCnt++;
- }
- }
-
- private void modifyPhysicalColumnType(ModifyPhysicalColumnType
modifyPhysicalColumnType) {
- Column column = modifyPhysicalColumnType.getOldColumn();
- int position = getColumnIndex(column.getName());
- columns.set(position,
toUnresolvedColumn(modifyPhysicalColumnType.getNewColumn()));
- }
-
- private void modifyColumnComment(ModifyColumnComment
modifyColumnComment) {
- Column column = modifyColumnComment.getOldColumn();
- int position = getColumnIndex(column.getName());
- columns.set(position,
toUnresolvedColumn(modifyColumnComment.getNewColumn()));
- }
-
- private void modifyColumnPosition(ModifyColumnPosition
columnWithChangedPosition) {
- Column column = columnWithChangedPosition.getOldColumn();
- int oldPosition = getColumnIndex(column.getName());
- if (isQueryChange) {
- throw new ValidationException(
- String.format(
- "When modifying the query of a materialized
table, "
- + "currently only support appending
columns at the end of original schema, dropping, renaming, and reordering
columns are not supported.\n"
- + "Column mismatch at position %d:
Original column is [%s], but new column is [%s].",
- oldPosition, column, column));
- }
-
- ColumnPosition position =
columnWithChangedPosition.getNewPosition();
- UnresolvedColumn changedPositionColumn = columns.get(oldPosition);
- columns.remove(oldPosition);
- setColumnAtPosition(changedPositionColumn, position);
- }
-
- private void modifyDefinitionQuery(ModifyDefinitionQuery queryChange) {
- expandedQuery = queryChange.getDefinitionQuery();
- originalQuery = queryChange.getOriginalQuery();
- }
-
- private boolean isNonPersistedColumn(UnresolvedColumn column) {
- return column instanceof UnresolvedComputedColumn
- || column instanceof UnresolvedMetadataColumn
- && ((UnresolvedMetadataColumn) column).isVirtual();
- }
-
- private void addUniqueConstraint(AddUniqueConstraint
addUniqueConstraint) {
- final UniqueConstraint constraint =
addUniqueConstraint.getConstraint();
- primaryKeyName = constraint.getName();
- primaryKeyColumns = constraint.getColumns();
- }
-
- private void modifyUniqueConstraint(ModifyUniqueConstraint
modifyUniqueConstraint) {
- final UniqueConstraint constraint =
modifyUniqueConstraint.getNewConstraint();
- primaryKeyName = constraint.getName();
- primaryKeyColumns = constraint.getColumns();
- }
-
- private void dropConstraint(DropConstraint dropConstraint) {
- primaryKeyName = null;
- primaryKeyColumns = null;
- }
-
- private void addWatermark(AddWatermark addWatermark) {
- final WatermarkSpec spec = addWatermark.getWatermark();
- String rowTimeAttribute = spec.getRowtimeAttribute();
- ResolvedExpression expression = spec.getWatermarkExpression();
- watermarkSpecs = List.of(new
UnresolvedWatermarkSpec(rowTimeAttribute, expression));
- }
-
- private void modifyWatermark(ModifyWatermark modifyWatermark) {
- final WatermarkSpec spec = modifyWatermark.getNewWatermark();
- String rowTimeAttribute = spec.getRowtimeAttribute();
- ResolvedExpression expression = spec.getWatermarkExpression();
- watermarkSpecs = List.of(new
UnresolvedWatermarkSpec(rowTimeAttribute, expression));
- }
-
- private void dropWatermark(DropWatermark dropWatermark) {
- watermarkSpecs = List.of();
- }
-
- private void modifyRefreshHandler(ModifyRefreshHandler refreshHandler)
{
- refreshHandlerDesc = refreshHandler.getRefreshHandlerDesc();
- refreshHandlerBytes = refreshHandler.getRefreshHandlerBytes();
- }
-
- private void modifyRefreshStatus(ModifyRefreshStatus
modifyRefreshStatus) {
- refreshStatus = modifyRefreshStatus.getRefreshStatus();
- }
-
- private void addDistribution(AddDistribution addDistribution) {
- distribution = addDistribution.getDistribution();
- }
-
- private void modifyDistribution(ModifyDistribution modifyDistribution)
{
- distribution = modifyDistribution.getDistribution();
- }
-
- private void dropDistribution(DropDistribution dropDistribution) {
- distribution = null;
- }
-
- private void setTableOption(TableChange.SetOption option) {
- options.put(option.getKey(), option.getValue());
- }
-
- private void resetTableOption(TableChange.ResetOption option) {
- options.remove(option.getKey());
- }
-
- private UnresolvedColumn toUnresolvedColumn(Column column) {
- final String name = column.getName();
- final String comment = column.getComment().orElse(null);
- final DataType type = column.getDataType();
- if (column instanceof PhysicalColumn) {
- return new UnresolvedPhysicalColumn(name, type, comment);
- } else if (column instanceof MetadataColumn) {
- final MetadataColumn metadataColumn = (MetadataColumn) column;
- final String metadataKey =
metadataColumn.getMetadataKey().orElse(null);
- return new UnresolvedMetadataColumn(
- name, type, metadataKey, metadataColumn.isVirtual(),
comment);
- } else {
- return new UnresolvedComputedColumn(
- name, ((ComputedColumn) column).getExpression(),
comment);
- }
- }
-
- private void checkForChangedPositionByQuery(
- List<TableChange> tableChanges, Schema oldSchema) {
- List<ModifyColumnPosition> positionChanges =
- tableChanges.stream()
- .filter(t -> t instanceof ModifyColumnPosition)
- .map(t -> (ModifyColumnPosition) t)
- .collect(Collectors.toList());
-
- List<ModifyPhysicalColumnType> physicalTypeChanges =
- tableChanges.stream()
- .filter(t -> t instanceof ModifyPhysicalColumnType)
- .map(t -> (ModifyPhysicalColumnType) t)
- .collect(Collectors.toList());
-
- if (positionChanges.isEmpty() && physicalTypeChanges.isEmpty()) {
- return;
- }
-
- int persistedColumnOffset = 0;
- List<UnresolvedColumn> oldColumns = oldSchema.getColumns();
- for (UnresolvedColumn column : oldColumns) {
- if (!isNonPersistedColumn(column)) {
- persistedColumnOffset++;
- }
- }
-
- Map<String, Column> afterToColumnName = new HashMap<>();
- for (ModifyColumnPosition change : positionChanges) {
- final ColumnPosition position = change.getNewPosition();
- final Column newColumn = change.getNewColumn();
- if (position == ColumnPosition.first()) {
- if (persistedColumnOffset == 0) {
- throwPositionChangeError(
- newColumn.asSummaryString(),
-
oldColumns.get(persistedColumnOffset).toString(),
- persistedColumnOffset);
- } else {
- afterToColumnName.put(
-
oldColumns.get(persistedColumnOffset).getName(), newColumn);
- }
- } else {
- afterToColumnName.put(((After) position).column(),
newColumn);
- }
- }
-
- for (int i = 1; i < oldColumns.size(); i++) {
- Column newColumn =
afterToColumnName.get(oldColumns.get(i).getName());
- if (newColumn != null) {
- throwPositionChangeError(oldColumns.get(i + 1), newColumn,
i + 1);
- }
- }
-
- Map<String, Integer> nameToIndex = new HashMap<>();
- for (int i = 0; i < oldColumns.size(); i++) {
- UnresolvedColumn column = oldColumns.get(i);
- if (!isNonPersistedColumn(column)) {
- nameToIndex.put(column.getName(), i);
- }
- }
-
- for (ModifyPhysicalColumnType change : physicalTypeChanges) {
- final int index =
nameToIndex.get(change.getOldColumn().getName());
- throwPositionChangeError(change.getOldColumn(),
change.getNewColumn(), index);
- }
- }
-
- private static void throwPositionChangeError(
- UnresolvedColumn oldColumn, Column newColumn, int position) {
- throwPositionChangeError(oldColumn.toString(),
newColumn.asSummaryString(), position);
- }
-
- private static void throwPositionChangeError(
- Column oldColumn, Column newColumn, int position) {
- throwPositionChangeError(
- oldColumn.asSummaryString(), newColumn.asSummaryString(),
position);
- }
-
- private static void throwPositionChangeError(
- String oldColumn, String newColumn, int position) {
- throw new ValidationException(
- String.format(
- "When modifying the query of a materialized table,
"
- + "currently only support appending
columns at the end of original schema, dropping, renaming, and reordering
columns are not supported.\n"
- + "Column mismatch at position %d:
Original column is [%s], but new column is [%s].",
- position, oldColumn, newColumn));
- }
-
- private void setColumnAtPosition(UnresolvedColumn column,
ColumnPosition position) {
- if (position == null) {
- columns.add(column);
- } else if (position == ColumnPosition.first()) {
- columns.add(0, column);
- } else {
- String after = ((After) position).column();
- int index = getColumnIndex(after);
-
- columns.add(index + 1, column);
- }
- }
-
- private int getColumnIndex(String name) {
- for (int i = 0; i < columns.size(); i++) {
- if (Objects.equals(name, columns.get(i).getName())) {
- return i;
- }
- }
- return -1;
- }
- }
}
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/FullAlterMaterializedTableOperation.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/FullAlterMaterializedTableOperation.java
new file mode 100644
index 00000000000..4b1d4ae395d
--- /dev/null
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/FullAlterMaterializedTableOperation.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.materializedtable;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
+import org.apache.flink.table.catalog.TableChange;
+
+import java.util.List;
+import java.util.function.Function;
+
+/**
+ * Operation for CREATE OR ALTER MATERIALIZED TABLE ... in case materialized
table is present and
+ * full materialized table changes should be calculated.
+ */
+@Internal
+public class FullAlterMaterializedTableOperation extends
AlterMaterializedTableChangeOperation {
+
+ public FullAlterMaterializedTableOperation(
+ final ObjectIdentifier tableIdentifier,
+ final Function<ResolvedCatalogMaterializedTable,
List<TableChange>> tableChangeForTable,
+ final ResolvedCatalogMaterializedTable oldTable) {
+ super(tableIdentifier, tableChangeForTable, oldTable);
+ }
+
+ @Override
+ protected String getOperationName() {
+ return "CREATE OR ALTER MATERIALIZED TABLE";
+ }
+}
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/MaterializedTableChangeHandler.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/MaterializedTableChangeHandler.java
new file mode 100644
index 00000000000..7acde833570
--- /dev/null
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/MaterializedTableChangeHandler.java
@@ -0,0 +1,552 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.materializedtable;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.Schema.UnresolvedColumn;
+import org.apache.flink.table.api.Schema.UnresolvedPhysicalColumn;
+import org.apache.flink.table.api.Schema.UnresolvedWatermarkSpec;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogMaterializedTable;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.Column.MetadataColumn;
+import org.apache.flink.table.catalog.TableChange;
+import org.apache.flink.table.catalog.TableChange.AddColumn;
+import org.apache.flink.table.catalog.TableChange.AddDistribution;
+import org.apache.flink.table.catalog.TableChange.AddUniqueConstraint;
+import org.apache.flink.table.catalog.TableChange.AddWatermark;
+import org.apache.flink.table.catalog.TableChange.After;
+import org.apache.flink.table.catalog.TableChange.ColumnPosition;
+import org.apache.flink.table.catalog.TableChange.DropConstraint;
+import org.apache.flink.table.catalog.TableChange.DropDistribution;
+import org.apache.flink.table.catalog.TableChange.ModifyColumn;
+import org.apache.flink.table.catalog.TableChange.ModifyColumnComment;
+import org.apache.flink.table.catalog.TableChange.ModifyColumnPosition;
+import org.apache.flink.table.catalog.TableChange.ModifyDefinitionQuery;
+import org.apache.flink.table.catalog.TableChange.ModifyDistribution;
+import org.apache.flink.table.catalog.TableChange.ModifyPhysicalColumnType;
+import org.apache.flink.table.catalog.TableChange.ModifyRefreshHandler;
+import org.apache.flink.table.catalog.TableChange.ModifyRefreshStatus;
+import org.apache.flink.table.catalog.TableChange.ModifyUniqueConstraint;
+import org.apache.flink.table.catalog.TableChange.ModifyWatermark;
+import org.apache.flink.table.catalog.TableChange.ResetOption;
+import org.apache.flink.table.catalog.TableChange.SetOption;
+import org.apache.flink.table.catalog.TableDistribution;
+import org.apache.flink.table.catalog.UniqueConstraint;
+import org.apache.flink.table.catalog.WatermarkSpec;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.types.DataType;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.IdentityHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
+
+/** Applying table changes to old materialized table and gathering validation
errors. */
+@Internal
+public class MaterializedTableChangeHandler {
+ private static final HandlerRegistry HANDLER_REGISTRY =
createHandlerRegistry();
+
+ private final List<UnresolvedColumn> columns;
+ private final CatalogMaterializedTable oldTable;
+ private boolean isQueryChange;
+ private @Nullable TableDistribution distribution;
+ private CatalogMaterializedTable.RefreshStatus refreshStatus;
+ private @Nullable String refreshHandlerDesc;
+ private byte[] refreshHandlerBytes;
+ private List<UnresolvedWatermarkSpec> watermarkSpecs;
+ private String primaryKeyName = null;
+ private List<String> primaryKeyColumns = null;
+ private int droppedPersistedCnt = 0;
+ private String originalQuery;
+ private String expandedQuery;
+ private final Map<String, String> options;
+ private final List<String> validationErrors = new ArrayList<>();
+
+ public MaterializedTableChangeHandler(CatalogMaterializedTable oldTable) {
+ this.distribution = oldTable.getDistribution().orElse(null);
+ this.refreshStatus = oldTable.getRefreshStatus();
+ this.refreshHandlerDesc =
oldTable.getRefreshHandlerDescription().orElse(null);
+ this.refreshHandlerBytes = oldTable.getSerializedRefreshHandler();
+ this.watermarkSpecs =
oldTable.getUnresolvedSchema().getWatermarkSpecs();
+ this.columns = new
LinkedList<>(oldTable.getUnresolvedSchema().getColumns());
+ Schema.UnresolvedPrimaryKey primaryKey =
+ oldTable.getUnresolvedSchema().getPrimaryKey().orElse(null);
+ if (primaryKey != null) {
+ this.primaryKeyName = primaryKey.getConstraintName();
+ this.primaryKeyColumns = primaryKey.getColumnNames();
+ }
+ originalQuery = oldTable.getOriginalQuery();
+ expandedQuery = oldTable.getExpandedQuery();
+ this.oldTable = oldTable;
+ this.options = new HashMap<>(oldTable.getOptions());
+ }
+
+ private static final class HandlerRegistry {
+ private static final Map<Class<? extends TableChange>,
HandlerRegistry.HandlerWrapper<?>>
+ HANDLERS = new IdentityHashMap<>();
+
+ private <T extends TableChange> void register(
+ Class<T> type, BiConsumer<MaterializedTableChangeHandler, T>
handler) {
+ HANDLERS.put(type, new HandlerRegistry.HandlerWrapper<>(handler));
+ }
+
+ private void apply(MaterializedTableChangeHandler context, TableChange
change) {
+ HandlerRegistry.HandlerWrapper<?> wrapper =
HANDLERS.get(change.getClass());
+ if (wrapper == null) {
+ context.validationErrors.add("Unknown table change " +
change.getClass());
+ } else {
+ wrapper.accept(context, change);
+ }
+ }
+
+ private static final class HandlerWrapper<T extends TableChange> {
+ private final BiConsumer<MaterializedTableChangeHandler, T>
handler;
+
+ private HandlerWrapper(BiConsumer<MaterializedTableChangeHandler,
T> handler) {
+ this.handler = handler;
+ }
+
+ private void accept(MaterializedTableChangeHandler context,
TableChange change) {
+ handler.accept(context, (T) change);
+ }
+ }
+ }
+
+ public List<String> getValidationErrors() {
+ return List.copyOf(validationErrors);
+ }
+
+ public static MaterializedTableChangeHandler getHandlerWithChanges(
+ CatalogMaterializedTable oldTable, List<TableChange> tableChanges)
{
+ MaterializedTableChangeHandler handler = new
MaterializedTableChangeHandler(oldTable);
+ handler.applyTableChanges(tableChanges);
+ return handler;
+ }
+
+ public static CatalogMaterializedTable buildNewMaterializedTable(
+ MaterializedTableChangeHandler context) {
+ final List<String> validationErrors = context.getValidationErrors();
+ if (!validationErrors.isEmpty()) {
+ throw new ValidationException(String.join("\n", validationErrors));
+ }
+
+ final CatalogMaterializedTable oldTable = context.getOldTable();
+ return CatalogMaterializedTable.newBuilder()
+ .schema(context.retrieveSchema())
+ .comment(oldTable.getComment())
+ .partitionKeys(oldTable.getPartitionKeys())
+ .options(context.getOptions())
+ .originalQuery(context.getOriginalQuery())
+ .expandedQuery(context.getExpandedQuery())
+ .distribution(context.getDistribution())
+ .freshness(oldTable.getDefinitionFreshness())
+ .logicalRefreshMode(oldTable.getLogicalRefreshMode())
+ .refreshMode(oldTable.getRefreshMode())
+ .refreshStatus(context.getRefreshStatus())
+ .refreshHandlerDescription(context.getRefreshHandlerDesc())
+ .serializedRefreshHandler(context.getRefreshHandlerBytes())
+ .build();
+ }
+
+ private static HandlerRegistry createHandlerRegistry() {
+ HandlerRegistry registry = new HandlerRegistry();
+
+ // Column operations
+ registry.register(AddColumn.class,
MaterializedTableChangeHandler::addColumn);
+ registry.register(ModifyColumn.class,
MaterializedTableChangeHandler::modifyColumn);
+ registry.register(TableChange.DropColumn.class,
MaterializedTableChangeHandler::dropColumn);
+ registry.register(
+ ModifyPhysicalColumnType.class,
+ MaterializedTableChangeHandler::modifyPhysicalColumnType);
+ registry.register(
+ ModifyColumnComment.class,
MaterializedTableChangeHandler::modifyColumnComment);
+ registry.register(
+ ModifyColumnPosition.class,
MaterializedTableChangeHandler::modifyColumnPosition);
+
+ // Query operations
+ registry.register(
+ ModifyDefinitionQuery.class,
MaterializedTableChangeHandler::modifyDefinitionQuery);
+
+ // Constraint operations
+ registry.register(
+ AddUniqueConstraint.class,
MaterializedTableChangeHandler::addUniqueConstraint);
+ registry.register(
+ ModifyUniqueConstraint.class,
+ MaterializedTableChangeHandler::modifyUniqueConstraint);
+ registry.register(DropConstraint.class,
MaterializedTableChangeHandler::dropConstraint);
+
+ // Watermark operations
+ registry.register(AddWatermark.class,
MaterializedTableChangeHandler::addWatermark);
+ registry.register(ModifyWatermark.class,
MaterializedTableChangeHandler::modifyWatermark);
+ registry.register(
+ TableChange.DropWatermark.class,
MaterializedTableChangeHandler::dropWatermark);
+
+ // Refresh operations
+ registry.register(
+ ModifyRefreshHandler.class,
MaterializedTableChangeHandler::modifyRefreshHandler);
+ registry.register(
+ ModifyRefreshStatus.class,
MaterializedTableChangeHandler::modifyRefreshStatus);
+
+ // Distribution operations
+ registry.register(AddDistribution.class,
MaterializedTableChangeHandler::addDistribution);
+ registry.register(
+ ModifyDistribution.class,
MaterializedTableChangeHandler::modifyDistribution);
+ registry.register(DropDistribution.class,
MaterializedTableChangeHandler::dropDistribution);
+
+ // Options
+ registry.register(SetOption.class,
MaterializedTableChangeHandler::setTableOption);
+ registry.register(ResetOption.class,
MaterializedTableChangeHandler::resetTableOption);
+
+ return registry;
+ }
+
+ void applyTableChanges(List<TableChange> tableChanges) {
+ isQueryChange = tableChanges.stream().anyMatch(t -> t instanceof
ModifyDefinitionQuery);
+ Schema oldSchema = oldTable.getUnresolvedSchema();
+ if (isQueryChange) {
+ checkForChangedPositionByQuery(tableChanges, oldSchema);
+ }
+
+ for (TableChange tableChange : tableChanges) {
+ HANDLER_REGISTRY.apply(this, tableChange);
+ }
+
+ if (droppedPersistedCnt > 0 && isQueryChange) {
+ final int schemaSize = oldSchema.getColumns().size();
+ validationErrors.add(
+ String.format(
+ "Failed to modify query because drop column is
unsupported. "
+ + "When modifying a query, you can only
append new columns at the end of original schema. "
+ + "The original schema has %d columns, but
the newly derived schema from the query has %d columns.",
+ schemaSize, schemaSize - droppedPersistedCnt));
+ }
+ }
+
+ public Schema retrieveSchema() {
+ Schema.Builder schemaToApply =
Schema.newBuilder().fromColumns(columns);
+ if (primaryKeyColumns != null) {
+ if (primaryKeyName == null) {
+ schemaToApply.primaryKey(primaryKeyColumns);
+ } else {
+ schemaToApply.primaryKeyNamed(primaryKeyName,
primaryKeyColumns);
+ }
+ }
+
+ for (UnresolvedWatermarkSpec spec : watermarkSpecs) {
+ schemaToApply.watermark(spec.getColumnName(),
spec.getWatermarkExpression());
+ }
+ return schemaToApply.build();
+ }
+
+ public String getExpandedQuery() {
+ return expandedQuery;
+ }
+
+ public String getOriginalQuery() {
+ return originalQuery;
+ }
+
+ public Map<String, String> getOptions() {
+ return options;
+ }
+
+ @Nullable
+ public TableDistribution getDistribution() {
+ return distribution;
+ }
+
+ public byte[] getRefreshHandlerBytes() {
+ return refreshHandlerBytes;
+ }
+
+ @Nullable
+ public String getRefreshHandlerDesc() {
+ return refreshHandlerDesc;
+ }
+
+ public CatalogMaterializedTable.RefreshStatus getRefreshStatus() {
+ return refreshStatus;
+ }
+
+ public CatalogMaterializedTable getOldTable() {
+ return oldTable;
+ }
+
+ private void addColumn(AddColumn addColumn) {
+ Column column = addColumn.getColumn();
+ ColumnPosition position = addColumn.getPosition();
+ UnresolvedColumn columnToAdd = toUnresolvedColumn(column);
+ setColumnAtPosition(columnToAdd, position);
+ }
+
+ private void modifyColumn(ModifyColumn modifyColumn) {
+ Column column = modifyColumn.getOldColumn();
+ Column newColumn = modifyColumn.getNewColumn();
+ int index = getColumnIndex(column.getName());
+ UnresolvedColumn newUnresolvedColumn = toUnresolvedColumn(newColumn);
+ columns.set(index, newUnresolvedColumn);
+ }
+
+ private void dropColumn(TableChange.DropColumn dropColumn) {
+ String droppedColumnName = dropColumn.getColumnName();
+ int index = getColumnIndex(droppedColumnName);
+ UnresolvedColumn column = columns.get(index);
+ if (isQueryChange && isNonPersistedColumn(column)) {
+ // noop
+ } else {
+ columns.remove(index);
+ droppedPersistedCnt++;
+ }
+ }
+
+ private void modifyPhysicalColumnType(ModifyPhysicalColumnType
modifyPhysicalColumnType) {
+ Column column = modifyPhysicalColumnType.getOldColumn();
+ int position = getColumnIndex(column.getName());
+ columns.set(position,
toUnresolvedColumn(modifyPhysicalColumnType.getNewColumn()));
+ }
+
+ private void modifyColumnComment(ModifyColumnComment modifyColumnComment) {
+ Column column = modifyColumnComment.getOldColumn();
+ int position = getColumnIndex(column.getName());
+ columns.set(position,
toUnresolvedColumn(modifyColumnComment.getNewColumn()));
+ }
+
+ private void modifyColumnPosition(ModifyColumnPosition
columnWithChangedPosition) {
+ Column column = columnWithChangedPosition.getOldColumn();
+ int oldPosition = getColumnIndex(column.getName());
+ if (isQueryChange) {
+ validationErrors.add(
+ String.format(
+ "When modifying the query of a materialized table,
"
+ + "currently only support appending
columns at the end of original schema, dropping, renaming, and reordering
columns are not supported.\n"
+ + "Column mismatch at position %d:
Original column is [%s], but new column is [%s].",
+ oldPosition, column, column));
+ }
+
+ ColumnPosition position = columnWithChangedPosition.getNewPosition();
+ UnresolvedColumn changedPositionColumn = columns.get(oldPosition);
+ columns.remove(oldPosition);
+ setColumnAtPosition(changedPositionColumn, position);
+ }
+
+ private void modifyDefinitionQuery(ModifyDefinitionQuery queryChange) {
+ expandedQuery = queryChange.getDefinitionQuery();
+ originalQuery = queryChange.getOriginalQuery();
+ }
+
+ private boolean isNonPersistedColumn(UnresolvedColumn column) {
+ return column instanceof Schema.UnresolvedComputedColumn
+ || column instanceof Schema.UnresolvedMetadataColumn
+ && ((Schema.UnresolvedMetadataColumn)
column).isVirtual();
+ }
+
+ private void addUniqueConstraint(AddUniqueConstraint addUniqueConstraint) {
+ final UniqueConstraint constraint =
addUniqueConstraint.getConstraint();
+ primaryKeyName = constraint.getName();
+ primaryKeyColumns = constraint.getColumns();
+ }
+
+ private void modifyUniqueConstraint(ModifyUniqueConstraint
modifyUniqueConstraint) {
+ final UniqueConstraint constraint =
modifyUniqueConstraint.getNewConstraint();
+ primaryKeyName = constraint.getName();
+ primaryKeyColumns = constraint.getColumns();
+ }
+
+ private void dropConstraint(DropConstraint dropConstraint) {
+ primaryKeyName = null;
+ primaryKeyColumns = null;
+ }
+
+ private void addWatermark(AddWatermark addWatermark) {
+ final WatermarkSpec spec = addWatermark.getWatermark();
+ String rowTimeAttribute = spec.getRowtimeAttribute();
+ ResolvedExpression expression = spec.getWatermarkExpression();
+ watermarkSpecs = List.of(new UnresolvedWatermarkSpec(rowTimeAttribute,
expression));
+ }
+
+ private void modifyWatermark(ModifyWatermark modifyWatermark) {
+ final WatermarkSpec spec = modifyWatermark.getNewWatermark();
+ String rowTimeAttribute = spec.getRowtimeAttribute();
+ ResolvedExpression expression = spec.getWatermarkExpression();
+ watermarkSpecs = List.of(new UnresolvedWatermarkSpec(rowTimeAttribute,
expression));
+ }
+
+ private void dropWatermark(TableChange.DropWatermark dropWatermark) {
+ watermarkSpecs = List.of();
+ }
+
+ private void modifyRefreshHandler(ModifyRefreshHandler refreshHandler) {
+ refreshHandlerDesc = refreshHandler.getRefreshHandlerDesc();
+ refreshHandlerBytes = refreshHandler.getRefreshHandlerBytes();
+ }
+
+ private void modifyRefreshStatus(ModifyRefreshStatus modifyRefreshStatus) {
+ refreshStatus = modifyRefreshStatus.getRefreshStatus();
+ }
+
+ private void addDistribution(AddDistribution addDistribution) {
+ distribution = addDistribution.getDistribution();
+ }
+
+ private void modifyDistribution(ModifyDistribution modifyDistribution) {
+ distribution = modifyDistribution.getDistribution();
+ }
+
+ private void dropDistribution(DropDistribution dropDistribution) {
+ distribution = null;
+ }
+
+ private void setTableOption(SetOption option) {
+ options.put(option.getKey(), option.getValue());
+ }
+
+ private void resetTableOption(ResetOption option) {
+ options.remove(option.getKey());
+ }
+
+ private UnresolvedColumn toUnresolvedColumn(Column column) {
+ final String name = column.getName();
+ final String comment = column.getComment().orElse(null);
+ final DataType type = column.getDataType();
+ if (column instanceof Column.PhysicalColumn) {
+ return new UnresolvedPhysicalColumn(name, type, comment);
+ } else if (column instanceof MetadataColumn) {
+ final MetadataColumn metadataColumn = (MetadataColumn) column;
+ final String metadataKey =
metadataColumn.getMetadataKey().orElse(null);
+ return new Schema.UnresolvedMetadataColumn(
+ name, type, metadataKey, metadataColumn.isVirtual(),
comment);
+ } else {
+ return new Schema.UnresolvedComputedColumn(
+ name, ((Column.ComputedColumn) column).getExpression(),
comment);
+ }
+ }
+
+ private void checkForChangedPositionByQuery(List<TableChange>
tableChanges, Schema oldSchema) {
+ List<ModifyColumnPosition> positionChanges =
+ tableChanges.stream()
+ .filter(t -> t instanceof ModifyColumnPosition)
+ .map(t -> (ModifyColumnPosition) t)
+ .collect(Collectors.toList());
+
+ List<ModifyPhysicalColumnType> physicalTypeChanges =
+ tableChanges.stream()
+ .filter(t -> t instanceof ModifyPhysicalColumnType)
+ .map(t -> (ModifyPhysicalColumnType) t)
+ .collect(Collectors.toList());
+
+ if (positionChanges.isEmpty() && physicalTypeChanges.isEmpty()) {
+ return;
+ }
+
+ int persistedColumnOffset = 0;
+ List<UnresolvedColumn> oldColumns = oldSchema.getColumns();
+ for (UnresolvedColumn column : oldColumns) {
+ if (!isNonPersistedColumn(column)) {
+ persistedColumnOffset++;
+ }
+ }
+
+ Map<String, Column> afterToColumnName = new HashMap<>();
+ for (ModifyColumnPosition change : positionChanges) {
+ final ColumnPosition position = change.getNewPosition();
+ final Column newColumn = change.getNewColumn();
+ if (position == ColumnPosition.first()) {
+ if (persistedColumnOffset == 0) {
+ positionChangeError(
+ newColumn.asSummaryString(),
+ oldColumns.get(persistedColumnOffset).toString(),
+ persistedColumnOffset);
+ } else {
+ afterToColumnName.put(
+ oldColumns.get(persistedColumnOffset).getName(),
newColumn);
+ }
+ } else {
+ afterToColumnName.put(((After) position).column(), newColumn);
+ }
+ }
+
+ for (int i = 0; i < oldColumns.size() - 1; i++) {
+ Column newColumn =
afterToColumnName.get(oldColumns.get(i).getName());
+ if (newColumn != null) {
+ positionChangeError(oldColumns.get(i + 1), newColumn, i + 1);
+ }
+ }
+
+ Map<String, Integer> nameToIndex = new HashMap<>();
+ for (int i = 0; i < oldColumns.size(); i++) {
+ UnresolvedColumn column = oldColumns.get(i);
+ if (!isNonPersistedColumn(column)) {
+ nameToIndex.put(column.getName(), i);
+ }
+ }
+
+ for (ModifyPhysicalColumnType change : physicalTypeChanges) {
+ final int index = nameToIndex.get(change.getOldColumn().getName());
+ positionChangeError(change.getOldColumn(), change.getNewColumn(),
index);
+ }
+ }
+
+ private void positionChangeError(UnresolvedColumn oldColumn, Column
newColumn, int position) {
+ positionChangeError(oldColumn.toString(), newColumn.asSummaryString(),
position);
+ }
+
+ private void positionChangeError(Column oldColumn, Column newColumn, int
position) {
+ positionChangeError(oldColumn.asSummaryString(),
newColumn.asSummaryString(), position);
+ }
+
+ private void positionChangeError(String oldColumn, String newColumn, int
position) {
+ validationErrors.add(
+ String.format(
+ "When modifying the query of a materialized table, "
+ + "currently only support appending columns at
the end of original schema, dropping, renaming, and reordering columns are not
supported.\n"
+ + "Column mismatch at position %d: Original
column is [%s], but new column is [%s].",
+ position, oldColumn, newColumn));
+ }
+
+ private void setColumnAtPosition(UnresolvedColumn column, ColumnPosition
position) {
+ if (position == null) {
+ columns.add(column);
+ } else if (position == ColumnPosition.first()) {
+ columns.add(0, column);
+ } else {
+ String after = ((After) position).column();
+ int index = getColumnIndex(after);
+
+ columns.add(index + 1, column);
+ }
+ }
+
+ private int getColumnIndex(String name) {
+ for (int i = 0; i < columns.size(); i++) {
+ if (Objects.equals(name, columns.get(i).getName())) {
+ return i;
+ }
+ }
+ return -1;
+ }
+}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/AbstractAlterMaterializedTableConverter.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/AbstractAlterMaterializedTableConverter.java
index fca8c516be2..4b8c4ccf318 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/AbstractAlterMaterializedTableConverter.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/AbstractAlterMaterializedTableConverter.java
@@ -22,17 +22,18 @@ import
org.apache.flink.sql.parser.ddl.materializedtable.SqlAlterMaterializedTab
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.CatalogBaseTable.TableKind;
import org.apache.flink.table.catalog.CatalogManager;
-import org.apache.flink.table.catalog.CatalogMaterializedTable;
import org.apache.flink.table.catalog.ContextResolvedTable;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
+import org.apache.flink.table.catalog.TableChange;
import org.apache.flink.table.catalog.UnresolvedIdentifier;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.utils.ValidationUtils;
import org.apache.flink.table.planner.operations.converters.SqlNodeConverter;
+import java.util.List;
import java.util.Optional;
-import java.util.function.Consumer;
+import java.util.function.Function;
/** Abstract converter for {@link SqlAlterMaterializedTable}. */
public abstract class AbstractAlterMaterializedTableConverter<T extends
SqlAlterMaterializedTable>
@@ -44,6 +45,9 @@ public abstract class
AbstractAlterMaterializedTableConverter<T extends SqlAlter
protected abstract Operation convertToOperation(
T sqlAlterTable, ResolvedCatalogMaterializedTable oldTable,
ConvertContext context);
+ protected abstract Function<ResolvedCatalogMaterializedTable,
List<TableChange>>
+ gatherTableChanges(T sqlAlterTable, ConvertContext context);
+
@Override
public final Operation convertSqlNode(T sqlAlterMaterializedTable,
ConvertContext context) {
final CatalogManager catalogManager = context.getCatalogManager();
@@ -75,29 +79,4 @@ public abstract class
AbstractAlterMaterializedTableConverter<T extends SqlAlter
UnresolvedIdentifier.of(sqlAlterMaterializedTable.getFullName());
return
context.getCatalogManager().qualifyIdentifier(unresolvedIdentifier);
}
-
- protected CatalogMaterializedTable buildUpdatedMaterializedTable(
- ResolvedCatalogMaterializedTable oldTable,
- Consumer<CatalogMaterializedTable.Builder> consumer) {
-
- CatalogMaterializedTable.Builder builder =
- CatalogMaterializedTable.newBuilder()
- .schema(oldTable.getUnresolvedSchema())
- .comment(oldTable.getComment())
- .partitionKeys(oldTable.getPartitionKeys())
- .options(oldTable.getOptions())
- .originalQuery(oldTable.getOriginalQuery())
- .expandedQuery(oldTable.getExpandedQuery())
- .distribution(oldTable.getDistribution().orElse(null))
- .freshness(oldTable.getDefinitionFreshness())
- .logicalRefreshMode(oldTable.getLogicalRefreshMode())
- .refreshMode(oldTable.getRefreshMode())
- .refreshStatus(oldTable.getRefreshStatus())
- .refreshHandlerDescription(
-
oldTable.getRefreshHandlerDescription().orElse(null))
-
.serializedRefreshHandler(oldTable.getSerializedRefreshHandler());
-
- consumer.accept(builder);
- return builder.build();
- }
}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlAlterMaterializedTableAddDistributionConverter.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlAlterMaterializedTableAddDistributionConverter.java
index e78a753b3df..c475ef70aae 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlAlterMaterializedTableAddDistributionConverter.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlAlterMaterializedTableAddDistributionConverter.java
@@ -30,6 +30,7 @@ import
org.apache.flink.table.planner.utils.OperationConverterUtils;
import java.util.List;
import java.util.Optional;
+import java.util.function.Function;
/** A converter for {@link SqlAlterMaterializedTableAddDistribution}. */
public class SqlAlterMaterializedTableAddDistributionConverter
@@ -39,13 +40,23 @@ public class
SqlAlterMaterializedTableAddDistributionConverter
SqlAlterMaterializedTableAddDistribution sqlAddDistribution,
ResolvedCatalogMaterializedTable oldTable,
ConvertContext context) {
- TableDistribution distribution =
getTableDistribution(sqlAddDistribution, oldTable);
+
return new AlterMaterializedTableChangeOperation(
resolveIdentifier(sqlAddDistribution, context),
- List.of(TableChange.add(distribution)),
+ gatherTableChanges(sqlAddDistribution, context),
oldTable);
}
+ @Override
+ protected Function<ResolvedCatalogMaterializedTable, List<TableChange>>
gatherTableChanges(
+ SqlAlterMaterializedTableAddDistribution sqlAddDistribution,
ConvertContext context) {
+ return oldTable -> {
+ final TableDistribution distribution =
+ getTableDistribution(sqlAddDistribution, oldTable);
+ return List.of(TableChange.add(distribution));
+ };
+ }
+
private TableDistribution getTableDistribution(
SqlAlterMaterializedTableDistribution sqlAlterTableDistribution,
ResolvedCatalogMaterializedTable oldTable) {
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlAlterMaterializedTableAsQueryConverter.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlAlterMaterializedTableAsQueryConverter.java
index ef251ab8d4a..b48d472593a 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlAlterMaterializedTableAsQueryConverter.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlAlterMaterializedTableAsQueryConverter.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.planner.operations.converters.materializedtable;
import
org.apache.flink.sql.parser.ddl.materializedtable.SqlAlterMaterializedTableAsQuery;
+import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
import org.apache.flink.table.catalog.ResolvedSchema;
@@ -30,7 +31,9 @@ import
org.apache.flink.table.planner.utils.MaterializedTableUtils;
import org.apache.calcite.sql.SqlNode;
+import java.util.ArrayList;
import java.util.List;
+import java.util.function.Function;
/** A converter for {@link SqlAlterMaterializedTableAsQuery}. */
public class SqlAlterMaterializedTableAsQueryConverter
@@ -41,23 +44,42 @@ public class SqlAlterMaterializedTableAsQueryConverter
SqlAlterMaterializedTableAsQuery sqlAlterTableAsQuery,
ResolvedCatalogMaterializedTable oldTable,
ConvertContext context) {
- ObjectIdentifier identifier = resolveIdentifier(sqlAlterTableAsQuery,
context);
-
- // Validate and extract schema from query
- String originalQuery =
context.toQuotedSqlString(sqlAlterTableAsQuery.getAsQuery());
- SqlNode validatedQuery =
-
context.getSqlValidator().validate(sqlAlterTableAsQuery.getAsQuery());
- String definitionQuery = context.toQuotedSqlString(validatedQuery);
- PlannerQueryOperation queryOperation =
- new PlannerQueryOperation(
- context.toRelRoot(validatedQuery).project(), () ->
definitionQuery);
- ResolvedSchema oldSchema = oldTable.getResolvedSchema();
- ResolvedSchema newSchema = queryOperation.getResolvedSchema();
-
- List<TableChange> tableChanges =
- MaterializedTableUtils.buildSchemaTableChanges(oldSchema,
newSchema);
- tableChanges.add(TableChange.modifyDefinitionQuery(originalQuery,
definitionQuery));
-
- return new AlterMaterializedTableAsQueryOperation(identifier,
tableChanges, oldTable);
+ final ObjectIdentifier identifier =
resolveIdentifier(sqlAlterTableAsQuery, context);
+ return new AlterMaterializedTableAsQueryOperation(
+ identifier, gatherTableChanges(sqlAlterTableAsQuery, context),
oldTable);
+ }
+
+ @Override
+ protected Function<ResolvedCatalogMaterializedTable, List<TableChange>>
gatherTableChanges(
+ SqlAlterMaterializedTableAsQuery sqlAlterTableAsQuery,
ConvertContext context) {
+ return oldTable -> {
+ // Validate and extract schema from query
+ String originalQuery =
context.toQuotedSqlString(sqlAlterTableAsQuery.getAsQuery());
+ SqlNode validatedQuery =
+
context.getSqlValidator().validate(sqlAlterTableAsQuery.getAsQuery());
+ String definitionQuery = context.toQuotedSqlString(validatedQuery);
+ PlannerQueryOperation queryOperation =
+ new PlannerQueryOperation(
+ context.toRelRoot(validatedQuery).project(), () ->
definitionQuery);
+
+ ResolvedSchema oldSchema = oldTable.getResolvedSchema();
+ ResolvedSchema newSchema = queryOperation.getResolvedSchema();
+ List<TableChange> tableChanges =
+ new ArrayList<>(
+
MaterializedTableUtils.buildSchemaTableChanges(oldSchema, newSchema));
+
+ if (!tableChanges.isEmpty()) {
+ final boolean hasNonPersistedColumn =
+ oldSchema.getColumns().stream().anyMatch(c ->
!c.isPersisted());
+ if (hasNonPersistedColumn) {
+ throw new ValidationException(
+ "ALTER query for MATERIALIZED TABLE "
+ + "with schema containing non persisted
columns is not supported, "
+ + "consider using CREATE OR ALTER
MATERIALIZED TABLE instead");
+ }
+ }
+ tableChanges.add(TableChange.modifyDefinitionQuery(originalQuery,
definitionQuery));
+ return tableChanges;
+ };
}
}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlAlterMaterializedTableDropDistributionConverter.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlAlterMaterializedTableDropDistributionConverter.java
index 2d81569d4a1..7d85376088d 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlAlterMaterializedTableDropDistributionConverter.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlAlterMaterializedTableDropDistributionConverter.java
@@ -27,6 +27,7 @@ import org.apache.flink.table.operations.Operation;
import
org.apache.flink.table.operations.materializedtable.AlterMaterializedTableChangeOperation;
import java.util.List;
+import java.util.function.Function;
/** A converter for {@link SqlAlterMaterializedTableDropDistribution}. */
public class SqlAlterMaterializedTableDropDistributionConverter
@@ -45,6 +46,12 @@ public class
SqlAlterMaterializedTableDropDistributionConverter
}
return new AlterMaterializedTableChangeOperation(
- identifier, List.of(TableChange.dropDistribution()), oldTable);
+ identifier, gatherTableChanges(sqlDropDistribution, context),
oldTable);
+ }
+
+ @Override
+ protected Function<ResolvedCatalogMaterializedTable, List<TableChange>>
gatherTableChanges(
+ SqlAlterMaterializedTableDropDistribution sqlAlterTable,
ConvertContext context) {
+ return oldTable -> List.of(TableChange.dropDistribution());
}
}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlAlterMaterializedTableDropSchemaConverter.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlAlterMaterializedTableDropSchemaConverter.java
index d953c8ac619..e8f8a41b44d 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlAlterMaterializedTableDropSchemaConverter.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlAlterMaterializedTableDropSchemaConverter.java
@@ -38,6 +38,7 @@ import org.apache.calcite.sql.SqlIdentifier;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.function.Function;
import java.util.stream.Collectors;
/**
@@ -50,12 +51,11 @@ public abstract class
SqlAlterMaterializedTableDropSchemaConverter<
@Override
protected Operation convertToOperation(
T alterTableSchema, ResolvedCatalogMaterializedTable oldTable,
ConvertContext context) {
- Set<String> columnsToDrop = getColumnsToDrop(alterTableSchema);
- List<TableChange> tableChanges =
- validateAndGatherDropChanges(alterTableSchema, oldTable,
columnsToDrop, context);
return new AlterMaterializedTableChangeOperation(
- resolveIdentifier(alterTableSchema, context), tableChanges,
oldTable);
+ resolveIdentifier(alterTableSchema, context),
+ gatherTableChanges(alterTableSchema, context),
+ oldTable);
}
protected abstract List<TableChange> validateAndGatherDropChanges(
@@ -66,6 +66,15 @@ public abstract class
SqlAlterMaterializedTableDropSchemaConverter<
protected abstract Set<String> getColumnsToDrop(T alterTableSchema);
+ @Override
+ protected Function<ResolvedCatalogMaterializedTable, List<TableChange>>
gatherTableChanges(
+ T alterTableSchema, ConvertContext context) {
+ return oldTable -> {
+ Set<String> columnsToDrop = getColumnsToDrop(alterTableSchema);
+ return validateAndGatherDropChanges(alterTableSchema, oldTable,
columnsToDrop, context);
+ };
+ }
+
/**
* Convert {@code ALTER TABLE MATERIALIZED TABLE DROP PRIMARY KEY} to
generate an updated
* Schema.
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlAlterMaterializedTableModifyDistributionConverter.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlAlterMaterializedTableModifyDistributionConverter.java
index b02f3f3ae18..ed490156f0c 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlAlterMaterializedTableModifyDistributionConverter.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlAlterMaterializedTableModifyDistributionConverter.java
@@ -29,6 +29,7 @@ import
org.apache.flink.table.operations.materializedtable.AlterMaterializedTabl
import org.apache.flink.table.planner.utils.OperationConverterUtils;
import java.util.List;
+import java.util.function.Function;
/** A converter for {@link SqlAlterMaterializedTableModifyDistribution}. */
public class SqlAlterMaterializedTableModifyDistributionConverter
@@ -48,11 +49,17 @@ public class
SqlAlterMaterializedTableModifyDistributionConverter
identifier));
}
- TableDistribution tableDistribution =
+ return new AlterMaterializedTableChangeOperation(
+ identifier, gatherTableChanges(sqlModifyDistribution,
context), oldTable);
+ }
+
+ @Override
+ protected Function<ResolvedCatalogMaterializedTable, List<TableChange>>
gatherTableChanges(
+ SqlAlterMaterializedTableModifyDistribution sqlModifyDistribution,
+ ConvertContext context) {
+ final TableDistribution tableDistribution =
OperationConverterUtils.getDistributionFromSqlDistribution(
sqlModifyDistribution.getDistribution().get());
-
- return new AlterMaterializedTableChangeOperation(
- identifier, List.of(TableChange.modify(tableDistribution)),
oldTable);
+ return oldTable -> List.of(TableChange.modify(tableDistribution));
}
}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlAlterMaterializedTableSchemaConverter.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlAlterMaterializedTableSchemaConverter.java
index f407551f074..05ffdd3ce45 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlAlterMaterializedTableSchemaConverter.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlAlterMaterializedTableSchemaConverter.java
@@ -34,6 +34,7 @@ import org.apache.flink.table.catalog.Column.PhysicalColumn;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.TableChange;
import org.apache.flink.table.operations.Operation;
import
org.apache.flink.table.operations.materializedtable.AlterMaterializedTableChangeOperation;
import org.apache.flink.table.planner.operations.converters.SchemaAddConverter;
@@ -47,6 +48,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
+import java.util.function.Function;
/**
* Abstract class for converting {@link SqlAlterMaterializedTableSchema} and
its children for alter
@@ -58,23 +60,32 @@ public abstract class
SqlAlterMaterializedTableSchemaConverter<
@Override
protected Operation convertToOperation(
T alterTableSchema, ResolvedCatalogMaterializedTable oldTable,
ConvertContext context) {
- MaterializedTableUtils.validatePersistedColumnsUsedByQuery(
- oldTable, alterTableSchema, context);
-
- SchemaConverter converter = createSchemaConverter(oldTable, context);
-
converter.updateColumn(alterTableSchema.getColumnPositions().getList());
- alterTableSchema.getWatermark().ifPresent(converter::updateWatermark);
-
alterTableSchema.getFullConstraint().ifPresent(converter::updatePrimaryKey);
- Schema schema = converter.convert();
-
- validateChanges(oldTable.getResolvedSchema(), schema, context);
-
return new AlterMaterializedTableChangeOperation(
resolveIdentifier(alterTableSchema, context),
- converter.getChangesCollector(),
+ gatherTableChanges(alterTableSchema, context),
oldTable);
}
+ @Override
+ protected Function<ResolvedCatalogMaterializedTable, List<TableChange>>
gatherTableChanges(
+ T alterTableSchema, ConvertContext context) {
+ return oldTable -> {
+ MaterializedTableUtils.validatePersistedColumnsUsedByQuery(
+ oldTable, alterTableSchema, context);
+
+ SchemaConverter converter = createSchemaConverter(oldTable,
context);
+
converter.updateColumn(alterTableSchema.getColumnPositions().getList());
+
alterTableSchema.getWatermark().ifPresent(converter::updateWatermark);
+
alterTableSchema.getFullConstraint().ifPresent(converter::updatePrimaryKey);
+ Schema schema = converter.convert();
+
+ ResolvedSchema oldSchema = oldTable.getResolvedSchema();
+ validateChanges(oldSchema, schema, context);
+
+ return converter.getChangesCollector();
+ };
+ }
+
protected abstract SchemaConverter createSchemaConverter(
ResolvedCatalogMaterializedTable oldTable, ConvertContext context);
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java
index 4fe78a9a698..3e7094580db 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java
@@ -31,8 +31,8 @@ import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.TableChange;
import org.apache.flink.table.catalog.TableDistribution;
import org.apache.flink.table.operations.Operation;
-import
org.apache.flink.table.operations.materializedtable.AlterMaterializedTableAsQueryOperation;
import
org.apache.flink.table.operations.materializedtable.CreateMaterializedTableOperation;
+import
org.apache.flink.table.operations.materializedtable.FullAlterMaterializedTableOperation;
import org.apache.flink.table.planner.operations.converters.MergeTableAsUtil;
import org.apache.flink.table.planner.utils.MaterializedTableUtils;
@@ -45,6 +45,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.function.Function;
/** A converter for {@link SqlCreateOrAlterMaterializedTable}. */
public class SqlCreateOrAlterMaterializedTableConverter
@@ -100,10 +101,8 @@ public class SqlCreateOrAlterMaterializedTableConverter
final ConvertContext context,
final ObjectIdentifier identifier) {
final MergeContext mergeContext =
getMergeContext(sqlCreateOrAlterTable, context);
-
- final List<TableChange> tableChanges = buildTableChanges(oldTable,
mergeContext);
-
- return new AlterMaterializedTableAsQueryOperation(identifier,
tableChanges, oldTable);
+ return new FullAlterMaterializedTableOperation(
+ identifier, buildTableChanges(mergeContext), oldTable);
}
private Operation handleCreate(
@@ -116,49 +115,37 @@ public class SqlCreateOrAlterMaterializedTableConverter
return new CreateMaterializedTableOperation(identifier, resolvedTable);
}
- private List<TableChange> buildTableChanges(
- final ResolvedCatalogMaterializedTable oldTable, final
MergeContext mergeContext) {
- final List<TableChange> changes = new ArrayList<>();
-
- final ResolvedSchema oldSchema = oldTable.getResolvedSchema();
- final List<Column> newColumns =
- MaterializedTableUtils.validateAndExtractNewColumns(
- oldSchema, mergeContext.getMergedQuerySchema());
+ private Function<ResolvedCatalogMaterializedTable, List<TableChange>>
buildTableChanges(
+ final MergeContext mergeContext) {
+ return oldTable -> {
+ final List<TableChange> changes = new ArrayList<>();
- newColumns.forEach(column -> changes.add(TableChange.add(column)));
- changes.add(
- TableChange.modifyDefinitionQuery(
- mergeContext.getMergedOriginalQuery(),
- mergeContext.getMergedExpandedQuery()));
+ final ResolvedSchema oldSchema = oldTable.getResolvedSchema();
+ final List<Column> newColumns =
+ MaterializedTableUtils.validateAndExtractNewColumns(
+ oldSchema, mergeContext.getMergedQuerySchema());
- changes.addAll(
- calculateOptionsChange(
- oldTable.getOptions(),
mergeContext.getMergedTableOptions()));
-
- return changes;
- }
+ newColumns.forEach(column -> changes.add(TableChange.add(column)));
+ changes.add(
+ TableChange.modifyDefinitionQuery(
+ mergeContext.getMergedOriginalQuery(),
+ mergeContext.getMergedExpandedQuery()));
- public static List<TableChange> calculateOptionsChange(
- Map<String, String> oldOptions, Map<String, String> newOptions) {
- if (oldOptions.equals(newOptions)) {
- return List.of();
- }
+ final Map<String, String> oldOptions = oldTable.getOptions();
+ final Map<String, String> newOptions =
mergeContext.getMergedTableOptions();
- final List<TableChange> changes = new ArrayList<>();
- for (Map.Entry<String, String> option : newOptions.entrySet()) {
- final String oldValue = oldOptions.get(option.getKey());
- if (oldValue == null || !oldValue.equals(option.getValue())) {
- changes.add(TableChange.set(option.getKey(),
option.getValue()));
+ for (Map.Entry<String, String> newOptionEntry :
newOptions.entrySet()) {
+ changes.add(TableChange.set(newOptionEntry.getKey(),
newOptionEntry.getValue()));
}
- }
- for (Map.Entry<String, String> option : oldOptions.entrySet()) {
- if (newOptions.get(option.getKey()) == null) {
- changes.add(TableChange.reset(option.getKey()));
+ for (Map.Entry<String, String> oldOptionEntry :
oldOptions.entrySet()) {
+ if (newOptions.get(oldOptionEntry.getKey()) == null) {
+ changes.add(TableChange.reset(oldOptionEntry.getKey()));
+ }
}
- }
- return changes;
+ return changes;
+ };
}
@Override
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java
index 70e09e89354..954f876240b 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java
@@ -26,13 +26,13 @@ import
org.apache.flink.sql.parser.ddl.SqlTableColumn.SqlRegularColumn;
import
org.apache.flink.sql.parser.ddl.materializedtable.SqlAlterMaterializedTableSchema;
import org.apache.flink.sql.parser.ddl.position.SqlTableColumnPosition;
import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogMaterializedTable;
import
org.apache.flink.table.catalog.CatalogMaterializedTable.LogicalRefreshMode;
import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshMode;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.Column.ComputedColumn;
import org.apache.flink.table.catalog.Column.MetadataColumn;
import org.apache.flink.table.catalog.IntervalFreshness;
-import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.TableChange;
import org.apache.flink.table.catalog.TableChange.ColumnPosition;
@@ -40,6 +40,7 @@ import
org.apache.flink.table.planner.operations.PlannerQueryOperation;
import
org.apache.flink.table.planner.operations.converters.SqlNodeConverter.ConvertContext;
import org.apache.calcite.sql.SqlIntervalLiteral;
+import org.apache.calcite.sql.SqlIntervalLiteral.IntervalValue;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.type.SqlTypeFamily;
@@ -74,8 +75,7 @@ public class MaterializedTableUtils {
"Materialized table freshness only support SECOND, MINUTE,
HOUR, DAY as the time unit.");
}
- SqlIntervalLiteral.IntervalValue intervalValue =
-
sqlIntervalLiteral.getValueAs(SqlIntervalLiteral.IntervalValue.class);
+ IntervalValue intervalValue =
sqlIntervalLiteral.getValueAs(IntervalValue.class);
String interval = intervalValue.getIntervalLiteral();
switch (intervalValue.getIntervalQualifier().typeName()) {
case INTERVAL_DAY:
@@ -127,17 +127,18 @@ public class MaterializedTableUtils {
// ALTER MATERIALIZED TABLE ... AS ...
public static List<TableChange> buildSchemaTableChanges(
ResolvedSchema oldSchema, ResolvedSchema newSchema) {
+ if (!isSchemaChanged(oldSchema, newSchema)) {
+ return List.of();
+ }
+
final List<Column> oldColumns = oldSchema.getColumns();
- final List<Column> newColumns = newSchema.getColumns();
- int persistedColumnOffset = 0;
final Map<String, Tuple2<Column, Integer>> oldColumnSet = new
HashMap<>();
for (int i = 0; i < oldColumns.size(); i++) {
Column column = oldColumns.get(i);
- if (!column.isPersisted()) {
- persistedColumnOffset++;
- }
oldColumnSet.put(column.getName(), Tuple2.of(oldColumns.get(i),
i));
}
+ // Schema retrieved from query doesn't count existing non persisted
columns
+ final List<Column> newColumns = newSchema.getColumns();
List<TableChange> changes = new ArrayList<>();
for (int i = 0; i < newColumns.size(); i++) {
@@ -150,8 +151,7 @@ public class MaterializedTableUtils {
}
// Check if position changed
- applyPositionChanges(
- newColumns, oldColumnToPosition, i +
persistedColumnOffset, changes);
+ applyPositionChanges(newColumns, oldColumnToPosition, i, changes);
Column oldColumn = oldColumnToPosition.f0;
// Check if column changed
@@ -208,6 +208,33 @@ public class MaterializedTableUtils {
return changes;
}
+ // Since it is only for query change, then check only persisted columns
which could be
+ // changed/added/dropped with such change
+ private static boolean isSchemaChanged(ResolvedSchema oldSchema,
ResolvedSchema newSchema) {
+ List<Column> oldPersistedColumns =
+ oldSchema.getColumns().stream()
+ .filter(Column::isPersisted)
+ .collect(Collectors.toList());
+ if (oldPersistedColumns.size() != newSchema.getColumnCount()) {
+ return true;
+ }
+ for (int i = 0; i < oldPersistedColumns.size(); i++) {
+ Column oldColumn = oldPersistedColumns.get(i);
+ Column newColumn = newSchema.getColumn(i).get();
+ if (!oldColumn.getName().equals(newColumn.getName())) {
+ return true;
+ }
+ if (!newColumn
+ .getDataType()
+ .getLogicalType()
+ .equals(oldColumn.getDataType().getLogicalType())) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
private static void applyPositionChanges(
List<Column> newColumns,
Tuple2<Column, Integer> oldColumnToPosition,
@@ -290,7 +317,7 @@ public class MaterializedTableUtils {
}
public static ResolvedSchema getQueryOperationResolvedSchema(
- ResolvedCatalogMaterializedTable oldTable, ConvertContext context)
{
+ CatalogMaterializedTable oldTable, ConvertContext context) {
final SqlNode originalQuery =
context.getFlinkPlanner().parser().parse(oldTable.getOriginalQuery());
final SqlNode validateQuery =
context.getSqlValidator().validate(originalQuery);
@@ -302,7 +329,7 @@ public class MaterializedTableUtils {
}
public static void validatePersistedColumnsUsedByQuery(
- ResolvedCatalogMaterializedTable oldTable,
+ CatalogMaterializedTable oldTable,
SqlAlterMaterializedTableSchema alterTableSchema,
ConvertContext context) {
final SqlNodeList sqlNodeList = alterTableSchema.getColumnPositions();
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
index 66b338aebb6..2d5aecfff4a 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
@@ -1553,9 +1553,13 @@ class SqlDdlToOperationConverterTest extends
SqlNodeToOperationConversionTestBas
"tb2", false, 1, TableDistribution.of(Kind.HASH, 1,
List.of("a")), "SELECT 1");
assertThatThrownBy(
- () ->
- parse(
- "alter materialized table cat1.db1.tb2
add distribution into 3 buckets"))
+ () -> {
+ AlterMaterializedTableChangeOperation
tableChangeOperation =
+ (AlterMaterializedTableChangeOperation)
+ parse(
+ "alter materialized table
cat1.db1.tb2 add distribution into 3 buckets");
+ tableChangeOperation.getTableChanges();
+ })
.isInstanceOf(ValidationException.class)
.hasMessageContaining(
"The current materialized table has already defined
the distribution "
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
index e56180d633c..9b928b4333d 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
@@ -50,6 +50,7 @@ import
org.apache.flink.table.operations.materializedtable.AlterMaterializedTabl
import
org.apache.flink.table.operations.materializedtable.AlterMaterializedTableSuspendOperation;
import
org.apache.flink.table.operations.materializedtable.CreateMaterializedTableOperation;
import
org.apache.flink.table.operations.materializedtable.DropMaterializedTableOperation;
+import
org.apache.flink.table.operations.materializedtable.FullAlterMaterializedTableOperation;
import org.apache.flink.table.planner.utils.TableFunc0;
import org.junit.jupiter.api.BeforeEach;
@@ -178,6 +179,19 @@ class SqlMaterializedTableNodeToOperationConverterTest
+ "AS SELECT 1";
createMaterializedTableInCatalog(sqlWithNonPersisted,
"base_mtbl_with_non_persisted");
+
+ // MATERIALIZED TABLE with non persisted columns last
+ final String sqlWithNonPersistedLast =
+ "CREATE MATERIALIZED TABLE base_mtbl_with_non_persisted_last
(\n"
+ + " a INT NOT NULL,"
+ + " c AS UPPER(CAST(a AS STRING))"
+ + ")\n"
+ + "FRESHNESS = INTERVAL '30' SECOND\n"
+ + "REFRESH_MODE = FULL\n"
+ + "AS SELECT 1 as a";
+
+ createMaterializedTableInCatalog(
+ sqlWithNonPersistedLast, "base_mtbl_with_non_persisted_last");
}
@Test
@@ -389,7 +403,7 @@ class SqlMaterializedTableNodeToOperationConverterTest
() -> {
AlterMaterializedTableChangeOperation operation =
(AlterMaterializedTableChangeOperation)
parse(spec.sql);
- operation.getMaterializedTableWithAppliedChanges();
+ operation.getNewTable();
})
.as(spec.sql)
.isInstanceOf(spec.expectedException)
@@ -408,8 +422,7 @@ class SqlMaterializedTableNodeToOperationConverterTest
void createAlterTableSuccessCase(TestSpec testSpec) {
AlterMaterializedTableChangeOperation operation =
(AlterMaterializedTableChangeOperation) parse(testSpec.sql);
- CatalogMaterializedTable catalogMaterializedTable =
- operation.getMaterializedTableWithAppliedChanges();
+ CatalogMaterializedTable catalogMaterializedTable =
operation.getNewTable();
assertThat(catalogMaterializedTable.getUnresolvedSchema())
.hasToString(testSpec.expectedSchema);
}
@@ -538,7 +551,7 @@ class SqlMaterializedTableNodeToOperationConverterTest
(CatalogMaterializedTable)
catalog.getTable(
new
ObjectPath(catalogManager.getCurrentDatabase(), "base_mtbl"));
- CatalogMaterializedTable newTable =
op.getMaterializedTableWithAppliedChanges();
+ CatalogMaterializedTable newTable = op.getNewTable();
assertThat(oldTable.getUnresolvedSchema()).isNotEqualTo(newTable.getUnresolvedSchema());
assertThat(oldTable.getUnresolvedSchema().getPrimaryKey())
@@ -652,10 +665,9 @@ class SqlMaterializedTableNodeToOperationConverterTest
+ "AS SELECT a, b, c, d, d as e, cast('123' as string)
as f FROM t3";
Operation operation = parse(sql);
-
assertThat(operation).isInstanceOf(AlterMaterializedTableAsQueryOperation.class);
+
assertThat(operation).isInstanceOf(FullAlterMaterializedTableOperation.class);
- AlterMaterializedTableAsQueryOperation op =
- (AlterMaterializedTableAsQueryOperation) operation;
+ FullAlterMaterializedTableOperation op =
(FullAlterMaterializedTableOperation) operation;
assertThat(op.getTableChanges())
.containsExactly(
TableChange.add(Column.physical("e",
DataTypes.VARCHAR(Integer.MAX_VALUE))),
@@ -668,15 +680,20 @@ class SqlMaterializedTableNodeToOperationConverterTest
TableChange.reset("connector"));
assertThat(operation.asSummaryString())
.isEqualTo(
- "ALTER MATERIALIZED TABLE builtin.default.base_mtbl AS
SELECT `t3`.`a`, `t3`.`b`, `t3`.`c`, `t3`.`d`, `t3`.`d` AS `e`, CAST('123' AS
STRING) AS `f`\n"
- + "FROM `builtin`.`default`.`t3` AS `t3`");
+ "CREATE OR ALTER MATERIALIZED TABLE
builtin.default.base_mtbl\n"
+ + " ADD `e` STRING ,\n"
+ + " ADD `f` STRING ,\n"
+ + " MODIFY DEFINITION QUERY TO 'SELECT
`t3`.`a`, `t3`.`b`, `t3`.`c`, `t3`.`d`, `t3`.`d` AS `e`, CAST('123' AS STRING)
AS `f`\n"
+ + "FROM `builtin`.`default`.`t3` AS `t3`',\n"
+ + " SET 'format' = 'json2',\n"
+ + " RESET 'connector'");
// new table only difference schema & definition query with old table.
CatalogMaterializedTable oldTable =
(CatalogMaterializedTable)
catalog.getTable(
new
ObjectPath(catalogManager.getCurrentDatabase(), "base_mtbl"));
- CatalogMaterializedTable newTable =
op.getMaterializedTableWithAppliedChanges();
+ CatalogMaterializedTable newTable = op.getNewTable();
assertThat(newTable.getOptions()).containsExactly(Map.entry("format",
"json2"));
assertThat(oldTable.getUnresolvedSchema()).isNotEqualTo(newTable.getUnresolvedSchema());
@@ -748,6 +765,11 @@ class SqlMaterializedTableNodeToOperationConverterTest
+ "renaming, and reordering columns are not
supported.\n"
+ "Column mismatch at position 3: Original
column is [`d` STRING], "
+ "but new column is [`d` STRING NOT NULL]."),
+ TestSpec.of(
+ "ALTER MATERIALIZED TABLE base_mtbl_with_non_persisted
AS SELECT '123'",
+ "ALTER query for MATERIALIZED TABLE "
+ + "with schema containing non persisted
columns is not supported, "
+ + "consider using CREATE OR ALTER MATERIALIZED
TABLE instead"),
TestSpec.of(
"ALTER MATERIALIZED TABLE t1 AS SELECT * FROM t1",
"ALTER MATERIALIZED TABLE for a table is not
allowed"));
@@ -1200,7 +1222,23 @@ class SqlMaterializedTableNodeToOperationConverterTest
list.add(
TestSpec.withExpectedSchema(
- "ALTER MATERIALIZED TABLE base_mtbl_with_non_persisted
AS SELECT 1",
+ "CREATE OR ALTER MATERIALIZED TABLE
base_mtbl_with_non_persisted_last AS SELECT 1 as a",
+ "(\n"
+ + " `c` AS [UPPER(CAST(`a` AS STRING))],\n"
+ + " `a` INT NOT NULL\n"
+ + ")"));
+
+ list.add(
+ TestSpec.withExpectedSchema(
+ "CREATE OR ALTER MATERIALIZED TABLE
base_mtbl_with_non_persisted_last AS SELECT 1 as a",
+ "(\n"
+ + " `c` AS [UPPER(CAST(`a` AS STRING))],\n"
+ + " `a` INT NOT NULL\n"
+ + ")"));
+
+ list.add(
+ TestSpec.withExpectedSchema(
+ "CREATE OR ALTER MATERIALIZED TABLE
base_mtbl_with_non_persisted AS SELECT 1",
"(\n"
+ " `m` STRING METADATA VIRTUAL,\n"
+ " `calc` AS ['a' || 'b'],\n"
@@ -1209,7 +1247,7 @@ class SqlMaterializedTableNodeToOperationConverterTest
list.add(
TestSpec.withExpectedSchema(
- "ALTER MATERIALIZED TABLE base_mtbl_with_non_persisted
AS SELECT 2, 'a' AS sec",
+ "CREATE OR ALTER MATERIALIZED TABLE
base_mtbl_with_non_persisted AS SELECT 2, 'a' AS sec",
"(\n"
+ " `m` STRING METADATA VIRTUAL,\n"
+ " `calc` AS ['a' || 'b'],\n"
@@ -1226,6 +1264,15 @@ class SqlMaterializedTableNodeToOperationConverterTest
+ " `EXPR$0` INT NOT NULL,\n"
+ " `sec` CHAR(1)\n"
+ ")"));
+
+ list.add(
+ TestSpec.withExpectedSchema(
+ // Schema doesn't change, so should be ok
+ "ALTER MATERIALIZED TABLE
base_mtbl_with_non_persisted_last AS SELECT 123 as a",
+ "(\n"
+ + " `c` AS [UPPER(CAST(`a` AS STRING))],\n"
+ + " `a` INT NOT NULL\n"
+ + ")"));
return list;
}
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/MaterializedTableUtilsTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/MaterializedTableUtilsTest.java
index 17ee036a51d..1859c0f531b 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/MaterializedTableUtilsTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/MaterializedTableUtilsTest.java
@@ -48,30 +48,10 @@ class MaterializedTableUtilsTest {
schema(physical("a", DataTypes.INT())),
schema(physical("a", DataTypes.INT())),
List.of()),
- TestSpec.of(
- schema(physical("a", DataTypes.INT())),
- schema(physical("a",
DataTypes.INT()).withComment("comment")),
- List.of(
- TableChange.modifyColumnComment(
- physical("a", DataTypes.INT()),
"comment"))),
TestSpec.of(
schema(physical("a",
DataTypes.INT()).withComment("comment")),
schema(physical("a",
DataTypes.INT()).withComment("comment")),
List.of()),
- TestSpec.of(
- schema(physical("a",
DataTypes.TIMESTAMP()).withComment("comment")),
- schema(physical("a",
DataTypes.TIMESTAMP()).withComment("comment 2")),
- List.of(
- TableChange.modifyColumnComment(
- physical("a",
DataTypes.TIMESTAMP()).withComment("comment"),
- "comment 2"))),
- TestSpec.of(
- schema(physical("a",
DataTypes.FLOAT()).withComment("comment")),
- schema(physical("a", DataTypes.FLOAT())),
- List.of(
- TableChange.modifyColumnComment(
- physical("a",
DataTypes.FLOAT()).withComment("comment"),
- null))),
TestSpec.of(
schema(physical("a",
DataTypes.INT()).withComment("comment")),
schema(physical("a2",
DataTypes.STRING()).withComment("comment 2")),