This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new e396f06041 Pass SSL arg to all requests in DruidOperator (#39066)
e396f06041 is described below
commit e396f06041e51691ae09751c23ef57fd9a03df22
Author: Daniel Bell <[email protected]>
AuthorDate: Sun May 5 08:19:50 2024 +0200
Pass SSL arg to all requests in DruidOperator (#39066)
* Pass SSL arg to all requests in DruidOperator
* Remove unneeded test
* Lint
* Fix test
* Fix tests
* Add true test as per dirrao's comment
* Use call_count == 1
---------
Co-authored-by: Daniel Bell <[email protected]>
---
airflow/providers/apache/druid/hooks/druid.py | 6 ++-
tests/providers/apache/druid/hooks/test_druid.py | 60 ++++++++++++++++++++----
2 files changed, 56 insertions(+), 10 deletions(-)
diff --git a/airflow/providers/apache/druid/hooks/druid.py
b/airflow/providers/apache/druid/hooks/druid.py
index da678d0153..a79b494f32 100644
--- a/airflow/providers/apache/druid/hooks/druid.py
+++ b/airflow/providers/apache/druid/hooks/druid.py
@@ -144,13 +144,15 @@ class DruidHook(BaseHook):
sec = 0
while running:
- req_status = requests.get(druid_task_status_url,
auth=self.get_auth())
+ req_status = requests.get(druid_task_status_url,
auth=self.get_auth(), verify=self.get_verify())
self.log.info("Job still running for %s seconds...", sec)
if self.max_ingestion_time and sec > self.max_ingestion_time:
# ensure that the job gets killed if the max ingestion time is
exceeded
- requests.post(f"{url}/{druid_task_id}/shutdown",
auth=self.get_auth())
+ requests.post(
+ f"{url}/{druid_task_id}/shutdown", auth=self.get_auth(),
verify=self.get_verify()
+ )
raise AirflowException(f"Druid ingestion took more than
{self.max_ingestion_time} seconds")
time.sleep(self.timeout)
diff --git a/tests/providers/apache/druid/hooks/test_druid.py
b/tests/providers/apache/druid/hooks/test_druid.py
index 2b40264455..0d42695f06 100644
--- a/tests/providers/apache/druid/hooks/test_druid.py
+++ b/tests/providers/apache/druid/hooks/test_druid.py
@@ -99,25 +99,69 @@ class TestDruidSubmitHook:
assert task_post.call_count == 1
assert status_check.call_count == 1
- def test_submit_with_correct_ssl_arg(self, requests_mock):
+ def test_submit_with_false_ssl_arg(self, requests_mock):
+ # Timeout so that all three requests are sent
+ self.db_hook.timeout = 1
+ self.db_hook.max_ingestion_time = 5
self.db_hook.verify_ssl = False
+
task_post = requests_mock.post(
"http://druid-overlord:8081/druid/indexer/v1/task",
text='{"task":"9f8a7359-77d4-4612-b0cd-cc2f6a3c28de"}',
)
status_check = requests_mock.get(
"http://druid-overlord:8081/druid/indexer/v1/task/9f8a7359-77d4-4612-b0cd-cc2f6a3c28de/status",
- text='{"status":{"status": "SUCCESS"}}',
+ text='{"status":{"status": "RUNNING"}}',
+ )
+ shutdown_post = requests_mock.post(
+ "http://druid-overlord:8081/druid/indexer/v1/task/"
+ "9f8a7359-77d4-4612-b0cd-cc2f6a3c28de/shutdown",
+ text='{"task":"9f8a7359-77d4-4612-b0cd-cc2f6a3c28de"}',
)
- self.db_hook.submit_indexing_job("Long json file")
+ with pytest.raises(AirflowException):
+ self.db_hook.submit_indexing_job("Long json file")
- # PGH005: false positive on ``requests_mock`` argument `called_once`
assert task_post.call_count == 1
- assert status_check.call_count == 1
- if task_post.called_once:
- verify_ssl = task_post.request_history[0].verify
- assert False is verify_ssl
+ assert False is task_post.request_history[0].verify
+
+ assert status_check.call_count > 1
+ assert False is status_check.request_history[0].verify
+
+ assert shutdown_post.call_count == 1
+ assert False is shutdown_post.request_history[0].verify
+
+ def test_submit_with_true_ssl_arg(self, requests_mock):
+ # Timeout so that all three requests are sent
+ self.db_hook.timeout = 1
+ self.db_hook.max_ingestion_time = 5
+ self.db_hook.verify_ssl = True
+
+ task_post = requests_mock.post(
+ "http://druid-overlord:8081/druid/indexer/v1/task",
+ text='{"task":"9f8a7359-77d4-4612-b0cd-cc2f6a3c28de"}',
+ )
+ status_check = requests_mock.get(
+
"http://druid-overlord:8081/druid/indexer/v1/task/9f8a7359-77d4-4612-b0cd-cc2f6a3c28de/status",
+ text='{"status":{"status": "RUNNING"}}',
+ )
+ shutdown_post = requests_mock.post(
+ "http://druid-overlord:8081/druid/indexer/v1/task/"
+ "9f8a7359-77d4-4612-b0cd-cc2f6a3c28de/shutdown",
+ text='{"task":"9f8a7359-77d4-4612-b0cd-cc2f6a3c28de"}',
+ )
+
+ with pytest.raises(AirflowException):
+ self.db_hook.submit_indexing_job("Long json file")
+
+ assert task_post.call_count == 1
+ assert True is task_post.request_history[0].verify
+
+ assert status_check.call_count > 1
+ assert True is status_check.request_history[0].verify
+
+ assert shutdown_post.call_count == 1
+ assert True is shutdown_post.request_history[0].verify
def test_submit_correct_json_body(self, requests_mock):
task_post = requests_mock.post(