Copilot commented on code in PR #6390:
URL: https://github.com/apache/paimon/pull/6390#discussion_r2427806643
##########
paimon-python/pypaimon/common/file_io.py:
##########
@@ -364,3 +369,53 @@ def record_generator():
with self.new_output_stream(path) as output_stream:
fastavro.writer(output_stream, avro_schema, records, **kwargs)
+
+ def write_blob(self, path: Path, data: pyarrow.Table, **kwargs):
+ try:
+ # Validate input constraints
+ if data.num_columns != 1:
+ raise ValueError(f"Blob format only supports a single column,
got {data.num_columns} columns")
Review Comment:
The error message should be more descriptive about why the blob format has this
limitation. Consider: `ValueError(f"Blob format only supports a single column as
each file stores one blob field, got {data.num_columns} columns")`
```suggestion
raise ValueError(f"Blob format only supports a single column
as each file stores one blob field, got {data.num_columns} columns")
```
##########
paimon-python/pypaimon/write/blob_format_writer.py:
##########
@@ -0,0 +1,97 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import struct
+import zlib
+from typing import BinaryIO, List
+
+from pypaimon.table.row.blob import BlobData
+from pypaimon.common.delta_varint_compressor import DeltaVarintCompressor
+
+
+class BlobFormatWriter:
+ VERSION = 1
+ MAGIC_NUMBER = 1481511375 # Same as Java implementation
+ BUFFER_SIZE = 4096
+
+ def __init__(self, output_stream: BinaryIO):
+ self.output_stream = output_stream
+ self.lengths: List[int] = []
+ self.position = 0
+
+ def add_element(self, row) -> None:
+ if not hasattr(row, 'values') or len(row.values) != 1:
+ raise ValueError("BlobFormatWriter only supports one field")
+
+ blob_value = row.values[0]
+ if blob_value is None:
+ raise ValueError("BlobFormatWriter only supports non-null blob")
+
+ if not isinstance(blob_value, BlobData):
+ raise ValueError("Field must be BlobData instance")
+
+ previous_pos = self.position
+ crc32 = 0 # Initialize CRC32
+
+ # Write magic number
+ magic_bytes = struct.pack('<I', self.MAGIC_NUMBER) # Little endian
+ crc32 = self._write_with_crc(magic_bytes, crc32)
+
+ # Write blob data
+ blob_data = blob_value.to_data()
+ crc32 = self._write_with_crc(blob_data, crc32)
+
+ # Calculate total length including magic number, data, length, and CRC
+ # +12 for length (8 bytes) and CRC (4 bytes) fields written after the data;
+ # this makes the total calculation correct
+ bin_length = self.position - previous_pos + 12
+ self.lengths.append(bin_length)
Review Comment:
The magic number `12` represents the combined size of length (8 bytes) and
CRC (4 bytes) fields. Consider defining this as a named constant like
`METADATA_SIZE = 12` to improve code clarity and maintainability.
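For illustration only, a minimal sketch of how the constant could be introduced (the names below are hypothetical, not part of the PR):
```python
# Hypothetical constants for the trailing metadata written after each blob:
LENGTH_FIELD_SIZE = 8    # 8-byte length written after the blob data
CRC_FIELD_SIZE = 4       # 4-byte CRC32 written after the length
METADATA_SIZE = LENGTH_FIELD_SIZE + CRC_FIELD_SIZE  # = 12


def record_length(position: int, previous_pos: int) -> int:
    """Length of one record: bytes written so far plus the trailing metadata."""
    return position - previous_pos + METADATA_SIZE
```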
##########
paimon-python/pypaimon/read/reader/format_blob_reader.py:
##########
@@ -0,0 +1,204 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import struct
+from typing import List, Optional, Any, Iterator
+
+import pyarrow as pa
+import pyarrow.dataset as ds
+from pyarrow import RecordBatch
+
+from pypaimon.common.delta_varint_compressor import DeltaVarintCompressor
+from pypaimon.common.file_io import FileIO
+from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
+from pypaimon.schema.data_types import DataField, PyarrowFieldParser
+from pypaimon.table.row.blob import Blob, BlobDescriptor, BlobRef
+from pypaimon.table.row.generic_row import GenericRow
+
+
+class FormatBlobReader(RecordBatchReader):
+
+ def __init__(self, file_io: FileIO, file_path: str, read_fields: List[str], full_fields: List[DataField], push_down_predicate: Any):
+ self._file_io = file_io
+ self._file_path = file_path
+ self._push_down_predicate = push_down_predicate
+
+ # Get file size
+ self._file_size = file_io.get_file_size(file_path)
+
+ # Initialize the low-level blob format reader
+ self.file_path = file_path
+ self.blob_lengths: List[int] = []
+ self.blob_offsets: List[int] = []
+ self.returned = False
+ self._read_index()
+
+ # Set up fields and schema
+ if len(read_fields) > 1:
+ raise RuntimeError("BlobFileFormat only supports one field.")
+ self._fields = read_fields
+ full_fields_map = {field.name: field for field in full_fields}
+ projected_data_fields = [full_fields_map[name] for name in read_fields]
+ self._schema = PyarrowFieldParser.from_paimon_schema(projected_data_fields)
+
+ # Initialize iterator
+ self._blob_iterator = None
+ self._current_batch = None
+
+ def read_arrow_batch(self) -> Optional[RecordBatch]:
+ if self._blob_iterator is None:
+ if self.returned:
+ return None
+ self.returned = True
+ batch_iterator = BlobRecordIterator(self.file_path, self.blob_lengths, self.blob_offsets, self._fields[0])
+ self._blob_iterator = iter(batch_iterator)
+
+ # Collect records for this batch
+ pydict_data = {name: [] for name in self._fields}
+ records_in_batch = 0
+
+ try:
+ while True:
+ # Get next blob record
+ blob_row = next(self._blob_iterator)
+ # Check if first read returns None, stop immediately
+ if blob_row is None:
+ break
+
+ # Extract blob data from the row
+ blob = blob_row.values[0] # Blob files have single blob field
+
+ # Convert blob to appropriate format for each requested field
+ for field_name in self._fields:
+ # For blob files, all fields should contain blob data
+ if isinstance(blob, Blob):
+ blob_data = blob.to_data()
+ else:
+ blob_data = bytes(blob) if blob is not None else None
+ pydict_data[field_name].append(blob_data)
+
+ records_in_batch += 1
+
+ except StopIteration:
+ # Stop immediately when StopIteration occurs
+ pass
+
+ if records_in_batch == 0:
+ return None
+
+ # Create RecordBatch
+ if self._push_down_predicate is None:
+ # Convert to Table first, then to RecordBatch
+ table = pa.Table.from_pydict(pydict_data, self._schema)
+ if table.num_rows > 0:
+ return table.to_batches()[0]
+ else:
+ return None
+ else:
+ # Apply predicate filtering
+ pa_batch = pa.Table.from_pydict(pydict_data, self._schema)
+ dataset = ds.InMemoryDataset(pa_batch)
+ scanner = dataset.scanner(filter=self._push_down_predicate)
+ combine_chunks = scanner.to_table().combine_chunks()
+ if combine_chunks.num_rows > 0:
+ return combine_chunks.to_batches()[0]
+ else:
+ return None
+
+ def close(self):
+ self._blob_iterator = None
+
+ def _read_index(self) -> None:
+ with open(self.file_path, 'rb') as f:
+ f.seek(self._file_size - 5)
+ header = f.read(5)
+
+ if len(header) != 5:
+ raise IOError("Invalid blob file: cannot read header")
+
+ # Parse header
+ index_length = struct.unpack('<I', header[:4])[0] # Little endian
+ version = header[4]
+
+ if version != 1:
+ raise IOError(f"Unsupported blob file version: {version}")
+
+ # Read index data
+ f.seek(self._file_size - 5 - index_length)
+ index_bytes = f.read(index_length)
+
+ if len(index_bytes) != index_length:
+ raise IOError("Invalid blob file: cannot read index")
+
+ # Decompress blob lengths
+ blob_lengths = DeltaVarintCompressor.decompress(index_bytes)
+
+ # Calculate blob offsets
+ blob_offsets = []
+ offset = 0
+ for length in blob_lengths:
+ blob_offsets.append(offset)
+ offset += length
+ self.blob_lengths = blob_lengths
+ self.blob_offsets = blob_offsets
+
+
+class BlobRecordIterator:
+ def __init__(self, file_path: str, blob_lengths: List[int], blob_offsets: List[int], field_name: str):
+ self.file_path = file_path
+ self.field_name = field_name
+ self.blob_lengths = blob_lengths
+ self.blob_offsets = blob_offsets
+ self.current_position = 0
+
+ def __iter__(self) -> Iterator[GenericRow]:
+ return self
+
+ def __next__(self) -> GenericRow:
+ if self.current_position >= len(self.blob_lengths):
+ raise StopIteration
+
+ # Create blob reference for the current blob
+ # Skip magic number (4 bytes) and exclude length (8 bytes) + CRC (4 bytes) = 12 bytes
+ blob_offset = self.blob_offsets[self.current_position] + 4 # Skip magic number
+ blob_length = self.blob_lengths[self.current_position] - 16 # Exclude magic(4) + length(8) + CRC(4)
Review Comment:
The magic numbers `4` and `16` should be defined as named constants.
Consider adding `MAGIC_NUMBER_SIZE = 4` and `METADATA_OVERHEAD = 16` to improve
code readability and reduce the risk of calculation errors.
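For illustration only, a minimal sketch of the suggested constants and how they could be used (names are hypothetical, not part of the PR):
```python
# Hypothetical constants for the per-record framing in a blob file:
MAGIC_NUMBER_SIZE = 4    # 4-byte magic number at the start of each record
LENGTH_FIELD_SIZE = 8    # 8-byte length trailer
CRC_FIELD_SIZE = 4       # 4-byte CRC32 trailer
METADATA_OVERHEAD = MAGIC_NUMBER_SIZE + LENGTH_FIELD_SIZE + CRC_FIELD_SIZE  # = 16


def blob_bounds(record_offset: int, record_length: int) -> tuple:
    """Return (offset, length) of the raw blob payload within one record."""
    blob_offset = record_offset + MAGIC_NUMBER_SIZE   # skip the magic number
    blob_length = record_length - METADATA_OVERHEAD   # drop magic + length + CRC
    return blob_offset, blob_length
```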
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]