ashb commented on code in PR #53722:
URL: https://github.com/apache/airflow/pull/53722#discussion_r2253687812


##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -2108,18 +2136,43 @@ def _emit_pool_metrics(self, session: Session = 
NEW_SESSION) -> None:
         with DebugTrace.start_span(span_name="emit_pool_metrics", 
component="SchedulerJobRunner") as span:
             pools = Pool.slots_stats(session=session)
             for pool_name, slot_stats in pools.items():
-                Stats.gauge(f"pool.open_slots.{pool_name}", slot_stats["open"])
-                Stats.gauge(f"pool.queued_slots.{pool_name}", 
slot_stats["queued"])
-                Stats.gauge(f"pool.running_slots.{pool_name}", 
slot_stats["running"])
-                Stats.gauge(f"pool.deferred_slots.{pool_name}", 
slot_stats["deferred"])
-                Stats.gauge(f"pool.scheduled_slots.{pool_name}", 
slot_stats["scheduled"])
-
-                # Same metrics with tagging
-                Stats.gauge("pool.open_slots", slot_stats["open"], 
tags={"pool_name": pool_name})
-                Stats.gauge("pool.queued_slots", slot_stats["queued"], 
tags={"pool_name": pool_name})
-                Stats.gauge("pool.running_slots", slot_stats["running"], 
tags={"pool_name": pool_name})
-                Stats.gauge("pool.deferred_slots", slot_stats["deferred"], 
tags={"pool_name": pool_name})
-                Stats.gauge("pool.scheduled_slots", slot_stats["scheduled"], 
tags={"pool_name": pool_name})
+                # If enabled on the config, publish metrics twice,
+                # once with backward compatible name, and then with tags.
+                DualStatsManager.gauge(
+                    f"pool.open_slots.{pool_name}",
+                    "pool.open_slots",
+                    slot_stats["open"],
+                    tags={},
+                    extra_tags={"pool_name": pool_name},
+                )

Review Comment:
   I didn't check all cases in this PR, but I wonder if we could do this as 
   ```suggestion
                   DualStatsManager.gauge(
                       "pool.open_slots",
                       slot_stats["open"],
                       tags={},
                       extra_tags={"pool_name": pool_name},
                   )
   ```
   
   And then we can build the old name via something like this:
   
   ```python
   metric_name = ".".join([metric, *extra_tags.values()])
   ```
   
   wdyt?



##########
devel-common/src/tests_common/test_utils/otel_utils.py:
##########
@@ -94,14 +95,23 @@ def clean_task_lines(lines: list) -> list:
     return cleaned_lines
 
 
-def extract_spans_from_output(output_lines: list):
+def _extract_obj_from_output(output_lines: list[str], kind: str):
     """
-    For a given list of ConsoleSpanExporter output lines, it extracts the json 
spans and creates two dictionaries.
-
-    :return: root spans dict (key: root_span_id - value: root_span), spans 
dict (key: span_id - value: span)
+    Used to extract spans or metrics from the output.
+
+    Parameters
+    ----------
+    output_lines : list[str]
+        The captured stdout split into lines.
+    kind : "spans" | "metrics"
+        Which json type to extract from the output.

Review Comment:
   ```suggestion
       Parameters
       ----------
       :param output_lines: The captured stdout split into lines.
       :param kind: Which json type to extract from the output.
   ```



##########
devel-common/src/tests_common/test_utils/otel_utils.py:
##########
@@ -94,14 +95,23 @@ def clean_task_lines(lines: list) -> list:
     return cleaned_lines
 
 
-def extract_spans_from_output(output_lines: list):
+def _extract_obj_from_output(output_lines: list[str], kind: str):

Review Comment:
   ```suggestion
   def _extract_obj_from_output(output_lines: list[str], kind: Literal["spans"] 
| Literal["metrics"]):
   ```



##########
providers/edge3/src/airflow/providers/edge3/version_compat.py:
##########
@@ -33,3 +33,15 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
 
 
 AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0)
+AIRFLOW_V_3_1_PLUS: bool = get_base_airflow_version_tuple() >= (3, 1, 0)
+
+if AIRFLOW_V_3_0_PLUS:
+    from airflow.sdk import BaseOperator
+else:
+    from airflow.models import BaseOperator
+
+__all__ = [
+    "AIRFLOW_V_3_0_PLUS",
+    "AIRFLOW_V_3_1_PLUS",
+    "BaseOperator",
+]

Review Comment:
   Ditto here
   ```suggestion
   
   try:
       from airflow.sdk import BaseOperator
   except ImportError:
       from airflow.models import BaseOperator
   
   __all__ = [
       "AIRFLOW_V_3_0_PLUS",
       "BaseOperator",
   ]
   ```



##########
providers/openlineage/src/airflow/providers/openlineage/plugins/adapter.py:
##########
@@ -157,15 +158,30 @@ def emit(self, event: RunEvent):
         transport_type = f"{self._client.transport.kind}".lower()
 
         try:
-            with ExitStack() as stack:
-                
stack.enter_context(Stats.timer(f"ol.emit.attempts.{event_type}.{transport_type}"))
-                stack.enter_context(Stats.timer("ol.emit.attempts"))
-                self._client.emit(redacted_event)
-                self.log.info(
-                    "Successfully emitted OpenLineage `%s` event of id `%s`",
-                    event_type.upper(),
-                    event.run.runId,
-                )
+            if AIRFLOW_V_3_1_PLUS:
+                from airflow.metrics.dual_stats_manager import DualStatsManager
+
+                # If enabled on the config, publish metrics twice,
+                # once with backward compatible name, and then with tags.
+                with DualStatsManager.timer(
+                    f"ol.emit.attempts.{event_type}.{transport_type}", 
"ol.emit.attempts"
+                ):
+                    self._client.emit(redacted_event)
+                    self.log.info(
+                        "Successfully emitted OpenLineage `%s` event of id 
`%s`",
+                        event_type.upper(),
+                        event.run.runId,
+                    )

Review Comment:
   Rather than duplicating the code here, I think it would be better to use the 
existing stack and have this as something like
   
   ```python
               with ExitStack() as stack:
                   if AIRFLOW_V_3_1_PLUS:
                       stack.enter_context(DualStatsManager)
                   else:
                       stack.enter_context(Stats.timer(...))
                       stack.enter_context(Stats.timer(...))
   
                   self._client.emit(redacted_event)
                   self.log.info(
                       "Successfully emitted OpenLineage `%s` event of id `%s`",
                       event_type.upper(),
                       event.run.runId,
                   )
   ```



##########
providers/edge3/src/airflow/providers/edge3/models/edge_worker.py:
##########
@@ -27,13 +27,18 @@
 
 from airflow.exceptions import AirflowException
 from airflow.models.base import Base
+from airflow.providers.edge3.version_compat import AIRFLOW_V_3_1_PLUS
 from airflow.stats import Stats
-from airflow.utils import timezone
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.providers_configuration_loader import 
providers_configuration_loaded
 from airflow.utils.session import NEW_SESSION, provide_session
 from airflow.utils.sqlalchemy import UtcDateTime
 
+if AIRFLOW_V_3_1_PLUS:
+    from airflow.sdk import timezone
+else:
+    from airflow.utils import timezone  # type: ignore[attr-defined,no-redef]

Review Comment:
   ```suggestion
   try:
       from airflow.sdk import timezone
   except ImportError:
       from airflow.utils import timezone  # type: ignore[attr-defined,no-redef]
   ```
   
   Better not to use version checks, especially as this is an import from TaskSDK, 
which is working towards independence from the Airflow version.



##########
providers/edge3/src/airflow/providers/edge3/version_compat.py:
##########
@@ -33,3 +33,15 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
 
 
 AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0)
+AIRFLOW_V_3_1_PLUS: bool = get_base_airflow_version_tuple() >= (3, 1, 0)
+
+if AIRFLOW_V_3_0_PLUS:
+    from airflow.sdk import BaseOperator
+else:
+    from airflow.models import BaseOperator
+
+__all__ = [
+    "AIRFLOW_V_3_0_PLUS",
+    "AIRFLOW_V_3_1_PLUS",
+    "BaseOperator",
+]

Review Comment:
   A few more places have this pattern that I haven't commented on



##########
providers/edge3/src/airflow/providers/edge3/models/edge_worker.py:
##########
@@ -27,13 +27,18 @@
 
 from airflow.exceptions import AirflowException
 from airflow.models.base import Base
+from airflow.providers.edge3.version_compat import AIRFLOW_V_3_1_PLUS

Review Comment:
   ```suggestion
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to