This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new a68385cf2 [core] Introduce ApplyDeletionFileRecordIterator to know
underly iterator and dv (#3257)
a68385cf2 is described below
commit a68385cf2e69014e74ce0e1224a05cc4e7f5e479
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu Apr 25 09:40:36 2024 +0800
[core] Introduce ApplyDeletionFileRecordIterator to know underly iterator
and dv (#3257)
---
...r.java => ApplyDeletionFileRecordIterator.java} | 57 ++++++++++++----------
.../deletionvectors/ApplyDeletionVectorReader.java | 5 +-
2 files changed, 31 insertions(+), 31 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/ApplyDeletionVectorReader.java
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/ApplyDeletionFileRecordIterator.java
similarity index 52%
copy from
paimon-core/src/main/java/org/apache/paimon/deletionvectors/ApplyDeletionVectorReader.java
copy to
paimon-core/src/main/java/org/apache/paimon/deletionvectors/ApplyDeletionFileRecordIterator.java
index ad9288505..8b7a1da5d 100644
---
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/ApplyDeletionVectorReader.java
+++
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/ApplyDeletionFileRecordIterator.java
@@ -18,56 +18,59 @@
package org.apache.paimon.deletionvectors;
+import org.apache.paimon.fs.Path;
import org.apache.paimon.reader.FileRecordIterator;
-import org.apache.paimon.reader.RecordReader;
import javax.annotation.Nullable;
import java.io.IOException;
-import static org.apache.paimon.utils.Preconditions.checkArgument;
-
-/** A {@link RecordReader} which apply {@link DeletionVector} to filter
record. */
-public class ApplyDeletionVectorReader<T> implements RecordReader<T> {
-
- private final RecordReader<T> reader;
+/** A {@link FileRecordIterator} wraps a {@link FileRecordIterator} and {@link
DeletionVector}. */
+public class ApplyDeletionFileRecordIterator<T> implements
FileRecordIterator<T> {
+ private final FileRecordIterator<T> iterator;
private final DeletionVector deletionVector;
- public ApplyDeletionVectorReader(RecordReader<T> reader, DeletionVector
deletionVector) {
- this.reader = reader;
+ public ApplyDeletionFileRecordIterator(
+ FileRecordIterator<T> iterator, DeletionVector deletionVector) {
+ this.iterator = iterator;
this.deletionVector = deletionVector;
}
- public RecordReader<T> reader() {
- return reader;
+ public FileRecordIterator<T> iterator() {
+ return iterator;
}
public DeletionVector deletionVector() {
return deletionVector;
}
- @Nullable
@Override
- public RecordIterator<T> readBatch() throws IOException {
- RecordIterator<T> batch = reader.readBatch();
-
- if (batch == null) {
- return null;
- }
-
- checkArgument(
- batch instanceof FileRecordIterator,
- "There is a bug, RecordIterator in ApplyDeletionVectorReader
must be RecordWithPositionIterator");
+ public long returnedPosition() {
+ return iterator.returnedPosition();
+ }
- FileRecordIterator<T> batchWithPosition = (FileRecordIterator<T>)
batch;
+ @Override
+ public Path filePath() {
+ return iterator.filePath();
+ }
- return batchWithPosition.filter(
- a ->
!deletionVector.isDeleted(batchWithPosition.returnedPosition()));
+ @Nullable
+ @Override
+ public T next() throws IOException {
+ while (true) {
+ T next = iterator.next();
+ if (next == null) {
+ return null;
+ }
+ if (!deletionVector.isDeleted(returnedPosition())) {
+ return next;
+ }
+ }
}
@Override
- public void close() throws IOException {
- reader.close();
+ public void releaseBatch() {
+ iterator.releaseBatch();
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/ApplyDeletionVectorReader.java
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/ApplyDeletionVectorReader.java
index ad9288505..bc00be25e 100644
---
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/ApplyDeletionVectorReader.java
+++
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/ApplyDeletionVectorReader.java
@@ -60,10 +60,7 @@ public class ApplyDeletionVectorReader<T> implements
RecordReader<T> {
batch instanceof FileRecordIterator,
"There is a bug, RecordIterator in ApplyDeletionVectorReader
must be RecordWithPositionIterator");
- FileRecordIterator<T> batchWithPosition = (FileRecordIterator<T>)
batch;
-
- return batchWithPosition.filter(
- a ->
!deletionVector.isDeleted(batchWithPosition.returnedPosition()));
+ return new ApplyDeletionFileRecordIterator<>((FileRecordIterator<T>)
batch, deletionVector);
}
@Override