GitHub user IanMoroney added a comment to the discussion: Airflow tasks hang in 'Running' state

Sorry to necro an old thread here, but this may help others:

In version 3.19.0 (released 2025-11-21) of dd-trace-py, our pods were getting stuck in a deadlock in forked sub-processes caused by the Datadog trace library. Reverting to 3.18.1 and pinning to that version solved the issue.
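
For pip-based installs, the pin might look like this (adjust for poetry/uv or constraint files as needed):

```
# requirements.txt -- pin dd-trace-py (PyPI package "ddtrace") until the
# fork deadlock is resolved upstream
ddtrace==3.18.1
```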

https://github.com/DataDog/dd-trace-py/blob/b3855747420b6f074418c78fe267721270281b81/ddtrace/internal/writer/writer.py#L857C1-L857C37
```
        # Stop the trace exporter worker
        self._exporter.stop_worker()
```

This is the line where we saw the pods getting stuck; it lives in a method of the NativeWriter class.

The 3.19 release bumped libdatadog to a new major version (24.0.0) and made NativeWriter the default writer in the Nov 13th commits.

```
# ddtrace/internal/writer/writer.py
def create_trace_writer(...) -> TraceWriter:
    if _use_log_writer():
        return LogWriter()

    if config._trace_writer_native:  # DEFAULT IS NOW TRUE IN 3.19.0
        return NativeWriter(...)     # ← This has the fork bug
    else:
        return AgentWriter(...)      # ← This was used before
```
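
If you want to confirm which path your own process takes, one option is to read the flag that create_trace_writer() branches on. A sketch only: `_trace_writer_native` is internal configuration (its name is taken from the snippet above, not a public API), so it may move or disappear between releases, hence the getattr guard:

```
import ddtrace

# `_trace_writer_native` is the internal flag from the snippet above, not a
# public API; it may be renamed between releases, so fall back to None.
print("native writer enabled:",
      getattr(ddtrace.config, "_trace_writer_native", None))
```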

The NativeWriter is implemented in Rust on top of tokio. This is the underlying Rust implementation:
https://github.com/DataDog/libdatadog/blob/3445414c9ba4fefc76be46cf7e2f998986592892/libdd-data-pipeline/src/trace_exporter/mod.rs#L302

```
    pub fn stop_worker(&self) {
        let runtime = self.runtime.lock_or_panic().take();
        if let Some(ref rt) = runtime {
            // Stop workers to save their state
            let mut workers = self.workers.lock_or_panic();
            rt.block_on(async {
                let _ = workers.info.pause().await;
                if let Some(stats_worker) = &mut workers.stats {
                    let _ = stats_worker.pause().await;
                };
                if let Some(telemetry_worker) = &mut workers.telemetry {
                    let _ = telemetry_worker.pause().await;
                };
            });
        }
        // When the info fetcher is paused, the trigger channel keeps a reference
        // to the runtime's IoStack as a waker. This prevents the IoStack from
        // being dropped when shutting down the runtime. By manually sending a
        // message to the trigger channel we trigger the waker, releasing the
        // reference to the IoStack. Finally we drain the channel to avoid
        // triggering a fetch when the info fetcher is restarted.
        if let PausableWorker::Paused { worker } = &mut self.workers.lock_or_panic().info {
            self.info_response_observer.manual_trigger();
            worker.drain();
        }
        drop(runtime);
    }
```

Both `self.runtime` and `self.workers` are protected behind an `Arc<Mutex<>>`:
```
pub struct TraceExporter {
    runtime: Arc<Mutex<Option<Arc<Runtime>>>>,
    workers: Arc<Mutex<TraceExporterWorkers>>,
}
```
 
Rust's `std::sync::Mutex` (historically built on POSIX `pthread_mutex_t` on Unix systems) is not fork-safe: if another thread holds the lock when `fork()` happens, the child inherits the mutex in its locked state, but the owning thread does not exist in the child, so the next acquire deadlocks. We also have reason to believe that tokio itself is not fork-safe:
https://github.com/tokio-rs/tokio/issues/1541
https://github.com/tokio-rs/tokio/issues/4301
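
The failure mode is easy to reproduce in plain Python, with no Datadog involved: a lock held by a background thread at fork() time is copied into the child in its locked state, but the thread that owns it is not, so the child's next acquire blocks forever. A minimal sketch (the timeout is only there so the demo terminates):

```
import os
import threading
import time

lock = threading.Lock()

def hold_lock():
    # A background thread takes the lock and sits on it across the fork.
    with lock:
        time.sleep(10)

threading.Thread(target=hold_lock, daemon=True).start()
time.sleep(0.1)  # make sure the background thread owns the lock first

pid = os.fork()
if pid == 0:
    # Child: only the forking thread was copied, but the lock state came
    # along as "held". Without the timeout, this acquire would block
    # forever -- the same shape as stop_worker() blocking on
    # lock_or_panic() in the forked pods.
    print("child acquired lock:", lock.acquire(timeout=2))  # -> False
    os._exit(0)
os.waitpid(pid, 0)
```

CPython even ships os.register_at_fork() so pure-Python libraries can reset their locks around fork(); a native runtime that doesn't reinitialize its state after fork will hang exactly like this.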

TL;DR: there may be third-party, non-fork-safe observability libraries or add-ins (such as the AWS one earlier in this thread, or the Datadog one here) that deadlock in forked sub-processes, which makes it look as if Airflow itself spawns "stuck" processes. If in doubt, check anything that might be observing these tasks as they spawn.
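
If you need to see where a stuck task process is blocked, one low-effort option (a sketch, assuming you can add a couple of lines to the worker's startup) is the stdlib faulthandler module, which dumps every thread's Python stack on a signal:

```
import faulthandler
import signal

# Register early in the worker process; afterwards `kill -USR1 <pid>` on a
# stuck task prints every thread's Python traceback to stderr.
faulthandler.register(signal.SIGUSR1, all_threads=True)
```

A hang inside native code still shows up this way: the dump points at the Python frame that called into it, like the stop_worker() line above.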

GitHub link: 
https://github.com/apache/airflow/discussions/19587#discussioncomment-15135201
