eskarimov commented on a change in pull request #19835:
URL: https://github.com/apache/airflow/pull/19835#discussion_r757859224
##########
File path: airflow/providers/databricks/hooks/databricks.py
##########
@@ -64,10 +66,12 @@
class RunState:
"""Utility class for the run state concept of Databricks runs."""
-    def __init__(self, life_cycle_state: str, result_state: str, state_message: str) -> None:
+    def __init__(
+        self, life_cycle_state: str, state_message: str, result_state: str = None, *args, **kwargs
Review comment:
This needs to be reviewed together with the changes for initialising the class instance from the API response.
Current version:
```python
state = response['state']
life_cycle_state = state['life_cycle_state']
# result_state may not be in the state if not terminal
result_state = state.get('result_state', None)
state_message = state['state_message']
return RunState(life_cycle_state, result_state, state_message)
```
Proposed version:
```python
state = response['state']
return RunState(**state)
```
The current version is basically an intermediate layer between the API response and the class: it extracts values out of the API response and then initialises the class instance. But the response already represents a state, so why do we need this layer at all?
I see the following drawbacks with it:
- The class description doesn't say that `result_state` might be missing when the state is not terminal; currently that is only documented by a comment buried deep in the code.
- It tends to increase code repetition. Say we want to introduce an async class for `DatabricksHook`: this parsing logic would have to be written twice. Likewise, if we want to change the class in the future, e.g. add a new property `user_cancelled_or_timedout` (which is already part of the API response), we'd need to change the class arguments, the response-parsing logic, and the class instance initialisation everywhere it's used.

With the proposed version, we only need to change the class arguments.
With all the above, answering the questions:
- > why do we need *args, **kwargs ?

  It signals that `RunState` might receive other init arguments (we don't control the API response); see the `user_cancelled_or_timedout` example above.
- > why to change order of the parameters? They are logical now: lifecycle -> result state -> state message.

  Simply because of Python syntax: arguments with default values must come after required arguments.
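To make this concrete, here is a rough sketch of what the proposed `RunState` could look like. The `''` default for `result_state` and the stored attributes are illustrative assumptions, not the final implementation:
```python
class RunState:
    """Utility class for the run state concept of Databricks runs.

    ``result_state`` may be absent from the API response while the run is not
    in a terminal state, hence the default value.
    """

    def __init__(
        self, life_cycle_state: str, state_message: str, result_state: str = '', *args, **kwargs
    ) -> None:
        # Unknown keys from the API response (e.g. user_cancelled_or_timedout)
        # are absorbed by *args / **kwargs instead of raising a TypeError.
        self.life_cycle_state = life_cycle_state
        self.result_state = result_state
        self.state_message = state_message


# Initialising directly from an API response then becomes a one-liner:
response = {
    'state': {
        'life_cycle_state': 'TERMINATED',
        'result_state': 'SUCCESS',
        'state_message': '',
        'user_cancelled_or_timedout': False,
    }
}
run_state = RunState(**response['state'])
```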
##########
File path: airflow/providers/databricks/hooks/databricks.py
##########
@@ -235,21 +241,18 @@ def _get_aad_token(self, resource: str) -> str:
                attempt_num += 1
                sleep(self.retry_delay)
-    def _fill_aad_tokens(self, headers: dict) -> str:
+    def _fill_aad_headers(self, headers: dict) -> dict:
        """
-        Fills headers if necessary (SPN is outside of the workspace) and generates AAD token
+        Fills AAD headers if necessary (SPN is outside of the workspace)
        :param headers: dictionary with headers to fill-in
-        :return: AAD token
+        :return: dictionary with filled AAD headers
        """
-        # SP is outside of the workspace
-        if 'azure_resource_id' in self.databricks_conn.extra_dejson:
Review comment:
What do you think about calling it `_get_aad_headers()` and having it return either an empty dict or a filled one? That way we also wouldn't need the `headers` input argument. Then we could construct the headers like:
```python
aad_headers = self._get_aad_headers()
headers = {**USER_AGENT_HEADER.copy(), **aad_headers}
```
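For illustration, a sketch of how `_get_aad_headers()` could look as a hook method, reusing the logic that currently lives in `_fill_aad_headers`. The exact header names and the management-endpoint resource constant below are assumptions based on the existing hook, not a definitive implementation:
```python
def _get_aad_headers(self) -> dict:
    """
    Returns AAD-specific headers when the service principal sits outside
    the workspace, otherwise an empty dict.
    """
    headers = {}
    if 'azure_resource_id' in self.databricks_conn.extra_dejson:
        # Assumed constant and header names, mirroring the current implementation:
        mgmt_token = self._get_aad_token(resource=AZURE_MANAGEMENT_ENDPOINT)
        headers['X-Databricks-Azure-Workspace-Resource-Id'] = self.databricks_conn.extra_dejson[
            'azure_resource_id'
        ]
        headers['X-Databricks-Azure-SP-Management-Token'] = mgmt_token
    return headers
```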
##########
File path: airflow/providers/databricks/hooks/databricks.py
##########
@@ -356,31 +353,31 @@ def _do_api_call(self, endpoint_info, json):
    def _log_request_error(self, attempt_num: int, error: str) -> None:
        self.log.error('Attempt %s API Request to Databricks failed with reason: %s', attempt_num, error)
-    def run_now(self, json: dict) -> str:
+    def run_now(self, json: dict) -> int:
Review comment:
It won't break existing code; it's actually the other way around: if someone assumed the output is `str` because of the function signature, their code would break, since the actual returned type is `int`.
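A tiny self-contained illustration of the point (`run_now_stub` is a stand-in, not the real hook method):
```python
def run_now_stub(json: dict) -> int:
    # The Jobs API returns run_id as a number, so int is the honest annotation.
    return 42


run_id = run_now_stub({})

# A caller that trusted the old `-> str` annotation would break at runtime:
#     'runs/' + run_id  ->  TypeError: can only concatenate str (not "int") to str
# while code written for the real int value keeps working:
print(f'run_id={run_id}')
```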
##########
File path: airflow/providers/databricks/hooks/databricks.py
##########
@@ -522,6 +515,20 @@ def uninstall(self, json: dict) -> None:
"""
self._do_api_call(UNINSTALL_LIBS_ENDPOINT, json)
+ @staticmethod
+ def _is_aad_token_valid(aad_token: dict) -> bool:
Review comment:
- Mainly for readability: it hides the details of checking whether the token is still valid behind a helper, since that isn't the main purpose of the parent function `_get_aad_token`.
- There's a mistake in the current implementation: it subtracts `TOKEN_REFRESH_LEAD_TIME` from the current time, while it should actually add it (see the sketch below). This change fixes that and covers the function with tests.
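A sketch of the helper with the lead-time fix, assuming the cached token is a dict with `token` and `expires_on` keys and that `TOKEN_REFRESH_LEAD_TIME` is expressed in seconds (in the hook it would be a `@staticmethod`; the exact shape may differ in the PR):
```python
import time

TOKEN_REFRESH_LEAD_TIME = 120  # seconds; assumed value for the sketch


def _is_aad_token_valid(aad_token: dict) -> bool:
    """
    Checks whether a cached AAD token can still be used, leaving a refresh
    buffer before the real expiration.
    """
    now = int(time.time())
    # The lead time must be *added* to "now": the token only counts as valid
    # if it does not expire within the next TOKEN_REFRESH_LEAD_TIME seconds.
    return bool(aad_token.get('token')) and aad_token.get('expires_on', 0) > (now + TOKEN_REFRESH_LEAD_TIME)


print(_is_aad_token_valid({'token': 'abc', 'expires_on': int(time.time()) + 3600}))  # True
print(_is_aad_token_valid({'token': 'abc', 'expires_on': int(time.time()) + 30}))    # False, about to expire
```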