This is an automated email from the ASF dual-hosted git repository.

tirkarthi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new aeec6efccad Close the logger's file descriptor once the DagFileProcessorProcess is complete. (#47574)
aeec6efccad is described below

commit aeec6efccadea733a4c7f211e10927f9d9dcb40e
Author: Karthikeyan Singaravelan <[email protected]>
AuthorDate: Wed Mar 12 10:55:34 2025 +0530

    Close the logger's file descriptor once the DagFileProcessorProcess is complete. (#47574)
    
    Close the logger's file descriptor once the DagFileProcessorProcess is complete. This also includes closing the file descriptor in case of timeouts.
---
 airflow/dag_processing/manager.py      | 21 ++++++++++++++-------
 airflow/dag_processing/processor.py    |  3 ++-
 tests/dag_processing/test_manager.py   | 15 ++++++++++++---
 tests/dag_processing/test_processor.py | 16 +++++++---------
 4 files changed, 35 insertions(+), 20 deletions(-)

diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py
index 13e77795060..5000da8f006 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -458,8 +458,7 @@ class DagFileProcessorManager(LoggingMixin):
         now_seconds = time.monotonic()
         if now_seconds < next_check:
             self.log.debug(
-                "Not time to check if DAG Bundles need refreshed yet - skipping. "
-                "Next check in %.2f seconds",
+                "Not time to check if DAG Bundles need refreshed yet - skipping. Next check in %.2f seconds",
                 next_check - now_seconds,
             )
             return
@@ -762,6 +761,7 @@ class DagFileProcessorManager(LoggingMixin):
                 self.log.warning("Stopping processor for %s", file)
                 Stats.decr("dag_processing.processes", tags={"file_path": file, "action": "stop"})
                 processor.kill(signal.SIGKILL)
+                processor.logger_filehandle.close()
                 self._file_stats.pop(file, None)
 
     @provide_session
@@ -786,7 +786,8 @@ class DagFileProcessorManager(LoggingMixin):
             )
 
         for file in finished:
-            self._processors.pop(file)
+            processor = self._processors.pop(file)
+            processor.logger_filehandle.close()
 
     def _get_log_dir(self) -> str:
         return os.path.join(self.base_log_dir, timezone.utcnow().strftime("%Y-%m-%d"))
@@ -829,14 +830,18 @@ class DagFileProcessorManager(LoggingMixin):
     def _get_logger_for_dag_file(self, dag_file: DagFileInfo):
         log_filename = self._render_log_filename(dag_file)
         log_file = init_log_file(log_filename)
-        underlying_logger = structlog.BytesLogger(log_file.open("ab"))
+        logger_filehandle = log_file.open("ab")
+        underlying_logger = structlog.BytesLogger(logger_filehandle)
         processors = logging_processors(enable_pretty_log=False)[0]
-        return structlog.wrap_logger(underlying_logger, processors=processors, logger_name="processor").bind()
+        return structlog.wrap_logger(
+            underlying_logger, processors=processors, logger_name="processor"
+        ).bind(), logger_filehandle
 
     def _create_process(self, dag_file: DagFileInfo) -> DagFileProcessorProcess:
         id = uuid7()
 
         callback_to_execute_for_file = self._callback_to_execute.pop(dag_file, [])
+        logger, logger_filehandle = self._get_logger_for_dag_file(dag_file)
 
         return DagFileProcessorProcess.start(
             id=id,
@@ -844,7 +849,8 @@ class DagFileProcessorManager(LoggingMixin):
             bundle_path=cast(Path, dag_file.bundle_path),
             callbacks=callback_to_execute_for_file,
             selector=self.selector,
-            logger=self._get_logger_for_dag_file(dag_file),
+            logger=logger,
+            logger_filehandle=logger_filehandle,
         )
 
     def _start_new_processes(self):
@@ -985,7 +991,8 @@ class DagFileProcessorManager(LoggingMixin):
 
         # Clean up `self._processors` after iterating over it
         for proc in processors_to_remove:
-            self._processors.pop(proc)
+            processor = self._processors.pop(proc)
+            processor.logger_filehandle.close()
 
     def _add_files_to_queue(self, files: list[DagFileInfo], add_at_front: bool):
         """Add stuff to the back or front of the file queue, unless it's already present."""
diff --git a/airflow/dag_processing/processor.py b/airflow/dag_processing/processor.py
index c5420d0b21f..889f9b232ce 100644
--- a/airflow/dag_processing/processor.py
+++ b/airflow/dag_processing/processor.py
@@ -21,7 +21,7 @@ import os
 import sys
 import traceback
 from pathlib import Path
-from typing import TYPE_CHECKING, Annotated, Callable, ClassVar, Literal, Union
+from typing import TYPE_CHECKING, Annotated, BinaryIO, Callable, ClassVar, Literal, Union
 
 import attrs
 from pydantic import BaseModel, Field, TypeAdapter
@@ -220,6 +220,7 @@ class DagFileProcessorProcess(WatchedSubprocess):
     in core Airflow.
     """
 
+    logger_filehandle: BinaryIO
     parsing_result: DagFileParsingResult | None = None
     decoder: ClassVar[TypeAdapter[ToManager]] = TypeAdapter[ToManager](ToManager)
 
diff --git a/tests/dag_processing/test_manager.py b/tests/dag_processing/test_manager.py
index 12ccf60b113..fb3f03bd9b8 100644
--- a/tests/dag_processing/test_manager.py
+++ b/tests/dag_processing/test_manager.py
@@ -134,6 +134,7 @@ class TestDagFileProcessorManager:
 
     def mock_processor(self) -> DagFileProcessorProcess:
         proc = MagicMock()
+        logger_filehandle = MagicMock()
         proc.create_time.return_value = time.time()
         proc.wait.return_value = 0
         ret = DagFileProcessorProcess(
@@ -143,6 +144,7 @@ class TestDagFileProcessorManager:
             process=proc,
             stdin=io.BytesIO(),
             requests_fd=123,
+            logger_filehandle=logger_filehandle,
         )
         ret._num_open_sockets = 0
         return ret
@@ -502,6 +504,7 @@ class TestDagFileProcessorManager:
             manager._kill_timed_out_processors()
         mock_kill.assert_called_once_with(signal.SIGKILL)
         assert len(manager._processors) == 0
+        processor.logger_filehandle.close.assert_called()
 
     def test_kill_timed_out_processors_no_kill(self):
         manager = DagFileProcessorManager(
@@ -774,7 +777,11 @@ class TestDagFileProcessorManager:
                 assert session.query(DbCallbackRequest).count() == 1
 
     @mock.patch.object(DagFileProcessorManager, "_get_logger_for_dag_file")
-    def test_callback_queue(self, mock_logger, configure_testing_dag_bundle):
+    def test_callback_queue(self, mock_get_logger, configure_testing_dag_bundle):
+        mock_logger = MagicMock()
+        mock_filehandle = MagicMock()
+        mock_get_logger.return_value = [mock_logger, mock_filehandle]
+
         tmp_path = "/green_eggs/ham"
         with configure_testing_dag_bundle(tmp_path):
             # given
@@ -855,7 +862,8 @@ class TestDagFileProcessorManager:
                     bundle_path=dag2_path.bundle_path,
                     callbacks=[dag2_req1],
                     selector=mock.ANY,
-                    logger=mock_logger.return_value,
+                    logger=mock_logger,
+                    logger_filehandle=mock_filehandle,
                 ),
                 mock.call(
                     id=mock.ANY,
@@ -863,7 +871,8 @@ class TestDagFileProcessorManager:
                     bundle_path=dag1_path.bundle_path,
                     callbacks=[dag1_req1, dag1_req2],
                     selector=mock.ANY,
-                    logger=mock_logger.return_value,
+                    logger=mock_logger,
+                    logger_filehandle=mock_filehandle,
                 ),
             ]
             # And removed from the queue
diff --git a/tests/dag_processing/test_processor.py b/tests/dag_processing/test_processor.py
index 3be1d70d84d..e920f1c3311 100644
--- a/tests/dag_processing/test_processor.py
+++ b/tests/dag_processing/test_processor.py
@@ -23,7 +23,7 @@ import sys
 import textwrap
 from socket import socketpair
 from typing import TYPE_CHECKING, Callable
-from unittest.mock import patch
+from unittest.mock import MagicMock, patch
 
 import pytest
 import structlog
@@ -133,6 +133,8 @@ class TestDagFileProcessor:
     def test_top_level_variable_access(
         self, spy_agency: SpyAgency, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch
     ):
+        logger_filehandle = MagicMock()
+
         def dag_in_a_fn():
             from airflow.sdk import DAG, Variable
 
@@ -143,10 +145,7 @@ class TestDagFileProcessor:
 
         monkeypatch.setenv("AIRFLOW_VAR_MYVAR", "abc")
         proc = DagFileProcessorProcess.start(
-            id=1,
-            path=path,
-            bundle_path=tmp_path,
-            callbacks=[],
+            id=1, path=path, bundle_path=tmp_path, callbacks=[], logger_filehandle=logger_filehandle
         )
 
         while not proc.is_ready:
@@ -158,6 +157,8 @@ class TestDagFileProcessor:
         assert result.serialized_dags[0].dag_id == "test_abc"
 
     def test_top_level_connection_access(self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch):
+        logger_filehandle = MagicMock()
+
         def dag_in_a_fn():
             from airflow.hooks.base import BaseHook
             from airflow.sdk import DAG
@@ -169,10 +170,7 @@ class TestDagFileProcessor:
 
         monkeypatch.setenv("AIRFLOW_CONN_MY_CONN", '{"conn_type": "aws"}')
         proc = DagFileProcessorProcess.start(
-            id=1,
-            path=path,
-            bundle_path=tmp_path,
-            callbacks=[],
+            id=1, path=path, bundle_path=tmp_path, callbacks=[], logger_filehandle=logger_filehandle
         )
 
         while not proc.is_ready:

Reply via email to