Repository: incubator-airflow Updated Branches: refs/heads/master 938da987c -> c61726288
[AIRFLOW-1658] Kill Druid task on timeout If the total execution time of a Druid task exceeds the max timeout defined, the Airflow task fails, but the Druid task may still keep running. This can cause undesired behaviour if Airflow retries the task. This patch calls the shutdown endpoint on the Druid task to kill any still running Druid task. This commit also adds tests to ensure that all mocked requests in the Druid hook are actually called. Closes #2644 from danielvdende/kill_druid_task_on_timeout_exceeded Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/c6172628 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/c6172628 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/c6172628 Branch: refs/heads/master Commit: c61726288dcdb093c55a38faaf60aef020d0d3e0 Parents: 938da98 Author: Daniel van der Ende <[email protected]> Authored: Mon Oct 2 17:09:07 2017 +0200 Committer: Bolke de Bruin <[email protected]> Committed: Mon Oct 2 17:09:07 2017 +0200 ---------------------------------------------------------------------- airflow/hooks/druid_hook.py | 2 ++ tests/hooks/test_druid_hook.py | 33 +++++++++++++++++++++++++-------- 2 files changed, 27 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c6172628/airflow/hooks/druid_hook.py ---------------------------------------------------------------------- diff --git a/airflow/hooks/druid_hook.py b/airflow/hooks/druid_hook.py index 0b13670..655f666 100644 --- a/airflow/hooks/druid_hook.py +++ b/airflow/hooks/druid_hook.py @@ -73,6 +73,8 @@ class DruidHook(BaseHook): sec = sec + 1 if sec > self.max_ingestion_time: + # ensure that the job gets killed if the max ingestion time is exceeded + requests.post("{0}/{1}/shutdown".format(url, druid_task_id)) raise AirflowException('Druid ingestion took more than %s seconds', self.max_ingestion_time) time.sleep(self.timeout) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c6172628/tests/hooks/test_druid_hook.py ---------------------------------------------------------------------- diff --git a/tests/hooks/test_druid_hook.py b/tests/hooks/test_druid_hook.py index c049cb2..ddab369 100644 --- a/tests/hooks/test_druid_hook.py +++ b/tests/hooks/test_druid_hook.py @@ -33,11 +33,11 @@ class TestDruidHook(unittest.TestCase): @requests_mock.mock() def test_submit_gone_wrong(self, m): hook = DruidHook() - m.post( + task_post = m.post( 'http://druid-overlord:8081/druid/indexer/v1/task', text='{"task":"9f8a7359-77d4-4612-b0cd-cc2f6a3c28de"}' ) - m.get( + status_check = m.get( 'http://druid-overlord:8081/druid/indexer/v1/task/9f8a7359-77d4-4612-b0cd-cc2f6a3c28de/status', text='{"status":{"status": "FAILED"}}' ) @@ -46,14 +46,17 @@ class TestDruidHook(unittest.TestCase): with self.assertRaises(AirflowException): hook.submit_indexing_job('Long json file') + self.assertTrue(task_post.called_once) + self.assertTrue(status_check.called_once) + @requests_mock.mock() def test_submit_ok(self, m): hook = DruidHook() - m.post( + task_post = m.post( 'http://druid-overlord:8081/druid/indexer/v1/task', text='{"task":"9f8a7359-77d4-4612-b0cd-cc2f6a3c28de"}' ) - m.get( + status_check = m.get( 'http://druid-overlord:8081/druid/indexer/v1/task/9f8a7359-77d4-4612-b0cd-cc2f6a3c28de/status', text='{"status":{"status": "SUCCESS"}}' ) @@ -61,14 +64,17 @@ class TestDruidHook(unittest.TestCase): # Exists just as it should hook.submit_indexing_job('Long json file') + self.assertTrue(task_post.called_once) + self.assertTrue(status_check.called_once) + @requests_mock.mock() def test_submit_unknown_response(self, m): hook = DruidHook() - m.post( + task_post = m.post( 'http://druid-overlord:8081/druid/indexer/v1/task', text='{"task":"9f8a7359-77d4-4612-b0cd-cc2f6a3c28de"}' ) - m.get( + status_check = m.get( 'http://druid-overlord:8081/druid/indexer/v1/task/9f8a7359-77d4-4612-b0cd-cc2f6a3c28de/status', text='{"status":{"status": "UNKNOWN"}}' ) @@ -77,22 +83,33 @@ class TestDruidHook(unittest.TestCase): with self.assertRaises(AirflowException): hook.submit_indexing_job('Long json file') + self.assertTrue(task_post.called_once) + self.assertTrue(status_check.called_once) + @requests_mock.mock() def test_submit_timeout(self, m): hook = DruidHook(timeout=0, max_ingestion_time=5) - m.post( + task_post = m.post( 'http://druid-overlord:8081/druid/indexer/v1/task', text='{"task":"9f8a7359-77d4-4612-b0cd-cc2f6a3c28de"}' ) - m.get( + status_check = m.get( 'http://druid-overlord:8081/druid/indexer/v1/task/9f8a7359-77d4-4612-b0cd-cc2f6a3c28de/status', text='{"status":{"status": "RUNNING"}}' ) + shutdown_post = m.post( + 'http://druid-overlord:8081/druid/indexer/v1/task/9f8a7359-77d4-4612-b0cd-cc2f6a3c28de/shutdown', + text='{"task":"9f8a7359-77d4-4612-b0cd-cc2f6a3c28de"}' + ) # Because the jobs keeps running with self.assertRaises(AirflowException): hook.submit_indexing_job('Long json file') + self.assertTrue(task_post.called_once) + self.assertTrue(status_check.called) + self.assertTrue(shutdown_post.called_once) +
