This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new a1d255bf8d [python] Add StartupMode enum and scan.mode option to CoreOptions (#7900)
a1d255bf8d is described below
commit a1d255bf8da9d1d58da0c4748ca6149107f67f4c
Author: Junrui Lee <[email protected]>
AuthorDate: Thu Jun 4 14:19:36 2026 +0800
[python] Add StartupMode enum and scan.mode option to CoreOptions (#7900)
---
.../pypaimon/common/options/core_options.py | 87 +++++++++++++++++
paimon-python/pypaimon/read/table_scan.py | 103 +++++++++++++++++++++
.../pypaimon/tests/table_scan_mode_test.py | 94 +++++++++++++++++++
3 files changed, 284 insertions(+)
diff --git a/paimon-python/pypaimon/common/options/core_options.py b/paimon-python/pypaimon/common/options/core_options.py
index 6013019aba..874b888faf 100644
--- a/paimon-python/pypaimon/common/options/core_options.py
+++ b/paimon-python/pypaimon/common/options/core_options.py
@@ -16,6 +16,7 @@
# under the License.
import sys
+import warnings
from datetime import timedelta
from enum import Enum
from typing import Dict, List, Optional
@@ -67,6 +68,23 @@ class SortOrder(str, Enum):
DESCENDING = "descending"
+class StartupMode(str, Enum):
+ """
+ Startup mode for scan operations.
+ """
+ DEFAULT = "default"
+ LATEST_FULL = "latest-full"
+ FULL = "full"
+ LATEST = "latest"
+ COMPACTED_FULL = "compacted-full"
+ FROM_TIMESTAMP = "from-timestamp"
+ FROM_SNAPSHOT = "from-snapshot"
+ FROM_SNAPSHOT_FULL = "from-snapshot-full"
+ FROM_CREATION_TIMESTAMP = "from-creation-timestamp"
+ FROM_FILE_CREATION_TIME = "from-file-creation-time"
+ INCREMENTAL = "incremental"
+
+
class GlobalIndexColumnUpdateAction(str, Enum):
THROW_ERROR = "THROW_ERROR"
DROP_PARTITION_INDEX = "DROP_PARTITION_INDEX"
@@ -327,6 +345,21 @@ class CoreOptions:
.with_description("Specify the file name prefix of data files.")
)
# Scan options
+ SCAN_MODE: ConfigOption[StartupMode] = (
+ ConfigOptions.key("scan.mode")
+ .enum_type(StartupMode)
+ .default_value(StartupMode.DEFAULT)
+ .with_description(
+ "Scan startup mode for the table. "
+ "'default' resolves the actual mode from other scan options. "
+ "'latest-full' reads the latest snapshot then streams changes. "
+ "'latest' only streams changes without an initial snapshot. "
+ "'from-timestamp' reads from a specific timestamp. "
+ "'from-snapshot' reads from a specific snapshot. "
+ "'incremental' reads incremental changes between two snapshots/tags."
+ )
+ )
+
SCAN_FALLBACK_BRANCH: ConfigOption[str] = (
ConfigOptions.key("scan.fallback-branch")
.string_type()
@@ -388,6 +421,24 @@ class CoreOptions:
)
)
+ SCAN_FILE_CREATION_TIME_MILLIS: ConfigOption[int] = (
+ ConfigOptions.key("scan.file-creation-time-millis")
+ .long_type()
+ .no_default_value()
+ .with_description(
+ "After configuring this time, only the data files created after this time will be read."
+ )
+ )
+
+ SCAN_CREATION_TIME_MILLIS: ConfigOption[int] = (
+ ConfigOptions.key("scan.creation-time-millis")
+ .long_type()
+ .no_default_value()
+ .with_description(
+ "Optional timestamp used in case of 'from-creation-timestamp' scan mode."
+ )
+ )
+
SOURCE_SPLIT_TARGET_SIZE: ConfigOption[MemorySize] = (
ConfigOptions.key("source.split.target-size")
.memory_type()
@@ -827,6 +878,42 @@ class CoreOptions:
def data_file_prefix(self, default=None):
return self.options.get(CoreOptions.DATA_FILE_PREFIX, default)
+ def scan_mode(self, default=None):
+ return self.options.get(CoreOptions.SCAN_MODE, default)
+
+ def startup_mode(self) -> 'StartupMode':
+ """Resolve the effective startup mode, matching Java CoreOptions.startupMode().
+
+ If scan.mode is DEFAULT, auto-detects from other scan options.
+ Maps deprecated FULL to LATEST_FULL.
+ """
+ mode = self.scan_mode()
+ if mode == StartupMode.DEFAULT:
+ if (self.options.contains(CoreOptions.SCAN_TIMESTAMP_MILLIS)
+ or self.options.contains(CoreOptions.SCAN_TIMESTAMP)):
+ return StartupMode.FROM_TIMESTAMP
+ elif (self.options.contains(CoreOptions.SCAN_SNAPSHOT_ID)
+ or self.options.contains(CoreOptions.SCAN_TAG_NAME)
+ or self.options.contains(CoreOptions.SCAN_WATERMARK)):
+ return StartupMode.FROM_SNAPSHOT
+ elif self.options.contains(CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP):
+ return StartupMode.INCREMENTAL
+ elif self.options.contains(CoreOptions.SCAN_FILE_CREATION_TIME_MILLIS):
+ return StartupMode.FROM_FILE_CREATION_TIME
+ elif self.options.contains(CoreOptions.SCAN_CREATION_TIME_MILLIS):
+ return StartupMode.FROM_CREATION_TIMESTAMP
+ else:
+ return StartupMode.LATEST_FULL
+ elif mode == StartupMode.FULL:
+ warnings.warn(
+ "scan.mode 'full' is deprecated, use 'latest-full' instead.",
+ DeprecationWarning,
+ stacklevel=2,
+ )
+ return StartupMode.LATEST_FULL
+ else:
+ return mode
+
def scan_fallback_branch(self, default=None):
return self.options.get(CoreOptions.SCAN_FALLBACK_BRANCH, default)
diff --git a/paimon-python/pypaimon/read/table_scan.py b/paimon-python/pypaimon/read/table_scan.py
index 6232618035..03a1c8b062 100755
--- a/paimon-python/pypaimon/read/table_scan.py
+++ b/paimon-python/pypaimon/read/table_scan.py
@@ -58,6 +58,8 @@ class TableScan:
snapshot_manager = self.table.snapshot_manager()
manifest_list_manager = ManifestListManager(self.table)
+ self._validate_scan_mode()
+
from pypaimon.snapshot.time_travel_util import TimeTravelUtil, SCAN_KEYS
has_time_travel = any(options.contains_key(key) for key in SCAN_KEYS)
has_incremental = options.contains(CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP)
@@ -158,3 +160,104 @@ class TableScan:
def with_global_index_result(self, result) -> 'TableScan':
self.file_scanner.with_global_index_result(result)
return self
+
+ def _validate_scan_mode(self):
+ """Validate scan.mode against companion options using a whitelist approach.
+
+ Each StartupMode declares exactly which scan keys are allowed. Any
+ scan key present but not in the whitelist for the resolved mode is
+ rejected. This matches Java's SchemaValidation mutual-exclusion matrix.
+ """
+ from pypaimon.common.options.core_options import StartupMode
+
+ core_options = self.table.options
+ mode = core_options.startup_mode()
+ options = core_options.options
+
+ has_snapshot_id = options.contains(CoreOptions.SCAN_SNAPSHOT_ID)
+ has_tag_name = options.contains(CoreOptions.SCAN_TAG_NAME)
+ has_watermark = options.contains(CoreOptions.SCAN_WATERMARK)
+ has_timestamp_millis = options.contains(CoreOptions.SCAN_TIMESTAMP_MILLIS)
+ has_timestamp = options.contains(CoreOptions.SCAN_TIMESTAMP)
+ has_incremental = options.contains(CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP)
+ has_file_creation_time = options.contains(CoreOptions.SCAN_FILE_CREATION_TIME_MILLIS)
+ has_creation_time = options.contains(CoreOptions.SCAN_CREATION_TIME_MILLIS)
+
+ present_keys = []
+ if has_snapshot_id:
+ present_keys.append(CoreOptions.SCAN_SNAPSHOT_ID.key())
+ if has_tag_name:
+ present_keys.append(CoreOptions.SCAN_TAG_NAME.key())
+ if has_watermark:
+ present_keys.append(CoreOptions.SCAN_WATERMARK.key())
+ if has_timestamp_millis:
+ present_keys.append(CoreOptions.SCAN_TIMESTAMP_MILLIS.key())
+ if has_timestamp:
+ present_keys.append(CoreOptions.SCAN_TIMESTAMP.key())
+ if has_incremental:
+ present_keys.append(CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP.key())
+ if has_file_creation_time:
+ present_keys.append(CoreOptions.SCAN_FILE_CREATION_TIME_MILLIS.key())
+ if has_creation_time:
+ present_keys.append(CoreOptions.SCAN_CREATION_TIME_MILLIS.key())
+
+ # scan.timestamp-millis and scan.timestamp are mutually exclusive
+ if has_timestamp_millis and has_timestamp:
+ raise ValueError(
+ "scan.timestamp-millis and scan.timestamp cannot both be set."
+ )
+
+ # Define allowed companion keys per mode
+ if mode == StartupMode.FROM_TIMESTAMP:
+ allowed = {
+ CoreOptions.SCAN_TIMESTAMP_MILLIS.key(),
+ CoreOptions.SCAN_TIMESTAMP.key(),
+ }
+ if not (has_timestamp_millis or has_timestamp):
+ raise ValueError(
+ "scan.mode is 'from-timestamp' but neither "
+ "scan.timestamp-millis nor scan.timestamp is set."
+ )
+ elif mode == StartupMode.FROM_SNAPSHOT_FULL:
+ allowed = {CoreOptions.SCAN_SNAPSHOT_ID.key()}
+ if not has_snapshot_id:
+ raise ValueError(
+ "scan.mode is 'from-snapshot-full' but scan.snapshot-id is not set."
+ )
+ elif mode == StartupMode.FROM_SNAPSHOT:
+ allowed = {
+ CoreOptions.SCAN_SNAPSHOT_ID.key(),
+ CoreOptions.SCAN_TAG_NAME.key(),
+ CoreOptions.SCAN_WATERMARK.key(),
+ }
+ if not (has_snapshot_id or has_tag_name or has_watermark):
+ raise ValueError(
+ "scan.mode is 'from-snapshot' but none of "
+ "scan.snapshot-id, scan.tag-name, or scan.watermark is set."
+ )
+ elif mode == StartupMode.INCREMENTAL:
+ allowed = {CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP.key()}
+ if not has_incremental:
+ raise ValueError(
+ "scan.mode is 'incremental' but "
+ "incremental-between-timestamp is not set."
+ )
+ elif mode in (StartupMode.LATEST_FULL, StartupMode.LATEST):
+ allowed = set()
+ elif mode in (StartupMode.COMPACTED_FULL,
+ StartupMode.FROM_CREATION_TIMESTAMP,
+ StartupMode.FROM_FILE_CREATION_TIME):
+ raise ValueError(
+ f"scan.mode '{mode.value}' is not yet supported in pypaimon."
+ )
+ else:
+ allowed = set()
+
+ # Reject any scan key that's not in the whitelist for this mode
+ disallowed = [k for k in present_keys if k not in allowed]
+ if disallowed:
+ raise ValueError(
+ f"scan.mode '{mode.value}' conflicts with: {disallowed}. "
+ f"Only {sorted(allowed) if allowed else 'no scan keys'} "
+ f"are allowed for this mode."
+ )
diff --git a/paimon-python/pypaimon/tests/table_scan_mode_test.py b/paimon-python/pypaimon/tests/table_scan_mode_test.py
new file mode 100644
index 0000000000..c33fdf9beb
--- /dev/null
+++ b/paimon-python/pypaimon/tests/table_scan_mode_test.py
@@ -0,0 +1,94 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+import warnings
+from unittest.mock import Mock
+
+from pypaimon.common.options.core_options import CoreOptions, StartupMode
+from pypaimon.read.table_scan import TableScan
+
+
+def _scan(options):
+ scan = TableScan.__new__(TableScan)
+ scan.table = Mock()
+ scan.table.options = CoreOptions.from_dict(options)
+ return scan
+
+
+class TableScanModeTest(unittest.TestCase):
+
+ def test_from_timestamp_requires_timestamp_option(self):
+ scan = _scan({
+ CoreOptions.SCAN_MODE.key(): StartupMode.FROM_TIMESTAMP.value,
+ })
+
+ with self.assertRaisesRegex(
+ ValueError,
+ "neither scan.timestamp-millis nor scan.timestamp is set"):
+ scan._validate_scan_mode()
+
+ def test_latest_conflicts_with_snapshot_id(self):
+ scan = _scan({
+ CoreOptions.SCAN_MODE.key(): StartupMode.LATEST.value,
+ CoreOptions.SCAN_SNAPSHOT_ID.key(): "1",
+ })
+
+ with self.assertRaisesRegex(ValueError, "scan.snapshot-id"):
+ scan._validate_scan_mode()
+
+ def test_default_with_timestamp_millis_resolves_to_from_timestamp(self):
+ options = CoreOptions.from_dict({
+ CoreOptions.SCAN_MODE.key(): StartupMode.DEFAULT.value,
+ CoreOptions.SCAN_TIMESTAMP_MILLIS.key(): "123",
+ })
+
+ self.assertEqual(options.startup_mode(), StartupMode.FROM_TIMESTAMP)
+ _scan(options.options.to_map())._validate_scan_mode()
+
+ def test_default_with_snapshot_id_resolves_to_from_snapshot(self):
+ options = CoreOptions.from_dict({
+ CoreOptions.SCAN_MODE.key(): StartupMode.DEFAULT.value,
+ CoreOptions.SCAN_SNAPSHOT_ID.key(): "1",
+ })
+
+ self.assertEqual(options.startup_mode(), StartupMode.FROM_SNAPSHOT)
+ _scan(options.options.to_map())._validate_scan_mode()
+
+ def test_unsupported_scan_modes_raise_value_error(self):
+ scan = _scan({
+ CoreOptions.SCAN_MODE.key(): StartupMode.COMPACTED_FULL.value,
+ })
+
+ with self.assertRaisesRegex(ValueError, "not yet supported"):
+ scan._validate_scan_mode()
+
+ def test_full_mode_maps_to_latest_full_with_deprecation_warning(self):
+ options = CoreOptions.from_dict({
+ CoreOptions.SCAN_MODE.key(): StartupMode.FULL.value,
+ })
+
+ with warnings.catch_warnings(record=True) as caught:
+ warnings.simplefilter("always")
+ self.assertEqual(options.startup_mode(), StartupMode.LATEST_FULL)
+
+ self.assertEqual(len(caught), 1)
+ self.assertTrue(issubclass(caught[0].category, DeprecationWarning))
+
+
+if __name__ == '__main__':
+ unittest.main()