This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 4791d6685 [core] Redefine the usage of RecordWithPositionIterator
(#2953)
4791d6685 is described below
commit 4791d66855b227a4f1130a371b5f1f6e2e2a51ab
Author: Zouxxyy <[email protected]>
AuthorDate: Wed Mar 6 17:31:19 2024 +0800
[core] Redefine the usage of RecordWithPositionIterator (#2953)
---
.../paimon/data/columnar/ColumnarRowIterator.java | 41 +++++++++++-----------
.../org/apache/paimon/reader/RecordReader.java | 10 ++----
.../paimon/reader/RecordWithPositionIterator.java | 13 ++++---
.../apache/paimon/format/orc/OrcReaderFactory.java | 3 +-
.../format/parquet/ParquetReaderFactory.java | 3 +-
.../paimon/format/orc/OrcReaderFactoryTest.java | 20 +++++++++++
6 files changed, 50 insertions(+), 40 deletions(-)
diff --git
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java
index e60c39902..6de861af0 100644
---
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java
+++
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java
@@ -38,36 +38,31 @@ public class ColumnarRowIterator extends
RecyclableIterator<InternalRow>
private final Runnable recycler;
private int num;
- private int pos;
- private long rowPosition;
+ private int nextPos;
+ private long nextGlobalPos;
public ColumnarRowIterator(ColumnarRow rowData, @Nullable Runnable
recycler) {
super(recycler);
this.rowData = rowData;
this.recycler = recycler;
- this.rowPosition = 0;
- }
-
- /** Reset the number of rows in the vectorized batch and the start
position in this batch. */
- public void reset(int num) {
- this.num = num;
- this.pos = 0;
}
/**
- * Reset the current record's row position in the file, and it needs to be
ensured that the row
- * position after reset is strictly incremented by 1.
+ * Reset the number of rows in the vectorized batch, the start position in this batch, and the
+ * global position of the next row to be returned.
*/
- public void resetRowPosition(long rowPosition) {
- this.rowPosition = rowPosition;
+ public void reset(int num, long nextGlobalPos) {
+ this.num = num;
+ this.nextPos = 0;
+ this.nextGlobalPos = nextGlobalPos;
}
@Nullable
@Override
public InternalRow next() {
- if (pos < num) {
- rowData.setRowId(pos++);
- rowPosition++;
+ if (nextPos < num) {
+ rowData.setRowId(nextPos++);
+ nextGlobalPos++;
return rowData;
} else {
return null;
@@ -75,8 +70,14 @@ public class ColumnarRowIterator extends
RecyclableIterator<InternalRow>
}
@Override
- public long rowPosition() {
- return rowPosition;
+ public long returnedPosition() {
+ return nextGlobalPos - 1;
+ }
+
+ public ColumnarRowIterator copy(ColumnVector[] vectors) {
+ ColumnarRowIterator newIterator = new
ColumnarRowIterator(rowData.copy(vectors), recycler);
+ newIterator.reset(num, nextGlobalPos);
+ return newIterator;
}
public ColumnarRowIterator mapping(
@@ -90,9 +91,7 @@ public class ColumnarRowIterator extends
RecyclableIterator<InternalRow>
if (indexMapping != null) {
vectors =
VectorMappingUtils.createIndexMappedVectors(indexMapping, vectors);
}
- ColumnarRowIterator iterator = new
ColumnarRowIterator(rowData.copy(vectors), recycler);
- iterator.reset(num);
- return iterator;
+ return copy(vectors);
}
return this;
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/reader/RecordReader.java
b/paimon-common/src/main/java/org/apache/paimon/reader/RecordReader.java
index 815661a0e..276a85571 100644
--- a/paimon-common/src/main/java/org/apache/paimon/reader/RecordReader.java
+++ b/paimon-common/src/main/java/org/apache/paimon/reader/RecordReader.java
@@ -150,18 +150,12 @@ public interface RecordReader<T> extends Closeable {
default void forEachRemainingWithPosition(BiConsumer<Long, ? super T>
action)
throws IOException {
RecordWithPositionIterator<T> batch;
- long rowPosition;
T record;
try {
while ((batch = (RecordWithPositionIterator<T>) readBatch()) !=
null) {
- while (true) {
- rowPosition = batch.rowPosition();
- record = batch.next();
- if (record == null) {
- break;
- }
- action.accept(rowPosition, record);
+ while ((record = batch.next()) != null) {
+ action.accept(batch.returnedPosition(), record);
}
batch.releaseBatch();
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/reader/RecordWithPositionIterator.java
b/paimon-common/src/main/java/org/apache/paimon/reader/RecordWithPositionIterator.java
index 027416cd5..e4778413a 100644
---
a/paimon-common/src/main/java/org/apache/paimon/reader/RecordWithPositionIterator.java
+++
b/paimon-common/src/main/java/org/apache/paimon/reader/RecordWithPositionIterator.java
@@ -33,20 +33,19 @@ import java.util.function.Function;
public interface RecordWithPositionIterator<T> extends
RecordReader.RecordIterator<T> {
/**
- * Get the row position of the row that will be returned by the following
call to {@link
- * RecordReader.RecordIterator#next}.
+ * Get the row position of the row returned by {@link
RecordReader.RecordIterator#next}.
*
* @return the row position from 0 to the number of rows in the file
*/
- long rowPosition();
+ long returnedPosition();
@Override
default <R> RecordWithPositionIterator<R> transform(Function<T, R>
function) {
RecordWithPositionIterator<T> thisIterator = this;
return new RecordWithPositionIterator<R>() {
@Override
- public long rowPosition() {
- return thisIterator.rowPosition();
+ public long returnedPosition() {
+ return thisIterator.returnedPosition();
}
@Nullable
@@ -71,8 +70,8 @@ public interface RecordWithPositionIterator<T> extends
RecordReader.RecordIterat
RecordWithPositionIterator<T> thisIterator = this;
return new RecordWithPositionIterator<T>() {
@Override
- public long rowPosition() {
- return thisIterator.rowPosition();
+ public long returnedPosition() {
+ return thisIterator.returnedPosition();
}
@Nullable
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
index bf8538bcd..696665777 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
@@ -182,8 +182,7 @@ public class OrcReaderFactory implements
FormatReaderFactory {
// because they point to the same data arrays internally design
int batchSize = orcBatch.size;
paimonColumnBatch.setNumRows(batchSize);
- result.reset(batchSize);
- result.resetRowPosition(rowNumber);
+ result.reset(batchSize, rowNumber);
return result;
}
}
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
index ece3bbc7b..2c2985d32 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
@@ -399,8 +399,7 @@ public class ParquetReaderFactory implements
FormatReaderFactory {
}
public RecordIterator<InternalRow> convertAndGetIterator(long
rowNumber) {
- result.reset(columnarBatch.getNumRows());
- result.resetRowPosition(rowNumber);
+ result.reset(columnarBatch.getNumRows(), rowNumber);
return result;
}
}
diff --git
a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcReaderFactoryTest.java
b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcReaderFactoryTest.java
index 8b3a75230..a5160f5c7 100644
---
a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcReaderFactoryTest.java
+++
b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcReaderFactoryTest.java
@@ -197,6 +197,26 @@ class OrcReaderFactoryTest {
}
}
+ @Test
+ void testReadRowPositionWithTransformAndFilter() throws IOException {
+ int randomPooSize = new Random().nextInt(3) + 1;
+ OrcReaderFactory format = createFormat(FLAT_FILE_TYPE, new int[] {2,
0, 1});
+
+ try (RecordReader<InternalRow> reader =
+ format.createReader(new LocalFileIO(), flatFile,
randomPooSize)) {
+ reader.transform(row -> row)
+ .filter(row -> row.getInt(1) % 123 == 0)
+ .forEachRemainingWithPosition(
+ (rowPosition, row) -> {
+ // Check the row position.
+ // Note: in flatFile, the value of field _col0 is the row position + 1,
+ // so we can use it to verify the row position.
+ assertThat(rowPosition +
1).isEqualTo(row.getInt(1));
+ });
+ }
+ }
+
@Test
void testReadDecimalTypeFile() throws IOException {
OrcReaderFactory format = createFormat(DECIMAL_FILE_TYPE, new int[]
{0});