tal181 commented on issue #13542:
URL: https://github.com/apache/airflow/issues/13542#issuecomment-1131428221

   From Airflow doc :
    Re-run Tasks
   Some of the tasks can fail during the scheduled run. Once you have fixed the 
errors after going through the logs, you can re-run the tasks by clearing them 
for the scheduled date. **Clearing a task instance doesn't delete the task 
instance record. Instead, it updates max_tries to 0 and sets the current task 
instance state to None, which causes the task to re-run**.
   Click on the failed task in the Tree or Graph views and then click on Clear. 
The executor will re-run it.
   
   Taking @val2k script and changing the max_tries to 0 & state to None fixed 
the script for us
   ```
   import os
   import requests
   import time
   import json
   from datetime import datetime, timedelta
   from pprint import pprint
   import logging
   from airflow import DAG
   from airflow.models.dagrun import DagRun
   from airflow.models.taskinstance import TaskInstance
   from airflow.operators.python import PythonOperator
   from airflow.utils import timezone
   from airflow.utils.db import provide_session
   from airflow.utils.state import State
   from clients.constants import WORKFLOW_ARGS
   from clients.notifications_client import NotificationsClient
   from event_type import EventType
   from config.configuration import Configuration
   from utils import get_env
   from airflow.models.taskinstance import clear_task_instances
   from sqlalchemy.sql.expression import or_
   
   def send_sla_retry_notification(task_ids):
       message = prepare_message(task_ids)
   
       ENV = get_env()
       configuration = Configuration(env=ENV)
       notifications_client = NotificationsClient(configuration)
       notifications_client.send_data_set_notification(0, 
EventType.PIPELINES_MISSED_SLA, message, message,
                                                            
datetime.now().isoformat(), 0, None, "DQ-AIRFLOW")
   
   def prepare_message(task_ids):
       message = "Going to retry tasks : \n"
       for task_id in task_ids:
           message = message + str(task_id) + "\n"
       message = message[:-1]
   
       logging.info(f"preparing message {message}")
   
       return message
   
   @provide_session
   def unstick_dag_callable(session, **kwargs):
       filter = [
           or_(TaskInstance.state == State.QUEUED, TaskInstance.state == 
State.NONE),
           TaskInstance.queued_dttm < datetime.now(timezone.utc) - 
timedelta(hours=2)
       ]
   
       tis = session.query(TaskInstance).filter(*filter).all()
       logging.info(f"Task instances: {tis}")
       logging.info(f"Updating {len(tis)} task instances")
   
       task_ids = []
       for ti in tis:
   
           task_ids.append(f"dag_id : {ti.dag_id},task_id : {ti.task_id}, 
execution_date : {ti.execution_date}")
   
           try:
               dr = (
                   session.query(DagRun)
                       .filter(DagRun.run_id == ti.dag_run.run_id)
                       .first()
               )
   
               dagrun = {}
   
               if dr:
                   dagrun = dict(
                       id=dr.id,
                       dag_id=dr.dag_id,
                       execution_date=dr.execution_date,
                       start_date=dr.start_date,
                       end_date=dr.end_date,
                       _state=dr._state,
                       run_id=dr.run_id,
                       creating_job_id=dr.creating_job_id,
                       external_trigger=dr.external_trigger,
                       run_type=dr.run_type,
                       conf=dr.conf,
                       last_scheduling_decision=dr.last_scheduling_decision,
                       dag_hash=dr.dag_hash,
                   )
   
               logging.info(
                   dict(
                       task_id=ti.task_id,
                       job_id=ti.job_id,
                       key=ti.key,
                       dag_id=ti.dag_id,
                       execution_date=ti.execution_date,
                       state=ti.state,
                       dag_run={**dagrun},
                   )
               )
   
   
   
               ti.max_tries = 0
               ti.state = None
               session.merge(ti)
           except Exception as e:
               logging.error("Failed to clear task reason : " + str(e))
       session.commit()
   
       if(len(task_ids)> 0):
           send_sla_retry_notification(task_ids)
       else:
           logging.info("No DAGS to retry.")
       logging.info("Done.")
   
   
   def clear_task(session, ti):
       clear_task_instances(tis=[ti],
                            session=session,
                            activate_dag_runs=True,
                            dag=None)
   
   
   with DAG(
       "retry_dag",
       description="Utility DAG to fix TaskInstances stuck in queued state",
       default_args=WORKFLOW_ARGS,
       schedule_interval="*/10 * * * *",
       start_date=datetime(year=2021, month=8, day=1),
       max_active_runs=1,
       catchup=False,
       is_paused_upon_creation=True,
   ) as dag:
       PythonOperator(task_id="unstick_dag", 
python_callable=unstick_dag_callable)
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to