This is an automated email from the ASF dual-hosted git repository. turbaszek pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push: new c12e33e Use consistent message in SchedulerJob._process_executor_events (#9929) c12e33e is described below commit c12e33efa99357cfd017cecf4a4852c7b2961a1d Author: Tomek Urbaszek <turbas...@gmail.com> AuthorDate: Mon Jul 27 13:50:50 2020 +0200 Use consistent message in SchedulerJob._process_executor_events (#9929) --- airflow/jobs/scheduler_job.py | 13 +++++-------- tests/jobs/test_scheduler_job.py | 6 ++++-- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py index f7a2a53..146be5c 100644 --- a/airflow/jobs/scheduler_job.py +++ b/airflow/jobs/scheduler_job.py @@ -1544,7 +1544,7 @@ class SchedulerJob(BaseJob): if not tis_with_right_state: return - # Check state of finishes tasks + # Check state of finished tasks filter_for_tis = TI.filter_for_tis(tis_with_right_state) tis: List[TI] = session.query(TI).filter(filter_for_tis).all() for ti in tis: @@ -1555,17 +1555,14 @@ class SchedulerJob(BaseJob): # TODO: should we fail RUNNING as well, as we do in Backfills? if ti.try_number == buffer_key.try_number and ti.state == State.QUEUED: Stats.incr('scheduler.tasks.killed_externally') - self.log.error( - "Executor reports task instance %s finished (%s) although the task says its %s. " - "(Info: %s) Was the task killed externally?", - ti, state, ti.state, info - ) + msg = "Executor reports task instance %s finished (%s) although the " \ + "task says its %s. (Info: %s) Was the task killed externally?" + self.log.error(msg, ti, state, ti.state, info) simple_dag = simple_dag_bag.get_dag(ti.dag_id) self.processor_agent.send_callback_to_execute( full_filepath=simple_dag.full_filepath, task_instance=ti, - msg=f"Executor reports task instance finished ({state}) although the " - f"task says its {ti.state}. (Info: {info}) Was the task killed externally?" + msg=msg % (ti, state, ti.state, info), ) def _execute(self) -> None: diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index d8a86d0..0020ead 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -1473,8 +1473,10 @@ class TestSchedulerJob(unittest.TestCase): scheduler.processor_agent.send_callback_to_execute.assert_called_once_with( full_filepath='/test_path1/', task_instance=mock.ANY, - msg='Executor reports task instance finished (failed) ' - 'although the task says its queued. (Info: None) Was the task killed externally?' + msg='Executor reports task instance ' + '<TaskInstance: test_process_executor_events.dummy_task 2016-01-01 00:00:00+00:00 [queued]> ' + 'finished (failed) although the task says its queued. (Info: None) ' + 'Was the task killed externally?' ) scheduler.processor_agent.reset_mock()