This is an automated email from the ASF dual-hosted git repository.
amoghdesai pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 159f9ce5df2 Route providers to consume Stats from common compat provider (#61812)
159f9ce5df2 is described below
commit 159f9ce5df2f7245fb353a1cf2369e4639309292
Author: Amogh Desai <[email protected]>
AuthorDate: Wed Feb 18 13:14:04 2026 +0530
Route providers to consume Stats from common compat provider (#61812)
---
.../compat/src/airflow/providers/common/compat/sdk.py | 2 +-
providers/edge3/pyproject.toml | 2 +-
.../src/airflow/providers/edge3/models/edge_worker.py | 11 +++++++----
.../airflow/providers/edge3/worker_api/routes/jobs.py | 17 +++++++++--------
.../airflow/providers/edge3/worker_api/routes/worker.py | 15 ++++++++-------
providers/openlineage/pyproject.toml | 2 +-
.../airflow/providers/openlineage/plugins/adapter.py | 11 +++++++----
task-sdk/src/airflow/sdk/observability/stats.py | 3 ++-
8 files changed, 36 insertions(+), 27 deletions(-)
diff --git a/providers/common/compat/src/airflow/providers/common/compat/sdk.py b/providers/common/compat/src/airflow/providers/common/compat/sdk.py
index 0cfa54dbd35..e444085e0d8 100644
--- a/providers/common/compat/src/airflow/providers/common/compat/sdk.py
+++ b/providers/common/compat/src/airflow/providers/common/compat/sdk.py
@@ -272,7 +272,7 @@ _IMPORT_MAP: dict[str, str | tuple[str, ...]] = {
# ============================================================================
# Observability
# ============================================================================
- "Stats": ("airflow.sdk._shared.observability.metrics.stats", "airflow.stats"),
+ "Stats": ("airflow.sdk.observability.stats", "airflow.observability.stats", "airflow.stats"),
# ============================================================================
# Secrets Masking
# ============================================================================
diff --git a/providers/edge3/pyproject.toml b/providers/edge3/pyproject.toml
index 1b1eb983d25..4a5c2c17f8f 100644
--- a/providers/edge3/pyproject.toml
+++ b/providers/edge3/pyproject.toml
@@ -69,7 +69,7 @@ requires-python = ">=3.10"
# After you modify the dependencies, and rebuild your Breeze CI image with ``breeze ci-image build``
dependencies = [
"apache-airflow>=3.0.0,!=3.1.0",
- "apache-airflow-providers-common-compat>=1.13.0",
+ "apache-airflow-providers-common-compat>=1.13.0", # use next version
"pydantic>=2.11.0",
"retryhttp>=1.4.0",
"aiofiles>=23.2.0",
diff --git a/providers/edge3/src/airflow/providers/edge3/models/edge_worker.py b/providers/edge3/src/airflow/providers/edge3/models/edge_worker.py
index 3a4779420e7..2e739826793 100644
--- a/providers/edge3/src/airflow/providers/edge3/models/edge_worker.py
+++ b/providers/edge3/src/airflow/providers/edge3/models/edge_worker.py
@@ -28,6 +28,11 @@ from sqlalchemy.orm import Mapped
from airflow.models.base import Base
from airflow.providers.common.compat.sdk import AirflowException, Stats, timezone
+
+try:
+ from airflow.sdk.observability.stats import DualStatsManager
+except ImportError:
+ DualStatsManager = None # type: ignore[assignment,misc] # Airflow < 3.2 compat
from airflow.providers.common.compat.sqlalchemy.orm import mapped_column
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.providers_configuration_loader import providers_configuration_loaded
@@ -173,9 +178,7 @@ def set_metrics(
EdgeWorkerState.OFFLINE_MAINTENANCE,
)
- try:
- from airflow.sdk._shared.observability.metrics.dual_stats_manager import DualStatsManager
-
+ if DualStatsManager is not None:
DualStatsManager.gauge(
"edge_worker.connected",
int(connected),
@@ -217,7 +220,7 @@ def set_metrics(
tags={},
extra_tags={"worker_name": worker_name, "queues": ",".join(queues)},
)
- except ImportError:
+ else:
Stats.gauge(f"edge_worker.connected.{worker_name}", int(connected))
Stats.gauge("edge_worker.connected", int(connected), tags={"worker_name": worker_name})
diff --git a/providers/edge3/src/airflow/providers/edge3/worker_api/routes/jobs.py b/providers/edge3/src/airflow/providers/edge3/worker_api/routes/jobs.py
index 4263d61a335..1191d068014 100644
--- a/providers/edge3/src/airflow/providers/edge3/worker_api/routes/jobs.py
+++ b/providers/edge3/src/airflow/providers/edge3/worker_api/routes/jobs.py
@@ -27,6 +27,11 @@ from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.executors.workloads import ExecuteTask
from airflow.providers.common.compat.sdk import Stats, timezone
+
+try:
+ from airflow.sdk.observability.stats import DualStatsManager
+except ImportError:
+ DualStatsManager = None # type: ignore[assignment,misc] # Airflow < 3.2 compat
from airflow.providers.edge3.models.edge_job import EdgeJobModel
from airflow.providers.edge3.worker_api.auth import jwt_token_authorization_rest
from airflow.providers.edge3.worker_api.datamodels import (
@@ -86,11 +91,9 @@ def fetch(
session.commit()
# Edge worker does not backport emitted Airflow metrics, so export some metrics
tags = {"dag_id": job.dag_id, "task_id": job.task_id, "queue": job.queue}
- try:
- from airflow.sdk._shared.observability.metrics.dual_stats_manager import DualStatsManager
-
+ if DualStatsManager is not None:
DualStatsManager.incr("edge_worker.ti.start", tags=tags)
- except ImportError:
+ else:
Stats.incr(f"edge_worker.ti.start.{job.queue}.{job.dag_id}.{job.task_id}", tags=tags)
Stats.incr("edge_worker.ti.start", tags=tags)
return EdgeJobFetched(
@@ -145,14 +148,12 @@ def state(
"queue": job.queue,
"state": str(state),
}
- try:
- from airflow.sdk._shared.observability.metrics.dual_stats_manager import DualStatsManager
-
+ if DualStatsManager is not None:
DualStatsManager.incr(
"edge_worker.ti.finish",
tags=tags,
)
- except ImportError:
+ else:
Stats.incr(
f"edge_worker.ti.finish.{job.queue}.{state}.{job.dag_id}.{job.task_id}",
tags=tags,
diff --git a/providers/edge3/src/airflow/providers/edge3/worker_api/routes/worker.py b/providers/edge3/src/airflow/providers/edge3/worker_api/routes/worker.py
index 473c6aa4ea0..8e3dce56e15 100644
--- a/providers/edge3/src/airflow/providers/edge3/worker_api/routes/worker.py
+++ b/providers/edge3/src/airflow/providers/edge3/worker_api/routes/worker.py
@@ -26,7 +26,12 @@ from sqlalchemy import select
from airflow.api_fastapi.common.db.common import SessionDep # noqa: TC001
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
-from airflow.providers.common.compat.sdk import timezone
+from airflow.providers.common.compat.sdk import Stats, timezone
+
+try:
+ from airflow.sdk.observability.stats import DualStatsManager
+except ImportError:
+ DualStatsManager = None # type: ignore[assignment,misc] # Airflow < 3.2 compat
from airflow.providers.edge3.models.edge_worker import EdgeWorkerModel, EdgeWorkerState, set_metrics
from airflow.providers.edge3.worker_api.auth import jwt_token_authorization_rest
from airflow.providers.edge3.worker_api.datamodels import (
@@ -212,9 +217,7 @@ def set_state(
worker.sysinfo = json.dumps(body.sysinfo)
worker.last_update = timezone.utcnow()
session.commit()
- try:
- from airflow.sdk._shared.observability.metrics.dual_stats_manager import DualStatsManager
-
+ if DualStatsManager is not None:
DualStatsManager.incr(
"edge_worker.heartbeat_count",
1,
@@ -222,9 +225,7 @@ def set_state(
tags={},
extra_tags={"worker_name": worker_name},
)
- except ImportError:
- from airflow.providers.common.compat.sdk import Stats
-
+ else:
Stats.incr(f"edge_worker.heartbeat_count.{worker_name}", 1, 1)
Stats.incr("edge_worker.heartbeat_count", 1, 1, tags={"worker_name": worker_name})
set_metrics(
diff --git a/providers/openlineage/pyproject.toml b/providers/openlineage/pyproject.toml
index 73d2ebf0e22..a241ecb2be9 100644
--- a/providers/openlineage/pyproject.toml
+++ b/providers/openlineage/pyproject.toml
@@ -60,7 +60,7 @@ requires-python = ">=3.10"
dependencies = [
"apache-airflow>=2.11.0",
"apache-airflow-providers-common-sql>=1.20.0",
- "apache-airflow-providers-common-compat>=1.13.1",
+ "apache-airflow-providers-common-compat>=1.13.1", # use next version
"attrs>=22.2",
"openlineage-integration-common>=1.41.0",
"openlineage-python>=1.41.0",
diff --git a/providers/openlineage/src/airflow/providers/openlineage/plugins/adapter.py b/providers/openlineage/src/airflow/providers/openlineage/plugins/adapter.py
index 7e21bcda6a9..076730e9bf6 100644
--- a/providers/openlineage/src/airflow/providers/openlineage/plugins/adapter.py
+++ b/providers/openlineage/src/airflow/providers/openlineage/plugins/adapter.py
@@ -36,6 +36,11 @@ from openlineage.client.facet_v2 import (
)
from airflow.providers.common.compat.sdk import Stats, conf as airflow_conf
+
+try:
+ from airflow.sdk.observability.stats import DualStatsManager
+except ImportError:
+ DualStatsManager = None # type: ignore[assignment,misc] # Airflow < 3.2
compat
from airflow.providers.openlineage import __version__ as OPENLINEAGE_PROVIDER_VERSION, conf
from airflow.providers.openlineage.utils.utils import (
OpenLineageRedactor,
@@ -158,16 +163,14 @@ class OpenLineageAdapter(LoggingMixin):
try:
with ExitStack() as stack:
- try:
- from airflow.sdk._shared.observability.metrics.dual_stats_manager import DualStatsManager
-
+ if DualStatsManager is not None:
stack.enter_context(
DualStatsManager.timer(
"ol.emit.attempts",
extra_tags={"event_type": event_type, "transport_type": transport_type},
)
)
- except ImportError:
+ else:
stack.enter_context(Stats.timer(f"ol.emit.attempts.{event_type}.{transport_type}"))
stack.enter_context(Stats.timer("ol.emit.attempts"))
diff --git a/task-sdk/src/airflow/sdk/observability/stats.py b/task-sdk/src/airflow/sdk/observability/stats.py
index 40452fbcaaa..072342262d5 100644
--- a/task-sdk/src/airflow/sdk/observability/stats.py
+++ b/task-sdk/src/airflow/sdk/observability/stats.py
@@ -19,6 +19,7 @@
from __future__ import annotations
+from airflow.sdk._shared.observability.metrics.dual_stats_manager import DualStatsManager
from airflow.sdk._shared.observability.metrics.stats import Stats, normalize_name_for_stats
-__all__ = ["Stats", "normalize_name_for_stats"]
+__all__ = ["Stats", "normalize_name_for_stats", "DualStatsManager"]