Greetings Airflowers. I'm evaluating Airflow 1.9.0 for our distributed
orchestration needs (using CeleryExecutor and RabbitMQ), and I am seeing
something strange.
I made a DAG with three main stages: 1) start, 2) fan out and run N tasks
concurrently, 3) finish.
N can be large, maybe up to 10K. I would expect all N tasks to get dumped onto
the Rabbit queue when stage 2 begins. Instead I am seeing only a few hundred
added at a time; as the workers drain the queue, more get added to
Celery/Rabbit. Eventually it does finish, but I would really prefer that it
dump ALL the work (all 10K tasks) into Celery immediately, for two reasons:
1. The current way makes the scheduler long-lived and stateful. The
scheduler might die after only 5K tasks have completed, in which case the
remaining 5K would never get enqueued (I verified this).
2. I want to use the size of the Rabbit queue as a metric to trigger
autoscaling events that add more workers, so I need a true picture of how
much outstanding work remains (10K, not a few hundred).
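To make reason 2 concrete, here is roughly what I have in mind for the autoscaling side. This is a hypothetical helper of my own (the function names and the hard-coded 'celery' queue name are my assumptions, not anything from Airflow): it shells out to rabbitmqctl and parses the queue depth.

```python
import subprocess


def parse_queue_depth(listing, queue_name):
    # Pull the message count for one queue out of
    # `rabbitmqctl list_queues name messages` output.
    for line in listing.splitlines():
        parts = line.split()
        if len(parts) == 2 and parts[0] == queue_name:
            return int(parts[1])
    return 0


def rabbit_queue_depth(queue_name='celery'):
    # Assumes rabbitmqctl is on the PATH (i.e. this runs on the broker host)
    out = subprocess.check_output(
        ['rabbitmqctl', 'list_queues', 'name', 'messages']).decode()
    return parse_queue_depth(out, queue_name)
```

If only a few hundred of the 10K tasks are ever enqueued at once, this number badly understates the remaining work, which is why I care about the throttling behavior.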
I assume the scheduler has some kind of throttle that keeps it from dumping
all 10K messages simultaneously? If so, is it configurable?
FYI, I have already set "parallelism" to 10K in airflow.cfg.
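For reference, these are the settings in airflow.cfg that look related to me. The values shown are what I believe the 1.9.0 defaults to be (corrections welcome) — perhaps one of the other two is my bottleneck:

```ini
[core]
# Global cap on task instances running across all executors
parallelism = 32
# Cap on concurrent task instances within a single DAG
dag_concurrency = 16
# Slots available to tasks not assigned to a pool
non_pooled_task_slot_count = 128
```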
Here is my test dag:
# This dag tests how well airflow fans out
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG('fan_out', default_args=default_args, schedule_interval=None)

num_tasks = 1000  # scaled up to 10K for the larger tests

starting = BashOperator(
    task_id='starting',
    bash_command='echo starting',
    dag=dag)

all_done = BashOperator(
    task_id='all_done',
    bash_command='echo all done',
    dag=dag)

# Fan out: every say_hello task runs after `starting` and before `all_done`
for i in range(num_tasks):
    task = BashOperator(
        task_id='say_hello_' + str(i),
        bash_command='echo hello world',
        dag=dag)
    task.set_upstream(starting)
    task.set_downstream(all_done)
--
Regards,
Kevin Pauli