This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ede7c2c1e1620861839e16a98bf5953311b84e28
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Mon Nov 24 12:07:02 2025 +0100

    [FLINK-38712][table] Move away distribution from table schema
---
 .../src/main/codegen/data/Parser.tdd               |  2 +
 .../src/main/codegen/includes/parserImpls.ftl      | 14 ++--
 .../flink/sql/parser/ddl/SqlAlterDistribution.java | 89 ++++++++++++++++++++++
 .../flink/sql/parser/ddl/SqlAlterTableAdd.java     | 11 +--
 .../flink/sql/parser/ddl/SqlAlterTableModify.java  | 11 +--
 .../flink/sql/parser/ddl/SqlAlterTableSchema.java  | 17 ++---
 .../operations/converters/SqlNodeConverters.java   |  4 +
 .../table/AbstractAlterTableConverter.java         | 37 +--------
 ... => SqlAlterTableAddDistributionConverter.java} | 48 +++++++-----
 .../SqlAlterTableDropDistributionConverter.java    |  2 +-
 ... SqlAlterTableModifyDistributionConverter.java} | 47 +++++++-----
 .../table/SqlAlterTableOptionsConverter.java       |  2 +-
 .../table/SqlAlterTableRenameConverter.java        |  2 +-
 .../table/SqlAlterTableResetConverter.java         |  2 +-
 .../table/SqlAlterTableSchemaConverter.java        |  1 -
 15 files changed, 174 insertions(+), 115 deletions(-)

diff --git a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd 
b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
index 748b0102449..27e84f755bf 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
+++ b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
@@ -31,6 +31,8 @@
     "org.apache.flink.sql.parser.ddl.resource.SqlResource"
     "org.apache.flink.sql.parser.ddl.resource.SqlResourceType"
     "org.apache.flink.sql.parser.ddl.SqlAddJar"
+    "org.apache.flink.sql.parser.ddl.SqlAlterDistribution.SqlAddDistribution"
+    
"org.apache.flink.sql.parser.ddl.SqlAlterDistribution.SqlModifyDistribution"
     "org.apache.flink.sql.parser.ddl.SqlAddPartitions"
     
"org.apache.flink.sql.parser.ddl.SqlAddPartitions.AlterTableAddPartitionContext"
     "org.apache.flink.sql.parser.ddl.SqlAlterCatalog"
diff --git 
a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl 
b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
index ac40ec19148..df8ce03e0f0 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
@@ -923,6 +923,10 @@ SqlAlterTable SqlAlterTable() :
             }
         |
         (
+            <DISTRIBUTION>
+            ctx.distribution = SqlDistribution(getPos())
+            {return new SqlAddDistribution(getPos(), tableIdentifier, 
ctx.distribution);}
+        |
             AlterTableAddOrModify(ctx)
         |
             <LPAREN>
@@ -939,13 +943,16 @@ SqlAlterTable SqlAlterTable() :
                         new SqlNodeList(ctx.columnPositions, 
startPos.plus(getPos())),
                         ctx.constraints,
                         ctx.watermark,
-                        ctx.distribution,
                         ifExists);
         }
         )
     |
         <MODIFY>
         (
+            <DISTRIBUTION>
+            ctx.distribution = SqlDistribution(getPos())
+            {return new SqlModifyDistribution(getPos(), tableIdentifier, 
ctx.distribution);}
+        |
             AlterTableAddOrModify(ctx)
         |
             <LPAREN>
@@ -962,10 +969,8 @@ SqlAlterTable SqlAlterTable() :
                         new SqlNodeList(ctx.columnPositions, 
startPos.plus(getPos())),
                         ctx.constraints,
                         ctx.watermark,
-                        ctx.distribution,
                         ifExists);
         }
-
     |
      <DROP>
         (
@@ -1254,9 +1259,6 @@ void AlterTableAddOrModify(AlterTableContext context) :
         }
     |
         Watermark(context)
-    |
-        <DISTRIBUTION>
-        context.distribution = SqlDistribution(getPos())
     )
 }
 
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterDistribution.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterDistribution.java
new file mode 100644
index 00000000000..e8696f8b1c5
--- /dev/null
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterDistribution.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** ALTER Distribution DDL sql call for tables and materialized tables. */
+public abstract class SqlAlterDistribution extends SqlAlterTable {
+
+    private final SqlDistribution distribution;
+
+    public SqlAlterDistribution(
+            SqlParserPos pos, SqlIdentifier tableName, SqlDistribution 
distribution) {
+        super(pos, tableName, false);
+        this.distribution = distribution;
+    }
+
+    protected abstract String getAlterOperation();
+
+    public SqlDistribution getDistribution() {
+        return distribution;
+    }
+
+    @Nonnull
+    @Override
+    public List<SqlNode> getOperandList() {
+        List<SqlNode> operands = new ArrayList<>();
+        operands.add(tableIdentifier);
+        operands.add(distribution);
+        return operands;
+    }
+
+    @Override
+    public void unparseAlterOperation(SqlWriter writer, int leftPrec, int 
rightPrec) {
+        super.unparseAlterOperation(writer, leftPrec, rightPrec);
+        writer.keyword(getAlterOperation());
+        distribution.unparseAlter(writer, leftPrec, rightPrec);
+    }
+
+    public static class SqlAddDistribution extends SqlAlterDistribution {
+
+        public SqlAddDistribution(
+                SqlParserPos pos, SqlIdentifier tableName, SqlDistribution 
distribution) {
+            super(pos, tableName, distribution);
+        }
+
+        @Override
+        protected String getAlterOperation() {
+            return "ADD";
+        }
+    }
+
+    public static class SqlModifyDistribution extends SqlAlterDistribution {
+
+        public SqlModifyDistribution(
+                SqlParserPos pos, SqlIdentifier tableName, SqlDistribution 
distribution) {
+            super(pos, tableName, distribution);
+        }
+
+        @Override
+        protected String getAlterOperation() {
+            return "MODIFY";
+        }
+    }
+}
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableAdd.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableAdd.java
index bd83a9c233b..96d7676b41c 100644
--- 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableAdd.java
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableAdd.java
@@ -22,7 +22,6 @@ import 
org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;
 
 import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlNodeList;
-import org.apache.calcite.sql.SqlWriter;
 import org.apache.calcite.sql.parser.SqlParserPos;
 
 import javax.annotation.Nullable;
@@ -56,16 +55,12 @@ public class SqlAlterTableAdd extends SqlAlterTableSchema {
             SqlNodeList addedColumns,
             List<SqlTableConstraint> constraint,
             @Nullable SqlWatermark sqlWatermark,
-            @Nullable SqlDistribution distribution,
             boolean ifTableExists) {
-        super(pos, tableName, addedColumns, constraint, sqlWatermark, 
distribution, ifTableExists);
+        super(pos, tableName, addedColumns, constraint, sqlWatermark, 
ifTableExists);
     }
 
     @Override
-    public void unparseAlterOperation(SqlWriter writer, int leftPrec, int 
rightPrec) {
-        super.unparseAlterOperation(writer, leftPrec, rightPrec);
-        writer.keyword("ADD");
-        // unparse table schema and distribution
-        unparseSchemaAndDistribution(writer, leftPrec, rightPrec);
+    protected String getAlterOperation() {
+        return "ADD";
     }
 }
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableModify.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableModify.java
index 2a5fa550909..a06277ff31b 100644
--- 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableModify.java
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableModify.java
@@ -22,7 +22,6 @@ import 
org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;
 
 import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlNodeList;
-import org.apache.calcite.sql.SqlWriter;
 import org.apache.calcite.sql.parser.SqlParserPos;
 
 import javax.annotation.Nullable;
@@ -56,16 +55,12 @@ public class SqlAlterTableModify extends 
SqlAlterTableSchema {
             SqlNodeList modifiedColumns,
             List<SqlTableConstraint> constraints,
             @Nullable SqlWatermark watermark,
-            @Nullable SqlDistribution distribution,
             boolean ifTableExists) {
-        super(pos, tableName, modifiedColumns, constraints, watermark, 
distribution, ifTableExists);
+        super(pos, tableName, modifiedColumns, constraints, watermark, 
ifTableExists);
     }
 
     @Override
-    public void unparseAlterOperation(SqlWriter writer, int leftPrec, int 
rightPrec) {
-        super.unparseAlterOperation(writer, leftPrec, rightPrec);
-        writer.keyword("MODIFY");
-        // unparse table schema and distribution
-        unparseSchemaAndDistribution(writer, leftPrec, rightPrec);
+    protected String getAlterOperation() {
+        return "MODIFY";
     }
 }
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableSchema.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableSchema.java
index 746b06ecce3..761b64a3a76 100644
--- 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableSchema.java
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableSchema.java
@@ -44,7 +44,6 @@ public abstract class SqlAlterTableSchema extends 
SqlAlterTable implements Exten
 
     protected final SqlNodeList columnList;
     @Nullable protected final SqlWatermark watermark;
-    @Nullable protected final SqlDistribution distribution;
     protected final List<SqlTableConstraint> constraints;
 
     public SqlAlterTableSchema(
@@ -53,12 +52,10 @@ public abstract class SqlAlterTableSchema extends 
SqlAlterTable implements Exten
             SqlNodeList columnList,
             List<SqlTableConstraint> constraints,
             @Nullable SqlWatermark sqlWatermark,
-            @Nullable SqlDistribution distribution,
             boolean ifTableExists) {
         super(pos, tableName, ifTableExists);
         this.columnList = columnList;
         this.constraints = constraints;
-        this.distribution = distribution;
         this.watermark = sqlWatermark;
     }
 
@@ -85,10 +82,6 @@ public abstract class SqlAlterTableSchema extends 
SqlAlterTable implements Exten
         return Optional.ofNullable(watermark);
     }
 
-    public Optional<SqlDistribution> getDistribution() {
-        return Optional.ofNullable(distribution);
-    }
-
     public List<SqlTableConstraint> getConstraints() {
         return constraints;
     }
@@ -107,11 +100,13 @@ public abstract class SqlAlterTableSchema extends 
SqlAlterTable implements Exten
                 SqlParserPos.ZERO);
     }
 
-    void unparseSchemaAndDistribution(SqlWriter writer, int leftPrec, int 
rightPrec) {
+    protected abstract String getAlterOperation();
+
+    @Override
+    public void unparseAlterOperation(SqlWriter writer, int leftPrec, int 
rightPrec) {
+        super.unparseAlterOperation(writer, leftPrec, rightPrec);
+        writer.keyword(getAlterOperation());
         SqlUnparseUtils.unparseTableSchema(
                 columnList, constraints, watermark, writer, leftPrec, 
rightPrec);
-        if (distribution != null) {
-            distribution.unparseAlter(writer, leftPrec, rightPrec);
-        }
     }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java
index ca305b3dd0a..bd7c1e45962 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.table.operations.Operation;
 import 
org.apache.flink.table.planner.operations.converters.SqlNodeConverter.ConvertContext;
+import 
org.apache.flink.table.planner.operations.converters.table.SqlAlterTableAddDistributionConverter;
 import 
org.apache.flink.table.planner.operations.converters.table.SqlAlterTableAddPartitionConverter;
 import 
org.apache.flink.table.planner.operations.converters.table.SqlAlterTableDropColumnConverter;
 import 
org.apache.flink.table.planner.operations.converters.table.SqlAlterTableDropConstraintConverter;
@@ -29,6 +30,7 @@ import 
org.apache.flink.table.planner.operations.converters.table.SqlAlterTableD
 import 
org.apache.flink.table.planner.operations.converters.table.SqlAlterTableDropPartitionConverter;
 import 
org.apache.flink.table.planner.operations.converters.table.SqlAlterTableDropPrimaryKeyConverter;
 import 
org.apache.flink.table.planner.operations.converters.table.SqlAlterTableDropWatermarkConverter;
+import 
org.apache.flink.table.planner.operations.converters.table.SqlAlterTableModifyDistributionConverter;
 import 
org.apache.flink.table.planner.operations.converters.table.SqlAlterTableOptionsConverter;
 import 
org.apache.flink.table.planner.operations.converters.table.SqlAlterTableRenameColumnConverter;
 import 
org.apache.flink.table.planner.operations.converters.table.SqlAlterTableRenameConverter;
@@ -106,6 +108,8 @@ public class SqlNodeConverters {
         register(new SqlAlterTableDropConstraintConverter());
         register(new SqlAlterTableDropWatermarkConverter());
         register(new SqlAlterTableDropDistributionConverter());
+        register(new SqlAlterTableAddDistributionConverter());
+        register(new SqlAlterTableModifyDistributionConverter());
         register(new SqlAlterTableRenameColumnConverter());
         register(new SqlAlterTableResetConverter());
         register(new SqlAlterTableOptionsConverter());
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/AbstractAlterTableConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/AbstractAlterTableConverter.java
index 4341fd96d64..55577c19d42 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/AbstractAlterTableConverter.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/AbstractAlterTableConverter.java
@@ -19,7 +19,6 @@
 package org.apache.flink.table.planner.operations.converters.table;
 
 import org.apache.flink.sql.parser.ddl.SqlAlterTable;
-import org.apache.flink.sql.parser.ddl.SqlAlterTableSchema;
 import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.CatalogBaseTable;
@@ -36,12 +35,10 @@ import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.ddl.AlterTableChangeOperation;
 import org.apache.flink.table.operations.utils.ValidationUtils;
 import org.apache.flink.table.planner.operations.converters.SqlNodeConverter;
-import org.apache.flink.table.planner.utils.OperationConverterUtils;
 
 import org.apache.calcite.sql.SqlIdentifier;
 
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
 
 /** Abstract class for ALTER TABLE converters. */
@@ -52,31 +49,10 @@ public abstract class AbstractAlterTableConverter<T extends 
SqlAlterTable>
     protected abstract Operation convertToOperation(
             T sqlAlterTable, ResolvedCatalogTable oldTable, ConvertContext 
context);
 
-    protected final Schema getOldSchema(ResolvedCatalogTable 
resolvedCatalogTable) {
-        return resolvedCatalogTable.getUnresolvedSchema();
-    }
-
-    protected final TableDistribution getOldTableDistribution(
-            ResolvedCatalogTable resolvedCatalogTable) {
-        return resolvedCatalogTable.getDistribution().orElse(null);
-    }
-
-    protected final List<String> getOldPartitionKeys(ResolvedCatalogTable 
resolvedCatalogTable) {
-        return resolvedCatalogTable.getPartitionKeys();
-    }
-
-    protected final String getOldComment(ResolvedCatalogTable 
resolvedCatalogTable) {
-        return resolvedCatalogTable.getComment();
-    }
-
-    protected final Map<String, String> getOldOptions(ResolvedCatalogTable 
resolvedCatalogTable) {
-        return resolvedCatalogTable.getOptions();
-    }
-
     @Override
     public final Operation convertSqlNode(T sqlAlterTable, ConvertContext 
context) {
         CatalogManager catalogManager = context.getCatalogManager();
-        final ObjectIdentifier tableIdentifier = getIdentifier(sqlAlterTable, 
context);
+        final ObjectIdentifier tableIdentifier = 
resolveIdentifier(sqlAlterTable, context);
         Optional<ContextResolvedTable> optionalCatalogTable =
                 catalogManager.getTable(tableIdentifier);
 
@@ -133,22 +109,13 @@ public abstract class AbstractAlterTableConverter<T 
extends SqlAlterTable>
         return identifier.getSimple();
     }
 
-    protected final ObjectIdentifier getIdentifier(SqlAlterTable node, 
ConvertContext context) {
+    protected final ObjectIdentifier resolveIdentifier(SqlAlterTable node, 
ConvertContext context) {
         UnresolvedIdentifier unresolvedIdentifier = 
UnresolvedIdentifier.of(node.fullTableName());
         return 
context.getCatalogManager().qualifyIdentifier(unresolvedIdentifier);
     }
 
     protected TableDistribution getTableDistribution(
             SqlAlterTable alterTable, ResolvedCatalogTable oldTable) {
-        if (alterTable instanceof SqlAlterTableSchema) {
-            final Optional<TableDistribution> tableDistribution =
-                    ((SqlAlterTableSchema) alterTable)
-                            .getDistribution()
-                            
.map(OperationConverterUtils::getDistributionFromSqlDistribution);
-            if (tableDistribution.isPresent()) {
-                return tableDistribution.get();
-            }
-        }
         return oldTable.getDistribution().orElse(null);
     }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/SqlAlterTableDropDistributionConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/SqlAlterTableAddDistributionConverter.java
similarity index 54%
copy from 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/SqlAlterTableDropDistributionConverter.java
copy to 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/SqlAlterTableAddDistributionConverter.java
index 1e829089bc2..3a40b8cfd31 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/SqlAlterTableDropDistributionConverter.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/SqlAlterTableAddDistributionConverter.java
@@ -18,44 +18,50 @@
 
 package org.apache.flink.table.planner.operations.converters.table;
 
+import org.apache.flink.sql.parser.ddl.SqlAlterDistribution;
+import org.apache.flink.sql.parser.ddl.SqlAlterDistribution.SqlAddDistribution;
 import org.apache.flink.sql.parser.ddl.SqlAlterTable;
-import org.apache.flink.sql.parser.ddl.SqlAlterTableDropDistribution;
 import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ResolvedCatalogTable;
 import org.apache.flink.table.catalog.TableChange;
 import org.apache.flink.table.catalog.TableDistribution;
 import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.planner.utils.OperationConverterUtils;
 
 import java.util.List;
+import java.util.Optional;
+
+/** A converter for {@link SqlAlterDistribution} for ADD call. */
+public class SqlAlterTableAddDistributionConverter
+        extends AbstractAlterTableConverter<SqlAddDistribution> {
 
-/** Convert ALTER TABLE DROP DISTRIBUTION statement. */
-public class SqlAlterTableDropDistributionConverter
-        extends AbstractAlterTableConverter<SqlAlterTableDropDistribution> {
     @Override
     protected Operation convertToOperation(
-            SqlAlterTableDropDistribution sqlAlterTable,
-            ResolvedCatalogTable resolvedCatalogTable,
+            SqlAddDistribution sqlAddDistribution,
+            ResolvedCatalogTable oldTable,
             ConvertContext context) {
-        final ObjectIdentifier tableIdentifier = getIdentifier(sqlAlterTable, 
context);
-        if (resolvedCatalogTable.getDistribution().isEmpty()) {
-            throw new ValidationException(
-                    String.format(
-                            "Table %s does not have a distribution to drop.", 
tableIdentifier));
-        }
-
-        List<TableChange> tableChanges = 
List.of(TableChange.dropDistribution());
         return buildAlterTableChangeOperation(
-                sqlAlterTable,
-                tableChanges,
-                resolvedCatalogTable.getUnresolvedSchema(),
-                resolvedCatalogTable,
+                sqlAddDistribution,
+                List.of(
+                        TableChange.add(
+                                
OperationConverterUtils.getDistributionFromSqlDistribution(
+                                        
sqlAddDistribution.getDistribution()))),
+                oldTable.getUnresolvedSchema(),
+                oldTable,
                 context.getCatalogManager());
     }
 
-    @Override
     protected TableDistribution getTableDistribution(
             SqlAlterTable alterTable, ResolvedCatalogTable oldTable) {
-        return null;
+        Optional<TableDistribution> oldDistribution = 
oldTable.getDistribution();
+        if (oldDistribution.isPresent()) {
+            throw new ValidationException(
+                    String.format(
+                            "%sThe base table has already defined the 
distribution `%s`. "
+                                    + "You can modify it or drop it before 
adding a new one.",
+                            EX_MSG_PREFIX, oldDistribution.get()));
+        }
+        return OperationConverterUtils.getDistributionFromSqlDistribution(
+                ((SqlAlterDistribution) alterTable).getDistribution());
     }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/SqlAlterTableDropDistributionConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/SqlAlterTableDropDistributionConverter.java
index 1e829089bc2..aef53794542 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/SqlAlterTableDropDistributionConverter.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/SqlAlterTableDropDistributionConverter.java
@@ -37,7 +37,7 @@ public class SqlAlterTableDropDistributionConverter
             SqlAlterTableDropDistribution sqlAlterTable,
             ResolvedCatalogTable resolvedCatalogTable,
             ConvertContext context) {
-        final ObjectIdentifier tableIdentifier = getIdentifier(sqlAlterTable, 
context);
+        final ObjectIdentifier tableIdentifier = 
resolveIdentifier(sqlAlterTable, context);
         if (resolvedCatalogTable.getDistribution().isEmpty()) {
             throw new ValidationException(
                     String.format(
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/SqlAlterTableDropDistributionConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/SqlAlterTableModifyDistributionConverter.java
similarity index 55%
copy from 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/SqlAlterTableDropDistributionConverter.java
copy to 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/SqlAlterTableModifyDistributionConverter.java
index 1e829089bc2..0afa279df2e 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/SqlAlterTableDropDistributionConverter.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/SqlAlterTableModifyDistributionConverter.java
@@ -18,44 +18,49 @@
 
 package org.apache.flink.table.planner.operations.converters.table;
 
+import org.apache.flink.sql.parser.ddl.SqlAlterDistribution;
+import 
org.apache.flink.sql.parser.ddl.SqlAlterDistribution.SqlModifyDistribution;
 import org.apache.flink.sql.parser.ddl.SqlAlterTable;
-import org.apache.flink.sql.parser.ddl.SqlAlterTableDropDistribution;
 import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ResolvedCatalogTable;
 import org.apache.flink.table.catalog.TableChange;
 import org.apache.flink.table.catalog.TableDistribution;
 import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.planner.utils.OperationConverterUtils;
 
 import java.util.List;
+import java.util.Optional;
+
+/** A converter for {@link SqlAlterDistribution} for MODIFY call. */
+public class SqlAlterTableModifyDistributionConverter
+        extends AbstractAlterTableConverter<SqlModifyDistribution> {
 
-/** Convert ALTER TABLE DROP DISTRIBUTION statement. */
-public class SqlAlterTableDropDistributionConverter
-        extends AbstractAlterTableConverter<SqlAlterTableDropDistribution> {
     @Override
     protected Operation convertToOperation(
-            SqlAlterTableDropDistribution sqlAlterTable,
-            ResolvedCatalogTable resolvedCatalogTable,
+            SqlModifyDistribution sqlModifyDistribution,
+            ResolvedCatalogTable oldTable,
             ConvertContext context) {
-        final ObjectIdentifier tableIdentifier = getIdentifier(sqlAlterTable, 
context);
-        if (resolvedCatalogTable.getDistribution().isEmpty()) {
-            throw new ValidationException(
-                    String.format(
-                            "Table %s does not have a distribution to drop.", 
tableIdentifier));
-        }
-
-        List<TableChange> tableChanges = 
List.of(TableChange.dropDistribution());
         return buildAlterTableChangeOperation(
-                sqlAlterTable,
-                tableChanges,
-                resolvedCatalogTable.getUnresolvedSchema(),
-                resolvedCatalogTable,
+                sqlModifyDistribution,
+                List.of(
+                        TableChange.modify(
+                                
OperationConverterUtils.getDistributionFromSqlDistribution(
+                                        
sqlModifyDistribution.getDistribution()))),
+                oldTable.getUnresolvedSchema(),
+                oldTable,
                 context.getCatalogManager());
     }
 
-    @Override
     protected TableDistribution getTableDistribution(
             SqlAlterTable alterTable, ResolvedCatalogTable oldTable) {
-        return null;
+        Optional<TableDistribution> oldDistribution = 
oldTable.getDistribution();
+        if (oldDistribution.isEmpty()) {
+            throw new ValidationException(
+                    String.format(
+                            "%sThe base table does not define any 
distribution. You might want to add a new one.",
+                            EX_MSG_PREFIX));
+        }
+        return OperationConverterUtils.getDistributionFromSqlDistribution(
+                ((SqlAlterDistribution) alterTable).getDistribution());
     }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/SqlAlterTableOptionsConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/SqlAlterTableOptionsConverter.java
index a0a8156f29a..6f86b40bd69 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/SqlAlterTableOptionsConverter.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/SqlAlterTableOptionsConverter.java
@@ -45,7 +45,7 @@ public class SqlAlterTableOptionsConverter
             SqlAlterTableOptions alterTableOptions,
             ResolvedCatalogTable oldTable,
             ConvertContext context) {
-        final ObjectIdentifier tableIdentifier = 
getIdentifier(alterTableOptions, context);
+        final ObjectIdentifier tableIdentifier = 
resolveIdentifier(alterTableOptions, context);
         final Map<String, String> partitionKVs = 
alterTableOptions.getPartitionKVs();
         // it's altering partitions
         if (partitionKVs != null) {
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/SqlAlterTableRenameConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/SqlAlterTableRenameConverter.java
index e8048b519a7..b8fa969f381 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/SqlAlterTableRenameConverter.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/SqlAlterTableRenameConverter.java
@@ -35,7 +35,7 @@ public class SqlAlterTableRenameConverter extends 
AbstractAlterTableConverter<Sq
             SqlAlterTableRename sqlAlterTable,
             ResolvedCatalogTable oldTable,
             ConvertContext context) {
-        final ObjectIdentifier tableIdentifier = getIdentifier(sqlAlterTable, 
context);
+        final ObjectIdentifier tableIdentifier = 
resolveIdentifier(sqlAlterTable, context);
         UnresolvedIdentifier newUnresolvedIdentifier =
                 UnresolvedIdentifier.of(sqlAlterTable.fullNewTableName());
         ObjectIdentifier newTableIdentifier =
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/SqlAlterTableResetConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/SqlAlterTableResetConverter.java
index 3046d394301..9781a0de7b4 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/SqlAlterTableResetConverter.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/SqlAlterTableResetConverter.java
@@ -39,7 +39,7 @@ public class SqlAlterTableResetConverter extends 
AbstractAlterTableConverter<Sql
             SqlAlterTableReset alterTableReset,
             ResolvedCatalogTable oldTable,
             ConvertContext context) {
-        final ObjectIdentifier tableIdentifier = 
getIdentifier(alterTableReset, context);
+        final ObjectIdentifier tableIdentifier = 
resolveIdentifier(alterTableReset, context);
         Map<String, String> newOptions = new HashMap<>(oldTable.getOptions());
         // reset empty or 'connector' key is not allowed
         Set<String> resetKeys = alterTableReset.getResetKeys();
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/SqlAlterTableSchemaConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/SqlAlterTableSchemaConverter.java
index ffe89cca55d..51be19ad381 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/SqlAlterTableSchemaConverter.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/SqlAlterTableSchemaConverter.java
@@ -35,7 +35,6 @@ public abstract class SqlAlterTableSchemaConverter<T extends 
SqlAlterTableSchema
         SchemaConverter converter = createSchemaConverter(alterTableSchema, 
oldTable, context);
         
converter.updateColumn(alterTableSchema.getColumnPositions().getList());
         alterTableSchema.getWatermark().ifPresent(converter::updateWatermark);
-        
alterTableSchema.getDistribution().ifPresent(converter::updateDistribution);
         
alterTableSchema.getFullConstraint().ifPresent(converter::updatePrimaryKey);
 
         return buildAlterTableChangeOperation(

Reply via email to