ashb commented on a change in pull request #14883:
URL: https://github.com/apache/airflow/pull/14883#discussion_r603366574
##########
File path: airflow/executors/celery_executor.py
##########
@@ -543,20 +539,27 @@ def __init__(self, sync_parallelism=None):
super().__init__()
self._sync_parallelism = sync_parallelism
+ def _tasks_list_to_task_ids(self, async_tasks) -> Set[str]:
+ return {a.task_id for a in async_tasks}
+
def get_many(self, async_results) -> Mapping[str, EventBufferValueType]:
"""Gets status for many Celery tasks using the best method
available."""
if isinstance(app.backend, BaseKeyValueStoreBackend):
result = self._get_many_from_kv_backend(async_results)
- return result
- if isinstance(app.backend, DatabaseBackend):
+ elif isinstance(app.backend, DatabaseBackend):
result = self._get_many_from_db_backend(async_results)
- return result
- result = self._get_many_using_multiprocessing(async_results)
- self.log.debug("Fetched %d states for %d task", len(result),
len(async_results))
+ else:
+ async_results = list(async_results) if isinstance(async_results,
map) else async_results
Review comment:
Another possible fix is to change the one call site that passes a `map`
(L478) so that it passes `list(map(...))` instead.
##########
File path: airflow/executors/celery_executor.py
##########
@@ -543,20 +539,27 @@ def __init__(self, sync_parallelism=None):
super().__init__()
self._sync_parallelism = sync_parallelism
+ def _tasks_list_to_task_ids(self, async_tasks) -> Set[str]:
+ return {a.task_id for a in async_tasks}
+
def get_many(self, async_results) -> Mapping[str, EventBufferValueType]:
"""Gets status for many Celery tasks using the best method
available."""
if isinstance(app.backend, BaseKeyValueStoreBackend):
result = self._get_many_from_kv_backend(async_results)
- return result
- if isinstance(app.backend, DatabaseBackend):
+ elif isinstance(app.backend, DatabaseBackend):
result = self._get_many_from_db_backend(async_results)
- return result
- result = self._get_many_using_multiprocessing(async_results)
- self.log.debug("Fetched %d states for %d task", len(result),
len(async_results))
+ else:
+ async_results = list(async_results) if isinstance(async_results,
map) else async_results
+ result = self._get_many_using_multiprocessing(async_results)
+ if logging.getLevelName(self.log.level) == "DEBUG":
+ if isinstance(async_results, map):
+ self.log.debug("Fetched state for %d task(s)", len(result))
+ else:
+ self.log.debug("Fetched %d state(s) for %d task(s)",
len(result), len(async_results))
Review comment:
```suggestion
self.log.debug("Fetched %d state(s) for %d task(s)",
len(result), len(async_results))
```
I think this should be enough, since you have already changed the type of `async_results` (converting a `map` to a list) on L552.
##########
File path: airflow/executors/celery_executor.py
##########
@@ -543,20 +539,27 @@ def __init__(self, sync_parallelism=None):
super().__init__()
self._sync_parallelism = sync_parallelism
+ def _tasks_list_to_task_ids(self, async_tasks) -> Set[str]:
+ return {a.task_id for a in async_tasks}
+
def get_many(self, async_results) -> Mapping[str, EventBufferValueType]:
"""Gets status for many Celery tasks using the best method
available."""
if isinstance(app.backend, BaseKeyValueStoreBackend):
result = self._get_many_from_kv_backend(async_results)
- return result
- if isinstance(app.backend, DatabaseBackend):
+ elif isinstance(app.backend, DatabaseBackend):
result = self._get_many_from_db_backend(async_results)
- return result
- result = self._get_many_using_multiprocessing(async_results)
- self.log.debug("Fetched %d states for %d task", len(result),
len(async_results))
+ else:
+ async_results = list(async_results) if isinstance(async_results,
map) else async_results
Review comment:
```suggestion
if isinstance(async_results, map):
async_results = list(async_results)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org