This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.3
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit 4c1408e939bf4bb17a33ddd5710584d1ab6b9dfb
Author: ChengHui Chen <[email protected]>
AuthorDate: Wed Oct 22 15:43:22 2025 +0800

    [Python] parallel read manifest entries (#6451)
---
 paimon-python/pypaimon/common/core_options.py | 1 +
 .../pypaimon/manifest/manifest_file_manager.py | 39 +++++++++++++++++++-
 .../pypaimon/read/scanner/full_starting_scanner.py | 41 +++++++---------------
 .../read/scanner/incremental_starting_scanner.py | 8 ++++-
 4 files changed, 59 insertions(+), 30 deletions(-)

diff --git a/paimon-python/pypaimon/common/core_options.py b/paimon-python/pypaimon/common/core_options.py
index cbf35b33e4..be7eff98d2 100644
--- a/paimon-python/pypaimon/common/core_options.py
+++ b/paimon-python/pypaimon/common/core_options.py
@@ -33,6 +33,7 @@ class CoreOptions(str, Enum):
     BUCKET = "bucket"
     BUCKET_KEY = "bucket-key"
     WAREHOUSE = "warehouse"
+    MANIFEST_READ_THREADS = "manifest_read_threads"
     # File format options
     FILE_FORMAT = "file.format"
     FILE_FORMAT_ORC = "orc"
diff --git a/paimon-python/pypaimon/manifest/manifest_file_manager.py b/paimon-python/pypaimon/manifest/manifest_file_manager.py
index 9fc92a4113..98180e7b41 100644
--- a/paimon-python/pypaimon/manifest/manifest_file_manager.py
+++ b/paimon-python/pypaimon/manifest/manifest_file_manager.py
@@ -15,14 +15,16 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
################################################################################ +from concurrent.futures import ThreadPoolExecutor from io import BytesIO -from typing import List +from typing import List, Tuple, Set import fastavro from pypaimon.manifest.schema.data_file_meta import DataFileMeta from pypaimon.manifest.schema.manifest_entry import (MANIFEST_ENTRY_SCHEMA, ManifestEntry) +from pypaimon.manifest.schema.manifest_file_meta import ManifestFileMeta from pypaimon.manifest.schema.simple_stats import SimpleStats from pypaimon.table.row.generic_row import (GenericRowDeserializer, GenericRowSerializer) @@ -42,6 +44,41 @@ class ManifestFileManager: self.primary_keys_fields = self.table.primary_keys_fields self.trimmed_primary_keys_fields = self.table.trimmed_primary_keys_fields + def read_entries_parallel(self, manifest_files: List[ManifestFileMeta], manifest_file_filter=None, + manifest_entry_filter=None, drop_stats=True, max_workers=8) -> List[ManifestEntry]: + + def _process_single_manifest(manifest_file: ManifestFileMeta) -> Tuple[List[ManifestEntry], Set[tuple]]: + local_added = [] + local_deleted_keys = set() + if manifest_file_filter and not manifest_file_filter(manifest_file): + return local_added, local_deleted_keys + manifest_entries = self.read(manifest_file.file_name, manifest_entry_filter, drop_stats) + for entry in manifest_entries: + if entry.kind == 0: + local_added.append(entry) + else: + key = (tuple(entry.partition.values), entry.bucket, entry.file.file_name) + local_deleted_keys.add(key) + local_final_added = [ + entry for entry in local_added + if (tuple(entry.partition.values), entry.bucket, entry.file.file_name) not in local_deleted_keys + ] + return local_final_added, local_deleted_keys + + deleted_entry_keys = set() + added_entries = [] + with ThreadPoolExecutor(max_workers=max_workers) as executor: + future_results = executor.map(_process_single_manifest, manifest_files) + for added, deleted_keys in future_results: + 
added_entries.extend(added) + deleted_entry_keys.update(deleted_keys) + + final_entries = [ + entry for entry in added_entries + if (tuple(entry.partition.values), entry.bucket, entry.file.file_name) not in deleted_entry_keys + ] + return final_entries + def read(self, manifest_file_name: str, manifest_entry_filter=None, drop_stats=True) -> List[ManifestEntry]: manifest_file_path = self.manifest_path / manifest_file_name diff --git a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py index dc1b178135..1b21619309 100644 --- a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py +++ b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py @@ -15,6 +15,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ +import os from collections import defaultdict from typing import Callable, List, Optional @@ -95,35 +96,12 @@ class FullStartingScanner(StartingScanner): if not latest_snapshot: return [] manifest_files = self.manifest_list_manager.read_all(latest_snapshot) - return self.read_manifest_entries(manifest_files) - def read_manifest_entries(self, manifest_files: List[ManifestFileMeta]) -> List[ManifestEntry]: - def filter_manifest_file(file: ManifestFileMeta) -> bool: - if not self.partition_key_predicate: - return True - return self.partition_key_predicate.test_by_simple_stats( - file.partition_stats, - file.num_added_files + file.num_deleted_files) - - deleted_entries = set() - added_entries = [] - for manifest_file in manifest_files: - if not filter_manifest_file(manifest_file): - continue - manifest_entries = self.manifest_file_manager.read( - manifest_file.file_name, - lambda row: self._filter_manifest_entry(row)) - for entry in manifest_entries: - if entry.kind == 0: - added_entries.append(entry) - else: - 
deleted_entries.add((tuple(entry.partition.values), entry.bucket, entry.file.file_name)) - - file_entries = [ - entry for entry in added_entries - if (tuple(entry.partition.values), entry.bucket, entry.file.file_name) not in deleted_entries - ] - return file_entries + max_workers = int(self.table.options.get(CoreOptions.MANIFEST_READ_THREADS, (os.cpu_count() or 4) * 2)) + return self.manifest_file_manager.read_entries_parallel(manifest_files, + self._filter_manifest_file, + self._filter_manifest_entry, + max_workers=max_workers) def with_shard(self, idx_of_this_subtask, number_of_para_subtasks) -> 'FullStartingScanner': if idx_of_this_subtask >= number_of_para_subtasks: @@ -224,6 +202,13 @@ class FullStartingScanner(StartingScanner): return limited_splits + def _filter_manifest_file(self, file: ManifestFileMeta) -> bool: + if not self.partition_key_predicate: + return True + return self.partition_key_predicate.test_by_simple_stats( + file.partition_stats, + file.num_added_files + file.num_deleted_files) + def _filter_manifest_entry(self, entry: ManifestEntry) -> bool: if self.only_read_real_buckets and entry.bucket < 0: return False diff --git a/paimon-python/pypaimon/read/scanner/incremental_starting_scanner.py b/paimon-python/pypaimon/read/scanner/incremental_starting_scanner.py index 0139dafc40..0aa1274b7e 100644 --- a/paimon-python/pypaimon/read/scanner/incremental_starting_scanner.py +++ b/paimon-python/pypaimon/read/scanner/incremental_starting_scanner.py @@ -15,8 +15,10 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. 
""" +import os from typing import List, Optional +from pypaimon.common.core_options import CoreOptions from pypaimon.common.predicate import Predicate from pypaimon.manifest.schema.manifest_entry import ManifestEntry from pypaimon.read.scanner.full_starting_scanner import FullStartingScanner @@ -43,7 +45,11 @@ class IncrementalStartingScanner(FullStartingScanner): for snapshot in snapshots_in_range: # Get manifest files for this snapshot manifest_files = self.manifest_list_manager.read_delta(snapshot) - file_entries.extend(self.read_manifest_entries(manifest_files)) + max_workers = int(self.table.options.get(CoreOptions.MANIFEST_READ_THREADS, (os.cpu_count() or 4) * 2)) + file_entries.extend(self.manifest_file_manager.read_entries_parallel(manifest_files, + self._filter_manifest_file, + self._filter_manifest_entry, + max_workers=max_workers)) return file_entries @staticmethod
