This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new c202c07f67 Introduce AirflowJobFacet and AirflowStateRunFacet (#39520)
c202c07f67 is described below

commit c202c07f67173718c736d95de22185b65b25b580
Author: Kacper Muda <[email protected]>
AuthorDate: Wed Jun 5 13:52:13 2024 +0200

    Introduce AirflowJobFacet and AirflowStateRunFacet (#39520)
    
    Signed-off-by: Kacper Muda <[email protected]>
---
 .../openlineage/facets/AirflowJobFacet.json        |  40 ++
 .../openlineage/facets/AirflowRunFacet.json        | 254 ++++++++++++
 .../openlineage/facets/AirflowStateRunFacet.json   |  34 ++
 airflow/providers/openlineage/facets/__init__.py   |  16 +
 airflow/providers/openlineage/plugins/adapter.py   |  20 +-
 airflow/providers/openlineage/plugins/facets.py    |  47 ++-
 airflow/providers/openlineage/plugins/listener.py  |   4 +
 airflow/providers/openlineage/utils/utils.py       | 186 ++++++++-
 docs/spelling_wordlist.txt                         |   4 +
 .../providers/openlineage/plugins/test_adapter.py  | 129 ++++--
 tests/providers/openlineage/utils/test_utils.py    | 449 ++++++++++++++++++++-
 11 files changed, 1115 insertions(+), 68 deletions(-)

diff --git a/airflow/providers/openlineage/facets/AirflowJobFacet.json b/airflow/providers/openlineage/facets/AirflowJobFacet.json
new file mode 100644
index 0000000000..51a9954de3
--- /dev/null
+++ b/airflow/providers/openlineage/facets/AirflowJobFacet.json
@@ -0,0 +1,40 @@
+{
+    "$schema": "https://json-schema.org/draft/2020-12/schema";,
+    "$defs": {
+      "AirflowJobFacet": {
+        "allOf": [
+          {
+            "$ref": 
"https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet";
+          },
+          {
+            "type": "object",
+            "properties": {
+              "taskTree": {
+                "description": "The hierarchical structure of tasks in the 
DAG.",
+                "type": "object",
+                "additionalProperties": true
+              },
+              "taskGroups": {
+                "description": "Information about all task groups within the 
DAG.",
+                "type": "object",
+                "additionalProperties": true
+              },
+              "tasks": {
+                "description": "Details of all individual tasks within the 
DAG.",
+                "type": "object",
+                "additionalProperties": true
+              }
+            },
+            "required": ["taskTree", "taskGroups", "tasks"]
+          }
+        ],
+        "type": "object"
+      }
+    },
+    "type": "object",
+    "properties": {
+      "airflow": {
+        "$ref": "#/$defs/AirflowJobFacet"
+      }
+    }
+  }
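
For illustration, the facet-specific fields of a payload conforming to the schema above might look like the Python dict below (task IDs and tree are hypothetical; the base RunFacet fields such as _producer are omitted):

    airflow_job_facet = {
        "airflow": {
            "taskTree": {"extract": {"load": {}}},
            "taskGroups": {},
            "tasks": {
                "extract": {"operator": "airflow.operators.bash.BashOperator"},
                "load": {"operator": "airflow.operators.python.PythonOperator"},
            },
        }
    }
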
diff --git a/airflow/providers/openlineage/facets/AirflowRunFacet.json b/airflow/providers/openlineage/facets/AirflowRunFacet.json
new file mode 100644
index 0000000000..504fb1bc3a
--- /dev/null
+++ b/airflow/providers/openlineage/facets/AirflowRunFacet.json
@@ -0,0 +1,254 @@
+{
+  "$schema": "https://json-schema.org/draft/2020-12/schema";,
+  "$defs": {
+    "AirflowRunFacet": {
+      "allOf": [
+        {
+          "$ref": 
"https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet";
+        },
+        {
+          "type": "object",
+          "properties": {
+            "dag": {
+              "$ref": "#/$defs/DAG"
+            },
+            "dagRun": {
+              "$ref": "#/$defs/DagRun"
+            },
+            "taskInstance": {
+              "$ref": "#/$defs/TaskInstance"
+            },
+            "task": {
+              "$ref": "#/$defs/Task"
+            },
+            "taskUuid": {
+              "type": "string"
+            }
+          },
+          "required": [
+            "dag",
+            "dagRun",
+            "taskInstance",
+            "task",
+            "taskUuid"
+          ]
+        }
+      ]
+    },
+    "Task": {
+      "type": "object",
+      "properties": {
+        "depends_on_past": {
+          "type": "boolean"
+        },
+        "downstream_task_ids": {
+          "type": "string"
+        },
+        "execution_timeout": {
+          "type": "string"
+        },
+        "executor_config": {
+          "type": "object",
+          "additionalProperties": true
+        },
+        "ignore_first_depends_on_past": {
+          "type": "boolean"
+        },
+        "is_setup": {
+          "type": "boolean"
+        },
+        "is_teardown": {
+          "type": "boolean"
+        },
+        "mapped": {
+          "type": "boolean"
+        },
+        "max_active_tis_per_dag": {
+          "type": "integer"
+        },
+        "max_active_tis_per_dagrun": {
+          "type": "integer"
+        },
+        "max_retry_delay": {
+          "type": "string"
+        },
+        "multiple_outputs": {
+          "type": "boolean"
+        },
+        "operator_class": {
+          "description": "Module + class name of the operator",
+          "type": "string"
+        },
+        "owner": {
+          "type": "string"
+        },
+        "priority_weight": {
+          "type": "integer"
+        },
+        "queue": {
+          "type": "string"
+        },
+        "retries": {
+          "type": "integer"
+        },
+        "retry_exponential_backoff": {
+          "type": "boolean"
+        },
+        "run_as_user": {
+          "type": "string"
+        },
+        "task_id": {
+          "type": "string"
+        },
+        "trigger_rule": {
+          "type": "string"
+        },
+        "upstream_task_ids": {
+          "type": "string"
+        },
+        "wait_for_downstream": {
+          "type": "boolean"
+        },
+        "wait_for_past_depends_before_skipping": {
+          "type": "boolean"
+        },
+        "weight_rule": {
+          "type": "string"
+        },
+        "task_group": {
+          "description": "Task group related information",
+          "type": "object",
+          "properties": {
+            "group_id": {
+              "type": "string"
+            },
+            "downstream_group_ids": {
+              "type": "string"
+            },
+            "downstream_task_ids": {
+              "type": "string"
+            },
+            "prefix_group_id": {
+              "type": "boolean"
+            },
+            "tooltip": {
+              "type": "string"
+            },
+            "upstream_group_ids": {
+              "type": "string"
+            },
+            "upstream_task_ids": {
+              "type": "string"
+            }
+          },
+          "additionalProperties": true,
+          "required": ["group_id"]
+        }
+      },
+      "additionalProperties": true,
+      "required": [
+        "task_id"
+      ]
+    },
+    "DAG": {
+      "type": "object",
+      "properties": {
+        "dag_id": {
+          "type": "string"
+        },
+        "description": {
+          "type": "string"
+        },
+        "owner": {
+          "type": "string"
+        },
+        "schedule_interval": {
+          "type": "string"
+        },
+        "start_date": {
+          "type": "string",
+          "format": "date-time"
+        },
+        "tags": {
+          "type": "string"
+        },
+        "timetable": {
+          "description": "Describes timetable (successor of 
schedule_interval)",
+          "type": "object",
+          "additionalProperties": true
+        }
+      },
+      "additionalProperties": true,
+      "required": [
+        "dag_id",
+        "start_date"
+      ]
+    },
+    "TaskInstance": {
+      "type": "object",
+      "properties": {
+        "duration": {
+          "type": "number"
+        },
+        "map_index": {
+          "type": "integer"
+        },
+        "pool": {
+          "type": "string"
+        },
+        "try_number": {
+          "type": "integer"
+        }
+      },
+      "additionalProperties": true,
+      "required": [
+        "pool",
+        "try_number"
+      ]
+    },
+    "DagRun": {
+      "type": "object",
+      "properties": {
+        "conf": {
+          "type": "object",
+          "additionalProperties": true
+        },
+        "dag_id": {
+          "type": "string"
+        },
+        "data_interval_start": {
+          "type": "string",
+          "format": "date-time"
+        },
+        "data_interval_end": {
+          "type": "string",
+          "format": "date-time"
+        },
+        "external_trigger": {
+          "type": "boolean"
+        },
+        "run_id": {
+          "type": "string"
+        },
+        "run_type": {
+          "type": "string"
+        },
+        "start_date": {
+          "type": "string",
+          "format": "date-time"
+        }
+      },
+      "additionalProperties": true,
+      "required": [
+        "dag_id",
+        "run_id"
+      ]
+    }
+  },
+  "type": "object",
+  "properties": {
+    "airflow": {
+      "$ref": "#/$defs/AirflowRunFacet"
+    }
+  }
+}
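
For illustration, a payload carrying the keys this schema marks as required might look roughly like this (all values are made up; base RunFacet fields omitted):

    airflow_run_facet = {
        "airflow": {
            "dag": {"dag_id": "example_dag", "start_date": "2024-06-01T00:00:00+00:00"},
            "dagRun": {"dag_id": "example_dag", "run_id": "manual__2024-06-01"},
            "taskInstance": {"pool": "default_pool", "try_number": 1},
            "task": {"task_id": "extract"},
            "taskUuid": "9d3b14f7-de91-40b6-aeef-e887e2c7673e",
        }
    }
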
diff --git a/airflow/providers/openlineage/facets/AirflowStateRunFacet.json b/airflow/providers/openlineage/facets/AirflowStateRunFacet.json
new file mode 100644
index 0000000000..2788e17282
--- /dev/null
+++ b/airflow/providers/openlineage/facets/AirflowStateRunFacet.json
@@ -0,0 +1,34 @@
+{
+    "$schema": "https://json-schema.org/draft/2020-12/schema";,
+    "$defs": {
+      "AirflowStateRunFacet": {
+        "allOf": [
+          {
+            "$ref": 
"https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet";
+          },
+          {
+            "type": "object",
+            "properties": {
+              "dagRunState": {
+                "description": "The final status of the entire DagRun",
+                "type": "string"
+              },
+              "tasksState": {
+                "description": "Mapping of task IDs to their respective 
states",
+                "type": "object",
+                "additionalProperties": true
+              }
+            },
+            "required": ["dagRunState", "tasksState"]
+          }
+        ],
+        "type": "object"
+      }
+    },
+    "type": "object",
+    "properties": {
+      "airflowState": {
+        "$ref": "#/$defs/AirflowStateRunFacet"
+      }
+    }
+  }
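
For illustration, a completion event might carry this facet roughly as follows (hypothetical task IDs and states; base RunFacet fields omitted):

    airflow_state_run_facet = {
        "airflowState": {
            "dagRunState": "success",
            "tasksState": {"extract": "success", "load": "skipped"},
        }
    }
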
diff --git a/airflow/providers/openlineage/facets/__init__.py b/airflow/providers/openlineage/facets/__init__.py
new file mode 100644
index 0000000000..13a83393a9
--- /dev/null
+++ b/airflow/providers/openlineage/facets/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/airflow/providers/openlineage/plugins/adapter.py b/airflow/providers/openlineage/plugins/adapter.py
index 608bd568e1..339ad55fdd 100644
--- a/airflow/providers/openlineage/plugins/adapter.py
+++ b/airflow/providers/openlineage/plugins/adapter.py
@@ -38,7 +38,10 @@ from openlineage.client.run import Job, Run, RunEvent, RunState
 from openlineage.client.uuid import generate_static_uuid
 
 from airflow.providers.openlineage import __version__ as OPENLINEAGE_PROVIDER_VERSION, conf
-from airflow.providers.openlineage.utils.utils import OpenLineageRedactor
+from airflow.providers.openlineage.utils.utils import (
+    OpenLineageRedactor,
+    get_airflow_state_run_facet,
+)
 from airflow.stats import Stats
 from airflow.utils.log.logging_mixin import LoggingMixin
 
@@ -321,12 +324,19 @@ class OpenLineageAdapter(LoggingMixin):
         msg: str,
         nominal_start_time: str,
         nominal_end_time: str,
+        job_facets: dict[str, BaseFacet] | None = None,  # Custom job facets
     ):
         try:
             event = RunEvent(
                 eventType=RunState.START,
                 eventTime=dag_run.start_date.isoformat(),
-                job=self._build_job(job_name=dag_run.dag_id, job_type=_JOB_TYPE_DAG),
+                job=self._build_job(
+                    job_name=dag_run.dag_id,
+                    job_type=_JOB_TYPE_DAG,
+                    job_description=dag_run.dag.description if dag_run.dag else None,
+                    owners=[x.strip() for x in dag_run.dag.owner.split(",")] if dag_run.dag else None,
+                    job_facets=job_facets,
+                ),
                 run=self._build_run(
                     run_id=self.build_dag_run_id(
                         dag_id=dag_run.dag_id,
@@ -358,6 +368,7 @@ class OpenLineageAdapter(LoggingMixin):
                         dag_id=dag_run.dag_id,
                         execution_date=dag_run.execution_date,
                     ),
+                    facets={**get_airflow_state_run_facet(dag_run)},
                 ),
                 inputs=[],
                 outputs=[],
@@ -381,7 +392,10 @@ class OpenLineageAdapter(LoggingMixin):
                         dag_id=dag_run.dag_id,
                         execution_date=dag_run.execution_date,
                     ),
-                    facets={"errorMessage": ErrorMessageRunFacet(message=msg, 
programmingLanguage="python")},
+                    facets={
+                        "errorMessage": ErrorMessageRunFacet(message=msg, 
programmingLanguage="python"),
+                        **get_airflow_state_run_facet(dag_run),
+                    },
                 ),
                 inputs=[],
                 outputs=[],
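
As a rough sketch of the new job_facets parameter in use (mirroring the listener change further below; adapter, dag_run, and the imported get_airflow_job_facet are assumed to be in scope, and both nominal times use the run's start purely for illustration):

    adapter.dag_started(
        dag_run=dag_run,
        msg="",
        nominal_start_time=dag_run.start_date.isoformat(),
        nominal_end_time=dag_run.start_date.isoformat(),
        job_facets=get_airflow_job_facet(dag_run=dag_run),
    )
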
diff --git a/airflow/providers/openlineage/plugins/facets.py b/airflow/providers/openlineage/plugins/facets.py
index 925f386d6e..fd1f6ef4b5 100644
--- a/airflow/providers/openlineage/plugins/facets.py
+++ b/airflow/providers/openlineage/plugins/facets.py
@@ -39,15 +39,56 @@ class AirflowMappedTaskRunFacet(BaseFacet):
 
     @classmethod
     def from_task_instance(cls, task_instance):
-        task = task_instance.task
-        from airflow.providers.openlineage.utils.utils import get_operator_class
+        from airflow.providers.openlineage.utils.utils import get_fully_qualified_class_name
 
         return cls(
             mapIndex=task_instance.map_index,
-            operatorClass=f"{get_operator_class(task).__module__}.{get_operator_class(task).__name__}",
+            operatorClass=get_fully_qualified_class_name(task_instance.task),
         )
 
 
+@define(slots=False)
+class AirflowJobFacet(BaseFacet):
+    """
+    Composite Airflow job facet.
+
+    This facet encapsulates all the necessary information to re-create the full scope of an Airflow DAG's logic,
+    enabling reconstruction, visualization, and analysis of DAGs in a comprehensive manner.
+    It includes detailed representations of the tasks, task groups, and their hierarchical relationships,
+    making it possible to draw a graph that visually represents the entire DAG structure (like in the Airflow UI).
+    It also indicates whether a task should emit an OpenLineage (OL) event, enabling consumers to anticipate
+    the number of events and identify the tasks from which they can expect these events.
+
+    Attributes:
+        taskTree: A dictionary representing the hierarchical structure of tasks in the DAG.
+        taskGroups: A dictionary that contains information about task groups within the DAG.
+        tasks: A dictionary detailing individual tasks within the DAG.
+    """
+
+    taskTree: dict
+    taskGroups: dict
+    tasks: dict
+
+
+@define(slots=False)
+class AirflowStateRunFacet(BaseFacet):
+    """
+    Airflow facet providing state information.
+
+    This facet is designed to be sent at a completion event, offering state information about
+    the DAG run and each individual task. This information is crucial for understanding
+    the execution flow and for comprehensive post-run analysis and debugging, including why certain
+    tasks did not emit events, which can occur due to the use of control flow operators like the BranchOperator.
+
+    Attributes:
+        dagRunState: This indicates the final status of the entire DAG run (e.g., "success", "failed").
+        tasksState: A dictionary mapping task IDs to their respective states (e.g., "failed", "skipped").
+    """
+
+    dagRunState: str
+    tasksState: dict[str, str]
+
+
 @define(slots=False)
 class AirflowRunFacet(BaseFacet):
     """Composite Airflow run facet."""
diff --git a/airflow/providers/openlineage/plugins/listener.py b/airflow/providers/openlineage/plugins/listener.py
index 4a1085168a..76b60d61b7 100644
--- a/airflow/providers/openlineage/plugins/listener.py
+++ b/airflow/providers/openlineage/plugins/listener.py
@@ -30,6 +30,7 @@ from airflow.providers.openlineage import conf
 from airflow.providers.openlineage.extractors import ExtractorManager
 from airflow.providers.openlineage.plugins.adapter import OpenLineageAdapter, RunState
 from airflow.providers.openlineage.utils.utils import (
+    get_airflow_job_facet,
     get_airflow_run_facet,
     get_custom_facets,
     get_job_name,
@@ -367,6 +368,9 @@ class OpenLineageListener:
             msg=msg,
             nominal_start_time=data_interval_start,
             nominal_end_time=data_interval_end,
+            # AirflowJobFacet should be created outside the ProcessPoolExecutor that pickles objects,
+            # as pickling loses some TaskGroup attributes and crashes event emission.
+            job_facets={**get_airflow_job_facet(dag_run=dag_run)},
         )
 
     @hookimpl
diff --git a/airflow/providers/openlineage/utils/utils.py b/airflow/providers/openlineage/utils/utils.py
index ff6ad63970..6904b32d2d 100644
--- a/airflow/providers/openlineage/utils/utils.py
+++ b/airflow/providers/openlineage/utils/utils.py
@@ -20,8 +20,10 @@ from __future__ import annotations
 import datetime
 import json
 import logging
-from contextlib import suppress
+import re
+from contextlib import redirect_stdout, suppress
 from functools import wraps
+from io import StringIO
 from typing import TYPE_CHECKING, Any, Iterable
 
 import attrs
@@ -32,8 +32,11 @@ from airflow.exceptions import AirflowProviderDeprecationWarning  # TODO: move t
 from airflow.models import DAG, BaseOperator, MappedOperator
 from airflow.providers.openlineage import conf
 from airflow.providers.openlineage.plugins.facets import (
+    AirflowJobFacet,
     AirflowMappedTaskRunFacet,
     AirflowRunFacet,
+    AirflowStateRunFacet,
+    BaseFacet,
     UnknownOperatorAttributeRunFacet,
     UnknownOperatorInstance,
 )
@@ -41,6 +46,7 @@ from airflow.providers.openlineage.utils.selective_enable import (
     is_dag_lineage_enabled,
     is_task_lineage_enabled,
 )
+from airflow.serialization.serialized_objects import SerializedBaseOperator
 from airflow.utils.context import AirflowContextDeprecationWarning
 from airflow.utils.log.secrets_masker import Redactable, Redacted, SecretsMasker, should_hide_value_for_key
 from airflow.utils.module_loading import import_string
@@ -78,7 +84,11 @@ def get_custom_facets(task_instance: TaskInstance | None = None) -> dict[str, An
 
 
 def get_fully_qualified_class_name(operator: BaseOperator | MappedOperator) -> str:
-    return operator.__class__.__module__ + "." + operator.__class__.__name__
+    if isinstance(operator, (MappedOperator, SerializedBaseOperator)):
+        # as in airflow.api_connexion.schemas.common_schema.ClassReferenceSchema
+        return operator._task_module + "." + operator._task_type  # type: ignore
+    op_class = get_operator_class(operator)
+    return op_class.__module__ + "." + op_class.__name__
 
 
 def is_operator_disabled(operator: BaseOperator | MappedOperator) -> bool:
@@ -168,7 +178,7 @@ class InfoJsonEncodable(dict):
 class DagInfo(InfoJsonEncodable):
     """Defines encoding DAG object to JSON."""
 
-    includes = ["dag_id", "schedule_interval", "tags", "start_date"]
+    includes = ["dag_id", "description", "owner", "schedule_interval", 
"start_date", "tags"]
     casts = {"timetable": lambda dag: dag.timetable.serialize() if 
getattr(dag, "timetable", None) else None}
     renames = {"_dag_id": "dag_id"}
 
@@ -193,9 +203,9 @@ class TaskInstanceInfo(InfoJsonEncodable):
 
     includes = ["duration", "try_number", "pool"]
     casts = {
-        "map_index": lambda ti: ti.map_index
-        if hasattr(ti, "map_index") and getattr(ti, "map_index") != -1
-        else None
+        "map_index": lambda ti: (
+            ti.map_index if hasattr(ti, "map_index") and getattr(ti, 
"map_index") != -1 else None
+        )
     }
 
 
@@ -234,9 +244,11 @@ class TaskInfo(InfoJsonEncodable):
     ]
     casts = {
         "operator_class": lambda task: task.task_type,
-        "task_group": lambda task: TaskGroupInfo(task.task_group)
-        if hasattr(task, "task_group") and getattr(task.task_group, 
"_group_id", None)
-        else None,
+        "task_group": lambda task: (
+            TaskGroupInfo(task.task_group)
+            if hasattr(task, "task_group") and getattr(task.task_group, 
"_group_id", None)
+            else None
+        ),
     }
 
 
@@ -262,20 +274,158 @@ def get_airflow_run_facet(
     task_instance: TaskInstance,
     task: BaseOperator,
     task_uuid: str,
-):
+) -> dict[str, BaseFacet]:
     return {
-        "airflow": attrs.asdict(
-            AirflowRunFacet(
-                dag=DagInfo(dag),
-                dagRun=DagRunInfo(dag_run),
-                taskInstance=TaskInstanceInfo(task_instance),
-                task=TaskInfo(task),
-                taskUuid=task_uuid,
-            )
+        "airflow": AirflowRunFacet(
+            dag=DagInfo(dag),
+            dagRun=DagRunInfo(dag_run),
+            taskInstance=TaskInstanceInfo(task_instance),
+            task=TaskInfo(task),
+            taskUuid=task_uuid,
+        )
+    }
+
+
+def get_airflow_job_facet(dag_run: DagRun) -> dict[str, BaseFacet]:
+    if not dag_run.dag:
+        return {}
+    return {
+        "airflow": AirflowJobFacet(
+            taskTree=_get_parsed_dag_tree(dag_run.dag),
+            taskGroups=_get_task_groups_details(dag_run.dag),
+            tasks=_get_tasks_details(dag_run.dag),
+        )
+    }
+
+
+def get_airflow_state_run_facet(dag_run: DagRun) -> dict[str, BaseFacet]:
+    return {
+        "airflowState": AirflowStateRunFacet(
+            dagRunState=dag_run.get_state(),
+            tasksState={ti.task_id: ti.state for ti in dag_run.get_task_instances()},
         )
     }
 
 
+def _safe_get_dag_tree_view(dag: DAG) -> list[str]:
+    # get_tree_view() has been added in Airflow 2.8.2
+    if hasattr(dag, "get_tree_view"):
+        return dag.get_tree_view().splitlines()
+
+    with redirect_stdout(StringIO()) as stdout:
+        dag.tree_view()
+        return stdout.getvalue().splitlines()
+
+
+def _get_parsed_dag_tree(dag: DAG) -> dict:
+    """
+    Get DAG's tasks hierarchy representation.
+
+    While the task dependencies are defined as follows:
+    task >> [task_2, task_4] >> task_7
+    task_3 >> task_5
+    task_6  # has no dependencies, it's a root and a leaf
+
+    The result of this function will look like:
+    {
+        "task": {
+            "task_2": {
+                "task_7": {}
+            },
+            "task_4": {
+                "task_7": {}
+            }
+        },
+        "task_3": {
+            "task_5": {}
+        },
+        "task_6": {}
+    }
+    """
+    lines = _safe_get_dag_tree_view(dag)
+    task_dict: dict[str, dict] = {}
+    parent_map: dict[int, tuple[str, dict]] = {}
+
+    for line in lines:
+        stripped_line = line.strip()
+        if not stripped_line:
+            continue
+
+        # Determine the level by counting the leading spaces, assuming 4 spaces per level
+        # as defined in airflow.models.dag.DAG._generate_tree_view()
+        level = (len(line) - len(stripped_line)) // 4
+        # airflow.models.baseoperator.BaseOperator.__repr__ is used in DAG tree
+        # <Task({op_class}): {task_id}>
+        match = re.match(r"^<Task\((.+)\): (.*?)>$", stripped_line)
+        if not match:
+            return {}
+        current_task_id = match[2]
+
+        if level == 0:  # It's a root task
+            task_dict[current_task_id] = {}
+            parent_map[level] = (current_task_id, task_dict[current_task_id])
+        else:
+            # Find the immediate parent task
+            parent_task, parent_dict = parent_map[(level - 1)]
+            # Create new dict for the current task
+            parent_dict[current_task_id] = {}
+            # Update this task in the parent map
+            parent_map[level] = (current_task_id, parent_dict[current_task_id])
+
+    return task_dict
+
+
+def _get_tasks_details(dag: DAG) -> dict:
+    tasks = {
+        single_task.task_id: {
+            "operator": get_fully_qualified_class_name(single_task),
+            "task_group": single_task.task_group.group_id if 
single_task.task_group else None,
+            "emits_ol_events": _emits_ol_events(single_task),
+            "ui_color": single_task.ui_color,
+            "ui_fgcolor": single_task.ui_fgcolor,
+            "ui_label": single_task.label,
+            "is_setup": single_task.is_setup,
+            "is_teardown": single_task.is_teardown,
+        }
+        for single_task in dag.tasks
+    }
+
+    return tasks
+
+
+def _get_task_groups_details(dag: DAG) -> dict:
+    return {
+        tg_id: {
+            "parent_group": tg.parent_group.group_id,
+            "tooltip": tg.tooltip,
+            "ui_color": tg.ui_color,
+            "ui_fgcolor": tg.ui_fgcolor,
+            "ui_label": tg.label,
+        }
+        for tg_id, tg in dag.task_group_dict.items()
+    }
+
+
+def _emits_ol_events(task: BaseOperator | MappedOperator) -> bool:
+    config_selective_enabled = is_selective_lineage_enabled(task)
+    config_disabled_for_operators = is_operator_disabled(task)
+    # empty operators without callbacks/outlets are skipped for optimization by Airflow
+    # in airflow.models.taskinstance.TaskInstance._schedule_downstream_tasks
+    is_skipped_as_empty_operator = all(
+        (
+            task.inherits_from_empty_operator,
+            not task.on_execute_callback,
+            not task.on_success_callback,
+            not task.outlets,
+        )
+    )
+
+    emits_ol_events = all(
+        (config_selective_enabled, not config_disabled_for_operators, not is_skipped_as_empty_operator)
+    )
+    return emits_ol_events
+
+
 def get_unknown_source_attribute_run_facet(task: BaseOperator, name: str | None = None):
     if not name:
         name = get_operator_class(task).__name__
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index 7ac38eeaf2..8c5164d675 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -362,6 +362,7 @@ dagrun
 DagRunPydantic
 dagruns
 DagRunState
+dagRunState
 DAGs
 Dask
 dask
@@ -1606,6 +1607,7 @@ taskflow
 TaskGroup
 taskgroup
 TaskGroups
+taskGroups
 TaskInstance
 taskinstance
 TaskInstanceKey
@@ -1613,6 +1615,8 @@ taskinstancekey
 taskmeta
 taskmixin
 tasksetmeta
+tasksState
+taskTree
 tblproperties
 TCP
 tcp
diff --git a/tests/providers/openlineage/plugins/test_adapter.py b/tests/providers/openlineage/plugins/test_adapter.py
index 6f010b1a2d..0212f1402c 100644
--- a/tests/providers/openlineage/plugins/test_adapter.py
+++ b/tests/providers/openlineage/plugins/test_adapter.py
@@ -38,6 +38,11 @@ from openlineage.client.facet import (
 )
 from openlineage.client.run import Dataset, Job, Run, RunEvent, RunState
 
+from airflow import DAG
+from airflow.models.dagrun import DagRun, DagRunState
+from airflow.models.taskinstance import TaskInstance, TaskInstanceState
+from airflow.operators.bash import BashOperator
+from airflow.operators.empty import EmptyOperator
 from airflow.providers.openlineage.conf import (
     config_path,
     custom_extractors,
@@ -49,6 +54,11 @@ from airflow.providers.openlineage.conf import (
 )
 from airflow.providers.openlineage.extractors import OperatorLineage
 from airflow.providers.openlineage.plugins.adapter import _PRODUCER, OpenLineageAdapter
+from airflow.providers.openlineage.plugins.facets import (
+    AirflowStateRunFacet,
+)
+from airflow.providers.openlineage.utils.utils import get_airflow_job_facet
+from airflow.utils.task_group import TaskGroup
 from tests.test_utils.config import conf_vars
 
 pytestmark = pytest.mark.db_test
@@ -534,19 +544,33 @@ def test_emit_dag_started_event(mock_stats_incr, mock_stats_timer, generate_stat
     dag_id = "dag_id"
     run_id = str(uuid.uuid4())
 
-    dagrun_mock = MagicMock()
-    dagrun_mock.start_date = event_time
-    dagrun_mock.run_id = run_id
-    dagrun_mock.dag_id = dag_id
+    with DAG(dag_id=dag_id, description="dag desc", 
start_date=datetime.datetime(2024, 6, 1)) as dag:
+        tg = TaskGroup(group_id="tg1")
+        tg2 = TaskGroup(group_id="tg2", parent_group=tg)
+        task_0 = BashOperator(task_id="task_0", bash_command="exit 0;")  # 
noqa: F841
+        task_1 = BashOperator(task_id="task_1", bash_command="exit 0;", 
task_group=tg)  # noqa: F841
+        task_2 = EmptyOperator(task_id="task_2.test.dot", task_group=tg2)  # 
noqa: F841
+
+    dag_run = DagRun(
+        dag_id=dag_id,
+        run_id=run_id,
+        start_date=event_time,
+        execution_date=event_time,
+    )
+    dag_run.dag = dag
     generate_static_uuid.return_value = random_uuid
 
+    job_facets = {**get_airflow_job_facet(dag_run)}
+
     adapter.dag_started(
-        dag_run=dagrun_mock,
+        dag_run=dag_run,
         msg="",
         nominal_start_time=event_time.isoformat(),
         nominal_end_time=event_time.isoformat(),
+        job_facets=job_facets,
     )
 
+    assert len(client.emit.mock_calls) == 1
     assert (
         call(
             RunEvent(
@@ -565,9 +589,16 @@ def test_emit_dag_started_event(mock_stats_incr, mock_stats_timer, generate_stat
                     namespace=namespace(),
                     name="dag_id",
                     facets={
+                        "documentation": 
DocumentationJobFacet(description="dag desc"),
+                        "ownership": OwnershipJobFacet(
+                            owners=[
+                                OwnershipJobFacetOwners(name="airflow", 
type=None),
+                            ]
+                        ),
+                        **job_facets,
                         "jobType": JobTypeJobFacet(
                             processingType="BATCH", integration="AIRFLOW", 
jobType="DAG"
-                        )
+                        ),
                     },
                 ),
                 producer=_PRODUCER,
@@ -582,10 +613,11 @@ def test_emit_dag_started_event(mock_stats_incr, mock_stats_timer, generate_stat
     mock_stats_timer.assert_called_with("ol.emit.attempts")
 
 
[email protected](DagRun, "get_task_instances")
 @mock.patch("airflow.providers.openlineage.plugins.adapter.generate_static_uuid")
 @mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.timer")
 @mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.incr")
-def test_emit_dag_complete_event(mock_stats_incr, mock_stats_timer, generate_static_uuid):
+def test_emit_dag_complete_event(mock_stats_incr, mock_stats_timer, generate_static_uuid, mocked_get_tasks):
     random_uuid = "9d3b14f7-de91-40b6-aeef-e887e2c7673e"
     client = MagicMock()
     adapter = OpenLineageAdapter(client)
@@ -593,15 +625,30 @@ def test_emit_dag_complete_event(mock_stats_incr, mock_stats_timer, generate_sta
     dag_id = "dag_id"
     run_id = str(uuid.uuid4())
 
-    dagrun_mock = MagicMock()
-    dagrun_mock.start_date = event_time
-    dagrun_mock.end_date = event_time
-    dagrun_mock.run_id = run_id
-    dagrun_mock.dag_id = dag_id
+    with DAG(dag_id=dag_id, start_date=datetime.datetime(2024, 6, 1)):
+        task_0 = BashOperator(task_id="task_0", bash_command="exit 0;")
+        task_1 = BashOperator(task_id="task_1", bash_command="exit 0;")
+        task_2 = EmptyOperator(
+            task_id="task_2.test",
+        )
+
+    dag_run = DagRun(
+        dag_id=dag_id,
+        run_id=run_id,
+        start_date=event_time,
+        execution_date=event_time,
+    )
+    dag_run._state = DagRunState.SUCCESS
+    dag_run.end_date = event_time
+    mocked_get_tasks.return_value = [
+        TaskInstance(task=task_0, run_id=run_id, state=TaskInstanceState.SUCCESS),
+        TaskInstance(task=task_1, run_id=run_id, state=TaskInstanceState.SKIPPED),
+        TaskInstance(task=task_2, run_id=run_id, state=TaskInstanceState.FAILED),
+    ]
     generate_static_uuid.return_value = random_uuid
 
     adapter.dag_success(
-        dag_run=dagrun_mock,
+        dag_run=dag_run,
         msg="",
     )
 
@@ -610,10 +657,22 @@ def test_emit_dag_complete_event(mock_stats_incr, mock_stats_timer, generate_sta
             RunEvent(
                 eventType=RunState.COMPLETE,
                 eventTime=event_time.isoformat(),
-                run=Run(runId=random_uuid, facets={}),
+                run=Run(
+                    runId=random_uuid,
+                    facets={
+                        "airflowState": AirflowStateRunFacet(
+                            dagRunState=DagRunState.SUCCESS,
+                            tasksState={
+                                task_0.task_id: TaskInstanceState.SUCCESS,
+                                task_1.task_id: TaskInstanceState.SKIPPED,
+                                task_2.task_id: TaskInstanceState.FAILED,
+                            },
+                        )
+                    },
+                ),
                 job=Job(
                     namespace=namespace(),
-                    name="dag_id",
+                    name=dag_id,
                     facets={
                         "jobType": JobTypeJobFacet(
                             processingType="BATCH", integration="AIRFLOW", 
jobType="DAG"
@@ -632,10 +691,11 @@ def test_emit_dag_complete_event(mock_stats_incr, mock_stats_timer, generate_sta
     mock_stats_timer.assert_called_with("ol.emit.attempts")
 
 
[email protected](DagRun, "get_task_instances")
 @mock.patch("airflow.providers.openlineage.plugins.adapter.generate_static_uuid")
 @mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.timer")
 @mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.incr")
-def test_emit_dag_failed_event(mock_stats_incr, mock_stats_timer, generate_static_uuid):
+def test_emit_dag_failed_event(mock_stats_incr, mock_stats_timer, generate_static_uuid, mocked_get_tasks):
     random_uuid = "9d3b14f7-de91-40b6-aeef-e887e2c7673e"
     client = MagicMock()
     adapter = OpenLineageAdapter(client)
@@ -643,15 +703,28 @@ def test_emit_dag_failed_event(mock_stats_incr, mock_stats_timer, generate_stati
     dag_id = "dag_id"
     run_id = str(uuid.uuid4())
 
-    dagrun_mock = MagicMock()
-    dagrun_mock.start_date = event_time
-    dagrun_mock.end_date = event_time
-    dagrun_mock.run_id = run_id
-    dagrun_mock.dag_id = dag_id
+    with DAG(dag_id=dag_id, start_date=datetime.datetime(2024, 6, 1)):
+        task_0 = BashOperator(task_id="task_0", bash_command="exit 0;")
+        task_1 = BashOperator(task_id="task_1", bash_command="exit 0;")
+        task_2 = EmptyOperator(task_id="task_2.test")
+
+    dag_run = DagRun(
+        dag_id=dag_id,
+        run_id=run_id,
+        start_date=event_time,
+        execution_date=event_time,
+    )
+    dag_run._state = DagRunState.SUCCESS
+    dag_run.end_date = event_time
+    mocked_get_tasks.return_value = [
+        TaskInstance(task=task_0, run_id=run_id, state=TaskInstanceState.SUCCESS),
+        TaskInstance(task=task_1, run_id=run_id, state=TaskInstanceState.SKIPPED),
+        TaskInstance(task=task_2, run_id=run_id, state=TaskInstanceState.FAILED),
+    ]
     generate_static_uuid.return_value = random_uuid
 
     adapter.dag_failed(
-        dag_run=dagrun_mock,
+        dag_run=dag_run,
         msg="error msg",
     )
 
@@ -665,12 +738,20 @@ def test_emit_dag_failed_event(mock_stats_incr, mock_stats_timer, generate_stati
                     facets={
                         "errorMessage": ErrorMessageRunFacet(
                             message="error msg", programmingLanguage="python"
-                        )
+                        ),
+                        "airflowState": AirflowStateRunFacet(
+                            dagRunState=DagRunState.SUCCESS,
+                            tasksState={
+                                task_0.task_id: TaskInstanceState.SUCCESS,
+                                task_1.task_id: TaskInstanceState.SKIPPED,
+                                task_2.task_id: TaskInstanceState.FAILED,
+                            },
+                        ),
                     },
                 ),
                 job=Job(
                     namespace=namespace(),
-                    name="dag_id",
+                    name=dag_id,
                     facets={
                         "jobType": JobTypeJobFacet(
                             processingType="BATCH", integration="AIRFLOW", 
jobType="DAG"
diff --git a/tests/providers/openlineage/utils/test_utils.py b/tests/providers/openlineage/utils/test_utils.py
index ce1cd3be7e..d58be508d4 100644
--- a/tests/providers/openlineage/utils/test_utils.py
+++ b/tests/providers/openlineage/utils/test_utils.py
@@ -17,34 +17,443 @@
 # under the License.
 from __future__ import annotations
 
-import pytest
+import datetime
+from unittest.mock import MagicMock
 
-from airflow.decorators import task_group
-from airflow.models.taskinstance import TaskInstance as TI
+from airflow import DAG
+from airflow.models.mappedoperator import MappedOperator
+from airflow.operators.bash import BashOperator
 from airflow.operators.empty import EmptyOperator
-from airflow.providers.openlineage.plugins.facets import AirflowMappedTaskRunFacet
-from airflow.providers.openlineage.utils.utils import get_custom_facets
-from airflow.utils import timezone
+from airflow.operators.python import PythonOperator
+from airflow.providers.openlineage.plugins.facets import AirflowJobFacet
+from airflow.providers.openlineage.utils.utils import (
+    _get_parsed_dag_tree,
+    _get_task_groups_details,
+    _get_tasks_details,
+    _safe_get_dag_tree_view,
+    get_airflow_job_facet,
+    get_fully_qualified_class_name,
+    get_job_name,
+    get_operator_class,
+)
+from airflow.serialization.serialized_objects import SerializedBaseOperator
+from airflow.utils.task_group import TaskGroup
+from tests.test_utils.mock_operators import MockOperator
 
-DEFAULT_DATE = timezone.datetime(2016, 1, 1)
 
+class CustomOperatorForTest(BashOperator):
+    pass
 
[email protected]_test
-def test_get_custom_facets(dag_maker):
-    with dag_maker(dag_id="dag_test_get_custom_facets") as dag:
 
-        @task_group
-        def task_group_op(k):
-            EmptyOperator(task_id="empty_operator")
+class CustomOperatorFromEmpty(EmptyOperator):
+    pass
 
-        task_group_op.expand(k=[0])
 
-        dag_maker.create_dagrun()
-        ti_0 = TI(dag.get_task("task_group_op.empty_operator"), 
execution_date=DEFAULT_DATE, map_index=0)
+def test_get_airflow_job_facet():
+    with DAG(dag_id="dag", start_date=datetime.datetime(2024, 6, 1)) as dag:
+        task_0 = BashOperator(task_id="task_0", bash_command="exit 0;")
 
-        assert ti_0.map_index == 0
+        with TaskGroup("section_1", prefix_group_id=True):
+            task_10 = PythonOperator(task_id="task_3", python_callable=lambda: 
1)
 
-        assert get_custom_facets(ti_0)["airflow_mappedTask"] == 
AirflowMappedTaskRunFacet(
-            mapIndex=0,
-            
operatorClass=f"{ti_0.task.operator_class.__module__}.{ti_0.task.operator_class.__name__}",
+        task_0 >> task_10
+
+    dagrun_mock = MagicMock()
+    dagrun_mock.dag = dag
+
+    result = get_airflow_job_facet(dagrun_mock)
+    assert result == {
+        "airflow": AirflowJobFacet(
+            taskTree={"task_0": {"section_1.task_3": {}}},
+            taskGroups={
+                "section_1": {
+                    "parent_group": None,
+                    "tooltip": "",
+                    "ui_color": "CornflowerBlue",
+                    "ui_fgcolor": "#000",
+                    "ui_label": "section_1",
+                }
+            },
+            tasks={
+                "task_0": {
+                    "operator": "airflow.operators.bash.BashOperator",
+                    "task_group": None,
+                    "emits_ol_events": True,
+                    "ui_color": "#f0ede4",
+                    "ui_fgcolor": "#000",
+                    "ui_label": "task_0",
+                    "is_setup": False,
+                    "is_teardown": False,
+                },
+                "section_1.task_3": {
+                    "operator": "airflow.operators.python.PythonOperator",
+                    "task_group": "section_1",
+                    "emits_ol_events": True,
+                    "ui_color": "#ffefeb",
+                    "ui_fgcolor": "#000",
+                    "ui_label": "task_3",
+                    "is_setup": False,
+                    "is_teardown": False,
+                },
+            },
         )
+    }
+
+
+def test_get_fully_qualified_class_name_serialized_operator():
+    op_module_path = "airflow.operators.bash"
+    op_name = "BashOperator"
+
+    op = BashOperator(task_id="test", bash_command="echo 1")
+    op_path_before_serialization = get_fully_qualified_class_name(op)
+    assert op_path_before_serialization == f"{op_module_path}.{op_name}"
+
+    serialized = SerializedBaseOperator.serialize_operator(op)
+    deserialized = SerializedBaseOperator.deserialize_operator(serialized)
+
+    op_path_after_deserialization = get_fully_qualified_class_name(deserialized)
+    assert op_path_after_deserialization == f"{op_module_path}.{op_name}"
+    assert deserialized._task_module == op_module_path
+    assert deserialized._task_type == op_name
+
+
+def test_get_fully_qualified_class_name_mapped_operator():
+    mapped = MockOperator.partial(task_id="task_2").expand(arg2=["a", "b", 
"c"])
+    assert isinstance(mapped, MappedOperator)
+    mapped_op_path = get_fully_qualified_class_name(mapped)
+    assert mapped_op_path == "tests.test_utils.mock_operators.MockOperator"
+
+
+def test_get_fully_qualified_class_name_bash_operator():
+    result = get_fully_qualified_class_name(BashOperator(task_id="test", 
bash_command="echo 0;"))
+    expected_result = "airflow.operators.bash.BashOperator"
+    assert result == expected_result
+
+
+def test_get_job_name():
+    task_instance = MagicMock(dag_id="example_dag", task_id="example_task")
+    expected_result = "example_dag.example_task"
+    assert get_job_name(task_instance) == expected_result
+
+
+def test_get_job_name_empty_ids():
+    task_instance = MagicMock(dag_id="", task_id="")
+    expected_result = "."
+    assert get_job_name(task_instance) == expected_result
+
+
+def test_get_operator_class():
+    op_class = get_operator_class(BashOperator(task_id="test", 
bash_command="echo 0;"))
+    assert op_class == BashOperator
+
+
+def test_get_operator_class_mapped_operator():
+    mapped = MockOperator.partial(task_id="task").expand(arg2=["a", "b", "c"])
+    assert isinstance(mapped, MappedOperator)
+    op_class = get_operator_class(mapped)
+    assert op_class == MockOperator
+
+
+def test_get_tasks_details():
+    with DAG(dag_id="dag", start_date=datetime.datetime(2024, 6, 1)) as dag:
+        task = CustomOperatorForTest(task_id="task", bash_command="exit 0;")  
# noqa: F841
+        task_0 = BashOperator(task_id="task_0", bash_command="exit 0;")  # 
noqa: F841
+        task_1 = CustomOperatorFromEmpty(task_id="task_1")  # noqa: F841
+        task_2 = PythonOperator(task_id="task_2", python_callable=lambda: 1)  
# noqa: F841
+        task_3 = BashOperator(task_id="task_3", bash_command="exit 0;")  # 
noqa: F841
+        task_4 = EmptyOperator(task_id="task_4.test.dot")  # noqa: F841
+        task_5 = BashOperator(task_id="task_5", bash_command="exit 0;")  # 
noqa: F841
+
+        with TaskGroup("section_1", prefix_group_id=True) as tg:
+            task_10 = PythonOperator(task_id="task_3", python_callable=lambda: 
1)  # noqa: F841
+            with TaskGroup("section_2", parent_group=tg) as tg2:
+                task_11 = EmptyOperator(task_id="task_11")  # noqa: F841
+                with TaskGroup("section_3", parent_group=tg2):
+                    task_12 = PythonOperator(task_id="task_12", 
python_callable=lambda: 1)  # noqa: F841
+
+    expected = {
+        "task": {
+            "operator": 
"tests.providers.openlineage.utils.test_utils.CustomOperatorForTest",
+            "task_group": None,
+            "emits_ol_events": True,
+            "ui_color": CustomOperatorForTest.ui_color,
+            "ui_fgcolor": CustomOperatorForTest.ui_fgcolor,
+            "ui_label": "task",
+            "is_setup": False,
+            "is_teardown": False,
+        },
+        "task_0": {
+            "operator": "airflow.operators.bash.BashOperator",
+            "task_group": None,
+            "emits_ol_events": True,
+            "ui_color": BashOperator.ui_color,
+            "ui_fgcolor": BashOperator.ui_fgcolor,
+            "ui_label": "task_0",
+            "is_setup": False,
+            "is_teardown": False,
+        },
+        "task_1": {
+            "operator": 
"tests.providers.openlineage.utils.test_utils.CustomOperatorFromEmpty",
+            "task_group": None,
+            "emits_ol_events": False,
+            "ui_color": CustomOperatorFromEmpty.ui_color,
+            "ui_fgcolor": CustomOperatorFromEmpty.ui_fgcolor,
+            "ui_label": "task_1",
+            "is_setup": False,
+            "is_teardown": False,
+        },
+        "task_2": {
+            "operator": "airflow.operators.python.PythonOperator",
+            "task_group": None,
+            "emits_ol_events": True,
+            "ui_color": PythonOperator.ui_color,
+            "ui_fgcolor": PythonOperator.ui_fgcolor,
+            "ui_label": "task_2",
+            "is_setup": False,
+            "is_teardown": False,
+        },
+        "task_3": {
+            "operator": "airflow.operators.bash.BashOperator",
+            "task_group": None,
+            "emits_ol_events": True,
+            "ui_color": BashOperator.ui_color,
+            "ui_fgcolor": BashOperator.ui_fgcolor,
+            "ui_label": "task_3",
+            "is_setup": False,
+            "is_teardown": False,
+        },
+        "task_4.test.dot": {
+            "operator": "airflow.operators.empty.EmptyOperator",
+            "task_group": None,
+            "emits_ol_events": False,
+            "ui_color": EmptyOperator.ui_color,
+            "ui_fgcolor": EmptyOperator.ui_fgcolor,
+            "ui_label": "task_4.test.dot",
+            "is_setup": False,
+            "is_teardown": False,
+        },
+        "task_5": {
+            "operator": "airflow.operators.bash.BashOperator",
+            "task_group": None,
+            "emits_ol_events": True,
+            "ui_color": BashOperator.ui_color,
+            "ui_fgcolor": BashOperator.ui_fgcolor,
+            "ui_label": "task_5",
+            "is_setup": False,
+            "is_teardown": False,
+        },
+        "section_1.task_3": {
+            "operator": "airflow.operators.python.PythonOperator",
+            "task_group": "section_1",
+            "emits_ol_events": True,
+            "ui_color": PythonOperator.ui_color,
+            "ui_fgcolor": PythonOperator.ui_fgcolor,
+            "ui_label": "task_3",
+            "is_setup": False,
+            "is_teardown": False,
+        },
+        "section_1.section_2.task_11": {
+            "operator": "airflow.operators.empty.EmptyOperator",
+            "task_group": "section_1.section_2",
+            "emits_ol_events": False,
+            "ui_color": EmptyOperator.ui_color,
+            "ui_fgcolor": EmptyOperator.ui_fgcolor,
+            "ui_label": "task_11",
+            "is_setup": False,
+            "is_teardown": False,
+        },
+        "section_1.section_2.section_3.task_12": {
+            "operator": "airflow.operators.python.PythonOperator",
+            "task_group": "section_1.section_2.section_3",
+            "emits_ol_events": True,
+            "ui_color": PythonOperator.ui_color,
+            "ui_fgcolor": PythonOperator.ui_fgcolor,
+            "ui_label": "task_12",
+            "is_setup": False,
+            "is_teardown": False,
+        },
+    }
+
+    result = _get_tasks_details(dag)
+    assert result == expected
+
+
+def test_get_tasks_details_empty_dag():
+    assert _get_tasks_details(DAG("test_dag", 
start_date=datetime.datetime(2024, 6, 1))) == {}
+
+
+def test_dag_tree_level_indent():
+    """Tests the correct indentation of tasks in a DAG tree view.
+
+    The test verifies that the tree view of the DAG correctly represents the hierarchical structure
+    of the tasks with proper indentation. The expected indentation increases by 4 spaces for each
+    subsequent level in the DAG. The test asserts that the generated tree view matches the expected
+    lines with correct indentation.
+    """
+    with DAG(dag_id="dag", start_date=datetime.datetime(2024, 6, 1)) as dag:
+        task_0 = EmptyOperator(task_id="task_0")
+        task_1 = EmptyOperator(task_id="task_1")
+        task_2 = EmptyOperator(task_id="task_2")
+        task_3 = EmptyOperator(task_id="task_3")
+
+    task_0 >> task_1 >> task_2
+    task_3 >> task_2
+
+    indent = 4 * " "
+    expected_lines = [
+        "<Task(EmptyOperator): task_0>",
+        indent + "<Task(EmptyOperator): task_1>",
+        2 * indent + "<Task(EmptyOperator): task_2>",
+        "<Task(EmptyOperator): task_3>",
+        indent + "<Task(EmptyOperator): task_2>",
+    ]
+    assert _safe_get_dag_tree_view(dag) == expected_lines
+
+
+def test_get_dag_tree():
+    with DAG(dag_id="dag", start_date=datetime.datetime(2024, 6, 1)) as dag:
+        task = CustomOperatorForTest(task_id="task", bash_command="exit 0;")
+        task_0 = BashOperator(task_id="task_0", bash_command="exit 0;")
+        task_1 = BashOperator(task_id="task_1", bash_command="exit 1;")
+        task_2 = PythonOperator(task_id="task_2", python_callable=lambda: 1)
+        task_3 = BashOperator(task_id="task_3", bash_command="exit 0;")
+        task_4 = EmptyOperator(
+            task_id="task_4",
+        )
+        task_5 = BashOperator(task_id="task_5", bash_command="exit 0;")
+        task_6 = EmptyOperator(task_id="task_6.test5")
+        task_7 = BashOperator(task_id="task_7", bash_command="exit 0;")
+        task_8 = PythonOperator(task_id="task_8", python_callable=lambda: 1)  
# noqa: F841
+        task_9 = BashOperator(task_id="task_9", bash_command="exit 0;")
+
+        with TaskGroup("section_1", prefix_group_id=True) as tg:
+            task_10 = PythonOperator(task_id="task_3", python_callable=lambda: 
1)
+            with TaskGroup("section_2", parent_group=tg) as tg2:
+                task_11 = EmptyOperator(task_id="task_11")  # noqa: F841
+                with TaskGroup("section_3", parent_group=tg2):
+                    task_12 = PythonOperator(task_id="task_12", 
python_callable=lambda: 1)
+
+        task >> [task_2, task_7]
+        task_0 >> [task_2, task_1] >> task_3 >> [task_4, task_5] >> task_6
+        task_1 >> task_9 >> task_3 >> task_4 >> task_5 >> task_6
+        task_3 >> task_10 >> task_12
+
+        expected = {
+            "section_1.section_2.task_11": {},
+            "task": {
+                "task_2": {
+                    "task_3": {
+                        "section_1.task_3": 
{"section_1.section_2.section_3.task_12": {}},
+                        "task_4": {"task_5": {"task_6.test5": {}}, 
"task_6.test5": {}},
+                        "task_5": {"task_6.test5": {}},
+                    }
+                },
+                "task_7": {},
+            },
+            "task_0": {
+                "task_1": {
+                    "task_3": {
+                        "section_1.task_3": 
{"section_1.section_2.section_3.task_12": {}},
+                        "task_4": {"task_5": {"task_6.test5": {}}, 
"task_6.test5": {}},
+                        "task_5": {"task_6.test5": {}},
+                    },
+                    "task_9": {
+                        "task_3": {
+                            "section_1.task_3": 
{"section_1.section_2.section_3.task_12": {}},
+                            "task_4": {"task_5": {"task_6.test5": {}}, 
"task_6.test5": {}},
+                            "task_5": {"task_6.test5": {}},
+                        }
+                    },
+                },
+                "task_2": {
+                    "task_3": {
+                        "section_1.task_3": 
{"section_1.section_2.section_3.task_12": {}},
+                        "task_4": {"task_5": {"task_6.test5": {}}, 
"task_6.test5": {}},
+                        "task_5": {"task_6.test5": {}},
+                    }
+                },
+            },
+            "task_8": {},
+        }
+        result = _get_parsed_dag_tree(dag)
+        assert result == expected
+
+
+def test_get_dag_tree_empty_dag():
+    assert _get_parsed_dag_tree(DAG("test_dag", 
start_date=datetime.datetime(2024, 6, 1))) == {}
+
+
+def test_get_task_groups_details():
+    with DAG("test_dag", start_date=datetime.datetime(2024, 6, 1)) as dag:
+        with TaskGroup("tg1", prefix_group_id=True):
+            task_1 = EmptyOperator(task_id="task_1")  # noqa: F841
+        with TaskGroup("tg2", prefix_group_id=False):
+            task = EmptyOperator(task_id="task_1")  # noqa: F841
+        with TaskGroup("tg3"):
+            task_2 = EmptyOperator(task_id="task_2")  # noqa: F841
+
+    result = _get_task_groups_details(dag)
+    expected = {
+        "tg1": {
+            "parent_group": None,
+            "tooltip": "",
+            "ui_color": "CornflowerBlue",
+            "ui_fgcolor": "#000",
+            "ui_label": "tg1",
+        },
+        "tg2": {
+            "parent_group": None,
+            "tooltip": "",
+            "ui_color": "CornflowerBlue",
+            "ui_fgcolor": "#000",
+            "ui_label": "tg2",
+        },
+        "tg3": {
+            "parent_group": None,
+            "tooltip": "",
+            "ui_color": "CornflowerBlue",
+            "ui_fgcolor": "#000",
+            "ui_label": "tg3",
+        },
+    }
+
+    assert result == expected
+
+
+def test_get_task_groups_details_nested():
+    with DAG("test_dag", start_date=datetime.datetime(2024, 6, 1)) as dag:
+        with TaskGroup("tg1", prefix_group_id=True) as tg:
+            with TaskGroup("tg2", parent_group=tg) as tg2:
+                with TaskGroup("tg3", parent_group=tg2):
+                    pass
+
+    result = _get_task_groups_details(dag)
+    expected = {
+        "tg1": {
+            "parent_group": None,
+            "tooltip": "",
+            "ui_color": "CornflowerBlue",
+            "ui_fgcolor": "#000",
+            "ui_label": "tg1",
+        },
+        "tg1.tg2": {
+            "parent_group": "tg1",
+            "tooltip": "",
+            "ui_color": "CornflowerBlue",
+            "ui_fgcolor": "#000",
+            "ui_label": "tg2",
+        },
+        "tg1.tg2.tg3": {
+            "parent_group": "tg1.tg2",
+            "tooltip": "",
+            "ui_color": "CornflowerBlue",
+            "ui_fgcolor": "#000",
+            "ui_label": "tg3",
+        },
+    }
+
+    assert result == expected
+
+
+def test_get_task_groups_details_no_task_groups():
+    assert _get_task_groups_details(DAG("test_dag", 
start_date=datetime.datetime(2024, 6, 1))) == {}
