jason810496 commented on code in PR #45129:
URL: https://github.com/apache/airflow/pull/45129#discussion_r1959232169
##########
airflow/utils/log/file_task_handler.py:
##########
@@ -402,11 +588,29 @@ def _read(
TaskInstanceState.RUNNING,
TaskInstanceState.DEFERRED,
)
+
+ current_total_logs_size = local_logs_size + remote_logs_size + executor_logs_size + served_logs_size
+ interleave_log_stream = _interleave_logs(
+ *local_parsed_logs,
+ *remote_parsed_logs,
+ *(executor_parsed_logs or []),
+ *served_parsed_logs,
+ )
+
+ # skip log stream until the last position
if metadata and "log_pos" in metadata:
- previous_chars = metadata["log_pos"]
- logs = logs[previous_chars:] # Cut off previously passed log test as new tail
- out_message = logs if "log_pos" in (metadata or {}) else messages + logs
- return out_message, {"end_of_log": end_of_log, "log_pos": log_pos}
+ offset = metadata["log_pos"]
+ for _ in range(offset):
+ next(interleave_log_stream, None)
+
+ out_stream: Iterable[str]
+ if "log_pos" in (metadata or {}):
Review Comment:
Both will produce the same result in this context, but yours is much better.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]