yiqijiu commented on code in PR #31998:
URL: https://github.com/apache/airflow/pull/31998#discussion_r1238841224
##########
airflow/executors/celery_executor_utils.py:
##########
@@ -249,16 +249,20 @@ def _get_many_from_kv_backend(self, async_tasks) ->
Mapping[str, EventBufferValu
task_results_by_task_id = {task_result["task_id"]: task_result for
task_result in task_results}
return self._prepare_state_and_info_by_task_dict(task_ids,
task_results_by_task_id)
-
- def _get_many_from_db_backend(self, async_tasks) -> Mapping[str,
EventBufferValueType]:
- task_ids = self._tasks_list_to_task_ids(async_tasks)
+
+ @retry
+ def _query_task_cls_from_db_backend(self,task_ids,**kwargs):
session = app.backend.ResultSession()
task_cls = getattr(app.backend, "task_cls", TaskDb)
with session_cleanup(session):
- tasks =
session.query(task_cls).filter(task_cls.task_id.in_(task_ids)).all()
+ return
session.query(task_cls).filter(task_cls.task_id.in_(task_ids)).all()
+ def _get_many_from_db_backend(self, async_tasks) -> Mapping[str,
EventBufferValueType]:
+ task_ids = self._tasks_list_to_task_ids(async_tasks)
+ tasks = self._query_task_cls_from_db_backend(task_ids)
task_results = [app.backend.meta_from_decoded(task.to_dict()) for task
in tasks]
task_results_by_task_id = {task_result["task_id"]: task_result for
task_result in task_results}
+
Review Comment:
Thank you for the advice. This is my first time submitting a PR to the
project, so some of the specifications have not been followed yet; sorry about
that. I will add the code and check the changes against the static code checks
later.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]