SameerMesiah97 commented on code in PR #60778:
URL: https://github.com/apache/airflow/pull/60778#discussion_r2747787404
##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py:
##########
@@ -1080,31 +1085,57 @@ async def fetch_container_logs_before_current_sec(
since_seconds=(math.ceil((now - since_time).total_seconds()) if
since_time else None),
)
message_to_log = None
- try:
- now_seconds = now.replace(microsecond=0)
- for line in logs:
- line_timestamp, message = parse_log_line(line)
- # Skip log lines from the current second to prevent duplicate
entries on the next read.
- # The API only allows specifying 'since_seconds', not an exact
timestamp.
- if line_timestamp and line_timestamp.replace(microsecond=0) ==
now_seconds:
- break
- if line_timestamp: # detect new log line
- if message_to_log is None: # first line in the log
- message_to_log = message
- else: # previous log line is complete
- if message_to_log is not None:
- if is_log_group_marker(message_to_log):
- print(message_to_log)
- else:
- self.log.info("[%s] %s", container_name,
message_to_log)
- message_to_log = message
- elif message_to_log: # continuation of the previous log line
- message_to_log = f"{message_to_log}\n{message}"
- finally:
- # log the last line and update the last_captured_timestamp
- if message_to_log is not None:
- if is_log_group_marker(message_to_log):
- print(message_to_log)
- else:
- self.log.info("[%s] %s", container_name, message_to_log)
+ async with self._hook.get_conn() as connection:
+ v1_api = async_k8s.CoreV1Api(connection)
+ try:
+ now_seconds = now.replace(microsecond=0)
+ for line in logs:
+ line_timestamp, message = parse_log_line(line)
+ # Skip log lines from the current second to prevent
duplicate entries on the next read.
+ # The API only allows specifying 'since_seconds', not an
exact timestamp.
+ if line_timestamp and
line_timestamp.replace(microsecond=0) == now_seconds:
+ break
+ if line_timestamp: # detect new log line
+ if message_to_log is None: # first line in the log
+ message_to_log = message
+ else: # previous log line is complete
+ if message_to_log is not None:
+ if is_log_group_marker(message_to_log):
+ print(message_to_log)
+ else:
+ for callback in self._callbacks:
+ cb = callback.progress_callback(
+ line=message_to_log,
+ client=v1_api,
+ mode=ExecutionMode.ASYNC,
+ container_name=container_name,
+ timestamp=line_timestamp,
+ pod=pod,
+ )
+ if asyncio.iscoroutine(cb):
+ await cb
Review Comment:
The key phrase here is “should be lightweight”. Can we really guarantee
that? Reading the code, it doesn’t seem like there are any restrictions on what
a callback could execute, so we are effectively trusting users not to do any
heavy lifting in them.
One possible compromise would be to enforce a small global timeout for
callbacks (e.g. a few seconds at most). However, even with that, this still
sets a precedent for executing arbitrary user code in the triggerer.
I agree the motivation here is solid and I can see the value of the feature,
but this crosses into a fundamental triggerer design decision, which I’m not
comfortable approving or disapproving unilaterally.
@jscheffl I know you requested my review — I’d be interested in your
thoughts on whether this is a precedent we’re happy to set for triggerers.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]