Hi Michal,

I took a quick look at your patch. Does your `airflow run` process kill itself after it receives a SIGINT? I noticed it catches the `AirflowRetryException` but doesn't re-raise it.
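For illustration, here is a minimal sketch of the catch-and-re-raise pattern I have in mind; the exception names, the handler placement, and the poke loop are placeholders, not a reference to your actual patch:

    import signal
    import time

    class AirflowRetryException(Exception):
        """Illustrative stand-in for the exception the patch raises/catches."""

    class WorkerShuttingDown(Exception):
        """Raised from the signal handler to break out of the poke loop."""

    def _handle_sigint(signum, frame):
        raise WorkerShuttingDown()

    signal.signal(signal.SIGINT, _handle_sigint)

    def run_sensor():
        try:
            while True:           # stand-in for the sensor's long-running poke loop
                time.sleep(5)
        except WorkerShuttingDown:
            # Record whatever state is needed for a clean retry, then re-raise
            # (or raise a retry exception) so the process actually exits
            # instead of swallowing the shutdown.
            raise AirflowRetryException("worker shutting down; retry later")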
Otherwise, does the log line ("WARNING - State of this instance has been externally set to None. Taking the poison pill. So long.") come from a process started before you restarted Airflow, or after?

> The sensor continues to run during a Celery warm shutdown and its background
> process remains active even after a cold shutdown.

I looked into this before; we inherited this behavior from Celery (http://docs.celeryproject.org/en/latest/userguide/workers.html#stopping-the-worker). With a long-running sensor, what happens is that Celery receives a SIGTERM and waits for the task (the sensor) to finish, and by default Celery will wait forever. I have an open PR to change this (https://github.com/apache/incubator-airflow/pull/1732) which might do what you want, but it seems some people rely on the existing behavior.

Regards,
Xuanji

On 17 October 2016 at 11:54, Maxime Beauchemin <maximebeauche...@gmail.com> wrote:
> I think the behavior you want may be exactly what will happen, provided
> that you set up your sensors to retry when they fail.
>
> Let me describe a play-by-play of how things should happen:
> * The sensor is running, doing its sensor thing (for the sake of example,
>   let's say it's waiting for a file in HDFS to show up). Note that it has
>   to be a "stateless" sensor to work well here: the filename it looks up
>   is "static" in this example, so if the task restarts, it will look at
>   the exact same filename.
> * The worker on which this sensor runs goes dark; let's say it's rebooted.
> * The task instance is still marked as running as far as Airflow's state
>   is concerned, but no heartbeats for that task instance are emitted /
>   recorded anymore.
> * The scheduler's process monitors heartbeats and sees that task instance
>   X has not emitted a heartbeat within the required threshold, so it
>   decides to fail the task, which triggers the normal post-failure hooks
>   and email alerting if the task is set up that way.
> * Assuming the task was set up with, say, 5 retries, the task goes into an
>   up_for_retry state with a retry count of 0.
> * The scheduler re-fires the sensor, bumps the retry count to 1, and the
>   task is running again.
> * The sensor wakes up every retry_interval and checks whether its criteria
>   are met.
> * The sensor finds the criteria are met; the task instance status is set
>   to success.
> * Downstream tasks fire.
>
> If your sensor isn't stateless by nature (let's say it gets a jobid from a
> remote service when it starts, and holds it in memory to make periodic
> calls to track completion), you somehow have to make it stateless. Maybe
> use XCom to put the persisted state in the Airflow db, or perhaps some
> sort of contract like a predictable, atomic signal for the task to check.
>
> Max
>
> On Mon, Oct 17, 2016 at 3:24 AM, Michal K <postratio...@yahoo.com.invalid>
> wrote:
>
>> Hello Everyone,
>>
>> Our company is currently considering Airflow as the engine of a pretty
>> big workflow system. We use Python 3 and have some HA requirements, so
>> we plan to use Celery with multiple worker and scheduler nodes and
>> RabbitMQ as the message bus. We are also using Docker containers for
>> deployment.
>>
>> We have a specific case which we need to handle in our workflows. Some
>> tasks that we trigger are very long-lived (many hours). These long tasks
>> are carried out by external systems, independent of the Airflow nodes.
>> In our workflow we need to start and then monitor these external tasks,
>> and trigger downstream operators based on their results.
>>
>> This seems like a perfect job for Airflow sensors.
>> Here, however, we run into a small problem. In our testing it does not
>> seem possible to stop an Airflow worker while a sensor is running. The
>> sensor continues to run during a Celery warm shutdown, and its background
>> process remains active even after a cold shutdown. This prevents us from
>> safely destroying the worker's Docker container.
>>
>> Since at least one of the external tasks we need to monitor is
>> practically always running, we would not be able to stop our Airflow
>> workers in order to deploy code changes to our DAGs or the libraries
>> they depend on.
>>
>> It would be perfect for our case to have the ability to suspend an
>> Airflow sensor while shutting down the worker and resume it when the
>> worker restarts.
>>
>> I was wondering how this could be implemented in Airflow, and I came up
>> with this initial idea:
>> https://github.com/postrational/incubator-airflow/commit/74ac6f7290d6838362ff437e228465bb49fe198f
>>
>> The code adds a signal handler to the BaseSensorOperator, which raises
>> an exception if it detects that the worker is shutting down (SIGINT
>> detected). Later on it handles the exception in the 'run' method of the
>> TaskInstance. The idea is to put the sensor in a state which would cause
>> it to be cleanly resumed after Airflow comes back up after a restart.
>>
>> So far, my draft code works some of the time, but not always. Sometimes
>> the sensor resumes correctly, but sometimes it doesn't trigger its
>> downstream operator and the whole DAG run is marked as "failed".
>>
>> I noticed the following line in the sensor's logs:
>> [2016-10-13 19:15:17,033] {jobs.py:1976} WARNING - State of this
>> instance has been externally set to None. Taking the poison pill. So
>> long.
>>
>> I would like to ask you how to properly mark an operator so that it is
>> restarted cleanly.
>>
>> Before I proceed any further I would also like to ask for your opinion
>> of this approach. How do you handle long-running sensors? Do you use
>> Docker with Airflow?
>>
>> Any advice would be greatly appreciated.
>>
>> Thanks,
>> Michal

--
I'm Xuan Ji!
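A minimal sketch of the retry setup Maxime describes above: a custom sensor whose check is fully determined by its arguments (so a retried task instance repeats the exact same check), plus retries so the scheduler can re-fire it if the worker dies. The sensor class, file path, and intervals are illustrative, and import paths vary between Airflow versions.

    from datetime import datetime, timedelta
    import os

    from airflow import DAG
    # In older releases this lives in airflow.operators.sensors;
    # newer releases use airflow.sensors.base.
    from airflow.operators.sensors import BaseSensorOperator


    class FileLandedSensor(BaseSensorOperator):
        """Stateless: the path is fixed, so a retried task instance repeats the same check."""

        def __init__(self, filepath, *args, **kwargs):
            # Older Airflow versions may also want @apply_defaults on __init__.
            super(FileLandedSensor, self).__init__(*args, **kwargs)
            self.filepath = filepath

        def poke(self, context):
            return os.path.exists(self.filepath)


    dag = DAG("long_sensor_example",
              start_date=datetime(2016, 10, 1),
              schedule_interval=None)

    wait_for_file = FileLandedSensor(
        task_id="wait_for_file",
        filepath="/data/landing/_SUCCESS",   # the "static" criterion
        poke_interval=60,                    # wake up and check once a minute
        retries=5,                           # scheduler re-fires the sensor if the worker dies
        retry_delay=timedelta(minutes=5),
        dag=dag,
    )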
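And a rough sketch of Maxime's point about making a "stateful" sensor (one that gets a jobid from a remote service when it starts) effectively stateless by persisting that id in the Airflow db. Maxime suggests XCom; the sketch below uses an Airflow Variable instead, since a task's own XComs may be cleared when it re-runs depending on the version. submit_job and check_job are hypothetical stand-ins for calls to the external system.

    from airflow.models import Variable
    from airflow.operators.sensors import BaseSensorOperator  # airflow.sensors.base in newer versions


    def submit_job():
        """Hypothetical: submit work to the external system and return its id."""
        return "job-123"


    def check_job(job_id):
        """Hypothetical: return the external system's status for job_id."""
        return "done"


    class ExternalJobSensor(BaseSensorOperator):
        """Persists the remote job id so a retried task instance resumes tracking it."""

        def poke(self, context):
            var_key = "external_job_id_{}".format(context["ts"])  # one id per execution date
            job_id = Variable.get(var_key, default_var=None)
            if job_id is None:
                job_id = submit_job()
                Variable.set(var_key, job_id)   # survives worker restarts and task retries
            return check_job(job_id) == "done"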