This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.3
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git

commit 61484f9ec335c97ab92ba624926897bbdc58a8d7
Author: yuzelin <[email protected]>
AuthorDate: Tue Jan 3 16:19:08 2023 +0800

    [FLINK-30545] Add check of 'NOT NULL' for 'ADD COLUMN' schema change
    
    This closes #449
---
 .../table/store/file/schema/SchemaManager.java     |  2 ++
 .../table/store/table/SchemaEvolutionTest.java     | 10 +++++++++
 .../store/spark/SparkSchemaEvolutionITCase.java    | 25 ++++++++++++++++++++++
 3 files changed, 37 insertions(+)

diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
index 4e6708fa..5c48dfcf 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
@@ -211,6 +211,8 @@ public class SchemaManager implements Serializable {
                                         "The column [%s] exists in the 
table[%s].",
                                         addColumn.fieldName(), tableRoot));
                     }
+                    Preconditions.checkArgument(
+                            addColumn.isNullable(), "ADD COLUMN cannot specify 
NOT NULL.");
                     int id = highestFieldId.incrementAndGet();
                     DataType dataType =
                             TableSchema.toDataType(addColumn.logicalType(), 
highestFieldId);
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTest.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTest.java
index 373c1a88..73666f63 100644
--- 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTest.java
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTest.java
@@ -126,6 +126,16 @@ public class SchemaEvolutionTest {
         // read where f3 = 3 (filter on new field)
         rows = readRecords(table, builder.equal(2, 3L));
         assertThat(rows).containsExactlyInAnyOrder(Row.of(3, 3L, 3L), 
Row.of(4, 4L, 4L));
+
+        // test add not null field
+        assertThatThrownBy(
+                        () ->
+                                schemaManager.commitChanges(
+                                        Collections.singletonList(
+                                                SchemaChange.addColumn(
+                                                        "f4", new IntType(), 
false, null))))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessage("ADD COLUMN cannot specify NOT NULL.");
     }
 
     @Test
diff --git 
a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkSchemaEvolutionITCase.java
 
b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkSchemaEvolutionITCase.java
index c23facfe..fb88cc88 100644
--- 
a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkSchemaEvolutionITCase.java
+++ 
b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkSchemaEvolutionITCase.java
@@ -79,6 +79,31 @@ public class SparkSchemaEvolutionITCase extends 
SparkReadTestBase {
         assertThat(results.toString()).isEqualTo("[[8]]");
     }
 
+    @Test
+    public void testAddNotNullColumn() throws Exception {
+        Path tablePath = new Path(warehousePath, 
"default.db/testAddNotNullColumn");
+        createTestHelper(tablePath);
+
+        List<Row> beforeAdd =
+                spark.sql("SHOW CREATE TABLE 
tablestore.default.testAddNotNullColumn")
+                        .collectAsList();
+        assertThat(beforeAdd.toString())
+                .isEqualTo(
+                        "[[CREATE TABLE testAddNotNullColumn (\n"
+                                + "  `a` INT NOT NULL,\n"
+                                + "  `b` BIGINT,\n"
+                                + "  `c` STRING)\n"
+                                + "]]");
+
+        assertThatThrownBy(
+                        () ->
+                                spark.sql(
+                                        "ALTER TABLE 
tablestore.default.testAddNotNullColumn ADD COLUMNS (d INT NOT NULL)"))
+                .isInstanceOf(RuntimeException.class)
+                .hasMessage(
+                        "java.lang.IllegalArgumentException: ADD COLUMN cannot 
specify NOT NULL.");
+    }
+
     @Test
     public void testRenameColumn() throws Exception {
         Path tablePath = new Path(warehousePath, 
"default.db/testRenameColumn");

Reply via email to