This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 21e016a53 [core] Optimize first_row batch read (#3055)
21e016a53 is described below

commit 21e016a53c72e995d5b0a39a188e89032dd5d4e9
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Mar 20 15:12:10 2024 +0800

    [core] Optimize first_row batch read (#3055)
---
 .../src/main/java/org/apache/paimon/KeyValueFileStore.java   |  3 ++-
 .../org/apache/paimon/operation/KeyValueFileStoreRead.java   |  2 +-
 .../org/apache/paimon/operation/KeyValueFileStoreScan.java   | 12 ++++++++++--
 .../java/org/apache/paimon/table/AbstractFileStoreTable.java |  5 ++++-
 .../org/apache/paimon/table/PrimaryKeyFileStoreTable.java    |  8 +++++---
 .../org/apache/paimon/table/source/InnerTableScanImpl.java   |  5 ++++-
 .../apache/paimon/table/source/MergeTreeSplitGenerator.java  | 11 +++++++++--
 .../paimon/table/source/snapshot/SnapshotReaderImpl.java     |  6 +++++-
 .../org/apache/paimon/table/system/ReadOptimizedTable.java   |  1 +
 .../org/apache/paimon/table/source/SplitGeneratorTest.java   |  5 +++--
 10 files changed, 44 insertions(+), 14 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
index d80bd6d39..956b615d7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
@@ -229,7 +229,8 @@ public class KeyValueFileStore extends AbstractFileStore<KeyValue> {
                 forWrite,
                 options.scanManifestParallelism(),
                 branchName,
-                options.deletionVectorsEnabled());
+                options.deletionVectorsEnabled(),
+                options.mergeEngine());
     }
 
     @Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
index 0e115fddd..b0ab4338f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
@@ -213,7 +213,7 @@ public class KeyValueFileStoreRead implements FileStoreRead<KeyValue> {
     private RecordReader<KeyValue> createReaderWithoutOuterProjection(DataSplit split)
             throws IOException {
         if (split.beforeFiles().isEmpty()) {
-            if (split.isStreaming() || split.deletionFiles().isPresent()) {
+            if (split.isStreaming() || split.convertToRawFiles().isPresent()) {
                 return noMergeRead(
                         split.partition(),
                         split.bucket(),
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
index 0f34cac5a..b4c4909ae 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.operation;
 
+import org.apache.paimon.CoreOptions.MergeEngine;
 import org.apache.paimon.KeyValueFileStore;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.ManifestFile;
@@ -36,6 +37,8 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
+import static org.apache.paimon.CoreOptions.MergeEngine.FIRST_ROW;
+
 /** {@link FileStoreScan} for {@link KeyValueFileStore}. */
 public class KeyValueFileStoreScan extends AbstractFileStoreScan {
 
@@ -45,6 +48,7 @@ public class KeyValueFileStoreScan extends AbstractFileStoreScan {
     private Predicate keyFilter;
     private Predicate valueFilter;
     private final boolean deletionVectorsEnabled;
+    private final MergeEngine mergeEngine;
 
     public KeyValueFileStoreScan(
             RowType partitionType,
@@ -59,7 +63,8 @@ public class KeyValueFileStoreScan extends AbstractFileStoreScan {
             boolean checkNumOfBuckets,
             Integer scanManifestParallelism,
             String branchName,
-            boolean deletionVectorsEnabled) {
+            boolean deletionVectorsEnabled,
+            MergeEngine mergeEngine) {
         super(
                 partitionType,
                 bucketFilter,
@@ -81,6 +86,7 @@ public class KeyValueFileStoreScan extends AbstractFileStoreScan {
                         sid -> keyValueFieldsExtractor.valueFields(scanTableSchema(sid)),
                         schema.id());
         this.deletionVectorsEnabled = deletionVectorsEnabled;
+        this.mergeEngine = mergeEngine;
     }
 
     public KeyValueFileStoreScan withKeyFilter(Predicate predicate) {
@@ -100,7 +106,9 @@ public class KeyValueFileStoreScan extends AbstractFileStoreScan {
         Predicate filter = null;
         FieldStatsArraySerializer serializer = null;
         BinaryTableStats stats = null;
-        if (deletionVectorsEnabled && entry.level() > 0 && valueFilter != null) {
+        if ((deletionVectorsEnabled || mergeEngine == FIRST_ROW)
+                && entry.level() > 0
+                && valueFilter != null) {
             filter = valueFilter;
             serializer = fieldValueStatsConverters.getOrCreate(entry.file().schemaId());
             stats = entry.file().valueStats();
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index ac0f798a4..f7215dc57 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -157,7 +157,10 @@ abstract class AbstractFileStoreTable implements FileStoreTable {
     @Override
     public InnerTableScan newScan() {
         return new InnerTableScanImpl(
-                coreOptions(), newSnapshotReader(), DefaultValueAssigner.create(tableSchema));
+                tableSchema.primaryKeys().size() > 0,
+                coreOptions(),
+                newSnapshotReader(),
+                DefaultValueAssigner.create(tableSchema));
     }
 
     @Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
index f35afc64d..fea783259 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
@@ -116,11 +116,13 @@ class PrimaryKeyFileStoreTable extends AbstractFileStoreTable {
 
     @Override
     protected SplitGenerator splitGenerator() {
+        CoreOptions options = store().options();
         return new MergeTreeSplitGenerator(
                 store().newKeyComparator(),
-                store().options().splitTargetSize(),
-                store().options().splitOpenFileCost(),
-                store().options().deletionVectorsEnabled());
+                options.splitTargetSize(),
+                options.splitOpenFileCost(),
+                options.deletionVectorsEnabled(),
+                options.mergeEngine());
     }
 
     @Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java
index 375ef7e0a..b307279d0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java
@@ -28,6 +28,8 @@ import org.apache.paimon.table.source.snapshot.StartingScanner.ScannedResult;
 import java.util.ArrayList;
 import java.util.List;
 
+import static org.apache.paimon.CoreOptions.MergeEngine.FIRST_ROW;
+
 /** {@link TableScan} implementation for batch planning. */
 public class InnerTableScanImpl extends AbstractInnerTableScan {
 
@@ -39,13 +41,14 @@ public class InnerTableScanImpl extends AbstractInnerTableScan {
     private Integer pushDownLimit;
 
     public InnerTableScanImpl(
+            boolean pkTable,
             CoreOptions options,
             SnapshotReader snapshotReader,
             DefaultValueAssigner defaultValueAssigner) {
         super(options, snapshotReader);
         this.hasNext = true;
         this.defaultValueAssigner = defaultValueAssigner;
-        if (options.deletionVectorsEnabled()) {
+        if (pkTable && (options.deletionVectorsEnabled() || options.mergeEngine() == FIRST_ROW)) {
             snapshotReader.withLevelFilter(level -> level > 0);
         }
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/MergeTreeSplitGenerator.java b/paimon-core/src/main/java/org/apache/paimon/table/source/MergeTreeSplitGenerator.java
index 7cf1ed24d..9a06a53f4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/MergeTreeSplitGenerator.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/MergeTreeSplitGenerator.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.table.source;
 
+import org.apache.paimon.CoreOptions.MergeEngine;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.mergetree.SortedRun;
@@ -31,6 +32,8 @@ import java.util.List;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import static org.apache.paimon.CoreOptions.MergeEngine.FIRST_ROW;
+
 /** Merge tree implementation of {@link SplitGenerator}. */
 public class MergeTreeSplitGenerator implements SplitGenerator {
 
@@ -42,20 +45,24 @@ public class MergeTreeSplitGenerator implements SplitGenerator {
 
     private final boolean deletionVectorsEnabled;
 
+    private final MergeEngine mergeEngine;
+
     public MergeTreeSplitGenerator(
             Comparator<InternalRow> keyComparator,
             long targetSplitSize,
             long openFileCost,
-            boolean deletionVectorsEnabled) {
+            boolean deletionVectorsEnabled,
+            MergeEngine mergeEngine) {
         this.keyComparator = keyComparator;
         this.targetSplitSize = targetSplitSize;
         this.openFileCost = openFileCost;
         this.deletionVectorsEnabled = deletionVectorsEnabled;
+        this.mergeEngine = mergeEngine;
     }
 
     @Override
     public List<List<DataFileMeta>> splitForBatch(List<DataFileMeta> files) {
-        if (deletionVectorsEnabled) {
+        if (deletionVectorsEnabled || mergeEngine == FIRST_ROW) {
             Function<DataFileMeta, Long> weightFunc =
                     file -> Math.max(file.fileSize(), openFileCost);
             return BinPacking.packForOrdered(files, weightFunc, targetSplitSize);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index 06836d481..aa28fa667 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.table.source.snapshot;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.CoreOptions.MergeEngine;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.codegen.CodeGenUtils;
 import org.apache.paimon.codegen.RecordComparator;
@@ -63,6 +64,7 @@ import java.util.Set;
 import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
 
+import static org.apache.paimon.CoreOptions.MergeEngine.FIRST_ROW;
 import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
 import static org.apache.paimon.operation.FileStoreScan.Plan.groupByPartFiles;
 import static org.apache.paimon.predicate.PredicateBuilder.transformFieldMapping;
@@ -73,6 +75,7 @@ public class SnapshotReaderImpl implements SnapshotReader {
     private final FileStoreScan scan;
     private final TableSchema tableSchema;
     private final CoreOptions options;
+    private final MergeEngine mergeEngine;
     private final boolean deletionVectors;
     private final SnapshotManager snapshotManager;
     private final ConsumerManager consumerManager;
@@ -100,6 +103,7 @@ public class SnapshotReaderImpl implements SnapshotReader {
         this.scan = scan;
         this.tableSchema = tableSchema;
         this.options = options;
+        this.mergeEngine = options.mergeEngine();
         this.deletionVectors = options.deletionVectorsEnabled();
         this.snapshotManager = snapshotManager;
         this.consumerManager =
@@ -435,7 +439,7 @@ public class SnapshotReaderImpl implements SnapshotReader {
         String bucketPath = pathFactory.bucketPath(partition, bucket).toString();
 
         // append only or deletionVectors files can be returned
-        if (tableSchema.primaryKeys().isEmpty() || deletionVectors) {
+        if (tableSchema.primaryKeys().isEmpty() || deletionVectors || mergeEngine == FIRST_ROW) {
             return makeRawTableFiles(bucketPath, dataFiles);
         }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
index 35ac209a9..3c6910fbe 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
@@ -105,6 +105,7 @@ public class ReadOptimizedTable implements DataTable, ReadonlyTable {
     @Override
     public InnerTableScan newScan() {
         return new InnerTableScanImpl(
+                dataTable.schema().primaryKeys().size() > 0,
                 coreOptions(),
                 newSnapshotReader(),
                 DefaultValueAssigner.create(dataTable.schema()));
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java
index 6d97eda5f..127833921 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java
@@ -30,6 +30,7 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.stream.Collectors;
 
+import static org.apache.paimon.CoreOptions.MergeEngine.DEDUPLICATE;
 import static org.apache.paimon.data.BinaryRow.EMPTY_ROW;
 import static org.apache.paimon.io.DataFileTestUtils.fromMinMax;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -108,14 +109,14 @@ public class SplitGeneratorTest {
         Comparator<InternalRow> comparator = Comparator.comparingInt(o -> o.getInt(0));
         assertThat(
                         toNames(
-                                new MergeTreeSplitGenerator(comparator, 100, 2, false)
+                                new MergeTreeSplitGenerator(comparator, 100, 2, false, DEDUPLICATE)
                                         .splitForBatch(files)))
                 .containsExactlyInAnyOrder(
                         Arrays.asList("1", "2", "4", "3", "5"), Collections.singletonList("6"));
 
         assertThat(
                         toNames(
-                                new MergeTreeSplitGenerator(comparator, 100, 30, false)
+                                new MergeTreeSplitGenerator(comparator, 100, 30, false, DEDUPLICATE)
                                         .splitForBatch(files)))
                 .containsExactlyInAnyOrder(
                         Arrays.asList("1", "2", "4", "3"),

Reply via email to