This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new c44c0e6974 [core] Add support for map value type and array element type evolution (#5673)
c44c0e6974 is described below
commit c44c0e6974be3b0871521ea2238c341eb72b8db0
Author: Ashish Khatkar <[email protected]>
AuthorDate: Tue Jun 3 12:06:49 2025 +0100
[core] Add support for map value type and array element type evolution (#5673)
---
.../org/apache/paimon/schema/SchemaManager.java | 165 +++++++++++-----
.../apache/paimon/flink/SchemaChangeITCase.java | 210 +++++++++++++++++++++
2 files changed, 333 insertions(+), 42 deletions(-)
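
For context: the change lets ALTER TABLE ... MODIFY evolve the element type of an ARRAY and
the value type of a MAP, including deeply nested combinations, while still rejecting
nullable -> not-null changes on nested elements and values. A minimal illustration, using
statements taken from the new test below:

    CREATE TABLE T ( k INT, v ARRAY<ARRAY<ARRAY<INT>>>, PRIMARY KEY(k) NOT ENFORCED );
    ALTER TABLE T MODIFY v ARRAY<ARRAY<ARRAY<BIGINT>>>;          -- now supported
    ALTER TABLE T MODIFY v ARRAY<ARRAY<ARRAY<BIGINT NOT NULL>>>; -- still rejected:
    -- "Cannot update column type from nullable to non nullable for v.element.element.element"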
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index cf60da9cf6..59c3c42dd5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -70,7 +70,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Function;
+import java.util.function.BiFunction;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
@@ -344,7 +344,8 @@ public class SchemaManager implements Serializable {
new NestedColumnModifier(addColumn.fieldNames()) {
@Override
- protected void updateLastColumn(List<DataField> newFields, String fieldName)
+ protected void updateLastColumn(
+ int depth, List<DataField> newFields, String fieldName)
throws Catalog.ColumnAlreadyExistException {
assertColumnNotExists(newFields, fieldName);
@@ -374,7 +375,8 @@ public class SchemaManager implements Serializable {
assertNotUpdatingPrimaryKeys(oldTableSchema, rename.fieldNames(), "rename");
new NestedColumnModifier(rename.fieldNames()) {
@Override
- protected void updateLastColumn(List<DataField> newFields, String fieldName)
+ protected void updateLastColumn(
+ int depth, List<DataField> newFields, String fieldName)
throws Catalog.ColumnNotExistException,
Catalog.ColumnAlreadyExistException {
assertColumnExists(newFields, fieldName);
@@ -401,7 +403,8 @@ public class SchemaManager implements Serializable {
dropColumnValidation(oldTableSchema, drop);
new NestedColumnModifier(drop.fieldNames()) {
@Override
- protected void updateLastColumn(List<DataField> newFields, String fieldName)
+ protected void updateLastColumn(
+ int depth, List<DataField> newFields, String fieldName)
throws Catalog.ColumnNotExistException {
assertColumnExists(newFields, fieldName);
newFields.removeIf(f -> f.name().equals(fieldName));
@@ -416,26 +419,37 @@ public class SchemaManager implements Serializable {
updateNestedColumn(
newFields,
update.fieldNames(),
- (field) -> {
- DataType targetType = update.newDataType();
+ (field, depth) -> {
+ // find the dataType at depth and update the type for it
+ DataType sourceRootType =
+ getRootType(field.type(), depth, update.fieldNames().length);
+ DataType targetRootType = update.newDataType();
if (update.keepNullability()) {
- targetType = targetType.copy(field.type().isNullable());
+ targetRootType = targetRootType.copy(sourceRootType.isNullable());
} else {
assertNullabilityChange(
- field.type().isNullable(),
- update.newDataType().isNullable(),
+ sourceRootType.isNullable(),
+ targetRootType.isNullable(),
StringUtils.join(Arrays.asList(update.fieldNames()), "."),
disableNullToNotNull);
}
checkState(
- DataTypeCasts.supportsExplicitCast(field.type(), targetType)
- && CastExecutors.resolve(field.type(), targetType)
+ DataTypeCasts.supportsExplicitCast(
+ sourceRootType, targetRootType)
+ && CastExecutors.resolve(sourceRootType, targetRootType)
!= null,
String.format(
"Column type %s[%s] cannot be
converted to %s without loosing information.",
- field.name(), field.type(), targetType));
+ field.name(), sourceRootType, targetRootType));
return new DataField(
- field.id(), field.name(), targetType, field.description());
+ field.id(),
+ field.name(),
+ getArrayMapTypeWithTargetTypeRoot(
+ field.type(),
+ targetRootType,
+ depth,
+ update.fieldNames().length),
+ field.description());
});
} else if (change instanceof UpdateColumnNullability) {
UpdateColumnNullability update = (UpdateColumnNullability) change;
@@ -448,16 +462,24 @@ public class SchemaManager implements Serializable {
updateNestedColumn(
newFields,
update.fieldNames(),
- (field) -> {
+ (field, depth) -> {
+ // find the DataType at depth and update that DataType's nullability
+ DataType sourceRootType =
+ getRootType(field.type(), depth, update.fieldNames().length);
assertNullabilityChange(
- field.type().isNullable(),
+ sourceRootType.isNullable(),
update.newNullability(),
StringUtils.join(Arrays.asList(update.fieldNames()), "."),
disableNullToNotNull);
+ sourceRootType = sourceRootType.copy(update.newNullability());
return new DataField(
field.id(),
field.name(),
- field.type().copy(update.newNullability()),
+ getArrayMapTypeWithTargetTypeRoot(
+ field.type(),
+ sourceRootType,
+ depth,
+ update.fieldNames().length),
field.description());
});
} else if (change instanceof UpdateColumnComment) {
@@ -465,7 +487,7 @@ public class SchemaManager implements Serializable {
updateNestedColumn(
newFields,
update.fieldNames(),
- (field) ->
+ (field, depth) ->
new DataField(
field.id(),
field.name(),
@@ -502,6 +524,58 @@ public class SchemaManager implements Serializable {
newSchema.comment());
}
+ // gets the rootType at the defined depth
+ // ex: ARRAY<MAP<STRING, ARRAY<INT>>>
+ // if we want to update ARRAY<INT> -> ARRAY<BIGINT>
+ // the maxDepth will be based on updateFieldNames
+ // which in this case will be [v, element, value, element],
+ // so maxDepth is 4 and the returned DataType will be INT
+ private DataType getRootType(DataType type, int currDepth, int maxDepth) {
+ if (currDepth == maxDepth - 1) {
+ return type;
+ }
+ switch (type.getTypeRoot()) {
+ case ARRAY:
+ return getRootType(((ArrayType) type).getElementType(),
currDepth + 1, maxDepth);
+ case MAP:
+ return getRootType(((MapType) type).getValueType(), currDepth
+ 1, maxDepth);
+ default:
+ return type;
+ }
+ }
+
+ // builds the targetType from the source type based on the maxDepth at which it needs to
+ // be updated
+ // ex: ARRAY<MAP<STRING, ARRAY<INT>>> -> ARRAY<MAP<STRING, ARRAY<BIGINT>>>
+ // here we only need to update the type of ARRAY<INT> to ARRAY<BIGINT>; the rest of the
+ // type remains the same. This function achieves this.
+ private DataType getArrayMapTypeWithTargetTypeRoot(
+ DataType source, DataType target, int currDepth, int maxDepth) {
+ if (currDepth == maxDepth - 1) {
+ return target;
+ }
+ switch (source.getTypeRoot()) {
+ case ARRAY:
+ return new ArrayType(
+ source.isNullable(),
+ getArrayMapTypeWithTargetTypeRoot(
+ ((ArrayType) source).getElementType(),
+ target,
+ currDepth + 1,
+ maxDepth));
+ case MAP:
+ return new MapType(
+ source.isNullable(),
+ ((MapType) source).getKeyType(),
+ getArrayMapTypeWithTargetTypeRoot(
+ ((MapType) source).getValueType(),
+ target,
+ currDepth + 1,
+ maxDepth));
+ default:
+ return target;
+ }
+ }
+
private void assertNullabilityChange(
boolean oldNullability,
boolean newNullability,
@@ -671,10 +745,22 @@ public class SchemaManager implements Serializable {
this.updateFieldNames = updateFieldNames;
}
- public void updateIntermediateColumn(List<DataField> newFields, int depth)
+ private void updateIntermediateColumn(
+ List<DataField> newFields, List<DataField> previousFields, int depth, int prevDepth)
throws Catalog.ColumnNotExistException, Catalog.ColumnAlreadyExistException {
if (depth == updateFieldNames.length - 1) {
- updateLastColumn(newFields, updateFieldNames[depth]);
+ updateLastColumn(depth, newFields, updateFieldNames[depth]);
+ return;
+ } else if (depth >= updateFieldNames.length) {
+ // to handle the case of ARRAY or MAP type evolution
+ // for instance: ARRAY<INT> -> ARRAY<BIGINT>
+ // the updateFieldNames in this case is [v, element] where v is the array field name
+ // the depth returned by extractRowDataFields is 2, which will overflow.
+ // So the logic is to go to the previous depth and update the column using the previous
+ // fields, which will have DataFields from prevDepth.
+ // The reason for this handling is the addition of element and value for array
+ // and map types in FlinkCatalog as dummy column names
+ updateLastColumn(prevDepth, previousFields, updateFieldNames[prevDepth]);
return;
}
@@ -683,13 +769,10 @@ public class SchemaManager implements Serializable {
if (!field.name().equals(updateFieldNames[depth])) {
continue;
}
-
- String fullFieldName =
- String.join(".",
Arrays.asList(updateFieldNames).subList(0, depth + 1));
List<DataField> nestedFields = new ArrayList<>();
- int newDepth =
- depth + extractRowDataFields(field.type(), fullFieldName, nestedFields);
- updateIntermediateColumn(nestedFields, newDepth);
+ int newDepth = depth + extractRowDataFields(field.type(), nestedFields);
+ updateIntermediateColumn(nestedFields, newFields, newDepth, depth);
+ field = newFields.get(i);
newFields.set(
i,
new DataField(
@@ -705,25 +788,23 @@ public class SchemaManager implements Serializable {
String.join(".",
Arrays.asList(updateFieldNames).subList(0, depth + 1)));
}
- private int extractRowDataFields(
- DataType type, String fullFieldName, List<DataField> nestedFields) {
+ public void updateIntermediateColumn(List<DataField> newFields, int depth)
+ throws Catalog.ColumnNotExistException, Catalog.ColumnAlreadyExistException {
+ updateIntermediateColumn(newFields, newFields, depth, depth);
+ }
+
+ private int extractRowDataFields(DataType type, List<DataField> nestedFields) {
switch (type.getTypeRoot()) {
case ROW:
nestedFields.addAll(((RowType) type).getFields());
return 1;
case ARRAY:
- return extractRowDataFields(
- ((ArrayType) type).getElementType(),
- fullFieldName,
- nestedFields)
+ return extractRowDataFields(((ArrayType) type).getElementType(), nestedFields)
+ 1;
case MAP:
- return extractRowDataFields(
- ((MapType) type).getValueType(), fullFieldName, nestedFields)
- + 1;
+ return extractRowDataFields(((MapType) type).getValueType(), nestedFields) + 1;
default:
- throw new IllegalArgumentException(
- fullFieldName + " is not a structured type.");
+ return 1;
}
}
@@ -742,12 +823,12 @@ public class SchemaManager implements Serializable {
mapType.getKeyType(),
wrapNewRowType(mapType.getValueType(), nestedFields));
default:
- throw new IllegalStateException(
- "Trying to wrap a row type in " + type + ". This
is unexpected.");
+ return type;
}
}
- protected abstract void updateLastColumn(List<DataField> newFields, String fieldName)
+ protected abstract void updateLastColumn(
+ int depth, List<DataField> newFields, String fieldName)
throws Catalog.ColumnNotExistException, Catalog.ColumnAlreadyExistException;
protected void assertColumnExists(List<DataField> newFields, String fieldName)
@@ -786,11 +867,11 @@ public class SchemaManager implements Serializable {
private void updateNestedColumn(
List<DataField> newFields,
String[] updateFieldNames,
- Function<DataField, DataField> updateFunc)
+ BiFunction<DataField, Integer, DataField> updateFunc)
throws Catalog.ColumnNotExistException, Catalog.ColumnAlreadyExistException {
new NestedColumnModifier(updateFieldNames) {
@Override
- protected void updateLastColumn(List<DataField> newFields, String fieldName)
+ protected void updateLastColumn(int depth, List<DataField> newFields, String fieldName)
throws Catalog.ColumnNotExistException {
for (int i = 0; i < newFields.size(); i++) {
DataField field = newFields.get(i);
@@ -798,7 +879,7 @@ public class SchemaManager implements Serializable {
continue;
}
- newFields.set(i, updateFunc.apply(field));
+ newFields.set(i, updateFunc.apply(field, depth));
return;
}
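
To make the interplay of the two new helpers concrete, here is a small standalone sketch.
The two method bodies mirror the hunk above; the class name, main method, and the example
type are illustrative only:

    import org.apache.paimon.types.ArrayType;
    import org.apache.paimon.types.DataType;
    import org.apache.paimon.types.DataTypes;
    import org.apache.paimon.types.MapType;

    public class NestedTypeEvolutionSketch {

        // Mirrors getRootType above: descend through ARRAY element / MAP value types
        // until reaching the type addressed by the last entry of updateFieldNames.
        static DataType getRootType(DataType type, int currDepth, int maxDepth) {
            if (currDepth == maxDepth - 1) {
                return type;
            }
            switch (type.getTypeRoot()) {
                case ARRAY:
                    return getRootType(((ArrayType) type).getElementType(), currDepth + 1, maxDepth);
                case MAP:
                    return getRootType(((MapType) type).getValueType(), currDepth + 1, maxDepth);
                default:
                    return type;
            }
        }

        // Mirrors getArrayMapTypeWithTargetTypeRoot above: rebuild the nested type,
        // swapping in the target type at the deepest level and keeping the outer shape.
        static DataType getArrayMapTypeWithTargetTypeRoot(
                DataType source, DataType target, int currDepth, int maxDepth) {
            if (currDepth == maxDepth - 1) {
                return target;
            }
            switch (source.getTypeRoot()) {
                case ARRAY:
                    return new ArrayType(
                            source.isNullable(),
                            getArrayMapTypeWithTargetTypeRoot(
                                    ((ArrayType) source).getElementType(),
                                    target,
                                    currDepth + 1,
                                    maxDepth));
                case MAP:
                    return new MapType(
                            source.isNullable(),
                            ((MapType) source).getKeyType(),
                            getArrayMapTypeWithTargetTypeRoot(
                                    ((MapType) source).getValueType(),
                                    target,
                                    currDepth + 1,
                                    maxDepth));
                default:
                    return target;
            }
        }

        public static void main(String[] args) {
            // Column v ARRAY<MAP<STRING, ARRAY<INT>>>; the update path is
            // [v, element, value, element], so maxDepth = 4 (depth 0 is the column itself).
            DataType v =
                    DataTypes.ARRAY(
                            DataTypes.MAP(DataTypes.STRING(), DataTypes.ARRAY(DataTypes.INT())));
            System.out.println(getRootType(v, 0, 4)); // INT
            System.out.println(getArrayMapTypeWithTargetTypeRoot(v, DataTypes.BIGINT(), 0, 4));
            // ARRAY<MAP<STRING, ARRAY<BIGINT>>> -- only the innermost element type changed
        }
    }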
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
index 4dcdf3284c..03b568202e 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
@@ -1351,4 +1351,214 @@ public class SchemaChangeITCase extends CatalogITCaseBase {
assertThat(sql("SELECT * FROM T"))
.containsExactlyInAnyOrder(Row.of(1, 10L), Row.of(2, 20L));
}
+
+ @Test
+ public void testAlterColumnTypeNestedArrayAndMap() {
+ sql("CREATE TABLE T ( k INT, v ARRAY<ARRAY<ARRAY<INT>>>, PRIMARY
KEY(k) NOT ENFORCED )");
+ sql("INSERT INTO T VALUES (1, ARRAY[ARRAY[ARRAY[1, 2]]]), (2,
ARRAY[ARRAY[ARRAY[3, 4]]])");
+ assertThat(sql("SELECT * FROM T"))
+ .containsExactlyInAnyOrder(
+ Row.of(1, new Integer[][][] {{{1, 2}}}),
+ Row.of(2, new Integer[][][] {{{3, 4}}}));
+ sql("ALTER TABLE T MODIFY v ARRAY<ARRAY<ARRAY<BIGINT>>>");
+ assertThat(sql("SELECT * FROM T"))
+ .containsExactlyInAnyOrder(
+ Row.of(1, new Long[][][] {{{1L, 2L}}}),
+ Row.of(2, new Long[][][] {{{3L, 4L}}}));
+ assertThatCode(() -> sql("ALTER TABLE T MODIFY v
ARRAY<ARRAY<ARRAY<BIGINT NOT NULL>>>"))
+ .hasStackTraceContaining(
+ "Cannot update column type from nullable to non
nullable for v.element.element.element");
+
+ sql("DROP TABLE T");
+
+ sql(
+ "CREATE TABLE T ( k INT, v MAP<STRING, MAP<STRING, MAP<STRING,
INT NOT NULL>>>, PRIMARY KEY(k) NOT ENFORCED )");
+ Map<String, Map<String, Map<String, Integer>>> mp1 = new HashMap<>();
+ Map<String, Map<String, Integer>> l1Mp1 = new HashMap<>();
+ Map<String, Integer> l2Mp1 = new HashMap<>();
+ l2Mp1.put("aaa", 1);
+ l2Mp1.put("aab", 2);
+ l1Mp1.put("aa", l2Mp1);
+ mp1.put("a", l1Mp1);
+ Map<String, Map<String, Map<String, Integer>>> mp2 = new HashMap<>();
+ Map<String, Map<String, Integer>> l1Mp2 = new HashMap<>();
+ Map<String, Integer> l2Mp2 = new HashMap<>();
+ l2Mp2.put("bbb", 3);
+ l2Mp2.put("bbc", 4);
+ l1Mp2.put("bb", l2Mp2);
+ mp2.put("b", l1Mp2);
+ sql(
+ "INSERT INTO T VALUES (1, MAP['a', MAP['aa', MAP['aaa', 1,
'aab', 2]]]), (2, MAP['b', MAP['bb', MAP['bbb', 3, 'bbc', 4]]])");
+ assertThat(sql("SELECT * FROM T"))
+ .containsExactlyInAnyOrder(Row.of(1, mp1), Row.of(2, mp2));
+ sql("ALTER TABLE T MODIFY v MAP<STRING, MAP<STRING, MAP<STRING,
BIGINT>>>");
+ Map<String, Map<String, Map<String, Long>>> mp3 = new HashMap<>();
+ Map<String, Map<String, Long>> l1Mp3 = new HashMap<>();
+ Map<String, Long> l2Mp3 = new HashMap<>();
+ l2Mp3.put("aaa", 1L);
+ l2Mp3.put("aab", 2L);
+ l1Mp3.put("aa", l2Mp3);
+ mp3.put("a", l1Mp3);
+ Map<String, Map<String, Map<String, Long>>> mp4 = new HashMap<>();
+ Map<String, Map<String, Long>> l1Mp4 = new HashMap<>();
+ Map<String, Long> l2Mp4 = new HashMap<>();
+ l2Mp4.put("bbb", 3L);
+ l2Mp4.put("bbc", 4L);
+ l1Mp4.put("bb", l2Mp4);
+ mp4.put("b", l1Mp4);
+ assertThat(sql("SELECT * FROM T"))
+ .containsExactlyInAnyOrder(Row.of(1, mp3), Row.of(2, mp4));
+ assertThatCode(
+ () ->
+ sql(
+ "ALTER TABLE T MODIFY v MAP<STRING,
MAP<STRING, MAP<STRING, BIGINT NOT NULL>>>"))
+ .hasStackTraceContaining(
+ "Cannot update column type from nullable to non
nullable for v.value.value.value");
+
+ sql("DROP TABLE T");
+
+ sql(
+ "CREATE TABLE T ( k INT, a ARRAY<INT NOT NULL>, b MAP<STRING,
INT NOT NULL>, PRIMARY KEY(k) NOT ENFORCED )");
+ sql("INSERT INTO T VALUES (1, ARRAY[1, 2, 3, 4], MAP['a', 1, 'b',
2])");
+ assertThat(sql("SELECT * FROM T"))
+ .containsExactlyInAnyOrder(
+ Row.of(
+ 1,
+ new Integer[] {1, 2, 3, 4},
+ new HashMap<String, Integer>() {
+ {
+ put("a", 1);
+ }
+
+ {
+ put("b", 2);
+ }
+ }));
+ sql("ALTER TABLE T MODIFY a ARRAY<BIGINT>");
+ sql("INSERT INTO T VALUES (2, ARRAY[5, 6, 7, CAST(NULL AS BIGINT)],
MAP['c', 3, 'd', 4])");
+ assertThat(sql("SELECT * FROM T"))
+ .containsExactlyInAnyOrder(
+ Row.of(
+ 1,
+ new Long[] {1L, 2L, 3L, 4L},
+ new HashMap<String, Integer>() {
+ {
+ put("a", 1);
+ }
+
+ {
+ put("b", 2);
+ }
+ }),
+ Row.of(
+ 2,
+ new Long[] {5L, 6L, 7L, null},
+ new HashMap<String, Integer>() {
+ {
+ put("c", 3);
+ }
+
+ {
+ put("d", 4);
+ }
+ }));
+ assertThatCode(() -> sql("ALTER TABLE T MODIFY a ARRAY<BIGINT NOT
NULL>"))
+ .hasStackTraceContaining(
+ "Cannot update column type from nullable to non
nullable for a.element");
+ sql("ALTER TABLE T MODIFY b MAP<STRING, BIGINT>");
+ sql(
+ "INSERT INTO T VALUES (2, ARRAY[5, 6, 7, CAST(NULL AS
BIGINT)], MAP['c', 3, 'd', CAST(NULL AS BIGINT)])");
+ assertThat(sql("SELECT * FROM T"))
+ .containsExactlyInAnyOrder(
+ Row.of(
+ 1,
+ new Long[] {1L, 2L, 3L, 4L},
+ new HashMap<String, Long>() {
+ {
+ put("a", 1L);
+ }
+
+ {
+ put("b", 2L);
+ }
+ }),
+ Row.of(
+ 2,
+ new Long[] {5L, 6L, 7L, null},
+ new HashMap<String, Long>() {
+ {
+ put("c", 3L);
+ }
+
+ {
+ put("d", null);
+ }
+ }));
+ assertThatCode(() -> sql("ALTER TABLE T MODIFY b MAP<STRING, BIGINT
NOT NULL>"))
+ .hasStackTraceContaining(
+ "Cannot update column type from nullable to non
nullable for b.value");
+
+ sql("DROP TABLE T");
+ sql(
+ "CREATE TABLE T ( k INT, a MAP<STRING, ARRAY<INT NOT NULL>>,
PRIMARY KEY(k) NOT ENFORCED )");
+ sql("INSERT INTO T VALUES (1, MAP['a', ARRAY[1, 2, 3]])");
+ assertThat(sql("SELECT * FROM T"))
+ .containsExactlyInAnyOrder(
+ Row.of(
+ 1,
+ new HashMap<String, Integer[]>() {
+ {
+ put("a", new Integer[] {1, 2, 3});
+ }
+ }));
+ sql("ALTER TABLE T MODIFY a MAP<STRING, ARRAY<BIGINT>>");
+ sql(
+ "INSERT INTO T VALUES(1, MAP['a', ARRAY[1, 2, 3], 'b',
ARRAY[2, 3, CAST(NULL AS BIGINT)]])");
+ assertThat(sql("SELECT * FROM T"))
+ .containsExactlyInAnyOrder(
+ Row.of(
+ 1,
+ new HashMap<String, Long[]>() {
+ {
+ put("a", new Long[] {1L, 2L, 3L});
+ }
+
+ {
+ put("b", new Long[] {2L, 3L, null});
+ }
+ }));
+ assertThatCode(() -> sql("ALTER TABLE T MODIFY a MAP<STRING,
ARRAY<BIGINT NOT NULL>>"))
+ .hasStackTraceContaining(
+ "Cannot update column type from nullable to non
nullable for a.value.element");
+
+ sql("DROP TABLE T");
+
+ sql(
+ "CREATE TABLE T ( k INT, a ROW(c1 DOUBLE, c2 ARRAY<BOOLEAN>
NOT NULL) NOT NULL, PRIMARY KEY(k) NOT ENFORCED )");
+ sql("INSERT INTO T VALUES (1, ROW(1.0, ARRAY[true, false]))");
+ assertThat(sql("SELECT * FROM T"))
+ .containsExactlyInAnyOrder(Row.of(1, Row.of(1.0, new Boolean[] {true, false})));
+ sql("ALTER TABLE T MODIFY a ROW(c1 DOUBLE, c2 ARRAY<BOOLEAN>) NOT
NULL");
+ sql("INSERT INTO T VALUES (2, ROW(2.0, CAST(NULL AS
ARRAY<BOOLEAN>)))");
+ assertThat(sql("SELECT * FROM T"))
+ .containsExactlyInAnyOrder(
+ Row.of(1, Row.of(1.0, new Boolean[] {true, false})),
+ Row.of(2, Row.of(2.0, null)));
+ assertThatCode(
+ () ->
+ sql(
+ "ALTER TABLE T MODIFY a ROW(c1 DOUBLE,
c2 ARRAY<BOOLEAN> NOT NULL) NOT NULL"))
+ .hasStackTraceContaining(
+ "Cannot update column type from nullable to non
nullable for a.c2");
+ sql(
+ "ALTER TABLE T MODIFY a ROW(c1 DOUBLE, c2 ARRAY<BOOLEAN>, c3
ARRAY<MAP<STRING, BOOLEAN NOT NULL>>) NOT NULL");
+ sql(
+ "ALTER TABLE T MODIFY a ROW(c1 DOUBLE, c2 ARRAY<BOOLEAN>, c3
ARRAY<MAP<STRING, BOOLEAN>>) NOT NULL");
+ assertThatCode(
+ () ->
+ sql(
+ "ALTER TABLE T MODIFY a ROW(c1 DOUBLE,
c2 ARRAY<BOOLEAN>, c3 ARRAY<MAP<STRING, BOOLEAN NOT NULL>>) NOT NULL"))
+ .hasStackTraceContaining(
+ "Cannot update column type from nullable to non
nullable for a.c3.element.value");
+ }
}
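
A final note on the nested paths in the error messages above: ARRAY elements and MAP values
have no real column names, so FlinkCatalog addresses them with the dummy names "element" and
"value", as mentioned in the updateIntermediateColumn comment. Illustration only:

    // for v ARRAY<MAP<STRING, ARRAY<INT>>>, the innermost element type is addressed as
    String[] updateFieldNames = {"v", "element", "value", "element"};
    // which surfaces in error messages as the dotted path v.element.value.element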