This is an automated email from the ASF dual-hosted git repository.
fokko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new c6b64e6536 Python: Add gzip metadata support (#7984)
c6b64e6536 is described below
commit c6b64e6536ce63db0cdff6e27f666a4af8c13d54
Author: Fokko Driesprong <[email protected]>
AuthorDate: Wed Jul 5 09:39:30 2023 +0200
Python: Add gzip metadata support (#7984)
* Python: Add gzip metadata support
Resolves #7977
* Remove `get` verb
---
python/pyiceberg/serializers.py | 70 ++++++++++++++++++++++++++++++++++++++---
python/tests/conftest.py | 8 +++++
python/tests/table/test_init.py | 14 +++++----
3 files changed, 81 insertions(+), 11 deletions(-)
diff --git a/python/pyiceberg/serializers.py b/python/pyiceberg/serializers.py
index 1337f9a2e0..6341c149b2 100644
--- a/python/pyiceberg/serializers.py
+++ b/python/pyiceberg/serializers.py
@@ -14,27 +14,83 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+from __future__ import annotations
import codecs
+import gzip
import json
+from abc import ABC, abstractmethod
+from typing import Callable
from pyiceberg.io import InputFile, InputStream, OutputFile
from pyiceberg.table.metadata import TableMetadata, TableMetadataUtil
+GZIP = "gzip"
+
+
+class Compressor(ABC):
+ @staticmethod
+ def get_compressor(location: str) -> Compressor:
+ return GzipCompressor() if location.endswith(".gz.metadata.json") else
NOOP_COMPRESSOR
+
+ @abstractmethod
+ def stream_decompressor(self, inp: InputStream) -> InputStream:
+ """Returns a stream decompressor.
+
+ Args:
+ inp: The input stream that needs decompressing.
+
+ Returns:
+ The wrapped stream
+ """
+
+ @abstractmethod
+ def bytes_compressor(self) -> Callable[[bytes], bytes]:
+ """Returns a function to compress bytes.
+
+ Returns:
+ A function that can be used to compress bytes.
+ """
+
+
+class NoopCompressor(Compressor):
+ def stream_decompressor(self, inp: InputStream) -> InputStream:
+ return inp
+
+ def bytes_compressor(self) -> Callable[[bytes], bytes]:
+ return lambda b: b
+
+
+NOOP_COMPRESSOR = NoopCompressor()
+
+
+class GzipCompressor(Compressor):
+ def stream_decompressor(self, inp: InputStream) -> InputStream:
+ return gzip.open(inp)
+
+ def bytes_compressor(self) -> Callable[[bytes], bytes]:
+ return gzip.compress
+
class FromByteStream:
"""A collection of methods that deserialize dictionaries into Iceberg
objects."""
@staticmethod
- def table_metadata(byte_stream: InputStream, encoding: str = "utf-8") ->
TableMetadata:
+ def table_metadata(
+ byte_stream: InputStream, encoding: str = "utf-8", compression:
Compressor = NOOP_COMPRESSOR
+ ) -> TableMetadata:
"""Instantiate a TableMetadata object from a byte stream.
Args:
byte_stream: A file-like byte stream object.
encoding (default "utf-8"): The byte encoder to use for the reader.
+ compression: Optional compression method
"""
- reader = codecs.getreader(encoding)
- metadata = json.load(reader(byte_stream))
+ with compression.stream_decompressor(byte_stream) as byte_stream:
+ reader = codecs.getreader(encoding)
+ json_bytes = reader(byte_stream)
+ metadata = json.load(json_bytes)
+
return TableMetadataUtil.parse_obj(metadata)
@@ -54,7 +110,9 @@ class FromInputFile:
"""
with input_file.open() as input_stream:
- return FromByteStream.table_metadata(byte_stream=input_stream,
encoding=encoding)
+ return FromByteStream.table_metadata(
+ byte_stream=input_stream, encoding=encoding,
compression=Compressor.get_compressor(location=input_file.location)
+ )
class ToOutputFile:
@@ -69,4 +127,6 @@ class ToOutputFile:
overwrite (bool): Where to overwrite the file if it already
exists. Defaults to `False`.
"""
with output_file.create(overwrite=overwrite) as output_stream:
- output_stream.write(metadata.json().encode("utf-8"))
+ json_bytes = metadata.json().encode("utf-8")
+ json_bytes =
Compressor.get_compressor(output_file.location).bytes_compressor()(json_bytes)
+ output_stream.write(json_bytes)
diff --git a/python/tests/conftest.py b/python/tests/conftest.py
index 55588f60f9..e5d0860489 100644
--- a/python/tests/conftest.py
+++ b/python/tests/conftest.py
@@ -336,6 +336,14 @@ def metadata_location(tmp_path_factory:
pytest.TempPathFactory) -> str:
return metadata_location
[email protected](scope="session")
+def metadata_location_gz(tmp_path_factory: pytest.TempPathFactory) -> str:
+ metadata_location = str(tmp_path_factory.mktemp("metadata") /
f"{uuid.uuid4()}.gz.metadata.json")
+ metadata = TableMetadataV2(**EXAMPLE_TABLE_METADATA_V2)
+ ToOutputFile.table_metadata(metadata,
PyArrowFileIO().new_output(location=metadata_location), overwrite=True)
+ return metadata_location
+
+
manifest_entry_records = [
{
"status": 1,
diff --git a/python/tests/table/test_init.py b/python/tests/table/test_init.py
index b421f81493..8e42cc6cf8 100644
--- a/python/tests/table/test_init.py
+++ b/python/tests/table/test_init.py
@@ -67,11 +67,6 @@ def table(example_table_metadata_v2: Dict[str, Any]) ->
Table:
)
[email protected]
-def static_table(metadata_location: str) -> StaticTable:
- return StaticTable.from_metadata(metadata_location)
-
-
def test_schema(table: Table) -> None:
assert table.schema() == Schema(
NestedField(field_id=1, name="x", field_type=LongType(),
required=True),
@@ -260,7 +255,14 @@ def test_table_scan_projection_unknown_column(table:
Table) -> None:
assert "Could not find column: 'a'" in str(exc_info.value)
-def test_static_table_same_as_table(table: Table, static_table: StaticTable)
-> None:
+def test_static_table_same_as_table(table: Table, metadata_location: str) ->
None:
+ static_table = StaticTable.from_metadata(metadata_location)
+ assert isinstance(static_table, Table)
+ assert static_table.metadata == table.metadata
+
+
+def test_static_table_gz_same_as_table(table: Table, metadata_location_gz:
str) -> None:
+ static_table = StaticTable.from_metadata(metadata_location_gz)
assert isinstance(static_table, Table)
assert static_table.metadata == table.metadata