hussein-awala commented on code in PR #31998:
URL: https://github.com/apache/airflow/pull/31998#discussion_r1237443057


##########
airflow/executors/celery_executor_utils.py:
##########
@@ -249,16 +249,20 @@ def _get_many_from_kv_backend(self, async_tasks) -> 
Mapping[str, EventBufferValu
         task_results_by_task_id = {task_result["task_id"]: task_result for 
task_result in task_results}
 
         return self._prepare_state_and_info_by_task_dict(task_ids, 
task_results_by_task_id)
-
-    def _get_many_from_db_backend(self, async_tasks) -> Mapping[str, 
EventBufferValueType]:
-        task_ids = self._tasks_list_to_task_ids(async_tasks)
+    
+    @retry
+    def _query_task_cls_from_db_backend(self,task_ids,**kwargs):
         session = app.backend.ResultSession()
         task_cls = getattr(app.backend, "task_cls", TaskDb)
         with session_cleanup(session):
-            tasks = 
session.query(task_cls).filter(task_cls.task_id.in_(task_ids)).all()
+            return 
session.query(task_cls).filter(task_cls.task_id.in_(task_ids)).all()
 
+    def _get_many_from_db_backend(self, async_tasks) -> Mapping[str, 
EventBufferValueType]:
+        task_ids = self._tasks_list_to_task_ids(async_tasks)
+        tasks = self._query_task_cls_from_db_backend(task_ids)
         task_results = [app.backend.meta_from_decoded(task.to_dict()) for task 
in tasks]
         task_results_by_task_id = {task_result["task_id"]: task_result for 
task_result in task_results}
+        

Review Comment:
   ```suggestion
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to