[ https://issues.apache.org/jira/browse/AIRFLOW-2372?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Xiao Zhu updated AIRFLOW-2372:
------------------------------
    Summary: SubDAGs should share dag_concurrency of parent DAG  (was: SubDAGs 
should share parallelism of parent DAG)

> SubDAGs should share dag_concurrency of parent DAG
> --------------------------------------------------
>
>                 Key: AIRFLOW-2372
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-2372
>             Project: Apache Airflow
>          Issue Type: Wish
>    Affects Versions: Airflow 1.9.0
>         Environment: 1.9.0
> a local scheduler and LocalExecutor, and parallelism = 32, dag_concurrency = 
> 16
>            Reporter: Xiao Zhu
>            Priority: Major
>
> It seems like subDAGs are currently scheduled just like normal DAGs, so if a 
> DAG has many (parallel) subDAGs, each with many operators, triggering that 
> DAG means those subDAGs get triggered as normal DAGs. They can easily take 
> all of the scheduler's slots (limited by dag_concurrency), and other DAGs 
> then have to wait for those subDAGs.
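> For reference, both limits live in the [core] section of airflow.cfg; the 
> values below are the ones from my environment:
> {code}
> [core]
> # global cap on task instances running at once across all DAGs
> parallelism = 32
> # default cap on task instances running at once within a single DAG
> dag_concurrency = 16
> {code}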
> For example, if I have this DAG, with a local scheduler and LocalExecutor, 
> and parallelism = 32, dag_concurrency = 16:
> {code:python}
> import logging
> from datetime import datetime
>
> from airflow import DAG
> from airflow.operators.dummy_operator import DummyOperator
> from airflow.operators.python_operator import PythonOperator
> from airflow.operators.subdag_operator import SubDagOperator
>
> NUM_SUBDAGS = 20
> NUM_OPS_PER_SUBDAG = 10
>
> default_args = {'start_date': datetime(2018, 1, 1)}
>
> def logging_func(id):
>   logging.info("Now running id: {}".format(id))
>
> def build_dag(dag_id, num_ops):
>   # Each subDAG fans out num_ops PythonOperators from a single start task.
>   dag = DAG(dag_id, default_args=default_args)
>   start_op = DummyOperator(task_id='start', dag=dag)
>   for i in range(num_ops):
>     op = PythonOperator(
>       task_id=str(i),
>       python_callable=logging_func,
>       op_args=[i],
>       dag=dag
>     )
>     start_op >> op
>   return dag
>
> parent_id = 'consistent_failure'
> with DAG(parent_id, default_args=default_args) as dag:
>   start_op = DummyOperator(task_id='start')
>   for i in range(NUM_SUBDAGS):
>     # subDAG ids must follow the '<parent_dag_id>.<task_id>' convention.
>     task_id = "subdag_{}".format(i)
>     op = SubDagOperator(
>       task_id=task_id,
>       subdag=build_dag("{}.{}".format(parent_id, task_id), NUM_OPS_PER_SUBDAG)
>     )
>     start_op >> op
> {code}
> When I trigger this DAG, Airflow tries to run many of the subDAGs at the 
> same time, and since they don't share parallelism with their parent DAG, 
> each of them also tries to run all of its operators in parallel, which 
> results in 500+ Python processes created by Airflow.
> Ideally those subDAGs would share parallelism with their parent DAG (and 
> thus with each other), so that when I trigger this DAG, at most 32 
> operators, including the ones in the subDAGs, are running at any time.
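> Until something like that lands, a possible per-subDAG mitigation (just a 
> sketch under 1.9 assumptions; the dag ids and the concurrency value are 
> illustrative) is to cap each subDAG explicitly, either with a DAG-level 
> concurrency limit or by running it under a SequentialExecutor:
> {code:python}
> from datetime import datetime
>
> from airflow import DAG
> from airflow.executors.sequential_executor import SequentialExecutor
> from airflow.operators.dummy_operator import DummyOperator
> from airflow.operators.subdag_operator import SubDagOperator
>
> default_args = {'start_date': datetime(2018, 1, 1)}
>
> with DAG('capped_parent', default_args=default_args) as dag:
>   # concurrency caps how many of this subDAG's tasks run at once;
>   # 2 is an arbitrary illustrative value.
>   subdag = DAG('capped_parent.worker', default_args=default_args,
>                concurrency=2)
>   DummyOperator(task_id='noop', dag=subdag)
>   SubDagOperator(
>     task_id='worker',
>     subdag=subdag,
>     # SequentialExecutor runs the subDAG's tasks one at a time instead
>     # of fanning them out through the LocalExecutor.
>     executor=SequentialExecutor()
>   )
> {code}
> Neither option makes subDAGs share the parent's limit, but both bound the 
> number of processes a single trigger can create.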


