This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 47bf4e41342e feat(flink): Wire Flink 2.1 nested Parquet readers into the Hudi read path (FLINK-35702) (#18700)
47bf4e41342e is described below
commit 47bf4e41342e4f1dab26a7fb1489f278fbd1226c
Author: Shihuan Liu <[email protected]>
AuthorDate: Thu May 7 20:07:05 2026 -0700
feat(flink): Wire Flink 2.1 nested Parquet readers into the Hudi read path (FLINK-35702) (#18700)
---
.../table/format/cow/ParquetSplitReaderUtil.java | 686 ++++++++++++---------
.../format/cow/vector/HeapMapColumnVector.java | 19 +-
.../cow/vector/reader/NestedColumnReader.java | 22 +
.../reader/ParquetColumnarRowSplitReader.java | 34 +-
4 files changed, 472 insertions(+), 289 deletions(-)
diff --git a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
index 2bb5be1d9614..5468dc86a25a 100644
--- a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
+++ b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,19 +19,18 @@
package org.apache.hudi.table.format.cow;
import org.apache.hudi.common.util.ValidationUtils;
-import org.apache.hudi.table.format.cow.vector.HeapArrayGroupColumnVector;
import org.apache.hudi.table.format.cow.vector.HeapArrayVector;
import org.apache.hudi.table.format.cow.vector.HeapDecimalVector;
import org.apache.hudi.table.format.cow.vector.HeapMapColumnVector;
import org.apache.hudi.table.format.cow.vector.HeapRowColumnVector;
-import org.apache.hudi.table.format.cow.vector.reader.ArrayColumnReader;
-import org.apache.hudi.table.format.cow.vector.reader.ArrayGroupReader;
import org.apache.hudi.table.format.cow.vector.reader.EmptyColumnReader;
import org.apache.hudi.table.format.cow.vector.reader.FixedLenBytesColumnReader;
import org.apache.hudi.table.format.cow.vector.reader.Int64TimestampColumnReader;
-import org.apache.hudi.table.format.cow.vector.reader.MapColumnReader;
+import org.apache.hudi.table.format.cow.vector.reader.NestedColumnReader;
import org.apache.hudi.table.format.cow.vector.reader.ParquetColumnarRowSplitReader;
-import org.apache.hudi.table.format.cow.vector.reader.RowColumnReader;
+import org.apache.hudi.table.format.cow.vector.type.ParquetField;
+import org.apache.hudi.table.format.cow.vector.type.ParquetGroupField;
+import org.apache.hudi.table.format.cow.vector.type.ParquetPrimitiveField;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.vector.reader.BooleanColumnReader;
@@ -64,12 +63,14 @@ import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.LogicalTypeFamily;
-import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.ParquetRuntimeException;
import org.apache.parquet.column.ColumnDescriptor;
@@ -77,12 +78,18 @@ import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.column.page.PageReader;
import org.apache.parquet.filter.UnboundRecordFilter;
import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.io.ColumnIO;
+import org.apache.parquet.io.GroupColumnIO;
+import org.apache.parquet.io.MessageColumnIO;
+import org.apache.parquet.io.PrimitiveColumnIO;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.InvalidSchemaException;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.math.BigDecimal;
import java.sql.Date;
@@ -90,25 +97,38 @@ import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.stream.Collectors;
import static org.apache.flink.table.utils.DateTimeUtils.toInternal;
import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
import static org.apache.parquet.Preconditions.checkArgument;
+import static org.apache.parquet.schema.Type.Repetition.REPEATED;
+import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
/**
* Util for generating {@link ParquetColumnarRowSplitReader}.
*
- * <p>NOTE: reference from Flink release 1.11.2 {@code ParquetSplitReaderUtil}, modify to support INT64
- * based TIMESTAMP_MILLIS as ConvertedType, should remove when Flink supports that.
+ * <p>Uses the Dremel-style nested reader ported from Apache Flink 2.1 (FLINK-35702). For primitive
+ * top-level columns we keep Hudi's specialized readers — {@link Int64TimestampColumnReader},
+ * {@link FixedLenBytesColumnReader}, and the Hudi {@link HeapDecimalVector} — unchanged. For
+ * nested types (ARRAY / MAP / MULTISET / ROW) we build a {@link ParquetField} tree once per
+ * split via {@link #buildFieldsList(List, List, MessageColumnIO)} and delegate reading to
+ * {@link NestedColumnReader}.
+ *
+ * <p>Schema evolution: missing top-level fields are still handled by the caller
+ * ({@link ParquetColumnarRowSplitReader} patches them with null vectors). Missing fields
+ * <em>inside</em> a Row are handled here — {@link #constructField} returns {@code null} for a
+ * child that isn't physically present, and the corresponding child in the pre-allocated vector
+ * is filled with nulls via {@link #createVectorFromConstant} so the Dremel assembler can
+ * pass the slot through (see {@link NestedColumnReader#readToVector}).
*/
public class ParquetSplitReaderUtil {
- /**
- * Util for generating partitioned {@link ParquetColumnarRowSplitReader}.
- */
+ /** Util for generating partitioned {@link ParquetColumnarRowSplitReader}. */
public static ParquetColumnarRowSplitReader genPartColumnarRowReader(
boolean utcTimestamp,
boolean caseSensitive,
@@ -182,10 +202,13 @@ public class ParquetSplitReaderUtil {
return readVector;
}
- private static ColumnVector createVectorFromConstant(
- LogicalType type,
- Object value,
- int batchSize) {
+ /**
+ * Builds a constant-filled column vector for either a partition column (non-null value) or a
+ * missing-column slot (null value). Used both at the batch-generator level for partition
+ * injection and at the row-reader level for fields absent from the Parquet file.
+ */
+ public static ColumnVector createVectorFromConstant(
+ LogicalType type, Object value, int batchSize) {
switch (type.getTypeRoot()) {
case CHAR:
case VARCHAR:
@@ -278,6 +301,7 @@ public class ParquetSplitReaderUtil {
value == null ? null : toInternal((Date) value),
batchSize);
case TIMESTAMP_WITHOUT_TIME_ZONE:
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
HeapTimestampVector tv = new HeapTimestampVector(batchSize);
if (value == null) {
tv.fillWithNulls();
@@ -286,46 +310,41 @@ public class ParquetSplitReaderUtil {
}
return tv;
case ARRAY:
- ArrayType arrayType = (ArrayType) type;
- if (arrayType.getElementType().isAnyOf(LogicalTypeFamily.CONSTRUCTED)) {
- HeapArrayGroupColumnVector arrayGroup = new HeapArrayGroupColumnVector(batchSize);
- if (value == null) {
- arrayGroup.fillWithNulls();
- return arrayGroup;
- } else {
- throw new UnsupportedOperationException("Unsupported create array with default value.");
- }
- } else {
- HeapArrayVector arrayVector = new HeapArrayVector(batchSize);
- if (value == null) {
- arrayVector.fillWithNulls();
- return arrayVector;
- } else {
- throw new UnsupportedOperationException("Unsupported create array with default value.");
- }
+ if (value != null) {
+ throw new UnsupportedOperationException("Unsupported create array with default value.");
}
+ HeapArrayVector arrayVector = new HeapArrayVector(batchSize);
+ arrayVector.fillWithNulls();
+ return arrayVector;
case MAP:
- HeapMapColumnVector mapVector = new HeapMapColumnVector(batchSize, null, null);
- if (value == null) {
- mapVector.fillWithNulls();
- return mapVector;
- } else {
- throw new UnsupportedOperationException("Unsupported create map with default value.");
+ case MULTISET:
+ if (value != null) {
+ throw new UnsupportedOperationException(
+ "Unsupported create " + type.getTypeRoot() + " with default value.");
}
+ HeapMapColumnVector mapVector = new HeapMapColumnVector(batchSize, null, null);
+ mapVector.fillWithNulls();
+ return mapVector;
case ROW:
- HeapRowColumnVector rowVector = new HeapRowColumnVector(batchSize);
- if (value == null) {
- rowVector.fillWithNulls();
- return rowVector;
- } else {
+ if (value != null) {
throw new UnsupportedOperationException("Unsupported create row with
default value.");
}
+ RowType rowType = (RowType) type;
+ WritableColumnVector[] childVectors = new
WritableColumnVector[rowType.getFieldCount()];
+ for (int i = 0; i < childVectors.length; i++) {
+ childVectors[i] =
+ (WritableColumnVector)
createVectorFromConstant(rowType.getTypeAt(i), null, batchSize);
+ }
+ HeapRowColumnVector rowVector = new HeapRowColumnVector(batchSize,
childVectors);
+ rowVector.fillWithNulls();
+ return rowVector;
default:
throw new UnsupportedOperationException("Unsupported type: " + type);
}
}
- private static List<ColumnDescriptor> filterDescriptors(int depth, Type type, List<ColumnDescriptor> columns) throws ParquetRuntimeException {
+ private static List<ColumnDescriptor> filterDescriptors(
+ int depth, Type type, List<ColumnDescriptor> columns) throws ParquetRuntimeException {
List<ColumnDescriptor> filtered = new ArrayList<>();
for (ColumnDescriptor descriptor : columns) {
if (depth >= descriptor.getPath().length) {
@@ -339,24 +358,61 @@ public class ParquetSplitReaderUtil {
return filtered;
}
+ /**
+ * Creates a {@link ColumnReader} for one top-level requested field. For primitive types the
+ * Hudi-specialized reader path is used. For nested types ({@code ARRAY}, {@code MAP},
+ * {@code MULTISET}, {@code ROW}) the Dremel-style {@link NestedColumnReader} is used, driven by
+ * the supplied pre-built {@link ParquetField} tree.
+ *
+ * @param field the {@link ParquetField} tree for this column, built by
+ * {@link #buildFieldsList(List, List, MessageColumnIO)}. Required (non-null) for nested
+ * types; ignored for primitives.
+ */
+ public static ColumnReader createColumnReader(
+ boolean utcTimestamp,
+ LogicalType fieldType,
+ Type physicalType,
+ List<ColumnDescriptor> descriptors,
+ PageReadStore pages,
+ @Nullable ParquetField field) throws IOException {
+ switch (fieldType.getTypeRoot()) {
+ case ARRAY:
+ case MAP:
+ case MULTISET:
+ case ROW:
+ Preconditions.checkNotNull(
+ field, "ParquetField must be provided for nested type: %s", fieldType);
+ return new NestedColumnReader(utcTimestamp, pages, field);
+ default:
+ return createPrimitiveColumnReader(utcTimestamp, fieldType, physicalType, descriptors, pages);
+ }
+ }
+
+ /**
+ * Backward-compat entry point kept for callers that don't project nested types and therefore
+ * never need a {@link ParquetField} tree. Forwards to the {@link ParquetField}-aware overload
+ * with a null field; nested types now go through that overload directly.
+ *
+ * @deprecated use {@link #createColumnReader(boolean, LogicalType, Type, List, PageReadStore,
+ * ParquetField)} so nested types take the Dremel path.
+ */
+ @Deprecated
public static ColumnReader createColumnReader(
boolean utcTimestamp,
LogicalType fieldType,
Type physicalType,
List<ColumnDescriptor> descriptors,
PageReadStore pages) throws IOException {
- return createColumnReader(utcTimestamp, fieldType, physicalType, descriptors,
- pages, 0);
+ return createColumnReader(utcTimestamp, fieldType, physicalType, descriptors, pages, null);
}
- private static ColumnReader createColumnReader(
+ private static ColumnReader createPrimitiveColumnReader(
boolean utcTimestamp,
LogicalType fieldType,
Type physicalType,
List<ColumnDescriptor> columns,
- PageReadStore pages,
- int depth) throws IOException {
- List<ColumnDescriptor> descriptors = filterDescriptors(depth, physicalType, columns);
+ PageReadStore pages) throws IOException {
+ List<ColumnDescriptor> descriptors = filterDescriptors(0, physicalType, columns);
ColumnDescriptor descriptor = descriptors.get(0);
PageReader pageReader = pages.getPageReader(descriptor);
switch (fieldType.getTypeRoot()) {
@@ -392,7 +448,9 @@ public class ParquetSplitReaderUtil {
case INT96:
return new TimestampColumnReader(utcTimestamp, descriptor, pageReader);
default:
- throw new AssertionError();
+ throw new AssertionError(
+ "Unexpected physical type for TIMESTAMP: "
+ + descriptor.getPrimitiveType().getPrimitiveTypeName());
}
case DECIMAL:
switch (descriptor.getPrimitiveType().getPrimitiveTypeName()) {
@@ -403,106 +461,23 @@ public class ParquetSplitReaderUtil {
case BINARY:
return new BytesColumnReader(descriptor, pageReader);
case FIXED_LEN_BYTE_ARRAY:
- return new FixedLenBytesColumnReader(
- descriptor, pageReader);
+ return new FixedLenBytesColumnReader(descriptor, pageReader);
default:
- throw new AssertionError();
- }
- case ARRAY:
- ArrayType arrayType = (ArrayType) fieldType;
- if (arrayType.getElementType().isAnyOf(LogicalTypeFamily.CONSTRUCTED)) {
- boolean isThreeLevelList = isThreeLevelList(physicalType);
- // 3-level List structure, drill down 2 level to get type for `element`
- Type elementType = isThreeLevelList
- ? physicalType.asGroupType().getType(0).asGroupType().getType(0)
- : physicalType.asGroupType().getType(0);
- int elementDepth = isThreeLevelList ? depth + 2 : depth + 1;
- return new ArrayGroupReader(createColumnReader(
- utcTimestamp,
- arrayType.getElementType(),
- elementType,
- descriptors,
- pages,
- elementDepth));
- } else {
- return new ArrayColumnReader(
- descriptor,
- pageReader,
- utcTimestamp,
- descriptor.getPrimitiveType(),
- fieldType);
- }
- case MAP:
- MapType mapType = (MapType) fieldType;
- ArrayColumnReader keyReader =
- new ArrayColumnReader(
- descriptor,
- pageReader,
- utcTimestamp,
- descriptor.getPrimitiveType(),
- new ArrayType(mapType.getKeyType()));
- ColumnReader<WritableColumnVector> valueReader;
- if (mapType.getValueType().isAnyOf(LogicalTypeFamily.CONSTRUCTED)) {
- valueReader = new ArrayGroupReader(createColumnReader(
- utcTimestamp,
- mapType.getValueType(),
- physicalType.asGroupType().getType(0).asGroupType().getType(1), // Get the value physical type
- descriptors.subList(1, descriptors.size()), // remove the key descriptor
- pages,
- depth + 2)); // increase the depth by 2, because there's a key_value entry in the path
- } else {
- valueReader = new ArrayColumnReader(
- descriptors.get(1),
- pages.getPageReader(descriptors.get(1)),
- utcTimestamp,
- descriptors.get(1).getPrimitiveType(),
- new ArrayType(mapType.getValueType()));
+ throw new AssertionError(
+ "Unexpected physical type for DECIMAL: "
+ + descriptor.getPrimitiveType().getPrimitiveTypeName());
}
- return new MapColumnReader(keyReader, valueReader);
- case ROW:
- RowType rowType = (RowType) fieldType;
- GroupType groupType = physicalType.asGroupType();
- List<ColumnReader> fieldReaders = new ArrayList<>();
- for (int i = 0; i < rowType.getFieldCount(); i++) {
- // schema evolution: read the parquet file with a new extended field name.
- int fieldIndex = getFieldIndexInPhysicalType(rowType.getFields().get(i).getName(), groupType);
- if (fieldIndex < 0) {
- fieldReaders.add(new EmptyColumnReader());
- } else {
- // Check for nested row in array with atomic field type.
-
- // This is done to meet the Parquet field algorithm that pushes multiplicity and structures down to individual fields.
- // In Parquet, an array of rows is stored as separate arrays for each field.
-
- // Limitations: It won't work for multiple nested arrays and maps.
- // The main problem is that the Flink classes and interface don't follow that pattern.
- if (descriptors.get(fieldIndex).getMaxRepetitionLevel() > 0 && !rowType.getTypeAt(i).is(LogicalTypeRoot.ARRAY)) {
- fieldReaders.add(
- createColumnReader(
- utcTimestamp,
- new ArrayType(rowType.getTypeAt(i).isNullable(), rowType.getTypeAt(i)),
- groupType.getType(fieldIndex),
- descriptors,
- pages,
- depth + 1));
- } else {
- fieldReaders.add(
- createColumnReader(
- utcTimestamp,
- rowType.getTypeAt(i),
- groupType.getType(fieldIndex),
- descriptors,
- pages,
- depth + 1));
- }
- }
- }
- return new RowColumnReader(fieldReaders);
default:
throw new UnsupportedOperationException(fieldType + " is not supported
now.");
}
}
+ /**
+ * Creates the writable column vector that the reader will write into. The returned vector shape
+ * matches {@code fieldType}; for ROW types missing physical fields are slotted with null-filled
+ * vectors (sourced from {@link #createVectorFromConstant}) so that the Dremel assembler in
+ * {@link NestedColumnReader} can pass them through unchanged.
+ */
public static WritableColumnVector createWritableColumnVector(
int batchSize,
LogicalType fieldType,
@@ -523,40 +498,48 @@ public class ParquetSplitReaderUtil {
switch (fieldType.getTypeRoot()) {
case BOOLEAN:
checkArgument(
- typeName == PrimitiveType.PrimitiveTypeName.BOOLEAN, getPrimitiveTypeCheckFailureMessage(typeName, fieldType));
+ typeName == PrimitiveType.PrimitiveTypeName.BOOLEAN,
+ getPrimitiveTypeCheckFailureMessage(typeName, fieldType));
return new HeapBooleanVector(batchSize);
case TINYINT:
checkArgument(
- typeName == PrimitiveType.PrimitiveTypeName.INT32, getPrimitiveTypeCheckFailureMessage(typeName, fieldType));
+ typeName == PrimitiveType.PrimitiveTypeName.INT32,
+ getPrimitiveTypeCheckFailureMessage(typeName, fieldType));
return new HeapByteVector(batchSize);
case DOUBLE:
checkArgument(
- typeName == PrimitiveType.PrimitiveTypeName.DOUBLE, getPrimitiveTypeCheckFailureMessage(typeName, fieldType));
+ typeName == PrimitiveType.PrimitiveTypeName.DOUBLE,
+ getPrimitiveTypeCheckFailureMessage(typeName, fieldType));
return new HeapDoubleVector(batchSize);
case FLOAT:
checkArgument(
- typeName == PrimitiveType.PrimitiveTypeName.FLOAT, getPrimitiveTypeCheckFailureMessage(typeName, fieldType));
+ typeName == PrimitiveType.PrimitiveTypeName.FLOAT,
+ getPrimitiveTypeCheckFailureMessage(typeName, fieldType));
return new HeapFloatVector(batchSize);
case INTEGER:
case DATE:
case TIME_WITHOUT_TIME_ZONE:
checkArgument(
- typeName == PrimitiveType.PrimitiveTypeName.INT32, getPrimitiveTypeCheckFailureMessage(typeName, fieldType));
+ typeName == PrimitiveType.PrimitiveTypeName.INT32,
+ getPrimitiveTypeCheckFailureMessage(typeName, fieldType));
return new HeapIntVector(batchSize);
case BIGINT:
checkArgument(
- typeName == PrimitiveType.PrimitiveTypeName.INT64, getPrimitiveTypeCheckFailureMessage(typeName, fieldType));
+ typeName == PrimitiveType.PrimitiveTypeName.INT64,
+ getPrimitiveTypeCheckFailureMessage(typeName, fieldType));
return new HeapLongVector(batchSize);
case SMALLINT:
checkArgument(
- typeName == PrimitiveType.PrimitiveTypeName.INT32, getPrimitiveTypeCheckFailureMessage(typeName, fieldType));
+ typeName == PrimitiveType.PrimitiveTypeName.INT32,
+ getPrimitiveTypeCheckFailureMessage(typeName, fieldType));
return new HeapShortVector(batchSize);
case CHAR:
case VARCHAR:
case BINARY:
case VARBINARY:
checkArgument(
- typeName == PrimitiveType.PrimitiveTypeName.BINARY, getPrimitiveTypeCheckFailureMessage(typeName, fieldType));
+ typeName == PrimitiveType.PrimitiveTypeName.BINARY,
+ getPrimitiveTypeCheckFailureMessage(typeName, fieldType));
return new HeapBytesVector(batchSize);
case TIMESTAMP_WITHOUT_TIME_ZONE:
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
@@ -566,112 +549,64 @@ public class ParquetSplitReaderUtil {
case DECIMAL:
checkArgument(
(typeName == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY
- || typeName == PrimitiveType.PrimitiveTypeName.BINARY)
+ || typeName == PrimitiveType.PrimitiveTypeName.BINARY)
&& primitiveType.getOriginalType() == OriginalType.DECIMAL,
getPrimitiveTypeCheckFailureMessage(typeName, fieldType));
return new HeapDecimalVector(batchSize);
case ARRAY:
ArrayType arrayType = (ArrayType) fieldType;
- if (arrayType.getElementType().isAnyOf(LogicalTypeFamily.CONSTRUCTED)) {
- boolean isThreeLevelList = isThreeLevelList(physicalType);
- // 3-level List structure, drill down 2 level to get type for `element`
- Type elementType = isThreeLevelList
- ? physicalType.asGroupType().getType(0).asGroupType().getType(0)
- : physicalType.asGroupType().getType(0);
- int elementDepth = isThreeLevelList ? depth + 2 : depth + 1;
- return new HeapArrayGroupColumnVector(
- batchSize,
- createWritableColumnVector(
- batchSize,
- arrayType.getElementType(),
- elementType,
- descriptors,
- elementDepth));
- } else {
- return new HeapArrayVector(
- batchSize,
- createWritableColumnVector(
- batchSize,
- arrayType.getElementType(),
- physicalType,
- descriptors,
- depth));
- }
- case MAP:
+ return new HeapArrayVector(
+ batchSize,
+ createWritableColumnVector(
+ batchSize, arrayType.getElementType(), physicalType, descriptors, depth));
+ case MAP: {
MapType mapType = (MapType) fieldType;
- GroupType repeatedType = physicalType.asGroupType().getType(0).asGroupType();
- // the map column has three level paths.
- WritableColumnVector keyColumnVector = createWritableColumnVector(
+ GroupType repeatedType = unwrapMapRepeatedType(physicalType);
+ return new HeapMapColumnVector(
batchSize,
- new ArrayType(mapType.getKeyType().isNullable(), mapType.getKeyType()),
- repeatedType.getType(0),
- descriptors,
- depth + 2);
- WritableColumnVector valueColumnVector;
- if (mapType.getValueType().isAnyOf(LogicalTypeFamily.CONSTRUCTED)) {
- valueColumnVector = new HeapArrayGroupColumnVector(
- batchSize,
- createWritableColumnVector(
- batchSize,
- mapType.getValueType(),
- repeatedType.getType(1).asGroupType(),
- descriptors,
- depth + 2));
- } else {
- valueColumnVector = createWritableColumnVector(
- batchSize,
- new ArrayType(mapType.getValueType().isNullable(), mapType.getValueType()),
- repeatedType.getType(1),
- descriptors,
- depth + 2);
- }
- return new HeapMapColumnVector(batchSize, keyColumnVector, valueColumnVector);
+ createWritableColumnVector(
+ batchSize, mapType.getKeyType(), repeatedType.getType(0), descriptors, depth + 2),
+ createWritableColumnVector(
+ batchSize, mapType.getValueType(), repeatedType.getType(1), descriptors, depth + 2));
+ }
+ case MULTISET: {
+ MultisetType multisetType = (MultisetType) fieldType;
+ GroupType repeatedType = unwrapMapRepeatedType(physicalType);
+ return new HeapMapColumnVector(
+ batchSize,
+ createWritableColumnVector(
+ batchSize,
+ multisetType.getElementType(),
+ repeatedType.getType(0),
+ descriptors,
+ depth + 2),
+ createWritableColumnVector(
+ batchSize,
+ new IntType(false),
+ repeatedType.getType(1),
+ descriptors,
+ depth + 2));
+ }
case ROW:
RowType rowType = (RowType) fieldType;
GroupType groupType = physicalType.asGroupType();
WritableColumnVector[] columnVectors = new WritableColumnVector[rowType.getFieldCount()];
for (int i = 0; i < columnVectors.length; i++) {
- // schema evolution: read the file with a new extended field name.
int fieldIndex = getFieldIndexInPhysicalType(rowType.getFields().get(i).getName(), groupType);
if (fieldIndex < 0) {
- // Check for nested row in array with atomic field type.
-
- // This is done to meet the Parquet field algorithm that pushes multiplicity and structures down to individual fields.
- // In Parquet, an array of rows is stored as separate arrays for each field.
-
- // Limitations: It won't work for multiple nested arrays and maps.
- // The main problem is that the Flink classes and interface don't follow that pattern.
- if (groupType.getRepetition().equals(Type.Repetition.REPEATED) && !rowType.getTypeAt(i).is(LogicalTypeRoot.ARRAY)) {
- columnVectors[i] = (WritableColumnVector) createVectorFromConstant(
- new ArrayType(rowType.getTypeAt(i).isNullable(), rowType.getTypeAt(i)), null, batchSize);
- } else {
- columnVectors[i] = (WritableColumnVector) createVectorFromConstant(rowType.getTypeAt(i), null, batchSize);
- }
+ // Schema evolution: logical field is absent from the Parquet file. Slot a null-filled
+ // vector of the correct shape; NestedColumnReader.readRow will pass it through when the
+ // matching ParquetField child is null.
+ columnVectors[i] =
+ (WritableColumnVector) createVectorFromConstant(rowType.getTypeAt(i), null, batchSize);
} else {
- // Check for nested row in array with atomic field type.
-
- // This is done to meet the Parquet field algorithm that pushes multiplicity and structures down to individual fields.
- // In Parquet, an array of rows is stored as separate arrays for each field.
-
- // Limitations: It won't work for multiple nested arrays and maps.
- // The main problem is that the Flink classes and interface don't follow that pattern.
- if (descriptors.get(fieldIndex).getMaxRepetitionLevel() > 0 && !rowType.getTypeAt(i).is(LogicalTypeRoot.ARRAY)) {
- columnVectors[i] =
- createWritableColumnVector(
- batchSize,
- new ArrayType(rowType.getTypeAt(i).isNullable(), rowType.getTypeAt(i)),
- groupType.getType(fieldIndex),
- descriptors,
- depth + 1);
- } else {
- columnVectors[i] =
- createWritableColumnVector(
- batchSize,
- rowType.getTypeAt(i),
- groupType.getType(fieldIndex),
- descriptors,
- depth + 1);
- }
+ columnVectors[i] =
+ createWritableColumnVector(
+ batchSize,
+ rowType.getTypeAt(i),
+ groupType.getType(fieldIndex),
+ descriptors,
+ depth + 1);
}
}
return new HeapRowColumnVector(batchSize, columnVectors);
@@ -681,56 +616,245 @@ public class ParquetSplitReaderUtil {
}
/**
- * Returns the field index with given physical row type {@code groupType} and field name {@code fieldName}.
- *
- * @return The physical field index or -1 if the field does not exist
+ * Peels one {@code repeated group key_value} wrapper off a MAP / MULTISET physical type, matching
+ * Parquet's canonical 3-level map encoding.
*/
- private static int getFieldIndexInPhysicalType(String fieldName, GroupType groupType) {
- // get index from fileSchema type, else, return -1
- return groupType.containsField(fieldName) ? groupType.getFieldIndex(fieldName) : -1;
+ private static GroupType unwrapMapRepeatedType(Type physicalType) {
+ return physicalType.asGroupType().getType(0).asGroupType();
}
+ // ------------------------------------------------------------------------------------------
+ // ParquetField tree construction (vendored from Apache Flink 2.1 ParquetSplitReaderUtil)
+ //
+ // The only Hudi-specific divergence is in `constructField`: the ROW branch tolerates children
+ // missing from the Parquet file by emitting a null ParquetField child (upstream throws). This
+ // matches the Hudi schema-evolution contract and is the companion to the null-child branch in
+ // `NestedColumnReader#readRow` and the null-vector slot in `createWritableColumnVector#ROW`.
+ // ------------------------------------------------------------------------------------------
+
/**
- * Check whether the given list type is a three-level list type.
- * <p>
- * <list-repetition> group <name> (LIST) {
- * repeated group list {
- * <element-repetition> <element-type> element;
- * }
- * }
- *
- * @param type list type
- * @return true if the list type is a three-level list type
+ * Builds {@link ParquetField} trees — one per top-level projected logical column — that feed
+ * {@link NestedColumnReader}. The returned list mirrors the input {@code children} positionally;
+ * primitive top-level fields produce {@code null} entries (callers don't need a tree for those).
+ */
+ public static List<ParquetField> buildFieldsList(
+ List<RowType.RowField> children, List<String> fieldNames, MessageColumnIO columnIO) {
+ List<ParquetField> list = new ArrayList<>();
+ for (int i = 0; i < children.size(); i++) {
+ RowType.RowField child = children.get(i);
+ if (isNestedType(child.getType())) {
+ list.add(constructField(child, lookupColumnByName(columnIO, fieldNames.get(i))));
+ } else {
+ list.add(null);
+ }
+ }
+ return list;
+ }
+
+ private static boolean isNestedType(LogicalType type) {
+ return type instanceof RowType
+ || type instanceof ArrayType
+ || type instanceof MapType
+ || type instanceof MultisetType;
+ }
+
+ @Nullable
+ private static ParquetField constructField(RowType.RowField rowField, ColumnIO columnIO) {
+ boolean required = columnIO.getType().getRepetition() == REQUIRED;
+ int repetitionLevel = columnIO.getRepetitionLevel();
+ int definitionLevel = columnIO.getDefinitionLevel();
+ LogicalType type = rowField.getType();
+ String fieldName = rowField.getName();
+ if (type instanceof RowType) {
+ GroupColumnIO groupColumnIO = (GroupColumnIO) columnIO;
+ RowType rowType = (RowType) type;
+ List<RowType.RowField> childFields = rowType.getFields();
+ List<ParquetField> fieldsList = new ArrayList<>(childFields.size());
+ for (RowType.RowField childField : childFields) {
+ // Hudi schema evolution: a logical child may be absent from the Parquet file. In that
+ // case we emit a null ParquetField so that NestedColumnReader.readRow passes through the
+ // pre-filled null vector instead of recursing.
+ ColumnIO childIo = lookupColumnByNameOrNull(groupColumnIO, childField.getName());
+ if (childIo == null) {
+ fieldsList.add(null);
+ } else {
+ fieldsList.add(constructField(childField, childIo));
+ }
+ }
+ return new ParquetGroupField(
+ type,
+ repetitionLevel,
+ definitionLevel,
+ required,
+ Collections.unmodifiableList(fieldsList));
+ }
+
+ if (type instanceof MapType) {
+ GroupColumnIO groupColumnIO = (GroupColumnIO) columnIO;
+ GroupColumnIO keyValueColumnIO = getMapKeyValueColumn(groupColumnIO);
+ MapType mapType = (MapType) type;
+ ParquetField keyField =
+ constructField(
+ new RowType.RowField("", mapType.getKeyType()),
keyValueColumnIO.getChild(0));
+ ParquetField valueField =
+ constructField(
+ new RowType.RowField("", mapType.getValueType()),
keyValueColumnIO.getChild(1));
+ return new ParquetGroupField(
+ type,
+ repetitionLevel,
+ definitionLevel,
+ required,
+ Collections.unmodifiableList(Arrays.asList(keyField, valueField)));
+ }
+
+ if (type instanceof MultisetType) {
+ GroupColumnIO groupColumnIO = (GroupColumnIO) columnIO;
+ GroupColumnIO keyValueColumnIO = getMapKeyValueColumn(groupColumnIO);
+ MultisetType multisetType = (MultisetType) type;
+ ParquetField keyField =
+ constructField(
+ new RowType.RowField("", multisetType.getElementType()),
+ keyValueColumnIO.getChild(0));
+ ParquetField valueField =
+ constructField(
+ new RowType.RowField("", new IntType()),
keyValueColumnIO.getChild(1));
+ return new ParquetGroupField(
+ type,
+ repetitionLevel,
+ definitionLevel,
+ required,
+ Collections.unmodifiableList(Arrays.asList(keyField, valueField)));
+ }
+
+ if (type instanceof ArrayType) {
+ ArrayType arrayType = (ArrayType) type;
+ ColumnIO elementTypeColumnIO;
+ if (columnIO instanceof GroupColumnIO) {
+ GroupColumnIO groupColumnIO = (GroupColumnIO) columnIO;
+ if (!StringUtils.isNullOrWhitespaceOnly(fieldName)) {
+ while (!Objects.equals(groupColumnIO.getName(), fieldName)) {
+ groupColumnIO = (GroupColumnIO) groupColumnIO.getChild(0);
+ }
+ elementTypeColumnIO = groupColumnIO;
+ } else {
+ if (arrayType.getElementType() instanceof RowType) {
+ elementTypeColumnIO = groupColumnIO;
+ } else {
+ elementTypeColumnIO = groupColumnIO.getChild(0);
+ }
+ }
+ } else if (columnIO instanceof PrimitiveColumnIO) {
+ elementTypeColumnIO = columnIO;
+ } else {
+ throw new FlinkRuntimeException(String.format("Unknown ColumnIO, %s",
columnIO));
+ }
+
+ ParquetField elementField =
+ constructField(
+ new RowType.RowField("", arrayType.getElementType()),
+ getArrayElementColumn(elementTypeColumnIO));
+ if (repetitionLevel == elementField.getRepetitionLevel()) {
+ repetitionLevel = columnIO.getParent().getRepetitionLevel();
+ }
+ return new ParquetGroupField(
+ type,
+ repetitionLevel,
+ definitionLevel,
+ required,
+ Collections.singletonList(elementField));
+ }
+
+ PrimitiveColumnIO primitiveColumnIO = (PrimitiveColumnIO) columnIO;
+ return new ParquetPrimitiveField(
+ type, required, primitiveColumnIO.getColumnDescriptor(), primitiveColumnIO.getId());
+ }
+
+ /**
+ * Parquet column names are case-insensitive in Flink's lookup. Matches upstream
+ * {@code ParquetSplitReaderUtil.lookupColumnByName}; throws when absent.
*/
- private static boolean isThreeLevelList(Type type) {
- if (type.isPrimitive()) {
- return false;
+ public static ColumnIO lookupColumnByName(GroupColumnIO groupColumnIO, String columnName) {
+ ColumnIO columnIO = lookupColumnByNameOrNull(groupColumnIO, columnName);
+ if (columnIO != null) {
+ return columnIO;
}
- GroupType groupType = type.asGroupType();
- OriginalType originalType = groupType.getOriginalType();
- return originalType == OriginalType.LIST
- && groupType.getType(0).getName().equals("list");
+ throw new FlinkRuntimeException(
+ "Can not find column io for parquet reader. Column name: " +
columnName);
}
/**
- * Construct the error message when primitive type mismatches.
- *
- * @param primitiveType Primitive type
- * @param fieldType Logical field type
- * @return The error message
+ * Case-insensitive column lookup that returns {@code null} when no match is found — the
+ * Hudi-specific companion to {@link #lookupColumnByName}, used by {@link #constructField} to
+ * emit null {@link ParquetField} children for fields absent from the Parquet file.
*/
- private static String getPrimitiveTypeCheckFailureMessage(PrimitiveType.PrimitiveTypeName primitiveType, LogicalType fieldType) {
- return String.format("Unexpected type exception. Primitive type: %s. Field type: %s.", primitiveType, fieldType.getTypeRoot().name());
+ @Nullable
+ private static ColumnIO lookupColumnByNameOrNull(
+ GroupColumnIO groupColumnIO, String columnName) {
+ ColumnIO columnIO = groupColumnIO.getChild(columnName);
+ if (columnIO != null) {
+ return columnIO;
+ }
+ for (int i = 0; i < groupColumnIO.getChildrenCount(); i++) {
+ if (groupColumnIO.getChild(i).getName().equalsIgnoreCase(columnName)) {
+ return groupColumnIO.getChild(i);
+ }
+ }
+ return null;
+ }
+
+ public static GroupColumnIO getMapKeyValueColumn(GroupColumnIO groupColumnIO) {
+ while (groupColumnIO.getChildrenCount() == 1) {
+ groupColumnIO = (GroupColumnIO) groupColumnIO.getChild(0);
+ }
+ return groupColumnIO;
+ }
+
+ public static ColumnIO getArrayElementColumn(ColumnIO columnIO) {
+ while (columnIO instanceof GroupColumnIO && !columnIO.getType().isRepetition(REPEATED)) {
+ columnIO = ((GroupColumnIO) columnIO).getChild(0);
+ }
+
+ // Three-level list: skip the synthetic `element` / `list` wrapper when present.
+ if (columnIO instanceof GroupColumnIO
+ && columnIO.getType().getLogicalTypeAnnotation() == null
+ && ((GroupColumnIO) columnIO).getChildrenCount() == 1
+ && !columnIO.getName().equals("array")
+ && !columnIO.getName().equals(columnIO.getParent().getName() +
"_tuple")) {
+ return ((GroupColumnIO) columnIO).getChild(0);
+ }
+ return columnIO;
}
/**
- * Construct the error message when original type mismatches.
+ * Returns the field index with given physical row type {@code groupType} and field name
+ * {@code fieldName}.
*
- * @param originalType Original type
- * @param fieldType Logical field type
- * @return The error message
+ * @return the physical field index or -1 if the field does not exist
+ */
+ private static int getFieldIndexInPhysicalType(String fieldName, GroupType groupType) {
+ return groupType.containsField(fieldName) ? groupType.getFieldIndex(fieldName) : -1;
+ }
+
+ private static String getPrimitiveTypeCheckFailureMessage(
+ PrimitiveType.PrimitiveTypeName primitiveType, LogicalType fieldType) {
+ return String.format(
+ "Unexpected type exception. Primitive type: %s. Field type: %s.",
+ primitiveType, fieldType.getTypeRoot().name());
+ }
+
+ private static String getOriginalTypeCheckFailureMessage(
+ OriginalType originalType, LogicalType fieldType) {
+ return String.format(
+ "Unexpected type exception. Original type: %s. Field type: %s.",
+ originalType, fieldType.getTypeRoot().name());
+ }
+
+ /**
+ * Returns a synthetic null-column reader to fill missing top-level fields. Kept as a convenience
+ * for callers that need to mirror Hudi's original behaviour where a missing column produces an
+ * explicit null-valued reader rather than being omitted from the batch.
*/
- private static String getOriginalTypeCheckFailureMessage(OriginalType originalType, LogicalType fieldType) {
- return String.format("Unexpected type exception. Original type: %s. Field type: %s.", originalType, fieldType.getTypeRoot().name());
+ public static ColumnReader<WritableColumnVector> emptyColumnReader() {
+ return new EmptyColumnReader();
}
}
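
[Reviewer note — not part of the patch] A minimal sketch of how the two new entry points compose, mirroring the wiring this commit adds to ParquetColumnarRowSplitReader below; the variables requestedSchema, requestedTypes, and pages are assumptions standing in for the reader's fields:

    // Sketch only; assumes requestedSchema (MessageType), requestedTypes
    // (LogicalType[]) and pages (PageReadStore) are already in scope.
    MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(requestedSchema);
    List<RowType.RowField> rowFields = new ArrayList<>();
    List<String> names = new ArrayList<>();
    for (int i = 0; i < requestedTypes.length; i++) {
      String name = requestedSchema.getFieldName(i);
      rowFields.add(new RowType.RowField(name, requestedTypes[i]));
      names.add(name);
    }
    // One ParquetField tree per top-level column; primitives yield null entries.
    List<ParquetField> fields =
        ParquetSplitReaderUtil.buildFieldsList(rowFields, names, columnIO);
    for (int i = 0; i < requestedTypes.length; i++) {
      // Nested types take the Dremel path; primitives keep Hudi's readers.
      ColumnReader reader = ParquetSplitReaderUtil.createColumnReader(
          true, requestedTypes[i], requestedSchema.getType(i),
          requestedSchema.getColumns(), pages, fields.get(i));
    }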
diff --git a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapMapColumnVector.java b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapMapColumnVector.java
index a98bdebd707a..14aad22039e0 100644
--- a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapMapColumnVector.java
+++ b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapMapColumnVector.java
@@ -20,6 +20,7 @@ package org.apache.hudi.table.format.cow.vector;
import lombok.Getter;
import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.columnar.ColumnarMapData;
import org.apache.flink.table.data.columnar.vector.ColumnVector;
import org.apache.flink.table.data.columnar.vector.MapColumnVector;
import org.apache.flink.table.data.columnar.vector.heap.AbstractHeapVector;
@@ -27,6 +28,14 @@ import org.apache.flink.table.data.columnar.vector.writable.WritableColumnVector
/**
* This class represents a nullable heap map column vector.
+ *
+ * <p>Mirrors {@code org.apache.flink.table.data.columnar.vector.heap.HeapMapVector} from
+ * Flink 2.1 (FLINK-35702). One deliberate divergence from upstream is preserved for backward
+ * compatibility: the {@code keys} / {@code values} fields are typed
+ * {@link WritableColumnVector} rather than upstream's {@link ColumnVector}, so the existing
+ * Lombok-generated {@code getKeys()} / {@code getValues()} accessors keep their original
+ * signature. Callers wanting the Flink-2.1 contract (a {@code ColumnVector}) use
+ * {@link #getKeyColumnVector()} / {@link #getValueColumnVector()}.
*/
public class HeapMapColumnVector extends AbstractHeapVector
implements WritableColumnVector, MapColumnVector {
@@ -38,10 +47,8 @@ public class HeapMapColumnVector extends AbstractHeapVector
// ---------------------------------------------------------------------------------------------
// Flink 2.1 Dremel-style state. Populated by {@link
- // org.apache.hudi.table.format.cow.vector.reader.NestedColumnReader} (FLINK-35702 port). The
- // legacy {@link #getMap(int)} implementation below continues to use {@code ColumnarGroupMapData}
- // — wiring it through these offsets/lengths happens in a follow-up PR that switches the read
- // path. Left here so the new readers can compile against the additive surface.
+ // org.apache.hudi.table.format.cow.vector.reader.NestedColumnReader} (FLINK-35702 port) and
+ // consumed by {@link #getMap(int)}.
// ---------------------------------------------------------------------------------------------
private long[] offsets;
private long[] lengths;
@@ -102,6 +109,8 @@ public class HeapMapColumnVector extends AbstractHeapVector
@Override
public MapData getMap(int rowId) {
- return new ColumnarGroupMapData(keys, values, rowId);
+ long offset = offsets[rowId];
+ long length = lengths[rowId];
+ return new ColumnarMapData(keys, values, (int) offset, (int) length);
}
}
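
[Reviewer note — not part of the patch] A worked example of the offsets/lengths contract that getMap(int) now consumes; the data values are hypothetical:

    // Two map rows, row 0 = {a -> 1, b -> 2}, row 1 = {c -> 3}, flattened children:
    //   keys    = [a, b, c]
    //   values  = [1, 2, 3]
    //   offsets = [0, 2]   // start of each row's slice in keys/values
    //   lengths = [2, 1]   // entry count per row
    // getMap(1) therefore wraps the keys/values range [2, 3) as a ColumnarMapData.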
diff --git a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/NestedColumnReader.java b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/NestedColumnReader.java
index 27eab298b320..6ae1cc9492ec 100644
--- a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/NestedColumnReader.java
+++ b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/NestedColumnReader.java
@@ -147,6 +147,28 @@ public class NestedColumnReader implements ColumnReader<WritableColumnVector> {
if (rowPosition.getIsNull() != null) {
setFieldNullFlag(rowPosition.getIsNull(), heapRowVector);
}
+
+ // Hudi-specific: collapse a present row whose every child is null into a null row. The
+ // legacy RowColumnReader did this so that a SQL value like `row(null, null)` round-trips
+ // to NULL on read; preserve it here for backward compatibility. Diverges from Flink 2.1,
+ // which would surface it as Row(null, null). Mirrored by the integration test
+ // ITTestHoodieDataSource#testParquetNullChildColumnsRowTypes.
+ int rowCount = rowPosition.getPositionsCount();
+ for (int j = 0; j < rowCount; j++) {
+ if (heapRowVector.isNullAt(j)) {
+ continue;
+ }
+ boolean allChildrenNull = true;
+ for (WritableColumnVector child : finalChildrenVectors) {
+ if (!child.isNullAt(j)) {
+ allChildrenNull = false;
+ break;
+ }
+ }
+ if (allChildrenNull) {
+ heapRowVector.setNullAt(j);
+ }
+ }
return Tuple2.of(levelDelegation, heapRowVector);
}
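
[Reviewer note — not part of the patch] An illustration of the collapse above on a hypothetical batch for a ROW<a INT, b STRING> column with three rows:

    //   a children:      [1,     null,  null]
    //   b children:      ["x",   null,  null]
    //   isNull (Dremel): [false, false, true]   // row 2 was null in the file
    // The loop additionally nulls row 1 because every child is null there, so a
    // written SQL value `row(null, null)` reads back as NULL, matching the legacy
    // RowColumnReader rather than Flink 2.1's Row(null, null).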
diff --git a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetColumnarRowSplitReader.java b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetColumnarRowSplitReader.java
index 3572b117a631..1826419db5d4 100644
--- a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetColumnarRowSplitReader.java
+++ b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetColumnarRowSplitReader.java
@@ -18,7 +18,9 @@
package org.apache.hudi.table.format.cow.vector.reader;
+import org.apache.hudi.table.format.cow.ParquetSplitReaderUtil;
import org.apache.hudi.table.format.cow.vector.ParquetDecimalVector;
+import org.apache.hudi.table.format.cow.vector.type.ParquetField;
import org.apache.flink.formats.parquet.vector.reader.ColumnReader;
import org.apache.flink.table.data.RowData;
@@ -28,6 +30,7 @@ import org.apache.flink.table.data.columnar.vector.VectorizedColumnBatch;
import org.apache.flink.table.data.columnar.vector.writable.WritableColumnVector;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -39,6 +42,8 @@ import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.io.ColumnIOFactory;
+import org.apache.parquet.io.MessageColumnIO;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;
@@ -46,6 +51,7 @@ import org.apache.parquet.schema.Types;
import java.io.Closeable;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
@@ -53,7 +59,6 @@ import java.util.Locale;
import java.util.Map;
import java.util.stream.IntStream;
-import static org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.createColumnReader;
import static org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.createWritableColumnVector;
import static org.apache.parquet.filter2.compat.FilterCompat.get;
import static org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups;
@@ -77,6 +82,14 @@ public class ParquetColumnarRowSplitReader implements Closeable {
private final MessageType requestedSchema;
+ /**
+ * {@link ParquetField} tree per top-level requested column, used by
+ * {@link ParquetSplitReaderUtil#createColumnReader(boolean, LogicalType, Type, List,
+ * PageReadStore, ParquetField)} to drive the Dremel-style {@link NestedColumnReader} for
+ * nested types. Entries are {@code null} for primitive top-level fields. Built once per split.
+ */
+ private final List<ParquetField> requestedFields;
+
/**
* The total number of rows this RecordReader will eventually read. The sum of the rows of all
* the row groups.
@@ -158,6 +171,20 @@ public class ParquetColumnarRowSplitReader implements Closeable {
checkSchema();
+ // Build the ParquetField tree once per split (the Dremel-style nested reader reuses it across
+ // row groups). Only columns with nested logical type get a non-null entry — primitive columns
+ // still use Hudi's specialized ColumnReaders.
+ MessageColumnIO messageColumnIO = new ColumnIOFactory().getColumnIO(requestedSchema);
+ List<RowType.RowField> requestedRowFields = new ArrayList<>(requestedTypes.length);
+ List<String> requestedFieldNames = new ArrayList<>(requestedTypes.length);
+ for (int i = 0; i < requestedTypes.length; i++) {
+ String name = requestedSchema.getFieldName(i);
+ requestedRowFields.add(new RowType.RowField(name, requestedTypes[i]));
+ requestedFieldNames.add(name);
+ }
+ this.requestedFields = ParquetSplitReaderUtil.buildFieldsList(
+ requestedRowFields, requestedFieldNames, messageColumnIO);
+
this.writableVectors = createWritableVectors();
ColumnVector[] columnVectors = patchedVector(selectedFieldNames.length, createReadableVectors(), requestedIndices);
this.columnarBatch = generator.generate(columnVectors);
@@ -340,12 +367,13 @@ public class ParquetColumnarRowSplitReader implements Closeable {
List<ColumnDescriptor> columns = requestedSchema.getColumns();
columnReaders = new ColumnReader[types.size()];
for (int i = 0; i < types.size(); ++i) {
- columnReaders[i] = createColumnReader(
+ columnReaders[i] = ParquetSplitReaderUtil.createColumnReader(
utcTimestamp,
requestedTypes[i],
types.get(i),
columns,
- pages);
+ pages,
+ requestedFields.get(i));
}
totalCountLoadedSoFar += pages.getRowCount();
}
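
[Reviewer note — not part of the patch] Migration sketch for any external caller of the now-deprecated overload; the surrounding variables are assumptions:

    // Before (deprecated): safe only when fieldType is primitive.
    ColumnReader before = ParquetSplitReaderUtil.createColumnReader(
        utcTimestamp, fieldType, physicalType, columns, pages);
    // After: pass the pre-built ParquetField (null for primitive top-level fields)
    // so ARRAY / MAP / MULTISET / ROW columns take the NestedColumnReader path.
    ColumnReader after = ParquetSplitReaderUtil.createColumnReader(
        utcTimestamp, fieldType, physicalType, columns, pages, field);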