This is an automated email from the ASF dual-hosted git repository.

amoghdesai pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 159f9ce5df2 Route providers to consume Stats from common compat provider (#61812)
159f9ce5df2 is described below

commit 159f9ce5df2f7245fb353a1cf2369e4639309292
Author: Amogh Desai <[email protected]>
AuthorDate: Wed Feb 18 13:14:04 2026 +0530

    Route providers to consume Stats from common compat provider (#61812)
---
 .../compat/src/airflow/providers/common/compat/sdk.py   |  2 +-
 providers/edge3/pyproject.toml                          |  2 +-
 .../src/airflow/providers/edge3/models/edge_worker.py   | 11 +++++++----
 .../airflow/providers/edge3/worker_api/routes/jobs.py   | 17 +++++++++--------
 .../airflow/providers/edge3/worker_api/routes/worker.py | 15 ++++++++-------
 providers/openlineage/pyproject.toml                    |  2 +-
 .../airflow/providers/openlineage/plugins/adapter.py    | 11 +++++++----
 task-sdk/src/airflow/sdk/observability/stats.py         |  3 ++-
 8 files changed, 36 insertions(+), 27 deletions(-)

diff --git a/providers/common/compat/src/airflow/providers/common/compat/sdk.py b/providers/common/compat/src/airflow/providers/common/compat/sdk.py
index 0cfa54dbd35..e444085e0d8 100644
--- a/providers/common/compat/src/airflow/providers/common/compat/sdk.py
+++ b/providers/common/compat/src/airflow/providers/common/compat/sdk.py
@@ -272,7 +272,7 @@ _IMPORT_MAP: dict[str, str | tuple[str, ...]] = {
     # ============================================================================
     # Observability
     # ============================================================================
-    "Stats": ("airflow.sdk._shared.observability.metrics.stats", "airflow.stats"),
+    "Stats": ("airflow.sdk.observability.stats", "airflow.observability.stats", "airflow.stats"),
     # ============================================================================
     # Secrets Masking
     # ============================================================================
diff --git a/providers/edge3/pyproject.toml b/providers/edge3/pyproject.toml
index 1b1eb983d25..4a5c2c17f8f 100644
--- a/providers/edge3/pyproject.toml
+++ b/providers/edge3/pyproject.toml
@@ -69,7 +69,7 @@ requires-python = ">=3.10"
 # After you modify the dependencies, and rebuild your Breeze CI image with ``breeze ci-image build``
 dependencies = [
     "apache-airflow>=3.0.0,!=3.1.0",
-    "apache-airflow-providers-common-compat>=1.13.0",
+    "apache-airflow-providers-common-compat>=1.13.0",  # use next version
     "pydantic>=2.11.0",
     "retryhttp>=1.4.0",
     "aiofiles>=23.2.0",
diff --git a/providers/edge3/src/airflow/providers/edge3/models/edge_worker.py b/providers/edge3/src/airflow/providers/edge3/models/edge_worker.py
index 3a4779420e7..2e739826793 100644
--- a/providers/edge3/src/airflow/providers/edge3/models/edge_worker.py
+++ b/providers/edge3/src/airflow/providers/edge3/models/edge_worker.py
@@ -28,6 +28,11 @@ from sqlalchemy.orm import Mapped
 
 from airflow.models.base import Base
 from airflow.providers.common.compat.sdk import AirflowException, Stats, timezone
+
+try:
+    from airflow.sdk.observability.stats import DualStatsManager
+except ImportError:
+    DualStatsManager = None  # type: ignore[assignment,misc]  # Airflow < 3.2 compat
 from airflow.providers.common.compat.sqlalchemy.orm import mapped_column
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.providers_configuration_loader import providers_configuration_loaded
@@ -173,9 +178,7 @@ def set_metrics(
         EdgeWorkerState.OFFLINE_MAINTENANCE,
     )
 
-    try:
-        from airflow.sdk._shared.observability.metrics.dual_stats_manager import DualStatsManager
-
+    if DualStatsManager is not None:
         DualStatsManager.gauge(
             "edge_worker.connected",
             int(connected),
@@ -217,7 +220,7 @@ def set_metrics(
             tags={},
             extra_tags={"worker_name": worker_name, "queues": ",".join(queues)},
         )
-    except ImportError:
+    else:
         Stats.gauge(f"edge_worker.connected.{worker_name}", int(connected))
         Stats.gauge("edge_worker.connected", int(connected), tags={"worker_name": worker_name})
 
diff --git a/providers/edge3/src/airflow/providers/edge3/worker_api/routes/jobs.py b/providers/edge3/src/airflow/providers/edge3/worker_api/routes/jobs.py
index 4263d61a335..1191d068014 100644
--- a/providers/edge3/src/airflow/providers/edge3/worker_api/routes/jobs.py
+++ b/providers/edge3/src/airflow/providers/edge3/worker_api/routes/jobs.py
@@ -27,6 +27,11 @@ from airflow.api_fastapi.common.router import AirflowRouter
 from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
 from airflow.executors.workloads import ExecuteTask
 from airflow.providers.common.compat.sdk import Stats, timezone
+
+try:
+    from airflow.sdk.observability.stats import DualStatsManager
+except ImportError:
+    DualStatsManager = None  # type: ignore[assignment,misc]  # Airflow < 3.2 compat
 from airflow.providers.edge3.models.edge_job import EdgeJobModel
 from airflow.providers.edge3.worker_api.auth import jwt_token_authorization_rest
 from airflow.providers.edge3.worker_api.datamodels import (
@@ -86,11 +91,9 @@ def fetch(
     session.commit()
     # Edge worker does not backport emitted Airflow metrics, so export some metrics
     tags = {"dag_id": job.dag_id, "task_id": job.task_id, "queue": job.queue}
-    try:
-        from airflow.sdk._shared.observability.metrics.dual_stats_manager import DualStatsManager
-
+    if DualStatsManager is not None:
         DualStatsManager.incr("edge_worker.ti.start", tags=tags)
-    except ImportError:
+    else:
         Stats.incr(f"edge_worker.ti.start.{job.queue}.{job.dag_id}.{job.task_id}", tags=tags)
         Stats.incr("edge_worker.ti.start", tags=tags)
     return EdgeJobFetched(
@@ -145,14 +148,12 @@ def state(
                 "queue": job.queue,
                 "state": str(state),
             }
-            try:
-                from airflow.sdk._shared.observability.metrics.dual_stats_manager import DualStatsManager
-
+            if DualStatsManager is not None:
                 DualStatsManager.incr(
                     "edge_worker.ti.finish",
                     tags=tags,
                 )
-            except ImportError:
+            else:
                 Stats.incr(
                     f"edge_worker.ti.finish.{job.queue}.{state}.{job.dag_id}.{job.task_id}",
                     tags=tags,
diff --git a/providers/edge3/src/airflow/providers/edge3/worker_api/routes/worker.py b/providers/edge3/src/airflow/providers/edge3/worker_api/routes/worker.py
index 473c6aa4ea0..8e3dce56e15 100644
--- a/providers/edge3/src/airflow/providers/edge3/worker_api/routes/worker.py
+++ b/providers/edge3/src/airflow/providers/edge3/worker_api/routes/worker.py
@@ -26,7 +26,12 @@ from sqlalchemy import select
 from airflow.api_fastapi.common.db.common import SessionDep  # noqa: TC001
 from airflow.api_fastapi.common.router import AirflowRouter
 from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
-from airflow.providers.common.compat.sdk import timezone
+from airflow.providers.common.compat.sdk import Stats, timezone
+
+try:
+    from airflow.sdk.observability.stats import DualStatsManager
+except ImportError:
+    DualStatsManager = None  # type: ignore[assignment,misc]  # Airflow < 3.2 compat
 from airflow.providers.edge3.models.edge_worker import EdgeWorkerModel, EdgeWorkerState, set_metrics
 from airflow.providers.edge3.worker_api.auth import jwt_token_authorization_rest
 from airflow.providers.edge3.worker_api.datamodels import (
@@ -212,9 +217,7 @@ def set_state(
     worker.sysinfo = json.dumps(body.sysinfo)
     worker.last_update = timezone.utcnow()
     session.commit()
-    try:
-        from airflow.sdk._shared.observability.metrics.dual_stats_manager import DualStatsManager
-
+    if DualStatsManager is not None:
         DualStatsManager.incr(
             "edge_worker.heartbeat_count",
             1,
@@ -222,9 +225,7 @@ def set_state(
             tags={},
             extra_tags={"worker_name": worker_name},
         )
-    except ImportError:
-        from airflow.providers.common.compat.sdk import Stats
-
+    else:
         Stats.incr(f"edge_worker.heartbeat_count.{worker_name}", 1, 1)
         Stats.incr("edge_worker.heartbeat_count", 1, 1, tags={"worker_name": worker_name})
     set_metrics(
diff --git a/providers/openlineage/pyproject.toml b/providers/openlineage/pyproject.toml
index 73d2ebf0e22..a241ecb2be9 100644
--- a/providers/openlineage/pyproject.toml
+++ b/providers/openlineage/pyproject.toml
@@ -60,7 +60,7 @@ requires-python = ">=3.10"
 dependencies = [
     "apache-airflow>=2.11.0",
     "apache-airflow-providers-common-sql>=1.20.0",
-    "apache-airflow-providers-common-compat>=1.13.1",
+    "apache-airflow-providers-common-compat>=1.13.1",  # use next version
     "attrs>=22.2",
     "openlineage-integration-common>=1.41.0",
     "openlineage-python>=1.41.0",
diff --git a/providers/openlineage/src/airflow/providers/openlineage/plugins/adapter.py b/providers/openlineage/src/airflow/providers/openlineage/plugins/adapter.py
index 7e21bcda6a9..076730e9bf6 100644
--- a/providers/openlineage/src/airflow/providers/openlineage/plugins/adapter.py
+++ b/providers/openlineage/src/airflow/providers/openlineage/plugins/adapter.py
@@ -36,6 +36,11 @@ from openlineage.client.facet_v2 import (
 )
 
 from airflow.providers.common.compat.sdk import Stats, conf as airflow_conf
+
+try:
+    from airflow.sdk.observability.stats import DualStatsManager
+except ImportError:
+    DualStatsManager = None  # type: ignore[assignment,misc]  # Airflow < 3.2 compat
 from airflow.providers.openlineage import __version__ as OPENLINEAGE_PROVIDER_VERSION, conf
 from airflow.providers.openlineage.utils.utils import (
     OpenLineageRedactor,
@@ -158,16 +163,14 @@ class OpenLineageAdapter(LoggingMixin):
 
         try:
             with ExitStack() as stack:
-                try:
-                    from airflow.sdk._shared.observability.metrics.dual_stats_manager import DualStatsManager
-
+                if DualStatsManager is not None:
                     stack.enter_context(
                         DualStatsManager.timer(
                             "ol.emit.attempts",
                             extra_tags={"event_type": event_type, "transport_type": transport_type},
                         )
                     )
-                except ImportError:
+                else:
                     stack.enter_context(Stats.timer(f"ol.emit.attempts.{event_type}.{transport_type}"))
                     stack.enter_context(Stats.timer("ol.emit.attempts"))
 
diff --git a/task-sdk/src/airflow/sdk/observability/stats.py b/task-sdk/src/airflow/sdk/observability/stats.py
index 40452fbcaaa..072342262d5 100644
--- a/task-sdk/src/airflow/sdk/observability/stats.py
+++ b/task-sdk/src/airflow/sdk/observability/stats.py
@@ -19,6 +19,7 @@
 
 from __future__ import annotations
 
+from airflow.sdk._shared.observability.metrics.dual_stats_manager import DualStatsManager
 from airflow.sdk._shared.observability.metrics.stats import Stats, normalize_name_for_stats
 
-__all__ = ["Stats", "normalize_name_for_stats"]
+__all__ = ["Stats", "normalize_name_for_stats", "DualStatsManager"]

Reply via email to