This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.3
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit 3d1b30f0038e269f40a89fc0dc1649b6a03d7cd3
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Oct 17 22:28:00 2025 +0200

    [python] Filter manifest files by partition predicate in scan (#6419)
---
 paimon-python/pypaimon/common/predicate.py         | 25 +++++++
 paimon-python/pypaimon/read/push_down_utils.py     | 44 ++++++++++++
 paimon-python/pypaimon/read/read_builder.py        |  3 +-
 .../pypaimon/read/scanner/full_starting_scanner.py | 23 ++++--
 .../read/scanner/incremental_starting_scanner.py   | 10 ++-
 paimon-python/pypaimon/read/table_scan.py          | 14 ++--
 paimon-python/pypaimon/table/file_store_table.py   |  1 +
 .../pypaimon/tests/py36/reader_predicate_test.py   | 82 ++++++++++++++++++++++
 .../pypaimon/tests/reader_predicate_test.py        | 82 ++++++++++++++++++++++
 paimon-python/pypaimon/write/file_store_commit.py  |  2 +-
 10 files changed, 262 insertions(+), 24 deletions(-)

diff --git a/paimon-python/pypaimon/common/predicate.py b/paimon-python/pypaimon/common/predicate.py
index c8a4070c6a..4ca8644f6e 100644
--- a/paimon-python/pypaimon/common/predicate.py
+++ b/paimon-python/pypaimon/common/predicate.py
@@ -24,6 +24,7 @@ import pyarrow
 from pyarrow import compute as pyarrow_compute
 from pyarrow import dataset as pyarrow_dataset
 
+from pypaimon.manifest.schema.simple_stats import SimpleStats
 from pypaimon.table.row.internal_row import InternalRow
 
 
@@ -34,6 +35,20 @@ class Predicate:
     field: Optional[str]
     literals: Optional[List[Any]] = None
 
+    def new_index(self, index: int):
+        return Predicate(
+            method=self.method,
+            index=index,
+            field=self.field,
+            literals=self.literals)
+
+    def new_literals(self, literals: List[Any]):
+        return Predicate(
+            method=self.method,
+            index=self.index,
+            field=self.field,
+            literals=literals)
+
     def test(self, record: InternalRow) -> bool:
         if self.method == 'equal':
             return record.get_field(self.index) == self.literals[0]
@@ -125,6 +140,16 @@ class Predicate:
 
         raise ValueError("Unsupported predicate method: {}".format(self.method))
 
+    def test_by_simple_stats(self, stat: SimpleStats, row_count: int) -> bool:
+        return self.test_by_stats({
+            "min_values": stat.min_values.to_dict(),
+            "max_values": stat.max_values.to_dict(),
+            "null_counts": {
+                stat.min_values.fields[i].name: stat.null_counts[i] for i in range(len(stat.min_values.fields))
+            },
+            "row_count": row_count,
+        })
+
     def test_by_stats(self, stat: Dict) -> bool:
         if self.method == 'and':
             return all(p.test_by_stats(stat) for p in self.literals)
diff --git a/paimon-python/pypaimon/read/push_down_utils.py b/paimon-python/pypaimon/read/push_down_utils.py
index 95a99d9005..64e7c238f8 100644
--- a/paimon-python/pypaimon/read/push_down_utils.py
+++ b/paimon-python/pypaimon/read/push_down_utils.py
@@ -21,6 +21,50 @@ from typing import Dict, List, Set
 from pypaimon.common.predicate import Predicate
 
 
+def to_partition_predicate(input_predicate: 'Predicate', all_fields: List[str], partition_keys: List[str]):
+    if not input_predicate or not partition_keys:
+        return None
+
+    predicates: list['Predicate'] = _split_and(input_predicate)
+    predicates = [element for element in predicates if _get_all_fields(element).issubset(partition_keys)]
+    new_predicate = Predicate(
+        method='and',
+        index=None,
+        field=None,
+        literals=predicates
+    )
+
+    part_to_index = {element: idx for idx, element in enumerate(partition_keys)}
+    mapping: Dict[int, int] = {
+        i: part_to_index.get(all_fields[i], -1)
+        for i in range(len(all_fields))
+    }
+
+    return _change_index(new_predicate, mapping)
+
+
+def _split_and(input_predicate: 'Predicate'):
+    if not input_predicate:
+        return list()
+
+    if input_predicate.method == 'and':
+        return list(input_predicate.literals)
+
+    return [input_predicate]
+
+
+def _change_index(input_predicate: 'Predicate', mapping: Dict[int, int]):
+    if not input_predicate:
+        return None
+
+    if input_predicate.method == 'and' or input_predicate.method == 'or':
+        predicates: list['Predicate'] = input_predicate.literals
+        new_predicates = [_change_index(element, mapping) for element in predicates]
+        return input_predicate.new_literals(new_predicates)
+
+    return input_predicate.new_index(mapping[input_predicate.index])
+
+
 def extract_predicate_to_list(result: list, input_predicate: 'Predicate', keys: List[str]):
     if not input_predicate or not keys:
         return
diff --git a/paimon-python/pypaimon/read/read_builder.py b/paimon-python/pypaimon/read/read_builder.py
index 30f824a698..6fc8026c43 100644
--- a/paimon-python/pypaimon/read/read_builder.py
+++ b/paimon-python/pypaimon/read/read_builder.py
@@ -52,8 +52,7 @@ class ReadBuilder:
         return TableScan(
             table=self.table,
             predicate=self._predicate,
-            limit=self._limit,
-            read_type=self.read_type()
+            limit=self._limit
         )
 
     def new_read(self) -> TableRead:
diff --git a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
index 73915a0412..a363a6fb6f 100644
--- a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
+++ b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
@@ -25,25 +25,25 @@ from pypaimon.manifest.manifest_file_manager import ManifestFileManager
 from pypaimon.manifest.manifest_list_manager import ManifestListManager
 from pypaimon.manifest.schema.data_file_meta import DataFileMeta
 from pypaimon.manifest.schema.manifest_entry import ManifestEntry
+from pypaimon.manifest.schema.manifest_file_meta import ManifestFileMeta
 from pypaimon.read.interval_partition import IntervalPartition, SortedRun
 from pypaimon.read.plan import Plan
 from pypaimon.read.push_down_utils import (extract_predicate_to_dict,
-                                           extract_predicate_to_list)
+                                           extract_predicate_to_list,
+                                           to_partition_predicate)
 from pypaimon.read.scanner.starting_scanner import StartingScanner
 from pypaimon.read.split import Split
-from pypaimon.schema.data_types import DataField
 from pypaimon.snapshot.snapshot_manager import SnapshotManager
 from pypaimon.table.bucket_mode import BucketMode
 
 
 class FullStartingScanner(StartingScanner):
-    def __init__(self, table, predicate: Optional[Predicate], limit: Optional[int], read_type: List[DataField]):
+    def __init__(self, table, predicate: Optional[Predicate], limit: Optional[int]):
         from pypaimon.table.file_store_table import FileStoreTable
 
         self.table: FileStoreTable = table
         self.predicate = predicate
         self.limit = limit
-        self.read_type = read_type
 
         self.snapshot_manager = SnapshotManager(table)
         self.manifest_list_manager = ManifestListManager(table)
@@ -82,15 +82,26 @@ class FullStartingScanner(StartingScanner):
         splits = self._apply_push_down_limit(splits)
         return Plan(splits)
 
-    def plan_files(self) -> List[ManifestEntry]:
+    def _read_manifest_files(self) -> List[ManifestFileMeta]:
         latest_snapshot = self.snapshot_manager.get_latest_snapshot()
         if not latest_snapshot:
             return []
         manifest_files = self.manifest_list_manager.read_all(latest_snapshot)
+        partition_predicate = to_partition_predicate(self.predicate, self.table.field_names, self.table.partition_keys)
+
+        def test_predicate(file: ManifestFileMeta) -> bool:
+            if not partition_predicate:
+                return True
+            return partition_predicate.test_by_simple_stats(
+                file.partition_stats,
+                file.num_added_files + file.num_deleted_files)
 
+        return [file for file in manifest_files if test_predicate(file)]
+
+    def plan_files(self) -> List[ManifestEntry]:
+        manifest_files = self._read_manifest_files()
         deleted_entries = set()
         added_entries = []
-        # TODO: filter manifest files by predicate
         for manifest_file in manifest_files:
             manifest_entries = self.manifest_file_manager.read(manifest_file.file_name,
                                                                lambda row: self._bucket_filter(row))
diff --git a/paimon-python/pypaimon/read/scanner/incremental_starting_scanner.py b/paimon-python/pypaimon/read/scanner/incremental_starting_scanner.py
index f1a9cd03bc..ead58d260a 100644
--- a/paimon-python/pypaimon/read/scanner/incremental_starting_scanner.py
+++ b/paimon-python/pypaimon/read/scanner/incremental_starting_scanner.py
@@ -20,14 +20,13 @@ from typing import List, Optional
 from pypaimon.common.predicate import Predicate
 from pypaimon.manifest.schema.manifest_entry import ManifestEntry
 from pypaimon.read.scanner.full_starting_scanner import FullStartingScanner
-from pypaimon.schema.data_types import DataField
 from pypaimon.snapshot.snapshot_manager import SnapshotManager
 
 
 class IncrementalStartingScanner(FullStartingScanner):
     def __init__(self, table, predicate: Optional[Predicate], limit: Optional[int],
-                 read_type: List[DataField], start: int, end: int):
-        super().__init__(table, predicate, limit, read_type)
+                 start: int, end: int):
+        super().__init__(table, predicate, limit)
         self.startingSnapshotId = start
         self.endingSnapshotId = end
 
@@ -55,8 +54,7 @@ class IncrementalStartingScanner(FullStartingScanner):
 
     @staticmethod
     def between_timestamps(table, predicate: Optional[Predicate], limit: Optional[int],
-                           read_type: List[DataField], start_timestamp: int,
-                           end_timestamp: int) -> 'IncrementalStartingScanner':
+                           start_timestamp: int, end_timestamp: int) -> 'IncrementalStartingScanner':
         """
         Create an IncrementalStartingScanner for snapshots between two timestamps.
         """
@@ -74,4 +72,4 @@ class IncrementalStartingScanner(FullStartingScanner):
         latest_snapshot = snapshot_manager.get_latest_snapshot()
         end_id = end_snapshot.id if end_snapshot else (latest_snapshot.id if latest_snapshot else -1)
 
-        return IncrementalStartingScanner(table, predicate, limit, read_type, start_id, end_id)
+        return IncrementalStartingScanner(table, predicate, limit, start_id, end_id)
diff --git a/paimon-python/pypaimon/read/table_scan.py b/paimon-python/pypaimon/read/table_scan.py
index 93a7babb30..22994d4bb2 100644
--- a/paimon-python/pypaimon/read/table_scan.py
+++ b/paimon-python/pypaimon/read/table_scan.py
@@ -16,7 +16,7 @@
 # limitations under the License.
 ################################################################################
 
-from typing import List, Optional
+from typing import Optional
 
 from pypaimon.common.core_options import CoreOptions
 from pypaimon.common.predicate import Predicate
@@ -27,21 +27,18 @@ from pypaimon.read.scanner.full_starting_scanner import FullStartingScanner
 from pypaimon.read.scanner.incremental_starting_scanner import \
     IncrementalStartingScanner
 from pypaimon.read.scanner.starting_scanner import StartingScanner
-from pypaimon.schema.data_types import DataField
 from pypaimon.snapshot.snapshot_manager import SnapshotManager
 
 
 class TableScan:
     """Implementation of TableScan for native Python reading."""
 
-    def __init__(self, table, predicate: Optional[Predicate], limit: Optional[int],
-                 read_type: List[DataField]):
+    def __init__(self, table, predicate: Optional[Predicate], limit: Optional[int]):
         from pypaimon.table.file_store_table import FileStoreTable
 
         self.table: FileStoreTable = table
         self.predicate = predicate
         self.limit = limit
-        self.read_type = read_type
         self.starting_scanner = self._create_starting_scanner()
 
     def plan(self) -> Plan:
@@ -67,10 +64,9 @@ class TableScan:
             if (start_timestamp == end_timestamp or start_timestamp > latest_snapshot.time_millis
                     or end_timestamp < earliest_snapshot.time_millis):
                 return EmptyStartingScanner()
-            return IncrementalStartingScanner.between_timestamps(self.table, self.predicate, self.limit, self.read_type,
-                                                                 start_timestamp,
-                                                                 end_timestamp)
-        return FullStartingScanner(self.table, self.predicate, self.limit, self.read_type)
+            return IncrementalStartingScanner.between_timestamps(self.table, self.predicate, self.limit,
+                                                                 start_timestamp, end_timestamp)
+        return FullStartingScanner(self.table, self.predicate, self.limit)
 
     def with_shard(self, idx_of_this_subtask, number_of_para_subtasks) -> 'TableScan':
         self.starting_scanner.with_shard(idx_of_this_subtask, number_of_para_subtasks)
diff --git a/paimon-python/pypaimon/table/file_store_table.py b/paimon-python/pypaimon/table/file_store_table.py
index 57ccbf83bb..f0186b1657 100644
--- a/paimon-python/pypaimon/table/file_store_table.py
+++ b/paimon-python/pypaimon/table/file_store_table.py
@@ -46,6 +46,7 @@ class FileStoreTable(Table):
 
         self.table_schema = table_schema
         self.fields = table_schema.fields
+        self.field_names = [field.name for field in table_schema.fields]
         self.field_dict = {field.name: field for field in self.fields}
         self.primary_keys = table_schema.primary_keys
         self.partition_keys = table_schema.partition_keys
diff --git a/paimon-python/pypaimon/tests/py36/reader_predicate_test.py b/paimon-python/pypaimon/tests/py36/reader_predicate_test.py
new file mode 100644
index 0000000000..e772205413
--- /dev/null
+++ b/paimon-python/pypaimon/tests/py36/reader_predicate_test.py
@@ -0,0 +1,82 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import os
+import shutil
+import tempfile
+import unittest
+
+import pyarrow as pa
+
+from pypaimon import CatalogFactory
+from pypaimon import Schema
+from pypaimon.read.split import Split
+
+
+class ReaderPredicateTest(unittest.TestCase):
+    @classmethod
+    def setUpClass(cls):
+        cls.tempdir = tempfile.mkdtemp()
+        cls.warehouse = os.path.join(cls.tempdir, 'warehouse')
+        cls.catalog = CatalogFactory.create({
+            'warehouse': cls.warehouse
+        })
+        cls.catalog.create_database('default', False)
+
+        cls.pa_schema = pa.schema([
+            ('a', pa.int64()),
+            ('pt', pa.int64())
+        ])
+        schema = Schema.from_pyarrow_schema(cls.pa_schema, partition_keys=['pt'])
+        cls.catalog.create_table('default.test_reader_predicate', schema, False)
+        cls.table = cls.catalog.get_table('default.test_reader_predicate')
+
+        data1 = pa.Table.from_pydict({
+            'a': [1, 2],
+            'pt': [1001, 1002]}, schema=cls.pa_schema)
+        write_builder = cls.table.new_batch_write_builder()
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        table_write.write_arrow(data1)
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+
+        data2 = pa.Table.from_pydict({
+            'a': [3, 4],
+            'pt': [1003, 1004]}, schema=cls.pa_schema)
+        write_builder = cls.table.new_batch_write_builder()
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        table_write.write_arrow(data2)
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+
+    @classmethod
+    def tearDownClass(cls):
+        shutil.rmtree(cls.tempdir, ignore_errors=True)
+
+    def test_partition_predicate(self):
+        predicate_builder = self.table.new_read_builder().new_predicate_builder()
+        predicate = predicate_builder.equal('pt', 1003)
+        read_builder = self.table.new_read_builder()
+        read_builder.with_filter(predicate)
+        splits: list[Split] = read_builder.new_scan().plan().splits()
+        self.assertEqual(len(splits), 1)
+        self.assertEqual(splits[0].partition.to_dict().get("pt"), 1003)
diff --git a/paimon-python/pypaimon/tests/reader_predicate_test.py b/paimon-python/pypaimon/tests/reader_predicate_test.py
new file mode 100644
index 0000000000..e772205413
--- /dev/null
+++ b/paimon-python/pypaimon/tests/reader_predicate_test.py
@@ -0,0 +1,82 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import os
+import shutil
+import tempfile
+import unittest
+
+import pyarrow as pa
+
+from pypaimon import CatalogFactory
+from pypaimon import Schema
+from pypaimon.read.split import Split
+
+
+class ReaderPredicateTest(unittest.TestCase):
+    @classmethod
+    def setUpClass(cls):
+        cls.tempdir = tempfile.mkdtemp()
+        cls.warehouse = os.path.join(cls.tempdir, 'warehouse')
+        cls.catalog = CatalogFactory.create({
+            'warehouse': cls.warehouse
+        })
+        cls.catalog.create_database('default', False)
+
+        cls.pa_schema = pa.schema([
+            ('a', pa.int64()),
+            ('pt', pa.int64())
+        ])
+        schema = Schema.from_pyarrow_schema(cls.pa_schema, partition_keys=['pt'])
+        cls.catalog.create_table('default.test_reader_predicate', schema, False)
+        cls.table = cls.catalog.get_table('default.test_reader_predicate')
+
+        data1 = pa.Table.from_pydict({
+            'a': [1, 2],
+            'pt': [1001, 1002]}, schema=cls.pa_schema)
+        write_builder = cls.table.new_batch_write_builder()
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        table_write.write_arrow(data1)
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+
+        data2 = pa.Table.from_pydict({
+            'a': [3, 4],
+            'pt': [1003, 1004]}, schema=cls.pa_schema)
+        write_builder = cls.table.new_batch_write_builder()
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        table_write.write_arrow(data2)
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+
+    @classmethod
+    def tearDownClass(cls):
+        shutil.rmtree(cls.tempdir, ignore_errors=True)
+
+    def test_partition_predicate(self):
+        predicate_builder = self.table.new_read_builder().new_predicate_builder()
+        predicate = predicate_builder.equal('pt', 1003)
+        read_builder = self.table.new_read_builder()
+        read_builder.with_filter(predicate)
+        splits: list[Split] = read_builder.new_scan().plan().splits()
+        self.assertEqual(len(splits), 1)
+        self.assertEqual(splits[0].partition.to_dict().get("pt"), 1003)
diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py
index 97804bbf6b..c2fe33105b 100644
--- a/paimon-python/pypaimon/write/file_store_commit.py
+++ b/paimon-python/pypaimon/write/file_store_commit.py
@@ -102,7 +102,7 @@ class FileStoreCommit:
                                        f"in {msg.partition} does not belong to this partition")
 
         commit_entries = []
-        current_entries = FullStartingScanner(self.table, partition_filter, None, []).plan_files()
+        current_entries = FullStartingScanner(self.table, partition_filter, None).plan_files()
         for entry in current_entries:
             entry.kind = 1
             commit_entries.append(entry)

Reply via email to