This is an automated email from the ASF dual-hosted git repository.
kaxil pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new fd950357d6b Fix KubernetesPodOperator emitting orphan timestamps for
empty container writes (#67652)
fd950357d6b is described below
commit fd950357d6b0485a962b66707be88d947c81bd06
Author: Kaxil Naik <[email protected]>
AuthorDate: Thu May 28 20:00:43 2026 +0100
Fix KubernetesPodOperator emitting orphan timestamps for empty container
writes (#67652)
When a container running under `KubernetesPodOperator` writes an empty
line, kubelet streams it back (with `timestamps=True`) as `"<rfc3339-ts> \n"`
-- a timestamp followed by a separator space and an empty message.
`parse_log_line` in `pod_manager.py` called `line.strip().partition(" ")`,
which removed the trailing separator space before partitioning, so the function
returned `timestamp=None` and the caller treated the line as a continuation of
the previous buffered log record. The b [...]
```
[2026-05-28T13:07:50.160+0000] {pod_manager.py:520} INFO - [base] first
test line
2026-05-28T13:07:57.030578889Z
2026-05-28T13:07:57.030581518Z
2026-05-28T13:07:57.030642740Z
[2026-05-28T13:07:57.034+0000] {pod_manager.py:520} INFO - [base] last test
line
```
Downstream, that breaks
[`airflow.utils.log.file_task_handler._parse_timestamp`](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/utils/log/file_task_handler.py#L201-L203),
which feeds the line to `pendulum.parse` after stripping `[]`: malformed
fragments from these orphan rows can raise `ValueError: month must be in 1..12`
and fail the task entirely.
Closes #36571.
## Root cause and history
Regressed in [#33675](https://github.com/apache/airflow/pull/33675) (merged
2023-08-24, shipped in cncf-kubernetes **7.5.0**), which replaced the original
`line.find(" ")` split with a `line.strip().partition(" ")` pattern under
the banner of a refactor:
```diff
- split_at = line.find(" ")
- if split_at == -1:
- ...
- timestamp = line[:split_at]
- message = line[split_at + 1 :].rstrip()
+ timestamp, sep, message = line.strip().partition(" ")
+ if not sep:
+ ...
```
The pre-refactor implementation correctly handled `<ts> \n` because
`find(" ")` matched the separator space directly and the message-side
`.rstrip()` produced an empty string. The new code strips the separator off
before partitioning, so the function loses its only signal that the line is
well-formed.
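The difference can be reproduced with plain strings. The following is a minimal sketch, not the provider code: the function names are hypothetical, timestamp parsing is left out, and the `return None, line` fallback in the `find` variant is an assumption standing in for the branch the diff above elides as `...`.

```python
from __future__ import annotations


def split_with_find(line: str) -> tuple[str | None, str]:
    """Pre-refactor style: locate the separator on the unstripped line."""
    split_at = line.find(" ")
    if split_at == -1:
        # Assumed fallback; the original branch is elided in the diff.
        return None, line
    return line[:split_at], line[split_at + 1 :].rstrip()


def split_with_strip_partition(line: str) -> tuple[str | None, str]:
    """Post-refactor style: strip both ends first, then partition."""
    timestamp, sep, message = line.strip().partition(" ")
    if not sep:
        # The separator was stripped away, so the line looks like it
        # carries no timestamp at all.
        return None, line
    return timestamp, message


empty_write = "2026-05-28T13:07:57.030578889Z \n"
normal_write = "2026-05-28T13:07:50.160Z first test line\n"

for splitter in (split_with_find, split_with_strip_partition):
    print(splitter.__name__, splitter(empty_write), splitter(normal_write))
```

Both variants agree on a line that carries a message. Only the empty write distinguishes them, which is why the regression went unnoticed until containers emitted blank lines.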
This matches the regression window the original reporter described in
[#36571](https://github.com/apache/airflow/issues/36571): the bug appeared
after upgrading cncf-kubernetes from 7.4.2 (pre-refactor) to 7.12.0+
(post-refactor) and is still reproducible on current `main` (10.17.x).
## Fix
* `parse_log_line` no longer pre-strips the line; it strips only the trailing
newline (`rstrip("\n")`) and partitions on the original separator, so empty
container writes are recognised as `(timestamp, "")` rather than as
continuations. If the partition yields no separator, the whole line is tried
as a bare timestamp (some kubelet versions emit `<ts>\n` with no trailing
space), and parse failures fall through to the original return-the-raw-line
path. It also catches `ValueError`, not just `ParserError`, so a [...]
* The sync (`PodManager.fetch_container_logs.consume_logs`) and async
(`AsyncPodManager.fetch_container_logs_before_current_sec`) log consumer loops
skip emit for empty messages -- the resume marker still advances in the sync
path so reconnect-since-time stays correct, but no noisy `[base] ` row is
written.
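The two bullets above can be sketched together in a self-contained form. This is an illustration under stated assumptions, not the provider implementation: `parse_rfc3339_utc` is a hypothetical standard-library stand-in for `pendulum.parse`, and `parse_log_line_sketch` and `collect_records` are simplified, hypothetical counterparts of `parse_log_line` and the consumer loops (no Kubernetes client, no reconnect logic, one resume marker instead of a per-container mapping).

```python
from __future__ import annotations

import re
from datetime import datetime, timezone

# Hypothetical stand-in for pendulum.parse: accepts only UTC RFC 3339
# timestamps ending in "Z" and truncates nanoseconds to microseconds.
_RFC3339_UTC = re.compile(r"(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})(?:\.(\d+))?Z")


def parse_rfc3339_utc(text: str) -> datetime:
    match = _RFC3339_UTC.fullmatch(text)
    if match is None:
        raise ValueError(f"not an RFC 3339 UTC timestamp: {text!r}")
    base, fraction = match.groups()
    micros = int((fraction or "")[:6].ljust(6, "0"))
    # strptime raises ValueError for out-of-range fields such as month 13.
    parsed = datetime.strptime(base, "%Y-%m-%dT%H:%M:%S")
    return parsed.replace(microsecond=micros, tzinfo=timezone.utc)


def parse_log_line_sketch(line: str) -> tuple[datetime | None, str]:
    # Strip only the newline so "<ts> \n" keeps its separator space.
    stripped = line.rstrip("\n")
    # Without a separator, partition returns the whole line as the
    # timestamp candidate and "" as the message.
    timestamp, _, message = stripped.partition(" ")
    try:
        return parse_rfc3339_utc(timestamp), message
    except ValueError:
        return None, line


def collect_records(lines: list[str]) -> tuple[list[str], datetime | None]:
    """Group lines into records, skipping empty container writes."""
    records: list[str] = []
    buffered: str | None = None
    resume_marker: datetime | None = None
    for line in lines:
        timestamp, message = parse_log_line_sketch(line)
        if timestamp is not None:
            # Advance the marker even when nothing is emitted.
            resume_marker = timestamp
            if not message:
                continue
            if buffered is not None:
                records.append(buffered)
            buffered = message
        else:
            # No timestamp: treat the line as a continuation.
            continuation = message.rstrip("\n")
            buffered = continuation if buffered is None else f"{buffered}\n{continuation}"
    if buffered is not None:
        records.append(buffered)
    return records, resume_marker


issue_lines = [
    "2026-05-28T13:07:50.160Z first test line\n",
    "2026-05-28T13:07:57.030578889Z \n",
    "2026-05-28T13:07:57.030581518Z\n",
    "2026-05-28T13:07:57.030642740Z \n",
    "2026-05-28T13:07:57.034Z last test line\n",
]
records, resume_marker = collect_records(issue_lines)
print(records, resume_marker)
```

Fed the sequence from the issue, the sketch yields only the two real messages, while the marker still ends at the timestamp of the last line seen.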
## Tests
* Parametrized `test_parse_log_line_handles_empty_container_writes` covers
`<ts> \n`, `<ts>\n`, and `<ts> ` (no newline). Verified RED on `main`, GREEN
with the fix.
* End-to-end `test_empty_container_lines_do_not_pollute_previous_message`
drives `fetch_container_logs` with the exact log sequence from the issue and
asserts no orphan timestamps land in `caplog`. Also RED on `main`, GREEN with
the fix.
## Gotchas
* Truly empty container output (just `\n`) is no longer surfaced as a
`[base]` row. That output carries no information for the task log reader and
was previously the trigger for downstream pendulum failures, so dropping it is
a net improvement; if a future use case needs to count blank container lines,
that's separable work.
---
.../providers/cncf/kubernetes/utils/pod_manager.py | 27 +++++++--
.../unit/cncf/kubernetes/utils/test_pod_manager.py | 69 ++++++++++++++++++++++
2 files changed, 92 insertions(+), 4 deletions(-)
diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
index e4bb7ae9115..835d239f209 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
@@ -566,6 +566,15 @@ class PodManager(LoggingMixin):
line = raw_line.decode("utf-8", errors="backslashreplace")
line_timestamp, message = parse_log_line(line)
if line_timestamp: # detect new log line
+ if not message:
+ # Empty container write: advance the resume
+ # marker but do not emit a noisy ``[base] ``
+ # row or break the previous buffered message
+ # with a stray continuation (#36571).
+ self.container_log_times[
+ (pod.metadata.namespace, pod.metadata.name, container_name)
+ ] = line_timestamp
+ continue
if message_to_log is None: # first line in the log
message_to_log = message
message_timestamp = line_timestamp
@@ -1108,12 +1117,17 @@ def parse_log_line(line: str) -> tuple[DateTime | None, str]:
:param line: k8s log line
:return: timestamp and log message
"""
- timestamp, sep, message = line.strip().partition(" ")
- if not sep:
- return None, line
+ # Strip only the trailing newline so an empty container write (which
+ # kubelet streams back as "<rfc3339-ts> \n" under ``timestamps=True``)
+ # keeps the separator space and is recognised as a real log line, not a
+ # continuation of the previous one (#36571). When kubelet emits "<ts>\n"
+ # with no trailing space, ``partition`` returns the whole line as
+ # ``timestamp`` and ``message`` as ``""`` -- the parse below handles both.
+ stripped = line.rstrip("\n")
+ timestamp, _, message = stripped.partition(" ")
try:
last_log_time = cast("DateTime", pendulum.parse(timestamp))
- except ParserError:
+ except (ParserError, ValueError):
return None, line
return last_log_time, message
@@ -1220,6 +1234,11 @@ class AsyncPodManager(LoggingMixin):
if line_timestamp and line_timestamp.replace(microsecond=0) == now_seconds:
break
if line_timestamp: # detect new log line
+ if not message:
+ # Empty container write -- drop it instead of letting
+ # it overwrite the buffered message with "" or be
+ # emitted as a noisy ``[base] `` row (#36571).
+ continue
if message_to_log is None: # first line in the log
message_to_log = message
else: # previous log line is complete
diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py
index 27cfd87fc6d..e2390f90dc3 100644
--- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py
+++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py
@@ -88,6 +88,40 @@ def test_parse_log_line():
assert line == log_message
+@pytest.mark.parametrize(
+ ("raw_line", "expected_ts"),
+ [
+ pytest.param(
+ "2026-05-28T13:07:57.030578889Z \n",
+ "2026-05-28T13:07:57.030578889Z",
+ id="trailing-space-and-newline",
+ ),
+ pytest.param(
+ "2026-05-28T13:07:57.030581518Z\n",
+ "2026-05-28T13:07:57.030581518Z",
+ id="newline-only",
+ ),
+ pytest.param(
+ "2026-05-28T13:07:57.030642740Z ",
+ "2026-05-28T13:07:57.030642740Z",
+ id="trailing-space-no-newline",
+ ),
+ ],
+)
+def test_parse_log_line_handles_empty_container_writes(raw_line, expected_ts):
+ """
+ Regression for #36571: an empty container write (just ``\\n``) is streamed
+ back by kubelet as ``"<rfc3339-ts> \\n"`` when ``timestamps=True``. The
+ parser must recognise it as a real (empty) log line rather than as a
+ continuation of the previous one, otherwise the bare timestamp is appended
+ onto the previous buffered message and emitted unformatted into task logs.
+ """
+ timestamp, message = parse_log_line(raw_line)
+
+ assert timestamp == pendulum.parse(expected_ts)
+ assert message == ""
+
+
def test_log_pod_event():
"""Test logging a pod event."""
mock_pod_manager = mock.Mock()
@@ -782,6 +816,41 @@ class TestPodManager:
assert "message3 line1" in caplog.text
assert "ERROR" not in caplog.text
+ @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.container_is_running")
+ @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.read_pod_logs")
+ def test_empty_container_lines_do_not_pollute_previous_message(
+ self, mock_read_pod_logs, mock_container_is_running, caplog
+ ):
+ """
+ Regression for #36571: when a container writes empty lines, kubelet
+ returns them as ``"<ts> \\n"`` rows. Previously these slipped through
+ ``parse_log_line`` as "no timestamp" and were appended as continuations
+ onto the previous buffered message, which then emitted multi-line
+ records where only the first line carried the Airflow log prefix --
+ leaving bare ``<ts>`` rows in task logs that downstream pendulum-based
+ parsers ``(file_task_handler._parse_timestamp)`` then choked on.
+ """
+ log = (
+ "2026-05-28T13:07:50.160Z first test line\n"
+ "2026-05-28T13:07:57.030578889Z \n"
+ "2026-05-28T13:07:57.030581518Z\n"
+ "2026-05-28T13:07:57.030642740Z \n"
+ "2026-05-28T13:07:57.034Z last test line\n"
+ )
+ mock_read_pod_logs.return_value = [bytes(line, "utf-8") for line in log.split("\n")]
+ mock_container_is_running.return_value = False
+
+ with caplog.at_level(logging.INFO):
+ self.pod_manager.fetch_container_logs(mock.MagicMock(), "base", follow=True)
+
+ assert "first test line" in caplog.text
+ assert "last test line" in caplog.text
+ # The empty-line timestamps must not leak into the previous message and
+ # must not be emitted as orphan rows.
+ assert "2026-05-28T13:07:57.030578889Z" not in caplog.text
+ assert "2026-05-28T13:07:57.030581518Z" not in caplog.text
+ assert "2026-05-28T13:07:57.030642740Z" not in caplog.text
+
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.container_is_running")
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.read_pod_logs")
def test_container_log_times_tracks_last_timestamp(self, mock_read_pod_logs, mock_container_is_running):