amogh-jahagirdar commented on code in PR #3478:
URL: https://github.com/apache/iceberg-python/pull/3478#discussion_r3408955205
##########
pyiceberg/table/puffin.py:
##########
@@ -30,6 +32,12 @@
EMPTY_BITMAP = FrozenBitMap()
MAX_JAVA_SIGNED = int(math.pow(2, 31)) - 1
PROPERTY_REFERENCED_DATA_FILE = "referenced-data-file"
+_DV_BLOB_LENGTH = struct.Struct(">I")
+_DV_BLOB_MAGIC = struct.Struct("<I")
+_DV_BLOB_CRC = struct.Struct(">I")
+_DV_BLOB_MAGIC_NUMBER = 1681511377
+_ROARING_BITMAP_COUNT_SIZE_BYTES = 8
+_DV_BLOB_MIN_SIZE_BYTES = _DV_BLOB_LENGTH.size + _DV_BLOB_MAGIC.size + _ROARING_BITMAP_COUNT_SIZE_BYTES + _DV_BLOB_CRC.size
Review Comment:
I think we'd want to decouple the DV-specific parts into a separate module,
`table/deletion_vector.py`? That module would expose something like
`read_deletion_vector(io, file) -> [bytes]` that handles deserialization
and delegates to the io to do the range read.
##########
pyiceberg/io/pyarrow.py:
##########
@@ -1139,6 +1162,22 @@ def _read_deletes(io: FileIO, data_file: DataFile) -> dict[str, pa.ChunkedArray]
         }
     elif data_file.file_format == FileFormat.PUFFIN:
         with io.new_input(data_file.file_path).open() as fi:
+            content_offset = getattr(data_file, "content_offset", None)
+            content_size_in_bytes = getattr(data_file, "content_size_in_bytes", None)
+            if content_offset is not None or content_size_in_bytes is not None:
+                # A DV is declared as PUFFIN in the manifest, but the content range points directly
+                # to the serialized bitmap blob, so avoid parsing the entire file as a Puffin file.
+                content_offset, content_size_in_bytes, referenced_data_file = _validate_deletion_vector(data_file)
+
+                fi.seek(content_offset)
+                payload = fi.read(content_size_in_bytes)
+                if len(payload) != content_size_in_bytes:
+                    raise ValueError(
+                        f"Could not read deletion vector, expected {content_size_in_bytes} bytes, got {len(payload)}"
+                    )
+                bitmaps = _deserialize_dv_blob(payload, data_file.record_count)
+                return {referenced_data_file: _bitmaps_to_chunked_array(bitmaps)}
Review Comment:
Same as below: I think if we abstract away a `read_deletion_vector(io,
delete_file) -> bytes` API, it'll be a bit cleaner. That function would take care
of all the offset handling, IO, deserialization, validation, etc.
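A minimal sketch of the range-read piece of that API, assuming only a seekable binary stream like the one `io.new_input(...).open()` returns in the diff; `read_exact_range` is a hypothetical helper name, not an existing pyiceberg function. It loops on `read` because on some streams a single call can return fewer bytes than requested before end of file, and it raises the same kind of error as the inline check in the diff.

```python
import io
from typing import BinaryIO


def read_exact_range(stream: BinaryIO, offset: int, length: int) -> bytes:
    """Read exactly `length` bytes at `offset` from a seekable stream, or raise.

    Hypothetical helper mirroring the seek/read/length check in the diff, so the
    whole Puffin file never has to be read or parsed.
    """
    if offset < 0 or length < 0:
        raise ValueError(f"Invalid deletion vector range: offset={offset}, length={length}")
    stream.seek(offset)
    chunks = []
    remaining = length
    # Loop because read() may return fewer bytes than requested before EOF.
    while remaining > 0:
        chunk = stream.read(remaining)
        if not chunk:
            break  # EOF reached before the declared range ended
        chunks.append(chunk)
        remaining -= len(chunk)
    payload = b"".join(chunks)
    if len(payload) != length:
        raise ValueError(f"Could not read deletion vector, expected {length} bytes, got {len(payload)}")
    return payload


# Usage: an in-memory stand-in for an opened Puffin file whose DV blob sits at bytes 4..11.
puffin_like = io.BytesIO(b"PFA1" + b"\x01" * 8 + b"tail")
blob = read_exact_range(puffin_like, offset=4, length=8)
```

The returned bytes would then go to whatever blob deserializer the module exposes, keeping `pyarrow.py` free of offset arithmetic.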
##########
pyiceberg/io/pyarrow.py:
##########
@@ -1139,6 +1162,22 @@ def _read_deletes(io: FileIO, data_file: DataFile) -> dict[str, pa.ChunkedArray]
         }
     elif data_file.file_format == FileFormat.PUFFIN:
         with io.new_input(data_file.file_path).open() as fi:
+            content_offset = getattr(data_file, "content_offset", None)
+            content_size_in_bytes = getattr(data_file, "content_size_in_bytes", None)
+            if content_offset is not None or content_size_in_bytes is not None:
Review Comment:
Correct, @ebyhr, these fields (`content_offset` and `content_size_in_bytes`) should
always exist for a DV, so I think we can do a stricter read on them.
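With `content_offset` and `content_size_in_bytes` treated as required, the stricter read could fail loudly instead of falling back to `getattr(..., None)`. The sketch below uses a hypothetical `DeleteFileStub` dataclass in place of pyiceberg's `DataFile`, and mirrors the `tuple[int, int, str]` return type of `_validate_deletion_vector` from the diff; the 20-byte floor is the smallest possible DV blob (length, magic, bitmap count, CRC).

```python
from dataclasses import dataclass
from typing import Optional

# Smallest possible DV blob: length (4) + magic (4) + bitmap count (8) + CRC (4).
_DV_BLOB_MIN_SIZE_BYTES = 20


@dataclass(frozen=True)
class DeleteFileStub:
    """Hypothetical stand-in for the DataFile fields a DV read needs."""

    file_path: str
    content_offset: Optional[int]
    content_size_in_bytes: Optional[int]
    referenced_data_file: Optional[str]


def validate_deletion_vector(delete_file: DeleteFileStub) -> tuple[int, int, str]:
    """Return (content_offset, content_size_in_bytes, referenced_data_file) or raise."""
    offset = delete_file.content_offset
    size = delete_file.content_size_in_bytes
    referenced = delete_file.referenced_data_file
    # Strict read: a DV entry must carry all three fields, so there is no
    # getattr(..., None) fallback to the whole-file Puffin path.
    if offset is None or size is None or referenced is None:
        raise ValueError(
            f"Deletion vector {delete_file.file_path} must set content_offset, "
            "content_size_in_bytes and referenced_data_file"
        )
    if offset < 0:
        raise ValueError(f"Negative content_offset {offset} for {delete_file.file_path}")
    if size < _DV_BLOB_MIN_SIZE_BYTES:
        raise ValueError(f"content_size_in_bytes {size} is below the {_DV_BLOB_MIN_SIZE_BYTES}-byte DV minimum")
    return offset, size, referenced
```

Failing here surfaces a malformed manifest entry immediately, rather than silently reading the file as a full Puffin file.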
##########
pyiceberg/io/pyarrow.py:
##########
@@ -1116,6 +1118,27 @@ def _get_file_format(file_format: FileFormat, **kwargs: dict[str, Any]) -> ds.Fi
         raise ValueError(f"Unsupported file format: {file_format}")
+def _validate_deletion_vector(data_file: DataFile) -> tuple[int, int, str]:
Review Comment:
I think this would be a helper in the deletion_vector module I mentioned
below that I think we should introduce. This file seems to have become a bit
of a kitchen sink of different things, but I think its original intent was
FileIO, so I don't think we want the low-level DV validation here.