This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 674c149  [HUDI-3083] Support component data types for flink bulk_insert (#4470)
674c149 is described below

commit 674c1492348b5b2a93358c9dd51a1adfe6a8ecf2
Author: Ron <[email protected]>
AuthorDate: Thu Dec 30 11:15:54 2021 +0800

    [HUDI-3083] Support component data types for flink bulk_insert (#4470)
    
    * [HUDI-3083] Support component data types for flink bulk_insert
    
    * add nested row type test
---
 .../storage/row/parquet/ParquetRowDataWriter.java  | 326 +++++++++++---
 .../row/parquet/ParquetSchemaConverter.java        |  41 ++
 .../row/parquet/TestParquetSchemaConverter.java    |  74 ++++
 .../hudi/sink/append/AppendWriteFunction.java      |  19 +-
 .../format/cow/ParquetColumnarRowSplitReader.java  |  25 +-
 .../table/format/cow/ParquetDecimalVector.java     |   4 +-
 .../table/format/cow/ParquetSplitReaderUtil.java   | 148 ++++++-
 .../table/format/cow/data/ColumnarArrayData.java   | 272 ++++++++++++
 .../table/format/cow/data/ColumnarMapData.java     |  75 ++++
 .../table/format/cow/data/ColumnarRowData.java     | 232 ++++++++++
 .../table/format/cow/vector/HeapArrayVector.java   |  71 ++++
 .../format/cow/vector/HeapMapColumnVector.java     |  79 ++++
 .../format/cow/vector/HeapRowColumnVector.java     |  45 ++
 .../table/format/cow/vector/MapColumnVector.java   |  29 ++
 .../table/format/cow/vector/RowColumnVector.java   |  30 ++
 .../format/cow/vector/VectorizedColumnBatch.java   | 148 +++++++
 .../cow/vector/reader/ArrayColumnReader.java       | 473 +++++++++++++++++++++
 .../vector/reader/BaseVectorizedColumnReader.java  | 313 ++++++++++++++
 .../format/cow/vector/reader/MapColumnReader.java  |  76 ++++
 .../cow/vector/reader/ParquetDataColumnReader.java | 199 +++++++++
 .../reader/ParquetDataColumnReaderFactory.java     | 304 +++++++++++++
 .../format/cow/vector/reader/RowColumnReader.java  |  57 +++
 .../apache/hudi/table/HoodieDataSourceITCase.java  |  56 +++
 .../test/java/org/apache/hudi/utils/TestSQL.java   |  10 +
 24 files changed, 3031 insertions(+), 75 deletions(-)

diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetRowDataWriter.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetRowDataWriter.java
index 1c8b988..3d9524e 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetRowDataWriter.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetRowDataWriter.java
@@ -18,19 +18,22 @@
 
 package org.apache.hudi.io.storage.row.parquet;
 
+import org.apache.flink.table.data.ArrayData;
 import org.apache.flink.table.data.DecimalDataUtils;
+import org.apache.flink.table.data.MapData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.ArrayType;
 import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.LocalZonedTimestampType;
 import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.TimestampType;
 import org.apache.flink.util.Preconditions;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.io.api.RecordConsumer;
 import org.apache.parquet.schema.GroupType;
-import org.apache.parquet.schema.Type;
 
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
@@ -46,7 +49,8 @@ import static 
org.apache.flink.formats.parquet.vector.reader.TimestampColumnRead
 /**
  * Writes a record to the Parquet API with the expected schema in order to be written to a file.
  *
- * <p>Reference org.apache.flink.formats.parquet.row.ParquetRowDataWriter to 
support timestamp of INT64 8 bytes.
+ * <p>Reference {@code org.apache.flink.formats.parquet.row.ParquetRowDataWriter}
+ * to support timestamp of INT64 8 bytes and complex data types.
  */
 public class ParquetRowDataWriter {
 
@@ -67,7 +71,7 @@ public class ParquetRowDataWriter {
     this.filedWriters = new FieldWriter[rowType.getFieldCount()];
     this.fieldNames = rowType.getFieldNames().toArray(new String[0]);
     for (int i = 0; i < rowType.getFieldCount(); i++) {
-      this.filedWriters[i] = createWriter(rowType.getTypeAt(i), 
schema.getType(i));
+      this.filedWriters[i] = createWriter(rowType.getTypeAt(i));
     }
   }
 
@@ -91,59 +95,75 @@ public class ParquetRowDataWriter {
     recordConsumer.endMessage();
   }
 
-  private FieldWriter createWriter(LogicalType t, Type type) {
-    if (type.isPrimitive()) {
-      switch (t.getTypeRoot()) {
-        case CHAR:
-        case VARCHAR:
-          return new StringWriter();
-        case BOOLEAN:
-          return new BooleanWriter();
-        case BINARY:
-        case VARBINARY:
-          return new BinaryWriter();
-        case DECIMAL:
-          DecimalType decimalType = (DecimalType) t;
-          return createDecimalWriter(decimalType.getPrecision(), 
decimalType.getScale());
-        case TINYINT:
-          return new ByteWriter();
-        case SMALLINT:
-          return new ShortWriter();
-        case DATE:
-        case TIME_WITHOUT_TIME_ZONE:
-        case INTEGER:
-          return new IntWriter();
-        case BIGINT:
-          return new LongWriter();
-        case FLOAT:
-          return new FloatWriter();
-        case DOUBLE:
-          return new DoubleWriter();
-        case TIMESTAMP_WITHOUT_TIME_ZONE:
-          TimestampType timestampType = (TimestampType) t;
-          if (timestampType.getPrecision() == 3) {
-            return new Timestamp64Writer();
-          } else {
-            return new Timestamp96Writer(timestampType.getPrecision());
-          }
-        case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
-          LocalZonedTimestampType localZonedTimestampType = 
(LocalZonedTimestampType) t;
-          if (localZonedTimestampType.getPrecision() == 3) {
-            return new Timestamp64Writer();
-          } else {
-            return new 
Timestamp96Writer(localZonedTimestampType.getPrecision());
-          }
-        default:
-          throw new UnsupportedOperationException("Unsupported type: " + type);
-      }
-    } else {
-      throw new IllegalArgumentException("Unsupported  data type: " + t);
+  private FieldWriter createWriter(LogicalType t) {
+    switch (t.getTypeRoot()) {
+      case CHAR:
+      case VARCHAR:
+        return new StringWriter();
+      case BOOLEAN:
+        return new BooleanWriter();
+      case BINARY:
+      case VARBINARY:
+        return new BinaryWriter();
+      case DECIMAL:
+        DecimalType decimalType = (DecimalType) t;
+        return createDecimalWriter(decimalType.getPrecision(), 
decimalType.getScale());
+      case TINYINT:
+        return new ByteWriter();
+      case SMALLINT:
+        return new ShortWriter();
+      case DATE:
+      case TIME_WITHOUT_TIME_ZONE:
+      case INTEGER:
+        return new IntWriter();
+      case BIGINT:
+        return new LongWriter();
+      case FLOAT:
+        return new FloatWriter();
+      case DOUBLE:
+        return new DoubleWriter();
+      case TIMESTAMP_WITHOUT_TIME_ZONE:
+        TimestampType timestampType = (TimestampType) t;
+        if (timestampType.getPrecision() == 3) {
+          return new Timestamp64Writer();
+        } else {
+          return new Timestamp96Writer(timestampType.getPrecision());
+        }
+      case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+        LocalZonedTimestampType localZonedTimestampType = 
(LocalZonedTimestampType) t;
+        if (localZonedTimestampType.getPrecision() == 3) {
+          return new Timestamp64Writer();
+        } else {
+          return new Timestamp96Writer(localZonedTimestampType.getPrecision());
+        }
+      case ARRAY:
+        ArrayType arrayType = (ArrayType) t;
+        LogicalType elementType = arrayType.getElementType();
+        FieldWriter elementWriter = createWriter(elementType);
+        return new ArrayWriter(elementWriter);
+      case MAP:
+        MapType mapType = (MapType) t;
+        LogicalType keyType = mapType.getKeyType();
+        LogicalType valueType = mapType.getValueType();
+        FieldWriter keyWriter = createWriter(keyType);
+        FieldWriter valueWriter = createWriter(valueType);
+        return new MapWriter(keyWriter, valueWriter);
+      case ROW:
+        RowType rowType = (RowType) t;
+        FieldWriter[] fieldWriters = rowType.getFields().stream()
+            
.map(RowType.RowField::getType).map(this::createWriter).toArray(FieldWriter[]::new);
+        String[] fieldNames = rowType.getFields().stream()
+            .map(RowType.RowField::getName).toArray(String[]::new);
+        return new RowWriter(fieldNames, fieldWriters);
+      default:
+        throw new UnsupportedOperationException("Unsupported type: " + t);
     }
   }
 
   private interface FieldWriter {
-
     void write(RowData row, int ordinal);
+
+    void write(ArrayData array, int ordinal);
   }
 
   private class BooleanWriter implements FieldWriter {
@@ -152,6 +172,11 @@ public class ParquetRowDataWriter {
     public void write(RowData row, int ordinal) {
       recordConsumer.addBoolean(row.getBoolean(ordinal));
     }
+
+    @Override
+    public void write(ArrayData array, int ordinal) {
+      recordConsumer.addBoolean(array.getBoolean(ordinal));
+    }
   }
 
   private class ByteWriter implements FieldWriter {
@@ -160,6 +185,11 @@ public class ParquetRowDataWriter {
     public void write(RowData row, int ordinal) {
       recordConsumer.addInteger(row.getByte(ordinal));
     }
+
+    @Override
+    public void write(ArrayData array, int ordinal) {
+      recordConsumer.addInteger(array.getByte(ordinal));
+    }
   }
 
   private class ShortWriter implements FieldWriter {
@@ -168,6 +198,11 @@ public class ParquetRowDataWriter {
     public void write(RowData row, int ordinal) {
       recordConsumer.addInteger(row.getShort(ordinal));
     }
+
+    @Override
+    public void write(ArrayData array, int ordinal) {
+      recordConsumer.addInteger(array.getShort(ordinal));
+    }
   }
 
   private class LongWriter implements FieldWriter {
@@ -176,6 +211,11 @@ public class ParquetRowDataWriter {
     public void write(RowData row, int ordinal) {
       recordConsumer.addLong(row.getLong(ordinal));
     }
+
+    @Override
+    public void write(ArrayData array, int ordinal) {
+      recordConsumer.addLong(array.getLong(ordinal));
+    }
   }
 
   private class FloatWriter implements FieldWriter {
@@ -184,6 +224,11 @@ public class ParquetRowDataWriter {
     public void write(RowData row, int ordinal) {
       recordConsumer.addFloat(row.getFloat(ordinal));
     }
+
+    @Override
+    public void write(ArrayData array, int ordinal) {
+      recordConsumer.addFloat(array.getFloat(ordinal));
+    }
   }
 
   private class DoubleWriter implements FieldWriter {
@@ -192,6 +237,11 @@ public class ParquetRowDataWriter {
     public void write(RowData row, int ordinal) {
       recordConsumer.addDouble(row.getDouble(ordinal));
     }
+
+    @Override
+    public void write(ArrayData array, int ordinal) {
+      recordConsumer.addDouble(array.getDouble(ordinal));
+    }
   }
 
   private class StringWriter implements FieldWriter {
@@ -200,6 +250,11 @@ public class ParquetRowDataWriter {
     public void write(RowData row, int ordinal) {
       
recordConsumer.addBinary(Binary.fromReusedByteArray(row.getString(ordinal).toBytes()));
     }
+
+    @Override
+    public void write(ArrayData array, int ordinal) {
+      
recordConsumer.addBinary(Binary.fromReusedByteArray(array.getString(ordinal).toBytes()));
+    }
   }
 
   private class BinaryWriter implements FieldWriter {
@@ -208,6 +263,11 @@ public class ParquetRowDataWriter {
     public void write(RowData row, int ordinal) {
       
recordConsumer.addBinary(Binary.fromReusedByteArray(row.getBinary(ordinal)));
     }
+
+    @Override
+    public void write(ArrayData array, int ordinal) {
+      
recordConsumer.addBinary(Binary.fromReusedByteArray(array.getBinary(ordinal)));
+    }
   }
 
   private class IntWriter implements FieldWriter {
@@ -216,6 +276,11 @@ public class ParquetRowDataWriter {
     public void write(RowData row, int ordinal) {
       recordConsumer.addInteger(row.getInt(ordinal));
     }
+
+    @Override
+    public void write(ArrayData array, int ordinal) {
+      recordConsumer.addInteger(array.getInt(ordinal));
+    }
   }
 
   /**
@@ -231,6 +296,11 @@ public class ParquetRowDataWriter {
     public void write(RowData row, int ordinal) {
       recordConsumer.addLong(timestampToInt64(row.getTimestamp(ordinal, 3)));
     }
+
+    @Override
+    public void write(ArrayData array, int ordinal) {
+      recordConsumer.addLong(timestampToInt64(array.getTimestamp(ordinal, 3)));
+    }
   }
 
   private long timestampToInt64(TimestampData timestampData) {
@@ -254,6 +324,11 @@ public class ParquetRowDataWriter {
     public void write(RowData row, int ordinal) {
       recordConsumer.addBinary(timestampToInt96(row.getTimestamp(ordinal, 
precision)));
     }
+
+    @Override
+    public void write(ArrayData array, int ordinal) {
+      recordConsumer.addBinary(timestampToInt96(array.getTimestamp(ordinal, 
precision)));
+    }
   }
 
   private Binary timestampToInt96(TimestampData timestampData) {
@@ -304,10 +379,20 @@ public class ParquetRowDataWriter {
       @Override
       public void write(RowData row, int ordinal) {
         long unscaledLong = row.getDecimal(ordinal, precision, 
scale).toUnscaledLong();
+        doWrite(unscaledLong);
+      }
+
+      @Override
+      public void write(ArrayData array, int ordinal) {
+        long unscaledLong = array.getDecimal(ordinal, precision, 
scale).toUnscaledLong();
+        doWrite(unscaledLong);
+      }
+
+      private void doWrite(long unscaled) {
         int i = 0;
         int shift = initShift;
         while (i < numBytes) {
-          decimalBuffer[i] = (byte) (unscaledLong >> shift);
+          decimalBuffer[i] = (byte) (unscaled >> shift);
           i += 1;
           shift -= 8;
         }
@@ -328,6 +413,16 @@ public class ParquetRowDataWriter {
       @Override
       public void write(RowData row, int ordinal) {
         byte[] bytes = row.getDecimal(ordinal, precision, 
scale).toUnscaledBytes();
+        doWrite(bytes);
+      }
+
+      @Override
+      public void write(ArrayData array, int ordinal) {
+        byte[] bytes = array.getDecimal(ordinal, precision, 
scale).toUnscaledBytes();
+        doWrite(bytes);
+      }
+
+      private void doWrite(byte[] bytes) {
         byte[] writtenBytes;
         if (bytes.length == numBytes) {
           // Avoid copy.
@@ -353,5 +448,132 @@ public class ParquetRowDataWriter {
     // 19 <= precision <= 38, writes as FIXED_LEN_BYTE_ARRAY
     return new UnscaledBytesWriter();
   }
+
+  private class ArrayWriter implements FieldWriter {
+    private final FieldWriter elementWriter;
+
+    private ArrayWriter(FieldWriter elementWriter) {
+      this.elementWriter = elementWriter;
+    }
+
+    @Override
+    public void write(RowData row, int ordinal) {
+      ArrayData arrayData = row.getArray(ordinal);
+      doWrite(arrayData);
+    }
+
+    @Override
+    public void write(ArrayData array, int ordinal) {
+      ArrayData arrayData = array.getArray(ordinal);
+      doWrite(arrayData);
+    }
+
+    private void doWrite(ArrayData arrayData) {
+      recordConsumer.startGroup();
+      if (arrayData.size() > 0) {
+        final String repeatedGroup = "list";
+        final String elementField = "element";
+        recordConsumer.startField(repeatedGroup, 0);
+        for (int i = 0; i < arrayData.size(); i++) {
+          recordConsumer.startGroup();
+          if (!arrayData.isNullAt(i)) {
+            // Only creates the element field if the current array element is not null.
+            recordConsumer.startField(elementField, 0);
+            elementWriter.write(arrayData, i);
+            recordConsumer.endField(elementField, 0);
+          }
+          recordConsumer.endGroup();
+        }
+        recordConsumer.endField(repeatedGroup, 0);
+      }
+      recordConsumer.endGroup();
+    }
+  }
+
+  private class MapWriter implements FieldWriter {
+    private final FieldWriter keyWriter;
+    private final FieldWriter valueWriter;
+
+    private MapWriter(FieldWriter keyWriter, FieldWriter valueWriter) {
+      this.keyWriter = keyWriter;
+      this.valueWriter = valueWriter;
+    }
+
+    @Override
+    public void write(RowData row, int ordinal) {
+      MapData map = row.getMap(ordinal);
+      doWrite(map);
+    }
+
+    @Override
+    public void write(ArrayData array, int ordinal) {
+      MapData map = array.getMap(ordinal);
+      doWrite(map);
+    }
+
+    private void doWrite(MapData mapData) {
+      ArrayData keyArray = mapData.keyArray();
+      ArrayData valueArray = mapData.valueArray();
+      recordConsumer.startGroup();
+      if (mapData.size() > 0) {
+        final String repeatedGroup = "key_value";
+        final String kField = "key";
+        final String vField = "value";
+        recordConsumer.startField(repeatedGroup, 0);
+        for (int i = 0; i < mapData.size(); i++) {
+          recordConsumer.startGroup();
+          // key
+          recordConsumer.startField(kField, 0);
+          this.keyWriter.write(keyArray, i);
+          recordConsumer.endField(kField, 0);
+          // value
+          if (!valueArray.isNullAt(i)) {
+            // Only creates the "value" field if the value is not null
+            recordConsumer.startField(vField, 1);
+            this.valueWriter.write(valueArray, i);
+            recordConsumer.endField(vField, 1);
+          }
+          recordConsumer.endGroup();
+        }
+        recordConsumer.endField(repeatedGroup, 0);
+      }
+      recordConsumer.endGroup();
+    }
+  }
+
+  private class RowWriter implements FieldWriter {
+    private final String[] fieldNames;
+    private final FieldWriter[] fieldWriters;
+
+    private RowWriter(String[] fieldNames, FieldWriter[] fieldWriters) {
+      this.fieldNames = fieldNames;
+      this.fieldWriters = fieldWriters;
+    }
+
+    @Override
+    public void write(RowData row, int ordinal) {
+      RowData nested = row.getRow(ordinal, fieldWriters.length);
+      doWrite(nested);
+    }
+
+    @Override
+    public void write(ArrayData array, int ordinal) {
+      RowData nested = array.getRow(ordinal, fieldWriters.length);
+      doWrite(nested);
+    }
+
+    private void doWrite(RowData row) {
+      recordConsumer.startGroup();
+      for (int i = 0; i < row.getArity(); i++) {
+        if (!row.isNullAt(i)) {
+          String fieldName = fieldNames[i];
+          recordConsumer.startField(fieldName, i);
+          fieldWriters[i].write(row, i);
+          recordConsumer.endField(fieldName, i);
+        }
+      }
+      recordConsumer.endGroup();
+    }
+  }
 }
 
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java
index 80fda29..5da45bf 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java
@@ -25,9 +25,11 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.MapTypeInfo;
 import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.table.types.logical.ArrayType;
 import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.LocalZonedTimestampType;
 import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
 import org.apache.flink.table.types.logical.RowType;
 
 import org.apache.flink.table.types.logical.TimestampType;
@@ -616,6 +618,45 @@ public class ParquetSchemaConverter {
           return Types.primitive(PrimitiveType.PrimitiveTypeName.INT96, 
repetition)
               .named(name);
         }
+      case ARRAY:
+        // <list-repetition> group <name> (LIST) {
+        //   repeated group list {
+        //     <element-repetition> <element-type> element;
+        //   }
+        // }
+        ArrayType arrayType = (ArrayType) type;
+        LogicalType elementType = arrayType.getElementType();
+        return Types
+            .buildGroup(repetition).as(OriginalType.LIST)
+            .addField(
+                Types.repeatedGroup()
+                    .addField(convertToParquetType("element", elementType, 
repetition))
+                    .named("list"))
+            .named(name);
+      case MAP:
+        // <map-repetition> group <name> (MAP) {
+        //   repeated group key_value {
+        //     required <key-type> key;
+        //     <value-repetition> <value-type> value;
+        //   }
+        // }
+        MapType mapType = (MapType) type;
+        LogicalType keyType = mapType.getKeyType();
+        LogicalType valueType = mapType.getValueType();
+        return Types
+            .buildGroup(repetition).as(OriginalType.MAP)
+            .addField(
+                Types
+                    .repeatedGroup()
+                    .addField(convertToParquetType("key", keyType, repetition))
+                    .addField(convertToParquetType("value", valueType, 
repetition))
+                    .named("key_value"))
+            .named(name);
+      case ROW:
+        RowType rowType = (RowType) type;
+        Types.GroupBuilder<GroupType> builder = Types.buildGroup(repetition);
+        rowType.getFields().forEach(field -> 
builder.addField(convertToParquetType(field.getName(), field.getType(), 
repetition)));
+        return builder.named(name);
       default:
         throw new UnsupportedOperationException("Unsupported type: " + type);
     }
diff --git 
a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/parquet/TestParquetSchemaConverter.java
 
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/parquet/TestParquetSchemaConverter.java
new file mode 100644
index 0000000..5305bcc
--- /dev/null
+++ 
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/parquet/TestParquetSchemaConverter.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage.row.parquet;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.junit.jupiter.api.Test;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+/**
+ * Test cases for {@link ParquetSchemaConverter}.
+ */
+public class TestParquetSchemaConverter {
+  @Test
+  void testConvertComplexTypes() {
+    DataType dataType = DataTypes.ROW(
+        DataTypes.FIELD("f_array",
+            DataTypes.ARRAY(DataTypes.CHAR(10))),
+        DataTypes.FIELD("f_map",
+            DataTypes.MAP(DataTypes.INT(), DataTypes.VARCHAR(20))),
+        DataTypes.FIELD("f_row",
+            DataTypes.ROW(
+                DataTypes.FIELD("f_row_f0", DataTypes.INT()),
+                DataTypes.FIELD("f_row_f1", DataTypes.VARCHAR(10)),
+                DataTypes.FIELD("f_row_f2",
+                    DataTypes.ROW(
+                        DataTypes.FIELD("f_row_f2_f0", DataTypes.INT()),
+                        DataTypes.FIELD("f_row_f2_f1", 
DataTypes.VARCHAR(10)))))));
+    org.apache.parquet.schema.MessageType messageType =
+        ParquetSchemaConverter.convertToParquetMessageType("converted", 
(RowType) dataType.getLogicalType());
+    assertThat(messageType.getColumns().size(), is(7));
+    final String expected = "message converted {\n"
+        + "  optional group f_array (LIST) {\n"
+        + "    repeated group list {\n"
+        + "      optional binary element (UTF8);\n"
+        + "    }\n"
+        + "  }\n"
+        + "  optional group f_map (MAP) {\n"
+        + "    repeated group key_value {\n"
+        + "      optional int32 key;\n"
+        + "      optional binary value (UTF8);\n"
+        + "    }\n"
+        + "  }\n"
+        + "  optional group f_row {\n"
+        + "    optional int32 f_row_f0;\n"
+        + "    optional binary f_row_f1 (UTF8);\n"
+        + "    optional group f_row_f2 {\n"
+        + "      optional int32 f_row_f2_f0;\n"
+        + "      optional binary f_row_f2_f1 (UTF8);\n"
+        + "    }\n"
+        + "  }\n"
+        + "}\n";
+    assertThat(messageType.toString(), is(expected));
+  }
+}
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java 
b/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java
index 090ed29..a72b885 100644
--- 
a/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java
+++ 
b/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java
@@ -30,7 +30,10 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.util.Collections;
 import java.util.List;
 
 /**
@@ -43,6 +46,7 @@ import java.util.List;
  * @see StreamWriteOperatorCoordinator
  */
 public class AppendWriteFunction<I> extends AbstractStreamWriteFunction<I> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(AppendWriteFunction.class);
 
   private static final long serialVersionUID = 1L;
 
@@ -113,14 +117,19 @@ public class AppendWriteFunction<I> extends 
AbstractStreamWriteFunction<I> {
   }
 
   private void flushData(boolean endInput) {
-    if (this.writerHelper == null) {
-      // does not process any inputs, returns early.
-      return;
+    final List<WriteStatus> writeStatus;
+    final String instant;
+    if (this.writerHelper != null) {
+      writeStatus = this.writerHelper.getWriteStatuses(this.taskID);
+      instant = this.writerHelper.getInstantTime();
+    } else {
+      LOG.info("No data to write in subtask [{}] for instant [{}]", taskID, 
currentInstant);
+      writeStatus = Collections.emptyList();
+      instant = instantToWrite(false);
     }
-    final List<WriteStatus> writeStatus = 
this.writerHelper.getWriteStatuses(this.taskID);
     final WriteMetadataEvent event = WriteMetadataEvent.builder()
         .taskID(taskID)
-        .instantTime(this.writerHelper.getInstantTime())
+        .instantTime(instant)
         .writeStatus(writeStatus)
         .lastBatch(true)
         .endInput(endInput)
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetColumnarRowSplitReader.java
 
b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetColumnarRowSplitReader.java
index 64eb1f4..c615283 100644
--- 
a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetColumnarRowSplitReader.java
+++ 
b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetColumnarRowSplitReader.java
@@ -18,10 +18,11 @@
 
 package org.apache.hudi.table.format.cow;
 
+import org.apache.hudi.table.format.cow.data.ColumnarRowData;
+import org.apache.hudi.table.format.cow.vector.VectorizedColumnBatch;
+
 import org.apache.flink.formats.parquet.vector.reader.ColumnReader;
-import org.apache.flink.table.data.ColumnarRowData;
 import org.apache.flink.table.data.vector.ColumnVector;
-import org.apache.flink.table.data.vector.VectorizedColumnBatch;
 import org.apache.flink.table.data.vector.writable.WritableColumnVector;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
@@ -208,11 +209,14 @@ public class ParquetColumnarRowSplitReader implements 
Closeable {
 
   private WritableColumnVector[] createWritableVectors() {
     WritableColumnVector[] columns = new 
WritableColumnVector[requestedTypes.length];
+    List<Type> types = requestedSchema.getFields();
+    List<ColumnDescriptor> descriptors = requestedSchema.getColumns();
     for (int i = 0; i < requestedTypes.length; i++) {
       columns[i] = createWritableColumnVector(
           batchSize,
           requestedTypes[i],
-          requestedSchema.getColumns().get(i).getPrimitiveType());
+          types.get(i),
+          descriptors);
     }
     return columns;
   }
@@ -236,11 +240,6 @@ public class ParquetColumnarRowSplitReader implements 
Closeable {
      * Check that the requested schema is supported.
      */
     for (int i = 0; i < requestedSchema.getFieldCount(); ++i) {
-      Type t = requestedSchema.getFields().get(i);
-      if (!t.isPrimitive() || t.isRepetition(Type.Repetition.REPEATED)) {
-        throw new UnsupportedOperationException("Complex types not 
supported.");
-      }
-
       String[] colPath = requestedSchema.getPaths().get(i);
       if (fileSchema.containsPath(colPath)) {
         ColumnDescriptor fd = fileSchema.getColumnDescription(colPath);
@@ -322,14 +321,16 @@ public class ParquetColumnarRowSplitReader implements 
Closeable {
       throw new IOException("expecting more rows but reached last block. Read "
           + rowsReturned + " out of " + totalRowCount);
     }
+    List<Type> types = requestedSchema.getFields();
     List<ColumnDescriptor> columns = requestedSchema.getColumns();
-    columnReaders = new ColumnReader[columns.size()];
-    for (int i = 0; i < columns.size(); ++i) {
+    columnReaders = new ColumnReader[types.size()];
+    for (int i = 0; i < types.size(); ++i) {
       columnReaders[i] = createColumnReader(
           utcTimestamp,
           requestedTypes[i],
-          columns.get(i),
-          pages.getPageReader(columns.get(i)));
+          types.get(i),
+          columns,
+          pages);
     }
     totalCountLoadedSoFar += pages.getRowCount();
   }
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetDecimalVector.java
 
b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetDecimalVector.java
index 2749f02..4705b2f 100644
--- 
a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetDecimalVector.java
+++ 
b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetDecimalVector.java
@@ -32,9 +32,9 @@ import org.apache.flink.table.data.vector.DecimalColumnVector;
  */
 public class ParquetDecimalVector implements DecimalColumnVector {
 
-  private final ColumnVector vector;
+  public final ColumnVector vector;
 
-  ParquetDecimalVector(ColumnVector vector) {
+  public ParquetDecimalVector(ColumnVector vector) {
     this.vector = vector;
   }
 
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
 
b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
index 29c1b20..10a2dcd 100644
--- 
a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
+++ 
b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
@@ -18,6 +18,15 @@
 
 package org.apache.hudi.table.format.cow;
 
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.table.format.cow.vector.HeapArrayVector;
+import org.apache.hudi.table.format.cow.vector.HeapMapColumnVector;
+import org.apache.hudi.table.format.cow.vector.HeapRowColumnVector;
+import org.apache.hudi.table.format.cow.vector.VectorizedColumnBatch;
+import org.apache.hudi.table.format.cow.vector.reader.ArrayColumnReader;
+import org.apache.hudi.table.format.cow.vector.reader.MapColumnReader;
+import org.apache.hudi.table.format.cow.vector.reader.RowColumnReader;
+
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.formats.parquet.vector.reader.BooleanColumnReader;
 import org.apache.flink.formats.parquet.vector.reader.ByteColumnReader;
@@ -32,7 +41,6 @@ import 
org.apache.flink.formats.parquet.vector.reader.TimestampColumnReader;
 import org.apache.flink.table.data.DecimalData;
 import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.data.vector.ColumnVector;
-import org.apache.flink.table.data.vector.VectorizedColumnBatch;
 import org.apache.flink.table.data.vector.heap.HeapBooleanVector;
 import org.apache.flink.table.data.vector.heap.HeapByteVector;
 import org.apache.flink.table.data.vector.heap.HeapBytesVector;
@@ -44,16 +52,24 @@ import 
org.apache.flink.table.data.vector.heap.HeapShortVector;
 import org.apache.flink.table.data.vector.heap.HeapTimestampVector;
 import org.apache.flink.table.data.vector.writable.WritableColumnVector;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.ArrayType;
 import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.VarBinaryType;
 import org.apache.flink.util.Preconditions;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.ParquetRuntimeException;
 import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.page.PageReadStore;
 import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.InvalidSchemaException;
 import org.apache.parquet.schema.OriginalType;
 import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
 
 import java.io.IOException;
 import java.math.BigDecimal;
@@ -61,6 +77,7 @@ import java.nio.charset.StandardCharsets;
 import java.sql.Date;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
@@ -252,11 +269,40 @@ public class ParquetSplitReaderUtil {
     }
   }
 
+  /**
+   * Picks, from {@code columns}, the descriptors whose path element at index
+   * {@code depth} equals the name of {@code type}.
+   *
+   * @param depth   index into each descriptor's path to compare against the type name
+   * @param type    the Parquet type whose name is matched
+   * @param columns the candidate column descriptors
+   * @return the matching descriptors, in the order they were encountered
+   * @throws InvalidSchemaException if a descriptor's path has no element at {@code depth}
+   */
+  private static List<ColumnDescriptor> filterDescriptors(
+      int depth,
+      Type type,
+      List<ColumnDescriptor> columns) throws ParquetRuntimeException {
+    List<ColumnDescriptor> matched = new ArrayList<>();
+    for (ColumnDescriptor column : columns) {
+      String[] path = column.getPath();
+      if (path.length <= depth) {
+        // The descriptor is shallower than the level being inspected.
+        throw new InvalidSchemaException("Expect depth " + depth + " for schema: " + column);
+      }
+      if (type.getName().equals(path[depth])) {
+        matched.add(column);
+      }
+    }
+    // No descriptor matched the requested field at this level.
+    ValidationUtils.checkState(!matched.isEmpty(), "Corrupted Parquet schema");
+    return matched;
+  }
+
   public static ColumnReader createColumnReader(
       boolean utcTimestamp,
       LogicalType fieldType,
-      ColumnDescriptor descriptor,
-      PageReader pageReader) throws IOException {
+      Type physicalType,
+      List<ColumnDescriptor> descriptors,
+      PageReadStore pages) throws IOException {
+    return createColumnReader(utcTimestamp, fieldType, physicalType, 
descriptors,
+        pages, 0);
+  }
+
+  private static ColumnReader createColumnReader(
+      boolean utcTimestamp,
+      LogicalType fieldType,
+      Type physicalType,
+      List<ColumnDescriptor> columns,
+      PageReadStore pages,
+      int depth) throws IOException {
+    List<ColumnDescriptor> descriptors = filterDescriptors(depth, 
physicalType, columns);
+    ColumnDescriptor descriptor = descriptors.get(0);
+    PageReader pageReader = pages.getPageReader(descriptor);
     switch (fieldType.getTypeRoot()) {
       case BOOLEAN:
         return new BooleanColumnReader(descriptor, pageReader);
@@ -303,6 +349,45 @@ public class ParquetSplitReaderUtil {
           default:
             throw new AssertionError();
         }
+      case ARRAY:
+        return new ArrayColumnReader(
+            descriptor,
+            pageReader,
+            utcTimestamp,
+            descriptor.getPrimitiveType(),
+            fieldType);
+      case MAP:
+        MapType mapType = (MapType) fieldType;
+        ArrayColumnReader keyReader =
+            new ArrayColumnReader(
+                descriptor,
+                pageReader,
+                utcTimestamp,
+                descriptor.getPrimitiveType(),
+                new ArrayType(mapType.getKeyType()));
+        ArrayColumnReader valueReader =
+            new ArrayColumnReader(
+                descriptors.get(1),
+                pages.getPageReader(descriptors.get(1)),
+                utcTimestamp,
+                descriptors.get(1).getPrimitiveType(),
+                new ArrayType(mapType.getValueType()));
+        return new MapColumnReader(keyReader, valueReader, fieldType);
+      case ROW:
+        RowType rowType = (RowType) fieldType;
+        GroupType groupType = physicalType.asGroupType();
+        List<ColumnReader> fieldReaders = new ArrayList<>();
+        for (int i = 0; i < rowType.getFieldCount(); i++) {
+          fieldReaders.add(
+              createColumnReader(
+                  utcTimestamp,
+                  rowType.getTypeAt(i),
+                  groupType.getType(i),
+                  descriptors,
+                  pages,
+                  depth + 1));
+        }
+        return new RowColumnReader(fieldReaders);
       default:
         throw new UnsupportedOperationException(fieldType + " is not supported 
now.");
     }
@@ -311,7 +396,19 @@ public class ParquetSplitReaderUtil {
   public static WritableColumnVector createWritableColumnVector(
       int batchSize,
       LogicalType fieldType,
-      PrimitiveType primitiveType) {
+      Type physicalType,
+      List<ColumnDescriptor> descriptors) {
+    return createWritableColumnVector(batchSize, fieldType, physicalType, 
descriptors, 0);
+  }
+
+  private static WritableColumnVector createWritableColumnVector(
+      int batchSize,
+      LogicalType fieldType,
+      Type physicalType,
+      List<ColumnDescriptor> columns,
+      int depth) {
+    List<ColumnDescriptor> descriptors = filterDescriptors(depth, 
physicalType, columns);
+    PrimitiveType primitiveType = descriptors.get(0).getPrimitiveType();
     PrimitiveType.PrimitiveTypeName typeName = 
primitiveType.getPrimitiveTypeName();
     switch (fieldType.getTypeRoot()) {
       case BOOLEAN:
@@ -371,6 +468,49 @@ public class ParquetSplitReaderUtil {
                 && primitiveType.getOriginalType() == OriginalType.DECIMAL,
             "Unexpected type: %s", typeName);
         return new HeapBytesVector(batchSize);
+      case ARRAY:
+        ArrayType arrayType = (ArrayType) fieldType;
+        return new HeapArrayVector(
+            batchSize,
+            createWritableColumnVector(
+                batchSize,
+                arrayType.getElementType(),
+                physicalType,
+                descriptors,
+                depth));
+      case MAP:
+        MapType mapType = (MapType) fieldType;
+        GroupType repeatedType = 
physicalType.asGroupType().getType(0).asGroupType();
+        // the map column has three level paths.
+        return new HeapMapColumnVector(
+            batchSize,
+            createWritableColumnVector(
+                batchSize,
+                mapType.getKeyType(),
+                repeatedType.getType(0),
+                descriptors,
+                depth + 2),
+            createWritableColumnVector(
+                batchSize,
+                mapType.getValueType(),
+                repeatedType.getType(1),
+                descriptors,
+                depth + 2));
+      case ROW:
+        RowType rowType = (RowType) fieldType;
+        GroupType groupType = physicalType.asGroupType();
+        WritableColumnVector[] columnVectors =
+            new WritableColumnVector[rowType.getFieldCount()];
+        for (int i = 0; i < columnVectors.length; i++) {
+          columnVectors[i] =
+              createWritableColumnVector(
+                  batchSize,
+                  rowType.getTypeAt(i),
+                  groupType.getType(i),
+                  descriptors,
+                  depth + 1);
+        }
+        return new HeapRowColumnVector(batchSize, columnVectors);
       default:
         throw new UnsupportedOperationException(fieldType + " is not supported 
now.");
     }
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/data/ColumnarArrayData.java
 
b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/data/ColumnarArrayData.java
new file mode 100644
index 0000000..a16a4dd
--- /dev/null
+++ 
b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/data/ColumnarArrayData.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.data;
+
+import org.apache.hudi.table.format.cow.vector.MapColumnVector;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.data.binary.TypedSetters;
+import org.apache.flink.table.data.vector.ArrayColumnVector;
+import org.apache.flink.table.data.vector.BooleanColumnVector;
+import org.apache.flink.table.data.vector.ByteColumnVector;
+import org.apache.flink.table.data.vector.BytesColumnVector;
+import org.apache.flink.table.data.vector.ColumnVector;
+import org.apache.flink.table.data.vector.DecimalColumnVector;
+import org.apache.flink.table.data.vector.DoubleColumnVector;
+import org.apache.flink.table.data.vector.FloatColumnVector;
+import org.apache.flink.table.data.vector.IntColumnVector;
+import org.apache.flink.table.data.vector.LongColumnVector;
+import org.apache.flink.table.data.vector.RowColumnVector;
+import org.apache.flink.table.data.vector.ShortColumnVector;
+import org.apache.flink.table.data.vector.TimestampColumnVector;
+
+import java.util.Arrays;
+
+/**
+ * Columnar array to support access to vector column data.
+ *
+ * <p>References {@code org.apache.flink.table.data.ColumnarArrayData} to 
include FLINK-15390.
+ */
+public final class ColumnarArrayData implements ArrayData, TypedSetters {
+
+  private final ColumnVector data;
+  private final int offset;
+  private final int numElements;
+
+  public ColumnarArrayData(ColumnVector data, int offset, int numElements) {
+    this.data = data;
+    this.offset = offset;
+    this.numElements = numElements;
+  }
+
+  @Override
+  public int size() {
+    return numElements;
+  }
+
+  @Override
+  public boolean isNullAt(int pos) {
+    return data.isNullAt(offset + pos);
+  }
+
+  @Override
+  public void setNullAt(int pos) {
+    throw new UnsupportedOperationException("Not support the operation!");
+  }
+
+  @Override
+  public boolean getBoolean(int pos) {
+    return ((BooleanColumnVector) data).getBoolean(offset + pos);
+  }
+
+  @Override
+  public byte getByte(int pos) {
+    return ((ByteColumnVector) data).getByte(offset + pos);
+  }
+
+  @Override
+  public short getShort(int pos) {
+    return ((ShortColumnVector) data).getShort(offset + pos);
+  }
+
+  @Override
+  public int getInt(int pos) {
+    return ((IntColumnVector) data).getInt(offset + pos);
+  }
+
+  @Override
+  public long getLong(int pos) {
+    return ((LongColumnVector) data).getLong(offset + pos);
+  }
+
+  @Override
+  public float getFloat(int pos) {
+    return ((FloatColumnVector) data).getFloat(offset + pos);
+  }
+
+  @Override
+  public double getDouble(int pos) {
+    return ((DoubleColumnVector) data).getDouble(offset + pos);
+  }
+
+  @Override
+  public StringData getString(int pos) {
+    BytesColumnVector.Bytes byteArray = getByteArray(pos);
+    return StringData.fromBytes(byteArray.data, byteArray.offset, 
byteArray.len);
+  }
+
+  @Override
+  public DecimalData getDecimal(int pos, int precision, int scale) {
+    return ((DecimalColumnVector) data).getDecimal(offset + pos, precision, 
scale);
+  }
+
+  @Override
+  public TimestampData getTimestamp(int pos, int precision) {
+    return ((TimestampColumnVector) data).getTimestamp(offset + pos, 
precision);
+  }
+
+  @Override
+  public <T> RawValueData<T> getRawValue(int pos) {
+    throw new UnsupportedOperationException("RawValueData is not supported.");
+  }
+
+  @Override
+  public byte[] getBinary(int pos) {
+    BytesColumnVector.Bytes byteArray = getByteArray(pos);
+    if (byteArray.len == byteArray.data.length) {
+      return byteArray.data;
+    } else {
+      return Arrays.copyOfRange(byteArray.data, byteArray.offset, 
byteArray.len);
+    }
+  }
+
+  @Override
+  public ArrayData getArray(int pos) {
+    return ((ArrayColumnVector) data).getArray(offset + pos);
+  }
+
+  @Override
+  public MapData getMap(int pos) {
+    return ((MapColumnVector) data).getMap(offset + pos);
+  }
+
+  @Override
+  public RowData getRow(int pos, int numFields) {
+    return ((RowColumnVector) data).getRow(offset + pos);
+  }
+
+  @Override
+  public void setBoolean(int pos, boolean value) {
+    throw new UnsupportedOperationException("Not support the operation!");
+  }
+
+  @Override
+  public void setByte(int pos, byte value) {
+    throw new UnsupportedOperationException("Not support the operation!");
+  }
+
+  @Override
+  public void setShort(int pos, short value) {
+    throw new UnsupportedOperationException("Not support the operation!");
+  }
+
+  @Override
+  public void setInt(int pos, int value) {
+    throw new UnsupportedOperationException("Not support the operation!");
+  }
+
+  @Override
+  public void setLong(int pos, long value) {
+    throw new UnsupportedOperationException("Not support the operation!");
+  }
+
+  @Override
+  public void setFloat(int pos, float value) {
+    throw new UnsupportedOperationException("Not support the operation!");
+  }
+
+  @Override
+  public void setDouble(int pos, double value) {
+    throw new UnsupportedOperationException("Not support the operation!");
+  }
+
+  @Override
+  public void setDecimal(int pos, DecimalData value, int precision) {
+    throw new UnsupportedOperationException("Not support the operation!");
+  }
+
+  @Override
+  public void setTimestamp(int pos, TimestampData value, int precision) {
+    throw new UnsupportedOperationException("Not support the operation!");
+  }
+
+  @Override
+  public boolean[] toBooleanArray() {
+    boolean[] res = new boolean[numElements];
+    for (int i = 0; i < numElements; i++) {
+      res[i] = getBoolean(i);
+    }
+    return res;
+  }
+
+  @Override
+  public byte[] toByteArray() {
+    byte[] res = new byte[numElements];
+    for (int i = 0; i < numElements; i++) {
+      res[i] = getByte(i);
+    }
+    return res;
+  }
+
+  @Override
+  public short[] toShortArray() {
+    short[] res = new short[numElements];
+    for (int i = 0; i < numElements; i++) {
+      res[i] = getShort(i);
+    }
+    return res;
+  }
+
+  @Override
+  public int[] toIntArray() {
+    int[] res = new int[numElements];
+    for (int i = 0; i < numElements; i++) {
+      res[i] = getInt(i);
+    }
+    return res;
+  }
+
+  @Override
+  public long[] toLongArray() {
+    long[] res = new long[numElements];
+    for (int i = 0; i < numElements; i++) {
+      res[i] = getLong(i);
+    }
+    return res;
+  }
+
+  @Override
+  public float[] toFloatArray() {
+    float[] res = new float[numElements];
+    for (int i = 0; i < numElements; i++) {
+      res[i] = getFloat(i);
+    }
+    return res;
+  }
+
+  @Override
+  public double[] toDoubleArray() {
+    double[] res = new double[numElements];
+    for (int i = 0; i < numElements; i++) {
+      res[i] = getDouble(i);
+    }
+    return res;
+  }
+
+  private BytesColumnVector.Bytes getByteArray(int pos) {
+    return ((BytesColumnVector) data).getBytes(offset + pos);
+  }
+}
+
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/data/ColumnarMapData.java
 
b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/data/ColumnarMapData.java
new file mode 100644
index 0000000..9792e87
--- /dev/null
+++ 
b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/data/ColumnarMapData.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.data;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.ColumnarArrayData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.vector.ColumnVector;
+
+/**
+ * Columnar map to support access to vector column data.
+ *
+ * <p>The map is a view over {@code size} entries, starting at {@code offset}, of a key
+ * vector and a value vector; keys and values are exposed as two array views over the
+ * same range.
+ *
+ * <p>Referenced from flink 1.14.0 {@code org.apache.flink.table.data.ColumnarMapData}.
+ *
+ * <p>NOTE(review): this file explicitly imports
+ * {@code org.apache.flink.table.data.ColumnarArrayData}, so the {@code ColumnarArrayData}
+ * instantiated below is Flink's class, not the class of the same simple name declared in
+ * this package. Confirm that this is intended.
+ */
+public final class ColumnarMapData implements MapData {
+
+  // Backing vector holding the keys of all maps in the batch.
+  private final ColumnVector keyColumnVector;
+  // Backing vector holding the values of all maps in the batch.
+  private final ColumnVector valueColumnVector;
+  // Index of this map's first entry inside both backing vectors.
+  private final int offset;
+  // Number of entries in this map.
+  private final int size;
+
+  /**
+   * Creates a map view.
+   *
+   * @param keyColumnVector   vector the keys are read from
+   * @param valueColumnVector vector the values are read from
+   * @param offset            index of the first entry in both vectors
+   * @param size              number of entries
+   */
+  public ColumnarMapData(
+      ColumnVector keyColumnVector,
+      ColumnVector valueColumnVector,
+      int offset,
+      int size) {
+    this.keyColumnVector = keyColumnVector;
+    this.valueColumnVector = valueColumnVector;
+    this.offset = offset;
+    this.size = size;
+  }
+
+  @Override
+  public int size() {
+    return size;
+  }
+
+  /** Returns a new array view over the keys of this map. */
+  @Override
+  public ArrayData keyArray() {
+    return new ColumnarArrayData(keyColumnVector, offset, size);
+  }
+
+  /** Returns a new array view over the values of this map. */
+  @Override
+  public ArrayData valueArray() {
+    return new ColumnarArrayData(valueColumnVector, offset, size);
+  }
+
+  /** Always throws: this view does not support equality comparison. */
+  @Override
+  public boolean equals(Object o) {
+    throw new UnsupportedOperationException(
+        "ColumnarMapData do not support equals, please compare fields one by 
one!");
+  }
+
+  /** Always throws: this view does not support hashing. */
+  @Override
+  public int hashCode() {
+    throw new UnsupportedOperationException(
+        "ColumnarMapData do not support hashCode, please hash fields one by 
one!");
+  }
+}
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/data/ColumnarRowData.java
 
b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/data/ColumnarRowData.java
new file mode 100644
index 0000000..ebb4ca2
--- /dev/null
+++ 
b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/data/ColumnarRowData.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.data;
+
+import org.apache.hudi.table.format.cow.vector.VectorizedColumnBatch;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.data.binary.TypedSetters;
+import org.apache.flink.table.data.vector.BytesColumnVector;
+import org.apache.flink.types.RowKind;
+
+/**
+ * Columnar row to support access to vector column data.
+ * It is a row view in {@link VectorizedColumnBatch}.
+ *
+ * <p>The view is repositionable: {@link #setRowId(int)} selects which row of the batch
+ * the getters read, and {@link #setVectorizedColumnBatch(VectorizedColumnBatch)} swaps the
+ * batch and resets the row id to 0. Getters delegate to the batch with
+ * {@code (rowId, pos)}; all value setters throw {@link UnsupportedOperationException}.
+ *
+ * <p>References {@code org.apache.flink.table.data.ColumnarRowData} to include FLINK-15390.
+ */
+public final class ColumnarRowData implements RowData, TypedSetters {
+
+  // Row kind reported by this view; defaults to INSERT and can be changed via setRowKind.
+  private RowKind rowKind = RowKind.INSERT;
+  // Batch the getters read from; null until set when the no-arg constructor is used.
+  private VectorizedColumnBatch vectorizedColumnBatch;
+  // Index of the row inside the batch that this view currently points at.
+  private int rowId;
+
+  /** Creates a view with no batch attached; a batch must be set before reading. */
+  public ColumnarRowData() {
+  }
+
+  /** Creates a view over row 0 of the given batch. */
+  public ColumnarRowData(VectorizedColumnBatch vectorizedColumnBatch) {
+    this(vectorizedColumnBatch, 0);
+  }
+
+  /** Creates a view over row {@code rowId} of the given batch. */
+  public ColumnarRowData(VectorizedColumnBatch vectorizedColumnBatch, int 
rowId) {
+    this.vectorizedColumnBatch = vectorizedColumnBatch;
+    this.rowId = rowId;
+  }
+
+  /** Replaces the backing batch and resets the current row id to 0. */
+  public void setVectorizedColumnBatch(VectorizedColumnBatch 
vectorizedColumnBatch) {
+    this.vectorizedColumnBatch = vectorizedColumnBatch;
+    this.rowId = 0;
+  }
+
+  /** Points this view at row {@code rowId} of the current batch. */
+  public void setRowId(int rowId) {
+    this.rowId = rowId;
+  }
+
+  @Override
+  public RowKind getRowKind() {
+    return rowKind;
+  }
+
+  @Override
+  public void setRowKind(RowKind kind) {
+    this.rowKind = kind;
+  }
+
+  @Override
+  public int getArity() {
+    return vectorizedColumnBatch.getArity();
+  }
+
+  @Override
+  public boolean isNullAt(int pos) {
+    return vectorizedColumnBatch.isNullAt(rowId, pos);
+  }
+
+  @Override
+  public boolean getBoolean(int pos) {
+    return vectorizedColumnBatch.getBoolean(rowId, pos);
+  }
+
+  @Override
+  public byte getByte(int pos) {
+    return vectorizedColumnBatch.getByte(rowId, pos);
+  }
+
+  @Override
+  public short getShort(int pos) {
+    return vectorizedColumnBatch.getShort(rowId, pos);
+  }
+
+  @Override
+  public int getInt(int pos) {
+    return vectorizedColumnBatch.getInt(rowId, pos);
+  }
+
+  @Override
+  public long getLong(int pos) {
+    return vectorizedColumnBatch.getLong(rowId, pos);
+  }
+
+  @Override
+  public float getFloat(int pos) {
+    return vectorizedColumnBatch.getFloat(rowId, pos);
+  }
+
+  @Override
+  public double getDouble(int pos) {
+    return vectorizedColumnBatch.getDouble(rowId, pos);
+  }
+
+  @Override
+  public StringData getString(int pos) {
+    BytesColumnVector.Bytes byteArray = 
vectorizedColumnBatch.getByteArray(rowId, pos);
+    return StringData.fromBytes(byteArray.data, byteArray.offset, 
byteArray.len);
+  }
+
+  @Override
+  public DecimalData getDecimal(int pos, int precision, int scale) {
+    return vectorizedColumnBatch.getDecimal(rowId, pos, precision, scale);
+  }
+
+  @Override
+  public TimestampData getTimestamp(int pos, int precision) {
+    return vectorizedColumnBatch.getTimestamp(rowId, pos, precision);
+  }
+
+  @Override
+  public <T> RawValueData<T> getRawValue(int pos) {
+    throw new UnsupportedOperationException("RawValueData is not supported.");
+  }
+
+  /**
+   * Returns the binary value of field {@code pos}. The backing buffer is returned as-is
+   * when the value's length equals the buffer length; otherwise {@code len} bytes starting
+   * at {@code offset} are copied into a new array.
+   */
+  @Override
+  public byte[] getBinary(int pos) {
+    BytesColumnVector.Bytes byteArray = 
vectorizedColumnBatch.getByteArray(rowId, pos);
+    if (byteArray.len == byteArray.data.length) {
+      return byteArray.data;
+    } else {
+      byte[] ret = new byte[byteArray.len];
+      System.arraycopy(byteArray.data, byteArray.offset, ret, 0, 
byteArray.len);
+      return ret;
+    }
+  }
+
+  /** Returns the nested row of field {@code pos}; {@code numFields} is not used here. */
+  @Override
+  public RowData getRow(int pos, int numFields) {
+    return vectorizedColumnBatch.getRow(rowId, pos);
+  }
+
+  @Override
+  public ArrayData getArray(int pos) {
+    return vectorizedColumnBatch.getArray(rowId, pos);
+  }
+
+  @Override
+  public MapData getMap(int pos) {
+    return vectorizedColumnBatch.getMap(rowId, pos);
+  }
+
+  // The setters below are required by TypedSetters but this view is read-only.
+
+  @Override
+  public void setNullAt(int pos) {
+    throw new UnsupportedOperationException("Not support the operation!");
+  }
+
+  @Override
+  public void setBoolean(int pos, boolean value) {
+    throw new UnsupportedOperationException("Not support the operation!");
+  }
+
+  @Override
+  public void setByte(int pos, byte value) {
+    throw new UnsupportedOperationException("Not support the operation!");
+  }
+
+  @Override
+  public void setShort(int pos, short value) {
+    throw new UnsupportedOperationException("Not support the operation!");
+  }
+
+  @Override
+  public void setInt(int pos, int value) {
+    throw new UnsupportedOperationException("Not support the operation!");
+  }
+
+  @Override
+  public void setLong(int pos, long value) {
+    throw new UnsupportedOperationException("Not support the operation!");
+  }
+
+  @Override
+  public void setFloat(int pos, float value) {
+    throw new UnsupportedOperationException("Not support the operation!");
+  }
+
+  @Override
+  public void setDouble(int pos, double value) {
+    throw new UnsupportedOperationException("Not support the operation!");
+  }
+
+  @Override
+  public void setDecimal(int pos, DecimalData value, int precision) {
+    throw new UnsupportedOperationException("Not support the operation!");
+  }
+
+  @Override
+  public void setTimestamp(int pos, TimestampData value, int precision) {
+    throw new UnsupportedOperationException("Not support the operation!");
+  }
+
+  /** Always throws: this view does not support equality comparison. */
+  @Override
+  public boolean equals(Object o) {
+    throw new UnsupportedOperationException(
+        "ColumnarRowData do not support equals, please compare fields one by 
one!");
+  }
+
+  /** Always throws: this view does not support hashing. */
+  @Override
+  public int hashCode() {
+    throw new UnsupportedOperationException(
+        "ColumnarRowData do not support hashCode, please hash fields one by 
one!");
+  }
+}
+
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/HeapArrayVector.java
 
b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/HeapArrayVector.java
new file mode 100644
index 0000000..f4c15b6
--- /dev/null
+++ 
b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/HeapArrayVector.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.vector;
+
+import org.apache.hudi.table.format.cow.data.ColumnarArrayData;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.vector.ArrayColumnVector;
+import org.apache.flink.table.data.vector.ColumnVector;
+import org.apache.flink.table.data.vector.heap.AbstractHeapVector;
+import org.apache.flink.table.data.vector.writable.WritableColumnVector;
+
+/**
+ * Heap-backed, nullable column vector of arrays.
+ *
+ * <p>Entry {@code i} is described by {@code offsets[i]} and {@code lengths[i]}, which
+ * locate its elements inside the shared {@link #child} vector.
+ */
+public class HeapArrayVector extends AbstractHeapVector
+    implements WritableColumnVector, ArrayColumnVector {
+
+  /** Start position of each array inside {@link #child}. */
+  public long[] offsets;
+  /** Element count of each array. */
+  public long[] lengths;
+  /** Vector holding the elements of all arrays. */
+  public ColumnVector child;
+  private int size;
+
+  /**
+   * Creates a vector with capacity {@code len} and no child vector attached.
+   */
+  public HeapArrayVector(int len) {
+    this(len, null);
+  }
+
+  /**
+   * Creates a vector with capacity {@code len} whose elements live in {@code vector}.
+   */
+  public HeapArrayVector(int len, ColumnVector vector) {
+    super(len);
+    this.offsets = new long[len];
+    this.lengths = new long[len];
+    this.child = vector;
+  }
+
+  public int getSize() {
+    return this.size;
+  }
+
+  public void setSize(int size) {
+    this.size = size;
+  }
+
+  /** Returns the length of the null-flag array of this vector. */
+  public int getLen() {
+    return this.isNull.length;
+  }
+
+  /**
+   * Returns a view of the {@code i}-th array; offset and length are narrowed to {@code int}.
+   */
+  @Override
+  public ArrayData getArray(int i) {
+    return new ColumnarArrayData(child, (int) offsets[i], (int) lengths[i]);
+  }
+}
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/HeapMapColumnVector.java
 
b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/HeapMapColumnVector.java
new file mode 100644
index 0000000..f05a2e7
--- /dev/null
+++ 
b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/HeapMapColumnVector.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.vector;
+
+import org.apache.hudi.table.format.cow.data.ColumnarMapData;
+
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.vector.ColumnVector;
+import org.apache.flink.table.data.vector.heap.AbstractHeapVector;
+import org.apache.flink.table.data.vector.writable.WritableColumnVector;
+
+/**
+ * Heap-backed, nullable column vector whose rows are maps.
+ *
+ * <p>Row {@code i} is made of the entries of the key and value vectors that start at
+ * {@code offsets[i]} and span {@code lengths[i]} positions.
+ */
+public class HeapMapColumnVector extends AbstractHeapVector
+    implements WritableColumnVector, MapColumnVector {
+
+  private long[] offsets;
+  private long[] lengths;
+  private int size;
+  private ColumnVector keys;
+  private ColumnVector values;
+
+  /** Creates a vector with {@code len} row slots over the given key and value vectors. */
+  public HeapMapColumnVector(int len, ColumnVector keys, ColumnVector values) {
+    super(len);
+    this.size = 0;
+    this.offsets = new long[len];
+    this.lengths = new long[len];
+    this.keys = keys;
+    this.values = values;
+  }
+
+  public void setOffsets(long[] offsets) {
+    this.offsets = offsets;
+  }
+
+  public void setLengths(long[] lengths) {
+    this.lengths = lengths;
+  }
+
+  public void setKeys(ColumnVector keys) {
+    this.keys = keys;
+  }
+
+  public void setValues(ColumnVector values) {
+    this.values = values;
+  }
+
+  public int getSize() {
+    return this.size;
+  }
+
+  public void setSize(int size) {
+    this.size = size;
+  }
+
+  @Override
+  public MapData getMap(int i) {
+    final int start = (int) offsets[i];
+    final int count = (int) lengths[i];
+    return new ColumnarMapData(keys, values, start, count);
+  }
+}
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/HeapRowColumnVector.java
 
b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/HeapRowColumnVector.java
new file mode 100644
index 0000000..ad05a61
--- /dev/null
+++ 
b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/HeapRowColumnVector.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.vector;
+
+import org.apache.hudi.table.format.cow.data.ColumnarRowData;
+
+import org.apache.flink.table.data.vector.heap.AbstractHeapVector;
+import org.apache.flink.table.data.vector.writable.WritableColumnVector;
+
+/**
+ * Heap-backed, nullable column vector whose rows are nested rows.
+ *
+ * <p>Each field of the nested row lives in its own vector inside {@link #vectors}.
+ */
+public class HeapRowColumnVector extends AbstractHeapVector
+    implements WritableColumnVector, RowColumnVector {
+
+  /** One vector per field of the nested row. */
+  public WritableColumnVector[] vectors;
+
+  public HeapRowColumnVector(int len, WritableColumnVector... vectors) {
+    super(len);
+    this.vectors = vectors;
+  }
+
+  /** Returns a row view over the field vectors, positioned at row {@code i}. */
+  @Override
+  public ColumnarRowData getRow(int i) {
+    VectorizedColumnBatch fieldBatch = new VectorizedColumnBatch(vectors);
+    ColumnarRowData row = new ColumnarRowData(fieldBatch);
+    row.setRowId(i);
+    return row;
+  }
+}
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/MapColumnVector.java
 
b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/MapColumnVector.java
new file mode 100644
index 0000000..38424da
--- /dev/null
+++ 
b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/MapColumnVector.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.vector;
+
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.vector.ColumnVector;
+
+/**
+ * Map column vector: a {@link ColumnVector} whose rows are maps.
+ *
+ * <p>{@code getMap(i)} returns the {@link MapData} stored at row index {@code i}.
+ */
+public interface MapColumnVector extends ColumnVector {
+  MapData getMap(int i);
+}
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/RowColumnVector.java
 
b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/RowColumnVector.java
new file mode 100644
index 0000000..293af7b
--- /dev/null
+++ 
b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/RowColumnVector.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.vector;
+
+import org.apache.hudi.table.format.cow.data.ColumnarRowData;
+
+import org.apache.flink.table.data.vector.ColumnVector;
+
+/**
+ * Row column vector: a {@link ColumnVector} whose rows are nested rows.
+ *
+ * <p>{@code getRow(i)} returns the {@link ColumnarRowData} for row index {@code i}.
+ */
+public interface RowColumnVector extends ColumnVector {
+  ColumnarRowData getRow(int i);
+}
\ No newline at end of file
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/VectorizedColumnBatch.java
 
b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/VectorizedColumnBatch.java
new file mode 100644
index 0000000..9eee55d
--- /dev/null
+++ 
b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/VectorizedColumnBatch.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.vector;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.data.vector.ArrayColumnVector;
+import org.apache.flink.table.data.vector.BooleanColumnVector;
+import org.apache.flink.table.data.vector.ByteColumnVector;
+import org.apache.flink.table.data.vector.BytesColumnVector;
+import org.apache.flink.table.data.vector.ColumnVector;
+import org.apache.flink.table.data.vector.DecimalColumnVector;
+import org.apache.flink.table.data.vector.DoubleColumnVector;
+import org.apache.flink.table.data.vector.FloatColumnVector;
+import org.apache.flink.table.data.vector.IntColumnVector;
+import org.apache.flink.table.data.vector.LongColumnVector;
+import org.apache.flink.table.data.vector.ShortColumnVector;
+import org.apache.flink.table.data.vector.TimestampColumnVector;
+
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * A VectorizedColumnBatch is a set of rows, organized with each column as a 
vector. It is the unit
+ * of query execution, organized to minimize the cost per row.
+ *
+ * <p>{@code VectorizedColumnBatch}s are influenced by Apache Hive 
VectorizedRowBatch.
+ *
+ * <p>References {@code 
org.apache.flink.table.data.vector.VectorizedColumnBatch} to include 
FLINK-15390.
+ */
+public class VectorizedColumnBatch implements Serializable {
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * This number is carefully chosen to minimize overhead and typically allows 
one
+   * VectorizedColumnBatch to fit in cache.
+   */
+  public static final int DEFAULT_SIZE = 2048;
+
+  private int numRows;
+  public final ColumnVector[] columns;
+
+  public VectorizedColumnBatch(ColumnVector[] vectors) {
+    this.columns = vectors;
+  }
+
+  public void setNumRows(int numRows) {
+    this.numRows = numRows;
+  }
+
+  public int getNumRows() {
+    return numRows;
+  }
+
+  public int getArity() {
+    return columns.length;
+  }
+
+  public boolean isNullAt(int rowId, int colId) {
+    return columns[colId].isNullAt(rowId);
+  }
+
+  public boolean getBoolean(int rowId, int colId) {
+    return ((BooleanColumnVector) columns[colId]).getBoolean(rowId);
+  }
+
+  public byte getByte(int rowId, int colId) {
+    return ((ByteColumnVector) columns[colId]).getByte(rowId);
+  }
+
+  public short getShort(int rowId, int colId) {
+    return ((ShortColumnVector) columns[colId]).getShort(rowId);
+  }
+
+  public int getInt(int rowId, int colId) {
+    return ((IntColumnVector) columns[colId]).getInt(rowId);
+  }
+
+  public long getLong(int rowId, int colId) {
+    return ((LongColumnVector) columns[colId]).getLong(rowId);
+  }
+
+  public float getFloat(int rowId, int colId) {
+    return ((FloatColumnVector) columns[colId]).getFloat(rowId);
+  }
+
+  public double getDouble(int rowId, int colId) {
+    return ((DoubleColumnVector) columns[colId]).getDouble(rowId);
+  }
+
+  public BytesColumnVector.Bytes getByteArray(int rowId, int colId) {
+    return ((BytesColumnVector) columns[colId]).getBytes(rowId);
+  }
+
+  private byte[] getBytes(int rowId, int colId) {
+    BytesColumnVector.Bytes byteArray = getByteArray(rowId, colId);
+    if (byteArray.len == byteArray.data.length) {
+      return byteArray.data;
+    } else {
+      return byteArray.getBytes();
+    }
+  }
+
+  public String getString(int rowId, int colId) {
+    BytesColumnVector.Bytes byteArray = getByteArray(rowId, colId);
+    return new String(byteArray.data, byteArray.offset, byteArray.len, 
StandardCharsets.UTF_8);
+  }
+
+  public DecimalData getDecimal(int rowId, int colId, int precision, int 
scale) {
+    return ((DecimalColumnVector) (columns[colId])).getDecimal(rowId, 
precision, scale);
+  }
+
+  public TimestampData getTimestamp(int rowId, int colId, int precision) {
+    return ((TimestampColumnVector) (columns[colId])).getTimestamp(rowId, 
precision);
+  }
+
+  public ArrayData getArray(int rowId, int colId) {
+    return ((ArrayColumnVector) columns[colId]).getArray(rowId);
+  }
+
+  public RowData getRow(int rowId, int colId) {
+    return ((RowColumnVector) columns[colId]).getRow(rowId);
+  }
+
+  public MapData getMap(int rowId, int colId) {
+    return ((MapColumnVector) columns[colId]).getMap(rowId);
+  }
+}
+
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ArrayColumnReader.java
 
b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ArrayColumnReader.java
new file mode 100644
index 0000000..256d4c1
--- /dev/null
+++ 
b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ArrayColumnReader.java
@@ -0,0 +1,473 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.vector.reader;
+
+import org.apache.hudi.table.format.cow.ParquetDecimalVector;
+import org.apache.hudi.table.format.cow.vector.HeapArrayVector;
+
+import org.apache.flink.formats.parquet.vector.reader.ColumnReader;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.data.vector.VectorizedColumnBatch;
+import org.apache.flink.table.data.vector.heap.HeapBooleanVector;
+import org.apache.flink.table.data.vector.heap.HeapByteVector;
+import org.apache.flink.table.data.vector.heap.HeapBytesVector;
+import org.apache.flink.table.data.vector.heap.HeapDoubleVector;
+import org.apache.flink.table.data.vector.heap.HeapFloatVector;
+import org.apache.flink.table.data.vector.heap.HeapIntVector;
+import org.apache.flink.table.data.vector.heap.HeapLongVector;
+import org.apache.flink.table.data.vector.heap.HeapShortVector;
+import org.apache.flink.table.data.vector.heap.HeapTimestampVector;
+import org.apache.flink.table.data.vector.writable.WritableColumnVector;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Array {@link ColumnReader}.
+ */
+public class ArrayColumnReader extends BaseVectorizedColumnReader {
+
+  // The value read most recently from the parquet page
+  private Object lastValue;
+
+  // flag to indicate if there is no data in parquet data page
+  private boolean eof = false;
+
+  // flag to indicate if it's the first time to read a parquet data page with this instance
+  boolean isFirstRow = true;
+
+  /**
+   * Creates the reader; every argument is handed unchanged to the
+   * {@code BaseVectorizedColumnReader} constructor.
+   *
+   * @param logicalType expected to be an {@code ArrayType}: {@code readToVector} casts the
+   *                    inherited {@code logicalType} field to it (presumably this same value,
+   *                    to be confirmed against the base class)
+   * @throws IOException if the base class constructor fails
+   */
+  public ArrayColumnReader(
+      ColumnDescriptor descriptor,
+      PageReader pageReader,
+      boolean isUtcTimestamp,
+      Type type,
+      LogicalType logicalType)
+      throws IOException {
+    super(descriptor, pageReader, isUtcTimestamp, type, logicalType);
+  }
+
+  /**
+   * Reads up to {@code readNumber} array rows from the parquet pages into {@code vector}.
+   *
+   * <p>The element values of all rows are first gathered in a temporary list because the size of
+   * the child vector is unknown up front; afterwards they are copied into a typed child vector
+   * and the offsets/lengths buffers are trimmed to the number of rows actually read.
+   *
+   * @param readNumber maximum number of rows to read
+   * @param vector     target vector, must be a {@link HeapArrayVector}
+   * @throws IOException if reading a parquet page fails
+   */
+  @Override
+  public void readToVector(int readNumber, WritableColumnVector vector) throws IOException {
+    HeapArrayVector lcv = (HeapArrayVector) vector;
+    // Before reading, allocate offsets & lengths with room for one slot per requested row.
+    // collectDataFromParquetPage() writes a slot for every row up to readNumber, so the default
+    // size alone would overflow for larger batches. The actual size is assigned in
+    // setChildrenInfo() after reading completes.
+    int capacity = Math.max(readNumber, VectorizedColumnBatch.DEFAULT_SIZE);
+    lcv.offsets = new long[capacity];
+    lcv.lengths = new long[capacity];
+    // Because the length of the child vector can't be known now,
+    // the valueList temporarily holds all element values.
+    List<Object> valueList = new ArrayList<>();
+
+    LogicalType category = ((ArrayType) logicalType).getElementType();
+
+    // Read the first value of the parquet data page; this happens only once per instance
+    // because every later call starts with the value left over from the previous call.
+    if (isFirstRow) {
+      if (!fetchNextValue(category)) {
+        return;
+      }
+      isFirstRow = false;
+    }
+
+    int index = collectDataFromParquetPage(readNumber, lcv, valueList, category);
+
+    // Convert valueList into the typed child vector of the array vector.
+    fillColumnVector(category, lcv, valueList, index);
+  }
+
+  /**
+   * Reads a single value from the parquet page and stores it in {@code lastValue}.
+   *
+   * <p>The stored value is {@code null} when the definition level is below the maximum, the
+   * dictionary id when the current page is dictionary encoded, and the plain value otherwise.
+   * When no value is left, {@code eof} is set.
+   *
+   * @param category logical type of the array elements
+   * @return {@code true} if a value was read, {@code false} if the data is exhausted
+   * @throws IOException if loading the next page fails
+   */
+  private boolean fetchNextValue(LogicalType category) throws IOException {
+    if (readPageIfNeed() <= 0) {
+      eof = true;
+      return false;
+    }
+    // get the values of repetition and definitionLevel
+    readRepetitionAndDefinitionLevels();
+    if (definitionLevel != maxDefLevel) {
+      // the value is null
+      lastValue = null;
+    } else if (isCurrentPageDictionaryEncoded) {
+      lastValue = dataColumn.readValueDictionaryId();
+    } else {
+      lastValue = readPrimitiveTypedRow(category);
+    }
+    return true;
+  }
+
+  /**
+   * Returns the number of values left in the current page, loading the next page first when the
+   * current one is used up.
+   */
+  private int readPageIfNeed() throws IOException {
+    if ((int) (endOfPageValueCount - valuesRead) == 0) {
+      // no data left in current page, load data from new page
+      readPage();
+    }
+    return (int) (endOfPageValueCount - valuesRead);
+  }
+
+  // Needs to stay consistent with VectorizedPrimitiveColumnReader#readBatchHelper
+  // TODO Reduce the duplicated code
+  /** Reads the next plain (non-dictionary) value according to the element type. */
+  private Object readPrimitiveTypedRow(LogicalType category) {
+    switch (category.getTypeRoot()) {
+      case BOOLEAN:
+        return dataColumn.readBoolean();
+      case TINYINT:
+        return dataColumn.readTinyInt();
+      case SMALLINT:
+        return dataColumn.readSmallInt();
+      case INTEGER:
+      case DATE:
+      case TIME_WITHOUT_TIME_ZONE:
+        return dataColumn.readInteger();
+      case BIGINT:
+        return dataColumn.readLong();
+      case FLOAT:
+        return dataColumn.readFloat();
+      case DOUBLE:
+        return dataColumn.readDouble();
+      case CHAR:
+      case VARCHAR:
+      case BINARY:
+      case VARBINARY:
+        return dataColumn.readString();
+      case TIMESTAMP_WITHOUT_TIME_ZONE:
+      case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+        return dataColumn.readTimestamp();
+      case DECIMAL:
+        return readDecimalBackingValue();
+      default:
+        throw new RuntimeException("Unsupported type in the list: " + type);
+    }
+  }
+
+  /** Reads the next decimal in the representation of the column's physical parquet type. */
+  private Object readDecimalBackingValue() {
+    switch (descriptor.getPrimitiveType().getPrimitiveTypeName()) {
+      case INT32:
+        return dataColumn.readInteger();
+      case INT64:
+        return dataColumn.readLong();
+      case BINARY:
+      case FIXED_LEN_BYTE_ARRAY:
+        return dataColumn.readString();
+      default:
+        throw new AssertionError();
+    }
+  }
+
+  /**
+   * Resolves a dictionary id read from a dictionary-encoded page into the actual value.
+   *
+   * <p>The returned object has the same Java type that {@code readPrimitiveTypedRow} produces for
+   * the element type, because {@code fillColumnVector} casts the collected values by element type.
+   *
+   * @param category        logical type of the array elements
+   * @param dictionaryValue dictionary id, or {@code null} for a null element
+   * @return the decoded value, or {@code null} if {@code dictionaryValue} is {@code null}
+   */
+  private Object dictionaryDecodeValue(LogicalType category, Integer dictionaryValue) {
+    if (dictionaryValue == null) {
+      return null;
+    }
+
+    switch (category.getTypeRoot()) {
+      case CHAR:
+      case VARCHAR:
+      case BINARY:
+      case VARBINARY:
+        return dictionary.readString(dictionaryValue);
+      case DATE:
+      case TIME_WITHOUT_TIME_ZONE:
+      case INTEGER:
+        return dictionary.readInteger(dictionaryValue);
+      case BOOLEAN:
+        // Keep the value a Boolean: fillColumnVector casts BOOLEAN elements to Boolean, so an
+        // Integer (1/0) here would fail with a ClassCastException.
+        return dictionary.readBoolean(dictionaryValue);
+      case DOUBLE:
+        return dictionary.readDouble(dictionaryValue);
+      case FLOAT:
+        return dictionary.readFloat(dictionaryValue);
+      case TINYINT:
+        return dictionary.readTinyInt(dictionaryValue);
+      case SMALLINT:
+        return dictionary.readSmallInt(dictionaryValue);
+      case BIGINT:
+        return dictionary.readLong(dictionaryValue);
+      case DECIMAL:
+        // Decimals are decoded in the representation of their physical parquet type.
+        switch (descriptor.getPrimitiveType().getPrimitiveTypeName()) {
+          case INT32:
+            return dictionary.readInteger(dictionaryValue);
+          case INT64:
+            return dictionary.readLong(dictionaryValue);
+          case FIXED_LEN_BYTE_ARRAY:
+          case BINARY:
+            return dictionary.readString(dictionaryValue);
+          default:
+            throw new AssertionError();
+        }
+      case TIMESTAMP_WITHOUT_TIME_ZONE:
+      case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+        return dictionary.readTimestamp(dictionaryValue);
+      default:
+        throw new RuntimeException("Unsupported type in the list: " + type);
+    }
+  }
+
+  /**
+   * Collects data from a parquet page and returns the final row index where 
it stopped. The
+   * returned index can be equal to or less than total.
+   *
+   * @param total     maximum number of rows to collect
+   * @param lcv       column vector to do initial setup in data collection time
+   * @param valueList collection of values that will be fed into the vector 
later
+   * @param category
+   * @return int
+   * @throws IOException
+   */
+  private int collectDataFromParquetPage(
+      int total, HeapArrayVector lcv, List<Object> valueList, LogicalType 
category)
+      throws IOException {
+    int index = 0;
+    /*
+     * Here is a nested loop for collecting all values from a parquet page.
+     * A column of array type can be considered as a list of lists, so the two 
loops are as below:
+     * 1. The outer loop iterates on rows (index is a row index, so points to 
a row in the batch), e.g.:
+     * [0, 2, 3]    <- index: 0
+     * [NULL, 3, 4] <- index: 1
+     *
+     * 2. The inner loop iterates on values within a row (sets all data from 
parquet data page
+     * for an element in ListColumnVector), so fetchNextValue returns values 
one-by-one:
+     * 0, 2, 3, NULL, 3, 4
+     *
+     * As described below, the repetition level (repetitionLevel != 0)
+     * can be used to decide when we'll start to read values for the next list.
+     */
+    while (!eof && index < total) {
+      // add element to ListColumnVector one by one
+      lcv.offsets[index] = valueList.size();
+      /*
+       * Let's collect all values for a single list.
+       * Repetition level = 0 means that a new list started there in the 
parquet page,
+       * in that case, let's exit from the loop, and start to collect value 
for a new list.
+       */
+      do {
+        /*
+         * Definition level = 0 when a NULL value was returned instead of a 
list
+         * (this is not the same as a NULL value in of a list).
+         */
+        if (definitionLevel == 0) {
+          lcv.setNullAt(index);
+        }
+        valueList.add(
+            isCurrentPageDictionaryEncoded
+                ? dictionaryDecodeValue(category, (Integer) lastValue)
+                : lastValue);
+      } while (fetchNextValue(category) && (repetitionLevel != 0));
+
+      lcv.lengths[index] = valueList.size() - lcv.offsets[index];
+      index++;
+    }
+    return index;
+  }
+
+  /**
+   * The lengths & offsets will be initialized as default size (1024), it 
should be set to the
+   * actual size according to the element number.
+   */
+  private void setChildrenInfo(HeapArrayVector lcv, int itemNum, int 
elementNum) {
+    lcv.setSize(itemNum);
+    long[] lcvLength = new long[elementNum];
+    long[] lcvOffset = new long[elementNum];
+    System.arraycopy(lcv.lengths, 0, lcvLength, 0, elementNum);
+    System.arraycopy(lcv.offsets, 0, lcvOffset, 0, elementNum);
+    lcv.lengths = lcvLength;
+    lcv.offsets = lcvOffset;
+  }
+
+  private void fillColumnVector(
+      LogicalType category, HeapArrayVector lcv, List valueList, int 
elementNum) {
+    int total = valueList.size();
+    setChildrenInfo(lcv, total, elementNum);
+    switch (category.getTypeRoot()) {
+      case CHAR:
+      case VARCHAR:
+      case BINARY:
+      case VARBINARY:
+        lcv.child = new HeapBytesVector(total);
+        ((HeapBytesVector) lcv.child).reset();
+        for (int i = 0; i < valueList.size(); i++) {
+          byte[] src = ((List<byte[]>) valueList).get(i);
+          if (src == null) {
+            ((HeapBytesVector) lcv.child).setNullAt(i);
+          } else {
+            ((HeapBytesVector) lcv.child).appendBytes(i, src, 0, src.length);
+          }
+        }
+        break;
+      case BOOLEAN:
+        lcv.child = new HeapBooleanVector(total);
+        ((HeapBooleanVector) lcv.child).reset();
+        for (int i = 0; i < valueList.size(); i++) {
+          if (valueList.get(i) == null) {
+            ((HeapBooleanVector) lcv.child).setNullAt(i);
+          } else {
+            ((HeapBooleanVector) lcv.child).vector[i] =
+                ((List<Boolean>) valueList).get(i);
+          }
+        }
+        break;
+      case TINYINT:
+        lcv.child = new HeapByteVector(total);
+        ((HeapByteVector) lcv.child).reset();
+        for (int i = 0; i < valueList.size(); i++) {
+          if (valueList.get(i) == null) {
+            ((HeapByteVector) lcv.child).setNullAt(i);
+          } else {
+            ((HeapByteVector) lcv.child).vector[i] =
+                (byte) ((List<Integer>) valueList).get(i).intValue();
+          }
+        }
+        break;
+      case SMALLINT:
+        lcv.child = new HeapShortVector(total);
+        ((HeapShortVector) lcv.child).reset();
+        for (int i = 0; i < valueList.size(); i++) {
+          if (valueList.get(i) == null) {
+            ((HeapShortVector) lcv.child).setNullAt(i);
+          } else {
+            ((HeapShortVector) lcv.child).vector[i] =
+                (short) ((List<Integer>) valueList).get(i).intValue();
+          }
+        }
+        break;
+      case INTEGER:
+      case DATE:
+      case TIME_WITHOUT_TIME_ZONE:
+        lcv.child = new HeapIntVector(total);
+        ((HeapIntVector) lcv.child).reset();
+        for (int i = 0; i < valueList.size(); i++) {
+          if (valueList.get(i) == null) {
+            ((HeapIntVector) lcv.child).setNullAt(i);
+          } else {
+            ((HeapIntVector) lcv.child).vector[i] = ((List<Integer>) 
valueList).get(i);
+          }
+        }
+        break;
+      case FLOAT:
+        lcv.child = new HeapFloatVector(total);
+        ((HeapFloatVector) lcv.child).reset();
+        for (int i = 0; i < valueList.size(); i++) {
+          if (valueList.get(i) == null) {
+            ((HeapFloatVector) lcv.child).setNullAt(i);
+          } else {
+            ((HeapFloatVector) lcv.child).vector[i] = ((List<Float>) 
valueList).get(i);
+          }
+        }
+        break;
+      case BIGINT:
+        lcv.child = new HeapLongVector(total);
+        ((HeapLongVector) lcv.child).reset();
+        for (int i = 0; i < valueList.size(); i++) {
+          if (valueList.get(i) == null) {
+            ((HeapLongVector) lcv.child).setNullAt(i);
+          } else {
+            ((HeapLongVector) lcv.child).vector[i] = ((List<Long>) 
valueList).get(i);
+          }
+        }
+        break;
+      case DOUBLE:
+        lcv.child = new HeapDoubleVector(total);
+        ((HeapDoubleVector) lcv.child).reset();
+        for (int i = 0; i < valueList.size(); i++) {
+          if (valueList.get(i) == null) {
+            ((HeapDoubleVector) lcv.child).setNullAt(i);
+          } else {
+            ((HeapDoubleVector) lcv.child).vector[i] =
+                ((List<Double>) valueList).get(i);
+          }
+        }
+        break;
+      case TIMESTAMP_WITHOUT_TIME_ZONE:
+      case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+        lcv.child = new HeapTimestampVector(total);
+        ((HeapTimestampVector) lcv.child).reset();
+        for (int i = 0; i < valueList.size(); i++) {
+          if (valueList.get(i) == null) {
+            ((HeapTimestampVector) lcv.child).setNullAt(i);
+          } else {
+            ((HeapTimestampVector) lcv.child)
+                .setTimestamp(i, ((List<TimestampData>) valueList).get(i));
+          }
+        }
+        break;
+      case DECIMAL:
+        PrimitiveType.PrimitiveTypeName primitiveTypeName =
+            descriptor.getPrimitiveType().getPrimitiveTypeName();
+        switch (primitiveTypeName) {
+          case INT32:
+            lcv.child = new ParquetDecimalVector(new HeapIntVector(total));
+            ((HeapIntVector) ((ParquetDecimalVector) 
lcv.child).vector).reset();
+            for (int i = 0; i < valueList.size(); i++) {
+              if (valueList.get(i) == null) {
+                ((HeapIntVector) ((ParquetDecimalVector) lcv.child).vector)
+                    .setNullAt(i);
+              } else {
+                ((HeapIntVector) ((ParquetDecimalVector) lcv.child).vector)
+                    .vector[i] =
+                    ((List<Integer>) valueList).get(i);
+              }
+            }
+            break;
+          case INT64:
+            lcv.child = new ParquetDecimalVector(new HeapLongVector(total));
+            ((HeapLongVector) ((ParquetDecimalVector) 
lcv.child).vector).reset();
+            for (int i = 0; i < valueList.size(); i++) {
+              if (valueList.get(i) == null) {
+                ((HeapLongVector) ((ParquetDecimalVector) lcv.child).vector)
+                    .setNullAt(i);
+              } else {
+                ((HeapLongVector) ((ParquetDecimalVector) lcv.child).vector)
+                    .vector[i] =
+                    ((List<Long>) valueList).get(i);
+              }
+            }
+            break;
+          default:
+            lcv.child = new ParquetDecimalVector(new HeapBytesVector(total));
+            ((HeapBytesVector) ((ParquetDecimalVector) 
lcv.child).vector).reset();
+            for (int i = 0; i < valueList.size(); i++) {
+              byte[] src = ((List<byte[]>) valueList).get(i);
+              if (valueList.get(i) == null) {
+                ((HeapBytesVector) ((ParquetDecimalVector) lcv.child).vector)
+                    .setNullAt(i);
+              } else {
+                ((HeapBytesVector) ((ParquetDecimalVector) lcv.child).vector)
+                    .appendBytes(i, src, 0, src.length);
+              }
+            }
+            break;
+        }
+        break;
+      default:
+        throw new RuntimeException("Unsupported type in the list: " + type);
+    }
+  }
+}
+
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/reader/BaseVectorizedColumnReader.java
 
b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/reader/BaseVectorizedColumnReader.java
new file mode 100644
index 0000000..073c704
--- /dev/null
+++ 
b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/reader/BaseVectorizedColumnReader.java
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.vector.reader;
+
+import org.apache.flink.formats.parquet.vector.reader.ColumnReader;
+import org.apache.flink.table.data.vector.writable.WritableColumnVector;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.parquet.bytes.ByteBufferInputStream;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.page.DataPage;
+import org.apache.parquet.column.page.DataPageV1;
+import org.apache.parquet.column.page.DataPageV2;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridDecoder;
+import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+
+import static org.apache.parquet.column.ValuesType.DEFINITION_LEVEL;
+import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL;
+import static org.apache.parquet.column.ValuesType.VALUES;
+
+/**
+ * Abstract {@link ColumnReader}. Part of the code is referred from Apache Hive and Apache Parquet.
+ *
+ * <p>The class keeps the per-page state (repetition levels, definition levels and the value
+ * reader) of a single Parquet column and offers helpers to load the next data page.
+ */
+public abstract class BaseVectorizedColumnReader implements ColumnReader<WritableColumnVector> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(BaseVectorizedColumnReader.class);
+
+  /**
+   * Timestamp flag handed over to every {@link ParquetDataColumnReader} created by this reader.
+   */
+  protected boolean isUtcTimestamp;
+
+  /**
+   * Total number of values read.
+   */
+  protected long valuesRead;
+
+  /**
+   * Value that indicates the end of the current page. That is, if valuesRead ==
+   * endOfPageValueCount, we are at the end of the page.
+   */
+  protected long endOfPageValueCount;
+
+  /**
+   * The dictionary, if this column has dictionary encoding; {@code null} when the page reader
+   * supplied no dictionary page.
+   */
+  protected final ParquetDataColumnReader dictionary;
+
+  /**
+   * If true, the current page is dictionary encoded.
+   */
+  protected boolean isCurrentPageDictionaryEncoded;
+
+  /**
+   * Maximum definition level for this column.
+   */
+  protected final int maxDefLevel;
+
+  /**
+   * Levels fetched by the latest call of {@link #readRepetitionAndDefinitionLevels()}.
+   */
+  protected int definitionLevel;
+  protected int repetitionLevel;
+
+  /**
+   * Repetition/Definition/Value readers.
+   */
+  protected IntIterator repetitionLevelColumn;
+
+  protected IntIterator definitionLevelColumn;
+  protected ParquetDataColumnReader dataColumn;
+
+  /**
+   * Total values in the current page.
+   */
+  protected int pageValueCount;
+
+  protected final PageReader pageReader;
+  protected final ColumnDescriptor descriptor;
+  protected final Type type;
+  protected final LogicalType logicalType;
+
+  /**
+   * Creates the reader and eagerly decodes the dictionary page when the page reader has one.
+   *
+   * @param descriptor     descriptor of the column to read
+   * @param pageReader     source of the dictionary page and the data pages
+   * @param isUtcTimestamp timestamp flag forwarded to the created data column readers
+   * @param parquetType    Parquet type of the column
+   * @param logicalType    Flink logical type of the column
+   * @throws IOException if the dictionary page can not be decoded
+   */
+  public BaseVectorizedColumnReader(
+      ColumnDescriptor descriptor,
+      PageReader pageReader,
+      boolean isUtcTimestamp,
+      Type parquetType,
+      LogicalType logicalType)
+      throws IOException {
+    this.descriptor = descriptor;
+    this.type = parquetType;
+    this.pageReader = pageReader;
+    this.maxDefLevel = descriptor.getMaxDefinitionLevel();
+    this.isUtcTimestamp = isUtcTimestamp;
+    this.logicalType = logicalType;
+
+    DictionaryPage dictionaryPage = pageReader.readDictionaryPage();
+    if (dictionaryPage != null) {
+      try {
+        this.dictionary =
+            ParquetDataColumnReaderFactory.getDataColumnReaderByTypeOnDictionary(
+                parquetType.asPrimitiveType(),
+                dictionaryPage
+                    .getEncoding()
+                    .initDictionary(descriptor, dictionaryPage),
+                isUtcTimestamp);
+        this.isCurrentPageDictionaryEncoded = true;
+      } catch (IOException e) {
+        throw new IOException("could not decode the dictionary for " + descriptor, e);
+      }
+    } else {
+      this.dictionary = null;
+      this.isCurrentPageDictionaryEncoded = false;
+    }
+  }
+
+  /**
+   * Fetches the next repetition level and definition level and counts one more value as read.
+   */
+  protected void readRepetitionAndDefinitionLevels() {
+    repetitionLevel = repetitionLevelColumn.nextInt();
+    definitionLevel = definitionLevelColumn.nextInt();
+    valuesRead++;
+  }
+
+  /**
+   * Loads the next data page and re-initializes the level and value readers from it. Does
+   * nothing when the page reader has no more pages.
+   */
+  protected void readPage() throws IOException {
+    DataPage page = pageReader.readPage();
+
+    if (page == null) {
+      return;
+    }
+
+    page.accept(
+        new DataPage.Visitor<Void>() {
+          @Override
+          public Void visit(DataPageV1 dataPageV1) {
+            readPageV1(dataPageV1);
+            return null;
+          }
+
+          @Override
+          public Void visit(DataPageV2 dataPageV2) {
+            readPageV2(dataPageV2);
+            return null;
+          }
+        });
+  }
+
+  /**
+   * Creates the value reader matching the page encoding and binds it to the page data.
+   *
+   * @param dataEncoding encoding of the page values
+   * @param in           stream positioned at the first value of the page
+   * @param valueCount   number of values in the page
+   * @throws IOException if the page needs a dictionary that is missing, or the value reader
+   *                     can not be initialized from the page
+   */
+  private void initDataReader(Encoding dataEncoding, ByteBufferInputStream in, int valueCount)
+      throws IOException {
+    this.pageValueCount = valueCount;
+    this.endOfPageValueCount = valuesRead + pageValueCount;
+    if (dataEncoding.usesDictionary()) {
+      this.dataColumn = null;
+      if (dictionary == null) {
+        throw new IOException(
+            "could not read page in col "
+                + descriptor
+                + " as the dictionary was missing for encoding "
+                + dataEncoding);
+      }
+      dataColumn =
+          ParquetDataColumnReaderFactory.getDataColumnReaderByType(
+              type.asPrimitiveType(),
+              dataEncoding.getDictionaryBasedValuesReader(
+                  descriptor, VALUES, dictionary.getDictionary()),
+              isUtcTimestamp);
+      this.isCurrentPageDictionaryEncoded = true;
+    } else {
+      dataColumn =
+          ParquetDataColumnReaderFactory.getDataColumnReaderByType(
+              type.asPrimitiveType(),
+              dataEncoding.getValuesReader(descriptor, VALUES),
+              isUtcTimestamp);
+      this.isCurrentPageDictionaryEncoded = false;
+    }
+
+    try {
+      dataColumn.initFromPage(pageValueCount, in);
+    } catch (IOException e) {
+      throw new IOException("could not read page in col " + descriptor, e);
+    }
+  }
+
+  /**
+   * Initializes the readers from a V1 data page, where the repetition levels, the definition
+   * levels and the values are read one after the other from the same byte stream.
+   */
+  private void readPageV1(DataPageV1 page) {
+    // The level readers below are initialized with the value count of the page, so it has to be
+    // taken from this page before they are used; otherwise they would see the count of the
+    // previous page (or 0 for the first page).
+    this.pageValueCount = page.getValueCount();
+    ValuesReader rlReader = page.getRlEncoding().getValuesReader(descriptor, REPETITION_LEVEL);
+    ValuesReader dlReader = page.getDlEncoding().getValuesReader(descriptor, DEFINITION_LEVEL);
+    this.repetitionLevelColumn = new ValuesReaderIntIterator(rlReader);
+    this.definitionLevelColumn = new ValuesReaderIntIterator(dlReader);
+    try {
+      BytesInput bytes = page.getBytes();
+      LOG.debug("page size {} bytes and {} records", bytes.size(), pageValueCount);
+      ByteBufferInputStream in = bytes.toInputStream();
+      LOG.debug("reading repetition levels at {}", in.position());
+      rlReader.initFromPage(pageValueCount, in);
+      LOG.debug("reading definition levels at {}", in.position());
+      dlReader.initFromPage(pageValueCount, in);
+      LOG.debug("reading data at {}", in.position());
+      initDataReader(page.getValueEncoding(), in, page.getValueCount());
+    } catch (IOException e) {
+      throw new ParquetDecodingException(
+          "could not read page " + page + " in col " + descriptor, e);
+    }
+  }
+
+  /**
+   * Initializes the readers from a V2 data page, which carries the repetition levels, the
+   * definition levels and the values as separate byte sections.
+   */
+  private void readPageV2(DataPageV2 page) {
+    this.pageValueCount = page.getValueCount();
+    this.repetitionLevelColumn =
+        newRLEIterator(descriptor.getMaxRepetitionLevel(), page.getRepetitionLevels());
+    this.definitionLevelColumn =
+        newRLEIterator(descriptor.getMaxDefinitionLevel(), page.getDefinitionLevels());
+    try {
+      LOG.debug("page data size {} bytes and {} records", page.getData().size(), pageValueCount);
+      initDataReader(
+          page.getDataEncoding(), page.getData().toInputStream(), page.getValueCount());
+    } catch (IOException e) {
+      throw new ParquetDecodingException(
+          "could not read page " + page + " in col " + descriptor, e);
+    }
+  }
+
+  /**
+   * Creates the level iterator of a V2 page.
+   *
+   * @param maxLevel maximum level of the column; 0 yields an iterator that always returns 0
+   * @param bytes    encoded levels of the page
+   */
+  private IntIterator newRLEIterator(int maxLevel, BytesInput bytes) {
+    try {
+      if (maxLevel == 0) {
+        return new NullIntIterator();
+      }
+      return new RLEIntIterator(
+          new RunLengthBitPackingHybridDecoder(
+              BytesUtils.getWidthFromMaxInt(maxLevel),
+              new ByteArrayInputStream(bytes.toByteArray())));
+    } catch (IOException e) {
+      throw new ParquetDecodingException(
+          "could not read levels in page for col " + descriptor, e);
+    }
+  }
+
+  /**
+   * Utility classes to abstract over different way to read ints with different encodings.
+   */
+  abstract static class IntIterator {
+    abstract int nextInt();
+  }
+
+  /**
+   * Read ints from {@link ValuesReader}.
+   */
+  protected static final class ValuesReaderIntIterator extends IntIterator {
+    ValuesReader delegate;
+
+    public ValuesReaderIntIterator(ValuesReader delegate) {
+      this.delegate = delegate;
+    }
+
+    @Override
+    int nextInt() {
+      return delegate.readInteger();
+    }
+  }
+
+  /**
+   * Read ints from {@link RunLengthBitPackingHybridDecoder}.
+   */
+  protected static final class RLEIntIterator extends IntIterator {
+    RunLengthBitPackingHybridDecoder delegate;
+
+    public RLEIntIterator(RunLengthBitPackingHybridDecoder delegate) {
+      this.delegate = delegate;
+    }
+
+    @Override
+    int nextInt() {
+      try {
+        return delegate.readInt();
+      } catch (IOException e) {
+        // nextInt() declares no checked exception, so the failure is rethrown unchecked
+        throw new ParquetDecodingException(e);
+      }
+    }
+  }
+
+  /**
+   * Return zero.
+   */
+  protected static final class NullIntIterator extends IntIterator {
+    @Override
+    int nextInt() {
+      return 0;
+    }
+  }
+}
+
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/reader/MapColumnReader.java
 
b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/reader/MapColumnReader.java
new file mode 100644
index 0000000..015a867
--- /dev/null
+++ 
b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/reader/MapColumnReader.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.vector.reader;
+
+import org.apache.hudi.table.format.cow.vector.HeapArrayVector;
+import org.apache.hudi.table.format.cow.vector.HeapMapColumnVector;
+
+import org.apache.flink.formats.parquet.vector.reader.ColumnReader;
+import org.apache.flink.table.data.vector.ColumnVector;
+import org.apache.flink.table.data.vector.writable.WritableColumnVector;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+
+import java.io.IOException;
+
+/**
+ * Map {@link ColumnReader}.
+ *
+ * <p>Keys and values are read as two arrays by dedicated {@link ArrayColumnReader}s; the map
+ * vector then takes its children from them and its layout (offsets, lengths, size and null
+ * flags) from the key array.
+ */
+public class MapColumnReader implements ColumnReader<WritableColumnVector> {
+
+  private final LogicalType logicalType;
+  private final ArrayColumnReader keyReader;
+  private final ArrayColumnReader valueReader;
+
+  public MapColumnReader(
+      ArrayColumnReader keyReader, ArrayColumnReader valueReader, LogicalType logicalType) {
+    this.keyReader = keyReader;
+    this.valueReader = valueReader;
+    this.logicalType = logicalType;
+  }
+
+  @Override
+  public void readToVector(int readNumber, WritableColumnVector vector) throws IOException {
+    readBatch(readNumber, vector);
+  }
+
+  /**
+   * Reads {@code total} maps into the given vector.
+   *
+   * @param total  number of maps to read
+   * @param column target vector, has to be a {@link HeapMapColumnVector}
+   * @throws IOException if reading the keys or the values fails
+   */
+  public void readBatch(int total, ColumnVector column) throws IOException {
+    HeapMapColumnVector target = (HeapMapColumnVector) column;
+    // the value is unused, the cast rejects a reader configured with a non-map logical type
+    MapType mapType = (MapType) logicalType;
+
+    // keys first, then values, each into a fresh array vector
+    HeapArrayVector keys = new HeapArrayVector(total);
+    HeapArrayVector values = new HeapArrayVector(total);
+    keyReader.readToVector(total, keys);
+    valueReader.readToVector(total, values);
+
+    // the children come from both arrays, the layout from the key array only
+    target.setKeys(keys.child);
+    target.setValues(values.child);
+    target.setOffsets(keys.offsets);
+    target.setLengths(keys.lengths);
+    target.setSize(keys.getSize());
+
+    // a null key array marks the whole map as null
+    int row = 0;
+    while (row < keys.getLen()) {
+      if (keys.isNullAt(row)) {
+        target.setNullAt(row);
+      }
+      row++;
+    }
+  }
+}
+
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetDataColumnReader.java
 
b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetDataColumnReader.java
new file mode 100644
index 0000000..e96cf22
--- /dev/null
+++ 
b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetDataColumnReader.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.vector.reader;
+
+import org.apache.flink.table.data.TimestampData;
+import org.apache.parquet.bytes.ByteBufferInputStream;
+import org.apache.parquet.column.Dictionary;
+
+import java.io.IOException;
+
+/**
+ * Typed access to the values of a Parquet column, wrapping the underlying dictionary and non
+ * dictionary encoded page readers.
+ *
+ * <p>A {@code readXxx()} method returns the next value of the page, the {@code readXxx(int id)}
+ * overload returns the dictionary entry with the given id.
+ */
+public interface ParquetDataColumnReader {
+
+  // -------------------------------------------------------------------------
+  //  Page and dictionary handling
+  // -------------------------------------------------------------------------
+
+  /**
+   * Initialize the reader by page data.
+   *
+   * @param valueCount value count
+   * @param in         page data
+   * @throws IOException if the reader can not be initialized from the page data
+   */
+  void initFromPage(int valueCount, ByteBufferInputStream in) throws IOException;
+
+  /**
+   * @return the next Dictionary ID from the page
+   */
+  int readValueDictionaryId();
+
+  /**
+   * @return the underlying dictionary if current reader is dictionary encoded
+   */
+  Dictionary getDictionary();
+
+  /**
+   * @return is data valid
+   */
+  boolean isValid();
+
+  // -------------------------------------------------------------------------
+  //  Boolean and integral values
+  // -------------------------------------------------------------------------
+
+  /**
+   * @return the next Boolean from the page
+   */
+  boolean readBoolean();
+
+  /**
+   * @param id in dictionary
+   * @return the Boolean from the dictionary by id
+   */
+  boolean readBoolean(int id);
+
+  /**
+   * @return the next TinyInt from the page
+   */
+  int readTinyInt();
+
+  /**
+   * @param id in dictionary
+   * @return the tiny int from the dictionary by id
+   */
+  int readTinyInt(int id);
+
+  /**
+   * @return the next SmallInt from the page
+   */
+  int readSmallInt();
+
+  /**
+   * @param id in dictionary
+   * @return the Small Int from the dictionary by id
+   */
+  int readSmallInt(int id);
+
+  /**
+   * @return the next Integer from the page
+   */
+  int readInteger();
+
+  /**
+   * @param id in dictionary
+   * @return the Integer from the dictionary by id
+   */
+  int readInteger(int id);
+
+  /**
+   * @return the next Long from the page
+   */
+  long readLong();
+
+  /**
+   * @param id in dictionary
+   * @return the Long from the dictionary by id
+   */
+  long readLong(int id);
+
+  // -------------------------------------------------------------------------
+  //  Floating point and decimal values
+  // -------------------------------------------------------------------------
+
+  /**
+   * @return the next Float from the page
+   */
+  float readFloat();
+
+  /**
+   * @param id in dictionary
+   * @return the Float from the dictionary by id
+   */
+  float readFloat(int id);
+
+  /**
+   * @return the next Double from the page
+   */
+  double readDouble();
+
+  /**
+   * @param id in dictionary
+   * @return the Double from the dictionary by id
+   */
+  double readDouble(int id);
+
+  /**
+   * @return the next Decimal from the page
+   */
+  byte[] readDecimal();
+
+  /**
+   * @param id in dictionary
+   * @return the Decimal from the dictionary by id
+   */
+  byte[] readDecimal(int id);
+
+  // -------------------------------------------------------------------------
+  //  Character and binary values
+  // -------------------------------------------------------------------------
+
+  /**
+   * @return the next String from the page
+   */
+  byte[] readString();
+
+  /**
+   * @param id in dictionary
+   * @return the String from the dictionary by id
+   */
+  byte[] readString(int id);
+
+  /**
+   * @return the next Varchar from the page
+   */
+  byte[] readVarchar();
+
+  /**
+   * @param id in dictionary
+   * @return the Varchar from the dictionary by id
+   */
+  byte[] readVarchar(int id);
+
+  /**
+   * @return the next Char from the page
+   */
+  byte[] readChar();
+
+  /**
+   * @param id in dictionary
+   * @return the Char from the dictionary by id
+   */
+  byte[] readChar(int id);
+
+  /**
+   * @return the next Bytes from the page
+   */
+  byte[] readBytes();
+
+  /**
+   * @param id in dictionary
+   * @return the Bytes from the dictionary by id
+   */
+  byte[] readBytes(int id);
+
+  // -------------------------------------------------------------------------
+  //  Timestamp values
+  // -------------------------------------------------------------------------
+
+  /**
+   * @return the next TimestampData from the page
+   */
+  TimestampData readTimestamp();
+
+  /**
+   * @param id in dictionary
+   * @return the TimestampData from the dictionary by id
+   */
+  TimestampData readTimestamp(int id);
+}
+
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetDataColumnReaderFactory.java
 
b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetDataColumnReaderFactory.java
new file mode 100644
index 0000000..861d5cb
--- /dev/null
+++ 
b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetDataColumnReaderFactory.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.vector.reader;
+
+import org.apache.flink.table.data.TimestampData;
+import org.apache.parquet.bytes.ByteBufferInputStream;
+import org.apache.parquet.column.Dictionary;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.PrimitiveType;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.sql.Timestamp;
+
+import static 
org.apache.flink.formats.parquet.vector.reader.TimestampColumnReader.JULIAN_EPOCH_OFFSET_DAYS;
+import static 
org.apache.flink.formats.parquet.vector.reader.TimestampColumnReader.MILLIS_IN_DAY;
+import static 
org.apache.flink.formats.parquet.vector.reader.TimestampColumnReader.NANOS_PER_MILLISECOND;
+import static 
org.apache.flink.formats.parquet.vector.reader.TimestampColumnReader.NANOS_PER_SECOND;
+
+/**
+ * Parquet file has self-describing schema which may differ from the user required schema (e.g.
+ * schema evolution). This factory is used to retrieve user required typed data via corresponding
+ * reader which reads the underlying data.
+ */
+public final class ParquetDataColumnReaderFactory {
+
+  private ParquetDataColumnReaderFactory() {
+  }
+
+  // -------------------------------------------------------------------------
+  //  Factory methods
+  // -------------------------------------------------------------------------
+
+  /**
+   * Creates a reader that looks values up in the given dictionary.
+   */
+  public static ParquetDataColumnReader getDataColumnReaderByTypeOnDictionary(
+      PrimitiveType parquetType, Dictionary realReader, boolean isUtcTimestamp) {
+    return getDataColumnReaderByTypeHelper(true, parquetType, realReader, null, isUtcTimestamp);
+  }
+
+  /**
+   * Creates a reader that pulls values from the given page values reader.
+   */
+  public static ParquetDataColumnReader getDataColumnReaderByType(
+      PrimitiveType parquetType, ValuesReader realReader, boolean isUtcTimestamp) {
+    return getDataColumnReaderByTypeHelper(
+        false, parquetType, null, realReader, isUtcTimestamp);
+  }
+
+  /**
+   * Picks the reader implementation: INT96 columns get the timestamp aware reader, every other
+   * primitive type gets the default reader. {@code isDictionary} decides whether the reader is
+   * backed by {@code dictionary} or by {@code valuesReader}.
+   */
+  private static ParquetDataColumnReader getDataColumnReaderByTypeHelper(
+      boolean isDictionary,
+      PrimitiveType parquetType,
+      Dictionary dictionary,
+      ValuesReader valuesReader,
+      boolean isUtcTimestamp) {
+    final boolean isInt96 =
+        parquetType.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT96;
+    if (isDictionary) {
+      if (isInt96) {
+        return new TypesFromInt96PageReader(dictionary, isUtcTimestamp);
+      }
+      return new DefaultParquetDataColumnReader(dictionary);
+    }
+    if (isInt96) {
+      return new TypesFromInt96PageReader(valuesReader, isUtcTimestamp);
+    }
+    return new DefaultParquetDataColumnReader(valuesReader);
+  }
+
+  // -------------------------------------------------------------------------
+  //  INT96 conversion
+  // -------------------------------------------------------------------------
+
+  private static long julianDayToMillis(int julianDay) {
+    return (julianDay - JULIAN_EPOCH_OFFSET_DAYS) * MILLIS_IN_DAY;
+  }
+
+  /**
+   * Builds the timestamp from the two parts of an INT96 value: the julian day and the
+   * nanoseconds elapsed within that day.
+   */
+  private static TimestampData int96ToTimestamp(
+      boolean utcTimestamp, long nanosOfDay, int julianDay) {
+    final long epochMillis = julianDayToMillis(julianDay) + (nanosOfDay / NANOS_PER_MILLISECOND);
+
+    if (!utcTimestamp) {
+      // goes through java.sql.Timestamp, whose nanos field holds the whole fraction of a second
+      Timestamp sqlTimestamp = new Timestamp(epochMillis);
+      sqlTimestamp.setNanos((int) (nanosOfDay % NANOS_PER_SECOND));
+      return TimestampData.fromTimestamp(sqlTimestamp);
+    }
+
+    return TimestampData.fromEpochMillis(
+        epochMillis, (int) (nanosOfDay % NANOS_PER_MILLISECOND));
+  }
+
+  // -------------------------------------------------------------------------
+  //  Reader implementations
+  // -------------------------------------------------------------------------
+
+  /**
+   * Default reader for {@link ParquetDataColumnReader}.
+   *
+   * <p>An instance is backed either by a {@link ValuesReader}, serving the {@code readXxx()}
+   * methods, or by a {@link Dictionary}, serving the {@code readXxx(int id)} methods. The
+   * respective other field is never assigned by the constructors.
+   */
+  public static class DefaultParquetDataColumnReader implements ParquetDataColumnReader {
+    protected ValuesReader valuesReader;
+    protected Dictionary dict;
+
+    // Flag returned by isValid(). This class initializes it to true and never changes it.
+    boolean isValid = true;
+
+    public DefaultParquetDataColumnReader(ValuesReader valuesReader) {
+      this.valuesReader = valuesReader;
+    }
+
+    public DefaultParquetDataColumnReader(Dictionary dict) {
+      this.dict = dict;
+    }
+
+    // ----- page and dictionary handling -----
+
+    @Override
+    public void initFromPage(int i, ByteBufferInputStream in) throws IOException {
+      valuesReader.initFromPage(i, in);
+    }
+
+    @Override
+    public int readValueDictionaryId() {
+      return valuesReader.readValueDictionaryId();
+    }
+
+    @Override
+    public Dictionary getDictionary() {
+      return dict;
+    }
+
+    @Override
+    public boolean isValid() {
+      return isValid;
+    }
+
+    public void skip() {
+      valuesReader.skip();
+    }
+
+    // ----- boolean and integral values -----
+
+    @Override
+    public boolean readBoolean() {
+      return valuesReader.readBoolean();
+    }
+
+    @Override
+    public boolean readBoolean(int id) {
+      return dict.decodeToBoolean(id);
+    }
+
+    @Override
+    public int readTinyInt() {
+      return valuesReader.readInteger();
+    }
+
+    @Override
+    public int readTinyInt(int id) {
+      return dict.decodeToInt(id);
+    }
+
+    @Override
+    public int readSmallInt() {
+      return valuesReader.readInteger();
+    }
+
+    @Override
+    public int readSmallInt(int id) {
+      return dict.decodeToInt(id);
+    }
+
+    @Override
+    public int readInteger() {
+      return valuesReader.readInteger();
+    }
+
+    @Override
+    public int readInteger(int id) {
+      return dict.decodeToInt(id);
+    }
+
+    @Override
+    public long readLong() {
+      return valuesReader.readLong();
+    }
+
+    @Override
+    public long readLong(int id) {
+      return dict.decodeToLong(id);
+    }
+
+    // ----- floating point values -----
+
+    @Override
+    public float readFloat() {
+      return valuesReader.readFloat();
+    }
+
+    @Override
+    public float readFloat(int id) {
+      return dict.decodeToFloat(id);
+    }
+
+    @Override
+    public double readDouble() {
+      return valuesReader.readDouble();
+    }
+
+    @Override
+    public double readDouble(int id) {
+      return dict.decodeToDouble(id);
+    }
+
+    // ----- binary backed values, all returned as the raw bytes of the Parquet binary -----
+
+    @Override
+    public byte[] readString() {
+      return valuesReader.readBytes().getBytesUnsafe();
+    }
+
+    @Override
+    public byte[] readString(int id) {
+      return dict.decodeToBinary(id).getBytesUnsafe();
+    }
+
+    @Override
+    public byte[] readVarchar() {
+      // the bytes are returned as stored, no length limit is applied here
+      return valuesReader.readBytes().getBytesUnsafe();
+    }
+
+    @Override
+    public byte[] readVarchar(int id) {
+      return dict.decodeToBinary(id).getBytesUnsafe();
+    }
+
+    @Override
+    public byte[] readChar() {
+      return valuesReader.readBytes().getBytesUnsafe();
+    }
+
+    @Override
+    public byte[] readChar(int id) {
+      return dict.decodeToBinary(id).getBytesUnsafe();
+    }
+
+    @Override
+    public byte[] readBytes() {
+      return valuesReader.readBytes().getBytesUnsafe();
+    }
+
+    @Override
+    public byte[] readBytes(int id) {
+      return dict.decodeToBinary(id).getBytesUnsafe();
+    }
+
+    @Override
+    public byte[] readDecimal() {
+      return valuesReader.readBytes().getBytesUnsafe();
+    }
+
+    @Override
+    public byte[] readDecimal(int id) {
+      return dict.decodeToBinary(id).getBytesUnsafe();
+    }
+
+    // ----- timestamps, not supported by the default reader -----
+
+    @Override
+    public TimestampData readTimestamp() {
+      throw new RuntimeException("Unsupported operation");
+    }
+
+    @Override
+    public TimestampData readTimestamp(int id) {
+      throw new RuntimeException("Unsupported operation");
+    }
+  }
+
+  /**
+   * The reader who reads from the underlying INT96 Timestamp value.
+   */
+  public static class TypesFromInt96PageReader extends DefaultParquetDataColumnReader {
+    private final boolean isUtcTimestamp;
+
+    public TypesFromInt96PageReader(ValuesReader realReader, boolean isUtcTimestamp) {
+      super(realReader);
+      this.isUtcTimestamp = isUtcTimestamp;
+    }
+
+    public TypesFromInt96PageReader(Dictionary dict, boolean isUtcTimestamp) {
+      super(dict);
+      this.isUtcTimestamp = isUtcTimestamp;
+    }
+
+    @Override
+    public TimestampData readTimestamp() {
+      return convert(valuesReader.readBytes());
+    }
+
+    @Override
+    public TimestampData readTimestamp(int id) {
+      return convert(dict.decodeToBinary(id));
+    }
+
+    /**
+     * Decodes the little endian binary: a long with the nanoseconds of the day followed by an
+     * int with the julian day.
+     */
+    private TimestampData convert(Binary binary) {
+      ByteBuffer littleEndian = binary.toByteBuffer().order(ByteOrder.LITTLE_ENDIAN);
+      final long nanosOfDay = littleEndian.getLong();
+      final int julianDay = littleEndian.getInt();
+      return int96ToTimestamp(isUtcTimestamp, nanosOfDay, julianDay);
+    }
+  }
+}
+
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/reader/RowColumnReader.java
 
b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/reader/RowColumnReader.java
new file mode 100644
index 0000000..39ebb90
--- /dev/null
+++ 
b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/reader/RowColumnReader.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.vector.reader;
+
+import org.apache.hudi.table.format.cow.vector.HeapRowColumnVector;
+
+import org.apache.flink.formats.parquet.vector.reader.ColumnReader;
+import org.apache.flink.table.data.vector.writable.WritableColumnVector;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Row {@link ColumnReader}.
+ *
+ * <p>Every field of the row is read by its own reader into the matching child vector of the
+ * {@link HeapRowColumnVector}; afterwards the null flags of the row vector are derived from the
+ * child vectors.
+ */
+public class RowColumnReader implements ColumnReader<WritableColumnVector> {
+
+  private final List<ColumnReader> fieldReaders;
+
+  /**
+   * @param fieldReaders readers of the row fields, in the order of the child vectors
+   */
+  public RowColumnReader(List<ColumnReader> fieldReaders) {
+    this.fieldReaders = fieldReaders;
+  }
+
+  /**
+   * Reads {@code readNumber} rows into the given vector.
+   *
+   * @param readNumber number of rows to read
+   * @param vector     target vector, has to be a {@link HeapRowColumnVector}
+   * @throws IOException if one of the field readers fails
+   */
+  @Override
+  public void readToVector(int readNumber, WritableColumnVector vector) throws IOException {
+    HeapRowColumnVector rowColumnVector = (HeapRowColumnVector) vector;
+    WritableColumnVector[] vectors = rowColumnVector.vectors;
+    // allFieldsNull[j] stays true only while every field read so far is null at position j;
+    // the flags are tracked here because a null flag set on the row vector is not reverted
+    boolean[] allFieldsNull = new boolean[readNumber];
+    for (int i = 0; i < vectors.length; i++) {
+      fieldReaders.get(i).readToVector(readNumber, vectors[i]);
+
+      for (int j = 0; j < readNumber; j++) {
+        boolean fieldIsNull = vectors[i].isNullAt(j);
+        allFieldsNull[j] = (i == 0) ? fieldIsNull : allFieldsNull[j] && fieldIsNull;
+      }
+    }
+
+    // a row is null only when all of its fields are null; a row whose first field is null but
+    // which has other non-null fields must stay non-null
+    for (int j = 0; j < readNumber; j++) {
+      if (allFieldsNull[j]) {
+        rowColumnVector.setNullAt(j);
+      }
+    }
+  }
+}
diff --git 
a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java 
b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
index 9eef2fe..9e0da3a 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
@@ -1081,6 +1081,62 @@ public class HoodieDataSourceITCase extends 
AbstractTestBase {
     assertRowsEquals(result, expected);
   }
 
+  /**
+   * Creates table {@code t1} with array, map and row columns, inserts
+   * {@link TestSQL#COMPLEX_TYPE_INSERT_T1} and compares the result of
+   * {@code select * from t1} with the expected rows.
+   *
+   * @param operation value set for {@link FlinkOptions#OPERATION}
+   */
+  @ParameterizedTest
+  @ValueSource(strings = {"insert", "upsert", "bulk_insert"})
+  void testParquetComplexTypes(String operation) {
+    final TableEnvironment env = batchTableEnv;
+
+    // Unpartitioned table keyed on f_int, one column per complex type.
+    env.executeSql(
+        sql("t1")
+            .field("f_int int")
+            .field("f_array array<varchar(10)>")
+            .field("f_map map<varchar(20), int>")
+            .field("f_row row(f_row_f0 int, f_row_f1 varchar(10))")
+            .pkField("f_int")
+            .noPartition()
+            .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+            .option(FlinkOptions.OPERATION, operation)
+            .end());
+    execInsertSql(env, TestSQL.COMPLEX_TYPE_INSERT_T1);
+
+    final String expectedRows = "["
+        + "+I[1, [abc1, def1], {abc1=1, def1=3}, +I[1, abc1]], "
+        + "+I[2, [abc2, def2], {def2=3, abc2=1}, +I[2, abc2]], "
+        + "+I[3, [abc3, def3], {def3=3, abc3=1}, +I[3, abc3]]]";
+    List<Row> actualRows = CollectionUtil.iterableToList(
+        () -> env.sqlQuery("select * from t1").execute().collect());
+    assertRowsEquals(actualRows, expectedRows);
+  }
+
+  /**
+   * Creates table {@code t1} whose {@code f_row} column is a row holding an array and a
+   * nested row, inserts {@link TestSQL#COMPLEX_NESTED_ROW_TYPE_INSERT_T1} and compares the
+   * result of {@code select * from t1} with the expected rows.
+   *
+   * @param operation value set for {@link FlinkOptions#OPERATION}
+   */
+  @ParameterizedTest
+  @ValueSource(strings = {"insert", "upsert", "bulk_insert"})
+  void testParquetComplexNestedRowTypes(String operation) {
+    TableEnvironment tableEnv = batchTableEnv;
+
+    // Unpartitioned table keyed on f_int; f_row nests an array and another row.
+    String hoodieTableDDL = sql("t1")
+        .field("f_int int")
+        .field("f_array array<varchar(10)>")
+        .field("f_map map<varchar(20), int>")
+        .field("f_row row(f_nested_array array<varchar(10)>, f_nested_row 
row(f_row_f0 int, f_row_f1 varchar(10)))")
+        .pkField("f_int")
+        .noPartition()
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .option(FlinkOptions.OPERATION, operation)
+        .end();
+    tableEnv.executeSql(hoodieTableDDL);
+
+    execInsertSql(tableEnv, TestSQL.COMPLEX_NESTED_ROW_TYPE_INSERT_T1);
+
+    // Read everything back and compare against the string form of the expected rows.
+    List<Row> result = CollectionUtil.iterableToList(
+        () -> tableEnv.sqlQuery("select * from t1").execute().collect());
+    final String expected = "["
+        + "+I[1, [abc1, def1], {abc1=1, def1=3}, +I[[abc1, def1], +I[1, 
abc1]]], "
+        + "+I[2, [abc2, def2], {def2=3, abc2=1}, +I[[abc2, def2], +I[2, 
abc2]]], "
+        + "+I[3, [abc3, def3], {def3=3, abc3=1}, +I[[abc3, def3], +I[3, 
abc3]]]]";
+    assertRowsEquals(result, expected);
+  }
+
   // -------------------------------------------------------------------------
   //  Utilities
   // -------------------------------------------------------------------------
diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestSQL.java 
b/hudi-flink/src/test/java/org/apache/hudi/utils/TestSQL.java
index 9dc78aa..595d142 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestSQL.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestSQL.java
@@ -51,4 +51,14 @@ public class TestSQL {
       + "('id9','Jane',19,TIMESTAMP '1970-01-01 00:00:06','par3'),\n"
       + "('id10','Ella',38,TIMESTAMP '1970-01-01 00:00:07','par4'),\n"
       + "('id11','Phoebe',52,TIMESTAMP '1970-01-01 00:00:08','par4')";
+
+  /**
+   * Insert statement for table {@code t1}: three rows, each with an int, an array of
+   * strings, a string-to-int map and a flat row of (int, string).
+   */
+  public static final String COMPLEX_TYPE_INSERT_T1 = "insert into t1 values\n"
+      + "(1, array['abc1', 'def1'], map['abc1', 1, 'def1', 3], row(1, 
'abc1')),\n"
+      + "(2, array['abc2', 'def2'], map['abc2', 1, 'def2', 3], row(2, 
'abc2')),\n"
+      + "(3, array['abc3', 'def3'], map['abc3', 1, 'def3', 3], row(3, 
'abc3'))";
+
+  /**
+   * Insert statement for table {@code t1}: three rows, each with an int, an array of
+   * strings, a string-to-int map and a row that itself holds an array and a nested
+   * (int, string) row.
+   */
+  public static final String COMPLEX_NESTED_ROW_TYPE_INSERT_T1 = "insert into 
t1 values\n"
+      + "(1, array['abc1', 'def1'], map['abc1', 1, 'def1', 3], 
row(array['abc1', 'def1'], row(1, 'abc1'))),\n"
+      + "(2, array['abc2', 'def2'], map['abc2', 1, 'def2', 3], 
row(array['abc2', 'def2'], row(2, 'abc2'))),\n"
+      + "(3, array['abc3', 'def3'], map['abc3', 1, 'def3', 3], 
row(array['abc3', 'def3'], row(3, 'abc3')))";
 }

Reply via email to