eskarimov commented on a change in pull request #19835:
URL: https://github.com/apache/airflow/pull/19835#discussion_r757859224



##########
File path: airflow/providers/databricks/hooks/databricks.py
##########
@@ -64,10 +66,12 @@
 class RunState:
     """Utility class for the run state concept of Databricks runs."""
 
-    def __init__(self, life_cycle_state: str, result_state: str, 
state_message: str) -> None:
+    def __init__(
+        self, life_cycle_state: str, state_message: str, result_state: str = 
None, *args, **kwargs

Review comment:
       It needs to be reviewed together with the changes for initialising the 
class instance out of API response:
   
   Current version:
   ```python
           state = response['state']
           life_cycle_state = state['life_cycle_state']
           # result_state may not be in the state if not terminal
           result_state = state.get('result_state', None)
           state_message = state['state_message']
           return RunState(life_cycle_state, result_state, state_message)
   ```
   Proposed version:
   ```python
           state = response['state']
           return RunState(**state)
   ```
   
   Current version is basically an intermediate layer between the API response 
and class, extracting values out of the API response and initialising class 
instance. But actually the response should already represent a state, why do we 
need this layer then?
   I see the following drawbacks with it:
   - Class description doesn't tell that `result_state` might be missing if 
state is not terminal. Currently it's described with the comment deep in the 
code.
   - It tends to increase repeating code - let's say we want to introduce async 
class for `DatabricksHook`. This logic needs to be written twice. Also in case 
we want to change the class in the future, let's say add new property 
`user_cancelled_or_timedout` (which is already a part of the API response), 
then we need to change class arguments, parsing response logic and class 
instance initialisation everywhere it's used.
     With the proposed version, we only need to change class arguments.
   
   With all the above, answering the questions:
   - > why do we need *args, **kwargs ?
         
        It shows that RunState might receive other init arguments (since we 
don't have control over API response), see above example with 
`user_cancelled_or_timedout` in the response.
   
   - > why to change order of the parameters? They are logical now: lifecycle 
-> result state -> state message.
   
        Just because of Python syntax, we need to put arguments with default 
values after required arguments.

##########
File path: airflow/providers/databricks/hooks/databricks.py
##########
@@ -235,21 +241,18 @@ def _get_aad_token(self, resource: str) -> str:
             attempt_num += 1
             sleep(self.retry_delay)
 
-    def _fill_aad_tokens(self, headers: dict) -> str:
+    def _fill_aad_headers(self, headers: dict) -> dict:
         """
-        Fills headers if necessary (SPN is outside of the workspace) and 
generates AAD token
+        Fills AAD headers if necessary (SPN is outside of the workspace)
         :param headers: dictionary with headers to fill-in
-        :return: AAD token
+        :return: dictionary with filled AAD headers
         """
-        # SP is outside of the workspace
-        if 'azure_resource_id' in self.databricks_conn.extra_dejson:

Review comment:
       What do you think if we call it `_get_aad_headers()`, which would return 
either empty dict or a filled dict? Also we won't need input arg `headers` in 
this case.
   
   Then we could construct headers like:
   ```python
   aad_headers = self._get_aad_headers()
   headers = {**USER_AGENT_HEADER.copy(), **aad_headers}
   ```
   

##########
File path: airflow/providers/databricks/hooks/databricks.py
##########
@@ -356,31 +353,31 @@ def _do_api_call(self, endpoint_info, json):
     def _log_request_error(self, attempt_num: int, error: str) -> None:
         self.log.error('Attempt %s API Request to Databricks failed with 
reason: %s', attempt_num, error)
 
-    def run_now(self, json: dict) -> str:
+    def run_now(self, json: dict) -> int:

Review comment:
       It won't break existing code, actually it's backwards - if someone 
assumes that output is `str` because of the function signature, then it'd break 
the code, because the actual returned type is `int`.

##########
File path: airflow/providers/databricks/hooks/databricks.py
##########
@@ -522,6 +515,20 @@ def uninstall(self, json: dict) -> None:
         """
         self._do_api_call(UNINSTALL_LIBS_ENDPOINT, json)
 
+    @staticmethod
+    def _is_aad_token_valid(aad_token: dict) -> bool:

Review comment:
       - Mainly for readability to hide the details for checking that token is 
valid under the function, because it's not the main purpose of the parent 
function `_get_aad_token`
   - There's a mistake in the current function implementation: it subtracts 
`TOKEN_REFRESH_LEAD_TIME` out of the current time, while it should actually sum 
it. With this we fix it and cover the function with tests.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to