This is an automated email from the ASF dual-hosted git repository.
snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new baa67f465b7 [FLINK-35709][table-planner] Allow reordering source columns in CTAS and RTAS
baa67f465b7 is described below
commit baa67f465b77c9b3131d8153b52d2a94e3e00ef9
Author: Sergio Peña <[email protected]>
AuthorDate: Thu Oct 3 02:38:31 2024 -0500
[FLINK-35709][table-planner] Allow reordering source columns in CTAS and RTAS
---
docs/content.zh/docs/dev/table/sql/create.md | 31 +++++++
docs/content/docs/dev/table/sql/create.md | 31 +++++++
.../src/main/codegen/includes/parserImpls.ftl | 54 +++++++++++--
.../flink/sql/parser/ddl/SqlCreateTable.java | 4 +
.../flink/sql/parser/ddl/SqlCreateTableAs.java | 12 ++-
.../flink/sql/parser/ddl/SqlReplaceTableAs.java | 12 ++-
.../flink/sql/parser/utils/ParserResource.java | 4 +
.../flink/sql/parser/FlinkSqlParserImplTest.java | 37 +++++++++
.../table/planner/operations/MergeTableAsUtil.java | 59 +++++++++++---
.../operations/SqlCreateTableConverter.java | 22 +++--
.../converters/SqlReplaceTableAsConverter.java | 24 ++++--
.../planner/calcite/PreValidateReWriter.scala | 65 ++-------------
.../table/planner/calcite/SqlRewriterUtils.scala | 94 +++++++++++++++++++++-
.../operations/SqlDdlToOperationConverterTest.java | 74 +++++++++++++++++
.../SqlRTASNodeToOperationConverterTest.java | 44 ++++++++++
.../planner/runtime/stream/sql/RTASITCase.java | 22 +++++
.../runtime/stream/sql/TableSinkITCase.scala | 43 ++++++++++
17 files changed, 539 insertions(+), 93 deletions(-)
diff --git a/docs/content.zh/docs/dev/table/sql/create.md b/docs/content.zh/docs/dev/table/sql/create.md
index 717c0050230..650c1b28dc3 100644
--- a/docs/content.zh/docs/dev/table/sql/create.md
+++ b/docs/content.zh/docs/dev/table/sql/create.md
@@ -643,6 +643,37 @@ CREATE TABLE my_ctas_table (
INSERT INTO my_ctas_table SELECT id, name FROM source_table;
```
+`CTAS` also allows you to reorder the columns of the `SELECT` part by specifying all column names, without data types, in the `CREATE` part. This is equivalent to specifying a column list in an `INSERT INTO` statement.
+The specified columns must match the names and number of columns in the `SELECT` part. Such a column list cannot be combined with new columns, which would require data types.
+
+Consider the example statement below:
+
+```sql
+CREATE TABLE my_ctas_table (
+ order_time, price, quantity, id
+) WITH (
+ 'connector' = 'kafka',
+ ...
+) AS SELECT id, price, quantity, order_time FROM source_table;
+```
+
+The resulting table `my_ctas_table` is equivalent to creating the following table and inserting the data with the following statements:
+
+```sql
+CREATE TABLE my_ctas_table (
+ order_time TIMESTAMP(3),
+ price DOUBLE,
+ quantity DOUBLE,
+ id BIGINT
+) WITH (
+ 'connector' = 'kafka',
+ ...
+);
+
+INSERT INTO my_ctas_table (order_time, price, quantity, id)
+ SELECT id, price, quantity, order_time FROM source_table;
+```
+
**Note:** CTAS has these restrictions:
* Does not support creating a temporary table yet.
* Does not support creating partitioned table yet.
diff --git a/docs/content/docs/dev/table/sql/create.md b/docs/content/docs/dev/table/sql/create.md
index 2957a656790..3a564d3ffb1 100644
--- a/docs/content/docs/dev/table/sql/create.md
+++ b/docs/content/docs/dev/table/sql/create.md
@@ -643,6 +643,37 @@ CREATE TABLE my_ctas_table (
INSERT INTO my_ctas_table SELECT id, name FROM source_table;
```
+`CTAS` also allows you to reorder the columns of the `SELECT` part by specifying all column names, without data types, in the `CREATE` part. This is equivalent to specifying a column list in an `INSERT INTO` statement.
+The specified columns must match the names and number of columns in the `SELECT` part. Such a column list cannot be combined with new columns, which would require data types.
+
+Consider the example statement below:
+
+```sql
+CREATE TABLE my_ctas_table (
+ order_time, price, quantity, id
+) WITH (
+ 'connector' = 'kafka',
+ ...
+) AS SELECT id, price, quantity, order_time FROM source_table;
+```
+
+The resulting table `my_ctas_table` is equivalent to creating the following table and inserting the data with the following statements:
+
+```sql
+CREATE TABLE my_ctas_table (
+ order_time TIMESTAMP(3),
+ price DOUBLE,
+ quantity DOUBLE,
+ id BIGINT
+) WITH (
+ 'connector' = 'kafka',
+ ...
+);
+
+INSERT INTO my_ctas_table (order_time, price, quantity, id)
+ SELECT id, price, quantity, order_time FROM source_table;
+```
+
**Note:** CTAS has these restrictions:
* Does not support creating a temporary table yet.
* Does not support creating partitioned table yet.
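As a quick illustration of the two validation rules described above, the statements below fail with the errors introduced in `MergeTableAsUtil` further down in this patch (the table names and schema here are hypothetical):

```sql
-- Assume source_table has exactly two columns: id BIGINT, price DOUBLE.

-- Fewer identifiers than query columns:
CREATE TABLE t1 (id) AS SELECT id, price FROM source_table;
-- Fails: "The number of columns in the column list must match the number
-- of columns in the source schema."

-- An identifier the query does not produce:
CREATE TABLE t2 (id, amount) AS SELECT id, price FROM source_table;
-- Fails: "Column 'amount' not found in the source schema."
```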
diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
index d1ae33dd9ce..9e7bad6206c 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
@@ -1532,6 +1532,7 @@ SqlCreate SqlCreateTable(Span s, boolean replace, boolean isTemporary) :
SqlDistribution distribution = null;
SqlNodeList partitionColumns = SqlNodeList.EMPTY;
SqlParserPos pos = startPos;
+ boolean isColumnsIdentifiersOnly = false;
}
{
<TABLE>
@@ -1541,12 +1542,10 @@ SqlCreate SqlCreateTable(Span s, boolean replace, boolean isTemporary) :
tableName = CompoundIdentifier()
[
<LPAREN> { pos = getPos(); TableCreationContext ctx = new TableCreationContext();}
- TableColumn(ctx)
- (
- <COMMA> TableColumn(ctx)
- )*
+ TableColumnsOrIdentifiers(pos, ctx)
{
pos = pos.plus(getPos());
+ isColumnsIdentifiersOnly = ctx.isColumnsIdentifiersOnly();
columnList = new SqlNodeList(ctx.columnList, pos);
constraints = ctx.constraints;
watermark = ctx.watermark;
@@ -1574,6 +1573,12 @@ SqlCreate SqlCreateTable(Span s, boolean replace, boolean isTemporary) :
<LIKE>
tableLike = SqlTableLike(getPos())
{
+ if (isColumnsIdentifiersOnly) {
+ throw SqlUtil.newContextException(
+ pos,
+ ParserResource.RESOURCE.columnsIdentifiersUnsupported());
+ }
+
return new SqlCreateTableLike(startPos.plus(getPos()),
tableName,
columnList,
@@ -1622,6 +1627,12 @@ SqlCreate SqlCreateTable(Span s, boolean replace, boolean isTemporary) :
}
]
{
+ if (isColumnsIdentifiersOnly) {
+ throw SqlUtil.newContextException(
+ pos,
+ ParserResource.RESOURCE.columnsIdentifiersUnsupported());
+ }
+
return new SqlCreateTable(startPos.plus(getPos()),
tableName,
columnList,
@@ -1716,6 +1727,36 @@ SqlDrop SqlDropTable(Span s, boolean replace, boolean isTemporary) :
}
}
+void TableColumnsOrIdentifiers(SqlParserPos pos, TableCreationContext ctx) :
+{
+ final TableCreationContext tempCtx = new TableCreationContext();
+ final List<SqlNode> identifiers = new ArrayList<SqlNode>();
+}
+{
+ LOOKAHEAD(2)
+ (
+ TableColumn(tempCtx)
+ (<COMMA> TableColumn(tempCtx))*
+ ) {
+ ctx.columnList = tempCtx.columnList;
+ ctx.constraints = tempCtx.constraints;
+ ctx.watermark = tempCtx.watermark;
+ }
+ |
+ (
+ AddCompoundColumnIdentifier(identifiers)
+ (<COMMA> AddCompoundColumnIdentifier(identifiers))*
+ ) { ctx.columnList = identifiers; }
+}
+
+void AddCompoundColumnIdentifier(List<SqlNode> list) :
+{
+ final SqlIdentifier name;
+}
+{
+ name = CompoundIdentifier() { list.add(name); }
+}
+
/**
* Parses a REPLACE TABLE AS statement
*/
@@ -1746,10 +1787,7 @@ SqlNode SqlReplaceTable() :
tableName = CompoundIdentifier()
[
<LPAREN> { pos = getPos(); TableCreationContext ctx = new TableCreationContext();}
- TableColumn(ctx)
- (
- <COMMA> TableColumn(ctx)
- )*
+ TableColumnsOrIdentifiers(pos, ctx)
{
pos = getPos();
columnList = new SqlNodeList(ctx.columnList, pos);
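The new `TableColumnsOrIdentifiers` production uses `LOOKAHEAD(2)` to choose between the typed-column branch and the identifier-only branch, and identifier-only lists are later rejected for statements without an `AS` query. A sketch of the resulting parser behavior (statements abbreviated; option values are placeholders):

```sql
-- Identifier-only lists parse, but only for CTAS/RTAS:
CREATE TABLE t (a, b) WITH ('connector' = '...') AS SELECT b, a FROM src;   -- accepted
REPLACE TABLE t (a, b) WITH ('connector' = '...') AS SELECT b, a FROM src;  -- accepted

-- Plain CREATE TABLE and CREATE TABLE ... LIKE throw "Column identifiers
-- without types in the schema are supported on CTAS/RTAS statements only.":
CREATE TABLE t (a, b) WITH ('connector' = '...');                -- rejected
CREATE TABLE t (a, b) WITH ('connector' = '...') LIKE parent_t;  -- rejected

-- Mixing identifiers and typed columns never parses: the lookahead commits
-- to the identifier-only branch, which then fails on the type token
-- ("Encountered \"int\" ..."):
CREATE TABLE t (a, b INT) WITH ('connector' = '...') AS SELECT b, a FROM src;
```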
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
index b26e5d1e734..96de6b79921 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
@@ -297,6 +297,10 @@ public class SqlCreateTable extends SqlCreate implements ExtendedSqlNode {
public List<SqlTableConstraint> constraints = new ArrayList<>();
@Nullable public SqlWatermark watermark;
@Nullable public SqlDistribution distribution;
+
+ public boolean isColumnsIdentifiersOnly() {
+ return !columnList.isEmpty() && columnList.get(0) instanceof SqlIdentifier;
+ }
}
public String[] fullTableName() {
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTableAs.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTableAs.java
index c2edf89d356..c48fbfdf19e 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTableAs.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTableAs.java
@@ -118,7 +118,10 @@ public class SqlCreateTableAs extends SqlCreateTable {
@Override
public void validate() throws SqlValidateException {
- super.validate();
+ if (!isSchemaWithColumnsIdentifiersOnly()) {
+ super.validate();
+ }
+
if (isTemporary()) {
throw new SqlValidateException(
getParserPosition(),
@@ -130,6 +133,13 @@ public class SqlCreateTableAs extends SqlCreateTable {
return asQuery;
}
+ public boolean isSchemaWithColumnsIdentifiersOnly() {
+ // CREATE AS SELECT supports passing only column identifiers in the column list. If
+ // the first column in the list is an identifier, then we assume the rest of the
+ // columns are identifiers as well.
+ return !getColumnList().isEmpty() && getColumnList().get(0) instanceof SqlIdentifier;
+ }
+
@Override
public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
super.unparse(writer, leftPrec, rightPrec);
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlReplaceTableAs.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlReplaceTableAs.java
index 65bb7056dbd..47d7b78b211 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlReplaceTableAs.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlReplaceTableAs.java
@@ -158,7 +158,10 @@ public class SqlReplaceTableAs extends SqlCreate implements ExtendedSqlNode {
@Override
public void validate() throws SqlValidateException {
- SqlConstraintValidator.validateAndChangeColumnNullability(tableConstraints, columnList);
+ if (!isSchemaWithColumnsIdentifiersOnly()) {
+ SqlConstraintValidator.validateAndChangeColumnNullability(tableConstraints, columnList);
+ }
+
// The following features are not currently supported by RTAS, but may be supported in the
// future
String errorMsg =
@@ -221,6 +224,13 @@ public class SqlReplaceTableAs extends SqlCreate implements ExtendedSqlNode {
return isTemporary;
}
+ public boolean isSchemaWithColumnsIdentifiersOnly() {
+ // REPLACE table supports passing only column identifiers in the column list. If
+ // the first column in the list is an identifier, then we assume the rest of the
+ // columns are identifiers as well.
+ return !columnList.isEmpty() && columnList.get(0) instanceof SqlIdentifier;
+ }
+
/** Returns the column constraints plus the table constraints. */
public List<SqlTableConstraint> getFullConstraints() {
return SqlConstraintValidator.getFullConstraints(tableConstraints, columnList);
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java
index 0dbe275031b..9c29119ba9e 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java
@@ -45,6 +45,10 @@ public interface ParserResource {
"Unsupported CREATE OR REPLACE statement for EXPLAIN. The
statement must define a query using the AS clause (i.e. CTAS/RTAS statements).")
Resources.ExInst<ParseException>
explainCreateOrReplaceStatementUnsupported();
+ @Resources.BaseMessage(
+ "Columns identifiers without types in the schema are supported on
CTAS/RTAS statements only.")
+ Resources.ExInst<ParseException> columnsIdentifiersUnsupported();
+
@Resources.BaseMessage("CREATE FUNCTION USING JAR syntax is not applicable
to {0} language.")
Resources.ExInst<ParseException> createFunctionUsingJar(String language);
diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
index 0345886c434..c732859e5a8 100644
--- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
+++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
@@ -2951,6 +2951,43 @@ class FlinkSqlParserImplTest extends SqlParserTest {
.node(new ValidationMatcher().ok());
}
+ @Test
+ void testCreateTableAsSelectWithColumnIdentifiers() {
+ // test with only column identifiers
+ sql("CREATE TABLE t (col1) WITH ('test' = 'zm') AS SELECT col1 FROM b")
+ .node(new ValidationMatcher().ok());
+
+ // test mix of column identifiers and column with types is not allowed
+ sql("CREATE TABLE t (col1, col2 ^int^) WITH ('test' = 'zm') AS SELECT
col1 FROM b")
+ .fails("(?s).*Encountered \"int\" at line 1, column 28.*");
+ }
+
+ @Test
+ void testUnsupportedCreateTableStatementsWithColumnIdentifiers() {
+ String expectedErrorMsg =
+ "Columns identifiers without types in the schema are "
+ + "supported on CTAS/RTAS statements only.";
+
+ sql("CREATE TABLE t ^(a, h^) WITH " + "('connector' = 'kafka',
'kafka.topic' = 'log.test')")
+ .fails(expectedErrorMsg);
+
+ sql("CREATE TABLE t ^(a, h^) WITH "
+ + "('connector' = 'kafka', 'kafka.topic' = 'log.test')
"
+ + "LIKE parent_table")
+ .fails(expectedErrorMsg);
+ }
+
+ @Test
+ void testReplaceTableAsSelectWithColumnIdentifiers() {
+ // test with only column identifiers
+ sql("REPLACE TABLE t (col1) WITH ('test' = 'zm') AS SELECT col1 FROM
b")
+ .node(new ValidationMatcher().ok());
+
+ // test mix of column identifiers and column with types is not allowed
+ sql("REPLACE TABLE t (col1, col2 ^int^) WITH ('test' = 'zm') AS SELECT
col1 FROM b")
+ .fails("(?s).*Encountered \"int\" at line 1, column 29.*");
+ }
+
@Test
void testReplaceTableAsSelect() {
// test replace table as select without options
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/MergeTableAsUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/MergeTableAsUtil.java
index e743e5964a2..e3a1b08578c 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/MergeTableAsUtil.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/MergeTableAsUtil.java
@@ -42,10 +42,10 @@ import org.apache.flink.table.types.logical.RowType;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlLiteral;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
-import org.apache.calcite.sql.SqlSelect;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.sql.validate.SqlValidator;
@@ -136,18 +136,16 @@ public class MergeTableAsUtil {
}
}
- // if there are no new sink fields to include, then return the original query
- if (assignedFields.isEmpty()) {
- return origQueryOperation;
- }
-
// rewrite query
SqlCall newSelect =
- rewriterUtils.rewriteSelect(
- (SqlSelect) origQueryNode,
+ rewriterUtils.rewriteCall(
+ rewriterUtils,
+ sqlValidator,
+ (SqlCall) origQueryNode,
typeFactory.buildRelNodeRowType(sinkRowType),
assignedFields,
- targetPositions);
+ targetPositions,
+ () -> "Unsupported node type " +
origQueryNode.getKind());
return (PlannerQueryOperation)
SqlNodeToOperationConversion.convert(flinkPlanner, catalogManager, newSelect)
@@ -212,6 +210,22 @@ public class MergeTableAsUtil {
return schemaBuilder.build();
}
+ /** Reorders the columns from the source schema based on the column identifiers list. */
+ public Schema reorderSchema(SqlNodeList sqlColumnList, ResolvedSchema sourceSchema) {
+ SchemaBuilder schemaBuilder =
+ new SchemaBuilder(
+ (FlinkTypeFactory) validator.getTypeFactory(),
+ dataTypeFactory,
+ validator,
+ escapeExpression);
+
+ schemaBuilder.reorderColumns(
+ sqlColumnList,
+ Schema.newBuilder().fromResolvedSchema(sourceSchema).build().getColumns());
+
+ return schemaBuilder.build();
+ }
+
/**
* Builder class for constructing a {@link Schema} based on the rules of the {@code CREATE TABLE
* ... AS SELECT} statement.
@@ -311,6 +325,33 @@ public class MergeTableAsUtil {
columns.putAll(sourceSchemaCols);
}
+ /** Reorders the columns from the source schema based on the column identifiers list. */
+ private void reorderColumns(List<SqlNode> identifiers, List<UnresolvedColumn> sourceCols) {
+ Map<String, UnresolvedColumn> sinkSchemaCols = new LinkedHashMap<>();
+ Map<String, UnresolvedColumn> sourceSchemaCols = new LinkedHashMap<>();
+
+ populateColumnsFromSource(sourceCols, sourceSchemaCols);
+
+ if (identifiers.size() != sourceCols.size()) {
+ throw new ValidationException(
+ "The number of columns in the column list must match
the number "
+ + "of columns in the source schema.");
+ }
+
+ for (SqlNode identifier : identifiers) {
+ String name = ((SqlIdentifier) identifier).getSimple();
+ if (!sourceSchemaCols.containsKey(name)) {
+ throw new ValidationException(
+ String.format("Column '%s' not found in the source
schema. ", name));
+ }
+
+ sinkSchemaCols.put(name, sourceSchemaCols.get(name));
+ }
+
+ columns.clear();
+ columns.putAll(sinkSchemaCols);
+ }
+
/**
* Populates the schema columns from the source schema. The source schema is expected to
* contain only physical columns.
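The effect of `reorderSchema` can be sketched with the schema used by the converter test added below (the `src1`/`tbl1` names come from that test): each identifier picks the same-named column, together with its type, out of the query schema.

```sql
-- src1 has schema (f0 INT NOT NULL, f1 TIMESTAMP(3)).
CREATE TABLE tbl1 (f1, f0) AS SELECT * FROM src1;
-- Derived sink schema: (f1 TIMESTAMP(3), f0 INT NOT NULL).
-- Columns are matched by name, so type and nullability follow the name,
-- not the position in the list.
```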
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java
index 037fc4cde92..dbff49f5310 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java
@@ -140,7 +140,7 @@ class SqlCreateTableConverter {
}
private ResolvedCatalogTable createCatalogTable(
- SqlCreateTableAs sqlCreateTableAs, ResolvedSchema mergeSchema) {
+ SqlCreateTableAs sqlCreateTableAs, ResolvedSchema querySchema) {
Map<String, String> tableOptions =
sqlCreateTableAs.getPropertyList().getList().stream()
.collect(
@@ -151,12 +151,20 @@ class SqlCreateTableConverter {
String tableComment =
OperationConverterUtils.getTableComment(sqlCreateTableAs.getComment());
- Schema mergedSchema =
- mergeTableAsUtil.mergeSchemas(
- sqlCreateTableAs.getColumnList(),
- sqlCreateTableAs.getWatermark().orElse(null),
- sqlCreateTableAs.getFullConstraints(),
- mergeSchema);
+ Schema mergedSchema;
+ if (sqlCreateTableAs.isSchemaWithColumnsIdentifiersOnly()) {
+ // If only column identifiers are provided, then these are used to
+ // order the columns in the schema.
+ mergedSchema =
+ mergeTableAsUtil.reorderSchema(sqlCreateTableAs.getColumnList(), querySchema);
+ } else {
+ mergedSchema =
+ mergeTableAsUtil.mergeSchemas(
+ sqlCreateTableAs.getColumnList(),
+ sqlCreateTableAs.getWatermark().orElse(null),
+ sqlCreateTableAs.getFullConstraints(),
+ querySchema);
+ }
Optional<TableDistribution> tableDistribution =
Optional.ofNullable(sqlCreateTableAs.getDistribution())
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlReplaceTableAsConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlReplaceTableAsConverter.java
index cf4d96cf26f..b2b5d8f1039 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlReplaceTableAsConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlReplaceTableAsConverter.java
@@ -111,7 +111,7 @@ public class SqlReplaceTableAsConverter implements SqlNodeConverter<SqlReplaceTa
ConvertContext context,
MergeTableAsUtil mergeTableAsUtil,
SqlReplaceTableAs sqlReplaceTableAs,
- ResolvedSchema mergeSchema) {
+ ResolvedSchema querySchema) {
CatalogManager catalogManager = context.getCatalogManager();
// get table comment
@@ -129,13 +129,21 @@ public class SqlReplaceTableAsConverter implements SqlNodeConverter<SqlReplaceTa
((SqlTableOption) p).getKeyString(),
((SqlTableOption) p).getValueString()));
- // merge schemas
- Schema mergedSchema =
- mergeTableAsUtil.mergeSchemas(
- sqlReplaceTableAs.getColumnList(),
- sqlReplaceTableAs.getWatermark().orElse(null),
- sqlReplaceTableAs.getFullConstraints(),
- mergeSchema);
+ Schema mergedSchema;
+ if (sqlReplaceTableAs.isSchemaWithColumnsIdentifiersOnly()) {
+ // If only column identifiers are provided, then these are used to
+ // order the columns in the schema.
+ mergedSchema =
+ mergeTableAsUtil.reorderSchema(sqlReplaceTableAs.getColumnList(), querySchema);
+ } else {
+ // merge schemas
+ mergedSchema =
+ mergeTableAsUtil.mergeSchemas(
+ sqlReplaceTableAs.getColumnList(),
+ sqlReplaceTableAs.getWatermark().orElse(null),
+ sqlReplaceTableAs.getFullConstraints(),
+ querySchema);
+ }
// get distribution
Optional<TableDistribution> tableDistribution =
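RTAS takes the same two branches. A sketch based on the RTAS converter and IT cases further down (table names come from those tests); values are routed by column name, not by position in the `SELECT` list:

```sql
-- t1 provides columns a BIGINT NOT NULL and b STRING.
REPLACE TABLE replace_table (a, b)
WITH ('k1' = 'v1', 'k2' = 'v2')
AS SELECT b, a FROM t1;
-- Resulting schema: (a BIGINT NOT NULL, b STRING); each row's a value still
-- lands in column a, as in the REPLACE TABLE target (c, a) IT case below.
```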
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
index d47a91b4fb6..8be16db924f 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
@@ -220,63 +220,14 @@ object PreValidateReWriter {
}
}
- rewriteSqlCall(rewriterUtils, validator, source, targetRowType, assignedFields, targetPosition)
- }
-
- private def rewriteSqlCall(
- rewriterUtils: SqlRewriterUtils,
- validator: FlinkCalciteSqlValidator,
- call: SqlCall,
- targetRowType: RelDataType,
- assignedFields: util.LinkedHashMap[Integer, SqlNode],
- targetPosition: util.List[Int]): SqlCall = {
-
- def rewrite(node: SqlNode): SqlCall = {
- checkArgument(node.isInstanceOf[SqlCall], node)
- rewriteSqlCall(
- rewriterUtils,
- validator,
- node.asInstanceOf[SqlCall],
- targetRowType,
- assignedFields,
- targetPosition)
- }
-
- call.getKind match {
- case SqlKind.SELECT =>
- val sqlSelect = call.asInstanceOf[SqlSelect]
-
- if (targetPosition.nonEmpty && sqlSelect.getSelectList.size() != targetPosition.size()) {
- throw newValidationError(call, RESOURCE.columnCountMismatch())
- }
- rewriterUtils.rewriteSelect(sqlSelect, targetRowType, assignedFields, targetPosition)
- case SqlKind.VALUES =>
- call.getOperandList.toSeq.foreach {
- case sqlCall: SqlCall => {
- if (targetPosition.nonEmpty && sqlCall.getOperandList.size() != targetPosition.size()) {
- throw newValidationError(call, RESOURCE.columnCountMismatch())
- }
- }
- }
- rewriterUtils.rewriteValues(call, targetRowType, assignedFields, targetPosition)
- case kind if SqlKind.SET_QUERY.contains(kind) =>
- call.getOperandList.zipWithIndex.foreach {
- case (operand, index) => call.setOperand(index, rewrite(operand))
- }
- call
- case SqlKind.ORDER_BY =>
- val operands = call.getOperandList
- new SqlOrderBy(
- call.getParserPosition,
- rewrite(operands.get(0)),
- operands.get(1).asInstanceOf[SqlNodeList],
- operands.get(2),
- operands.get(3))
- // Not support:
- // case SqlKind.WITH =>
- // case SqlKind.EXPLICIT_TABLE =>
- case _ => throw new ValidationException(notSupported(call))
- }
+ rewriterUtils.rewriteCall(
+ rewriterUtils,
+ validator,
+ source,
+ targetRowType,
+ assignedFields,
+ targetPosition,
+ () => notSupported(source))
}
/**
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/SqlRewriterUtils.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/SqlRewriterUtils.scala
index 64d426971d8..b9810b09f0d 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/SqlRewriterUtils.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/SqlRewriterUtils.scala
@@ -18,13 +18,19 @@
package org.apache.flink.table.planner.calcite
import org.apache.flink.sql.parser.`type`.SqlMapTypeNameSpec
-import org.apache.flink.table.planner.calcite.SqlRewriterUtils.{rewriteSqlSelect, rewriteSqlValues}
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.planner.calcite.PreValidateReWriter.{newValidationError, notSupported}
+import org.apache.flink.table.planner.calcite.SqlRewriterUtils.{rewriteSqlCall, rewriteSqlSelect, rewriteSqlValues}
+import org.apache.flink.util.Preconditions.checkArgument
import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
+import org.apache.calcite.runtime.{CalciteContextException, Resources}
import org.apache.calcite.sql.`type`.SqlTypeUtil
-import org.apache.calcite.sql.{SqlCall, SqlDataTypeSpec, SqlKind, SqlNode, SqlNodeList, SqlSelect}
+import org.apache.calcite.sql.{SqlCall, SqlDataTypeSpec, SqlKind, SqlNode, SqlNodeList, SqlOrderBy, SqlSelect, SqlUtil}
import org.apache.calcite.sql.fun.SqlStdOperatorTable
import org.apache.calcite.sql.parser.SqlParserPos
+import org.apache.calcite.sql.validate.SqlValidatorException
+import org.apache.calcite.util.Static.RESOURCE
import java.util
import java.util.Collections
@@ -48,6 +54,24 @@ class SqlRewriterUtils(validator: FlinkCalciteSqlValidator) {
rewriteSqlValues(svalues, targetRowType, assignedFields, targetPosition)
}
+ def rewriteCall(
+ rewriterUtils: SqlRewriterUtils,
+ validator: FlinkCalciteSqlValidator,
+ call: SqlCall,
+ targetRowType: RelDataType,
+ assignedFields: util.LinkedHashMap[Integer, SqlNode],
+ targetPosition: util.List[Int],
+ unsupportedErrorMessage: () => String): SqlCall = {
+ rewriteSqlCall(
+ rewriterUtils,
+ validator,
+ call,
+ targetRowType,
+ assignedFields,
+ targetPosition,
+ unsupportedErrorMessage)
+ }
+
// This code snippet is copied from the SqlValidatorImpl.
def maybeCast(
node: SqlNode,
@@ -82,6 +106,64 @@ class SqlRewriterUtils(validator: FlinkCalciteSqlValidator) {
}
object SqlRewriterUtils {
+ def rewriteSqlCall(
+ rewriterUtils: SqlRewriterUtils,
+ validator: FlinkCalciteSqlValidator,
+ call: SqlCall,
+ targetRowType: RelDataType,
+ assignedFields: util.LinkedHashMap[Integer, SqlNode],
+ targetPosition: util.List[Int],
+ unsupportedErrorMessage: () => String): SqlCall = {
+
+ def rewrite(node: SqlNode): SqlCall = {
+ checkArgument(node.isInstanceOf[SqlCall], node)
+ rewriteSqlCall(
+ rewriterUtils,
+ validator,
+ node.asInstanceOf[SqlCall],
+ targetRowType,
+ assignedFields,
+ targetPosition,
+ unsupportedErrorMessage)
+ }
+
+ call.getKind match {
+ case SqlKind.SELECT =>
+ val sqlSelect = call.asInstanceOf[SqlSelect]
+
+ if (targetPosition.nonEmpty && sqlSelect.getSelectList.size() != targetPosition.size()) {
+ throw newValidationError(call, RESOURCE.columnCountMismatch())
+ }
+ rewriterUtils.rewriteSelect(sqlSelect, targetRowType, assignedFields, targetPosition)
+ case SqlKind.VALUES =>
+ call.getOperandList.toSeq.foreach {
+ case sqlCall: SqlCall => {
+ if (targetPosition.nonEmpty && sqlCall.getOperandList.size() != targetPosition.size()) {
+ throw newValidationError(call, RESOURCE.columnCountMismatch())
+ }
+ }
+ }
+ rewriterUtils.rewriteValues(call, targetRowType, assignedFields, targetPosition)
+ case kind if SqlKind.SET_QUERY.contains(kind) =>
+ call.getOperandList.zipWithIndex.foreach {
+ case (operand, index) => call.setOperand(index, rewrite(operand))
+ }
+ call
+ case SqlKind.ORDER_BY =>
+ val operands = call.getOperandList
+ new SqlOrderBy(
+ call.getParserPosition,
+ rewrite(operands.get(0)),
+ operands.get(1).asInstanceOf[SqlNodeList],
+ operands.get(2),
+ operands.get(3))
+ // Not support:
+ // case SqlKind.WITH =>
+ // case SqlKind.EXPLICIT_TABLE =>
+ case _ => throw new ValidationException(unsupportedErrorMessage())
+ }
+ }
+
def rewriteSqlSelect(
validator: FlinkCalciteSqlValidator,
select: SqlSelect,
@@ -156,6 +238,14 @@ object SqlRewriterUtils {
SqlStdOperatorTable.VALUES.createCall(values.getParserPosition, fixedNodes)
}
+ def newValidationError(
+ node: SqlNode,
+ e: Resources.ExInst[SqlValidatorException]): CalciteContextException = {
+ assert(node != null)
+ val pos = node.getParserPosition
+ SqlUtil.newContextException(pos, e)
+ }
+
/**
* Reorder sourceList to targetPosition. For example:
* - sourceList(f0, f1, f2).
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
index e758296f2ec..6904763dbf6 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
@@ -658,6 +658,80 @@ public class SqlDdlToOperationConverterTest extends SqlNodeToOperationConversion
"Invalid bucket key 'f3'. A bucket key for a
distribution must reference a physical column in the schema. Available columns
are: [a]");
}
+ @Test
+ public void testCreateTableAsWithOrderingColumns() {
+ CatalogTable catalogTable =
+ CatalogTable.newBuilder()
+ .schema(
+ Schema.newBuilder()
+ .column("f0",
DataTypes.INT().notNull())
+ .column("f1", DataTypes.TIMESTAMP(3))
+ .build())
+ .build();
+
+ catalogManager.createTable(
+ catalogTable, ObjectIdentifier.of("builtin", "default", "src1"), false);
+
+ final String sql = "create table tbl1 (f1, f0) AS SELECT * FROM src1";
+
+ Operation ctas = parseAndConvert(sql);
+ Operation operation = ((CreateTableASOperation) ctas).getCreateTableOperation();
+ assertThat(operation)
+ .is(
+ new HamcrestCondition<>(
+ isCreateTableOperation(
+ withNoDistribution(),
+ withSchema(
+ Schema.newBuilder()
+ .column("f1",
DataTypes.TIMESTAMP(3))
+ .column("f0",
DataTypes.INT().notNull())
+ .build()))));
+ }
+
+ @Test
+ public void testCreateTableAsWithNotFoundColumnIdentifiers() {
+ CatalogTable catalogTable =
+ CatalogTable.newBuilder()
+ .schema(
+ Schema.newBuilder()
+ .column("f0",
DataTypes.INT().notNull())
+ .column("f1", DataTypes.INT())
+ .build())
+ .build();
+
+ catalogManager.createTable(
+ catalogTable, ObjectIdentifier.of("builtin", "default", "src1"), false);
+
+ final String sql = "create table tbl1 (f1, f2) AS SELECT * FROM src1";
+
+ assertThatThrownBy(() -> parseAndConvert(sql))
+ .isInstanceOf(ValidationException.class)
+ .hasMessageContaining("Column 'f2' not found in the source
schema.");
+ }
+
+ @Test
+ public void testCreateTableAsWithMismatchIdentifiersLength() {
+ CatalogTable catalogTable =
+ CatalogTable.newBuilder()
+ .schema(
+ Schema.newBuilder()
+ .column("f0",
DataTypes.INT().notNull())
+ .column("f1", DataTypes.INT())
+ .build())
+ .build();
+
+ catalogManager.createTable(
+ catalogTable, ObjectIdentifier.of("builtin", "default", "src1"), false);
+
+ final String sql = "create table tbl1 (f1) AS SELECT * FROM src1";
+
+ assertThatThrownBy(() -> parseAndConvert(sql))
+ .isInstanceOf(ValidationException.class)
+ .hasMessageContaining(
+ "The number of columns in the column list "
+ + "must match the number of columns in the
source schema.");
+ }
+
@Test
public void testCreateTableAsWithColumns() {
CatalogTable catalogTable =
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlRTASNodeToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlRTASNodeToOperationConverterTest.java
index ee6e8bca658..78ed39bf980 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlRTASNodeToOperationConverterTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlRTASNodeToOperationConverterTest.java
@@ -61,6 +61,50 @@ public class SqlRTASNodeToOperationConverterTest extends SqlNodeToOperationConve
testCommonReplaceTableAs(sql, tableName, tableComment);
}
+ @Test
+ public void testReplaceTableAsWithOrderingColumns() {
+ String tableName = "replace_table";
+ String sql =
+ "REPLACE TABLE "
+ + tableName
+ + " (a, b) WITH ('k1' = 'v1', 'k2' = 'v2') as SELECT
b, a FROM t1";
+ Schema tableSchema =
+ Schema.newBuilder()
+ .column("a", DataTypes.BIGINT().notNull())
+ .column("b", DataTypes.STRING())
+ .build();
+
+ testCommonReplaceTableAs(sql, tableName, null, tableSchema, null, Collections.emptyList());
+ }
+
+ @Test
+ public void testReplaceTableAsWithNotFoundColumnIdentifiers() {
+ String tableName = "replace_table";
+ String sql =
+ "REPLACE TABLE "
+ + tableName
+ + " (a, d) WITH ('k1' = 'v1', 'k2' = 'v2') as SELECT
b, a FROM t1";
+
+ assertThatThrownBy(() -> parseAndConvert(sql))
+ .isInstanceOf(ValidationException.class)
+ .hasMessageContaining("Column 'd' not found in the source
schema.");
+ }
+
+ @Test
+ public void testReplaceTableAsWithMismatchIdentifiersLength() {
+ String tableName = "replace_table";
+ String sql =
+ "REPLACE TABLE "
+ + tableName
+ + " (a) WITH ('k1' = 'v1', 'k2' = 'v2') as SELECT b, a
FROM t1";
+
+ assertThatThrownBy(() -> parseAndConvert(sql))
+ .isInstanceOf(ValidationException.class)
+ .hasMessageContaining(
+ "The number of columns in the column list "
+ + "must match the number of columns in the
source schema.");
+ }
+
@Test
public void testCreateOrReplaceTableAs() {
String tableName = "create_or_replace_table";
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/RTASITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/RTASITCase.java
index 2ce2bc6dffe..61e1c0d087b 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/RTASITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/RTASITCase.java
@@ -188,6 +188,28 @@ class RTASITCase extends StreamingTestBase {
verifyCatalogTable(expectCatalogTable, getCatalogTable("target"));
}
+ @Test
+ void testReplaceTableAsSelectWithColumnOrdering() throws Exception {
+ tEnv().executeSql(
+ "REPLACE TABLE target"
+ + " (c, a)"
+ + " WITH ('connector' = 'values', 'bounded' =
'true')"
+ + " AS SELECT a, c FROM source")
+ .await();
+
+ // verify written rows
+ assertThat(TestValuesTableFactory.getResultsAsStrings("target").toString())
+ .isEqualTo("[" + "+I[Hi, 1], " + "+I[Hello, 2], " + "+I[Hello world, 3]" + "]");
+
+ // verify the table after replacing
+ CatalogTable expectCatalogTable =
+ getExpectCatalogTable(
+ new String[] {"c", "a"},
+ new AbstractDataType[] {DataTypes.STRING(), DataTypes.INT()});
+
+ verifyCatalogTable(expectCatalogTable, getCatalogTable("target"));
+ }
+
@Test
void testCreateOrReplaceTableASWithSortLimit() throws Exception {
tEnv().executeSql(
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSinkITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSinkITCase.scala
index 364feaad4e6..d0512adcb85 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSinkITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSinkITCase.scala
@@ -337,6 +337,49 @@ class TableSinkITCase(mode: StateBackendMode) extends StreamingWithStateTestBase
" managed table relies on checkpoint to commit and the data is visible
only after commit.")
}
+ @TestTemplate
+ def testCreateTableAsSelectWithColumnOrdering(): Unit = {
+ tEnv
+ .executeSql("""
+ |CREATE TABLE MyCtasTable(votes, person)
+ | WITH (
+ | 'connector' = 'values',
+ | 'sink-insert-only' = 'true'
+ |) AS
+ | SELECT
+ | `person`,
+ | `votes`
+ | FROM
+ | src
+ |""".stripMargin)
+ .await()
+ val actual = TestValuesTableFactory.getResultsAsStrings("MyCtasTable")
+ val expected = List(
+ "+I[1, jason]",
+ "+I[1, jason]",
+ "+I[1, jason]",
+ "+I[1, jason]"
+ )
+ assertThat(actual.sorted).isEqualTo(expected.sorted)
+ // test statement set
+ val statementSet = tEnv.createStatementSet()
+ statementSet.addInsertSql("""
+ |CREATE TABLE MyCtasTableUseStatement(votes, person)
+ | WITH (
+ | 'connector' = 'values',
+ | 'sink-insert-only' = 'true'
+ |) AS
+ | SELECT
+ | `person`,
+ | `votes`
+ | FROM
+ | src
+ |""".stripMargin)
+ statementSet.execute().await()
+ val actualUseStatement = TestValuesTableFactory.getResultsAsStrings("MyCtasTableUseStatement")
+ assertThat(actualUseStatement.sorted).isEqualTo(expected.sorted)
+ }
+
@TestTemplate
def testCreateTableAsSelectWithNewColumnsOnly(): Unit = {
tEnv