This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch release-0.3 in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
commit 61484f9ec335c97ab92ba624926897bbdc58a8d7 Author: yuzelin <[email protected]> AuthorDate: Tue Jan 3 16:19:08 2023 +0800 [FLINK-30545] Add check of 'NOT NULL' for 'ADD COLUMN' schema change This closes #449 --- .../table/store/file/schema/SchemaManager.java | 2 ++ .../table/store/table/SchemaEvolutionTest.java | 10 +++++++++ .../store/spark/SparkSchemaEvolutionITCase.java | 25 ++++++++++++++++++++++ 3 files changed, 37 insertions(+) diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java index 4e6708fa..5c48dfcf 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java @@ -211,6 +211,8 @@ public class SchemaManager implements Serializable { "The column [%s] exists in the table[%s].", addColumn.fieldName(), tableRoot)); } + Preconditions.checkArgument( + addColumn.isNullable(), "ADD COLUMN cannot specify NOT NULL."); int id = highestFieldId.incrementAndGet(); DataType dataType = TableSchema.toDataType(addColumn.logicalType(), highestFieldId); diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTest.java index 373c1a88..73666f63 100644 --- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTest.java +++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTest.java @@ -126,6 +126,16 @@ public class SchemaEvolutionTest { // read where f3 = 3 (filter on new field) rows = readRecords(table, builder.equal(2, 3L)); assertThat(rows).containsExactlyInAnyOrder(Row.of(3, 3L, 3L), Row.of(4, 4L, 4L)); + + // test add not null field + assertThatThrownBy( + () -> + schemaManager.commitChanges( + Collections.singletonList( + SchemaChange.addColumn( + "f4", new IntType(), false, null)))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("ADD COLUMN cannot specify NOT NULL."); } @Test diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkSchemaEvolutionITCase.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkSchemaEvolutionITCase.java index c23facfe..fb88cc88 100644 --- a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkSchemaEvolutionITCase.java +++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkSchemaEvolutionITCase.java @@ -79,6 +79,31 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase { assertThat(results.toString()).isEqualTo("[[8]]"); } + @Test + public void testAddNotNullColumn() throws Exception { + Path tablePath = new Path(warehousePath, "default.db/testAddNotNullColumn"); + createTestHelper(tablePath); + + List<Row> beforeAdd = + spark.sql("SHOW CREATE TABLE tablestore.default.testAddNotNullColumn") + .collectAsList(); + assertThat(beforeAdd.toString()) + .isEqualTo( + "[[CREATE TABLE testAddNotNullColumn (\n" + + " `a` INT NOT NULL,\n" + + " `b` BIGINT,\n" + + " `c` STRING)\n" + + "]]"); + + assertThatThrownBy( + () -> + spark.sql( + "ALTER TABLE tablestore.default.testAddNotNullColumn ADD COLUMNS (d INT NOT NULL)")) + .isInstanceOf(RuntimeException.class) + .hasMessage( + "java.lang.IllegalArgumentException: ADD COLUMN cannot specify NOT NULL."); + } + @Test public void testRenameColumn() throws Exception { Path tablePath = new Path(warehousePath, "default.db/testRenameColumn");
