This is an automated email from the ASF dual-hosted git repository.

turbaszek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new c12e33e  Use consistent message in SchedulerJob._process_executor_events (#9929)
c12e33e is described below

commit c12e33efa99357cfd017cecf4a4852c7b2961a1d
Author: Tomek Urbaszek <turbas...@gmail.com>
AuthorDate: Mon Jul 27 13:50:50 2020 +0200

    Use consistent message in SchedulerJob._process_executor_events (#9929)
---
 airflow/jobs/scheduler_job.py    | 13 +++++--------
 tests/jobs/test_scheduler_job.py |  6 ++++--
 2 files changed, 9 insertions(+), 10 deletions(-)

diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index f7a2a53..146be5c 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -1544,7 +1544,7 @@ class SchedulerJob(BaseJob):
         if not tis_with_right_state:
             return
 
-        # Check state of finishes tasks
+        # Check state of finished tasks
         filter_for_tis = TI.filter_for_tis(tis_with_right_state)
         tis: List[TI] = session.query(TI).filter(filter_for_tis).all()
         for ti in tis:
@@ -1555,17 +1555,14 @@ class SchedulerJob(BaseJob):
             # TODO: should we fail RUNNING as well, as we do in Backfills?
             if ti.try_number == buffer_key.try_number and ti.state == State.QUEUED:
                 Stats.incr('scheduler.tasks.killed_externally')
-                self.log.error(
-                    "Executor reports task instance %s finished (%s) although the task says its %s. "
-                    "(Info: %s) Was the task killed externally?",
-                    ti, state, ti.state, info
-                )
+                msg = "Executor reports task instance %s finished (%s) although the " \
+                      "task says its %s. (Info: %s) Was the task killed externally?"
+                self.log.error(msg, ti, state, ti.state, info)
                 simple_dag = simple_dag_bag.get_dag(ti.dag_id)
                 self.processor_agent.send_callback_to_execute(
                     full_filepath=simple_dag.full_filepath,
                     task_instance=ti,
-                    msg=f"Executor reports task instance finished ({state}) although the "
-                        f"task says its {ti.state}. (Info: {info}) Was the task killed externally?"
+                    msg=msg % (ti, state, ti.state, info),
                 )
 
     def _execute(self) -> None:
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index d8a86d0..0020ead 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -1473,8 +1473,10 @@ class TestSchedulerJob(unittest.TestCase):
         scheduler.processor_agent.send_callback_to_execute.assert_called_once_with(
             full_filepath='/test_path1/',
             task_instance=mock.ANY,
-            msg='Executor reports task instance finished (failed) '
-                'although the task says its queued. (Info: None) Was the task killed externally?'
+            msg='Executor reports task instance '
+                '<TaskInstance: test_process_executor_events.dummy_task 2016-01-01 00:00:00+00:00 [queued]> '
+                'finished (failed) although the task says its queued. (Info: None) '
+                'Was the task killed externally?'
         )
         scheduler.processor_agent.reset_mock()
 

Reply via email to