raminqaf commented on code in PR #28287:
URL: https://github.com/apache/flink/pull/28287#discussion_r3356901605
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java:
##########
@@ -78,23 +82,34 @@ private Operation handleCreateOrAlter(
final ObjectIdentifier identifier) {
final Optional<ResolvedCatalogBaseTable<?>> resolvedBaseTable =
context.getCatalogManager().getCatalogBaseTable(identifier);
- return resolvedBaseTable
- .map(
- oldBaseTable -> {
- if (oldBaseTable.getTableKind() !=
TableKind.MATERIALIZED_TABLE) {
- throw new ValidationException(
- String.format(
- "Table %s is not a
materialized table. Only materialized table support create or alter operation.",
- identifier.asSummaryString()));
- }
- return handleAlter(
- sqlCreateOrAlterMaterializedTable,
- (ResolvedCatalogMaterializedTable)
oldBaseTable,
- context,
- identifier);
- })
- .orElseGet(
- () -> handleCreate(sqlCreateOrAlterMaterializedTable,
context, identifier));
+ if (resolvedBaseTable.isEmpty()) {
+ return handleCreate(sqlCreateOrAlterMaterializedTable, context,
identifier);
+ }
+ final ResolvedCatalogBaseTable<?> oldBaseTable =
resolvedBaseTable.get();
+ switch (oldBaseTable.getTableKind()) {
+ case MATERIALIZED_TABLE:
+ return handleAlter(
+ sqlCreateOrAlterMaterializedTable,
+ (ResolvedCatalogMaterializedTable) oldBaseTable,
+ context,
+ identifier);
+ case VIEW:
+ throw new ValidationException(
+ String.format(
+ "Cannot convert view %s to a materialized
table.",
+ identifier.asSummaryString()));
+ case TABLE:
+ return handleConvert(
+ sqlCreateOrAlterMaterializedTable,
+ context,
+ identifier,
+ (ResolvedCatalogTable) oldBaseTable);
+ default:
+ throw new ValidationException(
+ String.format(
+ "Unsupported table kind %s for %s.",
Review Comment:
Reverted
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java:
##########
@@ -78,23 +82,34 @@ private Operation handleCreateOrAlter(
final ObjectIdentifier identifier) {
final Optional<ResolvedCatalogBaseTable<?>> resolvedBaseTable =
context.getCatalogManager().getCatalogBaseTable(identifier);
- return resolvedBaseTable
- .map(
- oldBaseTable -> {
- if (oldBaseTable.getTableKind() !=
TableKind.MATERIALIZED_TABLE) {
- throw new ValidationException(
- String.format(
- "Table %s is not a
materialized table. Only materialized table support create or alter operation.",
- identifier.asSummaryString()));
- }
- return handleAlter(
- sqlCreateOrAlterMaterializedTable,
- (ResolvedCatalogMaterializedTable)
oldBaseTable,
- context,
- identifier);
- })
- .orElseGet(
- () -> handleCreate(sqlCreateOrAlterMaterializedTable,
context, identifier));
+ if (resolvedBaseTable.isEmpty()) {
+ return handleCreate(sqlCreateOrAlterMaterializedTable, context,
identifier);
+ }
+ final ResolvedCatalogBaseTable<?> oldBaseTable =
resolvedBaseTable.get();
+ switch (oldBaseTable.getTableKind()) {
+ case MATERIALIZED_TABLE:
+ return handleAlter(
+ sqlCreateOrAlterMaterializedTable,
+ (ResolvedCatalogMaterializedTable) oldBaseTable,
+ context,
+ identifier);
+ case VIEW:
+ throw new ValidationException(
+ String.format(
+ "Cannot convert view %s to a materialized
table.",
Review Comment:
Removed the branch VIEW
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java:
##########
@@ -117,6 +132,147 @@ private Operation handleAlter(
currentTable -> buildNewTable(currentTable, mergeContext,
schemaResolver));
}
+ private Operation handleConvert(
+ final SqlCreateOrAlterMaterializedTable
sqlCreateOrAlterMaterializedTable,
+ final ConvertContext context,
+ final ObjectIdentifier identifier,
+ final ResolvedCatalogTable oldBaseTable) {
+ final boolean conversionEnabled =
+ context.getTableConfig()
+ .getRootConfiguration()
+
.get(TableConfigOptions.MATERIALIZED_TABLE_CONVERSION_FROM_TABLE_ENABLED);
+ if (!conversionEnabled) {
+ throw new ValidationException(
+ String.format(
+ "Table %s is not a materialized table. Only
materialized table support create or alter operation.",
+ identifier.asSummaryString()));
+ }
+ return handleConvert(sqlCreateOrAlterMaterializedTable, oldBaseTable,
context, identifier);
+ }
+
+ private Operation handleConvert(
Review Comment:
Inlined it into a single method
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java:
##########
@@ -117,6 +132,147 @@ private Operation handleAlter(
currentTable -> buildNewTable(currentTable, mergeContext,
schemaResolver));
}
+ private Operation handleConvert(
+ final SqlCreateOrAlterMaterializedTable
sqlCreateOrAlterMaterializedTable,
+ final ConvertContext context,
+ final ObjectIdentifier identifier,
+ final ResolvedCatalogTable oldBaseTable) {
+ final boolean conversionEnabled =
+ context.getTableConfig()
+ .getRootConfiguration()
+
.get(TableConfigOptions.MATERIALIZED_TABLE_CONVERSION_FROM_TABLE_ENABLED);
+ if (!conversionEnabled) {
+ throw new ValidationException(
+ String.format(
+ "Table %s is not a materialized table. Only
materialized table support create or alter operation.",
+ identifier.asSummaryString()));
+ }
+ return handleConvert(sqlCreateOrAlterMaterializedTable, oldBaseTable,
context, identifier);
+ }
+
+ private Operation handleConvert(
+ final SqlCreateOrAlterMaterializedTable sqlCreateOrAlterTable,
+ final ResolvedCatalogTable oldTable,
+ final ConvertContext context,
+ final ObjectIdentifier identifier) {
+ final MergeContext baseMergeContext =
getMergeContext(sqlCreateOrAlterTable, context);
+ final boolean ddlDeclaresWatermark =
sqlCreateOrAlterTable.getWatermark().isPresent();
+ final boolean ddlDeclaresPrimaryKey =
+ sqlCreateOrAlterTable.getFullConstraints().stream()
+ .anyMatch(SqlTableConstraint::isPrimaryKey);
+
+ final Schema mergedSchema = baseMergeContext.getMergedSchema();
+ final Schema finalSchema =
+ inheritWatermarkAndPrimaryKey(
+ mergedSchema,
+ oldTable.getResolvedSchema(),
+ ddlDeclaresWatermark,
+ ddlDeclaresPrimaryKey,
+ identifier);
+
+ final CatalogMaterializedTable newMaterializedTable =
+ CatalogMaterializedTable.newBuilder()
+ .schema(finalSchema)
+ .comment(baseMergeContext.getMergedComment())
+
.partitionKeys(baseMergeContext.getMergedPartitionKeys())
+ .options(baseMergeContext.getMergedTableOptions())
+
.originalQuery(baseMergeContext.getMergedOriginalQuery())
+
.expandedQuery(baseMergeContext.getMergedExpandedQuery())
+
.distribution(baseMergeContext.getMergedTableDistribution().orElse(null))
+ .freshness(baseMergeContext.getMergedFreshness())
+
.logicalRefreshMode(baseMergeContext.getMergedLogicalRefreshMode())
+ .refreshMode(baseMergeContext.getMergedRefreshMode())
+ .refreshStatus(RefreshStatus.INITIALIZING)
+ .startMode(baseMergeContext.getMergedStartMode())
+ .build();
+ final ResolvedCatalogMaterializedTable resolvedNewMaterializedTable =
+
context.getCatalogManager().resolveCatalogMaterializedTable(newMaterializedTable);
+
+ final List<TableChange> tableChanges =
+ buildConversionTableChanges(
Review Comment:
Done! passing a function into the Operation
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java:
##########
@@ -117,6 +132,147 @@ private Operation handleAlter(
currentTable -> buildNewTable(currentTable, mergeContext,
schemaResolver));
}
+ private Operation handleConvert(
+ final SqlCreateOrAlterMaterializedTable
sqlCreateOrAlterMaterializedTable,
+ final ConvertContext context,
+ final ObjectIdentifier identifier,
+ final ResolvedCatalogTable oldBaseTable) {
+ final boolean conversionEnabled =
+ context.getTableConfig()
+ .getRootConfiguration()
+
.get(TableConfigOptions.MATERIALIZED_TABLE_CONVERSION_FROM_TABLE_ENABLED);
+ if (!conversionEnabled) {
+ throw new ValidationException(
+ String.format(
+ "Table %s is not a materialized table. Only
materialized table support create or alter operation.",
+ identifier.asSummaryString()));
+ }
+ return handleConvert(sqlCreateOrAlterMaterializedTable, oldBaseTable,
context, identifier);
+ }
+
+ private Operation handleConvert(
+ final SqlCreateOrAlterMaterializedTable sqlCreateOrAlterTable,
+ final ResolvedCatalogTable oldTable,
+ final ConvertContext context,
+ final ObjectIdentifier identifier) {
+ final MergeContext baseMergeContext =
getMergeContext(sqlCreateOrAlterTable, context);
+ final boolean ddlDeclaresWatermark =
sqlCreateOrAlterTable.getWatermark().isPresent();
+ final boolean ddlDeclaresPrimaryKey =
+ sqlCreateOrAlterTable.getFullConstraints().stream()
+ .anyMatch(SqlTableConstraint::isPrimaryKey);
+
+ final Schema mergedSchema = baseMergeContext.getMergedSchema();
+ final Schema finalSchema =
+ inheritWatermarkAndPrimaryKey(
+ mergedSchema,
+ oldTable.getResolvedSchema(),
+ ddlDeclaresWatermark,
+ ddlDeclaresPrimaryKey,
+ identifier);
+
+ final CatalogMaterializedTable newMaterializedTable =
+ CatalogMaterializedTable.newBuilder()
+ .schema(finalSchema)
+ .comment(baseMergeContext.getMergedComment())
+
.partitionKeys(baseMergeContext.getMergedPartitionKeys())
+ .options(baseMergeContext.getMergedTableOptions())
+
.originalQuery(baseMergeContext.getMergedOriginalQuery())
+
.expandedQuery(baseMergeContext.getMergedExpandedQuery())
+
.distribution(baseMergeContext.getMergedTableDistribution().orElse(null))
+ .freshness(baseMergeContext.getMergedFreshness())
+
.logicalRefreshMode(baseMergeContext.getMergedLogicalRefreshMode())
+ .refreshMode(baseMergeContext.getMergedRefreshMode())
+ .refreshStatus(RefreshStatus.INITIALIZING)
+ .startMode(baseMergeContext.getMergedStartMode())
+ .build();
+ final ResolvedCatalogMaterializedTable resolvedNewMaterializedTable =
+
context.getCatalogManager().resolveCatalogMaterializedTable(newMaterializedTable);
+
+ final List<TableChange> tableChanges =
+ buildConversionTableChanges(
+ oldTable,
+ resolvedNewMaterializedTable,
+ baseMergeContext.hasSchemaDefinition(),
+ baseMergeContext.hasConstraintDefinition());
+
+ return new ConvertTableToMaterializedTableOperation(
+ identifier, oldTable, resolvedNewMaterializedTable,
tableChanges);
+ }
+
+ /**
+ * Layers the watermark and primary key from the source table onto the
merged schema when the
+ * conversion DDL does not declare them, preserving the "in-place"
semantics.
+ *
+ * <p>Inheritance is per-feature and independent of column-list presence:
the DDL declaring a
+ * watermark replaces the source watermark; omitting it carries the source
watermark over. The
+ * same rule applies to the primary key.
+ *
+ * <p>To drop an inherited watermark or primary key after conversion, the
user runs a follow-up
+ * {@code ALTER MATERIALIZED TABLE ... DROP WATERMARK} or {@code ... DROP
PRIMARY KEY}.
+ *
+ * @return the merged schema with the inherited watermark and / or primary
key folded in
+ * @throws ValidationException if the source table carries more than one
watermark
+ * specification, which materialized tables do not support
+ */
+ private static Schema inheritWatermarkAndPrimaryKey(
Review Comment:
That is a very good point. The idempotency is the real concern here. Removed
it and updated the docs.
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConvertTableToMaterializedTableTest.java:
##########
@@ -0,0 +1,459 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.operations;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.config.TableConfigOptions;
+import org.apache.flink.table.catalog.CatalogMaterializedTable;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.TableChange;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.expressions.SqlCallExpression;
+import org.apache.flink.table.operations.Operation;
+import
org.apache.flink.table.operations.materializedtable.ConvertTableToMaterializedTableOperation;
+import
org.apache.flink.table.operations.materializedtable.CreateMaterializedTableOperation;
+import
org.apache.flink.table.operations.materializedtable.FullAlterMaterializedTableOperation;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Tests for in-place conversion of a regular table to a materialized table
via {@code CREATE OR
+ * ALTER MATERIALIZED TABLE}.
+ */
+class SqlNodeToOperationConvertTableToMaterializedTableTest
+ extends SqlNodeToOperationConversionTestBase {
+
+ private static final String SOURCE_REGULAR_TABLE_NAME = "src_table";
+ private static final String EXISTING_MT_NAME = "existing_mt";
+
+ private static final String CREATE_EXISTING_MT_SQL =
+ "CREATE MATERIALIZED TABLE existing_mt (\n"
+ + " CONSTRAINT pk1 PRIMARY KEY(a) NOT ENFORCED\n"
+ + ")\n"
+ + "FRESHNESS = INTERVAL '1' MINUTE\n"
+ + "AS SELECT a, b FROM t1";
+
+ @BeforeEach
+ void before() throws TableAlreadyExistException, DatabaseNotExistException
{
+ super.before();
+ createSourceRegularTable(SOURCE_REGULAR_TABLE_NAME, false, false);
+ createTimestampSourceTable();
+ createExistingMaterializedTable();
+ }
+
+ //
--------------------------------------------------------------------------------------------
Review Comment:
Converted to `@Nested` tests
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]