This is an automated email from the ASF dual-hosted git repository.
mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 093ab7e755 Add lineage_job_namespace and lineage_job_name OpenLineage
macros (#38829)
093ab7e755 is described below
commit 093ab7e7556bad9202e83e9fd6d968c50a5f7cb8
Author: Maxim Martynov <[email protected]>
AuthorDate: Mon Apr 8 20:17:14 2024 +0300
Add lineage_job_namespace and lineage_job_name OpenLineage macros (#38829)
---
airflow/providers/openlineage/plugins/macros.py | 43 ++++++++++++----
.../providers/openlineage/plugins/openlineage.py | 9 +++-
airflow/providers/openlineage/utils/utils.py | 2 +-
.../macros.rst | 59 ++++++++++++++--------
tests/providers/openlineage/plugins/test_macros.py | 36 ++++++++++---
5 files changed, 109 insertions(+), 40 deletions(-)
diff --git a/airflow/providers/openlineage/plugins/macros.py
b/airflow/providers/openlineage/plugins/macros.py
index 391b29495f..ddfceb3459 100644
--- a/airflow/providers/openlineage/plugins/macros.py
+++ b/airflow/providers/openlineage/plugins/macros.py
@@ -26,22 +26,41 @@ if TYPE_CHECKING:
from airflow.models import TaskInstance
+def lineage_job_namespace():
+ """
+ Macro function which returns Airflow OpenLineage namespace.
+
+ .. seealso::
+ For more information take a look at the guide:
+ :ref:`howto/macros:openlineage`
+ """
+ return conf.namespace()
+
+
+def lineage_job_name(task_instance: TaskInstance):
+ """
+ Macro function which returns Airflow task name in OpenLineage format
(`<dag_id>.<task_id>`).
+
+ .. seealso::
+ For more information take a look at the guide:
+ :ref:`howto/macros:openlineage`
+ """
+ return get_job_name(task_instance)
+
+
def lineage_run_id(task_instance: TaskInstance):
"""
- Macro function which returns the generated run id for a given task.
+ Macro function which returns the generated run id (UUID) for a given task.
This can be used to forward the run id from a task to a child run so the
job hierarchy is preserved.
.. seealso::
- For more information on how to use this operator, take a look at the
guide:
+ For more information take a look at the guide:
:ref:`howto/macros:openlineage`
"""
- if TYPE_CHECKING:
- assert task_instance.task
-
return OpenLineageAdapter.build_task_instance_run_id(
dag_id=task_instance.dag_id,
- task_id=task_instance.task.task_id,
+ task_id=task_instance.task_id,
execution_date=task_instance.execution_date,
try_number=task_instance.try_number,
)
@@ -56,9 +75,13 @@ def lineage_parent_id(task_instance: TaskInstance):
run so the job hierarchy is preserved. Child run can easily create
ParentRunFacet from this information.
.. seealso::
- For more information on how to use this macro, take a look at the
guide:
+ For more information take a look at the guide:
:ref:`howto/macros:openlineage`
"""
- job_name = get_job_name(task_instance.task)
- run_id = lineage_run_id(task_instance)
- return f"{conf.namespace()}/{job_name}/{run_id}"
+ return "/".join(
+ (
+ lineage_job_namespace(),
+ lineage_job_name(task_instance),
+ lineage_run_id(task_instance),
+ )
+ )
diff --git a/airflow/providers/openlineage/plugins/openlineage.py
b/airflow/providers/openlineage/plugins/openlineage.py
index a0be47a499..5927929588 100644
--- a/airflow/providers/openlineage/plugins/openlineage.py
+++ b/airflow/providers/openlineage/plugins/openlineage.py
@@ -19,7 +19,12 @@ from __future__ import annotations
from airflow.plugins_manager import AirflowPlugin
from airflow.providers.openlineage import conf
from airflow.providers.openlineage.plugins.listener import
get_openlineage_listener
-from airflow.providers.openlineage.plugins.macros import lineage_parent_id,
lineage_run_id
+from airflow.providers.openlineage.plugins.macros import (
+ lineage_job_name,
+ lineage_job_namespace,
+ lineage_parent_id,
+ lineage_run_id,
+)
class OpenLineageProviderPlugin(AirflowPlugin):
@@ -32,5 +37,5 @@ class OpenLineageProviderPlugin(AirflowPlugin):
name = "OpenLineageProviderPlugin"
if not conf.is_disabled():
- macros = [lineage_run_id, lineage_parent_id]
+ macros = [lineage_job_namespace, lineage_job_name, lineage_run_id,
lineage_parent_id]
listeners = [get_openlineage_listener()]
diff --git a/airflow/providers/openlineage/utils/utils.py
b/airflow/providers/openlineage/utils/utils.py
index fb2263b90d..1c777aff76 100644
--- a/airflow/providers/openlineage/utils/utils.py
+++ b/airflow/providers/openlineage/utils/utils.py
@@ -56,7 +56,7 @@ def get_operator_class(task: BaseOperator) -> type:
return task.__class__
-def get_job_name(task):
+def get_job_name(task: TaskInstance) -> str:
return f"{task.dag_id}.{task.task_id}"
diff --git a/docs/apache-airflow-providers-openlineage/macros.rst
b/docs/apache-airflow-providers-openlineage/macros.rst
index 72c1e3a7a5..3ce285f966 100644
--- a/docs/apache-airflow-providers-openlineage/macros.rst
+++ b/docs/apache-airflow-providers-openlineage/macros.rst
@@ -24,30 +24,49 @@ Macros included in OpenLineage plugin get integrated to
Airflow's main collectio
They can be invoked as a Jinja template, e.g.
-Lineage run id
---------------
+Lineage job & run macros
+------------------------
+
+These macros:
+ * ``lineage_job_namespace()``
+ * ``lineage_job_name(task_instance)``
+ * ``lineage_run_id(task_instance)``
+
+allow injecting pieces of run information of a given Airflow task into the
arguments sent to a remote processing job.
+For example, ``SparkSubmitOperator`` can be set up like this:
+
.. code-block:: python
- PythonOperator(
- task_id="render_template",
- python_callable=my_task_function,
- op_args=[
- "{{
macros.OpenLineageProviderPlugin.lineage_run_id(task_instance) }}"
- ], # lineage_run_id macro invoked
- provide_context=False,
- dag=dag,
- )
+ SparkSubmitOperator(
+ task_id="my_task",
+ application="/script.py",
+ conf={
+ # separated components
+ "spark.openlineage.parentJobNamespace": "{{
macros.OpenLineageProviderPlugin.lineage_job_namespace() }}",
+ "spark.openlineage.parentJobName": "{{
macros.OpenLineageProviderPlugin.lineage_job_name(task_instance) }}",
+ "spark.openlineage.parentRunId": "{{
macros.OpenLineageProviderPlugin.lineage_run_id(task_instance) }}",
+ },
+ )
Lineage parent id
-----------------
+
+The same information, compacted into one string, can be passed using the
``lineage_parent_id(task_instance)`` macro:
+
.. code-block:: python
- PythonOperator(
- task_id="render_template",
- python_callable=my_task_function,
- op_args=[
- "{{ macros.OpenLineageProviderPlugin.lineage_parent_id(run_id,
task_instance) }}"
- ], # lineage_parent_id macro invoked
- provide_context=False,
- dag=dag,
- )
+ def my_task_function(templates_dict, **kwargs):
+ parent_job_namespace, parent_job_name, parent_run_id =
templates_dict["parentRun"].split("/")
+ ...
+
+
+ PythonOperator(
+ task_id="render_template",
+ python_callable=my_task_function,
+ templates_dict={
+ # joined components as one string `<namespace>/<name>/<run_id>`
+ "parentRun": "{{
macros.OpenLineageProviderPlugin.lineage_parent_id(task_instance) }}",
+ },
+ provide_context=False,
+ dag=dag,
+ )
diff --git a/tests/providers/openlineage/plugins/test_macros.py
b/tests/providers/openlineage/plugins/test_macros.py
index 9e2160aa19..a735312ab6 100644
--- a/tests/providers/openlineage/plugins/test_macros.py
+++ b/tests/providers/openlineage/plugins/test_macros.py
@@ -20,16 +20,38 @@ import uuid
from unittest import mock
from airflow.providers.openlineage.conf import namespace
-from airflow.providers.openlineage.plugins.macros import lineage_parent_id,
lineage_run_id
+from airflow.providers.openlineage.plugins.macros import (
+ lineage_job_name,
+ lineage_job_namespace,
+ lineage_parent_id,
+ lineage_run_id,
+)
_DAG_NAMESPACE = namespace()
+def test_lineage_job_namespace():
+ assert lineage_job_namespace() == _DAG_NAMESPACE
+
+
+def test_lineage_job_name():
+ task_instance = mock.MagicMock(
+ dag_id="dag_id",
+ task_id="task_id",
+ execution_date="execution_date",
+ try_number=1,
+ )
+ assert lineage_job_name(task_instance) == "dag_id.task_id"
+
+
def test_lineage_run_id():
- task = mock.MagicMock(
- dag_id="dag_id", execution_date="execution_date", try_number=1,
task=mock.MagicMock(task_id="task_id")
+ task_instance = mock.MagicMock(
+ dag_id="dag_id",
+ task_id="task_id",
+ execution_date="execution_date",
+ try_number=1,
)
- actual = lineage_run_id(task)
+ actual = lineage_run_id(task_instance)
expected = str(
uuid.uuid3(
uuid.NAMESPACE_URL,
@@ -42,12 +64,12 @@ def test_lineage_run_id():
@mock.patch("airflow.providers.openlineage.plugins.macros.lineage_run_id")
def test_lineage_parent_id(mock_run_id):
mock_run_id.return_value = "run_id"
- task = mock.MagicMock(
+ task_instance = mock.MagicMock(
dag_id="dag_id",
+ task_id="task_id",
execution_date="execution_date",
try_number=1,
- task=mock.MagicMock(task_id="task_id", dag_id="dag_id"),
)
- actual = lineage_parent_id(task_instance=task)
+ actual = lineage_parent_id(task_instance)
expected = f"{_DAG_NAMESPACE}/dag_id.task_id/run_id"
assert actual == expected