This is an automated email from the ASF dual-hosted git repository.

vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new ef2ad070c2 Add OpenLineage support to `S3FileTransformOperator` (#35819)
ef2ad070c2 is described below

commit ef2ad070c2ecbcb4271f8fb4571fed73e7c8c039
Author: Kacper Muda <[email protected]>
AuthorDate: Thu Nov 23 17:17:23 2023 +0100

    Add OpenLineage support to `S3FileTransformOperator` (#35819)
---
 airflow/providers/amazon/aws/operators/s3.py    | 33 +++++++++++++++++++++++++
 tests/providers/amazon/aws/operators/test_s3.py | 22 +++++++++++++++++
 2 files changed, 55 insertions(+)

diff --git a/airflow/providers/amazon/aws/operators/s3.py b/airflow/providers/amazon/aws/operators/s3.py
index 068f73e622..37dbc6c528 100644
--- a/airflow/providers/amazon/aws/operators/s3.py
+++ b/airflow/providers/amazon/aws/operators/s3.py
@@ -693,6 +693,39 @@ class S3FileTransformOperator(BaseOperator):
             )
             self.log.info("Upload successful")
 
+    def get_openlineage_facets_on_start(self):  # Lineage: source S3 object -> dest S3 object.
+        from openlineage.client.run import Dataset
+
+        from airflow.providers.openlineage.extractors import OperatorLineage
+        # Split each s3:// URL key into (bucket, key); bucket=None so the bucket comes from the key.
+        dest_bucket_name, dest_bucket_key = S3Hook.get_s3_bucket_key(
+            bucket=None,
+            key=self.dest_s3_key,
+            bucket_param_name="dest_bucket_name",
+            key_param_name="dest_bucket_key",
+        )
+        # NOTE(review): a key that is not a full s3:// URL presumably makes the hook raise; confirm.
+        source_bucket_name, source_bucket_key = S3Hook.get_s3_bucket_key(
+            bucket=None,
+            key=self.source_s3_key,
+            bucket_param_name="source_bucket_name",
+            key_param_name="source_bucket_key",
+        )
+        # Dataset namespace is s3://<bucket>; the dataset name is the object key.
+        input_dataset = Dataset(
+            namespace=f"s3://{source_bucket_name}",
+            name=source_bucket_key,
+        )
+        output_dataset = Dataset(
+            namespace=f"s3://{dest_bucket_name}",
+            name=dest_bucket_key,
+        )
+        # Exactly one input (the source object) and one output (the destination object).
+        return OperatorLineage(
+            inputs=[input_dataset],
+            outputs=[output_dataset],
+        )
+
 
 class S3ListOperator(BaseOperator):
     """
diff --git a/tests/providers/amazon/aws/operators/test_s3.py b/tests/providers/amazon/aws/operators/test_s3.py
index 80a4b645d4..0ec3de1fb4 100644
--- a/tests/providers/amazon/aws/operators/test_s3.py
+++ b/tests/providers/amazon/aws/operators/test_s3.py
@@ -299,6 +299,28 @@ class TestS3FileTransformOperator:
         result = conn.get_object(Bucket=self.bucket, Key=self.output_key)
         assert self.content == result["Body"].read()
 
+    def test_get_openlineage_facets_on_start(self):  # Full s3:// URLs map to s3://<bucket> + key.
+        expected_input = Dataset(
+            namespace=f"s3://{self.bucket}",
+            name=self.input_key,
+        )
+        expected_output = Dataset(
+            namespace=f"s3://{self.bucket}",
+            name=self.output_key,
+        )
+        # Keys are passed as full s3:// URLs, so the bucket is parsed out of each key.
+        op = S3FileTransformOperator(
+            task_id="test",
+            source_s3_key=f"s3://{self.bucket}/{self.input_key}",
+            dest_s3_key=f"s3://{self.bucket}/{self.output_key}",
+        )
+        # Lineage is built from constructor arguments alone; execute() is not called here.
+        lineage = op.get_openlineage_facets_on_start()
+        assert len(lineage.inputs) == 1
+        assert len(lineage.outputs) == 1
+        assert lineage.inputs[0] == expected_input
+        assert lineage.outputs[0] == expected_output
+
     @staticmethod
     def mock_process(mock_popen, return_code=0, process_output=None):
         mock_proc = mock.MagicMock()

Reply via email to