This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new f2c6769b3 [core] Supports StartingContext and mini-refactor
StartingScanner (#1955)
f2c6769b3 is described below
commit f2c6769b31802123a9e3e7e7f87f9eba82117c7e
Author: Yann Byron <[email protected]>
AuthorDate: Thu Sep 7 13:33:43 2023 +0800
[core] Supports StartingContext and mini-refactor StartingScanner (#1955)
---
.../table/source/AbstractInnerTableScan.java | 44 ++++++++++++++--------
.../table/source/InnerStreamTableScanImpl.java | 32 ++++++++++++----
.../paimon/table/source/InnerTableScanImpl.java | 2 +-
.../paimon/table/source/StreamTableScan.java | 3 ++
...ngScanner.java => AbstractStartingScanner.java} | 28 ++++++++------
.../source/snapshot/CompactedStartingScanner.java | 18 +++++++--
.../ContinuousCompactorStartingScanner.java | 9 ++++-
.../ContinuousFromSnapshotFullStartingScanner.java | 18 ++++++---
.../ContinuousFromSnapshotStartingScanner.java | 13 +++----
.../ContinuousFromTimestampStartingScanner.java | 18 +++++++--
.../snapshot/ContinuousLatestStartingScanner.java | 18 ++++++++-
.../snapshot/FullCompactedStartingScanner.java | 39 +++++++++++++++++--
.../table/source/snapshot/FullStartingScanner.java | 14 ++++++-
.../snapshot/IncrementalStartingScanner.java | 27 ++++++-------
.../snapshot/IncrementalTagStartingScanner.java | 17 +++++++--
.../IncrementalTimeStampStartingScanner.java | 25 +++++++-----
...otStartingScanner.java => StartingContext.java} | 38 ++++++++++---------
.../table/source/snapshot/StartingScanner.java | 5 ++-
.../StaticFromSnapshotStartingScanner.java | 19 ++++++----
.../snapshot/StaticFromTagStartingScanner.java | 25 ++++++++----
.../StaticFromTimestampStartingScanner.java | 16 +++++---
.../apache/paimon/table/system/AuditLogTable.java | 6 +++
.../snapshot/CompactedStartingScannerTest.java | 13 +++----
.../ContinuousCompactorStartingScannerTest.java | 11 +++---
...ContinuousFromTimestampStartingScannerTest.java | 14 +++----
.../ContinuousLatestStartingScannerTest.java | 11 +++---
.../snapshot/FullCompactedStartingScannerTest.java | 13 +++----
.../source/snapshot/FullStartingScannerTest.java | 9 ++---
.../snapshot/StaticFromTagStartingScannerTest.java | 8 ++--
.../source/ContinuousFileSplitEnumeratorTest.java | 6 +++
30 files changed, 343 insertions(+), 176 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
index 034a333b9..2ec3125bd 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
@@ -43,6 +43,7 @@ import
org.apache.paimon.table.source.snapshot.StaticFromTagStartingScanner;
import
org.apache.paimon.table.source.snapshot.StaticFromTimestampStartingScanner;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.SnapshotManager;
import java.util.List;
import java.util.Optional;
@@ -77,6 +78,7 @@ public abstract class AbstractInnerTableScan implements
InnerTableScan {
}
protected StartingScanner createStartingScanner(boolean isStreaming) {
+ SnapshotManager snapshotManager = snapshotReader.snapshotManager();
CoreOptions.StreamingCompactionType type =
options.toConfiguration().get(CoreOptions.STREAMING_COMPACT);
switch (type) {
@@ -85,11 +87,11 @@ public abstract class AbstractInnerTableScan implements
InnerTableScan {
checkArgument(
isStreaming,
"Set 'streaming-compact' in batch mode. This is
unexpected.");
- return new ContinuousCompactorStartingScanner();
+ return new
ContinuousCompactorStartingScanner(snapshotManager);
}
case BUCKET_UNAWARE:
{
- return new FullStartingScanner();
+ return new FullStartingScanner(snapshotManager);
}
}
@@ -99,18 +101,19 @@ public abstract class AbstractInnerTableScan implements
InnerTableScan {
ConsumerManager consumerManager = snapshotReader.consumerManager();
Optional<Consumer> consumer = consumerManager.consumer(consumerId);
if (consumer.isPresent()) {
- return new
ContinuousFromSnapshotStartingScanner(consumer.get().nextSnapshot());
+ return new ContinuousFromSnapshotStartingScanner(
+ snapshotManager, consumer.get().nextSnapshot());
}
}
CoreOptions.StartupMode startupMode = options.startupMode();
switch (startupMode) {
case LATEST_FULL:
- return new FullStartingScanner();
+ return new FullStartingScanner(snapshotManager);
case LATEST:
return isStreaming
- ? new ContinuousLatestStartingScanner()
- : new FullStartingScanner();
+ ? new ContinuousLatestStartingScanner(snapshotManager)
+ : new FullStartingScanner(snapshotManager);
case COMPACTED_FULL:
if (options.changelogProducer() ==
ChangelogProducer.FULL_COMPACTION
||
options.toConfiguration().contains(FULL_COMPACTION_DELTA_COMMITS)) {
@@ -118,42 +121,51 @@ public abstract class AbstractInnerTableScan implements
InnerTableScan {
options.toConfiguration()
.getOptional(FULL_COMPACTION_DELTA_COMMITS)
.orElse(1);
- return new FullCompactedStartingScanner(deltaCommits);
+ return new FullCompactedStartingScanner(snapshotManager,
deltaCommits);
} else {
- return new CompactedStartingScanner();
+ return new CompactedStartingScanner(snapshotManager);
}
case FROM_TIMESTAMP:
Long startupMillis = options.scanTimestampMills();
return isStreaming
- ? new
ContinuousFromTimestampStartingScanner(startupMillis)
- : new
StaticFromTimestampStartingScanner(startupMillis);
+ ? new
ContinuousFromTimestampStartingScanner(snapshotManager, startupMillis)
+ : new
StaticFromTimestampStartingScanner(snapshotManager, startupMillis);
case FROM_SNAPSHOT:
if (options.scanSnapshotId() != null) {
return isStreaming
- ? new
ContinuousFromSnapshotStartingScanner(options.scanSnapshotId())
- : new
StaticFromSnapshotStartingScanner(options.scanSnapshotId());
+ ? new ContinuousFromSnapshotStartingScanner(
+ snapshotManager, options.scanSnapshotId())
+ : new StaticFromSnapshotStartingScanner(
+ snapshotManager, options.scanSnapshotId());
} else {
checkArgument(!isStreaming, "Cannot scan from tag in
streaming mode.");
- return new
StaticFromTagStartingScanner(options().scanTagName());
+ return new StaticFromTagStartingScanner(
+ snapshotManager, options().scanTagName());
}
case FROM_SNAPSHOT_FULL:
return isStreaming
- ? new
ContinuousFromSnapshotFullStartingScanner(options.scanSnapshotId())
- : new
StaticFromSnapshotStartingScanner(options.scanSnapshotId());
+ ? new ContinuousFromSnapshotFullStartingScanner(
+ snapshotManager, options.scanSnapshotId())
+ : new StaticFromSnapshotStartingScanner(
+ snapshotManager, options.scanSnapshotId());
case INCREMENTAL:
checkArgument(!isStreaming, "Cannot read incremental in
streaming mode.");
Pair<String, String> incrementalBetween =
options.incrementalBetween();
if (options.toMap().get(CoreOptions.INCREMENTAL_BETWEEN.key())
!= null) {
try {
return new IncrementalStartingScanner(
+ snapshotManager,
Long.parseLong(incrementalBetween.getLeft()),
Long.parseLong(incrementalBetween.getRight()));
} catch (NumberFormatException e) {
return new IncrementalTagStartingScanner(
- incrementalBetween.getLeft(),
incrementalBetween.getRight());
+ snapshotManager,
+ incrementalBetween.getLeft(),
+ incrementalBetween.getRight());
}
} else {
return new IncrementalTimeStampStartingScanner(
+ snapshotManager,
Long.parseLong(incrementalBetween.getLeft()),
Long.parseLong(incrementalBetween.getRight()));
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
index eb8566b65..4ff0b3be0 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
@@ -31,6 +31,7 @@ import
org.apache.paimon.table.source.snapshot.DeltaFollowUpScanner;
import org.apache.paimon.table.source.snapshot.FollowUpScanner;
import org.apache.paimon.table.source.snapshot.InputChangelogFollowUpScanner;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
+import org.apache.paimon.table.source.snapshot.StartingContext;
import org.apache.paimon.table.source.snapshot.StartingScanner;
import org.apache.paimon.table.source.snapshot.StartingScanner.ScannedResult;
import org.apache.paimon.utils.SnapshotManager;
@@ -51,9 +52,11 @@ public class InnerStreamTableScanImpl extends
AbstractInnerTableScan
private final boolean supportStreamingReadOverwrite;
private final DefaultValueAssigner defaultValueAssigner;
+ private boolean inited = false;
private StartingScanner startingScanner;
private FollowUpScanner followUpScanner;
private BoundedChecker boundedChecker;
+
private boolean isFullPhaseEnd = false;
@Nullable private Long nextSnapshotId;
@@ -76,8 +79,28 @@ public class InnerStreamTableScanImpl extends
AbstractInnerTableScan
return this;
}
+ @Override
+ public StartingContext startingContext() {
+ if (!inited) {
+ initScanner();
+ }
+ return startingScanner.startingContext();
+ }
+
@Override
public RichPlan plan() {
+ if (!inited) {
+ initScanner();
+ }
+
+ if (nextSnapshotId == null) {
+ return tryFirstPlan();
+ } else {
+ return nextPlan();
+ }
+ }
+
+ private void initScanner() {
if (startingScanner == null) {
startingScanner = createStartingScanner(true);
}
@@ -87,16 +110,11 @@ public class InnerStreamTableScanImpl extends
AbstractInnerTableScan
if (boundedChecker == null) {
boundedChecker = createBoundedChecker();
}
-
- if (nextSnapshotId == null) {
- return tryFirstPlan();
- } else {
- return nextPlan();
- }
+ inited = true;
}
private RichPlan tryFirstPlan() {
- StartingScanner.Result result = startingScanner.scan(snapshotManager,
snapshotReader);
+ StartingScanner.Result result = startingScanner.scan(snapshotReader);
if (result instanceof ScannedResult) {
ScannedResult scannedResult = (ScannedResult) result;
long currentSnapshotId = scannedResult.currentSnapshotId();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java
index dc01bd429..cb4e39b23 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java
@@ -59,7 +59,7 @@ public class InnerTableScanImpl extends
AbstractInnerTableScan {
if (hasNext) {
hasNext = false;
- StartingScanner.Result result =
startingScanner.scan(snapshotManager, snapshotReader);
+ StartingScanner.Result result =
startingScanner.scan(snapshotReader);
return DataFilePlan.fromResult(result);
} else {
throw new EndOfScanException();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/StreamTableScan.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/StreamTableScan.java
index 869937137..5c8932059 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/StreamTableScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/StreamTableScan.java
@@ -19,6 +19,7 @@
package org.apache.paimon.table.source;
import org.apache.paimon.annotation.Public;
+import org.apache.paimon.table.source.snapshot.StartingContext;
import org.apache.paimon.utils.Restorable;
import javax.annotation.Nullable;
@@ -33,6 +34,8 @@ import javax.annotation.Nullable;
@Public
public interface StreamTableScan extends TableScan, Restorable<Long> {
+ StartingContext startingContext();
+
@Override
RichPlan plan();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullStartingScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/AbstractStartingScanner.java
similarity index 57%
copy from
paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullStartingScanner.java
copy to
paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/AbstractStartingScanner.java
index a03a707be..d711781d7 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullStartingScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/AbstractStartingScanner.java
@@ -18,26 +18,30 @@
package org.apache.paimon.table.source.snapshot;
-import org.apache.paimon.CoreOptions;
import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.utils.SnapshotManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+/** The abstract class for StartingScanner. */
+public abstract class AbstractStartingScanner implements StartingScanner {
-/** {@link StartingScanner} for the {@link
CoreOptions.StartupMode#LATEST_FULL} startup mode. */
-public class FullStartingScanner implements StartingScanner {
+ protected final SnapshotManager snapshotManager;
- private static final Logger LOG =
LoggerFactory.getLogger(FullStartingScanner.class);
+ protected Long startingSnapshotId = null;
+
+ AbstractStartingScanner(SnapshotManager snapshotManager) {
+ this.snapshotManager = snapshotManager;
+ }
+
+ protected ScanMode startingScanMode() {
+ return ScanMode.DELTA;
+ }
@Override
- public Result scan(SnapshotManager snapshotManager, SnapshotReader
snapshotReader) {
- Long startingSnapshotId = snapshotManager.latestSnapshotId();
+ public StartingContext startingContext() {
if (startingSnapshotId == null) {
- LOG.debug("There is currently no snapshot. Waiting for snapshot
generation.");
- return new NoSnapshot();
+ return StartingContext.EMPTY;
+ } else {
+ return new StartingContext(startingSnapshotId, startingScanMode()
== ScanMode.ALL);
}
- return StartingScanner.fromPlan(
-
snapshotReader.withMode(ScanMode.ALL).withSnapshot(startingSnapshotId).read());
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/CompactedStartingScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/CompactedStartingScanner.java
index a1d9d90a0..a15954627 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/CompactedStartingScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/CompactedStartingScanner.java
@@ -29,13 +29,23 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
/** {@link StartingScanner} for the {@link
CoreOptions.StartupMode#COMPACTED_FULL} startup mode. */
-public class CompactedStartingScanner implements StartingScanner {
+public class CompactedStartingScanner extends AbstractStartingScanner {
private static final Logger LOG =
LoggerFactory.getLogger(CompactedStartingScanner.class);
+ public CompactedStartingScanner(SnapshotManager snapshotManager) {
+ super(snapshotManager);
+ this.startingSnapshotId = pick();
+ }
+
+ @Override
+ public ScanMode startingScanMode() {
+ return ScanMode.ALL;
+ }
+
@Override
- public Result scan(SnapshotManager snapshotManager, SnapshotReader
snapshotReader) {
- Long startingSnapshotId = pick(snapshotManager);
+ public Result scan(SnapshotReader snapshotReader) {
+ Long startingSnapshotId = pick();
if (startingSnapshotId == null) {
startingSnapshotId = snapshotManager.latestSnapshotId();
if (startingSnapshotId == null) {
@@ -53,7 +63,7 @@ public class CompactedStartingScanner implements
StartingScanner {
}
@Nullable
- protected Long pick(SnapshotManager snapshotManager) {
+ protected Long pick() {
return snapshotManager.pickOrLatest(s -> s.commitKind() ==
Snapshot.CommitKind.COMPACT);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorStartingScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorStartingScanner.java
index 4674002a8..5f24366ca 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorStartingScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorStartingScanner.java
@@ -25,13 +25,18 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** {@link StartingScanner} used internally for stand-alone streaming compact
job sources. */
-public class ContinuousCompactorStartingScanner implements StartingScanner {
+public class ContinuousCompactorStartingScanner extends
AbstractStartingScanner {
private static final Logger LOG =
LoggerFactory.getLogger(ContinuousCompactorStartingScanner.class);
+ public ContinuousCompactorStartingScanner(SnapshotManager snapshotManager)
{
+ super(snapshotManager);
+ this.startingSnapshotId = snapshotManager.earliestSnapshotId();
+ }
+
@Override
- public Result scan(SnapshotManager snapshotManager, SnapshotReader
snapshotReader) {
+ public Result scan(SnapshotReader snapshotReader) {
Long latestSnapshotId = snapshotManager.latestSnapshotId();
Long earliestSnapshotId = snapshotManager.earliestSnapshotId();
if (latestSnapshotId == null || earliestSnapshotId == null) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromSnapshotFullStartingScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromSnapshotFullStartingScanner.java
index 0a1a2c195..29bd0b7cd 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromSnapshotFullStartingScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromSnapshotFullStartingScanner.java
@@ -26,20 +26,26 @@ import org.apache.paimon.utils.SnapshotManager;
* {@link StartingScanner} for the {@link
CoreOptions.StartupMode#FROM_SNAPSHOT} or {@link
* CoreOptions.StartupMode#FROM_SNAPSHOT_FULL} startup mode of a batch read.
*/
-public class ContinuousFromSnapshotFullStartingScanner implements
StartingScanner {
- private final long snapshotId;
+public class ContinuousFromSnapshotFullStartingScanner extends
AbstractStartingScanner {
- public ContinuousFromSnapshotFullStartingScanner(long snapshotId) {
- this.snapshotId = snapshotId;
+ public ContinuousFromSnapshotFullStartingScanner(
+ SnapshotManager snapshotManager, long snapshotId) {
+ super(snapshotManager);
+ this.startingSnapshotId = snapshotId;
}
@Override
- public Result scan(SnapshotManager snapshotManager, SnapshotReader
snapshotReader) {
+ public ScanMode startingScanMode() {
+ return ScanMode.ALL;
+ }
+
+ @Override
+ public Result scan(SnapshotReader snapshotReader) {
Long earliestSnapshotId = snapshotManager.earliestSnapshotId();
if (earliestSnapshotId == null) {
return new NoSnapshot();
}
- long ceiledSnapshotId = Math.max(snapshotId, earliestSnapshotId);
+ long ceiledSnapshotId = Math.max(startingSnapshotId,
earliestSnapshotId);
return StartingScanner.fromPlan(
snapshotReader.withMode(ScanMode.ALL).withSnapshot(ceiledSnapshotId).read());
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromSnapshotStartingScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromSnapshotStartingScanner.java
index 8f7319ac2..c966de58c 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromSnapshotStartingScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromSnapshotStartingScanner.java
@@ -25,22 +25,21 @@ import org.apache.paimon.utils.SnapshotManager;
* {@link StartingScanner} for the {@link
CoreOptions.StartupMode#FROM_SNAPSHOT} startup mode of a
* streaming read.
*/
-public class ContinuousFromSnapshotStartingScanner implements StartingScanner {
+public class ContinuousFromSnapshotStartingScanner extends
AbstractStartingScanner {
- private final long snapshotId;
-
- public ContinuousFromSnapshotStartingScanner(long snapshotId) {
- this.snapshotId = snapshotId;
+ public ContinuousFromSnapshotStartingScanner(SnapshotManager
snapshotManager, long snapshotId) {
+ super(snapshotManager);
+ this.startingSnapshotId = snapshotId;
}
@Override
- public Result scan(SnapshotManager snapshotManager, SnapshotReader
snapshotReader) {
+ public Result scan(SnapshotReader snapshotReader) {
Long earliestSnapshotId = snapshotManager.earliestSnapshotId();
if (earliestSnapshotId == null) {
return new NoSnapshot();
}
// We should return the specified snapshot as next snapshot to
indicate to scan delta data
// from it. If the snapshotId < earliestSnapshotId, start from the
earliest.
- return new NextSnapshot(Math.max(snapshotId, earliestSnapshotId));
+ return new NextSnapshot(Math.max(startingSnapshotId,
earliestSnapshotId));
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScanner.java
index d72233519..6113773ff 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScanner.java
@@ -28,19 +28,31 @@ import org.slf4j.LoggerFactory;
* {@link StartingScanner} for the {@link
CoreOptions.StartupMode#FROM_TIMESTAMP} startup mode of a
* streaming read.
*/
-public class ContinuousFromTimestampStartingScanner implements StartingScanner
{
+public class ContinuousFromTimestampStartingScanner extends
AbstractStartingScanner {
private static final Logger LOG =
LoggerFactory.getLogger(ContinuousFromTimestampStartingScanner.class);
private final long startupMillis;
- public ContinuousFromTimestampStartingScanner(long startupMillis) {
+ public ContinuousFromTimestampStartingScanner(
+ SnapshotManager snapshotManager, long startupMillis) {
+ super(snapshotManager);
this.startupMillis = startupMillis;
+ this.startingSnapshotId =
this.snapshotManager.earlierThanTimeMills(this.startupMillis);
}
@Override
- public Result scan(SnapshotManager snapshotManager, SnapshotReader
snapshotReader) {
+ public StartingContext startingContext() {
+ if (startingSnapshotId == null) {
+ return StartingContext.EMPTY;
+ } else {
+ return new StartingContext(startingSnapshotId + 1, false);
+ }
+ }
+
+ @Override
+ public Result scan(SnapshotReader snapshotReader) {
Long startingSnapshotId =
snapshotManager.earlierThanTimeMills(startupMillis);
if (startingSnapshotId == null) {
LOG.debug("There is currently no snapshot. Waiting for snapshot
generation.");
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousLatestStartingScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousLatestStartingScanner.java
index 7a1c49c34..423181b37 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousLatestStartingScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousLatestStartingScanner.java
@@ -28,13 +28,27 @@ import org.slf4j.LoggerFactory;
* {@link StartingScanner} for the {@link CoreOptions.StartupMode#LATEST}
startup mode of a
* streaming read.
*/
-public class ContinuousLatestStartingScanner implements StartingScanner {
+public class ContinuousLatestStartingScanner extends AbstractStartingScanner {
private static final Logger LOG =
LoggerFactory.getLogger(ContinuousLatestStartingScanner.class);
+ public ContinuousLatestStartingScanner(SnapshotManager snapshotManager) {
+ super(snapshotManager);
+ this.startingSnapshotId = snapshotManager.latestSnapshotId();
+ }
+
+ @Override
+ public StartingContext startingContext() {
+ if (startingSnapshotId == null) {
+ return StartingContext.EMPTY;
+ } else {
+ return new StartingContext(startingSnapshotId + 1, false);
+ }
+ }
+
@Override
- public Result scan(SnapshotManager snapshotManager, SnapshotReader
snapshotReader) {
+ public Result scan(SnapshotReader snapshotReader) {
Long startingSnapshotId = snapshotManager.latestSnapshotId();
if (startingSnapshotId == null) {
LOG.debug("There is currently no snapshot. Wait for the snapshot
generation.");
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullCompactedStartingScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullCompactedStartingScanner.java
index 31df3c8b9..5015d56c7 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullCompactedStartingScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullCompactedStartingScanner.java
@@ -21,25 +21,32 @@ package org.apache.paimon.table.source.snapshot;
import org.apache.paimon.CoreOptions.StartupMode;
import org.apache.paimon.Snapshot;
import org.apache.paimon.Snapshot.CommitKind;
+import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.utils.SnapshotManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import javax.annotation.Nullable;
/**
* {@link StartingScanner} for the {@link StartupMode#COMPACTED_FULL} startup
mode with
* 'full-compaction.delta-commits'.
*/
-public class FullCompactedStartingScanner extends CompactedStartingScanner {
+public class FullCompactedStartingScanner extends AbstractStartingScanner {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(FullCompactedStartingScanner.class);
private final int deltaCommits;
- public FullCompactedStartingScanner(int deltaCommits) {
+ public FullCompactedStartingScanner(SnapshotManager snapshotManager, int
deltaCommits) {
+ super(snapshotManager);
this.deltaCommits = deltaCommits;
+ this.startingSnapshotId = pick();
}
- @Override
@Nullable
- protected Long pick(SnapshotManager snapshotManager) {
+ protected Long pick() {
return snapshotManager.pickOrLatest(this::picked);
}
@@ -49,6 +56,30 @@ public class FullCompactedStartingScanner extends
CompactedStartingScanner {
&& isFullCompactedIdentifier(identifier, deltaCommits);
}
+ @Override
+ public ScanMode startingScanMode() {
+ return ScanMode.ALL;
+ }
+
+ @Override
+ public Result scan(SnapshotReader snapshotReader) {
+ Long startingSnapshotId = pick();
+ if (startingSnapshotId == null) {
+ startingSnapshotId = snapshotManager.latestSnapshotId();
+ if (startingSnapshotId == null) {
+ LOG.debug("There is currently no snapshot. Wait for the
snapshot generation.");
+ return new NoSnapshot();
+ } else {
+ LOG.debug(
+ "No compact snapshot found, reading from the latest
snapshot {}.",
+ startingSnapshotId);
+ }
+ }
+
+ return StartingScanner.fromPlan(
+
snapshotReader.withMode(ScanMode.ALL).withSnapshot(startingSnapshotId).read());
+ }
+
public static boolean isFullCompactedIdentifier(long identifier, int
deltaCommits) {
return identifier % deltaCommits == 0 || identifier == Long.MAX_VALUE;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullStartingScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullStartingScanner.java
index a03a707be..c177be7ea 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullStartingScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullStartingScanner.java
@@ -26,12 +26,22 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** {@link StartingScanner} for the {@link
CoreOptions.StartupMode#LATEST_FULL} startup mode. */
-public class FullStartingScanner implements StartingScanner {
+public class FullStartingScanner extends AbstractStartingScanner {
private static final Logger LOG =
LoggerFactory.getLogger(FullStartingScanner.class);
+ public FullStartingScanner(SnapshotManager snapshotManager) {
+ super(snapshotManager);
+ this.startingSnapshotId = snapshotManager.latestSnapshotId();
+ }
+
+ @Override
+ public ScanMode startingScanMode() {
+ return ScanMode.ALL;
+ }
+
@Override
- public Result scan(SnapshotManager snapshotManager, SnapshotReader
snapshotReader) {
+ public Result scan(SnapshotReader snapshotReader) {
Long startingSnapshotId = snapshotManager.latestSnapshotId();
if (startingSnapshotId == null) {
LOG.debug("There is currently no snapshot. Waiting for snapshot
generation.");
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
index fbdf8a858..4e750688b 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
@@ -35,26 +35,21 @@ import java.util.List;
import java.util.Map;
/** {@link StartingScanner} for incremental changes by snapshot. */
-public class IncrementalStartingScanner implements StartingScanner {
+public class IncrementalStartingScanner extends AbstractStartingScanner {
- private long start;
- private long end;
+ private long endingSnapshotId;
- public IncrementalStartingScanner(long start, long end) {
- this.start = start;
- this.end = end;
+ public IncrementalStartingScanner(SnapshotManager snapshotManager, long
start, long end) {
+ super(snapshotManager);
+ this.startingSnapshotId = start;
+ this.endingSnapshotId = end;
}
@Override
- public Result scan(SnapshotManager manager, SnapshotReader reader) {
- long earliestSnapshotId = manager.earliestSnapshotId();
- long latestSnapshotId = manager.latestSnapshotId();
- start = (start < earliestSnapshotId) ? earliestSnapshotId - 1 : start;
- end = (end > latestSnapshotId) ? latestSnapshotId : end;
-
+ public Result scan(SnapshotReader reader) {
Map<Pair<BinaryRow, Integer>, List<DataFileMeta>> grouped = new
HashMap<>();
- for (long i = start + 1; i < end + 1; i++) {
- List<DataSplit> splits = readDeltaSplits(reader,
manager.snapshot(i));
+ for (long i = startingSnapshotId + 1; i < endingSnapshotId + 1; i++) {
+ List<DataSplit> splits = readDeltaSplits(reader,
snapshotManager.snapshot(i));
for (DataSplit split : splits) {
grouped.computeIfAbsent(
Pair.of(split.partition(), split.bucket()), k
-> new ArrayList<>())
@@ -70,7 +65,7 @@ public class IncrementalStartingScanner implements
StartingScanner {
reader.splitGenerator().splitForBatch(entry.getValue())) {
result.add(
DataSplit.builder()
- .withSnapshot(end)
+ .withSnapshot(endingSnapshotId)
.withPartition(partition)
.withBucket(bucket)
.withDataFiles(files)
@@ -87,7 +82,7 @@ public class IncrementalStartingScanner implements
StartingScanner {
@Override
public Long snapshotId() {
- return end;
+ return endingSnapshotId;
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java
index 242db6b19..2cdf5bff9 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java
@@ -23,19 +23,28 @@ import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;
/** {@link StartingScanner} for incremental changes by tag. */
-public class IncrementalTagStartingScanner implements StartingScanner {
+public class IncrementalTagStartingScanner extends AbstractStartingScanner {
private final String start;
private final String end;
- public IncrementalTagStartingScanner(String start, String end) {
+ public IncrementalTagStartingScanner(
+ SnapshotManager snapshotManager, String start, String end) {
+ super(snapshotManager);
this.start = start;
this.end = end;
+ TagManager tagManager =
+ new TagManager(snapshotManager.fileIO(),
snapshotManager.tablePath());
+ Snapshot startingSnapshot = tagManager.taggedSnapshot(start);
+ if (startingSnapshot != null) {
+ this.startingSnapshotId = startingSnapshot.id();
+ }
}
@Override
- public Result scan(SnapshotManager manager, SnapshotReader reader) {
- TagManager tagManager = new TagManager(manager.fileIO(),
manager.tablePath());
+ public Result scan(SnapshotReader reader) {
+ TagManager tagManager =
+ new TagManager(snapshotManager.fileIO(),
snapshotManager.tablePath());
Snapshot tag1 = tagManager.taggedSnapshot(start);
Snapshot tag2 = tagManager.taggedSnapshot(end);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTimeStampStartingScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTimeStampStartingScanner.java
index ed8613a26..6e50903b0 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTimeStampStartingScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTimeStampStartingScanner.java
@@ -22,31 +22,36 @@ import org.apache.paimon.Snapshot;
import org.apache.paimon.utils.SnapshotManager;
/** {@link StartingScanner} for incremental changes by timestamp. */
-public class IncrementalTimeStampStartingScanner implements StartingScanner {
+public class IncrementalTimeStampStartingScanner extends
AbstractStartingScanner {
private final long startTimestamp;
private final long endTimestamp;
- public IncrementalTimeStampStartingScanner(long startTimestamp, long
endTimestamp) {
+ public IncrementalTimeStampStartingScanner(
+ SnapshotManager snapshotManager, long startTimestamp, long
endTimestamp) {
+ super(snapshotManager);
this.startTimestamp = startTimestamp;
this.endTimestamp = endTimestamp;
+ Snapshot startingSnapshot =
snapshotManager.earlierOrEqualTimeMills(startTimestamp);
+ if (startingSnapshot != null) {
+ this.startingSnapshotId = startingSnapshot.id();
+ }
}
@Override
- public Result scan(SnapshotManager manager, SnapshotReader reader) {
- Snapshot earliestSnapshot =
manager.snapshot(manager.earliestSnapshotId());
- Snapshot latestSnapshot = manager.latestSnapshot();
+ public Result scan(SnapshotReader reader) {
+ Snapshot earliestSnapshot =
snapshotManager.snapshot(snapshotManager.earliestSnapshotId());
+ Snapshot latestSnapshot = snapshotManager.latestSnapshot();
if (startTimestamp > latestSnapshot.timeMillis()
|| endTimestamp < earliestSnapshot.timeMillis()) {
return new NoSnapshot();
}
- Snapshot startSnapshot =
manager.earlierOrEqualTimeMills(startTimestamp);
Long startSnapshotId =
- (startSnapshot == null) ? earliestSnapshot.id() - 1 :
startSnapshot.id();
- Snapshot endSnapshot = manager.earlierOrEqualTimeMills(endTimestamp);
+ (startingSnapshotId == null) ? earliestSnapshot.id() - 1 :
startingSnapshotId;
+ Snapshot endSnapshot =
snapshotManager.earlierOrEqualTimeMills(endTimestamp);
Long endSnapshotId = (endSnapshot == null) ? latestSnapshot.id() :
endSnapshot.id();
IncrementalStartingScanner incrementalStartingScanner =
- new IncrementalStartingScanner(startSnapshotId, endSnapshotId);
- return incrementalStartingScanner.scan(manager, reader);
+ new IncrementalStartingScanner(snapshotManager,
startSnapshotId, endSnapshotId);
+ return incrementalStartingScanner.scan(reader);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromSnapshotStartingScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StartingContext.java
similarity index 50%
copy from
paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromSnapshotStartingScanner.java
copy to
paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StartingContext.java
index 1157e22d2..9204f4d47 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromSnapshotStartingScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StartingContext.java
@@ -18,28 +18,30 @@
package org.apache.paimon.table.source.snapshot;
-import org.apache.paimon.CoreOptions;
-import org.apache.paimon.table.source.ScanMode;
-import org.apache.paimon.utils.SnapshotManager;
+/** That contains some information that will be used out of StartingScanner. */
+public class StartingContext {
+ /**
+ * Notice: The snapshot ID is the initial one corresponding to the
StartScanner configuration,
+ * not necessarily the snapshot ID at the time of the actual scan. E.g, in
+ * ContinuousFromSnapshotFullStartingScanner, this snapshot ID used in the
first scan is the
+ * bigger one between the configured one and the earliest one.
+ */
+ private final Long snapshotId;
-/**
- * {@link StartingScanner} for the {@link
CoreOptions.StartupMode#FROM_SNAPSHOT} or {@link
- * CoreOptions.StartupMode#FROM_SNAPSHOT_FULL} startup mode of a batch read.
- */
-public class StaticFromSnapshotStartingScanner implements StartingScanner {
- private final long snapshotId;
+ private final Boolean scanFullSnapshot;
- public StaticFromSnapshotStartingScanner(long snapshotId) {
+ public StartingContext(Long snapshotId, Boolean scanFullSnapshot) {
this.snapshotId = snapshotId;
+ this.scanFullSnapshot = scanFullSnapshot;
+ }
+
+ public Long getSnapshotId() {
+ return this.snapshotId;
}
- @Override
- public Result scan(SnapshotManager snapshotManager, SnapshotReader
snapshotReader) {
- if (snapshotManager.earliestSnapshotId() == null
- || snapshotId < snapshotManager.earliestSnapshotId()) {
- return new NoSnapshot();
- }
- return StartingScanner.fromPlan(
-
snapshotReader.withMode(ScanMode.ALL).withSnapshot(snapshotId).read());
+ public Boolean getScanFullSnapshot() {
+ return this.scanFullSnapshot;
}
+
+ public static final StartingContext EMPTY = new StartingContext(0L, false);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StartingScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StartingScanner.java
index af44a8b8a..b09bfe298 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StartingScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StartingScanner.java
@@ -20,14 +20,15 @@ package org.apache.paimon.table.source.snapshot;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.TableScan;
-import org.apache.paimon.utils.SnapshotManager;
import java.util.List;
/** Helper class for the first planning of {@link TableScan}. */
public interface StartingScanner {
- Result scan(SnapshotManager snapshotManager, SnapshotReader
snapshotReader);
+ StartingContext startingContext();
+
+ Result scan(SnapshotReader snapshotReader);
/** Scan result of {@link #scan}. */
interface Result {}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromSnapshotStartingScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromSnapshotStartingScanner.java
index 1157e22d2..97144e8f8 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromSnapshotStartingScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromSnapshotStartingScanner.java
@@ -26,20 +26,25 @@ import org.apache.paimon.utils.SnapshotManager;
* {@link StartingScanner} for the {@link
CoreOptions.StartupMode#FROM_SNAPSHOT} or {@link
* CoreOptions.StartupMode#FROM_SNAPSHOT_FULL} startup mode of a batch read.
*/
-public class StaticFromSnapshotStartingScanner implements StartingScanner {
- private final long snapshotId;
+public class StaticFromSnapshotStartingScanner extends AbstractStartingScanner
{
- public StaticFromSnapshotStartingScanner(long snapshotId) {
- this.snapshotId = snapshotId;
+ public StaticFromSnapshotStartingScanner(SnapshotManager snapshotManager,
long snapshotId) {
+ super(snapshotManager);
+ this.startingSnapshotId = snapshotId;
}
@Override
- public Result scan(SnapshotManager snapshotManager, SnapshotReader
snapshotReader) {
+ public ScanMode startingScanMode() {
+ return ScanMode.ALL;
+ }
+
+ @Override
+ public Result scan(SnapshotReader snapshotReader) {
if (snapshotManager.earliestSnapshotId() == null
- || snapshotId < snapshotManager.earliestSnapshotId()) {
+ || startingSnapshotId < snapshotManager.earliestSnapshotId()) {
return new NoSnapshot();
}
return StartingScanner.fromPlan(
-
snapshotReader.withMode(ScanMode.ALL).withSnapshot(snapshotId).read());
+
snapshotReader.withMode(ScanMode.ALL).withSnapshot(startingSnapshotId).read());
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTagStartingScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTagStartingScanner.java
index b402851a1..20bb3199c 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTagStartingScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTagStartingScanner.java
@@ -25,21 +25,32 @@ import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;
/** {@link StartingScanner} for the {@link CoreOptions#SCAN_TAG_NAME} of a
batch read. */
-public class StaticFromTagStartingScanner implements StartingScanner {
+public class StaticFromTagStartingScanner extends AbstractStartingScanner {
private final String tagName;
- public StaticFromTagStartingScanner(String tagName) {
+ public StaticFromTagStartingScanner(SnapshotManager snapshotManager,
String tagName) {
+ super(snapshotManager);
this.tagName = tagName;
+ TagManager tagManager =
+ new TagManager(snapshotManager.fileIO(),
snapshotManager.tablePath());
+ Snapshot snapshot = tagManager.taggedSnapshot(this.tagName);
+ if (snapshot != null) {
+ this.startingSnapshotId = snapshot.id();
+ }
}
@Override
- public Result scan(SnapshotManager snapshotManager, SnapshotReader
snapshotReader) {
- TagManager tagManager =
- new TagManager(snapshotManager.fileIO(),
snapshotManager.tablePath());
- Snapshot snapshot = tagManager.taggedSnapshot(tagName);
+ public ScanMode startingScanMode() {
+ return ScanMode.ALL;
+ }
+ @Override
+ public Result scan(SnapshotReader snapshotReader) {
+ if (startingSnapshotId == null) {
+ return new NoSnapshot();
+ }
return StartingScanner.fromPlan(
-
snapshotReader.withMode(ScanMode.ALL).withSnapshot(snapshot).read());
+
snapshotReader.withMode(ScanMode.ALL).withSnapshot(startingSnapshotId).read());
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTimestampStartingScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTimestampStartingScanner.java
index 315b749df..1c039c0c7 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTimestampStartingScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTimestampStartingScanner.java
@@ -32,28 +32,32 @@ import javax.annotation.Nullable;
* {@link StartingScanner} for the {@link
CoreOptions.StartupMode#FROM_TIMESTAMP} startup mode of a
* batch read.
*/
-public class StaticFromTimestampStartingScanner implements StartingScanner {
+public class StaticFromTimestampStartingScanner extends
AbstractStartingScanner {
private static final Logger LOG =
LoggerFactory.getLogger(StaticFromTimestampStartingScanner.class);
private final long startupMillis;
- public StaticFromTimestampStartingScanner(long startupMillis) {
+ public StaticFromTimestampStartingScanner(SnapshotManager snapshotManager,
long startupMillis) {
+ super(snapshotManager);
this.startupMillis = startupMillis;
+ Snapshot snapshot = timeTravelToTimestamp(snapshotManager,
startupMillis);
+ if (snapshot != null) {
+ this.startingSnapshotId = snapshot.id();
+ }
}
@Override
- public Result scan(SnapshotManager snapshotManager, SnapshotReader
snapshotReader) {
- Snapshot startingSnapshot = timeTravelToTimestamp(snapshotManager,
startupMillis);
- if (startingSnapshot == null) {
+ public Result scan(SnapshotReader snapshotReader) {
+ if (startingSnapshotId == null) {
LOG.debug(
"There is currently no snapshot earlier than or equal to
timestamp[{}]",
startupMillis);
return new NoSnapshot();
}
return StartingScanner.fromPlan(
-
snapshotReader.withMode(ScanMode.ALL).withSnapshot(startingSnapshot.id()).read());
+
snapshotReader.withMode(ScanMode.ALL).withSnapshot(startingSnapshotId).read());
}
@Nullable
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index e8e90df1b..28bba8901 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -45,6 +45,7 @@ import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.SplitGenerator;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
+import org.apache.paimon.table.source.snapshot.StartingContext;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
@@ -307,6 +308,11 @@ public class AuditLogTable implements DataTable,
ReadonlyTable {
return this;
}
+ @Override
+ public StartingContext startingContext() {
+ return streamScan.startingContext();
+ }
+
@Override
public RichPlan plan() {
return streamScan.plan();
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CompactedStartingScannerTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CompactedStartingScannerTest.java
index d99a5dbcb..60c957300 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CompactedStartingScannerTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CompactedStartingScannerTest.java
@@ -55,9 +55,9 @@ public class CompactedStartingScannerTest extends
ScannerTestBase {
assertThat(snapshotManager.latestSnapshotId()).isEqualTo(4);
- CompactedStartingScanner scanner = new CompactedStartingScanner();
+ CompactedStartingScanner scanner = new
CompactedStartingScanner(snapshotManager);
StartingScanner.ScannedResult result =
- (StartingScanner.ScannedResult) scanner.scan(snapshotManager,
snapshotReader);
+ (StartingScanner.ScannedResult) scanner.scan(snapshotReader);
assertThat(result.currentSnapshotId()).isEqualTo(3);
assertThat(getResult(table.newRead(), toSplits(result.splits())))
.hasSameElementsAs(Arrays.asList("+I 1|10|101", "+I 1|20|200",
"+I 1|30|300"));
@@ -69,9 +69,8 @@ public class CompactedStartingScannerTest extends
ScannerTestBase {
@Test
public void testNoSnapshot() {
SnapshotManager snapshotManager = table.snapshotManager();
- CompactedStartingScanner scanner = new CompactedStartingScanner();
- assertThat(scanner.scan(snapshotManager, snapshotReader))
- .isInstanceOf(StartingScanner.NoSnapshot.class);
+ CompactedStartingScanner scanner = new
CompactedStartingScanner(snapshotManager);
+
assertThat(scanner.scan(snapshotReader)).isInstanceOf(StartingScanner.NoSnapshot.class);
}
@Test
@@ -87,11 +86,11 @@ public class CompactedStartingScannerTest extends
ScannerTestBase {
assertThat(snapshotManager.latestSnapshotId()).isEqualTo(1);
- CompactedStartingScanner scanner = new CompactedStartingScanner();
+ CompactedStartingScanner scanner = new
CompactedStartingScanner(snapshotManager);
// No compact snapshot found, reading from the latest snapshot
StartingScanner.ScannedResult result =
- (StartingScanner.ScannedResult) scanner.scan(snapshotManager,
snapshotReader);
+ (StartingScanner.ScannedResult) scanner.scan(snapshotReader);
assertThat(result.currentSnapshotId()).isEqualTo(1);
write.close();
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorStartingScannerTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorStartingScannerTest.java
index d11565ff5..a55a36f01 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorStartingScannerTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorStartingScannerTest.java
@@ -58,9 +58,10 @@ public class ContinuousCompactorStartingScannerTest extends
ScannerTestBase {
assertThat(snapshotManager.latestSnapshotId()).isEqualTo(5);
- ContinuousCompactorStartingScanner scanner = new
ContinuousCompactorStartingScanner();
+ ContinuousCompactorStartingScanner scanner =
+ new ContinuousCompactorStartingScanner(snapshotManager);
StartingScanner.NextSnapshot result =
- (StartingScanner.NextSnapshot) scanner.scan(snapshotManager,
snapshotReader);
+ (StartingScanner.NextSnapshot) scanner.scan(snapshotReader);
assertThat(result.nextSnapshotId()).isEqualTo(4);
write.close();
@@ -70,8 +71,8 @@ public class ContinuousCompactorStartingScannerTest extends
ScannerTestBase {
@Test
public void testNoSnapshot() {
SnapshotManager snapshotManager = table.snapshotManager();
- ContinuousCompactorStartingScanner scanner = new
ContinuousCompactorStartingScanner();
- assertThat(scanner.scan(snapshotManager, snapshotReader))
- .isInstanceOf(StartingScanner.NoSnapshot.class);
+ ContinuousCompactorStartingScanner scanner =
+ new ContinuousCompactorStartingScanner(snapshotManager);
+
assertThat(scanner.scan(snapshotReader)).isInstanceOf(StartingScanner.NoSnapshot.class);
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScannerTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScannerTest.java
index 308e04e60..e9364eeff 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScannerTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScannerTest.java
@@ -58,9 +58,9 @@ public class ContinuousFromTimestampStartingScannerTest
extends ScannerTestBase
long timestamp = snapshotManager.snapshot(3).timeMillis();
ContinuousFromTimestampStartingScanner scanner =
- new ContinuousFromTimestampStartingScanner(timestamp);
+ new ContinuousFromTimestampStartingScanner(snapshotManager,
timestamp);
StartingScanner.NextSnapshot result =
- (StartingScanner.NextSnapshot) scanner.scan(snapshotManager,
snapshotReader);
+ (StartingScanner.NextSnapshot) scanner.scan(snapshotReader);
assertThat(result.nextSnapshotId()).isEqualTo(3);
write.close();
@@ -71,9 +71,9 @@ public class ContinuousFromTimestampStartingScannerTest
extends ScannerTestBase
public void testNoSnapshot() {
SnapshotManager snapshotManager = table.snapshotManager();
ContinuousFromTimestampStartingScanner scanner =
- new
ContinuousFromTimestampStartingScanner(System.currentTimeMillis());
- assertThat(scanner.scan(snapshotManager, snapshotReader))
- .isInstanceOf(StartingScanner.NoSnapshot.class);
+ new ContinuousFromTimestampStartingScanner(
+ snapshotManager, System.currentTimeMillis());
+
assertThat(scanner.scan(snapshotReader)).isInstanceOf(StartingScanner.NoSnapshot.class);
}
@Test
@@ -92,9 +92,9 @@ public class ContinuousFromTimestampStartingScannerTest
extends ScannerTestBase
long timestamp = snapshotManager.snapshot(1).timeMillis();
ContinuousFromTimestampStartingScanner scanner =
- new ContinuousFromTimestampStartingScanner(timestamp);
+ new ContinuousFromTimestampStartingScanner(snapshotManager,
timestamp);
StartingScanner.NextSnapshot result =
- (StartingScanner.NextSnapshot) scanner.scan(snapshotManager,
snapshotReader);
+ (StartingScanner.NextSnapshot) scanner.scan(snapshotReader);
// next snapshot
assertThat(result.nextSnapshotId()).isEqualTo(1);
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousLatestStartingScannerTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousLatestStartingScannerTest.java
index 25a292c2f..bfd86e584 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousLatestStartingScannerTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousLatestStartingScannerTest.java
@@ -48,9 +48,10 @@ public class ContinuousLatestStartingScannerTest extends
ScannerTestBase {
assertThat(snapshotManager.latestSnapshotId()).isEqualTo(2);
- ContinuousLatestStartingScanner scanner = new
ContinuousLatestStartingScanner();
+ ContinuousLatestStartingScanner scanner =
+ new ContinuousLatestStartingScanner(snapshotManager);
StartingScanner.NextSnapshot result =
- (StartingScanner.NextSnapshot) scanner.scan(snapshotManager,
snapshotReader);
+ (StartingScanner.NextSnapshot) scanner.scan(snapshotReader);
assertThat(result.nextSnapshotId()).isEqualTo(3);
write.close();
@@ -60,8 +61,8 @@ public class ContinuousLatestStartingScannerTest extends
ScannerTestBase {
@Test
public void testNoSnapshot() {
SnapshotManager snapshotManager = table.snapshotManager();
- ContinuousLatestStartingScanner scanner = new
ContinuousLatestStartingScanner();
- assertThat(scanner.scan(snapshotManager, snapshotReader))
- .isInstanceOf(StartingScanner.NoSnapshot.class);
+ ContinuousLatestStartingScanner scanner =
+ new ContinuousLatestStartingScanner(snapshotManager);
+
assertThat(scanner.scan(snapshotReader)).isInstanceOf(StartingScanner.NoSnapshot.class);
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/FullCompactedStartingScannerTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/FullCompactedStartingScannerTest.java
index bfdb5d3e8..b11d30b3a 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/FullCompactedStartingScannerTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/FullCompactedStartingScannerTest.java
@@ -45,9 +45,9 @@ public class FullCompactedStartingScannerTest extends
ScannerTestBase {
assertThat(snapshotManager.latestSnapshotId()).isEqualTo(10);
- FullCompactedStartingScanner scanner = new
FullCompactedStartingScanner(3);
+ FullCompactedStartingScanner scanner = new
FullCompactedStartingScanner(snapshotManager, 3);
StartingScanner.ScannedResult result =
- (StartingScanner.ScannedResult) scanner.scan(snapshotManager,
snapshotReader);
+ (StartingScanner.ScannedResult) scanner.scan(snapshotReader);
assertThat(result.currentSnapshotId()).isEqualTo(8);
write.close();
@@ -57,9 +57,8 @@ public class FullCompactedStartingScannerTest extends
ScannerTestBase {
@Test
public void testNoSnapshot() {
SnapshotManager snapshotManager = table.snapshotManager();
- FullCompactedStartingScanner scanner = new
FullCompactedStartingScanner(3);
- assertThat(scanner.scan(snapshotManager, snapshotReader))
- .isInstanceOf(StartingScanner.NoSnapshot.class);
+ FullCompactedStartingScanner scanner = new
FullCompactedStartingScanner(snapshotManager, 3);
+
assertThat(scanner.scan(snapshotReader)).isInstanceOf(StartingScanner.NoSnapshot.class);
}
@Test
@@ -85,11 +84,11 @@ public class FullCompactedStartingScannerTest extends
ScannerTestBase {
assertThat(snapshotManager.latestSnapshotId()).isEqualTo(4);
- FullCompactedStartingScanner scanner = new
FullCompactedStartingScanner(3);
+ FullCompactedStartingScanner scanner = new
FullCompactedStartingScanner(snapshotManager, 3);
// No compact snapshot found, reading from the latest snapshot
StartingScanner.ScannedResult result =
- (StartingScanner.ScannedResult) scanner.scan(snapshotManager,
snapshotReader);
+ (StartingScanner.ScannedResult) scanner.scan(snapshotReader);
assertThat(result.currentSnapshotId()).isEqualTo(4);
write.close();
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/FullStartingScannerTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/FullStartingScannerTest.java
index 0ab9cfbf4..20a65a04b 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/FullStartingScannerTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/FullStartingScannerTest.java
@@ -50,9 +50,9 @@ public class FullStartingScannerTest extends ScannerTestBase {
assertThat(snapshotManager.latestSnapshotId()).isEqualTo(2);
- FullStartingScanner scanner = new FullStartingScanner();
+ FullStartingScanner scanner = new FullStartingScanner(snapshotManager);
StartingScanner.ScannedResult result =
- (StartingScanner.ScannedResult) scanner.scan(snapshotManager,
snapshotReader);
+ (StartingScanner.ScannedResult) scanner.scan(snapshotReader);
assertThat(result.currentSnapshotId()).isEqualTo(2);
assertThat(getResult(table.newRead(), toSplits(result.splits())))
.hasSameElementsAs(Arrays.asList("+I 1|10|101", "+I 1|20|200",
"+I 1|30|300"));
@@ -64,8 +64,7 @@ public class FullStartingScannerTest extends ScannerTestBase {
@Test
public void testNoSnapshot() {
SnapshotManager snapshotManager = table.snapshotManager();
- FullStartingScanner scanner = new FullStartingScanner();
- assertThat(scanner.scan(snapshotManager, snapshotReader))
- .isInstanceOf(StartingScanner.NoSnapshot.class);
+ FullStartingScanner scanner = new FullStartingScanner(snapshotManager);
+
assertThat(scanner.scan(snapshotReader)).isInstanceOf(StartingScanner.NoSnapshot.class);
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/StaticFromTagStartingScannerTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/StaticFromTagStartingScannerTest.java
index eb524625b..afdffc37e 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/StaticFromTagStartingScannerTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/StaticFromTagStartingScannerTest.java
@@ -53,9 +53,10 @@ public class StaticFromTagStartingScannerTest extends
ScannerTestBase {
table.createTag("tag2", 2);
- StaticFromTagStartingScanner scanner = new
StaticFromTagStartingScanner("tag2");
+ StaticFromTagStartingScanner scanner =
+ new StaticFromTagStartingScanner(snapshotManager, "tag2");
StartingScanner.ScannedResult result =
- (StartingScanner.ScannedResult) scanner.scan(snapshotManager,
snapshotReader);
+ (StartingScanner.ScannedResult) scanner.scan(snapshotReader);
assertThat(result.currentSnapshotId()).isEqualTo(2);
assertThat(getResult(table.newRead(), toSplits(result.splits())))
.hasSameElementsAs(
@@ -68,8 +69,7 @@ public class StaticFromTagStartingScannerTest extends
ScannerTestBase {
@Test
public void testNonExistingTag() {
SnapshotManager snapshotManager = table.snapshotManager();
- StaticFromTagStartingScanner scanner = new
StaticFromTagStartingScanner("non-existing");
- assertThatThrownBy(() -> scanner.scan(snapshotManager, snapshotReader))
+ assertThatThrownBy(() -> new
StaticFromTagStartingScanner(snapshotManager, "non-existing"))
.satisfies(
AssertionUtils.anyCauseMatches(
IllegalArgumentException.class,
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
index 4d51c0f89..adf88ffe6 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
@@ -28,6 +28,7 @@ import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.table.source.SnapshotNotExistPlan;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.StreamTableScan;
+import org.apache.paimon.table.source.snapshot.StartingContext;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
@@ -830,6 +831,11 @@ public class ContinuousFileSplitEnumeratorTest {
this.nextSnapshotId = null;
}
+ @Override
+ public StartingContext startingContext() {
+ return null;
+ }
+
@Override
public RichPlan plan() {
Map.Entry<Long, RichPlan> planEntry = results.pollFirstEntry();