This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 82c3d83ad9 [python] Add conflict detection in shard update (#7630)
82c3d83ad9 is described below

commit 82c3d83ad9959c73a7bdfea0228b7fbcb1ac3dcd
Author: littlecoder04 <[email protected]>
AuthorDate: Sun Apr 12 15:52:13 2026 +0800

    [python] Add conflict detection in shard update (#7630)
    
    ### Purpose
    This PR is a follow-up to #7323. PR #7323 introduced conflict detection
    for Python data evolution updates, but the shard-update path was not
    covered.
    
    As a result, when shard update and compact run concurrently, the shard
    update may commit successfully against a stale scan snapshot instead of
    failing fast. The problem only shows up later during read, with the
    error:
    `All files in a field merge split should have the same row count`. This
    PR extends the same conflict-detection coverage to the shard-update
    path.
    
    ### Tests
    
    run_compact_conflict_test in run_mixed_tests.sh
---
 .../test/java/org/apache/paimon/JavaPyE2ETest.java | 69 ++++++++++++++++++++++
 paimon-python/dev/run_mixed_tests.sh               | 42 ++++++++++++-
 paimon-python/pypaimon/read/plan.py                |  5 +-
 .../pypaimon/read/scanner/file_scanner.py          | 22 ++++---
 .../pypaimon/read/streaming_table_scan.py          |  2 +-
 paimon-python/pypaimon/read/table_scan.py          | 12 ++--
 paimon-python/pypaimon/tests/binary_row_test.py    |  8 +--
 .../pypaimon/tests/e2e/java_py_read_write_test.py  | 59 ++++++++++++++++++
 .../pypaimon/tests/reader_append_only_test.py      | 45 ++++++++++++++
 .../pypaimon/write/commit/commit_scanner.py        |  4 +-
 paimon-python/pypaimon/write/file_store_commit.py  |  2 +-
 paimon-python/pypaimon/write/table_update.py       |  6 +-
 .../pypaimon/write/table_update_by_row_id.py       |  5 +-
 13 files changed, 254 insertions(+), 27 deletions(-)

diff --git a/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java 
b/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
index bd1e006ec7..c09bf34663 100644
--- a/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
@@ -18,6 +18,8 @@
 
 package org.apache.paimon;
 
+import org.apache.paimon.append.dataevolution.DataEvolutionCompactCoordinator;
+import org.apache.paimon.append.dataevolution.DataEvolutionCompactTask;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.catalog.CatalogFactory;
@@ -48,9 +50,11 @@ import org.apache.paimon.table.Table;
 import org.apache.paimon.table.sink.BatchTableCommit;
 import org.apache.paimon.table.sink.BatchTableWrite;
 import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.table.sink.InnerTableCommit;
 import org.apache.paimon.table.sink.StreamTableCommit;
 import org.apache.paimon.table.sink.StreamTableWrite;
+import org.apache.paimon.table.source.EndOfScanException;
 import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.TableRead;
@@ -937,6 +941,71 @@ public class JavaPyE2ETest {
         return GenericRow.ofKind(rowKind, values[0], values[1], values[2]);
     }
 
+    /** Step 1: Write 5 base files for compact conflict test. */
+    @Test
+    @EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true")
+    public void testCompactConflictWriteBase() throws Exception {
+        Identifier id = identifier("compact_conflict_test");
+        try {
+            catalog.dropTable(id, true);
+        } catch (Exception ignore) {
+        }
+        Schema schema =
+                Schema.newBuilder()
+                        .column("f0", DataTypes.INT())
+                        .column("f1", DataTypes.STRING())
+                        .column("f2", DataTypes.STRING())
+                        .option(ROW_TRACKING_ENABLED.key(), "true")
+                        .option(DATA_EVOLUTION_ENABLED.key(), "true")
+                        .option(BUCKET.key(), "-1")
+                        .build();
+        catalog.createTable(id, schema, false);
+
+        RowType fullType = schema.rowType().project(Arrays.asList("f0", "f1"));
+
+        for (int fileIdx = 0; fileIdx < 5; fileIdx++) {
+            int startId = fileIdx * 200;
+            FileStoreTable t = (FileStoreTable) catalog.getTable(id);
+            BatchWriteBuilder builder = t.newBatchWriteBuilder();
+            try (BatchTableWrite w = 
builder.newWrite().withWriteType(fullType)) {
+                for (int i = 0; i < 200; i++) {
+                    w.write(
+                            GenericRow.of(
+                                    startId + i, BinaryString.fromString("n" + 
(startId + i))));
+                }
+                builder.newCommit().commit(w.prepareCommit());
+            }
+        }
+        LOG.info("compact_conflict_test: 5 base files written (200 rows each, 
total 1000)");
+    }
+
+    /** Step 3: Run compact on compact_conflict_test table. */
+    @Test
+    @EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true")
+    public void testCompactConflictRunCompact() throws Exception {
+        Identifier id = identifier("compact_conflict_test");
+        doDataEvolutionCompact((FileStoreTable) catalog.getTable(id));
+        LOG.info("compact_conflict_test: compact done, 5 files merged into 1 
(1000 rows)");
+    }
+
+    private void doDataEvolutionCompact(FileStoreTable table) throws Exception 
{
+        DataEvolutionCompactCoordinator coordinator =
+                new DataEvolutionCompactCoordinator(table, false, false);
+        List<CommitMessage> messages = new ArrayList<>();
+        try {
+            List<DataEvolutionCompactTask> tasks;
+            while (!(tasks = coordinator.plan()).isEmpty()) {
+                for (DataEvolutionCompactTask task : tasks) {
+                    messages.add(task.doCompact(table, "test-compact"));
+                }
+            }
+        } catch (EndOfScanException ignore) {
+        }
+        if (!messages.isEmpty()) {
+            table.newBatchWriteBuilder().newCommit().commit(messages);
+        }
+    }
+
     private static String rowToStringWithStruct(InternalRow row, RowType type) 
{
         StringBuilder build = new StringBuilder();
         build.append(row.getRowKind().shortString()).append("[");
diff --git a/paimon-python/dev/run_mixed_tests.sh 
b/paimon-python/dev/run_mixed_tests.sh
index 3366a5ff1c..077b5af276 100755
--- a/paimon-python/dev/run_mixed_tests.sh
+++ b/paimon-python/dev/run_mixed_tests.sh
@@ -290,6 +290,32 @@ run_lumina_vector_test() {
     fi
 }
 
+run_compact_conflict_test() {
+    echo -e "${YELLOW}=== Running Compact Conflict Test (Java Write Base, 
Python Shard Update + Java Compact) ===${NC}"
+
+    cd "$PROJECT_ROOT"
+
+    # Step 1: Java writes 5 base files
+    echo "Running Maven test for JavaPyE2ETest.testCompactConflictWriteBase..."
+    if mvn test 
-Dtest=org.apache.paimon.JavaPyE2ETest#testCompactConflictWriteBase -pl 
paimon-core -q -Drun.e2e.tests=true; then
+        echo -e "${GREEN}✓ Java write base files completed successfully${NC}"
+    else
+        echo -e "${RED}✗ Java write base files failed${NC}"
+        return 1
+    fi
+
+    # Step 2-4: Python shard update (scan -> Java compact -> commit conflict 
detected)
+    cd "$PAIMON_PYTHON_DIR"
+    echo "Running Python test for 
JavaPyReadWriteTest.test_compact_conflict_shard_update..."
+    if python -m pytest 
java_py_read_write_test.py::JavaPyReadWriteTest::test_compact_conflict_shard_update
 -v; then
+        echo -e "${GREEN}✓ Python compact conflict test completed 
successfully${NC}"
+        return 0
+    else
+        echo -e "${RED}✗ Python compact conflict test failed${NC}"
+        return 1
+    fi
+}
+
 run_blob_alter_compact_test() {
     echo -e "${YELLOW}=== Running Blob Alter+Compact Test (Java 
Write+Alter+Compact, Python Read) ===${NC}"
 
@@ -324,6 +350,7 @@ main() {
     local compressed_text_result=0
     local tantivy_fulltext_result=0
     local lumina_vector_result=0
+    local compact_conflict_result=0
     local blob_alter_compact_result=0
 
     # Detect Python version
@@ -407,6 +434,13 @@ main() {
 
     echo ""
 
+    # Run compact conflict test (Java write+compact, Python read)
+    if ! run_compact_conflict_test; then
+        compact_conflict_result=1
+    fi
+
+    echo ""
+
     # Run blob alter+compact test (Java write+alter+compact, Python read)
     if ! run_blob_alter_compact_test; then
         blob_alter_compact_result=1
@@ -470,6 +504,12 @@ main() {
         echo -e "${RED}✗ Lumina Vector Index Test (Java Write, Python Read): 
FAILED${NC}"
     fi
 
+    if [[ $compact_conflict_result -eq 0 ]]; then
+        echo -e "${GREEN}✓ Compact Conflict Test (Java Write+Compact, Python 
Read): PASSED${NC}"
+    else
+        echo -e "${RED}✗ Compact Conflict Test (Java Write+Compact, Python 
Read): FAILED${NC}"
+    fi
+
     if [[ $blob_alter_compact_result -eq 0 ]]; then
         echo -e "${GREEN}✓ Blob Alter+Compact Test (Java Write+Alter+Compact, 
Python Read): PASSED${NC}"
     else
@@ -481,7 +521,7 @@ main() {
     # Clean up warehouse directory after all tests
     cleanup_warehouse
 
-    if [[ $java_write_result -eq 0 && $python_read_result -eq 0 && 
$python_write_result -eq 0 && $java_read_result -eq 0 && $pk_dv_result -eq 0 && 
$btree_index_result -eq 0 && $compressed_text_result -eq 0 && 
$tantivy_fulltext_result -eq 0 && $lumina_vector_result -eq 0 && 
$blob_alter_compact_result -eq 0 ]]; then
+    if [[ $java_write_result -eq 0 && $python_read_result -eq 0 && 
$python_write_result -eq 0 && $java_read_result -eq 0 && $pk_dv_result -eq 0 && 
$btree_index_result -eq 0 && $compressed_text_result -eq 0 && 
$tantivy_fulltext_result -eq 0 && $lumina_vector_result -eq 0 && 
$compact_conflict_result -eq 0 && $blob_alter_compact_result -eq 0 ]]; then
         echo -e "${GREEN}🎉 All tests passed! Java-Python interoperability 
verified.${NC}"
         return 0
     else
diff --git a/paimon-python/pypaimon/read/plan.py 
b/paimon-python/pypaimon/read/plan.py
index 8c69a41a9b..c4ebc2408f 100644
--- a/paimon-python/pypaimon/read/plan.py
+++ b/paimon-python/pypaimon/read/plan.py
@@ -16,8 +16,8 @@
 # limitations under the License.
 
################################################################################
 
-from dataclasses import dataclass
-from typing import List
+from dataclasses import dataclass, field
+from typing import List, Optional
 
 
 from pypaimon.read.split import Split
@@ -27,6 +27,7 @@ from pypaimon.read.split import Split
 class Plan:
     """Implementation of Plan for native Python reading."""
     _splits: List[Split]
+    snapshot_id: Optional[int] = field(default=None)
 
     def splits(self) -> List[Split]:
         return self._splits
diff --git a/paimon-python/pypaimon/read/scanner/file_scanner.py 
b/paimon-python/pypaimon/read/scanner/file_scanner.py
index b0293dac41..80bf54b384 100755
--- a/paimon-python/pypaimon/read/scanner/file_scanner.py
+++ b/paimon-python/pypaimon/read/scanner/file_scanner.py
@@ -18,7 +18,7 @@ limitations under the License.
 import logging
 import os
 import time
-from typing import Callable, Dict, List, Optional, Set
+from typing import Callable, Dict, List, Optional, Set, Tuple
 
 logger = logging.getLogger(__name__)
 
@@ -40,6 +40,7 @@ from pypaimon.read.scanner.data_evolution_split_generator 
import \
 from pypaimon.read.scanner.primary_key_table_split_generator import \
     PrimaryKeyTableSplitGenerator
 from pypaimon.read.split import DataSplit
+from pypaimon.snapshot.snapshot import Snapshot
 from pypaimon.snapshot.snapshot_manager import SnapshotManager
 from pypaimon.table.bucket_mode import BucketMode
 from pypaimon.table.source.deletion_file import DeletionFile
@@ -165,7 +166,7 @@ class FileScanner:
     def __init__(
         self,
         table,
-        manifest_scanner: Callable[[], List[ManifestFileMeta]],
+        manifest_scanner: Callable[[], Tuple[List[ManifestFileMeta], 
Optional[Snapshot]]],
         predicate: Optional[Predicate] = None,
         limit: Optional[int] = None
     ):
@@ -200,6 +201,8 @@ class FileScanner:
         self.data_evolution = options.data_evolution_enabled()
         self.deletion_vectors_enabled = options.deletion_vectors_enabled()
         self._global_index_result = None
+        self._scanned_snapshot = None
+        self._scanned_snapshot_id = None
 
         def schema_fields_func(schema_id: int):
             return self.table.schema_manager.get_schema(schema_id).fields
@@ -216,7 +219,8 @@ class FileScanner:
         bucket_files = set()
         for e in entries:
             bucket_files.add((tuple(e.partition.values), e.bucket))
-        return 
self._scan_dv_index(self.snapshot_manager.get_latest_snapshot(), bucket_files)
+        snapshot = self._scanned_snapshot if self._scanned_snapshot else 
self.snapshot_manager.get_latest_snapshot()
+        return self._scan_dv_index(snapshot, bucket_files)
 
     def scan(self) -> Plan:
         start_ms = time.time() * 1000
@@ -241,7 +245,7 @@ class FileScanner:
             )
 
         if not entries:
-            return Plan([])
+            return Plan([], snapshot_id=self._scanned_snapshot_id)
 
         # Configure sharding if needed
         if self.idx_of_this_subtask is not None:
@@ -258,7 +262,7 @@ class FileScanner:
             "File store scan plan completed in %d ms. Files size: %d",
             duration_ms, len(entries)
         )
-        return Plan(splits)
+        return Plan(splits, snapshot_id=self._scanned_snapshot_id)
 
     def _create_data_evolution_split_generator(self):
         row_ranges = None
@@ -272,7 +276,9 @@ class FileScanner:
         if row_ranges is None and self.predicate is not None:
             row_ranges = _row_ranges_from_predicate(self.predicate)
 
-        manifest_files = self.manifest_scanner()
+        manifest_files, snapshot = self.manifest_scanner()
+        self._scanned_snapshot = snapshot
+        self._scanned_snapshot_id = snapshot.id if snapshot else None
 
         # Filter manifest files by row ranges if available
         if row_ranges is not None:
@@ -293,7 +299,9 @@ class FileScanner:
         )
 
     def plan_files(self) -> List[ManifestEntry]:
-        manifest_files = self.manifest_scanner()
+        manifest_files, snapshot = self.manifest_scanner()
+        self._scanned_snapshot = snapshot
+        self._scanned_snapshot_id = snapshot.id if snapshot else None
         if len(manifest_files) == 0:
             return []
         return self.read_manifest_entries(manifest_files)
diff --git a/paimon-python/pypaimon/read/streaming_table_scan.py 
b/paimon-python/pypaimon/read/streaming_table_scan.py
index 835c61dd73..f426d2064f 100644
--- a/paimon-python/pypaimon/read/streaming_table_scan.py
+++ b/paimon-python/pypaimon/read/streaming_table_scan.py
@@ -323,7 +323,7 @@ class AsyncStreamingTableScan:
     def _create_initial_plan(self, snapshot: Snapshot) -> Plan:
         """Create a Plan for the initial full scan of the latest snapshot."""
         def all_manifests():
-            return self._manifest_list_manager.read_all(snapshot)
+            return self._manifest_list_manager.read_all(snapshot), snapshot
 
         starting_scanner = FileScanner(
             self.table,
diff --git a/paimon-python/pypaimon/read/table_scan.py 
b/paimon-python/pypaimon/read/table_scan.py
index 9579f875cb..c754b50111 100755
--- a/paimon-python/pypaimon/read/table_scan.py
+++ b/paimon-python/pypaimon/read/table_scan.py
@@ -59,7 +59,7 @@ class TableScan:
             earliest_snapshot = snapshot_manager.try_get_earliest_snapshot()
             latest_snapshot = snapshot_manager.get_latest_snapshot()
             if earliest_snapshot is None or latest_snapshot is None:
-                return FileScanner(self.table, lambda: [])
+                return FileScanner(self.table, lambda: ([], None))
             start_timestamp = int(ts[0])
             end_timestamp = int(ts[1])
             if start_timestamp >= end_timestamp:
@@ -67,7 +67,7 @@ class TableScan:
                     "Ending timestamp %s should be >= starting timestamp %s." 
% (end_timestamp, start_timestamp))
             if (start_timestamp == end_timestamp or start_timestamp > 
latest_snapshot.time_millis
                     or end_timestamp < earliest_snapshot.time_millis):
-                return FileScanner(self.table, lambda: [])
+                return FileScanner(self.table, lambda: ([], None))
 
             starting_snapshot = 
snapshot_manager.earlier_or_equal_time_mills(start_timestamp)
             earliest_snapshot = snapshot_manager.try_get_earliest_snapshot()
@@ -84,8 +84,10 @@ class TableScan:
 
             def incremental_manifest():
                 snapshots_in_range = []
+                end_snapshot = snapshot_manager.get_snapshot_by_id(end_id) if 
end_id >= 1 else None
                 for snapshot_id in range(start_id + 1, end_id + 1):
                     snapshot = snapshot_manager.get_snapshot_by_id(snapshot_id)
+                    end_snapshot = snapshot
                     if snapshot.commit_kind == "APPEND":
                         snapshots_in_range.append(snapshot)
 
@@ -94,7 +96,7 @@ class TableScan:
                 for snapshot in snapshots_in_range:
                     manifest_files = manifest_list_manager.read_delta(snapshot)
                     manifests.extend(manifest_files)
-                return manifests
+                return manifests, end_snapshot
 
             return FileScanner(self.table, incremental_manifest, 
self.predicate, self.limit)
         elif options.contains(CoreOptions.SCAN_TAG_NAME):  # Handle tag-based 
reading
@@ -104,7 +106,7 @@ class TableScan:
                 tag_manager = self.table.tag_manager()
                 tag = tag_manager.get_or_throw(tag_name)
                 snapshot = tag.trim_to_snapshot()
-                return manifest_list_manager.read_all(snapshot)
+                return manifest_list_manager.read_all(snapshot), snapshot
 
             return FileScanner(
                 self.table,
@@ -115,7 +117,7 @@ class TableScan:
 
         def all_manifests():
             snapshot = snapshot_manager.get_latest_snapshot()
-            return manifest_list_manager.read_all(snapshot)
+            return manifest_list_manager.read_all(snapshot), snapshot
 
         return FileScanner(
             self.table,
diff --git a/paimon-python/pypaimon/tests/binary_row_test.py 
b/paimon-python/pypaimon/tests/binary_row_test.py
index b1a35226eb..607b184444 100644
--- a/paimon-python/pypaimon/tests/binary_row_test.py
+++ b/paimon-python/pypaimon/tests/binary_row_test.py
@@ -117,7 +117,7 @@ class BinaryRowTest(unittest.TestCase):
 
     def test_is_not_null_append(self):
         table = self.catalog.get_table('default.test_append')
-        file_scanner = FileScanner(table, lambda: [])
+        file_scanner = FileScanner(table, lambda: ([], None))
         latest_snapshot = file_scanner.snapshot_manager.get_latest_snapshot()
         manifest_files = 
file_scanner.manifest_list_manager.read_all(latest_snapshot)
         manifest_entries = 
file_scanner.manifest_file_manager.read(manifest_files[0].file_name)
@@ -254,7 +254,7 @@ class BinaryRowTest(unittest.TestCase):
         table_write.close()
         table_commit.close()
 
-        file_scanner = FileScanner(table, lambda: [])
+        file_scanner = FileScanner(table, lambda: ([], None))
         latest_snapshot = file_scanner.snapshot_manager.get_latest_snapshot()
         manifest_files = 
file_scanner.manifest_list_manager.read_all(latest_snapshot)
         manifest_entries = 
file_scanner.manifest_file_manager.read(manifest_files[0].file_name)
@@ -293,7 +293,7 @@ class BinaryRowTest(unittest.TestCase):
                          }
         self.assertEqual(expected_data, actual.to_pydict())
 
-        file_scanner = FileScanner(table, lambda: [])
+        file_scanner = FileScanner(table, lambda: ([], None))
         latest_snapshot = file_scanner.snapshot_manager.get_latest_snapshot()
         manifest_files = 
file_scanner.manifest_list_manager.read_all(latest_snapshot)
         manifest_entries = 
file_scanner.manifest_file_manager.read(manifest_files[0].file_name)
@@ -324,7 +324,7 @@ class BinaryRowTest(unittest.TestCase):
                                                                                
 trimmed_pk_fields)
 
     def _overwrite_manifest_entry(self, table):
-        file_scanner = FileScanner(table, lambda: [])
+        file_scanner = FileScanner(table, lambda: ([], None))
         latest_snapshot = file_scanner.snapshot_manager.get_latest_snapshot()
         manifest_files = 
file_scanner.manifest_list_manager.read_all(latest_snapshot)
         manifest_entries = 
file_scanner.manifest_file_manager.read(manifest_files[0].file_name)
diff --git a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py 
b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
index 4a61b9a067..3eee324b6c 100644
--- a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
+++ b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
@@ -611,3 +611,62 @@ class JavaPyReadWriteTest(unittest.TestCase):
         splits = table_scan.plan().splits()
         result = table_read.to_arrow(splits)
         self.assertEqual(result.num_rows, 200)
+
+    def test_compact_conflict_shard_update(self):
+        """
+        1. Java writes 5 base files (testCompactConflictWriteBase)
+        2. pypaimon ShardTableUpdator scans table, prepares evolution
+        3. Java runs compact (testCompactConflictRunCompact)
+        4. pypaimon commits stale evolution -> conflict detected, raises 
RuntimeError
+        """
+        import subprocess
+
+        table = self.catalog.get_table('default.compact_conflict_test')
+
+        # Step 2: pypaimon shard update - scan and prepare commit
+        wb = table.new_batch_write_builder()
+        update = wb.new_update()
+        update.with_read_projection(['f0'])
+        update.with_update_type(['f2'])
+        upd = update.new_shard_updator(shard_num=0, total_shard_count=3)
+        print(f"Shard 0 row_ranges: {[(r[1].from_, r[1].to) for r in 
upd.row_ranges]}")
+
+        reader = upd.arrow_reader()
+        import pyarrow as pa
+        rows_read = 0
+        for batch in iter(reader.read_next_batch, None):
+            n = batch.num_rows
+            rows_read += n
+            upd.update_by_arrow_batch(
+                pa.RecordBatch.from_pydict(
+                    {'f2': [f'evo_{i}' for i in range(n)]},
+                    schema=pa.schema([('f2', pa.string())])
+                )
+            )
+        print(f"Shard update read {rows_read} rows")
+        stale_commit_msgs = upd.prepare_commit()
+
+        # Step 3: Java compact (compact happening between scan and commit)
+        project_root = os.path.join(self.tempdir, '..', '..', '..', '..')
+        result = subprocess.run(
+            ['mvn', 'test',
+             '-pl', 'paimon-core',
+             
'-Dtest=org.apache.paimon.JavaPyE2ETest#testCompactConflictRunCompact',
+             '-Drun.e2e.tests=true',
+             '-Dsurefire.failIfNoSpecifiedTests=false',
+             '-q'],
+            cwd=os.path.abspath(project_root),
+            stdout=subprocess.PIPE, stderr=subprocess.PIPE,
+            universal_newlines=True, timeout=120
+        )
+        self.assertEqual(result.returncode, 0,
+                         f"Java compact 
failed:\n{result.stdout}\n{result.stderr}")
+        print("Java compact completed")
+
+        # Step 4: pypaimon commits stale evolution -> conflict detected
+        tc = wb.new_commit()
+        with self.assertRaises(RuntimeError) as ctx:
+            tc.commit(stale_commit_msgs)
+        self.assertIn("conflicts", str(ctx.exception))
+        tc.close()
+        print(f"Conflict detected as expected: {ctx.exception}")
diff --git a/paimon-python/pypaimon/tests/reader_append_only_test.py 
b/paimon-python/pypaimon/tests/reader_append_only_test.py
index a8529c14a8..c4763dd99e 100644
--- a/paimon-python/pypaimon/tests/reader_append_only_test.py
+++ b/paimon-python/pypaimon/tests/reader_append_only_test.py
@@ -103,6 +103,51 @@ class AoReaderTest(unittest.TestCase):
         actual = self._read_test_table(read_builder).sort_by('user_id')
         self.assertEqual(actual, self.expected)
 
+    def test_plan_snapshot_id_for_empty_and_non_empty_scan(self):
+        schema = Schema.from_pyarrow_schema(self.pa_schema, 
partition_keys=['dt'])
+        self.catalog.create_table('default.test_plan_snapshot_id', schema, 
False)
+        table = self.catalog.get_table('default.test_plan_snapshot_id')
+
+        empty_plan = table.new_read_builder().new_scan().plan()
+        self.assertIsNone(empty_plan.snapshot_id)
+        self.assertEqual(len(empty_plan.splits()), 0)
+
+        self._write_test_table(table)
+
+        plan = table.new_read_builder().new_scan().plan()
+        self.assertEqual(plan.snapshot_id, 2)
+        self.assertGreater(len(plan.splits()), 0)
+
+    def test_incremental_timestamp_empty_range_keeps_end_snapshot_id(self):
+        schema = Schema.from_pyarrow_schema(self.pa_schema, 
partition_keys=['dt'])
+        
self.catalog.create_table('default.test_incremental_empty_range_snapshot', 
schema, False)
+        table = 
self.catalog.get_table('default.test_incremental_empty_range_snapshot')
+
+        write_builder = table.new_batch_write_builder()
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        pa_table = pa.Table.from_pydict({
+            'user_id': [1],
+            'item_id': [1001],
+            'behavior': ['a'],
+            'dt': ['p1'],
+        }, schema=self.pa_schema)
+        table_write.write_arrow(pa_table)
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+
+        snapshot_manager = SnapshotManager(table)
+        snapshot = snapshot_manager.get_latest_snapshot()
+        table_inc = table.copy({
+            CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP.key():
+                "{},{}".format(snapshot.time_millis, snapshot.time_millis + 1)
+        })
+
+        plan = table_inc.new_read_builder().new_scan().plan()
+        self.assertEqual(plan.snapshot_id, snapshot.id)
+        self.assertEqual(len(plan.splits()), 0)
+
     @unittest.skipIf(sys.version_info < (3, 11), "vortex-data requires Python 
>= 3.11")
     def test_vortex_ao_reader(self):
         schema = Schema.from_pyarrow_schema(self.pa_schema, 
partition_keys=['dt'], options={'file.format': 'vortex'})
diff --git a/paimon-python/pypaimon/write/commit/commit_scanner.py 
b/paimon-python/pypaimon/write/commit/commit_scanner.py
index 6158c86f5c..d758df3675 100644
--- a/paimon-python/pypaimon/write/commit/commit_scanner.py
+++ b/paimon-python/pypaimon/write/commit/commit_scanner.py
@@ -66,7 +66,7 @@ class CommitScanner:
 
         all_manifests = self.manifest_list_manager.read_all(latest_snapshot)
         return FileScanner(
-            self.table, lambda: [], partition_filter
+            self.table, lambda: ([], None), partition_filter
         ).read_manifest_entries(all_manifests)
 
     def read_incremental_entries_from_changed_partitions(self, snapshot: 
Snapshot,
@@ -92,7 +92,7 @@ class CommitScanner:
         partition_filter = 
self._build_partition_filter_from_entries(commit_entries)
 
         return FileScanner(
-            self.table, lambda: [], partition_filter
+            self.table, lambda: ([], None), partition_filter
         ).read_manifest_entries(delta_manifests)
 
     def _build_partition_filter_from_entries(self, entries: 
List[ManifestEntry]):
diff --git a/paimon-python/pypaimon/write/file_store_commit.py 
b/paimon-python/pypaimon/write/file_store_commit.py
index a4099ac1db..8937842680 100644
--- a/paimon-python/pypaimon/write/file_store_commit.py
+++ b/paimon-python/pypaimon/write/file_store_commit.py
@@ -516,7 +516,7 @@ class FileStoreCommit:
         """Generate commit entries for OVERWRITE mode based on latest 
snapshot."""
         entries = []
         current_entries = [] if latest_snapshot is None \
-            else (FileScanner(self.table, lambda: [], partition_filter).
+            else (FileScanner(self.table, lambda: ([], None), 
partition_filter).
                   
read_manifest_entries(self.manifest_list_manager.read_all(latest_snapshot)))
         for entry in current_entries:
             entry.kind = 1  # DELETE
diff --git a/paimon-python/pypaimon/write/table_update.py 
b/paimon-python/pypaimon/write/table_update.py
index b26194eb55..ec192a98a5 100644
--- a/paimon-python/pypaimon/write/table_update.py
+++ b/paimon-python/pypaimon/write/table_update.py
@@ -171,7 +171,9 @@ class ShardTableUpdator:
         self.dict = defaultdict(list)
 
         scanner = self.table.new_read_builder().new_scan()
-        splits = scanner.plan().splits()
+        plan = scanner.plan()
+        self.snapshot_id = plan.snapshot_id if plan.snapshot_id is not None 
else -1
+        splits = plan.splits()
         splits = _filter_by_whole_file_shard(splits, shard_num, 
total_shard_count)
         self.splits = splits
 
@@ -197,7 +199,7 @@ class ShardTableUpdator:
     def prepare_commit(self) -> List[CommitMessage]:
         commit_messages = []
         for (partition, files) in self.dict.items():
-            commit_messages.append(CommitMessage(partition, 0, files))
+            commit_messages.append(CommitMessage(partition, 0, files, 
self.snapshot_id))
         return commit_messages
 
     def update_by_arrow_batch(self, data: pa.RecordBatch):
diff --git a/paimon-python/pypaimon/write/table_update_by_row_id.py 
b/paimon-python/pypaimon/write/table_update_by_row_id.py
index 089fde047c..2e2efe44f7 100644
--- a/paimon-python/pypaimon/write/table_update_by_row_id.py
+++ b/paimon-python/pypaimon/write/table_update_by_row_id.py
@@ -65,7 +65,8 @@ class TableUpdateByRowId:
 
         read_builder = self.table.new_read_builder()
         scan = read_builder.new_scan()
-        splits = scan.plan().splits()
+        plan = scan.plan()
+        splits = plan.splits()
 
         for split in splits:
             for file in split.files:
@@ -77,7 +78,7 @@ class TableUpdateByRowId:
 
         total_row_count = sum(first_row_id_to_row_count_map.values())
 
-        snapshot_id = self.table.snapshot_manager().get_latest_snapshot().id
+        snapshot_id = plan.snapshot_id if plan.snapshot_id is not None else -1
         return (snapshot_id,
                 sorted(list(set(first_row_ids))),
                 first_row_id_to_partition_map,

Reply via email to