ephraimbuddy commented on code in PR #59372:
URL: https://github.com/apache/airflow/pull/59372#discussion_r2671587983
##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py:
##########
@@ -487,12 +487,15 @@ def consume_logs(*, since_time: DateTime | None = None)
-> tuple[DateTime | None
message_timestamp = line_timestamp
progress_callback_lines.append(line)
else: # previous log line is complete
- for line in progress_callback_lines:
Review Comment:
I have debugged this, and the issue is variable shadowing. This line:
https://github.com/apache/airflow/blob/acae180797b303fdeec1980796943c066c5b8d22/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py#L490
iterates over `progress_callback_lines` using the variable `line`, which is
already defined in the outer scope (it holds the current decoded log line).
After that inner loop finishes, `line` refers to the last element of the
previously buffered lines. The subsequent `progress_callback_lines = [line]`
therefore re-seeds the buffer with that stale line instead of the newly read
one. The fix is to use a loop variable other than `line` when iterating over
`progress_callback_lines`.
Here's my solution. I also renamed `line` to `decoded_line` to reflect what it
actually holds, and I added a new test rather than editing the existing tests.
```diff
diff --git
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
index c9b48158d9..d0f050b543 100644
---
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
+++
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
@@ -479,18 +479,18 @@ class PodManager(LoggingMixin):
progress_callback_lines = []
try:
for raw_line in logs:
- line = raw_line.decode("utf-8",
errors="backslashreplace")
- line_timestamp, message = parse_log_line(line)
+ decoded_line = raw_line.decode("utf-8",
errors="backslashreplace")
+ line_timestamp, message =
parse_log_line(decoded_line)
if line_timestamp: # detect new log line
if message_to_log is None: # first line in the
log
message_to_log = message
message_timestamp = line_timestamp
- progress_callback_lines.append(line)
+ progress_callback_lines.append(decoded_line)
else: # previous log line is complete
- for line in progress_callback_lines:
+ for callback_line in
progress_callback_lines:
for callback in self._callbacks:
callback.progress_callback(
- line=line, client=self._client,
mode=ExecutionMode.SYNC
+ line=callback_line,
client=self._client, mode=ExecutionMode.SYNC
)
if message_to_log is not None:
self._log_message(
@@ -502,16 +502,16 @@ class PodManager(LoggingMixin):
last_captured_timestamp = message_timestamp
message_to_log = message
message_timestamp = line_timestamp
- progress_callback_lines = [line]
+ progress_callback_lines = [decoded_line]
else: # continuation of the previous log line
message_to_log = f"{message_to_log}\n{message}"
- progress_callback_lines.append(line)
+ progress_callback_lines.append(decoded_line)
finally:
# log the last line and update the
last_captured_timestamp
- for line in progress_callback_lines:
+ for callback_line in progress_callback_lines:
for callback in self._callbacks:
callback.progress_callback(
- line=line, client=self._client,
mode=ExecutionMode.SYNC
+ line=callback_line, client=self._client,
mode=ExecutionMode.SYNC
)
if message_to_log is not None:
self._log_message(
diff --git
a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py
b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py
index 0622665400..b7f765c421 100644
---
a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py
+++
b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py
@@ -153,6 +153,28 @@ class TestPodManager:
callbacks=[MockKubernetesPodOperatorCallback],
)
+
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.container_is_running")
+ def
test_fetch_container_logs_progress_callback_receives_each_line(self,
mock_container_is_running):
+ """Verify that progress callbacks receive each raw log line once."""
+ MockWrapper.reset()
+ pod = mock.MagicMock()
+ pod.metadata = mock.MagicMock(name="pod-name", namespace="default")
+
+ mock_container_is_running.return_value = False
+
+ line_1 = "2025-12-23T10:33:22.125000004Z + for i in '{1..300}'\n"
+ line_2 = "2025-12-23T10:33:23.000000000Z + echo 'Hi 1'\n"
+ line_3 = "2025-12-23T10:33:24.000000000Z + sleep 1\n"
+ self.pod_manager.read_pod_logs = mock.Mock(
+ return_value=iter([line_1.encode(), line_2.encode(),
line_3.encode()])
+ )
+ self.pod_manager.fetch_container_logs(pod=pod,
container_name="base", follow=True)
+
+ called_lines = [
+ kwargs["line"] for _args, kwargs in
MockWrapper.mock_callbacks.progress_callback.call_args_list
+ ]
+ assert called_lines == [line_1, line_2, line_3]
+
def test_read_pod_logs_successfully_returns_logs(self):
mock.sentinel.metadata = mock.MagicMock()
self.mock_kube_client.read_namespaced_pod_log.return_value =
mock.sentinel.logs
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]