johnhoran commented on code in PR #60778:
URL: https://github.com/apache/airflow/pull/60778#discussion_r2717923807


##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py:
##########
@@ -1080,31 +1085,57 @@ async def fetch_container_logs_before_current_sec(
             since_seconds=(math.ceil((now - since_time).total_seconds()) if since_time else None),
         )
         message_to_log = None
-        try:
-            now_seconds = now.replace(microsecond=0)
-            for line in logs:
-                line_timestamp, message = parse_log_line(line)
-                # Skip log lines from the current second to prevent duplicate entries on the next read.
-                # The API only allows specifying 'since_seconds', not an exact timestamp.
-                if line_timestamp and line_timestamp.replace(microsecond=0) == now_seconds:
-                    break
-                if line_timestamp:  # detect new log line
-                    if message_to_log is None:  # first line in the log
-                        message_to_log = message
-                    else:  # previous log line is complete
-                        if message_to_log is not None:
-                            if is_log_group_marker(message_to_log):
-                                print(message_to_log)
-                            else:
-                            self.log.info("[%s] %s", container_name, message_to_log)
-                        message_to_log = message
-                elif message_to_log:  # continuation of the previous log line
-                    message_to_log = f"{message_to_log}\n{message}"
-        finally:
-            # log the last line and update the last_captured_timestamp
-            if message_to_log is not None:
-                if is_log_group_marker(message_to_log):
-                    print(message_to_log)
-                else:
-                    self.log.info("[%s] %s", container_name, message_to_log)
+        async with self._hook.get_conn() as connection:
+            v1_api = async_k8s.CoreV1Api(connection)
+            try:
+                now_seconds = now.replace(microsecond=0)
+                for line in logs:
+                    line_timestamp, message = parse_log_line(line)
+                    # Skip log lines from the current second to prevent duplicate entries on the next read.
+                    # The API only allows specifying 'since_seconds', not an exact timestamp.
+                    if line_timestamp and line_timestamp.replace(microsecond=0) == now_seconds:
+                        break
+                    if line_timestamp:  # detect new log line
+                        if message_to_log is None:  # first line in the log
+                            message_to_log = message
+                        else:  # previous log line is complete
+                            if message_to_log is not None:
+                                if is_log_group_marker(message_to_log):
+                                    print(message_to_log)
+                                else:
+                                    for callback in self._callbacks:
+                                        cb = callback.progress_callback(
+                                            line=message_to_log,
+                                            client=v1_api,
+                                            mode=ExecutionMode.ASYNC,
+                                            container_name=container_name,
+                                            timestamp=line_timestamp,
+                                            pod=pod,
+                                        )
+                                        if asyncio.iscoroutine(cb):
+                                            await cb

Review Comment:
   The specific use case I have in mind is supporting the watcher pattern being implemented in Astronomer Cosmos for running dbt: https://github.com/astronomer/astronomer-cosmos/pull/2207
   
   With this pattern you have a pod that is actually running dbt and an Airflow task that parses the Kubernetes logs to extract dbt events and create XCom variables that are consumed by sensors. I think parsing the logs and setting the XCom variables should be lightweight enough that it can be run from the triggerer. Running this from the triggerer was part of the original design of the deferred mode for KPO, but the implementation ran into issues, so it was stripped back out.
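   To make the idea concrete, here is a minimal sketch of what such a log-consuming callback might look like. The class name, the JSON shape, and the `progress_callback(line=...)` signature are assumptions for illustration only, not taken from the Cosmos PR:

   ```python
   import json


   class DbtEventCollector:
       """Hypothetical callback: parse dbt structured-log lines into XCom-ready events.

       All names here are illustrative; the real watcher in Cosmos may differ.
       """

       def __init__(self):
           self.events = []

       def progress_callback(self, *, line, **kwargs):
           try:
               payload = json.loads(line)
           except ValueError:
               return  # plain-text line, not a structured dbt event
           if "info" in payload:
               self.events.append(payload["info"])


   collector = DbtEventCollector()
   collector.progress_callback(line='{"info": {"name": "NodeFinished"}}')
   collector.progress_callback(line="not json")
   print(collector.events)
   ```

   Because the callback only parses a line and appends to a list, the per-line cost stays small, which is the property that makes running it from the triggerer plausible.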
   
   I did wonder whether I should inspect the callbacks and only invoke the ones that have actually implemented `progress_callback`, to make everything a little lighter.
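   One way to sketch that filtering, assuming a base callback class with a no-op `progress_callback` (the class and method names below are stand-ins, not the actual Airflow API):

   ```python
   import asyncio


   class KubernetesPodOperatorCallback:
       """Stand-in for the callback base class; names are assumptions."""

       def progress_callback(self, *, line, **kwargs):
           """Default no-op; subclasses override to consume log lines."""


   class XComLineParser(KubernetesPodOperatorCallback):
       def __init__(self):
           self.seen = []

       def progress_callback(self, *, line, **kwargs):
           self.seen.append(line)


   def overrides_progress_callback(cb) -> bool:
       # True only if the callback's class replaced the base no-op method.
       return type(cb).progress_callback is not KubernetesPodOperatorCallback.progress_callback


   async def dispatch(callbacks, line):
       for cb in callbacks:
           if not overrides_progress_callback(cb):
               continue  # skip callbacks that never implemented the hook
           result = cb.progress_callback(line=line)
           if asyncio.iscoroutine(result):  # tolerate async overrides
               await result


   parser = XComLineParser()
   asyncio.run(dispatch([KubernetesPodOperatorCallback(), parser], "a log line"))
   print(parser.seen)
   ```

   The identity check against the base method avoids calling no-op implementations at all, rather than calling them and discarding the result, which is the "a little lighter" behaviour described above.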



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
