This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new c803a27  Prevent race condition in trying to collect result from DagFileProcessor (#11306)
c803a27 is described below

commit c803a27e44da071b5a2c7fc2ce2a7951501753c1
Author: Ash Berlin-Taylor <[email protected]>
AuthorDate: Tue Oct 6 16:39:00 2020 +0100

    Prevent race condition in trying to collect result from DagFileProcessor (#11306)
    
    A rare race condition was noticed in the Scheduler HA PR where the
    "test_dags_with_system_exit" test would occasionally fail with the
    following symptoms:
    
    - The pipe was "readable" as returned by
      `multiprocessing.connection.wait`
    - On reading it yielded an EOFError, meaning the other side had closed
      the connection
    - But the process was still alive/running
    
    This previously would result in the Manager process dying with an error.
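
    As an illustrative, stand-alone sketch (hypothetical names, not part
    of this change), the symptom can be reproduced roughly like this:

        import multiprocessing
        import time
        from multiprocessing.connection import wait

        def worker(conn):
            # Close our end of the pipe, then linger -- mimicking a
            # child that is still doing cleanup/interpreter shutdown.
            conn.close()
            time.sleep(10)

        if __name__ == "__main__":
            parent_conn, child_conn = multiprocessing.Pipe()
            proc = multiprocessing.Process(target=worker, args=(child_conn,))
            proc.start()
            child_conn.close()  # drop the parent's copy of the child end
            if wait([parent_conn], timeout=5):  # reports "readable"...
                try:
                    parent_conn.recv()
                except EOFError:
                    # ...but reading raises EOFError while the process
                    # is still alive: exactly the race described above.
                    print("EOF, is_alive=%s" % proc.is_alive())
            proc.terminate()
            proc.join()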
    
    This PR makes a few changes:
    
    - It ensures that the pipe is simplex, not duplex (we only ever send
      data in one direction, from child to parent) as this is simpler
      (see the sketch after this list)
    - We ensure that the "other" end of the pipe is correctly closed in both
      parent and child processes. Without this the pipe would be kept open
      (sometimes) until the child process had closed anyway.
    - When we get an EOFError on reading and the process is still alive, we
      give it a few seconds to shut down cleanly, and then kill it.
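
    A condensed sketch of the resulting pipe handling (hypothetical and
    simplified; the diff below is the real change):

        import multiprocessing

        def _child_main(result_channel, parent_channel):
            # In the child: close the inherited copy of the parent
            # (read) end first, else the pipe can stay open until the
            # child process has fully exited.
            parent_channel.close()
            try:
                result_channel.send("result")  # only one message flows
            finally:
                result_channel.close()

        if __name__ == "__main__":
            # duplex=False gives a simplex pipe: parent reads only,
            # child writes only.
            parent_channel, child_channel = multiprocessing.Pipe(duplex=False)
            proc = multiprocessing.Process(
                target=_child_main, args=(child_channel, parent_channel)
            )
            proc.start()
            # In the parent: close our copy of the child (write) end so
            # EOF is seen as soon as the child closes its end.
            child_channel.close()
            print(parent_channel.recv())
            proc.join()
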
---
 airflow/jobs/scheduler_job.py | 38 +++++++++++++++++++++++++++++++++++---
 1 file changed, 35 insertions(+), 3 deletions(-)

diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index e0a9f09..ef593f6 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -120,6 +120,7 @@ class DagFileProcessorProcess(AbstractDagFileProcessorProcess, LoggingMixin, Mul
     @staticmethod
     def _run_file_processor(
         result_channel: MultiprocessingConnection,
+        parent_channel: MultiprocessingConnection,
         file_path: str,
         pickle_dags: bool,
         dag_ids: Optional[List[str]],
@@ -131,6 +132,8 @@ class DagFileProcessorProcess(AbstractDagFileProcessorProcess, LoggingMixin, Mul
 
         :param result_channel: the connection to use for passing back the result
         :type result_channel: multiprocessing.Connection
+        :param parent_channel: the parent end of the channel to close in the child
+        :type parent_channel: multiprocessing.Connection
         :param file_path: the file to process
         :type file_path: str
         :param pickle_dags: whether to pickle the DAGs found in the file and
@@ -149,6 +152,13 @@ class DagFileProcessorProcess(AbstractDagFileProcessorProcess, LoggingMixin, Mul
         # This helper runs in the newly created process
         log: logging.Logger = logging.getLogger("airflow.processor")
 
+        # Since we share all open FDs from the parent, we need to close the parent side of the pipe here in
+        # the child, else it won't get closed properly until we exit.
+        log.info("Closing parent pipe")
+
+        parent_channel.close()
+        del parent_channel
+
         set_context(log, file_path)
         setproctitle("airflow scheduler - DagFileProcessor 
{}".format(file_path))
 
@@ -183,11 +193,12 @@ class DagFileProcessorProcess(AbstractDagFileProcessorProcess, LoggingMixin, Mul
             log.exception("Got an exception! Propagating...")
             raise
         finally:
-            result_channel.close()
             # We re-initialized the ORM within this Process above so we need to
             # tear it down manually here
             settings.dispose_orm()
 
+            result_channel.close()
+
     def start(self) -> None:
         """
         Launch the process and start processing the DAG.
@@ -195,11 +206,12 @@ class DagFileProcessorProcess(AbstractDagFileProcessorProcess, LoggingMixin, Mul
         start_method = self._get_multiprocessing_start_method()
         context = multiprocessing.get_context(start_method)
 
-        self._parent_channel, _child_channel = context.Pipe()
+        _parent_channel, _child_channel = context.Pipe(duplex=False)
         process = context.Process(
             target=type(self)._run_file_processor,
             args=(
                 _child_channel,
+                _parent_channel,
                 self.file_path,
                 self._pickle_dags,
                 self._dag_ids,
@@ -212,6 +224,15 @@ class DagFileProcessorProcess(AbstractDagFileProcessorProcess, LoggingMixin, Mul
         self._start_time = timezone.utcnow()
         process.start()
 
+        # Close the child side of the pipe now the subprocess has started -- otherwise this would prevent it
+        # from closing in some cases
+        _child_channel.close()
+        del _child_channel
+
+        # Don't store it on self until after we've started the child process - we don't want to keep it from
+        # getting GCd/closed
+        self._parent_channel = _parent_channel
+
     def kill(self) -> None:
         """
         Kill the process launched to process the file, and ensure consistent state.
@@ -245,6 +266,8 @@ class DagFileProcessorProcess(AbstractDagFileProcessorProcess, LoggingMixin, Mul
         if self._process.is_alive() and self._process.pid:
             self.log.warning("Killing DAGFileProcessorProcess (PID=%d)", self._process.pid)
             os.kill(self._process.pid, signal.SIGKILL)
+        if self._parent_channel:
+            self._parent_channel.close()
 
     @property
     def pid(self) -> int:
@@ -293,7 +316,16 @@ class DagFileProcessorProcess(AbstractDagFileProcessorProcess, LoggingMixin, Mul
                 self._parent_channel.close()
                 return True
             except EOFError:
-                pass
+                # If we get an EOFError, it means the child end of the pipe has been closed. This only happens
+                # in the finally block. But due to a possible race condition, the process may have not yet
+                # terminated (it could be doing cleanup/python shutdown still). So we kill it here after a
+                # "suitable" timeout.
+                self._done = True
+                # Arbitrary timeout -- error/race condition only, so this doesn't need to be tunable.
+                self._process.join(timeout=5)
+                if self._process.is_alive():
+                    # Didn't shut down cleanly - kill it
+                    self._kill_process()
 
         if not self._process.is_alive():
             self._done = True
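
The collection path in the last hunk, sketched in isolation (hypothetical
names, simplified; Process.kill() stands in for the SIGKILL helper used
in the real code):

    import multiprocessing

    def collect(parent_channel, proc: multiprocessing.Process):
        try:
            result = parent_channel.recv()
            parent_channel.close()
            return result
        except EOFError:
            # The child closed its end but may still be shutting down:
            # give it a grace period, then force-kill if still alive.
            proc.join(timeout=5)
            if proc.is_alive():
                proc.kill()
            return None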
