This is an automated email from the ASF dual-hosted git repository. snuyanzin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit ede7c2c1e1620861839e16a98bf5953311b84e28 Author: Sergey Nuyanzin <[email protected]> AuthorDate: Mon Nov 24 12:07:02 2025 +0100 [FLINK-38712][table] Move away distribution from table schema --- .../src/main/codegen/data/Parser.tdd | 2 + .../src/main/codegen/includes/parserImpls.ftl | 14 ++-- .../flink/sql/parser/ddl/SqlAlterDistribution.java | 89 ++++++++++++++++++++++ .../flink/sql/parser/ddl/SqlAlterTableAdd.java | 11 +-- .../flink/sql/parser/ddl/SqlAlterTableModify.java | 11 +-- .../flink/sql/parser/ddl/SqlAlterTableSchema.java | 17 ++--- .../operations/converters/SqlNodeConverters.java | 4 + .../table/AbstractAlterTableConverter.java | 37 +-------- ... => SqlAlterTableAddDistributionConverter.java} | 48 +++++++----- .../SqlAlterTableDropDistributionConverter.java | 2 +- ... SqlAlterTableModifyDistributionConverter.java} | 47 +++++++----- .../table/SqlAlterTableOptionsConverter.java | 2 +- .../table/SqlAlterTableRenameConverter.java | 2 +- .../table/SqlAlterTableResetConverter.java | 2 +- .../table/SqlAlterTableSchemaConverter.java | 1 - 15 files changed, 174 insertions(+), 115 deletions(-) diff --git a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd index 748b0102449..27e84f755bf 100644 --- a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd +++ b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd @@ -31,6 +31,8 @@ "org.apache.flink.sql.parser.ddl.resource.SqlResource" "org.apache.flink.sql.parser.ddl.resource.SqlResourceType" "org.apache.flink.sql.parser.ddl.SqlAddJar" + "org.apache.flink.sql.parser.ddl.SqlAlterDistribution.SqlAddDistribution" + "org.apache.flink.sql.parser.ddl.SqlAlterDistribution.SqlModifyDistribution" "org.apache.flink.sql.parser.ddl.SqlAddPartitions" "org.apache.flink.sql.parser.ddl.SqlAddPartitions.AlterTableAddPartitionContext" "org.apache.flink.sql.parser.ddl.SqlAlterCatalog" diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl index ac40ec19148..df8ce03e0f0 100644 --- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl +++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl @@ -923,6 +923,10 @@ SqlAlterTable SqlAlterTable() : } | ( + <DISTRIBUTION> + ctx.distribution = SqlDistribution(getPos()) + {return new SqlAddDistribution(getPos(), tableIdentifier, ctx.distribution);} + | AlterTableAddOrModify(ctx) | <LPAREN> @@ -939,13 +943,16 @@ SqlAlterTable SqlAlterTable() : new SqlNodeList(ctx.columnPositions, startPos.plus(getPos())), ctx.constraints, ctx.watermark, - ctx.distribution, ifExists); } ) | <MODIFY> ( + <DISTRIBUTION> + ctx.distribution = SqlDistribution(getPos()) + {return new SqlModifyDistribution(getPos(), tableIdentifier, ctx.distribution);} + | AlterTableAddOrModify(ctx) | <LPAREN> @@ -962,10 +969,8 @@ SqlAlterTable SqlAlterTable() : new SqlNodeList(ctx.columnPositions, startPos.plus(getPos())), ctx.constraints, ctx.watermark, - ctx.distribution, ifExists); } - | <DROP> ( @@ -1254,9 +1259,6 @@ void AlterTableAddOrModify(AlterTableContext context) : } | Watermark(context) - | - <DISTRIBUTION> - context.distribution = SqlDistribution(getPos()) ) } diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterDistribution.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterDistribution.java new file mode 100644 index 00000000000..e8696f8b1c5 --- /dev/null +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterDistribution.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.sql.parser.ddl; + +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlWriter; +import org.apache.calcite.sql.parser.SqlParserPos; + +import javax.annotation.Nonnull; + +import java.util.ArrayList; +import java.util.List; + +/** ALTER Distribution DDL sql call for tables and materialized tables. */ +public abstract class SqlAlterDistribution extends SqlAlterTable { + + private final SqlDistribution distribution; + + public SqlAlterDistribution( + SqlParserPos pos, SqlIdentifier tableName, SqlDistribution distribution) { + super(pos, tableName, false); + this.distribution = distribution; + } + + protected abstract String getAlterOperation(); + + public SqlDistribution getDistribution() { + return distribution; + } + + @Nonnull + @Override + public List<SqlNode> getOperandList() { + List<SqlNode> operands = new ArrayList<>(); + operands.add(tableIdentifier); + operands.add(distribution); + return operands; + } + + @Override + public void unparseAlterOperation(SqlWriter writer, int leftPrec, int rightPrec) { + super.unparseAlterOperation(writer, leftPrec, rightPrec); + writer.keyword(getAlterOperation()); + distribution.unparseAlter(writer, leftPrec, rightPrec); + } + + public static class SqlAddDistribution extends SqlAlterDistribution { + + public SqlAddDistribution( + SqlParserPos pos, SqlIdentifier tableName, SqlDistribution distribution) { + super(pos, tableName, distribution); + } + + @Override + protected String getAlterOperation() { + return "ADD"; + } + } + + public static class SqlModifyDistribution extends SqlAlterDistribution { + + public SqlModifyDistribution( + SqlParserPos pos, SqlIdentifier tableName, SqlDistribution distribution) { + super(pos, tableName, distribution); + } + + @Override + protected String getAlterOperation() { + return "MODIFY"; + } + } +} diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableAdd.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableAdd.java index bd83a9c233b..96d7676b41c 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableAdd.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableAdd.java @@ -22,7 +22,6 @@ import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint; import org.apache.calcite.sql.SqlIdentifier; import org.apache.calcite.sql.SqlNodeList; -import org.apache.calcite.sql.SqlWriter; import org.apache.calcite.sql.parser.SqlParserPos; import javax.annotation.Nullable; @@ -56,16 +55,12 @@ public class SqlAlterTableAdd extends SqlAlterTableSchema { SqlNodeList addedColumns, List<SqlTableConstraint> constraint, @Nullable SqlWatermark sqlWatermark, - @Nullable SqlDistribution distribution, boolean ifTableExists) { - super(pos, tableName, addedColumns, constraint, sqlWatermark, distribution, ifTableExists); + super(pos, tableName, addedColumns, constraint, sqlWatermark, ifTableExists); } @Override - public void unparseAlterOperation(SqlWriter writer, int leftPrec, int rightPrec) { - super.unparseAlterOperation(writer, leftPrec, rightPrec); - writer.keyword("ADD"); - // unparse table schema and distribution - unparseSchemaAndDistribution(writer, leftPrec, rightPrec); + protected String getAlterOperation() { + return "ADD"; } } diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableModify.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableModify.java index 2a5fa550909..a06277ff31b 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableModify.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableModify.java @@ -22,7 +22,6 @@ import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint; import org.apache.calcite.sql.SqlIdentifier; import org.apache.calcite.sql.SqlNodeList; -import org.apache.calcite.sql.SqlWriter; import org.apache.calcite.sql.parser.SqlParserPos; import javax.annotation.Nullable; @@ -56,16 +55,12 @@ public class SqlAlterTableModify extends SqlAlterTableSchema { SqlNodeList modifiedColumns, List<SqlTableConstraint> constraints, @Nullable SqlWatermark watermark, - @Nullable SqlDistribution distribution, boolean ifTableExists) { - super(pos, tableName, modifiedColumns, constraints, watermark, distribution, ifTableExists); + super(pos, tableName, modifiedColumns, constraints, watermark, ifTableExists); } @Override - public void unparseAlterOperation(SqlWriter writer, int leftPrec, int rightPrec) { - super.unparseAlterOperation(writer, leftPrec, rightPrec); - writer.keyword("MODIFY"); - // unparse table schema and distribution - unparseSchemaAndDistribution(writer, leftPrec, rightPrec); + protected String getAlterOperation() { + return "MODIFY"; } } diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableSchema.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableSchema.java index 746b06ecce3..761b64a3a76 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableSchema.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableSchema.java @@ -44,7 +44,6 @@ public abstract class SqlAlterTableSchema extends SqlAlterTable implements Exten protected final SqlNodeList columnList; @Nullable protected final SqlWatermark watermark; - @Nullable protected final SqlDistribution distribution; protected final List<SqlTableConstraint> constraints; public SqlAlterTableSchema( @@ -53,12 +52,10 @@ public abstract class SqlAlterTableSchema extends SqlAlterTable implements Exten SqlNodeList columnList, List<SqlTableConstraint> constraints, @Nullable SqlWatermark sqlWatermark, - @Nullable SqlDistribution distribution, boolean ifTableExists) { super(pos, tableName, ifTableExists); this.columnList = columnList; this.constraints = constraints; - this.distribution = distribution; this.watermark = sqlWatermark; } @@ -85,10 +82,6 @@ public abstract class SqlAlterTableSchema extends SqlAlterTable implements Exten return Optional.ofNullable(watermark); } - public Optional<SqlDistribution> getDistribution() { - return Optional.ofNullable(distribution); - } - public List<SqlTableConstraint> getConstraints() { return constraints; } @@ -107,11 +100,13 @@ public abstract class SqlAlterTableSchema extends SqlAlterTable implements Exten SqlParserPos.ZERO); } - void unparseSchemaAndDistribution(SqlWriter writer, int leftPrec, int rightPrec) { + protected abstract String getAlterOperation(); + + @Override + public void unparseAlterOperation(SqlWriter writer, int leftPrec, int rightPrec) { + super.unparseAlterOperation(writer, leftPrec, rightPrec); + writer.keyword(getAlterOperation()); SqlUnparseUtils.unparseTableSchema( columnList, constraints, watermark, writer, leftPrec, rightPrec); - if (distribution != null) { - distribution.unparseAlter(writer, leftPrec, rightPrec); - } } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java index ca305b3dd0a..bd7c1e45962 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.table.operations.Operation; import org.apache.flink.table.planner.operations.converters.SqlNodeConverter.ConvertContext; +import org.apache.flink.table.planner.operations.converters.table.SqlAlterTableAddDistributionConverter; import org.apache.flink.table.planner.operations.converters.table.SqlAlterTableAddPartitionConverter; import org.apache.flink.table.planner.operations.converters.table.SqlAlterTableDropColumnConverter; import org.apache.flink.table.planner.operations.converters.table.SqlAlterTableDropConstraintConverter; @@ -29,6 +30,7 @@ import org.apache.flink.table.planner.operations.converters.table.SqlAlterTableD import org.apache.flink.table.planner.operations.converters.table.SqlAlterTableDropPartitionConverter; import org.apache.flink.table.planner.operations.converters.table.SqlAlterTableDropPrimaryKeyConverter; import org.apache.flink.table.planner.operations.converters.table.SqlAlterTableDropWatermarkConverter; +import org.apache.flink.table.planner.operations.converters.table.SqlAlterTableModifyDistributionConverter; import org.apache.flink.table.planner.operations.converters.table.SqlAlterTableOptionsConverter; import org.apache.flink.table.planner.operations.converters.table.SqlAlterTableRenameColumnConverter; import org.apache.flink.table.planner.operations.converters.table.SqlAlterTableRenameConverter; @@ -106,6 +108,8 @@ public class SqlNodeConverters { register(new SqlAlterTableDropConstraintConverter()); register(new SqlAlterTableDropWatermarkConverter()); register(new SqlAlterTableDropDistributionConverter()); + register(new SqlAlterTableAddDistributionConverter()); + register(new SqlAlterTableModifyDistributionConverter()); register(new SqlAlterTableRenameColumnConverter()); register(new SqlAlterTableResetConverter()); register(new SqlAlterTableOptionsConverter()); diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/AbstractAlterTableConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/AbstractAlterTableConverter.java index 4341fd96d64..55577c19d42 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/AbstractAlterTableConverter.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/AbstractAlterTableConverter.java @@ -19,7 +19,6 @@ package org.apache.flink.table.planner.operations.converters.table; import org.apache.flink.sql.parser.ddl.SqlAlterTable; -import org.apache.flink.sql.parser.ddl.SqlAlterTableSchema; import org.apache.flink.table.api.Schema; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.CatalogBaseTable; @@ -36,12 +35,10 @@ import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.ddl.AlterTableChangeOperation; import org.apache.flink.table.operations.utils.ValidationUtils; import org.apache.flink.table.planner.operations.converters.SqlNodeConverter; -import org.apache.flink.table.planner.utils.OperationConverterUtils; import org.apache.calcite.sql.SqlIdentifier; import java.util.List; -import java.util.Map; import java.util.Optional; /** Abstract class for ALTER TABLE converters. */ @@ -52,31 +49,10 @@ public abstract class AbstractAlterTableConverter<T extends SqlAlterTable> protected abstract Operation convertToOperation( T sqlAlterTable, ResolvedCatalogTable oldTable, ConvertContext context); - protected final Schema getOldSchema(ResolvedCatalogTable resolvedCatalogTable) { - return resolvedCatalogTable.getUnresolvedSchema(); - } - - protected final TableDistribution getOldTableDistribution( - ResolvedCatalogTable resolvedCatalogTable) { - return resolvedCatalogTable.getDistribution().orElse(null); - } - - protected final List<String> getOldPartitionKeys(ResolvedCatalogTable resolvedCatalogTable) { - return resolvedCatalogTable.getPartitionKeys(); - } - - protected final String getOldComment(ResolvedCatalogTable resolvedCatalogTable) { - return resolvedCatalogTable.getComment(); - } - - protected final Map<String, String> getOldOptions(ResolvedCatalogTable resolvedCatalogTable) { - return resolvedCatalogTable.getOptions(); - } - @Override public final Operation convertSqlNode(T sqlAlterTable, ConvertContext context) { CatalogManager catalogManager = context.getCatalogManager(); - final ObjectIdentifier tableIdentifier = getIdentifier(sqlAlterTable, context); + final ObjectIdentifier tableIdentifier = resolveIdentifier(sqlAlterTable, context); Optional<ContextResolvedTable> optionalCatalogTable = catalogManager.getTable(tableIdentifier); @@ -133,22 +109,13 @@ public abstract class AbstractAlterTableConverter<T extends SqlAlterTable> return identifier.getSimple(); } - protected final ObjectIdentifier getIdentifier(SqlAlterTable node, ConvertContext context) { + protected final ObjectIdentifier resolveIdentifier(SqlAlterTable node, ConvertContext context) { UnresolvedIdentifier unresolvedIdentifier = UnresolvedIdentifier.of(node.fullTableName()); return context.getCatalogManager().qualifyIdentifier(unresolvedIdentifier); } protected TableDistribution getTableDistribution( SqlAlterTable alterTable, ResolvedCatalogTable oldTable) { - if (alterTable instanceof SqlAlterTableSchema) { - final Optional<TableDistribution> tableDistribution = - ((SqlAlterTableSchema) alterTable) - .getDistribution() - .map(OperationConverterUtils::getDistributionFromSqlDistribution); - if (tableDistribution.isPresent()) { - return tableDistribution.get(); - } - } return oldTable.getDistribution().orElse(null); } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/SqlAlterTableDropDistributionConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/SqlAlterTableAddDistributionConverter.java similarity index 54% copy from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/SqlAlterTableDropDistributionConverter.java copy to flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/SqlAlterTableAddDistributionConverter.java index 1e829089bc2..3a40b8cfd31 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/SqlAlterTableDropDistributionConverter.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/SqlAlterTableAddDistributionConverter.java @@ -18,44 +18,50 @@ package org.apache.flink.table.planner.operations.converters.table; +import org.apache.flink.sql.parser.ddl.SqlAlterDistribution; +import org.apache.flink.sql.parser.ddl.SqlAlterDistribution.SqlAddDistribution; import org.apache.flink.sql.parser.ddl.SqlAlterTable; -import org.apache.flink.sql.parser.ddl.SqlAlterTableDropDistribution; import org.apache.flink.table.api.ValidationException; -import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.catalog.ResolvedCatalogTable; import org.apache.flink.table.catalog.TableChange; import org.apache.flink.table.catalog.TableDistribution; import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.planner.utils.OperationConverterUtils; import java.util.List; +import java.util.Optional; + +/** A converter for {@link SqlAlterDistribution} for ADD call. */ +public class SqlAlterTableAddDistributionConverter + extends AbstractAlterTableConverter<SqlAddDistribution> { -/** Convert ALTER TABLE DROP DISTRIBUTION statement. */ -public class SqlAlterTableDropDistributionConverter - extends AbstractAlterTableConverter<SqlAlterTableDropDistribution> { @Override protected Operation convertToOperation( - SqlAlterTableDropDistribution sqlAlterTable, - ResolvedCatalogTable resolvedCatalogTable, + SqlAddDistribution sqlAddDistribution, + ResolvedCatalogTable oldTable, ConvertContext context) { - final ObjectIdentifier tableIdentifier = getIdentifier(sqlAlterTable, context); - if (resolvedCatalogTable.getDistribution().isEmpty()) { - throw new ValidationException( - String.format( - "Table %s does not have a distribution to drop.", tableIdentifier)); - } - - List<TableChange> tableChanges = List.of(TableChange.dropDistribution()); return buildAlterTableChangeOperation( - sqlAlterTable, - tableChanges, - resolvedCatalogTable.getUnresolvedSchema(), - resolvedCatalogTable, + sqlAddDistribution, + List.of( + TableChange.add( + OperationConverterUtils.getDistributionFromSqlDistribution( + sqlAddDistribution.getDistribution()))), + oldTable.getUnresolvedSchema(), + oldTable, context.getCatalogManager()); } - @Override protected TableDistribution getTableDistribution( SqlAlterTable alterTable, ResolvedCatalogTable oldTable) { - return null; + Optional<TableDistribution> oldDistribution = oldTable.getDistribution(); + if (oldDistribution.isPresent()) { + throw new ValidationException( + String.format( + "%sThe base table has already defined the distribution `%s`. " + + "You can modify it or drop it before adding a new one.", + EX_MSG_PREFIX, oldDistribution.get())); + } + return OperationConverterUtils.getDistributionFromSqlDistribution( + ((SqlAlterDistribution) alterTable).getDistribution()); } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/SqlAlterTableDropDistributionConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/SqlAlterTableDropDistributionConverter.java index 1e829089bc2..aef53794542 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/SqlAlterTableDropDistributionConverter.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/SqlAlterTableDropDistributionConverter.java @@ -37,7 +37,7 @@ public class SqlAlterTableDropDistributionConverter SqlAlterTableDropDistribution sqlAlterTable, ResolvedCatalogTable resolvedCatalogTable, ConvertContext context) { - final ObjectIdentifier tableIdentifier = getIdentifier(sqlAlterTable, context); + final ObjectIdentifier tableIdentifier = resolveIdentifier(sqlAlterTable, context); if (resolvedCatalogTable.getDistribution().isEmpty()) { throw new ValidationException( String.format( diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/SqlAlterTableDropDistributionConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/SqlAlterTableModifyDistributionConverter.java similarity index 55% copy from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/SqlAlterTableDropDistributionConverter.java copy to flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/SqlAlterTableModifyDistributionConverter.java index 1e829089bc2..0afa279df2e 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/SqlAlterTableDropDistributionConverter.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/SqlAlterTableModifyDistributionConverter.java @@ -18,44 +18,49 @@ package org.apache.flink.table.planner.operations.converters.table; +import org.apache.flink.sql.parser.ddl.SqlAlterDistribution; +import org.apache.flink.sql.parser.ddl.SqlAlterDistribution.SqlModifyDistribution; import org.apache.flink.sql.parser.ddl.SqlAlterTable; -import org.apache.flink.sql.parser.ddl.SqlAlterTableDropDistribution; import org.apache.flink.table.api.ValidationException; -import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.catalog.ResolvedCatalogTable; import org.apache.flink.table.catalog.TableChange; import org.apache.flink.table.catalog.TableDistribution; import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.planner.utils.OperationConverterUtils; import java.util.List; +import java.util.Optional; + +/** A converter for {@link SqlAlterDistribution} for MODIFY call. */ +public class SqlAlterTableModifyDistributionConverter + extends AbstractAlterTableConverter<SqlModifyDistribution> { -/** Convert ALTER TABLE DROP DISTRIBUTION statement. */ -public class SqlAlterTableDropDistributionConverter - extends AbstractAlterTableConverter<SqlAlterTableDropDistribution> { @Override protected Operation convertToOperation( - SqlAlterTableDropDistribution sqlAlterTable, - ResolvedCatalogTable resolvedCatalogTable, + SqlModifyDistribution sqlModifyDistribution, + ResolvedCatalogTable oldTable, ConvertContext context) { - final ObjectIdentifier tableIdentifier = getIdentifier(sqlAlterTable, context); - if (resolvedCatalogTable.getDistribution().isEmpty()) { - throw new ValidationException( - String.format( - "Table %s does not have a distribution to drop.", tableIdentifier)); - } - - List<TableChange> tableChanges = List.of(TableChange.dropDistribution()); return buildAlterTableChangeOperation( - sqlAlterTable, - tableChanges, - resolvedCatalogTable.getUnresolvedSchema(), - resolvedCatalogTable, + sqlModifyDistribution, + List.of( + TableChange.modify( + OperationConverterUtils.getDistributionFromSqlDistribution( + sqlModifyDistribution.getDistribution()))), + oldTable.getUnresolvedSchema(), + oldTable, context.getCatalogManager()); } - @Override protected TableDistribution getTableDistribution( SqlAlterTable alterTable, ResolvedCatalogTable oldTable) { - return null; + Optional<TableDistribution> oldDistribution = oldTable.getDistribution(); + if (oldDistribution.isEmpty()) { + throw new ValidationException( + String.format( + "%sThe base table does not define any distribution. You might want to add a new one.", + EX_MSG_PREFIX)); + } + return OperationConverterUtils.getDistributionFromSqlDistribution( + ((SqlAlterDistribution) alterTable).getDistribution()); } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/SqlAlterTableOptionsConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/SqlAlterTableOptionsConverter.java index a0a8156f29a..6f86b40bd69 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/SqlAlterTableOptionsConverter.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/SqlAlterTableOptionsConverter.java @@ -45,7 +45,7 @@ public class SqlAlterTableOptionsConverter SqlAlterTableOptions alterTableOptions, ResolvedCatalogTable oldTable, ConvertContext context) { - final ObjectIdentifier tableIdentifier = getIdentifier(alterTableOptions, context); + final ObjectIdentifier tableIdentifier = resolveIdentifier(alterTableOptions, context); final Map<String, String> partitionKVs = alterTableOptions.getPartitionKVs(); // it's altering partitions if (partitionKVs != null) { diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/SqlAlterTableRenameConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/SqlAlterTableRenameConverter.java index e8048b519a7..b8fa969f381 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/SqlAlterTableRenameConverter.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/SqlAlterTableRenameConverter.java @@ -35,7 +35,7 @@ public class SqlAlterTableRenameConverter extends AbstractAlterTableConverter<Sq SqlAlterTableRename sqlAlterTable, ResolvedCatalogTable oldTable, ConvertContext context) { - final ObjectIdentifier tableIdentifier = getIdentifier(sqlAlterTable, context); + final ObjectIdentifier tableIdentifier = resolveIdentifier(sqlAlterTable, context); UnresolvedIdentifier newUnresolvedIdentifier = UnresolvedIdentifier.of(sqlAlterTable.fullNewTableName()); ObjectIdentifier newTableIdentifier = diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/SqlAlterTableResetConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/SqlAlterTableResetConverter.java index 3046d394301..9781a0de7b4 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/SqlAlterTableResetConverter.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/SqlAlterTableResetConverter.java @@ -39,7 +39,7 @@ public class SqlAlterTableResetConverter extends AbstractAlterTableConverter<Sql SqlAlterTableReset alterTableReset, ResolvedCatalogTable oldTable, ConvertContext context) { - final ObjectIdentifier tableIdentifier = getIdentifier(alterTableReset, context); + final ObjectIdentifier tableIdentifier = resolveIdentifier(alterTableReset, context); Map<String, String> newOptions = new HashMap<>(oldTable.getOptions()); // reset empty or 'connector' key is not allowed Set<String> resetKeys = alterTableReset.getResetKeys(); diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/SqlAlterTableSchemaConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/SqlAlterTableSchemaConverter.java index ffe89cca55d..51be19ad381 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/SqlAlterTableSchemaConverter.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/SqlAlterTableSchemaConverter.java @@ -35,7 +35,6 @@ public abstract class SqlAlterTableSchemaConverter<T extends SqlAlterTableSchema SchemaConverter converter = createSchemaConverter(alterTableSchema, oldTable, context); converter.updateColumn(alterTableSchema.getColumnPositions().getList()); alterTableSchema.getWatermark().ifPresent(converter::updateWatermark); - alterTableSchema.getDistribution().ifPresent(converter::updateDistribution); alterTableSchema.getFullConstraint().ifPresent(converter::updatePrimaryKey); return buildAlterTableChangeOperation(
