ashb commented on a change in pull request #3830: [AIRFLOW-2156] Parallelize 
Celery Executor
URL: https://github.com/apache/incubator-airflow/pull/3830#discussion_r215552551
 
 

 ##########
 File path: airflow/executors/celery_executor.py
 ##########
 @@ -85,30 +139,67 @@ def execute_async(self, key, command,
             args=[command], queue=queue)
         self.last_state[key] = celery_states.PENDING
 
+    def _num_tasks_per_process(self):
+        """
+        How many Celery tasks should be sent to each worker process.
+        :return: Number of tasks that should be used per process
+        :rtype: int
+        """
+        return max(1,
+                   int(math.ceil(1.0 * len(self.tasks) / 
self._sync_parallelism)))
+
     def sync(self):
-        self.log.debug("Inquiring about %s celery task(s)", len(self.tasks))
-        for key, task in list(self.tasks.items()):
-            try:
-                state = task.state
-                if self.last_state[key] != state:
-                    if state == celery_states.SUCCESS:
-                        self.success(key)
-                        del self.tasks[key]
-                        del self.last_state[key]
-                    elif state == celery_states.FAILURE:
-                        self.fail(key)
-                        del self.tasks[key]
-                        del self.last_state[key]
-                    elif state == celery_states.REVOKED:
-                        self.fail(key)
-                        del self.tasks[key]
-                        del self.last_state[key]
-                    else:
-                        self.log.info("Unexpected state: %s", state)
-                        self.last_state[key] = state
-            except Exception as e:
-                self.log.error("Error syncing the celery executor, ignoring 
it:")
-                self.log.exception(e)
+        num_processes = min(len(self.tasks), self._sync_parallelism)
+        if num_processes == 0:
+            self.log.debug("No task to query celery, skipping sync")
+            return
+
+        self.log.debug("Inquiring about %s celery task(s) using %s processes",
+                       len(self.tasks), num_processes)
+
+        # Recreate the process pool each sync in case processes in the pool die
+        self._sync_pool = Pool(processes=num_processes)
+
+        # Use chunking instead of a work queue to reduce context switching 
since tasks are
+        # roughly uniform in size
+        chunksize = self._num_tasks_per_process()
+
+        self.log.debug("Waiting for inquiries to complete...")
+        task_keys_to_states = self._sync_pool.map(
+            fetch_celery_task_state,
+            self.tasks.items(),
+            chunksize=chunksize)
+        self._sync_pool.close()
+        self._sync_pool.join()
+        self.log.debug("Inquiries completed.")
+
+        for key_and_state in task_keys_to_states:
+            if isinstance(key_and_state, ExceptionWithTraceback):
+                self.log.error(
+                    CELERY_FETCH_ERR_MSG_HEADER + ", ignoring 
it:{}\n{}\n".format(
+                        key_and_state.exception, key_and_state.traceback))
+            else:
 
 Review comment:
   Rather than an `else:` block here, if we add a `continue` on the line above 
then we can un-indent the whole block  following this (L182-202) - I aim for 
shallower indents/less nesting where possible as I find it easier to follow the 
code as a result.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to