[ 
https://issues.apache.org/jira/browse/AIRFLOW-2372?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiao Zhu updated AIRFLOW-2372:
------------------------------
    Description: 
It seems like right now subDAGs are scheduled just like normal DAGs. So if a 
DAG has a lot of parallel subDAGs, each with a lot of operators, triggering 
that DAG gets those subDAGs triggered as independent top-level DAGs. Each one 
is only limited by its own dag_concurrency, so together they can easily take 
all of the executor's slots, and other DAGs have to wait for them.

For example, take this DAG, running with a local scheduler and the 
LocalExecutor, with parallelism = 32 and dag_concurrency = 16:

{code:python}
import logging
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.subdag_operator import SubDagOperator

log = logging.getLogger(__name__)

NUM_SUBDAGS = 20
NUM_OPS_PER_SUBDAG = 10

# Parent and sub-DAGs need a start_date; keep it static so runs are predictable.
default_args = {'owner': 'airflow', 'start_date': datetime(2018, 1, 1)}


def logging_func(op_id):
  log.info("Now running id: {}".format(op_id))


def build_dag(dag_id, num_ops):
  # One 'start' task fanning out to num_ops parallel PythonOperators.
  dag = DAG(dag_id, default_args=default_args, schedule_interval=None)

  start_op = DummyOperator(task_id='start', dag=dag)

  for i in range(num_ops):
    op = PythonOperator(
      task_id=str(i),
      python_callable=logging_func,
      op_args=[i],
      dag=dag
    )

    start_op >> op

  return dag


parent_id = 'consistent_failure'
with DAG(parent_id, default_args=default_args, schedule_interval=None) as dag:

  start_op = DummyOperator(task_id='start')

  for i in range(NUM_SUBDAGS):
    task_id = "subdag_{}".format(i)
    # Sub-DAG ids must follow the '<parent_dag_id>.<task_id>' convention.
    op = SubDagOperator(
      task_id=task_id,
      subdag=build_dag("{}.{}".format(parent_id, task_id), NUM_OPS_PER_SUBDAG)
    )

    start_op >> op
{code}
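
To reproduce, I drop this file into the DAGs folder, unpause the DAG, and 
trigger it manually with {{airflow trigger_dag consistent_failure}} (the 
"Trigger DAG" button in the web UI does the same).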

When I trigger this DAG, Airflow tries to run many of the subDAGs at the same 
time, and since they don't share parallelism with their parent DAG, each of 
them also tries to run all of its operators in parallel. With 20 subDAGs of 10 
operators each, that is up to 200 task instances at once, and with the runner 
processes each task spawns, Airflow ends up creating 500+ python processes.
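
One way to watch this happen is to count the task runner processes on the 
scheduler machine while the run is in flight, e.g. 
{{ps aux | grep 'airflow run' | wc -l}}.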

Ideally those subDAGs should share parallelism with their parent DAG (and thus 
with each other), so that when I trigger this DAG, at most 32 operators are 
running at any time, including the ones inside the subDAGs.
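
In the meantime, a partial workaround might be to serialize each sub-DAG. If I 
read the 1.9 code correctly, SubDagOperator takes an executor argument (treat 
that as an assumption on my part); swapping in SequentialExecutor inside the 
loop above would look like this:

{code:python}
from airflow.executors.sequential_executor import SequentialExecutor

# Assumption: SubDagOperator honors executor= in 1.9. The sub-DAG's backfill
# then runs on SequentialExecutor, so its operators execute one at a time
# instead of each sub-DAG claiming its own dag_concurrency worth of slots.
op = SubDagOperator(
  task_id=task_id,
  subdag=build_dag("{}.{}".format(parent_id, task_id), NUM_OPS_PER_SUBDAG),
  executor=SequentialExecutor()
)
{code}

That only bounds the damage, though; the sub-DAGs still don't share the 
parent's parallelism budget, which is what this ticket is asking for.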



> SubDAGs should share parallelism of parent DAG
> ----------------------------------------------
>
>                 Key: AIRFLOW-2372
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-2372
>             Project: Apache Airflow
>          Issue Type: Wish
>    Affects Versions: Airflow 1.9.0
>         Environment: Airflow 1.9.0, a local scheduler with LocalExecutor, 
> parallelism = 32, dag_concurrency = 16
>            Reporter: Xiao Zhu
>            Priority: Major


