This is an automated email from the ASF dual-hosted git repository.

junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new bf24f8bd2 [core] Expose reader and dv from ApplyDeletionVectorReader (#3244)
bf24f8bd2 is described below

commit bf24f8bd28ba1be7b270e7f5c5cfef523441b87e
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Apr 22 15:49:15 2024 +0800

    [core] Expose reader and dv from ApplyDeletionVectorReader (#3244)
---
 .../apache/paimon/reader/EmptyRecordReader.java    | 35 ++++++++++++
 .../deletionvectors/ApplyDeletionVectorReader.java |  8 +++
 ...ndexRecordReader.java => FileIndexSkipper.java} | 32 ++---------
 .../apache/paimon/operation/RawFileSplitRead.java  | 63 +++++++++++-----------
 4 files changed, 79 insertions(+), 59 deletions(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/reader/EmptyRecordReader.java b/paimon-common/src/main/java/org/apache/paimon/reader/EmptyRecordReader.java
new file mode 100644
index 000000000..5ffd2701e
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/reader/EmptyRecordReader.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.reader;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/** An empty {@link RecordReader}. */
+public class EmptyRecordReader<T> implements RecordReader<T> {
+    @Nullable
+    @Override
+    public RecordIterator<T> readBatch() throws IOException {
+        return null;
+    }
+
+    @Override
+    public void close() throws IOException {}
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/ApplyDeletionVectorReader.java b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/ApplyDeletionVectorReader.java
index 6cc8b396f..ad9288505 100644
--- a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/ApplyDeletionVectorReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/ApplyDeletionVectorReader.java
@@ -39,6 +39,14 @@ public class ApplyDeletionVectorReader<T> implements RecordReader<T> {
         this.deletionVector = deletionVector;
     }
 
+    public RecordReader<T> reader() {
+        return reader;
+    }
+
+    public DeletionVector deletionVector() {
+        return deletionVector;
+    }
+
     @Nullable
     @Override
     public RecordIterator<T> readBatch() throws IOException {
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/FileIndexRecordReader.java b/paimon-core/src/main/java/org/apache/paimon/io/FileIndexSkipper.java
similarity index 73%
rename from paimon-core/src/main/java/org/apache/paimon/io/FileIndexRecordReader.java
rename to paimon-core/src/main/java/org/apache/paimon/io/FileIndexSkipper.java
index 0a91d4f7f..0c4ac82a0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/FileIndexRecordReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/FileIndexSkipper.java
@@ -18,35 +18,26 @@
 
 package org.apache.paimon.io;
 
-import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.fileindex.FileIndexPredicate;
 import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.mergetree.compact.ConcatRecordReader;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
-import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.schema.TableSchema;
 
-import javax.annotation.Nullable;
-
 import java.io.IOException;
 import java.util.List;
 import java.util.stream.Collectors;
 
 /** File index reader, do the filter in the constructor. */
-public class FileIndexRecordReader implements RecordReader<InternalRow> {
-
-    private final RecordReader<InternalRow> reader;
+public class FileIndexSkipper {
 
-    public FileIndexRecordReader(
+    public static boolean skip(
             FileIO fileIO,
             TableSchema dataSchema,
             List<Predicate> dataFilter,
             DataFilePathFactory dataFilePathFactory,
-            DataFileMeta file,
-            ConcatRecordReader.ReaderSupplier<InternalRow> readerSupplier)
+            DataFileMeta file)
             throws IOException {
-        boolean filterThisFile = false;
         if (dataFilter != null && !dataFilter.isEmpty()) {
             List<String> indexFiles =
                     file.extraFiles().stream()
@@ -66,25 +57,12 @@ public class FileIndexRecordReader implements RecordReader<InternalRow> {
                                 dataSchema.logicalRowType())) {
                     if (!predicate.testPredicate(
                             PredicateBuilder.and(dataFilter.toArray(new Predicate[0])))) {
-                        filterThisFile = true;
+                        return true;
                     }
                 }
             }
         }
 
-        this.reader = filterThisFile ? null : readerSupplier.get();
-    }
-
-    @Nullable
-    @Override
-    public RecordIterator<InternalRow> readBatch() throws IOException {
-        return reader == null ? null : reader.readBatch();
-    }
-
-    @Override
-    public void close() throws IOException {
-        if (reader != null) {
-            reader.close();
-        }
+        return false;
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
index cf3b76e67..fb68823ba 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
@@ -30,11 +30,12 @@ import org.apache.paimon.format.FormatReaderFactory;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataFilePathFactory;
-import org.apache.paimon.io.FileIndexRecordReader;
+import org.apache.paimon.io.FileIndexSkipper;
 import org.apache.paimon.io.FileRecordReader;
 import org.apache.paimon.mergetree.compact.ConcatRecordReader;
 import org.apache.paimon.partition.PartitionUtils;
 import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.reader.EmptyRecordReader;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.schema.IndexCastMapping;
 import org.apache.paimon.schema.SchemaEvolutionUtil;
@@ -204,37 +205,35 @@ public class RawFileSplitRead implements SplitRead<InternalRow> {
             RawFileBulkFormatMapping bulkFormatMapping,
             DeletionVector.Factory dvFactory)
             throws IOException {
-        ConcatRecordReader.ReaderSupplier<InternalRow> supplier =
-                () -> {
-                    FileRecordReader fileRecordReader =
-                            new FileRecordReader(
-                                    bulkFormatMapping.getReaderFactory(),
-                                    new FormatReaderContext(
-                                            fileIO,
-                                            dataFilePathFactory.toPath(file.fileName()),
-                                            file.fileSize()),
-                                    bulkFormatMapping.getIndexMapping(),
-                                    bulkFormatMapping.getCastMapping(),
-                                    PartitionUtils.create(
-                                            bulkFormatMapping.getPartitionPair(), partition));
-
-                    Optional<DeletionVector> deletionVector = dvFactory.create(file.fileName());
-                    if (deletionVector.isPresent() && !deletionVector.get().isEmpty()) {
-                        return new ApplyDeletionVectorReader<>(
-                                fileRecordReader, deletionVector.get());
-                    }
-                    return fileRecordReader;
-                };
-
-        return fileIndexReadEnabled
-                ? new FileIndexRecordReader(
-                        fileIO,
-                        bulkFormatMapping.getDataSchema(),
-                        bulkFormatMapping.getDataFilters(),
-                        dataFilePathFactory,
-                        file,
-                        supplier)
-                : supplier.get();
+        if (fileIndexReadEnabled) {
+            boolean skip =
+                    FileIndexSkipper.skip(
+                            fileIO,
+                            bulkFormatMapping.getDataSchema(),
+                            bulkFormatMapping.getDataFilters(),
+                            dataFilePathFactory,
+                            file);
+            if (skip) {
+                return new EmptyRecordReader<>();
+            }
+        }
+
+        FileRecordReader fileRecordReader =
+                new FileRecordReader(
+                        bulkFormatMapping.getReaderFactory(),
+                        new FormatReaderContext(
+                                fileIO,
+                                dataFilePathFactory.toPath(file.fileName()),
+                                file.fileSize()),
+                        bulkFormatMapping.getIndexMapping(),
+                        bulkFormatMapping.getCastMapping(),
+                        PartitionUtils.create(bulkFormatMapping.getPartitionPair(), partition));
+
+        Optional<DeletionVector> deletionVector = dvFactory.create(file.fileName());
+        if (deletionVector.isPresent() && !deletionVector.get().isEmpty()) {
+            return new ApplyDeletionVectorReader<>(fileRecordReader, deletionVector.get());
+        }
+        return fileRecordReader;
     }
 
     /** Bulk format mapping with data schema and data filters. */

Reply via email to