val2k edited a comment on issue #13542:
URL: https://github.com/apache/airflow/issues/13542#issuecomment-1029786174
@jpkoponen I adapted the @danmactough DAG to make it automatic and fit our
use case. It filters task instances that have been stuck in the queued state
for more than 5 minutes and simply deletes them. (In my case, changing the
`try_number` and the state has no effect other than queuing the DAG again.)
```python
import os
from datetime import datetime, timedelta
from pprint import pprint

from airflow import DAG
from airflow.models.dagrun import DagRun
from airflow.models.taskinstance import TaskInstance
from airflow.operators.python import PythonOperator
from airflow.utils import timezone
from airflow.utils.db import provide_session
from airflow.utils.state import State

# Name the DAG after this file.
DAG_NAME = os.path.splitext(os.path.basename(__file__))[0]

DEFAULT_ARGS = {
    "owner": "airflow",
    "depends_on_past": False,
    "email_on_failure": False,
    "email_on_retry": False,
    "execution_timeout": timedelta(minutes=10),
    "retries": 0,
}


@provide_session
def unstick_dag_callable(dag_run, session, **kwargs):
    # Find task instances that have been queued for more than 5 minutes.
    filters = [
        TaskInstance.state == State.QUEUED,
        TaskInstance.queued_dttm < datetime.now(timezone.utc) - timedelta(minutes=5),
    ]
    tis = session.query(TaskInstance).filter(*filters).all()
    print(f"Task instances: {tis}")
    print(f"Updating {len(tis)} task instances")
    for ti in tis:
        # Fetch the DAG run the stuck task instance belongs to.
        dr = (
            session.query(DagRun)
            .filter(DagRun.run_id == ti.dag_run.run_id)
            .first()
        )
        dagrun = (
            dict(
                id=dr.id,
                dag_id=dr.dag_id,
                execution_date=dr.execution_date,
                start_date=dr.start_date,
                end_date=dr.end_date,
                _state=dr._state,
                run_id=dr.run_id,
                creating_job_id=dr.creating_job_id,
                external_trigger=dr.external_trigger,
                run_type=dr.run_type,
                conf=dr.conf,
                last_scheduling_decision=dr.last_scheduling_decision,
                dag_hash=dr.dag_hash,
            )
            if dr
            else {}
        )
        # Log the stuck task instance and its DAG run before touching anything.
        pprint(
            dict(
                task_id=ti.task_id,
                job_id=ti.job_id,
                key=ti.key,
                dag_id=ti.dag_id,
                execution_date=ti.execution_date,
                state=ti.state,
                dag_run={**dagrun},
            )
        )
        # Mark the DAG run as failed and delete the stuck task instance.
        dr.state = State.FAILED
        print(f"Deleting {str(ti)}.")
        session.delete(ti)
    session.commit()
    print("Done.")


with DAG(
    DAG_NAME,
    description="Utility DAG to fix TaskInstances stuck in queued state",
    default_args=DEFAULT_ARGS,
    schedule_interval="*/5 * * * *",
    start_date=datetime(year=2021, month=8, day=1),
    max_active_runs=1,
    catchup=False,
    default_view="graph",
    is_paused_upon_creation=False,
) as dag:
    PythonOperator(task_id="unstick_dag", python_callable=unstick_dag_callable)
```
The DAG runs every 5 minutes, and I have never caught it stuck in a queued state itself.
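
For reference, here is a minimal, illustrative sketch of the alternative mentioned in the parenthesis above: resetting the stuck task instances so the scheduler queues them again instead of deleting them. It is not part of the DAG itself and assumes the same `session`, `tis`, and `State` objects as in `unstick_dag_callable`.
```python
# Illustrative only: reset stuck task instances instead of deleting them.
# Assumes `session`, `tis`, and `State` exactly as in unstick_dag_callable above.
for ti in tis:
    ti.state = State.NONE  # a cleared state lets the scheduler queue the task again
    session.merge(ti)
session.commit()
# (Adjusting try_number, as mentioned above, is left out of this sketch.)
```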