This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 40bc087e84 [Python] Support data writer for PyPaimon (#5997)
40bc087e84 is described below

commit 40bc087e8491a21158601d736f84bb5ef9f4cec6
Author: ChengHui Chen <27797326+chenghuic...@users.noreply.github.com>
AuthorDate: Thu Jul 31 15:25:28 2025 +0800

    [Python] Support data writer for PyPaimon (#5997)
---
 paimon-python/pypaimon/common/file_io.py           |   4 +
 .../{common/file_io.py => write/__init__.py}       |  36 -----
 .../file_io.py => write/writer/__init__.py}        |  36 -----
 .../writer/append_only_data_writer.py}             |  38 +----
 paimon-python/pypaimon/write/writer/data_writer.py | 161 +++++++++++++++++++++
 .../pypaimon/write/writer/key_value_data_writer.py |  80 ++++++++++
 6 files changed, 253 insertions(+), 102 deletions(-)

diff --git a/paimon-python/pypaimon/common/file_io.py b/paimon-python/pypaimon/common/file_io.py
index f2539ca2c0..b58a936af4 100644
--- a/paimon-python/pypaimon/common/file_io.py
+++ b/paimon-python/pypaimon/common/file_io.py
@@ -51,3 +51,7 @@ class FileIO(ABC):
     @abstractmethod
     def new_input_stream(self, path: Path):
         """"""
+
+    @abstractmethod
+    def get_file_size(self, path: Path):
+        """"""
diff --git a/paimon-python/pypaimon/common/file_io.py b/paimon-python/pypaimon/write/__init__.py
similarity index 55%
copy from paimon-python/pypaimon/common/file_io.py
copy to paimon-python/pypaimon/write/__init__.py
index f2539ca2c0..65b48d4d79 100644
--- a/paimon-python/pypaimon/common/file_io.py
+++ b/paimon-python/pypaimon/write/__init__.py
@@ -15,39 +15,3 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
-from abc import ABC, abstractmethod
-from pathlib import Path
-
-
-class FileIO(ABC):
-    @abstractmethod
-    def exists(self, path: Path) -> bool:
-        """"""
-
-    @abstractmethod
-    def read_file_utf8(self, path: Path) -> str:
-        """"""
-
-    @abstractmethod
-    def try_to_write_atomic(self, path: Path, content: str) -> bool:
-        """"""
-
-    @abstractmethod
-    def list_status(self, path: Path):
-        """"""
-
-    @abstractmethod
-    def mkdirs(self, path: Path) -> bool:
-        """"""
-
-    @abstractmethod
-    def write_file(self, path: Path, content: str, overwrite: bool = False):
-        """"""
-
-    @abstractmethod
-    def delete_quietly(self, path: Path):
-        """"""
-
-    @abstractmethod
-    def new_input_stream(self, path: Path):
-        """"""
diff --git a/paimon-python/pypaimon/common/file_io.py b/paimon-python/pypaimon/write/writer/__init__.py
similarity index 55%
copy from paimon-python/pypaimon/common/file_io.py
copy to paimon-python/pypaimon/write/writer/__init__.py
index f2539ca2c0..65b48d4d79 100644
--- a/paimon-python/pypaimon/common/file_io.py
+++ b/paimon-python/pypaimon/write/writer/__init__.py
@@ -15,39 +15,3 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
-from abc import ABC, abstractmethod
-from pathlib import Path
-
-
-class FileIO(ABC):
-    @abstractmethod
-    def exists(self, path: Path) -> bool:
-        """"""
-
-    @abstractmethod
-    def read_file_utf8(self, path: Path) -> str:
-        """"""
-
-    @abstractmethod
-    def try_to_write_atomic(self, path: Path, content: str) -> bool:
-        """"""
-
-    @abstractmethod
-    def list_status(self, path: Path):
-        """"""
-
-    @abstractmethod
-    def mkdirs(self, path: Path) -> bool:
-        """"""
-
-    @abstractmethod
-    def write_file(self, path: Path, content: str, overwrite: bool = False):
-        """"""
-
-    @abstractmethod
-    def delete_quietly(self, path: Path):
-        """"""
-
-    @abstractmethod
-    def new_input_stream(self, path: Path):
-        """"""
diff --git a/paimon-python/pypaimon/common/file_io.py b/paimon-python/pypaimon/write/writer/append_only_data_writer.py
similarity index 55%
copy from paimon-python/pypaimon/common/file_io.py
copy to paimon-python/pypaimon/write/writer/append_only_data_writer.py
index f2539ca2c0..c9d4c8f864 100644
--- a/paimon-python/pypaimon/common/file_io.py
+++ b/paimon-python/pypaimon/write/writer/append_only_data_writer.py
@@ -15,39 +15,17 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
-from abc import ABC, abstractmethod
-from pathlib import Path
 
+import pyarrow as pa
 
-class FileIO(ABC):
-    @abstractmethod
-    def exists(self, path: Path) -> bool:
-        """"""
+from pypaimon.write.writer.data_writer import DataWriter
 
-    @abstractmethod
-    def read_file_utf8(self, path: Path) -> str:
-        """"""
 
-    @abstractmethod
-    def try_to_write_atomic(self, path: Path, content: str) -> bool:
-        """"""
+class AppendOnlyDataWriter(DataWriter):
+    """Data writer for append-only tables."""
 
-    @abstractmethod
-    def list_status(self, path: Path):
-        """"""
+    def _process_data(self, data: pa.RecordBatch) -> pa.RecordBatch:
+        return data
 
-    @abstractmethod
-    def mkdirs(self, path: Path) -> bool:
-        """"""
-
-    @abstractmethod
-    def write_file(self, path: Path, content: str, overwrite: bool = False):
-        """"""
-
-    @abstractmethod
-    def delete_quietly(self, path: Path):
-        """"""
-
-    @abstractmethod
-    def new_input_stream(self, path: Path):
-        """"""
+    def _merge_data(self, existing_data: pa.RecordBatch, new_data: pa.RecordBatch) -> pa.RecordBatch:
+        return pa.concat_tables([existing_data, new_data])
diff --git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py
new file mode 100644
index 0000000000..78de30a459
--- /dev/null
+++ b/paimon-python/pypaimon/write/writer/data_writer.py
@@ -0,0 +1,161 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import uuid
+
+import pyarrow as pa
+from typing import Tuple, Optional, List
+from pathlib import Path
+from abc import ABC, abstractmethod
+
+from pypaimon.api.core_options import CoreOptions
+from pypaimon.manifest.schema.data_file_meta import DataFileMeta
+from pypaimon.table.file_store_table import FileStoreTable
+from pypaimon.table.row.binary_row import BinaryRow
+
+
+class DataWriter(ABC):
+    """Base class for data writers that handle PyArrow tables directly."""
+
+    def __init__(self, table: FileStoreTable, partition: Tuple, bucket: int):
+        self.table = table
+        self.partition = partition
+        self.bucket = bucket
+
+        self.file_io = self.table.file_io
+        self.trimmed_primary_key_fields = self.table.table_schema.get_trimmed_primary_key_fields()
+
+        options = self.table.options
+        self.target_file_size = 256 * 1024 * 1024
+        self.file_format = options.get(CoreOptions.FILE_FORMAT, CoreOptions.FILE_FORMAT_PARQUET)
+        self.compression = options.get(CoreOptions.FILE_COMPRESSION, "zstd")
+
+        self.pending_data: Optional[pa.RecordBatch] = None
+        self.committed_files: List[DataFileMeta] = []
+
+    def write(self, data: pa.RecordBatch):
+        processed_data = self._process_data(data)
+
+        if self.pending_data is None:
+            self.pending_data = processed_data
+        else:
+            self.pending_data = self._merge_data(self.pending_data, processed_data)
+
+        self._check_and_roll_if_needed()
+
+    def prepare_commit(self) -> List[DataFileMeta]:
+        if self.pending_data is not None and self.pending_data.num_rows > 0:
+            self._write_data_to_file(self.pending_data)
+            self.pending_data = None
+
+        return self.committed_files.copy()
+
+    def close(self):
+        self.pending_data = None
+        self.committed_files.clear()
+
+    @abstractmethod
+    def _process_data(self, data: pa.RecordBatch) -> pa.RecordBatch:
+        """Process incoming data (e.g., add system fields, sort). Must be implemented by subclasses."""
+
+    @abstractmethod
+    def _merge_data(self, existing_data: pa.RecordBatch, new_data: pa.RecordBatch) -> pa.RecordBatch:
+        """Merge existing data with new data. Must be implemented by subclasses."""
+
+    def _check_and_roll_if_needed(self):
+        if self.pending_data is None:
+            return
+
+        current_size = self.pending_data.get_total_buffer_size()
+        if current_size > self.target_file_size:
+            split_row = _find_optimal_split_point(self.pending_data, self.target_file_size)
+            if split_row > 0:
+                data_to_write = self.pending_data.slice(0, split_row)
+                remaining_data = self.pending_data.slice(split_row)
+
+                self._write_data_to_file(data_to_write)
+                self.pending_data = remaining_data
+                self._check_and_roll_if_needed()
+
+    def _write_data_to_file(self, data: pa.RecordBatch):
+        if data.num_rows == 0:
+            return
+        file_name = f"data-{uuid.uuid4()}.{self.file_format}"
+        file_path = self._generate_file_path(file_name)
+        try:
+            if self.file_format == CoreOptions.FILE_FORMAT_PARQUET:
+                self.file_io.write_parquet(file_path, data, compression=self.compression)
+            elif self.file_format == CoreOptions.FILE_FORMAT_ORC:
+                self.file_io.write_orc(file_path, data, compression=self.compression)
+            elif self.file_format == CoreOptions.FILE_FORMAT_AVRO:
+                self.file_io.write_avro(file_path, data, compression=self.compression)
+            else:
+                raise ValueError(f"Unsupported file format: {self.file_format}")
+
+            key_columns_batch = data.select(self.trimmed_primary_key_fields)
+            min_key_data = key_columns_batch.slice(0, 1).to_pylist()[0]
+            max_key_data = key_columns_batch.slice(key_columns_batch.num_rows - 1, 1).to_pylist()[0]
+            self.committed_files.append(DataFileMeta(
+                file_name=file_name,
+                file_size=self.file_io.get_file_size(file_path),
+                row_count=data.num_rows,
+                min_key=BinaryRow(min_key_data, self.trimmed_primary_key_fields),
+                max_key=BinaryRow(max_key_data, self.trimmed_primary_key_fields),
+                key_stats=None,  # TODO
+                value_stats=None,
+                min_sequence_number=0,
+                max_sequence_number=0,
+                schema_id=0,
+                level=0,
+                extra_files=None,
+                file_path=str(file_path),
+            ))
+
+        except Exception as e:
+            raise RuntimeError(f"Failed to write {self.file_format} file {file_path}: {e}") from e
+
+    def _generate_file_path(self, file_name: str) -> Path:
+        path_builder = self.table.table_path
+
+        for i, field_name in enumerate(self.table.partition_keys):
+            path_builder = path_builder / (field_name + "=" + self.partition[i])
+        path_builder = path_builder / ("bucket-" + str(self.bucket)) / file_name
+
+        return path_builder
+
+
+def _find_optimal_split_point(data: pa.RecordBatch, target_size: int) -> int:
+    total_rows = data.num_rows
+    if total_rows <= 1:
+        return 0
+
+    left, right = 1, total_rows
+    best_split = 0
+
+    while left <= right:
+        mid = (left + right) // 2
+        slice_data = data.slice(0, mid)
+        slice_size = slice_data.get_total_buffer_size()
+
+        if slice_size <= target_size:
+            best_split = mid
+            left = mid + 1
+        else:
+            right = mid - 1
+
+    return best_split
diff --git a/paimon-python/pypaimon/write/writer/key_value_data_writer.py b/paimon-python/pypaimon/write/writer/key_value_data_writer.py
new file mode 100644
index 0000000000..9b10236e14
--- /dev/null
+++ b/paimon-python/pypaimon/write/writer/key_value_data_writer.py
@@ -0,0 +1,80 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import pyarrow as pa
+import pyarrow.compute as pc
+from typing import Tuple, Dict
+
+from pypaimon.write.writer.data_writer import DataWriter
+
+
+class KeyValueDataWriter(DataWriter):
+    """Data writer for primary key tables with system fields and sorting."""
+
+    def __init__(self, partition: Tuple, bucket: int, file_io, table_schema, table_identifier,
+                 target_file_size: int, options: Dict[str, str]):
+        super().__init__(partition, bucket, file_io, table_schema, table_identifier,
+                         target_file_size, options)
+        self.sequence_generator = SequenceGenerator()
+        self.trimmed_primary_key = [field.name for field in self.table.table_schema.get_trimmed_primary_key_fields()]
+
+    def _process_data(self, data: pa.RecordBatch) -> pa.RecordBatch:
+        enhanced_data = self._add_system_fields(data)
+        return self._sort_by_primary_key(enhanced_data)
+
+    def _merge_data(self, existing_data: pa.RecordBatch, new_data: pa.RecordBatch) -> pa.RecordBatch:
+        combined = pa.concat_tables([existing_data, new_data])
+        return self._sort_by_primary_key(combined)
+
+    def _add_system_fields(self, data: pa.RecordBatch) -> pa.RecordBatch:
+        """Add system fields: _KEY_{pk_key}, _SEQUENCE_NUMBER, _VALUE_KIND."""
+        num_rows = data.num_rows
+        enhanced_table = data
+
+        for pk_key in reversed(self.trimmed_primary_key):
+            if pk_key in data.column_names:
+                key_column = data.column(pk_key)
+                enhanced_table = enhanced_table.add_column(0, f'_KEY_{pk_key}', key_column)
+
+        sequence_column = pa.array([self.sequence_generator.next() for _ in range(num_rows)], type=pa.int64())
+        enhanced_table = enhanced_table.add_column(len(self.trimmed_primary_key), '_SEQUENCE_NUMBER', sequence_column)
+
+        # TODO: support real row kind here
+        value_kind_column = pa.repeat(0, num_rows)
+        enhanced_table = enhanced_table.add_column(len(self.trimmed_primary_key) + 1, '_VALUE_KIND',
+                                                   value_kind_column)
+
+        return enhanced_table
+
+    def _sort_by_primary_key(self, data: pa.RecordBatch) -> pa.RecordBatch:
+        sort_keys = self.trimmed_primary_key
+        if '_SEQUENCE_NUMBER' in data.column_names:
+            sort_keys.append('_SEQUENCE_NUMBER')
+
+        sorted_indices = pc.sort_indices(data, sort_keys=sort_keys)
+        sorted_batch = data.take(sorted_indices)
+        return sorted_batch
+
+
+class SequenceGenerator:
+    def __init__(self, start: int = 0):
+        self.current = start
+
+    def next(self) -> int:
+        self.current += 1
+        return self.current

Reply via email to