It sounds like you may be getting bottlenecked by executor concurrency settings.
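In 1.9, the main knobs besides parallelism live in airflow.cfg. Here's a rough sketch of the relevant section (the values shown are only illustrative, not recommendations):

    [core]
    # hard cap on task instances running at once across the whole installation
    parallelism = 10000
    # cap on task instances running at once within a single DAG
    dag_concurrency = 10000
    # slots available to tasks that are not assigned to a named pool
    non_pooled_task_slot_count = 10000

    [celery]
    # worker processes per Celery worker (the 1.9 name; renamed
    # worker_concurrency in later releases)
    celeryd_concurrency = 16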
Are you using the default values for the other concurrency settings, specifically the ones mentioned here: <https://stackoverflow.com/questions/50737800/how-many-tasks-can-be-scheduled-in-a-single-airflow-dag/50743825#50743825>? If you raise those very high as well, do you still see the issue? You can also set the cap per DAG; see the sketch after the quoted DAG below.

Taylor

Taylor Edmiston
Blog <https://blog.tedmiston.com/> | CV <https://stackoverflow.com/cv/taylor> | LinkedIn <https://www.linkedin.com/in/tedmiston/> | AngelList <https://angel.co/taylor> | Stack Overflow <https://stackoverflow.com/users/149428/taylor-edmiston>

On Mon, Jun 25, 2018 at 1:40 PM, PAULI, KEVIN CHRISTIAN [AG/1000] <[email protected]> wrote:

> Greetings Airflowers. I'm evaluating Airflow 1.9.0 for our distributed
> orchestration needs (using CeleryExecutor and RabbitMQ), and I am seeing
> something strange.
>
> I made a DAG that has three main stages: 1) start, 2) fan out and run N
> tasks concurrently, 3) finish.
>
> N can be large, maybe up to 10K. I would expect to see N tasks get dumped
> onto the Rabbit queue when stage 2 begins. Instead I am seeing only a few
> hundred added at a time. As the workers process the tasks and the queue
> gets smaller, more get added to Celery/Rabbit. Eventually it does finish,
> but I would really prefer that it dump ALL the work (all 10K tasks) into
> Celery immediately, for two reasons:
>
> 1. The current way makes the scheduler long-lived and stateful. The
>    scheduler might die after only 5K have completed, in which case the
>    remaining 5K tasks would never get added (I verified this).
> 2. I want to use the size of the Rabbit queue as a metric to trigger
>    autoscaling events that add more workers, so I need a true picture of
>    how much outstanding work remains (10K, not a few hundred).
>
> I assume the scheduler has some kind of throttle that keeps it from
> dumping all 10K messages simultaneously? If so, is this configurable?
>
> FYI, I have already set "parallelism" to 10K in the airflow.cfg.
>
> Here is my test DAG:
>
> # This DAG tests how well Airflow fans out
>
> from airflow import DAG
> from datetime import datetime, timedelta
>
> from airflow.operators.bash_operator import BashOperator
>
> default_args = {
>     'owner': 'airflow',
>     'depends_on_past': False,
>     'start_date': datetime(2015, 6, 1),
>     'email': ['[email protected]'],
>     'email_on_failure': False,
>     'email_on_retry': False,
>     'retries': 1,
>     'retry_delay': timedelta(minutes=5),
> }
>
> dag = DAG('fan_out', default_args=default_args, schedule_interval=None)
>
> num_tasks = 1000
>
> starting = BashOperator(
>     task_id='starting',
>     bash_command='echo starting',
>     dag=dag)
>
> all_done = BashOperator(
>     task_id='all_done',
>     bash_command='echo all done',
>     dag=dag)
>
> for i in range(0, num_tasks):
>     task = BashOperator(
>         task_id='say_hello_' + str(i),
>         bash_command='echo hello world',
>         dag=dag)
>     task.set_upstream(starting)
>     task.set_downstream(all_done)
>
> --
> Regards,
> Kevin Pauli
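The sketch referenced above: the quoted test DAG relies on the default per-DAG cap, so it may be worth setting it explicitly as well. Assuming Airflow 1.9's DAG constructor, which accepts a concurrency argument, a minimal tweak would be:

    dag = DAG(
        'fan_out',
        default_args=default_args,
        schedule_interval=None,
        # per-DAG cap on concurrently running task instances;
        # defaults to dag_concurrency from airflow.cfg
        concurrency=10000,
    )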
