This is an automated email from the ASF dual-hosted git repository.

vatsrahul1001 pushed a commit to branch backport-67527-to-v3-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit aa45adb0636ab999a3ee5230ee1731df86a3e961
Author: Rahul Vats <[email protected]>
AuthorDate: Tue May 26 12:09:41 2026 +0530

    Fix Callback.handle_event crash on OTel metrics with dict tag values 
(#67527)
    
    Fix Callback.handle_event crash on OTel metrics with dict tag values 
(#67527)
    
    (cherry picked from commit 597891128e9a9deb82d121f471830914ed5cb049)
---
 airflow-core/src/airflow/models/callback.py     | 14 +++++++++++++
 airflow-core/tests/unit/models/test_callback.py | 28 ++++++++++++++++++++++++-
 2 files changed, 41 insertions(+), 1 deletion(-)

diff --git a/airflow-core/src/airflow/models/callback.py 
b/airflow-core/src/airflow/models/callback.py
index e08d58fa4da..c854b26cc4f 100644
--- a/airflow-core/src/airflow/models/callback.py
+++ b/airflow-core/src/airflow/models/callback.py
@@ -17,6 +17,7 @@
 from __future__ import annotations
 
 import inspect
+import json
 from collections.abc import Callable
 from datetime import datetime
 from enum import Enum
@@ -161,6 +162,19 @@ class Callback(Base, BaseWorkload):
             # Remove the context (if exists) to keep the tags simple
             tags["kwargs"] = {k: v for k, v in tags["kwargs"].items() if k != 
"context"}
 
+        # Metric backends (statsd, OpenTelemetry) require tag values to be 
primitives.
+        # OTel's aggregation key is built via 
``frozenset(attributes.items())``, which
+        # raises ``TypeError: unhashable type: 'dict'`` if a value is a 
dict/list. The
+        # callback's ``result`` (passed in from a user callback) and 
``kwargs`` are both
+        # frequently dicts, so coerce any non-primitive tag value to a JSON 
string before
+        # returning. Using ``default=str`` so values like ``datetime`` fall 
back cleanly.
+        tags = {
+            k: v
+            if isinstance(v, (str, int, float, bool)) or v is None
+            else json.dumps(v, default=str, sort_keys=True)
+            for k, v in tags.items()
+        }
+
         prefix = self.data.get("prefix", "")
         name = f"{prefix}.callback_{status}" if prefix else 
f"callback_{status}"
 
diff --git a/airflow-core/tests/unit/models/test_callback.py 
b/airflow-core/tests/unit/models/test_callback.py
index 5d4940b77ad..dbb8680b5a7 100644
--- a/airflow-core/tests/unit/models/test_callback.py
+++ b/airflow-core/tests/unit/models/test_callback.py
@@ -112,10 +112,36 @@ class TestCallback:
         assert metric_info["tags"] == {
             "result": "0",
             "path": TEST_ASYNC_CALLBACK.path,
-            "kwargs": {"email": "[email protected]"},
+            "kwargs": '{"email": "[email protected]"}',
             "dag_id": TEST_DAG_ID,
         }
 
+    def test_get_metric_info_dict_values_are_stringified(self):
+        """
+        Regression for ``TypeError: unhashable type: 'dict'`` raised by 
OpenTelemetry's
+        ``_view_instrument_match`` when callback metric tags contain dict/list 
values.
+
+        OTel builds its aggregation key as ``frozenset(attributes.items())``; 
any tag
+        value that isn't hashable (dict, list, set) crashes the triggerer when 
a
+        callback completes — e.g., deadline async callbacks whose ``result`` 
is a dict.
+        """
+        callback = TriggererCallback(TEST_ASYNC_CALLBACK, 
prefix="deadline_alerts", dag_id=TEST_DAG_ID)
+        callback.data["kwargs"] = {"context": {"dag_id": TEST_DAG_ID}, 
"nested": {"a": 1}}
+
+        # ``result`` is a dict — exactly the case that surfaced in the 
deadline DAG.
+        metric_info = callback.get_metric_info(CallbackState.SUCCESS, 
{"output": [1, 2], "code": 0})
+
+        # Every tag value must be a primitive (str/int/float/bool/None) so 
OTel can hash it.
+        for k, v in metric_info["tags"].items():
+            assert isinstance(v, (str, int, float, bool)) or v is None, (
+                f"Tag {k!r}={v!r} is type {type(v).__name__}; must be 
primitive for OTel."
+            )
+        # ``frozenset(attributes.items())`` must not raise.
+        frozenset(metric_info["tags"].items())
+        # Stringified tag values must be sorted so equivalent kwargs in 
different
+        # insertion order collapse to one metric series (no needless 
cardinality split).
+        assert metric_info["tags"]["result"] == '{"code": 0, "output": [1, 2]}'
+
 
 class TestTriggererCallback:
     def test_polymorphic_serde(self, session):

Reply via email to