This is an automated email from the ASF dual-hosted git repository.
mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new c202c07f67 Introduce AirflowJobFacet and AirflowStateRunFacet (#39520)
c202c07f67 is described below
commit c202c07f67173718c736d95de22185b65b25b580
Author: Kacper Muda <[email protected]>
AuthorDate: Wed Jun 5 13:52:13 2024 +0200
Introduce AirflowJobFacet and AirflowStateRunFacet (#39520)
Signed-off-by: Kacper Muda <[email protected]>
---
.../openlineage/facets/AirflowJobFacet.json | 40 ++
.../openlineage/facets/AirflowRunFacet.json | 254 ++++++++++++
.../openlineage/facets/AirflowStateRunFacet.json | 34 ++
airflow/providers/openlineage/facets/__init__.py | 16 +
airflow/providers/openlineage/plugins/adapter.py | 20 +-
airflow/providers/openlineage/plugins/facets.py | 47 ++-
airflow/providers/openlineage/plugins/listener.py | 4 +
airflow/providers/openlineage/utils/utils.py | 186 ++++++++-
docs/spelling_wordlist.txt | 4 +
.../providers/openlineage/plugins/test_adapter.py | 129 ++++--
tests/providers/openlineage/utils/test_utils.py | 449 ++++++++++++++++++++-
11 files changed, 1115 insertions(+), 68 deletions(-)
diff --git a/airflow/providers/openlineage/facets/AirflowJobFacet.json
b/airflow/providers/openlineage/facets/AirflowJobFacet.json
new file mode 100644
index 0000000000..51a9954de3
--- /dev/null
+++ b/airflow/providers/openlineage/facets/AirflowJobFacet.json
@@ -0,0 +1,40 @@
+{
+ "$schema": "https://json-schema.org/draft/2020-12/schema",
+ "$defs": {
+ "AirflowJobFacet": {
+ "allOf": [
+ {
+ "$ref":
"https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet"
+ },
+ {
+ "type": "object",
+ "properties": {
+ "taskTree": {
+ "description": "The hierarchical structure of tasks in the
DAG.",
+ "type": "object",
+ "additionalProperties": true
+ },
+ "taskGroups": {
+ "description": "Information about all task groups within the
DAG.",
+ "type": "object",
+ "additionalProperties": true
+ },
+ "tasks": {
+ "description": "Details of all individual tasks within the
DAG.",
+ "type": "object",
+ "additionalProperties": true
+ }
+ },
+ "required": ["taskTree", "taskGroups", "tasks"]
+ }
+ ],
+ "type": "object"
+ }
+ },
+ "type": "object",
+ "properties": {
+ "airflow": {
+ "$ref": "#/$defs/AirflowJobFacet"
+ }
+ }
+ }
diff --git a/airflow/providers/openlineage/facets/AirflowRunFacet.json
b/airflow/providers/openlineage/facets/AirflowRunFacet.json
new file mode 100644
index 0000000000..504fb1bc3a
--- /dev/null
+++ b/airflow/providers/openlineage/facets/AirflowRunFacet.json
@@ -0,0 +1,254 @@
+{
+ "$schema": "https://json-schema.org/draft/2020-12/schema",
+ "$defs": {
+ "AirflowRunFacet": {
+ "allOf": [
+ {
+ "$ref":
"https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet"
+ },
+ {
+ "type": "object",
+ "properties": {
+ "dag": {
+ "$ref": "#/$defs/DAG"
+ },
+ "dagRun": {
+ "$ref": "#/$defs/DagRun"
+ },
+ "taskInstance": {
+ "$ref": "#/$defs/TaskInstance"
+ },
+ "task": {
+ "$ref": "#/$defs/Task"
+ },
+ "taskUuid": {
+ "type": "string"
+ }
+ },
+ "required": [
+ "dag",
+ "dagRun",
+ "taskInstance",
+ "task",
+ "taskUuid"
+ ]
+ }
+ ]
+ },
+ "Task": {
+ "type": "object",
+ "properties": {
+ "depends_on_past": {
+ "type": "boolean"
+ },
+ "downstream_task_ids": {
+ "type": "string"
+ },
+ "execution_timeout": {
+ "type": "string"
+ },
+ "executor_config": {
+ "type": "object",
+ "additionalProperties": true
+ },
+ "ignore_first_depends_on_past": {
+ "type": "boolean"
+ },
+ "is_setup": {
+ "type": "boolean"
+ },
+ "is_teardown": {
+ "type": "boolean"
+ },
+ "mapped": {
+ "type": "boolean"
+ },
+ "max_active_tis_per_dag": {
+ "type": "integer"
+ },
+ "max_active_tis_per_dagrun": {
+ "type": "integer"
+ },
+ "max_retry_delay": {
+ "type": "string"
+ },
+ "multiple_outputs": {
+ "type": "boolean"
+ },
+ "operator_class": {
+ "description": "Module + class name of the operator",
+ "type": "string"
+ },
+ "owner": {
+ "type": "string"
+ },
+ "priority_weight": {
+ "type": "integer"
+ },
+ "queue": {
+ "type": "string"
+ },
+ "retries": {
+ "type": "integer"
+ },
+ "retry_exponential_backoff": {
+ "type": "boolean"
+ },
+ "run_as_user": {
+ "type": "string"
+ },
+ "task_id": {
+ "type": "string"
+ },
+ "trigger_rule": {
+ "type": "string"
+ },
+ "upstream_task_ids": {
+ "type": "string"
+ },
+ "wait_for_downstream": {
+ "type": "boolean"
+ },
+ "wait_for_past_depends_before_skipping": {
+ "type": "boolean"
+ },
+ "weight_rule": {
+ "type": "string"
+ },
+ "task_group": {
+ "description": "Task group related information",
+ "type": "object",
+ "properties": {
+ "group_id": {
+ "type": "string"
+ },
+ "downstream_group_ids": {
+ "type": "string"
+ },
+ "downstream_task_ids": {
+ "type": "string"
+ },
+ "prefix_group_id": {
+ "type": "boolean"
+ },
+ "tooltip": {
+ "type": "string"
+ },
+ "upstream_group_ids": {
+ "type": "string"
+ },
+ "upstream_task_ids": {
+ "type": "string"
+ }
+ },
+ "additionalProperties": true,
+ "required": ["group_id"]
+ }
+ },
+ "additionalProperties": true,
+ "required": [
+ "task_id"
+ ]
+ },
+ "DAG": {
+ "type": "object",
+ "properties": {
+ "dag_id": {
+ "type": "string"
+ },
+ "description": {
+ "type": "string"
+ },
+ "owner": {
+ "type": "string"
+ },
+ "schedule_interval": {
+ "type": "string"
+ },
+ "start_date": {
+ "type": "string",
+ "format": "date-time"
+ },
+ "tags": {
+ "type": "string"
+ },
+ "timetable": {
+ "description": "Describes timetable (successor of
schedule_interval)",
+ "type": "object",
+ "additionalProperties": true
+ }
+ },
+ "additionalProperties": true,
+ "required": [
+ "dag_id",
+ "start_date"
+ ]
+ },
+ "TaskInstance": {
+ "type": "object",
+ "properties": {
+ "duration": {
+ "type": "number"
+ },
+ "map_index": {
+ "type": "integer"
+ },
+ "pool": {
+ "type": "string"
+ },
+ "try_number": {
+ "type": "integer"
+ }
+ },
+ "additionalProperties": true,
+ "required": [
+ "pool",
+ "try_number"
+ ]
+ },
+ "DagRun": {
+ "type": "object",
+ "properties": {
+ "conf": {
+ "type": "object",
+ "additionalProperties": true
+ },
+ "dag_id": {
+ "type": "string"
+ },
+ "data_interval_start": {
+ "type": "string",
+ "format": "date-time"
+ },
+ "data_interval_end": {
+ "type": "string",
+ "format": "date-time"
+ },
+ "external_trigger": {
+ "type": "boolean"
+ },
+ "run_id": {
+ "type": "string"
+ },
+ "run_type": {
+ "type": "string"
+ },
+ "start_date": {
+ "type": "string",
+ "format": "date-time"
+ }
+ },
+ "additionalProperties": true,
+ "required": [
+ "dag_id",
+ "run_id"
+ ]
+ }
+ },
+ "type": "object",
+ "properties": {
+ "airflow": {
+ "$ref": "#/$defs/AirflowRunFacet"
+ }
+ }
+}
diff --git a/airflow/providers/openlineage/facets/AirflowStateRunFacet.json
b/airflow/providers/openlineage/facets/AirflowStateRunFacet.json
new file mode 100644
index 0000000000..2788e17282
--- /dev/null
+++ b/airflow/providers/openlineage/facets/AirflowStateRunFacet.json
@@ -0,0 +1,34 @@
+{
+ "$schema": "https://json-schema.org/draft/2020-12/schema",
+ "$defs": {
+ "AirflowStateRunFacet": {
+ "allOf": [
+ {
+ "$ref":
"https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet"
+ },
+ {
+ "type": "object",
+ "properties": {
+ "dagRunState": {
+ "description": "The final status of the entire DagRun",
+ "type": "string"
+ },
+ "tasksState": {
+ "description": "Mapping of task IDs to their respective
states",
+ "type": "object",
+ "additionalProperties": true
+ }
+ },
+ "required": ["dagRunState", "tasksState"]
+ }
+ ],
+ "type": "object"
+ }
+ },
+ "type": "object",
+ "properties": {
+ "airflowState": {
+ "$ref": "#/$defs/AirflowStateRunFacet"
+ }
+ }
+ }
diff --git a/airflow/providers/openlineage/facets/__init__.py
b/airflow/providers/openlineage/facets/__init__.py
new file mode 100644
index 0000000000..13a83393a9
--- /dev/null
+++ b/airflow/providers/openlineage/facets/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/airflow/providers/openlineage/plugins/adapter.py
b/airflow/providers/openlineage/plugins/adapter.py
index 608bd568e1..339ad55fdd 100644
--- a/airflow/providers/openlineage/plugins/adapter.py
+++ b/airflow/providers/openlineage/plugins/adapter.py
@@ -38,7 +38,10 @@ from openlineage.client.run import Job, Run, RunEvent,
RunState
from openlineage.client.uuid import generate_static_uuid
from airflow.providers.openlineage import __version__ as
OPENLINEAGE_PROVIDER_VERSION, conf
-from airflow.providers.openlineage.utils.utils import OpenLineageRedactor
+from airflow.providers.openlineage.utils.utils import (
+ OpenLineageRedactor,
+ get_airflow_state_run_facet,
+)
from airflow.stats import Stats
from airflow.utils.log.logging_mixin import LoggingMixin
@@ -321,12 +324,19 @@ class OpenLineageAdapter(LoggingMixin):
msg: str,
nominal_start_time: str,
nominal_end_time: str,
+ job_facets: dict[str, BaseFacet] | None = None, # Custom job facets
):
try:
event = RunEvent(
eventType=RunState.START,
eventTime=dag_run.start_date.isoformat(),
- job=self._build_job(job_name=dag_run.dag_id,
job_type=_JOB_TYPE_DAG),
+ job=self._build_job(
+ job_name=dag_run.dag_id,
+ job_type=_JOB_TYPE_DAG,
+ job_description=dag_run.dag.description if dag_run.dag
else None,
+ owners=[x.strip() for x in dag_run.dag.owner.split(",")]
if dag_run.dag else None,
+ job_facets=job_facets,
+ ),
run=self._build_run(
run_id=self.build_dag_run_id(
dag_id=dag_run.dag_id,
@@ -358,6 +368,7 @@ class OpenLineageAdapter(LoggingMixin):
dag_id=dag_run.dag_id,
execution_date=dag_run.execution_date,
),
+ facets={**get_airflow_state_run_facet(dag_run)},
),
inputs=[],
outputs=[],
@@ -381,7 +392,10 @@ class OpenLineageAdapter(LoggingMixin):
dag_id=dag_run.dag_id,
execution_date=dag_run.execution_date,
),
- facets={"errorMessage": ErrorMessageRunFacet(message=msg,
programmingLanguage="python")},
+ facets={
+ "errorMessage": ErrorMessageRunFacet(message=msg,
programmingLanguage="python"),
+ **get_airflow_state_run_facet(dag_run),
+ },
),
inputs=[],
outputs=[],
diff --git a/airflow/providers/openlineage/plugins/facets.py
b/airflow/providers/openlineage/plugins/facets.py
index 925f386d6e..fd1f6ef4b5 100644
--- a/airflow/providers/openlineage/plugins/facets.py
+++ b/airflow/providers/openlineage/plugins/facets.py
@@ -39,15 +39,56 @@ class AirflowMappedTaskRunFacet(BaseFacet):
@classmethod
def from_task_instance(cls, task_instance):
- task = task_instance.task
- from airflow.providers.openlineage.utils.utils import
get_operator_class
+ from airflow.providers.openlineage.utils.utils import
get_fully_qualified_class_name
return cls(
mapIndex=task_instance.map_index,
-
operatorClass=f"{get_operator_class(task).__module__}.{get_operator_class(task).__name__}",
+ operatorClass=get_fully_qualified_class_name(task_instance.task),
)
+@define(slots=False)
+class AirflowJobFacet(BaseFacet):
+ """
+ Composite Airflow job facet.
+
+ This facet encapsulates all the necessary information to re-create full
scope of an Airflow DAG logic,
+ enabling reconstruction, visualization, and analysis of DAGs in a
comprehensive manner.
+ It includes detailed representations of the tasks, task groups, and their
hierarchical relationships,
+ making it possible to draw a graph that visually represents the entire DAG
structure (like in Airflow UI).
+ It also indicates whether a task should emit an OpenLineage (OL) event,
enabling consumers to anticipate
+ the number of events and identify the tasks from which they can expect
these events.
+
+ Attributes:
+ taskTree: A dictionary representing the hierarchical structure of
tasks in the DAG.
+ taskGroups: A dictionary that contains information about task groups
within the DAG.
+ tasks: A dictionary detailing individual tasks within the DAG.
+ """
+
+ taskTree: dict
+ taskGroups: dict
+ tasks: dict
+
+
+@define(slots=False)
+class AirflowStateRunFacet(BaseFacet):
+ """
+ Airflow facet providing state information.
+
+ This facet is designed to be sent at a completion event, offering state
information about
+ the DAG run and each individual task. This information is crucial for
understanding
+ the execution flow and comprehensive post-run analysis and debugging,
including why certain tasks
+ did not emit events, which can occur due to the use of control flow
operators like the BranchOperator.
+
+ Attributes:
+ dagRunState: This indicates the final status of the entire DAG run
(e.g., "success", "failed").
+ tasksState: A dictionary mapping task IDs to their respective states.
(e.g., "failed", "skipped").
+ """
+
+ dagRunState: str
+ tasksState: dict[str, str]
+
+
@define(slots=False)
class AirflowRunFacet(BaseFacet):
"""Composite Airflow run facet."""
diff --git a/airflow/providers/openlineage/plugins/listener.py
b/airflow/providers/openlineage/plugins/listener.py
index 4a1085168a..76b60d61b7 100644
--- a/airflow/providers/openlineage/plugins/listener.py
+++ b/airflow/providers/openlineage/plugins/listener.py
@@ -30,6 +30,7 @@ from airflow.providers.openlineage import conf
from airflow.providers.openlineage.extractors import ExtractorManager
from airflow.providers.openlineage.plugins.adapter import OpenLineageAdapter,
RunState
from airflow.providers.openlineage.utils.utils import (
+ get_airflow_job_facet,
get_airflow_run_facet,
get_custom_facets,
get_job_name,
@@ -367,6 +368,9 @@ class OpenLineageListener:
msg=msg,
nominal_start_time=data_interval_start,
nominal_end_time=data_interval_end,
+ # AirflowJobFacet should be created outside ProcessPoolExecutor
that pickles objects,
+ # as it causes lack of some TaskGroup attributes and crashes event
emission.
+ job_facets={**get_airflow_job_facet(dag_run=dag_run)},
)
@hookimpl
diff --git a/airflow/providers/openlineage/utils/utils.py
b/airflow/providers/openlineage/utils/utils.py
index ff6ad63970..6904b32d2d 100644
--- a/airflow/providers/openlineage/utils/utils.py
+++ b/airflow/providers/openlineage/utils/utils.py
@@ -20,8 +20,10 @@ from __future__ import annotations
import datetime
import json
import logging
-from contextlib import suppress
+import re
+from contextlib import redirect_stdout, suppress
from functools import wraps
+from io import StringIO
from typing import TYPE_CHECKING, Any, Iterable
import attrs
@@ -32,8 +34,11 @@ from airflow.exceptions import
AirflowProviderDeprecationWarning # TODO: move t
from airflow.models import DAG, BaseOperator, MappedOperator
from airflow.providers.openlineage import conf
from airflow.providers.openlineage.plugins.facets import (
+ AirflowJobFacet,
AirflowMappedTaskRunFacet,
AirflowRunFacet,
+ AirflowStateRunFacet,
+ BaseFacet,
UnknownOperatorAttributeRunFacet,
UnknownOperatorInstance,
)
@@ -41,6 +46,7 @@ from airflow.providers.openlineage.utils.selective_enable
import (
is_dag_lineage_enabled,
is_task_lineage_enabled,
)
+from airflow.serialization.serialized_objects import SerializedBaseOperator
from airflow.utils.context import AirflowContextDeprecationWarning
from airflow.utils.log.secrets_masker import Redactable, Redacted,
SecretsMasker, should_hide_value_for_key
from airflow.utils.module_loading import import_string
@@ -78,7 +84,11 @@ def get_custom_facets(task_instance: TaskInstance | None =
None) -> dict[str, An
def get_fully_qualified_class_name(operator: BaseOperator | MappedOperator) ->
str:
- return operator.__class__.__module__ + "." + operator.__class__.__name__
+ if isinstance(operator, (MappedOperator, SerializedBaseOperator)):
+ # as in
airflow.api_connexion.schemas.common_schema.ClassReferenceSchema
+ return operator._task_module + "." + operator._task_type # type:
ignore
+ op_class = get_operator_class(operator)
+ return op_class.__module__ + "." + op_class.__name__
def is_operator_disabled(operator: BaseOperator | MappedOperator) -> bool:
@@ -168,7 +178,7 @@ class InfoJsonEncodable(dict):
class DagInfo(InfoJsonEncodable):
"""Defines encoding DAG object to JSON."""
- includes = ["dag_id", "schedule_interval", "tags", "start_date"]
+ includes = ["dag_id", "description", "owner", "schedule_interval",
"start_date", "tags"]
casts = {"timetable": lambda dag: dag.timetable.serialize() if
getattr(dag, "timetable", None) else None}
renames = {"_dag_id": "dag_id"}
@@ -193,9 +203,9 @@ class TaskInstanceInfo(InfoJsonEncodable):
includes = ["duration", "try_number", "pool"]
casts = {
- "map_index": lambda ti: ti.map_index
- if hasattr(ti, "map_index") and getattr(ti, "map_index") != -1
- else None
+ "map_index": lambda ti: (
+ ti.map_index if hasattr(ti, "map_index") and getattr(ti,
"map_index") != -1 else None
+ )
}
@@ -234,9 +244,11 @@ class TaskInfo(InfoJsonEncodable):
]
casts = {
"operator_class": lambda task: task.task_type,
- "task_group": lambda task: TaskGroupInfo(task.task_group)
- if hasattr(task, "task_group") and getattr(task.task_group,
"_group_id", None)
- else None,
+ "task_group": lambda task: (
+ TaskGroupInfo(task.task_group)
+ if hasattr(task, "task_group") and getattr(task.task_group,
"_group_id", None)
+ else None
+ ),
}
@@ -262,20 +274,158 @@ def get_airflow_run_facet(
task_instance: TaskInstance,
task: BaseOperator,
task_uuid: str,
-):
+) -> dict[str, BaseFacet]:
return {
- "airflow": attrs.asdict(
- AirflowRunFacet(
- dag=DagInfo(dag),
- dagRun=DagRunInfo(dag_run),
- taskInstance=TaskInstanceInfo(task_instance),
- task=TaskInfo(task),
- taskUuid=task_uuid,
- )
+ "airflow": AirflowRunFacet(
+ dag=DagInfo(dag),
+ dagRun=DagRunInfo(dag_run),
+ taskInstance=TaskInstanceInfo(task_instance),
+ task=TaskInfo(task),
+ taskUuid=task_uuid,
+ )
+ }
+
+
+def get_airflow_job_facet(dag_run: DagRun) -> dict[str, BaseFacet]:
+ if not dag_run.dag:
+ return {}
+ return {
+ "airflow": AirflowJobFacet(
+ taskTree=_get_parsed_dag_tree(dag_run.dag),
+ taskGroups=_get_task_groups_details(dag_run.dag),
+ tasks=_get_tasks_details(dag_run.dag),
+ )
+ }
+
+
+def get_airflow_state_run_facet(dag_run: DagRun) -> dict[str, BaseFacet]:
+ return {
+ "airflowState": AirflowStateRunFacet(
+ dagRunState=dag_run.get_state(),
+ tasksState={ti.task_id: ti.state for ti in
dag_run.get_task_instances()},
)
}
+def _safe_get_dag_tree_view(dag: DAG) -> list[str]:
+ # get_tree_view() has been added in Airflow 2.8.2
+ if hasattr(dag, "get_tree_view"):
+ return dag.get_tree_view().splitlines()
+
+ with redirect_stdout(StringIO()) as stdout:
+ dag.tree_view()
+ return stdout.getvalue().splitlines()
+
+
+def _get_parsed_dag_tree(dag: DAG) -> dict:
+ """
+ Get DAG's tasks hierarchy representation.
+
+ While the task dependencies are defined as following:
+ task >> [task_2, task_4] >> task_7
+ task_3 >> task_5
+ task_6 # has no dependencies, it's a root and a leaf
+
+ The result of this function will look like:
+ {
+ "task": {
+ "task_2": {
+ "task_7": {}
+ },
+ "task_4": {
+ "task_7": {}
+ }
+ },
+ "task_3": {
+ "task_5": {}
+ },
+ "task_6": {}
+ }
+ """
+ lines = _safe_get_dag_tree_view(dag)
+ task_dict: dict[str, dict] = {}
+ parent_map: dict[int, tuple[str, dict]] = {}
+
+ for line in lines:
+ stripped_line = line.strip()
+ if not stripped_line:
+ continue
+
+ # Determine the level by counting the leading spaces, assuming 4
spaces per level
+ # as defined in airflow.models.dag.DAG._generate_tree_view()
+ level = (len(line) - len(stripped_line)) // 4
+ # airflow.models.baseoperator.BaseOperator.__repr__ is used in DAG tree
+ # <Task({op_class}): {task_id}>
+ match = re.match(r"^<Task\((.+)\): (.*?)>$", stripped_line)
+ if not match:
+ return {}
+ current_task_id = match[2]
+
+ if level == 0: # It's a root task
+ task_dict[current_task_id] = {}
+ parent_map[level] = (current_task_id, task_dict[current_task_id])
+ else:
+ # Find the immediate parent task
+ parent_task, parent_dict = parent_map[(level - 1)]
+ # Create new dict for the current task
+ parent_dict[current_task_id] = {}
+ # Update this task in the parent map
+ parent_map[level] = (current_task_id, parent_dict[current_task_id])
+
+ return task_dict
+
+
+def _get_tasks_details(dag: DAG) -> dict:
+ tasks = {
+ single_task.task_id: {
+ "operator": get_fully_qualified_class_name(single_task),
+ "task_group": single_task.task_group.group_id if
single_task.task_group else None,
+ "emits_ol_events": _emits_ol_events(single_task),
+ "ui_color": single_task.ui_color,
+ "ui_fgcolor": single_task.ui_fgcolor,
+ "ui_label": single_task.label,
+ "is_setup": single_task.is_setup,
+ "is_teardown": single_task.is_teardown,
+ }
+ for single_task in dag.tasks
+ }
+
+ return tasks
+
+
+def _get_task_groups_details(dag: DAG) -> dict:
+ return {
+ tg_id: {
+ "parent_group": tg.parent_group.group_id,
+ "tooltip": tg.tooltip,
+ "ui_color": tg.ui_color,
+ "ui_fgcolor": tg.ui_fgcolor,
+ "ui_label": tg.label,
+ }
+ for tg_id, tg in dag.task_group_dict.items()
+ }
+
+
+def _emits_ol_events(task: BaseOperator | MappedOperator) -> bool:
+ config_selective_enabled = is_selective_lineage_enabled(task)
+ config_disabled_for_operators = is_operator_disabled(task)
+ # empty operators without callbacks/outlets are skipped for optimization
by Airflow
+ # in airflow.models.taskinstance.TaskInstance._schedule_downstream_tasks
+ is_skipped_as_empty_operator = all(
+ (
+ task.inherits_from_empty_operator,
+ not task.on_execute_callback,
+ not task.on_success_callback,
+ not task.outlets,
+ )
+ )
+
+ emits_ol_events = all(
+ (config_selective_enabled, not config_disabled_for_operators, not
is_skipped_as_empty_operator)
+ )
+ return emits_ol_events
+
+
def get_unknown_source_attribute_run_facet(task: BaseOperator, name: str |
None = None):
if not name:
name = get_operator_class(task).__name__
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index 7ac38eeaf2..8c5164d675 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -362,6 +362,7 @@ dagrun
DagRunPydantic
dagruns
DagRunState
+dagRunState
DAGs
Dask
dask
@@ -1606,6 +1607,7 @@ taskflow
TaskGroup
taskgroup
TaskGroups
+taskGroups
TaskInstance
taskinstance
TaskInstanceKey
@@ -1613,6 +1615,8 @@ taskinstancekey
taskmeta
taskmixin
tasksetmeta
+tasksState
+taskTree
tblproperties
TCP
tcp
diff --git a/tests/providers/openlineage/plugins/test_adapter.py
b/tests/providers/openlineage/plugins/test_adapter.py
index 6f010b1a2d..0212f1402c 100644
--- a/tests/providers/openlineage/plugins/test_adapter.py
+++ b/tests/providers/openlineage/plugins/test_adapter.py
@@ -38,6 +38,11 @@ from openlineage.client.facet import (
)
from openlineage.client.run import Dataset, Job, Run, RunEvent, RunState
+from airflow import DAG
+from airflow.models.dagrun import DagRun, DagRunState
+from airflow.models.taskinstance import TaskInstance, TaskInstanceState
+from airflow.operators.bash import BashOperator
+from airflow.operators.empty import EmptyOperator
from airflow.providers.openlineage.conf import (
config_path,
custom_extractors,
@@ -49,6 +54,11 @@ from airflow.providers.openlineage.conf import (
)
from airflow.providers.openlineage.extractors import OperatorLineage
from airflow.providers.openlineage.plugins.adapter import _PRODUCER,
OpenLineageAdapter
+from airflow.providers.openlineage.plugins.facets import (
+ AirflowStateRunFacet,
+)
+from airflow.providers.openlineage.utils.utils import get_airflow_job_facet
+from airflow.utils.task_group import TaskGroup
from tests.test_utils.config import conf_vars
pytestmark = pytest.mark.db_test
@@ -534,19 +544,33 @@ def test_emit_dag_started_event(mock_stats_incr,
mock_stats_timer, generate_stat
dag_id = "dag_id"
run_id = str(uuid.uuid4())
- dagrun_mock = MagicMock()
- dagrun_mock.start_date = event_time
- dagrun_mock.run_id = run_id
- dagrun_mock.dag_id = dag_id
+ with DAG(dag_id=dag_id, description="dag desc",
start_date=datetime.datetime(2024, 6, 1)) as dag:
+ tg = TaskGroup(group_id="tg1")
+ tg2 = TaskGroup(group_id="tg2", parent_group=tg)
+ task_0 = BashOperator(task_id="task_0", bash_command="exit 0;") #
noqa: F841
+ task_1 = BashOperator(task_id="task_1", bash_command="exit 0;",
task_group=tg) # noqa: F841
+ task_2 = EmptyOperator(task_id="task_2.test.dot", task_group=tg2) #
noqa: F841
+
+ dag_run = DagRun(
+ dag_id=dag_id,
+ run_id=run_id,
+ start_date=event_time,
+ execution_date=event_time,
+ )
+ dag_run.dag = dag
generate_static_uuid.return_value = random_uuid
+ job_facets = {**get_airflow_job_facet(dag_run)}
+
adapter.dag_started(
- dag_run=dagrun_mock,
+ dag_run=dag_run,
msg="",
nominal_start_time=event_time.isoformat(),
nominal_end_time=event_time.isoformat(),
+ job_facets=job_facets,
)
+ assert len(client.emit.mock_calls) == 1
assert (
call(
RunEvent(
@@ -565,9 +589,16 @@ def test_emit_dag_started_event(mock_stats_incr,
mock_stats_timer, generate_stat
namespace=namespace(),
name="dag_id",
facets={
+ "documentation":
DocumentationJobFacet(description="dag desc"),
+ "ownership": OwnershipJobFacet(
+ owners=[
+ OwnershipJobFacetOwners(name="airflow",
type=None),
+ ]
+ ),
+ **job_facets,
"jobType": JobTypeJobFacet(
processingType="BATCH", integration="AIRFLOW",
jobType="DAG"
- )
+ ),
},
),
producer=_PRODUCER,
@@ -582,10 +613,11 @@ def test_emit_dag_started_event(mock_stats_incr,
mock_stats_timer, generate_stat
mock_stats_timer.assert_called_with("ol.emit.attempts")
+@mock.patch.object(DagRun, "get_task_instances")
@mock.patch("airflow.providers.openlineage.plugins.adapter.generate_static_uuid")
@mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.timer")
@mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.incr")
-def test_emit_dag_complete_event(mock_stats_incr, mock_stats_timer,
generate_static_uuid):
+def test_emit_dag_complete_event(mock_stats_incr, mock_stats_timer,
generate_static_uuid, mocked_get_tasks):
random_uuid = "9d3b14f7-de91-40b6-aeef-e887e2c7673e"
client = MagicMock()
adapter = OpenLineageAdapter(client)
@@ -593,15 +625,30 @@ def test_emit_dag_complete_event(mock_stats_incr,
mock_stats_timer, generate_sta
dag_id = "dag_id"
run_id = str(uuid.uuid4())
- dagrun_mock = MagicMock()
- dagrun_mock.start_date = event_time
- dagrun_mock.end_date = event_time
- dagrun_mock.run_id = run_id
- dagrun_mock.dag_id = dag_id
+ with DAG(dag_id=dag_id, start_date=datetime.datetime(2024, 6, 1)):
+ task_0 = BashOperator(task_id="task_0", bash_command="exit 0;")
+ task_1 = BashOperator(task_id="task_1", bash_command="exit 0;")
+ task_2 = EmptyOperator(
+ task_id="task_2.test",
+ )
+
+ dag_run = DagRun(
+ dag_id=dag_id,
+ run_id=run_id,
+ start_date=event_time,
+ execution_date=event_time,
+ )
+ dag_run._state = DagRunState.SUCCESS
+ dag_run.end_date = event_time
+ mocked_get_tasks.return_value = [
+ TaskInstance(task=task_0, run_id=run_id,
state=TaskInstanceState.SUCCESS),
+ TaskInstance(task=task_1, run_id=run_id,
state=TaskInstanceState.SKIPPED),
+ TaskInstance(task=task_2, run_id=run_id,
state=TaskInstanceState.FAILED),
+ ]
generate_static_uuid.return_value = random_uuid
adapter.dag_success(
- dag_run=dagrun_mock,
+ dag_run=dag_run,
msg="",
)
@@ -610,10 +657,22 @@ def test_emit_dag_complete_event(mock_stats_incr,
mock_stats_timer, generate_sta
RunEvent(
eventType=RunState.COMPLETE,
eventTime=event_time.isoformat(),
- run=Run(runId=random_uuid, facets={}),
+ run=Run(
+ runId=random_uuid,
+ facets={
+ "airflowState": AirflowStateRunFacet(
+ dagRunState=DagRunState.SUCCESS,
+ tasksState={
+ task_0.task_id: TaskInstanceState.SUCCESS,
+ task_1.task_id: TaskInstanceState.SKIPPED,
+ task_2.task_id: TaskInstanceState.FAILED,
+ },
+ )
+ },
+ ),
job=Job(
namespace=namespace(),
- name="dag_id",
+ name=dag_id,
facets={
"jobType": JobTypeJobFacet(
processingType="BATCH", integration="AIRFLOW",
jobType="DAG"
@@ -632,10 +691,11 @@ def test_emit_dag_complete_event(mock_stats_incr,
mock_stats_timer, generate_sta
mock_stats_timer.assert_called_with("ol.emit.attempts")
+@mock.patch.object(DagRun, "get_task_instances")
@mock.patch("airflow.providers.openlineage.plugins.adapter.generate_static_uuid")
@mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.timer")
@mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.incr")
-def test_emit_dag_failed_event(mock_stats_incr, mock_stats_timer,
generate_static_uuid):
+def test_emit_dag_failed_event(mock_stats_incr, mock_stats_timer,
generate_static_uuid, mocked_get_tasks):
random_uuid = "9d3b14f7-de91-40b6-aeef-e887e2c7673e"
client = MagicMock()
adapter = OpenLineageAdapter(client)
@@ -643,15 +703,28 @@ def test_emit_dag_failed_event(mock_stats_incr,
mock_stats_timer, generate_stati
dag_id = "dag_id"
run_id = str(uuid.uuid4())
- dagrun_mock = MagicMock()
- dagrun_mock.start_date = event_time
- dagrun_mock.end_date = event_time
- dagrun_mock.run_id = run_id
- dagrun_mock.dag_id = dag_id
+ with DAG(dag_id=dag_id, start_date=datetime.datetime(2024, 6, 1)):
+ task_0 = BashOperator(task_id="task_0", bash_command="exit 0;")
+ task_1 = BashOperator(task_id="task_1", bash_command="exit 0;")
+ task_2 = EmptyOperator(task_id="task_2.test")
+
+ dag_run = DagRun(
+ dag_id=dag_id,
+ run_id=run_id,
+ start_date=event_time,
+ execution_date=event_time,
+ )
+ dag_run._state = DagRunState.SUCCESS
+ dag_run.end_date = event_time
+ mocked_get_tasks.return_value = [
+ TaskInstance(task=task_0, run_id=run_id,
state=TaskInstanceState.SUCCESS),
+ TaskInstance(task=task_1, run_id=run_id,
state=TaskInstanceState.SKIPPED),
+ TaskInstance(task=task_2, run_id=run_id,
state=TaskInstanceState.FAILED),
+ ]
generate_static_uuid.return_value = random_uuid
adapter.dag_failed(
- dag_run=dagrun_mock,
+ dag_run=dag_run,
msg="error msg",
)
@@ -665,12 +738,20 @@ def test_emit_dag_failed_event(mock_stats_incr,
mock_stats_timer, generate_stati
facets={
"errorMessage": ErrorMessageRunFacet(
message="error msg", programmingLanguage="python"
- )
+ ),
+ "airflowState": AirflowStateRunFacet(
+ dagRunState=DagRunState.SUCCESS,
+ tasksState={
+ task_0.task_id: TaskInstanceState.SUCCESS,
+ task_1.task_id: TaskInstanceState.SKIPPED,
+ task_2.task_id: TaskInstanceState.FAILED,
+ },
+ ),
},
),
job=Job(
namespace=namespace(),
- name="dag_id",
+ name=dag_id,
facets={
"jobType": JobTypeJobFacet(
processingType="BATCH", integration="AIRFLOW",
jobType="DAG"
diff --git a/tests/providers/openlineage/utils/test_utils.py
b/tests/providers/openlineage/utils/test_utils.py
index ce1cd3be7e..d58be508d4 100644
--- a/tests/providers/openlineage/utils/test_utils.py
+++ b/tests/providers/openlineage/utils/test_utils.py
@@ -17,34 +17,443 @@
# under the License.
from __future__ import annotations
-import pytest
+import datetime
+from unittest.mock import MagicMock
-from airflow.decorators import task_group
-from airflow.models.taskinstance import TaskInstance as TI
+from airflow import DAG
+from airflow.models.mappedoperator import MappedOperator
+from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
-from airflow.providers.openlineage.plugins.facets import
AirflowMappedTaskRunFacet
-from airflow.providers.openlineage.utils.utils import get_custom_facets
-from airflow.utils import timezone
+from airflow.operators.python import PythonOperator
+from airflow.providers.openlineage.plugins.facets import AirflowJobFacet
+from airflow.providers.openlineage.utils.utils import (
+ _get_parsed_dag_tree,
+ _get_task_groups_details,
+ _get_tasks_details,
+ _safe_get_dag_tree_view,
+ get_airflow_job_facet,
+ get_fully_qualified_class_name,
+ get_job_name,
+ get_operator_class,
+)
+from airflow.serialization.serialized_objects import SerializedBaseOperator
+from airflow.utils.task_group import TaskGroup
+from tests.test_utils.mock_operators import MockOperator
-DEFAULT_DATE = timezone.datetime(2016, 1, 1)
+class CustomOperatorForTest(BashOperator):
+    """BashOperator subclass defined in this test module.
+
+    The tests below use it to check that a custom operator is reported under
+    its own fully qualified class name.
+    """
[email protected]_test
-def test_get_custom_facets(dag_maker):
- with dag_maker(dag_id="dag_test_get_custom_facets") as dag:
- @task_group
- def task_group_op(k):
- EmptyOperator(task_id="empty_operator")
+class CustomOperatorFromEmpty(EmptyOperator):
+    """EmptyOperator subclass defined in this test module.
+
+    The tests below expect tasks of this class to be reported with
+    ``emits_ol_events`` set to ``False``, just like a plain EmptyOperator.
+    """
- task_group_op.expand(k=[0])
- dag_maker.create_dagrun()
- ti_0 = TI(dag.get_task("task_group_op.empty_operator"),
execution_date=DEFAULT_DATE, map_index=0)
+def test_get_airflow_job_facet():
+ # Build a DAG with one top-level task and one task inside the "section_1"
+ # task group, then check the job facet produced for a dag run of that DAG.
+ with DAG(dag_id="dag", start_date=datetime.datetime(2024, 6, 1)) as dag:
+ task_0 = BashOperator(task_id="task_0", bash_command="exit 0;")
- assert ti_0.map_index == 0
+ with TaskGroup("section_1", prefix_group_id=True):
+ task_10 = PythonOperator(task_id="task_3", python_callable=lambda:
1)
- assert get_custom_facets(ti_0)["airflow_mappedTask"] ==
AirflowMappedTaskRunFacet(
- mapIndex=0,
-
operatorClass=f"{ti_0.task.operator_class.__module__}.{ti_0.task.operator_class.__name__}",
+ task_0 >> task_10
+
+ # Only the `dag` attribute is set explicitly; everything else on the dag run
+ # is whatever MagicMock provides.
+ dagrun_mock = MagicMock()
+ dagrun_mock.dag = dag
+
+ result = get_airflow_job_facet(dagrun_mock)
+ # The facet is expected under the "airflow" key; the grouped task appears
+ # with its group-prefixed id ("section_1.task_3") but its bare ui_label.
+ assert result == {
+ "airflow": AirflowJobFacet(
+ taskTree={"task_0": {"section_1.task_3": {}}},
+ taskGroups={
+ "section_1": {
+ "parent_group": None,
+ "tooltip": "",
+ "ui_color": "CornflowerBlue",
+ "ui_fgcolor": "#000",
+ "ui_label": "section_1",
+ }
+ },
+ tasks={
+ "task_0": {
+ "operator": "airflow.operators.bash.BashOperator",
+ "task_group": None,
+ "emits_ol_events": True,
+ "ui_color": "#f0ede4",
+ "ui_fgcolor": "#000",
+ "ui_label": "task_0",
+ "is_setup": False,
+ "is_teardown": False,
+ },
+ "section_1.task_3": {
+ "operator": "airflow.operators.python.PythonOperator",
+ "task_group": "section_1",
+ "emits_ol_events": True,
+ "ui_color": "#ffefeb",
+ "ui_fgcolor": "#000",
+ "ui_label": "task_3",
+ "is_setup": False,
+ "is_teardown": False,
+ },
+ },
)
+ }
+
+
+def test_get_fully_qualified_class_name_serialized_operator():
+    """The class path of a BashOperator is the same before and after a serialization round trip."""
+    expected_module = "airflow.operators.bash"
+    expected_class = "BashOperator"
+    expected_path = f"{expected_module}.{expected_class}"
+
+    original = BashOperator(task_id="test", bash_command="echo 1")
+    assert get_fully_qualified_class_name(original) == expected_path
+
+    round_tripped = SerializedBaseOperator.deserialize_operator(
+        SerializedBaseOperator.serialize_operator(original)
+    )
+
+    assert get_fully_qualified_class_name(round_tripped) == expected_path
+    # The deserialized operator carries the original module and class name separately.
+    assert round_tripped._task_module == expected_module
+    assert round_tripped._task_type == expected_class
+
+
+def test_get_fully_qualified_class_name_mapped_operator():
+    """A mapped operator is reported under the class it was expanded from."""
+    mapped_task = MockOperator.partial(task_id="task_2").expand(arg2=["a", "b", "c"])
+    assert isinstance(mapped_task, MappedOperator)
+
+    expected_path = "tests.test_utils.mock_operators.MockOperator"
+    assert get_fully_qualified_class_name(mapped_task) == expected_path
+
+
+def test_get_fully_qualified_class_name_bash_operator():
+    """A plain BashOperator resolves to ``airflow.operators.bash.BashOperator``."""
+    bash_task = BashOperator(task_id="test", bash_command="echo 0;")
+    assert get_fully_qualified_class_name(bash_task) == "airflow.operators.bash.BashOperator"
+
+
+def test_get_job_name():
+    """The job name is the DAG id and the task id joined by a dot."""
+    ti = MagicMock(dag_id="example_dag", task_id="example_task")
+    assert get_job_name(ti) == "example_dag.example_task"
+
+
+def test_get_job_name_empty_ids():
+    """With empty DAG and task ids only the dot separator remains."""
+    ti = MagicMock(dag_id="", task_id="")
+    assert get_job_name(ti) == "."
+
+
+def test_get_operator_class():
+    """For a BashOperator instance the operator class is BashOperator itself."""
+    bash_task = BashOperator(task_id="test", bash_command="echo 0;")
+    assert get_operator_class(bash_task) == BashOperator
+
+
+def test_get_operator_class_mapped_operator():
+    """For a mapped operator the operator class is the class that was expanded."""
+    mapped_task = MockOperator.partial(task_id="task").expand(arg2=["a", "b", "c"])
+    assert isinstance(mapped_task, MappedOperator)
+    assert get_operator_class(mapped_task) == MockOperator
+
+
+def test_get_tasks_details():
+    """Check the details reported for plain, custom-class and task-group tasks."""
+
+    def details(operator_path, operator_cls, ui_label, task_group=None, emits_ol_events=True):
+        # Colours are read from the operator class; no task here is setup/teardown.
+        return {
+            "operator": operator_path,
+            "task_group": task_group,
+            "emits_ol_events": emits_ol_events,
+            "ui_color": operator_cls.ui_color,
+            "ui_fgcolor": operator_cls.ui_fgcolor,
+            "ui_label": ui_label,
+            "is_setup": False,
+            "is_teardown": False,
+        }
+
+    # Operators register themselves with the active DAG, so no local names are needed.
+    with DAG(dag_id="dag", start_date=datetime.datetime(2024, 6, 1)) as dag:
+        CustomOperatorForTest(task_id="task", bash_command="exit 0;")
+        BashOperator(task_id="task_0", bash_command="exit 0;")
+        CustomOperatorFromEmpty(task_id="task_1")
+        PythonOperator(task_id="task_2", python_callable=lambda: 1)
+        BashOperator(task_id="task_3", bash_command="exit 0;")
+        EmptyOperator(task_id="task_4.test.dot")
+        BashOperator(task_id="task_5", bash_command="exit 0;")
+
+        with TaskGroup("section_1", prefix_group_id=True) as tg:
+            PythonOperator(task_id="task_3", python_callable=lambda: 1)
+            with TaskGroup("section_2", parent_group=tg) as tg2:
+                EmptyOperator(task_id="task_11")
+                with TaskGroup("section_3", parent_group=tg2):
+                    PythonOperator(task_id="task_12", python_callable=lambda: 1)
+
+    bash_path = "airflow.operators.bash.BashOperator"
+    empty_path = "airflow.operators.empty.EmptyOperator"
+    python_path = "airflow.operators.python.PythonOperator"
+
+    expected = {
+        "task": details(
+            "tests.providers.openlineage.utils.test_utils.CustomOperatorForTest",
+            CustomOperatorForTest,
+            "task",
+        ),
+        "task_0": details(bash_path, BashOperator, "task_0"),
+        "task_1": details(
+            "tests.providers.openlineage.utils.test_utils.CustomOperatorFromEmpty",
+            CustomOperatorFromEmpty,
+            "task_1",
+            emits_ol_events=False,
+        ),
+        "task_2": details(python_path, PythonOperator, "task_2"),
+        "task_3": details(bash_path, BashOperator, "task_3"),
+        "task_4.test.dot": details(empty_path, EmptyOperator, "task_4.test.dot", emits_ol_events=False),
+        "task_5": details(bash_path, BashOperator, "task_5"),
+        "section_1.task_3": details(python_path, PythonOperator, "task_3", task_group="section_1"),
+        "section_1.section_2.task_11": details(
+            empty_path,
+            EmptyOperator,
+            "task_11",
+            task_group="section_1.section_2",
+            emits_ol_events=False,
+        ),
+        "section_1.section_2.section_3.task_12": details(
+            python_path,
+            PythonOperator,
+            "task_12",
+            task_group="section_1.section_2.section_3",
+        ),
+    }
+
+    assert _get_tasks_details(dag) == expected
+
+
+def test_get_tasks_details_empty_dag():
+    """A DAG without tasks yields an empty details mapping."""
+    empty_dag = DAG("test_dag", start_date=datetime.datetime(2024, 6, 1))
+    assert _get_tasks_details(empty_dag) == {}
+
+
+def test_dag_tree_level_indent():
+    """Verify that the DAG tree view indents each task according to its depth.
+
+    Each level below a root task is expected to be shifted right by four more
+    spaces. ``task_2`` is downstream of two different branches, so it is
+    expected once under each of them.
+    """
+    with DAG(dag_id="dag", start_date=datetime.datetime(2024, 6, 1)) as dag:
+        task_0 = EmptyOperator(task_id="task_0")
+        task_1 = EmptyOperator(task_id="task_1")
+        task_2 = EmptyOperator(task_id="task_2")
+        task_3 = EmptyOperator(task_id="task_3")
+
+        task_0 >> task_1 >> task_2
+        task_3 >> task_2
+
+    level_1 = " " * 4
+    level_2 = level_1 * 2
+
+    tree_view = _safe_get_dag_tree_view(dag)
+    assert tree_view == [
+        "<Task(EmptyOperator): task_0>",
+        level_1 + "<Task(EmptyOperator): task_1>",
+        level_2 + "<Task(EmptyOperator): task_2>",
+        "<Task(EmptyOperator): task_3>",
+        level_1 + "<Task(EmptyOperator): task_2>",
+    ]
+
+
+def test_get_dag_tree():
+    """Check the nested dict built for a DAG with shared branches and task groups."""
+    with DAG(dag_id="dag", start_date=datetime.datetime(2024, 6, 1)) as dag:
+        task = CustomOperatorForTest(task_id="task", bash_command="exit 0;")
+        task_0 = BashOperator(task_id="task_0", bash_command="exit 0;")
+        task_1 = BashOperator(task_id="task_1", bash_command="exit 1;")
+        task_2 = PythonOperator(task_id="task_2", python_callable=lambda: 1)
+        task_3 = BashOperator(task_id="task_3", bash_command="exit 0;")
+        task_4 = EmptyOperator(task_id="task_4")
+        task_5 = BashOperator(task_id="task_5", bash_command="exit 0;")
+        task_6 = EmptyOperator(task_id="task_6.test5")
+        task_7 = BashOperator(task_id="task_7", bash_command="exit 0;")
+        # task_8 gets no dependencies; it is expected as a root without children.
+        PythonOperator(task_id="task_8", python_callable=lambda: 1)
+        task_9 = BashOperator(task_id="task_9", bash_command="exit 0;")
+
+        with TaskGroup("section_1", prefix_group_id=True) as tg:
+            task_10 = PythonOperator(task_id="task_3", python_callable=lambda: 1)
+            with TaskGroup("section_2", parent_group=tg) as tg2:
+                # task_11 gets no dependencies either; also expected as a childless root.
+                EmptyOperator(task_id="task_11")
+                with TaskGroup("section_3", parent_group=tg2):
+                    task_12 = PythonOperator(task_id="task_12", python_callable=lambda: 1)
+
+        task >> [task_2, task_7]
+        task_0 >> [task_2, task_1] >> task_3 >> [task_4, task_5] >> task_6
+        task_1 >> task_9 >> task_3 >> task_4 >> task_5 >> task_6
+        task_3 >> task_10 >> task_12
+
+    # Everything downstream of task_3 is identical wherever task_3 appears in the
+    # tree, so the expected subtree is spelled out once and reused.
+    below_task_3 = {
+        "section_1.task_3": {"section_1.section_2.section_3.task_12": {}},
+        "task_4": {"task_5": {"task_6.test5": {}}, "task_6.test5": {}},
+        "task_5": {"task_6.test5": {}},
+    }
+
+    expected = {
+        "section_1.section_2.task_11": {},
+        "task": {
+            "task_2": {"task_3": below_task_3},
+            "task_7": {},
+        },
+        "task_0": {
+            "task_1": {
+                "task_3": below_task_3,
+                "task_9": {"task_3": below_task_3},
+            },
+            "task_2": {"task_3": below_task_3},
+        },
+        "task_8": {},
+    }
+
+    assert _get_parsed_dag_tree(dag) == expected
+
+
+def test_get_dag_tree_empty_dag():
+    """A DAG without tasks yields an empty tree."""
+    empty_dag = DAG("test_dag", start_date=datetime.datetime(2024, 6, 1))
+    assert _get_parsed_dag_tree(empty_dag) == {}
+
+
+def test_get_task_groups_details():
+    """Three sibling task groups are each reported with no parent group."""
+    with DAG("test_dag", start_date=datetime.datetime(2024, 6, 1)) as dag:
+        with TaskGroup("tg1", prefix_group_id=True):
+            EmptyOperator(task_id="task_1")
+        with TaskGroup("tg2", prefix_group_id=False):
+            EmptyOperator(task_id="task_1")
+        with TaskGroup("tg3"):
+            EmptyOperator(task_id="task_2")
+
+    # All three groups are expected to differ only in their id / label.
+    expected = {
+        group_id: {
+            "parent_group": None,
+            "tooltip": "",
+            "ui_color": "CornflowerBlue",
+            "ui_fgcolor": "#000",
+            "ui_label": group_id,
+        }
+        for group_id in ("tg1", "tg2", "tg3")
+    }
+
+    assert _get_task_groups_details(dag) == expected
+
+
+def test_get_task_groups_details_nested():
+    """Nested task groups are keyed by dotted id and name their parent group."""
+
+    def group(ui_label, parent_group):
+        return {
+            "parent_group": parent_group,
+            "tooltip": "",
+            "ui_color": "CornflowerBlue",
+            "ui_fgcolor": "#000",
+            "ui_label": ui_label,
+        }
+
+    with DAG("test_dag", start_date=datetime.datetime(2024, 6, 1)) as dag:
+        with TaskGroup("tg1", prefix_group_id=True) as tg:
+            with TaskGroup("tg2", parent_group=tg) as tg2:
+                with TaskGroup("tg3", parent_group=tg2):
+                    pass
+
+    assert _get_task_groups_details(dag) == {
+        "tg1": group("tg1", None),
+        "tg1.tg2": group("tg2", "tg1"),
+        "tg1.tg2.tg3": group("tg3", "tg1.tg2"),
+    }
+
+
+def test_get_task_groups_details_no_task_groups():
+    """A DAG without any task group yields an empty mapping."""
+    empty_dag = DAG("test_dag", start_date=datetime.datetime(2024, 6, 1))
+    assert _get_task_groups_details(empty_dag) == {}