This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 4791d6685 [core] Redefine the usage of RecordWithPositionIterator (#2953)
4791d6685 is described below

commit 4791d66855b227a4f1130a371b5f1f6e2e2a51ab
Author: Zouxxyy <[email protected]>
AuthorDate: Wed Mar 6 17:31:19 2024 +0800

    [core] Redefine the usage of RecordWithPositionIterator (#2953)
---
 .../paimon/data/columnar/ColumnarRowIterator.java  | 41 +++++++++++-----------
 .../org/apache/paimon/reader/RecordReader.java     | 10 ++----
 .../paimon/reader/RecordWithPositionIterator.java  | 13 ++++---
 .../apache/paimon/format/orc/OrcReaderFactory.java |  3 +-
 .../format/parquet/ParquetReaderFactory.java       |  3 +-
 .../paimon/format/orc/OrcReaderFactoryTest.java    | 20 +++++++++++
 6 files changed, 50 insertions(+), 40 deletions(-)

diff --git 
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java
 
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java
index e60c39902..6de861af0 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java
@@ -38,36 +38,31 @@ public class ColumnarRowIterator extends 
RecyclableIterator<InternalRow>
     private final Runnable recycler;
 
     private int num;
-    private int pos;
-    private long rowPosition;
+    private int nextPos;
+    private long nextGlobalPos;
 
     public ColumnarRowIterator(ColumnarRow rowData, @Nullable Runnable 
recycler) {
         super(recycler);
         this.rowData = rowData;
         this.recycler = recycler;
-        this.rowPosition = 0;
-    }
-
-    /** Reset the number of rows in the vectorized batch and the start 
position in this batch. */
-    public void reset(int num) {
-        this.num = num;
-        this.pos = 0;
     }
 
     /**
-     * Reset the current record's row position in the file, and it needs to be 
ensured that the row
-     * position after reset is strictly incremented by 1.
+     * Reset the number of rows in the vectorized batch, the start position in 
this batch and the
+     * global position.
      */
-    public void resetRowPosition(long rowPosition) {
-        this.rowPosition = rowPosition;
+    public void reset(int num, long nextGlobalPos) {
+        this.num = num;
+        this.nextPos = 0;
+        this.nextGlobalPos = nextGlobalPos;
     }
 
     @Nullable
     @Override
     public InternalRow next() {
-        if (pos < num) {
-            rowData.setRowId(pos++);
-            rowPosition++;
+        if (nextPos < num) {
+            rowData.setRowId(nextPos++);
+            nextGlobalPos++;
             return rowData;
         } else {
             return null;
@@ -75,8 +70,14 @@ public class ColumnarRowIterator extends 
RecyclableIterator<InternalRow>
     }
 
     @Override
-    public long rowPosition() {
-        return rowPosition;
+    public long returnedPosition() {
+        return nextGlobalPos - 1;
+    }
+
+    public ColumnarRowIterator copy(ColumnVector[] vectors) {
+        ColumnarRowIterator newIterator = new 
ColumnarRowIterator(rowData.copy(vectors), recycler);
+        newIterator.reset(num, nextGlobalPos);
+        return newIterator;
     }
 
     public ColumnarRowIterator mapping(
@@ -90,9 +91,7 @@ public class ColumnarRowIterator extends 
RecyclableIterator<InternalRow>
             if (indexMapping != null) {
                 vectors = 
VectorMappingUtils.createIndexMappedVectors(indexMapping, vectors);
             }
-            ColumnarRowIterator iterator = new 
ColumnarRowIterator(rowData.copy(vectors), recycler);
-            iterator.reset(num);
-            return iterator;
+            return copy(vectors);
         }
         return this;
     }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/reader/RecordReader.java 
b/paimon-common/src/main/java/org/apache/paimon/reader/RecordReader.java
index 815661a0e..276a85571 100644
--- a/paimon-common/src/main/java/org/apache/paimon/reader/RecordReader.java
+++ b/paimon-common/src/main/java/org/apache/paimon/reader/RecordReader.java
@@ -150,18 +150,12 @@ public interface RecordReader<T> extends Closeable {
     default void forEachRemainingWithPosition(BiConsumer<Long, ? super T> 
action)
             throws IOException {
         RecordWithPositionIterator<T> batch;
-        long rowPosition;
         T record;
 
         try {
             while ((batch = (RecordWithPositionIterator<T>) readBatch()) != 
null) {
-                while (true) {
-                    rowPosition = batch.rowPosition();
-                    record = batch.next();
-                    if (record == null) {
-                        break;
-                    }
-                    action.accept(rowPosition, record);
+                while ((record = batch.next()) != null) {
+                    action.accept(batch.returnedPosition(), record);
                 }
                 batch.releaseBatch();
             }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/reader/RecordWithPositionIterator.java
 
b/paimon-common/src/main/java/org/apache/paimon/reader/RecordWithPositionIterator.java
index 027416cd5..e4778413a 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/reader/RecordWithPositionIterator.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/reader/RecordWithPositionIterator.java
@@ -33,20 +33,19 @@ import java.util.function.Function;
 public interface RecordWithPositionIterator<T> extends 
RecordReader.RecordIterator<T> {
 
     /**
-     * Get the row position of the row that will be returned by the following 
call to {@link
-     * RecordReader.RecordIterator#next}.
+     * Get the row position of the row returned by {@link 
RecordReader.RecordIterator#next}.
      *
      * @return the row position from 0 to the number of rows in the file
      */
-    long rowPosition();
+    long returnedPosition();
 
     @Override
     default <R> RecordWithPositionIterator<R> transform(Function<T, R> 
function) {
         RecordWithPositionIterator<T> thisIterator = this;
         return new RecordWithPositionIterator<R>() {
             @Override
-            public long rowPosition() {
-                return thisIterator.rowPosition();
+            public long returnedPosition() {
+                return thisIterator.returnedPosition();
             }
 
             @Nullable
@@ -71,8 +70,8 @@ public interface RecordWithPositionIterator<T> extends 
RecordReader.RecordIterat
         RecordWithPositionIterator<T> thisIterator = this;
         return new RecordWithPositionIterator<T>() {
             @Override
-            public long rowPosition() {
-                return thisIterator.rowPosition();
+            public long returnedPosition() {
+                return thisIterator.returnedPosition();
             }
 
             @Nullable
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
index bf8538bcd..696665777 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
@@ -182,8 +182,7 @@ public class OrcReaderFactory implements 
FormatReaderFactory {
             // because they point to the same data arrays internally design
             int batchSize = orcBatch.size;
             paimonColumnBatch.setNumRows(batchSize);
-            result.reset(batchSize);
-            result.resetRowPosition(rowNumber);
+            result.reset(batchSize, rowNumber);
             return result;
         }
     }
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
index ece3bbc7b..2c2985d32 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
@@ -399,8 +399,7 @@ public class ParquetReaderFactory implements 
FormatReaderFactory {
         }
 
         public RecordIterator<InternalRow> convertAndGetIterator(long 
rowNumber) {
-            result.reset(columnarBatch.getNumRows());
-            result.resetRowPosition(rowNumber);
+            result.reset(columnarBatch.getNumRows(), rowNumber);
             return result;
         }
     }
diff --git 
a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcReaderFactoryTest.java
 
b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcReaderFactoryTest.java
index 8b3a75230..a5160f5c7 100644
--- 
a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcReaderFactoryTest.java
+++ 
b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcReaderFactoryTest.java
@@ -197,6 +197,26 @@ class OrcReaderFactoryTest {
         }
     }
 
+    @Test
+    void testReadRowPositionWithTransformAndFilter() throws IOException {
+        int randomPooSize = new Random().nextInt(3) + 1;
+        OrcReaderFactory format = createFormat(FLAT_FILE_TYPE, new int[] {2, 
0, 1});
+
+        try (RecordReader<InternalRow> reader =
+                format.createReader(new LocalFileIO(), flatFile, 
randomPooSize)) {
+            reader.transform(row -> row)
+                    .filter(row -> row.getInt(1) % 123 == 0)
+                    .forEachRemainingWithPosition(
+                            (rowPosition, row) -> {
+                                // check row position
+                                // Note: in flatFile, field _col0's value is 
row position + 1, we
+                                // can use it
+                                // to check row position
+                                assertThat(rowPosition + 
1).isEqualTo(row.getInt(1));
+                            });
+        }
+    }
+
     @Test
     void testReadDecimalTypeFile() throws IOException {
         OrcReaderFactory format = createFormat(DECIMAL_FILE_TYPE, new int[] 
{0});

Reply via email to