ashb commented on code in PR #45570:
URL: https://github.com/apache/airflow/pull/45570#discussion_r1911509809
##########
task_sdk/src/airflow/sdk/execution_time/supervisor.py:
##########
@@ -417,42 +393,28 @@ def _on_socket_closed(self):
# We want to keep servicing this process until we've read up to EOF from all the sockets.
self._num_open_sockets -= 1
+ def handle_requests(self, log: FilteringBoundLogger) -> Generator[None, bytes, None]:
+ """Handle incoming requests from the task process, respond with the appropriate data."""
+ while True:
+ line = yield
+
+ try:
+ msg = self.decoder.validate_json(line)
+ except Exception:
+ log.exception("Unable to decode message", line=line)
+ continue
+
+ self._handle_request(msg, log)
+
+ def _handle_request(self, msg, log: FilteringBoundLogger) -> None:
Review Comment:
`msg` here is not typed in the base class on purpose.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]