This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 81bf2ebb58197f9fa1ac4b1dffa3df580a9d546d
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Tue Nov 25 08:40:50 2025 +0100

    [FLINK-38712][table] Decouple table and `TableSchemaContext`
    
    This closes #27264
---
 .../src/main/codegen/data/Parser.tdd               |  4 +-
 .../src/main/codegen/includes/parserImpls.ftl      | 41 ++++++++++----------
 .../apache/flink/sql/parser/ddl/SqlAlterTable.java |  8 ----
 .../flink/sql/parser/ddl/SqlCreateTable.java       | 13 -------
 .../flink/sql/parser/ddl/TableSchemaContext.java   | 45 ++++++++++++++++++++++
 5 files changed, 67 insertions(+), 44 deletions(-)

diff --git a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd 
b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
index 27e84f755bf..2bd62da7fe0 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
+++ b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
@@ -58,7 +58,8 @@
     "org.apache.flink.sql.parser.ddl.SqlAlterModelReset"
     "org.apache.flink.sql.parser.ddl.SqlAlterModelSet"
     "org.apache.flink.sql.parser.ddl.SqlAlterTable"
-    "org.apache.flink.sql.parser.ddl.SqlAlterTable.AlterTableContext"
+    "org.apache.flink.sql.parser.ddl.TableSchemaContext"
+    
"org.apache.flink.sql.parser.ddl.TableSchemaContext.AlterTableSchemaContext"
     "org.apache.flink.sql.parser.ddl.SqlAlterTableAdd"
     "org.apache.flink.sql.parser.ddl.SqlAlterTableAddConstraint"
     "org.apache.flink.sql.parser.ddl.SqlAlterTableDropColumn"
@@ -82,7 +83,6 @@
     "org.apache.flink.sql.parser.ddl.SqlCreateModelAs"
     "org.apache.flink.sql.parser.ddl.SqlCreateOrAlterMaterializedTable"
     "org.apache.flink.sql.parser.ddl.SqlCreateTable"
-    "org.apache.flink.sql.parser.ddl.SqlCreateTable.TableCreationContext"
     "org.apache.flink.sql.parser.ddl.SqlCreateTableAs"
     "org.apache.flink.sql.parser.ddl.SqlCreateTableLike"
     "org.apache.flink.sql.parser.ddl.SqlCreateView"
diff --git 
a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl 
b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
index df8ce03e0f0..bf2f07ce060 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
@@ -857,7 +857,7 @@ SqlAlterTable SqlAlterTable() :
     SqlTableConstraint constraint;
     SqlIdentifier originColumnIdentifier;
     SqlIdentifier newColumnIdentifier;
-    AlterTableContext ctx = new AlterTableContext();
+    AlterTableSchemaContext ctx = new AlterTableSchemaContext();
     AlterTableAddPartitionContext addPartitionCtx = new 
AlterTableAddPartitionContext();
     AlterTableDropPartitionsContext dropPartitionsCtx = new 
AlterTableDropPartitionsContext();
 }
@@ -924,8 +924,7 @@ SqlAlterTable SqlAlterTable() :
         |
         (
             <DISTRIBUTION>
-            ctx.distribution = SqlDistribution(getPos())
-            {return new SqlAddDistribution(getPos(), tableIdentifier, 
ctx.distribution);}
+            {return new SqlAddDistribution(getPos(), tableIdentifier, 
SqlDistribution(getPos()));}
         |
             AlterTableAddOrModify(ctx)
         |
@@ -950,8 +949,7 @@ SqlAlterTable SqlAlterTable() :
         <MODIFY>
         (
             <DISTRIBUTION>
-            ctx.distribution = SqlDistribution(getPos())
-            {return new SqlModifyDistribution(getPos(), tableIdentifier, 
ctx.distribution);}
+            {return new SqlModifyDistribution(getPos(), tableIdentifier, 
SqlDistribution(getPos()));}
         |
             AlterTableAddOrModify(ctx)
         |
@@ -1062,7 +1060,7 @@ SqlNodeList PropertyKeys():
     {  return new SqlNodeList(proKeyList, span.end(this)); }
 }
 
-void TableColumn(TableCreationContext context) :
+void TableColumn(TableSchemaContext context) :
 {
     SqlTableConstraint constraint;
 }
@@ -1081,7 +1079,7 @@ void TableColumn(TableCreationContext context) :
     )
 }
 
-void Watermark(TableCreationContext context) :
+void Watermark(TableSchemaContext context) :
 {
     SqlIdentifier eventTimeColumnName;
     SqlParserPos pos;
@@ -1102,7 +1100,7 @@ void Watermark(TableCreationContext context) :
 }
 
 /** Parses {@code column_name column_data_type [...]}. */
-SqlTableColumn TypedColumn(TableCreationContext context) :
+SqlTableColumn TypedColumn(TableSchemaContext context) :
 {
     SqlTableColumn tableColumn;
     SqlIdentifier name;
@@ -1123,7 +1121,7 @@ SqlTableColumn TypedColumn(TableCreationContext context) :
 }
 
 /** Parses {@code column_name AS expr [COMMENT 'comment']}. */
-SqlTableColumn ComputedColumn(TableCreationContext context) :
+SqlTableColumn ComputedColumn(TableSchemaContext context) :
 {
     SqlIdentifier name;
     SqlParserPos pos;
@@ -1152,7 +1150,7 @@ SqlTableColumn ComputedColumn(TableCreationContext 
context) :
 }
 
 /** Parses {@code column_name column_data_type METADATA [FROM 'alias_name'] 
[VIRTUAL] [COMMENT 'comment']}. */
-SqlTableColumn MetadataColumn(TableCreationContext context, SqlIdentifier 
name, SqlDataTypeSpec type) :
+SqlTableColumn MetadataColumn(TableSchemaContext context, SqlIdentifier name, 
SqlDataTypeSpec type) :
 {
     SqlNode metadataAlias = null;
     boolean isVirtual = false;
@@ -1189,7 +1187,7 @@ SqlTableColumn MetadataColumn(TableCreationContext 
context, SqlIdentifier name,
 }
 
 /** Parses {@code column_name column_data_type [constraint] [COMMENT 
'comment']}. */
-SqlTableColumn RegularColumn(TableCreationContext context, SqlIdentifier name, 
SqlDataTypeSpec type) :
+SqlTableColumn RegularColumn(TableSchemaContext context, SqlIdentifier name, 
SqlDataTypeSpec type) :
 {
     SqlTableConstraint constraint = null;
     SqlCharStringLiteral comment = null;
@@ -1245,8 +1243,8 @@ void  
AlterTableAddPartition(AlterTableAddPartitionContext context) :
     }
 }
 
-/** Parses {@code ALTER TABLE table_name ADD/MODIFY [...]}. */
-void AlterTableAddOrModify(AlterTableContext context) :
+/** Parses {@code ALTER [MATERIALIZED ]TABLE table_name ADD/MODIFY [...]}. */
+void AlterTableAddOrModify(AlterTableSchemaContext context) :
 {
     SqlTableConstraint constraint;
 }
@@ -1263,7 +1261,7 @@ void AlterTableAddOrModify(AlterTableContext context) :
 }
 
 /** Parses {@code ADD/MODIFY column_name column_data_type [...]}. */
-void AddOrModifyColumn(AlterTableContext context) :
+void AddOrModifyColumn(AlterTableSchemaContext context) :
 {
     SqlTableColumn column;
     SqlIdentifier referencedColumn = null;
@@ -1568,7 +1566,7 @@ SqlCreate SqlCreateTable(Span s, boolean replace, boolean 
isTemporary) :
 
     tableName = CompoundIdentifier()
     [
-        <LPAREN> { pos = getPos(); TableCreationContext ctx = new 
TableCreationContext();}
+        <LPAREN> { pos = getPos(); TableSchemaContext ctx = new 
TableSchemaContext();}
         TableColumnsOrIdentifiers(pos, ctx)
         {
             pos = pos.plus(getPos());
@@ -1753,9 +1751,9 @@ SqlDrop SqlDropTable(Span s, boolean replace, boolean 
isTemporary) :
     }
 }
 
-void TableColumnsOrIdentifiers(SqlParserPos pos, TableCreationContext ctx) :
+void TableColumnsOrIdentifiers(SqlParserPos pos, TableSchemaContext ctx) :
 {
-    final TableCreationContext tempCtx = new TableCreationContext();
+    final TableSchemaContext tempCtx = new TableSchemaContext();
     final List<SqlNode> identifiers = new ArrayList<SqlNode>();
 }
 {
@@ -1812,7 +1810,7 @@ SqlNode SqlReplaceTable() :
 
     tableName = CompoundIdentifier()
     [
-        <LPAREN> { pos = getPos(); TableCreationContext ctx = new 
TableCreationContext();}
+        <LPAREN> { pos = getPos(); TableSchemaContext ctx = new 
TableSchemaContext();}
         TableColumnsOrIdentifiers(pos, ctx)
         {
             pos = getPos();
@@ -1899,7 +1897,7 @@ SqlCreate SqlCreateOrAlterMaterializedTable(Span s, 
boolean replace, boolean isT
     <TABLE>
     tableName = CompoundIdentifier()
     [
-        <LPAREN> { pos = getPos(); TableCreationContext ctx = new 
TableCreationContext();}
+        <LPAREN> { pos = getPos(); TableSchemaContext ctx = new 
TableSchemaContext();}
             TableColumnsOrIdentifiers(pos, ctx) {
                 pos = pos.plus(getPos());
                 isColumnsIdentifiersOnly = ctx.isColumnsIdentifiersOnly();
@@ -2013,6 +2011,7 @@ SqlAlterMaterializedTable SqlAlterMaterializedTable() :
     SqlNodeList partSpec = SqlNodeList.EMPTY;
     SqlNode freshness = null;
     SqlNode asQuery = null;
+    AlterTableSchemaContext ctx = new AlterTableSchemaContext();
 }
 {
     <ALTER> <MATERIALIZED> <TABLE> { startPos = getPos();}
@@ -3461,7 +3460,7 @@ SqlCreate SqlCreateModel(Span s, boolean isTemporary) :
 
     modelIdentifier = CompoundIdentifier()
     [
-        <INPUT> <LPAREN> { pos = getPos(); TableCreationContext ctx = new 
TableCreationContext();}
+        <INPUT> <LPAREN> { pos = getPos(); TableSchemaContext ctx = new 
TableSchemaContext();}
         TableColumn(ctx)
         (
             <COMMA> TableColumn(ctx)
@@ -3473,7 +3472,7 @@ SqlCreate SqlCreateModel(Span s, boolean isTemporary) :
         <RPAREN>
     ]
     [
-        <OUTPUT> <LPAREN> { pos = getPos(); TableCreationContext ctx = new 
TableCreationContext();}
+        <OUTPUT> <LPAREN> { pos = getPos(); TableSchemaContext ctx = new 
TableSchemaContext();}
         TableColumn(ctx)
         (
             <COMMA> TableColumn(ctx)
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTable.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTable.java
index 4756887ca9e..b12b3cf0297 100644
--- 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTable.java
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTable.java
@@ -23,7 +23,6 @@ import org.apache.flink.sql.parser.SqlParseUtils;
 import org.apache.calcite.sql.SqlAlter;
 import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlKind;
-import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlNodeList;
 import org.apache.calcite.sql.SqlOperator;
 import org.apache.calcite.sql.SqlSpecialOperator;
@@ -32,9 +31,7 @@ import org.apache.calcite.sql.parser.SqlParserPos;
 
 import javax.annotation.Nullable;
 
-import java.util.ArrayList;
 import java.util.LinkedHashMap;
-import java.util.List;
 
 import static java.util.Objects.requireNonNull;
 
@@ -118,9 +115,4 @@ public abstract class SqlAlterTable extends SqlAlter {
     public boolean ifTableExists() {
         return ifTableExists;
     }
-
-    /** Alter table context. */
-    public static class AlterTableContext extends 
SqlCreateTable.TableCreationContext {
-        public List<SqlNode> columnPositions = new ArrayList<>();
-    }
 }
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
index 042ff687eea..2d6769b2252 100644
--- 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
@@ -41,7 +41,6 @@ import org.apache.calcite.util.ImmutableNullableList;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
 
@@ -213,16 +212,4 @@ public class SqlCreateTable extends SqlCreateObject 
implements ExtendedSqlNode {
         SqlUnparseUtils.unparsePartitionKeyList(partitionKeyList, writer, 
leftPrec, rightPrec);
         SqlUnparseUtils.unparseProperties(properties, writer, leftPrec, 
rightPrec);
     }
-
-    /** Table creation context. */
-    public static class TableCreationContext {
-        public List<SqlNode> columnList = new ArrayList<>();
-        public List<SqlTableConstraint> constraints = new ArrayList<>();
-        @Nullable public SqlWatermark watermark;
-        @Nullable public SqlDistribution distribution;
-
-        public boolean isColumnsIdentifiersOnly() {
-            return !columnList.isEmpty() && columnList.get(0) instanceof 
SqlIdentifier;
-        }
-    }
 }
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/TableSchemaContext.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/TableSchemaContext.java
new file mode 100644
index 00000000000..c3269e2a327
--- /dev/null
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/TableSchemaContext.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;
+
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNode;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** Table schema creation context. */
+public class TableSchemaContext {
+    public List<SqlNode> columnList = new ArrayList<>();
+    public List<SqlTableConstraint> constraints = new ArrayList<>();
+    @Nullable public SqlWatermark watermark;
+
+    public boolean isColumnsIdentifiersOnly() {
+        return !columnList.isEmpty() && columnList.get(0) instanceof 
SqlIdentifier;
+    }
+
+    /** Alter table context. */
+    public static class AlterTableSchemaContext extends TableSchemaContext {
+        public List<SqlNode> columnPositions = new ArrayList<>();
+    }
+}

Reply via email to