zhoulii commented on code in PR #6861:
URL: https://github.com/apache/paimon/pull/6861#discussion_r2642974429

##########
paimon-python/pypaimon/write/partial_column_write.py:
##########
@@ -0,0 +1,187 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import bisect
+from typing import Dict, List, Optional
+
+import pyarrow as pa
+import pyarrow.compute as pc
+
+from pypaimon.snapshot.snapshot import BATCH_COMMIT_IDENTIFIER
+from pypaimon.table.row.generic_row import GenericRow
+from pypaimon.table.special_fields import SpecialFields
+from pypaimon.write.file_store_write import FileStoreWrite
+
+
+class PartialColumnWrite:
+    """
+    Table write for partial column updates (data evolution).
+
+    This writer is designed for adding/updating specific columns in existing tables.
+    Input data should contain _ROW_ID column.
+    """
+
+    FIRST_ROW_ID_COLUMN = '_FIRST_ROW_ID'
+
+    def __init__(self, table, commit_user: str):
+        from pypaimon.table.file_store_table import FileStoreTable
+
+        self.table: FileStoreTable = table
+        self.commit_user = commit_user
+
+        # Load existing first_row_ids and build partition map
+        self.first_row_ids, self.first_row_id_to_partition_map = self._load_existing_files_info()
+
+        # Collect commit messages
+        self.commit_messages = []
+
+    def _load_existing_files_info(self):
+        """Load existing first_row_ids and build partition map for efficient lookup."""
+        first_row_ids = []
+        first_row_id_to_partition_map: Dict[int, GenericRow] = {}
+
+        read_builder = self.table.new_read_builder()
+        scan = read_builder.new_scan()
+        splits = scan.plan().splits()
+
+        for split in splits:
+            for file in split.files:
+                if file.first_row_id is not None:
+                    first_row_ids.append(file.first_row_id)
+                    first_row_id_to_partition_map[file.first_row_id] = split.partition
+
+        return sorted(list(set(first_row_ids))), first_row_id_to_partition_map
+
+    def update_columns(self, data: pa.Table, column_names: List[str]) -> List:
+        """
+        Add or update columns in the table.
+
+        Args:
+            data: Input data containing row_id and columns to update
+            column_names: Names of columns to update (excluding row_id)
+
+        Returns:
+            List of commit messages
+        """
+
+        # Validate column_names is not empty
+        if not column_names:
+            raise ValueError("column_names cannot be empty")
+
+        # Validate input data has row_id column
+        if SpecialFields.ROW_ID.name not in data.column_names:
+            raise ValueError(f"Input data must contain {SpecialFields.ROW_ID.name} column")
+
+        # Validate all update columns exist in the schema
+        for col_name in column_names:
+            if col_name not in self.table.field_names:
+                raise ValueError(f"Column {col_name} not found in table schema")
+
+        # Sort data by _ROW_ID column
+        sorted_data = data.sort_by([(SpecialFields.ROW_ID.name, "ascending")])
+
+        # Calculate first_row_id for each row
+        data_with_first_row_id = self._calculate_first_row_id(sorted_data)
+
+        # Group by first_row_id and write each group
+        self._write_by_first_row_id(data_with_first_row_id, column_names)
+
+        return self.commit_messages
+
+    def _calculate_first_row_id(self, data: pa.Table) -> pa.Table:
+        """Calculate _first_row_id for each row based on _ROW_ID."""
+        row_ids = data[SpecialFields.ROW_ID.name].to_pylist()
+
+        # Calculate first_row_id for each row_id
+        first_row_id_values = []
+        for row_id in row_ids:
+            first_row_id = self._floor_binary_search(self.first_row_ids, row_id)
+            first_row_id_values.append(first_row_id)
+
+        # Add first_row_id column to the table
+        first_row_id_array = pa.array(first_row_id_values, type=pa.int64())
+        return data.append_column(self.FIRST_ROW_ID_COLUMN, first_row_id_array)
+
+    def _floor_binary_search(self, sorted_seq: List[int], value: int) -> int:
+        """Binary search to find the floor value in sorted sequence."""
+        if not sorted_seq:
+            raise ValueError("The input sorted sequence is empty.")
+
+        idx = bisect.bisect_right(sorted_seq, value) - 1
+        if idx < 0:
+            raise ValueError(f"Value {value} is less than the first element in the sorted sequence.")
+
+        return sorted_seq[idx]
+
+    def _write_by_first_row_id(self, data: pa.Table, column_names: List[str]):
+        """Write data grouped by first_row_id."""
+        # Extract unique first_row_id values
+        first_row_id_array = data[self.FIRST_ROW_ID_COLUMN]
+        unique_first_row_ids = pc.unique(first_row_id_array).to_pylist()
+
+        for first_row_id in unique_first_row_ids:
+            # Filter rows for this first_row_id
+            mask = pc.equal(first_row_id_array, first_row_id)
+            group_data = data.filter(mask)
+
+            # Get partition for this first_row_id
+            partition = self._find_partition_by_first_row_id(first_row_id)
+
+            if partition is None:
+                raise ValueError(f"No existing file found for first_row_id {first_row_id}")
+
+            # Write this group
+            self._write_group(partition, first_row_id, group_data, column_names)
+
+    def _find_partition_by_first_row_id(self, first_row_id: int) -> Optional[GenericRow]:
+        """Find the partition for a given first_row_id using pre-built partition map."""
+        return self.first_row_id_to_partition_map.get(first_row_id)
+
+    def _write_group(self, partition: GenericRow, first_row_id: int,

Review Comment:
   Added some row count validations; if the counts don't match, an error will be raised.
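   A minimal sketch of the kind of row count check described above; the helper name, signature, and the source of the expected count are assumptions here, not necessarily how the PR wires it in:

   ```python
   import pyarrow as pa

   # Hypothetical helper: compare the number of rows supplied for a file group
   # against the row count recorded for the existing file, and fail fast on drift.
   def check_group_row_count(group_data: pa.Table, expected_row_count: int) -> None:
       if group_data.num_rows != expected_row_count:
           raise ValueError(
               f"Row count mismatch for file group: expected {expected_row_count} "
               f"rows, got {group_data.num_rows}"
           )
   ```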

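   For reference, `_floor_binary_search` in the diff above maps each `_ROW_ID` to the greatest known `first_row_id` less than or equal to it, i.e. the file group the row belongs to. A standalone sketch of that behavior, with assumed example row ids:

   ```python
   import bisect
   from typing import List

   def floor_search(sorted_seq: List[int], value: int) -> int:
       # Greatest element <= value, mirroring _floor_binary_search above.
       idx = bisect.bisect_right(sorted_seq, value) - 1
       if idx < 0:
           raise ValueError(f"{value} is below the smallest first_row_id")
       return sorted_seq[idx]

   # Assumed example: files begin at row ids 0, 100, and 250, so rows 0-99
   # fall into the first file group, 100-249 into the second, and so on.
   first_row_ids = [0, 100, 250]
   assert floor_search(first_row_ids, 99) == 0
   assert floor_search(first_row_ids, 100) == 100
   assert floor_search(first_row_ids, 300) == 250
   ```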