pcolladosoto commented on issue #22191:
URL: https://github.com/apache/airflow/issues/22191#issuecomment-1074273533
# Trying to explain things...
Our team has run into this issue time and time again. We have tried
different combinations of both Airflow and Python versions to no avail.
## TL;DR
When a `DAGFileProcessor` hangs and is killed due to a timeout we believe
the `self.waitables` and `self._processors` attributes of the
`DAGFileProcessorManager` are not being updated as they should. This causes an
unhandled exception when trying to receive data on a pipe end (i.e. file
descriptor) which has already been closed.
## The long read...
We're running a decoupled Airflow deployment within a k8s cluster. We are currently using a 3-container *pod*: one container runs the *Web Server*, another executes the *Scheduler* and the third implements *Flower* (we're using the *CeleryExecutor*). The backbone of the deployment is implemented through a *StatefulSet* that runs the Celery executors themselves.
The trace we were seeing on the scheduler time and time again was:
```
Process ForkServerProcess-1:
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/usr/local/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/dag_processing/manager.py", line 370, in _run_processor_manager
    processor_manager.start()
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/dag_processing/manager.py", line 610, in start
    return self._run_parsing_loop()
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/dag_processing/manager.py", line 671, in _run_parsing_loop
    self._collect_results_from_processor(processor)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/dag_processing/manager.py", line 981, in _collect_results_from_processor
    if processor.result is not None:
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/dag_processing/processor.py", line 321, in result
    if not self.done:
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/dag_processing/processor.py", line 286, in done
    if self._parent_channel.poll():
  File "/usr/local/lib/python3.7/multiprocessing/connection.py", line 255, in poll
    self._check_closed()
  File "/usr/local/lib/python3.7/multiprocessing/connection.py", line 136, in _check_closed
    raise OSError("handle is closed")
OSError: handle is closed
```
This has been thrown by Airflow 2.1.3, but we've seen very similar (if not identical) variations with versions all the way up to Airflow 2.2.4.
Given we traced the problem down to the way multiprocessing synchronisation was being handled, we played around with `multiprocessing`'s [*start method*](https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods) through the `mp_start_method` configuration parameter, which wasn't included in the stock configuration example:
https://github.com/apache/airflow/blob/f309ea78f7d8b62383bc41eac217681a0916382b/airflow/utils/mixins.py#L27-L38
The containers we are using leverage `fork` as the default way of creating new processes. After trying that one out we moved on to using `spawn` and ended up settling on `forkserver`. No matter the *start method* we leveraged, we ran into the same issue over and over again.
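For reference, the `mixins.py` snippet linked above boils down to the check below. This is just a sketch of how one might confirm which start method the scheduler will actually pick up; we're assuming the option sits under `[core]` (i.e. it can also be set through `AIRFLOW__CORE__MP_START_METHOD`), which is what the linked code suggests:
```python
# Sketch: print the start method Airflow will use for the DAG processor manager.
import multiprocessing

from airflow.configuration import conf

if conf.has_option("core", "mp_start_method"):
    # Explicitly configured, e.g. "fork", "spawn" or "forkserver".
    print("configured mp_start_method:", conf.get("core", "mp_start_method"))
else:
    # Nothing configured: Airflow falls back to the platform default.
    print("platform default start method:", multiprocessing.get_start_method())
```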
For a while we coped with this behaviour by just restarting the Airflow
deployment on an hourly basis, but we decided to set some time apart today to
delve a bit deeper into all this. The good news is that after a thorough investigation we noticed a pattern that preceded the crash.
In order to pin it down we ran [`ps(1)`](https://www.man7.org/linux/man-pages/man1/ps.1.html) on the scheduler
container. We also monitored the *DAG Processor Manager* log (which we have at
`/opt/airflow/logs/dag_processor_manager/dag_processor_manager.log` given our
Airflow home is `/opt/airflow`) and we took a look at the scheduler's log
through `kubectl logs` given it's sent to *stdout/stderr*. The pattern itself
goes something like:
1. A `DAGFileProcessor` gets stuck for longer than `dag_file_processor_timeout`, as seen in `ps`' output.
2. As soon as the timeout is exceeded, the `DAGFileProcessorManager` kills
the stuck `DAGFileProcessor`.
3. When the `DAGFileProcessorManager` tries to collect results back from the
different `DAGFileProcessor`s it crashes.
The above led us to believe something was a bit off in the way the
`DAGFileProcessor`s were being killed. Given our Docker-based deployment
allowed for it, we retrieved a copy of the stock
[`manager.py`](https://github.com/apache/airflow/blob/v2-1-stable/airflow/dag_processing/manager.py)
and
[`processor.py`](https://github.com/apache/airflow/blob/v2-1-stable/airflow/dag_processing/processor.py)
files and added a bit of logging through `self.log.debug()`. The following appeared in our `DAGFileProcessorManager` log:
```
[2022-03-21 13:01:00,747] {manager.py:1163} DEBUG - FOO - Looking for DAG processors to terminate due to timeouts!
[2022-03-21 13:01:00,748] {manager.py:1172} ERROR - Processor for /opt/airflow/dags/dags-airflow/spark_streaming.py with PID 965 started at 2022-03-21T13:00:10.536124+00:00 has timed out, killing it.
[2022-03-21 13:01:00,748] {manager.py:1178} DEBUG - FOO - # of waitables BEFORE killing timed out processor: 2
[2022-03-21 13:01:00,748] {manager.py:1180} DEBUG - FOO - # of waitables AFTER killing timed out processor: 2
[2022-03-21 13:01:00,748] {manager.py:1013} DEBUG - 1/1 DAG parsing processes running
[2022-03-21 13:01:00,748] {manager.py:1015} DEBUG - 2 file paths queued for processing
[2022-03-21 13:01:00,758] {manager.py:978} DEBUG - Processor for /opt/airflow/dags/dags-airflow/spark_streaming.py finished
[2022-03-21 13:01:00,758] {manager.py:982} DEBUG - FOO - Trying to get result for processor with PID: 965
```
Can you see how the number of `waitables` (more on that later) doesn't
change even though we're killing a `DAGFileProcessor`? We believe that's what's
causing the trouble...
Note that we added the `- FOO -` token to our logging entries so we could easily `grep` for them. These entries were generated with calls to `self.log.debug()` within the `_kill_timed_out_processors()` function. The 'stock' version is:
https://github.com/apache/airflow/blob/v2-1-stable/airflow/dag_processing/manager.py#L1159-L1175
After we added additional logging it looked like:
```python
def _kill_timed_out_processors(self):
    """Kill any file processors that timeout to defend against process hangs."""
    self.log.debug("FOO - Looking for DAG processors to terminate due to timeouts!")
    now = timezone.utcnow()
    processors_to_remove = []
    for file_path, processor in self._processors.items():
        duration = now - processor.start_time
        if duration > self._processor_timeout:
            self.log.error(
                "Processor for %s with PID %s started at %s has timed out, killing it.",
                file_path,
                processor.pid,
                processor.start_time.isoformat(),
            )
            Stats.decr('dag_processing.processes')
            Stats.incr('dag_processing.processor_timeouts')
            # TODO: Remove after Airflow 2.0
            Stats.incr('dag_file_processor_timeouts')
            self.log.debug(f"FOO - # of waitables BEFORE killing timed out processor: {len(self.waitables)}")
            processor.kill()
            self.log.debug(f"FOO - # of waitables AFTER killing timed out processor: {len(self.waitables)}")
```
You can see how we call the processor's `kill()` method (which basically wraps its `_kill_process()` method) on the timed out processor. We believe the key to all this resides on `line 246` of:
https://github.com/apache/airflow/blob/v2-1-stable/airflow/dag_processing/processor.py#L238-L246
Notice how the end of the communication pipe opened on `line 187` is being closed here:
https://github.com/apache/airflow/blob/v2-1-stable/airflow/dag_processing/processor.py#L187
That's exactly the same pipe end (i.e. file descriptor) the
`DAGFileProcessorManager` tries to read from later on! If we look at the
traceback we included before, it's `line 286` that ultimately triggers the
`OSError`:
https://github.com/apache/airflow/blob/v2-1-stable/airflow/dag_processing/processor.py#L286
What it's trying to do is `poll()` a closed pipe. If we take a look at `multiprocessing`'s implementation we'll see how, as shown in the traceback, it calls `_check_closed()` on the pipe's file descriptor (i.e. handle) before proceeding: this, as seen before, triggers the `OSError`.
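To convince ourselves the failure mode really is that simple, here's a minimal standalone sketch (plain `multiprocessing`, no Airflow involved) that reproduces the same `OSError` by `poll()`-ing a pipe end after it has been closed:
```python
import multiprocessing

# Build a pipe and close the parent end, which is effectively what
# _kill_process() ends up doing to self._parent_channel.
parent_conn, child_conn = multiprocessing.Pipe()
parent_conn.close()

try:
    # This mirrors the `self._parent_channel.poll()` call inside `done`.
    parent_conn.poll()
except OSError as err:
    print(err)  # -> handle is closed
```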
So... why are we trying to collect results from a `DAGFileProcessor` we killed due to a timeout? In order to answer that, we took a walk through `_run_parsing_loop()`:
https://github.com/apache/airflow/blob/v2-1-stable/airflow/dag_processing/manager.py#L612-L734
It basically runs an infinite (unless we specify a maximum number of runs)
loop that calls
[`multiprocessing.connection.wait`](https://docs.python.org/3/library/multiprocessing.html#multiprocessing.connection.wait)
based on the contents of `self.waitables`. This attribute is a dictionary
containing references to the different `DAGFileProcessor`s spawned by the
`DAGFileProcessorManager`. Entries are added on `line 1034` within
`start_new_process()`:
https://github.com/apache/airflow/blob/v2-1-stable/airflow/dag_processing/manager.py#L1015-L1034
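For context, this is roughly the pattern the parsing loop relies on. It's a stripped-down sketch, not the actual Airflow code: the point is that each processor registers a `waitable_handle` in the dictionary, and a stale entry for a killed processor is exactly what ends up being handed back to `_collect_results_from_processor()`:
```python
from multiprocessing.connection import wait

# waitables maps each processor's waitable_handle (something wait() can block
# on) to the processor itself; entries are added when a processor is started,
# roughly as waitables[processor.waitable_handle] = processor.
waitables = {}

# Inside the loop: wait() returns every handle that has become "ready" (which
# is immediately the case for a killed process), and the matching processor is
# then asked for its result...
ready = wait(waitables.keys(), timeout=1)
for handle in ready:
    processor = waitables[handle]
    # ...which is where the real code goes through processor.result ->
    # processor.done -> self._parent_channel.poll() on a channel that
    # _kill_process() has already closed.
```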
However, this dictionary **is not** updated when a processor is killed due to a timeout. You can check that out in the snippet we included above. Thus, after the timed out `DAGFileProcessor` is killed, the infinite loop in `_run_parsing_loop()` will assume the underlying process is ready (it's done, as we've effectively terminated it) and will try to read from the pipe end we closed in `_kill_process()`, thus triggering the exception and bringing
everything down. In other words, we believe the `self.waitables` attribute is
not being updated as it should when `DAGFileProcessor`s are terminated due to
timeouts. The same is true for the `self._processors` attribute on the
`DAGFileProcessorManager`. After all, `_kill_timed_out_processors()` iterates
over its contents... If we don't update it too we'll see how we try to kill an
already terminated process over and over again.
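As an aside, this is also why the fix below first collects the file paths and only pops them from `self._processors` after the loop: popping from a dictionary while iterating over it is not safe in Python, as this tiny standalone sketch (not Airflow code) shows:
```python
# A reminder of why we defer the removal instead of popping mid-iteration.
processors = {"dag_a.py": "processor_a", "dag_b.py": "processor_b"}

try:
    for file_path in processors:
        processors.pop(file_path)  # mutating the dict we're iterating over...
except RuntimeError as err:
    print(err)  # -> dictionary changed size during iteration
```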
After some testing we arrived at the following implementation of
`_kill_timed_out_processors()`:
```python
def _kill_timed_out_processors(self):
    """Kill any file processors that timeout to defend against process hangs."""
    self.log.debug("FOO - ** Entering _kill_timed_out_processors() **")
    # We'll get a clear picture of what these two attributes look like before
    # killing anything.
    self.log.debug(f"FOO - We'll iterate over: {self._processors}")
    self.log.debug(f"FOO - Current waitables dir: {self.waitables}")
    now = timezone.utcnow()
    processors_to_remove = []
    for file_path, processor in self._processors.items():
        duration = now - processor.start_time
        if duration > self._processor_timeout:
            self.log.error(
                "Processor for %s with PID %s started at %s has timed out, killing it.",
                file_path,
                processor.pid,
                processor.start_time.isoformat(),
            )
            Stats.decr('dag_processing.processes')
            Stats.incr('dag_processing.processor_timeouts')
            # TODO: Remove after Airflow 2.0
            Stats.incr('dag_file_processor_timeouts')
            # Add some logging to check stuff out
            self.log.debug(f"FOO - # of waitables BEFORE killing timed out processor: {len(self.waitables)}")
            self.log.debug(f"FOO - We'll iterate over: {self._processors}")
            self.log.debug(f"FOO - Current waitables dir: {self.waitables}")
            # Kill the hung processor
            processor.kill()
            # Update self.waitables to avoid asking for results later on
            self.waitables.pop(processor.waitable_handle)
            # Make a note of the file_paths we are to remove later on: we feel
            # a bit uneasy about modifying the container we're currently
            # iterating over...
            processors_to_remove.append(file_path)
            # Add some logging to check how stuff is doing...
            self.log.debug(f"FOO - # of waitables AFTER killing timed out processor: {len(self.waitables)}")
            self.log.debug(f"FOO - We'll iterate over: {self._processors}")
            self.log.debug(f"FOO - Current waitables dir: {self.waitables}")
    # Clean up `self._processors` too!
    for proc in processors_to_remove:
        self._processors.pop(proc)
    # And after we're done take a look at the final state
    self.log.debug(f"FOO - Processors after cleanup: {self._processors}")
    self.log.debug(f"FOO - Waitables after cleanup: {self.waitables}")
    self.log.debug("FOO - ** Leaving _kill_timed_out_processors() **")
```
We know the above can surely be written in a more succinct/better way: we're
by no means good programmers!
Against all odds, the code above seems to prevent the crash! :tada: It does, however, leave a zombie behind whenever we kill a `DAGFileProcessor`: the process is not being `wait()`ed for...
We also decided to play around with the `DAGFileProcessor`'s `_kill_process()` method a bit, in the name of science, to try and prevent that zombie from appearing:
```python
def _kill_process(self) -> None:
    if self._process is None:
        raise AirflowException("Tried to kill process before starting!")
    if self._process.is_alive() and self._process.pid:
        self.log.warning("Killing DAGFileProcessorProcess (PID=%d)", self._process.pid)
        os.kill(self._process.pid, signal.SIGKILL)
        # Reap the resulting zombie! Note the call to `waitpid()` blocks unless we
        # leverage the `WNOHANG` option
        # (https://docs.python.org/3/library/os.html#os.WNOHANG).
        # Given we were just playing around we decided not to bother with that...
        self.log.warning(f"FOO - Waiting to reap the zombie spawned from PID {self._process.pid}")
        os.waitpid(self._process.pid, 0)
        self.log.warning(f"FOO - Reaped the zombie spawned from PID {self._process.pid}")
    if self._parent_channel:
        self._parent_channel.close()
```
From what we could see, the above reaped the zombie like we initially
expected it to.
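If blocking inside `_kill_process()` is a concern, the `WNOHANG` route mentioned in the comment above could look roughly like this. It's only a sketch: `_try_reap` is an illustrative name (not an Airflow helper), and we're assuming it's acceptable to simply report whether the child has been reaped yet:
```python
import os

def _try_reap(pid: int) -> bool:
    """Try to reap a killed child without blocking; return True once it's gone."""
    try:
        reaped_pid, _ = os.waitpid(pid, os.WNOHANG)
    except ChildProcessError:
        # Someone else (e.g. multiprocessing's own machinery) already reaped it.
        return True
    # waitpid() returns (0, 0) while the child is still running.
    return reaped_pid == pid
```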
So, after all this nonsense we just wanted to wrap up by saying that we believe it's the way the `DAGFileProcessorManager`'s attributes are being cleaned up that crashes Airflow for us. In our experience this is triggered by a `DAGFileProcessor` being forcefully terminated after a timeout.
We would also like to thank everybody making Airflow possible: it's one heck
of a tool!
Feel free to ask for more details and, if we got anything wrong (it wouldn't
be the first time), please do let us know!