This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new c1fa29ac91a [FLINK-35708][table-planner] Allow defining a schema in REPLACE TABLE AS (RTAS)
c1fa29ac91a is described below

commit c1fa29ac91aeef64701d8d97b13644a4267e8f8e
Author: Sergio Peña <[email protected]>
AuthorDate: Tue Aug 27 09:26:51 2024 -0500

    [FLINK-35708][table-planner] Allow defining a schema in REPLACE TABLE AS (RTAS)
---
 .../flink/sql/parser/ddl/SqlReplaceTableAs.java    |  27 ----
 .../flink/sql/parser/FlinkSqlParserImplTest.java   |  41 ++----
 .../table/planner/operations/MergeTableAsUtil.java | 134 ++++++++++++++---
 .../operations/SqlCreateTableConverter.java        | 101 ++-----------
 .../planner/operations/SqlNodeConvertContext.java  |   5 +
 .../operations/converters/SqlNodeConverter.java    |   4 +
 .../converters/SqlReplaceTableAsConverter.java     | 139 ++++++++++++++---
 .../SqlRTASNodeToOperationConverterTest.java       | 164 +++++++++++++++++++--
 .../planner/runtime/stream/sql/RTASITCase.java     |  80 ++++++++++
 9 files changed, 496 insertions(+), 199 deletions(-)

diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlReplaceTableAs.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlReplaceTableAs.java
index 5d343e8c2c6..65bb7056dbd 100644
--- 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlReplaceTableAs.java
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlReplaceTableAs.java
@@ -175,33 +175,6 @@ public class SqlReplaceTableAs extends SqlCreate 
implements ExtendedSqlNode {
                     getParserPosition(),
                     errorMsg + " syntax does not support temporary table 
yet.");
         }
-
-        if (getColumnList().size() > 0) {
-            throw new SqlValidateException(
-                    getParserPosition(),
-                    errorMsg + " syntax does not support to specify explicit 
columns yet.");
-        }
-
-        if (getWatermark().isPresent()) {
-            throw new SqlValidateException(
-                    getParserPosition(),
-                    errorMsg + " syntax does not support to specify explicit 
watermark yet.");
-        }
-        if (getDistribution() != null) {
-            throw new SqlValidateException(
-                    getParserPosition(),
-                    errorMsg + " syntax does not support creating distributed 
tables yet.");
-        }
-        if (getPartitionKeyList().size() > 0) {
-            throw new SqlValidateException(
-                    getParserPosition(),
-                    errorMsg + " syntax does not support to create partitioned 
table yet.");
-        }
-        if 
(getFullConstraints().stream().anyMatch(SqlTableConstraint::isPrimaryKey)) {
-            throw new SqlValidateException(
-                    getParserPosition(),
-                    errorMsg + " syntax does not support primary key 
constraints yet.");
-        }
     }
 
     public SqlNode getAsQuery() {
diff --git 
a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
 
b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
index 7e29a0548e6..abf0725c2ec 100644
--- 
a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
+++ 
b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
@@ -2944,17 +2944,11 @@ class FlinkSqlParserImplTest extends SqlParserTest {
 
         // test replace table as select with explicit columns
         sql("REPLACE TABLE t (col1 string) WITH ('test' = 'zm') AS SELECT col1 
FROM b")
-                .node(
-                        new ValidationMatcher()
-                                .fails(
-                                        "REPLACE TABLE AS SELECT syntax does 
not support to specify explicit columns yet."));
+                .node(new ValidationMatcher().ok());
 
         // test replace table as select with watermark
         sql("REPLACE TABLE t (watermark FOR ts AS ts - interval '3' second) 
WITH ('test' = 'zm') AS SELECT col1 FROM b")
-                .node(
-                        new ValidationMatcher()
-                                .fails(
-                                        "REPLACE TABLE AS SELECT syntax does 
not support to specify explicit watermark yet."));
+                .node(new ValidationMatcher().ok());
 
         // test replace table as select with constraints
         sql("REPLACE TABLE t (PRIMARY KEY (col1)) WITH ('test' = 'zm') AS 
SELECT col1 FROM b")
@@ -2973,17 +2967,11 @@ class FlinkSqlParserImplTest extends SqlParserTest {
 
         // test replace table as select with partition key
         sql("REPLACE TABLE t PARTITIONED BY(col1) WITH ('test' = 'zm') AS 
SELECT col1 FROM b")
-                .node(
-                        new ValidationMatcher()
-                                .fails(
-                                        "REPLACE TABLE AS SELECT syntax does 
not support to create partitioned table yet."));
+                .node(new ValidationMatcher().ok());
 
         // test replace table as select with distribution
         sql("REPLACE TABLE t DISTRIBUTED BY(col1) WITH ('test' = 'zm') AS 
SELECT col1 FROM b")
-                .node(
-                        new ValidationMatcher()
-                                .fails(
-                                        "REPLACE TABLE AS SELECT syntax does 
not support creating distributed tables yet."));
+                .node(new ValidationMatcher().ok());
     }
 
     @Test
@@ -3010,17 +2998,12 @@ class FlinkSqlParserImplTest extends SqlParserTest {
 
         // test create or replace table as select with explicit columns
         sql("CREATE OR REPLACE TABLE t (col1 string) WITH ('test' = 'zm') AS 
SELECT col1 FROM b")
-                .node(
-                        new ValidationMatcher()
-                                .fails(
-                                        "CREATE OR REPLACE TABLE AS SELECT 
syntax does not support to specify explicit columns yet."));
+                .node(new ValidationMatcher().ok());
 
         // test create or replace table as select with watermark
         sql("CREATE OR REPLACE TABLE t (watermark FOR ts AS ts - interval '3' 
second) WITH ('test' = 'zm') AS SELECT col1 FROM b")
-                .node(
-                        new ValidationMatcher()
-                                .fails(
-                                        "CREATE OR REPLACE TABLE AS SELECT 
syntax does not support to specify explicit watermark yet."));
+                .node(new ValidationMatcher().ok());
+
         // test create or replace table as select with constraints
         sql("CREATE OR REPLACE TABLE t (PRIMARY KEY (col1)) WITH ('test' = 
'zm') AS SELECT col1 FROM b")
                 .node(
@@ -3038,16 +3021,10 @@ class FlinkSqlParserImplTest extends SqlParserTest {
 
         // test create or replace table as select with partition key
         sql("CREATE OR REPLACE TABLE t PARTITIONED BY(col1) WITH ('test' = 
'zm') AS SELECT col1 FROM b")
-                .node(
-                        new ValidationMatcher()
-                                .fails(
-                                        "CREATE OR REPLACE TABLE AS SELECT 
syntax does not support to create partitioned table yet."));
+                .node(new ValidationMatcher().ok());
 
         sql("CREATE OR REPLACE TABLE t DISTRIBUTED BY(col1) WITH ('test' = 
'zm') AS SELECT col1 FROM b")
-                .node(
-                        new ValidationMatcher()
-                                .fails(
-                                        "CREATE OR REPLACE TABLE AS SELECT 
syntax does not support creating distributed tables yet."));
+                .node(new ValidationMatcher().ok());
     }
 
     @Test
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/MergeTableAsUtil.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/MergeTableAsUtil.java
index 12662c52b5a..e743e5964a2 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/MergeTableAsUtil.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/MergeTableAsUtil.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.table.planner.operations;
 
-import org.apache.flink.sql.parser.ddl.SqlCreateTableAs;
 import org.apache.flink.sql.parser.ddl.SqlTableColumn;
 import org.apache.flink.sql.parser.ddl.SqlTableColumn.SqlComputedColumn;
 import org.apache.flink.sql.parser.ddl.SqlTableColumn.SqlMetadataColumn;
@@ -28,22 +27,39 @@ import 
org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;
 import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.api.Schema.UnresolvedColumn;
 import org.apache.flink.table.api.Schema.UnresolvedPhysicalColumn;
+import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogManager;
 import org.apache.flink.table.catalog.DataTypeFactory;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
 import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator;
+import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.calcite.SqlRewriterUtils;
 import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
 
 import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlLiteral;
 import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlSelect;
+import org.apache.calcite.sql.parser.SqlParserPos;
 import org.apache.calcite.sql.validate.SqlValidator;
 
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import static 
org.apache.flink.table.types.logical.utils.LogicalTypeCasts.supportsImplicitCast;
 
@@ -53,7 +69,7 @@ public class MergeTableAsUtil {
     private final Function<SqlNode, String> escapeExpression;
     private final DataTypeFactory dataTypeFactory;
 
-    MergeTableAsUtil(
+    public MergeTableAsUtil(
             SqlValidator validator,
             Function<SqlNode, String> escapeExpression,
             DataTypeFactory dataTypeFactory) {
@@ -63,9 +79,90 @@ public class MergeTableAsUtil {
     }
 
     /**
-     * Merges the schema part of the {@code sqlCreateTableAs} with the {@code 
sourceSchema}.
+     * Rewrites the query operation to include only the fields that may be 
persisted in the sink.
+     */
+    public PlannerQueryOperation maybeRewriteQuery(
+            CatalogManager catalogManager,
+            FlinkPlannerImpl flinkPlanner,
+            PlannerQueryOperation origQueryOperation,
+            SqlNode origQueryNode,
+            ResolvedCatalogTable sinkTable) {
+        FlinkCalciteSqlValidator sqlValidator = 
flinkPlanner.getOrCreateSqlValidator();
+        SqlRewriterUtils rewriterUtils = new SqlRewriterUtils(sqlValidator);
+        FlinkTypeFactory typeFactory = (FlinkTypeFactory) 
sqlValidator.getTypeFactory();
+
+        // Only fields that may be persisted will be included in the select 
query
+        RowType sinkRowType =
+                ((RowType) 
sinkTable.getResolvedSchema().toSinkRowDataType().getLogicalType());
+
+        Map<String, Integer> sourceFields =
+                IntStream.range(0, 
origQueryOperation.getResolvedSchema().getColumnNames().size())
+                        .boxed()
+                        .collect(
+                                Collectors.toMap(
+                                        
origQueryOperation.getResolvedSchema().getColumnNames()
+                                                ::get,
+                                        Function.identity()));
+
+        // assignedFields contains the new sink fields that are not present in 
the source
+        // and that will be included in the select query
+        LinkedHashMap<Integer, SqlNode> assignedFields = new LinkedHashMap<>();
+
+        // targetPositions contains the positions of the source fields that 
will be
+        // included in the select query
+        List<Object> targetPositions = new ArrayList<>();
+
+        int pos = -1;
+        for (RowType.RowField targetField : sinkRowType.getFields()) {
+            pos++;
+
+            if (!sourceFields.containsKey(targetField.getName())) {
+                if (!targetField.getType().isNullable()) {
+                    throw new ValidationException(
+                            "Column '"
+                                    + targetField.getName()
+                                    + "' has no default value and does not 
allow NULLs.");
+                }
+
+                assignedFields.put(
+                        pos,
+                        rewriterUtils.maybeCast(
+                                SqlLiteral.createNull(SqlParserPos.ZERO),
+                                typeFactory.createUnknownType(),
+                                
typeFactory.createFieldTypeFromLogicalType(targetField.getType()),
+                                typeFactory));
+            } else {
+                targetPositions.add(sourceFields.get(targetField.getName()));
+            }
+        }
+
+        // if there are no new sink fields to include, then return the 
original query
+        if (assignedFields.isEmpty()) {
+            return origQueryOperation;
+        }
+
+        // rewrite query
+        SqlCall newSelect =
+                rewriterUtils.rewriteSelect(
+                        (SqlSelect) origQueryNode,
+                        typeFactory.buildRelNodeRowType(sinkRowType),
+                        assignedFields,
+                        targetPositions);
+
+        return (PlannerQueryOperation)
+                SqlNodeToOperationConversion.convert(flinkPlanner, 
catalogManager, newSelect)
+                        .orElseThrow(
+                                () ->
+                                        new TableException(
+                                                "Unsupported node type "
+                                                        + 
newSelect.getClass().getSimpleName()));
+    }
+
+    /**
+     * Merges the specified schema with columns, watermark, and constraints 
with the {@code
+     * sourceSchema}.
      *
-     * <p>The schema part of the {@code CREATE TABLE} statement merged 
includes:
+     * <p>The resulting schema will contain the following elements:
      *
      * <ul>
      *   <li>columns
@@ -75,16 +172,19 @@ public class MergeTableAsUtil {
      *   <li>primary key
      * </ul>
      *
-     * <p>It is expected that the {@code sourceSchema} contains only 
physical/regular columns, which
-     * is behavior of the CTAS statement to generate such schema.
+     * <p>It is expected that the {@code sourceSchema} contains only 
physical/regular columns.
      *
-     * <p>Columns of {@code sourceSchema} are appended to the schema of {@code 
sqlCreateTableAs}. If
-     * a column in the {@code sqlCreateTableAs} is already defined in {@code 
sourceSchema}, then the
-     * types of the columns are implicit cast and must be compatible based on 
the implicit cast
-     * rules. If they're compatible, then the column position in the schema 
stays the same as
-     * defined in the appended {@code sourceSchema}.
+     * <p>Columns of the {@code sourceSchema} are appended to the schema 
columns defined in the
+     * {@code sqlColumnList}. If a column in the {@code sqlColumnList} is 
already defined in the
+     * {@code sourceSchema}, then the types of the columns are implicit cast 
and must be compatible
+     * based on the implicit cast rules. If they're compatible, then the 
column position in the
+     * schema stays the same as defined in the appended {@code sourceSchema}.
      */
-    public Schema mergeSchemas(SqlCreateTableAs sqlCreateTableAs, 
ResolvedSchema sourceSchema) {
+    public Schema mergeSchemas(
+            SqlNodeList sqlColumnList,
+            @Nullable SqlWatermark sqlWatermark,
+            List<SqlTableConstraint> sqlTableConstraints,
+            ResolvedSchema sourceSchema) {
         SchemaBuilder schemaBuilder =
                 new SchemaBuilder(
                         (FlinkTypeFactory) validator.getTypeFactory(),
@@ -93,19 +193,17 @@ public class MergeTableAsUtil {
                         escapeExpression);
 
         schemaBuilder.mergeColumns(
-                sqlCreateTableAs.getColumnList(),
+                sqlColumnList,
                 
Schema.newBuilder().fromResolvedSchema(sourceSchema).build().getColumns());
 
-        if (sqlCreateTableAs.getWatermark().isPresent()) {
-            schemaBuilder.setWatermark(sqlCreateTableAs.getWatermark().get());
+        if (sqlWatermark != null) {
+            schemaBuilder.setWatermark(sqlWatermark);
         }
 
         // It is assumed only a primary key constraint may be defined in the 
table. The
         // SqlCreateTableAs has validations to ensure this before the object 
is created.
         Optional<SqlTableConstraint> primaryKey =
-                sqlCreateTableAs.getFullConstraints().stream()
-                        .filter(SqlTableConstraint::isPrimaryKey)
-                        .findAny();
+                
sqlTableConstraints.stream().filter(SqlTableConstraint::isPrimaryKey).findAny();
 
         if (primaryKey.isPresent()) {
             schemaBuilder.setPrimaryKey(primaryKey.get());
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java
index 2a7c1044456..037fc4cde92 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java
@@ -43,21 +43,13 @@ import 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
 import org.apache.flink.table.planner.calcite.SqlRewriterUtils;
 import org.apache.flink.table.planner.utils.OperationConverterUtils;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.types.logical.RowType.RowField;
 
-import org.apache.calcite.sql.SqlCall;
 import org.apache.calcite.sql.SqlIdentifier;
-import org.apache.calcite.sql.SqlLiteral;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlNodeList;
-import org.apache.calcite.sql.SqlSelect;
-import org.apache.calcite.sql.parser.SqlParserPos;
 
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
@@ -65,7 +57,6 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
-import java.util.stream.IntStream;
 
 /** Helper class for converting {@link SqlCreateTable} to {@link 
CreateTableOperation}. */
 class SqlCreateTableConverter {
@@ -130,8 +121,12 @@ class SqlCreateTableConverter {
 
         // If needed, rewrite the query to include the new sink fields in the 
select list
         query =
-                maybeRewriteCreateTableAsQuery(
-                        flinkPlanner, sqlCreateTableAs, 
tableWithResolvedSchema, query);
+                mergeTableAsUtil.maybeRewriteQuery(
+                        catalogManager,
+                        flinkPlanner,
+                        query,
+                        sqlCreateTableAs.getAsQuery(),
+                        tableWithResolvedSchema);
 
         CreateTableOperation createTableOperation =
                 new CreateTableOperation(
@@ -144,83 +139,6 @@ class SqlCreateTableConverter {
                 createTableOperation, Collections.emptyMap(), query, false);
     }
 
-    /**
-     * Builds and returns a new Query operation with new sink fields declared 
in the {@code
-     * sinkTable}.
-     */
-    private PlannerQueryOperation maybeRewriteCreateTableAsQuery(
-            FlinkPlannerImpl flinkPlanner,
-            SqlCreateTableAs sqlCreateTableAs,
-            ResolvedCatalogTable sinkTable,
-            PlannerQueryOperation query) {
-
-        // Only fields that may be persisted will be included in the select 
query
-        RowType sinkRowType =
-                ((RowType) 
sinkTable.getResolvedSchema().toSinkRowDataType().getLogicalType());
-
-        Map<String, Integer> sourceFields =
-                IntStream.range(0, 
query.getResolvedSchema().getColumnNames().size())
-                        .boxed()
-                        .collect(
-                                Collectors.toMap(
-                                        
query.getResolvedSchema().getColumnNames()::get,
-                                        Function.identity()));
-
-        // assignedFields contains the new sink fields that are not present in 
the source
-        // and that will be included in the select query
-        LinkedHashMap<Integer, SqlNode> assignedFields = new LinkedHashMap<>();
-
-        // targetPositions contains the positions of the source fields that 
will be
-        // included in the select query
-        List<Object> targetPositions = new ArrayList<>();
-
-        int pos = -1;
-        for (RowField targetField : sinkRowType.getFields()) {
-            pos++;
-
-            if (!sourceFields.containsKey(targetField.getName())) {
-                if (!targetField.getType().isNullable()) {
-                    throw new ValidationException(
-                            "Column '"
-                                    + targetField.getName()
-                                    + "' has "
-                                    + "no default value and does not allow 
NULLs.");
-                }
-
-                assignedFields.put(
-                        pos,
-                        rewriterUtils.maybeCast(
-                                SqlLiteral.createNull(SqlParserPos.ZERO),
-                                typeFactory.createUnknownType(),
-                                
typeFactory.createFieldTypeFromLogicalType(targetField.getType()),
-                                typeFactory));
-            } else {
-                targetPositions.add(sourceFields.get(targetField.getName()));
-            }
-        }
-
-        // if there are no new sink fields to include, then return the 
original query
-        if (assignedFields.isEmpty()) {
-            return query;
-        }
-
-        // rewrite query
-        SqlCall newSelect =
-                rewriterUtils.rewriteSelect(
-                        (SqlSelect) sqlCreateTableAs.getAsQuery(),
-                        typeFactory.buildRelNodeRowType(sinkRowType),
-                        assignedFields,
-                        targetPositions);
-
-        return (PlannerQueryOperation)
-                SqlNodeToOperationConversion.convert(flinkPlanner, 
catalogManager, newSelect)
-                        .orElseThrow(
-                                () ->
-                                        new TableException(
-                                                "CTAS unsupported node type "
-                                                        + 
newSelect.getClass().getSimpleName()));
-    }
-
     private ResolvedCatalogTable createCatalogTable(
             SqlCreateTableAs sqlCreateTableAs, ResolvedSchema mergeSchema) {
         Map<String, String> tableOptions =
@@ -233,7 +151,12 @@ class SqlCreateTableConverter {
         String tableComment =
                 
OperationConverterUtils.getTableComment(sqlCreateTableAs.getComment());
 
-        Schema mergedSchema = mergeTableAsUtil.mergeSchemas(sqlCreateTableAs, 
mergeSchema);
+        Schema mergedSchema =
+                mergeTableAsUtil.mergeSchemas(
+                        sqlCreateTableAs.getColumnList(),
+                        sqlCreateTableAs.getWatermark().orElse(null),
+                        sqlCreateTableAs.getFullConstraints(),
+                        mergeSchema);
 
         Optional<TableDistribution> tableDistribution =
                 Optional.ofNullable(sqlCreateTableAs.getDistribution())
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeConvertContext.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeConvertContext.java
index 2958333caba..687a8ce483a 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeConvertContext.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeConvertContext.java
@@ -72,6 +72,11 @@ public class SqlNodeConvertContext implements 
SqlNodeConverter.ConvertContext {
         return catalogManager;
     }
 
+    @Override
+    public FlinkPlannerImpl getFlinkPlanner() {
+        return flinkPlanner;
+    }
+
     @Override
     public RelRoot toRelRoot(SqlNode sqlNode) {
         return flinkPlanner.rel(sqlNode);
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverter.java
index cdd9d860097..2c54d78c516 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverter.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverter.java
@@ -22,6 +22,7 @@ import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.catalog.CatalogManager;
 import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
 import org.apache.flink.table.planner.utils.Expander;
 import org.apache.flink.table.types.DataType;
 
@@ -83,6 +84,9 @@ public interface SqlNodeConverter<S extends SqlNode> {
         /** Returns the {@link CatalogManager} in the convert context. */
         CatalogManager getCatalogManager();
 
+        /** Returns the {@link FlinkPlannerImpl} in the convert context. */
+        FlinkPlannerImpl getFlinkPlanner();
+
         /** Converts the given validated {@link SqlNode} into a {@link 
RelRoot}. */
         RelRoot toRelRoot(SqlNode sqlNode);
 
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlReplaceTableAsConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlReplaceTableAsConverter.java
index 83b10eb97d5..cf4d96cf26f 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlReplaceTableAsConverter.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlReplaceTableAsConverter.java
@@ -21,22 +21,34 @@ package 
org.apache.flink.table.planner.operations.converters;
 import org.apache.flink.sql.parser.ddl.SqlReplaceTableAs;
 import org.apache.flink.sql.parser.ddl.SqlTableOption;
 import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.CatalogManager;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.TableDistribution;
 import org.apache.flink.table.catalog.UnresolvedIdentifier;
 import org.apache.flink.table.operations.Operation;
-import org.apache.flink.table.operations.QueryOperation;
 import org.apache.flink.table.operations.ReplaceTableAsOperation;
 import org.apache.flink.table.operations.ddl.CreateTableOperation;
+import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
+import org.apache.flink.table.planner.operations.MergeTableAsUtil;
 import org.apache.flink.table.planner.operations.PlannerQueryOperation;
+import org.apache.flink.table.planner.operations.SqlNodeToOperationConversion;
 import org.apache.flink.table.planner.utils.OperationConverterUtils;
 
-import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNodeList;
 
-import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
 import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 /** A converter for {@link SqlReplaceTableAs}. */
 public class SqlReplaceTableAsConverter implements 
SqlNodeConverter<SqlReplaceTableAs> {
@@ -47,13 +59,60 @@ public class SqlReplaceTableAsConverter implements 
SqlNodeConverter<SqlReplaceTa
         UnresolvedIdentifier unresolvedIdentifier =
                 UnresolvedIdentifier.of(sqlReplaceTableAs.fullTableName());
         ObjectIdentifier identifier = 
catalogManager.qualifyIdentifier(unresolvedIdentifier);
+        FlinkPlannerImpl flinkPlanner = context.getFlinkPlanner();
 
-        SqlNode asQuerySqlNode = sqlReplaceTableAs.getAsQuery();
-        SqlNode validated = context.getSqlValidator().validate(asQuerySqlNode);
-        QueryOperation query =
-                new PlannerQueryOperation(
-                        context.toRelRoot(validated).project(),
-                        () -> context.toQuotedSqlString(asQuerySqlNode));
+        MergeTableAsUtil mergeTableAsUtil =
+                new MergeTableAsUtil(
+                        context.getSqlValidator(),
+                        sqlNode -> sqlNode.toString(),
+                        catalogManager.getDataTypeFactory());
+
+        PlannerQueryOperation query =
+                (PlannerQueryOperation)
+                        SqlNodeToOperationConversion.convert(
+                                        flinkPlanner,
+                                        catalogManager,
+                                        sqlReplaceTableAs.getAsQuery())
+                                .orElseThrow(
+                                        () ->
+                                                new TableException(
+                                                        "RTAS unsupported node type "
+                                                                + sqlReplaceTableAs
+                                                                        .getAsQuery()
+                                                                        .getClass()
+                                                                        .getSimpleName()));
+
+        // get table
+        ResolvedCatalogTable tableWithResolvedSchema =
+                createCatalogTable(
+                        context, mergeTableAsUtil, sqlReplaceTableAs, query.getResolvedSchema());
+
+        // If needed, rewrite the query to include the new sink fields in the select list
+        query =
+                mergeTableAsUtil.maybeRewriteQuery(
+                        context.getCatalogManager(),
+                        flinkPlanner,
+                        query,
+                        sqlReplaceTableAs.getAsQuery(),
+                        tableWithResolvedSchema);
+
+        CreateTableOperation createTableOperation =
+                new CreateTableOperation(
+                        identifier,
+                        tableWithResolvedSchema,
+                        sqlReplaceTableAs.isIfNotExists(),
+                        sqlReplaceTableAs.isTemporary());
+
+        return new ReplaceTableAsOperation(
+                createTableOperation, query, sqlReplaceTableAs.isCreateOrReplace());
+    }
+
+    private ResolvedCatalogTable createCatalogTable(
+            ConvertContext context,
+            MergeTableAsUtil mergeTableAsUtil,
+            SqlReplaceTableAs sqlReplaceTableAs,
+            ResolvedSchema mergeSchema) {
+        CatalogManager catalogManager = context.getCatalogManager();
 
         // get table comment
         String tableComment =
@@ -70,22 +129,56 @@ public class SqlReplaceTableAsConverter implements SqlNodeConverter<SqlReplaceTa
                                         ((SqlTableOption) p).getKeyString(),
                                         ((SqlTableOption) p).getValueString()));
 
-        // get table
+        // merge schemas
+        Schema mergedSchema =
+                mergeTableAsUtil.mergeSchemas(
+                        sqlReplaceTableAs.getColumnList(),
+                        sqlReplaceTableAs.getWatermark().orElse(null),
+                        sqlReplaceTableAs.getFullConstraints(),
+                        mergeSchema);
+
+        // get distribution
+        Optional<TableDistribution> tableDistribution =
+                Optional.ofNullable(sqlReplaceTableAs.getDistribution())
+                        .map(OperationConverterUtils::getDistributionFromSqlDistribution);
+
+        // get partition key
+        List<String> partitionKeys =
+                getPartitionKeyColumnNames(sqlReplaceTableAs.getPartitionKeyList());
+        verifyPartitioningColumnsExist(mergedSchema, partitionKeys);
+
         CatalogTable catalogTable =
-                CatalogTable.of(
-                        Schema.newBuilder().fromResolvedSchema(query.getResolvedSchema()).build(),
-                        tableComment,
-                        Collections.emptyList(),
-                        properties);
+                CatalogTable.newBuilder()
+                        .schema(mergedSchema)
+                        .comment(tableComment)
+                        .distribution(tableDistribution.orElse(null))
+                        .options(properties)
+                        .partitionKeys(partitionKeys)
+                        .build();
 
-        CreateTableOperation createTableOperation =
-                new CreateTableOperation(
-                        identifier,
-                        catalogTable,
-                        sqlReplaceTableAs.isIfNotExists(),
-                        sqlReplaceTableAs.isTemporary());
+        return catalogManager.resolveCatalogTable(catalogTable);
+    }
 
-        return new ReplaceTableAsOperation(
-                createTableOperation, query, sqlReplaceTableAs.isCreateOrReplace());
+    private List<String> getPartitionKeyColumnNames(SqlNodeList partitionKey) {
+        return partitionKey.getList().stream()
+                .map(p -> ((SqlIdentifier) p).getSimple())
+                .collect(Collectors.toList());
+    }
+
+    private void verifyPartitioningColumnsExist(Schema mergedSchema, List<String> partitionKeys) {
+        Set<String> columnNames =
+                mergedSchema.getColumns().stream()
+                        .map(Schema.UnresolvedColumn::getName)
+                        .collect(Collectors.toCollection(LinkedHashSet::new));
+        for (String partitionKey : partitionKeys) {
+            if (!columnNames.contains(partitionKey)) {
+                throw new ValidationException(
+                        String.format(
+                                "Partition column '%s' not defined in the table schema. Available columns: [%s]",
+                                partitionKey,
+                                columnNames.stream()
+                                        .collect(Collectors.joining("', '", "'", "'"))));
+            }
+        }
     }
 }
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlRTASNodeToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlRTASNodeToOperationConverterTest.java
index cc7b5a3f1af..ee6e8bca658 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlRTASNodeToOperationConverterTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlRTASNodeToOperationConverterTest.java
@@ -21,8 +21,10 @@ package org.apache.flink.table.planner.operations;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.api.SqlDialect;
+import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.TableDistribution;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.ReplaceTableAsOperation;
 import org.apache.flink.table.operations.ddl.CreateTableOperation;
@@ -37,15 +39,17 @@ import javax.annotation.Nullable;
 
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Test base for testing convert [CREATE OR] REPLACE TABLE AS statement to operation. */
 public class SqlRTASNodeToOperationConverterTest extends SqlNodeToOperationConversionTestBase {
 
     @Test
-    public void testReplaceTableAS() {
+    public void testReplaceTableAs() {
         String tableName = "replace_table";
         String tableComment = "test table comment 表描述";
         String sql =
@@ -58,7 +62,7 @@ public class SqlRTASNodeToOperationConverterTest extends SqlNodeToOperationConve
     }
 
     @Test
-    public void testCreateOrReplaceTableAS() {
+    public void testCreateOrReplaceTableAs() {
         String tableName = "create_or_replace_table";
         String sql =
                 "CREATE OR REPLACE TABLE "
@@ -68,25 +72,163 @@ public class SqlRTASNodeToOperationConverterTest extends SqlNodeToOperationConve
     }
 
     @Test
-    public void testCreateOrReplaceTableASWithLimit() {
+    public void testCreateOrReplaceTableAsWithColumns() {
         String tableName = "create_or_replace_table";
         String sql =
                 "CREATE OR REPLACE TABLE "
                         + tableName
-                        + " WITH ('k1' = 'v1', 'k2' = 'v2') as (SELECT * FROM t1 LIMIT 5)";
-        testCommonReplaceTableAs(sql, tableName, null);
+                        + "(c0 int, c1 double metadata, c2 as c0 * a) "
+                        + " WITH ('k1' = 'v1', 'k2' = 'v2') as SELECT * FROM t1";
+        Schema tableSchema =
+                Schema.newBuilder()
+                        .column("c0", DataTypes.INT())
+                        .columnByMetadata("c1", DataTypes.DOUBLE())
+                        .columnByExpression("c2", "`c0` * `a`")
+                        .fromSchema(getDefaultTableSchema())
+                        .build();
+
+        testCommonReplaceTableAs(sql, tableName, null, tableSchema, null, Collections.emptyList());
+    }
+
+    @Test
+    public void testCreateOrReplaceTableAsWithColumnsOverridden() {
+        String tableName = "create_or_replace_table";
+        String sql =
+                "CREATE OR REPLACE TABLE "
+                        + tableName
+                        + "(c0 int, a double, c int) "
+                        + " WITH ('k1' = 'v1', 'k2' = 'v2') as SELECT * FROM t1";
+        Schema tableSchema =
+                Schema.newBuilder()
+                        .column("c0", DataTypes.INT())
+                        .column("a", DataTypes.DOUBLE())
+                        .column("b", DataTypes.STRING())
+                        .column("c", DataTypes.INT())
+                        .column("d", DataTypes.STRING())
+                        .build();
+
+        testCommonReplaceTableAs(sql, tableName, null, tableSchema, null, Collections.emptyList());
+    }
+
+    @Test
+    public void testCreateOrReplaceTableAsWithNotNullColumnsAreNotAllowed() {
+        String tableName = "create_or_replace_table";
+        String sql =
+                "CREATE OR REPLACE TABLE "
+                        + tableName
+                        + "(c0 int not null) "
+                        + " WITH ('k1' = 'v1', 'k2' = 'v2') as SELECT * FROM t1";
+
+        assertThatThrownBy(() -> parseAndConvert(sql))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining("Column 'c0' has no default value and does not allow NULLs.");
+    }
+
+    @Test
+    public void testCreateOrReplaceTableAsWithIncompatibleImplicitCastTypes() {
+        String tableName = "create_or_replace_table";
+        String sql =
+                "CREATE OR REPLACE TABLE "
+                        + tableName
+                        + "(a boolean) "
+                        + " WITH ('k1' = 'v1', 'k2' = 'v2') as SELECT * FROM t1";
+
+        assertThatThrownBy(() -> parseAndConvert(sql))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "Incompatible types for sink column 'a' at position 0. "
+                                + "The source column has type 'BIGINT NOT NULL', while the target "
+                                + "column has type 'BOOLEAN'.");
+    }
+
+    @Test
+    public void testCreateOrReplaceTableAsWithDistribution() {
+        String tableName = "create_or_replace_table";
+        String sql =
+                "CREATE OR REPLACE TABLE "
+                        + tableName
+                        + " DISTRIBUTED BY HASH(b) INTO 2 BUCKETS "
+                        + " WITH ('k1' = 'v1', 'k2' = 'v2') as SELECT * FROM t1";
+        Schema tableSchema = Schema.newBuilder().fromSchema(getDefaultTableSchema()).build();
+
+        testCommonReplaceTableAs(
+                sql,
+                tableName,
+                null,
+                tableSchema,
+                TableDistribution.ofHash(Collections.singletonList("b"), 2),
+                Collections.emptyList());
+    }
+
+    @Test
+    public void testCreateOrReplaceTableAsWithPrimaryKey() {
+        String tableName = "create_or_replace_table";
+        String sql =
+                "CREATE OR REPLACE TABLE "
+                        + tableName
+                        + "(PRIMARY KEY (a) NOT ENFORCED) "
+                        + " WITH ('k1' = 'v1', 'k2' = 'v2') as SELECT * FROM t1";
+        Schema tableSchema =
+                Schema.newBuilder()
+                        .column("a", DataTypes.BIGINT().notNull())
+                        .column("b", DataTypes.STRING())
+                        .column("c", DataTypes.INT())
+                        .column("d", DataTypes.STRING())
+                        .primaryKey("a")
+                        .build();
+
+        testCommonReplaceTableAs(sql, tableName, null, tableSchema, null, Collections.emptyList());
+    }
+
+    @Test
+    public void testCreateOrReplaceTableAsWithWatermark() {
+        String tableName = "create_or_replace_table";
+        String sql =
+                "CREATE OR REPLACE TABLE "
+                        + tableName
+                        + "(c0 TIMESTAMP(3), WATERMARK FOR c0 AS c0 - INTERVAL '3' SECOND)"
+                        + " WITH ('k1' = 'v1', 'k2' = 'v2') as SELECT * FROM t1";
+        Schema tableSchema =
+                Schema.newBuilder()
+                        .column("c0", DataTypes.TIMESTAMP(3))
+                        .column("a", DataTypes.BIGINT().notNull())
+                        .column("b", DataTypes.STRING())
+                        .column("c", DataTypes.INT())
+                        .column("d", DataTypes.STRING())
+                        .watermark("c0", "`c0` - INTERVAL '3' SECOND")
+                        .build();
+
+        testCommonReplaceTableAs(sql, tableName, null, tableSchema, null, Collections.emptyList());
     }
 
     private void testCommonReplaceTableAs(
             String sql, String tableName, @Nullable String tableComment) {
+        testCommonReplaceTableAs(
+                sql,
+                tableName,
+                tableComment,
+                getDefaultTableSchema(),
+                null,
+                Collections.emptyList());
+    }
+
+    private void testCommonReplaceTableAs(
+            String sql,
+            String tableName,
+            @Nullable String tableComment,
+            Schema tableSchema,
+            @Nullable TableDistribution distribution,
+            List<String> partitionKey) {
         ObjectIdentifier expectedIdentifier = ObjectIdentifier.of("builtin", "default", tableName);
         Operation operation = parseAndConvert(sql);
         CatalogTable expectedCatalogTable =
-                CatalogTable.of(
-                        getDefaultTableSchema(),
-                        tableComment,
-                        Collections.emptyList(),
-                        getDefaultTableOptions());
+                CatalogTable.newBuilder()
+                        .schema(tableSchema)
+                        .comment(tableComment)
+                        .distribution(distribution)
+                        .options(getDefaultTableOptions())
+                        .partitionKeys(partitionKey)
+                        .build();
         verifyReplaceTableAsOperation(operation, expectedIdentifier, expectedCatalogTable);
     }
 
@@ -122,6 +264,8 @@ public class SqlRTASNodeToOperationConverterTest extends SqlNodeToOperationConve
         assertThat(actualCatalogTable.getPartitionKeys())
                 .isEqualTo(expectedCatalogTable.getPartitionKeys());
         assertThat(actualCatalogTable.getOptions()).isEqualTo(expectedCatalogTable.getOptions());
+        assertThat(actualCatalogTable.getDistribution())
+                .isEqualTo(expectedCatalogTable.getDistribution());
     }
 
     private Map<String, String> getDefaultTableOptions() {
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/RTASITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/RTASITCase.java
index f720dde7eb7..2ce2bc6dffe 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/RTASITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/RTASITCase.java
@@ -108,6 +108,86 @@ class RTASITCase extends StreamingTestBase {
         verifyCatalogTable(expectCatalogTable, getCatalogTable("target"));
     }
 
+    @Test
+    void testCreateOrReplaceTableASWithNewColumnsOnly() throws Exception {
+        tEnv().executeSql(
+                        "CREATE OR REPLACE TABLE target"
+                                + " (`p1` INT, `p2` STRING)"
+                                + " WITH ('connector' = 'values', 'bounded' = 'true')"
+                                + " AS SELECT a, c FROM source")
+                .await();
+
+        // verify written rows
+        assertThat(TestValuesTableFactory.getResultsAsStrings("target").toString())
+                .isEqualTo(
+                        "["
+                                + "+I[null, null, 1, Hi], "
+                                + "+I[null, null, 2, Hello], "
+                                + "+I[null, null, 3, Hello world]"
+                                + "]");
+
+        // verify the table after replacing
+        CatalogTable expectCatalogTable =
+                getExpectCatalogTable(
+                        new String[] {"p1", "p2", "a", "c"},
+                        new AbstractDataType[] {
+                            DataTypes.INT(), DataTypes.STRING(), DataTypes.INT(), DataTypes.STRING()
+                        });
+
+        verifyCatalogTable(expectCatalogTable, getCatalogTable("target"));
+    }
+
+    @Test
+    void testCreateOrReplaceTableAsSelectWithColumnsFromQueryOnly() throws Exception {
+        tEnv().executeSql(
+                        "CREATE OR REPLACE TABLE target"
+                                + " (`a` DOUBLE, `c` STRING)"
+                                + " WITH ('connector' = 'values', 'bounded' = 'true')"
+                                + " AS SELECT a, c FROM source")
+                .await();
+
+        // verify written rows
+        assertThat(TestValuesTableFactory.getResultsAsStrings("target").toString())
+                .isEqualTo("[+I[1.0, Hi], +I[2.0, Hello], +I[3.0, Hello world]]");
+
+        // verify the table after replacing
+        CatalogTable expectCatalogTable =
+                getExpectCatalogTable(
+                        new String[] {"a", "c"},
+                        new AbstractDataType[] {DataTypes.DOUBLE(), DataTypes.STRING()});
+
+        verifyCatalogTable(expectCatalogTable, getCatalogTable("target"));
+    }
+
+    @Test
+    void testCreateOrReplaceTableAsSelectWithMixOfNewColumnsAndQueryColumns() throws Exception {
+        tEnv().executeSql(
+                        "CREATE OR REPLACE TABLE target"
+                                + " (`p1` INT, `a` DOUBLE)"
+                                + " WITH ('connector' = 'values', 'bounded' = 'true')"
+                                + " AS SELECT a, c FROM source")
+                .await();
+
+        // verify written rows
+        assertThat(TestValuesTableFactory.getResultsAsStrings("target").toString())
+                .isEqualTo(
+                        "["
+                                + "+I[null, 1.0, Hi], "
+                                + "+I[null, 2.0, Hello], "
+                                + "+I[null, 3.0, Hello world]"
+                                + "]");
+
+        // verify the table after replacing
+        CatalogTable expectCatalogTable =
+                getExpectCatalogTable(
+                        new String[] {"p1", "a", "c"},
+                        new AbstractDataType[] {
+                            DataTypes.INT(), DataTypes.DOUBLE(), DataTypes.STRING()
+                        });
+
+        verifyCatalogTable(expectCatalogTable, getCatalogTable("target"));
+    }
+
     @Test
     void testCreateOrReplaceTableASWithSortLimit() throws Exception {
         tEnv().executeSql(

Reply via email to