This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 508411d94 [core] Introduce RecordWithPositionIterator interface (#2916)
508411d94 is described below

commit 508411d94fd68c90ee20eaaa0eecf83efc2cc15f
Author: Zouxxyy <[email protected]>
AuthorDate: Wed Feb 28 19:52:01 2024 +0800

    [core] Introduce RecordWithPositionIterator interface (#2916)
---
 .../paimon/data/columnar/ColumnarRowIterator.java  | 25 +++++-
 .../org/apache/paimon/reader/RecordReader.java     | 28 +++++++
 .../paimon/reader/RecordWithPositionIterator.java  | 98 ++++++++++++++++++++++
 .../apache/paimon/format/orc/OrcReaderFactory.java |  9 +-
 .../format/parquet/ParquetReaderFactory.java       | 13 ++-
 .../paimon/format/orc/OrcReaderFactoryTest.java    | 61 ++++++++++++++
 .../format/parquet/ParquetReadWriteTest.java       | 33 ++++++++
 7 files changed, 258 insertions(+), 9 deletions(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java
index 5283f55f8..e60c39902 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java
@@ -21,6 +21,7 @@ package org.apache.paimon.data.columnar;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.PartitionInfo;
 import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.reader.RecordWithPositionIterator;
 import org.apache.paimon.utils.RecyclableIterator;
 import org.apache.paimon.utils.VectorMappingUtils;
 
@@ -30,36 +31,54 @@ import javax.annotation.Nullable;
  * A {@link RecordReader.RecordIterator} that returns {@link InternalRow}s. 
The next row is set by
  * {@link ColumnarRow#setRowId}.
  */
-public class ColumnarRowIterator extends RecyclableIterator<InternalRow> {
+public class ColumnarRowIterator extends RecyclableIterator<InternalRow>
+        implements RecordWithPositionIterator<InternalRow> {
 
     private final ColumnarRow rowData;
     private final Runnable recycler;
 
     private int num;
     private int pos;
+    private long rowPosition;
 
     public ColumnarRowIterator(ColumnarRow rowData, @Nullable Runnable 
recycler) {
         super(recycler);
         this.rowData = rowData;
         this.recycler = recycler;
+        this.rowPosition = 0;
     }
 
-    public void set(int num) {
+    /** Reset the number of rows in the vectorized batch and the start 
position in this batch. */
+    public void reset(int num) {
         this.num = num;
         this.pos = 0;
     }
 
+    /**
+     * Reset the current record's row position in the file, and it needs to be 
ensured that the row
+     * position after reset is strictly incremented by 1.
+     */
+    public void resetRowPosition(long rowPosition) {
+        this.rowPosition = rowPosition;
+    }
+
     @Nullable
     @Override
     public InternalRow next() {
         if (pos < num) {
             rowData.setRowId(pos++);
+            rowPosition++;
             return rowData;
         } else {
             return null;
         }
     }
 
+    @Override
+    public long rowPosition() {
+        return rowPosition;
+    }
+
     public ColumnarRowIterator mapping(
             @Nullable PartitionInfo partitionInfo, @Nullable int[] 
indexMapping) {
         if (partitionInfo != null || indexMapping != null) {
@@ -72,7 +91,7 @@ public class ColumnarRowIterator extends RecyclableIterator<InternalRow> {
                 vectors = 
VectorMappingUtils.createIndexMappedVectors(indexMapping, vectors);
             }
             ColumnarRowIterator iterator = new 
ColumnarRowIterator(rowData.copy(vectors), recycler);
-            iterator.set(num);
+            iterator.reset(num);
             return iterator;
         }
         return this;
diff --git a/paimon-common/src/main/java/org/apache/paimon/reader/RecordReader.java b/paimon-common/src/main/java/org/apache/paimon/reader/RecordReader.java
index 79fab867d..815661a0e 100644
--- a/paimon-common/src/main/java/org/apache/paimon/reader/RecordReader.java
+++ b/paimon-common/src/main/java/org/apache/paimon/reader/RecordReader.java
@@ -28,6 +28,7 @@ import javax.annotation.Nullable;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.Iterator;
+import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.Function;
 
@@ -142,6 +143,33 @@ public interface RecordReader<T> extends Closeable {
         }
     }
 
+    /**
+     * Performs the given action for each remaining element with row position 
in {@link
+     * RecordReader} until all elements have been processed or the action 
throws an exception.
+     */
+    default void forEachRemainingWithPosition(BiConsumer<Long, ? super T> 
action)
+            throws IOException {
+        RecordWithPositionIterator<T> batch;
+        long rowPosition;
+        T record;
+
+        try {
+            while ((batch = (RecordWithPositionIterator<T>) readBatch()) != 
null) {
+                while (true) {
+                    rowPosition = batch.rowPosition();
+                    record = batch.next();
+                    if (record == null) {
+                        break;
+                    }
+                    action.accept(rowPosition, record);
+                }
+                batch.releaseBatch();
+            }
+        } finally {
+            close();
+        }
+    }
+
     /**
      * Performs the given action for each remaining element in {@link 
RecordReader} until all
      * elements have been processed or the action throws an exception.
diff --git a/paimon-common/src/main/java/org/apache/paimon/reader/RecordWithPositionIterator.java b/paimon-common/src/main/java/org/apache/paimon/reader/RecordWithPositionIterator.java
new file mode 100644
index 000000000..027416cd5
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/reader/RecordWithPositionIterator.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.reader;
+
+import org.apache.paimon.utils.Filter;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.function.Function;
+
+/**
+ * Wrap {@link RecordReader.RecordIterator} to support returning the record's 
row position.
+ *
+ * @param <T> The type of the record.
+ */
+public interface RecordWithPositionIterator<T> extends 
RecordReader.RecordIterator<T> {
+
+    /**
+     * Get the row position of the row that will be returned by the following 
call to {@link
+     * RecordReader.RecordIterator#next}.
+     *
+     * @return the row position from 0 to the number of rows in the file
+     */
+    long rowPosition();
+
+    @Override
+    default <R> RecordWithPositionIterator<R> transform(Function<T, R> 
function) {
+        RecordWithPositionIterator<T> thisIterator = this;
+        return new RecordWithPositionIterator<R>() {
+            @Override
+            public long rowPosition() {
+                return thisIterator.rowPosition();
+            }
+
+            @Nullable
+            @Override
+            public R next() throws IOException {
+                T next = thisIterator.next();
+                if (next == null) {
+                    return null;
+                }
+                return function.apply(next);
+            }
+
+            @Override
+            public void releaseBatch() {
+                thisIterator.releaseBatch();
+            }
+        };
+    }
+
+    @Override
+    default RecordWithPositionIterator<T> filter(Filter<T> filter) {
+        RecordWithPositionIterator<T> thisIterator = this;
+        return new RecordWithPositionIterator<T>() {
+            @Override
+            public long rowPosition() {
+                return thisIterator.rowPosition();
+            }
+
+            @Nullable
+            @Override
+            public T next() throws IOException {
+                while (true) {
+                    T next = thisIterator.next();
+                    if (next == null) {
+                        return null;
+                    }
+                    if (filter.test(next)) {
+                        return next;
+                    }
+                }
+            }
+
+            @Override
+            public void releaseBatch() {
+                thisIterator.releaseBatch();
+            }
+        };
+    }
+}
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
index 0e2a0308e..bf8538bcd 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
@@ -176,12 +176,14 @@ public class OrcReaderFactory implements FormatReaderFactory {
             return orcVectorizedRowBatch;
         }
 
-        private RecordIterator<InternalRow> 
convertAndGetIterator(VectorizedRowBatch orcBatch) {
+        private RecordIterator<InternalRow> convertAndGetIterator(
+                VectorizedRowBatch orcBatch, long rowNumber) {
             // no copying from the ORC column vectors to the Paimon columns 
vectors necessary,
             // because they point to the same data arrays internally design
             int batchSize = orcBatch.size;
             paimonColumnBatch.setNumRows(batchSize);
-            result.set(batchSize);
+            result.reset(batchSize);
+            result.resetRowPosition(rowNumber);
             return result;
         }
     }
@@ -218,12 +220,13 @@ public class OrcReaderFactory implements FormatReaderFactory {
             final OrcReaderBatch batch = getCachedEntry();
             final VectorizedRowBatch orcVectorBatch = 
batch.orcVectorizedRowBatch();
 
+            long rowNumber = orcReader.getRowNumber();
             if (!nextBatch(orcReader, orcVectorBatch)) {
                 batch.recycle();
                 return null;
             }
 
-            return batch.convertAndGetIterator(orcVectorBatch);
+            return batch.convertAndGetIterator(orcVectorBatch, rowNumber);
         }
 
         @Override
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
index 5f64da4e7..ece3bbc7b 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
@@ -259,6 +259,9 @@ public class ParquetReaderFactory implements FormatReaderFactory {
         /** The number of rows that have been reading, including the current 
in flight row group. */
         private long totalCountLoadedSoFar;
 
+        /** The current row's position in the file. */
+        private long currentRowPosition;
+
         /**
          * For each request column, the reader to read this column. This is 
NULL if this column is
          * missing from the file, in which case we populate the attribute with 
NULL.
@@ -277,6 +280,7 @@ public class ParquetReaderFactory implements FormatReaderFactory {
             this.pool = pool;
             this.rowsReturned = 0;
             this.totalCountLoadedSoFar = 0;
+            this.currentRowPosition = 0;
         }
 
         @Nullable
@@ -284,12 +288,13 @@ public class ParquetReaderFactory implements FormatReaderFactory {
         public RecordIterator<InternalRow> readBatch() throws IOException {
             final ParquetReaderBatch batch = getCachedEntry();
 
+            long rowNumber = currentRowPosition;
             if (!nextBatch(batch)) {
                 batch.recycle();
                 return null;
             }
 
-            return batch.convertAndGetIterator();
+            return batch.convertAndGetIterator(rowNumber);
         }
 
         /** Advances to the next batch of rows. Returns false if there are no 
more. */
@@ -315,6 +320,7 @@ public class ParquetReaderFactory implements FormatReaderFactory {
                 }
             }
             rowsReturned += num;
+            currentRowPosition += num;
             batch.columnarBatch.setNumRows(num);
             return true;
         }
@@ -392,8 +398,9 @@ public class ParquetReaderFactory implements FormatReaderFactory {
             recycler.recycle(this);
         }
 
-        public RecordIterator<InternalRow> convertAndGetIterator() {
-            result.set(columnarBatch.getNumRows());
+        public RecordIterator<InternalRow> convertAndGetIterator(long 
rowNumber) {
+            result.reset(columnarBatch.getNumRows());
+            result.resetRowPosition(rowNumber);
             return result;
         }
     }
diff --git a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcReaderFactoryTest.java b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcReaderFactoryTest.java
index 3ffbaca07..8b3a75230 100644
--- a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcReaderFactoryTest.java
+++ b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcReaderFactoryTest.java
@@ -31,7 +31,9 @@ import org.apache.paimon.utils.DecimalUtils;
 import org.apache.paimon.utils.Projection;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
 import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.RepeatedTest;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
@@ -40,6 +42,8 @@ import java.io.InputStream;
 import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
@@ -136,6 +140,63 @@ class OrcReaderFactoryTest {
         assertThat(totalF0.get()).isEqualTo(1844737280400L);
     }
 
+    @Test
+    void testReadRowPosition() throws IOException {
+        OrcReaderFactory format = createFormat(FLAT_FILE_TYPE, new int[] {2, 
0, 1});
+
+        AtomicInteger cnt = new AtomicInteger(0);
+        AtomicLong totalF0 = new AtomicLong(0);
+
+        try (RecordReader<InternalRow> reader = format.createReader(new 
LocalFileIO(), flatFile)) {
+            reader.forEachRemainingWithPosition(
+                    (rowPosition, row) -> {
+                        assertThat(row.isNullAt(0)).isFalse();
+                        assertThat(row.isNullAt(1)).isFalse();
+                        assertThat(row.isNullAt(2)).isFalse();
+                        assertThat(row.getString(0).toString()).isNotNull();
+                        totalF0.addAndGet(row.getInt(1));
+                        assertThat(row.getString(2).toString()).isNotNull();
+                        // check row position
+                        assertThat(rowPosition).isEqualTo(cnt.get());
+                        cnt.incrementAndGet();
+                    });
+        }
+        // check that all rows have been read
+        assertThat(cnt.get()).isEqualTo(1920800);
+        assertThat(totalF0.get()).isEqualTo(1844737280400L);
+    }
+
+    @RepeatedTest(10)
+    void testReadRowPositionWithRandomFilterAndPool() throws IOException {
+        ArrayList<OrcFilters.Predicate> predicates = new ArrayList<>();
+        int randomStart = new Random().nextInt(1920800);
+        int randomPooSize = new Random().nextInt(3) + 1;
+        predicates.add(
+                new OrcFilters.Not(
+                        new OrcFilters.LessThanEquals(
+                                "_col0", PredicateLeaf.Type.LONG, 
randomStart)));
+        OrcReaderFactory format = createFormat(FLAT_FILE_TYPE, new int[] {2, 
0, 1}, predicates);
+
+        AtomicBoolean isFirst = new AtomicBoolean(true);
+
+        try (RecordReader<InternalRow> reader =
+                format.createReader(new LocalFileIO(), flatFile, 
randomPooSize)) {
+            reader.forEachRemainingWithPosition(
+                    (rowPosition, row) -> {
+                        // check filter: _col0 > randomStart
+                        // Note: the accuracy of filter is within flatFile's 
strip size
+                        if (isFirst.get()) {
+                            assertThat(randomStart - 
row.getInt(1)).isLessThan(5000);
+                            isFirst.set(false);
+                        }
+                        // check row position
+                        // Note: in flatFile, field _col0's value is row 
position + 1, we can use it
+                        // to check row position
+                        assertThat(rowPosition + 1).isEqualTo(row.getInt(1));
+                    });
+        }
+    }
+
     @Test
     void testReadDecimalTypeFile() throws IOException {
         OrcReaderFactory format = createFormat(DECIMAL_FILE_TYPE, new int[] 
{0});
diff --git a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
index 6542af029..bf2b7217d 100644
--- a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
+++ b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
@@ -48,6 +48,7 @@ import org.apache.paimon.types.TinyIntType;
 import org.apache.paimon.types.VarCharType;
 import org.apache.paimon.utils.InstantiationUtil;
 
+import org.junit.jupiter.api.RepeatedTest;
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.MethodSource;
@@ -281,6 +282,38 @@ public class ParquetReadWriteTest {
                 });
     }
 
+    @RepeatedTest(10)
+    void testReadRowPosition() throws IOException {
+        int recordNumber = new Random().nextInt(10000) + 1;
+        int batchSize = new Random().nextInt(1000) + 1;
+        int rowGroupSize = new Random().nextInt(1000) + 1;
+        List<InternalRow> records = new ArrayList<>(recordNumber);
+        for (int i = 0; i < recordNumber; i++) {
+            Integer v = i;
+            records.add(newRow(v));
+        }
+
+        Path testPath = createTempParquetFile(folder, records, rowGroupSize);
+
+        DataType[] fieldTypes = new DataType[] {new DoubleType()};
+        ParquetReaderFactory format =
+                new ParquetReaderFactory(
+                        new Options(),
+                        RowType.builder().fields(fieldTypes, new String[] 
{"f7"}).build(),
+                        batchSize);
+
+        AtomicInteger cnt = new AtomicInteger(0);
+        try (RecordReader<InternalRow> reader = format.createReader(new 
LocalFileIO(), testPath)) {
+            reader.forEachRemainingWithPosition(
+                    (rowPosition, row) -> {
+                        assertThat(row.getDouble(0)).isEqualTo(cnt.get());
+                        // check row position
+                        assertThat(rowPosition).isEqualTo(cnt.get());
+                        cnt.incrementAndGet();
+                    });
+        }
+    }
+
     private void innerTestTypes(File folder, List<Integer> records, int 
rowGroupSize)
             throws IOException {
         List<InternalRow> rows = 
records.stream().map(this::newRow).collect(Collectors.toList());

Reply via email to