This is an automated email from the ASF dual-hosted git repository.
ash pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new c803a27 Prevent race condition in trying to collect result from
DagFileProcessor (#11306)
c803a27 is described below
commit c803a27e44da071b5a2c7fc2ce2a7951501753c1
Author: Ash Berlin-Taylor <[email protected]>
AuthorDate: Tue Oct 6 16:39:00 2020 +0100
Prevent race condition in trying to collect result from DagFileProcessor
(#11306)
A rare race condition was noticed in the Scheduler HA PR where the
"test_dags_with_system_exit" test would occasionally fail with the
following symptoms:
- The pipe was "readable" as returned by
`multiprocessing.connection.wait`
- On reading it yielded an EOFError, meaning the other side had closed
the connection
- But the process was still alive/running
This previously would result in the Manager process dying with an error.
This PR makes a few changes:
- It ensures that the pipe is simplex, not duplex (we only ever send one
item of data) as this is simpler
- We ensure that the "other" end of the pipe is correctly closed in both
parent and child processes. Without this the pipe would be kept open
(sometimes) until the child process had closed anyway.
- When we get an EOFError on reading and the process is still alive, we
give it a few seconds to shut down cleanly, and then kill it.
---
airflow/jobs/scheduler_job.py | 38 +++++++++++++++++++++++++++++++++++---
1 file changed, 35 insertions(+), 3 deletions(-)
diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index e0a9f09..ef593f6 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -120,6 +120,7 @@ class
DagFileProcessorProcess(AbstractDagFileProcessorProcess, LoggingMixin, Mul
@staticmethod
def _run_file_processor(
result_channel: MultiprocessingConnection,
+ parent_channel: MultiprocessingConnection,
file_path: str,
pickle_dags: bool,
dag_ids: Optional[List[str]],
@@ -131,6 +132,8 @@ class
DagFileProcessorProcess(AbstractDagFileProcessorProcess, LoggingMixin, Mul
:param result_channel: the connection to use for passing back the
result
:type result_channel: multiprocessing.Connection
+ :param parent_channel: the parent end of the channel to close in the
child
+ :type parent_channel: multiprocessing.Connection
:param file_path: the file to process
:type file_path: str
:param pickle_dags: whether to pickle the DAGs found in the file and
@@ -149,6 +152,13 @@ class
DagFileProcessorProcess(AbstractDagFileProcessorProcess, LoggingMixin, Mul
# This helper runs in the newly created process
log: logging.Logger = logging.getLogger("airflow.processor")
+ # Since we share all open FDs from the parent, we need to close the
parent side of the pipe here in
+ # the child, else it won't get closed properly until we exit.
+ log.info("Closing parent pipe")
+
+ parent_channel.close()
+ del parent_channel
+
set_context(log, file_path)
setproctitle("airflow scheduler - DagFileProcessor
{}".format(file_path))
@@ -183,11 +193,12 @@ class
DagFileProcessorProcess(AbstractDagFileProcessorProcess, LoggingMixin, Mul
log.exception("Got an exception! Propagating...")
raise
finally:
- result_channel.close()
# We re-initialized the ORM within this Process above so we need to
# tear it down manually here
settings.dispose_orm()
+ result_channel.close()
+
def start(self) -> None:
"""
Launch the process and start processing the DAG.
@@ -195,11 +206,12 @@ class
DagFileProcessorProcess(AbstractDagFileProcessorProcess, LoggingMixin, Mul
start_method = self._get_multiprocessing_start_method()
context = multiprocessing.get_context(start_method)
- self._parent_channel, _child_channel = context.Pipe()
+ _parent_channel, _child_channel = context.Pipe(duplex=False)
process = context.Process(
target=type(self)._run_file_processor,
args=(
_child_channel,
+ _parent_channel,
self.file_path,
self._pickle_dags,
self._dag_ids,
@@ -212,6 +224,15 @@ class
DagFileProcessorProcess(AbstractDagFileProcessorProcess, LoggingMixin, Mul
self._start_time = timezone.utcnow()
process.start()
+ # Close the child side of the pipe now the subprocess has started --
otherwise this would prevent it
+ # from closing in some cases
+ _child_channel.close()
+ del _child_channel
+
+ # Don't store it on self until after we've started the child process -
we don't want to keep it from
+ # getting GCd/closed
+ self._parent_channel = _parent_channel
+
def kill(self) -> None:
"""
Kill the process launched to process the file, and ensure consistent
state.
@@ -245,6 +266,8 @@ class
DagFileProcessorProcess(AbstractDagFileProcessorProcess, LoggingMixin, Mul
if self._process.is_alive() and self._process.pid:
self.log.warning("Killing DAGFileProcessorProcess (PID=%d)",
self._process.pid)
os.kill(self._process.pid, signal.SIGKILL)
+ if self._parent_channel:
+ self._parent_channel.close()
@property
def pid(self) -> int:
@@ -293,7 +316,16 @@ class
DagFileProcessorProcess(AbstractDagFileProcessorProcess, LoggingMixin, Mul
self._parent_channel.close()
return True
except EOFError:
- pass
+ # If we get an EOFError, it means the child end of the pipe
has been closed. This only happens
+ # in the finally block. But due to a possible race condition,
the process may have not yet
+ # terminated (it could be doing cleanup/python shutdown
still). So we kill it here after a
+ # "suitable" timeout.
+ self._done = True
+ # Arbitrary timeout -- error/race condition only, so this
doesn't need to be tunable.
+ self._process.join(timeout=5)
+ if self._process.is_alive():
+ # Didn't shut down cleanly - kill it
+ self._kill_process()
if not self._process.is_alive():
self._done = True