This is an automated email from the ASF dual-hosted git repository.

jhtimmins pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 15192d766813ad47184db02ca4750f5d4d5b47c6
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Thu Jun 10 14:29:30 2021 +0100

    Run mini scheduler in LocalTaskJob during task exit (#16289)
    
    Currently, the chances of tasks being killed by the LocalTaskJob heartbeat 
is high.
    
    This is because, after marking a task successful/failed in Taskinstance.py 
and mini scheduler is enabled,
    we start running the mini scheduler. Whenever the mini scheduling takes 
time and meet the next job heartbeat,
    the heartbeat detects that this task has succeeded with no return code 
because LocalTaskJob.handle_task_exit
    was not called after the task succeeded. Hence, the heartbeat thinks that 
this task was externally marked failed/successful.
    
    This change resolves this by moving the mini scheduler to LocalTaskJob at 
the handle_task_exit method ensuring
    that the task will no longer be killed by the next heartbeat
    
    (cherry picked from commit 408bd26c22913af93d05aa70abc3c66c52cd4588)
---
 tests/jobs/test_local_task_job.py | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/tests/jobs/test_local_task_job.py 
b/tests/jobs/test_local_task_job.py
index 14c74ce..060bce8 100644
--- a/tests/jobs/test_local_task_job.py
+++ b/tests/jobs/test_local_task_job.py
@@ -842,12 +842,12 @@ class TestLocalTaskJob:
             op1 = PythonOperator(task_id='dummy', python_callable=lambda: True)
 
         session = settings.Session()
-        dag_maker.make_dagmodel(
-            has_task_concurrency_limits=False,
-            next_dagrun_create_after=dag.following_schedule(DEFAULT_DATE),
-            is_active=True,
-            is_paused=True,
-        )
+        dagmodel = dag_maker.dag_model
+        dagmodel.next_dagrun_create_after = 
dag.following_schedule(DEFAULT_DATE)
+        dagmodel.is_paused = True
+        session.merge(dagmodel)
+        session.flush()
+
         # Write Dag to DB
         dagbag = DagBag(dag_folder="/dev/null", include_examples=False, 
read_dags_from_db=False)
         dagbag.bag_dag(dag, root_dag=dag)

Reply via email to