tal181 commented on issue #13542:
URL: https://github.com/apache/airflow/issues/13542#issuecomment-1131428221
From the Airflow docs:
> **Re-run Tasks**
>
> Some of the tasks can fail during the scheduled run. Once you have fixed the errors after going through the logs, you can re-run the tasks by clearing them for the scheduled date. **Clearing a task instance doesn't delete the task instance record. Instead, it updates max_tries to 0 and sets the current task instance state to None, which causes the task to re-run**.
>
> Click on the failed task in the Tree or Graph views and then click on Clear. The executor will re-run it.
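In code, the clearing described there boils down to the same two field updates the full script below applies. A minimal sketch, assuming direct access to an Airflow 2.x metadata-DB session (the `dag_id`/`task_id` arguments are placeholders):

```python
# Minimal sketch of what "Clear" amounts to per the doc quote above:
# reset max_tries and set the state to None so the scheduler re-runs the task.
from airflow.models.taskinstance import TaskInstance
from airflow.utils.db import provide_session


@provide_session
def clear_for_rerun(dag_id, task_id, session=None):
    tis = (
        session.query(TaskInstance)
        .filter(TaskInstance.dag_id == dag_id, TaskInstance.task_id == task_id)
        .all()
    )
    for ti in tis:
        ti.max_tries = 0
        ti.state = None
        session.merge(ti)
    session.commit()
```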
Taking @val2k's script and changing `max_tries` to 0 and `state` to `None` fixed it for us:
```python
import os
import requests
import time
import json
from datetime import datetime, timedelta
from pprint import pprint
import logging

from airflow import DAG
from airflow.models.dagrun import DagRun
from airflow.models.taskinstance import TaskInstance, clear_task_instances
from airflow.operators.python import PythonOperator
from airflow.utils import timezone
from airflow.utils.db import provide_session
from airflow.utils.state import State
from sqlalchemy.sql.expression import or_

# Project-specific helpers (not part of Airflow)
from clients.constants import WORKFLOW_ARGS
from clients.notifications_client import NotificationsClient
from event_type import EventType
from config.configuration import Configuration
from utils import get_env


def send_sla_retry_notification(task_ids):
    message = prepare_message(task_ids)
    ENV = get_env()
    configuration = Configuration(env=ENV)
    notifications_client = NotificationsClient(configuration)
    notifications_client.send_data_set_notification(
        0,
        EventType.PIPELINES_MISSED_SLA,
        message,
        message,
        datetime.now().isoformat(),
        0,
        None,
        "DQ-AIRFLOW",
    )


def prepare_message(task_ids):
    message = "Going to retry tasks : \n"
    for task_id in task_ids:
        message = message + str(task_id) + "\n"
    message = message[:-1]
    logging.info(f"preparing message {message}")
    return message


@provide_session
def unstick_dag_callable(session, **kwargs):
    # Task instances that have been queued (or have no state) for more than two hours
    filters = [
        or_(TaskInstance.state == State.QUEUED, TaskInstance.state == State.NONE),
        TaskInstance.queued_dttm < datetime.now(timezone.utc) - timedelta(hours=2),
    ]
    tis = session.query(TaskInstance).filter(*filters).all()
    logging.info(f"Task instances: {tis}")
    logging.info(f"Updating {len(tis)} task instances")

    task_ids = []
    for ti in tis:
        task_ids.append(
            f"dag_id : {ti.dag_id}, task_id : {ti.task_id}, execution_date : {ti.execution_date}"
        )
        try:
            dr = (
                session.query(DagRun)
                .filter(DagRun.run_id == ti.dag_run.run_id)
                .first()
            )
            dagrun = {}
            if dr:
                dagrun = dict(
                    id=dr.id,
                    dag_id=dr.dag_id,
                    execution_date=dr.execution_date,
                    start_date=dr.start_date,
                    end_date=dr.end_date,
                    _state=dr._state,
                    run_id=dr.run_id,
                    creating_job_id=dr.creating_job_id,
                    external_trigger=dr.external_trigger,
                    run_type=dr.run_type,
                    conf=dr.conf,
                    last_scheduling_decision=dr.last_scheduling_decision,
                    dag_hash=dr.dag_hash,
                )
            logging.info(
                dict(
                    task_id=ti.task_id,
                    job_id=ti.job_id,
                    key=ti.key,
                    dag_id=ti.dag_id,
                    execution_date=ti.execution_date,
                    state=ti.state,
                    dag_run={**dagrun},
                )
            )
            # Same effect as "Clear" in the UI: reset max_tries and state so the task re-runs
            ti.max_tries = 0
            ti.state = None
            session.merge(ti)
        except Exception as e:
            logging.error("Failed to clear task, reason: " + str(e))
    session.commit()

    if len(task_ids) > 0:
        send_sla_retry_notification(task_ids)
    else:
        logging.info("No DAGS to retry.")
    logging.info("Done.")


def clear_task(session, ti):
    clear_task_instances(tis=[ti], session=session, activate_dag_runs=True, dag=None)


with DAG(
    "retry_dag",
    description="Utility DAG to fix TaskInstances stuck in queued state",
    default_args=WORKFLOW_ARGS,
    schedule_interval="*/10 * * * *",
    start_date=datetime(year=2021, month=8, day=1),
    max_active_runs=1,
    catchup=False,
    is_paused_upon_creation=True,
) as dag:
    PythonOperator(task_id="unstick_dag", python_callable=unstick_dag_callable)
```
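As a side note, the script imports `clear_task_instances` and defines a `clear_task` helper around it but never calls either; if you would rather reuse Airflow's own clearing logic instead of setting `max_tries` and `state` by hand, the loop body could look roughly like this (untested sketch, same Airflow 2.0/2.1-era signature the script already imports):

```python
# Hypothetical variant of the loop body: let Airflow's helper do the clearing
# instead of mutating max_tries/state directly.
for ti in tis:
    try:
        clear_task_instances(tis=[ti], session=session, activate_dag_runs=True, dag=None)
    except Exception as e:
        logging.error("Failed to clear task, reason: " + str(e))
session.commit()
```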