This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 77b7d8d4dc [spark] Support changing column types in array<struct> or map<?,struct> (#4618)
77b7d8d4dc is described below
commit 77b7d8d4dc11cdb5b4b86712faa1e8d37e30af86
Author: tsreaper <[email protected]>
AuthorDate: Mon Dec 2 15:41:03 2024 +0800
[spark] Support changing column types in array<struct> or map<?,struct> (#4618)
---
.../org/apache/paimon/schema/SchemaManager.java | 23 ++-
.../apache/paimon/schema/SchemaManagerTest.java | 8 +-
.../java/org/apache/paimon/flink/FlinkCatalog.java | 10 +-
.../paimon/spark/SparkSchemaEvolutionITCase.java | 224 +++++++++++++++++++++
4 files changed, 253 insertions(+), 12 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index d827ffd0fb..83ddbccfef 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -639,9 +639,10 @@ public class SchemaManager implements Serializable {
String fullFieldName =
String.join(".", Arrays.asList(updateFieldNames).subList(0, depth + 1));
- List<DataField> nestedFields =
- new ArrayList<>(extractRowType(field.type(), fullFieldName).getFields());
- updateIntermediateColumn(nestedFields, depth + 1);
+ List<DataField> nestedFields = new ArrayList<>();
+ int newDepth =
+ depth + extractRowDataFields(field.type(), fullFieldName, nestedFields);
+ updateIntermediateColumn(nestedFields, newDepth);
newFields.set(
i,
new DataField(
@@ -657,14 +658,22 @@ public class SchemaManager implements Serializable {
String.join(".",
Arrays.asList(updateFieldNames).subList(0, depth + 1)));
}
- private RowType extractRowType(DataType type, String fullFieldName) {
+ private int extractRowDataFields(
+ DataType type, String fullFieldName, List<DataField>
nestedFields) {
switch (type.getTypeRoot()) {
case ROW:
- return (RowType) type;
+ nestedFields.addAll(((RowType) type).getFields());
+ return 1;
case ARRAY:
- return extractRowType(((ArrayType) type).getElementType(), fullFieldName);
+ return extractRowDataFields(
+ ((ArrayType) type).getElementType(),
+ fullFieldName,
+ nestedFields)
+ + 1;
case MAP:
- return extractRowType(((MapType) type).getValueType(), fullFieldName);
+ return extractRowDataFields(
+ ((MapType) type).getValueType(), fullFieldName, nestedFields)
+ + 1;
default:
throw new IllegalArgumentException(
fullFieldName + " is not a structured type.");
diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
index f0d6543699..c8b102b358 100644
--- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
@@ -738,13 +738,15 @@ public class SchemaManagerTest {
SchemaChange addColumn =
SchemaChange.addColumn(
- new String[] {"v", "f3"},
+ new String[] {"v", "element", "value", "f3"},
DataTypes.STRING(),
null,
SchemaChange.Move.first("f3"));
- SchemaChange dropColumn = SchemaChange.dropColumn(new String[] {"v", "f2"});
+ SchemaChange dropColumn =
+ SchemaChange.dropColumn(new String[] {"v", "element", "value", "f2"});
SchemaChange updateColumnType =
- SchemaChange.updateColumnType(new String[] {"v", "f1"}, DataTypes.BIGINT(), false);
+ SchemaChange.updateColumnType(
+ new String[] {"v", "element", "value", "f1"}, DataTypes.BIGINT(), false);
manager.commitChanges(addColumn, dropColumn, updateColumnType);
innerType =
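The new paths assume v is an ARRAY<MAP<?, ROW<f1, f2, ...>>> column (inferred from the dummy segments; the schema definition sits outside this hunk). The convention, sketched:

    // Assumed path layout for changing f1 inside ARRAY<MAP<INT, ROW<f1, f2>>>:
    //   "v"       -> the top-level column
    //   "element" -> dummy segment for the array element
    //   "value"   -> dummy segment for the map value
    //   "f1"      -> the struct field actually being changed
    String[] path = {"v", "element", "value", "f1"};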
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index 09fc0328ef..c67e79c1c0 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -756,8 +756,11 @@ public class FlinkCatalog extends AbstractCatalog {
"Column %s can only be updated to array type, and cannot
be updated to %s type",
joinedNames,
newType);
+ List<String> fullFieldNames = new ArrayList<>(fieldNames);
+ // add a dummy column name indicating the element of array
+ fullFieldNames.add("element");
generateNestedColumnUpdates(
- fieldNames,
+ fullFieldNames,
((org.apache.paimon.types.ArrayType) oldType).getElementType(),
((org.apache.paimon.types.ArrayType) newType).getElementType(),
schemaChanges);
@@ -775,8 +778,11 @@ public class FlinkCatalog extends AbstractCatalog {
joinedNames,
oldMapType.getKeyType(),
newMapType.getKeyType());
+ List<String> fullFieldNames = new ArrayList<>(fieldNames);
+ // add a dummy column name indicating the value of map
+ fullFieldNames.add("value");
generateNestedColumnUpdates(
- fieldNames,
+ fullFieldNames,
oldMapType.getValueType(),
newMapType.getValueType(),
schemaChanges);
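The Flink catalog now appends the matching dummy segment before recursing into the element/value type, so the paths handed to generateNestedColumnUpdates line up with what SchemaManager expects above. A hedged sketch of the resulting paths (illustrative column names only):

    // For a column "v" of type ARRAY<ROW<f1, f2>>, updating f2 produces the
    // path ["v", "element", "f2"]; for MAP<INT, ROW<f1, f2>> it would be
    // ["v", "value", "f2"]. In the real code this is assembled recursively.
    List<String> fullFieldNames = new ArrayList<>(Collections.singletonList("v"));
    fullFieldNames.add("element"); // dummy segment for the array element
    fullFieldNames.add("f2");      // the struct field being updated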
diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java
index 771ddc6287..fb4dab38ed 100644
--- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java
+++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java
@@ -789,6 +789,89 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
"[5,[53,[503,500.03,5003],five]]");
}
+ @ParameterizedTest()
+ @ValueSource(strings = {"orc", "avro", "parquet"})
+ public void testAddAndDropNestedColumnInArray(String formatType) {
+ String tableName = "testAddNestedColumnTable";
+ spark.sql(
+ "CREATE TABLE paimon.default."
+ + tableName
+ + " (k INT NOT NULL, v ARRAY<STRUCT<f1: STRING, f2:
INT>>) "
+ + "TBLPROPERTIES ('bucket' = '1', 'primary-key' = 'k',
'file.format' = '"
+ + formatType
+ + "')");
+ spark.sql(
+ "INSERT INTO paimon.default."
+ + tableName
+ + " VALUES (1, ARRAY(STRUCT('apple', 100),
STRUCT('banana', 101))), "
+ + "(2, ARRAY(STRUCT('cat', 200), STRUCT('dog',
201)))");
+ assertThat(
+ spark.sql("SELECT * FROM paimon.default." +
tableName).collectAsList()
+ .stream()
+ .map(Row::toString))
+ .containsExactlyInAnyOrder(
+ "[1,WrappedArray([apple,100], [banana,101])]",
+ "[2,WrappedArray([cat,200], [dog,201])]");
+
+ spark.sql(
+ "ALTER TABLE paimon.default."
+ + tableName
+ + " ADD COLUMN v.element.f3 STRING AFTER f2");
+ spark.sql("ALTER TABLE paimon.default." + tableName + " DROP COLUMN
v.element.f1");
+ spark.sql(
+ "INSERT INTO paimon.default."
+ + tableName
+ + " VALUES (1, ARRAY(STRUCT(110, 'APPLE'), STRUCT(111,
'BANANA'))), "
+ + "(3, ARRAY(STRUCT(310, 'FLOWER')))");
+ assertThat(
+ spark.sql("SELECT * FROM paimon.default." +
tableName).collectAsList()
+ .stream()
+ .map(Row::toString))
+ .containsExactlyInAnyOrder(
+ "[1,WrappedArray([110,APPLE], [111,BANANA])]",
+ "[2,WrappedArray([200,null], [201,null])]",
+ "[3,WrappedArray([310,FLOWER])]");
+ }
+
+ @ParameterizedTest()
+ @ValueSource(strings = {"orc", "avro", "parquet"})
+ public void testAddAndDropNestedColumnInMap(String formatType) {
+ String tableName = "testAddNestedColumnTable";
+ spark.sql(
+ "CREATE TABLE paimon.default."
+ + tableName
+ + " (k INT NOT NULL, v MAP<INT, STRUCT<f1: STRING, f2:
INT>>) "
+ + "TBLPROPERTIES ('bucket' = '1', 'primary-key' = 'k',
'file.format' = '"
+ + formatType
+ + "')");
+ spark.sql(
+ "INSERT INTO paimon.default."
+ + tableName
+ + " VALUES (1, MAP(10, STRUCT('apple', 100), 20,
STRUCT('banana', 101))), "
+ + "(2, MAP(10, STRUCT('cat', 200), 20, STRUCT('dog',
201)))");
+ assertThat(
+ spark.sql("SELECT k, v[10].f1, v[10].f2 FROM
paimon.default." + tableName)
+ .collectAsList().stream()
+ .map(Row::toString))
+ .containsExactlyInAnyOrder("[1,apple,100]", "[2,cat,200]");
+
+ spark.sql(
+ "ALTER TABLE paimon.default."
+ + tableName
+ + " ADD COLUMN v.value.f3 STRING AFTER f2");
+ spark.sql("ALTER TABLE paimon.default." + tableName + " DROP COLUMN
v.value.f1");
+ spark.sql(
+ "INSERT INTO paimon.default."
+ + tableName
+ + " VALUES (1, MAP(10, STRUCT(110, 'APPLE'), 20,
STRUCT(111, 'BANANA'))), "
+ + "(3, MAP(10, STRUCT(310, 'FLOWER')))");
+ assertThat(
+ spark.sql("SELECT k, v[10].f2, v[10].f3 FROM
paimon.default." + tableName)
+ .collectAsList().stream()
+ .map(Row::toString))
+ .containsExactlyInAnyOrder("[1,110,APPLE]", "[2,200,null]",
"[3,310,FLOWER]");
+ }
+
@ParameterizedTest()
@ValueSource(strings = {"orc", "avro", "parquet"})
public void testRenameNestedColumn(String formatType) {
@@ -818,6 +901,67 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
.containsExactlyInAnyOrder("[apple,1]", "[banana,2]");
}
+ @ParameterizedTest()
+ @ValueSource(strings = {"orc", "avro", "parquet"})
+ public void testRenameNestedColumnInArray(String formatType) {
+ String tableName = "testRenameNestedColumnTable";
+ spark.sql(
+ "CREATE TABLE paimon.default."
+ + tableName
+ + " (k INT NOT NULL, v ARRAY<STRUCT<f1: STRING, f2:
INT>>) "
+ + "TBLPROPERTIES ('file.format' = '"
+ + formatType
+ + "')");
+ spark.sql(
+ "INSERT INTO paimon.default."
+ + tableName
+ + " VALUES (1, ARRAY(STRUCT('apple', 100),
STRUCT('banana', 101))), "
+ + "(2, ARRAY(STRUCT('cat', 200), STRUCT('dog',
201)))");
+ assertThat(
+ spark.sql("SELECT v[0].f1, k FROM paimon.default." +
tableName)
+ .collectAsList().stream()
+ .map(Row::toString))
+ .containsExactlyInAnyOrder("[apple,1]", "[cat,2]");
+
+ spark.sql(
+ "ALTER TABLE paimon.default." + tableName + " RENAME COLUMN
v.element.f1 to f100");
+ assertThat(
+ spark.sql("SELECT v[0].f100, k FROM paimon.default." +
tableName)
+ .collectAsList().stream()
+ .map(Row::toString))
+ .containsExactlyInAnyOrder("[apple,1]", "[cat,2]");
+ }
+
+ @ParameterizedTest()
+ @ValueSource(strings = {"orc", "avro", "parquet"})
+ public void testRenameNestedColumnInMap(String formatType) {
+ String tableName = "testRenameNestedColumnTable";
+ spark.sql(
+ "CREATE TABLE paimon.default."
+ + tableName
+ + " (k INT NOT NULL, v MAP<INT, STRUCT<f1: STRING, f2:
INT>>) "
+ + "TBLPROPERTIES ('file.format' = '"
+ + formatType
+ + "')");
+ spark.sql(
+ "INSERT INTO paimon.default."
+ + tableName
+ + " VALUES (1, MAP(10, STRUCT('apple', 100), 20,
STRUCT('banana', 101))), "
+ + "(2, MAP(10, STRUCT('cat', 200), 20, STRUCT('dog',
201)))");
+ assertThat(
+ spark.sql("SELECT v[10].f1, k FROM paimon.default." +
tableName)
+ .collectAsList().stream()
+ .map(Row::toString))
+ .containsExactlyInAnyOrder("[apple,1]", "[cat,2]");
+
+ spark.sql("ALTER TABLE paimon.default." + tableName + " RENAME COLUMN
v.value.f1 to f100");
+ assertThat(
+ spark.sql("SELECT v[10].f100, k FROM paimon.default."
+ tableName)
+ .collectAsList().stream()
+ .map(Row::toString))
+ .containsExactlyInAnyOrder("[apple,1]", "[cat,2]");
+ }
+
@ParameterizedTest()
@ValueSource(strings = {"orc", "avro", "parquet"})
public void testUpdateNestedColumnType(String formatType) {
@@ -850,4 +994,84 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
.map(Row::toString))
.containsExactlyInAnyOrder("[101,1]", "[200,2]",
"[3000000000000,3]");
}
+
+ @ParameterizedTest()
+ @ValueSource(strings = {"orc", "avro", "parquet"})
+ public void testUpdateNestedColumnTypeInArray(String formatType) {
+ String tableName = "testRenameNestedColumnTable";
+ spark.sql(
+ "CREATE TABLE paimon.default."
+ + tableName
+ + " (k INT NOT NULL, v ARRAY<STRUCT<f1: STRING, f2:
INT>>) "
+ + "TBLPROPERTIES ('bucket' = '1', 'primary-key' = 'k',
'file.format' = '"
+ + formatType
+ + "')");
+ spark.sql(
+ "INSERT INTO paimon.default."
+ + tableName
+ + " VALUES (1, ARRAY(STRUCT('apple', 100),
STRUCT('banana', 101))), "
+ + "(2, ARRAY(STRUCT('cat', 200), STRUCT('dog',
201)))");
+ assertThat(
+ spark.sql("SELECT * FROM paimon.default." +
tableName).collectAsList()
+ .stream()
+ .map(Row::toString))
+ .containsExactlyInAnyOrder(
+ "[1,WrappedArray([apple,100], [banana,101])]",
+ "[2,WrappedArray([cat,200], [dog,201])]");
+
+ spark.sql(
+ "ALTER TABLE paimon.default."
+ + tableName
+ + " CHANGE COLUMN v.element.f2 f2 BIGINT");
+ spark.sql(
+ "INSERT INTO paimon.default."
+ + tableName
+ + " VALUES (1, ARRAY(STRUCT('APPLE', 1000000000000),
STRUCT('BANANA', 111))), "
+ + "(3, ARRAY(STRUCT('FLOWER', 3000000000000)))");
+ assertThat(
+ spark.sql("SELECT * FROM paimon.default." +
tableName).collectAsList()
+ .stream()
+ .map(Row::toString))
+ .containsExactlyInAnyOrder(
+ "[1,WrappedArray([APPLE,1000000000000],
[BANANA,111])]",
+ "[2,WrappedArray([cat,200], [dog,201])]",
+ "[3,WrappedArray([FLOWER,3000000000000])]");
+ }
+
+ @ParameterizedTest()
+ @ValueSource(strings = {"orc", "avro", "parquet"})
+ public void testUpdateNestedColumnTypeInMap(String formatType) {
+ String tableName = "testRenameNestedColumnTable";
+ spark.sql(
+ "CREATE TABLE paimon.default."
+ + tableName
+ + " (k INT NOT NULL, v MAP<INT, STRUCT<f1: STRING, f2:
INT>>) "
+ + "TBLPROPERTIES ('bucket' = '1', 'primary-key' = 'k',
'file.format' = '"
+ + formatType
+ + "')");
+ spark.sql(
+ "INSERT INTO paimon.default."
+ + tableName
+ + " VALUES (1, MAP(10, STRUCT('apple', 100), 20,
STRUCT('banana', 101))), "
+ + "(2, MAP(10, STRUCT('cat', 200), 20, STRUCT('dog',
201)))");
+ assertThat(
+ spark.sql("SELECT k, v[10].f1, v[10].f2 FROM
paimon.default." + tableName)
+ .collectAsList().stream()
+ .map(Row::toString))
+ .containsExactlyInAnyOrder("[1,apple,100]", "[2,cat,200]");
+
+ spark.sql(
+ "ALTER TABLE paimon.default." + tableName + " CHANGE COLUMN
v.value.f2 f2 BIGINT");
+ spark.sql(
+ "INSERT INTO paimon.default."
+ + tableName
+ + " VALUES (1, MAP(10, STRUCT('APPLE', 1000000000000),
20, STRUCT('BANANA', 111))), "
+ + "(3, MAP(10, STRUCT('FLOWER', 3000000000000)))");
+ assertThat(
+ spark.sql("SELECT k, v[10].f1, v[10].f2 FROM
paimon.default." + tableName)
+ .collectAsList().stream()
+ .map(Row::toString))
+ .containsExactlyInAnyOrder(
+ "[1,APPLE,1000000000000]", "[2,cat,200]",
"[3,FLOWER,3000000000000]");
+ }
}
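Taken together, the new tests pin down the user-facing DDL: struct fields nested inside arrays and maps are addressed through the dummy element/value segments. A minimal usage sketch against a hypothetical table t with columns a ARRAY<STRUCT<f1: STRING, f2: INT>> and m MAP<INT, STRUCT<f1: STRING, f2: INT>>:

    // Hypothetical table and column names; the statements mirror the tests above.
    spark.sql("ALTER TABLE t ADD COLUMN a.element.f3 STRING AFTER f2"); // array element struct
    spark.sql("ALTER TABLE t DROP COLUMN m.value.f1");                  // map value struct
    spark.sql("ALTER TABLE t CHANGE COLUMN a.element.f2 f2 BIGINT");    // widen INT to BIGINT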