This is an automated email from the ASF dual-hosted git repository.
turbaszek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new c12e33e Use consistent message in SchedulerJob._process_executor_events (#9929)
c12e33e is described below
commit c12e33efa99357cfd017cecf4a4852c7b2961a1d
Author: Tomek Urbaszek <[email protected]>
AuthorDate: Mon Jul 27 13:50:50 2020 +0200
Use consistent message in SchedulerJob._process_executor_events (#9929)
---
airflow/jobs/scheduler_job.py | 13 +++++--------
tests/jobs/test_scheduler_job.py | 6 ++++--
2 files changed, 9 insertions(+), 10 deletions(-)
diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index f7a2a53..146be5c 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -1544,7 +1544,7 @@ class SchedulerJob(BaseJob):
if not tis_with_right_state:
return
- # Check state of finishes tasks
+ # Check state of finished tasks
filter_for_tis = TI.filter_for_tis(tis_with_right_state)
tis: List[TI] = session.query(TI).filter(filter_for_tis).all()
for ti in tis:
@@ -1555,17 +1555,14 @@ class SchedulerJob(BaseJob):
# TODO: should we fail RUNNING as well, as we do in Backfills?
if ti.try_number == buffer_key.try_number and ti.state == State.QUEUED:
Stats.incr('scheduler.tasks.killed_externally')
- self.log.error(
- "Executor reports task instance %s finished (%s) although the task says its %s. "
- "(Info: %s) Was the task killed externally?",
- ti, state, ti.state, info
- )
+ msg = "Executor reports task instance %s finished (%s) although the " \
+ "task says its %s. (Info: %s) Was the task killed externally?"
+ self.log.error(msg, ti, state, ti.state, info)
simple_dag = simple_dag_bag.get_dag(ti.dag_id)
self.processor_agent.send_callback_to_execute(
full_filepath=simple_dag.full_filepath,
task_instance=ti,
- msg=f"Executor reports task instance finished ({state}) although the "
- f"task says its {ti.state}. (Info: {info}) Was the task killed externally?"
+ msg=msg % (ti, state, ti.state, info),
)
def _execute(self) -> None:
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index d8a86d0..0020ead 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -1473,8 +1473,10 @@ class TestSchedulerJob(unittest.TestCase):
scheduler.processor_agent.send_callback_to_execute.assert_called_once_with(
full_filepath='/test_path1/',
task_instance=mock.ANY,
- msg='Executor reports task instance finished (failed) '
- 'although the task says its queued. (Info: None) Was the task killed externally?'
+ msg='Executor reports task instance '
+ '<TaskInstance: test_process_executor_events.dummy_task 2016-01-01 00:00:00+00:00 [queued]> '
+ 'finished (failed) although the task says its queued. (Info: None) '
+ 'Was the task killed externally?'
)
scheduler.processor_agent.reset_mock()