There are major outstanding issues with SubDagOperator in 1.8.0, separate from what you're experiencing here. Sometimes a SubDag run will just hang, and you can't re-run portions of a SubDag without re-running the entire thing. I'd recommend against using SubDags in 1.8.0.
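If you do stick with them, the usual suspects for a "dag ... is not found"
error on "Zoom into Sub Dag" (guesses on my part, not a diagnosis) are the
subdag naming convention, and, on CeleryExecutor, DAG files that aren't
synced identically to the webserver and every worker. The naming rule is
that the subdag's dag_id must be exactly <parent_dag_id>.<subdag_task_id>;
the web UI resolves the zoom link by that composed name, and IIRC
SubDagOperator raises at parse time if they don't match. Your code below
looks like it follows the rule, but here is a minimal self-contained sketch
for reference (all names in it are made up):

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator

PARENT_DAG_NAME = 'my_parent_dag'   # hypothetical
CHILD_TASK_ID = 'my_subdag_task'    # hypothetical

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2017, 4, 1),
}

def make_subdag(parent_dag_id, child_task_id, args, schedule_interval):
    # The dag_id MUST be '<parent_dag_id>.<child_task_id>', otherwise the
    # scheduler/webserver cannot resolve the subdag.
    subdag = DAG(
        dag_id='%s.%s' % (parent_dag_id, child_task_id),
        default_args=args,
        schedule_interval=schedule_interval,
    )
    DummyOperator(task_id='placeholder', dag=subdag)
    return subdag

dag = DAG(PARENT_DAG_NAME, default_args=default_args,
          schedule_interval='@hourly')

SubDagOperator(
    # task_id must equal the part after the dot in the subdag's dag_id.
    task_id=CHILD_TASK_ID,
    subdag=make_subdag(PARENT_DAG_NAME, CHILD_TASK_ID, default_args,
                       dag.schedule_interval),
    dag=dag,
)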
Beyond those guesses, though, I'm not sure what's causing your Zoom Into
Subdag issue.

On Wed, Apr 19, 2017 at 6:44 AM, Devjyoti Patra <[email protected]> wrote:

> I am not able to make SubdagOperator work with CeleryExecutor. The
> following code works well on my local setup (with LocalExecutor), but on
> production, when clicking on "Zoom into Sub Dag", Airflow complains that
> the dag with the name "parent_name.child_name" is not found. Please tell
> me what I missed in my implementation.
>
> Thanks,
> Devj
>
> default_args = {
>     'owner': 'airflow',
>     'start_date': datetime.strptime('${date_str}', '%Y-%m-%d'),
>     'email': ['${email_list}'],
>     'email_on_failure': True,
>     'email_on_retry': True,
>     'retries': 1,
>     'retry_delay': timedelta(minutes=5),
>     'queue': 'default'}
>
> def sub_dag(child_dag_name, default_args, start_date, schedule_interval,
>             kas):
>     subdag = DAG(
>         '%s.%s' % (PARENT_DAG_NAME, child_dag_name),
>         default_args=default_args,
>         schedule_interval=schedule_interval,
>         start_date=start_date,
>     )
>
>     fork = DummyOperator(task_id='discovery_fork_' + str(k), dag=subdag)
>
>     # The JOIN task has to be changed for writing to RDS
>     join = BashOperator(
>         task_id='join_' + str(k),
>         bash_command='echo "more wait for subdag..."',
>         default_args=default_args,
>         dag=subdag
>     )
>
>     for k in kas:
>         task = QuboleOperator(
>             task_id='task_' + str(k),
>             command_type='sparkcmd',
>             sql="SOME QUERY",
>             qubole_conn_id='default',
>             provide_context=True,
>             dag=subdag)
>
>         task.set_upstream(fork)
>         task.set_downstream(join)
>
>     return subdag
>
> # Airflow pipeline is created below
> dag = DAG(PARENT_DAG_NAME,
>           default_args=default_args,
>           schedule_interval='@hourly')
>
> start_node = DummyOperator(task_id='start', dag=dag)
>
> end_node = DummyOperator(task_id='end', dag=dag)
>
> setup_task = QuboleOperator(
>     task_id='setup_task',
>     command_type='sparkcmd',
>     sql="SOME QUERY",
>     qubole_conn_id='default',
>     provide_context=True,
>     dag=dag)
>
> setup_task.set_upstream(start_node)
>
> for k in k_per_subdag:
>     child_name = SUBDAG_NAME_PREFIX + str(k)
>
>     branch = SubDagOperator(
>         subdag=sub_dag(child_name, default_args, dag.start_date,
>                        dag.schedule_interval, k),
>         default_args=default_args,
>         task_id=child_name,
>         dag=dag
>     )
>
>     branch.set_upstream(setup_task)
>     branch.set_downstream(end_node)
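One more thing I noticed in the snippet above, probably unrelated to the
zoom error: inside sub_dag(), fork and join build their task_ids from k
before the "for k in kas:" loop has run. That only works because the
module-level "for k in k_per_subdag:" loop leaks a global k, so those
task_ids silently depend on outer state. I'd pass the identifier in
explicitly; a sketch of that change (subdag_key is a parameter name I made
up, and the rest stays as in your version):

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator

PARENT_DAG_NAME = 'parent_name'  # stand-in for the value in your file

def sub_dag(child_dag_name, default_args, start_date, schedule_interval,
            kas, subdag_key):
    subdag = DAG(
        '%s.%s' % (PARENT_DAG_NAME, child_dag_name),
        default_args=default_args,
        schedule_interval=schedule_interval,
        start_date=start_date,
    )

    # Derive fork/join task_ids from the explicit subdag_key, not a
    # global k that happens to be in scope at call time.
    fork = DummyOperator(task_id='discovery_fork_' + str(subdag_key),
                         dag=subdag)
    join = BashOperator(
        task_id='join_' + str(subdag_key),
        bash_command='echo "more wait for subdag..."',
        default_args=default_args,
        dag=subdag,
    )

    # ... create the per-k QuboleOperator tasks between fork and join,
    # exactly as in your version ...

    return subdag

Then at the call site pass the loop variable through explicitly, e.g.
sub_dag(child_name, default_args, dag.start_date, dag.schedule_interval,
kas=k, subdag_key=k).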
