IanWesleyDev opened a new issue, #30245:
URL: https://github.com/apache/airflow/issues/30245

   ### Apache Airflow Provider(s)
   
   cncf-kubernetes
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-cncf-kubernetes==4.4.0, but the same code appears in the most recent version I have seen.
   
   ### Apache Airflow version
   
   apache-airflow==2.4.3
   
   ### Operating System
   
   Ubuntu
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Deployment details
   
   _No response_
   
   ### What happened
   
   The following function, which gets logs from Kubernetes pods, occasionally produces duplicated logs. I believe the cause is that `math.ceil()` rounds the number of seconds up, so when a log read is resumed, the requested window starts up to a second before the last timestamp already logged, and those lines are logged again. We see the stream cut out while logs are being produced and then return logs from just before the point where it cut out. An example is provided below.
   ```python
       def fetch_container_logs(
           self, pod: V1Pod, container_name: str, *, follow=False, since_time: DateTime | None = None
       ) -> PodLoggingStatus:
           """
           Follows the logs of container and streams to airflow logging.
           Returns when container exits.
           """
   
           def consume_logs(*, since_time: DateTime | None = None, follow: bool = True) -> DateTime | None:
               """
               Tries to follow container logs until container completes.
               For a long-running container, sometimes the log read may be interrupted
               Such errors of this kind are suppressed.
   
               Returns the last timestamp observed in logs.
               """
               timestamp = None
               try:
                   logs = self.read_pod_logs(
                       pod=pod,
                       container_name=container_name,
                       timestamps=True,
                       since_seconds=(
                           math.ceil((pendulum.now() - since_time).total_seconds()) if since_time else None
                       ),
                       follow=follow,
                   )
                   for raw_line in logs:
                       line = raw_line.decode('utf-8', errors="backslashreplace")
                       timestamp, message = self.parse_log_line(line)
                       self.log.info(message)
               except BaseHTTPError as e:
                   self.log.warning(
                       "Reading of logs interrupted with error %r; will retry. "
                       "Set log level to DEBUG for traceback.",
                       e,
                   )
                   self.log.debug(
                       "Traceback for interrupted logs read for pod %r",
                       pod.metadata.name,
                       exc_info=True,
                   )
               return timestamp or since_time
   
           # note: `read_pod_logs` follows the logs, so we shouldn't necessarily *need* to
           # loop as we do here. But in a long-running process we might temporarily lose connectivity.
           # So the looping logic is there to let us resume following the logs.
           last_log_time = since_time
           while True:
               last_log_time = consume_logs(since_time=last_log_time, follow=follow)
               if not self.container_is_running(pod, container_name=container_name):
                   return PodLoggingStatus(running=False, last_log_time=last_log_time)
               if not follow:
                   return PodLoggingStatus(running=True, last_log_time=last_log_time)
               else:
                   self.log.warning(
                       'Pod %s log read interrupted but container %s still running',
                       pod.metadata.name,
                       container_name,
                   )
                   time.sleep(1)
   ```
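To make the rounding concrete, here is a standard-library sketch of the window a resumed read asks for. It uses `datetime` in place of `pendulum`, the sub-second timings are made up, and it assumes the kubelet measures `since_seconds` back from the same instant the client computed it (no clock skew or request latency).

```python
import math
from datetime import datetime, timedelta, timezone


def resume_window_start(now: datetime, since_time: datetime) -> datetime:
    """Earliest log time covered by a resumed read that uses
    since_seconds = math.ceil((now - since_time).total_seconds()),
    assuming the window is measured back from the same `now`."""
    since_seconds = math.ceil((now - since_time).total_seconds())
    return now - timedelta(seconds=since_seconds)


# Hypothetical timings: the last line logged before the interruption, and the
# moment the retry loop re-issues the request after its 1 s sleep.
last_logged = datetime(2022, 10, 16, 14, 45, 33, 400_000, tzinfo=timezone.utc)
resume_at = datetime(2022, 10, 16, 14, 45, 35, 100_000, tzinfo=timezone.utc)

start = resume_window_start(resume_at, last_logged)
print(start)                # 2022-10-16 14:45:33.100000+00:00
print(last_logged - start)  # 0:00:00.300000
```

With these timings the 1.7 s gap is rounded up to 2 s, so the window reaches 0.3 s behind the last line already logged, and anything written in that span, including that last line itself, is read and logged again. When the gap happens to be a whole number of seconds, `math.ceil()` changes nothing and the window starts exactly at `since_time`.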
   
   Example log illustrating this issue:
   
   ```
   [2022-10-16, 14:45:32 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:32 UTC level=INFO|message=        |fileName=aws.py|funcName=send_sqs_message|lineno=349|run_name=e5b06226_geoloc_pipeline_radar_caf_1665924984.629687|run_id=e5b06226_geoloc_pipeline_radar_caf_1665924984.629687|platform_count=3|group_id=e5b06226-b7a2-4174-9301-b243b54e0df0|schedule_id=e5b06226-b7a2-4174-9301-b243b54e0df0|task_type=xband_collect|event=send_notification|notification_type=sqs|notification_url=https://sqs.us-gov-east-1.amazonaws.com/252649730194/scheduler-pass-status.fifo|notification_message=e5b06226-b7a2-4174-9301-b243b54e0df0:delivered-3ball|notification_dedup_id_id=nfaZZWF-vTRsEa5j4jDyRDxA7Z0=|notification_message_id=f9880a14-1dfe-4bb4-bd36-f05cc322dcff|notification_status_code=200|status=success
   [2022-10-16, 14:45:33 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33 UTC level=INFO|message={}        |fileName=api_utils.py|funcName=_check_request_response|lineno=54|event=get_retry_request|status=success|url=https://scheduler.space.prod.gov.he2pi.com/api/schedule/e5b06226-b7a2-4174-9301-b243b54e0df0
   [2022-10-16, 14:45:33 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33 UTC level=INFO|message={}        |fileName=api_utils.py|funcName=_check_request_response|lineno=54|event=get_retry_request|status=success|url=https://scheduler.space.prod.gov.he2pi.com/api/schedule/passes/2a3e94a4-0a5c-42aa-ba53-9bbf39fa30c9/status?status=downloaded
   [2022-10-16, 14:45:33 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33 UTC level=INFO|message=        |fileName=scheduler_api.py|funcName=get_pass_id_status_timestamp|lineno=31|event=pass_id_status_retrieval|status=success|pass_id=2a3e94a4-0a5c-42aa-ba53-9bbf39fa30c9|pass_status=downloaded
   [2022-10-16, 14:45:33 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33 UTC level=INFO|message={}        |fileName=api_utils.py|funcName=_check_request_response|lineno=54|event=get_retry_request|status=success|url=https://scheduler.space.prod.gov.he2pi.com/api/schedule/passes/2a3e94a4-0a5c-42aa-ba53-9bbf39fa30c9/status?status=Stored
   [2022-10-16, 14:45:33 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33 UTC level=INFO|message=        |fileName=scheduler_api.py|funcName=get_pass_id_status_timestamp|lineno=31|event=pass_id_status_retrieval|status=success|pass_id=2a3e94a4-0a5c-42aa-ba53-9bbf39fa30c9|pass_status=Stored
   [2022-10-16, 14:45:33 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33 UTC level=INFO|message={}        |fileName=api_utils.py|funcName=_check_request_response|lineno=54|event=get_retry_request|status=success|url=https://scheduler.space.prod.gov.he2pi.com/api/schedule/passes/2a3e94a4-0a5c-42aa-ba53-9bbf39fa30c9/status?status=inspected
   [2022-10-16, 14:45:33 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33 UTC level=INFO|message=        |fileName=scheduler_api.py|funcName=get_pass_id_status_timestamp|lineno=31|event=pass_id_status_retrieval|status=success|pass_id=2a3e94a4-0a5c-42aa-ba53-9bbf39fa30c9|pass_status=inspected
   [2022-10-16, 14:45:33 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33 UTC level=INFO|message={}        |fileName=api_utils.py|funcName=_check_request_response|lineno=54|event=get_retry_request|status=success|url=https://scheduler.space.prod.gov.he2pi.com/api/schedule/passes/63ef6f27-97b7-4c16-88e6-26c1a6e2581e/status?status=downloaded
   [2022-10-16, 14:45:33 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33 UTC level=INFO|message=        |fileName=scheduler_api.py|funcName=get_pass_id_status_timestamp|lineno=31|event=pass_id_status_retrieval|status=success|pass_id=63ef6f27-97b7-4c16-88e6-26c1a6e2581e|pass_status=downloaded
   [2022-10-16, 14:45:33 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33 UTC level=INFO|message={}        |fileName=api_utils.py|funcName=_check_request_response|lineno=54|event=get_retry_request|status=success|url=https://scheduler.space.prod.gov.he2pi.com/api/schedule/passes/63ef6f27-97b7-4c16-88e6-26c1a6e2581e/status?status=Stored
   [2022-10-16, 14:45:33 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33 UTC level=INFO|message=        |fileName=scheduler_api.py|funcName=get_pass_id_status_timestamp|lineno=31|event=pass_id_status_retrieval|status=success|pass_id=63ef6f27-97b7-4c16-88e6-26c1a6e2581e|pass_status=Stored
   [2022-10-16, 14:45:33 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33 UTC level=INFO|message={}        |fileName=api_utils.py|funcName=_check_request_response|lineno=54|event=get_retry_request|status=success|url=https://scheduler.space.prod.gov.he2pi.com/api/schedule/passes/63ef6f27-97b7-4c16-88e6-26c1a6e2581e/status?status=inspected
   [2022-10-16, 14:45:33 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33 UTC level=INFO|message=        |fileName=scheduler_api.py|funcName=get_pass_id_status_timestamp|lineno=31|event=pass_id_status_retrieval|status=success|pass_id=63ef6f27-97b7-4c16-88e6-26c1a6e2581e|pass_status=inspected
   [2022-10-16, 14:45:33 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33 UTC level=INFO|message={}        |fileName=api_utils.py|funcName=_check_request_response|lineno=54|event=get_retry_request|status=success|url=https://scheduler.space.prod.gov.he2pi.com/api/schedule/passes/beb79165-3d77-4615-b86d-37bb5b81ae64/status?status=downloaded
   [2022-10-16, 14:45:33 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33 UTC level=INFO|message=        |fileName=scheduler_api.py|funcName=get_pass_id_status_timestamp|lineno=31|event=pass_id_status_retrieval|status=success|pass_id=beb79165-3d77-4615-b86d-37bb5b81ae64|pass_status=downloaded
   [2022-10-16, 14:45:33 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33 UTC level=INFO|message={}        |fileName=api_utils.py|funcName=_check_request_response|lineno=54|event=get_retry_request|status=success|url=https://scheduler.space.prod.gov.he2pi.com/api/schedule/passes/beb79165-3d77-4615-b86d-37bb5b81ae64/status?status=Stored
   [2022-10-16, 14:45:33 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33 UTC level=INFO|message=        |fileName=scheduler_api.py|funcName=get_pass_id_status_timestamp|lineno=31|event=pass_id_status_retrieval|status=success|pass_id=beb79165-3d77-4615-b86d-37bb5b81ae64|pass_status=Stored
   [2022-10-16, 14:45:33 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33 UTC level=INFO|message={}        |fileName=api_utils.py|funcName=_check_request_response|lineno=54|event=get_retry_request|status=success|url=https://scheduler.space.prod.gov.he2pi.com/api/schedule/passes/beb79165-3d77-4615-b86d-37bb5b81ae64/status?status=inspected
   [2022-10-16, 14:45:33 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33 UTC level=INFO|message=        |fileName=scheduler_api.py|funcName=get_pass_id_status_timestamp|lineno=31|event=pass_id_status_retrieval|status=success|pass_id=beb79165-3d77-4615-b86d-37bb5b81ae64|pass_status=inspected
   [2022-10-16, 14:45:33 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33 UTC level=INFO|message=        |fileName=promote_processing_run.py|funcName=log_status_time_intervals|lineno=121|run_name=e5b06226_geoloc_pipeline_radar_caf_1665924984.629687|run_id=e5b06226_geoloc_pipeline_radar_caf_1665924984.629687|platform_count=3|group_id=e5b06226-b7a2-4174-9301-b243b54e0df0|schedule_id=e5b06226-b7a2-4174-9301-b243b54e0df0|task_type=xband_collect|event=collection_end_to_download|collection_end_time=1665887200|downloaded_time=1665904990|latency_minutes=296.5|collection_end_fallback=False|downloaded_fallback=False
   [2022-10-16, 14:45:33 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33 UTC level=INFO|message=        |fileName=promote_processing_run.py|funcName=log_status_time_intervals|lineno=133|run_name=e5b06226_geoloc_pipeline_radar_caf_1665924984.629687|run_id=e5b06226_geoloc_pipeline_radar_caf_1665924984.629687|platform_count=3|group_id=e5b06226-b7a2-4174-9301-b243b54e0df0|schedule_id=e5b06226-b7a2-4174-9301-b243b54e0df0|task_type=xband_collect|event=download_to_stored|downloaded_time=1665904990|stored_time=1665924817|latency_minutes=330.45|downloaded_fallback=False|stored_fallback=False
   [2022-10-16, 14:45:33 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33 UTC level=INFO|message=        |fileName=promote_processing_run.py|funcName=log_status_time_intervals|lineno=145|run_name=e5b06226_geoloc_pipeline_radar_caf_1665924984.629687|run_id=e5b06226_geoloc_pipeline_radar_caf_1665924984.629687|platform_count=3|group_id=e5b06226-b7a2-4174-9301-b243b54e0df0|schedule_id=e5b06226-b7a2-4174-9301-b243b54e0df0|task_type=xband_collect|event=stored_to_inspected|stored_time=1665924817|inspected_time=1665924984|latency_minutes=2.783333333333333|stored_fallback=False|inspected_fallback=False
   [2022-10-16, 14:45:33 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33 UTC level=INFO|message=        |fileName=promote_processing_run.py|funcName=log_status_time_intervals|lineno=155|run_name=e5b06226_geoloc_pipeline_radar_caf_1665924984.629687|run_id=e5b06226_geoloc_pipeline_radar_caf_1665924984.629687|platform_count=3|group_id=e5b06226-b7a2-4174-9301-b243b54e0df0|schedule_id=e5b06226-b7a2-4174-9301-b243b54e0df0|task_type=xband_collect|event=inspected_to_promoted|inspected_time=1665924984|promotion_time=1665931533|latency_minutes=109.15|inspected_fallback=False|status=delivered-3ball
   [2022-10-16, 14:45:33 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33 UTC level=INFO|message=        |fileName=common.py|funcName=stop|lineno=47|run_name=e5b06226_geoloc_pipeline_radar_caf_1665924984.629687|run_id=e5b06226_geoloc_pipeline_radar_caf_1665924984.629687|platform_count=3|group_id=e5b06226-b7a2-4174-9301-b243b54e0df0|schedule_id=e5b06226-b7a2-4174-9301-b243b54e0df0|task_type=xband_collect|event=ending_heartbeat
   [2022-10-16, 14:45:35 UTC] {pod_manager.py:251} WARNING - Pod geoloc-promote-processing-run-bdf89b8515e84aefa0fbb2cbe90b29a6 log read interrupted but container base still running
   [2022-10-16, 14:45:36 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33 UTC level=INFO|message={}        |fileName=api_utils.py|funcName=_check_request_response|lineno=54|event=get_retry_request|status=success|url=https://scheduler.space.prod.gov.he2pi.com/api/schedule/e5b06226-b7a2-4174-9301-b243b54e0df0
   [2022-10-16, 14:45:36 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33 UTC level=INFO|message={}        |fileName=api_utils.py|funcName=_check_request_response|lineno=54|event=get_retry_request|status=success|url=https://scheduler.space.prod.gov.he2pi.com/api/schedule/passes/2a3e94a4-0a5c-42aa-ba53-9bbf39fa30c9/status?status=downloaded
   [2022-10-16, 14:45:36 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33 UTC level=INFO|message=        |fileName=scheduler_api.py|funcName=get_pass_id_status_timestamp|lineno=31|event=pass_id_status_retrieval|status=success|pass_id=2a3e94a4-0a5c-42aa-ba53-9bbf39fa30c9|pass_status=downloaded
   [2022-10-16, 14:45:36 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33 UTC level=INFO|message={}        |fileName=api_utils.py|funcName=_check_request_response|lineno=54|event=get_retry_request|status=success|url=https://scheduler.space.prod.gov.he2pi.com/api/schedule/passes/2a3e94a4-0a5c-42aa-ba53-9bbf39fa30c9/status?status=Stored
   [2022-10-16, 14:45:36 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33 UTC level=INFO|message=        |fileName=scheduler_api.py|funcName=get_pass_id_status_timestamp|lineno=31|event=pass_id_status_retrieval|status=success|pass_id=2a3e94a4-0a5c-42aa-ba53-9bbf39fa30c9|pass_status=Stored
   [2022-10-16, 14:45:36 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33 UTC level=INFO|message={}        |fileName=api_utils.py|funcName=_check_request_response|lineno=54|event=get_retry_request|status=success|url=https://scheduler.space.prod.gov.he2pi.com/api/schedule/passes/2a3e94a4-0a5c-42aa-ba53-9bbf39fa30c9/status?status=inspected
   [2022-10-16, 14:45:36 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33 UTC level=INFO|message=        |fileName=scheduler_api.py|funcName=get_pass_id_status_timestamp|lineno=31|event=pass_id_status_retrieval|status=success|pass_id=2a3e94a4-0a5c-42aa-ba53-9bbf39fa30c9|pass_status=inspected
   [2022-10-16, 14:45:36 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33 UTC level=INFO|message={}        |fileName=api_utils.py|funcName=_check_request_response|lineno=54|event=get_retry_request|status=success|url=https://scheduler.space.prod.gov.he2pi.com/api/schedule/passes/63ef6f27-97b7-4c16-88e6-26c1a6e2581e/status?status=downloaded
   [2022-10-16, 14:45:36 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33 UTC level=INFO|message=        |fileName=scheduler_api.py|funcName=get_pass_id_status_timestamp|lineno=31|event=pass_id_status_retrieval|status=success|pass_id=63ef6f27-97b7-4c16-88e6-26c1a6e2581e|pass_status=downloaded
   [2022-10-16, 14:45:36 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33 UTC level=INFO|message={}        |fileName=api_utils.py|funcName=_check_request_response|lineno=54|event=get_retry_request|status=success|url=https://scheduler.space.prod.gov.he2pi.com/api/schedule/passes/63ef6f27-97b7-4c16-88e6-26c1a6e2581e/status?status=Stored
   [2022-10-16, 14:45:36 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33 UTC level=INFO|message=        |fileName=scheduler_api.py|funcName=get_pass_id_status_timestamp|lineno=31|event=pass_id_status_retrieval|status=success|pass_id=63ef6f27-97b7-4c16-88e6-26c1a6e2581e|pass_status=Stored
   [2022-10-16, 14:45:36 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33 UTC level=INFO|message={}        |fileName=api_utils.py|funcName=_check_request_response|lineno=54|event=get_retry_request|status=success|url=https://scheduler.space.prod.gov.he2pi.com/api/schedule/passes/63ef6f27-97b7-4c16-88e6-26c1a6e2581e/status?status=inspected
   [2022-10-16, 14:45:36 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33 UTC level=INFO|message=        |fileName=scheduler_api.py|funcName=get_pass_id_status_timestamp|lineno=31|event=pass_id_status_retrieval|status=success|pass_id=63ef6f27-97b7-4c16-88e6-26c1a6e2581e|pass_status=inspected
   [2022-10-16, 14:45:36 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33 UTC level=INFO|message={}        |fileName=api_utils.py|funcName=_check_request_response|lineno=54|event=get_retry_request|status=success|url=https://scheduler.space.prod.gov.he2pi.com/api/schedule/passes/beb79165-3d77-4615-b86d-37bb5b81ae64/status?status=downloaded
   [2022-10-16, 14:45:36 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33 UTC level=INFO|message=        |fileName=scheduler_api.py|funcName=get_pass_id_status_timestamp|lineno=31|event=pass_id_status_retrieval|status=success|pass_id=beb79165-3d77-4615-b86d-37bb5b81ae64|pass_status=downloaded
   [2022-10-16, 14:45:36 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33 UTC level=INFO|message={}        |fileName=api_utils.py|funcName=_check_request_response|lineno=54|event=get_retry_request|status=success|url=https://scheduler.space.prod.gov.he2pi.com/api/schedule/passes/beb79165-3d77-4615-b86d-37bb5b81ae64/status?status=Stored
   [2022-10-16, 14:45:36 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33 UTC level=INFO|message=        |fileName=scheduler_api.py|funcName=get_pass_id_status_timestamp|lineno=31|event=pass_id_status_retrieval|status=success|pass_id=beb79165-3d77-4615-b86d-37bb5b81ae64|pass_status=Stored
   [2022-10-16, 14:45:36 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33 UTC level=INFO|message={}        |fileName=api_utils.py|funcName=_check_request_response|lineno=54|event=get_retry_request|status=success|url=https://scheduler.space.prod.gov.he2pi.com/api/schedule/passes/beb79165-3d77-4615-b86d-37bb5b81ae64/status?status=inspected
   [2022-10-16, 14:45:36 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33 UTC level=INFO|message=        |fileName=scheduler_api.py|funcName=get_pass_id_status_timestamp|lineno=31|event=pass_id_status_retrieval|status=success|pass_id=beb79165-3d77-4615-b86d-37bb5b81ae64|pass_status=inspected
   ```
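The replay can be spotted mechanically in a downloaded task log: the Airflow timestamp in brackets changes, but the container's own line after `INFO - ` is repeated unchanged. The following is a heuristic sketch that assumes the task-log layout shown above; `find_replayed` is a hypothetical helper, the sample lines are shortened for illustration, and an application that legitimately writes the same line twice within one second would also be reported.

```python
import re
from collections import Counter
from typing import Iterable, List, Tuple

# Airflow task-log lines written by pod_manager.py look like:
#   "[<airflow time>] {pod_manager.py:226} INFO - <container log line>"
# The captured group is the container's own line, which is identical for a
# replayed line even though the Airflow timestamp in brackets differs.
POD_INFO_LINE = re.compile(r"^\[[^\]]+\] \{pod_manager\.py:\d+\} INFO - (.*)$")


def find_replayed(lines: Iterable[str]) -> List[Tuple[str, int]]:
    """Return (container line, times logged) for each container line seen more than once."""
    counts: Counter = Counter()
    for line in lines:
        match = POD_INFO_LINE.match(line.strip())
        if match:
            counts[match.group(1)] += 1
    return [(text, n) for text, n in counts.items() if n > 1]


sample = [
    "[2022-10-16, 14:45:33 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33 UTC level=INFO|event=a",
    "[2022-10-16, 14:45:33 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33 UTC level=INFO|event=b",
    "[2022-10-16, 14:45:35 UTC] {pod_manager.py:251} WARNING - Pod p log read interrupted but container base still running",
    "[2022-10-16, 14:45:36 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33 UTC level=INFO|event=a",
]
print(find_replayed(sample))
# [('2022-10-16, 14:45:33 UTC level=INFO|event=a', 2)]
```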
   
   ### What you think should happen instead
   
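   I believe the following change to `consume_logs` should fix this issue: skip any line whose timestamp is not later than the last timestamp already logged.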
   
   ```python
       def fetch_container_logs(
           self, pod: V1Pod, container_name: str, *, follow=False, since_time: DateTime | None = None
       ) -> PodLoggingStatus:
           """
           Follows the logs of container and streams to airflow logging.
           Returns when container exits.
           """
   
           def consume_logs(*, since_time: DateTime | None = None, follow: bool = True) -> DateTime | None:
               """
               Tries to follow container logs until container completes.
               For a long-running container, sometimes the log read may be interrupted
               Such errors of this kind are suppressed.
   
               Returns the last timestamp observed in logs.
               """
               # Start from the last timestamp already logged, so lines replayed by the
               # rounded-up since_seconds window can be recognised and skipped.
               timestamp = since_time
               try:
                   logs = self.read_pod_logs(
                       pod=pod,
                       container_name=container_name,
                       timestamps=True,
                       since_seconds=(
                           math.ceil((pendulum.now() - since_time).total_seconds()) if since_time else None
                       ),
                       follow=follow,
                   )
                   for raw_line in logs:
                       line = raw_line.decode('utf-8', errors="backslashreplace")
                       tmp_timestamp, message = self.parse_log_line(line)
                       # parse_log_line may return None as the timestamp; never compare None.
                       if tmp_timestamp is not None and timestamp is not None and tmp_timestamp <= timestamp:
                           continue  # already logged before the read was interrupted
                       if tmp_timestamp is not None:
                           timestamp = tmp_timestamp
                       self.log.info(message)
               except BaseHTTPError as e:
                   self.log.warning(
                       "Reading of logs interrupted with error %r; will retry. "
                       "Set log level to DEBUG for traceback.",
                       e,
                   )
                   self.log.debug(
                       "Traceback for interrupted logs read for pod %r",
                       pod.metadata.name,
                       exc_info=True,
                   )
               return timestamp or since_time
   ```
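The core of the proposed check can also be exercised outside Airflow. The sketch below is a standalone model, not Airflow code: `skip_replayed`, `follow_with_retries` and `at` are hypothetical names, and it assumes each resumed read is handed the last already-logged timestamp as `since_time`, as `fetch_container_logs` does with `last_log_time`. It drops lines at or before that timestamp, passes through lines with no parseable timestamp, and never compares a `datetime` with `None`.

```python
from datetime import datetime, timedelta, timezone
from typing import Iterable, Iterator, List, Optional, Tuple

# (timestamp, message) pairs, as a parser like PodManager.parse_log_line returns
# them; the timestamp is None when a line has no parseable timestamp prefix.
LogLine = Tuple[Optional[datetime], str]


def skip_replayed(lines: Iterable[LogLine], since_time: Optional[datetime]) -> Iterator[LogLine]:
    """Yield only lines newer than since_time, the last timestamp already logged.

    Lines without a timestamp are passed through, and None is never compared
    with a datetime.
    """
    for timestamp, message in lines:
        if since_time is not None and timestamp is not None and timestamp <= since_time:
            continue  # already logged before the read was interrupted
        yield timestamp, message


def follow_with_retries(reads: Iterable[List[LogLine]]) -> Tuple[List[str], Optional[datetime]]:
    """Hypothetical stand-in for the retry loop: each item in `reads` is what
    one (re)connection returned. Returns the logged messages and last timestamp."""
    logged: List[str] = []
    last_log_time: Optional[datetime] = None
    for batch in reads:
        # since_time is fixed for the whole read, like consume_logs(since_time=last_log_time).
        for timestamp, message in skip_replayed(batch, last_log_time):
            logged.append(message)
            if timestamp is not None:
                last_log_time = timestamp
    return logged, last_log_time


t0 = datetime(2022, 10, 16, 14, 45, 33, tzinfo=timezone.utc)


def at(ms: int) -> datetime:
    """Hypothetical helper: a timestamp `ms` milliseconds after t0."""
    return t0 + timedelta(milliseconds=ms)


first_read = [(at(0), "a"), (at(200), "b"), (at(400), "c")]
# The resumed read's window starts early and replays "b" and "c".
second_read = [(at(200), "b"), (at(400), "c"), (at(1000), "d")]
logged, last = follow_with_retries([first_read, second_read])
print(logged)  # ['a', 'b', 'c', 'd']
```

One trade-off of filtering on `<=`: a genuinely new line carrying exactly the same timestamp as the last logged one would also be dropped. With `timestamps=True`, Kubernetes prefixes each line with an RFC3339 timestamp that normally has sub-second precision, so this should be rare.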
   
   ### How to reproduce
   
   Please don't close my issue. I don't know how to provide reproducible code for this, but I hope that my example, which points to exactly where the problem is, and my proposed solution are sufficient to get this bug fixed.
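A cluster-free reproduction can be sketched in plain Python. This is a hypothetical model, not Airflow or Kubernetes code: `FakeContainerLog` stands in for the kubelet and, by assumption, returns every line written between `now - since_seconds` and `now`, while `read_once` mirrors the `math.ceil()` computation from `consume_logs` using `datetime` instead of `pendulum`. Streaming, clock skew, and request latency are ignored.

```python
import math
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from typing import List, Optional, Tuple

LogLine = Tuple[datetime, str]


@dataclass
class FakeContainerLog:
    """Hypothetical stand-in for the kubelet log endpoint.
    Lines are (timestamp, message) pairs in write order."""

    lines: List[LogLine] = field(default_factory=list)

    def read(self, now: datetime, since_seconds: Optional[int]) -> List[LogLine]:
        # Assumed semantics: return lines written in [now - since_seconds, now].
        start = now - timedelta(seconds=since_seconds) if since_seconds is not None else None
        return [(ts, msg) for ts, msg in self.lines if ts <= now and (start is None or ts >= start)]


def read_once(log: FakeContainerLog, now: datetime, since_time: Optional[datetime]) -> List[LogLine]:
    """Mirror of the since_seconds computation in consume_logs."""
    since_seconds = math.ceil((now - since_time).total_seconds()) if since_time else None
    return log.read(now, since_seconds)


t0 = datetime(2022, 10, 16, 14, 45, 33, tzinfo=timezone.utc)
log = FakeContainerLog([
    (t0, "a"),
    (t0 + timedelta(milliseconds=200), "b"),
    (t0 + timedelta(milliseconds=400), "c"),
    (t0 + timedelta(seconds=1), "d"),  # written while the connection was down
])

# First read is interrupted at 14:45:33.5; the last line it logged is "c".
first = read_once(log, now=t0 + timedelta(milliseconds=500), since_time=None)
# The retry loop sleeps, then resumes at 14:45:35.1 with since_time = c's timestamp.
second = read_once(log, now=t0 + timedelta(milliseconds=2100), since_time=first[-1][0])

print([msg for _, msg in first + second])  # ['a', 'b', 'c', 'b', 'c', 'd']
```

The second read re-delivers "b" and "c" because the 1.7 s gap since "c" is rounded up to 2 s, which moves the start of the window back to 14:45:33.1.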
   
   ### Anything else
   
   This bug appears fairly frequently in our setup, which has many long-running processes on Kubernetes pods. It affects some of our reporting and metrics, and we would like to be able to correct it. Please let me know if there is any way I can be of assistance as you work to correct the issue.
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

