kaxil commented on code in PR #68232:
URL: https://github.com/apache/airflow/pull/68232#discussion_r3467571101


##########
task-sdk/src/airflow/sdk/execution_time/context.py:
##########
@@ -628,6 +653,16 @@ def delete(self, key: str) -> None:
         if backend is not None:
             backend.delete(self._scope, key)
 
+    async def adelete(self, key: str) -> None:
+        """Async version of :meth:`delete` that awaits instead of blocking the 
event loop."""
+        from airflow.sdk.execution_time.comms import DeleteTaskStateStore
+        from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS
+
+        await SUPERVISOR_COMMS.asend(DeleteTaskStateStore(ti_id=self._ti_id, 
key=key))
+        backend = _get_worker_state_store_backend()
+        if backend is not None:
+            backend.delete(self._scope, key)

Review Comment:
   `adelete` awaits the socket round-trip but then calls the synchronous 
`backend.delete()`, which blocks the event loop when a custom `[workers] 
state_store_backend` is configured (a real backend's `delete` does network 
I/O). `aclear` has the same pattern on line 692 with `backend.clear()`. 
`BaseStoreBackend` provides `adelete`/`aclear` for exactly this case, so these 
should be `await backend.adelete(self._scope, key)` and `await 
backend.aclear(...)`. With the default (no backend) the call is skipped, so 
this only bites custom-backend users, which is also why the new tests don't 
catch it. (`aget`/`aset` have the same sync-backend call via 
`_extract_get_response`/`_build_set_message`, but the serialize/deserialize 
helpers have no async variant, so those are harder to address.)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to