This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 674c149 [HUDI-3083] Support component data types for flink
bulk_insert (#4470)
674c149 is described below
commit 674c1492348b5b2a93358c9dd51a1adfe6a8ecf2
Author: Ron <[email protected]>
AuthorDate: Thu Dec 30 11:15:54 2021 +0800
[HUDI-3083] Support component data types for flink bulk_insert (#4470)
* [HUDI-3083] Support component data types for flink bulk_insert
* add nested row type test
---
.../storage/row/parquet/ParquetRowDataWriter.java | 326 +++++++++++---
.../row/parquet/ParquetSchemaConverter.java | 41 ++
.../row/parquet/TestParquetSchemaConverter.java | 74 ++++
.../hudi/sink/append/AppendWriteFunction.java | 19 +-
.../format/cow/ParquetColumnarRowSplitReader.java | 25 +-
.../table/format/cow/ParquetDecimalVector.java | 4 +-
.../table/format/cow/ParquetSplitReaderUtil.java | 148 ++++++-
.../table/format/cow/data/ColumnarArrayData.java | 272 ++++++++++++
.../table/format/cow/data/ColumnarMapData.java | 75 ++++
.../table/format/cow/data/ColumnarRowData.java | 232 ++++++++++
.../table/format/cow/vector/HeapArrayVector.java | 71 ++++
.../format/cow/vector/HeapMapColumnVector.java | 79 ++++
.../format/cow/vector/HeapRowColumnVector.java | 45 ++
.../table/format/cow/vector/MapColumnVector.java | 29 ++
.../table/format/cow/vector/RowColumnVector.java | 30 ++
.../format/cow/vector/VectorizedColumnBatch.java | 148 +++++++
.../cow/vector/reader/ArrayColumnReader.java | 473 +++++++++++++++++++++
.../vector/reader/BaseVectorizedColumnReader.java | 313 ++++++++++++++
.../format/cow/vector/reader/MapColumnReader.java | 76 ++++
.../cow/vector/reader/ParquetDataColumnReader.java | 199 +++++++++
.../reader/ParquetDataColumnReaderFactory.java | 304 +++++++++++++
.../format/cow/vector/reader/RowColumnReader.java | 57 +++
.../apache/hudi/table/HoodieDataSourceITCase.java | 56 +++
.../test/java/org/apache/hudi/utils/TestSQL.java | 10 +
24 files changed, 3031 insertions(+), 75 deletions(-)
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetRowDataWriter.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetRowDataWriter.java
index 1c8b988..3d9524e 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetRowDataWriter.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetRowDataWriter.java
@@ -18,19 +18,22 @@
package org.apache.hudi.io.storage.row.parquet;
+import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.DecimalDataUtils;
+import org.apache.flink.table.data.MapData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.util.Preconditions;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.io.api.RecordConsumer;
import org.apache.parquet.schema.GroupType;
-import org.apache.parquet.schema.Type;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
@@ -46,7 +49,8 @@ import static
org.apache.flink.formats.parquet.vector.reader.TimestampColumnRead
/**
* Writes a record to the Parquet API with the expected schema in order to be
written to a file.
*
- * <p>Reference org.apache.flink.formats.parquet.row.ParquetRowDataWriter to
support timestamp of INT64 8 bytes.
+ * <p>Reference {@code
org.apache.flink.formats.parquet.row.ParquetRowDataWriter}
+ * to support timestamp of INT64 8 bytes and complex data types.
*/
public class ParquetRowDataWriter {
@@ -67,7 +71,7 @@ public class ParquetRowDataWriter {
this.filedWriters = new FieldWriter[rowType.getFieldCount()];
this.fieldNames = rowType.getFieldNames().toArray(new String[0]);
for (int i = 0; i < rowType.getFieldCount(); i++) {
- this.filedWriters[i] = createWriter(rowType.getTypeAt(i),
schema.getType(i));
+ this.filedWriters[i] = createWriter(rowType.getTypeAt(i));
}
}
@@ -91,59 +95,75 @@ public class ParquetRowDataWriter {
recordConsumer.endMessage();
}
- private FieldWriter createWriter(LogicalType t, Type type) {
- if (type.isPrimitive()) {
- switch (t.getTypeRoot()) {
- case CHAR:
- case VARCHAR:
- return new StringWriter();
- case BOOLEAN:
- return new BooleanWriter();
- case BINARY:
- case VARBINARY:
- return new BinaryWriter();
- case DECIMAL:
- DecimalType decimalType = (DecimalType) t;
- return createDecimalWriter(decimalType.getPrecision(),
decimalType.getScale());
- case TINYINT:
- return new ByteWriter();
- case SMALLINT:
- return new ShortWriter();
- case DATE:
- case TIME_WITHOUT_TIME_ZONE:
- case INTEGER:
- return new IntWriter();
- case BIGINT:
- return new LongWriter();
- case FLOAT:
- return new FloatWriter();
- case DOUBLE:
- return new DoubleWriter();
- case TIMESTAMP_WITHOUT_TIME_ZONE:
- TimestampType timestampType = (TimestampType) t;
- if (timestampType.getPrecision() == 3) {
- return new Timestamp64Writer();
- } else {
- return new Timestamp96Writer(timestampType.getPrecision());
- }
- case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
- LocalZonedTimestampType localZonedTimestampType =
(LocalZonedTimestampType) t;
- if (localZonedTimestampType.getPrecision() == 3) {
- return new Timestamp64Writer();
- } else {
- return new
Timestamp96Writer(localZonedTimestampType.getPrecision());
- }
- default:
- throw new UnsupportedOperationException("Unsupported type: " + type);
- }
- } else {
- throw new IllegalArgumentException("Unsupported data type: " + t);
+ private FieldWriter createWriter(LogicalType t) {
+ switch (t.getTypeRoot()) {
+ case CHAR:
+ case VARCHAR:
+ return new StringWriter();
+ case BOOLEAN:
+ return new BooleanWriter();
+ case BINARY:
+ case VARBINARY:
+ return new BinaryWriter();
+ case DECIMAL:
+ DecimalType decimalType = (DecimalType) t;
+ return createDecimalWriter(decimalType.getPrecision(),
decimalType.getScale());
+ case TINYINT:
+ return new ByteWriter();
+ case SMALLINT:
+ return new ShortWriter();
+ case DATE:
+ case TIME_WITHOUT_TIME_ZONE:
+ case INTEGER:
+ return new IntWriter();
+ case BIGINT:
+ return new LongWriter();
+ case FLOAT:
+ return new FloatWriter();
+ case DOUBLE:
+ return new DoubleWriter();
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ TimestampType timestampType = (TimestampType) t;
+ if (timestampType.getPrecision() == 3) {
+ return new Timestamp64Writer();
+ } else {
+ return new Timestamp96Writer(timestampType.getPrecision());
+ }
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ LocalZonedTimestampType localZonedTimestampType =
(LocalZonedTimestampType) t;
+ if (localZonedTimestampType.getPrecision() == 3) {
+ return new Timestamp64Writer();
+ } else {
+ return new Timestamp96Writer(localZonedTimestampType.getPrecision());
+ }
+ case ARRAY:
+ ArrayType arrayType = (ArrayType) t;
+ LogicalType elementType = arrayType.getElementType();
+ FieldWriter elementWriter = createWriter(elementType);
+ return new ArrayWriter(elementWriter);
+ case MAP:
+ MapType mapType = (MapType) t;
+ LogicalType keyType = mapType.getKeyType();
+ LogicalType valueType = mapType.getValueType();
+ FieldWriter keyWriter = createWriter(keyType);
+ FieldWriter valueWriter = createWriter(valueType);
+ return new MapWriter(keyWriter, valueWriter);
+ case ROW:
+ RowType rowType = (RowType) t;
+ FieldWriter[] fieldWriters = rowType.getFields().stream()
+
.map(RowType.RowField::getType).map(this::createWriter).toArray(FieldWriter[]::new);
+ String[] fieldNames = rowType.getFields().stream()
+ .map(RowType.RowField::getName).toArray(String[]::new);
+ return new RowWriter(fieldNames, fieldWriters);
+ default:
+ throw new UnsupportedOperationException("Unsupported type: " + t);
}
}
private interface FieldWriter {
-
void write(RowData row, int ordinal);
+
+ void write(ArrayData array, int ordinal);
}
private class BooleanWriter implements FieldWriter {
@@ -152,6 +172,11 @@ public class ParquetRowDataWriter {
public void write(RowData row, int ordinal) {
recordConsumer.addBoolean(row.getBoolean(ordinal));
}
+
+ @Override
+ public void write(ArrayData array, int ordinal) {
+ recordConsumer.addBoolean(array.getBoolean(ordinal));
+ }
}
private class ByteWriter implements FieldWriter {
@@ -160,6 +185,11 @@ public class ParquetRowDataWriter {
public void write(RowData row, int ordinal) {
recordConsumer.addInteger(row.getByte(ordinal));
}
+
+ @Override
+ public void write(ArrayData array, int ordinal) {
+ recordConsumer.addInteger(array.getByte(ordinal));
+ }
}
private class ShortWriter implements FieldWriter {
@@ -168,6 +198,11 @@ public class ParquetRowDataWriter {
public void write(RowData row, int ordinal) {
recordConsumer.addInteger(row.getShort(ordinal));
}
+
+ @Override
+ public void write(ArrayData array, int ordinal) {
+ recordConsumer.addInteger(array.getShort(ordinal));
+ }
}
private class LongWriter implements FieldWriter {
@@ -176,6 +211,11 @@ public class ParquetRowDataWriter {
public void write(RowData row, int ordinal) {
recordConsumer.addLong(row.getLong(ordinal));
}
+
+ @Override
+ public void write(ArrayData array, int ordinal) {
+ recordConsumer.addLong(array.getLong(ordinal));
+ }
}
private class FloatWriter implements FieldWriter {
@@ -184,6 +224,11 @@ public class ParquetRowDataWriter {
public void write(RowData row, int ordinal) {
recordConsumer.addFloat(row.getFloat(ordinal));
}
+
+ @Override
+ public void write(ArrayData array, int ordinal) {
+ recordConsumer.addFloat(array.getFloat(ordinal));
+ }
}
private class DoubleWriter implements FieldWriter {
@@ -192,6 +237,11 @@ public class ParquetRowDataWriter {
public void write(RowData row, int ordinal) {
recordConsumer.addDouble(row.getDouble(ordinal));
}
+
+ @Override
+ public void write(ArrayData array, int ordinal) {
+ recordConsumer.addDouble(array.getDouble(ordinal));
+ }
}
private class StringWriter implements FieldWriter {
@@ -200,6 +250,11 @@ public class ParquetRowDataWriter {
public void write(RowData row, int ordinal) {
recordConsumer.addBinary(Binary.fromReusedByteArray(row.getString(ordinal).toBytes()));
}
+
+ @Override
+ public void write(ArrayData array, int ordinal) {
+
recordConsumer.addBinary(Binary.fromReusedByteArray(array.getString(ordinal).toBytes()));
+ }
}
private class BinaryWriter implements FieldWriter {
@@ -208,6 +263,11 @@ public class ParquetRowDataWriter {
public void write(RowData row, int ordinal) {
recordConsumer.addBinary(Binary.fromReusedByteArray(row.getBinary(ordinal)));
}
+
+ @Override
+ public void write(ArrayData array, int ordinal) {
+
recordConsumer.addBinary(Binary.fromReusedByteArray(array.getBinary(ordinal)));
+ }
}
private class IntWriter implements FieldWriter {
@@ -216,6 +276,11 @@ public class ParquetRowDataWriter {
public void write(RowData row, int ordinal) {
recordConsumer.addInteger(row.getInt(ordinal));
}
+
+ @Override
+ public void write(ArrayData array, int ordinal) {
+ recordConsumer.addInteger(array.getInt(ordinal));
+ }
}
/**
@@ -231,6 +296,11 @@ public class ParquetRowDataWriter {
public void write(RowData row, int ordinal) {
recordConsumer.addLong(timestampToInt64(row.getTimestamp(ordinal, 3)));
}
+
+ @Override
+ public void write(ArrayData array, int ordinal) {
+ recordConsumer.addLong(timestampToInt64(array.getTimestamp(ordinal, 3)));
+ }
}
private long timestampToInt64(TimestampData timestampData) {
@@ -254,6 +324,11 @@ public class ParquetRowDataWriter {
public void write(RowData row, int ordinal) {
recordConsumer.addBinary(timestampToInt96(row.getTimestamp(ordinal,
precision)));
}
+
+ @Override
+ public void write(ArrayData array, int ordinal) {
+ recordConsumer.addBinary(timestampToInt96(array.getTimestamp(ordinal,
precision)));
+ }
}
private Binary timestampToInt96(TimestampData timestampData) {
@@ -304,10 +379,20 @@ public class ParquetRowDataWriter {
@Override
public void write(RowData row, int ordinal) {
long unscaledLong = row.getDecimal(ordinal, precision,
scale).toUnscaledLong();
+ doWrite(unscaledLong);
+ }
+
+ @Override
+ public void write(ArrayData array, int ordinal) {
+ long unscaledLong = array.getDecimal(ordinal, precision,
scale).toUnscaledLong();
+ doWrite(unscaledLong);
+ }
+
+ private void doWrite(long unscaled) {
int i = 0;
int shift = initShift;
while (i < numBytes) {
- decimalBuffer[i] = (byte) (unscaledLong >> shift);
+ decimalBuffer[i] = (byte) (unscaled >> shift);
i += 1;
shift -= 8;
}
@@ -328,6 +413,16 @@ public class ParquetRowDataWriter {
@Override
public void write(RowData row, int ordinal) {
byte[] bytes = row.getDecimal(ordinal, precision,
scale).toUnscaledBytes();
+ doWrite(bytes);
+ }
+
+ @Override
+ public void write(ArrayData array, int ordinal) {
+ byte[] bytes = array.getDecimal(ordinal, precision,
scale).toUnscaledBytes();
+ doWrite(bytes);
+ }
+
+ private void doWrite(byte[] bytes) {
byte[] writtenBytes;
if (bytes.length == numBytes) {
// Avoid copy.
@@ -353,5 +448,132 @@ public class ParquetRowDataWriter {
// 19 <= precision <= 38, writes as FIXED_LEN_BYTE_ARRAY
return new UnscaledBytesWriter();
}
+
+ private class ArrayWriter implements FieldWriter {
+ private final FieldWriter elementWriter;
+
+ private ArrayWriter(FieldWriter elementWriter) {
+ this.elementWriter = elementWriter;
+ }
+
+ @Override
+ public void write(RowData row, int ordinal) {
+ ArrayData arrayData = row.getArray(ordinal);
+ doWrite(arrayData);
+ }
+
+ @Override
+ public void write(ArrayData array, int ordinal) {
+ ArrayData arrayData = array.getArray(ordinal);
+ doWrite(arrayData);
+ }
+
+ private void doWrite(ArrayData arrayData) {
+ recordConsumer.startGroup();
+ if (arrayData.size() > 0) {
+ final String repeatedGroup = "list";
+ final String elementField = "element";
+ recordConsumer.startField(repeatedGroup, 0);
+ for (int i = 0; i < arrayData.size(); i++) {
+ recordConsumer.startGroup();
+ if (!arrayData.isNullAt(i)) {
+ // Only creates the element field if the current array element is
not null.
+ recordConsumer.startField(elementField, 0);
+ elementWriter.write(arrayData, i);
+ recordConsumer.endField(elementField, 0);
+ }
+ recordConsumer.endGroup();
+ }
+ recordConsumer.endField(repeatedGroup, 0);
+ }
+ recordConsumer.endGroup();
+ }
+ }
+
+ private class MapWriter implements FieldWriter {
+ private final FieldWriter keyWriter;
+ private final FieldWriter valueWriter;
+
+ private MapWriter(FieldWriter keyWriter, FieldWriter valueWriter) {
+ this.keyWriter = keyWriter;
+ this.valueWriter = valueWriter;
+ }
+
+ @Override
+ public void write(RowData row, int ordinal) {
+ MapData map = row.getMap(ordinal);
+ doWrite(map);
+ }
+
+ @Override
+ public void write(ArrayData array, int ordinal) {
+ MapData map = array.getMap(ordinal);
+ doWrite(map);
+ }
+
+ private void doWrite(MapData mapData) {
+ ArrayData keyArray = mapData.keyArray();
+ ArrayData valueArray = mapData.valueArray();
+ recordConsumer.startGroup();
+ if (mapData.size() > 0) {
+ final String repeatedGroup = "key_value";
+ final String kField = "key";
+ final String vField = "value";
+ recordConsumer.startField(repeatedGroup, 0);
+ for (int i = 0; i < mapData.size(); i++) {
+ recordConsumer.startGroup();
+ // key
+ recordConsumer.startField(kField, 0);
+ this.keyWriter.write(keyArray, i);
+ recordConsumer.endField(kField, 0);
+ // value
+ if (!valueArray.isNullAt(i)) {
+ // Only creates the "value" field if the value if non-empty
+ recordConsumer.startField(vField, 1);
+ this.valueWriter.write(valueArray, i);
+ recordConsumer.endField(vField, 1);
+ }
+ recordConsumer.endGroup();
+ }
+ recordConsumer.endField(repeatedGroup, 0);
+ }
+ recordConsumer.endGroup();
+ }
+ }
+
+ private class RowWriter implements FieldWriter {
+ private final String[] fieldNames;
+ private final FieldWriter[] fieldWriters;
+
+ private RowWriter(String[] fieldNames, FieldWriter[] fieldWriters) {
+ this.fieldNames = fieldNames;
+ this.fieldWriters = fieldWriters;
+ }
+
+ @Override
+ public void write(RowData row, int ordinal) {
+ RowData nested = row.getRow(ordinal, fieldWriters.length);
+ doWrite(nested);
+ }
+
+ @Override
+ public void write(ArrayData array, int ordinal) {
+ RowData nested = array.getRow(ordinal, fieldWriters.length);
+ doWrite(nested);
+ }
+
+ private void doWrite(RowData row) {
+ recordConsumer.startGroup();
+ for (int i = 0; i < row.getArity(); i++) {
+ if (!row.isNullAt(i)) {
+ String fieldName = fieldNames[i];
+ recordConsumer.startField(fieldName, i);
+ fieldWriters[i].write(row, i);
+ recordConsumer.endField(fieldName, i);
+ }
+ }
+ recordConsumer.endGroup();
+ }
+ }
}
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java
index 80fda29..5da45bf 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java
@@ -25,9 +25,11 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.MapTypeInfo;
import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampType;
@@ -616,6 +618,45 @@ public class ParquetSchemaConverter {
return Types.primitive(PrimitiveType.PrimitiveTypeName.INT96,
repetition)
.named(name);
}
+ case ARRAY:
+ // <list-repetition> group <name> (LIST) {
+ // repeated group list {
+ // <element-repetition> <element-type> element;
+ // }
+ // }
+ ArrayType arrayType = (ArrayType) type;
+ LogicalType elementType = arrayType.getElementType();
+ return Types
+ .buildGroup(repetition).as(OriginalType.LIST)
+ .addField(
+ Types.repeatedGroup()
+ .addField(convertToParquetType("element", elementType,
repetition))
+ .named("list"))
+ .named(name);
+ case MAP:
+ // <map-repetition> group <name> (MAP) {
+ // repeated group key_value {
+ // required <key-type> key;
+ // <value-repetition> <value-type> value;
+ // }
+ // }
+ MapType mapType = (MapType) type;
+ LogicalType keyType = mapType.getKeyType();
+ LogicalType valueType = mapType.getValueType();
+ return Types
+ .buildGroup(repetition).as(OriginalType.MAP)
+ .addField(
+ Types
+ .repeatedGroup()
+ .addField(convertToParquetType("key", keyType, repetition))
+ .addField(convertToParquetType("value", valueType,
repetition))
+ .named("key_value"))
+ .named(name);
+ case ROW:
+ RowType rowType = (RowType) type;
+ Types.GroupBuilder<GroupType> builder = Types.buildGroup(repetition);
+ rowType.getFields().forEach(field ->
builder.addField(convertToParquetType(field.getName(), field.getType(),
repetition)));
+ return builder.named(name);
default:
throw new UnsupportedOperationException("Unsupported type: " + type);
}
diff --git
a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/parquet/TestParquetSchemaConverter.java
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/parquet/TestParquetSchemaConverter.java
new file mode 100644
index 0000000..5305bcc
--- /dev/null
+++
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/parquet/TestParquetSchemaConverter.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage.row.parquet;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.junit.jupiter.api.Test;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+/**
+ * Test cases for {@link ParquetSchemaConverter}.
+ */
+public class TestParquetSchemaConverter {
+ @Test
+ void testConvertComplexTypes() {
+ DataType dataType = DataTypes.ROW(
+ DataTypes.FIELD("f_array",
+ DataTypes.ARRAY(DataTypes.CHAR(10))),
+ DataTypes.FIELD("f_map",
+ DataTypes.MAP(DataTypes.INT(), DataTypes.VARCHAR(20))),
+ DataTypes.FIELD("f_row",
+ DataTypes.ROW(
+ DataTypes.FIELD("f_row_f0", DataTypes.INT()),
+ DataTypes.FIELD("f_row_f1", DataTypes.VARCHAR(10)),
+ DataTypes.FIELD("f_row_f2",
+ DataTypes.ROW(
+ DataTypes.FIELD("f_row_f2_f0", DataTypes.INT()),
+ DataTypes.FIELD("f_row_f2_f1",
DataTypes.VARCHAR(10)))))));
+ org.apache.parquet.schema.MessageType messageType =
+ ParquetSchemaConverter.convertToParquetMessageType("converted",
(RowType) dataType.getLogicalType());
+ assertThat(messageType.getColumns().size(), is(7));
+ final String expected = "message converted {\n"
+ + " optional group f_array (LIST) {\n"
+ + " repeated group list {\n"
+ + " optional binary element (UTF8);\n"
+ + " }\n"
+ + " }\n"
+ + " optional group f_map (MAP) {\n"
+ + " repeated group key_value {\n"
+ + " optional int32 key;\n"
+ + " optional binary value (UTF8);\n"
+ + " }\n"
+ + " }\n"
+ + " optional group f_row {\n"
+ + " optional int32 f_row_f0;\n"
+ + " optional binary f_row_f1 (UTF8);\n"
+ + " optional group f_row_f2 {\n"
+ + " optional int32 f_row_f2_f0;\n"
+ + " optional binary f_row_f2_f1 (UTF8);\n"
+ + " }\n"
+ + " }\n"
+ + "}\n";
+ assertThat(messageType.toString(), is(expected));
+ }
+}
diff --git
a/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java
b/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java
index 090ed29..a72b885 100644
---
a/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java
+++
b/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java
@@ -30,7 +30,10 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Collections;
import java.util.List;
/**
@@ -43,6 +46,7 @@ import java.util.List;
* @see StreamWriteOperatorCoordinator
*/
public class AppendWriteFunction<I> extends AbstractStreamWriteFunction<I> {
+ private static final Logger LOG =
LoggerFactory.getLogger(AppendWriteFunction.class);
private static final long serialVersionUID = 1L;
@@ -113,14 +117,19 @@ public class AppendWriteFunction<I> extends
AbstractStreamWriteFunction<I> {
}
private void flushData(boolean endInput) {
- if (this.writerHelper == null) {
- // does not process any inputs, returns early.
- return;
+ final List<WriteStatus> writeStatus;
+ final String instant;
+ if (this.writerHelper != null) {
+ writeStatus = this.writerHelper.getWriteStatuses(this.taskID);
+ instant = this.writerHelper.getInstantTime();
+ } else {
+ LOG.info("No data to write in subtask [{}] for instant [{}]", taskID,
currentInstant);
+ writeStatus = Collections.emptyList();
+ instant = instantToWrite(false);
}
- final List<WriteStatus> writeStatus =
this.writerHelper.getWriteStatuses(this.taskID);
final WriteMetadataEvent event = WriteMetadataEvent.builder()
.taskID(taskID)
- .instantTime(this.writerHelper.getInstantTime())
+ .instantTime(instant)
.writeStatus(writeStatus)
.lastBatch(true)
.endInput(endInput)
diff --git
a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetColumnarRowSplitReader.java
b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetColumnarRowSplitReader.java
index 64eb1f4..c615283 100644
---
a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetColumnarRowSplitReader.java
+++
b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetColumnarRowSplitReader.java
@@ -18,10 +18,11 @@
package org.apache.hudi.table.format.cow;
+import org.apache.hudi.table.format.cow.data.ColumnarRowData;
+import org.apache.hudi.table.format.cow.vector.VectorizedColumnBatch;
+
import org.apache.flink.formats.parquet.vector.reader.ColumnReader;
-import org.apache.flink.table.data.ColumnarRowData;
import org.apache.flink.table.data.vector.ColumnVector;
-import org.apache.flink.table.data.vector.VectorizedColumnBatch;
import org.apache.flink.table.data.vector.writable.WritableColumnVector;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
@@ -208,11 +209,14 @@ public class ParquetColumnarRowSplitReader implements
Closeable {
private WritableColumnVector[] createWritableVectors() {
WritableColumnVector[] columns = new
WritableColumnVector[requestedTypes.length];
+ List<Type> types = requestedSchema.getFields();
+ List<ColumnDescriptor> descriptors = requestedSchema.getColumns();
for (int i = 0; i < requestedTypes.length; i++) {
columns[i] = createWritableColumnVector(
batchSize,
requestedTypes[i],
- requestedSchema.getColumns().get(i).getPrimitiveType());
+ types.get(i),
+ descriptors);
}
return columns;
}
@@ -236,11 +240,6 @@ public class ParquetColumnarRowSplitReader implements
Closeable {
* Check that the requested schema is supported.
*/
for (int i = 0; i < requestedSchema.getFieldCount(); ++i) {
- Type t = requestedSchema.getFields().get(i);
- if (!t.isPrimitive() || t.isRepetition(Type.Repetition.REPEATED)) {
- throw new UnsupportedOperationException("Complex types not
supported.");
- }
-
String[] colPath = requestedSchema.getPaths().get(i);
if (fileSchema.containsPath(colPath)) {
ColumnDescriptor fd = fileSchema.getColumnDescription(colPath);
@@ -322,14 +321,16 @@ public class ParquetColumnarRowSplitReader implements
Closeable {
throw new IOException("expecting more rows but reached last block. Read "
+ rowsReturned + " out of " + totalRowCount);
}
+ List<Type> types = requestedSchema.getFields();
List<ColumnDescriptor> columns = requestedSchema.getColumns();
- columnReaders = new ColumnReader[columns.size()];
- for (int i = 0; i < columns.size(); ++i) {
+ columnReaders = new ColumnReader[types.size()];
+ for (int i = 0; i < types.size(); ++i) {
columnReaders[i] = createColumnReader(
utcTimestamp,
requestedTypes[i],
- columns.get(i),
- pages.getPageReader(columns.get(i)));
+ types.get(i),
+ columns,
+ pages);
}
totalCountLoadedSoFar += pages.getRowCount();
}
diff --git
a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetDecimalVector.java
b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetDecimalVector.java
index 2749f02..4705b2f 100644
---
a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetDecimalVector.java
+++
b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetDecimalVector.java
@@ -32,9 +32,9 @@ import org.apache.flink.table.data.vector.DecimalColumnVector;
*/
public class ParquetDecimalVector implements DecimalColumnVector {
- private final ColumnVector vector;
+ public final ColumnVector vector;
- ParquetDecimalVector(ColumnVector vector) {
+ public ParquetDecimalVector(ColumnVector vector) {
this.vector = vector;
}
diff --git
a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
index 29c1b20..10a2dcd 100644
---
a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
+++
b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
@@ -18,6 +18,15 @@
package org.apache.hudi.table.format.cow;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.table.format.cow.vector.HeapArrayVector;
+import org.apache.hudi.table.format.cow.vector.HeapMapColumnVector;
+import org.apache.hudi.table.format.cow.vector.HeapRowColumnVector;
+import org.apache.hudi.table.format.cow.vector.VectorizedColumnBatch;
+import org.apache.hudi.table.format.cow.vector.reader.ArrayColumnReader;
+import org.apache.hudi.table.format.cow.vector.reader.MapColumnReader;
+import org.apache.hudi.table.format.cow.vector.reader.RowColumnReader;
+
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.vector.reader.BooleanColumnReader;
import org.apache.flink.formats.parquet.vector.reader.ByteColumnReader;
@@ -32,7 +41,6 @@ import
org.apache.flink.formats.parquet.vector.reader.TimestampColumnReader;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.data.vector.ColumnVector;
-import org.apache.flink.table.data.vector.VectorizedColumnBatch;
import org.apache.flink.table.data.vector.heap.HeapBooleanVector;
import org.apache.flink.table.data.vector.heap.HeapByteVector;
import org.apache.flink.table.data.vector.heap.HeapBytesVector;
@@ -44,16 +52,24 @@ import
org.apache.flink.table.data.vector.heap.HeapShortVector;
import org.apache.flink.table.data.vector.heap.HeapTimestampVector;
import org.apache.flink.table.data.vector.writable.WritableColumnVector;
import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarBinaryType;
import org.apache.flink.util.Preconditions;
import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.ParquetRuntimeException;
import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.InvalidSchemaException;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
import java.io.IOException;
import java.math.BigDecimal;
@@ -61,6 +77,7 @@ import java.nio.charset.StandardCharsets;
import java.sql.Date;
import java.time.LocalDate;
import java.time.LocalDateTime;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -252,11 +269,40 @@ public class ParquetSplitReaderUtil {
}
}
+ private static List<ColumnDescriptor> filterDescriptors(int depth, Type
type, List<ColumnDescriptor> columns) throws ParquetRuntimeException {
+ List<ColumnDescriptor> filtered = new ArrayList<>();
+ for (ColumnDescriptor descriptor : columns) {
+ if (depth >= descriptor.getPath().length) {
+ throw new InvalidSchemaException("Expect depth " + depth + " for
schema: " + descriptor);
+ }
+ if (type.getName().equals(descriptor.getPath()[depth])) {
+ filtered.add(descriptor);
+ }
+ }
+ ValidationUtils.checkState(filtered.size() > 0, "Corrupted Parquet
schema");
+ return filtered;
+ }
+
  /**
   * Creates a {@link ColumnReader} for the field described by {@code fieldType}.
   *
   * <p>Delegates to the recursive overload starting at path depth 0 so that nested
   * ARRAY/MAP/ROW fields can resolve their leaf descriptors from {@code descriptors}.
   *
   * @param utcTimestamp whether timestamps are stored in UTC
   * @param fieldType    the Flink logical type of the field to read
   * @param physicalType the corresponding physical Parquet type
   * @param descriptors  all leaf column descriptors of the read schema
   * @param pages        the page store of the current row group
   * @throws IOException if the underlying page reader cannot be created
   */
  public static ColumnReader createColumnReader(
      boolean utcTimestamp,
      LogicalType fieldType,
      Type physicalType,
      List<ColumnDescriptor> descriptors,
      PageReadStore pages) throws IOException {
    return createColumnReader(utcTimestamp, fieldType, physicalType, descriptors,
        pages, 0);
  }
+
+ private static ColumnReader createColumnReader(
+ boolean utcTimestamp,
+ LogicalType fieldType,
+ Type physicalType,
+ List<ColumnDescriptor> columns,
+ PageReadStore pages,
+ int depth) throws IOException {
+ List<ColumnDescriptor> descriptors = filterDescriptors(depth,
physicalType, columns);
+ ColumnDescriptor descriptor = descriptors.get(0);
+ PageReader pageReader = pages.getPageReader(descriptor);
switch (fieldType.getTypeRoot()) {
case BOOLEAN:
return new BooleanColumnReader(descriptor, pageReader);
@@ -303,6 +349,45 @@ public class ParquetSplitReaderUtil {
default:
throw new AssertionError();
}
+ case ARRAY:
+ return new ArrayColumnReader(
+ descriptor,
+ pageReader,
+ utcTimestamp,
+ descriptor.getPrimitiveType(),
+ fieldType);
+ case MAP:
+ MapType mapType = (MapType) fieldType;
+ ArrayColumnReader keyReader =
+ new ArrayColumnReader(
+ descriptor,
+ pageReader,
+ utcTimestamp,
+ descriptor.getPrimitiveType(),
+ new ArrayType(mapType.getKeyType()));
+ ArrayColumnReader valueReader =
+ new ArrayColumnReader(
+ descriptors.get(1),
+ pages.getPageReader(descriptors.get(1)),
+ utcTimestamp,
+ descriptors.get(1).getPrimitiveType(),
+ new ArrayType(mapType.getValueType()));
+ return new MapColumnReader(keyReader, valueReader, fieldType);
+ case ROW:
+ RowType rowType = (RowType) fieldType;
+ GroupType groupType = physicalType.asGroupType();
+ List<ColumnReader> fieldReaders = new ArrayList<>();
+ for (int i = 0; i < rowType.getFieldCount(); i++) {
+ fieldReaders.add(
+ createColumnReader(
+ utcTimestamp,
+ rowType.getTypeAt(i),
+ groupType.getType(i),
+ descriptors,
+ pages,
+ depth + 1));
+ }
+ return new RowColumnReader(fieldReaders);
default:
throw new UnsupportedOperationException(fieldType + " is not supported
now.");
}
@@ -311,7 +396,19 @@ public class ParquetSplitReaderUtil {
  /**
   * Creates a {@link WritableColumnVector} for the given field.
   *
   * <p>Delegates to the recursive overload starting at path depth 0 so that nested
   * ARRAY/MAP/ROW types can locate their primitive leaf descriptors.
   *
   * @param batchSize    number of rows held by the vector
   * @param fieldType    the Flink logical type of the field
   * @param physicalType the corresponding physical Parquet type
   * @param descriptors  all leaf column descriptors of the read schema
   */
  public static WritableColumnVector createWritableColumnVector(
      int batchSize,
      LogicalType fieldType,
      Type physicalType,
      List<ColumnDescriptor> descriptors) {
    return createWritableColumnVector(batchSize, fieldType, physicalType, descriptors, 0);
  }
+
+ private static WritableColumnVector createWritableColumnVector(
+ int batchSize,
+ LogicalType fieldType,
+ Type physicalType,
+ List<ColumnDescriptor> columns,
+ int depth) {
+ List<ColumnDescriptor> descriptors = filterDescriptors(depth,
physicalType, columns);
+ PrimitiveType primitiveType = descriptors.get(0).getPrimitiveType();
PrimitiveType.PrimitiveTypeName typeName =
primitiveType.getPrimitiveTypeName();
switch (fieldType.getTypeRoot()) {
case BOOLEAN:
@@ -371,6 +468,49 @@ public class ParquetSplitReaderUtil {
&& primitiveType.getOriginalType() == OriginalType.DECIMAL,
"Unexpected type: %s", typeName);
return new HeapBytesVector(batchSize);
+ case ARRAY:
+ ArrayType arrayType = (ArrayType) fieldType;
+ return new HeapArrayVector(
+ batchSize,
+ createWritableColumnVector(
+ batchSize,
+ arrayType.getElementType(),
+ physicalType,
+ descriptors,
+ depth));
+ case MAP:
+ MapType mapType = (MapType) fieldType;
+ GroupType repeatedType =
physicalType.asGroupType().getType(0).asGroupType();
+ // the map column has three level paths.
+ return new HeapMapColumnVector(
+ batchSize,
+ createWritableColumnVector(
+ batchSize,
+ mapType.getKeyType(),
+ repeatedType.getType(0),
+ descriptors,
+ depth + 2),
+ createWritableColumnVector(
+ batchSize,
+ mapType.getValueType(),
+ repeatedType.getType(1),
+ descriptors,
+ depth + 2));
+ case ROW:
+ RowType rowType = (RowType) fieldType;
+ GroupType groupType = physicalType.asGroupType();
+ WritableColumnVector[] columnVectors =
+ new WritableColumnVector[rowType.getFieldCount()];
+ for (int i = 0; i < columnVectors.length; i++) {
+ columnVectors[i] =
+ createWritableColumnVector(
+ batchSize,
+ rowType.getTypeAt(i),
+ groupType.getType(i),
+ descriptors,
+ depth + 1);
+ }
+ return new HeapRowColumnVector(batchSize, columnVectors);
default:
throw new UnsupportedOperationException(fieldType + " is not supported
now.");
}
diff --git
a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/data/ColumnarArrayData.java
b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/data/ColumnarArrayData.java
new file mode 100644
index 0000000..a16a4dd
--- /dev/null
+++
b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/data/ColumnarArrayData.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.data;
+
+import org.apache.hudi.table.format.cow.vector.MapColumnVector;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.data.binary.TypedSetters;
+import org.apache.flink.table.data.vector.ArrayColumnVector;
+import org.apache.flink.table.data.vector.BooleanColumnVector;
+import org.apache.flink.table.data.vector.ByteColumnVector;
+import org.apache.flink.table.data.vector.BytesColumnVector;
+import org.apache.flink.table.data.vector.ColumnVector;
+import org.apache.flink.table.data.vector.DecimalColumnVector;
+import org.apache.flink.table.data.vector.DoubleColumnVector;
+import org.apache.flink.table.data.vector.FloatColumnVector;
+import org.apache.flink.table.data.vector.IntColumnVector;
+import org.apache.flink.table.data.vector.LongColumnVector;
+import org.apache.flink.table.data.vector.RowColumnVector;
+import org.apache.flink.table.data.vector.ShortColumnVector;
+import org.apache.flink.table.data.vector.TimestampColumnVector;
+
+import java.util.Arrays;
+
+/**
+ * Columnar array to support access to vector column data.
+ *
+ * <p>References {@code org.apache.flink.table.data.ColumnarArrayData} to
include FLINK-15390.
+ */
+public final class ColumnarArrayData implements ArrayData, TypedSetters {
+
+ private final ColumnVector data;
+ private final int offset;
+ private final int numElements;
+
+ public ColumnarArrayData(ColumnVector data, int offset, int numElements) {
+ this.data = data;
+ this.offset = offset;
+ this.numElements = numElements;
+ }
+
+ @Override
+ public int size() {
+ return numElements;
+ }
+
+ @Override
+ public boolean isNullAt(int pos) {
+ return data.isNullAt(offset + pos);
+ }
+
+ @Override
+ public void setNullAt(int pos) {
+ throw new UnsupportedOperationException("Not support the operation!");
+ }
+
+ @Override
+ public boolean getBoolean(int pos) {
+ return ((BooleanColumnVector) data).getBoolean(offset + pos);
+ }
+
+ @Override
+ public byte getByte(int pos) {
+ return ((ByteColumnVector) data).getByte(offset + pos);
+ }
+
+ @Override
+ public short getShort(int pos) {
+ return ((ShortColumnVector) data).getShort(offset + pos);
+ }
+
+ @Override
+ public int getInt(int pos) {
+ return ((IntColumnVector) data).getInt(offset + pos);
+ }
+
+ @Override
+ public long getLong(int pos) {
+ return ((LongColumnVector) data).getLong(offset + pos);
+ }
+
+ @Override
+ public float getFloat(int pos) {
+ return ((FloatColumnVector) data).getFloat(offset + pos);
+ }
+
+ @Override
+ public double getDouble(int pos) {
+ return ((DoubleColumnVector) data).getDouble(offset + pos);
+ }
+
+ @Override
+ public StringData getString(int pos) {
+ BytesColumnVector.Bytes byteArray = getByteArray(pos);
+ return StringData.fromBytes(byteArray.data, byteArray.offset,
byteArray.len);
+ }
+
+ @Override
+ public DecimalData getDecimal(int pos, int precision, int scale) {
+ return ((DecimalColumnVector) data).getDecimal(offset + pos, precision,
scale);
+ }
+
+ @Override
+ public TimestampData getTimestamp(int pos, int precision) {
+ return ((TimestampColumnVector) data).getTimestamp(offset + pos,
precision);
+ }
+
+ @Override
+ public <T> RawValueData<T> getRawValue(int pos) {
+ throw new UnsupportedOperationException("RawValueData is not supported.");
+ }
+
+ @Override
+ public byte[] getBinary(int pos) {
+ BytesColumnVector.Bytes byteArray = getByteArray(pos);
+ if (byteArray.len == byteArray.data.length) {
+ return byteArray.data;
+ } else {
+ return Arrays.copyOfRange(byteArray.data, byteArray.offset,
byteArray.len);
+ }
+ }
+
+ @Override
+ public ArrayData getArray(int pos) {
+ return ((ArrayColumnVector) data).getArray(offset + pos);
+ }
+
+ @Override
+ public MapData getMap(int pos) {
+ return ((MapColumnVector) data).getMap(offset + pos);
+ }
+
+ @Override
+ public RowData getRow(int pos, int numFields) {
+ return ((RowColumnVector) data).getRow(offset + pos);
+ }
+
+ @Override
+ public void setBoolean(int pos, boolean value) {
+ throw new UnsupportedOperationException("Not support the operation!");
+ }
+
+ @Override
+ public void setByte(int pos, byte value) {
+ throw new UnsupportedOperationException("Not support the operation!");
+ }
+
+ @Override
+ public void setShort(int pos, short value) {
+ throw new UnsupportedOperationException("Not support the operation!");
+ }
+
+ @Override
+ public void setInt(int pos, int value) {
+ throw new UnsupportedOperationException("Not support the operation!");
+ }
+
+ @Override
+ public void setLong(int pos, long value) {
+ throw new UnsupportedOperationException("Not support the operation!");
+ }
+
+ @Override
+ public void setFloat(int pos, float value) {
+ throw new UnsupportedOperationException("Not support the operation!");
+ }
+
+ @Override
+ public void setDouble(int pos, double value) {
+ throw new UnsupportedOperationException("Not support the operation!");
+ }
+
+ @Override
+ public void setDecimal(int pos, DecimalData value, int precision) {
+ throw new UnsupportedOperationException("Not support the operation!");
+ }
+
+ @Override
+ public void setTimestamp(int pos, TimestampData value, int precision) {
+ throw new UnsupportedOperationException("Not support the operation!");
+ }
+
+ @Override
+ public boolean[] toBooleanArray() {
+ boolean[] res = new boolean[numElements];
+ for (int i = 0; i < numElements; i++) {
+ res[i] = getBoolean(i);
+ }
+ return res;
+ }
+
+ @Override
+ public byte[] toByteArray() {
+ byte[] res = new byte[numElements];
+ for (int i = 0; i < numElements; i++) {
+ res[i] = getByte(i);
+ }
+ return res;
+ }
+
+ @Override
+ public short[] toShortArray() {
+ short[] res = new short[numElements];
+ for (int i = 0; i < numElements; i++) {
+ res[i] = getShort(i);
+ }
+ return res;
+ }
+
+ @Override
+ public int[] toIntArray() {
+ int[] res = new int[numElements];
+ for (int i = 0; i < numElements; i++) {
+ res[i] = getInt(i);
+ }
+ return res;
+ }
+
+ @Override
+ public long[] toLongArray() {
+ long[] res = new long[numElements];
+ for (int i = 0; i < numElements; i++) {
+ res[i] = getLong(i);
+ }
+ return res;
+ }
+
+ @Override
+ public float[] toFloatArray() {
+ float[] res = new float[numElements];
+ for (int i = 0; i < numElements; i++) {
+ res[i] = getFloat(i);
+ }
+ return res;
+ }
+
+ @Override
+ public double[] toDoubleArray() {
+ double[] res = new double[numElements];
+ for (int i = 0; i < numElements; i++) {
+ res[i] = getDouble(i);
+ }
+ return res;
+ }
+
+ private BytesColumnVector.Bytes getByteArray(int pos) {
+ return ((BytesColumnVector) data).getBytes(offset + pos);
+ }
+}
+
diff --git
a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/data/ColumnarMapData.java
b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/data/ColumnarMapData.java
new file mode 100644
index 0000000..9792e87
--- /dev/null
+++
b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/data/ColumnarMapData.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.data;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.ColumnarArrayData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.vector.ColumnVector;
+
+/**
+ * Columnar map to support access to vector column data.
+ *
+ * <p>Referenced from flink 1.14.0 {@code
org.apache.flink.table.data.ColumnarMapData}.
+ */
+public final class ColumnarMapData implements MapData {
+
+ private final ColumnVector keyColumnVector;
+ private final ColumnVector valueColumnVector;
+ private final int offset;
+ private final int size;
+
+ public ColumnarMapData(
+ ColumnVector keyColumnVector,
+ ColumnVector valueColumnVector,
+ int offset,
+ int size) {
+ this.keyColumnVector = keyColumnVector;
+ this.valueColumnVector = valueColumnVector;
+ this.offset = offset;
+ this.size = size;
+ }
+
+ @Override
+ public int size() {
+ return size;
+ }
+
+ @Override
+ public ArrayData keyArray() {
+ return new ColumnarArrayData(keyColumnVector, offset, size);
+ }
+
+ @Override
+ public ArrayData valueArray() {
+ return new ColumnarArrayData(valueColumnVector, offset, size);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ throw new UnsupportedOperationException(
+ "ColumnarMapData do not support equals, please compare fields one by
one!");
+ }
+
+ @Override
+ public int hashCode() {
+ throw new UnsupportedOperationException(
+ "ColumnarMapData do not support hashCode, please hash fields one by
one!");
+ }
+}
diff --git
a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/data/ColumnarRowData.java
b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/data/ColumnarRowData.java
new file mode 100644
index 0000000..ebb4ca2
--- /dev/null
+++
b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/data/ColumnarRowData.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.data;
+
+import org.apache.hudi.table.format.cow.vector.VectorizedColumnBatch;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.data.binary.TypedSetters;
+import org.apache.flink.table.data.vector.BytesColumnVector;
+import org.apache.flink.types.RowKind;
+
/**
 * Columnar row to support access to vector column data.
 * It is a row view in {@link VectorizedColumnBatch}.
 *
 * <p>References {@code org.apache.flink.table.data.ColumnarRowData} to include FLINK-15390.
 *
 * <p>Read-only except for {@link #setRowKind(RowKind)}; all {@code TypedSetters}
 * mutators throw {@link UnsupportedOperationException}.
 */
public final class ColumnarRowData implements RowData, TypedSetters {

  // The only mutable per-row metadata; defaults to INSERT.
  private RowKind rowKind = RowKind.INSERT;
  // The batch this row is a view over; can be swapped to reuse this object.
  private VectorizedColumnBatch vectorizedColumnBatch;
  // Index of this row inside the current batch.
  private int rowId;

  public ColumnarRowData() {
  }

  public ColumnarRowData(VectorizedColumnBatch vectorizedColumnBatch) {
    this(vectorizedColumnBatch, 0);
  }

  public ColumnarRowData(VectorizedColumnBatch vectorizedColumnBatch, int rowId) {
    this.vectorizedColumnBatch = vectorizedColumnBatch;
    this.rowId = rowId;
  }

  /** Re-points this view at a new batch; the row index is reset to 0. */
  public void setVectorizedColumnBatch(VectorizedColumnBatch vectorizedColumnBatch) {
    this.vectorizedColumnBatch = vectorizedColumnBatch;
    this.rowId = 0;
  }

  /** Moves this view to another row of the current batch. */
  public void setRowId(int rowId) {
    this.rowId = rowId;
  }

  @Override
  public RowKind getRowKind() {
    return rowKind;
  }

  @Override
  public void setRowKind(RowKind kind) {
    this.rowKind = kind;
  }

  @Override
  public int getArity() {
    return vectorizedColumnBatch.getArity();
  }

  @Override
  public boolean isNullAt(int pos) {
    return vectorizedColumnBatch.isNullAt(rowId, pos);
  }

  @Override
  public boolean getBoolean(int pos) {
    return vectorizedColumnBatch.getBoolean(rowId, pos);
  }

  @Override
  public byte getByte(int pos) {
    return vectorizedColumnBatch.getByte(rowId, pos);
  }

  @Override
  public short getShort(int pos) {
    return vectorizedColumnBatch.getShort(rowId, pos);
  }

  @Override
  public int getInt(int pos) {
    return vectorizedColumnBatch.getInt(rowId, pos);
  }

  @Override
  public long getLong(int pos) {
    return vectorizedColumnBatch.getLong(rowId, pos);
  }

  @Override
  public float getFloat(int pos) {
    return vectorizedColumnBatch.getFloat(rowId, pos);
  }

  @Override
  public double getDouble(int pos) {
    return vectorizedColumnBatch.getDouble(rowId, pos);
  }

  @Override
  public StringData getString(int pos) {
    BytesColumnVector.Bytes byteArray = vectorizedColumnBatch.getByteArray(rowId, pos);
    return StringData.fromBytes(byteArray.data, byteArray.offset, byteArray.len);
  }

  @Override
  public DecimalData getDecimal(int pos, int precision, int scale) {
    return vectorizedColumnBatch.getDecimal(rowId, pos, precision, scale);
  }

  @Override
  public TimestampData getTimestamp(int pos, int precision) {
    return vectorizedColumnBatch.getTimestamp(rowId, pos, precision);
  }

  @Override
  public <T> RawValueData<T> getRawValue(int pos) {
    throw new UnsupportedOperationException("RawValueData is not supported.");
  }

  @Override
  public byte[] getBinary(int pos) {
    BytesColumnVector.Bytes byteArray = vectorizedColumnBatch.getByteArray(rowId, pos);
    if (byteArray.len == byteArray.data.length) {
      // Backing array holds exactly this value; return without copying.
      return byteArray.data;
    } else {
      // Copy out the [offset, offset + len) slice.
      byte[] ret = new byte[byteArray.len];
      System.arraycopy(byteArray.data, byteArray.offset, ret, 0, byteArray.len);
      return ret;
    }
  }

  @Override
  public RowData getRow(int pos, int numFields) {
    return vectorizedColumnBatch.getRow(rowId, pos);
  }

  @Override
  public ArrayData getArray(int pos) {
    return vectorizedColumnBatch.getArray(rowId, pos);
  }

  @Override
  public MapData getMap(int pos) {
    return vectorizedColumnBatch.getMap(rowId, pos);
  }

  @Override
  public void setNullAt(int pos) {
    throw new UnsupportedOperationException("Not support the operation!");
  }

  @Override
  public void setBoolean(int pos, boolean value) {
    throw new UnsupportedOperationException("Not support the operation!");
  }

  @Override
  public void setByte(int pos, byte value) {
    throw new UnsupportedOperationException("Not support the operation!");
  }

  @Override
  public void setShort(int pos, short value) {
    throw new UnsupportedOperationException("Not support the operation!");
  }

  @Override
  public void setInt(int pos, int value) {
    throw new UnsupportedOperationException("Not support the operation!");
  }

  @Override
  public void setLong(int pos, long value) {
    throw new UnsupportedOperationException("Not support the operation!");
  }

  @Override
  public void setFloat(int pos, float value) {
    throw new UnsupportedOperationException("Not support the operation!");
  }

  @Override
  public void setDouble(int pos, double value) {
    throw new UnsupportedOperationException("Not support the operation!");
  }

  @Override
  public void setDecimal(int pos, DecimalData value, int precision) {
    throw new UnsupportedOperationException("Not support the operation!");
  }

  @Override
  public void setTimestamp(int pos, TimestampData value, int precision) {
    throw new UnsupportedOperationException("Not support the operation!");
  }

  @Override
  public boolean equals(Object o) {
    throw new UnsupportedOperationException(
        "ColumnarRowData do not support equals, please compare fields one by one!");
  }

  @Override
  public int hashCode() {
    throw new UnsupportedOperationException(
        "ColumnarRowData do not support hashCode, please hash fields one by one!");
  }
}
+
diff --git
a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/HeapArrayVector.java
b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/HeapArrayVector.java
new file mode 100644
index 0000000..f4c15b6
--- /dev/null
+++
b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/HeapArrayVector.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.vector;
+
+import org.apache.hudi.table.format.cow.data.ColumnarArrayData;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.vector.ArrayColumnVector;
+import org.apache.flink.table.data.vector.ColumnVector;
+import org.apache.flink.table.data.vector.heap.AbstractHeapVector;
+import org.apache.flink.table.data.vector.writable.WritableColumnVector;
+
+/**
+ * This class represents a nullable heap array column vector.
+ */
+public class HeapArrayVector extends AbstractHeapVector
+ implements WritableColumnVector, ArrayColumnVector {
+
+ public long[] offsets;
+ public long[] lengths;
+ public ColumnVector child;
+ private int size;
+
+ public HeapArrayVector(int len) {
+ super(len);
+ offsets = new long[len];
+ lengths = new long[len];
+ }
+
+ public HeapArrayVector(int len, ColumnVector vector) {
+ super(len);
+ offsets = new long[len];
+ lengths = new long[len];
+ this.child = vector;
+ }
+
+ public int getSize() {
+ return size;
+ }
+
+ public void setSize(int size) {
+ this.size = size;
+ }
+
+ public int getLen() {
+ return this.isNull.length;
+ }
+
+ @Override
+ public ArrayData getArray(int i) {
+ long offset = offsets[i];
+ long length = lengths[i];
+ return new ColumnarArrayData(child, (int) offset, (int) length);
+ }
+}
diff --git
a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/HeapMapColumnVector.java
b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/HeapMapColumnVector.java
new file mode 100644
index 0000000..f05a2e7
--- /dev/null
+++
b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/HeapMapColumnVector.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.vector;
+
+import org.apache.hudi.table.format.cow.data.ColumnarMapData;
+
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.vector.ColumnVector;
+import org.apache.flink.table.data.vector.heap.AbstractHeapVector;
+import org.apache.flink.table.data.vector.writable.WritableColumnVector;
+
+/**
+ * This class represents a nullable heap map column vector.
+ */
+public class HeapMapColumnVector extends AbstractHeapVector
+ implements WritableColumnVector, MapColumnVector {
+
+ private long[] offsets;
+ private long[] lengths;
+ private int size;
+ private ColumnVector keys;
+ private ColumnVector values;
+
+ public HeapMapColumnVector(int len, ColumnVector keys, ColumnVector values) {
+ super(len);
+ size = 0;
+ offsets = new long[len];
+ lengths = new long[len];
+ this.keys = keys;
+ this.values = values;
+ }
+
+ public void setOffsets(long[] offsets) {
+ this.offsets = offsets;
+ }
+
+ public void setLengths(long[] lengths) {
+ this.lengths = lengths;
+ }
+
+ public void setKeys(ColumnVector keys) {
+ this.keys = keys;
+ }
+
+ public void setValues(ColumnVector values) {
+ this.values = values;
+ }
+
+ public int getSize() {
+ return size;
+ }
+
+ public void setSize(int size) {
+ this.size = size;
+ }
+
+ @Override
+ public MapData getMap(int i) {
+ long offset = offsets[i];
+ long length = lengths[i];
+ return new ColumnarMapData(keys, values, (int) offset, (int) length);
+ }
+}
diff --git
a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/HeapRowColumnVector.java
b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/HeapRowColumnVector.java
new file mode 100644
index 0000000..ad05a61
--- /dev/null
+++
b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/HeapRowColumnVector.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.vector;
+
+import org.apache.hudi.table.format.cow.data.ColumnarRowData;
+
+import org.apache.flink.table.data.vector.heap.AbstractHeapVector;
+import org.apache.flink.table.data.vector.writable.WritableColumnVector;
+
+/**
+ * This class represents a nullable heap row column vector.
+ */
+public class HeapRowColumnVector extends AbstractHeapVector
+ implements WritableColumnVector, RowColumnVector {
+
+ public WritableColumnVector[] vectors;
+
+ public HeapRowColumnVector(int len, WritableColumnVector... vectors) {
+ super(len);
+ this.vectors = vectors;
+ }
+
+ @Override
+ public ColumnarRowData getRow(int i) {
+ ColumnarRowData columnarRowData = new ColumnarRowData(new
VectorizedColumnBatch(vectors));
+ columnarRowData.setRowId(i);
+ return columnarRowData;
+ }
+}
diff --git
a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/MapColumnVector.java
b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/MapColumnVector.java
new file mode 100644
index 0000000..38424da
--- /dev/null
+++
b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/MapColumnVector.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.vector;
+
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.vector.ColumnVector;
+
/**
 * Map column vector.
 */
public interface MapColumnVector extends ColumnVector {

  /**
   * Returns the map value at row index {@code i}.
   */
  MapData getMap(int i);
}
diff --git
a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/RowColumnVector.java
b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/RowColumnVector.java
new file mode 100644
index 0000000..293af7b
--- /dev/null
+++
b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/RowColumnVector.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.vector;
+
+import org.apache.hudi.table.format.cow.data.ColumnarRowData;
+
+import org.apache.flink.table.data.vector.ColumnVector;
+
/**
 * Row column vector.
 *
 * <p>Column vector whose elements are nested rows; {@link #getRow(int)} exposes
 * the element at a given position as a {@link ColumnarRowData} view.
 */
public interface RowColumnVector extends ColumnVector {

  /**
   * Returns the nested row stored at position {@code i}.
   *
   * @param i row id inside the batch
   * @return a columnar row view positioned at {@code i}
   */
  ColumnarRowData getRow(int i);
}
\ No newline at end of file
diff --git
a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/VectorizedColumnBatch.java
b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/VectorizedColumnBatch.java
new file mode 100644
index 0000000..9eee55d
--- /dev/null
+++
b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/VectorizedColumnBatch.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.vector;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.data.vector.ArrayColumnVector;
+import org.apache.flink.table.data.vector.BooleanColumnVector;
+import org.apache.flink.table.data.vector.ByteColumnVector;
+import org.apache.flink.table.data.vector.BytesColumnVector;
+import org.apache.flink.table.data.vector.ColumnVector;
+import org.apache.flink.table.data.vector.DecimalColumnVector;
+import org.apache.flink.table.data.vector.DoubleColumnVector;
+import org.apache.flink.table.data.vector.FloatColumnVector;
+import org.apache.flink.table.data.vector.IntColumnVector;
+import org.apache.flink.table.data.vector.LongColumnVector;
+import org.apache.flink.table.data.vector.ShortColumnVector;
+import org.apache.flink.table.data.vector.TimestampColumnVector;
+
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * A VectorizedColumnBatch is a set of rows, organized with each column as a
vector. It is the unit
+ * of query execution, organized to minimize the cost per row.
+ *
+ * <p>{@code VectorizedColumnBatch}s are influenced by Apache Hive
VectorizedRowBatch.
+ *
+ * <p>References {@code
org.apache.flink.table.data.vector.VectorizedColumnBatch} to include
FLINK-15390.
+ */
+public class VectorizedColumnBatch implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * This number is carefully chosen to minimize overhead and typically allows
one
+ * VectorizedColumnBatch to fit in cache.
+ */
+ public static final int DEFAULT_SIZE = 2048;
+
+ private int numRows;
+ public final ColumnVector[] columns;
+
+ public VectorizedColumnBatch(ColumnVector[] vectors) {
+ this.columns = vectors;
+ }
+
+ public void setNumRows(int numRows) {
+ this.numRows = numRows;
+ }
+
+ public int getNumRows() {
+ return numRows;
+ }
+
+ public int getArity() {
+ return columns.length;
+ }
+
+ public boolean isNullAt(int rowId, int colId) {
+ return columns[colId].isNullAt(rowId);
+ }
+
+ public boolean getBoolean(int rowId, int colId) {
+ return ((BooleanColumnVector) columns[colId]).getBoolean(rowId);
+ }
+
+ public byte getByte(int rowId, int colId) {
+ return ((ByteColumnVector) columns[colId]).getByte(rowId);
+ }
+
+ public short getShort(int rowId, int colId) {
+ return ((ShortColumnVector) columns[colId]).getShort(rowId);
+ }
+
+ public int getInt(int rowId, int colId) {
+ return ((IntColumnVector) columns[colId]).getInt(rowId);
+ }
+
+ public long getLong(int rowId, int colId) {
+ return ((LongColumnVector) columns[colId]).getLong(rowId);
+ }
+
+ public float getFloat(int rowId, int colId) {
+ return ((FloatColumnVector) columns[colId]).getFloat(rowId);
+ }
+
+ public double getDouble(int rowId, int colId) {
+ return ((DoubleColumnVector) columns[colId]).getDouble(rowId);
+ }
+
+ public BytesColumnVector.Bytes getByteArray(int rowId, int colId) {
+ return ((BytesColumnVector) columns[colId]).getBytes(rowId);
+ }
+
+ private byte[] getBytes(int rowId, int colId) {
+ BytesColumnVector.Bytes byteArray = getByteArray(rowId, colId);
+ if (byteArray.len == byteArray.data.length) {
+ return byteArray.data;
+ } else {
+ return byteArray.getBytes();
+ }
+ }
+
+ public String getString(int rowId, int colId) {
+ BytesColumnVector.Bytes byteArray = getByteArray(rowId, colId);
+ return new String(byteArray.data, byteArray.offset, byteArray.len,
StandardCharsets.UTF_8);
+ }
+
+ public DecimalData getDecimal(int rowId, int colId, int precision, int
scale) {
+ return ((DecimalColumnVector) (columns[colId])).getDecimal(rowId,
precision, scale);
+ }
+
+ public TimestampData getTimestamp(int rowId, int colId, int precision) {
+ return ((TimestampColumnVector) (columns[colId])).getTimestamp(rowId,
precision);
+ }
+
+ public ArrayData getArray(int rowId, int colId) {
+ return ((ArrayColumnVector) columns[colId]).getArray(rowId);
+ }
+
+ public RowData getRow(int rowId, int colId) {
+ return ((RowColumnVector) columns[colId]).getRow(rowId);
+ }
+
+ public MapData getMap(int rowId, int colId) {
+ return ((MapColumnVector) columns[colId]).getMap(rowId);
+ }
+}
+
diff --git
a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ArrayColumnReader.java
b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ArrayColumnReader.java
new file mode 100644
index 0000000..256d4c1
--- /dev/null
+++
b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ArrayColumnReader.java
@@ -0,0 +1,473 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.vector.reader;
+
+import org.apache.hudi.table.format.cow.ParquetDecimalVector;
+import org.apache.hudi.table.format.cow.vector.HeapArrayVector;
+
+import org.apache.flink.formats.parquet.vector.reader.ColumnReader;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.data.vector.VectorizedColumnBatch;
+import org.apache.flink.table.data.vector.heap.HeapBooleanVector;
+import org.apache.flink.table.data.vector.heap.HeapByteVector;
+import org.apache.flink.table.data.vector.heap.HeapBytesVector;
+import org.apache.flink.table.data.vector.heap.HeapDoubleVector;
+import org.apache.flink.table.data.vector.heap.HeapFloatVector;
+import org.apache.flink.table.data.vector.heap.HeapIntVector;
+import org.apache.flink.table.data.vector.heap.HeapLongVector;
+import org.apache.flink.table.data.vector.heap.HeapShortVector;
+import org.apache.flink.table.data.vector.heap.HeapTimestampVector;
+import org.apache.flink.table.data.vector.writable.WritableColumnVector;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
/**
 * Array {@link ColumnReader}.
 *
 * <p>Reads a repeated (LIST) parquet column into a {@link HeapArrayVector}. Because the size of
 * the child vector cannot be known up front, values are first buffered one-by-one into a Java
 * {@code List} while walking repetition/definition levels, then copied into a typed child vector
 * in {@link #fillColumnVector}.
 */
public class ArrayColumnReader extends BaseVectorizedColumnReader {

  // The value read in last time. Holds either a decoded primitive, a dictionary id (Integer)
  // when the current page is dictionary encoded, or null for a null element.
  private Object lastValue;

  // flag to indicate if there is no data in parquet data page
  private boolean eof = false;

  // flag to indicate if it's the first time to read parquet data page with this instance
  boolean isFirstRow = true;

  public ArrayColumnReader(
      ColumnDescriptor descriptor,
      PageReader pageReader,
      boolean isUtcTimestamp,
      Type type,
      LogicalType logicalType)
      throws IOException {
    super(descriptor, pageReader, isUtcTimestamp, type, logicalType);
  }

  @Override
  public void readToVector(int readNumber, WritableColumnVector vector) throws IOException {
    HeapArrayVector lcv = (HeapArrayVector) vector;
    // before readBatch, initial the size of offsets & lengths as the default value,
    // the actual size will be assigned in setChildrenInfo() after reading complete.
    lcv.offsets = new long[VectorizedColumnBatch.DEFAULT_SIZE];
    lcv.lengths = new long[VectorizedColumnBatch.DEFAULT_SIZE];
    // Because the length of ListColumnVector.child can't be known now,
    // the valueList will save all data for ListColumnVector temporary.
    List<Object> valueList = new ArrayList<>();

    LogicalType category = ((ArrayType) logicalType).getElementType();

    // read the first row in the parquet data page; this happens only once for this
    // instance (collectDataFromParquetPage relies on lastValue already being primed)
    if (isFirstRow) {
      if (!fetchNextValue(category)) {
        return;
      }
      isFirstRow = false;
    }

    int index = collectDataFromParquetPage(readNumber, lcv, valueList, category);

    // Convert valueList to array for the ListColumnVector.child
    fillColumnVector(category, lcv, valueList, index);
  }

  /**
   * Reads a single value from parquet page, puts it into lastValue. Returns a boolean indicating
   * if there is more values to read (true).
   *
   * @param category element type of the array, used to pick the typed read method
   * @return true if a value (or a null placeholder) was consumed; false once the column is
   *     exhausted (also sets {@link #eof})
   * @throws IOException on parquet page decoding failure
   */
  private boolean fetchNextValue(LogicalType category) throws IOException {
    int left = readPageIfNeed();
    if (left > 0) {
      // get the values of repetition and definitionLevel
      readRepetitionAndDefinitionLevels();
      // read the data if it isn't null
      if (definitionLevel == maxDefLevel) {
        if (isCurrentPageDictionaryEncoded) {
          // keep the raw dictionary id; it is decoded later in dictionaryDecodeValue()
          lastValue = dataColumn.readValueDictionaryId();
        } else {
          lastValue = readPrimitiveTypedRow(category);
        }
      } else {
        lastValue = null;
      }
      return true;
    } else {
      eof = true;
      return false;
    }
  }

  /**
   * Returns the number of values left in the current page, loading the next page first if the
   * current one is fully consumed.
   */
  private int readPageIfNeed() throws IOException {
    // Compute the number of values we want to read in this page.
    int leftInPage = (int) (endOfPageValueCount - valuesRead);
    if (leftInPage == 0) {
      // no data left in current page, load data from new page
      readPage();
      leftInPage = (int) (endOfPageValueCount - valuesRead);
    }
    return leftInPage;
  }

  // Need to be in consistent with that VectorizedPrimitiveColumnReader#readBatchHelper
  // TODO Reduce the duplicated code
  private Object readPrimitiveTypedRow(LogicalType category) {
    switch (category.getTypeRoot()) {
      case CHAR:
      case VARCHAR:
      case BINARY:
      case VARBINARY:
        return dataColumn.readString();
      case BOOLEAN:
        return dataColumn.readBoolean();
      case TIME_WITHOUT_TIME_ZONE:
      case DATE:
      case INTEGER:
        return dataColumn.readInteger();
      case TINYINT:
        return dataColumn.readTinyInt();
      case SMALLINT:
        return dataColumn.readSmallInt();
      case BIGINT:
        return dataColumn.readLong();
      case FLOAT:
        return dataColumn.readFloat();
      case DOUBLE:
        return dataColumn.readDouble();
      case DECIMAL:
        // decimal physical representation depends on the parquet primitive type
        switch (descriptor.getPrimitiveType().getPrimitiveTypeName()) {
          case INT32:
            return dataColumn.readInteger();
          case INT64:
            return dataColumn.readLong();
          case BINARY:
          case FIXED_LEN_BYTE_ARRAY:
            return dataColumn.readString();
          default:
            throw new AssertionError();
        }
      case TIMESTAMP_WITHOUT_TIME_ZONE:
      case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
        return dataColumn.readTimestamp();
      default:
        throw new RuntimeException("Unsupported type in the list: " + type);
    }
  }

  /**
   * Decodes a buffered dictionary id into the actual value for the given element type; returns
   * null for null placeholders.
   */
  private Object dictionaryDecodeValue(LogicalType category, Integer dictionaryValue) {
    if (dictionaryValue == null) {
      return null;
    }

    switch (category.getTypeRoot()) {
      case CHAR:
      case VARCHAR:
      case BINARY:
      case VARBINARY:
        return dictionary.readString(dictionaryValue);
      case DATE:
      case TIME_WITHOUT_TIME_ZONE:
      case INTEGER:
        return dictionary.readInteger(dictionaryValue);
      case BOOLEAN:
        // NOTE(review): this yields an Integer (1/0) while fillColumnVector's BOOLEAN branch
        // casts the value list to List<Boolean> — confirm dictionary-encoded boolean columns
        // are handled correctly upstream.
        return dictionary.readBoolean(dictionaryValue) ? 1 : 0;
      case DOUBLE:
        return dictionary.readDouble(dictionaryValue);
      case FLOAT:
        return dictionary.readFloat(dictionaryValue);
      case TINYINT:
        return dictionary.readTinyInt(dictionaryValue);
      case SMALLINT:
        return dictionary.readSmallInt(dictionaryValue);
      case BIGINT:
        return dictionary.readLong(dictionaryValue);
      case DECIMAL:
        switch (descriptor.getPrimitiveType().getPrimitiveTypeName()) {
          case INT32:
            return dictionary.readInteger(dictionaryValue);
          case INT64:
            return dictionary.readLong(dictionaryValue);
          case FIXED_LEN_BYTE_ARRAY:
          case BINARY:
            return dictionary.readString(dictionaryValue);
          default:
            throw new AssertionError();
        }
      case TIMESTAMP_WITHOUT_TIME_ZONE:
      case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
        return dictionary.readTimestamp(dictionaryValue);
      default:
        throw new RuntimeException("Unsupported type in the list: " + type);
    }
  }

  /**
   * Collects data from a parquet page and returns the final row index where it stopped. The
   * returned index can be equal to or less than total.
   *
   * @param total maximum number of rows to collect
   * @param lcv column vector to do initial setup in data collection time
   * @param valueList collection of values that will be fed into the vector later
   * @param category element type of the array
   * @return number of list rows collected (the child-element count is valueList.size())
   * @throws IOException on parquet page decoding failure
   */
  private int collectDataFromParquetPage(
      int total, HeapArrayVector lcv, List<Object> valueList, LogicalType category)
      throws IOException {
    int index = 0;
    /*
     * Here is a nested loop for collecting all values from a parquet page.
     * A column of array type can be considered as a list of lists, so the two loops are as below:
     * 1. The outer loop iterates on rows (index is a row index, so points to a row in the batch), e.g.:
     * [0, 2, 3] <- index: 0
     * [NULL, 3, 4] <- index: 1
     *
     * 2. The inner loop iterates on values within a row (sets all data from parquet data page
     * for an element in ListColumnVector), so fetchNextValue returns values one-by-one:
     * 0, 2, 3, NULL, 3, 4
     *
     * As described below, the repetition level (repetitionLevel != 0)
     * can be used to decide when we'll start to read values for the next list.
     */
    while (!eof && index < total) {
      // add element to ListColumnVector one by one
      lcv.offsets[index] = valueList.size();
      /*
       * Let's collect all values for a single list.
       * Repetition level = 0 means that a new list started there in the parquet page,
       * in that case, let's exit from the loop, and start to collect value for a new list.
       */
      do {
        /*
         * Definition level = 0 when a NULL value was returned instead of a list
         * (this is not the same as a NULL value in of a list).
         */
        if (definitionLevel == 0) {
          lcv.setNullAt(index);
        }
        valueList.add(
            isCurrentPageDictionaryEncoded
                ? dictionaryDecodeValue(category, (Integer) lastValue)
                : lastValue);
      } while (fetchNextValue(category) && (repetitionLevel != 0));

      lcv.lengths[index] = valueList.size() - lcv.offsets[index];
      index++;
    }
    return index;
  }

  /**
   * The lengths & offsets will be initialized as default size (1024), it should be set to the
   * actual size according to the element number.
   */
  private void setChildrenInfo(HeapArrayVector lcv, int itemNum, int elementNum) {
    lcv.setSize(itemNum);
    long[] lcvLength = new long[elementNum];
    long[] lcvOffset = new long[elementNum];
    System.arraycopy(lcv.lengths, 0, lcvLength, 0, elementNum);
    System.arraycopy(lcv.offsets, 0, lcvOffset, 0, elementNum);
    lcv.lengths = lcvLength;
    lcv.offsets = lcvOffset;
  }

  /**
   * Copies the buffered values into a freshly allocated, correctly typed child vector of the
   * list vector. One switch branch per supported element type; each branch allocates the child,
   * resets it, then writes value-or-null per element.
   */
  private void fillColumnVector(
      LogicalType category, HeapArrayVector lcv, List valueList, int elementNum) {
    int total = valueList.size();
    setChildrenInfo(lcv, total, elementNum);
    switch (category.getTypeRoot()) {
      case CHAR:
      case VARCHAR:
      case BINARY:
      case VARBINARY:
        lcv.child = new HeapBytesVector(total);
        ((HeapBytesVector) lcv.child).reset();
        for (int i = 0; i < valueList.size(); i++) {
          byte[] src = ((List<byte[]>) valueList).get(i);
          if (src == null) {
            ((HeapBytesVector) lcv.child).setNullAt(i);
          } else {
            ((HeapBytesVector) lcv.child).appendBytes(i, src, 0, src.length);
          }
        }
        break;
      case BOOLEAN:
        lcv.child = new HeapBooleanVector(total);
        ((HeapBooleanVector) lcv.child).reset();
        for (int i = 0; i < valueList.size(); i++) {
          if (valueList.get(i) == null) {
            ((HeapBooleanVector) lcv.child).setNullAt(i);
          } else {
            ((HeapBooleanVector) lcv.child).vector[i] =
                ((List<Boolean>) valueList).get(i);
          }
        }
        break;
      case TINYINT:
        lcv.child = new HeapByteVector(total);
        ((HeapByteVector) lcv.child).reset();
        for (int i = 0; i < valueList.size(); i++) {
          if (valueList.get(i) == null) {
            ((HeapByteVector) lcv.child).setNullAt(i);
          } else {
            // values were read as Integer; narrow to byte here
            ((HeapByteVector) lcv.child).vector[i] =
                (byte) ((List<Integer>) valueList).get(i).intValue();
          }
        }
        break;
      case SMALLINT:
        lcv.child = new HeapShortVector(total);
        ((HeapShortVector) lcv.child).reset();
        for (int i = 0; i < valueList.size(); i++) {
          if (valueList.get(i) == null) {
            ((HeapShortVector) lcv.child).setNullAt(i);
          } else {
            // values were read as Integer; narrow to short here
            ((HeapShortVector) lcv.child).vector[i] =
                (short) ((List<Integer>) valueList).get(i).intValue();
          }
        }
        break;
      case INTEGER:
      case DATE:
      case TIME_WITHOUT_TIME_ZONE:
        lcv.child = new HeapIntVector(total);
        ((HeapIntVector) lcv.child).reset();
        for (int i = 0; i < valueList.size(); i++) {
          if (valueList.get(i) == null) {
            ((HeapIntVector) lcv.child).setNullAt(i);
          } else {
            ((HeapIntVector) lcv.child).vector[i] = ((List<Integer>) valueList).get(i);
          }
        }
        break;
      case FLOAT:
        lcv.child = new HeapFloatVector(total);
        ((HeapFloatVector) lcv.child).reset();
        for (int i = 0; i < valueList.size(); i++) {
          if (valueList.get(i) == null) {
            ((HeapFloatVector) lcv.child).setNullAt(i);
          } else {
            ((HeapFloatVector) lcv.child).vector[i] = ((List<Float>) valueList).get(i);
          }
        }
        break;
      case BIGINT:
        lcv.child = new HeapLongVector(total);
        ((HeapLongVector) lcv.child).reset();
        for (int i = 0; i < valueList.size(); i++) {
          if (valueList.get(i) == null) {
            ((HeapLongVector) lcv.child).setNullAt(i);
          } else {
            ((HeapLongVector) lcv.child).vector[i] = ((List<Long>) valueList).get(i);
          }
        }
        break;
      case DOUBLE:
        lcv.child = new HeapDoubleVector(total);
        ((HeapDoubleVector) lcv.child).reset();
        for (int i = 0; i < valueList.size(); i++) {
          if (valueList.get(i) == null) {
            ((HeapDoubleVector) lcv.child).setNullAt(i);
          } else {
            ((HeapDoubleVector) lcv.child).vector[i] =
                ((List<Double>) valueList).get(i);
          }
        }
        break;
      case TIMESTAMP_WITHOUT_TIME_ZONE:
      case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
        lcv.child = new HeapTimestampVector(total);
        ((HeapTimestampVector) lcv.child).reset();
        for (int i = 0; i < valueList.size(); i++) {
          if (valueList.get(i) == null) {
            ((HeapTimestampVector) lcv.child).setNullAt(i);
          } else {
            ((HeapTimestampVector) lcv.child)
                .setTimestamp(i, ((List<TimestampData>) valueList).get(i));
          }
        }
        break;
      case DECIMAL:
        // physical layout depends on the parquet primitive type; the child is always wrapped
        // in a ParquetDecimalVector so downstream readers see a DecimalColumnVector
        PrimitiveType.PrimitiveTypeName primitiveTypeName =
            descriptor.getPrimitiveType().getPrimitiveTypeName();
        switch (primitiveTypeName) {
          case INT32:
            lcv.child = new ParquetDecimalVector(new HeapIntVector(total));
            ((HeapIntVector) ((ParquetDecimalVector) lcv.child).vector).reset();
            for (int i = 0; i < valueList.size(); i++) {
              if (valueList.get(i) == null) {
                ((HeapIntVector) ((ParquetDecimalVector) lcv.child).vector)
                    .setNullAt(i);
              } else {
                ((HeapIntVector) ((ParquetDecimalVector) lcv.child).vector)
                    .vector[i] =
                    ((List<Integer>) valueList).get(i);
              }
            }
            break;
          case INT64:
            lcv.child = new ParquetDecimalVector(new HeapLongVector(total));
            ((HeapLongVector) ((ParquetDecimalVector) lcv.child).vector).reset();
            for (int i = 0; i < valueList.size(); i++) {
              if (valueList.get(i) == null) {
                ((HeapLongVector) ((ParquetDecimalVector) lcv.child).vector)
                    .setNullAt(i);
              } else {
                ((HeapLongVector) ((ParquetDecimalVector) lcv.child).vector)
                    .vector[i] =
                    ((List<Long>) valueList).get(i);
              }
            }
            break;
          default:
            // BINARY / FIXED_LEN_BYTE_ARRAY decimals: stored as raw bytes
            lcv.child = new ParquetDecimalVector(new HeapBytesVector(total));
            ((HeapBytesVector) ((ParquetDecimalVector) lcv.child).vector).reset();
            for (int i = 0; i < valueList.size(); i++) {
              byte[] src = ((List<byte[]>) valueList).get(i);
              if (valueList.get(i) == null) {
                ((HeapBytesVector) ((ParquetDecimalVector) lcv.child).vector)
                    .setNullAt(i);
              } else {
                ((HeapBytesVector) ((ParquetDecimalVector) lcv.child).vector)
                    .appendBytes(i, src, 0, src.length);
              }
            }
            break;
        }
        break;
      default:
        throw new RuntimeException("Unsupported type in the list: " + type);
    }
  }
}
+
diff --git
a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/reader/BaseVectorizedColumnReader.java
b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/reader/BaseVectorizedColumnReader.java
new file mode 100644
index 0000000..073c704
--- /dev/null
+++
b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/reader/BaseVectorizedColumnReader.java
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.vector.reader;
+
+import org.apache.flink.formats.parquet.vector.reader.ColumnReader;
+import org.apache.flink.table.data.vector.writable.WritableColumnVector;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.parquet.bytes.ByteBufferInputStream;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.page.DataPage;
+import org.apache.parquet.column.page.DataPageV1;
+import org.apache.parquet.column.page.DataPageV2;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridDecoder;
+import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+
+import static org.apache.parquet.column.ValuesType.DEFINITION_LEVEL;
+import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL;
+import static org.apache.parquet.column.ValuesType.VALUES;
+
/**
 * Abstract {@link ColumnReader}. part of the code is referred from Apache Hive and Apache Parquet.
 *
 * <p>Owns the page-level state for one parquet column: repetition/definition level iterators,
 * the (optional) dictionary, and the current data-values reader. Subclasses drive
 * {@link #readPage()} / {@link #readRepetitionAndDefinitionLevels()} and consume values via
 * {@link #dataColumn}.
 */
public abstract class BaseVectorizedColumnReader implements ColumnReader<WritableColumnVector> {

  private static final Logger LOG = LoggerFactory.getLogger(BaseVectorizedColumnReader.class);

  protected boolean isUtcTimestamp;

  /**
   * Total number of values read.
   */
  protected long valuesRead;

  /**
   * value that indicates the end of the current page. That is, if valuesRead ==
   * endOfPageValueCount, we are at the end of the page.
   */
  protected long endOfPageValueCount;

  /**
   * The dictionary, if this column has dictionary encoding.
   */
  protected final ParquetDataColumnReader dictionary;

  /**
   * If true, the current page is dictionary encoded.
   */
  protected boolean isCurrentPageDictionaryEncoded;

  /**
   * Maximum definition level for this column.
   */
  protected final int maxDefLevel;

  // levels of the value consumed by the most recent readRepetitionAndDefinitionLevels()
  protected int definitionLevel;
  protected int repetitionLevel;

  /**
   * Repetition/Definition/Value readers.
   */
  protected IntIterator repetitionLevelColumn;

  protected IntIterator definitionLevelColumn;
  protected ParquetDataColumnReader dataColumn;

  /**
   * Total values in the current page.
   */
  protected int pageValueCount;

  protected final PageReader pageReader;
  protected final ColumnDescriptor descriptor;
  protected final Type type;
  protected final LogicalType logicalType;

  /**
   * Eagerly reads the dictionary page (if any) so {@link #dictionary} and
   * {@link #isCurrentPageDictionaryEncoded} are set before the first data page is read.
   *
   * @throws IOException if the dictionary page cannot be decoded
   */
  public BaseVectorizedColumnReader(
      ColumnDescriptor descriptor,
      PageReader pageReader,
      boolean isUtcTimestamp,
      Type parquetType,
      LogicalType logicalType)
      throws IOException {
    this.descriptor = descriptor;
    this.type = parquetType;
    this.pageReader = pageReader;
    this.maxDefLevel = descriptor.getMaxDefinitionLevel();
    this.isUtcTimestamp = isUtcTimestamp;
    this.logicalType = logicalType;

    DictionaryPage dictionaryPage = pageReader.readDictionaryPage();
    if (dictionaryPage != null) {
      try {
        this.dictionary =
            ParquetDataColumnReaderFactory.getDataColumnReaderByTypeOnDictionary(
                parquetType.asPrimitiveType(),
                dictionaryPage
                    .getEncoding()
                    .initDictionary(descriptor, dictionaryPage),
                isUtcTimestamp);
        this.isCurrentPageDictionaryEncoded = true;
      } catch (IOException e) {
        throw new IOException("could not decode the dictionary for " + descriptor, e);
      }
    } else {
      this.dictionary = null;
      this.isCurrentPageDictionaryEncoded = false;
    }
  }

  /** Advances to the next value: consumes one repetition and one definition level. */
  protected void readRepetitionAndDefinitionLevels() {
    repetitionLevel = repetitionLevelColumn.nextInt();
    definitionLevel = definitionLevelColumn.nextInt();
    valuesRead++;
  }

  /** Reads the next data page (V1 or V2) and re-initializes the level and value readers. */
  protected void readPage() throws IOException {
    DataPage page = pageReader.readPage();

    if (page == null) {
      return;
    }

    page.accept(
        new DataPage.Visitor<Void>() {
          @Override
          public Void visit(DataPageV1 dataPageV1) {
            readPageV1(dataPageV1);
            return null;
          }

          @Override
          public Void visit(DataPageV2 dataPageV2) {
            readPageV2(dataPageV2);
            return null;
          }
        });
  }

  /**
   * Sets up {@link #dataColumn} for the new page, choosing dictionary-based or plain decoding
   * according to the page's value encoding.
   */
  private void initDataReader(Encoding dataEncoding, ByteBufferInputStream in, int valueCount)
      throws IOException {
    this.pageValueCount = valueCount;
    this.endOfPageValueCount = valuesRead + pageValueCount;
    if (dataEncoding.usesDictionary()) {
      this.dataColumn = null;
      if (dictionary == null) {
        throw new IOException(
            "could not read page in col "
                + descriptor
                + " as the dictionary was missing for encoding "
                + dataEncoding);
      }
      dataColumn =
          ParquetDataColumnReaderFactory.getDataColumnReaderByType(
              type.asPrimitiveType(),
              dataEncoding.getDictionaryBasedValuesReader(
                  descriptor, VALUES, dictionary.getDictionary()),
              isUtcTimestamp);
      this.isCurrentPageDictionaryEncoded = true;
    } else {
      dataColumn =
          ParquetDataColumnReaderFactory.getDataColumnReaderByType(
              type.asPrimitiveType(),
              dataEncoding.getValuesReader(descriptor, VALUES),
              isUtcTimestamp);
      this.isCurrentPageDictionaryEncoded = false;
    }

    try {
      dataColumn.initFromPage(pageValueCount, in);
    } catch (IOException e) {
      throw new IOException("could not read page in col " + descriptor, e);
    }
  }

  // V1 pages store rl/dl/data consecutively in one stream; readers must be initialized in order
  private void readPageV1(DataPageV1 page) {
    ValuesReader rlReader = page.getRlEncoding().getValuesReader(descriptor, REPETITION_LEVEL);
    ValuesReader dlReader = page.getDlEncoding().getValuesReader(descriptor, DEFINITION_LEVEL);
    this.repetitionLevelColumn = new ValuesReaderIntIterator(rlReader);
    this.definitionLevelColumn = new ValuesReaderIntIterator(dlReader);
    try {
      BytesInput bytes = page.getBytes();
      LOG.debug("page size " + bytes.size() + " bytes and " + pageValueCount + " records");
      ByteBufferInputStream in = bytes.toInputStream();
      LOG.debug("reading repetition levels at " + in.position());
      rlReader.initFromPage(pageValueCount, in);
      LOG.debug("reading definition levels at " + in.position());
      dlReader.initFromPage(pageValueCount, in);
      LOG.debug("reading data at " + in.position());
      initDataReader(page.getValueEncoding(), in, page.getValueCount());
    } catch (IOException e) {
      throw new ParquetDecodingException(
          "could not read page " + page + " in col " + descriptor, e);
    }
  }

  // V2 pages carry the level bytes separately; levels are always RLE encoded
  private void readPageV2(DataPageV2 page) {
    this.pageValueCount = page.getValueCount();
    this.repetitionLevelColumn =
        newRLEIterator(descriptor.getMaxRepetitionLevel(), page.getRepetitionLevels());
    this.definitionLevelColumn =
        newRLEIterator(descriptor.getMaxDefinitionLevel(), page.getDefinitionLevels());
    try {
      LOG.debug(
          "page data size "
              + page.getData().size()
              + " bytes and "
              + pageValueCount
              + " records");
      initDataReader(
          page.getDataEncoding(), page.getData().toInputStream(), page.getValueCount());
    } catch (IOException e) {
      throw new ParquetDecodingException(
          "could not read page " + page + " in col " + descriptor, e);
    }
  }

  // maxLevel == 0 means the level is trivially always 0, so no bytes need decoding
  private IntIterator newRLEIterator(int maxLevel, BytesInput bytes) {
    try {
      if (maxLevel == 0) {
        return new NullIntIterator();
      }
      return new RLEIntIterator(
          new RunLengthBitPackingHybridDecoder(
              BytesUtils.getWidthFromMaxInt(maxLevel),
              new ByteArrayInputStream(bytes.toByteArray())));
    } catch (IOException e) {
      throw new ParquetDecodingException(
          "could not read levels in page for col " + descriptor, e);
    }
  }

  /**
   * Utility classes to abstract over different way to read ints with different encodings.
   */
  abstract static class IntIterator {
    abstract int nextInt();
  }

  /**
   * read ints from {@link ValuesReader}.
   */
  protected static final class ValuesReaderIntIterator extends IntIterator {
    ValuesReader delegate;

    public ValuesReaderIntIterator(ValuesReader delegate) {
      this.delegate = delegate;
    }

    @Override
    int nextInt() {
      return delegate.readInteger();
    }
  }

  /**
   * read ints from {@link RunLengthBitPackingHybridDecoder}.
   */
  protected static final class RLEIntIterator extends IntIterator {
    RunLengthBitPackingHybridDecoder delegate;

    public RLEIntIterator(RunLengthBitPackingHybridDecoder delegate) {
      this.delegate = delegate;
    }

    @Override
    int nextInt() {
      try {
        return delegate.readInt();
      } catch (IOException e) {
        throw new ParquetDecodingException(e);
      }
    }
  }

  /**
   * return zero.
   */
  protected static final class NullIntIterator extends IntIterator {
    @Override
    int nextInt() {
      return 0;
    }
  }
}
+
diff --git
a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/reader/MapColumnReader.java
b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/reader/MapColumnReader.java
new file mode 100644
index 0000000..015a867
--- /dev/null
+++
b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/reader/MapColumnReader.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.vector.reader;
+
+import org.apache.hudi.table.format.cow.vector.HeapArrayVector;
+import org.apache.hudi.table.format.cow.vector.HeapMapColumnVector;
+
+import org.apache.flink.formats.parquet.vector.reader.ColumnReader;
+import org.apache.flink.table.data.vector.ColumnVector;
+import org.apache.flink.table.data.vector.writable.WritableColumnVector;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+
+import java.io.IOException;
+
+/**
+ * Map {@link ColumnReader}.
+ */
+public class MapColumnReader implements ColumnReader<WritableColumnVector> {
+
+ private final LogicalType logicalType;
+ private final ArrayColumnReader keyReader;
+ private final ArrayColumnReader valueReader;
+
+ public MapColumnReader(
+ ArrayColumnReader keyReader, ArrayColumnReader valueReader, LogicalType
logicalType) {
+ this.keyReader = keyReader;
+ this.valueReader = valueReader;
+ this.logicalType = logicalType;
+ }
+
+ public void readBatch(int total, ColumnVector column) throws IOException {
+ HeapMapColumnVector mapColumnVector = (HeapMapColumnVector) column;
+ MapType mapType = (MapType) logicalType;
+ // initialize 2 HeapArrayVectors for the keys and the values
+ HeapArrayVector keyArrayColumnVector = new HeapArrayVector(total);
+ HeapArrayVector valueArrayColumnVector = new HeapArrayVector(total);
+ // read the keys and values
+ keyReader.readToVector(total, keyArrayColumnVector);
+ valueReader.readToVector(total, valueArrayColumnVector);
+
+ // set the related attributes according to the keys and values
+ mapColumnVector.setKeys(keyArrayColumnVector.child);
+ mapColumnVector.setValues(valueArrayColumnVector.child);
+ mapColumnVector.setOffsets(keyArrayColumnVector.offsets);
+ mapColumnVector.setLengths(keyArrayColumnVector.lengths);
+ mapColumnVector.setSize(keyArrayColumnVector.getSize());
+ for (int i = 0; i < keyArrayColumnVector.getLen(); i++) {
+ if (keyArrayColumnVector.isNullAt(i)) {
+ mapColumnVector.setNullAt(i);
+ }
+ }
+ }
+
+ @Override
+ public void readToVector(int readNumber, WritableColumnVector vector) throws
IOException {
+ readBatch(readNumber, vector);
+ }
+}
+
diff --git
a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetDataColumnReader.java
b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetDataColumnReader.java
new file mode 100644
index 0000000..e96cf22
--- /dev/null
+++
b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetDataColumnReader.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.vector.reader;
+
+import org.apache.flink.table.data.TimestampData;
+import org.apache.parquet.bytes.ByteBufferInputStream;
+import org.apache.parquet.column.Dictionary;
+
+import java.io.IOException;
+
+/**
+ * The interface to wrap the underlying Parquet dictionary and non-dictionary
encoded page reader.
+ */
+public interface ParquetDataColumnReader {
+
+ /**
+ * Initialize the reader by page data.
+ *
+ * @param valueCount value count
+ * @param in page data
+ * @throws IOException
+ */
+ void initFromPage(int valueCount, ByteBufferInputStream in) throws
IOException;
+
+ /**
+ * @return the next Dictionary ID from the page
+ */
+ int readValueDictionaryId();
+
+ /**
+ * @return the next Long from the page
+ */
+ long readLong();
+
+ /**
+ * @return the next Integer from the page
+ */
+ int readInteger();
+
+ /**
+ * @return the next SmallInt from the page
+ */
+ int readSmallInt();
+
+ /**
+ * @return the next TinyInt from the page
+ */
+ int readTinyInt();
+
+ /**
+ * @return the next Float from the page
+ */
+ float readFloat();
+
+ /**
+ * @return the next Boolean from the page
+ */
+ boolean readBoolean();
+
+ /**
+ * @return the next String from the page
+ */
+ byte[] readString();
+
+ /**
+ * @return the next Varchar from the page
+ */
+ byte[] readVarchar();
+
+ /**
+ * @return the next Char from the page
+ */
+ byte[] readChar();
+
+ /**
+ * @return the next Bytes from the page
+ */
+ byte[] readBytes();
+
+ /**
+ * @return the next Decimal from the page
+ */
+ byte[] readDecimal();
+
+ /**
+ * @return the next Double from the page
+ */
+ double readDouble();
+
+ /**
+ * @return the next TimestampData from the page
+ */
+ TimestampData readTimestamp();
+
+ /**
+ * @return is data valid
+ */
+ boolean isValid();
+
+ /**
+ * @return the underlying dictionary if current reader is dictionary encoded
+ */
+ Dictionary getDictionary();
+
+ /**
+ * @param id in dictionary
+ * @return the Bytes from the dictionary by id
+ */
+ byte[] readBytes(int id);
+
+ /**
+ * @param id in dictionary
+ * @return the Float from the dictionary by id
+ */
+ float readFloat(int id);
+
+ /**
+ * @param id in dictionary
+ * @return the Double from the dictionary by id
+ */
+ double readDouble(int id);
+
+ /**
+ * @param id in dictionary
+ * @return the Integer from the dictionary by id
+ */
+ int readInteger(int id);
+
+ /**
+ * @param id in dictionary
+ * @return the Long from the dictionary by id
+ */
+ long readLong(int id);
+
+ /**
+ * @param id in dictionary
+ * @return the Small Int from the dictionary by id
+ */
+ int readSmallInt(int id);
+
+ /**
+ * @param id in dictionary
+ * @return the tiny int from the dictionary by id
+ */
+ int readTinyInt(int id);
+
+ /**
+ * @param id in dictionary
+ * @return the Boolean from the dictionary by id
+ */
+ boolean readBoolean(int id);
+
+ /**
+ * @param id in dictionary
+ * @return the Decimal from the dictionary by id
+ */
+ byte[] readDecimal(int id);
+
+ /**
+ * @param id in dictionary
+ * @return the TimestampData from the dictionary by id
+ */
+ TimestampData readTimestamp(int id);
+
+ /**
+ * @param id in dictionary
+ * @return the String from the dictionary by id
+ */
+ byte[] readString(int id);
+
+ /**
+ * @param id in dictionary
+ * @return the Varchar from the dictionary by id
+ */
+ byte[] readVarchar(int id);
+
+ /**
+ * @param id in dictionary
+ * @return the Char from the dictionary by id
+ */
+ byte[] readChar(int id);
+}
+
diff --git
a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetDataColumnReaderFactory.java
b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetDataColumnReaderFactory.java
new file mode 100644
index 0000000..861d5cb
--- /dev/null
+++
b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetDataColumnReaderFactory.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.vector.reader;
+
+import org.apache.flink.table.data.TimestampData;
+import org.apache.parquet.bytes.ByteBufferInputStream;
+import org.apache.parquet.column.Dictionary;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.PrimitiveType;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.sql.Timestamp;
+
+import static
org.apache.flink.formats.parquet.vector.reader.TimestampColumnReader.JULIAN_EPOCH_OFFSET_DAYS;
+import static
org.apache.flink.formats.parquet.vector.reader.TimestampColumnReader.MILLIS_IN_DAY;
+import static
org.apache.flink.formats.parquet.vector.reader.TimestampColumnReader.NANOS_PER_MILLISECOND;
+import static
org.apache.flink.formats.parquet.vector.reader.TimestampColumnReader.NANOS_PER_SECOND;
+
+/**
+ * Parquet file has self-describing schema which may differ from the user
required schema (e.g.
+ * schema evolution). This factory is used to retrieve user required typed
data via corresponding
+ * reader which reads the underlying data.
+ */
+public final class ParquetDataColumnReaderFactory {
+
+ private ParquetDataColumnReaderFactory() {
+ }
+
+ /**
+ * Default implementation of {@link ParquetDataColumnReader}.
+ */
+ public static class DefaultParquetDataColumnReader implements
ParquetDataColumnReader {
+ protected ValuesReader valuesReader;
+ protected Dictionary dict;
+
+ // After the data is read in the parquet type, isValid will be set to true
if the data can
+ // be returned in the requested logical type. Otherwise isValid is set to
false.
+ boolean isValid = true;
+
+ public DefaultParquetDataColumnReader(ValuesReader valuesReader) {
+ this.valuesReader = valuesReader;
+ }
+
+ public DefaultParquetDataColumnReader(Dictionary dict) {
+ this.dict = dict;
+ }
+
+ @Override
+ public void initFromPage(int i, ByteBufferInputStream in) throws
IOException {
+ valuesReader.initFromPage(i, in);
+ }
+
+ @Override
+ public boolean readBoolean() {
+ return valuesReader.readBoolean();
+ }
+
+ @Override
+ public boolean readBoolean(int id) {
+ return dict.decodeToBoolean(id);
+ }
+
+ @Override
+ public byte[] readString(int id) {
+ return dict.decodeToBinary(id).getBytesUnsafe();
+ }
+
+ @Override
+ public byte[] readString() {
+ return valuesReader.readBytes().getBytesUnsafe();
+ }
+
+ @Override
+ public byte[] readVarchar() {
+ // we need to enforce the size here even the types are the same
+ return valuesReader.readBytes().getBytesUnsafe();
+ }
+
+ @Override
+ public byte[] readVarchar(int id) {
+ return dict.decodeToBinary(id).getBytesUnsafe();
+ }
+
+ @Override
+ public byte[] readChar() {
+ return valuesReader.readBytes().getBytesUnsafe();
+ }
+
+ @Override
+ public byte[] readChar(int id) {
+ return dict.decodeToBinary(id).getBytesUnsafe();
+ }
+
+ @Override
+ public byte[] readBytes() {
+ return valuesReader.readBytes().getBytesUnsafe();
+ }
+
+ @Override
+ public byte[] readBytes(int id) {
+ return dict.decodeToBinary(id).getBytesUnsafe();
+ }
+
+ @Override
+ public byte[] readDecimal() {
+ return valuesReader.readBytes().getBytesUnsafe();
+ }
+
+ @Override
+ public byte[] readDecimal(int id) {
+ return dict.decodeToBinary(id).getBytesUnsafe();
+ }
+
+ @Override
+ public float readFloat() {
+ return valuesReader.readFloat();
+ }
+
+ @Override
+ public float readFloat(int id) {
+ return dict.decodeToFloat(id);
+ }
+
+ @Override
+ public double readDouble() {
+ return valuesReader.readDouble();
+ }
+
+ @Override
+ public double readDouble(int id) {
+ return dict.decodeToDouble(id);
+ }
+
+ @Override
+ public TimestampData readTimestamp() {
+ throw new RuntimeException("Unsupported operation");
+ }
+
+ @Override
+ public TimestampData readTimestamp(int id) {
+ throw new RuntimeException("Unsupported operation");
+ }
+
+ @Override
+ public int readInteger() {
+ return valuesReader.readInteger();
+ }
+
+ @Override
+ public int readInteger(int id) {
+ return dict.decodeToInt(id);
+ }
+
+ @Override
+ public boolean isValid() {
+ return isValid;
+ }
+
+ @Override
+ public long readLong(int id) {
+ return dict.decodeToLong(id);
+ }
+
+ @Override
+ public long readLong() {
+ return valuesReader.readLong();
+ }
+
+ @Override
+ public int readSmallInt() {
+ return valuesReader.readInteger();
+ }
+
+ @Override
+ public int readSmallInt(int id) {
+ return dict.decodeToInt(id);
+ }
+
+ @Override
+ public int readTinyInt() {
+ return valuesReader.readInteger();
+ }
+
+ @Override
+ public int readTinyInt(int id) {
+ return dict.decodeToInt(id);
+ }
+
+ @Override
+ public int readValueDictionaryId() {
+ return valuesReader.readValueDictionaryId();
+ }
+
+ public void skip() {
+ valuesReader.skip();
+ }
+
+ @Override
+ public Dictionary getDictionary() {
+ return dict;
+ }
+ }
+
+ /**
+ * The reader that reads from the underlying INT96 timestamp value.
+ */
+ public static class TypesFromInt96PageReader extends
DefaultParquetDataColumnReader {
+ private final boolean isUtcTimestamp;
+
+ public TypesFromInt96PageReader(ValuesReader realReader, boolean
isUtcTimestamp) {
+ super(realReader);
+ this.isUtcTimestamp = isUtcTimestamp;
+ }
+
+ public TypesFromInt96PageReader(Dictionary dict, boolean isUtcTimestamp) {
+ super(dict);
+ this.isUtcTimestamp = isUtcTimestamp;
+ }
+
+ private TimestampData convert(Binary binary) {
+ ByteBuffer buf = binary.toByteBuffer();
+ buf.order(ByteOrder.LITTLE_ENDIAN);
+ long timeOfDayNanos = buf.getLong();
+ int julianDay = buf.getInt();
+ return int96ToTimestamp(isUtcTimestamp, timeOfDayNanos, julianDay);
+ }
+
+ @Override
+ public TimestampData readTimestamp(int id) {
+ return convert(dict.decodeToBinary(id));
+ }
+
+ @Override
+ public TimestampData readTimestamp() {
+ return convert(valuesReader.readBytes());
+ }
+ }
+
+ private static ParquetDataColumnReader getDataColumnReaderByTypeHelper(
+ boolean isDictionary,
+ PrimitiveType parquetType,
+ Dictionary dictionary,
+ ValuesReader valuesReader,
+ boolean isUtcTimestamp) {
+ if (parquetType.getPrimitiveTypeName() ==
PrimitiveType.PrimitiveTypeName.INT96) {
+ return isDictionary
+ ? new TypesFromInt96PageReader(dictionary, isUtcTimestamp)
+ : new TypesFromInt96PageReader(valuesReader, isUtcTimestamp);
+ } else {
+ return isDictionary
+ ? new DefaultParquetDataColumnReader(dictionary)
+ : new DefaultParquetDataColumnReader(valuesReader);
+ }
+ }
+
+ public static ParquetDataColumnReader getDataColumnReaderByTypeOnDictionary(
+ PrimitiveType parquetType, Dictionary realReader, boolean
isUtcTimestamp) {
+ return getDataColumnReaderByTypeHelper(true, parquetType, realReader,
null, isUtcTimestamp);
+ }
+
+ public static ParquetDataColumnReader getDataColumnReaderByType(
+ PrimitiveType parquetType, ValuesReader realReader, boolean
isUtcTimestamp) {
+ return getDataColumnReaderByTypeHelper(
+ false, parquetType, null, realReader, isUtcTimestamp);
+ }
+
+ private static TimestampData int96ToTimestamp(
+ boolean utcTimestamp, long nanosOfDay, int julianDay) {
+ long millisecond = julianDayToMillis(julianDay) + (nanosOfDay /
NANOS_PER_MILLISECOND);
+
+ if (utcTimestamp) {
+ int nanoOfMillisecond = (int) (nanosOfDay % NANOS_PER_MILLISECOND);
+ return TimestampData.fromEpochMillis(millisecond, nanoOfMillisecond);
+ } else {
+ Timestamp timestamp = new Timestamp(millisecond);
+ timestamp.setNanos((int) (nanosOfDay % NANOS_PER_SECOND));
+ return TimestampData.fromTimestamp(timestamp);
+ }
+ }
+
+ private static long julianDayToMillis(int julianDay) {
+ return (julianDay - JULIAN_EPOCH_OFFSET_DAYS) * MILLIS_IN_DAY;
+ }
+}
+
diff --git
a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/reader/RowColumnReader.java
b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/reader/RowColumnReader.java
new file mode 100644
index 0000000..39ebb90
--- /dev/null
+++
b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/reader/RowColumnReader.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.vector.reader;
+
+import org.apache.hudi.table.format.cow.vector.HeapRowColumnVector;
+
+import org.apache.flink.formats.parquet.vector.reader.ColumnReader;
+import org.apache.flink.table.data.vector.writable.WritableColumnVector;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Row {@link ColumnReader}.
+ */
+public class RowColumnReader implements ColumnReader<WritableColumnVector> {
+
+ private final List<ColumnReader> fieldReaders;
+
+ public RowColumnReader(List<ColumnReader> fieldReaders) {
+ this.fieldReaders = fieldReaders;
+ }
+
+ @Override
+ public void readToVector(int readNumber, WritableColumnVector vector) throws
IOException {
+ HeapRowColumnVector rowColumnVector = (HeapRowColumnVector) vector;
+ WritableColumnVector[] vectors = rowColumnVector.vectors;
+ for (int i = 0; i < vectors.length; i++) {
+ fieldReaders.get(i).readToVector(readNumber, vectors[i]);
+
+ for (int j = 0; j < readNumber; j++) {
+ boolean isNull = (i == 0)
+ ? vectors[i].isNullAt(j)
+ : rowColumnVector.isNullAt(j) && vectors[i].isNullAt(j);
+ if (isNull) {
+ rowColumnVector.setNullAt(j);
+ }
+ }
+ }
+ }
+}
diff --git
a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
index 9eef2fe..9e0da3a 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
@@ -1081,6 +1081,62 @@ public class HoodieDataSourceITCase extends
AbstractTestBase {
assertRowsEquals(result, expected);
}
+ @ParameterizedTest
+ @ValueSource(strings = {"insert", "upsert", "bulk_insert"})
+ void testParquetComplexTypes(String operation) {
+ TableEnvironment tableEnv = batchTableEnv;
+
+ String hoodieTableDDL = sql("t1")
+ .field("f_int int")
+ .field("f_array array<varchar(10)>")
+ .field("f_map map<varchar(20), int>")
+ .field("f_row row(f_row_f0 int, f_row_f1 varchar(10))")
+ .pkField("f_int")
+ .noPartition()
+ .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .option(FlinkOptions.OPERATION, operation)
+ .end();
+ tableEnv.executeSql(hoodieTableDDL);
+
+ execInsertSql(tableEnv, TestSQL.COMPLEX_TYPE_INSERT_T1);
+
+ List<Row> result = CollectionUtil.iterableToList(
+ () -> tableEnv.sqlQuery("select * from t1").execute().collect());
+ final String expected = "["
+ + "+I[1, [abc1, def1], {abc1=1, def1=3}, +I[1, abc1]], "
+ + "+I[2, [abc2, def2], {def2=3, abc2=1}, +I[2, abc2]], "
+ + "+I[3, [abc3, def3], {def3=3, abc3=1}, +I[3, abc3]]]";
+ assertRowsEquals(result, expected);
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = {"insert", "upsert", "bulk_insert"})
+ void testParquetComplexNestedRowTypes(String operation) {
+ TableEnvironment tableEnv = batchTableEnv;
+
+ String hoodieTableDDL = sql("t1")
+ .field("f_int int")
+ .field("f_array array<varchar(10)>")
+ .field("f_map map<varchar(20), int>")
+ .field("f_row row(f_nested_array array<varchar(10)>, f_nested_row
row(f_row_f0 int, f_row_f1 varchar(10)))")
+ .pkField("f_int")
+ .noPartition()
+ .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .option(FlinkOptions.OPERATION, operation)
+ .end();
+ tableEnv.executeSql(hoodieTableDDL);
+
+ execInsertSql(tableEnv, TestSQL.COMPLEX_NESTED_ROW_TYPE_INSERT_T1);
+
+ List<Row> result = CollectionUtil.iterableToList(
+ () -> tableEnv.sqlQuery("select * from t1").execute().collect());
+ final String expected = "["
+ + "+I[1, [abc1, def1], {abc1=1, def1=3}, +I[[abc1, def1], +I[1,
abc1]]], "
+ + "+I[2, [abc2, def2], {def2=3, abc2=1}, +I[[abc2, def2], +I[2,
abc2]]], "
+ + "+I[3, [abc3, def3], {def3=3, abc3=1}, +I[[abc3, def3], +I[3,
abc3]]]]";
+ assertRowsEquals(result, expected);
+ }
+
// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------
diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestSQL.java
b/hudi-flink/src/test/java/org/apache/hudi/utils/TestSQL.java
index 9dc78aa..595d142 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestSQL.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestSQL.java
@@ -51,4 +51,14 @@ public class TestSQL {
+ "('id9','Jane',19,TIMESTAMP '1970-01-01 00:00:06','par3'),\n"
+ "('id10','Ella',38,TIMESTAMP '1970-01-01 00:00:07','par4'),\n"
+ "('id11','Phoebe',52,TIMESTAMP '1970-01-01 00:00:08','par4')";
+
+ public static final String COMPLEX_TYPE_INSERT_T1 = "insert into t1 values\n"
+ + "(1, array['abc1', 'def1'], map['abc1', 1, 'def1', 3], row(1,
'abc1')),\n"
+ + "(2, array['abc2', 'def2'], map['abc2', 1, 'def2', 3], row(2,
'abc2')),\n"
+ + "(3, array['abc3', 'def3'], map['abc3', 1, 'def3', 3], row(3,
'abc3'))";
+
+ public static final String COMPLEX_NESTED_ROW_TYPE_INSERT_T1 = "insert into
t1 values\n"
+ + "(1, array['abc1', 'def1'], map['abc1', 1, 'def1', 3],
row(array['abc1', 'def1'], row(1, 'abc1'))),\n"
+ + "(2, array['abc2', 'def2'], map['abc2', 1, 'def2', 3],
row(array['abc2', 'def2'], row(2, 'abc2'))),\n"
+ + "(3, array['abc3', 'def3'], map['abc3', 1, 'def3', 3],
row(array['abc3', 'def3'], row(3, 'abc3')))";
}