This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 30b681087a1 [FLINK-38177][table] `ALTER TABLE` .. `MODIFY WATERMARK` 
might reset current distribution
30b681087a1 is described below

commit 30b681087a1b0b7e2d0abd6ec773007dbd8cda7a
Author: Sergey Nuyanzin <snuyan...@gmail.com>
AuthorDate: Thu Jul 31 19:19:15 2025 +0200

    [FLINK-38177][table] `ALTER TABLE` .. `MODIFY WATERMARK` might reset 
current distribution
---
 .../planner/operations/AlterSchemaConverter.java   |  28 ++--
 .../operations/SqlDdlToOperationConverterTest.java | 169 ++++++++++++---------
 2 files changed, 111 insertions(+), 86 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java
index eea0cde882f..02480adb00a 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java
@@ -979,29 +979,35 @@ public class AlterSchemaConverter {
         oldColumn.getComment().ifPresent(builder::withComment);
     }
 
+    private TableDistribution getTableDistribution(
+            SqlAlterTable alterTable, ResolvedCatalogTable oldTable) {
+        if (alterTable instanceof SqlAlterTableSchema) {
+            final Optional<TableDistribution> tableDistribution =
+                    ((SqlAlterTableSchema) alterTable)
+                            .getDistribution()
+                            
.map(OperationConverterUtils::getDistributionFromSqlDistribution);
+            if (tableDistribution.isPresent()) {
+                return tableDistribution.get();
+            }
+        }
+        return oldTable.getDistribution().orElse(null);
+    }
+
     private Operation buildAlterTableChangeOperation(
             SqlAlterTable alterTable,
             List<TableChange> tableChanges,
             Schema newSchema,
             ResolvedCatalogTable oldTable) {
+        final TableDistribution tableDistribution = 
getTableDistribution(alterTable, oldTable);
+
         CatalogTable.Builder builder =
                 CatalogTable.newBuilder()
                         .schema(newSchema)
                         .comment(oldTable.getComment())
                         .partitionKeys(oldTable.getPartitionKeys())
+                        .distribution(tableDistribution)
                         .options(oldTable.getOptions());
 
-        if (alterTable instanceof SqlAlterTableSchema) {
-            ((SqlAlterTableSchema) alterTable)
-                    .getDistribution()
-                    .ifPresent(
-                            distribution ->
-                                    builder.distribution(
-                                            OperationConverterUtils
-                                                    
.getDistributionFromSqlDistribution(
-                                                            distribution)));
-        }
-
         return new AlterTableChangeOperation(
                 catalogManager.qualifyIdentifier(
                         UnresolvedIdentifier.of(alterTable.fullTableName())),
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
index f3282fa2127..da91a54060f 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
@@ -102,10 +102,10 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Test cases for the DDL statements for {@link 
SqlNodeToOperationConversion}. */
-public class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversionTestBase {
+class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversionTestBase {
 
     @Test
-    public void testAlterCatalog() {
+    void testAlterCatalog() {
         // test alter catalog options
         final String sql1 = "ALTER CATALOG cat2 SET ('K1' = 'V1', 'k2' = 'v2', 
'k2' = 'v2_new')";
         final Map<String, String> expectedOptions = new HashMap<>();
@@ -160,7 +160,7 @@ public class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversion
     }
 
     @Test
-    public void testCreateDatabase() {
+    void testCreateDatabase() {
         final String[] createDatabaseSqls =
                 new String[] {
                     "create database db1",
@@ -200,7 +200,7 @@ public class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversion
     }
 
     @Test
-    public void testDropDatabase() {
+    void testDropDatabase() {
         final String[] dropDatabaseSqls =
                 new String[] {
                     "drop database db1",
@@ -225,7 +225,7 @@ public class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversion
     }
 
     @Test
-    public void testAlterDatabase() throws Exception {
+    void testAlterDatabase() throws Exception {
         catalogManager.registerCatalog("cat1", new 
GenericInMemoryCatalog("default", "default"));
         catalogManager.createDatabase(
                 "cat1", "db1", new CatalogDatabaseImpl(new HashMap<>(), 
"db1_comment"), true);
@@ -246,7 +246,7 @@ public class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversion
     }
 
     @Test
-    public void testCreateTable() {
+    void testCreateTable() {
         final String sql =
                 "CREATE TABLE tbl1 (\n"
                         + "  a bigint comment 'column a',\n"
@@ -286,7 +286,7 @@ public class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversion
     }
 
     @Test
-    public void testCreateTableWithPrimaryKey() {
+    void testCreateTableWithPrimaryKey() {
         final String sql =
                 "CREATE TABLE tbl1 (\n"
                         + "  a bigint,\n"
@@ -323,7 +323,7 @@ public class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversion
     }
 
     @Test
-    public void testPrimaryKeyOnGeneratedColumn() {
+    void testPrimaryKeyOnGeneratedColumn() {
         final String sql =
                 "CREATE TABLE tbl1 (\n"
                         + "  a bigint not null,\n"
@@ -343,7 +343,7 @@ public class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversion
     }
 
     @Test
-    public void testPrimaryKeyNonExistentColumn() {
+    void testPrimaryKeyNonExistentColumn() {
         final String sql =
                 "CREATE TABLE tbl1 (\n"
                         + "  a bigint not null,\n"
@@ -361,7 +361,7 @@ public class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversion
     }
 
     @Test
-    public void testCreateTableWithMinusInOptionKey() {
+    void testCreateTableWithMinusInOptionKey() {
         final String sql =
                 "create table source_table(\n"
                         + "  a int,\n"
@@ -394,7 +394,7 @@ public class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversion
     }
 
     @Test
-    public void testCreateTableWithWatermark()
+    void testCreateTableWithWatermark()
             throws FunctionAlreadyExistException, DatabaseNotExistException {
         CatalogFunction cf =
                 new 
CatalogFunctionImpl(JavaUserDefinedScalarFunctions.JavaFunc5.class.getName());
@@ -436,7 +436,7 @@ public class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversion
     }
 
     @Test
-    public void testBasicCreateTableLike() {
+    void testBasicCreateTableLike() {
         Map<String, String> sourceProperties = new HashMap<>();
         sourceProperties.put("format.type", "json");
         CatalogTable catalogTable =
@@ -483,7 +483,7 @@ public class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversion
     }
 
     @Test
-    public void testCreateTableLikeWithFullPath() {
+    void testCreateTableLikeWithFullPath() {
         Map<String, String> sourceProperties = new HashMap<>();
         sourceProperties.put("connector.type", "kafka");
         sourceProperties.put("format.type", "json");
@@ -516,7 +516,7 @@ public class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversion
     }
 
     @Test
-    public void testMergingCreateTableLike() {
+    void testMergingCreateTableLike() {
         Map<String, String> sourceProperties = new HashMap<>();
         sourceProperties.put("format.type", "json");
         CatalogTable catalogTable =
@@ -571,7 +571,7 @@ public class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversion
     }
 
     @Test
-    public void testMergingCreateTableLikeExcludingDistribution() {
+    void testMergingCreateTableLikeExcludingDistribution() {
         Map<String, String> sourceProperties = new HashMap<>();
         sourceProperties.put("format.type", "json");
         CatalogTable catalogTable =
@@ -630,7 +630,7 @@ public class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversion
     }
 
     @Test
-    public void testCreateTableValidDistribution() {
+    void testCreateTableValidDistribution() {
         final String sql =
                 "create table derivedTable(\n" + "  a int\n" + ")\n" + 
"DISTRIBUTED BY (a)";
         Operation operation = parseAndConvert(sql);
@@ -644,7 +644,7 @@ public class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversion
     }
 
     @Test
-    public void testCreateTableInvalidDistribution() {
+    void testCreateTableInvalidDistribution() {
         final String sql =
                 "create table derivedTable(\n" + "  a int\n" + ")\n" + 
"DISTRIBUTED BY (f3)";
 
@@ -655,7 +655,7 @@ public class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversion
     }
 
     @Test
-    public void tesCreateTableAsWithOrderingColumns() {
+    void testCreateTableAsWithOrderingColumns() {
         CatalogTable catalogTable =
                 CatalogTable.newBuilder()
                         .schema(
@@ -689,7 +689,7 @@ public class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversion
     }
 
     @Test
-    public void testCreateTableInvalidPartition() {
+    void testCreateTableInvalidPartition() {
         final String sql =
                 "create table derivedTable(\n" + "  a int\n" + ")\n" + 
"PARTITIONED BY (f3)";
 
@@ -700,7 +700,7 @@ public class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversion
     }
 
     @Test
-    public void testCreateTableLikeInvalidPartition() {
+    void testCreateTableLikeInvalidPartition() {
         CatalogTable catalogTable =
                 CatalogTable.newBuilder()
                         .schema(Schema.newBuilder().column("f0", 
DataTypes.INT().notNull()).build())
@@ -722,7 +722,7 @@ public class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversion
     }
 
     @Test
-    public void testCreateTableInvalidWatermark() {
+    void testCreateTableInvalidWatermark() {
         final String sql =
                 "create table derivedTable(\n"
                         + "  a int,\n"
@@ -738,7 +738,7 @@ public class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversion
     }
 
     @Test
-    public void testCreateTableLikeInvalidWatermark() {
+    void testCreateTableLikeInvalidWatermark() {
         CatalogTable catalogTable =
                 CatalogTable.newBuilder()
                         .schema(Schema.newBuilder().column("f0", 
DataTypes.INT().notNull()).build())
@@ -762,7 +762,7 @@ public class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversion
     }
 
     @Test
-    public void testCreateTableLikeNestedWatermark() {
+    void testCreateTableLikeNestedWatermark() {
         CatalogTable catalogTable =
                 CatalogTable.newBuilder()
                         .schema(
@@ -795,7 +795,7 @@ public class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversion
     }
 
     @Test // TODO: tweak the tests when FLINK-13604 is fixed.
-    public void testCreateTableWithFullDataTypes() {
+    void testCreateTableWithFullDataTypes() {
         final List<TestItem> testItems =
                 Arrays.asList(
                         createTestItem("CHAR", DataTypes.CHAR(1)),
@@ -949,7 +949,7 @@ public class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversion
     }
 
     @Test
-    public void testCreateTableWithComputedColumn() {
+    void testCreateTableWithComputedColumn() {
         final String sql =
                 "CREATE TABLE tbl1 (\n"
                         + "  a int,\n"
@@ -1006,7 +1006,7 @@ public class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversion
     }
 
     @Test
-    public void testCreateTableWithMetadataColumn() {
+    void testCreateTableWithMetadataColumn() {
         final String sql =
                 "CREATE TABLE tbl1 (\n"
                         + "  a INT,\n"
@@ -1039,7 +1039,7 @@ public class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversion
     }
 
     @Test
-    public void testCreateFunction() {
+    void testCreateFunction() {
         // test create catalog function
         String sql =
                 "CREATE FUNCTION test_udf AS 
'org.apache.fink.function.function1' "
@@ -1080,7 +1080,7 @@ public class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversion
     }
 
     @Test
-    public void testAlterTable() throws Exception {
+    void testAlterTable() throws Exception {
         prepareTable(false);
         final String[] renameTableSqls =
                 new String[] {
@@ -1140,7 +1140,7 @@ public class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversion
     }
 
     @Test
-    public void testAlterTableRenameColumn() throws Exception {
+    void testAlterTableRenameColumn() throws Exception {
         prepareTable("tb1", false, true, 3);
         // rename pk column c
         Operation operation = parse("alter table tb1 rename c to c1");
@@ -1253,7 +1253,7 @@ public class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversion
                         "Failed to execute ALTER TABLE statement.\nThe column 
`a` is used as the partition keys.");
         checkAlterNonExistTable("alter table %s nonexistent rename a to a1");
 
-        prepareTableWithDistribution("tb3");
+        prepareTableWithDistribution("tb3", false);
         // rename column used as distribution key
         assertThatThrownBy(() -> parse("alter table tb3 rename c to a1"))
                 .isInstanceOf(ValidationException.class)
@@ -1263,7 +1263,7 @@ public class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversion
     }
 
     @Test
-    public void testFailedToAlterTableDropColumn() throws Exception {
+    void testFailedToAlterTableDropColumn() throws Exception {
         prepareTable("tb1", false, true, 3);
 
         // drop a nonexistent column
@@ -1295,7 +1295,7 @@ public class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversion
                 .isInstanceOf(ValidationException.class)
                 .hasMessageContaining("The column `c` is used as the primary 
key.");
 
-        prepareTableWithDistribution("tb3");
+        prepareTableWithDistribution("tb3", false);
         assertThatThrownBy(() -> parse("alter table tb3 drop c"))
                 .isInstanceOf(ValidationException.class)
                 .hasMessageContaining("The column `c` is used as a 
distribution key.");
@@ -1308,7 +1308,7 @@ public class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversion
     }
 
     @Test
-    public void testAlterTableDropColumn() throws Exception {
+    void testAlterTableDropColumn() throws Exception {
         prepareTable(false);
         // drop a single column
         Operation operation = parse("alter table tb1 drop c");
@@ -1340,7 +1340,7 @@ public class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversion
     }
 
     @Test
-    public void testFailedToAlterTableDropConstraint() throws Exception {
+    void testFailedToAlterTableDropConstraint() throws Exception {
         prepareTable("tb1", 0);
         assertThatThrownBy(() -> parse("alter table tb1 drop primary key"))
                 .isInstanceOf(ValidationException.class)
@@ -1358,7 +1358,7 @@ public class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversion
     }
 
     @Test
-    public void testAlterTableDropConstraint() throws Exception {
+    void testAlterTableDropConstraint() throws Exception {
         prepareTable(true);
         String expectedSummaryString = "ALTER TABLE cat1.db1.tb1\n  DROP 
CONSTRAINT ct1";
 
@@ -1374,8 +1374,8 @@ public class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversion
     }
 
     @Test
-    public void testAlterTableDropDistribution() throws Exception {
-        prepareTableWithDistribution("tb1");
+    void testAlterTableDropDistribution() throws Exception {
+        prepareTableWithDistribution("tb1", false);
         String expectedSummaryString = "ALTER TABLE cat1.db1.tb1\n  DROP 
DISTRIBUTION";
 
         Operation operation = parse("alter table tb1 drop distribution");
@@ -1387,7 +1387,7 @@ public class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversion
     }
 
     @Test
-    public void testFailedToAlterTableDropDistribution() throws Exception {
+    void testFailedToAlterTableDropDistribution() throws Exception {
         prepareTable("tb1", false);
         assertThatThrownBy(() -> parse("alter table tb1 drop distribution"))
                 .isInstanceOf(ValidationException.class)
@@ -1397,7 +1397,7 @@ public class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversion
     }
 
     @Test
-    public void testFailedToAlterTableDropWatermark() throws Exception {
+    void testFailedToAlterTableDropWatermark() throws Exception {
         prepareTable("tb1", false);
         assertThatThrownBy(() -> parse("alter table tb1 drop watermark"))
                 .isInstanceOf(ValidationException.class)
@@ -1406,7 +1406,7 @@ public class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversion
     }
 
     @Test
-    public void testAlterTableDropWatermark() throws Exception {
+    void testAlterTableDropWatermark() throws Exception {
         prepareTable("tb1", true);
         Operation operation = parse("alter table tb1 drop watermark");
         assertThat(operation).isInstanceOf(AlterTableChangeOperation.class);
@@ -1421,7 +1421,7 @@ public class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversion
     }
 
     @Test
-    public void testFailedToAlterTableAddColumn() throws Exception {
+    void testFailedToAlterTableAddColumn() throws Exception {
         prepareTable("tb1", 0);
 
         // try to add a column with duplicated name
@@ -1477,7 +1477,7 @@ public class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversion
     }
 
     @Test
-    public void testAlterTableAddColumn() throws Exception {
+    void testAlterTableAddColumn() throws Exception {
         prepareTable("tb1", 0);
 
         ObjectIdentifier tableIdentifier = ObjectIdentifier.of("cat1", "db1", 
"tb1");
@@ -1591,7 +1591,7 @@ public class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversion
     }
 
     @Test
-    public void testFailedToAlterTableAddPk() throws Exception {
+    void testFailedToAlterTableAddPk() throws Exception {
         // the original table has one pk
         prepareTable("tb1", 1);
 
@@ -1667,7 +1667,7 @@ public class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversion
     }
 
     @Test
-    public void testAlterTableAddPrimaryKey() throws Exception {
+    void testAlterTableAddPrimaryKey() throws Exception {
         prepareTable("tb1", 0);
 
         ObjectIdentifier tableIdentifier = ObjectIdentifier.of("cat1", "db1", 
"tb1");
@@ -1737,7 +1737,7 @@ public class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversion
     }
 
     @Test
-    public void testFailedToAlterTableAddWatermark() throws Exception {
+    void testFailedToAlterTableAddWatermark() throws Exception {
         prepareTable("tb1", false);
 
         // add watermark with an undefined column as rowtime
@@ -1776,7 +1776,7 @@ public class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversion
     }
 
     @Test
-    public void testAlterTableAddWatermark() throws Exception {
+    void testAlterTableAddWatermark() throws Exception {
         prepareTable("tb1", false);
 
         ObjectIdentifier tableIdentifier = ObjectIdentifier.of("cat1", "db1", 
"tb1");
@@ -1859,7 +1859,7 @@ public class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversion
     }
 
     @Test
-    public void testFailedToAlterTableModifyColumn() throws Exception {
+    void testFailedToAlterTableModifyColumn() throws Exception {
         prepareTable("tb1", true);
 
         // modify duplicated column same
@@ -1934,7 +1934,7 @@ public class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversion
     }
 
     @Test
-    public void testAlterTableModifyColumn() throws Exception {
+    void testAlterTableModifyColumn() throws Exception {
         prepareTable("tb1", 2);
 
         ObjectIdentifier tableIdentifier = ObjectIdentifier.of("cat1", "db1", 
"tb1");
@@ -2085,7 +2085,7 @@ public class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversion
     }
 
     @Test
-    public void testFailedToAlterTableModifyPk() throws Exception {
+    void testFailedToAlterTableModifyPk() throws Exception {
         prepareTable("tb1", 0);
 
         // modify pk on a table without pk specified
@@ -2129,7 +2129,7 @@ public class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversion
     }
 
     @Test
-    public void testAlterTableModifyPk() throws Exception {
+    void testAlterTableModifyPk() throws Exception {
         prepareTable("tb1", 1);
 
         // test modify constraint name
@@ -2176,7 +2176,7 @@ public class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversion
     }
 
     @Test
-    public void testAlterTableAddDistribution() throws Exception {
+    void testAlterTableAddDistribution() throws Exception {
         prepareTable("tb1", false);
 
         Operation operation = parse("alter table tb1 add distribution by 
hash(a) into 12 buckets");
@@ -2189,8 +2189,8 @@ public class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversion
     }
 
     @Test
-    public void testFailedToAlterTableAddDistribution() throws Exception {
-        prepareTableWithDistribution("tb1");
+    void testFailedToAlterTableAddDistribution() throws Exception {
+        prepareTableWithDistribution("tb1", false);
 
         // add distribution on a table with distribution
         assertThatThrownBy(
@@ -2200,7 +2200,24 @@ public class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversion
     }
 
     @Test
-    public void testFailedToAlterTableModifyDistribution() throws Exception {
+    void testNonEmptyDistributionNotChangedWithOtherModify() throws Exception {
+        final CatalogTable table = prepareTableWithDistribution("tb1", true);
+
+        assertThat(table.getDistribution()).isPresent();
+        
assertThat(table.getDistribution().get().getBucketKeys()).containsExactly("c");
+
+        final Operation operation =
+                parse("ALTER TABLE tb1 MODIFY WATERMARK FOR ts AS ts - 
INTERVAL '1' MINUTE");
+
+        assertThat(operation).isInstanceOf(AlterTableChangeOperation.class);
+        Optional<TableDistribution> distribution =
+                ((AlterTableChangeOperation) 
operation).getNewTable().getDistribution();
+        assertThat(distribution).isPresent();
+        assertThat(distribution.get().getBucketKeys()).containsExactly("c");
+    }
+
+    @Test
+    void testFailedToAlterTableModifyDistribution() throws Exception {
         prepareTable("tb2", false);
 
         // modify distribution on a table without distribution
@@ -2214,8 +2231,8 @@ public class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversion
     }
 
     @Test
-    public void testAlterTableModifyDistribution() throws Exception {
-        prepareTableWithDistribution("tb1");
+    void testAlterTableModifyDistribution() throws Exception {
+        prepareTableWithDistribution("tb1", false);
 
         Operation operation =
                 parse("alter table tb1 modify distribution by hash(c) into 12 
buckets");
@@ -2229,7 +2246,7 @@ public class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversion
     }
 
     @Test
-    public void testFailedToAlterTableModifyWatermark() throws Exception {
+    void testFailedToAlterTableModifyWatermark() throws Exception {
         prepareTable("tb1", false);
 
         // modify watermark on a table without watermark
@@ -2264,7 +2281,7 @@ public class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversion
     }
 
     @Test
-    public void testAlterTableModifyWatermark() throws Exception {
+    void testAlterTableModifyWatermark() throws Exception {
         prepareTable("tb1", true);
 
         // modify watermark offset
@@ -2294,7 +2311,7 @@ public class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversion
     }
 
     @Test
-    public void testCreateViewWithMatchRecognize() {
+    void testCreateViewWithMatchRecognize() {
         Map<String, String> prop = new HashMap<>();
         prop.put("connector", "values");
         prop.put("bounded", "true");
@@ -2337,7 +2354,7 @@ public class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversion
     }
 
     @Test
-    public void testCreateViewWithDynamicTableOptions() {
+    void testCreateViewWithDynamicTableOptions() {
         Map<String, String> prop = new HashMap<>();
         prop.put("connector", "values");
         prop.put("bounded", "true");
@@ -2365,7 +2382,7 @@ public class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversion
     }
 
     @Test
-    public void testAlterTableAddPartitions() throws Exception {
+    void testAlterTableAddPartitions() throws Exception {
         prepareTable("tb1", true, true, 0);
 
         // test add single partition
@@ -2392,7 +2409,7 @@ public class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversion
     }
 
     @Test
-    public void testAlterTableDropPartitions() throws Exception {
+    void testAlterTableDropPartitions() throws Exception {
         prepareTable("tb1", true, true, 0);
         // test drop single partition
         Operation operation = parse("alter table tb1 drop partition (b = '1', 
c = '2')");
@@ -2411,7 +2428,7 @@ public class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversion
     }
 
     @Test
-    public void testCreateViewWithDuplicateFieldName() {
+    void testCreateViewWithDuplicateFieldName() {
         Map<String, String> prop = new HashMap<>();
         prop.put("connector", "values");
         prop.put("bounded", "true");
@@ -2488,32 +2505,33 @@ public class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversion
         return testItem;
     }
 
-    private void prepareTable(boolean hasConstraint) throws Exception {
-        prepareTable("tb1", hasConstraint ? 1 : 0);
+    private CatalogTable prepareTable(boolean hasConstraint) throws Exception {
+        return prepareTable("tb1", hasConstraint ? 1 : 0);
     }
 
-    private void prepareTable(String tableName, int numOfPkFields) throws 
Exception {
-        prepareTable(tableName, false, false, numOfPkFields);
+    private CatalogTable prepareTable(String tableName, int numOfPkFields) 
throws Exception {
+        return prepareTable(tableName, false, false, numOfPkFields);
     }
 
-    private void prepareTable(String tableName, boolean hasWatermark) throws 
Exception {
-        prepareTable(tableName, false, hasWatermark, 0);
+    private CatalogTable prepareTable(String tableName, boolean hasWatermark) 
throws Exception {
+        return prepareTable(tableName, false, hasWatermark, 0);
     }
 
-    private void prepareTableWithDistribution(String tableName) throws 
Exception {
+    private CatalogTable prepareTableWithDistribution(String tableName, 
boolean withWatermark)
+            throws Exception {
         TableDistribution distribution =
                 TableDistribution.of(
                         TableDistribution.Kind.HASH, 6, 
Collections.singletonList("c"));
-        prepareTable(tableName, false, false, 0, distribution);
+        return prepareTable(tableName, false, withWatermark, 0, distribution);
     }
 
-    private void prepareTable(
+    private CatalogTable prepareTable(
             String tableName, boolean hasPartition, boolean hasWatermark, int 
numOfPkFields)
             throws Exception {
-        prepareTable(tableName, hasPartition, hasWatermark, numOfPkFields, 
null);
+        return prepareTable(tableName, hasPartition, hasWatermark, 
numOfPkFields, null);
     }
 
-    private void prepareTable(
+    private CatalogTable prepareTable(
             String tableName,
             boolean hasPartition,
             boolean hasWatermark,
@@ -2521,7 +2539,7 @@ public class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversion
             @Nullable TableDistribution tableDistribution)
             throws Exception {
         Catalog catalog = new GenericInMemoryCatalog("default", "default");
-        if (!catalogManager.getCatalog("cat1").isPresent()) {
+        if (catalogManager.getCatalog("cat1").isEmpty()) {
             catalogManager.registerCatalog("cat1", catalog);
         }
         catalogManager.createDatabase(
@@ -2582,6 +2600,7 @@ public class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversion
         catalogManager.setCurrentDatabase("db1");
         ObjectIdentifier tableIdentifier = ObjectIdentifier.of("cat1", "db1", 
tableName);
         catalogManager.createTable(catalogTable, tableIdentifier, true);
+        return catalogTable;
     }
 
     private void assertAlterTableOptions(

Reply via email to