This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 21e016a53 [core] Optimize first_row batch read (#3055)
21e016a53 is described below
commit 21e016a53c72e995d5b0a39a188e89032dd5d4e9
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Mar 20 15:12:10 2024 +0800
[core] Optimize first_row batch read (#3055)
---
.../src/main/java/org/apache/paimon/KeyValueFileStore.java | 3 ++-
.../org/apache/paimon/operation/KeyValueFileStoreRead.java | 2 +-
.../org/apache/paimon/operation/KeyValueFileStoreScan.java | 12 ++++++++++--
.../java/org/apache/paimon/table/AbstractFileStoreTable.java | 5 ++++-
.../org/apache/paimon/table/PrimaryKeyFileStoreTable.java | 8 +++++---
.../org/apache/paimon/table/source/InnerTableScanImpl.java | 5 ++++-
.../apache/paimon/table/source/MergeTreeSplitGenerator.java | 11 +++++++++--
.../paimon/table/source/snapshot/SnapshotReaderImpl.java | 6 +++++-
.../org/apache/paimon/table/system/ReadOptimizedTable.java | 1 +
.../org/apache/paimon/table/source/SplitGeneratorTest.java | 5 +++--
10 files changed, 44 insertions(+), 14 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
index d80bd6d39..956b615d7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
@@ -229,7 +229,8 @@ public class KeyValueFileStore extends
AbstractFileStore<KeyValue> {
forWrite,
options.scanManifestParallelism(),
branchName,
- options.deletionVectorsEnabled());
+ options.deletionVectorsEnabled(),
+ options.mergeEngine());
}
@Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
index 0e115fddd..b0ab4338f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
@@ -213,7 +213,7 @@ public class KeyValueFileStoreRead implements
FileStoreRead<KeyValue> {
private RecordReader<KeyValue>
createReaderWithoutOuterProjection(DataSplit split)
throws IOException {
if (split.beforeFiles().isEmpty()) {
- if (split.isStreaming() || split.deletionFiles().isPresent()) {
+ if (split.isStreaming() || split.convertToRawFiles().isPresent()) {
return noMergeRead(
split.partition(),
split.bucket(),
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
index 0f34cac5a..b4c4909ae 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
@@ -18,6 +18,7 @@
package org.apache.paimon.operation;
+import org.apache.paimon.CoreOptions.MergeEngine;
import org.apache.paimon.KeyValueFileStore;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFile;
@@ -36,6 +37,8 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import static org.apache.paimon.CoreOptions.MergeEngine.FIRST_ROW;
+
/** {@link FileStoreScan} for {@link KeyValueFileStore}. */
public class KeyValueFileStoreScan extends AbstractFileStoreScan {
@@ -45,6 +48,7 @@ public class KeyValueFileStoreScan extends
AbstractFileStoreScan {
private Predicate keyFilter;
private Predicate valueFilter;
private final boolean deletionVectorsEnabled;
+ private final MergeEngine mergeEngine;
public KeyValueFileStoreScan(
RowType partitionType,
@@ -59,7 +63,8 @@ public class KeyValueFileStoreScan extends
AbstractFileStoreScan {
boolean checkNumOfBuckets,
Integer scanManifestParallelism,
String branchName,
- boolean deletionVectorsEnabled) {
+ boolean deletionVectorsEnabled,
+ MergeEngine mergeEngine) {
super(
partitionType,
bucketFilter,
@@ -81,6 +86,7 @@ public class KeyValueFileStoreScan extends
AbstractFileStoreScan {
sid ->
keyValueFieldsExtractor.valueFields(scanTableSchema(sid)),
schema.id());
this.deletionVectorsEnabled = deletionVectorsEnabled;
+ this.mergeEngine = mergeEngine;
}
public KeyValueFileStoreScan withKeyFilter(Predicate predicate) {
@@ -100,7 +106,9 @@ public class KeyValueFileStoreScan extends
AbstractFileStoreScan {
Predicate filter = null;
FieldStatsArraySerializer serializer = null;
BinaryTableStats stats = null;
- if (deletionVectorsEnabled && entry.level() > 0 && valueFilter != null) {
+ if ((deletionVectorsEnabled || mergeEngine == FIRST_ROW)
+ && entry.level() > 0
+ && valueFilter != null) {
filter = valueFilter;
serializer =
fieldValueStatsConverters.getOrCreate(entry.file().schemaId());
stats = entry.file().valueStats();
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index ac0f798a4..f7215dc57 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -157,7 +157,10 @@ abstract class AbstractFileStoreTable implements
FileStoreTable {
@Override
public InnerTableScan newScan() {
return new InnerTableScanImpl(
- coreOptions(), newSnapshotReader(), DefaultValueAssigner.create(tableSchema));
+ tableSchema.primaryKeys().size() > 0,
+ coreOptions(),
+ newSnapshotReader(),
+ DefaultValueAssigner.create(tableSchema));
}
@Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
index f35afc64d..fea783259 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
@@ -116,11 +116,13 @@ class PrimaryKeyFileStoreTable extends
AbstractFileStoreTable {
@Override
protected SplitGenerator splitGenerator() {
+ CoreOptions options = store().options();
return new MergeTreeSplitGenerator(
store().newKeyComparator(),
- store().options().splitTargetSize(),
- store().options().splitOpenFileCost(),
- store().options().deletionVectorsEnabled());
+ options.splitTargetSize(),
+ options.splitOpenFileCost(),
+ options.deletionVectorsEnabled(),
+ options.mergeEngine());
}
@Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java
index 375ef7e0a..b307279d0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java
@@ -28,6 +28,8 @@ import
org.apache.paimon.table.source.snapshot.StartingScanner.ScannedResult;
import java.util.ArrayList;
import java.util.List;
+import static org.apache.paimon.CoreOptions.MergeEngine.FIRST_ROW;
+
/** {@link TableScan} implementation for batch planning. */
public class InnerTableScanImpl extends AbstractInnerTableScan {
@@ -39,13 +41,14 @@ public class InnerTableScanImpl extends
AbstractInnerTableScan {
private Integer pushDownLimit;
public InnerTableScanImpl(
+ boolean pkTable,
CoreOptions options,
SnapshotReader snapshotReader,
DefaultValueAssigner defaultValueAssigner) {
super(options, snapshotReader);
this.hasNext = true;
this.defaultValueAssigner = defaultValueAssigner;
- if (options.deletionVectorsEnabled()) {
+ if (pkTable && (options.deletionVectorsEnabled() || options.mergeEngine() == FIRST_ROW)) {
snapshotReader.withLevelFilter(level -> level > 0);
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/MergeTreeSplitGenerator.java b/paimon-core/src/main/java/org/apache/paimon/table/source/MergeTreeSplitGenerator.java
index 7cf1ed24d..9a06a53f4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/MergeTreeSplitGenerator.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/MergeTreeSplitGenerator.java
@@ -18,6 +18,7 @@
package org.apache.paimon.table.source;
+import org.apache.paimon.CoreOptions.MergeEngine;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.mergetree.SortedRun;
@@ -31,6 +32,8 @@ import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
+import static org.apache.paimon.CoreOptions.MergeEngine.FIRST_ROW;
+
/** Merge tree implementation of {@link SplitGenerator}. */
public class MergeTreeSplitGenerator implements SplitGenerator {
@@ -42,20 +45,24 @@ public class MergeTreeSplitGenerator implements
SplitGenerator {
private final boolean deletionVectorsEnabled;
+ private final MergeEngine mergeEngine;
+
public MergeTreeSplitGenerator(
Comparator<InternalRow> keyComparator,
long targetSplitSize,
long openFileCost,
- boolean deletionVectorsEnabled) {
+ boolean deletionVectorsEnabled,
+ MergeEngine mergeEngine) {
this.keyComparator = keyComparator;
this.targetSplitSize = targetSplitSize;
this.openFileCost = openFileCost;
this.deletionVectorsEnabled = deletionVectorsEnabled;
+ this.mergeEngine = mergeEngine;
}
@Override
public List<List<DataFileMeta>> splitForBatch(List<DataFileMeta> files) {
- if (deletionVectorsEnabled) {
+ if (deletionVectorsEnabled || mergeEngine == FIRST_ROW) {
Function<DataFileMeta, Long> weightFunc =
file -> Math.max(file.fileSize(), openFileCost);
return BinPacking.packForOrdered(files, weightFunc,
targetSplitSize);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index 06836d481..aa28fa667 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -19,6 +19,7 @@
package org.apache.paimon.table.source.snapshot;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.CoreOptions.MergeEngine;
import org.apache.paimon.Snapshot;
import org.apache.paimon.codegen.CodeGenUtils;
import org.apache.paimon.codegen.RecordComparator;
@@ -63,6 +64,7 @@ import java.util.Set;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
+import static org.apache.paimon.CoreOptions.MergeEngine.FIRST_ROW;
import static
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
import static org.apache.paimon.operation.FileStoreScan.Plan.groupByPartFiles;
import static
org.apache.paimon.predicate.PredicateBuilder.transformFieldMapping;
@@ -73,6 +75,7 @@ public class SnapshotReaderImpl implements SnapshotReader {
private final FileStoreScan scan;
private final TableSchema tableSchema;
private final CoreOptions options;
+ private final MergeEngine mergeEngine;
private final boolean deletionVectors;
private final SnapshotManager snapshotManager;
private final ConsumerManager consumerManager;
@@ -100,6 +103,7 @@ public class SnapshotReaderImpl implements SnapshotReader {
this.scan = scan;
this.tableSchema = tableSchema;
this.options = options;
+ this.mergeEngine = options.mergeEngine();
this.deletionVectors = options.deletionVectorsEnabled();
this.snapshotManager = snapshotManager;
this.consumerManager =
@@ -435,7 +439,7 @@ public class SnapshotReaderImpl implements SnapshotReader {
String bucketPath = pathFactory.bucketPath(partition,
bucket).toString();
// append only or deletionVectors files can be returned
- if (tableSchema.primaryKeys().isEmpty() || deletionVectors) {
+ if (tableSchema.primaryKeys().isEmpty() || deletionVectors || mergeEngine == FIRST_ROW) {
return makeRawTableFiles(bucketPath, dataFiles);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
index 35ac209a9..3c6910fbe 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
@@ -105,6 +105,7 @@ public class ReadOptimizedTable implements DataTable,
ReadonlyTable {
@Override
public InnerTableScan newScan() {
return new InnerTableScanImpl(
+ dataTable.schema().primaryKeys().size() > 0,
coreOptions(),
newSnapshotReader(),
DefaultValueAssigner.create(dataTable.schema()));
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java
index 6d97eda5f..127833921 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java
@@ -30,6 +30,7 @@ import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
+import static org.apache.paimon.CoreOptions.MergeEngine.DEDUPLICATE;
import static org.apache.paimon.data.BinaryRow.EMPTY_ROW;
import static org.apache.paimon.io.DataFileTestUtils.fromMinMax;
import static org.assertj.core.api.Assertions.assertThat;
@@ -108,14 +109,14 @@ public class SplitGeneratorTest {
Comparator<InternalRow> comparator = Comparator.comparingInt(o ->
o.getInt(0));
assertThat(
toNames(
- new MergeTreeSplitGenerator(comparator, 100, 2, false)
+ new MergeTreeSplitGenerator(comparator, 100, 2, false, DEDUPLICATE)
.splitForBatch(files)))
.containsExactlyInAnyOrder(
Arrays.asList("1", "2", "4", "3", "5"),
Collections.singletonList("6"));
assertThat(
toNames(
- new MergeTreeSplitGenerator(comparator, 100, 30, false)
+ new MergeTreeSplitGenerator(comparator, 100, 30, false, DEDUPLICATE)
.splitForBatch(files)))
.containsExactlyInAnyOrder(
Arrays.asList("1", "2", "4", "3"),