This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.3
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit e036dec7e24f16c017bd120320cc0a1f6ed1efaf
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Oct 17 22:14:53 2025 +0200

    [python] Make FileStoreWrite.max_seq_numbers lazied (#6418)
---
 paimon-python/pypaimon/write/file_store_write.py | 55 ++++++++++++------------
 1 file changed, 27 insertions(+), 28 deletions(-)

diff --git a/paimon-python/pypaimon/write/file_store_write.py b/paimon-python/pypaimon/write/file_store_write.py
index 35b4a7d980..9ebabf1103 100644
--- a/paimon-python/pypaimon/write/file_store_write.py
+++ b/paimon-python/pypaimon/write/file_store_write.py
@@ -24,6 +24,7 @@ from pypaimon.write.writer.append_only_data_writer import AppendOnlyDataWriter
 from pypaimon.write.writer.data_blob_writer import DataBlobWriter
 from pypaimon.write.writer.data_writer import DataWriter
 from pypaimon.write.writer.key_value_data_writer import KeyValueDataWriter
+from pypaimon.table.bucket_mode import BucketMode
 
 
 class FileStoreWrite:
@@ -34,7 +35,7 @@ class FileStoreWrite:
         self.table: FileStoreTable = table
         self.data_writers: Dict[Tuple, DataWriter] = {}
-        self.max_seq_numbers = self._seq_number_stats()  # TODO: build this on-demand instead of on all
+        self.max_seq_numbers: dict = {}
         self.write_cols = None
 
     def write(self, partition: Tuple, bucket: int, data: pa.RecordBatch):
@@ -45,27 +46,29 @@ class FileStoreWrite:
             writer.write(data)
 
     def _create_data_writer(self, partition: Tuple, bucket: int) -> DataWriter:
+        def max_seq_number():
+            return self._seq_number_stats(partition).get(bucket, 1)
         # Check if table has blob columns
         if self._has_blob_columns():
             return DataBlobWriter(
                 table=self.table,
                 partition=partition,
                 bucket=bucket,
-                max_seq_number=self.max_seq_numbers.get((partition, bucket), 1),
+                max_seq_number=0,
             )
         elif self.table.is_primary_key_table:
             return KeyValueDataWriter(
                 table=self.table,
                 partition=partition,
                 bucket=bucket,
-                max_seq_number=self.max_seq_numbers.get((partition, bucket), 1),
-            )
+                max_seq_number=max_seq_number())
         else:
+            seq_number = 0 if self.table.bucket_mode() == BucketMode.BUCKET_UNAWARE else max_seq_number()
             return AppendOnlyDataWriter(
                 table=self.table,
                 partition=partition,
                 bucket=bucket,
-                max_seq_number=self.max_seq_numbers.get((partition, bucket), 1),
+                max_seq_number=seq_number,
                 write_cols=self.write_cols
             )
 
@@ -99,32 +102,28 @@ class FileStoreWrite:
             writer.close()
         self.data_writers.clear()
 
-    def _seq_number_stats(self) -> dict:
-        from pypaimon.manifest.manifest_file_manager import ManifestFileManager
-        from pypaimon.manifest.manifest_list_manager import ManifestListManager
-        from pypaimon.snapshot.snapshot_manager import SnapshotManager
-
-        snapshot_manager = SnapshotManager(self.table)
-        manifest_list_manager = ManifestListManager(self.table)
-        manifest_file_manager = ManifestFileManager(self.table)
+    def _seq_number_stats(self, partition: Tuple) -> Dict[int, int]:
+        buckets = self.max_seq_numbers.get(partition)
+        if buckets is None:
+            buckets = self._load_seq_number_stats(partition)
+            self.max_seq_numbers[partition] = buckets
+        return buckets
 
-        latest_snapshot = snapshot_manager.get_latest_snapshot()
-        if not latest_snapshot:
-            return {}
-        manifest_files = manifest_list_manager.read_all(latest_snapshot)
+    def _load_seq_number_stats(self, partition: Tuple) -> dict:
+        read_builder = self.table.new_read_builder()
+        predicate_builder = read_builder.new_predicate_builder()
+        sub_predicates = []
+        for key, value in zip(self.table.partition_keys, partition):
+            sub_predicates.append(predicate_builder.equal(key, value))
+        partition_filter = predicate_builder.and_predicates(sub_predicates)
 
-        file_entries = []
-        for manifest_file in manifest_files:
-            manifest_entries = manifest_file_manager.read(manifest_file.file_name)
-            for entry in manifest_entries:
-                if entry.kind == 0:
-                    file_entries.append(entry)
+        scan = read_builder.with_filter(partition_filter).new_scan()
+        splits = scan.plan().splits()
 
         max_seq_numbers = {}
-        for entry in file_entries:
-            partition_key = (tuple(entry.partition.values), entry.bucket)
-            current_seq_num = entry.file.max_sequence_number
-            existing_max = max_seq_numbers.get(partition_key, -1)
+        for split in splits:
+            current_seq_num = max([file.max_sequence_number for file in split.files])
+            existing_max = max_seq_numbers.get(split.bucket, -1)
             if current_seq_num > existing_max:
-                max_seq_numbers[partition_key] = current_seq_num
+                max_seq_numbers[split.bucket] = current_seq_num
         return max_seq_numbers
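
For reference, the shape of the change is a per-partition cache that is only filled the
first time a writer for that partition needs it, instead of scanning all manifests in
__init__. The sketch below is a minimal, standalone illustration of that lazy-cache
pattern; the names (LazySeqNumbers, loader) are hypothetical and not part of the
pypaimon API shown in the diff above.

    from typing import Callable, Dict, Tuple


    class LazySeqNumbers:
        """Per-partition cache of {bucket: max_sequence_number}, loaded on first access."""

        def __init__(self, loader: Callable[[Tuple], Dict[int, int]]):
            # loader scans a single partition and returns its per-bucket max sequence numbers
            self._loader = loader
            self._cache: Dict[Tuple, Dict[int, int]] = {}

        def get(self, partition: Tuple, bucket: int, default: int = 1) -> int:
            buckets = self._cache.get(partition)
            if buckets is None:
                # First request for this partition: load and remember the stats.
                buckets = self._loader(partition)
                self._cache[partition] = buckets
            return buckets.get(bucket, default)


    # Example usage with a stub loader:
    seq_numbers = LazySeqNumbers(lambda partition: {0: 42})
    print(seq_numbers.get(("2025-10-17",), 0))   # loads the partition, prints 42
    print(seq_numbers.get(("2025-10-17",), 1))   # cache hit, prints the default 1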
