johnhoran commented on code in PR #59372:
URL: https://github.com/apache/airflow/pull/59372#discussion_r2642735898
##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py:
##########
@@ -487,12 +487,15 @@ def consume_logs(*, since_time: DateTime | None = None) -> tuple[DateTime | None
message_timestamp = line_timestamp
progress_callback_lines.append(line)
else: # previous log line is complete
- for line in progress_callback_lines:
Review Comment:
I wrote a simple DAG to test this:
```python
"""Example DAG demonstrating the use of the KubernetesPodOperator."""

import logging
import os
from datetime import datetime, timedelta
from typing import TYPE_CHECKING

from airflow import DAG
from airflow.providers.cncf.kubernetes.callbacks import (
    KubernetesPodOperatorCallback,
)
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator


class InlineCallback(KubernetesPodOperatorCallback):
    log = logging.getLogger(__name__)

    @classmethod
    # @serializable
    def progress_callback(cls, *, line: str, **kwargs) -> None:
        cls.log.info(f"callback {line}")


with DAG(
    dag_id="example_kubernetes_operator",
    schedule=None,
    start_date=datetime(2021, 1, 1),
    tags=["example"],
    doc_md=__doc__,
    dagrun_timeout=timedelta(minutes=60),
) as dag:
    k = KubernetesPodOperator(
        image="ubuntu:16.04",
        cmds=["bash", "-cx"],
        arguments=['for i in {1..300}; do echo "Hi $i"; sleep 1; done'],
        name="airflow-test-pod",
        task_id="task",
        callbacks=[InlineCallback],
    )
```
My output is then:
```
[2025-12-23T10:33:20.009+0000] {dag.py:4435} INFO - dagrun id:
example_kubernetes_operator
[2025-12-23T10:33:20.029+0000] {dag.py:4451} INFO - created dagrun <DagRun
example_kubernetes_operator @ 2025-10-03 08:00:00+00:00:
manual__2025-10-03T08:00:00+00:00, state:running, queued_at: None. externally
triggered: False>
[2025-12-23T10:33:20.040+0000] {dag.py:4396} INFO - [DAG TEST] starting
task_id=task map_index=-1
[2025-12-23T10:33:20.040+0000] {dag.py:4399} INFO - [DAG TEST] running task
<TaskInstance: example_kubernetes_operator.task
manual__2025-10-03T08:00:00+00:00 [scheduled]>
[2025-12-23 10:33:20,188] {taskinstance.py:3157} INFO - Exporting env vars:
AIRFLOW_CTX_DAG_OWNER='airflow'
AIRFLOW_CTX_DAG_ID='example_kubernetes_operator' AIRFLOW_CTX_TASK_ID='task'
AIRFLOW_CTX_EXECUTION_DATE='2025-10-03T08:00:00+00:00'
AIRFLOW_CTX_TRY_NUMBER='1'
AIRFLOW_CTX_DAG_RUN_ID='manual__2025-10-03T08:00:00+00:00'
[2025-12-23T10:33:20.188+0000] {taskinstance.py:3157} INFO - Exporting env
vars: AIRFLOW_CTX_DAG_OWNER='airflow'
AIRFLOW_CTX_DAG_ID='example_kubernetes_operator' AIRFLOW_CTX_TASK_ID='task'
AIRFLOW_CTX_EXECUTION_DATE='2025-10-03T08:00:00+00:00'
AIRFLOW_CTX_TRY_NUMBER='1'
AIRFLOW_CTX_DAG_RUN_ID='manual__2025-10-03T08:00:00+00:00'
[2025-12-23T10:33:20.191+0000] {taskinstance.py:740} INFO - ::endgroup::
[2025-12-23T10:33:20.226+0000] {base.py:84} INFO - Retrieving connection
'kubernetes_default'
[2025-12-23 10:33:20,226] {pod.py:1325} INFO - Building pod
airflow-test-pod-l4faabb3 with labels: {'dag_id':
'example_kubernetes_operator', 'task_id': 'task', 'run_id':
'manual__2025-10-03T0800000000-37306337f', 'kubernetes_pod_operator': 'True',
'try_number': '1'}
[2025-12-23T10:33:20.226+0000] {pod.py:1325} INFO - Building pod
airflow-test-pod-l4faabb3 with labels: {'dag_id':
'example_kubernetes_operator', 'task_id': 'task', 'run_id':
'manual__2025-10-03T0800000000-37306337f', 'kubernetes_pod_operator': 'True',
'try_number': '1'}
[2025-12-23 10:33:20,280] {pod.py:586} INFO - Found matching pod
airflow-test-pod-yw6mn5em with labels {'airflow_kpo_in_cluster': 'False',
'airflow_version': '2.11.0-astro.3', 'dag_id': 'example_kubernetes_operator',
'foo': 'bar', 'kubernetes_pod_operator': 'True', 'run_id':
'manual__2025-10-03T0800000000-37306337f', 'task_id': 'task', 'try_number': '1'}
[2025-12-23T10:33:20.280+0000] {pod.py:586} INFO - Found matching pod
airflow-test-pod-yw6mn5em with labels {'airflow_kpo_in_cluster': 'False',
'airflow_version': '2.11.0-astro.3', 'dag_id': 'example_kubernetes_operator',
'foo': 'bar', 'kubernetes_pod_operator': 'True', 'run_id':
'manual__2025-10-03T0800000000-37306337f', 'task_id': 'task', 'try_number': '1'}
[2025-12-23 10:33:20,281] {pod.py:587} INFO - `try_number` of task_instance:
1
[2025-12-23T10:33:20.281+0000] {pod.py:587} INFO - `try_number` of
task_instance: 1
[2025-12-23 10:33:20,281] {pod.py:588} INFO - `try_number` of pod: 1
[2025-12-23T10:33:20.281+0000] {pod.py:588} INFO - `try_number` of pod: 1
[2025-12-23 10:33:20,281] {pod.py:606} INFO - Found terminated old matching
pod airflow-test-pod-yw6mn5em with labels {'airflow_kpo_in_cluster': 'False',
'airflow_version': '2.11.0-astro.3', 'dag_id': 'example_kubernetes_operator',
'foo': 'bar', 'kubernetes_pod_operator': 'True', 'run_id':
'manual__2025-10-03T0800000000-37306337f', 'task_id': 'task', 'try_number': '1'}
[2025-12-23T10:33:20.281+0000] {pod.py:606} INFO - Found terminated old
matching pod airflow-test-pod-yw6mn5em with labels {'airflow_kpo_in_cluster':
'False', 'airflow_version': '2.11.0-astro.3', 'dag_id':
'example_kubernetes_operator', 'foo': 'bar', 'kubernetes_pod_operator': 'True',
'run_id': 'manual__2025-10-03T0800000000-37306337f', 'task_id': 'task',
'try_number': '1'}
[2025-12-23 10:33:20,281] {pod.py:1169} INFO - Deleting pod:
airflow-test-pod-yw6mn5em
[2025-12-23T10:33:20.281+0000] {pod.py:1169} INFO - Deleting pod:
airflow-test-pod-yw6mn5em
[2025-12-23 10:33:20,324] {pod.py:617} INFO - Deleted pod to handle rerun
and create new pod!
[2025-12-23T10:33:20.324+0000] {pod.py:617} INFO - Deleted pod to handle
rerun and create new pod!
[2025-12-23 10:33:20,380] {pod.py:586} INFO - Found matching pod
airflow-test-pod-l4faabb3 with labels {'airflow_kpo_in_cluster': 'False',
'airflow_version': '2.11.0-astro.3', 'dag_id': 'example_kubernetes_operator',
'foo': 'bar', 'kubernetes_pod_operator': 'True', 'run_id':
'manual__2025-10-03T0800000000-37306337f', 'task_id': 'task', 'try_number': '1'}
[2025-12-23T10:33:20.380+0000] {pod.py:586} INFO - Found matching pod
airflow-test-pod-l4faabb3 with labels {'airflow_kpo_in_cluster': 'False',
'airflow_version': '2.11.0-astro.3', 'dag_id': 'example_kubernetes_operator',
'foo': 'bar', 'kubernetes_pod_operator': 'True', 'run_id':
'manual__2025-10-03T0800000000-37306337f', 'task_id': 'task', 'try_number': '1'}
[2025-12-23 10:33:20,381] {pod.py:587} INFO - `try_number` of task_instance:
1
[2025-12-23T10:33:20.381+0000] {pod.py:587} INFO - `try_number` of
task_instance: 1
[2025-12-23 10:33:20,381] {pod.py:588} INFO - `try_number` of pod: 1
[2025-12-23T10:33:20.381+0000] {pod.py:588} INFO - `try_number` of pod: 1
[2025-12-23T10:33:20.383+0000] {pod_manager.py:146} INFO - ::group::Waiting
until 120s to get the POD scheduled...
[2025-12-23T10:33:20.392+0000] {pod_manager.py:170} INFO - Waiting 120s to
get the POD running...
[2025-12-23T10:33:20.399+0000] {pod_manager.py:120} INFO - The Pod has an
Event: Successfully assigned default/airflow-test-pod-l4faabb3 to
docker-desktop from None
[2025-12-23T10:33:25.424+0000] {pod_manager.py:158} INFO - ::endgroup::
[2025-12-23 10:33:25,446] {pod.py:586} INFO - Found matching pod
airflow-test-pod-l4faabb3 with labels {'airflow_kpo_in_cluster': 'False',
'airflow_version': '2.11.0-astro.3', 'dag_id': 'example_kubernetes_operator',
'foo': 'bar', 'kubernetes_pod_operator': 'True', 'run_id':
'manual__2025-10-03T0800000000-37306337f', 'task_id': 'task', 'try_number': '1'}
[2025-12-23T10:33:25.446+0000] {pod.py:586} INFO - Found matching pod
airflow-test-pod-l4faabb3 with labels {'airflow_kpo_in_cluster': 'False',
'airflow_version': '2.11.0-astro.3', 'dag_id': 'example_kubernetes_operator',
'foo': 'bar', 'kubernetes_pod_operator': 'True', 'run_id':
'manual__2025-10-03T0800000000-37306337f', 'task_id': 'task', 'try_number': '1'}
[2025-12-23 10:33:25,448] {pod.py:587} INFO - `try_number` of task_instance:
1
[2025-12-23T10:33:25.448+0000] {pod.py:587} INFO - `try_number` of
task_instance: 1
[2025-12-23 10:33:25,448] {pod.py:588} INFO - `try_number` of pod: 1
[2025-12-23T10:33:25.448+0000] {pod.py:588} INFO - `try_number` of pod: 1
[2025-12-23T10:33:28.052+0000] {test.py:25} INFO - callback
2025-12-23T10:33:22.125000004Z + for i in '{1..300}'
[2025-12-23T10:33:28.057+0000] {pod_manager.py:401} INFO - [base] + for i in
'{1..300}'
[2025-12-23T10:33:29.332+0000] {test.py:25} INFO - callback
2025-12-23T10:33:22.125000004Z + for i in '{1..300}'
[2025-12-23T10:33:29.333+0000] {pod_manager.py:401} INFO - [base] + echo 'Hi
1'
[2025-12-23T10:33:31.966+0000] {test.py:25} INFO - callback
2025-12-23T10:33:22.125000004Z + for i in '{1..300}'
[2025-12-23T10:33:31.967+0000] {pod_manager.py:401} INFO - [base] + sleep 1
[2025-12-23T10:33:33.257+0000] {test.py:25} INFO - callback
2025-12-23T10:33:22.125000004Z + for i in '{1..300}'
[2025-12-23T10:33:33.259+0000] {pod_manager.py:401} INFO - [base] Hi 1
[2025-12-23T10:33:36.756+0000] {test.py:25} INFO - callback
2025-12-23T10:33:22.125000004Z + for i in '{1..300}'
[2025-12-23T10:33:36.758+0000] {pod_manager.py:401} INFO - [base] Hi 2
[2025-12-23T10:33:43.641+0000] {test.py:25} INFO - callback
2025-12-23T10:33:22.125000004Z + for i in '{1..300}'
[2025-12-23T10:33:43.643+0000] {pod_manager.py:401} INFO - [base] + for i in
'{1..300}'
```
It just continues like that: the callback receives the first line and then
keeps repeating it, even though the `[base]` pod log lines move on normally.
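
To make the repetition easier to spot than by reading the log, something like the following could be used. This is only a sketch: `RecordingCallback` and `repeated_lines` are hypothetical names, and the class is a plain stand-in rather than a `KubernetesPodOperatorCallback` subclass so that it runs without Airflow. It keeps the same keyword-only `progress_callback` signature as `InlineCallback` above. Since each line passed to the callback carries a timestamp prefix, an exact duplicate suggests the same line was delivered more than once.

```python
from collections import Counter


class RecordingCallback:
    """Hypothetical stand-in that records every line passed to progress_callback."""

    seen: list[str] = []

    @classmethod
    def progress_callback(cls, *, line: str, **kwargs) -> None:
        # Same keyword-only signature as InlineCallback in the DAG above.
        cls.seen.append(line)

    @classmethod
    def repeated_lines(cls) -> dict[str, int]:
        # Lines that were delivered more than once, with their delivery counts.
        return {line: n for line, n in Counter(cls.seen).items() if n > 1}


# Simulate the behaviour seen in the output above: the same first line
# is delivered on every call.
first = "2025-12-23T10:33:22.125000004Z + for i in '{1..300}'"
for _ in range(3):
    RecordingCallback.progress_callback(line=first)

print(RecordingCallback.repeated_lines())
```

With correct behaviour I would expect `repeated_lines()` to come back empty for this pod, because every log line has its own timestamp.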