Thank you Paul and Bolke. We have an older version of Airflow deployed on our production system, and I was able to get the pipeline working on it by replacing the import statement

    from airflow.operators.subdag_operator import SubDagOperator

with

    from airflow.operators import SubDagOperator
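For anyone who needs the same DAG file to run on both deployments, a minimal sketch of a version-tolerant import (an illustration, not taken from the original message; it assumes one of the two module paths exists on the installed Airflow version):

    # Prefer the 1.8.x-style module path, fall back to the older package-root export.
    try:
        from airflow.operators.subdag_operator import SubDagOperator
    except ImportError:
        from airflow.operators import SubDagOperator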
On Fri, Apr 21, 2017 at 8:34 PM, Bolke de Bruin <[email protected]> wrote:

> Most of the (known) issues will be fixed in 1.8.1, which we are working on
> getting out the door.
>
> I'm not sure what can cause the issue below, and will investigate when I
> have some time.
>
> Bolke
>
> Sent from my iPhone
>
> > On 21 Apr 2017, at 16:50, Paul Zaczkiewicz <[email protected]> wrote:
> >
> > There are major outstanding issues with SubDagOperator in 1.8.0, separate
> > from what you're experiencing here. Sometimes a SubDag run will just hang,
> > and you can't re-run portions of the SubDag without re-running the entire
> > SubDag. I'd recommend against using SubDags in 1.8.0.
> >
> > I'm not sure what's causing your "Zoom into Sub DAG" issues, though.
> >
> > On Wed, Apr 19, 2017 at 6:44 AM, Devjyoti Patra <[email protected]> wrote:
> >
> >> I am not able to make SubDagOperator work with CeleryExecutor. The
> >> following code works well on my local setup (with LocalExecutor), but on
> >> production, when clicking on "Zoom into Sub Dag", Airflow complains that
> >> the dag with the name "parent_name.child_name" is not found. Please tell
> >> me what I missed in my implementation.
> >>
> >> Thanks,
> >> Devj
> >>
> >>
> >> default_args = {
> >>     'owner': 'airflow',
> >>     'start_date': datetime.strptime('${date_str}', '%Y-%m-%d'),
> >>     'email': ['${email_list}'],
> >>     'email_on_failure': True,
> >>     'email_on_retry': True,
> >>     'retries': 1,
> >>     'retry_delay': timedelta(minutes=5),
> >>     'queue': 'default'}
> >>
> >> def sub_dag(child_dag_name, default_args, start_date, schedule_interval, kas):
> >>     subdag = DAG(
> >>         '%s.%s' % (PARENT_DAG_NAME, child_dag_name),
> >>         default_args=default_args,
> >>         schedule_interval=schedule_interval,
> >>         start_date=start_date,
> >>     )
> >>
> >>     fork = DummyOperator(task_id='discovery_fork_' + str(k),
> >>                          dag=subdag)
> >>     # The JOIN task has to be changed for writing to RDS
> >>     join = BashOperator(
> >>         task_id='join_' + str(k),
> >>         bash_command='echo "more wait for subdag..."',
> >>         default_args=default_args,
> >>         dag=subdag
> >>     )
> >>
> >>     for k in kas:
> >>         task = QuboleOperator(
> >>             task_id='task_' + str(k),
> >>             command_type='sparkcmd',
> >>             sql="SOME QUERY",
> >>             qubole_conn_id='default',
> >>             provide_context=True,
> >>             dag=subdag)
> >>
> >>         task.set_upstream(fork)
> >>         task.set_downstream(join)
> >>
> >>     return subdag
> >>
> >> # Airflow pipeline is created below
> >> dag = DAG(PARENT_DAG_NAME,
> >>           default_args=default_args,
> >>           schedule_interval='@hourly')
> >>
> >> start_node = DummyOperator(task_id='start',
> >>                            dag=dag)
> >>
> >> end_node = DummyOperator(task_id='end',
> >>                          dag=dag)
> >>
> >> setup_task = QuboleOperator(
> >>     task_id='setup_task',
> >>     command_type='sparkcmd',
> >>     sql="SOME QUERY",
> >>     qubole_conn_id='default',
> >>     provide_context=True,
> >>     dag=dag)
> >>
> >> setup_task.set_upstream(start_node)
> >>
> >> for k in k_per_subdag:
> >>     child_name = SUBDAG_NAME_PREFIX + str(k)
> >>
> >>     branch = SubDagOperator(
> >>         subdag=sub_dag(child_name, default_args, dag.start_date,
> >>                        dag.schedule_interval, k),
> >>         default_args=default_args,
> >>         task_id=child_name,
> >>         dag=dag
> >>     )
> >>
> >>     branch.set_upstream(setup_task)
> >>     branch.set_downstream(end_node)
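As a side note on the "dag ... is not found" error: SubDagOperator expects the child DAG's dag_id to be exactly '<parent dag_id>.<task_id>' of the operator that embeds it. A minimal, self-contained sketch of that convention (the dag_id and task names below are illustrative, not taken from the code above):

    from datetime import datetime

    from airflow.models import DAG
    from airflow.operators.dummy_operator import DummyOperator
    from airflow.operators.subdag_operator import SubDagOperator

    PARENT_DAG_NAME = 'parent_name'  # illustrative parent dag_id
    CHILD_TASK_ID = 'child_name'     # illustrative SubDagOperator task_id

    default_args = {'owner': 'airflow', 'start_date': datetime(2017, 4, 1)}

    parent = DAG(PARENT_DAG_NAME, default_args=default_args,
                 schedule_interval='@hourly')

    # The sub-DAG's dag_id must be '<parent dag_id>.<task_id>', otherwise the
    # scheduler and the "Zoom into Sub DAG" view cannot resolve it.
    child = DAG('%s.%s' % (PARENT_DAG_NAME, CHILD_TASK_ID),
                default_args=default_args,
                schedule_interval=parent.schedule_interval)

    DummyOperator(task_id='noop', dag=child)

    SubDagOperator(task_id=CHILD_TASK_ID, subdag=child, dag=parent)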
