[ https://issues.apache.org/jira/browse/AIRFLOW-928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Uri Shamay updated AIRFLOW-928:
-------------------------------
    Description: 
Hi,

When using Airflow with the CeleryExecutor (I tested both RabbitMQ and Redis
as brokers), I see that while the workers are down, on every scheduling pass
the scheduler **appends** another message for the same {task,execution_date}
key to the broker. This means that if the workers are down or cannot reach
the broker for a few hours, the broker accumulates thousands of copies of the
same executions.
In my scenario there is just one dummy DAG to run, with dag_concurrency set
to 4. I expected the broker to hold just 4 messages in that scenario; the
scheduler should not keep queuing message after message for the same
{task,execution_date}.
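
The attached dummy_dag.py is not reproduced in this mail. For reference, a
minimal DAG of the kind described (one trivial task on a frequent schedule)
might look like the sketch below; the dag_id, task_id, and schedule are
assumptions, not the attachment's actual contents.

    from datetime import datetime, timedelta

    from airflow import DAG
    from airflow.operators.dummy_operator import DummyOperator

    # Assumed shape of the test DAG: one no-op task, fired every minute.
    dag = DAG(
        dag_id="dummy_dag",                      # assumed name
        start_date=datetime(2017, 1, 1),
        schedule_interval=timedelta(minutes=1),  # "scheduled each minute"
    )

    DummyOperator(task_id="dummy_task", dag=dag)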

What actually happens is that when the workers start consuming messages, they
receive thousands of messages for what are really just 4 tasks, and when they
try to write the task_instances rows to the database they hit integrity
errors because rows for those {task,execution_date} pairs already exist.

Note that in my test I let Airflow schedule work for just one DAG, with no
workers, for a few hours; I then connected to the broker with a custom
external client and retrieved the messages - there were thousands of copies
of the same {dag,execution_date}.
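
For anyone who wants to repeat that check, a hypothetical inspection script
(not the attached client) is sketched below. It assumes the pika library,
RabbitMQ on localhost, and Airflow's default queue name "default"; identical
message bodies correspond to duplicated {task,execution_date} messages.

    import collections

    import pika

    # Connect to the broker (assumed: RabbitMQ on localhost).
    connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    channel = connection.channel()

    counts = collections.Counter()
    while True:
        # basic_get returns (None, None, None) once the queue is drained.
        method, header, body = channel.basic_get(queue="default", auto_ack=True)
        if method is None:
            break
        counts[body] += 1  # identical bodies => duplicated messages

    for body, n in counts.most_common(10):
        print(n, body[:120])

    connection.close()
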
Even if only one instance actually runs when thousands of messages for the
same key are polled, it is still bad behavior: it would be better to produce
a single message to the queue and, if some timeout occurs (such as the
visibility timeout), to set the key rather than append to it.
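
A rough sketch of that suggested "set, don't append" behavior, assuming a
Redis broker; enqueue_task_instance and the key layout are illustrative only,
not Airflow or Celery APIs:

    import redis

    r = redis.StrictRedis()
    QUEUE = "default"          # assumed broker list the workers consume from
    VISIBILITY_TIMEOUT = 3600  # seconds before a re-queue is allowed

    def enqueue_task_instance(dag_id, task_id, execution_date, command):
        # One guard key per {task,execution_date}; SET NX succeeds only if
        # nothing is pending yet for this pair.
        key = "queued:%s:%s:%s" % (dag_id, task_id, execution_date)
        if r.set(key, command, nx=True, ex=VISIBILITY_TIMEOUT):
            r.rpush(QUEUE, command)  # first request: actually publish
        # else: a message for this pair is already in flight; do not append
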
What happens in practice is that when the workers are down for a long time
and many jobs are scheduled every minute, the workers that come back receive
thousands of copies of the same jobs => the worker runs the same DAGs many
times => many wasted Python runners => all Celery worker threads/processes
are used up => all other jobs starve until the worker works out that only one
instance is needed out of all the duplicates.

Attached files:

1. airflow.log - the task log; you can see several processes for the same
{task,execution_date} writing to the same log file.

2. worker.log - the worker log; you can see the worker trying to run the same
{task,execution_date} multiple times, plus the database integrity errors
saying that those tasks already exist for those dates.

3. scheduler.log - shows that the scheduler decided to send the same
{job,execution_date} again and again, indefinitely.

4. dummy_dag.py - the DAG used for the test.

5. rabbitmq.queue - shows that after 5 minutes the broker queue contains 40
messages for the same 4 {job,execution_date} pairs.

6. dag_runs.png - shows that only 4 jobs need to run, while there are many
more messages in the queue.

7. processes.list - shows that after starting a worker, ps -ef | grep
"airflow run" lists the same {job,execution_date} running multiple times.

Thanks.

> Same {task,execution_date} runs multiple times in worker when using 
> CeleryExecutor
> ---------------------------------------------------------------------------------
>
>                 Key: AIRFLOW-928
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-928
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: celery
>    Affects Versions: Airflow 1.7.1.3
>         Environment: Docker
>            Reporter: Uri Shamay
>         Attachments: airflow.log, dag_runs.png, dummy_dag.py, processes.list, 
> rabbitmq.queue, scheduler_2.log, scheduler.log, worker.log
>



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
