This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 1a16b9d3d0f feat(openlineage): Add parentRunFacet for DAG events 
(#57809)
1a16b9d3d0f is described below

commit 1a16b9d3d0f59978a6f7abe60188c7cf5f1a1570
Author: Kacper Muda <[email protected]>
AuthorDate: Thu Nov 6 14:36:49 2025 +0100

    feat(openlineage): Add parentRunFacet for DAG events (#57809)
---
 providers/openlineage/docs/guides/user.rst         |  50 +++++
 .../providers/openlineage/plugins/listener.py      |  43 +++-
 .../providers/openlineage/plugins/macros.py        |  70 ++++---
 .../providers/openlineage/plugins/openlineage.py   |   2 +
 .../airflow/providers/openlineage/utils/utils.py   | 168 ++++++++++++++-
 .../openlineage/example_openlineage_trigger_dag.py |  13 +-
 .../expected_events/openlineage_trigger_dag.json   | 160 +++++++++++++-
 .../tests/unit/openlineage/plugins/test_macros.py  | 210 ++++++++++++++++++-
 .../tests/unit/openlineage/utils/test_utils.py     | 233 +++++++++++++++++++++
 9 files changed, 897 insertions(+), 52 deletions(-)

diff --git a/providers/openlineage/docs/guides/user.rst 
b/providers/openlineage/docs/guides/user.rst
index 730d7d0f2ff..50961dc08a5 100644
--- a/providers/openlineage/docs/guides/user.rst
+++ b/providers/openlineage/docs/guides/user.rst
@@ -478,6 +478,56 @@ You can enable this automation by setting 
``spark_inject_transport_info`` option
   AIRFLOW__OPENLINEAGE__SPARK_INJECT_TRANSPORT_INFO=true
 
 
+Passing parent information to Airflow DAG
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+To enable full OpenLineage lineage tracking across dependent DAGs, you can 
pass parent and root job information
+through the DAG's ``dag_run.conf``. When a DAG run configuration includes an 
``openlineage`` section with valid metadata,
+this information is automatically parsed and converted into the DAG run's 
``parentRunFacet``, from which the root information
+is also propagated to all tasks. If no DAG run ``openlineage`` configuration 
is provided, the DAG run will not contain
+``parentRunFacet`` and the root of all tasks will default to the DAG run.
+
+The ``openlineage`` dict in conf should contain the following keys:
+
+
+*(all three values must be included to create a parent reference)*
+
+- **parentRunId** — the unique run ID (uuid) of the direct parent job
+- **parentJobName** — the name of the parent job
+- **parentJobNamespace** — the namespace of the parent job
+
+*(all three values must be included to create a root reference, otherwise 
parent will be used as root)*
+
+- **rootParentRunId** — the run ID (uuid) of the top-level (root) job
+- **rootParentJobName** — the name of the top-level (root) job
+- **rootParentJobNamespace** — the namespace of the top-level (root) job
+
+.. note::
+
+  We highly recommend providing all six OpenLineage identifiers (parent and 
root) to ensure complete lineage tracking. If the root information is missing, 
the parent set will be used as the root; if any of the three parent fields is 
missing, no parent facet will be created. Partial sets are not supported: 
within each group (parent or root), all three values must be provided 
together.
+
+
+Example:
+
+.. code-block:: shell
+
+    curl -X POST "http://<AIRFLOW_HOST>/api/v2/dags/my_dag_name/dagRuns" \
+    -H "Content-Type: application/json" \
+    -d '{
+      "logical_date": "2019-08-24T14:15:22Z",
+      "conf": {
+        "openlineage": {
+          "parentRunId": "3bb703d1-09c1-4a42-8da5-35a0b3216072",
+          "parentJobNamespace": "prod_biz",
+          "parentJobName": "get_files",
+          "rootParentRunId": "9d3b14f7-de91-40b6-aeef-e887e2c7673e",
+          "rootParentJobNamespace": "prod_analytics",
+          "rootParentJobName": "generate_report_sales_e2e"
+        }
+      }
+    }'
+
+
 Troubleshooting
 ===============
 
diff --git 
a/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py 
b/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py
index bd1931e3ca9..d7e87c86965 100644
--- 
a/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py
+++ 
b/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py
@@ -41,7 +41,9 @@ from airflow.providers.openlineage.utils.utils import (
     get_airflow_mapped_task_facet,
     get_airflow_run_facet,
     get_dag_documentation,
+    get_dag_parent_run_facet,
     get_job_name,
+    get_root_information_from_dagrun_conf,
     get_task_documentation,
     get_task_parent_run_facet,
     get_user_provided_run_facets,
@@ -224,7 +226,11 @@ class OpenLineageListener:
                 task=task_metadata,
                 run_facets={
                     **get_user_provided_run_facets(task_instance, 
TaskInstanceState.RUNNING),
-                    **get_task_parent_run_facet(parent_run_id=parent_run_id, 
parent_job_name=dag.dag_id),
+                    **get_task_parent_run_facet(
+                        parent_run_id=parent_run_id,
+                        parent_job_name=dag.dag_id,
+                        
**get_root_information_from_dagrun_conf(getattr(dagrun, "conf", {})),
+                    ),
                     **get_airflow_mapped_task_facet(task_instance),
                     **get_airflow_run_facet(dagrun, dag, task_instance, task, 
task_uuid),
                     **debug_facet,
@@ -351,7 +357,11 @@ class OpenLineageListener:
                 nominal_end_time=data_interval_end,
                 run_facets={
                     **get_user_provided_run_facets(task_instance, 
TaskInstanceState.SUCCESS),
-                    **get_task_parent_run_facet(parent_run_id=parent_run_id, 
parent_job_name=dag.dag_id),
+                    **get_task_parent_run_facet(
+                        parent_run_id=parent_run_id,
+                        parent_job_name=dag.dag_id,
+                        
**get_root_information_from_dagrun_conf(getattr(dagrun, "conf", {})),
+                    ),
                     **get_airflow_run_facet(dagrun, dag, task_instance, task, 
task_uuid),
                     **get_airflow_debug_facet(),
                 },
@@ -489,7 +499,11 @@ class OpenLineageListener:
                 job_description_type=doc_type,
                 run_facets={
                     **get_user_provided_run_facets(task_instance, 
TaskInstanceState.FAILED),
-                    **get_task_parent_run_facet(parent_run_id=parent_run_id, 
parent_job_name=dag.dag_id),
+                    **get_task_parent_run_facet(
+                        parent_run_id=parent_run_id,
+                        parent_job_name=dag.dag_id,
+                        
**get_root_information_from_dagrun_conf(getattr(dagrun, "conf", {})),
+                    ),
                     **get_airflow_run_facet(dagrun, dag, task_instance, task, 
task_uuid),
                     **get_airflow_debug_facet(),
                 },
@@ -540,7 +554,11 @@ class OpenLineageListener:
                 "job_description": None,
                 "job_description_type": None,
                 "run_facets": {
-                    **get_task_parent_run_facet(parent_run_id=parent_run_id, 
parent_job_name=ti.dag_id),
+                    **get_task_parent_run_facet(
+                        parent_run_id=parent_run_id,
+                        parent_job_name=ti.dag_id,
+                        
**get_root_information_from_dagrun_conf(getattr(dagrun, "conf", {})),
+                    ),
                     **get_airflow_debug_facet(),
                 },
             }
@@ -645,8 +663,6 @@ class OpenLineageListener:
             )
             data_interval_end = dag_run.data_interval_end.isoformat() if 
dag_run.data_interval_end else None
 
-            run_facets = {**get_airflow_dag_run_facet(dag_run)}
-
             date = dag_run.logical_date
             if AIRFLOW_V_3_0_PLUS and date is None:
                 date = dag_run.run_after
@@ -660,7 +676,6 @@ class OpenLineageListener:
                 start_date=dag_run.start_date,
                 nominal_start_time=data_interval_start,
                 nominal_end_time=data_interval_end,
-                run_facets=run_facets,
                 clear_number=dag_run.clear_number,
                 owners=[x.strip() for x in dag_run.dag.owner.split(",")] if 
dag_run.dag else None,
                 job_description=doc,
@@ -669,6 +684,10 @@ class OpenLineageListener:
                 # AirflowJobFacet should be created outside 
ProcessPoolExecutor that pickles objects,
                 # as it causes lack of some TaskGroup attributes and crashes 
event emission.
                 job_facets=get_airflow_job_facet(dag_run=dag_run),
+                run_facets={
+                    **get_airflow_dag_run_facet(dag_run),
+                    **get_dag_parent_run_facet(getattr(dag_run, "conf", {})),
+                },
             )
         except BaseException as e:
             self.log.warning("OpenLineage received exception in method 
on_dag_run_running", exc_info=e)
@@ -716,7 +735,10 @@ class OpenLineageListener:
                 job_description_type=doc_type,
                 task_ids=task_ids,
                 dag_run_state=dag_run.get_state(),
-                run_facets={**get_airflow_dag_run_facet(dag_run)},
+                run_facets={
+                    **get_airflow_dag_run_facet(dag_run),
+                    **get_dag_parent_run_facet(getattr(dag_run, "conf", {})),
+                },
             )
         except BaseException as e:
             self.log.warning("OpenLineage received exception in method 
on_dag_run_success", exc_info=e)
@@ -765,7 +787,10 @@ class OpenLineageListener:
                 dag_run_state=dag_run.get_state(),
                 task_ids=task_ids,
                 msg=msg,
-                run_facets={**get_airflow_dag_run_facet(dag_run)},
+                run_facets={
+                    **get_airflow_dag_run_facet(dag_run),
+                    **get_dag_parent_run_facet(getattr(dag_run, "conf", {})),
+                },
             )
         except BaseException as e:
             self.log.warning("OpenLineage received exception in method 
on_dag_run_failed", exc_info=e)
diff --git 
a/providers/openlineage/src/airflow/providers/openlineage/plugins/macros.py 
b/providers/openlineage/src/airflow/providers/openlineage/plugins/macros.py
index ac9c75b4a94..508c89eafda 100644
--- a/providers/openlineage/src/airflow/providers/openlineage/plugins/macros.py
+++ b/providers/openlineage/src/airflow/providers/openlineage/plugins/macros.py
@@ -20,7 +20,7 @@ from typing import TYPE_CHECKING
 
 from airflow.providers.openlineage import conf
 from airflow.providers.openlineage.plugins.adapter import OpenLineageAdapter
-from airflow.providers.openlineage.utils.utils import get_job_name
+from airflow.providers.openlineage.utils.utils import get_job_name, 
get_root_information_from_dagrun_conf
 from airflow.providers.openlineage.version_compat import AIRFLOW_V_3_0_PLUS
 
 if TYPE_CHECKING:
@@ -102,7 +102,7 @@ def lineage_root_parent_id(task_instance: TaskInstance):
     """
     return "/".join(
         (
-            lineage_job_namespace(),
+            lineage_root_job_namespace(task_instance),
             lineage_root_job_name(task_instance),
             lineage_root_run_id(task_instance),
         )
@@ -110,10 +110,16 @@ def lineage_root_parent_id(task_instance: TaskInstance):
 
 
 def lineage_root_job_name(task_instance: TaskInstance):
+    root_parent_job_name = _get_ol_root_id("root_parent_job_name", 
task_instance)
+    if root_parent_job_name:
+        return root_parent_job_name
     return task_instance.dag_id
 
 
 def lineage_root_run_id(task_instance: TaskInstance):
+    root_parent_run_id = _get_ol_root_id("root_parent_run_id", task_instance)
+    if root_parent_run_id:
+        return root_parent_run_id
     return OpenLineageAdapter.build_dag_run_id(
         dag_id=task_instance.dag_id,
         logical_date=_get_logical_date(task_instance),
@@ -121,32 +127,44 @@ def lineage_root_run_id(task_instance: TaskInstance):
     )
 
 
+def lineage_root_job_namespace(task_instance: TaskInstance):
+    root_parent_job_namespace = _get_ol_root_id("root_parent_job_namespace", 
task_instance)
+    if root_parent_job_namespace:
+        return root_parent_job_namespace
+    return conf.namespace()
+
+
+def _get_ol_root_id(id_key: str, task_instance: TaskInstance) -> str | None:
+    dr_conf = _get_dag_run_conf(task_instance=task_instance)
+    ol_root_info = get_root_information_from_dagrun_conf(dr_conf=dr_conf)
+    if ol_root_info and ol_root_info.get(id_key):
+        return ol_root_info[id_key]
+    return None
+
+
+def _get_dagrun_from_ti(task_instance: TaskInstance):
+    context = task_instance.get_template_context()
+    if getattr(task_instance, "dag_run", None):
+        return task_instance.dag_run
+    return context["dag_run"]
+
+
+def _get_dag_run_conf(task_instance: TaskInstance) -> dict:
+    dr = _get_dagrun_from_ti(task_instance=task_instance)
+    return dr.conf or {}
+
+
 def _get_dag_run_clear_number(task_instance: TaskInstance):
-    # todo: remove when min airflow version >= 3.0
-    if AIRFLOW_V_3_0_PLUS:
-        context = task_instance.get_template_context()
-        if hasattr(task_instance, "dag_run"):
-            dag_run = task_instance.dag_run
-        else:
-            dag_run = context["dag_run"]
-        return dag_run.clear_number
-    return task_instance.dag_run.clear_number
+    dr = _get_dagrun_from_ti(task_instance=task_instance)
+    return dr.clear_number
 
 
 def _get_logical_date(task_instance):
-    # todo: remove when min airflow version >= 3.0
     if AIRFLOW_V_3_0_PLUS:
-        context = task_instance.get_template_context()
-        if hasattr(task_instance, "dag_run"):
-            dag_run = task_instance.dag_run
-        else:
-            dag_run = context["dag_run"]
-        if hasattr(dag_run, "logical_date") and dag_run.logical_date:
-            date = dag_run.logical_date
-        else:
-            date = dag_run.run_after
-    elif hasattr(task_instance, "logical_date"):
-        date = task_instance.logical_date
-    else:
-        date = task_instance.execution_date
-    return date
+        dr = _get_dagrun_from_ti(task_instance=task_instance)
+        if getattr(dr, "logical_date", None):
+            return dr.logical_date
+        return dr.run_after
+    if getattr(task_instance, "logical_date", None):
+        return task_instance.logical_date
+    return task_instance.execution_date
diff --git 
a/providers/openlineage/src/airflow/providers/openlineage/plugins/openlineage.py
 
b/providers/openlineage/src/airflow/providers/openlineage/plugins/openlineage.py
index 964cac27254..8e422d84ac6 100644
--- 
a/providers/openlineage/src/airflow/providers/openlineage/plugins/openlineage.py
+++ 
b/providers/openlineage/src/airflow/providers/openlineage/plugins/openlineage.py
@@ -28,6 +28,7 @@ if not conf.is_disabled():
         lineage_job_namespace,
         lineage_parent_id,
         lineage_root_job_name,
+        lineage_root_job_namespace,
         lineage_root_parent_id,
         lineage_root_run_id,
         lineage_run_id,
@@ -51,6 +52,7 @@ class OpenLineageProviderPlugin(AirflowPlugin):
             lineage_parent_id,
             lineage_root_run_id,
             lineage_root_job_name,
+            lineage_root_job_namespace,
             lineage_root_parent_id,
         ]
         listeners = [get_openlineage_listener()]
diff --git 
a/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py 
b/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py
index bd3901e84e7..ac86ce28453 100644
--- a/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py
+++ b/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py
@@ -147,27 +147,175 @@ def get_job_name(task: TaskInstance | 
RuntimeTaskInstance) -> str:
     return f"{task.dag_id}.{task.task_id}"
 
 
-def get_task_parent_run_facet(
-    parent_run_id: str, parent_job_name: str, parent_job_namespace: str = 
conf.namespace()
+def _get_parent_run_facet(
+    parent_run_id: str,
+    parent_job_name: str,
+    parent_job_namespace: str = conf.namespace(),
+    root_parent_run_id: str | None = None,
+    root_parent_job_name: str | None = None,
+    root_parent_job_namespace: str | None = None,
 ) -> dict[str, Any]:
-    """
-    Retrieve the parent run facet for task-level events.
-
-    This facet currently always points to the DAG-level run ID and name,
-    as external events for DAG runs are not yet handled.
-    """
+    """Create the parent run facet from identifiers."""
+    root_parent_run_id = root_parent_run_id or parent_run_id
+    root_parent_job_name = root_parent_job_name or parent_job_name
+    root_parent_job_namespace = root_parent_job_namespace or 
parent_job_namespace
     return {
         "parent": parent_run.ParentRunFacet(
             run=parent_run.Run(runId=parent_run_id),
             job=parent_run.Job(namespace=parent_job_namespace, 
name=parent_job_name),
             root=parent_run.Root(
-                run=parent_run.RootRun(runId=parent_run_id),
-                job=parent_run.RootJob(namespace=parent_job_namespace, 
name=parent_job_name),
+                run=parent_run.RootRun(runId=root_parent_run_id),
+                job=parent_run.RootJob(namespace=root_parent_job_namespace, 
name=root_parent_job_name),
             ),
         )
     }
 
 
+def get_task_parent_run_facet(
+    parent_run_id: str,
+    parent_job_name: str,
+    parent_job_namespace: str = conf.namespace(),
+    root_parent_run_id: str | None = None,
+    root_parent_job_name: str | None = None,
+    root_parent_job_namespace: str | None = None,
+) -> dict[str, Any]:
+    """Retrieve the parent run facet."""
+    return _get_parent_run_facet(
+        parent_run_id=parent_run_id,
+        parent_job_namespace=parent_job_namespace,
+        parent_job_name=parent_job_name,
+        root_parent_run_id=root_parent_run_id,
+        root_parent_job_namespace=root_parent_job_namespace,
+        root_parent_job_name=root_parent_job_name,
+    )
+
+
+def _get_openlineage_data_from_dagrun_conf(dr_conf: dict | None) -> dict:
+    """Return the 'openlineage' section from a DAG run config if valid, 
otherwise an empty dict."""
+    if not dr_conf or not isinstance(dr_conf, dict):
+        return {}
+    ol_data = dr_conf.get("openlineage")
+    return ol_data if isinstance(ol_data, dict) else {}
+
+
+def get_root_information_from_dagrun_conf(dr_conf: dict | None) -> dict[str, 
str]:
+    """Extract root parent run and job information from a DAG run config."""
+    ol_data = _get_openlineage_data_from_dagrun_conf(dr_conf)
+    if not ol_data:
+        log.debug("No 'openlineage' data found in DAG run config.")
+        return {}
+
+    root_run_id = ol_data.get("rootParentRunId", "")
+    root_namespace = ol_data.get("rootParentJobNamespace", "")
+    root_name = ol_data.get("rootParentJobName", "")
+
+    all_root_info = (root_run_id, root_namespace, root_name)
+    if not all(all_root_info):
+        if any(all_root_info):
+            log.warning(
+                "Incomplete root OpenLineage information in DAG run config. "
+                "No root information will be used. Found values: "
+                "rootParentRunId='%s', rootParentJobNamespace='%s', 
rootParentJobName='%s'.",
+                root_run_id,
+                root_namespace,
+                root_name,
+            )
+        else:
+            log.debug("No 'openlineage' root information found in DAG run 
config.")
+        return {}
+
+    try:  # Validate that runId is correct UUID
+        parent_run.RootRun(runId=root_run_id)
+    except ValueError:
+        log.warning(
+            "Invalid OpenLineage rootParentRunId '%s' in DAG run config - 
expected a valid UUID.",
+            root_run_id,
+        )
+        return {}
+
+    log.debug(
+        "Extracted valid root OpenLineage identifiers from DAG run config: "
+        "rootParentRunId='%s', rootParentJobNamespace='%s', 
rootParentJobName='%s'.",
+        root_run_id,
+        root_namespace,
+        root_name,
+    )
+    return {
+        "root_parent_run_id": root_run_id,
+        "root_parent_job_namespace": root_namespace,
+        "root_parent_job_name": root_name,
+    }
+
+
+def get_dag_parent_run_facet(dr_conf: dict | None) -> dict[str, 
parent_run.ParentRunFacet]:
+    """Build the OpenLineage parent run facet from a DAG run config."""
+    ol_data = _get_openlineage_data_from_dagrun_conf(dr_conf)
+    if not ol_data:
+        log.debug("No 'openlineage' data found in DAG run config.")
+        return {}
+
+    parent_run_id = ol_data.get("parentRunId", "")
+    parent_job_namespace = ol_data.get("parentJobNamespace", "")
+    parent_job_name = ol_data.get("parentJobName", "")
+
+    all_parent_info = (parent_run_id, parent_job_namespace, parent_job_name)
+    if not all(all_parent_info):
+        if any(all_parent_info):
+            log.warning(
+                "Incomplete parent OpenLineage information in DAG run config. "
+                "ParentRunFacet will NOT be created. Found values: "
+                "parentRunId='%s', parentJobNamespace='%s', 
parentJobName='%s'.",
+                parent_run_id,
+                parent_job_namespace,
+                parent_job_name,
+            )
+        else:
+            log.debug("No 'openlineage' parent information found in DAG run 
config.")
+        return {}
+
+    try:  # Validate that runId is correct UUID
+        parent_run.RootRun(runId=parent_run_id)
+    except ValueError:
+        log.warning(
+            "Invalid OpenLineage parentRunId '%s' in DAG run config - expected 
a valid UUID.",
+            parent_run_id,
+        )
+        return {}
+
+    log.debug(
+        "Extracted valid parent OpenLineage identifiers from DAG run config: "
+        "parentRunId='%s', parentJobNamespace='%s', parentJobName='%s'.",
+        parent_run_id,
+        parent_job_namespace,
+        parent_job_name,
+    )
+
+    root_info = get_root_information_from_dagrun_conf(dr_conf)
+    if root_info and all(root_info.values()):
+        root_parent_run_id = root_info["root_parent_run_id"]
+        root_parent_job_namespace = root_info["root_parent_job_namespace"]
+        root_parent_job_name = root_info["root_parent_job_name"]
+    else:
+        log.debug(
+            "Missing OpenLineage root identifiers in DAG run config, "
+            "parent identifiers will be used as root instead."
+        )
+        root_parent_run_id, root_parent_job_namespace, root_parent_job_name = (
+            parent_run_id,
+            parent_job_namespace,
+            parent_job_name,
+        )
+
+    return _get_parent_run_facet(
+        parent_run_id=parent_run_id,
+        parent_job_namespace=parent_job_namespace,
+        parent_job_name=parent_job_name,
+        root_parent_run_id=root_parent_run_id,
+        root_parent_job_namespace=root_parent_job_namespace,
+        root_parent_job_name=root_parent_job_name,
+    )
+
+
 def _truncate_string_to_byte_size(s: str, max_size: int = _MAX_DOC_BYTES) -> 
str:
     """
     Truncate a string to a maximum UTF-8 byte size, ensuring valid encoding.
diff --git 
a/providers/openlineage/tests/system/openlineage/example_openlineage_trigger_dag.py
 
b/providers/openlineage/tests/system/openlineage/example_openlineage_trigger_dag.py
index 513898a51f9..fb524344bab 100644
--- 
a/providers/openlineage/tests/system/openlineage/example_openlineage_trigger_dag.py
+++ 
b/providers/openlineage/tests/system/openlineage/example_openlineage_trigger_dag.py
@@ -20,6 +20,7 @@ Simple DAG that triggers another simple DAG.
 It checks:
     - task's trigger_dag_id
     - DAGRun START and COMPLETE events, for the triggered DAG
+    - propagation of OL parent and root info from DAGRun conf
 """
 
 from __future__ import annotations
@@ -47,7 +48,17 @@ with DAG(
         trigger_dag_id="openlineage_trigger_dag_child__notrigger",
         
trigger_run_id=f"openlineage_trigger_dag_triggering_child_{datetime.now().isoformat()}",
         wait_for_completion=True,
-        conf={"some_config": "value1"},
+        conf={
+            "some_config": "value1",
+            "openlineage": {
+                "parentRunId": "3bb703d1-09c1-4a42-8da5-35a0b3216072",
+                "parentJobNamespace": "prod_biz",
+                "parentJobName": "get_files",
+                "rootParentRunId": "9d3b14f7-de91-40b6-aeef-e887e2c7673e",
+                "rootParentJobNamespace": "prod_analytics",
+                "rootParentJobName": "generate_report_sales_e2e",
+            },
+        },
         poke_interval=10,
     )
 
diff --git 
a/providers/openlineage/tests/system/openlineage/expected_events/openlineage_trigger_dag.json
 
b/providers/openlineage/tests/system/openlineage/expected_events/openlineage_trigger_dag.json
index 45f742efad6..3695408b28f 100644
--- 
a/providers/openlineage/tests/system/openlineage/expected_events/openlineage_trigger_dag.json
+++ 
b/providers/openlineage/tests/system/openlineage/expected_events/openlineage_trigger_dag.json
@@ -21,7 +21,15 @@
                     },
                     "dagRun": {
                         "conf": {
-                            "some_config": "value1"
+                            "some_config": "value1",
+                            "openlineage": {
+                                "parentRunId": 
"3bb703d1-09c1-4a42-8da5-35a0b3216072",
+                                "parentJobNamespace": "prod_biz",
+                                "parentJobName": "get_files",
+                                "rootParentRunId": 
"9d3b14f7-de91-40b6-aeef-e887e2c7673e",
+                                "rootParentJobNamespace": "prod_analytics",
+                                "rootParentJobName": 
"generate_report_sales_e2e"
+                            }
                         },
                         "dag_id": "openlineage_trigger_dag_child__notrigger",
                         "data_interval_end": "{{ is_datetime(result) }}",
@@ -34,6 +42,26 @@
                     "_producer": "{{ regex_match(result, 
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
 }}",
                     "_schemaURL": "{{ regex_match(result, 
\"^https:\\/\\/openlineage.io\\/spec\\/[\\d-]+\\/OpenLineage.json\\#\\/\\$defs\\/RunFacet\")
 }}"
                 },
+                "parent": {
+                  "job": {
+                    "namespace": "prod_biz",
+                    "name": "get_files"
+                  },
+                  "run": {
+                    "runId": "3bb703d1-09c1-4a42-8da5-35a0b3216072"
+                  },
+                  "root": {
+                    "job": {
+                      "name": "generate_report_sales_e2e",
+                      "namespace": "prod_analytics"
+                    },
+                    "run": {
+                      "runId": "9d3b14f7-de91-40b6-aeef-e887e2c7673e"
+                    }
+                  },
+                  "_producer": "{{ regex_match(result, 
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
 }}",
+                  "_schemaURL": "{{ regex_match(result, 
\"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/ParentRunFacet.json\\#\\/\\$defs\\/ParentRunFacet$\")
 }}"
+                },
                 "nominalTime": {
                     "nominalEndTime": "{{ is_datetime(result) }}",
                     "nominalStartTime": "{{ is_datetime(result) }}",
@@ -153,7 +181,15 @@
                     },
                     "dagRun": {
                         "conf": {
-                            "some_config": "value1"
+                            "some_config": "value1",
+                            "openlineage": {
+                                "parentRunId": 
"3bb703d1-09c1-4a42-8da5-35a0b3216072",
+                                "parentJobNamespace": "prod_biz",
+                                "parentJobName": "get_files",
+                                "rootParentRunId": 
"9d3b14f7-de91-40b6-aeef-e887e2c7673e",
+                                "rootParentJobNamespace": "prod_analytics",
+                                "rootParentJobName": 
"generate_report_sales_e2e"
+                            }
                         },
                         "dag_id": "openlineage_trigger_dag_child__notrigger",
                         "data_interval_end": "{{ is_datetime(result) }}",
@@ -167,6 +203,26 @@
                     "_producer": "{{ regex_match(result, 
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
 }}",
                     "_schemaURL": "{{ regex_match(result, 
\"^https:\\/\\/openlineage.io\\/spec\\/[\\d-]+\\/OpenLineage.json\\#\\/\\$defs\\/RunFacet\")
 }}"
                 },
+                "parent": {
+                  "job": {
+                    "namespace": "prod_biz",
+                    "name": "get_files"
+                  },
+                  "run": {
+                    "runId": "3bb703d1-09c1-4a42-8da5-35a0b3216072"
+                  },
+                  "root": {
+                    "job": {
+                      "name": "generate_report_sales_e2e",
+                      "namespace": "prod_analytics"
+                    },
+                    "run": {
+                      "runId": "9d3b14f7-de91-40b6-aeef-e887e2c7673e"
+                    }
+                  },
+                  "_producer": "{{ regex_match(result, 
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
 }}",
+                  "_schemaURL": "{{ regex_match(result, 
\"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/ParentRunFacet.json\\#\\/\\$defs\\/ParentRunFacet$\")
 }}"
+                },
                 "nominalTime": {
                     "nominalEndTime": "{{ is_datetime(result) }}",
                     "nominalStartTime": "{{ is_datetime(result) }}",
@@ -237,6 +293,66 @@
             }
         }
     },
+    {
+        "eventType": "START",
+        "run": {
+            "facets": {
+                "parent": {
+                  "job": {
+                    "namespace": "{{ result is string }}",
+                    "name": "openlineage_trigger_dag_child__notrigger"
+                  },
+                  "run": {
+                    "runId": "{{ is_uuid(result) }}"
+                  },
+                  "root": {
+                    "job": {
+                      "name": "generate_report_sales_e2e",
+                      "namespace": "prod_analytics"
+                    },
+                    "run": {
+                      "runId": "9d3b14f7-de91-40b6-aeef-e887e2c7673e"
+                    }
+                  },
+                  "_producer": "{{ regex_match(result, 
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
 }}",
+                  "_schemaURL": "{{ regex_match(result, 
\"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/ParentRunFacet.json\\#\\/\\$defs\\/ParentRunFacet$\")
 }}"
+                }
+            }
+        },
+        "job": {
+            "name": "openlineage_trigger_dag_child__notrigger.do_nothing_task"
+        }
+    },
+    {
+        "eventType": "COMPLETE",
+        "run": {
+            "facets": {
+                "parent": {
+                  "job": {
+                    "namespace": "{{ result is string }}",
+                    "name": "openlineage_trigger_dag_child__notrigger"
+                  },
+                  "run": {
+                    "runId": "{{ is_uuid(result) }}"
+                  },
+                  "root": {
+                    "job": {
+                      "name": "generate_report_sales_e2e",
+                      "namespace": "prod_analytics"
+                    },
+                    "run": {
+                      "runId": "9d3b14f7-de91-40b6-aeef-e887e2c7673e"
+                    }
+                  },
+                  "_producer": "{{ regex_match(result, 
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
 }}",
+                  "_schemaURL": "{{ regex_match(result, 
\"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/ParentRunFacet.json\\#\\/\\$defs\\/ParentRunFacet$\")
 }}"
+                }
+            }
+        },
+        "job": {
+            "name": "openlineage_trigger_dag_child__notrigger.do_nothing_task"
+        }
+    },
     {
         "eventType": "START",
         "run": {
@@ -245,6 +361,26 @@
                     "task": {
                         "trigger_dag_id": "openlineage_trigger_dag_child__notrigger"
                     }
+                },
+                "parent": {
+                  "job": {
+                    "namespace": "{{ result is string }}",
+                    "name": "openlineage_trigger_dag"
+                  },
+                  "run": {
+                    "runId": "{{ is_uuid(result) }}"
+                  },
+                  "root": {
+                    "job": {
+                      "name": "openlineage_trigger_dag",
+                      "namespace": "{{ result is string }}"
+                    },
+                    "run": {
+                      "runId": "{{ is_uuid(result) }}"
+                    }
+                  },
+                  "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}",
+                  "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/ParentRunFacet.json\\#\\/\\$defs\\/ParentRunFacet$\") }}"
                 }
             }
         },
@@ -260,6 +396,26 @@
                     "task": {
                         "trigger_dag_id": "openlineage_trigger_dag_child__notrigger"
                     }
+                },
+                "parent": {
+                  "job": {
+                    "namespace": "{{ result is string }}",
+                    "name": "openlineage_trigger_dag"
+                  },
+                  "run": {
+                    "runId": "{{ is_uuid(result) }}"
+                  },
+                  "root": {
+                    "job": {
+                      "name": "openlineage_trigger_dag",
+                      "namespace": "{{ result is string }}"
+                    },
+                    "run": {
+                      "runId": "{{ is_uuid(result) }}"
+                    }
+                  },
+                  "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}",
+                  "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/ParentRunFacet.json\\#\\/\\$defs\\/ParentRunFacet$\") }}"
                 }
             }
         },
diff --git a/providers/openlineage/tests/unit/openlineage/plugins/test_macros.py b/providers/openlineage/tests/unit/openlineage/plugins/test_macros.py
index 520421be768..3cdde5d183f 100644
--- a/providers/openlineage/tests/unit/openlineage/plugins/test_macros.py
+++ b/providers/openlineage/tests/unit/openlineage/plugins/test_macros.py
@@ -21,12 +21,13 @@ from unittest import mock
 
 import pytest
 
-from airflow import __version__
 from airflow.providers.openlineage.conf import namespace
 from airflow.providers.openlineage.plugins.macros import (
     lineage_job_name,
     lineage_job_namespace,
     lineage_parent_id,
+    lineage_root_job_name,
+    lineage_root_job_namespace,
     lineage_root_run_id,
     lineage_run_id,
 )
@@ -35,10 +36,10 @@ from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
 
 _DAG_NAMESPACE = namespace()
 
-if __version__.startswith("2."):
-    LOGICAL_DATE_KEY = "execution_date"
-else:
+if AIRFLOW_V_3_0_PLUS:
     LOGICAL_DATE_KEY = "logical_date"
+else:
+    LOGICAL_DATE_KEY = "execution_date"
 
 
 def test_lineage_job_namespace():
@@ -130,3 +131,204 @@ def test_lineage_root_run_id_with_runtime_task_instance(create_runtime_ti):
         assert lineage_root_run_id(runtime_ti) is not None
     except AttributeError as e:
         pytest.fail(f"lineage_root_run_id should not throw AttributeError with RuntimeTaskInstance: {e}")
+
+
+@pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="Test only for Airflow 3.0+")
+def test_lineage_root_run_id_no_conf_af3(create_runtime_ti):
+    from airflow.providers.common.compat.sdk import BaseOperator
+
+    task = BaseOperator(task_id="test_task")
+
+    runtime_ti = create_runtime_ti(
+        task=task,
+        dag_id="test_dag",
+        run_id="test_run_id",
+        conf=None,
+    )
+
+    result = lineage_root_run_id(runtime_ti)
+    assert result == "01937fbb-4680-70b3-b49b-1de6b041527a"
+
+
+@pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="Test only for Airflow 3.0+")
+def test_lineage_root_run_id_with_conf_af3(create_runtime_ti):
+    from airflow.providers.common.compat.sdk import BaseOperator
+
+    task = BaseOperator(task_id="test_task")
+
+    runtime_ti = create_runtime_ti(
+        task=task,
+        dag_id="test_dag",
+        run_id="test_run_id",
+        conf={
+            "openlineage": {
+                "rootParentRunId": "22222222-2222-2222-2222-222222222222",
+                "rootParentJobNamespace": "rootns",
+                "rootParentJobName": "rootjob",
+            }
+        },
+    )
+
+    result = lineage_root_run_id(runtime_ti)
+    assert result == "22222222-2222-2222-2222-222222222222"
+
+
+def test_lineage_root_run_id_without_conf_af2():
+    date = datetime(2020, 1, 1, 1, 1, 1, 0, tzinfo=timezone.utc)
+    conf = {}
+    dag_run = mock.MagicMock(run_id="run_id", conf=conf)
+    dag_run.logical_date = date
+    dag_run.clear_number = 1
+    task_instance = mock.MagicMock(
+        dag_id="dag_id",
+        task_id="task_id",
+        dag_run=dag_run,
+        logical_date=date,
+        try_number=1,
+    )
+
+    call_result1 = lineage_root_run_id(task_instance)
+    call_result2 = lineage_root_run_id(task_instance)
+
+    # random part value does not matter, it just has to be the same for the same TaskInstance
+    assert call_result1 == call_result2
+    assert call_result1 == "016f5e9e-c4c8-7c30-9eda-d9c646d633ea"
+
+
+def test_lineage_root_run_id_with_conf_af2():
+    conf = {
+        "openlineage": {
+            "rootParentRunId": "22222222-2222-2222-2222-222222222222",
+            "rootParentJobNamespace": "rootns",
+            "rootParentJobName": "rootjob",
+        }
+    }
+    task_instance = mock.MagicMock(
+        dag_run=mock.MagicMock(conf=conf),
+    )
+
+    call_result1 = lineage_root_run_id(task_instance)
+    call_result2 = lineage_root_run_id(task_instance)
+
+    assert call_result1 == call_result2
+    assert call_result1 == "22222222-2222-2222-2222-222222222222"
+
+
+@pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="Test only for Airflow 3.0+")
+def test_lineage_root_job_name_no_conf_af3(create_runtime_ti):
+    from airflow.providers.common.compat.sdk import BaseOperator
+
+    task = BaseOperator(task_id="test_task")
+
+    runtime_ti = create_runtime_ti(
+        task=task,
+        dag_id="test_dag",
+        run_id="test_run_id",
+        conf=None,
+    )
+
+    result = lineage_root_job_name(runtime_ti)
+    assert result == "test_dag"
+
+
+@pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="Test only for Airflow 3.0+")
+def test_lineage_root_job_name_with_conf_af3(create_runtime_ti):
+    from airflow.providers.common.compat.sdk import BaseOperator
+
+    task = BaseOperator(task_id="test_task")
+
+    runtime_ti = create_runtime_ti(
+        task=task,
+        dag_id="test_dag",
+        run_id="test_run_id",
+        conf={
+            "openlineage": {
+                "rootParentRunId": "22222222-2222-2222-2222-222222222222",
+                "rootParentJobNamespace": "rootns",
+                "rootParentJobName": "rootjob",
+            }
+        },
+    )
+
+    result = lineage_root_job_name(runtime_ti)
+    assert result == "rootjob"
+
+
+def test_lineage_root_job_name_without_conf_af2():
+    task_instance = mock.MagicMock(
+        dag_id="dag_id",
+        dag_run=mock.MagicMock(conf={}),
+    )
+
+    result = lineage_root_job_name(task_instance)
+    assert result == "dag_id"
+
+
+def test_lineage_root_job_name_with_conf_af2():
+    conf = {
+        "openlineage": {
+            "rootParentRunId": "22222222-2222-2222-2222-222222222222",
+            "rootParentJobNamespace": "rootns",
+            "rootParentJobName": "rootjob",
+        }
+    }
+    task_instance = mock.MagicMock(dag_run=mock.MagicMock(conf=conf))
+
+    result = lineage_root_job_name(task_instance)
+    assert result == "rootjob"
+
+
+@pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="Test only for Airflow 3.0+")
+def test_lineage_root_job_namespace_no_conf_af3(create_runtime_ti):
+    from airflow.providers.common.compat.sdk import BaseOperator
+
+    task = BaseOperator(task_id="test_task")
+
+    runtime_ti = create_runtime_ti(task=task, dag_id="test_dag", run_id="test_run_id", conf=None)
+
+    result = lineage_root_job_namespace(runtime_ti)
+    assert result == _DAG_NAMESPACE
+
+
+@pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="Test only for Airflow 3.0+")
+def test_lineage_root_job_namespace_with_conf_af3(create_runtime_ti):
+    from airflow.providers.common.compat.sdk import BaseOperator
+
+    task = BaseOperator(task_id="test_task")
+
+    runtime_ti = create_runtime_ti(
+        task=task,
+        dag_id="test_dag",
+        run_id="test_run_id",
+        conf={
+            "openlineage": {
+                "rootParentRunId": "22222222-2222-2222-2222-222222222222",
+                "rootParentJobNamespace": "rootns",
+                "rootParentJobName": "rootjob",
+            }
+        },
+    )
+
+    result = lineage_root_job_namespace(runtime_ti)
+    assert result == "rootns"
+
+
+def test_lineage_root_job_namespace_without_conf_af2():
+    task_instance = mock.MagicMock(dag_run=mock.MagicMock(conf={}))
+
+    result = lineage_root_job_namespace(task_instance)
+    assert result == _DAG_NAMESPACE
+
+
+def test_lineage_root_job_namespace_with_conf_af2():
+    conf = {
+        "openlineage": {
+            "rootParentRunId": "22222222-2222-2222-2222-222222222222",
+            "rootParentJobNamespace": "rootns",
+            "rootParentJobName": "rootjob",
+        }
+    }
+    task_instance = mock.MagicMock(dag_run=mock.MagicMock(conf=conf))
+
+    result = lineage_root_job_namespace(task_instance)
+    assert result == "rootns"
diff --git a/providers/openlineage/tests/unit/openlineage/utils/test_utils.py b/providers/openlineage/tests/unit/openlineage/utils/test_utils.py
index 3bbea4aafac..198280426a3 100644
--- a/providers/openlineage/tests/unit/openlineage/utils/test_utils.py
+++ b/providers/openlineage/tests/unit/openlineage/utils/test_utils.py
@@ -24,6 +24,7 @@ from unittest.mock import MagicMock, PropertyMock, patch
 
 import pendulum
 import pytest
+from openlineage.client.facet_v2 import parent_run
 from uuid6 import uuid7
 
 from airflow import DAG
@@ -31,6 +32,7 @@ from airflow.models.dagrun import DagRun
 from airflow.models.taskinstance import TaskInstance, TaskInstanceState
 from airflow.providers.common.compat.assets import Asset
 from airflow.providers.common.compat.sdk import BaseOperator, TaskGroup, task, timezone
+from airflow.providers.openlineage.conf import namespace
 from airflow.providers.openlineage.plugins.facets import AirflowDagRunFacet, AirflowJobFacet
 from airflow.providers.openlineage.utils.utils import (
     _MAX_DOC_BYTES,
@@ -40,6 +42,7 @@ from airflow.providers.openlineage.utils.utils import (
     TaskInfo,
     TaskInfoComplete,
     TaskInstanceInfo,
+    _get_openlineage_data_from_dagrun_conf,
     _get_task_groups_details,
     _get_tasks_details,
     _truncate_string_to_byte_size,
@@ -47,11 +50,14 @@ from airflow.providers.openlineage.utils.utils import (
     get_airflow_job_facet,
     get_airflow_state_run_facet,
     get_dag_documentation,
+    get_dag_parent_run_facet,
     get_fully_qualified_class_name,
     get_job_name,
     get_operator_class,
     get_operator_provider_version,
+    get_root_information_from_dagrun_conf,
     get_task_documentation,
+    get_task_parent_run_facet,
     get_user_provided_run_facets,
 )
 from airflow.providers.standard.operators.empty import EmptyOperator
@@ -465,6 +471,233 @@ def test_get_operator_class_mapped_operator():
     assert op_class == MockOperator
 
 
+@pytest.mark.parametrize("dr_conf", (None, {}))
+def test_get_openlineage_data_from_dagrun_conf_none_conf(dr_conf):
+    _dr_conf = None if dr_conf is None else {}
+    assert _get_openlineage_data_from_dagrun_conf(dr_conf) == {}
+    assert dr_conf == _dr_conf  # Assert conf is not changed
+
+
+def test_get_openlineage_data_from_dagrun_conf_no_openlineage_key():
+    dr_conf = {"something_else": {"a": 1}}
+    assert _get_openlineage_data_from_dagrun_conf(dr_conf) == {}
+    assert dr_conf == {"something_else": {"a": 1}}  # Assert conf is not changed
+
+
+def test_get_openlineage_data_from_dagrun_conf_invalid_type():
+    dr_conf = {"openlineage": "not_a_dict"}
+    assert _get_openlineage_data_from_dagrun_conf(dr_conf) == {}
+    assert dr_conf == {"openlineage": "not_a_dict"}  # Assert conf is not changed
+
+
+def test_get_openlineage_data_from_dagrun_conf_valid_dict():
+    dr_conf = {"openlineage": {"key": "value"}}
+    assert _get_openlineage_data_from_dagrun_conf(dr_conf) == {"key": "value"}
+    assert dr_conf == {"openlineage": {"key": "value"}}  # Assert conf is not changed
+
+
+@pytest.mark.parametrize("dr_conf", (None, {}))
+def test_get_root_information_from_dagrun_conf_no_conf(dr_conf):
+    _dr_conf = None if dr_conf is None else {}
+    assert get_root_information_from_dagrun_conf(dr_conf) == {}
+    assert dr_conf == _dr_conf  # Assert conf is not changed
+
+
+def test_get_root_information_from_dagrun_conf_no_openlineage():
+    dr_conf = {"something": "else"}
+    assert get_root_information_from_dagrun_conf(dr_conf) == {}
+    assert dr_conf == {"something": "else"}  # Assert conf is not changed
+
+
+def test_get_root_information_from_dagrun_conf_openlineage_not_dict():
+    dr_conf = {"openlineage": "my_value"}
+    assert get_root_information_from_dagrun_conf(dr_conf) == {}
+    assert dr_conf == {"openlineage": "my_value"}  # Assert conf is not changed
+
+
+def test_get_root_information_from_dagrun_conf_missing_keys():
+    dr_conf = {"openlineage": {"rootParentRunId": "id_only"}}
+    assert get_root_information_from_dagrun_conf(dr_conf) == {}
+    assert dr_conf == {"openlineage": {"rootParentRunId": "id_only"}}  # Assert conf is not changed
+
+
+def test_get_root_information_from_dagrun_conf_invalid_run_id():
+    dr_conf = {
+        "openlineage": {
+            "rootParentRunId": "not_uuid",
+            "rootParentJobNamespace": "ns",
+            "rootParentJobName": "jobX",
+        }
+    }
+    assert get_root_information_from_dagrun_conf(dr_conf) == {}
+    assert dr_conf == {  # Assert conf is not changed
+        "openlineage": {
+            "rootParentRunId": "not_uuid",
+            "rootParentJobNamespace": "ns",
+            "rootParentJobName": "jobX",
+        }
+    }
+
+
+def test_get_root_information_from_dagrun_conf_valid_data():
+    dr_conf = {
+        "openlineage": {
+            "rootParentRunId": "11111111-1111-1111-1111-111111111111",
+            "rootParentJobNamespace": "ns",
+            "rootParentJobName": "jobX",
+        }
+    }
+    expected = {
+        "root_parent_run_id": "11111111-1111-1111-1111-111111111111",
+        "root_parent_job_namespace": "ns",
+        "root_parent_job_name": "jobX",
+    }
+    assert get_root_information_from_dagrun_conf(dr_conf) == expected
+    assert dr_conf == {  # Assert conf is not changed
+        "openlineage": {
+            "rootParentRunId": "11111111-1111-1111-1111-111111111111",
+            "rootParentJobNamespace": "ns",
+            "rootParentJobName": "jobX",
+        }
+    }
+
+
+@pytest.mark.parametrize("dr_conf", (None, {}))
+def test_get_dag_parent_run_facet_no_conf(dr_conf):
+    _dr_conf = None if dr_conf is None else {}
+    assert get_dag_parent_run_facet(dr_conf) == {}
+    assert dr_conf == _dr_conf  # Assert conf is not changed
+
+
+def test_get_dag_parent_run_facet_missing_keys():
+    dr_conf = {"openlineage": {"parentRunId": "11111111-1111-1111-1111-111111111111"}}
+    assert get_dag_parent_run_facet(dr_conf) == {}
+    # Assert conf is not changed
+    assert dr_conf == {"openlineage": {"parentRunId": "11111111-1111-1111-1111-111111111111"}}
+
+
+def test_get_dag_parent_run_facet_valid_no_root():
+    dr_conf = {
+        "openlineage": {
+            "parentRunId": "11111111-1111-1111-1111-111111111111",
+            "parentJobNamespace": "ns",
+            "parentJobName": "jobA",
+        }
+    }
+
+    result = get_dag_parent_run_facet(dr_conf)
+    parent_facet = result.get("parent")
+
+    assert isinstance(parent_facet, parent_run.ParentRunFacet)
+    assert parent_facet.run.runId == "11111111-1111-1111-1111-111111111111"
+    assert parent_facet.job.namespace == "ns"
+    assert parent_facet.job.name == "jobA"
+    assert parent_facet.root is not None  # parent is used as root, since root is missing
+    assert parent_facet.root.run.runId == "11111111-1111-1111-1111-111111111111"
+    assert parent_facet.root.job.namespace == "ns"
+    assert parent_facet.root.job.name == "jobA"
+
+    assert dr_conf == {  # Assert conf is not changed
+        "openlineage": {
+            "parentRunId": "11111111-1111-1111-1111-111111111111",
+            "parentJobNamespace": "ns",
+            "parentJobName": "jobA",
+        }
+    }
+
+
+def test_get_dag_parent_run_facet_invalid_uuid():
+    dr_conf = {
+        "openlineage": {
+            "parentRunId": "not_uuid",
+            "parentJobNamespace": "ns",
+            "parentJobName": "jobA",
+        }
+    }
+
+    result = get_dag_parent_run_facet(dr_conf)
+    assert result == {}
+    assert dr_conf == {  # Assert conf is not changed
+        "openlineage": {
+            "parentRunId": "not_uuid",
+            "parentJobNamespace": "ns",
+            "parentJobName": "jobA",
+        }
+    }
+
+
+def test_get_dag_parent_run_facet_valid_with_root():
+    dr_conf = {
+        "openlineage": {
+            "parentRunId": "11111111-1111-1111-1111-111111111111",
+            "parentJobNamespace": "ns",
+            "parentJobName": "jobA",
+            "rootParentRunId": "22222222-2222-2222-2222-222222222222",
+            "rootParentJobNamespace": "rootns",
+            "rootParentJobName": "rootjob",
+        }
+    }
+
+    result = get_dag_parent_run_facet(dr_conf)
+    parent_facet = result.get("parent")
+
+    assert isinstance(parent_facet, parent_run.ParentRunFacet)
+    assert parent_facet.run.runId == "11111111-1111-1111-1111-111111111111"
+    assert parent_facet.job.namespace == "ns"
+    assert parent_facet.job.name == "jobA"
+    assert parent_facet.root is not None
+    assert parent_facet.root.run.runId == "22222222-2222-2222-2222-222222222222"
+    assert parent_facet.root.job.namespace == "rootns"
+    assert parent_facet.root.job.name == "rootjob"
+
+    assert dr_conf == {  # Assert conf is not changed
+        "openlineage": {
+            "parentRunId": "11111111-1111-1111-1111-111111111111",
+            "parentJobNamespace": "ns",
+            "parentJobName": "jobA",
+            "rootParentRunId": "22222222-2222-2222-2222-222222222222",
+            "rootParentJobNamespace": "rootns",
+            "rootParentJobName": "rootjob",
+        }
+    }
+
+
+def test_get_task_parent_run_facet_defaults():
+    result = get_task_parent_run_facet(
+        parent_run_id="11111111-1111-1111-1111-111111111111",
+        parent_job_name="jobA",
+    )
+    parent_facet = result.get("parent")
+
+    assert isinstance(parent_facet, parent_run.ParentRunFacet)
+    assert parent_facet.run.runId == "11111111-1111-1111-1111-111111111111"
+    assert parent_facet.job.namespace == namespace()
+    assert parent_facet.job.name == "jobA"
+    assert parent_facet.root.run.runId == "11111111-1111-1111-1111-111111111111"
+    assert parent_facet.root.job.namespace == namespace()
+    assert parent_facet.root.job.name == "jobA"
+
+
+def test_get_task_parent_run_facet_custom_root_values():
+    result = get_task_parent_run_facet(
+        parent_run_id="11111111-1111-1111-1111-111111111111",
+        parent_job_name="jobA",
+        parent_job_namespace="ns",
+        root_parent_run_id="22222222-2222-2222-2222-222222222222",
+        root_parent_job_name="rjob",
+        root_parent_job_namespace="rns",
+    )
+
+    parent_facet = result.get("parent")
+    assert isinstance(parent_facet, parent_run.ParentRunFacet)
+    assert parent_facet.run.runId == "11111111-1111-1111-1111-111111111111"
+    assert parent_facet.job.namespace == "ns"
+    assert parent_facet.job.name == "jobA"
+    assert parent_facet.root.run.runId == "22222222-2222-2222-2222-222222222222"
+    assert parent_facet.root.job.namespace == "rns"
+    assert parent_facet.root.job.name == "rjob"
+
+
 def test_get_tasks_details():
     class TestMappedOperator(BaseOperator):
         def __init__(self, value, **kwargs):

Reply via email to