This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 33f0f1d639b Improve handling around stopping/killing individual dag 
file processors (#48019)
33f0f1d639b is described below

commit 33f0f1d639b53d4221757c8493b819684bd26d21
Author: Ash Berlin-Taylor <[email protected]>
AuthorDate: Thu Mar 20 17:11:13 2025 +0000

    Improve handling around stopping/killing individual dag file processors 
(#48019)
    
    We've had occasional reports of the following stack trace from the Dag
    processor:
    
    ```
        self.processor.terminate()
      File "/opt/airflow/airflow/dag_processing/manager.py", line 1020, in 
terminate
        processor.kill(signal.SIGTERM, escalation_delay=5.0)
      File 
"/opt/airflow/task-sdk/src/airflow/sdk/execution_time/supervisor.py", line 544, 
in kill
        exit_code := self._service_subprocess(max_wait_time=end - now, 
raise_on_timeout=False)
      File 
"/opt/airflow/task-sdk/src/airflow/sdk/execution_time/supervisor.py", line 601, 
in _service_subprocess
        need_more = socket_handler(key.fileobj)
      File 
"/opt/airflow/task-sdk/src/airflow/sdk/execution_time/supervisor.py", line 960, 
in cb
        gen.send(line)
    StopIteration
    ```
    
    This should now correctly handle this case.
    
    Another thing I noticed while testing this: the duration for
    in-process parsing processes was incorrect.
---
 airflow/dag_processing/manager.py                     |  2 +-
 task-sdk/src/airflow/sdk/execution_time/supervisor.py | 13 +++++++++----
 2 files changed, 10 insertions(+), 5 deletions(-)

diff --git a/airflow/dag_processing/manager.py 
b/airflow/dag_processing/manager.py
index b0795131d88..70829cd20ed 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -656,7 +656,7 @@ class DagFileProcessorManager(LoggingMixin):
 
         rows = []
         utcnow = timezone.utcnow()
-        now = time.monotonic()
+        now = time.time()
 
         for files in known_files.values():
             for file in files:
diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py 
b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
index 3af3fa9e8c0..2bc88563e1a 100644
--- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
+++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
@@ -640,7 +640,7 @@ class WatchedSubprocess:
         if self._exit_code is None:
             try:
                 self._exit_code = self._process.wait(timeout=0)
-                log.debug("Workload process exited", exit_code=self._exit_code)
+                log.debug("%s process exited", type(self).__name__, 
exit_code=self._exit_code)
                 self._close_unused_sockets(self.stdin)
             except psutil.TimeoutExpired:
                 if raise_on_timeout:
@@ -976,7 +976,7 @@ class ActivitySubprocess(WatchedSubprocess):
 # This returns a callback suitable for attaching to a `selector` that reads in 
to a buffer, and yields lines
 # to a (sync) generator
 def make_buffered_socket_reader(
-    gen: Generator[None, bytes, None],
+    gen: Generator[None, bytes | bytearray, None],
     on_close: Callable,
     buffer_size: int = 4096,
 ) -> Callable[[socket], bool]:
@@ -994,7 +994,8 @@ def make_buffered_socket_reader(
         if not n_received:
             # If no data is returned, the connection is closed. Return 
whatever is left in the buffer
             if len(buffer):
-                gen.send(buffer)
+                with suppress(StopIteration):
+                    gen.send(buffer)
             # Tell loop to close this selector
             on_close()
             return False
@@ -1004,7 +1005,11 @@ def make_buffered_socket_reader(
         # We could have read multiple lines in one go, yield them all
         while (newline_pos := buffer.find(b"\n")) != -1:
             line = buffer[: newline_pos + 1]
-            gen.send(line)
+            try:
+                gen.send(line)
+            except StopIteration:
+                on_close()
+                return False
             buffer = buffer[newline_pos + 1 :]  # Update the buffer with 
remaining data
 
         return True

Reply via email to