This is an automated email from the ASF dual-hosted git repository.
snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 538c24c8bf4 [FLINK-31499][table] Move `SqlCreateTable` conversion logic to `SqlCreateTableConverter` and make it more generic
538c24c8bf4 is described below
commit 538c24c8bf490eb8870c4d39c8ac5b204d3613ee
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Mon Sep 29 10:50:18 2025 +0200
[FLINK-31499][table] Move `SqlCreateTable` conversion logic to `SqlCreateTableConverter` and make it more generic
---
.../flink/sql/parser/ddl/SqlCreateTable.java | 16 +-
.../flink/sql/parser/ddl/SqlCreateTableAs.java | 3 +-
.../flink/sql/parser/ddl/SqlCreateTableLike.java | 3 +-
.../flink/sql/parser/ddl/SqlReplaceTableAs.java | 117 ++-----
.../operations/SqlCreateTableConverter.java | 359 ---------------------
.../operations/SqlNodeToOperationConversion.java | 40 +--
.../converters/AbstractCreateTableConverter.java | 131 ++++++++
.../{ => converters}/MergeTableAsUtil.java | 20 +-
.../{ => converters}/MergeTableLikeUtil.java | 9 +-
.../{ => converters}/SchemaBuilderUtil.java | 7 +-
.../SqlAlterMaterializedTableResumeConverter.java | 15 +-
.../converters/SqlAlterModelSetConverter.java | 2 +-
.../SqlAlterTableAddPartitionConverter.java | 3 +-
.../SqlAlterViewPropertiesConverter.java | 2 +-
.../converters/SqlCreateCatalogConverter.java | 13 +-
.../SqlCreateMaterializedTableConverter.java | 25 +-
.../converters/SqlCreateModelConverter.java | 1 -
.../converters/SqlCreateTableAsConverter.java | 121 +++++++
.../converters/SqlCreateTableConverter.java | 86 +++++
.../converters/SqlCreateTableLikeConverter.java | 140 ++++++++
.../converters/SqlCreateViewConverter.java | 3 +-
.../operations/converters/SqlNodeConverters.java | 3 +
.../converters/SqlReplaceTableAsConverter.java | 152 +++------
.../planner/utils/OperationConverterUtils.java | 10 +-
.../{ => converters}/MergeTableLikeUtilTest.java | 2 +-
25 files changed, 621 insertions(+), 662 deletions(-)
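
For readers skimming the diff below: the refactor moves CREATE TABLE handling out of a planner-internal helper and into the pluggable SqlNodeConverter framework used by the other converters in this package. A minimal, self-contained sketch of that converter shape, using simplified stand-in types rather than the real Flink/Calcite classes:

    import java.util.Map;

    final class CreateTableConverterDemo {
        interface Operation {} // stands in for org.apache.flink.table.operations.Operation

        interface SqlNodeConverter<T> { // stands in for the planner's SqlNodeConverter
            Operation convertSqlNode(T node, ConvertContext context);
        }

        record ConvertContext(String catalog, String database) {} // simplified ConvertContext

        record CreateTable(String name, Map<String, String> options) {} // simplified SqlCreateTable

        record CreateTableOperation(String qualifiedName, Map<String, String> options)
                implements Operation {}

        // The real converters additionally resolve the schema and merge distribution,
        // partition keys and options (see AbstractCreateTableConverter below).
        static final SqlNodeConverter<CreateTable> CONVERTER =
                (node, ctx) ->
                        new CreateTableOperation(
                                ctx.catalog() + "." + ctx.database() + "." + node.name(),
                                node.options());

        public static void main(String[] args) {
            System.out.println(
                    CONVERTER.convertSqlNode(
                            new CreateTable("t1", Map.of("connector", "datagen")),
                            new ConvertContext("default_catalog", "default_database")));
        }
    }
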
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
index 50e36baf712..d02f696c1fb 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
@@ -63,10 +63,6 @@ public class SqlCreateTable extends SqlCreate implements ExtendedSqlNode {
private final List<SqlTableConstraint> tableConstraints;
- public SqlDistribution getDistribution() {
- return distribution;
- }
-
private final SqlDistribution distribution;
private final SqlNodeList partitionKeyList;
@@ -101,7 +97,8 @@ public class SqlCreateTable extends SqlCreate implements ExtendedSqlNode {
watermark,
comment,
isTemporary,
- ifNotExists);
+ ifNotExists,
+ false);
}
protected SqlCreateTable(
@@ -116,8 +113,9 @@ public class SqlCreateTable extends SqlCreate implements ExtendedSqlNode {
@Nullable SqlWatermark watermark,
@Nullable SqlCharStringLiteral comment,
boolean isTemporary,
- boolean ifNotExists) {
- super(operator, pos, false, ifNotExists);
+ boolean ifNotExists,
+ boolean replace) {
+ super(operator, pos, replace, ifNotExists);
this.tableName = requireNonNull(tableName, "tableName should not be null");
this.columnList = requireNonNull(columnList, "columnList should not be null");
this.tableConstraints =
@@ -156,6 +154,10 @@ public class SqlCreateTable extends SqlCreate implements ExtendedSqlNode {
return columnList;
}
+ public final SqlDistribution getDistribution() {
+ return distribution;
+ }
+
public SqlNodeList getPropertyList() {
return propertyList;
}
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTableAs.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTableAs.java
index c48fbfdf19e..4ee67b8c115 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTableAs.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTableAs.java
@@ -98,7 +98,8 @@ public class SqlCreateTableAs extends SqlCreateTable {
watermark,
comment,
isTemporary,
- ifNotExists);
+ ifNotExists,
+ false);
this.asQuery =
requireNonNull(asQuery, "As clause is required for CREATE TABLE AS SELECT DDL");
}
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTableLike.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTableLike.java
index 599a93829d4..f7fa4306258 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTableLike.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTableLike.java
@@ -100,7 +100,8 @@ public class SqlCreateTableLike extends SqlCreateTable {
watermark,
comment,
isTemporary,
- ifNotExists);
+ ifNotExists,
+ false);
this.tableLike =
requireNonNull(tableLike, "LIKE clause is required for CREATE TABLE LIKE DDL");
}
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlReplaceTableAs.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlReplaceTableAs.java
index 47d7b78b211..03c38d396b0 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlReplaceTableAs.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlReplaceTableAs.java
@@ -25,7 +25,6 @@ import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;
import org.apache.flink.sql.parser.error.SqlValidateException;
import org.apache.calcite.sql.SqlCharStringLiteral;
-import org.apache.calcite.sql.SqlCreate;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
@@ -39,9 +38,6 @@ import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.List;
-import java.util.Optional;
-
-import static java.util.Objects.requireNonNull;
/**
* {@link SqlNode} to describe the [CREATE OR] REPLACE TABLE AS (RTAS) syntax. The RTAS would create
@@ -73,7 +69,7 @@ import static java.util.Objects.requireNonNull;
* AS SELECT * FROM base_table;
* }</pre>
*/
-public class SqlReplaceTableAs extends SqlCreate implements ExtendedSqlNode {
+public class SqlReplaceTableAs extends SqlCreateTable implements ExtendedSqlNode {
public static final SqlSpecialOperator REPLACE_OPERATOR =
new SqlSpecialOperator("REPLACE TABLE AS", SqlKind.OTHER_DDL);
@@ -81,28 +77,6 @@ public class SqlReplaceTableAs extends SqlCreate implements ExtendedSqlNode {
public static final SqlSpecialOperator CREATE_OR_REPLACE_OPERATOR =
new SqlSpecialOperator("CREATE OR REPLACE TABLE AS", SqlKind.OTHER_DDL);
- private final SqlIdentifier tableName;
-
- private final SqlNodeList columnList;
-
- private final SqlNodeList propertyList;
-
- private final List<SqlTableConstraint> tableConstraints;
-
- public SqlDistribution getDistribution() {
- return distribution;
- }
-
- private final SqlDistribution distribution;
-
- private final SqlNodeList partitionKeyList;
-
- private final SqlWatermark watermark;
-
- private final SqlCharStringLiteral comment;
-
- private final boolean isTemporary;
-
private final boolean isCreateOrReplace;
private final SqlNode asQuery;
@@ -124,20 +98,17 @@ public class SqlReplaceTableAs extends SqlCreate implements ExtendedSqlNode {
super(
isCreateOrReplace ? CREATE_OR_REPLACE_OPERATOR : REPLACE_OPERATOR,
pos,
- true,
- ifNotExists);
-
- this.tableName = requireNonNull(tableName, "tableName should not be null");
- this.columnList = requireNonNull(columnList, "columnList should not be null");
- this.tableConstraints =
- requireNonNull(tableConstraints, "table constraints should not be null");
- this.propertyList = requireNonNull(propertyList, "propertyList should not be null");
- this.distribution = distribution;
- this.partitionKeyList =
- requireNonNull(partitionKeyList, "partitionKeyList should not be null");
- this.watermark = watermark;
- this.comment = comment;
- this.isTemporary = isTemporary;
+ tableName,
+ columnList,
+ tableConstraints,
+ propertyList,
+ distribution,
+ partitionKeyList,
+ watermark,
+ comment,
+ isTemporary,
+ ifNotExists,
+ true);
this.asQuery = asQuery;
this.isCreateOrReplace = isCreateOrReplace;
@@ -146,20 +117,21 @@ public class SqlReplaceTableAs extends SqlCreate implements ExtendedSqlNode {
@Override
public @Nonnull List<SqlNode> getOperandList() {
return ImmutableNullableList.of(
- tableName,
- columnList,
- new SqlNodeList(tableConstraints, SqlParserPos.ZERO),
- propertyList,
- partitionKeyList,
- watermark,
- comment,
+ getTableName(),
+ getColumnList(),
+ new SqlNodeList(getTableConstraints(), SqlParserPos.ZERO),
+ getPropertyList(),
+ getPartitionKeyList(),
+ getWatermark().get(),
+ getComment().get(),
asQuery);
}
@Override
public void validate() throws SqlValidateException {
if (!isSchemaWithColumnsIdentifiersOnly()) {
- SqlConstraintValidator.validateAndChangeColumnNullability(tableConstraints, columnList);
+ SqlConstraintValidator.validateAndChangeColumnNullability(
+ getTableConstraints(), getColumnList());
}
// The following features are not currently supported by RTAS, but may be supported in the
@@ -188,52 +160,17 @@ public class SqlReplaceTableAs extends SqlCreate implements ExtendedSqlNode {
return isCreateOrReplace;
}
- public SqlIdentifier getTableName() {
- return tableName;
- }
-
- public SqlNodeList getColumnList() {
- return columnList;
- }
-
- public SqlNodeList getPropertyList() {
- return propertyList;
- }
-
- public SqlNodeList getPartitionKeyList() {
- return partitionKeyList;
- }
-
- public List<SqlTableConstraint> getTableConstraints() {
- return tableConstraints;
- }
-
- public Optional<SqlWatermark> getWatermark() {
- return Optional.ofNullable(watermark);
- }
-
- public Optional<SqlCharStringLiteral> getComment() {
- return Optional.ofNullable(comment);
- }
-
- public boolean isIfNotExists() {
- return ifNotExists;
- }
-
- public boolean isTemporary() {
- return isTemporary;
- }
-
public boolean isSchemaWithColumnsIdentifiersOnly() {
// REPLACE table supports passing only column identifiers in the column list. If
// the first column in the list is an identifier, then we assume the rest of the
// columns are identifiers as well.
+ SqlNodeList columnList = getColumnList();
return !columnList.isEmpty() && columnList.get(0) instanceof SqlIdentifier;
}
/** Returns the column constraints plus the table constraints. */
public List<SqlTableConstraint> getFullConstraints() {
- return SqlConstraintValidator.getFullConstraints(tableConstraints, columnList);
+ return SqlConstraintValidator.getFullConstraints(getTableConstraints(), getColumnList());
}
@Override
@@ -242,15 +179,17 @@ public class SqlReplaceTableAs extends SqlCreate implements ExtendedSqlNode {
writer.keyword("CREATE OR");
}
writer.keyword("REPLACE TABLE");
- tableName.unparse(writer, leftPrec, rightPrec);
+ getTableName().unparse(writer, leftPrec, rightPrec);
+ SqlCharStringLiteral comment = getComment().orElse(null);
if (comment != null) {
writer.newlineAndIndent();
writer.keyword("COMMENT");
comment.unparse(writer, leftPrec, rightPrec);
}
- if (this.propertyList.size() > 0) {
+ SqlNodeList propertyList = getPropertyList();
+ if (!propertyList.isEmpty()) {
writer.keyword("WITH");
SqlWriter.Frame withFrame = writer.startList("(", ")");
for (SqlNode property : propertyList) {
@@ -268,6 +207,6 @@ public class SqlReplaceTableAs extends SqlCreate implements ExtendedSqlNode {
}
public String[] fullTableName() {
- return tableName.names.toArray(new String[0]);
+ return getTableName().names.toArray(new String[0]);
}
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java
deleted file mode 100644
index 56129869fc8..00000000000
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java
+++ /dev/null
@@ -1,359 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.operations;
-
-import org.apache.flink.sql.parser.ddl.SqlCreateTable;
-import org.apache.flink.sql.parser.ddl.SqlCreateTableAs;
-import org.apache.flink.sql.parser.ddl.SqlCreateTableLike;
-import org.apache.flink.sql.parser.ddl.SqlTableLike;
-import org.apache.flink.sql.parser.ddl.SqlTableOption;
-import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;
-import org.apache.flink.table.api.Schema;
-import org.apache.flink.table.api.TableException;
-import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.catalog.CatalogManager;
-import org.apache.flink.table.catalog.CatalogTable;
-import org.apache.flink.table.catalog.ContextResolvedTable;
-import org.apache.flink.table.catalog.ObjectIdentifier;
-import org.apache.flink.table.catalog.ResolvedCatalogTable;
-import org.apache.flink.table.catalog.ResolvedSchema;
-import org.apache.flink.table.catalog.TableDistribution;
-import org.apache.flink.table.catalog.UnresolvedIdentifier;
-import org.apache.flink.table.operations.CreateTableASOperation;
-import org.apache.flink.table.operations.Operation;
-import org.apache.flink.table.operations.ddl.CreateTableOperation;
-import org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator;
-import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
-import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
-import org.apache.flink.table.planner.calcite.SqlRewriterUtils;
-import org.apache.flink.table.planner.utils.OperationConverterUtils;
-
-import org.apache.calcite.sql.SqlIdentifier;
-import org.apache.calcite.sql.SqlNode;
-import org.apache.calcite.sql.SqlNodeList;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
-/** Helper class for converting {@link SqlCreateTable} to {@link CreateTableOperation}. */
-class SqlCreateTableConverter {
-
- private final MergeTableLikeUtil mergeTableLikeUtil;
- private final MergeTableAsUtil mergeTableAsUtil;
- private final CatalogManager catalogManager;
- private final FlinkTypeFactory typeFactory;
- private final SqlRewriterUtils rewriterUtils;
-
- SqlCreateTableConverter(
- FlinkCalciteSqlValidator sqlValidator,
- CatalogManager catalogManager,
- Function<SqlNode, String> escapeExpression) {
- this.mergeTableLikeUtil =
- new MergeTableLikeUtil(
- sqlValidator, escapeExpression, catalogManager.getDataTypeFactory());
- this.mergeTableAsUtil =
- new MergeTableAsUtil(
- sqlValidator, escapeExpression, catalogManager.getDataTypeFactory());
- this.catalogManager = catalogManager;
- this.typeFactory = (FlinkTypeFactory) sqlValidator.getTypeFactory();
- this.rewriterUtils = new SqlRewriterUtils(sqlValidator);
- }
-
- /** Convert the {@link SqlCreateTable} node. */
- Operation convertCreateTable(SqlCreateTable sqlCreateTable) {
- ResolvedCatalogTable catalogTable = createCatalogTable(sqlCreateTable);
-
- UnresolvedIdentifier unresolvedIdentifier =
- UnresolvedIdentifier.of(sqlCreateTable.fullTableName());
- ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier);
-
- return new CreateTableOperation(
- identifier,
- catalogTable,
- sqlCreateTable.isIfNotExists(),
- sqlCreateTable.isTemporary());
- }
-
- /** Convert the {@link SqlCreateTableAs} node. */
- Operation convertCreateTableAS(
- FlinkPlannerImpl flinkPlanner, SqlCreateTableAs sqlCreateTableAs) {
- UnresolvedIdentifier unresolvedIdentifier =
- UnresolvedIdentifier.of(sqlCreateTableAs.fullTableName());
- ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier);
-
- SqlNode asQuerySqlNode = sqlCreateTableAs.getAsQuery();
- SqlNode validatedAsQuery = flinkPlanner.validate(asQuerySqlNode);
-
- PlannerQueryOperation query =
- (PlannerQueryOperation)
- SqlNodeToOperationConversion.convert(
- flinkPlanner, catalogManager, validatedAsQuery)
- .orElseThrow(
- () ->
- new TableException(
- "CTAS unsupported node type "
- + validatedAsQuery
- .getClass()
- .getSimpleName()));
- ResolvedCatalogTable tableWithResolvedSchema =
- createCatalogTable(sqlCreateTableAs, query.getResolvedSchema());
-
- // If needed, rewrite the query to include the new sink fields in the select list
- query =
- mergeTableAsUtil.maybeRewriteQuery(
- catalogManager,
- flinkPlanner,
- query,
- validatedAsQuery,
- tableWithResolvedSchema);
-
- CreateTableOperation createTableOperation =
- new CreateTableOperation(
- identifier,
- tableWithResolvedSchema,
- sqlCreateTableAs.isIfNotExists(),
- sqlCreateTableAs.isTemporary());
-
- return new CreateTableASOperation(
- createTableOperation, Collections.emptyMap(), query, false);
- }
-
- private ResolvedCatalogTable createCatalogTable(
- SqlCreateTableAs sqlCreateTableAs, ResolvedSchema querySchema) {
- Map<String, String> tableOptions =
- sqlCreateTableAs.getPropertyList().getList().stream()
- .collect(
- Collectors.toMap(
- p -> ((SqlTableOption) p).getKeyString(),
- p -> ((SqlTableOption) p).getValueString()));
-
- String tableComment =
- OperationConverterUtils.getTableComment(sqlCreateTableAs.getComment());
-
- Schema mergedSchema;
- if (sqlCreateTableAs.isSchemaWithColumnsIdentifiersOnly()) {
- // If only column identifiers are provided, then these are used to
- // order the columns in the schema.
- mergedSchema =
- mergeTableAsUtil.reorderSchema(sqlCreateTableAs.getColumnList(), querySchema);
- } else {
- mergedSchema =
- mergeTableAsUtil.mergeSchemas(
- sqlCreateTableAs.getColumnList(),
- sqlCreateTableAs.getWatermark().orElse(null),
- sqlCreateTableAs.getFullConstraints(),
- querySchema);
- }
-
- Optional<TableDistribution> tableDistribution =
- Optional.ofNullable(sqlCreateTableAs.getDistribution())
- .map(OperationConverterUtils::getDistributionFromSqlDistribution);
-
- List<String> partitionKeys =
- getPartitionKeyColumnNames(sqlCreateTableAs.getPartitionKeyList());
- verifyPartitioningColumnsExist(mergedSchema, partitionKeys);
-
- CatalogTable catalogTable =
- CatalogTable.newBuilder()
- .schema(mergedSchema)
- .comment(tableComment)
- .distribution(tableDistribution.orElse(null))
- .options(tableOptions)
- .partitionKeys(partitionKeys)
- .build();
-
- return catalogManager.resolveCatalogTable(catalogTable);
- }
-
- private ResolvedCatalogTable createCatalogTable(SqlCreateTable sqlCreateTable) {
-
- final Schema sourceTableSchema;
- final Optional<TableDistribution> sourceTableDistribution;
- final List<String> sourcePartitionKeys;
- final List<SqlTableLike.SqlTableLikeOption> likeOptions;
- final Map<String, String> sourceProperties;
- if (sqlCreateTable instanceof SqlCreateTableLike) {
- SqlTableLike sqlTableLike = ((SqlCreateTableLike) sqlCreateTable).getTableLike();
- CatalogTable table = lookupLikeSourceTable(sqlTableLike);
- sourceTableSchema = table.getUnresolvedSchema();
- sourceTableDistribution = table.getDistribution();
- sourcePartitionKeys = table.getPartitionKeys();
- likeOptions = sqlTableLike.getOptions();
- sourceProperties = table.getOptions();
- } else {
- sourceTableSchema = Schema.newBuilder().build();
- sourceTableDistribution = Optional.empty();
- sourcePartitionKeys = Collections.emptyList();
- likeOptions = Collections.emptyList();
- sourceProperties = Collections.emptyMap();
- }
-
- Map<SqlTableLike.FeatureOption, SqlTableLike.MergingStrategy> mergingStrategies =
- mergeTableLikeUtil.computeMergingStrategies(likeOptions);
-
- Map<String, String> mergedOptions =
- mergeOptions(sqlCreateTable, sourceProperties, mergingStrategies);
-
- // It is assumed only a primary key constraint may be defined in the table. The
- // SqlCreateTableAs has validations to ensure this before the object is created.
- Optional<SqlTableConstraint> primaryKey =
- sqlCreateTable.getFullConstraints().stream()
- .filter(SqlTableConstraint::isPrimaryKey)
- .findAny();
-
- Schema mergedSchema =
- mergeTableLikeUtil.mergeTables(
- mergingStrategies,
- sourceTableSchema,
- sqlCreateTable.getColumnList().getList(),
- sqlCreateTable
- .getWatermark()
- .map(Collections::singletonList)
- .orElseGet(Collections::emptyList),
- primaryKey.orElse(null));
-
- Optional<TableDistribution> mergedTableDistribution =
- mergeDistribution(sourceTableDistribution, sqlCreateTable, mergingStrategies);
-
- List<String> partitionKeys =
- mergePartitions(
- sourcePartitionKeys,
- sqlCreateTable.getPartitionKeyList(),
- mergingStrategies);
- verifyPartitioningColumnsExist(mergedSchema, partitionKeys);
-
- String tableComment = OperationConverterUtils.getTableComment(sqlCreateTable.getComment());
-
- CatalogTable catalogTable =
- CatalogTable.newBuilder()
- .schema(mergedSchema)
- .comment(tableComment)
- .distribution(mergedTableDistribution.orElse(null))
- .options(new HashMap<>(mergedOptions))
- .partitionKeys(partitionKeys)
- .build();
-
- return catalogManager.resolveCatalogTable(catalogTable);
- }
-
- private CatalogTable lookupLikeSourceTable(SqlTableLike sqlTableLike) {
- UnresolvedIdentifier unresolvedIdentifier =
- UnresolvedIdentifier.of(sqlTableLike.getSourceTable().names);
- ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier);
- ContextResolvedTable lookupResult =
- catalogManager
- .getTable(identifier)
- .orElseThrow(
- () ->
- new ValidationException(
- String.format(
- "Source table '%s' of
the LIKE clause not found in the catalog, at %s",
- identifier,
- sqlTableLike
-
.getSourceTable()
-
.getParserPosition())));
- if (!(lookupResult.getResolvedTable() instanceof CatalogTable)) {
- throw new ValidationException(
- String.format(
- "Source table '%s' of the LIKE clause can not be a
VIEW, at %s",
- identifier,
sqlTableLike.getSourceTable().getParserPosition()));
- }
- return lookupResult.getResolvedTable();
- }
-
- private void verifyPartitioningColumnsExist(Schema mergedSchema, List<String> partitionKeys) {
- Set<String> columnNames =
- mergedSchema.getColumns().stream()
- .map(Schema.UnresolvedColumn::getName)
- .collect(Collectors.toCollection(LinkedHashSet::new));
- for (String partitionKey : partitionKeys) {
- if (!columnNames.contains(partitionKey)) {
- throw new ValidationException(
- String.format(
- "Partition column '%s' not defined in the
table schema. Available columns: [%s]",
- partitionKey,
- columnNames.stream()
- .collect(Collectors.joining("', '",
"'", "'"))));
- }
- }
- }
-
- private Optional<TableDistribution> mergeDistribution(
- Optional<TableDistribution> sourceTableDistribution,
- SqlCreateTable sqlCreateTable,
- Map<SqlTableLike.FeatureOption, SqlTableLike.MergingStrategy> mergingStrategies) {
-
- Optional<TableDistribution> derivedTabledDistribution = Optional.empty();
- if (sqlCreateTable.getDistribution() != null) {
- TableDistribution distribution =
- OperationConverterUtils.getDistributionFromSqlDistribution(
- sqlCreateTable.getDistribution());
- derivedTabledDistribution = Optional.of(distribution);
- }
-
- return mergeTableLikeUtil.mergeDistribution(
- mergingStrategies.get(SqlTableLike.FeatureOption.DISTRIBUTION),
- sourceTableDistribution,
- derivedTabledDistribution);
- }
-
- private List<String> getPartitionKeyColumnNames(SqlNodeList partitionKey) {
- return partitionKey.getList().stream()
- .map(p -> ((SqlIdentifier) p).getSimple())
- .collect(Collectors.toList());
- }
-
- private List<String> mergePartitions(
- List<String> sourcePartitionKeys,
- SqlNodeList derivedPartitionKeys,
- Map<SqlTableLike.FeatureOption, SqlTableLike.MergingStrategy> mergingStrategies) {
- // set partition key
- return mergeTableLikeUtil.mergePartitions(
- mergingStrategies.get(SqlTableLike.FeatureOption.PARTITIONS),
- sourcePartitionKeys,
- getPartitionKeyColumnNames(derivedPartitionKeys));
- }
-
- private Map<String, String> mergeOptions(
- SqlCreateTable sqlCreateTable,
- Map<String, String> sourceProperties,
- Map<SqlTableLike.FeatureOption, SqlTableLike.MergingStrategy> mergingStrategies) {
- // set with properties
- Map<String, String> properties = new HashMap<>();
- sqlCreateTable
- .getPropertyList()
- .getList()
- .forEach(
- p ->
- properties.put(
- ((SqlTableOption) p).getKeyString(),
- ((SqlTableOption) p).getValueString()));
- return mergeTableLikeUtil.mergeOptions(
- mergingStrategies.get(SqlTableLike.FeatureOption.OPTIONS),
- sourceProperties,
- properties);
- }
-}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java
index a4563a2671c..8208bd2a666 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java
@@ -36,7 +36,6 @@ import org.apache.flink.sql.parser.ddl.SqlAnalyzeTable;
import org.apache.flink.sql.parser.ddl.SqlCompilePlan;
import org.apache.flink.sql.parser.ddl.SqlCreateDatabase;
import org.apache.flink.sql.parser.ddl.SqlCreateFunction;
-import org.apache.flink.sql.parser.ddl.SqlCreateTable;
import org.apache.flink.sql.parser.ddl.SqlCreateTableAs;
import org.apache.flink.sql.parser.ddl.SqlDropCatalog;
import org.apache.flink.sql.parser.ddl.SqlDropDatabase;
@@ -221,7 +220,6 @@ import java.util.stream.Collectors;
public class SqlNodeToOperationConversion {
private final FlinkPlannerImpl flinkPlanner;
private final CatalogManager catalogManager;
- private final SqlCreateTableConverter createTableConverter;
private final AlterSchemaConverter alterSchemaConverter;
// ~ Constructors -----------------------------------------------------------
@@ -230,11 +228,6 @@ public class SqlNodeToOperationConversion {
FlinkPlannerImpl flinkPlanner, CatalogManager catalogManager) {
this.flinkPlanner = flinkPlanner;
this.catalogManager = catalogManager;
- this.createTableConverter =
- new SqlCreateTableConverter(
- flinkPlanner.getOrCreateSqlValidator(),
- catalogManager,
- this::getQuotedSqlString);
this.alterSchemaConverter =
new AlterSchemaConverter(
flinkPlanner.getOrCreateSqlValidator(),
@@ -299,14 +292,6 @@ public class SqlNodeToOperationConversion {
converter.convertShowCurrentDatabase((SqlShowCurrentDatabase) validated));
} else if (validated instanceof SqlUseDatabase) {
return Optional.of(converter.convertUseDatabase((SqlUseDatabase) validated));
- } else if (validated instanceof SqlCreateTable) {
- if (validated instanceof SqlCreateTableAs) {
- return Optional.of(
- converter.createTableConverter.convertCreateTableAS(
- flinkPlanner, (SqlCreateTableAs) validated));
- }
- return Optional.of(
- converter.createTableConverter.convertCreateTable((SqlCreateTable) validated));
} else if (validated instanceof SqlDropTable) {
return Optional.of(converter.convertDropTable((SqlDropTable) validated));
} else if (validated instanceof SqlAlterTable) {
@@ -488,7 +473,7 @@ public class SqlNodeToOperationConversion {
tableIdentifier)));
Map<String, String> newProps = new HashMap<>(catalogPartition.getProperties());
newProps.putAll(
- OperationConverterUtils.extractProperties(alterTableOptions.getPropertyList()));
+ OperationConverterUtils.getProperties(alterTableOptions.getPropertyList()));
return new AlterPartitionPropertiesOperation(
tableIdentifier,
partitionSpec,
@@ -496,7 +481,7 @@ public class SqlNodeToOperationConversion {
} else {
// it's altering a table
Map<String, String> changeOptions =
- OperationConverterUtils.extractProperties(alterTableOptions.getPropertyList());
+ OperationConverterUtils.getProperties(alterTableOptions.getPropertyList());
Map<String, String> newOptions = new HashMap<>(oldTable.getOptions());
newOptions.putAll(changeOptions);
return new AlterTableChangeOperation(
@@ -786,15 +771,8 @@ public class SqlNodeToOperationConversion {
.map(comment -> comment.getValueAs(NlsString.class).getValue())
.orElse(null);
// set with properties
- Map<String, String> properties = new HashMap<>();
- sqlCreateDatabase
- .getPropertyList()
- .getList()
- .forEach(
- p ->
- properties.put(
- ((SqlTableOption) p).getKeyString(),
- ((SqlTableOption) p).getValueString()));
+ final Map<String, String> properties =
+ OperationConverterUtils.getProperties(sqlCreateDatabase.getPropertyList());
CatalogDatabase catalogDatabase = new CatalogDatabaseImpl(properties, databaseComment);
return new CreateDatabaseOperation(
catalogName, databaseName, catalogDatabase, ignoreIfExists);
@@ -846,14 +824,8 @@ public class SqlNodeToOperationConversion {
throw new ValidationException(String.format("Catalog %s not
exists", catalogName));
}
// set with properties
- sqlAlterDatabase
- .getPropertyList()
- .getList()
- .forEach(
- p ->
- properties.put(
- ((SqlTableOption) p).getKeyString(),
- ((SqlTableOption) p).getValueString()));
+ properties.putAll(
+ OperationConverterUtils.getProperties(sqlAlterDatabase.getPropertyList()));
CatalogDatabase catalogDatabase =
new CatalogDatabaseImpl(properties, originCatalogDatabase.getComment());
return new AlterDatabaseOperation(catalogName, databaseName, catalogDatabase);
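
Several hunks above (and more below) replace hand-rolled forEach loops over SqlTableOption with a shared OperationConverterUtils.getProperties(...) helper. Its body is not shown in this diff; a plausible equivalent of what it does, with simplified stand-in types instead of the Calcite SqlNodeList/SqlTableOption classes:

    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    final class GetPropertiesSketch {
        // Simplified stand-in for org.apache.flink.sql.parser.ddl.SqlTableOption.
        record SqlTableOption(String getKeyString, String getValueString) {}

        // Plausible equivalent of OperationConverterUtils.getProperties(...):
        // collect each option's key/value pair into a map.
        static Map<String, String> getProperties(List<SqlTableOption> propertyList) {
            Map<String, String> properties = new LinkedHashMap<>();
            for (SqlTableOption option : propertyList) {
                properties.put(option.getKeyString(), option.getValueString());
            }
            return properties;
        }

        public static void main(String[] args) {
            System.out.println(
                    getProperties(
                            List.of(
                                    new SqlTableOption("connector", "datagen"),
                                    new SqlTableOption("rows-per-second", "5"))));
            // prints {connector=datagen, rows-per-second=5}
        }
    }
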
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/AbstractCreateTableConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/AbstractCreateTableConverter.java
new file mode 100644
index 00000000000..f83062103dd
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/AbstractCreateTableConverter.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.operations.converters;
+
+import org.apache.flink.sql.parser.ddl.SqlCreateTable;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.TableDistribution;
+import org.apache.flink.table.catalog.UnresolvedIdentifier;
+import org.apache.flink.table.operations.ddl.CreateTableOperation;
+import org.apache.flink.table.planner.utils.OperationConverterUtils;
+
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Abstract class for converting {@link SqlCreateTable} and its children to create table
+ * operations.
+ */
+public abstract class AbstractCreateTableConverter<T extends SqlCreateTable>
+ implements SqlNodeConverter<T> {
+
+ /** Context of create table converters while merging source and derived items. */
+ protected interface MergeContext {
+ Schema getMergedSchema(ResolvedSchema schemaToMerge);
+
+ Map<String, String> getMergedTableOptions();
+
+ List<String> getMergedPartitionKeys();
+
+ Optional<TableDistribution> getMergedTableDistribution();
+ }
+
+ protected abstract MergeContext getMergeContext(T sqlCreateTable, ConvertContext context);
+
+ protected final Optional<TableDistribution> getDerivedTableDistribution(T sqlCreateTable) {
+ return Optional.ofNullable(sqlCreateTable.getDistribution())
+ .map(OperationConverterUtils::getDistributionFromSqlDistribution);
+ }
+
+ protected final List<String> getDerivedPartitionKeys(T sqlCreateTable) {
+ return OperationConverterUtils.getColumnNames(sqlCreateTable.getPartitionKeyList());
+ }
+
+ protected final Map<String, String> getDerivedTableOptions(T sqlCreateTable) {
+ return OperationConverterUtils.getProperties(sqlCreateTable.getPropertyList());
+ }
+
+ protected final String getComment(T sqlCreateTable) {
+ return OperationConverterUtils.getComment(sqlCreateTable.getComment());
+ }
+
+ protected final ResolvedCatalogTable getResolvedCatalogTable(
+ T sqlCreateTable, ConvertContext context, ResolvedSchema schemaToMerge) {
+ final MergeContext mergeContext = getMergeContext(sqlCreateTable, context);
+ final List<String> partitionKeys = mergeContext.getMergedPartitionKeys();
+ final Schema schema = mergeContext.getMergedSchema(schemaToMerge);
+ verifyPartitioningColumnsExist(schema, partitionKeys);
+
+ final Map<String, String> tableOptions = mergeContext.getMergedTableOptions();
+ final TableDistribution distribution =
+ mergeContext.getMergedTableDistribution().orElse(null);
+ final String comment = getComment(sqlCreateTable);
+ final CatalogTable catalogTable =
+ CatalogTable.newBuilder()
+ .schema(schema)
+ .comment(comment)
+ .distribution(distribution)
+ .options(tableOptions)
+ .partitionKeys(partitionKeys)
+ .build();
+ return context.getCatalogManager().resolveCatalogTable(catalogTable);
+ }
+
+ protected final ObjectIdentifier getIdentifier(SqlCreateTable node, ConvertContext context) {
+ UnresolvedIdentifier unresolvedIdentifier = UnresolvedIdentifier.of(node.fullTableName());
+ return context.getCatalogManager().qualifyIdentifier(unresolvedIdentifier);
+ }
+
+ protected final CreateTableOperation getCreateTableOperation(
+ ObjectIdentifier identifier,
+ ResolvedCatalogTable tableWithResolvedSchema,
+ T sqlCreateTable) {
+ return new CreateTableOperation(
+ identifier,
+ tableWithResolvedSchema,
+ sqlCreateTable.isIfNotExists(),
+ sqlCreateTable.isTemporary());
+ }
+
+ private void verifyPartitioningColumnsExist(Schema schema, List<String> partitionKeys) {
+ final Set<String> columnNames =
+ schema.getColumns().stream()
+ .map(Schema.UnresolvedColumn::getName)
+ .collect(Collectors.toCollection(LinkedHashSet::new));
+ for (String partitionKey : partitionKeys) {
+ if (!columnNames.contains(partitionKey)) {
+ throw new ValidationException(
+ String.format(
+ "Partition column '%s' not defined in the
table schema. Available columns: [%s]",
+ partitionKey,
+ columnNames.stream()
+ .collect(Collectors.joining("', '",
"'", "'"))));
+ }
+ }
+ }
+}
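
AbstractCreateTableConverter above is a template method: the base class assembles, validates, and resolves the CatalogTable, while each subclass contributes only a MergeContext. A self-contained toy model of that split, with all types as stand-ins for the real classes:

    import java.util.List;
    import java.util.Map;

    final class MergeContextSketch {
        // Mirrors the MergeContext contract above, minus schema/distribution.
        interface MergeContext {
            Map<String, String> getMergedTableOptions();
            List<String> getMergedPartitionKeys();
        }

        // The base class drives assembly; subclasses only decide how to merge.
        abstract static class AbstractConverter<T> {
            protected abstract MergeContext getMergeContext(T statement);

            final String convert(T statement) {
                MergeContext ctx = getMergeContext(statement);
                // the real base class also merges the schema, resolves the table and
                // verifies that every partition key exists as a column
                return "CreateTable(options=" + ctx.getMergedTableOptions()
                        + ", partitionKeys=" + ctx.getMergedPartitionKeys() + ")";
            }
        }

        // For a plain CREATE TABLE, derived items plausibly pass through unchanged;
        // CTAS and LIKE variants merge them with query or source-table metadata.
        static final class PlainCreateTableConverter extends AbstractConverter<Map<String, String>> {
            @Override
            protected MergeContext getMergeContext(Map<String, String> derivedOptions) {
                return new MergeContext() {
                    @Override
                    public Map<String, String> getMergedTableOptions() {
                        return derivedOptions;
                    }

                    @Override
                    public List<String> getMergedPartitionKeys() {
                        return List.of("dt");
                    }
                };
            }
        }

        public static void main(String[] args) {
            System.out.println(
                    new PlainCreateTableConverter().convert(Map.of("connector", "filesystem")));
        }
    }
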
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/MergeTableAsUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/MergeTableAsUtil.java
similarity index 96%
rename from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/MergeTableAsUtil.java
rename to flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/MergeTableAsUtil.java
index 3509b62cb1a..cdecc263a6a 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/MergeTableAsUtil.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/MergeTableAsUtil.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.table.planner.operations;
+package org.apache.flink.table.planner.operations.converters;
import org.apache.flink.sql.parser.ddl.SqlTableColumn;
import org.apache.flink.sql.parser.ddl.SqlTableColumn.SqlComputedColumn;
@@ -38,6 +38,9 @@ import org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator;
import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.planner.calcite.SqlRewriterUtils;
+import org.apache.flink.table.planner.operations.PlannerQueryOperation;
+import org.apache.flink.table.planner.operations.SqlNodeToOperationConversion;
+import org.apache.flink.table.planner.operations.converters.SqlNodeConverter.ConvertContext;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
@@ -79,6 +82,13 @@ public class MergeTableAsUtil {
this.dataTypeFactory = dataTypeFactory;
}
+ public MergeTableAsUtil(ConvertContext context) {
+ this(
+ context.getSqlValidator(),
+ context::toQuotedSqlString,
+ context.getCatalogManager().getDataTypeFactory());
+ }
+
/**
* Rewrites the query operation to include only the fields that may be persisted in the sink.
*/
@@ -204,9 +214,7 @@ public class MergeTableAsUtil {
Optional<SqlTableConstraint> primaryKey =
sqlTableConstraints.stream().filter(SqlTableConstraint::isPrimaryKey).findAny();
- if (primaryKey.isPresent()) {
- schemaBuilder.setPrimaryKey(primaryKey.get());
- }
+ primaryKey.ifPresent(schemaBuilder::setPrimaryKey);
return schemaBuilder.build();
}
@@ -233,9 +241,9 @@ public class MergeTableAsUtil {
*/
private static class SchemaBuilder extends SchemaBuilderUtil {
// Mapping required when evaluating compute expressions and watermark columns.
- private Map<String, RelDataType> regularAndMetadataFieldNamesToTypes =
+ private final Map<String, RelDataType> regularAndMetadataFieldNamesToTypes =
new LinkedHashMap<>();
- private Map<String, RelDataType> computeFieldNamesToTypes = new LinkedHashMap<>();
+ private final Map<String, RelDataType> computeFieldNamesToTypes = new LinkedHashMap<>();
FlinkTypeFactory typeFactory;
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/MergeTableLikeUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/MergeTableLikeUtil.java
similarity index 98%
rename from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/MergeTableLikeUtil.java
rename to flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/MergeTableLikeUtil.java
index 3dc43b36cfe..9c2958f4b0a 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/MergeTableLikeUtil.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/MergeTableLikeUtil.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.table.planner.operations;
+package org.apache.flink.table.planner.operations.converters;
import org.apache.flink.sql.parser.ddl.SqlCreateTable;
import org.apache.flink.sql.parser.ddl.SqlTableColumn;
@@ -83,6 +83,13 @@ class MergeTableLikeUtil {
this.dataTypeFactory = dataTypeFactory;
}
+ MergeTableLikeUtil(SqlNodeConverter.ConvertContext context) {
+ this(
+ context.getSqlValidator(),
+ context::toQuotedSqlString,
+ context.getCatalogManager().getDataTypeFactory());
+ }
+
/**
* Calculates merging strategies for all options. It applies options given by a user to the
* {@link #defaultMergingStrategies}. The {@link MergingStrategy} specified for {@link
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SchemaBuilderUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SchemaBuilderUtil.java
similarity index 97%
rename from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SchemaBuilderUtil.java
rename to flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SchemaBuilderUtil.java
index 61e94edcbb9..0af303a404b 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SchemaBuilderUtil.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SchemaBuilderUtil.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.table.planner.operations;
+package org.apache.flink.table.planner.operations.converters;
import org.apache.flink.sql.parser.ddl.SqlTableColumn;
import org.apache.flink.sql.parser.ddl.SqlTableColumn.SqlComputedColumn;
@@ -243,10 +243,7 @@ public class SchemaBuilderUtil {
/** Converts a {@link SqlTableConstraint} to an {@link UnresolvedPrimaryKey} object. */
public UnresolvedPrimaryKey toUnresolvedPrimaryKey(SqlTableConstraint primaryKey) {
- List<String> columnNames =
- primaryKey.getColumns().getList().stream()
- .map(n -> ((SqlIdentifier) n).getSimple())
- .collect(Collectors.toList());
+ List<String> columnNames = List.of(primaryKey.getColumnNames());
String constraintName =
primaryKey
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterMaterializedTableResumeConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterMaterializedTableResumeConverter.java
index afb7f272e7b..a395dd20929 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterMaterializedTableResumeConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterMaterializedTableResumeConverter.java
@@ -19,13 +19,12 @@
package org.apache.flink.table.planner.operations.converters;
import org.apache.flink.sql.parser.ddl.SqlAlterMaterializedTableResume;
-import org.apache.flink.sql.parser.ddl.SqlTableOption;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.UnresolvedIdentifier;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableResumeOperation;
+import org.apache.flink.table.planner.utils.OperationConverterUtils;
-import java.util.HashMap;
import java.util.Map;
/** A converter for {@link SqlAlterMaterializedTableResume}. */
@@ -41,15 +40,9 @@ public class SqlAlterMaterializedTableResumeConverter
context.getCatalogManager().qualifyIdentifier(unresolvedIdentifier);
// get table options
- Map<String, String> options = new HashMap<>();
- sqlAlterMaterializedTableResume
- .getPropertyList()
- .getList()
- .forEach(
- p ->
- options.put(
- ((SqlTableOption) p).getKeyString(),
- ((SqlTableOption) p).getValueString()));
+ final Map<String, String> options =
+ OperationConverterUtils.getProperties(
+ sqlAlterMaterializedTableResume.getPropertyList());
return new AlterMaterializedTableResumeOperation(identifier, options);
}
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterModelSetConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterModelSetConverter.java
index 742f9573b57..8c6f465faef 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterModelSetConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterModelSetConverter.java
@@ -44,7 +44,7 @@ public class SqlAlterModelSetConverter extends AbstractSqlAlterModelConverter<Sq
sqlAlterModelSet.ifModelExists());
Map<String, String> changeModelOptions =
- OperationConverterUtils.extractProperties(sqlAlterModelSet.getOptionList());
+ OperationConverterUtils.getProperties(sqlAlterModelSet.getOptionList());
if (changeModelOptions.isEmpty()) {
throw new ValidationException("ALTER MODEL SET does not support empty option.");
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterTableAddPartitionConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterTableAddPartitionConverter.java
index 2e30060bce1..7a4d8fa9932 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterTableAddPartitionConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterTableAddPartitionConverter.java
@@ -46,8 +46,7 @@ public class SqlAlterTableAddPartitionConverter implements SqlNodeConverter<SqlA
for (int i = 0; i < sqlAddPartitions.getPartSpecs().size(); i++) {
specs.add(new CatalogPartitionSpec(sqlAddPartitions.getPartitionKVs(i)));
Map<String, String> props =
- OperationConverterUtils.extractProperties(
- sqlAddPartitions.getPartProps().get(i));
+ OperationConverterUtils.getProperties(sqlAddPartitions.getPartProps().get(i));
partitions.add(new CatalogPartitionImpl(props, null));
}
return new AddPartitionsOperation(
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterViewPropertiesConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterViewPropertiesConverter.java
index 8d57143e0a9..6d1f8520038 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterViewPropertiesConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterViewPropertiesConverter.java
@@ -41,7 +41,7 @@ public class SqlAlterViewPropertiesConverter implements SqlNodeConverter<SqlAlte
context.getCatalogManager()
.qualifyIdentifier(UnresolvedIdentifier.of(alterView.fullViewName()));
Map<String, String> newOptions = new HashMap<>(oldView.getOptions());
- newOptions.putAll(OperationConverterUtils.extractProperties(alterView.getPropertyList()));
+ newOptions.putAll(OperationConverterUtils.getProperties(alterView.getPropertyList()));
CatalogView newView =
CatalogView.of(
oldView.getUnresolvedSchema(),
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateCatalogConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateCatalogConverter.java
index ea65f8f4ed8..d02c56b57a6 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateCatalogConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateCatalogConverter.java
@@ -19,14 +19,13 @@
package org.apache.flink.table.planner.operations.converters;
import org.apache.flink.sql.parser.ddl.SqlCreateCatalog;
-import org.apache.flink.sql.parser.ddl.SqlTableOption;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.ddl.CreateCatalogOperation;
+import org.apache.flink.table.planner.utils.OperationConverterUtils;
import org.apache.calcite.sql.SqlCharStringLiteral;
import org.apache.calcite.util.NlsString;
-import java.util.HashMap;
import java.util.Map;
/** A converter for {@link SqlCreateCatalog}. */
@@ -35,14 +34,8 @@ public class SqlCreateCatalogConverter implements SqlNodeConverter<SqlCreateCata
@Override
public Operation convertSqlNode(SqlCreateCatalog node, ConvertContext context) {
// set with properties
- Map<String, String> properties = new HashMap<>();
- node.getPropertyList()
- .getList()
- .forEach(
- p ->
- properties.put(
- ((SqlTableOption) p).getKeyString(),
- ((SqlTableOption) p).getValueString()));
+ final Map<String, String> properties =
+ OperationConverterUtils.getProperties(node.getPropertyList());
return new CreateCatalogOperation(
node.catalogName(),
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateMaterializedTableConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateMaterializedTableConverter.java
index f474e9fdf36..d836bfdcfdc 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateMaterializedTableConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateMaterializedTableConverter.java
@@ -21,7 +21,6 @@ package org.apache.flink.table.planner.operations.converters;
import org.apache.flink.sql.parser.SqlConstraintValidator;
import org.apache.flink.sql.parser.ddl.SqlCreateMaterializedTable;
import org.apache.flink.sql.parser.ddl.SqlRefreshMode;
-import org.apache.flink.sql.parser.ddl.SqlTableOption;
import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;
import org.apache.flink.sql.parser.error.SqlValidateException;
import org.apache.flink.table.api.Schema;
@@ -41,11 +40,9 @@ import org.apache.flink.table.planner.utils.OperationConverterUtils;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeFamily;
-import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlNode;
import java.util.Arrays;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -72,18 +69,11 @@ public class SqlCreateMaterializedTableConverter
// get comment
String tableComment =
- OperationConverterUtils.getTableComment(sqlCreateMaterializedTable.getComment());
+ OperationConverterUtils.getComment(sqlCreateMaterializedTable.getComment());
// get options
- Map<String, String> options = new HashMap<>();
- sqlCreateMaterializedTable
- .getPropertyList()
- .getList()
- .forEach(
- p ->
- options.put(
- ((SqlTableOption) p).getKeyString(),
- ((SqlTableOption) p).getValueString()));
+ final Map<String, String> tableOptions =
+ OperationConverterUtils.getProperties(sqlCreateMaterializedTable.getPropertyList());
// get freshness
IntervalFreshness intervalFreshness =
@@ -138,13 +128,12 @@ public class SqlCreateMaterializedTableConverter
// get and verify partition key
List<String> partitionKeys =
- sqlCreateMaterializedTable.getPartitionKeyList().getList().stream()
- .map(p -> ((SqlIdentifier) p).getSimple())
- .collect(Collectors.toList());
+ OperationConverterUtils.getColumnNames(
+ sqlCreateMaterializedTable.getPartitionKeyList());
verifyPartitioningColumnsExist(
resolvedSchema,
partitionKeys,
- options.keySet().stream()
+ tableOptions.keySet().stream()
.filter(k -> k.startsWith(PARTITION_FIELDS))
.collect(Collectors.toSet()));
@@ -166,7 +155,7 @@ public class SqlCreateMaterializedTableConverter
.comment(tableComment)
.distribution(tableDistribution.orElse(null))
.partitionKeys(partitionKeys)
- .options(options)
+ .options(tableOptions)
.definitionQuery(definitionQuery)
.freshness(intervalFreshness)
.logicalRefreshMode(logicalRefreshMode)
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateModelConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateModelConverter.java
index 8a4523b6527..48d56ccbf00 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateModelConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateModelConverter.java
@@ -29,7 +29,6 @@ import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.UnresolvedIdentifier;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.ddl.CreateModelOperation;
-import org.apache.flink.table.planner.operations.SchemaBuilderUtil;
import org.apache.calcite.sql.SqlLiteral;
import org.apache.calcite.sql.SqlNode;
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateTableAsConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateTableAsConverter.java
new file mode 100644
index 00000000000..7cb06b8a451
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateTableAsConverter.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.operations.converters;
+
+import org.apache.flink.sql.parser.ddl.SqlCreateTableAs;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.TableDistribution;
+import org.apache.flink.table.operations.CreateTableASOperation;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.ddl.CreateTableOperation;
+import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
+import org.apache.flink.table.planner.operations.PlannerQueryOperation;
+import org.apache.flink.table.planner.operations.SqlNodeToOperationConversion;
+
+import org.apache.calcite.sql.SqlNode;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/** Helper class for converting {@link SqlCreateTableAs} to {@link CreateTableASOperation}. */
+public class SqlCreateTableAsConverter extends AbstractCreateTableConverter<SqlCreateTableAs> {
+
+ @Override
+ public Operation convertSqlNode(SqlCreateTableAs sqlCreateTableAs, ConvertContext context) {
+ final FlinkPlannerImpl flinkPlanner = context.getFlinkPlanner();
+ final CatalogManager catalogManager = context.getCatalogManager();
+ SqlNode asQuerySqlNode = sqlCreateTableAs.getAsQuery();
+ SqlNode validatedAsQuery = flinkPlanner.validate(asQuerySqlNode);
+
+ PlannerQueryOperation query =
+ (PlannerQueryOperation)
+ SqlNodeToOperationConversion.convert(
+ flinkPlanner, catalogManager, validatedAsQuery)
+ .orElseThrow(
+ () ->
+ new TableException(
+ "CTAS unsupported node type "
+ + validatedAsQuery
+ .getClass()
+ .getSimpleName()));
+ ResolvedCatalogTable tableWithResolvedSchema =
+ getResolvedCatalogTable(sqlCreateTableAs, context, query.getResolvedSchema());
+
+ // If needed, rewrite the query to include the new sink fields in the select list
+ query =
+ new MergeTableAsUtil(context)
+ .maybeRewriteQuery(
+ catalogManager,
+ flinkPlanner,
+ query,
+ validatedAsQuery,
+ tableWithResolvedSchema);
+
+ ObjectIdentifier identifier = getIdentifier(sqlCreateTableAs, context);
+ CreateTableOperation createTableOperation =
+ getCreateTableOperation(identifier, tableWithResolvedSchema,
sqlCreateTableAs);
+
+ return new CreateTableASOperation(createTableOperation, Map.of(),
query, false);
+ }
+
+ @Override
+ protected MergeContext getMergeContext(
+ SqlCreateTableAs sqlCreateTableAs, ConvertContext context) {
+ return new MergeContext() {
+ private final MergeTableAsUtil mergeTableAsUtil = new
MergeTableAsUtil(context);
+
+ @Override
+ public Schema getMergedSchema(ResolvedSchema schemaToMerge) {
+ if (sqlCreateTableAs.isSchemaWithColumnsIdentifiersOnly()) {
+ // If only column identifiers are provided, then these are
used to
+ // order the columns in the schema.
+ return mergeTableAsUtil.reorderSchema(
+ sqlCreateTableAs.getColumnList(), schemaToMerge);
+ } else {
+ return mergeTableAsUtil.mergeSchemas(
+ sqlCreateTableAs.getColumnList(),
+ sqlCreateTableAs.getWatermark().orElse(null),
+ sqlCreateTableAs.getFullConstraints(),
+ schemaToMerge);
+ }
+ }
+
+ @Override
+ public Map<String, String> getMergedTableOptions() {
+ return
SqlCreateTableAsConverter.this.getDerivedTableOptions(sqlCreateTableAs);
+ }
+
+ @Override
+ public List<String> getMergedPartitionKeys() {
+ return
SqlCreateTableAsConverter.this.getDerivedPartitionKeys(sqlCreateTableAs);
+ }
+
+ @Override
+ public Optional<TableDistribution> getMergedTableDistribution() {
+ return
SqlCreateTableAsConverter.this.getDerivedTableDistribution(sqlCreateTableAs);
+ }
+ };
+ }
+}
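
For orientation, the MergeContext hooks above are consumed by the shared base class. A rough sketch of the idea (simplified, not the committed AbstractCreateTableConverter code; the builder chain mirrors the logic removed from SqlReplaceTableAsConverter further down in this diff):

    // Simplified sketch only -- not the committed implementation. MergeContext
    // is the hook interface implemented by each converter above.
    protected ResolvedCatalogTable getResolvedCatalogTable(
            SqlCreateTable node, ConvertContext context, @Nullable ResolvedSchema schemaToMerge) {
        MergeContext mergeContext = getMergeContext(node, context);
        CatalogTable table =
                CatalogTable.newBuilder()
                        .schema(mergeContext.getMergedSchema(schemaToMerge))
                        .comment(OperationConverterUtils.getComment(node.getComment()))
                        .options(mergeContext.getMergedTableOptions())
                        .partitionKeys(mergeContext.getMergedPartitionKeys())
                        .distribution(mergeContext.getMergedTableDistribution().orElse(null))
                        .build();
        return context.getCatalogManager().resolveCatalogTable(table);
    }
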
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateTableConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateTableConverter.java
new file mode 100644
index 00000000000..1cf4d24ebee
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateTableConverter.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.operations.converters;
+
+import org.apache.flink.sql.parser.ddl.SqlCreateTable;
+import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.TableDistribution;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.ddl.CreateTableOperation;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/** Helper class for converting {@link SqlCreateTable} to {@link CreateTableOperation}. */
+public class SqlCreateTableConverter extends AbstractCreateTableConverter<SqlCreateTable> {
+
+    @Override
+    public Operation convertSqlNode(SqlCreateTable sqlCreateTable, ConvertContext context) {
+        // no schema to merge for CREATE TABLE
+        ResolvedCatalogTable catalogTable = getResolvedCatalogTable(sqlCreateTable, context, null);
+
+        ObjectIdentifier identifier = getIdentifier(sqlCreateTable, context);
+        return getCreateTableOperation(identifier, catalogTable, sqlCreateTable);
+    }
+
+    @Override
+    protected MergeContext getMergeContext(SqlCreateTable sqlCreateTable, ConvertContext context) {
+        return new MergeContext() {
+            private final MergeTableLikeUtil mergeTableLikeUtil = new MergeTableLikeUtil(context);
+
+            @Override
+            public Schema getMergedSchema(ResolvedSchema schemaToMerge) {
+                final Optional<SqlTableConstraint> tableConstraint =
+                        sqlCreateTable.getFullConstraints().stream()
+                                .filter(SqlTableConstraint::isPrimaryKey)
+                                .findAny();
+                return mergeTableLikeUtil.mergeTables(
+                        Map.of(),
+                        Schema.newBuilder().build(),
+                        sqlCreateTable.getColumnList().getList(),
+                        sqlCreateTable
+                                .getWatermark()
+                                .map(Collections::singletonList)
+                                .orElseGet(Collections::emptyList),
+                        tableConstraint.orElse(null));
+            }
+
+            @Override
+            public Map<String, String> getMergedTableOptions() {
+                return SqlCreateTableConverter.this.getDerivedTableOptions(sqlCreateTable);
+            }
+
+            @Override
+            public List<String> getMergedPartitionKeys() {
+                return SqlCreateTableConverter.this.getDerivedPartitionKeys(sqlCreateTable);
+            }
+
+            @Override
+            public Optional<TableDistribution> getMergedTableDistribution() {
+                return SqlCreateTableConverter.this.getDerivedTableDistribution(sqlCreateTable);
+            }
+        };
+    }
+}
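
For a plain CREATE TABLE there is no external schema to merge, so mergeTables starts from an empty Schema and the statement's own column list. The kind of DDL this covers, as a standard Table API usage example (table name and the datagen connector are illustrative only):

    TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
    tEnv.executeSql(
            "CREATE TABLE orders ("
                    + "  order_id BIGINT,"
                    + "  amount DECIMAL(10, 2)"
                    + ") WITH ('connector' = 'datagen')");
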
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateTableLikeConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateTableLikeConverter.java
new file mode 100644
index 00000000000..8e949eb0d14
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateTableLikeConverter.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.operations.converters;
+
+import org.apache.flink.sql.parser.ddl.SqlCreateTableLike;
+import org.apache.flink.sql.parser.ddl.SqlTableLike;
+import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ContextResolvedTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.TableDistribution;
+import org.apache.flink.table.catalog.UnresolvedIdentifier;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.ddl.CreateTableOperation;
+import org.apache.flink.table.planner.utils.OperationConverterUtils;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/** Helper class for converting {@link SqlCreateTableLike} to {@link CreateTableOperation}. */
+public class SqlCreateTableLikeConverter extends AbstractCreateTableConverter<SqlCreateTableLike> {
+
+    @Override
+    public Operation convertSqlNode(SqlCreateTableLike sqlCreateTableLike, ConvertContext context) {
+        ResolvedCatalogTable catalogTable =
+                // no schema definition to merge for CREATE TABLE ... LIKE, schema from source table
+                // will be merged
+                getResolvedCatalogTable(sqlCreateTableLike, context, null);
+        final ObjectIdentifier identifier = getIdentifier(sqlCreateTableLike, context);
+        return getCreateTableOperation(identifier, catalogTable, sqlCreateTableLike);
+    }
+
+    private CatalogTable lookupLikeSourceTable(
+            SqlTableLike sqlTableLike, CatalogManager catalogManager) {
+        UnresolvedIdentifier unresolvedIdentifier =
+                UnresolvedIdentifier.of(sqlTableLike.getSourceTable().names);
+        ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier);
+        ContextResolvedTable lookupResult =
+                catalogManager
+                        .getTable(identifier)
+                        .orElseThrow(
+                                () ->
+                                        new ValidationException(
+                                                String.format(
+                                                        "Source table '%s' of the LIKE clause not found in the catalog, at %s",
+                                                        identifier,
+                                                        sqlTableLike
+                                                                .getSourceTable()
+                                                                .getParserPosition())));
+        if (!(lookupResult.getResolvedTable() instanceof CatalogTable)) {
+            throw new ValidationException(
+                    String.format(
+                            "Source table '%s' of the LIKE clause can not be a VIEW, at %s",
+                            identifier, sqlTableLike.getSourceTable().getParserPosition()));
+        }
+        return lookupResult.getResolvedTable();
+    }
+
+    @Override
+    protected MergeContext getMergeContext(
+            SqlCreateTableLike sqlCreateTableLike, ConvertContext context) {
+        return new MergeContext() {
+            private final MergeTableLikeUtil mergeTableLikeUtil = new MergeTableLikeUtil(context);
+            private final SqlTableLike sqlTableLike = sqlCreateTableLike.getTableLike();
+            private final CatalogTable table =
+                    lookupLikeSourceTable(sqlTableLike, context.getCatalogManager());
+            private final Map<SqlTableLike.FeatureOption, SqlTableLike.MergingStrategy>
+                    mergingStrategies =
+                            mergeTableLikeUtil.computeMergingStrategies(sqlTableLike.getOptions());
+
+            @Override
+            public Schema getMergedSchema(ResolvedSchema schemaToMerge) {
+                final Optional<SqlTableConstraint> tableConstraint =
+                        sqlCreateTableLike.getFullConstraints().stream()
+                                .filter(SqlTableConstraint::isPrimaryKey)
+                                .findAny();
+                return mergeTableLikeUtil.mergeTables(
+                        mergingStrategies,
+                        table.getUnresolvedSchema(),
+                        sqlCreateTableLike.getColumnList().getList(),
+                        sqlCreateTableLike
+                                .getWatermark()
+                                .map(Collections::singletonList)
+                                .orElseGet(Collections::emptyList),
+                        tableConstraint.orElse(null));
+            }
+
+            @Override
+            public Map<String, String> getMergedTableOptions() {
+                final Map<String, String> derivedTableOptions =
+                        OperationConverterUtils.getProperties(sqlCreateTableLike.getPropertyList());
+                return mergeTableLikeUtil.mergeOptions(
+                        mergingStrategies.get(SqlTableLike.FeatureOption.OPTIONS),
+                        table.getOptions(),
+                        derivedTableOptions);
+            }
+
+            @Override
+            public List<String> getMergedPartitionKeys() {
+                return mergeTableLikeUtil.mergePartitions(
+                        mergingStrategies.get(SqlTableLike.FeatureOption.PARTITIONS),
+                        table.getPartitionKeys(),
+                        SqlCreateTableLikeConverter.this.getDerivedPartitionKeys(
+                                sqlCreateTableLike));
+            }
+
+            @Override
+            public Optional<TableDistribution> getMergedTableDistribution() {
+                return mergeTableLikeUtil.mergeDistribution(
+                        mergingStrategies.get(SqlTableLike.FeatureOption.DISTRIBUTION),
+                        table.getDistribution(),
+                        SqlCreateTableLikeConverter.this.getDerivedTableDistribution(
+                                sqlCreateTableLike));
+            }
+        };
+    }
+}
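
The merging strategies computed per SqlTableLike.FeatureOption correspond to the options of the LIKE clause in the DDL. A statement exercising the OPTIONS and PARTITIONS paths above could look like this (table names and the 'topic' option are illustrative):

    tEnv.executeSql(
            "CREATE TABLE derived_orders (note STRING)"
                    + " WITH ('topic' = 'derived_orders')"
                    + " LIKE orders ("
                    + "   OVERWRITING OPTIONS"
                    + "   EXCLUDING PARTITIONS"
                    + " )");
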
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateViewConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateViewConverter.java
index 4e98455c1b1..88455f91e80 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateViewConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateViewConverter.java
@@ -51,8 +51,7 @@ public class SqlCreateViewConverter implements SqlNodeConverter<SqlCreateView> {
                         .map(c -> c.getValueAs(NlsString.class).getValue())
                         .orElse(null);
         Map<String, String> viewOptions =
-                OperationConverterUtils.extractProperties(
-                        sqlCreateView.getProperties().orElse(null));
+                OperationConverterUtils.getProperties(sqlCreateView.getProperties().orElse(null));
         CatalogView catalogView =
                 SqlNodeConvertUtils.toCatalogView(
                         query, viewFields, viewOptions, viewComment, context);
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java
index 76c8f1b1f87..92ac82a8b75 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java
@@ -79,6 +79,9 @@ public class SqlNodeConverters {
         register(new SqlShowCatalogsConverter());
         register(new SqlDescribeFunctionConverter());
         register(new SqlDescribeModelConverter());
+        register(new SqlCreateTableAsConverter());
+        register(new SqlCreateTableConverter());
+        register(new SqlCreateTableLikeConverter());
     }
 
     /**
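
Registration makes the new converters discoverable by the registry's class-keyed dispatch. Conceptually (a hypothetical sketch of the dispatch idea only, not the actual registry code, which derives the key from each converter's generic signature):

    Map<Class<?>, SqlNodeConverter<?>> registry = new HashMap<>();
    registry.put(SqlCreateTableAs.class, new SqlCreateTableAsConverter());
    registry.put(SqlCreateTable.class, new SqlCreateTableConverter());
    registry.put(SqlCreateTableLike.class, new SqlCreateTableLikeConverter());
    // Exact-class lookup keeps the three variants apart even though
    // SqlCreateTableAs and SqlCreateTableLike both extend SqlCreateTable.
    SqlNodeConverter<?> converter = registry.get(validatedSqlNode.getClass());
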
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlReplaceTableAsConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlReplaceTableAsConverter.java
index b2b5d8f1039..12bfa23dbc1 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlReplaceTableAsConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlReplaceTableAsConverter.java
@@ -19,54 +19,32 @@
 package org.apache.flink.table.planner.operations.converters;
 
 import org.apache.flink.sql.parser.ddl.SqlReplaceTableAs;
-import org.apache.flink.sql.parser.ddl.SqlTableOption;
 import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.api.TableException;
-import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.CatalogManager;
-import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ResolvedCatalogTable;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.catalog.TableDistribution;
-import org.apache.flink.table.catalog.UnresolvedIdentifier;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.ReplaceTableAsOperation;
 import org.apache.flink.table.operations.ddl.CreateTableOperation;
 import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
-import org.apache.flink.table.planner.operations.MergeTableAsUtil;
 import org.apache.flink.table.planner.operations.PlannerQueryOperation;
 import org.apache.flink.table.planner.operations.SqlNodeToOperationConversion;
-import org.apache.flink.table.planner.utils.OperationConverterUtils;
 
-import org.apache.calcite.sql.SqlIdentifier;
-import org.apache.calcite.sql.SqlNodeList;
-
-import java.util.HashMap;
-import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Set;
-import java.util.stream.Collectors;
 
 /** A converter for {@link SqlReplaceTableAs}. */
-public class SqlReplaceTableAsConverter implements SqlNodeConverter<SqlReplaceTableAs> {
+public class SqlReplaceTableAsConverter extends AbstractCreateTableConverter<SqlReplaceTableAs> {
 
     @Override
     public Operation convertSqlNode(SqlReplaceTableAs sqlReplaceTableAs, ConvertContext context) {
         CatalogManager catalogManager = context.getCatalogManager();
-        UnresolvedIdentifier unresolvedIdentifier =
-                UnresolvedIdentifier.of(sqlReplaceTableAs.fullTableName());
-        ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier);
         FlinkPlannerImpl flinkPlanner = context.getFlinkPlanner();
-        MergeTableAsUtil mergeTableAsUtil =
-                new MergeTableAsUtil(
-                        context.getSqlValidator(),
-                        sqlNode -> sqlNode.toString(),
-                        catalogManager.getDataTypeFactory());
-
         PlannerQueryOperation query =
                 (PlannerQueryOperation)
                         SqlNodeToOperationConversion.convert(
@@ -84,109 +62,63 @@ public class SqlReplaceTableAsConverter implements SqlNodeConverter<SqlReplaceTa
 
         // get table
         ResolvedCatalogTable tableWithResolvedSchema =
-                createCatalogTable(
-                        context, mergeTableAsUtil, sqlReplaceTableAs, query.getResolvedSchema());
+                getResolvedCatalogTable(sqlReplaceTableAs, context, query.getResolvedSchema());
 
         // If needed, rewrite the query to include the new sink fields in the select list
         query =
-                mergeTableAsUtil.maybeRewriteQuery(
-                        context.getCatalogManager(),
-                        flinkPlanner,
-                        query,
-                        sqlReplaceTableAs.getAsQuery(),
-                        tableWithResolvedSchema);
-
+                new MergeTableAsUtil(context)
+                        .maybeRewriteQuery(
+                                context.getCatalogManager(),
+                                flinkPlanner,
+                                query,
+                                sqlReplaceTableAs.getAsQuery(),
+                                tableWithResolvedSchema);
+
+        ObjectIdentifier identifier = getIdentifier(sqlReplaceTableAs, context);
         CreateTableOperation createTableOperation =
-                new CreateTableOperation(
-                        identifier,
-                        tableWithResolvedSchema,
-                        sqlReplaceTableAs.isIfNotExists(),
-                        sqlReplaceTableAs.isTemporary());
+                getCreateTableOperation(identifier, tableWithResolvedSchema, sqlReplaceTableAs);
 
         return new ReplaceTableAsOperation(
                 createTableOperation, query, sqlReplaceTableAs.isCreateOrReplace());
     }
 
-    private ResolvedCatalogTable createCatalogTable(
-            ConvertContext context,
-            MergeTableAsUtil mergeTableAsUtil,
-            SqlReplaceTableAs sqlReplaceTableAs,
-            ResolvedSchema querySchema) {
-        CatalogManager catalogManager = context.getCatalogManager();
-
-        // get table comment
-        String tableComment =
-                OperationConverterUtils.getTableComment(sqlReplaceTableAs.getComment());
-
-        // get table properties
-        Map<String, String> properties = new HashMap<>();
-        sqlReplaceTableAs
-                .getPropertyList()
-                .getList()
-                .forEach(
-                        p ->
-                                properties.put(
-                                        ((SqlTableOption) p).getKeyString(),
-                                        ((SqlTableOption) p).getValueString()));
-
-        Schema mergedSchema;
-        if (sqlReplaceTableAs.isSchemaWithColumnsIdentifiersOnly()) {
-            // If only column identifiers are provided, then these are used to
-            // order the columns in the schema.
-            mergedSchema =
-                    mergeTableAsUtil.reorderSchema(sqlReplaceTableAs.getColumnList(), querySchema);
-        } else {
-            // merge schemas
-            mergedSchema =
-                    mergeTableAsUtil.mergeSchemas(
+    @Override
+    protected MergeContext getMergeContext(
+            SqlReplaceTableAs sqlReplaceTableAs, ConvertContext context) {
+        return new MergeContext() {
+            private final MergeTableAsUtil mergeTableAsUtil = new MergeTableAsUtil(context);
+
+            @Override
+            public Schema getMergedSchema(ResolvedSchema querySchema) {
+                if (sqlReplaceTableAs.isSchemaWithColumnsIdentifiersOnly()) {
+                    // If only column identifiers are provided, then these are used to
+                    // order the columns in the schema.
+                    return mergeTableAsUtil.reorderSchema(
+                            sqlReplaceTableAs.getColumnList(), querySchema);
+                } else {
+                    return mergeTableAsUtil.mergeSchemas(
                             sqlReplaceTableAs.getColumnList(),
                             sqlReplaceTableAs.getWatermark().orElse(null),
                             sqlReplaceTableAs.getFullConstraints(),
                             querySchema);
-        }
-
-        // get distribution
-        Optional<TableDistribution> tableDistribution =
-                Optional.ofNullable(sqlReplaceTableAs.getDistribution())
-                        .map(OperationConverterUtils::getDistributionFromSqlDistribution);
-
-        // get partition key
-        List<String> partitionKeys =
-                getPartitionKeyColumnNames(sqlReplaceTableAs.getPartitionKeyList());
-        verifyPartitioningColumnsExist(mergedSchema, partitionKeys);
-
-        CatalogTable catalogTable =
-                CatalogTable.newBuilder()
-                        .schema(mergedSchema)
-                        .comment(tableComment)
-                        .distribution(tableDistribution.orElse(null))
-                        .options(properties)
-                        .partitionKeys(partitionKeys)
-                        .build();
+                }
+            }
 
-        return catalogManager.resolveCatalogTable(catalogTable);
-    }
+            @Override
+            public Map<String, String> getMergedTableOptions() {
+                return SqlReplaceTableAsConverter.this.getDerivedTableOptions(sqlReplaceTableAs);
+            }
 
-    private List<String> getPartitionKeyColumnNames(SqlNodeList partitionKey) {
-        return partitionKey.getList().stream()
-                .map(p -> ((SqlIdentifier) p).getSimple())
-                .collect(Collectors.toList());
-    }
+            @Override
+            public List<String> getMergedPartitionKeys() {
+                return SqlReplaceTableAsConverter.this.getDerivedPartitionKeys(sqlReplaceTableAs);
+            }
 
-    private void verifyPartitioningColumnsExist(Schema mergedSchema, List<String> partitionKeys) {
-        Set<String> columnNames =
-                mergedSchema.getColumns().stream()
-                        .map(Schema.UnresolvedColumn::getName)
-                        .collect(Collectors.toCollection(LinkedHashSet::new));
-        for (String partitionKey : partitionKeys) {
-            if (!columnNames.contains(partitionKey)) {
-                throw new ValidationException(
-                        String.format(
-                                "Partition column '%s' not defined in the table schema. Available columns: [%s]",
-                                partitionKey,
-                                columnNames.stream()
-                                        .collect(Collectors.joining("', '", "'", "'"))));
+            @Override
+            public Optional<TableDistribution> getMergedTableDistribution() {
+                return SqlReplaceTableAsConverter.this.getDerivedTableDistribution(
+                        sqlReplaceTableAs);
             }
-        }
+        };
     }
 }
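
For reference, the statement shapes this converter handles, distinguished by isCreateOrReplace() (table names and the blackhole connector are illustrative):

    tEnv.executeSql(
            "REPLACE TABLE sink WITH ('connector' = 'blackhole')"
                    + " AS SELECT id, name FROM src");
    tEnv.executeSql(
            "CREATE OR REPLACE TABLE sink WITH ('connector' = 'blackhole')"
                    + " AS SELECT id, name FROM src");
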
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java
index 1b1d224cf85..d475d9f33c6 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java
@@ -92,11 +92,11 @@ public class OperationConverterUtils {
                 .orElse(null);
     }
 
-    public static @Nullable String getTableComment(Optional<SqlCharStringLiteral> tableComment) {
+    public static @Nullable String getComment(Optional<SqlCharStringLiteral> tableComment) {
         return tableComment.map(comment -> comment.getValueAs(String.class)).orElse(null);
     }
 
-    public static Map<String, String> extractProperties(SqlNodeList propList) {
+    public static Map<String, String> getProperties(SqlNodeList propList) {
         Map<String, String> properties = new HashMap<>();
         if (propList != null) {
             propList.getList()
@@ -109,6 +109,12 @@ public class OperationConverterUtils {
         return properties;
     }
 
+    public static List<String> getColumnNames(SqlNodeList sqlNodeList) {
+        return sqlNodeList.getList().stream()
+                .map(p -> ((SqlIdentifier) p).getSimple())
+                .collect(Collectors.toList());
+    }
+
     public static TableDistribution getDistributionFromSqlDistribution(
             SqlDistribution distribution) {
         TableDistribution.Kind kind =
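
The renamed getProperties and the new getColumnNames centralize snippets that the converters above previously inlined. Roughly, given some SqlCreateTable node (the createTable variable here is hypothetical):

    // For PARTITIONED BY (a, b) this yields ["a", "b"].
    List<String> partitionKeys =
            OperationConverterUtils.getColumnNames(createTable.getPartitionKeyList());
    // For WITH ('connector' = 'kafka') this yields {connector=kafka}.
    Map<String, String> options =
            OperationConverterUtils.getProperties(createTable.getPropertyList());
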
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/MergeTableLikeUtilTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/converters/MergeTableLikeUtilTest.java
similarity index 99%
rename from flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/MergeTableLikeUtilTest.java
rename to flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/converters/MergeTableLikeUtilTest.java
index 785b08ca18c..50e14fb2caa 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/MergeTableLikeUtilTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/converters/MergeTableLikeUtilTest.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.planner.operations;
+package org.apache.flink.table.planner.operations.converters;
 
 import org.apache.flink.sql.parser.ddl.SqlTableColumn.SqlComputedColumn;
 import org.apache.flink.sql.parser.ddl.SqlTableColumn.SqlMetadataColumn;