This is an automated email from the ASF dual-hosted git repository.
mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new e1dd9b5cd2 openlineage, ftp: add OpenLineage support for FTPFileTransferOperator (#31354)
e1dd9b5cd2 is described below
commit e1dd9b5cd2c5b19aec152f097709c7de02c42f34
Author: Maciej Obuchowski <[email protected]>
AuthorDate: Sun Jul 30 21:26:33 2023 +0200
openlineage, ftp: add OpenLineage support for FTPFileTransferOperator (#31354)
Signed-off-by: Maciej Obuchowski <[email protected]>
Co-authored-by: eladkal <[email protected]>
---
airflow/providers/ftp/operators/ftp.py | 62 +++++++++++++++++++++++++++++
generated/provider_dependencies.json | 4 +-
tests/providers/ftp/operators/test_ftp.py | 65 ++++++++++++++++++++++++++++++-
3 files changed, 129 insertions(+), 2 deletions(-)
diff --git a/airflow/providers/ftp/operators/ftp.py b/airflow/providers/ftp/operators/ftp.py
index 8c935751d4..45bccbea4c 100644
--- a/airflow/providers/ftp/operators/ftp.py
+++ b/airflow/providers/ftp/operators/ftp.py
@@ -19,6 +19,8 @@
from __future__ import annotations
import os
+import socket
+from ftplib import FTP_PORT
from functools import cached_property
from pathlib import Path
from typing import Any, Sequence
@@ -135,6 +137,66 @@ class FTPFileTransmitOperator(BaseOperator):
return self.local_filepath
def get_openlineage_facets_on_start(self):
    """
    Return OpenLineage input/output datasets for this file transfer.

    One ``Dataset`` is created per local path and per remote path. Each dataset's
    ``name`` is the file path; its ``namespace`` is built by ``_get_namespace``:

    - local files:  ``file://<local host IP, or hostname if it does not resolve>:<FTP_PORT>``
    - remote files: ``file://<conn.host>:<conn.port, or FTP_PORT if unset>``

    Which side is the input depends on ``self.operation``: for ``FTPOperation.GET`` the
    remote files are the inputs and the local files the outputs; for any other
    operation (e.g. PUT) the local files are the inputs and the remote files the outputs.

    :return: ``OperatorLineage`` with ``inputs`` and ``outputs`` populated.
    """
    from openlineage.client.run import Dataset

    from airflow.providers.openlineage.extractors import OperatorLineage

    scheme = "file"

    local_host = socket.gethostname()
    try:
        local_host = socket.gethostbyname(local_host)
    except Exception as e:
        # Deliberately broad: a resolution failure only degrades the namespace, so fall
        # back to the unresolved hostname instead of raising.
        self.log.warning(
            "Failed to resolve local hostname. "
            "Using the hostname returned by socket.gethostname() without resolution. %s",
            e,
            exc_info=True,
        )

    # NOTE(review): get_conn() may open a live FTP connection just to read host/port;
    # confirm whether the Airflow connection metadata could be used instead.
    conn = self.hook.get_conn()
    remote_host = conn.host
    remote_port = conn.port

    # Each filepath attribute may be a single path string or a sequence of paths.
    local_filepaths = (
        [self.local_filepath] if isinstance(self.local_filepath, str) else self.local_filepath
    )
    remote_filepaths = (
        [self.remote_filepath] if isinstance(self.remote_filepath, str) else self.remote_filepath
    )

    local_datasets = [
        Dataset(namespace=self._get_namespace(scheme, local_host, None, path), name=path)
        for path in local_filepaths
    ]
    remote_datasets = [
        Dataset(namespace=self._get_namespace(scheme, remote_host, remote_port, path), name=path)
        for path in remote_filepaths
    ]

    # GET downloads remote -> local; everything else uploads local -> remote.
    if self.operation.lower() == FTPOperation.GET:
        inputs, outputs = remote_datasets, local_datasets
    else:
        inputs, outputs = local_datasets, remote_datasets

    return OperatorLineage(inputs=inputs, outputs=outputs)
+
+ def _get_namespace(self, scheme, host, port, path) -> str:
+ port = port or FTP_PORT
+ authority = f"{host}:{port}"
+ return f"{scheme}://{authority}"
+
class FTPSFileTransmitOperator(FTPFileTransmitOperator):
"""
diff --git a/generated/provider_dependencies.json b/generated/provider_dependencies.json
index 19cdf951cf..fadffea64a 100644
--- a/generated/provider_dependencies.json
+++ b/generated/provider_dependencies.json
@@ -388,7 +388,9 @@
"deps": [
"apache-airflow>=2.4.0"
],
- "cross-providers-deps": [],
+ "cross-providers-deps": [
+ "openlineage"
+ ],
"excluded-python-versions": []
},
"github": {
diff --git a/tests/providers/ftp/operators/test_ftp.py b/tests/providers/ftp/operators/test_ftp.py
index f62e3f2bd4..81b554b26b 100644
--- a/tests/providers/ftp/operators/test_ftp.py
+++ b/tests/providers/ftp/operators/test_ftp.py
@@ -17,16 +17,19 @@
# under the License.
from __future__ import annotations
+import socket
from unittest import mock
import pytest
+from openlineage.client.run import Dataset
-from airflow.models import DAG
+from airflow.models import DAG, Connection
from airflow.providers.ftp.operators.ftp import (
FTPFileTransmitOperator,
FTPOperation,
FTPSFileTransmitOperator,
)
+from airflow.utils import timezone
from airflow.utils.timezone import datetime
# Fixed date for this test module, built with the ``datetime`` helper imported from
# airflow.utils.timezone. NOTE(review): presumably used as the DAG start/logical date
# by tests elsewhere in this file — confirm against its usages.
DEFAULT_DATE = datetime(2017, 1, 1)
@@ -278,3 +281,63 @@ class TestFTPSFileTransmitOperator:
assert mock_put.call_count == 2
for count, (args, _) in enumerate(mock_put.call_args_list):
assert args == (remote_filepath[count], local_filepath[count])
+
@mock.patch("airflow.providers.ftp.hooks.ftp.FTPHook.get_conn", spec=Connection)
def test_extract_get(self, get_conn):
    """
    GET: the remote file is the lineage input and the local file the output.

    ``socket.gethostbyname`` is patched so the expected local namespace does not
    depend on whether the test machine's hostname resolves via DNS.
    """
    get_conn.return_value = Connection(
        conn_id="ftp_conn_id",
        conn_type="ftp",
        host="remotehost",
        port=21,
    )

    task = FTPFileTransmitOperator(
        task_id="ftp_task",
        ftp_conn_id="ftp_conn_id",
        dag=DAG("ftp_dag"),
        start_date=timezone.utcnow(),
        local_filepath="/path/to/local",
        remote_filepath="/path/to/remote",
        operation=FTPOperation.GET,
    )
    with mock.patch.object(socket, "gethostbyname", return_value="192.0.2.10"):
        lineage = task.get_openlineage_facets_on_start()

    assert lineage.inputs == [Dataset(namespace="file://remotehost:21", name="/path/to/remote")]
    assert lineage.outputs == [Dataset(namespace="file://192.0.2.10:21", name="/path/to/local")]
+
@mock.patch("airflow.providers.ftp.hooks.ftp.FTPHook.get_conn", spec=Connection)
def test_extract_put(self, get_conn):
    """
    PUT: the local file is the lineage input and the remote file the output.

    ``socket.gethostbyname`` is patched so the expected local namespace does not
    depend on whether the test machine's hostname resolves via DNS.
    """
    get_conn.return_value = Connection(
        conn_id="ftp_conn_id",
        conn_type="ftp",
        host="remotehost",
        port=21,
    )

    task = FTPFileTransmitOperator(
        task_id="ftp_task",
        ftp_conn_id="ftp_conn_id",
        dag=DAG("ftp_dag"),
        start_date=timezone.utcnow(),
        local_filepath="/path/to/local",
        remote_filepath="/path/to/remote",
        operation=FTPOperation.PUT,
    )
    with mock.patch.object(socket, "gethostbyname", return_value="192.0.2.10"):
        lineage = task.get_openlineage_facets_on_start()

    assert lineage.inputs == [Dataset(namespace="file://192.0.2.10:21", name="/path/to/local")]
    assert lineage.outputs == [Dataset(namespace="file://remotehost:21", name="/path/to/remote")]