Hello
I just became aware that a failure inside a branch (BranchPythonOperator) sets
the DAG state to success despite the task failure.
Airflow sets all downstream tasks to the skipped state because of the
trigger_rule needed for the branching mechanism, and as a consequence it
wrongly marks the DAG as successful. Please find below an example DAG that
triggers this behaviour.
Is there any way to make tasks run more reliably inside a branch?
import sys
from datetime import datetime, timedelta

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator, BranchPythonOperator

DAG_NAME = 'branch_issue'

dag = DAG(DAG_NAME + '_v1',
          start_date=datetime(2017, 1, 4, 9, 10),
          schedule_interval=timedelta(hours=1))

branch1 = BranchPythonOperator(
    task_id='branch1',
    python_callable=lambda: 't1',
    dag=dag)

t1 = PythonOperator(
    task_id='t1',
    python_callable=lambda: sys.exit(1),
    dag=dag)
t1.set_upstream(branch1)

t2 = PythonOperator(
    task_id='t2',
    python_callable=lambda: True,
    dag=dag)
t2.set_upstream(branch1)

process1 = PythonOperator(
    task_id='process1',
    python_callable=lambda: True,
    trigger_rule='one_success',
    dag=dag)
process1.set_upstream(t1)
process1.set_upstream(t2)

process2 = PythonOperator(
    task_id='process2',
    python_callable=lambda: True,
    dag=dag)
process2.set_upstream(process1)

process3 = PythonOperator(
    task_id='process3',
    python_callable=lambda: True,
    dag=dag)
process3.set_upstream(process2)
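One workaround I have seen discussed for this class of problem is a "watcher" leaf task: an extra task with trigger_rule='one_failed' that depends on every task in the DAG and deliberately raises, so a failure anywhere propagates to the DAG state instead of being hidden by the skips. A minimal sketch against the example DAG above (task and dag names reused from it; the task_id 'watcher' and the callable are my own illustration, not an Airflow API):

```python
from airflow.exceptions import AirflowException
from airflow.operators.python_operator import PythonOperator


def fail_dag():
    # This callable only runs when at least one upstream task failed
    # (trigger_rule='one_failed'); raising here makes the watcher a
    # failed leaf task, so the DAG run itself is marked failed.
    raise AirflowException('An upstream task failed')


watcher = PythonOperator(
    task_id='watcher',
    python_callable=fail_dag,
    trigger_rule='one_failed',
    dag=dag)

# Make the watcher depend on every task so no failure can be
# shadowed by the skipped branch.
for task in (t1, t2, process1, process2, process3):
    watcher.set_upstream(task)
```

This does not change how the branch itself is scheduled; it only ensures the DAG state reflects a failure inside the branch.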