This is an automated email from the ASF dual-hosted git repository.
mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new cc4ac7784de feat: Add DagRun note to OL events (#62347)
cc4ac7784de is described below
commit cc4ac7784de4608fd067e13b6fdd0addc2a90b84
Author: Kacper Muda <[email protected]>
AuthorDate: Mon Mar 2 21:01:14 2026 +0100
feat: Add DagRun note to OL events (#62347)
---
.../airflow/providers/openlineage/utils/utils.py | 8 +++++++-
.../providers/openlineage/version_compat.py | 3 ++-
.../tests/unit/openlineage/utils/test_utils.py | 22 +++++++++++++++++++++-
3 files changed, 30 insertions(+), 3 deletions(-)
diff --git a/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py b/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py
index ed8667ffab0..bfc9f566fcb 100644
--- a/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py
+++ b/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py
@@ -60,7 +60,11 @@ from airflow.providers.openlineage.utils.selective_enable import (
is_dag_lineage_enabled,
is_task_lineage_enabled,
)
-from airflow.providers.openlineage.version_compat import AIRFLOW_V_3_0_PLUS, get_base_airflow_version_tuple
+from airflow.providers.openlineage.version_compat import (
+ AIRFLOW_V_3_0_PLUS,
+ AIRFLOW_V_3_2_PLUS,
+ get_base_airflow_version_tuple,
+)
from airflow.serialization.serialized_objects import SerializedBaseOperator, SerializedDAG
try:
@@ -749,6 +753,7 @@ class DagRunInfo(InfoJsonEncodable):
]
casts = {
+ "note": lambda dagrun: getattr(dagrun, "note", None) if AIRFLOW_V_3_2_PLUS else None,
"duration": lambda dagrun: DagRunInfo.duration(dagrun),
"dag_bundle_name": lambda dagrun: DagRunInfo.dag_version_info(dagrun, "bundle_name"),
"dag_bundle_version": lambda dagrun: DagRunInfo.dag_version_info(dagrun, "bundle_version"),
@@ -837,6 +842,7 @@ class TaskInfo(InfoJsonEncodable):
# Operator-specific useful attributes
"trigger_dag_id", # TriggerDagRunOperator
"trigger_run_id", # TriggerDagRunOperator
+ "note", # TriggerDagRunOperator
"external_dag_id", # ExternalTaskSensor and ExternalTaskMarker (if run, as it's EmptyOperator)
"external_task_id", # ExternalTaskSensor and ExternalTaskMarker (if run, as it's EmptyOperator)
"external_task_ids", # ExternalTaskSensor
diff --git a/providers/openlineage/src/airflow/providers/openlineage/version_compat.py b/providers/openlineage/src/airflow/providers/openlineage/version_compat.py
index 4a2c6ca5c6c..114631640bc 100644
--- a/providers/openlineage/src/airflow/providers/openlineage/version_compat.py
+++ b/providers/openlineage/src/airflow/providers/openlineage/version_compat.py
@@ -33,6 +33,7 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0)
+AIRFLOW_V_3_2_PLUS = get_base_airflow_version_tuple() >= (3, 2, 0)
-__all__ = ["AIRFLOW_V_3_0_PLUS"]
+__all__ = ["AIRFLOW_V_3_0_PLUS", "AIRFLOW_V_3_2_PLUS"]
diff --git a/providers/openlineage/tests/unit/openlineage/utils/test_utils.py b/providers/openlineage/tests/unit/openlineage/utils/test_utils.py
index 4e27cdcf486..4e76a795bb8 100644
--- a/providers/openlineage/tests/unit/openlineage/utils/test_utils.py
+++ b/providers/openlineage/tests/unit/openlineage/utils/test_utils.py
@@ -80,7 +80,11 @@ from airflow.utils.types import DagRunType
from tests_common.test_utils.compat import BashOperator, OperatorSerialization, PythonOperator
from tests_common.test_utils.mock_operators import MockOperator
from tests_common.test_utils.taskinstance import create_task_instance
-from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_3_PLUS, AIRFLOW_V_3_0_PLUS
+from tests_common.test_utils.version_compat import (
+ AIRFLOW_V_3_0_3_PLUS,
+ AIRFLOW_V_3_0_PLUS,
+ AIRFLOW_V_3_2_PLUS,
+)
BASH_OPERATOR_PATH = "airflow.providers.standard.operators.bash"
PYTHON_OPERATOR_PATH = "airflow.providers.standard.operators.python"
@@ -185,6 +189,7 @@ def test_get_airflow_dag_run_facet():
dagrun_mock.end_date = datetime.datetime(2024, 6, 1, 1, 2, 14, 34172, tzinfo=datetime.timezone.utc)
dagrun_mock.triggering_user_name = "user1"
dagrun_mock.triggered_by = "something"
+ dagrun_mock.note = "note"
dagrun_mock.dag_versions = [
MagicMock(
bundle_name="bundle_name",
@@ -209,6 +214,9 @@ def test_get_airflow_dag_run_facet():
}
if hasattr(dag, "schedule_interval"): # Airflow 2 compat.
expected_dag_info["schedule_interval"] = "@once"
+ note: str | None = None
+ if AIRFLOW_V_3_2_PLUS:
+ note = "note"
assert result == {
"airflowDagRun": AirflowDagRunFacet(
dag=expected_dag_info,
@@ -233,6 +241,7 @@ def test_get_airflow_dag_run_facet():
"dag_version_number": "version_number",
"triggering_user_name": "user1",
"triggered_by": "something",
+ "note": note,
},
)
}
@@ -2665,6 +2674,10 @@ def test_dagrun_info_af3(mocked_dag_versions):
dv2.bundle_name = "bundle_name"
dv2.bundle_version = "bundle_version"
+ optional_args = {}
+ if AIRFLOW_V_3_2_PLUS:
+ optional_args["note"] = "note"
+
mocked_dag_versions.return_value = [dv1, dv2]
dagrun = DagRun(
dag_id="dag_id",
@@ -2681,6 +2694,7 @@ def test_dagrun_info_af3(mocked_dag_versions):
triggered_by=DagRunTriggeredByType.UI,
backfill_id=999,
bundle_version="bundle_version",
+ **optional_args,
)
assert dagrun.dag_versions == [dv1, dv2]
dagrun.end_date = date + datetime.timedelta(seconds=74, microseconds=546)
@@ -2706,6 +2720,7 @@ def test_dagrun_info_af3(mocked_dag_versions):
"dag_version_number": "version_number",
"triggered_by": DagRunTriggeredByType.UI,
"triggering_user_name": "my_user",
+ "note": optional_args.get("note"),
}
@@ -2748,6 +2763,7 @@ def test_dagrun_info_af2():
"dag_bundle_version": None,
"dag_version_id": None,
"dag_version_number": None,
+ "note": None,
}
@@ -2852,6 +2868,7 @@ def test_task_info_af3():
self.tol = "tol" # SQLValueCheckOperator
self.trigger_dag_id = "trigger_dag_id" # TriggerDagRunOperator
self.trigger_run_id = "trigger_run_id" # TriggerDagRunOperator
+ self.note = "note" # TriggerDagRunOperator
self.hitl_summary = "hitl_summary" # HITLOperator
super().__init__(*args, **kwargs)
@@ -2899,6 +2916,7 @@ def test_task_info_af3():
"max_active_tis_per_dagrun": None,
"max_retry_delay": None,
"multiple_outputs": False,
+ "note": "note",
"operator_class": "CustomOperator",
"operator_class_path": get_fully_qualified_class_name(task_10),
"operator_provider_version": None, # Custom operator doesn't have provider version
@@ -2979,6 +2997,7 @@ def test_task_info_af2():
self.tol = "tol" # SQLValueCheckOperator
self.trigger_dag_id = "trigger_dag_id" # TriggerDagRunOperator
self.trigger_run_id = "trigger_run_id" # TriggerDagRunOperator
+ self.note = "note" # TriggerDagRunOperator
self.hitl_summary = "hitl_summary" # HITLOperator
super().__init__(*args, **kwargs)
@@ -3063,6 +3082,7 @@ def test_task_info_af2():
"max_threshold": "max_threshold",
"metrics_thresholds": "metrics_thresholds",
"min_threshold": "min_threshold",
+ "note": "note",
"parameters": "parameters",
"pass_value": "pass_value",
"postoperator": "postoperator",