This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new f4cd8c20cc [core] Unify timetravel processing to starting scanners 
(#5554)
f4cd8c20cc is described below

commit f4cd8c20cca6b2a052f2c93431b9a7af92ff7241
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Apr 29 20:39:26 2025 +0800

    [core] Unify timetravel processing to starting scanners (#5554)
---
 .../paimon/table/AbstractFileStoreTable.java       |  81 +++-------------
 .../StaticFromSnapshotStartingScanner.java         |  41 ++++----
 .../snapshot/StaticFromTagStartingScanner.java     |  11 ++-
 .../StaticFromTimestampStartingScanner.java        |  17 ++--
 .../StaticFromWatermarkStartingScanner.java        |  33 ++++---
 .../table/source/snapshot/TimeTravelUtil.java      | 107 ++++++++++-----------
 .../apache/paimon/table/system/ManifestsTable.java |   2 +-
 .../paimon/table/system/TableIndexesTable.java     |   2 +-
 .../table/source/snapshot/TimeTravelUtilsTest.java |  18 +++-
 .../paimon/table/system/ManifestsTableTest.java    |   3 +-
 .../apache/paimon/flink/BatchFileStoreITCase.java  |   4 +-
 .../apache/paimon/flink/BatchFileStoreITCase.java  |   9 +-
 12 files changed, 139 insertions(+), 189 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index f8090918eb..1e494799a4 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -50,8 +50,6 @@ import org.apache.paimon.table.source.SplitGenerator;
 import org.apache.paimon.table.source.StreamDataTableScan;
 import org.apache.paimon.table.source.snapshot.SnapshotReader;
 import org.apache.paimon.table.source.snapshot.SnapshotReaderImpl;
-import 
org.apache.paimon.table.source.snapshot.StaticFromTimestampStartingScanner;
-import 
org.apache.paimon.table.source.snapshot.StaticFromWatermarkStartingScanner;
 import org.apache.paimon.table.source.snapshot.TimeTravelUtil;
 import org.apache.paimon.utils.BranchManager;
 import org.apache.paimon.utils.CatalogBranchManager;
@@ -192,7 +190,7 @@ abstract class AbstractFileStoreTable implements 
FileStoreTable {
 
     @Override
     public Optional<Statistics> statistics() {
-        Snapshot snapshot = TimeTravelUtil.resolveSnapshot(this);
+        Snapshot snapshot = TimeTravelUtil.tryTravelOrLatest(this);
         if (snapshot != null) {
             String file = snapshot.statistics();
             if (file == null) {
@@ -480,75 +478,18 @@ abstract class AbstractFileStoreTable implements 
FileStoreTable {
     }
 
     private Optional<TableSchema> tryTimeTravel(Options options) {
-        CoreOptions coreOptions = new CoreOptions(options);
-
-        switch (coreOptions.startupMode()) {
-            case FROM_SNAPSHOT:
-            case FROM_SNAPSHOT_FULL:
-                if (coreOptions.scanVersion() != null) {
-                    return travelToVersion(coreOptions.scanVersion(), options);
-                } else if (coreOptions.scanSnapshotId() != null) {
-                    return travelToSnapshot(coreOptions.scanSnapshotId(), 
options);
-                } else if (coreOptions.scanWatermark() != null) {
-                    return travelToWatermark(coreOptions.scanWatermark(), 
options);
-                } else {
-                    return travelToTag(coreOptions.scanTagName(), options);
-                }
-            case FROM_TIMESTAMP:
-                Snapshot snapshot =
-                        
StaticFromTimestampStartingScanner.timeTravelToTimestamp(
-                                snapshotManager(), 
coreOptions.scanTimestampMills());
-                return travelToSnapshot(snapshot, options);
-            default:
-                return Optional.empty();
-        }
-    }
-
-    /** Tag first when travelling to a version. */
-    private Optional<TableSchema> travelToVersion(String version, Options 
options) {
-        options.remove(CoreOptions.SCAN_VERSION.key());
-        if (tagManager().tagExists(version)) {
-            options.set(CoreOptions.SCAN_TAG_NAME, version);
-            return travelToTag(version, options);
-        } else if (version.startsWith(WATERMARK_PREFIX)) {
-            long watermark = 
Long.parseLong(version.substring(WATERMARK_PREFIX.length()));
-            options.set(CoreOptions.SCAN_WATERMARK, watermark);
-            return travelToWatermark(watermark, options);
-        } else if (version.chars().allMatch(Character::isDigit)) {
-            options.set(CoreOptions.SCAN_SNAPSHOT_ID.key(), version);
-            return travelToSnapshot(Long.parseLong(version), options);
-        } else {
-            throw new RuntimeException("Cannot find a time travel version for 
" + version);
-        }
-    }
-
-    private Optional<TableSchema> travelToTag(String tagName, Options options) 
{
-        return 
travelToSnapshot(tagManager().getOrThrow(tagName).trimToSnapshot(), options);
-    }
-
-    private Optional<TableSchema> travelToSnapshot(long snapshotId, Options 
options) {
-        SnapshotManager snapshotManager = snapshotManager();
-        if (snapshotManager.snapshotExists(snapshotId)) {
-            return travelToSnapshot(snapshotManager.snapshot(snapshotId), 
options);
-        }
-        return Optional.empty();
-    }
-
-    private Optional<TableSchema> travelToWatermark(long watermark, Options 
options) {
-        Snapshot snapshot =
-                StaticFromWatermarkStartingScanner.timeTravelToWatermark(
-                        snapshotManager(), watermark);
-        if (snapshot != null) {
-            return 
Optional.of(schemaManager().schema(snapshot.schemaId()).copy(options.toMap()));
+        Snapshot snapshot;
+        try {
+            snapshot =
+                    TimeTravelUtil.tryTravelToSnapshot(options, 
snapshotManager(), tagManager())
+                            .orElse(null);
+        } catch (Exception e) {
+            return Optional.empty();
         }
-        return Optional.empty();
-    }
-
-    private Optional<TableSchema> travelToSnapshot(@Nullable Snapshot 
snapshot, Options options) {
-        if (snapshot != null) {
-            return 
Optional.of(schemaManager().schema(snapshot.schemaId()).copy(options.toMap()));
+        if (snapshot == null) {
+            return Optional.empty();
         }
-        return Optional.empty();
+        return 
Optional.of(schemaManager().schema(snapshot.schemaId()).copy(options.toMap()));
     }
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromSnapshotStartingScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromSnapshotStartingScanner.java
index 3621259a6a..ef9119187e 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromSnapshotStartingScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromSnapshotStartingScanner.java
@@ -19,11 +19,11 @@
 package org.apache.paimon.table.source.snapshot;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
 import org.apache.paimon.table.source.ScanMode;
 import org.apache.paimon.utils.SnapshotManager;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.io.FileNotFoundException;
 
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
@@ -33,9 +33,6 @@ import static 
org.apache.paimon.utils.Preconditions.checkArgument;
  */
 public class StaticFromSnapshotStartingScanner extends ReadPlanStartingScanner 
{
 
-    private static final Logger LOG =
-            LoggerFactory.getLogger(StaticFromSnapshotStartingScanner.class);
-
     public StaticFromSnapshotStartingScanner(SnapshotManager snapshotManager, 
long snapshotId) {
         super(snapshotManager);
         this.startingSnapshotId = snapshotId;
@@ -48,21 +45,29 @@ public class StaticFromSnapshotStartingScanner extends 
ReadPlanStartingScanner {
 
     @Override
     public SnapshotReader configure(SnapshotReader snapshotReader) {
-        Long earliestSnapshotId = snapshotManager.earliestSnapshotId();
-        Long latestSnapshotId = snapshotManager.latestSnapshotId();
+        return 
snapshotReader.withMode(ScanMode.ALL).withSnapshot(getSnapshot());
+    }
 
-        if (earliestSnapshotId == null || latestSnapshotId == null) {
-            throw new IllegalArgumentException("There is currently no 
snapshot.");
-        }
+    public Snapshot getSnapshot() {
+        try {
+            return snapshotManager.tryGetSnapshot(startingSnapshotId);
+        } catch (FileNotFoundException e) {
+            Long earliestSnapshotId = snapshotManager.earliestSnapshotId();
+            Long latestSnapshotId = snapshotManager.latestSnapshotId();
 
-        // Checks earlier whether the specified scan snapshot id is valid.
-        checkArgument(
-                startingSnapshotId >= earliestSnapshotId && startingSnapshotId 
<= latestSnapshotId,
-                "The specified scan snapshotId %s is out of available 
snapshotId range [%s, %s].",
-                startingSnapshotId,
-                earliestSnapshotId,
-                latestSnapshotId);
+            if (earliestSnapshotId == null || latestSnapshotId == null) {
+                throw new IllegalArgumentException("There is currently no 
snapshot.");
+            }
 
-        return 
snapshotReader.withMode(ScanMode.ALL).withSnapshot(startingSnapshotId);
+            // Checks earlier whether the specified scan snapshot id is valid.
+            checkArgument(
+                    startingSnapshotId >= earliestSnapshotId
+                            && startingSnapshotId <= latestSnapshotId,
+                    "The specified scan snapshotId %s is out of available 
snapshotId range [%s, %s].",
+                    startingSnapshotId,
+                    earliestSnapshotId,
+                    latestSnapshotId);
+            throw new RuntimeException(e);
+        }
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTagStartingScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTagStartingScanner.java
index b22e17e9a0..3008c970e2 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTagStartingScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTagStartingScanner.java
@@ -34,6 +34,12 @@ public class StaticFromTagStartingScanner extends 
ReadPlanStartingScanner {
         this.tagName = tagName;
     }
 
+    public Snapshot getSnapshot() {
+        TagManager tagManager =
+                new TagManager(snapshotManager.fileIO(), 
snapshotManager.tablePath());
+        return tagManager.getOrThrow(tagName).trimToSnapshot();
+    }
+
     @Override
     public ScanMode startingScanMode() {
         return ScanMode.ALL;
@@ -41,9 +47,6 @@ public class StaticFromTagStartingScanner extends 
ReadPlanStartingScanner {
 
     @Override
     public SnapshotReader configure(SnapshotReader snapshotReader) {
-        TagManager tagManager =
-                new TagManager(snapshotManager.fileIO(), 
snapshotManager.tablePath());
-        Snapshot snapshot = tagManager.getOrThrow(tagName).trimToSnapshot();
-        return snapshotReader.withMode(ScanMode.ALL).withSnapshot(snapshot);
+        return 
snapshotReader.withMode(ScanMode.ALL).withSnapshot(getSnapshot());
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTimestampStartingScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTimestampStartingScanner.java
index 0f20938992..7026d2527e 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTimestampStartingScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTimestampStartingScanner.java
@@ -23,9 +23,6 @@ import org.apache.paimon.Snapshot;
 import org.apache.paimon.table.source.ScanMode;
 import org.apache.paimon.utils.SnapshotManager;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import javax.annotation.Nullable;
 
 /**
@@ -34,15 +31,11 @@ import javax.annotation.Nullable;
  */
 public class StaticFromTimestampStartingScanner extends 
ReadPlanStartingScanner {
 
-    private static final Logger LOG =
-            LoggerFactory.getLogger(StaticFromTimestampStartingScanner.class);
-
-    private final long startupMillis;
+    private final Snapshot snapshot;
 
     public StaticFromTimestampStartingScanner(SnapshotManager snapshotManager, 
long startupMillis) {
         super(snapshotManager);
-        this.startupMillis = startupMillis;
-        Snapshot snapshot = timeTravelToTimestamp(snapshotManager, 
startupMillis);
+        this.snapshot = timeTravelToTimestamp(snapshotManager, startupMillis);
         if (snapshot == null) {
             Snapshot earliestSnapshot = snapshotManager.earliestSnapshot();
             throw new IllegalArgumentException(
@@ -56,9 +49,13 @@ public class StaticFromTimestampStartingScanner extends 
ReadPlanStartingScanner
         this.startingSnapshotId = snapshot.id();
     }
 
+    public Snapshot getSnapshot() {
+        return snapshot;
+    }
+
     @Override
     public SnapshotReader configure(SnapshotReader snapshotReader) {
-        return 
snapshotReader.withMode(ScanMode.ALL).withSnapshot(startingSnapshotId);
+        return snapshotReader.withMode(ScanMode.ALL).withSnapshot(snapshot);
     }
 
     @Nullable
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromWatermarkStartingScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromWatermarkStartingScanner.java
index 72fba55337..ad4d1f7892 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromWatermarkStartingScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromWatermarkStartingScanner.java
@@ -34,15 +34,26 @@ public class StaticFromWatermarkStartingScanner extends 
ReadPlanStartingScanner
     private static final Logger LOG =
             LoggerFactory.getLogger(StaticFromWatermarkStartingScanner.class);
 
-    private final long watermark;
+    private final Snapshot snapshot;
 
     public StaticFromWatermarkStartingScanner(SnapshotManager snapshotManager, 
long watermark) {
         super(snapshotManager);
-        this.watermark = watermark;
-        Snapshot snapshot = timeTravelToWatermark(snapshotManager, watermark);
-        if (snapshot != null) {
-            this.startingSnapshotId = snapshot.id();
+        this.snapshot = timeTravelToWatermark(snapshotManager, watermark);
+        if (snapshot == null) {
+            LOG.warn(
+                    "There is currently no snapshot later than or equal to 
watermark[{}]",
+                    watermark);
+            throw new RuntimeException(
+                    String.format(
+                            "There is currently no snapshot later than or 
equal to "
+                                    + "watermark[%d]",
+                            watermark));
         }
+        this.startingSnapshotId = snapshot.id();
+    }
+
+    public Snapshot getSnapshot() {
+        return snapshot;
     }
 
     @Override
@@ -52,17 +63,7 @@ public class StaticFromWatermarkStartingScanner extends 
ReadPlanStartingScanner
 
     @Override
     public SnapshotReader configure(SnapshotReader snapshotReader) {
-        if (startingSnapshotId == null) {
-            LOG.warn(
-                    "There is currently no snapshot later than or equal to 
watermark[{}]",
-                    watermark);
-            throw new RuntimeException(
-                    String.format(
-                            "There is currently no snapshot later than or 
equal to "
-                                    + "watermark[%d]",
-                            watermark));
-        }
-        return 
snapshotReader.withMode(ScanMode.ALL).withSnapshot(startingSnapshotId);
+        return snapshotReader.withMode(ScanMode.ALL).withSnapshot(snapshot);
     }
 
     @Nullable
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java
index 5f7126d8ee..c2fa1ac5be 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java
@@ -20,13 +20,13 @@ package org.apache.paimon.table.source.snapshot;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.Snapshot;
+import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.utils.ChangelogManager;
 import org.apache.paimon.utils.FunctionWithException;
 import org.apache.paimon.utils.SnapshotManager;
-import org.apache.paimon.utils.SnapshotNotExistException;
 import org.apache.paimon.utils.TagManager;
 
 import org.slf4j.Logger;
@@ -37,8 +37,8 @@ import javax.annotation.Nullable;
 import java.io.FileNotFoundException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 
-import static org.apache.paimon.CoreOptions.SCAN_SNAPSHOT_ID;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 import static 
org.apache.paimon.utils.SnapshotManager.EARLIEST_SNAPSHOT_DEFAULT_RETRY_NUM;
 
@@ -47,6 +47,8 @@ public class TimeTravelUtil {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(TimeTravelUtil.class);
 
+    private static final String WATERMARK_PREFIX = "watermark-";
+
     private static final String[] SCAN_KEYS = {
         CoreOptions.SCAN_SNAPSHOT_ID.key(),
         CoreOptions.SCAN_TAG_NAME.key(),
@@ -54,21 +56,28 @@ public class TimeTravelUtil {
         CoreOptions.SCAN_TIMESTAMP_MILLIS.key()
     };
 
-    public static Snapshot resolveSnapshot(FileStoreTable table) {
-        return resolveSnapshotFromOptions(table.coreOptions(), 
table.snapshotManager());
+    public static Snapshot tryTravelOrLatest(FileStoreTable table) {
+        return tryTravelToSnapshot(table).orElseGet(() -> 
table.latestSnapshot().orElse(null));
+    }
+
+    public static Optional<Snapshot> tryTravelToSnapshot(FileStoreTable table) 
{
+        return tryTravelToSnapshot(
+                table.coreOptions().toConfiguration(), 
table.snapshotManager(), table.tagManager());
     }
 
-    public static Snapshot resolveSnapshotFromOptions(
-            CoreOptions options, SnapshotManager snapshotManager) {
+    public static Optional<Snapshot> tryTravelToSnapshot(
+            Options options, SnapshotManager snapshotManager, TagManager 
tagManager) {
+        adaptScanVersion(options, tagManager);
+
         List<String> scanHandleKey = new ArrayList<>(1);
         for (String key : SCAN_KEYS) {
-            if (options.toConfiguration().containsKey(key)) {
+            if (options.containsKey(key)) {
                 scanHandleKey.add(key);
             }
         }
 
-        if (scanHandleKey.size() == 0) {
-            return snapshotManager.latestSnapshot();
+        if (scanHandleKey.isEmpty()) {
+            return Optional.empty();
         }
 
         checkArgument(
@@ -81,61 +90,49 @@ public class TimeTravelUtil {
                         CoreOptions.SCAN_TIMESTAMP_MILLIS.key()));
 
         String key = scanHandleKey.get(0);
-        Snapshot snapshot = null;
+        CoreOptions coreOptions = new CoreOptions(options);
+        Snapshot snapshot;
         if (key.equals(CoreOptions.SCAN_SNAPSHOT_ID.key())) {
-            snapshot = resolveSnapshotBySnapshotId(snapshotManager, options);
+            snapshot =
+                    new StaticFromSnapshotStartingScanner(
+                                    snapshotManager, 
coreOptions.scanSnapshotId())
+                            .getSnapshot();
         } else if (key.equals(CoreOptions.SCAN_WATERMARK.key())) {
-            snapshot = resolveSnapshotByWatermark(snapshotManager, options);
+            snapshot =
+                    new StaticFromWatermarkStartingScanner(
+                                    snapshotManager, 
coreOptions.scanWatermark())
+                            .getSnapshot();
         } else if (key.equals(CoreOptions.SCAN_TIMESTAMP_MILLIS.key())) {
-            snapshot = resolveSnapshotByTimestamp(snapshotManager, options);
+            snapshot =
+                    new StaticFromTimestampStartingScanner(
+                                    snapshotManager, 
coreOptions.scanTimestampMills())
+                            .getSnapshot();
         } else if (key.equals(CoreOptions.SCAN_TAG_NAME.key())) {
-            snapshot = resolveSnapshotByTagName(snapshotManager, options);
-        }
-
-        if (snapshot == null) {
-            snapshot = snapshotManager.latestSnapshot();
+            snapshot =
+                    new StaticFromTagStartingScanner(snapshotManager, 
coreOptions.scanTagName())
+                            .getSnapshot();
+        } else {
+            throw new UnsupportedOperationException("Unsupported time travel 
mode: " + key);
         }
-        return snapshot;
+        return Optional.of(snapshot);
     }
 
-    private static Snapshot resolveSnapshotBySnapshotId(
-            SnapshotManager snapshotManager, CoreOptions options) {
-        Long snapshotId = options.scanSnapshotId();
-        if (snapshotId != null) {
-            if (!snapshotManager.snapshotExists(snapshotId)) {
-                Long earliestSnapshotId = snapshotManager.earliestSnapshotId();
-                Long latestSnapshotId = snapshotManager.latestSnapshotId();
-                throw new SnapshotNotExistException(
-                        String.format(
-                                "Specified parameter %s = %s is not exist, you 
can set it in range from %s to %s.",
-                                SCAN_SNAPSHOT_ID.key(),
-                                snapshotId,
-                                earliestSnapshotId,
-                                latestSnapshotId));
-            }
-            return snapshotManager.snapshot(snapshotId);
+    private static void adaptScanVersion(Options options, TagManager 
tagManager) {
+        String version = options.remove(CoreOptions.SCAN_VERSION.key());
+        if (version == null) {
+            return;
         }
-        return null;
-    }
-
-    private static Snapshot resolveSnapshotByTimestamp(
-            SnapshotManager snapshotManager, CoreOptions options) {
-        Long timestamp = options.scanTimestampMills();
-        return snapshotManager.earlierOrEqualTimeMills(timestamp);
-    }
 
-    private static Snapshot resolveSnapshotByWatermark(
-            SnapshotManager snapshotManager, CoreOptions options) {
-        Long watermark = options.scanWatermark();
-        return snapshotManager.laterOrEqualWatermark(watermark);
-    }
-
-    private static Snapshot resolveSnapshotByTagName(
-            SnapshotManager snapshotManager, CoreOptions options) {
-        String tagName = options.scanTagName();
-        TagManager tagManager =
-                new TagManager(snapshotManager.fileIO(), 
snapshotManager.tablePath());
-        return tagManager.getOrThrow(tagName).trimToSnapshot();
+        if (tagManager.tagExists(version)) {
+            options.set(CoreOptions.SCAN_TAG_NAME, version);
+        } else if (version.startsWith(WATERMARK_PREFIX)) {
+            long watermark = 
Long.parseLong(version.substring(WATERMARK_PREFIX.length()));
+            options.set(CoreOptions.SCAN_WATERMARK, watermark);
+        } else if (version.chars().allMatch(Character::isDigit)) {
+            options.set(CoreOptions.SCAN_SNAPSHOT_ID.key(), version);
+        } else {
+            throw new RuntimeException("Cannot find a time travel version for 
" + version);
+        }
     }
 
     /**
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java
index 549fa6d83f..d1e3307b9b 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java
@@ -229,7 +229,7 @@ public class ManifestsTable implements ReadonlyTable {
 
     private static List<ManifestFileMeta> allManifests(FileStoreTable 
dataTable) {
         CoreOptions options = dataTable.coreOptions();
-        Snapshot snapshot = TimeTravelUtil.resolveSnapshot(dataTable);
+        Snapshot snapshot = TimeTravelUtil.tryTravelOrLatest(dataTable);
         if (snapshot == null) {
             LOG.warn("Check if your snapshot is empty.");
             return Collections.emptyList();
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/TableIndexesTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/system/TableIndexesTable.java
index 647bdd389c..b63cd604a7 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/system/TableIndexesTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/system/TableIndexesTable.java
@@ -230,7 +230,7 @@ public class TableIndexesTable implements ReadonlyTable {
 
     private static List<IndexManifestEntry> allIndexEntries(FileStoreTable 
dataTable) {
         IndexFileHandler indexFileHandler = 
dataTable.store().newIndexFileHandler();
-        Snapshot snapshot = TimeTravelUtil.resolveSnapshot(dataTable);
+        Snapshot snapshot = TimeTravelUtil.tryTravelOrLatest(dataTable);
         if (snapshot == null) {
             LOG.warn("Check if your snapshot is empty.");
             return Collections.emptyList();
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/TimeTravelUtilsTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/TimeTravelUtilsTest.java
index 89b3bff664..36ed5d23bf 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/TimeTravelUtilsTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/TimeTravelUtilsTest.java
@@ -37,7 +37,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 public class TimeTravelUtilsTest extends ScannerTestBase {
 
     @Test
-    public void testResolveSnapshotFromOptions() throws Exception {
+    public void testtryTravelToSnapshot() throws Exception {
         SnapshotManager snapshotManager = table.snapshotManager();
         StreamTableWrite write = table.newWrite(commitUser);
         StreamTableCommit commit = table.newCommit(commitUser);
@@ -56,21 +56,27 @@ public class TimeTravelUtilsTest extends ScannerTestBase {
         HashMap<String, String> optMap = new HashMap<>(4);
         optMap.put("scan.snapshot-id", "2");
         CoreOptions options = CoreOptions.fromMap(optMap);
-        Snapshot snapshot = TimeTravelUtil.resolveSnapshotFromOptions(options, 
snapshotManager);
+        Snapshot snapshot =
+                TimeTravelUtil.tryTravelToSnapshot(options.toConfiguration(), 
snapshotManager, null)
+                        .orElse(null);
         assertNotNull(snapshot);
         assertTrue(snapshot.id() == 2);
 
         optMap.clear();
         optMap.put("scan.timestamp-millis", ts + "");
         options = CoreOptions.fromMap(optMap);
-        snapshot = TimeTravelUtil.resolveSnapshotFromOptions(options, 
snapshotManager);
+        snapshot =
+                TimeTravelUtil.tryTravelToSnapshot(options.toConfiguration(), 
snapshotManager, null)
+                        .orElse(null);
         assertTrue(snapshot.id() == 1);
 
         table.createTag("tag3", 3);
         optMap.clear();
         optMap.put("scan.tag-name", "tag3");
         options = CoreOptions.fromMap(optMap);
-        snapshot = TimeTravelUtil.resolveSnapshotFromOptions(options, 
snapshotManager);
+        snapshot =
+                TimeTravelUtil.tryTravelToSnapshot(options.toConfiguration(), 
snapshotManager, null)
+                        .orElse(null);
         assertTrue(snapshot.id() == 3);
 
         // if contain more scan.xxx config would throw out
@@ -78,7 +84,9 @@ public class TimeTravelUtilsTest extends ScannerTestBase {
         CoreOptions options1 = CoreOptions.fromMap(optMap);
         assertThrows(
                 IllegalArgumentException.class,
-                () -> TimeTravelUtil.resolveSnapshotFromOptions(options1, 
snapshotManager),
+                () ->
+                        TimeTravelUtil.tryTravelToSnapshot(
+                                options1.toConfiguration(), snapshotManager, 
null),
                 "scan.snapshot-id scan.tag-name scan.watermark and 
scan.timestamp-millis can contains only one");
 
         assertThat(
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/system/ManifestsTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/system/ManifestsTableTest.java
index c5bb5282fd..c614ca2c27 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/system/ManifestsTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/system/ManifestsTableTest.java
@@ -36,7 +36,6 @@ import org.apache.paimon.table.Table;
 import org.apache.paimon.table.TableTestBase;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.utils.SnapshotManager;
-import org.apache.paimon.utils.SnapshotNotExistException;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -157,7 +156,7 @@ public class ManifestsTableTest extends TableTestBase {
                         manifestsTable.copy(
                                 
Collections.singletonMap(CoreOptions.SCAN_SNAPSHOT_ID.key(), "3"));
         assertThrows(
-                SnapshotNotExistException.class,
+                Exception.class,
                 () -> read(manifestsTable),
                 "Specified parameter scan.snapshot-id = 3 is not exist, you 
can set it in range from 1 to 2");
     }
diff --git 
a/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
 
b/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index 80575c0492..b5e355ee92 100644
--- 
a/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++ 
b/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -80,7 +80,7 @@ public class BatchFileStoreITCase extends CatalogITCaseBase {
         assertThatThrownBy(() -> batchSql("SELECT * FROM T /*+ 
OPTIONS('scan.snapshot-id'='0') */"))
                 .satisfies(
                         anyCauseMatches(
-                                IllegalArgumentException.class,
+                                Exception.class,
                                 "The specified scan snapshotId 0 is out of 
available snapshotId range [1, 4]."));
 
         assertThatThrownBy(
@@ -89,7 +89,7 @@ public class BatchFileStoreITCase extends CatalogITCaseBase {
                                         "SELECT * FROM T /*+ 
OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id'='0') */"))
                 .satisfies(
                         anyCauseMatches(
-                                IllegalArgumentException.class,
+                                Exception.class,
                                 "The specified scan snapshotId 0 is out of 
available snapshotId range [1, 4]."));
 
         assertThat(
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index 5f9773a0cf..144a2d5a8f 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -25,7 +25,6 @@ import org.apache.paimon.table.Table;
 import org.apache.paimon.table.source.snapshot.TimeTravelUtil;
 import org.apache.paimon.utils.BlockingIterator;
 import org.apache.paimon.utils.DateTimeUtils;
-import org.apache.paimon.utils.SnapshotNotExistException;
 
 import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
 
@@ -120,8 +119,8 @@ public class BatchFileStoreITCase extends CatalogITCaseBase 
{
         assertThatThrownBy(() -> batchSql("SELECT * FROM T /*+ 
OPTIONS('scan.snapshot-id'='0') */"))
                 .satisfies(
                         anyCauseMatches(
-                                SnapshotNotExistException.class,
-                                "Specified parameter scan.snapshot-id = 0 is 
not exist, you can set it in range from 1 to 4."));
+                                Exception.class,
+                                "The specified scan snapshotId 0 is out of 
available snapshotId range [1, 4]."));
 
         assertThatThrownBy(
                         () ->
@@ -129,8 +128,8 @@ public class BatchFileStoreITCase extends CatalogITCaseBase 
{
                                         "SELECT * FROM T /*+ 
OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id'='0') */"))
                 .satisfies(
                         anyCauseMatches(
-                                SnapshotNotExistException.class,
-                                "Specified parameter scan.snapshot-id = 0 is 
not exist, you can set it in range from 1 to 4."));
+                                Exception.class,
+                                "The specified scan snapshotId 0 is out of 
available snapshotId range [1, 4]."));
 
         assertThat(
                         batchSql(

Reply via email to