AHeise commented on code in PR #28287:
URL: https://github.com/apache/flink/pull/28287#discussion_r3339304451


##########
docs/content/docs/sql/materialized-table/statements.md:
##########
@@ -260,6 +261,56 @@ The operation updates the materialized table similarly to 
[ALTER MATERIALIZED TA
 
 See [ALTER MATERIALIZED TABLE AS](#as-select_statement-1) for more details.
 
+## Converting a Table to a Materialized Table
+
+`CREATE OR ALTER MATERIALIZED TABLE` can convert an existing regular table 
into a materialized table in place. The catalog object keeps its identity and 
underlying storage; only the table kind and the materialized-table metadata 
(query definition, freshness, refresh mode, and refresh status) change. After 
the conversion, a refresh job is launched just as it is for a newly created 
materialized table.
+
+This lets you adopt a materialized table on top of a table that already 
exists, without dropping and recreating it.

Review Comment:
   Swap the sentences; put the technical explanation last.



##########
docs/content/docs/sql/materialized-table/statements.md:
##########
@@ -260,6 +261,56 @@ The operation updates the materialized table similarly to 
[ALTER MATERIALIZED TA
 
 See [ALTER MATERIALIZED TABLE AS](#as-select_statement-1) for more details.
 
+## Converting a Table to a Materialized Table
+
+`CREATE OR ALTER MATERIALIZED TABLE` can convert an existing regular table 
into a materialized table in place. The catalog object keeps its identity and 
underlying storage; only the table kind and the materialized-table metadata 
(query definition, freshness, refresh mode, and refresh status) change. After 
the conversion, a refresh job is launched just as it is for a newly created 
materialized table.
+
+This lets you adopt a materialized table on top of a table that already 
exists, without dropping and recreating it.
+
+**Enabling conversion**
+
+Conversion is disabled by default. To enable it, set the following option in 
the cluster configuration file `config.yaml`:
+
+```yaml
+table.materialized-table.conversion-from-table.enabled: true
+```
+
+This is a cluster-wide setting: it must be set in the cluster configuration, 
and a session-level `SET` statement has no effect. When the option is disabled, 
`CREATE OR ALTER MATERIALIZED TABLE` against a regular table is rejected.
+
+**Watermark and primary key inheritance**
+
+When the conversion statement does not declare a `WATERMARK` or a `PRIMARY 
KEY`, the corresponding definition is inherited from the source table:

Review Comment:
   Hm, what's the CREATE OR ALTER (CoA) behavior here? Wouldn't we want to drop things that are not declared?



##########
docs/layouts/shortcodes/generated/table_config_configuration.html:
##########
@@ -68,6 +68,12 @@
             <td>String</td>
             <td>The local time zone defines current session time zone id. It 
is used when converting to/from &lt;code&gt;TIMESTAMP WITH LOCAL TIME 
ZONE&lt;/code&gt;. Internally, timestamps with local time zone are always 
represented in the UTC time zone. However, when converting to data types that 
don't include a time zone (e.g. TIMESTAMP, TIME, or simply STRING), the session 
time zone is used during conversion. The input of option is either a full name 
such as "America/Los_Angeles", or a custom timezone id such as "GMT-08:00".</td>
         </tr>
+        <tr>
+            
<td><h5>table.materialized-table.conversion-from-table.enabled</h5><br> <span 
class="label label-primary">Batch</span> <span class="label 
label-primary">Streaming</span></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>If enabled, CREATE OR ALTER MATERIALIZED TABLE against an 
existing regular table converts it in place to a materialized table, preserving 
identity and storage. This is a cluster-wide setting: it must be set in the 
cluster configuration (e.g. config.yaml); session-level SET has no effect. When 
disabled (the default), CREATE OR ALTER MATERIALIZED TABLE against a regular 
table is rejected.</td>

Review Comment:
   "CREATE OR ALTER MATERIALIZED TABLE against" sounds weird. Either "issuing 
CoA against" or "CREATE OR ALTER... on"



##########
docs/content.zh/docs/sql/materialized-table/statements.md:
##########
@@ -242,6 +242,7 @@ The `OR ALTER` clause provides create-or-update semantics:
 
 - **If the materialized table does not exist**: Creates a new materialized 
table with the specified options
 - **If the materialized table exists**: Modifies the query definition (behaves 
like `ALTER MATERIALIZED TABLE AS`)
+- **If a regular table with the same name exists**: Converts it in place into 
a materialized table, when enabled (see [Converting a Table to a Materialized 
Table](#converting-a-table-to-a-materialized-table))

Review Comment:
   Whole section needs to be translated. Did you file a translation ticket 
already?



##########
docs/content/docs/sql/materialized-table/statements.md:
##########
@@ -260,6 +261,56 @@ The operation updates the materialized table similarly to 
[ALTER MATERIALIZED TA
 
 See [ALTER MATERIALIZED TABLE AS](#as-select_statement-1) for more details.
 
+## Converting a Table to a Materialized Table
+
+`CREATE OR ALTER MATERIALIZED TABLE` can convert an existing regular table 
into a materialized table in place. The catalog object keeps its identity and 
underlying storage; only the table kind and the materialized-table metadata 
(query definition, freshness, refresh mode, and refresh status) change. After 
the conversion, a refresh job is launched just as it is for a newly created 
materialized table.
+
+This lets you adopt a materialized table on top of a table that already 
exists, without dropping and recreating it.
+
+**Enabling conversion**
+
+Conversion is disabled by default. To enable it, set the following option in 
the cluster configuration file `config.yaml`:
+
+```yaml
+table.materialized-table.conversion-from-table.enabled: true
+```
+
+This is a cluster-wide setting: it must be set in the cluster configuration, 
and a session-level `SET` statement has no effect. When the option is disabled, 
`CREATE OR ALTER MATERIALIZED TABLE` against a regular table is rejected.
+
+**Watermark and primary key inheritance**
+
+When the conversion statement does not declare a `WATERMARK` or a `PRIMARY 
KEY`, the corresponding definition is inherited from the source table:
+
+- Omitting the watermark carries over the source table's watermark; declaring 
one replaces it.
+- Omitting the primary key carries over the source table's primary key; 
declaring one replaces it.
+
+Inheritance is applied independently for each, regardless of whether a column 
list is provided. To drop an inherited watermark or primary key after 
conversion, run a follow-up `ALTER MATERIALIZED TABLE`. A source table that 
defines more than one watermark cannot be converted; declare an explicit 
watermark in the conversion statement instead.
+
+**Example**
+
+```sql
+-- An existing regular table that you now want to maintain as a materialized 
table
+CREATE TABLE user_spending (
+    user_id BIGINT,
+    total_amount BIGINT
+) WITH (
+    'connector' = '...'
+);
+
+-- Convert it in place; from now on it is refreshed by the query
+CREATE OR ALTER MATERIALIZED TABLE user_spending
+    AS SELECT
+        user_id,
+        SUM(amount) AS total_amount
+    FROM orders
+    GROUP BY user_id;
+```
+
+<span class="label label-danger">Note</span>
+- The conversion is one-way and cannot be undone. To revert, drop the 
materialized table and recreate the original table.

Review Comment:
   First explain that you can simply suspend the materialized table and use it as if it were a regular 
table for input/output.



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -302,6 +307,119 @@ private void createMaterializedTableInFullMode(
         }
     }
 
+    private ResultFetcher callConvertTableToMaterializedTableOperation(
+            OperationExecutor operationExecutor,
+            OperationHandle handle,
+            ConvertTableToMaterializedTableOperation convertOperation) {
+        ResolvedCatalogMaterializedTable materializedTable = 
convertOperation.getMaterializedTable();
+        if (RefreshMode.CONTINUOUS == materializedTable.getRefreshMode()) {
+            convertTableToMaterializedTableInContinuousMode(
+                    operationExecutor, handle, convertOperation);
+        } else {
+            convertTableToMaterializedTableInFullMode(operationExecutor, 
handle, convertOperation);
+        }
+        // Just return ok for unify different refresh job info of continuous 
and full mode, user
+        // should get the refresh job info via desc table.
+        return ResultFetcher.fromTableResult(handle, TABLE_RESULT_OK, false);
+    }
+
+    private void convertTableToMaterializedTableInContinuousMode(
+            OperationExecutor operationExecutor,
+            OperationHandle handle,
+            ConvertTableToMaterializedTableOperation convertOperation) {
+        // swap the catalog entry from a regular table to a materialized table 
first
+        operationExecutor.callExecutableOperation(handle, convertOperation);
+
+        ObjectIdentifier materializedTableIdentifier = 
convertOperation.getTableIdentifier();
+        ResolvedCatalogMaterializedTable catalogMaterializedTable =
+                convertOperation.getMaterializedTable();
+
+        try {
+            executeContinuousRefreshJob(
+                    operationExecutor,
+                    handle,
+                    catalogMaterializedTable,
+                    materializedTableIdentifier,
+                    Collections.emptyMap(),
+                    Optional.empty());
+        } catch (Exception e) {
+            restoreOriginalTable(operationExecutor, handle, convertOperation);
+            throw new SqlExecutionException(
+                    String.format(
+                            "Failed to submit continuous refresh job when 
converting table %s to a materialized table.",
+                            materializedTableIdentifier),
+                    e);
+        }
+    }
+
+    private void convertTableToMaterializedTableInFullMode(
+            OperationExecutor operationExecutor,
+            OperationHandle handle,
+            ConvertTableToMaterializedTableOperation convertOperation) {
+        if (workflowScheduler == null) {
+            throw new SqlExecutionException(
+                    "The workflow scheduler must be configured when converting 
a table to a materialized table in full refresh mode.");
+        }
+        // swap the catalog entry from a regular table to a materialized table 
first
+        operationExecutor.callExecutableOperation(handle, convertOperation);
+
+        ObjectIdentifier materializedTableIdentifier = 
convertOperation.getTableIdentifier();
+        ResolvedCatalogMaterializedTable catalogMaterializedTable =
+                convertOperation.getMaterializedTable();
+
+        final IntervalFreshness freshness = 
catalogMaterializedTable.getDefinitionFreshness();
+        String cronExpression = convertFreshnessToCron(freshness);
+        CreateRefreshWorkflow createRefreshWorkflow =
+                new CreatePeriodicRefreshWorkflow(
+                        materializedTableIdentifier,
+                        catalogMaterializedTable.getExpandedQuery(),
+                        cronExpression,
+                        getSessionInitializationConf(operationExecutor),
+                        Collections.emptyMap(),
+                        restEndpointUrl);
+
+        try {
+            RefreshHandler refreshHandler =
+                    
workflowScheduler.createRefreshWorkflow(createRefreshWorkflow);
+            RefreshHandlerSerializer refreshHandlerSerializer =
+                    workflowScheduler.getRefreshHandlerSerializer();
+            byte[] serializedRefreshHandler = 
refreshHandlerSerializer.serialize(refreshHandler);
+
+            updateRefreshHandler(
+                    operationExecutor,
+                    handle,
+                    materializedTableIdentifier,
+                    catalogMaterializedTable,
+                    RefreshStatus.ACTIVATED,
+                    refreshHandler.asSummaryString(),
+                    serializedRefreshHandler);
+        } catch (Exception e) {

Review Comment:
   Extract the boilerplate code into a helper. I see the same thing in CREATE.
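
   A minimal sketch of what such a helper could look like, assuming the shared pattern is "run the refresh-job submission, and on failure roll back the catalog change and rethrow". The class name `RollbackOnFailure`, the `ThrowingRunnable` interface, and the use of `IllegalStateException` (standing in for `SqlExecutionException`) are all hypothetical and not taken from the Flink code base.

```java
/** Hypothetical helper for illustration; not part of the Flink code base. */
final class RollbackOnFailure {

    /** An action that may throw a checked exception. */
    @FunctionalInterface
    interface ThrowingRunnable {
        void run() throws Exception;
    }

    private RollbackOnFailure() {}

    /**
     * Runs {@code action}. If it throws, runs {@code rollback} and rethrows the
     * original failure wrapped in an unchecked exception carrying {@code message}.
     * IllegalStateException is a stand-in for SqlExecutionException here.
     */
    static void run(ThrowingRunnable action, Runnable rollback, String message) {
        try {
            action.run();
        } catch (Exception e) {
            try {
                rollback.run();
            } catch (RuntimeException rollbackFailure) {
                // Keep the original failure as the primary cause.
                e.addSuppressed(rollbackFailure);
            }
            throw new IllegalStateException(message, e);
        }
    }
}
```

   With something along these lines, the continuous-mode and full-mode paths would each pass only their submission step, their rollback step, and their error message, instead of repeating the try/catch.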



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java:
##########
@@ -383,6 +383,30 @@ default void alterTable(
         alterTable(tablePath, newTable, ignoreIfNotExists);
     }
 
+    /**
+     * Converts an existing {@link CatalogTable} to a {@link 
CatalogMaterializedTable} in place,
+     * preserving the catalog entry's identity and storage.
+     *
+     * <p>The default throws {@link UnsupportedOperationException}; catalogs 
that support in-place
+     * conversion override it. Launching the refresh job for the new 
materialized table is the
+     * executor's responsibility, not the catalog's.
+     *
+     * @param tableChanges structured delta between {@code originalTable} and 
{@code

Review Comment:
   How can this be empty? Shouldn't you get modification changes for all the 
MT-specific properties?



##########
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java:
##########
@@ -54,6 +59,63 @@ static void init() {
 
     // ------ tables ------
 
+    @Test
+    void testConvertTableToMaterializedTable() throws Exception {

Review Comment:
   Nested tests would work well here.



##########
flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalog.java:
##########
@@ -416,6 +416,40 @@ public void alterTable(
         }
     }
 
+    @Override
+    public void convertTableToMaterializedTable(
+            ObjectPath tablePath,
+            CatalogTable originalTable,
+            CatalogMaterializedTable materializedTable,
+            List<TableChange> tableChanges)
+            throws TableNotExistException, CatalogException {
+        if (!tableExists(tablePath)) {
+            throw new TableNotExistException(getName(), tablePath);
+        }
+
+        Tuple4<Path, Path, Path, String> tableSchemaInfo =

Review Comment:
   Can we use a record yet?
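
   For illustration, a sketch of a record replacing the `Tuple4<Path, Path, Path, String>`, assuming the module can target a Java version with records (16 or later). The record name, the component names, and the use of `java.nio.file.Path` as a stand-in for Flink's own `Path` type are invented here; the real meaning of the four tuple fields is not visible in this hunk.

```java
import java.nio.file.Path;
import java.util.Objects;

/**
 * Hypothetical named replacement for a four-element tuple.
 * Component names are assumptions made for this sketch only.
 */
record TableLayout(Path tablePath, Path schemaDirectory, Path schemaFile, String kind) {

    /** Compact constructor: reject null components up front. */
    TableLayout {
        Objects.requireNonNull(tablePath, "tablePath");
        Objects.requireNonNull(schemaDirectory, "schemaDirectory");
        Objects.requireNonNull(schemaFile, "schemaFile");
        Objects.requireNonNull(kind, "kind");
    }
}
```

   Call sites would then read `layout.schemaFile()` rather than `tableSchemaInfo.f2`, which is the main readability gain.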



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java:
##########
@@ -78,23 +82,34 @@ private Operation handleCreateOrAlter(
             final ObjectIdentifier identifier) {
         final Optional<ResolvedCatalogBaseTable<?>> resolvedBaseTable =
                 context.getCatalogManager().getCatalogBaseTable(identifier);
-        return resolvedBaseTable
-                .map(
-                        oldBaseTable -> {
-                            if (oldBaseTable.getTableKind() != 
TableKind.MATERIALIZED_TABLE) {
-                                throw new ValidationException(
-                                        String.format(
-                                                "Table %s is not a 
materialized table. Only materialized table support create or alter operation.",
-                                                identifier.asSummaryString()));
-                            }
-                            return handleAlter(
-                                    sqlCreateOrAlterMaterializedTable,
-                                    (ResolvedCatalogMaterializedTable) 
oldBaseTable,
-                                    context,
-                                    identifier);
-                        })
-                .orElseGet(
-                        () -> handleCreate(sqlCreateOrAlterMaterializedTable, 
context, identifier));
+        if (resolvedBaseTable.isEmpty()) {
+            return handleCreate(sqlCreateOrAlterMaterializedTable, context, 
identifier);
+        }
+        final ResolvedCatalogBaseTable<?> oldBaseTable = 
resolvedBaseTable.get();
+        switch (oldBaseTable.getTableKind()) {
+            case MATERIALIZED_TABLE:
+                return handleAlter(
+                        sqlCreateOrAlterMaterializedTable,
+                        (ResolvedCatalogMaterializedTable) oldBaseTable,
+                        context,
+                        identifier);
+            case VIEW:
+                throw new ValidationException(
+                        String.format(
+                                "Cannot convert view %s to a materialized 
table.",
+                                identifier.asSummaryString()));
+            case TABLE:
+                return handleConvert(
+                        sqlCreateOrAlterMaterializedTable,
+                        context,
+                        identifier,
+                        (ResolvedCatalogTable) oldBaseTable);
+            default:
+                throw new ValidationException(
+                        String.format(
+                                "Unsupported table kind %s for %s.",

Review Comment:
   Don't change the error message unnecessarily.



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConvertTableToMaterializedTableTest.java:
##########
@@ -0,0 +1,459 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.operations;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.config.TableConfigOptions;
+import org.apache.flink.table.catalog.CatalogMaterializedTable;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.TableChange;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.expressions.SqlCallExpression;
+import org.apache.flink.table.operations.Operation;
+import 
org.apache.flink.table.operations.materializedtable.ConvertTableToMaterializedTableOperation;
+import 
org.apache.flink.table.operations.materializedtable.CreateMaterializedTableOperation;
+import 
org.apache.flink.table.operations.materializedtable.FullAlterMaterializedTableOperation;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Tests for in-place conversion of a regular table to a materialized table 
via {@code CREATE OR
+ * ALTER MATERIALIZED TABLE}.
+ */
+class SqlNodeToOperationConvertTableToMaterializedTableTest
+        extends SqlNodeToOperationConversionTestBase {
+
+    private static final String SOURCE_REGULAR_TABLE_NAME = "src_table";
+    private static final String EXISTING_MT_NAME = "existing_mt";
+
+    private static final String CREATE_EXISTING_MT_SQL =
+            "CREATE MATERIALIZED TABLE existing_mt (\n"
+                    + "  CONSTRAINT pk1 PRIMARY KEY(a) NOT ENFORCED\n"
+                    + ")\n"
+                    + "FRESHNESS = INTERVAL '1' MINUTE\n"
+                    + "AS SELECT a, b FROM t1";
+
+    @BeforeEach
+    void before() throws TableAlreadyExistException, DatabaseNotExistException 
{
+        super.before();
+        createSourceRegularTable(SOURCE_REGULAR_TABLE_NAME, false, false);
+        createTimestampSourceTable();
+        createExistingMaterializedTable();
+    }
+
+    // 
--------------------------------------------------------------------------------------------

Review Comment:
   Nested tests are superior to comment blocks...



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java:
##########
@@ -117,6 +132,147 @@ private Operation handleAlter(
                 currentTable -> buildNewTable(currentTable, mergeContext, 
schemaResolver));
     }
 
+    private Operation handleConvert(
+            final SqlCreateOrAlterMaterializedTable 
sqlCreateOrAlterMaterializedTable,
+            final ConvertContext context,
+            final ObjectIdentifier identifier,
+            final ResolvedCatalogTable oldBaseTable) {
+        final boolean conversionEnabled =
+                context.getTableConfig()
+                        .getRootConfiguration()
+                        
.get(TableConfigOptions.MATERIALIZED_TABLE_CONVERSION_FROM_TABLE_ENABLED);
+        if (!conversionEnabled) {
+            throw new ValidationException(
+                    String.format(
+                            "Table %s is not a materialized table. Only 
materialized table support create or alter operation.",
+                            identifier.asSummaryString()));
+        }
+        return handleConvert(sqlCreateOrAlterMaterializedTable, oldBaseTable, 
context, identifier);
+    }
+
+    private Operation handleConvert(
+            final SqlCreateOrAlterMaterializedTable sqlCreateOrAlterTable,
+            final ResolvedCatalogTable oldTable,
+            final ConvertContext context,
+            final ObjectIdentifier identifier) {
+        final MergeContext baseMergeContext = 
getMergeContext(sqlCreateOrAlterTable, context);
+        final boolean ddlDeclaresWatermark = 
sqlCreateOrAlterTable.getWatermark().isPresent();
+        final boolean ddlDeclaresPrimaryKey =
+                sqlCreateOrAlterTable.getFullConstraints().stream()
+                        .anyMatch(SqlTableConstraint::isPrimaryKey);
+
+        final Schema mergedSchema = baseMergeContext.getMergedSchema();
+        final Schema finalSchema =
+                inheritWatermarkAndPrimaryKey(
+                        mergedSchema,
+                        oldTable.getResolvedSchema(),
+                        ddlDeclaresWatermark,
+                        ddlDeclaresPrimaryKey,
+                        identifier);
+
+        final CatalogMaterializedTable newMaterializedTable =
+                CatalogMaterializedTable.newBuilder()
+                        .schema(finalSchema)
+                        .comment(baseMergeContext.getMergedComment())
+                        
.partitionKeys(baseMergeContext.getMergedPartitionKeys())
+                        .options(baseMergeContext.getMergedTableOptions())
+                        
.originalQuery(baseMergeContext.getMergedOriginalQuery())
+                        
.expandedQuery(baseMergeContext.getMergedExpandedQuery())
+                        
.distribution(baseMergeContext.getMergedTableDistribution().orElse(null))
+                        .freshness(baseMergeContext.getMergedFreshness())
+                        
.logicalRefreshMode(baseMergeContext.getMergedLogicalRefreshMode())
+                        .refreshMode(baseMergeContext.getMergedRefreshMode())
+                        .refreshStatus(RefreshStatus.INITIALIZING)
+                        .startMode(baseMergeContext.getMergedStartMode())
+                        .build();
+        final ResolvedCatalogMaterializedTable resolvedNewMaterializedTable =
+                
context.getCatalogManager().resolveCatalogMaterializedTable(newMaterializedTable);
+
+        final List<TableChange> tableChanges =
+                buildConversionTableChanges(
+                        oldTable,
+                        resolvedNewMaterializedTable,
+                        baseMergeContext.hasSchemaDefinition(),
+                        baseMergeContext.hasConstraintDefinition());
+
+        return new ConvertTableToMaterializedTableOperation(
+                identifier, oldTable, resolvedNewMaterializedTable, 
tableChanges);
+    }
+
+    /**
+     * Layers the watermark and primary key from the source table onto the 
merged schema when the
+     * conversion DDL does not declare them, preserving the "in-place" 
semantics.
+     *
+     * <p>Inheritance is per-feature and independent of column-list presence: 
the DDL declaring a
+     * watermark replaces the source watermark; omitting it carries the source 
watermark over. The
+     * same rule applies to the primary key.
+     *
+     * <p>To drop an inherited watermark or primary key after conversion, the 
user runs a follow-up
+     * {@code ALTER MATERIALIZED TABLE ... DROP WATERMARK} or {@code ... DROP 
PRIMARY KEY}.
+     *
+     * @return the merged schema with the inherited watermark and / or primary 
key folded in
+     * @throws ValidationException if the source table carries more than one 
watermark
+     *     specification, which materialized tables do not support
+     */
+    private static Schema inheritWatermarkAndPrimaryKey(

Review Comment:
   I don't agree with this part. Was that in the FLIP? This destroys the 
idempotency mindset that we have for CoA.
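
   To make the concern concrete, here is a minimal sketch contrasting the two semantics for a single property (the primary key). It is a toy model with hypothetical names, not Flink code: `declared` is what the statement says, `existing` is what the catalog object already carries.

```java
import java.util.Optional;

/** Toy model for illustration only; names are hypothetical. */
final class PrimaryKeyResolution {

    private PrimaryKeyResolution() {}

    /** Declarative: the result depends only on what the statement declares. */
    static Optional<String> declarative(Optional<String> declared, Optional<String> existing) {
        return declared;
    }

    /** Inheriting: an undeclared key falls back to whatever the existing object has. */
    static Optional<String> inheriting(Optional<String> declared, Optional<String> existing) {
        return declared.or(() -> existing);
    }
}
```

   Under the declarative rule, the same statement yields the same primary key whether the existing table had one or not. Under the inheriting rule, a statement that declares no key ends up with different results depending on the prior state of the catalog object, which is the property that conflicts with an idempotent reading of CoA.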



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java:
##########
@@ -117,6 +132,147 @@ private Operation handleAlter(
                 currentTable -> buildNewTable(currentTable, mergeContext, 
schemaResolver));
     }
 
+    private Operation handleConvert(
+            final SqlCreateOrAlterMaterializedTable 
sqlCreateOrAlterMaterializedTable,
+            final ConvertContext context,
+            final ObjectIdentifier identifier,
+            final ResolvedCatalogTable oldBaseTable) {
+        final boolean conversionEnabled =
+                context.getTableConfig()
+                        .getRootConfiguration()
+                        
.get(TableConfigOptions.MATERIALIZED_TABLE_CONVERSION_FROM_TABLE_ENABLED);
+        if (!conversionEnabled) {
+            throw new ValidationException(
+                    String.format(
+                            "Table %s is not a materialized table. Only 
materialized table support create or alter operation.",
+                            identifier.asSummaryString()));
+        }
+        return handleConvert(sqlCreateOrAlterMaterializedTable, oldBaseTable, 
context, identifier);
+    }
+
+    private Operation handleConvert(
+            final SqlCreateOrAlterMaterializedTable sqlCreateOrAlterTable,
+            final ResolvedCatalogTable oldTable,
+            final ConvertContext context,
+            final ObjectIdentifier identifier) {
+        final MergeContext baseMergeContext = 
getMergeContext(sqlCreateOrAlterTable, context);
+        final boolean ddlDeclaresWatermark = 
sqlCreateOrAlterTable.getWatermark().isPresent();
+        final boolean ddlDeclaresPrimaryKey =
+                sqlCreateOrAlterTable.getFullConstraints().stream()
+                        .anyMatch(SqlTableConstraint::isPrimaryKey);
+
+        final Schema mergedSchema = baseMergeContext.getMergedSchema();
+        final Schema finalSchema =
+                inheritWatermarkAndPrimaryKey(
+                        mergedSchema,
+                        oldTable.getResolvedSchema(),
+                        ddlDeclaresWatermark,
+                        ddlDeclaresPrimaryKey,
+                        identifier);
+
+        final CatalogMaterializedTable newMaterializedTable =
+                CatalogMaterializedTable.newBuilder()
+                        .schema(finalSchema)
+                        .comment(baseMergeContext.getMergedComment())
+                        
.partitionKeys(baseMergeContext.getMergedPartitionKeys())
+                        .options(baseMergeContext.getMergedTableOptions())
+                        
.originalQuery(baseMergeContext.getMergedOriginalQuery())
+                        
.expandedQuery(baseMergeContext.getMergedExpandedQuery())
+                        
.distribution(baseMergeContext.getMergedTableDistribution().orElse(null))
+                        .freshness(baseMergeContext.getMergedFreshness())
+                        
.logicalRefreshMode(baseMergeContext.getMergedLogicalRefreshMode())
+                        .refreshMode(baseMergeContext.getMergedRefreshMode())
+                        .refreshStatus(RefreshStatus.INITIALIZING)
+                        .startMode(baseMergeContext.getMergedStartMode())
+                        .build();
+        final ResolvedCatalogMaterializedTable resolvedNewMaterializedTable =
+                
context.getCatalogManager().resolveCatalogMaterializedTable(newMaterializedTable);
+
+        final List<TableChange> tableChanges =
+                buildConversionTableChanges(
+                        oldTable,
+                        resolvedNewMaterializedTable,
+                        baseMergeContext.hasSchemaDefinition(),
+                        baseMergeContext.hasConstraintDefinition());
+
+        return new ConvertTableToMaterializedTableOperation(
+                identifier, oldTable, resolvedNewMaterializedTable, 
tableChanges);
+    }
+
+    /**
+     * Layers the watermark and primary key from the source table onto the 
merged schema when the
+     * conversion DDL does not declare them, preserving the "in-place" 
semantics.
+     *
+     * <p>Inheritance is per-feature and independent of column-list presence: 
the DDL declaring a
+     * watermark replaces the source watermark; omitting it carries the source 
watermark over. The
+     * same rule applies to the primary key.
+     *
+     * <p>To drop an inherited watermark or primary key after conversion, the 
user runs a follow-up
+     * {@code ALTER MATERIALIZED TABLE ... DROP WATERMARK} or {@code ... DROP 
PRIMARY KEY}.
+     *
+     * @return the merged schema with the inherited watermark and / or primary 
key folded in
+     * @throws ValidationException if the source table carries more than one 
watermark
+     *     specification, which materialized tables do not support
+     */
+    private static Schema inheritWatermarkAndPrimaryKey(

Review Comment:
   Note that these two table properties in particular matter only to consumers, so 
it's a hard sell that they are needed to keep the data intact, imho.



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -302,6 +307,119 @@ private void createMaterializedTableInFullMode(
         }
     }
 
+    private ResultFetcher callConvertTableToMaterializedTableOperation(
+            OperationExecutor operationExecutor,
+            OperationHandle handle,
+            ConvertTableToMaterializedTableOperation convertOperation) {
+        ResolvedCatalogMaterializedTable materializedTable = 
convertOperation.getMaterializedTable();
+        if (RefreshMode.CONTINUOUS == materializedTable.getRefreshMode()) {
+            convertTableToMaterializedTableInContinuousMode(
+                    operationExecutor, handle, convertOperation);
+        } else {
+            convertTableToMaterializedTableInFullMode(operationExecutor, 
handle, convertOperation);
+        }
+        // Just return ok for unify different refresh job info of continuous 
and full mode, user
+        // should get the refresh job info via desc table.
+        return ResultFetcher.fromTableResult(handle, TABLE_RESULT_OK, false);
+    }
+
+    private void convertTableToMaterializedTableInContinuousMode(
+            OperationExecutor operationExecutor,
+            OperationHandle handle,
+            ConvertTableToMaterializedTableOperation convertOperation) {
+        // swap the catalog entry from a regular table to a materialized table 
first
+        operationExecutor.callExecutableOperation(handle, convertOperation);
+
+        ObjectIdentifier materializedTableIdentifier = 
convertOperation.getTableIdentifier();
+        ResolvedCatalogMaterializedTable catalogMaterializedTable =
+                convertOperation.getMaterializedTable();
+
+        try {
+            executeContinuousRefreshJob(
+                    operationExecutor,
+                    handle,
+                    catalogMaterializedTable,
+                    materializedTableIdentifier,
+                    Collections.emptyMap(),
+                    Optional.empty());
+        } catch (Exception e) {
+            restoreOriginalTable(operationExecutor, handle, convertOperation);
+            throw new SqlExecutionException(
+                    String.format(
+                            "Failed to submit continuous refresh job when 
converting table %s to a materialized table.",
+                            materializedTableIdentifier),
+                    e);
+        }
+    }
+
+    private void convertTableToMaterializedTableInFullMode(
+            OperationExecutor operationExecutor,
+            OperationHandle handle,
+            ConvertTableToMaterializedTableOperation convertOperation) {
+        if (workflowScheduler == null) {
+            throw new SqlExecutionException(
+                    "The workflow scheduler must be configured when converting 
a table to a materialized table in full refresh mode.");
+        }
+        // swap the catalog entry from a regular table to a materialized table 
first
+        operationExecutor.callExecutableOperation(handle, convertOperation);
+
+        ObjectIdentifier materializedTableIdentifier = 
convertOperation.getTableIdentifier();
+        ResolvedCatalogMaterializedTable catalogMaterializedTable =
+                convertOperation.getMaterializedTable();
+
+        final IntervalFreshness freshness = 
catalogMaterializedTable.getDefinitionFreshness();
+        String cronExpression = convertFreshnessToCron(freshness);
+        CreateRefreshWorkflow createRefreshWorkflow =
+                new CreatePeriodicRefreshWorkflow(
+                        materializedTableIdentifier,
+                        catalogMaterializedTable.getExpandedQuery(),
+                        cronExpression,
+                        getSessionInitializationConf(operationExecutor),
+                        Collections.emptyMap(),
+                        restEndpointUrl);
+
+        try {
+            RefreshHandler refreshHandler =
+                    
workflowScheduler.createRefreshWorkflow(createRefreshWorkflow);
+            RefreshHandlerSerializer refreshHandlerSerializer =
+                    workflowScheduler.getRefreshHandlerSerializer();
+            byte[] serializedRefreshHandler = 
refreshHandlerSerializer.serialize(refreshHandler);
+
+            updateRefreshHandler(
+                    operationExecutor,
+                    handle,
+                    materializedTableIdentifier,
+                    catalogMaterializedTable,
+                    RefreshStatus.ACTIVATED,
+                    refreshHandler.asSummaryString(),
+                    serializedRefreshHandler);
+        } catch (Exception e) {
+            restoreOriginalTable(operationExecutor, handle, convertOperation);
+            throw new SqlExecutionException(
+                    String.format(
+                            "Failed to create refresh workflow when converting 
table %s to a materialized table.",
+                            materializedTableIdentifier),
+                    e);
+        }
+    }
+
+    /**
+     * Best-effort restore of the original regular table after a failed 
conversion. The catalog
+     * entry was already swapped to a materialized table, so this drops it and 
recreates the
+     * original table. This is not atomic: if the recreate fails, the entry is 
left dropped.
+     */
+    private void restoreOriginalTable(

Review Comment:
   This doesn't look expected to me. A user specifically wants to retain the 
data by starting the conversion. Now you drop it...
   Can we instead start suspended if scheduling fails? Alternatively, the 
catalog actually needs to support converting it back.
   Note that the more architecturally sound solution would be to also move the 
scheduling into the catalog, so the catalog can do the rollback itself.
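
   To illustrate the "start suspended" option, here is a minimal, self-contained sketch. Everything in it is hypothetical (the `Entry` class, the `Scheduler` interface, the map standing in for the catalog); only the status names mirror `RefreshStatus`. It is not the gateway code, just the shape of the control flow: the converted entry is kept and marked suspended when scheduling fails, instead of being dropped and recreated.

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch only: all types here are hypothetical stand-ins, not Flink classes. */
final class SuspendOnSchedulingFailureSketch {
    enum Kind { TABLE, MATERIALIZED_TABLE }

    enum Status { INITIALIZING, ACTIVATED, SUSPENDED }

    /** Hypothetical catalog entry that tracks only the kind and refresh status. */
    static final class Entry {
        final Kind kind;
        final Status status;

        Entry(Kind kind, Status status) {
            this.kind = kind;
            this.status = status;
        }
    }

    /** Hypothetical scheduling hook; throws when the refresh job cannot be submitted. */
    interface Scheduler {
        void submit(String table) throws Exception;
    }

    /** Converts the entry in place and never drops it, whatever the scheduler does. */
    static Status convert(Map<String, Entry> catalog, String table, Scheduler scheduler) {
        // Swap the catalog entry first, as the PR does.
        catalog.put(table, new Entry(Kind.MATERIALIZED_TABLE, Status.INITIALIZING));
        Status result;
        try {
            scheduler.submit(table);
            result = Status.ACTIVATED;
        } catch (Exception e) {
            // Keep the converted entry (and therefore the data) and record that
            // no refresh job is running, instead of drop-and-recreate.
            result = Status.SUSPENDED;
        }
        catalog.put(table, new Entry(Kind.MATERIALIZED_TABLE, result));
        return result;
    }
}
```

   With this shape the user could retry later by resuming the suspended materialized table, and the failure path no longer contains a non-atomic drop. Whether the error should still be surfaced to the caller is a separate decision that the sketch leaves open.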



##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java:
##########
@@ -99,6 +99,68 @@
  */
 class MaterializedTableStatementITCase extends 
AbstractMaterializedTableStatementITCase {
 
+    @Test
+    void testConvertTableToMaterializedTableInContinuousMode() throws 
Exception {

Review Comment:
   Also add one negative test where things fail (might be easier in unit tests).



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java:
##########
@@ -1404,6 +1404,44 @@ public void alterTable(
                 "AlterTable");
     }
 
+    /**
+     * Converts an existing regular table to a materialized table in place. 
Identity and storage are
+     * preserved; only the kind and the materialized-table specific metadata 
change.
+     *
+     * @param originalTable the existing regular table
+     * @param materializedTable the new materialized table definition
+     * @param changes describe the modification from originalTable to 
materializedTable
+     * @param objectIdentifier fully qualified path of the table being 
converted
+     */
+    public void convertTableToMaterializedTable(
+            CatalogTable originalTable,
+            CatalogMaterializedTable materializedTable,
+            List<TableChange> changes,
+            ObjectIdentifier objectIdentifier) {
+        execute(
+                (catalog, path) -> {
+                    final CatalogTable resolvedOriginal =
+                            (CatalogTable) 
resolveCatalogBaseTable(originalTable);
+                    final CatalogMaterializedTable resolvedMt =
+                            (CatalogMaterializedTable) 
resolveCatalogBaseTable(materializedTable);
+                    catalog.convertTableToMaterializedTable(
+                            path, resolvedOriginal, resolvedMt, changes);
+                    catalogModificationListeners.forEach(
+                            listener ->
+                                    listener.onEvent(
+                                            AlterTableEvent.createEvent(

Review Comment:
   New event? Or does this suffice?



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java:
##########
@@ -78,23 +82,34 @@ private Operation handleCreateOrAlter(
             final ObjectIdentifier identifier) {
         final Optional<ResolvedCatalogBaseTable<?>> resolvedBaseTable =
                 context.getCatalogManager().getCatalogBaseTable(identifier);
-        return resolvedBaseTable
-                .map(
-                        oldBaseTable -> {
-                            if (oldBaseTable.getTableKind() != 
TableKind.MATERIALIZED_TABLE) {
-                                throw new ValidationException(
-                                        String.format(
-                                                "Table %s is not a 
materialized table. Only materialized table support create or alter operation.",
-                                                identifier.asSummaryString()));
-                            }
-                            return handleAlter(
-                                    sqlCreateOrAlterMaterializedTable,
-                                    (ResolvedCatalogMaterializedTable) 
oldBaseTable,
-                                    context,
-                                    identifier);
-                        })
-                .orElseGet(
-                        () -> handleCreate(sqlCreateOrAlterMaterializedTable, 
context, identifier));
+        if (resolvedBaseTable.isEmpty()) {
+            return handleCreate(sqlCreateOrAlterMaterializedTable, context, 
identifier);
+        }
+        final ResolvedCatalogBaseTable<?> oldBaseTable = 
resolvedBaseTable.get();
+        switch (oldBaseTable.getTableKind()) {
+            case MATERIALIZED_TABLE:
+                return handleAlter(
+                        sqlCreateOrAlterMaterializedTable,
+                        (ResolvedCatalogMaterializedTable) oldBaseTable,
+                        context,
+                        identifier);
+            case VIEW:
+                throw new ValidationException(
+                        String.format(
+                                "Cannot convert view %s to a materialized 
table.",

Review Comment:
   Don't change the error message unnecessarily. Just collapse into default 
case.
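
   Roughly what I have in mind, as a self-contained sketch. The `Kind` enum and the string results are hypothetical stand-ins for `TableKind` and the `handle*` methods, and the `TABLE` branch is my assumption about the part of the hunk that is cut off above. The point is only that views and any other kind fall through to `default` and keep the message that existed before this PR.

```java
import java.util.Optional;

/** Sketch only: hypothetical stand-ins for the converter's dispatch. */
final class CreateOrAlterDispatchSketch {
    enum Kind { TABLE, MATERIALIZED_TABLE, VIEW }

    /** Returns the name of the handler chosen for the existing catalog object. */
    static String dispatch(Optional<Kind> existing, String identifier) {
        if (existing.isEmpty()) {
            return "create";
        }
        switch (existing.get()) {
            case MATERIALIZED_TABLE:
                return "alter";
            case TABLE:
                // Assumed branch: regular tables go to the conversion path.
                return "convert";
            default:
                // Views and any future kinds share the pre-existing message.
                throw new IllegalArgumentException(
                        String.format(
                                "Table %s is not a materialized table. "
                                        + "Only materialized table support create or alter operation.",
                                identifier));
        }
    }
}
```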



##########
docs/content/docs/sql/materialized-table/statements.md:
##########
@@ -260,6 +261,56 @@ The operation updates the materialized table similarly to 
[ALTER MATERIALIZED TA
 
 See [ALTER MATERIALIZED TABLE AS](#as-select_statement-1) for more details.
 
+## Converting a Table to a Materialized Table
+
+`CREATE OR ALTER MATERIALIZED TABLE` can convert an existing regular table 
into a materialized table in place. The catalog object keeps its identity and 
underlying storage; only the table kind and the materialized-table metadata 
(query definition, freshness, refresh mode, and refresh status) change. After 
the conversion, a refresh job is launched just as it is for a newly created 
materialized table.
+
+This lets you adopt a materialized table on top of a table that already 
exists, without dropping and recreating it.
+
+**Enabling conversion**
+
+Conversion is disabled by default. To enable it, set the following option in 
the cluster configuration file `config.yaml`:

Review Comment:
   Add: What happens if it's disabled?



##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/AbstractMaterializedTableStatementITCase.java:
##########
@@ -157,6 +159,16 @@ static void setUp(@TempDir Path temporaryFolder) throws 
Exception {
                         .build();
     }
 
+    private static Configuration getGatewayConfiguration() {

Review Comment:
   This also prevents you from IT-testing that the flag is off. I recommend 
forking out a ConversionIT and setting the flag only there. Then you can 
test the disabled-flag behavior in the existing test and move the current test to 
the new file.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java:
##########
@@ -117,6 +132,147 @@ private Operation handleAlter(
                 currentTable -> buildNewTable(currentTable, mergeContext, 
schemaResolver));
     }
 
+    private Operation handleConvert(
+            final SqlCreateOrAlterMaterializedTable 
sqlCreateOrAlterMaterializedTable,
+            final ConvertContext context,
+            final ObjectIdentifier identifier,
+            final ResolvedCatalogTable oldBaseTable) {
+        final boolean conversionEnabled =
+                context.getTableConfig()
+                        .getRootConfiguration()
+                        
.get(TableConfigOptions.MATERIALIZED_TABLE_CONVERSION_FROM_TABLE_ENABLED);
+        if (!conversionEnabled) {
+            throw new ValidationException(
+                    String.format(
+                            "Table %s is not a materialized table. Only 
materialized table support create or alter operation.",
+                            identifier.asSummaryString()));
+        }
+        return handleConvert(sqlCreateOrAlterMaterializedTable, oldBaseTable, 
context, identifier);
+    }
+
+    private Operation handleConvert(
+            final SqlCreateOrAlterMaterializedTable sqlCreateOrAlterTable,
+            final ResolvedCatalogTable oldTable,
+            final ConvertContext context,
+            final ObjectIdentifier identifier) {
+        final MergeContext baseMergeContext = 
getMergeContext(sqlCreateOrAlterTable, context);
+        final boolean ddlDeclaresWatermark = 
sqlCreateOrAlterTable.getWatermark().isPresent();
+        final boolean ddlDeclaresPrimaryKey =
+                sqlCreateOrAlterTable.getFullConstraints().stream()
+                        .anyMatch(SqlTableConstraint::isPrimaryKey);
+
+        final Schema mergedSchema = baseMergeContext.getMergedSchema();
+        final Schema finalSchema =
+                inheritWatermarkAndPrimaryKey(
+                        mergedSchema,
+                        oldTable.getResolvedSchema(),
+                        ddlDeclaresWatermark,
+                        ddlDeclaresPrimaryKey,
+                        identifier);
+
+        final CatalogMaterializedTable newMaterializedTable =
+                CatalogMaterializedTable.newBuilder()
+                        .schema(finalSchema)
+                        .comment(baseMergeContext.getMergedComment())
+                        
.partitionKeys(baseMergeContext.getMergedPartitionKeys())
+                        .options(baseMergeContext.getMergedTableOptions())
+                        
.originalQuery(baseMergeContext.getMergedOriginalQuery())
+                        
.expandedQuery(baseMergeContext.getMergedExpandedQuery())
+                        
.distribution(baseMergeContext.getMergedTableDistribution().orElse(null))
+                        .freshness(baseMergeContext.getMergedFreshness())
+                        
.logicalRefreshMode(baseMergeContext.getMergedLogicalRefreshMode())
+                        .refreshMode(baseMergeContext.getMergedRefreshMode())
+                        .refreshStatus(RefreshStatus.INITIALIZING)
+                        .startMode(baseMergeContext.getMergedStartMode())
+                        .build();
+        final ResolvedCatalogMaterializedTable resolvedNewMaterializedTable =
+                
context.getCatalogManager().resolveCatalogMaterializedTable(newMaterializedTable);
+
+        final List<TableChange> tableChanges =
+                buildConversionTableChanges(

Review Comment:
   Build changes lazily like CoA
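
   A minimal sketch of what "lazily" means here, assuming the operation can carry a function instead of a precomputed list, similar to the `currentTable -> buildNewTable(...)` lambda passed in `handleAlter`. The class, the `String` table stand-in, and the change strings are all hypothetical.

```java
import java.util.List;
import java.util.function.Function;

/** Sketch only: hypothetical operation that defers computing the change list. */
final class LazyChangesSketch {
    static final class ConvertOperation {
        private final Function<String, List<String>> changeBuilder;

        ConvertOperation(Function<String, List<String>> changeBuilder) {
            this.changeBuilder = changeBuilder;
        }

        /** Computes the changes against the table as it looks at execution time. */
        List<String> getChanges(String currentTable) {
            return changeBuilder.apply(currentTable);
        }
    }
}
```

   Nothing is diffed while the operation is being planned; the changes are derived from whatever table is current when `getChanges` is called.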



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java:
##########
@@ -117,6 +132,147 @@ private Operation handleAlter(
                 currentTable -> buildNewTable(currentTable, mergeContext, 
schemaResolver));
     }
 
+    private Operation handleConvert(
+            final SqlCreateOrAlterMaterializedTable 
sqlCreateOrAlterMaterializedTable,
+            final ConvertContext context,
+            final ObjectIdentifier identifier,
+            final ResolvedCatalogTable oldBaseTable) {
+        final boolean conversionEnabled =
+                context.getTableConfig()
+                        .getRootConfiguration()
+                        
.get(TableConfigOptions.MATERIALIZED_TABLE_CONVERSION_FROM_TABLE_ENABLED);
+        if (!conversionEnabled) {
+            throw new ValidationException(
+                    String.format(
+                            "Table %s is not a materialized table. Only 
materialized table support create or alter operation.",
+                            identifier.asSummaryString()));
+        }
+        return handleConvert(sqlCreateOrAlterMaterializedTable, oldBaseTable, 
context, identifier);
+    }
+
+    private Operation handleConvert(
+            final SqlCreateOrAlterMaterializedTable sqlCreateOrAlterTable,
+            final ResolvedCatalogTable oldTable,
+            final ConvertContext context,
+            final ObjectIdentifier identifier) {
+        final MergeContext baseMergeContext = 
getMergeContext(sqlCreateOrAlterTable, context);
+        final boolean ddlDeclaresWatermark = 
sqlCreateOrAlterTable.getWatermark().isPresent();
+        final boolean ddlDeclaresPrimaryKey =
+                sqlCreateOrAlterTable.getFullConstraints().stream()
+                        .anyMatch(SqlTableConstraint::isPrimaryKey);
+
+        final Schema mergedSchema = baseMergeContext.getMergedSchema();
+        final Schema finalSchema =
+                inheritWatermarkAndPrimaryKey(
+                        mergedSchema,
+                        oldTable.getResolvedSchema(),
+                        ddlDeclaresWatermark,
+                        ddlDeclaresPrimaryKey,
+                        identifier);
+
+        final CatalogMaterializedTable newMaterializedTable =
+                CatalogMaterializedTable.newBuilder()
+                        .schema(finalSchema)
+                        .comment(baseMergeContext.getMergedComment())
+                        
.partitionKeys(baseMergeContext.getMergedPartitionKeys())
+                        .options(baseMergeContext.getMergedTableOptions())
+                        
.originalQuery(baseMergeContext.getMergedOriginalQuery())
+                        
.expandedQuery(baseMergeContext.getMergedExpandedQuery())
+                        
.distribution(baseMergeContext.getMergedTableDistribution().orElse(null))
+                        .freshness(baseMergeContext.getMergedFreshness())
+                        
.logicalRefreshMode(baseMergeContext.getMergedLogicalRefreshMode())
+                        .refreshMode(baseMergeContext.getMergedRefreshMode())
+                        .refreshStatus(RefreshStatus.INITIALIZING)
+                        .startMode(baseMergeContext.getMergedStartMode())
+                        .build();
+        final ResolvedCatalogMaterializedTable resolvedNewMaterializedTable =
+                
context.getCatalogManager().resolveCatalogMaterializedTable(newMaterializedTable);
+
+        final List<TableChange> tableChanges =
+                buildConversionTableChanges(
+                        oldTable,
+                        resolvedNewMaterializedTable,
+                        baseMergeContext.hasSchemaDefinition(),
+                        baseMergeContext.hasConstraintDefinition());
+
+        return new ConvertTableToMaterializedTableOperation(
+                identifier, oldTable, resolvedNewMaterializedTable, 
tableChanges);
+    }
+
+    /**
+     * Layers the watermark and primary key from the source table onto the 
merged schema when the
+     * conversion DDL does not declare them, preserving the "in-place" 
semantics.
+     *
+     * <p>Inheritance is per-feature and independent of column-list presence: 
the DDL declaring a
+     * watermark replaces the source watermark; omitting it carries the source 
watermark over. The
+     * same rule applies to the primary key.
+     *
+     * <p>To drop an inherited watermark or primary key after conversion, the 
user runs a follow-up
+     * {@code ALTER MATERIALIZED TABLE ... DROP WATERMARK} or {@code ... DROP 
PRIMARY KEY}.
+     *
+     * @return the merged schema with the inherited watermark and / or primary 
key folded in
+     * @throws ValidationException if the source table carries more than one 
watermark
+     *     specification, which materialized tables do not support
+     */
+    private static Schema inheritWatermarkAndPrimaryKey(

Review Comment:
   What actually happens if I run the CoA twice (first conversion, then alter)? 
Would it drop the watermark/PK then?
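
   To make the question concrete: as I read the Javadoc above, the conversion-time rule is a per-feature fallback, which the sketch below models with a hypothetical helper (not a method in the PR). It covers only the conversion path; whether a second run through `handleAlter` applies the same fallback is what I'm asking.

```java
import java.util.Optional;

/** Sketch only: models the per-feature inheritance rule described in the Javadoc. */
final class InheritOnConvertSketch {
    /** Returns the DDL's value when declared, otherwise the source table's value. */
    static <T> Optional<T> inherit(Optional<T> declaredInDdl, Optional<T> fromSourceTable) {
        return declaredInDdl.isPresent() ? declaredInDdl : fromSourceTable;
    }
}
```

   The same helper would be applied once for the watermark and once for the primary key, independently of each other.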



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java:
##########
@@ -117,6 +132,147 @@ private Operation handleAlter(
                 currentTable -> buildNewTable(currentTable, mergeContext, 
schemaResolver));
     }
 
+    private Operation handleConvert(
+            final SqlCreateOrAlterMaterializedTable 
sqlCreateOrAlterMaterializedTable,
+            final ConvertContext context,
+            final ObjectIdentifier identifier,
+            final ResolvedCatalogTable oldBaseTable) {
+        final boolean conversionEnabled =
+                context.getTableConfig()
+                        .getRootConfiguration()
+                        
.get(TableConfigOptions.MATERIALIZED_TABLE_CONVERSION_FROM_TABLE_ENABLED);
+        if (!conversionEnabled) {
+            throw new ValidationException(
+                    String.format(
+                            "Table %s is not a materialized table. Only 
materialized table support create or alter operation.",
+                            identifier.asSummaryString()));
+        }
+        return handleConvert(sqlCreateOrAlterMaterializedTable, oldBaseTable, 
context, identifier);
+    }
+
+    private Operation handleConvert(

Review Comment:
   Having two methods with near-identical signatures is throwing me off a bit. 
Can you just merge them? I can't tell from the signatures how they differ.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
