This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 59ad79ab0 [core] ReadBuilder.withShard should shuffle files for dv mode and first row (#3472)
59ad79ab0 is described below
commit 59ad79ab0144de287d8ff3631615e97bde31a13c
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Jun 5 12:07:09 2024 +0800
[core] ReadBuilder.withShard should shuffle files for dv mode and first row (#3472)
---
.../paimon/table/AbstractFileStoreTable.java | 2 --
.../table/source/AppendOnlySplitGenerator.java | 5 +++++
.../paimon/table/source/DataTableBatchScan.java | 6 +-----
.../paimon/table/source/DataTableStreamScan.java | 6 +-----
.../table/source/MergeTreeSplitGenerator.java | 5 +++++
.../apache/paimon/table/source/SplitGenerator.java | 2 ++
.../table/source/snapshot/SnapshotReader.java | 14 +------------
.../table/source/snapshot/SnapshotReaderImpl.java | 13 ++++++++++++
.../apache/paimon/table/system/AuditLogTable.java | 6 ++++++
.../paimon/table/system/ReadOptimizedTable.java | 2 --
.../paimon/table/FileStoreTableTestBase.java | 4 +++-
.../paimon/table/PrimaryKeyFileStoreTableTest.java | 24 ++++++++++++++++++++++
12 files changed, 61 insertions(+), 28 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 07777428c..6f723f834 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -155,7 +155,6 @@ abstract class AbstractFileStoreTable implements FileStoreTable {
@Override
public DataTableBatchScan newScan() {
return new DataTableBatchScan(
- bucketMode(),
tableSchema.primaryKeys().size() > 0,
coreOptions(),
newSnapshotReader(),
@@ -165,7 +164,6 @@ abstract class AbstractFileStoreTable implements FileStoreTable {
@Override
public StreamDataTableScan newStreamScan() {
return new DataTableStreamScan(
- bucketMode(),
coreOptions(),
newSnapshotReader(),
snapshotManager(),
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AppendOnlySplitGenerator.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AppendOnlySplitGenerator.java
index 227566158..aefac1d97 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/AppendOnlySplitGenerator.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/AppendOnlySplitGenerator.java
@@ -44,6 +44,11 @@ public class AppendOnlySplitGenerator implements SplitGenerator {
this.bucketMode = bucketMode;
}
+ @Override
+ public boolean alwaysRawConvertible() {
+ return true;
+ }
+
@Override
public List<SplitGroup> splitForBatch(List<DataFileMeta> input) {
List<DataFileMeta> files = new ArrayList<>(input);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
index 92652aab3..5c7b09fc2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
@@ -21,7 +21,6 @@ package org.apache.paimon.table.source;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.operation.DefaultValueAssigner;
import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.table.source.snapshot.StartingScanner;
import org.apache.paimon.table.source.snapshot.StartingScanner.ScannedResult;
@@ -34,7 +33,6 @@ import static org.apache.paimon.CoreOptions.MergeEngine.FIRST_ROW;
/** {@link TableScan} implementation for batch planning. */
public class DataTableBatchScan extends AbstractDataTableScan {
- private final BucketMode bucketMode;
private final DefaultValueAssigner defaultValueAssigner;
private StartingScanner startingScanner;
@@ -43,13 +41,11 @@ public class DataTableBatchScan extends AbstractDataTableScan {
private Integer pushDownLimit;
public DataTableBatchScan(
- BucketMode bucketMode,
boolean pkTable,
CoreOptions options,
SnapshotReader snapshotReader,
DefaultValueAssigner defaultValueAssigner) {
super(options, snapshotReader);
- this.bucketMode = bucketMode;
this.hasNext = true;
this.defaultValueAssigner = defaultValueAssigner;
if (pkTable && (options.deletionVectorsEnabled() ||
options.mergeEngine() == FIRST_ROW)) {
@@ -125,7 +121,7 @@ public class DataTableBatchScan extends AbstractDataTableScan {
@Override
public DataTableScan withShard(int indexOfThisSubtask, int
numberOfParallelSubtasks) {
- snapshotReader.withShard(bucketMode, indexOfThisSubtask,
numberOfParallelSubtasks);
+ snapshotReader.withShard(indexOfThisSubtask, numberOfParallelSubtasks);
return this;
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
index 40a608e1f..d746803b7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
@@ -24,7 +24,6 @@ import org.apache.paimon.consumer.Consumer;
import org.apache.paimon.lookup.LookupStrategy;
import org.apache.paimon.operation.DefaultValueAssigner;
import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.source.snapshot.AllDeltaFollowUpScanner;
import org.apache.paimon.table.source.snapshot.BoundedChecker;
import
org.apache.paimon.table.source.snapshot.CompactionChangelogFollowUpScanner;
@@ -53,7 +52,6 @@ public class DataTableStreamScan extends AbstractDataTableScan implements Stream
private static final Logger LOG =
LoggerFactory.getLogger(DataTableStreamScan.class);
- private final BucketMode bucketMode;
private final CoreOptions options;
private final SnapshotManager snapshotManager;
private final boolean supportStreamingReadOverwrite;
@@ -70,14 +68,12 @@ public class DataTableStreamScan extends AbstractDataTableScan implements Stream
@Nullable private Long nextSnapshotId;
public DataTableStreamScan(
- BucketMode bucketMode,
CoreOptions options,
SnapshotReader snapshotReader,
SnapshotManager snapshotManager,
boolean supportStreamingReadOverwrite,
DefaultValueAssigner defaultValueAssigner) {
super(options, snapshotReader);
- this.bucketMode = bucketMode;
this.options = options;
this.snapshotManager = snapshotManager;
this.supportStreamingReadOverwrite = supportStreamingReadOverwrite;
@@ -286,7 +282,7 @@ public class DataTableStreamScan extends AbstractDataTableScan implements Stream
@Override
public DataTableScan withShard(int indexOfThisSubtask, int
numberOfParallelSubtasks) {
- snapshotReader.withShard(bucketMode, indexOfThisSubtask,
numberOfParallelSubtasks);
+ snapshotReader.withShard(indexOfThisSubtask, numberOfParallelSubtasks);
return this;
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/MergeTreeSplitGenerator.java b/paimon-core/src/main/java/org/apache/paimon/table/source/MergeTreeSplitGenerator.java
index c532f85b5..8d59b7681 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/MergeTreeSplitGenerator.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/MergeTreeSplitGenerator.java
@@ -60,6 +60,11 @@ public class MergeTreeSplitGenerator implements SplitGenerator {
this.mergeEngine = mergeEngine;
}
+ @Override
+ public boolean alwaysRawConvertible() {
+ return deletionVectorsEnabled || mergeEngine == FIRST_ROW;
+ }
+
@Override
public List<SplitGroup> splitForBatch(List<DataFileMeta> files) {
boolean rawConvertible =
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/SplitGenerator.java b/paimon-core/src/main/java/org/apache/paimon/table/source/SplitGenerator.java
index 73cfa9826..baf4515ba 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/SplitGenerator.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/SplitGenerator.java
@@ -25,6 +25,8 @@ import java.util.List;
/** Generate splits from {@link DataFileMeta}s. */
public interface SplitGenerator {
+ boolean alwaysRawConvertible();
+
List<SplitGroup> splitForBatch(List<DataFileMeta> files);
List<SplitGroup> splitForStreaming(List<DataFileMeta> files);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
index 26840ade7..8db3effd1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
@@ -24,7 +24,6 @@ import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.table.source.Split;
@@ -69,18 +68,7 @@ public interface SnapshotReader {
SnapshotReader withDataFileNameFilter(Filter<String> fileNameFilter);
- default SnapshotReader withShard(
- BucketMode bucketMode, int indexOfThisSubtask, int
numberOfParallelSubtasks) {
- if (bucketMode == BucketMode.BUCKET_UNAWARE) {
- withDataFileNameFilter(
- file ->
- Math.abs(file.hashCode() %
numberOfParallelSubtasks)
- == indexOfThisSubtask);
- } else {
- withBucketFilter(bucket -> bucket % numberOfParallelSubtasks ==
indexOfThisSubtask);
- }
- return this;
- }
+ SnapshotReader withShard(int indexOfThisSubtask, int
numberOfParallelSubtasks);
SnapshotReader withMetricRegistry(MetricRegistry registry);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index cc793c641..6e27a2480 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -227,6 +227,19 @@ public class SnapshotReaderImpl implements SnapshotReader {
return this;
}
+ @Override
+ public SnapshotReader withShard(int indexOfThisSubtask, int
numberOfParallelSubtasks) {
+ if (splitGenerator.alwaysRawConvertible()) {
+ withDataFileNameFilter(
+ file ->
+ Math.abs(file.hashCode() %
numberOfParallelSubtasks)
+ == indexOfThisSubtask);
+ } else {
+ withBucketFilter(bucket -> bucket % numberOfParallelSubtasks ==
indexOfThisSubtask);
+ }
+ return this;
+ }
+
/** Get splits from {@link FileKind#ADD} files. */
@Override
public Plan read() {
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index 196b2fa4c..6a67f696b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -282,6 +282,12 @@ public class AuditLogTable implements DataTable, ReadonlyTable {
return this;
}
+ @Override
+ public SnapshotReader withShard(int indexOfThisSubtask, int
numberOfParallelSubtasks) {
+ snapshotReader.withShard(indexOfThisSubtask,
numberOfParallelSubtasks);
+ return this;
+ }
+
@Override
public SnapshotReader withMetricRegistry(MetricRegistry registry) {
snapshotReader.withMetricRegistry(registry);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
index c88e2c66c..4da8524c8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
@@ -99,7 +99,6 @@ public class ReadOptimizedTable implements DataTable, ReadonlyTable {
@Override
public DataTableBatchScan newScan() {
return new DataTableBatchScan(
- dataTable.bucketMode(),
dataTable.schema().primaryKeys().size() > 0,
coreOptions(),
newSnapshotReader(),
@@ -109,7 +108,6 @@ public class ReadOptimizedTable implements DataTable, ReadonlyTable {
@Override
public StreamDataTableScan newStreamScan() {
return new DataTableStreamScan(
- dataTable.bucketMode(),
coreOptions(),
newSnapshotReader(),
snapshotManager(),
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
index 7592dd079..35b7666c9 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
@@ -30,6 +30,7 @@ import org.apache.paimon.data.GenericMap;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.JoinedRow;
+import org.apache.paimon.disk.IOManager;
import org.apache.paimon.fs.FileIOFinder;
import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
@@ -387,7 +388,8 @@ public abstract class FileStoreTableTestBase {
}
protected void innerTestWithShard(FileStoreTable table) throws Exception {
- StreamTableWrite write = table.newWrite(commitUser);
+ StreamTableWrite write =
+
table.newWrite(commitUser).withIOManager(IOManager.create(tempDir.toString()));
write.write(rowData(1, 1, 2L));
write.write(rowData(1, 3, 4L));
write.write(rowData(1, 5, 6L));
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
index a952e032f..8026becfb 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
@@ -89,6 +89,8 @@ import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
import static org.apache.paimon.CoreOptions.ChangelogProducer.LOOKUP;
import static org.apache.paimon.CoreOptions.DELETION_VECTORS_ENABLED;
import static org.apache.paimon.CoreOptions.FILE_FORMAT;
+import static org.apache.paimon.CoreOptions.MERGE_ENGINE;
+import static org.apache.paimon.CoreOptions.MergeEngine.FIRST_ROW;
import static org.apache.paimon.CoreOptions.SNAPSHOT_EXPIRE_LIMIT;
import static org.apache.paimon.CoreOptions.SOURCE_SPLIT_OPEN_FILE_COST;
import static org.apache.paimon.CoreOptions.SOURCE_SPLIT_TARGET_SIZE;
@@ -788,6 +790,28 @@ public class PrimaryKeyFileStoreTableTest extends FileStoreTableTestBase {
innerTestWithShard(table);
}
+ @Test
+ public void testWithShardDeletionVectors() throws Exception {
+ FileStoreTable table =
+ createFileStoreTable(
+ conf -> {
+ conf.set(BUCKET, 5);
+ conf.set(DELETION_VECTORS_ENABLED, true);
+ });
+ innerTestWithShard(table);
+ }
+
+ @Test
+ public void testWithShardFirstRow() throws Exception {
+ FileStoreTable table =
+ createFileStoreTable(
+ conf -> {
+ conf.set(BUCKET, 5);
+ conf.set(MERGE_ENGINE, FIRST_ROW);
+ });
+ innerTestWithShard(table);
+ }
+
@Test
public void testSlowCommit() throws Exception {
FileStoreTable table = createFileStoreTable();