This is an automated email from the ASF dual-hosted git repository. ash pushed a commit to branch v2-0-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit ac00eab33446171c2776d2822621fa8845a4bfcd Author: João Ponte <[email protected]> AuthorDate: Sat Apr 3 10:26:59 2021 +0200 Restore base lineage backend (#14146) This adds back the base lineage backend which can be extended to send lineage metadata to any custom backend. closes: #14106 Co-authored-by: Joao Ponte <[email protected]> Co-authored-by: Tomek Urbaszek <[email protected]> (cherry picked from commit af2d11e36ed43b0103a54780640493b8ae46d70e) --- airflow/lineage/__init__.py | 22 ++++++++++++++++++ airflow/lineage/backend.py | 47 +++++++++++++++++++++++++++++++++++++++ docs/apache-airflow/lineage.rst | 21 ++++++++++++++++++ tests/lineage/test_lineage.py | 49 ++++++++++++++++++++++++++++++++++++++++- 4 files changed, 138 insertions(+), 1 deletion(-) diff --git a/airflow/lineage/__init__.py b/airflow/lineage/__init__.py index 65f19ef..905eb00 100644 --- a/airflow/lineage/__init__.py +++ b/airflow/lineage/__init__.py @@ -25,6 +25,8 @@ import attr import jinja2 from cattr import structure, unstructure +from airflow.configuration import conf +from airflow.lineage.backend import LineageBackend from airflow.utils.module_loading import import_string ENV = jinja2.Environment() @@ -45,6 +47,22 @@ class Metadata: data: Dict = attr.ib() +def get_backend() -> Optional[LineageBackend]: + """Gets the lineage backend if defined in the configs""" + clazz = conf.getimport("lineage", "backend", fallback=None) + + if clazz: + if not issubclass(clazz, LineageBackend): + raise TypeError( + f"Your custom Lineage class `{clazz.__name__}` " + f"is not a subclass of `{LineageBackend.__name__}`." + ) + else: + return clazz() + + return None + + def _get_instance(meta: Metadata): """Instantiate an object from Metadata""" cls = import_string(meta.type_name) @@ -82,6 +100,7 @@ def apply_lineage(func: T) -> T: Saves the lineage to XCom and if configured to do so sends it to the backend. 
""" + _backend = get_backend() @wraps(func) def wrapper(self, context, *args, **kwargs): @@ -101,6 +120,9 @@ def apply_lineage(func: T) -> T: context, key=PIPELINE_INLETS, value=inlets, execution_date=context['ti'].execution_date ) + if _backend: + _backend.send_lineage(operator=self, inlets=self.inlets, outlets=self.outlets, context=context) + return ret_val return cast(T, wrapper) diff --git a/airflow/lineage/backend.py b/airflow/lineage/backend.py new file mode 100644 index 0000000..edfbe0e --- /dev/null +++ b/airflow/lineage/backend.py @@ -0,0 +1,47 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+"""Sends lineage metadata to a backend""" +from typing import TYPE_CHECKING, Optional + +if TYPE_CHECKING: + from airflow.models.baseoperator import BaseOperator # pylint: disable=cyclic-import + + +class LineageBackend: + """Sends lineage metadata to a backend""" + + def send_lineage( + self, + operator: 'BaseOperator', + inlets: Optional[list] = None, + outlets: Optional[list] = None, + context: Optional[dict] = None, + ): + """ + Sends lineage metadata to a backend + + :param operator: the operator executing a transformation on the inlets and outlets + :type operator: airflow.models.baseoperator.BaseOperator + :param inlets: the inlets to this operator + :type inlets: list + :param outlets: the outlets from this operator + :type outlets: list + :param context: the current context of the task instance + :type context: dict + """ + raise NotImplementedError() diff --git a/docs/apache-airflow/lineage.rst b/docs/apache-airflow/lineage.rst index a29f042..362d3e6 100644 --- a/docs/apache-airflow/lineage.rst +++ b/docs/apache-airflow/lineage.rst @@ -95,3 +95,24 @@ has outlets defined (e.g. by using ``add_outlets(..)`` or has out of the box sup f_in > run_this | (run_this_last > outlets) .. _precedence: https://docs.python.org/3/reference/expressions.html + + +Lineage Backend +--------------- + +It's possible to push the lineage metrics to a custom backend by providing an instance of a LinageBackend in the config: + +.. code-block:: ini + + [lineage] + backend = my.lineage.CustomBackend + +The backend should inherit from ``airflow.lineage.LineageBackend``. + +.. 
code-block:: python + + from airflow.lineage.backend import LineageBackend + + class ExampleBackend(LineageBackend): + def send_lineage(self, operator, inlets=None, outlets=None, context=None): + pass  # Send the info to some external service diff --git a/tests/lineage/test_lineage.py b/tests/lineage/test_lineage.py index 350a8be..b5ebbea 100644 --- a/tests/lineage/test_lineage.py +++ b/tests/lineage/test_lineage.py @@ -16,16 +16,24 @@ # specific language governing permissions and limitations # under the License. import unittest +from unittest import mock -from airflow.lineage import AUTO +from airflow.lineage import AUTO, apply_lineage, get_backend, prepare_lineage +from airflow.lineage.backend import LineageBackend from airflow.lineage.entities import File from airflow.models import DAG, TaskInstance as TI from airflow.operators.dummy import DummyOperator from airflow.utils import timezone +from tests.test_utils.config import conf_vars DEFAULT_DATE = timezone.datetime(2016, 1, 1) +class CustomLineageBackend(LineageBackend): + def send_lineage(self, operator, inlets=None, outlets=None, context=None): + pass + + class TestLineage(unittest.TestCase): def test_lineage(self): dag = DAG(dag_id='test_prepare_lineage', start_date=DEFAULT_DATE) @@ -111,3 +119,42 @@ class TestLineage(unittest.TestCase): op1.pre_execute(ctx1) assert op1.inlets[0].url == f1s.format(DEFAULT_DATE) assert op1.outlets[0].url == f1s.format(DEFAULT_DATE) + + @mock.patch("airflow.lineage.get_backend") + def test_lineage_is_sent_to_backend(self, mock_get_backend): + class TestBackend(LineageBackend): + def send_lineage(self, operator, inlets=None, outlets=None, context=None): + assert len(inlets) == 1 + assert len(outlets) == 1 + + func = mock.Mock() + func.__name__ = 'foo' + + mock_get_backend.return_value = TestBackend() + + dag = DAG(dag_id='test_lineage_is_sent_to_backend', start_date=DEFAULT_DATE) + + with dag: + op1 = DummyOperator(task_id='task1') + + file1 = File("/tmp/some_file") + +
op1.inlets.append(file1) + op1.outlets.append(file1) + + ctx1 = {"ti": TI(task=op1, execution_date=DEFAULT_DATE), "execution_date": DEFAULT_DATE} + + prep = prepare_lineage(func) + prep(op1, ctx1) + post = apply_lineage(func) + post(op1, ctx1) + + def test_empty_lineage_backend(self): + backend = get_backend() + assert backend is None + + @conf_vars({("lineage", "backend"): "tests.lineage.test_lineage.CustomLineageBackend"}) + def test_resolve_lineage_class(self): + backend = get_backend() + assert issubclass(backend.__class__, LineageBackend) + assert isinstance(backend, CustomLineageBackend)
