Saddala opened a new issue, #64959:
URL: https://github.com/apache/airflow/issues/64959
### Under which category would you file this issue?
Airflow Core
### Apache Airflow version
3.1.8; also reproducible on the current latest 3.2.0 (v3-2-test)
### What happened and how to reproduce it?
The DAG Processor Job crashes with **ValueError: write to closed file**
inside **structlog.BytesLogger._write()** when **terminate_orphan_processes**
kills orphan DAG-file subprocesses during a bundle refresh cycle. The full
DagProcessorJobRunner is torn down and must be restarted.
**The crash sequence:**
1. DagProcessorJobRunner._execute
2. DagFileProcessorManager._run_parsing_loop
3. _refresh_dag_bundles → handle_removed_files
4. terminate_orphan_processes
5. DagFileProcessorProcess.kill(SIGKILL)
6. _service_subprocess (the shared selector delivers a stale socket event)
7. process_log_messages_from_subprocess generator
8. target.log(level, msg, **event)
9. structlog.BytesLogger._write() → **ValueError: write to closed file**
This is a race condition between two sequential but incorrectly ordered
operations in **terminate_orphan_processes**:
1. **processor.kill(signal.SIGKILL)** sends SIGKILL, loops
_service_subprocess() until exit code is detected, then returns.
2. **processor.logger_filehandle.close()** closes the BytesLogger file
handle.
**kill()** returns as soon as the child process's exit code is confirmed. It
does not guarantee all buffered data in the subprocess's log sockets has been
fully drained. The subprocess's sockets remain registered in the manager's
shared selectors.BaseSelector. A subsequent **selector.select()** call from the
main parsing loop or the next kill cycle delivers those remaining socket
events. Their callbacks call **process_log_messages_from_subprocess**, which
calls **target.log() → BytesLogger._write()** on the already-closed file
handle, raising **ValueError** and killing the entire Job.
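The race can be reproduced outside Airflow with a few lines of stdlib code (all names below are stand-ins, not Airflow or structlog APIs): a socket that is still registered in a selector after its log file handle has been closed continues to deliver its buffered data, and writing that data to the closed binary handle raises exactly this ValueError.

```python
import selectors
import socket
import tempfile

# Stand-ins for the manager's shared selector, a subprocess log socket,
# and processor.logger_filehandle (hypothetical names, not Airflow code).
sel = selectors.DefaultSelector()
parent_sock, child_sock = socket.socketpair()
log_handle = tempfile.TemporaryFile("wb")

sel.register(parent_sock, selectors.EVENT_READ)
child_sock.sendall(b"buffered log line\n")  # data in flight when SIGKILL lands
child_sock.close()                          # the subprocess is gone

log_handle.close()  # kill() has returned, so the handle is closed first

# Next parsing-loop iteration: the stale socket event is still delivered.
events = sel.select(timeout=0)
err = None
for key, _ in events:
    data = key.fileobj.recv(1024)
    try:
        log_handle.write(data)  # what BytesLogger._write() effectively does
    except ValueError as exc:
        err = exc  # the same "write to closed file" error as in the issue

sel.unregister(parent_sock)
parent_sock.close()
```

The stale event fires even though both the peer socket and the log handle are already closed, which is the heart of the race.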
### What you think should happen instead?
Killing an orphan subprocess should be safe regardless of buffered log data
remaining in its sockets. The **ValueError** must not propagate out of
**process_log_messages_from_subprocess**. At worst, residual log lines from a
dead subprocess should be silently dropped; the DAG Processor Job must survive.
### Operating System
Amazon Linux 2 Version 2
### Deployment
Other
### Apache Airflow Provider(s)
_No response_
### Versions of Apache Airflow Providers
apache-airflow-providers-amazon==9.22.0
apache-airflow-providers-cncf-kubernetes==10.12.3
apache-airflow-providers-common-compat==1.13.0
apache-airflow-providers-common-io==1.7.1
apache-airflow-providers-common-sql==1.30.4
apache-airflow-providers-fab==3.2.0
apache-airflow-providers-git==0.2.2
apache-airflow-providers-http==6.0.0
apache-airflow-providers-postgres==6.5.3
apache-airflow-providers-smtp==2.4.2
apache-airflow-providers-standard==1.11.0
### Official Helm Chart version
Not Applicable
### Kubernetes Version
Not Applicable
### Helm Chart configuration
_No response_
### Docker Image customizations
Not Applicable
### Anything else?
**Root Cause Analysis:**
There are two interacting flaws:
**Flaw 1 — airflow-core/src/airflow/dag_processing/manager.py,
terminate_orphan_processes**
Current code (main branch, also 3.1.8):
```python
def terminate_orphan_processes(self, present: set[DagFileInfo]):
    """Stop processors that are working on deleted files."""
    for file in list(self._processors.keys()):
        if file not in present:
            processor = self._processors.pop(file, None)
            if not processor:
                continue
            file_name = str(file.rel_path)
            self.log.warning("Stopping processor for %s", file_name)
            Stats.decr(
                "dag_processing.processes",
                tags={"file_path": file_name, "action": "stop"},
            )
            processor.kill(signal.SIGKILL)
            processor.logger_filehandle.close()  # ← BUG: closes the handle before
                                                 #   draining remaining socket events
            self._file_stats.pop(file, None)
```
All DagFileProcessorProcess instances share the manager's
selectors.BaseSelector — this selector is created on DagFileProcessorManager
(field selector, manager.py line 174) and passed via selector=self.selector to
every
DagFileProcessorProcess.start() call (manager.py _create_process). After
logger_filehandle.close(), those sockets remain registered in the shared
selector. The next selector.select() call delivers stale events, whose
callbacks reach
BytesLogger._write() on the closed handle.
The normal cleanup path — _on_socket_closed in WatchedSubprocess —
unregisters a socket only when EOF is read from it. Under SIGKILL, the process
may die before all buffered data has been read, meaning EOF is never delivered
and
_on_socket_closed is never called before the file handle is already closed.
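One possible ordering fix for Flaw 1 (a sketch using my own hypothetical helper name `close_orphan_safely`, not a patch against the actual Airflow code) is to unregister and close the orphan's sockets from the shared selector *before* closing the log file handle, so a later `selector.select()` can never again deliver an event that reaches the closed handle:

```python
import selectors
import socket
import tempfile

def close_orphan_safely(selector, sockets, log_handle):
    """Sketch: drop the orphan's sockets from the shared selector first,
    then close the log handle. Assumes the caller has already confirmed
    the subprocess's exit code (as kill() does in the issue)."""
    for sock in sockets:
        try:
            selector.unregister(sock)
        except KeyError:
            pass  # already removed by the normal EOF path (_on_socket_closed)
        sock.close()
    log_handle.close()

# Demo with stand-in objects: buffered data is discarded, no stale event fires.
sel = selectors.DefaultSelector()
a, b = socket.socketpair()
sel.register(a, selectors.EVENT_READ)
b.sendall(b"residual log line\n")  # undrained data at SIGKILL time
b.close()

handle = tempfile.TemporaryFile("wb")
close_orphan_safely(sel, [a], handle)
assert sel.select(timeout=0) == []  # nothing left to deliver
```

The trade-off is that residual buffered log lines are dropped rather than flushed, which matches the "at worst, silently dropped" behavior argued for above.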
**Flaw 2 — task-sdk/src/airflow/sdk/execution_time/supervisor.py,
process_log_messages_from_subprocess**
Current code (main branch, also 3.1.8):
```python
def process_log_messages_from_subprocess(
    loggers: tuple[FilteringBoundLogger, ...],
) -> Generator[None, bytes | bytearray, None]:
    ...
    while True:
        line = yield
        ...
        if level := NAME_TO_LEVEL.get(event.pop("level")):
            msg = event.pop("event", None)
            for target in loggers:
                target.log(level, msg, **event)  # ← no guard against ValueError
                                                 #   from a closed BytesLogger
```
No protection exists against ValueError raised when the underlying
BytesLogger file handle has been closed. The exception propagates all the way
through the supervisor call stack and kills the Job.
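For Flaw 2, a narrow guard around `target.log` would be enough to keep the Job alive. The sketch below is a simplified stand-in (the JSON parsing and `NAME_TO_LEVEL` table are abbreviated, and this is my proposal, not the merged fix):

```python
import json
import logging

# Abbreviated stand-in for the supervisor's level table.
NAME_TO_LEVEL = {
    "debug": logging.DEBUG,
    "info": logging.INFO,
    "warning": logging.WARNING,
    "error": logging.ERROR,
}

def process_log_messages_guarded(loggers):
    """Sketch of a guarded variant: a ValueError from a closed log handle
    drops the residual line instead of propagating up and killing the
    DagProcessorJobRunner."""
    while True:
        line = yield
        event = json.loads(line)
        if level := NAME_TO_LEVEL.get(event.pop("level", "")):
            msg = event.pop("event", None)
            for target in loggers:
                try:
                    target.log(level, msg, **event)
                except ValueError:
                    # The BytesLogger handle was closed after SIGKILL;
                    # swallow this residual line and keep going.
                    continue

# Usage: a logger whose handle is already closed no longer crashes the loop.
class ClosedLogger:
    def log(self, level, msg, **event):
        raise ValueError("write to closed file")

gen = process_log_messages_guarded((ClosedLogger(),))
next(gen)  # prime the generator
gen.send(b'{"level": "info", "event": "residual line"}')  # no exception raised
```

Catching only `ValueError` on the innermost call keeps genuine logging errors from other targets visible while making the orphan-kill path crash-proof.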
**Full Stacktrace:**
```
Traceback (most recent call last):
  File "/usr/local/miniconda/lib/python3.10/site-packages/airflow/jobs/dag_processor_job_runner.py", line 61, in _execute
    self.processor.run()
  File "/usr/local/miniconda/lib/python3.10/site-packages/airflow/dag_processing/manager.py", line 297, in run
    return self._run_parsing_loop()
  File "/usr/local/miniconda/lib/python3.10/site-packages/airflow/dag_processing/manager.py", line 389, in _run_parsing_loop
    self._refresh_dag_bundles(known_files=known_files)
  File "/usr/local/miniconda/lib/python3.10/site-packages/airflow/dag_processing/manager.py", line 637, in _refresh_dag_bundles
    self.handle_removed_files(known_files=known_files)
  File "/usr/local/miniconda/lib/python3.10/site-packages/airflow/dag_processing/manager.py", line 836, in handle_removed_files
    self.terminate_orphan_processes(present=files_set)
  File "/usr/local/miniconda/lib/python3.10/site-packages/airflow/dag_processing/manager.py", line 861, in terminate_orphan_processes
    processor.kill(signal.SIGKILL)
  File "/usr/local/miniconda/lib/python3.10/site-packages/airflow/sdk/execution_time/supervisor.py", line 706, in kill
    exit_code := self._service_subprocess(
  File "/usr/local/miniconda/lib/python3.10/site-packages/airflow/sdk/execution_time/supervisor.py", line 773, in _service_subprocess
    need_more = socket_handler(key.fileobj)
  File "/usr/local/miniconda/lib/python3.10/site-packages/airflow/sdk/execution_time/supervisor.py", line 1723, in cb
    gen.send(line)
  File "/usr/local/miniconda/lib/python3.10/site-packages/airflow/sdk/execution_time/supervisor.py", line 1806, in process_log_messages_from_subprocess
    target.log(level, msg, **event)
  File "/usr/local/miniconda/lib/python3.10/site-packages/structlog/_native.py", line 197, in log
    return self._proxy_to_logger(
  File "/usr/local/miniconda/lib/python3.10/site-packages/structlog/_base.py", line 223, in _proxy_to_logger
    return getattr(self._logger, method_name)(*args, **kw)
  File "/usr/local/miniconda/lib/python3.10/site-packages/structlog/_output.py", line 321, in msg
    self._write(message + b"\n")
ValueError: write to closed file
```
### Are you willing to submit PR?
- [x] Yes I am willing to submit a PR!
### Code of Conduct
- [x] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)