This is an automated email from the ASF dual-hosted git repository.
vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new ef2ad070c2 Add OpenLineage support to `S3FileTransformOperator` (#35819)
ef2ad070c2 is described below
commit ef2ad070c2ecbcb4271f8fb4571fed73e7c8c039
Author: Kacper Muda <[email protected]>
AuthorDate: Thu Nov 23 17:17:23 2023 +0100
Add OpenLineage support to `S3FileTransformOperator` (#35819)
---
airflow/providers/amazon/aws/operators/s3.py | 33 +++++++++++++++++++++++++
tests/providers/amazon/aws/operators/test_s3.py | 22 +++++++++++++++++
2 files changed, 55 insertions(+)
diff --git a/airflow/providers/amazon/aws/operators/s3.py b/airflow/providers/amazon/aws/operators/s3.py
index 068f73e622..37dbc6c528 100644
--- a/airflow/providers/amazon/aws/operators/s3.py
+++ b/airflow/providers/amazon/aws/operators/s3.py
@@ -693,6 +693,39 @@ class S3FileTransformOperator(BaseOperator):
)
self.log.info("Upload successful")
+ def get_openlineage_facets_on_start(self):
+ from openlineage.client.run import Dataset
+
+ from airflow.providers.openlineage.extractors import OperatorLineage
+
+ dest_bucket_name, dest_bucket_key = S3Hook.get_s3_bucket_key(
+ bucket=None,
+ key=self.dest_s3_key,
+ bucket_param_name="dest_bucket_name",
+ key_param_name="dest_bucket_key",
+ )
+
+ source_bucket_name, source_bucket_key = S3Hook.get_s3_bucket_key(
+ bucket=None,
+ key=self.source_s3_key,
+ bucket_param_name="source_bucket_name",
+ key_param_name="source_bucket_key",
+ )
+
+ input_dataset = Dataset(
+ namespace=f"s3://{source_bucket_name}",
+ name=source_bucket_key,
+ )
+ output_dataset = Dataset(
+ namespace=f"s3://{dest_bucket_name}",
+ name=dest_bucket_key,
+ )
+
+ return OperatorLineage(
+ inputs=[input_dataset],
+ outputs=[output_dataset],
+ )
+
class S3ListOperator(BaseOperator):
"""
diff --git a/tests/providers/amazon/aws/operators/test_s3.py b/tests/providers/amazon/aws/operators/test_s3.py
index 80a4b645d4..0ec3de1fb4 100644
--- a/tests/providers/amazon/aws/operators/test_s3.py
+++ b/tests/providers/amazon/aws/operators/test_s3.py
@@ -299,6 +299,28 @@ class TestS3FileTransformOperator:
result = conn.get_object(Bucket=self.bucket, Key=self.output_key)
assert self.content == result["Body"].read()
+ def test_get_openlineage_facets_on_start(self):
+ expected_input = Dataset(
+ namespace=f"s3://{self.bucket}",
+ name=self.input_key,
+ )
+ expected_output = Dataset(
+ namespace=f"s3://{self.bucket}",
+ name=self.output_key,
+ )
+
+ op = S3FileTransformOperator(
+ task_id="test",
+ source_s3_key=f"s3://{self.bucket}/{self.input_key}",
+ dest_s3_key=f"s3://{self.bucket}/{self.output_key}",
+ )
+
+ lineage = op.get_openlineage_facets_on_start()
+ assert len(lineage.inputs) == 1
+ assert len(lineage.outputs) == 1
+ assert lineage.inputs[0] == expected_input
+ assert lineage.outputs[0] == expected_output
+
@staticmethod
def mock_process(mock_popen, return_code=0, process_output=None):
mock_proc = mock.MagicMock()