Hello

I just became aware that a failure inside a branch (BranchPythonOperator) sets the DAG state to success despite the task failure. Airflow sets all downstream tasks to the skipped state because of the trigger_rule needed for the branching mechanism, and as a consequence it wrongly marks the DAG run as success. Please find below an example DAG that triggers this behaviour.
Is there any way to make tasks run more reliably inside a branch?
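For what it's worth, the state collapse can be illustrated with a toy simulation (plain Python, not Airflow's actual API; the function name is made up): Airflow derives the DAG run state from the leaf tasks only, and a skipped leaf counts the same as a successful one.

```python
# Toy illustration (NOT Airflow's API) of how the scheduler derives the
# DAG run state from its leaf tasks: any failed leaf -> failed DAG run,
# otherwise -> success. Note that skipped leaves count as success.
def dag_run_state(leaf_states):
    return 'failed' if 'failed' in leaf_states else 'success'

# In the example DAG below, branch1 picks t1 and t1 fails; t2 is skipped,
# and process1/process2/process3 are skipped because their trigger rules
# are never satisfied. The only leaf, process3, ends up 'skipped':
print(dag_run_state(['skipped']))   # -> 'success', despite t1 failing
```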


import sys

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator, BranchPythonOperator
from datetime import datetime, timedelta

DAG_NAME = 'branch_issue'

dag = DAG(DAG_NAME + '_v1', start_date=datetime(2017, 1, 4, 9, 10), schedule_interval=timedelta(hours=1))


branch1 = BranchPythonOperator(
        task_id='branch1',
        python_callable=lambda: 't1',
        dag=dag)

t1 = PythonOperator(
    task_id='t1',
    python_callable=lambda: sys.exit(1),  # simulate a task failure inside the branch
    dag=dag)
t1.set_upstream(branch1)

t2 = PythonOperator(
    task_id='t2',
    python_callable=lambda: True,
    dag=dag)
t2.set_upstream(branch1)

process1 = PythonOperator(
    task_id='process1',
    python_callable=lambda: True,
    trigger_rule='one_success',
    dag=dag)
process1.set_upstream(t1)
process1.set_upstream(t2)

process2 = PythonOperator(
    task_id='process2',
    python_callable=lambda: True,
    dag=dag)
process2.set_upstream(process1)

process3 = PythonOperator(
    task_id='process3',
    python_callable=lambda: True,
    dag=dag)
process3.set_upstream(process2)
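One possible mitigation, sketched here as an assumption rather than a verified fix: later Airflow releases (1.10.2+, if I'm not mistaken) add a 'none_failed' trigger rule. Replacing 'one_success' on the join task with it lets the task run when upstreams are merely skipped, but still propagates a real failure, so the DAG run should no longer end as success:

```python
# Sketch only: 'none_failed' runs the task when no upstream has failed
# (skipped upstreams are fine), so t1's failure propagates to process1
# and the DAG run is marked failed instead of success.
process1 = PythonOperator(
    task_id='process1',
    python_callable=lambda: True,
    trigger_rule='none_failed',
    dag=dag)
```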

