This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git

The following commit(s) were added to refs/heads/master by this push:
     new 40bc087e84 [Python] Support data writer for PyPaimon (#5997)
40bc087e84 is described below

commit 40bc087e8491a21158601d736f84bb5ef9f4cec6
Author: ChengHui Chen <27797326+chenghuic...@users.noreply.github.com>
AuthorDate: Thu Jul 31 15:25:28 2025 +0800

    [Python] Support data writer for PyPaimon (#5997)
---
 paimon-python/pypaimon/common/file_io.py            |   4 +
 .../{common/file_io.py => write/__init__.py}        |  36 -----
 .../file_io.py => write/writer/__init__.py}         |  36 -----
 .../writer/append_only_data_writer.py}              |  38 +----
 paimon-python/pypaimon/write/writer/data_writer.py  | 161 +++++++++++++++++++++
 .../pypaimon/write/writer/key_value_data_writer.py  |  80 ++++++++++
 6 files changed, 253 insertions(+), 102 deletions(-)

diff --git a/paimon-python/pypaimon/common/file_io.py b/paimon-python/pypaimon/common/file_io.py
index f2539ca2c0..b58a936af4 100644
--- a/paimon-python/pypaimon/common/file_io.py
+++ b/paimon-python/pypaimon/common/file_io.py
@@ -51,3 +51,7 @@ class FileIO(ABC):
     @abstractmethod
     def new_input_stream(self, path: Path):
         """"""
+
+    @abstractmethod
+    def get_file_size(self, path: Path):
+        """"""
diff --git a/paimon-python/pypaimon/common/file_io.py b/paimon-python/pypaimon/write/__init__.py
similarity index 55%
copy from paimon-python/pypaimon/common/file_io.py
copy to paimon-python/pypaimon/write/__init__.py
index f2539ca2c0..65b48d4d79 100644
--- a/paimon-python/pypaimon/common/file_io.py
+++ b/paimon-python/pypaimon/write/__init__.py
@@ -15,39 +15,3 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
-from abc import ABC, abstractmethod
-from pathlib import Path
-
-
-class FileIO(ABC):
-    @abstractmethod
-    def exists(self, path: Path) -> bool:
-        """"""
-
-    @abstractmethod
-    def read_file_utf8(self, path: Path) -> str:
-        """"""
-
-    @abstractmethod
-    def try_to_write_atomic(self, path: Path, content: str) -> bool:
-        """"""
-
-    @abstractmethod
-    def list_status(self, path: Path):
-        """"""
-
-    @abstractmethod
-    def mkdirs(self, path: Path) -> bool:
-        """"""
-
-    @abstractmethod
-    def write_file(self, path: Path, content: str, overwrite: bool = False):
-        """"""
-
-    @abstractmethod
-    def delete_quietly(self, path: Path):
-        """"""
-
-    @abstractmethod
-    def new_input_stream(self, path: Path):
-        """"""
diff --git a/paimon-python/pypaimon/common/file_io.py b/paimon-python/pypaimon/write/writer/__init__.py
similarity index 55%
copy from paimon-python/pypaimon/common/file_io.py
copy to paimon-python/pypaimon/write/writer/__init__.py
index f2539ca2c0..65b48d4d79 100644
--- a/paimon-python/pypaimon/common/file_io.py
+++ b/paimon-python/pypaimon/write/writer/__init__.py
@@ -15,39 +15,3 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
-from abc import ABC, abstractmethod
-from pathlib import Path
-
-
-class FileIO(ABC):
-    @abstractmethod
-    def exists(self, path: Path) -> bool:
-        """"""
-
-    @abstractmethod
-    def read_file_utf8(self, path: Path) -> str:
-        """"""
-
-    @abstractmethod
-    def try_to_write_atomic(self, path: Path, content: str) -> bool:
-        """"""
-
-    @abstractmethod
-    def list_status(self, path: Path):
-        """"""
-
-    @abstractmethod
-    def mkdirs(self, path: Path) -> bool:
-        """"""
-
-    @abstractmethod
-    def write_file(self, path: Path, content: str, overwrite: bool = False):
-        """"""
-
-    @abstractmethod
-    def delete_quietly(self, path: Path):
-        """"""
-
-    @abstractmethod
-    def new_input_stream(self, path: Path):
-        """"""
diff --git a/paimon-python/pypaimon/common/file_io.py b/paimon-python/pypaimon/write/writer/append_only_data_writer.py
similarity index 55%
copy from paimon-python/pypaimon/common/file_io.py
copy to paimon-python/pypaimon/write/writer/append_only_data_writer.py
index f2539ca2c0..c9d4c8f864 100644
--- a/paimon-python/pypaimon/common/file_io.py
+++ b/paimon-python/pypaimon/write/writer/append_only_data_writer.py
@@ -15,39 +15,17 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
-from abc import ABC, abstractmethod
-from pathlib import Path
+import pyarrow as pa

-class FileIO(ABC):
-    @abstractmethod
-    def exists(self, path: Path) -> bool:
-        """"""
+from pypaimon.write.writer.data_writer import DataWriter

-    @abstractmethod
-    def read_file_utf8(self, path: Path) -> str:
-        """"""

-    @abstractmethod
-    def try_to_write_atomic(self, path: Path, content: str) -> bool:
-        """"""
+class AppendOnlyDataWriter(DataWriter):
+    """Data writer for append-only tables."""

-    @abstractmethod
-    def list_status(self, path: Path):
-        """"""
+    def _process_data(self, data: pa.RecordBatch) -> pa.RecordBatch:
+        return data

-    @abstractmethod
-    def mkdirs(self, path: Path) -> bool:
-        """"""
-
-    @abstractmethod
-    def write_file(self, path: Path, content: str, overwrite: bool = False):
-        """"""
-
-    @abstractmethod
-    def delete_quietly(self, path: Path):
-        """"""
-
-    @abstractmethod
-    def new_input_stream(self, path: Path):
-        """"""
+    def _merge_data(self, existing_data: pa.RecordBatch, new_data: pa.RecordBatch) -> pa.RecordBatch:
+        return pa.concat_tables([existing_data, new_data])
diff --git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py
new file mode 100644
index 0000000000..78de30a459
--- /dev/null
+++ b/paimon-python/pypaimon/write/writer/data_writer.py
@@ -0,0 +1,161 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import uuid
+
+import pyarrow as pa
+from typing import Tuple, Optional, List
+from pathlib import Path
+from abc import ABC, abstractmethod
+
+from pypaimon.api.core_options import CoreOptions
+from pypaimon.manifest.schema.data_file_meta import DataFileMeta
+from pypaimon.table.file_store_table import FileStoreTable
+from pypaimon.table.row.binary_row import BinaryRow
+
+
+class DataWriter(ABC):
+    """Base class for data writers that handle PyArrow tables directly."""
+
+    def __init__(self, table: FileStoreTable, partition: Tuple, bucket: int):
+        self.table = table
+        self.partition = partition
+        self.bucket = bucket
+
+        self.file_io = self.table.file_io
+        self.trimmed_primary_key_fields = self.table.table_schema.get_trimmed_primary_key_fields()
+
+        options = self.table.options
+        self.target_file_size = 256 * 1024 * 1024
+        self.file_format = options.get(CoreOptions.FILE_FORMAT, CoreOptions.FILE_FORMAT_PARQUET)
+        self.compression = options.get(CoreOptions.FILE_COMPRESSION, "zstd")
+
+        self.pending_data: Optional[pa.RecordBatch] = None
+        self.committed_files: List[DataFileMeta] = []
+
+    def write(self, data: pa.RecordBatch):
+        processed_data = self._process_data(data)
+
+        if self.pending_data is None:
+            self.pending_data = processed_data
+        else:
+            self.pending_data = self._merge_data(self.pending_data, processed_data)
+
+        self._check_and_roll_if_needed()
+
+    def prepare_commit(self) -> List[DataFileMeta]:
+        if self.pending_data is not None and self.pending_data.num_rows > 0:
+            self._write_data_to_file(self.pending_data)
+            self.pending_data = None
+
+        return self.committed_files.copy()
+
+    def close(self):
+        self.pending_data = None
+        self.committed_files.clear()
+
+    @abstractmethod
+    def _process_data(self, data: pa.RecordBatch) -> pa.RecordBatch:
+        """Process incoming data (e.g., add system fields, sort). Must be implemented by subclasses."""
+
+    @abstractmethod
+    def _merge_data(self, existing_data: pa.RecordBatch, new_data: pa.RecordBatch) -> pa.RecordBatch:
+        """Merge existing data with new data. Must be implemented by subclasses."""
+
+    def _check_and_roll_if_needed(self):
+        if self.pending_data is None:
+            return
+
+        current_size = self.pending_data.get_total_buffer_size()
+        if current_size > self.target_file_size:
+            split_row = _find_optimal_split_point(self.pending_data, self.target_file_size)
+            if split_row > 0:
+                data_to_write = self.pending_data.slice(0, split_row)
+                remaining_data = self.pending_data.slice(split_row)
+
+                self._write_data_to_file(data_to_write)
+                self.pending_data = remaining_data
+                self._check_and_roll_if_needed()
+
+    def _write_data_to_file(self, data: pa.RecordBatch):
+        if data.num_rows == 0:
+            return
+        file_name = f"data-{uuid.uuid4()}.{self.file_format}"
+        file_path = self._generate_file_path(file_name)
+        try:
+            if self.file_format == CoreOptions.FILE_FORMAT_PARQUET:
+                self.file_io.write_parquet(file_path, data, compression=self.compression)
+            elif self.file_format == CoreOptions.FILE_FORMAT_ORC:
+                self.file_io.write_orc(file_path, data, compression=self.compression)
+            elif self.file_format == CoreOptions.FILE_FORMAT_AVRO:
+                self.file_io.write_avro(file_path, data, compression=self.compression)
+            else:
+                raise ValueError(f"Unsupported file format: {self.file_format}")
+
+            key_columns_batch = data.select(self.trimmed_primary_key_fields)
+            min_key_data = key_columns_batch.slice(0, 1).to_pylist()[0]
+            max_key_data = key_columns_batch.slice(key_columns_batch.num_rows - 1, 1).to_pylist()[0]
+            self.committed_files.append(DataFileMeta(
+                file_name=file_name,
+                file_size=self.file_io.get_file_size(file_path),
+                row_count=data.num_rows,
+                min_key=BinaryRow(min_key_data, self.trimmed_primary_key_fields),
+                max_key=BinaryRow(max_key_data, self.trimmed_primary_key_fields),
+                key_stats=None,  # TODO
+                value_stats=None,
+                min_sequence_number=0,
+                max_sequence_number=0,
+                schema_id=0,
+                level=0,
+                extra_files=None,
+                file_path=str(file_path),
+            ))
+
+        except Exception as e:
+            raise RuntimeError(f"Failed to write {self.file_format} file {file_path}: {e}") from e
+
+    def _generate_file_path(self, file_name: str) -> Path:
+        path_builder = self.table.table_path
+
+        for i, field_name in enumerate(self.table.partition_keys):
+            path_builder = path_builder / (field_name + "=" + self.partition[i])
+        path_builder = path_builder / ("bucket-" + str(self.bucket)) / file_name
+
+        return path_builder
+
+
+def _find_optimal_split_point(data: pa.RecordBatch, target_size: int) -> int:
+    total_rows = data.num_rows
+    if total_rows <= 1:
+        return 0
+
+    left, right = 1, total_rows
+    best_split = 0
+
+    while left <= right:
+        mid = (left + right) // 2
+        slice_data = data.slice(0, mid)
+        slice_size = slice_data.get_total_buffer_size()
+
+        if slice_size <= target_size:
+            best_split = mid
+            left = mid + 1
+        else:
+            right = mid - 1
+
+    return best_split
diff --git a/paimon-python/pypaimon/write/writer/key_value_data_writer.py b/paimon-python/pypaimon/write/writer/key_value_data_writer.py
new file mode 100644
index 0000000000..9b10236e14
--- /dev/null
+++ b/paimon-python/pypaimon/write/writer/key_value_data_writer.py
@@ -0,0 +1,80 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import pyarrow as pa
+import pyarrow.compute as pc
+from typing import Tuple, Dict
+
+from pypaimon.write.writer.data_writer import DataWriter
+
+
+class KeyValueDataWriter(DataWriter):
+    """Data writer for primary key tables with system fields and sorting."""
+
+    def __init__(self, partition: Tuple, bucket: int, file_io, table_schema, table_identifier,
+                 target_file_size: int, options: Dict[str, str]):
+        super().__init__(partition, bucket, file_io, table_schema, table_identifier,
+                         target_file_size, options)
+        self.sequence_generator = SequenceGenerator()
+        self.trimmed_primary_key = [field.name for field in self.table.table_schema.get_trimmed_primary_key_fields()]
+
+    def _process_data(self, data: pa.RecordBatch) -> pa.RecordBatch:
+        enhanced_data = self._add_system_fields(data)
+        return self._sort_by_primary_key(enhanced_data)
+
+    def _merge_data(self, existing_data: pa.RecordBatch, new_data: pa.RecordBatch) -> pa.RecordBatch:
+        combined = pa.concat_tables([existing_data, new_data])
+        return self._sort_by_primary_key(combined)
+
+    def _add_system_fields(self, data: pa.RecordBatch) -> pa.RecordBatch:
+        """Add system fields: _KEY_{pk_key}, _SEQUENCE_NUMBER, _VALUE_KIND."""
+        num_rows = data.num_rows
+        enhanced_table = data
+
+        for pk_key in reversed(self.trimmed_primary_key):
+            if pk_key in data.column_names:
+                key_column = data.column(pk_key)
+                enhanced_table = enhanced_table.add_column(0, f'_KEY_{pk_key}', key_column)
+
+        sequence_column = pa.array([self.sequence_generator.next() for _ in range(num_rows)], type=pa.int64())
+        enhanced_table = enhanced_table.add_column(len(self.trimmed_primary_key), '_SEQUENCE_NUMBER', sequence_column)
+
+        # TODO: support real row kind here
+        value_kind_column = pa.repeat(0, num_rows)
+        enhanced_table = enhanced_table.add_column(len(self.trimmed_primary_key) + 1, '_VALUE_KIND',
+                                                   value_kind_column)
+
+        return enhanced_table
+
+    def _sort_by_primary_key(self, data: pa.RecordBatch) -> pa.RecordBatch:
+        sort_keys = self.trimmed_primary_key
+        if '_SEQUENCE_NUMBER' in data.column_names:
+            sort_keys.append('_SEQUENCE_NUMBER')
+
+        sorted_indices = pc.sort_indices(data, sort_keys=sort_keys)
+        sorted_batch = data.take(sorted_indices)
+        return sorted_batch
+
+
+class SequenceGenerator:
+    def __init__(self, start: int = 0):
+        self.current = start
+
+    def next(self) -> int:
+        self.current += 1
+        return self.current
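
For illustration only, here is a minimal sketch (not part of the commit) of how the new writer is intended to be driven by a higher-level write path: write one or more batches, then prepare_commit to flush and collect file metadata. It assumes an unpartitioned append-only FileStoreTable; `load_table` is a hypothetical helper standing in for whatever catalog or table loading the caller already has, and bucket assignment plus commit handling live outside this diff.

# Hedged usage sketch; `load_table` is hypothetical, not a PyPaimon API from this commit.
import pyarrow as pa

from pypaimon.write.writer.append_only_data_writer import AppendOnlyDataWriter

table = load_table("default.orders")   # assumption: yields a FileStoreTable
writer = AppendOnlyDataWriter(table=table, partition=(), bucket=0)

batch = pa.RecordBatch.from_pydict({"id": [1, 2, 3], "name": ["a", "b", "c"]})
writer.write(batch)                    # buffers data; rolls a new data file once the
                                       # 256 MB target_file_size is exceeded
file_metas = writer.prepare_commit()   # flushes pending data, returns DataFileMeta entries
writer.close()

The key-value writer follows the same write/prepare_commit/close cycle, but its _process_data additionally prepends the _KEY_* columns, appends _SEQUENCE_NUMBER and _VALUE_KIND, and sorts each batch by the trimmed primary key before it is rolled to a file.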