This is an automated email from the ASF dual-hosted git repository.
junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new bf24f8bd2 [core] Expose reader and dv from ApplyDeletionVectorReader (#3244)
bf24f8bd2 is described below
commit bf24f8bd28ba1be7b270e7f5c5cfef523441b87e
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Apr 22 15:49:15 2024 +0800
[core] Expose reader and dv from ApplyDeletionVectorReader (#3244)
---
.../apache/paimon/reader/EmptyRecordReader.java | 35 ++++++++++++
.../deletionvectors/ApplyDeletionVectorReader.java | 8 +++
...ndexRecordReader.java => FileIndexSkipper.java} | 32 ++---------
.../apache/paimon/operation/RawFileSplitRead.java | 63 +++++++++++-----------
4 files changed, 79 insertions(+), 59 deletions(-)
diff --git a/paimon-common/src/main/java/org/apache/paimon/reader/EmptyRecordReader.java b/paimon-common/src/main/java/org/apache/paimon/reader/EmptyRecordReader.java
new file mode 100644
index 000000000..5ffd2701e
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/reader/EmptyRecordReader.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.reader;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/** An empty {@link RecordReader}. */
+public class EmptyRecordReader<T> implements RecordReader<T> {
+ @Nullable
+ @Override
+ public RecordIterator<T> readBatch() throws IOException {
+ return null;
+ }
+
+ @Override
+ public void close() throws IOException {}
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/ApplyDeletionVectorReader.java b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/ApplyDeletionVectorReader.java
index 6cc8b396f..ad9288505 100644
--- a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/ApplyDeletionVectorReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/ApplyDeletionVectorReader.java
@@ -39,6 +39,14 @@ public class ApplyDeletionVectorReader<T> implements RecordReader<T> {
this.deletionVector = deletionVector;
}
+ public RecordReader<T> reader() {
+ return reader;
+ }
+
+ public DeletionVector deletionVector() {
+ return deletionVector;
+ }
+
@Nullable
@Override
public RecordIterator<T> readBatch() throws IOException {
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/FileIndexRecordReader.java b/paimon-core/src/main/java/org/apache/paimon/io/FileIndexSkipper.java
similarity index 73%
rename from paimon-core/src/main/java/org/apache/paimon/io/FileIndexRecordReader.java
rename to paimon-core/src/main/java/org/apache/paimon/io/FileIndexSkipper.java
index 0a91d4f7f..0c4ac82a0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/FileIndexRecordReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/FileIndexSkipper.java
@@ -18,35 +18,26 @@
package org.apache.paimon.io;
-import org.apache.paimon.data.InternalRow;
import org.apache.paimon.fileindex.FileIndexPredicate;
import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.mergetree.compact.ConcatRecordReader;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
-import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.TableSchema;
-import javax.annotation.Nullable;
-
import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
/** File index reader, do the filter in the constructor. */
-public class FileIndexRecordReader implements RecordReader<InternalRow> {
-
- private final RecordReader<InternalRow> reader;
+public class FileIndexSkipper {
- public FileIndexRecordReader(
+ public static boolean skip(
FileIO fileIO,
TableSchema dataSchema,
List<Predicate> dataFilter,
DataFilePathFactory dataFilePathFactory,
- DataFileMeta file,
- ConcatRecordReader.ReaderSupplier<InternalRow> readerSupplier)
+ DataFileMeta file)
throws IOException {
- boolean filterThisFile = false;
if (dataFilter != null && !dataFilter.isEmpty()) {
List<String> indexFiles =
file.extraFiles().stream()
@@ -66,25 +57,12 @@ public class FileIndexRecordReader implements RecordReader<InternalRow> {
dataSchema.logicalRowType())) {
if (!predicate.testPredicate(
PredicateBuilder.and(dataFilter.toArray(new Predicate[0])))) {
- filterThisFile = true;
+ return true;
}
}
}
}
- this.reader = filterThisFile ? null : readerSupplier.get();
- }
-
- @Nullable
- @Override
- public RecordIterator<InternalRow> readBatch() throws IOException {
- return reader == null ? null : reader.readBatch();
- }
-
- @Override
- public void close() throws IOException {
- if (reader != null) {
- reader.close();
- }
+ return false;
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
index cf3b76e67..fb68823ba 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
@@ -30,11 +30,12 @@ import org.apache.paimon.format.FormatReaderFactory;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFilePathFactory;
-import org.apache.paimon.io.FileIndexRecordReader;
+import org.apache.paimon.io.FileIndexSkipper;
import org.apache.paimon.io.FileRecordReader;
import org.apache.paimon.mergetree.compact.ConcatRecordReader;
import org.apache.paimon.partition.PartitionUtils;
import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.reader.EmptyRecordReader;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.IndexCastMapping;
import org.apache.paimon.schema.SchemaEvolutionUtil;
@@ -204,37 +205,35 @@ public class RawFileSplitRead implements SplitRead<InternalRow> {
RawFileBulkFormatMapping bulkFormatMapping,
DeletionVector.Factory dvFactory)
throws IOException {
- ConcatRecordReader.ReaderSupplier<InternalRow> supplier =
- () -> {
- FileRecordReader fileRecordReader =
- new FileRecordReader(
- bulkFormatMapping.getReaderFactory(),
- new FormatReaderContext(
- fileIO,
- dataFilePathFactory.toPath(file.fileName()),
- file.fileSize()),
- bulkFormatMapping.getIndexMapping(),
- bulkFormatMapping.getCastMapping(),
- PartitionUtils.create(
- bulkFormatMapping.getPartitionPair(), partition));
-
- Optional<DeletionVector> deletionVector = dvFactory.create(file.fileName());
- if (deletionVector.isPresent() && !deletionVector.get().isEmpty()) {
- return new ApplyDeletionVectorReader<>(
- fileRecordReader, deletionVector.get());
- }
- return fileRecordReader;
- };
-
- return fileIndexReadEnabled
- ? new FileIndexRecordReader(
- fileIO,
- bulkFormatMapping.getDataSchema(),
- bulkFormatMapping.getDataFilters(),
- dataFilePathFactory,
- file,
- supplier)
- : supplier.get();
+ if (fileIndexReadEnabled) {
+ boolean skip =
+ FileIndexSkipper.skip(
+ fileIO,
+ bulkFormatMapping.getDataSchema(),
+ bulkFormatMapping.getDataFilters(),
+ dataFilePathFactory,
+ file);
+ if (skip) {
+ return new EmptyRecordReader<>();
+ }
+ }
+
+ FileRecordReader fileRecordReader =
+ new FileRecordReader(
+ bulkFormatMapping.getReaderFactory(),
+ new FormatReaderContext(
+ fileIO,
+ dataFilePathFactory.toPath(file.fileName()),
+ file.fileSize()),
+ bulkFormatMapping.getIndexMapping(),
+ bulkFormatMapping.getCastMapping(),
+ PartitionUtils.create(bulkFormatMapping.getPartitionPair(), partition));
+
+ Optional<DeletionVector> deletionVector = dvFactory.create(file.fileName());
+ if (deletionVector.isPresent() && !deletionVector.get().isEmpty()) {
+ return new ApplyDeletionVectorReader<>(fileRecordReader, deletionVector.get());
+ }
+ return fileRecordReader;
}
/** Bulk format mapping with data schema and data filters. */