[ https://issues.apache.org/jira/browse/AIRFLOW-2372?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Xiao Zhu updated AIRFLOW-2372:
------------------------------
    Description:
It seems that subDAGs are currently scheduled just like normal DAGs. If a DAG has many parallel subDAGs, each with many operators, triggering that DAG triggers those subDAGs as normal DAGs. They can easily take all the resources of the scheduler (limited by dag_concurrency), and other DAGs have to wait for those subDAGs.

For example, suppose I have this DAG, with a local scheduler and LocalExecutor, and parallelism = 32, dag_concurrency = 16:

{code:python}
import logging
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.subdag_operator import SubDagOperator

log = logging.getLogger(__name__)

NUM_SUBDAGS = 20
NUM_OPS_PER_SUBDAG = 10

# Assumption: the original snippet set no start_date; tasks need one,
# so an arbitrary date is supplied here for the parent and every subDAG.
DEFAULT_ARGS = {'start_date': datetime(2018, 1, 1)}


def logging_func(op_id):
    log.info("Now running id: {}".format(op_id))


def build_dag(dag_id, num_ops):
    # One 'start' task fanning out to num_ops independent Python tasks.
    dag = DAG(dag_id, default_args=DEFAULT_ARGS)
    start_op = DummyOperator(task_id='start', dag=dag)
    for i in range(num_ops):
        op = PythonOperator(
            task_id=str(i),
            python_callable=logging_func,
            op_args=[i],
            dag=dag
        )
        start_op >> op
    return dag


parent_id = 'consistent_failure'
with DAG(
    parent_id,
    default_args=DEFAULT_ARGS
) as dag:
    start_op = DummyOperator(task_id='start')
    # The parent fans out to NUM_SUBDAGS subDAGs that can all run in parallel.
    for i in range(NUM_SUBDAGS):
        task_id = "subdag_{}".format(i)
        op = SubDagOperator(
            task_id=task_id,
            subdag=build_dag("{}.{}".format(parent_id, task_id), NUM_OPS_PER_SUBDAG)
        )
        start_op >> op
{code}

When I trigger this DAG, Airflow tries to run many of the subDAGs at the same time. Since they don't share dag_concurrency with their parent DAG, each of them also tries to run all of its operators in parallel, which results in 500+ Python processes created by Airflow.

Ideally those subDAGs would share dag_concurrency with their parent DAG (and thus with each other), so that when I trigger this DAG, at most 16 operators, including the ones in the subDAGs, are running at any time.
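The sharing behaviour asked for above can be illustrated without Airflow. The following is only a rough sketch in plain Python, not how the Airflow scheduler works or would implement this: the class `SharedSlots` and the functions `fake_operator`, `run_subdag` and `run_parent` are hypothetical names invented for the illustration. Each simulated subDAG still tries to start all of its operators at once, but every operator must first take a slot from one budget shared with the parent, so the number running at any moment stays at or below the limit.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

DAG_CONCURRENCY = 16      # shared limit, same value as in the report
NUM_SUBDAGS = 20
NUM_OPS_PER_SUBDAG = 10


class SharedSlots:
    """Hypothetical concurrency budget shared by a parent and its subDAGs."""

    def __init__(self, limit):
        self._sem = threading.BoundedSemaphore(limit)
        self._lock = threading.Lock()
        self.running = 0
        self.peak = 0

    def run(self, func, *args):
        # Block until one of the shared slots is free, then run the operator.
        with self._sem:
            with self._lock:
                self.running += 1
                self.peak = max(self.peak, self.running)
            try:
                return func(*args)
            finally:
                with self._lock:
                    self.running -= 1


def fake_operator(subdag_index, op_index):
    # Stand-in for real work done by one operator.
    time.sleep(0.01)
    return (subdag_index, op_index)


def run_subdag(slots, subdag_index):
    # The subDAG submits all of its operators at once, as in the report,
    # but each one has to acquire a slot from the shared budget first.
    with ThreadPoolExecutor(max_workers=NUM_OPS_PER_SUBDAG) as pool:
        futures = [
            pool.submit(slots.run, fake_operator, subdag_index, i)
            for i in range(NUM_OPS_PER_SUBDAG)
        ]
        return [f.result() for f in futures]


def run_parent(limit=DAG_CONCURRENCY):
    # All subDAGs start in parallel and share a single SharedSlots instance.
    slots = SharedSlots(limit)
    with ThreadPoolExecutor(max_workers=NUM_SUBDAGS) as pool:
        results = list(pool.map(lambda i: run_subdag(slots, i), range(NUM_SUBDAGS)))
    return slots.peak, sum(len(r) for r in results)


if __name__ == "__main__":
    peak, completed = run_parent()
    print("peak concurrent operators:", peak)
    print("operators completed:", completed)
```

In this toy model the budget belongs to the parent and is passed down to every subDAG. That is the essence of the request: one limit per triggered parent DAG rather than one limit per subDAG.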
> SubDAGs should share dag_concurrency of parent DAG
> --------------------------------------------------
>
>                 Key: AIRFLOW-2372
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-2372
>             Project: Apache Airflow
>          Issue Type: Wish
>    Affects Versions: Airflow 1.9.0
>         Environment: 1.9.0
> a local scheduler and LocalExecutor, and parallelism = 32, dag_concurrency = 16
>            Reporter: Xiao Zhu
>            Priority: Major

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)