potiuk commented on code in PR #27758:
URL: https://github.com/apache/airflow/pull/27758#discussion_r1059960445


##########
airflow/jobs/triggerer_job.py:
##########
@@ -17,26 +17,175 @@
 from __future__ import annotations
 
 import asyncio
+import logging
 import os
 import signal
 import sys
 import threading
 import time
+import warnings
 from collections import deque
-from typing import Deque
+from queue import SimpleQueue
+from typing import TYPE_CHECKING, Deque
 
 from sqlalchemy import func
 
 from airflow.configuration import conf
 from airflow.jobs.base_job import BaseJob
 from airflow.models.trigger import Trigger
+from airflow.settings import DONOT_MODIFY_HANDLERS
 from airflow.stats import Stats
 from airflow.triggers.base import BaseTrigger, TriggerEvent
 from airflow.typing_compat import TypedDict
+from airflow.utils.log.file_task_handler import FileTaskHandler
 from airflow.utils.log.logging_mixin import LoggingMixin
+from airflow.utils.log.trigger_handler import (
+    DropTriggerLogsFilter,
+    LocalQueueHandler,
+    TriggererHandlerWrapper,
+    TriggerMetadataFilter,
+    ctx_close_handler,
+    ctx_indiv_trigger,
+    ctx_task_instance,
+    ctx_trigger_id,
+)
 from airflow.utils.module_loading import import_string
 from airflow.utils.session import provide_session
 
+if TYPE_CHECKING:
+    from airflow.models import TaskInstance
+
+USING_TRIGGERER_HANDLER_WRAPPER = False
+"""
+If this value is true, trigger logging is configured to use 
TriggerHandlerWrapper
+
+:meta :private
+"""
+
+logger = logging.getLogger(__name__)
+
+
+def configure_trigger_log_handler():
+    """
+    Configure logging such that each trigger logs to its own file and
+    can be exposed through the airflow webserver.
+
+    Generally speaking, we take the log handler configured for logger 
``airflow.task``,
+    wrap it with TriggerHandlerWrapper, and set it as the handler for root 
logger.
+
+    If there already is a handler configured for the root logger
+    and it supports triggers, we wrap it instead.
+
+    :meta private:
+    """
+    global USING_TRIGGERER_HANDLER_WRAPPER
+
+    def supports_triggerer(handler):
+        return getattr(handler, "supports_triggerer", False)
+
+    def get_task_handler_from_logger(logger_):
+        for h in logger_.handlers:
+            if isinstance(h, FileTaskHandler) and not supports_triggerer(h):
+                warnings.warn(
+                    f"Handler {h.__class__.__name__} does not support "
+                    "individual trigger logging. Please check the release 
notes "
+                    "for your provider to see if a newer version supports "
+                    "individual trigger logging."
+                )
+            if supports_triggerer(h):
+                return h
+
+    def find_suitable_task_handler():
+        # check root logger then check airflow.task to see if a handler
+        # suitable for use with TriggerHandlerWrapper (has supports_triggerer
+        # attr, likely inherits from FileTaskHandler)
+        h = get_task_handler_from_logger(root_logger)
+        if not h:
+            # try to use handler configured from airflow task
+            logger.debug("No task logger configured for root logger; trying 
`airflow.task`.")
+            h = get_task_handler_from_logger(logging.getLogger("airflow.task"))
+            if h:
+                logger.debug("Using logging configuration from `airflow.task`")
+        if not h:
+            warnings.warn("Could not find log handler suitable for individual 
trigger logging.")
+            return None
+        return h
+
+    def filter_trigger_logs_from_other_root_handlers(new_hdlr):
+        # we add context vars to log records emitted for individual triggerer 
logging
+        # we want these records to be processed by our special trigger handler 
wrapper
+        # but not by any other handlers, so we filter out these messages from
+        # other handlers by adding DropTriggerLogsFilter
+        # we could consider only adding this filter to the default console 
logger
+        # so as to leave other custom handlers alone
+        for h in root_logger.handlers:
+            if h is not new_hdlr:
+                h.addFilter(DropTriggerLogsFilter())
+
+    def add_handler_wrapper_to_root(base_handler):
+        # first make sure we remove from root logger if it happens to be there
+        # it could have come from root or airflow.task, but we only need
+        # to make sure we remove from root, since messages will not flow
+        # through airflow.task
+        if base_handler in root_logger.handlers:
+            root_logger.removeHandler(base_handler)
+
+        h = TriggererHandlerWrapper(base_handler=base_handler, 
level=base_handler.level)
+        # just extra cautious, checking if user manually configured it there
+        if h not in root_logger.handlers:
+            root_logger.addHandler(h)
+        return h
+
+    root_logger = logging.getLogger()
+    task_handler = find_suitable_task_handler()
+    if not task_handler:
+        return None
+    if TYPE_CHECKING:
+        assert isinstance(task_handler, FileTaskHandler)
+    wrapper_handler = add_handler_wrapper_to_root(task_handler)
+    filter_trigger_logs_from_other_root_handlers(wrapper_handler)

Review Comment:
   > Another reason I went with separate tabs is, the handlers, in their reader 
capacity, support streaming, reading from offset. It gets a little weird to 
think about what that means and how that should work when you have two 
different streams/files. And then add in that those files could be local, 
served, or remote.
   
   Yeah. Very good point. I have not though about the "offset" - and I guess it 
it pretty much "logging-system-dependent" and might get wrong when you have two 
different processes (or even machines) wring in paralell - which indeed has not 
been the case so far.  So yeah. Agree separate tabs is likely better in this 
case.
   
   Though I also think (and that's more to @bbovenzi and @pierrejeambrun  - it 
would be great if we can have the main and triggerer log tab also side-by-side 
and ideally some way to link scrolling (maybe in the future ) that they could 
be scrolled through "together". This might get of course extremely tricky and 
maybe the scrolling part is not needed, but being able to see the task log and 
triggerer log at least side-by-side with timestamps so that they can be somehow 
- even manually-corelllated is I think a must. I think having to switch tabs 
back-forth to see where you deferred and what happened after is pretty terrible 
UX.
   
   Also I think still the "find and replace handler" if we can avoid it is way 
better (I will discuss on slack about it - maybe that is not possible/easy to 
avoid it but it looks pretty "fishy" to me). 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to