This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 59ad79ab0 [core] ReadBuilder.withShard should shuffle files for dv mode and first row (#3472)
59ad79ab0 is described below

commit 59ad79ab0144de287d8ff3631615e97bde31a13c
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Jun 5 12:07:09 2024 +0800

    [core] ReadBuilder.withShard should shuffle files for dv mode and first row (#3472)
---
 .../paimon/table/AbstractFileStoreTable.java       |  2 --
 .../table/source/AppendOnlySplitGenerator.java     |  5 +++++
 .../paimon/table/source/DataTableBatchScan.java    |  6 +-----
 .../paimon/table/source/DataTableStreamScan.java   |  6 +-----
 .../table/source/MergeTreeSplitGenerator.java      |  5 +++++
 .../apache/paimon/table/source/SplitGenerator.java |  2 ++
 .../table/source/snapshot/SnapshotReader.java      | 14 +------------
 .../table/source/snapshot/SnapshotReaderImpl.java  | 13 ++++++++++++
 .../apache/paimon/table/system/AuditLogTable.java  |  6 ++++++
 .../paimon/table/system/ReadOptimizedTable.java    |  2 --
 .../paimon/table/FileStoreTableTestBase.java       |  4 +++-
 .../paimon/table/PrimaryKeyFileStoreTableTest.java | 24 ++++++++++++++++++++++
 12 files changed, 61 insertions(+), 28 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 07777428c..6f723f834 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -155,7 +155,6 @@ abstract class AbstractFileStoreTable implements 
FileStoreTable {
     @Override
     public DataTableBatchScan newScan() {
         return new DataTableBatchScan(
-                bucketMode(),
                 tableSchema.primaryKeys().size() > 0,
                 coreOptions(),
                 newSnapshotReader(),
@@ -165,7 +164,6 @@ abstract class AbstractFileStoreTable implements 
FileStoreTable {
     @Override
     public StreamDataTableScan newStreamScan() {
         return new DataTableStreamScan(
-                bucketMode(),
                 coreOptions(),
                 newSnapshotReader(),
                 snapshotManager(),
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/AppendOnlySplitGenerator.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/AppendOnlySplitGenerator.java
index 227566158..aefac1d97 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/AppendOnlySplitGenerator.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/AppendOnlySplitGenerator.java
@@ -44,6 +44,11 @@ public class AppendOnlySplitGenerator implements 
SplitGenerator {
         this.bucketMode = bucketMode;
     }
 
+    @Override
+    public boolean alwaysRawConvertible() {
+        return true;
+    }
+
     @Override
     public List<SplitGroup> splitForBatch(List<DataFileMeta> input) {
         List<DataFileMeta> files = new ArrayList<>(input);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
index 92652aab3..5c7b09fc2 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
@@ -21,7 +21,6 @@ package org.apache.paimon.table.source;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.operation.DefaultValueAssigner;
 import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.source.snapshot.SnapshotReader;
 import org.apache.paimon.table.source.snapshot.StartingScanner;
 import org.apache.paimon.table.source.snapshot.StartingScanner.ScannedResult;
@@ -34,7 +33,6 @@ import static 
org.apache.paimon.CoreOptions.MergeEngine.FIRST_ROW;
 /** {@link TableScan} implementation for batch planning. */
 public class DataTableBatchScan extends AbstractDataTableScan {
 
-    private final BucketMode bucketMode;
     private final DefaultValueAssigner defaultValueAssigner;
 
     private StartingScanner startingScanner;
@@ -43,13 +41,11 @@ public class DataTableBatchScan extends 
AbstractDataTableScan {
     private Integer pushDownLimit;
 
     public DataTableBatchScan(
-            BucketMode bucketMode,
             boolean pkTable,
             CoreOptions options,
             SnapshotReader snapshotReader,
             DefaultValueAssigner defaultValueAssigner) {
         super(options, snapshotReader);
-        this.bucketMode = bucketMode;
         this.hasNext = true;
         this.defaultValueAssigner = defaultValueAssigner;
         if (pkTable && (options.deletionVectorsEnabled() || 
options.mergeEngine() == FIRST_ROW)) {
@@ -125,7 +121,7 @@ public class DataTableBatchScan extends 
AbstractDataTableScan {
 
     @Override
     public DataTableScan withShard(int indexOfThisSubtask, int 
numberOfParallelSubtasks) {
-        snapshotReader.withShard(bucketMode, indexOfThisSubtask, 
numberOfParallelSubtasks);
+        snapshotReader.withShard(indexOfThisSubtask, numberOfParallelSubtasks);
         return this;
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
index 40a608e1f..d746803b7 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
@@ -24,7 +24,6 @@ import org.apache.paimon.consumer.Consumer;
 import org.apache.paimon.lookup.LookupStrategy;
 import org.apache.paimon.operation.DefaultValueAssigner;
 import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.source.snapshot.AllDeltaFollowUpScanner;
 import org.apache.paimon.table.source.snapshot.BoundedChecker;
 import 
org.apache.paimon.table.source.snapshot.CompactionChangelogFollowUpScanner;
@@ -53,7 +52,6 @@ public class DataTableStreamScan extends 
AbstractDataTableScan implements Stream
 
     private static final Logger LOG = 
LoggerFactory.getLogger(DataTableStreamScan.class);
 
-    private final BucketMode bucketMode;
     private final CoreOptions options;
     private final SnapshotManager snapshotManager;
     private final boolean supportStreamingReadOverwrite;
@@ -70,14 +68,12 @@ public class DataTableStreamScan extends 
AbstractDataTableScan implements Stream
     @Nullable private Long nextSnapshotId;
 
     public DataTableStreamScan(
-            BucketMode bucketMode,
             CoreOptions options,
             SnapshotReader snapshotReader,
             SnapshotManager snapshotManager,
             boolean supportStreamingReadOverwrite,
             DefaultValueAssigner defaultValueAssigner) {
         super(options, snapshotReader);
-        this.bucketMode = bucketMode;
         this.options = options;
         this.snapshotManager = snapshotManager;
         this.supportStreamingReadOverwrite = supportStreamingReadOverwrite;
@@ -286,7 +282,7 @@ public class DataTableStreamScan extends 
AbstractDataTableScan implements Stream
 
     @Override
     public DataTableScan withShard(int indexOfThisSubtask, int 
numberOfParallelSubtasks) {
-        snapshotReader.withShard(bucketMode, indexOfThisSubtask, 
numberOfParallelSubtasks);
+        snapshotReader.withShard(indexOfThisSubtask, numberOfParallelSubtasks);
         return this;
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/MergeTreeSplitGenerator.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/MergeTreeSplitGenerator.java
index c532f85b5..8d59b7681 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/MergeTreeSplitGenerator.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/MergeTreeSplitGenerator.java
@@ -60,6 +60,11 @@ public class MergeTreeSplitGenerator implements 
SplitGenerator {
         this.mergeEngine = mergeEngine;
     }
 
+    @Override
+    public boolean alwaysRawConvertible() {
+        return deletionVectorsEnabled || mergeEngine == FIRST_ROW;
+    }
+
     @Override
     public List<SplitGroup> splitForBatch(List<DataFileMeta> files) {
         boolean rawConvertible =
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/SplitGenerator.java 
b/paimon-core/src/main/java/org/apache/paimon/table/source/SplitGenerator.java
index 73cfa9826..baf4515ba 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/SplitGenerator.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/SplitGenerator.java
@@ -25,6 +25,8 @@ import java.util.List;
 /** Generate splits from {@link DataFileMeta}s. */
 public interface SplitGenerator {
 
+    boolean alwaysRawConvertible();
+
     List<SplitGroup> splitForBatch(List<DataFileMeta> files);
 
     List<SplitGroup> splitForStreaming(List<DataFileMeta> files);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
index 26840ade7..8db3effd1 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
@@ -24,7 +24,6 @@ import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.manifest.PartitionEntry;
 import org.apache.paimon.metrics.MetricRegistry;
 import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.ScanMode;
 import org.apache.paimon.table.source.Split;
@@ -69,18 +68,7 @@ public interface SnapshotReader {
 
     SnapshotReader withDataFileNameFilter(Filter<String> fileNameFilter);
 
-    default SnapshotReader withShard(
-            BucketMode bucketMode, int indexOfThisSubtask, int 
numberOfParallelSubtasks) {
-        if (bucketMode == BucketMode.BUCKET_UNAWARE) {
-            withDataFileNameFilter(
-                    file ->
-                            Math.abs(file.hashCode() % 
numberOfParallelSubtasks)
-                                    == indexOfThisSubtask);
-        } else {
-            withBucketFilter(bucket -> bucket % numberOfParallelSubtasks == 
indexOfThisSubtask);
-        }
-        return this;
-    }
+    SnapshotReader withShard(int indexOfThisSubtask, int 
numberOfParallelSubtasks);
 
     SnapshotReader withMetricRegistry(MetricRegistry registry);
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index cc793c641..6e27a2480 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -227,6 +227,19 @@ public class SnapshotReaderImpl implements SnapshotReader {
         return this;
     }
 
+    @Override
+    public SnapshotReader withShard(int indexOfThisSubtask, int 
numberOfParallelSubtasks) {
+        if (splitGenerator.alwaysRawConvertible()) {
+            withDataFileNameFilter(
+                    file ->
+                            Math.abs(file.hashCode() % 
numberOfParallelSubtasks)
+                                    == indexOfThisSubtask);
+        } else {
+            withBucketFilter(bucket -> bucket % numberOfParallelSubtasks == 
indexOfThisSubtask);
+        }
+        return this;
+    }
+
     /** Get splits from {@link FileKind#ADD} files. */
     @Override
     public Plan read() {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index 196b2fa4c..6a67f696b 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -282,6 +282,12 @@ public class AuditLogTable implements DataTable, 
ReadonlyTable {
             return this;
         }
 
+        @Override
+        public SnapshotReader withShard(int indexOfThisSubtask, int 
numberOfParallelSubtasks) {
+            snapshotReader.withShard(indexOfThisSubtask, 
numberOfParallelSubtasks);
+            return this;
+        }
+
         @Override
         public SnapshotReader withMetricRegistry(MetricRegistry registry) {
             snapshotReader.withMetricRegistry(registry);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
index c88e2c66c..4da8524c8 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
@@ -99,7 +99,6 @@ public class ReadOptimizedTable implements DataTable, 
ReadonlyTable {
     @Override
     public DataTableBatchScan newScan() {
         return new DataTableBatchScan(
-                dataTable.bucketMode(),
                 dataTable.schema().primaryKeys().size() > 0,
                 coreOptions(),
                 newSnapshotReader(),
@@ -109,7 +108,6 @@ public class ReadOptimizedTable implements DataTable, 
ReadonlyTable {
     @Override
     public StreamDataTableScan newStreamScan() {
         return new DataTableStreamScan(
-                dataTable.bucketMode(),
                 coreOptions(),
                 newSnapshotReader(),
                 snapshotManager(),
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java 
b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
index 7592dd079..35b7666c9 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
@@ -30,6 +30,7 @@ import org.apache.paimon.data.GenericMap;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.JoinedRow;
+import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.fs.FileIOFinder;
 import org.apache.paimon.fs.FileStatus;
 import org.apache.paimon.fs.Path;
@@ -387,7 +388,8 @@ public abstract class FileStoreTableTestBase {
     }
 
     protected void innerTestWithShard(FileStoreTable table) throws Exception {
-        StreamTableWrite write = table.newWrite(commitUser);
+        StreamTableWrite write =
+                
table.newWrite(commitUser).withIOManager(IOManager.create(tempDir.toString()));
         write.write(rowData(1, 1, 2L));
         write.write(rowData(1, 3, 4L));
         write.write(rowData(1, 5, 6L));
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
index a952e032f..8026becfb 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
@@ -89,6 +89,8 @@ import static 
org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
 import static org.apache.paimon.CoreOptions.ChangelogProducer.LOOKUP;
 import static org.apache.paimon.CoreOptions.DELETION_VECTORS_ENABLED;
 import static org.apache.paimon.CoreOptions.FILE_FORMAT;
+import static org.apache.paimon.CoreOptions.MERGE_ENGINE;
+import static org.apache.paimon.CoreOptions.MergeEngine.FIRST_ROW;
 import static org.apache.paimon.CoreOptions.SNAPSHOT_EXPIRE_LIMIT;
 import static org.apache.paimon.CoreOptions.SOURCE_SPLIT_OPEN_FILE_COST;
 import static org.apache.paimon.CoreOptions.SOURCE_SPLIT_TARGET_SIZE;
@@ -788,6 +790,28 @@ public class PrimaryKeyFileStoreTableTest extends 
FileStoreTableTestBase {
         innerTestWithShard(table);
     }
 
+    @Test
+    public void testWithShardDeletionVectors() throws Exception {
+        FileStoreTable table =
+                createFileStoreTable(
+                        conf -> {
+                            conf.set(BUCKET, 5);
+                            conf.set(DELETION_VECTORS_ENABLED, true);
+                        });
+        innerTestWithShard(table);
+    }
+
+    @Test
+    public void testWithShardFirstRow() throws Exception {
+        FileStoreTable table =
+                createFileStoreTable(
+                        conf -> {
+                            conf.set(BUCKET, 5);
+                            conf.set(MERGE_ENGINE, FIRST_ROW);
+                        });
+        innerTestWithShard(table);
+    }
+
     @Test
     public void testSlowCommit() throws Exception {
         FileStoreTable table = createFileStoreTable();

Reply via email to