This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new e1dd9b5cd2 openlineage, ftp: add OpenLineage support for FTPFileTransferOperator (#31354)
e1dd9b5cd2 is described below

commit e1dd9b5cd2c5b19aec152f097709c7de02c42f34
Author: Maciej Obuchowski <[email protected]>
AuthorDate: Sun Jul 30 21:26:33 2023 +0200

    openlineage, ftp: add OpenLineage support for FTPFileTransferOperator (#31354)
    
    Signed-off-by: Maciej Obuchowski <[email protected]>
    Co-authored-by: eladkal <[email protected]>
---
 airflow/providers/ftp/operators/ftp.py    | 62 +++++++++++++++++++++++++++++
 generated/provider_dependencies.json      |  4 +-
 tests/providers/ftp/operators/test_ftp.py | 65 ++++++++++++++++++++++++++++++-
 3 files changed, 129 insertions(+), 2 deletions(-)

diff --git a/airflow/providers/ftp/operators/ftp.py b/airflow/providers/ftp/operators/ftp.py
index 8c935751d4..45bccbea4c 100644
--- a/airflow/providers/ftp/operators/ftp.py
+++ b/airflow/providers/ftp/operators/ftp.py
@@ -19,6 +19,8 @@
 from __future__ import annotations
 
 import os
+import socket
+from ftplib import FTP_PORT
 from functools import cached_property
 from pathlib import Path
 from typing import Any, Sequence
@@ -135,6 +137,66 @@ class FTPFileTransmitOperator(BaseOperator):
 
         return self.local_filepath
 
+    def get_openlineage_facets_on_start(self):
+        """
+        Return OpenLineage input/output datasets for this transfer.
+
+        Each dataset is named by its file path and namespaced ``file://<host>:<port>``.
+        Remote paths use the FTP connection's host and port; local paths use this
+        machine's IP address, falling back to its unresolved hostname. A missing
+        port defaults to ``ftplib.FTP_PORT``, which is always the case for local paths.
+        For ``get`` the remote files are the inputs and the local files the outputs;
+        for any other operation (i.e. ``put``) the roles are reversed.
+        """
+        from openlineage.client.run import Dataset
+
+        from airflow.providers.openlineage.extractors import OperatorLineage
+
+        scheme = "file"
+
+        local_host = socket.gethostname()
+        try:
+            local_host = socket.gethostbyname(local_host)
+        except Exception as e:
+            # Best effort: keep the unresolved hostname instead of failing.
+            self.log.warning(
+                "Failed to resolve local hostname. Using the hostname returned by "
+                "socket.gethostname() without resolution. %s",
+                e,
+                exc_info=True,
+            )
+
+        # NOTE(review): if get_conn() opens a live FTP session, reading host/port
+        # from the stored Airflow Connection would avoid network I/O here; confirm.
+        conn = self.hook.get_conn()
+        remote_host = conn.host
+        remote_port = conn.port
+
+        def as_list(paths):
+            # Each file path argument is either one path string or a collection of paths.
+            return [paths] if isinstance(paths, str) else paths
+
+        # NOTE(review): local files have no port, so they get FTP_PORT too; confirm intended.
+        local_datasets = [
+            Dataset(namespace=self._get_namespace(scheme, local_host, None), name=path)
+            for path in as_list(self.local_filepath)
+        ]
+        remote_datasets = [
+            Dataset(namespace=self._get_namespace(scheme, remote_host, remote_port), name=path)
+            for path in as_list(self.remote_filepath)
+        ]
+
+        if self.operation.lower() == FTPOperation.GET:
+            inputs, outputs = remote_datasets, local_datasets
+        else:
+            inputs, outputs = local_datasets, remote_datasets
+        return OperatorLineage(inputs=inputs, outputs=outputs)
+
+    def _get_namespace(self, scheme, host, port) -> str:
+        """Return ``<scheme>://<host>:<port>``, using FTP_PORT when ``port`` is falsy."""
+        port = port or FTP_PORT
+        return f"{scheme}://{host}:{port}"
+
 
 class FTPSFileTransmitOperator(FTPFileTransmitOperator):
     """
diff --git a/generated/provider_dependencies.json b/generated/provider_dependencies.json
index 19cdf951cf..fadffea64a 100644
--- a/generated/provider_dependencies.json
+++ b/generated/provider_dependencies.json
@@ -388,7 +388,9 @@
     "deps": [
       "apache-airflow>=2.4.0"
     ],
-    "cross-providers-deps": [],
+    "cross-providers-deps": [
+      "openlineage"
+    ],
     "excluded-python-versions": []
   },
   "github": {
diff --git a/tests/providers/ftp/operators/test_ftp.py b/tests/providers/ftp/operators/test_ftp.py
index f62e3f2bd4..81b554b26b 100644
--- a/tests/providers/ftp/operators/test_ftp.py
+++ b/tests/providers/ftp/operators/test_ftp.py
@@ -17,16 +17,19 @@
 # under the License.
 from __future__ import annotations
 
+import socket
 from unittest import mock
 
 import pytest
+from openlineage.client.run import Dataset
 
-from airflow.models import DAG
+from airflow.models import DAG, Connection
 from airflow.providers.ftp.operators.ftp import (
     FTPFileTransmitOperator,
     FTPOperation,
     FTPSFileTransmitOperator,
 )
+from airflow.utils import timezone
 from airflow.utils.timezone import datetime
 
 DEFAULT_DATE = datetime(2017, 1, 1)
@@ -278,3 +281,63 @@ class TestFTPSFileTransmitOperator:
         assert mock_put.call_count == 2
         for count, (args, _) in enumerate(mock_put.call_args_list):
             assert args == (remote_filepath[count], local_filepath[count])
+
+    @mock.patch("socket.gethostbyname", return_value="192.0.2.1")
+    @mock.patch("airflow.providers.ftp.hooks.ftp.FTPHook.get_conn")
+    def test_extract_get(self, get_conn, gethostbyname):
+        """GET: the remote file is the lineage input and the local file the output."""
+        # Only .host/.port are read from get_conn()'s result, so a Connection suffices.
+        get_conn.return_value = Connection(
+            conn_id="ftp_conn_id",
+            conn_type="ftp",
+            host="remotehost",
+            port=21,
+        )
+
+        task = FTPFileTransmitOperator(
+            task_id="ftp_task",
+            ftp_conn_id="ftp_conn_id",
+            dag=DAG("ftp_dag"),
+            start_date=timezone.utcnow(),
+            local_filepath="/path/to/local",
+            remote_filepath="/path/to/remote",
+            operation=FTPOperation.GET,
+        )
+        lineage = task.get_openlineage_facets_on_start()
+
+        # The resolver is mocked, so the expected local namespace does not depend on DNS.
+        gethostbyname.assert_called_with(socket.gethostname())
+
+        assert lineage.inputs == [Dataset(namespace="file://remotehost:21", name="/path/to/remote")]
+        # Local paths are namespaced with the FTP default port (21) as well.
+        assert lineage.outputs == [Dataset(namespace="file://192.0.2.1:21", name="/path/to/local")]
+
+    @mock.patch("socket.gethostbyname", return_value="192.0.2.1")
+    @mock.patch("airflow.providers.ftp.hooks.ftp.FTPHook.get_conn")
+    def test_extract_put(self, get_conn, gethostbyname):
+        """PUT: the local file is the lineage input and the remote file the output."""
+        # Only .host/.port are read from get_conn()'s result, so a Connection suffices.
+        get_conn.return_value = Connection(
+            conn_id="ftp_conn_id",
+            conn_type="ftp",
+            host="remotehost",
+            port=21,
+        )
+
+        task = FTPFileTransmitOperator(
+            task_id="ftp_task",
+            ftp_conn_id="ftp_conn_id",
+            dag=DAG("ftp_dag"),
+            start_date=timezone.utcnow(),
+            local_filepath="/path/to/local",
+            remote_filepath="/path/to/remote",
+            operation=FTPOperation.PUT,
+        )
+        lineage = task.get_openlineage_facets_on_start()
+
+        # The resolver is mocked, so the expected local namespace does not depend on DNS.
+        gethostbyname.assert_called_with(socket.gethostname())
+
+        # Local paths are namespaced with the FTP default port (21) as well.
+        assert lineage.inputs == [Dataset(namespace="file://192.0.2.1:21", name="/path/to/local")]
+        assert lineage.outputs == [Dataset(namespace="file://remotehost:21", name="/path/to/remote")]

Reply via email to