This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 1c38526c62 [python] Support time travel by timestamp and watermark in
TimeTravelUtil (#7899)
1c38526c62 is described below
commit 1c38526c62426ac35d9329b6e3422edb852ba9a7
Author: Junrui Lee <[email protected]>
AuthorDate: Sat May 23 22:55:40 2026 +0800
[python] Support time travel by timestamp and watermark in TimeTravelUtil
(#7899)
Previously, pypaimon's TimeTravelUtil only supported time travel by
scan.tag-name and scan.snapshot-id. This PR adds support for three
additional time travel modes that are already available on the Java side:
- scan.timestamp-millis: Travel to the latest snapshot with commit time
<= the given timestamp (milliseconds)
- scan.timestamp: Same as above but accepts a human-readable timestamp
string (e.g. '2023-12-01 12:00:00', optionally with a 'T' separator and
fractional seconds), interpreted in the local time zone
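- scan.watermark: Travel to the first snapshot with watermark >= the
given value

TableScan now resolves every point-in-time option (including scan.tag-name
and scan.snapshot-id) through TimeTravelUtil, and raises a ValueError if any
of them is combined with incremental-between-timestamp.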
---
.../pypaimon/common/options/core_options.py | 39 ++++++
paimon-python/pypaimon/read/table_scan.py | 41 +++---
.../pypaimon/snapshot/snapshot_manager.py | 50 ++++++++
.../pypaimon/snapshot/time_travel_util.py | 79 +++++++++++-
.../pypaimon/tests/time_travel_util_test.py | 138 ++++++++++++++++++++-
5 files changed, 320 insertions(+), 27 deletions(-)
diff --git a/paimon-python/pypaimon/common/options/core_options.py
b/paimon-python/pypaimon/common/options/core_options.py
index 2d140b9539..9280705366 100644
--- a/paimon-python/pypaimon/common/options/core_options.py
+++ b/paimon-python/pypaimon/common/options/core_options.py
@@ -271,6 +271,36 @@ class CoreOptions:
)
)
+ SCAN_TIMESTAMP_MILLIS: ConfigOption[int] = (
+ ConfigOptions.key("scan.timestamp-millis")
+ .long_type()
+ .no_default_value()
+ .with_description(
+ "Optional timestamp in milliseconds used for time travel to the "
+ "latest snapshot equal to or earlier than the given timestamp."
+ )
+ )
+
+ SCAN_TIMESTAMP: ConfigOption[str] = (
+ ConfigOptions.key("scan.timestamp")
+ .string_type()
+ .no_default_value()
+ .with_description(
+ "Optional timestamp string (e.g. '2023-12-01 12:00:00') used for "
+ "time travel. Will be converted to milliseconds internally."
+ )
+ )
+
+ SCAN_WATERMARK: ConfigOption[int] = (
+ ConfigOptions.key("scan.watermark")
+ .long_type()
+ .no_default_value()
+ .with_description(
+ "Optional watermark used for time travel to the first snapshot "
+ "with watermark greater than or equal to the given value."
+ )
+ )
+
SOURCE_SPLIT_TARGET_SIZE: ConfigOption[MemorySize] = (
ConfigOptions.key("source.split.target-size")
.memory_type()
@@ -637,6 +667,15 @@ class CoreOptions:
def scan_snapshot_id(self, default=None):
return self.options.get(CoreOptions.SCAN_SNAPSHOT_ID, default)
+ def scan_timestamp_millis(self, default=None):
+ return self.options.get(CoreOptions.SCAN_TIMESTAMP_MILLIS, default)
+
+ def scan_timestamp(self, default=None):
+ return self.options.get(CoreOptions.SCAN_TIMESTAMP, default)
+
+ def scan_watermark(self, default=None):
+ return self.options.get(CoreOptions.SCAN_WATERMARK, default)
+
def source_split_target_size(self, default=None):
return self.options.get(CoreOptions.SOURCE_SPLIT_TARGET_SIZE,
default).get_bytes()
diff --git a/paimon-python/pypaimon/read/table_scan.py
b/paimon-python/pypaimon/read/table_scan.py
index bc610134e0..6232618035 100755
--- a/paimon-python/pypaimon/read/table_scan.py
+++ b/paimon-python/pypaimon/read/table_scan.py
@@ -57,7 +57,18 @@ class TableScan:
options = self.table.options.options
snapshot_manager = self.table.snapshot_manager()
manifest_list_manager = ManifestListManager(self.table)
- if options.contains(CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP):
+
+ from pypaimon.snapshot.time_travel_util import TimeTravelUtil,
SCAN_KEYS
+ has_time_travel = any(options.contains_key(key) for key in SCAN_KEYS)
+ has_incremental =
options.contains(CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP)
+
+ if has_incremental and has_time_travel:
+ raise ValueError(
+ "incremental-between-timestamp cannot be used together with "
+ "point-in-time scan options: %s" % SCAN_KEYS
+ )
+
+ if has_incremental:
ts =
options.get(CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP).split(",")
if len(ts) != 2:
raise ValueError(
@@ -106,35 +117,21 @@ class TableScan:
return manifests, end_snapshot
return FileScanner(self.table, incremental_manifest,
self.predicate, self.limit)
- elif options.contains(CoreOptions.SCAN_TAG_NAME): # Handle tag-based
reading
- tag_name = options.get(CoreOptions.SCAN_TAG_NAME)
-
- def tag_manifest_scanner():
- tag_manager = self.table.tag_manager()
- tag = tag_manager.get_or_throw(tag_name)
- snapshot = tag.trim_to_snapshot()
- return manifest_list_manager.read_all(snapshot), snapshot
-
- return FileScanner(
- self.table,
- tag_manifest_scanner,
- self.predicate,
- self.limit
- )
- elif options.contains(CoreOptions.SCAN_SNAPSHOT_ID): # Handle
snapshot-id-based reading
- snapshot_id = int(options.get(CoreOptions.SCAN_SNAPSHOT_ID))
- def snapshot_id_manifest_scanner():
- snapshot = snapshot_manager.get_snapshot_by_id(snapshot_id)
+ if has_time_travel:
+ def time_travel_manifest_scanner():
+ snapshot = TimeTravelUtil.try_travel_to_snapshot(
+ options, self.table.tag_manager(), snapshot_manager
+ )
if snapshot is None:
raise ValueError(
- "Snapshot id %d does not exist" % snapshot_id
+ "Could not resolve time travel snapshot from scan
options."
)
return manifest_list_manager.read_all(snapshot), snapshot
return FileScanner(
self.table,
- snapshot_id_manifest_scanner,
+ time_travel_manifest_scanner,
self.predicate,
self.limit
)
diff --git a/paimon-python/pypaimon/snapshot/snapshot_manager.py
b/paimon-python/pypaimon/snapshot/snapshot_manager.py
index c8f1f4818e..41a1ac81ba 100644
--- a/paimon-python/pypaimon/snapshot/snapshot_manager.py
+++ b/paimon-python/pypaimon/snapshot/snapshot_manager.py
@@ -257,6 +257,56 @@ class SnapshotManager:
snapshots.append(snapshot)
return snapshots
+ def later_or_equal_watermark(self, watermark: int) -> Optional[Snapshot]:
+ """
+ Find the first snapshot with watermark >= the given value.
+
+ Args:
+ watermark: The watermark value to compare against
+
+ Returns:
+ The first snapshot with watermark >= the given value, or None if
+ no such snapshot exists
+ """
+ earliest_snap = self.try_get_earliest_snapshot()
+ latest_snap = self.get_latest_snapshot()
+
+ if earliest_snap is None or latest_snap is None:
+ return None
+
+ earliest = earliest_snap.id
+ latest = latest_snap.id
+ result = None
+
+ while earliest <= latest:
+ mid = earliest + (latest - earliest) // 2
+ snapshot = self.get_snapshot_by_id(mid)
+
+ if snapshot is None:
+ found = False
+ for i in range(mid + 1, latest + 1):
+ snapshot = self.get_snapshot_by_id(i)
+ if snapshot is not None:
+ mid = i
+ found = True
+ break
+ if not found:
+ latest = mid - 1
+ continue
+
+ snap_watermark = snapshot.watermark
+ if snap_watermark is None:
+ earliest = mid + 1
+ continue
+
+ if snap_watermark >= watermark:
+ result = snapshot
+ latest = mid - 1
+ else:
+ earliest = mid + 1
+
+ return result
+
def get_snapshot_by_id(self, snapshot_id: int) -> Optional[Snapshot]:
"""
Get a snapshot by its ID.
diff --git a/paimon-python/pypaimon/snapshot/time_travel_util.py
b/paimon-python/pypaimon/snapshot/time_travel_util.py
index de927f5f99..8a8609b147 100644
--- a/paimon-python/pypaimon/snapshot/time_travel_util.py
+++ b/paimon-python/pypaimon/snapshot/time_travel_util.py
@@ -17,6 +17,7 @@
"""The util class of resolve snapshot from scan params for time travel."""
+from datetime import datetime
from typing import Optional
from pypaimon.common.options.core_options import CoreOptions
@@ -25,11 +26,42 @@ from pypaimon.snapshot.snapshot import Snapshot
from pypaimon.tag.tag_manager import TagManager
SCAN_KEYS = [
- CoreOptions.SCAN_TAG_NAME.key(),
CoreOptions.SCAN_SNAPSHOT_ID.key(),
+ CoreOptions.SCAN_TAG_NAME.key(),
+ CoreOptions.SCAN_WATERMARK.key(),
+ CoreOptions.SCAN_TIMESTAMP.key(),
+ CoreOptions.SCAN_TIMESTAMP_MILLIS.key(),
]
+def _parse_timestamp_to_millis(timestamp_str: str) -> int:
+ """Parse a timestamp string to milliseconds since epoch using local
timezone.
+
+ Consistent with Java's TimeZone.getDefault() behavior in TimeTravelUtil.
+
+ Supports formats:
+ - '2023-12-01 12:00:00'
+ - '2023-12-01T12:00:00'
+ - '2023-12-01 12:00:00.123'
+ """
+ formats = [
+ "%Y-%m-%d %H:%M:%S.%f",
+ "%Y-%m-%d %H:%M:%S",
+ "%Y-%m-%dT%H:%M:%S.%f",
+ "%Y-%m-%dT%H:%M:%S",
+ ]
+ for fmt in formats:
+ try:
+ dt = datetime.strptime(timestamp_str, fmt)
+ return int(dt.timestamp() * 1000)
+ except ValueError:
+ continue
+ raise ValueError(
+ f"Cannot parse timestamp '{timestamp_str}'. "
+ f"Expected format: 'yyyy-MM-dd HH:mm:ss' or 'yyyy-MM-dd HH:mm:ss.SSS'"
+ )
+
+
class TimeTravelUtil:
"""The util class of resolve snapshot from scan params for time travel."""
@@ -45,18 +77,22 @@ class TimeTravelUtil:
Supports the following time travel options:
- scan.tag-name: Travel to a specific tag
- scan.snapshot-id: Travel to a specific snapshot id
+ - scan.timestamp-millis: Travel to the latest snapshot <= the given
timestamp (ms)
+ - scan.timestamp: Travel by timestamp string (e.g. '2023-12-01
12:00:00')
+ - scan.watermark: Travel to the first snapshot with watermark >= the
given value
Args:
options: The options containing time travel parameters
tag_manager: The tag manager
snapshot_manager: The snapshot manager, required when
- ``scan.snapshot-id`` is set
+ using snapshot-id, timestamp, or watermark based time travel
Returns:
The Snapshot to travel to, or None if no time travel option is set.
Raises:
- ValueError: If more than one time travel option is set
+ ValueError: If more than one time travel option is set, or if the
+ required manager is not provided
"""
scan_handle_keys = [key for key in SCAN_KEYS if
options.contains_key(key)]
@@ -86,5 +122,42 @@ class TimeTravelUtil:
if snapshot is None:
raise ValueError(f"Snapshot id '{snapshot_id}' doesn't exist.")
return snapshot
+ elif key == CoreOptions.SCAN_TIMESTAMP_MILLIS.key():
+ if snapshot_manager is None:
+ raise ValueError(
+ "snapshot_manager is required to resolve
scan.timestamp-millis"
+ )
+ timestamp_millis = int(core_options.scan_timestamp_millis())
+ snapshot =
snapshot_manager.earlier_or_equal_time_mills(timestamp_millis)
+ if snapshot is None:
+ raise ValueError(
+ f"No snapshot found with timestamp earlier than or equal
to {timestamp_millis}ms."
+ )
+ return snapshot
+ elif key == CoreOptions.SCAN_TIMESTAMP.key():
+ if snapshot_manager is None:
+ raise ValueError(
+ "snapshot_manager is required to resolve scan.timestamp"
+ )
+ timestamp_str = core_options.scan_timestamp()
+ timestamp_millis = _parse_timestamp_to_millis(timestamp_str)
+ snapshot =
snapshot_manager.earlier_or_equal_time_mills(timestamp_millis)
+ if snapshot is None:
+ raise ValueError(
+ f"No snapshot found with timestamp earlier than or equal
to '{timestamp_str}'."
+ )
+ return snapshot
+ elif key == CoreOptions.SCAN_WATERMARK.key():
+ if snapshot_manager is None:
+ raise ValueError(
+ "snapshot_manager is required to resolve scan.watermark"
+ )
+ watermark = int(core_options.scan_watermark())
+ snapshot = snapshot_manager.later_or_equal_watermark(watermark)
+ if snapshot is None:
+ raise ValueError(
+ f"No snapshot found with watermark greater than or equal
to {watermark}."
+ )
+ return snapshot
else:
raise ValueError(f"Unsupported time travel mode: {key}")
diff --git a/paimon-python/pypaimon/tests/time_travel_util_test.py
b/paimon-python/pypaimon/tests/time_travel_util_test.py
index 10372164c7..7b8b139f12 100644
--- a/paimon-python/pypaimon/tests/time_travel_util_test.py
+++ b/paimon-python/pypaimon/tests/time_travel_util_test.py
@@ -22,9 +22,11 @@ from pypaimon.snapshot.time_travel_util import SCAN_KEYS,
TimeTravelUtil
class _StubSnapshot:
- def __init__(self, snapshot_id, schema_id=0):
+ def __init__(self, snapshot_id, schema_id=0, time_millis=0,
watermark=None):
self.id = snapshot_id
self.schema_id = schema_id
+ self.time_millis = time_millis
+ self.watermark = watermark
class _StubSnapshotManager:
@@ -34,6 +36,31 @@ class _StubSnapshotManager:
def get_snapshot_by_id(self, snapshot_id):
return self._snapshots.get(snapshot_id)
+ def try_get_earliest_snapshot(self):
+ if not self._snapshots:
+ return None
+ return self._snapshots[min(self._snapshots.keys())]
+
+ def get_latest_snapshot(self):
+ if not self._snapshots:
+ return None
+ return self._snapshots[max(self._snapshots.keys())]
+
+ def earlier_or_equal_time_mills(self, timestamp):
+ result = None
+ for snap in sorted(self._snapshots.values(), key=lambda s: s.id):
+ if snap.time_millis <= timestamp:
+ result = snap
+ else:
+ break
+ return result
+
+ def later_or_equal_watermark(self, watermark):
+ for snap in sorted(self._snapshots.values(), key=lambda s: s.id):
+ if snap.watermark is not None and snap.watermark >= watermark:
+ return snap
+ return None
+
class _StubTagManager:
def __init__(self, tags):
@@ -87,10 +114,117 @@ class TimeTravelUtilTest(unittest.TestCase):
)
def test_scan_keys_contains_both_options(self):
- # Sanity check: SCAN_KEYS must enumerate both time-travel modes,
+ # Sanity check: SCAN_KEYS must enumerate all time-travel modes,
# otherwise the mutual-exclusion guard above would not trigger.
self.assertIn('scan.snapshot-id', SCAN_KEYS)
self.assertIn('scan.tag-name', SCAN_KEYS)
+ self.assertIn('scan.timestamp-millis', SCAN_KEYS)
+ self.assertIn('scan.timestamp', SCAN_KEYS)
+ self.assertIn('scan.watermark', SCAN_KEYS)
+
+ def test_resolves_timestamp_millis(self):
+ snap1 = _StubSnapshot(1, time_millis=1000)
+ snap2 = _StubSnapshot(2, time_millis=2000)
+ snap3 = _StubSnapshot(3, time_millis=3000)
+ mgr = _StubSnapshotManager([snap1, snap2, snap3])
+ result = TimeTravelUtil.try_travel_to_snapshot(
+ Options({'scan.timestamp-millis': '2500'}),
+ _StubTagManager({}),
+ mgr,
+ )
+ self.assertIs(result, snap2)
+
+ def test_resolves_timestamp_millis_exact_match(self):
+ snap1 = _StubSnapshot(1, time_millis=1000)
+ snap2 = _StubSnapshot(2, time_millis=2000)
+ mgr = _StubSnapshotManager([snap1, snap2])
+ result = TimeTravelUtil.try_travel_to_snapshot(
+ Options({'scan.timestamp-millis': '2000'}),
+ _StubTagManager({}),
+ mgr,
+ )
+ self.assertIs(result, snap2)
+
+ def test_timestamp_millis_no_match_raises(self):
+ snap1 = _StubSnapshot(1, time_millis=5000)
+ mgr = _StubSnapshotManager([snap1])
+ with self.assertRaises(ValueError):
+ TimeTravelUtil.try_travel_to_snapshot(
+ Options({'scan.timestamp-millis': '1000'}),
+ _StubTagManager({}),
+ mgr,
+ )
+
+ def test_timestamp_millis_without_snapshot_manager_raises(self):
+ with self.assertRaises(ValueError):
+ TimeTravelUtil.try_travel_to_snapshot(
+ Options({'scan.timestamp-millis': '1000'}),
+ _StubTagManager({}),
+ None,
+ )
+
+ def test_resolves_timestamp_string(self):
+ # Compute expected millis in local timezone, consistent with Java
behavior
+ from datetime import datetime
+ expected_millis = int(datetime(2023, 12, 1, 0, 0, 0).timestamp() *
1000)
+ snap1 = _StubSnapshot(1, time_millis=expected_millis)
+ snap2 = _StubSnapshot(2, time_millis=expected_millis + 100000)
+ mgr = _StubSnapshotManager([snap1, snap2])
+ result = TimeTravelUtil.try_travel_to_snapshot(
+ Options({'scan.timestamp': '2023-12-01 00:00:00'}),
+ _StubTagManager({}),
+ mgr,
+ )
+ self.assertIs(result, snap1)
+
+ def test_timestamp_string_invalid_format_raises(self):
+ snap1 = _StubSnapshot(1, time_millis=1000)
+ mgr = _StubSnapshotManager([snap1])
+ with self.assertRaises(ValueError):
+ TimeTravelUtil.try_travel_to_snapshot(
+ Options({'scan.timestamp': 'not-a-timestamp'}),
+ _StubTagManager({}),
+ mgr,
+ )
+
+ def test_resolves_watermark(self):
+ snap1 = _StubSnapshot(1, watermark=100)
+ snap2 = _StubSnapshot(2, watermark=200)
+ snap3 = _StubSnapshot(3, watermark=300)
+ mgr = _StubSnapshotManager([snap1, snap2, snap3])
+ result = TimeTravelUtil.try_travel_to_snapshot(
+ Options({'scan.watermark': '200'}),
+ _StubTagManager({}),
+ mgr,
+ )
+ self.assertIs(result, snap2)
+
+ def test_watermark_no_match_raises(self):
+ snap1 = _StubSnapshot(1, watermark=100)
+ mgr = _StubSnapshotManager([snap1])
+ with self.assertRaises(ValueError):
+ TimeTravelUtil.try_travel_to_snapshot(
+ Options({'scan.watermark': '500'}),
+ _StubTagManager({}),
+ mgr,
+ )
+
+ def test_watermark_without_snapshot_manager_raises(self):
+ with self.assertRaises(ValueError):
+ TimeTravelUtil.try_travel_to_snapshot(
+ Options({'scan.watermark': '100'}),
+ _StubTagManager({}),
+ None,
+ )
+
+ def test_rejects_multiple_time_travel_options(self):
+ mgr = _StubSnapshotManager([_StubSnapshot(1, time_millis=1000)])
+ with self.assertRaises(ValueError):
+ TimeTravelUtil.try_travel_to_snapshot(
+ Options({'scan.timestamp-millis': '1000', 'scan.snapshot-id':
'1'}),
+ _StubTagManager({}),
+ mgr,
+ )
if __name__ == '__main__':