seanghaeli commented on code in PR #66379:
URL: https://github.com/apache/airflow/pull/66379#discussion_r3319295744


##########
task-sdk/src/airflow/sdk/execution_time/callback_supervisor.py:
##########
@@ -242,8 +242,23 @@ def wait(self) -> int:
             self.selector.close()
 
         self._exit_code = self._exit_code if self._exit_code is not None else 1
+
+        # Upload logs to remote storage for UI visibility (same pattern as 
ActivitySubprocess)

Review Comment:
   Done — removed the redundant text from the docstring in the latest commit.



##########
task-sdk/src/airflow/sdk/execution_time/callback_supervisor.py:
##########
@@ -244,8 +244,22 @@ def wait(self) -> int:
             self.selector.close()
 
         self._exit_code = self._exit_code if self._exit_code is not None else 1
+
+        self._upload_logs()
+
         return self._exit_code
 
+    def _upload_logs(self):
+        """Upload callback logs to remote storage if configured."""
+        from airflow.sdk.execution_time.supervisor import _remote_logging_conn
+        from airflow.sdk.log import upload_to_remote
+
+        try:
+            with _remote_logging_conn(self.client):
+                upload_to_remote(self.process_log)
+        except Exception:
+            log.exception("Failed to upload callback logs to remote storage")

Review Comment:
   Fixed — now uses callback_id= to match the pattern in ActivitySubprocess.



##########
task-sdk/tests/task_sdk/execution_time/test_callback_supervisor.py:
##########
@@ -239,3 +239,70 @@ def test_handle_requests(
 
         if client_mock:
             mock_client_method.assert_called_once_with(*client_mock.args, 
**client_mock.kwargs)
+
+
+class TestConfigureLogging:
+    """Tests for _configure_logging remote logging connection setup."""
+
+    def test_configure_logging_uses_remote_logging_conn(self, tmp_path, 
mocker):
+        """Verify that _remote_logging_conn is invoked with the client during 
logging setup."""
+        from airflow.sdk.execution_time.callback_supervisor import 
_configure_logging
+
+        mock_client = mocker.Mock()
+        log_path = str(tmp_path / "callback.log")
+
+        mock_remote_conn = mocker.patch(
+            "airflow.sdk.execution_time.supervisor._remote_logging_conn",
+        )
+
+        logger, fd = _configure_logging(log_path, mock_client)
+        fd.close()
+
+        mock_remote_conn.assert_called_once_with(mock_client)
+
+
+class TestUploadLogs:

Review Comment:
   The test test_wait_calls_upload_logs_after_subprocess_completes already 
covers this — it patches _upload_logs and asserts it was called after wait() 
returns.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to