I think the behavior you want may be exactly what happens, provided that
you set up your sensors to retry when they fail.

Let me describe a play-by-play of how things should happen:
* the sensor is running, doing its sensor thing (for the sake of example,
let's say it's waiting for a file in HDFS to show up). Note that it has to
be a "stateless" sensor to work well here: the filename it looks up is
"static" in this example, so if the task restarts, it will look at the
exact same filename
* the worker on which this sensor runs goes dark, let's say it's rebooted
* the task instance is still marked as running as far as Airflow's state
is concerned, but no heartbeats for that task instance are emitted or
recorded anymore
* the scheduler process monitors heartbeats and sees that task instance X
has not emitted a heartbeat within the required threshold, so it fails the
task, which triggers the normal post-failure hooks and email alerting if
the task is set up that way
* assuming the task was set up with, let's say, 5 retries, the task goes
into an up_for_retry state, with a retry count of 0
* the scheduler re-fires the sensor, bumps the retry count to 1, and the
task is running again
* the sensor wakes up every poke_interval and checks whether the criteria
are met
* the sensor finds the criteria are met, and the task instance status is
set to success
* downstream tasks fire
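
To make the sequence above concrete, here is a toy model in plain Python.
It is not Airflow code: WorkerLost, run_with_retries and the _SUCCESS
marker file are names made up for this sketch, and a local temp directory
stands in for HDFS. It only shows why a stateless check plus retries is
enough to survive a worker going dark.

```python
import os
import tempfile


class WorkerLost(Exception):
    """Stands in for the scheduler failing a task whose heartbeats stopped."""


def poke(path):
    # Stateless check: the result depends only on the fixed path, so a
    # restarted task asks exactly the same question as the one that died.
    return os.path.exists(path)


def run_with_retries(one_try, retries):
    """Run one_try(try_number) until it returns or retries are exhausted.

    Returns the list of states the (simulated) task instance went through.
    """
    states = []
    for try_number in range(retries + 1):
        states.append("running")
        try:
            one_try(try_number)
        except WorkerLost:
            states.append("up_for_retry" if try_number < retries else "failed")
        else:
            states.append("success")
            break
    return states


def demo():
    with tempfile.TemporaryDirectory() as tmp:
        target = os.path.join(tmp, "_SUCCESS")  # hypothetical marker file

        def one_try(try_number):
            if try_number == 0:
                # First try: the file is not there yet and the worker reboots.
                assert not poke(target)
                raise WorkerLost("no heartbeat within threshold")
            # Meanwhile the upstream job finished and wrote the file.
            open(target, "w").close()
            assert poke(target)

        return run_with_retries(one_try, retries=5)


print(demo())
```

The second try succeeds without knowing anything about the first one,
which is the property the sensor needs for this approach to work.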

If your sensor isn't stateless by nature (let's say it gets a jobid from a
remote service when it starts, and holds it in memory to make periodic
calls to track completion), you have to make it stateless somehow. Maybe
by using XCom to put the persisted state in the Airflow db, or perhaps
through some sort of contract, like a predictable, atomic signal for the
task to check.
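
As a rough illustration of the "persist the jobid" idea, here is a sketch
in plain Python. FakeRemoteService, submit_job and the store dict are all
hypothetical: the dict stands in for state persisted outside the sensor
process (in Airflow that could be an XCom written with xcom_push by an
upstream task and read with xcom_pull in the sensor, which is an
assumption about how you would wire it, not something tested here).

```python
class FakeRemoteService:
    """Hypothetical external system that runs long jobs."""

    def __init__(self):
        self._jobs = {}
        self._next_id = 1

    def submit(self):
        job_id = "job-%d" % self._next_id
        self._next_id += 1
        self._jobs[job_id] = "RUNNING"
        return job_id

    def finish(self, job_id):
        self._jobs[job_id] = "DONE"

    def status(self, job_id):
        return self._jobs[job_id]


def submit_job(service, store):
    # Upstream step: start the job once and persist its id outside the sensor.
    store["job_id"] = service.submit()


def poke(service, store):
    # Stateless sensor check: everything it needs is read from the store,
    # nothing is kept in the sensor's own memory between calls.
    return service.status(store["job_id"]) == "DONE"


service = FakeRemoteService()
store = {}  # stands in for persisted state, e.g. a row in the Airflow db

submit_job(service, store)
first = poke(service, store)   # job is still running
# ... the worker dies here; no in-memory sensor state is lost ...
service.finish(store["job_id"])
second = poke(service, store)  # a fresh sensor process finds the same job
print(first, second)
```

Splitting "start the job" from "watch the job" also means a retry of the
sensor never submits the job a second time.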


On Mon, Oct 17, 2016 at 3:24 AM, Michal K <postratio...@yahoo.com.invalid>

> Hello Everyone,
> our company is currently considering Airflow for the engine of a pretty
> big workflow system. We use Python 3 and have some HA requirements, so we
> plan to use Celery with multiple worker and scheduler nodes and RabbitMQ as
> message bus. We are also using Docker containers for deployment.
> We have a specific case, which we need to handle in our workflows. Some
> tasks, which we trigger are very long lived (many hours long). These long
> tasks are carried out by external systems, independent of the Airflow
> nodes. In our workflow we need to start, then monitor these external
> tasks and trigger downstream operators based on their results.
> This seems like a perfect job for Airflow sensors. Here however, we run
> into a small problem. In our testing it seems not possible to stop an
> Airflow worker while a sensor is running. The sensor continues to run
> during a Celery warm shutdown and its background process remains active
> even after a cold shutdown. This prevents us from being able to safely
> destroy the worker's Docker container.
> Since at least one of the external tasks we need to monitor is practically
> always running, we would not be able to stop our Airflow workers in order
> to deploy code changes to our DAGs or the libraries they depend on.
> It would be perfect for our case to have the ability to suspend an Airflow
> sensor while shutting down the worker and resume it when the worker
> restarts.
> I was wondering how this could be implemented in Airflow and I came up
> with this initial idea:
> https://github.com/postrational/incubator-airflow/commit/74ac6f7290d6838362ff437e228465bb49fe198f
> The code adds a signal handler to the BaseSensorOperator, which raises an
> exception if it detects that the worker is shutting down (SIGINT detected).
> Later on it handles the exception in the 'run' method of the TaskInstance.
> The idea is to put the sensor in a state which would cause it to be cleanly
> resumed after Airflow comes back up after a restart.
> So far, my draft code works some of the time, but not always. Sometimes
> the sensor resumes correctly, but sometimes it doesn't trigger its
> downstream operator and the whole DAG run is marked as "failed".
> I noticed the following line in the sensor's logs:
> [2016-10-13 19:15:17,033] {jobs.py:1976} WARNING - State of this instance
> has been externally set to None. Taking the poison pill. So long.
> I would like to ask you how to properly mark an operator to be restarted
> cleanly.
> Before I proceed any further I would also like to ask for your opinion of
> this approach. How do you handle long running sensors? Do you use Docker
> with Airflow?
> Any advice would be greatly appreciated. Thanks,
> Michal
