This is an automated email from the ASF dual-hosted git repository.

junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new a854d4ede [core] Introduce ReadBuilder.withShard (#3423)
a854d4ede is described below

commit a854d4ede6dd41123167358bf47d0c9020fd584a
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu May 30 14:09:00 2024 +0800

    [core] Introduce ReadBuilder.withShard (#3423)
---
 .../paimon/crosspartition/IndexBootstrap.java      |  4 +--
 .../java/org/apache/paimon/io/DataFileMeta.java    | 40 +++++++++++----------
 .../org/apache/paimon/manifest/ManifestEntry.java  | 12 +++++--
 .../paimon/manifest/ManifestEntrySerializer.java   |  5 +++
 .../paimon/operation/AbstractFileStoreScan.java    | 11 ++++--
 .../org/apache/paimon/operation/FileStoreScan.java |  2 ++
 .../paimon/privilege/PrivilegedFileStoreTable.java |  8 ++---
 .../paimon/table/AbstractFileStoreTable.java       | 17 ++++-----
 .../java/org/apache/paimon/table/DataTable.java    |  4 +++
 .../java/org/apache/paimon/table/InnerTable.java   |  4 +--
 .../org/apache/paimon/table/ReadonlyTable.java     |  4 +--
 ...erTableScan.java => AbstractDataTableScan.java} | 14 ++++----
 ...rTableScanImpl.java => DataTableBatchScan.java} | 14 ++++++--
 ...nnerStreamTableScan.java => DataTableScan.java} | 14 +++-----
 ...TableScanImpl.java => DataTableStreamScan.java} | 19 +++++++---
 .../apache/paimon/table/source/ReadBuilder.java    |  3 ++
 .../paimon/table/source/ReadBuilderImpl.java       | 30 ++++++++++++++--
 ...reamTableScan.java => StreamDataTableScan.java} |  2 +-
 .../table/source/snapshot/SnapshotReader.java      | 16 +++++++++
 .../table/source/snapshot/SnapshotReaderImpl.java  |  6 ++++
 .../apache/paimon/table/system/AuditLogTable.java  | 41 ++++++++++++++++------
 .../apache/paimon/table/system/BucketsTable.java   |  8 ++---
 .../paimon/table/system/FileMonitorTable.java      |  8 ++---
 .../paimon/table/system/ReadOptimizedTable.java    | 17 ++++-----
 .../paimon/operation/OrphanFilesCleanTest.java     |  4 +--
 .../paimon/table/AppendOnlyFileStoreTableTest.java | 17 +++++++++
 .../paimon/table/FileStoreTableTestBase.java       | 29 +++++++++++++++
 .../paimon/table/PrimaryKeyFileStoreTableTest.java |  6 ++++
 .../source/snapshot/DefaultValueScannerTest.java   |  4 +--
 .../flink/source/ContinuousFileStoreSource.java    |  4 +--
 .../flink/source/LogHybridSourceFactory.java       |  4 +--
 .../apache/paimon/spark/sources/StreamHelper.scala |  4 +--
 32 files changed, 270 insertions(+), 105 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/crosspartition/IndexBootstrap.java
 
b/paimon-core/src/main/java/org/apache/paimon/crosspartition/IndexBootstrap.java
index 260af9a9a..ec8244a2a 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/crosspartition/IndexBootstrap.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/crosspartition/IndexBootstrap.java
@@ -27,7 +27,7 @@ import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.Table;
-import org.apache.paimon.table.source.AbstractInnerTableScan;
+import org.apache.paimon.table.source.AbstractDataTableScan;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.Split;
@@ -84,7 +84,7 @@ public class IndexBootstrap implements Serializable {
                         .newReadBuilder()
                         .withProjection(keyProjection);
 
-        AbstractInnerTableScan tableScan = (AbstractInnerTableScan) 
readBuilder.newScan();
+        AbstractDataTableScan tableScan = (AbstractDataTableScan) 
readBuilder.newScan();
         List<Split> splits =
                 tableScan
                         .withBucketFilter(bucket -> bucket % numAssigners == 
assignId)
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java 
b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
index bfdfaf249..834710725 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
@@ -53,6 +53,27 @@ import static 
org.apache.paimon.utils.SerializationUtils.newStringType;
 /** Metadata of a data file. */
 public class DataFileMeta {
 
+    public static final RowType SCHEMA =
+            new RowType(
+                    Arrays.asList(
+                            new DataField(0, "_FILE_NAME", 
newStringType(false)),
+                            new DataField(1, "_FILE_SIZE", new 
BigIntType(false)),
+                            new DataField(2, "_ROW_COUNT", new 
BigIntType(false)),
+                            new DataField(3, "_MIN_KEY", newBytesType(false)),
+                            new DataField(4, "_MAX_KEY", newBytesType(false)),
+                            new DataField(5, "_KEY_STATS", 
SimpleStatsConverter.schema()),
+                            new DataField(6, "_VALUE_STATS", 
SimpleStatsConverter.schema()),
+                            new DataField(7, "_MIN_SEQUENCE_NUMBER", new 
BigIntType(false)),
+                            new DataField(8, "_MAX_SEQUENCE_NUMBER", new 
BigIntType(false)),
+                            new DataField(9, "_SCHEMA_ID", new 
BigIntType(false)),
+                            new DataField(10, "_LEVEL", new IntType(false)),
+                            new DataField(
+                                    11, "_EXTRA_FILES", new ArrayType(false, 
newStringType(false))),
+                            new DataField(12, "_CREATION_TIME", 
DataTypes.TIMESTAMP_MILLIS()),
+                            new DataField(13, "_DELETE_ROW_COUNT", new 
BigIntType(true)),
+                            new DataField(14, "_EMBEDDED_FILE_INDEX", 
newBytesType(true)),
+                            new DataField(15, "_FILE_SOURCE", new 
TinyIntType(true))));
+
     public static final BinaryRow EMPTY_MIN_KEY = EMPTY_ROW;
     public static final BinaryRow EMPTY_MAX_KEY = EMPTY_ROW;
     public static final int DUMMY_LEVEL = 0;
@@ -430,24 +451,7 @@ public class DataFileMeta {
     }
 
     public static RowType schema() {
-        List<DataField> fields = new ArrayList<>();
-        fields.add(new DataField(0, "_FILE_NAME", newStringType(false)));
-        fields.add(new DataField(1, "_FILE_SIZE", new BigIntType(false)));
-        fields.add(new DataField(2, "_ROW_COUNT", new BigIntType(false)));
-        fields.add(new DataField(3, "_MIN_KEY", newBytesType(false)));
-        fields.add(new DataField(4, "_MAX_KEY", newBytesType(false)));
-        fields.add(new DataField(5, "_KEY_STATS", 
SimpleStatsConverter.schema()));
-        fields.add(new DataField(6, "_VALUE_STATS", 
SimpleStatsConverter.schema()));
-        fields.add(new DataField(7, "_MIN_SEQUENCE_NUMBER", new 
BigIntType(false)));
-        fields.add(new DataField(8, "_MAX_SEQUENCE_NUMBER", new 
BigIntType(false)));
-        fields.add(new DataField(9, "_SCHEMA_ID", new BigIntType(false)));
-        fields.add(new DataField(10, "_LEVEL", new IntType(false)));
-        fields.add(new DataField(11, "_EXTRA_FILES", new ArrayType(false, 
newStringType(false))));
-        fields.add(new DataField(12, "_CREATION_TIME", 
DataTypes.TIMESTAMP_MILLIS()));
-        fields.add(new DataField(13, "_DELETE_ROW_COUNT", new 
BigIntType(true)));
-        fields.add(new DataField(14, "_EMBEDDED_FILE_INDEX", 
newBytesType(true)));
-        fields.add(new DataField(15, "_FILE_SOURCE", new TinyIntType(true)));
-        return new RowType(fields);
+        return SCHEMA;
     }
 
     public static long getMaxSequenceNumber(List<DataFileMeta> fileMetas) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java 
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
index 34ad169ff..af1740b00 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
@@ -171,19 +171,27 @@ public class ManifestEntry implements FileEntry {
     public static Filter<InternalRow> createEntryRowFilter(
             @Nullable PartitionPredicate partitionFilter,
             @Nullable Filter<Integer> bucketFilter,
+            @Nullable Filter<String> fileNameFilter,
             int numOfBuckets) {
         Function<InternalRow, BinaryRow> partitionGetter =
                 ManifestEntrySerializer.partitionGetter();
         Function<InternalRow, Integer> bucketGetter = 
ManifestEntrySerializer.bucketGetter();
         Function<InternalRow, Integer> totalBucketGetter =
                 ManifestEntrySerializer.totalBucketGetter();
+        Function<InternalRow, String> fileNameGetter = 
ManifestEntrySerializer.fileNameGetter();
         return row -> {
             if ((partitionFilter != null && 
!partitionFilter.test(partitionGetter.apply(row)))) {
                 return false;
             }
 
-            if (bucketFilter != null && numOfBuckets == 
totalBucketGetter.apply(row)) {
-                return bucketFilter.test(bucketGetter.apply(row));
+            if (bucketFilter != null
+                    && numOfBuckets == totalBucketGetter.apply(row)
+                    && !bucketFilter.test(bucketGetter.apply(row))) {
+                return false;
+            }
+
+            if (fileNameFilter != null && 
!fileNameFilter.test((fileNameGetter.apply(row)))) {
+                return false;
             }
 
             return true;
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntrySerializer.java
 
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntrySerializer.java
index a733ba9e1..83c2be59a 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntrySerializer.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntrySerializer.java
@@ -21,6 +21,7 @@ package org.apache.paimon.manifest;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataFileMetaSerializer;
 import org.apache.paimon.utils.VersionedObjectSerializer;
 
@@ -87,4 +88,8 @@ public class ManifestEntrySerializer extends 
VersionedObjectSerializer<ManifestE
     public static Function<InternalRow, Integer> totalBucketGetter() {
         return row -> row.getInt(4);
     }
+
+    public static Function<InternalRow, String> fileNameGetter() {
+        return row -> row.getRow(5, 
DataFileMeta.SCHEMA.getFieldCount()).getString(0).toString();
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index 8abf1f18b..32b87e406 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -87,6 +87,7 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
     private ScanMode scanMode = ScanMode.ALL;
     private Filter<Integer> levelFilter = null;
     private Long dataFileTimeMills = null;
+    private Filter<String> fileNameFilter = null;
 
     private ManifestCacheFilter manifestCacheFilter = null;
     private ScanMetrics scanMetrics = null;
@@ -204,6 +205,12 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
         return this;
     }
 
+    @Override
+    public FileStoreScan withDataFileNameFilter(Filter<String> fileNameFilter) 
{
+        this.fileNameFilter = fileNameFilter;
+        return this;
+    }
+
     @Override
     public FileStoreScan withMetrics(ScanMetrics metrics) {
         this.scanMetrics = metrics;
@@ -488,7 +495,7 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
                         manifest.fileSize(),
                         
ManifestEntry.createCacheRowFilter(manifestCacheFilter, numOfBuckets),
                         ManifestEntry.createEntryRowFilter(
-                                partitionFilter, bucketFilter, numOfBuckets));
+                                partitionFilter, bucketFilter, fileNameFilter, 
numOfBuckets));
     }
 
     /** Note: Keep this thread-safe. */
@@ -503,7 +510,7 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
                         // see SimpleFileEntrySerializer
                         
ManifestEntry.createCacheRowFilter(manifestCacheFilter, numOfBuckets),
                         ManifestEntry.createEntryRowFilter(
-                                partitionFilter, bucketFilter, numOfBuckets));
+                                partitionFilter, bucketFilter, fileNameFilter, 
numOfBuckets));
     }
 
     // ------------------------------------------------------------------------
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
index 481010269..f5249efa9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
@@ -70,6 +70,8 @@ public interface FileStoreScan {
 
     FileStoreScan withManifestCacheFilter(ManifestCacheFilter manifestFilter);
 
+    FileStoreScan withDataFileNameFilter(Filter<String> fileNameFilter);
+
     FileStoreScan withMetrics(ScanMetrics metrics);
 
     /** Produce a {@link Plan}. */
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
index e4b09df38..fe0c65728 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
@@ -34,9 +34,9 @@ import org.apache.paimon.table.query.LocalTableQuery;
 import org.apache.paimon.table.sink.RowKeyExtractor;
 import org.apache.paimon.table.sink.TableCommitImpl;
 import org.apache.paimon.table.sink.TableWriteImpl;
-import org.apache.paimon.table.source.InnerStreamTableScan;
+import org.apache.paimon.table.source.DataTableScan;
 import org.apache.paimon.table.source.InnerTableRead;
-import org.apache.paimon.table.source.InnerTableScan;
+import org.apache.paimon.table.source.StreamDataTableScan;
 import org.apache.paimon.table.source.snapshot.SnapshotReader;
 import org.apache.paimon.utils.BranchManager;
 import org.apache.paimon.utils.SnapshotManager;
@@ -235,13 +235,13 @@ public class PrivilegedFileStoreTable implements 
FileStoreTable {
     }
 
     @Override
-    public InnerTableScan newScan() {
+    public DataTableScan newScan() {
         privilegeChecker.assertCanSelect(identifier);
         return wrapped.newScan();
     }
 
     @Override
-    public InnerStreamTableScan newStreamScan() {
+    public StreamDataTableScan newStreamScan() {
         privilegeChecker.assertCanSelect(identifier);
         return wrapped.newStreamScan();
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 82cc47ad5..07777428c 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -44,11 +44,10 @@ import org.apache.paimon.table.sink.RowKeyExtractor;
 import org.apache.paimon.table.sink.RowKindGenerator;
 import org.apache.paimon.table.sink.TableCommitImpl;
 import org.apache.paimon.table.sink.UnawareBucketRowKeyExtractor;
-import org.apache.paimon.table.source.InnerStreamTableScan;
-import org.apache.paimon.table.source.InnerStreamTableScanImpl;
-import org.apache.paimon.table.source.InnerTableScan;
-import org.apache.paimon.table.source.InnerTableScanImpl;
+import org.apache.paimon.table.source.DataTableBatchScan;
+import org.apache.paimon.table.source.DataTableStreamScan;
 import org.apache.paimon.table.source.SplitGenerator;
+import org.apache.paimon.table.source.StreamDataTableScan;
 import org.apache.paimon.table.source.snapshot.SnapshotReader;
 import org.apache.paimon.table.source.snapshot.SnapshotReaderImpl;
 import 
org.apache.paimon.table.source.snapshot.StaticFromTimestampStartingScanner;
@@ -154,8 +153,9 @@ abstract class AbstractFileStoreTable implements 
FileStoreTable {
     }
 
     @Override
-    public InnerTableScan newScan() {
-        return new InnerTableScanImpl(
+    public DataTableBatchScan newScan() {
+        return new DataTableBatchScan(
+                bucketMode(),
                 tableSchema.primaryKeys().size() > 0,
                 coreOptions(),
                 newSnapshotReader(),
@@ -163,8 +163,9 @@ abstract class AbstractFileStoreTable implements 
FileStoreTable {
     }
 
     @Override
-    public InnerStreamTableScan newStreamScan() {
-        return new InnerStreamTableScanImpl(
+    public StreamDataTableScan newStreamScan() {
+        return new DataTableStreamScan(
+                bucketMode(),
                 coreOptions(),
                 newSnapshotReader(),
                 snapshotManager(),
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/DataTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/DataTable.java
index b5bebe2a7..3c56b4b3b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/DataTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/DataTable.java
@@ -21,6 +21,7 @@ package org.apache.paimon.table;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.table.source.DataTableScan;
 import org.apache.paimon.table.source.snapshot.SnapshotReader;
 import org.apache.paimon.utils.BranchManager;
 import org.apache.paimon.utils.SnapshotManager;
@@ -29,6 +30,9 @@ import org.apache.paimon.utils.TagManager;
 /** A {@link Table} for data. */
 public interface DataTable extends InnerTable {
 
+    @Override
+    DataTableScan newScan();
+
     SnapshotReader newSnapshotReader();
 
     CoreOptions coreOptions();
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/InnerTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/InnerTable.java
index 456834ebc..069b8929a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/InnerTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/InnerTable.java
@@ -24,18 +24,18 @@ import org.apache.paimon.table.sink.InnerTableCommit;
 import org.apache.paimon.table.sink.InnerTableWrite;
 import org.apache.paimon.table.sink.StreamWriteBuilder;
 import org.apache.paimon.table.sink.StreamWriteBuilderImpl;
-import org.apache.paimon.table.source.InnerStreamTableScan;
 import org.apache.paimon.table.source.InnerTableRead;
 import org.apache.paimon.table.source.InnerTableScan;
 import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.ReadBuilderImpl;
+import org.apache.paimon.table.source.StreamDataTableScan;
 
 /** Inner table for implementation, provide newScan, newRead ... directly. */
 public interface InnerTable extends Table {
 
     InnerTableScan newScan();
 
-    InnerStreamTableScan newStreamScan();
+    StreamDataTableScan newStreamScan();
 
     InnerTableRead newRead();
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
index dcb62dfcb..dc50a83d9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
@@ -23,7 +23,7 @@ import org.apache.paimon.table.sink.BatchWriteBuilder;
 import org.apache.paimon.table.sink.InnerTableCommit;
 import org.apache.paimon.table.sink.InnerTableWrite;
 import org.apache.paimon.table.sink.StreamWriteBuilder;
-import org.apache.paimon.table.source.InnerStreamTableScan;
+import org.apache.paimon.table.source.StreamDataTableScan;
 
 import java.time.Duration;
 import java.util.Collections;
@@ -87,7 +87,7 @@ public interface ReadonlyTable extends InnerTable {
     }
 
     @Override
-    default InnerStreamTableScan newStreamScan() {
+    default StreamDataTableScan newStreamScan() {
         throw new UnsupportedOperationException(
                 String.format(
                         "Readonly Table %s does not support newStreamScan.",
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
similarity index 95%
rename from 
paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
rename to 
paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
index 821f60cbb..66b4dd8e4 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
@@ -56,41 +56,41 @@ import static 
org.apache.paimon.CoreOptions.FULL_COMPACTION_DELTA_COMMITS;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** An abstraction layer above {@link FileStoreScan} to provide input split 
generation. */
-public abstract class AbstractInnerTableScan implements InnerTableScan {
+public abstract class AbstractDataTableScan implements DataTableScan {
 
     private final CoreOptions options;
     protected final SnapshotReader snapshotReader;
 
-    protected AbstractInnerTableScan(CoreOptions options, SnapshotReader 
snapshotReader) {
+    protected AbstractDataTableScan(CoreOptions options, SnapshotReader 
snapshotReader) {
         this.options = options;
         this.snapshotReader = snapshotReader;
     }
 
     @VisibleForTesting
-    public AbstractInnerTableScan withBucket(int bucket) {
+    public AbstractDataTableScan withBucket(int bucket) {
         snapshotReader.withBucket(bucket);
         return this;
     }
 
     @Override
-    public AbstractInnerTableScan withBucketFilter(Filter<Integer> 
bucketFilter) {
+    public AbstractDataTableScan withBucketFilter(Filter<Integer> 
bucketFilter) {
         snapshotReader.withBucketFilter(bucketFilter);
         return this;
     }
 
     @Override
-    public AbstractInnerTableScan withPartitionFilter(Map<String, String> 
partitionSpec) {
+    public AbstractDataTableScan withPartitionFilter(Map<String, String> 
partitionSpec) {
         snapshotReader.withPartitionFilter(partitionSpec);
         return this;
     }
 
     @Override
-    public AbstractInnerTableScan withLevelFilter(Filter<Integer> levelFilter) 
{
+    public AbstractDataTableScan withLevelFilter(Filter<Integer> levelFilter) {
         snapshotReader.withLevelFilter(levelFilter);
         return this;
     }
 
-    public AbstractInnerTableScan withMetricsRegistry(MetricRegistry 
metricsRegistry) {
+    public AbstractDataTableScan withMetricsRegistry(MetricRegistry 
metricsRegistry) {
         snapshotReader.withMetricRegistry(metricsRegistry);
         return this;
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
similarity index 89%
rename from 
paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java
rename to 
paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
index b307279d0..92652aab3 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
@@ -21,6 +21,7 @@ package org.apache.paimon.table.source;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.operation.DefaultValueAssigner;
 import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.source.snapshot.SnapshotReader;
 import org.apache.paimon.table.source.snapshot.StartingScanner;
 import org.apache.paimon.table.source.snapshot.StartingScanner.ScannedResult;
@@ -31,8 +32,9 @@ import java.util.List;
 import static org.apache.paimon.CoreOptions.MergeEngine.FIRST_ROW;
 
 /** {@link TableScan} implementation for batch planning. */
-public class InnerTableScanImpl extends AbstractInnerTableScan {
+public class DataTableBatchScan extends AbstractDataTableScan {
 
+    private final BucketMode bucketMode;
     private final DefaultValueAssigner defaultValueAssigner;
 
     private StartingScanner startingScanner;
@@ -40,12 +42,14 @@ public class InnerTableScanImpl extends 
AbstractInnerTableScan {
 
     private Integer pushDownLimit;
 
-    public InnerTableScanImpl(
+    public DataTableBatchScan(
+            BucketMode bucketMode,
             boolean pkTable,
             CoreOptions options,
             SnapshotReader snapshotReader,
             DefaultValueAssigner defaultValueAssigner) {
         super(options, snapshotReader);
+        this.bucketMode = bucketMode;
         this.hasNext = true;
         this.defaultValueAssigner = defaultValueAssigner;
         if (pkTable && (options.deletionVectorsEnabled() || 
options.mergeEngine() == FIRST_ROW)) {
@@ -118,4 +122,10 @@ public class InnerTableScanImpl extends 
AbstractInnerTableScan {
             return 0L;
         }
     }
+
+    @Override
+    public DataTableScan withShard(int indexOfThisSubtask, int 
numberOfParallelSubtasks) {
+        snapshotReader.withShard(bucketMode, indexOfThisSubtask, 
numberOfParallelSubtasks);
+        return this;
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScan.java
 b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableScan.java
similarity index 66%
copy from 
paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScan.java
copy to 
paimon-core/src/main/java/org/apache/paimon/table/source/DataTableScan.java
index b395cc9a8..29c825b0b 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableScan.java
@@ -18,15 +18,9 @@
 
 package org.apache.paimon.table.source;
 
-import org.apache.paimon.table.source.snapshot.StartingContext;
+/** Table scan for data table. */
+public interface DataTableScan extends InnerTableScan {
 
-import javax.annotation.Nullable;
-
-/** Streaming {@link InnerTableScan} with {@link StreamTableScan}. */
-public interface InnerStreamTableScan extends InnerTableScan, StreamTableScan {
-
-    StartingContext startingContext();
-
-    /** Restore from checkpoint next snapshot id with scan kind. */
-    void restore(@Nullable Long nextSnapshotId, boolean scanAllSnapshot);
+    /** Specify the shard to be read, and allocate sharded files to read 
records. */
+    DataTableScan withShard(int indexOfThisSubtask, int 
numberOfParallelSubtasks);
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
similarity index 94%
rename from 
paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
rename to 
paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
index 3e03dc0a1..8aeab36ff 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
@@ -24,6 +24,7 @@ import org.apache.paimon.consumer.Consumer;
 import org.apache.paimon.lookup.LookupStrategy;
 import org.apache.paimon.operation.DefaultValueAssigner;
 import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.source.snapshot.AllDeltaFollowUpScanner;
 import org.apache.paimon.table.source.snapshot.BoundedChecker;
 import 
org.apache.paimon.table.source.snapshot.CompactionChangelogFollowUpScanner;
@@ -48,11 +49,11 @@ import javax.annotation.Nullable;
 import static org.apache.paimon.CoreOptions.ChangelogProducer.FULL_COMPACTION;
 
 /** {@link StreamTableScan} implementation for streaming planning. */
-public class InnerStreamTableScanImpl extends AbstractInnerTableScan
-        implements InnerStreamTableScan {
+public class DataTableStreamScan extends AbstractDataTableScan implements 
StreamDataTableScan {
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(InnerStreamTableScanImpl.class);
+    private static final Logger LOG = 
LoggerFactory.getLogger(DataTableStreamScan.class);
 
+    private final BucketMode bucketMode;
     private final CoreOptions options;
     private final SnapshotManager snapshotManager;
     private final boolean supportStreamingReadOverwrite;
@@ -68,13 +69,15 @@ public class InnerStreamTableScanImpl extends 
AbstractInnerTableScan
     @Nullable private Long currentWatermark;
     @Nullable private Long nextSnapshotId;
 
-    public InnerStreamTableScanImpl(
+    public DataTableStreamScan(
+            BucketMode bucketMode,
             CoreOptions options,
             SnapshotReader snapshotReader,
             SnapshotManager snapshotManager,
             boolean supportStreamingReadOverwrite,
             DefaultValueAssigner defaultValueAssigner) {
         super(options, snapshotReader);
+        this.bucketMode = bucketMode;
         this.options = options;
         this.snapshotManager = snapshotManager;
         this.supportStreamingReadOverwrite = supportStreamingReadOverwrite;
@@ -87,7 +90,7 @@ public class InnerStreamTableScanImpl extends 
AbstractInnerTableScan
     }
 
     @Override
-    public InnerStreamTableScanImpl withFilter(Predicate predicate) {
+    public DataTableStreamScan withFilter(Predicate predicate) {
         
snapshotReader.withFilter(defaultValueAssigner.handlePredicate(predicate));
         return this;
     }
@@ -279,4 +282,10 @@ public class InnerStreamTableScanImpl extends 
AbstractInnerTableScan
             snapshotReader.consumerManager().resetConsumer(consumerId, new 
Consumer(nextSnapshot));
         }
     }
+
+    @Override
+    public DataTableScan withShard(int indexOfThisSubtask, int 
numberOfParallelSubtasks) {
+        snapshotReader.withShard(bucketMode, indexOfThisSubtask, 
numberOfParallelSubtasks);
+        return this;
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java 
b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java
index fc7e41569..d0b03c966 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java
@@ -122,6 +122,9 @@ public interface ReadBuilder extends Serializable {
     /** the row number pushed down. */
     ReadBuilder withLimit(int limit);
 
+    /** Specify the shard to be read, and allocate sharded files to read 
records. */
+    ReadBuilder withShard(int indexOfThisSubtask, int 
numberOfParallelSubtasks);
+
     /** Create a {@link TableScan} to perform batch planning. */
     TableScan newScan();
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java 
b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
index 1ceb614ec..8813f9b3f 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
@@ -41,6 +41,9 @@ public class ReadBuilderImpl implements ReadBuilder {
 
     private Integer limit = null;
 
+    private Integer shardIndexOfThisSubtask;
+    private Integer shardNumberOfParallelSubtasks;
+
     private Map<String, String> partitionSpec;
 
     public ReadBuilderImpl(InnerTable table) {
@@ -88,10 +91,16 @@ public class ReadBuilderImpl implements ReadBuilder {
         return this;
     }
 
+    @Override
+    public ReadBuilder withShard(int indexOfThisSubtask, int 
numberOfParallelSubtasks) {
+        this.shardIndexOfThisSubtask = indexOfThisSubtask;
+        this.shardNumberOfParallelSubtasks = numberOfParallelSubtasks;
+        return this;
+    }
+
     @Override
     public TableScan newScan() {
-        InnerTableScan tableScan =
-                
table.newScan().withFilter(filter).withPartitionFilter(partitionSpec);
+        InnerTableScan tableScan = configureScan(table.newScan());
         if (limit != null) {
             tableScan.withLimit(limit);
         }
@@ -100,7 +109,22 @@ public class ReadBuilderImpl implements ReadBuilder {
 
     @Override
     public StreamTableScan newStreamScan() {
-        return (StreamTableScan) table.newStreamScan().withFilter(filter);
+        return (StreamTableScan) configureScan(table.newStreamScan());
+    }
+
+    private InnerTableScan configureScan(InnerTableScan scan) {
+        scan.withFilter(filter).withPartitionFilter(partitionSpec);
+
+        if (shardIndexOfThisSubtask != null) {
+            if (scan instanceof DataTableScan) {
+                ((DataTableScan) scan)
+                        .withShard(shardIndexOfThisSubtask, 
shardNumberOfParallelSubtasks);
+            } else {
+                throw new UnsupportedOperationException(
+                        "Unsupported table scan type for shard configuring, 
the scan is: " + scan);
+            }
+        }
+        return scan;
     }
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScan.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/StreamDataTableScan.java
similarity index 93%
rename from 
paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScan.java
rename to 
paimon-core/src/main/java/org/apache/paimon/table/source/StreamDataTableScan.java
index b395cc9a8..f5e6e3a0a 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/StreamDataTableScan.java
@@ -23,7 +23,7 @@ import 
org.apache.paimon.table.source.snapshot.StartingContext;
 import javax.annotation.Nullable;
 
 /** Streaming {@link InnerTableScan} with {@link StreamTableScan}. */
-public interface InnerStreamTableScan extends InnerTableScan, StreamTableScan {
+public interface StreamDataTableScan extends DataTableScan, StreamTableScan {
 
     StartingContext startingContext();
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
index ac84f4f16..26840ade7 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
@@ -24,6 +24,7 @@ import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.manifest.PartitionEntry;
 import org.apache.paimon.metrics.MetricRegistry;
 import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.ScanMode;
 import org.apache.paimon.table.source.Split;
@@ -66,6 +67,21 @@ public interface SnapshotReader {
 
     SnapshotReader withBucketFilter(Filter<Integer> bucketFilter);
 
+    SnapshotReader withDataFileNameFilter(Filter<String> fileNameFilter);
+
+    default SnapshotReader withShard(
+            BucketMode bucketMode, int indexOfThisSubtask, int 
numberOfParallelSubtasks) {
+        if (bucketMode == BucketMode.BUCKET_UNAWARE) {
+            withDataFileNameFilter(
+                    file ->
+                            Math.abs(file.hashCode() % 
numberOfParallelSubtasks)
+                                    == indexOfThisSubtask);
+        } else {
+            withBucketFilter(bucket -> bucket % numberOfParallelSubtasks == 
indexOfThisSubtask);
+        }
+        return this;
+    }
+
     SnapshotReader withMetricRegistry(MetricRegistry registry);
 
     /** Get splits plan from snapshot. */
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index be91096ab..cc793c641 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -221,6 +221,12 @@ public class SnapshotReaderImpl implements SnapshotReader {
         return this;
     }
 
+    @Override
+    public SnapshotReader withDataFileNameFilter(Filter<String> 
fileNameFilter) {
+        scan.withDataFileNameFilter(fileNameFilter);
+        return this;
+    }
+
     /** Get splits from {@link FileKind#ADD} files. */
     @Override
     public Plan read() {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index 7ff7f936e..196b2fa4c 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -38,12 +38,13 @@ import org.apache.paimon.table.DataTable;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.ReadonlyTable;
 import org.apache.paimon.table.Table;
-import org.apache.paimon.table.source.InnerStreamTableScan;
+import org.apache.paimon.table.source.DataTableScan;
 import org.apache.paimon.table.source.InnerTableRead;
 import org.apache.paimon.table.source.InnerTableScan;
 import org.apache.paimon.table.source.ScanMode;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.SplitGenerator;
+import org.apache.paimon.table.source.StreamDataTableScan;
 import org.apache.paimon.table.source.TableRead;
 import org.apache.paimon.table.source.snapshot.SnapshotReader;
 import org.apache.paimon.table.source.snapshot.StartingContext;
@@ -133,12 +134,12 @@ public class AuditLogTable implements DataTable, 
ReadonlyTable {
     }
 
     @Override
-    public InnerTableScan newScan() {
+    public DataTableScan newScan() {
         return new AuditLogBatchScan(dataTable.newScan());
     }
 
     @Override
-    public InnerStreamTableScan newStreamScan() {
+    public StreamDataTableScan newStreamScan() {
         return new AuditLogStreamScan(dataTable.newStreamScan());
     }
 
@@ -275,6 +276,12 @@ public class AuditLogTable implements DataTable, 
ReadonlyTable {
             return this;
         }
 
+        @Override
+        public SnapshotReader withDataFileNameFilter(Filter<String> 
fileNameFilter) {
+            snapshotReader.withDataFileNameFilter(fileNameFilter);
+            return this;
+        }
+
         @Override
         public SnapshotReader withMetricRegistry(MetricRegistry registry) {
             snapshotReader.withMetricRegistry(registry);
@@ -307,11 +314,11 @@ public class AuditLogTable implements DataTable, 
ReadonlyTable {
         }
     }
 
-    private class AuditLogBatchScan implements InnerTableScan {
+    private class AuditLogBatchScan implements DataTableScan {
 
-        private final InnerTableScan batchScan;
+        private final DataTableScan batchScan;
 
-        private AuditLogBatchScan(InnerTableScan batchScan) {
+        private AuditLogBatchScan(DataTableScan batchScan) {
             this.batchScan = batchScan;
         }
 
@@ -360,18 +367,24 @@ public class AuditLogTable implements DataTable, 
ReadonlyTable {
         public List<BinaryRow> listPartitions() {
             return batchScan.listPartitions();
         }
+
+        @Override
+        public DataTableScan withShard(int indexOfThisSubtask, int 
numberOfParallelSubtasks) {
+            batchScan.withShard(indexOfThisSubtask, numberOfParallelSubtasks);
+            return this;
+        }
     }
 
-    private class AuditLogStreamScan implements InnerStreamTableScan {
+    private class AuditLogStreamScan implements StreamDataTableScan {
 
-        private final InnerStreamTableScan streamScan;
+        private final StreamDataTableScan streamScan;
 
-        private AuditLogStreamScan(InnerStreamTableScan streamScan) {
+        private AuditLogStreamScan(StreamDataTableScan streamScan) {
             this.streamScan = streamScan;
         }
 
         @Override
-        public InnerStreamTableScan withFilter(Predicate predicate) {
+        public StreamDataTableScan withFilter(Predicate predicate) {
             convert(predicate).ifPresent(streamScan::withFilter);
             return this;
         }
@@ -419,10 +432,16 @@ public class AuditLogTable implements DataTable, 
ReadonlyTable {
         }
 
         @Override
-        public InnerStreamTableScan withMetricsRegistry(MetricRegistry 
metricsRegistry) {
+        public StreamDataTableScan withMetricsRegistry(MetricRegistry 
metricsRegistry) {
             streamScan.withMetricsRegistry(metricsRegistry);
             return this;
         }
+
+        @Override
+        public DataTableScan withShard(int indexOfThisSubtask, int 
numberOfParallelSubtasks) {
+            streamScan.withShard(indexOfThisSubtask, numberOfParallelSubtasks);
+            return this;
+        }
     }
 
     private class AuditLogRead implements InnerTableRead {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java
index 4c9b9a601..7712b1ca2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java
@@ -33,10 +33,10 @@ import org.apache.paimon.table.DataTable;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.ReadonlyTable;
 import org.apache.paimon.table.source.DataSplit;
-import org.apache.paimon.table.source.InnerStreamTableScan;
+import org.apache.paimon.table.source.DataTableScan;
 import org.apache.paimon.table.source.InnerTableRead;
-import org.apache.paimon.table.source.InnerTableScan;
 import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.StreamDataTableScan;
 import org.apache.paimon.table.source.TableRead;
 import org.apache.paimon.table.source.snapshot.SnapshotReader;
 import org.apache.paimon.types.BigIntType;
@@ -149,12 +149,12 @@ public class BucketsTable implements DataTable, 
ReadonlyTable {
     }
 
     @Override
-    public InnerTableScan newScan() {
+    public DataTableScan newScan() {
         return wrapped.newScan();
     }
 
     @Override
-    public InnerStreamTableScan newStreamScan() {
+    public StreamDataTableScan newStreamScan() {
         return wrapped.newStreamScan();
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java
index 7825b93e4..58f4bc56a 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java
@@ -34,10 +34,10 @@ import org.apache.paimon.table.DataTable;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.ReadonlyTable;
 import org.apache.paimon.table.source.DataSplit;
-import org.apache.paimon.table.source.InnerStreamTableScan;
+import org.apache.paimon.table.source.DataTableScan;
 import org.apache.paimon.table.source.InnerTableRead;
-import org.apache.paimon.table.source.InnerTableScan;
 import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.StreamDataTableScan;
 import org.apache.paimon.table.source.TableRead;
 import org.apache.paimon.table.source.snapshot.SnapshotReader;
 import org.apache.paimon.types.BigIntType;
@@ -136,12 +136,12 @@ public class FileMonitorTable implements DataTable, 
ReadonlyTable {
     }
 
     @Override
-    public InnerTableScan newScan() {
+    public DataTableScan newScan() {
         return wrapped.newScan();
     }
 
     @Override
-    public InnerStreamTableScan newStreamScan() {
+    public StreamDataTableScan newStreamScan() {
         return wrapped.newStreamScan();
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
index 0b0b0e586..c88e2c66c 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
@@ -26,11 +26,10 @@ import org.apache.paimon.table.DataTable;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.ReadonlyTable;
 import org.apache.paimon.table.Table;
-import org.apache.paimon.table.source.InnerStreamTableScan;
-import org.apache.paimon.table.source.InnerStreamTableScanImpl;
+import org.apache.paimon.table.source.DataTableBatchScan;
+import org.apache.paimon.table.source.DataTableStreamScan;
 import org.apache.paimon.table.source.InnerTableRead;
-import org.apache.paimon.table.source.InnerTableScan;
-import org.apache.paimon.table.source.InnerTableScanImpl;
+import org.apache.paimon.table.source.StreamDataTableScan;
 import org.apache.paimon.table.source.snapshot.SnapshotReader;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.BranchManager;
@@ -98,8 +97,9 @@ public class ReadOptimizedTable implements DataTable, 
ReadonlyTable {
     }
 
     @Override
-    public InnerTableScan newScan() {
-        return new InnerTableScanImpl(
+    public DataTableBatchScan newScan() {
+        return new DataTableBatchScan(
+                dataTable.bucketMode(),
                 dataTable.schema().primaryKeys().size() > 0,
                 coreOptions(),
                 newSnapshotReader(),
@@ -107,8 +107,9 @@ public class ReadOptimizedTable implements DataTable, 
ReadonlyTable {
     }
 
     @Override
-    public InnerStreamTableScan newStreamScan() {
-        return new InnerStreamTableScanImpl(
+    public StreamDataTableScan newStreamScan() {
+        return new DataTableStreamScan(
+                dataTable.bucketMode(),
                 coreOptions(),
                 newSnapshotReader(),
                 snapshotManager(),
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java
index c5bd22dc5..129f928c9 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java
@@ -42,8 +42,8 @@ import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.FileStoreTableFactory;
 import org.apache.paimon.table.sink.TableCommitImpl;
 import org.apache.paimon.table.sink.TableWriteImpl;
-import org.apache.paimon.table.source.InnerStreamTableScan;
 import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.StreamDataTableScan;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowKind;
@@ -267,7 +267,7 @@ public class OrphanFilesCleanTest {
                 changelogData.keySet().stream()
                         .max(Comparator.comparingLong(Long::longValue))
                         .get();
-        InnerStreamTableScan scan = scanTable.newStreamScan();
+        StreamDataTableScan scan = scanTable.newStreamScan();
         TreeMap<Long, List<InternalRow>> data = new TreeMap<>(changelogData);
         // clear the data < the smallest changelog data.
         data.headMap(changelogs.get(0).id()).clear();
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
index b569a76a8..5eb0b5db6 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
@@ -491,6 +491,23 @@ public class AppendOnlyFileStoreTableTest extends 
FileStoreTableTestBase {
         reader.forEachRemaining(row -> 
assertThat(row.getString(1).toString()).isEqualTo("b"));
     }
 
+    @Test
+    public void testWithShardAppendTable() throws Exception {
+        FileStoreTable table = createFileStoreTable(conf -> conf.set(BUCKET, 
-1));
+        innerTestWithShard(table);
+    }
+
+    @Test
+    public void testWithShardBucketedTable() throws Exception {
+        FileStoreTable table =
+                createFileStoreTable(
+                        conf -> {
+                            conf.set(BUCKET, 5);
+                            conf.set(BUCKET_KEY, "a");
+                        });
+        innerTestWithShard(table);
+    }
+
     @Test
     public void testBloomFilterForMapField() throws Exception {
         RowType rowType =
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java 
b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
index b30dab420..d10c9e8fa 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
@@ -386,6 +386,35 @@ public abstract class FileStoreTableTestBase {
         assertThat(((DataSplit) splits.get(0)).bucket()).isEqualTo(1);
     }
 
+    protected void innerTestWithShard(FileStoreTable table) throws Exception {
+        StreamTableWrite write = table.newWrite(commitUser);
+        write.write(rowData(1, 1, 2L));
+        write.write(rowData(1, 3, 4L));
+        write.write(rowData(1, 5, 6L));
+        write.write(rowData(1, 7, 8L));
+        write.write(rowData(1, 9, 10L));
+        TableCommitImpl commit = table.newCommit(commitUser);
+        commit.commit(0, write.prepareCommit(true, 0));
+        write.close();
+        commit.close();
+
+        ReadBuilder readBuilder = table.newReadBuilder();
+
+        List<Split> splits = new ArrayList<>();
+        splits.addAll(readBuilder.withShard(0, 3).newScan().plan().splits());
+        splits.addAll(readBuilder.withShard(1, 3).newScan().plan().splits());
+        splits.addAll(readBuilder.withShard(2, 3).newScan().plan().splits());
+
+        assertThat(getResult(readBuilder.newRead(), splits, 
BATCH_ROW_TO_STRING))
+                .hasSameElementsAs(
+                        Arrays.asList(
+                                
"1|3|4|binary|varbinary|mapKey:mapVal|multiset",
+                                
"1|9|10|binary|varbinary|mapKey:mapVal|multiset",
+                                
"1|1|2|binary|varbinary|mapKey:mapVal|multiset",
+                                
"1|5|6|binary|varbinary|mapKey:mapVal|multiset",
+                                
"1|7|8|binary|varbinary|mapKey:mapVal|multiset"));
+    }
+
     @Test
     public void testAbort() throws Exception {
         FileStoreTable table = createFileStoreTable(conf -> conf.set(BUCKET, 
1));
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
index e4a6b1d3f..a952e032f 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
@@ -782,6 +782,12 @@ public class PrimaryKeyFileStoreTableTest extends 
FileStoreTableTestBase {
                                 
"1|10|200|binary|varbinary|mapKey:mapVal|multiset"));
     }
 
+    @Test
+    public void testWithShard() throws Exception {
+        FileStoreTable table = createFileStoreTable(conf -> conf.set(BUCKET, 
5));
+        innerTestWithShard(table);
+    }
+
     @Test
     public void testSlowCommit() throws Exception {
         FileStoreTable table = createFileStoreTable();
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/DefaultValueScannerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/DefaultValueScannerTest.java
index ac2e21118..a99d6b234 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/DefaultValueScannerTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/DefaultValueScannerTest.java
@@ -25,7 +25,7 @@ import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.StreamTableCommit;
 import org.apache.paimon.table.sink.StreamTableWrite;
-import org.apache.paimon.table.source.InnerStreamTableScan;
+import org.apache.paimon.table.source.StreamDataTableScan;
 import org.apache.paimon.table.source.TableRead;
 import org.apache.paimon.table.source.TableScan;
 
@@ -43,7 +43,7 @@ public class DefaultValueScannerTest extends ScannerTestBase {
         TableRead read = table.newRead();
         StreamTableWrite write = table.newWrite(commitUser);
         StreamTableCommit commit = table.newCommit(commitUser);
-        InnerStreamTableScan scan = table.newStreamScan();
+        StreamDataTableScan scan = table.newStreamScan();
 
         write.write(rowData(1, 10, 101L));
         commit.commit(0, write.prepareCommit(true, 0));
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
index 83470fb78..ca61ab4eb 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
@@ -21,8 +21,8 @@ package org.apache.paimon.flink.source;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.flink.metrics.FlinkMetricRegistry;
 import org.apache.paimon.table.BucketMode;
-import org.apache.paimon.table.source.InnerStreamTableScan;
 import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.StreamDataTableScan;
 import org.apache.paimon.table.source.StreamTableScan;
 
 import org.apache.flink.api.connector.source.Boundedness;
@@ -77,7 +77,7 @@ public class ContinuousFileStoreSource extends FlinkSource {
         }
         StreamTableScan scan = readBuilder.newStreamScan();
         if (metricGroup(context) != null) {
-            ((InnerStreamTableScan) scan)
+            ((StreamDataTableScan) scan)
                     .withMetricsRegistry(new 
FlinkMetricRegistry(context.metricGroup()));
         }
         scan.restore(nextSnapshotId);
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/LogHybridSourceFactory.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/LogHybridSourceFactory.java
index 7f6c7ac0d..90c283bf8 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/LogHybridSourceFactory.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/LogHybridSourceFactory.java
@@ -26,8 +26,8 @@ import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.table.DataTable;
 import org.apache.paimon.table.Table;
-import org.apache.paimon.table.source.InnerStreamTableScan;
 import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.StreamDataTableScan;
 import org.apache.paimon.table.source.StreamTableScan;
 import org.apache.paimon.utils.SnapshotManager;
 
@@ -117,7 +117,7 @@ public class LogHybridSourceFactory
                 StreamTableScan scan = readBuilder.newStreamScan();
                 // register scan metrics
                 if (context.metricGroup() != null) {
-                    ((InnerStreamTableScan) scan)
+                    ((StreamDataTableScan) scan)
                             .withMetricsRegistry(new 
FlinkMetricRegistry(context.metricGroup()));
                 }
                 splits = splitGenerator.createSplits(scan.plan());
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/StreamHelper.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/StreamHelper.scala
index dbce04719..91df04e6d 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/StreamHelper.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/StreamHelper.scala
@@ -22,7 +22,7 @@ import org.apache.paimon.CoreOptions
 import org.apache.paimon.data.BinaryRow
 import org.apache.paimon.spark.SparkTypeUtils
 import org.apache.paimon.table.DataTable
-import org.apache.paimon.table.source.{DataSplit, InnerStreamTableScan}
+import org.apache.paimon.table.source.{DataSplit, StreamDataTableScan}
 import org.apache.paimon.table.source.TableScan.Plan
 import org.apache.paimon.table.source.snapshot.StartingContext
 import org.apache.paimon.utils.{InternalRowPartitionComputer, TypeUtils}
@@ -44,7 +44,7 @@ private[spark] trait StreamHelper {
 
   var lastTriggerMillis: Long
 
-  private lazy val streamScan: InnerStreamTableScan = table.newStreamScan()
+  private lazy val streamScan: StreamDataTableScan = table.newStreamScan()
 
   private lazy val partitionSchema: StructType =
     SparkTypeUtils.fromPaimonRowType(TypeUtils.project(table.rowType(), 
table.partitionKeys()))

Reply via email to