SameerMesiah97 commented on code in PR #60778:
URL: https://github.com/apache/airflow/pull/60778#discussion_r2747785689


##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/kubernetes_helper_functions.py:
##########
@@ -210,3 +212,15 @@ def annotations_for_logging_task_metadata(annotation_set):
     else:
         annotations_for_logging = "<omitted>"
     return annotations_for_logging
+
+
+def serializable_callback(f):
+    """Convert async callback so it can run in sync or async mode."""
+
+    @wraps(f)
+    def wrapper(*args, mode: str, **kwargs):
+        if mode == ExecutionMode.ASYNC:
+            return f(*args, mode=mode, **kwargs)
+        return asyncio.run(f(*args, mode=mode, **kwargs))
+

Review Comment:
   So the use of `@serializable_callback` is unguarded, and you are relying on 
the user to infer the intent of the function correctly? I agree with your 
motivation for introducing this helper, but we cannot guarantee that it will 
not be called from within a running event loop in the operator. Is there any 
reason why the operator cannot do something like this:
   
   `asyncio.run(callback(...))`
   
   If that is not feasible, I believe that, at the very minimum:
   
   - The possibility of encountering `RuntimeError` should be documented very 
clearly (docstring and comment).
   - The `RuntimeError` should be caught in a try/except block and re-raised 
with a more informative error message.
   
   Below is a suggested implementation if you are still intent on keeping 2 
separate modes:
   
   ```
   def serializable_callback(f):
       """
       Convert async callback so it can run in sync or async mode.
   
       In ASYNC mode (e.g. triggerer), the callback is expected to be awaited
       by the caller. In SYNC mode (e.g. operator fallback), the callback is
       executed via asyncio.run(); callers should ensure this is only used
       when no event loop is already running.
       """
   
       @wraps(f)
       def wrapper(*args, mode: str, **kwargs):
           if mode == ExecutionMode.ASYNC:
               return f(*args, mode=mode, **kwargs)
   
           # SYNC mode owns the event loop; calling this while a loop is already
           # running is a hard error and indicates a misclassified execution
           # context.
           try:
               return asyncio.run(f(*args, mode=mode, **kwargs))
           except RuntimeError as e:
               raise RuntimeError(
                   "Cannot call serializable_callback in SYNC mode while an "
                   "event loop is running. Use ExecutionMode.ASYNC and await "
                   "the callback instead."
               ) from e
   
       return wrapper
   ```
   This will immediately tell the user why the `RuntimeError` was raised and 
guard against further unsafe usage.
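
   One caveat with wrapping `asyncio.run()` in a broad `except RuntimeError` is 
that a `RuntimeError` raised by the callback itself would be reported with the 
same message. A possible variant, sketched below under some assumptions, checks 
for a running loop with `asyncio.get_running_loop()` before the coroutine is 
created. The `ExecutionMode` enum here is a hypothetical stand-in defined only 
to keep the snippet self-contained; it is not meant to mirror the provider's 
actual definition.

```python
import asyncio
from enum import Enum
from functools import wraps


class ExecutionMode(str, Enum):
    """Hypothetical stand-in for the provider's ExecutionMode (assumption)."""

    SYNC = "sync"
    ASYNC = "async"


def serializable_callback(f):
    """Variant of the wrapper above that checks for a running loop up front."""

    @wraps(f)
    def wrapper(*args, mode: str, **kwargs):
        if mode == ExecutionMode.ASYNC:
            # Return the coroutine; the caller is expected to await it.
            return f(*args, mode=mode, **kwargs)

        # get_running_loop() raises RuntimeError when no loop is running
        # in the current thread, which is the only case where SYNC is safe.
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            loop_running = False
        else:
            loop_running = True

        if loop_running:
            raise RuntimeError(
                "Cannot call serializable_callback in SYNC mode while an "
                "event loop is running. Use ExecutionMode.ASYNC and await "
                "the callback instead."
            )

        # Errors raised by the callback propagate unchanged from here.
        return asyncio.run(f(*args, mode=mode, **kwargs))

    return wrapper
```

   Because the check happens before `f(...)` is called, no coroutine object is 
created on the error path, and exceptions raised inside the callback keep their 
original message.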



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
