This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 33f0f1d639b Improve handling around stopping/killing individual dag
file processors (#48019)
33f0f1d639b is described below
commit 33f0f1d639b53d4221757c8493b819684bd26d21
Author: Ash Berlin-Taylor <[email protected]>
AuthorDate: Thu Mar 20 17:11:13 2025 +0000
Improve handling around stopping/killing individual dag file processors
(#48019)
We've had occasional reports of the following stack trace from the Dag
processor:
```
self.processor.terminate()
File "/opt/airflow/airflow/dag_processing/manager.py", line 1020, in
terminate
processor.kill(signal.SIGTERM, escalation_delay=5.0)
File
"/opt/airflow/task-sdk/src/airflow/sdk/execution_time/supervisor.py", line 544,
in kill
exit_code := self._service_subprocess(max_wait_time=end - now,
raise_on_timeout=False)
File
"/opt/airflow/task-sdk/src/airflow/sdk/execution_time/supervisor.py", line 601,
in _service_subprocess
need_more = socket_handler(key.fileobj)
File
"/opt/airflow/task-sdk/src/airflow/sdk/execution_time/supervisor.py", line 960,
in cb
gen.send(line)
StopIteration
```
This should now correctly handle this case.
Also, while testing this, I noticed that the duration reported for
in-process parsing was incorrect.
---
airflow/dag_processing/manager.py | 2 +-
task-sdk/src/airflow/sdk/execution_time/supervisor.py | 13 +++++++++----
2 files changed, 10 insertions(+), 5 deletions(-)
diff --git a/airflow/dag_processing/manager.py
b/airflow/dag_processing/manager.py
index b0795131d88..70829cd20ed 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -656,7 +656,7 @@ class DagFileProcessorManager(LoggingMixin):
rows = []
utcnow = timezone.utcnow()
- now = time.monotonic()
+ now = time.time()
for files in known_files.values():
for file in files:
diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
index 3af3fa9e8c0..2bc88563e1a 100644
--- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
+++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
@@ -640,7 +640,7 @@ class WatchedSubprocess:
if self._exit_code is None:
try:
self._exit_code = self._process.wait(timeout=0)
- log.debug("Workload process exited", exit_code=self._exit_code)
+ log.debug("%s process exited", type(self).__name__,
exit_code=self._exit_code)
self._close_unused_sockets(self.stdin)
except psutil.TimeoutExpired:
if raise_on_timeout:
@@ -976,7 +976,7 @@ class ActivitySubprocess(WatchedSubprocess):
# This returns a callback suitable for attaching to a `selector` that reads in
to a buffer, and yields lines
# to a (sync) generator
def make_buffered_socket_reader(
- gen: Generator[None, bytes, None],
+ gen: Generator[None, bytes | bytearray, None],
on_close: Callable,
buffer_size: int = 4096,
) -> Callable[[socket], bool]:
@@ -994,7 +994,8 @@ def make_buffered_socket_reader(
if not n_received:
# If no data is returned, the connection is closed. Return
whatever is left in the buffer
if len(buffer):
- gen.send(buffer)
+ with suppress(StopIteration):
+ gen.send(buffer)
# Tell loop to close this selector
on_close()
return False
@@ -1004,7 +1005,11 @@ def make_buffered_socket_reader(
# We could have read multiple lines in one go, yield them all
while (newline_pos := buffer.find(b"\n")) != -1:
line = buffer[: newline_pos + 1]
- gen.send(line)
+ try:
+ gen.send(line)
+ except StopIteration:
+ on_close()
+ return False
buffer = buffer[newline_pos + 1 :] # Update the buffer with
remaining data
return True