mik-laj commented on a change in pull request #9488:
URL: https://github.com/apache/airflow/pull/9488#discussion_r446853441



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1476,28 +1476,32 @@ def _process_executor_events(self, simple_dag_bag, session=None):
         """
         Respond to executor events.
         """
-        # TODO: this shares quite a lot of code with _manage_executor_state
-        for key, value in self.executor.get_event_buffer(simple_dag_bag.dag_ids).items():
+        event_buffer = self.executor.get_event_buffer(simple_dag_bag.dag_ids)
+        tis_with_right_state: List[TaskInstanceKeyType] = []
+
+        # Report execution
+        for key, value in event_buffer.items():
             state, info = value
             dag_id, task_id, execution_date, try_number = key
             self.log.info(
                 "Executor reports execution of %s.%s execution_date=%s "
                 "exited with status %s for try_number %s",
                 dag_id, task_id, execution_date, state, try_number
             )
-            if state not in (State.FAILED, State.SUCCESS):
-                continue
+            if state in (State.FAILED, State.SUCCESS):
+                tis_with_right_state.append(key)
 
-            # Process finished tasks
-            qry = session.query(TI).filter(
-                TI.dag_id == dag_id,
-                TI.task_id == task_id,
-                TI.execution_date == execution_date
-            )
-            ti = qry.first()
-            if not ti:
-                self.log.warning("TaskInstance %s went missing from the database", ti)
-                continue

Review comment:
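   ```
   self.log.warning(
       "<TaskInstance(dag_id=%s, task_id=%s, execution_date=%s)> went missing from the database",
       dag_id, task_id, execution_date
   )
   ```
   What do you think about logging the filter parameters instead of the object when the object does not exist? In that branch `ti` is `None`, so the current message only prints "TaskInstance None went missing from the database".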
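
For illustration, here is a minimal, self-contained sketch of the suggestion outside of Airflow. The `FAKE_DB` dict and the `find_task_instance` helper are hypothetical stand-ins for the `session.query(TI)` lookup, not Airflow APIs; the point is only that the warning is built from the lookup key rather than from the missing object.

```python
import logging
from typing import Dict, Optional, Tuple

log = logging.getLogger("scheduler_sketch")

# Hypothetical stand-in for the task_instance table, keyed by
# (dag_id, task_id, execution_date). Not an Airflow structure.
FAKE_DB: Dict[Tuple[str, str, str], str] = {
    ("example_dag", "extract", "2020-06-29T00:00:00"): "success",
}


def find_task_instance(dag_id: str, task_id: str, execution_date: str) -> Optional[str]:
    """Return the stored row, or log the lookup key and return None."""
    ti = FAKE_DB.get((dag_id, task_id, execution_date))
    if ti is None:
        # ti is None in this branch, so formatting it into the message
        # would carry no information. Log the filter parameters instead.
        log.warning(
            "<TaskInstance(dag_id=%s, task_id=%s, execution_date=%s)> "
            "went missing from the database",
            dag_id, task_id, execution_date,
        )
    return ti
```

Calling `find_task_instance("example_dag", "missing", "2020-06-29T00:00:00")` returns `None` and emits a warning that names the DAG, task, and execution date that were searched for.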




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

