This is an automated email from the ASF dual-hosted git repository.
bolke pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 9a921c2fc6 Add support for serialization of iceberg tables (#35456)
9a921c2fc6 is described below
commit 9a921c2fc61a127de5507bc1ea2acbab8c4a1e82
Author: Bolke de Bruin <[email protected]>
AuthorDate: Sun Nov 5 20:27:07 2023 +0100
Add support for serialization of iceberg tables (#35456)
This adds lightweight support for serialization of
Apache Iceberg tables. Only a reference to each table
is captured (its identifier and its encrypted catalog
properties); on deserialization the table is
re-instantiated from its catalog using that information.
---
airflow/serialization/serializers/iceberg.py | 76 ++++++++++++++++++++++
setup.py | 5 ++
.../serialization/serializers/test_serializers.py | 23 +++++++
3 files changed, 104 insertions(+)
diff --git a/airflow/serialization/serializers/iceberg.py b/airflow/serialization/serializers/iceberg.py
new file mode 100644
index 0000000000..9498ce0be0
--- /dev/null
+++ b/airflow/serialization/serializers/iceberg.py
@@ -0,0 +1,76 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+from airflow.utils.module_loading import qualname
+
+serializers = ["pyiceberg.table.Table"]
+deserializers = serializers
+stringifiers = serializers
+
+if TYPE_CHECKING:
+ from airflow.serialization.serde import U
+
+__version__ = 1
+
+
+def serialize(o: object) -> tuple[U, str, int, bool]:
+ from pyiceberg.table import Table
+
+ if not isinstance(o, Table):
+ return "", "", 0, False
+
+ from airflow.models.crypto import get_fernet
+
+ # we encrypt the catalog information here until we have
+ # global catalog management in airflow and the properties
+ # can have sensitive information
+ fernet = get_fernet()
+ properties = {}
+ for k, v in o.catalog.properties.items():
+ properties[k] = fernet.encrypt(v.encode("utf-8")).decode("utf-8")
+
+ data = {
+ "identifier": o.identifier,
+ "catalog_properties": properties,
+ }
+
+ return data, qualname(o), __version__, True
+
+
+def deserialize(classname: str, version: int, data: dict):
+ from pyiceberg.catalog import load_catalog
+ from pyiceberg.table import Table
+
+ from airflow.models.crypto import get_fernet
+
+ if version > __version__:
+ raise TypeError("serialized version is newer than class version")
+
+ if classname == qualname(Table):
+ fernet = get_fernet()
+ properties = {}
+ for k, v in data["catalog_properties"].items():
+ properties[k] = fernet.decrypt(v.encode("utf-8")).decode("utf-8")
+
+ catalog = load_catalog(data["identifier"][0], **properties)
+ return catalog.load_table((data["identifier"][1],
data["identifier"][2]))
+
+ raise TypeError(f"do not know how to deserialize {classname}")
diff --git a/setup.py b/setup.py
index 2183e09e94..9710aa2d79 100644
--- a/setup.py
+++ b/setup.py
@@ -453,6 +453,10 @@ _devel_only_mongo = [
"mongomock",
]
+_devel_only_iceberg = [
+ "pyiceberg>=0.5.0",
+]
+
_devel_only_sentry = [
"blinker",
]
@@ -492,6 +496,7 @@ devel_only = [
*_devel_only_devscripts,
*_devel_only_duckdb,
*_devel_only_mongo,
+ *_devel_only_iceberg,
*_devel_only_sentry,
*_devel_only_static_checks,
*_devel_only_tests,
diff --git a/tests/serialization/serializers/test_serializers.py b/tests/serialization/serializers/test_serializers.py
index 2c9c94e5c8..b82438f6e5 100644
--- a/tests/serialization/serializers/test_serializers.py
+++ b/tests/serialization/serializers/test_serializers.py
@@ -18,12 +18,16 @@ from __future__ import annotations
import datetime
import decimal
+from unittest.mock import patch
import numpy as np
import pendulum.tz
import pytest
from dateutil.tz import tzutc
from pendulum import DateTime
+from pyiceberg.catalog import Catalog
+from pyiceberg.io import FileIO
+from pyiceberg.table import Table
from airflow import PY39
from airflow.models.param import Param, ParamsDict
@@ -175,3 +179,22 @@ class TestSerializers:
e = serialize(i)
d = deserialize(e)
assert i.equals(d)
+
+ @patch.object(Catalog, "__abstractmethods__", set())
+ @patch.object(FileIO, "__abstractmethods__", set())
+ @patch("pyiceberg.catalog.Catalog.load_table")
+ @patch("pyiceberg.catalog.load_catalog")
+ def test_iceberg(self, mock_load_catalog, mock_load_table):
+ uri = "http://rest.no.where"
+ catalog = Catalog("catalog", uri=uri)
+ identifier = ("catalog", "schema", "table")
+ mock_load_catalog.return_value = catalog
+
+ i = Table(identifier, "bar", catalog=catalog, metadata_location="",
io=FileIO())
+ mock_load_table.return_value = i
+
+ e = serialize(i)
+ d = deserialize(e)
+ assert i == d
+ mock_load_catalog.assert_called_with("catalog", uri=uri)
+ mock_load_table.assert_called_with((identifier[1], identifier[2]))