This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new b50040ab8e [Parquet] Revert parquet patch #3883 which tries to construct column vectors like orc (#4745)
b50040ab8e is described below
commit b50040ab8ebceef6c15bfe394dfcd15fbe997e17
Author: yuzelin <[email protected]>
AuthorDate: Sat Dec 21 16:39:23 2024 +0800
[Parquet] Revert parquet patch #3883 which tries to construct column vectors like orc (#4745)
---
.../arrow/converter/ArrowBatchConverterTest.java | 64 +++--
.../format/parquet/ParquetReaderFactory.java | 7 +-
.../format/parquet/position/RowPosition.java | 40 +++
.../format/parquet/reader/NestedColumnReader.java | 126 ++++------
.../format/parquet/reader/NestedPositionUtil.java | 87 +++++--
.../reader/NestedPrimitiveColumnReader.java | 70 +-----
.../format/parquet/reader/RowColumnReader.java | 59 +++++
.../format/parquet/ParquetColumnVectorTest.java | 270 +++++++++++++++------
.../format/parquet/ParquetReadWriteTest.java | 32 ++-
9 files changed, 482 insertions(+), 273 deletions(-)
diff --git a/paimon-arrow/src/test/java/org/apache/paimon/arrow/converter/ArrowBatchConverterTest.java b/paimon-arrow/src/test/java/org/apache/paimon/arrow/converter/ArrowBatchConverterTest.java
index aef589d912..96470b72ee 100644
--- a/paimon-arrow/src/test/java/org/apache/paimon/arrow/converter/ArrowBatchConverterTest.java
+++ b/paimon-arrow/src/test/java/org/apache/paimon/arrow/converter/ArrowBatchConverterTest.java
@@ -167,7 +167,7 @@ public class ArrowBatchConverterTest {
rows.add(GenericRow.of(randomRowValues));
}
- return getRecordIterator(PRIMITIVE_TYPE, rows, projection);
+ return getRecordIterator(PRIMITIVE_TYPE, rows, projection, true);
}
@TestTemplate
@@ -244,7 +244,7 @@ public class ArrowBatchConverterTest {
}
RecordReader.RecordIterator<InternalRow> iterator =
- getRecordIterator(nestedArrayType, rows);
+ getRecordIterator(nestedArrayType, rows, null, testMode.equals("per_row"));
try (RootAllocator allocator = new RootAllocator()) {
VectorSchemaRoot vsr = ArrowUtils.createVectorSchemaRoot(nestedArrayType, allocator);
ArrowBatchConverter arrowWriter = createArrowWriter(iterator, nestedArrayType, vsr);
@@ -308,7 +308,8 @@ public class ArrowBatchConverterTest {
expectedMaps.add(map1);
}
- RecordReader.RecordIterator<InternalRow> iterator = getRecordIterator(nestedMapType, rows);
+ RecordReader.RecordIterator<InternalRow> iterator =
+ getRecordIterator(nestedMapType, rows, null, testMode.equals("per_row"));
try (RootAllocator allocator = new RootAllocator()) {
VectorSchemaRoot vsr = ArrowUtils.createVectorSchemaRoot(nestedMapType, allocator);
ArrowBatchConverter arrowWriter = createArrowWriter(iterator, nestedMapType, vsr);
@@ -365,7 +366,11 @@ public class ArrowBatchConverterTest {
InternalRow row3 = GenericRow.of(new GenericMap(map3));
RecordReader.RecordIterator<InternalRow> iterator =
- getRecordIterator(nestedMapRowType, Arrays.asList(row1, row2, row3));
+ getRecordIterator(
+ nestedMapRowType,
+ Arrays.asList(row1, row2, row3),
+ null,
+ testMode.equals("per_row"));
try (RootAllocator allocator = new RootAllocator()) {
VectorSchemaRoot vsr = ArrowUtils.createVectorSchemaRoot(nestedMapRowType, allocator);
ArrowBatchConverter arrowWriter = createArrowWriter(iterator, nestedMapRowType, vsr);
@@ -423,7 +428,8 @@ public class ArrowBatchConverterTest {
rows.add(GenericRow.of(GenericRow.of(randomRowValues)));
}
- RecordReader.RecordIterator<InternalRow> iterator = getRecordIterator(nestedRowType, rows);
+ RecordReader.RecordIterator<InternalRow> iterator =
+ getRecordIterator(nestedRowType, rows, null, testMode.equals("per_row"));
try (RootAllocator allocator = new RootAllocator()) {
VectorSchemaRoot vsr = ArrowUtils.createVectorSchemaRoot(nestedRowType, allocator);
ArrowBatchConverter arrowWriter = createArrowWriter(iterator, nestedRowType, vsr);
@@ -464,7 +470,8 @@ public class ArrowBatchConverterTest {
rows.add(GenericRow.of(i));
}
- RecordReader.RecordIterator<InternalRow> iterator = getRecordIterator(rowType, rows);
+ RecordReader.RecordIterator<InternalRow> iterator =
+ getRecordIterator(rowType, rows, null, true);
try (RootAllocator allocator = new RootAllocator()) {
VectorSchemaRoot vsr = ArrowUtils.createVectorSchemaRoot(rowType, allocator);
ArrowBatchConverter arrowWriter = createArrowWriter(iterator, rowType, vsr);
@@ -515,7 +522,7 @@ public class ArrowBatchConverterTest {
int[] projection = readEmpty ? new int[0] : null;
RecordReader.RecordIterator<InternalRow> iterator =
getApplyDeletionFileRecordIterator(
- rowType, rows, deleted, Collections.singletonList("pk"), projection);
+ rowType, rows, deleted, Collections.singletonList("pk"), projection, true);
if (readEmpty) {
testReadEmpty(iterator, numRows - deleted.size());
} else {
@@ -588,7 +595,12 @@ public class ArrowBatchConverterTest {
Set<Integer> deleted = getDeletedPks(numRows);
RecordReader.RecordIterator<InternalRow> iterator =
getApplyDeletionFileRecordIterator(
- nestedArrayType, rows, deleted, Collections.singletonList("pk"), null);
+ nestedArrayType,
+ rows,
+ deleted,
+ Collections.singletonList("pk"),
+ null,
+ testMode.equals("per_row"));
try (RootAllocator allocator = new RootAllocator()) {
Set<Integer> expectedPks = getExpectedPks(numRows, deleted);
VectorSchemaRoot vsr = ArrowUtils.createVectorSchemaRoot(nestedArrayType, allocator);
@@ -666,7 +678,12 @@ public class ArrowBatchConverterTest {
Set<Integer> deleted = getDeletedPks(numRows);
RecordReader.RecordIterator<InternalRow> iterator =
getApplyDeletionFileRecordIterator(
- nestedMapType, rows, deleted, Collections.singletonList("pk"), null);
+ nestedMapType,
+ rows,
+ deleted,
+ Collections.singletonList("pk"),
+ null,
+ testMode.equals("per_row"));
try (RootAllocator allocator = new RootAllocator()) {
Set<Integer> expectedPks = getExpectedPks(numRows, deleted);
VectorSchemaRoot vsr = ArrowUtils.createVectorSchemaRoot(nestedMapType, allocator);
@@ -735,7 +752,12 @@ public class ArrowBatchConverterTest {
Set<Integer> deleted = getDeletedPks(numRows);
RecordReader.RecordIterator<InternalRow> iterator =
getApplyDeletionFileRecordIterator(
- nestedRowType, rows, deleted, Collections.singletonList("pk"), null);
+ nestedRowType,
+ rows,
+ deleted,
+ Collections.singletonList("pk"),
+ null,
+ testMode.equals("per_row"));
try (RootAllocator allocator = new RootAllocator()) {
Set<Integer> expectedPks = getExpectedPks(numRows, deleted);
VectorSchemaRoot vsr = ArrowUtils.createVectorSchemaRoot(nestedRowType, allocator);
@@ -803,14 +825,15 @@ public class ArrowBatchConverterTest {
}
private RecordReader.RecordIterator<InternalRow> getRecordIterator(
- RowType rowType, List<InternalRow> rows) throws Exception {
- return getRecordIterator(rowType, rows, null);
- }
-
- private RecordReader.RecordIterator<InternalRow> getRecordIterator(
- RowType rowType, List<InternalRow> rows, @Nullable int[] projection) throws Exception {
+ RowType rowType,
+ List<InternalRow> rows,
+ @Nullable int[] projection,
+ boolean canTestParquet)
+ throws Exception {
Map<String, String> options = new HashMap<>();
- options.put(CoreOptions.FILE_FORMAT.key(), RND.nextBoolean() ? "orc" : "parquet");
+ options.put(
+ CoreOptions.FILE_FORMAT.key(),
+ canTestParquet && RND.nextBoolean() ? "parquet" : "orc");
FileStoreTable table = createFileStoreTable(rowType, Collections.emptyList(), options);
StreamTableWrite write = table.newStreamWriteBuilder().newWrite();
@@ -832,12 +855,15 @@ public class ArrowBatchConverterTest {
List<GenericRow> rows,
Set<Integer> deletedPks,
List<String> primaryKeys,
- @Nullable int[] projection)
+ @Nullable int[] projection,
+ boolean canTestParquet)
throws Exception {
Map<String, String> options = new HashMap<>();
options.put(CoreOptions.DELETION_VECTORS_ENABLED.key(), "true");
options.put(CoreOptions.BUCKET.key(), "1");
- options.put(CoreOptions.FILE_FORMAT.key(), RND.nextBoolean() ? "orc" : "parquet");
+ options.put(
+ CoreOptions.FILE_FORMAT.key(),
+ canTestParquet && RND.nextBoolean() ? "parquet" : "orc");
FileStoreTable table = createFileStoreTable(rowType, primaryKeys, options);
StreamTableWrite write = table.newStreamWriteBuilder().newWrite();
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
index 6f8cab2202..910f3031e0 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
@@ -383,13 +383,14 @@ public class ParquetReaderFactory implements FormatReaderFactory {
/** Advances to the next batch of rows. Returns false if there are no more. */
private boolean nextBatch(ParquetReaderBatch batch) throws IOException {
+ if (rowsReturned >= totalRowCount) {
+ return false;
+ }
+
for (WritableColumnVector v : batch.writableVectors) {
v.reset();
}
batch.columnarBatch.setNumRows(0);
- if (rowsReturned >= totalRowCount) {
- return false;
- }
if (rowsReturned == totalCountLoadedSoFar) {
readNextRowGroup();
} else {
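The hunk above is subtle: nextBatch() used to reset the writable vectors before checking whether any rows were left, so one final probing call at end-of-input cleared a batch that would never be refilled. A self-contained toy model of the restored order (stand-in types and values, for illustration only; not the real ParquetReaderFactory API):

    import java.util.Arrays;

    class NextBatchOrderSketch {
        static int rowsReturned = 3;
        static final int TOTAL_ROW_COUNT = 3;
        static int[] vector = {7, 8, 9}; // stands in for batch.writableVectors

        static boolean nextBatch() {
            if (rowsReturned >= TOTAL_ROW_COUNT) {
                return false; // early return: the last batch's data stays intact
            }
            Arrays.fill(vector, 0); // reset only when another batch will be filled
            return true;
        }

        public static void main(String[] args) {
            System.out.println(nextBatch());              // false
            System.out.println(Arrays.toString(vector)); // [7, 8, 9], not cleared
        }
    }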
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/position/RowPosition.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/position/RowPosition.java
new file mode 100644
index 0000000000..fb63783490
--- /dev/null
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/position/RowPosition.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.parquet.position;
+
+import javax.annotation.Nullable;
+
+/** To represent struct's position in repeated type. */
+public class RowPosition {
+ @Nullable private final boolean[] isNull;
+ private final int positionsCount;
+
+ public RowPosition(boolean[] isNull, int positionsCount) {
+ this.isNull = isNull;
+ this.positionsCount = positionsCount;
+ }
+
+ public boolean[] getIsNull() {
+ return isNull;
+ }
+
+ public int getPositionsCount() {
+ return positionsCount;
+ }
+}
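For orientation, a minimal sketch of how a RowPosition is meant to be consumed (a hypothetical helper; the real caller is NestedColumnReader.readRow in the next file, which forwards the flags to the row vector):

    import org.apache.paimon.data.columnar.heap.HeapRowVector;
    import org.apache.paimon.format.parquet.position.RowPosition;

    class RowPositionUsageSketch {
        // Apply a RowPosition's null flags to a struct vector; getIsNull()
        // returns null when no entry is null, so that path is a no-op.
        static void applyNulls(RowPosition position, HeapRowVector vector) {
            boolean[] isNull = position.getIsNull();
            if (isNull == null) {
                return;
            }
            for (int i = 0; i < position.getPositionsCount(); i++) {
                if (isNull[i]) {
                    vector.setNullAt(i); // the whole struct entry at i is null
                }
            }
        }
    }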
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedColumnReader.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedColumnReader.java
index 8f20be2754..3724014e62 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedColumnReader.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedColumnReader.java
@@ -27,6 +27,7 @@ import org.apache.paimon.data.columnar.heap.HeapRowVector;
import org.apache.paimon.data.columnar.writable.WritableColumnVector;
import org.apache.paimon.format.parquet.position.CollectionPosition;
import org.apache.paimon.format.parquet.position.LevelDelegation;
+import org.apache.paimon.format.parquet.position.RowPosition;
import org.apache.paimon.format.parquet.type.ParquetField;
import org.apache.paimon.format.parquet.type.ParquetGroupField;
import org.apache.paimon.format.parquet.type.ParquetPrimitiveField;
@@ -87,26 +88,20 @@ public class NestedColumnReader implements ColumnReader<WritableColumnVector> {
@Override
public void readToVector(int readNumber, WritableColumnVector vector) throws IOException {
- readData(field, readNumber, vector, false, false, false);
+ readData(field, readNumber, vector, false);
}
private Pair<LevelDelegation, WritableColumnVector> readData(
- ParquetField field,
- int readNumber,
- ColumnVector vector,
- boolean inside,
- boolean readRowField,
- boolean readMapKey)
+ ParquetField field, int readNumber, ColumnVector vector, boolean inside)
throws IOException {
if (field.getType() instanceof RowType) {
return readRow((ParquetGroupField) field, readNumber, vector, inside);
} else if (field.getType() instanceof MapType || field.getType() instanceof MultisetType) {
- return readMap((ParquetGroupField) field, readNumber, vector, inside, readRowField);
+ return readMap((ParquetGroupField) field, readNumber, vector, inside);
} else if (field.getType() instanceof ArrayType) {
- return readArray((ParquetGroupField) field, readNumber, vector, inside, readRowField);
+ return readArray((ParquetGroupField) field, readNumber, vector, inside);
} else {
- return readPrimitive(
- (ParquetPrimitiveField) field, readNumber, vector, readRowField, readMapKey);
+ return readPrimitive((ParquetPrimitiveField) field, readNumber, vector);
}
}
@@ -114,60 +109,64 @@ public class NestedColumnReader implements ColumnReader<WritableColumnVector> {
ParquetGroupField field, int readNumber, ColumnVector vector, boolean inside)
throws IOException {
HeapRowVector heapRowVector = (HeapRowVector) vector;
- LevelDelegation longest = null;
+ LevelDelegation levelDelegation = null;
List<ParquetField> children = field.getChildren();
WritableColumnVector[] childrenVectors = heapRowVector.getFields();
WritableColumnVector[] finalChildrenVectors =
new WritableColumnVector[childrenVectors.length];
+
+ int len = -1;
+ boolean[] isNull = null;
+ boolean hasNull = false;
+
for (int i = 0; i < children.size(); i++) {
Pair<LevelDelegation, WritableColumnVector> tuple =
- readData(children.get(i), readNumber, childrenVectors[i], true, true, false);
- LevelDelegation current = tuple.getLeft();
- if (longest == null) {
- longest = current;
- } else if (current.getDefinitionLevel().length > longest.getDefinitionLevel().length) {
- longest = current;
- }
+ readData(children.get(i), readNumber, childrenVectors[i], true);
+ levelDelegation = tuple.getLeft();
finalChildrenVectors[i] = tuple.getRight();
+
+ WritableColumnVector writableColumnVector = tuple.getRight();
+ if (len == -1) {
+ len = ((ElementCountable) writableColumnVector).getLen();
+ isNull = new boolean[len];
+ Arrays.fill(isNull, true);
+ }
+
+ for (int j = 0; j < len; j++) {
+ isNull[j] = isNull[j] && writableColumnVector.isNullAt(j);
+ if (isNull[j]) {
+ hasNull = true;
+ }
+ }
}
- if (longest == null) {
+ if (levelDelegation == null) {
throw new RuntimeException(
String.format("Row field does not have any children: %s.",
field));
}
- int len = ((ElementCountable) finalChildrenVectors[0]).getLen();
- boolean[] isNull = new boolean[len];
- Arrays.fill(isNull, true);
- boolean hasNull = false;
- for (int i = 0; i < len; i++) {
- for (WritableColumnVector child : finalChildrenVectors) {
- isNull[i] = isNull[i] && child.isNullAt(i);
- }
- if (isNull[i]) {
- hasNull = true;
- }
- }
+ RowPosition rowPosition =
+ NestedPositionUtil.calculateRowOffsets(
+ field,
+ levelDelegation.getDefinitionLevel(),
+ levelDelegation.getRepetitionLevel());
// If row was inside the structure, then we need to renew the vector to reset the
// capacity.
if (inside) {
- heapRowVector = new HeapRowVector(len, finalChildrenVectors);
+ heapRowVector =
+ new HeapRowVector(rowPosition.getPositionsCount(), finalChildrenVectors);
} else {
heapRowVector.setFields(finalChildrenVectors);
}
if (hasNull) {
- setFieldNullFlag(isNull, heapRowVector);
+ setFieldNullFalg(isNull, heapRowVector);
}
- return Pair.of(longest, heapRowVector);
+ return Pair.of(levelDelegation, heapRowVector);
}
private Pair<LevelDelegation, WritableColumnVector> readMap(
- ParquetGroupField field,
- int readNumber,
- ColumnVector vector,
- boolean inside,
- boolean readRowField)
+ ParquetGroupField field, int readNumber, ColumnVector vector,
boolean inside)
throws IOException {
HeapMapVector mapVector = (HeapMapVector) vector;
mapVector.reset();
@@ -177,21 +176,9 @@ public class NestedColumnReader implements ColumnReader<WritableColumnVector> {
"Maps must have two type parameters, found %s",
children.size());
Pair<LevelDelegation, WritableColumnVector> keyTuple =
- readData(
- children.get(0),
- readNumber,
- mapVector.getKeyColumnVector(),
- true,
- false,
- true);
+ readData(children.get(0), readNumber, mapVector.getKeyColumnVector(), true);
Pair<LevelDelegation, WritableColumnVector> valueTuple =
- readData(
- children.get(1),
- readNumber,
- mapVector.getValueColumnVector(),
- true,
- false,
- false);
+ readData(children.get(1), readNumber, mapVector.getValueColumnVector(), true);
LevelDelegation levelDelegation = keyTuple.getLeft();
@@ -199,8 +186,7 @@ public class NestedColumnReader implements ColumnReader<WritableColumnVector> {
NestedPositionUtil.calculateCollectionOffsets(
field,
levelDelegation.getDefinitionLevel(),
- levelDelegation.getRepetitionLevel(),
- readRowField);
+ levelDelegation.getRepetitionLevel());
// If map was inside the structure, then we need to renew the vector to reset the
// capacity.
@@ -216,7 +202,7 @@ public class NestedColumnReader implements ColumnReader<WritableColumnVector> {
}
if (collectionPosition.getIsNull() != null) {
- setFieldNullFlag(collectionPosition.getIsNull(), mapVector);
+ setFieldNullFalg(collectionPosition.getIsNull(), mapVector);
}
mapVector.setLengths(collectionPosition.getLength());
@@ -226,11 +212,7 @@ public class NestedColumnReader implements ColumnReader<WritableColumnVector> {
}
private Pair<LevelDelegation, WritableColumnVector> readArray(
- ParquetGroupField field,
- int readNumber,
- ColumnVector vector,
- boolean inside,
- boolean readRowField)
+ ParquetGroupField field, int readNumber, ColumnVector vector, boolean inside)
throws IOException {
HeapArrayVector arrayVector = (HeapArrayVector) vector;
arrayVector.reset();
@@ -240,15 +222,14 @@ public class NestedColumnReader implements ColumnReader<WritableColumnVector> {
"Arrays must have a single type parameter, found %s",
children.size());
Pair<LevelDelegation, WritableColumnVector> tuple =
- readData(children.get(0), readNumber, arrayVector.getChild(), true, false, false);
+ readData(children.get(0), readNumber, arrayVector.getChild(), true);
LevelDelegation levelDelegation = tuple.getLeft();
CollectionPosition collectionPosition =
NestedPositionUtil.calculateCollectionOffsets(
field,
levelDelegation.getDefinitionLevel(),
- levelDelegation.getRepetitionLevel(),
- readRowField);
+ levelDelegation.getRepetitionLevel());
// If array was inside the structure, then we need to renew the vector to reset the
// capacity.
@@ -259,7 +240,7 @@ public class NestedColumnReader implements ColumnReader<WritableColumnVector> {
}
if (collectionPosition.getIsNull() != null) {
- setFieldNullFlag(collectionPosition.getIsNull(), arrayVector);
+ setFieldNullFalg(collectionPosition.getIsNull(), arrayVector);
}
arrayVector.setLengths(collectionPosition.getLength());
arrayVector.setOffsets(collectionPosition.getOffsets());
@@ -267,12 +248,7 @@ public class NestedColumnReader implements ColumnReader<WritableColumnVector> {
}
private Pair<LevelDelegation, WritableColumnVector> readPrimitive(
- ParquetPrimitiveField field,
- int readNumber,
- ColumnVector vector,
- boolean readRowField,
- boolean readMapKey)
- throws IOException {
+ ParquetPrimitiveField field, int readNumber, ColumnVector vector) throws IOException {
ColumnDescriptor descriptor = field.getDescriptor();
NestedPrimitiveColumnReader reader = columnReaders.get(descriptor);
if (reader == null) {
@@ -282,9 +258,7 @@ public class NestedColumnReader implements ColumnReader<WritableColumnVector> {
pages,
isUtcTimestamp,
descriptor.getPrimitiveType(),
- field.getType(),
- readRowField,
- readMapKey);
+ field.getType());
columnReaders.put(descriptor, reader);
}
WritableColumnVector writableColumnVector =
@@ -292,7 +266,7 @@ public class NestedColumnReader implements ColumnReader<WritableColumnVector> {
return Pair.of(reader.getLevelDelegation(), writableColumnVector);
}
- private static void setFieldNullFlag(boolean[] nullFlags, AbstractHeapVector vector) {
+ private static void setFieldNullFalg(boolean[] nullFlags, AbstractHeapVector vector) {
for (int index = 0; index < vector.getLen() && index < nullFlags.length; index++) {
if (nullFlags[index]) {
vector.setNullAt(index);
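Two rules drive the restored readRow above: the struct's own null flags come from comparing each value's definition level with the row's level (calculateRowOffsets, next file), and independently an entry is marked null when every child vector is null at that position. The second rule in isolation, as a runnable sketch over plain arrays (illustrative only):

    import java.util.Arrays;

    class RowNullRuleSketch {
        // A row entry is null only when all of its children are null there.
        static boolean[] rowIsNull(boolean[][] childIsNull, int len) {
            boolean[] isNull = new boolean[len];
            Arrays.fill(isNull, true);
            for (boolean[] child : childIsNull) {
                for (int i = 0; i < len; i++) {
                    isNull[i] = isNull[i] && child[i];
                }
            }
            return isNull;
        }

        public static void main(String[] args) {
            // two children over three rows: only position 1 has every child null
            boolean[][] children = {{false, true, true}, {true, true, false}};
            System.out.println(Arrays.toString(rowIsNull(children, 3)));
            // prints [false, true, false]
        }
    }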
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPositionUtil.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPositionUtil.java
index b43169a40b..99892c8437 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPositionUtil.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPositionUtil.java
@@ -19,6 +19,7 @@
package org.apache.paimon.format.parquet.reader;
import org.apache.paimon.format.parquet.position.CollectionPosition;
+import org.apache.paimon.format.parquet.position.RowPosition;
import org.apache.paimon.format.parquet.type.ParquetField;
import org.apache.paimon.utils.BooleanArrayList;
import org.apache.paimon.utils.LongArrayList;
@@ -28,6 +29,53 @@ import static java.lang.String.format;
/** Utils to calculate nested type position. */
public class NestedPositionUtil {
+ /**
+ * Calculate row offsets according to column's max repetition level, definition level, value's
+ * repetition level and definition level. Each row has three situations:
+ * <li>Row is not defined, because its optional parent field is null; this is decided by its
+ * parent's repetition level
+ * <li>Row is null
+ * <li>Row is defined and not empty.
+ *
+ * @param field field that contains the row column message, including max repetition level and
+ * definition level.
+ * @param fieldRepetitionLevels int array with each value's repetition level.
+ * @param fieldDefinitionLevels int array with each value's definition level.
+ * @return {@link RowPosition} containing the row count and isNull array.
+ */
+ public static RowPosition calculateRowOffsets(
+ ParquetField field, int[] fieldDefinitionLevels, int[] fieldRepetitionLevels) {
+ int rowDefinitionLevel = field.getDefinitionLevel();
+ int rowRepetitionLevel = field.getRepetitionLevel();
+ int nullValuesCount = 0;
+ BooleanArrayList nullRowFlags = new BooleanArrayList(0);
+ for (int i = 0; i < fieldDefinitionLevels.length; i++) {
+ // TODO: this is not correct ?
+ // if (fieldRepetitionLevels[i] > rowRepetitionLevel) {
+ // throw new IllegalStateException(
+ // format(
+ // "In parquet's row type field repetition level should
+ // not larger than row's repetition level. "
+ // + "Row repetition level is %s, row field
+ // repetition level is %s.",
+ // rowRepetitionLevel, fieldRepetitionLevels[i]));
+ // }
+
+ if (fieldDefinitionLevels[i] >= rowDefinitionLevel) {
+ // current row is defined and not empty
+ nullRowFlags.add(false);
+ } else {
+ // current row is null
+ nullRowFlags.add(true);
+ nullValuesCount++;
+ }
+ }
+ if (nullValuesCount == 0) {
+ return new RowPosition(null, fieldDefinitionLevels.length);
+ }
+ return new RowPosition(nullRowFlags.toArray(), nullRowFlags.size());
+ }
+
/**
* Calculate the collection's offsets according to column's max repetition level, definition
* level, value's repetition level and definition level. Each collection (Array or Map) has four
@@ -47,10 +95,7 @@ public class NestedPositionUtil {
* array.
*/
public static CollectionPosition calculateCollectionOffsets(
- ParquetField field,
- int[] definitionLevels,
- int[] repetitionLevels,
- boolean readRowField) {
+ ParquetField field, int[] definitionLevels, int[] repetitionLevels) {
int collectionDefinitionLevel = field.getDefinitionLevel();
int collectionRepetitionLevel = field.getRepetitionLevel() + 1;
int offset = 0;
@@ -63,42 +108,36 @@ public class NestedPositionUtil {
for (int i = 0;
i < definitionLevels.length;
i = getNextCollectionStartIndex(repetitionLevels, collectionRepetitionLevel, i)) {
+ valueCount++;
if (definitionLevels[i] >= collectionDefinitionLevel - 1) {
+ boolean isNull =
+ isOptionalFieldValueNull(definitionLevels[i], collectionDefinitionLevel);
+ nullCollectionFlags.add(isNull);
+ nullValuesCount += isNull ? 1 : 0;
// definitionLevels[i] > collectionDefinitionLevel => Collection is defined and not
// empty
// definitionLevels[i] == collectionDefinitionLevel => Collection is defined but
// empty
- // definitionLevels[i] == collectionDefinitionLevel - 1 => Collection is defined but
- // null
if (definitionLevels[i] > collectionDefinitionLevel) {
- nullCollectionFlags.add(false);
emptyCollectionFlags.add(false);
offset += getCollectionSize(repetitionLevels, collectionRepetitionLevel, i + 1);
} else if (definitionLevels[i] == collectionDefinitionLevel) {
- nullCollectionFlags.add(false);
+ offset++;
emptyCollectionFlags.add(true);
- // don't increase offset for empty values
} else {
- nullCollectionFlags.add(true);
- nullValuesCount++;
- // 1. don't increase offset for null values
- // 2. offsets and emptyCollectionFlags are meaningless for null values, but they
- // must be set at each index for calculating lengths later
+ offset++;
emptyCollectionFlags.add(false);
}
offsets.add(offset);
- valueCount++;
- } else if (definitionLevels[i] == collectionDefinitionLevel - 2 && readRowField) {
- // row field should store null value
+ } else {
+ // when definitionLevels[i] < collectionDefinitionLevel - 1, it means the collection
+ // is
+ // not defined, but we need to regard it as null to avoid getting value wrong.
nullCollectionFlags.add(true);
nullValuesCount++;
+ offsets.add(++offset);
emptyCollectionFlags.add(false);
-
- offsets.add(offset);
- valueCount++;
}
- // else when definitionLevels[i] < collectionDefinitionLevel - 1, it means the
- // collection is not defined, just ignore it
}
long[] offsetsArray = offsets.toArray();
long[] length = calculateLengthByOffsets(emptyCollectionFlags.toArray(), offsetsArray);
@@ -109,6 +148,10 @@ public class NestedPositionUtil {
nullCollectionFlags.toArray(), offsetsArray, length, valueCount);
}
+ public static boolean isOptionalFieldValueNull(int definitionLevel, int maxDefinitionLevel) {
+ return definitionLevel == maxDefinitionLevel - 1;
+ }
+
public static long[] calculateLengthByOffsets(
boolean[] collectionIsEmpty, long[] arrayOffsets) {
LongArrayList lengthList = new LongArrayList(arrayOffsets.length);
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java
index f0a82a6d71..69b0fa5744 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java
@@ -75,14 +75,12 @@ public class NestedPrimitiveColumnReader implements ColumnReader<WritableColumnV
private final ColumnDescriptor descriptor;
private final Type type;
private final DataType dataType;
- private final boolean readRowField;
- private final boolean readMapKey;
/** The dictionary, if this column has dictionary encoding. */
private final ParquetDataColumnReader dictionary;
/** Maximum definition level for this column. */
private final int maxDefLevel;
- private final boolean isUtcTimestamp;
+ private boolean isUtcTimestamp;
/** If true, the current page is dictionary encoded. */
private boolean isCurrentPageDictionaryEncoded;
@@ -106,16 +104,14 @@ public class NestedPrimitiveColumnReader implements ColumnReader<WritableColumnV
private boolean isFirstRow = true;
- private final LastValueContainer lastValue = new LastValueContainer();
+ private Object lastValue;
public NestedPrimitiveColumnReader(
ColumnDescriptor descriptor,
PageReadStore pageReadStore,
boolean isUtcTimestamp,
Type parquetType,
- DataType dataType,
- boolean readRowField,
- boolean readMapKey)
+ DataType dataType)
throws IOException {
this.descriptor = descriptor;
this.type = parquetType;
@@ -123,8 +119,6 @@ public class NestedPrimitiveColumnReader implements ColumnReader<WritableColumnV
this.maxDefLevel = descriptor.getMaxDefinitionLevel();
this.isUtcTimestamp = isUtcTimestamp;
this.dataType = dataType;
- this.readRowField = readRowField;
- this.readMapKey = readMapKey;
DictionaryPage dictionaryPage = pageReader.readDictionaryPage();
@@ -196,9 +190,8 @@ public class NestedPrimitiveColumnReader implements ColumnReader<WritableColumnV
boolean needFilterSkip = pageRowId < rangeStart;
do {
-
- if (!lastValue.shouldSkip && !needFilterSkip) {
- valueList.add(lastValue.value);
+ if (!needFilterSkip) {
+ valueList.add(lastValue);
valueIndex++;
}
} while (readValue() && (repetitionLevel != 0));
@@ -225,27 +218,6 @@ public class NestedPrimitiveColumnReader implements ColumnReader<WritableColumnV
return new LevelDelegation(repetition, definition);
}
- /**
- * An ARRAY[ARRAY[INT]] Example: {[[0, null], [1], [], null], [], null} => [5, 4, 5, 3, 2, 1, 0]
- *
- * <ul>
- * <li>definitionLevel == maxDefLevel => not null value
- * <li>definitionLevel == maxDefLevel - 1 => null value
- * <li>definitionLevel == maxDefLevel - 2 => empty set, skip
- * <li>definitionLevel == maxDefLevel - 3 => null set, skip
- * <li>definitionLevel == maxDefLevel - 4 => empty outer set, skip
- * <li>definitionLevel == maxDefLevel - 5 => null outer set, skip
- * <li>... skip
- * </ul>
- *
- * <p>When (definitionLevel <= maxDefLevel - 2) we skip the value because children ColumnVector
- * for OrcArrayColumnVector don't contain empty and null set value. Stay consistent here.
- *
- * <p>For MAP, the value vector is the same as ARRAY. But the key vector isn't nullable, so just
- * read value when definitionLevel == maxDefLevel.
- *
- * <p>For ROW, RowColumnVector still get null value when definitionLevel == maxDefLevel - 2.
- */
private boolean readValue() throws IOException {
int left = readPageIfNeed();
if (left > 0) {
@@ -255,24 +227,12 @@ public class NestedPrimitiveColumnReader implements ColumnReader<WritableColumnV
if (definitionLevel == maxDefLevel) {
if (isCurrentPageDictionaryEncoded) {
int dictionaryId = dataColumn.readValueDictionaryId();
- lastValue.setValue(dictionaryDecodeValue(dataType, dictionaryId));
+ lastValue = dictionaryDecodeValue(dataType, dictionaryId);
} else {
- lastValue.setValue(readPrimitiveTypedRow(dataType));
+ lastValue = readPrimitiveTypedRow(dataType);
}
} else {
- if (readMapKey) {
- lastValue.skip();
- } else {
- if (definitionLevel == maxDefLevel - 1) {
- // null value inner set
- lastValue.setValue(null);
- } else if (definitionLevel == maxDefLevel - 2 && readRowField) {
- lastValue.setValue(null);
- } else {
- // current set is empty or null
- lastValue.skip();
- }
- }
+ lastValue = null;
}
return true;
} else {
@@ -721,18 +681,4 @@ public class NestedPrimitiveColumnReader implements ColumnReader<WritableColumnV
return 0;
}
}
-
- private static class LastValueContainer {
- protected boolean shouldSkip;
- protected Object value;
-
- protected void setValue(Object value) {
- this.value = value;
- this.shouldSkip = false;
- }
-
- protected void skip() {
- this.shouldSkip = true;
- }
- }
}
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/RowColumnReader.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/RowColumnReader.java
new file mode 100644
index 0000000000..fa2da03ef3
--- /dev/null
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/RowColumnReader.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.parquet.reader;
+
+import org.apache.paimon.data.columnar.heap.HeapRowVector;
+import org.apache.paimon.data.columnar.writable.WritableColumnVector;
+
+import java.io.IOException;
+import java.util.List;
+
+/** Row {@link ColumnReader}. */
+public class RowColumnReader implements ColumnReader<WritableColumnVector> {
+
+ private final List<ColumnReader> fieldReaders;
+
+ public RowColumnReader(List<ColumnReader> fieldReaders) {
+ this.fieldReaders = fieldReaders;
+ }
+
+ @Override
+ public void readToVector(int readNumber, WritableColumnVector vector) throws IOException {
+ HeapRowVector rowVector = (HeapRowVector) vector;
+ WritableColumnVector[] vectors = rowVector.getFields();
+ // row vector null array
+ boolean[] isNulls = new boolean[readNumber];
+ for (int i = 0; i < vectors.length; i++) {
+ fieldReaders.get(i).readToVector(readNumber, vectors[i]);
+
+ for (int j = 0; j < readNumber; j++) {
+ if (i == 0) {
+ isNulls[j] = vectors[i].isNullAt(j);
+ } else {
+ isNulls[j] = isNulls[j] && vectors[i].isNullAt(j);
+ }
+ if (i == vectors.length - 1 && isNulls[j]) {
+ // rowColumnVector[j] is null only when all fields[j] of rowColumnVector[j] is
+ // null
+ rowVector.setNullAt(j);
+ }
+ }
+ }
+ }
+}
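A short usage sketch for the new reader (hypothetical wiring; in the real code path the field readers and the HeapRowVector come from NestedColumnReader's setup). The field readers must line up one-to-one with the vectors inside the row vector:

    import org.apache.paimon.data.columnar.heap.HeapRowVector;
    import org.apache.paimon.format.parquet.reader.ColumnReader;
    import org.apache.paimon.format.parquet.reader.RowColumnReader;

    import java.io.IOException;
    import java.util.List;

    class RowColumnReaderUsageSketch {
        // Read one batch of a struct column: one field reader per struct field.
        static void readBatch(
                List<ColumnReader> fieldReaders, HeapRowVector rowVector, int batchSize)
                throws IOException {
            RowColumnReader reader = new RowColumnReader(fieldReaders);
            reader.readToVector(batchSize, rowVector);
            // rowVector.isNullAt(i) is now true only where every field is null
        }
    }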
diff --git a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetColumnVectorTest.java b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetColumnVectorTest.java
index 453e9f2752..0d862c3963 100644
--- a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetColumnVectorTest.java
+++ b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetColumnVectorTest.java
@@ -30,8 +30,6 @@ import org.apache.paimon.data.columnar.ArrayColumnVector;
import org.apache.paimon.data.columnar.BytesColumnVector;
import org.apache.paimon.data.columnar.ColumnVector;
import org.apache.paimon.data.columnar.IntColumnVector;
-import org.apache.paimon.data.columnar.MapColumnVector;
-import org.apache.paimon.data.columnar.RowColumnVector;
import org.apache.paimon.data.columnar.VectorizedColumnBatch;
import org.apache.paimon.format.FormatReaderContext;
import org.apache.paimon.format.FormatWriter;
@@ -53,6 +51,7 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
@@ -120,12 +119,12 @@ public class ParquetColumnVectorTest {
assertThat(iterator.next()).isNull();
// validate ColumnVector
- ArrayColumnVector arrayColumnVector = (ArrayColumnVector) batch.columns[0];
- expectedData.validateColumnVector(arrayColumnVector, getter);
-
- expectedData.validateInnerChild(
- arrayColumnVector.getColumnVector(), BYTES_COLUMN_VECTOR_STRING_FUNC);
-
+ // ArrayColumnVector arrayColumnVector = (ArrayColumnVector) batch.columns[0];
+ // expectedData.validateColumnVector(arrayColumnVector, getter);
+ //
+ // expectedData.validateInnerChild(
+ // arrayColumnVector.getColumnVector(), BYTES_COLUMN_VECTOR_STRING_FUNC);
+ //
iterator.releaseBatch();
}
@@ -188,16 +187,16 @@ public class ParquetColumnVectorTest {
assertThat(iterator.next()).isNull();
// validate column vector
- ArrayColumnVector arrayColumnVector = (ArrayColumnVector) batch.columns[0];
-
- expectedData.validateOuterArray(arrayColumnVector, getter);
-
- ArrayColumnVector innerArrayColumnVector =
- (ArrayColumnVector) arrayColumnVector.getColumnVector();
- expectedData.validateInnerArray(innerArrayColumnVector, getter);
-
- ColumnVector columnVector = innerArrayColumnVector.getColumnVector();
- expectedData.validateInnerChild(columnVector, BYTES_COLUMN_VECTOR_STRING_FUNC);
+ // ArrayColumnVector arrayColumnVector = (ArrayColumnVector) batch.columns[0];
+ //
+ // expectedData.validateOuterArray(arrayColumnVector, getter);
+ //
+ // ArrayColumnVector innerArrayColumnVector =
+ // (ArrayColumnVector) arrayColumnVector.getColumnVector();
+ // expectedData.validateInnerArray(innerArrayColumnVector, getter);
+ //
+ // ColumnVector columnVector = innerArrayColumnVector.getColumnVector();
+ // expectedData.validateInnerChild(columnVector, BYTES_COLUMN_VECTOR_STRING_FUNC);
}
@Test
@@ -251,11 +250,13 @@ public class ParquetColumnVectorTest {
assertThat(iterator.next()).isNull();
// validate ColumnVector
- MapColumnVector mapColumnVector = (MapColumnVector) batch.columns[0];
- IntColumnVector keyColumnVector = (IntColumnVector) mapColumnVector.getKeyColumnVector();
- validateMapKeyColumnVector(keyColumnVector, expectedData);
- ColumnVector valueColumnVector = mapColumnVector.getValueColumnVector();
- expectedData.validateInnerChild(valueColumnVector, BYTES_COLUMN_VECTOR_STRING_FUNC);
+ // MapColumnVector mapColumnVector = (MapColumnVector) batch.columns[0];
+ // IntColumnVector keyColumnVector = (IntColumnVector)
+ // mapColumnVector.getKeyColumnVector();
+ // validateMapKeyColumnVector(keyColumnVector, expectedData);
+ // ColumnVector valueColumnVector = mapColumnVector.getValueColumnVector();
+ // expectedData.validateInnerChild(valueColumnVector,
+ // BYTES_COLUMN_VECTOR_STRING_FUNC);
iterator.releaseBatch();
}
@@ -330,16 +331,17 @@ public class ParquetColumnVectorTest {
assertThat(iterator.next()).isNull();
// validate column vector
- MapColumnVector mapColumnVector = (MapColumnVector) batch.columns[0];
- IntColumnVector keyColumnVector = (IntColumnVector) mapColumnVector.getKeyColumnVector();
- validateMapKeyColumnVector(keyColumnVector, expectedData);
-
- ArrayColumnVector valueColumnVector =
- (ArrayColumnVector) mapColumnVector.getValueColumnVector();
- expectedData.validateInnerArray(valueColumnVector, getter);
- expectedData.validateInnerChild(
- valueColumnVector.getColumnVector(), BYTES_COLUMN_VECTOR_STRING_FUNC);
-
+ // MapColumnVector mapColumnVector = (MapColumnVector) batch.columns[0];
+ // IntColumnVector keyColumnVector = (IntColumnVector)
+ // mapColumnVector.getKeyColumnVector();
+ // validateMapKeyColumnVector(keyColumnVector, expectedData);
+ //
+ // ArrayColumnVector valueColumnVector =
+ // (ArrayColumnVector) mapColumnVector.getValueColumnVector();
+ // expectedData.validateInnerArray(valueColumnVector, getter);
+ // expectedData.validateInnerChild(
+ // valueColumnVector.getColumnVector(), BYTES_COLUMN_VECTOR_STRING_FUNC);
+ //
iterator.releaseBatch();
}
@@ -448,23 +450,23 @@ public class ParquetColumnVectorTest {
assertThat(iterator.next()).isNull();
// validate ColumnVector
- RowColumnVector rowColumnVector = (RowColumnVector) batch.columns[0];
- VectorizedColumnBatch innerBatch = rowColumnVector.getBatch();
-
- IntColumnVector intColumnVector = (IntColumnVector) innerBatch.columns[0];
- for (int i = 0; i < numRows; i++) {
- Integer f0Value = f0.get(i);
- if (f0Value == null) {
- assertThat(intColumnVector.isNullAt(i)).isTrue();
- } else {
- assertThat(intColumnVector.getInt(i)).isEqualTo(f0Value);
- }
- }
-
- ArrayColumnVector arrayColumnVector = (ArrayColumnVector) innerBatch.columns[1];
- expectedData.validateColumnVector(arrayColumnVector, getter);
- expectedData.validateInnerChild(
- arrayColumnVector.getColumnVector(), BYTES_COLUMN_VECTOR_STRING_FUNC);
+ // RowColumnVector rowColumnVector = (RowColumnVector) batch.columns[0];
+ // VectorizedColumnBatch innerBatch = rowColumnVector.getBatch();
+ //
+ // IntColumnVector intColumnVector = (IntColumnVector) innerBatch.columns[0];
+ // for (int i = 0; i < numRows; i++) {
+ // Integer f0Value = f0.get(i);
+ // if (f0Value == null) {
+ // assertThat(intColumnVector.isNullAt(i)).isTrue();
+ // } else {
+ // assertThat(intColumnVector.getInt(i)).isEqualTo(f0Value);
+ // }
+ // }
+ //
+ // ArrayColumnVector arrayColumnVector = (ArrayColumnVector) innerBatch.columns[1];
+ // expectedData.validateColumnVector(arrayColumnVector, getter);
+ // expectedData.validateInnerChild(
+ // arrayColumnVector.getColumnVector(), BYTES_COLUMN_VECTOR_STRING_FUNC);
iterator.releaseBatch();
}
@@ -557,40 +559,148 @@ public class ParquetColumnVectorTest {
assertThat(iterator.next()).isNull();
// validate ColumnVector
- ArrayColumnVector arrayColumnVector = (ArrayColumnVector) batch.columns[0];
- assertThat(arrayColumnVector.isNullAt(0)).isFalse();
- assertThat(arrayColumnVector.isNullAt(1)).isTrue();
- assertThat(arrayColumnVector.isNullAt(2)).isFalse();
- assertThat(arrayColumnVector.isNullAt(3)).isFalse();
-
- RowColumnVector rowColumnVector = (RowColumnVector) arrayColumnVector.getColumnVector();
- BytesColumnVector f0Vector = (BytesColumnVector) rowColumnVector.getBatch().columns[0];
- for (int i = 0; i < 3; i++) {
- BinaryString s = f0.get(i);
- if (s == null) {
- assertThat(f0Vector.isNullAt(i)).isTrue();
- } else {
- assertThat(new String(f0Vector.getBytes(i).getBytes())).isEqualTo(s.toString());
- }
- }
- ArrayColumnVector f1Vector = (ArrayColumnVector) rowColumnVector.getBatch().columns[1];
- InternalArray internalArray0 = f1Vector.getArray(0);
- assertThat(internalArray0.size()).isEqualTo(2);
- assertThat(internalArray0.isNullAt(0)).isFalse();
- assertThat(internalArray0.isNullAt(1)).isTrue();
+ // ArrayColumnVector arrayColumnVector = (ArrayColumnVector) batch.columns[0];
+ // assertThat(arrayColumnVector.isNullAt(0)).isFalse();
+ // assertThat(arrayColumnVector.isNullAt(1)).isTrue();
+ // assertThat(arrayColumnVector.isNullAt(2)).isFalse();
+ // assertThat(arrayColumnVector.isNullAt(3)).isFalse();
+ //
+ // RowColumnVector rowColumnVector = (RowColumnVector)
+ // arrayColumnVector.getColumnVector();
+ // BytesColumnVector f0Vector = (BytesColumnVector)
+ // rowColumnVector.getBatch().columns[0];
+ // for (int i = 0; i < 3; i++) {
+ // BinaryString s = f0.get(i);
+ // if (s == null) {
+ // assertThat(f0Vector.isNullAt(i)).isTrue();
+ // } else {
+ // assertThat(new
+ // String(f0Vector.getBytes(i).getBytes())).isEqualTo(s.toString());
+ // }
+ // }
+ // ArrayColumnVector f1Vector = (ArrayColumnVector)
+ // rowColumnVector.getBatch().columns[1];
+ // InternalArray internalArray0 = f1Vector.getArray(0);
+ // assertThat(internalArray0.size()).isEqualTo(2);
+ // assertThat(internalArray0.isNullAt(0)).isFalse();
+ // assertThat(internalArray0.isNullAt(1)).isTrue();
+ //
+ // InternalArray internalArray1 = f1Vector.getArray(1);
+ // assertThat(internalArray1.size()).isEqualTo(0);
+ //
+ // InternalArray internalArray2 = f1Vector.getArray(2);
+ // assertThat(internalArray2.size()).isEqualTo(1);
+ // assertThat(internalArray2.isNullAt(0)).isFalse();
+ //
+ // IntColumnVector intColumnVector = (IntColumnVector) f1Vector.getColumnVector();
+ // assertThat(intColumnVector.getInt(0)).isEqualTo(0);
+ // assertThat(intColumnVector.isNullAt(1)).isTrue();
+ // assertThat(intColumnVector.getInt(2)).isEqualTo(1);
- InternalArray internalArray1 = f1Vector.getArray(1);
- assertThat(internalArray1.size()).isEqualTo(0);
+ iterator.releaseBatch();
+ }
- InternalArray internalArray2 = f1Vector.getArray(2);
- assertThat(internalArray2.size()).isEqualTo(1);
- assertThat(internalArray2.isNullAt(0)).isFalse();
+ @Test
+ public void testHighlyNestedSchema() throws IOException {
+ RowType rowType =
+ RowType.builder()
+ .field(
+ "row",
+ RowType.builder()
+ .field("f0",
DataTypes.ARRAY(RowType.of(DataTypes.INT())))
+ .field("f1",
RowType.of(DataTypes.INT()))
+ .build())
+ .build();
- IntColumnVector intColumnVector = (IntColumnVector) f1Vector.getColumnVector();
- assertThat(intColumnVector.getInt(0)).isEqualTo(0);
- assertThat(intColumnVector.isNullAt(1)).isTrue();
- assertThat(intColumnVector.getInt(2)).isEqualTo(1);
+ InternalRow row0 = GenericRow.of((Object) null);
+ InternalRow row1 = GenericRow.of(GenericRow.of(null, GenericRow.of(1)));
+ InternalRow row2 =
+ GenericRow.of(
+ GenericRow.of(
+ new GenericArray(
+ new GenericRow[] {
+ GenericRow.of((Object) null), GenericRow.of(22)
+ }),
+ GenericRow.of((Object) null)));
+ InternalRow row3 =
+ GenericRow.of(GenericRow.of(new GenericArray(new GenericRow[] {null}), null));
+
+ VectorizedRecordIterator iterator =
+ createVectorizedRecordIterator(rowType, Arrays.asList(row0, row1, row2, row3));
+ // validate column vector
+ // VectorizedColumnBatch batch = iterator.batch();
+ // RowColumnVector row = (RowColumnVector) batch.columns[0];
+ //
+ // assertThat(row.isNullAt(0)).isTrue();
+ // assertThat(row.isNullAt(1)).isFalse();
+ // assertThat(row.isNullAt(2)).isFalse();
+ // assertThat(row.isNullAt(3)).isFalse();
+ //
+ // ArrayColumnVector f0 = (ArrayColumnVector) row.getBatch().columns[0];
+ // assertThat(f0.isNullAt(0)).isTrue();
+ // assertThat(f0.isNullAt(1)).isTrue();
+ // assertThat(f0.isNullAt(2)).isFalse();
+ // assertThat(f0.isNullAt(3)).isFalse();
+ //
+ // RowColumnVector arrayRow = (RowColumnVector) f0.getColumnVector();
+ // assertThat(arrayRow.isNullAt(0)).isFalse();
+ // assertThat(arrayRow.isNullAt(1)).isFalse();
+ // assertThat(arrayRow.isNullAt(2)).isTrue();
+ //
+ // IntColumnVector arrayRowInt = (IntColumnVector) arrayRow.getBatch().columns[0];
+ // assertThat(arrayRowInt.isNullAt(0)).isTrue();
+ // assertThat(arrayRowInt.isNullAt(1)).isFalse();
+ // assertThat(arrayRowInt.isNullAt(2)).isTrue();
+ //
+ // assertThat(arrayRowInt.getInt(1)).isEqualTo(22);
+ //
+ // RowColumnVector f1 = (RowColumnVector) row.getBatch().columns[1];
+ // assertThat(f1.isNullAt(0)).isTrue();
+ // assertThat(f1.isNullAt(1)).isFalse();
+ // assertThat(f1.isNullAt(2)).isFalse();
+ // assertThat(f1.isNullAt(3)).isTrue();
+ //
+ // IntColumnVector rowInt = (IntColumnVector) f1.getBatch().columns[0];
+ // assertThat(rowInt.isNullAt(0)).isTrue();
+ // assertThat(rowInt.isNullAt(1)).isFalse();
+ // assertThat(rowInt.isNullAt(2)).isTrue();
+ // assertThat(rowInt.isNullAt(3)).isTrue();
+ //
+ // assertThat(rowInt.getInt(1)).isEqualTo(1);
+
+ // validate per row
+ InternalRow internalRow0 = iterator.next();
+ assertThat(internalRow0.isNullAt(0)).isTrue();
+
+ InternalRow internalRow1 = iterator.next();
+ assertThat(internalRow1.isNullAt(0)).isFalse();
+ InternalRow internalRow1InternalRow = internalRow1.getRow(0, 2);
+ assertThat(internalRow1InternalRow.isNullAt(0)).isTrue();
+ InternalRow internalRow1InternalRowF1 = internalRow1InternalRow.getRow(1, 1);
+ assertThat(internalRow1InternalRowF1.getInt(0)).isEqualTo(1);
+
+ InternalRow internalRow2 = iterator.next();
+ assertThat(internalRow2.isNullAt(0)).isFalse();
+ InternalRow internalRow2InternalRow = internalRow2.getRow(0, 2);
+ InternalArray internalRow2InternalRowF0 = internalRow2InternalRow.getArray(0);
+ assertThat(internalRow2InternalRowF0.size()).isEqualTo(2);
+ InternalRow i0 = internalRow2InternalRowF0.getRow(0, 1);
+ assertThat(i0.isNullAt(0)).isTrue();
+ InternalRow i1 = internalRow2InternalRowF0.getRow(1, 1);
+ assertThat(i1.getInt(0)).isEqualTo(22);
+ InternalRow internalRow2InternalRowF1 = internalRow2InternalRow.getRow(1, 1);
+ assertThat(internalRow2InternalRowF1.isNullAt(0)).isTrue();
+
+ InternalRow internalRow3 = iterator.next();
+ assertThat(internalRow3.isNullAt(0)).isFalse();
+ InternalRow internalRow3InternalRow = internalRow3.getRow(0, 2);
+ InternalArray internalRow3InternalRowF0 = internalRow3InternalRow.getArray(0);
+ assertThat(internalRow3InternalRowF0.size()).isEqualTo(1);
+ assertThat(internalRow3InternalRowF0.isNullAt(0)).isTrue();
+ assertThat(internalRow3InternalRow.isNullAt(1)).isTrue();
+
+ assertThat(iterator.next()).isNull();
iterator.releaseBatch();
}
diff --git a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
index ffe4d60082..7db10bab96 100644
--- a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
+++ b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
@@ -34,7 +34,6 @@ import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.reader.RecordReader;
-import org.apache.paimon.reader.RecordReaderIterator;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.BooleanType;
@@ -176,7 +175,11 @@ public class ParquetReadWriteTest {
new ArrayType(true, new IntType())))
.field("c", new IntType())
.build()),
- new IntType()));
+ new IntType()),
+ RowType.of(
+ new ArrayType(RowType.of(new VarCharType(255))),
+ RowType.of(new IntType()),
+ new VarCharType(255)));
@TempDir public File folder;
@@ -464,7 +467,9 @@ public class ParquetReadWriteTest {
format.createReader(
new FormatReaderContext(
new LocalFileIO(), path, new LocalFileIO().getFileSize(path)));
- compareNestedRow(rows, new RecordReaderIterator<>(reader));
+ List<InternalRow> results = new ArrayList<>(1283);
+ reader.forEachRemaining(results::add);
+ compareNestedRow(rows, results);
}
@Test
@@ -822,7 +827,8 @@ public class ParquetReadWriteTest {
}),
i)
}),
- i)));
+ i),
+ null));
}
return rows;
}
@@ -873,6 +879,7 @@ public class ParquetReadWriteTest {
row1.add(0, i);
Group row2 = rowList.addGroup(0);
row2.add(0, i + 1);
+ f4.addGroup(0);
// add ROW<`f0` ARRAY<ROW<`b` ARRAY<ARRAY<INT>>, `c` INT>>, `f1` INT>>
Group f5 = row.addGroup("f5");
@@ -881,6 +888,7 @@ public class ParquetReadWriteTest {
Group insideArray = insideRow.addGroup(0);
createParquetDoubleNestedArray(insideArray, i);
insideRow.add(1, i);
+ arrayRow.addGroup(0);
f5.add(1, i);
writer.write(row);
}
@@ -918,12 +926,12 @@ public class ParquetReadWriteTest {
}
}
- private void compareNestedRow(
- List<InternalRow> rows, RecordReaderIterator<InternalRow> iterator) throws Exception {
- for (InternalRow origin : rows) {
- assertThat(iterator.hasNext()).isTrue();
- InternalRow result = iterator.next();
+ private void compareNestedRow(List<InternalRow> rows, List<InternalRow> results) {
+ Assertions.assertEquals(rows.size(), results.size());
+ for (InternalRow result : results) {
+ int index = result.getInt(0);
+ InternalRow origin = rows.get(index);
Assertions.assertEquals(origin.getInt(0), result.getInt(0));
// int[]
@@ -1011,9 +1019,11 @@ public class ParquetReadWriteTest {
origin.getRow(5, 2).getArray(0).getRow(0, 2).getInt(1),
result.getRow(5, 2).getArray(0).getRow(0, 2).getInt(1));
Assertions.assertEquals(origin.getRow(5, 2).getInt(1), result.getRow(5, 2).getInt(1));
+ Assertions.assertTrue(result.isNullAt(6));
+ Assertions.assertTrue(result.getRow(6, 2).isNullAt(0));
+ Assertions.assertTrue(result.getRow(6, 2).isNullAt(1));
+ Assertions.assertTrue(result.getRow(6, 2).isNullAt(2));
}
- assertThat(iterator.hasNext()).isFalse();
- iterator.close();
}
private void fillWithMap(Map<String, String> map, InternalMap internalMap, int index) {