danmactough commented on issue #13542:
URL: https://github.com/apache/airflow/issues/13542#issuecomment-1011598836


   Airflow 2.0.2+e494306fb01f3a026e7e2832ca94902e96b526fa (MWAA on AWS)
   
   This happens to us a LOT: a DAG will be running, task instances will be 
marked as "queued", but nothing gets moved to "running".
   
   When this happened today (the first occurrence of the day), I was able to track down the following error in the scheduler logs:
   
   ![2022-01-12 at 7 16 
PM](https://user-images.githubusercontent.com/357481/149243393-0f0b5b91-d1f7-4a51-8a43-3eab644a49e7.png)
   
   At some point after the scheduler hit that exception, I tried to clear the state of the queued task instances to get them to run. That resulted in the following logs:
   
   ![2022-01-12 at 7 18 
PM](https://user-images.githubusercontent.com/357481/149243535-3ebfd0b1-31af-43aa-99e2-7ee5aa1dbaff.png)
   
   This corresponds to this [section of 
code](https://github.com/apache/airflow/blob/2.0.2/airflow/executors/base_executor.py#L73-L85):
   
   ![2022-01-12 at 10 38 
AM](https://user-images.githubusercontent.com/357481/149171972-e9824366-6e85-4c2e-a00c-5ee66d466de8.png)
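
   In case the screenshot doesn't load, the linked `queue_command` looks approximately like this in 2.0.2 (my paraphrase, not verbatim):

   ```python
   # Paraphrase of BaseExecutor.queue_command in Airflow 2.0.2: if the task's
   # key is already present in the in-memory queued_tasks or running
   # collections, the executor refuses to queue it again.
   def queue_command(self, task_instance, command, priority=1, queue=None):
       if task_instance.key not in self.queued_tasks and task_instance.key not in self.running:
           self.log.info("Adding to queue: %s", command)
           self.queued_tasks[task_instance.key] = (command, priority, queue, task_instance)
       else:
           self.log.error("could not queue task %s", task_instance.key)
   ```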
   
   My conclusion is that when the scheduler experienced that error, it entered a pathological state: it was still running, but with bad state in memory. Specifically, the queued task instances were present in the `queued_tasks` or `running` in-memory caches, so any attempt to re-queue those tasks would fail for as long as that scheduler process was running, because the tasks would appear to have already been queued and/or to be running.
   
   Both caches use the 
[`TaskInstanceKey`](https://github.com/apache/airflow/blob/10023fdd65fa78033e7125d3d8103b63c127056e/airflow/models/taskinstance.py#L224-L230),
 which is made up of `dag_id` (which we can't change), `task_id` (which we 
can't change), `execution_date` (nope, can't change), and `try_number` (🎉 we 
can change this!!).
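
   For reference, in 2.0.x that key is (approximately) a `NamedTuple` along these lines (again my paraphrase):

   ```python
   from datetime import datetime
   from typing import NamedTuple


   # Approximate shape of airflow.models.taskinstance.TaskInstanceKey (2.0.x).
   # The executor's queued_tasks/running collections are keyed by this tuple,
   # so bumping try_number yields a key the executor has never seen before.
   class TaskInstanceKey(NamedTuple):
       dag_id: str
       task_id: str
       execution_date: datetime
       try_number: int = 1
   ```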
   
   So to work around this, I created a utility DAG that will find all task 
instances in a "queued" or "None" state and increment the `try_number` field.
   
   The DAG runs as a single `PythonOperator` with the following callable:
   
   ```python
   # Imports assumed for Airflow 2.0.x (the original snippet omitted them):
   from pprint import pprint

   from dateutil.parser import parse
   from sqlalchemy import or_

   from airflow.models import DagRun, TaskInstance
   from airflow.utils import timezone
   from airflow.utils.session import provide_session
   from airflow.utils.state import State


   @provide_session
   def unstick_dag_callable(dag_run, session, **kwargs):
       dag_id = dag_run.conf.get("dag_id")
       if not dag_id:
           raise AssertionError("dag_id was not provided")
       execution_date = dag_run.conf.get("execution_date")
       if not execution_date:
           raise AssertionError("execution_date was not provided")
       execution_date = parse(execution_date)
   
       # Match task instances for this DAG run that are stuck in "queued" or None.
       filters = [
           or_(TaskInstance.state == State.QUEUED, TaskInstance.state == State.NONE),
           TaskInstance.dag_id == dag_id,
           TaskInstance.execution_date == execution_date,
       ]
       print(
           (
               f"DAG id: {dag_id}, Execution Date: {execution_date}, State: "
               f"""{dag_run.conf.get("state", f"{State.QUEUED} or {State.NONE}")}, """
               f"Filter: {[str(f) for f in filters]}"
           )
       )
   
       tis = session.query(TaskInstance).filter(*filters).all()
       # Also fetch the DagRun so we can log it and (optionally) restart it below.
       dr = (
           session.query(DagRun)
           .filter(DagRun.dag_id == dag_id, DagRun.execution_date == execution_date)
           .first()
       )
       dagrun = (
           dict(
               id=dr.id,
               dag_id=dr.dag_id,
               execution_date=dr.execution_date,
               start_date=dr.start_date,
               end_date=dr.end_date,
               _state=dr._state,
               run_id=dr.run_id,
               creating_job_id=dr.creating_job_id,
               external_trigger=dr.external_trigger,
               run_type=dr.run_type,
               conf=dr.conf,
               last_scheduling_decision=dr.last_scheduling_decision,
               dag_hash=dr.dag_hash,
           )
           if dr
           else {}
       )
   
       print(f"Updating {len(tis)} task instances")
       print("Here are the task instances we're going to update")
       # Print no more than 100 tis so we don't lock up the session too long
       for ti in tis[:100]:
           pprint(
               dict(
                   task_id=ti.task_id,
                   job_id=ti.job_id,
                   key=ti.key,
                   dag_id=ti.dag_id,
                   execution_date=ti.execution_date,
                   state=ti.state,
                   dag_run={**dagrun},
               )
           )
       if len(tis) > 100:
           print("Output truncated after 100 task instances")
   
       # Bumping try_number changes each task instance's TaskInstanceKey, so the
       # scheduler's stale in-memory caches no longer match these tasks.
       for ti in tis:
           ti.try_number = ti.next_try_number
           ti.state = State.NONE
           session.merge(ti)

       if dr and dag_run.conf.get("activate_dag_runs", True):
           # Guard against dr being None: the query above may not find a DagRun.
           dr.state = State.RUNNING
           dr.start_date = timezone.utcnow()
       print("Done")
   ```
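
   For completeness, a minimal sketch of how the callable could be wired into the DAG (the `unstick_dag` id, start date, and schedule are my choices, nothing Airflow requires):

   ```python
   from airflow import DAG
   from airflow.operators.python import PythonOperator
   from airflow.utils import timezone

   with DAG(
       dag_id="unstick_dag",  # hypothetical name
       start_date=timezone.datetime(2021, 1, 1),
       schedule_interval=None,  # manual trigger only
       catchup=False,
   ) as dag:
       PythonOperator(
           task_id="unstick_dag",
           python_callable=unstick_dag_callable,
       )
   ```

   It's then triggered with the stuck DAG's id and execution date in the run conf, e.g. `airflow dags trigger unstick_dag --conf '{"dag_id": "<stuck dag>", "execution_date": "<iso8601>"}'`.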
   
   Moments after I shipped this DAG, another DAG got stuck, and I had a chance 
to see if this utility DAG worked -- it did! 😅 
   
   -----
   
   Couple of thoughts:
   
   - I don't think my error is exactly the same as the OP's, since some key conditions don't apply in my case. But this issue appears to collect many different, probably unrelated bugs that all manifest as "stuck DAGs", and it has pretty good Google juice -- I just hope my explanation and/or work-around helps someone else.
   - The MWAA product from AWS uses an older version of Airflow, so the combination of factors that leads to this pathological state may no longer be possible in current versions.
   - MWAA uses the CeleryExecutor, which I suspect is where the pathological 
state is coming from, not BaseExecutor directly.
   - All that being said, I'm surprised to see this critical state kept in memory (`queued_tasks` and `running`). I don't have a complete mental model of how distinct the executor and the scheduler are -- my understanding is that this is scheduler code -- but with the scheduler being high-availability (we're running 3 schedulers), in-memory state seems like something we should use very judiciously, flushing and rehydrating from the database regularly.

