bertrand-buffat commented on issue #19329:
URL: https://github.com/apache/airflow/issues/19329#issuecomment-1269642000
Sure.
Operator:
```
class ClearFalseUpstreamFailedTaskOperator(AirflowBandAids):
    """Clears tasks that can appear with a state upstream failed even though
    there are no task failed upstream.

    Queries the Airflow metadata database for task instances in the
    ``upstream_failed`` state whose DAG run contains no task instance in the
    ``failed`` state. It then clears them, grouped per (dag_id, run_id).
    """

    # Returns (dag_id, run_id, task_id) for every ``upstream_failed`` task
    # instance whose start_date lies between {start_date} and {end_date} and
    # whose DAG run has no task instance in the ``failed`` state. The LEFT JOIN
    # plus ``WHERE t.state is null`` keeps only rows with no failed match.
    # NOTE(review): the dates are interpolated with str.format rather than bound
    # as query parameters. They come from the task context (not user input), but
    # prefer bind parameters if get_task_per_dag_run supports them.
    query_template = """SELECT
upstream_failed.dag_id,
upstream_failed.run_id,
upstream_failed.task_id
FROM (
SELECT dag_id, task_id, state, run_id
FROM task_instance
WHERE state = 'upstream_failed'
AND start_date BETWEEN '{start_date}' and '{end_date}'
) AS upstream_failed
LEFT JOIN task_instance t
ON t.dag_id = upstream_failed.dag_id
AND t.run_id = upstream_failed.run_id
AND t.state = 'failed'
WHERE t.state is null
"""

    def __init__(
        self,
        airflow_conn_id,
        hour_delta,
        *args,
        **kwargs,
    ):
        """
        Args:
            airflow_conn_id: The connection id of the Airflow metadata backend.
            hour_delta: Number of hours before this run's logical date at which
                the search window starts. The window ends at this run's
                ``data_interval_end``. Only ``upstream_failed`` task instances
                whose ``start_date`` falls inside that window are considered.
        """
        super().__init__(*args, **kwargs)
        self.airflow_conn_id = airflow_conn_id
        self.hour_delta = hour_delta

    def execute(self, context):
        """Find falsely ``upstream_failed`` task instances and clear them.

        Args:
            context: The Airflow task context. ``logical_date`` (a pendulum
                DateTime) and ``data_interval_end`` are read from it.

        Raises:
            AirflowSkipException: If no matching task instance is found, so the
                run is marked skipped instead of succeeding with no work done.
        """
        # Look back ``hour_delta`` hours from the logical date, up to the end
        # of this run's data interval.
        window_start = context["logical_date"].subtract(hours=self.hour_delta)
        window_end = context["data_interval_end"]

        # Presumably returns {(dag_id, run_id): task_ids}, judging by how it is
        # unpacked below. TODO confirm against AirflowBandAids.
        tasks_by_dags = self.get_task_per_dag_run(
            self.query_template.format(start_date=window_start, end_date=window_end),
            airflow_conn_id=self.airflow_conn_id,
        )
        if not tasks_by_dags:
            raise AirflowSkipException("No task to clear")

        for (dag_id, run_id), task_ids in tasks_by_dags.items():
            self.clear_tasks(dag_id, run_id, task_ids)
```
dag:
```
# Parenthesized so the import can span several lines; a bare line break after
# ``import`` is a SyntaxError.
from doctolib_plugins.operators.airflow_metadata_operator import (
    ClearFalseUpstreamFailedTaskOperator,
)

# NOTE(review): DAG, obtain_default_args and airflow_backend_db_conn_id are not
# defined in this snippet; they must be imported/defined elsewhere in the file.

# Every 10 minutes, clear task instances that are marked upstream_failed even
# though nothing failed in their DAG run. Each run looks back 3 hours
# (hour_delta=3) from its logical date, so consecutive search windows overlap.
# max_active_runs=1 limits this DAG to one active run at a time.
# catchup=False skips backfilling missed intervals.
with DAG(
    dag_id="wkf_clear_airflow_tasks",
    schedule_interval="*/10 * * * *",
    default_args=obtain_default_args(),
    max_active_runs=1,
    catchup=False,
) as dag:
    ClearFalseUpstreamFailedTaskOperator(
        task_id="clear_false_upstream_failed_tasks",
        airflow_conn_id=airflow_backend_db_conn_id,
        hour_delta=3,
    )
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]