Hello Airflow Devs, 

This is a continuation of this mailing thread: 

https://lists.apache.org/thread.html/25f07715b834a0e8b70b9e39fad8b82771fb267c33da484f15e61c3e@%3Cdev.airflow.apache.org%3E

We were able to figure out why our SubDagOperator task instances would go
from FAILED -> UP_FOR_RETRY -> FAILED.

As you can read in the mailing thread above, we have a custom
on_retry_callback function that sets all the task instances (within the
Subdag) to UP_FOR_RETRY. Some other process (unknown to us at the time) would
then set the state of these task instances back to FAILED, so when the next
iteration of the Subdag ran, it could not execute the Subdag tasks since they
were already marked FAILED.
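
For reference, here is a minimal sketch of that kind of callback; the helper
name and the session handling are illustrative, not our exact production code:

    from airflow.models import TaskInstance
    from airflow.utils.db import provide_session
    from airflow.utils.state import State

    @provide_session
    def reset_subdag_tis_on_retry(context, session=None):
        """on_retry_callback on the SubDagOperator: put the subdag's own
        task instances back into UP_FOR_RETRY for the next try."""
        ti = context['ti']
        # By convention the subdag's dag_id is <parent_dag_id>.<task_id>.
        subdag_id = '{}.{}'.format(ti.dag_id, ti.task_id)
        sub_tis = session.query(TaskInstance).filter(
            TaskInstance.dag_id == subdag_id,
            TaskInstance.execution_date == ti.execution_date,
        ).all()
        for sub_ti in sub_tis:
            sub_ti.state = State.UP_FOR_RETRY
        session.commit()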

Here is what we found:

SubdagOperators have their own DagRun entry. As a result, whenever a Subdag
task fails, the Subdag's DagRun is set to FAILED as soon as the root task
instance (inside the Subdag) is UPSTREAM_FAILED or FAILED. Since all of our
Subdag tasks have 0 retries configured, no runs continue. The Subdag itself
has retries set to 2, so it calls the on_retry_callback and sets all the
Subdag task instances to UP_FOR_RETRY. However, `jobs.py` has a method on the
SchedulerJob class, `_change_state_for_tis_without_dagrun()`, that changes
the state of task instances whose corresponding DagRun is not in the RUNNING
state. Because the Subdag's DagRun had already been set to FAILED,
`_change_state_for_tis_without_dagrun()` then marked the Subdag task
instances as FAILED again.
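
To make the mechanism concrete, the effect of that sweep is roughly the
following; this is a simplified paraphrase of what we understand the method
to do, not the actual implementation in `jobs.py`:

    from airflow.models import DagRun, TaskInstance
    from airflow.utils.state import State

    def change_state_for_tis_without_dagrun(session, old_states, new_state):
        # For every task instance in one of old_states, look up its
        # DagRun; if the run is missing or not RUNNING, flip the task
        # instance to new_state (here: UP_FOR_RETRY -> FAILED).
        for ti in session.query(TaskInstance).filter(
                TaskInstance.state.in_(old_states)):
            dag_run = session.query(DagRun).filter(
                DagRun.dag_id == ti.dag_id,
                DagRun.execution_date == ti.execution_date,
            ).first()
            if dag_run is None or dag_run.state != State.RUNNING:
                ti.state = new_state
        session.commit()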

To combat this, we now also set the state of the Subdag's DagRun to RUNNING
(in the on_retry_callback handler) so the scheduler's sweep doesn't undo the
retry. Our retries are now running fine.
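
Concretely, the extra step in the callback looks roughly like this (the
function name is illustrative; assume the same subdag_id convention as above):

    from airflow.models import DagRun
    from airflow.utils.state import State

    def mark_subdag_run_running(session, subdag_id, execution_date):
        # Flip the subdag's DagRun back to RUNNING so the scheduler's
        # sweep leaves the UP_FOR_RETRY task instances alone.
        dag_run = session.query(DagRun).filter(
            DagRun.dag_id == subdag_id,
            DagRun.execution_date == execution_date,
        ).first()
        if dag_run is not None:
            dag_run.state = State.RUNNING
        session.commit()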

We have a couple of questions here:

1) This may be an edge case, but our workaround feels somewhat roundabout.
The comment in the code states the following:

    # Handle cases where a DAG run state is set (perhaps manually) to
    # a non-running state. Handle task instances that belong to
    # DAG runs in those states

    # If a task instance is up for retry but the corresponding DAG run
    # isn't running, mark the task instance as FAILED so we don't try
    # to re-run it.
    self._change_state_for_tis_without_dagrun(simple_dag_bag,
                                              [State.UP_FOR_RETRY],
                                              State.FAILED)

There appears to be an issue arising from a SubDag both having a DagRun and
being a task, which we ran into because of the on_retry_callback. Does it
make more sense for a SubDag's DagRun not to be subject to this condition
(the one identified in the comment)?

2) We have not been able to figure out why our old on_retry_callback worked
fine in our testing environment (which doesn't actively run DAGs unless we're
testing) but not in our production environment, which constantly runs DAGs.
We replicated our sandbox environment to behave exactly like production and
scheduled DAGs to run, but haven't been able to reproduce the behavior above.
Any thoughts on why this might behave differently in some cases?

Thanks,

Colin
