AHeise commented on code in PR #28287:
URL: https://github.com/apache/flink/pull/28287#discussion_r3339304451
##########
docs/content/docs/sql/materialized-table/statements.md:
##########
@@ -260,6 +261,56 @@ The operation updates the materialized table similarly to [ALTER MATERIALIZED TA
See [ALTER MATERIALIZED TABLE AS](#as-select_statement-1) for more details.
+## Converting a Table to a Materialized Table
+
+`CREATE OR ALTER MATERIALIZED TABLE` can convert an existing regular table into a materialized table in place. The catalog object keeps its identity and underlying storage; only the table kind and the materialized-table metadata (query definition, freshness, refresh mode, and refresh status) change. After the conversion, a refresh job is launched just as it is for a newly created materialized table.
+
+This lets you adopt a materialized table on top of a table that already exists, without dropping and recreating it.
Review Comment:
Swap the sentences so that the technical explanation comes last.
##########
docs/content/docs/sql/materialized-table/statements.md:
##########
@@ -260,6 +261,56 @@ The operation updates the materialized table similarly to [ALTER MATERIALIZED TA
See [ALTER MATERIALIZED TABLE AS](#as-select_statement-1) for more details.
+## Converting a Table to a Materialized Table
+
+`CREATE OR ALTER MATERIALIZED TABLE` can convert an existing regular table into a materialized table in place. The catalog object keeps its identity and underlying storage; only the table kind and the materialized-table metadata (query definition, freshness, refresh mode, and refresh status) change. After the conversion, a refresh job is launched just as it is for a newly created materialized table.
+
+This lets you adopt a materialized table on top of a table that already exists, without dropping and recreating it.
+
+**Enabling conversion**
+
+Conversion is disabled by default. To enable it, set the following option in the cluster configuration file `config.yaml`:
+
+```yaml
+table.materialized-table.conversion-from-table.enabled: true
+```
+
+This is a cluster-wide setting: it must be set in the cluster configuration, and a session-level `SET` statement has no effect. When the option is disabled, `CREATE OR ALTER MATERIALIZED TABLE` against a regular table is rejected.
+
+**Watermark and primary key inheritance**
+
+When the conversion statement does not declare a `WATERMARK` or a `PRIMARY KEY`, the corresponding definition is inherited from the source table:
Review Comment:
Hm, what's the CoA behavior here? Wouldn't we want to drop anything that isn't declared?
##########
docs/layouts/shortcodes/generated/table_config_configuration.html:
##########
@@ -68,6 +68,12 @@
<td>String</td>
<td>The local time zone defines current session time zone id. It is used when converting to/from <code>TIMESTAMP WITH LOCAL TIME ZONE</code>. Internally, timestamps with local time zone are always represented in the UTC time zone. However, when converting to data types that don't include a time zone (e.g. TIMESTAMP, TIME, or simply STRING), the session time zone is used during conversion. The input of option is either a full name such as "America/Los_Angeles", or a custom timezone id such as "GMT-08:00".</td>
</tr>
+ <tr>
+ <td><h5>table.materialized-table.conversion-from-table.enabled</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>If enabled, CREATE OR ALTER MATERIALIZED TABLE against an existing regular table converts it in place to a materialized table, preserving identity and storage. This is a cluster-wide setting: it must be set in the cluster configuration (e.g. config.yaml); session-level SET has no effect. When disabled (the default), CREATE OR ALTER MATERIALIZED TABLE against a regular table is rejected.</td>
Review Comment:
"CREATE OR ALTER MATERIALIZED TABLE against" sounds weird. Use either "issuing CoA against" or "CREATE OR ALTER... on".
##########
docs/content.zh/docs/sql/materialized-table/statements.md:
##########
@@ -242,6 +242,7 @@ The `OR ALTER` clause provides create-or-update semantics:
- **If the materialized table does not exist**: Creates a new materialized table with the specified options
- **If the materialized table exists**: Modifies the query definition (behaves like `ALTER MATERIALIZED TABLE AS`)
+- **If a regular table with the same name exists**: Converts it in place into a materialized table, when enabled (see [Converting a Table to a Materialized Table](#converting-a-table-to-a-materialized-table))
Review Comment:
The whole section needs to be translated. Did you already file a translation ticket?
##########
docs/content/docs/sql/materialized-table/statements.md:
##########
@@ -260,6 +261,56 @@ The operation updates the materialized table similarly to [ALTER MATERIALIZED TA
See [ALTER MATERIALIZED TABLE AS](#as-select_statement-1) for more details.
+## Converting a Table to a Materialized Table
+
+`CREATE OR ALTER MATERIALIZED TABLE` can convert an existing regular table into a materialized table in place. The catalog object keeps its identity and underlying storage; only the table kind and the materialized-table metadata (query definition, freshness, refresh mode, and refresh status) change. After the conversion, a refresh job is launched just as it is for a newly created materialized table.
+
+This lets you adopt a materialized table on top of a table that already exists, without dropping and recreating it.
+
+**Enabling conversion**
+
+Conversion is disabled by default. To enable it, set the following option in the cluster configuration file `config.yaml`:
+
+```yaml
+table.materialized-table.conversion-from-table.enabled: true
+```
+
+This is a cluster-wide setting: it must be set in the cluster configuration, and a session-level `SET` statement has no effect. When the option is disabled, `CREATE OR ALTER MATERIALIZED TABLE` against a regular table is rejected.
+
+**Watermark and primary key inheritance**
+
+When the conversion statement does not declare a `WATERMARK` or a `PRIMARY KEY`, the corresponding definition is inherited from the source table:
+
+- Omitting the watermark carries over the source table's watermark; declaring one replaces it.
+- Omitting the primary key carries over the source table's primary key; declaring one replaces it.
+
+Inheritance is applied independently for each, regardless of whether a column list is provided. To drop an inherited watermark or primary key after conversion, run a follow-up `ALTER MATERIALIZED TABLE`. A source table that defines more than one watermark cannot be converted; declare an explicit watermark in the conversion statement instead.
+
+**Example**
+
+```sql
+-- An existing regular table that you now want to maintain as a materialized table
+CREATE TABLE user_spending (
+ user_id BIGINT,
+ total_amount BIGINT
+) WITH (
+ 'connector' = '...'
+);
+
+-- Convert it in place; from now on it is refreshed by the query
+CREATE OR ALTER MATERIALIZED TABLE user_spending
+ AS SELECT
+ user_id,
+ SUM(amount) AS total_amount
+ FROM orders
+ GROUP BY user_id;
+```
+
+<span class="label label-danger">Note</span>
+- The conversion is one-way and cannot be undone. To revert, drop the materialized table and recreate the original table.
Review Comment:
First explain that you can simply suspend the materialized table and use it as if it were a regular table for input/output.
##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -302,6 +307,119 @@ private void createMaterializedTableInFullMode(
}
}
+ private ResultFetcher callConvertTableToMaterializedTableOperation(
+ OperationExecutor operationExecutor,
+ OperationHandle handle,
+ ConvertTableToMaterializedTableOperation convertOperation) {
+ ResolvedCatalogMaterializedTable materializedTable = convertOperation.getMaterializedTable();
+ if (RefreshMode.CONTINUOUS == materializedTable.getRefreshMode()) {
+ convertTableToMaterializedTableInContinuousMode(
+ operationExecutor, handle, convertOperation);
+ } else {
+ convertTableToMaterializedTableInFullMode(operationExecutor, handle, convertOperation);
+ }
+ // Just return ok for unify different refresh job info of continuous and full mode, user
+ // should get the refresh job info via desc table.
+ return ResultFetcher.fromTableResult(handle, TABLE_RESULT_OK, false);
+ }
+
+ private void convertTableToMaterializedTableInContinuousMode(
+ OperationExecutor operationExecutor,
+ OperationHandle handle,
+ ConvertTableToMaterializedTableOperation convertOperation) {
+ // swap the catalog entry from a regular table to a materialized table first
+ operationExecutor.callExecutableOperation(handle, convertOperation);
+
+ ObjectIdentifier materializedTableIdentifier = convertOperation.getTableIdentifier();
+ ResolvedCatalogMaterializedTable catalogMaterializedTable =
+ convertOperation.getMaterializedTable();
+
+ try {
+ executeContinuousRefreshJob(
+ operationExecutor,
+ handle,
+ catalogMaterializedTable,
+ materializedTableIdentifier,
+ Collections.emptyMap(),
+ Optional.empty());
+ } catch (Exception e) {
+ restoreOriginalTable(operationExecutor, handle, convertOperation);
+ throw new SqlExecutionException(
+ String.format(
+ "Failed to submit continuous refresh job when converting table %s to a materialized table.",
+ materializedTableIdentifier),
+ e);
+ }
+ }
+
+ private void convertTableToMaterializedTableInFullMode(
+ OperationExecutor operationExecutor,
+ OperationHandle handle,
+ ConvertTableToMaterializedTableOperation convertOperation) {
+ if (workflowScheduler == null) {
+ throw new SqlExecutionException(
+ "The workflow scheduler must be configured when converting a table to a materialized table in full refresh mode.");
+ }
+ // swap the catalog entry from a regular table to a materialized table first
+ operationExecutor.callExecutableOperation(handle, convertOperation);
+
+ ObjectIdentifier materializedTableIdentifier = convertOperation.getTableIdentifier();
+ ResolvedCatalogMaterializedTable catalogMaterializedTable =
+ convertOperation.getMaterializedTable();
+
+ final IntervalFreshness freshness = catalogMaterializedTable.getDefinitionFreshness();
+ String cronExpression = convertFreshnessToCron(freshness);
+ CreateRefreshWorkflow createRefreshWorkflow =
+ new CreatePeriodicRefreshWorkflow(
+ materializedTableIdentifier,
+ catalogMaterializedTable.getExpandedQuery(),
+ cronExpression,
+ getSessionInitializationConf(operationExecutor),
+ Collections.emptyMap(),
+ restEndpointUrl);
+
+ try {
+ RefreshHandler refreshHandler =
+ workflowScheduler.createRefreshWorkflow(createRefreshWorkflow);
+ RefreshHandlerSerializer refreshHandlerSerializer =
+ workflowScheduler.getRefreshHandlerSerializer();
+ byte[] serializedRefreshHandler = refreshHandlerSerializer.serialize(refreshHandler);
+
+ updateRefreshHandler(
+ operationExecutor,
+ handle,
+ materializedTableIdentifier,
+ catalogMaterializedTable,
+ RefreshStatus.ACTIVATED,
+ refreshHandler.asSummaryString(),
+ serializedRefreshHandler);
+ } catch (Exception e) {
Review Comment:
Extract the boilerplate into a helper; I see the same code in CREATE.
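Something along these lines could work. This is a minimal, self-contained sketch where the shared part lives in one place: run the scheduling step, run a compensation on failure, then wrap the cause. CREATE and both conversion paths would only pass in their own step, compensation and message. `RefreshSetupHelper`, `CheckedStep` and `runWithCompensation` are hypothetical names. `IllegalStateException` stands in for `SqlExecutionException` so the snippet compiles on its own.

```java
/**
 * Hypothetical helper: runs one refresh-setup step (job submission or workflow creation) and,
 * if it fails, runs a compensation action before rethrowing with a caller-specific message.
 */
final class RefreshSetupHelper {

    /** A step that may throw a checked exception, like the scheduler calls in the gateway. */
    @FunctionalInterface
    interface CheckedStep {
        void run() throws Exception;
    }

    private RefreshSetupHelper() {}

    /**
     * Runs {@code step}. On failure, runs {@code onFailure} first and then throws an unchecked
     * exception whose message is {@code messageTemplate} formatted with {@code tableName}.
     */
    static void runWithCompensation(
            CheckedStep step, Runnable onFailure, String messageTemplate, String tableName) {
        try {
            step.run();
        } catch (Exception e) {
            onFailure.run();
            // IllegalStateException stands in for SqlExecutionException here.
            throw new IllegalStateException(String.format(messageTemplate, tableName), e);
        }
    }
}
```

The full-mode path would pass the workflow creation plus `updateRefreshHandler` as the step. The continuous path would pass `executeContinuousRefreshJob`. Which compensation to use is a separate question.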
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java:
##########
@@ -383,6 +383,30 @@ default void alterTable(
alterTable(tablePath, newTable, ignoreIfNotExists);
}
+ /**
+ * Converts an existing {@link CatalogTable} to a {@link CatalogMaterializedTable} in place,
+ * preserving the catalog entry's identity and storage.
+ *
+ * <p>The default throws {@link UnsupportedOperationException}; catalogs that support in-place
+ * conversion override it. Launching the refresh job for the new materialized table is the
+ * executor's responsibility, not the catalog's.
+ *
+ * @param tableChanges structured delta between {@code originalTable} and {@code
Review Comment:
How can this be empty? Shouldn't there be modification changes for all the MT-specific properties?
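Here is a minimal sketch of what I mean. It is not the `TableChange` API. A diff of the MT-specific properties between the old and new definition can only come out empty if nothing changed. A regular table has none of these properties, so a conversion should yield one change per property. `ConversionChanges`, `ModifyProperty` and the property keys are hypothetical stand-ins. The keys follow the metadata listed in the docs: query definition, freshness, refresh mode and refresh status.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/** Hypothetical model of the MT-specific properties that a conversion introduces. */
final class ConversionChanges {

    /** Stand-in for a TableChange that sets one materialized-table property. */
    record ModifyProperty(String property, String newValue) {}

    private ConversionChanges() {}

    /**
     * Emits one change per MT-specific property whose value differs between the old and the new
     * definition. A regular table has none of these properties, so converting it yields a change
     * for every property the new definition sets.
     */
    static List<ModifyProperty> diff(Map<String, String> oldProps, Map<String, String> newProps) {
        List<ModifyProperty> changes = new ArrayList<>();
        for (Map.Entry<String, String> e : newProps.entrySet()) {
            if (!Objects.equals(oldProps.get(e.getKey()), e.getValue())) {
                changes.add(new ModifyProperty(e.getKey(), e.getValue()));
            }
        }
        return changes;
    }
}
```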
##########
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java:
##########
@@ -54,6 +59,63 @@ static void init() {
// ------ tables ------
+ @Test
+ void testConvertTableToMaterializedTable() throws Exception {
Review Comment:
Nested tests would work well here.
##########
flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalog.java:
##########
@@ -416,6 +416,40 @@ public void alterTable(
}
}
+ @Override
+ public void convertTableToMaterializedTable(
+ ObjectPath tablePath,
+ CatalogTable originalTable,
+ CatalogMaterializedTable materializedTable,
+ List<TableChange> tableChanges)
+ throws TableNotExistException, CatalogException {
+ if (!tableExists(tablePath)) {
+ throw new TableNotExistException(getName(), tablePath);
+ }
+
+ Tuple4<Path, Path, Path, String> tableSchemaInfo =
Review Comment:
Can we use a record yet?
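Records need Java 16+, so this only works if the module's minimum Java target allows it. If it does, a minimal sketch could look like the one below. The component names are guesses, since the meaning of each `Tuple4` field isn't visible in this hunk. `java.nio.file.Path` stands in for Flink's `Path` to keep it self-contained.

```java
import java.nio.file.Path;

/**
 * Hypothetical replacement for Tuple4<Path, Path, Path, String>: named accessors instead of
 * f0..f3, with equals/hashCode/toString generated by the compiler. Component names are guesses.
 */
record TableSchemaInfo(Path tableDir, Path schemaFile, Path dataDir, String schemaJson) {}
```

Call sites would then read `info.schemaFile()` instead of `tableSchemaInfo.f1`.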
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java:
##########
@@ -78,23 +82,34 @@ private Operation handleCreateOrAlter(
final ObjectIdentifier identifier) {
final Optional<ResolvedCatalogBaseTable<?>> resolvedBaseTable =
context.getCatalogManager().getCatalogBaseTable(identifier);
- return resolvedBaseTable
- .map(
- oldBaseTable -> {
- if (oldBaseTable.getTableKind() != TableKind.MATERIALIZED_TABLE) {
- throw new ValidationException(
- String.format(
- "Table %s is not a materialized table. Only materialized table support create or alter operation.",
- identifier.asSummaryString()));
- }
- return handleAlter(
- sqlCreateOrAlterMaterializedTable,
- (ResolvedCatalogMaterializedTable) oldBaseTable,
- context,
- identifier);
- })
- .orElseGet(
- () -> handleCreate(sqlCreateOrAlterMaterializedTable, context, identifier));
+ if (resolvedBaseTable.isEmpty()) {
+ return handleCreate(sqlCreateOrAlterMaterializedTable, context, identifier);
+ }
+ final ResolvedCatalogBaseTable<?> oldBaseTable = resolvedBaseTable.get();
+ switch (oldBaseTable.getTableKind()) {
+ case MATERIALIZED_TABLE:
+ return handleAlter(
+ sqlCreateOrAlterMaterializedTable,
+ (ResolvedCatalogMaterializedTable) oldBaseTable,
+ context,
+ identifier);
+ case VIEW:
+ throw new ValidationException(
+ String.format(
+ "Cannot convert view %s to a materialized table.",
+ identifier.asSummaryString()));
+ case TABLE:
+ return handleConvert(
+ sqlCreateOrAlterMaterializedTable,
+ context,
+ identifier,
+ (ResolvedCatalogTable) oldBaseTable);
+ default:
+ throw new ValidationException(
+ String.format(
+ "Unsupported table kind %s for %s.",
Don't change the error message unnecessarily.
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConvertTableToMaterializedTableTest.java:
##########
@@ -0,0 +1,459 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.operations;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.config.TableConfigOptions;
+import org.apache.flink.table.catalog.CatalogMaterializedTable;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.TableChange;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.expressions.SqlCallExpression;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.materializedtable.ConvertTableToMaterializedTableOperation;
+import org.apache.flink.table.operations.materializedtable.CreateMaterializedTableOperation;
+import org.apache.flink.table.operations.materializedtable.FullAlterMaterializedTableOperation;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Tests for in-place conversion of a regular table to a materialized table via {@code CREATE OR
+ * ALTER MATERIALIZED TABLE}.
+ */
+class SqlNodeToOperationConvertTableToMaterializedTableTest
+ extends SqlNodeToOperationConversionTestBase {
+
+ private static final String SOURCE_REGULAR_TABLE_NAME = "src_table";
+ private static final String EXISTING_MT_NAME = "existing_mt";
+
+ private static final String CREATE_EXISTING_MT_SQL =
+ "CREATE MATERIALIZED TABLE existing_mt (\n"
+ + " CONSTRAINT pk1 PRIMARY KEY(a) NOT ENFORCED\n"
+ + ")\n"
+ + "FRESHNESS = INTERVAL '1' MINUTE\n"
+ + "AS SELECT a, b FROM t1";
+
+ @BeforeEach
+ void before() throws TableAlreadyExistException, DatabaseNotExistException {
+ super.before();
+ createSourceRegularTable(SOURCE_REGULAR_TABLE_NAME, false, false);
+ createTimestampSourceTable();
+ createExistingMaterializedTable();
+ }
+
+ // --------------------------------------------------------------------------------------------
Review Comment:
Nested tests are superior to comment blocks...
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java:
##########
@@ -117,6 +132,147 @@ private Operation handleAlter(
currentTable -> buildNewTable(currentTable, mergeContext, schemaResolver));
}
+ private Operation handleConvert(
+ final SqlCreateOrAlterMaterializedTable sqlCreateOrAlterMaterializedTable,
+ final ConvertContext context,
+ final ObjectIdentifier identifier,
+ final ResolvedCatalogTable oldBaseTable) {
+ final boolean conversionEnabled =
+ context.getTableConfig()
+ .getRootConfiguration()
+ .get(TableConfigOptions.MATERIALIZED_TABLE_CONVERSION_FROM_TABLE_ENABLED);
+ if (!conversionEnabled) {
+ throw new ValidationException(
+ String.format(
+ "Table %s is not a materialized table. Only materialized table support create or alter operation.",
+ identifier.asSummaryString()));
+ }
+ return handleConvert(sqlCreateOrAlterMaterializedTable, oldBaseTable, context, identifier);
+ }
+
+ private Operation handleConvert(
+ final SqlCreateOrAlterMaterializedTable sqlCreateOrAlterTable,
+ final ResolvedCatalogTable oldTable,
+ final ConvertContext context,
+ final ObjectIdentifier identifier) {
+ final MergeContext baseMergeContext = getMergeContext(sqlCreateOrAlterTable, context);
+ final boolean ddlDeclaresWatermark = sqlCreateOrAlterTable.getWatermark().isPresent();
+ final boolean ddlDeclaresPrimaryKey =
+ sqlCreateOrAlterTable.getFullConstraints().stream()
+ .anyMatch(SqlTableConstraint::isPrimaryKey);
+
+ final Schema mergedSchema = baseMergeContext.getMergedSchema();
+ final Schema finalSchema =
+ inheritWatermarkAndPrimaryKey(
+ mergedSchema,
+ oldTable.getResolvedSchema(),
+ ddlDeclaresWatermark,
+ ddlDeclaresPrimaryKey,
+ identifier);
+
+ final CatalogMaterializedTable newMaterializedTable =
+ CatalogMaterializedTable.newBuilder()
+ .schema(finalSchema)
+ .comment(baseMergeContext.getMergedComment())
+ .partitionKeys(baseMergeContext.getMergedPartitionKeys())
+ .options(baseMergeContext.getMergedTableOptions())
+ .originalQuery(baseMergeContext.getMergedOriginalQuery())
+ .expandedQuery(baseMergeContext.getMergedExpandedQuery())
+ .distribution(baseMergeContext.getMergedTableDistribution().orElse(null))
+ .freshness(baseMergeContext.getMergedFreshness())
+ .logicalRefreshMode(baseMergeContext.getMergedLogicalRefreshMode())
+ .refreshMode(baseMergeContext.getMergedRefreshMode())
+ .refreshStatus(RefreshStatus.INITIALIZING)
+ .startMode(baseMergeContext.getMergedStartMode())
+ .build();
+ final ResolvedCatalogMaterializedTable resolvedNewMaterializedTable =
+ context.getCatalogManager().resolveCatalogMaterializedTable(newMaterializedTable);
+
+ final List<TableChange> tableChanges =
+ buildConversionTableChanges(
+ oldTable,
+ resolvedNewMaterializedTable,
+ baseMergeContext.hasSchemaDefinition(),
+ baseMergeContext.hasConstraintDefinition());
+
+ return new ConvertTableToMaterializedTableOperation(
+ identifier, oldTable, resolvedNewMaterializedTable, tableChanges);
+ }
+
+ /**
+ * Layers the watermark and primary key from the source table onto the merged schema when the
+ * conversion DDL does not declare them, preserving the "in-place" semantics.
+ *
+ * <p>Inheritance is per-feature and independent of column-list presence: the DDL declaring a
+ * watermark replaces the source watermark; omitting it carries the source watermark over. The
+ * same rule applies to the primary key.
+ *
+ * <p>To drop an inherited watermark or primary key after conversion, the user runs a follow-up
+ * {@code ALTER MATERIALIZED TABLE ... DROP WATERMARK} or {@code ... DROP PRIMARY KEY}.
+ *
+ * @return the merged schema with the inherited watermark and / or primary key folded in
+ * @throws ValidationException if the source table carries more than one watermark
+ * specification, which materialized tables do not support
+ */
+ private static Schema inheritWatermarkAndPrimaryKey(
Review Comment:
I don't agree with this part. Was that in the FLIP? This destroys the
idempotency mindset that we have for CoA.
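To make the concern concrete, here is a minimal sketch with hypothetical names, where watermarks are modelled as plain strings. With declarative resolution, the outcome of a statement that omits the watermark depends only on the statement. With the inheriting resolution in this PR, the same statement produces different tables depending on what existed before. The primary key behaves the same way.

```java
import java.util.Optional;

/** Hypothetical contrast between two ways CoA could resolve a watermark (same for the PK). */
final class WatermarkResolution {

    private WatermarkResolution() {}

    /** Declarative: the result is determined by the statement alone; {@code existing} is ignored. */
    static Optional<String> declarative(Optional<String> declared, Optional<String> existing) {
        return declared;
    }

    /** Inheriting (as in this PR): an omitted watermark falls back to the existing one. */
    static Optional<String> inheriting(Optional<String> declared, Optional<String> existing) {
        return declared.isPresent() ? declared : existing;
    }
}
```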
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java:
##########
@@ -117,6 +132,147 @@ private Operation handleAlter(
+ private static Schema inheritWatermarkAndPrimaryKey(
Review Comment:
Note that these two table properties in particular matter to consumers only, so it's a hard sell that inheriting them is needed to keep the data intact, imho.
##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -302,6 +307,119 @@ private void createMaterializedTableInFullMode(
+ try {
+ RefreshHandler refreshHandler =
+ workflowScheduler.createRefreshWorkflow(createRefreshWorkflow);
+ RefreshHandlerSerializer refreshHandlerSerializer =
+ workflowScheduler.getRefreshHandlerSerializer();
+ byte[] serializedRefreshHandler = refreshHandlerSerializer.serialize(refreshHandler);
+
+ updateRefreshHandler(
+ operationExecutor,
+ handle,
+ materializedTableIdentifier,
+ catalogMaterializedTable,
+ RefreshStatus.ACTIVATED,
+ refreshHandler.asSummaryString(),
+ serializedRefreshHandler);
+ } catch (Exception e) {
+ restoreOriginalTable(operationExecutor, handle, convertOperation);
+ throw new SqlExecutionException(
+ String.format(
+ "Failed to create refresh workflow when converting table %s to a materialized table.",
+ materializedTableIdentifier),
+ e);
+ }
+ }
+
+ /**
+ * Best-effort restore of the original regular table after a failed conversion. The catalog
+ * entry was already swapped to a materialized table, so this drops it and recreates the
+ * original table. This is not atomic: if the recreate fails, the entry is left dropped.
+ */
Review Comment:
This doesn't look like the expected behavior to me. A user starts the conversion specifically to retain the data, and now you drop it...
Can we instead start suspended if scheduling fails? Alternatively, the catalog needs to actually support converting the table back.
Note that the architecturally sounder solution would be to also move the scheduling into the catalog, so that the catalog can do the rollback itself.
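Roughly what I have in mind for the first option, as a minimal sketch. The conversion keeps the catalog entry it already swapped. A failed scheduling step only changes the outcome to `SUSPENDED` and hands the error back so it can still be reported. `ConversionFallback`, `Scheduler` and `Outcome` are hypothetical. The enum is a local mirror of the refresh statuses so the snippet compiles on its own.

```java
/**
 * Hypothetical sketch: if scheduling the refresh fails after the catalog entry has been swapped,
 * keep the materialized table (and its data) and mark it SUSPENDED instead of dropping it.
 */
final class ConversionFallback {

    /** Local mirror of the refresh statuses; not the Flink enum. */
    enum RefreshStatus { INITIALIZING, ACTIVATED, SUSPENDED }

    /** Stand-in for the refresh job submission or workflow creation. */
    @FunctionalInterface
    interface Scheduler {
        void schedule() throws Exception;
    }

    /** Final status plus the scheduling error, if any, so it can still be reported. */
    record Outcome(RefreshStatus status, Exception failure) {}

    private ConversionFallback() {}

    static Outcome scheduleOrSuspend(Scheduler scheduler) {
        try {
            scheduler.schedule();
            return new Outcome(RefreshStatus.ACTIVATED, null);
        } catch (Exception e) {
            // The table is already a materialized table: leave it in place so no data is lost,
            // and let the user fix the cause and resume it later.
            return new Outcome(RefreshStatus.SUSPENDED, e);
        }
    }
}
```

The gateway would then persist the status and surface `failure` to the user instead of calling `restoreOriginalTable`. `updateRefreshHandler`, which already takes a `RefreshStatus`, is one possible way to persist it.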
##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java:
##########
@@ -99,6 +99,68 @@
*/
class MaterializedTableStatementITCase extends AbstractMaterializedTableStatementITCase {
+ @Test
+ void testConvertTableToMaterializedTableInContinuousMode() throws Exception {
Review Comment:
Please also add one negative test where things fail (this might be easier in a unit test).
##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java:
##########
@@ -1404,6 +1404,44 @@ public void alterTable(
"AlterTable");
}
+ /**
+ * Converts an existing regular table to a materialized table in place. Identity and storage are
+ * preserved; only the kind and the materialized-table specific metadata change.
+ *
+ * @param originalTable the existing regular table
+ * @param materializedTable the new materialized table definition
+ * @param changes describe the modification from originalTable to materializedTable
+ * @param objectIdentifier fully qualified path of the table being converted
+ */
+ public void convertTableToMaterializedTable(
+ CatalogTable originalTable,
+ CatalogMaterializedTable materializedTable,
+ List<TableChange> changes,
+ ObjectIdentifier objectIdentifier) {
+ execute(
+ (catalog, path) -> {
+ final CatalogTable resolvedOriginal =
+ (CatalogTable) resolveCatalogBaseTable(originalTable);
+ final CatalogMaterializedTable resolvedMt =
+ (CatalogMaterializedTable) resolveCatalogBaseTable(materializedTable);
+ catalog.convertTableToMaterializedTable(
+ path, resolvedOriginal, resolvedMt, changes);
+ catalogModificationListeners.forEach(
+ listener ->
+ listener.onEvent(
+ AlterTableEvent.createEvent(
Review Comment:
New event type? Or does `AlterTableEvent` suffice?
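A rough sketch of the trade-off behind this question, using hypothetical event classes rather than Flink's real listener event types: with a dedicated event type, a listener can tell a kind change apart from an ordinary alter, while a listener written only for alter events would not react to it. `ModificationEvent`, `AlterEvent`, `ConvertToMaterializedTableEvent`, and `EventBus` are all illustrative names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical event types for illustration only; not Flink's listener API.
interface ModificationEvent {
    String identifier();
}

final class AlterEvent implements ModificationEvent {
    private final String identifier;

    AlterEvent(String identifier) {
        this.identifier = identifier;
    }

    @Override
    public String identifier() {
        return identifier;
    }
}

// A dedicated type makes the kind change (TABLE -> MATERIALIZED_TABLE) explicit to listeners.
final class ConvertToMaterializedTableEvent implements ModificationEvent {
    private final String identifier;

    ConvertToMaterializedTableEvent(String identifier) {
        this.identifier = identifier;
    }

    @Override
    public String identifier() {
        return identifier;
    }
}

final class EventBus {
    private final List<Consumer<ModificationEvent>> listeners = new ArrayList<>();

    void register(Consumer<ModificationEvent> listener) {
        listeners.add(listener);
    }

    void fire(ModificationEvent event) {
        listeners.forEach(listener -> listener.accept(event));
    }
}
```

In this model, reusing the alter event keeps existing listeners working unchanged, while a new type trades that for explicitness.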
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java:
##########
@@ -78,23 +82,34 @@ private Operation handleCreateOrAlter(
final ObjectIdentifier identifier) {
final Optional<ResolvedCatalogBaseTable<?>> resolvedBaseTable =
context.getCatalogManager().getCatalogBaseTable(identifier);
- return resolvedBaseTable
- .map(
- oldBaseTable -> {
- if (oldBaseTable.getTableKind() != TableKind.MATERIALIZED_TABLE) {
- throw new ValidationException(
- String.format(
- "Table %s is not a materialized table. Only materialized table support create or alter operation.",
- identifier.asSummaryString()));
- }
- return handleAlter(
- sqlCreateOrAlterMaterializedTable,
- (ResolvedCatalogMaterializedTable) oldBaseTable,
- context,
- identifier);
- })
- .orElseGet(
- () -> handleCreate(sqlCreateOrAlterMaterializedTable, context, identifier));
+ if (resolvedBaseTable.isEmpty()) {
+ return handleCreate(sqlCreateOrAlterMaterializedTable, context, identifier);
+ }
+ final ResolvedCatalogBaseTable<?> oldBaseTable = resolvedBaseTable.get();
+ switch (oldBaseTable.getTableKind()) {
+ case MATERIALIZED_TABLE:
+ return handleAlter(
+ sqlCreateOrAlterMaterializedTable,
+ (ResolvedCatalogMaterializedTable) oldBaseTable,
+ context,
+ identifier);
+ case VIEW:
+ throw new ValidationException(
+ String.format(
+ "Cannot convert view %s to a materialized table.",
Review Comment:
Don't change the error message unnecessarily; just collapse this into the default case.
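A minimal sketch of the suggested shape, with a stand-in `TableKind` enum and strings in place of the real `Operation` results; `IllegalArgumentException` stands in for Flink's `ValidationException` to keep it self-contained. Every kind other than the two handled ones falls through to `default`, which keeps the original message.

```java
// Hypothetical stand-in for Flink's table kind enum; only the constants needed here.
enum TableKind { TABLE, VIEW, MATERIALIZED_TABLE }

final class CreateOrAlterDispatch {
    static String dispatch(TableKind kind, String identifier) {
        switch (kind) {
            case MATERIALIZED_TABLE:
                return "alter";
            case TABLE:
                return "convert";
            default:
                // Every other kind (e.g. VIEW) keeps the pre-existing error message unchanged.
                throw new IllegalArgumentException(
                        String.format(
                                "Table %s is not a materialized table. Only materialized table support create or alter operation.",
                                identifier));
        }
    }
}
```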
##########
docs/content/docs/sql/materialized-table/statements.md:
##########
@@ -260,6 +261,56 @@ The operation updates the materialized table similarly to [ALTER MATERIALIZED TA
See [ALTER MATERIALIZED TABLE AS](#as-select_statement-1) for more details.
+## Converting a Table to a Materialized Table
+
+`CREATE OR ALTER MATERIALIZED TABLE` can convert an existing regular table into a materialized table in place. The catalog object keeps its identity and underlying storage; only the table kind and the materialized-table metadata (query definition, freshness, refresh mode, and refresh status) change. After the conversion, a refresh job is launched just as it is for a newly created materialized table.
+
+This lets you adopt a materialized table on top of a table that already exists, without dropping and recreating it.
+
+**Enabling conversion**
+
+Conversion is disabled by default. To enable it, set the following option in the cluster configuration file `config.yaml`:
Review Comment:
Add: what happens if it's disabled?
##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/AbstractMaterializedTableStatementITCase.java:
##########
@@ -157,6 +159,16 @@ static void setUp(@TempDir Path temporaryFolder) throws Exception {
.build();
}
+ private static Configuration getGatewayConfiguration() {
Review Comment:
This also prevents you from IT-testing the behavior when the flag is off. I recommend forking out a ConversionIT, which is the only place where you set the flag. Then you can test the disabled-flag behavior in the existing test and move the current test to the new file.
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java:
##########
@@ -117,6 +132,147 @@ private Operation handleAlter(
currentTable -> buildNewTable(currentTable, mergeContext, schemaResolver));
}
+ private Operation handleConvert(
+ final SqlCreateOrAlterMaterializedTable sqlCreateOrAlterMaterializedTable,
+ final ConvertContext context,
+ final ObjectIdentifier identifier,
+ final ResolvedCatalogTable oldBaseTable) {
+ final boolean conversionEnabled =
+ context.getTableConfig()
+ .getRootConfiguration()
+ .get(TableConfigOptions.MATERIALIZED_TABLE_CONVERSION_FROM_TABLE_ENABLED);
+ if (!conversionEnabled) {
+ throw new ValidationException(
+ String.format(
+ "Table %s is not a materialized table. Only materialized table support create or alter operation.",
+ identifier.asSummaryString()));
+ }
+ return handleConvert(sqlCreateOrAlterMaterializedTable, oldBaseTable, context, identifier);
+ }
+
+ private Operation handleConvert(
+ final SqlCreateOrAlterMaterializedTable sqlCreateOrAlterTable,
+ final ResolvedCatalogTable oldTable,
+ final ConvertContext context,
+ final ObjectIdentifier identifier) {
+ final MergeContext baseMergeContext = getMergeContext(sqlCreateOrAlterTable, context);
+ final boolean ddlDeclaresWatermark = sqlCreateOrAlterTable.getWatermark().isPresent();
+ final boolean ddlDeclaresPrimaryKey =
+ sqlCreateOrAlterTable.getFullConstraints().stream()
+ .anyMatch(SqlTableConstraint::isPrimaryKey);
+
+ final Schema mergedSchema = baseMergeContext.getMergedSchema();
+ final Schema finalSchema =
+ inheritWatermarkAndPrimaryKey(
+ mergedSchema,
+ oldTable.getResolvedSchema(),
+ ddlDeclaresWatermark,
+ ddlDeclaresPrimaryKey,
+ identifier);
+
+ final CatalogMaterializedTable newMaterializedTable =
+ CatalogMaterializedTable.newBuilder()
+ .schema(finalSchema)
+ .comment(baseMergeContext.getMergedComment())
+ .partitionKeys(baseMergeContext.getMergedPartitionKeys())
+ .options(baseMergeContext.getMergedTableOptions())
+ .originalQuery(baseMergeContext.getMergedOriginalQuery())
+ .expandedQuery(baseMergeContext.getMergedExpandedQuery())
+ .distribution(baseMergeContext.getMergedTableDistribution().orElse(null))
+ .freshness(baseMergeContext.getMergedFreshness())
+ .logicalRefreshMode(baseMergeContext.getMergedLogicalRefreshMode())
+ .refreshMode(baseMergeContext.getMergedRefreshMode())
+ .refreshStatus(RefreshStatus.INITIALIZING)
+ .startMode(baseMergeContext.getMergedStartMode())
+ .build();
+ final ResolvedCatalogMaterializedTable resolvedNewMaterializedTable =
+ context.getCatalogManager().resolveCatalogMaterializedTable(newMaterializedTable);
+
+ final List<TableChange> tableChanges =
+ buildConversionTableChanges(
Review Comment:
Build the changes lazily, like the CoA (CREATE OR ALTER) path does.
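One possible reading of "lazily", sketched with hypothetical names: the operation receives a `Supplier` and computes the change list only on first access, similar in spirit to the `currentTable -> buildNewTable(...)` function that `handleAlter` passes along. `LazyConvertOperation` is illustrative only, and strings stand in for `TableChange` objects.

```java
import java.util.List;
import java.util.function.Supplier;

// Hypothetical operation; the real one would carry List<TableChange>.
final class LazyConvertOperation {
    private final Supplier<List<String>> changesSupplier;
    private List<String> changes; // computed on first access, then cached

    LazyConvertOperation(Supplier<List<String>> changesSupplier) {
        this.changesSupplier = changesSupplier;
    }

    List<String> getTableChanges() {
        if (changes == null) {
            changes = changesSupplier.get();
        }
        return changes;
    }
}
```

The caching here is not thread-safe; that is assumed to be fine because an operation is typically built and executed on a single thread.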
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java:
##########
@@ -117,6 +132,147 @@ private Operation handleAlter(
currentTable -> buildNewTable(currentTable, mergeContext, schemaResolver));
}
+ private Operation handleConvert(
+ final SqlCreateOrAlterMaterializedTable sqlCreateOrAlterMaterializedTable,
+ final ConvertContext context,
+ final ObjectIdentifier identifier,
+ final ResolvedCatalogTable oldBaseTable) {
+ final boolean conversionEnabled =
+ context.getTableConfig()
+ .getRootConfiguration()
+ .get(TableConfigOptions.MATERIALIZED_TABLE_CONVERSION_FROM_TABLE_ENABLED);
+ if (!conversionEnabled) {
+ throw new ValidationException(
+ String.format(
+ "Table %s is not a materialized table. Only materialized table support create or alter operation.",
+ identifier.asSummaryString()));
+ }
+ return handleConvert(sqlCreateOrAlterMaterializedTable, oldBaseTable, context, identifier);
+ }
+
+ private Operation handleConvert(
+ final SqlCreateOrAlterMaterializedTable sqlCreateOrAlterTable,
+ final ResolvedCatalogTable oldTable,
+ final ConvertContext context,
+ final ObjectIdentifier identifier) {
+ final MergeContext baseMergeContext = getMergeContext(sqlCreateOrAlterTable, context);
+ final boolean ddlDeclaresWatermark = sqlCreateOrAlterTable.getWatermark().isPresent();
+ final boolean ddlDeclaresPrimaryKey =
+ sqlCreateOrAlterTable.getFullConstraints().stream()
+ .anyMatch(SqlTableConstraint::isPrimaryKey);
+
+ final Schema mergedSchema = baseMergeContext.getMergedSchema();
+ final Schema finalSchema =
+ inheritWatermarkAndPrimaryKey(
+ mergedSchema,
+ oldTable.getResolvedSchema(),
+ ddlDeclaresWatermark,
+ ddlDeclaresPrimaryKey,
+ identifier);
+
+ final CatalogMaterializedTable newMaterializedTable =
+ CatalogMaterializedTable.newBuilder()
+ .schema(finalSchema)
+ .comment(baseMergeContext.getMergedComment())
+ .partitionKeys(baseMergeContext.getMergedPartitionKeys())
+ .options(baseMergeContext.getMergedTableOptions())
+ .originalQuery(baseMergeContext.getMergedOriginalQuery())
+ .expandedQuery(baseMergeContext.getMergedExpandedQuery())
+ .distribution(baseMergeContext.getMergedTableDistribution().orElse(null))
+ .freshness(baseMergeContext.getMergedFreshness())
+ .logicalRefreshMode(baseMergeContext.getMergedLogicalRefreshMode())
+ .refreshMode(baseMergeContext.getMergedRefreshMode())
+ .refreshStatus(RefreshStatus.INITIALIZING)
+ .startMode(baseMergeContext.getMergedStartMode())
+ .build();
+ final ResolvedCatalogMaterializedTable resolvedNewMaterializedTable =
+ context.getCatalogManager().resolveCatalogMaterializedTable(newMaterializedTable);
+
+ final List<TableChange> tableChanges =
+ buildConversionTableChanges(
+ oldTable,
+ resolvedNewMaterializedTable,
+ baseMergeContext.hasSchemaDefinition(),
+ baseMergeContext.hasConstraintDefinition());
+
+ return new ConvertTableToMaterializedTableOperation(
+ identifier, oldTable, resolvedNewMaterializedTable, tableChanges);
+ }
+
+ /**
+ * Layers the watermark and primary key from the source table onto the merged schema when the
+ * conversion DDL does not declare them, preserving the "in-place" semantics.
+ *
+ * <p>Inheritance is per-feature and independent of column-list presence: the DDL declaring a
+ * watermark replaces the source watermark; omitting it carries the source watermark over. The
+ * same rule applies to the primary key.
+ *
+ * <p>To drop an inherited watermark or primary key after conversion, the user runs a follow-up
+ * {@code ALTER MATERIALIZED TABLE ... DROP WATERMARK} or {@code ... DROP PRIMARY KEY}.
+ *
+ * @return the merged schema with the inherited watermark and / or primary key folded in
+ * @throws ValidationException if the source table carries more than one watermark
+ * specification, which materialized tables do not support
+ */
+ private static Schema inheritWatermarkAndPrimaryKey(
Review Comment:
What actually happens if I run the CoA twice (first the conversion, then an alter)? Would the second run drop the watermark/PK?
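To make the question concrete, a minimal model of the per-feature rule described in the Javadoc (a DDL declaration wins, otherwise the source value carries over), using a hypothetical `MiniSchema` that holds only a watermark and a primary key. This rule sits on the conversion path; the second run would take the `MATERIALIZED_TABLE` branch (`handleAlter`), so the answer depends on whether that path applies the same rule.

```java
import java.util.Optional;

// Hypothetical, reduced schema: only the two features the Javadoc talks about.
final class MiniSchema {
    final Optional<String> watermark;
    final Optional<String> primaryKey;

    MiniSchema(Optional<String> watermark, Optional<String> primaryKey) {
        this.watermark = watermark;
        this.primaryKey = primaryKey;
    }
}

final class WatermarkAndPkInheritance {
    // Each feature is decided independently: declared in the DDL -> DDL value, otherwise -> source value.
    static MiniSchema inherit(MiniSchema ddl, MiniSchema source) {
        return new MiniSchema(
                ddl.watermark.or(() -> source.watermark),
                ddl.primaryKey.or(() -> source.primaryKey));
    }
}
```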
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java:
##########
@@ -117,6 +132,147 @@ private Operation handleAlter(
currentTable -> buildNewTable(currentTable, mergeContext, schemaResolver));
}
+ private Operation handleConvert(
+ final SqlCreateOrAlterMaterializedTable sqlCreateOrAlterMaterializedTable,
+ final ConvertContext context,
+ final ObjectIdentifier identifier,
+ final ResolvedCatalogTable oldBaseTable) {
+ final boolean conversionEnabled =
+ context.getTableConfig()
+ .getRootConfiguration()
+ .get(TableConfigOptions.MATERIALIZED_TABLE_CONVERSION_FROM_TABLE_ENABLED);
+ if (!conversionEnabled) {
+ throw new ValidationException(
+ String.format(
+ "Table %s is not a materialized table. Only materialized table support create or alter operation.",
+ identifier.asSummaryString()));
+ }
+ return handleConvert(sqlCreateOrAlterMaterializedTable, oldBaseTable, context, identifier);
+ }
+
+ private Operation handleConvert(
Review Comment:
Having two methods with near-identical signatures is throwing me off a bit. Can you just merge them? I can't tell the difference between them from the signatures.
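A rough sketch of the merged shape, with hypothetical simplified parameters (a boolean for the config flag and strings for the DDL node, the old table, and the resulting operation). The flag check and the build step live in one method, so there is a single `handleConvert` signature. `ConvertHandler` is an illustrative name, and `IllegalArgumentException` stands in for `ValidationException`.

```java
// Hypothetical, simplified stand-in for the planner converter; not the real Flink classes.
final class ConvertHandler {
    static String handleConvert(
            boolean conversionEnabled, String identifier, String oldTable, String ddl) {
        // Gate first: with the flag off, keep the pre-existing error message.
        if (!conversionEnabled) {
            throw new IllegalArgumentException(
                    String.format(
                            "Table %s is not a materialized table. Only materialized table support create or alter operation.",
                            identifier));
        }
        // Then the body of the former second overload: merge the DDL with the old table.
        return "ConvertTableToMaterializedTableOperation(" + identifier + ", " + oldTable + ", " + ddl + ")";
    }
}
```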
--
This is an automated message from the Apache Git Service.