This is an automated email from the ASF dual-hosted git repository.

vatsrahul1001 pushed a commit to branch fix-callback-otel-unhashable-dict
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 6097e9dd405fef163eae4d1717e544f24b5a5d50
Author: vatsrahul1001 <[email protected]>
AuthorDate: Tue May 26 10:55:51 2026 +0530

    Fix Callback.handle_event crash on OTel metrics with dict tag values
    
    The triggerer crashes on the next deadline async callback when OpenTelemetry
    metrics are enabled:
    
        File ".../airflow/jobs/triggerer_job_runner.py", line 659, in handle_events
            Trigger.submit_event(...)
        File ".../airflow/models/callback.py", line 234, in handle_event
            Stats.incr(**self.get_metric_info(status, self.output))
        File ".../airflow/_shared/observability/metrics/otel_logger.py", line 211, in incr
            counter.add(count, attributes=tags)
        File ".../opentelemetry/sdk/metrics/.../_view_instrument_match.py", line 105
            aggr_key = frozenset(attributes.items())
        TypeError: unhashable type: 'dict'
    
    `Callback.get_metric_info` builds the metric tags dict directly from the
    callback's `result` and `self.data` (which includes `kwargs`). Both are
    frequently dicts: for deadline async callbacks, `result` is the user
    callback's return value and `kwargs` holds the captured callback kwargs.
    When the metrics backend is OTel, the SDK builds its aggregation key as
    `frozenset(attributes.items())`, which raises `TypeError` if any value is
    unhashable (dict, list, set). The triggerer then crashes and its triggers
    stall.
    
    The bug depends on the metrics backend: statsd accepts non-primitive tag
    values without complaint, so OSS users on the statsd backend never see it.
    OTel backends (used in production by Astronomer Astro Cloud and by any OSS
    deployment that enables `[metrics] otel_*`) hit it consistently.
    
    Reproduces on both 3.2.1 and main, so it is not a 3.2.x regression.
    
    Sanitize tag values to primitives before returning from `get_metric_info`:
    keep `str | int | float | bool | None` values as-is and JSON-stringify
    anything else. `json.dumps` is called with `default=str` so that values it
    cannot serialize natively (e.g. `datetime`) fall back to their string form
    instead of raising.
    
    Adds a regression test asserting that every tag value is a primitive (and
    therefore hashable) and that `frozenset(tags.items())` does not raise.
    
    Reported by the Astronomer Runtime team while testing images based on 3.2.2rc2.
---
 airflow-core/src/airflow/models/callback.py     | 12 +++++++++++
 airflow-core/tests/unit/models/test_callback.py | 27 ++++++++++++++++++++++++-
 2 files changed, 38 insertions(+), 1 deletion(-)

diff --git a/airflow-core/src/airflow/models/callback.py 
b/airflow-core/src/airflow/models/callback.py
index 15f9662cdc8..d9b58690e30 100644
--- a/airflow-core/src/airflow/models/callback.py
+++ b/airflow-core/src/airflow/models/callback.py
@@ -16,6 +16,7 @@
 # under the License.
 from __future__ import annotations
 
+import json
 from dataclasses import dataclass
 from datetime import datetime
 from enum import Enum
@@ -161,6 +162,17 @@ class Callback(Base, BaseWorkload):
             # Remove the context (if exists) to keep the tags simple
             tags["kwargs"] = {k: v for k, v in tags["kwargs"].items() if k != 
"context"}
 
+        # Metric backends (statsd, OpenTelemetry) require tag values to be 
primitives.
+        # OTel's aggregation key is built via 
``frozenset(attributes.items())``, which
+        # raises ``TypeError: unhashable type: 'dict'`` if a value is a 
dict/list. The
+        # callback's ``result`` (passed in from a user callback) and 
``kwargs`` are both
+        # frequently dicts, so coerce any non-primitive tag value to a JSON 
string before
+        # returning. Using ``default=str`` so values like ``datetime`` fall 
back cleanly.
+        tags = {
+            k: v if isinstance(v, (str, int, float, bool)) or v is None else 
json.dumps(v, default=str)
+            for k, v in tags.items()
+        }
+
         prefix = self.data.get("prefix", "")
         name = f"{prefix}.callback_{status}" if prefix else 
f"callback_{status}"
 
diff --git a/airflow-core/tests/unit/models/test_callback.py 
b/airflow-core/tests/unit/models/test_callback.py
index bb3102decf8..fbfdd233aaa 100644
--- a/airflow-core/tests/unit/models/test_callback.py
+++ b/airflow-core/tests/unit/models/test_callback.py
@@ -109,13 +109,38 @@ class TestCallback:
         metric_info = callback.get_metric_info(CallbackState.SUCCESS, "0")
 
         assert metric_info["stat"] == "deadline_alerts.callback_success"
+        # kwargs is JSON-stringified so non-primitive tag values don't crash 
OTel
+        # (see test_get_metric_info_dict_values_are_stringified for the 
regression).
         assert metric_info["tags"] == {
             "result": "0",
             "path": TEST_ASYNC_CALLBACK.path,
-            "kwargs": {"email": "[email protected]"},
+            "kwargs": '{"email": "[email protected]"}',
             "dag_id": TEST_DAG_ID,
         }
 
+    def test_get_metric_info_dict_values_are_stringified(self):
+        """
+        Regression for ``TypeError: unhashable type: 'dict'`` raised by 
OpenTelemetry's
+        ``_view_instrument_match`` when callback metric tags contain dict/list 
values.
+
+        OTel builds its aggregation key as ``frozenset(attributes.items())``; 
any tag
+        value that isn't hashable (dict, list, set) crashes the triggerer when 
a
+        callback completes — e.g., deadline async callbacks whose ``result`` 
is a dict.
+        """
+        callback = TriggererCallback(TEST_ASYNC_CALLBACK, 
prefix="deadline_alerts", dag_id=TEST_DAG_ID)
+        callback.data["kwargs"] = {"context": {"dag_id": TEST_DAG_ID}, 
"nested": {"a": 1}}
+
+        # ``result`` is a dict — exactly the case that surfaced in the 
deadline DAG.
+        metric_info = callback.get_metric_info(CallbackState.SUCCESS, 
{"output": [1, 2], "code": 0})
+
+        # Every tag value must be a primitive (str/int/float/bool/None) so 
OTel can hash it.
+        for k, v in metric_info["tags"].items():
+            assert isinstance(v, (str, int, float, bool)) or v is None, (
+                f"Tag {k!r}={v!r} is type {type(v).__name__}; must be 
primitive for OTel."
+            )
+        # ``frozenset(attributes.items())`` must not raise.
+        frozenset(metric_info["tags"].items())
+
 
 class TestTriggererCallback:
     def test_polymorphic_serde(self, session):

Reply via email to