ashb commented on code in PR #45570:
URL: https://github.com/apache/airflow/pull/45570#discussion_r1911520136
##########
task_sdk/src/airflow/sdk/execution_time/supervisor.py:
##########
@@ -510,6 +472,141 @@ def kill(
log.error("Failed to terminate process after full escalation",
pid=self.pid)
+ def wait(self) -> int:
+ raise NotImplementedError()
+
+ def __rich_repr__(self):
+ yield "id", self.id
+ yield "pid", self.pid
+ # only include this if it's not the default (third argument)
+ yield "exit_code", self._exit_code, None
+
+ __rich_repr__.angular = True # type: ignore[attr-defined]
+
+ def __repr__(self) -> str:
+ rep = f"<{type(self).__name__} id={self.id} pid={self.pid}"
+ if self._exit_code is not None:
+ rep += f" exit_code={self._exit_code}"
+ return rep + " >"
+
+ def _service_subprocess(self, max_wait_time: float, raise_on_timeout: bool
= False):
+ """
+ Service subprocess events by processing socket activity and checking
for process exit.
+
+ This method:
+ - Waits for activity on the registered file objects (via
`self.selector.select`).
+ - Processes any events triggered on these file objects.
+ - Checks if the subprocess has exited during the wait.
+
+ :param max_wait_time: Maximum time to block while waiting for events,
in seconds.
+ :param raise_on_timeout: If True, raise an exception if the subprocess
does not exit within the timeout.
+ :returns: The process exit code, or None if it's still alive
+ """
+ events = self.selector.select(timeout=max_wait_time)
+ for key, _ in events:
+ # Retrieve the handler responsible for processing this file object
(e.g., stdout, stderr)
+ socket_handler = key.data
+
+ # Example of handler behavior:
+ # If the subprocess writes "Hello, World!" to stdout:
+ # - `socket_handler` reads and processes the message.
+ # - If EOF is reached, the handler returns False to signal no more
reads are expected.
+ need_more = socket_handler(key.fileobj)
+
+ # If the handler signals that the file object is no longer needed
(EOF, closed, etc.)
+ # unregister it from the selector to stop monitoring; `wait()`
blocks until all selectors
+ # are removed.
+ if not need_more:
+ self.selector.unregister(key.fileobj)
+ key.fileobj.close() # type: ignore[union-attr]
+
+ # Check if the subprocess has exited
+ return self._check_subprocess_exit(raise_on_timeout=raise_on_timeout)
+
+ def _check_subprocess_exit(self, raise_on_timeout: bool = False) -> int |
None:
+ """Check if the subprocess has exited."""
+ if self._exit_code is None:
+ try:
+ self._exit_code = self._process.wait(timeout=0)
+ log.debug("Workload process exited", exit_code=self._exit_code)
+ except psutil.TimeoutExpired:
+ if raise_on_timeout:
+ raise
+ return self._exit_code
+
+
+def _fake_required():
+ raise TypeError("ActivitySubprocess.__init__() missing 1 required keyword
argument: 'client'")
Review Comment:
See https://github.com/apache/airflow/pull/45570#discussion_r1911513259 -- but
I've just worked out a way to not have it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]