potiuk commented on code in PR #27758:
URL: https://github.com/apache/airflow/pull/27758#discussion_r1060424069
##########
airflow/jobs/triggerer_job.py:
##########
@@ -17,26 +17,175 @@
from __future__ import annotations
import asyncio
+import logging
import os
import signal
import sys
import threading
import time
+import warnings
from collections import deque
-from typing import Deque
+from queue import SimpleQueue
+from typing import TYPE_CHECKING, Deque
from sqlalchemy import func
from airflow.configuration import conf
from airflow.jobs.base_job import BaseJob
from airflow.models.trigger import Trigger
+from airflow.settings import DONOT_MODIFY_HANDLERS
from airflow.stats import Stats
from airflow.triggers.base import BaseTrigger, TriggerEvent
from airflow.typing_compat import TypedDict
+from airflow.utils.log.file_task_handler import FileTaskHandler
from airflow.utils.log.logging_mixin import LoggingMixin
+from airflow.utils.log.trigger_handler import (
+ DropTriggerLogsFilter,
+ LocalQueueHandler,
+ TriggererHandlerWrapper,
+ TriggerMetadataFilter,
+ ctx_close_handler,
+ ctx_indiv_trigger,
+ ctx_task_instance,
+ ctx_trigger_id,
+)
from airflow.utils.module_loading import import_string
from airflow.utils.session import provide_session
+if TYPE_CHECKING:
+ from airflow.models import TaskInstance
+
+USING_TRIGGERER_HANDLER_WRAPPER = False
+"""
+If this value is true, trigger logging is configured to use
TriggerHandlerWrapper
+
+:meta :private
+"""
+
+logger = logging.getLogger(__name__)
+
+
+def configure_trigger_log_handler():
+ """
+ Configure logging such that each trigger logs to its own file and
+ can be exposed through the airflow webserver.
+
+ Generally speaking, we take the log handler configured for logger
``airflow.task``,
+ wrap it with TriggerHandlerWrapper, and set it as the handler for root
logger.
+
+ If there already is a handler configured for the root logger
+ and it supports triggers, we wrap it instead.
+
+ :meta private:
+ """
+ global USING_TRIGGERER_HANDLER_WRAPPER
+
+ def supports_triggerer(handler):
+ return getattr(handler, "supports_triggerer", False)
+
+ def get_task_handler_from_logger(logger_):
+ for h in logger_.handlers:
+ if isinstance(h, FileTaskHandler) and not supports_triggerer(h):
+ warnings.warn(
+ f"Handler {h.__class__.__name__} does not support "
+ "individual trigger logging. Please check the release
notes "
+ "for your provider to see if a newer version supports "
+ "individual trigger logging."
+ )
+ if supports_triggerer(h):
+ return h
+
+ def find_suitable_task_handler():
+ # check root logger then check airflow.task to see if a handler
+ # suitable for use with TriggerHandlerWrapper (has supports_triggerer
+ # attr, likely inherits from FileTaskHandler)
+ h = get_task_handler_from_logger(root_logger)
+ if not h:
+ # try to use handler configured from airflow task
+ logger.debug("No task logger configured for root logger; trying
`airflow.task`.")
+ h = get_task_handler_from_logger(logging.getLogger("airflow.task"))
+ if h:
+ logger.debug("Using logging configuration from `airflow.task`")
+ if not h:
+ warnings.warn("Could not find log handler suitable for individual
trigger logging.")
+ return None
+ return h
+
+ def filter_trigger_logs_from_other_root_handlers(new_hdlr):
+ # we add context vars to log records emitted for individual triggerer
logging
+ # we want these records to be processed by our special trigger handler
wrapper
+ # but not by any other handlers, so we filter out these messages from
+ # other handlers by adding DropTriggerLogsFilter
+ # we could consider only adding this filter to the default console
logger
+ # so as to leave other custom handlers alone
+ for h in root_logger.handlers:
+ if h is not new_hdlr:
+ h.addFilter(DropTriggerLogsFilter())
+
+ def add_handler_wrapper_to_root(base_handler):
+ # first make sure we remove from root logger if it happens to be there
+ # it could have come from root or airflow.task, but we only need
+ # to make sure we remove from root, since messages will not flow
+ # through airflow.task
+ if base_handler in root_logger.handlers:
+ root_logger.removeHandler(base_handler)
+
+ h = TriggererHandlerWrapper(base_handler=base_handler,
level=base_handler.level)
+ # just extra cautious, checking if user manually configured it there
+ if h not in root_logger.handlers:
+ root_logger.addHandler(h)
+ return h
+
+ root_logger = logging.getLogger()
+ task_handler = find_suitable_task_handler()
+ if not task_handler:
+ return None
+ if TYPE_CHECKING:
+ assert isinstance(task_handler, FileTaskHandler)
+ wrapper_handler = add_handler_wrapper_to_root(task_handler)
+ filter_trigger_logs_from_other_root_handlers(wrapper_handler)
Review Comment:
Just posting here the **summary** of our (long) discussion with @dstandish
that we had on Slack - if only for the record, but also to track it and be able to
come back to the discussion in case we decide to improve the solution in the
future:
* the current approach is a relatively quick way to solve the painful
problem of missing trigger logs, so it's ok to go for it now and see if a more
scalable solution is needed
* it has some limitations that we should be aware of (and make our users
aware of) - the most important is that it creates multiple (1000s or even 10000s)
handlers in a single process - each handler having an open file or socket in
parallel might lead to exhaustion of sockets (especially in environments like
docker and k8s where sockets are actually a shared resource). Also, each handler
takes some memory (and, for example, streaming handlers might take more than a
"small" amount). Also, some streaming handlers might not be capable of running
multiple clients in the same process.
* this might - in effect - limit Triggerer capabilities in terms of the number
of parallel tasks handled - and those limits will be system/kernel/deployment
dependent. Or it might lead to excessive memory usage by the running Triggerer
* we need to extensively test the resource limits/memory consumption and
the capability of streaming handlers to run multiple clients in the same process
before releasing this one to see if these anticipated problems are "real".
* It's possible that even if we find the limits are there, we decide to
release it and state the limits - warn our users and provide them with escape hatches
(disabling this **individual** triggerer logging feature, proper diagnosis and
helpful advice when we hit the resource limits, advising the users to run
multiple triggerers etc.)
Also being constructive - I have a possible improvement proposal we can
implement later if we decide to merge this one, which I think is doable and takes
into account the way triggers work. I will describe it in a separate
comment/start a separate thread for that.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]