ephraimbuddy commented on code in PR #59372:
URL: https://github.com/apache/airflow/pull/59372#discussion_r2671587983


##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py:
##########
@@ -487,12 +487,15 @@ def consume_logs(*, since_time: DateTime | None = None) 
-> tuple[DateTime | None
                                 message_timestamp = line_timestamp
                                 progress_callback_lines.append(line)
                             else:  # previous log line is complete
-                                for line in progress_callback_lines:

Review Comment:
   I have debugged this, and the issue is variable shadowing. This line: 
https://github.com/apache/airflow/blob/acae180797b303fdeec1980796943c066c5b8d22/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py#L490
 iterates over `progress_callback_lines` using the variable `line`, which is 
already bound by the outer loop. Because the inner loop rebinds `line`, the later 
`progress_callback_lines = [line]` stores the last buffered line instead of the 
newly decoded one. The fix is to use a variable name other than `line` when 
iterating over `progress_callback_lines`. 
   Here is my solution. I also renamed `line` to `decoded_line` to better reflect 
what it holds, and I added a new test instead of editing the existing ones.
   
   ```diff
   diff --git 
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
 
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
   index c9b48158d9..d0f050b543 100644
   --- 
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
   +++ 
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
   @@ -479,18 +479,18 @@ class PodManager(LoggingMixin):
                    progress_callback_lines = []
                    try:
                        for raw_line in logs:
   -                        line = raw_line.decode("utf-8", 
errors="backslashreplace")
   -                        line_timestamp, message = parse_log_line(line)
   +                        decoded_line = raw_line.decode("utf-8", 
errors="backslashreplace")
   +                        line_timestamp, message = 
parse_log_line(decoded_line)
                            if line_timestamp:  # detect new log line
                                if message_to_log is None:  # first line in the 
log
                                    message_to_log = message
                                    message_timestamp = line_timestamp
   -                                progress_callback_lines.append(line)
   +                                progress_callback_lines.append(decoded_line)
                                else:  # previous log line is complete
   -                                for line in progress_callback_lines:
   +                                for callback_line in 
progress_callback_lines:
                                        for callback in self._callbacks:
                                            callback.progress_callback(
   -                                            line=line, client=self._client, 
mode=ExecutionMode.SYNC
   +                                            line=callback_line, 
client=self._client, mode=ExecutionMode.SYNC
                                            )
                                    if message_to_log is not None:
                                        self._log_message(
   @@ -502,16 +502,16 @@ class PodManager(LoggingMixin):
                                    last_captured_timestamp = message_timestamp
                                    message_to_log = message
                                    message_timestamp = line_timestamp
   -                                progress_callback_lines = [line]
   +                                progress_callback_lines = [decoded_line]
                            else:  # continuation of the previous log line
                                message_to_log = f"{message_to_log}\n{message}"
   -                            progress_callback_lines.append(line)
   +                            progress_callback_lines.append(decoded_line)
                    finally:
                        # log the last line and update the 
last_captured_timestamp
   -                    for line in progress_callback_lines:
   +                    for callback_line in progress_callback_lines:
                            for callback in self._callbacks:
                                callback.progress_callback(
   -                                line=line, client=self._client, 
mode=ExecutionMode.SYNC
   +                                line=callback_line, client=self._client, 
mode=ExecutionMode.SYNC
                                )
                        if message_to_log is not None:
                            self._log_message(
   diff --git 
a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py
 
b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py
   index 0622665400..b7f765c421 100644
   --- 
a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py
   +++ 
b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py
   @@ -153,6 +153,28 @@ class TestPodManager:
                callbacks=[MockKubernetesPodOperatorCallback],
            )
   
   +    
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.container_is_running")
   +    def 
test_fetch_container_logs_progress_callback_receives_each_line(self, 
mock_container_is_running):
   +        """Verify that progress callbacks receive each raw log line once."""
   +        MockWrapper.reset()
   +        pod = mock.MagicMock()
   +        pod.metadata = mock.MagicMock(name="pod-name", namespace="default")
   +
   +        mock_container_is_running.return_value = False
   +
   +        line_1 = "2025-12-23T10:33:22.125000004Z + for i in '{1..300}'\n"
   +        line_2 = "2025-12-23T10:33:23.000000000Z + echo 'Hi 1'\n"
   +        line_3 = "2025-12-23T10:33:24.000000000Z + sleep 1\n"
   +        self.pod_manager.read_pod_logs = mock.Mock(
   +            return_value=iter([line_1.encode(), line_2.encode(), 
line_3.encode()])
   +        )
   +        self.pod_manager.fetch_container_logs(pod=pod, 
container_name="base", follow=True)
   +
   +        called_lines = [
   +            kwargs["line"] for _args, kwargs in 
MockWrapper.mock_callbacks.progress_callback.call_args_list
   +        ]
   +        assert called_lines == [line_1, line_2, line_3]
   +
        def test_read_pod_logs_successfully_returns_logs(self):
            mock.sentinel.metadata = mock.MagicMock()
            self.mock_kube_client.read_namespaced_pod_log.return_value = 
mock.sentinel.logs
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to