This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.3
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git

commit 32de0c4d2b3db5ff916000dd0ba55e0570710b5a
Author: JingsongLi <[email protected]>
AuthorDate: Tue Jan 3 16:23:24 2023 +0800

    [FLINK-30545] Merge isNullable into logicalType in SchemaChange.addColumn
---
 .../flink/table/store/file/schema/SchemaChange.java | 21 ++++++---------------
 .../table/store/file/schema/SchemaManager.java      |  3 ++-
 .../table/store/table/SchemaEvolutionTest.java      |  2 +-
 .../flink/table/store/spark/SparkCatalog.java       |  3 +--
 4 files changed, 10 insertions(+), 19 deletions(-)

diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaChange.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaChange.java
index ebbb87fd..a5256c5b 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaChange.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaChange.java
@@ -37,12 +37,11 @@ public interface SchemaChange {
     }
 
     static SchemaChange addColumn(String fieldName, LogicalType logicalType) {
-        return addColumn(fieldName, logicalType, true, null);
+        return addColumn(fieldName, logicalType, null);
     }
 
-    static SchemaChange addColumn(
-            String fieldName, LogicalType logicalType, boolean isNullable, 
String comment) {
-        return new AddColumn(fieldName, logicalType, isNullable, comment);
+    static SchemaChange addColumn(String fieldName, LogicalType logicalType, 
String comment) {
+        return new AddColumn(fieldName, logicalType, comment);
     }
 
     static SchemaChange renameColumn(String fieldName, String newName) {
@@ -135,14 +134,11 @@ public interface SchemaChange {
     final class AddColumn implements SchemaChange {
         private final String fieldName;
         private final LogicalType logicalType;
-        private final boolean isNullable;
         private final String description;
 
-        private AddColumn(
-                String fieldName, LogicalType logicalType, boolean isNullable, 
String description) {
+        private AddColumn(String fieldName, LogicalType logicalType, String 
description) {
             this.fieldName = fieldName;
             this.logicalType = logicalType;
-            this.isNullable = isNullable;
             this.description = description;
         }
 
@@ -154,10 +150,6 @@ public interface SchemaChange {
             return logicalType;
         }
 
-        public boolean isNullable() {
-            return isNullable;
-        }
-
         @Nullable
         public String description() {
             return description;
@@ -172,15 +164,14 @@ public interface SchemaChange {
                 return false;
             }
             AddColumn addColumn = (AddColumn) o;
-            return isNullable == addColumn.isNullable
-                    && Objects.equals(fieldName, addColumn.fieldName)
+            return Objects.equals(fieldName, addColumn.fieldName)
                     && logicalType.equals(addColumn.logicalType)
                     && Objects.equals(description, addColumn.description);
         }
 
         @Override
         public int hashCode() {
-            int result = Objects.hash(logicalType, isNullable, description);
+            int result = Objects.hash(logicalType, description);
             result = 31 * result + Objects.hashCode(fieldName);
             return result;
         }
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
index 5c48dfcf..9946e703 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
@@ -212,7 +212,8 @@ public class SchemaManager implements Serializable {
                                         addColumn.fieldName(), tableRoot));
                     }
                     Preconditions.checkArgument(
-                            addColumn.isNullable(), "ADD COLUMN cannot specify 
NOT NULL.");
+                            addColumn.logicalType().isNullable(),
+                            "ADD COLUMN cannot specify NOT NULL.");
                     int id = highestFieldId.incrementAndGet();
                     DataType dataType =
                             TableSchema.toDataType(addColumn.logicalType(), 
highestFieldId);
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTest.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTest.java
index 73666f63..bb174df9 100644
--- 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTest.java
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTest.java
@@ -133,7 +133,7 @@ public class SchemaEvolutionTest {
                                 schemaManager.commitChanges(
                                         Collections.singletonList(
                                                 SchemaChange.addColumn(
-                                                        "f4", new IntType(), 
false, null))))
+                                                        "f4", new 
IntType().copy(false), null))))
                 .isInstanceOf(IllegalArgumentException.class)
                 .hasMessage("ADD COLUMN cannot specify NOT NULL.");
     }
diff --git 
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
 
b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
index c6b59b42..8c925f4f 100644
--- 
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
+++ 
b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
@@ -268,8 +268,7 @@ public class SparkCatalog implements TableCatalog, 
SupportsNamespaces {
             validateAlterNestedField(add.fieldNames());
             return SchemaChange.addColumn(
                     add.fieldNames()[0],
-                    toFlinkType(add.dataType()),
-                    add.isNullable(),
+                    toFlinkType(add.dataType()).copy(add.isNullable()),
                     add.comment());
         } else if (change instanceof RenameColumn) {
             RenameColumn rename = (RenameColumn) change;

Reply via email to