This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch release-0.3 in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
commit 32de0c4d2b3db5ff916000dd0ba55e0570710b5a Author: JingsongLi <[email protected]> AuthorDate: Tue Jan 3 16:23:24 2023 +0800 [FLINK-30545] Merge isNullable into logicalType in SchemaChange.addColumn --- .../flink/table/store/file/schema/SchemaChange.java | 21 ++++++--------------- .../table/store/file/schema/SchemaManager.java | 3 ++- .../table/store/table/SchemaEvolutionTest.java | 2 +- .../flink/table/store/spark/SparkCatalog.java | 3 +-- 4 files changed, 10 insertions(+), 19 deletions(-) diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaChange.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaChange.java index ebbb87fd..a5256c5b 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaChange.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaChange.java @@ -37,12 +37,11 @@ public interface SchemaChange { } static SchemaChange addColumn(String fieldName, LogicalType logicalType) { - return addColumn(fieldName, logicalType, true, null); + return addColumn(fieldName, logicalType, null); } - static SchemaChange addColumn( - String fieldName, LogicalType logicalType, boolean isNullable, String comment) { - return new AddColumn(fieldName, logicalType, isNullable, comment); + static SchemaChange addColumn(String fieldName, LogicalType logicalType, String comment) { + return new AddColumn(fieldName, logicalType, comment); } static SchemaChange renameColumn(String fieldName, String newName) { @@ -135,14 +134,11 @@ public interface SchemaChange { final class AddColumn implements SchemaChange { private final String fieldName; private final LogicalType logicalType; - private final boolean isNullable; private final String description; - private AddColumn( - String fieldName, LogicalType logicalType, boolean isNullable, String description) { + private AddColumn(String fieldName, LogicalType logicalType, String description) { this.fieldName = fieldName; this.logicalType = logicalType; - this.isNullable = isNullable; this.description = description; } @@ -154,10 +150,6 @@ public interface SchemaChange { return logicalType; } - public boolean isNullable() { - return isNullable; - } - @Nullable public String description() { return description; @@ -172,15 +164,14 @@ public interface SchemaChange { return false; } AddColumn addColumn = (AddColumn) o; - return isNullable == addColumn.isNullable - && Objects.equals(fieldName, addColumn.fieldName) + return Objects.equals(fieldName, addColumn.fieldName) && logicalType.equals(addColumn.logicalType) && Objects.equals(description, addColumn.description); } @Override public int hashCode() { - int result = Objects.hash(logicalType, isNullable, description); + int result = Objects.hash(logicalType, description); result = 31 * result + Objects.hashCode(fieldName); return result; } diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java index 5c48dfcf..9946e703 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java @@ -212,7 +212,8 @@ public class SchemaManager implements Serializable { addColumn.fieldName(), tableRoot)); } Preconditions.checkArgument( - addColumn.isNullable(), "ADD COLUMN cannot specify NOT NULL."); + addColumn.logicalType().isNullable(), + "ADD COLUMN cannot specify NOT NULL."); int id = highestFieldId.incrementAndGet(); DataType dataType = TableSchema.toDataType(addColumn.logicalType(), highestFieldId); diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTest.java index 73666f63..bb174df9 100644 --- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTest.java +++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTest.java @@ -133,7 +133,7 @@ public class SchemaEvolutionTest { schemaManager.commitChanges( Collections.singletonList( SchemaChange.addColumn( - "f4", new IntType(), false, null)))) + "f4", new IntType().copy(false), null)))) .isInstanceOf(IllegalArgumentException.class) .hasMessage("ADD COLUMN cannot specify NOT NULL."); } diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java index c6b59b42..8c925f4f 100644 --- a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java +++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java @@ -268,8 +268,7 @@ public class SparkCatalog implements TableCatalog, SupportsNamespaces { validateAlterNestedField(add.fieldNames()); return SchemaChange.addColumn( add.fieldNames()[0], - toFlinkType(add.dataType()), - add.isNullable(), + toFlinkType(add.dataType()).copy(add.isNullable()), add.comment()); } else if (change instanceof RenameColumn) { RenameColumn rename = (RenameColumn) change;
