This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-1-test by this push:
new bb003741ee7 [v3-1-test] Flush in-memory OTel metrics at process shutdown (#61808) (#61869)
bb003741ee7 is described below
commit bb003741ee7bef6c903909bffb0b88e06b9ff59b
Author: Christos Bisias <[email protected]>
AuthorDate: Tue Feb 17 03:05:55 2026 +0200
[v3-1-test] Flush in-memory OTel metrics at process shutdown (#61808) (#61869)
* manually backport and resolve conflicts
* fix mypy-airflow-core error
* make package unit.core importable for the test subprocess
---
airflow-core/src/airflow/metrics/otel_logger.py | 13 +++
airflow-core/src/airflow/stats.py | 29 ++++++
airflow-core/tests/integration/otel/test_otel.py | 104 +++++++++++++++++++++
airflow-core/tests/unit/core/test_otel_logger.py | 55 +++++++++++
.../src/tests_common/test_utils/otel_utils.py | 68 ++++++++++----
5 files changed, 252 insertions(+), 17 deletions(-)
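In short, the change registers an atexit hook when the OTel metrics logger is created, so that short-lived processes force-flush any buffered metrics before exiting. A minimal standalone sketch of that pattern (not the committed code verbatim; it assumes an opentelemetry-sdk MeterProvider is already configured, as get_otel_logger() does in the diff below):

    import atexit

    from opentelemetry import metrics


    def flush_otel_metrics():
        # Force the configured SDK MeterProvider to export metrics that are
        # still buffered in memory (e.g. by a PeriodicExportingMetricReader).
        metrics.get_meter_provider().force_flush()


    # Registered once at logger initialization; the interpreter then runs the
    # flush on normal shutdown, even for short-lived subprocesses.
    atexit.register(flush_otel_metrics)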
diff --git a/airflow-core/src/airflow/metrics/otel_logger.py b/airflow-core/src/airflow/metrics/otel_logger.py
index 317b70a5ca2..6b75428529c 100644
--- a/airflow-core/src/airflow/metrics/otel_logger.py
+++ b/airflow-core/src/airflow/metrics/otel_logger.py
@@ -16,6 +16,7 @@
# under the License.
from __future__ import annotations
+import atexit
import datetime
import logging
import random
@@ -370,6 +371,15 @@ class MetricsMap:
self.map[key].set_value(value, delta)
+def flush_otel_metrics():
+ provider = metrics.get_meter_provider()
+ provider.force_flush()
+
+
+def atexit_register_metrics_flush():
+ atexit.register(flush_otel_metrics)
+
+
def get_otel_logger(cls) -> SafeOtelLogger:
host = conf.get("metrics", "otel_host") # ex: "breeze-otel-collector"
port = conf.getint("metrics", "otel_port") # ex: 4318
@@ -405,4 +415,7 @@ def get_otel_logger(cls) -> SafeOtelLogger:
),
)
+ # Register a hook that flushes any in-memory metrics at shutdown.
+ atexit_register_metrics_flush()
+
return SafeOtelLogger(metrics.get_meter_provider(), prefix, get_validator())
diff --git a/airflow-core/src/airflow/stats.py b/airflow-core/src/airflow/stats.py
index 6cb9229ab73..50d46231fd2 100644
--- a/airflow-core/src/airflow/stats.py
+++ b/airflow-core/src/airflow/stats.py
@@ -18,6 +18,7 @@
from __future__ import annotations
import logging
+import os
import socket
from collections.abc import Callable
from typing import TYPE_CHECKING
@@ -34,14 +35,42 @@ log = logging.getLogger(__name__)
class _Stats(type):
factory: Callable
instance: StatsLogger | NoStatsLogger | None = None
+ _instance_pid: int | None = None
def __getattr__(cls, name: str) -> str:
+ # When using OpenTelemetry, some subprocesses are short-lived and
+ # often exit before flushing any metrics.
+ #
+ # The solution is to register a hook that performs a force flush at exit.
+ # The atexit hook is registered when initializing the instance.
+ #
+ # The instance gets initialized once per process. If a process is forked,
+ # the new subprocess will inherit the already initialized instance of the parent process.
+ #
+ # Store the instance pid so that it can be compared with the current pid
+ # to decide whether the instance needs to be initialized again.
+ #
+ # So far, all forks reset their state to remove anything inherited from the parent,
+ # but in the future that might not always be true.
+ current_pid = os.getpid()
+ if cls.instance and cls._instance_pid != current_pid:
+ log.info(
+ "Stats instance was created in PID %s but accessed in PID %s.
Re-initializing.",
+ cls._instance_pid,
+ current_pid,
+ )
+ # Setting the instance to None will force re-initialization.
+ cls.instance = None
+ cls._instance_pid = None
+
if not cls.instance:
try:
cls.instance = cls.factory()
+ cls._instance_pid = current_pid
except (socket.gaierror, ImportError) as e:
log.error("Could not configure StatsClient: %s, using
NoStatsLogger instead.", e)
cls.instance = NoStatsLogger()
+ cls._instance_pid = current_pid
return getattr(cls.instance, name)
def __init__(cls, *args, **kwargs) -> None:
diff --git a/airflow-core/tests/integration/otel/test_otel.py b/airflow-core/tests/integration/otel/test_otel.py
index 51f70c51291..944f350c6e1 100644
--- a/airflow-core/tests/integration/otel/test_otel.py
+++ b/airflow-core/tests/integration/otel/test_otel.py
@@ -45,6 +45,7 @@ from tests_common.test_utils.otel_utils import (
assert_span_name_belongs_to_root_span,
assert_span_not_in_children_spans,
dump_airflow_metadata_db,
+ extract_metrics_from_output,
extract_spans_from_output,
get_parent_child_dict,
)
@@ -193,6 +194,15 @@ def check_ti_state_and_span_status(task_id: str, run_id: str, state: str, span_s
)
+def check_metrics_exist(output: str, metrics_to_check: list[str]):
+ # Get a list of lines from the captured output.
+ output_lines = output.splitlines()
+
+ metrics_dict = extract_metrics_from_output(output_lines)
+
+ assert set(metrics_to_check).issubset(metrics_dict.keys())
+
+
def check_spans_with_continuance(output: str, dag: DAG, continuance_for_t1: bool = True):
# Get a list of lines from the captured output.
output_lines = output.splitlines()
@@ -769,6 +779,100 @@ class TestOtelIntegration:
except Exception as ex:
log.error("Could not delete leftover control file '%s', error:
'%s'.", self.control_file, ex)
+ def dag_execution_for_testing_metrics(self, capfd):
+ # Metrics.
+ os.environ["AIRFLOW__METRICS__OTEL_ON"] = "True"
+ os.environ["AIRFLOW__METRICS__OTEL_HOST"] = "breeze-otel-collector"
+ os.environ["AIRFLOW__METRICS__OTEL_PORT"] = "4318"
+ os.environ["AIRFLOW__METRICS__OTEL_INTERVAL_MILLISECONDS"] = "5000"
+
+ if self.use_otel != "true":
+ os.environ["AIRFLOW__METRICS__OTEL_DEBUGGING_ON"] = "True"
+
+ celery_worker_process = None
+ scheduler_process = None
+ apiserver_process = None
+ try:
+ # Start the processes here and not as fixtures or in a common setup,
+ # so that the test can capture their output.
+ celery_worker_process, scheduler_process, apiserver_process = self.start_worker_and_scheduler1()
+
+ dag_id = "otel_test_dag"
+
+ assert len(self.dags) > 0
+ dag = self.dags[dag_id]
+
+ assert dag is not None
+
+ run_id = unpause_trigger_dag_and_get_run_id(dag_id=dag_id)
+
+ # Skip the span_status check.
+ wait_for_dag_run_and_check_span_status(
+ dag_id=dag_id, run_id=run_id, max_wait_time=90, span_status=None
+ )
+
+ # The ti span_status is updated while processing the executor events,
+ # which is after the dag_run state has been updated.
+ time.sleep(10)
+
+ task_dict = dag.task_dict
+ task_dict_ids = task_dict.keys()
+
+ for task_id in task_dict_ids:
+ # Skip the span_status check.
+ check_ti_state_and_span_status(
+ task_id=task_id, run_id=run_id, state=State.SUCCESS, span_status=None
+ )
+
+ print_ti_output_for_dag_run(dag_id=dag_id, run_id=run_id)
+ finally:
+ # Terminate the processes.
+ celery_worker_process.terminate()
+ celery_worker_process.wait()
+
+ celery_status = celery_worker_process.poll()
+ assert celery_status is not None, (
+ "The celery worker process status is None, which means that it
hasn't terminated as expected."
+ )
+
+ scheduler_process.terminate()
+ scheduler_process.wait()
+
+ scheduler_status = scheduler_process.poll()
+ assert scheduler_status is not None, (
+ "The scheduler_1 process status is None, which means that it
hasn't terminated as expected."
+ )
+
+ apiserver_process.terminate()
+ apiserver_process.wait()
+
+ apiserver_status = apiserver_process.poll()
+ assert apiserver_status is not None, (
+ "The apiserver process status is None, which means that it
hasn't terminated as expected."
+ )
+
+ out, err = capfd.readouterr()
+ log.info("out-start --\n%s\n-- out-end", out)
+ log.info("err-start --\n%s\n-- err-end", err)
+
+ return out, dag
+
+ def test_export_metrics_during_process_shutdown(
+ self, monkeypatch, celery_worker_env_vars, capfd, session
+ ):
+ out, dag = self.dag_execution_for_testing_metrics(capfd)
+
+ if self.use_otel != "true":
+ # Test the metrics from the output.
+ metrics_to_check = [
+ "airflow.ti_successes",
+ "airflow.operator_successes",
+ "airflow.executor.running_tasks",
+ "airflow.executor.queued_tasks",
+ "airflow.executor.open_slots",
+ ]
+ check_metrics_exist(output=out, metrics_to_check=metrics_to_check)
+
@pytest.mark.execution_timeout(90)
def test_dag_execution_succeeds(self, monkeypatch, celery_worker_env_vars, capfd, session):
"""The same scheduler will start and finish the dag processing."""
diff --git a/airflow-core/tests/unit/core/test_otel_logger.py b/airflow-core/tests/unit/core/test_otel_logger.py
index 43d30ff9d07..db08fbd204e 100644
--- a/airflow-core/tests/unit/core/test_otel_logger.py
+++ b/airflow-core/tests/unit/core/test_otel_logger.py
@@ -17,6 +17,10 @@
from __future__ import annotations
import logging
+import os
+import pathlib
+import subprocess
+import sys
import time
from unittest import mock
@@ -32,8 +36,12 @@ from airflow.metrics.otel_logger import (
_generate_key_name,
_is_up_down_counter,
full_name,
+ get_otel_logger,
)
from airflow.metrics.validators import BACK_COMPAT_METRIC_NAMES, MetricNameLengthExemptionWarning
+from airflow.stats import Stats
+
+from tests_common.test_utils.config import conf_vars
INVALID_STAT_NAME_CASES = [
(None, "can not be None"),
@@ -302,3 +310,50 @@ class TestOtelMetrics:
assert timer.duration == expected_value
assert mock_time.call_count == 2
self.meter.get_meter().create_gauge.assert_called_once_with(name=full_name(name))
+
+ def test_atexit_flush_on_process_exit(self):
+ """
+ Run a process that initializes a logger, creates a stat and then exits.
+
+ The logger initialization registers an atexit hook.
+ Test that the hook runs and flushes the created stat at shutdown.
+ """
+ test_module_name = "unit.core.test_otel_logger"
+ function_call_str = f"import {test_module_name} as m;
m.mock_service_run()"
+
+ # pytest adds 'airflow-core/tests' to the path and makes the package 'unit.core' importable.
+ # The subprocess doesn't inherit it, so to make the package importable,
+ # the current tests directory needs to be injected into 'PYTHONPATH'.
+ #
+ # Get 'airflow-core/tests' and add it to the env copy that is passed to the subprocess.
+ tests_dir = str(pathlib.Path(__file__).resolve().parents[2])
+ current_env = os.environ.copy()
+ current_env["PYTHONPATH"] = tests_dir + os.pathsep +
current_env.get("PYTHONPATH", "")
+
+ proc = subprocess.run(
+ [sys.executable, "-c", function_call_str],
+ check=False,
+ env=current_env,
+ capture_output=True,
+ text=True,
+ timeout=20,
+ )
+
+ assert proc.returncode == 0, f"Process failed\nstdout:\n{proc.stdout}\nstderr:\n{proc.stderr}"
+
+ assert "my_test_stat" in proc.stdout, (
+ "Expected the metric name to be present in the stdout but it
wasn't.\n"
+ f"stdout:\n{proc.stdout}\n"
+ f"stderr:\n{proc.stderr}"
+ )
+
+
+def mock_service_run():
+ with conf_vars(
+ {
+ ("metrics", "otel_on"): "True",
+ ("metrics", "otel_debugging_on"): "True",
+ }
+ ):
+ logger = get_otel_logger(Stats)
+ logger.incr("my_test_stat")
diff --git a/devel-common/src/tests_common/test_utils/otel_utils.py b/devel-common/src/tests_common/test_utils/otel_utils.py
index 3e8e99ac5bf..a2f53b47381 100644
--- a/devel-common/src/tests_common/test_utils/otel_utils.py
+++ b/devel-common/src/tests_common/test_utils/otel_utils.py
@@ -19,6 +19,8 @@ from __future__ import annotations
import json
import logging
import pprint
+from collections import defaultdict
+from typing import Literal
from sqlalchemy import inspect
@@ -94,14 +96,21 @@ def clean_task_lines(lines: list) -> list:
return cleaned_lines
-def extract_spans_from_output(output_lines: list):
+def _extract_obj_from_output(output_lines: list[str], kind: Literal["spans"] | Literal["metrics"]):
"""
- For a given list of ConsoleSpanExporter output lines, it extracts the json spans and creates two dictionaries.
+ Used to extract spans or metrics from the output.
- :return: root spans dict (key: root_span_id - value: root_span), spans dict (key: span_id - value: span)
+ Parameters
+ ----------
+ :param output_lines: The captured stdout split into lines.
+ :param kind: Which json type to extract from the output.
"""
+ assert kind in ("spans", "metrics")
+
span_dict = {}
root_span_dict = {}
+ metric_dict: dict[str, list[dict]] = defaultdict(list)
+
total_lines = len(output_lines)
index = 0
output_lines = clean_task_lines(output_lines)
@@ -133,23 +142,48 @@ def extract_spans_from_output(output_lines: list):
# Create a formatted json string and then convert the string to a python dict.
json_str = "\n".join(json_lines)
try:
- span = json.loads(json_str)
- span_id = span["context"]["span_id"]
- span_dict[span_id] = span
-
- if span["parent_id"] is None:
- # This is a root span, add it to the root_span_map as well.
- root_span_id = span["context"]["span_id"]
- root_span_dict[root_span_id] = span
-
- except json.JSONDecodeError as e:
- log.error("Failed to parse JSON span: %s", e)
- log.error("Failed JSON string:")
- log.error(json_str)
+ obj = json.loads(json_str)
+ except json.JSONDecodeError:
+ log.error("Failed to parse JSON: %s", json_str)
+ index += 1
+ continue
+
+ if kind == "spans":
+ if "context" not in obj or "resource_metrics" in obj:
+ index += 1
+ continue
+ span_id = obj["context"]["span_id"]
+ span_dict[span_id] = obj
+ if obj["parent_id"] is None:
+ root_span_dict[span_id] = obj
+ else: # kind == "metrics"
+ if "resource_metrics" not in obj:
+ index += 1
+ continue
+ for res in obj["resource_metrics"]:
+ for scope in res.get("scope_metrics", []):
+ for metric in scope.get("metrics", []):
+ metric_dict[metric["name"]].append(metric)
+
else:
index += 1
- return root_span_dict, span_dict
+ return (root_span_dict, span_dict) if kind == "spans" else metric_dict
+
+
+def extract_spans_from_output(output_lines: list):
+ """
+ For a given list of output lines, it extracts the json spans and creates two dictionaries.
+
+ :return: root spans dict (key: root_span_id - value: root_span), spans dict (key: span_id - value: span)
+ """
+ return _extract_obj_from_output(output_lines, "spans")
+
+
+def extract_metrics_from_output(output_lines: list):
+ """For a given list of output lines, it extracts the json metrics and
creates a dictionary."""
+
+ return _extract_obj_from_output(output_lines, "metrics")
def get_id_for_a_given_name(span_dict: dict, span_name: str):
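For reference, the new "metrics" branch of _extract_obj_from_output walks resource_metrics -> scope_metrics -> metrics in the JSON printed by the OTel console metric exporter and groups the inner entries by name. A small sketch with made-up values (only the field names are taken from the parsing code above; the concrete metric is illustrative):

    from collections import defaultdict

    # Illustrative object, as it would look after json.loads() on one block of output.
    sample = {
        "resource_metrics": [
            {
                "scope_metrics": [
                    {"metrics": [{"name": "airflow.ti_successes", "data": {}}]}
                ]
            }
        ]
    }

    # Same grouping the helper performs: metric name -> list of metric dicts.
    metric_dict: dict[str, list[dict]] = defaultdict(list)
    for res in sample["resource_metrics"]:
        for scope in res.get("scope_metrics", []):
            for metric in scope.get("metrics", []):
                metric_dict[metric["name"]].append(metric)

    assert "airflow.ti_successes" in metric_dict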