Hey Airflow Devs,

We've noticed that the on_retry_callback on the SubDagOperator behaves
differently in 1.9.0 than it did in 1.8.x.  What's more interesting, it
only happens in one environment and not another, which makes the issue
hard to pin down, because the environment with the unexpected behavior is
our production environment.

We have custom logic in our on_retry_callback that prevents the subtasks
themselves from managing their retries, allowing the subdag to act as the
retry manager.

Previously, we would check for tasks in the UPSTREAM_FAILED state and set
them to UP_FOR_RETRY.  The task_instances themselves have 0 retries, and
the retries are set on the SubDAG (3 in this case).  In our test
environment, if there is a failure, the on_retry_callback successfully
resets the state of all task_instances to UP_FOR_RETRY, so the whole
subdag retries as expected.  In our production environment, the
task_instances go from FAILED -> UP_FOR_RETRY and then back to FAILED, and
they won't retry.  It's unclear why they go back to FAILED.
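
Roughly, the wiring looks like this (a simplified sketch; the operator and
function names are stand-ins for our actual code):

    from datetime import datetime, timedelta

    from airflow import DAG
    from airflow.operators.dummy_operator import DummyOperator
    from airflow.operators.subdag_operator import SubDagOperator

    default_args = {'owner': 'airflow', 'start_date': datetime(2018, 1, 1)}

    def reset_failed_tasks(context):
        # Placeholder here; the body of this retry handler is shown below.
        pass

    def create_subdag(parent_dag_id, child_id, args):
        # Subtasks get retries=0 so they never manage their own retries.
        subdag = DAG(dag_id='{}.{}'.format(parent_dag_id, child_id),
                     default_args=args, schedule_interval=None)
        DummyOperator(task_id='subtask_a', retries=0, dag=subdag)
        DummyOperator(task_id='subtask_b', retries=0, dag=subdag)
        return subdag

    main_dag = DAG(dag_id='parent_dag', default_args=default_args,
                   schedule_interval=timedelta(days=1))

    # The SubDagOperator acts as the retry manager: retries=3 plus an
    # on_retry_callback that resets the subdag's task instances.
    SubDagOperator(
        task_id='child',
        subdag=create_subdag('parent_dag', 'child', default_args),
        retries=3,
        on_retry_callback=reset_failed_tasks,
        dag=main_dag)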

The logic in the retry handler is as follows:

    logging.info('Setting all task instances of subdag: {subdag} for time: '
                 '{execution_date} to UP_FOR_RETRY'
                 .format(subdag=dag_id, execution_date=execution_date))
    task_instances = session.query(TaskInstance).filter(
        TaskInstance.dag_id == dag_id,
        TaskInstance.execution_date == execution_date)
    for task_instance in task_instances:
        logging.info("Setting task %s from %s to UP_FOR_RETRY",
                     task_instance.task_id, task_instance.state)
        task_instance.state = State.UP_FOR_RETRY

    if task_instances:
        session.commit()
        task_instances = session.query(TaskInstance).filter(
            TaskInstance.dag_id == dag_id,
            TaskInstance.execution_date == execution_date)
        for task_instance in task_instances:
            logging.info("After update found task (%s) with state (%s)",
                         task_instance.task_id, task_instance.state)
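
For completeness, that snippet runs inside the callback roughly like this
(an approximation of our code; set_tasks_up_for_retry is just a name for
the snippet above wrapped in a function, condensed here without the
logging):

    from airflow import settings
    from airflow.models import TaskInstance
    from airflow.utils.state import State

    def set_tasks_up_for_retry(session, dag_id, execution_date):
        # Same as the snippet above, condensed.
        task_instances = session.query(TaskInstance).filter(
            TaskInstance.dag_id == dag_id,
            TaskInstance.execution_date == execution_date)
        for ti in task_instances:
            ti.state = State.UP_FOR_RETRY
        session.commit()

    def reset_failed_tasks(context):
        # on_retry_callback receives the task context; the subdag's dag_id is
        # '<parent_dag_id>.<subdag_task_id>' and its task instances share the
        # parent's execution_date.
        dag_id = '{}.{}'.format(context['dag'].dag_id, context['task'].task_id)
        execution_date = context['execution_date']
        session = settings.Session()
        try:
            set_tasks_up_for_retry(session, dag_id, execution_date)
        finally:
            session.close()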

Both environments use the CeleryExecutor; our production environment runs
3 workers while our test environment runs 1.  We're using the v1-9-test
branch of Airflow in a Docker container environment, so the code in our
dev environment is exactly the same as in production.

I verified that the permissions and schemas in both databases (dev &
production) are the same by dumping them and comparing them.
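
(For what it's worth, an equivalent check can be done from Python by
reflecting both metadata databases with SQLAlchemy and diffing the
results; the connection URIs below are placeholders, not our real ones.)

    from sqlalchemy import create_engine, MetaData

    def describe_schema(conn_uri):
        # Return {table_name: [(column, type, nullable), ...]} for a DB.
        meta = MetaData()
        meta.reflect(bind=create_engine(conn_uri))
        return {
            name: sorted((c.name, str(c.type), c.nullable)
                         for c in table.columns)
            for name, table in meta.tables.items()
        }

    # Placeholder connection URIs.
    dev = describe_schema('postgresql://airflow@dev-db/airflow')
    prod = describe_schema('postgresql://airflow@prod-db/airflow')
    assert dev == prod, 'metadata schemas differ between dev and production'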

Any thoughts on what might cause the difference in behavior, why
the on_retry_callback isn't working in our production environment, or what
we might look at?  My next step is to add more workers in our dev
environment to see whether the number of workers is a factor.

Thanks.

Regards,
David
