This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new feabeaabe [core][bug] BoundedChecker may check non-existing snapshot
at fully read phase (#865)
feabeaabe is described below
commit feabeaabe43b33c865347b1e9319f32359da1a58
Author: yuzelin <[email protected]>
AuthorDate: Thu Apr 13 09:51:01 2023 +0800
[core][bug] BoundedChecker may check non-existing snapshot at fully read
phase (#865)
---
.../apache/paimon/table/source/DataFilePlan.java | 9 ++---
.../table/source/InnerStreamTableScanImpl.java | 21 ++++++-----
.../source/snapshot/CompactedStartingScanner.java | 5 ++-
.../ContinuousCompactorStartingScanner.java | 9 ++---
.../ContinuousFromSnapshotStartingScanner.java | 12 +++----
.../ContinuousFromTimestampStartingScanner.java | 7 ++--
.../snapshot/ContinuousLatestStartingScanner.java | 7 ++--
.../table/source/snapshot/FullStartingScanner.java | 7 ++--
.../table/source/snapshot/StartingScanner.java | 42 ++++++++++++++--------
.../StaticFromSnapshotStartingScanner.java | 7 ++--
.../StaticFromTimestampStartingScanner.java | 5 ++-
.../paimon/table/source/StreamTableScanTest.java | 19 ++++++++++
.../snapshot/CompactedStartingScannerTest.java | 12 ++++---
.../ContinuousCompactorStartingScannerTest.java | 9 ++---
...ContinuousFromTimestampStartingScannerTest.java | 16 +++++----
.../ContinuousLatestStartingScannerTest.java | 9 ++---
.../snapshot/FullCompactedStartingScannerTest.java | 12 ++++---
.../source/snapshot/FullStartingScannerTest.java | 8 +++--
18 files changed, 124 insertions(+), 92 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataFilePlan.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataFilePlan.java
index 380325b9d..6c113f2df 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataFilePlan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataFilePlan.java
@@ -20,8 +20,6 @@ package org.apache.paimon.table.source;
import org.apache.paimon.table.source.snapshot.StartingScanner;
-import javax.annotation.Nullable;
-
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -40,7 +38,10 @@ public class DataFilePlan implements TableScan.Plan {
return new ArrayList<>(splits);
}
- public static DataFilePlan fromResult(@Nullable StartingScanner.Result
result) {
- return new DataFilePlan(result == null ? Collections.emptyList() :
result.splits());
+ public static DataFilePlan fromResult(StartingScanner.Result result) {
+ return new DataFilePlan(
+ result instanceof StartingScanner.ScannedResult
+ ? ((StartingScanner.ScannedResult) result).splits()
+ : Collections.emptyList());
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
index 46c01b219..5ec559cde 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
@@ -51,7 +51,7 @@ public class InnerStreamTableScanImpl extends
AbstractInnerTableScan
private StartingScanner startingScanner;
private FollowUpScanner followUpScanner;
private BoundedChecker boundedChecker;
- private boolean isEnd = false;
+ private boolean isFullPhaseEnd = false;
@Nullable private Long nextSnapshotId;
public InnerStreamTableScanImpl(
@@ -92,19 +92,24 @@ public class InnerStreamTableScanImpl extends
AbstractInnerTableScan
private Plan tryFirstPlan() {
StartingScanner.Result result = startingScanner.scan(snapshotManager,
snapshotSplitReader);
- if (result != null) {
- long snapshotId = result.snapshotId();
- nextSnapshotId = snapshotId + 1;
- if
(boundedChecker.shouldEndInput(snapshotManager.snapshot(snapshotId))) {
- isEnd = true;
- }
+ if (result instanceof StartingScanner.ScannedResult) {
+ long currentSnapshotId = ((StartingScanner.ScannedResult)
result).currentSnapshotId();
+ nextSnapshotId = currentSnapshotId + 1;
+ isFullPhaseEnd =
+
boundedChecker.shouldEndInput(snapshotManager.snapshot(currentSnapshotId));
+ } else if (result instanceof StartingScanner.NextSnapshot) {
+ nextSnapshotId = ((StartingScanner.NextSnapshot)
result).nextSnapshotId();
+ isFullPhaseEnd =
+ snapshotManager.snapshotExists(nextSnapshotId - 1)
+ && boundedChecker.shouldEndInput(
+ snapshotManager.snapshot(nextSnapshotId -
1));
}
return DataFilePlan.fromResult(result);
}
private Plan nextPlan() {
while (true) {
- if (isEnd) {
+ if (isFullPhaseEnd) {
throw new EndOfScanException();
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/CompactedStartingScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/CompactedStartingScanner.java
index adb9d2fee..9515d8ee0 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/CompactedStartingScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/CompactedStartingScanner.java
@@ -33,14 +33,13 @@ public class CompactedStartingScanner implements
StartingScanner {
private static final Logger LOG =
LoggerFactory.getLogger(CompactedStartingScanner.class);
@Override
- @Nullable
public Result scan(SnapshotManager snapshotManager, SnapshotSplitReader
snapshotSplitReader) {
Long startingSnapshotId = pick(snapshotManager);
if (startingSnapshotId == null) {
startingSnapshotId = snapshotManager.latestSnapshotId();
if (startingSnapshotId == null) {
LOG.debug("There is currently no snapshot. Wait for the
snapshot generation.");
- return null;
+ return new NoSnapshot();
} else {
LOG.debug(
"No compact snapshot found, reading from the latest
snapshot {}.",
@@ -48,7 +47,7 @@ public class CompactedStartingScanner implements
StartingScanner {
}
}
- return new Result(
+ return new ScannedResult(
startingSnapshotId,
snapshotSplitReader
.withKind(ScanKind.ALL)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorStartingScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorStartingScanner.java
index e92291e5b..907f9a896 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorStartingScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorStartingScanner.java
@@ -24,8 +24,6 @@ import org.apache.paimon.utils.SnapshotManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.annotation.Nullable;
-
/** {@link StartingScanner} used internally for stand-alone streaming compact
job sources. */
public class ContinuousCompactorStartingScanner implements StartingScanner {
@@ -33,26 +31,25 @@ public class ContinuousCompactorStartingScanner implements
StartingScanner {
LoggerFactory.getLogger(ContinuousCompactorStartingScanner.class);
@Override
- @Nullable
public Result scan(SnapshotManager snapshotManager, SnapshotSplitReader
snapshotSplitReader) {
Long latestSnapshotId = snapshotManager.latestSnapshotId();
Long earliestSnapshotId = snapshotManager.earliestSnapshotId();
if (latestSnapshotId == null || earliestSnapshotId == null) {
LOG.debug("There is currently no snapshot. Wait for the snapshot
generation.");
- return null;
+ return new NoSnapshot();
}
for (long id = latestSnapshotId; id >= earliestSnapshotId; id--) {
Snapshot snapshot = snapshotManager.snapshot(id);
if (snapshot.commitKind() == Snapshot.CommitKind.COMPACT) {
LOG.debug("Found latest compact snapshot {}, reading from the
next snapshot.", id);
- return new Result(id);
+ return new NextSnapshot(id + 1);
}
}
LOG.debug(
"No compact snapshot found, reading from the earliest snapshot
{}.",
earliestSnapshotId);
- return new Result(earliestSnapshotId - 1);
+ return new NextSnapshot(earliestSnapshotId);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromSnapshotStartingScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromSnapshotStartingScanner.java
index 355bd6d6f..aca3f55fc 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromSnapshotStartingScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromSnapshotStartingScanner.java
@@ -21,8 +21,6 @@ package org.apache.paimon.table.source.snapshot;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.utils.SnapshotManager;
-import javax.annotation.Nullable;
-
/**
* {@link StartingScanner} for the {@link
CoreOptions.StartupMode#FROM_SNAPSHOT} startup mode of a
* streaming read.
@@ -36,15 +34,13 @@ public class ContinuousFromSnapshotStartingScanner
implements StartingScanner {
}
@Override
- @Nullable
public Result scan(SnapshotManager snapshotManager, SnapshotSplitReader
snapshotSplitReader) {
Long earliestSnapshotId = snapshotManager.earliestSnapshotId();
if (earliestSnapshotId == null) {
- return null;
+ return new NoSnapshot();
}
- // We should use `snapshotId - 1` here to start to scan delta data
from specific snapshot
- // id. If the snapshotId < earliestSnapshotId, start to scan from the
earliest.
- return new Result(
- snapshotId >= earliestSnapshotId ? snapshotId - 1 :
earliestSnapshotId - 1);
+ // We should return the specified snapshot as next snapshot to
indicate to scan delta data
+ // from it. If the snapshotId < earliestSnapshotId, start from the
earliest.
+ return new NextSnapshot(Math.max(snapshotId, earliestSnapshotId));
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScanner.java
index 4f6f318e1..985752ae5 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScanner.java
@@ -24,8 +24,6 @@ import org.apache.paimon.utils.SnapshotManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.annotation.Nullable;
-
/**
* {@link StartingScanner} for the {@link
CoreOptions.StartupMode#FROM_TIMESTAMP} startup mode of a
* streaming read.
@@ -42,13 +40,12 @@ public class ContinuousFromTimestampStartingScanner
implements StartingScanner {
}
@Override
- @Nullable
public Result scan(SnapshotManager snapshotManager, SnapshotSplitReader
snapshotSplitReader) {
Long startingSnapshotId =
snapshotManager.earlierThanTimeMills(startupMillis);
if (startingSnapshotId == null) {
LOG.debug("There is currently no snapshot. Waiting for snapshot
generation.");
- return null;
+ return new NoSnapshot();
}
- return new Result(startingSnapshotId);
+ return new NextSnapshot(startingSnapshotId + 1);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousLatestStartingScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousLatestStartingScanner.java
index f0c5c6fe1..099434be7 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousLatestStartingScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousLatestStartingScanner.java
@@ -24,8 +24,6 @@ import org.apache.paimon.utils.SnapshotManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.annotation.Nullable;
-
/**
* {@link StartingScanner} for the {@link CoreOptions.StartupMode#LATEST}
startup mode of a
* streaming read.
@@ -36,13 +34,12 @@ public class ContinuousLatestStartingScanner implements
StartingScanner {
LoggerFactory.getLogger(ContinuousLatestStartingScanner.class);
@Override
- @Nullable
public Result scan(SnapshotManager snapshotManager, SnapshotSplitReader
snapshotSplitReader) {
Long startingSnapshotId = snapshotManager.latestSnapshotId();
if (startingSnapshotId == null) {
LOG.debug("There is currently no snapshot. Wait for the snapshot
generation.");
- return null;
+ return new NoSnapshot();
}
- return new Result(startingSnapshotId);
+ return new NextSnapshot(startingSnapshotId + 1);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullStartingScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullStartingScanner.java
index fdc93ab5d..6d11f1eb7 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullStartingScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullStartingScanner.java
@@ -25,22 +25,19 @@ import org.apache.paimon.utils.SnapshotManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.annotation.Nullable;
-
/** {@link StartingScanner} for the {@link
CoreOptions.StartupMode#LATEST_FULL} startup mode. */
public class FullStartingScanner implements StartingScanner {
private static final Logger LOG =
LoggerFactory.getLogger(FullStartingScanner.class);
@Override
- @Nullable
public Result scan(SnapshotManager snapshotManager, SnapshotSplitReader
snapshotSplitReader) {
Long startingSnapshotId = snapshotManager.latestSnapshotId();
if (startingSnapshotId == null) {
LOG.debug("There is currently no snapshot. Waiting for snapshot
generation.");
- return null;
+ return new NoSnapshot();
}
- return new Result(
+ return new ScannedResult(
startingSnapshotId,
snapshotSplitReader
.withKind(ScanKind.ALL)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StartingScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StartingScanner.java
index 3e850de3c..16fc46aa0 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StartingScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StartingScanner.java
@@ -22,38 +22,52 @@ import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.TableScan;
import org.apache.paimon.utils.SnapshotManager;
-import javax.annotation.Nullable;
-
-import java.util.Collections;
import java.util.List;
/** Helper class for the first planning of {@link TableScan}. */
public interface StartingScanner {
- @Nullable
Result scan(SnapshotManager snapshotManager, SnapshotSplitReader
snapshotSplitReader);
/** Scan result of {@link #scan}. */
- class Result {
+ interface Result {}
- private final long snapshotId;
- private final List<DataSplit> splits;
+ /** Currently, there is no snapshot, need to wait for the snapshot to be
generated. */
+ class NoSnapshot implements Result {}
- public Result(long snapshotId) {
- this(snapshotId, Collections.emptyList());
- }
+ /** Result with scanned snapshot. Next snapshot should be the current
snapshot plus 1. */
+ class ScannedResult implements Result {
+ private final long currentSnapshotId;
+ private final List<DataSplit> splits;
- public Result(long snapshotId, List<DataSplit> splits) {
- this.snapshotId = snapshotId;
+ public ScannedResult(long currentSnapshotId, List<DataSplit> splits) {
+ this.currentSnapshotId = currentSnapshotId;
this.splits = splits;
}
- public long snapshotId() {
- return snapshotId;
+ public long currentSnapshotId() {
+ return currentSnapshotId;
}
public List<DataSplit> splits() {
return splits;
}
}
+
+ /**
+ * Return the next snapshot for followup scanning. The current snapshot is
not scanned (even
+ * doesn't exist), so there are no splits.
+ */
+ class NextSnapshot implements Result {
+
+ private final long nextSnapshotId;
+
+ public NextSnapshot(long nextSnapshotId) {
+ this.nextSnapshotId = nextSnapshotId;
+ }
+
+ public long nextSnapshotId() {
+ return nextSnapshotId;
+ }
+ }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromSnapshotStartingScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromSnapshotStartingScanner.java
index 5ccf82563..1b796b923 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromSnapshotStartingScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromSnapshotStartingScanner.java
@@ -22,8 +22,6 @@ import org.apache.paimon.CoreOptions;
import org.apache.paimon.operation.ScanKind;
import org.apache.paimon.utils.SnapshotManager;
-import javax.annotation.Nullable;
-
/**
* {@link StartingScanner} for the {@link
CoreOptions.StartupMode#FROM_SNAPSHOT} or {@link
* CoreOptions.StartupMode#FROM_SNAPSHOT_FULL} startup mode of a batch read.
@@ -36,13 +34,12 @@ public class StaticFromSnapshotStartingScanner implements
StartingScanner {
}
@Override
- @Nullable
public Result scan(SnapshotManager snapshotManager, SnapshotSplitReader
snapshotSplitReader) {
if (snapshotManager.earliestSnapshotId() == null
|| snapshotId < snapshotManager.earliestSnapshotId()) {
- return null;
+ return new NoSnapshot();
}
- return new Result(
+ return new ScannedResult(
snapshotId,
snapshotSplitReader.withKind(ScanKind.ALL).withSnapshot(snapshotId).splits());
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTimestampStartingScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTimestampStartingScanner.java
index f4437e41b..f90f81c6f 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTimestampStartingScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTimestampStartingScanner.java
@@ -44,16 +44,15 @@ public class StaticFromTimestampStartingScanner implements
StartingScanner {
}
@Override
- @Nullable
public Result scan(SnapshotManager snapshotManager, SnapshotSplitReader
snapshotSplitReader) {
Snapshot startingSnapshot = timeTravelToTimestamp(snapshotManager,
startupMillis);
if (startingSnapshot == null) {
LOG.debug(
"There is currently no snapshot earlier than or equal to
timestamp[{}]",
startupMillis);
- return null;
+ return new NoSnapshot();
}
- return new Result(
+ return new ScannedResult(
startingSnapshot.id(),
snapshotSplitReader
.withKind(ScanKind.ALL)
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/source/StreamTableScanTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/source/StreamTableScanTest.java
index a249b2b2e..a6f4d36b5 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/source/StreamTableScanTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/source/StreamTableScanTest.java
@@ -22,8 +22,10 @@ import org.apache.paimon.CoreOptions;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.table.sink.StreamTableWrite;
+import org.apache.paimon.table.sink.StreamWriteBuilder;
import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.table.source.snapshot.ScannerTestBase;
import org.apache.paimon.types.RowKind;
@@ -235,4 +237,21 @@ public class StreamTableScanTest extends ScannerTestBase {
write.close();
commit.close();
}
+
+ @Test
+ public void testStartingFromNonExistingSnapshot() throws Exception {
+ Table table =
+ this.table.copy(
+
Collections.singletonMap(CoreOptions.SCAN_TIMESTAMP_MILLIS.key(), "0"));
+
+ StreamWriteBuilder writeBuilder =
table.newStreamWriteBuilder().withCommitUser(commitUser);
+ StreamTableWrite write = writeBuilder.newWrite();
+ StreamTableCommit commit = writeBuilder.newCommit();
+ write.write(rowData(1, 10, 100L));
+ commit.commit(0, write.prepareCommit(true, 0));
+
+ StreamTableScan scan = table.newReadBuilder().newStreamScan();
+ TableScan.Plan plan = scan.plan();
+ assertThat(plan.splits()).isEmpty();
+ }
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CompactedStartingScannerTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CompactedStartingScannerTest.java
index 01e40cfea..e16e54678 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CompactedStartingScannerTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CompactedStartingScannerTest.java
@@ -56,8 +56,9 @@ public class CompactedStartingScannerTest extends
ScannerTestBase {
assertThat(snapshotManager.latestSnapshotId()).isEqualTo(4);
CompactedStartingScanner scanner = new CompactedStartingScanner();
- StartingScanner.Result result = scanner.scan(snapshotManager,
snapshotSplitReader);
- assertThat(result.snapshotId()).isEqualTo(3);
+ StartingScanner.ScannedResult result =
+ (StartingScanner.ScannedResult) scanner.scan(snapshotManager,
snapshotSplitReader);
+ assertThat(result.currentSnapshotId()).isEqualTo(3);
assertThat(getResult(table.newRead(), toSplits(result.splits())))
.hasSameElementsAs(Arrays.asList("+I 1|10|101", "+I 1|20|200",
"+I 1|30|300"));
@@ -69,7 +70,8 @@ public class CompactedStartingScannerTest extends
ScannerTestBase {
public void testNoSnapshot() {
SnapshotManager snapshotManager = table.snapshotManager();
CompactedStartingScanner scanner = new CompactedStartingScanner();
- assertThat(scanner.scan(snapshotManager,
snapshotSplitReader)).isNull();
+ assertThat(scanner.scan(snapshotManager, snapshotSplitReader))
+ .isInstanceOf(StartingScanner.NoSnapshot.class);
}
@Test
@@ -88,7 +90,9 @@ public class CompactedStartingScannerTest extends
ScannerTestBase {
CompactedStartingScanner scanner = new CompactedStartingScanner();
// No compact snapshot found, reading from the latest snapshot
- assertThat(scanner.scan(snapshotManager,
snapshotSplitReader).snapshotId()).isEqualTo(1);
+ StartingScanner.ScannedResult result =
+ (StartingScanner.ScannedResult) scanner.scan(snapshotManager,
snapshotSplitReader);
+ assertThat(result.currentSnapshotId()).isEqualTo(1);
write.close();
commit.close();
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorStartingScannerTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorStartingScannerTest.java
index c62fa0514..1153c18fd 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorStartingScannerTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorStartingScannerTest.java
@@ -59,9 +59,9 @@ public class ContinuousCompactorStartingScannerTest extends
ScannerTestBase {
assertThat(snapshotManager.latestSnapshotId()).isEqualTo(5);
ContinuousCompactorStartingScanner scanner = new
ContinuousCompactorStartingScanner();
- StartingScanner.Result result = scanner.scan(snapshotManager,
snapshotSplitReader);
- assertThat(result.snapshotId()).isEqualTo(3);
- assertThat(result.splits()).isEmpty();
+ StartingScanner.NextSnapshot result =
+ (StartingScanner.NextSnapshot) scanner.scan(snapshotManager,
snapshotSplitReader);
+ assertThat(result.nextSnapshotId()).isEqualTo(4);
write.close();
commit.close();
@@ -71,6 +71,7 @@ public class ContinuousCompactorStartingScannerTest extends
ScannerTestBase {
public void testNoSnapshot() {
SnapshotManager snapshotManager = table.snapshotManager();
ContinuousCompactorStartingScanner scanner = new
ContinuousCompactorStartingScanner();
- assertThat(scanner.scan(snapshotManager,
snapshotSplitReader)).isNull();
+ assertThat(scanner.scan(snapshotManager, snapshotSplitReader))
+ .isInstanceOf(StartingScanner.NoSnapshot.class);
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScannerTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScannerTest.java
index 69dcb55c4..2c6fe2610 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScannerTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScannerTest.java
@@ -59,9 +59,9 @@ public class ContinuousFromTimestampStartingScannerTest
extends ScannerTestBase
ContinuousFromTimestampStartingScanner scanner =
new ContinuousFromTimestampStartingScanner(timestamp);
- StartingScanner.Result result = scanner.scan(snapshotManager,
snapshotSplitReader);
- assertThat(result.snapshotId()).isEqualTo(2);
- assertThat(getResult(table.newRead(),
toSplits(result.splits()))).isEmpty();
+ StartingScanner.NextSnapshot result =
+ (StartingScanner.NextSnapshot) scanner.scan(snapshotManager,
snapshotSplitReader);
+ assertThat(result.nextSnapshotId()).isEqualTo(3);
write.close();
commit.close();
@@ -72,7 +72,8 @@ public class ContinuousFromTimestampStartingScannerTest
extends ScannerTestBase
SnapshotManager snapshotManager = table.snapshotManager();
ContinuousFromTimestampStartingScanner scanner =
new
ContinuousFromTimestampStartingScanner(System.currentTimeMillis());
- assertThat(scanner.scan(snapshotManager,
snapshotSplitReader)).isNull();
+ assertThat(scanner.scan(snapshotManager, snapshotSplitReader))
+ .isInstanceOf(StartingScanner.NoSnapshot.class);
}
@Test
@@ -92,9 +93,10 @@ public class ContinuousFromTimestampStartingScannerTest
extends ScannerTestBase
ContinuousFromTimestampStartingScanner scanner =
new ContinuousFromTimestampStartingScanner(timestamp);
- StartingScanner.Result result = scanner.scan(snapshotManager,
snapshotSplitReader);
- assertThat(result.snapshotId()).isEqualTo(0);
- assertThat(getResult(table.newRead(),
toSplits(result.splits()))).isEmpty();
+ StartingScanner.NextSnapshot result =
+ (StartingScanner.NextSnapshot) scanner.scan(snapshotManager,
snapshotSplitReader);
+ // next snapshot
+ assertThat(result.nextSnapshotId()).isEqualTo(1);
write.close();
commit.close();
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousLatestStartingScannerTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousLatestStartingScannerTest.java
index 33ea201b7..cc6d43e69 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousLatestStartingScannerTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousLatestStartingScannerTest.java
@@ -49,9 +49,9 @@ public class ContinuousLatestStartingScannerTest extends
ScannerTestBase {
assertThat(snapshotManager.latestSnapshotId()).isEqualTo(2);
ContinuousLatestStartingScanner scanner = new
ContinuousLatestStartingScanner();
- StartingScanner.Result result = scanner.scan(snapshotManager,
snapshotSplitReader);
- assertThat(result.snapshotId()).isEqualTo(2);
- assertThat(getResult(table.newRead(),
toSplits(result.splits()))).isEmpty();
+ StartingScanner.NextSnapshot result =
+ (StartingScanner.NextSnapshot) scanner.scan(snapshotManager,
snapshotSplitReader);
+ assertThat(result.nextSnapshotId()).isEqualTo(3);
write.close();
commit.close();
@@ -61,6 +61,7 @@ public class ContinuousLatestStartingScannerTest extends
ScannerTestBase {
public void testNoSnapshot() {
SnapshotManager snapshotManager = table.snapshotManager();
ContinuousLatestStartingScanner scanner = new
ContinuousLatestStartingScanner();
- assertThat(scanner.scan(snapshotManager,
snapshotSplitReader)).isNull();
+ assertThat(scanner.scan(snapshotManager, snapshotSplitReader))
+ .isInstanceOf(StartingScanner.NoSnapshot.class);
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/FullCompactedStartingScannerTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/FullCompactedStartingScannerTest.java
index ba180684e..b5fba4830 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/FullCompactedStartingScannerTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/FullCompactedStartingScannerTest.java
@@ -46,8 +46,9 @@ public class FullCompactedStartingScannerTest extends
ScannerTestBase {
assertThat(snapshotManager.latestSnapshotId()).isEqualTo(10);
FullCompactedStartingScanner scanner = new
FullCompactedStartingScanner(3);
- StartingScanner.Result result = scanner.scan(snapshotManager,
snapshotSplitReader);
- assertThat(result.snapshotId()).isEqualTo(8);
+ StartingScanner.ScannedResult result =
+ (StartingScanner.ScannedResult) scanner.scan(snapshotManager,
snapshotSplitReader);
+ assertThat(result.currentSnapshotId()).isEqualTo(8);
write.close();
commit.close();
@@ -57,7 +58,8 @@ public class FullCompactedStartingScannerTest extends
ScannerTestBase {
public void testNoSnapshot() {
SnapshotManager snapshotManager = table.snapshotManager();
FullCompactedStartingScanner scanner = new
FullCompactedStartingScanner(3);
- assertThat(scanner.scan(snapshotManager,
snapshotSplitReader)).isNull();
+ assertThat(scanner.scan(snapshotManager, snapshotSplitReader))
+ .isInstanceOf(StartingScanner.NoSnapshot.class);
}
@Test
@@ -86,7 +88,9 @@ public class FullCompactedStartingScannerTest extends
ScannerTestBase {
FullCompactedStartingScanner scanner = new
FullCompactedStartingScanner(3);
// No compact snapshot found, reading from the latest snapshot
- assertThat(scanner.scan(snapshotManager,
snapshotSplitReader).snapshotId()).isEqualTo(4);
+ StartingScanner.ScannedResult result =
+ (StartingScanner.ScannedResult) scanner.scan(snapshotManager,
snapshotSplitReader);
+ assertThat(result.currentSnapshotId()).isEqualTo(4);
write.close();
commit.close();
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/FullStartingScannerTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/FullStartingScannerTest.java
index 7ce631d20..9737c35b5 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/FullStartingScannerTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/FullStartingScannerTest.java
@@ -51,8 +51,9 @@ public class FullStartingScannerTest extends ScannerTestBase {
assertThat(snapshotManager.latestSnapshotId()).isEqualTo(2);
FullStartingScanner scanner = new FullStartingScanner();
- StartingScanner.Result result = scanner.scan(snapshotManager,
snapshotSplitReader);
- assertThat(result.snapshotId()).isEqualTo(2);
+ StartingScanner.ScannedResult result =
+ (StartingScanner.ScannedResult) scanner.scan(snapshotManager,
snapshotSplitReader);
+ assertThat(result.currentSnapshotId()).isEqualTo(2);
assertThat(getResult(table.newRead(), toSplits(result.splits())))
.hasSameElementsAs(Arrays.asList("+I 1|10|101", "+I 1|20|200",
"+I 1|30|300"));
@@ -64,6 +65,7 @@ public class FullStartingScannerTest extends ScannerTestBase {
public void testNoSnapshot() {
SnapshotManager snapshotManager = table.snapshotManager();
FullStartingScanner scanner = new FullStartingScanner();
- assertThat(scanner.scan(snapshotManager,
snapshotSplitReader)).isNull();
+ assertThat(scanner.scan(snapshotManager, snapshotSplitReader))
+ .isInstanceOf(StartingScanner.NoSnapshot.class);
}
}