This is an automated email from the ASF dual-hosted git repository.
bolke pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 7d0afbdfb2 Add lightweight serialization for deltalake tables (#35462)
7d0afbdfb2 is described below
commit 7d0afbdfb28edbeb6d0fcb344084e561192b6057
Author: Bolke de Bruin <[email protected]>
AuthorDate: Sun Nov 5 23:36:36 2023 +0100
Add lightweight serialization for deltalake tables (#35462)
---
airflow/serialization/serializers/deltalake.py | 79 ++++++++++++++++++++++
setup.py | 5 ++
.../serialization/serializers/test_serializers.py | 24 +++++++
3 files changed, 108 insertions(+)
diff --git a/airflow/serialization/serializers/deltalake.py b/airflow/serialization/serializers/deltalake.py
new file mode 100644
index 0000000000..60456baf80
--- /dev/null
+++ b/airflow/serialization/serializers/deltalake.py
@@ -0,0 +1,79 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+from airflow.utils.module_loading import qualname
+
+# Qualified names of the classes this module handles. One shared list object
+# is registered for serialization, deserialization and stringification.
+# NOTE(review): this module defines no ``stringify`` function; confirm that
+# serde does not expect one for entries listed in ``stringifiers``.
+serializers = ["deltalake.table.DeltaTable"]
+deserializers = stringifiers = serializers
+
+# Format version written by serialize(); deserialize() rejects newer versions.
+__version__ = 1
+
+if TYPE_CHECKING:
+    # Only needed for the return annotation of serialize().
+    from airflow.serialization.serde import U
+
+
+def serialize(o: object) -> tuple[U, str, int, bool]:
+    """Serialize a ``deltalake.table.DeltaTable`` into a lightweight reference.
+
+    Only the table URI, the table version and the storage options are stored;
+    the table contents are not serialized.
+
+    :param o: the object to serialize.
+    :return: a ``(data, classname, version, handled)`` tuple. When ``o`` is not
+        a ``DeltaTable``, ``handled`` is ``False`` and the other fields are
+        empty placeholders.
+    """
+    from deltalake.table import DeltaTable
+
+    if not isinstance(o, DeltaTable):
+        return "", "", 0, False
+
+    from airflow.models.crypto import get_fernet
+
+    # Storage options can contain sensitive information, so every value is
+    # passed through Airflow's Fernet (``get_fernet``) before being stored.
+    # NOTE(review): if no Fernet key is configured this may not actually
+    # encrypt -- confirm the deployment's fernet setup.
+    # ``_storage_options`` may be None when the table was created without
+    # options; ``or {}`` turns that into an empty mapping.
+    fernet = get_fernet()
+    properties = {
+        k: fernet.encrypt(v.encode("utf-8")).decode("utf-8")
+        for k, v in (o._storage_options or {}).items()
+    }
+
+    data = {
+        "table_uri": o.table_uri,
+        "version": o.version(),
+        "storage_options": properties,
+    }
+
+    return data, qualname(o), __version__, True
+
+
+def deserialize(classname: str, version: int, data: dict):
+    """Rebuild a ``deltalake.table.DeltaTable`` from the output of ``serialize``.
+
+    :param classname: qualified class name recorded at serialization time.
+    :param version: serializer format version recorded at serialization time.
+    :param data: mapping with ``table_uri``, ``version`` and (Fernet-encoded)
+        ``storage_options`` entries.
+    :return: a ``DeltaTable`` for ``table_uri`` at the recorded table version.
+    :raises TypeError: if ``version`` is newer than this module's format
+        version, or ``classname`` is not ``DeltaTable``.
+    """
+    from deltalake.table import DeltaTable
+
+    from airflow.models.crypto import get_fernet
+
+    if version > __version__:
+        raise TypeError("serialized version is newer than class version")
+
+    if classname == qualname(DeltaTable):
+        fernet = get_fernet()
+        # Reverse the per-value Fernet encoding applied by serialize().
+        properties = {
+            k: fernet.decrypt(v.encode("utf-8")).decode("utf-8")
+            for k, v in (data.get("storage_options") or {}).items()
+        }
+
+        # An empty mapping is passed on as None, meaning "no storage options".
+        return DeltaTable(
+            data["table_uri"],
+            version=data["version"],
+            storage_options=properties or None,
+        )
+
+    raise TypeError(f"do not know how to deserialize {classname}")
diff --git a/setup.py b/setup.py
index 9710aa2d79..8a3abf52ec 100644
--- a/setup.py
+++ b/setup.py
@@ -432,6 +432,10 @@ _devel_only_debuggers = [
"ipdb",
]
+# Development-only dependency: the serializer tests import ``deltalake``
+# directly, while the deltalake serializer itself only imports it inside its
+# functions, so Airflow can load that module without deltalake installed.
+_devel_only_deltalake = [
+    "deltalake>=0.12.0",
+]
+
_devel_only_devscripts = [
"click>=8.0",
"gitpython",
@@ -493,6 +497,7 @@ devel_only = [
*_devel_only_azure,
*_devel_only_breeze,
*_devel_only_debuggers,
+ *_devel_only_deltalake,
*_devel_only_devscripts,
*_devel_only_duckdb,
*_devel_only_mongo,
diff --git a/tests/serialization/serializers/test_serializers.py b/tests/serialization/serializers/test_serializers.py
index b82438f6e5..aab9f91395 100644
--- a/tests/serialization/serializers/test_serializers.py
+++ b/tests/serialization/serializers/test_serializers.py
@@ -24,6 +24,7 @@ import numpy as np
import pendulum.tz
import pytest
from dateutil.tz import tzutc
+from deltalake import DeltaTable
from pendulum import DateTime
from pyiceberg.catalog import Catalog
from pyiceberg.io import FileIO
@@ -198,3 +199,26 @@ class TestSerializers:
assert i == d
mock_load_catalog.assert_called_with("catalog", uri=uri)
mock_load_table.assert_called_with((identifier[1], identifier[2]))
+
+    @patch("deltalake.table.Metadata")
+    @patch("deltalake.table.RawDeltaTable")
+    @patch.object(DeltaTable, "version", return_value=0)
+    @patch.object(DeltaTable, "table_uri", new_callable=lambda: "/tmp/bucket/path")
+    def test_deltalake(self, mock_table_uri, mock_version, mock_deltalake, mock_metadata):
+        """A DeltaTable round-trips through serde with and without storage options.
+
+        ``RawDeltaTable`` and ``Metadata`` are patched out, and ``version`` /
+        ``table_uri`` are stubbed, so the tables here are not backed by real data.
+        """
+        uri = "/tmp/bucket/path"
+
+        # With storage options: values must come back equal to the originals.
+        i = DeltaTable(uri, storage_options={"key": "value"})
+
+        e = serialize(i)
+        d = deserialize(e)
+        assert i.table_uri == d.table_uri
+        assert i.version() == d.version()
+        assert i._storage_options == d._storage_options
+
+        # Without storage options: they must be restored as None, not as {}.
+        i = DeltaTable(uri)
+        e = serialize(i)
+        d = deserialize(e)
+        assert i.table_uri == d.table_uri
+        assert i.version() == d.version()
+        assert i._storage_options == d._storage_options
+        assert d._storage_options is None