This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new a9df1220b8a Flush in-memory OTel metrics at process shutdown (#61808)
a9df1220b8a is described below
commit a9df1220b8a62aff9e6c0004de0624051a937a91
Author: Christos Bisias <[email protected]>
AuthorDate: Fri Feb 13 15:30:34 2026 +0200
Flush in-memory OTel metrics at process shutdown (#61808)
* flush in-memory otel metrics at process shutdown
* fix module for test
---
airflow-core/tests/integration/otel/test_otel.py | 57 +++++++++++++++++-----
.../observability/metrics/otel_logger.py | 13 +++++
.../airflow_shared/observability/metrics/stats.py | 28 +++++++++++
.../observability/metrics/test_otel_logger.py | 36 ++++++++++++++
4 files changed, 121 insertions(+), 13 deletions(-)
diff --git a/airflow-core/tests/integration/otel/test_otel.py b/airflow-core/tests/integration/otel/test_otel.py
index d0e9e3877a0..fa3c5b358e3 100644
--- a/airflow-core/tests/integration/otel/test_otel.py
+++ b/airflow-core/tests/integration/otel/test_otel.py
@@ -229,6 +229,15 @@ def check_legacy_metrics(output: str, dag: DAG, legacy_metrics_on: bool):
assert set(legacy_metric_names).issubset(metrics_dict.keys())
+def check_metrics_exist(output: str, metrics_to_check: list[str]):
+ # Get a list of lines from the captured output.
+ output_lines = output.splitlines()
+
+ metrics_dict = extract_metrics_from_output(output_lines)
+
+ assert set(metrics_to_check).issubset(metrics_dict.keys())
+
+
def check_spans_with_continuance(output: str, dag: DAG, continuance_for_t1: bool = True):
# Get a list of lines from the captured output.
output_lines = output.splitlines()
@@ -812,25 +821,13 @@ class TestOtelIntegration:
except Exception as ex:
log.error("Could not delete leftover control file '%s', error:
'%s'.", self.control_file, ex)
- @pytest.mark.parametrize(
- ("legacy_names_on_bool", "legacy_names_exported"),
- [
- pytest.param(True, True, id="export_legacy_names"),
- pytest.param(False, False, id="dont_export_legacy_names"),
- ],
- )
- def test_export_legacy_metric_names(
- self, legacy_names_on_bool, legacy_names_exported, monkeypatch, celery_worker_env_vars, capfd, session
- ):
+ def dag_execution_for_testing_metrics(self, capfd):
# Metrics.
os.environ["AIRFLOW__METRICS__OTEL_ON"] = "True"
os.environ["AIRFLOW__METRICS__OTEL_HOST"] = "breeze-otel-collector"
os.environ["AIRFLOW__METRICS__OTEL_PORT"] = "4318"
os.environ["AIRFLOW__METRICS__OTEL_INTERVAL_MILLISECONDS"] = "5000"
- assert isinstance(legacy_names_on_bool, bool)
- os.environ["AIRFLOW__METRICS__LEGACY_NAMES_ON"] =
str(legacy_names_on_bool)
-
if self.use_otel != "true":
os.environ["AIRFLOW__METRICS__OTEL_DEBUGGING_ON"] = "True"
@@ -869,6 +866,7 @@ class TestOtelIntegration:
task_id=task_id, run_id=run_id, state=State.SUCCESS, span_status=None
)
+ print_ti_output_for_dag_run(dag_id=dag_id, run_id=run_id)
finally:
# Terminate the processes.
celery_worker_process.terminate()
@@ -899,11 +897,44 @@ class TestOtelIntegration:
log.info("out-start --\n%s\n-- out-end", out)
log.info("err-start --\n%s\n-- err-end", err)
+ return out, dag
+
+ @pytest.mark.parametrize(
+ ("legacy_names_on_bool", "legacy_names_exported"),
+ [
+ pytest.param(True, True, id="export_legacy_names"),
+ pytest.param(False, False, id="dont_export_legacy_names"),
+ ],
+ )
+ def test_export_legacy_metric_names(
+ self, legacy_names_on_bool, legacy_names_exported, monkeypatch, celery_worker_env_vars, capfd, session
+ ):
+ assert isinstance(legacy_names_on_bool, bool)
+ os.environ["AIRFLOW__METRICS__LEGACY_NAMES_ON"] =
str(legacy_names_on_bool)
+
+ out, dag = self.dag_execution_for_testing_metrics(capfd)
+
if self.use_otel != "true":
# Test the metrics from the output.
assert isinstance(legacy_names_exported, bool)
check_legacy_metrics(output=out, dag=dag, legacy_metrics_on=legacy_names_exported)
+ def test_export_metrics_during_process_shutdown(
+ self, monkeypatch, celery_worker_env_vars, capfd, session
+ ):
+ out, dag = self.dag_execution_for_testing_metrics(capfd)
+
+ if self.use_otel != "true":
+ # Test the metrics from the output.
+ metrics_to_check = [
+ "airflow.ti_successes",
+ "airflow.operator_successes",
+ "airflow.executor.running_tasks",
+ "airflow.executor.queued_tasks",
+ "airflow.executor.open_slots",
+ ]
+ check_metrics_exist(output=out, metrics_to_check=metrics_to_check)
+
@pytest.mark.execution_timeout(90)
def test_dag_execution_succeeds(self, monkeypatch, celery_worker_env_vars, capfd, session):
"""The same scheduler will start and finish the dag processing."""
diff --git a/shared/observability/src/airflow_shared/observability/metrics/otel_logger.py b/shared/observability/src/airflow_shared/observability/metrics/otel_logger.py
index e9587192592..2db61862db9 100644
--- a/shared/observability/src/airflow_shared/observability/metrics/otel_logger.py
+++ b/shared/observability/src/airflow_shared/observability/metrics/otel_logger.py
@@ -16,6 +16,7 @@
# under the License.
from __future__ import annotations
+import atexit
import datetime
import logging
import os
@@ -373,6 +374,15 @@ class MetricsMap:
self.map[key].set_value(value, delta)
+def flush_otel_metrics():
+ provider = metrics.get_meter_provider()
+ provider.force_flush()
+
+
+def atexit_register_metrics_flush():
+ atexit.register(flush_otel_metrics)
+
+
def get_otel_logger(
*,
host: str | None = None,
@@ -424,6 +434,9 @@ def get_otel_logger(
),
)
+ # Register a hook that flushes any in-memory metrics at shutdown.
+ atexit_register_metrics_flush()
+
validator = get_validator(metrics_allow_list, metrics_block_list)
return SafeOtelLogger(
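
The hook above mirrors a pattern that can be reproduced with the plain OpenTelemetry SDK. The following standalone sketch is not Airflow code; the console exporter, 60-second interval, and metric name are illustrative assumptions. It registers an explicit force_flush so a counter recorded just before exit is still exported even if the periodic export interval never elapses:

import atexit

from opentelemetry import metrics
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import (
    ConsoleMetricExporter,
    PeriodicExportingMetricReader,
)

# Export every 60s, far longer than a short-lived subprocess stays alive.
reader = PeriodicExportingMetricReader(ConsoleMetricExporter(), export_interval_millis=60_000)
provider = MeterProvider(metric_readers=[reader])
metrics.set_meter_provider(provider)

# Flush whatever is still buffered in memory when the interpreter shuts down.
atexit.register(provider.force_flush)

counter = metrics.get_meter(__name__).create_counter("my_test_stat")
counter.add(1)
# The process can exit right away; the atexit hook exports the counter anyway.
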
diff --git a/shared/observability/src/airflow_shared/observability/metrics/stats.py b/shared/observability/src/airflow_shared/observability/metrics/stats.py
index bfdecd65e8e..8055f8ed95d 100644
--- a/shared/observability/src/airflow_shared/observability/metrics/stats.py
+++ b/shared/observability/src/airflow_shared/observability/metrics/stats.py
@@ -18,6 +18,7 @@
from __future__ import annotations
import logging
+import os
import re
import socket
from collections.abc import Callable
@@ -69,6 +70,31 @@ class _Stats(type):
factory = type.__getattribute__(cls, "factory")
instance = type.__getattribute__(cls, "instance")
+ # When using OpenTelemetry, some subprocesses are short-lived and
+ # often exit before flushing any metrics.
+ #
+ # The solution is to register a hook that performs a force flush at exit.
+ # The atexit hook is registered when initializing the instance.
+ #
+ # The instance gets initialized once per process. If a process is forked, the new
+ # subprocess inherits the already initialized instance of the parent process.
+ #
+ # Store the instance pid so that it can be compared with the current pid
+ # to decide whether to initialize the instance again or not.
+ #
+ # So far, all forks reset their state to remove anything inherited from the parent,
+ # but in the future that might not always be true.
+ current_pid = os.getpid()
+ if cls.instance and cls._instance_pid != current_pid:
+ log.info(
+ "Stats instance was created in PID %s but accessed in PID %s.
Re-initializing.",
+ cls._instance_pid,
+ current_pid,
+ )
+ # Setting the instance to None will force re-initialization.
+ cls.instance = None
+ cls._instance_pid = None
+
if instance is None:
if factory is None:
factory = NoStatsLogger
@@ -76,9 +102,11 @@ class _Stats(type):
try:
instance = factory()
+ cls._instance_pid = current_pid
except (socket.gaierror, ImportError) as e:
log.error("Could not configure StatsClient: %s, using
NoStatsLogger instead.", e)
instance = NoStatsLogger()
+ cls._instance_pid = current_pid
type.__setattr__(cls, "instance", instance)
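
The PID guard above is a common fork-safety pattern for lazily created singletons. A minimal standalone sketch with hypothetical names (not the Airflow metaclass) looks like this:

import os

_instance = None
_instance_pid: int | None = None


def get_instance(factory):
    """Return a per-process singleton, rebuilding it in forked children."""
    global _instance, _instance_pid
    current_pid = os.getpid()
    if _instance is not None and _instance_pid != current_pid:
        # A forked child inherited the parent's instance; drop it so the
        # factory (and its atexit registration) runs again in this process.
        _instance = None
        _instance_pid = None
    if _instance is None:
        _instance = factory()
        _instance_pid = current_pid
    return _instance
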
diff --git a/shared/observability/tests/observability/metrics/test_otel_logger.py b/shared/observability/tests/observability/metrics/test_otel_logger.py
index 22578392c4b..e09ad9a1f4f 100644
--- a/shared/observability/tests/observability/metrics/test_otel_logger.py
+++ b/shared/observability/tests/observability/metrics/test_otel_logger.py
@@ -17,6 +17,9 @@
from __future__ import annotations
import logging
+import os
+import subprocess
+import sys
import time
from unittest import mock
@@ -32,6 +35,7 @@ from airflow_shared.observability.metrics.otel_logger import (
_generate_key_name,
_is_up_down_counter,
full_name,
+ get_otel_logger,
)
from airflow_shared.observability.metrics.validators import (
BACK_COMPAT_METRIC_NAMES,
@@ -307,3 +311,35 @@ class TestOtelMetrics:
assert timer.duration == expected_value
assert mock_time.call_count == 2
self.meter.get_meter().create_gauge.assert_called_once_with(name=full_name(name))
+
+ def test_atexit_flush_on_process_exit(self):
+ """
+ Run a process that initializes a logger, creates a stat and then exits.
+
+ The logger initialization registers an atexit hook.
+ Test that the hook runs and flushes the created stat at shutdown.
+ """
+ test_module_name = "tests.observability.metrics.test_otel_logger"
+ function_call_str = f"import {test_module_name} as m;
m.mock_service_run()"
+
+ proc = subprocess.run(
+ [sys.executable, "-c", function_call_str],
+ check=False,
+ env=os.environ.copy(),
+ capture_output=True,
+ text=True,
+ timeout=20,
+ )
+
+ assert proc.returncode == 0, f"Process
failed\nstdout:\n{proc.stdout}\nstderr:\n{proc.stderr}"
+
+ assert "my_test_stat" in proc.stdout, (
+ "Expected the metric name to be present in the stdout but it
wasn't.\n"
+ f"stdout:\n{proc.stdout}\n"
+ f"stderr:\n{proc.stderr}"
+ )
+
+
+def mock_service_run():
+ logger = get_otel_logger(debug=True)
+ logger.incr("my_test_stat")