danmactough commented on issue #13542: URL: https://github.com/apache/airflow/issues/13542#issuecomment-1011598836
Airflow 2.0.2+e494306fb01f3a026e7e2832ca94902e96b526fa (MWAA on AWS)

This happens to us a LOT: a DAG will be running, task instances will be marked as "queued", but nothing gets moved to "running".

When this happened today (the first time today), I was able to track down the following error in the scheduler logs:

![2022-01-12 at 7 16 PM](https://user-images.githubusercontent.com/357481/149243393-0f0b5b91-d1f7-4a51-8a43-3eab644a49e7.png)

At some point after the scheduler had that exception, I tried to clear the state of the queued task instances to get them to run. That resulted in the following logs:

![2022-01-12 at 7 18 PM](https://user-images.githubusercontent.com/357481/149243535-3ebfd0b1-31af-43aa-99e2-7ee5aa1dbaff.png)

This corresponds to this [section of code](https://github.com/apache/airflow/blob/2.0.2/airflow/executors/base_executor.py#L73-L85):

![2022-01-12 at 10 38 AM](https://user-images.githubusercontent.com/357481/149171972-e9824366-6e85-4c2e-a00c-5ee66d466de8.png)

My conclusion is that when the scheduler experienced that error, it entered a pathological state: it was running but had bad state in memory. Specifically, the queued task instances were in the `queued_tasks` or `running` in-memory caches, so any attempt to re-queue those tasks would fail for as long as that scheduler process was running, because the tasks would appear to be already queued and/or running. Both caches use the [`TaskInstanceKey`](https://github.com/apache/airflow/blob/10023fdd65fa78033e7125d3d8103b63c127056e/airflow/models/taskinstance.py#L224-L230), which is made up of `dag_id` (which we can't change), `task_id` (which we can't change), `execution_date` (nope, can't change), and `try_number` (🎉 we can change this!!).

So to work around this, I created a utility DAG that will find all task instances in a "queued" or "None" state and increment the `try_number` field.

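
To illustrate why bumping `try_number` gets around the stale cache, here is a minimal, self-contained sketch. It is not Airflow's code: `ToyExecutor` and its `queue_command` are hypothetical stand-ins that only model the behavior described above (an in-memory `queued_tasks` dict and `running` set keyed by a `TaskInstanceKey`-like tuple), and the DAG and task names are made up.

```python
from datetime import datetime
from typing import Dict, NamedTuple, Set


class TaskInstanceKey(NamedTuple):
    """Simplified stand-in for Airflow's TaskInstanceKey (same four fields)."""

    dag_id: str
    task_id: str
    execution_date: datetime
    try_number: int


class ToyExecutor:
    """Hypothetical executor that de-duplicates work by key, purely in memory."""

    def __init__(self) -> None:
        self.queued_tasks: Dict[TaskInstanceKey, str] = {}
        self.running: Set[TaskInstanceKey] = set()

    def queue_command(self, key: TaskInstanceKey, command: str) -> bool:
        # Anything the caches already know about is refused, even if the
        # real task never started.
        if key in self.queued_tasks or key in self.running:
            return False
        self.queued_tasks[key] = command
        return True


executor = ToyExecutor()
stuck = TaskInstanceKey("my_dag", "my_task", datetime(2022, 1, 12), 1)

# The first attempt lands in the cache; assume it then gets stuck there.
first_attempt = executor.queue_command(stuck, "run my_task")

# Clearing the task in the database does not touch the cache, so re-queueing
# with the same key is rejected.
requeue = executor.queue_command(stuck, "run my_task")

# Incrementing try_number yields a different key, which the cache has never seen.
bumped = stuck._replace(try_number=stuck.try_number + 1)
after_bump = executor.queue_command(bumped, "run my_task")
```

In this toy model the stale entry for the old key is never evicted; the bumped key simply sidesteps it, which matches the workaround's intent rather than fixing the underlying state.
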
The DAG runs as a single `PythonOperator` with the following callable:

```python
# Imports were not shown in the original comment; these are assumed.
from pprint import pprint

from dateutil.parser import parse  # assumption: any ISO datetime parser works here
from sqlalchemy import or_

from airflow.models import DagRun, TaskInstance
from airflow.utils import timezone
from airflow.utils.session import provide_session
from airflow.utils.state import State


@provide_session
def unstick_dag_callable(dag_run, session, **kwargs):
    # The target DAG run is identified through the triggering run's conf.
    dag_id = dag_run.conf.get("dag_id")
    if not dag_id:
        raise AssertionError("dag_id was not provided")
    execution_date = dag_run.conf.get("execution_date")
    if not execution_date:
        raise AssertionError("execution_date was not provided")
    execution_date = parse(execution_date)

    # Match task instances of that run that are queued or have no state.
    ti_filters = [
        or_(TaskInstance.state == State.QUEUED, TaskInstance.state == State.NONE),
        TaskInstance.dag_id == dag_id,
        TaskInstance.execution_date == execution_date,
    ]
    print(
        (
            f"DAG id: {dag_id}, Execution Date: {execution_date}, State: "
            f"""{dag_run.conf.get("state", f"{State.QUEUED} or {State.NONE}")}, """
            f"Filter: {[str(f) for f in ti_filters]}"
        )
    )
    tis = session.query(TaskInstance).filter(*ti_filters).all()
    dr = (
        session.query(DagRun)
        .filter(DagRun.dag_id == dag_id, DagRun.execution_date == execution_date)
        .first()
    )
    # Snapshot of the DAG run, used only for logging below.
    dagrun = (
        dict(
            id=dr.id,
            dag_id=dr.dag_id,
            execution_date=dr.execution_date,
            start_date=dr.start_date,
            end_date=dr.end_date,
            _state=dr._state,
            run_id=dr.run_id,
            creating_job_id=dr.creating_job_id,
            external_trigger=dr.external_trigger,
            run_type=dr.run_type,
            conf=dr.conf,
            last_scheduling_decision=dr.last_scheduling_decision,
            dag_hash=dr.dag_hash,
        )
        if dr
        else {}
    )

    print(f"Updating {len(tis)} task instances")
    print("Here are the task instances we're going to update")
    # Print no more than 100 tis so we don't lock up the session too long
    for ti in tis[:100]:
        pprint(
            dict(
                task_id=ti.task_id,
                job_id=ti.job_id,
                key=ti.key,
                dag_id=ti.dag_id,
                execution_date=ti.execution_date,
                state=ti.state,
                dag_run={**dagrun},
            )
        )
    if len(tis) > 100:
        print("Output truncated after 100 task instances")

    # Bumping try_number changes the TaskInstanceKey, so the executor's
    # in-memory caches no longer match these task instances.
    for ti in tis:
        ti.try_number = ti.next_try_number
        ti.state = State.NONE
        session.merge(ti)

    # Guard against a missing DAG run (the original dereferenced dr unconditionally).
    if dr and dag_run.conf.get("activate_dag_runs", True):
        dr.state = State.RUNNING
        dr.start_date = timezone.utcnow()

    print("Done")
```

Moments after I shipped this DAG, another DAG got stuck, and I had a chance to see if this utility DAG worked -- it did! 😅

-----

Couple of thoughts:

- I don't think my error is exactly the same as the OP's, as some very key conditions are not applicable to my case. But this issue appears to cover many different and probably unrelated bugs that all manifest as "stuck DAGs", and it has pretty good Google juice -- I just hope my explanation and/or work-around helps someone else.
- The MWAA product from AWS is using an older version of Airflow, so the combination of factors that leads to this pathological state may no longer be possible in the current version of Airflow.
- MWAA uses the CeleryExecutor, which I suspect is where the pathological state is coming from, not BaseExecutor directly.
- All that being said, I'm surprised to see this critical state being kept in memory (`queued_tasks` and `running`), but I don't have a complete mental model of how the executor and the scheduler are distinct or not. My understanding is that this is scheduler code, but with the scheduler being high-availability (we're running 3 schedulers), in-memory state seems like something we should be using very judiciously, and flushing and rehydrating from the database regularly.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org