This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new a9df1220b8a Flush in-memory OTel metrics at process shutdown (#61808)
a9df1220b8a is described below

commit a9df1220b8a62aff9e6c0004de0624051a937a91
Author: Christos Bisias <[email protected]>
AuthorDate: Fri Feb 13 15:30:34 2026 +0200

    Flush in-memory OTel metrics at process shutdown (#61808)
    
    * flush in-memory otel metrics at process shutdown
    
    * fix module for test
---
 airflow-core/tests/integration/otel/test_otel.py   | 57 +++++++++++++++++-----
 .../observability/metrics/otel_logger.py           | 13 +++++
 .../airflow_shared/observability/metrics/stats.py  | 28 +++++++++++
 .../observability/metrics/test_otel_logger.py      | 36 ++++++++++++++
 4 files changed, 121 insertions(+), 13 deletions(-)

diff --git a/airflow-core/tests/integration/otel/test_otel.py b/airflow-core/tests/integration/otel/test_otel.py
index d0e9e3877a0..fa3c5b358e3 100644
--- a/airflow-core/tests/integration/otel/test_otel.py
+++ b/airflow-core/tests/integration/otel/test_otel.py
@@ -229,6 +229,15 @@ def check_legacy_metrics(output: str, dag: DAG, legacy_metrics_on: bool):
         assert set(legacy_metric_names).issubset(metrics_dict.keys())
 
 
+def check_metrics_exist(output: str, metrics_to_check: list[str]):
+    # Get a list of lines from the captured output.
+    output_lines = output.splitlines()
+
+    metrics_dict = extract_metrics_from_output(output_lines)
+
+    assert set(metrics_to_check).issubset(metrics_dict.keys())
+
+
 def check_spans_with_continuance(output: str, dag: DAG, continuance_for_t1: bool = True):
     # Get a list of lines from the captured output.
     output_lines = output.splitlines()
@@ -812,25 +821,13 @@ class TestOtelIntegration:
         except Exception as ex:
             log.error("Could not delete leftover control file '%s', error: '%s'.", self.control_file, ex)
 
-    @pytest.mark.parametrize(
-        ("legacy_names_on_bool", "legacy_names_exported"),
-        [
-            pytest.param(True, True, id="export_legacy_names"),
-            pytest.param(False, False, id="dont_export_legacy_names"),
-        ],
-    )
-    def test_export_legacy_metric_names(
-        self, legacy_names_on_bool, legacy_names_exported, monkeypatch, celery_worker_env_vars, capfd, session
-    ):
+    def dag_execution_for_testing_metrics(self, capfd):
         # Metrics.
         os.environ["AIRFLOW__METRICS__OTEL_ON"] = "True"
         os.environ["AIRFLOW__METRICS__OTEL_HOST"] = "breeze-otel-collector"
         os.environ["AIRFLOW__METRICS__OTEL_PORT"] = "4318"
         os.environ["AIRFLOW__METRICS__OTEL_INTERVAL_MILLISECONDS"] = "5000"
 
-        assert isinstance(legacy_names_on_bool, bool)
-        os.environ["AIRFLOW__METRICS__LEGACY_NAMES_ON"] = str(legacy_names_on_bool)
-
         if self.use_otel != "true":
             os.environ["AIRFLOW__METRICS__OTEL_DEBUGGING_ON"] = "True"
 
@@ -869,6 +866,7 @@ class TestOtelIntegration:
                     task_id=task_id, run_id=run_id, state=State.SUCCESS, span_status=None
                 )
 
+            print_ti_output_for_dag_run(dag_id=dag_id, run_id=run_id)
         finally:
             # Terminate the processes.
             celery_worker_process.terminate()
@@ -899,11 +897,44 @@ class TestOtelIntegration:
         log.info("out-start --\n%s\n-- out-end", out)
         log.info("err-start --\n%s\n-- err-end", err)
 
+        return out, dag
+
+    @pytest.mark.parametrize(
+        ("legacy_names_on_bool", "legacy_names_exported"),
+        [
+            pytest.param(True, True, id="export_legacy_names"),
+            pytest.param(False, False, id="dont_export_legacy_names"),
+        ],
+    )
+    def test_export_legacy_metric_names(
+        self, legacy_names_on_bool, legacy_names_exported, monkeypatch, celery_worker_env_vars, capfd, session
+    ):
+        assert isinstance(legacy_names_on_bool, bool)
+        os.environ["AIRFLOW__METRICS__LEGACY_NAMES_ON"] = str(legacy_names_on_bool)
+
+        out, dag = self.dag_execution_for_testing_metrics(capfd)
+
         if self.use_otel != "true":
             # Test the metrics from the output.
             assert isinstance(legacy_names_exported, bool)
             check_legacy_metrics(output=out, dag=dag, legacy_metrics_on=legacy_names_exported)
 
+    def test_export_metrics_during_process_shutdown(
+        self, monkeypatch, celery_worker_env_vars, capfd, session
+    ):
+        out, dag = self.dag_execution_for_testing_metrics(capfd)
+
+        if self.use_otel != "true":
+            # Test the metrics from the output.
+            metrics_to_check = [
+                "airflow.ti_successes",
+                "airflow.operator_successes",
+                "airflow.executor.running_tasks",
+                "airflow.executor.queued_tasks",
+                "airflow.executor.open_slots",
+            ]
+            check_metrics_exist(output=out, metrics_to_check=metrics_to_check)
+
     @pytest.mark.execution_timeout(90)
     def test_dag_execution_succeeds(self, monkeypatch, celery_worker_env_vars, capfd, session):
         """The same scheduler will start and finish the dag processing."""
diff --git a/shared/observability/src/airflow_shared/observability/metrics/otel_logger.py b/shared/observability/src/airflow_shared/observability/metrics/otel_logger.py
index e9587192592..2db61862db9 100644
--- a/shared/observability/src/airflow_shared/observability/metrics/otel_logger.py
+++ b/shared/observability/src/airflow_shared/observability/metrics/otel_logger.py
@@ -16,6 +16,7 @@
 # under the License.
 from __future__ import annotations
 
+import atexit
 import datetime
 import logging
 import os
@@ -373,6 +374,15 @@ class MetricsMap:
         self.map[key].set_value(value, delta)
 
 
+def flush_otel_metrics():
+    provider = metrics.get_meter_provider()
+    provider.force_flush()
+
+
+def atexit_register_metrics_flush():
+    atexit.register(flush_otel_metrics)
+
+
 def get_otel_logger(
     *,
     host: str | None = None,
@@ -424,6 +434,9 @@ def get_otel_logger(
         ),
     )
 
+    # Register a hook that flushes any in-memory metrics at shutdown.
+    atexit_register_metrics_flush()
+
     validator = get_validator(metrics_allow_list, metrics_block_list)
 
     return SafeOtelLogger(
diff --git a/shared/observability/src/airflow_shared/observability/metrics/stats.py b/shared/observability/src/airflow_shared/observability/metrics/stats.py
index bfdecd65e8e..8055f8ed95d 100644
--- a/shared/observability/src/airflow_shared/observability/metrics/stats.py
+++ b/shared/observability/src/airflow_shared/observability/metrics/stats.py
@@ -18,6 +18,7 @@
 from __future__ import annotations
 
 import logging
+import os
 import re
 import socket
 from collections.abc import Callable
@@ -69,6 +70,31 @@ class _Stats(type):
         factory = type.__getattribute__(cls, "factory")
         instance = type.__getattribute__(cls, "instance")
 
+        # When using OpenTelemetry, some subprocesses are short-lived and
+        # often exit before flushing any metrics.
+        #
+        # The solution is to register a hook that performs a force flush at exit.
+        # The atexit hook is registered when initializing the instance.
+        #
+        # The instance gets initialized once per process. In case a process is forked, then
+        # the new subprocess, will inherit the already initialized instance of the parent process.
+        #
+        # Store the instance pid so that it can be compared with the current pid
+        # to decide whether to initialize the instance again or not.
+        #
+        # So far, all forks are resetting their state to remove anything inherited by the parent.
+        # But in the future that might not always be true.
+        current_pid = os.getpid()
+        if cls.instance and cls._instance_pid != current_pid:
+            log.info(
+                "Stats instance was created in PID %s but accessed in PID %s. Re-initializing.",
+                cls._instance_pid,
+                current_pid,
+            )
+            # Setting the instance to None, will force re-initialization.
+            cls.instance = None
+            cls._instance_pid = None
+
         if instance is None:
             if factory is None:
                 factory = NoStatsLogger
@@ -76,9 +102,11 @@ class _Stats(type):
 
             try:
                 instance = factory()
+                cls._instance_pid = current_pid
             except (socket.gaierror, ImportError) as e:
                 log.error("Could not configure StatsClient: %s, using NoStatsLogger instead.", e)
                 instance = NoStatsLogger()
+                cls._instance_pid = current_pid
 
             type.__setattr__(cls, "instance", instance)
 
diff --git a/shared/observability/tests/observability/metrics/test_otel_logger.py b/shared/observability/tests/observability/metrics/test_otel_logger.py
index 22578392c4b..e09ad9a1f4f 100644
--- a/shared/observability/tests/observability/metrics/test_otel_logger.py
+++ b/shared/observability/tests/observability/metrics/test_otel_logger.py
@@ -17,6 +17,9 @@
 from __future__ import annotations
 
 import logging
+import os
+import subprocess
+import sys
 import time
 from unittest import mock
 
@@ -32,6 +35,7 @@ from airflow_shared.observability.metrics.otel_logger import (
     _generate_key_name,
     _is_up_down_counter,
     full_name,
+    get_otel_logger,
 )
 from airflow_shared.observability.metrics.validators import (
     BACK_COMPAT_METRIC_NAMES,
@@ -307,3 +311,35 @@ class TestOtelMetrics:
         assert timer.duration == expected_value
         assert mock_time.call_count == 2
         self.meter.get_meter().create_gauge.assert_called_once_with(name=full_name(name))
+
+    def test_atexit_flush_on_process_exit(self):
+        """
+        Run a process that initializes a logger, creates a stat and then exits.
+
+        The logger initialization registers an atexit hook.
+        Test that the hook runs and flushes the created stat at shutdown.
+        """
+        test_module_name = "tests.observability.metrics.test_otel_logger"
+        function_call_str = f"import {test_module_name} as m; m.mock_service_run()"
+
+        proc = subprocess.run(
+            [sys.executable, "-c", function_call_str],
+            check=False,
+            env=os.environ.copy(),
+            capture_output=True,
+            text=True,
+            timeout=20,
+        )
+
+        assert proc.returncode == 0, f"Process failed\nstdout:\n{proc.stdout}\nstderr:\n{proc.stderr}"
+
+        assert "my_test_stat" in proc.stdout, (
+            "Expected the metric name to be present in the stdout but it wasn't.\n"
+            f"stdout:\n{proc.stdout}\n"
+            f"stderr:\n{proc.stderr}"
+        )
+
+
+def mock_service_run():
+    logger = get_otel_logger(debug=True)
+    logger.incr("my_test_stat")

Reply via email to