Saddala opened a new issue, #64959:
URL: https://github.com/apache/airflow/issues/64959

   ### Under which category would you file this issue?
   
   Airflow Core
   
   ### Apache Airflow version
   
   3.1.8, and also the current latest 3.2.0 (v3-2-test)
   
   ### What happened and how to reproduce it?
   
   The DAG Processor Job crashes with **ValueError: write to closed file** inside **structlog.BytesLogger._write()** when **terminate_orphan_processes** kills orphan DAG-file subprocesses during a bundle refresh cycle. The full DagProcessorJobRunner is torn down and must be restarted.
   
   **The crash sequence:**
   
   1. DagProcessorJobRunner._execute
   2. DagFileProcessorManager._run_parsing_loop
   3. _refresh_dag_bundles → handle_removed_files
   4. terminate_orphan_processes
   5. DagFileProcessorProcess.kill(SIGKILL)
   6. _service_subprocess (shared selector delivers a stale socket event)
   7. process_log_messages_from_subprocess generator
   8. target.log(level, msg, **event)
   9. structlog.BytesLogger._write() → ValueError: write to closed file
   
   This is a race condition between two incorrectly ordered operations in **terminate_orphan_processes**:
   
     1. **processor.kill(signal.SIGKILL)** sends SIGKILL, loops _service_subprocess() until the child's exit code is detected, then returns.
     2. **processor.logger_filehandle.close()** closes the BytesLogger file handle.
   
   **kill()** returns as soon as the child process's exit code is confirmed. It 
does not guarantee all buffered data in the subprocess's log sockets has been 
fully drained. The subprocess's sockets remain registered in the manager's 
shared selectors.BaseSelector. A subsequent **selector.select()** call from the 
main parsing loop or the next kill cycle delivers those remaining socket 
events. Their callbacks call **process_log_messages_from_subprocess**, which 
calls **target.log() → BytesLogger._write()** on the already-closed file 
handle, raising **ValueError** and killing the entire Job.
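   The race is reproducible with plain sockets and a selector, outside Airflow entirely. The sketch below is an illustrative stand-in (no Airflow APIs; the file and socket objects merely play the roles of logger_filehandle and the subprocess log socket), showing how a stale selector event can hit a handle that was closed first:

```python
import os
import selectors
import socket
import tempfile

sel = selectors.DefaultSelector()
parent, child = socket.socketpair()
sel.register(parent, selectors.EVENT_READ)

# Stand-in for processor.logger_filehandle (a buffered binary file).
fd, path = tempfile.mkstemp()
os.close(fd)
log_file = open(path, "wb")

child.sendall(b"log line buffered before the kill\n")  # subprocess output
child.close()            # the "SIGKILL": the writer disappears
log_file.close()         # terminate_orphan_processes closes the handle first

# ...but `parent` is still registered, so the next select() fires anyway.
err = None
for key, _ in sel.select(timeout=0):
    data = key.fileobj.recv(4096)   # the stale event still carries data
    try:
        log_file.write(data)        # stand-in for BytesLogger._write()
    except ValueError as exc:
        err = exc
print(err)  # write to closed file
sel.close()
parent.close()
os.remove(path)
```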
   
   ### What you think should happen instead?
   
   Killing an orphan subprocess should be safe regardless of any buffered log data remaining in its sockets. The **ValueError** must not propagate out of **process_log_messages_from_subprocess**. At worst, residual log lines from a dead subprocess should be silently dropped; the DAG Processor Job itself must survive.
   
   ### Operating System
   
   Amazon Linux 2 Version 2
   
   ### Deployment
   
   Other
   
   ### Apache Airflow Provider(s)
   
   _No response_
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-amazon==9.22.0
   apache-airflow-providers-cncf-kubernetes==10.12.3
   apache-airflow-providers-common-compat==1.13.0
   apache-airflow-providers-common-io==1.7.1
   apache-airflow-providers-common-sql==1.30.4
   apache-airflow-providers-fab==3.2.0
   apache-airflow-providers-git==0.2.2
   apache-airflow-providers-http==6.0.0
   apache-airflow-providers-postgres==6.5.3
   apache-airflow-providers-smtp==2.4.2
   apache-airflow-providers-standard==1.11.0
   
   ### Official Helm Chart version
   
   Not Applicable
   
   ### Kubernetes Version
   
   Not Applicable
   
   ### Helm Chart configuration
   
   _No response_
   
   ### Docker Image customizations
   
   Not Applicable
   
   ### Anything else?
   
   **Root Cause Analysis:**
   
     There are two interacting flaws:
   
     **Flaw 1 — airflow-core/src/airflow/dag_processing/manager.py, 
terminate_orphan_processes**
   
     Current code (main branch, also 3.1.8):
   
```python
def terminate_orphan_processes(self, present: set[DagFileInfo]):
    """Stop processors that are working on deleted files."""
    for file in list(self._processors.keys()):
        if file not in present:
            processor = self._processors.pop(file, None)
            if not processor:
                continue
            file_name = str(file.rel_path)
            self.log.warning("Stopping processor for %s", file_name)
            Stats.decr("dag_processing.processes", tags={"file_path": file_name, "action": "stop"})
            processor.kill(signal.SIGKILL)
            # ← BUG: closes the handle before remaining socket events are drained
            processor.logger_filehandle.close()
            self._file_stats.pop(file, None)
```
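   A generic sketch of the safer ordering (illustrative only, not the actual Airflow patch; safe_terminate and every name in it are hypothetical): unregister the socket from the shared selector and drain any residual bytes *before* the log handle is closed, so no stale event can ever reach a closed file:

```python
import io
import selectors
import socket

def safe_terminate(sel: selectors.BaseSelector, sock: socket.socket, sink) -> bytes:
    """Hypothetical helper: unregister and drain first, close the sink last."""
    try:
        sel.unregister(sock)          # no further events for this socket
    except KeyError:
        pass                          # already unregistered
    sock.setblocking(False)
    drained = b""
    try:
        while chunk := sock.recv(4096):
            drained += chunk          # residual output from the dead child
    except BlockingIOError:
        pass                          # buffer exhausted without EOF
    sock.close()
    if not sink.closed:
        sink.write(drained)           # flush what was left...
    sink.close()                      # ...then close the handle
    return drained

sel = selectors.DefaultSelector()
parent, child = socket.socketpair()
sel.register(parent, selectors.EVENT_READ)
child.sendall(b"last words before SIGKILL\n")
child.close()                         # the "killed" subprocess

sink = io.BytesIO()                   # stand-in for logger_filehandle
drained = safe_terminate(sel, parent, sink)
print(drained)  # b'last words before SIGKILL\n'
```

   Whether the drained bytes should be flushed or simply dropped is a policy choice; the essential part is the ordering.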
   
   All DagFileProcessorProcess instances share the manager's selectors.BaseSelector: the selector is created on DagFileProcessorManager (field selector, manager.py line 174) and passed via selector=self.selector to every DagFileProcessorProcess.start() call (manager.py, _create_process). After logger_filehandle.close(), those sockets remain registered in the shared selector. The next selector.select() call delivers the stale events, whose callbacks reach BytesLogger._write() on the closed handle.
   
   The normal cleanup path — _on_socket_closed in WatchedSubprocess — unregisters a socket only when EOF is read from it. Under SIGKILL, the process may die before all buffered data has been read, so EOF is never delivered and _on_socket_closed is never called before the file handle is already closed.
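   The EOF dependency is easy to see with a plain socket pair (again a stand-in, not Airflow code): bytes the peer wrote before going away are still delivered after its death, and recv() reports EOF (empty bytes) only once that buffer is empty:

```python
import socket

parent, child = socket.socketpair()
child.sendall(b"log line written before SIGKILL\n")
child.close()                     # the peer dies, like a killed subprocess

first = parent.recv(4096)         # buffered bytes are still delivered
second = parent.recv(4096)        # only now does EOF (b"") arrive
print(first)   # b'log line written before SIGKILL\n'
print(second)  # b''
parent.close()
```

   An EOF-driven cleanup hook therefore cannot fire until every buffered byte has been consumed, which is exactly what does not happen between kill() returning and close() being called.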
   
     **Flaw 2 — task-sdk/src/airflow/sdk/execution_time/supervisor.py, 
process_log_messages_from_subprocess**
   
     Current code (main branch, also 3.1.8):
   
```python
def process_log_messages_from_subprocess(
    loggers: tuple[FilteringBoundLogger, ...],
) -> Generator[None, bytes | bytearray, None]:
    ...
    while True:
        line = yield
        ...
        if level := NAME_TO_LEVEL.get(event.pop("level")):
            msg = event.pop("event", None)
            for target in loggers:
                # ← no guard against ValueError from a closed BytesLogger
                target.log(level, msg, **event)
```
   
     No protection exists against ValueError raised when the underlying 
BytesLogger file handle has been closed. The exception propagates all the way 
through the supervisor call stack and kills the Job.
   
     **Full Stacktrace:**
   
```
Traceback (most recent call last):
  File "/usr/local/miniconda/lib/python3.10/site-packages/airflow/jobs/dag_processor_job_runner.py", line 61, in _execute
    self.processor.run()
  File "/usr/local/miniconda/lib/python3.10/site-packages/airflow/dag_processing/manager.py", line 297, in run
    return self._run_parsing_loop()
  File "/usr/local/miniconda/lib/python3.10/site-packages/airflow/dag_processing/manager.py", line 389, in _run_parsing_loop
    self._refresh_dag_bundles(known_files=known_files)
  File "/usr/local/miniconda/lib/python3.10/site-packages/airflow/dag_processing/manager.py", line 637, in _refresh_dag_bundles
    self.handle_removed_files(known_files=known_files)
  File "/usr/local/miniconda/lib/python3.10/site-packages/airflow/dag_processing/manager.py", line 836, in handle_removed_files
    self.terminate_orphan_processes(present=files_set)
  File "/usr/local/miniconda/lib/python3.10/site-packages/airflow/dag_processing/manager.py", line 861, in terminate_orphan_processes
    processor.kill(signal.SIGKILL)
  File "/usr/local/miniconda/lib/python3.10/site-packages/airflow/sdk/execution_time/supervisor.py", line 706, in kill
    exit_code := self._service_subprocess(
  File "/usr/local/miniconda/lib/python3.10/site-packages/airflow/sdk/execution_time/supervisor.py", line 773, in _service_subprocess
    need_more = socket_handler(key.fileobj)
  File "/usr/local/miniconda/lib/python3.10/site-packages/airflow/sdk/execution_time/supervisor.py", line 1723, in cb
    gen.send(line)
  File "/usr/local/miniconda/lib/python3.10/site-packages/airflow/sdk/execution_time/supervisor.py", line 1806, in process_log_messages_from_subprocess
    target.log(level, msg, **event)
  File "/usr/local/miniconda/lib/python3.10/site-packages/structlog/_native.py", line 197, in log
    return self._proxy_to_logger(
  File "/usr/local/miniconda/lib/python3.10/site-packages/structlog/_base.py", line 223, in _proxy_to_logger
    return getattr(self._logger, method_name)(*args, **kw)
  File "/usr/local/miniconda/lib/python3.10/site-packages/structlog/_output.py", line 321, in msg
    self._write(message + b"\n")
ValueError: write to closed file
```
   
   ### Are you willing to submit PR?
   
   - [x] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

