This is an automated email from the ASF dual-hosted git repository.

jhtimmins pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 15192d766813ad47184db02ca4750f5d4d5b47c6
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Thu Jun 10 14:29:30 2021 +0100

    Run mini scheduler in LocalTaskJob during task exit (#16289)

    Currently, the chance of a task being killed by the LocalTaskJob
    heartbeat is high.

    This is because, after a task is marked successful/failed in
    taskinstance.py and the mini scheduler is enabled, we start running
    the mini scheduler. When mini scheduling takes long enough to overlap
    the next job heartbeat, the heartbeat sees that the task has succeeded
    but has no return code, because LocalTaskJob.handle_task_exit has not
    yet been called. Hence, the heartbeat concludes that the task was
    externally marked failed/successful.

    This change resolves this by moving the mini scheduler into
    LocalTaskJob.handle_task_exit, ensuring that the task is no longer
    killed by the next heartbeat.

    (cherry picked from commit 408bd26c22913af93d05aa70abc3c66c52cd4588)
---
 tests/jobs/test_local_task_job.py | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/tests/jobs/test_local_task_job.py b/tests/jobs/test_local_task_job.py
index 14c74ce..060bce8 100644
--- a/tests/jobs/test_local_task_job.py
+++ b/tests/jobs/test_local_task_job.py
@@ -842,12 +842,12 @@ class TestLocalTaskJob:
             op1 = PythonOperator(task_id='dummy', python_callable=lambda: True)
 
         session = settings.Session()
-        dag_maker.make_dagmodel(
-            has_task_concurrency_limits=False,
-            next_dagrun_create_after=dag.following_schedule(DEFAULT_DATE),
-            is_active=True,
-            is_paused=True,
-        )
+        dagmodel = dag_maker.dag_model
+        dagmodel.next_dagrun_create_after = dag.following_schedule(DEFAULT_DATE)
+        dagmodel.is_paused = True
+        session.merge(dagmodel)
+        session.flush()
+
         # Write Dag to DB
         dagbag = DagBag(dag_folder="/dev/null", include_examples=False, read_dags_from_db=False)
         dagbag.bag_dag(dag, root_dag=dag)
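
The ordering problem described in the commit message can be illustrated with a small toy model. This is only a sketch under stated assumptions: `ToyTask`, `heartbeat`, `mini_scheduler`, `run_before_fix`, and `run_after_fix` are hypothetical names invented for illustration and are not Airflow APIs. The model assumes the heartbeat treats "terminal state with no recorded return code" as an external state change, as the message describes.

```python
# Toy model only: illustrative names, not Airflow's real implementation.
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class ToyTask:
    state: str = "running"
    # Assumption: the return code is recorded only once task exit is handled.
    return_code: Optional[int] = None
    events: List[str] = field(default_factory=list)


def heartbeat(task: ToyTask) -> None:
    """A terminal state with no return code looks like an external change."""
    if task.state != "running" and task.return_code is None:
        task.events.append("heartbeat: killed task")
    else:
        task.events.append("heartbeat: ok")


def mini_scheduler(task: ToyTask, heartbeat_fires: bool) -> None:
    """Stand-in for slow mini scheduling that may overlap a heartbeat."""
    task.events.append("mini scheduler: start")
    if heartbeat_fires:
        heartbeat(task)
    task.events.append("mini scheduler: done")


def run_before_fix(heartbeat_fires: bool = True) -> ToyTask:
    task = ToyTask()
    task.state = "success"                  # task marks itself successful
    mini_scheduler(task, heartbeat_fires)   # scheduling runs before exit handling
    task.return_code = 0                    # exit is handled too late
    return task


def run_after_fix(heartbeat_fires: bool = True) -> ToyTask:
    task = ToyTask()
    task.state = "success"
    task.return_code = 0                    # exit handling records the return code
    mini_scheduler(task, heartbeat_fires)   # scheduling now runs during task exit
    return task
```

In this model, `run_before_fix().events` contains a "heartbeat: killed task" entry, while `run_after_fix().events` does not, because the return code is already recorded when the heartbeat check runs.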
