Copilot commented on code in PR #6390:
URL: https://github.com/apache/paimon/pull/6390#discussion_r2428057706

##########
paimon-python/pypaimon/write/blob_format_writer.py:
##########
@@ -0,0 +1,108 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import struct
+import zlib
+from typing import BinaryIO, List
+
+from pypaimon.table.row.blob import Blob, BlobData
+from pypaimon.common.delta_varint_compressor import DeltaVarintCompressor
+
+
+class BlobFormatWriter:
+    VERSION = 1
+    MAGIC_NUMBER = 1481511375
+    BUFFER_SIZE = 4096
+    METADATA_SIZE = 12  # 8-byte length + 4-byte CRC
+
+    def __init__(self, output_stream: BinaryIO):
+        self.output_stream = output_stream
+        self.lengths: List[int] = []
+        self.position = 0
+
+    def add_element(self, row) -> None:
+        if not hasattr(row, 'values') or len(row.values) != 1:
+            raise ValueError("BlobFormatWriter only supports one field")
+
+        blob_value = row.values[0]
+        if blob_value is None:
+            raise ValueError("BlobFormatWriter only supports non-null blob")
+
+        # Support both BlobData and BlobRef via Blob interface
+        if not isinstance(blob_value, (BlobData, Blob)):

Review Comment:
   The type check `isinstance(blob_value, (BlobData, Blob))` is redundant since `BlobData` is a subclass of `Blob`. Consider simplifying to `isinstance(blob_value, Blob)` for cleaner code.
   ```suggestion
           if not isinstance(blob_value, Blob):
   ```
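For context on the suggestion above: `isinstance` already matches instances of subclasses against their base class, so the two-element tuple accepts exactly the same values as checking against `Blob` alone. A minimal standalone sketch (the stub classes below merely mirror the names in the diff; they are not the real pypaimon types):

```python
# Stub hierarchy standing in for pypaimon.table.row.blob; illustrative only.
class Blob: ...
class BlobData(Blob): ...   # in-memory blob bytes
class BlobRef(Blob): ...    # reference to externally stored blob bytes

value = BlobData()
# A subclass instance satisfies an isinstance() check against its base class,
# so listing BlobData alongside Blob adds nothing.
assert isinstance(value, Blob)
assert isinstance(value, (BlobData, Blob)) == isinstance(value, Blob)
assert isinstance(BlobRef(), Blob)  # BlobRef is likewise covered via the base class
```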
##########
paimon-python/pypaimon/read/reader/format_blob_reader.py:
##########
@@ -0,0 +1,199 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import struct
+from pathlib import Path
+from typing import List, Optional, Any, Iterator
+
+import pyarrow as pa
+import pyarrow.dataset as ds
+from pyarrow import RecordBatch
+
+from pypaimon.common.delta_varint_compressor import DeltaVarintCompressor
+from pypaimon.common.file_io import FileIO
+from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
+from pypaimon.schema.data_types import DataField, PyarrowFieldParser
+from pypaimon.table.row.blob import Blob, BlobDescriptor, BlobRef
+from pypaimon.table.row.generic_row import GenericRow
+
+
+class FormatBlobReader(RecordBatchReader):
+
+    def __init__(self, file_io: FileIO, file_path: str, read_fields: List[str],
+                 full_fields: List[DataField], push_down_predicate: Any):
+        self._file_io = file_io
+        self._file_path = file_path
+        self._push_down_predicate = push_down_predicate
+
+        # Get file size
+        self._file_size = file_io.get_file_size(file_path)
+
+        # Initialize the low-level blob format reader
+        self.file_path = file_path
+        self.blob_lengths: List[int] = []
+        self.blob_offsets: List[int] = []
+        self.returned = False
+        self._read_index()
+
+        # Set up fields and schema
+        if len(read_fields) > 1:
+            raise RuntimeError("BlobFileFormat only supports one field.")

Review Comment:
   The error message mentions 'BlobFileFormat' but this class is named 'FormatBlobReader'. Consider updating the message to 'FormatBlobReader only supports one field' for consistency.
   ```suggestion
               raise RuntimeError("FormatBlobReader only supports one field.")
   ```

##########
paimon-python/pypaimon/common/delta_varint_compressor.py:
##########
@@ -0,0 +1,127 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import io
+from typing import List
+
+
+class DeltaVarintCompressor:
+
+    @staticmethod
+    def compress(data: List[int]) -> bytes:
+        if not data:
+            return b''
+
+        # Estimate output size (conservative: 5 bytes per varint max)
+        estimated_size = len(data) * 5
+        out = io.BytesIO()
+        out.truncate(estimated_size)  # Pre-allocate buffer

Review Comment:
   The `truncate()` call after creating an empty BytesIO doesn't actually pre-allocate memory in Python. The comment is misleading and the operation is unnecessary. Consider removing this line.
   ```suggestion
   ```
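To substantiate the comment above: in CPython, `truncate()` on an `io.BytesIO` only ever shrinks the buffer; requesting a size larger than the current content neither allocates nor zero-fills anything, so it cannot serve as pre-allocation. A quick self-contained check (not code from the PR):

```python
import io

buf = io.BytesIO()
buf.truncate(4096)             # request a size larger than the (empty) buffer
assert buf.getvalue() == b''   # nothing was reserved or zero-filled
assert buf.tell() == 0         # the write position is unchanged as well

buf.write(b'abc')              # BytesIO grows on demand as bytes are written,
assert buf.getvalue() == b'abc'  # exactly as it would without the truncate() call
```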
##########
paimon-python/pypaimon/common/file_io.py:
##########
@@ -364,3 +369,53 @@ def record_generator():
 
         with self.new_output_stream(path) as output_stream:
             fastavro.writer(output_stream, avro_schema, records, **kwargs)
+
+    def write_blob(self, path: Path, data: pyarrow.Table, **kwargs):
+        try:
+            # Validate input constraints
+            if data.num_columns != 1:
+                raise RuntimeError(f"Blob format only supports a single column, got {data.num_columns} columns")
+            # Check for null values
+            column = data.column(0)
+            if column.null_count > 0:
+                raise RuntimeError("Blob format does not support null values")
+            # Convert PyArrow schema to Paimon DataFields
+            # For blob files, we expect exactly one blob column
+            field = data.schema[0]
+            if pyarrow.types.is_large_binary(field.type):
+                fields = [DataField(0, field.name, AtomicType("BLOB"))]
+            else:
+                # Convert other types as needed
+                paimon_type = PyarrowFieldParser.to_paimon_type(field.type)

Review Comment:
   The `to_paimon_type` method call is missing the required `nullable` parameter. This will cause a TypeError when called with non-large_binary types.
   ```suggestion
                   paimon_type = PyarrowFieldParser.to_paimon_type(field.type, field.nullable)
   ```

##########
paimon-python/pypaimon/table/row/generic_row.py:
##########
@@ -260,9 +270,12 @@ def to_bytes(cls, binary_row: GenericRow) -> bytes:
                 raise ValueError(f"BinaryRow only support AtomicType yet, meet {field.type.__class__}")
 
             type_name = field.type.type.upper()
-            if any(type_name.startswith(p) for p in ['CHAR', 'VARCHAR', 'STRING', 'BINARY', 'VARBINARY', 'BYTES']):
+            if any(type_name.startswith(p) for p in ['CHAR', 'VARCHAR', 'STRING',
+                                                     'BINARY', 'VARBINARY', 'BYTES', 'BLOB']):

Review Comment:
   The condition includes 'BLOB' in the list of prefixes checked with `startswith()`, but BLOB is an exact match. Consider handling BLOB separately or using an exact equality check for better clarity.
   ```suggestion
                                                      'BINARY', 'VARBINARY', 'BYTES']):
   ```
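On the "handle BLOB separately" option mentioned above: the prefix test presumably exists because names like VARCHAR can carry a length parameter (e.g. VARCHAR(255)), while BLOB is a complete type name. One hedged way to express that distinction (the constant and helper below are hypothetical, not pypaimon APIs):

```python
# Illustrative only: VAR_LENGTH_PREFIXES and is_variable_length are made up here.
VAR_LENGTH_PREFIXES = ('CHAR', 'VARCHAR', 'STRING', 'BINARY', 'VARBINARY', 'BYTES')

def is_variable_length(type_name: str) -> bool:
    # Parameterized types such as VARCHAR(255) are matched by prefix;
    # BLOB takes no parameters, so it is matched exactly.
    return type_name == 'BLOB' or type_name.startswith(VAR_LENGTH_PREFIXES)

assert is_variable_length('BLOB')
assert is_variable_length('VARCHAR(255)')
assert not is_variable_length('INT')
```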
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]