This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new ff77bd2f095 Fix OTel metrics lost in forked task processes (#64703)
ff77bd2f095 is described below
commit ff77bd2f095ae3169d04024ecf8c444c3123a973
Author: Michael Black <[email protected]>
AuthorDate: Sat Apr 4 07:26:13 2026 -0600
Fix OTel metrics lost in forked task processes (#64703)
Reset the OTel SDK's Once() guard on _METER_PROVIDER_SET_ONCE before
calling set_meter_provider() in get_otel_logger(). When a forked child
process re-initializes Stats (detected via PID mismatch in stats.py),
the inherited Once._done = True flag prevents the new MeterProvider from
being registered. The child falls back to the parent's stale provider
whose PeriodicExportingMetricReader thread is dead after fork, causing
task-level metrics like ti.finish to be silently dropped.
The fix resets _done and _METER_PROVIDER before each set_meter_provider()
call. On first initialization (no fork), _done is already False so this
is a no-op. On re-initialization after fork, it allows the new provider
to be set correctly.
Closes: #64690
---
.../observability/metrics/otel_logger.py | 16 ++++++++
.../observability/metrics/test_otel_logger.py | 43 ++++++++++++++++++++++
2 files changed, 59 insertions(+)
diff --git a/shared/observability/src/airflow_shared/observability/metrics/otel_logger.py b/shared/observability/src/airflow_shared/observability/metrics/otel_logger.py
index 4b5174a3bf3..14726e3ecc0 100644
--- a/shared/observability/src/airflow_shared/observability/metrics/otel_logger.py
+++ b/shared/observability/src/airflow_shared/observability/metrics/otel_logger.py
@@ -433,6 +433,22 @@ def get_otel_logger(
)
readers.append(export_to_console)
+ # Reset the OTel SDK's Once() guard so set_meter_provider() can succeed.
+ # This is necessary when get_otel_logger() is called after a process fork:
+ # the parent's _METER_PROVIDER_SET_ONCE._done = True is inherited by the child,
+ # causing set_meter_provider() to silently fail with "Overriding of current
+ # MeterProvider is not allowed". The child then uses the parent's stale provider
+ # whose PeriodicExportingMetricReader thread is dead after fork.
+ # On first call (no fork), _done is already False so this is a no-op.
+ # See: https://github.com/apache/airflow/issues/64690
+ try:
+ import opentelemetry.metrics._internal as _metrics_internal
+
+ _metrics_internal._METER_PROVIDER_SET_ONCE._done = False
+ _metrics_internal._METER_PROVIDER = None
+ except (ImportError, AttributeError):
+ pass
+
metrics.set_meter_provider(
MeterProvider(
resource=resource,
diff --git a/shared/observability/tests/observability/metrics/test_otel_logger.py b/shared/observability/tests/observability/metrics/test_otel_logger.py
index c27c3729969..f7b348354d7 100644
--- a/shared/observability/tests/observability/metrics/test_otel_logger.py
+++ b/shared/observability/tests/observability/metrics/test_otel_logger.py
@@ -442,7 +442,50 @@ class TestOtelMetrics:
f"stderr:\n{proc.stderr}"
)
+ def test_reinit_after_fork_exports_metrics(self):
+ """Calling get_otel_logger() twice (simulating post-fork re-init) should still export metrics.
+
+ Reproduces https://github.com/apache/airflow/issues/64690: the OTel SDK's Once()
+ guard on set_meter_provider() survives fork, preventing the child from setting a
+ fresh MeterProvider. The fix resets the guard before each set_meter_provider() call.
+ """
+ test_module_name = "tests.observability.metrics.test_otel_logger"
+ function_call_str = f"import {test_module_name} as m; m.mock_service_run_reinit()"
+
+ proc = subprocess.run(
+ [sys.executable, "-c", function_call_str],
+ check=False,
+ env=os.environ.copy(),
+ capture_output=True,
+ text=True,
+ timeout=20,
+ )
+
+ assert proc.returncode == 0, f"Process failed\nstdout:\n{proc.stdout}\nstderr:\n{proc.stderr}"
+
+ assert "post_fork_stat" in proc.stdout, (
+ "Expected 'post_fork_stat' in stdout after re-initialization but it wasn't found. "
+ "This suggests set_meter_provider() failed due to the Once() guard.\n"
+ f"stdout:\n{proc.stdout}\n"
+ f"stderr:\n{proc.stderr}"
+ )
+
def mock_service_run():
logger = get_otel_logger(debug=True)
logger.incr("my_test_stat")
+
+
+def mock_service_run_reinit():
+ """Simulate re-initialization after fork by calling get_otel_logger() twice.
+
+ The first call sets the global MeterProvider and the Once() guard.
+ The second call simulates what happens in a forked child: stats.py detects
+ a PID mismatch and calls the factory again. Without the fix, the second
+ set_meter_provider() silently fails and the child uses a stale provider.
+ """
+ # First init — sets Once._done = True
+ get_otel_logger(debug=True)
+ # Second init — simulates post-fork re-initialization
+ logger = get_otel_logger(debug=True)
+ logger.incr("post_fork_stat")