This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 4a884fc31f [python] Add FollowUpScanner, IncrementalDiffScanner,
sharding (#7348)
4a884fc31f is described below
commit 4a884fc31fda314efa50cfe96f2184f279e76564
Author: Toby Cole <[email protected]>
AuthorDate: Wed Mar 11 06:17:05 2026 +0000
[python] Add FollowUpScanner, IncrementalDiffScanner, sharding (#7348)
- Add `FollowUpScanner` hierarchy (base, delta, changelog) for streaming
scan planning
- Add `IncrementalDiffScanner` for diff-based streaming reads
- Add sharding support to `FileScanner`
---
.../pypaimon/manifest/manifest_list_manager.py | 1 +
.../read/scanner/changelog_follow_up_scanner.py | 29 ++
.../read/scanner/delta_follow_up_scanner.py | 28 ++
.../pypaimon/read/scanner/file_scanner.py | 47 ++-
.../pypaimon/read/scanner/follow_up_scanner.py | 30 ++
.../read/scanner/incremental_diff_scanner.py | 100 ++++++
paimon-python/pypaimon/read/table_read.py | 2 +-
.../tests/changelog_follow_up_scanner_test.py | 51 +++
.../pypaimon/tests/follow_up_scanner_test.py | 59 ++++
.../tests/scanner/incremental_diff_scanner_test.py | 390 +++++++++++++++++++++
10 files changed, 720 insertions(+), 17 deletions(-)
diff --git a/paimon-python/pypaimon/manifest/manifest_list_manager.py
b/paimon-python/pypaimon/manifest/manifest_list_manager.py
index bc14fd86e4..3790dfce04 100644
--- a/paimon-python/pypaimon/manifest/manifest_list_manager.py
+++ b/paimon-python/pypaimon/manifest/manifest_list_manager.py
@@ -40,6 +40,7 @@ class ManifestListManager:
self.file_io = self.table.file_io
def read_all(self, snapshot: Optional[Snapshot]) -> List[ManifestFileMeta]:
+ """Read base + delta manifest lists for full file state."""
if snapshot is None:
return []
manifest_files = []
diff --git a/paimon-python/pypaimon/read/scanner/changelog_follow_up_scanner.py
b/paimon-python/pypaimon/read/scanner/changelog_follow_up_scanner.py
new file mode 100644
index 0000000000..6ae3b4068a
--- /dev/null
+++ b/paimon-python/pypaimon/read/scanner/changelog_follow_up_scanner.py
@@ -0,0 +1,29 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+"""ChangelogFollowUpScanner for tables with changelog-producer settings."""
+
+from pypaimon.read.scanner.follow_up_scanner import FollowUpScanner
+from pypaimon.snapshot.snapshot import Snapshot
+
+
+class ChangelogFollowUpScanner(FollowUpScanner):
+ """Selects every snapshot whose changelog_manifest_list is set and non-empty."""
+
+ def should_scan(self, snapshot: Snapshot) -> bool:
+ changelog_list = snapshot.changelog_manifest_list
+ return changelog_list is not None and changelog_list != ""  # None and "" both mean "skip"
diff --git a/paimon-python/pypaimon/read/scanner/delta_follow_up_scanner.py
b/paimon-python/pypaimon/read/scanner/delta_follow_up_scanner.py
new file mode 100644
index 0000000000..bb9b6a92bc
--- /dev/null
+++ b/paimon-python/pypaimon/read/scanner/delta_follow_up_scanner.py
@@ -0,0 +1,28 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+"""DeltaFollowUpScanner for append-only streaming reads."""
+
+from pypaimon.read.scanner.follow_up_scanner import FollowUpScanner
+from pypaimon.snapshot.snapshot import Snapshot
+
+
+class DeltaFollowUpScanner(FollowUpScanner):
+ """Scans only APPEND commits; skips compaction and maintenance."""
+
+ def should_scan(self, snapshot: Snapshot) -> bool:
+ return snapshot.commit_kind == "APPEND"  # exact string match; every other kind is skipped
diff --git a/paimon-python/pypaimon/read/scanner/file_scanner.py
b/paimon-python/pypaimon/read/scanner/file_scanner.py
index e56c568b95..d5e543b2fc 100755
--- a/paimon-python/pypaimon/read/scanner/file_scanner.py
+++ b/paimon-python/pypaimon/read/scanner/file_scanner.py
@@ -15,38 +15,39 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
+import logging
import os
import time
-import logging
-from typing import List, Optional, Dict, Set, Callable
+from typing import Callable, Dict, List, Optional, Set
logger = logging.getLogger(__name__)
from pypaimon.common.predicate import Predicate
from pypaimon.globalindex import ScoredGlobalIndexResult
-from pypaimon.table.source.deletion_file import DeletionFile
from pypaimon.manifest.index_manifest_file import IndexManifestFile
from pypaimon.manifest.manifest_file_manager import ManifestFileManager
from pypaimon.manifest.manifest_list_manager import ManifestListManager
from pypaimon.manifest.schema.manifest_entry import ManifestEntry
from pypaimon.manifest.schema.manifest_file_meta import ManifestFileMeta
+from pypaimon.manifest.simple_stats_evolutions import SimpleStatsEvolutions
from pypaimon.read.plan import Plan
-from pypaimon.read.push_down_utils import (
- remove_row_id_filter,
- trim_and_transform_predicate,
-)
-from pypaimon.read.scanner.append_table_split_generator import
AppendTableSplitGenerator
-from pypaimon.read.scanner.data_evolution_split_generator import
DataEvolutionSplitGenerator
-from pypaimon.read.scanner.primary_key_table_split_generator import
PrimaryKeyTableSplitGenerator
+from pypaimon.read.push_down_utils import (remove_row_id_filter,
+ trim_and_transform_predicate)
+from pypaimon.read.scanner.append_table_split_generator import \
+ AppendTableSplitGenerator
+from pypaimon.read.scanner.data_evolution_split_generator import \
+ DataEvolutionSplitGenerator
+from pypaimon.read.scanner.primary_key_table_split_generator import \
+ PrimaryKeyTableSplitGenerator
from pypaimon.read.split import DataSplit
from pypaimon.snapshot.snapshot_manager import SnapshotManager
from pypaimon.table.bucket_mode import BucketMode
-from pypaimon.manifest.simple_stats_evolutions import SimpleStatsEvolutions
+from pypaimon.table.source.deletion_file import DeletionFile
def _row_ranges_from_predicate(predicate: Optional[Predicate]) ->
Optional[List]:
- from pypaimon.utils.range import Range
from pypaimon.table.special_fields import SpecialFields
+ from pypaimon.utils.range import Range
if predicate is None:
return None
@@ -167,7 +168,10 @@ class FileScanner:
manifest_scanner: Callable[[], List[ManifestFileMeta]],
predicate: Optional[Predicate] = None,
limit: Optional[int] = None,
- vector_search: Optional['VectorSearch'] = None
+ vector_search: Optional['VectorSearch'] = None,
+ shard_index: Optional[int] = None,
+ shard_count: Optional[int] = None,
+ bucket_filter: Optional[Callable[[int], bool]] = None
):
from pypaimon.table.file_store_table import FileStoreTable
@@ -178,6 +182,11 @@ class FileScanner:
self.limit = limit
self.vector_search = vector_search
+ # Bucket-level sharding for parallel consumption
+ self._shard_index = shard_index
+ self._shard_count = shard_count
+ self._bucket_filter = bucket_filter
+
self.snapshot_manager = SnapshotManager(table)
self.manifest_list_manager = ManifestListManager(table)
self.manifest_file_manager = ManifestFileManager(table)
@@ -299,9 +308,8 @@ class FileScanner:
def _eval_global_index(self):
from pypaimon.globalindex.global_index_result import GlobalIndexResult
- from pypaimon.globalindex.global_index_scan_builder import (
+ from pypaimon.globalindex.global_index_scan_builder import \
GlobalIndexScanBuilder
- )
from pypaimon.utils.range import Range
# No filter and no vector search - nothing to evaluate
@@ -361,7 +369,7 @@ class FileScanner:
return result
def read_manifest_entries(self, manifest_files: List[ManifestFileMeta]) ->
List[ManifestEntry]:
- max_workers = max(8,
self.table.options.scan_manifest_parallelism(os.cpu_count() or 8))
+ max_workers =
self.table.options.scan_manifest_parallelism(os.cpu_count() or 8)
manifest_files = [entry for entry in manifest_files if
self._filter_manifest_file(entry)]
return self.manifest_file_manager.read_entries_parallel(
manifest_files,
@@ -414,6 +422,13 @@ class FileScanner:
return False
if self.partition_key_predicate and not
self.partition_key_predicate.test(entry.partition):
return False
+ # Apply bucket-level sharding for parallel consumption
+ if self._shard_index is not None and self._shard_count is not None:
+ if entry.bucket % self._shard_count != self._shard_index:
+ return False
+ elif self._bucket_filter is not None:
+ if not self._bucket_filter(entry.bucket):
+ return False
# Get SimpleStatsEvolution for this schema
evolution =
self.simple_stats_evolutions.get_or_create(entry.file.schema_id)
diff --git a/paimon-python/pypaimon/read/scanner/follow_up_scanner.py
b/paimon-python/pypaimon/read/scanner/follow_up_scanner.py
new file mode 100644
index 0000000000..cace582298
--- /dev/null
+++ b/paimon-python/pypaimon/read/scanner/follow_up_scanner.py
@@ -0,0 +1,30 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+"""FollowUpScanner interface for streaming table scans."""
+
+from abc import ABC, abstractmethod
+
+from pypaimon.snapshot.snapshot import Snapshot
+
+
+class FollowUpScanner(ABC):
+ """Determines which snapshots to scan after the initial streaming scan."""
+
+ @abstractmethod
+ def should_scan(self, snapshot: Snapshot) -> bool:
+ ...  # implemented by subclasses: True = scan this snapshot, False = skip it
diff --git a/paimon-python/pypaimon/read/scanner/incremental_diff_scanner.py
b/paimon-python/pypaimon/read/scanner/incremental_diff_scanner.py
new file mode 100644
index 0000000000..61ec66e035
--- /dev/null
+++ b/paimon-python/pypaimon/read/scanner/incremental_diff_scanner.py
@@ -0,0 +1,100 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import os
+from typing import List, Set, Tuple
+
+from pypaimon.manifest.manifest_file_manager import ManifestFileManager
+from pypaimon.manifest.manifest_list_manager import ManifestListManager
+from pypaimon.manifest.schema.manifest_entry import ManifestEntry
+from pypaimon.read.plan import Plan
+from pypaimon.read.scanner.append_table_split_generator import \
+ AppendTableSplitGenerator
+from pypaimon.snapshot.snapshot import Snapshot
+
+
+class IncrementalDiffScanner:
+ """
+ Scan files added between two snapshots via set-diff.
+
+ More efficient than reading N delta_manifest_lists when many intermediate
+ snapshots are compaction-only.
+ """
+
+ def __init__(self, table):
+ from pypaimon.table.file_store_table import FileStoreTable  # used only for the annotation below
+
+ self.table: FileStoreTable = table
+ self.manifest_list_manager = ManifestListManager(table)
+ self.manifest_file_manager = ManifestFileManager(table)
+ # Split-sizing options, read once here and handed to the split generator in scan().
+ options = self.table.options
+ self.target_split_size = options.source_split_target_size()
+ self.open_file_cost = options.source_split_open_file_cost()
+
+ def scan(self, start_snapshot: Snapshot, end_snapshot: Snapshot) -> Plan:
+ """Scan files added between start (exclusive) and end (inclusive)
snapshots."""
+ added_entries = self.compute_diff(start_snapshot, end_snapshot)
+ # Nothing new between the two snapshots: return an empty plan, build no splits.
+ if not added_entries:
+ return Plan([])
+ # NOTE(review): always uses the append-table split generator - confirm for primary-key tables.
+ split_generator = AppendTableSplitGenerator(
+ self.table,
+ self.target_split_size,
+ self.open_file_cost,
+ {} # No deletion files for incremental diff
+ )
+
+ splits = split_generator.create_splits(added_entries)
+ return Plan(splits)
+
+ def compute_diff(
+ self,
+ start_snapshot: Snapshot,
+ end_snapshot: Snapshot
+ ) -> List[ManifestEntry]:
+ """Return files present in end_snapshot but absent from
start_snapshot."""
+ start_manifest_files =
self.manifest_list_manager.read_all(start_snapshot)
+ end_manifest_files = self.manifest_list_manager.read_all(end_snapshot)
+ # The same worker count is used for both manifest reads below.
+ max_workers =
self.table.options.scan_manifest_parallelism(os.cpu_count() or 8)
+
+ start_entries = self.manifest_file_manager.read_entries_parallel(
+ start_manifest_files,
+ max_workers=max_workers
+ )
+ end_entries = self.manifest_file_manager.read_entries_parallel(
+ end_manifest_files,
+ max_workers=max_workers
+ )
+ # Hash the start side once so each end entry is checked with a set lookup.
+ start_keys: Set[Tuple] = {self._entry_key(e) for e in start_entries}
+ # Keeps end_entries order; files that exist only in start are not reported.
+ added_entries = [
+ entry for entry in end_entries
+ if self._entry_key(entry) not in start_keys
+ ]
+
+ return added_entries
+
+ def _entry_key(self, entry: ManifestEntry) -> Tuple:
+ return (  # identity used by the diff: (partition values, bucket, file name)
+ tuple(entry.partition.values),
+ entry.bucket,
+ entry.file.file_name
+ )
diff --git a/paimon-python/pypaimon/read/table_read.py
b/paimon-python/pypaimon/read/table_read.py
index 5206147f80..28e5a9f5ba 100644
--- a/paimon-python/pypaimon/read/table_read.py
+++ b/paimon-python/pypaimon/read/table_read.py
@@ -198,7 +198,7 @@ class TableRead:
You needn't manually set this in most cases.
**read_args: Additional kwargs passed to the datasource.
For example, ``per_task_row_limit`` (Ray 2.52.0+).
-
+
See `Ray Data API
<https://docs.ray.io/en/latest/data/api/doc/ray.data.read_datasource.html>`_
for details.
"""
diff --git a/paimon-python/pypaimon/tests/changelog_follow_up_scanner_test.py
b/paimon-python/pypaimon/tests/changelog_follow_up_scanner_test.py
new file mode 100644
index 0000000000..2c959276c3
--- /dev/null
+++ b/paimon-python/pypaimon/tests/changelog_follow_up_scanner_test.py
@@ -0,0 +1,51 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+"""Tests for ChangelogFollowUpScanner."""
+
+import unittest
+from unittest.mock import Mock
+
+from pypaimon.read.scanner.changelog_follow_up_scanner import \
+ ChangelogFollowUpScanner
+
+
+class ChangelogFollowUpScannerTest(unittest.TestCase):
+ """Tests for ChangelogFollowUpScanner."""
+
+ def test_should_scan_any_commit_with_changelog(self):
+ """Scanner scans based on changelog_manifest_list, regardless of
commit kind."""
+ scanner = ChangelogFollowUpScanner()
+ for kind in ("APPEND", "COMPACT"):  # commit_kind is set explicitly so it really varies
+ snapshot = Mock(commit_kind=kind,
changelog_manifest_list=f"changelog-manifest-{kind}")
+ self.assertTrue(scanner.should_scan(snapshot), kind)
+
+ def test_should_skip_when_no_changelog(self):
+ """Scanner should skip when changelog_manifest_list is None."""
+ scanner = ChangelogFollowUpScanner()
+ snapshot = Mock(changelog_manifest_list=None)
+ self.assertFalse(scanner.should_scan(snapshot))
+
+ def test_should_skip_for_empty_string(self):
+ """Empty string should be treated as no changelog."""
+ scanner = ChangelogFollowUpScanner()
+ snapshot = Mock(changelog_manifest_list="")
+ self.assertFalse(scanner.should_scan(snapshot))
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/paimon-python/pypaimon/tests/follow_up_scanner_test.py
b/paimon-python/pypaimon/tests/follow_up_scanner_test.py
new file mode 100644
index 0000000000..e7b6bcd4dd
--- /dev/null
+++ b/paimon-python/pypaimon/tests/follow_up_scanner_test.py
@@ -0,0 +1,59 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+"""Tests for FollowUpScanner implementations."""
+
+import unittest
+from unittest.mock import Mock
+
+from pypaimon.read.scanner.delta_follow_up_scanner import DeltaFollowUpScanner
+from pypaimon.read.scanner.follow_up_scanner import FollowUpScanner
+
+
+class FollowUpScannerInterfaceTest(unittest.TestCase):
+ """Test that FollowUpScanner interface is properly defined."""
+
+ def test_follow_up_scanner_is_abstract(self):
+ """FollowUpScanner should be an abstract base class."""
+ with self.assertRaises(TypeError):
+ FollowUpScanner()  # abstract should_scan is unimplemented, so instantiation fails
+
+
+class DeltaFollowUpScannerTest(unittest.TestCase):
+ """Tests for DeltaFollowUpScanner which handles APPEND commits only."""
+
+ def setUp(self):
+ self.scanner = DeltaFollowUpScanner()  # unittest runs setUp before every test method
+
+ def test_should_scan_returns_true_for_append_commit(self):
+ """DeltaFollowUpScanner should scan APPEND commits."""
+ snapshot = Mock()
+ snapshot.commit_kind = "APPEND"
+
+ result = self.scanner.should_scan(snapshot)
+
+ self.assertTrue(result)
+
+ def test_should_scan_returns_false_for_non_append_commits(self):
+ """DeltaFollowUpScanner should skip non-APPEND commits."""
+ for kind in ("COMPACT", "OVERWRITE", "EXPIRE", "ANALYZE"):
+ snapshot = Mock(commit_kind=kind)
+ self.assertFalse(self.scanner.should_scan(snapshot), kind)  # kind doubles as failure message
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git
a/paimon-python/pypaimon/tests/scanner/incremental_diff_scanner_test.py
b/paimon-python/pypaimon/tests/scanner/incremental_diff_scanner_test.py
new file mode 100644
index 0000000000..d5feba2b7d
--- /dev/null
+++ b/paimon-python/pypaimon/tests/scanner/incremental_diff_scanner_test.py
@@ -0,0 +1,390 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+"""Tests for IncrementalDiffScanner."""
+
+import unittest
+from unittest.mock import Mock, patch
+
+
+def _create_mock_table():  # Mock table: not a primary-key table, no partition keys
+ table = Mock()
+ table.is_primary_key_table = False
+ table.options = Mock()  # stubs below are the option getters IncrementalDiffScanner calls
+ table.options.source_split_target_size.return_value = 128 * 1024 * 1024
+ table.options.source_split_open_file_cost.return_value = 4 * 1024 * 1024
+ table.options.scan_manifest_parallelism.return_value = 8
+ table.partition_keys = []
+ return table
+
+
+def _create_mock_entry(partition_values, bucket, filename):  # Mock manifest entry
+ entry = Mock()
+ entry.partition = Mock()
+ entry.partition.values = partition_values  # the diff key reads partition.values,
+ entry.bucket = bucket  # bucket,
+ entry.total_buckets = 1
+ entry.file = Mock()
+ entry.file.file_name = filename  # and file.file_name
+ entry.file.file_size = 1024
+ entry.file.row_count = 100
+ entry.kind = 0 # ADD
+ return entry
+
+
+def _create_mock_snapshot(snapshot_id, commit_kind="APPEND",
+ base_manifest="base", delta_manifest="delta"):  # name prefixes; "-<id>" is appended
+ snapshot = Mock()
+ snapshot.id = snapshot_id
+ snapshot.commit_kind = commit_kind
+ snapshot.base_manifest_list = f"{base_manifest}-{snapshot_id}"
+ snapshot.delta_manifest_list = f"{delta_manifest}-{snapshot_id}"
+ return snapshot
+
+
+class IncrementalDiffScannerTest(unittest.TestCase):
+
+
@patch('pypaimon.read.scanner.incremental_diff_scanner.ManifestListManager')
+
@patch('pypaimon.read.scanner.incremental_diff_scanner.ManifestFileManager')
+ def test_diff_returns_only_added_files(self, MockManifestFileManager,
MockManifestListManager):
+ """Files in end but not in start should be returned."""
+ from pypaimon.read.scanner.incremental_diff_scanner import \
+ IncrementalDiffScanner
+
+ table = _create_mock_table()
+
+ start_entries = [
+ _create_mock_entry([], 0, "file1.parquet"),
+ _create_mock_entry([], 0, "file2.parquet"),
+ ]
+ end_entries = [
+ _create_mock_entry([], 0, "file1.parquet"),
+ _create_mock_entry([], 0, "file2.parquet"),
+ _create_mock_entry([], 0, "file3.parquet"),
+ _create_mock_entry([], 0, "file4.parquet"),
+ ]
+
+ mock_mlm = MockManifestListManager.return_value
+ mock_mfm = MockManifestFileManager.return_value
+ # compute_diff() reads manifests through read_all(), so stub that method.
+ mock_mlm.read_all.side_effect = lambda s:
[Mock(file_name=f"manifest-{s.id}")]
+ mock_mfm.read_entries_parallel.side_effect = [start_entries,
end_entries]
+
+ scanner = IncrementalDiffScanner(table)
+ start_snapshot = _create_mock_snapshot(1)
+ end_snapshot = _create_mock_snapshot(5)
+
+ added_entries = scanner.compute_diff(start_snapshot, end_snapshot)
+
+ added_filenames = {e.file.file_name for e in added_entries}
+ self.assertEqual(added_filenames, {"file3.parquet", "file4.parquet"})
+
+
@patch('pypaimon.read.scanner.incremental_diff_scanner.ManifestListManager')
+
@patch('pypaimon.read.scanner.incremental_diff_scanner.ManifestFileManager')
+ def test_diff_handles_empty_start(self, MockManifestFileManager,
MockManifestListManager):
+ """When start is empty, all end files should be returned."""
+ from pypaimon.read.scanner.incremental_diff_scanner import \
+ IncrementalDiffScanner
+
+ table = _create_mock_table()
+
+ start_entries = []
+ end_entries = [
+ _create_mock_entry([], 0, "file1.parquet"),
+ _create_mock_entry([], 0, "file2.parquet"),
+ ]
+ # compute_diff() reads manifests through read_all(), so stub that method.
+ mock_mlm = MockManifestListManager.return_value
+ mock_mfm = MockManifestFileManager.return_value
+ mock_mlm.read_all.side_effect = lambda s:
[Mock(file_name=f"manifest-{s.id}")]
+ mock_mfm.read_entries_parallel.side_effect = [start_entries,
end_entries]
+
+ scanner = IncrementalDiffScanner(table)
+ start_snapshot = _create_mock_snapshot(0)
+ end_snapshot = _create_mock_snapshot(5)
+
+ added_entries = scanner.compute_diff(start_snapshot, end_snapshot)
+
+ added_filenames = {e.file.file_name for e in added_entries}
+ self.assertEqual(added_filenames, {"file1.parquet", "file2.parquet"})
+
+
@patch('pypaimon.read.scanner.incremental_diff_scanner.ManifestListManager')
+
@patch('pypaimon.read.scanner.incremental_diff_scanner.ManifestFileManager')
+ def test_diff_handles_same_snapshots(self, MockManifestFileManager,
MockManifestListManager):
+ """When start == end (same files), diff should return empty."""
+ from pypaimon.read.scanner.incremental_diff_scanner import \
+ IncrementalDiffScanner
+
+ table = _create_mock_table()
+
+ entries = [
+ _create_mock_entry([], 0, "file1.parquet"),
+ _create_mock_entry([], 0, "file2.parquet"),
+ ]
+ # compute_diff() reads manifests through read_all(), so stub that method.
+ mock_mlm = MockManifestListManager.return_value
+ mock_mfm = MockManifestFileManager.return_value
+ mock_mlm.read_all.side_effect = lambda s:
[Mock(file_name=f"manifest-{s.id}")]
+ mock_mfm.read_entries_parallel.side_effect = [entries, entries]
+
+ scanner = IncrementalDiffScanner(table)
+ snapshot = _create_mock_snapshot(5)
+
+ added_entries = scanner.compute_diff(snapshot, snapshot)
+
+ self.assertEqual(len(added_entries), 0)
+
+
@patch('pypaimon.read.scanner.incremental_diff_scanner.ManifestListManager')
+
@patch('pypaimon.read.scanner.incremental_diff_scanner.ManifestFileManager')
+ def test_diff_groups_by_partition_bucket(self, MockManifestFileManager,
MockManifestListManager):
+ """Diff should be computed per (partition, bucket)."""
+ from pypaimon.read.scanner.incremental_diff_scanner import \
+ IncrementalDiffScanner
+
+ table = _create_mock_table()
+
+ start_entries = [
+ _create_mock_entry([1], 0, "file1.parquet"), # p1, bucket 0
+ _create_mock_entry([1], 1, "file3.parquet"), # p1, bucket 1
+ ]
+ end_entries = [
+ _create_mock_entry([1], 0, "file1.parquet"), # p1, bucket 0
+ _create_mock_entry([1], 0, "file2.parquet"), # p1, bucket 0 - NEW
+ _create_mock_entry([1], 1, "file3.parquet"), # p1, bucket 1
+ ]
+ # compute_diff() reads manifests through read_all(), so stub that method.
+ mock_mlm = MockManifestListManager.return_value
+ mock_mfm = MockManifestFileManager.return_value
+ mock_mlm.read_all.side_effect = lambda s:
[Mock(file_name=f"manifest-{s.id}")]
+ mock_mfm.read_entries_parallel.side_effect = [start_entries,
end_entries]
+
+ scanner = IncrementalDiffScanner(table)
+ start_snapshot = _create_mock_snapshot(1)
+ end_snapshot = _create_mock_snapshot(5)
+
+ added_entries = scanner.compute_diff(start_snapshot, end_snapshot)
+
+ self.assertEqual(len(added_entries), 1)
+ self.assertEqual(added_entries[0].file.file_name, "file2.parquet")
+
+
@patch('pypaimon.read.scanner.incremental_diff_scanner.ManifestListManager')
+
@patch('pypaimon.read.scanner.incremental_diff_scanner.ManifestFileManager')
+ def test_diff_handles_file_deletions(self, MockManifestFileManager,
MockManifestListManager):
+ """Files in start but not in end (deleted) should NOT be returned."""
+ from pypaimon.read.scanner.incremental_diff_scanner import \
+ IncrementalDiffScanner
+
+ table = _create_mock_table()
+
+ start_entries = [
+ _create_mock_entry([], 0, "file1.parquet"),
+ _create_mock_entry([], 0, "file2.parquet"),
+ _create_mock_entry([], 0, "file3.parquet"),
+ ]
+ end_entries = [
+ _create_mock_entry([], 0, "file1.parquet"),
+ _create_mock_entry([], 0, "file2.parquet"),
+ ]
+ # compute_diff() reads manifests through read_all(), so stub that method.
+ mock_mlm = MockManifestListManager.return_value
+ mock_mfm = MockManifestFileManager.return_value
+ mock_mlm.read_all.side_effect = lambda s:
[Mock(file_name=f"manifest-{s.id}")]
+ mock_mfm.read_entries_parallel.side_effect = [start_entries,
end_entries]
+
+ scanner = IncrementalDiffScanner(table)
+ start_snapshot = _create_mock_snapshot(1)
+ end_snapshot = _create_mock_snapshot(5)
+
+ added_entries = scanner.compute_diff(start_snapshot, end_snapshot)
+
+ self.assertEqual(len(added_entries), 0)
+
+
@patch('pypaimon.read.scanner.incremental_diff_scanner.ManifestListManager')
+
@patch('pypaimon.read.scanner.incremental_diff_scanner.ManifestFileManager')
+ def test_diff_creates_correct_splits(self, MockManifestFileManager,
MockManifestListManager):
+ """scan() should return a Plan with correct DataSplits."""
+ from pypaimon.read.plan import Plan
+ from pypaimon.read.scanner.incremental_diff_scanner import \
+ IncrementalDiffScanner
+
+ table = _create_mock_table()
+
+ start_entries = []
+ end_entries = [
+ _create_mock_entry([], 0, "file1.parquet"),
+ ]
+ # compute_diff() reads manifests through read_all(), so stub that method.
+ mock_mlm = MockManifestListManager.return_value
+ mock_mfm = MockManifestFileManager.return_value
+ mock_mlm.read_all.side_effect = lambda s:
[Mock(file_name=f"manifest-{s.id}")]
+ mock_mfm.read_entries_parallel.side_effect = [start_entries,
end_entries]
+
+ scanner = IncrementalDiffScanner(table)
+ start_snapshot = _create_mock_snapshot(0)
+ end_snapshot = _create_mock_snapshot(5)
+
+ plan = scanner.scan(start_snapshot, end_snapshot)
+
+ self.assertIsInstance(plan, Plan)
+ splits = plan.splits()
+ self.assertGreater(len(splits), 0)
+
+
@patch('pypaimon.read.scanner.incremental_diff_scanner.ManifestListManager')
+
@patch('pypaimon.read.scanner.incremental_diff_scanner.ManifestFileManager')
+ def test_entry_key_uniqueness(self, MockManifestFileManager,
MockManifestListManager):
+ """Entry key should uniquely identify a file by (partition, bucket,
filename)."""
+ from pypaimon.read.scanner.incremental_diff_scanner import \
+ IncrementalDiffScanner
+
+ table = _create_mock_table()
+ # Both managers are patched out; only _entry_key is exercised below.
+ scanner = IncrementalDiffScanner(table)
+
+ # Same file in different partitions should have different keys
+ entry1 = _create_mock_entry([1], 0, "file.parquet")
+ entry2 = _create_mock_entry([2], 0, "file.parquet")
+ self.assertNotEqual(scanner._entry_key(entry1),
scanner._entry_key(entry2))
+
+ # Same file in different buckets should have different keys
+ entry3 = _create_mock_entry([1], 0, "file.parquet")
+ entry4 = _create_mock_entry([1], 1, "file.parquet")
+ self.assertNotEqual(scanner._entry_key(entry3),
scanner._entry_key(entry4))
+
+ # Same partition/bucket/filename should have same key
+ entry5 = _create_mock_entry([1], 0, "file.parquet")
+ entry6 = _create_mock_entry([1], 0, "file.parquet")
+ self.assertEqual(scanner._entry_key(entry5),
scanner._entry_key(entry6))
+
+
+class IncrementalDiffIntegrationTest(unittest.TestCase):
+    """Integration tests comparing diff vs delta approaches.
+
+    Both manager classes are patched inside the scanner module, so each
+    test drives ``IncrementalDiffScanner.compute_diff`` from mocked
+    manifest data for snapshot 1 (start) and snapshot 5 (end).
+    """
+
+    @staticmethod
+    def _stub_manifests(mock_mlm, mock_mfm, entries_by_snapshot_id):
+        """Wire the manager mocks to serve fixed entries per snapshot id.
+
+        ``read_all`` returns a single marker manifest named
+        ``all-manifest-<id>`` for every snapshot id present in
+        ``entries_by_snapshot_id``; ``read_entries_parallel`` maps that
+        marker back to the registered entries. Unknown snapshots and
+        unknown or empty manifest lists yield an empty list.
+        """
+        entries_by_manifest = {
+            f"all-manifest-{snapshot_id}": entries
+            for snapshot_id, entries in entries_by_snapshot_id.items()
+        }
+
+        def mock_read_all(snapshot):
+            manifest_name = f"all-manifest-{snapshot.id}"
+            if manifest_name in entries_by_manifest:
+                return [Mock(file_name=manifest_name)]
+            return []
+
+        def mock_read_entries(manifests, *args, **kwargs):
+            # Only the first manifest is inspected: read_all above never
+            # returns more than one marker per snapshot.
+            if not manifests:
+                return []
+            return entries_by_manifest.get(manifests[0].file_name, [])
+
+        mock_mlm.read_all.side_effect = mock_read_all
+        mock_mfm.read_entries_parallel.side_effect = mock_read_entries
+
+    @patch('pypaimon.read.scanner.incremental_diff_scanner.ManifestListManager')
+    @patch('pypaimon.read.scanner.incremental_diff_scanner.ManifestFileManager')
+    def test_diff_equals_delta_results(self, MockManifestFileManager,
+                                       MockManifestListManager):
+        """Diff and delta approaches should return the same added files."""
+        from pypaimon.read.scanner.incremental_diff_scanner import (
+            IncrementalDiffScanner,
+        )
+
+        table = _create_mock_table()
+
+        # Full file state at the start (1) and end (5) snapshots
+        base_entries_1 = [
+            _create_mock_entry([], 0, "file1.parquet"),
+            _create_mock_entry([], 0, "file2.parquet"),
+        ]
+        base_entries_5 = [
+            _create_mock_entry([], 0, "file1.parquet"),
+            _create_mock_entry([], 0, "file2.parquet"),
+            _create_mock_entry([], 0, "file3.parquet"),
+            _create_mock_entry([], 0, "file4.parquet"),
+            _create_mock_entry([], 0, "file5.parquet"),
+        ]
+
+        # Delta manifests for each snapshot
+        delta_2 = [_create_mock_entry([], 0, "file3.parquet")]
+        delta_3 = []  # COMPACT - no new files
+        delta_4 = [_create_mock_entry([], 0, "file4.parquet")]
+        delta_5 = [_create_mock_entry([], 0, "file5.parquet")]
+
+        self._stub_manifests(
+            MockManifestListManager.return_value,
+            MockManifestFileManager.return_value,
+            {1: base_entries_1, 5: base_entries_5},
+        )
+
+        scanner = IncrementalDiffScanner(table)
+        start_snapshot = _create_mock_snapshot(1)
+        end_snapshot = _create_mock_snapshot(5)
+
+        diff_entries = scanner.compute_diff(start_snapshot, end_snapshot)
+        diff_filenames = {e.file.file_name for e in diff_entries}
+
+        # Union of the files added by the per-snapshot deltas
+        delta_filenames = {
+            e.file.file_name
+            for entries in [delta_2, delta_3, delta_4, delta_5]
+            for e in entries
+        }
+
+        self.assertEqual(diff_filenames, delta_filenames)
+        self.assertEqual(
+            diff_filenames,
+            {"file3.parquet", "file4.parquet", "file5.parquet"},
+        )
+
+    @patch('pypaimon.read.scanner.incremental_diff_scanner.ManifestListManager')
+    @patch('pypaimon.read.scanner.incremental_diff_scanner.ManifestFileManager')
+    def test_diff_handles_compaction_correctly(self, MockManifestFileManager,
+                                               MockManifestListManager):
+        """Compaction of file1+file2 into file3 should return only file3."""
+        from pypaimon.read.scanner.incremental_diff_scanner import (
+            IncrementalDiffScanner,
+        )
+
+        table = _create_mock_table()
+
+        base_entries_1 = [
+            _create_mock_entry([], 0, "file1.parquet"),
+            _create_mock_entry([], 0, "file2.parquet"),
+        ]
+        base_entries_5 = [
+            _create_mock_entry([], 0, "file3.parquet"),  # Compacted result
+        ]
+
+        self._stub_manifests(
+            MockManifestListManager.return_value,
+            MockManifestFileManager.return_value,
+            {1: base_entries_1, 5: base_entries_5},
+        )
+
+        scanner = IncrementalDiffScanner(table)
+        start_snapshot = _create_mock_snapshot(1)
+        end_snapshot = _create_mock_snapshot(5)
+
+        diff_entries = scanner.compute_diff(start_snapshot, end_snapshot)
+        diff_filenames = {e.file.file_name for e in diff_entries}
+
+        self.assertEqual(diff_filenames, {"file3.parquet"})
+
+
+# Allow running this test module directly as a script.
+if __name__ == '__main__':
+    unittest.main()