This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 4a884fc31f [python] Add FollowUpScanner, IncrementalDiffScanner, 
sharding (#7348)
4a884fc31f is described below

commit 4a884fc31fda314efa50cfe96f2184f279e76564
Author: Toby Cole <[email protected]>
AuthorDate: Wed Mar 11 06:17:05 2026 +0000

    [python] Add FollowUpScanner, IncrementalDiffScanner, sharding (#7348)
    
    - Add `FollowUpScanner` hierarchy (base, delta, changelog) for streaming
    scan planning
    - Add `IncrementalDiffScanner` for diff-based streaming reads
    - Add sharding support to `FileScanner`
---
 .../pypaimon/manifest/manifest_list_manager.py     |   1 +
 .../read/scanner/changelog_follow_up_scanner.py    |  29 ++
 .../read/scanner/delta_follow_up_scanner.py        |  28 ++
 .../pypaimon/read/scanner/file_scanner.py          |  47 ++-
 .../pypaimon/read/scanner/follow_up_scanner.py     |  30 ++
 .../read/scanner/incremental_diff_scanner.py       | 100 ++++++
 paimon-python/pypaimon/read/table_read.py          |   2 +-
 .../tests/changelog_follow_up_scanner_test.py      |  51 +++
 .../pypaimon/tests/follow_up_scanner_test.py       |  59 ++++
 .../tests/scanner/incremental_diff_scanner_test.py | 390 +++++++++++++++++++++
 10 files changed, 720 insertions(+), 17 deletions(-)

diff --git a/paimon-python/pypaimon/manifest/manifest_list_manager.py 
b/paimon-python/pypaimon/manifest/manifest_list_manager.py
index bc14fd86e4..3790dfce04 100644
--- a/paimon-python/pypaimon/manifest/manifest_list_manager.py
+++ b/paimon-python/pypaimon/manifest/manifest_list_manager.py
@@ -40,6 +40,7 @@ class ManifestListManager:
         self.file_io = self.table.file_io
 
     def read_all(self, snapshot: Optional[Snapshot]) -> List[ManifestFileMeta]:
+        """Read base + delta manifest lists for full file state."""
         if snapshot is None:
             return []
         manifest_files = []
diff --git a/paimon-python/pypaimon/read/scanner/changelog_follow_up_scanner.py 
b/paimon-python/pypaimon/read/scanner/changelog_follow_up_scanner.py
new file mode 100644
index 0000000000..6ae3b4068a
--- /dev/null
+++ b/paimon-python/pypaimon/read/scanner/changelog_follow_up_scanner.py
@@ -0,0 +1,29 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+"""ChangelogFollowUpScanner for tables with changelog-producer settings."""
+
+from pypaimon.read.scanner.follow_up_scanner import FollowUpScanner
+from pypaimon.snapshot.snapshot import Snapshot
+
+
+class ChangelogFollowUpScanner(FollowUpScanner):
+    """Scans any snapshot whose changelog_manifest_list is neither None nor empty."""
+
+    def should_scan(self, snapshot: Snapshot) -> bool:
+        changelog_list = snapshot.changelog_manifest_list  # commit_kind is not consulted
+        return changelog_list is not None and changelog_list != ""
diff --git a/paimon-python/pypaimon/read/scanner/delta_follow_up_scanner.py 
b/paimon-python/pypaimon/read/scanner/delta_follow_up_scanner.py
new file mode 100644
index 0000000000..bb9b6a92bc
--- /dev/null
+++ b/paimon-python/pypaimon/read/scanner/delta_follow_up_scanner.py
@@ -0,0 +1,28 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+"""DeltaFollowUpScanner for append-only streaming reads."""
+
+from pypaimon.read.scanner.follow_up_scanner import FollowUpScanner
+from pypaimon.snapshot.snapshot import Snapshot
+
+
+class DeltaFollowUpScanner(FollowUpScanner):
+    """Scans only commit_kind == "APPEND" snapshots; skips compaction and all other kinds."""
+
+    def should_scan(self, snapshot: Snapshot) -> bool:
+        return snapshot.commit_kind == "APPEND"
diff --git a/paimon-python/pypaimon/read/scanner/file_scanner.py 
b/paimon-python/pypaimon/read/scanner/file_scanner.py
index e56c568b95..d5e543b2fc 100755
--- a/paimon-python/pypaimon/read/scanner/file_scanner.py
+++ b/paimon-python/pypaimon/read/scanner/file_scanner.py
@@ -15,38 +15,39 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
 """
+import logging
 import os
 import time
-import logging
-from typing import List, Optional, Dict, Set, Callable
+from typing import Callable, Dict, List, Optional, Set
 
 logger = logging.getLogger(__name__)
 
 from pypaimon.common.predicate import Predicate
 from pypaimon.globalindex import ScoredGlobalIndexResult
-from pypaimon.table.source.deletion_file import DeletionFile
 from pypaimon.manifest.index_manifest_file import IndexManifestFile
 from pypaimon.manifest.manifest_file_manager import ManifestFileManager
 from pypaimon.manifest.manifest_list_manager import ManifestListManager
 from pypaimon.manifest.schema.manifest_entry import ManifestEntry
 from pypaimon.manifest.schema.manifest_file_meta import ManifestFileMeta
+from pypaimon.manifest.simple_stats_evolutions import SimpleStatsEvolutions
 from pypaimon.read.plan import Plan
-from pypaimon.read.push_down_utils import (
-    remove_row_id_filter,
-    trim_and_transform_predicate,
-)
-from pypaimon.read.scanner.append_table_split_generator import 
AppendTableSplitGenerator
-from pypaimon.read.scanner.data_evolution_split_generator import 
DataEvolutionSplitGenerator
-from pypaimon.read.scanner.primary_key_table_split_generator import 
PrimaryKeyTableSplitGenerator
+from pypaimon.read.push_down_utils import (remove_row_id_filter,
+                                           trim_and_transform_predicate)
+from pypaimon.read.scanner.append_table_split_generator import \
+    AppendTableSplitGenerator
+from pypaimon.read.scanner.data_evolution_split_generator import \
+    DataEvolutionSplitGenerator
+from pypaimon.read.scanner.primary_key_table_split_generator import \
+    PrimaryKeyTableSplitGenerator
 from pypaimon.read.split import DataSplit
 from pypaimon.snapshot.snapshot_manager import SnapshotManager
 from pypaimon.table.bucket_mode import BucketMode
-from pypaimon.manifest.simple_stats_evolutions import SimpleStatsEvolutions
+from pypaimon.table.source.deletion_file import DeletionFile
 
 
 def _row_ranges_from_predicate(predicate: Optional[Predicate]) -> 
Optional[List]:
-    from pypaimon.utils.range import Range
     from pypaimon.table.special_fields import SpecialFields
+    from pypaimon.utils.range import Range
 
     if predicate is None:
         return None
@@ -167,7 +168,10 @@ class FileScanner:
         manifest_scanner: Callable[[], List[ManifestFileMeta]],
         predicate: Optional[Predicate] = None,
         limit: Optional[int] = None,
-        vector_search: Optional['VectorSearch'] = None
+        vector_search: Optional['VectorSearch'] = None,
+        shard_index: Optional[int] = None,
+        shard_count: Optional[int] = None,
+        bucket_filter: Optional[Callable[[int], bool]] = None
     ):
         from pypaimon.table.file_store_table import FileStoreTable
 
@@ -178,6 +182,11 @@ class FileScanner:
         self.limit = limit
         self.vector_search = vector_search
 
+        # Bucket-level sharding for parallel consumption
+        self._shard_index = shard_index
+        self._shard_count = shard_count
+        self._bucket_filter = bucket_filter
+
         self.snapshot_manager = SnapshotManager(table)
         self.manifest_list_manager = ManifestListManager(table)
         self.manifest_file_manager = ManifestFileManager(table)
@@ -299,9 +308,8 @@ class FileScanner:
 
     def _eval_global_index(self):
         from pypaimon.globalindex.global_index_result import GlobalIndexResult
-        from pypaimon.globalindex.global_index_scan_builder import (
+        from pypaimon.globalindex.global_index_scan_builder import \
             GlobalIndexScanBuilder
-        )
         from pypaimon.utils.range import Range
 
         # No filter and no vector search - nothing to evaluate
@@ -361,7 +369,7 @@ class FileScanner:
         return result
 
     def read_manifest_entries(self, manifest_files: List[ManifestFileMeta]) -> 
List[ManifestEntry]:
-        max_workers = max(8, 
self.table.options.scan_manifest_parallelism(os.cpu_count() or 8))
+        max_workers = 
self.table.options.scan_manifest_parallelism(os.cpu_count() or 8)
         manifest_files = [entry for entry in manifest_files if 
self._filter_manifest_file(entry)]
         return self.manifest_file_manager.read_entries_parallel(
             manifest_files,
@@ -414,6 +422,13 @@ class FileScanner:
             return False
         if self.partition_key_predicate and not 
self.partition_key_predicate.test(entry.partition):
             return False
+        # Apply bucket-level sharding for parallel consumption
+        if self._shard_index is not None and self._shard_count is not None:
+            if entry.bucket % self._shard_count != self._shard_index:
+                return False
+        elif self._bucket_filter is not None:
+            if not self._bucket_filter(entry.bucket):
+                return False
         # Get SimpleStatsEvolution for this schema
         evolution = 
self.simple_stats_evolutions.get_or_create(entry.file.schema_id)
 
diff --git a/paimon-python/pypaimon/read/scanner/follow_up_scanner.py 
b/paimon-python/pypaimon/read/scanner/follow_up_scanner.py
new file mode 100644
index 0000000000..cace582298
--- /dev/null
+++ b/paimon-python/pypaimon/read/scanner/follow_up_scanner.py
@@ -0,0 +1,30 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+"""FollowUpScanner interface for streaming table scans."""
+
+from abc import ABC, abstractmethod
+
+from pypaimon.snapshot.snapshot import Snapshot
+
+
+class FollowUpScanner(ABC):
+    """Abstract base deciding which snapshots to scan after the initial streaming scan."""
+
+    @abstractmethod
+    def should_scan(self, snapshot: Snapshot) -> bool:
+        ...  # subclasses return True to scan the given snapshot, False to skip it
diff --git a/paimon-python/pypaimon/read/scanner/incremental_diff_scanner.py 
b/paimon-python/pypaimon/read/scanner/incremental_diff_scanner.py
new file mode 100644
index 0000000000..61ec66e035
--- /dev/null
+++ b/paimon-python/pypaimon/read/scanner/incremental_diff_scanner.py
@@ -0,0 +1,100 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import os
+from typing import List, Set, Tuple
+
+from pypaimon.manifest.manifest_file_manager import ManifestFileManager
+from pypaimon.manifest.manifest_list_manager import ManifestListManager
+from pypaimon.manifest.schema.manifest_entry import ManifestEntry
+from pypaimon.read.plan import Plan
+from pypaimon.read.scanner.append_table_split_generator import \
+    AppendTableSplitGenerator
+from pypaimon.snapshot.snapshot import Snapshot
+
+
+class IncrementalDiffScanner:
+    """
+    Scan files added between two snapshots via set-diff on (partition, bucket, file name).
+
+    More efficient than reading N delta_manifest_lists when many intermediate
+    snapshots are compaction-only.
+    """
+
+    def __init__(self, table):
+        from pypaimon.table.file_store_table import FileStoreTable  # used only for the annotation below
+
+        self.table: FileStoreTable = table
+        self.manifest_list_manager = ManifestListManager(table)
+        self.manifest_file_manager = ManifestFileManager(table)
+
+        options = self.table.options
+        self.target_split_size = options.source_split_target_size()
+        self.open_file_cost = options.source_split_open_file_cost()
+
+    def scan(self, start_snapshot: Snapshot, end_snapshot: Snapshot) -> Plan:
+        """Scan files added between start (exclusive) and end (inclusive) 
snapshots."""
+        added_entries = self.compute_diff(start_snapshot, end_snapshot)
+
+        if not added_entries:  # nothing new: return an empty Plan without building splits
+            return Plan([])
+
+        split_generator = AppendTableSplitGenerator(  # NOTE(review): no primary-key branch - confirm intended
+            self.table,
+            self.target_split_size,
+            self.open_file_cost,
+            {}  # No deletion files for incremental diff
+        )
+
+        splits = split_generator.create_splits(added_entries)
+        return Plan(splits)
+
+    def compute_diff(
+        self,
+        start_snapshot: Snapshot,
+        end_snapshot: Snapshot
+    ) -> List[ManifestEntry]:
+        """Return files present in end_snapshot but absent from 
start_snapshot."""
+        start_manifest_files = 
self.manifest_list_manager.read_all(start_snapshot)
+        end_manifest_files = self.manifest_list_manager.read_all(end_snapshot)
+
+        max_workers = 
self.table.options.scan_manifest_parallelism(os.cpu_count() or 8)
+
+        start_entries = self.manifest_file_manager.read_entries_parallel(
+            start_manifest_files,
+            max_workers=max_workers
+        )
+        end_entries = self.manifest_file_manager.read_entries_parallel(
+            end_manifest_files,
+            max_workers=max_workers
+        )
+
+        start_keys: Set[Tuple] = {self._entry_key(e) for e in start_entries}  # set gives O(1) lookups below
+
+        added_entries = [
+            entry for entry in end_entries  # keeps the order of end_entries
+            if self._entry_key(entry) not in start_keys
+        ]
+
+        return added_entries
+
+    def _entry_key(self, entry: ManifestEntry) -> Tuple:
+        return (  # hashable identity of a data file, used for the set-diff in compute_diff
+            tuple(entry.partition.values),
+            entry.bucket,
+            entry.file.file_name
+        )
diff --git a/paimon-python/pypaimon/read/table_read.py 
b/paimon-python/pypaimon/read/table_read.py
index 5206147f80..28e5a9f5ba 100644
--- a/paimon-python/pypaimon/read/table_read.py
+++ b/paimon-python/pypaimon/read/table_read.py
@@ -198,7 +198,7 @@ class TableRead:
                 You needn't manually set this in most cases.
             **read_args: Additional kwargs passed to the datasource.
                 For example, ``per_task_row_limit`` (Ray 2.52.0+).
-        
+
         See `Ray Data API 
<https://docs.ray.io/en/latest/data/api/doc/ray.data.read_datasource.html>`_
         for details.
         """
diff --git a/paimon-python/pypaimon/tests/changelog_follow_up_scanner_test.py 
b/paimon-python/pypaimon/tests/changelog_follow_up_scanner_test.py
new file mode 100644
index 0000000000..2c959276c3
--- /dev/null
+++ b/paimon-python/pypaimon/tests/changelog_follow_up_scanner_test.py
@@ -0,0 +1,51 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+"""Tests for ChangelogFollowUpScanner."""
+
+import unittest
+from unittest.mock import Mock
+
+from pypaimon.read.scanner.changelog_follow_up_scanner import \
+    ChangelogFollowUpScanner
+
+
+class ChangelogFollowUpScannerTest(unittest.TestCase):
+    """Unit tests for ChangelogFollowUpScanner.should_scan using Mock snapshots."""
+
+    def test_should_scan_any_commit_with_changelog(self):
+        """Scanner scans based on changelog_manifest_list, regardless of 
commit kind."""
+        scanner = ChangelogFollowUpScanner()
+        for kind in ("APPEND", "COMPACT"):  # kind only varies the mock's manifest-list name
+            snapshot = 
Mock(changelog_manifest_list=f"changelog-manifest-{kind}")
+            self.assertTrue(scanner.should_scan(snapshot), kind)  # kind doubles as the failure message
+
+    def test_should_skip_when_no_changelog(self):
+        """Scanner should skip when changelog_manifest_list is None."""
+        scanner = ChangelogFollowUpScanner()
+        snapshot = Mock(changelog_manifest_list=None)
+        self.assertFalse(scanner.should_scan(snapshot))
+
+    def test_should_skip_for_empty_string(self):
+        """Empty string should be treated as no changelog."""
+        scanner = ChangelogFollowUpScanner()
+        snapshot = Mock(changelog_manifest_list="")
+        self.assertFalse(scanner.should_scan(snapshot))
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/paimon-python/pypaimon/tests/follow_up_scanner_test.py 
b/paimon-python/pypaimon/tests/follow_up_scanner_test.py
new file mode 100644
index 0000000000..e7b6bcd4dd
--- /dev/null
+++ b/paimon-python/pypaimon/tests/follow_up_scanner_test.py
@@ -0,0 +1,59 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+"""Tests for FollowUpScanner implementations."""
+
+import unittest
+from unittest.mock import Mock
+
+from pypaimon.read.scanner.delta_follow_up_scanner import DeltaFollowUpScanner
+from pypaimon.read.scanner.follow_up_scanner import FollowUpScanner
+
+
+class FollowUpScannerInterfaceTest(unittest.TestCase):
+    """Verify that FollowUpScanner is abstract and cannot be instantiated directly."""
+
+    def test_follow_up_scanner_is_abstract(self):
+        """Instantiating the abstract FollowUpScanner should raise TypeError."""
+        with self.assertRaises(TypeError):
+            FollowUpScanner()
+
+
+class DeltaFollowUpScannerTest(unittest.TestCase):
+    """Tests for DeltaFollowUpScanner which handles APPEND commits only."""
+
+    def setUp(self):
+        self.scanner = DeltaFollowUpScanner()  # fresh scanner for every test method
+
+    def test_should_scan_returns_true_for_append_commit(self):
+        """DeltaFollowUpScanner should scan APPEND commits."""
+        snapshot = Mock()
+        snapshot.commit_kind = "APPEND"
+
+        result = self.scanner.should_scan(snapshot)
+
+        self.assertTrue(result)
+
+    def test_should_scan_returns_false_for_non_append_commits(self):
+        """DeltaFollowUpScanner should skip non-APPEND commits."""
+        for kind in ("COMPACT", "OVERWRITE", "EXPIRE", "ANALYZE"):
+            snapshot = Mock(commit_kind=kind)
+            self.assertFalse(self.scanner.should_scan(snapshot), kind)  # kind doubles as the failure message
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git 
a/paimon-python/pypaimon/tests/scanner/incremental_diff_scanner_test.py 
b/paimon-python/pypaimon/tests/scanner/incremental_diff_scanner_test.py
new file mode 100644
index 0000000000..d5feba2b7d
--- /dev/null
+++ b/paimon-python/pypaimon/tests/scanner/incremental_diff_scanner_test.py
@@ -0,0 +1,390 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+"""Tests for IncrementalDiffScanner."""
+
+import unittest
+from unittest.mock import Mock, patch
+
+
+def _create_mock_table():  # mock table exposing only the attributes and options the scanner tests set up
+    table = Mock()
+    table.is_primary_key_table = False
+    table.options = Mock()
+    table.options.source_split_target_size.return_value = 128 * 1024 * 1024  # 128 MiB
+    table.options.source_split_open_file_cost.return_value = 4 * 1024 * 1024  # 4 MiB
+    table.options.scan_manifest_parallelism.return_value = 8
+    table.partition_keys = []
+    return table
+
+
+def _create_mock_entry(partition_values, bucket, filename):  # mock entry with the fields _entry_key reads
+    entry = Mock()
+    entry.partition = Mock()
+    entry.partition.values = partition_values
+    entry.bucket = bucket
+    entry.total_buckets = 1
+    entry.file = Mock()
+    entry.file.file_name = filename
+    entry.file.file_size = 1024
+    entry.file.row_count = 100
+    entry.kind = 0  # ADD
+    return entry
+
+
+def _create_mock_snapshot(snapshot_id, commit_kind="APPEND",
+                          base_manifest="base", delta_manifest="delta"):  # names get "-<snapshot_id>" appended
+    snapshot = Mock()
+    snapshot.id = snapshot_id
+    snapshot.commit_kind = commit_kind
+    snapshot.base_manifest_list = f"{base_manifest}-{snapshot_id}"
+    snapshot.delta_manifest_list = f"{delta_manifest}-{snapshot_id}"
+    return snapshot
+
+
+class IncrementalDiffScannerTest(unittest.TestCase):
+
+    
@patch('pypaimon.read.scanner.incremental_diff_scanner.ManifestListManager')
+    
@patch('pypaimon.read.scanner.incremental_diff_scanner.ManifestFileManager')
+    def test_diff_returns_only_added_files(self, MockManifestFileManager, 
MockManifestListManager):
+        """Files in end but not in start should be returned."""
+        from pypaimon.read.scanner.incremental_diff_scanner import \
+            IncrementalDiffScanner
+
+        table = _create_mock_table()
+
+        start_entries = [
+            _create_mock_entry([], 0, "file1.parquet"),
+            _create_mock_entry([], 0, "file2.parquet"),
+        ]
+        end_entries = [
+            _create_mock_entry([], 0, "file1.parquet"),
+            _create_mock_entry([], 0, "file2.parquet"),
+            _create_mock_entry([], 0, "file3.parquet"),
+            _create_mock_entry([], 0, "file4.parquet"),
+        ]
+
+        mock_mlm = MockManifestListManager.return_value
+        mock_mfm = MockManifestFileManager.return_value
+
+        mock_mlm.read_base.side_effect = lambda s: 
[Mock(file_name=f"manifest-{s.id}")]
+        mock_mfm.read_entries_parallel.side_effect = [start_entries, 
end_entries]
+
+        scanner = IncrementalDiffScanner(table)
+        start_snapshot = _create_mock_snapshot(1)
+        end_snapshot = _create_mock_snapshot(5)
+
+        added_entries = scanner.compute_diff(start_snapshot, end_snapshot)
+
+        added_filenames = {e.file.file_name for e in added_entries}
+        self.assertEqual(added_filenames, {"file3.parquet", "file4.parquet"})
+
+    
@patch('pypaimon.read.scanner.incremental_diff_scanner.ManifestListManager')
+    
@patch('pypaimon.read.scanner.incremental_diff_scanner.ManifestFileManager')
+    def test_diff_handles_empty_start(self, MockManifestFileManager, 
MockManifestListManager):
+        """When start is empty, all end files should be returned."""
+        from pypaimon.read.scanner.incremental_diff_scanner import \
+            IncrementalDiffScanner
+
+        table = _create_mock_table()
+
+        start_entries = []
+        end_entries = [
+            _create_mock_entry([], 0, "file1.parquet"),
+            _create_mock_entry([], 0, "file2.parquet"),
+        ]
+
+        mock_mlm = MockManifestListManager.return_value
+        mock_mfm = MockManifestFileManager.return_value
+        mock_mlm.read_base.side_effect = lambda s: 
[Mock(file_name=f"manifest-{s.id}")]
+        mock_mfm.read_entries_parallel.side_effect = [start_entries, 
end_entries]
+
+        scanner = IncrementalDiffScanner(table)
+        start_snapshot = _create_mock_snapshot(0)
+        end_snapshot = _create_mock_snapshot(5)
+
+        added_entries = scanner.compute_diff(start_snapshot, end_snapshot)
+
+        added_filenames = {e.file.file_name for e in added_entries}
+        self.assertEqual(added_filenames, {"file1.parquet", "file2.parquet"})
+
+    
@patch('pypaimon.read.scanner.incremental_diff_scanner.ManifestListManager')
+    
@patch('pypaimon.read.scanner.incremental_diff_scanner.ManifestFileManager')
+    def test_diff_handles_same_snapshots(self, MockManifestFileManager, 
MockManifestListManager):
+        """When start == end (same files), diff should return empty."""
+        from pypaimon.read.scanner.incremental_diff_scanner import \
+            IncrementalDiffScanner
+
+        table = _create_mock_table()
+
+        entries = [
+            _create_mock_entry([], 0, "file1.parquet"),
+            _create_mock_entry([], 0, "file2.parquet"),
+        ]
+
+        mock_mlm = MockManifestListManager.return_value
+        mock_mfm = MockManifestFileManager.return_value
+        mock_mlm.read_base.side_effect = lambda s: 
[Mock(file_name=f"manifest-{s.id}")]
+        mock_mfm.read_entries_parallel.side_effect = [entries, entries]
+
+        scanner = IncrementalDiffScanner(table)
+        snapshot = _create_mock_snapshot(5)
+
+        added_entries = scanner.compute_diff(snapshot, snapshot)
+
+        self.assertEqual(len(added_entries), 0)
+
+    
@patch('pypaimon.read.scanner.incremental_diff_scanner.ManifestListManager')
+    
@patch('pypaimon.read.scanner.incremental_diff_scanner.ManifestFileManager')
+    def test_diff_groups_by_partition_bucket(self, MockManifestFileManager, 
MockManifestListManager):
+        """Diff should be computed per (partition, bucket)."""
+        from pypaimon.read.scanner.incremental_diff_scanner import \
+            IncrementalDiffScanner
+
+        table = _create_mock_table()
+
+        start_entries = [
+            _create_mock_entry([1], 0, "file1.parquet"),  # p1, bucket 0
+            _create_mock_entry([1], 1, "file3.parquet"),  # p1, bucket 1
+        ]
+        end_entries = [
+            _create_mock_entry([1], 0, "file1.parquet"),  # p1, bucket 0
+            _create_mock_entry([1], 0, "file2.parquet"),  # p1, bucket 0 - NEW
+            _create_mock_entry([1], 1, "file3.parquet"),  # p1, bucket 1
+        ]
+
+        mock_mlm = MockManifestListManager.return_value
+        mock_mfm = MockManifestFileManager.return_value
+        mock_mlm.read_base.side_effect = lambda s: 
[Mock(file_name=f"manifest-{s.id}")]
+        mock_mfm.read_entries_parallel.side_effect = [start_entries, 
end_entries]
+
+        scanner = IncrementalDiffScanner(table)
+        start_snapshot = _create_mock_snapshot(1)
+        end_snapshot = _create_mock_snapshot(5)
+
+        added_entries = scanner.compute_diff(start_snapshot, end_snapshot)
+
+        self.assertEqual(len(added_entries), 1)
+        self.assertEqual(added_entries[0].file.file_name, "file2.parquet")
+
+    
@patch('pypaimon.read.scanner.incremental_diff_scanner.ManifestListManager')
+    
@patch('pypaimon.read.scanner.incremental_diff_scanner.ManifestFileManager')
+    def test_diff_handles_file_deletions(self, MockManifestFileManager, 
MockManifestListManager):
+        """Files in start but not in end (deleted) should NOT be returned."""
+        from pypaimon.read.scanner.incremental_diff_scanner import \
+            IncrementalDiffScanner
+
+        table = _create_mock_table()
+
+        start_entries = [
+            _create_mock_entry([], 0, "file1.parquet"),
+            _create_mock_entry([], 0, "file2.parquet"),
+            _create_mock_entry([], 0, "file3.parquet"),
+        ]
+        end_entries = [
+            _create_mock_entry([], 0, "file1.parquet"),
+            _create_mock_entry([], 0, "file2.parquet"),
+        ]
+
+        mock_mlm = MockManifestListManager.return_value
+        mock_mfm = MockManifestFileManager.return_value
+        mock_mlm.read_base.side_effect = lambda s: 
[Mock(file_name=f"manifest-{s.id}")]
+        mock_mfm.read_entries_parallel.side_effect = [start_entries, 
end_entries]
+
+        scanner = IncrementalDiffScanner(table)
+        start_snapshot = _create_mock_snapshot(1)
+        end_snapshot = _create_mock_snapshot(5)
+
+        added_entries = scanner.compute_diff(start_snapshot, end_snapshot)
+
+        self.assertEqual(len(added_entries), 0)
+
+    @patch(
+        'pypaimon.read.scanner.incremental_diff_scanner.ManifestListManager')
+    @patch(
+        'pypaimon.read.scanner.incremental_diff_scanner.ManifestFileManager')
+    def test_diff_creates_correct_splits(
+            self, MockManifestFileManager, MockManifestListManager):
+        """``scan`` must return a ``Plan`` that carries at least one split.
+
+        NOTE(review): only the type of the plan and the non-emptiness of
+        ``plan.splits()`` are asserted; the content of the splits is not
+        inspected here.
+        """
+        from pypaimon.read.plan import Plan
+        from pypaimon.read.scanner.incremental_diff_scanner import \
+            IncrementalDiffScanner
+
+        table = _create_mock_table()
+
+        list_manager = MockManifestListManager.return_value
+        list_manager.read_base.side_effect = (
+            lambda s: [Mock(file_name=f"manifest-{s.id}")])
+
+        # First read yields no entries, the second a single data file.
+        file_manager = MockManifestFileManager.return_value
+        file_manager.read_entries_parallel.side_effect = [
+            [],
+            [_create_mock_entry([], 0, "file1.parquet")],
+        ]
+
+        plan = IncrementalDiffScanner(table).scan(
+            _create_mock_snapshot(0), _create_mock_snapshot(5))
+
+        self.assertIsInstance(plan, Plan)
+        self.assertGreater(len(plan.splits()), 0)
+
+    @patch(
+        'pypaimon.read.scanner.incremental_diff_scanner.ManifestListManager')
+    @patch(
+        'pypaimon.read.scanner.incremental_diff_scanner.ManifestFileManager')
+    def test_entry_key_uniqueness(
+            self, MockManifestFileManager, MockManifestListManager):
+        """``_entry_key`` must tell partitions and buckets apart.
+
+        All entries share the file name ``file.parquet``; only the
+        partition and the bucket are varied.
+        """
+        from pypaimon.read.scanner.incremental_diff_scanner import \
+            IncrementalDiffScanner
+
+        table = _create_mock_table()
+        scanner = IncrementalDiffScanner(table)
+
+        def key_for(partition, bucket):
+            # Build a fresh mock entry every time so that no two keys are
+            # derived from the same entry object.
+            entry = _create_mock_entry(partition, bucket, "file.parquet")
+            return scanner._entry_key(entry)
+
+        # Different partition, same bucket -> keys differ.
+        self.assertNotEqual(key_for([1], 0), key_for([2], 0))
+
+        # Same partition, different bucket -> keys differ.
+        self.assertNotEqual(key_for([1], 0), key_for([1], 1))
+
+        # Same partition, bucket and file name -> keys match.
+        self.assertEqual(key_for([1], 0), key_for([1], 0))
+
+
+class IncrementalDiffIntegrationTest(unittest.TestCase):
+    """Scenario tests for ``IncrementalDiffScanner.compute_diff``.
+
+    Both manifest managers are patched. Each scenario supplies the entry
+    list that should be visible for a given snapshot id and checks which
+    file names ``compute_diff`` reports between snapshot 1 and snapshot 5.
+    """
+
+    @staticmethod
+    def _stub_manifest_reads(mock_mlm, mock_mfm, entries_by_snapshot_id):
+        """Wire the patched managers to serve fixed entries per snapshot.
+
+        Args:
+            mock_mlm: patched ``ManifestListManager`` instance; its
+                ``read_all`` is stubbed.
+            mock_mfm: patched ``ManifestFileManager`` instance; its
+                ``read_entries_parallel`` is stubbed.
+            entries_by_snapshot_id: mapping of snapshot id to the entry
+                list to serve for that snapshot.
+
+        ``read_all`` returns one mock manifest named
+        ``all-manifest-<id>`` for every known snapshot id and an empty
+        list otherwise. ``read_entries_parallel`` looks only at the first
+        manifest's ``file_name`` to pick the entry list and returns an
+        empty list for anything it does not recognise.
+        """
+        entries_by_manifest = {
+            f"all-manifest-{snapshot_id}": entries
+            for snapshot_id, entries in entries_by_snapshot_id.items()
+        }
+
+        def fake_read_all(snapshot):
+            if snapshot.id in entries_by_snapshot_id:
+                return [Mock(file_name=f"all-manifest-{snapshot.id}")]
+            return []
+
+        def fake_read_entries(manifests, *args, **kwargs):
+            if not manifests:
+                return []
+            return entries_by_manifest.get(manifests[0].file_name, [])
+
+        mock_mlm.read_all.side_effect = fake_read_all
+        mock_mfm.read_entries_parallel.side_effect = fake_read_entries
+
+    @patch(
+        'pypaimon.read.scanner.incremental_diff_scanner.ManifestListManager')
+    @patch(
+        'pypaimon.read.scanner.incremental_diff_scanner.ManifestFileManager')
+    def test_diff_equals_delta_results(
+            self, MockManifestFileManager, MockManifestListManager):
+        """Diff output must equal the union of the per-snapshot deltas.
+
+        The "delta" side is a hand-written list of the files each
+        intermediate snapshot adds; it is not passed to the scanner and
+        serves only as the expected value.
+        """
+        from pypaimon.read.scanner.incremental_diff_scanner import \
+            IncrementalDiffScanner
+
+        table = _create_mock_table()
+
+        base_entries_1 = [
+            _create_mock_entry([], 0, "file1.parquet"),
+            _create_mock_entry([], 0, "file2.parquet"),
+        ]
+
+        base_entries_5 = [
+            _create_mock_entry([], 0, "file1.parquet"),
+            _create_mock_entry([], 0, "file2.parquet"),
+            _create_mock_entry([], 0, "file3.parquet"),
+            _create_mock_entry([], 0, "file4.parquet"),
+            _create_mock_entry([], 0, "file5.parquet"),
+        ]
+
+        # Expected additions per intermediate snapshot (never fed to the
+        # scanner; used only to build the expected file-name set below).
+        delta_2 = [_create_mock_entry([], 0, "file3.parquet")]
+        delta_3 = []  # COMPACT - no new files
+        delta_4 = [_create_mock_entry([], 0, "file4.parquet")]
+        delta_5 = [_create_mock_entry([], 0, "file5.parquet")]
+
+        self._stub_manifest_reads(
+            MockManifestListManager.return_value,
+            MockManifestFileManager.return_value,
+            {1: base_entries_1, 5: base_entries_5},
+        )
+
+        scanner = IncrementalDiffScanner(table)
+        start_snapshot = _create_mock_snapshot(1)
+        end_snapshot = _create_mock_snapshot(5)
+
+        diff_entries = scanner.compute_diff(start_snapshot, end_snapshot)
+        diff_filenames = {e.file.file_name for e in diff_entries}
+
+        delta_filenames = {
+            e.file.file_name
+            for entries in [delta_2, delta_3, delta_4, delta_5]
+            for e in entries
+        }
+
+        self.assertEqual(diff_filenames, delta_filenames)
+        self.assertEqual(
+            diff_filenames,
+            {"file3.parquet", "file4.parquet", "file5.parquet"})
+
+    @patch(
+        'pypaimon.read.scanner.incremental_diff_scanner.ManifestListManager')
+    @patch(
+        'pypaimon.read.scanner.incremental_diff_scanner.ManifestFileManager')
+    def test_diff_handles_compaction_correctly(
+            self, MockManifestFileManager, MockManifestListManager):
+        """Compaction of file1+file2 into file3 must report only file3."""
+        from pypaimon.read.scanner.incremental_diff_scanner import \
+            IncrementalDiffScanner
+
+        table = _create_mock_table()
+
+        base_entries_1 = [
+            _create_mock_entry([], 0, "file1.parquet"),
+            _create_mock_entry([], 0, "file2.parquet"),
+        ]
+
+        base_entries_5 = [
+            _create_mock_entry([], 0, "file3.parquet"),  # Compacted result
+        ]
+
+        self._stub_manifest_reads(
+            MockManifestListManager.return_value,
+            MockManifestFileManager.return_value,
+            {1: base_entries_1, 5: base_entries_5},
+        )
+
+        scanner = IncrementalDiffScanner(table)
+        start_snapshot = _create_mock_snapshot(1)
+        end_snapshot = _create_mock_snapshot(5)
+
+        diff_entries = scanner.compute_diff(start_snapshot, end_snapshot)
+        diff_filenames = {e.file.file_name for e in diff_entries}
+
+        self.assertEqual(diff_filenames, {"file3.parquet"})
+
+
+# Run the tests in this module when the file is executed directly.
+if __name__ == '__main__':
+    unittest.main()


Reply via email to