This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v1-10-test by this push:
new 87000c8 [AIRFLOW-5902] avoid unnecessary sleep to maintain local task job heart rate (#6553)
87000c8 is described below
commit 87000c8a485d443178490a28243118c3c9609d61
Author: Qingping Hou <[email protected]>
AuthorDate: Wed Dec 4 02:43:38 2019 -0800
[AIRFLOW-5902] avoid unnecessary sleep to maintain local task job heart rate (#6553)
sleep to maintain heart rate is already done by the heartbeat() call
(cherry picked from commit dbf81df24444205f36268745b76e575292c91be8)
---
airflow/jobs/base_job.py | 21 ++++++-------
airflow/jobs/local_task_job.py | 8 -----
tests/jobs/test_local_task_job.py | 65 ++++++++++++++++++++++++++++++++++-----
3 files changed, 66 insertions(+), 28 deletions(-)
diff --git a/airflow/jobs/base_job.py b/airflow/jobs/base_job.py
index 7c0a847..1135696 100644
--- a/airflow/jobs/base_job.py
+++ b/airflow/jobs/base_job.py
@@ -158,7 +158,7 @@ class BaseJob(Base, LoggingMixin):
This also allows for any job to be killed externally, regardless
of who is running it or on which machine it is running.
- Note that if your heartbeat is set to 60 seconds and you call this
+ Note that if your heart rate is set to 60 seconds and you call this
method after 10 seconds of processing since the last heartbeat, it
will sleep 50 seconds to complete the 60 seconds and keep a steady
heart rate. If you go over 60 seconds before calling it, it won't
@@ -175,17 +175,14 @@ class BaseJob(Base, LoggingMixin):
if self.state == State.SHUTDOWN:
self.kill()
- is_unit_test = conf.getboolean('core', 'unit_test_mode')
- if not is_unit_test:
- # Figure out how long to sleep for
- sleep_for = 0
- if self.latest_heartbeat:
- seconds_remaining = self.heartrate - \
- (timezone.utcnow() - self.latest_heartbeat)\
- .total_seconds()
- sleep_for = max(0, seconds_remaining)
-
- sleep(sleep_for)
+ # Figure out how long to sleep for
+ sleep_for = 0
+ if self.latest_heartbeat:
+ seconds_remaining = self.heartrate - \
+ (timezone.utcnow() - self.latest_heartbeat)\
+ .total_seconds()
+ sleep_for = max(0, seconds_remaining)
+ sleep(sleep_for)
# Update last heartbeat time
with create_session() as session:
diff --git a/airflow/jobs/local_task_job.py b/airflow/jobs/local_task_job.py
index 353bf03..9398af7 100644
--- a/airflow/jobs/local_task_job.py
+++ b/airflow/jobs/local_task_job.py
@@ -24,7 +24,6 @@ from __future__ import unicode_literals
import os
import signal
-import time
from airflow.configuration import conf
from airflow.exceptions import AirflowException
@@ -116,13 +115,6 @@ class LocalTaskJob(BaseJob):
"exceeded limit ({}s)."
.format(time_since_last_heartbeat,
heartbeat_time_limit))
-
- if time_since_last_heartbeat < self.heartrate:
- sleep_for = self.heartrate - time_since_last_heartbeat
- self.log.debug("Time since last heartbeat(%.2f s) < heartrate(%s s)"
- ", sleeping for %s s", time_since_last_heartbeat,
- self.heartrate, sleep_for)
- time.sleep(sleep_for)
finally:
self.on_kill()
diff --git a/tests/jobs/test_local_task_job.py b/tests/jobs/test_local_task_job.py
index 69a4369..0955825 100644
--- a/tests/jobs/test_local_task_job.py
+++ b/tests/jobs/test_local_task_job.py
@@ -46,6 +46,9 @@ DEFAULT_DATE = timezone.datetime(2016, 1, 1)
class LocalTaskJobTest(unittest.TestCase):
def setUp(self):
clear_db_runs()
+ patcher = patch('airflow.jobs.base_job.sleep')
+ self.addCleanup(patcher.stop)
+ self.mock_base_job_sleep = patcher.start()
def test_localtaskjob_essential_attr(self):
"""
@@ -114,7 +117,7 @@ class LocalTaskJobTest(unittest.TestCase):
session.merge(ti)
session.commit()
- job1.heartbeat_callback()
+ job1.heartbeat_callback(session=None)
mock_pid.return_value = 2
self.assertRaises(AirflowException, job1.heartbeat_callback)
@@ -125,11 +128,7 @@ class LocalTaskJobTest(unittest.TestCase):
Test that task heartbeat will sleep when it fails fast
"""
mock_getpid.return_value = 1
-
- heartbeat_records = []
-
- def heartbeat_recorder(**kwargs):
- heartbeat_records.append(timezone.utcnow())
+ self.mock_base_job_sleep.side_effect = time.sleep
with create_session() as session:
dagbag = models.DagBag(
@@ -155,9 +154,10 @@ class LocalTaskJobTest(unittest.TestCase):
job = LocalTaskJob(task_instance=ti,
executor=MockExecutor(do_update=False))
job.heartrate = 2
- job.heartbeat_callback = heartbeat_recorder
+ heartbeat_records = []
+ job.heartbeat_callback = lambda session: heartbeat_records.append(job.latest_heartbeat)
job._execute()
- self.assertGreater(len(heartbeat_records), 1)
+ self.assertGreater(len(heartbeat_records), 2)
for i in range(1, len(heartbeat_records)):
time1 = heartbeat_records[i - 1]
time2 = heartbeat_records[i]
@@ -354,3 +354,52 @@ class LocalTaskJobTest(unittest.TestCase):
self.assertTrue(data['called'])
process.join(timeout=10)
self.assertFalse(process.is_alive())
+
+ def test_localtaskjob_maintain_heart_rate(self):
+ dagbag = models.DagBag(
+ dag_folder=TEST_DAG_FOLDER,
+ include_examples=False,
+ )
+ dag = dagbag.dags.get('test_localtaskjob_double_trigger')
+ task = dag.get_task('test_localtaskjob_double_trigger_task')
+
+ session = settings.Session()
+
+ dag.clear()
+ dag.create_dagrun(run_id="test",
+ state=State.SUCCESS,
+ execution_date=DEFAULT_DATE,
+ start_date=DEFAULT_DATE,
+ session=session)
+
+ ti_run = TI(task=task, execution_date=DEFAULT_DATE)
+ ti_run.refresh_from_db()
+ job1 = LocalTaskJob(task_instance=ti_run,
+ executor=SequentialExecutor())
+
+ # this should make sure we only heartbeat once and exit at the second
+ # loop in _execute()
+ return_codes = [None, 0]
+
+ def multi_return_code():
+ return return_codes.pop(0)
+
+ time_start = time.time()
+ from airflow.task.task_runner.standard_task_runner import StandardTaskRunner
+ with patch.object(StandardTaskRunner, 'start', return_value=None) as mock_start:
+ with patch.object(StandardTaskRunner, 'return_code') as mock_ret_code:
+ mock_ret_code.side_effect = multi_return_code
+ job1.run()
+ self.assertEqual(mock_start.call_count, 1)
+ self.assertEqual(mock_ret_code.call_count, 2)
+ time_end = time.time()
+
+ self.assertEqual(self.mock_base_job_sleep.call_count, 1)
+ self.assertEqual(job1.state, State.SUCCESS)
+
+ # Consider we have patched sleep call, it should not be sleeping to
+ # keep up with the heart rate in other unpatched places
+ #
+ # We already make sure patched sleep call is only called once
+ self.assertLess(time_end - time_start, job1.heartrate)
+ session.close()