Hello Everyone,
Our company is currently considering Airflow as the engine of a pretty big 
workflow system. We use Python 3 and have some HA requirements, so we plan to 
use Celery with multiple worker and scheduler nodes and RabbitMQ as the message 
bus. We are also using Docker containers for deployment.
We have a specific case that we need to handle in our workflows. Some of the 
tasks we trigger are very long-lived (many hours long). These long tasks are 
carried out by external systems, independent of the Airflow nodes. In our 
workflows we need to start and then monitor these external tasks, and trigger 
downstream operators based on their results.
This seems like a perfect job for Airflow sensors. Here, however, we run into a 
small problem. In our testing it does not seem possible to stop an Airflow 
worker while a sensor is running. The sensor continues to run during a Celery 
warm shutdown, and its background process remains active even after a cold 
shutdown. This prevents us from safely destroying the worker's Docker 
container.
Since at least one of the external tasks we need to monitor is practically 
always running, we would not be able to stop our Airflow workers in order to 
deploy code changes to our DAGs or the libraries they depend on.
It would be perfect for our case to have the ability to suspend an Airflow 
sensor while shutting down the worker and resume it when the worker restarts.
I was wondering how this could be implemented in Airflow and I came up with 
this initial 
The code adds a signal handler to the BaseSensorOperator, which raises an 
exception if it detects that the worker is shutting down (SIGINT detected).
The exception is then handled in the 'run' method of the TaskInstance. The 
idea is to put the sensor in a state which would cause it to be cleanly resumed 
when Airflow comes back up after a restart.
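To illustrate the mechanism only, here is a minimal, self-contained sketch. It 
is not the draft itself and does not use any Airflow classes: WorkerShutdown, 
poke_until_done and the "suspended" return value are hypothetical names made up 
for this example, and it assumes the code runs in the main thread of the 
process, since Python only allows signal handlers to be installed there.

```python
import signal
import time


class WorkerShutdown(Exception):
    """Hypothetical exception raised when the shutdown signal arrives."""


def _raise_on_shutdown(signum, frame):
    # Turn the signal into an exception so the polling loop is interrupted.
    raise WorkerShutdown("received signal %d" % signum)


def poke_until_done(poke, poke_interval=1.0):
    """Call poke() until it returns True.

    Returns "success" when poke() succeeds, or "suspended" when SIGINT
    arrives first. A real implementation would persist a resumable task
    state at that point instead of just returning a string.
    """
    previous = signal.signal(signal.SIGINT, _raise_on_shutdown)
    try:
        while not poke():
            time.sleep(poke_interval)
        return "success"
    except WorkerShutdown:
        return "suspended"
    finally:
        # Restore whatever handler was installed before the sensor started.
        signal.signal(signal.SIGINT, previous)
```

The sketch only covers the part of the idea that is easy to isolate: the 
signal interrupts the polling loop, and the caller learns that the sensor 
stopped because of a shutdown rather than a failure. How that outcome should 
be recorded so that the scheduler resumes the sensor is the open question.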
So far, my draft code works some of the time, but not always. Sometimes the 
sensor resumes correctly, but sometimes it doesn't trigger its downstream 
operator and the whole DAG run is marked as "failed".
I noticed the following line in the sensor's logs:
[2016-10-13 19:15:17,033] {jobs.py:1976} WARNING - State of this instance has 
been externally set to None. Taking the poison pill. So long.
I would like to ask you how to properly mark an operator to be restarted 
Before I proceed any further I would also like to ask for your opinion of this 
approach. How do you handle long running sensors? Do you use Docker with 
Any advice would be greatly appreciated. Thanks,
