This is an automated email from the ASF dual-hosted git repository.

bolke pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 9a921c2fc6 Add support for serialization of iceberg tables (#35456)
9a921c2fc6 is described below

commit 9a921c2fc61a127de5507bc1ea2acbab8c4a1e82
Author: Bolke de Bruin <[email protected]>
AuthorDate: Sun Nov 5 20:27:07 2023 +0100

    Add support for serialization of iceberg tables (#35456)
    
    This adds lightweight support for serialization of
    Apache Iceberg tables. This means that references
    are captured and tables are re-instantiated with their
    catalog information.
---
 airflow/serialization/serializers/iceberg.py       | 76 ++++++++++++++++++++++
 setup.py                                           |  5 ++
 .../serialization/serializers/test_serializers.py  | 23 +++++++
 3 files changed, 104 insertions(+)

diff --git a/airflow/serialization/serializers/iceberg.py b/airflow/serialization/serializers/iceberg.py
new file mode 100644
index 0000000000..9498ce0be0
--- /dev/null
+++ b/airflow/serialization/serializers/iceberg.py
@@ -0,0 +1,76 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+from airflow.utils.module_loading import qualname
+
+serializers = ["pyiceberg.table.Table"]
+deserializers = serializers
+stringifiers = serializers
+
+if TYPE_CHECKING:
+    from airflow.serialization.serde import U
+
+__version__ = 1
+
+
+def serialize(o: object) -> tuple[U, str, int, bool]:
+    from pyiceberg.table import Table
+
+    if not isinstance(o, Table):
+        return "", "", 0, False
+
+    from airflow.models.crypto import get_fernet
+
+    # we encrypt the catalog information here until we have
+    # global catalog management in airflow and the properties
+    # can have sensitive information
+    fernet = get_fernet()
+    properties = {}
+    for k, v in o.catalog.properties.items():
+        properties[k] = fernet.encrypt(v.encode("utf-8")).decode("utf-8")
+
+    data = {
+        "identifier": o.identifier,
+        "catalog_properties": properties,
+    }
+
+    return data, qualname(o), __version__, True
+
+
+def deserialize(classname: str, version: int, data: dict):
+    from pyiceberg.catalog import load_catalog
+    from pyiceberg.table import Table
+
+    from airflow.models.crypto import get_fernet
+
+    if version > __version__:
+        raise TypeError("serialized version is newer than class version")
+
+    if classname == qualname(Table):
+        fernet = get_fernet()
+        properties = {}
+        for k, v in data["catalog_properties"].items():
+            properties[k] = fernet.decrypt(v.encode("utf-8")).decode("utf-8")
+
+        catalog = load_catalog(data["identifier"][0], **properties)
+        return catalog.load_table((data["identifier"][1], data["identifier"][2]))
+
+    raise TypeError(f"do not know how to deserialize {classname}")
diff --git a/setup.py b/setup.py
index 2183e09e94..9710aa2d79 100644
--- a/setup.py
+++ b/setup.py
@@ -453,6 +453,10 @@ _devel_only_mongo = [
     "mongomock",
 ]
 
+_devel_only_iceberg = [
+    "pyiceberg>=0.5.0",
+]
+
 _devel_only_sentry = [
     "blinker",
 ]
@@ -492,6 +496,7 @@ devel_only = [
     *_devel_only_devscripts,
     *_devel_only_duckdb,
     *_devel_only_mongo,
+    *_devel_only_iceberg,
     *_devel_only_sentry,
     *_devel_only_static_checks,
     *_devel_only_tests,
diff --git a/tests/serialization/serializers/test_serializers.py b/tests/serialization/serializers/test_serializers.py
index 2c9c94e5c8..b82438f6e5 100644
--- a/tests/serialization/serializers/test_serializers.py
+++ b/tests/serialization/serializers/test_serializers.py
@@ -18,12 +18,16 @@ from __future__ import annotations
 
 import datetime
 import decimal
+from unittest.mock import patch
 
 import numpy as np
 import pendulum.tz
 import pytest
 from dateutil.tz import tzutc
 from pendulum import DateTime
+from pyiceberg.catalog import Catalog
+from pyiceberg.io import FileIO
+from pyiceberg.table import Table
 
 from airflow import PY39
 from airflow.models.param import Param, ParamsDict
@@ -175,3 +179,22 @@ class TestSerializers:
         e = serialize(i)
         d = deserialize(e)
         assert i.equals(d)
+
+    @patch.object(Catalog, "__abstractmethods__", set())
+    @patch.object(FileIO, "__abstractmethods__", set())
+    @patch("pyiceberg.catalog.Catalog.load_table")
+    @patch("pyiceberg.catalog.load_catalog")
+    def test_iceberg(self, mock_load_catalog, mock_load_table):
+        uri = "http://rest.no.where"
+        catalog = Catalog("catalog", uri=uri)
+        identifier = ("catalog", "schema", "table")
+        mock_load_catalog.return_value = catalog
+
+        i = Table(identifier, "bar", catalog=catalog, metadata_location="", io=FileIO())
+        mock_load_table.return_value = i
+
+        e = serialize(i)
+        d = deserialize(e)
+        assert i == d
+        mock_load_catalog.assert_called_with("catalog", uri=uri)
+        mock_load_table.assert_called_with((identifier[1], identifier[2]))

Reply via email to