SameerMesiah97 commented on code in PR #60778:
URL: https://github.com/apache/airflow/pull/60778#discussion_r2747632022


##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/kubernetes_helper_functions.py:
##########
@@ -210,3 +212,15 @@ def annotations_for_logging_task_metadata(annotation_set):
     else:
         annotations_for_logging = "<omitted>"
     return annotations_for_logging
+
+
+def serializable_callback(f):
+    """Convert async callback so it can run in sync or async mode."""
+
+    @wraps(f)
+    def wrapper(*args, mode: str, **kwargs):
+        if mode == ExecutionMode.ASYNC:
+            return f(*args, mode=mode, **kwargs)
+        return asyncio.run(f(*args, mode=mode, **kwargs))
+

Review Comment:
   So the `asyncio.run()` call in `@serializable_callback` is not guarded, and you are relying on callers to correctly infer the function's intent? I agree with your motivation for introducing this helper, but we cannot guarantee that it will not be called from within a running event loop in the operator. Is there any reason why the operator cannot simply do this:
   
   `asyncio.run(callback(...))`
   
   If that is not feasible, then at the very minimum:
   
   1) The possibility of a `RuntimeError` should be documented clearly (in both the docstring and a code comment).
   2) The `RuntimeError` should be caught in a try/except block and re-raised with a more informative error message.
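To make the failure mode concrete: `asyncio.run()` refuses to start when the current thread already has a running event loop, which is exactly the situation SYNC mode would hit if it were reached from trigger code. A minimal, standalone sketch; `user_callback` and `run_in_sync_mode` are hypothetical stand-ins for a user callback and the SYNC branch of the helper, not code from the PR:

```python
import asyncio


async def user_callback(line: str) -> str:
    # Hypothetical stand-in for a user-supplied async progress callback.
    return line.upper()


def run_in_sync_mode(line: str) -> str:
    # Mirrors the SYNC branch of the helper: asyncio.run() creates and owns a loop.
    return asyncio.run(user_callback(line))


async def called_from_a_running_loop() -> str:
    # Simulates SYNC mode being reached from code already on an event loop,
    # e.g. inside a trigger. asyncio.run() refuses to start a nested loop.
    try:
        return run_in_sync_mode("hello")
    except RuntimeError as exc:
        return f"RuntimeError: {exc}"


print(run_in_sync_mode("hello"))  # HELLO
print(asyncio.run(called_from_a_running_loop()))
```

The second call reports a `RuntimeError` about a running event loop. Because the coroutine object is created before `asyncio.run()` rejects it, Python will typically also emit a "coroutine ... was never awaited" `RuntimeWarning`.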
   
   Below is a suggested implementation, if you are still intent on keeping two separate modes:
   
   ```python
   import asyncio
   from functools import wraps
   
   
   def serializable_callback(f):
       """
       Convert async callback so it can run in sync or async mode.
   
       In ASYNC mode (e.g. triggerer), the callback is expected to be awaited
       by the caller. In SYNC mode (e.g. operator fallback), the callback is
       executed via asyncio.run(); callers should ensure this is only used
       when no event loop is already running.
       """
   
       @wraps(f)
       def wrapper(*args, mode: str, **kwargs):
           if mode == ExecutionMode.ASYNC:
               return f(*args, mode=mode, **kwargs)
   
           # SYNC mode owns the event loop; calling this while a loop is already
           # running is a hard error and indicates a misclassified execution context.
           try:
               return asyncio.run(f(*args, mode=mode, **kwargs))
           except RuntimeError as e:
               raise RuntimeError(
                   "Cannot call serializable_callback in SYNC mode while an event "
                   "loop is running. Use ExecutionMode.ASYNC and await the callback "
                   "instead."
               ) from e
   
       return wrapper
   ```
   This immediately tells the user why the `RuntimeError` occurred and guards against further unsafe usage.



##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py:
##########
@@ -1080,31 +1085,57 @@ async def fetch_container_logs_before_current_sec(
             since_seconds=(math.ceil((now - since_time).total_seconds()) if since_time else None),
         )
         message_to_log = None
-        try:
-            now_seconds = now.replace(microsecond=0)
-            for line in logs:
-                line_timestamp, message = parse_log_line(line)
-                # Skip log lines from the current second to prevent duplicate entries on the next read.
-                # The API only allows specifying 'since_seconds', not an exact timestamp.
-                if line_timestamp and line_timestamp.replace(microsecond=0) == now_seconds:
-                    break
-                if line_timestamp:  # detect new log line
-                    if message_to_log is None:  # first line in the log
-                        message_to_log = message
-                    else:  # previous log line is complete
-                        if message_to_log is not None:
-                            if is_log_group_marker(message_to_log):
-                                print(message_to_log)
-                            else:
-                                self.log.info("[%s] %s", container_name, message_to_log)
-                        message_to_log = message
-                elif message_to_log:  # continuation of the previous log line
-                    message_to_log = f"{message_to_log}\n{message}"
-        finally:
-            # log the last line and update the last_captured_timestamp
-            if message_to_log is not None:
-                if is_log_group_marker(message_to_log):
-                    print(message_to_log)
-                else:
-                    self.log.info("[%s] %s", container_name, message_to_log)
+        async with self._hook.get_conn() as connection:
+            v1_api = async_k8s.CoreV1Api(connection)
+            try:
+                now_seconds = now.replace(microsecond=0)
+                for line in logs:
+                    line_timestamp, message = parse_log_line(line)
+                    # Skip log lines from the current second to prevent duplicate entries on the next read.
+                    # The API only allows specifying 'since_seconds', not an exact timestamp.
+                    if line_timestamp and line_timestamp.replace(microsecond=0) == now_seconds:
+                        break
+                    if line_timestamp:  # detect new log line
+                        if message_to_log is None:  # first line in the log
+                            message_to_log = message
+                        else:  # previous log line is complete
+                            if message_to_log is not None:
+                                if is_log_group_marker(message_to_log):
+                                    print(message_to_log)
+                                else:
+                                    for callback in self._callbacks:
+                                        cb = callback.progress_callback(
+                                            line=message_to_log,
+                                            client=v1_api,
+                                            mode=ExecutionMode.ASYNC,
+                                            container_name=container_name,
+                                            timestamp=line_timestamp,
+                                            pod=pod,
+                                        )
+                                        if asyncio.iscoroutine(cb):
+                                            await cb

Review Comment:
   The key phrase here is “should be lightweight”. Can we really guarantee that? Reading the code, there don’t seem to be any restrictions on what a callback can execute, so we are effectively trusting users not to do any heavy lifting in them.
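To illustrate why “lightweight” matters: the triggerer runs its triggers as coroutines on a shared asyncio event loop, so a callback that blocks without yielding stalls every other coroutine on that loop. A minimal, standalone sketch; `heartbeat` stands in for other triggers sharing the loop and `heavy_callback` for a hypothetical callback doing blocking work:

```python
import asyncio
import time


async def heartbeat(ticks: list, interval: float = 0.01, count: int = 20) -> None:
    # Stands in for other triggers sharing the triggerer's event loop.
    for _ in range(count):
        ticks.append(time.monotonic())
        await asyncio.sleep(interval)


async def heavy_callback() -> None:
    # Hypothetical "heavy" callback: blocking work with no await points.
    time.sleep(0.3)


async def main() -> float:
    ticks: list = []
    hb = asyncio.create_task(heartbeat(ticks))
    await asyncio.sleep(0.05)  # let the heartbeat start ticking
    await heavy_callback()     # blocks the whole loop for about 0.3 s
    await hb
    # Largest gap between consecutive heartbeat ticks.
    return max(b - a for a, b in zip(ticks, ticks[1:]))


print(f"longest heartbeat gap: {asyncio.run(main()):.2f}s")
```

The heartbeat is scheduled every 10 ms, yet it observes a gap of at least the callback's blocking time, because nothing else on the loop can run until the callback returns.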
   
   One possible compromise would be to enforce a small global timeout for callbacks (e.g. a few seconds at most). Even then, however, this still sets a precedent for executing arbitrary user code in the triggerer.
   
   I agree the motivation here is solid and I can see the value of the feature, but this touches on a fundamental triggerer design decision, which I’m not comfortable approving or rejecting unilaterally.
   
   @jscheffl I know you requested my review, but I’d be interested in your thoughts on whether this is a precedent we’re happy to set for triggerers.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
