This is an automated email from the ASF dual-hosted git repository.

kaxil pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new c68247ecd3f Add `OpenTelemetry` tracing for `common.ai` Pydantic AI agents (#67792)
c68247ecd3f is described below

commit c68247ecd3f3bacb94a643749aa93fa83308b41e
Author: Kaxil Naik <[email protected]>
AuthorDate: Mon Jun 1 11:29:41 2026 +0100

    Add `OpenTelemetry` tracing for `common.ai` Pydantic AI agents (#67792)
    
    Attach Pydantic AI's native OpenTelemetry instrumentation in
    PydanticAIHook.create_agent() so agent, model, and tool spans (with token
    usage) export through Airflow's existing [traces] OTLP exporter and nest
    under the task span. Gated by [common.ai] otel_export_enabled (off by
    default); reuses the global TracerProvider rather than configuring its own.
    Content capture is off by default, gated by [common.ai] capture_content.
    
    
    Automatic retries reuse the task instance's persisted trace context, so all
    attempts share one trace (distinguished by try_number); only a manual clear or
    rerun starts a new trace. The previous wording said each retry was a distinct
    trace, which is only true for a manual clear.
---
 docs/spelling_wordlist.txt                         |   5 +
 providers/common/ai/docs/index.rst                 |   1 +
 providers/common/ai/docs/observability.rst         |  98 +++++++++++++++
 providers/common/ai/provider.yaml                  |  35 ++++++
 .../providers/common/ai/get_provider_info.py       |  16 ++-
 .../providers/common/ai/hooks/pydantic_ai.py       |  19 ++-
 .../airflow/providers/common/ai/observability.py   | 105 ++++++++++++++++
 .../tests/unit/common/ai/hooks/test_pydantic_ai.py |  42 +++++++
 .../ai/tests/unit/common/ai/test_observability.py  | 138 +++++++++++++++++++++
 9 files changed, 457 insertions(+), 2 deletions(-)

diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index f8e7efca67f..504ae7058d8 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -689,6 +689,7 @@ gdbm
 gdrive
 gelf
 gemini
+GenAI
 generateUploadUrl
 geq
 getattr
@@ -866,6 +867,7 @@ iteratively
 iTerm
 iterm
 itertools
+Jaeger
 Jarek
 JavaCoordinator
 javascript
@@ -956,6 +958,7 @@ kylin
 Kyverno
 Lakehouse
 langchain
+Langfuse
 LanguageServiceClient
 lastname
 latencies
@@ -1147,6 +1150,7 @@ openjdk
 openlineage
 OpenSearch
 opensearch
+OpenTelemetry
 oper
 OperatorLineage
 Opsgenie
@@ -1166,6 +1170,7 @@ oss
 ot
 OTel
 otel
+OTLP
 PaaS
 Pagerduty
 pagerduty
diff --git a/providers/common/ai/docs/index.rst b/providers/common/ai/docs/index.rst
index 590a770d481..b8a99c91cf8 100644
--- a/providers/common/ai/docs/index.rst
+++ b/providers/common/ai/docs/index.rst
@@ -41,6 +41,7 @@
     Operators <operators/index>
     Retry Policies <retry_policies>
     HITL Review <hitl_review>
+    Observability <observability>
 
 .. toctree::
     :hidden:
diff --git a/providers/common/ai/docs/observability.rst b/providers/common/ai/docs/observability.rst
new file mode 100644
index 00000000000..6669419c8aa
--- /dev/null
+++ b/providers/common/ai/docs/observability.rst
@@ -0,0 +1,98 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+Observability (OpenTelemetry tracing)
+=====================================
+
+pydantic-ai ships native OpenTelemetry instrumentation that emits GenAI spans
+for each agent run, model call, and tool call, following the
+`OpenTelemetry GenAI semantic conventions <https://opentelemetry.io/docs/specs/semconv/gen-ai/>`__.
+When enabled, this provider turns that instrumentation on for every agent it
+builds and routes the spans through the OpenTelemetry exporter Airflow already
+uses, so they appear in whatever backend your deployment runs (Jaeger, Tempo,
+Grafana, Phoenix, Langfuse, an OTLP collector, ...), correlated to the task that
+produced them.
+
+This covers all of the LLM operators (:class:`~airflow.providers.common.ai.operators.agent.AgentOperator`,
+``@task.agent`` / ``@task.llm`` and the SQL / branch / file-analysis / schema-compare
+operators), because they all build their agent through
+:meth:`~airflow.providers.common.ai.hooks.pydantic_ai.PydanticAIHook.create_agent`.
+
+How it works
+------------
+
+* **No extra infrastructure.** The provider does not configure an exporter or a
+  ``TracerProvider`` of its own. It reuses the global provider that Airflow's
+  core tracing installs, so the spans share the exporter and endpoint already
+  configured under ``[traces]`` / the standard ``OTEL_EXPORTER_OTLP_*``
+  environment variables. If core tracing is not enabled in the worker process,
+  no GenAI spans are emitted.
+* **Correlation is automatic.** The worker opens a task span before the operator
+  runs, so the agent's spans nest under it and inherit the task's ``trace_id``
+  and ``airflow.*`` attributes (dag id, run id, task id, try number, map index).
+  An automatic retry reuses the task instance's persisted trace context, so all
+  attempts share one trace and appear as repeated task-run spans on it,
+  distinguished by ``try number``. Only a manual clear or rerun regenerates the
+  context and starts a new trace.
+* **Content is off by default.** Only token counts, model id, latency, tool
+  names, and finish reason are recorded. Prompt and completion text is never
+  emitted unless you opt in (see below).
+
+Enabling it
+-----------
+
+Enable core tracing and turn on the provider option:
+
+.. code-block:: ini
+
+    [traces]
+    otel_on = True
+
+    [common.ai]
+    otel_export_enabled = True
+
+Configure the exporter destination with the standard OpenTelemetry environment
+variables, for example:
+
+.. code-block:: bash
+
+    # Core tracing defaults the exporter to OTLP/gRPC. For an OTLP/HTTP
+    # endpoint (port 4318, ``/v1/traces`` path) also select the HTTP exporter:
+    export OTEL_TRACES_EXPORTER="otlp_proto_http"
+    export OTEL_EXPORTER_OTLP_TRACES_ENDPOINT="http://otel-collector:4318/v1/traces"
+
+Capturing prompt and completion content
+---------------------------------------
+
+By default the spans carry no message text. To also record model inputs and
+outputs (``gen_ai.input.messages`` / ``gen_ai.output.messages``), set:
+
+.. code-block:: ini
+
+    [common.ai]
+    capture_content = True
+
+.. warning::
+
+    With ``capture_content`` enabled, prompts, completions, and tool IO are
+    exported to your tracing backend **without redaction**. Airflow's secret masking
+    applies to logs and rendered template fields, not to OpenTelemetry span
+    attributes, so it does not scrub this content. Enable it only for debugging
+    in a trusted environment. It has no effect unless ``otel_export_enabled`` is
+    ``True``.
+
+See :doc:`configurations-ref` for the full list of options.
diff --git a/providers/common/ai/provider.yaml b/providers/common/ai/provider.yaml
index d95ff608857..8a4560ec440 100644
--- a/providers/common/ai/provider.yaml
+++ b/providers/common/ai/provider.yaml
@@ -100,6 +100,41 @@ config:
         type: string
         example: "file:///tmp/airflow_durable_cache"
         default: ""
+      otel_export_enabled:
+        description: |
+          Attach pydantic-ai OpenTelemetry instrumentation to agents created by
+          this provider and emit GenAI spans (agent run, model call, tool call,
+          token usage) for ``AgentOperator`` / ``@task.agent`` / ``@task.llm``
+          and the other LLM operators.
+
+          Spans are emitted through Airflow's existing OpenTelemetry exporter,
+          configured under ``[traces]`` / the standard ``OTEL_EXPORTER_OTLP_*``
+          environment variables, and nest under the task span so they are
+          attributable to the originating DAG run and task instance. The
+          provider does not configure an exporter of its own: if core tracing
+          (``[traces] otel_on``) is not enabled in the worker process, no spans
+          are emitted. Off by default so installing the provider never starts
+          shipping spans without opt-in.
+        version_added: 0.4.0
+        type: boolean
+        example: "True"
+        default: "False"
+      capture_content:
+        description: |
+          Capture prompt, completion, and tool-call content on the emitted
+          GenAI spans (``gen_ai.input.messages`` / ``gen_ai.output.messages``).
+
+          Off by default: only token counts, model id, latency, tool names, and
+          finish reason are recorded, never message text. Turning this on exports
+          model inputs and outputs to your tracing backend without redaction. Airflow's
+          secret masking applies to logs and rendered template fields, not to
+          OpenTelemetry span attributes, so it does not scrub this content.
+          Enable it only for debugging in a trusted environment. Has no effect
+          unless ``otel_export_enabled`` is ``True``.
+        version_added: 0.4.0
+        type: boolean
+        example: "False"
+        default: "False"
 
 
 connection-types:
diff --git a/providers/common/ai/src/airflow/providers/common/ai/get_provider_info.py b/providers/common/ai/src/airflow/providers/common/ai/get_provider_info.py
index a3642e4895b..8bc03c266cb 100644
--- a/providers/common/ai/src/airflow/providers/common/ai/get_provider_info.py
+++ b/providers/common/ai/src/airflow/providers/common/ai/get_provider_info.py
@@ -97,7 +97,21 @@ def get_provider_info():
                         "type": "string",
                         "example": "file:///tmp/airflow_durable_cache",
                         "default": "",
-                    }
+                    },
+                    "otel_export_enabled": {
+                        "description": "Attach pydantic-ai OpenTelemetry instrumentation to agents created by\nthis provider and emit GenAI spans (agent run, model call, tool call,\ntoken usage) for ``AgentOperator`` / ``@task.agent`` / ``@task.llm``\nand the other LLM operators.\n\nSpans are emitted through Airflow's existing OpenTelemetry exporter,\nconfigured under ``[traces]`` / the standard ``OTEL_EXPORTER_OTLP_*``\nenvironment variables, and nest under the task span so they are\nat [...]
+                        "version_added": "0.4.0",
+                        "type": "boolean",
+                        "example": "True",
+                        "default": "False",
+                    },
+                    "capture_content": {
+                        "description": "Capture prompt, completion, and tool-call content on the emitted\nGenAI spans (``gen_ai.input.messages`` / ``gen_ai.output.messages``).\n\nOff by default: only token counts, model id, latency, tool names, and\nfinish reason are recorded, never message text. Turning this on exports\nmodel inputs and outputs to your tracing backend without redaction. Airflow's\nsecret masking applies to logs and rendered template fields, not to\nOpenTelemetry span at [...]
+                        "version_added": "0.4.0",
+                        "type": "boolean",
+                        "example": "False",
+                        "default": "False",
+                    },
                 },
             }
         },
diff --git a/providers/common/ai/src/airflow/providers/common/ai/hooks/pydantic_ai.py b/providers/common/ai/src/airflow/providers/common/ai/hooks/pydantic_ai.py
index eb4db2d1ec5..44e2436576f 100644
--- a/providers/common/ai/src/airflow/providers/common/ai/hooks/pydantic_ai.py
+++ b/providers/common/ai/src/airflow/providers/common/ai/hooks/pydantic_ai.py
@@ -22,6 +22,7 @@ from pydantic_ai import Agent
 from pydantic_ai.models import infer_model
 from pydantic_ai.providers import infer_provider, infer_provider_class
 
+from airflow.providers.common.ai.observability import genai_instrumentation_settings
 from airflow.providers.common.compat.sdk import BaseHook
 
 OutputT = TypeVar("OutputT")
@@ -185,11 +186,27 @@ class PydanticAIHook(BaseHook):
         """
         Create a pydantic-ai Agent configured with this hook's model.
 
+        When ``[common.ai] otel_export_enabled`` is set and the worker has an
+        OpenTelemetry exporter configured, the agent is instrumented to emit
+        GenAI spans through Airflow's tracing pipeline. See
+        :mod:`airflow.providers.common.ai.observability`.
+
         :param output_type: The expected output type from the agent (default: ``str``).
         :param instructions: System-level instructions for the agent.
         :param agent_kwargs: Additional keyword arguments passed to the Agent constructor.
         """
-        return Agent(self.get_conn(), output_type=output_type, instructions=instructions, **agent_kwargs)
+        agent = Agent(self.get_conn(), output_type=output_type, instructions=instructions, **agent_kwargs)
+        if "instrument" not in agent_kwargs:
+            # Set the public ``agent.instrument`` surface rather than the
+            # ``Agent(instrument=...)`` constructor kwarg, which is deprecated in
+            # current pydantic-ai. Assigning ``agent.instrument`` works across the
+            # provider's ``pydantic-ai-slim>=1.71`` floor (a plain instance
+            # attribute on older versions, a property on newer ones). A caller
+            # that passed its own ``instrument`` wins.
+            settings = genai_instrumentation_settings()
+            if settings is not None:
+                agent.instrument = settings
+        return agent
 
     def test_connection(self) -> tuple[bool, str]:
         """
diff --git a/providers/common/ai/src/airflow/providers/common/ai/observability.py b/providers/common/ai/src/airflow/providers/common/ai/observability.py
new file mode 100644
index 00000000000..e81a6680d04
--- /dev/null
+++ b/providers/common/ai/src/airflow/providers/common/ai/observability.py
@@ -0,0 +1,105 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+OpenTelemetry tracing for pydantic-ai agents created by this provider.
+
+pydantic-ai ships native OpenTelemetry instrumentation that emits GenAI spans
+(agent run, model call, tool call) following the OTel GenAI semantic
+conventions. This module turns it on and points it at Airflow's existing
+OpenTelemetry exporter so agent spans flow to whatever OTLP backend the
+deployment already runs, nested under the worker's task span.
+
+It deliberately does not configure an exporter or a ``TracerProvider`` of its
+own: it reuses the global SDK provider that core tracing (``[traces] otel_on``)
+installs in the worker process. When that provider is absent (core tracing off,
+or not configured in this process) no spans are emitted.
+"""
+
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, Literal
+
+from airflow.providers.common.compat.sdk import conf
+
+if TYPE_CHECKING:
+    from pydantic_ai.models.instrumented import InstrumentationSettings
+
+SECTION = "common.ai"
+
+# OTel GenAI semantic-convention attribute set. Pinned so the emitted span and
+# ``gen_ai.*`` attribute names stay stable regardless of the pydantic-ai default
+# (which tracks the latest, still-evolving revision).
+_SEMCONV_VERSION: Literal[4] = 4
+
+
+def _otel_export_enabled() -> bool:
+    return conf.getboolean(SECTION, "otel_export_enabled", fallback=False)
+
+
+def _capture_content() -> bool:
+    return conf.getboolean(SECTION, "capture_content", fallback=False)
+
+
+def _live_tracer_provider():
+    """
+    Return the worker's configured SDK ``TracerProvider``, or ``None``.
+
+    Core tracing installs an ``opentelemetry.sdk`` ``TracerProvider`` via
+    ``trace.set_tracer_provider()``. Until then ``get_tracer_provider()``
+    returns the API's no-op proxy. Reusing the SDK provider is what makes the
+    GenAI spans share the core OTLP exporter and nest under the task span; we
+    never install our own. Returns ``None`` when the OpenTelemetry SDK is not
+    installed or no real provider is configured in this process.
+    """
+    try:
+        from opentelemetry import trace
+        from opentelemetry.sdk.trace import TracerProvider
+    except ImportError:
+        return None
+
+    provider = trace.get_tracer_provider()
+    return provider if isinstance(provider, TracerProvider) else None
+
+
+def genai_instrumentation_settings() -> InstrumentationSettings | None:
+    """
+    Build pydantic-ai ``InstrumentationSettings`` for an agent run.
+
+    Returns ``None`` (leave the agent un-instrumented, zero overhead) when
+    export is disabled or no live OTLP ``TracerProvider`` is configured in this
+    worker process. ``include_content`` is off by default so prompts,
+    completions, and tool IO are never emitted unless explicitly opted in via
+    ``[common.ai] capture_content``.
+    """
+    if not _otel_export_enabled():
+        return None
+    provider = _live_tracer_provider()
+    if provider is None:
+        return None
+
+    # Imported here, not at module top: this module is imported by the hook on
+    # every agent build, but the ``instrumented`` submodule is only needed when
+    # tracing is actually on. Keeping it lazy avoids that cost on the common
+    # tracing-off path.
+    from pydantic_ai.models.instrumented import InstrumentationSettings
+
+    return InstrumentationSettings(
+        version=_SEMCONV_VERSION,
+        include_content=_capture_content(),
+        include_binary_content=False,
+        tracer_provider=provider,
+    )
diff --git a/providers/common/ai/tests/unit/common/ai/hooks/test_pydantic_ai.py b/providers/common/ai/tests/unit/common/ai/hooks/test_pydantic_ai.py
index 993824e1c14..fb63b1f3e48 100644
--- a/providers/common/ai/tests/unit/common/ai/hooks/test_pydantic_ai.py
+++ b/providers/common/ai/tests/unit/common/ai/hooks/test_pydantic_ai.py
@@ -22,6 +22,7 @@ from unittest.mock import MagicMock, patch
 
 import pytest
 from pydantic_ai.models import Model
+from pydantic_ai.models.test import TestModel
 
 from airflow.models.connection import Connection
 from airflow.providers.common.ai.hooks.pydantic_ai import (
@@ -243,6 +244,47 @@ class TestPydanticAIHookCreateAgent:
         )
 
 
+class TestPydanticAIHookCreateAgentInstrumentation:
+    """create_agent() wires OpenTelemetry instrumentation from observability."""
+
+    @staticmethod
+    def _hook() -> PydanticAIHook:
+        return PydanticAIHook(llm_conn_id="test_conn", model_id="openai:gpt-5.3")
+
+    @patch("airflow.providers.common.ai.hooks.pydantic_ai.genai_instrumentation_settings")
+    def test_instrument_set_when_settings_returned(self, mock_settings):
+        sentinel = MagicMock(name="InstrumentationSettings")
+        mock_settings.return_value = sentinel
+        hook = self._hook()
+        with patch.object(hook, "get_conn", return_value=TestModel()):
+            agent = hook.create_agent(instructions="hi")
+
+        assert agent.instrument is sentinel
+
+    @patch("airflow.providers.common.ai.hooks.pydantic_ai.genai_instrumentation_settings")
+    def test_no_instrument_when_settings_none(self, mock_settings):
+        mock_settings.return_value = None
+        hook = self._hook()
+        with patch.object(hook, "get_conn", return_value=TestModel()):
+            agent = hook.create_agent(instructions="hi")
+
+        mock_settings.assert_called_once()
+        assert agent.instrument is None
+
+    @patch("airflow.providers.common.ai.hooks.pydantic_ai.Agent", autospec=True)
+    @patch("airflow.providers.common.ai.hooks.pydantic_ai.genai_instrumentation_settings")
+    @patch("airflow.providers.common.ai.hooks.pydantic_ai.infer_model", autospec=True)
+    def test_caller_instrument_short_circuits(self, mock_infer_model, mock_settings, mock_agent_cls):
+        """A caller that passes its own ``instrument`` wins; we don't override it."""
+        mock_infer_model.return_value = MagicMock(spec=Model)
+        hook = self._hook()
+        conn = Connection(conn_id="test_conn", conn_type="pydanticai")
+        with patch.object(hook, "get_connection", return_value=conn):
+            hook.create_agent(instructions="hi", instrument=False)
+
+        mock_settings.assert_not_called()
+
+
 class TestPydanticAIHookTestConnection:
     @patch("airflow.providers.common.ai.hooks.pydantic_ai.infer_model", autospec=True)
     def test_successful_connection(self, mock_infer_model):
diff --git a/providers/common/ai/tests/unit/common/ai/test_observability.py b/providers/common/ai/tests/unit/common/ai/test_observability.py
new file mode 100644
index 00000000000..bfb6f0fbe6a
--- /dev/null
+++ b/providers/common/ai/tests/unit/common/ai/test_observability.py
@@ -0,0 +1,138 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import json
+from unittest.mock import MagicMock, patch
+
+from opentelemetry.sdk.trace import TracerProvider
+from opentelemetry.sdk.trace.export import SimpleSpanProcessor
+from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter
+from pydantic_ai.models.test import TestModel
+
+from airflow.providers.common.ai import observability
+from airflow.providers.common.ai.hooks.pydantic_ai import PydanticAIHook
+
+
+def _conf(*, enabled: bool = False, capture: bool = False) -> MagicMock:
+    """Return a conf double whose getboolean reflects the two flags."""
+    values = {"otel_export_enabled": enabled, "capture_content": capture}
+    m = MagicMock()
+    m.getboolean.side_effect = lambda section, key, fallback=False: values.get(key, fallback)
+    return m
+
+
+class TestGenaiInstrumentationSettings:
+    def test_returns_none_when_export_disabled(self):
+        with patch.object(observability, "conf", _conf(enabled=False)):
+            assert observability.genai_instrumentation_settings() is None
+
+    def test_returns_none_when_no_live_provider(self):
+        with (
+            patch.object(observability, "conf", _conf(enabled=True)),
+            patch.object(observability, "_live_tracer_provider", return_value=None),
+        ):
+            assert observability.genai_instrumentation_settings() is None
+
+    def test_returns_settings_when_enabled_with_provider(self):
+        with (
+            patch.object(observability, "conf", _conf(enabled=True, capture=False)),
+            patch.object(observability, "_live_tracer_provider", return_value=TracerProvider()),
+        ):
+            settings = observability.genai_instrumentation_settings()
+
+        assert settings is not None
+        assert settings.version == observability._SEMCONV_VERSION == 4
+        # Content is never emitted unless explicitly opted in.
+        assert settings.include_content is False
+        assert settings.include_binary_content is False
+        # The provided SDK provider was used to build the tracer.
+        assert settings.tracer is not None
+
+    def test_capture_content_opt_in_enables_content(self):
+        with (
+            patch.object(observability, "conf", _conf(enabled=True, capture=True)),
+            patch.object(observability, "_live_tracer_provider", return_value=TracerProvider()),
+        ):
+            settings = observability.genai_instrumentation_settings()
+
+        assert settings is not None
+        assert settings.include_content is True
+        # Binary content stays off even when text content is captured.
+        assert settings.include_binary_content is False
+
+
+class TestLiveTracerProvider:
+    def test_none_when_provider_is_not_sdk(self):
+        # The API's no-op proxy default (anything that is not an SDK
+        # TracerProvider) must be rejected, so we never emit orphan spans.
+        with patch("opentelemetry.trace.get_tracer_provider", return_value=object()):
+            assert observability._live_tracer_provider() is None
+
+    def test_returns_sdk_provider(self):
+        provider = TracerProvider()
+        with patch("opentelemetry.trace.get_tracer_provider", return_value=provider):
+            assert observability._live_tracer_provider() is provider
+
+
+class TestEndToEndSpanEmission:
+    """Exercise the real instrumentation path: build an agent through the hook,
+    run it, and assert on the genuine spans the OpenTelemetry SDK exports."""
+
+    # A distinctive user-prompt string we can grep for in the exported spans.
+    _PROMPT = "user-secret-needle-42"
+
+    @staticmethod
+    def _run(*, capture: bool):
+        exporter = InMemorySpanExporter()
+        provider = TracerProvider()
+        provider.add_span_processor(SimpleSpanProcessor(exporter))
+
+        hook = PydanticAIHook(llm_conn_id="c", model_id="test")
+        with (
+            patch.object(observability, "conf", _conf(enabled=True, capture=capture)),
+            patch.object(observability, "_live_tracer_provider", return_value=provider),
+            patch.object(hook, "get_conn", return_value=TestModel()),
+        ):
+            agent = hook.create_agent(instructions="be helpful")
+            # Stand in for the worker's task span: open a parent and run inside it.
+            with provider.get_tracer("test").start_as_current_span("worker.task") as parent:
+                parent_trace_id = parent.get_span_context().trace_id
+                agent.run_sync(TestEndToEndSpanEmission._PROMPT)
+
+        spans = exporter.get_finished_spans()
+        genai = [s for s in spans if s.attributes and any(k.startswith("gen_ai.") for k in s.attributes)]
+        attrs_blob = json.dumps([dict(s.attributes or {}) for s in genai])
+        return genai, parent_trace_id, attrs_blob
+
+    def test_spans_emitted_and_nested_without_content_by_default(self):
+        genai, parent_trace_id, attrs_blob = self._run(capture=False)
+
+        assert genai, "expected gen_ai spans to be emitted"
+        # Token usage is captured even with content off.
+        assert any("gen_ai.usage.input_tokens" in (s.attributes or {}) for s in genai)
+        # Parenting is implicit: agent spans share the task span's trace_id.
+        assert all(s.context.trace_id == parent_trace_id for s in genai)
+        # The prompt text must not leak when content capture is off.
+        assert self._PROMPT not in attrs_blob
+
+    def test_capture_content_includes_prompt_text(self):
+        genai, _, attrs_blob = self._run(capture=True)
+
+        assert genai
+        # With the opt-in, the prompt text is present on the spans.
+        assert self._PROMPT in attrs_blob
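
The gating that genai_instrumentation_settings() applies in the diff above can
be summarised as a small decision table. The sketch below is illustrative only
and is not part of the commit: it uses the standard-library configparser as a
stand-in for Airflow's conf object, a plain boolean (live_provider) in place of
the real SDK TracerProvider check, and a plain dict in place of pydantic-ai's
InstrumentationSettings. The names INI and sketch_settings are hypothetical.

```python
from __future__ import annotations

import configparser

# Same keys as the ini snippet in observability.rst; capture_content is left
# unset here, so it should fall back to False.
INI = """
[traces]
otel_on = True

[common.ai]
otel_export_enabled = True
"""


def sketch_settings(ini_text: str, live_provider: bool) -> dict | None:
    """Hypothetical model of the gating order, not the provider's real code."""
    parser = configparser.ConfigParser()
    parser.read_string(ini_text)

    # Gate 1: the provider option is off by default.
    if not parser.getboolean("common.ai", "otel_export_enabled", fallback=False):
        return None
    # Gate 2: no SDK TracerProvider installed by core tracing in this process.
    if not live_provider:
        return None
    # Both gates passed: content capture is still opt-in, binary content stays off.
    return {
        "include_content": parser.getboolean("common.ai", "capture_content", fallback=False),
        "include_binary_content": False,
    }


if __name__ == "__main__":
    print(sketch_settings(INI, live_provider=True))
    print(sketch_settings(INI, live_provider=False))
```

Under these assumptions, enabling otel_export_enabled alone is not enough: the
second gate models the case where core tracing is off in the worker, in which
case the agent is left un-instrumented.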
