This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 1c38526c62 [python] Support time travel by timestamp and watermark in TimeTravelUtil (#7899)
1c38526c62 is described below

commit 1c38526c62426ac35d9329b6e3422edb852ba9a7
Author: Junrui Lee <[email protected]>
AuthorDate: Sat May 23 22:55:40 2026 +0800

    [python] Support time travel by timestamp and watermark in TimeTravelUtil (#7899)
    
    Currently pypaimon's TimeTravelUtil only supports time travel by
    scan.tag-name and scan.snapshot-id. This PR adds support for three
    additional time travel modes that are available on the Java side:
    
    - scan.timestamp-millis: Travel to the latest snapshot whose commit
      time is <= the given timestamp (in milliseconds since the epoch)
    - scan.timestamp: Same as above, but accepts a human-readable
      timestamp string (e.g. '2023-12-01 12:00:00'), which is interpreted
      in the local timezone
    - scan.watermark: Travel to the first snapshot whose watermark is >=
      the given value
---
 .../pypaimon/common/options/core_options.py        |  39 ++++++
 paimon-python/pypaimon/read/table_scan.py          |  41 +++---
 .../pypaimon/snapshot/snapshot_manager.py          |  50 ++++++++
 .../pypaimon/snapshot/time_travel_util.py          |  79 +++++++++++-
 .../pypaimon/tests/time_travel_util_test.py        | 138 ++++++++++++++++++++-
 5 files changed, 320 insertions(+), 27 deletions(-)

diff --git a/paimon-python/pypaimon/common/options/core_options.py 
b/paimon-python/pypaimon/common/options/core_options.py
index 2d140b9539..9280705366 100644
--- a/paimon-python/pypaimon/common/options/core_options.py
+++ b/paimon-python/pypaimon/common/options/core_options.py
@@ -271,6 +271,36 @@ class CoreOptions:
         )
     )
 
+    SCAN_TIMESTAMP_MILLIS: ConfigOption[int] = (
+        ConfigOptions.key("scan.timestamp-millis")
+        .long_type()
+        .no_default_value()
+        .with_description(
+            "Optional timestamp in milliseconds used for time travel to the "
+            "latest snapshot equal to or earlier than the given timestamp."
+        )
+    )
+
+    SCAN_TIMESTAMP: ConfigOption[str] = (
+        ConfigOptions.key("scan.timestamp")
+        .string_type()
+        .no_default_value()
+        .with_description(
+            "Optional timestamp string (e.g. '2023-12-01 12:00:00') used for "
+            "time travel. Will be converted to milliseconds internally."
+        )
+    )
+
+    SCAN_WATERMARK: ConfigOption[int] = (
+        ConfigOptions.key("scan.watermark")
+        .long_type()
+        .no_default_value()
+        .with_description(
+            "Optional watermark used for time travel to the first snapshot "
+            "with watermark greater than or equal to the given value."
+        )
+    )
+
     SOURCE_SPLIT_TARGET_SIZE: ConfigOption[MemorySize] = (
         ConfigOptions.key("source.split.target-size")
         .memory_type()
@@ -637,6 +667,15 @@ class CoreOptions:
     def scan_snapshot_id(self, default=None):
         return self.options.get(CoreOptions.SCAN_SNAPSHOT_ID, default)
 
+    def scan_timestamp_millis(self, default=None):
+        return self.options.get(CoreOptions.SCAN_TIMESTAMP_MILLIS, default)
+
+    def scan_timestamp(self, default=None):
+        return self.options.get(CoreOptions.SCAN_TIMESTAMP, default)
+
+    def scan_watermark(self, default=None):
+        return self.options.get(CoreOptions.SCAN_WATERMARK, default)
+
     def source_split_target_size(self, default=None):
         return self.options.get(CoreOptions.SOURCE_SPLIT_TARGET_SIZE, 
default).get_bytes()
 
diff --git a/paimon-python/pypaimon/read/table_scan.py 
b/paimon-python/pypaimon/read/table_scan.py
index bc610134e0..6232618035 100755
--- a/paimon-python/pypaimon/read/table_scan.py
+++ b/paimon-python/pypaimon/read/table_scan.py
@@ -57,7 +57,18 @@ class TableScan:
         options = self.table.options.options
         snapshot_manager = self.table.snapshot_manager()
         manifest_list_manager = ManifestListManager(self.table)
-        if options.contains(CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP):
+
+        from pypaimon.snapshot.time_travel_util import TimeTravelUtil, 
SCAN_KEYS
+        has_time_travel = any(options.contains_key(key) for key in SCAN_KEYS)
+        has_incremental = 
options.contains(CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP)
+
+        if has_incremental and has_time_travel:
+            raise ValueError(
+                "incremental-between-timestamp cannot be used together with "
+                "point-in-time scan options: %s" % SCAN_KEYS
+            )
+
+        if has_incremental:
             ts = 
options.get(CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP).split(",")
             if len(ts) != 2:
                 raise ValueError(
@@ -106,35 +117,21 @@ class TableScan:
                 return manifests, end_snapshot
 
             return FileScanner(self.table, incremental_manifest, 
self.predicate, self.limit)
-        elif options.contains(CoreOptions.SCAN_TAG_NAME):  # Handle tag-based 
reading
-            tag_name = options.get(CoreOptions.SCAN_TAG_NAME)
-
-            def tag_manifest_scanner():
-                tag_manager = self.table.tag_manager()
-                tag = tag_manager.get_or_throw(tag_name)
-                snapshot = tag.trim_to_snapshot()
-                return manifest_list_manager.read_all(snapshot), snapshot
-
-            return FileScanner(
-                self.table,
-                tag_manifest_scanner,
-                self.predicate,
-                self.limit
-            )
-        elif options.contains(CoreOptions.SCAN_SNAPSHOT_ID):  # Handle 
snapshot-id-based reading
-            snapshot_id = int(options.get(CoreOptions.SCAN_SNAPSHOT_ID))
 
-            def snapshot_id_manifest_scanner():
-                snapshot = snapshot_manager.get_snapshot_by_id(snapshot_id)
+        if has_time_travel:
+            def time_travel_manifest_scanner():
+                snapshot = TimeTravelUtil.try_travel_to_snapshot(
+                    options, self.table.tag_manager(), snapshot_manager
+                )
                 if snapshot is None:
                     raise ValueError(
-                        "Snapshot id %d does not exist" % snapshot_id
+                        "Could not resolve time travel snapshot from scan 
options."
                     )
                 return manifest_list_manager.read_all(snapshot), snapshot
 
             return FileScanner(
                 self.table,
-                snapshot_id_manifest_scanner,
+                time_travel_manifest_scanner,
                 self.predicate,
                 self.limit
             )
diff --git a/paimon-python/pypaimon/snapshot/snapshot_manager.py 
b/paimon-python/pypaimon/snapshot/snapshot_manager.py
index c8f1f4818e..41a1ac81ba 100644
--- a/paimon-python/pypaimon/snapshot/snapshot_manager.py
+++ b/paimon-python/pypaimon/snapshot/snapshot_manager.py
@@ -257,6 +257,56 @@ class SnapshotManager:
                 snapshots.append(snapshot)
         return snapshots
 
+    def later_or_equal_watermark(self, watermark: int) -> Optional[Snapshot]:
+        """
+        Find the first snapshot with watermark >= the given value.
+
+        Args:
+            watermark: The watermark value to compare against
+
+        Returns:
+            The first snapshot with watermark >= the given value, or None if
+            no such snapshot exists
+        """
+        earliest_snap = self.try_get_earliest_snapshot()
+        latest_snap = self.get_latest_snapshot()
+
+        if earliest_snap is None or latest_snap is None:
+            return None
+
+        earliest = earliest_snap.id
+        latest = latest_snap.id
+        result = None
+
+        while earliest <= latest:
+            mid = earliest + (latest - earliest) // 2
+            snapshot = self.get_snapshot_by_id(mid)
+
+            if snapshot is None:
+                found = False
+                for i in range(mid + 1, latest + 1):
+                    snapshot = self.get_snapshot_by_id(i)
+                    if snapshot is not None:
+                        mid = i
+                        found = True
+                        break
+                if not found:
+                    latest = mid - 1
+                    continue
+
+            snap_watermark = snapshot.watermark
+            if snap_watermark is None:
+                earliest = mid + 1
+                continue
+
+            if snap_watermark >= watermark:
+                result = snapshot
+                latest = mid - 1
+            else:
+                earliest = mid + 1
+
+        return result
+
     def get_snapshot_by_id(self, snapshot_id: int) -> Optional[Snapshot]:
         """
         Get a snapshot by its ID.
diff --git a/paimon-python/pypaimon/snapshot/time_travel_util.py 
b/paimon-python/pypaimon/snapshot/time_travel_util.py
index de927f5f99..8a8609b147 100644
--- a/paimon-python/pypaimon/snapshot/time_travel_util.py
+++ b/paimon-python/pypaimon/snapshot/time_travel_util.py
@@ -17,6 +17,7 @@
 
 """The util class of resolve snapshot from scan params for time travel."""
 
+from datetime import datetime
 from typing import Optional
 
 from pypaimon.common.options.core_options import CoreOptions
@@ -25,11 +26,42 @@ from pypaimon.snapshot.snapshot import Snapshot
 from pypaimon.tag.tag_manager import TagManager
 
 SCAN_KEYS = [
-    CoreOptions.SCAN_TAG_NAME.key(),
     CoreOptions.SCAN_SNAPSHOT_ID.key(),
+    CoreOptions.SCAN_TAG_NAME.key(),
+    CoreOptions.SCAN_WATERMARK.key(),
+    CoreOptions.SCAN_TIMESTAMP.key(),
+    CoreOptions.SCAN_TIMESTAMP_MILLIS.key(),
 ]
 
 
+def _parse_timestamp_to_millis(timestamp_str: str) -> int:
+    """Parse a timestamp string to milliseconds since epoch using local 
timezone.
+
+    Consistent with Java's TimeZone.getDefault() behavior in TimeTravelUtil.
+
+    Supports formats:
+    - '2023-12-01 12:00:00'
+    - '2023-12-01T12:00:00'
+    - '2023-12-01 12:00:00.123'
+    """
+    formats = [
+        "%Y-%m-%d %H:%M:%S.%f",
+        "%Y-%m-%d %H:%M:%S",
+        "%Y-%m-%dT%H:%M:%S.%f",
+        "%Y-%m-%dT%H:%M:%S",
+    ]
+    for fmt in formats:
+        try:
+            dt = datetime.strptime(timestamp_str, fmt)
+            return int(dt.timestamp() * 1000)
+        except ValueError:
+            continue
+    raise ValueError(
+        f"Cannot parse timestamp '{timestamp_str}'. "
+        f"Expected format: 'yyyy-MM-dd HH:mm:ss' or 'yyyy-MM-dd HH:mm:ss.SSS'"
+    )
+
+
 class TimeTravelUtil:
     """The util class of resolve snapshot from scan params for time travel."""
 
@@ -45,18 +77,22 @@ class TimeTravelUtil:
         Supports the following time travel options:
         - scan.tag-name: Travel to a specific tag
         - scan.snapshot-id: Travel to a specific snapshot id
+        - scan.timestamp-millis: Travel to the latest snapshot <= the given 
timestamp (ms)
+        - scan.timestamp: Travel by timestamp string (e.g. '2023-12-01 
12:00:00')
+        - scan.watermark: Travel to the first snapshot with watermark >= the 
given value
 
         Args:
             options: The options containing time travel parameters
             tag_manager: The tag manager
             snapshot_manager: The snapshot manager, required when
-                ``scan.snapshot-id`` is set
+                using snapshot-id, timestamp, or watermark based time travel
 
         Returns:
             The Snapshot to travel to, or None if no time travel option is set.
 
         Raises:
-            ValueError: If more than one time travel option is set
+            ValueError: If more than one time travel option is set, or if the
+                required manager is not provided
         """
 
         scan_handle_keys = [key for key in SCAN_KEYS if 
options.contains_key(key)]
@@ -86,5 +122,42 @@ class TimeTravelUtil:
             if snapshot is None:
                 raise ValueError(f"Snapshot id '{snapshot_id}' doesn't exist.")
             return snapshot
+        elif key == CoreOptions.SCAN_TIMESTAMP_MILLIS.key():
+            if snapshot_manager is None:
+                raise ValueError(
+                    "snapshot_manager is required to resolve 
scan.timestamp-millis"
+                )
+            timestamp_millis = int(core_options.scan_timestamp_millis())
+            snapshot = 
snapshot_manager.earlier_or_equal_time_mills(timestamp_millis)
+            if snapshot is None:
+                raise ValueError(
+                    f"No snapshot found with timestamp earlier than or equal 
to {timestamp_millis}ms."
+                )
+            return snapshot
+        elif key == CoreOptions.SCAN_TIMESTAMP.key():
+            if snapshot_manager is None:
+                raise ValueError(
+                    "snapshot_manager is required to resolve scan.timestamp"
+                )
+            timestamp_str = core_options.scan_timestamp()
+            timestamp_millis = _parse_timestamp_to_millis(timestamp_str)
+            snapshot = 
snapshot_manager.earlier_or_equal_time_mills(timestamp_millis)
+            if snapshot is None:
+                raise ValueError(
+                    f"No snapshot found with timestamp earlier than or equal 
to '{timestamp_str}'."
+                )
+            return snapshot
+        elif key == CoreOptions.SCAN_WATERMARK.key():
+            if snapshot_manager is None:
+                raise ValueError(
+                    "snapshot_manager is required to resolve scan.watermark"
+                )
+            watermark = int(core_options.scan_watermark())
+            snapshot = snapshot_manager.later_or_equal_watermark(watermark)
+            if snapshot is None:
+                raise ValueError(
+                    f"No snapshot found with watermark greater than or equal 
to {watermark}."
+                )
+            return snapshot
         else:
             raise ValueError(f"Unsupported time travel mode: {key}")
diff --git a/paimon-python/pypaimon/tests/time_travel_util_test.py 
b/paimon-python/pypaimon/tests/time_travel_util_test.py
index 10372164c7..7b8b139f12 100644
--- a/paimon-python/pypaimon/tests/time_travel_util_test.py
+++ b/paimon-python/pypaimon/tests/time_travel_util_test.py
@@ -22,9 +22,11 @@ from pypaimon.snapshot.time_travel_util import SCAN_KEYS, 
TimeTravelUtil
 
 
 class _StubSnapshot:
-    def __init__(self, snapshot_id, schema_id=0):
+    def __init__(self, snapshot_id, schema_id=0, time_millis=0, 
watermark=None):
         self.id = snapshot_id
         self.schema_id = schema_id
+        self.time_millis = time_millis
+        self.watermark = watermark
 
 
 class _StubSnapshotManager:
@@ -34,6 +36,31 @@ class _StubSnapshotManager:
     def get_snapshot_by_id(self, snapshot_id):
         return self._snapshots.get(snapshot_id)
 
+    def try_get_earliest_snapshot(self):
+        if not self._snapshots:
+            return None
+        return self._snapshots[min(self._snapshots.keys())]
+
+    def get_latest_snapshot(self):
+        if not self._snapshots:
+            return None
+        return self._snapshots[max(self._snapshots.keys())]
+
+    def earlier_or_equal_time_mills(self, timestamp):
+        result = None
+        for snap in sorted(self._snapshots.values(), key=lambda s: s.id):
+            if snap.time_millis <= timestamp:
+                result = snap
+            else:
+                break
+        return result
+
+    def later_or_equal_watermark(self, watermark):
+        for snap in sorted(self._snapshots.values(), key=lambda s: s.id):
+            if snap.watermark is not None and snap.watermark >= watermark:
+                return snap
+        return None
+
 
 class _StubTagManager:
     def __init__(self, tags):
@@ -87,10 +114,117 @@ class TimeTravelUtilTest(unittest.TestCase):
             )
 
     def test_scan_keys_contains_both_options(self):
-        # Sanity check: SCAN_KEYS must enumerate both time-travel modes,
+        # Sanity check: SCAN_KEYS must enumerate all time-travel modes,
         # otherwise the mutual-exclusion guard above would not trigger.
         self.assertIn('scan.snapshot-id', SCAN_KEYS)
         self.assertIn('scan.tag-name', SCAN_KEYS)
+        self.assertIn('scan.timestamp-millis', SCAN_KEYS)
+        self.assertIn('scan.timestamp', SCAN_KEYS)
+        self.assertIn('scan.watermark', SCAN_KEYS)
+
+    def test_resolves_timestamp_millis(self):
+        snap1 = _StubSnapshot(1, time_millis=1000)
+        snap2 = _StubSnapshot(2, time_millis=2000)
+        snap3 = _StubSnapshot(3, time_millis=3000)
+        mgr = _StubSnapshotManager([snap1, snap2, snap3])
+        result = TimeTravelUtil.try_travel_to_snapshot(
+            Options({'scan.timestamp-millis': '2500'}),
+            _StubTagManager({}),
+            mgr,
+        )
+        self.assertIs(result, snap2)
+
+    def test_resolves_timestamp_millis_exact_match(self):
+        snap1 = _StubSnapshot(1, time_millis=1000)
+        snap2 = _StubSnapshot(2, time_millis=2000)
+        mgr = _StubSnapshotManager([snap1, snap2])
+        result = TimeTravelUtil.try_travel_to_snapshot(
+            Options({'scan.timestamp-millis': '2000'}),
+            _StubTagManager({}),
+            mgr,
+        )
+        self.assertIs(result, snap2)
+
+    def test_timestamp_millis_no_match_raises(self):
+        snap1 = _StubSnapshot(1, time_millis=5000)
+        mgr = _StubSnapshotManager([snap1])
+        with self.assertRaises(ValueError):
+            TimeTravelUtil.try_travel_to_snapshot(
+                Options({'scan.timestamp-millis': '1000'}),
+                _StubTagManager({}),
+                mgr,
+            )
+
+    def test_timestamp_millis_without_snapshot_manager_raises(self):
+        with self.assertRaises(ValueError):
+            TimeTravelUtil.try_travel_to_snapshot(
+                Options({'scan.timestamp-millis': '1000'}),
+                _StubTagManager({}),
+                None,
+            )
+
+    def test_resolves_timestamp_string(self):
+        # Compute expected millis in local timezone, consistent with Java 
behavior
+        from datetime import datetime
+        expected_millis = int(datetime(2023, 12, 1, 0, 0, 0).timestamp() * 
1000)
+        snap1 = _StubSnapshot(1, time_millis=expected_millis)
+        snap2 = _StubSnapshot(2, time_millis=expected_millis + 100000)
+        mgr = _StubSnapshotManager([snap1, snap2])
+        result = TimeTravelUtil.try_travel_to_snapshot(
+            Options({'scan.timestamp': '2023-12-01 00:00:00'}),
+            _StubTagManager({}),
+            mgr,
+        )
+        self.assertIs(result, snap1)
+
+    def test_timestamp_string_invalid_format_raises(self):
+        snap1 = _StubSnapshot(1, time_millis=1000)
+        mgr = _StubSnapshotManager([snap1])
+        with self.assertRaises(ValueError):
+            TimeTravelUtil.try_travel_to_snapshot(
+                Options({'scan.timestamp': 'not-a-timestamp'}),
+                _StubTagManager({}),
+                mgr,
+            )
+
+    def test_resolves_watermark(self):
+        snap1 = _StubSnapshot(1, watermark=100)
+        snap2 = _StubSnapshot(2, watermark=200)
+        snap3 = _StubSnapshot(3, watermark=300)
+        mgr = _StubSnapshotManager([snap1, snap2, snap3])
+        result = TimeTravelUtil.try_travel_to_snapshot(
+            Options({'scan.watermark': '200'}),
+            _StubTagManager({}),
+            mgr,
+        )
+        self.assertIs(result, snap2)
+
+    def test_watermark_no_match_raises(self):
+        snap1 = _StubSnapshot(1, watermark=100)
+        mgr = _StubSnapshotManager([snap1])
+        with self.assertRaises(ValueError):
+            TimeTravelUtil.try_travel_to_snapshot(
+                Options({'scan.watermark': '500'}),
+                _StubTagManager({}),
+                mgr,
+            )
+
+    def test_watermark_without_snapshot_manager_raises(self):
+        with self.assertRaises(ValueError):
+            TimeTravelUtil.try_travel_to_snapshot(
+                Options({'scan.watermark': '100'}),
+                _StubTagManager({}),
+                None,
+            )
+
+    def test_rejects_multiple_time_travel_options(self):
+        mgr = _StubSnapshotManager([_StubSnapshot(1, time_millis=1000)])
+        with self.assertRaises(ValueError):
+            TimeTravelUtil.try_travel_to_snapshot(
+                Options({'scan.timestamp-millis': '1000', 'scan.snapshot-id': 
'1'}),
+                _StubTagManager({}),
+                mgr,
+            )
 
 
 if __name__ == '__main__':

Reply via email to