This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 44269407b [core] Refactor StreamTableScan interface to spark streaming
read (#1905)
44269407b is described below
commit 44269407b62eee418a1fff1ca7706428ef504c29
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Aug 29 10:58:42 2023 +0800
[core] Refactor StreamTableScan interface to spark streaming read (#1905)
---
.../paimon/operation/AbstractFileStoreScan.java | 16 ++--
.../org/apache/paimon/operation/FileStoreScan.java | 5 +-
.../apache/paimon/table/source/DataFilePlan.java | 9 +--
.../table/source/InnerStreamTableScanImpl.java | 22 +++--
.../paimon/table/source/InnerTableScanImpl.java | 2 +-
.../{SnapshotNotExistPlan.java => RichPlan.java} | 35 +++++---
.../ScanKind.java => table/source/ScanMode.java} | 16 +++-
.../paimon/table/source/SnapshotNotExistPlan.java | 21 ++++-
.../paimon/table/source/StreamTableScan.java | 8 +-
.../source/snapshot/CompactedStartingScanner.java | 4 +-
.../CompactionChangelogFollowUpScanner.java | 4 +-
.../ContinuousAppendAndCompactFollowUpScanner.java | 4 +-
.../ContinuousCompactorFollowUpScanner.java | 4 +-
.../ContinuousFromSnapshotFullStartingScanner.java | 4 +-
.../source/snapshot/DeltaFollowUpScanner.java | 4 +-
.../table/source/snapshot/FullStartingScanner.java | 4 +-
.../snapshot/IncrementalStartingScanner.java | 29 ++++++-
.../snapshot/InputChangelogFollowUpScanner.java | 4 +-
.../table/source/snapshot/SnapshotReader.java | 20 +----
.../table/source/snapshot/SnapshotReaderImpl.java | 27 +++++--
.../table/source/snapshot/StartingScanner.java | 29 +++----
.../StaticFromSnapshotStartingScanner.java | 4 +-
.../snapshot/StaticFromTagStartingScanner.java | 4 +-
.../StaticFromTimestampStartingScanner.java | 4 +-
.../apache/paimon/table/system/AuditLogTable.java | 18 ++---
.../test/java/org/apache/paimon/TestFileStore.java | 6 +-
.../paimon/table/AppendOnlyFileStoreTableTest.java | 6 +-
.../ChangelogValueCountFileStoreTableTest.java | 10 +--
.../table/ChangelogWithKeyFileDataTableTest.java | 10 +--
.../table/ChangelogWithKeyFileStoreTableTest.java | 18 ++---
.../paimon/table/FileDataFilterTestBase.java | 14 ++--
.../paimon/table/source/StartupModeTest.java | 37 +++++----
.../flink/source/operator/MonitorFunction.java | 6 +-
.../source/ContinuousFileSplitEnumeratorTest.java | 93 ++++++++++++++--------
.../source/FileStoreSourceSplitGeneratorTest.java | 6 ++
35 files changed, 302 insertions(+), 205 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index 45f6bd153..2ea41bd6b 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -32,6 +32,7 @@ import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.stats.FieldStatsArraySerializer;
+import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.Filter;
@@ -72,7 +73,7 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
private Snapshot specifiedSnapshot = null;
private Filter<Integer> bucketFilter = null;
private List<ManifestFileMeta> specifiedManifests = null;
- private ScanKind scanKind = ScanKind.ALL;
+ private ScanMode scanMode = ScanMode.ALL;
private Filter<Integer> levelFilter = null;
private ManifestCacheFilter manifestCacheFilter = null;
@@ -169,8 +170,8 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
}
@Override
- public FileStoreScan withKind(ScanKind scanKind) {
- this.scanKind = scanKind;
+ public FileStoreScan withKind(ScanMode scanMode) {
+ this.scanMode = scanMode;
return this;
}
@@ -207,6 +208,11 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
return readSnapshot == null ? null : readSnapshot.id();
}
+ @Override
+ public ScanMode scanMode() {
+ return scanMode;
+ }
+
@Override
public List<ManifestEntry> files() {
return files;
@@ -274,7 +280,7 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
}
private List<ManifestFileMeta> readManifests(Snapshot snapshot) {
- switch (scanKind) {
+ switch (scanMode) {
case ALL:
return snapshot.dataManifests(manifestList);
case DELTA:
@@ -294,7 +300,7 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
"Incremental scan does not accept %s snapshot",
snapshot.commitKind()));
default:
- throw new UnsupportedOperationException("Unknown scan kind " +
scanKind.name());
+ throw new UnsupportedOperationException("Unknown scan kind " +
scanMode.name());
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
index 441b6eef7..9fce20710 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
@@ -26,6 +26,7 @@ import org.apache.paimon.manifest.ManifestCacheFilter;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.utils.Filter;
import javax.annotation.Nullable;
@@ -55,7 +56,7 @@ public interface FileStoreScan {
FileStoreScan withManifestList(List<ManifestFileMeta> manifests);
- FileStoreScan withKind(ScanKind scanKind);
+ FileStoreScan withKind(ScanMode scanMode);
FileStoreScan withLevelFilter(Filter<Integer> levelFilter);
@@ -77,6 +78,8 @@ public interface FileStoreScan {
@Nullable
Long snapshotId();
+ ScanMode scanMode();
+
/** Result {@link ManifestEntry} files. */
List<ManifestEntry> files();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataFilePlan.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataFilePlan.java
index 6c113f2df..d1355f1bb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataFilePlan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataFilePlan.java
@@ -38,10 +38,9 @@ public class DataFilePlan implements TableScan.Plan {
return new ArrayList<>(splits);
}
- public static DataFilePlan fromResult(StartingScanner.Result result) {
- return new DataFilePlan(
- result instanceof StartingScanner.ScannedResult
- ? ((StartingScanner.ScannedResult) result).splits()
- : Collections.emptyList());
+ public static TableScan.Plan fromResult(StartingScanner.Result result) {
+ return result instanceof StartingScanner.ScannedResult
+ ? ((StartingScanner.ScannedResult) result).plan()
+ : new DataFilePlan(Collections.emptyList());
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
index 3cf3d9b10..eb8566b65 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
@@ -55,7 +55,6 @@ public class InnerStreamTableScanImpl extends
AbstractInnerTableScan
private FollowUpScanner followUpScanner;
private BoundedChecker boundedChecker;
private boolean isFullPhaseEnd = false;
- @Nullable private Long currentWatermark;
@Nullable private Long nextSnapshotId;
public InnerStreamTableScanImpl(
@@ -78,7 +77,7 @@ public class InnerStreamTableScanImpl extends
AbstractInnerTableScan
}
@Override
- public Plan plan() {
+ public RichPlan plan() {
if (startingScanner == null) {
startingScanner = createStartingScanner(true);
}
@@ -96,16 +95,15 @@ public class InnerStreamTableScanImpl extends
AbstractInnerTableScan
}
}
- private Plan tryFirstPlan() {
+ private RichPlan tryFirstPlan() {
StartingScanner.Result result = startingScanner.scan(snapshotManager,
snapshotReader);
if (result instanceof ScannedResult) {
ScannedResult scannedResult = (ScannedResult) result;
- currentWatermark = scannedResult.currentWatermark();
long currentSnapshotId = scannedResult.currentSnapshotId();
nextSnapshotId = currentSnapshotId + 1;
isFullPhaseEnd =
boundedChecker.shouldEndInput(snapshotManager.snapshot(currentSnapshotId));
- return DataFilePlan.fromResult(result);
+ return scannedResult.plan();
} else if (result instanceof StartingScanner.NextSnapshot) {
nextSnapshotId = ((StartingScanner.NextSnapshot)
result).nextSnapshotId();
isFullPhaseEnd =
@@ -116,7 +114,7 @@ public class InnerStreamTableScanImpl extends
AbstractInnerTableScan
return SnapshotNotExistPlan.INSTANCE;
}
- private Plan nextPlan() {
+ private RichPlan nextPlan() {
while (true) {
if (isFullPhaseEnd) {
throw new EndOfScanException();
@@ -150,13 +148,11 @@ public class InnerStreamTableScanImpl extends
AbstractInnerTableScan
LOG.debug("Find overwrite snapshot id {}.", nextSnapshotId);
SnapshotReader.Plan overwritePlan =
followUpScanner.getOverwriteChangesPlan(nextSnapshotId, snapshotReader);
- currentWatermark = overwritePlan.watermark();
nextSnapshotId++;
return overwritePlan;
} else if (followUpScanner.shouldScanSnapshot(snapshot)) {
LOG.debug("Find snapshot id {}.", nextSnapshotId);
SnapshotReader.Plan plan =
followUpScanner.scan(nextSnapshotId, snapshotReader);
- currentWatermark = plan.watermark();
nextSnapshotId++;
return plan;
} else {
@@ -218,15 +214,15 @@ public class InnerStreamTableScanImpl extends
AbstractInnerTableScan
return nextSnapshotId;
}
- @Nullable
@Override
- public Long watermark() {
- return currentWatermark;
+ public void restore(@Nullable Long nextSnapshotId) {
+ this.nextSnapshotId = nextSnapshotId;
}
@Override
- public void restore(@Nullable Long nextSnapshotId) {
- this.nextSnapshotId = nextSnapshotId;
+ public void restore(@Nullable Long nextSnapshotId, ScanMode scanMode) {
+ restore(nextSnapshotId);
+ snapshotReader.withMode(scanMode);
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java
index ea19a1ab9..dc01bd429 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java
@@ -52,7 +52,7 @@ public class InnerTableScanImpl extends
AbstractInnerTableScan {
}
@Override
- public DataFilePlan plan() {
+ public TableScan.Plan plan() {
if (startingScanner == null) {
startingScanner = createStartingScanner(false);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/SnapshotNotExistPlan.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/RichPlan.java
similarity index 63%
copy from
paimon-core/src/main/java/org/apache/paimon/table/source/SnapshotNotExistPlan.java
copy to paimon-core/src/main/java/org/apache/paimon/table/source/RichPlan.java
index df58c8067..e592f4648 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/SnapshotNotExistPlan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/RichPlan.java
@@ -18,19 +18,30 @@
package org.apache.paimon.table.source;
-import java.util.Collections;
-import java.util.List;
+import org.apache.paimon.annotation.Public;
-/** This is used to distinguish the case where the snapshot does not exist and
the plan is empty. */
-public class SnapshotNotExistPlan implements TableScan.Plan {
- public static final SnapshotNotExistPlan INSTANCE = new
SnapshotNotExistPlan();
+import javax.annotation.Nullable;
- private SnapshotNotExistPlan() {
- // private
- }
+/**
+ * Rich Plan of scan.
+ *
+ * @since 0.6.0
+ */
+@Public
+public interface RichPlan extends TableScan.Plan {
+
+ /** Current watermark for consumed snapshot. */
+ @Nullable
+ Long watermark();
+
+ /**
+ * Snapshot id of this plan.
+ *
+ * @return null if the table is empty.
+ */
+ @Nullable
+ Long snapshotId();
- @Override
- public List<Split> splits() {
- return Collections.emptyList();
- }
+ /** Scan which part of the snapshot. */
+ ScanMode scanMode();
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/ScanKind.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/ScanMode.java
similarity index 84%
rename from paimon-core/src/main/java/org/apache/paimon/operation/ScanKind.java
rename to paimon-core/src/main/java/org/apache/paimon/table/source/ScanMode.java
index 7d0818aaa..10bc47c83 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/ScanKind.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/ScanMode.java
@@ -16,14 +16,24 @@
* limitations under the License.
*/
-package org.apache.paimon.operation;
+package org.apache.paimon.table.source;
+
+import org.apache.paimon.annotation.Public;
+
+/**
+ * Scan which part of the snapshot.
+ *
+ * @since 0.6.0
+ */
+@Public
+public enum ScanMode {
-/** Scan which part of the snapshot. */
-public enum ScanKind {
/** Scan complete data files of a snapshot. */
ALL,
+
/** Only scan newly changed files of a snapshot. */
DELTA,
+
/** Only scan changelog files of a snapshot. */
CHANGELOG
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/SnapshotNotExistPlan.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/SnapshotNotExistPlan.java
index df58c8067..358a6ce7e 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/SnapshotNotExistPlan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/SnapshotNotExistPlan.java
@@ -18,11 +18,13 @@
package org.apache.paimon.table.source;
+import javax.annotation.Nullable;
+
import java.util.Collections;
import java.util.List;
/** This is used to distinguish the case where the snapshot does not exist and
the plan is empty. */
-public class SnapshotNotExistPlan implements TableScan.Plan {
+public class SnapshotNotExistPlan implements RichPlan {
public static final SnapshotNotExistPlan INSTANCE = new
SnapshotNotExistPlan();
private SnapshotNotExistPlan() {
@@ -33,4 +35,21 @@ public class SnapshotNotExistPlan implements TableScan.Plan {
public List<Split> splits() {
return Collections.emptyList();
}
+
+ @Nullable
+ @Override
+ public Long watermark() {
+ return null;
+ }
+
+ @Nullable
+ @Override
+ public Long snapshotId() {
+ return null;
+ }
+
+ @Override
+ public ScanMode scanMode() {
+ throw new UnsupportedOperationException();
+ }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/StreamTableScan.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/StreamTableScan.java
index c0a900bcf..869937137 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/StreamTableScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/StreamTableScan.java
@@ -33,14 +33,16 @@ import javax.annotation.Nullable;
@Public
public interface StreamTableScan extends TableScan, Restorable<Long> {
- /** Current watermark for consumed snapshot. */
- @Nullable
- Long watermark();
+ @Override
+ RichPlan plan();
/** Restore from checkpoint next snapshot id. */
@Override
void restore(@Nullable Long nextSnapshotId);
+ /** Restore from checkpoint next snapshot id with scan kind. */
+ void restore(@Nullable Long nextSnapshotId, ScanMode scanMode);
+
/** Checkpoint to return next snapshot id. */
@Nullable
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/CompactedStartingScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/CompactedStartingScanner.java
index 0f89c5b42..a1d9d90a0 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/CompactedStartingScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/CompactedStartingScanner.java
@@ -20,7 +20,7 @@ package org.apache.paimon.table.source.snapshot;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
-import org.apache.paimon.operation.ScanKind;
+import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.utils.SnapshotManager;
import org.slf4j.Logger;
@@ -49,7 +49,7 @@ public class CompactedStartingScanner implements
StartingScanner {
}
return StartingScanner.fromPlan(
-
snapshotReader.withKind(ScanKind.ALL).withSnapshot(startingSnapshotId).read());
+
snapshotReader.withMode(ScanMode.ALL).withSnapshot(startingSnapshotId).read());
}
@Nullable
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/CompactionChangelogFollowUpScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/CompactionChangelogFollowUpScanner.java
index 05a23d1be..b74052822 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/CompactionChangelogFollowUpScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/CompactionChangelogFollowUpScanner.java
@@ -20,7 +20,7 @@ package org.apache.paimon.table.source.snapshot;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
-import org.apache.paimon.operation.ScanKind;
+import org.apache.paimon.table.source.ScanMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,6 +49,6 @@ public class CompactionChangelogFollowUpScanner implements
FollowUpScanner {
@Override
public SnapshotReader.Plan scan(long snapshotId, SnapshotReader
snapshotReader) {
- return
snapshotReader.withKind(ScanKind.CHANGELOG).withSnapshot(snapshotId).read();
+ return
snapshotReader.withMode(ScanMode.CHANGELOG).withSnapshot(snapshotId).read();
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousAppendAndCompactFollowUpScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousAppendAndCompactFollowUpScanner.java
index 7824c75f0..342015612 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousAppendAndCompactFollowUpScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousAppendAndCompactFollowUpScanner.java
@@ -19,7 +19,7 @@
package org.apache.paimon.table.source.snapshot;
import org.apache.paimon.Snapshot;
-import org.apache.paimon.operation.ScanKind;
+import org.apache.paimon.table.source.ScanMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,6 +49,6 @@ public class ContinuousAppendAndCompactFollowUpScanner
implements FollowUpScanne
@Override
public SnapshotReader.Plan scan(long snapshotId, SnapshotReader
snapshotReader) {
- return
snapshotReader.withKind(ScanKind.DELTA).withSnapshot(snapshotId).read();
+ return
snapshotReader.withMode(ScanMode.DELTA).withSnapshot(snapshotId).read();
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorFollowUpScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorFollowUpScanner.java
index caaf40ddf..7564d5372 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorFollowUpScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorFollowUpScanner.java
@@ -19,7 +19,7 @@
package org.apache.paimon.table.source.snapshot;
import org.apache.paimon.Snapshot;
-import org.apache.paimon.operation.ScanKind;
+import org.apache.paimon.table.source.ScanMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,6 +45,6 @@ public class ContinuousCompactorFollowUpScanner implements
FollowUpScanner {
@Override
public SnapshotReader.Plan scan(long snapshotId, SnapshotReader
snapshotReader) {
- return
snapshotReader.withKind(ScanKind.DELTA).withSnapshot(snapshotId).read();
+ return
snapshotReader.withMode(ScanMode.DELTA).withSnapshot(snapshotId).read();
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromSnapshotFullStartingScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromSnapshotFullStartingScanner.java
index d524b7883..0a1a2c195 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromSnapshotFullStartingScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromSnapshotFullStartingScanner.java
@@ -19,7 +19,7 @@
package org.apache.paimon.table.source.snapshot;
import org.apache.paimon.CoreOptions;
-import org.apache.paimon.operation.ScanKind;
+import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.utils.SnapshotManager;
/**
@@ -41,6 +41,6 @@ public class ContinuousFromSnapshotFullStartingScanner
implements StartingScanne
}
long ceiledSnapshotId = Math.max(snapshotId, earliestSnapshotId);
return StartingScanner.fromPlan(
-
snapshotReader.withKind(ScanKind.ALL).withSnapshot(ceiledSnapshotId).read());
+
snapshotReader.withMode(ScanMode.ALL).withSnapshot(ceiledSnapshotId).read());
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/DeltaFollowUpScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/DeltaFollowUpScanner.java
index edcbada93..48d8d03fd 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/DeltaFollowUpScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/DeltaFollowUpScanner.java
@@ -20,7 +20,7 @@ package org.apache.paimon.table.source.snapshot;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
-import org.apache.paimon.operation.ScanKind;
+import org.apache.paimon.table.source.ScanMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,6 +45,6 @@ public class DeltaFollowUpScanner implements FollowUpScanner {
@Override
public SnapshotReader.Plan scan(long snapshotId, SnapshotReader
snapshotReader) {
- return
snapshotReader.withKind(ScanKind.DELTA).withSnapshot(snapshotId).read();
+ return
snapshotReader.withMode(ScanMode.DELTA).withSnapshot(snapshotId).read();
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullStartingScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullStartingScanner.java
index b331f2a21..a03a707be 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullStartingScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullStartingScanner.java
@@ -19,7 +19,7 @@
package org.apache.paimon.table.source.snapshot;
import org.apache.paimon.CoreOptions;
-import org.apache.paimon.operation.ScanKind;
+import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.utils.SnapshotManager;
import org.slf4j.Logger;
@@ -38,6 +38,6 @@ public class FullStartingScanner implements StartingScanner {
return new NoSnapshot();
}
return StartingScanner.fromPlan(
-
snapshotReader.withKind(ScanKind.ALL).withSnapshot(startingSnapshotId).read());
+
snapshotReader.withMode(ScanMode.ALL).withSnapshot(startingSnapshotId).read());
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
index e3ba10691..fbdf8a858 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
@@ -22,8 +22,9 @@ import org.apache.paimon.Snapshot;
import org.apache.paimon.Snapshot.CommitKind;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.io.DataFileMeta;
-import org.apache.paimon.operation.ScanKind;
import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.ScanMode;
+import org.apache.paimon.table.source.Split;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.SnapshotManager;
@@ -77,7 +78,29 @@ public class IncrementalStartingScanner implements
StartingScanner {
}
}
- return new ScannedResult(end, null, result);
+ return StartingScanner.fromPlan(
+ new SnapshotReader.Plan() {
+ @Override
+ public Long watermark() {
+ return null;
+ }
+
+ @Override
+ public Long snapshotId() {
+ return end;
+ }
+
+ @Override
+ public ScanMode scanMode() {
+ // TODO introduce a new mode
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public List<Split> splits() {
+ return (List) result;
+ }
+ });
}
@SuppressWarnings({"unchecked", "rawtypes"})
@@ -86,6 +109,6 @@ public class IncrementalStartingScanner implements
StartingScanner {
// ignore COMPACT and OVERWRITE
return Collections.emptyList();
}
- return (List)
reader.withSnapshot(s).withKind(ScanKind.DELTA).read().splits();
+ return (List)
reader.withSnapshot(s).withMode(ScanMode.DELTA).read().splits();
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/InputChangelogFollowUpScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/InputChangelogFollowUpScanner.java
index 4b188f6fe..adb49c4d0 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/InputChangelogFollowUpScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/InputChangelogFollowUpScanner.java
@@ -20,7 +20,7 @@ package org.apache.paimon.table.source.snapshot;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
-import org.apache.paimon.operation.ScanKind;
+import org.apache.paimon.table.source.ScanMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,6 +45,6 @@ public class InputChangelogFollowUpScanner implements
FollowUpScanner {
@Override
public SnapshotReader.Plan scan(long snapshotId, SnapshotReader
snapshotReader) {
- return
snapshotReader.withKind(ScanKind.CHANGELOG).withSnapshot(snapshotId).read();
+ return
snapshotReader.withMode(ScanMode.CHANGELOG).withSnapshot(snapshotId).read();
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
index 0cee60086..7cba985f8 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
@@ -21,17 +21,15 @@ package org.apache.paimon.table.source.snapshot;
import org.apache.paimon.Snapshot;
import org.apache.paimon.consumer.ConsumerManager;
import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.operation.ScanKind;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.RichPlan;
+import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.SplitGenerator;
-import org.apache.paimon.table.source.TableScan;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.SnapshotManager;
-import javax.annotation.Nullable;
-
import java.util.List;
/** Read splits from specified {@link Snapshot} with given configuration. */
@@ -49,7 +47,7 @@ public interface SnapshotReader {
SnapshotReader withFilter(Predicate predicate);
- SnapshotReader withKind(ScanKind scanKind);
+ SnapshotReader withMode(ScanMode scanMode);
SnapshotReader withLevelFilter(Filter<Integer> levelFilter);
@@ -69,17 +67,7 @@ public interface SnapshotReader {
List<BinaryRow> partitions();
/** Result plan of this scan. */
- interface Plan extends TableScan.Plan {
-
- @Nullable
- Long watermark();
-
- /**
- * Snapshot id of this plan, return null if the table is empty or the
manifest list is
- * specified.
- */
- @Nullable
- Long snapshotId();
+ interface Plan extends RichPlan {
/** Result splits. */
List<Split> splits();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index 99d2b7291..b335524d5 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -30,11 +30,11 @@ import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.operation.DefaultValueAssigner;
import org.apache.paimon.operation.FileStoreScan;
-import org.apache.paimon.operation.ScanKind;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.SplitGenerator;
import org.apache.paimon.utils.Filter;
@@ -69,7 +69,7 @@ public class SnapshotReaderImpl implements SnapshotReader {
private final BiConsumer<FileStoreScan, Predicate>
nonPartitionFilterConsumer;
private final DefaultValueAssigner defaultValueAssigner;
- private ScanKind scanKind = ScanKind.ALL;
+ private ScanMode scanMode = ScanMode.ALL;
private RecordComparator lazyPartitionComparator;
public SnapshotReaderImpl(
@@ -149,9 +149,9 @@ public class SnapshotReaderImpl implements SnapshotReader {
}
@Override
- public SnapshotReader withKind(ScanKind scanKind) {
- this.scanKind = scanKind;
- scan.withKind(scanKind);
+ public SnapshotReader withMode(ScanMode scanMode) {
+ this.scanMode = scanMode;
+ scan.withKind(scanMode);
return this;
}
@@ -191,7 +191,7 @@ public class SnapshotReaderImpl implements SnapshotReader {
List<DataSplit> splits =
generateSplits(
snapshotId == null ? Snapshot.FIRST_SNAPSHOT_ID - 1 :
snapshotId,
- scanKind != ScanKind.ALL,
+ scanMode != ScanMode.ALL,
splitGenerator,
files);
return new Plan() {
@@ -207,6 +207,11 @@ public class SnapshotReaderImpl implements SnapshotReader {
return plan.snapshotId();
}
+ @Override
+ public ScanMode scanMode() {
+ return plan.scanMode();
+ }
+
@Override
public List<Split> splits() {
return (List) splits;
@@ -234,7 +239,7 @@ public class SnapshotReaderImpl implements SnapshotReader {
/** Get splits from an overwritten snapshot files. */
@Override
public Plan readOverwrittenChanges() {
- withKind(ScanKind.DELTA);
+ withMode(ScanMode.DELTA);
FileStoreScan.Plan plan = scan.plan();
long snapshotId = plan.snapshotId();
@@ -309,6 +314,12 @@ public class SnapshotReaderImpl implements SnapshotReader {
return plan.snapshotId();
}
+ @Override
+ public ScanMode scanMode() {
+ // TODO introduce a new mode
+ throw new UnsupportedOperationException();
+ }
+
@Override
public List<Split> splits() {
return (List) splits;
@@ -318,7 +329,7 @@ public class SnapshotReaderImpl implements SnapshotReader {
@Override
public Plan readIncrementalDiff(Snapshot before) {
- withKind(ScanKind.ALL);
+ withMode(ScanMode.ALL);
FileStoreScan.Plan plan = scan.plan();
Map<BinaryRow, Map<Integer, List<DataFileMeta>>> dataFiles =
groupByPartFiles(plan.files(FileKind.ADD));
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StartingScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StartingScanner.java
index 807930be7..af44a8b8a 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StartingScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StartingScanner.java
@@ -22,8 +22,6 @@ import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.TableScan;
import org.apache.paimon.utils.SnapshotManager;
-import javax.annotation.Nullable;
-
import java.util.List;
/** Helper class for the first planning of {@link TableScan}. */
@@ -38,33 +36,28 @@ public interface StartingScanner {
class NoSnapshot implements Result {}
static ScannedResult fromPlan(SnapshotReader.Plan plan) {
- return new ScannedResult(plan.snapshotId(), plan.watermark(), (List)
plan.splits());
+ return new ScannedResult(plan);
}
/** Result with scanned snapshot. Next snapshot should be the current
snapshot plus 1. */
class ScannedResult implements Result {
- private final long currentSnapshotId;
- @Nullable private final Long currentWatermark;
- private final List<DataSplit> splits;
-
- public ScannedResult(
- long currentSnapshotId, @Nullable Long currentWatermark,
List<DataSplit> splits) {
- this.currentSnapshotId = currentSnapshotId;
- this.currentWatermark = currentWatermark;
- this.splits = splits;
+
+ private final SnapshotReader.Plan plan;
+
+ public ScannedResult(SnapshotReader.Plan plan) {
+ this.plan = plan;
}
public long currentSnapshotId() {
- return currentSnapshotId;
+ return plan.snapshotId();
}
- @Nullable
- public Long currentWatermark() {
- return currentWatermark;
+ public List<DataSplit> splits() {
+ return (List) plan.splits();
}
- public List<DataSplit> splits() {
- return splits;
+ public SnapshotReader.Plan plan() {
+ return plan;
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromSnapshotStartingScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromSnapshotStartingScanner.java
index cc4f2907b..1157e22d2 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromSnapshotStartingScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromSnapshotStartingScanner.java
@@ -19,7 +19,7 @@
package org.apache.paimon.table.source.snapshot;
import org.apache.paimon.CoreOptions;
-import org.apache.paimon.operation.ScanKind;
+import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.utils.SnapshotManager;
/**
@@ -40,6 +40,6 @@ public class StaticFromSnapshotStartingScanner implements
StartingScanner {
return new NoSnapshot();
}
return StartingScanner.fromPlan(
-
snapshotReader.withKind(ScanKind.ALL).withSnapshot(snapshotId).read());
+
snapshotReader.withMode(ScanMode.ALL).withSnapshot(snapshotId).read());
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTagStartingScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTagStartingScanner.java
index 13c5e95cd..b402851a1 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTagStartingScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTagStartingScanner.java
@@ -20,7 +20,7 @@ package org.apache.paimon.table.source.snapshot;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
-import org.apache.paimon.operation.ScanKind;
+import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;
@@ -40,6 +40,6 @@ public class StaticFromTagStartingScanner implements
StartingScanner {
Snapshot snapshot = tagManager.taggedSnapshot(tagName);
return StartingScanner.fromPlan(
-
snapshotReader.withKind(ScanKind.ALL).withSnapshot(snapshot).read());
+
snapshotReader.withMode(ScanMode.ALL).withSnapshot(snapshot).read());
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTimestampStartingScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTimestampStartingScanner.java
index 34b0f0974..315b749df 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTimestampStartingScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTimestampStartingScanner.java
@@ -20,7 +20,7 @@ package org.apache.paimon.table.source.snapshot;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
-import org.apache.paimon.operation.ScanKind;
+import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.utils.SnapshotManager;
import org.slf4j.Logger;
@@ -53,7 +53,7 @@ public class StaticFromTimestampStartingScanner implements
StartingScanner {
return new NoSnapshot();
}
return StartingScanner.fromPlan(
-
snapshotReader.withKind(ScanKind.ALL).withSnapshot(startingSnapshot.id()).read());
+
snapshotReader.withMode(ScanMode.ALL).withSnapshot(startingSnapshot.id()).read());
}
@Nullable
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index ba8a4b4cc..e8e90df1b 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -27,7 +27,6 @@ import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
-import org.apache.paimon.operation.ScanKind;
import org.apache.paimon.predicate.LeafPredicate;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
@@ -40,6 +39,8 @@ import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.InnerStreamTableScan;
import org.apache.paimon.table.source.InnerTableRead;
import org.apache.paimon.table.source.InnerTableScan;
+import org.apache.paimon.table.source.RichPlan;
+import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.SplitGenerator;
import org.apache.paimon.table.source.TableRead;
@@ -225,8 +226,8 @@ public class AuditLogTable implements DataTable,
ReadonlyTable {
return this;
}
- public SnapshotReader withKind(ScanKind scanKind) {
- snapshotReader.withKind(scanKind);
+ public SnapshotReader withMode(ScanMode scanMode) {
+ snapshotReader.withMode(scanMode);
return this;
}
@@ -307,7 +308,7 @@ public class AuditLogTable implements DataTable,
ReadonlyTable {
}
@Override
- public Plan plan() {
+ public RichPlan plan() {
return streamScan.plan();
}
@@ -322,15 +323,14 @@ public class AuditLogTable implements DataTable,
ReadonlyTable {
return streamScan.checkpoint();
}
- @Nullable
@Override
- public Long watermark() {
- return streamScan.watermark();
+ public void restore(@Nullable Long nextSnapshotId) {
+ streamScan.restore(nextSnapshotId);
}
@Override
- public void restore(@Nullable Long nextSnapshotId) {
- streamScan.restore(nextSnapshotId);
+ public void restore(@Nullable Long nextSnapshotId, ScanMode scanMode) {
+ streamScan.restore(nextSnapshotId, scanMode);
}
@Override
diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
index 9b5441fd5..f545f7a28 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
@@ -39,7 +39,6 @@ import org.apache.paimon.operation.FileStoreCommitImpl;
import org.apache.paimon.operation.FileStoreExpireImpl;
import org.apache.paimon.operation.FileStoreRead;
import org.apache.paimon.operation.FileStoreScan;
-import org.apache.paimon.operation.ScanKind;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
import org.apache.paimon.reader.RecordReaderIterator;
@@ -47,6 +46,7 @@ import org.apache.paimon.schema.KeyValueFieldsExtractor;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.CommitIncrement;
import org.apache.paimon.utils.FileStorePathFactory;
@@ -339,8 +339,8 @@ public class TestFileStore extends KeyValueFileStore {
.withKind(
options.changelogProducer()
==
CoreOptions.ChangelogProducer.NONE
- ? ScanKind.DELTA
- : ScanKind.CHANGELOG)
+ ? ScanMode.DELTA
+ : ScanMode.CHANGELOG)
.withSnapshot(snapshotId)
.plan()
.files();
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
index 2272b7af3..5c99df4aa 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
@@ -25,7 +25,6 @@ import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.fs.FileIOFinder;
import org.apache.paimon.fs.local.LocalFileIO;
-import org.apache.paimon.operation.ScanKind;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
@@ -38,6 +37,7 @@ import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.StreamTableScan;
import org.apache.paimon.table.source.TableRead;
@@ -213,7 +213,7 @@ public class AppendOnlyFileStoreTableTest extends
FileStoreTableTestBase {
FileStoreTable table = createFileStoreTable();
List<Split> splits =
-
toSplits(table.newSnapshotReader().withKind(ScanKind.DELTA).read().dataSplits());
+
toSplits(table.newSnapshotReader().withMode(ScanMode.DELTA).read().dataSplits());
TableRead read = table.newRead().withProjection(PROJECTION);
assertThat(getResult(read, splits, binaryRow(1), 0,
STREAMING_PROJECTED_ROW_TO_STRING))
.isEqualTo(Arrays.asList("+101|11", "+102|12"));
@@ -231,7 +231,7 @@ public class AppendOnlyFileStoreTableTest extends
FileStoreTableTestBase {
List<Split> splits =
toSplits(
table.newSnapshotReader()
- .withKind(ScanKind.DELTA)
+ .withMode(ScanMode.DELTA)
.withFilter(predicate)
.read()
.dataSplits());
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogValueCountFileStoreTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/ChangelogValueCountFileStoreTableTest.java
index 59bec0dec..6fd275083 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogValueCountFileStoreTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/ChangelogValueCountFileStoreTableTest.java
@@ -24,7 +24,6 @@ import org.apache.paimon.fs.FileIOFinder;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.io.DataFilePathFactory;
-import org.apache.paimon.operation.ScanKind;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
@@ -34,6 +33,7 @@ import org.apache.paimon.schema.SchemaUtils;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.table.sink.StreamTableWrite;
+import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.types.RowKind;
@@ -109,7 +109,7 @@ public class ChangelogValueCountFileStoreTableTest extends
FileStoreTableTestBas
FileStoreTable table = createFileStoreTable();
List<Split> splits =
-
toSplits(table.newSnapshotReader().withKind(ScanKind.DELTA).read().dataSplits());
+
toSplits(table.newSnapshotReader().withMode(ScanMode.DELTA).read().dataSplits());
TableRead read = table.newRead();
assertThat(getResult(read, splits, binaryRow(1), 0,
STREAMING_ROW_TO_STRING))
.isEqualTo(
@@ -130,7 +130,7 @@ public class ChangelogValueCountFileStoreTableTest extends
FileStoreTableTestBas
FileStoreTable table = createFileStoreTable();
List<Split> splits =
-
toSplits(table.newSnapshotReader().withKind(ScanKind.DELTA).read().dataSplits());
+
toSplits(table.newSnapshotReader().withMode(ScanMode.DELTA).read().dataSplits());
TableRead read = table.newRead().withProjection(PROJECTION);
assertThat(getResult(read, splits, binaryRow(1), 0,
STREAMING_PROJECTED_ROW_TO_STRING))
.isEqualTo(Arrays.asList("-100|10", "+101|11"));
@@ -148,7 +148,7 @@ public class ChangelogValueCountFileStoreTableTest extends
FileStoreTableTestBas
List<Split> splits =
toSplits(
table.newSnapshotReader()
- .withKind(ScanKind.DELTA)
+ .withMode(ScanMode.DELTA)
.withFilter(predicate)
.read()
.dataSplits());
@@ -203,7 +203,7 @@ public class ChangelogValueCountFileStoreTableTest extends
FileStoreTableTestBas
// check that no data file is produced
List<Split> splits =
-
toSplits(table.newSnapshotReader().withKind(ScanKind.DELTA).read().dataSplits());
+
toSplits(table.newSnapshotReader().withMode(ScanMode.DELTA).read().dataSplits());
assertThat(splits).isEmpty();
// check that no changelog file is produced
Path bucketPath = DataFilePathFactory.bucketPath(table.location(),
"1", 0);
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileDataTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileDataTableTest.java
index b10fadf61..565262f39 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileDataTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileDataTableTest.java
@@ -20,10 +20,10 @@ package org.apache.paimon.table;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.WriteMode;
-import org.apache.paimon.operation.ScanKind;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.types.RowType;
@@ -153,7 +153,7 @@ public class ChangelogWithKeyFileDataTableTest extends
FileDataFilterTestBase {
List<Split> splits =
toSplits(
table.newSnapshotReader()
- .withKind(ScanKind.DELTA)
+ .withMode(ScanMode.DELTA)
.read()
.dataSplits());
// filter with "b" = 15 in schema0
@@ -174,7 +174,7 @@ public class ChangelogWithKeyFileDataTableTest extends
FileDataFilterTestBase {
List<Split> splits =
toSplits(
table.newSnapshotReader()
- .withKind(ScanKind.DELTA)
+ .withMode(ScanMode.DELTA)
.read()
.dataSplits());
@@ -212,7 +212,7 @@ public class ChangelogWithKeyFileDataTableTest extends
FileDataFilterTestBase {
List<Split> splits =
toSplits(
table.newSnapshotReader()
- .withKind(ScanKind.DELTA)
+ .withMode(ScanMode.DELTA)
.read()
.dataSplits());
// filter with "kt" = 116 in schema0
@@ -230,7 +230,7 @@ public class ChangelogWithKeyFileDataTableTest extends
FileDataFilterTestBase {
List<Split> splits =
toSplits(
table.newSnapshotReader()
- .withKind(ScanKind.DELTA)
+ .withMode(ScanMode.DELTA)
.read()
.dataSplits());
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java
index ce901586c..278305c38 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java
@@ -28,7 +28,6 @@ import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManagerImpl;
import org.apache.paimon.fs.FileIOFinder;
import org.apache.paimon.fs.local.LocalFileIO;
-import org.apache.paimon.operation.ScanKind;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
@@ -48,6 +47,7 @@ import org.apache.paimon.table.sink.TableWriteImpl;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.InnerTableRead;
import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.StreamTableScan;
import org.apache.paimon.table.source.TableRead;
@@ -253,7 +253,7 @@ public class ChangelogWithKeyFileStoreTableTest extends
FileStoreTableTestBase {
FileStoreTable table = createFileStoreTable();
List<Split> splits =
-
toSplits(table.newSnapshotReader().withKind(ScanKind.DELTA).read().dataSplits());
+
toSplits(table.newSnapshotReader().withMode(ScanMode.DELTA).read().dataSplits());
TableRead read = table.newRead();
assertThat(getResult(read, splits, binaryRow(1), 0,
STREAMING_ROW_TO_STRING))
.isEqualTo(
@@ -273,7 +273,7 @@ public class ChangelogWithKeyFileStoreTableTest extends
FileStoreTableTestBase {
FileStoreTable table = createFileStoreTable();
List<Split> splits =
-
toSplits(table.newSnapshotReader().withKind(ScanKind.DELTA).read().dataSplits());
+
toSplits(table.newSnapshotReader().withMode(ScanMode.DELTA).read().dataSplits());
TableRead read = table.newRead().withProjection(PROJECTION);
assertThat(getResult(read, splits, binaryRow(1), 0,
STREAMING_PROJECTED_ROW_TO_STRING))
@@ -292,7 +292,7 @@ public class ChangelogWithKeyFileStoreTableTest extends
FileStoreTableTestBase {
List<Split> splits =
toSplits(
table.newSnapshotReader()
- .withKind(ScanKind.DELTA)
+ .withMode(ScanMode.DELTA)
.withFilter(predicate)
.read()
.dataSplits());
@@ -328,7 +328,7 @@ public class ChangelogWithKeyFileStoreTableTest extends
FileStoreTableTestBase {
List<Split> splits =
toSplits(
-
table.newSnapshotReader().withKind(ScanKind.CHANGELOG).read().dataSplits());
+
table.newSnapshotReader().withMode(ScanMode.CHANGELOG).read().dataSplits());
TableRead read = table.newRead();
assertThat(getResult(read, splits, binaryRow(1), 0,
CHANGELOG_ROW_TO_STRING))
.containsExactlyInAnyOrder(
@@ -377,7 +377,7 @@ public class ChangelogWithKeyFileStoreTableTest extends
FileStoreTableTestBase {
List<Split> splits =
toSplits(
-
table.newSnapshotReader().withKind(ScanKind.CHANGELOG).read().dataSplits());
+
table.newSnapshotReader().withMode(ScanMode.CHANGELOG).read().dataSplits());
TableRead read = table.newRead();
assertThat(getResult(read, splits, binaryRow(1), 0,
CHANGELOG_ROW_TO_STRING))
.containsExactlyInAnyOrder(
@@ -400,7 +400,7 @@ public class ChangelogWithKeyFileStoreTableTest extends
FileStoreTableTestBase {
splits =
toSplits(
-
table.newSnapshotReader().withKind(ScanKind.CHANGELOG).read().dataSplits());
+
table.newSnapshotReader().withMode(ScanMode.CHANGELOG).read().dataSplits());
assertThat(getResult(read, splits, binaryRow(1), 0,
CHANGELOG_ROW_TO_STRING))
.containsExactlyInAnyOrder("+I
1|30|130|binary|varbinary|mapKey:mapVal|multiset");
assertThat(getResult(read, splits, binaryRow(2), 0,
CHANGELOG_ROW_TO_STRING))
@@ -426,7 +426,7 @@ public class ChangelogWithKeyFileStoreTableTest extends
FileStoreTableTestBase {
splits =
toSplits(
-
table.newSnapshotReader().withKind(ScanKind.CHANGELOG).read().dataSplits());
+
table.newSnapshotReader().withMode(ScanMode.CHANGELOG).read().dataSplits());
assertThat(getResult(read, splits, binaryRow(1), 0,
CHANGELOG_ROW_TO_STRING))
.containsExactlyInAnyOrder(
"-D 1|20|120|binary|varbinary|mapKey:mapVal|multiset",
@@ -709,7 +709,7 @@ public class ChangelogWithKeyFileStoreTableTest extends
FileStoreTableTestBase {
write.write(rowData(1, 20, 200L));
commit.commit(0, write.prepareCommit(true, 0));
- SnapshotReader snapshotReader =
table.newSnapshotReader().withKind(ScanKind.DELTA);
+ SnapshotReader snapshotReader =
table.newSnapshotReader().withMode(ScanMode.DELTA);
List<DataSplit> splits0 = snapshotReader.read().dataSplits();
assertThat(splits0).hasSize(1);
assertThat(splits0.get(0).dataFiles()).hasSize(1);
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/FileDataFilterTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/table/FileDataFilterTestBase.java
index a68f2594c..039a968cb 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/FileDataFilterTestBase.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/FileDataFilterTestBase.java
@@ -19,12 +19,12 @@
package org.apache.paimon.table;
import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.operation.ScanKind;
import org.apache.paimon.predicate.Equal;
import org.apache.paimon.predicate.IsNull;
import org.apache.paimon.predicate.LeafPredicate;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.types.DataTypes;
@@ -341,7 +341,7 @@ public abstract class FileDataFilterTestBase extends
SchemaEvolutionTableTestBas
List<Split> splits =
toSplits(
table.newSnapshotReader()
- .withKind(ScanKind.DELTA)
+ .withMode(ScanMode.DELTA)
.read()
.dataSplits());
TableRead read = table.newRead();
@@ -359,7 +359,7 @@ public abstract class FileDataFilterTestBase extends
SchemaEvolutionTableTestBas
List<Split> splits =
toSplits(
table.newSnapshotReader()
- .withKind(ScanKind.DELTA)
+ .withMode(ScanMode.DELTA)
.read()
.dataSplits());
@@ -384,7 +384,7 @@ public abstract class FileDataFilterTestBase extends
SchemaEvolutionTableTestBas
List<Split> splits =
toSplits(
table.newSnapshotReader()
- .withKind(ScanKind.DELTA)
+ .withMode(ScanMode.DELTA)
.read()
.dataSplits());
// project "c", "b", "pt" in schema0
@@ -400,7 +400,7 @@ public abstract class FileDataFilterTestBase extends
SchemaEvolutionTableTestBas
List<Split> splits =
toSplits(
table.newSnapshotReader()
- .withKind(ScanKind.DELTA)
+ .withMode(ScanMode.DELTA)
.read()
.dataSplits());
@@ -424,7 +424,7 @@ public abstract class FileDataFilterTestBase extends
SchemaEvolutionTableTestBas
List<Split> splits =
toSplits(
table.newSnapshotReader()
- .withKind(ScanKind.DELTA)
+ .withMode(ScanMode.DELTA)
.read()
.dataSplits());
// filter with "b" = 15 in schema0
@@ -442,7 +442,7 @@ public abstract class FileDataFilterTestBase extends
SchemaEvolutionTableTestBas
List<Split> splits =
toSplits(
table.newSnapshotReader()
- .withKind(ScanKind.DELTA)
+ .withMode(ScanMode.DELTA)
.read()
.dataSplits());
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/source/StartupModeTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/source/StartupModeTest.java
index 724d255bb..9ce46e77b 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/source/StartupModeTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/source/StartupModeTest.java
@@ -22,7 +22,6 @@ import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.fs.FileIOFinder;
import org.apache.paimon.fs.Path;
-import org.apache.paimon.operation.ScanKind;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.StreamTableCommit;
@@ -76,13 +75,13 @@ public class StartupModeTest extends ScannerTestBase {
writeAndCommit(4, rowData(1, 10, 103L));
TableScan.Plan thirdPlan = dataTableScan.plan();
assertThat(thirdPlan.splits())
-
.isEqualTo(snapshotReader.withSnapshot(4).withKind(ScanKind.DELTA).read().splits());
+
.isEqualTo(snapshotReader.withSnapshot(4).withMode(ScanMode.DELTA).read().splits());
// batch mode
TableScan batchScan = table.newScan();
TableScan.Plan plan = batchScan.plan();
assertThat(plan.splits())
-
.isEqualTo(snapshotReader.withSnapshot(4).withKind(ScanKind.ALL).read().splits());
+
.isEqualTo(snapshotReader.withSnapshot(4).withMode(ScanMode.ALL).read().splits());
}
@Test
@@ -96,20 +95,20 @@ public class StartupModeTest extends ScannerTestBase {
TableScan.Plan secondPlan = dataTableScan.plan();
assertThat(firstPlan.splits())
-
.isEqualTo(snapshotReader.withSnapshot(3).withKind(ScanKind.ALL).read().splits());
+
.isEqualTo(snapshotReader.withSnapshot(3).withMode(ScanMode.ALL).read().splits());
assertThat(secondPlan.splits()).isEmpty();
// write next data
writeAndCommit(4, rowData(1, 10, 103L));
TableScan.Plan thirdPlan = dataTableScan.plan();
assertThat(thirdPlan.splits())
-
.isEqualTo(snapshotReader.withSnapshot(4).withKind(ScanKind.DELTA).read().splits());
+
.isEqualTo(snapshotReader.withSnapshot(4).withMode(ScanMode.DELTA).read().splits());
// batch mode
TableScan batchScan = table.newScan();
TableScan.Plan plan = batchScan.plan();
assertThat(plan.splits())
-
.isEqualTo(snapshotReader.withSnapshot(4).withKind(ScanKind.ALL).read().splits());
+
.isEqualTo(snapshotReader.withSnapshot(4).withMode(ScanMode.ALL).read().splits());
}
@Test
@@ -135,13 +134,13 @@ public class StartupModeTest extends ScannerTestBase {
assertThat(firstPlan.splits()).isEmpty();
assertThat(secondPlan.splits())
-
.isEqualTo(snapshotReader.withSnapshot(4).withKind(ScanKind.DELTA).read().splits());
+
.isEqualTo(snapshotReader.withSnapshot(4).withMode(ScanMode.DELTA).read().splits());
// batch mode
TableScan batchScan = readTable.newScan();
TableScan.Plan plan = batchScan.plan();
assertThat(plan.splits())
-
.isEqualTo(snapshotReader.withSnapshot(3).withKind(ScanKind.ALL).read().splits());
+
.isEqualTo(snapshotReader.withSnapshot(3).withMode(ScanMode.ALL).read().splits());
}
@Test
@@ -159,15 +158,15 @@ public class StartupModeTest extends ScannerTestBase {
TableScan.Plan secondPlan = dataTableScan.plan();
assertThat(firstPlan.splits())
-
.isEqualTo(snapshotReader.withSnapshot(4).withKind(ScanKind.ALL).read().splits());
+
.isEqualTo(snapshotReader.withSnapshot(4).withMode(ScanMode.ALL).read().splits());
assertThat(secondPlan.splits())
-
.isEqualTo(snapshotReader.withSnapshot(5).withKind(ScanKind.DELTA).read().splits());
+
.isEqualTo(snapshotReader.withSnapshot(5).withMode(ScanMode.DELTA).read().splits());
// batch mode
TableScan batchScan = table.newScan();
TableScan.Plan plan = batchScan.plan();
assertThat(plan.splits())
-
.isEqualTo(snapshotReader.withSnapshot(4).withKind(ScanKind.ALL).read().splits());
+
.isEqualTo(snapshotReader.withSnapshot(4).withMode(ScanMode.ALL).read().splits());
}
@Test
@@ -184,13 +183,13 @@ public class StartupModeTest extends ScannerTestBase {
assertThat(firstPlan.splits()).isEmpty();
assertThat(secondPlan.splits())
-
.isEqualTo(snapshotReader.withSnapshot(2).withKind(ScanKind.DELTA).read().splits());
+
.isEqualTo(snapshotReader.withSnapshot(2).withMode(ScanMode.DELTA).read().splits());
// batch mode
TableScan batchScan = table.newScan();
TableScan.Plan plan = batchScan.plan();
assertThat(plan.splits())
-
.isEqualTo(snapshotReader.withSnapshot(2).withKind(ScanKind.ALL).read().splits());
+
.isEqualTo(snapshotReader.withSnapshot(2).withMode(ScanMode.ALL).read().splits());
}
@Test
@@ -212,7 +211,7 @@ public class StartupModeTest extends ScannerTestBase {
assertThat(firstPlan.splits()).isEmpty();
// ceiled up to the earliest snapshot id = 2
assertThat(secondPlan.splits())
-
.isEqualTo(snapshotReader.withSnapshot(2).withKind(ScanKind.DELTA).read().splits());
+
.isEqualTo(snapshotReader.withSnapshot(2).withMode(ScanMode.DELTA).read().splits());
// batch mode
TableScan batchScan = table.newScan();
@@ -232,15 +231,15 @@ public class StartupModeTest extends ScannerTestBase {
TableScan.Plan secondPlan = dataTableScan.plan();
assertThat(firstPlan.splits())
-
.isEqualTo(snapshotReader.withSnapshot(2).withKind(ScanKind.ALL).read().splits());
+
.isEqualTo(snapshotReader.withSnapshot(2).withMode(ScanMode.ALL).read().splits());
assertThat(secondPlan.splits())
-
.isEqualTo(snapshotReader.withSnapshot(3).withKind(ScanKind.DELTA).read().splits());
+
.isEqualTo(snapshotReader.withSnapshot(3).withMode(ScanMode.DELTA).read().splits());
// batch mode
TableScan batchScan = table.newScan();
TableScan.Plan plan = batchScan.plan();
assertThat(plan.splits())
-
.isEqualTo(snapshotReader.withSnapshot(2).withKind(ScanKind.ALL).read().splits());
+
.isEqualTo(snapshotReader.withSnapshot(2).withMode(ScanMode.ALL).read().splits());
}
@Test
@@ -260,9 +259,9 @@ public class StartupModeTest extends ScannerTestBase {
// ceiled up to the earliest snapshot id = 2
assertThat(firstPlan.splits())
-
.isEqualTo(snapshotReader.withSnapshot(2).withKind(ScanKind.ALL).read().splits());
+
.isEqualTo(snapshotReader.withSnapshot(2).withMode(ScanMode.ALL).read().splits());
assertThat(secondPlan.splits())
-
.isEqualTo(snapshotReader.withSnapshot(3).withKind(ScanKind.DELTA).read().splits());
+
.isEqualTo(snapshotReader.withSnapshot(3).withMode(ScanMode.DELTA).read().splits());
// batch mode
TableScan batchScan = table.newScan();
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorFunction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorFunction.java
index 30ac622ce..2905d8a30 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorFunction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorFunction.java
@@ -22,6 +22,7 @@ import org.apache.paimon.flink.utils.JavaTypeInfo;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.EndOfScanException;
import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.RichPlan;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.StreamTableScan;
@@ -180,12 +181,13 @@ public class MonitorFunction extends
RichSourceFunction<Split>
return;
}
try {
- List<Split> splits = scan.plan().splits();
+ RichPlan plan = scan.plan();
+ List<Split> splits = plan.splits();
isEmpty = splits.isEmpty();
splits.forEach(ctx::collect);
if (emitSnapshotWatermark) {
- Long watermark = scan.watermark();
+ Long watermark = plan.watermark();
if (watermark != null) {
ctx.emitWatermark(new Watermark(watermark));
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
index d02cd2b89..4d51c0f89 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
@@ -21,12 +21,13 @@ package org.apache.paimon.flink.source;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.table.BucketMode;
-import org.apache.paimon.table.source.DataFilePlan;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.EndOfScanException;
+import org.apache.paimon.table.source.RichPlan;
+import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.table.source.SnapshotNotExistPlan;
+import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.StreamTableScan;
-import org.apache.paimon.table.source.TableScan;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
@@ -200,7 +201,7 @@ public class ContinuousFileSplitEnumeratorTest {
context.registerReader(0, "test-host");
context.registerReader(1, "test-host");
- TreeMap<Long, TableScan.Plan> results = new TreeMap<>();
+ TreeMap<Long, RichPlan> results = new TreeMap<>();
MockScan scan = new MockScan(results);
ContinuousFileSplitEnumerator enumerator =
new Builder()
@@ -216,7 +217,7 @@ public class ContinuousFileSplitEnumeratorTest {
for (int i = 0; i < 4; i++) {
splits.add(createDataSplit(snapshot, i, Collections.emptyList()));
}
- results.put(1L, new DataFilePlan(splits));
+ results.put(1L, new MockPlan(splits));
context.triggerAllActions();
// assign to task 0
@@ -269,7 +270,7 @@ public class ContinuousFileSplitEnumeratorTest {
new TestingSplitEnumeratorContext<>(3);
context.registerReader(0, "test-host");
- TreeMap<Long, TableScan.Plan> results = new TreeMap<>();
+ TreeMap<Long, RichPlan> results = new TreeMap<>();
StreamTableScan scan = new MockScan(results);
ContinuousFileSplitEnumerator enumerator =
new Builder()
@@ -284,7 +285,7 @@ public class ContinuousFileSplitEnumeratorTest {
long snapshot = 0;
List<DataSplit> splits = new ArrayList<>();
splits.add(createDataSplit(snapshot, 1, Collections.emptyList()));
- results.put(1L, new DataFilePlan(splits));
+ results.put(1L, new MockPlan(splits));
context.triggerAllActions();
// assign to task 0
@@ -296,7 +297,7 @@ public class ContinuousFileSplitEnumeratorTest {
splits.clear();
splits.add(createDataSplit(snapshot, 2, Collections.emptyList()));
- results.put(2L, new DataFilePlan(splits));
+ results.put(2L, new MockPlan(splits));
context.triggerAllActions();
// assign to task 0
@@ -315,7 +316,7 @@ public class ContinuousFileSplitEnumeratorTest {
context.registerReader(2, "test-host");
context.registerReader(3, "test-host");
- TreeMap<Long, TableScan.Plan> results = new TreeMap<>();
+ TreeMap<Long, RichPlan> results = new TreeMap<>();
StreamTableScan scan = new MockScan(results);
ContinuousFileSplitEnumerator enumerator =
new Builder()
@@ -332,7 +333,7 @@ public class ContinuousFileSplitEnumeratorTest {
for (int i = 0; i < 100; i++) {
splits.add(createDataSplit(snapshot, 0, Collections.emptyList()));
}
- results.put(1L, new DataFilePlan(splits));
+ results.put(1L, new MockPlan(splits));
context.triggerAllActions();
// assign to task 0
@@ -378,7 +379,7 @@ public class ContinuousFileSplitEnumeratorTest {
context.registerReader(2, "test-host");
context.registerReader(3, "test-host");
- TreeMap<Long, TableScan.Plan> results = new TreeMap<>();
+ TreeMap<Long, RichPlan> results = new TreeMap<>();
MockScan scan = new MockScan(results);
ContinuousFileSplitEnumerator enumerator =
new Builder()
@@ -407,7 +408,7 @@ public class ContinuousFileSplitEnumeratorTest {
for (int i = 0; i < 100; i++) {
splits.add(createDataSplit(snapshot, 0, Collections.emptyList()));
}
- results.put(1L, new DataFilePlan(splits));
+ results.put(1L, new MockPlan(splits));
// trigger assign task 0 and task 1 will get their assignment
context.triggerAllActions();
@@ -436,7 +437,7 @@ public class ContinuousFileSplitEnumeratorTest {
context.registerReader(0, "test-host");
context.registerReader(1, "test-host");
- TreeMap<Long, TableScan.Plan> results = new TreeMap<>();
+ TreeMap<Long, RichPlan> results = new TreeMap<>();
StreamTableScan scan = new MockScan(results);
ContinuousFileSplitEnumerator enumerator =
new Builder()
@@ -453,7 +454,7 @@ public class ContinuousFileSplitEnumeratorTest {
for (int i = 0; i < 4; i++) {
splits.add(createDataSplit(snapshot, i, Collections.emptyList()));
}
- results.put(1L, new DataFilePlan(splits));
+ results.put(1L, new MockPlan(splits));
context.triggerAllActions();
// assign to task 0
@@ -477,7 +478,7 @@ public class ContinuousFileSplitEnumeratorTest {
context.registerReader(0, "test-host");
context.registerReader(1, "test-host");
- TreeMap<Long, TableScan.Plan> results = new TreeMap<>();
+ TreeMap<Long, RichPlan> results = new TreeMap<>();
StreamTableScan scan = new MockScan(results);
ContinuousFileSplitEnumerator enumerator =
new Builder()
@@ -495,7 +496,7 @@ public class ContinuousFileSplitEnumeratorTest {
for (int i = 0; i < 4; i++) {
splits.add(createDataSplit(snapshot, i, Collections.emptyList()));
}
- results.put(1L, new DataFilePlan(splits));
+ results.put(1L, new MockPlan(splits));
context.registeredReaders().remove(1);
// assign to task 0
@@ -510,7 +511,7 @@ public class ContinuousFileSplitEnumeratorTest {
context.registerReader(0, "test-host");
context.registerReader(1, "test-host");
- TreeMap<Long, TableScan.Plan> results = new TreeMap<>();
+ TreeMap<Long, RichPlan> results = new TreeMap<>();
MockScan scan = new MockScan(results);
scan.allowEnd(false);
ContinuousFileSplitEnumerator enumerator =
@@ -527,7 +528,7 @@ public class ContinuousFileSplitEnumeratorTest {
for (int i = 0; i < 4; i++) {
splits.add(createDataSplit(snapshot, i, Collections.emptyList()));
}
- results.put(1L, new DataFilePlan(splits));
+ results.put(1L, new MockPlan(splits));
// request directly
enumerator.handleSplitRequest(0, "test-host");
@@ -555,7 +556,7 @@ public class ContinuousFileSplitEnumeratorTest {
context.registerReader(2, "test-host");
context.registerReader(3, "test-host");
- TreeMap<Long, TableScan.Plan> results = new TreeMap<>();
+ TreeMap<Long, RichPlan> results = new TreeMap<>();
MockScan scan = new MockScan(results);
scan.allowEnd(false);
ContinuousFileSplitEnumerator enumerator =
@@ -574,7 +575,7 @@ public class ContinuousFileSplitEnumeratorTest {
for (int i = 0; i < 2; i++) {
splits.add(createDataSplit(snapshot, i, Collections.emptyList()));
}
- results.put(1L, new DataFilePlan(splits));
+ results.put(1L, new MockPlan(splits));
// will not trigger scan here
enumerator.handleSplitRequest(0, "test-host");
@@ -602,7 +603,7 @@ public class ContinuousFileSplitEnumeratorTest {
for (int i = 2; i < 4; i++) {
splits.add(createDataSplit(snapshot, i, Collections.emptyList()));
}
- results.put(2L, new DataFilePlan(splits));
+ results.put(2L, new MockPlan(splits));
// because blockScanByRequest = false, so this request will trigger
scan
enumerator.handleSplitRequest(2, "test-host");
context.getExecutorService().triggerAllNonPeriodicTasks();
@@ -620,7 +621,7 @@ public class ContinuousFileSplitEnumeratorTest {
context.getExecutorService().triggerAllNonPeriodicTasks();
splits.clear();
splits.add(createDataSplit(snapshot, 7, Collections.emptyList()));
- results.put(3L, new DataFilePlan(splits));
+ results.put(3L, new MockPlan(splits));
// this won't trigger scan, cause blockScanByRequest = true
enumerator.handleSplitRequest(3, "test-host");
@@ -647,13 +648,13 @@ public class ContinuousFileSplitEnumeratorTest {
context.registerReader(0, "test-host");
// prepare test data
- TreeMap<Long, TableScan.Plan> results = new TreeMap<>();
+ TreeMap<Long, RichPlan> results = new TreeMap<>();
Map<Long, List<DataSplit>> expectedResults = new HashMap<>(4);
StreamTableScan scan = new MockScan(results);
for (int i = 1; i <= 4; i++) {
List<DataSplit> dataSplits =
Collections.singletonList(createDataSplit(i, 0,
Collections.emptyList()));
- results.put((long) i, new DataFilePlan(dataSplits));
+ results.put((long) i, new MockPlan(dataSplits));
expectedResults.put((long) i, dataSplits);
}
@@ -787,20 +788,51 @@ public class ContinuousFileSplitEnumeratorTest {
}
}
+ private static class MockPlan implements RichPlan {
+
+ private final List<DataSplit> splits;
+
+ public MockPlan(List<DataSplit> splits) {
+ this.splits = splits;
+ }
+
+ @Nullable
+ @Override
+ public Long watermark() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Nullable
+ @Override
+ public Long snapshotId() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ScanMode scanMode() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public List<Split> splits() {
+ return (List) splits;
+ }
+ }
+
private static class MockScan implements StreamTableScan {
- private final TreeMap<Long, Plan> results;
+ private final TreeMap<Long, RichPlan> results;
private @Nullable Long nextSnapshotId;
private boolean allowEnd = true;
- public MockScan(TreeMap<Long, Plan> results) {
+ public MockScan(TreeMap<Long, RichPlan> results) {
this.results = results;
this.nextSnapshotId = null;
}
@Override
- public Plan plan() {
- Map.Entry<Long, Plan> planEntry = results.pollFirstEntry();
+ public RichPlan plan() {
+ Map.Entry<Long, RichPlan> planEntry = results.pollFirstEntry();
if (planEntry == null) {
if (allowEnd) {
throw new EndOfScanException();
@@ -825,14 +857,11 @@ public class ContinuousFileSplitEnumeratorTest {
@Override
public void notifyCheckpointComplete(@Nullable Long nextSnapshot) {}
- @Nullable
@Override
- public Long watermark() {
- return null;
- }
+ public void restore(Long state) {}
@Override
- public void restore(Long state) {}
+ public void restore(@Nullable Long nextSnapshotId, ScanMode scanMode)
{}
public void allowEnd(boolean allowEnd) {
this.allowEnd = allowEnd;
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
index 2fb1f090a..880f3288e 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
@@ -25,6 +25,7 @@ import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.stats.StatsTestUtils;
import org.apache.paimon.table.source.DataFilePlan;
import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.table.source.SplitGenerator;
import org.apache.paimon.table.source.snapshot.SnapshotReaderImpl;
@@ -59,6 +60,11 @@ public class FileStoreSourceSplitGeneratorTest {
return 1L;
}
+ @Override
+ public ScanMode scanMode() {
+ return ScanMode.ALL;
+ }
+
@Override
public List<ManifestEntry> files() {
return Arrays.asList(