This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new ff77bd2f095 Fix OTel metrics lost in forked task processes (#64703)
ff77bd2f095 is described below

commit ff77bd2f095ae3169d04024ecf8c444c3123a973
Author: Michael Black <[email protected]>
AuthorDate: Sat Apr 4 07:26:13 2026 -0600

    Fix OTel metrics lost in forked task processes (#64703)
    
    Reset the OTel SDK's Once() guard on _METER_PROVIDER_SET_ONCE before
    calling set_meter_provider() in get_otel_logger(). When a forked child
    process re-initializes Stats (detected via PID mismatch in stats.py),
    the inherited Once._done = True flag prevents the new MeterProvider from
    being registered. The child falls back to the parent's stale provider
    whose PeriodicExportingMetricReader thread is dead after fork, causing
    task-level metrics like ti.finish to be silently dropped.
    
    The fix resets _done and _METER_PROVIDER before each set_meter_provider()
    call. On first initialization (no fork), _done is already False so this
    is a no-op. On re-initialization after fork, it allows the new provider
    to be set correctly.
    
    Closes: #64690
---
 .../observability/metrics/otel_logger.py           | 16 ++++++++
 .../observability/metrics/test_otel_logger.py      | 43 ++++++++++++++++++++++
 2 files changed, 59 insertions(+)

diff --git a/shared/observability/src/airflow_shared/observability/metrics/otel_logger.py b/shared/observability/src/airflow_shared/observability/metrics/otel_logger.py
index 4b5174a3bf3..14726e3ecc0 100644
--- a/shared/observability/src/airflow_shared/observability/metrics/otel_logger.py
+++ b/shared/observability/src/airflow_shared/observability/metrics/otel_logger.py
@@ -433,6 +433,22 @@ def get_otel_logger(
         )
         readers.append(export_to_console)
 
+    # Reset the OTel SDK's Once() guard so set_meter_provider() can succeed.
+    # This is necessary when get_otel_logger() is called after a process fork:
+    # the parent's _METER_PROVIDER_SET_ONCE._done = True is inherited by the child,
+    # causing set_meter_provider() to silently fail with "Overriding of current
+    # MeterProvider is not allowed". The child then uses the parent's stale provider
+    # whose PeriodicExportingMetricReader thread is dead after fork.
+    # On first call (no fork), _done is already False so this is a no-op.
+    # See: https://github.com/apache/airflow/issues/64690
+    try:
+        import opentelemetry.metrics._internal as _metrics_internal
+
+        _metrics_internal._METER_PROVIDER_SET_ONCE._done = False
+        _metrics_internal._METER_PROVIDER = None
+    except (ImportError, AttributeError):
+        pass
+
     metrics.set_meter_provider(
         MeterProvider(
             resource=resource,
diff --git a/shared/observability/tests/observability/metrics/test_otel_logger.py b/shared/observability/tests/observability/metrics/test_otel_logger.py
index c27c3729969..f7b348354d7 100644
--- a/shared/observability/tests/observability/metrics/test_otel_logger.py
+++ b/shared/observability/tests/observability/metrics/test_otel_logger.py
@@ -442,7 +442,50 @@ class TestOtelMetrics:
             f"stderr:\n{proc.stderr}"
         )
 
+    def test_reinit_after_fork_exports_metrics(self):
+        """Calling get_otel_logger() twice (simulating post-fork re-init) should still export metrics.
+
+        Reproduces https://github.com/apache/airflow/issues/64690: the OTel SDK's Once()
+        guard on set_meter_provider() survives fork, preventing the child from setting a
+        fresh MeterProvider. The fix resets the guard before each set_meter_provider() call.
+        """
+        test_module_name = "tests.observability.metrics.test_otel_logger"
+        function_call_str = f"import {test_module_name} as m; m.mock_service_run_reinit()"
+
+        proc = subprocess.run(
+            [sys.executable, "-c", function_call_str],
+            check=False,
+            env=os.environ.copy(),
+            capture_output=True,
+            text=True,
+            timeout=20,
+        )
+
+        assert proc.returncode == 0, f"Process failed\nstdout:\n{proc.stdout}\nstderr:\n{proc.stderr}"
+
+        assert "post_fork_stat" in proc.stdout, (
+            "Expected 'post_fork_stat' in stdout after re-initialization but it wasn't found. "
+            "This suggests set_meter_provider() failed due to the Once() guard.\n"
+            f"stdout:\n{proc.stdout}\n"
+            f"stderr:\n{proc.stderr}"
+        )
+
 
 def mock_service_run():
     logger = get_otel_logger(debug=True)
     logger.incr("my_test_stat")
+
+
+def mock_service_run_reinit():
+    """Simulate re-initialization after fork by calling get_otel_logger() twice.
+
+    The first call sets the global MeterProvider and the Once() guard.
+    The second call simulates what happens in a forked child: stats.py detects
+    a PID mismatch and calls the factory again. Without the fix, the second
+    set_meter_provider() silently fails and the child uses a stale provider.
+    """
+    # First init — sets Once._done = True
+    get_otel_logger(debug=True)
+    # Second init — simulates post-fork re-initialization
+    logger = get_otel_logger(debug=True)
+    logger.incr("post_fork_stat")

Reply via email to