Sorry, I meant to say we check the state of FAILED and not UPSTREAM_FAILED
tasks, which we'd set to UP_FOR_RETRY.

Thanks.

Regards,
David

On Mon, Oct 23, 2017 at 10:30 AM, David Klosowski <dav...@thinknear.com>
wrote:

> Hey Airflow Devs,
>
> We've noticed that the on_retry_callback on the SubDagOperator seems to
> behave differently in version 1.9.0 than in 1.8.x.  What's more, it only
> happens in one environment and not the other, which makes the issue hard to
> pin down, since the environment with the unexpected behavior is our
> production environment.
>
> We have custom logic in our on_retry_callback and prevent the subtasks
> themselves from managing their own retries, letting the SubDAG act as the
> retry manager, roughly as sketched below.
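>
> For context, the wiring looks roughly like this (a simplified sketch; the
> builder name and task ids are illustrative, not our exact code, and
> reset_subdag_task_instances is the retry handler quoted further down):
>
>     from datetime import datetime, timedelta
>
>     from airflow import DAG
>     from airflow.operators.dummy_operator import DummyOperator
>     from airflow.operators.subdag_operator import SubDagOperator
>
>     default_args = {'owner': 'airflow', 'start_date': datetime(2017, 10, 1)}
>
>     parent = DAG('parent', default_args=default_args,
>                  schedule_interval='@daily')
>
>     def build_subdag(parent_dag_id, child_id, args):
>         # Subtasks get retries=0 so they never retry on their own;
>         # the SubDagOperator below is the only retry manager.
>         subdag = DAG('{}.{}'.format(parent_dag_id, child_id),
>                      default_args=args, schedule_interval='@daily')
>         DummyOperator(task_id='work', retries=0, dag=subdag)
>         return subdag
>
>     SubDagOperator(
>         task_id='child',
>         subdag=build_subdag('parent', 'child', default_args),
>         retries=3,  # retries live on the SubDAG, not on the subtasks
>         retry_delay=timedelta(minutes=5),
>         on_retry_callback=reset_subdag_task_instances,  # handler below
>         dag=parent,
>     )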
>
> In the callback we'd check the state of UPSTREAM_FAILED tasks, setting them
> to UP_FOR_RETRY.  We set the task_instances to have 0 retries and put the
> retries on the SubDAG (3 in this case).  In our test environment, if there
> is a failure, the on_retry_callback successfully resets the state of all
> task_instances to UP_FOR_RETRY, so the whole subdag retries as expected.
> In our production environment, the task_instances go from FAILED ->
> UP_FOR_RETRY and then back to FAILED, and they won't retry.  It's unclear
> why they go back to FAILED.
>
> The logic in the retry handler is as follows:
>
>     logging.info('Setting all task instances of subdag: {subdag} for '
>                  'time: {execution_date} to UP_FOR_RETRY'
>                  .format(subdag=dag_id, execution_date=execution_date))
>     task_instances = session.query(TaskInstance).filter(
>         TaskInstance.dag_id == dag_id,
>         TaskInstance.execution_date == execution_date)
>     for task_instance in task_instances:
>         logging.info("Setting task %s from %s to UP_FOR_RETRY",
>                      task_instance.task_id, task_instance.state)
>         task_instance.state = State.UP_FOR_RETRY
>
>     if task_instances:
>         session.commit()
>         task_instances = session.query(TaskInstance).filter(
>             TaskInstance.dag_id == dag_id,
>             TaskInstance.execution_date == execution_date)
>         for task_instance in task_instances:
>             logging.info("After update found task (%s) with state (%s)",
>                          task_instance.task_id, task_instance.state)
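>
> For completeness, that logic runs inside a wrapper along these lines; this
> is a simplified sketch (the context keys and provide_session are standard
> Airflow, but deriving the subdag's dag_id this way is just how we happen
> to do it):
>
>     import logging
>
>     from airflow.models import TaskInstance
>     from airflow.utils.db import provide_session
>     from airflow.utils.state import State
>
>     @provide_session
>     def reset_subdag_task_instances(context, session=None):
>         # on_retry_callback is invoked with the task context; the subdag's
>         # dag_id is the parent dag_id plus '.' plus the SubDagOperator's
>         # task_id.
>         ti = context['task_instance']
>         dag_id = '{}.{}'.format(ti.dag_id, ti.task_id)
>         execution_date = context['execution_date']
>         # ... the query/update logic shown above goes here ...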
>
> Both environments use the CeleryExecutor; our production environment runs
> 3 workers while our test environment runs 1.  We're using the v1-9-test
> branch of Airflow in a Docker container environment, so the code in our
> dev environment is exactly the same as in production.
>
> I verified that the permissions and schemas in both databases (dev &
> production) are the same by dumping them and comparing them.
>
> Any thoughts on what might cause the difference in behavior, why
> the on_retry_callback isn't working in our production environment, or what
> we might look at?  My next step was to add some workers in our dev
> environment and see whether the number of workers plays a role.
>
> Thanks.
>
> Regards,
> David
>
>
