This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 78dcff71e07 [HUDI-7930] Flink Support for Array of Row and Map of Row value (#11727)
78dcff71e07 is described below
commit 78dcff71e07257a2713ce724162b44b62fdf17ba
Author: David N Perkins <[email protected]>
AuthorDate: Mon Oct 7 01:16:13 2024 -0400
[HUDI-7930] Flink Support for Array of Row and Map of Row value (#11727)
* HUDI-7930 add unit test to reproduce issue
* HUDI-7930 add avro schema for array of rows data
* HUDI-7930 initial implementation
* HUDI-7930 implemented Row value type for Maps
* GH-7930 fixed insert and bulk insert to match upsert schema
* GH-7930 fix other tests
* GH-7930 fixed missing row field bug
* GH-7930 fix TestParquetSchemaConverter
* GH-7930 remove TestHoodieTableSource tests and associated resources
* fix the api compatibility
* fix the null values for map type and schema evolution for row data type
* Add comment for the nested row in array check
* GH-7930 add schema evolution tests
---------
Co-authored-by: danny0405 <[email protected]>
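
For context, this change enables writing and reading Hudi tables whose Flink
schema nests rows inside arrays and maps. A minimal sketch of such a table,
using the types exercised by the new test below (table name and option values
are illustrative):

    CREATE TABLE t1 (
      f_int INT,
      f_array ARRAY<ROW<f_array_row_f0 VARCHAR(10), f_array_row_f1 INT>>,
      f_map MAP<VARCHAR(20), ROW<f_map_row_f0 INT, f_map_row_f1 VARCHAR(10)>>,
      PRIMARY KEY (f_int) NOT ENFORCED
    ) WITH ('connector' = 'hudi', 'path' = '/tmp/t1');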
---
.../storage/row/parquet/ParquetRowDataWriter.java | 29 ++-
.../row/parquet/ParquetSchemaConverter.java | 18 +-
.../row/parquet/TestParquetSchemaConverter.java | 2 +-
.../apache/hudi/table/ITTestHoodieDataSource.java | 182 +++++++++-------
.../apache/hudi/table/ITTestSchemaEvolution.java | 162 ++++++++++-----
.../org/apache/hudi/utils/TestConfigurations.java | 22 ++
.../test/java/org/apache/hudi/utils/TestSQL.java | 5 +
.../table/format/cow/ParquetSplitReaderUtil.java | 229 +++++++++++++++------
.../format/cow/vector/ColumnarGroupArrayData.java | 179 ++++++++++++++++
.../format/cow/vector/ColumnarGroupMapData.java | 63 ++++++
.../format/cow/vector/ColumnarGroupRowData.java | 138 +++++++++++++
.../cow/vector/HeapArrayGroupColumnVector.java | 53 +++++
.../format/cow/vector/HeapMapColumnVector.java | 44 +---
.../format/cow/vector/reader/ArrayGroupReader.java | 44 ++++
.../format/cow/vector/reader/MapColumnReader.java | 38 +---
15 files changed, 935 insertions(+), 273 deletions(-)
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetRowDataWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetRowDataWriter.java
index e5b9509d879..62ea526c275 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetRowDataWriter.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetRowDataWriter.java
@@ -506,18 +506,27 @@ public class ParquetRowDataWriter {
    private void doWrite(ArrayData arrayData) {
      recordConsumer.startGroup();
      if (arrayData.size() > 0) {
-       final String repeatedGroup = "list";
-       final String elementField = "element";
+       final String repeatedGroup = "array";
        recordConsumer.startField(repeatedGroup, 0);
-       for (int i = 0; i < arrayData.size(); i++) {
-         recordConsumer.startGroup();
-         if (!arrayData.isNullAt(i)) {
-           // Only creates the element field if the current array element is not null.
-           recordConsumer.startField(elementField, 0);
-           elementWriter.write(arrayData, i);
-           recordConsumer.endField(elementField, 0);
+       if (elementWriter instanceof RowWriter) {
+         for (int i = 0; i < arrayData.size(); i++) {
+           if (!arrayData.isNullAt(i)) {
+             // Only creates the element field if the current array element is not null.
+             elementWriter.write(arrayData, i);
+           }
+         }
+       } else {
+         final String elementField = "element";
+         for (int i = 0; i < arrayData.size(); i++) {
+           recordConsumer.startGroup();
+           if (!arrayData.isNullAt(i)) {
+             // Only creates the element field if the current array element is not null.
+             recordConsumer.startField(elementField, 0);
+             elementWriter.write(arrayData, i);
+             recordConsumer.endField(elementField, 0);
+           }
+           recordConsumer.endGroup();
          }
-         recordConsumer.endGroup();
        }
        recordConsumer.endField(repeatedGroup, 0);
      }
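
Net effect of the writer change: when the element writer is a RowWriter, each
non-null element now writes its row fields directly into the repeated group
(renamed from "list" to "array"), instead of being wrapped in a per-element
group holding a single "element" field. A rough sketch of the RecordConsumer
call sequence for one non-null element of an ARRAY<ROW<f0, f1>> column under
the new layout (the calls made inside RowWriter.write are assumed here, they
are not shown in this diff):

    recordConsumer.startGroup();                // outer LIST group
    recordConsumer.startField("array", 0);
    // emitted by elementWriter (a RowWriter) per non-null element:
    //   startGroup();
    //   startField("f0", 0); /* write f0 */ endField("f0", 0);
    //   startField("f1", 1); /* write f1 */ endField("f1", 1);
    //   endGroup();
    recordConsumer.endField("array", 0);
    recordConsumer.endGroup();                  // close the LIST group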
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java
index 8bd1a848855..b4b425f383c 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java
@@ -29,6 +29,7 @@ import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampType;
@@ -622,18 +623,25 @@ public class ParquetSchemaConverter {
}
case ARRAY:
        // <list-repetition> group <name> (LIST) {
-       //   repeated group list {
+       //   repeated group array {
        //     <element-repetition> <element-type> element;
        //   }
        // }
        ArrayType arrayType = (ArrayType) type;
        LogicalType elementType = arrayType.getElementType();
+
+       Types.GroupBuilder<GroupType> arrayGroupBuilder = Types.repeatedGroup();
+       if (elementType.getTypeRoot() == LogicalTypeRoot.ROW) {
+         RowType rowType = (RowType) elementType;
+         rowType.getFields().forEach(field ->
+             arrayGroupBuilder.addField(convertToParquetType(field.getName(), field.getType(), repetition)));
+       } else {
+         arrayGroupBuilder.addField(convertToParquetType("element", elementType, repetition));
+       }
+
        return Types
            .buildGroup(repetition).as(OriginalType.LIST)
-           .addField(
-               Types.repeatedGroup()
-                   .addField(convertToParquetType("element", elementType, repetition))
-                   .named("list"))
+           .addField(arrayGroupBuilder.named("array"))
            .named(name);
case MAP:
// <map-repetition> group <name> (MAP) {
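
Concretely, for an ARRAY<ROW<f0 INT, f1 STRING>> column the converter now
emits one Parquet field per row field inside the repeated group, rather than
a single "element" field. A sketch of the resulting type under the new code
path (physical types assume the usual INT -> int32 and STRING -> binary
(STRING) mapping; exact repetition depends on nullability):

    optional group f_array (LIST) {
      repeated group array {
        optional int32 f0;
        optional binary f1 (STRING);
      }
    }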
diff --git a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/parquet/TestParquetSchemaConverter.java b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/parquet/TestParquetSchemaConverter.java
index 9e07edbd4ca..864abeed86a 100644
--- a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/parquet/TestParquetSchemaConverter.java
+++ b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/parquet/TestParquetSchemaConverter.java
@@ -50,7 +50,7 @@ public class TestParquetSchemaConverter {
assertThat(messageType.getColumns().size(), is(7));
final String expected = "message converted {\n"
+ " optional group f_array (LIST) {\n"
- + " repeated group list {\n"
+ + " repeated group array {\n"
+ " optional binary element (STRING);\n"
+ " }\n"
+ " }\n"
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index e0147f6ed8a..77b66ff974e 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -371,9 +371,9 @@ public class ITTestHoodieDataSource {
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
.option(FlinkOptions.OPERATION, "insert")
.option(FlinkOptions.READ_AS_STREAMING, true)
- .option(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED,true)
+ .option(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED, true)
.option(FlinkOptions.CLUSTERING_ASYNC_ENABLED, true)
- .option(FlinkOptions.CLUSTERING_DELTA_COMMITS,1)
+ .option(FlinkOptions.CLUSTERING_DELTA_COMMITS, 1)
.option(FlinkOptions.CLUSTERING_TASKS, 1)
.end();
streamTableEnv.executeSql(hoodieTableDDL);
@@ -383,7 +383,7 @@ public class ITTestHoodieDataSource {
    String instant = TestUtils.getNthCompleteInstant(new StoragePath(tempFile.toURI()), 2, HoodieTimeline.COMMIT_ACTION);
streamTableEnv.getConfig().getConfiguration()
- .setBoolean("table.dynamic-table-options.enabled", true);
+ .setBoolean("table.dynamic-table-options.enabled", true);
    final String query = String.format("select * from t1/*+ options('read.start-commit'='%s')*/", instant);
List<Row> rows = execSelectSql(streamTableEnv, query, 10);
assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT_LATEST_COMMIT);
@@ -398,9 +398,9 @@ public class ITTestHoodieDataSource {
String hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
.option(FlinkOptions.OPERATION, "insert")
- .option(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED,true)
+ .option(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED, true)
.option(FlinkOptions.CLUSTERING_ASYNC_ENABLED, true)
- .option(FlinkOptions.CLUSTERING_DELTA_COMMITS,2)
+ .option(FlinkOptions.CLUSTERING_DELTA_COMMITS, 2)
.option(FlinkOptions.CLUSTERING_TASKS, 1)
.option(FlinkOptions.CLEAN_RETAIN_COMMITS, 1)
.end();
@@ -409,9 +409,9 @@ public class ITTestHoodieDataSource {
execInsertSql(streamTableEnv, insertInto);
streamTableEnv.getConfig().getConfiguration()
- .setBoolean("table.dynamic-table-options.enabled", true);
+ .setBoolean("table.dynamic-table-options.enabled", true);
    final String query = String.format("select * from t1/*+ options('read.start-commit'='%s')*/",
- FlinkOptions.START_COMMIT_EARLIEST);
+ FlinkOptions.START_COMMIT_EARLIEST);
List<Row> rows = execSelectSql(streamTableEnv, query, 10);
// batch read will not lose data when cleaned clustered files.
@@ -450,12 +450,12 @@ public class ITTestHoodieDataSource {
@Test
void testBatchWriteWithCleaning() {
String hoodieTableDDL = sql("t1")
- .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
- .option(FlinkOptions.CLEAN_RETAIN_COMMITS, 1)
- .end();
+ .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .option(FlinkOptions.CLEAN_RETAIN_COMMITS, 1)
+ .end();
batchTableEnv.executeSql(hoodieTableDDL);
String insertInto = "insert into t1 values\n"
- + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1')";
+ + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1')";
execInsertSql(batchTableEnv, insertInto);
execInsertSql(batchTableEnv, insertInto);
execInsertSql(batchTableEnv, insertInto);
@@ -466,7 +466,7 @@ public class ITTestHoodieDataSource {
    HoodieTimeline timeline = StreamerUtil.createMetaClient(conf).getActiveTimeline();
    assertTrue(timeline.filterCompletedInstants()
            .getInstants().stream().anyMatch(instant -> instant.getAction().equals("clean")),
- "some commits should be cleaned");
+ "some commits should be cleaned");
}
@Test
@@ -1621,12 +1621,44 @@ public class ITTestHoodieDataSource {
List<Row> result = CollectionUtil.iterableToList(
() -> tableEnv.sqlQuery("select * from t1").execute().collect());
List<Row> expected = Arrays.asList(
-       row(1, array("abc1", "def1"), array(1, 1), map("abc1", 1, "def1", 3), row(array("abc1", "def1"), row(1, "abc1"))),
-       row(2, array("abc2", "def2"), array(2, 2), map("abc2", 1, "def2", 3), row(array("abc2", "def2"), row(2, "abc2"))),
-       row(3, array("abc3", "def3"), array(3, 3), map("abc3", 1, "def3", 3), row(array("abc3", "def3"), row(3, "abc3"))));
+       row(1, array("abc1", "def1"), array(1, 1), map("abc1", 1, "def1", 3), row(array("abc1", "def1"), row(1, "abc1"))),
+       row(2, array("abc2", "def2"), array(2, 2), map("abc2", 1, "def2", 3), row(array("abc2", "def2"), row(2, "abc2"))),
+       row(3, array("abc3", "def3"), array(3, 3), map("abc3", 1, "def3", 3), row(array("abc3", "def3"), row(3, "abc3"))));
assertRowsEqualsUnordered(result, expected);
}
+ @ParameterizedTest
+ @ValueSource(strings = {"insert", "upsert", "bulk_insert"})
+ void testParquetArrayMapOfRowTypes(String operation) {
+ TableEnvironment tableEnv = batchTableEnv;
+
+   String hoodieTableDDL = sql("t1")
+       .field("f_int int")
+       .field("f_array array<row(f_array_row_f0 varchar(10), f_array_row_f1 int)>")
+       .field("f_map map<varchar(20), row(f_map_row_f0 int, f_map_row_f1 varchar(10))>")
+       .pkField("f_int")
+       .noPartition()
+       .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+       .option(FlinkOptions.OPERATION, operation)
+       .end();
+   tableEnv.executeSql(hoodieTableDDL);
+
+   execInsertSql(tableEnv, TestSQL.ARRAY_MAP_OF_ROW_TYPE_INSERT_T1);
+
+   tableEnv.executeSql("ALTER TABLE t1 MODIFY (\n"
+       + "  f_array array<row(f_array_row_f0 varchar(10), f_array_row_f1 int, f_array_row_f2 double)>,\n"
+       + "  f_map map<varchar(20), row(f_map_row_f0 int, f_map_row_f1 varchar(10), f_map_row_f2 double)>\n"
+       + ");");
+
+   List<Row> result = CollectionUtil.iterableToList(
+       () -> tableEnv.sqlQuery("select * from t1").execute().collect());
+   List<Row> expected = Arrays.asList(
+       row(1, array(row("abc11", 11, null), row("abc12", 12, null), row("abc13", 13, null)), map("abc11", row(11, "def11", null), "abc12", row(12, "def12", null), "abc13", row(13, "def13", null))),
+       row(2, array(row("abc21", 21, null), row("abc22", 22, null), row("abc23", 23, null)), map("abc21", row(21, "def21", null), "abc22", row(22, "def22", null), "abc23", row(23, "def23", null))),
+       row(3, array(row("abc31", 31, null), row("abc32", 32, null), row("abc33", 33, null)), map("abc31", row(31, "def31", null), "abc32", row(32, "def32", null), "abc33", row(33, "def33", null))));
+   assertRowsEqualsUnordered(expected, result);
+ }
+
@ParameterizedTest
@ValueSource(strings = {"insert", "upsert", "bulk_insert"})
void testParquetNullChildColumnsRowTypes(String operation) {
@@ -2026,18 +2058,18 @@ public class ITTestHoodieDataSource {
  void testReadMetaFields(HoodieTableType tableType, String queryType, int numInsertBatches, int compactionDeltaCommits) throws Exception {
String path = tempFile.getAbsolutePath();
String hoodieTableDDL = sql("t1")
- .field("id int")
- .field("name varchar(10)")
- .field("ts timestamp(6)")
- .field("`partition` varchar(10)")
- .pkField("id")
- .partitionField("partition")
- .option(FlinkOptions.TABLE_TYPE, tableType)
- .option(FlinkOptions.QUERY_TYPE, queryType)
- .option(FlinkOptions.COMPACTION_ASYNC_ENABLED, true)
-       .option(FlinkOptions.COMPACTION_DELTA_COMMITS, compactionDeltaCommits)
- .option(FlinkOptions.PATH, path)
- .end();
+ .field("id int")
+ .field("name varchar(10)")
+ .field("ts timestamp(6)")
+ .field("`partition` varchar(10)")
+ .pkField("id")
+ .partitionField("partition")
+ .option(FlinkOptions.TABLE_TYPE, tableType)
+ .option(FlinkOptions.QUERY_TYPE, queryType)
+ .option(FlinkOptions.COMPACTION_ASYNC_ENABLED, true)
+ .option(FlinkOptions.COMPACTION_DELTA_COMMITS, compactionDeltaCommits)
+ .option(FlinkOptions.PATH, path)
+ .end();
batchTableEnv.executeSql(hoodieTableDDL);
final String[] insertInto = new String[] {
@@ -2076,7 +2108,7 @@ public class ITTestHoodieDataSource {
for (int i = 0; i < numInsertBatches; i++) {
execInsertSql(batchTableEnv, insertInto[i]);
String commitTime = tableType.equals(HoodieTableType.MERGE_ON_READ)
-         ? TestUtils.getLastDeltaCompleteInstant(path) : TestUtils.getLastCompleteInstant(path);
+         ? TestUtils.getLastDeltaCompleteInstant(path) : TestUtils.getLastCompleteInstant(path);
expected1.append(template1[i]);
expected2.append(String.format(template2[i], commitTime));
expected3.append(String.format(template3[i], commitTime));
@@ -2087,18 +2119,18 @@ public class ITTestHoodieDataSource {
String readHoodieTableDDL;
batchTableEnv.executeSql("drop table t1");
readHoodieTableDDL = sql("t1")
- .field("id int")
- .field("name varchar(10)")
- .field("ts timestamp(6)")
- .field("`partition` varchar(10)")
- .pkField("id")
- .partitionField("partition")
- .option(FlinkOptions.TABLE_TYPE, tableType)
- .option(FlinkOptions.QUERY_TYPE, queryType)
- .option(FlinkOptions.COMPACTION_ASYNC_ENABLED, true)
-       .option(FlinkOptions.COMPACTION_DELTA_COMMITS, compactionDeltaCommits)
- .option(FlinkOptions.PATH, path)
- .end();
+ .field("id int")
+ .field("name varchar(10)")
+ .field("ts timestamp(6)")
+ .field("`partition` varchar(10)")
+ .pkField("id")
+ .partitionField("partition")
+ .option(FlinkOptions.TABLE_TYPE, tableType)
+ .option(FlinkOptions.QUERY_TYPE, queryType)
+ .option(FlinkOptions.COMPACTION_ASYNC_ENABLED, true)
+ .option(FlinkOptions.COMPACTION_DELTA_COMMITS, compactionDeltaCommits)
+ .option(FlinkOptions.PATH, path)
+ .end();
batchTableEnv.executeSql(readHoodieTableDDL);
    List<Row> result = execSelectSql(batchTableEnv, "select * from t1", ExecMode.BATCH);
@@ -2106,21 +2138,21 @@ public class ITTestHoodieDataSource {
batchTableEnv.executeSql("drop table t1");
readHoodieTableDDL = sql("t1")
- .field("_hoodie_commit_time string")
- .field("_hoodie_record_key string")
- .field("_hoodie_partition_path string")
- .field("id int")
- .field("name varchar(10)")
- .field("ts timestamp(6)")
- .field("`partition` varchar(10)")
- .pkField("id")
- .partitionField("partition")
- .option(FlinkOptions.TABLE_TYPE, tableType)
- .option(FlinkOptions.QUERY_TYPE, queryType)
- .option(FlinkOptions.COMPACTION_ASYNC_ENABLED, true)
-       .option(FlinkOptions.COMPACTION_DELTA_COMMITS, compactionDeltaCommits)
- .option(FlinkOptions.PATH, path)
- .end();
+ .field("_hoodie_commit_time string")
+ .field("_hoodie_record_key string")
+ .field("_hoodie_partition_path string")
+ .field("id int")
+ .field("name varchar(10)")
+ .field("ts timestamp(6)")
+ .field("`partition` varchar(10)")
+ .pkField("id")
+ .partitionField("partition")
+ .option(FlinkOptions.TABLE_TYPE, tableType)
+ .option(FlinkOptions.QUERY_TYPE, queryType)
+ .option(FlinkOptions.COMPACTION_ASYNC_ENABLED, true)
+ .option(FlinkOptions.COMPACTION_DELTA_COMMITS, compactionDeltaCommits)
+ .option(FlinkOptions.PATH, path)
+ .end();
batchTableEnv.executeSql(readHoodieTableDDL);
result = execSelectSql(batchTableEnv, "select * from t1", ExecMode.BATCH);
@@ -2128,21 +2160,21 @@ public class ITTestHoodieDataSource {
batchTableEnv.executeSql("drop table t1");
readHoodieTableDDL = sql("t1")
- .field("id int")
- .field("_hoodie_commit_time string")
- .field("name varchar(10)")
- .field("_hoodie_record_key string")
- .field("ts timestamp(6)")
- .field("_hoodie_partition_path string")
- .field("`partition` varchar(10)")
- .pkField("id")
- .partitionField("partition")
- .option(FlinkOptions.TABLE_TYPE, tableType)
- .option(FlinkOptions.QUERY_TYPE, queryType)
- .option(FlinkOptions.COMPACTION_ASYNC_ENABLED, true)
-       .option(FlinkOptions.COMPACTION_DELTA_COMMITS, compactionDeltaCommits)
- .option(FlinkOptions.PATH, path)
- .end();
+ .field("id int")
+ .field("_hoodie_commit_time string")
+ .field("name varchar(10)")
+ .field("_hoodie_record_key string")
+ .field("ts timestamp(6)")
+ .field("_hoodie_partition_path string")
+ .field("`partition` varchar(10)")
+ .pkField("id")
+ .partitionField("partition")
+ .option(FlinkOptions.TABLE_TYPE, tableType)
+ .option(FlinkOptions.QUERY_TYPE, queryType)
+ .option(FlinkOptions.COMPACTION_ASYNC_ENABLED, true)
+ .option(FlinkOptions.COMPACTION_DELTA_COMMITS, compactionDeltaCommits)
+ .option(FlinkOptions.PATH, path)
+ .end();
batchTableEnv.executeSql(readHoodieTableDDL);
result = execSelectSql(batchTableEnv, "select * from t1", ExecMode.BATCH);
@@ -2300,11 +2332,11 @@ public class ITTestHoodieDataSource {
*/
  private static Stream<Arguments> tableTypeQueryTypeNumInsertAndCompactionDeltaCommitsParams() {
    return Arrays.stream(new Object[][] {
-       {HoodieTableType.COPY_ON_WRITE, FlinkOptions.QUERY_TYPE_INCREMENTAL, 1, 1},
-       {HoodieTableType.COPY_ON_WRITE, FlinkOptions.QUERY_TYPE_READ_OPTIMIZED, 1, 1},
-       {HoodieTableType.MERGE_ON_READ, FlinkOptions.QUERY_TYPE_SNAPSHOT, 1, 1},
-       {HoodieTableType.MERGE_ON_READ, FlinkOptions.QUERY_TYPE_SNAPSHOT, 1, 3},
-       {HoodieTableType.MERGE_ON_READ, FlinkOptions.QUERY_TYPE_SNAPSHOT, 3, 2}
+       {HoodieTableType.COPY_ON_WRITE, FlinkOptions.QUERY_TYPE_INCREMENTAL, 1, 1},
+       {HoodieTableType.COPY_ON_WRITE, FlinkOptions.QUERY_TYPE_READ_OPTIMIZED, 1, 1},
+       {HoodieTableType.MERGE_ON_READ, FlinkOptions.QUERY_TYPE_SNAPSHOT, 1, 1},
+       {HoodieTableType.MERGE_ON_READ, FlinkOptions.QUERY_TYPE_SNAPSHOT, 1, 3},
+       {HoodieTableType.MERGE_ON_READ, FlinkOptions.QUERY_TYPE_SNAPSHOT, 3, 2}
}).map(Arguments::of);
}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java
index 46f51df741f..ddb29cdbea7 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java
@@ -180,6 +180,8 @@ public class ITTestSchemaEvolution {
        + "  f_struct row<f0 int, f1 string, drop_add string, change_type int>,"
        + "  f_map map<string, int>,"
        + "  f_array array<int>,"
+       + "  f_row_map map<string, row<f0 int, f1 string, drop_add string, change_type int>>,"
+       + "  f_row_array array<row<f0 int, f1 string, drop_add string, change_type int>>,"
+ " `partition` string"
+ ") partitioned by (`partition`) with (" + tableOptions + ")"
);
@@ -195,18 +197,29 @@ public class ITTestSchemaEvolution {
        + "  cast(f_struct as row<f0 int, f1 string, drop_add string, change_type int>),"
        + "  cast(f_map as map<string, int>),"
        + "  cast(f_array as array<int>),"
+       + "  cast(f_row_map as map<string, row< f0 int, f1 string, drop_add string, change_type int>>),"
+       + "  cast(f_row_array as array<row< f0 int, f1 string, drop_add string, change_type int>>),"
        + "  cast(`partition` as string) "
        + "from (values "
-       + "  ('id0', 'Indica', 'F', 12, '2000-01-01 00:00:00', cast(null as row<f0 int, f1 string, drop_add string, change_type int>), map['Indica', 1212], array[12], 'par0'),"
-       + "  ('id1', 'Danny', 'M', 23, '2000-01-01 00:00:01', row(1, 's1', '', 1), cast(map['Danny', 2323] as map<string, int>), array[23, 23], 'par1'),"
-       + "  ('id2', 'Stephen', 'M', 33, '2000-01-01 00:00:02', row(2, 's2', '', 2), cast(map['Stephen', 3333] as map<string, int>), array[33], 'par1'),"
-       + "  ('id3', 'Julian', 'M', 53, '2000-01-01 00:00:03', row(3, 's3', '', 3), cast(map['Julian', 5353] as map<string, int>), array[53, 53], 'par2'),"
-       + "  ('id4', 'Fabian', 'M', 31, '2000-01-01 00:00:04', row(4, 's4', '', 4), cast(map['Fabian', 3131] as map<string, int>), array[31], 'par2'),"
-       + "  ('id5', 'Sophia', 'F', 18, '2000-01-01 00:00:05', row(5, 's5', '', 5), cast(map['Sophia', 1818] as map<string, int>), array[18, 18], 'par3'),"
-       + "  ('id6', 'Emma', 'F', 20, '2000-01-01 00:00:06', row(6, 's6', '', 6), cast(map['Emma', 2020] as map<string, int>), array[20], 'par3'),"
-       + "  ('id7', 'Bob', 'M', 44, '2000-01-01 00:00:07', row(7, 's7', '', 7), cast(map['Bob', 4444] as map<string, int>), array[44, 44], 'par4'),"
-       + "  ('id8', 'Han', 'M', 56, '2000-01-01 00:00:08', row(8, 's8', '', 8), cast(map['Han', 5656] as map<string, int>), array[56, 56, 56], 'par4')"
-       + ") as A(uuid, name, gender, age, ts, f_struct, f_map, f_array, `partition`)"
+       + "  ('id0', 'Indica', 'F', 12, '2000-01-01 00:00:00', cast(null as row<f0 int, f1 string, drop_add string, change_type int>), map['Indica', 1212], array[12], "
+       + "    cast(null as map<string, row<f0 int, f1 string, drop_add string, change_type int>>), array[row(0, 's0', '', 0)], 'par0'),"
+       + "  ('id1', 'Danny', 'M', 23, '2000-01-01 00:00:01', row(1, 's1', '', 1), cast(map['Danny', 2323] as map<string, int>), array[23, 23], "
+       + "    cast(map['Danny', row(1, 's1', '', 1)] as map<string, row<f0 int, f1 string, drop_add string, change_type int>>), array[row(1, 's1', '', 1)], 'par1'),"
+       + "  ('id2', 'Stephen', 'M', 33, '2000-01-01 00:00:02', row(2, 's2', '', 2), cast(map['Stephen', 3333] as map<string, int>), array[33], "
+       + "    cast(map['Stephen', row(2, 's2', '', 2)] as map<string, row<f0 int, f1 string, drop_add string, change_type int>>), array[row(2, 's2', '', 2)], 'par1'),"
+       + "  ('id3', 'Julian', 'M', 53, '2000-01-01 00:00:03', row(3, 's3', '', 3), cast(map['Julian', 5353] as map<string, int>), array[53, 53], "
+       + "    cast(map['Julian', row(3, 's3', '', 3)] as map<string, row<f0 int, f1 string, drop_add string, change_type int>>), array[row(3, 's3', '', 3)], 'par2'),"
+       + "  ('id4', 'Fabian', 'M', 31, '2000-01-01 00:00:04', row(4, 's4', '', 4), cast(map['Fabian', 3131] as map<string, int>), array[31], "
+       + "    cast(map['Fabian', row(4, 's4', '', 4)] as map<string, row<f0 int, f1 string, drop_add string, change_type int>>), array[row(4, 's4', '', 4)], 'par2'),"
+       + "  ('id5', 'Sophia', 'F', 18, '2000-01-01 00:00:05', row(5, 's5', '', 5), cast(map['Sophia', 1818] as map<string, int>), array[18, 18], "
+       + "    cast(map['Sophia', row(5, 's5', '', 5)] as map<string, row<f0 int, f1 string, drop_add string, change_type int>>), array[row(5, 's5', '', 5)], 'par3'),"
+       + "  ('id6', 'Emma', 'F', 20, '2000-01-01 00:00:06', row(6, 's6', '', 6), cast(map['Emma', 2020] as map<string, int>), array[20], "
+       + "    cast(map['Emma', row(6, 's6', '', 6)] as map<string, row<f0 int, f1 string, drop_add string, change_type int>>), array[row(6, 's6', '', 6)], 'par3'),"
+       + "  ('id7', 'Bob', 'M', 44, '2000-01-01 00:00:07', row(7, 's7', '', 7), cast(map['Bob', 4444] as map<string, int>), array[44, 44], "
+       + "    cast(map['Bob', row(7, 's7', '', 7)] as map<string, row<f0 int, f1 string, drop_add string, change_type int>>), array[row(7, 's7', '', 7)], 'par4'),"
+       + "  ('id8', 'Han', 'M', 56, '2000-01-01 00:00:08', row(8, 's8', '', 8), cast(map['Han', 5656] as map<string, int>), array[56, 56, 56], "
+       + "    cast(map['Han', row(8, 's8', '', 8)] as map<string, row<f0 int, f1 string, drop_add string, change_type int>>), array[row(8, 's8', '', 8)], 'par4')"
+       + ") as A(uuid, name, gender, age, ts, f_struct, f_map, f_array, f_row_map, f_row_array, `partition`)"
).await();
}
@@ -232,19 +245,25 @@ public class ITTestSchemaEvolution {
writeClient.updateColumnType("age", Types.StringType.get());
writeClient.addColumn("last_name", stringType, "empty allowed",
"salary", BEFORE);
writeClient.reOrderColPosition("age", "first_name", BEFORE);
- // add a field in the middle of the `f_struct` column
+ // add a field in the middle of the `f_struct` and `f_row_map` columns
writeClient.addColumn("f_struct.f2", intType, "add field in middle of
struct", "f_struct.f0", AFTER);
- // add a field at the end of `f_struct` column
+ writeClient.addColumn("f_row_map.value.f2", intType, "add field in
middle of struct", "f_row_map.value.f0", AFTER);
+ // add a field at the end of `f_struct` and `f_row_map` column
writeClient.addColumn("f_struct.f3", stringType);
+ writeClient.addColumn("f_row_map.value.f3", stringType);
// delete and add a field with the same name
// reads should not return previously inserted datum of dropped field of
the same name
writeClient.deleteColumns("f_struct.drop_add");
writeClient.addColumn("f_struct.drop_add", doubleType);
+ writeClient.deleteColumns("f_row_map.value.drop_add");
+ writeClient.addColumn("f_row_map.value.drop_add", doubleType);
// perform comprehensive evolution on complex types (struct, array, map)
by promoting its primitive types
writeClient.updateColumnType("f_struct.change_type",
Types.LongType.get());
writeClient.renameColumn("f_struct.change_type", "renamed_change_type");
+ writeClient.updateColumnType("f_row_map.value.change_type",
Types.LongType.get());
+ writeClient.renameColumn("f_row_map.value.change_type",
"renamed_change_type");
writeClient.updateColumnType("f_array.element", Types.DoubleType.get());
writeClient.updateColumnType("f_map.value", Types.DoubleType.get());
@@ -256,6 +275,8 @@ public class ITTestSchemaEvolution {
      // perform comprehensive evolution on a struct column by reordering field positions
      writeClient.updateColumnType("f_struct.f0", Types.DecimalType.get(20, 0));
      writeClient.reOrderColPosition("f_struct.f0", "f_struct.drop_add", AFTER);
+     writeClient.updateColumnType("f_row_map.value.f0", Types.DecimalType.get(20, 0));
+     writeClient.reOrderColPosition("f_row_map.value.f0", "f_row_map.value.drop_add", AFTER);
}
}
@@ -278,6 +299,8 @@ public class ITTestSchemaEvolution {
        + "  f_struct row<f2 int, f1 string, renamed_change_type bigint, f3 string, drop_add string, f0 decimal(20, 0)>,"
        + "  f_map map<string, double>,"
        + "  f_array array<double>,"
+       + "  f_row_map map<string, row<f2 int, f1 string, renamed_change_type bigint, f3 string, drop_add string, f0 decimal(20, 0)>>,"
+       + "  f_row_array array<row<f0 int, f1 string, drop_add string, change_type int>>,"
+ " new_row_col row<f0 bigint, f1 string>,"
+ " new_array_col array<string>,"
+ " new_map_col map<string, string>,"
@@ -296,18 +319,26 @@ public class ITTestSchemaEvolution {
        + "  cast(f_struct as row<f2 int, f1 string, renamed_change_type bigint, f3 string, drop_add string, f0 decimal(20, 0)>),"
        + "  cast(f_map as map<string, double>),"
        + "  cast(f_array as array<double>),"
+       + "  cast(f_row_map as map<string, row<f2 int, f1 string, renamed_change_type bigint, f3 string, drop_add string, f0 decimal(20, 0)>>),"
+       + "  cast(f_row_array as array<row<f0 int, f1 string, drop_add string, change_type int>>),"
        + "  cast(new_row_col as row<f0 bigint, f1 string>),"
        + "  cast(new_array_col as array<string>),"
        + "  cast(new_map_col as map<string, string>),"
        + "  cast(`partition` as string) "
        + "from (values "
        + "  ('id1', '23', 'Danny', '', 10000.1, '2000-01-01 00:00:01', row(1, 's1', 11, 't1', 'drop_add1', 1), cast(map['Danny', 2323.23] as map<string, double>), array[23, 23, 23], "
+       + "    cast(map['Danny', row(1, 's1', 11, 't1', 'drop_add1', 1)] as map<string, row<f2 int, f1 string, renamed_change_type bigint, f3 string, drop_add string, f0 decimal(20, 0)>>), "
+       + "    array[row(1, 's1', '', 1)], "
        + "    row(1, '1'), array['1'], Map['k1','v1'], 'par1'),"
        + "  ('id9', 'unknown', 'Alice', '', 90000.9, '2000-01-01 00:00:09', row(9, 's9', 99, 't9', 'drop_add9', 9), cast(map['Alice', 9999.99] as map<string, double>), array[9999, 9999], "
+       + "    cast(map['Alice', row(9, 's9', 99, 't9', 'drop_add9', 9)] as map<string, row<f2 int, f1 string, renamed_change_type bigint, f3 string, drop_add string, f0 decimal(20, 0)>>), "
+       + "    array[row(9, 's9', '', 9)], "
        + "    row(9, '9'), array['9'], Map['k9','v9'], 'par1'),"
        + "  ('id3', '53', 'Julian', '', 30000.3, '2000-01-01 00:00:03', row(3, 's3', 33, 't3', 'drop_add3', 3), cast(map['Julian', 5353.53] as map<string, double>), array[53], "
+       + "    cast(map['Julian', row(3, 's3', 33, 't3', 'drop_add3', 3)] as map<string, row<f2 int, f1 string, renamed_change_type bigint, f3 string, drop_add string, f0 decimal(20, 0)>>), "
+       + "    array[row(3, 's3', '', 3)], "
        + "    row(3, '3'), array['3'], Map['k3','v3'], 'par2')"
-       + ") as A(uuid, age, first_name, last_name, salary, ts, f_struct, f_map, f_array, new_row_col, new_array_col, new_map_col, `partition`)"
+       + ") as A(uuid, age, first_name, last_name, salary, ts, f_struct, f_map, f_array, f_row_map, f_row_array, new_row_col, new_array_col, new_map_col, `partition`)"
).await();
}
@@ -345,6 +376,8 @@ public class ITTestSchemaEvolution {
+ " f_struct, "
+ " f_map, "
+ " f_array, "
+ + " f_row_map, "
+ + " f_row_array, "
+ " new_row_col, "
+ " new_array_col, "
+ " new_map_col "
@@ -376,6 +409,8 @@ public class ITTestSchemaEvolution {
        + "  f_struct row<f2 int, f1 string, renamed_change_type bigint, f3 string, drop_add string, f0 decimal(20, 0)>,"
        + "  f_map map<string, double>,"
        + "  f_array array<double>,"
+       + "  f_row_map map<string, row<f2 int, f1 string, renamed_change_type bigint, f3 string, drop_add string, f0 decimal(20, 0)>>,"
+       + "  f_row_array array<row<f0 int, f1 string, drop_add string, change_type int>>,"
+ " new_row_col row<f0 bigint, f1 string>,"
+ " new_array_col array<string>,"
+ " new_map_col map<string, string>,"
@@ -392,6 +427,8 @@ public class ITTestSchemaEvolution {
+ " f_struct, "
+ " f_map, "
+ " f_array, "
+ + " f_row_map, "
+ + " f_row_array, "
+ " new_row_col, "
+ " new_array_col, "
+ " new_map_col "
@@ -427,6 +464,16 @@ public class ITTestSchemaEvolution {
executor.shutdownNow();
}
+ for (String expectedItem : expected) {
+ if (!actual.contains(expectedItem)) {
+ System.out.println("Not in actual: " + expectedItem);
+ }
+ }
+ for (String actualItem : actual) {
+ if (!expected.contains(actualItem)) {
+ System.out.println("Not in expected: " + actualItem);
+ }
+ }
assertEquals(expected, actual);
}
@@ -472,30 +519,31 @@ public class ITTestSchemaEvolution {
}
}
+ // TODO: null arrays read back as a single null row; an array containing a null row and an array containing a row whose values are all null read back the same
  private static final ExpectedResult EXPECTED_MERGED_RESULT = new ExpectedResult(
new String[] {
-       "+I[Indica, null, 12, null, {Indica=1212.0}, [12.0], null, null, null]",
-       "+I[Danny, 10000.1, 23, +I[1, s1, 11, t1, drop_add1, 1], {Danny=2323.23}, [23.0, 23.0, 23.0], +I[1, 1], [1], {k1=v1}]",
-       "+I[Stephen, null, 33, +I[null, s2, 2, null, null, 2], {Stephen=3333.0}, [33.0], null, null, null]",
-       "+I[Julian, 30000.3, 53, +I[3, s3, 33, t3, drop_add3, 3], {Julian=5353.53}, [53.0], +I[3, 3], [3], {k3=v3}]",
-       "+I[Fabian, null, 31, +I[null, s4, 4, null, null, 4], {Fabian=3131.0}, [31.0], null, null, null]",
-       "+I[Sophia, null, 18, +I[null, s5, 5, null, null, 5], {Sophia=1818.0}, [18.0, 18.0], null, null, null]",
-       "+I[Emma, null, 20, +I[null, s6, 6, null, null, 6], {Emma=2020.0}, [20.0], null, null, null]",
-       "+I[Bob, null, 44, +I[null, s7, 7, null, null, 7], {Bob=4444.0}, [44.0, 44.0], null, null, null]",
-       "+I[Han, null, 56, +I[null, s8, 8, null, null, 8], {Han=5656.0}, [56.0, 56.0, 56.0], null, null, null]",
-       "+I[Alice, 90000.9, unknown, +I[9, s9, 99, t9, drop_add9, 9], {Alice=9999.99}, [9999.0, 9999.0], +I[9, 9], [9], {k9=v9}]",
+       "+I[Indica, null, 12, null, {Indica=1212.0}, [12.0], null, [+I[0, s0, , 0]], null, null, null]",
+       "+I[Danny, 10000.1, 23, +I[1, s1, 11, t1, drop_add1, 1], {Danny=2323.23}, [23.0, 23.0, 23.0], {Danny=+I[1, s1, 11, t1, drop_add1, 1]}, [+I[1, s1, , 1]], +I[1, 1], [1], {k1=v1}]",
+       "+I[Stephen, null, 33, +I[null, s2, 2, null, null, 2], {Stephen=3333.0}, [33.0], {Stephen=+I[null, s2, 2, null, null, 2]}, [+I[2, s2, , 2]], null, null, null]",
+       "+I[Julian, 30000.3, 53, +I[3, s3, 33, t3, drop_add3, 3], {Julian=5353.53}, [53.0], {Julian=+I[3, s3, 33, t3, drop_add3, 3]}, [+I[3, s3, , 3]], +I[3, 3], [3], {k3=v3}]",
+       "+I[Fabian, null, 31, +I[null, s4, 4, null, null, 4], {Fabian=3131.0}, [31.0], {Fabian=+I[null, s4, 4, null, null, 4]}, [+I[4, s4, , 4]], null, null, null]",
+       "+I[Sophia, null, 18, +I[null, s5, 5, null, null, 5], {Sophia=1818.0}, [18.0, 18.0], {Sophia=+I[null, s5, 5, null, null, 5]}, [+I[5, s5, , 5]], null, null, null]",
+       "+I[Emma, null, 20, +I[null, s6, 6, null, null, 6], {Emma=2020.0}, [20.0], {Emma=+I[null, s6, 6, null, null, 6]}, [+I[6, s6, , 6]], null, null, null]",
+       "+I[Bob, null, 44, +I[null, s7, 7, null, null, 7], {Bob=4444.0}, [44.0, 44.0], {Bob=+I[null, s7, 7, null, null, 7]}, [+I[7, s7, , 7]], null, null, null]",
+       "+I[Han, null, 56, +I[null, s8, 8, null, null, 8], {Han=5656.0}, [56.0, 56.0, 56.0], {Han=+I[null, s8, 8, null, null, 8]}, [+I[8, s8, , 8]], null, null, null]",
+       "+I[Alice, 90000.9, unknown, +I[9, s9, 99, t9, drop_add9, 9], {Alice=9999.99}, [9999.0, 9999.0], {Alice=+I[9, s9, 99, t9, drop_add9, 9]}, [+I[9, s9, , 9]], +I[9, 9], [9], {k9=v9}]",
},
new String[] {
-       "+I[id0, Indica, null, 12, null, {Indica=1212.0}, [12.0], null, null, null]",
-       "+I[id1, Danny, 10000.1, 23, +I[1, s1, 11, t1, drop_add1, 1], {Danny=2323.23}, [23.0, 23.0, 23.0], +I[1, 1], [1], {k1=v1}]",
-       "+I[id2, Stephen, null, 33, +I[null, s2, 2, null, null, 2], {Stephen=3333.0}, [33.0], null, null, null]",
-       "+I[id3, Julian, 30000.3, 53, +I[3, s3, 33, t3, drop_add3, 3], {Julian=5353.53}, [53.0], +I[3, 3], [3], {k3=v3}]",
-       "+I[id4, Fabian, null, 31, +I[null, s4, 4, null, null, 4], {Fabian=3131.0}, [31.0], null, null, null]",
-       "+I[id5, Sophia, null, 18, +I[null, s5, 5, null, null, 5], {Sophia=1818.0}, [18.0, 18.0], null, null, null]",
-       "+I[id6, Emma, null, 20, +I[null, s6, 6, null, null, 6], {Emma=2020.0}, [20.0], null, null, null]",
-       "+I[id7, Bob, null, 44, +I[null, s7, 7, null, null, 7], {Bob=4444.0}, [44.0, 44.0], null, null, null]",
-       "+I[id8, Han, null, 56, +I[null, s8, 8, null, null, 8], {Han=5656.0}, [56.0, 56.0, 56.0], null, null, null]",
-       "+I[id9, Alice, 90000.9, unknown, +I[9, s9, 99, t9, drop_add9, 9], {Alice=9999.99}, [9999.0, 9999.0], +I[9, 9], [9], {k9=v9}]",
+       "+I[id0, Indica, null, 12, null, {Indica=1212.0}, [12.0], null, [+I[0, s0, , 0]], null, null, null]",
+       "+I[id1, Danny, 10000.1, 23, +I[1, s1, 11, t1, drop_add1, 1], {Danny=2323.23}, [23.0, 23.0, 23.0], {Danny=+I[1, s1, 11, t1, drop_add1, 1]}, [+I[1, s1, , 1]], +I[1, 1], [1], {k1=v1}]",
+       "+I[id2, Stephen, null, 33, +I[null, s2, 2, null, null, 2], {Stephen=3333.0}, [33.0], {Stephen=+I[null, s2, 2, null, null, 2]}, [+I[2, s2, , 2]], null, null, null]",
+       "+I[id3, Julian, 30000.3, 53, +I[3, s3, 33, t3, drop_add3, 3], {Julian=5353.53}, [53.0], {Julian=+I[3, s3, 33, t3, drop_add3, 3]}, [+I[3, s3, , 3]], +I[3, 3], [3], {k3=v3}]",
+       "+I[id4, Fabian, null, 31, +I[null, s4, 4, null, null, 4], {Fabian=3131.0}, [31.0], {Fabian=+I[null, s4, 4, null, null, 4]}, [+I[4, s4, , 4]], null, null, null]",
+       "+I[id5, Sophia, null, 18, +I[null, s5, 5, null, null, 5], {Sophia=1818.0}, [18.0, 18.0], {Sophia=+I[null, s5, 5, null, null, 5]}, [+I[5, s5, , 5]], null, null, null]",
+       "+I[id6, Emma, null, 20, +I[null, s6, 6, null, null, 6], {Emma=2020.0}, [20.0], {Emma=+I[null, s6, 6, null, null, 6]}, [+I[6, s6, , 6]], null, null, null]",
+       "+I[id7, Bob, null, 44, +I[null, s7, 7, null, null, 7], {Bob=4444.0}, [44.0, 44.0], {Bob=+I[null, s7, 7, null, null, 7]}, [+I[7, s7, , 7]], null, null, null]",
+       "+I[id8, Han, null, 56, +I[null, s8, 8, null, null, 8], {Han=5656.0}, [56.0, 56.0, 56.0], {Han=+I[null, s8, 8, null, null, 8]}, [+I[8, s8, , 8]], null, null, null]",
+       "+I[id9, Alice, 90000.9, unknown, +I[9, s9, 99, t9, drop_add9, 9], {Alice=9999.99}, [9999.0, 9999.0], {Alice=+I[9, s9, 99, t9, drop_add9, 9]}, [+I[9, s9, , 9]], +I[9, 9], [9], {k9=v9}]",
},
new String[] {
"+I[1]",
@@ -522,32 +570,32 @@ public class ITTestSchemaEvolution {
  private static final ExpectedResult EXPECTED_UNMERGED_RESULT = new ExpectedResult(
new String[] {
-       "+I[Indica, null, 12, null, {Indica=1212.0}, [12.0], null, null, null]",
-       "+I[Danny, null, 23, +I[null, s1, 1, null, null, 1], {Danny=2323.0}, [23.0, 23.0], null, null, null]",
-       "+I[Stephen, null, 33, +I[null, s2, 2, null, null, 2], {Stephen=3333.0}, [33.0], null, null, null]",
-       "+I[Julian, null, 53, +I[null, s3, 3, null, null, 3], {Julian=5353.0}, [53.0, 53.0], null, null, null]",
-       "+I[Fabian, null, 31, +I[null, s4, 4, null, null, 4], {Fabian=3131.0}, [31.0], null, null, null]",
-       "+I[Sophia, null, 18, +I[null, s5, 5, null, null, 5], {Sophia=1818.0}, [18.0, 18.0], null, null, null]",
-       "+I[Emma, null, 20, +I[null, s6, 6, null, null, 6], {Emma=2020.0}, [20.0], null, null, null]",
-       "+I[Bob, null, 44, +I[null, s7, 7, null, null, 7], {Bob=4444.0}, [44.0, 44.0], null, null, null]",
-       "+I[Han, null, 56, +I[null, s8, 8, null, null, 8], {Han=5656.0}, [56.0, 56.0, 56.0], null, null, null]",
-       "+I[Alice, 90000.9, unknown, +I[9, s9, 99, t9, drop_add9, 9], {Alice=9999.99}, [9999.0, 9999.0], +I[9, 9], [9], {k9=v9}]",
-       "+I[Danny, 10000.1, 23, +I[1, s1, 11, t1, drop_add1, 1], {Danny=2323.23}, [23.0, 23.0, 23.0], +I[1, 1], [1], {k1=v1}]",
-       "+I[Julian, 30000.3, 53, +I[3, s3, 33, t3, drop_add3, 3], {Julian=5353.53}, [53.0], +I[3, 3], [3], {k3=v3}]",
+       "+I[Indica, null, 12, null, {Indica=1212.0}, [12.0], null, [+I[0, s0, , 0]], null, null, null]",
+       "+I[Danny, null, 23, +I[null, s1, 1, null, null, 1], {Danny=2323.0}, [23.0, 23.0], {Danny=+I[null, s1, 1, null, null, 1]}, [+I[1, s1, , 1]], null, null, null]",
+       "+I[Stephen, null, 33, +I[null, s2, 2, null, null, 2], {Stephen=3333.0}, [33.0], {Stephen=+I[null, s2, 2, null, null, 2]}, [+I[2, s2, , 2]], null, null, null]",
+       "+I[Julian, null, 53, +I[null, s3, 3, null, null, 3], {Julian=5353.0}, [53.0, 53.0], {Julian=+I[null, s3, 3, null, null, 3]}, [+I[3, s3, , 3]], null, null, null]",
+       "+I[Fabian, null, 31, +I[null, s4, 4, null, null, 4], {Fabian=3131.0}, [31.0], {Fabian=+I[null, s4, 4, null, null, 4]}, [+I[4, s4, , 4]], null, null, null]",
+       "+I[Sophia, null, 18, +I[null, s5, 5, null, null, 5], {Sophia=1818.0}, [18.0, 18.0], {Sophia=+I[null, s5, 5, null, null, 5]}, [+I[5, s5, , 5]], null, null, null]",
+       "+I[Emma, null, 20, +I[null, s6, 6, null, null, 6], {Emma=2020.0}, [20.0], {Emma=+I[null, s6, 6, null, null, 6]}, [+I[6, s6, , 6]], null, null, null]",
+       "+I[Bob, null, 44, +I[null, s7, 7, null, null, 7], {Bob=4444.0}, [44.0, 44.0], {Bob=+I[null, s7, 7, null, null, 7]}, [+I[7, s7, , 7]], null, null, null]",
+       "+I[Han, null, 56, +I[null, s8, 8, null, null, 8], {Han=5656.0}, [56.0, 56.0, 56.0], {Han=+I[null, s8, 8, null, null, 8]}, [+I[8, s8, , 8]], null, null, null]",
+       "+I[Alice, 90000.9, unknown, +I[9, s9, 99, t9, drop_add9, 9], {Alice=9999.99}, [9999.0, 9999.0], {Alice=+I[9, s9, 99, t9, drop_add9, 9]}, [+I[9, s9, , 9]], +I[9, 9], [9], {k9=v9}]",
+       "+I[Danny, 10000.1, 23, +I[1, s1, 11, t1, drop_add1, 1], {Danny=2323.23}, [23.0, 23.0, 23.0], {Danny=+I[1, s1, 11, t1, drop_add1, 1]}, [+I[1, s1, , 1]], +I[1, 1], [1], {k1=v1}]",
+       "+I[Julian, 30000.3, 53, +I[3, s3, 33, t3, drop_add3, 3], {Julian=5353.53}, [53.0], {Julian=+I[3, s3, 33, t3, drop_add3, 3]}, [+I[3, s3, , 3]], +I[3, 3], [3], {k3=v3}]",
},
new String[] {
-       "+I[id0, Indica, null, 12, null, {Indica=1212.0}, [12.0], null, null, null]",
-       "+I[id1, Danny, null, 23, +I[null, s1, 1, null, null, 1], {Danny=2323.0}, [23.0, 23.0], null, null, null]",
-       "+I[id2, Stephen, null, 33, +I[null, s2, 2, null, null, 2], {Stephen=3333.0}, [33.0], null, null, null]",
-       "+I[id3, Julian, null, 53, +I[null, s3, 3, null, null, 3], {Julian=5353.0}, [53.0, 53.0], null, null, null]",
-       "+I[id4, Fabian, null, 31, +I[null, s4, 4, null, null, 4], {Fabian=3131.0}, [31.0], null, null, null]",
-       "+I[id5, Sophia, null, 18, +I[null, s5, 5, null, null, 5], {Sophia=1818.0}, [18.0, 18.0], null, null, null]",
-       "+I[id6, Emma, null, 20, +I[null, s6, 6, null, null, 6], {Emma=2020.0}, [20.0], null, null, null]",
-       "+I[id7, Bob, null, 44, +I[null, s7, 7, null, null, 7], {Bob=4444.0}, [44.0, 44.0], null, null, null]",
-       "+I[id8, Han, null, 56, +I[null, s8, 8, null, null, 8], {Han=5656.0}, [56.0, 56.0, 56.0], null, null, null]",
-       "+I[id9, Alice, 90000.9, unknown, +I[9, s9, 99, t9, drop_add9, 9], {Alice=9999.99}, [9999.0, 9999.0], +I[9, 9], [9], {k9=v9}]",
-       "+I[id1, Danny, 10000.1, 23, +I[1, s1, 11, t1, drop_add1, 1], {Danny=2323.23}, [23.0, 23.0, 23.0], +I[1, 1], [1], {k1=v1}]",
-       "+I[id3, Julian, 30000.3, 53, +I[3, s3, 33, t3, drop_add3, 3], {Julian=5353.53}, [53.0], +I[3, 3], [3], {k3=v3}]",
+       "+I[id0, Indica, null, 12, null, {Indica=1212.0}, [12.0], null, [+I[0, s0, , 0]], null, null, null]",
+       "+I[id1, Danny, null, 23, +I[null, s1, 1, null, null, 1], {Danny=2323.0}, [23.0, 23.0], {Danny=+I[null, s1, 1, null, null, 1]}, [+I[1, s1, , 1]], null, null, null]",
+       "+I[id2, Stephen, null, 33, +I[null, s2, 2, null, null, 2], {Stephen=3333.0}, [33.0], {Stephen=+I[null, s2, 2, null, null, 2]}, [+I[2, s2, , 2]], null, null, null]",
+       "+I[id3, Julian, null, 53, +I[null, s3, 3, null, null, 3], {Julian=5353.0}, [53.0, 53.0], {Julian=+I[null, s3, 3, null, null, 3]}, [+I[3, s3, , 3]], null, null, null]",
+       "+I[id4, Fabian, null, 31, +I[null, s4, 4, null, null, 4], {Fabian=3131.0}, [31.0], {Fabian=+I[null, s4, 4, null, null, 4]}, [+I[4, s4, , 4]], null, null, null]",
+       "+I[id5, Sophia, null, 18, +I[null, s5, 5, null, null, 5], {Sophia=1818.0}, [18.0, 18.0], {Sophia=+I[null, s5, 5, null, null, 5]}, [+I[5, s5, , 5]], null, null, null]",
+       "+I[id6, Emma, null, 20, +I[null, s6, 6, null, null, 6], {Emma=2020.0}, [20.0], {Emma=+I[null, s6, 6, null, null, 6]}, [+I[6, s6, , 6]], null, null, null]",
+       "+I[id7, Bob, null, 44, +I[null, s7, 7, null, null, 7], {Bob=4444.0}, [44.0, 44.0], {Bob=+I[null, s7, 7, null, null, 7]}, [+I[7, s7, , 7]], null, null, null]",
+       "+I[id8, Han, null, 56, +I[null, s8, 8, null, null, 8], {Han=5656.0}, [56.0, 56.0, 56.0], {Han=+I[null, s8, 8, null, null, 8]}, [+I[8, s8, , 8]], null, null, null]",
+       "+I[id9, Alice, 90000.9, unknown, +I[9, s9, 99, t9, drop_add9, 9], {Alice=9999.99}, [9999.0, 9999.0], {Alice=+I[9, s9, 99, t9, drop_add9, 9]}, [+I[9, s9, , 9]], +I[9, 9], [9], {k9=v9}]",
+       "+I[id1, Danny, 10000.1, 23, +I[1, s1, 11, t1, drop_add1, 1], {Danny=2323.23}, [23.0, 23.0, 23.0], {Danny=+I[1, s1, 11, t1, drop_add1, 1]}, [+I[1, s1, , 1]], +I[1, 1], [1], {k1=v1}]",
+       "+I[id3, Julian, 30000.3, 53, +I[3, s3, 33, t3, drop_add3, 3], {Julian=5353.53}, [53.0], {Julian=+I[3, s3, 33, t3, drop_add3, 3]}, [+I[3, s3, , 3]], +I[3, 3], [3], {k3=v3}]",
},
new String[] {
"+I[1]",
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
index 71295d93b10..92d44a8e7ef 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
@@ -97,6 +97,16 @@ public class TestConfigurations {
DataTypes.FIELD("change_type", DataTypes.INT()))),
DataTypes.FIELD("f_map", DataTypes.MAP(DataTypes.STRING(),
DataTypes.INT())),
DataTypes.FIELD("f_array", DataTypes.ARRAY(DataTypes.INT())),
+ DataTypes.FIELD("f_row_map", DataTypes.MAP(DataTypes.STRING(),
DataTypes.ROW(
+ DataTypes.FIELD("f0", DataTypes.INT()),
+ DataTypes.FIELD("f1", DataTypes.STRING()),
+ DataTypes.FIELD("drop_add", DataTypes.STRING()),
+ DataTypes.FIELD("change_type", DataTypes.INT())))),
+ DataTypes.FIELD("f_row_array", DataTypes.ARRAY(DataTypes.ROW(
+ DataTypes.FIELD("f0", DataTypes.INT()),
+ DataTypes.FIELD("f1", DataTypes.STRING()),
+ DataTypes.FIELD("drop_add", DataTypes.STRING()),
+ DataTypes.FIELD("change_type", DataTypes.INT())))),
DataTypes.FIELD("partition", DataTypes.VARCHAR(10)))
.notNull();
@@ -118,6 +128,18 @@ public class TestConfigurations {
DataTypes.FIELD("f0", DataTypes.DECIMAL(20, 0)))),
DataTypes.FIELD("f_map", DataTypes.MAP(DataTypes.STRING(),
DataTypes.DOUBLE())),
DataTypes.FIELD("f_array", DataTypes.ARRAY(DataTypes.DOUBLE())),
+ DataTypes.FIELD("f_row_map", DataTypes.MAP(DataTypes.STRING(),
DataTypes.ROW(
+ DataTypes.FIELD("f2", DataTypes.INT()), // new field added in
the middle of struct
+ DataTypes.FIELD("f1", DataTypes.STRING()),
+ DataTypes.FIELD("renamed_change_type", DataTypes.BIGINT()),
+ DataTypes.FIELD("f3", DataTypes.STRING()),
+ DataTypes.FIELD("drop_add", DataTypes.STRING()),
+ DataTypes.FIELD("f0", DataTypes.DECIMAL(20, 0))))),
+ DataTypes.FIELD("f_row_array", DataTypes.ARRAY(DataTypes.ROW(
+ DataTypes.FIELD("f0", DataTypes.INT()),
+ DataTypes.FIELD("f1", DataTypes.STRING()),
+ DataTypes.FIELD("drop_add", DataTypes.STRING()),
+ DataTypes.FIELD("change_type", DataTypes.INT())))),
DataTypes.FIELD("new_row_col", DataTypes.ROW(
DataTypes.FIELD("f0", DataTypes.BIGINT()),
DataTypes.FIELD("f1", DataTypes.STRING()))),
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestSQL.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestSQL.java
index 70455d94466..683051a4926 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestSQL.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestSQL.java
@@ -62,6 +62,11 @@ public class TestSQL {
      + "(2, array['abc2', 'def2'], array[2, 2], map['abc2', 1, 'def2', 3], row(array['abc2', 'def2'], row(2, 'abc2'))),\n"
      + "(3, array['abc3', 'def3'], array[3, 3], map['abc3', 1, 'def3', 3], row(array['abc3', 'def3'], row(3, 'abc3')))";
+ public static final String ARRAY_MAP_OF_ROW_TYPE_INSERT_T1 = "insert into t1 values\n"
+     + "(1, array[row('abc11', 11), row('abc12', 12), row('abc13', 13)], map['abc11', row(11, 'def11'), 'abc12', row(12, 'def12'), 'abc13', row(13, 'def13')]),\n"
+     + "(2, array[row('abc21', 21), row('abc22', 22), row('abc23', 23)], map['abc21', row(21, 'def21'), 'abc22', row(22, 'def22'), 'abc23', row(23, 'def23')]),\n"
+     + "(3, array[row('abc31', 31), row('abc32', 32), row('abc33', 33)], map['abc31', row(31, 'def31'), 'abc32', row(32, 'def32'), 'abc33', row(33, 'def33')])";
+
  public static final String NULL_CHILD_COLUMNS_ROW_TYPE_INSERT_T1 = "insert into t1 values\n"
+ "(1, row(cast(null as int), 'abc1')),\n"
+ "(2, row(2, cast(null as varchar))),\n"
diff --git a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
index 8bbbb1288e5..4532fd0672e 100644
--- a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
+++ b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
@@ -19,11 +19,13 @@
package org.apache.hudi.table.format.cow;
import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.table.format.cow.vector.HeapArrayGroupColumnVector;
import org.apache.hudi.table.format.cow.vector.HeapArrayVector;
import org.apache.hudi.table.format.cow.vector.HeapDecimalVector;
import org.apache.hudi.table.format.cow.vector.HeapMapColumnVector;
import org.apache.hudi.table.format.cow.vector.HeapRowColumnVector;
import org.apache.hudi.table.format.cow.vector.reader.ArrayColumnReader;
+import org.apache.hudi.table.format.cow.vector.reader.ArrayGroupReader;
import org.apache.hudi.table.format.cow.vector.reader.EmptyColumnReader;
import org.apache.hudi.table.format.cow.vector.reader.FixedLenBytesColumnReader;
import org.apache.hudi.table.format.cow.vector.reader.Int64TimestampColumnReader;
@@ -62,6 +64,8 @@ import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampType;
@@ -121,7 +125,7 @@ public class ParquetSplitReaderUtil {
UnboundRecordFilter recordFilter) throws IOException {
    ValidationUtils.checkState(Arrays.stream(selectedFields).noneMatch(x -> x == -1),
- "One or more specified columns does not exist in the hudi table.");
+ "One or more specified columns does not exist in the hudi table.");
List<String> selNonPartNames = Arrays.stream(selectedFields)
.mapToObj(i -> fullFieldNames[i])
@@ -282,12 +286,23 @@ public class ParquetSplitReaderUtil {
}
return tv;
case ARRAY:
-       HeapArrayVector arrayVector = new HeapArrayVector(batchSize);
-       if (value == null) {
-         arrayVector.fillWithNulls();
-         return arrayVector;
+       ArrayType arrayType = (ArrayType) type;
+       if (arrayType.getElementType().isAnyOf(LogicalTypeFamily.CONSTRUCTED)) {
+         HeapArrayGroupColumnVector arrayGroup = new HeapArrayGroupColumnVector(batchSize);
+         if (value == null) {
+           arrayGroup.fillWithNulls();
+           return arrayGroup;
+         } else {
+           throw new UnsupportedOperationException("Unsupported create array with default value.");
+         }
        } else {
-         throw new UnsupportedOperationException("Unsupported create array with default value.");
+         HeapArrayVector arrayVector = new HeapArrayVector(batchSize);
+         if (value == null) {
+           arrayVector.fillWithNulls();
+           return arrayVector;
+         } else {
+           throw new UnsupportedOperationException("Unsupported create array with default value.");
+         }
        }
      case MAP:
        HeapMapColumnVector mapVector = new HeapMapColumnVector(batchSize, null, null);
@@ -394,12 +409,23 @@ public class ParquetSplitReaderUtil {
throw new AssertionError();
}
case ARRAY:
- return new ArrayColumnReader(
- descriptor,
- pageReader,
- utcTimestamp,
- descriptor.getPrimitiveType(),
- fieldType);
+ ArrayType arrayType = (ArrayType) fieldType;
+       if (arrayType.getElementType().isAnyOf(LogicalTypeFamily.CONSTRUCTED)) {
+ return new ArrayGroupReader(createColumnReader(
+ utcTimestamp,
+ arrayType.getElementType(),
+ physicalType.asGroupType().getType(0),
+ descriptors,
+ pages,
+ depth + 1));
+ } else {
+ return new ArrayColumnReader(
+ descriptor,
+ pageReader,
+ utcTimestamp,
+ descriptor.getPrimitiveType(),
+ fieldType);
+ }
case MAP:
MapType mapType = (MapType) fieldType;
ArrayColumnReader keyReader =
@@ -409,14 +435,24 @@ public class ParquetSplitReaderUtil {
utcTimestamp,
descriptor.getPrimitiveType(),
new ArrayType(mapType.getKeyType()));
- ArrayColumnReader valueReader =
- new ArrayColumnReader(
- descriptors.get(1),
- pages.getPageReader(descriptors.get(1)),
- utcTimestamp,
- descriptors.get(1).getPrimitiveType(),
- new ArrayType(mapType.getValueType()));
- return new MapColumnReader(keyReader, valueReader, fieldType);
+ ColumnReader<WritableColumnVector> valueReader;
+ if (mapType.getValueType().isAnyOf(LogicalTypeFamily.CONSTRUCTED)) {
+ valueReader = new ArrayGroupReader(createColumnReader(
+ utcTimestamp,
+ mapType.getValueType(),
+           physicalType.asGroupType().getType(0).asGroupType().getType(1), // get the value physical type
+           descriptors.subList(1, descriptors.size()), // remove the key descriptor
+           pages,
+           depth + 2)); // increase the depth by 2, because there's a key_value entry in the path
+ } else {
+ valueReader = new ArrayColumnReader(
+ descriptors.get(1),
+ pages.getPageReader(descriptors.get(1)),
+ utcTimestamp,
+ descriptors.get(1).getPrimitiveType(),
+ new ArrayType(mapType.getValueType()));
+ }
+ return new MapColumnReader(keyReader, valueReader);
case ROW:
RowType rowType = (RowType) fieldType;
GroupType groupType = physicalType.asGroupType();
@@ -427,14 +463,32 @@ public class ParquetSplitReaderUtil {
if (fieldIndex < 0) {
fieldReaders.add(new EmptyColumnReader());
} else {
- fieldReaders.add(
- createColumnReader(
- utcTimestamp,
- rowType.getTypeAt(i),
- groupType.getType(fieldIndex),
- descriptors,
- pages,
- depth + 1));
+ // Check for nested row in array with atomic field type.
+
+           // This is done to meet the Parquet field algorithm that pushes multiplicity and structures down to individual fields.
+           // In Parquet, an array of rows is stored as separate arrays for each field.
+
+           // Limitations: it won't work for multiple nested arrays and maps.
+           // The main problem is that the Flink classes and interfaces don't follow that pattern.
+           if (descriptors.get(fieldIndex).getMaxRepetitionLevel() > 0 && !rowType.getTypeAt(i).is(LogicalTypeRoot.ARRAY)) {
+             fieldReaders.add(
+                 createColumnReader(
+                     utcTimestamp,
+                     new ArrayType(rowType.getTypeAt(i).isNullable(), rowType.getTypeAt(i)),
+ groupType.getType(fieldIndex),
+ descriptors,
+ pages,
+ depth + 1));
+ } else {
+ fieldReaders.add(
+ createColumnReader(
+ utcTimestamp,
+ rowType.getTypeAt(i),
+ groupType.getType(fieldIndex),
+ descriptors,
+ pages,
+ depth + 1));
+ }
}
}
return new RowColumnReader(fieldReaders);
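
To make the comments above concrete: for a column f_array ARRAY<ROW<f0 INT,
f1 STRING>>, the Parquet leaves are per-field columns, roughly:

    // illustrative leaf columns, not taken from a real file:
    //   f_array.array.f0   (max repetition level 1)
    //   f_array.array.f1   (max repetition level 1)

Each atomic row field is therefore read as its own repeated column (hence the
ArrayType wrapping above), and the new ColumnarGroupRowData /
ColumnarGroupArrayData vectors reassemble the per-field arrays into rows.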
@@ -501,43 +555,65 @@ public class ParquetSplitReaderUtil {
case TIMESTAMP_WITHOUT_TIME_ZONE:
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
        checkArgument(primitiveType.getOriginalType() != OriginalType.TIME_MICROS,
-           getOriginalTypeCheckFailureMessage(primitiveType.getOriginalType(), fieldType));
+           getOriginalTypeCheckFailureMessage(primitiveType.getOriginalType(), fieldType));
return new HeapTimestampVector(batchSize);
case DECIMAL:
checkArgument(
(typeName == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY
|| typeName == PrimitiveType.PrimitiveTypeName.BINARY)
&& primitiveType.getOriginalType() == OriginalType.DECIMAL,
- getPrimitiveTypeCheckFailureMessage(typeName, fieldType));
+ getPrimitiveTypeCheckFailureMessage(typeName, fieldType));
return new HeapDecimalVector(batchSize);
case ARRAY:
ArrayType arrayType = (ArrayType) fieldType;
- return new HeapArrayVector(
- batchSize,
- createWritableColumnVector(
- batchSize,
- arrayType.getElementType(),
- physicalType,
- descriptors,
- depth));
+       if (arrayType.getElementType().isAnyOf(LogicalTypeFamily.CONSTRUCTED)) {
+ return new HeapArrayGroupColumnVector(
+ batchSize,
+ createWritableColumnVector(
+ batchSize,
+ arrayType.getElementType(),
+ physicalType.asGroupType().getType(0),
+ descriptors,
+ depth + 1));
+ } else {
+ return new HeapArrayVector(
+ batchSize,
+ createWritableColumnVector(
+ batchSize,
+ arrayType.getElementType(),
+ physicalType,
+ descriptors,
+ depth));
+ }
case MAP:
MapType mapType = (MapType) fieldType;
        GroupType repeatedType = physicalType.asGroupType().getType(0).asGroupType();
// the map column has three level paths.
- return new HeapMapColumnVector(
+ WritableColumnVector keyColumnVector = createWritableColumnVector(
batchSize,
- createWritableColumnVector(
- batchSize,
- mapType.getKeyType(),
- repeatedType.getType(0),
- descriptors,
- depth + 2),
- createWritableColumnVector(
- batchSize,
- mapType.getValueType(),
- repeatedType.getType(1),
- descriptors,
- depth + 2));
+ new ArrayType(mapType.getKeyType().isNullable(), mapType.getKeyType()),
+ repeatedType.getType(0),
+ descriptors,
+ depth + 2);
+ WritableColumnVector valueColumnVector;
+ if (mapType.getValueType().isAnyOf(LogicalTypeFamily.CONSTRUCTED)) {
+ valueColumnVector = new HeapArrayGroupColumnVector(
+ batchSize,
+ createWritableColumnVector(
+ batchSize,
+ mapType.getValueType(),
+ repeatedType.getType(1).asGroupType(),
+ descriptors,
+ depth + 2));
+ } else {
+ valueColumnVector = createWritableColumnVector(
+ batchSize,
+ new ArrayType(mapType.getValueType().isNullable(), mapType.getValueType()),
+ repeatedType.getType(1),
+ descriptors,
+ depth + 2);
+ }
+ return new HeapMapColumnVector(batchSize, keyColumnVector, valueColumnVector);
case ROW:
RowType rowType = (RowType) fieldType;
GroupType groupType = physicalType.asGroupType();
@@ -546,15 +622,44 @@ public class ParquetSplitReaderUtil {
// schema evolution: read the file with a new extended field name.
int fieldIndex =
getFieldIndexInPhysicalType(rowType.getFields().get(i).getName(), groupType);
if (fieldIndex < 0) {
- columnVectors[i] = (WritableColumnVector) createVectorFromConstant(rowType.getTypeAt(i), null, batchSize);
+ // Check for nested row in array with atomic field type.
+
+ // This is done to meet the Parquet field algorithm that pushes multiplicity and structures down to individual fields.
+ // In Parquet, an array of rows is stored as separate arrays for each field.
+
+ // Limitations: it won't work for multiple levels of nested arrays and maps.
+ // The main problem is that the Flink classes and interfaces don't follow that pattern.
+ if (groupType.getRepetition().equals(Type.Repetition.REPEATED) && !rowType.getTypeAt(i).is(LogicalTypeRoot.ARRAY)) {
+ columnVectors[i] = (WritableColumnVector) createVectorFromConstant(
+ new ArrayType(rowType.getTypeAt(i).isNullable(), rowType.getTypeAt(i)), null, batchSize);
+ } else {
+ columnVectors[i] = (WritableColumnVector) createVectorFromConstant(rowType.getTypeAt(i), null, batchSize);
+ }
} else {
- columnVectors[i] =
- createWritableColumnVector(
- batchSize,
- rowType.getTypeAt(i),
- groupType.getType(fieldIndex),
- descriptors,
- depth + 1);
+ // Check for nested row in array with atomic field type.
+
+ // This is done to meet the Parquet field algorithm that pushes multiplicity and structures down to individual fields.
+ // In Parquet, an array of rows is stored as separate arrays for each field.
+
+ // Limitations: it won't work for multiple levels of nested arrays and maps.
+ // The main problem is that the Flink classes and interfaces don't follow that pattern.
+ if (descriptors.get(fieldIndex).getMaxRepetitionLevel() > 0 && !rowType.getTypeAt(i).is(LogicalTypeRoot.ARRAY)) {
+ columnVectors[i] =
+ createWritableColumnVector(
+ batchSize,
+ new ArrayType(rowType.getTypeAt(i).isNullable(), rowType.getTypeAt(i)),
+ groupType.getType(fieldIndex),
+ descriptors,
+ depth + 1);
+ } else {
+ columnVectors[i] =
+ createWritableColumnVector(
+ batchSize,
+ rowType.getTypeAt(i),
+ groupType.getType(fieldIndex),
+ descriptors,
+ depth + 1);
+ }
}
}
return new HeapRowColumnVector(batchSize, columnVectors);
@@ -575,8 +680,9 @@ public class ParquetSplitReaderUtil {
/**
* Construct the error message when primitive type mismatches.
+ *
* @param primitiveType Primitive type
- * @param fieldType Logical field type
+ * @param fieldType     Logical field type
* @return The error message
*/
private static String getPrimitiveTypeCheckFailureMessage(PrimitiveType.PrimitiveTypeName primitiveType, LogicalType fieldType) {
@@ -585,8 +691,9 @@ public class ParquetSplitReaderUtil {
/**
* Construct the error message when original type mismatches.
+ *
* @param originalType Original type
- * @param fieldType Logical field type
+ * @param fieldType    Logical field type
* @return The error message
*/
private static String getOriginalTypeCheckFailureMessage(OriginalType originalType, LogicalType fieldType) {
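
To make the rewrapping rule concrete: a minimal sketch of the type decision, for illustration only (not part of the patch; the class name and example types are hypothetical):

    import org.apache.flink.table.types.logical.ArrayType;
    import org.apache.flink.table.types.logical.IntType;
    import org.apache.flink.table.types.logical.LogicalType;
    import org.apache.flink.table.types.logical.LogicalTypeRoot;

    public class NestedRowRewrapSketch {

      // Mirrors the check in createColumnReader / createWritableColumnVector:
      // a repeated Parquet column whose Flink type is not itself an array is
      // read as ARRAY<fieldType>, because Parquet stores an array of rows as
      // one repeated column per row field.
      static LogicalType readerType(LogicalType fieldType, int maxRepetitionLevel) {
        if (maxRepetitionLevel > 0 && !fieldType.is(LogicalTypeRoot.ARRAY)) {
          return new ArrayType(fieldType.isNullable(), fieldType);
        }
        return fieldType;
      }

      public static void main(String[] args) {
        System.out.println(readerType(new IntType(), 1)); // ARRAY<INT>
        System.out.println(readerType(new IntType(), 0)); // INT
      }
    }
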
diff --git a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/ColumnarGroupArrayData.java b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/ColumnarGroupArrayData.java
new file mode 100644
index 00000000000..4c9275f3b09
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/ColumnarGroupArrayData.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.vector;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.data.columnar.vector.writable.WritableColumnVector;
+
+public class ColumnarGroupArrayData implements ArrayData {
+
+ WritableColumnVector vector;
+ int rowId;
+
+ public ColumnarGroupArrayData(WritableColumnVector vector, int rowId) {
+ this.vector = vector;
+ this.rowId = rowId;
+ }
+
+ @Override
+ public int size() {
+ if (vector == null) {
+ return 0;
+ }
+
+ if (vector instanceof HeapRowColumnVector) {
+ // assume all fields have the same size
+ if (((HeapRowColumnVector) vector).vectors == null || ((HeapRowColumnVector) vector).vectors.length == 0) {
+ return 0;
+ }
+ return ((HeapArrayVector) ((HeapRowColumnVector) vector).vectors[0]).getArray(rowId).size();
+ }
+ throw new UnsupportedOperationException(vector.getClass().getName() + " is not supported. Supported vector types: HeapRowColumnVector");
+ }
+
+ @Override
+ public boolean isNullAt(int index) {
+ if (vector == null) {
+ return true;
+ }
+
+ if (vector instanceof HeapRowColumnVector) {
+ return ((HeapRowColumnVector) vector).vectors == null;
+ }
+
+ throw new UnsupportedOperationException(vector.getClass().getName() + " is not supported. Supported vector types: HeapRowColumnVector");
+ }
+
+ @Override
+ public boolean getBoolean(int index) {
+ throw new UnsupportedOperationException("Unsupported operation!");
+ }
+
+ @Override
+ public byte getByte(int index) {
+ throw new UnsupportedOperationException("Unsupported operation!");
+ }
+
+ @Override
+ public short getShort(int index) {
+ throw new UnsupportedOperationException("Unsupported operation!");
+ }
+
+ @Override
+ public int getInt(int index) {
+ throw new UnsupportedOperationException("Unsupported operation!");
+ }
+
+ @Override
+ public long getLong(int index) {
+ throw new UnsupportedOperationException("Unsupported operation!");
+ }
+
+ @Override
+ public float getFloat(int index) {
+ throw new UnsupportedOperationException("Unsupported operation!");
+ }
+
+ @Override
+ public double getDouble(int index) {
+ throw new UnsupportedOperationException("Unsupported operation!");
+ }
+
+ @Override
+ public StringData getString(int index) {
+ throw new UnsupportedOperationException("Unsupported operation!");
+ }
+
+ @Override
+ public DecimalData getDecimal(int index, int precision, int scale) {
+ throw new UnsupportedOperationException("Unsupported operation!");
+ }
+
+ @Override
+ public TimestampData getTimestamp(int index, int precision) {
+ throw new UnsupportedOperationException("Unsupported operation!");
+ }
+
+ @Override
+ public <T> RawValueData<T> getRawValue(int index) {
+ throw new UnsupportedOperationException("Unsupported operation!");
+ }
+
+ @Override
+ public byte[] getBinary(int index) {
+ throw new UnsupportedOperationException("Unsupported operation!");
+ }
+
+ @Override
+ public ArrayData getArray(int index) {
+ throw new UnsupportedOperationException("Unsupported operation!");
+ }
+
+ @Override
+ public MapData getMap(int index) {
+ throw new UnsupportedOperationException("Unsupported operation!");
+ }
+
+ @Override
+ public RowData getRow(int index, int numFields) {
+ return new ColumnarGroupRowData((HeapRowColumnVector) vector, rowId, index);
+ }
+
+ @Override
+ public boolean[] toBooleanArray() {
+ throw new UnsupportedOperationException("Unsupported operation!");
+ }
+
+ @Override
+ public byte[] toByteArray() {
+ throw new UnsupportedOperationException("Unsupported operation!");
+ }
+
+ @Override
+ public short[] toShortArray() {
+ throw new UnsupportedOperationException("Unsupported operation!");
+ }
+
+ @Override
+ public int[] toIntArray() {
+ throw new UnsupportedOperationException("Unsupported operation!");
+ }
+
+ @Override
+ public long[] toLongArray() {
+ throw new UnsupportedOperationException("Unsupported operation!");
+ }
+
+ @Override
+ public float[] toFloatArray() {
+ throw new UnsupportedOperationException("Unsupported operation!");
+ }
+
+ @Override
+ public double[] toDoubleArray() {
+ throw new UnsupportedOperationException("Unsupported operation!");
+ }
+
+}
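
A usage sketch for the class above, for illustration only (the method and variable names are hypothetical; it assumes a HeapRowColumnVector already filled by the column readers):

    import org.apache.flink.table.data.RowData;
    import org.apache.hudi.table.format.cow.vector.ColumnarGroupArrayData;
    import org.apache.hudi.table.format.cow.vector.HeapRowColumnVector;

    public class ColumnarGroupArrayDataSketch {

      // Reads every element of the ARRAY<ROW<...>> value at batch row `rowId`;
      // each element is a lazy RowData view over the per-field arrays.
      static void scan(HeapRowColumnVector rowVector, int rowId, int arity) {
        ColumnarGroupArrayData array = new ColumnarGroupArrayData(rowVector, rowId);
        for (int i = 0; i < array.size(); i++) {
          if (!array.isNullAt(i)) {
            RowData element = array.getRow(i, arity);
            // element.getInt(0), element.getString(1), ... index into the field arrays
          }
        }
      }
    }
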
diff --git a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/ColumnarGroupMapData.java b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/ColumnarGroupMapData.java
new file mode 100644
index 00000000000..69cb6feca13
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/ColumnarGroupMapData.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.vector;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.columnar.vector.writable.WritableColumnVector;
+
+public class ColumnarGroupMapData implements MapData {
+
+ WritableColumnVector keyVector;
+ WritableColumnVector valueVector;
+ int rowId;
+
+ public ColumnarGroupMapData(WritableColumnVector keyVector, WritableColumnVector valueVector, int rowId) {
+ this.keyVector = keyVector;
+ this.valueVector = valueVector;
+ this.rowId = rowId;
+ }
+
+ @Override
+ public int size() {
+ if (keyVector == null) {
+ return 0;
+ }
+
+ if (keyVector instanceof HeapArrayVector) {
+ return ((HeapArrayVector) keyVector).getArray(rowId).size();
+ }
+ throw new UnsupportedOperationException(keyVector.getClass().getName() + " is not supported. Supported vector types: HeapArrayVector");
+ }
+
+ @Override
+ public ArrayData keyArray() {
+ return ((HeapArrayVector) keyVector).getArray(rowId);
+ }
+
+ @Override
+ public ArrayData valueArray() {
+ if (valueVector instanceof HeapArrayVector) {
+ return ((HeapArrayVector) valueVector).getArray(rowId);
+ } else if (valueVector instanceof HeapArrayGroupColumnVector) {
+ return ((HeapArrayGroupColumnVector) valueVector).getArray(rowId);
+ }
+ throw new UnsupportedOperationException(valueVector.getClass().getName() + " is not supported. Supported vector types: HeapArrayVector, HeapArrayGroupColumnVector");
+ }
+}
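
A companion sketch, again illustration only (names hypothetical): keys always arrive through a HeapArrayVector, while values may arrive through a HeapArrayGroupColumnVector when the value type is a row.

    import org.apache.flink.table.data.ArrayData;
    import org.apache.flink.table.data.columnar.vector.writable.WritableColumnVector;
    import org.apache.hudi.table.format.cow.vector.ColumnarGroupMapData;
    import org.apache.hudi.table.format.cow.vector.HeapArrayVector;

    public class ColumnarGroupMapDataSketch {

      // Iterates the MAP value stored at batch row `rowId`.
      static void scan(HeapArrayVector keys, WritableColumnVector values, int rowId, int valueArity) {
        ColumnarGroupMapData map = new ColumnarGroupMapData(keys, values, rowId);
        ArrayData keyArray = map.keyArray();     // backed by the key HeapArrayVector
        ArrayData valueArray = map.valueArray(); // plain array view or group-array view
        for (int i = 0; i < map.size(); i++) {
          // e.g. keyArray.getString(i), then valueArray.isNullAt(i) ? null : valueArray.getRow(i, valueArity)
        }
      }
    }
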
diff --git a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/ColumnarGroupRowData.java b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/ColumnarGroupRowData.java
new file mode 100644
index 00000000000..439c1880823
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/ColumnarGroupRowData.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.vector;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.types.RowKind;
+
+public class ColumnarGroupRowData implements RowData {
+
+ HeapRowColumnVector vector;
+ int rowId;
+ int index;
+
+ public ColumnarGroupRowData(HeapRowColumnVector vector, int rowId, int index) {
+ this.vector = vector;
+ this.rowId = rowId;
+ this.index = index;
+ }
+
+ @Override
+ public int getArity() {
+ return vector.vectors.length;
+ }
+
+ @Override
+ public RowKind getRowKind() {
+ return RowKind.INSERT;
+ }
+
+ @Override
+ public void setRowKind(RowKind rowKind) {
+ throw new UnsupportedOperationException("Unsupported operation!");
+ }
+
+ @Override
+ public boolean isNullAt(int pos) {
+ return
+ vector.vectors[pos].isNullAt(rowId)
+ || ((HeapArrayVector) (vector.vectors[pos])).getArray(rowId).isNullAt(index);
+ }
+
+ @Override
+ public boolean getBoolean(int pos) {
+ return ((HeapArrayVector) (vector.vectors[pos])).getArray(rowId).getBoolean(index);
+ }
+
+ @Override
+ public byte getByte(int pos) {
+ return ((HeapArrayVector) (vector.vectors[pos])).getArray(rowId).getByte(index);
+ }
+
+ @Override
+ public short getShort(int pos) {
+ return ((HeapArrayVector) (vector.vectors[pos])).getArray(rowId).getShort(index);
+ }
+
+ @Override
+ public int getInt(int pos) {
+ return ((HeapArrayVector) (vector.vectors[pos])).getArray(rowId).getInt(index);
+ }
+
+ @Override
+ public long getLong(int pos) {
+ return ((HeapArrayVector) (vector.vectors[pos])).getArray(rowId).getLong(index);
+ }
+
+ @Override
+ public float getFloat(int pos) {
+ return ((HeapArrayVector) (vector.vectors[pos])).getArray(rowId).getFloat(index);
+ }
+
+ @Override
+ public double getDouble(int pos) {
+ return ((HeapArrayVector) (vector.vectors[pos])).getArray(rowId).getDouble(index);
+ }
+
+ @Override
+ public StringData getString(int pos) {
+ return ((HeapArrayVector) (vector.vectors[pos])).getArray(rowId).getString(index);
+ }
+
+ @Override
+ public DecimalData getDecimal(int pos, int i1, int i2) {
+ return ((HeapArrayVector) (vector.vectors[pos])).getArray(rowId).getDecimal(index, i1, i2);
+ }
+
+ @Override
+ public TimestampData getTimestamp(int pos, int i1) {
+ return ((HeapArrayVector) (vector.vectors[pos])).getArray(rowId).getTimestamp(index, i1);
+ }
+
+ @Override
+ public <T> RawValueData<T> getRawValue(int pos) {
+ return ((HeapArrayVector) (vector.vectors[pos])).getArray(rowId).getRawValue(index);
+ }
+
+ @Override
+ public byte[] getBinary(int pos) {
+ return ((HeapArrayVector) (vector.vectors[pos])).getArray(rowId).getBinary(index);
+ }
+
+ @Override
+ public ArrayData getArray(int pos) {
+ return ((HeapArrayVector) (vector.vectors[pos])).getArray(rowId).getArray(index);
+ }
+
+ @Override
+ public MapData getMap(int pos) {
+ return ((HeapArrayVector) (vector.vectors[pos])).getArray(rowId).getMap(index);
+ }
+
+ @Override
+ public RowData getRow(int pos, int numFields) {
+ return ((HeapArrayVector) (vector.vectors[pos])).getArray(rowId).getRow(index, numFields);
+ }
+}
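
The (rowId, index) pair above resolves a field access as "element `index` of the field's array at batch row `rowId`". A sketch of the indirection, illustration only (names hypothetical):

    import org.apache.hudi.table.format.cow.vector.HeapArrayVector;
    import org.apache.hudi.table.format.cow.vector.HeapRowColumnVector;

    public class ColumnarGroupRowDataSketch {

      // field `pos` of the `index`-th nested row at batch row `rowId`
      static int intField(HeapRowColumnVector vector, int rowId, int index, int pos) {
        return ((HeapArrayVector) vector.vectors[pos]) // per-field array column
            .getArray(rowId)                           // this batch row's array
            .getInt(index);                            // this nested row's element
      }
    }

Every field vector is cast to HeapArrayVector, which is exactly the limitation noted in ParquetSplitReaderUtil: arrays and maps nested inside the row are not covered.
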
diff --git a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapArrayGroupColumnVector.java b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapArrayGroupColumnVector.java
new file mode 100644
index 00000000000..3d7d8b1f0de
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapArrayGroupColumnVector.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.vector;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.columnar.vector.ArrayColumnVector;
+import org.apache.flink.table.data.columnar.vector.heap.AbstractHeapVector;
+import org.apache.flink.table.data.columnar.vector.writable.WritableColumnVector;
+
+/**
+ * This class represents a nullable heap column vector for arrays of group (row) values.
+ */
+public class HeapArrayGroupColumnVector extends AbstractHeapVector
+ implements WritableColumnVector, ArrayColumnVector {
+
+ public WritableColumnVector vector;
+
+ public HeapArrayGroupColumnVector(int len) {
+ super(len);
+ }
+
+ public HeapArrayGroupColumnVector(int len, WritableColumnVector vector) {
+ super(len);
+ this.vector = vector;
+ }
+
+ @Override
+ public ArrayData getArray(int rowId) {
+ return new ColumnarGroupArrayData(vector, rowId);
+ }
+
+ @Override
+ public void reset() {
+ super.reset();
+ vector.reset();
+ }
+}
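
How the writable vectors compose for an ARRAY<ROW<a INT, b BIGINT>> column, sketched from the constructor shapes used elsewhere in this patch (illustration only; field names hypothetical):

    import org.apache.flink.table.data.columnar.vector.heap.HeapIntVector;
    import org.apache.flink.table.data.columnar.vector.heap.HeapLongVector;
    import org.apache.flink.table.data.columnar.vector.writable.WritableColumnVector;
    import org.apache.hudi.table.format.cow.vector.HeapArrayGroupColumnVector;
    import org.apache.hudi.table.format.cow.vector.HeapArrayVector;
    import org.apache.hudi.table.format.cow.vector.HeapRowColumnVector;

    public class ArrayOfRowVectorSketch {

      static HeapArrayGroupColumnVector build(int batchSize) {
        // Parquet stores array<row<a, b>> as one repeated column per field,
        // so each row field becomes its own array vector ...
        HeapArrayVector aArrays = new HeapArrayVector(batchSize, new HeapIntVector(batchSize));
        HeapArrayVector bArrays = new HeapArrayVector(batchSize, new HeapLongVector(batchSize));
        // ... grouped by a row vector and exposed as a single array column.
        HeapRowColumnVector fields = new HeapRowColumnVector(batchSize,
            new WritableColumnVector[] {aArrays, bArrays});
        return new HeapArrayGroupColumnVector(batchSize, fields);
      }
    }
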
diff --git a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapMapColumnVector.java b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapMapColumnVector.java
index a3797371695..95d8fd720d3 100644
--- a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapMapColumnVector.java
+++ b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapMapColumnVector.java
@@ -19,8 +19,6 @@
package org.apache.hudi.table.format.cow.vector;
import org.apache.flink.table.data.MapData;
-import org.apache.flink.table.data.columnar.ColumnarMapData;
-import org.apache.flink.table.data.columnar.vector.ColumnVector;
import org.apache.flink.table.data.columnar.vector.MapColumnVector;
import org.apache.flink.table.data.columnar.vector.heap.AbstractHeapVector;
import org.apache.flink.table.data.columnar.vector.writable.WritableColumnVector;
@@ -31,49 +29,25 @@ import org.apache.flink.table.data.columnar.vector.writable.WritableColumnVector
public class HeapMapColumnVector extends AbstractHeapVector
implements WritableColumnVector, MapColumnVector {
- private long[] offsets;
- private long[] lengths;
- private int size;
- private ColumnVector keys;
- private ColumnVector values;
+ private WritableColumnVector keys;
+ private WritableColumnVector values;
- public HeapMapColumnVector(int len, ColumnVector keys, ColumnVector values) {
+ public HeapMapColumnVector(int len, WritableColumnVector keys, WritableColumnVector values) {
super(len);
- size = 0;
- offsets = new long[len];
- lengths = new long[len];
this.keys = keys;
this.values = values;
}
- public void setOffsets(long[] offsets) {
- this.offsets = offsets;
+ public WritableColumnVector getKeys() {
+ return keys;
}
- public void setLengths(long[] lengths) {
- this.lengths = lengths;
- }
-
- public void setKeys(ColumnVector keys) {
- this.keys = keys;
- }
-
- public void setValues(ColumnVector values) {
- this.values = values;
- }
-
- public int getSize() {
- return size;
- }
-
- public void setSize(int size) {
- this.size = size;
+ public WritableColumnVector getValues() {
+ return values;
}
@Override
- public MapData getMap(int i) {
- long offset = offsets[i];
- long length = lengths[i];
- return new ColumnarMapData(keys, values, (int) offset, (int) length);
+ public MapData getMap(int rowId) {
+ return new ColumnarGroupMapData(keys, values, rowId);
}
}
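
Design note: the offsets/lengths bookkeeping could be dropped because per-row sizing is now delegated to the key array. A construction sketch, illustration only (variable names hypothetical):

    import org.apache.flink.table.data.MapData;
    import org.apache.flink.table.data.columnar.vector.writable.WritableColumnVector;
    import org.apache.hudi.table.format.cow.vector.HeapArrayVector;
    import org.apache.hudi.table.format.cow.vector.HeapMapColumnVector;

    public class MapVectorSketch {

      // Keys are always read as ARRAY<keyType>; values as ARRAY<valueType> or,
      // for row values, through a HeapArrayGroupColumnVector.
      static MapData mapAt(HeapArrayVector keyArrays, WritableColumnVector values,
                           int batchSize, int rowId) {
        HeapMapColumnVector mapVector = new HeapMapColumnVector(batchSize, keyArrays, values);
        return mapVector.getMap(rowId); // a ColumnarGroupMapData view sized by the key array
      }
    }
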
diff --git a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ArrayGroupReader.java b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ArrayGroupReader.java
new file mode 100644
index 00000000000..437c186a936
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ArrayGroupReader.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.vector.reader;
+
+import org.apache.flink.formats.parquet.vector.reader.ColumnReader;
+import org.apache.flink.table.data.columnar.vector.writable.WritableColumnVector;
+import org.apache.hudi.table.format.cow.vector.HeapArrayGroupColumnVector;
+
+import java.io.IOException;
+
+/**
+ * A {@link ColumnReader} for arrays of a group type (array, map, row, etc.).
+ */
+public class ArrayGroupReader implements ColumnReader<WritableColumnVector> {
+
+ private final ColumnReader<WritableColumnVector> fieldReader;
+
+ public ArrayGroupReader(ColumnReader<WritableColumnVector> fieldReader) {
+ this.fieldReader = fieldReader;
+ }
+
+ @Override
+ public void readToVector(int readNumber, WritableColumnVector vector) throws IOException {
+ HeapArrayGroupColumnVector rowColumnVector = (HeapArrayGroupColumnVector) vector;
+
+ fieldReader.readToVector(readNumber, rowColumnVector.vector);
+ }
+}
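
A wiring sketch, illustration only: the reader forwards the batch read into the wrapped vector, so an ARRAY<ROW<...>> column pairs an ArrayGroupReader with the row reader built from per-field array readers (names hypothetical):

    import org.apache.flink.formats.parquet.vector.reader.ColumnReader;
    import org.apache.flink.table.data.columnar.vector.writable.WritableColumnVector;
    import org.apache.hudi.table.format.cow.vector.HeapArrayGroupColumnVector;
    import org.apache.hudi.table.format.cow.vector.reader.ArrayGroupReader;

    import java.io.IOException;

    public class ArrayGroupReaderSketch {

      static void readBatch(ColumnReader<WritableColumnVector> rowReader,
                            HeapArrayGroupColumnVector arrayOfRows,
                            int batchSize) throws IOException {
        // fills arrayOfRows.vector (the inner row vector) in a single pass
        new ArrayGroupReader(rowReader).readToVector(batchSize, arrayOfRows);
      }
    }
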
diff --git a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/MapColumnReader.java b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/MapColumnReader.java
index a6762d2e175..ee65dd22c43 100644
--- a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/MapColumnReader.java
+++ b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/MapColumnReader.java
@@ -18,14 +18,11 @@
package org.apache.hudi.table.format.cow.vector.reader;
-import org.apache.hudi.table.format.cow.vector.HeapArrayVector;
import org.apache.hudi.table.format.cow.vector.HeapMapColumnVector;
import org.apache.flink.formats.parquet.vector.reader.ColumnReader;
-import org.apache.flink.table.data.columnar.vector.ColumnVector;
+import org.apache.flink.table.data.columnar.vector.heap.AbstractHeapVector;
import org.apache.flink.table.data.columnar.vector.writable.WritableColumnVector;
-import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.MapType;
import java.io.IOException;
@@ -34,43 +31,26 @@ import java.io.IOException;
*/
public class MapColumnReader implements ColumnReader<WritableColumnVector> {
- private final LogicalType logicalType;
private final ArrayColumnReader keyReader;
- private final ArrayColumnReader valueReader;
+ private final ColumnReader<WritableColumnVector> valueReader;
public MapColumnReader(
- ArrayColumnReader keyReader, ArrayColumnReader valueReader, LogicalType logicalType) {
+ ArrayColumnReader keyReader, ColumnReader<WritableColumnVector> valueReader) {
this.keyReader = keyReader;
this.valueReader = valueReader;
- this.logicalType = logicalType;
}
- public void readBatch(int total, ColumnVector column) throws IOException {
- HeapMapColumnVector mapColumnVector = (HeapMapColumnVector) column;
- MapType mapType = (MapType) logicalType;
- // initialize 2 ListColumnVector for keys and values
- HeapArrayVector keyArrayColumnVector = new HeapArrayVector(total);
- HeapArrayVector valueArrayColumnVector = new HeapArrayVector(total);
- // read the keys and values
- keyReader.readToVector(total, keyArrayColumnVector);
- valueReader.readToVector(total, valueArrayColumnVector);
-
- // set the related attributes according to the keys and values
- mapColumnVector.setKeys(keyArrayColumnVector.child);
- mapColumnVector.setValues(valueArrayColumnVector.child);
- mapColumnVector.setOffsets(keyArrayColumnVector.offsets);
- mapColumnVector.setLengths(keyArrayColumnVector.lengths);
- mapColumnVector.setSize(keyArrayColumnVector.getSize());
+ @Override
+ public void readToVector(int readNumber, WritableColumnVector vector) throws IOException {
+ HeapMapColumnVector mapColumnVector = (HeapMapColumnVector) vector;
+ AbstractHeapVector keyArrayColumnVector = (AbstractHeapVector) (mapColumnVector.getKeys());
+ keyReader.readToVector(readNumber, mapColumnVector.getKeys());
+ valueReader.readToVector(readNumber, mapColumnVector.getValues());
for (int i = 0; i < keyArrayColumnVector.getLen(); i++) {
if (keyArrayColumnVector.isNullAt(i)) {
mapColumnVector.setNullAt(i);
}
}
}
-
- @Override
- public void readToVector(int readNumber, WritableColumnVector vector) throws IOException {
- readBatch(readNumber, vector);
- }
}
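
Putting the pieces together for a MAP<STRING, ROW<...>> column, a sketch under the constructors introduced in this patch (illustration only; reader variables hypothetical):

    import org.apache.hudi.table.format.cow.vector.HeapMapColumnVector;
    import org.apache.hudi.table.format.cow.vector.reader.ArrayColumnReader;
    import org.apache.hudi.table.format.cow.vector.reader.ArrayGroupReader;
    import org.apache.hudi.table.format.cow.vector.reader.MapColumnReader;

    import java.io.IOException;

    public class MapOfRowReadSketch {

      static void readBatch(ArrayColumnReader keyReader,
                            ArrayGroupReader rowValueReader,
                            HeapMapColumnVector mapVector,
                            int batchSize) throws IOException {
        // keys fill the key HeapArrayVector, values fill the group vector;
        // rows whose key array is null are nulled on the map vector itself.
        new MapColumnReader(keyReader, rowValueReader).readToVector(batchSize, mapVector);
      }
    }
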