ashb commented on a change in pull request #3830: [AIRFLOW-2156] Parallelize Celery Executor
URL: https://github.com/apache/incubator-airflow/pull/3830#discussion_r215555183
##########
File path: airflow/executors/celery_executor.py
##########
@@ -85,30 +139,67 @@ def execute_async(self, key, command,
             args=[command], queue=queue)
         self.last_state[key] = celery_states.PENDING
 
+    def _num_tasks_per_process(self):
+        """
+        How many Celery tasks should be sent to each worker process.
+        :return: Number of tasks that should be used per process
+        :rtype: int
+        """
+        return max(1,
+                   int(math.ceil(1.0 * len(self.tasks) / self._sync_parallelism)))
+
     def sync(self):
-        self.log.debug("Inquiring about %s celery task(s)", len(self.tasks))
-        for key, task in list(self.tasks.items()):
-            try:
-                state = task.state
-                if self.last_state[key] != state:
-                    if state == celery_states.SUCCESS:
-                        self.success(key)
-                        del self.tasks[key]
-                        del self.last_state[key]
-                    elif state == celery_states.FAILURE:
-                        self.fail(key)
-                        del self.tasks[key]
-                        del self.last_state[key]
-                    elif state == celery_states.REVOKED:
-                        self.fail(key)
-                        del self.tasks[key]
-                        del self.last_state[key]
-                    else:
-                        self.log.info("Unexpected state: %s", state)
-                        self.last_state[key] = state
-            except Exception as e:
-                self.log.error("Error syncing the celery executor, ignoring it:")
-                self.log.exception(e)
+        num_processes = min(len(self.tasks), self._sync_parallelism)
+        if num_processes == 0:
+            self.log.debug("No task to query celery, skipping sync")
+            return
+
+        self.log.debug("Inquiring about %s celery task(s) using %s processes",
+                       len(self.tasks), num_processes)
+
+        # Recreate the process pool each sync in case processes in the pool die
+        self._sync_pool = Pool(processes=num_processes)
Review comment:
Do we need any special handling of the SQLAlchemy (SQLA) connection when
using multiprocessing? (I'm thinking of making sure we don't end up with 16
extra DB connections that we don't need.)
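A minimal, stdlib-only sketch of the pattern in this hunk, to make the question concrete: the pool is sized as min(len(tasks), sync_parallelism), each worker gets ceil(len(tasks) / sync_parallelism) tasks per chunk (the same arithmetic as `_num_tasks_per_process`), and the pool's `initializer` runs once in every worker process, which is one natural place for per-process DB cleanup. The function names, the `_init_worker` hook, and the plain state strings standing in for Celery results are assumptions for illustration, not Airflow code. On the SQLA side, SQLAlchemy's pooling documentation has a section on using connection pools with multiprocessing and os.fork(), which covers approaches such as disposing of the engine in the child so inherited connections are not reused.

```python
import math
from multiprocessing import Pool


def num_tasks_per_process(num_tasks, sync_parallelism):
    """Tasks per chunk: ceil(num_tasks / sync_parallelism), never below 1."""
    return max(1, int(math.ceil(1.0 * num_tasks / sync_parallelism)))


def _init_worker():
    # Hypothetical per-process hook, run once in each pool worker. In real
    # code this is where a worker could drop DB connections inherited from
    # the parent (e.g. by disposing of a forked SQLAlchemy engine) so the
    # pool does not hold connections it never uses.
    pass


def _fetch_state(key_and_task):
    # Stand-in for reading ``task.state`` from a Celery AsyncResult; here
    # each "task" is already its state string, so no broker is needed.
    key, task = key_and_task
    return key, task


def fetch_states(tasks, sync_parallelism):
    """Return {key: state}, querying the tasks from a process pool."""
    num_processes = min(len(tasks), sync_parallelism)
    if num_processes == 0:
        return {}  # nothing to query, so no pool is created
    chunksize = num_tasks_per_process(len(tasks), sync_parallelism)
    # Leaving the ``with`` block terminates the worker processes.
    with Pool(processes=num_processes, initializer=_init_worker) as pool:
        results = pool.map(_fetch_state, list(tasks.items()),
                           chunksize=chunksize)
    return dict(results)


if __name__ == "__main__":
    tasks = {"t%d" % i: "SUCCESS" for i in range(10)}
    print(num_tasks_per_process(len(tasks), 4))  # 3
    print(fetch_states(tasks, 4) == tasks)  # True
```

With 10 tasks and a parallelism of 4, the chunk size is 3, so the work splits into chunks of 3, 3, 3 and 1, one per worker. Using `with Pool(...)` terminates the workers once `map` has returned, so building a fresh pool on every call does not leave idle processes (or whatever connections they hold) around between calls.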