This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 78dcff71e07 [HUDI-7930] Flink Support for Array of Row and Map of Row 
value (#11727)
78dcff71e07 is described below

commit 78dcff71e07257a2713ce724162b44b62fdf17ba
Author: David N Perkins <[email protected]>
AuthorDate: Mon Oct 7 01:16:13 2024 -0400

    [HUDI-7930] Flink Support for Array of Row and Map of Row value (#11727)
    
    * HUDI-7930 add unit test to reproduce issue
    * HUDI-7930 add avro schema for array of rows data
    * HUDI initial implementation
    * HUDI-7930 implemented Row value type for Maps
    * GH-7930 fixed insert and bulk insert to match upsert schema
    * GH-7930 fix other tests
    * GH-7930 fixed missing row field bug
    * GH-7930 fix TestParquetSchemaConverter
    * GH-7930 remove TestHoodieTableSource tests and associated resources
    * fix the api compatibility
    * fix the null values for map type and schema evolution for row data type
    * Add comment for the nested row in array check
    * GH-7930 add schema evolution tests
    
    ---------
    
    Co-authored-by: danny0405 <[email protected]>
---
 .../storage/row/parquet/ParquetRowDataWriter.java  |  29 ++-
 .../row/parquet/ParquetSchemaConverter.java        |  18 +-
 .../row/parquet/TestParquetSchemaConverter.java    |   2 +-
 .../apache/hudi/table/ITTestHoodieDataSource.java  | 182 +++++++++-------
 .../apache/hudi/table/ITTestSchemaEvolution.java   | 162 ++++++++++-----
 .../org/apache/hudi/utils/TestConfigurations.java  |  22 ++
 .../test/java/org/apache/hudi/utils/TestSQL.java   |   5 +
 .../table/format/cow/ParquetSplitReaderUtil.java   | 229 +++++++++++++++------
 .../format/cow/vector/ColumnarGroupArrayData.java  | 179 ++++++++++++++++
 .../format/cow/vector/ColumnarGroupMapData.java    |  63 ++++++
 .../format/cow/vector/ColumnarGroupRowData.java    | 138 +++++++++++++
 .../cow/vector/HeapArrayGroupColumnVector.java     |  53 +++++
 .../format/cow/vector/HeapMapColumnVector.java     |  44 +---
 .../format/cow/vector/reader/ArrayGroupReader.java |  44 ++++
 .../format/cow/vector/reader/MapColumnReader.java  |  38 +---
 15 files changed, 935 insertions(+), 273 deletions(-)

diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetRowDataWriter.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetRowDataWriter.java
index e5b9509d879..62ea526c275 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetRowDataWriter.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetRowDataWriter.java
@@ -506,18 +506,27 @@ public class ParquetRowDataWriter {
     private void doWrite(ArrayData arrayData) {
       recordConsumer.startGroup();
       if (arrayData.size() > 0) {
-        final String repeatedGroup = "list";
-        final String elementField = "element";
+        final String repeatedGroup = "array";
         recordConsumer.startField(repeatedGroup, 0);
-        for (int i = 0; i < arrayData.size(); i++) {
-          recordConsumer.startGroup();
-          if (!arrayData.isNullAt(i)) {
-            // Only creates the element field if the current array element is 
not null.
-            recordConsumer.startField(elementField, 0);
-            elementWriter.write(arrayData, i);
-            recordConsumer.endField(elementField, 0);
+        if (elementWriter instanceof RowWriter) {
+          for (int i = 0; i < arrayData.size(); i++) {
+            if (!arrayData.isNullAt(i)) {
+              // Only creates the element field if the current array element 
is not null.
+              elementWriter.write(arrayData, i);
+            }
+          }
+        } else {
+          final String elementField = "element";
+          for (int i = 0; i < arrayData.size(); i++) {
+            recordConsumer.startGroup();
+            if (!arrayData.isNullAt(i)) {
+              // Only creates the element field if the current array element 
is not null.
+              recordConsumer.startField(elementField, 0);
+              elementWriter.write(arrayData, i);
+              recordConsumer.endField(elementField, 0);
+            }
+            recordConsumer.endGroup();
           }
-          recordConsumer.endGroup();
         }
         recordConsumer.endField(repeatedGroup, 0);
       }
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java
index 8bd1a848855..b4b425f383c 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java
@@ -29,6 +29,7 @@ import org.apache.flink.table.types.logical.ArrayType;
 import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.LocalZonedTimestampType;
 import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
 import org.apache.flink.table.types.logical.MapType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.TimestampType;
@@ -622,18 +623,25 @@ public class ParquetSchemaConverter {
         }
       case ARRAY:
         // <list-repetition> group <name> (LIST) {
-        //   repeated group list {
+        //   repeated group array {
         //     <element-repetition> <element-type> element;
         //   }
         // }
         ArrayType arrayType = (ArrayType) type;
         LogicalType elementType = arrayType.getElementType();
+
+        Types.GroupBuilder<GroupType> arrayGroupBuilder = 
Types.repeatedGroup();
+        if (elementType.getTypeRoot() == LogicalTypeRoot.ROW) {
+          RowType rowType = (RowType) elementType;
+          rowType.getFields().forEach(field ->
+                  
arrayGroupBuilder.addField(convertToParquetType(field.getName(), 
field.getType(), repetition)));
+        } else {
+          arrayGroupBuilder.addField(convertToParquetType("element", 
elementType, repetition));
+        }
+
         return Types
             .buildGroup(repetition).as(OriginalType.LIST)
-            .addField(
-                Types.repeatedGroup()
-                    .addField(convertToParquetType("element", elementType, 
repetition))
-                    .named("list"))
+            .addField(arrayGroupBuilder.named("array"))
             .named(name);
       case MAP:
         // <map-repetition> group <name> (MAP) {
diff --git 
a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/parquet/TestParquetSchemaConverter.java
 
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/parquet/TestParquetSchemaConverter.java
index 9e07edbd4ca..864abeed86a 100644
--- 
a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/parquet/TestParquetSchemaConverter.java
+++ 
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/parquet/TestParquetSchemaConverter.java
@@ -50,7 +50,7 @@ public class TestParquetSchemaConverter {
     assertThat(messageType.getColumns().size(), is(7));
     final String expected = "message converted {\n"
         + "  optional group f_array (LIST) {\n"
-        + "    repeated group list {\n"
+        + "    repeated group array {\n"
         + "      optional binary element (STRING);\n"
         + "    }\n"
         + "  }\n"
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index e0147f6ed8a..77b66ff974e 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -371,9 +371,9 @@ public class ITTestHoodieDataSource {
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
         .option(FlinkOptions.OPERATION, "insert")
         .option(FlinkOptions.READ_AS_STREAMING, true)
-        .option(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED,true)
+        .option(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED, true)
         .option(FlinkOptions.CLUSTERING_ASYNC_ENABLED, true)
-        .option(FlinkOptions.CLUSTERING_DELTA_COMMITS,1)
+        .option(FlinkOptions.CLUSTERING_DELTA_COMMITS, 1)
         .option(FlinkOptions.CLUSTERING_TASKS, 1)
         .end();
     streamTableEnv.executeSql(hoodieTableDDL);
@@ -383,7 +383,7 @@ public class ITTestHoodieDataSource {
     String instant = TestUtils.getNthCompleteInstant(new 
StoragePath(tempFile.toURI()), 2, HoodieTimeline.COMMIT_ACTION);
 
     streamTableEnv.getConfig().getConfiguration()
-            .setBoolean("table.dynamic-table-options.enabled", true);
+        .setBoolean("table.dynamic-table-options.enabled", true);
     final String query = String.format("select * from t1/*+ 
options('read.start-commit'='%s')*/", instant);
     List<Row> rows = execSelectSql(streamTableEnv, query, 10);
     assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT_LATEST_COMMIT);
@@ -398,9 +398,9 @@ public class ITTestHoodieDataSource {
     String hoodieTableDDL = sql("t1")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
         .option(FlinkOptions.OPERATION, "insert")
-        .option(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED,true)
+        .option(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED, true)
         .option(FlinkOptions.CLUSTERING_ASYNC_ENABLED, true)
-        .option(FlinkOptions.CLUSTERING_DELTA_COMMITS,2)
+        .option(FlinkOptions.CLUSTERING_DELTA_COMMITS, 2)
         .option(FlinkOptions.CLUSTERING_TASKS, 1)
         .option(FlinkOptions.CLEAN_RETAIN_COMMITS, 1)
         .end();
@@ -409,9 +409,9 @@ public class ITTestHoodieDataSource {
     execInsertSql(streamTableEnv, insertInto);
 
     streamTableEnv.getConfig().getConfiguration()
-            .setBoolean("table.dynamic-table-options.enabled", true);
+        .setBoolean("table.dynamic-table-options.enabled", true);
     final String query = String.format("select * from t1/*+ 
options('read.start-commit'='%s')*/",
-            FlinkOptions.START_COMMIT_EARLIEST);
+        FlinkOptions.START_COMMIT_EARLIEST);
 
     List<Row> rows = execSelectSql(streamTableEnv, query, 10);
     // batch read will not lose data when cleaned clustered files.
@@ -450,12 +450,12 @@ public class ITTestHoodieDataSource {
   @Test
   void testBatchWriteWithCleaning() {
     String hoodieTableDDL = sql("t1")
-            .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
-            .option(FlinkOptions.CLEAN_RETAIN_COMMITS, 1)
-            .end();
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .option(FlinkOptions.CLEAN_RETAIN_COMMITS, 1)
+        .end();
     batchTableEnv.executeSql(hoodieTableDDL);
     String insertInto = "insert into t1 values\n"
-            + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1')";
+        + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1')";
     execInsertSql(batchTableEnv, insertInto);
     execInsertSql(batchTableEnv, insertInto);
     execInsertSql(batchTableEnv, insertInto);
@@ -466,7 +466,7 @@ public class ITTestHoodieDataSource {
     HoodieTimeline timeline = 
StreamerUtil.createMetaClient(conf).getActiveTimeline();
     assertTrue(timeline.filterCompletedInstants()
             .getInstants().stream().anyMatch(instant -> 
instant.getAction().equals("clean")),
-            "some commits should be cleaned");
+        "some commits should be cleaned");
   }
 
   @Test
@@ -1621,12 +1621,44 @@ public class ITTestHoodieDataSource {
     List<Row> result = CollectionUtil.iterableToList(
         () -> tableEnv.sqlQuery("select * from t1").execute().collect());
     List<Row> expected = Arrays.asList(
-        row(1, array("abc1", "def1"), array(1, 1),  map("abc1", 1, "def1", 3), 
row(array("abc1", "def1"), row(1, "abc1"))),
-        row(2, array("abc2", "def2"), array(2, 2),  map("abc2", 1, "def2", 3), 
row(array("abc2", "def2"), row(2, "abc2"))),
-        row(3, array("abc3", "def3"), array(3, 3),  map("abc3", 1, "def3", 3), 
row(array("abc3", "def3"), row(3, "abc3"))));
+        row(1, array("abc1", "def1"), array(1, 1), map("abc1", 1, "def1", 3), 
row(array("abc1", "def1"), row(1, "abc1"))),
+        row(2, array("abc2", "def2"), array(2, 2), map("abc2", 1, "def2", 3), 
row(array("abc2", "def2"), row(2, "abc2"))),
+        row(3, array("abc3", "def3"), array(3, 3), map("abc3", 1, "def3", 3), 
row(array("abc3", "def3"), row(3, "abc3"))));
     assertRowsEqualsUnordered(result, expected);
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = {"insert", "upsert", "bulk_insert"})
+  void testParquetArrayMapOfRowTypes(String operation) {
+    TableEnvironment tableEnv = batchTableEnv;
+
+    String hoodieTableDDL = sql("t1")
+        .field("f_int int")
+        .field("f_array array<row(f_array_row_f0 varchar(10), f_array_row_f1 
int)>")
+        .field("f_map map<varchar(20), row(f_map_row_f0 int, f_map_row_f1 
varchar(10))>")
+        .pkField("f_int")
+        .noPartition()
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .option(FlinkOptions.OPERATION, operation)
+        .end();
+    tableEnv.executeSql(hoodieTableDDL);
+
+    execInsertSql(tableEnv, TestSQL.ARRAY_MAP_OF_ROW_TYPE_INSERT_T1);
+
+    tableEnv.executeSql("ALTER TABLE t1 MODIFY (\n"
+        + "    f_array array<row(f_array_row_f0 varchar(10), f_array_row_f1 
int, f_array_row_f2 double)>,\n"
+        + "    f_map map<varchar(20), row(f_map_row_f0 int, f_map_row_f1 
varchar(10), f_map_row_f2 double)>\n"
+        + ");");
+
+    List<Row> result = CollectionUtil.iterableToList(
+        () -> tableEnv.sqlQuery("select * from t1").execute().collect());
+    List<Row> expected = Arrays.asList(
+        row(1, array(row("abc11", 11, null), row("abc12", 12, null), 
row("abc13", 13, null)), map("abc11", row(11, "def11", null), "abc12", row(12, 
"def12", null), "abc13", row(13, "def13", null))),
+        row(2, array(row("abc21", 21, null), row("abc22", 22, null), 
row("abc23", 23, null)), map("abc21", row(21, "def21", null), "abc22", row(22, 
"def22", null), "abc23", row(23, "def23", null))),
+        row(3, array(row("abc31", 31, null), row("abc32", 32, null), 
row("abc33", 33, null)), map("abc31", row(31, "def31", null), "abc32", row(32, 
"def32", null), "abc33", row(33, "def33", null))));
+    assertRowsEqualsUnordered(expected, result);
+  }
+
   @ParameterizedTest
   @ValueSource(strings = {"insert", "upsert", "bulk_insert"})
   void testParquetNullChildColumnsRowTypes(String operation) {
@@ -2026,18 +2058,18 @@ public class ITTestHoodieDataSource {
   void testReadMetaFields(HoodieTableType tableType, String queryType, int 
numInsertBatches, int compactionDeltaCommits) throws Exception {
     String path = tempFile.getAbsolutePath();
     String hoodieTableDDL = sql("t1")
-            .field("id int")
-            .field("name varchar(10)")
-            .field("ts timestamp(6)")
-            .field("`partition` varchar(10)")
-            .pkField("id")
-            .partitionField("partition")
-            .option(FlinkOptions.TABLE_TYPE, tableType)
-            .option(FlinkOptions.QUERY_TYPE, queryType)
-            .option(FlinkOptions.COMPACTION_ASYNC_ENABLED, true)
-            .option(FlinkOptions.COMPACTION_DELTA_COMMITS, 
compactionDeltaCommits)
-            .option(FlinkOptions.PATH, path)
-            .end();
+        .field("id int")
+        .field("name varchar(10)")
+        .field("ts timestamp(6)")
+        .field("`partition` varchar(10)")
+        .pkField("id")
+        .partitionField("partition")
+        .option(FlinkOptions.TABLE_TYPE, tableType)
+        .option(FlinkOptions.QUERY_TYPE, queryType)
+        .option(FlinkOptions.COMPACTION_ASYNC_ENABLED, true)
+        .option(FlinkOptions.COMPACTION_DELTA_COMMITS, compactionDeltaCommits)
+        .option(FlinkOptions.PATH, path)
+        .end();
     batchTableEnv.executeSql(hoodieTableDDL);
 
     final String[] insertInto = new String[] {
@@ -2076,7 +2108,7 @@ public class ITTestHoodieDataSource {
     for (int i = 0; i < numInsertBatches; i++) {
       execInsertSql(batchTableEnv, insertInto[i]);
       String commitTime = tableType.equals(HoodieTableType.MERGE_ON_READ)
-              ? TestUtils.getLastDeltaCompleteInstant(path) : 
TestUtils.getLastCompleteInstant(path);
+          ? TestUtils.getLastDeltaCompleteInstant(path) : 
TestUtils.getLastCompleteInstant(path);
       expected1.append(template1[i]);
       expected2.append(String.format(template2[i], commitTime));
       expected3.append(String.format(template3[i], commitTime));
@@ -2087,18 +2119,18 @@ public class ITTestHoodieDataSource {
     String readHoodieTableDDL;
     batchTableEnv.executeSql("drop table t1");
     readHoodieTableDDL = sql("t1")
-            .field("id int")
-            .field("name varchar(10)")
-            .field("ts timestamp(6)")
-            .field("`partition` varchar(10)")
-            .pkField("id")
-            .partitionField("partition")
-            .option(FlinkOptions.TABLE_TYPE, tableType)
-            .option(FlinkOptions.QUERY_TYPE, queryType)
-            .option(FlinkOptions.COMPACTION_ASYNC_ENABLED, true)
-            .option(FlinkOptions.COMPACTION_DELTA_COMMITS, 
compactionDeltaCommits)
-            .option(FlinkOptions.PATH, path)
-            .end();
+        .field("id int")
+        .field("name varchar(10)")
+        .field("ts timestamp(6)")
+        .field("`partition` varchar(10)")
+        .pkField("id")
+        .partitionField("partition")
+        .option(FlinkOptions.TABLE_TYPE, tableType)
+        .option(FlinkOptions.QUERY_TYPE, queryType)
+        .option(FlinkOptions.COMPACTION_ASYNC_ENABLED, true)
+        .option(FlinkOptions.COMPACTION_DELTA_COMMITS, compactionDeltaCommits)
+        .option(FlinkOptions.PATH, path)
+        .end();
     batchTableEnv.executeSql(readHoodieTableDDL);
 
     List<Row> result = execSelectSql(batchTableEnv, "select * from t1", 
ExecMode.BATCH);
@@ -2106,21 +2138,21 @@ public class ITTestHoodieDataSource {
 
     batchTableEnv.executeSql("drop table t1");
     readHoodieTableDDL = sql("t1")
-            .field("_hoodie_commit_time string")
-            .field("_hoodie_record_key string")
-            .field("_hoodie_partition_path string")
-            .field("id int")
-            .field("name varchar(10)")
-            .field("ts timestamp(6)")
-            .field("`partition` varchar(10)")
-            .pkField("id")
-            .partitionField("partition")
-            .option(FlinkOptions.TABLE_TYPE, tableType)
-            .option(FlinkOptions.QUERY_TYPE, queryType)
-            .option(FlinkOptions.COMPACTION_ASYNC_ENABLED, true)
-            .option(FlinkOptions.COMPACTION_DELTA_COMMITS, 
compactionDeltaCommits)
-            .option(FlinkOptions.PATH, path)
-            .end();
+        .field("_hoodie_commit_time string")
+        .field("_hoodie_record_key string")
+        .field("_hoodie_partition_path string")
+        .field("id int")
+        .field("name varchar(10)")
+        .field("ts timestamp(6)")
+        .field("`partition` varchar(10)")
+        .pkField("id")
+        .partitionField("partition")
+        .option(FlinkOptions.TABLE_TYPE, tableType)
+        .option(FlinkOptions.QUERY_TYPE, queryType)
+        .option(FlinkOptions.COMPACTION_ASYNC_ENABLED, true)
+        .option(FlinkOptions.COMPACTION_DELTA_COMMITS, compactionDeltaCommits)
+        .option(FlinkOptions.PATH, path)
+        .end();
     batchTableEnv.executeSql(readHoodieTableDDL);
 
     result = execSelectSql(batchTableEnv, "select * from t1", ExecMode.BATCH);
@@ -2128,21 +2160,21 @@ public class ITTestHoodieDataSource {
 
     batchTableEnv.executeSql("drop table t1");
     readHoodieTableDDL = sql("t1")
-            .field("id int")
-            .field("_hoodie_commit_time string")
-            .field("name varchar(10)")
-            .field("_hoodie_record_key string")
-            .field("ts timestamp(6)")
-            .field("_hoodie_partition_path string")
-            .field("`partition` varchar(10)")
-            .pkField("id")
-            .partitionField("partition")
-            .option(FlinkOptions.TABLE_TYPE, tableType)
-            .option(FlinkOptions.QUERY_TYPE, queryType)
-            .option(FlinkOptions.COMPACTION_ASYNC_ENABLED, true)
-            .option(FlinkOptions.COMPACTION_DELTA_COMMITS, 
compactionDeltaCommits)
-            .option(FlinkOptions.PATH, path)
-            .end();
+        .field("id int")
+        .field("_hoodie_commit_time string")
+        .field("name varchar(10)")
+        .field("_hoodie_record_key string")
+        .field("ts timestamp(6)")
+        .field("_hoodie_partition_path string")
+        .field("`partition` varchar(10)")
+        .pkField("id")
+        .partitionField("partition")
+        .option(FlinkOptions.TABLE_TYPE, tableType)
+        .option(FlinkOptions.QUERY_TYPE, queryType)
+        .option(FlinkOptions.COMPACTION_ASYNC_ENABLED, true)
+        .option(FlinkOptions.COMPACTION_DELTA_COMMITS, compactionDeltaCommits)
+        .option(FlinkOptions.PATH, path)
+        .end();
     batchTableEnv.executeSql(readHoodieTableDDL);
 
     result = execSelectSql(batchTableEnv, "select * from t1", ExecMode.BATCH);
@@ -2300,11 +2332,11 @@ public class ITTestHoodieDataSource {
    */
   private static Stream<Arguments> 
tableTypeQueryTypeNumInsertAndCompactionDeltaCommitsParams() {
     return Arrays.stream(new Object[][] {
-            {HoodieTableType.COPY_ON_WRITE, 
FlinkOptions.QUERY_TYPE_INCREMENTAL, 1, 1},
-            {HoodieTableType.COPY_ON_WRITE, 
FlinkOptions.QUERY_TYPE_READ_OPTIMIZED, 1, 1},
-            {HoodieTableType.MERGE_ON_READ, FlinkOptions.QUERY_TYPE_SNAPSHOT, 
1, 1},
-            {HoodieTableType.MERGE_ON_READ, FlinkOptions.QUERY_TYPE_SNAPSHOT, 
1, 3},
-            {HoodieTableType.MERGE_ON_READ, FlinkOptions.QUERY_TYPE_SNAPSHOT, 
3, 2}
+        {HoodieTableType.COPY_ON_WRITE, FlinkOptions.QUERY_TYPE_INCREMENTAL, 
1, 1},
+        {HoodieTableType.COPY_ON_WRITE, 
FlinkOptions.QUERY_TYPE_READ_OPTIMIZED, 1, 1},
+        {HoodieTableType.MERGE_ON_READ, FlinkOptions.QUERY_TYPE_SNAPSHOT, 1, 
1},
+        {HoodieTableType.MERGE_ON_READ, FlinkOptions.QUERY_TYPE_SNAPSHOT, 1, 
3},
+        {HoodieTableType.MERGE_ON_READ, FlinkOptions.QUERY_TYPE_SNAPSHOT, 3, 2}
     }).map(Arguments::of);
   }
 
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java
index 46f51df741f..ddb29cdbea7 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java
@@ -180,6 +180,8 @@ public class ITTestSchemaEvolution {
         + "  f_struct row<f0 int, f1 string, drop_add string, change_type 
int>,"
         + "  f_map map<string, int>,"
         + "  f_array array<int>,"
+        + "  f_row_map map<string, row<f0 int, f1 string, drop_add string, 
change_type int>>,"
+        + "  f_row_array array<row<f0 int, f1 string, drop_add string, 
change_type int>>,"
         + "  `partition` string"
         + ") partitioned by (`partition`) with (" + tableOptions + ")"
     );
@@ -195,18 +197,29 @@ public class ITTestSchemaEvolution {
         + "  cast(f_struct as row<f0 int, f1 string, drop_add string, 
change_type int>),"
         + "  cast(f_map as map<string, int>),"
         + "  cast(f_array as array<int>),"
+        + "  cast(f_row_map as map<string, row< f0 int, f1 string, drop_add 
string, change_type int>>),"
+        + "  cast(f_row_array as array<row< f0 int, f1 string, drop_add 
string, change_type int>>),"
         + "  cast(`partition` as string) "
         + "from (values "
-        + "  ('id0', 'Indica', 'F', 12, '2000-01-01 00:00:00', cast(null as 
row<f0 int, f1 string, drop_add string, change_type int>), map['Indica', 1212], 
array[12], 'par0'),"
-        + "  ('id1', 'Danny', 'M', 23, '2000-01-01 00:00:01', row(1, 's1', '', 
1), cast(map['Danny', 2323] as map<string, int>), array[23, 23], 'par1'),"
-        + "  ('id2', 'Stephen', 'M', 33, '2000-01-01 00:00:02', row(2, 's2', 
'', 2), cast(map['Stephen', 3333] as map<string, int>), array[33], 'par1'),"
-        + "  ('id3', 'Julian', 'M', 53, '2000-01-01 00:00:03', row(3, 's3', 
'', 3), cast(map['Julian', 5353] as map<string, int>), array[53, 53], 'par2'),"
-        + "  ('id4', 'Fabian', 'M', 31, '2000-01-01 00:00:04', row(4, 's4', 
'', 4), cast(map['Fabian', 3131] as map<string, int>), array[31], 'par2'),"
-        + "  ('id5', 'Sophia', 'F', 18, '2000-01-01 00:00:05', row(5, 's5', 
'', 5), cast(map['Sophia', 1818] as map<string, int>), array[18, 18], 'par3'),"
-        + "  ('id6', 'Emma', 'F', 20, '2000-01-01 00:00:06', row(6, 's6', '', 
6), cast(map['Emma', 2020] as map<string, int>), array[20], 'par3'),"
-        + "  ('id7', 'Bob', 'M', 44, '2000-01-01 00:00:07', row(7, 's7', '', 
7), cast(map['Bob', 4444] as map<string, int>), array[44, 44], 'par4'),"
-        + "  ('id8', 'Han', 'M', 56, '2000-01-01 00:00:08', row(8, 's8', '', 
8), cast(map['Han', 5656] as map<string, int>), array[56, 56, 56], 'par4')"
-        + ") as A(uuid, name, gender, age, ts, f_struct, f_map, f_array, 
`partition`)"
+        + "  ('id0', 'Indica', 'F', 12, '2000-01-01 00:00:00', cast(null as 
row<f0 int, f1 string, drop_add string, change_type int>), map['Indica', 1212], 
array[12], "
+        + "  cast(null as map<string, row<f0 int, f1 string, drop_add string, 
change_type int>>), array[row(0, 's0', '', 0)], 'par0'),"
+        + "  ('id1', 'Danny', 'M', 23, '2000-01-01 00:00:01', row(1, 's1', '', 
1), cast(map['Danny', 2323] as map<string, int>), array[23, 23], "
+        + "  cast(map['Danny', row(1, 's1', '', 1)] as map<string, row<f0 int, 
f1 string, drop_add string, change_type int>>), array[row(1, 's1', '', 1)], 
'par1'),"
+        + "  ('id2', 'Stephen', 'M', 33, '2000-01-01 00:00:02', row(2, 's2', 
'', 2), cast(map['Stephen', 3333] as map<string, int>), array[33], "
+        + "  cast(map['Stephen', row(2, 's2', '', 2)] as map<string, row<f0 
int, f1 string, drop_add string, change_type int>>), array[row(2, 's2', '', 
2)], 'par1'),"
+        + "  ('id3', 'Julian', 'M', 53, '2000-01-01 00:00:03', row(3, 's3', 
'', 3), cast(map['Julian', 5353] as map<string, int>), array[53, 53], "
+        + "  cast(map['Julian', row(3, 's3', '', 3)] as map<string, row<f0 
int, f1 string, drop_add string, change_type int>>), array[row(3, 's3', '', 
3)], 'par2'),"
+        + "  ('id4', 'Fabian', 'M', 31, '2000-01-01 00:00:04', row(4, 's4', 
'', 4), cast(map['Fabian', 3131] as map<string, int>), array[31], "
+        + "  cast(map['Fabian', row(4, 's4', '', 4)] as map<string, row<f0 
int, f1 string, drop_add string, change_type int>>), array[row(4, 's4', '', 
4)], 'par2'),"
+        + "  ('id5', 'Sophia', 'F', 18, '2000-01-01 00:00:05', row(5, 's5', 
'', 5), cast(map['Sophia', 1818] as map<string, int>), array[18, 18], "
+        + "  cast(map['Sophia', row(5, 's5', '', 5)] as map<string, row<f0 
int, f1 string, drop_add string, change_type int>>), array[row(5, 's5', '', 
5)], 'par3'),"
+        + "  ('id6', 'Emma', 'F', 20, '2000-01-01 00:00:06', row(6, 's6', '', 
6), cast(map['Emma', 2020] as map<string, int>), array[20], "
+        + "  cast(map['Emma', row(6, 's6', '', 6)] as map<string, row<f0 int, 
f1 string, drop_add string, change_type int>>), array[row(6, 's6', '', 6)], 
'par3'),"
+        + "  ('id7', 'Bob', 'M', 44, '2000-01-01 00:00:07', row(7, 's7', '', 
7), cast(map['Bob', 4444] as map<string, int>), array[44, 44], "
+        + "  cast(map['Bob', row(7, 's7', '', 7)] as map<string, row<f0 int, 
f1 string, drop_add string, change_type int>>), array[row(7, 's7', '', 7)], 
'par4'),"
+        + "  ('id8', 'Han', 'M', 56, '2000-01-01 00:00:08', row(8, 's8', '', 
8), cast(map['Han', 5656] as map<string, int>), array[56, 56, 56], "
+        + "  cast(map['Han', row(8, 's8', '', 8)] as map<string, row<f0 int, 
f1 string, drop_add string, change_type int>>), array[row(8, 's8', '', 8)], 
'par4')"
+        + ") as A(uuid, name, gender, age, ts, f_struct, f_map, f_array, 
f_row_map, f_row_array, `partition`)"
     ).await();
   }
 
@@ -232,19 +245,25 @@ public class ITTestSchemaEvolution {
       writeClient.updateColumnType("age", Types.StringType.get());
       writeClient.addColumn("last_name", stringType, "empty allowed", 
"salary", BEFORE);
       writeClient.reOrderColPosition("age", "first_name", BEFORE);
-      // add a field in the middle of the `f_struct` column
+      // add a field in the middle of the `f_struct` and `f_row_map` columns
       writeClient.addColumn("f_struct.f2", intType, "add field in middle of 
struct", "f_struct.f0", AFTER);
-      // add a field at the end of `f_struct` column
+      writeClient.addColumn("f_row_map.value.f2", intType, "add field in 
middle of struct", "f_row_map.value.f0", AFTER);
+      // add a field at the end of `f_struct` and `f_row_map` column
       writeClient.addColumn("f_struct.f3", stringType);
+      writeClient.addColumn("f_row_map.value.f3", stringType);
 
       // delete and add a field with the same name
       // reads should not return previously inserted datum of dropped field of 
the same name
       writeClient.deleteColumns("f_struct.drop_add");
       writeClient.addColumn("f_struct.drop_add", doubleType);
+      writeClient.deleteColumns("f_row_map.value.drop_add");
+      writeClient.addColumn("f_row_map.value.drop_add", doubleType);
 
       // perform comprehensive evolution on complex types (struct, array, map) 
by promoting its primitive types
       writeClient.updateColumnType("f_struct.change_type", 
Types.LongType.get());
       writeClient.renameColumn("f_struct.change_type", "renamed_change_type");
+      writeClient.updateColumnType("f_row_map.value.change_type", 
Types.LongType.get());
+      writeClient.renameColumn("f_row_map.value.change_type", 
"renamed_change_type");
       writeClient.updateColumnType("f_array.element", Types.DoubleType.get());
       writeClient.updateColumnType("f_map.value", Types.DoubleType.get());
 
@@ -256,6 +275,8 @@ public class ITTestSchemaEvolution {
       // perform comprehensive evolution on a struct column by reordering 
field positions
       writeClient.updateColumnType("f_struct.f0", Types.DecimalType.get(20, 
0));
       writeClient.reOrderColPosition("f_struct.f0", "f_struct.drop_add", 
AFTER);
+      writeClient.updateColumnType("f_row_map.value.f0", 
Types.DecimalType.get(20, 0));
+      writeClient.reOrderColPosition("f_row_map.value.f0", 
"f_row_map.value.drop_add", AFTER);
     }
   }
 
@@ -278,6 +299,8 @@ public class ITTestSchemaEvolution {
         + "  f_struct row<f2 int, f1 string, renamed_change_type bigint, f3 
string, drop_add string, f0 decimal(20, 0)>,"
         + "  f_map map<string, double>,"
         + "  f_array array<double>,"
+        + "  f_row_map map<string, row<f2 int, f1 string, renamed_change_type 
bigint, f3 string, drop_add string, f0 decimal(20, 0)>>,"
+        + "  f_row_array array<row<f0 int, f1 string, drop_add string, 
change_type int>>,"
         + "  new_row_col row<f0 bigint, f1 string>,"
         + "  new_array_col array<string>,"
         + "  new_map_col map<string, string>,"
@@ -296,18 +319,26 @@ public class ITTestSchemaEvolution {
         + "  cast(f_struct as row<f2 int, f1 string, renamed_change_type 
bigint, f3 string, drop_add string, f0 decimal(20, 0)>),"
         + "  cast(f_map as map<string, double>),"
         + "  cast(f_array as array<double>),"
+        + "  cast(f_row_map as map<string, row<f2 int, f1 string, 
renamed_change_type bigint, f3 string, drop_add string, f0 decimal(20, 0)>>),"
+        + "  cast(f_row_array as array<row<f0 int, f1 string, drop_add string, 
change_type int>>),"
         + "  cast(new_row_col as row<f0 bigint, f1 string>),"
         + "  cast(new_array_col as array<string>),"
         + "  cast(new_map_col as map<string, string>),"
         + "  cast(`partition` as string) "
         + "from (values "
         + "  ('id1', '23', 'Danny', '', 10000.1, '2000-01-01 00:00:01', row(1, 
's1', 11, 't1', 'drop_add1', 1), cast(map['Danny', 2323.23] as map<string, 
double>), array[23, 23, 23], "
+        + "  cast(map['Danny', row(1, 's1', 11, 't1', 'drop_add1', 1)] as 
map<string, row<f2 int, f1 string, renamed_change_type bigint, f3 string, 
drop_add string, f0 decimal(20, 0)>>), "
+        + "  array[row(1, 's1', '', 1)], "
         + "  row(1, '1'), array['1'], Map['k1','v1'], 'par1'),"
         + "  ('id9', 'unknown', 'Alice', '', 90000.9, '2000-01-01 00:00:09', 
row(9, 's9', 99, 't9', 'drop_add9', 9), cast(map['Alice', 9999.99] as 
map<string, double>), array[9999, 9999], "
+        + "  cast(map['Alice', row(9, 's9', 99, 't9', 'drop_add9', 9)] as 
map<string, row<f2 int, f1 string, renamed_change_type bigint, f3 string, 
drop_add string, f0 decimal(20, 0)>>), "
+        + "  array[row(9, 's9', '', 9)], "
         + "  row(9, '9'), array['9'], Map['k9','v9'], 'par1'),"
         + "  ('id3', '53', 'Julian', '', 30000.3, '2000-01-01 00:00:03', 
row(3, 's3', 33, 't3', 'drop_add3', 3), cast(map['Julian', 5353.53] as 
map<string, double>), array[53], "
+        + "  cast(map['Julian', row(3, 's3', 33, 't3', 'drop_add3', 3)] as 
map<string, row<f2 int, f1 string, renamed_change_type bigint, f3 string, 
drop_add string, f0 decimal(20, 0)>>), "
+        + "  array[row(3, 's3', '', 3)], "
         + "  row(3, '3'), array['3'], Map['k3','v3'], 'par2')"
-        + ") as A(uuid, age, first_name, last_name, salary, ts, f_struct, 
f_map, f_array, new_row_col, new_array_col, new_map_col, `partition`)"
+        + ") as A(uuid, age, first_name, last_name, salary, ts, f_struct, 
f_map, f_array, f_row_map, f_row_array, new_row_col, new_array_col, 
new_map_col, `partition`)"
     ).await();
   }
 
@@ -345,6 +376,8 @@ public class ITTestSchemaEvolution {
         + "  f_struct, "
         + "  f_map, "
         + "  f_array, "
+        + "  f_row_map, "
+        + "  f_row_array, "
         + "  new_row_col, "
         + "  new_array_col, "
         + "  new_map_col "
@@ -376,6 +409,8 @@ public class ITTestSchemaEvolution {
         + "  f_struct row<f2 int, f1 string, renamed_change_type bigint, f3 
string, drop_add string, f0 decimal(20, 0)>,"
         + "  f_map map<string, double>,"
         + "  f_array array<double>,"
+        + "  f_row_map map<string, row<f2 int, f1 string, renamed_change_type 
bigint, f3 string, drop_add string, f0 decimal(20, 0)>>,"
+        + "  f_row_array array<row<f0 int, f1 string, drop_add string, 
change_type int>>,"
         + "  new_row_col row<f0 bigint, f1 string>,"
         + "  new_array_col array<string>,"
         + "  new_map_col map<string, string>,"
@@ -392,6 +427,8 @@ public class ITTestSchemaEvolution {
         + "  f_struct, "
         + "  f_map, "
         + "  f_array, "
+        + "  f_row_map, "
+        + "  f_row_array, "
         + "  new_row_col, "
         + "  new_array_col, "
         + "  new_map_col "
@@ -427,6 +464,16 @@ public class ITTestSchemaEvolution {
       executor.shutdownNow();
     }
 
+    for (String expectedItem : expected) {
+      if (!actual.contains(expectedItem)) {
+        System.out.println("Not in actual: " + expectedItem);
+      }
+    }
+    for (String actualItem : actual) {
+      if (!expected.contains(actualItem)) {
+        System.out.println("Not in expected: " + actualItem);
+      }
+    }
     assertEquals(expected, actual);
   }
 
@@ -472,30 +519,31 @@ public class ITTestSchemaEvolution {
     }
   }
 
+  //TODO: null arrays have a single null row; array with null vs array with 
row will all null values
   private static final ExpectedResult EXPECTED_MERGED_RESULT = new 
ExpectedResult(
       new String[] {
-          "+I[Indica, null, 12, null, {Indica=1212.0}, [12.0], null, null, 
null]",
-          "+I[Danny, 10000.1, 23, +I[1, s1, 11, t1, drop_add1, 1], 
{Danny=2323.23}, [23.0, 23.0, 23.0], +I[1, 1], [1], {k1=v1}]",
-          "+I[Stephen, null, 33, +I[null, s2, 2, null, null, 2], 
{Stephen=3333.0}, [33.0], null, null, null]",
-          "+I[Julian, 30000.3, 53, +I[3, s3, 33, t3, drop_add3, 3], 
{Julian=5353.53}, [53.0], +I[3, 3], [3], {k3=v3}]",
-          "+I[Fabian, null, 31, +I[null, s4, 4, null, null, 4], 
{Fabian=3131.0}, [31.0], null, null, null]",
-          "+I[Sophia, null, 18, +I[null, s5, 5, null, null, 5], 
{Sophia=1818.0}, [18.0, 18.0], null, null, null]",
-          "+I[Emma, null, 20, +I[null, s6, 6, null, null, 6], {Emma=2020.0}, 
[20.0], null, null, null]",
-          "+I[Bob, null, 44, +I[null, s7, 7, null, null, 7], {Bob=4444.0}, 
[44.0, 44.0], null, null, null]",
-          "+I[Han, null, 56, +I[null, s8, 8, null, null, 8], {Han=5656.0}, 
[56.0, 56.0, 56.0], null, null, null]",
-          "+I[Alice, 90000.9, unknown, +I[9, s9, 99, t9, drop_add9, 9], 
{Alice=9999.99}, [9999.0, 9999.0], +I[9, 9], [9], {k9=v9}]",
+          "+I[Indica, null, 12, null, {Indica=1212.0}, [12.0], null, [+I[0, 
s0, , 0]], null, null, null]",
+          "+I[Danny, 10000.1, 23, +I[1, s1, 11, t1, drop_add1, 1], 
{Danny=2323.23}, [23.0, 23.0, 23.0], {Danny=+I[1, s1, 11, t1, drop_add1, 1]}, 
[+I[1, s1, , 1]], +I[1, 1], [1], {k1=v1}]",
+          "+I[Stephen, null, 33, +I[null, s2, 2, null, null, 2], 
{Stephen=3333.0}, [33.0], {Stephen=+I[null, s2, 2, null, null, 2]}, [+I[2, s2, 
, 2]], null, null, null]",
+          "+I[Julian, 30000.3, 53, +I[3, s3, 33, t3, drop_add3, 3], 
{Julian=5353.53}, [53.0], {Julian=+I[3, s3, 33, t3, drop_add3, 3]}, [+I[3, s3, 
, 3]], +I[3, 3], [3], {k3=v3}]",
+          "+I[Fabian, null, 31, +I[null, s4, 4, null, null, 4], 
{Fabian=3131.0}, [31.0], {Fabian=+I[null, s4, 4, null, null, 4]}, [+I[4, s4, , 
4]], null, null, null]",
+          "+I[Sophia, null, 18, +I[null, s5, 5, null, null, 5], 
{Sophia=1818.0}, [18.0, 18.0], {Sophia=+I[null, s5, 5, null, null, 5]}, [+I[5, 
s5, , 5]], null, null, null]",
+          "+I[Emma, null, 20, +I[null, s6, 6, null, null, 6], {Emma=2020.0}, 
[20.0], {Emma=+I[null, s6, 6, null, null, 6]}, [+I[6, s6, , 6]], null, null, 
null]",
+          "+I[Bob, null, 44, +I[null, s7, 7, null, null, 7], {Bob=4444.0}, 
[44.0, 44.0], {Bob=+I[null, s7, 7, null, null, 7]}, [+I[7, s7, , 7]], null, 
null, null]",
+          "+I[Han, null, 56, +I[null, s8, 8, null, null, 8], {Han=5656.0}, 
[56.0, 56.0, 56.0], {Han=+I[null, s8, 8, null, null, 8]}, [+I[8, s8, , 8]], 
null, null, null]",
+          "+I[Alice, 90000.9, unknown, +I[9, s9, 99, t9, drop_add9, 9], 
{Alice=9999.99}, [9999.0, 9999.0], {Alice=+I[9, s9, 99, t9, drop_add9, 9]}, 
[+I[9, s9, , 9]], +I[9, 9], [9], {k9=v9}]",
       },
       new String[] {
-          "+I[id0, Indica, null, 12, null, {Indica=1212.0}, [12.0], null, 
null, null]",
-          "+I[id1, Danny, 10000.1, 23, +I[1, s1, 11, t1, drop_add1, 1], 
{Danny=2323.23}, [23.0, 23.0, 23.0], +I[1, 1], [1], {k1=v1}]",
-          "+I[id2, Stephen, null, 33, +I[null, s2, 2, null, null, 2], 
{Stephen=3333.0}, [33.0], null, null, null]",
-          "+I[id3, Julian, 30000.3, 53, +I[3, s3, 33, t3, drop_add3, 3], 
{Julian=5353.53}, [53.0], +I[3, 3], [3], {k3=v3}]",
-          "+I[id4, Fabian, null, 31, +I[null, s4, 4, null, null, 4], 
{Fabian=3131.0}, [31.0], null, null, null]",
-          "+I[id5, Sophia, null, 18, +I[null, s5, 5, null, null, 5], 
{Sophia=1818.0}, [18.0, 18.0], null, null, null]",
-          "+I[id6, Emma, null, 20, +I[null, s6, 6, null, null, 6], 
{Emma=2020.0}, [20.0], null, null, null]",
-          "+I[id7, Bob, null, 44, +I[null, s7, 7, null, null, 7], 
{Bob=4444.0}, [44.0, 44.0], null, null, null]",
-          "+I[id8, Han, null, 56, +I[null, s8, 8, null, null, 8], 
{Han=5656.0}, [56.0, 56.0, 56.0], null, null, null]",
-          "+I[id9, Alice, 90000.9, unknown, +I[9, s9, 99, t9, drop_add9, 9], 
{Alice=9999.99}, [9999.0, 9999.0], +I[9, 9], [9], {k9=v9}]",
+          "+I[id0, Indica, null, 12, null, {Indica=1212.0}, [12.0], null, 
[+I[0, s0, , 0]], null, null, null]",
+          "+I[id1, Danny, 10000.1, 23, +I[1, s1, 11, t1, drop_add1, 1], 
{Danny=2323.23}, [23.0, 23.0, 23.0], {Danny=+I[1, s1, 11, t1, drop_add1, 1]}, 
[+I[1, s1, , 1]], +I[1, 1], [1], {k1=v1}]",
+          "+I[id2, Stephen, null, 33, +I[null, s2, 2, null, null, 2], 
{Stephen=3333.0}, [33.0], {Stephen=+I[null, s2, 2, null, null, 2]}, [+I[2, s2, 
, 2]], null, null, null]",
+          "+I[id3, Julian, 30000.3, 53, +I[3, s3, 33, t3, drop_add3, 3], 
{Julian=5353.53}, [53.0], {Julian=+I[3, s3, 33, t3, drop_add3, 3]}, [+I[3, s3, 
, 3]], +I[3, 3], [3], {k3=v3}]",
+          "+I[id4, Fabian, null, 31, +I[null, s4, 4, null, null, 4], 
{Fabian=3131.0}, [31.0], {Fabian=+I[null, s4, 4, null, null, 4]}, [+I[4, s4, , 
4]], null, null, null]",
+          "+I[id5, Sophia, null, 18, +I[null, s5, 5, null, null, 5], 
{Sophia=1818.0}, [18.0, 18.0], {Sophia=+I[null, s5, 5, null, null, 5]}, [+I[5, 
s5, , 5]], null, null, null]",
+          "+I[id6, Emma, null, 20, +I[null, s6, 6, null, null, 6], 
{Emma=2020.0}, [20.0], {Emma=+I[null, s6, 6, null, null, 6]}, [+I[6, s6, , 6]], 
null, null, null]",
+          "+I[id7, Bob, null, 44, +I[null, s7, 7, null, null, 7], 
{Bob=4444.0}, [44.0, 44.0], {Bob=+I[null, s7, 7, null, null, 7]}, [+I[7, s7, , 
7]], null, null, null]",
+          "+I[id8, Han, null, 56, +I[null, s8, 8, null, null, 8], 
{Han=5656.0}, [56.0, 56.0, 56.0], {Han=+I[null, s8, 8, null, null, 8]}, [+I[8, 
s8, , 8]], null, null, null]",
+          "+I[id9, Alice, 90000.9, unknown, +I[9, s9, 99, t9, drop_add9, 9], 
{Alice=9999.99}, [9999.0, 9999.0], {Alice=+I[9, s9, 99, t9, drop_add9, 9]}, 
[+I[9, s9, , 9]], +I[9, 9], [9], {k9=v9}]",
       },
       new String[] {
           "+I[1]",
@@ -522,32 +570,32 @@ public class ITTestSchemaEvolution {
 
   private static final ExpectedResult EXPECTED_UNMERGED_RESULT = new 
ExpectedResult(
       new String[] {
-          "+I[Indica, null, 12, null, {Indica=1212.0}, [12.0], null, null, 
null]",
-          "+I[Danny, null, 23, +I[null, s1, 1, null, null, 1], {Danny=2323.0}, 
[23.0, 23.0], null, null, null]",
-          "+I[Stephen, null, 33, +I[null, s2, 2, null, null, 2], 
{Stephen=3333.0}, [33.0], null, null, null]",
-          "+I[Julian, null, 53, +I[null, s3, 3, null, null, 3], 
{Julian=5353.0}, [53.0, 53.0], null, null, null]",
-          "+I[Fabian, null, 31, +I[null, s4, 4, null, null, 4], 
{Fabian=3131.0}, [31.0], null, null, null]",
-          "+I[Sophia, null, 18, +I[null, s5, 5, null, null, 5], 
{Sophia=1818.0}, [18.0, 18.0], null, null, null]",
-          "+I[Emma, null, 20, +I[null, s6, 6, null, null, 6], {Emma=2020.0}, 
[20.0], null, null, null]",
-          "+I[Bob, null, 44, +I[null, s7, 7, null, null, 7], {Bob=4444.0}, 
[44.0, 44.0], null, null, null]",
-          "+I[Han, null, 56, +I[null, s8, 8, null, null, 8], {Han=5656.0}, 
[56.0, 56.0, 56.0], null, null, null]",
-          "+I[Alice, 90000.9, unknown, +I[9, s9, 99, t9, drop_add9, 9], 
{Alice=9999.99}, [9999.0, 9999.0], +I[9, 9], [9], {k9=v9}]",
-          "+I[Danny, 10000.1, 23, +I[1, s1, 11, t1, drop_add1, 1], 
{Danny=2323.23}, [23.0, 23.0, 23.0], +I[1, 1], [1], {k1=v1}]",
-          "+I[Julian, 30000.3, 53, +I[3, s3, 33, t3, drop_add3, 3], 
{Julian=5353.53}, [53.0], +I[3, 3], [3], {k3=v3}]",
+          "+I[Indica, null, 12, null, {Indica=1212.0}, [12.0], null, [+I[0, 
s0, , 0]], null, null, null]",
+          "+I[Danny, null, 23, +I[null, s1, 1, null, null, 1], {Danny=2323.0}, 
[23.0, 23.0], {Danny=+I[null, s1, 1, null, null, 1]}, [+I[1, s1, , 1]], null, 
null, null]",
+          "+I[Stephen, null, 33, +I[null, s2, 2, null, null, 2], 
{Stephen=3333.0}, [33.0], {Stephen=+I[null, s2, 2, null, null, 2]}, [+I[2, s2, 
, 2]], null, null, null]",
+          "+I[Julian, null, 53, +I[null, s3, 3, null, null, 3], 
{Julian=5353.0}, [53.0, 53.0], {Julian=+I[null, s3, 3, null, null, 3]}, [+I[3, 
s3, , 3]], null, null, null]",
+          "+I[Fabian, null, 31, +I[null, s4, 4, null, null, 4], 
{Fabian=3131.0}, [31.0], {Fabian=+I[null, s4, 4, null, null, 4]}, [+I[4, s4, , 
4]], null, null, null]",
+          "+I[Sophia, null, 18, +I[null, s5, 5, null, null, 5], 
{Sophia=1818.0}, [18.0, 18.0], {Sophia=+I[null, s5, 5, null, null, 5]}, [+I[5, 
s5, , 5]], null, null, null]",
+          "+I[Emma, null, 20, +I[null, s6, 6, null, null, 6], {Emma=2020.0}, 
[20.0], {Emma=+I[null, s6, 6, null, null, 6]}, [+I[6, s6, , 6]], null, null, 
null]",
+          "+I[Bob, null, 44, +I[null, s7, 7, null, null, 7], {Bob=4444.0}, 
[44.0, 44.0], {Bob=+I[null, s7, 7, null, null, 7]}, [+I[7, s7, , 7]], null, 
null, null]",
+          "+I[Han, null, 56, +I[null, s8, 8, null, null, 8], {Han=5656.0}, 
[56.0, 56.0, 56.0], {Han=+I[null, s8, 8, null, null, 8]}, [+I[8, s8, , 8]], 
null, null, null]",
+          "+I[Alice, 90000.9, unknown, +I[9, s9, 99, t9, drop_add9, 9], 
{Alice=9999.99}, [9999.0, 9999.0], {Alice=+I[9, s9, 99, t9, drop_add9, 9]}, 
[+I[9, s9, , 9]], +I[9, 9], [9], {k9=v9}]",
+          "+I[Danny, 10000.1, 23, +I[1, s1, 11, t1, drop_add1, 1], 
{Danny=2323.23}, [23.0, 23.0, 23.0], {Danny=+I[1, s1, 11, t1, drop_add1, 1]}, 
[+I[1, s1, , 1]], +I[1, 1], [1], {k1=v1}]",
+          "+I[Julian, 30000.3, 53, +I[3, s3, 33, t3, drop_add3, 3], 
{Julian=5353.53}, [53.0], {Julian=+I[3, s3, 33, t3, drop_add3, 3]}, [+I[3, s3, 
, 3]], +I[3, 3], [3], {k3=v3}]",
       },
       new String[] {
-          "+I[id0, Indica, null, 12, null, {Indica=1212.0}, [12.0], null, 
null, null]",
-          "+I[id1, Danny, null, 23, +I[null, s1, 1, null, null, 1], 
{Danny=2323.0}, [23.0, 23.0], null, null, null]",
-          "+I[id2, Stephen, null, 33, +I[null, s2, 2, null, null, 2], 
{Stephen=3333.0}, [33.0], null, null, null]",
-          "+I[id3, Julian, null, 53, +I[null, s3, 3, null, null, 3], 
{Julian=5353.0}, [53.0, 53.0], null, null, null]",
-          "+I[id4, Fabian, null, 31, +I[null, s4, 4, null, null, 4], 
{Fabian=3131.0}, [31.0], null, null, null]",
-          "+I[id5, Sophia, null, 18, +I[null, s5, 5, null, null, 5], 
{Sophia=1818.0}, [18.0, 18.0], null, null, null]",
-          "+I[id6, Emma, null, 20, +I[null, s6, 6, null, null, 6], 
{Emma=2020.0}, [20.0], null, null, null]",
-          "+I[id7, Bob, null, 44, +I[null, s7, 7, null, null, 7], 
{Bob=4444.0}, [44.0, 44.0], null, null, null]",
-          "+I[id8, Han, null, 56, +I[null, s8, 8, null, null, 8], 
{Han=5656.0}, [56.0, 56.0, 56.0], null, null, null]",
-          "+I[id9, Alice, 90000.9, unknown, +I[9, s9, 99, t9, drop_add9, 9], 
{Alice=9999.99}, [9999.0, 9999.0], +I[9, 9], [9], {k9=v9}]",
-          "+I[id1, Danny, 10000.1, 23, +I[1, s1, 11, t1, drop_add1, 1], 
{Danny=2323.23}, [23.0, 23.0, 23.0], +I[1, 1], [1], {k1=v1}]",
-          "+I[id3, Julian, 30000.3, 53, +I[3, s3, 33, t3, drop_add3, 3], 
{Julian=5353.53}, [53.0], +I[3, 3], [3], {k3=v3}]",
+          "+I[id0, Indica, null, 12, null, {Indica=1212.0}, [12.0], null, 
[+I[0, s0, , 0]], null, null, null]",
+          "+I[id1, Danny, null, 23, +I[null, s1, 1, null, null, 1], 
{Danny=2323.0}, [23.0, 23.0], {Danny=+I[null, s1, 1, null, null, 1]}, [+I[1, 
s1, , 1]], null, null, null]",
+          "+I[id2, Stephen, null, 33, +I[null, s2, 2, null, null, 2], 
{Stephen=3333.0}, [33.0], {Stephen=+I[null, s2, 2, null, null, 2]}, [+I[2, s2, 
, 2]], null, null, null]",
+          "+I[id3, Julian, null, 53, +I[null, s3, 3, null, null, 3], 
{Julian=5353.0}, [53.0, 53.0], {Julian=+I[null, s3, 3, null, null, 3]}, [+I[3, 
s3, , 3]], null, null, null]",
+          "+I[id4, Fabian, null, 31, +I[null, s4, 4, null, null, 4], 
{Fabian=3131.0}, [31.0], {Fabian=+I[null, s4, 4, null, null, 4]}, [+I[4, s4, , 
4]], null, null, null]",
+          "+I[id5, Sophia, null, 18, +I[null, s5, 5, null, null, 5], 
{Sophia=1818.0}, [18.0, 18.0], {Sophia=+I[null, s5, 5, null, null, 5]}, [+I[5, 
s5, , 5]], null, null, null]",
+          "+I[id6, Emma, null, 20, +I[null, s6, 6, null, null, 6], 
{Emma=2020.0}, [20.0], {Emma=+I[null, s6, 6, null, null, 6]}, [+I[6, s6, , 6]], 
null, null, null]",
+          "+I[id7, Bob, null, 44, +I[null, s7, 7, null, null, 7], 
{Bob=4444.0}, [44.0, 44.0], {Bob=+I[null, s7, 7, null, null, 7]}, [+I[7, s7, , 
7]], null, null, null]",
+          "+I[id8, Han, null, 56, +I[null, s8, 8, null, null, 8], 
{Han=5656.0}, [56.0, 56.0, 56.0], {Han=+I[null, s8, 8, null, null, 8]}, [+I[8, 
s8, , 8]], null, null, null]",
+          "+I[id9, Alice, 90000.9, unknown, +I[9, s9, 99, t9, drop_add9, 9], 
{Alice=9999.99}, [9999.0, 9999.0], {Alice=+I[9, s9, 99, t9, drop_add9, 9]}, 
[+I[9, s9, , 9]], +I[9, 9], [9], {k9=v9}]",
+          "+I[id1, Danny, 10000.1, 23, +I[1, s1, 11, t1, drop_add1, 1], 
{Danny=2323.23}, [23.0, 23.0, 23.0], {Danny=+I[1, s1, 11, t1, drop_add1, 1]}, 
[+I[1, s1, , 1]], +I[1, 1], [1], {k1=v1}]",
+          "+I[id3, Julian, 30000.3, 53, +I[3, s3, 33, t3, drop_add3, 3], 
{Julian=5353.53}, [53.0], {Julian=+I[3, s3, 33, t3, drop_add3, 3]}, [+I[3, s3, 
, 3]], +I[3, 3], [3], {k3=v3}]",
       },
       new String[] {
           "+I[1]",
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
index 71295d93b10..92d44a8e7ef 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
@@ -97,6 +97,16 @@ public class TestConfigurations {
               DataTypes.FIELD("change_type", DataTypes.INT()))),
           DataTypes.FIELD("f_map", DataTypes.MAP(DataTypes.STRING(), 
DataTypes.INT())),
           DataTypes.FIELD("f_array", DataTypes.ARRAY(DataTypes.INT())),
+          DataTypes.FIELD("f_row_map", DataTypes.MAP(DataTypes.STRING(), 
DataTypes.ROW(
+              DataTypes.FIELD("f0", DataTypes.INT()),
+              DataTypes.FIELD("f1", DataTypes.STRING()),
+              DataTypes.FIELD("drop_add", DataTypes.STRING()),
+              DataTypes.FIELD("change_type", DataTypes.INT())))),
+          DataTypes.FIELD("f_row_array", DataTypes.ARRAY(DataTypes.ROW(
+              DataTypes.FIELD("f0", DataTypes.INT()),
+              DataTypes.FIELD("f1", DataTypes.STRING()),
+              DataTypes.FIELD("drop_add", DataTypes.STRING()),
+              DataTypes.FIELD("change_type", DataTypes.INT())))),
           DataTypes.FIELD("partition", DataTypes.VARCHAR(10)))
       .notNull();
 
@@ -118,6 +128,18 @@ public class TestConfigurations {
               DataTypes.FIELD("f0", DataTypes.DECIMAL(20, 0)))),
           DataTypes.FIELD("f_map", DataTypes.MAP(DataTypes.STRING(), 
DataTypes.DOUBLE())),
           DataTypes.FIELD("f_array", DataTypes.ARRAY(DataTypes.DOUBLE())),
+          DataTypes.FIELD("f_row_map", DataTypes.MAP(DataTypes.STRING(), 
DataTypes.ROW(
+              DataTypes.FIELD("f2", DataTypes.INT()), // new field added in 
the middle of struct
+              DataTypes.FIELD("f1", DataTypes.STRING()),
+              DataTypes.FIELD("renamed_change_type", DataTypes.BIGINT()),
+              DataTypes.FIELD("f3", DataTypes.STRING()),
+              DataTypes.FIELD("drop_add", DataTypes.STRING()),
+              DataTypes.FIELD("f0", DataTypes.DECIMAL(20, 0))))),
+          DataTypes.FIELD("f_row_array", DataTypes.ARRAY(DataTypes.ROW(
+              DataTypes.FIELD("f0", DataTypes.INT()),
+              DataTypes.FIELD("f1", DataTypes.STRING()),
+              DataTypes.FIELD("drop_add", DataTypes.STRING()),
+              DataTypes.FIELD("change_type", DataTypes.INT())))),
           DataTypes.FIELD("new_row_col", DataTypes.ROW(
               DataTypes.FIELD("f0", DataTypes.BIGINT()),
               DataTypes.FIELD("f1", DataTypes.STRING()))),
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestSQL.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestSQL.java
index 70455d94466..683051a4926 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestSQL.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestSQL.java
@@ -62,6 +62,11 @@ public class TestSQL {
       + "(2, array['abc2', 'def2'], array[2, 2], map['abc2', 1, 'def2', 3], 
row(array['abc2', 'def2'], row(2, 'abc2'))),\n"
       + "(3, array['abc3', 'def3'], array[3, 3], map['abc3', 1, 'def3', 3], 
row(array['abc3', 'def3'], row(3, 'abc3')))";
 
+  public static final String ARRAY_MAP_OF_ROW_TYPE_INSERT_T1 = "insert into t1 
values\n"
+          + "(1, array[row('abc11', 11), row('abc12', 12), row('abc13', 13)], 
map['abc11', row(11, 'def11'), 'abc12', row(12, 'def12'), 'abc13', row(13, 
'def13')]),\n"
+          + "(2, array[row('abc21', 21), row('abc22', 22), row('abc23', 23)], 
map['abc21', row(21, 'def21'), 'abc22', row(22, 'def22'), 'abc23', row(23, 
'def23')]),\n"
+          + "(3, array[row('abc31', 31), row('abc32', 32), row('abc33', 33)], 
map['abc31', row(31, 'def31'), 'abc32', row(32, 'def32'), 'abc33', row(33, 
'def33')])";
+
   public static final String NULL_CHILD_COLUMNS_ROW_TYPE_INSERT_T1 = "insert 
into t1 values\n"
       + "(1, row(cast(null as int), 'abc1')),\n"
       + "(2, row(2, cast(null as varchar))),\n"
diff --git 
a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
 
b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
index 8bbbb1288e5..4532fd0672e 100644
--- 
a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
+++ 
b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
@@ -19,11 +19,13 @@
 package org.apache.hudi.table.format.cow;
 
 import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.table.format.cow.vector.HeapArrayGroupColumnVector;
 import org.apache.hudi.table.format.cow.vector.HeapArrayVector;
 import org.apache.hudi.table.format.cow.vector.HeapDecimalVector;
 import org.apache.hudi.table.format.cow.vector.HeapMapColumnVector;
 import org.apache.hudi.table.format.cow.vector.HeapRowColumnVector;
 import org.apache.hudi.table.format.cow.vector.reader.ArrayColumnReader;
+import org.apache.hudi.table.format.cow.vector.reader.ArrayGroupReader;
 import org.apache.hudi.table.format.cow.vector.reader.EmptyColumnReader;
 import 
org.apache.hudi.table.format.cow.vector.reader.FixedLenBytesColumnReader;
 import 
org.apache.hudi.table.format.cow.vector.reader.Int64TimestampColumnReader;
@@ -62,6 +64,8 @@ import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.LocalZonedTimestampType;
 import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
 import org.apache.flink.table.types.logical.MapType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.TimestampType;
@@ -121,7 +125,7 @@ public class ParquetSplitReaderUtil {
       UnboundRecordFilter recordFilter) throws IOException {
 
     ValidationUtils.checkState(Arrays.stream(selectedFields).noneMatch(x -> x 
== -1),
-            "One or more specified columns does not exist in the hudi table.");
+        "One or more specified columns does not exist in the hudi table.");
 
     List<String> selNonPartNames = Arrays.stream(selectedFields)
         .mapToObj(i -> fullFieldNames[i])
@@ -282,12 +286,23 @@ public class ParquetSplitReaderUtil {
         }
         return tv;
       case ARRAY:
-        HeapArrayVector arrayVector = new HeapArrayVector(batchSize);
-        if (value == null) {
-          arrayVector.fillWithNulls();
-          return arrayVector;
+        ArrayType arrayType = (ArrayType) type;
+        if (arrayType.getElementType().isAnyOf(LogicalTypeFamily.CONSTRUCTED)) 
{
+          HeapArrayGroupColumnVector arrayGroup = new 
HeapArrayGroupColumnVector(batchSize);
+          if (value == null) {
+            arrayGroup.fillWithNulls();
+            return arrayGroup;
+          } else {
+            throw new UnsupportedOperationException("Unsupported create array 
with default value.");
+          }
         } else {
-          throw new UnsupportedOperationException("Unsupported create array 
with default value.");
+          HeapArrayVector arrayVector = new HeapArrayVector(batchSize);
+          if (value == null) {
+            arrayVector.fillWithNulls();
+            return arrayVector;
+          } else {
+            throw new UnsupportedOperationException("Unsupported create array 
with default value.");
+          }
         }
       case MAP:
         HeapMapColumnVector mapVector = new HeapMapColumnVector(batchSize, 
null, null);
@@ -394,12 +409,23 @@ public class ParquetSplitReaderUtil {
             throw new AssertionError();
         }
       case ARRAY:
-        return new ArrayColumnReader(
-            descriptor,
-            pageReader,
-            utcTimestamp,
-            descriptor.getPrimitiveType(),
-            fieldType);
+        ArrayType arrayType = (ArrayType) fieldType;
+        if (arrayType.getElementType().isAnyOf(LogicalTypeFamily.CONSTRUCTED)) 
{
+          return new ArrayGroupReader(createColumnReader(
+              utcTimestamp,
+              arrayType.getElementType(),
+              physicalType.asGroupType().getType(0),
+              descriptors,
+              pages,
+              depth + 1));
+        } else {
+          return new ArrayColumnReader(
+              descriptor,
+              pageReader,
+              utcTimestamp,
+              descriptor.getPrimitiveType(),
+              fieldType);
+        }
       case MAP:
         MapType mapType = (MapType) fieldType;
         ArrayColumnReader keyReader =
@@ -409,14 +435,24 @@ public class ParquetSplitReaderUtil {
                 utcTimestamp,
                 descriptor.getPrimitiveType(),
                 new ArrayType(mapType.getKeyType()));
-        ArrayColumnReader valueReader =
-            new ArrayColumnReader(
-                descriptors.get(1),
-                pages.getPageReader(descriptors.get(1)),
-                utcTimestamp,
-                descriptors.get(1).getPrimitiveType(),
-                new ArrayType(mapType.getValueType()));
-        return new MapColumnReader(keyReader, valueReader, fieldType);
+        ColumnReader<WritableColumnVector> valueReader;
+        if (mapType.getValueType().isAnyOf(LogicalTypeFamily.CONSTRUCTED)) {
+          valueReader = new ArrayGroupReader(createColumnReader(
+              utcTimestamp,
+              mapType.getValueType(),
+              physicalType.asGroupType().getType(0).asGroupType().getType(1), 
// Get the value physical type
+              descriptors.subList(1, descriptors.size()), // remove the key 
descriptor
+              pages,
+              depth + 2)); // increase the depth by 2, because there's a 
key_value entry in the path
+        } else {
+          valueReader = new ArrayColumnReader(
+              descriptors.get(1),
+              pages.getPageReader(descriptors.get(1)),
+              utcTimestamp,
+              descriptors.get(1).getPrimitiveType(),
+              new ArrayType(mapType.getValueType()));
+        }
+        return new MapColumnReader(keyReader, valueReader);
       case ROW:
         RowType rowType = (RowType) fieldType;
         GroupType groupType = physicalType.asGroupType();
@@ -427,14 +463,32 @@ public class ParquetSplitReaderUtil {
           if (fieldIndex < 0) {
             fieldReaders.add(new EmptyColumnReader());
           } else {
-            fieldReaders.add(
-                createColumnReader(
-                    utcTimestamp,
-                    rowType.getTypeAt(i),
-                    groupType.getType(fieldIndex),
-                    descriptors,
-                    pages,
-                    depth + 1));
+            // Check for nested row in array with atomic field type.
+
+            // This is done to meet the Parquet field algorithm that pushes 
multiplicity and structures down to individual fields.
+            // In Parquet, an array of rows is stored as separate arrays for 
each field.
+
+            // Limitations: It won't work for multiple nested arrays and maps.
+            // The main problem is that the Flink classes and interface don't 
follow that pattern.
+            if (descriptors.get(fieldIndex).getMaxRepetitionLevel() > 0 && 
!rowType.getTypeAt(i).is(LogicalTypeRoot.ARRAY)) {
+              fieldReaders.add(
+                  createColumnReader(
+                      utcTimestamp,
+                      new ArrayType(rowType.getTypeAt(i).isNullable(), 
rowType.getTypeAt(i)),
+                      groupType.getType(fieldIndex),
+                      descriptors,
+                      pages,
+                      depth + 1));
+            } else {
+              fieldReaders.add(
+                  createColumnReader(
+                      utcTimestamp,
+                      rowType.getTypeAt(i),
+                      groupType.getType(fieldIndex),
+                      descriptors,
+                      pages,
+                      depth + 1));
+            }
           }
         }
         return new RowColumnReader(fieldReaders);
@@ -501,43 +555,65 @@ public class ParquetSplitReaderUtil {
       case TIMESTAMP_WITHOUT_TIME_ZONE:
       case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
         checkArgument(primitiveType.getOriginalType() != 
OriginalType.TIME_MICROS,
-                
getOriginalTypeCheckFailureMessage(primitiveType.getOriginalType(), fieldType));
+            
getOriginalTypeCheckFailureMessage(primitiveType.getOriginalType(), fieldType));
         return new HeapTimestampVector(batchSize);
       case DECIMAL:
         checkArgument(
             (typeName == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY
                 || typeName == PrimitiveType.PrimitiveTypeName.BINARY)
                 && primitiveType.getOriginalType() == OriginalType.DECIMAL,
-                getPrimitiveTypeCheckFailureMessage(typeName, fieldType));
+            getPrimitiveTypeCheckFailureMessage(typeName, fieldType));
         return new HeapDecimalVector(batchSize);
       case ARRAY:
         ArrayType arrayType = (ArrayType) fieldType;
-        return new HeapArrayVector(
-            batchSize,
-            createWritableColumnVector(
-                batchSize,
-                arrayType.getElementType(),
-                physicalType,
-                descriptors,
-                depth));
+        if (arrayType.getElementType().isAnyOf(LogicalTypeFamily.CONSTRUCTED)) 
{
+          return new HeapArrayGroupColumnVector(
+              batchSize,
+              createWritableColumnVector(
+                  batchSize,
+                  arrayType.getElementType(),
+                  physicalType.asGroupType().getType(0),
+                  descriptors,
+                  depth + 1));
+        } else {
+          return new HeapArrayVector(
+              batchSize,
+              createWritableColumnVector(
+                  batchSize,
+                  arrayType.getElementType(),
+                  physicalType,
+                  descriptors,
+                  depth));
+        }
       case MAP:
         MapType mapType = (MapType) fieldType;
         GroupType repeatedType = 
physicalType.asGroupType().getType(0).asGroupType();
         // the map column has three level paths.
-        return new HeapMapColumnVector(
+        WritableColumnVector keyColumnVector = createWritableColumnVector(
             batchSize,
-            createWritableColumnVector(
-                batchSize,
-                mapType.getKeyType(),
-                repeatedType.getType(0),
-                descriptors,
-                depth + 2),
-            createWritableColumnVector(
-                batchSize,
-                mapType.getValueType(),
-                repeatedType.getType(1),
-                descriptors,
-                depth + 2));
+            new ArrayType(mapType.getKeyType().isNullable(), 
mapType.getKeyType()),
+            repeatedType.getType(0),
+            descriptors,
+            depth + 2);
+        WritableColumnVector valueColumnVector;
+        if (mapType.getValueType().isAnyOf(LogicalTypeFamily.CONSTRUCTED)) {
+          valueColumnVector = new HeapArrayGroupColumnVector(
+              batchSize,
+              createWritableColumnVector(
+                  batchSize,
+                  mapType.getValueType(),
+                  repeatedType.getType(1).asGroupType(),
+                  descriptors,
+                  depth + 2));
+        } else {
+          valueColumnVector = createWritableColumnVector(
+              batchSize,
+              new ArrayType(mapType.getValueType().isNullable(), 
mapType.getValueType()),
+              repeatedType.getType(1),
+              descriptors,
+              depth + 2);
+        }
+        return new HeapMapColumnVector(batchSize, keyColumnVector, 
valueColumnVector);
       case ROW:
         RowType rowType = (RowType) fieldType;
         GroupType groupType = physicalType.asGroupType();
@@ -546,15 +622,44 @@ public class ParquetSplitReaderUtil {
           // schema evolution: read the file with a new extended field name.
           int fieldIndex = 
getFieldIndexInPhysicalType(rowType.getFields().get(i).getName(), groupType);
           if (fieldIndex < 0) {
-            columnVectors[i] = (WritableColumnVector) 
createVectorFromConstant(rowType.getTypeAt(i), null, batchSize);
+            // Check for nested row in array with atomic field type.
+
+            // This is done to meet the Parquet field algorithm that pushes 
multiplicity and structures down to individual fields.
+            // In Parquet, an array of rows is stored as separate arrays for 
each field.
+
+            // Limitations: It won't work for multiple nested arrays and maps.
+            // The main problem is that the Flink classes and interface don't 
follow that pattern.
+            if (groupType.getRepetition().equals(Type.Repetition.REPEATED) && 
!rowType.getTypeAt(i).is(LogicalTypeRoot.ARRAY)) {
+              columnVectors[i] = (WritableColumnVector) 
createVectorFromConstant(
+                  new ArrayType(rowType.getTypeAt(i).isNullable(), 
rowType.getTypeAt(i)), null, batchSize);
+            } else {
+              columnVectors[i] = (WritableColumnVector) 
createVectorFromConstant(rowType.getTypeAt(i), null, batchSize);
+            }
           } else {
-            columnVectors[i] =
-                createWritableColumnVector(
-                    batchSize,
-                    rowType.getTypeAt(i),
-                    groupType.getType(fieldIndex),
-                    descriptors,
-                    depth + 1);
+            // Check for nested row in array with atomic field type.
+
+            // This is done to meet the Parquet field algorithm that pushes 
multiplicity and structures down to individual fields.
+            // In Parquet, an array of rows is stored as separate arrays for 
each field.
+
+            // Limitations: It won't work for multiple nested arrays and maps.
+            // The main problem is that the Flink classes and interface don't 
follow that pattern.
+            if (descriptors.get(fieldIndex).getMaxRepetitionLevel() > 0 && 
!rowType.getTypeAt(i).is(LogicalTypeRoot.ARRAY)) {
+              columnVectors[i] =
+                  createWritableColumnVector(
+                      batchSize,
+                      new ArrayType(rowType.getTypeAt(i).isNullable(), 
rowType.getTypeAt(i)),
+                      groupType.getType(fieldIndex),
+                      descriptors,
+                      depth + 1);
+            } else {
+              columnVectors[i] =
+                  createWritableColumnVector(
+                      batchSize,
+                      rowType.getTypeAt(i),
+                      groupType.getType(fieldIndex),
+                      descriptors,
+                      depth + 1);
+            }
           }
         }
         return new HeapRowColumnVector(batchSize, columnVectors);
@@ -575,8 +680,9 @@ public class ParquetSplitReaderUtil {
 
   /**
    * Construct the error message when primitive type mismatches.
+   *
    * @param primitiveType Primitive type
-   * @param fieldType Logical field type
+   * @param fieldType     Logical field type
    * @return The error message
    */
   private static String 
getPrimitiveTypeCheckFailureMessage(PrimitiveType.PrimitiveTypeName 
primitiveType, LogicalType fieldType) {
@@ -585,8 +691,9 @@ public class ParquetSplitReaderUtil {
 
   /**
    * Construct the error message when original type mismatches.
+   *
    * @param originalType Original type
-   * @param fieldType Logical field type
+   * @param fieldType    Logical field type
    * @return The error message
    */
   private static String getOriginalTypeCheckFailureMessage(OriginalType 
originalType, LogicalType fieldType) {
diff --git 
a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/ColumnarGroupArrayData.java
 
b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/ColumnarGroupArrayData.java
new file mode 100644
index 00000000000..4c9275f3b09
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/ColumnarGroupArrayData.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.vector;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import 
org.apache.flink.table.data.columnar.vector.writable.WritableColumnVector;
+
+public class ColumnarGroupArrayData implements ArrayData {
+
+  WritableColumnVector vector;
+  int rowId;
+
+  public ColumnarGroupArrayData(WritableColumnVector vector, int rowId) {
+    this.vector = vector;
+    this.rowId = rowId;
+  }
+
+  @Override
+  public int size() {
+    if (vector == null) {
+      return 0;
+    }
+
+    if (vector instanceof HeapRowColumnVector) {
+      // assume all fields have the same size
+      if (((HeapRowColumnVector) vector).vectors == null || 
((HeapRowColumnVector) vector).vectors.length == 0) {
+        return 0;
+      }
+      return ((HeapArrayVector) ((HeapRowColumnVector) 
vector).vectors[0]).getArray(rowId).size();
+    }
+    throw new UnsupportedOperationException(vector.getClass().getName() + " is 
not supported. Supported vector types: HeapRowColumnVector");
+  }
+
+  @Override
+  public boolean isNullAt(int index) {
+    if (vector == null) {
+      return true;
+    }
+
+    if (vector instanceof HeapRowColumnVector) {
+      return ((HeapRowColumnVector) vector).vectors == null;
+    }
+
+    throw new UnsupportedOperationException(vector.getClass().getName() + " is 
not supported. Supported vector types: HeapRowColumnVector");
+  }
+
+  @Override
+  public boolean getBoolean(int index) {
+    throw new UnsupportedOperationException("Not support the operation!");
+  }
+
+  @Override
+  public byte getByte(int index) {
+    throw new UnsupportedOperationException("Not support the operation!");
+  }
+
+  @Override
+  public short getShort(int index) {
+    throw new UnsupportedOperationException("Not support the operation!");
+  }
+
+  @Override
+  public int getInt(int index) {
+    throw new UnsupportedOperationException("Not support the operation!");
+  }
+
+  @Override
+  public long getLong(int index) {
+    throw new UnsupportedOperationException("Not support the operation!");
+  }
+
+  @Override
+  public float getFloat(int index) {
+    throw new UnsupportedOperationException("Not support the operation!");
+  }
+
+  @Override
+  public double getDouble(int index) {
+    throw new UnsupportedOperationException("Not support the operation!");
+  }
+
+  @Override
+  public StringData getString(int index) {
+    throw new UnsupportedOperationException("Not support the operation!");
+  }
+
+  @Override
+  public DecimalData getDecimal(int index, int precision, int scale) {
+    throw new UnsupportedOperationException("Not support the operation!");
+  }
+
+  @Override
+  public TimestampData getTimestamp(int index, int precision) {
+    throw new UnsupportedOperationException("Not support the operation!");
+  }
+
+  @Override
+  public <T> RawValueData<T> getRawValue(int index) {
+    throw new UnsupportedOperationException("Not support the operation!");
+  }
+
+  @Override
+  public byte[] getBinary(int index) {
+    throw new UnsupportedOperationException("Not support the operation!");
+  }
+
+  @Override
+  public ArrayData getArray(int index) {
+    throw new UnsupportedOperationException("Not support the operation!");
+  }
+
+  @Override
+  public MapData getMap(int index) {
+    throw new UnsupportedOperationException("Not support the operation!");
+  }
+
+  @Override
+  public RowData getRow(int index, int numFields) {
+    return new ColumnarGroupRowData((HeapRowColumnVector) vector, rowId, 
index);
+  }
+
+  @Override
+  public boolean[] toBooleanArray() {
+    throw new UnsupportedOperationException("Not support the operation!");
+  }
+
+  @Override
+  public byte[] toByteArray() {
+    throw new UnsupportedOperationException("Not support the operation!");
+  }
+
+  @Override
+  public short[] toShortArray() {
+    throw new UnsupportedOperationException("Not support the operation!");
+  }
+
+  @Override
+  public int[] toIntArray() {
+    throw new UnsupportedOperationException("Not support the operation!");
+  }
+
+  @Override
+  public long[] toLongArray() {
+    throw new UnsupportedOperationException("Not support the operation!");
+  }
+
+  @Override
+  public float[] toFloatArray() {
+    throw new UnsupportedOperationException("Not support the operation!");
+  }
+
+  @Override
+  public double[] toDoubleArray() {
+    throw new UnsupportedOperationException("Not support the operation!");
+  }
+
+}
diff --git 
a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/ColumnarGroupMapData.java
 
b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/ColumnarGroupMapData.java
new file mode 100644
index 00000000000..69cb6feca13
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/ColumnarGroupMapData.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.vector;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.MapData;
+import 
org.apache.flink.table.data.columnar.vector.writable.WritableColumnVector;
+
+public class ColumnarGroupMapData implements MapData {
+
+  WritableColumnVector keyVector;
+  WritableColumnVector valueVector;
+  int rowId;
+
+  public ColumnarGroupMapData(WritableColumnVector keyVector, 
WritableColumnVector valueVector, int rowId) {
+    this.keyVector = keyVector;
+    this.valueVector = valueVector;
+    this.rowId = rowId;
+  }
+
+  @Override
+  public int size() {
+    if (keyVector == null) {
+      return 0;
+    }
+
+    if (keyVector instanceof HeapArrayVector) {
+      return ((HeapArrayVector) keyVector).getArray(rowId).size();
+    }
+    throw new UnsupportedOperationException(keyVector.getClass().getName() + " 
is not supported. Supported vector types: HeapArrayVector");
+  }
+
+  @Override
+  public ArrayData keyArray() {
+    return ((HeapArrayVector) keyVector).getArray(rowId);
+  }
+
+  @Override
+  public ArrayData valueArray() {
+    if (valueVector instanceof HeapArrayVector) {
+      return ((HeapArrayVector) valueVector).getArray(rowId);
+    } else if (valueVector instanceof HeapArrayGroupColumnVector) {
+      return ((HeapArrayGroupColumnVector) valueVector).getArray(rowId);
+    }
+    throw new UnsupportedOperationException(valueVector.getClass().getName() + 
" is not supported. Supported vector types: HeapArrayVector, 
HeapArrayGroupColumnVector");
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/ColumnarGroupRowData.java
 
b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/ColumnarGroupRowData.java
new file mode 100644
index 00000000000..439c1880823
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/ColumnarGroupRowData.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.vector;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.types.RowKind;
+
+public class ColumnarGroupRowData implements RowData {
+
+  HeapRowColumnVector vector;
+  int rowId;
+  int index;
+
+  public ColumnarGroupRowData(HeapRowColumnVector vector, int rowId, int 
index) {
+    this.vector = vector;
+    this.rowId = rowId;
+    this.index = index;
+  }
+
+  @Override
+  public int getArity() {
+    return vector.vectors.length;
+  }
+
+  @Override
+  public RowKind getRowKind() {
+    return RowKind.INSERT;
+  }
+
+  @Override
+  public void setRowKind(RowKind rowKind) {
+    throw new UnsupportedOperationException("Not support the operation!");
+  }
+
+  @Override
+  public boolean isNullAt(int pos) {
+    return
+        vector.vectors[pos].isNullAt(rowId)
+            || ((HeapArrayVector) 
(vector.vectors[pos])).getArray(rowId).isNullAt(index);
+  }
+
+  @Override
+  public boolean getBoolean(int pos) {
+    return ((HeapArrayVector) 
(vector.vectors[pos])).getArray(rowId).getBoolean(index);
+  }
+
+  @Override
+  public byte getByte(int pos) {
+    return ((HeapArrayVector) 
(vector.vectors[pos])).getArray(rowId).getByte(index);
+  }
+
+  @Override
+  public short getShort(int pos) {
+    return ((HeapArrayVector) 
(vector.vectors[pos])).getArray(rowId).getShort(index);
+  }
+
+  @Override
+  public int getInt(int pos) {
+    return ((HeapArrayVector) 
(vector.vectors[pos])).getArray(rowId).getInt(index);
+  }
+
+  @Override
+  public long getLong(int pos) {
+    return ((HeapArrayVector) 
(vector.vectors[pos])).getArray(rowId).getLong(index);
+  }
+
+  @Override
+  public float getFloat(int pos) {
+    return ((HeapArrayVector) 
(vector.vectors[pos])).getArray(rowId).getFloat(index);
+  }
+
+  @Override
+  public double getDouble(int pos) {
+    return ((HeapArrayVector) 
(vector.vectors[pos])).getArray(rowId).getDouble(index);
+  }
+
+  @Override
+  public StringData getString(int pos) {
+    return ((HeapArrayVector) 
(vector.vectors[pos])).getArray(rowId).getString(index);
+  }
+
+  @Override
+  public DecimalData getDecimal(int pos, int i1, int i2) {
+    return ((HeapArrayVector) 
(vector.vectors[pos])).getArray(rowId).getDecimal(index, i1, i2);
+  }
+
+  @Override
+  public TimestampData getTimestamp(int pos, int i1) {
+    return ((HeapArrayVector) 
(vector.vectors[pos])).getArray(rowId).getTimestamp(index, i1);
+  }
+
+  @Override
+  public <T> RawValueData<T> getRawValue(int pos) {
+    return ((HeapArrayVector) 
(vector.vectors[pos])).getArray(rowId).getRawValue(index);
+  }
+
+  @Override
+  public byte[] getBinary(int pos) {
+    return ((HeapArrayVector) 
(vector.vectors[pos])).getArray(rowId).getBinary(index);
+  }
+
+  @Override
+  public ArrayData getArray(int pos) {
+    return ((HeapArrayVector) 
(vector.vectors[pos])).getArray(rowId).getArray(index);
+  }
+
+  @Override
+  public MapData getMap(int pos) {
+    return ((HeapArrayVector) 
(vector.vectors[pos])).getArray(rowId).getMap(index);
+  }
+
+  @Override
+  public RowData getRow(int pos, int numFields) {
+    return ((HeapArrayVector) 
(vector.vectors[pos])).getArray(rowId).getRow(index, numFields);
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapArrayGroupColumnVector.java
 
b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapArrayGroupColumnVector.java
new file mode 100644
index 00000000000..3d7d8b1f0de
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapArrayGroupColumnVector.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.vector;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.columnar.vector.ArrayColumnVector;
+import org.apache.flink.table.data.columnar.vector.heap.AbstractHeapVector;
+import 
org.apache.flink.table.data.columnar.vector.writable.WritableColumnVector;
+
+/**
+ * This class represents a nullable heap row column vector.
+ */
+public class HeapArrayGroupColumnVector extends AbstractHeapVector
+    implements WritableColumnVector, ArrayColumnVector {
+
+  public WritableColumnVector vector;
+
+  public HeapArrayGroupColumnVector(int len) {
+    super(len);
+  }
+
+  public HeapArrayGroupColumnVector(int len, WritableColumnVector vector) {
+    super(len);
+    this.vector = vector;
+  }
+
+  @Override
+  public ArrayData getArray(int rowId) {
+    return new ColumnarGroupArrayData(vector, rowId);
+  }
+
+  @Override
+  public void reset() {
+    super.reset();
+    vector.reset();
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapMapColumnVector.java
 
b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapMapColumnVector.java
index a3797371695..95d8fd720d3 100644
--- 
a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapMapColumnVector.java
+++ 
b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapMapColumnVector.java
@@ -19,8 +19,6 @@
 package org.apache.hudi.table.format.cow.vector;
 
 import org.apache.flink.table.data.MapData;
-import org.apache.flink.table.data.columnar.ColumnarMapData;
-import org.apache.flink.table.data.columnar.vector.ColumnVector;
 import org.apache.flink.table.data.columnar.vector.MapColumnVector;
 import org.apache.flink.table.data.columnar.vector.heap.AbstractHeapVector;
 import 
org.apache.flink.table.data.columnar.vector.writable.WritableColumnVector;
@@ -31,49 +29,25 @@ import 
org.apache.flink.table.data.columnar.vector.writable.WritableColumnVector
 public class HeapMapColumnVector extends AbstractHeapVector
     implements WritableColumnVector, MapColumnVector {
 
-  private long[] offsets;
-  private long[] lengths;
-  private int size;
-  private ColumnVector keys;
-  private ColumnVector values;
+  private WritableColumnVector keys;
+  private WritableColumnVector values;
 
-  public HeapMapColumnVector(int len, ColumnVector keys, ColumnVector values) {
+  public HeapMapColumnVector(int len, WritableColumnVector keys, 
WritableColumnVector values) {
     super(len);
-    size = 0;
-    offsets = new long[len];
-    lengths = new long[len];
     this.keys = keys;
     this.values = values;
   }
 
-  public void setOffsets(long[] offsets) {
-    this.offsets = offsets;
+  public WritableColumnVector getKeys() {
+    return keys;
   }
 
-  public void setLengths(long[] lengths) {
-    this.lengths = lengths;
-  }
-
-  public void setKeys(ColumnVector keys) {
-    this.keys = keys;
-  }
-
-  public void setValues(ColumnVector values) {
-    this.values = values;
-  }
-
-  public int getSize() {
-    return size;
-  }
-
-  public void setSize(int size) {
-    this.size = size;
+  public WritableColumnVector getValues() {
+    return values;
   }
 
   @Override
-  public MapData getMap(int i) {
-    long offset = offsets[i];
-    long length = lengths[i];
-    return new ColumnarMapData(keys, values, (int) offset, (int) length);
+  public MapData getMap(int rowId) {
+    return new ColumnarGroupMapData(keys, values, rowId);
   }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ArrayGroupReader.java
 
b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ArrayGroupReader.java
new file mode 100644
index 00000000000..437c186a936
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ArrayGroupReader.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.vector.reader;
+
+import org.apache.flink.formats.parquet.vector.reader.ColumnReader;
+import 
org.apache.flink.table.data.columnar.vector.writable.WritableColumnVector;
+import org.apache.hudi.table.format.cow.vector.HeapArrayGroupColumnVector;
+
+import java.io.IOException;
+
+/**
+ * Array of a Group type (Array, Map, Row, etc.) {@link ColumnReader}.
+ */
+public class ArrayGroupReader implements ColumnReader<WritableColumnVector> {
+
+  private final ColumnReader<WritableColumnVector> fieldReader;
+
+  public ArrayGroupReader(ColumnReader<WritableColumnVector> fieldReader) {
+    this.fieldReader = fieldReader;
+  }
+
+  @Override
+  public void readToVector(int readNumber, WritableColumnVector vector) throws 
IOException {
+    HeapArrayGroupColumnVector rowColumnVector = (HeapArrayGroupColumnVector) 
vector;
+
+    fieldReader.readToVector(readNumber, rowColumnVector.vector);
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/MapColumnReader.java
 
b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/MapColumnReader.java
index a6762d2e175..ee65dd22c43 100644
--- 
a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/MapColumnReader.java
+++ 
b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/MapColumnReader.java
@@ -18,14 +18,11 @@
 
 package org.apache.hudi.table.format.cow.vector.reader;
 
-import org.apache.hudi.table.format.cow.vector.HeapArrayVector;
 import org.apache.hudi.table.format.cow.vector.HeapMapColumnVector;
 
 import org.apache.flink.formats.parquet.vector.reader.ColumnReader;
-import org.apache.flink.table.data.columnar.vector.ColumnVector;
+import org.apache.flink.table.data.columnar.vector.heap.AbstractHeapVector;
 import 
org.apache.flink.table.data.columnar.vector.writable.WritableColumnVector;
-import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.MapType;
 
 import java.io.IOException;
 
@@ -34,43 +31,26 @@ import java.io.IOException;
  */
 public class MapColumnReader implements ColumnReader<WritableColumnVector> {
 
-  private final LogicalType logicalType;
   private final ArrayColumnReader keyReader;
-  private final ArrayColumnReader valueReader;
+  private final ColumnReader<WritableColumnVector> valueReader;
 
   public MapColumnReader(
-      ArrayColumnReader keyReader, ArrayColumnReader valueReader, LogicalType 
logicalType) {
+      ArrayColumnReader keyReader, ColumnReader<WritableColumnVector> 
valueReader) {
     this.keyReader = keyReader;
     this.valueReader = valueReader;
-    this.logicalType = logicalType;
   }
 
-  public void readBatch(int total, ColumnVector column) throws IOException {
-    HeapMapColumnVector mapColumnVector = (HeapMapColumnVector) column;
-    MapType mapType = (MapType) logicalType;
-    // initialize 2 ListColumnVector for keys and values
-    HeapArrayVector keyArrayColumnVector = new HeapArrayVector(total);
-    HeapArrayVector valueArrayColumnVector = new HeapArrayVector(total);
-    // read the keys and values
-    keyReader.readToVector(total, keyArrayColumnVector);
-    valueReader.readToVector(total, valueArrayColumnVector);
-
-    // set the related attributes according to the keys and values
-    mapColumnVector.setKeys(keyArrayColumnVector.child);
-    mapColumnVector.setValues(valueArrayColumnVector.child);
-    mapColumnVector.setOffsets(keyArrayColumnVector.offsets);
-    mapColumnVector.setLengths(keyArrayColumnVector.lengths);
-    mapColumnVector.setSize(keyArrayColumnVector.getSize());
+  @Override
+  public void readToVector(int readNumber, WritableColumnVector vector) throws 
IOException {
+    HeapMapColumnVector mapColumnVector = (HeapMapColumnVector) vector;
+    AbstractHeapVector keyArrayColumnVector = (AbstractHeapVector) 
(mapColumnVector.getKeys());
+    keyReader.readToVector(readNumber, mapColumnVector.getKeys());
+    valueReader.readToVector(readNumber, mapColumnVector.getValues());
     for (int i = 0; i < keyArrayColumnVector.getLen(); i++) {
       if (keyArrayColumnVector.isNullAt(i)) {
         mapColumnVector.setNullAt(i);
       }
     }
   }
-
-  @Override
-  public void readToVector(int readNumber, WritableColumnVector vector) throws 
IOException {
-    readBatch(readNumber, vector);
-  }
 }
 


Reply via email to