This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 508411d94 [core] Introduce RecordWithPositionIterator interface (#2916)
508411d94 is described below
commit 508411d94fd68c90ee20eaaa0eecf83efc2cc15f
Author: Zouxxyy <[email protected]>
AuthorDate: Wed Feb 28 19:52:01 2024 +0800
[core] Introduce RecordWithPositionIterator interface (#2916)
---
.../paimon/data/columnar/ColumnarRowIterator.java | 25 +++++-
.../org/apache/paimon/reader/RecordReader.java | 28 +++++++
.../paimon/reader/RecordWithPositionIterator.java | 98 ++++++++++++++++++++++
.../apache/paimon/format/orc/OrcReaderFactory.java | 9 +-
.../format/parquet/ParquetReaderFactory.java | 13 ++-
.../paimon/format/orc/OrcReaderFactoryTest.java | 61 ++++++++++++++
.../format/parquet/ParquetReadWriteTest.java | 33 ++++++++
7 files changed, 258 insertions(+), 9 deletions(-)
diff --git
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java
index 5283f55f8..e60c39902 100644
---
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java
+++
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java
@@ -21,6 +21,7 @@ package org.apache.paimon.data.columnar;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.PartitionInfo;
import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.reader.RecordWithPositionIterator;
import org.apache.paimon.utils.RecyclableIterator;
import org.apache.paimon.utils.VectorMappingUtils;
@@ -30,36 +31,54 @@ import javax.annotation.Nullable;
* A {@link RecordReader.RecordIterator} that returns {@link InternalRow}s.
The next row is set by
* {@link ColumnarRow#setRowId}.
*/
-public class ColumnarRowIterator extends RecyclableIterator<InternalRow> {
+public class ColumnarRowIterator extends RecyclableIterator<InternalRow>
+ implements RecordWithPositionIterator<InternalRow> {
private final ColumnarRow rowData;
private final Runnable recycler;
private int num;
private int pos;
+ private long rowPosition;
public ColumnarRowIterator(ColumnarRow rowData, @Nullable Runnable
recycler) {
super(recycler);
this.rowData = rowData;
this.recycler = recycler;
+ this.rowPosition = 0;
}
- public void set(int num) {
+ /** Reset the number of rows in the vectorized batch and the start
position in this batch. */
+ public void reset(int num) {
this.num = num;
this.pos = 0;
}
+ /**
+ * Reset the current record's row position in the file. The caller must
ensure that the row
+ * position after reset is strictly incremented by 1.
+ */
+ public void resetRowPosition(long rowPosition) {
+ this.rowPosition = rowPosition;
+ }
+
@Nullable
@Override
public InternalRow next() {
if (pos < num) {
rowData.setRowId(pos++);
+ rowPosition++;
return rowData;
} else {
return null;
}
}
+ @Override
+ public long rowPosition() {
+ return rowPosition;
+ }
+
public ColumnarRowIterator mapping(
@Nullable PartitionInfo partitionInfo, @Nullable int[]
indexMapping) {
if (partitionInfo != null || indexMapping != null) {
@@ -72,7 +91,7 @@ public class ColumnarRowIterator extends
RecyclableIterator<InternalRow> {
vectors =
VectorMappingUtils.createIndexMappedVectors(indexMapping, vectors);
}
ColumnarRowIterator iterator = new
ColumnarRowIterator(rowData.copy(vectors), recycler);
- iterator.set(num);
+ iterator.reset(num);
return iterator;
}
return this;
diff --git
a/paimon-common/src/main/java/org/apache/paimon/reader/RecordReader.java
b/paimon-common/src/main/java/org/apache/paimon/reader/RecordReader.java
index 79fab867d..815661a0e 100644
--- a/paimon-common/src/main/java/org/apache/paimon/reader/RecordReader.java
+++ b/paimon-common/src/main/java/org/apache/paimon/reader/RecordReader.java
@@ -28,6 +28,7 @@ import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;
+import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -142,6 +143,33 @@ public interface RecordReader<T> extends Closeable {
}
}
+ /**
+ * Performs the given action for each remaining element with row position
in {@link
+ * RecordReader} until all elements have been processed or the action
throws an exception.
+ */
+ default void forEachRemainingWithPosition(BiConsumer<Long, ? super T>
action)
+ throws IOException {
+ RecordWithPositionIterator<T> batch;
+ long rowPosition;
+ T record;
+
+ try {
+ while ((batch = (RecordWithPositionIterator<T>) readBatch()) !=
null) {
+ while (true) {
+ rowPosition = batch.rowPosition();
+ record = batch.next();
+ if (record == null) {
+ break;
+ }
+ action.accept(rowPosition, record);
+ }
+ batch.releaseBatch();
+ }
+ } finally {
+ close();
+ }
+ }
+
/**
* Performs the given action for each remaining element in {@link
RecordReader} until all
* elements have been processed or the action throws an exception.
diff --git
a/paimon-common/src/main/java/org/apache/paimon/reader/RecordWithPositionIterator.java
b/paimon-common/src/main/java/org/apache/paimon/reader/RecordWithPositionIterator.java
new file mode 100644
index 000000000..027416cd5
--- /dev/null
+++
b/paimon-common/src/main/java/org/apache/paimon/reader/RecordWithPositionIterator.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.reader;
+
+import org.apache.paimon.utils.Filter;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.function.Function;
+
+/**
+ * Wrap {@link RecordReader.RecordIterator} to support returning the record's
row position.
+ *
+ * @param <T> The type of the record.
+ */
+public interface RecordWithPositionIterator<T> extends
RecordReader.RecordIterator<T> {
+
+ /**
+ * Get the row position of the row that will be returned by the following
call to {@link
+ * RecordReader.RecordIterator#next}.
+ *
+ * @return the row position from 0 to the number of rows in the file
+ */
+ long rowPosition();
+
+ @Override
+ default <R> RecordWithPositionIterator<R> transform(Function<T, R>
function) {
+ RecordWithPositionIterator<T> thisIterator = this;
+ return new RecordWithPositionIterator<R>() {
+ @Override
+ public long rowPosition() {
+ return thisIterator.rowPosition();
+ }
+
+ @Nullable
+ @Override
+ public R next() throws IOException {
+ T next = thisIterator.next();
+ if (next == null) {
+ return null;
+ }
+ return function.apply(next);
+ }
+
+ @Override
+ public void releaseBatch() {
+ thisIterator.releaseBatch();
+ }
+ };
+ }
+
+ @Override
+ default RecordWithPositionIterator<T> filter(Filter<T> filter) {
+ RecordWithPositionIterator<T> thisIterator = this;
+ return new RecordWithPositionIterator<T>() {
+ @Override
+ public long rowPosition() {
+ return thisIterator.rowPosition();
+ }
+
+ @Nullable
+ @Override
+ public T next() throws IOException {
+ while (true) {
+ T next = thisIterator.next();
+ if (next == null) {
+ return null;
+ }
+ if (filter.test(next)) {
+ return next;
+ }
+ }
+ }
+
+ @Override
+ public void releaseBatch() {
+ thisIterator.releaseBatch();
+ }
+ };
+ }
+}
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
index 0e2a0308e..bf8538bcd 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
@@ -176,12 +176,14 @@ public class OrcReaderFactory implements
FormatReaderFactory {
return orcVectorizedRowBatch;
}
- private RecordIterator<InternalRow>
convertAndGetIterator(VectorizedRowBatch orcBatch) {
+ private RecordIterator<InternalRow> convertAndGetIterator(
+ VectorizedRowBatch orcBatch, long rowNumber) {
// no copying from the ORC column vectors to the Paimon columns
vectors necessary,
// because they point to the same data arrays internally design
int batchSize = orcBatch.size;
paimonColumnBatch.setNumRows(batchSize);
- result.set(batchSize);
+ result.reset(batchSize);
+ result.resetRowPosition(rowNumber);
return result;
}
}
@@ -218,12 +220,13 @@ public class OrcReaderFactory implements
FormatReaderFactory {
final OrcReaderBatch batch = getCachedEntry();
final VectorizedRowBatch orcVectorBatch =
batch.orcVectorizedRowBatch();
+ long rowNumber = orcReader.getRowNumber();
if (!nextBatch(orcReader, orcVectorBatch)) {
batch.recycle();
return null;
}
- return batch.convertAndGetIterator(orcVectorBatch);
+ return batch.convertAndGetIterator(orcVectorBatch, rowNumber);
}
@Override
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
index 5f64da4e7..ece3bbc7b 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
@@ -259,6 +259,9 @@ public class ParquetReaderFactory implements
FormatReaderFactory {
/** The number of rows that have been reading, including the current
in flight row group. */
private long totalCountLoadedSoFar;
+ /** The current row's position in the file. */
+ private long currentRowPosition;
+
/**
* For each request column, the reader to read this column. This is
NULL if this column is
* missing from the file, in which case we populate the attribute with
NULL.
@@ -277,6 +280,7 @@ public class ParquetReaderFactory implements
FormatReaderFactory {
this.pool = pool;
this.rowsReturned = 0;
this.totalCountLoadedSoFar = 0;
+ this.currentRowPosition = 0;
}
@Nullable
@@ -284,12 +288,13 @@ public class ParquetReaderFactory implements
FormatReaderFactory {
public RecordIterator<InternalRow> readBatch() throws IOException {
final ParquetReaderBatch batch = getCachedEntry();
+ long rowNumber = currentRowPosition;
if (!nextBatch(batch)) {
batch.recycle();
return null;
}
- return batch.convertAndGetIterator();
+ return batch.convertAndGetIterator(rowNumber);
}
/** Advances to the next batch of rows. Returns false if there are no
more. */
@@ -315,6 +320,7 @@ public class ParquetReaderFactory implements
FormatReaderFactory {
}
}
rowsReturned += num;
+ currentRowPosition += num;
batch.columnarBatch.setNumRows(num);
return true;
}
@@ -392,8 +398,9 @@ public class ParquetReaderFactory implements
FormatReaderFactory {
recycler.recycle(this);
}
- public RecordIterator<InternalRow> convertAndGetIterator() {
- result.set(columnarBatch.getNumRows());
+ public RecordIterator<InternalRow> convertAndGetIterator(long
rowNumber) {
+ result.reset(columnarBatch.getNumRows());
+ result.resetRowPosition(rowNumber);
return result;
}
}
diff --git
a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcReaderFactoryTest.java
b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcReaderFactoryTest.java
index 3ffbaca07..8b3a75230 100644
---
a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcReaderFactoryTest.java
+++
b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcReaderFactoryTest.java
@@ -31,7 +31,9 @@ import org.apache.paimon.utils.DecimalUtils;
import org.apache.paimon.utils.Projection;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
@@ -40,6 +42,8 @@ import java.io.InputStream;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.List;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
@@ -136,6 +140,63 @@ class OrcReaderFactoryTest {
assertThat(totalF0.get()).isEqualTo(1844737280400L);
}
+ @Test
+ void testReadRowPosition() throws IOException {
+ OrcReaderFactory format = createFormat(FLAT_FILE_TYPE, new int[] {2,
0, 1});
+
+ AtomicInteger cnt = new AtomicInteger(0);
+ AtomicLong totalF0 = new AtomicLong(0);
+
+ try (RecordReader<InternalRow> reader = format.createReader(new
LocalFileIO(), flatFile)) {
+ reader.forEachRemainingWithPosition(
+ (rowPosition, row) -> {
+ assertThat(row.isNullAt(0)).isFalse();
+ assertThat(row.isNullAt(1)).isFalse();
+ assertThat(row.isNullAt(2)).isFalse();
+ assertThat(row.getString(0).toString()).isNotNull();
+ totalF0.addAndGet(row.getInt(1));
+ assertThat(row.getString(2).toString()).isNotNull();
+ // check row position
+ assertThat(rowPosition).isEqualTo(cnt.get());
+ cnt.incrementAndGet();
+ });
+ }
+ // check that all rows have been read
+ assertThat(cnt.get()).isEqualTo(1920800);
+ assertThat(totalF0.get()).isEqualTo(1844737280400L);
+ }
+
+ @RepeatedTest(10)
+ void testReadRowPositionWithRandomFilterAndPool() throws IOException {
+ ArrayList<OrcFilters.Predicate> predicates = new ArrayList<>();
+ int randomStart = new Random().nextInt(1920800);
+ int randomPooSize = new Random().nextInt(3) + 1;
+ predicates.add(
+ new OrcFilters.Not(
+ new OrcFilters.LessThanEquals(
+ "_col0", PredicateLeaf.Type.LONG,
randomStart)));
+ OrcReaderFactory format = createFormat(FLAT_FILE_TYPE, new int[] {2,
0, 1}, predicates);
+
+ AtomicBoolean isFirst = new AtomicBoolean(true);
+
+ try (RecordReader<InternalRow> reader =
+ format.createReader(new LocalFileIO(), flatFile,
randomPooSize)) {
+ reader.forEachRemainingWithPosition(
+ (rowPosition, row) -> {
+ // check filter: _col0 > randomStart
+ // Note: the accuracy of filter is within flatFile's
stripe size
+ if (isFirst.get()) {
+ assertThat(randomStart -
row.getInt(1)).isLessThan(5000);
+ isFirst.set(false);
+ }
+ // check row position
+ // Note: in flatFile, field _col0's value is row
position + 1, we can use it
+ // to check row position
+ assertThat(rowPosition + 1).isEqualTo(row.getInt(1));
+ });
+ }
+ }
+
@Test
void testReadDecimalTypeFile() throws IOException {
OrcReaderFactory format = createFormat(DECIMAL_FILE_TYPE, new int[]
{0});
diff --git
a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
index 6542af029..bf2b7217d 100644
---
a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
+++
b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
@@ -48,6 +48,7 @@ import org.apache.paimon.types.TinyIntType;
import org.apache.paimon.types.VarCharType;
import org.apache.paimon.utils.InstantiationUtil;
+import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
@@ -281,6 +282,38 @@ public class ParquetReadWriteTest {
});
}
+ @RepeatedTest(10)
+ void testReadRowPosition() throws IOException {
+ int recordNumber = new Random().nextInt(10000) + 1;
+ int batchSize = new Random().nextInt(1000) + 1;
+ int rowGroupSize = new Random().nextInt(1000) + 1;
+ List<InternalRow> records = new ArrayList<>(recordNumber);
+ for (int i = 0; i < recordNumber; i++) {
+ Integer v = i;
+ records.add(newRow(v));
+ }
+
+ Path testPath = createTempParquetFile(folder, records, rowGroupSize);
+
+ DataType[] fieldTypes = new DataType[] {new DoubleType()};
+ ParquetReaderFactory format =
+ new ParquetReaderFactory(
+ new Options(),
+ RowType.builder().fields(fieldTypes, new String[]
{"f7"}).build(),
+ batchSize);
+
+ AtomicInteger cnt = new AtomicInteger(0);
+ try (RecordReader<InternalRow> reader = format.createReader(new
LocalFileIO(), testPath)) {
+ reader.forEachRemainingWithPosition(
+ (rowPosition, row) -> {
+ assertThat(row.getDouble(0)).isEqualTo(cnt.get());
+ // check row position
+ assertThat(rowPosition).isEqualTo(cnt.get());
+ cnt.incrementAndGet();
+ });
+ }
+ }
+
private void innerTestTypes(File folder, List<Integer> records, int
rowGroupSize)
throws IOException {
List<InternalRow> rows =
records.stream().map(this::newRow).collect(Collectors.toList());