This is an automated email from the ASF dual-hosted git repository.

fokko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new c6b64e6536 Python: Add gzip metadata support (#7984)
c6b64e6536 is described below

commit c6b64e6536ce63db0cdff6e27f666a4af8c13d54
Author: Fokko Driesprong <[email protected]>
AuthorDate: Wed Jul 5 09:39:30 2023 +0200

    Python: Add gzip metadata support (#7984)
    
    * Python: Add gzip metadata support
    
    Resolves #7977
    
    * Remove `get` verb
---
 python/pyiceberg/serializers.py | 70 ++++++++++++++++++++++++++++++++++++++---
 python/tests/conftest.py        |  8 +++++
 python/tests/table/test_init.py | 14 +++++----
 3 files changed, 81 insertions(+), 11 deletions(-)

diff --git a/python/pyiceberg/serializers.py b/python/pyiceberg/serializers.py
index 1337f9a2e0..6341c149b2 100644
--- a/python/pyiceberg/serializers.py
+++ b/python/pyiceberg/serializers.py
@@ -14,27 +14,83 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+from __future__ import annotations
 
 import codecs
+import gzip
 import json
+from abc import ABC, abstractmethod
+from typing import Callable
 
 from pyiceberg.io import InputFile, InputStream, OutputFile
 from pyiceberg.table.metadata import TableMetadata, TableMetadataUtil
 
+GZIP = "gzip"
+
+
+class Compressor(ABC):
+    @staticmethod
+    def get_compressor(location: str) -> Compressor:
+        return GzipCompressor() if location.endswith(".gz.metadata.json") else NOOP_COMPRESSOR
+
+    @abstractmethod
+    def stream_decompressor(self, inp: InputStream) -> InputStream:
+        """Returns a stream decompressor.
+
+        Args:
+            inp: The input stream that needs decompressing.
+
+        Returns:
+            The wrapped stream
+        """
+
+    @abstractmethod
+    def bytes_compressor(self) -> Callable[[bytes], bytes]:
+        """Returns a function to compress bytes.
+
+        Returns:
+            A function that can be used to compress bytes.
+        """
+
+
+class NoopCompressor(Compressor):
+    def stream_decompressor(self, inp: InputStream) -> InputStream:
+        return inp
+
+    def bytes_compressor(self) -> Callable[[bytes], bytes]:
+        return lambda b: b
+
+
+NOOP_COMPRESSOR = NoopCompressor()
+
+
+class GzipCompressor(Compressor):
+    def stream_decompressor(self, inp: InputStream) -> InputStream:
+        return gzip.open(inp)
+
+    def bytes_compressor(self) -> Callable[[bytes], bytes]:
+        return gzip.compress
+
 
 class FromByteStream:
     """A collection of methods that deserialize dictionaries into Iceberg 
objects."""
 
     @staticmethod
-    def table_metadata(byte_stream: InputStream, encoding: str = "utf-8") -> TableMetadata:
+    def table_metadata(
+        byte_stream: InputStream, encoding: str = "utf-8", compression: Compressor = NOOP_COMPRESSOR
+    ) -> TableMetadata:
         """Instantiate a TableMetadata object from a byte stream.
 
         Args:
             byte_stream: A file-like byte stream object.
             encoding (default "utf-8"): The byte encoder to use for the reader.
+            compression: Optional compression method
         """
-        reader = codecs.getreader(encoding)
-        metadata = json.load(reader(byte_stream))
+        with compression.stream_decompressor(byte_stream) as byte_stream:
+            reader = codecs.getreader(encoding)
+            json_bytes = reader(byte_stream)
+            metadata = json.load(json_bytes)
+
         return TableMetadataUtil.parse_obj(metadata)
 
 
@@ -54,7 +110,9 @@ class FromInputFile:
 
         """
         with input_file.open() as input_stream:
-            return FromByteStream.table_metadata(byte_stream=input_stream, encoding=encoding)
+            return FromByteStream.table_metadata(
+                byte_stream=input_stream, encoding=encoding, compression=Compressor.get_compressor(location=input_file.location)
+            )
 
 
 class ToOutputFile:
@@ -69,4 +127,6 @@ class ToOutputFile:
             overwrite (bool): Where to overwrite the file if it already exists. Defaults to `False`.
         """
         with output_file.create(overwrite=overwrite) as output_stream:
-            output_stream.write(metadata.json().encode("utf-8"))
+            json_bytes = metadata.json().encode("utf-8")
+            json_bytes = Compressor.get_compressor(output_file.location).bytes_compressor()(json_bytes)
+            output_stream.write(json_bytes)
diff --git a/python/tests/conftest.py b/python/tests/conftest.py
index 55588f60f9..e5d0860489 100644
--- a/python/tests/conftest.py
+++ b/python/tests/conftest.py
@@ -336,6 +336,14 @@ def metadata_location(tmp_path_factory: pytest.TempPathFactory) -> str:
     return metadata_location
 
 
+@pytest.fixture(scope="session")
+def metadata_location_gz(tmp_path_factory: pytest.TempPathFactory) -> str:
+    metadata_location = str(tmp_path_factory.mktemp("metadata") / f"{uuid.uuid4()}.gz.metadata.json")
+    metadata = TableMetadataV2(**EXAMPLE_TABLE_METADATA_V2)
+    ToOutputFile.table_metadata(metadata, PyArrowFileIO().new_output(location=metadata_location), overwrite=True)
+    return metadata_location
+
+
 manifest_entry_records = [
     {
         "status": 1,
diff --git a/python/tests/table/test_init.py b/python/tests/table/test_init.py
index b421f81493..8e42cc6cf8 100644
--- a/python/tests/table/test_init.py
+++ b/python/tests/table/test_init.py
@@ -67,11 +67,6 @@ def table(example_table_metadata_v2: Dict[str, Any]) -> Table:
     )
 
 
-@pytest.fixture
-def static_table(metadata_location: str) -> StaticTable:
-    return StaticTable.from_metadata(metadata_location)
-
-
 def test_schema(table: Table) -> None:
     assert table.schema() == Schema(
         NestedField(field_id=1, name="x", field_type=LongType(), required=True),
@@ -260,7 +255,14 @@ def test_table_scan_projection_unknown_column(table: Table) -> None:
     assert "Could not find column: 'a'" in str(exc_info.value)
 
 
-def test_static_table_same_as_table(table: Table, static_table: StaticTable) -> None:
+def test_static_table_same_as_table(table: Table, metadata_location: str) -> None:
+    static_table = StaticTable.from_metadata(metadata_location)
+    assert isinstance(static_table, Table)
+    assert static_table.metadata == table.metadata
+
+
+def test_static_table_gz_same_as_table(table: Table, metadata_location_gz: str) -> None:
+    static_table = StaticTable.from_metadata(metadata_location_gz)
     assert isinstance(static_table, Table)
     assert static_table.metadata == table.metadata
 

Reply via email to