Hey ypwdr, I got a pull request that will fix this problem. It works for me but I need to fix the failing branch. Be my guest to try it out and see if it resolves your problem.
On Wed, Jan 4, 2017 at 4:14 PM <[email protected]> wrote: > Hello > > I just became aware that failure inside a branch (BranchPythonOperator) set > the DAG state to success despites the task failure. > Indeed Airflow set all downstream tasks to the skipped state because of the > trigger_rule needed for the branching mecanism and as a consequence Airflow > wrongly set the DAG state to success. Please find below a example DAG to > trigger this behaviour. > Is there any way to make tasks run more reliable inside a branch ? > > > from airflow.models import DAG > from airflow.operators.python_operator import PythonOperator, > BranchPythonOperator > from datetime import datetime, timedelta > > DAG_NAME = 'branch_issue' > > dag = DAG(DAG_NAME + '_v1', start_date=datetime(2017, 1, 4, 9, 10), > schedule_interval=timedelta(hours=1)) > > > branch1 = BranchPythonOperator( > task_id='branch1', > python_callable=lambda: 't1', > dag=dag) > > t1 = PythonOperator( > task_id='t1', > python_callable=lambda: sys.exit(1), > dag=dag) > t1.set_upstream(branch1) > > t2 = PythonOperator( > task_id='t2', > python_callable=lambda: True, > dag=dag) > t2.set_upstream(branch1) > > process1 = PythonOperator( > task_id='process1', > python_callable=lambda: True, > trigger_rule='one_success', > dag=dag) > process1.set_upstream(t1) > process1.set_upstream(t2) > > process2 = PythonOperator( > task_id='process2', > python_callable=lambda: True, > dag=dag) > process2.set_upstream(process1) > > process3 = PythonOperator( > task_id='process3', > python_callable=lambda: True, > dag=dag) > process3.set_upstream(process2) > > > At moment, I want my privacy to be protected. > https://mytemp.email/ > -- _/ _/ Alex Van Boxel
