This is an automated email from the ASF dual-hosted git repository.
junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new a854d4ede [core] Introduce ReadBuilder.withShard (#3423)
a854d4ede is described below
commit a854d4ede6dd41123167358bf47d0c9020fd584a
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu May 30 14:09:00 2024 +0800
[core] Introduce ReadBuilder.withShard (#3423)
---
.../paimon/crosspartition/IndexBootstrap.java | 4 +--
.../java/org/apache/paimon/io/DataFileMeta.java | 40 +++++++++++----------
.../org/apache/paimon/manifest/ManifestEntry.java | 12 +++++--
.../paimon/manifest/ManifestEntrySerializer.java | 5 +++
.../paimon/operation/AbstractFileStoreScan.java | 11 ++++--
.../org/apache/paimon/operation/FileStoreScan.java | 2 ++
.../paimon/privilege/PrivilegedFileStoreTable.java | 8 ++---
.../paimon/table/AbstractFileStoreTable.java | 17 ++++-----
.../java/org/apache/paimon/table/DataTable.java | 4 +++
.../java/org/apache/paimon/table/InnerTable.java | 4 +--
.../org/apache/paimon/table/ReadonlyTable.java | 4 +--
...erTableScan.java => AbstractDataTableScan.java} | 14 ++++----
...rTableScanImpl.java => DataTableBatchScan.java} | 14 ++++++--
...nnerStreamTableScan.java => DataTableScan.java} | 14 +++-----
...TableScanImpl.java => DataTableStreamScan.java} | 19 +++++++---
.../apache/paimon/table/source/ReadBuilder.java | 3 ++
.../paimon/table/source/ReadBuilderImpl.java | 30 ++++++++++++++--
...reamTableScan.java => StreamDataTableScan.java} | 2 +-
.../table/source/snapshot/SnapshotReader.java | 16 +++++++++
.../table/source/snapshot/SnapshotReaderImpl.java | 6 ++++
.../apache/paimon/table/system/AuditLogTable.java | 41 ++++++++++++++++------
.../apache/paimon/table/system/BucketsTable.java | 8 ++---
.../paimon/table/system/FileMonitorTable.java | 8 ++---
.../paimon/table/system/ReadOptimizedTable.java | 17 ++++-----
.../paimon/operation/OrphanFilesCleanTest.java | 4 +--
.../paimon/table/AppendOnlyFileStoreTableTest.java | 17 +++++++++
.../paimon/table/FileStoreTableTestBase.java | 29 +++++++++++++++
.../paimon/table/PrimaryKeyFileStoreTableTest.java | 6 ++++
.../source/snapshot/DefaultValueScannerTest.java | 4 +--
.../flink/source/ContinuousFileStoreSource.java | 4 +--
.../flink/source/LogHybridSourceFactory.java | 4 +--
.../apache/paimon/spark/sources/StreamHelper.scala | 4 +--
32 files changed, 270 insertions(+), 105 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/crosspartition/IndexBootstrap.java b/paimon-core/src/main/java/org/apache/paimon/crosspartition/IndexBootstrap.java
index 260af9a9a..ec8244a2a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/crosspartition/IndexBootstrap.java
+++ b/paimon-core/src/main/java/org/apache/paimon/crosspartition/IndexBootstrap.java
@@ -27,7 +27,7 @@ import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.Table;
-import org.apache.paimon.table.source.AbstractInnerTableScan;
+import org.apache.paimon.table.source.AbstractDataTableScan;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
@@ -84,7 +84,7 @@ public class IndexBootstrap implements Serializable {
.newReadBuilder()
.withProjection(keyProjection);
- AbstractInnerTableScan tableScan = (AbstractInnerTableScan)
readBuilder.newScan();
+ AbstractDataTableScan tableScan = (AbstractDataTableScan)
readBuilder.newScan();
List<Split> splits =
tableScan
.withBucketFilter(bucket -> bucket % numAssigners ==
assignId)
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
index bfdfaf249..834710725 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
@@ -53,6 +53,27 @@ import static org.apache.paimon.utils.SerializationUtils.newStringType;
/** Metadata of a data file. */
public class DataFileMeta {
+ public static final RowType SCHEMA =
+ new RowType(
+ Arrays.asList(
+ new DataField(0, "_FILE_NAME",
newStringType(false)),
+ new DataField(1, "_FILE_SIZE", new
BigIntType(false)),
+ new DataField(2, "_ROW_COUNT", new
BigIntType(false)),
+ new DataField(3, "_MIN_KEY", newBytesType(false)),
+ new DataField(4, "_MAX_KEY", newBytesType(false)),
+ new DataField(5, "_KEY_STATS",
SimpleStatsConverter.schema()),
+ new DataField(6, "_VALUE_STATS",
SimpleStatsConverter.schema()),
+ new DataField(7, "_MIN_SEQUENCE_NUMBER", new
BigIntType(false)),
+ new DataField(8, "_MAX_SEQUENCE_NUMBER", new
BigIntType(false)),
+ new DataField(9, "_SCHEMA_ID", new
BigIntType(false)),
+ new DataField(10, "_LEVEL", new IntType(false)),
+ new DataField(
+ 11, "_EXTRA_FILES", new ArrayType(false,
newStringType(false))),
+ new DataField(12, "_CREATION_TIME",
DataTypes.TIMESTAMP_MILLIS()),
+ new DataField(13, "_DELETE_ROW_COUNT", new
BigIntType(true)),
+ new DataField(14, "_EMBEDDED_FILE_INDEX",
newBytesType(true)),
+ new DataField(15, "_FILE_SOURCE", new
TinyIntType(true))));
+
public static final BinaryRow EMPTY_MIN_KEY = EMPTY_ROW;
public static final BinaryRow EMPTY_MAX_KEY = EMPTY_ROW;
public static final int DUMMY_LEVEL = 0;
@@ -430,24 +451,7 @@ public class DataFileMeta {
}
public static RowType schema() {
- List<DataField> fields = new ArrayList<>();
- fields.add(new DataField(0, "_FILE_NAME", newStringType(false)));
- fields.add(new DataField(1, "_FILE_SIZE", new BigIntType(false)));
- fields.add(new DataField(2, "_ROW_COUNT", new BigIntType(false)));
- fields.add(new DataField(3, "_MIN_KEY", newBytesType(false)));
- fields.add(new DataField(4, "_MAX_KEY", newBytesType(false)));
- fields.add(new DataField(5, "_KEY_STATS",
SimpleStatsConverter.schema()));
- fields.add(new DataField(6, "_VALUE_STATS",
SimpleStatsConverter.schema()));
- fields.add(new DataField(7, "_MIN_SEQUENCE_NUMBER", new
BigIntType(false)));
- fields.add(new DataField(8, "_MAX_SEQUENCE_NUMBER", new
BigIntType(false)));
- fields.add(new DataField(9, "_SCHEMA_ID", new BigIntType(false)));
- fields.add(new DataField(10, "_LEVEL", new IntType(false)));
- fields.add(new DataField(11, "_EXTRA_FILES", new ArrayType(false,
newStringType(false))));
- fields.add(new DataField(12, "_CREATION_TIME",
DataTypes.TIMESTAMP_MILLIS()));
- fields.add(new DataField(13, "_DELETE_ROW_COUNT", new
BigIntType(true)));
- fields.add(new DataField(14, "_EMBEDDED_FILE_INDEX",
newBytesType(true)));
- fields.add(new DataField(15, "_FILE_SOURCE", new TinyIntType(true)));
- return new RowType(fields);
+ return SCHEMA;
}
public static long getMaxSequenceNumber(List<DataFileMeta> fileMetas) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
index 34ad169ff..af1740b00 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
@@ -171,19 +171,27 @@ public class ManifestEntry implements FileEntry {
public static Filter<InternalRow> createEntryRowFilter(
@Nullable PartitionPredicate partitionFilter,
@Nullable Filter<Integer> bucketFilter,
+ @Nullable Filter<String> fileNameFilter,
int numOfBuckets) {
Function<InternalRow, BinaryRow> partitionGetter =
ManifestEntrySerializer.partitionGetter();
Function<InternalRow, Integer> bucketGetter =
ManifestEntrySerializer.bucketGetter();
Function<InternalRow, Integer> totalBucketGetter =
ManifestEntrySerializer.totalBucketGetter();
+ Function<InternalRow, String> fileNameGetter =
ManifestEntrySerializer.fileNameGetter();
return row -> {
if ((partitionFilter != null &&
!partitionFilter.test(partitionGetter.apply(row)))) {
return false;
}
- if (bucketFilter != null && numOfBuckets ==
totalBucketGetter.apply(row)) {
- return bucketFilter.test(bucketGetter.apply(row));
+ if (bucketFilter != null
+ && numOfBuckets == totalBucketGetter.apply(row)
+ && !bucketFilter.test(bucketGetter.apply(row))) {
+ return false;
+ }
+
+ if (fileNameFilter != null &&
!fileNameFilter.test((fileNameGetter.apply(row)))) {
+ return false;
}
return true;
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntrySerializer.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntrySerializer.java
index a733ba9e1..83c2be59a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntrySerializer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntrySerializer.java
@@ -21,6 +21,7 @@ package org.apache.paimon.manifest;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFileMetaSerializer;
import org.apache.paimon.utils.VersionedObjectSerializer;
@@ -87,4 +88,8 @@ public class ManifestEntrySerializer extends VersionedObjectSerializer<ManifestE
public static Function<InternalRow, Integer> totalBucketGetter() {
return row -> row.getInt(4);
}
+
+ public static Function<InternalRow, String> fileNameGetter() {
+ return row -> row.getRow(5,
DataFileMeta.SCHEMA.getFieldCount()).getString(0).toString();
+ }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index 8abf1f18b..32b87e406 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -87,6 +87,7 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
private ScanMode scanMode = ScanMode.ALL;
private Filter<Integer> levelFilter = null;
private Long dataFileTimeMills = null;
+ private Filter<String> fileNameFilter = null;
private ManifestCacheFilter manifestCacheFilter = null;
private ScanMetrics scanMetrics = null;
@@ -204,6 +205,12 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
return this;
}
+ @Override
+ public FileStoreScan withDataFileNameFilter(Filter<String> fileNameFilter)
{
+ this.fileNameFilter = fileNameFilter;
+ return this;
+ }
+
@Override
public FileStoreScan withMetrics(ScanMetrics metrics) {
this.scanMetrics = metrics;
@@ -488,7 +495,7 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
manifest.fileSize(),
ManifestEntry.createCacheRowFilter(manifestCacheFilter, numOfBuckets),
ManifestEntry.createEntryRowFilter(
- partitionFilter, bucketFilter, numOfBuckets));
+ partitionFilter, bucketFilter, fileNameFilter,
numOfBuckets));
}
/** Note: Keep this thread-safe. */
@@ -503,7 +510,7 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
// see SimpleFileEntrySerializer
ManifestEntry.createCacheRowFilter(manifestCacheFilter, numOfBuckets),
ManifestEntry.createEntryRowFilter(
- partitionFilter, bucketFilter, numOfBuckets));
+ partitionFilter, bucketFilter, fileNameFilter,
numOfBuckets));
}
// ------------------------------------------------------------------------
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
index 481010269..f5249efa9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
@@ -70,6 +70,8 @@ public interface FileStoreScan {
FileStoreScan withManifestCacheFilter(ManifestCacheFilter manifestFilter);
+ FileStoreScan withDataFileNameFilter(Filter<String> fileNameFilter);
+
FileStoreScan withMetrics(ScanMetrics metrics);
/** Produce a {@link Plan}. */
diff --git a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
index e4b09df38..fe0c65728 100644
--- a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
@@ -34,9 +34,9 @@ import org.apache.paimon.table.query.LocalTableQuery;
import org.apache.paimon.table.sink.RowKeyExtractor;
import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.table.sink.TableWriteImpl;
-import org.apache.paimon.table.source.InnerStreamTableScan;
+import org.apache.paimon.table.source.DataTableScan;
import org.apache.paimon.table.source.InnerTableRead;
-import org.apache.paimon.table.source.InnerTableScan;
+import org.apache.paimon.table.source.StreamDataTableScan;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.utils.BranchManager;
import org.apache.paimon.utils.SnapshotManager;
@@ -235,13 +235,13 @@ public class PrivilegedFileStoreTable implements FileStoreTable {
}
@Override
- public InnerTableScan newScan() {
+ public DataTableScan newScan() {
privilegeChecker.assertCanSelect(identifier);
return wrapped.newScan();
}
@Override
- public InnerStreamTableScan newStreamScan() {
+ public StreamDataTableScan newStreamScan() {
privilegeChecker.assertCanSelect(identifier);
return wrapped.newStreamScan();
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 82cc47ad5..07777428c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -44,11 +44,10 @@ import org.apache.paimon.table.sink.RowKeyExtractor;
import org.apache.paimon.table.sink.RowKindGenerator;
import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.table.sink.UnawareBucketRowKeyExtractor;
-import org.apache.paimon.table.source.InnerStreamTableScan;
-import org.apache.paimon.table.source.InnerStreamTableScanImpl;
-import org.apache.paimon.table.source.InnerTableScan;
-import org.apache.paimon.table.source.InnerTableScanImpl;
+import org.apache.paimon.table.source.DataTableBatchScan;
+import org.apache.paimon.table.source.DataTableStreamScan;
import org.apache.paimon.table.source.SplitGenerator;
+import org.apache.paimon.table.source.StreamDataTableScan;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.table.source.snapshot.SnapshotReaderImpl;
import
org.apache.paimon.table.source.snapshot.StaticFromTimestampStartingScanner;
@@ -154,8 +153,9 @@ abstract class AbstractFileStoreTable implements FileStoreTable {
}
@Override
- public InnerTableScan newScan() {
- return new InnerTableScanImpl(
+ public DataTableBatchScan newScan() {
+ return new DataTableBatchScan(
+ bucketMode(),
tableSchema.primaryKeys().size() > 0,
coreOptions(),
newSnapshotReader(),
@@ -163,8 +163,9 @@ abstract class AbstractFileStoreTable implements FileStoreTable {
}
@Override
- public InnerStreamTableScan newStreamScan() {
- return new InnerStreamTableScanImpl(
+ public StreamDataTableScan newStreamScan() {
+ return new DataTableStreamScan(
+ bucketMode(),
coreOptions(),
newSnapshotReader(),
snapshotManager(),
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/DataTable.java b/paimon-core/src/main/java/org/apache/paimon/table/DataTable.java
index b5bebe2a7..3c56b4b3b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/DataTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/DataTable.java
@@ -21,6 +21,7 @@ package org.apache.paimon.table;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.table.source.DataTableScan;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.utils.BranchManager;
import org.apache.paimon.utils.SnapshotManager;
@@ -29,6 +30,9 @@ import org.apache.paimon.utils.TagManager;
/** A {@link Table} for data. */
public interface DataTable extends InnerTable {
+ @Override
+ DataTableScan newScan();
+
SnapshotReader newSnapshotReader();
CoreOptions coreOptions();
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/InnerTable.java b/paimon-core/src/main/java/org/apache/paimon/table/InnerTable.java
index 456834ebc..069b8929a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/InnerTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/InnerTable.java
@@ -24,18 +24,18 @@ import org.apache.paimon.table.sink.InnerTableCommit;
import org.apache.paimon.table.sink.InnerTableWrite;
import org.apache.paimon.table.sink.StreamWriteBuilder;
import org.apache.paimon.table.sink.StreamWriteBuilderImpl;
-import org.apache.paimon.table.source.InnerStreamTableScan;
import org.apache.paimon.table.source.InnerTableRead;
import org.apache.paimon.table.source.InnerTableScan;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.ReadBuilderImpl;
+import org.apache.paimon.table.source.StreamDataTableScan;
/** Inner table for implementation, provide newScan, newRead ... directly. */
public interface InnerTable extends Table {
InnerTableScan newScan();
- InnerStreamTableScan newStreamScan();
+ StreamDataTableScan newStreamScan();
InnerTableRead newRead();
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
index dcb62dfcb..dc50a83d9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
@@ -23,7 +23,7 @@ import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.sink.InnerTableCommit;
import org.apache.paimon.table.sink.InnerTableWrite;
import org.apache.paimon.table.sink.StreamWriteBuilder;
-import org.apache.paimon.table.source.InnerStreamTableScan;
+import org.apache.paimon.table.source.StreamDataTableScan;
import java.time.Duration;
import java.util.Collections;
@@ -87,7 +87,7 @@ public interface ReadonlyTable extends InnerTable {
}
@Override
- default InnerStreamTableScan newStreamScan() {
+ default StreamDataTableScan newStreamScan() {
throw new UnsupportedOperationException(
String.format(
"Readonly Table %s does not support newStreamScan.",
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
similarity index 95%
rename from paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
rename to paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
index 821f60cbb..66b4dd8e4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
@@ -56,41 +56,41 @@ import static org.apache.paimon.CoreOptions.FULL_COMPACTION_DELTA_COMMITS;
import static org.apache.paimon.utils.Preconditions.checkArgument;
/** An abstraction layer above {@link FileStoreScan} to provide input split
generation. */
-public abstract class AbstractInnerTableScan implements InnerTableScan {
+public abstract class AbstractDataTableScan implements DataTableScan {
private final CoreOptions options;
protected final SnapshotReader snapshotReader;
- protected AbstractInnerTableScan(CoreOptions options, SnapshotReader
snapshotReader) {
+ protected AbstractDataTableScan(CoreOptions options, SnapshotReader
snapshotReader) {
this.options = options;
this.snapshotReader = snapshotReader;
}
@VisibleForTesting
- public AbstractInnerTableScan withBucket(int bucket) {
+ public AbstractDataTableScan withBucket(int bucket) {
snapshotReader.withBucket(bucket);
return this;
}
@Override
- public AbstractInnerTableScan withBucketFilter(Filter<Integer>
bucketFilter) {
+ public AbstractDataTableScan withBucketFilter(Filter<Integer>
bucketFilter) {
snapshotReader.withBucketFilter(bucketFilter);
return this;
}
@Override
- public AbstractInnerTableScan withPartitionFilter(Map<String, String>
partitionSpec) {
+ public AbstractDataTableScan withPartitionFilter(Map<String, String>
partitionSpec) {
snapshotReader.withPartitionFilter(partitionSpec);
return this;
}
@Override
- public AbstractInnerTableScan withLevelFilter(Filter<Integer> levelFilter)
{
+ public AbstractDataTableScan withLevelFilter(Filter<Integer> levelFilter) {
snapshotReader.withLevelFilter(levelFilter);
return this;
}
- public AbstractInnerTableScan withMetricsRegistry(MetricRegistry
metricsRegistry) {
+ public AbstractDataTableScan withMetricsRegistry(MetricRegistry
metricsRegistry) {
snapshotReader.withMetricRegistry(metricsRegistry);
return this;
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
similarity index 89%
rename from paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java
rename to paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
index b307279d0..92652aab3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
@@ -21,6 +21,7 @@ package org.apache.paimon.table.source;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.operation.DefaultValueAssigner;
import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.table.source.snapshot.StartingScanner;
import org.apache.paimon.table.source.snapshot.StartingScanner.ScannedResult;
@@ -31,8 +32,9 @@ import java.util.List;
import static org.apache.paimon.CoreOptions.MergeEngine.FIRST_ROW;
/** {@link TableScan} implementation for batch planning. */
-public class InnerTableScanImpl extends AbstractInnerTableScan {
+public class DataTableBatchScan extends AbstractDataTableScan {
+ private final BucketMode bucketMode;
private final DefaultValueAssigner defaultValueAssigner;
private StartingScanner startingScanner;
@@ -40,12 +42,14 @@ public class InnerTableScanImpl extends AbstractInnerTableScan {
private Integer pushDownLimit;
- public InnerTableScanImpl(
+ public DataTableBatchScan(
+ BucketMode bucketMode,
boolean pkTable,
CoreOptions options,
SnapshotReader snapshotReader,
DefaultValueAssigner defaultValueAssigner) {
super(options, snapshotReader);
+ this.bucketMode = bucketMode;
this.hasNext = true;
this.defaultValueAssigner = defaultValueAssigner;
if (pkTable && (options.deletionVectorsEnabled() ||
options.mergeEngine() == FIRST_ROW)) {
@@ -118,4 +122,10 @@ public class InnerTableScanImpl extends AbstractInnerTableScan {
return 0L;
}
}
+
+ @Override
+ public DataTableScan withShard(int indexOfThisSubtask, int
numberOfParallelSubtasks) {
+ snapshotReader.withShard(bucketMode, indexOfThisSubtask,
numberOfParallelSubtasks);
+ return this;
+ }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableScan.java
similarity index 66%
copy from paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScan.java
copy to paimon-core/src/main/java/org/apache/paimon/table/source/DataTableScan.java
index b395cc9a8..29c825b0b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableScan.java
@@ -18,15 +18,9 @@
package org.apache.paimon.table.source;
-import org.apache.paimon.table.source.snapshot.StartingContext;
+/** Table scan for data table. */
+public interface DataTableScan extends InnerTableScan {
-import javax.annotation.Nullable;
-
-/** Streaming {@link InnerTableScan} with {@link StreamTableScan}. */
-public interface InnerStreamTableScan extends InnerTableScan, StreamTableScan {
-
- StartingContext startingContext();
-
- /** Restore from checkpoint next snapshot id with scan kind. */
- void restore(@Nullable Long nextSnapshotId, boolean scanAllSnapshot);
+ /** Specify the shard to be read, and allocate sharded files to read
records. */
+ DataTableScan withShard(int indexOfThisSubtask, int
numberOfParallelSubtasks);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
similarity index 94%
rename from paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
rename to paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
index 3e03dc0a1..8aeab36ff 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
@@ -24,6 +24,7 @@ import org.apache.paimon.consumer.Consumer;
import org.apache.paimon.lookup.LookupStrategy;
import org.apache.paimon.operation.DefaultValueAssigner;
import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.source.snapshot.AllDeltaFollowUpScanner;
import org.apache.paimon.table.source.snapshot.BoundedChecker;
import
org.apache.paimon.table.source.snapshot.CompactionChangelogFollowUpScanner;
@@ -48,11 +49,11 @@ import javax.annotation.Nullable;
import static org.apache.paimon.CoreOptions.ChangelogProducer.FULL_COMPACTION;
/** {@link StreamTableScan} implementation for streaming planning. */
-public class InnerStreamTableScanImpl extends AbstractInnerTableScan
- implements InnerStreamTableScan {
+public class DataTableStreamScan extends AbstractDataTableScan implements
StreamDataTableScan {
- private static final Logger LOG =
LoggerFactory.getLogger(InnerStreamTableScanImpl.class);
+ private static final Logger LOG =
LoggerFactory.getLogger(DataTableStreamScan.class);
+ private final BucketMode bucketMode;
private final CoreOptions options;
private final SnapshotManager snapshotManager;
private final boolean supportStreamingReadOverwrite;
@@ -68,13 +69,15 @@ public class InnerStreamTableScanImpl extends AbstractInnerTableScan
@Nullable private Long currentWatermark;
@Nullable private Long nextSnapshotId;
- public InnerStreamTableScanImpl(
+ public DataTableStreamScan(
+ BucketMode bucketMode,
CoreOptions options,
SnapshotReader snapshotReader,
SnapshotManager snapshotManager,
boolean supportStreamingReadOverwrite,
DefaultValueAssigner defaultValueAssigner) {
super(options, snapshotReader);
+ this.bucketMode = bucketMode;
this.options = options;
this.snapshotManager = snapshotManager;
this.supportStreamingReadOverwrite = supportStreamingReadOverwrite;
@@ -87,7 +90,7 @@ public class InnerStreamTableScanImpl extends AbstractInnerTableScan
}
@Override
- public InnerStreamTableScanImpl withFilter(Predicate predicate) {
+ public DataTableStreamScan withFilter(Predicate predicate) {
snapshotReader.withFilter(defaultValueAssigner.handlePredicate(predicate));
return this;
}
@@ -279,4 +282,10 @@ public class InnerStreamTableScanImpl extends AbstractInnerTableScan
snapshotReader.consumerManager().resetConsumer(consumerId, new
Consumer(nextSnapshot));
}
}
+
+ @Override
+ public DataTableScan withShard(int indexOfThisSubtask, int
numberOfParallelSubtasks) {
+ snapshotReader.withShard(bucketMode, indexOfThisSubtask,
numberOfParallelSubtasks);
+ return this;
+ }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java
index fc7e41569..d0b03c966 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java
@@ -122,6 +122,9 @@ public interface ReadBuilder extends Serializable {
/** the row number pushed down. */
ReadBuilder withLimit(int limit);
+ /** Specify the shard to be read, and allocate sharded files to read
records. */
+ ReadBuilder withShard(int indexOfThisSubtask, int
numberOfParallelSubtasks);
+
/** Create a {@link TableScan} to perform batch planning. */
TableScan newScan();
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
index 1ceb614ec..8813f9b3f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
@@ -41,6 +41,9 @@ public class ReadBuilderImpl implements ReadBuilder {
private Integer limit = null;
+ private Integer shardIndexOfThisSubtask;
+ private Integer shardNumberOfParallelSubtasks;
+
private Map<String, String> partitionSpec;
public ReadBuilderImpl(InnerTable table) {
@@ -88,10 +91,16 @@ public class ReadBuilderImpl implements ReadBuilder {
return this;
}
+ @Override
+ public ReadBuilder withShard(int indexOfThisSubtask, int
numberOfParallelSubtasks) {
+ this.shardIndexOfThisSubtask = indexOfThisSubtask;
+ this.shardNumberOfParallelSubtasks = numberOfParallelSubtasks;
+ return this;
+ }
+
@Override
public TableScan newScan() {
- InnerTableScan tableScan =
-
table.newScan().withFilter(filter).withPartitionFilter(partitionSpec);
+ InnerTableScan tableScan = configureScan(table.newScan());
if (limit != null) {
tableScan.withLimit(limit);
}
@@ -100,7 +109,22 @@ public class ReadBuilderImpl implements ReadBuilder {
@Override
public StreamTableScan newStreamScan() {
- return (StreamTableScan) table.newStreamScan().withFilter(filter);
+ return (StreamTableScan) configureScan(table.newStreamScan());
+ }
+
+ private InnerTableScan configureScan(InnerTableScan scan) {
+ scan.withFilter(filter).withPartitionFilter(partitionSpec);
+
+ if (shardIndexOfThisSubtask != null) {
+ if (scan instanceof DataTableScan) {
+ ((DataTableScan) scan)
+ .withShard(shardIndexOfThisSubtask,
shardNumberOfParallelSubtasks);
+ } else {
+ throw new UnsupportedOperationException(
+ "Unsupported table scan type for shard configuring,
the scan is: " + scan);
+ }
+ }
+ return scan;
}
@Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/StreamDataTableScan.java
similarity index 93%
rename from paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScan.java
rename to paimon-core/src/main/java/org/apache/paimon/table/source/StreamDataTableScan.java
index b395cc9a8..f5e6e3a0a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/StreamDataTableScan.java
@@ -23,7 +23,7 @@ import org.apache.paimon.table.source.snapshot.StartingContext;
import javax.annotation.Nullable;
/** Streaming {@link InnerTableScan} with {@link StreamTableScan}. */
-public interface InnerStreamTableScan extends InnerTableScan, StreamTableScan {
+public interface StreamDataTableScan extends DataTableScan, StreamTableScan {
StartingContext startingContext();
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
index ac84f4f16..26840ade7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
@@ -24,6 +24,7 @@ import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.table.source.Split;
@@ -66,6 +67,21 @@ public interface SnapshotReader {
SnapshotReader withBucketFilter(Filter<Integer> bucketFilter);
+ SnapshotReader withDataFileNameFilter(Filter<String> fileNameFilter);
+
+ default SnapshotReader withShard(
+ BucketMode bucketMode, int indexOfThisSubtask, int
numberOfParallelSubtasks) {
+ if (bucketMode == BucketMode.BUCKET_UNAWARE) {
+ withDataFileNameFilter(
+ file ->
+ Math.abs(file.hashCode() %
numberOfParallelSubtasks)
+ == indexOfThisSubtask);
+ } else {
+ withBucketFilter(bucket -> bucket % numberOfParallelSubtasks ==
indexOfThisSubtask);
+ }
+ return this;
+ }
+
SnapshotReader withMetricRegistry(MetricRegistry registry);
/** Get splits plan from snapshot. */
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index be91096ab..cc793c641 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -221,6 +221,12 @@ public class SnapshotReaderImpl implements SnapshotReader {
return this;
}
+ @Override
+ public SnapshotReader withDataFileNameFilter(Filter<String>
fileNameFilter) {
+ scan.withDataFileNameFilter(fileNameFilter);
+ return this;
+ }
+
/** Get splits from {@link FileKind#ADD} files. */
@Override
public Plan read() {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index 7ff7f936e..196b2fa4c 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -38,12 +38,13 @@ import org.apache.paimon.table.DataTable;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.ReadonlyTable;
import org.apache.paimon.table.Table;
-import org.apache.paimon.table.source.InnerStreamTableScan;
+import org.apache.paimon.table.source.DataTableScan;
import org.apache.paimon.table.source.InnerTableRead;
import org.apache.paimon.table.source.InnerTableScan;
import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.SplitGenerator;
+import org.apache.paimon.table.source.StreamDataTableScan;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.table.source.snapshot.StartingContext;
@@ -133,12 +134,12 @@ public class AuditLogTable implements DataTable,
ReadonlyTable {
}
@Override
- public InnerTableScan newScan() {
+ public DataTableScan newScan() {
return new AuditLogBatchScan(dataTable.newScan());
}
@Override
- public InnerStreamTableScan newStreamScan() {
+ public StreamDataTableScan newStreamScan() {
return new AuditLogStreamScan(dataTable.newStreamScan());
}
@@ -275,6 +276,12 @@ public class AuditLogTable implements DataTable,
ReadonlyTable {
return this;
}
+ @Override
+ public SnapshotReader withDataFileNameFilter(Filter<String>
fileNameFilter) {
+ snapshotReader.withDataFileNameFilter(fileNameFilter);
+ return this;
+ }
+
@Override
public SnapshotReader withMetricRegistry(MetricRegistry registry) {
snapshotReader.withMetricRegistry(registry);
@@ -307,11 +314,11 @@ public class AuditLogTable implements DataTable,
ReadonlyTable {
}
}
- private class AuditLogBatchScan implements InnerTableScan {
+ private class AuditLogBatchScan implements DataTableScan {
- private final InnerTableScan batchScan;
+ private final DataTableScan batchScan;
- private AuditLogBatchScan(InnerTableScan batchScan) {
+ private AuditLogBatchScan(DataTableScan batchScan) {
this.batchScan = batchScan;
}
@@ -360,18 +367,24 @@ public class AuditLogTable implements DataTable,
ReadonlyTable {
public List<BinaryRow> listPartitions() {
return batchScan.listPartitions();
}
+
+ @Override
+ public DataTableScan withShard(int indexOfThisSubtask, int
numberOfParallelSubtasks) {
+ batchScan.withShard(indexOfThisSubtask, numberOfParallelSubtasks);
+ return this;
+ }
}
- private class AuditLogStreamScan implements InnerStreamTableScan {
+ private class AuditLogStreamScan implements StreamDataTableScan {
- private final InnerStreamTableScan streamScan;
+ private final StreamDataTableScan streamScan;
- private AuditLogStreamScan(InnerStreamTableScan streamScan) {
+ private AuditLogStreamScan(StreamDataTableScan streamScan) {
this.streamScan = streamScan;
}
@Override
- public InnerStreamTableScan withFilter(Predicate predicate) {
+ public StreamDataTableScan withFilter(Predicate predicate) {
convert(predicate).ifPresent(streamScan::withFilter);
return this;
}
@@ -419,10 +432,16 @@ public class AuditLogTable implements DataTable,
ReadonlyTable {
}
@Override
- public InnerStreamTableScan withMetricsRegistry(MetricRegistry
metricsRegistry) {
+ public StreamDataTableScan withMetricsRegistry(MetricRegistry
metricsRegistry) {
streamScan.withMetricsRegistry(metricsRegistry);
return this;
}
+
+ @Override
+ public DataTableScan withShard(int indexOfThisSubtask, int
numberOfParallelSubtasks) {
+ streamScan.withShard(indexOfThisSubtask, numberOfParallelSubtasks);
+ return this;
+ }
}
private class AuditLogRead implements InnerTableRead {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java
index 4c9b9a601..7712b1ca2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java
@@ -33,10 +33,10 @@ import org.apache.paimon.table.DataTable;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.ReadonlyTable;
import org.apache.paimon.table.source.DataSplit;
-import org.apache.paimon.table.source.InnerStreamTableScan;
+import org.apache.paimon.table.source.DataTableScan;
import org.apache.paimon.table.source.InnerTableRead;
-import org.apache.paimon.table.source.InnerTableScan;
import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.StreamDataTableScan;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.types.BigIntType;
@@ -149,12 +149,12 @@ public class BucketsTable implements DataTable,
ReadonlyTable {
}
@Override
- public InnerTableScan newScan() {
+ public DataTableScan newScan() {
return wrapped.newScan();
}
@Override
- public InnerStreamTableScan newStreamScan() {
+ public StreamDataTableScan newStreamScan() {
return wrapped.newStreamScan();
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java
index 7825b93e4..58f4bc56a 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java
@@ -34,10 +34,10 @@ import org.apache.paimon.table.DataTable;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.ReadonlyTable;
import org.apache.paimon.table.source.DataSplit;
-import org.apache.paimon.table.source.InnerStreamTableScan;
+import org.apache.paimon.table.source.DataTableScan;
import org.apache.paimon.table.source.InnerTableRead;
-import org.apache.paimon.table.source.InnerTableScan;
import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.StreamDataTableScan;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.types.BigIntType;
@@ -136,12 +136,12 @@ public class FileMonitorTable implements DataTable,
ReadonlyTable {
}
@Override
- public InnerTableScan newScan() {
+ public DataTableScan newScan() {
return wrapped.newScan();
}
@Override
- public InnerStreamTableScan newStreamScan() {
+ public StreamDataTableScan newStreamScan() {
return wrapped.newStreamScan();
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
index 0b0b0e586..c88e2c66c 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
@@ -26,11 +26,10 @@ import org.apache.paimon.table.DataTable;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.ReadonlyTable;
import org.apache.paimon.table.Table;
-import org.apache.paimon.table.source.InnerStreamTableScan;
-import org.apache.paimon.table.source.InnerStreamTableScanImpl;
+import org.apache.paimon.table.source.DataTableBatchScan;
+import org.apache.paimon.table.source.DataTableStreamScan;
import org.apache.paimon.table.source.InnerTableRead;
-import org.apache.paimon.table.source.InnerTableScan;
-import org.apache.paimon.table.source.InnerTableScanImpl;
+import org.apache.paimon.table.source.StreamDataTableScan;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.BranchManager;
@@ -98,8 +97,9 @@ public class ReadOptimizedTable implements DataTable,
ReadonlyTable {
}
@Override
- public InnerTableScan newScan() {
- return new InnerTableScanImpl(
+ public DataTableBatchScan newScan() {
+ return new DataTableBatchScan(
+ dataTable.bucketMode(),
dataTable.schema().primaryKeys().size() > 0,
coreOptions(),
newSnapshotReader(),
@@ -107,8 +107,9 @@ public class ReadOptimizedTable implements DataTable,
ReadonlyTable {
}
@Override
- public InnerStreamTableScan newStreamScan() {
- return new InnerStreamTableScanImpl(
+ public StreamDataTableScan newStreamScan() {
+ return new DataTableStreamScan(
+ dataTable.bucketMode(),
coreOptions(),
newSnapshotReader(),
snapshotManager(),
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java
index c5bd22dc5..129f928c9 100644
---
a/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java
@@ -42,8 +42,8 @@ import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.table.sink.TableWriteImpl;
-import org.apache.paimon.table.source.InnerStreamTableScan;
import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.StreamDataTableScan;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowKind;
@@ -267,7 +267,7 @@ public class OrphanFilesCleanTest {
changelogData.keySet().stream()
.max(Comparator.comparingLong(Long::longValue))
.get();
- InnerStreamTableScan scan = scanTable.newStreamScan();
+ StreamDataTableScan scan = scanTable.newStreamScan();
TreeMap<Long, List<InternalRow>> data = new TreeMap<>(changelogData);
// clear the data < the smallest changelog data.
data.headMap(changelogs.get(0).id()).clear();
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
index b569a76a8..5eb0b5db6 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
@@ -491,6 +491,23 @@ public class AppendOnlyFileStoreTableTest extends
FileStoreTableTestBase {
reader.forEachRemaining(row ->
assertThat(row.getString(1).toString()).isEqualTo("b"));
}
+ @Test
+ public void testWithShardAppendTable() throws Exception {
+ FileStoreTable table = createFileStoreTable(conf -> conf.set(BUCKET,
-1));
+ innerTestWithShard(table);
+ }
+
+ @Test
+ public void testWithShardBucketedTable() throws Exception {
+ FileStoreTable table =
+ createFileStoreTable(
+ conf -> {
+ conf.set(BUCKET, 5);
+ conf.set(BUCKET_KEY, "a");
+ });
+ innerTestWithShard(table);
+ }
+
@Test
public void testBloomFilterForMapField() throws Exception {
RowType rowType =
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
index b30dab420..d10c9e8fa 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
@@ -386,6 +386,35 @@ public abstract class FileStoreTableTestBase {
assertThat(((DataSplit) splits.get(0)).bucket()).isEqualTo(1);
}
+ protected void innerTestWithShard(FileStoreTable table) throws Exception {
+ StreamTableWrite write = table.newWrite(commitUser);
+ write.write(rowData(1, 1, 2L));
+ write.write(rowData(1, 3, 4L));
+ write.write(rowData(1, 5, 6L));
+ write.write(rowData(1, 7, 8L));
+ write.write(rowData(1, 9, 10L));
+ TableCommitImpl commit = table.newCommit(commitUser);
+ commit.commit(0, write.prepareCommit(true, 0));
+ write.close();
+ commit.close();
+
+ ReadBuilder readBuilder = table.newReadBuilder();
+
+ List<Split> splits = new ArrayList<>();
+ splits.addAll(readBuilder.withShard(0, 3).newScan().plan().splits());
+ splits.addAll(readBuilder.withShard(1, 3).newScan().plan().splits());
+ splits.addAll(readBuilder.withShard(2, 3).newScan().plan().splits());
+
+ assertThat(getResult(readBuilder.newRead(), splits,
BATCH_ROW_TO_STRING))
+ .hasSameElementsAs(
+ Arrays.asList(
+
"1|3|4|binary|varbinary|mapKey:mapVal|multiset",
+
"1|9|10|binary|varbinary|mapKey:mapVal|multiset",
+
"1|1|2|binary|varbinary|mapKey:mapVal|multiset",
+
"1|5|6|binary|varbinary|mapKey:mapVal|multiset",
+
"1|7|8|binary|varbinary|mapKey:mapVal|multiset"));
+ }
+
@Test
public void testAbort() throws Exception {
FileStoreTable table = createFileStoreTable(conf -> conf.set(BUCKET,
1));
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
index e4a6b1d3f..a952e032f 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
@@ -782,6 +782,12 @@ public class PrimaryKeyFileStoreTableTest extends
FileStoreTableTestBase {
"1|10|200|binary|varbinary|mapKey:mapVal|multiset"));
}
+ @Test
+ public void testWithShard() throws Exception {
+ FileStoreTable table = createFileStoreTable(conf -> conf.set(BUCKET,
5));
+ innerTestWithShard(table);
+ }
+
@Test
public void testSlowCommit() throws Exception {
FileStoreTable table = createFileStoreTable();
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/DefaultValueScannerTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/DefaultValueScannerTest.java
index ac2e21118..a99d6b234 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/DefaultValueScannerTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/DefaultValueScannerTest.java
@@ -25,7 +25,7 @@ import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.table.sink.StreamTableWrite;
-import org.apache.paimon.table.source.InnerStreamTableScan;
+import org.apache.paimon.table.source.StreamDataTableScan;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.table.source.TableScan;
@@ -43,7 +43,7 @@ public class DefaultValueScannerTest extends ScannerTestBase {
TableRead read = table.newRead();
StreamTableWrite write = table.newWrite(commitUser);
StreamTableCommit commit = table.newCommit(commitUser);
- InnerStreamTableScan scan = table.newStreamScan();
+ StreamDataTableScan scan = table.newStreamScan();
write.write(rowData(1, 10, 101L));
commit.commit(0, write.prepareCommit(true, 0));
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
index 83470fb78..ca61ab4eb 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
@@ -21,8 +21,8 @@ package org.apache.paimon.flink.source;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.metrics.FlinkMetricRegistry;
import org.apache.paimon.table.BucketMode;
-import org.apache.paimon.table.source.InnerStreamTableScan;
import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.StreamDataTableScan;
import org.apache.paimon.table.source.StreamTableScan;
import org.apache.flink.api.connector.source.Boundedness;
@@ -77,7 +77,7 @@ public class ContinuousFileStoreSource extends FlinkSource {
}
StreamTableScan scan = readBuilder.newStreamScan();
if (metricGroup(context) != null) {
- ((InnerStreamTableScan) scan)
+ ((StreamDataTableScan) scan)
.withMetricsRegistry(new
FlinkMetricRegistry(context.metricGroup()));
}
scan.restore(nextSnapshotId);
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/LogHybridSourceFactory.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/LogHybridSourceFactory.java
index 7f6c7ac0d..90c283bf8 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/LogHybridSourceFactory.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/LogHybridSourceFactory.java
@@ -26,8 +26,8 @@ import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.DataTable;
import org.apache.paimon.table.Table;
-import org.apache.paimon.table.source.InnerStreamTableScan;
import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.StreamDataTableScan;
import org.apache.paimon.table.source.StreamTableScan;
import org.apache.paimon.utils.SnapshotManager;
@@ -117,7 +117,7 @@ public class LogHybridSourceFactory
StreamTableScan scan = readBuilder.newStreamScan();
// register scan metrics
if (context.metricGroup() != null) {
- ((InnerStreamTableScan) scan)
+ ((StreamDataTableScan) scan)
.withMetricsRegistry(new
FlinkMetricRegistry(context.metricGroup()));
}
splits = splitGenerator.createSplits(scan.plan());
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/StreamHelper.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/StreamHelper.scala
index dbce04719..91df04e6d 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/StreamHelper.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/StreamHelper.scala
@@ -22,7 +22,7 @@ import org.apache.paimon.CoreOptions
import org.apache.paimon.data.BinaryRow
import org.apache.paimon.spark.SparkTypeUtils
import org.apache.paimon.table.DataTable
-import org.apache.paimon.table.source.{DataSplit, InnerStreamTableScan}
+import org.apache.paimon.table.source.{DataSplit, StreamDataTableScan}
import org.apache.paimon.table.source.TableScan.Plan
import org.apache.paimon.table.source.snapshot.StartingContext
import org.apache.paimon.utils.{InternalRowPartitionComputer, TypeUtils}
@@ -44,7 +44,7 @@ private[spark] trait StreamHelper {
var lastTriggerMillis: Long
- private lazy val streamScan: InnerStreamTableScan = table.newStreamScan()
+ private lazy val streamScan: StreamDataTableScan = table.newStreamScan()
private lazy val partitionSchema: StructType =
SparkTypeUtils.fromPaimonRowType(TypeUtils.project(table.rowType(),
table.partitionKeys()))