eskarimov commented on a change in pull request #19736:
URL: https://github.com/apache/airflow/pull/19736#discussion_r754529168



##########
File path: airflow/providers/databricks/hooks/databricks.py
##########
@@ -493,3 +504,120 @@ def __init__(self, token: str) -> None:
     def __call__(self, r: PreparedRequest) -> PreparedRequest:
         r.headers['Authorization'] = 'Bearer ' + self.token
         return r
+
+
+class DatabricksAsyncHook(DatabricksHook):
+    """
+    Async version of the ``DatabricksHook``
+    Implements only necessary methods used further in Databricks Triggers.
+    """
+
+    def __init__(self, *args: Any, **kwargs: Any) -> None:
+        super().__init__(*args, **kwargs)
+
+    async def __aenter__(self):
+        self._session = aiohttp.ClientSession()
+        return self
+
+    async def __aexit__(self, *err):
+        await self._session.close()
+        self._session = None
+
+    async def _do_api_call(self, endpoint_info: Tuple[str, str], json: dict) -> dict:
+        """
+        Utility function to perform an async API call with retries
+
+        :param endpoint_info: Tuple of method and endpoint
+        :type endpoint_info: tuple[string, string]
+        :param json: Parameters for this API call.
+        :type json: dict
+        :return: If the api call returns a OK status code,
+            this function returns the response in JSON. Otherwise, throw an AirflowException.
+        :rtype: dict
+        """
+        method, endpoint = endpoint_info
+
+        self.databricks_conn = self.get_connection(self.databricks_conn_id)
+
+        auth = None
+        headers = {}
+        if 'token' in self.databricks_conn.extra_dejson:
+            self.log.info('Using token auth. ')
+            headers["Authorization"] = f'Bearer {self.databricks_conn.extra_dejson["token"]}'
+            if 'host' in self.databricks_conn.extra_dejson:
+                host = self._parse_host(self.databricks_conn.extra_dejson['host'])
+            else:
+                host = self.databricks_conn.host
+        else:
+            self.log.info('Using basic auth. ')
+            auth = aiohttp.BasicAuth(self.databricks_conn.login, self.databricks_conn.password)
+            host = self.databricks_conn.host

Review comment:
       Thanks for pointing this out! I plan to refactor `DatabricksHook` so
that the sync and async hooks share as much functionality as possible, to
avoid maintaining two similar classes in the future.
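
A rough sketch of the kind of split this could lead to, not the actual
implementation: the token/basic-auth and host selection from the hunk above
moves into a shared base class, and the sync and async hooks differ only in
how they send the request. All names here (`ConnInfo`,
`BaseDatabricksHookSketch`, `SyncHookSketch`, `AsyncHookSketch`,
`_resolve_auth`, `_build_request`) are hypothetical, `ConnInfo` stands in for
an Airflow connection, and the HTTP calls are replaced by returning the
request description so the example runs without `requests` or `aiohttp`.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Dict, Optional, Tuple


@dataclass
class ConnInfo:
    """Hypothetical stand-in for an Airflow connection object."""
    host: Optional[str] = None
    login: Optional[str] = None
    password: Optional[str] = None
    extra_dejson: Dict[str, str] = field(default_factory=dict)


class BaseDatabricksHookSketch:
    """Holds the logic that does not depend on the HTTP client."""

    def __init__(self, conn: ConnInfo) -> None:
        self.conn = conn

    def _resolve_auth(self) -> Tuple[Optional[str], Dict[str, str], Optional[Tuple]]:
        """Return (host, headers, basic_auth), mirroring the branches in the diff."""
        extra = self.conn.extra_dejson
        if 'token' in extra:
            headers = {'Authorization': f"Bearer {extra['token']}"}
            # The real hook also normalizes this value with _parse_host; omitted here.
            host = extra.get('host', self.conn.host)
            return host, headers, None
        return self.conn.host, {}, (self.conn.login, self.conn.password)

    def _build_request(self, endpoint: str) -> dict:
        """Assemble everything a client needs, independent of sync or async use."""
        host, headers, auth = self._resolve_auth()
        return {'url': f'https://{host}/{endpoint}', 'headers': headers, 'auth': auth}


class SyncHookSketch(BaseDatabricksHookSketch):
    def do_api_call(self, endpoint: str) -> dict:
        # A real sync hook would pass these values to `requests` here.
        return self._build_request(endpoint)


class AsyncHookSketch(BaseDatabricksHookSketch):
    async def do_api_call(self, endpoint: str) -> dict:
        request = self._build_request(endpoint)
        # A real async hook would await an `aiohttp` request here.
        await asyncio.sleep(0)
        return request


if __name__ == '__main__':
    # Placeholder host, token, and endpoint, for illustration only.
    conn = ConnInfo(host='example.invalid', extra_dejson={'token': 'abc'})
    print(SyncHookSketch(conn).do_api_call('api/example'))
    print(asyncio.run(AsyncHookSketch(conn).do_api_call('api/example')))
```

With this shape, a change to the auth rules is made once in the base class and
both hooks pick it up.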




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

