This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new a68385cf2 [core] Introduce ApplyDeletionFileRecordIterator to know 
underlying iterator and dv (#3257)
a68385cf2 is described below

commit a68385cf2e69014e74ce0e1224a05cc4e7f5e479
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu Apr 25 09:40:36 2024 +0800

    [core] Introduce ApplyDeletionFileRecordIterator to know underlying iterator 
and dv (#3257)
---
 ...r.java => ApplyDeletionFileRecordIterator.java} | 57 ++++++++++++----------
 .../deletionvectors/ApplyDeletionVectorReader.java |  5 +-
 2 files changed, 31 insertions(+), 31 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/ApplyDeletionVectorReader.java
 
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/ApplyDeletionFileRecordIterator.java
similarity index 52%
copy from 
paimon-core/src/main/java/org/apache/paimon/deletionvectors/ApplyDeletionVectorReader.java
copy to 
paimon-core/src/main/java/org/apache/paimon/deletionvectors/ApplyDeletionFileRecordIterator.java
index ad9288505..8b7a1da5d 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/ApplyDeletionVectorReader.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/ApplyDeletionFileRecordIterator.java
@@ -18,56 +18,59 @@
 
 package org.apache.paimon.deletionvectors;
 
+import org.apache.paimon.fs.Path;
 import org.apache.paimon.reader.FileRecordIterator;
-import org.apache.paimon.reader.RecordReader;
 
 import javax.annotation.Nullable;
 
 import java.io.IOException;
 
-import static org.apache.paimon.utils.Preconditions.checkArgument;
-
-/** A {@link RecordReader} which apply {@link DeletionVector} to filter 
record. */
-public class ApplyDeletionVectorReader<T> implements RecordReader<T> {
-
-    private final RecordReader<T> reader;
+/** A {@link FileRecordIterator} that wraps another one and applies a {@link 
DeletionVector}. */
+public class ApplyDeletionFileRecordIterator<T> implements 
FileRecordIterator<T> {
 
+    private final FileRecordIterator<T> iterator;
     private final DeletionVector deletionVector;
 
-    public ApplyDeletionVectorReader(RecordReader<T> reader, DeletionVector 
deletionVector) {
-        this.reader = reader;
+    public ApplyDeletionFileRecordIterator(
+            FileRecordIterator<T> iterator, DeletionVector deletionVector) {
+        this.iterator = iterator;
         this.deletionVector = deletionVector;
     }
 
-    public RecordReader<T> reader() {
-        return reader;
+    public FileRecordIterator<T> iterator() {
+        return iterator;
     }
 
     public DeletionVector deletionVector() {
         return deletionVector;
     }
 
-    @Nullable
     @Override
-    public RecordIterator<T> readBatch() throws IOException {
-        RecordIterator<T> batch = reader.readBatch();
-
-        if (batch == null) {
-            return null;
-        }
-
-        checkArgument(
-                batch instanceof FileRecordIterator,
-                "There is a bug, RecordIterator in ApplyDeletionVectorReader 
must be RecordWithPositionIterator");
+    public long returnedPosition() {
+        return iterator.returnedPosition();
+    }
 
-        FileRecordIterator<T> batchWithPosition = (FileRecordIterator<T>) 
batch;
+    @Override
+    public Path filePath() {
+        return iterator.filePath();
+    }
 
-        return batchWithPosition.filter(
-                a -> 
!deletionVector.isDeleted(batchWithPosition.returnedPosition()));
+    @Nullable
+    @Override
+    public T next() throws IOException {
+        while (true) {
+            T next = iterator.next();
+            if (next == null) {
+                return null;
+            }
+            if (!deletionVector.isDeleted(returnedPosition())) {
+                return next;
+            }
+        }
     }
 
     @Override
-    public void close() throws IOException {
-        reader.close();
+    public void releaseBatch() {
+        iterator.releaseBatch();
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/ApplyDeletionVectorReader.java
 
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/ApplyDeletionVectorReader.java
index ad9288505..bc00be25e 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/ApplyDeletionVectorReader.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/ApplyDeletionVectorReader.java
@@ -60,10 +60,7 @@ public class ApplyDeletionVectorReader<T> implements 
RecordReader<T> {
                 batch instanceof FileRecordIterator,
                 "There is a bug, RecordIterator in ApplyDeletionVectorReader 
must be RecordWithPositionIterator");
 
-        FileRecordIterator<T> batchWithPosition = (FileRecordIterator<T>) 
batch;
-
-        return batchWithPosition.filter(
-                a -> 
!deletionVector.isDeleted(batchWithPosition.returnedPosition()));
+        return new ApplyDeletionFileRecordIterator<>((FileRecordIterator<T>) 
batch, deletionVector);
     }
 
     @Override

Reply via email to