mwojtyczka commented on code in PR #63611:
URL: https://github.com/apache/airflow/pull/63611#discussion_r2938814655


##########
providers/databricks/src/airflow/providers/databricks/hooks/databricks_base.py:
##########
@@ -161,6 +166,16 @@ def my_after_func(retry_state):
     def databricks_conn(self) -> Connection:
         return self.get_connection(self.databricks_conn_id)  # type: ignore[return-value]
 
+    async def adatabricks_conn(self) -> Connection:

Review Comment:
   Same as the comment on `_async_databricks_conn`: use a common naming convention.



##########
providers/databricks/src/airflow/providers/databricks/hooks/databricks_base.py:
##########


Review Comment:
   `_a_get_token()` itself is fixed, but the entire k8s call chain it invokes still accesses `self.databricks_conn` synchronously:
   _a_get_token()                          ← fixed: uses adatabricks_conn()
     └─ _a_get_federated_databricks_token()
          ├─ _get_required_client_id()     ← sync, accesses self.databricks_conn
          └─ _a_get_k8s_jwt_token()        ← async, accesses self.databricks_conn
               ├─ _a_get_k8s_projected_volume_token()  ← async, accesses self.databricks_conn
               └─ _a_get_k8s_token_request_api()       ← async, accesses self.databricks_conn
   
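   So anyone using `federated_k8s` auth with a deferrable operator would still hit the same AsyncToSync-in-event-loop crash that motivated this PR.
   The fix would need an async version of `_get_required_client_id` that awaits `adatabricks_conn()`, and the async k8s methods above would also need to await `adatabricks_conn()` instead of reading `self.databricks_conn`.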
   Note: the existing `test_no_sync_get_connection` test doesn't catch this because it uses `Connection(login="foo", password="bar")`, which doesn't exercise the `federated_k8s` branch. After the k8s methods are fixed, the regression test should also exercise that path to prove it doesn't touch `self.databricks_conn`.
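The shape of that fix can be sketched with a toy stand-in for the hook. Everything here is hypothetical (`ToyHook`, `FakeConnection`, the placeholder JWT), and `_a_get_required_client_id` is only an assumed name for the async counterpart of `_get_required_client_id`. The sync `databricks_conn` property raises inside a running event loop to mimic the AsyncToSync crash, so any leftover sync access on the async path fails loudly.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass, field


@dataclass
class FakeConnection:
    # Hypothetical stand-in for airflow.models.Connection.
    login: str
    extra: dict = field(default_factory=dict)


class ToyHook:
    """Hypothetical hook illustrating a fully async federated_k8s call chain."""

    def __init__(self, conn: FakeConnection) -> None:
        self._stored_conn = conn
        self._a_databricks_conn: FakeConnection | None = None

    @property
    def databricks_conn(self) -> FakeConnection:
        # Mimic the sync accessor failing when used inside an event loop.
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            return self._stored_conn
        raise RuntimeError("sync databricks_conn accessed inside the event loop")

    async def adatabricks_conn(self) -> FakeConnection:
        # Async accessor with a simple per-instance cache.
        if self._a_databricks_conn is None:
            await asyncio.sleep(0)  # stands in for an async connection lookup
            self._a_databricks_conn = self._stored_conn
        return self._a_databricks_conn

    async def _a_get_required_client_id(self) -> str:
        # Assumed async counterpart of _get_required_client_id.
        conn = await self.adatabricks_conn()
        client_id = conn.extra.get("client_id")
        if not client_id:
            raise ValueError("client_id is required for federated_k8s auth")
        return client_id

    async def _a_get_k8s_jwt_token(self) -> str:
        # Placeholder: the real method reads a projected volume or calls the
        # TokenRequest API; this only shows conn coming from the async accessor.
        conn = await self.adatabricks_conn()
        return f"jwt-for-{conn.login}"

    async def _a_get_federated_databricks_token(self) -> tuple[str, str]:
        client_id = await self._a_get_required_client_id()
        jwt = await self._a_get_k8s_jwt_token()
        return client_id, jwt


hook = ToyHook(FakeConnection(login="federated_k8s", extra={"client_id": "test-client-id"}))
print(asyncio.run(hook._a_get_federated_databricks_token()))
```

In this toy, reading `hook.databricks_conn` from inside a coroutine raises `RuntimeError`, which is the behavior a `federated_k8s` regression test would want to prove is never triggered.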



##########
providers/databricks/tests/unit/databricks/hooks/test_databricks_base.py:
##########
@@ -1757,6 +1759,7 @@ async def test_a_get_token_with_federated_k8s_login(self, mock_post):
 
         hook = BaseDatabricksHook()
         hook.databricks_conn = mock_conn
+        hook._async_databricks_conn = mock_conn  # For async methods

Review Comment:
   The tests now set `hook._async_databricks_conn = mock_conn`. This works, but it writes the internal cache slot directly, bypassing the `adatabricks_conn()` caching logic and coupling the tests to an implementation detail. It would be better to mock `adatabricks_conn` as a coroutine (as the newer tests do). That way the test goes through `adatabricks_conn()` like production code does, and it doesn't need to know about the internal `_async_databricks_conn` cache field.
   
   ```python
   @pytest.mark.asyncio
   @mock.patch("aiohttp.ClientSession.post")
   @mock.patch("airflow.providers.databricks.hooks.databricks_base.BaseDatabricksHook.adatabricks_conn")
   async def test_a_get_token_with_federated_k8s_login(self, mock_conn, mock_post):
       # Decorators apply bottom-up, so the adatabricks_conn mock is the first argument.
       # patch() replaces the async method with an AsyncMock, so awaiting it
       # returns this Connection.
       mock_conn.return_value = Connection(
           host="my-workspace.cloud.databricks.com",
           login="federated_k8s",
           password=None,
           extra={"client_id": "test-client-id"},
       )
       hook = BaseDatabricksHook()
       # No manual assignment of databricks_conn or _async_databricks_conn
       ...
   ```
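Mocking the coroutine works because, since Python 3.8, `unittest.mock.patch` substitutes an `AsyncMock` when the patched target is an `async def`, so setting `return_value` is enough for `await hook.adatabricks_conn()` to yield the fake connection. A self-contained illustration, using a hypothetical `ToyHook` and caller `a_get_host` rather than the real hook:

```python
import asyncio
from types import SimpleNamespace
from unittest import mock


class ToyHook:
    # Hypothetical stand-in; the real test patches BaseDatabricksHook.adatabricks_conn.
    async def adatabricks_conn(self):
        raise AssertionError("real connection lookup should be patched out")

    async def a_get_host(self) -> str:
        # Production-style path: always goes through the async accessor.
        conn = await self.adatabricks_conn()
        return conn.host


with mock.patch.object(ToyHook, "adatabricks_conn") as mock_conn:
    # patch() detects the async def and substitutes an AsyncMock.
    assert isinstance(mock_conn, mock.AsyncMock)
    mock_conn.return_value = SimpleNamespace(host="my-workspace.cloud.databricks.com")
    host = asyncio.run(ToyHook().a_get_host())
    mock_conn.assert_awaited_once()

print(host)
```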



##########
providers/databricks/src/airflow/providers/databricks/hooks/databricks_base.py:
##########
@@ -142,6 +143,10 @@ def __init__(
         self._metadata_expiry: float = 0
         self._metadata_ttl: int = 300
 
+        # Cache for lack of an async @cached_property
+        self._async_databricks_conn: Connection | None = None

Review Comment:
   ```suggestion
           self._a_databricks_conn: Connection | None = None
   ```
   We should use a common naming convention. There are three patterns now: `async_<text>`, `a<text>`, and `a_<text>`. The pattern used so far is `a_<text>`. Either `async_<text>` or `a_<text>` would work for me.
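For reference, the "cache for lack of an async @cached_property" pattern with the suggested `_a_databricks_conn` field might look like the sketch below. `ToyHook` and `_fetch_connection` are hypothetical, and the accessor keeps the PR's current `adatabricks_conn` name, which this review also asks to align with the chosen convention. The `asyncio.Lock` is an assumption not present in the PR: it stops concurrent first calls from each fetching the connection.

```python
from __future__ import annotations

import asyncio


class ToyHook:
    """Hypothetical hook showing an async cached connection accessor."""

    def __init__(self) -> None:
        # Cache for lack of an async @cached_property, using the a_<text> convention.
        self._a_databricks_conn: dict | None = None
        self._a_databricks_conn_lock: asyncio.Lock | None = None
        self.fetch_count = 0

    async def _fetch_connection(self) -> dict:
        # Hypothetical stand-in for an async connection lookup.
        self.fetch_count += 1
        await asyncio.sleep(0)
        return {"host": "my-workspace.cloud.databricks.com"}

    async def adatabricks_conn(self) -> dict:
        if self._a_databricks_conn is None:
            # Create the lock lazily so it is created inside the running loop.
            if self._a_databricks_conn_lock is None:
                self._a_databricks_conn_lock = asyncio.Lock()
            async with self._a_databricks_conn_lock:
                # Re-check: another coroutine may have filled the cache while we waited.
                if self._a_databricks_conn is None:
                    self._a_databricks_conn = await self._fetch_connection()
        return self._a_databricks_conn


async def main() -> ToyHook:
    hook = ToyHook()
    # Three concurrent first calls, then one more after the cache is warm.
    await asyncio.gather(*(hook.adatabricks_conn() for _ in range(3)))
    await hook.adatabricks_conn()
    return hook


hook = asyncio.run(main())
print(hook.fetch_count)
```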



##########
providers/databricks/src/airflow/providers/databricks/hooks/databricks_base.py:
##########
@@ -292,11 +324,10 @@ async def _a_get_sp_token(self, resource: str) -> str:
         try:
             async for attempt in self._a_get_retry_object():
                 with attempt:
+                    conn = await self.adatabricks_conn()

Review Comment:
   In the PR, `_a_get_aad_token` fetches `conn` once, before the loop:
   ```
   conn = await self.adatabricks_conn()
   async for attempt in self._a_get_retry_object():
   ```
   
   But `_a_get_sp_token` fetches `conn` inside each attempt. Both are correct thanks to caching, but `_a_get_sp_token` should match `_a_get_aad_token`'s style and fetch before the loop.
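The suggested style (fetch the connection once, then reuse it across attempts) can be sketched as below. The real hook retries via `_a_get_retry_object()` (tenacity); this toy uses a plain loop instead, drops the real method's `resource` argument, and `ToyHook`, `TransientError`, `_post_token_request`, and `max_attempts` are all hypothetical.

```python
from __future__ import annotations

import asyncio


class TransientError(Exception):
    """Hypothetical retryable error."""


class ToyHook:
    def __init__(self, failures_before_success: int) -> None:
        self._a_databricks_conn: dict | None = None
        self.conn_calls = 0
        self.post_calls = 0
        self._failures_left = failures_before_success

    async def adatabricks_conn(self) -> dict:
        # Counts calls so the sketch can show the accessor is awaited once.
        self.conn_calls += 1
        if self._a_databricks_conn is None:
            self._a_databricks_conn = {"login": "client-id", "password": "secret"}
        return self._a_databricks_conn

    async def _post_token_request(self, conn: dict) -> str:
        # Hypothetical token endpoint call that fails a fixed number of times.
        self.post_calls += 1
        if self._failures_left > 0:
            self._failures_left -= 1
            raise TransientError("temporary failure")
        return f"token-for-{conn['login']}"

    async def _a_get_sp_token(self, max_attempts: int = 3) -> str:
        # Fetch the connection once, before the retry loop.
        conn = await self.adatabricks_conn()
        for attempt in range(1, max_attempts + 1):
            try:
                return await self._post_token_request(conn)
            except TransientError:
                if attempt == max_attempts:
                    raise
                await asyncio.sleep(0)  # a real hook would back off here
        raise AssertionError("unreachable")


hook = ToyHook(failures_before_success=2)
print(asyncio.run(hook._a_get_sp_token()), hook.conn_calls, hook.post_calls)
```

With two transient failures, the connection accessor is awaited once while the token request runs three times.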



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
