This is an automated email from the ASF dual-hosted git repository.
kamilbregula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new fc862a3 Do not create a separate process for one task in
CeleryExecutor (#8855)
fc862a3 is described below
commit fc862a3edd010e65b9b3fe586855fe81807ee4e8
Author: Kamil BreguĊa <[email protected]>
AuthorDate: Thu May 14 06:01:13 2020 +0200
Do not create a separate process for one task in CeleryExecutor (#8855)
---
airflow/executors/celery_executor.py | 9 +++++++--
1 file changed, 7 insertions(+), 2 deletions(-)
diff --git a/airflow/executors/celery_executor.py
b/airflow/executors/celery_executor.py
index 98fff4b..9ad762e 100644
--- a/airflow/executors/celery_executor.py
+++ b/airflow/executors/celery_executor.py
@@ -170,11 +170,11 @@ class CeleryExecutor(BaseExecutor):
task_tuples_to_send.append((key, simple_ti, command, queue,
execute_command))
if task_tuples_to_send:
- tasks = [t[4] for t in task_tuples_to_send]
+ first_task = next(t[4] for t in task_tuples_to_send)
# Celery state queries will stuck if we do not use one same backend
# for all tasks.
- cached_celery_backend = tasks[0].backend
+ cached_celery_backend = first_task.backend
key_and_async_results =
self._send_tasks_to_celery(task_tuples_to_send)
self.log.debug('Sent all tasks.')
@@ -194,6 +194,11 @@ class CeleryExecutor(BaseExecutor):
self.last_state[key] = celery_states.PENDING
def _send_tasks_to_celery(self, task_tuples_to_send):
+ if len(task_tuples_to_send) == 1:
+ # One tuple, so send it in the main thread.
+ return [
+ send_task_to_executor(task_tuples_to_send[0])
+ ]
# Use chunks instead of a work queue to reduce context switching
# since tasks are roughly uniform in size
chunksize = self._num_tasks_per_send_process(len(task_tuples_to_send))