This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new baa67f465b7 [FLINK-35709][table-planner] Allow reordering source columns in CTAS and RTAS
baa67f465b7 is described below

commit baa67f465b77c9b3131d8153b52d2a94e3e00ef9
Author: Sergio Peña <[email protected]>
AuthorDate: Thu Oct 3 02:38:31 2024 -0500

    [FLINK-35709][table-planner] Allow reordering source columns in CTAS and RTAS
---
 docs/content.zh/docs/dev/table/sql/create.md       | 31 +++++++
 docs/content/docs/dev/table/sql/create.md          | 31 +++++++
 .../src/main/codegen/includes/parserImpls.ftl      | 54 +++++++++++--
 .../flink/sql/parser/ddl/SqlCreateTable.java       |  4 +
 .../flink/sql/parser/ddl/SqlCreateTableAs.java     | 12 ++-
 .../flink/sql/parser/ddl/SqlReplaceTableAs.java    | 12 ++-
 .../flink/sql/parser/utils/ParserResource.java     |  4 +
 .../flink/sql/parser/FlinkSqlParserImplTest.java   | 37 +++++++++
 .../table/planner/operations/MergeTableAsUtil.java | 59 +++++++++++---
 .../operations/SqlCreateTableConverter.java        | 22 +++--
 .../converters/SqlReplaceTableAsConverter.java     | 24 ++++--
 .../planner/calcite/PreValidateReWriter.scala      | 65 ++-------------
 .../table/planner/calcite/SqlRewriterUtils.scala   | 94 +++++++++++++++++++++-
 .../operations/SqlDdlToOperationConverterTest.java | 74 +++++++++++++++++
 .../SqlRTASNodeToOperationConverterTest.java       | 44 ++++++++++
 .../planner/runtime/stream/sql/RTASITCase.java     | 22 +++++
 .../runtime/stream/sql/TableSinkITCase.scala       | 43 ++++++++++
 17 files changed, 539 insertions(+), 93 deletions(-)

diff --git a/docs/content.zh/docs/dev/table/sql/create.md b/docs/content.zh/docs/dev/table/sql/create.md
index 717c0050230..650c1b28dc3 100644
--- a/docs/content.zh/docs/dev/table/sql/create.md
+++ b/docs/content.zh/docs/dev/table/sql/create.md
@@ -643,6 +643,37 @@ CREATE TABLE my_ctas_table (
 INSERT INTO my_ctas_table SELECT id, name FROM source_table;
 ```
 
+`CTAS` also allows you to reorder the columns of the `SELECT` part by specifying all column names, without data types, in the `CREATE` part, just as an `INSERT INTO` statement with an explicit column list does.
+The specified columns must match, in name and number, the columns of the `SELECT` part; they cannot be combined with new columns, which require data types.
+
+Consider the example statement below:
+
+```sql
+CREATE TABLE my_ctas_table (
+    order_time, price, quantity, id
+) WITH (
+    'connector' = 'kafka',
+    ...
+) AS SELECT id, price, quantity, order_time FROM source_table;
+```
+
+The resulting table `my_ctas_table` is equivalent to creating the following table and inserting the data with the following statements:
+
+```sql
+CREATE TABLE my_ctas_table (
+    order_time TIMESTAMP(3),
+    price DOUBLE,
+    quantity DOUBLE,
+    id BIGINT
+) WITH (
+    'connector' = 'kafka',
+    ...
+);
+
+INSERT INTO my_ctas_table (order_time, price, quantity, id)
+    SELECT id, price, quantity, order_time FROM source_table;
+```
+
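+For illustration, a statement like the following would be rejected: the number of columns in the column list must match the number of columns produced by the `SELECT` part, and every listed name must exist in it.
+
+```sql
+-- Fails: the column list has 2 columns while the SELECT part produces 4
+CREATE TABLE my_ctas_table (
+    order_time, price
+) WITH (
+    'connector' = 'kafka',
+    ...
+) AS SELECT id, price, quantity, order_time FROM source_table;
+```
+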
 **Note:** CTAS has these restrictions:
 * Does not support creating a temporary table yet.
 * Does not support creating a partitioned table yet.
diff --git a/docs/content/docs/dev/table/sql/create.md b/docs/content/docs/dev/table/sql/create.md
index 2957a656790..3a564d3ffb1 100644
--- a/docs/content/docs/dev/table/sql/create.md
+++ b/docs/content/docs/dev/table/sql/create.md
@@ -643,6 +643,37 @@ CREATE TABLE my_ctas_table (
 INSERT INTO my_ctas_table SELECT id, name FROM source_table;
 ```
 
+`CTAS` also allows you to reorder the columns of the `SELECT` part by specifying all column names, without data types, in the `CREATE` part, just as an `INSERT INTO` statement with an explicit column list does.
+The specified columns must match, in name and number, the columns of the `SELECT` part; they cannot be combined with new columns, which require data types.
+
+Consider the example statement below:
+
+```sql
+CREATE TABLE my_ctas_table (
+    order_time, price, quantity, id
+) WITH (
+    'connector' = 'kafka',
+    ...
+) AS SELECT id, price, quantity, order_time FROM source_table;
+```
+
+The resulting table `my_ctas_table` is equivalent to creating the following table and inserting the data with the following statements:
+
+```sql
+CREATE TABLE my_ctas_table (
+    order_time TIMESTAMP(3),
+    price DOUBLE,
+    quantity DOUBLE,
+    id BIGINT
+) WITH (
+    'connector' = 'kafka',
+    ...
+);
+
+INSERT INTO my_ctas_table (order_time, price, quantity, id)
+    SELECT id, price, quantity, order_time FROM source_table;
+```
+
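+For illustration, a statement like the following would be rejected: the number of columns in the column list must match the number of columns produced by the `SELECT` part, and every listed name must exist in it.
+
+```sql
+-- Fails: the column list has 2 columns while the SELECT part produces 4
+CREATE TABLE my_ctas_table (
+    order_time, price
+) WITH (
+    'connector' = 'kafka',
+    ...
+) AS SELECT id, price, quantity, order_time FROM source_table;
+```
+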
 **Note:** CTAS has these restrictions:
 * Does not support creating a temporary table yet.
 * Does not support creating a partitioned table yet.
diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
index d1ae33dd9ce..9e7bad6206c 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
@@ -1532,6 +1532,7 @@ SqlCreate SqlCreateTable(Span s, boolean replace, boolean isTemporary) :
     SqlDistribution distribution = null;
     SqlNodeList partitionColumns = SqlNodeList.EMPTY;
     SqlParserPos pos = startPos;
+    boolean isColumnsIdentifiersOnly = false;
 }
 {
     <TABLE>
@@ -1541,12 +1542,10 @@ SqlCreate SqlCreateTable(Span s, boolean replace, boolean isTemporary) :
     tableName = CompoundIdentifier()
     [
        <LPAREN> { pos = getPos(); TableCreationContext ctx = new TableCreationContext();}
-        TableColumn(ctx)
-        (
-            <COMMA> TableColumn(ctx)
-        )*
+        TableColumnsOrIdentifiers(pos, ctx)
         {
             pos = pos.plus(getPos());
+            isColumnsIdentifiersOnly = ctx.isColumnsIdentifiersOnly();
             columnList = new SqlNodeList(ctx.columnList, pos);
             constraints = ctx.constraints;
             watermark = ctx.watermark;
@@ -1574,6 +1573,12 @@ SqlCreate SqlCreateTable(Span s, boolean replace, boolean isTemporary) :
         <LIKE>
         tableLike = SqlTableLike(getPos())
         {
+            if (isColumnsIdentifiersOnly) {
+                throw SqlUtil.newContextException(
+                    pos,
+                    ParserResource.RESOURCE.columnsIdentifiersUnsupported());
+            }
+
             return new SqlCreateTableLike(startPos.plus(getPos()),
                 tableName,
                 columnList,
@@ -1622,6 +1627,12 @@ SqlCreate SqlCreateTable(Span s, boolean replace, boolean isTemporary) :
         }
     ]
     {
+        if (isColumnsIdentifiersOnly) {
+            throw SqlUtil.newContextException(
+                pos,
+                ParserResource.RESOURCE.columnsIdentifiersUnsupported());
+        }
+
         return new SqlCreateTable(startPos.plus(getPos()),
             tableName,
             columnList,
@@ -1716,6 +1727,36 @@ SqlDrop SqlDropTable(Span s, boolean replace, boolean isTemporary) :
     }
 }
 
+void TableColumnsOrIdentifiers(SqlParserPos pos, TableCreationContext ctx) :
+{
+    final TableCreationContext tempCtx = new TableCreationContext();
+    final List<SqlNode> identifiers = new ArrayList<SqlNode>();
+}
+{
+    LOOKAHEAD(2)
+    (
+        TableColumn(tempCtx)
+        (<COMMA> TableColumn(tempCtx))*
+    ) {
+        ctx.columnList = tempCtx.columnList;
+        ctx.constraints = tempCtx.constraints;
+        ctx.watermark = tempCtx.watermark;
+    }
+    |
+    (
+        AddCompoundColumnIdentifier(identifiers)
+            (<COMMA> AddCompoundColumnIdentifier(identifiers))*
+    ) { ctx.columnList = identifiers; }
+}
+
+void AddCompoundColumnIdentifier(List<SqlNode> list) :
+{
+    final SqlIdentifier name;
+}
+{
+    name = CompoundIdentifier() { list.add(name); }
+}
+
 /**
* Parses a REPLACE TABLE AS statement
 */
@@ -1746,10 +1787,7 @@ SqlNode SqlReplaceTable() :
     tableName = CompoundIdentifier()
     [
        <LPAREN> { pos = getPos(); TableCreationContext ctx = new TableCreationContext();}
-        TableColumn(ctx)
-        (
-            <COMMA> TableColumn(ctx)
-        )*
+        TableColumnsOrIdentifiers(pos, ctx)
         {
             pos = getPos();
             columnList = new SqlNodeList(ctx.columnList, pos);
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
index b26e5d1e734..96de6b79921 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
@@ -297,6 +297,10 @@ public class SqlCreateTable extends SqlCreate implements ExtendedSqlNode {
         public List<SqlTableConstraint> constraints = new ArrayList<>();
         @Nullable public SqlWatermark watermark;
         @Nullable public SqlDistribution distribution;
+
+        public boolean isColumnsIdentifiersOnly() {
+            return !columnList.isEmpty() && columnList.get(0) instanceof SqlIdentifier;
+        }
     }
 
     public String[] fullTableName() {
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTableAs.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTableAs.java
index c2edf89d356..c48fbfdf19e 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTableAs.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTableAs.java
@@ -118,7 +118,10 @@ public class SqlCreateTableAs extends SqlCreateTable {
 
     @Override
     public void validate() throws SqlValidateException {
-        super.validate();
+        if (!isSchemaWithColumnsIdentifiersOnly()) {
+            super.validate();
+        }
+
         if (isTemporary()) {
             throw new SqlValidateException(
                     getParserPosition(),
@@ -130,6 +133,13 @@ public class SqlCreateTableAs extends SqlCreateTable {
         return asQuery;
     }
 
+    public boolean isSchemaWithColumnsIdentifiersOnly() {
+        // CREATE AS SELECT supports passing only column identifiers in the column list. If
+        // the first column in the list is an identifier, then we assume the rest of the
+        // columns are identifiers as well.
+        return !getColumnList().isEmpty() && getColumnList().get(0) instanceof SqlIdentifier;
+    }
+
     @Override
     public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
         super.unparse(writer, leftPrec, rightPrec);
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlReplaceTableAs.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlReplaceTableAs.java
index 65bb7056dbd..47d7b78b211 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlReplaceTableAs.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlReplaceTableAs.java
@@ -158,7 +158,10 @@ public class SqlReplaceTableAs extends SqlCreate implements ExtendedSqlNode {
 
     @Override
     public void validate() throws SqlValidateException {
-        SqlConstraintValidator.validateAndChangeColumnNullability(tableConstraints, columnList);
+        if (!isSchemaWithColumnsIdentifiersOnly()) {
+            SqlConstraintValidator.validateAndChangeColumnNullability(tableConstraints, columnList);
+        }
+
        // The following features are not currently supported by RTAS, but may be supported in the
         // future
         String errorMsg =
@@ -221,6 +224,13 @@ public class SqlReplaceTableAs extends SqlCreate implements ExtendedSqlNode {
         return isTemporary;
     }
 
+    public boolean isSchemaWithColumnsIdentifiersOnly() {
+        // REPLACE table supports passing only column identifiers in the column list. If
+        // the first column in the list is an identifier, then we assume the rest of the
+        // columns are identifiers as well.
+        return !columnList.isEmpty() && columnList.get(0) instanceof SqlIdentifier;
+    }
+
     /** Returns the column constraints plus the table constraints. */
     public List<SqlTableConstraint> getFullConstraints() {
        return SqlConstraintValidator.getFullConstraints(tableConstraints, columnList);
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java
index 0dbe275031b..9c29119ba9e 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java
@@ -45,6 +45,10 @@ public interface ParserResource {
             "Unsupported CREATE OR REPLACE statement for EXPLAIN. The 
statement must define a query using the AS clause (i.e. CTAS/RTAS statements).")
     Resources.ExInst<ParseException> 
explainCreateOrReplaceStatementUnsupported();
 
+    @Resources.BaseMessage(
+            "Column identifiers without types in the schema are supported on CTAS/RTAS statements only.")
+    Resources.ExInst<ParseException> columnsIdentifiersUnsupported();
+
     @Resources.BaseMessage("CREATE FUNCTION USING JAR syntax is not applicable 
to {0} language.")
     Resources.ExInst<ParseException> createFunctionUsingJar(String language);
 
diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
index 0345886c434..c732859e5a8 100644
--- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
+++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
@@ -2951,6 +2951,43 @@ class FlinkSqlParserImplTest extends SqlParserTest {
                 .node(new ValidationMatcher().ok());
     }
 
+    @Test
+    void testCreateTableAsSelectWithColumnIdentifiers() {
+        // test with only column identifiers
+        sql("CREATE TABLE t (col1) WITH ('test' = 'zm') AS SELECT col1 FROM b")
+                .node(new ValidationMatcher().ok());
+
+        // test mix of column identifiers and column with types is not allowed
+        sql("CREATE TABLE t (col1, col2 ^int^) WITH ('test' = 'zm') AS SELECT 
col1 FROM b")
+                .fails("(?s).*Encountered \"int\" at line 1, column 28.*");
+    }
+
+    @Test
+    void testUnsupportedCreateTableStatementsWithColumnIdentifiers() {
+        String expectedErrorMsg =
+                "Column identifiers without types in the schema are "
+                        + "supported on CTAS/RTAS statements only.";
+
+        sql("CREATE TABLE t ^(a, h^) WITH " + "('connector' = 'kafka', 
'kafka.topic' = 'log.test')")
+                .fails(expectedErrorMsg);
+
+        sql("CREATE TABLE t ^(a, h^) WITH "
+                        + "('connector' = 'kafka', 'kafka.topic' = 'log.test') "
+                        + "LIKE parent_table")
+                .fails(expectedErrorMsg);
+    }
+
+    @Test
+    void testReplaceTableAsSelectWithColumnIdentifiers() {
+        // test with only column identifiers
+        sql("REPLACE TABLE t (col1) WITH ('test' = 'zm') AS SELECT col1 FROM 
b")
+                .node(new ValidationMatcher().ok());
+
+        // test mix of column identifiers and column with types is not allowed
+        sql("REPLACE TABLE t (col1, col2 ^int^) WITH ('test' = 'zm') AS SELECT 
col1 FROM b")
+                .fails("(?s).*Encountered \"int\" at line 1, column 29.*");
+    }
+
     @Test
     void testReplaceTableAsSelect() {
         // test replace table as select without options
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/MergeTableAsUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/MergeTableAsUtil.java
index e743e5964a2..e3a1b08578c 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/MergeTableAsUtil.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/MergeTableAsUtil.java
@@ -42,10 +42,10 @@ import org.apache.flink.table.types.logical.RowType;
 
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlLiteral;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlNodeList;
-import org.apache.calcite.sql.SqlSelect;
 import org.apache.calcite.sql.parser.SqlParserPos;
 import org.apache.calcite.sql.validate.SqlValidator;
 
@@ -136,18 +136,16 @@ public class MergeTableAsUtil {
             }
         }
 
-        // if there are no new sink fields to include, then return the original query
-        if (assignedFields.isEmpty()) {
-            return origQueryOperation;
-        }
-
         // rewrite query
         SqlCall newSelect =
-                rewriterUtils.rewriteSelect(
-                        (SqlSelect) origQueryNode,
+                rewriterUtils.rewriteCall(
+                        rewriterUtils,
+                        sqlValidator,
+                        (SqlCall) origQueryNode,
                         typeFactory.buildRelNodeRowType(sinkRowType),
                         assignedFields,
-                        targetPositions);
+                        targetPositions,
+                        () -> "Unsupported node type " + 
origQueryNode.getKind());
 
         return (PlannerQueryOperation)
                SqlNodeToOperationConversion.convert(flinkPlanner, catalogManager, newSelect)
@@ -212,6 +210,22 @@ public class MergeTableAsUtil {
         return schemaBuilder.build();
     }
 
+    /** Reorders the columns from the source schema based on the column identifiers list. */
+    public Schema reorderSchema(SqlNodeList sqlColumnList, ResolvedSchema sourceSchema) {
+        SchemaBuilder schemaBuilder =
+                new SchemaBuilder(
+                        (FlinkTypeFactory) validator.getTypeFactory(),
+                        dataTypeFactory,
+                        validator,
+                        escapeExpression);
+
+        schemaBuilder.reorderColumns(
+                sqlColumnList,
+                Schema.newBuilder().fromResolvedSchema(sourceSchema).build().getColumns());
+
+        return schemaBuilder.build();
+    }
+
     /**
     * Builder class for constructing a {@link Schema} based on the rules of the {@code CREATE TABLE
      * ... AS SELECT} statement.
@@ -311,6 +325,33 @@ public class MergeTableAsUtil {
             columns.putAll(sourceSchemaCols);
         }
 
+        /** Reorders the columns from the source schema based on the column identifiers list. */
+        private void reorderColumns(List<SqlNode> identifiers, List<UnresolvedColumn> sourceCols) {
+            Map<String, UnresolvedColumn> sinkSchemaCols = new LinkedHashMap<>();
+            Map<String, UnresolvedColumn> sourceSchemaCols = new LinkedHashMap<>();
+
+            populateColumnsFromSource(sourceCols, sourceSchemaCols);
+
+            if (identifiers.size() != sourceCols.size()) {
+                throw new ValidationException(
+                        "The number of columns in the column list must match 
the number "
+                                + "of columns in the source schema.");
+            }
+
+            for (SqlNode identifier : identifiers) {
+                String name = ((SqlIdentifier) identifier).getSimple();
+                if (!sourceSchemaCols.containsKey(name)) {
+                    throw new ValidationException(
+                            String.format("Column '%s' not found in the source 
schema. ", name));
+                }
+
+                sinkSchemaCols.put(name, sourceSchemaCols.get(name));
+            }
+
+            columns.clear();
+            columns.putAll(sinkSchemaCols);
+        }
+
         /**
         * Populates the schema columns from the source schema. The source schema is expected to
          * contain only physical columns.
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java
index 037fc4cde92..dbff49f5310 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java
@@ -140,7 +140,7 @@ class SqlCreateTableConverter {
     }
 
     private ResolvedCatalogTable createCatalogTable(
-            SqlCreateTableAs sqlCreateTableAs, ResolvedSchema mergeSchema) {
+            SqlCreateTableAs sqlCreateTableAs, ResolvedSchema querySchema) {
         Map<String, String> tableOptions =
                 sqlCreateTableAs.getPropertyList().getList().stream()
                         .collect(
@@ -151,12 +151,20 @@ class SqlCreateTableConverter {
        String tableComment =
                OperationConverterUtils.getTableComment(sqlCreateTableAs.getComment());
 
-        Schema mergedSchema =
-                mergeTableAsUtil.mergeSchemas(
-                        sqlCreateTableAs.getColumnList(),
-                        sqlCreateTableAs.getWatermark().orElse(null),
-                        sqlCreateTableAs.getFullConstraints(),
-                        mergeSchema);
+        Schema mergedSchema;
+        if (sqlCreateTableAs.isSchemaWithColumnsIdentifiersOnly()) {
+            // If only column identifiers are provided, then these are used to
+            // order the columns in the schema.
+            mergedSchema =
+                    mergeTableAsUtil.reorderSchema(sqlCreateTableAs.getColumnList(), querySchema);
+        } else {
+            mergedSchema =
+                    mergeTableAsUtil.mergeSchemas(
+                            sqlCreateTableAs.getColumnList(),
+                            sqlCreateTableAs.getWatermark().orElse(null),
+                            sqlCreateTableAs.getFullConstraints(),
+                            querySchema);
+        }
 
         Optional<TableDistribution> tableDistribution =
                 Optional.ofNullable(sqlCreateTableAs.getDistribution())
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlReplaceTableAsConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlReplaceTableAsConverter.java
index cf4d96cf26f..b2b5d8f1039 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlReplaceTableAsConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlReplaceTableAsConverter.java
@@ -111,7 +111,7 @@ public class SqlReplaceTableAsConverter implements SqlNodeConverter<SqlReplaceTa
             ConvertContext context,
             MergeTableAsUtil mergeTableAsUtil,
             SqlReplaceTableAs sqlReplaceTableAs,
-            ResolvedSchema mergeSchema) {
+            ResolvedSchema querySchema) {
         CatalogManager catalogManager = context.getCatalogManager();
 
         // get table comment
@@ -129,13 +129,21 @@ public class SqlReplaceTableAsConverter implements SqlNodeConverter<SqlReplaceTa
                                         ((SqlTableOption) p).getKeyString(),
                                        ((SqlTableOption) p).getValueString()));
 
-        // merge schemas
-        Schema mergedSchema =
-                mergeTableAsUtil.mergeSchemas(
-                        sqlReplaceTableAs.getColumnList(),
-                        sqlReplaceTableAs.getWatermark().orElse(null),
-                        sqlReplaceTableAs.getFullConstraints(),
-                        mergeSchema);
+        Schema mergedSchema;
+        if (sqlReplaceTableAs.isSchemaWithColumnsIdentifiersOnly()) {
+            // If only column identifiers are provided, then these are used to
+            // order the columns in the schema.
+            mergedSchema =
+                    mergeTableAsUtil.reorderSchema(sqlReplaceTableAs.getColumnList(), querySchema);
+        } else {
+            // merge schemas
+            mergedSchema =
+                    mergeTableAsUtil.mergeSchemas(
+                            sqlReplaceTableAs.getColumnList(),
+                            sqlReplaceTableAs.getWatermark().orElse(null),
+                            sqlReplaceTableAs.getFullConstraints(),
+                            querySchema);
+        }
 
         // get distribution
         Optional<TableDistribution> tableDistribution =
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
index d47a91b4fb6..8be16db924f 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
@@ -220,63 +220,14 @@ object PreValidateReWriter {
       }
     }
 
-    rewriteSqlCall(rewriterUtils, validator, source, targetRowType, assignedFields, targetPosition)
-  }
-
-  private def rewriteSqlCall(
-      rewriterUtils: SqlRewriterUtils,
-      validator: FlinkCalciteSqlValidator,
-      call: SqlCall,
-      targetRowType: RelDataType,
-      assignedFields: util.LinkedHashMap[Integer, SqlNode],
-      targetPosition: util.List[Int]): SqlCall = {
-
-    def rewrite(node: SqlNode): SqlCall = {
-      checkArgument(node.isInstanceOf[SqlCall], node)
-      rewriteSqlCall(
-        rewriterUtils,
-        validator,
-        node.asInstanceOf[SqlCall],
-        targetRowType,
-        assignedFields,
-        targetPosition)
-    }
-
-    call.getKind match {
-      case SqlKind.SELECT =>
-        val sqlSelect = call.asInstanceOf[SqlSelect]
-
-        if (targetPosition.nonEmpty && sqlSelect.getSelectList.size() != targetPosition.size()) {
-          throw newValidationError(call, RESOURCE.columnCountMismatch())
-        }
-        rewriterUtils.rewriteSelect(sqlSelect, targetRowType, assignedFields, targetPosition)
-      case SqlKind.VALUES =>
-        call.getOperandList.toSeq.foreach {
-          case sqlCall: SqlCall => {
-            if (targetPosition.nonEmpty && sqlCall.getOperandList.size() != targetPosition.size()) {
-              throw newValidationError(call, RESOURCE.columnCountMismatch())
-            }
-          }
-        }
-        rewriterUtils.rewriteValues(call, targetRowType, assignedFields, targetPosition)
-      case kind if SqlKind.SET_QUERY.contains(kind) =>
-        call.getOperandList.zipWithIndex.foreach {
-          case (operand, index) => call.setOperand(index, rewrite(operand))
-        }
-        call
-      case SqlKind.ORDER_BY =>
-        val operands = call.getOperandList
-        new SqlOrderBy(
-          call.getParserPosition,
-          rewrite(operands.get(0)),
-          operands.get(1).asInstanceOf[SqlNodeList],
-          operands.get(2),
-          operands.get(3))
-      // Not support:
-      // case SqlKind.WITH =>
-      // case SqlKind.EXPLICIT_TABLE =>
-      case _ => throw new ValidationException(notSupported(call))
-    }
+    rewriterUtils.rewriteCall(
+      rewriterUtils,
+      validator,
+      source,
+      targetRowType,
+      assignedFields,
+      targetPosition,
+      () => notSupported(source))
   }
 
   /**
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/SqlRewriterUtils.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/SqlRewriterUtils.scala
index 64d426971d8..b9810b09f0d 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/SqlRewriterUtils.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/SqlRewriterUtils.scala
@@ -18,13 +18,19 @@
 package org.apache.flink.table.planner.calcite
 
 import org.apache.flink.sql.parser.`type`.SqlMapTypeNameSpec
-import org.apache.flink.table.planner.calcite.SqlRewriterUtils.{rewriteSqlSelect, rewriteSqlValues}
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.planner.calcite.PreValidateReWriter.{newValidationError, notSupported}
+import org.apache.flink.table.planner.calcite.SqlRewriterUtils.{rewriteSqlCall, rewriteSqlSelect, rewriteSqlValues}
+import org.apache.flink.util.Preconditions.checkArgument
 
 import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
+import org.apache.calcite.runtime.{CalciteContextException, Resources}
 import org.apache.calcite.sql.`type`.SqlTypeUtil
-import org.apache.calcite.sql.{SqlCall, SqlDataTypeSpec, SqlKind, SqlNode, SqlNodeList, SqlSelect}
+import org.apache.calcite.sql.{SqlCall, SqlDataTypeSpec, SqlKind, SqlNode, SqlNodeList, SqlOrderBy, SqlSelect, SqlUtil}
 import org.apache.calcite.sql.fun.SqlStdOperatorTable
 import org.apache.calcite.sql.parser.SqlParserPos
+import org.apache.calcite.sql.validate.SqlValidatorException
+import org.apache.calcite.util.Static.RESOURCE
 
 import java.util
 import java.util.Collections
@@ -48,6 +54,24 @@ class SqlRewriterUtils(validator: FlinkCalciteSqlValidator) {
     rewriteSqlValues(svalues, targetRowType, assignedFields, targetPosition)
   }
 
+  def rewriteCall(
+      rewriterUtils: SqlRewriterUtils,
+      validator: FlinkCalciteSqlValidator,
+      call: SqlCall,
+      targetRowType: RelDataType,
+      assignedFields: util.LinkedHashMap[Integer, SqlNode],
+      targetPosition: util.List[Int],
+      unsupportedErrorMessage: () => String): SqlCall = {
+    rewriteSqlCall(
+      rewriterUtils,
+      validator,
+      call,
+      targetRowType,
+      assignedFields,
+      targetPosition,
+      unsupportedErrorMessage)
+  }
+
   // This code snippet is copied from the SqlValidatorImpl.
   def maybeCast(
       node: SqlNode,
@@ -82,6 +106,64 @@ class SqlRewriterUtils(validator: FlinkCalciteSqlValidator) {
 }
 
 object SqlRewriterUtils {
+  def rewriteSqlCall(
+      rewriterUtils: SqlRewriterUtils,
+      validator: FlinkCalciteSqlValidator,
+      call: SqlCall,
+      targetRowType: RelDataType,
+      assignedFields: util.LinkedHashMap[Integer, SqlNode],
+      targetPosition: util.List[Int],
+      unsupportedErrorMessage: () => String): SqlCall = {
+
+    def rewrite(node: SqlNode): SqlCall = {
+      checkArgument(node.isInstanceOf[SqlCall], node)
+      rewriteSqlCall(
+        rewriterUtils,
+        validator,
+        node.asInstanceOf[SqlCall],
+        targetRowType,
+        assignedFields,
+        targetPosition,
+        unsupportedErrorMessage)
+    }
+
+    call.getKind match {
+      case SqlKind.SELECT =>
+        val sqlSelect = call.asInstanceOf[SqlSelect]
+
+        if (targetPosition.nonEmpty && sqlSelect.getSelectList.size() != targetPosition.size()) {
+          throw newValidationError(call, RESOURCE.columnCountMismatch())
+        }
+        rewriterUtils.rewriteSelect(sqlSelect, targetRowType, assignedFields, targetPosition)
+      case SqlKind.VALUES =>
+        call.getOperandList.toSeq.foreach {
+          case sqlCall: SqlCall => {
+            if (targetPosition.nonEmpty && sqlCall.getOperandList.size() != targetPosition.size()) {
+              throw newValidationError(call, RESOURCE.columnCountMismatch())
+            }
+          }
+        }
+        rewriterUtils.rewriteValues(call, targetRowType, assignedFields, targetPosition)
+      case kind if SqlKind.SET_QUERY.contains(kind) =>
+        call.getOperandList.zipWithIndex.foreach {
+          case (operand, index) => call.setOperand(index, rewrite(operand))
+        }
+        call
+      case SqlKind.ORDER_BY =>
+        val operands = call.getOperandList
+        new SqlOrderBy(
+          call.getParserPosition,
+          rewrite(operands.get(0)),
+          operands.get(1).asInstanceOf[SqlNodeList],
+          operands.get(2),
+          operands.get(3))
+      // Not support:
+      // case SqlKind.WITH =>
+      // case SqlKind.EXPLICIT_TABLE =>
+      case _ => throw new ValidationException(unsupportedErrorMessage())
+    }
+  }
+
   def rewriteSqlSelect(
       validator: FlinkCalciteSqlValidator,
       select: SqlSelect,
@@ -156,6 +238,14 @@ object SqlRewriterUtils {
     SqlStdOperatorTable.VALUES.createCall(values.getParserPosition, fixedNodes)
   }
 
+  def newValidationError(
+      node: SqlNode,
+      e: Resources.ExInst[SqlValidatorException]): CalciteContextException = {
+    assert(node != null)
+    val pos = node.getParserPosition
+    SqlUtil.newContextException(pos, e)
+  }
+
   /**
    * Reorder sourceList to targetPosition. For example:
    *   - sourceList(f0, f1, f2).
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
index e758296f2ec..6904763dbf6 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
@@ -658,6 +658,80 @@ public class SqlDdlToOperationConverterTest extends SqlNodeToOperationConversion
                         "Invalid bucket key 'f3'. A bucket key for a 
distribution must reference a physical column in the schema. Available columns 
are: [a]");
     }
 
+    @Test
+    public void testCreateTableAsWithOrderingColumns() {
+        CatalogTable catalogTable =
+                CatalogTable.newBuilder()
+                        .schema(
+                                Schema.newBuilder()
+                                        .column("f0", 
DataTypes.INT().notNull())
+                                        .column("f1", DataTypes.TIMESTAMP(3))
+                                        .build())
+                        .build();
+
+        catalogManager.createTable(
+                catalogTable, ObjectIdentifier.of("builtin", "default", "src1"), false);
+
+        final String sql = "create table tbl1 (f1, f0) AS SELECT * FROM src1";
+
+        Operation ctas = parseAndConvert(sql);
+        Operation operation = ((CreateTableASOperation) ctas).getCreateTableOperation();
+        assertThat(operation)
+                .is(
+                        new HamcrestCondition<>(
+                                isCreateTableOperation(
+                                        withNoDistribution(),
+                                        withSchema(
+                                                Schema.newBuilder()
+                                                        .column("f1", 
DataTypes.TIMESTAMP(3))
+                                                        .column("f0", 
DataTypes.INT().notNull())
+                                                        .build()))));
+    }
+
+    @Test
+    public void testCreateTableAsWithNotFoundColumnIdentifiers() {
+        CatalogTable catalogTable =
+                CatalogTable.newBuilder()
+                        .schema(
+                                Schema.newBuilder()
+                                        .column("f0", 
DataTypes.INT().notNull())
+                                        .column("f1", DataTypes.INT())
+                                        .build())
+                        .build();
+
+        catalogManager.createTable(
+                catalogTable, ObjectIdentifier.of("builtin", "default", "src1"), false);
+
+        final String sql = "create table tbl1 (f1, f2) AS SELECT * FROM src1";
+
+        assertThatThrownBy(() -> parseAndConvert(sql))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining("Column 'f2' not found in the source 
schema.");
+    }
+
+    @Test
+    public void testCreateTableAsWithMismatchIdentifiersLength() {
+        CatalogTable catalogTable =
+                CatalogTable.newBuilder()
+                        .schema(
+                                Schema.newBuilder()
+                                        .column("f0", 
DataTypes.INT().notNull())
+                                        .column("f1", DataTypes.INT())
+                                        .build())
+                        .build();
+
+        catalogManager.createTable(
+                catalogTable, ObjectIdentifier.of("builtin", "default", "src1"), false);
+
+        final String sql = "create table tbl1 (f1) AS SELECT * FROM src1";
+
+        assertThatThrownBy(() -> parseAndConvert(sql))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "The number of columns in the column list "
+                                + "must match the number of columns in the 
source schema.");
+    }
+
     @Test
     public void testCreateTableAsWithColumns() {
         CatalogTable catalogTable =
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlRTASNodeToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlRTASNodeToOperationConverterTest.java
index ee6e8bca658..78ed39bf980 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlRTASNodeToOperationConverterTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlRTASNodeToOperationConverterTest.java
@@ -61,6 +61,50 @@ public class SqlRTASNodeToOperationConverterTest extends SqlNodeToOperationConve
         testCommonReplaceTableAs(sql, tableName, tableComment);
     }
 
+    @Test
+    public void testReplaceTableAsWithOrderingColumns() {
+        String tableName = "replace_table";
+        String sql =
+                "REPLACE TABLE "
+                        + tableName
+                        + " (a, b) WITH ('k1' = 'v1', 'k2' = 'v2') as SELECT 
b, a FROM t1";
+        Schema tableSchema =
+                Schema.newBuilder()
+                        .column("a", DataTypes.BIGINT().notNull())
+                        .column("b", DataTypes.STRING())
+                        .build();
+
+        testCommonReplaceTableAs(sql, tableName, null, tableSchema, null, Collections.emptyList());
+    }
+
+    @Test
+    public void testReplaceTableAsWithNotFoundColumnIdentifiers() {
+        String tableName = "replace_table";
+        String sql =
+                "REPLACE TABLE "
+                        + tableName
+                        + " (a, d) WITH ('k1' = 'v1', 'k2' = 'v2') as SELECT 
b, a FROM t1";
+
+        assertThatThrownBy(() -> parseAndConvert(sql))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining("Column 'd' not found in the source 
schema.");
+    }
+
+    @Test
+    public void testReplaceTableAsWithMismatchIdentifiersLength() {
+        String tableName = "replace_table";
+        String sql =
+                "REPLACE TABLE "
+                        + tableName
+                        + " (a) WITH ('k1' = 'v1', 'k2' = 'v2') as SELECT b, a 
FROM t1";
+
+        assertThatThrownBy(() -> parseAndConvert(sql))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "The number of columns in the column list "
+                                + "must match the number of columns in the 
source schema.");
+    }
+
     @Test
     public void testCreateOrReplaceTableAs() {
         String tableName = "create_or_replace_table";
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/RTASITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/RTASITCase.java
index 2ce2bc6dffe..61e1c0d087b 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/RTASITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/RTASITCase.java
@@ -188,6 +188,28 @@ class RTASITCase extends StreamingTestBase {
         verifyCatalogTable(expectCatalogTable, getCatalogTable("target"));
     }
 
+    @Test
+    void testReplaceTableAsSelectWithColumnOrdering() throws Exception {
+        tEnv().executeSql(
+                        "REPLACE TABLE target"
+                                + " (c, a)"
+                                + " WITH ('connector' = 'values', 'bounded' = 
'true')"
+                                + " AS SELECT a, c FROM source")
+                .await();
+
+        // verify written rows
+        assertThat(TestValuesTableFactory.getResultsAsStrings("target").toString())
+                .isEqualTo("[" + "+I[Hi, 1], " + "+I[Hello, 2], " + "+I[Hello world, 3]" + "]");
+
+        // verify the table after replacing
+        CatalogTable expectCatalogTable =
+                getExpectCatalogTable(
+                        new String[] {"c", "a"},
+                        new AbstractDataType[] {DataTypes.STRING(), DataTypes.INT()});
+
+        verifyCatalogTable(expectCatalogTable, getCatalogTable("target"));
+    }
+
     @Test
     void testCreateOrReplaceTableASWithSortLimit() throws Exception {
         tEnv().executeSql(
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSinkITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSinkITCase.scala
index 364feaad4e6..d0512adcb85 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSinkITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSinkITCase.scala
@@ -337,6 +337,49 @@ class TableSinkITCase(mode: StateBackendMode) extends StreamingWithStateTestBase
         " managed table relies on checkpoint to commit and the data is visible 
only after commit.")
   }
 
+  @TestTemplate
+  def testCreateTableAsSelectWithColumnOrdering(): Unit = {
+    tEnv
+      .executeSql("""
+                    |CREATE TABLE MyCtasTable(votes, person)
+                    | WITH (
+                    |   'connector' = 'values',
+                    |   'sink-insert-only' = 'true'
+                    |) AS
+                    |  SELECT
+                    |    `person`,
+                    |    `votes`
+                    |  FROM
+                    |    src
+                    |""".stripMargin)
+      .await()
+    val actual = TestValuesTableFactory.getResultsAsStrings("MyCtasTable")
+    val expected = List(
+      "+I[1, jason]",
+      "+I[1, jason]",
+      "+I[1, jason]",
+      "+I[1, jason]"
+    )
+    assertThat(actual.sorted).isEqualTo(expected.sorted)
+    // test statement set
+    val statementSet = tEnv.createStatementSet()
+    statementSet.addInsertSql("""
+                                |CREATE TABLE MyCtasTableUseStatement(votes, person)
+                                | WITH (
+                                |   'connector' = 'values',
+                                |   'sink-insert-only' = 'true'
+                                |) AS
+                                |  SELECT
+                                |    `person`,
+                                |    `votes`
+                                |  FROM
+                                |    src
+                                |""".stripMargin)
+    statementSet.execute().await()
+    val actualUseStatement = TestValuesTableFactory.getResultsAsStrings("MyCtasTableUseStatement")
+    assertThat(actualUseStatement.sorted).isEqualTo(expected.sorted)
+  }
+
   @TestTemplate
   def testCreateTableAsSelectWithNewColumnsOnly(): Unit = {
     tEnv

