This is an automated email from the ASF dual-hosted git repository.
tirkarthi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new aeec6efccad Close the logger's file descriptor once the
DagFileProcessorProcess is complete. (#47574)
aeec6efccad is described below
commit aeec6efccadea733a4c7f211e10927f9d9dcb40e
Author: Karthikeyan Singaravelan <[email protected]>
AuthorDate: Wed Mar 12 10:55:34 2025 +0530
Close the logger's file descriptor once the DagFileProcessorProcess is
complete. (#47574)
Close the logger's file descriptor once the DagFileProcessorProcess is
complete. This also closes the file descriptor when a processor is killed
for exceeding its timeout.
---
airflow/dag_processing/manager.py | 21 ++++++++++++++-------
airflow/dag_processing/processor.py | 3 ++-
tests/dag_processing/test_manager.py | 15 ++++++++++++---
tests/dag_processing/test_processor.py | 16 +++++++---------
4 files changed, 35 insertions(+), 20 deletions(-)
diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py
index 13e77795060..5000da8f006 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -458,8 +458,7 @@ class DagFileProcessorManager(LoggingMixin):
now_seconds = time.monotonic()
if now_seconds < next_check:
self.log.debug(
- "Not time to check if DAG Bundles need refreshed yet -
skipping. "
- "Next check in %.2f seconds",
+ "Not time to check if DAG Bundles need refreshed yet -
skipping. Next check in %.2f seconds",
next_check - now_seconds,
)
return
@@ -762,6 +761,7 @@ class DagFileProcessorManager(LoggingMixin):
self.log.warning("Stopping processor for %s", file)
Stats.decr("dag_processing.processes", tags={"file_path":
file, "action": "stop"})
processor.kill(signal.SIGKILL)
+ processor.logger_filehandle.close()
self._file_stats.pop(file, None)
@provide_session
@@ -786,7 +786,8 @@ class DagFileProcessorManager(LoggingMixin):
)
for file in finished:
- self._processors.pop(file)
+ processor = self._processors.pop(file)
+ processor.logger_filehandle.close()
def _get_log_dir(self) -> str:
return os.path.join(self.base_log_dir,
timezone.utcnow().strftime("%Y-%m-%d"))
@@ -829,14 +830,18 @@ class DagFileProcessorManager(LoggingMixin):
def _get_logger_for_dag_file(self, dag_file: DagFileInfo):
log_filename = self._render_log_filename(dag_file)
log_file = init_log_file(log_filename)
- underlying_logger = structlog.BytesLogger(log_file.open("ab"))
+ logger_filehandle = log_file.open("ab")
+ underlying_logger = structlog.BytesLogger(logger_filehandle)
processors = logging_processors(enable_pretty_log=False)[0]
- return structlog.wrap_logger(underlying_logger, processors=processors,
logger_name="processor").bind()
+ return structlog.wrap_logger(
+ underlying_logger, processors=processors, logger_name="processor"
+ ).bind(), logger_filehandle
def _create_process(self, dag_file: DagFileInfo) ->
DagFileProcessorProcess:
id = uuid7()
callback_to_execute_for_file = self._callback_to_execute.pop(dag_file,
[])
+ logger, logger_filehandle = self._get_logger_for_dag_file(dag_file)
return DagFileProcessorProcess.start(
id=id,
@@ -844,7 +849,8 @@ class DagFileProcessorManager(LoggingMixin):
bundle_path=cast(Path, dag_file.bundle_path),
callbacks=callback_to_execute_for_file,
selector=self.selector,
- logger=self._get_logger_for_dag_file(dag_file),
+ logger=logger,
+ logger_filehandle=logger_filehandle,
)
def _start_new_processes(self):
@@ -985,7 +991,8 @@ class DagFileProcessorManager(LoggingMixin):
# Clean up `self._processors` after iterating over it
for proc in processors_to_remove:
- self._processors.pop(proc)
+ processor = self._processors.pop(proc)
+ processor.logger_filehandle.close()
def _add_files_to_queue(self, files: list[DagFileInfo], add_at_front:
bool):
"""Add stuff to the back or front of the file queue, unless it's
already present."""
diff --git a/airflow/dag_processing/processor.py b/airflow/dag_processing/processor.py
index c5420d0b21f..889f9b232ce 100644
--- a/airflow/dag_processing/processor.py
+++ b/airflow/dag_processing/processor.py
@@ -21,7 +21,7 @@ import os
import sys
import traceback
from pathlib import Path
-from typing import TYPE_CHECKING, Annotated, Callable, ClassVar, Literal, Union
+from typing import TYPE_CHECKING, Annotated, BinaryIO, Callable, ClassVar,
Literal, Union
import attrs
from pydantic import BaseModel, Field, TypeAdapter
@@ -220,6 +220,7 @@ class DagFileProcessorProcess(WatchedSubprocess):
in core Airflow.
"""
+ logger_filehandle: BinaryIO
parsing_result: DagFileParsingResult | None = None
decoder: ClassVar[TypeAdapter[ToManager]] =
TypeAdapter[ToManager](ToManager)
diff --git a/tests/dag_processing/test_manager.py b/tests/dag_processing/test_manager.py
index 12ccf60b113..fb3f03bd9b8 100644
--- a/tests/dag_processing/test_manager.py
+++ b/tests/dag_processing/test_manager.py
@@ -134,6 +134,7 @@ class TestDagFileProcessorManager:
def mock_processor(self) -> DagFileProcessorProcess:
proc = MagicMock()
+ logger_filehandle = MagicMock()
proc.create_time.return_value = time.time()
proc.wait.return_value = 0
ret = DagFileProcessorProcess(
@@ -143,6 +144,7 @@ class TestDagFileProcessorManager:
process=proc,
stdin=io.BytesIO(),
requests_fd=123,
+ logger_filehandle=logger_filehandle,
)
ret._num_open_sockets = 0
return ret
@@ -502,6 +504,7 @@ class TestDagFileProcessorManager:
manager._kill_timed_out_processors()
mock_kill.assert_called_once_with(signal.SIGKILL)
assert len(manager._processors) == 0
+ processor.logger_filehandle.close.assert_called()
def test_kill_timed_out_processors_no_kill(self):
manager = DagFileProcessorManager(
@@ -774,7 +777,11 @@ class TestDagFileProcessorManager:
assert session.query(DbCallbackRequest).count() == 1
@mock.patch.object(DagFileProcessorManager, "_get_logger_for_dag_file")
- def test_callback_queue(self, mock_logger, configure_testing_dag_bundle):
+ def test_callback_queue(self, mock_get_logger,
configure_testing_dag_bundle):
+ mock_logger = MagicMock()
+ mock_filehandle = MagicMock()
+ mock_get_logger.return_value = [mock_logger, mock_filehandle]
+
tmp_path = "/green_eggs/ham"
with configure_testing_dag_bundle(tmp_path):
# given
@@ -855,7 +862,8 @@ class TestDagFileProcessorManager:
bundle_path=dag2_path.bundle_path,
callbacks=[dag2_req1],
selector=mock.ANY,
- logger=mock_logger.return_value,
+ logger=mock_logger,
+ logger_filehandle=mock_filehandle,
),
mock.call(
id=mock.ANY,
@@ -863,7 +871,8 @@ class TestDagFileProcessorManager:
bundle_path=dag1_path.bundle_path,
callbacks=[dag1_req1, dag1_req2],
selector=mock.ANY,
- logger=mock_logger.return_value,
+ logger=mock_logger,
+ logger_filehandle=mock_filehandle,
),
]
# And removed from the queue
diff --git a/tests/dag_processing/test_processor.py b/tests/dag_processing/test_processor.py
index 3be1d70d84d..e920f1c3311 100644
--- a/tests/dag_processing/test_processor.py
+++ b/tests/dag_processing/test_processor.py
@@ -23,7 +23,7 @@ import sys
import textwrap
from socket import socketpair
from typing import TYPE_CHECKING, Callable
-from unittest.mock import patch
+from unittest.mock import MagicMock, patch
import pytest
import structlog
@@ -133,6 +133,8 @@ class TestDagFileProcessor:
def test_top_level_variable_access(
self, spy_agency: SpyAgency, tmp_path: pathlib.Path, monkeypatch:
pytest.MonkeyPatch
):
+ logger_filehandle = MagicMock()
+
def dag_in_a_fn():
from airflow.sdk import DAG, Variable
@@ -143,10 +145,7 @@ class TestDagFileProcessor:
monkeypatch.setenv("AIRFLOW_VAR_MYVAR", "abc")
proc = DagFileProcessorProcess.start(
- id=1,
- path=path,
- bundle_path=tmp_path,
- callbacks=[],
+ id=1, path=path, bundle_path=tmp_path, callbacks=[],
logger_filehandle=logger_filehandle
)
while not proc.is_ready:
@@ -158,6 +157,8 @@ class TestDagFileProcessor:
assert result.serialized_dags[0].dag_id == "test_abc"
def test_top_level_connection_access(self, tmp_path: pathlib.Path,
monkeypatch: pytest.MonkeyPatch):
+ logger_filehandle = MagicMock()
+
def dag_in_a_fn():
from airflow.hooks.base import BaseHook
from airflow.sdk import DAG
@@ -169,10 +170,7 @@ class TestDagFileProcessor:
monkeypatch.setenv("AIRFLOW_CONN_MY_CONN", '{"conn_type": "aws"}')
proc = DagFileProcessorProcess.start(
- id=1,
- path=path,
- bundle_path=tmp_path,
- callbacks=[],
+ id=1, path=path, bundle_path=tmp_path, callbacks=[],
logger_filehandle=logger_filehandle
)
while not proc.is_ready: