This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 80185d6fee [python] Fix limit push-down discarding non-raw_convertible
splits (#7742)
80185d6fee is described below
commit 80185d6feeec48c10a0e51d8e907131a72d2dda7
Author: chaoyang <[email protected]>
AuthorDate: Fri May 8 22:44:48 2026 +0800
[python] Fix limit push-down discarding non-raw_convertible splits (#7742)
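`_apply_push_down_limit` now mirrors Java: it skips limit push-down
entirely (returning the input splits) when the predicate references any
non-partition column; otherwise it accumulates
`split.merged_row_count()` and stops once the limit is reached. Splits
with an unknown merged count are not counted toward the limit; if the
limit is never reached, the full split list is passed to the reader
unchanged.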
---
.../pypaimon/read/scanner/file_scanner.py | 27 ++++++--
.../pypaimon/tests/reader_split_generator_test.py | 71 ++++++++++++++++++++++
2 files changed, 92 insertions(+), 6 deletions(-)
diff --git a/paimon-python/pypaimon/read/scanner/file_scanner.py
b/paimon-python/pypaimon/read/scanner/file_scanner.py
index 39c7401746..78e48f85e5 100755
--- a/paimon-python/pypaimon/read/scanner/file_scanner.py
+++ b/paimon-python/pypaimon/read/scanner/file_scanner.py
@@ -31,7 +31,8 @@ from pypaimon.manifest.schema.manifest_entry import
ManifestEntry
from pypaimon.manifest.schema.manifest_file_meta import ManifestFileMeta
from pypaimon.manifest.simple_stats_evolutions import SimpleStatsEvolutions
from pypaimon.read.plan import Plan
-from pypaimon.read.push_down_utils import (remove_row_id_filter,
+from pypaimon.read.push_down_utils import (_get_all_fields,
+ remove_row_id_filter,
trim_and_transform_predicate)
from pypaimon.read.scanner.append_table_split_generator import \
AppendTableSplitGenerator
@@ -365,20 +366,34 @@ class FileScanner:
return self
def _apply_push_down_limit(self, splits: List[DataSplit]) ->
List[DataSplit]:
+ """Mirror Java ``DataTableBatchScan.applyPushDownLimit``: sum the
+ DV-aware ``merged_row_count`` (== Java ``Split.mergedRowCount()``)
+ until the limit is met. Splits with unknown merged count fall
+ through to the reader unchanged.
+ """
if self.limit is None:
return splits
- scanned_row_count = 0
- limited_splits = []
+ if self._has_non_partition_filter():
+ return splits
+ scanned_row_count = 0
+ limited_splits: List[DataSplit] = []
for split in splits:
- if split.raw_convertible:
+ merged = split.merged_row_count()
+ if merged is not None:
limited_splits.append(split)
- scanned_row_count += split.row_count
+ scanned_row_count += merged
if scanned_row_count >= self.limit:
return limited_splits
-
return splits
+ def _has_non_partition_filter(self) -> bool:
+ """Mirror Java ``SnapshotReaderImpl.hasNonPartitionFilter``."""
+ if self.predicate is None:
+ return False
+ partition_keys = set(self.table.partition_keys or [])
+ return not _get_all_fields(self.predicate).issubset(partition_keys)
+
def _filter_manifest_file(self, file: ManifestFileMeta) -> bool:
if not self.partition_key_predicate:
return True
diff --git a/paimon-python/pypaimon/tests/reader_split_generator_test.py
b/paimon-python/pypaimon/tests/reader_split_generator_test.py
index 53819aebd5..00b363fd33 100644
--- a/paimon-python/pypaimon/tests/reader_split_generator_test.py
+++ b/paimon-python/pypaimon/tests/reader_split_generator_test.py
@@ -330,5 +330,76 @@ class SplitGeneratorTest(unittest.TestCase):
# This test ensures that if SlicedSplit is created, merged_row_count()
works correctly
+class ApplyPushDownLimitUnitTest(unittest.TestCase):
+ """Mock-driven coverage of ``FileScanner._apply_push_down_limit``."""
+
+ @staticmethod
+ def _apply(splits, limit, has_non_partition_filter=False):
+ from pypaimon.read.scanner.file_scanner import FileScanner
+
+ class _FakeScanner:
+ pass
+
+ scanner = _FakeScanner()
+ scanner.limit = limit
+ scanner._has_non_partition_filter = lambda: has_non_partition_filter
+ return FileScanner._apply_push_down_limit(scanner, splits)
+
+ @staticmethod
+ def _split(raw_convertible, row_count, merged_row_count):
+ class _FakeSplit:
+ pass
+
+ s = _FakeSplit()
+ s.raw_convertible = raw_convertible
+ s.row_count = row_count
+ s._merged = merged_row_count
+ s.merged_row_count = lambda: s._merged
+ return s
+
+ def test_dv_aware_accumulator_uses_merged_row_count(self):
+ """DV-aware raw split + trailing non-raw splits, ``limit > merged``:
+ pre-fix (``+= row_count``) early-returns ``[raw]``; post-fix
+ (``+= merged_row_count``) leaves the budget at 4 < 5, the loop
+ completes, and the fall-through returns all three splits."""
+ s_raw = self._split(raw_convertible=True, row_count=10,
merged_row_count=4)
+ s_nr1 = self._split(raw_convertible=False, row_count=10,
merged_row_count=None)
+ s_nr2 = self._split(raw_convertible=False, row_count=10,
merged_row_count=None)
+
+ result = self._apply([s_raw, s_nr1, s_nr2], limit=5)
+ self.assertEqual(len(result), 3)
+
+ def test_accumulator_skips_splits_with_unknown_merged_count(self):
+ """A split whose ``merged_row_count()`` returns ``None`` does not
+ contribute to the budget; the loop completes and returns the
+ input via the fall-through."""
+ s = self._split(raw_convertible=True, row_count=10,
merged_row_count=None)
+ result = self._apply([s], limit=5)
+ self.assertEqual(result, [s])
+
+ def test_no_raw_splits_falls_through_to_full_list(self):
+ """No split contributes to the budget → fall-through returns all."""
+ s1 = self._split(raw_convertible=False, row_count=10,
merged_row_count=None)
+ s2 = self._split(raw_convertible=False, row_count=10,
merged_row_count=None)
+ result = self._apply([s1, s2], limit=5)
+ self.assertEqual(result, [s1, s2])
+
+ def test_empty_splits_returns_empty(self):
+ self.assertEqual(self._apply([], limit=5), [])
+
+ def test_no_limit_returns_input_unchanged(self):
+ s = self._split(raw_convertible=True, row_count=10,
merged_row_count=10)
+ result = self._apply([s], limit=None)
+ self.assertEqual(result, [s])
+
+ def test_non_partition_filter_short_circuits_pushdown(self):
+ """Predicate touching a non-partition column → no pushdown,
+ regardless of how many DV-aware splits the plan contains."""
+ s_raw = self._split(raw_convertible=True, row_count=10,
merged_row_count=10)
+ result = self._apply(
+ [s_raw, s_raw, s_raw], limit=5, has_non_partition_filter=True)
+ self.assertEqual(len(result), 3)
+
+
if __name__ == '__main__':
unittest.main()