[ https://issues.apache.org/jira/browse/AIRFLOW-74?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16450411#comment-16450411 ]
Tao Feng commented on AIRFLOW-74:
---------------------------------

[~xzhu], just to be clear, you can still use CeleryExecutor (or another executor) for DAG-level execution. The fix only makes SequentialExecutor the default executor within the subdag.

> SubdagOperators can consume all celeryd worker processes
> --------------------------------------------------------
>
> Key: AIRFLOW-74
> URL: https://issues.apache.org/jira/browse/AIRFLOW-74
> Project: Apache Airflow
> Issue Type: Bug
> Components: celery
> Affects Versions: Airflow 1.7.1, Airflow 1.7.0, Airflow 1.6.2
> Environment: Airflow 1.7.1rc3 with CeleryExecutor
> 1 webserver
> 1 scheduler
> 2 workers
> Reporter: Steven Yvinec-Kruyk
> Assignee: zgl
> Priority: Major
> Fix For: 1.10.0
>
>
> If the number of concurrently running ```SubdagOperator```s is greater than or equal to the number of Celery worker processes, tasks are unable to run and all SDOs come to a complete halt.
> Furthermore, DAG performance is drastically reduced even before the workers are fully saturated, because progressively fewer workers remain available for actual tasks. A workaround is to specify that the ```SubdagOperator``` use the ```SequentialExecutor```:
> ```
> from datetime import timedelta, datetime
> from airflow.models import DAG, Pool
> from airflow.operators import BashOperator, SubDagOperator, DummyOperator
> from airflow.executors import SequentialExecutor
> import airflow
>
> # -----------------------------------------------------------------\
> # DEFINE THE POOLS
> # -----------------------------------------------------------------/
> session = airflow.settings.Session()
> for p in ['test_pool_1', 'test_pool_2', 'test_pool_3']:
>     pool = (
>         session.query(Pool)
>         .filter(Pool.pool == p)
>         .first())
>     if not pool:
>         session.add(Pool(pool=p, slots=8))
>         session.commit()
>
> # -----------------------------------------------------------------\
> # DEFINE THE DAG
> # -----------------------------------------------------------------/
> # Define the DAG name. This must be unique.
> dag_name = 'hanging_subdags_n16_sqe'
>
> # Default args are passed to each task
> default_args = {
>     'owner': 'Airflow',
>     'depends_on_past': False,
>     'start_date': datetime(2016, 4, 10),  # no leading zeros (SyntaxError in Python 3)
>     'retries': 0,
>     'retry_delay': timedelta(minutes=5),
>     'email': ['y...@email.com'],
>     'email_on_failure': True,
>     'email_on_retry': True,
>     'wait_for_downstream': False,
> }
>
> # Create the DAG object
> dag = DAG(dag_name,
>           default_args=default_args,
>           schedule_interval='0 0 * * *'
>           )
>
> # -----------------------------------------------------------------\
> # DEFINE THE TASKS
> # -----------------------------------------------------------------/
> def get_subdag(dag, sd_id, pool=None):
>     subdag = DAG(
>         dag_id='{parent_dag}.{sd_id}'.format(
>             parent_dag=dag.dag_id,
>             sd_id=sd_id),
>         params=dag.params,
>         default_args=dag.default_args,
>         template_searchpath=dag.template_searchpath,
>         user_defined_macros=dag.user_defined_macros,
>     )
>
>     t1 = BashOperator(
>         task_id='{sd_id}_step_1'.format(
>             sd_id=sd_id
>         ),
>         bash_command='echo "hello" && sleep 60',
>         dag=subdag,
>         pool=pool,
>     )
>
>     t2 = BashOperator(
>         task_id='{sd_id}_step_two'.format(
>             sd_id=sd_id
>         ),
>         bash_command='echo "hello" && sleep 15',
>         dag=subdag,
>         pool=pool,
>     )
>     t2.set_upstream(t1)
>
>     sdo = SubDagOperator(
>         task_id=sd_id,
>         subdag=subdag,
>         # The workaround: run the subdag's tasks locally, one at a time,
>         # instead of sending them back to the Celery queue.
>         executor=SequentialExecutor(),
>         retries=0,
>         retry_delay=timedelta(seconds=5),
>         dag=dag,
>         depends_on_past=True,
>     )
>     return sdo
>
> start_task = DummyOperator(
>     task_id='start',
>     dag=dag
> )
>
> for n in range(1, 17):
>     sd_i = get_subdag(dag=dag, sd_id='level_1_{n}'.format(n=n),
>                       pool='test_pool_1')
>     sd_ii = get_subdag(dag=dag, sd_id='level_2_{n}'.format(n=n),
>                        pool='test_pool_2')
>     sd_iii = get_subdag(dag=dag, sd_id='level_3_{n}'.format(n=n),
>                         pool='test_pool_3')
>     sd_i.set_upstream(start_task)
>     sd_ii.set_upstream(sd_i)
>     sd_iii.set_upstream(sd_ii)
> ```

-- 
This message was sent by Atlassian JIRA
(v7.6.3#76005)
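The saturation described in the report can be illustrated with a toy discrete-time model. This is a hypothetical sketch, not Airflow's scheduler or Celery's dispatch logic: the `simulate` function, its parameters, the FIFO queue, and the one-tick-per-task timing are all assumptions. Every task needs one worker slot. With `inline_children=False`, a SubDagOperator queues its child tasks to the shared pool and keeps holding its own slot while it waits (the CeleryExecutor case). With `inline_children=True`, it runs its children one at a time inside the slot it already holds, which is roughly what a subdag-level SequentialExecutor does.

```python
from collections import deque


def simulate(n_subdags, worker_slots, children_per_subdag=2,
             inline_children=False, max_ticks=10_000):
    """Hypothetical toy model of SubDagOperators sharing worker slots.

    Each running item (SubDagOperator or child task) holds one slot, and
    each child task takes one tick. Returns the tick on which the last
    SubDagOperator finishes, or None if no progress is possible
    (deadlock) or max_ticks is exceeded.
    """
    queue = deque(("sdo", i) for i in range(n_subdags))  # FIFO task queue
    running = []                                   # items holding a slot
    remaining = [children_per_subdag] * n_subdags  # children left per subdag
    started = set()                                # subdags that queued child 0
    finished = 0

    for tick in range(1, max_ticks + 1):
        progress = False
        # Idle slots pull work off the head of the queue.
        while queue and len(running) < worker_slots:
            running.append(queue.popleft())
            progress = True

        still_running = []
        for item in running:
            kind, i = item
            if kind == "child":
                # A child finishes in one tick and frees its slot; the next
                # child in the chain (t1 -> t2) is queued behind other work.
                remaining[i] -= 1
                if remaining[i] > 0:
                    queue.append(("child", i))
                progress = True
            elif inline_children:
                # SequentialExecutor-like: run one child inside this slot.
                remaining[i] -= 1
                progress = True
                if remaining[i] > 0:
                    still_running.append(item)
                else:
                    finished += 1
            else:
                # Shared-executor-like: queue the first child, then block
                # on the children while still holding this slot.
                if i not in started:
                    started.add(i)
                    queue.append(("child", i))
                    progress = True
                if remaining[i] > 0:
                    still_running.append(item)
                else:
                    finished += 1
                    progress = True
        running = still_running

        if finished == n_subdags:
            return tick
        if not progress:
            return None  # every slot is held by a waiting SubDagOperator
    return None


if __name__ == "__main__":
    print(simulate(16, 4))                        # None: all slots held by waiting SDOs
    print(simulate(16, 4, inline_children=True))  # 8
```

With 16 subdags (the number the report's DAG starts at each level) and 4 worker slots, the shared-executor model stalls on the second tick, while the inline model finishes in 8 ticks. Giving the pool 17 slots avoids the deadlock but leaves a single slot for every child task, which mirrors the slowdown the reporter saw before full saturation.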