[ 
https://issues.apache.org/jira/browse/AIRFLOW-74?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16450406#comment-16450406
 ] 

Tao Feng commented on AIRFLOW-74:
---------------------------------

[~xzhu], there are known issues when a subdag runs under other executors (such 
as Celery), so this change makes SequentialExecutor the default executor for 
subdags. You can always override the executor with something else, as long as 
you are aware of the risk. Ideally we should fix this properly and make subdags 
first class, but that is a long-term fix.

> SubdagOperators can consume all celeryd worker processes
> --------------------------------------------------------
>
>                 Key: AIRFLOW-74
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-74
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: celery
>    Affects Versions: Airflow 1.7.1, Airflow 1.7.0, Airflow 1.6.2
>         Environment: Airflow 1.7.1rc3 with CeleryExecutor
> 1  webserver
> 1 scheduler
> 2 workers 
>            Reporter: Steven Yvinec-Kruyk
>            Assignee: zgl
>            Priority: Major
>             Fix For: 1.10.0
>
>
> If the number of concurrently running ```SubdagOperator``` tasks is >= the 
> number of celery worker processes, no tasks are able to run and all SDOs come 
> to a complete halt. Furthermore, the performance of a DAG is drastically 
> reduced even before the workers are fully saturated, as fewer and fewer 
> workers remain available for actual tasks. A workaround is to specify that 
> ```SequentialExecutor``` be used by the ```SubdagOperator```:
> ```
> from datetime import timedelta, datetime
> from airflow.models import DAG, Pool
> from airflow.operators import BashOperator, SubDagOperator, DummyOperator
> from airflow.executors import SequentialExecutor
> import airflow
> # -----------------------------------------------------------------\
> # DEFINE THE POOLS
> # -----------------------------------------------------------------/
> session = airflow.settings.Session()
> for p in ['test_pool_1', 'test_pool_2', 'test_pool_3']:
>     pool = (
>         session.query(Pool)
>         .filter(Pool.pool == p)
>         .first())
>     if not pool:
>         session.add(Pool(pool=p, slots=8))
>         session.commit()
> # -----------------------------------------------------------------\
> # DEFINE THE DAG
> # -----------------------------------------------------------------/
> # Define the Dag Name. This must be unique.
> dag_name = 'hanging_subdags_n16_sqe'
> # Default args are passed to each task
> default_args = {
>     'owner': 'Airflow',
>     'depends_on_past': False,
>     'start_date': datetime(2016, 4, 10),
>     'retries': 0,
>     'retry_delay': timedelta(minutes=5),
>     'email': ['[email protected]'],
>     'email_on_failure': True,
>     'email_on_retry': True,
>     'wait_for_downstream': False,
> }
> # Create the dag object
> dag = DAG(dag_name,
>           default_args=default_args,
>           schedule_interval='0 0 * * *'
>           )
> # -----------------------------------------------------------------\
> # DEFINE THE TASKS
> # -----------------------------------------------------------------/
> def get_subdag(dag, sd_id, pool=None):
>     subdag = DAG(
>         dag_id='{parent_dag}.{sd_id}'.format(
>             parent_dag=dag.dag_id,
>             sd_id=sd_id),
>         params=dag.params,
>         default_args=dag.default_args,
>         template_searchpath=dag.template_searchpath,
>         user_defined_macros=dag.user_defined_macros,
>     )
>     # First step of the subdag: a 60 second task in the given pool
>     t1 = BashOperator(
>         task_id='{sd_id}_step_1'.format(
>             sd_id=sd_id
>         ),
>         bash_command='echo "hello" && sleep 60',
>         dag=subdag,
>         pool=pool,
>     )
>     # Second step: a 15 second task that runs after the first
>     t2 = BashOperator(
>         task_id='{sd_id}_step_two'.format(
>             sd_id=sd_id
>         ),
>         bash_command='echo "hello" && sleep 15',
>         dag=subdag,
>         pool=pool,
>     )
>     t2.set_upstream(t1)
>     sdo = SubDagOperator(
>         task_id=sd_id,
>         subdag=subdag,
>         retries=0,
>         retry_delay=timedelta(seconds=5),
>         dag=dag,
>         depends_on_past=True,
>         # The executor is an argument of the SubDagOperator, not of the
>         # tasks inside the subdag, and it must be an executor instance.
>         executor=SequentialExecutor(),
>     )
>     return sdo
> start_task = DummyOperator(
>     task_id='start',
>     dag=dag
> )
> for n in range(1, 17):
>     sd_i = get_subdag(dag=dag, sd_id='level_1_{n}'.format(n=n),
>                       pool='test_pool_1')
>     sd_ii = get_subdag(dag=dag, sd_id='level_2_{n}'.format(n=n),
>                        pool='test_pool_2')
>     sd_iii = get_subdag(dag=dag, sd_id='level_3_{n}'.format(n=n),
>                         pool='test_pool_3')
>     sd_i.set_upstream(start_task)
>     sd_ii.set_upstream(sd_i)
>     sd_iii.set_upstream(sd_ii)
> ```
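
The halt described in the issue can be illustrated with a small toy model 
that does not use Airflow at all. This is only a sketch under simplifying 
assumptions: the function name simulate, its parameters, and the tick-based 
scheduling are all hypothetical and are not how Airflow or Celery schedule 
work. Each SubDagOperator holds one worker slot until its child tasks have 
finished. If the children need slots from the same shared set, the parents 
can starve them. If the children run inside the parent's own slot (the 
effect of the SequentialExecutor workaround), no extra slot is needed.

```python
from collections import deque


def simulate(worker_slots, n_subdags, children_per_subdag,
             children_in_parent_slot):
    """Toy model. Return True if every subdag finishes, False on deadlock.

    All names here are hypothetical; this is not Airflow's scheduler.
    """
    # Every SubDagOperator is queued up front, as in the repro DAG.
    queue = deque(("subdag", i) for i in range(n_subdags))
    remaining = [children_per_subdag] * n_subdags  # unfinished children
    running_parents = set()  # parents currently holding a worker slot
    done = 0

    while done < n_subdags:
        progressed = False
        # Slots not held by a waiting parent are free for this tick.
        free = worker_slots - len(running_parents)
        while free > 0 and queue:
            kind, i = queue.popleft()
            free -= 1
            progressed = True
            if kind == "subdag":
                running_parents.add(i)
                if children_in_parent_slot:
                    # Children run inside the parent's slot, one by one.
                    remaining[i] = 0
                else:
                    # Children are queued and compete for the same slots.
                    queue.extend(("child", i)
                                 for _ in range(children_per_subdag))
            else:
                # A child uses a slot for this tick, then releases it.
                remaining[i] -= 1
        # Parents whose children have all finished release their slot.
        for i in [p for p in running_parents if remaining[p] == 0]:
            running_parents.remove(i)
            done += 1
            progressed = True
        if not progressed:
            # Every slot is held by a parent waiting on queued children.
            return False
    return True


if __name__ == "__main__":
    # Two slots, two subdags, children share the slots: parents starve them.
    print(simulate(2, 2, 2, children_in_parent_slot=False))
    # Same two slots, sixteen subdags, children run inside the parent slot.
    print(simulate(2, 16, 2, children_in_parent_slot=True))
```

In this model the first call deadlocks because both slots are taken by 
parents before any child can start, while the second call completes because 
each parent finishes its own children and then frees its slot. The model 
ignores pools, task durations, and retries.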



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
