This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new a1d255bf8d [python] Add StartupMode enum and scan.mode option to CoreOptions (#7900)
a1d255bf8d is described below

commit a1d255bf8da9d1d58da0c4748ca6149107f67f4c
Author: Junrui Lee <[email protected]>
AuthorDate: Thu Jun 4 14:19:36 2026 +0800

    [python] Add StartupMode enum and scan.mode option to CoreOptions (#7900)
---
 .../pypaimon/common/options/core_options.py        |  87 +++++++++++++++++
 paimon-python/pypaimon/read/table_scan.py          | 103 +++++++++++++++++++++
 .../pypaimon/tests/table_scan_mode_test.py         |  94 +++++++++++++++++++
 3 files changed, 284 insertions(+)

diff --git a/paimon-python/pypaimon/common/options/core_options.py b/paimon-python/pypaimon/common/options/core_options.py
index 6013019aba..874b888faf 100644
--- a/paimon-python/pypaimon/common/options/core_options.py
+++ b/paimon-python/pypaimon/common/options/core_options.py
@@ -16,6 +16,7 @@
 # under the License.
 
 import sys
+import warnings
 from datetime import timedelta
 from enum import Enum
 from typing import Dict, List, Optional
@@ -67,6 +68,23 @@ class SortOrder(str, Enum):
     DESCENDING = "descending"
 
 
+class StartupMode(str, Enum):
+    """
+    Startup mode for scan operations (the value of the ``scan.mode`` option).
+
+    ``CoreOptions.startup_mode()`` never hands back DEFAULT or FULL: DEFAULT
+    is resolved from the other scan options and FULL is mapped to LATEST_FULL.
+    """
+    # Resolve the actual mode from the other scan options.
+    DEFAULT = "default"
+    # Read the latest snapshot, then stream changes.
+    LATEST_FULL = "latest-full"
+    # Deprecated alias of LATEST_FULL.
+    FULL = "full"
+    # Only stream changes, without an initial snapshot.
+    LATEST = "latest"
+    # Rejected as not yet supported by pypaimon's TableScan validation.
+    COMPACTED_FULL = "compacted-full"
+    # Start from scan.timestamp-millis or scan.timestamp.
+    FROM_TIMESTAMP = "from-timestamp"
+    # Start from scan.snapshot-id, scan.tag-name or scan.watermark.
+    FROM_SNAPSHOT = "from-snapshot"
+    # Requires scan.snapshot-id.
+    FROM_SNAPSHOT_FULL = "from-snapshot-full"
+    # Driven by scan.creation-time-millis; not yet supported by TableScan.
+    FROM_CREATION_TIMESTAMP = "from-creation-timestamp"
+    # Driven by scan.file-creation-time-millis; not yet supported by TableScan.
+    FROM_FILE_CREATION_TIME = "from-file-creation-time"
+    # Read incremental changes between two snapshots/tags.
+    INCREMENTAL = "incremental"
+
 class GlobalIndexColumnUpdateAction(str, Enum):
     THROW_ERROR = "THROW_ERROR"
     DROP_PARTITION_INDEX = "DROP_PARTITION_INDEX"
@@ -327,6 +345,21 @@ class CoreOptions:
         .with_description("Specify the file name prefix of data files.")
     )
     # Scan options
+    # Requested startup mode; CoreOptions.startup_mode() turns it into the
+    # effective mode (resolving 'default' and the deprecated 'full').
+    SCAN_MODE: ConfigOption[StartupMode] = (
+        ConfigOptions.key("scan.mode")
+        .enum_type(StartupMode)
+        .default_value(StartupMode.DEFAULT)
+        .with_description(
+            "Scan startup mode for the table. "
+            "'default' resolves the actual mode from other scan options. "
+            "'latest-full' reads the latest snapshot then streams changes. "
+            "'full' is a deprecated alias of 'latest-full'. "
+            "'latest' only streams changes without an initial snapshot. "
+            "'from-timestamp' reads from a specific timestamp. "
+            "'from-snapshot' reads from a specific snapshot. "
+            "'incremental' reads incremental changes between two snapshots/tags."
+        )
+    )
+
     SCAN_FALLBACK_BRANCH: ConfigOption[str] = (
         ConfigOptions.key("scan.fallback-branch")
         .string_type()
@@ -388,6 +421,24 @@ class CoreOptions:
         )
     )
 
+    # When scan.mode is 'default', setting this selects 'from-file-creation-time'.
+    SCAN_FILE_CREATION_TIME_MILLIS: ConfigOption[int] = (
+        ConfigOptions.key("scan.file-creation-time-millis")
+        .long_type()
+        .no_default_value()
+        .with_description(
+            "After configuring this time, only the data files created after this time will be read."
+        )
+    )
+
+    # When scan.mode is 'default', setting this selects 'from-creation-timestamp'.
+    SCAN_CREATION_TIME_MILLIS: ConfigOption[int] = (
+        ConfigOptions.key("scan.creation-time-millis")
+        .long_type()
+        .no_default_value()
+        .with_description(
+            "Optional timestamp used in case of 'from-creation-timestamp' scan mode."
+        )
+    )
+
     SOURCE_SPLIT_TARGET_SIZE: ConfigOption[MemorySize] = (
         ConfigOptions.key("source.split.target-size")
         .memory_type()
@@ -827,6 +878,42 @@ class CoreOptions:
     def data_file_prefix(self, default=None):
         return self.options.get(CoreOptions.DATA_FILE_PREFIX, default)
 
+    def scan_mode(self, default=None):
+        """Return the configured ``scan.mode`` as-is (DEFAULT/FULL unresolved)."""
+        return self.options.get(CoreOptions.SCAN_MODE, default)
+
+    def startup_mode(self) -> 'StartupMode':
+        """Resolve the effective startup mode, modelled on Java CoreOptions.startupMode().
+
+        If scan.mode is DEFAULT, the mode is inferred from the other scan
+        options; the first matching rule wins:
+
+        * scan.timestamp-millis or scan.timestamp -> FROM_TIMESTAMP
+        * scan.snapshot-id, scan.tag-name or scan.watermark -> FROM_SNAPSHOT
+        * incremental-between-timestamp -> INCREMENTAL
+        * scan.file-creation-time-millis -> FROM_FILE_CREATION_TIME
+        * scan.creation-time-millis -> FROM_CREATION_TIMESTAMP
+        * otherwise -> LATEST_FULL
+
+        The deprecated FULL mode is mapped to LATEST_FULL and emits a
+        DeprecationWarning. Every other mode is returned unchanged.
+
+        Returns:
+            The resolved StartupMode; never DEFAULT or FULL.
+        """
+        mode = self.scan_mode()
+        # Treat a missing value like DEFAULT, in case Options.get() hands back
+        # None instead of the option's declared default.
+        if mode is None or mode == StartupMode.DEFAULT:
+            options = self.options
+            if (options.contains(CoreOptions.SCAN_TIMESTAMP_MILLIS)
+                    or options.contains(CoreOptions.SCAN_TIMESTAMP)):
+                return StartupMode.FROM_TIMESTAMP
+            if (options.contains(CoreOptions.SCAN_SNAPSHOT_ID)
+                    or options.contains(CoreOptions.SCAN_TAG_NAME)
+                    or options.contains(CoreOptions.SCAN_WATERMARK)):
+                return StartupMode.FROM_SNAPSHOT
+            if options.contains(CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP):
+                return StartupMode.INCREMENTAL
+            if options.contains(CoreOptions.SCAN_FILE_CREATION_TIME_MILLIS):
+                return StartupMode.FROM_FILE_CREATION_TIME
+            if options.contains(CoreOptions.SCAN_CREATION_TIME_MILLIS):
+                return StartupMode.FROM_CREATION_TIMESTAMP
+            return StartupMode.LATEST_FULL
+        if mode == StartupMode.FULL:
+            # stacklevel=2 attributes the warning to the caller of startup_mode().
+            warnings.warn(
+                "scan.mode 'full' is deprecated, use 'latest-full' instead.",
+                DeprecationWarning,
+                stacklevel=2,
+            )
+            return StartupMode.LATEST_FULL
+        return mode
+
     def scan_fallback_branch(self, default=None):
         return self.options.get(CoreOptions.SCAN_FALLBACK_BRANCH, default)
 
diff --git a/paimon-python/pypaimon/read/table_scan.py b/paimon-python/pypaimon/read/table_scan.py
index 6232618035..03a1c8b062 100755
--- a/paimon-python/pypaimon/read/table_scan.py
+++ b/paimon-python/pypaimon/read/table_scan.py
@@ -58,6 +58,8 @@ class TableScan:
         snapshot_manager = self.table.snapshot_manager()
         manifest_list_manager = ManifestListManager(self.table)
 
+        self._validate_scan_mode()
+
         from pypaimon.snapshot.time_travel_util import TimeTravelUtil, 
SCAN_KEYS
         has_time_travel = any(options.contains_key(key) for key in SCAN_KEYS)
         has_incremental = 
options.contains(CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP)
@@ -158,3 +160,104 @@ class TableScan:
     def with_global_index_result(self, result) -> 'TableScan':
         self.file_scanner.with_global_index_result(result)
         return self
+
+    def _validate_scan_mode(self):
+        """Validate scan.mode against companion scan options (whitelist approach).
+
+        The mode is first resolved with ``CoreOptions.startup_mode()``. Each
+        StartupMode then declares exactly which scan keys are allowed, and any
+        scan key that is present but not whitelisted for the resolved mode is
+        rejected. Intended to mirror Java's SchemaValidation mutual-exclusion
+        matrix, including its "exactly one of" rule for the from-timestamp
+        keys and for the from-snapshot keys.
+
+        Raises:
+            ValueError: if the mode's required scan key is missing, if
+                mutually exclusive scan keys are combined, if a scan key is
+                not allowed for the mode, or if the mode is not supported by
+                pypaimon yet.
+        """
+        from pypaimon.common.options.core_options import StartupMode
+
+        core_options = self.table.options
+        mode = core_options.startup_mode()
+        options = core_options.options
+
+        # Every scan key checked here. Their order is the order in which
+        # offending keys are reported in error messages.
+        scan_options = (
+            CoreOptions.SCAN_SNAPSHOT_ID,
+            CoreOptions.SCAN_TAG_NAME,
+            CoreOptions.SCAN_WATERMARK,
+            CoreOptions.SCAN_TIMESTAMP_MILLIS,
+            CoreOptions.SCAN_TIMESTAMP,
+            CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP,
+            CoreOptions.SCAN_FILE_CREATION_TIME_MILLIS,
+            CoreOptions.SCAN_CREATION_TIME_MILLIS,
+        )
+        present_keys = [opt.key() for opt in scan_options if options.contains(opt)]
+
+        timestamp_keys = [CoreOptions.SCAN_TIMESTAMP_MILLIS.key(),
+                          CoreOptions.SCAN_TIMESTAMP.key()]
+        snapshot_keys = [CoreOptions.SCAN_SNAPSHOT_ID.key(),
+                         CoreOptions.SCAN_TAG_NAME.key(),
+                         CoreOptions.SCAN_WATERMARK.key()]
+
+        # scan.timestamp-millis and scan.timestamp are mutually exclusive
+        if all(k in present_keys for k in timestamp_keys):
+            raise ValueError(
+                "scan.timestamp-millis and scan.timestamp cannot both be set."
+            )
+
+        # Define allowed companion keys per mode
+        if mode == StartupMode.FROM_TIMESTAMP:
+            allowed = set(timestamp_keys)
+            if not allowed.intersection(present_keys):
+                raise ValueError(
+                    "scan.mode is 'from-timestamp' but neither "
+                    "scan.timestamp-millis nor scan.timestamp is set."
+                )
+        elif mode == StartupMode.FROM_SNAPSHOT_FULL:
+            allowed = {CoreOptions.SCAN_SNAPSHOT_ID.key()}
+            if CoreOptions.SCAN_SNAPSHOT_ID.key() not in present_keys:
+                raise ValueError(
+                    "scan.mode is 'from-snapshot-full' but scan.snapshot-id is not set."
+                )
+        elif mode == StartupMode.FROM_SNAPSHOT:
+            allowed = set(snapshot_keys)
+            chosen = [k for k in present_keys if k in allowed]
+            if not chosen:
+                raise ValueError(
+                    "scan.mode is 'from-snapshot' but none of "
+                    "scan.snapshot-id, scan.tag-name, or scan.watermark is set."
+                )
+            # Like the timestamp pair above, the snapshot selectors are
+            # mutually exclusive: with several set, the start is ambiguous.
+            if len(chosen) > 1:
+                raise ValueError(
+                    "scan.mode is 'from-snapshot' but more than one of "
+                    "scan.snapshot-id, scan.tag-name, or scan.watermark is "
+                    f"set: {chosen}."
+                )
+        elif mode == StartupMode.INCREMENTAL:
+            allowed = {CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP.key()}
+            if not allowed.intersection(present_keys):
+                raise ValueError(
+                    "scan.mode is 'incremental' but "
+                    "incremental-between-timestamp is not set."
+                )
+        elif mode in (StartupMode.LATEST_FULL, StartupMode.LATEST):
+            allowed = set()
+        elif mode in (StartupMode.COMPACTED_FULL,
+                      StartupMode.FROM_CREATION_TIMESTAMP,
+                      StartupMode.FROM_FILE_CREATION_TIME):
+            raise ValueError(
+                f"scan.mode '{mode.value}' is not yet supported in pypaimon."
+            )
+        else:
+            allowed = set()
+
+        # Reject any scan key that's not in the whitelist for this mode
+        disallowed = [k for k in present_keys if k not in allowed]
+        if disallowed:
+            if allowed:
+                hint = f"Only {sorted(allowed)} are allowed for this mode."
+            else:
+                hint = "No scan keys are allowed for this mode."
+            raise ValueError(
+                f"scan.mode '{mode.value}' conflicts with: {disallowed}. {hint}"
+            )
diff --git a/paimon-python/pypaimon/tests/table_scan_mode_test.py b/paimon-python/pypaimon/tests/table_scan_mode_test.py
new file mode 100644
index 0000000000..c33fdf9beb
--- /dev/null
+++ b/paimon-python/pypaimon/tests/table_scan_mode_test.py
@@ -0,0 +1,94 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+import warnings
+from unittest.mock import Mock
+
+from pypaimon.common.options.core_options import CoreOptions, StartupMode
+from pypaimon.read.table_scan import TableScan
+
+
+def _scan(options):
+    """Build a TableScan whose mocked table carries ``options``.
+
+    ``TableScan.__init__`` is bypassed via ``__new__`` so the scan's
+    constructor dependencies are not needed; only ``table`` is set.
+    """
+    fake_table = Mock()
+    fake_table.options = CoreOptions.from_dict(options)
+    bare_scan = TableScan.__new__(TableScan)
+    bare_scan.table = fake_table
+    return bare_scan
+
+
+class TableScanModeTest(unittest.TestCase):
+    """Covers StartupMode resolution and TableScan._validate_scan_mode."""
+
+    def test_from_timestamp_requires_timestamp_option(self):
+        scan = _scan({
+            CoreOptions.SCAN_MODE.key(): StartupMode.FROM_TIMESTAMP.value,
+        })
+
+        with self.assertRaisesRegex(
+                ValueError,
+                "neither scan.timestamp-millis nor scan.timestamp is set"):
+            scan._validate_scan_mode()
+
+    def test_timestamp_millis_and_timestamp_are_mutually_exclusive(self):
+        scan = _scan({
+            CoreOptions.SCAN_MODE.key(): StartupMode.FROM_TIMESTAMP.value,
+            CoreOptions.SCAN_TIMESTAMP_MILLIS.key(): "123",
+            CoreOptions.SCAN_TIMESTAMP.key(): "2024-01-01 00:00:00",
+        })
+
+        with self.assertRaisesRegex(ValueError, "cannot both be set"):
+            scan._validate_scan_mode()
+
+    def test_from_snapshot_full_requires_snapshot_id(self):
+        scan = _scan({
+            CoreOptions.SCAN_MODE.key(): StartupMode.FROM_SNAPSHOT_FULL.value,
+        })
+
+        with self.assertRaisesRegex(ValueError, "scan.snapshot-id"):
+            scan._validate_scan_mode()
+
+    def test_latest_conflicts_with_snapshot_id(self):
+        scan = _scan({
+            CoreOptions.SCAN_MODE.key(): StartupMode.LATEST.value,
+            CoreOptions.SCAN_SNAPSHOT_ID.key(): "1",
+        })
+
+        with self.assertRaisesRegex(ValueError, "scan.snapshot-id"):
+            scan._validate_scan_mode()
+
+    def test_incremental_conflicts_with_snapshot_id(self):
+        scan = _scan({
+            CoreOptions.SCAN_MODE.key(): StartupMode.INCREMENTAL.value,
+            CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP.key(): "1,2",
+            CoreOptions.SCAN_SNAPSHOT_ID.key(): "1",
+        })
+
+        with self.assertRaisesRegex(ValueError, "conflicts with"):
+            scan._validate_scan_mode()
+
+    def test_latest_full_without_scan_keys_is_valid(self):
+        _scan({
+            CoreOptions.SCAN_MODE.key(): StartupMode.LATEST_FULL.value,
+        })._validate_scan_mode()
+
+    def test_default_without_scan_keys_resolves_to_latest_full(self):
+        options = CoreOptions.from_dict({
+            CoreOptions.SCAN_MODE.key(): StartupMode.DEFAULT.value,
+        })
+
+        self.assertEqual(options.startup_mode(), StartupMode.LATEST_FULL)
+
+    def test_default_with_timestamp_millis_resolves_to_from_timestamp(self):
+        options = CoreOptions.from_dict({
+            CoreOptions.SCAN_MODE.key(): StartupMode.DEFAULT.value,
+            CoreOptions.SCAN_TIMESTAMP_MILLIS.key(): "123",
+        })
+
+        self.assertEqual(options.startup_mode(), StartupMode.FROM_TIMESTAMP)
+        _scan(options.options.to_map())._validate_scan_mode()
+
+    def test_default_with_snapshot_id_resolves_to_from_snapshot(self):
+        options = CoreOptions.from_dict({
+            CoreOptions.SCAN_MODE.key(): StartupMode.DEFAULT.value,
+            CoreOptions.SCAN_SNAPSHOT_ID.key(): "1",
+        })
+
+        self.assertEqual(options.startup_mode(), StartupMode.FROM_SNAPSHOT)
+        _scan(options.options.to_map())._validate_scan_mode()
+
+    def test_unsupported_scan_modes_raise_value_error(self):
+        for mode in (StartupMode.COMPACTED_FULL,
+                     StartupMode.FROM_CREATION_TIMESTAMP,
+                     StartupMode.FROM_FILE_CREATION_TIME):
+            with self.subTest(mode=mode):
+                scan = _scan({CoreOptions.SCAN_MODE.key(): mode.value})
+                with self.assertRaisesRegex(ValueError, "not yet supported"):
+                    scan._validate_scan_mode()
+
+    def test_full_mode_maps_to_latest_full_with_deprecation_warning(self):
+        options = CoreOptions.from_dict({
+            CoreOptions.SCAN_MODE.key(): StartupMode.FULL.value,
+        })
+
+        with warnings.catch_warnings(record=True) as caught:
+            warnings.simplefilter("always")
+            self.assertEqual(options.startup_mode(), StartupMode.LATEST_FULL)
+
+        self.assertEqual(len(caught), 1)
+        self.assertTrue(issubclass(caught[0].category, DeprecationWarning))
+
+
+if __name__ == '__main__':
+    unittest.main()

Reply via email to