Sorry, I meant to say we check the state of FAILED and not UPSTREAM_FAILED tasks, which we'd set to UP_FOR_RETRY.
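
For clarity, here is a minimal sketch of what that corrected check could look like; it is hypothetical and assumes the same session, TaskInstance and State objects used in the handler quoted below:

    from airflow.models import TaskInstance
    from airflow.utils.state import State

    # Hypothetical sketch: reset only the FAILED (not UPSTREAM_FAILED) subtasks
    # so the SubDAG's own retry can re-run them.
    failed_task_instances = session.query(TaskInstance).filter(
        TaskInstance.dag_id == dag_id,
        TaskInstance.execution_date == execution_date,
        TaskInstance.state == State.FAILED)
    for task_instance in failed_task_instances:
        task_instance.state = State.UP_FOR_RETRY
    session.commit()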
Thanks.

Regards,
David

On Mon, Oct 23, 2017 at 10:30 AM, David Klosowski <dav...@thinknear.com> wrote:

> Hey Airflow Devs,
>
> We've noticed that the on_retry_callback on the SubDagOperator seems to be
> acting differently in version 1.9.0 than in 1.8.x. More interestingly, it's
> only happening in one environment and not the other, which makes the issue
> hard to pin down, since the environment with the unexpected behavior is our
> production environment.
>
> We have custom logic in our on_retry_callback to prevent the subtasks
> themselves from managing their retries, allowing the SubDAG to act as the
> retry manager.
>
> Before, we'd check the state of UPSTREAM_FAILED tasks and set them to
> UP_FOR_RETRY. We set the task_instances to have 0 retries and set the
> retries on the SubDAG (3 in this case). In our test environment, if there
> is a failure, the on_retry_callback successfully resets the state of all
> task_instances so we can fully retry: they all go to UP_FOR_RETRY. In our
> production environment, the task_instances go from FAILED -> UP_FOR_RETRY
> and then back to FAILED, and they won't retry. It's unclear why they go
> back to FAILED.
>
> The logic in the retry handler is as follows:
>
>     logging.info('Setting all task instances of subdag: {subdag} for '
>                  'time: {execution_date} to UP_FOR_RETRY'
>                  .format(subdag=dag_id, execution_date=execution_date))
>     task_instances = session.query(TaskInstance).filter(
>         TaskInstance.dag_id == dag_id,
>         TaskInstance.execution_date == execution_date)
>     for task_instance in task_instances:
>         logging.info("Setting task %s from %s to UP_FOR_RETRY",
>                      task_instance.task_id, task_instance.state)
>         task_instance.state = State.UP_FOR_RETRY
>
>     if task_instances:
>         session.commit()
>         task_instances = session.query(TaskInstance).filter(
>             TaskInstance.dag_id == dag_id,
>             TaskInstance.execution_date == execution_date)
>         for task_instance in task_instances:
>             logging.info("After update found task (%s) with state (%s)",
>                          task_instance.task_id, task_instance.state)
>
> Both environments are using the CeleryExecutor; our production environment
> runs 3 workers and our test environment runs 1. We're using the v1-9-test
> branch of Airflow in a Docker container environment, so the code in our dev
> environment is exactly the same as in production.
>
> I verified that the permissions and schemas in both databases (dev &
> production) are the same by dumping them and comparing them.
>
> Any thoughts on what might cause the difference in behavior and why the
> on_retry_callback isn't working in our production environment, or what we
> might look at? My next step is to add some workers in our dev environment
> and see if there is any issue related to the number of workers.
>
> Thanks.
>
> Regards,
> David
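
(For reference, a minimal sketch of the wiring described in the quoted mail: subtasks with retries=0 while the SubDagOperator owns the retries and the on_retry_callback. The names subdag_factory, reset_subdag_task_instances, load_subdag and parent_dag are hypothetical.)

    from airflow.operators.subdag_operator import SubDagOperator

    subdag_task = SubDagOperator(
        task_id='load_subdag',
        # subdag_factory is a hypothetical helper that builds the sub-DAG with
        # default_args={'retries': 0} on its tasks
        subdag=subdag_factory('parent_dag', 'load_subdag'),
        retries=3,  # the SubDAG manages the retries
        on_retry_callback=reset_subdag_task_instances,
        dag=parent_dag)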