This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new d257de5ca9 [flink] Don't allow converting nullable columns and fields to non nullable (#5630)
d257de5ca9 is described below
commit d257de5ca9d3eb567ef5e6c2407f0ff4f65a1897
Author: Ashish Khatkar <[email protected]>
AuthorDate: Wed May 21 11:18:08 2025 +0100
[flink] Don't allow converting nullable columns and fields to non nullable (#5630)
---
.../shortcodes/generated/core_configuration.html | 6 +
.../main/java/org/apache/paimon/CoreOptions.java | 12 ++
.../org/apache/paimon/schema/SchemaManager.java | 46 ++++++-
.../org/apache/paimon/catalog/CatalogTestBase.java | 7 +
.../apache/paimon/flink/SchemaChangeITCase.java | 152 +++++++++++++++++++++
5 files changed, 217 insertions(+), 6 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 4cd943f8b2..b8fdc7b9ac 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -32,6 +32,12 @@ under the License.
<td>Boolean</td>
<td>Whether to remove the whole row in aggregation engine when -D
records are received.</td>
</tr>
+ <tr>
+ <td><h5>alter-column-null-to-not-null.disabled</h5></td>
+ <td style="word-wrap: break-word;">true</td>
+ <td>Boolean</td>
+ <td>If true, it disables altering column type from null to not
null. Default is true. Users can disable this option to explicitly convert null
column type to not null.</td>
+ </tr>
<tr>
<td><h5>async-file-write</h5></td>
<td style="word-wrap: break-word;">true</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index f3db58af6b..6473ca6240 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1721,6 +1721,14 @@ public class CoreOptions implements Serializable {
.noDefaultValue()
.withDescription("The serialized refresh handler of
materialized table.");
+ public static final ConfigOption<Boolean>
DISABLE_ALTER_COLUMN_NULL_TO_NOT_NULL =
+ ConfigOptions.key("alter-column-null-to-not-null.disabled")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription(
+ "If true, it disables altering column type from
null to not null. Default is true. "
+ + "Users can disable this option to
explicitly convert null column type to not null.");
+
private final Options options;
public CoreOptions(Map<String, String> options) {
@@ -2232,6 +2240,10 @@ public class CoreOptions implements Serializable {
return options.get(MANIFEST_DELETE_FILE_DROP_STATS);
}
+ public boolean disableNullToNotNull() {
+ return options.get(DISABLE_ALTER_COLUMN_NULL_TO_NOT_NULL);
+ }
+
public LookupStrategy lookupStrategy() {
return LookupStrategy.from(
mergeEngine().equals(MergeEngine.FIRST_ROW),
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index c31a3a616d..cf60da9cf6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -301,6 +301,13 @@ public class SchemaManager implements Serializable {
throws Catalog.ColumnAlreadyExistException,
Catalog.ColumnNotExistException {
Map<String, String> oldOptions = new
HashMap<>(oldTableSchema.options());
Map<String, String> newOptions = new
HashMap<>(oldTableSchema.options());
+ boolean disableNullToNotNull =
+ Boolean.parseBoolean(
+ oldOptions.getOrDefault(
+
CoreOptions.DISABLE_ALTER_COLUMN_NULL_TO_NOT_NULL.key(),
+
CoreOptions.DISABLE_ALTER_COLUMN_NULL_TO_NOT_NULL
+ .defaultValue()
+ .toString()));
List<DataField> newFields = new ArrayList<>(oldTableSchema.fields());
AtomicInteger highestFieldId = new
AtomicInteger(oldTableSchema.highestFieldId());
String newComment = oldTableSchema.comment();
@@ -413,6 +420,12 @@ public class SchemaManager implements Serializable {
DataType targetType = update.newDataType();
if (update.keepNullability()) {
targetType =
targetType.copy(field.type().isNullable());
+ } else {
+ assertNullabilityChange(
+ field.type().isNullable(),
+ update.newDataType().isNullable(),
+
StringUtils.join(Arrays.asList(update.fieldNames()), "."),
+ disableNullToNotNull);
}
checkState(
DataTypeCasts.supportsExplicitCast(field.type(), targetType)
@@ -435,12 +448,18 @@ public class SchemaManager implements Serializable {
updateNestedColumn(
newFields,
update.fieldNames(),
- (field) ->
- new DataField(
- field.id(),
- field.name(),
-
field.type().copy(update.newNullability()),
- field.description()));
+ (field) -> {
+ assertNullabilityChange(
+ field.type().isNullable(),
+ update.newNullability(),
+
StringUtils.join(Arrays.asList(update.fieldNames()), "."),
+ disableNullToNotNull);
+ return new DataField(
+ field.id(),
+ field.name(),
+ field.type().copy(update.newNullability()),
+ field.description());
+ });
} else if (change instanceof UpdateColumnComment) {
UpdateColumnComment update = (UpdateColumnComment) change;
updateNestedColumn(
@@ -483,6 +502,21 @@ public class SchemaManager implements Serializable {
newSchema.comment());
}
+ private void assertNullabilityChange(
+ boolean oldNullability,
+ boolean newNullability,
+ String fieldName,
+ boolean disableNullToNotNull) {
+ if (disableNullToNotNull && oldNullability && !newNullability) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Cannot update column type from nullable to non
nullable for %s. "
+ + "You can set table configuration option
'alter-column-null-to-not-null.disabled' = 'false' "
+ + "to allow converting null columns to not
null",
+ fieldName));
+ }
+ }
+
public void applyMove(List<DataField> newFields, SchemaChange.Move move) {
Map<String, Integer> map = new HashMap<>();
for (int i = 0; i < newFields.size(); i++) {
diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
index da60f47b8b..a877408e41 100644
--- a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
@@ -1011,6 +1011,13 @@ public abstract class CatalogTestBase {
""),
false);
+ catalog.alterTable(
+ identifier,
+ Lists.newArrayList(
+ SchemaChange.setOption(
+
CoreOptions.DISABLE_ALTER_COLUMN_NULL_TO_NOT_NULL.key(), "false")),
+ false);
+
catalog.alterTable(
identifier,
Lists.newArrayList(SchemaChange.updateColumnNullability("col1", false)),
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
index eafa3b6916..4dcdf3284c 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
@@ -700,6 +700,7 @@ public class SchemaChangeITCase extends CatalogITCaseBase {
+ " `e` FLOAT");
// Nullable -> not null
+ sql("ALTER TABLE T SET ('alter-column-null-to-not-null.disabled' =
'false')");
sql("ALTER TABLE T MODIFY c STRING NOT NULL");
result = sql("SHOW CREATE TABLE T");
assertThat(result.toString())
@@ -1199,4 +1200,155 @@ public class SchemaChangeITCase extends CatalogITCaseBase {
},
3));
}
+
+ @ParameterizedTest()
+ @ValueSource(strings = {"orc", "avro", "parquet"})
+ public void testUpdateNullabilityPrimitiveType(String formatType) {
+ sql(
+ "CREATE TABLE T "
+ + "( k INT, v INT NOT NULL, PRIMARY KEY (k) NOT
ENFORCED ) "
+ + "WITH ( 'bucket' = '1', 'file.format' = '"
+ + formatType
+ + "' )");
+ sql("INSERT INTO T VALUES (1, 100), (2, 200)");
+ assertThat(sql("SELECT * FROM T"))
+ .containsExactlyInAnyOrder(Row.of(1, 100), Row.of(2, 200));
+
+ sql("ALTER TABLE T MODIFY v INT"); // convert non nullable to nullable
+ sql("INSERT INTO T VALUES " + "(3, CAST(NULL AS INT))");
+ assertThat(sql("SELECT * FROM T"))
+ .containsExactlyInAnyOrder(Row.of(1, 100), Row.of(2, 200),
Row.of(3, null));
+
+ assertThatCode(() -> sql("ALTER TABLE T MODIFY v INT NOT NULL"))
+ .hasStackTraceContaining(
+ "Cannot update column type from nullable to non
nullable for v");
+ }
+
+ @ParameterizedTest()
+ @ValueSource(strings = {"orc", "avro", "parquet"})
+ public void testUpdateNullabilityRowType(String formatType) {
+ sql(
+ "CREATE TABLE T "
+ + "( k INT, v ROW(f1 INT, f2 INT NOT NULL) NOT NULL,
PRIMARY KEY (k) NOT ENFORCED ) "
+ + "WITH ( 'bucket' = '1', 'file.format' = '"
+ + formatType
+ + "' )");
+ sql("INSERT INTO T VALUES (1, ROW(10, 100)), (2, ROW(20, 200))");
+ assertThat(sql("SELECT * FROM T"))
+ .containsExactlyInAnyOrder(Row.of(1, Row.of(10, 100)),
Row.of(2, Row.of(20, 200)));
+
+ sql("ALTER TABLE T MODIFY (v ROW(f1 INT, f2 INT) NOT NULL)"); //
convert non nullable
+ // field in row to
+ // nullable
+ sql("INSERT INTO T VALUES " + "(3, ROW(30, CAST(NULL AS INT)))");
+ assertThat(sql("SELECT * FROM T"))
+ .containsExactlyInAnyOrder(
+ Row.of(1, Row.of(10, 100)),
+ Row.of(2, Row.of(20, 200)),
+ Row.of(3, Row.of(30, null)));
+
+ assertThatCode(() -> sql("ALTER TABLE T MODIFY (v ROW(f1 INT NOT NULL,
f2 INT) NOT NULL)"))
+ .hasStackTraceContaining(
+ "Cannot update column type from nullable to non
nullable for v.f1");
+
+ sql("ALTER TABLE T MODIFY (v ROW(f1 INT, f2 INT))"); // convert entire
row to nullable
+ assertThatCode(() -> sql("ALTER TABLE T MODIFY (v ROW(f1 INT, f2 INT)
NOT NULL)"))
+ .hasStackTraceContaining(
+ "Cannot update column type from nullable to non
nullable for v");
+ }
+
+ @ParameterizedTest()
+ @ValueSource(strings = {"orc", "avro", "parquet"})
+ public void testUpdateNullabilityArrayAndMapType(String formatType) {
+ sql(
+ "CREATE TABLE T "
+ + "( k INT, v1 ARRAY<ROW(f1 INT, f2 INT) NOT NULL>, v2
MAP<INT, ROW(f1 INT, f2 INT) NOT NULL>, PRIMARY KEY (k) NOT ENFORCED ) "
+ + "WITH ( 'bucket' = '1', 'file.format' = '"
+ + formatType
+ + "' )");
+ sql(
+ "INSERT INTO T VALUES "
+ + "(1, ARRAY[ROW(10, 100), ROW(20, 200)], MAP[11,
ROW(10, 100), 12, ROW(11, 110)]), "
+ + "(2, ARRAY[ROW(30, 300), ROW(40, 400)], MAP[21,
ROW(20, 200), 22, ROW(21, 210)])");
+ Map<Integer, Row> map1 = new HashMap<>();
+ map1.put(11, Row.of(10, 100));
+ map1.put(12, Row.of(11, 110));
+
+ Map<Integer, Row> map2 = new HashMap<>();
+ map2.put(21, Row.of(20, 200));
+ map2.put(22, Row.of(21, 210));
+
+ assertThat(sql("SELECT * FROM T"))
+ .containsExactlyInAnyOrder(
+ Row.of(1, new Row[] {Row.of(10, 100), Row.of(20,
200)}, map1),
+ Row.of(2, new Row[] {Row.of(30, 300), Row.of(40,
400)}, map2));
+
+ assertThatCode(
+ () ->
+ sql(
+ "ALTER TABLE T MODIFY (v1 ARRAY<ROW(f1
INT, f2 INT) NOT NULL> NOT NULL)"))
+ .hasRootCauseMessage(
+ "Cannot update column type from nullable to non
nullable for v1. You can set table configuration option
'alter-column-null-to-not-null.disabled' = 'false' to allow converting null
columns to not null");
+ assertThatCode(
+ () ->
+ sql(
+ "ALTER TABLE T MODIFY (v1 ARRAY<ROW(f1
INT NOT NULL, f2 INT) NOT NULL>)"))
+ .hasStackTraceContaining(
+ "Cannot update column type from nullable to non
nullable for v1.element.f1");
+
+ assertThatCode(
+ () ->
+ sql(
+ "ALTER TABLE T MODIFY (v2 MAP<INT,
ROW(f1 INT, f2 INT) NOT NULL> NOT NULL)"))
+ .hasStackTraceContaining(
+ "Cannot update column type from nullable to non
nullable for v2");
+
+ assertThatCode(
+ () ->
+ sql(
+ "ALTER TABLE T MODIFY (v2 MAP<INT,
ROW(f1 INT, f2 INT NOT NULL) NOT NULL>)"))
+ .hasStackTraceContaining(
+ "Cannot update column type from nullable to non
nullable for v2.value.f2");
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = {"orc", "avro", "parquet"})
+ public void testUpdateNullabilityByEnablingNullToNotNullOption(String
formatType) {
+ sql(
+ "CREATE TABLE T "
+ + "( k INT, v INT, PRIMARY KEY (k) NOT ENFORCED ) "
+ + "WITH ( 'bucket' = '1', 'file.format' = '"
+ + formatType
+ + "' )");
+
+ sql("INSERT INTO T VALUES (1, 10), (2, 20)");
+ assertThat(sql("SELECT * FROM T")).containsExactlyInAnyOrder(Row.of(1,
10), Row.of(2, 20));
+
+ assertThatCode(() -> sql("ALTER TABLE T MODIFY v INT NOT NULL"))
+ .hasStackTraceContaining(
+ "Cannot update column type from nullable to non
nullable for v");
+
+ // enable null to not null option
+ sql("ALTER TABLE T SET ('alter-column-null-to-not-null.disabled' =
'false')");
+ sql("ALTER TABLE T MODIFY v INT NOT NULL");
+ assertThat(sql("SELECT * FROM T")).containsExactlyInAnyOrder(Row.of(1,
10), Row.of(2, 20));
+ }
+
+ @Test
+ public void testAlterColumnTypeWithNullabilityUpdate() {
+ sql("CREATE TABLE T ( k INT, v INT, PRIMARY KEY(k) NOT ENFORCED )");
+
+ sql("INSERT INTO T VALUES (1, 10), (2, 20)");
+ assertThat(sql("SELECT * FROM T")).containsExactlyInAnyOrder(Row.of(1,
10), Row.of(2, 20));
+
+ assertThatCode(() -> sql("ALTER TABLE T MODIFY v BIGINT NOT NULL"))
+ .hasStackTraceContaining(
+ "Cannot update column type from nullable to non
nullable for v");
+
+ // enable null to not null option
+ sql("ALTER TABLE T SET ('alter-column-null-to-not-null.disabled' =
'false')");
+ sql("ALTER TABLE T MODIFY v BIGINT NOT NULL");
+ assertThat(sql("SELECT * FROM T"))
+ .containsExactlyInAnyOrder(Row.of(1, 10L), Row.of(2, 20L));
+ }
}