This is an automated email from the ASF dual-hosted git repository.
onikolas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 9e159fc48d Add OpenLineage support to S3Operators - Copy, Delete and Create Object (#35796)
9e159fc48d is described below
commit 9e159fc48dd774aa09358801c17d6da217052f8a
Author: Kacper Muda <[email protected]>
AuthorDate: Wed Nov 22 19:51:55 2023 +0100
Add OpenLineage support to S3Operators - Copy, Delete and Create Object (#35796)
---
airflow/providers/amazon/aws/operators/s3.py | 86 ++++++++++++++
tests/providers/amazon/aws/operators/test_s3.py | 146 ++++++++++++++++++++++++
2 files changed, 232 insertions(+)
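For context, a minimal usage sketch of the new hook on the copy operator, mirroring the assertions in the new tests below (the bucket and key names are illustrative, and the amazon and openlineage provider packages are assumed to be installed):

    from airflow.providers.amazon.aws.operators.s3 import S3CopyObjectOperator

    # Illustrative bucket/key names only.
    op = S3CopyObjectOperator(
        task_id="copy_example",
        source_bucket_name="source-bucket",
        source_bucket_key="path/source.txt",
        dest_bucket_name="dest-bucket",
        dest_bucket_key="path/dest.txt",
    )

    # The openlineage provider is expected to call this hook automatically when the
    # task starts; calling it directly shows the datasets that will be reported.
    lineage = op.get_openlineage_facets_on_start()
    assert lineage.inputs[0].namespace == "s3://source-bucket"
    assert lineage.inputs[0].name == "path/source.txt"
    assert lineage.outputs[0].namespace == "s3://dest-bucket"
    assert lineage.outputs[0].name == "path/dest.txt"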
diff --git a/airflow/providers/amazon/aws/operators/s3.py b/airflow/providers/amazon/aws/operators/s3.py
index 956feddb4a..068f73e622 100644
--- a/airflow/providers/amazon/aws/operators/s3.py
+++ b/airflow/providers/amazon/aws/operators/s3.py
@@ -321,6 +321,33 @@ class S3CopyObjectOperator(BaseOperator):
self.acl_policy,
)
+ def get_openlineage_facets_on_start(self):
+ from openlineage.client.run import Dataset
+
+ from airflow.providers.openlineage.extractors import OperatorLineage
+
+ dest_bucket_name, dest_bucket_key = S3Hook.get_s3_bucket_key(
+ self.dest_bucket_name, self.dest_bucket_key, "dest_bucket_name", "dest_bucket_key"
+ )
+
+ source_bucket_name, source_bucket_key = S3Hook.get_s3_bucket_key(
+ self.source_bucket_name, self.source_bucket_key, "source_bucket_name", "source_bucket_key"
+ )
+
+ input_dataset = Dataset(
+ namespace=f"s3://{source_bucket_name}",
+ name=source_bucket_key,
+ )
+ output_dataset = Dataset(
+ namespace=f"s3://{dest_bucket_name}",
+ name=dest_bucket_key,
+ )
+
+ return OperatorLineage(
+ inputs=[input_dataset],
+ outputs=[output_dataset],
+ )
+
class S3CreateObjectOperator(BaseOperator):
"""
@@ -409,6 +436,22 @@ class S3CreateObjectOperator(BaseOperator):
else:
s3_hook.load_bytes(self.data, s3_key, s3_bucket, self.replace, self.encrypt, self.acl_policy)
+ def get_openlineage_facets_on_start(self):
+ from openlineage.client.run import Dataset
+
+ from airflow.providers.openlineage.extractors import OperatorLineage
+
+ bucket, key = S3Hook.get_s3_bucket_key(self.s3_bucket, self.s3_key, "dest_bucket", "dest_key")
+
+ output_dataset = Dataset(
+ namespace=f"s3://{bucket}",
+ name=key,
+ )
+
+ return OperatorLineage(
+ outputs=[output_dataset],
+ )
+
class S3DeleteObjectsOperator(BaseOperator):
"""
@@ -462,6 +505,8 @@ class S3DeleteObjectsOperator(BaseOperator):
self.aws_conn_id = aws_conn_id
self.verify = verify
+ self._keys: str | list[str] = ""
+
if not exactly_one(prefix is None, keys is None):
raise AirflowException("Either keys or prefix should be set.")
@@ -476,6 +521,47 @@ class S3DeleteObjectsOperator(BaseOperator):
keys = self.keys or s3_hook.list_keys(bucket_name=self.bucket, prefix=self.prefix)
if keys:
s3_hook.delete_objects(bucket=self.bucket, keys=keys)
+ self._keys = keys
+
+ def get_openlineage_facets_on_complete(self, task_instance):
+ """Implement _on_complete because object keys are resolved in
execute()."""
+ from openlineage.client.facet import (
+ LifecycleStateChange,
+ LifecycleStateChangeDatasetFacet,
+ LifecycleStateChangeDatasetFacetPreviousIdentifier,
+ )
+ from openlineage.client.run import Dataset
+
+ from airflow.providers.openlineage.extractors import OperatorLineage
+
+ if not self._keys:
+ return OperatorLineage()
+
+ keys = self._keys
+ if isinstance(keys, str):
+ keys = [keys]
+
+ bucket_url = f"s3://{self.bucket}"
+ input_datasets = [
+ Dataset(
+ namespace=bucket_url,
+ name=key,
+ facets={
+ "lifecycleStateChange": LifecycleStateChangeDatasetFacet(
+ lifecycleStateChange=LifecycleStateChange.DROP.value,
+ previousIdentifier=LifecycleStateChangeDatasetFacetPreviousIdentifier(
+ namespace=bucket_url,
+ name=key,
+ ),
+ )
+ },
+ )
+ for key in keys
+ ]
+
+ return OperatorLineage(
+ inputs=input_datasets,
+ )
class S3FileTransformOperator(BaseOperator):
diff --git a/tests/providers/amazon/aws/operators/test_s3.py b/tests/providers/amazon/aws/operators/test_s3.py
index 3bfd238a97..80a4b645d4 100644
--- a/tests/providers/amazon/aws/operators/test_s3.py
+++ b/tests/providers/amazon/aws/operators/test_s3.py
@@ -28,6 +28,12 @@ from unittest import mock
import boto3
import pytest
from moto import mock_s3
+from openlineage.client.facet import (
+ LifecycleStateChange,
+ LifecycleStateChangeDatasetFacet,
+ LifecycleStateChangeDatasetFacetPreviousIdentifier,
+)
+from openlineage.client.run import Dataset
from airflow.exceptions import AirflowException
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
@@ -44,6 +50,7 @@ from airflow.providers.amazon.aws.operators.s3 import (
S3ListPrefixesOperator,
S3PutBucketTaggingOperator,
)
+from airflow.providers.openlineage.extractors import OperatorLineage
BUCKET_NAME = os.environ.get("BUCKET_NAME", "test-airflow-bucket")
S3_KEY = "test-airflow-key"
@@ -409,6 +416,55 @@ class TestS3CopyObjectOperator:
# the object found should be consistent with dest_key specified earlier
assert objects_in_dest_bucket["Contents"][0]["Key"] == self.dest_key
+ def test_get_openlineage_facets_on_start_combination_1(self):
+ expected_input = Dataset(
+ namespace=f"s3://{self.source_bucket}",
+ name=self.source_key,
+ )
+ expected_output = Dataset(
+ namespace=f"s3://{self.dest_bucket}",
+ name=self.dest_key,
+ )
+
+ op = S3CopyObjectOperator(
+ task_id="test",
+ source_bucket_name=self.source_bucket,
+ source_bucket_key=self.source_key,
+ dest_bucket_name=self.dest_bucket,
+ dest_bucket_key=self.dest_key,
+ )
+
+ lineage = op.get_openlineage_facets_on_start()
+ assert len(lineage.inputs) == 1
+ assert len(lineage.outputs) == 1
+ assert lineage.inputs[0] == expected_input
+ assert lineage.outputs[0] == expected_output
+
+ def test_get_openlineage_facets_on_start_combination_2(self):
+ expected_input = Dataset(
+ namespace=f"s3://{self.source_bucket}",
+ name=self.source_key,
+ )
+ expected_output = Dataset(
+ namespace=f"s3://{self.dest_bucket}",
+ name=self.dest_key,
+ )
+
+ source_key_s3_url = f"s3://{self.source_bucket}/{self.source_key}"
+ dest_key_s3_url = f"s3://{self.dest_bucket}/{self.dest_key}"
+
+ op = S3CopyObjectOperator(
+ task_id="test",
+ source_bucket_key=source_key_s3_url,
+ dest_bucket_key=dest_key_s3_url,
+ )
+
+ lineage = op.get_openlineage_facets_on_start()
+ assert len(lineage.inputs) == 1
+ assert len(lineage.outputs) == 1
+ assert lineage.inputs[0] == expected_input
+ assert lineage.outputs[0] == expected_output
+
@mock_s3
class TestS3DeleteObjectsOperator:
@@ -575,6 +631,82 @@ class TestS3DeleteObjectsOperator:
# the object found should be consistent with dest_key specified earlier
assert objects_in_dest_bucket["Contents"][0]["Key"] == key_of_test
+ @pytest.mark.parametrize("keys", ("path/data.txt", ["path/data.txt"]))
+ @mock.patch("airflow.providers.amazon.aws.operators.s3.S3Hook")
+ def test_get_openlineage_facets_on_complete_single_object(self, mock_hook, keys):
+ bucket = "testbucket"
+ expected_input = Dataset(
+ namespace=f"s3://{bucket}",
+ name="path/data.txt",
+ facets={
+ "lifecycleStateChange": LifecycleStateChangeDatasetFacet(
+ lifecycleStateChange=LifecycleStateChange.DROP.value,
+ previousIdentifier=LifecycleStateChangeDatasetFacetPreviousIdentifier(
+ namespace=f"s3://{bucket}",
+ name="path/data.txt",
+ ),
+ )
+ },
+ )
+
+ op = S3DeleteObjectsOperator(task_id="test_task_s3_delete_single_object", bucket=bucket, keys=keys)
+ op.execute(None)
+
+ lineage = op.get_openlineage_facets_on_complete(None)
+ assert len(lineage.inputs) == 1
+ assert lineage.inputs[0] == expected_input
+
+ @mock.patch("airflow.providers.amazon.aws.operators.s3.S3Hook")
+ def test_get_openlineage_facets_on_complete_multiple_objects(self, mock_hook):
+ bucket = "testbucket"
+ keys = ["path/data1.txt", "path/data2.txt"]
+ expected_inputs = [
+ Dataset(
+ namespace=f"s3://{bucket}",
+ name="path/data1.txt",
+ facets={
+ "lifecycleStateChange": LifecycleStateChangeDatasetFacet(
+ lifecycleStateChange=LifecycleStateChange.DROP.value,
+ previousIdentifier=LifecycleStateChangeDatasetFacetPreviousIdentifier(
+ namespace=f"s3://{bucket}",
+ name="path/data1.txt",
+ ),
+ )
+ },
+ ),
+ Dataset(
+ namespace=f"s3://{bucket}",
+ name="path/data2.txt",
+ facets={
+ "lifecycleStateChange": LifecycleStateChangeDatasetFacet(
+ lifecycleStateChange=LifecycleStateChange.DROP.value,
+ previousIdentifier=LifecycleStateChangeDatasetFacetPreviousIdentifier(
+ namespace=f"s3://{bucket}",
+ name="path/data2.txt",
+ ),
+ )
+ },
+ ),
+ ]
+
+ op = S3DeleteObjectsOperator(task_id="test_task_s3_delete_single_object", bucket=bucket, keys=keys)
+ op.execute(None)
+
+ lineage = op.get_openlineage_facets_on_complete(None)
+ assert len(lineage.inputs) == 2
+ assert lineage.inputs == expected_inputs
+
+ @pytest.mark.parametrize("keys", ("", []))
+ @mock.patch("airflow.providers.amazon.aws.operators.s3.S3Hook")
+ def test_get_openlineage_facets_on_complete_no_objects(self, mock_hook, keys):
+ op = S3DeleteObjectsOperator(
+ task_id="test_task_s3_delete_single_object", bucket="testbucket", keys=keys
+ )
+ op.execute(None)
+
+ lineage = op.get_openlineage_facets_on_complete(None)
+ assert lineage == OperatorLineage()
+
class TestS3CreateObjectOperator:
@mock.patch.object(S3Hook, "load_string")
@@ -614,3 +746,17 @@ class TestS3CreateObjectOperator:
operator.execute(None)
mock_load_string.assert_called_once_with(data, S3_KEY, BUCKET_NAME, False, False, None, None, None)
+
+ @pytest.mark.parametrize(("bucket", "key"), (("bucket", "file.txt"), (None, "s3://bucket/file.txt")))
+ def test_get_openlineage_facets_on_start(self, bucket, key):
+ expected_output = Dataset(
+ namespace="s3://bucket",
+ name="file.txt",
+ )
+
+ op = S3CreateObjectOperator(task_id="test", s3_bucket=bucket, s3_key=key, data="test")
+
+ lineage = op.get_openlineage_facets_on_start()
+ assert len(lineage.inputs) == 0
+ assert len(lineage.outputs) == 1
+ assert lineage.outputs[0] == expected_output