Thank you, Paul and Bolke.

We have an older version of Airflow deployed on our prod system, and I
could make the pipeline work on it by replacing the following import
statement:


from airflow.operators.subdag_operator import SubDagOperator

with

from airflow.operators import SubDagOperator
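
If the same DAG file has to load on both the older deployment and 1.8.x, one option is to try the 1.8 module path first and fall back to the older package-level import. Below is a minimal sketch of a generic helper for that. `import_first` is a hypothetical name, not part of Airflow, and the demo uses standard-library modules so it runs without Airflow installed.

```python
import importlib


def import_first(*candidates):
    """Return the first attribute that can be imported.

    Hypothetical helper (not an Airflow API). Each candidate is a
    (module_path, attribute_name) pair; candidates are tried in order and
    ImportError / AttributeError move on to the next one.
    """
    errors = []
    for module_path, attr in candidates:
        try:
            module = importlib.import_module(module_path)
            return getattr(module, attr)
        except (ImportError, AttributeError) as exc:
            errors.append('%s.%s: %s' % (module_path, attr, exc))
    raise ImportError('none of the candidates could be imported:\n' +
                      '\n'.join(errors))


# Demo with standard-library modules: the first candidate does not exist,
# so the helper falls back to the second one.
OrderedDict = import_first(('no_such_module_xyz', 'OrderedDict'),
                           ('collections', 'OrderedDict'))
print(OrderedDict.__name__)
```

In the DAG file the call would look like `import_first(('airflow.operators.subdag_operator', 'SubDagOperator'), ('airflow.operators', 'SubDagOperator'))`. For a single class, a plain `try`/`except ImportError` around the two import statements does the same job.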


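On the "Zoom into Sub DAG" error in the quoted message: the name it fails to find, "parent_name.child_name", is the combined name SubDagOperator in 1.8 expects a subdag to use, namely the parent's dag_id and the SubDagOperator's task_id joined by a dot. The sketch below only restates that naming rule in plain Python (no Airflow needed). `expected_subdag_id` and `check_subdag_id` are hypothetical helpers, and the names in the demo are placeholders.

```python
def expected_subdag_id(parent_dag_id, task_id):
    """dag_id a subdag should use: '<parent_dag_id>.<SubDagOperator task_id>'."""
    return '%s.%s' % (parent_dag_id, task_id)


def check_subdag_id(parent_dag_id, task_id, subdag_id):
    """Return subdag_id, or raise ValueError if it breaks the naming rule."""
    expected = expected_subdag_id(parent_dag_id, task_id)
    if subdag_id != expected:
        raise ValueError('subdag dag_id should be %r, got %r'
                         % (expected, subdag_id))
    return subdag_id


# Placeholder names. In the quoted DAG file the subdag id is built as
# '%s.%s' % (PARENT_DAG_NAME, child_name) and the operator's task_id is
# child_name, so both sides come from the same value.
parent, child = 'parent_name', 'child_name'
print(check_subdag_id(parent, child, '%s.%s' % (parent, child)))
```

Since the quoted code builds both names from the same `child_name`, it already follows this rule.
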
On Fri, Apr 21, 2017 at 8:34 PM, Bolke de Bruin <[email protected]> wrote:

> Most of the (known) issues will be fixed in 1.8.1, which we are working on
> getting out the door.
>
> I'm not sure what could cause the issue below, and will investigate when I
> have some time.
>
> Bolke
>
> > On 21 Apr 2017, at 16:50, Paul Zaczkiewicz <[email protected]> wrote:
> >
> > There are major outstanding issues with SubDagOperator in 1.8.0, separate
> > from what you're experiencing here. Sometimes a SubDag run will just hang,
> > and you can't re-run portions of the SubDag without re-running the entire
> > SubDag. I'd recommend against using SubDags in 1.8.0.
> >
> > I'm not sure what's causing your Zoom Into Subdag issues though.
> >
> > On Wed, Apr 19, 2017 at 6:44 AM, Devjyoti Patra <[email protected]> wrote:
> >
> >> I am not able to make SubDagOperator work with CeleryExecutor. The
> >> following code works well on my local setup (with LocalExecutor), but on
> >> production, when I click "Zoom into Sub DAG", Airflow complains that the
> >> DAG named "parent_name.child_name" is not found. Please tell me what I
> >> missed in my implementation.
> >>
> >> Thanks,
> >> Devj
> >>
> >>
> >> from datetime import datetime, timedelta
> >>
> >> from airflow import DAG
> >> from airflow.contrib.operators.qubole_operator import QuboleOperator
> >> from airflow.operators.bash_operator import BashOperator
> >> from airflow.operators.dummy_operator import DummyOperator
> >> from airflow.operators.subdag_operator import SubDagOperator
> >>
> >> # PARENT_DAG_NAME, SUBDAG_NAME_PREFIX and k_per_subdag are defined
> >> # elsewhere in the DAG file (not shown here).
> >>
> >> default_args = {
> >>     'owner': 'airflow',
> >>     'start_date': datetime.strptime('${date_str}', '%Y-%m-%d'),
> >>     'email': ['${email_list}'],
> >>     'email_on_failure': True,
> >>     'email_on_retry': True,
> >>     'retries': 1,
> >>     'retry_delay': timedelta(minutes=5),
> >>     'queue': 'default'}
> >>
> >>
> >> def sub_dag(child_dag_name, default_args, start_date, schedule_interval,
> >>             kas):
> >>     # A subdag's dag_id must be '<parent dag_id>.<SubDagOperator task_id>'.
> >>     subdag = DAG(
> >>         '%s.%s' % (PARENT_DAG_NAME, child_dag_name),
> >>         default_args=default_args,
> >>         schedule_interval=schedule_interval,
> >>         start_date=start_date,
> >>     )
> >>
> >>     # k is only bound by the loop below, so str(k) here would raise
> >>     # UnboundLocalError; use child_dag_name for the fork/join IDs instead.
> >>     fork = DummyOperator(task_id='discovery_fork_' + child_dag_name,
> >>                          dag=subdag)
> >>     # The JOIN task has to be changed for writing to RDS
> >>     join = BashOperator(
> >>         task_id='join_' + child_dag_name,
> >>         bash_command='echo "more wait for subdag..."',
> >>         default_args=default_args,
> >>         dag=subdag)
> >>
> >>     # One Qubole task per k, fanned out from fork and joined at join.
> >>     for k in kas:
> >>         task = QuboleOperator(
> >>             task_id='task_' + str(k),
> >>             command_type='sparkcmd',
> >>             sql="SOME QUERY",
> >>             qubole_conn_id='default',
> >>             provide_context=True,
> >>             dag=subdag)
> >>
> >>         task.set_upstream(fork)
> >>         task.set_downstream(join)
> >>
> >>     return subdag
> >>
> >>
> >> # Airflow pipeline is created below
> >> dag = DAG(PARENT_DAG_NAME,
> >>           default_args=default_args,
> >>           schedule_interval='@hourly')
> >>
> >> start_node = DummyOperator(task_id='start', dag=dag)
> >> end_node = DummyOperator(task_id='end', dag=dag)
> >>
> >> setup_task = QuboleOperator(
> >>     task_id='setup_task',
> >>     command_type='sparkcmd',
> >>     sql="SOME QUERY",
> >>     qubole_conn_id='default',
> >>     provide_context=True,
> >>     dag=dag)
> >>
> >> setup_task.set_upstream(start_node)
> >>
> >> # One SubDagOperator per entry; its task_id must match the child part of
> >> # the subdag's dag_id.
> >> for k in k_per_subdag:
> >>     child_name = SUBDAG_NAME_PREFIX + str(k)
> >>
> >>     branch = SubDagOperator(
> >>         subdag=sub_dag(child_name, default_args, dag.start_date,
> >>                        dag.schedule_interval, k),
> >>         default_args=default_args,
> >>         task_id=child_name,
> >>         dag=dag)
> >>
> >>     branch.set_upstream(setup_task)
> >>     branch.set_downstream(end_node)
> >>
>
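
The wiring in the DAG file quoted above (start, then setup_task, then one SubDagOperator per child, then end; inside each subdag a fork task fanning out to task_<k> and back into a join task) can be sanity-checked without a scheduler by writing it down as plain edges and topologically sorting them. The sketch below is a hypothetical stand-in, not Airflow's API: it uses Python's `graphlib` (3.9+), `pipeline_graph` is an illustrative name, and the child names and k values in the demo are made up.

```python
from graphlib import TopologicalSorter


def pipeline_graph(parent_id, subdags):
    """Map each node to the set of nodes directly upstream of it.

    `subdags` maps a child name to the list of k values it runs
    (hypothetical input shape). Subdag tasks are namespaced as
    '<parent>.<child>/<task>' so every node name is unique in one graph.
    """
    graph = {'start': set(), 'setup_task': {'start'}}
    end_upstream = set()
    for child, ks in subdags.items():
        subdag_id = '%s.%s' % (parent_id, child)
        fork, join = subdag_id + '/fork', subdag_id + '/join'
        # Parent DAG: the SubDagOperator sits between setup_task and end.
        graph[child] = {'setup_task'}
        end_upstream.add(child)
        # Inside the subdag: fork -> task_<k> -> join.
        graph[fork] = set()
        graph[join] = set()
        for k in ks:
            task = '%s/task_%s' % (subdag_id, k)
            graph[task] = {fork}
            graph[join].add(task)
    graph['end'] = end_upstream
    return graph


graph = pipeline_graph('parent_name', {'sub_a': [1, 2], 'sub_b': [3]})
# static_order() raises graphlib.CycleError if the wiring has a cycle.
order = list(TopologicalSorter(graph).static_order())
print(order)
```

Keeping each subdag's tasks as a separate component mirrors the fact that they live in their own DAG; the SubDagOperator node in the parent stands in for the whole subdag.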
