mik-laj commented on a change in pull request #9488:
URL: https://github.com/apache/airflow/pull/9488#discussion_r446853441
##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1476,28 +1476,32 @@ def _process_executor_events(self, simple_dag_bag,
session=None):
"""
Respond to executor events.
"""
- # TODO: this shares quite a lot of code with _manage_executor_state
- for key, value in
self.executor.get_event_buffer(simple_dag_bag.dag_ids).items():
+ event_buffer = self.executor.get_event_buffer(simple_dag_bag.dag_ids)
+ tis_with_right_state: List[TaskInstanceKeyType] = []
+
+ # Report execution
+ for key, value in event_buffer.items():
state, info = value
dag_id, task_id, execution_date, try_number = key
self.log.info(
"Executor reports execution of %s.%s execution_date=%s "
"exited with status %s for try_number %s",
dag_id, task_id, execution_date, state, try_number
)
- if state not in (State.FAILED, State.SUCCESS):
- continue
+ if state in (State.FAILED, State.SUCCESS):
+ tis_with_right_state.append(key)
- # Process finished tasks
- qry = session.query(TI).filter(
- TI.dag_id == dag_id,
- TI.task_id == task_id,
- TI.execution_date == execution_date
- )
- ti = qry.first()
- if not ti:
- self.log.warning("TaskInstance %s went missing from the
database", ti)
- continue
Review comment:
```
self.log.warning("<TaskInstance(dag_id=%s, task_id=%s,
execution_date=%s)> went missing from the database", dag_id, task_id,
execution_date)
```
What do you think to display filter parameters instead of an object when the
object does not exist?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]