This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new f4cd8c20cc [core] Unify timetravel processing to starting scanners (#5554)
f4cd8c20cc is described below
commit f4cd8c20cca6b2a052f2c93431b9a7af92ff7241
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Apr 29 20:39:26 2025 +0800
[core] Unify timetravel processing to starting scanners (#5554)
---
.../paimon/table/AbstractFileStoreTable.java | 81 +++-------------
.../StaticFromSnapshotStartingScanner.java | 41 ++++----
.../snapshot/StaticFromTagStartingScanner.java | 11 ++-
.../StaticFromTimestampStartingScanner.java | 17 ++--
.../StaticFromWatermarkStartingScanner.java | 33 ++++---
.../table/source/snapshot/TimeTravelUtil.java | 107 ++++++++++-----------
.../apache/paimon/table/system/ManifestsTable.java | 2 +-
.../paimon/table/system/TableIndexesTable.java | 2 +-
.../table/source/snapshot/TimeTravelUtilsTest.java | 18 +++-
.../paimon/table/system/ManifestsTableTest.java | 3 +-
.../apache/paimon/flink/BatchFileStoreITCase.java | 4 +-
.../apache/paimon/flink/BatchFileStoreITCase.java | 9 +-
12 files changed, 139 insertions(+), 189 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index f8090918eb..1e494799a4 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -50,8 +50,6 @@ import org.apache.paimon.table.source.SplitGenerator;
import org.apache.paimon.table.source.StreamDataTableScan;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.table.source.snapshot.SnapshotReaderImpl;
-import
org.apache.paimon.table.source.snapshot.StaticFromTimestampStartingScanner;
-import
org.apache.paimon.table.source.snapshot.StaticFromWatermarkStartingScanner;
import org.apache.paimon.table.source.snapshot.TimeTravelUtil;
import org.apache.paimon.utils.BranchManager;
import org.apache.paimon.utils.CatalogBranchManager;
@@ -192,7 +190,7 @@ abstract class AbstractFileStoreTable implements
FileStoreTable {
@Override
public Optional<Statistics> statistics() {
- Snapshot snapshot = TimeTravelUtil.resolveSnapshot(this);
+ Snapshot snapshot = TimeTravelUtil.tryTravelOrLatest(this);
if (snapshot != null) {
String file = snapshot.statistics();
if (file == null) {
@@ -480,75 +478,18 @@ abstract class AbstractFileStoreTable implements
FileStoreTable {
}
private Optional<TableSchema> tryTimeTravel(Options options) {
- CoreOptions coreOptions = new CoreOptions(options);
-
- switch (coreOptions.startupMode()) {
- case FROM_SNAPSHOT:
- case FROM_SNAPSHOT_FULL:
- if (coreOptions.scanVersion() != null) {
- return travelToVersion(coreOptions.scanVersion(), options);
- } else if (coreOptions.scanSnapshotId() != null) {
- return travelToSnapshot(coreOptions.scanSnapshotId(),
options);
- } else if (coreOptions.scanWatermark() != null) {
- return travelToWatermark(coreOptions.scanWatermark(),
options);
- } else {
- return travelToTag(coreOptions.scanTagName(), options);
- }
- case FROM_TIMESTAMP:
- Snapshot snapshot =
-
StaticFromTimestampStartingScanner.timeTravelToTimestamp(
- snapshotManager(),
coreOptions.scanTimestampMills());
- return travelToSnapshot(snapshot, options);
- default:
- return Optional.empty();
- }
- }
-
- /** Tag first when travelling to a version. */
- private Optional<TableSchema> travelToVersion(String version, Options
options) {
- options.remove(CoreOptions.SCAN_VERSION.key());
- if (tagManager().tagExists(version)) {
- options.set(CoreOptions.SCAN_TAG_NAME, version);
- return travelToTag(version, options);
- } else if (version.startsWith(WATERMARK_PREFIX)) {
- long watermark =
Long.parseLong(version.substring(WATERMARK_PREFIX.length()));
- options.set(CoreOptions.SCAN_WATERMARK, watermark);
- return travelToWatermark(watermark, options);
- } else if (version.chars().allMatch(Character::isDigit)) {
- options.set(CoreOptions.SCAN_SNAPSHOT_ID.key(), version);
- return travelToSnapshot(Long.parseLong(version), options);
- } else {
- throw new RuntimeException("Cannot find a time travel version for
" + version);
- }
- }
-
- private Optional<TableSchema> travelToTag(String tagName, Options options)
{
- return
travelToSnapshot(tagManager().getOrThrow(tagName).trimToSnapshot(), options);
- }
-
- private Optional<TableSchema> travelToSnapshot(long snapshotId, Options
options) {
- SnapshotManager snapshotManager = snapshotManager();
- if (snapshotManager.snapshotExists(snapshotId)) {
- return travelToSnapshot(snapshotManager.snapshot(snapshotId),
options);
- }
- return Optional.empty();
- }
-
- private Optional<TableSchema> travelToWatermark(long watermark, Options
options) {
- Snapshot snapshot =
- StaticFromWatermarkStartingScanner.timeTravelToWatermark(
- snapshotManager(), watermark);
- if (snapshot != null) {
- return
Optional.of(schemaManager().schema(snapshot.schemaId()).copy(options.toMap()));
+ Snapshot snapshot;
+ try {
+ snapshot =
+ TimeTravelUtil.tryTravelToSnapshot(options,
snapshotManager(), tagManager())
+ .orElse(null);
+ } catch (Exception e) {
+ return Optional.empty();
}
- return Optional.empty();
- }
-
- private Optional<TableSchema> travelToSnapshot(@Nullable Snapshot
snapshot, Options options) {
- if (snapshot != null) {
- return
Optional.of(schemaManager().schema(snapshot.schemaId()).copy(options.toMap()));
+ if (snapshot == null) {
+ return Optional.empty();
}
- return Optional.empty();
+ return
Optional.of(schemaManager().schema(snapshot.schemaId()).copy(options.toMap()));
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromSnapshotStartingScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromSnapshotStartingScanner.java
index 3621259a6a..ef9119187e 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromSnapshotStartingScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromSnapshotStartingScanner.java
@@ -19,11 +19,11 @@
package org.apache.paimon.table.source.snapshot;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.utils.SnapshotManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.io.FileNotFoundException;
import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -33,9 +33,6 @@ import static
org.apache.paimon.utils.Preconditions.checkArgument;
*/
public class StaticFromSnapshotStartingScanner extends ReadPlanStartingScanner
{
- private static final Logger LOG =
- LoggerFactory.getLogger(StaticFromSnapshotStartingScanner.class);
-
public StaticFromSnapshotStartingScanner(SnapshotManager snapshotManager,
long snapshotId) {
super(snapshotManager);
this.startingSnapshotId = snapshotId;
@@ -48,21 +45,29 @@ public class StaticFromSnapshotStartingScanner extends
ReadPlanStartingScanner {
@Override
public SnapshotReader configure(SnapshotReader snapshotReader) {
- Long earliestSnapshotId = snapshotManager.earliestSnapshotId();
- Long latestSnapshotId = snapshotManager.latestSnapshotId();
+ return
snapshotReader.withMode(ScanMode.ALL).withSnapshot(getSnapshot());
+ }
- if (earliestSnapshotId == null || latestSnapshotId == null) {
- throw new IllegalArgumentException("There is currently no
snapshot.");
- }
+ public Snapshot getSnapshot() {
+ try {
+ return snapshotManager.tryGetSnapshot(startingSnapshotId);
+ } catch (FileNotFoundException e) {
+ Long earliestSnapshotId = snapshotManager.earliestSnapshotId();
+ Long latestSnapshotId = snapshotManager.latestSnapshotId();
- // Checks earlier whether the specified scan snapshot id is valid.
- checkArgument(
- startingSnapshotId >= earliestSnapshotId && startingSnapshotId
<= latestSnapshotId,
- "The specified scan snapshotId %s is out of available
snapshotId range [%s, %s].",
- startingSnapshotId,
- earliestSnapshotId,
- latestSnapshotId);
+ if (earliestSnapshotId == null || latestSnapshotId == null) {
+ throw new IllegalArgumentException("There is currently no
snapshot.");
+ }
- return
snapshotReader.withMode(ScanMode.ALL).withSnapshot(startingSnapshotId);
+ // Checks earlier whether the specified scan snapshot id is valid.
+ checkArgument(
+ startingSnapshotId >= earliestSnapshotId
+ && startingSnapshotId <= latestSnapshotId,
+ "The specified scan snapshotId %s is out of available
snapshotId range [%s, %s].",
+ startingSnapshotId,
+ earliestSnapshotId,
+ latestSnapshotId);
+ throw new RuntimeException(e);
+ }
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTagStartingScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTagStartingScanner.java
index b22e17e9a0..3008c970e2 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTagStartingScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTagStartingScanner.java
@@ -34,6 +34,12 @@ public class StaticFromTagStartingScanner extends
ReadPlanStartingScanner {
this.tagName = tagName;
}
+ public Snapshot getSnapshot() {
+ TagManager tagManager =
+ new TagManager(snapshotManager.fileIO(),
snapshotManager.tablePath());
+ return tagManager.getOrThrow(tagName).trimToSnapshot();
+ }
+
@Override
public ScanMode startingScanMode() {
return ScanMode.ALL;
@@ -41,9 +47,6 @@ public class StaticFromTagStartingScanner extends
ReadPlanStartingScanner {
@Override
public SnapshotReader configure(SnapshotReader snapshotReader) {
- TagManager tagManager =
- new TagManager(snapshotManager.fileIO(),
snapshotManager.tablePath());
- Snapshot snapshot = tagManager.getOrThrow(tagName).trimToSnapshot();
- return snapshotReader.withMode(ScanMode.ALL).withSnapshot(snapshot);
+ return
snapshotReader.withMode(ScanMode.ALL).withSnapshot(getSnapshot());
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTimestampStartingScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTimestampStartingScanner.java
index 0f20938992..7026d2527e 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTimestampStartingScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTimestampStartingScanner.java
@@ -23,9 +23,6 @@ import org.apache.paimon.Snapshot;
import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.utils.SnapshotManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import javax.annotation.Nullable;
/**
@@ -34,15 +31,11 @@ import javax.annotation.Nullable;
*/
public class StaticFromTimestampStartingScanner extends
ReadPlanStartingScanner {
- private static final Logger LOG =
- LoggerFactory.getLogger(StaticFromTimestampStartingScanner.class);
-
- private final long startupMillis;
+ private final Snapshot snapshot;
public StaticFromTimestampStartingScanner(SnapshotManager snapshotManager,
long startupMillis) {
super(snapshotManager);
- this.startupMillis = startupMillis;
- Snapshot snapshot = timeTravelToTimestamp(snapshotManager,
startupMillis);
+ this.snapshot = timeTravelToTimestamp(snapshotManager, startupMillis);
if (snapshot == null) {
Snapshot earliestSnapshot = snapshotManager.earliestSnapshot();
throw new IllegalArgumentException(
@@ -56,9 +49,13 @@ public class StaticFromTimestampStartingScanner extends
ReadPlanStartingScanner
this.startingSnapshotId = snapshot.id();
}
+ public Snapshot getSnapshot() {
+ return snapshot;
+ }
+
@Override
public SnapshotReader configure(SnapshotReader snapshotReader) {
- return
snapshotReader.withMode(ScanMode.ALL).withSnapshot(startingSnapshotId);
+ return snapshotReader.withMode(ScanMode.ALL).withSnapshot(snapshot);
}
@Nullable
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromWatermarkStartingScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromWatermarkStartingScanner.java
index 72fba55337..ad4d1f7892 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromWatermarkStartingScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromWatermarkStartingScanner.java
@@ -34,15 +34,26 @@ public class StaticFromWatermarkStartingScanner extends
ReadPlanStartingScanner
private static final Logger LOG =
LoggerFactory.getLogger(StaticFromWatermarkStartingScanner.class);
- private final long watermark;
+ private final Snapshot snapshot;
public StaticFromWatermarkStartingScanner(SnapshotManager snapshotManager,
long watermark) {
super(snapshotManager);
- this.watermark = watermark;
- Snapshot snapshot = timeTravelToWatermark(snapshotManager, watermark);
- if (snapshot != null) {
- this.startingSnapshotId = snapshot.id();
+ this.snapshot = timeTravelToWatermark(snapshotManager, watermark);
+ if (snapshot == null) {
+ LOG.warn(
+ "There is currently no snapshot later than or equal to
watermark[{}]",
+ watermark);
+ throw new RuntimeException(
+ String.format(
+ "There is currently no snapshot later than or
equal to "
+ + "watermark[%d]",
+ watermark));
}
+ this.startingSnapshotId = snapshot.id();
+ }
+
+ public Snapshot getSnapshot() {
+ return snapshot;
}
@Override
@@ -52,17 +63,7 @@ public class StaticFromWatermarkStartingScanner extends
ReadPlanStartingScanner
@Override
public SnapshotReader configure(SnapshotReader snapshotReader) {
- if (startingSnapshotId == null) {
- LOG.warn(
- "There is currently no snapshot later than or equal to
watermark[{}]",
- watermark);
- throw new RuntimeException(
- String.format(
- "There is currently no snapshot later than or
equal to "
- + "watermark[%d]",
- watermark));
- }
- return
snapshotReader.withMode(ScanMode.ALL).withSnapshot(startingSnapshotId);
+ return snapshotReader.withMode(ScanMode.ALL).withSnapshot(snapshot);
}
@Nullable
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java
index 5f7126d8ee..c2fa1ac5be 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java
@@ -20,13 +20,13 @@ package org.apache.paimon.table.source.snapshot;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
+import org.apache.paimon.options.Options;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.ChangelogManager;
import org.apache.paimon.utils.FunctionWithException;
import org.apache.paimon.utils.SnapshotManager;
-import org.apache.paimon.utils.SnapshotNotExistException;
import org.apache.paimon.utils.TagManager;
import org.slf4j.Logger;
@@ -37,8 +37,8 @@ import javax.annotation.Nullable;
import java.io.FileNotFoundException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
-import static org.apache.paimon.CoreOptions.SCAN_SNAPSHOT_ID;
import static org.apache.paimon.utils.Preconditions.checkArgument;
import static
org.apache.paimon.utils.SnapshotManager.EARLIEST_SNAPSHOT_DEFAULT_RETRY_NUM;
@@ -47,6 +47,8 @@ public class TimeTravelUtil {
private static final Logger LOG =
LoggerFactory.getLogger(TimeTravelUtil.class);
+ private static final String WATERMARK_PREFIX = "watermark-";
+
private static final String[] SCAN_KEYS = {
CoreOptions.SCAN_SNAPSHOT_ID.key(),
CoreOptions.SCAN_TAG_NAME.key(),
@@ -54,21 +56,28 @@ public class TimeTravelUtil {
CoreOptions.SCAN_TIMESTAMP_MILLIS.key()
};
- public static Snapshot resolveSnapshot(FileStoreTable table) {
- return resolveSnapshotFromOptions(table.coreOptions(),
table.snapshotManager());
+ public static Snapshot tryTravelOrLatest(FileStoreTable table) {
+ return tryTravelToSnapshot(table).orElseGet(() ->
table.latestSnapshot().orElse(null));
+ }
+
+ public static Optional<Snapshot> tryTravelToSnapshot(FileStoreTable table)
{
+ return tryTravelToSnapshot(
+ table.coreOptions().toConfiguration(),
table.snapshotManager(), table.tagManager());
}
- public static Snapshot resolveSnapshotFromOptions(
- CoreOptions options, SnapshotManager snapshotManager) {
+ public static Optional<Snapshot> tryTravelToSnapshot(
+ Options options, SnapshotManager snapshotManager, TagManager
tagManager) {
+ adaptScanVersion(options, tagManager);
+
List<String> scanHandleKey = new ArrayList<>(1);
for (String key : SCAN_KEYS) {
- if (options.toConfiguration().containsKey(key)) {
+ if (options.containsKey(key)) {
scanHandleKey.add(key);
}
}
- if (scanHandleKey.size() == 0) {
- return snapshotManager.latestSnapshot();
+ if (scanHandleKey.isEmpty()) {
+ return Optional.empty();
}
checkArgument(
@@ -81,61 +90,49 @@ public class TimeTravelUtil {
CoreOptions.SCAN_TIMESTAMP_MILLIS.key()));
String key = scanHandleKey.get(0);
- Snapshot snapshot = null;
+ CoreOptions coreOptions = new CoreOptions(options);
+ Snapshot snapshot;
if (key.equals(CoreOptions.SCAN_SNAPSHOT_ID.key())) {
- snapshot = resolveSnapshotBySnapshotId(snapshotManager, options);
+ snapshot =
+ new StaticFromSnapshotStartingScanner(
+ snapshotManager,
coreOptions.scanSnapshotId())
+ .getSnapshot();
} else if (key.equals(CoreOptions.SCAN_WATERMARK.key())) {
- snapshot = resolveSnapshotByWatermark(snapshotManager, options);
+ snapshot =
+ new StaticFromWatermarkStartingScanner(
+ snapshotManager,
coreOptions.scanWatermark())
+ .getSnapshot();
} else if (key.equals(CoreOptions.SCAN_TIMESTAMP_MILLIS.key())) {
- snapshot = resolveSnapshotByTimestamp(snapshotManager, options);
+ snapshot =
+ new StaticFromTimestampStartingScanner(
+ snapshotManager,
coreOptions.scanTimestampMills())
+ .getSnapshot();
} else if (key.equals(CoreOptions.SCAN_TAG_NAME.key())) {
- snapshot = resolveSnapshotByTagName(snapshotManager, options);
- }
-
- if (snapshot == null) {
- snapshot = snapshotManager.latestSnapshot();
+ snapshot =
+ new StaticFromTagStartingScanner(snapshotManager,
coreOptions.scanTagName())
+ .getSnapshot();
+ } else {
+ throw new UnsupportedOperationException("Unsupported time travel
mode: " + key);
}
- return snapshot;
+ return Optional.of(snapshot);
}
- private static Snapshot resolveSnapshotBySnapshotId(
- SnapshotManager snapshotManager, CoreOptions options) {
- Long snapshotId = options.scanSnapshotId();
- if (snapshotId != null) {
- if (!snapshotManager.snapshotExists(snapshotId)) {
- Long earliestSnapshotId = snapshotManager.earliestSnapshotId();
- Long latestSnapshotId = snapshotManager.latestSnapshotId();
- throw new SnapshotNotExistException(
- String.format(
- "Specified parameter %s = %s is not exist, you
can set it in range from %s to %s.",
- SCAN_SNAPSHOT_ID.key(),
- snapshotId,
- earliestSnapshotId,
- latestSnapshotId));
- }
- return snapshotManager.snapshot(snapshotId);
+ private static void adaptScanVersion(Options options, TagManager
tagManager) {
+ String version = options.remove(CoreOptions.SCAN_VERSION.key());
+ if (version == null) {
+ return;
}
- return null;
- }
-
- private static Snapshot resolveSnapshotByTimestamp(
- SnapshotManager snapshotManager, CoreOptions options) {
- Long timestamp = options.scanTimestampMills();
- return snapshotManager.earlierOrEqualTimeMills(timestamp);
- }
- private static Snapshot resolveSnapshotByWatermark(
- SnapshotManager snapshotManager, CoreOptions options) {
- Long watermark = options.scanWatermark();
- return snapshotManager.laterOrEqualWatermark(watermark);
- }
-
- private static Snapshot resolveSnapshotByTagName(
- SnapshotManager snapshotManager, CoreOptions options) {
- String tagName = options.scanTagName();
- TagManager tagManager =
- new TagManager(snapshotManager.fileIO(),
snapshotManager.tablePath());
- return tagManager.getOrThrow(tagName).trimToSnapshot();
+ if (tagManager.tagExists(version)) {
+ options.set(CoreOptions.SCAN_TAG_NAME, version);
+ } else if (version.startsWith(WATERMARK_PREFIX)) {
+ long watermark =
Long.parseLong(version.substring(WATERMARK_PREFIX.length()));
+ options.set(CoreOptions.SCAN_WATERMARK, watermark);
+ } else if (version.chars().allMatch(Character::isDigit)) {
+ options.set(CoreOptions.SCAN_SNAPSHOT_ID.key(), version);
+ } else {
+ throw new RuntimeException("Cannot find a time travel version for
" + version);
+ }
}
/**
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java
index 549fa6d83f..d1e3307b9b 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java
@@ -229,7 +229,7 @@ public class ManifestsTable implements ReadonlyTable {
private static List<ManifestFileMeta> allManifests(FileStoreTable
dataTable) {
CoreOptions options = dataTable.coreOptions();
- Snapshot snapshot = TimeTravelUtil.resolveSnapshot(dataTable);
+ Snapshot snapshot = TimeTravelUtil.tryTravelOrLatest(dataTable);
if (snapshot == null) {
LOG.warn("Check if your snapshot is empty.");
return Collections.emptyList();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/TableIndexesTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/TableIndexesTable.java
index 647bdd389c..b63cd604a7 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/TableIndexesTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/TableIndexesTable.java
@@ -230,7 +230,7 @@ public class TableIndexesTable implements ReadonlyTable {
private static List<IndexManifestEntry> allIndexEntries(FileStoreTable
dataTable) {
IndexFileHandler indexFileHandler =
dataTable.store().newIndexFileHandler();
- Snapshot snapshot = TimeTravelUtil.resolveSnapshot(dataTable);
+ Snapshot snapshot = TimeTravelUtil.tryTravelOrLatest(dataTable);
if (snapshot == null) {
LOG.warn("Check if your snapshot is empty.");
return Collections.emptyList();
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/TimeTravelUtilsTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/TimeTravelUtilsTest.java
index 89b3bff664..36ed5d23bf 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/TimeTravelUtilsTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/TimeTravelUtilsTest.java
@@ -37,7 +37,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class TimeTravelUtilsTest extends ScannerTestBase {
@Test
- public void testResolveSnapshotFromOptions() throws Exception {
+ public void testtryTravelToSnapshot() throws Exception {
SnapshotManager snapshotManager = table.snapshotManager();
StreamTableWrite write = table.newWrite(commitUser);
StreamTableCommit commit = table.newCommit(commitUser);
@@ -56,21 +56,27 @@ public class TimeTravelUtilsTest extends ScannerTestBase {
HashMap<String, String> optMap = new HashMap<>(4);
optMap.put("scan.snapshot-id", "2");
CoreOptions options = CoreOptions.fromMap(optMap);
- Snapshot snapshot = TimeTravelUtil.resolveSnapshotFromOptions(options,
snapshotManager);
+ Snapshot snapshot =
+ TimeTravelUtil.tryTravelToSnapshot(options.toConfiguration(),
snapshotManager, null)
+ .orElse(null);
assertNotNull(snapshot);
assertTrue(snapshot.id() == 2);
optMap.clear();
optMap.put("scan.timestamp-millis", ts + "");
options = CoreOptions.fromMap(optMap);
- snapshot = TimeTravelUtil.resolveSnapshotFromOptions(options,
snapshotManager);
+ snapshot =
+ TimeTravelUtil.tryTravelToSnapshot(options.toConfiguration(),
snapshotManager, null)
+ .orElse(null);
assertTrue(snapshot.id() == 1);
table.createTag("tag3", 3);
optMap.clear();
optMap.put("scan.tag-name", "tag3");
options = CoreOptions.fromMap(optMap);
- snapshot = TimeTravelUtil.resolveSnapshotFromOptions(options,
snapshotManager);
+ snapshot =
+ TimeTravelUtil.tryTravelToSnapshot(options.toConfiguration(),
snapshotManager, null)
+ .orElse(null);
assertTrue(snapshot.id() == 3);
// if contain more scan.xxx config would throw out
@@ -78,7 +84,9 @@ public class TimeTravelUtilsTest extends ScannerTestBase {
CoreOptions options1 = CoreOptions.fromMap(optMap);
assertThrows(
IllegalArgumentException.class,
- () -> TimeTravelUtil.resolveSnapshotFromOptions(options1,
snapshotManager),
+ () ->
+ TimeTravelUtil.tryTravelToSnapshot(
+ options1.toConfiguration(), snapshotManager,
null),
"scan.snapshot-id scan.tag-name scan.watermark and
scan.timestamp-millis can contains only one");
assertThat(
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/system/ManifestsTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/system/ManifestsTableTest.java
index c5bb5282fd..c614ca2c27 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/system/ManifestsTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/system/ManifestsTableTest.java
@@ -36,7 +36,6 @@ import org.apache.paimon.table.Table;
import org.apache.paimon.table.TableTestBase;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.utils.SnapshotManager;
-import org.apache.paimon.utils.SnapshotNotExistException;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -157,7 +156,7 @@ public class ManifestsTableTest extends TableTestBase {
manifestsTable.copy(
Collections.singletonMap(CoreOptions.SCAN_SNAPSHOT_ID.key(), "3"));
assertThrows(
- SnapshotNotExistException.class,
+ Exception.class,
() -> read(manifestsTable),
"Specified parameter scan.snapshot-id = 3 is not exist, you
can set it in range from 1 to 2");
}
diff --git
a/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
b/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index 80575c0492..b5e355ee92 100644
---
a/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++
b/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -80,7 +80,7 @@ public class BatchFileStoreITCase extends CatalogITCaseBase {
assertThatThrownBy(() -> batchSql("SELECT * FROM T /*+
OPTIONS('scan.snapshot-id'='0') */"))
.satisfies(
anyCauseMatches(
- IllegalArgumentException.class,
+ Exception.class,
"The specified scan snapshotId 0 is out of
available snapshotId range [1, 4]."));
assertThatThrownBy(
@@ -89,7 +89,7 @@ public class BatchFileStoreITCase extends CatalogITCaseBase {
"SELECT * FROM T /*+
OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id'='0') */"))
.satisfies(
anyCauseMatches(
- IllegalArgumentException.class,
+ Exception.class,
"The specified scan snapshotId 0 is out of
available snapshotId range [1, 4]."));
assertThat(
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index 5f9773a0cf..144a2d5a8f 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -25,7 +25,6 @@ import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.snapshot.TimeTravelUtil;
import org.apache.paimon.utils.BlockingIterator;
import org.apache.paimon.utils.DateTimeUtils;
-import org.apache.paimon.utils.SnapshotNotExistException;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
@@ -120,8 +119,8 @@ public class BatchFileStoreITCase extends CatalogITCaseBase
{
assertThatThrownBy(() -> batchSql("SELECT * FROM T /*+
OPTIONS('scan.snapshot-id'='0') */"))
.satisfies(
anyCauseMatches(
- SnapshotNotExistException.class,
- "Specified parameter scan.snapshot-id = 0 is
not exist, you can set it in range from 1 to 4."));
+ Exception.class,
+ "The specified scan snapshotId 0 is out of
available snapshotId range [1, 4]."));
assertThatThrownBy(
() ->
@@ -129,8 +128,8 @@ public class BatchFileStoreITCase extends CatalogITCaseBase
{
"SELECT * FROM T /*+
OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id'='0') */"))
.satisfies(
anyCauseMatches(
- SnapshotNotExistException.class,
- "Specified parameter scan.snapshot-id = 0 is
not exist, you can set it in range from 1 to 4."));
+ Exception.class,
+ "The specified scan snapshotId 0 is out of
available snapshotId range [1, 4]."));
assertThat(
batchSql(