Repository: incubator-airflow Updated Branches: refs/heads/v1-9-test d42d231ee -> cbf7add7a
[AIRFLOW-1658] Kill Druid task on timeout If the total execution time of a Druid task exceeds the max timeout defined, the Airflow task fails, but the Druid task may still keep running. This can cause undesired behaviour if Airflow retries the task. This patch calls the shutdown endpoint on the Druid task to kill any still running Druid task. This commit also adds tests to ensure that all mocked requests in the Druid hook are actually called. Closes #2644 from danielvdende/kill_druid_task_on_timeout_exceeded (cherry picked from commit c61726288dcdb093c55a38faaf60aef020d0d3e0) Signed-off-by: Bolke de Bruin <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/cbf7add7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/cbf7add7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/cbf7add7 Branch: refs/heads/v1-9-test Commit: cbf7add7aa2e61d1bfe511d6a8250b63485068bb Parents: d42d231 Author: Daniel van der Ende <[email protected]> Authored: Mon Oct 2 17:09:07 2017 +0200 Committer: Bolke de Bruin <[email protected]> Committed: Mon Oct 2 17:09:25 2017 +0200 ---------------------------------------------------------------------- airflow/hooks/druid_hook.py | 2 ++ tests/hooks/test_druid_hook.py | 33 +++++++++++++++++++++++++-------- 2 files changed, 27 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/cbf7add7/airflow/hooks/druid_hook.py ---------------------------------------------------------------------- diff --git a/airflow/hooks/druid_hook.py b/airflow/hooks/druid_hook.py index 0b13670..655f666 100644 --- a/airflow/hooks/druid_hook.py +++ b/airflow/hooks/druid_hook.py @@ -73,6 +73,8 @@ class DruidHook(BaseHook): sec = sec + 1 if sec > self.max_ingestion_time: + # ensure that the job gets killed if the max ingestion time is exceeded + requests.post("{0}/{1}/shutdown".format(url, druid_task_id)) raise AirflowException('Druid ingestion took more than %s seconds', self.max_ingestion_time) time.sleep(self.timeout) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/cbf7add7/tests/hooks/test_druid_hook.py ---------------------------------------------------------------------- diff --git a/tests/hooks/test_druid_hook.py b/tests/hooks/test_druid_hook.py index c049cb2..ddab369 100644 --- a/tests/hooks/test_druid_hook.py +++ b/tests/hooks/test_druid_hook.py @@ -33,11 +33,11 @@ class TestDruidHook(unittest.TestCase): @requests_mock.mock() def test_submit_gone_wrong(self, m): hook = DruidHook() - m.post( + task_post = m.post( 'http://druid-overlord:8081/druid/indexer/v1/task', text='{"task":"9f8a7359-77d4-4612-b0cd-cc2f6a3c28de"}' ) - m.get( + status_check = m.get( 'http://druid-overlord:8081/druid/indexer/v1/task/9f8a7359-77d4-4612-b0cd-cc2f6a3c28de/status', text='{"status":{"status": "FAILED"}}' ) @@ -46,14 +46,17 @@ class TestDruidHook(unittest.TestCase): with self.assertRaises(AirflowException): hook.submit_indexing_job('Long json file') + self.assertTrue(task_post.called_once) + self.assertTrue(status_check.called_once) + @requests_mock.mock() def test_submit_ok(self, m): hook = DruidHook() - m.post( + task_post = m.post( 'http://druid-overlord:8081/druid/indexer/v1/task', text='{"task":"9f8a7359-77d4-4612-b0cd-cc2f6a3c28de"}' ) - m.get( + status_check = m.get( 'http://druid-overlord:8081/druid/indexer/v1/task/9f8a7359-77d4-4612-b0cd-cc2f6a3c28de/status', text='{"status":{"status": "SUCCESS"}}' ) @@ -61,14 +64,17 @@ class TestDruidHook(unittest.TestCase): # Exists just as it should hook.submit_indexing_job('Long json file') + self.assertTrue(task_post.called_once) + self.assertTrue(status_check.called_once) + @requests_mock.mock() def test_submit_unknown_response(self, m): hook = DruidHook() - m.post( + task_post = m.post( 'http://druid-overlord:8081/druid/indexer/v1/task', text='{"task":"9f8a7359-77d4-4612-b0cd-cc2f6a3c28de"}' ) - m.get( + status_check = m.get( 'http://druid-overlord:8081/druid/indexer/v1/task/9f8a7359-77d4-4612-b0cd-cc2f6a3c28de/status', text='{"status":{"status": "UNKNOWN"}}' ) @@ -77,22 +83,33 @@ class TestDruidHook(unittest.TestCase): with self.assertRaises(AirflowException): hook.submit_indexing_job('Long json file') + self.assertTrue(task_post.called_once) + self.assertTrue(status_check.called_once) + @requests_mock.mock() def test_submit_timeout(self, m): hook = DruidHook(timeout=0, max_ingestion_time=5) - m.post( + task_post = m.post( 'http://druid-overlord:8081/druid/indexer/v1/task', text='{"task":"9f8a7359-77d4-4612-b0cd-cc2f6a3c28de"}' ) - m.get( + status_check = m.get( 'http://druid-overlord:8081/druid/indexer/v1/task/9f8a7359-77d4-4612-b0cd-cc2f6a3c28de/status', text='{"status":{"status": "RUNNING"}}' ) + shutdown_post = m.post( + 'http://druid-overlord:8081/druid/indexer/v1/task/9f8a7359-77d4-4612-b0cd-cc2f6a3c28de/shutdown', + text='{"task":"9f8a7359-77d4-4612-b0cd-cc2f6a3c28de"}' + ) # Because the jobs keeps running with self.assertRaises(AirflowException): hook.submit_indexing_job('Long json file') + self.assertTrue(task_post.called_once) + self.assertTrue(status_check.called) + self.assertTrue(shutdown_post.called_once) +
