This is an automated email from the ASF dual-hosted git repository.
jedcunningham pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new e24f0a975d Resolve handler close race condition at triggerer shutdown
(#37206)
e24f0a975d is described below
commit e24f0a975d070cee4b376da7d86fe22d6b02502c
Author: Daniel Standish <[email protected]>
AuthorDate: Tue Feb 6 15:11:01 2024 -0800
Resolve handler close race condition at triggerer shutdown (#37206)
At shutdown, a trigger may emit a "close" signal around the same time as
logging.shutDown is called, so the handler may have been removed from the
handlers by one thread when the other tries to delete it.
---
airflow/utils/log/trigger_handler.py | 8 ++++----
1 file changed, 4 insertions(+), 4 deletions(-)
diff --git a/airflow/utils/log/trigger_handler.py
b/airflow/utils/log/trigger_handler.py
index aa9a43ec87..02af5f306f 100644
--- a/airflow/utils/log/trigger_handler.py
+++ b/airflow/utils/log/trigger_handler.py
@@ -19,6 +19,7 @@ from __future__ import annotations
import asyncio
import logging
+from contextlib import suppress
from contextvars import ContextVar
from copy import copy
from logging.handlers import QueueHandler
@@ -110,7 +111,8 @@ class TriggererHandlerWrapper(logging.Handler):
h = self.handlers.get(trigger_id)
if h:
h.close()
- del self.handlers[trigger_id]
+ with suppress(KeyError): # race condition between `handle` and
`close`
+ del self.handlers[trigger_id]
def flush(self):
for h in self.handlers.values():
@@ -118,9 +120,7 @@ class TriggererHandlerWrapper(logging.Handler):
def close(self):
for trigger_id in list(self.handlers.keys()):
- h = self.handlers[trigger_id]
- h.close()
- del self.handlers[trigger_id]
+ self.close_one(trigger_id)
class LocalQueueHandler(QueueHandler):