This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.3 in repository https://gitbox.apache.org/repos/asf/paimon.git
commit d45886ba40e04174cc5a4d450c6a14ecaa1fb46a Author: ChengHui Chen <[email protected]> AuthorDate: Mon Oct 20 19:19:41 2025 +0800 [Python] optimize codes related to push_down_utils (#6430) --- paimon-python/pypaimon/read/push_down_utils.py | 14 +++++++------- .../pypaimon/read/scanner/full_starting_scanner.py | 6 +++--- paimon-python/pypaimon/read/table_read.py | 4 ++-- paimon-python/pypaimon/tests/py36/reader_predicate_test.py | 11 +++++++++++ paimon-python/pypaimon/tests/reader_predicate_test.py | 11 +++++++++++ 5 files changed, 34 insertions(+), 12 deletions(-) diff --git a/paimon-python/pypaimon/read/push_down_utils.py b/paimon-python/pypaimon/read/push_down_utils.py index 43892fb5e6..f812341149 100644 --- a/paimon-python/pypaimon/read/push_down_utils.py +++ b/paimon-python/pypaimon/read/push_down_utils.py @@ -22,9 +22,9 @@ from pypaimon.common.predicate import Predicate from pypaimon.common.predicate_builder import PredicateBuilder -def filter_and_transform_predicate(input_predicate: Predicate, all_fields: List[str], fields: List[str]): - new_predicate = filter_predicate_by_fields(input_predicate, fields) - part_to_index = {element: idx for idx, element in enumerate(fields)} +def trim_and_transform_predicate(input_predicate: Predicate, all_fields: List[str], trimmed_keys: List[str]): + new_predicate = trim_predicate_by_fields(input_predicate, trimmed_keys) + part_to_index = {element: idx for idx, element in enumerate(trimmed_keys)} mapping: Dict[int, int] = { i: part_to_index.get(all_fields[i], -1) for i in range(len(all_fields)) @@ -32,12 +32,12 @@ def filter_and_transform_predicate(input_predicate: Predicate, all_fields: List[ return _change_index(new_predicate, mapping) -def filter_predicate_by_fields(input_predicate: Predicate, fields: List[str]): - if not input_predicate or not fields: +def trim_predicate_by_fields(input_predicate: Predicate, trimmed_keys: List[str]): + if not input_predicate or not trimmed_keys: return None predicates: list[Predicate] 
= _split_and(input_predicate) - predicates = [element for element in predicates if _get_all_fields(element).issubset(fields)] + predicates = [element for element in predicates if _get_all_fields(element).issubset(trimmed_keys)] return PredicateBuilder.and_predicates(predicates) @@ -46,7 +46,7 @@ def _split_and(input_predicate: Predicate): return list() if input_predicate.method == 'and': - return list(input_predicate.literals) + return [p for element in (input_predicate.literals or []) for p in _split_and(element)] return [input_predicate] diff --git a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py index 156b05cdb6..f1ade2c4e8 100644 --- a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py +++ b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py @@ -27,7 +27,7 @@ from pypaimon.manifest.schema.manifest_entry import ManifestEntry from pypaimon.manifest.schema.manifest_file_meta import ManifestFileMeta from pypaimon.read.interval_partition import IntervalPartition, SortedRun from pypaimon.read.plan import Plan -from pypaimon.read.push_down_utils import (filter_and_transform_predicate) +from pypaimon.read.push_down_utils import (trim_and_transform_predicate) from pypaimon.read.scanner.starting_scanner import StartingScanner from pypaimon.read.split import Split from pypaimon.snapshot.snapshot_manager import SnapshotManager @@ -46,10 +46,10 @@ class FullStartingScanner(StartingScanner): self.manifest_list_manager = ManifestListManager(table) self.manifest_file_manager = ManifestFileManager(table) - self.primary_key_predicate = filter_and_transform_predicate( + self.primary_key_predicate = trim_and_transform_predicate( self.predicate, self.table.field_names, self.table.table_schema.get_trimmed_primary_keys()) - self.partition_key_predicate = filter_and_transform_predicate( + self.partition_key_predicate = trim_and_transform_predicate( self.predicate, self.table.field_names, 
self.table.partition_keys) self.target_split_size = 128 * 1024 * 1024 diff --git a/paimon-python/pypaimon/read/table_read.py b/paimon-python/pypaimon/read/table_read.py index 4bf07d37a5..061303862c 100644 --- a/paimon-python/pypaimon/read/table_read.py +++ b/paimon-python/pypaimon/read/table_read.py @@ -22,7 +22,7 @@ import pyarrow from pypaimon.common.core_options import CoreOptions from pypaimon.common.predicate import Predicate -from pypaimon.read.push_down_utils import filter_predicate_by_fields +from pypaimon.read.push_down_utils import trim_predicate_by_fields from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader from pypaimon.read.split import Split from pypaimon.read.split_read import (MergeFileSplitRead, RawFileSplitRead, @@ -112,7 +112,7 @@ class TableRead: if self.predicate is None: return None elif self.table.is_primary_key_table: - pk_predicate = filter_predicate_by_fields(self.predicate, self.table.primary_keys) + pk_predicate = trim_predicate_by_fields(self.predicate, self.table.primary_keys) if not pk_predicate: return None return pk_predicate.to_arrow() diff --git a/paimon-python/pypaimon/tests/py36/reader_predicate_test.py b/paimon-python/pypaimon/tests/py36/reader_predicate_test.py index e772205413..4c52fcdc41 100644 --- a/paimon-python/pypaimon/tests/py36/reader_predicate_test.py +++ b/paimon-python/pypaimon/tests/py36/reader_predicate_test.py @@ -25,6 +25,7 @@ import pyarrow as pa from pypaimon import CatalogFactory from pypaimon import Schema +from pypaimon.read import push_down_utils from pypaimon.read.split import Split @@ -80,3 +81,13 @@ class ReaderPredicateTest(unittest.TestCase): splits: list[Split] = read_builder.new_scan().plan().splits() self.assertEqual(len(splits), 1) self.assertEqual(splits[0].partition.to_dict().get("pt"), 1003) + + def test_trim_predicate(self): + predicate_builder = self.table.new_read_builder().new_predicate_builder() + p1 = predicate_builder.between('pt', 1002, 1003) + p2 = 
predicate_builder.and_predicates([predicate_builder.equal('pt', 1003), predicate_builder.equal('a', 3)]) + predicate = predicate_builder.and_predicates([p1, p2]) + pred = push_down_utils.trim_predicate_by_fields(predicate, self.table.partition_keys) + self.assertEqual(len(pred.literals), 2) + self.assertEqual(pred.literals[0].field, 'pt') + self.assertEqual(pred.literals[1].field, 'pt') diff --git a/paimon-python/pypaimon/tests/reader_predicate_test.py b/paimon-python/pypaimon/tests/reader_predicate_test.py index e772205413..4c52fcdc41 100644 --- a/paimon-python/pypaimon/tests/reader_predicate_test.py +++ b/paimon-python/pypaimon/tests/reader_predicate_test.py @@ -25,6 +25,7 @@ import pyarrow as pa from pypaimon import CatalogFactory from pypaimon import Schema +from pypaimon.read import push_down_utils from pypaimon.read.split import Split @@ -80,3 +81,13 @@ class ReaderPredicateTest(unittest.TestCase): splits: list[Split] = read_builder.new_scan().plan().splits() self.assertEqual(len(splits), 1) self.assertEqual(splits[0].partition.to_dict().get("pt"), 1003) + + def test_trim_predicate(self): + predicate_builder = self.table.new_read_builder().new_predicate_builder() + p1 = predicate_builder.between('pt', 1002, 1003) + p2 = predicate_builder.and_predicates([predicate_builder.equal('pt', 1003), predicate_builder.equal('a', 3)]) + predicate = predicate_builder.and_predicates([p1, p2]) + pred = push_down_utils.trim_predicate_by_fields(predicate, self.table.partition_keys) + self.assertEqual(len(pred.literals), 2) + self.assertEqual(pred.literals[0].field, 'pt') + self.assertEqual(pred.literals[1].field, 'pt')
