This is an automated email from the ASF dual-hosted git repository. snuyanzin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 81bf2ebb58197f9fa1ac4b1dffa3df580a9d546d Author: Sergey Nuyanzin <[email protected]> AuthorDate: Tue Nov 25 08:40:50 2025 +0100 [FLINK-38712][table] Decouple table and `TableSchemaContext` This closes #27264 --- .../src/main/codegen/data/Parser.tdd | 4 +- .../src/main/codegen/includes/parserImpls.ftl | 41 ++++++++++---------- .../apache/flink/sql/parser/ddl/SqlAlterTable.java | 8 ---- .../flink/sql/parser/ddl/SqlCreateTable.java | 13 ------- .../flink/sql/parser/ddl/TableSchemaContext.java | 45 ++++++++++++++++++++++ 5 files changed, 67 insertions(+), 44 deletions(-) diff --git a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd index 27e84f755bf..2bd62da7fe0 100644 --- a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd +++ b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd @@ -58,7 +58,8 @@ "org.apache.flink.sql.parser.ddl.SqlAlterModelReset" "org.apache.flink.sql.parser.ddl.SqlAlterModelSet" "org.apache.flink.sql.parser.ddl.SqlAlterTable" - "org.apache.flink.sql.parser.ddl.SqlAlterTable.AlterTableContext" + "org.apache.flink.sql.parser.ddl.TableSchemaContext" + "org.apache.flink.sql.parser.ddl.TableSchemaContext.AlterTableSchemaContext" "org.apache.flink.sql.parser.ddl.SqlAlterTableAdd" "org.apache.flink.sql.parser.ddl.SqlAlterTableAddConstraint" "org.apache.flink.sql.parser.ddl.SqlAlterTableDropColumn" @@ -82,7 +83,6 @@ "org.apache.flink.sql.parser.ddl.SqlCreateModelAs" "org.apache.flink.sql.parser.ddl.SqlCreateOrAlterMaterializedTable" "org.apache.flink.sql.parser.ddl.SqlCreateTable" - "org.apache.flink.sql.parser.ddl.SqlCreateTable.TableCreationContext" "org.apache.flink.sql.parser.ddl.SqlCreateTableAs" "org.apache.flink.sql.parser.ddl.SqlCreateTableLike" "org.apache.flink.sql.parser.ddl.SqlCreateView" diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl index df8ce03e0f0..bf2f07ce060 100644 --- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl +++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl @@ -857,7 +857,7 @@ SqlAlterTable SqlAlterTable() : SqlTableConstraint constraint; SqlIdentifier originColumnIdentifier; SqlIdentifier newColumnIdentifier; - AlterTableContext ctx = new AlterTableContext(); + AlterTableSchemaContext ctx = new AlterTableSchemaContext(); AlterTableAddPartitionContext addPartitionCtx = new AlterTableAddPartitionContext(); AlterTableDropPartitionsContext dropPartitionsCtx = new AlterTableDropPartitionsContext(); } @@ -924,8 +924,7 @@ SqlAlterTable SqlAlterTable() : | ( <DISTRIBUTION> - ctx.distribution = SqlDistribution(getPos()) - {return new SqlAddDistribution(getPos(), tableIdentifier, ctx.distribution);} + {return new SqlAddDistribution(getPos(), tableIdentifier, SqlDistribution(getPos()));} | AlterTableAddOrModify(ctx) | @@ -950,8 +949,7 @@ SqlAlterTable SqlAlterTable() : <MODIFY> ( <DISTRIBUTION> - ctx.distribution = SqlDistribution(getPos()) - {return new SqlModifyDistribution(getPos(), tableIdentifier, ctx.distribution);} + {return new SqlModifyDistribution(getPos(), tableIdentifier, SqlDistribution(getPos()));} | AlterTableAddOrModify(ctx) | @@ -1062,7 +1060,7 @@ SqlNodeList PropertyKeys(): { return new SqlNodeList(proKeyList, span.end(this)); } } -void TableColumn(TableCreationContext context) : +void TableColumn(TableSchemaContext context) : { SqlTableConstraint constraint; } @@ -1081,7 +1079,7 @@ void TableColumn(TableCreationContext context) : ) } -void Watermark(TableCreationContext context) : +void Watermark(TableSchemaContext context) : { SqlIdentifier eventTimeColumnName; SqlParserPos pos; @@ -1102,7 +1100,7 @@ void Watermark(TableCreationContext context) : } /** Parses {@code column_name column_data_type [...]}. */ -SqlTableColumn TypedColumn(TableCreationContext context) : +SqlTableColumn TypedColumn(TableSchemaContext context) : { SqlTableColumn tableColumn; SqlIdentifier name; @@ -1123,7 +1121,7 @@ SqlTableColumn TypedColumn(TableCreationContext context) : } /** Parses {@code column_name AS expr [COMMENT 'comment']}. */ -SqlTableColumn ComputedColumn(TableCreationContext context) : +SqlTableColumn ComputedColumn(TableSchemaContext context) : { SqlIdentifier name; SqlParserPos pos; @@ -1152,7 +1150,7 @@ SqlTableColumn ComputedColumn(TableCreationContext context) : } /** Parses {@code column_name column_data_type METADATA [FROM 'alias_name'] [VIRTUAL] [COMMENT 'comment']}. */ -SqlTableColumn MetadataColumn(TableCreationContext context, SqlIdentifier name, SqlDataTypeSpec type) : +SqlTableColumn MetadataColumn(TableSchemaContext context, SqlIdentifier name, SqlDataTypeSpec type) : { SqlNode metadataAlias = null; boolean isVirtual = false; @@ -1189,7 +1187,7 @@ SqlTableColumn MetadataColumn(TableCreationContext context, SqlIdentifier name, } /** Parses {@code column_name column_data_type [constraint] [COMMENT 'comment']}. */ -SqlTableColumn RegularColumn(TableCreationContext context, SqlIdentifier name, SqlDataTypeSpec type) : +SqlTableColumn RegularColumn(TableSchemaContext context, SqlIdentifier name, SqlDataTypeSpec type) : { SqlTableConstraint constraint = null; SqlCharStringLiteral comment = null; @@ -1245,8 +1243,8 @@ void AlterTableAddPartition(AlterTableAddPartitionContext context) : } } -/** Parses {@code ALTER TABLE table_name ADD/MODIFY [...]}. */ -void AlterTableAddOrModify(AlterTableContext context) : +/** Parses {@code ALTER [MATERIALIZED ]TABLE table_name ADD/MODIFY [...]}. */ +void AlterTableAddOrModify(AlterTableSchemaContext context) : { SqlTableConstraint constraint; } @@ -1263,7 +1261,7 @@ void AlterTableAddOrModify(AlterTableContext context) : } /** Parses {@code ADD/MODIFY column_name column_data_type [...]}. */ -void AddOrModifyColumn(AlterTableContext context) : +void AddOrModifyColumn(AlterTableSchemaContext context) : { SqlTableColumn column; SqlIdentifier referencedColumn = null; @@ -1568,7 +1566,7 @@ SqlCreate SqlCreateTable(Span s, boolean replace, boolean isTemporary) : tableName = CompoundIdentifier() [ - <LPAREN> { pos = getPos(); TableCreationContext ctx = new TableCreationContext();} + <LPAREN> { pos = getPos(); TableSchemaContext ctx = new TableSchemaContext();} TableColumnsOrIdentifiers(pos, ctx) { pos = pos.plus(getPos()); @@ -1753,9 +1751,9 @@ SqlDrop SqlDropTable(Span s, boolean replace, boolean isTemporary) : } } -void TableColumnsOrIdentifiers(SqlParserPos pos, TableCreationContext ctx) : +void TableColumnsOrIdentifiers(SqlParserPos pos, TableSchemaContext ctx) : { - final TableCreationContext tempCtx = new TableCreationContext(); + final TableSchemaContext tempCtx = new TableSchemaContext(); final List<SqlNode> identifiers = new ArrayList<SqlNode>(); } { @@ -1812,7 +1810,7 @@ SqlNode SqlReplaceTable() : tableName = CompoundIdentifier() [ - <LPAREN> { pos = getPos(); TableCreationContext ctx = new TableCreationContext();} + <LPAREN> { pos = getPos(); TableSchemaContext ctx = new TableSchemaContext();} TableColumnsOrIdentifiers(pos, ctx) { pos = getPos(); @@ -1899,7 +1897,7 @@ SqlCreate SqlCreateOrAlterMaterializedTable(Span s, boolean replace, boolean isT <TABLE> tableName = CompoundIdentifier() [ - <LPAREN> { pos = getPos(); TableCreationContext ctx = new TableCreationContext();} + <LPAREN> { pos = getPos(); TableSchemaContext ctx = new TableSchemaContext();} TableColumnsOrIdentifiers(pos, ctx) { pos = pos.plus(getPos()); isColumnsIdentifiersOnly = ctx.isColumnsIdentifiersOnly(); @@ -2013,6 +2011,7 @@ SqlAlterMaterializedTable SqlAlterMaterializedTable() : SqlNodeList partSpec = SqlNodeList.EMPTY; SqlNode freshness = null; SqlNode asQuery = null; + AlterTableSchemaContext ctx = new AlterTableSchemaContext(); } { <ALTER> <MATERIALIZED> <TABLE> { startPos = getPos();} @@ -3461,7 +3460,7 @@ SqlCreate SqlCreateModel(Span s, boolean isTemporary) : modelIdentifier = CompoundIdentifier() [ - <INPUT> <LPAREN> { pos = getPos(); TableCreationContext ctx = new TableCreationContext();} + <INPUT> <LPAREN> { pos = getPos(); TableSchemaContext ctx = new TableSchemaContext();} TableColumn(ctx) ( <COMMA> TableColumn(ctx) @@ -3473,7 +3472,7 @@ SqlCreate SqlCreateModel(Span s, boolean isTemporary) : <RPAREN> ] [ - <OUTPUT> <LPAREN> { pos = getPos(); TableCreationContext ctx = new TableCreationContext();} + <OUTPUT> <LPAREN> { pos = getPos(); TableSchemaContext ctx = new TableSchemaContext();} TableColumn(ctx) ( <COMMA> TableColumn(ctx) diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTable.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTable.java index 4756887ca9e..b12b3cf0297 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTable.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTable.java @@ -23,7 +23,6 @@ import org.apache.flink.sql.parser.SqlParseUtils; import org.apache.calcite.sql.SqlAlter; import org.apache.calcite.sql.SqlIdentifier; import org.apache.calcite.sql.SqlKind; -import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.SqlNodeList; import org.apache.calcite.sql.SqlOperator; import org.apache.calcite.sql.SqlSpecialOperator; @@ -32,9 +31,7 @@ import org.apache.calcite.sql.parser.SqlParserPos; import javax.annotation.Nullable; -import java.util.ArrayList; import java.util.LinkedHashMap; -import java.util.List; import static java.util.Objects.requireNonNull; @@ -118,9 +115,4 @@ public abstract class SqlAlterTable extends SqlAlter { public boolean ifTableExists() { return ifTableExists; } - - /** Alter table context. */ - public static class AlterTableContext extends SqlCreateTable.TableCreationContext { - public List<SqlNode> columnPositions = new ArrayList<>(); - } } diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java index 042ff687eea..2d6769b2252 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java @@ -41,7 +41,6 @@ import org.apache.calcite.util.ImmutableNullableList; import javax.annotation.Nonnull; import javax.annotation.Nullable; -import java.util.ArrayList; import java.util.List; import java.util.Optional; @@ -213,16 +212,4 @@ public class SqlCreateTable extends SqlCreateObject implements ExtendedSqlNode { SqlUnparseUtils.unparsePartitionKeyList(partitionKeyList, writer, leftPrec, rightPrec); SqlUnparseUtils.unparseProperties(properties, writer, leftPrec, rightPrec); } - - /** Table creation context. */ - public static class TableCreationContext { - public List<SqlNode> columnList = new ArrayList<>(); - public List<SqlTableConstraint> constraints = new ArrayList<>(); - @Nullable public SqlWatermark watermark; - @Nullable public SqlDistribution distribution; - - public boolean isColumnsIdentifiersOnly() { - return !columnList.isEmpty() && columnList.get(0) instanceof SqlIdentifier; - } - } } diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/TableSchemaContext.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/TableSchemaContext.java new file mode 100644 index 00000000000..c3269e2a327 --- /dev/null +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/TableSchemaContext.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.sql.parser.ddl; + +import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint; + +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlNode; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.List; + +/** Table schema creation context. */ +public class TableSchemaContext { + public List<SqlNode> columnList = new ArrayList<>(); + public List<SqlTableConstraint> constraints = new ArrayList<>(); + @Nullable public SqlWatermark watermark; + + public boolean isColumnsIdentifiersOnly() { + return !columnList.isEmpty() && columnList.get(0) instanceof SqlIdentifier; + } + + /** Alter table context. */ + public static class AlterTableSchemaContext extends TableSchemaContext { + public List<SqlNode> columnPositions = new ArrayList<>(); + } +}
