This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new c1fa29ac91a [FLINK-35708][table-planner] Allow defining a schema in
REPLACE TABLE AS (RTAS)
c1fa29ac91a is described below
commit c1fa29ac91aeef64701d8d97b13644a4267e8f8e
Author: Sergio Peña <[email protected]>
AuthorDate: Tue Aug 27 09:26:51 2024 -0500
[FLINK-35708][table-planner] Allow defining a schema in REPLACE TABLE AS
(RTAS)
---
.../flink/sql/parser/ddl/SqlReplaceTableAs.java | 27 ----
.../flink/sql/parser/FlinkSqlParserImplTest.java | 41 ++----
.../table/planner/operations/MergeTableAsUtil.java | 134 ++++++++++++++---
.../operations/SqlCreateTableConverter.java | 101 ++-----------
.../planner/operations/SqlNodeConvertContext.java | 5 +
.../operations/converters/SqlNodeConverter.java | 4 +
.../converters/SqlReplaceTableAsConverter.java | 139 ++++++++++++++---
.../SqlRTASNodeToOperationConverterTest.java | 164 +++++++++++++++++++--
.../planner/runtime/stream/sql/RTASITCase.java | 80 ++++++++++
9 files changed, 496 insertions(+), 199 deletions(-)
diff --git
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlReplaceTableAs.java
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlReplaceTableAs.java
index 5d343e8c2c6..65bb7056dbd 100644
---
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlReplaceTableAs.java
+++
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlReplaceTableAs.java
@@ -175,33 +175,6 @@ public class SqlReplaceTableAs extends SqlCreate
implements ExtendedSqlNode {
getParserPosition(),
errorMsg + " syntax does not support temporary table
yet.");
}
-
- if (getColumnList().size() > 0) {
- throw new SqlValidateException(
- getParserPosition(),
- errorMsg + " syntax does not support to specify explicit
columns yet.");
- }
-
- if (getWatermark().isPresent()) {
- throw new SqlValidateException(
- getParserPosition(),
- errorMsg + " syntax does not support to specify explicit
watermark yet.");
- }
- if (getDistribution() != null) {
- throw new SqlValidateException(
- getParserPosition(),
- errorMsg + " syntax does not support creating distributed
tables yet.");
- }
- if (getPartitionKeyList().size() > 0) {
- throw new SqlValidateException(
- getParserPosition(),
- errorMsg + " syntax does not support to create partitioned
table yet.");
- }
- if
(getFullConstraints().stream().anyMatch(SqlTableConstraint::isPrimaryKey)) {
- throw new SqlValidateException(
- getParserPosition(),
- errorMsg + " syntax does not support primary key
constraints yet.");
- }
}
public SqlNode getAsQuery() {
diff --git
a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
index 7e29a0548e6..abf0725c2ec 100644
---
a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
+++
b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
@@ -2944,17 +2944,11 @@ class FlinkSqlParserImplTest extends SqlParserTest {
// test replace table as select with explicit columns
sql("REPLACE TABLE t (col1 string) WITH ('test' = 'zm') AS SELECT col1
FROM b")
- .node(
- new ValidationMatcher()
- .fails(
- "REPLACE TABLE AS SELECT syntax does
not support to specify explicit columns yet."));
+ .node(new ValidationMatcher().ok());
// test replace table as select with watermark
sql("REPLACE TABLE t (watermark FOR ts AS ts - interval '3' second)
WITH ('test' = 'zm') AS SELECT col1 FROM b")
- .node(
- new ValidationMatcher()
- .fails(
- "REPLACE TABLE AS SELECT syntax does
not support to specify explicit watermark yet."));
+ .node(new ValidationMatcher().ok());
// test replace table as select with constraints
sql("REPLACE TABLE t (PRIMARY KEY (col1)) WITH ('test' = 'zm') AS
SELECT col1 FROM b")
@@ -2973,17 +2967,11 @@ class FlinkSqlParserImplTest extends SqlParserTest {
// test replace table as select with partition key
sql("REPLACE TABLE t PARTITIONED BY(col1) WITH ('test' = 'zm') AS
SELECT col1 FROM b")
- .node(
- new ValidationMatcher()
- .fails(
- "REPLACE TABLE AS SELECT syntax does
not support to create partitioned table yet."));
+ .node(new ValidationMatcher().ok());
// test replace table as select with distribution
sql("REPLACE TABLE t DISTRIBUTED BY(col1) WITH ('test' = 'zm') AS
SELECT col1 FROM b")
- .node(
- new ValidationMatcher()
- .fails(
- "REPLACE TABLE AS SELECT syntax does
not support creating distributed tables yet."));
+ .node(new ValidationMatcher().ok());
}
@Test
@@ -3010,17 +2998,12 @@ class FlinkSqlParserImplTest extends SqlParserTest {
// test create or replace table as select with explicit columns
sql("CREATE OR REPLACE TABLE t (col1 string) WITH ('test' = 'zm') AS
SELECT col1 FROM b")
- .node(
- new ValidationMatcher()
- .fails(
- "CREATE OR REPLACE TABLE AS SELECT
syntax does not support to specify explicit columns yet."));
+ .node(new ValidationMatcher().ok());
// test create or replace table as select with watermark
sql("CREATE OR REPLACE TABLE t (watermark FOR ts AS ts - interval '3'
second) WITH ('test' = 'zm') AS SELECT col1 FROM b")
- .node(
- new ValidationMatcher()
- .fails(
- "CREATE OR REPLACE TABLE AS SELECT
syntax does not support to specify explicit watermark yet."));
+ .node(new ValidationMatcher().ok());
+
// test create or replace table as select with constraints
sql("CREATE OR REPLACE TABLE t (PRIMARY KEY (col1)) WITH ('test' =
'zm') AS SELECT col1 FROM b")
.node(
@@ -3038,16 +3021,10 @@ class FlinkSqlParserImplTest extends SqlParserTest {
// test create or replace table as select with partition key
sql("CREATE OR REPLACE TABLE t PARTITIONED BY(col1) WITH ('test' =
'zm') AS SELECT col1 FROM b")
- .node(
- new ValidationMatcher()
- .fails(
- "CREATE OR REPLACE TABLE AS SELECT
syntax does not support to create partitioned table yet."));
+ .node(new ValidationMatcher().ok());
sql("CREATE OR REPLACE TABLE t DISTRIBUTED BY(col1) WITH ('test' =
'zm') AS SELECT col1 FROM b")
- .node(
- new ValidationMatcher()
- .fails(
- "CREATE OR REPLACE TABLE AS SELECT
syntax does not support creating distributed tables yet."));
+ .node(new ValidationMatcher().ok());
}
@Test
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/MergeTableAsUtil.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/MergeTableAsUtil.java
index 12662c52b5a..e743e5964a2 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/MergeTableAsUtil.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/MergeTableAsUtil.java
@@ -18,7 +18,6 @@
package org.apache.flink.table.planner.operations;
-import org.apache.flink.sql.parser.ddl.SqlCreateTableAs;
import org.apache.flink.sql.parser.ddl.SqlTableColumn;
import org.apache.flink.sql.parser.ddl.SqlTableColumn.SqlComputedColumn;
import org.apache.flink.sql.parser.ddl.SqlTableColumn.SqlMetadataColumn;
@@ -28,22 +27,39 @@ import
org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Schema.UnresolvedColumn;
import org.apache.flink.table.api.Schema.UnresolvedPhysicalColumn;
+import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.DataTypeFactory;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator;
+import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.calcite.SqlRewriterUtils;
import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlLiteral;
import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlSelect;
+import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.sql.validate.SqlValidator;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import static
org.apache.flink.table.types.logical.utils.LogicalTypeCasts.supportsImplicitCast;
@@ -53,7 +69,7 @@ public class MergeTableAsUtil {
private final Function<SqlNode, String> escapeExpression;
private final DataTypeFactory dataTypeFactory;
- MergeTableAsUtil(
+ public MergeTableAsUtil(
SqlValidator validator,
Function<SqlNode, String> escapeExpression,
DataTypeFactory dataTypeFactory) {
@@ -63,9 +79,90 @@ public class MergeTableAsUtil {
}
/**
- * Merges the schema part of the {@code sqlCreateTableAs} with the {@code
sourceSchema}.
+ * Rewrites the query operation to include only the fields that may be
persisted in the sink.
+ */
+ public PlannerQueryOperation maybeRewriteQuery(
+ CatalogManager catalogManager,
+ FlinkPlannerImpl flinkPlanner,
+ PlannerQueryOperation origQueryOperation,
+ SqlNode origQueryNode,
+ ResolvedCatalogTable sinkTable) {
+ FlinkCalciteSqlValidator sqlValidator =
flinkPlanner.getOrCreateSqlValidator();
+ SqlRewriterUtils rewriterUtils = new SqlRewriterUtils(sqlValidator);
+ FlinkTypeFactory typeFactory = (FlinkTypeFactory)
sqlValidator.getTypeFactory();
+
+ // Only fields that may be persisted will be included in the select
query
+ RowType sinkRowType =
+ ((RowType)
sinkTable.getResolvedSchema().toSinkRowDataType().getLogicalType());
+
+ Map<String, Integer> sourceFields =
+ IntStream.range(0,
origQueryOperation.getResolvedSchema().getColumnNames().size())
+ .boxed()
+ .collect(
+ Collectors.toMap(
+
origQueryOperation.getResolvedSchema().getColumnNames()
+ ::get,
+ Function.identity()));
+
+ // assignedFields contains the new sink fields that are not present in
the source
+ // and that will be included in the select query
+ LinkedHashMap<Integer, SqlNode> assignedFields = new LinkedHashMap<>();
+
+ // targetPositions contains the positions of the source fields that
will be
+ // included in the select query
+ List<Object> targetPositions = new ArrayList<>();
+
+ int pos = -1;
+ for (RowType.RowField targetField : sinkRowType.getFields()) {
+ pos++;
+
+ if (!sourceFields.containsKey(targetField.getName())) {
+ if (!targetField.getType().isNullable()) {
+ throw new ValidationException(
+ "Column '"
+ + targetField.getName()
+ + "' has no default value and does not
allow NULLs.");
+ }
+
+ assignedFields.put(
+ pos,
+ rewriterUtils.maybeCast(
+ SqlLiteral.createNull(SqlParserPos.ZERO),
+ typeFactory.createUnknownType(),
+
typeFactory.createFieldTypeFromLogicalType(targetField.getType()),
+ typeFactory));
+ } else {
+ targetPositions.add(sourceFields.get(targetField.getName()));
+ }
+ }
+
+ // if there are no new sink fields to include, then return the
original query
+ if (assignedFields.isEmpty()) {
+ return origQueryOperation;
+ }
+
+ // rewrite query
+ SqlCall newSelect =
+ rewriterUtils.rewriteSelect(
+ (SqlSelect) origQueryNode,
+ typeFactory.buildRelNodeRowType(sinkRowType),
+ assignedFields,
+ targetPositions);
+
+ return (PlannerQueryOperation)
+ SqlNodeToOperationConversion.convert(flinkPlanner,
catalogManager, newSelect)
+ .orElseThrow(
+ () ->
+ new TableException(
+ "Unsupported node type "
+ +
newSelect.getClass().getSimpleName()));
+ }
+
+ /**
+ * Merges the specified schema with columns, watermark, and constraints
with the {@code
+ * sourceSchema}.
*
- * <p>The schema part of the {@code CREATE TABLE} statement merged
includes:
+ * <p>The resulting schema will contain the following elements:
*
* <ul>
* <li>columns
@@ -75,16 +172,19 @@ public class MergeTableAsUtil {
* <li>primary key
* </ul>
*
- * <p>It is expected that the {@code sourceSchema} contains only
physical/regular columns, which
- * is behavior of the CTAS statement to generate such schema.
+ * <p>It is expected that the {@code sourceSchema} contains only
physical/regular columns.
*
- * <p>Columns of {@code sourceSchema} are appended to the schema of {@code
sqlCreateTableAs}. If
- * a column in the {@code sqlCreateTableAs} is already defined in {@code
sourceSchema}, then the
- * types of the columns are implicit cast and must be compatible based on
the implicit cast
- * rules. If they're compatible, then the column position in the schema
stays the same as
- * defined in the appended {@code sourceSchema}.
+ * <p>Columns of the {@code sourceSchema} are appended to the schema
columns defined in the
+ * {@code sqlColumnList}. If a column in the {@code sqlColumnList} is
already defined in the
+ * {@code sourceSchema}, then the types of the columns are implicit cast
and must be compatible
+ * based on the implicit cast rules. If they're compatible, then the
column position in the
+ * schema stays the same as defined in the appended {@code sourceSchema}.
*/
- public Schema mergeSchemas(SqlCreateTableAs sqlCreateTableAs,
ResolvedSchema sourceSchema) {
+ public Schema mergeSchemas(
+ SqlNodeList sqlColumnList,
+ @Nullable SqlWatermark sqlWatermark,
+ List<SqlTableConstraint> sqlTableConstraints,
+ ResolvedSchema sourceSchema) {
SchemaBuilder schemaBuilder =
new SchemaBuilder(
(FlinkTypeFactory) validator.getTypeFactory(),
@@ -93,19 +193,17 @@ public class MergeTableAsUtil {
escapeExpression);
schemaBuilder.mergeColumns(
- sqlCreateTableAs.getColumnList(),
+ sqlColumnList,
Schema.newBuilder().fromResolvedSchema(sourceSchema).build().getColumns());
- if (sqlCreateTableAs.getWatermark().isPresent()) {
- schemaBuilder.setWatermark(sqlCreateTableAs.getWatermark().get());
+ if (sqlWatermark != null) {
+ schemaBuilder.setWatermark(sqlWatermark);
}
// It is assumed only a primary key constraint may be defined in the
table. The
// SqlCreateTableAs has validations to ensure this before the object
is created.
Optional<SqlTableConstraint> primaryKey =
- sqlCreateTableAs.getFullConstraints().stream()
- .filter(SqlTableConstraint::isPrimaryKey)
- .findAny();
+
sqlTableConstraints.stream().filter(SqlTableConstraint::isPrimaryKey).findAny();
if (primaryKey.isPresent()) {
schemaBuilder.setPrimaryKey(primaryKey.get());
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java
index 2a7c1044456..037fc4cde92 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java
@@ -43,21 +43,13 @@ import
org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.planner.calcite.SqlRewriterUtils;
import org.apache.flink.table.planner.utils.OperationConverterUtils;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.types.logical.RowType.RowField;
-import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlIdentifier;
-import org.apache.calcite.sql.SqlLiteral;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
-import org.apache.calcite.sql.SqlSelect;
-import org.apache.calcite.sql.parser.SqlParserPos;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
-import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
@@ -65,7 +57,6 @@ import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
-import java.util.stream.IntStream;
/** Helper class for converting {@link SqlCreateTable} to {@link
CreateTableOperation}. */
class SqlCreateTableConverter {
@@ -130,8 +121,12 @@ class SqlCreateTableConverter {
// If needed, rewrite the query to include the new sink fields in the
select list
query =
- maybeRewriteCreateTableAsQuery(
- flinkPlanner, sqlCreateTableAs,
tableWithResolvedSchema, query);
+ mergeTableAsUtil.maybeRewriteQuery(
+ catalogManager,
+ flinkPlanner,
+ query,
+ sqlCreateTableAs.getAsQuery(),
+ tableWithResolvedSchema);
CreateTableOperation createTableOperation =
new CreateTableOperation(
@@ -144,83 +139,6 @@ class SqlCreateTableConverter {
createTableOperation, Collections.emptyMap(), query, false);
}
- /**
- * Builds and returns a new Query operation with new sink fields declared
in the {@code
- * sinkTable}.
- */
- private PlannerQueryOperation maybeRewriteCreateTableAsQuery(
- FlinkPlannerImpl flinkPlanner,
- SqlCreateTableAs sqlCreateTableAs,
- ResolvedCatalogTable sinkTable,
- PlannerQueryOperation query) {
-
- // Only fields that may be persisted will be included in the select
query
- RowType sinkRowType =
- ((RowType)
sinkTable.getResolvedSchema().toSinkRowDataType().getLogicalType());
-
- Map<String, Integer> sourceFields =
- IntStream.range(0,
query.getResolvedSchema().getColumnNames().size())
- .boxed()
- .collect(
- Collectors.toMap(
-
query.getResolvedSchema().getColumnNames()::get,
- Function.identity()));
-
- // assignedFields contains the new sink fields that are not present in
the source
- // and that will be included in the select query
- LinkedHashMap<Integer, SqlNode> assignedFields = new LinkedHashMap<>();
-
- // targetPositions contains the positions of the source fields that
will be
- // included in the select query
- List<Object> targetPositions = new ArrayList<>();
-
- int pos = -1;
- for (RowField targetField : sinkRowType.getFields()) {
- pos++;
-
- if (!sourceFields.containsKey(targetField.getName())) {
- if (!targetField.getType().isNullable()) {
- throw new ValidationException(
- "Column '"
- + targetField.getName()
- + "' has "
- + "no default value and does not allow
NULLs.");
- }
-
- assignedFields.put(
- pos,
- rewriterUtils.maybeCast(
- SqlLiteral.createNull(SqlParserPos.ZERO),
- typeFactory.createUnknownType(),
-
typeFactory.createFieldTypeFromLogicalType(targetField.getType()),
- typeFactory));
- } else {
- targetPositions.add(sourceFields.get(targetField.getName()));
- }
- }
-
- // if there are no new sink fields to include, then return the
original query
- if (assignedFields.isEmpty()) {
- return query;
- }
-
- // rewrite query
- SqlCall newSelect =
- rewriterUtils.rewriteSelect(
- (SqlSelect) sqlCreateTableAs.getAsQuery(),
- typeFactory.buildRelNodeRowType(sinkRowType),
- assignedFields,
- targetPositions);
-
- return (PlannerQueryOperation)
- SqlNodeToOperationConversion.convert(flinkPlanner,
catalogManager, newSelect)
- .orElseThrow(
- () ->
- new TableException(
- "CTAS unsupported node type "
- +
newSelect.getClass().getSimpleName()));
- }
-
private ResolvedCatalogTable createCatalogTable(
SqlCreateTableAs sqlCreateTableAs, ResolvedSchema mergeSchema) {
Map<String, String> tableOptions =
@@ -233,7 +151,12 @@ class SqlCreateTableConverter {
String tableComment =
OperationConverterUtils.getTableComment(sqlCreateTableAs.getComment());
- Schema mergedSchema = mergeTableAsUtil.mergeSchemas(sqlCreateTableAs,
mergeSchema);
+ Schema mergedSchema =
+ mergeTableAsUtil.mergeSchemas(
+ sqlCreateTableAs.getColumnList(),
+ sqlCreateTableAs.getWatermark().orElse(null),
+ sqlCreateTableAs.getFullConstraints(),
+ mergeSchema);
Optional<TableDistribution> tableDistribution =
Optional.ofNullable(sqlCreateTableAs.getDistribution())
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeConvertContext.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeConvertContext.java
index 2958333caba..687a8ce483a 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeConvertContext.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeConvertContext.java
@@ -72,6 +72,11 @@ public class SqlNodeConvertContext implements
SqlNodeConverter.ConvertContext {
return catalogManager;
}
+ @Override
+ public FlinkPlannerImpl getFlinkPlanner() {
+ return flinkPlanner;
+ }
+
@Override
public RelRoot toRelRoot(SqlNode sqlNode) {
return flinkPlanner.rel(sqlNode);
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverter.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverter.java
index cdd9d860097..2c54d78c516 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverter.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverter.java
@@ -22,6 +22,7 @@ import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
import org.apache.flink.table.planner.utils.Expander;
import org.apache.flink.table.types.DataType;
@@ -83,6 +84,9 @@ public interface SqlNodeConverter<S extends SqlNode> {
/** Returns the {@link CatalogManager} in the convert context. */
CatalogManager getCatalogManager();
+ /** Returns the {@link FlinkPlannerImpl} in the convert context. */
+ FlinkPlannerImpl getFlinkPlanner();
+
/** Converts the given validated {@link SqlNode} into a {@link
RelRoot}. */
RelRoot toRelRoot(SqlNode sqlNode);
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlReplaceTableAsConverter.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlReplaceTableAsConverter.java
index 83b10eb97d5..cf4d96cf26f 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlReplaceTableAsConverter.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlReplaceTableAsConverter.java
@@ -21,22 +21,34 @@ package
org.apache.flink.table.planner.operations.converters;
import org.apache.flink.sql.parser.ddl.SqlReplaceTableAs;
import org.apache.flink.sql.parser.ddl.SqlTableOption;
import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.TableDistribution;
import org.apache.flink.table.catalog.UnresolvedIdentifier;
import org.apache.flink.table.operations.Operation;
-import org.apache.flink.table.operations.QueryOperation;
import org.apache.flink.table.operations.ReplaceTableAsOperation;
import org.apache.flink.table.operations.ddl.CreateTableOperation;
+import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
+import org.apache.flink.table.planner.operations.MergeTableAsUtil;
import org.apache.flink.table.planner.operations.PlannerQueryOperation;
+import org.apache.flink.table.planner.operations.SqlNodeToOperationConversion;
import org.apache.flink.table.planner.utils.OperationConverterUtils;
-import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNodeList;
-import java.util.Collections;
import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
/** A converter for {@link SqlReplaceTableAs}. */
public class SqlReplaceTableAsConverter implements
SqlNodeConverter<SqlReplaceTableAs> {
@@ -47,13 +59,60 @@ public class SqlReplaceTableAsConverter implements
SqlNodeConverter<SqlReplaceTa
UnresolvedIdentifier unresolvedIdentifier =
UnresolvedIdentifier.of(sqlReplaceTableAs.fullTableName());
ObjectIdentifier identifier =
catalogManager.qualifyIdentifier(unresolvedIdentifier);
+ FlinkPlannerImpl flinkPlanner = context.getFlinkPlanner();
- SqlNode asQuerySqlNode = sqlReplaceTableAs.getAsQuery();
- SqlNode validated = context.getSqlValidator().validate(asQuerySqlNode);
- QueryOperation query =
- new PlannerQueryOperation(
- context.toRelRoot(validated).project(),
- () -> context.toQuotedSqlString(asQuerySqlNode));
+ MergeTableAsUtil mergeTableAsUtil =
+ new MergeTableAsUtil(
+ context.getSqlValidator(),
+ sqlNode -> sqlNode.toString(),
+ catalogManager.getDataTypeFactory());
+
+ PlannerQueryOperation query =
+ (PlannerQueryOperation)
+ SqlNodeToOperationConversion.convert(
+ flinkPlanner,
+ catalogManager,
+ sqlReplaceTableAs.getAsQuery())
+ .orElseThrow(
+ () ->
+ new TableException(
+ "RTAS unsupported node
type "
+ +
sqlReplaceTableAs
+
.getAsQuery()
+
.getClass()
+
.getSimpleName()));
+
+ // get table
+ ResolvedCatalogTable tableWithResolvedSchema =
+ createCatalogTable(
+ context, mergeTableAsUtil, sqlReplaceTableAs,
query.getResolvedSchema());
+
+ // If needed, rewrite the query to include the new sink fields in the
select list
+ query =
+ mergeTableAsUtil.maybeRewriteQuery(
+ context.getCatalogManager(),
+ flinkPlanner,
+ query,
+ sqlReplaceTableAs.getAsQuery(),
+ tableWithResolvedSchema);
+
+ CreateTableOperation createTableOperation =
+ new CreateTableOperation(
+ identifier,
+ tableWithResolvedSchema,
+ sqlReplaceTableAs.isIfNotExists(),
+ sqlReplaceTableAs.isTemporary());
+
+ return new ReplaceTableAsOperation(
+ createTableOperation, query,
sqlReplaceTableAs.isCreateOrReplace());
+ }
+
+ private ResolvedCatalogTable createCatalogTable(
+ ConvertContext context,
+ MergeTableAsUtil mergeTableAsUtil,
+ SqlReplaceTableAs sqlReplaceTableAs,
+ ResolvedSchema mergeSchema) {
+ CatalogManager catalogManager = context.getCatalogManager();
// get table comment
String tableComment =
@@ -70,22 +129,56 @@ public class SqlReplaceTableAsConverter implements
SqlNodeConverter<SqlReplaceTa
((SqlTableOption) p).getKeyString(),
((SqlTableOption)
p).getValueString()));
- // get table
+ // merge schemas
+ Schema mergedSchema =
+ mergeTableAsUtil.mergeSchemas(
+ sqlReplaceTableAs.getColumnList(),
+ sqlReplaceTableAs.getWatermark().orElse(null),
+ sqlReplaceTableAs.getFullConstraints(),
+ mergeSchema);
+
+ // get distribution
+ Optional<TableDistribution> tableDistribution =
+ Optional.ofNullable(sqlReplaceTableAs.getDistribution())
+
.map(OperationConverterUtils::getDistributionFromSqlDistribution);
+
+ // get partition key
+ List<String> partitionKeys =
+
getPartitionKeyColumnNames(sqlReplaceTableAs.getPartitionKeyList());
+ verifyPartitioningColumnsExist(mergedSchema, partitionKeys);
+
CatalogTable catalogTable =
- CatalogTable.of(
-
Schema.newBuilder().fromResolvedSchema(query.getResolvedSchema()).build(),
- tableComment,
- Collections.emptyList(),
- properties);
+ CatalogTable.newBuilder()
+ .schema(mergedSchema)
+ .comment(tableComment)
+ .distribution(tableDistribution.orElse(null))
+ .options(properties)
+ .partitionKeys(partitionKeys)
+ .build();
- CreateTableOperation createTableOperation =
- new CreateTableOperation(
- identifier,
- catalogTable,
- sqlReplaceTableAs.isIfNotExists(),
- sqlReplaceTableAs.isTemporary());
+ return catalogManager.resolveCatalogTable(catalogTable);
+ }
- return new ReplaceTableAsOperation(
- createTableOperation, query,
sqlReplaceTableAs.isCreateOrReplace());
+ private List<String> getPartitionKeyColumnNames(SqlNodeList partitionKey) {
+ return partitionKey.getList().stream()
+ .map(p -> ((SqlIdentifier) p).getSimple())
+ .collect(Collectors.toList());
+ }
+
+ private void verifyPartitioningColumnsExist(Schema mergedSchema,
List<String> partitionKeys) {
+ Set<String> columnNames =
+ mergedSchema.getColumns().stream()
+ .map(Schema.UnresolvedColumn::getName)
+ .collect(Collectors.toCollection(LinkedHashSet::new));
+ for (String partitionKey : partitionKeys) {
+ if (!columnNames.contains(partitionKey)) {
+ throw new ValidationException(
+ String.format(
+ "Partition column '%s' not defined in the
table schema. Available columns: [%s]",
+ partitionKey,
+ columnNames.stream()
+ .collect(Collectors.joining("', '",
"'", "'"))));
+ }
+ }
}
}
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlRTASNodeToOperationConverterTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlRTASNodeToOperationConverterTest.java
index cc7b5a3f1af..ee6e8bca658 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlRTASNodeToOperationConverterTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlRTASNodeToOperationConverterTest.java
@@ -21,8 +21,10 @@ package org.apache.flink.table.planner.operations;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.SqlDialect;
+import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.TableDistribution;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.ReplaceTableAsOperation;
import org.apache.flink.table.operations.ddl.CreateTableOperation;
@@ -37,15 +39,17 @@ import javax.annotation.Nullable;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Test base for testing convert [CREATE OR] REPLACE TABLE AS statement to
operation. */
public class SqlRTASNodeToOperationConverterTest extends
SqlNodeToOperationConversionTestBase {
@Test
- public void testReplaceTableAS() {
+ public void testReplaceTableAs() {
String tableName = "replace_table";
String tableComment = "test table comment 表描述";
String sql =
@@ -58,7 +62,7 @@ public class SqlRTASNodeToOperationConverterTest extends
SqlNodeToOperationConve
}
@Test
- public void testCreateOrReplaceTableAS() {
+ public void testCreateOrReplaceTableAs() {
String tableName = "create_or_replace_table";
String sql =
"CREATE OR REPLACE TABLE "
@@ -68,25 +72,163 @@ public class SqlRTASNodeToOperationConverterTest extends
SqlNodeToOperationConve
}
@Test
- public void testCreateOrReplaceTableASWithLimit() {
+ public void testCreateOrReplaceTableAsWithColumns() {
String tableName = "create_or_replace_table";
String sql =
"CREATE OR REPLACE TABLE "
+ tableName
- + " WITH ('k1' = 'v1', 'k2' = 'v2') as (SELECT * FROM
t1 LIMIT 5)";
- testCommonReplaceTableAs(sql, tableName, null);
+ + "(c0 int, c1 double metadata, c2 as c0 * a) "
+ + " WITH ('k1' = 'v1', 'k2' = 'v2') as SELECT * FROM
t1";
+ Schema tableSchema =
+ Schema.newBuilder()
+ .column("c0", DataTypes.INT())
+ .columnByMetadata("c1", DataTypes.DOUBLE())
+ .columnByExpression("c2", "`c0` * `a`")
+ .fromSchema(getDefaultTableSchema())
+ .build();
+
+ testCommonReplaceTableAs(sql, tableName, null, tableSchema, null,
Collections.emptyList());
+ }
+
+ @Test
+ public void testCreateOrReplaceTableAsWithColumnsOverridden() {
+ String tableName = "create_or_replace_table";
+ String sql =
+ "CREATE OR REPLACE TABLE "
+ + tableName
+ + "(c0 int, a double, c int) "
+ + " WITH ('k1' = 'v1', 'k2' = 'v2') as SELECT * FROM
t1";
+ Schema tableSchema =
+ Schema.newBuilder()
+ .column("c0", DataTypes.INT())
+ .column("a", DataTypes.DOUBLE())
+ .column("b", DataTypes.STRING())
+ .column("c", DataTypes.INT())
+ .column("d", DataTypes.STRING())
+ .build();
+
+ testCommonReplaceTableAs(sql, tableName, null, tableSchema, null,
Collections.emptyList());
+ }
+
+ @Test
+ public void testCreateOrReplaceTableAsWithNotNullColumnsAreNotAllowed() {
+ String tableName = "create_or_replace_table";
+ String sql =
+ "CREATE OR REPLACE TABLE "
+ + tableName
+ + "(c0 int not null) "
+ + " WITH ('k1' = 'v1', 'k2' = 'v2') as SELECT * FROM
t1";
+
+ assertThatThrownBy(() -> parseAndConvert(sql))
+ .isInstanceOf(ValidationException.class)
+ .hasMessageContaining("Column 'c0' has no default value and
does not allow NULLs.");
+ }
+
+ @Test
+ public void testCreateOrReplaceTableAsWithIncompatibleImplicitCastTypes() {
+ String tableName = "create_or_replace_table";
+ String sql =
+ "CREATE OR REPLACE TABLE "
+ + tableName
+ + "(a boolean) "
+ + " WITH ('k1' = 'v1', 'k2' = 'v2') as SELECT * FROM
t1";
+
+ assertThatThrownBy(() -> parseAndConvert(sql))
+ .isInstanceOf(ValidationException.class)
+ .hasMessageContaining(
+ "Incompatible types for sink column 'a' at position 0.
"
+ + "The source column has type 'BIGINT NOT
NULL', while the target "
+ + "column has type 'BOOLEAN'.");
+ }
+
+ @Test
+ public void testCreateOrReplaceTableAsWithDistribution() {
+ String tableName = "create_or_replace_table";
+ String sql =
+ "CREATE OR REPLACE TABLE "
+ + tableName
+ + " DISTRIBUTED BY HASH(b) INTO 2 BUCKETS "
+ + " WITH ('k1' = 'v1', 'k2' = 'v2') as SELECT * FROM
t1";
+ Schema tableSchema =
Schema.newBuilder().fromSchema(getDefaultTableSchema()).build();
+
+ testCommonReplaceTableAs(
+ sql,
+ tableName,
+ null,
+ tableSchema,
+ TableDistribution.ofHash(Collections.singletonList("b"), 2),
+ Collections.emptyList());
+ }
+
+ @Test
+ public void testCreateOrReplaceTableAsWithPrimaryKey() {
+ String tableName = "create_or_replace_table";
+ String sql =
+ "CREATE OR REPLACE TABLE "
+ + tableName
+ + "(PRIMARY KEY (a) NOT ENFORCED) "
+ + " WITH ('k1' = 'v1', 'k2' = 'v2') as SELECT * FROM
t1";
+ Schema tableSchema =
+ Schema.newBuilder()
+ .column("a", DataTypes.BIGINT().notNull())
+ .column("b", DataTypes.STRING())
+ .column("c", DataTypes.INT())
+ .column("d", DataTypes.STRING())
+ .primaryKey("a")
+ .build();
+
+ testCommonReplaceTableAs(sql, tableName, null, tableSchema, null,
Collections.emptyList());
+ }
+
+ @Test
+ public void testCreateOrReplaceTableAsWithWatermark() {
+ String tableName = "create_or_replace_table";
+ String sql =
+ "CREATE OR REPLACE TABLE "
+ + tableName
+ + "(c0 TIMESTAMP(3), WATERMARK FOR c0 AS c0 - INTERVAL
'3' SECOND)"
+ + " WITH ('k1' = 'v1', 'k2' = 'v2') as SELECT * FROM
t1";
+ Schema tableSchema =
+ Schema.newBuilder()
+ .column("c0", DataTypes.TIMESTAMP(3))
+ .column("a", DataTypes.BIGINT().notNull())
+ .column("b", DataTypes.STRING())
+ .column("c", DataTypes.INT())
+ .column("d", DataTypes.STRING())
+ .watermark("c0", "`c0` - INTERVAL '3' SECOND")
+ .build();
+
+ testCommonReplaceTableAs(sql, tableName, null, tableSchema, null,
Collections.emptyList());
}
private void testCommonReplaceTableAs(
String sql, String tableName, @Nullable String tableComment) {
+ testCommonReplaceTableAs(
+ sql,
+ tableName,
+ tableComment,
+ getDefaultTableSchema(),
+ null,
+ Collections.emptyList());
+ }
+
+ private void testCommonReplaceTableAs(
+ String sql,
+ String tableName,
+ @Nullable String tableComment,
+ Schema tableSchema,
+ @Nullable TableDistribution distribution,
+ List<String> partitionKey) {
ObjectIdentifier expectedIdentifier = ObjectIdentifier.of("builtin",
"default", tableName);
Operation operation = parseAndConvert(sql);
CatalogTable expectedCatalogTable =
- CatalogTable.of(
- getDefaultTableSchema(),
- tableComment,
- Collections.emptyList(),
- getDefaultTableOptions());
+ CatalogTable.newBuilder()
+ .schema(tableSchema)
+ .comment(tableComment)
+ .distribution(distribution)
+ .options(getDefaultTableOptions())
+ .partitionKeys(partitionKey)
+ .build();
verifyReplaceTableAsOperation(operation, expectedIdentifier,
expectedCatalogTable);
}
@@ -122,6 +264,8 @@ public class SqlRTASNodeToOperationConverterTest extends
SqlNodeToOperationConve
assertThat(actualCatalogTable.getPartitionKeys())
.isEqualTo(expectedCatalogTable.getPartitionKeys());
assertThat(actualCatalogTable.getOptions()).isEqualTo(expectedCatalogTable.getOptions());
+ assertThat(actualCatalogTable.getDistribution())
+ .isEqualTo(expectedCatalogTable.getDistribution());
}
private Map<String, String> getDefaultTableOptions() {
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/RTASITCase.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/RTASITCase.java
index f720dde7eb7..2ce2bc6dffe 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/RTASITCase.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/RTASITCase.java
@@ -108,6 +108,86 @@ class RTASITCase extends StreamingTestBase {
verifyCatalogTable(expectCatalogTable, getCatalogTable("target"));
}
+ @Test
+ void testCreateOrReplaceTableASWithNewColumnsOnly() throws Exception {
+ tEnv().executeSql(
+ "CREATE OR REPLACE TABLE target"
+ + " (`p1` INT, `p2` STRING)"
+ + " WITH ('connector' = 'values', 'bounded' =
'true')"
+ + " AS SELECT a, c FROM source")
+ .await();
+
+ // verify written rows
+
assertThat(TestValuesTableFactory.getResultsAsStrings("target").toString())
+ .isEqualTo(
+ "["
+ + "+I[null, null, 1, Hi], "
+ + "+I[null, null, 2, Hello], "
+ + "+I[null, null, 3, Hello world]"
+ + "]");
+
+ // verify the table after replacing
+ CatalogTable expectCatalogTable =
+ getExpectCatalogTable(
+ new String[] {"p1", "p2", "a", "c"},
+ new AbstractDataType[] {
+ DataTypes.INT(), DataTypes.STRING(),
DataTypes.INT(), DataTypes.STRING()
+ });
+
+ verifyCatalogTable(expectCatalogTable, getCatalogTable("target"));
+ }
+
+ @Test
+ void testCreateOrReplaceTableAsSelectWithColumnsFromQueryOnly() throws
Exception {
+ tEnv().executeSql(
+ "CREATE OR REPLACE TABLE target"
+ + " (`a` DOUBLE, `c` STRING)"
+ + " WITH ('connector' = 'values', 'bounded' =
'true')"
+ + " AS SELECT a, c FROM source")
+ .await();
+
+ // verify written rows
+
assertThat(TestValuesTableFactory.getResultsAsStrings("target").toString())
+ .isEqualTo("[+I[1.0, Hi], +I[2.0, Hello], +I[3.0, Hello
world]]");
+
+ // verify the table after replacing
+ CatalogTable expectCatalogTable =
+ getExpectCatalogTable(
+ new String[] {"a", "c"},
+ new AbstractDataType[] {DataTypes.DOUBLE(),
DataTypes.STRING()});
+
+ verifyCatalogTable(expectCatalogTable, getCatalogTable("target"));
+ }
+
+ @Test
+ void testCreateOrReplaceTableAsSelectWithMixOfNewColumnsAndQueryColumns()
throws Exception {
+ tEnv().executeSql(
+ "CREATE OR REPLACE TABLE target"
+ + " (`p1` INT, `a` DOUBLE)"
+ + " WITH ('connector' = 'values', 'bounded' =
'true')"
+ + " AS SELECT a, c FROM source")
+ .await();
+
+ // verify written rows
+
assertThat(TestValuesTableFactory.getResultsAsStrings("target").toString())
+ .isEqualTo(
+ "["
+ + "+I[null, 1.0, Hi], "
+ + "+I[null, 2.0, Hello], "
+ + "+I[null, 3.0, Hello world]"
+ + "]");
+
+ // verify the table after replacing
+ CatalogTable expectCatalogTable =
+ getExpectCatalogTable(
+ new String[] {"p1", "a", "c"},
+ new AbstractDataType[] {
+ DataTypes.INT(), DataTypes.DOUBLE(),
DataTypes.STRING()
+ });
+
+ verifyCatalogTable(expectCatalogTable, getCatalogTable("target"));
+ }
+
@Test
void testCreateOrReplaceTableASWithSortLimit() throws Exception {
tEnv().executeSql(