johnhoran commented on code in PR #59372:
URL: https://github.com/apache/airflow/pull/59372#discussion_r2642735898


##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py:
##########
@@ -487,12 +487,15 @@ def consume_logs(*, since_time: DateTime | None = None) 
-> tuple[DateTime | None
                                 message_timestamp = line_timestamp
                                 progress_callback_lines.append(line)
                             else:  # previous log line is complete
-                                for line in progress_callback_lines:

Review Comment:
   I wrote a simple dag to test this
   ```python
   """Example DAG demonstrating the use of the KubernetesPodOperator."""
   
   import logging
   import os
   from datetime import datetime, timedelta
   from typing import TYPE_CHECKING
   
   from airflow import DAG
   from airflow.providers.cncf.kubernetes.callbacks import (
       KubernetesPodOperatorCallback,
   )
   from airflow.providers.cncf.kubernetes.operators.pod import 
KubernetesPodOperator
   
   
   
   class InlineCallback(KubernetesPodOperatorCallback):
       log = logging.getLogger(__name__)
   
       @classmethod
       # @serializable
       def progress_callback(
           cls, *, line: str, **kwargs
       ) -> None:
           cls.log.info(f"callback {line}")
   
   
   with DAG(
       dag_id="example_kubernetes_operator",
       schedule=None,
       start_date=datetime(2021, 1, 1),
       tags=["example"],
       doc_md=__doc__,
       dagrun_timeout=timedelta(minutes=60),
   ) as dag:
       k = KubernetesPodOperator(
           image="ubuntu:16.04",
           cmds=["bash", "-cx"],
           arguments=['for i in {1..300}; do echo "Hi $i"; sleep 1; done'],
           name="airflow-test-pod",
           task_id="task",
           callbacks=[InlineCallback],
       )
   ```
   
   My output is then 
   ```
   [2025-12-23T10:33:20.009+0000] {dag.py:4435} INFO - dagrun id: 
example_kubernetes_operator
   [2025-12-23T10:33:20.029+0000] {dag.py:4451} INFO - created dagrun <DagRun 
example_kubernetes_operator @ 2025-10-03 08:00:00+00:00: 
manual__2025-10-03T08:00:00+00:00, state:running, queued_at: None. externally 
triggered: False>
   [2025-12-23T10:33:20.040+0000] {dag.py:4396} INFO - [DAG TEST] starting 
task_id=task map_index=-1
   [2025-12-23T10:33:20.040+0000] {dag.py:4399} INFO - [DAG TEST] running task 
<TaskInstance: example_kubernetes_operator.task 
manual__2025-10-03T08:00:00+00:00 [scheduled]>
   [2025-12-23 10:33:20,188] {taskinstance.py:3157} INFO - Exporting env vars: 
AIRFLOW_CTX_DAG_OWNER='airflow' 
AIRFLOW_CTX_DAG_ID='example_kubernetes_operator' AIRFLOW_CTX_TASK_ID='task' 
AIRFLOW_CTX_EXECUTION_DATE='2025-10-03T08:00:00+00:00' 
AIRFLOW_CTX_TRY_NUMBER='1' 
AIRFLOW_CTX_DAG_RUN_ID='manual__2025-10-03T08:00:00+00:00'
   [2025-12-23T10:33:20.188+0000] {taskinstance.py:3157} INFO - Exporting env 
vars: AIRFLOW_CTX_DAG_OWNER='airflow' 
AIRFLOW_CTX_DAG_ID='example_kubernetes_operator' AIRFLOW_CTX_TASK_ID='task' 
AIRFLOW_CTX_EXECUTION_DATE='2025-10-03T08:00:00+00:00' 
AIRFLOW_CTX_TRY_NUMBER='1' 
AIRFLOW_CTX_DAG_RUN_ID='manual__2025-10-03T08:00:00+00:00'
   [2025-12-23T10:33:20.191+0000] {taskinstance.py:740} INFO - ::endgroup::
   [2025-12-23T10:33:20.226+0000] {base.py:84} INFO - Retrieving connection 
'kubernetes_default'
   [2025-12-23 10:33:20,226] {pod.py:1325} INFO - Building pod 
airflow-test-pod-l4faabb3 with labels: {'dag_id': 
'example_kubernetes_operator', 'task_id': 'task', 'run_id': 
'manual__2025-10-03T0800000000-37306337f', 'kubernetes_pod_operator': 'True', 
'try_number': '1'}
   [2025-12-23T10:33:20.226+0000] {pod.py:1325} INFO - Building pod 
airflow-test-pod-l4faabb3 with labels: {'dag_id': 
'example_kubernetes_operator', 'task_id': 'task', 'run_id': 
'manual__2025-10-03T0800000000-37306337f', 'kubernetes_pod_operator': 'True', 
'try_number': '1'}
   [2025-12-23 10:33:20,280] {pod.py:586} INFO - Found matching pod 
airflow-test-pod-yw6mn5em with labels {'airflow_kpo_in_cluster': 'False', 
'airflow_version': '2.11.0-astro.3', 'dag_id': 'example_kubernetes_operator', 
'foo': 'bar', 'kubernetes_pod_operator': 'True', 'run_id': 
'manual__2025-10-03T0800000000-37306337f', 'task_id': 'task', 'try_number': '1'}
   [2025-12-23T10:33:20.280+0000] {pod.py:586} INFO - Found matching pod 
airflow-test-pod-yw6mn5em with labels {'airflow_kpo_in_cluster': 'False', 
'airflow_version': '2.11.0-astro.3', 'dag_id': 'example_kubernetes_operator', 
'foo': 'bar', 'kubernetes_pod_operator': 'True', 'run_id': 
'manual__2025-10-03T0800000000-37306337f', 'task_id': 'task', 'try_number': '1'}
   [2025-12-23 10:33:20,281] {pod.py:587} INFO - `try_number` of task_instance: 
1
   [2025-12-23T10:33:20.281+0000] {pod.py:587} INFO - `try_number` of 
task_instance: 1
   [2025-12-23 10:33:20,281] {pod.py:588} INFO - `try_number` of pod: 1
   [2025-12-23T10:33:20.281+0000] {pod.py:588} INFO - `try_number` of pod: 1
   [2025-12-23 10:33:20,281] {pod.py:606} INFO - Found terminated old matching 
pod airflow-test-pod-yw6mn5em with labels {'airflow_kpo_in_cluster': 'False', 
'airflow_version': '2.11.0-astro.3', 'dag_id': 'example_kubernetes_operator', 
'foo': 'bar', 'kubernetes_pod_operator': 'True', 'run_id': 
'manual__2025-10-03T0800000000-37306337f', 'task_id': 'task', 'try_number': '1'}
   [2025-12-23T10:33:20.281+0000] {pod.py:606} INFO - Found terminated old 
matching pod airflow-test-pod-yw6mn5em with labels {'airflow_kpo_in_cluster': 
'False', 'airflow_version': '2.11.0-astro.3', 'dag_id': 
'example_kubernetes_operator', 'foo': 'bar', 'kubernetes_pod_operator': 'True', 
'run_id': 'manual__2025-10-03T0800000000-37306337f', 'task_id': 'task', 
'try_number': '1'}
   [2025-12-23 10:33:20,281] {pod.py:1169} INFO - Deleting pod: 
airflow-test-pod-yw6mn5em
   [2025-12-23T10:33:20.281+0000] {pod.py:1169} INFO - Deleting pod: 
airflow-test-pod-yw6mn5em
   [2025-12-23 10:33:20,324] {pod.py:617} INFO - Deleted pod to handle rerun 
and create new pod!
   [2025-12-23T10:33:20.324+0000] {pod.py:617} INFO - Deleted pod to handle 
rerun and create new pod!
   [2025-12-23 10:33:20,380] {pod.py:586} INFO - Found matching pod 
airflow-test-pod-l4faabb3 with labels {'airflow_kpo_in_cluster': 'False', 
'airflow_version': '2.11.0-astro.3', 'dag_id': 'example_kubernetes_operator', 
'foo': 'bar', 'kubernetes_pod_operator': 'True', 'run_id': 
'manual__2025-10-03T0800000000-37306337f', 'task_id': 'task', 'try_number': '1'}
   [2025-12-23T10:33:20.380+0000] {pod.py:586} INFO - Found matching pod 
airflow-test-pod-l4faabb3 with labels {'airflow_kpo_in_cluster': 'False', 
'airflow_version': '2.11.0-astro.3', 'dag_id': 'example_kubernetes_operator', 
'foo': 'bar', 'kubernetes_pod_operator': 'True', 'run_id': 
'manual__2025-10-03T0800000000-37306337f', 'task_id': 'task', 'try_number': '1'}
   [2025-12-23 10:33:20,381] {pod.py:587} INFO - `try_number` of task_instance: 
1
   [2025-12-23T10:33:20.381+0000] {pod.py:587} INFO - `try_number` of 
task_instance: 1
   [2025-12-23 10:33:20,381] {pod.py:588} INFO - `try_number` of pod: 1
   [2025-12-23T10:33:20.381+0000] {pod.py:588} INFO - `try_number` of pod: 1
   [2025-12-23T10:33:20.383+0000] {pod_manager.py:146} INFO - ::group::Waiting 
until 120s to get the POD scheduled...
   [2025-12-23T10:33:20.392+0000] {pod_manager.py:170} INFO - Waiting 120s to 
get the POD running...
   [2025-12-23T10:33:20.399+0000] {pod_manager.py:120} INFO - The Pod has an 
Event: Successfully assigned default/airflow-test-pod-l4faabb3 to 
docker-desktop from None
   [2025-12-23T10:33:25.424+0000] {pod_manager.py:158} INFO - ::endgroup::
   [2025-12-23 10:33:25,446] {pod.py:586} INFO - Found matching pod 
airflow-test-pod-l4faabb3 with labels {'airflow_kpo_in_cluster': 'False', 
'airflow_version': '2.11.0-astro.3', 'dag_id': 'example_kubernetes_operator', 
'foo': 'bar', 'kubernetes_pod_operator': 'True', 'run_id': 
'manual__2025-10-03T0800000000-37306337f', 'task_id': 'task', 'try_number': '1'}
   [2025-12-23T10:33:25.446+0000] {pod.py:586} INFO - Found matching pod 
airflow-test-pod-l4faabb3 with labels {'airflow_kpo_in_cluster': 'False', 
'airflow_version': '2.11.0-astro.3', 'dag_id': 'example_kubernetes_operator', 
'foo': 'bar', 'kubernetes_pod_operator': 'True', 'run_id': 
'manual__2025-10-03T0800000000-37306337f', 'task_id': 'task', 'try_number': '1'}
   [2025-12-23 10:33:25,448] {pod.py:587} INFO - `try_number` of task_instance: 
1
   [2025-12-23T10:33:25.448+0000] {pod.py:587} INFO - `try_number` of 
task_instance: 1
   [2025-12-23 10:33:25,448] {pod.py:588} INFO - `try_number` of pod: 1
   [2025-12-23T10:33:25.448+0000] {pod.py:588} INFO - `try_number` of pod: 1
   [2025-12-23T10:33:28.052+0000] {test.py:25} INFO - callback 
2025-12-23T10:33:22.125000004Z + for i in '{1..300}'
   
   [2025-12-23T10:33:28.057+0000] {pod_manager.py:401} INFO - [base] + for i in 
'{1..300}'
   [2025-12-23T10:33:29.332+0000] {test.py:25} INFO - callback 
2025-12-23T10:33:22.125000004Z + for i in '{1..300}'
   
   [2025-12-23T10:33:29.333+0000] {pod_manager.py:401} INFO - [base] + echo 'Hi 
1'
   [2025-12-23T10:33:31.966+0000] {test.py:25} INFO - callback 
2025-12-23T10:33:22.125000004Z + for i in '{1..300}'
   
   [2025-12-23T10:33:31.967+0000] {pod_manager.py:401} INFO - [base] + sleep 1
   [2025-12-23T10:33:33.257+0000] {test.py:25} INFO - callback 
2025-12-23T10:33:22.125000004Z + for i in '{1..300}'
   
   [2025-12-23T10:33:33.259+0000] {pod_manager.py:401} INFO - [base] Hi 1
   [2025-12-23T10:33:36.756+0000] {test.py:25} INFO - callback 
2025-12-23T10:33:22.125000004Z + for i in '{1..300}'
   
   [2025-12-23T10:33:36.758+0000] {pod_manager.py:401} INFO - [base] Hi 2
   [2025-12-23T10:33:43.641+0000] {test.py:25} INFO - callback 
2025-12-23T10:33:22.125000004Z + for i in '{1..300}'
   
   [2025-12-23T10:33:43.643+0000] {pod_manager.py:401} INFO - [base] + for i in 
'{1..300}'
   ```
   It just continues like that, the callback receives the first line and then 
keeps repeating it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to