ramitkataria commented on code in PR #66379:
URL: https://github.com/apache/airflow/pull/66379#discussion_r3284948214
##########
task-sdk/src/airflow/sdk/execution_time/callback_supervisor.py:
##########
@@ -244,8 +244,22 @@ def wait(self) -> int:
self.selector.close()
self._exit_code = self._exit_code if self._exit_code is not None else 1
+
+ self._upload_logs()
+
return self._exit_code
+ def _upload_logs(self):
+ """Upload callback logs to remote storage if configured."""
+ from airflow.sdk.execution_time.supervisor import _remote_logging_conn
+ from airflow.sdk.log import upload_to_remote
+
+ try:
+ with _remote_logging_conn(self.client):
+ upload_to_remote(self.process_log)
+ except Exception:
+ log.exception("Failed to upload callback logs to remote storage")
Review Comment:
Nit: the `ActivitySubprocess` equivalent at `supervisor.py:1381` logs
structured context for debuggability:
`self.process_log.exception("Failed to upload remote logs", ti_id=self.id, pid=self.pid)`
It would be nice to include similar fields here: when multiple callbacks
run concurrently, the bare message won't tell you which one failed. Maybe
something like:
```suggestion
log.exception("Failed to upload callback logs to remote storage", callback_id=self.id, pid=self.pid)
```
##########
task-sdk/tests/task_sdk/execution_time/test_callback_supervisor.py:
##########
@@ -239,3 +239,70 @@ def test_handle_requests(
if client_mock:
mock_client_method.assert_called_once_with(*client_mock.args, **client_mock.kwargs)
+
+
+class TestConfigureLogging:
+ """Tests for _configure_logging remote logging connection setup."""
+
+ def test_configure_logging_uses_remote_logging_conn(self, tmp_path, mocker):
+ """Verify that _remote_logging_conn is invoked with the client during logging setup."""
+ from airflow.sdk.execution_time.callback_supervisor import _configure_logging
+
+ mock_client = mocker.Mock()
+ log_path = str(tmp_path / "callback.log")
+
+ mock_remote_conn = mocker.patch(
+ "airflow.sdk.execution_time.supervisor._remote_logging_conn",
+ )
+
+ logger, fd = _configure_logging(log_path, mock_client)
+ fd.close()
+
+ mock_remote_conn.assert_called_once_with(mock_client)
+
+
+class TestUploadLogs:
Review Comment:
Nit: these tests exercise `_upload_logs()` directly, which is great for
validating its internal behaviour, but it would be nice to also have a small
test verifying that `wait()` actually calls `_upload_logs()` after the
subprocess exits.
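
A rough sketch of the shape such a test could take. `StandInSupervisor` is a hypothetical stand-in that mirrors only the tail of `wait()` shown in the diff above; it is not the real class in `callback_supervisor.py`, and the actual test would construct the real supervisor with whatever fixtures this test module already uses:

```python
from unittest import mock


class StandInSupervisor:
    """Hypothetical stand-in: mirrors only the end of wait() from the diff."""

    def __init__(self):
        self._exit_code = None

    def wait(self) -> int:
        # Same order as the diff: default the exit code, upload, then return.
        self._exit_code = self._exit_code if self._exit_code is not None else 1
        self._upload_logs()
        return self._exit_code

    def _upload_logs(self):
        # Always patched out below; the real method uploads to remote storage.
        raise NotImplementedError


def test_wait_calls_upload_logs():
    proc = StandInSupervisor()
    proc._exit_code = 0  # pretend the subprocess exited cleanly

    # autospec=True keeps the method signature, so `self` is recorded as an argument.
    with mock.patch.object(StandInSupervisor, "_upload_logs", autospec=True) as upload:
        exit_code = proc.wait()

    upload.assert_called_once_with(proc)
    assert exit_code == 0
```

Patching `_upload_logs` on the class rather than the upload helpers keeps this test focused on the wiring in `wait()`; the existing `TestUploadLogs` cases already cover what the method does internally.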