This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new c34a933b097 feat: Add DagRun note to OL events (#62221)
c34a933b097 is described below

commit c34a933b09780159fc6a5824b0346e4c54fe3a7e
Author: Kacper Muda <[email protected]>
AuthorDate: Fri Feb 20 17:52:40 2026 +0100

    feat: Add DagRun note to OL events (#62221)
---
 .../src/airflow/providers/openlineage/utils/utils.py |  2 ++
 .../tests/unit/openlineage/utils/test_utils.py       | 20 +++++++++++++++++++-
 2 files changed, 21 insertions(+), 1 deletion(-)

diff --git 
a/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py 
b/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py
index ed8667ffab0..f7b32452d48 100644
--- a/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py
+++ b/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py
@@ -740,6 +740,7 @@ class DagRunInfo(InfoJsonEncodable):
         "execution_date",  # Airflow 2
         "external_trigger",  # Removed in Airflow 3, use run_type instead
         "logical_date",  # Airflow 3
+        "note",  # Airflow 3.2+
         "run_after",  # Airflow 3
         "run_id",
         "run_type",
@@ -837,6 +838,7 @@ class TaskInfo(InfoJsonEncodable):
         # Operator-specific useful attributes
         "trigger_dag_id",  # TriggerDagRunOperator
         "trigger_run_id",  # TriggerDagRunOperator
+        "note",  # TriggerDagRunOperator
         "external_dag_id",  # ExternalTaskSensor and ExternalTaskMarker (if 
run, as it's EmptyOperator)
         "external_task_id",  # ExternalTaskSensor and ExternalTaskMarker (if 
run, as it's EmptyOperator)
         "external_task_ids",  # ExternalTaskSensor
diff --git a/providers/openlineage/tests/unit/openlineage/utils/test_utils.py 
b/providers/openlineage/tests/unit/openlineage/utils/test_utils.py
index 4e27cdcf486..6a4b5c29482 100644
--- a/providers/openlineage/tests/unit/openlineage/utils/test_utils.py
+++ b/providers/openlineage/tests/unit/openlineage/utils/test_utils.py
@@ -80,7 +80,11 @@ from airflow.utils.types import DagRunType
 from tests_common.test_utils.compat import BashOperator, 
OperatorSerialization, PythonOperator
 from tests_common.test_utils.mock_operators import MockOperator
 from tests_common.test_utils.taskinstance import create_task_instance
-from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_3_PLUS, 
AIRFLOW_V_3_0_PLUS
+from tests_common.test_utils.version_compat import (
+    AIRFLOW_V_3_0_3_PLUS,
+    AIRFLOW_V_3_0_PLUS,
+    AIRFLOW_V_3_2_PLUS,
+)
 
 BASH_OPERATOR_PATH = "airflow.providers.standard.operators.bash"
 PYTHON_OPERATOR_PATH = "airflow.providers.standard.operators.python"
@@ -185,6 +189,7 @@ def test_get_airflow_dag_run_facet():
     dagrun_mock.end_date = datetime.datetime(2024, 6, 1, 1, 2, 14, 34172, 
tzinfo=datetime.timezone.utc)
     dagrun_mock.triggering_user_name = "user1"
     dagrun_mock.triggered_by = "something"
+    dagrun_mock.note = "note"
     dagrun_mock.dag_versions = [
         MagicMock(
             bundle_name="bundle_name",
@@ -226,6 +231,7 @@ def test_get_airflow_dag_run_facet():
                 "duration": 10.034172,
                 "execution_date": "2024-06-01T01:02:04+00:00",
                 "logical_date": "2024-06-01T01:02:04+00:00",
+                "note": "note",
                 "run_after": "2024-06-01T01:02:04+00:00",
                 "dag_bundle_name": "bundle_name",
                 "dag_bundle_version": "bundle_version",
@@ -2665,6 +2671,10 @@ def test_dagrun_info_af3(mocked_dag_versions):
     dv2.bundle_name = "bundle_name"
     dv2.bundle_version = "bundle_version"
 
+    optional_args = {}
+    if AIRFLOW_V_3_2_PLUS:
+        optional_args["note"] = "note"
+
     mocked_dag_versions.return_value = [dv1, dv2]
     dagrun = DagRun(
         dag_id="dag_id",
@@ -2681,6 +2691,7 @@ def test_dagrun_info_af3(mocked_dag_versions):
         triggered_by=DagRunTriggeredByType.UI,
         backfill_id=999,
         bundle_version="bundle_version",
+        **optional_args,
     )
     assert dagrun.dag_versions == [dv1, dv2]
     dagrun.end_date = date + datetime.timedelta(seconds=74, microseconds=546)
@@ -2706,6 +2717,7 @@ def test_dagrun_info_af3(mocked_dag_versions):
         "dag_version_number": "version_number",
         "triggered_by": DagRunTriggeredByType.UI,
         "triggering_user_name": "my_user",
+        "note": optional_args.get("note"),
     }
 
 
@@ -2728,6 +2740,7 @@ def test_dagrun_info_af2():
     )
     dagrun.start_date = date
     dagrun.end_date = date + datetime.timedelta(seconds=74, microseconds=546)
+    dagrun.note = "note"
 
     result = DagRunInfo(dagrun)
     assert dict(result) == {
@@ -2748,6 +2761,7 @@ def test_dagrun_info_af2():
         "dag_bundle_version": None,
         "dag_version_id": None,
         "dag_version_number": None,
+        "note": "note",
     }
 
 
@@ -2852,6 +2866,7 @@ def test_task_info_af3():
             self.tol = "tol"  # SQLValueCheckOperator
             self.trigger_dag_id = "trigger_dag_id"  # TriggerDagRunOperator
             self.trigger_run_id = "trigger_run_id"  # TriggerDagRunOperator
+            self.note = "note"  # TriggerDagRunOperator
             self.hitl_summary = "hitl_summary"  # HITLOperator
             super().__init__(*args, **kwargs)
 
@@ -2899,6 +2914,7 @@ def test_task_info_af3():
         "max_active_tis_per_dagrun": None,
         "max_retry_delay": None,
         "multiple_outputs": False,
+        "note": "note",
         "operator_class": "CustomOperator",
         "operator_class_path": get_fully_qualified_class_name(task_10),
         "operator_provider_version": None,  # Custom operator doesn't have 
provider version
@@ -2979,6 +2995,7 @@ def test_task_info_af2():
             self.tol = "tol"  # SQLValueCheckOperator
             self.trigger_dag_id = "trigger_dag_id"  # TriggerDagRunOperator
             self.trigger_run_id = "trigger_run_id"  # TriggerDagRunOperator
+            self.note = "note"  # TriggerDagRunOperator
             self.hitl_summary = "hitl_summary"  # HITLOperator
             super().__init__(*args, **kwargs)
 
@@ -3029,6 +3046,7 @@ def test_task_info_af2():
         "max_active_tis_per_dagrun": None,
         "max_retry_delay": None,
         "multiple_outputs": False,
+        "note": "note",
         "operator_class": "CustomOperator",
         "operator_class_path": get_fully_qualified_class_name(task_10),
         "operator_provider_version": None,  # Custom operator doesn't have 
provider version

Reply via email to