dstandish commented on code in PR #28440:
URL: https://github.com/apache/airflow/pull/28440#discussion_r1059057535
##########
airflow/cli/commands/task_command.py:
##########
@@ -280,38 +281,85 @@ def _extract_external_executor_id(args) -> str | None:
@contextmanager
-def _capture_task_logs(ti: TaskInstance) -> Generator[None, None, None]:
+def _move_task_handlers_to_root(ti: TaskInstance) -> Generator[None, None,
None]:
"""
- Manage logging context for a task run.
+ Move handlers for task logging to root logger.
- - Replace the root logger configuration with the airflow.task configuration
- so we can capture logs from any custom loggers used in the task.
+ We want anything logged during task run to be propagated to task log
handlers.
+ If running in a k8s executor pod, also keep the stream handler on root
logger
+ so that logs are still emitted to stdout.
+ """
+ # if there are no task handlers, then we should not do anything
+ # because either the handlers were already moved by the LocalTaskJob
+ # invocation of task_run (which wraps the --raw invocation), or
+ # user is doing something custom / unexpected
+ if not ti.log.handlers or settings.DONOT_MODIFY_HANDLERS:
+ yield
+ return
+ is_k8s_executor_pod = os.environ.get("AIRFLOW_IS_K8S_EXECUTOR_POD")
+ root_logger = logging.getLogger()
+ root_level = root_logger.level
+ task_logger = ti.log
+ task_propagate = task_logger.propagate
+ task_handlers = task_logger.handlers.copy()
+
+ # these are already copied to root logger, so remove and propagate
+ # (this ensures they are not handled more than they need to be)
+ # we'll add them back later
+ for h in task_logger.handlers[:]:
+ task_logger.removeHandler(h)
+ task_logger.propagate = True
+ root_logger.setLevel(task_logger.level)
+ root_handlers = root_logger.handlers.copy()
+ root_logger.handlers[:] = task_handlers
+
+ # task log handler reads from k8s pod logs when pod still running
+ # so we need to keep the console handler and set it to not respect
+ # redirection because we read live pod logging from container stdout
+ if is_k8s_executor_pod:
+ for h in root_handlers:
+ if isinstance(h, RedirectStdHandler):
+ root_logger.addHandler(h)
+ # "redirect stdout" context managers work by changing what
lives at
+ # sys.stdout. If respect_redirection is True, then we
reference whatever
+ # "sys.stdout" is at runtime. If False, this handler will
store the value
+ # of sys.stdout at handler *init* time, and stream to *that*
instead of whatever
+ # the current value is.
+ h.respect_redirection = False
+ break
+ try:
+ yield
+ finally:
+ task_logger.propagate = task_propagate
+ root_logger.setLevel(root_level)
+ root_logger.handlers[:] = root_handlers
+ task_logger.handlers[:] = task_handlers
+ if is_k8s_executor_pod:
+ for h in root_handlers:
+ if isinstance(h, RedirectStdHandler):
+ h.respect_redirection = True
- - Redirect stdout and stderr to the task instance log, as INFO and WARNING
- level messages, respectively.
+@contextmanager
+def _redirect_stdout_to_ti_log(ti: TaskInstance) -> Generator[None, None,
None]:
"""
- modify = not settings.DONOT_MODIFY_HANDLERS
- if modify:
- root_logger, task_logger = logging.getLogger(),
logging.getLogger("airflow.task")
+ Redirect stdout to ti logger.
- orig_level = root_logger.level
- root_logger.setLevel(task_logger.level)
- orig_handlers = root_logger.handlers.copy()
- root_logger.handlers[:] = task_logger.handlers
Review Comment:
@malthe observe: we already (temporarily) copy the handlers from airflow.task
to the root logger.
This causes problems: because we also leave those handlers on airflow.task, we
need complicated propagation rules at the airflow.task logger.
While my solution here is a few more lines (and a lot more comments), it's
not very complicated. There are two parts.
1. Instead of _copying_ the handlers to root, I _move_ them to root. We
don't need them on the task logger if they are already on the root logger.
This could ultimately allow us to simplify our propagation logic.
2. Previously we _removed_ our console handler from root at run time. Now,
if we're in a k8s executor pod, I keep the console handler there, because
the task log handler reads live logs for a running pod from container stdout.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]