kaori-seasons commented on code in PR #6766: URL: https://github.com/apache/paimon/pull/6766#discussion_r2601236460
########## paimon-python/pypaimon/deletionvectors/bitmap_deletion_vector.py: ########## @@ -0,0 +1,165 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from pypaimon.deletionvectors.deletion_vector import DeletionVector +import struct +import zlib +from pyroaring import BitMap + + +class BitmapDeletionVector(DeletionVector): + """ + A DeletionVector based on RoaringBitmap, it only supports files with row count + not exceeding 2147483647 (max value for 32-bit integer). + """ + + MAGIC_NUMBER = 1581511376 + MAGIC_NUMBER_SIZE_BYTES = 4 + MAX_VALUE = 2147483647 + + def __init__(self, bitmap: BitMap = None): + """ + Initialize a BitmapDeletionVector. + + Args: + bitmap: Optional RoaringBitmap instance. If None, creates an empty bitmap. + """ + self._bitmap = bitmap if bitmap is not None else BitMap() + + def delete(self, position: int) -> None: + """ + Marks the row at the specified position as deleted. + + Args: + position: The position of the row to be marked as deleted. + """ + self._check_position(position) + self._bitmap.add(position) + + def is_deleted(self, position: int) -> bool: + """ + Checks if the row at the specified position is deleted. + + Args: + position: The position of the row to check. 
+ + Returns: + True if the row is deleted, False otherwise. + """ + self._check_position(position) + return position in self._bitmap + + def is_empty(self) -> bool: + return len(self._bitmap) == 0 + + def get_cardinality(self) -> int: + """ + Returns the number of distinct integers added to the DeletionVector. + + Returns: + The number of deleted positions. + """ + return len(self._bitmap) + + def merge(self, deletion_vector: DeletionVector) -> None: + """ + Merge another DeletionVector to this current one. + + Args: + deletion_vector: The other DeletionVector to merge. + """ + if isinstance(deletion_vector, BitmapDeletionVector): + self._bitmap |= deletion_vector._bitmap + else: + raise RuntimeError("Only instance with the same class type can be merged.") + + def serialize(self) -> bytes: + """ + Serializes the deletion vector to bytes. + + Returns: + The serialized bytes. + """ + # Serialize the bitmap + bitmap_bytes = self._bitmap.serialize() + + # Create the full data with magic number + magic_bytes = struct.pack('>I', self.MAGIC_NUMBER) + data = magic_bytes + bitmap_bytes + + # Calculate size and checksum + size = len(data) + checksum = self._calculate_checksum(data) + + # Pack: size (4 bytes) + data + checksum (4 bytes) + result = struct.pack('>I', size) + data + struct.pack('>I', checksum) + + return result + + @staticmethod + def deserialize_from_bytes(data: bytes) -> 'BitmapDeletionVector': + """ + Deserializes a BitmapDeletionVector from bytes. + + Args: + data: The serialized bytes (without magic number). + + Returns: + A BitmapDeletionVector instance. + """ + bitmap = BitMap.deserialize(data) + return BitmapDeletionVector(bitmap) + + def bit_map(self): + return self._bitmap + + def _check_position(self, position: int) -> None: + """ + Checks if the position is valid. + + Args: + position: The position to check. + + Raises: + ValueError: If the position exceeds the maximum value. 
+ """ + if position > self.MAX_VALUE: + raise ValueError( + f"The file has too many rows, RoaringBitmap32 only supports files " + f"with row count not exceeding {self.MAX_VALUE}." + ) + + @staticmethod + def _calculate_checksum(data: bytes) -> int: + """ + Calculates CRC32 checksum for the given data. + + Args: + data: The data to calculate checksum for. + + Returns: + The CRC32 checksum as an unsigned 32-bit integer. + """ + return zlib.crc32(data) & 0xffffffff + + def __eq__(self, other): + if not isinstance(other, BitmapDeletionVector): + return False + return self._bitmap == other._bitmap + + def __hash__(self): + return hash(tuple(self._bitmap)) Review Comment: `BitmapDeletionVector.__hash__` returns `hash(tuple(self._bitmap))`. This is inefficient on large bitmaps because it iterates through all set bits, and the hash is rarely used. Consider removing `__hash__` or replacing it with one based on a CRC/hash of the serialized bytes (if a hash is truly needed). ########## paimon-python/pypaimon/deletionvectors/apply_deletion_vector_reader.py: ########## @@ -0,0 +1,129 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +from typing import Optional + +import pyarrow +from pyarrow import RecordBatch + +from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader +from pypaimon.read.reader.iface.record_iterator import RecordIterator +from pypaimon.deletionvectors.deletion_vector import DeletionVector + +from pyroaring import BitMap + +from pypaimon.read.reader.iface.record_reader import RecordReader + + +class ApplyDeletionVectorReader(RecordBatchReader): + """ + A RecordReader which applies DeletionVector to filter records. + """ + + def __init__(self, reader: RecordReader, deletion_vector: DeletionVector): + """ + Initialize an ApplyDeletionVectorReader. + + Args: + reader: The underlying record reader. + deletion_vector: The deletion vector to apply. + """ + self._reader = reader + self._deletion_vector = deletion_vector + + def reader(self) -> RecordReader: + return self._reader + + def deletion_vector(self) -> DeletionVector: + return self._deletion_vector + + def read_arrow_batch(self) -> Optional[RecordBatch]: + self._reader: RecordBatchReader + arrow_batch = self._reader.read_arrow_batch() + if arrow_batch is None: + return None + # Remove the deleted rows from the batch + range_bitmap = BitMap( Review Comment: `apply_deletion_vector_reader.ApplyDeletionVectorReader.read_arrow_batch` uses `BitMap(range(batch_start, batch_end))` and then performs a set difference operation to obtain the non-deleted positions. When the batch is very large (or the number of batches is very large), creating such a large range BitMap consumes a significant amount of memory/CPU. However, the actual deletion bitmap is usually sparse, so iterating through the deletion bitmap should be prioritized. Impact: Performance degradation and OOM risk, especially in large tables or long intervals. 
Suggested fix: Instead of generating a BitMap representing the entire batch, directly filter the positions within the current batch range from the deletion vector's bitmap (which is usually sparse and more efficient). `pyroaring.BitMap` is iterable, so it can be iterated bit by bit and filtered. ########## paimon-python/pypaimon/deletionvectors/apply_deletion_vector_reader.py: ########## @@ -0,0 +1,129 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from typing import Optional + +import pyarrow +from pyarrow import RecordBatch + +from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader +from pypaimon.read.reader.iface.record_iterator import RecordIterator +from pypaimon.deletionvectors.deletion_vector import DeletionVector + +from pyroaring import BitMap + +from pypaimon.read.reader.iface.record_reader import RecordReader + + +class ApplyDeletionVectorReader(RecordBatchReader): + """ + A RecordReader which applies DeletionVector to filter records. + """ + + def __init__(self, reader: RecordReader, deletion_vector: DeletionVector): + """ + Initialize an ApplyDeletionVectorReader. + + Args: + reader: The underlying record reader. + deletion_vector: The deletion vector to apply. 
+ """ + self._reader = reader + self._deletion_vector = deletion_vector + + def reader(self) -> RecordReader: + return self._reader + + def deletion_vector(self) -> DeletionVector: + return self._deletion_vector + + def read_arrow_batch(self) -> Optional[RecordBatch]: + self._reader: RecordBatchReader + arrow_batch = self._reader.read_arrow_batch() + if arrow_batch is None: + return None + # Remove the deleted rows from the batch + range_bitmap = BitMap( + range(self._reader.return_batch_pos() - arrow_batch.num_rows, self._reader.return_batch_pos())) + intersection_bitmap = range_bitmap - self._deletion_vector.bit_map() + added_row_list = [x - (self._reader.return_batch_pos() - arrow_batch.num_rows) for x in + list(intersection_bitmap)] + return arrow_batch.take(pyarrow.array(added_row_list, type=pyarrow.int32())) + + def read_batch(self) -> Optional[RecordIterator]: + """ + Reads one batch with deletion vector applied. + + Returns: + A RecordIterator with deletion filtering, or None if no more data. + """ + batch = self._reader.read_batch() + + if batch is None: + return None + + return ApplyDeletionRecordIterator(batch, self._deletion_vector) + + def close(self): + self._reader.close() + + +class ApplyDeletionRecordIterator(RecordIterator): Review Comment: Problem: The `ApplyDeletionRecordIterator`, `RowPositionRecordIterator`, and `KeyValueWrapIterator` classes are inconsistent in how they return the position (`return_pos`): some classes use the name `returned_position`, while others use `return_pos`. This can easily lead to `TypeError` / `AttributeError` at runtime, and makes it difficult to guarantee that wrappers (such as `ApplyDeletionVectorReader`, `RowPositionReader`) will work correctly in all scenarios. Suggestion: Implement compatibility handling in all wrappers/adapters for cases where the object might be a Python iterator or a custom `RecordIterator`. 
The best practice is to require `RecordIterator` to implement `__next__` (to achieve fundamental consistency), but if it's not possible to modify all implementations at once, use a compatibility layer (checking `hasattr(it, "__next__")` / `hasattr(it, "next")`, etc.) to avoid crashes. Examples: ``` # pypaimon/read/reader/iface/record_iterator.py from abc import ABC, abstractmethod from typing import Generic, Optional, TypeVar, Iterator T = TypeVar("T") class RecordIterator(Iterator[T], Generic[T], ABC): """ Record iterator: must implement Python iterator protocol and provide return_pos(). Implementations should: - implement __next__() returning Optional[T] (or raise StopIteration) - implement return_pos() returning current position index for the last returned element """ def __iter__(self): return self @abstractmethod def __next__(self) -> T: """Return next record or raise StopIteration""" ... # backward-compat helper for code that used next() def next(self) -> Optional[T]: try: return self.__next__() except StopIteration: return None @abstractmethod def return_pos(self) -> int: """Return current position of the iterator (index of the last returned element).""" ... ``` ``` class RowPositionRecordIterator(RecordIterator[tuple]): def __init__(self): self.reused_iterator: Optional[Iterator[tuple]] = None self.pos = -1 def __next__(self) -> tuple: if self.reused_iterator is None: raise StopIteration row_tuple = next(self.reused_iterator, None) if row_tuple is None: raise StopIteration self.pos += 1 return row_tuple def next(self) -> Optional[tuple]: try: return self.__next__() except StopIteration: return None def return_pos(self) -> int: return self.pos ``` Then update all custom implementations (e.g., RowPositionRecordIterator, InternalRowWrapperIterator, etc.): implement `__next__`, and maintain the position counter `self.pos` within `__next__` (the logic of the original `next()` method is moved into the new `__next__`), and retain `next()` as a compatibility alias (as above). 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
