bertrand-buffat commented on issue #19329:
URL: https://github.com/apache/airflow/issues/19329#issuecomment-1269642000

   Sure.
   Operator:
    ```python
   class ClearFalseUpstreamFailedTaskOperator(AirflowBandAids):
       """Clears tasks that cab appear with a state upstream failed even though 
there are no task failed upstream"""
   
       query_template = """SELECT
     upstream_failed.dag_id,
     upstream_failed.run_id,
     upstream_failed.task_id
   FROM (
    SELECT dag_id, task_id, state, run_id
    FROM task_instance
    WHERE state = 'upstream_failed'
     AND start_date BETWEEN '{start_date}' AND '{end_date}'
   ) AS upstream_failed
   LEFT JOIN task_instance t
     ON t.dag_id = upstream_failed.dag_id
     AND t.run_id = upstream_failed.run_id
     AND t.state = 'failed'
    WHERE t.state IS NULL
   """
   
       def __init__(
           self,
           airflow_conn_id,
           hour_delta,
           *args,
           **kwargs,
       ):
           """
           Args:
               airflow_conn_id: The connection id of airflow metadata backend
            hour_delta: The delta, in hours, applied to the task instance's
                        logical date. The resulting window is used as the range
                        to look for false 'upstream_failed' tasks in the
                        metadata DB.
           """
           super().__init__(*args, **kwargs)
           self.airflow_conn_id = airflow_conn_id
           self.hour_delta = hour_delta
   
       def execute(self, context):
           tasks_by_dags = self.get_task_per_dag_run(
                self.query_template.format(
                    start_date=context["logical_date"].add(hours=-self.hour_delta),
                   end_date=context["data_interval_end"],
               ),
               airflow_conn_id=self.airflow_conn_id,
           )
   
           if not tasks_by_dags:
               raise AirflowSkipException("No task to clear")
   
           for (dag_id, run_id), task_ids in tasks_by_dags.items():
               self.clear_tasks(dag_id, run_id, task_ids)
   ```
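   The `AirflowBandAids` base class and its `get_task_per_dag_run` / `clear_tasks` helpers are not shown. As a rough sketch only (the name and shape below are assumptions, not the actual base-class API), the grouping step that `execute` relies on could look like:
   ```python
   from collections import defaultdict

   def group_tasks_by_dag_run(rows):
       """Hypothetical stand-in for get_task_per_dag_run's grouping step:
       turn (dag_id, run_id, task_id) rows from the metadata-DB query
       into {(dag_id, run_id): [task_id, ...]}."""
       tasks_by_dags = defaultdict(list)
       for dag_id, run_id, task_id in rows:
           tasks_by_dags[(dag_id, run_id)].append(task_id)
       return dict(tasks_by_dags)
   ```
   Each `(dag_id, run_id)` key then gets a single `clear_tasks` call covering all of its falsely failed task ids.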
   
   dag:
    ```python
    from doctolib_plugins.operators.airflow_metadata_operator import ClearFalseUpstreamFailedTaskOperator
   
   with DAG(
       dag_id="wkf_clear_airflow_tasks",
       schedule_interval="*/10 * * * *",
       default_args=obtain_default_args(),
       max_active_runs=1,
       catchup=False,
   ) as dag:
       ClearFalseUpstreamFailedTaskOperator(
           task_id="clear_false_upstream_failed_tasks",
           airflow_conn_id=airflow_backend_db_conn_id,
           hour_delta=3,
       )
   ```
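   With `hour_delta=3`, the operator scans a three-hour lookback window on each ten-minute run. A quick dry run of the template substitution (using plain `datetime` stand-ins for the pendulum dates Airflow actually passes in `context`) shows the window that ends up in the SQL:
   ```python
   from datetime import datetime, timedelta

   # Stand-ins for context["logical_date"] / context["data_interval_end"];
   # Airflow supplies pendulum datetimes, but plain datetime illustrates the idea.
   logical_date = datetime(2022, 10, 6, 12, 0)
   hour_delta = 3

   query_fragment = "BETWEEN '{start_date}' AND '{end_date}'".format(
       start_date=logical_date - timedelta(hours=hour_delta),
       end_date=logical_date,
   )
   print(query_fragment)
   ```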

