[ https://issues.apache.org/jira/browse/AIRFLOW-2372?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Xiao Zhu updated AIRFLOW-2372:
------------------------------
    Summary: SubDAGs should share dag_concurrency of parent DAG  (was: SubDAGs should share parallelism of parent DAG)

> SubDAGs should share dag_concurrency of parent DAG
> --------------------------------------------------
>
>                 Key: AIRFLOW-2372
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-2372
>             Project: Apache Airflow
>          Issue Type: Wish
>    Affects Versions: Airflow 1.9.0
>         Environment: 1.9.0
>             a local scheduler and LocalExecutor, and parallelism = 32, dag_concurrency = 16
>            Reporter: Xiao Zhu
>            Priority: Major
>
> It seems like right now subDAGs are scheduled just like normal DAGs: if a
> DAG has many (parallel) subDAGs, each with many operators, triggering that
> DAG means each of those subDAGs gets triggered as a normal DAG. Together
> they can easily take all the resources (limited by dag_concurrency) of the
> scheduler, and other DAGs have to wait for them.
>
> For example, given this DAG, with a local scheduler, LocalExecutor,
> parallelism = 32, and dag_concurrency = 16:
> {code:python}
> import logging
> from datetime import datetime
>
> from airflow import DAG
> from airflow.operators.dummy_operator import DummyOperator
> from airflow.operators.python_operator import PythonOperator
> from airflow.operators.subdag_operator import SubDagOperator
>
> log = logging.getLogger(__name__)
>
> NUM_SUBDAGS = 20
> NUM_OPS_PER_SUBDAG = 10
>
> # A DAG needs a start_date before its task instances can run; the parent
> # and its subDAGs use the same one here.
> START_DATE = datetime(2018, 1, 1)
>
>
> def logging_func(id):
>     log.info("Now running id: {}".format(id))
>
>
> def build_dag(dag_id, num_ops):
>     dag = DAG(dag_id, start_date=START_DATE)
>     start_op = DummyOperator(task_id='start', dag=dag)
>     for i in range(num_ops):
>         op = PythonOperator(
>             task_id=str(i),
>             python_callable=logging_func,
>             op_args=[i],
>             dag=dag,
>         )
>         start_op >> op
>     return dag
>
>
> parent_id = 'consistent_failure'
>
> with DAG(parent_id, start_date=START_DATE) as dag:
>     start_op = DummyOperator(task_id='start')
>     for i in range(NUM_SUBDAGS):
>         task_id = "subdag_{}".format(i)
>         op = SubDagOperator(
>             task_id=task_id,
>             subdag=build_dag("{}.{}".format(parent_id, task_id), NUM_OPS_PER_SUBDAG),
>         )
>         start_op >> op
> {code}
> When I trigger this DAG, Airflow tries to run many of the subDAGs at the
> same time, and since they don't share parallelism with their parent DAG,
> each of them also tries to run all of its operators in parallel, which
> results in 500+ Python processes created by Airflow.
>
> Ideally those subDAGs should share parallelism with their parent DAG (and
> thus with each other), so that when I trigger this DAG, at most 32
> operators, including the ones in the subDAGs, are running at any time.
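>
> Until then, one way to approximate this sharing on 1.9 is an Airflow pool,
> which caps concurrent task instances across DAGs. A minimal sketch,
> assuming a pool named subdag_pool with 32 slots has been created beforehand
> (Admin -> Pools in the UI, or the airflow pool CLI); the pool name is
> illustrative, and this reuses the definitions from the example above:
> {code:python}
> # Variant of build_dag from the example above: every subDAG task is
> # assigned to one shared pool, so no more than 32 of them run at once,
> # regardless of how many subDAGs are in flight.
> def build_dag(dag_id, num_ops):
>     dag = DAG(dag_id, start_date=START_DATE)
>     start_op = DummyOperator(task_id='start', dag=dag)
>     for i in range(num_ops):
>         op = PythonOperator(
>             task_id=str(i),
>             python_callable=logging_func,
>             op_args=[i],
>             pool='subdag_pool',  # all subDAG tasks compete for the same 32 slots
>             dag=dag,
>         )
>         start_op >> op
>     return dag
> {code}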
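> Another partial mitigation is to throttle each subDAG individually rather
> than globally: give the subDAG DAG a low concurrency, or hand
> SubDagOperator a SequentialExecutor so its tasks run one at a time. Again
> only a sketch against the example above, with the concurrency value picked
> arbitrarily:
> {code:python}
> from airflow.executors.sequential_executor import SequentialExecutor
>
> # Cap a subDAG at 2 concurrent task instances...
> subdag = DAG(dag_id, start_date=START_DATE, concurrency=2)
>
> # ...or run a subDAG's tasks strictly one at a time.
> op = SubDagOperator(
>     task_id=task_id,
>     subdag=build_dag("{}.{}".format(parent_id, task_id), NUM_OPS_PER_SUBDAG),
>     executor=SequentialExecutor(),
> )
> {code}
> Neither of these gives the shared limit this ticket asks for, since the
> caps are static and per subDAG rather than one budget shared with the
> parent DAG.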