This is an automated email from the ASF dual-hosted git repository. snuyanzin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 30b681087a1 [FLINK-38177][table] `ALTER TABLE` .. `MODIFY WATERMARK` might reset current distribution 30b681087a1 is described below commit 30b681087a1b0b7e2d0abd6ec773007dbd8cda7a Author: Sergey Nuyanzin <snuyan...@gmail.com> AuthorDate: Thu Jul 31 19:19:15 2025 +0200 [FLINK-38177][table] `ALTER TABLE` .. `MODIFY WATERMARK` might reset current distribution --- .../planner/operations/AlterSchemaConverter.java | 28 ++-- .../operations/SqlDdlToOperationConverterTest.java | 169 ++++++++++++--------- 2 files changed, 111 insertions(+), 86 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java index eea0cde882f..02480adb00a 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java @@ -979,29 +979,35 @@ public class AlterSchemaConverter { oldColumn.getComment().ifPresent(builder::withComment); } + private TableDistribution getTableDistribution( + SqlAlterTable alterTable, ResolvedCatalogTable oldTable) { + if (alterTable instanceof SqlAlterTableSchema) { + final Optional<TableDistribution> tableDistribution = + ((SqlAlterTableSchema) alterTable) + .getDistribution() + .map(OperationConverterUtils::getDistributionFromSqlDistribution); + if (tableDistribution.isPresent()) { + return tableDistribution.get(); + } + } + return oldTable.getDistribution().orElse(null); + } + private Operation buildAlterTableChangeOperation( SqlAlterTable alterTable, List<TableChange> tableChanges, Schema newSchema, ResolvedCatalogTable oldTable) { + final TableDistribution tableDistribution = getTableDistribution(alterTable, oldTable); + CatalogTable.Builder builder = CatalogTable.newBuilder() .schema(newSchema) .comment(oldTable.getComment()) .partitionKeys(oldTable.getPartitionKeys()) + .distribution(tableDistribution) .options(oldTable.getOptions()); - if (alterTable instanceof SqlAlterTableSchema) { - ((SqlAlterTableSchema) alterTable) - .getDistribution() - .ifPresent( - distribution -> - builder.distribution( - OperationConverterUtils - .getDistributionFromSqlDistribution( - distribution))); - } - return new AlterTableChangeOperation( catalogManager.qualifyIdentifier( UnresolvedIdentifier.of(alterTable.fullTableName())), diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java index f3282fa2127..da91a54060f 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java @@ -102,10 +102,10 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Test cases for the DDL statements for {@link SqlNodeToOperationConversion}. */ -public class SqlDdlToOperationConverterTest extends SqlNodeToOperationConversionTestBase { +class SqlDdlToOperationConverterTest extends SqlNodeToOperationConversionTestBase { @Test - public void testAlterCatalog() { + void testAlterCatalog() { // test alter catalog options final String sql1 = "ALTER CATALOG cat2 SET ('K1' = 'V1', 'k2' = 'v2', 'k2' = 'v2_new')"; final Map<String, String> expectedOptions = new HashMap<>(); @@ -160,7 +160,7 @@ public class SqlDdlToOperationConverterTest extends SqlNodeToOperationConversion } @Test - public void testCreateDatabase() { + void testCreateDatabase() { final String[] createDatabaseSqls = new String[] { "create database db1", @@ -200,7 +200,7 @@ public class SqlDdlToOperationConverterTest extends SqlNodeToOperationConversion } @Test - public void testDropDatabase() { + void testDropDatabase() { final String[] dropDatabaseSqls = new String[] { "drop database db1", @@ -225,7 +225,7 @@ public class SqlDdlToOperationConverterTest extends SqlNodeToOperationConversion } @Test - public void testAlterDatabase() throws Exception { + void testAlterDatabase() throws Exception { catalogManager.registerCatalog("cat1", new GenericInMemoryCatalog("default", "default")); catalogManager.createDatabase( "cat1", "db1", new CatalogDatabaseImpl(new HashMap<>(), "db1_comment"), true); @@ -246,7 +246,7 @@ public class SqlDdlToOperationConverterTest extends SqlNodeToOperationConversion } @Test - public void testCreateTable() { + void testCreateTable() { final String sql = "CREATE TABLE tbl1 (\n" + " a bigint comment 'column a',\n" @@ -286,7 +286,7 @@ public class SqlDdlToOperationConverterTest extends SqlNodeToOperationConversion } @Test - public void testCreateTableWithPrimaryKey() { + void testCreateTableWithPrimaryKey() { final String sql = "CREATE TABLE tbl1 (\n" + " a bigint,\n" @@ -323,7 +323,7 @@ public class SqlDdlToOperationConverterTest extends SqlNodeToOperationConversion } @Test - public void testPrimaryKeyOnGeneratedColumn() { + void testPrimaryKeyOnGeneratedColumn() { final String sql = "CREATE TABLE tbl1 (\n" + " a bigint not null,\n" @@ -343,7 +343,7 @@ public class SqlDdlToOperationConverterTest extends SqlNodeToOperationConversion } @Test - public void testPrimaryKeyNonExistentColumn() { + void testPrimaryKeyNonExistentColumn() { final String sql = "CREATE TABLE tbl1 (\n" + " a bigint not null,\n" @@ -361,7 +361,7 @@ public class SqlDdlToOperationConverterTest extends SqlNodeToOperationConversion } @Test - public void testCreateTableWithMinusInOptionKey() { + void testCreateTableWithMinusInOptionKey() { final String sql = "create table source_table(\n" + " a int,\n" @@ -394,7 +394,7 @@ public class SqlDdlToOperationConverterTest extends SqlNodeToOperationConversion } @Test - public void testCreateTableWithWatermark() + void testCreateTableWithWatermark() throws FunctionAlreadyExistException, DatabaseNotExistException { CatalogFunction cf = new CatalogFunctionImpl(JavaUserDefinedScalarFunctions.JavaFunc5.class.getName()); @@ -436,7 +436,7 @@ public class SqlDdlToOperationConverterTest extends SqlNodeToOperationConversion } @Test - public void testBasicCreateTableLike() { + void testBasicCreateTableLike() { Map<String, String> sourceProperties = new HashMap<>(); sourceProperties.put("format.type", "json"); CatalogTable catalogTable = @@ -483,7 +483,7 @@ public class SqlDdlToOperationConverterTest extends SqlNodeToOperationConversion } @Test - public void testCreateTableLikeWithFullPath() { + void testCreateTableLikeWithFullPath() { Map<String, String> sourceProperties = new HashMap<>(); sourceProperties.put("connector.type", "kafka"); sourceProperties.put("format.type", "json"); @@ -516,7 +516,7 @@ public class SqlDdlToOperationConverterTest extends SqlNodeToOperationConversion } @Test - public void testMergingCreateTableLike() { + void testMergingCreateTableLike() { Map<String, String> sourceProperties = new HashMap<>(); sourceProperties.put("format.type", "json"); CatalogTable catalogTable = @@ -571,7 +571,7 @@ public class SqlDdlToOperationConverterTest extends SqlNodeToOperationConversion } @Test - public void testMergingCreateTableLikeExcludingDistribution() { + void testMergingCreateTableLikeExcludingDistribution() { Map<String, String> sourceProperties = new HashMap<>(); sourceProperties.put("format.type", "json"); CatalogTable catalogTable = @@ -630,7 +630,7 @@ public class SqlDdlToOperationConverterTest extends SqlNodeToOperationConversion } @Test - public void testCreateTableValidDistribution() { + void testCreateTableValidDistribution() { final String sql = "create table derivedTable(\n" + " a int\n" + ")\n" + "DISTRIBUTED BY (a)"; Operation operation = parseAndConvert(sql); @@ -644,7 +644,7 @@ public class SqlDdlToOperationConverterTest extends SqlNodeToOperationConversion } @Test - public void testCreateTableInvalidDistribution() { + void testCreateTableInvalidDistribution() { final String sql = "create table derivedTable(\n" + " a int\n" + ")\n" + "DISTRIBUTED BY (f3)"; @@ -655,7 +655,7 @@ public class SqlDdlToOperationConverterTest extends SqlNodeToOperationConversion } @Test - public void tesCreateTableAsWithOrderingColumns() { + void testCreateTableAsWithOrderingColumns() { CatalogTable catalogTable = CatalogTable.newBuilder() .schema( @@ -689,7 +689,7 @@ public class SqlDdlToOperationConverterTest extends SqlNodeToOperationConversion } @Test - public void testCreateTableInvalidPartition() { + void testCreateTableInvalidPartition() { final String sql = "create table derivedTable(\n" + " a int\n" + ")\n" + "PARTITIONED BY (f3)"; @@ -700,7 +700,7 @@ public class SqlDdlToOperationConverterTest extends SqlNodeToOperationConversion } @Test - public void testCreateTableLikeInvalidPartition() { + void testCreateTableLikeInvalidPartition() { CatalogTable catalogTable = CatalogTable.newBuilder() .schema(Schema.newBuilder().column("f0", DataTypes.INT().notNull()).build()) @@ -722,7 +722,7 @@ public class SqlDdlToOperationConverterTest extends SqlNodeToOperationConversion } @Test - public void testCreateTableInvalidWatermark() { + void testCreateTableInvalidWatermark() { final String sql = "create table derivedTable(\n" + " a int,\n" @@ -738,7 +738,7 @@ public class SqlDdlToOperationConverterTest extends SqlNodeToOperationConversion } @Test - public void testCreateTableLikeInvalidWatermark() { + void testCreateTableLikeInvalidWatermark() { CatalogTable catalogTable = CatalogTable.newBuilder() .schema(Schema.newBuilder().column("f0", DataTypes.INT().notNull()).build()) @@ -762,7 +762,7 @@ public class SqlDdlToOperationConverterTest extends SqlNodeToOperationConversion } @Test - public void testCreateTableLikeNestedWatermark() { + void testCreateTableLikeNestedWatermark() { CatalogTable catalogTable = CatalogTable.newBuilder() .schema( @@ -795,7 +795,7 @@ public class SqlDdlToOperationConverterTest extends SqlNodeToOperationConversion } @Test // TODO: tweak the tests when FLINK-13604 is fixed. - public void testCreateTableWithFullDataTypes() { + void testCreateTableWithFullDataTypes() { final List<TestItem> testItems = Arrays.asList( createTestItem("CHAR", DataTypes.CHAR(1)), @@ -949,7 +949,7 @@ public class SqlDdlToOperationConverterTest extends SqlNodeToOperationConversion } @Test - public void testCreateTableWithComputedColumn() { + void testCreateTableWithComputedColumn() { final String sql = "CREATE TABLE tbl1 (\n" + " a int,\n" @@ -1006,7 +1006,7 @@ public class SqlDdlToOperationConverterTest extends SqlNodeToOperationConversion } @Test - public void testCreateTableWithMetadataColumn() { + void testCreateTableWithMetadataColumn() { final String sql = "CREATE TABLE tbl1 (\n" + " a INT,\n" @@ -1039,7 +1039,7 @@ public class SqlDdlToOperationConverterTest extends SqlNodeToOperationConversion } @Test - public void testCreateFunction() { + void testCreateFunction() { // test create catalog function String sql = "CREATE FUNCTION test_udf AS 'org.apache.fink.function.function1' " @@ -1080,7 +1080,7 @@ public class SqlDdlToOperationConverterTest extends SqlNodeToOperationConversion } @Test - public void testAlterTable() throws Exception { + void testAlterTable() throws Exception { prepareTable(false); final String[] renameTableSqls = new String[] { @@ -1140,7 +1140,7 @@ public class SqlDdlToOperationConverterTest extends SqlNodeToOperationConversion } @Test - public void testAlterTableRenameColumn() throws Exception { + void testAlterTableRenameColumn() throws Exception { prepareTable("tb1", false, true, 3); // rename pk column c Operation operation = parse("alter table tb1 rename c to c1"); @@ -1253,7 +1253,7 @@ public class SqlDdlToOperationConverterTest extends SqlNodeToOperationConversion "Failed to execute ALTER TABLE statement.\nThe column `a` is used as the partition keys."); checkAlterNonExistTable("alter table %s nonexistent rename a to a1"); - prepareTableWithDistribution("tb3"); + prepareTableWithDistribution("tb3", false); // rename column used as distribution key assertThatThrownBy(() -> parse("alter table tb3 rename c to a1")) .isInstanceOf(ValidationException.class) @@ -1263,7 +1263,7 @@ public class SqlDdlToOperationConverterTest extends SqlNodeToOperationConversion } @Test - public void testFailedToAlterTableDropColumn() throws Exception { + void testFailedToAlterTableDropColumn() throws Exception { prepareTable("tb1", false, true, 3); // drop a nonexistent column @@ -1295,7 +1295,7 @@ public class SqlDdlToOperationConverterTest extends SqlNodeToOperationConversion .isInstanceOf(ValidationException.class) .hasMessageContaining("The column `c` is used as the primary key."); - prepareTableWithDistribution("tb3"); + prepareTableWithDistribution("tb3", false); assertThatThrownBy(() -> parse("alter table tb3 drop c")) .isInstanceOf(ValidationException.class) .hasMessageContaining("The column `c` is used as a distribution key."); @@ -1308,7 +1308,7 @@ public class SqlDdlToOperationConverterTest extends SqlNodeToOperationConversion } @Test - public void testAlterTableDropColumn() throws Exception { + void testAlterTableDropColumn() throws Exception { prepareTable(false); // drop a single column Operation operation = parse("alter table tb1 drop c"); @@ -1340,7 +1340,7 @@ public class SqlDdlToOperationConverterTest extends SqlNodeToOperationConversion } @Test - public void testFailedToAlterTableDropConstraint() throws Exception { + void testFailedToAlterTableDropConstraint() throws Exception { prepareTable("tb1", 0); assertThatThrownBy(() -> parse("alter table tb1 drop primary key")) .isInstanceOf(ValidationException.class) @@ -1358,7 +1358,7 @@ public class SqlDdlToOperationConverterTest extends SqlNodeToOperationConversion } @Test - public void testAlterTableDropConstraint() throws Exception { + void testAlterTableDropConstraint() throws Exception { prepareTable(true); String expectedSummaryString = "ALTER TABLE cat1.db1.tb1\n DROP CONSTRAINT ct1"; @@ -1374,8 +1374,8 @@ public class SqlDdlToOperationConverterTest extends SqlNodeToOperationConversion } @Test - public void testAlterTableDropDistribution() throws Exception { - prepareTableWithDistribution("tb1"); + void testAlterTableDropDistribution() throws Exception { + prepareTableWithDistribution("tb1", false); String expectedSummaryString = "ALTER TABLE cat1.db1.tb1\n DROP DISTRIBUTION"; Operation operation = parse("alter table tb1 drop distribution"); @@ -1387,7 +1387,7 @@ public class SqlDdlToOperationConverterTest extends SqlNodeToOperationConversion } @Test - public void testFailedToAlterTableDropDistribution() throws Exception { + void testFailedToAlterTableDropDistribution() throws Exception { prepareTable("tb1", false); assertThatThrownBy(() -> parse("alter table tb1 drop distribution")) .isInstanceOf(ValidationException.class) @@ -1397,7 +1397,7 @@ public class SqlDdlToOperationConverterTest extends SqlNodeToOperationConversion } @Test - public void testFailedToAlterTableDropWatermark() throws Exception { + void testFailedToAlterTableDropWatermark() throws Exception { prepareTable("tb1", false); assertThatThrownBy(() -> parse("alter table tb1 drop watermark")) .isInstanceOf(ValidationException.class) @@ -1406,7 +1406,7 @@ public class SqlDdlToOperationConverterTest extends SqlNodeToOperationConversion } @Test - public void testAlterTableDropWatermark() throws Exception { + void testAlterTableDropWatermark() throws Exception { prepareTable("tb1", true); Operation operation = parse("alter table tb1 drop watermark"); assertThat(operation).isInstanceOf(AlterTableChangeOperation.class); @@ -1421,7 +1421,7 @@ public class SqlDdlToOperationConverterTest extends SqlNodeToOperationConversion } @Test - public void testFailedToAlterTableAddColumn() throws Exception { + void testFailedToAlterTableAddColumn() throws Exception { prepareTable("tb1", 0); // try to add a column with duplicated name @@ -1477,7 +1477,7 @@ public class SqlDdlToOperationConverterTest extends SqlNodeToOperationConversion } @Test - public void testAlterTableAddColumn() throws Exception { + void testAlterTableAddColumn() throws Exception { prepareTable("tb1", 0); ObjectIdentifier tableIdentifier = ObjectIdentifier.of("cat1", "db1", "tb1"); @@ -1591,7 +1591,7 @@ public class SqlDdlToOperationConverterTest extends SqlNodeToOperationConversion } @Test - public void testFailedToAlterTableAddPk() throws Exception { + void testFailedToAlterTableAddPk() throws Exception { // the original table has one pk prepareTable("tb1", 1); @@ -1667,7 +1667,7 @@ public class SqlDdlToOperationConverterTest extends SqlNodeToOperationConversion } @Test - public void testAlterTableAddPrimaryKey() throws Exception { + void testAlterTableAddPrimaryKey() throws Exception { prepareTable("tb1", 0); ObjectIdentifier tableIdentifier = ObjectIdentifier.of("cat1", "db1", "tb1"); @@ -1737,7 +1737,7 @@ public class SqlDdlToOperationConverterTest extends SqlNodeToOperationConversion } @Test - public void testFailedToAlterTableAddWatermark() throws Exception { + void testFailedToAlterTableAddWatermark() throws Exception { prepareTable("tb1", false); // add watermark with an undefined column as rowtime @@ -1776,7 +1776,7 @@ public class SqlDdlToOperationConverterTest extends SqlNodeToOperationConversion } @Test - public void testAlterTableAddWatermark() throws Exception { + void testAlterTableAddWatermark() throws Exception { prepareTable("tb1", false); ObjectIdentifier tableIdentifier = ObjectIdentifier.of("cat1", "db1", "tb1"); @@ -1859,7 +1859,7 @@ public class SqlDdlToOperationConverterTest extends SqlNodeToOperationConversion } @Test - public void testFailedToAlterTableModifyColumn() throws Exception { + void testFailedToAlterTableModifyColumn() throws Exception { prepareTable("tb1", true); // modify duplicated column same @@ -1934,7 +1934,7 @@ public class SqlDdlToOperationConverterTest extends SqlNodeToOperationConversion } @Test - public void testAlterTableModifyColumn() throws Exception { + void testAlterTableModifyColumn() throws Exception { prepareTable("tb1", 2); ObjectIdentifier tableIdentifier = ObjectIdentifier.of("cat1", "db1", "tb1"); @@ -2085,7 +2085,7 @@ public class SqlDdlToOperationConverterTest extends SqlNodeToOperationConversion } @Test - public void testFailedToAlterTableModifyPk() throws Exception { + void testFailedToAlterTableModifyPk() throws Exception { prepareTable("tb1", 0); // modify pk on a table without pk specified @@ -2129,7 +2129,7 @@ public class SqlDdlToOperationConverterTest extends SqlNodeToOperationConversion } @Test - public void testAlterTableModifyPk() throws Exception { + void testAlterTableModifyPk() throws Exception { prepareTable("tb1", 1); // test modify constraint name @@ -2176,7 +2176,7 @@ public class SqlDdlToOperationConverterTest extends SqlNodeToOperationConversion } @Test - public void testAlterTableAddDistribution() throws Exception { + void testAlterTableAddDistribution() throws Exception { prepareTable("tb1", false); Operation operation = parse("alter table tb1 add distribution by hash(a) into 12 buckets"); @@ -2189,8 +2189,8 @@ public class SqlDdlToOperationConverterTest extends SqlNodeToOperationConversion } @Test - public void testFailedToAlterTableAddDistribution() throws Exception { - prepareTableWithDistribution("tb1"); + void testFailedToAlterTableAddDistribution() throws Exception { + prepareTableWithDistribution("tb1", false); // add distribution on a table with distribution assertThatThrownBy( @@ -2200,7 +2200,24 @@ public class SqlDdlToOperationConverterTest extends SqlNodeToOperationConversion } @Test - public void testFailedToAlterTableModifyDistribution() throws Exception { + void testNonEmptyDistributionNotChangedWithOtherModify() throws Exception { + final CatalogTable table = prepareTableWithDistribution("tb1", true); + + assertThat(table.getDistribution()).isPresent(); + assertThat(table.getDistribution().get().getBucketKeys()).containsExactly("c"); + + final Operation operation = + parse("ALTER TABLE tb1 MODIFY WATERMARK FOR ts AS ts - INTERVAL '1' MINUTE"); + + assertThat(operation).isInstanceOf(AlterTableChangeOperation.class); + Optional<TableDistribution> distribution = + ((AlterTableChangeOperation) operation).getNewTable().getDistribution(); + assertThat(distribution).isPresent(); + assertThat(distribution.get().getBucketKeys()).containsExactly("c"); + } + + @Test + void testFailedToAlterTableModifyDistribution() throws Exception { prepareTable("tb2", false); // modify distribution on a table without distribution @@ -2214,8 +2231,8 @@ public class SqlDdlToOperationConverterTest extends SqlNodeToOperationConversion } @Test - public void testAlterTableModifyDistribution() throws Exception { - prepareTableWithDistribution("tb1"); + void testAlterTableModifyDistribution() throws Exception { + prepareTableWithDistribution("tb1", false); Operation operation = parse("alter table tb1 modify distribution by hash(c) into 12 buckets"); @@ -2229,7 +2246,7 @@ public class SqlDdlToOperationConverterTest extends SqlNodeToOperationConversion } @Test - public void testFailedToAlterTableModifyWatermark() throws Exception { + void testFailedToAlterTableModifyWatermark() throws Exception { prepareTable("tb1", false); // modify watermark on a table without watermark @@ -2264,7 +2281,7 @@ public class SqlDdlToOperationConverterTest extends SqlNodeToOperationConversion } @Test - public void testAlterTableModifyWatermark() throws Exception { + void testAlterTableModifyWatermark() throws Exception { prepareTable("tb1", true); // modify watermark offset @@ -2294,7 +2311,7 @@ public class SqlDdlToOperationConverterTest extends SqlNodeToOperationConversion } @Test - public void testCreateViewWithMatchRecognize() { + void testCreateViewWithMatchRecognize() { Map<String, String> prop = new HashMap<>(); prop.put("connector", "values"); prop.put("bounded", "true"); @@ -2337,7 +2354,7 @@ public class SqlDdlToOperationConverterTest extends SqlNodeToOperationConversion } @Test - public void testCreateViewWithDynamicTableOptions() { + void testCreateViewWithDynamicTableOptions() { Map<String, String> prop = new HashMap<>(); prop.put("connector", "values"); prop.put("bounded", "true"); @@ -2365,7 +2382,7 @@ public class SqlDdlToOperationConverterTest extends SqlNodeToOperationConversion } @Test - public void testAlterTableAddPartitions() throws Exception { + void testAlterTableAddPartitions() throws Exception { prepareTable("tb1", true, true, 0); // test add single partition @@ -2392,7 +2409,7 @@ public class SqlDdlToOperationConverterTest extends SqlNodeToOperationConversion } @Test - public void testAlterTableDropPartitions() throws Exception { + void testAlterTableDropPartitions() throws Exception { prepareTable("tb1", true, true, 0); // test drop single partition Operation operation = parse("alter table tb1 drop partition (b = '1', c = '2')"); @@ -2411,7 +2428,7 @@ public class SqlDdlToOperationConverterTest extends SqlNodeToOperationConversion } @Test - public void testCreateViewWithDuplicateFieldName() { + void testCreateViewWithDuplicateFieldName() { Map<String, String> prop = new HashMap<>(); prop.put("connector", "values"); prop.put("bounded", "true"); @@ -2488,32 +2505,33 @@ public class SqlDdlToOperationConverterTest extends SqlNodeToOperationConversion return testItem; } - private void prepareTable(boolean hasConstraint) throws Exception { - prepareTable("tb1", hasConstraint ? 1 : 0); + private CatalogTable prepareTable(boolean hasConstraint) throws Exception { + return prepareTable("tb1", hasConstraint ? 1 : 0); } - private void prepareTable(String tableName, int numOfPkFields) throws Exception { - prepareTable(tableName, false, false, numOfPkFields); + private CatalogTable prepareTable(String tableName, int numOfPkFields) throws Exception { + return prepareTable(tableName, false, false, numOfPkFields); } - private void prepareTable(String tableName, boolean hasWatermark) throws Exception { - prepareTable(tableName, false, hasWatermark, 0); + private CatalogTable prepareTable(String tableName, boolean hasWatermark) throws Exception { + return prepareTable(tableName, false, hasWatermark, 0); } - private void prepareTableWithDistribution(String tableName) throws Exception { + private CatalogTable prepareTableWithDistribution(String tableName, boolean withWatermark) + throws Exception { TableDistribution distribution = TableDistribution.of( TableDistribution.Kind.HASH, 6, Collections.singletonList("c")); - prepareTable(tableName, false, false, 0, distribution); + return prepareTable(tableName, false, withWatermark, 0, distribution); } - private void prepareTable( + private CatalogTable prepareTable( String tableName, boolean hasPartition, boolean hasWatermark, int numOfPkFields) throws Exception { - prepareTable(tableName, hasPartition, hasWatermark, numOfPkFields, null); + return prepareTable(tableName, hasPartition, hasWatermark, numOfPkFields, null); } - private void prepareTable( + private CatalogTable prepareTable( String tableName, boolean hasPartition, boolean hasWatermark, @@ -2521,7 +2539,7 @@ public class SqlDdlToOperationConverterTest extends SqlNodeToOperationConversion @Nullable TableDistribution tableDistribution) throws Exception { Catalog catalog = new GenericInMemoryCatalog("default", "default"); - if (!catalogManager.getCatalog("cat1").isPresent()) { + if (catalogManager.getCatalog("cat1").isEmpty()) { catalogManager.registerCatalog("cat1", catalog); } catalogManager.createDatabase( @@ -2582,6 +2600,7 @@ public class SqlDdlToOperationConverterTest extends SqlNodeToOperationConversion catalogManager.setCurrentDatabase("db1"); ObjectIdentifier tableIdentifier = ObjectIdentifier.of("cat1", "db1", tableName); catalogManager.createTable(catalogTable, tableIdentifier, true); + return catalogTable; } private void assertAlterTableOptions(