[ https://issues.apache.org/jira/browse/AIRFLOW-2372?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Chao-Han Tsai reassigned AIRFLOW-2372:
--------------------------------------
Assignee: Chao-Han Tsai
> SubDAGs should share dag_concurrency of parent DAG
> --------------------------------------------------
>
> Key: AIRFLOW-2372
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2372
> Project: Apache Airflow
> Issue Type: Wish
> Components: subdag
> Affects Versions: 1.9.0
> Environment: 1.9.0, local scheduler with LocalExecutor, parallelism = 32, dag_concurrency = 16
> Reporter: Xiao Zhu
> Assignee: Chao-Han Tsai
> Priority: Major
>
> It seems that subDAGs are currently scheduled just like normal DAGs: if a
> DAG has many parallel subDAGs, each with many operators, triggering that
> DAG runs the subDAGs as independent DAGs, and they can easily consume all
> of the scheduler's resources (bounded only by parallelism), so other DAGs
> have to wait for those subDAGs.
> For example, consider this DAG with a local scheduler and LocalExecutor,
> parallelism = 32, and dag_concurrency = 16:
> {code:python}
> import logging
> from datetime import datetime
>
> from airflow import DAG
> from airflow.operators.dummy_operator import DummyOperator
> from airflow.operators.python_operator import PythonOperator
> from airflow.operators.subdag_operator import SubDagOperator
>
> NUM_SUBDAGS = 20
> NUM_OPS_PER_SUBDAG = 10
>
> log = logging.getLogger(__name__)
>
>
> def logging_func(id):
>     log.info("Now running id: {}".format(id))
>
>
> def build_dag(dag_id, num_ops):
>     # Each subDAG fans out num_ops PythonOperators from one start task.
>     dag = DAG(dag_id, start_date=datetime(2018, 1, 1),
>               schedule_interval=None)
>     start_op = DummyOperator(task_id='start', dag=dag)
>     for i in range(num_ops):
>         op = PythonOperator(
>             task_id=str(i),
>             python_callable=logging_func,
>             op_args=[i],
>             dag=dag,
>         )
>         start_op >> op
>     return dag
>
>
> parent_id = 'consistent_failure'
>
> with DAG(parent_id, start_date=datetime(2018, 1, 1),
>          schedule_interval=None) as dag:
>     start_op = DummyOperator(task_id='start')
>     for i in range(NUM_SUBDAGS):
>         task_id = "subdag_{}".format(i)
>         # SubDagOperator requires the subdag's dag_id to be
>         # '<parent_dag_id>.<task_id>'.
>         op = SubDagOperator(
>             task_id=task_id,
>             subdag=build_dag("{}.{}".format(parent_id, task_id),
>                              NUM_OPS_PER_SUBDAG),
>         )
>         start_op >> op
> {code}
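> For reference, both limits mentioned above come from the [core] section of
> airflow.cfg; a minimal excerpt matching the environment above:
> {code}
> [core]
> # upper bound on task instances running across the whole installation
> parallelism = 32
> # upper bound on running task instances per DAG -- today this applies to
> # the parent DAG and to each subDAG separately, not to them combined
> dag_concurrency = 16
> {code}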
> When I trigger this DAG, Airflow tries to run many of the subDAGs at the
> same time, and since they don't share dag_concurrency with their parent
> DAG, each of them also tries to run all of its operators in parallel,
> which results in 500+ Python processes created by Airflow.
> Ideally those subDAGs would share dag_concurrency with their parent DAG
> (and thus with each other), so that when I trigger this DAG, at most 16
> operators, including the ones in the subDAGs, are running at any time.
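> Until that exists, a possible mitigation, sketched below against the 1.9
> APIs (the pool name 'subdag_shared' is hypothetical and would have to be
> created with 16 slots via the UI or CLI first), is to cap each subDAG with
> the DAG-level concurrency argument and/or route the inner tasks through a
> shared pool so they all compete for the same slots:
> {code:python}
> import logging
> from datetime import datetime
>
> from airflow import DAG
> from airflow.operators.python_operator import PythonOperator
>
> log = logging.getLogger(__name__)
>
>
> def logging_func(id):
>     log.info("Now running id: {}".format(id))
>
>
> def build_capped_dag(dag_id, num_ops):
>     # concurrency caps how many tasks of *this* DAG may run at once,
>     # so each subDAG built here is limited independently.
>     dag = DAG(
>         dag_id,
>         start_date=datetime(2018, 1, 1),
>         schedule_interval=None,
>         concurrency=2,
>     )
>     for i in range(num_ops):
>         PythonOperator(
>             task_id=str(i),
>             python_callable=logging_func,
>             op_args=[i],
>             # Pools are shared across DAGs: with a 16-slot pool named
>             # 'subdag_shared' (hypothetical, created beforehand), all
>             # subDAG tasks together are limited to 16 running at a time.
>             pool='subdag_shared',
>             dag=dag,
>         )
>     return dag
> {code}
> This only approximates the requested behavior, and pools have known
> caveats when combined with SubDagOperator, so it is a stopgap rather than
> a substitute for sharing the parent's dag_concurrency.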
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)