potiuk commented on code in PR #27758:
URL: https://github.com/apache/airflow/pull/27758#discussion_r1059854137


##########
airflow/jobs/triggerer_job.py:
##########
@@ -17,26 +17,175 @@
 from __future__ import annotations
 
 import asyncio
+import logging
 import os
 import signal
 import sys
 import threading
 import time
+import warnings
 from collections import deque
-from typing import Deque
+from queue import SimpleQueue
+from typing import TYPE_CHECKING, Deque
 
 from sqlalchemy import func
 
 from airflow.configuration import conf
 from airflow.jobs.base_job import BaseJob
 from airflow.models.trigger import Trigger
+from airflow.settings import DONOT_MODIFY_HANDLERS
 from airflow.stats import Stats
 from airflow.triggers.base import BaseTrigger, TriggerEvent
 from airflow.typing_compat import TypedDict
+from airflow.utils.log.file_task_handler import FileTaskHandler
 from airflow.utils.log.logging_mixin import LoggingMixin
+from airflow.utils.log.trigger_handler import (
+    DropTriggerLogsFilter,
+    LocalQueueHandler,
+    TriggererHandlerWrapper,
+    TriggerMetadataFilter,
+    ctx_close_handler,
+    ctx_indiv_trigger,
+    ctx_task_instance,
+    ctx_trigger_id,
+)
 from airflow.utils.module_loading import import_string
 from airflow.utils.session import provide_session
 
+if TYPE_CHECKING:
+    from airflow.models import TaskInstance
+
+USING_TRIGGERER_HANDLER_WRAPPER = False
+"""
+If this value is true, trigger logging is configured to use TriggererHandlerWrapper
+
+:meta private:
+"""
+
+logger = logging.getLogger(__name__)
+
+
+def configure_trigger_log_handler():
+    """
+    Configure logging such that each trigger logs to its own file and
+    can be exposed through the airflow webserver.
+
+    Generally speaking, we take the log handler configured for logger ``airflow.task``,
+    wrap it with TriggererHandlerWrapper, and set it as the handler for the root logger.
+
+    If there already is a handler configured for the root logger
+    and it supports triggers, we wrap it instead.
+
+    :meta private:
+    """
+    global USING_TRIGGERER_HANDLER_WRAPPER
+
+    def supports_triggerer(handler):
+        return getattr(handler, "supports_triggerer", False)
+
+    def get_task_handler_from_logger(logger_):
+        for h in logger_.handlers:
+            if isinstance(h, FileTaskHandler) and not supports_triggerer(h):
+                warnings.warn(
+                    f"Handler {h.__class__.__name__} does not support "
+                    "individual trigger logging. Please check the release 
notes "
+                    "for your provider to see if a newer version supports "
+                    "individual trigger logging."
+                )
+            if supports_triggerer(h):
+                return h
+
+    def find_suitable_task_handler():
+        # check the root logger, then airflow.task, for a handler suitable
+        # for use with TriggererHandlerWrapper (has a supports_triggerer
+        # attr, likely inherits from FileTaskHandler)
+        h = get_task_handler_from_logger(root_logger)
+        if not h:
+            # try to use handler configured from airflow task
+            logger.debug("No task logger configured for root logger; trying `airflow.task`.")
+            h = get_task_handler_from_logger(logging.getLogger("airflow.task"))
+            if h:
+                logger.debug("Using logging configuration from `airflow.task`")
+        if not h:
+            warnings.warn("Could not find log handler suitable for individual trigger logging.")
+            return None
+        return h
+
+    def filter_trigger_logs_from_other_root_handlers(new_hdlr):
+        # we add context vars to log records emitted for individual triggerer logging
+        # we want these records to be processed by our special trigger handler wrapper
+        # but not by any other handlers, so we filter out these messages from
+        # other handlers by adding DropTriggerLogsFilter
+        # we could consider only adding this filter to the default console logger
+        # so as to leave other custom handlers alone
+        for h in root_logger.handlers:
+            if h is not new_hdlr:
+                h.addFilter(DropTriggerLogsFilter())
+
+    def add_handler_wrapper_to_root(base_handler):
+        # first make sure we remove from root logger if it happens to be there
+        # it could have come from root or airflow.task, but we only need
+        # to make sure we remove from root, since messages will not flow
+        # through airflow.task
+        if base_handler in root_logger.handlers:
+            root_logger.removeHandler(base_handler)
+
+        h = TriggererHandlerWrapper(base_handler=base_handler, level=base_handler.level)
+        # just extra cautious, checking if user manually configured it there
+        if h not in root_logger.handlers:
+            root_logger.addHandler(h)
+        return h
+
+    root_logger = logging.getLogger()
+    task_handler = find_suitable_task_handler()
+    if not task_handler:
+        return None
+    if TYPE_CHECKING:
+        assert isinstance(task_handler, FileTaskHandler)
+    wrapper_handler = add_handler_wrapper_to_root(task_handler)
+    filter_trigger_logs_from_other_root_handlers(wrapper_handler)

Review Comment:
   It's quite clear. BTW. This is an interesting problem to solve with blocking I/O in log handlers.
   
   However, I just wonder if this is really the kind of complexity we need here. And yes, I am sorry for being late to the game here, as I had other stuff to take a look at, so maybe I do not understand something and am missing a crucial piece of information. Apologies if so, but I think we can do this slightly more simply, and in a way that gives us full support for any (even completely custom) streaming log handlers from day one.
   
   What would be nice there is that the "Trigger logs" tab would only be needed if you do not use remote logging, or when your remote logging does not support streaming during task execution (like S3). With streaming loggers instead, we would get the trigger logs interleaved with the regular task logs - which is much better, because then you see them in one "stream" (barring occasional races, which are inevitable in a distributed system - but with proper time synchronisation between nodes these should be limited to whatever precision the logging solution supports - ms/us etc.).
   
   I think for all cases where streaming logs **just work** during code execution, we would not have to do anything - they would be handled out-of-the-box. It would also utilize most of the other code in this PR - just setting up the handlers would be simpler, and it would support all streaming handlers out of the box with 0 modifications to them.
   
   The approach I think we could use in this case is as follows:
   
   1) Let all trigger code use the "airflow.trigger" logger - which would always have a single handler that only does non-blocking calls - forwarding the logs to an in-memory queue, including the context of the task instance that is the "cause" of the log (a rough sketch of (1) and (2) follows after point 3).
   
   2) Have a separate, non-asyncio process that reads from the in-memory queue and .... logs the records to the "airflow.task" logger - pretending to be a special "worker". It could also serve the logs in the very same way the other workers do (and handle the Trigger logs tab). No manipulation of handlers is needed in this case. This separate process would have just the same set of loggers as any other Airflow process. The only difference is that it would have to pass the context to the "set_context" method of the configured logger before writing a log entry.
   
   3) Either now or in the future, this approach could be optimized quite a bit so that the context does not switch too frequently (this is the main problem I see when one triggerer handles multiple tasks). For example, we could allow N parallel threads (or processes) to write logs - each with its own queue chosen by a hash of the dag_id, task_id, run_id. Then each such thread/process could do its own micro-batching for writing the logs (with a slight delay) and sort them by dag_id, task_id, run_id before writing, so that multiple log entries for the same task run are written together in a batch with just a single "set_context" call.
   
   The benefit of doing it this way is that all (including custom) streaming handlers will be handled out-of-the-box from day 0 - and neither the users nor we will have to make a single change to those handlers to make them work nicely - including the fact that the trigger log entries will be nicely interleaved with the "regular" task logs in the case of streaming solutions, which is exactly what our users expect.
   
   I believe that should be about it. No manipulation of handlers, no custom code for streaming handlers, we can implement the optimisation for context switching, and the same code handles local logs and serves them as it does for the streaming solutions.


