Hi Michal,

I took a quick look at your patch, does your `airflow run` process
kill itself after it receives a SIGINT? I noticed it catches the
`AirflowRetryException` but doesn't re-raise it.

Otherwise, does the log line ("WARNING - State of this instance has
been externally set to None. Taking the poison pill. So long.") come
from a process from before you restart airflow or after?

> The sensor continues to run during a Celery warm shutdown and it's background 
> process remains active even after a cold shutdown.

I looked into this before and we inherited this behavior from Celery
(http://docs.celeryproject.org/en/latest/userguide/workers.html#stopping-the-worker).
With a long-running sensor what will happen is that Celery receives a
SIGTERM and waits for the task (sensor) to finish. Celery by default
will wait forever. I have an open PR to change this
(https://github.com/apache/incubator-airflow/pull/1732) which might do
what you want, but it seems some people rely on the existing behavior.

Regards,
Xuanji



On 17 October 2016 at 11:54, Maxime Beauchemin
<maximebeauche...@gmail.com> wrote:
> I think you the behavior you want may be exactly what will happen provided
> that you setup your sensors to retry when they fail.
>
> Let me describe a play-by-play of how things should happen
> * sensor is running, doing its sensor thing (for this sake let's say it's
> waiting for a file in HDFS to show up), note that it has to be a
> "stateless" sensor to work well here, the filename it looks up is "static"
> in this example, if the task restarts, it will look at the exact same
> filename.
> * the worker on which this sensor runs goes dark, let's say it's rebooted
> * task instance is still marked as running as far as the state of airflow,
> but no hearbeats for that task instance are emmited / recorded anymore
> * the scheduler's process monitors for heartbeats, and sees that task
> instance X has not emitted a heartbeat in the required threshold amount of
> time, it decides to fail the task, which triggers the normal post-failure
> hooks and email alerting if the task is setup that way
> * assuming the task was setup with let's say 5 retries task becomes in an
> up_for_retry state, with a retry count of 0
> * scheduler re-fires the sensor, cranks up the count of retry to 1 and the
> task is running again
> * sensor wakes up every retry_interval, checks if the criteria is met
> * sensor finds the criteria is met, task instance status is set to success
> * downstream tasks fire
>
> If your sensor isn't stateless by nature (let's say it gets a jobid from a
> remote service when it starts, and hold it in memory to make periodic calls
> to track completion), somehow you have to make stateless. Maybe using XCom
> to put the persisted state in the Airflow db, or perhaps some sort of
> contract like a predictable, atomic signal for the task to check.
>
> Max
>
> On Mon, Oct 17, 2016 at 3:24 AM, Michal K <postratio...@yahoo.com.invalid>
> wrote:
>
>> Hello Everyone,
>> our company is currently considering Airflow for the engine of a pretty
>> big workflow system.We use Python 3 and have some HA requirements, so we
>> plan to use Celery with multiple worker and scheduler nodes and RabbitMQ as
>> message bus. We are also using Docker containers for deployment.
>> We have a specific case, which we need to handle in our workflows. Some
>> tasks, which we trigger are very long lived (many hours long). These long
>> tasks are carried out by external systems, independent of the Airflow
>> nodes. In our workflow need to start, then monitor these external tasks and
>> trigger downstream operators based on their results.
>> This seems like a perfect job for Airflow sensors. Here however, we run
>> into a small problem. In our testing it seems not possible to stop an
>> Airflow worker while a sensor is running. The sensor continues to run
>> during a Celery warm shutdown and it's background process remains active
>> even after a cold shutdown. This prevents us from being able to safely
>> destroy the worker's Docker container.
>> Since at least one of the external tasks we need to monitor is practically
>> always running, we would not be able to stop our Airflow workers in order
>> to deploy code changes to our DAGs or the libraries they depend on.
>> It would be perfect for our case to have the ability to suspend an Airflow
>> sensor while shutting down the worker and resume it when the worker
>> restarts.
>> I was wondering how this could be implemented in Airflow and I came up
>> with this initial idea:https://github.com/postrational/incubator-
>> airflow/commit/74ac6f7290d6838362ff437e228465bb49fe198f
>> The code adds a signal handler to the BaseSensorOperator, which raises an
>> exception if it detects that the worker is shutting down (SIGINT detected).
>> Later on it handles the exception in the 'run' method of the TaskInstance.
>> The idea is to put the sensor in a state which would cause it to be cleanly
>> resumed after Airflow comes back up after a restart.
>> So far, my draft code works some of the time, but not always. Sometimes
>> the sensor resumes correctly, but sometimes it doesn't trigger its
>> downstream operator and the whole DAG run is marked as "failed".
>> I noticed the following line in the sensor's logs:[2016-10-13
>> 19:15:17,033] {jobs.py:1976} WARNING - State of this instance has been
>> externally set to None. Taking the poison pill. So long.
>> I would like to ask you how to properly mark an operator to be restarted
>> cleanly.
>> Before I proceed any further I would also like to ask for your opinion of
>> this approach. How do you handle long running sensors? Do you use Docker
>> with Airflow?
>> Any advice would be greatly appreciated. Thanks,
>> Michal



-- 
Im Xuan Ji!

Reply via email to