ashb commented on code in PR #53722:
URL: https://github.com/apache/airflow/pull/53722#discussion_r2253687812
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -2108,18 +2136,43 @@ def _emit_pool_metrics(self, session: Session =
NEW_SESSION) -> None:
with DebugTrace.start_span(span_name="emit_pool_metrics",
component="SchedulerJobRunner") as span:
pools = Pool.slots_stats(session=session)
for pool_name, slot_stats in pools.items():
- Stats.gauge(f"pool.open_slots.{pool_name}", slot_stats["open"])
- Stats.gauge(f"pool.queued_slots.{pool_name}",
slot_stats["queued"])
- Stats.gauge(f"pool.running_slots.{pool_name}",
slot_stats["running"])
- Stats.gauge(f"pool.deferred_slots.{pool_name}",
slot_stats["deferred"])
- Stats.gauge(f"pool.scheduled_slots.{pool_name}",
slot_stats["scheduled"])
-
- # Same metrics with tagging
- Stats.gauge("pool.open_slots", slot_stats["open"],
tags={"pool_name": pool_name})
- Stats.gauge("pool.queued_slots", slot_stats["queued"],
tags={"pool_name": pool_name})
- Stats.gauge("pool.running_slots", slot_stats["running"],
tags={"pool_name": pool_name})
- Stats.gauge("pool.deferred_slots", slot_stats["deferred"],
tags={"pool_name": pool_name})
- Stats.gauge("pool.scheduled_slots", slot_stats["scheduled"],
tags={"pool_name": pool_name})
+ # If enabled on the config, publish metrics twice,
+ # once with backward compatible name, and then with tags.
+ DualStatsManager.gauge(
+ f"pool.open_slots.{pool_name}",
+ "pool.open_slots",
+ slot_stats["open"],
+ tags={},
+ extra_tags={"pool_name": pool_name},
+ )
Review Comment:
I didn't check all cases in this PR, but I wonder if we could do this as
```suggestion
DualStatsManager.gauge(
"pool.open_slots",
slot_stats["open"],
tags={},
extra_tags={"pool_name": pool_name},
)
```
And then we can build the old name via something like this:
```python
metric_name = metric + "." + ".".join(extra_tags.values())
```
wdyt?
##########
devel-common/src/tests_common/test_utils/otel_utils.py:
##########
@@ -94,14 +95,23 @@ def clean_task_lines(lines: list) -> list:
return cleaned_lines
-def extract_spans_from_output(output_lines: list):
+def _extract_obj_from_output(output_lines: list[str], kind: str):
"""
- For a given list of ConsoleSpanExporter output lines, it extracts the json
spans and creates two dictionaries.
-
- :return: root spans dict (key: root_span_id - value: root_span), spans
dict (key: span_id - value: span)
+ Used to extract spans or metrics from the output.
+
+ Parameters
+ ----------
+ output_lines : list[str]
+ The captured stdout split into lines.
+ kind : "spans" | "metrics"
+ Which json type to extract from the output.
Review Comment:
```suggestion
Parameters
----------
:param output_lines: The captured stdout split into lines.
:param kind: Which json type to extract from the output.
```
##########
devel-common/src/tests_common/test_utils/otel_utils.py:
##########
@@ -94,14 +95,23 @@ def clean_task_lines(lines: list) -> list:
return cleaned_lines
-def extract_spans_from_output(output_lines: list):
+def _extract_obj_from_output(output_lines: list[str], kind: str):
Review Comment:
```suggestion
def _extract_obj_from_output(output_lines: list[str], kind: Literal["spans"]
| Literal["metrics"]):
```
##########
providers/edge3/src/airflow/providers/edge3/version_compat.py:
##########
@@ -33,3 +33,15 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0)
+AIRFLOW_V_3_1_PLUS: bool = get_base_airflow_version_tuple() >= (3, 1, 0)
+
+if AIRFLOW_V_3_0_PLUS:
+ from airflow.sdk import BaseOperator
+else:
+ from airflow.models import BaseOperator
+
+__all__ = [
+ "AIRFLOW_V_3_0_PLUS",
+ "AIRFLOW_V_3_1_PLUS",
+ "BaseOperator",
+]
Review Comment:
Ditto here
```suggestion
try:
from airflow.sdk import BaseOperator
except ImportError:
from airflow.models import BaseOperator
__all__ = [
"AIRFLOW_V_3_0_PLUS",
"BaseOperator",
]
```
##########
providers/openlineage/src/airflow/providers/openlineage/plugins/adapter.py:
##########
@@ -157,15 +158,30 @@ def emit(self, event: RunEvent):
transport_type = f"{self._client.transport.kind}".lower()
try:
- with ExitStack() as stack:
-
stack.enter_context(Stats.timer(f"ol.emit.attempts.{event_type}.{transport_type}"))
- stack.enter_context(Stats.timer("ol.emit.attempts"))
- self._client.emit(redacted_event)
- self.log.info(
- "Successfully emitted OpenLineage `%s` event of id `%s`",
- event_type.upper(),
- event.run.runId,
- )
+ if AIRFLOW_V_3_1_PLUS:
+ from airflow.metrics.dual_stats_manager import DualStatsManager
+
+ # If enabled on the config, publish metrics twice,
+ # once with backward compatible name, and then with tags.
+ with DualStatsManager.timer(
+ f"ol.emit.attempts.{event_type}.{transport_type}",
"ol.emit.attempts"
+ ):
+ self._client.emit(redacted_event)
+ self.log.info(
+ "Successfully emitted OpenLineage `%s` event of id
`%s`",
+ event_type.upper(),
+ event.run.runId,
+ )
Review Comment:
Rather than duplicating the code here, I think it would be better to use the
existing stack and have this as something like
```python
with ExitStack() as stack:
if AIRFLOW_V_3_1_PLUS:
stack.enter_context(DualStatsManager)
else:
stack.enter_context(Stats.timer(...))
stack.enter_context(Stats.timer(...))
self._client.emit(redacted_event)
self.log.info(
"Successfully emitted OpenLineage `%s` event of id `%s`",
event_type.upper(),
event.run.runId,
)
```
##########
providers/edge3/src/airflow/providers/edge3/models/edge_worker.py:
##########
@@ -27,13 +27,18 @@
from airflow.exceptions import AirflowException
from airflow.models.base import Base
+from airflow.providers.edge3.version_compat import AIRFLOW_V_3_1_PLUS
from airflow.stats import Stats
-from airflow.utils import timezone
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.providers_configuration_loader import
providers_configuration_loaded
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.sqlalchemy import UtcDateTime
+if AIRFLOW_V_3_1_PLUS:
+ from airflow.sdk import timezone
+else:
+ from airflow.utils import timezone # type: ignore[attr-defined,no-redef]
Review Comment:
```suggestion
try:
from airflow.sdk import timezone
except ImportError:
from airflow.utils import timezone # type: ignore[attr-defined,no-redef]
```
Better not to use version checks, especially as this is an import from the TaskSDK,
which is working towards independence from the Airflow version.
##########
providers/edge3/src/airflow/providers/edge3/version_compat.py:
##########
@@ -33,3 +33,15 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0)
+AIRFLOW_V_3_1_PLUS: bool = get_base_airflow_version_tuple() >= (3, 1, 0)
+
+if AIRFLOW_V_3_0_PLUS:
+ from airflow.sdk import BaseOperator
+else:
+ from airflow.models import BaseOperator
+
+__all__ = [
+ "AIRFLOW_V_3_0_PLUS",
+ "AIRFLOW_V_3_1_PLUS",
+ "BaseOperator",
+]
Review Comment:
A few more places have this pattern that I haven't commented on
##########
providers/edge3/src/airflow/providers/edge3/models/edge_worker.py:
##########
@@ -27,13 +27,18 @@
from airflow.exceptions import AirflowException
from airflow.models.base import Base
+from airflow.providers.edge3.version_compat import AIRFLOW_V_3_1_PLUS
Review Comment:
```suggestion
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]