Hello everyone,
Our company is currently considering Airflow as the engine of a fairly large
workflow system. We use Python 3 and have some high-availability (HA)
requirements, so we plan to use Celery with multiple worker and scheduler nodes,
and RabbitMQ as the message bus. We also use Docker containers for deployment.
We have a specific case that we need to handle in our workflows. Some of the
tasks we trigger are very long-lived (many hours). These long tasks are carried
out by external systems, independent of the Airflow nodes. Our workflows need
to start these external tasks, monitor them, and trigger downstream operators
based on their results.
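
For context, the monitoring we have in mind follows the usual sensor "poke"
pattern: check a condition at a fixed interval until it becomes true or a
timeout expires. Below is a minimal, Airflow-independent sketch of that loop.
`wait_for_external_job`, `check_done` and `SensorTimeout` are hypothetical
names for illustration only; this is not Airflow's BaseSensorOperator code.

```python
import time
from typing import Callable


class SensorTimeout(Exception):
    """Hypothetical: raised when the external job does not finish in time."""


def wait_for_external_job(
    check_done: Callable[[], bool],
    poke_interval: float = 60.0,
    timeout: float = 24 * 3600.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> int:
    """Poll check_done() until it returns True and return the number of pokes.

    Hypothetical helper illustrating the sensor poke loop; sleep and clock
    are injectable so the loop can be exercised without real waiting.
    """
    started = clock()
    pokes = 0
    while True:
        pokes += 1
        if check_done():
            # The external task reported completion: downstream can run.
            return pokes
        if clock() - started > timeout:
            raise SensorTimeout(f"external job not done after {pokes} pokes")
        # Wait before asking the external system again.
        sleep(poke_interval)
```

With a check that succeeds on the third call and a no-op sleep,
`wait_for_external_job(...)` returns 3. A job running for many hours simply
means this loop keeps a worker slot busy the whole time, which is what makes
stopping the worker difficult.
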
This seems like a perfect job for Airflow sensors. Here, however, we ran into a
small problem. In our testing, it does not seem possible to stop an Airflow
worker while a sensor is running. The sensor continues to run during a Celery
warm shutdown, and its background process remains active even after a cold
shutdown. This prevents us from safely destroying the worker's Docker container.
Since at least one of the external tasks we need to monitor is practically
always running, we would not be able to stop our Airflow workers in order to
deploy code changes to our DAGs or the libraries they depend on.
It would be perfect for our case to have the ability to suspend an Airflow
sensor while shutting down the worker and resume it when the worker restarts.
I was wondering how this could be implemented in Airflow, and I came up with
this initial idea:
https://github.com/postrational/incubator-airflow/commit/74ac6f7290d6838362ff437e228465bb49fe198f
The code adds a signal handler to the BaseSensorOperator, which raises an
exception if it detects that the worker is shutting down (i.e. on SIGINT). The
exception is then handled in the 'run' method of the TaskInstance. The idea is
to put the sensor into a state that causes it to be resumed cleanly when
Airflow comes back up after a restart.
So far, my draft code works some of the time, but not always. Sometimes the
sensor resumes correctly, but sometimes it doesn't trigger its downstream
operator and the whole DAG run is marked as "failed".
I noticed the following line in the sensor's logs:
[2016-10-13 19:15:17,033] {jobs.py:1976} WARNING - State of this instance has been externally set to None. Taking the poison pill. So long.
How should I properly mark an operator so that it is restarted cleanly?
Before I proceed any further, I would also like to ask for your opinion on
this approach. How do you handle long-running sensors? Do you use Docker with
Airflow?
Any advice would be greatly appreciated.
Thanks,
Michal