This is an automated email from the ASF dual-hosted git repository. ephraimanierobi pushed a commit to branch v2-8-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit defaaefbc61187588de9aa95787178ddeb90ad54 Author: Daniel Standish <[email protected]> AuthorDate: Tue Feb 6 15:11:01 2024 -0800 Resolve handler close race condition at triggerer shutdown (#37206) At shutdown, a trigger may emit a "close" signal around the same time as logging.shutDown is called, so the handler may have been removed from the handlers by one thread when the other tries to delete it. (cherry picked from commit e24f0a975d070cee4b376da7d86fe22d6b02502c) --- airflow/utils/log/trigger_handler.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/airflow/utils/log/trigger_handler.py b/airflow/utils/log/trigger_handler.py index aa9a43ec87..02af5f306f 100644 --- a/airflow/utils/log/trigger_handler.py +++ b/airflow/utils/log/trigger_handler.py @@ -19,6 +19,7 @@ from __future__ import annotations import asyncio import logging +from contextlib import suppress from contextvars import ContextVar from copy import copy from logging.handlers import QueueHandler @@ -110,7 +111,8 @@ class TriggererHandlerWrapper(logging.Handler): h = self.handlers.get(trigger_id) if h: h.close() - del self.handlers[trigger_id] + with suppress(KeyError): # race condition between `handle` and `close` + del self.handlers[trigger_id] def flush(self): for h in self.handlers.values(): @@ -118,9 +120,7 @@ class TriggererHandlerWrapper(logging.Handler): def close(self): for trigger_id in list(self.handlers.keys()): - h = self.handlers[trigger_id] - h.close() - del self.handlers[trigger_id] + self.close_one(trigger_id) class LocalQueueHandler(QueueHandler):
