This is an automated email from the ASF dual-hosted git repository.
xddeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new b027223 Add install/uninstall api to databricks hook (#12316)
b027223 is described below
commit b0272231320a4975cc39968dec8f0abf7a5cca11
Author: Trinity Xia <[email protected]>
AuthorDate: Thu Nov 12 22:41:31 2020 -0800
Add install/uninstall api to databricks hook (#12316)
- adding install Databricks API to databricks hook (api/2.0/libraries/install)
- adding uninstall Databricks API to databricks hook (api/2.0/libraries/uninstall)
---
airflow/providers/databricks/hooks/databricks.py | 25 ++++++++++
.../providers/databricks/hooks/test_databricks.py | 56 ++++++++++++++++++++++
2 files changed, 81 insertions(+)
diff --git a/airflow/providers/databricks/hooks/databricks.py b/airflow/providers/databricks/hooks/databricks.py
index ef71c2c..a91daee 100644
--- a/airflow/providers/databricks/hooks/databricks.py
+++ b/airflow/providers/databricks/hooks/databricks.py
@@ -43,6 +43,9 @@ GET_RUN_ENDPOINT = ('GET', 'api/2.0/jobs/runs/get')
CANCEL_RUN_ENDPOINT = ('POST', 'api/2.0/jobs/runs/cancel')
USER_AGENT_HEADER = {'user-agent': f'airflow-{__version__}'}
+INSTALL_LIBS_ENDPOINT = ('POST', 'api/2.0/libraries/install')
+UNINSTALL_LIBS_ENDPOINT = ('POST', 'api/2.0/libraries/uninstall')
+
class RunState:
"""Utility class for the run state concept of Databricks runs."""
@@ -311,6 +314,28 @@ class DatabricksHook(BaseHook): # noqa
"""
self._do_api_call(TERMINATE_CLUSTER_ENDPOINT, json)
+ def install(self, json: dict) -> None:
+ """
+ Install libraries on the cluster.
+
+ Utility function to call the ``2.0/libraries/install`` endpoint.
+
+ :param json: json dictionary containing cluster_id and an array of libraries
+ :type json: dict
+ """
+ self._do_api_call(INSTALL_LIBS_ENDPOINT, json)
+
+ def uninstall(self, json: dict) -> None:
+ """
+ Uninstall libraries on the cluster.
+
+ Utility function to call the ``2.0/libraries/uninstall`` endpoint.
+
+ :param json: json dictionary containing cluster_id and an array of libraries
+ :type json: dict
+ """
+ self._do_api_call(UNINSTALL_LIBS_ENDPOINT, json)
+
def _retryable_error(exception) -> bool:
return (
diff --git a/tests/providers/databricks/hooks/test_databricks.py b/tests/providers/databricks/hooks/test_databricks.py
index 94a094b..2c3f966 100644
--- a/tests/providers/databricks/hooks/test_databricks.py
+++ b/tests/providers/databricks/hooks/test_databricks.py
@@ -55,6 +55,10 @@ GET_RUN_RESPONSE = {
NOTEBOOK_PARAMS = {"dry-run": "true", "oldest-time-to-consider": "1457570074236"}
JAR_PARAMS = ["param1", "param2"]
RESULT_STATE = None # type: None
+LIBRARIES = [
+ {"jar": "dbfs:/mnt/libraries/library.jar"},
+ {"maven": {"coordinates": "org.jsoup:jsoup:1.7.2", "exclusions": ["slf4j:slf4j"]}},
+]
def run_now_endpoint(host):
@@ -106,6 +110,20 @@ def terminate_cluster_endpoint(host):
return f'https://{host}/api/2.0/clusters/delete'
+def install_endpoint(host):
+ """
+ Utility function to generate the install endpoint given the host.
+ """
+ return f'https://{host}/api/2.0/libraries/install'
+
+
+def uninstall_endpoint(host):
+ """
+ Utility function to generate the uninstall endpoint given the host.
+ """
+ return f'https://{host}/api/2.0/libraries/uninstall'
+
+
def create_valid_response_mock(content):
response = mock.MagicMock()
response.json.return_value = content
@@ -424,6 +442,44 @@ class TestDatabricksHook(unittest.TestCase):
timeout=self.hook.timeout_seconds,
)
+ @mock.patch('airflow.providers.databricks.hooks.databricks.requests')
+ def test_install_libs_on_cluster(self, mock_requests):
+ mock_requests.codes.ok = 200
+ mock_requests.post.return_value.json.return_value = {}
+ status_code_mock = mock.PropertyMock(return_value=200)
+ type(mock_requests.post.return_value).status_code = status_code_mock
+
+ data = {'cluster_id': CLUSTER_ID, 'libraries': LIBRARIES}
+ self.hook.install(data)
+
+ mock_requests.post.assert_called_once_with(
+ install_endpoint(HOST),
+ json={'cluster_id': CLUSTER_ID, 'libraries': LIBRARIES},
+ params=None,
+ auth=(LOGIN, PASSWORD),
+ headers=USER_AGENT_HEADER,
+ timeout=self.hook.timeout_seconds,
+ )
+
+ @mock.patch('airflow.providers.databricks.hooks.databricks.requests')
+ def test_uninstall_libs_on_cluster(self, mock_requests):
+ mock_requests.codes.ok = 200
+ mock_requests.post.return_value.json.return_value = {}
+ status_code_mock = mock.PropertyMock(return_value=200)
+ type(mock_requests.post.return_value).status_code = status_code_mock
+
+ data = {'cluster_id': CLUSTER_ID, 'libraries': LIBRARIES}
+ self.hook.uninstall(data)
+
+ mock_requests.post.assert_called_once_with(
+ uninstall_endpoint(HOST),
+ json={'cluster_id': CLUSTER_ID, 'libraries': LIBRARIES},
+ params=None,
+ auth=(LOGIN, PASSWORD),
+ headers=USER_AGENT_HEADER,
+ timeout=self.hook.timeout_seconds,
+ )
+
class TestDatabricksHookToken(unittest.TestCase):
"""