amoghrajesh commented on code in PR #62103:
URL: https://github.com/apache/airflow/pull/62103#discussion_r2828166394
##########
airflow-core/src/airflow/jobs/triggerer_job_runner.py:
##########
@@ -319,6 +318,17 @@ def __call__(self, processors:
Iterable[structlog.typing.Processor]) -> WrappedL
self.bound_logger = logger
return logger
+ def __del__(self):
Review Comment:
The main concern I have with using `__del__` is that if an exception
occurs during cleanup, it will be silently swallowed
##########
airflow-core/src/airflow/jobs/triggerer_job_runner.py:
##########
@@ -319,6 +318,17 @@ def __call__(self, processors:
Iterable[structlog.typing.Processor]) -> WrappedL
self.bound_logger = logger
return logger
+ def __del__(self):
+ # Explicitly close the file descriptor when the logger is garbage
collected.
+ if raw_logger := getattr(self.bound_logger, "_logger", None):
+ file_handle = getattr(raw_logger, "_file", None)
+ else:
+ return
+
+ if file_handle and not file_handle.closed:
+ file_handle.flush()
+ file_handle.close()
Review Comment:
Can we handle it similar to how it is done for DAG processor:
https://github.com/apache/airflow/pull/47574
In short something like this, where we store the file handle and close it
explicitly, to avoid divergence from that approach?
```diff
diff --git a/airflow-core/src/airflow/jobs/triggerer_job_runner.py
b/airflow-core/src/airflow/jobs/triggerer_job_runner.py
--- a/airflow-core/src/airflow/jobs/triggerer_job_runner.py (revision
10cd08dff8916b93f8c3f94bc34265bb7544fde4)
+++ b/airflow-core/src/airflow/jobs/triggerer_job_runner.py (date
1771510381992)
@@ -30,7 +30,7 @@
from datetime import datetime
from socket import socket
from traceback import format_exception
-from typing import TYPE_CHECKING, Annotated, Any, ClassVar, Literal,
TypedDict
+from typing import IO, TYPE_CHECKING, Annotated, Any, ClassVar, Literal,
TypedDict
import anyio
import attrs
@@ -302,6 +302,8 @@
bound_logger: WrappedLogger = attrs.field(init=False, repr=False)
+ _filehandle: IO[Any] = attrs.field(init=False, repr=False)
+
def __call__(self, processors: Iterable[structlog.typing.Processor]) ->
WrappedLogger:
if hasattr(self, "bound_logger"):
return self.bound_logger
@@ -312,13 +314,20 @@
pretty_logs = False
if pretty_logs:
- underlying_logger: WrappedLogger =
structlog.WriteLogger(log_file.open("w", buffering=1))
+ self._filehandle = log_file.open("w", buffering=1)
+ underlying_logger: WrappedLogger =
structlog.WriteLogger(self._filehandle)
else:
- underlying_logger = structlog.BytesLogger(log_file.open("wb"))
+ self._filehandle = log_file.open("wb")
+ underlying_logger = structlog.BytesLogger(self._filehandle)
logger = structlog.wrap_logger(underlying_logger,
processors=processors).bind()
self.bound_logger = logger
return logger
+ def close(self):
+ """Explicitly close the underlying log file handle."""
+ if hasattr(self, "_filehandle"):
+ self._filehandle.close()
+
def upload_to_remote(self):
from airflow.sdk.log import upload_to_remote
@@ -421,10 +430,9 @@
for id in msg.finished or ():
self.running_triggers.discard(id)
self.cancelling_triggers.discard(id)
# Remove logger from the cache, and since structlog doesn't
have an explicit close method, we
# only need to remove the last reference to it to close the
open FH
if factory := self.logger_cache.pop(id, None):
factory.upload_to_remote()
+ factory.close()
response = messages.TriggerStateSync(
to_create=[],
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]