RNHTTR commented on a change in pull request #14883:
URL: https://github.com/apache/airflow/pull/14883#discussion_r603577662
##########
File path: airflow/executors/celery_executor.py
##########
@@ -543,20 +539,27 @@ def __init__(self, sync_parallelism=None):
super().__init__()
self._sync_parallelism = sync_parallelism
+ def _tasks_list_to_task_ids(self, async_tasks) -> Set[str]:
+ return {a.task_id for a in async_tasks}
+
def get_many(self, async_results) -> Mapping[str, EventBufferValueType]:
"""Gets status for many Celery tasks using the best method
available."""
if isinstance(app.backend, BaseKeyValueStoreBackend):
result = self._get_many_from_kv_backend(async_results)
- return result
- if isinstance(app.backend, DatabaseBackend):
+ elif isinstance(app.backend, DatabaseBackend):
result = self._get_many_from_db_backend(async_results)
- return result
- result = self._get_many_using_multiprocessing(async_results)
- self.log.debug("Fetched %d states for %d task", len(result),
len(async_results))
+ else:
+ async_results = list(async_results) if isinstance(async_results,
map) else async_results
Review comment:
That's probably the most intuitive fix, but in the case of the
recommended backends (i.e. `BaseKeyValueStoreBackend` or `DatabaseBackend`),
we'd be converting it to a list and then immediately creating a set based off
that list instead of just creating the set based on the map object. Do you
think that could cause a performance hit if a lot of tasks are running?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]