This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.3
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit d45886ba40e04174cc5a4d450c6a14ecaa1fb46a
Author: ChengHui Chen <[email protected]>
AuthorDate: Mon Oct 20 19:19:41 2025 +0800

    [Python] optimize codes related to push_down_utils (#6430)
---
 paimon-python/pypaimon/read/push_down_utils.py             | 14 +++++++-------
 .../pypaimon/read/scanner/full_starting_scanner.py         |  6 +++---
 paimon-python/pypaimon/read/table_read.py                  |  4 ++--
 paimon-python/pypaimon/tests/py36/reader_predicate_test.py | 11 +++++++++++
 paimon-python/pypaimon/tests/reader_predicate_test.py      | 11 +++++++++++
 5 files changed, 34 insertions(+), 12 deletions(-)

diff --git a/paimon-python/pypaimon/read/push_down_utils.py b/paimon-python/pypaimon/read/push_down_utils.py
index 43892fb5e6..f812341149 100644
--- a/paimon-python/pypaimon/read/push_down_utils.py
+++ b/paimon-python/pypaimon/read/push_down_utils.py
@@ -22,9 +22,9 @@ from pypaimon.common.predicate import Predicate
 from pypaimon.common.predicate_builder import PredicateBuilder
 
 
-def filter_and_transform_predicate(input_predicate: Predicate, all_fields: List[str], fields: List[str]):
-    new_predicate = filter_predicate_by_fields(input_predicate, fields)
-    part_to_index = {element: idx for idx, element in enumerate(fields)}
+def trim_and_transform_predicate(input_predicate: Predicate, all_fields: List[str], trimmed_keys: List[str]):
+    new_predicate = trim_predicate_by_fields(input_predicate, trimmed_keys)
+    part_to_index = {element: idx for idx, element in enumerate(trimmed_keys)}
     mapping: Dict[int, int] = {
         i: part_to_index.get(all_fields[i], -1)
         for i in range(len(all_fields))
@@ -32,12 +32,12 @@ def filter_and_transform_predicate(input_predicate: Predicate, all_fields: List[
     return _change_index(new_predicate, mapping)
 
 
-def filter_predicate_by_fields(input_predicate: Predicate, fields: List[str]):
-    if not input_predicate or not fields:
+def trim_predicate_by_fields(input_predicate: Predicate, trimmed_keys: List[str]):
+    if not input_predicate or not trimmed_keys:
         return None
 
     predicates: list[Predicate] = _split_and(input_predicate)
-    predicates = [element for element in predicates if _get_all_fields(element).issubset(fields)]
+    predicates = [element for element in predicates if _get_all_fields(element).issubset(trimmed_keys)]
     return PredicateBuilder.and_predicates(predicates)
 
 
@@ -46,7 +46,7 @@ def _split_and(input_predicate: Predicate):
         return list()
 
     if input_predicate.method == 'and':
-        return list(input_predicate.literals)
+        return [p for element in (input_predicate.literals or []) for p in _split_and(element)]
 
     return [input_predicate]
 
diff --git a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
index 156b05cdb6..f1ade2c4e8 100644
--- a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
+++ b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
@@ -27,7 +27,7 @@ from pypaimon.manifest.schema.manifest_entry import ManifestEntry
 from pypaimon.manifest.schema.manifest_file_meta import ManifestFileMeta
 from pypaimon.read.interval_partition import IntervalPartition, SortedRun
 from pypaimon.read.plan import Plan
-from pypaimon.read.push_down_utils import (filter_and_transform_predicate)
+from pypaimon.read.push_down_utils import (trim_and_transform_predicate)
 from pypaimon.read.scanner.starting_scanner import StartingScanner
 from pypaimon.read.split import Split
 from pypaimon.snapshot.snapshot_manager import SnapshotManager
@@ -46,10 +46,10 @@ class FullStartingScanner(StartingScanner):
         self.manifest_list_manager = ManifestListManager(table)
         self.manifest_file_manager = ManifestFileManager(table)
 
-        self.primary_key_predicate = filter_and_transform_predicate(
+        self.primary_key_predicate = trim_and_transform_predicate(
             self.predicate, self.table.field_names, self.table.table_schema.get_trimmed_primary_keys())
 
-        self.partition_key_predicate = filter_and_transform_predicate(
+        self.partition_key_predicate = trim_and_transform_predicate(
             self.predicate, self.table.field_names, self.table.partition_keys)
 
         self.target_split_size = 128 * 1024 * 1024
diff --git a/paimon-python/pypaimon/read/table_read.py b/paimon-python/pypaimon/read/table_read.py
index 4bf07d37a5..061303862c 100644
--- a/paimon-python/pypaimon/read/table_read.py
+++ b/paimon-python/pypaimon/read/table_read.py
@@ -22,7 +22,7 @@ import pyarrow
 
 from pypaimon.common.core_options import CoreOptions
 from pypaimon.common.predicate import Predicate
-from pypaimon.read.push_down_utils import filter_predicate_by_fields
+from pypaimon.read.push_down_utils import trim_predicate_by_fields
 from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
 from pypaimon.read.split import Split
 from pypaimon.read.split_read import (MergeFileSplitRead, RawFileSplitRead,
@@ -112,7 +112,7 @@ class TableRead:
         if self.predicate is None:
             return None
         elif self.table.is_primary_key_table:
-            pk_predicate = filter_predicate_by_fields(self.predicate, self.table.primary_keys)
+            pk_predicate = trim_predicate_by_fields(self.predicate, self.table.primary_keys)
             if not pk_predicate:
                 return None
             return pk_predicate.to_arrow()
diff --git a/paimon-python/pypaimon/tests/py36/reader_predicate_test.py b/paimon-python/pypaimon/tests/py36/reader_predicate_test.py
index e772205413..4c52fcdc41 100644
--- a/paimon-python/pypaimon/tests/py36/reader_predicate_test.py
+++ b/paimon-python/pypaimon/tests/py36/reader_predicate_test.py
@@ -25,6 +25,7 @@ import pyarrow as pa
 
 from pypaimon import CatalogFactory
 from pypaimon import Schema
+from pypaimon.read import push_down_utils
 from pypaimon.read.split import Split
 
 
@@ -80,3 +81,13 @@ class ReaderPredicateTest(unittest.TestCase):
         splits: list[Split] = read_builder.new_scan().plan().splits()
         self.assertEqual(len(splits), 1)
         self.assertEqual(splits[0].partition.to_dict().get("pt"), 1003)
+
+    def test_trim_predicate(self):
+        predicate_builder = self.table.new_read_builder().new_predicate_builder()
+        p1 = predicate_builder.between('pt', 1002, 1003)
+        p2 = predicate_builder.and_predicates([predicate_builder.equal('pt', 1003), predicate_builder.equal('a', 3)])
+        predicate = predicate_builder.and_predicates([p1, p2])
+        pred = push_down_utils.trim_predicate_by_fields(predicate, self.table.partition_keys)
+        self.assertEqual(len(pred.literals), 2)
+        self.assertEqual(pred.literals[0].field, 'pt')
+        self.assertEqual(pred.literals[1].field, 'pt')
diff --git a/paimon-python/pypaimon/tests/reader_predicate_test.py b/paimon-python/pypaimon/tests/reader_predicate_test.py
index e772205413..4c52fcdc41 100644
--- a/paimon-python/pypaimon/tests/reader_predicate_test.py
+++ b/paimon-python/pypaimon/tests/reader_predicate_test.py
@@ -25,6 +25,7 @@ import pyarrow as pa
 
 from pypaimon import CatalogFactory
 from pypaimon import Schema
+from pypaimon.read import push_down_utils
 from pypaimon.read.split import Split
 
 
@@ -80,3 +81,13 @@ class ReaderPredicateTest(unittest.TestCase):
         splits: list[Split] = read_builder.new_scan().plan().splits()
         self.assertEqual(len(splits), 1)
         self.assertEqual(splits[0].partition.to_dict().get("pt"), 1003)
+
+    def test_trim_predicate(self):
+        predicate_builder = self.table.new_read_builder().new_predicate_builder()
+        p1 = predicate_builder.between('pt', 1002, 1003)
+        p2 = predicate_builder.and_predicates([predicate_builder.equal('pt', 1003), predicate_builder.equal('a', 3)])
+        predicate = predicate_builder.and_predicates([p1, p2])
+        pred = push_down_utils.trim_predicate_by_fields(predicate, self.table.partition_keys)
+        self.assertEqual(len(pred.literals), 2)
+        self.assertEqual(pred.literals[0].field, 'pt')
+        self.assertEqual(pred.literals[1].field, 'pt')

Reply via email to