This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch release-1.3 in repository https://gitbox.apache.org/repos/asf/paimon.git
commit 3d1b30f0038e269f40a89fc0dc1649b6a03d7cd3 Author: Jingsong Lee <[email protected]> AuthorDate: Fri Oct 17 22:28:00 2025 +0200 [python] Filter manifest files by partition predicate in scan (#6419) --- paimon-python/pypaimon/common/predicate.py | 25 +++++++ paimon-python/pypaimon/read/push_down_utils.py | 44 ++++++++++++ paimon-python/pypaimon/read/read_builder.py | 3 +- .../pypaimon/read/scanner/full_starting_scanner.py | 23 ++++-- .../read/scanner/incremental_starting_scanner.py | 10 ++- paimon-python/pypaimon/read/table_scan.py | 14 ++-- paimon-python/pypaimon/table/file_store_table.py | 1 + .../pypaimon/tests/py36/reader_predicate_test.py | 82 ++++++++++++++++++++++ .../pypaimon/tests/reader_predicate_test.py | 82 ++++++++++++++++++++++ paimon-python/pypaimon/write/file_store_commit.py | 2 +- 10 files changed, 262 insertions(+), 24 deletions(-) diff --git a/paimon-python/pypaimon/common/predicate.py b/paimon-python/pypaimon/common/predicate.py index c8a4070c6a..4ca8644f6e 100644 --- a/paimon-python/pypaimon/common/predicate.py +++ b/paimon-python/pypaimon/common/predicate.py @@ -24,6 +24,7 @@ import pyarrow from pyarrow import compute as pyarrow_compute from pyarrow import dataset as pyarrow_dataset +from pypaimon.manifest.schema.simple_stats import SimpleStats from pypaimon.table.row.internal_row import InternalRow @@ -34,6 +35,20 @@ class Predicate: field: Optional[str] literals: Optional[List[Any]] = None + def new_index(self, index: int): + return Predicate( + method=self.method, + index=index, + field=self.field, + literals=self.literals) + + def new_literals(self, literals: List[Any]): + return Predicate( + method=self.method, + index=self.index, + field=self.field, + literals=literals) + def test(self, record: InternalRow) -> bool: if self.method == 'equal': return record.get_field(self.index) == self.literals[0] @@ -125,6 +140,16 @@ class Predicate: raise ValueError("Unsupported predicate method: {}".format(self.method)) + def test_by_simple_stats(self, stat: SimpleStats, row_count: int) -> bool: + return self.test_by_stats({ + "min_values": stat.min_values.to_dict(), + "max_values": stat.max_values.to_dict(), + "null_counts": { + stat.min_values.fields[i].name: stat.null_counts[i] for i in range(len(stat.min_values.fields)) + }, + "row_count": row_count, + }) + def test_by_stats(self, stat: Dict) -> bool: if self.method == 'and': return all(p.test_by_stats(stat) for p in self.literals) diff --git a/paimon-python/pypaimon/read/push_down_utils.py b/paimon-python/pypaimon/read/push_down_utils.py index 95a99d9005..64e7c238f8 100644 --- a/paimon-python/pypaimon/read/push_down_utils.py +++ b/paimon-python/pypaimon/read/push_down_utils.py @@ -21,6 +21,50 @@ from typing import Dict, List, Set from pypaimon.common.predicate import Predicate +def to_partition_predicate(input_predicate: 'Predicate', all_fields: List[str], partition_keys: List[str]): + if not input_predicate or not partition_keys: + return None + + predicates: list['Predicate'] = _split_and(input_predicate) + predicates = [element for element in predicates if _get_all_fields(element).issubset(partition_keys)] + new_predicate = Predicate( + method='and', + index=None, + field=None, + literals=predicates + ) + + part_to_index = {element: idx for idx, element in enumerate(partition_keys)} + mapping: Dict[int, int] = { + i: part_to_index.get(all_fields[i], -1) + for i in range(len(all_fields)) + } + + return _change_index(new_predicate, mapping) + + +def _split_and(input_predicate: 'Predicate'): + if not input_predicate: + return list() + + if input_predicate.method == 'and': + return list(input_predicate.literals) + + return [input_predicate] + + +def _change_index(input_predicate: 'Predicate', mapping: Dict[int, int]): + if not input_predicate: + return None + + if input_predicate.method == 'and' or input_predicate.method == 'or': + predicates: list['Predicate'] = input_predicate.literals + new_predicates = [_change_index(element, mapping) for element in predicates] + return input_predicate.new_literals(new_predicates) + + return input_predicate.new_index(mapping[input_predicate.index]) + + def extract_predicate_to_list(result: list, input_predicate: 'Predicate', keys: List[str]): if not input_predicate or not keys: return diff --git a/paimon-python/pypaimon/read/read_builder.py b/paimon-python/pypaimon/read/read_builder.py index 30f824a698..6fc8026c43 100644 --- a/paimon-python/pypaimon/read/read_builder.py +++ b/paimon-python/pypaimon/read/read_builder.py @@ -52,8 +52,7 @@ class ReadBuilder: return TableScan( table=self.table, predicate=self._predicate, - limit=self._limit, - read_type=self.read_type() + limit=self._limit ) def new_read(self) -> TableRead: diff --git a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py index 73915a0412..a363a6fb6f 100644 --- a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py +++ b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py @@ -25,25 +25,25 @@ from pypaimon.manifest.manifest_file_manager import ManifestFileManager from pypaimon.manifest.manifest_list_manager import ManifestListManager from pypaimon.manifest.schema.data_file_meta import DataFileMeta from pypaimon.manifest.schema.manifest_entry import ManifestEntry +from pypaimon.manifest.schema.manifest_file_meta import ManifestFileMeta from pypaimon.read.interval_partition import IntervalPartition, SortedRun from pypaimon.read.plan import Plan from pypaimon.read.push_down_utils import (extract_predicate_to_dict, - extract_predicate_to_list) + extract_predicate_to_list, + to_partition_predicate) from pypaimon.read.scanner.starting_scanner import StartingScanner from pypaimon.read.split import Split -from pypaimon.schema.data_types import DataField from pypaimon.snapshot.snapshot_manager import SnapshotManager from pypaimon.table.bucket_mode import BucketMode class FullStartingScanner(StartingScanner): - def __init__(self, table, predicate: Optional[Predicate], limit: Optional[int], read_type: List[DataField]): + def __init__(self, table, predicate: Optional[Predicate], limit: Optional[int]): from pypaimon.table.file_store_table import FileStoreTable self.table: FileStoreTable = table self.predicate = predicate self.limit = limit - self.read_type = read_type self.snapshot_manager = SnapshotManager(table) self.manifest_list_manager = ManifestListManager(table) @@ -82,15 +82,26 @@ class FullStartingScanner(StartingScanner): splits = self._apply_push_down_limit(splits) return Plan(splits) - def plan_files(self) -> List[ManifestEntry]: + def _read_manifest_files(self) -> List[ManifestFileMeta]: latest_snapshot = self.snapshot_manager.get_latest_snapshot() if not latest_snapshot: return [] manifest_files = self.manifest_list_manager.read_all(latest_snapshot) + partition_predicate = to_partition_predicate(self.predicate, self.table.field_names, self.table.partition_keys) + + def test_predicate(file: ManifestFileMeta) -> bool: + if not partition_predicate: + return True + return partition_predicate.test_by_simple_stats( + file.partition_stats, + file.num_added_files + file.num_deleted_files) + return [file for file in manifest_files if test_predicate(file)] + + def plan_files(self) -> List[ManifestEntry]: + manifest_files = self._read_manifest_files() deleted_entries = set() added_entries = [] - # TODO: filter manifest files by predicate for manifest_file in manifest_files: manifest_entries = self.manifest_file_manager.read(manifest_file.file_name, lambda row: self._bucket_filter(row)) diff --git a/paimon-python/pypaimon/read/scanner/incremental_starting_scanner.py b/paimon-python/pypaimon/read/scanner/incremental_starting_scanner.py index f1a9cd03bc..ead58d260a 100644 --- a/paimon-python/pypaimon/read/scanner/incremental_starting_scanner.py +++ b/paimon-python/pypaimon/read/scanner/incremental_starting_scanner.py @@ -20,14 +20,13 @@ from typing import List, Optional from pypaimon.common.predicate import Predicate from pypaimon.manifest.schema.manifest_entry import ManifestEntry from pypaimon.read.scanner.full_starting_scanner import FullStartingScanner -from pypaimon.schema.data_types import DataField from pypaimon.snapshot.snapshot_manager import SnapshotManager class IncrementalStartingScanner(FullStartingScanner): def __init__(self, table, predicate: Optional[Predicate], limit: Optional[int], - read_type: List[DataField], start: int, end: int): - super().__init__(table, predicate, limit, read_type) + start: int, end: int): + super().__init__(table, predicate, limit) self.startingSnapshotId = start self.endingSnapshotId = end @@ -55,8 +54,7 @@ class IncrementalStartingScanner(FullStartingScanner): @staticmethod def between_timestamps(table, predicate: Optional[Predicate], limit: Optional[int], - read_type: List[DataField], start_timestamp: int, - end_timestamp: int) -> 'IncrementalStartingScanner': + start_timestamp: int, end_timestamp: int) -> 'IncrementalStartingScanner': """ Create an IncrementalStartingScanner for snapshots between two timestamps. """ @@ -74,4 +72,4 @@ class IncrementalStartingScanner(FullStartingScanner): latest_snapshot = snapshot_manager.get_latest_snapshot() end_id = end_snapshot.id if end_snapshot else (latest_snapshot.id if latest_snapshot else -1) - return IncrementalStartingScanner(table, predicate, limit, read_type, start_id, end_id) + return IncrementalStartingScanner(table, predicate, limit, start_id, end_id) diff --git a/paimon-python/pypaimon/read/table_scan.py b/paimon-python/pypaimon/read/table_scan.py index 93a7babb30..22994d4bb2 100644 --- a/paimon-python/pypaimon/read/table_scan.py +++ b/paimon-python/pypaimon/read/table_scan.py @@ -16,7 +16,7 @@ # limitations under the License. ################################################################################ -from typing import List, Optional +from typing import Optional from pypaimon.common.core_options import CoreOptions from pypaimon.common.predicate import Predicate @@ -27,21 +27,18 @@ from pypaimon.read.scanner.full_starting_scanner import FullStartingScanner from pypaimon.read.scanner.incremental_starting_scanner import \ IncrementalStartingScanner from pypaimon.read.scanner.starting_scanner import StartingScanner -from pypaimon.schema.data_types import DataField from pypaimon.snapshot.snapshot_manager import SnapshotManager class TableScan: """Implementation of TableScan for native Python reading.""" - def __init__(self, table, predicate: Optional[Predicate], limit: Optional[int], - read_type: List[DataField]): + def __init__(self, table, predicate: Optional[Predicate], limit: Optional[int]): from pypaimon.table.file_store_table import FileStoreTable self.table: FileStoreTable = table self.predicate = predicate self.limit = limit - self.read_type = read_type self.starting_scanner = self._create_starting_scanner() def plan(self) -> Plan: @@ -67,10 +64,9 @@ class TableScan: if (start_timestamp == end_timestamp or start_timestamp > latest_snapshot.time_millis or end_timestamp < earliest_snapshot.time_millis): return EmptyStartingScanner() - return IncrementalStartingScanner.between_timestamps(self.table, self.predicate, self.limit, self.read_type, - start_timestamp, - end_timestamp) - return FullStartingScanner(self.table, self.predicate, self.limit, self.read_type) + return IncrementalStartingScanner.between_timestamps(self.table, self.predicate, self.limit, + start_timestamp, end_timestamp) + return FullStartingScanner(self.table, self.predicate, self.limit) def with_shard(self, idx_of_this_subtask, number_of_para_subtasks) -> 'TableScan': self.starting_scanner.with_shard(idx_of_this_subtask, number_of_para_subtasks) diff --git a/paimon-python/pypaimon/table/file_store_table.py b/paimon-python/pypaimon/table/file_store_table.py index 57ccbf83bb..f0186b1657 100644 --- a/paimon-python/pypaimon/table/file_store_table.py +++ b/paimon-python/pypaimon/table/file_store_table.py @@ -46,6 +46,7 @@ class FileStoreTable(Table): self.table_schema = table_schema self.fields = table_schema.fields + self.field_names = [field.name for field in table_schema.fields] self.field_dict = {field.name: field for field in self.fields} self.primary_keys = table_schema.primary_keys self.partition_keys = table_schema.partition_keys diff --git a/paimon-python/pypaimon/tests/py36/reader_predicate_test.py b/paimon-python/pypaimon/tests/py36/reader_predicate_test.py new file mode 100644 index 0000000000..e772205413 --- /dev/null +++ b/paimon-python/pypaimon/tests/py36/reader_predicate_test.py @@ -0,0 +1,82 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +import os +import shutil +import tempfile +import unittest + +import pyarrow as pa + +from pypaimon import CatalogFactory +from pypaimon import Schema +from pypaimon.read.split import Split + + +class ReaderPredicateTest(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.tempdir = tempfile.mkdtemp() + cls.warehouse = os.path.join(cls.tempdir, 'warehouse') + cls.catalog = CatalogFactory.create({ + 'warehouse': cls.warehouse + }) + cls.catalog.create_database('default', False) + + cls.pa_schema = pa.schema([ + ('a', pa.int64()), + ('pt', pa.int64()) + ]) + schema = Schema.from_pyarrow_schema(cls.pa_schema, partition_keys=['pt']) + cls.catalog.create_table('default.test_reader_predicate', schema, False) + cls.table = cls.catalog.get_table('default.test_reader_predicate') + + data1 = pa.Table.from_pydict({ + 'a': [1, 2], + 'pt': [1001, 1002]}, schema=cls.pa_schema) + write_builder = cls.table.new_batch_write_builder() + table_write = write_builder.new_write() + table_commit = write_builder.new_commit() + table_write.write_arrow(data1) + table_commit.commit(table_write.prepare_commit()) + table_write.close() + table_commit.close() + + data2 = pa.Table.from_pydict({ + 'a': [3, 4], + 'pt': [1003, 1004]}, schema=cls.pa_schema) + write_builder = cls.table.new_batch_write_builder() + table_write = write_builder.new_write() + table_commit = write_builder.new_commit() + table_write.write_arrow(data2) + table_commit.commit(table_write.prepare_commit()) + table_write.close() + table_commit.close() + + @classmethod + def tearDownClass(cls): + shutil.rmtree(cls.tempdir, ignore_errors=True) + + def test_partition_predicate(self): + predicate_builder = self.table.new_read_builder().new_predicate_builder() + predicate = predicate_builder.equal('pt', 1003) + read_builder = self.table.new_read_builder() + read_builder.with_filter(predicate) + splits: list[Split] = read_builder.new_scan().plan().splits() + self.assertEqual(len(splits), 1) + self.assertEqual(splits[0].partition.to_dict().get("pt"), 1003) diff --git a/paimon-python/pypaimon/tests/reader_predicate_test.py b/paimon-python/pypaimon/tests/reader_predicate_test.py new file mode 100644 index 0000000000..e772205413 --- /dev/null +++ b/paimon-python/pypaimon/tests/reader_predicate_test.py @@ -0,0 +1,82 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +import os +import shutil +import tempfile +import unittest + +import pyarrow as pa + +from pypaimon import CatalogFactory +from pypaimon import Schema +from pypaimon.read.split import Split + + +class ReaderPredicateTest(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.tempdir = tempfile.mkdtemp() + cls.warehouse = os.path.join(cls.tempdir, 'warehouse') + cls.catalog = CatalogFactory.create({ + 'warehouse': cls.warehouse + }) + cls.catalog.create_database('default', False) + + cls.pa_schema = pa.schema([ + ('a', pa.int64()), + ('pt', pa.int64()) + ]) + schema = Schema.from_pyarrow_schema(cls.pa_schema, partition_keys=['pt']) + cls.catalog.create_table('default.test_reader_predicate', schema, False) + cls.table = cls.catalog.get_table('default.test_reader_predicate') + + data1 = pa.Table.from_pydict({ + 'a': [1, 2], + 'pt': [1001, 1002]}, schema=cls.pa_schema) + write_builder = cls.table.new_batch_write_builder() + table_write = write_builder.new_write() + table_commit = write_builder.new_commit() + table_write.write_arrow(data1) + table_commit.commit(table_write.prepare_commit()) + table_write.close() + table_commit.close() + + data2 = pa.Table.from_pydict({ + 'a': [3, 4], + 'pt': [1003, 1004]}, schema=cls.pa_schema) + write_builder = cls.table.new_batch_write_builder() + table_write = write_builder.new_write() + table_commit = write_builder.new_commit() + table_write.write_arrow(data2) + table_commit.commit(table_write.prepare_commit()) + table_write.close() + table_commit.close() + + @classmethod + def tearDownClass(cls): + shutil.rmtree(cls.tempdir, ignore_errors=True) + + def test_partition_predicate(self): + predicate_builder = self.table.new_read_builder().new_predicate_builder() + predicate = predicate_builder.equal('pt', 1003) + read_builder = self.table.new_read_builder() + read_builder.with_filter(predicate) + splits: list[Split] = read_builder.new_scan().plan().splits() + self.assertEqual(len(splits), 1) + self.assertEqual(splits[0].partition.to_dict().get("pt"), 1003) diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py index 97804bbf6b..c2fe33105b 100644 --- a/paimon-python/pypaimon/write/file_store_commit.py +++ b/paimon-python/pypaimon/write/file_store_commit.py @@ -102,7 +102,7 @@ class FileStoreCommit: f"in {msg.partition} does not belong to this partition") commit_entries = [] - current_entries = FullStartingScanner(self.table, partition_filter, None, []).plan_files() + current_entries = FullStartingScanner(self.table, partition_filter, None).plan_files() for entry in current_entries: entry.kind = 1 commit_entries.append(entry)
