This is an automated email from the ASF dual-hosted git repository.
mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 1a16b9d3d0f feat(openlineage): Add parentRunFacet for DAG events
(#57809)
1a16b9d3d0f is described below
commit 1a16b9d3d0f59978a6f7abe60188c7cf5f1a1570
Author: Kacper Muda <[email protected]>
AuthorDate: Thu Nov 6 14:36:49 2025 +0100
feat(openlineage): Add parentRunFacet for DAG events (#57809)
---
providers/openlineage/docs/guides/user.rst | 50 +++++
.../providers/openlineage/plugins/listener.py | 43 +++-
.../providers/openlineage/plugins/macros.py | 70 ++++---
.../providers/openlineage/plugins/openlineage.py | 2 +
.../airflow/providers/openlineage/utils/utils.py | 168 ++++++++++++++-
.../openlineage/example_openlineage_trigger_dag.py | 13 +-
.../expected_events/openlineage_trigger_dag.json | 160 +++++++++++++-
.../tests/unit/openlineage/plugins/test_macros.py | 210 ++++++++++++++++++-
.../tests/unit/openlineage/utils/test_utils.py | 233 +++++++++++++++++++++
9 files changed, 897 insertions(+), 52 deletions(-)
diff --git a/providers/openlineage/docs/guides/user.rst
b/providers/openlineage/docs/guides/user.rst
index 730d7d0f2ff..50961dc08a5 100644
--- a/providers/openlineage/docs/guides/user.rst
+++ b/providers/openlineage/docs/guides/user.rst
@@ -478,6 +478,56 @@ You can enable this automation by setting
``spark_inject_transport_info`` option
AIRFLOW__OPENLINEAGE__SPARK_INJECT_TRANSPORT_INFO=true
+Passing parent information to Airflow DAG
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+To enable full OpenLineage lineage tracking across dependent DAGs, you can
pass parent and root job information
+through the DAG's ``dag_run.conf``. When a DAG run configuration includes an
``openlineage`` section with valid metadata,
+this information is automatically parsed and converted into the DAG run's
``parentRunFacet``, from which the root information
+is also propagated to all tasks. If no DAG run ``openlineage`` configuration
is provided, the DAG run will not contain
+``parentRunFacet`` and the root of all tasks will default to the DAG run.
+
+The ``openlineage`` dict in conf should contain the following keys:
+
+
+*(all three values must be included to create a parent reference)*
+
+- **parentRunId** — the unique run ID (uuid) of the direct parent job
+- **parentJobName** — the name of the parent job
+- **parentJobNamespace** — the namespace of the parent job
+
+*(all three values must be included to create a root reference; otherwise the
parent will be used as the root)*
+
+- **rootParentRunId** — the run ID (uuid) of the top-level (root) job
+- **rootParentJobName** — the name of the top-level (root) job
+- **rootParentJobNamespace** — the namespace of the top-level (root) job
+
+.. note::
+
+ We highly recommend providing all six OpenLineage identifiers (parent and
root) to ensure complete lineage tracking. If the root information is missing,
the parent set will be used as the root; if any of the three parent fields is
missing, no parent facet will be created. Partial or mixed configurations are
not supported: each set (parent or root) is used only when all three of its
values are provided together.
+
+
+Example:
+
+.. code-block:: shell
+
+ curl -X POST "http://<AIRFLOW_HOST>/api/v2/dags/my_dag_name/dagRuns" \
+ -H "Content-Type: application/json" \
+ -d '{
+ "logical_date": "2019-08-24T14:15:22Z",
+ "conf": {
+ "openlineage": {
+ "parentRunId": "3bb703d1-09c1-4a42-8da5-35a0b3216072",
+ "parentJobNamespace": "prod_biz",
+ "parentJobName": "get_files",
+ "rootParentRunId": "9d3b14f7-de91-40b6-aeef-e887e2c7673e",
+ "rootParentJobNamespace": "prod_analytics",
+ "rootParentJobName": "generate_report_sales_e2e"
+ }
+ }
+ }'
+
+
Troubleshooting
===============
diff --git
a/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py
b/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py
index bd1931e3ca9..d7e87c86965 100644
---
a/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py
+++
b/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py
@@ -41,7 +41,9 @@ from airflow.providers.openlineage.utils.utils import (
get_airflow_mapped_task_facet,
get_airflow_run_facet,
get_dag_documentation,
+ get_dag_parent_run_facet,
get_job_name,
+ get_root_information_from_dagrun_conf,
get_task_documentation,
get_task_parent_run_facet,
get_user_provided_run_facets,
@@ -224,7 +226,11 @@ class OpenLineageListener:
task=task_metadata,
run_facets={
**get_user_provided_run_facets(task_instance,
TaskInstanceState.RUNNING),
- **get_task_parent_run_facet(parent_run_id=parent_run_id,
parent_job_name=dag.dag_id),
+ **get_task_parent_run_facet(
+ parent_run_id=parent_run_id,
+ parent_job_name=dag.dag_id,
+
**get_root_information_from_dagrun_conf(getattr(dagrun, "conf", {})),
+ ),
**get_airflow_mapped_task_facet(task_instance),
**get_airflow_run_facet(dagrun, dag, task_instance, task,
task_uuid),
**debug_facet,
@@ -351,7 +357,11 @@ class OpenLineageListener:
nominal_end_time=data_interval_end,
run_facets={
**get_user_provided_run_facets(task_instance,
TaskInstanceState.SUCCESS),
- **get_task_parent_run_facet(parent_run_id=parent_run_id,
parent_job_name=dag.dag_id),
+ **get_task_parent_run_facet(
+ parent_run_id=parent_run_id,
+ parent_job_name=dag.dag_id,
+
**get_root_information_from_dagrun_conf(getattr(dagrun, "conf", {})),
+ ),
**get_airflow_run_facet(dagrun, dag, task_instance, task,
task_uuid),
**get_airflow_debug_facet(),
},
@@ -489,7 +499,11 @@ class OpenLineageListener:
job_description_type=doc_type,
run_facets={
**get_user_provided_run_facets(task_instance,
TaskInstanceState.FAILED),
- **get_task_parent_run_facet(parent_run_id=parent_run_id,
parent_job_name=dag.dag_id),
+ **get_task_parent_run_facet(
+ parent_run_id=parent_run_id,
+ parent_job_name=dag.dag_id,
+
**get_root_information_from_dagrun_conf(getattr(dagrun, "conf", {})),
+ ),
**get_airflow_run_facet(dagrun, dag, task_instance, task,
task_uuid),
**get_airflow_debug_facet(),
},
@@ -540,7 +554,11 @@ class OpenLineageListener:
"job_description": None,
"job_description_type": None,
"run_facets": {
- **get_task_parent_run_facet(parent_run_id=parent_run_id,
parent_job_name=ti.dag_id),
+ **get_task_parent_run_facet(
+ parent_run_id=parent_run_id,
+ parent_job_name=ti.dag_id,
+
**get_root_information_from_dagrun_conf(getattr(dagrun, "conf", {})),
+ ),
**get_airflow_debug_facet(),
},
}
@@ -645,8 +663,6 @@ class OpenLineageListener:
)
data_interval_end = dag_run.data_interval_end.isoformat() if
dag_run.data_interval_end else None
- run_facets = {**get_airflow_dag_run_facet(dag_run)}
-
date = dag_run.logical_date
if AIRFLOW_V_3_0_PLUS and date is None:
date = dag_run.run_after
@@ -660,7 +676,6 @@ class OpenLineageListener:
start_date=dag_run.start_date,
nominal_start_time=data_interval_start,
nominal_end_time=data_interval_end,
- run_facets=run_facets,
clear_number=dag_run.clear_number,
owners=[x.strip() for x in dag_run.dag.owner.split(",")] if
dag_run.dag else None,
job_description=doc,
@@ -669,6 +684,10 @@ class OpenLineageListener:
# AirflowJobFacet should be created outside
ProcessPoolExecutor that pickles objects,
# as it causes lack of some TaskGroup attributes and crashes
event emission.
job_facets=get_airflow_job_facet(dag_run=dag_run),
+ run_facets={
+ **get_airflow_dag_run_facet(dag_run),
+ **get_dag_parent_run_facet(getattr(dag_run, "conf", {})),
+ },
)
except BaseException as e:
self.log.warning("OpenLineage received exception in method
on_dag_run_running", exc_info=e)
@@ -716,7 +735,10 @@ class OpenLineageListener:
job_description_type=doc_type,
task_ids=task_ids,
dag_run_state=dag_run.get_state(),
- run_facets={**get_airflow_dag_run_facet(dag_run)},
+ run_facets={
+ **get_airflow_dag_run_facet(dag_run),
+ **get_dag_parent_run_facet(getattr(dag_run, "conf", {})),
+ },
)
except BaseException as e:
self.log.warning("OpenLineage received exception in method
on_dag_run_success", exc_info=e)
@@ -765,7 +787,10 @@ class OpenLineageListener:
dag_run_state=dag_run.get_state(),
task_ids=task_ids,
msg=msg,
- run_facets={**get_airflow_dag_run_facet(dag_run)},
+ run_facets={
+ **get_airflow_dag_run_facet(dag_run),
+ **get_dag_parent_run_facet(getattr(dag_run, "conf", {})),
+ },
)
except BaseException as e:
self.log.warning("OpenLineage received exception in method
on_dag_run_failed", exc_info=e)
diff --git
a/providers/openlineage/src/airflow/providers/openlineage/plugins/macros.py
b/providers/openlineage/src/airflow/providers/openlineage/plugins/macros.py
index ac9c75b4a94..508c89eafda 100644
--- a/providers/openlineage/src/airflow/providers/openlineage/plugins/macros.py
+++ b/providers/openlineage/src/airflow/providers/openlineage/plugins/macros.py
@@ -20,7 +20,7 @@ from typing import TYPE_CHECKING
from airflow.providers.openlineage import conf
from airflow.providers.openlineage.plugins.adapter import OpenLineageAdapter
-from airflow.providers.openlineage.utils.utils import get_job_name
+from airflow.providers.openlineage.utils.utils import get_job_name,
get_root_information_from_dagrun_conf
from airflow.providers.openlineage.version_compat import AIRFLOW_V_3_0_PLUS
if TYPE_CHECKING:
@@ -102,7 +102,7 @@ def lineage_root_parent_id(task_instance: TaskInstance):
"""
return "/".join(
(
- lineage_job_namespace(),
+ lineage_root_job_namespace(task_instance),
lineage_root_job_name(task_instance),
lineage_root_run_id(task_instance),
)
@@ -110,10 +110,16 @@ def lineage_root_parent_id(task_instance: TaskInstance):
def lineage_root_job_name(task_instance: TaskInstance):
+ root_parent_job_name = _get_ol_root_id("root_parent_job_name",
task_instance)
+ if root_parent_job_name:
+ return root_parent_job_name
return task_instance.dag_id
def lineage_root_run_id(task_instance: TaskInstance):
+ root_parent_run_id = _get_ol_root_id("root_parent_run_id", task_instance)
+ if root_parent_run_id:
+ return root_parent_run_id
return OpenLineageAdapter.build_dag_run_id(
dag_id=task_instance.dag_id,
logical_date=_get_logical_date(task_instance),
@@ -121,32 +127,44 @@ def lineage_root_run_id(task_instance: TaskInstance):
)
+def lineage_root_job_namespace(task_instance: TaskInstance):
+ root_parent_job_namespace = _get_ol_root_id("root_parent_job_namespace",
task_instance)
+ if root_parent_job_namespace:
+ return root_parent_job_namespace
+ return conf.namespace()
+
+
+def _get_ol_root_id(id_key: str, task_instance: TaskInstance) -> str | None:
+ dr_conf = _get_dag_run_conf(task_instance=task_instance)
+ ol_root_info = get_root_information_from_dagrun_conf(dr_conf=dr_conf)
+ if ol_root_info and ol_root_info.get(id_key):
+ return ol_root_info[id_key]
+ return None
+
+
+def _get_dagrun_from_ti(task_instance: TaskInstance):
+ context = task_instance.get_template_context()
+ if getattr(task_instance, "dag_run", None):
+ return task_instance.dag_run
+ return context["dag_run"]
+
+
+def _get_dag_run_conf(task_instance: TaskInstance) -> dict:
+ dr = _get_dagrun_from_ti(task_instance=task_instance)
+ return dr.conf or {}
+
+
def _get_dag_run_clear_number(task_instance: TaskInstance):
- # todo: remove when min airflow version >= 3.0
- if AIRFLOW_V_3_0_PLUS:
- context = task_instance.get_template_context()
- if hasattr(task_instance, "dag_run"):
- dag_run = task_instance.dag_run
- else:
- dag_run = context["dag_run"]
- return dag_run.clear_number
- return task_instance.dag_run.clear_number
+ dr = _get_dagrun_from_ti(task_instance=task_instance)
+ return dr.clear_number
def _get_logical_date(task_instance):
- # todo: remove when min airflow version >= 3.0
if AIRFLOW_V_3_0_PLUS:
- context = task_instance.get_template_context()
- if hasattr(task_instance, "dag_run"):
- dag_run = task_instance.dag_run
- else:
- dag_run = context["dag_run"]
- if hasattr(dag_run, "logical_date") and dag_run.logical_date:
- date = dag_run.logical_date
- else:
- date = dag_run.run_after
- elif hasattr(task_instance, "logical_date"):
- date = task_instance.logical_date
- else:
- date = task_instance.execution_date
- return date
+ dr = _get_dagrun_from_ti(task_instance=task_instance)
+ if getattr(dr, "logical_date", None):
+ return dr.logical_date
+ return dr.run_after
+ if getattr(task_instance, "logical_date", None):
+ return task_instance.logical_date
+ return task_instance.execution_date
diff --git
a/providers/openlineage/src/airflow/providers/openlineage/plugins/openlineage.py
b/providers/openlineage/src/airflow/providers/openlineage/plugins/openlineage.py
index 964cac27254..8e422d84ac6 100644
---
a/providers/openlineage/src/airflow/providers/openlineage/plugins/openlineage.py
+++
b/providers/openlineage/src/airflow/providers/openlineage/plugins/openlineage.py
@@ -28,6 +28,7 @@ if not conf.is_disabled():
lineage_job_namespace,
lineage_parent_id,
lineage_root_job_name,
+ lineage_root_job_namespace,
lineage_root_parent_id,
lineage_root_run_id,
lineage_run_id,
@@ -51,6 +52,7 @@ class OpenLineageProviderPlugin(AirflowPlugin):
lineage_parent_id,
lineage_root_run_id,
lineage_root_job_name,
+ lineage_root_job_namespace,
lineage_root_parent_id,
]
listeners = [get_openlineage_listener()]
diff --git
a/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py
b/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py
index bd3901e84e7..ac86ce28453 100644
--- a/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py
+++ b/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py
@@ -147,27 +147,175 @@ def get_job_name(task: TaskInstance |
RuntimeTaskInstance) -> str:
return f"{task.dag_id}.{task.task_id}"
-def get_task_parent_run_facet(
- parent_run_id: str, parent_job_name: str, parent_job_namespace: str =
conf.namespace()
+def _get_parent_run_facet(
+ parent_run_id: str,
+ parent_job_name: str,
+ parent_job_namespace: str = conf.namespace(),
+ root_parent_run_id: str | None = None,
+ root_parent_job_name: str | None = None,
+ root_parent_job_namespace: str | None = None,
) -> dict[str, Any]:
- """
- Retrieve the parent run facet for task-level events.
-
- This facet currently always points to the DAG-level run ID and name,
- as external events for DAG runs are not yet handled.
- """
+ """Create the parent run facet from identifiers."""
+ root_parent_run_id = root_parent_run_id or parent_run_id
+ root_parent_job_name = root_parent_job_name or parent_job_name
+ root_parent_job_namespace = root_parent_job_namespace or
parent_job_namespace
return {
"parent": parent_run.ParentRunFacet(
run=parent_run.Run(runId=parent_run_id),
job=parent_run.Job(namespace=parent_job_namespace,
name=parent_job_name),
root=parent_run.Root(
- run=parent_run.RootRun(runId=parent_run_id),
- job=parent_run.RootJob(namespace=parent_job_namespace,
name=parent_job_name),
+ run=parent_run.RootRun(runId=root_parent_run_id),
+ job=parent_run.RootJob(namespace=root_parent_job_namespace,
name=root_parent_job_name),
),
)
}
+def get_task_parent_run_facet(
+ parent_run_id: str,
+ parent_job_name: str,
+ parent_job_namespace: str = conf.namespace(),
+ root_parent_run_id: str | None = None,
+ root_parent_job_name: str | None = None,
+ root_parent_job_namespace: str | None = None,
+) -> dict[str, Any]:
+ """Retrieve the parent run facet."""
+ return _get_parent_run_facet(
+ parent_run_id=parent_run_id,
+ parent_job_namespace=parent_job_namespace,
+ parent_job_name=parent_job_name,
+ root_parent_run_id=root_parent_run_id,
+ root_parent_job_namespace=root_parent_job_namespace,
+ root_parent_job_name=root_parent_job_name,
+ )
+
+
+def _get_openlineage_data_from_dagrun_conf(dr_conf: dict | None) -> dict:
+ """Return the 'openlineage' section from a DAG run config if valid,
otherwise an empty dict."""
+ if not dr_conf or not isinstance(dr_conf, dict):
+ return {}
+ ol_data = dr_conf.get("openlineage")
+ return ol_data if isinstance(ol_data, dict) else {}
+
+
+def get_root_information_from_dagrun_conf(dr_conf: dict | None) -> dict[str,
str]:
+ """Extract root parent run and job information from a DAG run config."""
+ ol_data = _get_openlineage_data_from_dagrun_conf(dr_conf)
+ if not ol_data:
+ log.debug("No 'openlineage' data found in DAG run config.")
+ return {}
+
+ root_run_id = ol_data.get("rootParentRunId", "")
+ root_namespace = ol_data.get("rootParentJobNamespace", "")
+ root_name = ol_data.get("rootParentJobName", "")
+
+ all_root_info = (root_run_id, root_namespace, root_name)
+ if not all(all_root_info):
+ if any(all_root_info):
+ log.warning(
+ "Incomplete root OpenLineage information in DAG run config. "
+ "No root information will be used. Found values: "
+ "rootParentRunId='%s', rootParentJobNamespace='%s',
rootParentJobName='%s'.",
+ root_run_id,
+ root_namespace,
+ root_name,
+ )
+ else:
+ log.debug("No 'openlineage' root information found in DAG run
config.")
+ return {}
+
+ try: # Validate that runId is correct UUID
+ parent_run.RootRun(runId=root_run_id)
+ except ValueError:
+ log.warning(
+ "Invalid OpenLineage rootParentRunId '%s' in DAG run config -
expected a valid UUID.",
+ root_run_id,
+ )
+ return {}
+
+ log.debug(
+ "Extracted valid root OpenLineage identifiers from DAG run config: "
+ "rootParentRunId='%s', rootParentJobNamespace='%s',
rootParentJobName='%s'.",
+ root_run_id,
+ root_namespace,
+ root_name,
+ )
+ return {
+ "root_parent_run_id": root_run_id,
+ "root_parent_job_namespace": root_namespace,
+ "root_parent_job_name": root_name,
+ }
+
+
+def get_dag_parent_run_facet(dr_conf: dict | None) -> dict[str,
parent_run.ParentRunFacet]:
+ """Build the OpenLineage parent run facet from a DAG run config."""
+ ol_data = _get_openlineage_data_from_dagrun_conf(dr_conf)
+ if not ol_data:
+ log.debug("No 'openlineage' data found in DAG run config.")
+ return {}
+
+ parent_run_id = ol_data.get("parentRunId", "")
+ parent_job_namespace = ol_data.get("parentJobNamespace", "")
+ parent_job_name = ol_data.get("parentJobName", "")
+
+ all_parent_info = (parent_run_id, parent_job_namespace, parent_job_name)
+ if not all(all_parent_info):
+ if any(all_parent_info):
+ log.warning(
+ "Incomplete parent OpenLineage information in DAG run config. "
+ "ParentRunFacet will NOT be created. Found values: "
+ "parentRunId='%s', parentJobNamespace='%s',
parentJobName='%s'.",
+ parent_run_id,
+ parent_job_namespace,
+ parent_job_name,
+ )
+ else:
+ log.debug("No 'openlineage' parent information found in DAG run
config.")
+ return {}
+
+ try: # Validate that runId is correct UUID
+ parent_run.RootRun(runId=parent_run_id)
+ except ValueError:
+ log.warning(
+ "Invalid OpenLineage parentRunId '%s' in DAG run config - expected
a valid UUID.",
+ parent_run_id,
+ )
+ return {}
+
+ log.debug(
+ "Extracted valid parent OpenLineage identifiers from DAG run config: "
+ "parentRunId='%s', parentJobNamespace='%s', parentJobName='%s'.",
+ parent_run_id,
+ parent_job_namespace,
+ parent_job_name,
+ )
+
+ root_info = get_root_information_from_dagrun_conf(dr_conf)
+ if root_info and all(root_info.values()):
+ root_parent_run_id = root_info["root_parent_run_id"]
+ root_parent_job_namespace = root_info["root_parent_job_namespace"]
+ root_parent_job_name = root_info["root_parent_job_name"]
+ else:
+ log.debug(
+ "Missing OpenLineage root identifiers in DAG run config, "
+ "parent identifiers will be used as root instead."
+ )
+ root_parent_run_id, root_parent_job_namespace, root_parent_job_name = (
+ parent_run_id,
+ parent_job_namespace,
+ parent_job_name,
+ )
+
+ return _get_parent_run_facet(
+ parent_run_id=parent_run_id,
+ parent_job_namespace=parent_job_namespace,
+ parent_job_name=parent_job_name,
+ root_parent_run_id=root_parent_run_id,
+ root_parent_job_namespace=root_parent_job_namespace,
+ root_parent_job_name=root_parent_job_name,
+ )
+
+
def _truncate_string_to_byte_size(s: str, max_size: int = _MAX_DOC_BYTES) ->
str:
"""
Truncate a string to a maximum UTF-8 byte size, ensuring valid encoding.
diff --git
a/providers/openlineage/tests/system/openlineage/example_openlineage_trigger_dag.py
b/providers/openlineage/tests/system/openlineage/example_openlineage_trigger_dag.py
index 513898a51f9..fb524344bab 100644
---
a/providers/openlineage/tests/system/openlineage/example_openlineage_trigger_dag.py
+++
b/providers/openlineage/tests/system/openlineage/example_openlineage_trigger_dag.py
@@ -20,6 +20,7 @@ Simple DAG that triggers another simple DAG.
It checks:
- task's trigger_dag_id
- DAGRun START and COMPLETE events, for the triggered DAG
+ - propagation of OL parent and root info from DAGRun conf
"""
from __future__ import annotations
@@ -47,7 +48,17 @@ with DAG(
trigger_dag_id="openlineage_trigger_dag_child__notrigger",
trigger_run_id=f"openlineage_trigger_dag_triggering_child_{datetime.now().isoformat()}",
wait_for_completion=True,
- conf={"some_config": "value1"},
+ conf={
+ "some_config": "value1",
+ "openlineage": {
+ "parentRunId": "3bb703d1-09c1-4a42-8da5-35a0b3216072",
+ "parentJobNamespace": "prod_biz",
+ "parentJobName": "get_files",
+ "rootParentRunId": "9d3b14f7-de91-40b6-aeef-e887e2c7673e",
+ "rootParentJobNamespace": "prod_analytics",
+ "rootParentJobName": "generate_report_sales_e2e",
+ },
+ },
poke_interval=10,
)
diff --git
a/providers/openlineage/tests/system/openlineage/expected_events/openlineage_trigger_dag.json
b/providers/openlineage/tests/system/openlineage/expected_events/openlineage_trigger_dag.json
index 45f742efad6..3695408b28f 100644
---
a/providers/openlineage/tests/system/openlineage/expected_events/openlineage_trigger_dag.json
+++
b/providers/openlineage/tests/system/openlineage/expected_events/openlineage_trigger_dag.json
@@ -21,7 +21,15 @@
},
"dagRun": {
"conf": {
- "some_config": "value1"
+ "some_config": "value1",
+ "openlineage": {
+ "parentRunId":
"3bb703d1-09c1-4a42-8da5-35a0b3216072",
+ "parentJobNamespace": "prod_biz",
+ "parentJobName": "get_files",
+ "rootParentRunId":
"9d3b14f7-de91-40b6-aeef-e887e2c7673e",
+ "rootParentJobNamespace": "prod_analytics",
+ "rootParentJobName":
"generate_report_sales_e2e"
+ }
},
"dag_id": "openlineage_trigger_dag_child__notrigger",
"data_interval_end": "{{ is_datetime(result) }}",
@@ -34,6 +42,26 @@
"_producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
"_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/[\\d-]+\\/OpenLineage.json\\#\\/\\$defs\\/RunFacet\")
}}"
},
+ "parent": {
+ "job": {
+ "namespace": "prod_biz",
+ "name": "get_files"
+ },
+ "run": {
+ "runId": "3bb703d1-09c1-4a42-8da5-35a0b3216072"
+ },
+ "root": {
+ "job": {
+ "name": "generate_report_sales_e2e",
+ "namespace": "prod_analytics"
+ },
+ "run": {
+ "runId": "9d3b14f7-de91-40b6-aeef-e887e2c7673e"
+ }
+ },
+ "_producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
+ "_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/ParentRunFacet.json\\#\\/\\$defs\\/ParentRunFacet$\")
}}"
+ },
"nominalTime": {
"nominalEndTime": "{{ is_datetime(result) }}",
"nominalStartTime": "{{ is_datetime(result) }}",
@@ -153,7 +181,15 @@
},
"dagRun": {
"conf": {
- "some_config": "value1"
+ "some_config": "value1",
+ "openlineage": {
+ "parentRunId":
"3bb703d1-09c1-4a42-8da5-35a0b3216072",
+ "parentJobNamespace": "prod_biz",
+ "parentJobName": "get_files",
+ "rootParentRunId":
"9d3b14f7-de91-40b6-aeef-e887e2c7673e",
+ "rootParentJobNamespace": "prod_analytics",
+ "rootParentJobName":
"generate_report_sales_e2e"
+ }
},
"dag_id": "openlineage_trigger_dag_child__notrigger",
"data_interval_end": "{{ is_datetime(result) }}",
@@ -167,6 +203,26 @@
"_producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
"_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/[\\d-]+\\/OpenLineage.json\\#\\/\\$defs\\/RunFacet\")
}}"
},
+ "parent": {
+ "job": {
+ "namespace": "prod_biz",
+ "name": "get_files"
+ },
+ "run": {
+ "runId": "3bb703d1-09c1-4a42-8da5-35a0b3216072"
+ },
+ "root": {
+ "job": {
+ "name": "generate_report_sales_e2e",
+ "namespace": "prod_analytics"
+ },
+ "run": {
+ "runId": "9d3b14f7-de91-40b6-aeef-e887e2c7673e"
+ }
+ },
+ "_producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
+ "_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/ParentRunFacet.json\\#\\/\\$defs\\/ParentRunFacet$\")
}}"
+ },
"nominalTime": {
"nominalEndTime": "{{ is_datetime(result) }}",
"nominalStartTime": "{{ is_datetime(result) }}",
@@ -237,6 +293,66 @@
}
}
},
+ {
+ "eventType": "START",
+ "run": {
+ "facets": {
+ "parent": {
+ "job": {
+ "namespace": "{{ result is string }}",
+ "name": "openlineage_trigger_dag_child__notrigger"
+ },
+ "run": {
+ "runId": "{{ is_uuid(result) }}"
+ },
+ "root": {
+ "job": {
+ "name": "generate_report_sales_e2e",
+ "namespace": "prod_analytics"
+ },
+ "run": {
+ "runId": "9d3b14f7-de91-40b6-aeef-e887e2c7673e"
+ }
+ },
+ "_producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
+ "_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/ParentRunFacet.json\\#\\/\\$defs\\/ParentRunFacet$\")
}}"
+ }
+ }
+ },
+ "job": {
+ "name": "openlineage_trigger_dag_child__notrigger.do_nothing_task"
+ }
+ },
+ {
+ "eventType": "COMPLETE",
+ "run": {
+ "facets": {
+ "parent": {
+ "job": {
+ "namespace": "{{ result is string }}",
+ "name": "openlineage_trigger_dag_child__notrigger"
+ },
+ "run": {
+ "runId": "{{ is_uuid(result) }}"
+ },
+ "root": {
+ "job": {
+ "name": "generate_report_sales_e2e",
+ "namespace": "prod_analytics"
+ },
+ "run": {
+ "runId": "9d3b14f7-de91-40b6-aeef-e887e2c7673e"
+ }
+ },
+ "_producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
+ "_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/ParentRunFacet.json\\#\\/\\$defs\\/ParentRunFacet$\")
}}"
+ }
+ }
+ },
+ "job": {
+ "name": "openlineage_trigger_dag_child__notrigger.do_nothing_task"
+ }
+ },
{
"eventType": "START",
"run": {
@@ -245,6 +361,26 @@
"task": {
"trigger_dag_id":
"openlineage_trigger_dag_child__notrigger"
}
+ },
+ "parent": {
+ "job": {
+ "namespace": "{{ result is string }}",
+ "name": "openlineage_trigger_dag"
+ },
+ "run": {
+ "runId": "{{ is_uuid(result) }}"
+ },
+ "root": {
+ "job": {
+ "name": "openlineage_trigger_dag",
+ "namespace": "{{ result is string }}"
+ },
+ "run": {
+ "runId": "{{ is_uuid(result) }}"
+ }
+ },
+ "_producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
+ "_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/ParentRunFacet.json\\#\\/\\$defs\\/ParentRunFacet$\")
}}"
}
}
},
@@ -260,6 +396,26 @@
"task": {
"trigger_dag_id":
"openlineage_trigger_dag_child__notrigger"
}
+ },
+ "parent": {
+ "job": {
+ "namespace": "{{ result is string }}",
+ "name": "openlineage_trigger_dag"
+ },
+ "run": {
+ "runId": "{{ is_uuid(result) }}"
+ },
+ "root": {
+ "job": {
+ "name": "openlineage_trigger_dag",
+ "namespace": "{{ result is string }}"
+ },
+ "run": {
+ "runId": "{{ is_uuid(result) }}"
+ }
+ },
+ "_producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
+ "_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/ParentRunFacet.json\\#\\/\\$defs\\/ParentRunFacet$\")
}}"
}
}
},
diff --git
a/providers/openlineage/tests/unit/openlineage/plugins/test_macros.py
b/providers/openlineage/tests/unit/openlineage/plugins/test_macros.py
index 520421be768..3cdde5d183f 100644
--- a/providers/openlineage/tests/unit/openlineage/plugins/test_macros.py
+++ b/providers/openlineage/tests/unit/openlineage/plugins/test_macros.py
@@ -21,12 +21,13 @@ from unittest import mock
import pytest
-from airflow import __version__
from airflow.providers.openlineage.conf import namespace
from airflow.providers.openlineage.plugins.macros import (
lineage_job_name,
lineage_job_namespace,
lineage_parent_id,
+ lineage_root_job_name,
+ lineage_root_job_namespace,
lineage_root_run_id,
lineage_run_id,
)
@@ -35,10 +36,10 @@ from tests_common.test_utils.version_compat import
AIRFLOW_V_3_0_PLUS
_DAG_NAMESPACE = namespace()
-if __version__.startswith("2."):
- LOGICAL_DATE_KEY = "execution_date"
-else:
+if AIRFLOW_V_3_0_PLUS:
LOGICAL_DATE_KEY = "logical_date"
+else:
+ LOGICAL_DATE_KEY = "execution_date"
def test_lineage_job_namespace():
@@ -130,3 +131,204 @@ def
test_lineage_root_run_id_with_runtime_task_instance(create_runtime_ti):
assert lineage_root_run_id(runtime_ti) is not None
except AttributeError as e:
pytest.fail(f"lineage_root_run_id should not throw AttributeError with
RuntimeTaskInstance: {e}")
+
+
[email protected](not AIRFLOW_V_3_0_PLUS, reason="Test only for Airflow
3.0+")
+def test_lineage_root_run_id_no_conf_af3(create_runtime_ti):
+ from airflow.providers.common.compat.sdk import BaseOperator
+
+ task = BaseOperator(task_id="test_task")
+
+ runtime_ti = create_runtime_ti(
+ task=task,
+ dag_id="test_dag",
+ run_id="test_run_id",
+ conf=None,
+ )
+
+ result = lineage_root_run_id(runtime_ti)
+ assert result == "01937fbb-4680-70b3-b49b-1de6b041527a"
+
+
[email protected](not AIRFLOW_V_3_0_PLUS, reason="Test only for Airflow
3.0+")
+def test_lineage_root_run_id_with_conf_af3(create_runtime_ti):
+ from airflow.providers.common.compat.sdk import BaseOperator
+
+ task = BaseOperator(task_id="test_task")
+
+ runtime_ti = create_runtime_ti(
+ task=task,
+ dag_id="test_dag",
+ run_id="test_run_id",
+ conf={
+ "openlineage": {
+ "rootParentRunId": "22222222-2222-2222-2222-222222222222",
+ "rootParentJobNamespace": "rootns",
+ "rootParentJobName": "rootjob",
+ }
+ },
+ )
+
+ result = lineage_root_run_id(runtime_ti)
+ assert result == "22222222-2222-2222-2222-222222222222"
+
+
+def test_lineage_root_run_id_without_conf_af2():
+ date = datetime(2020, 1, 1, 1, 1, 1, 0, tzinfo=timezone.utc)
+ conf = {}
+ dag_run = mock.MagicMock(run_id="run_id", conf=conf)
+ dag_run.logical_date = date
+ dag_run.clear_number = 1
+ task_instance = mock.MagicMock(
+ dag_id="dag_id",
+ task_id="task_id",
+ dag_run=dag_run,
+ logical_date=date,
+ try_number=1,
+ )
+
+ call_result1 = lineage_root_run_id(task_instance)
+ call_result2 = lineage_root_run_id(task_instance)
+
+ # random part value does not matter, it just has to be the same for the
same TaskInstance
+ assert call_result1 == call_result2
+ assert call_result1 == "016f5e9e-c4c8-7c30-9eda-d9c646d633ea"
+
+
+def test_lineage_root_run_id_with_conf_af2():
+ conf = {
+ "openlineage": {
+ "rootParentRunId": "22222222-2222-2222-2222-222222222222",
+ "rootParentJobNamespace": "rootns",
+ "rootParentJobName": "rootjob",
+ }
+ }
+ task_instance = mock.MagicMock(
+ dag_run=mock.MagicMock(conf=conf),
+ )
+
+ call_result1 = lineage_root_run_id(task_instance)
+ call_result2 = lineage_root_run_id(task_instance)
+
+ assert call_result1 == call_result2
+ assert call_result1 == "22222222-2222-2222-2222-222222222222"
+
+
[email protected](not AIRFLOW_V_3_0_PLUS, reason="Test only for Airflow
3.0+")
+def test_lineage_root_job_name_no_conf_af3(create_runtime_ti):
+ from airflow.providers.common.compat.sdk import BaseOperator
+
+ task = BaseOperator(task_id="test_task")
+
+ runtime_ti = create_runtime_ti(
+ task=task,
+ dag_id="test_dag",
+ run_id="test_run_id",
+ conf=None,
+ )
+
+ result = lineage_root_job_name(runtime_ti)
+ assert result == "test_dag"
+
+
[email protected](not AIRFLOW_V_3_0_PLUS, reason="Test only for Airflow
3.0+")
+def test_lineage_root_job_name_with_conf_af3(create_runtime_ti):
+ from airflow.providers.common.compat.sdk import BaseOperator
+
+ task = BaseOperator(task_id="test_task")
+
+ runtime_ti = create_runtime_ti(
+ task=task,
+ dag_id="test_dag",
+ run_id="test_run_id",
+ conf={
+ "openlineage": {
+ "rootParentRunId": "22222222-2222-2222-2222-222222222222",
+ "rootParentJobNamespace": "rootns",
+ "rootParentJobName": "rootjob",
+ }
+ },
+ )
+
+ result = lineage_root_job_name(runtime_ti)
+ assert result == "rootjob"
+
+
+def test_lineage_root_job_name_without_conf_af2():
+ task_instance = mock.MagicMock(
+ dag_id="dag_id",
+ dag_run=mock.MagicMock(conf={}),
+ )
+
+ result = lineage_root_job_name(task_instance)
+ assert result == "dag_id"
+
+
+def test_lineage_root_job_name_with_conf_af2():
+ conf = {
+ "openlineage": {
+ "rootParentRunId": "22222222-2222-2222-2222-222222222222",
+ "rootParentJobNamespace": "rootns",
+ "rootParentJobName": "rootjob",
+ }
+ }
+ task_instance = mock.MagicMock(dag_run=mock.MagicMock(conf=conf))
+
+ result = lineage_root_job_name(task_instance)
+ assert result == "rootjob"
+
+
[email protected](not AIRFLOW_V_3_0_PLUS, reason="Test only for Airflow
3.0+")
+def test_lineage_root_job_namespace_no_conf_af3(create_runtime_ti):
+ from airflow.providers.common.compat.sdk import BaseOperator
+
+ task = BaseOperator(task_id="test_task")
+
+ runtime_ti = create_runtime_ti(task=task, dag_id="test_dag",
run_id="test_run_id", conf=None)
+
+ result = lineage_root_job_namespace(runtime_ti)
+ assert result == _DAG_NAMESPACE
+
+
[email protected](not AIRFLOW_V_3_0_PLUS, reason="Test only for Airflow
3.0+")
+def test_lineage_root_job_namespace_with_conf_af3(create_runtime_ti):
+ from airflow.providers.common.compat.sdk import BaseOperator
+
+ task = BaseOperator(task_id="test_task")
+
+ runtime_ti = create_runtime_ti(
+ task=task,
+ dag_id="test_dag",
+ run_id="test_run_id",
+ conf={
+ "openlineage": {
+ "rootParentRunId": "22222222-2222-2222-2222-222222222222",
+ "rootParentJobNamespace": "rootns",
+ "rootParentJobName": "rootjob",
+ }
+ },
+ )
+
+ result = lineage_root_job_namespace(runtime_ti)
+ assert result == "rootns"
+
+
+def test_lineage_root_job_namespace_without_conf_af2():
+ task_instance = mock.MagicMock(dag_run=mock.MagicMock(conf={}))
+
+ result = lineage_root_job_namespace(task_instance)
+ assert result == _DAG_NAMESPACE
+
+
+def test_lineage_root_job_namespace_with_conf_af2():
+ conf = {
+ "openlineage": {
+ "rootParentRunId": "22222222-2222-2222-2222-222222222222",
+ "rootParentJobNamespace": "rootns",
+ "rootParentJobName": "rootjob",
+ }
+ }
+ task_instance = mock.MagicMock(dag_run=mock.MagicMock(conf=conf))
+
+ result = lineage_root_job_namespace(task_instance)
+ assert result == "rootns"
diff --git a/providers/openlineage/tests/unit/openlineage/utils/test_utils.py
b/providers/openlineage/tests/unit/openlineage/utils/test_utils.py
index 3bbea4aafac..198280426a3 100644
--- a/providers/openlineage/tests/unit/openlineage/utils/test_utils.py
+++ b/providers/openlineage/tests/unit/openlineage/utils/test_utils.py
@@ -24,6 +24,7 @@ from unittest.mock import MagicMock, PropertyMock, patch
import pendulum
import pytest
+from openlineage.client.facet_v2 import parent_run
from uuid6 import uuid7
from airflow import DAG
@@ -31,6 +32,7 @@ from airflow.models.dagrun import DagRun
from airflow.models.taskinstance import TaskInstance, TaskInstanceState
from airflow.providers.common.compat.assets import Asset
from airflow.providers.common.compat.sdk import BaseOperator, TaskGroup, task, timezone
+from airflow.providers.openlineage.conf import namespace
from airflow.providers.openlineage.plugins.facets import AirflowDagRunFacet, AirflowJobFacet
from airflow.providers.openlineage.utils.utils import (
_MAX_DOC_BYTES,
@@ -40,6 +42,7 @@ from airflow.providers.openlineage.utils.utils import (
TaskInfo,
TaskInfoComplete,
TaskInstanceInfo,
+ _get_openlineage_data_from_dagrun_conf,
_get_task_groups_details,
_get_tasks_details,
_truncate_string_to_byte_size,
@@ -47,11 +50,14 @@ from airflow.providers.openlineage.utils.utils import (
get_airflow_job_facet,
get_airflow_state_run_facet,
get_dag_documentation,
+ get_dag_parent_run_facet,
get_fully_qualified_class_name,
get_job_name,
get_operator_class,
get_operator_provider_version,
+ get_root_information_from_dagrun_conf,
get_task_documentation,
+ get_task_parent_run_facet,
get_user_provided_run_facets,
)
from airflow.providers.standard.operators.empty import EmptyOperator
@@ -465,6 +471,233 @@ def test_get_operator_class_mapped_operator():
assert op_class == MockOperator
+@pytest.mark.parametrize("dr_conf", (None, {}))
+def test_get_openlineage_data_from_dagrun_conf_none_conf(dr_conf):
+ _dr_conf = None if dr_conf is None else {}
+ assert _get_openlineage_data_from_dagrun_conf(dr_conf) == {}
+ assert dr_conf == _dr_conf # Assert conf is not changed
+
+
+def test_get_openlineage_data_from_dagrun_conf_no_openlineage_key():
+ dr_conf = {"something_else": {"a": 1}}
+ assert _get_openlineage_data_from_dagrun_conf(dr_conf) == {}
+ assert dr_conf == {"something_else": {"a": 1}} # Assert conf is not changed
+
+
+def test_get_openlineage_data_from_dagrun_conf_invalid_type():
+ dr_conf = {"openlineage": "not_a_dict"}
+ assert _get_openlineage_data_from_dagrun_conf(dr_conf) == {}
+ assert dr_conf == {"openlineage": "not_a_dict"} # Assert conf is not changed
+
+
+def test_get_openlineage_data_from_dagrun_conf_valid_dict():
+ dr_conf = {"openlineage": {"key": "value"}}
+ assert _get_openlineage_data_from_dagrun_conf(dr_conf) == {"key": "value"}
+ assert dr_conf == {"openlineage": {"key": "value"}} # Assert conf is not changed
+
+
+@pytest.mark.parametrize("dr_conf", (None, {}))
+def test_get_root_information_from_dagrun_conf_no_conf(dr_conf):
+ _dr_conf = None if dr_conf is None else {}
+ assert get_root_information_from_dagrun_conf(dr_conf) == {}
+ assert dr_conf == _dr_conf # Assert conf is not changed
+
+
+def test_get_root_information_from_dagrun_conf_no_openlineage():
+ dr_conf = {"something": "else"}
+ assert get_root_information_from_dagrun_conf(dr_conf) == {}
+ assert dr_conf == {"something": "else"} # Assert conf is not changed
+
+
+def test_get_root_information_from_dagrun_conf_openlineage_not_dict():
+ dr_conf = {"openlineage": "my_value"}
+ assert get_root_information_from_dagrun_conf(dr_conf) == {}
+ assert dr_conf == {"openlineage": "my_value"} # Assert conf is not changed
+
+
+def test_get_root_information_from_dagrun_conf_missing_keys():
+ dr_conf = {"openlineage": {"rootParentRunId": "id_only"}}
+ assert get_root_information_from_dagrun_conf(dr_conf) == {}
+ assert dr_conf == {"openlineage": {"rootParentRunId": "id_only"}} # Assert conf is not changed
+
+
+def test_get_root_information_from_dagrun_conf_invalid_run_id():
+ dr_conf = {
+ "openlineage": {
+ "rootParentRunId": "not_uuid",
+ "rootParentJobNamespace": "ns",
+ "rootParentJobName": "jobX",
+ }
+ }
+ assert get_root_information_from_dagrun_conf(dr_conf) == {}
+ assert dr_conf == { # Assert conf is not changed
+ "openlineage": {
+ "rootParentRunId": "not_uuid",
+ "rootParentJobNamespace": "ns",
+ "rootParentJobName": "jobX",
+ }
+ }
+
+
+def test_get_root_information_from_dagrun_conf_valid_data():
+ dr_conf = {
+ "openlineage": {
+ "rootParentRunId": "11111111-1111-1111-1111-111111111111",
+ "rootParentJobNamespace": "ns",
+ "rootParentJobName": "jobX",
+ }
+ }
+ expected = {
+ "root_parent_run_id": "11111111-1111-1111-1111-111111111111",
+ "root_parent_job_namespace": "ns",
+ "root_parent_job_name": "jobX",
+ }
+ assert get_root_information_from_dagrun_conf(dr_conf) == expected
+ assert dr_conf == { # Assert conf is not changed
+ "openlineage": {
+ "rootParentRunId": "11111111-1111-1111-1111-111111111111",
+ "rootParentJobNamespace": "ns",
+ "rootParentJobName": "jobX",
+ }
+ }
+
+
+@pytest.mark.parametrize("dr_conf", (None, {}))
+def test_get_dag_parent_run_facet_no_conf(dr_conf):
+ _dr_conf = None if dr_conf is None else {}
+ assert get_dag_parent_run_facet(dr_conf) == {}
+ assert dr_conf == _dr_conf # Assert conf is not changed
+
+
+def test_get_dag_parent_run_facet_missing_keys():
+ dr_conf = {"openlineage": {"parentRunId": "11111111-1111-1111-1111-111111111111"}}
+ assert get_dag_parent_run_facet(dr_conf) == {}
+ # Assert conf is not changed
+ assert dr_conf == {"openlineage": {"parentRunId": "11111111-1111-1111-1111-111111111111"}}
+
+
+def test_get_dag_parent_run_facet_valid_no_root():
+ dr_conf = {
+ "openlineage": {
+ "parentRunId": "11111111-1111-1111-1111-111111111111",
+ "parentJobNamespace": "ns",
+ "parentJobName": "jobA",
+ }
+ }
+
+ result = get_dag_parent_run_facet(dr_conf)
+ parent_facet = result.get("parent")
+
+ assert isinstance(parent_facet, parent_run.ParentRunFacet)
+ assert parent_facet.run.runId == "11111111-1111-1111-1111-111111111111"
+ assert parent_facet.job.namespace == "ns"
+ assert parent_facet.job.name == "jobA"
+ assert parent_facet.root is not None # parent is used as root, since root is missing
+ assert parent_facet.root.run.runId == "11111111-1111-1111-1111-111111111111"
+ assert parent_facet.root.job.namespace == "ns"
+ assert parent_facet.root.job.name == "jobA"
+
+ assert dr_conf == { # Assert conf is not changed
+ "openlineage": {
+ "parentRunId": "11111111-1111-1111-1111-111111111111",
+ "parentJobNamespace": "ns",
+ "parentJobName": "jobA",
+ }
+ }
+
+
+def test_get_dag_parent_run_facet_invalid_uuid():
+ dr_conf = {
+ "openlineage": {
+ "parentRunId": "not_uuid",
+ "parentJobNamespace": "ns",
+ "parentJobName": "jobA",
+ }
+ }
+
+ result = get_dag_parent_run_facet(dr_conf)
+ assert result == {}
+ assert dr_conf == { # Assert conf is not changed
+ "openlineage": {
+ "parentRunId": "not_uuid",
+ "parentJobNamespace": "ns",
+ "parentJobName": "jobA",
+ }
+ }
+
+
+def test_get_dag_parent_run_facet_valid_with_root():
+ dr_conf = {
+ "openlineage": {
+ "parentRunId": "11111111-1111-1111-1111-111111111111",
+ "parentJobNamespace": "ns",
+ "parentJobName": "jobA",
+ "rootParentRunId": "22222222-2222-2222-2222-222222222222",
+ "rootParentJobNamespace": "rootns",
+ "rootParentJobName": "rootjob",
+ }
+ }
+
+ result = get_dag_parent_run_facet(dr_conf)
+ parent_facet = result.get("parent")
+
+ assert isinstance(parent_facet, parent_run.ParentRunFacet)
+ assert parent_facet.run.runId == "11111111-1111-1111-1111-111111111111"
+ assert parent_facet.job.namespace == "ns"
+ assert parent_facet.job.name == "jobA"
+ assert parent_facet.root is not None
+ assert parent_facet.root.run.runId == "22222222-2222-2222-2222-222222222222"
+ assert parent_facet.root.job.namespace == "rootns"
+ assert parent_facet.root.job.name == "rootjob"
+
+ assert dr_conf == { # Assert conf is not changed
+ "openlineage": {
+ "parentRunId": "11111111-1111-1111-1111-111111111111",
+ "parentJobNamespace": "ns",
+ "parentJobName": "jobA",
+ "rootParentRunId": "22222222-2222-2222-2222-222222222222",
+ "rootParentJobNamespace": "rootns",
+ "rootParentJobName": "rootjob",
+ }
+ }
+
+
+def test_get_task_parent_run_facet_defaults():
+ result = get_task_parent_run_facet(
+ parent_run_id="11111111-1111-1111-1111-111111111111",
+ parent_job_name="jobA",
+ )
+ parent_facet = result.get("parent")
+
+ assert isinstance(parent_facet, parent_run.ParentRunFacet)
+ assert parent_facet.run.runId == "11111111-1111-1111-1111-111111111111"
+ assert parent_facet.job.namespace == namespace()
+ assert parent_facet.job.name == "jobA"
+ assert parent_facet.root.run.runId == "11111111-1111-1111-1111-111111111111"
+ assert parent_facet.root.job.namespace == namespace()
+ assert parent_facet.root.job.name == "jobA"
+
+
+def test_get_task_parent_run_facet_custom_root_values():
+ result = get_task_parent_run_facet(
+ parent_run_id="11111111-1111-1111-1111-111111111111",
+ parent_job_name="jobA",
+ parent_job_namespace="ns",
+ root_parent_run_id="22222222-2222-2222-2222-222222222222",
+ root_parent_job_name="rjob",
+ root_parent_job_namespace="rns",
+ )
+
+ parent_facet = result.get("parent")
+ assert isinstance(parent_facet, parent_run.ParentRunFacet)
+ assert parent_facet.run.runId == "11111111-1111-1111-1111-111111111111"
+ assert parent_facet.job.namespace == "ns"
+ assert parent_facet.job.name == "jobA"
+ assert parent_facet.root.run.runId == "22222222-2222-2222-2222-222222222222"
+ assert parent_facet.root.job.namespace == "rns"
+ assert parent_facet.root.job.name == "rjob"
+
+
def test_get_tasks_details():
class TestMappedOperator(BaseOperator):
def __init__(self, value, **kwargs):