[ https://issues.apache.org/jira/browse/AIRFLOW-928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Uri Shamay updated AIRFLOW-928:
-------------------------------
Description:
Hi,
When running Airflow with the CeleryExecutor (I tested both RabbitMQ and Redis
as brokers), I see that while the workers are down, the scheduler **appends**
another message for the same {task,execution_date} to the broker on every
scheduling period. This means that if the workers are down or can't connect to
the broker for a few hours, the broker ends up holding thousands of identical
executions.
In my scenario I have just one dummy DAG, run with a dag_concurrency of 4.
I expected the broker to hold just 4 messages in that scenario, and the
scheduler not to keep queuing more messages for the same
{task,execution_date}.
What actually happened is that when the workers started consuming messages,
they received thousands of messages for just 4 tasks, and when they tried to
write the task_instances to the database, they hit integrity errors because
those {task,execution_date} rows already existed.
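To illustrate the behavior I expected, here is a minimal sketch in plain Python. It is not Airflow or Celery code: the `DedupQueue` class, its methods, and the `dummy_task` name are hypothetical and only model a queue that refuses a second message for a {task,execution_date} key that is still waiting to be consumed.

```python
from datetime import datetime


class DedupQueue:
    """Toy stand-in for a broker queue that refuses duplicate keys (hypothetical)."""

    def __init__(self):
        self.messages = []     # what a worker would eventually consume
        self._pending = set()  # keys queued but not yet consumed

    def enqueue(self, task_id, execution_date):
        """Queue the key unless it is already pending; return True if queued."""
        key = (task_id, execution_date)
        if key in self._pending:
            return False
        self._pending.add(key)
        self.messages.append(key)
        return True

    def consume(self):
        """Pop the oldest message and allow its key to be queued again."""
        key = self.messages.pop(0)
        self._pending.discard(key)
        return key


queue = DedupQueue()
dates = [datetime(2017, 1, day) for day in range(1, 5)]

# Simulate ten scheduler periods while no worker is consuming.
for _ in range(10):
    for execution_date in dates:
        queue.enqueue("dummy_task", execution_date)

print(len(queue.messages))  # 4, not 40
```

With a check like this, the queue length stays bounded by the number of distinct {task,execution_date} pairs, no matter how long the workers are down.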
Attached files:
1. airflow.log - the task log; you can see that several processes for the
same {task,execution_date} write to the same log file.
2. worker.log - the worker log; you can see the worker trying to run the
same {task,execution_date} multiple times, plus the database integrity
errors saying that those tasks already exist for those dates.
3. scheduler.log - shows that the scheduler keeps sending the same
{job,execution_date} again and again, indefinitely.
4. dummy_dag.py - the DAG used for the test.
5. rabbitmq.queue - shows that after 5 minutes the broker queue contains 40
messages for the same 4 {job,execution_date} pairs.
6. dag_runs.png - shows that only 4 jobs need to run, while there are many
more messages in the queue.
Thanks.
was:
Hi,
When running Airflow with the CeleryExecutor (I tested both RabbitMQ and Redis
as brokers), I see that while the workers are down, the scheduler **appends**
another message for the same {task,execution_date} to the broker on every
scheduling period. This means that if the workers are down or can't connect to
the broker for a few hours, the broker ends up holding thousands of identical
executions.
In my scenario I have just one dummy DAG, run with a dag_concurrency of 4.
I expected the broker to hold just 4 messages in that scenario, and the
scheduler not to keep queuing more messages for the same
{task,execution_date}.
What actually happened is that when the workers started consuming messages,
they received thousands of messages for just 4 tasks, and when they tried to
write the task_instances to the database, they hit integrity errors because
those {task,execution_date} rows already existed.
Attached 2 files:
1. airflow.log - the task log; you can see that several processes for the
same {task,execution_date} write to the same log file.
2. worker.log - the worker log; you can see the worker trying to run the
same {task,execution_date} multiple times, plus the database integrity
errors saying that those tasks already exist for those dates.
Thanks.
> Same {task,execution_date} run multiple times in worker when using
> CeleryExecutor
> ---------------------------------------------------------------------------------
>
> Key: AIRFLOW-928
> URL: https://issues.apache.org/jira/browse/AIRFLOW-928
> Project: Apache Airflow
> Issue Type: Bug
> Components: celery
> Affects Versions: Airflow 1.7.1.3
> Environment: Docker
> Reporter: Uri Shamay
> Attachments: airflow.log, dag_runs.png, dummy_dag.py, rabbitmq.queue,
> scheduler.log, worker.log