Most of the (known) issues will be fixed in 1.8.1, which we are working on 
getting out the door. 

I'm not sure what could be causing the issue below; I will investigate when 
I have some time. 

Bolke

Sent from my iPhone

> On 21 Apr 2017, at 16:50, Paul Zaczkiewicz <[email protected]> wrote:
> 
> There are major outstanding issues with SubDagOperator in 1.8.0, separate
> from what you're experiencing here. Sometimes a SubDag run will just hang,
> and you can't re-run portions of the SubDag without re-running the entire
> SubDag. I'd recommend against using SubDags in 1.8.0; see the sketch below
> for the usual workaround.
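> 
> A minimal sketch of that workaround, building the same fan-out directly in
> the parent DAG instead of wrapping each group in a SubDagOperator (the
> loop structure reuses k_per_subdag and the QuboleOperator setup from the
> code below; the fork/join ids here are hypothetical):
> 
>     from airflow.operators.dummy_operator import DummyOperator
>     from airflow.contrib.operators.qubole_operator import QuboleOperator
> 
>     # One fork/join pair per group, attached straight to the parent dag.
>     for i, group in enumerate(k_per_subdag):
>         fork = DummyOperator(task_id='fork_%d' % i, dag=dag)
>         join = DummyOperator(task_id='join_%d' % i, dag=dag)
>         for k in group:
>             task = QuboleOperator(
>                 task_id='task_%d_%s' % (i, k),
>                 command_type='sparkcmd',
>                 sql="SOME QUERY",
>                 qubole_conn_id='default',
>                 dag=dag)
>             task.set_upstream(fork)
>             task.set_downstream(join)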
> 
> I'm not sure what's causing your "Zoom into Sub DAG" issues, though.
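> 
> One thing worth checking: SubDagOperator expects the subdag's dag_id to be
> exactly '<parent_dag_id>.<task_id>', and that is also the id the "Zoom
> into Sub DAG" link looks up. A quick sanity check you could drop at the
> bottom of the DAG file (a sketch; 'branch' is the SubDagOperator from the
> code below):
> 
>     expected = '%s.%s' % (dag.dag_id, branch.task_id)
>     assert branch.subdag.dag_id == expected, \
>         'subdag id %s does not match %s' % (branch.subdag.dag_id, expected)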
> 
> On Wed, Apr 19, 2017 at 6:44 AM, Devjyoti Patra <[email protected]>
> wrote:
> 
>> I am not able to make SubDagOperator work with the CeleryExecutor. The
>> following code works well on my local setup (with the LocalExecutor), but
>> in production, clicking "Zoom into Sub DAG" makes Airflow complain that a
>> dag with the name "parent_name.child_name" is not found. Please tell me
>> what I missed in my implementation.
>> 
>> Thanks,
>> Devj
>> 
>> 
>> from datetime import datetime, timedelta
>> 
>> from airflow import DAG
>> from airflow.operators.bash_operator import BashOperator
>> from airflow.operators.dummy_operator import DummyOperator
>> from airflow.operators.subdag_operator import SubDagOperator
>> from airflow.contrib.operators.qubole_operator import QuboleOperator
>> 
>> # PARENT_DAG_NAME, SUBDAG_NAME_PREFIX and k_per_subdag are defined
>> # elsewhere in this module.
>> 
>> default_args = {
>>     'owner': 'airflow',
>>     'start_date': datetime.strptime('${date_str}', '%Y-%m-%d'),
>>     'email': ['${email_list}'],
>>     'email_on_failure': True,
>>     'email_on_retry': True,
>>     'retries': 1,
>>     'retry_delay': timedelta(minutes=5),
>>     'queue': 'default'}
>> 
>> def sub_dag(child_dag_name, default_args, start_date,
>>             schedule_interval, kas):
>>     subdag = DAG(
>>         '%s.%s' % (PARENT_DAG_NAME, child_dag_name),
>>         default_args=default_args,
>>         schedule_interval=schedule_interval,
>>         start_date=start_date,
>>     )
>> 
>>     # Fork/join pair bracketing the per-k tasks; task ids only need to
>>     # be unique within this subdag.
>>     fork = DummyOperator(task_id='discovery_fork', dag=subdag)
>>     # The JOIN task has to be changed for writing to RDS
>>     join = BashOperator(
>>         task_id='join',
>>         bash_command='echo "more wait for subdag..."',
>>         default_args=default_args,
>>         dag=subdag,
>>     )
>> 
>>     # One QuboleOperator per element of kas, fanned out between fork
>>     # and join.
>>     for k in kas:
>>         task = QuboleOperator(
>>             task_id='task_' + str(k),
>>             command_type='sparkcmd',
>>             sql="SOME QUERY",
>>             qubole_conn_id='default',
>>             provide_context=True,
>>             dag=subdag)
>> 
>>         task.set_upstream(fork)
>>         task.set_downstream(join)
>> 
>>     return subdag
>> 
>> # The Airflow pipeline is created below
>> dag = DAG(PARENT_DAG_NAME,
>>           default_args=default_args,
>>           schedule_interval='@hourly')
>> 
>> start_node = DummyOperator(task_id='start', dag=dag)
>> end_node = DummyOperator(task_id='end', dag=dag)
>> 
>> setup_task = QuboleOperator(
>>     task_id='setup_task',
>>     command_type='sparkcmd',
>>     sql="SOME QUERY",
>>     qubole_conn_id='default',
>>     provide_context=True,
>>     dag=dag)
>> 
>> setup_task.set_upstream(start_node)
>> 
>> for k in k_per_subdag:
>>     child_name = SUBDAG_NAME_PREFIX + str(k)
>> 
>>     branch = SubDagOperator(
>>         subdag=sub_dag(child_name, default_args, dag.start_date,
>>                        dag.schedule_interval, k),
>>         default_args=default_args,
>>         task_id=child_name,
>>         dag=dag,
>>     )
>> 
>>     branch.set_upstream(setup_task)
>>     branch.set_downstream(end_node)
>> 
