IanWesleyDev opened a new issue, #30245:
URL: https://github.com/apache/airflow/issues/30245
### Apache Airflow Provider(s)
cncf-kubernetes
### Versions of Apache Airflow Providers
apache-airflow-providers-cncf-kubernetes==4.4.0, but the issue also appears in
the most recent code I have seen.
### Apache Airflow version
apache-airflow==2.4.3
### Operating System
Ubuntu
### Deployment
Official Apache Airflow Helm Chart
### Deployment details
_No response_
### What happened
The following function, which fetches logs from Kubernetes pods, occasionally
produces duplicated logs. I believe the cause is that `math.ceil()` rounds the
number of seconds up, so the requested window can start slightly before the
last timestamp already logged, and the lines in that overlap are returned again.
We see the stream cut out while producing logs and then resume with logs from
just before the cut. An example is provided below.
```
def fetch_container_logs(
    self, pod: V1Pod, container_name: str, *, follow=False, since_time: DateTime | None = None
) -> PodLoggingStatus:
    """
    Follows the logs of container and streams to airflow logging.
    Returns when container exits.
    """

    def consume_logs(*, since_time: DateTime | None = None, follow: bool = True) -> DateTime | None:
        """
        Tries to follow container logs until container completes.
        For a long-running container, sometimes the log read may be interrupted
        Such errors of this kind are suppressed.
        Returns the last timestamp observed in logs.
        """
        timestamp = None
        try:
            logs = self.read_pod_logs(
                pod=pod,
                container_name=container_name,
                timestamps=True,
                since_seconds=(
                    math.ceil((pendulum.now() - since_time).total_seconds()) if since_time else None
                ),
                follow=follow,
            )
            for raw_line in logs:
                line = raw_line.decode('utf-8', errors="backslashreplace")
                timestamp, message = self.parse_log_line(line)
                self.log.info(message)
        except BaseHTTPError as e:
            self.log.warning(
                "Reading of logs interrupted with error %r; will retry. "
                "Set log level to DEBUG for traceback.",
                e,
            )
            self.log.debug(
                "Traceback for interrupted logs read for pod %r",
                pod.metadata.name,
                exc_info=True,
            )
        return timestamp or since_time

    # note: `read_pod_logs` follows the logs, so we shouldn't necessarily *need* to
    # loop as we do here. But in a long-running process we might temporarily lose connectivity.
    # So the looping logic is there to let us resume following the logs.
    last_log_time = since_time
    while True:
        last_log_time = consume_logs(since_time=last_log_time, follow=follow)
        if not self.container_is_running(pod, container_name=container_name):
            return PodLoggingStatus(running=False, last_log_time=last_log_time)
        if not follow:
            return PodLoggingStatus(running=True, last_log_time=last_log_time)
        else:
            self.log.warning(
                'Pod %s log read interrupted but container %s still running',
                pod.metadata.name,
                container_name,
            )
            time.sleep(1)
```
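To make the suspected rounding effect concrete, here is a minimal sketch of the arithmetic only. It uses the standard library's `datetime` instead of `pendulum` so that it runs on its own; `window_start` is a hypothetical helper and the two instants are made-up values, chosen to resemble the log excerpt below.

```python
import math
from datetime import datetime, timedelta, timezone


def window_start(now: datetime, since_time: datetime) -> datetime:
    """Earliest instant covered by a whole-second `since_seconds` request."""
    # Same expression as in consume_logs: elapsed time rounded UP to whole seconds.
    since_seconds = math.ceil((now - since_time).total_seconds())
    return now - timedelta(seconds=since_seconds)


# Hypothetical instants: the last line was stamped at 14:45:33.9 and the
# reconnect happens at 14:45:36.2, i.e. 2.3 seconds later.
since_time = datetime(2022, 10, 16, 14, 45, 33, 900000, tzinfo=timezone.utc)
now = datetime(2022, 10, 16, 14, 45, 36, 200000, tzinfo=timezone.utc)

start = window_start(now, since_time)
overlap = since_time - start

# The window starts before since_time, so lines stamped inside the overlap
# were already logged once and would be returned a second time.
print("window starts at:", start.time())
print("overlap in seconds:", overlap.total_seconds())
```

Under these assumptions the 2.3 seconds of elapsed time become `since_seconds=3`, so the request reaches back 0.7 seconds before the last timestamp that was already logged.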
Example log illustrating this issue:
```
[2022-10-16, 14:45:32 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:32
UTC level=INFO|message=
|fileName=aws.py|funcName=send_sqs_message|lineno=349|run_name=e5b06226_geoloc_pipeline_radar_caf_1665924984.629687|run_id=e5b06226_geoloc_pipeline_radar_caf_1665924984.629687|platform_count=3|group_id=e5b06226-b7a2-4174-9301-b243b54e0df0|schedule_id=e5b06226-b7a2-4174-9301-b243b54e0df0|task_type=xband_collect|event=send_notification|notification_type=sqs|notification_url=https://sqs.us-gov-east-1.amazonaws.com/252649730194/scheduler-pass-status.fifo|notification_message=e5b06226-b7a2-4174-9301-b243b54e0df0:delivered-3ball|notification_dedup_id_id=nfaZZWF-vTRsEa5j4jDyRDxA7Z0=|notification_message_id=f9880a14-1dfe-4bb4-bd36-f05cc322dcff|notification_status_code=200|status=success
[2022-10-16, 14:45:33 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33
UTC level=INFO|message={}
|fileName=api_utils.py|funcName=_check_request_response|lineno=54|event=get_retry_request|status=success|url=https://scheduler.space.prod.gov.he2pi.com/api/schedule/e5b06226-b7a2-4174-9301-b243b54e0df0
[2022-10-16, 14:45:33 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33
UTC level=INFO|message={}
|fileName=api_utils.py|funcName=_check_request_response|lineno=54|event=get_retry_request|status=success|url=https://scheduler.space.prod.gov.he2pi.com/api/schedule/passes/2a3e94a4-0a5c-42aa-ba53-9bbf39fa30c9/status?status=downloaded
[2022-10-16, 14:45:33 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33
UTC level=INFO|message=
|fileName=scheduler_api.py|funcName=get_pass_id_status_timestamp|lineno=31|event=pass_id_status_retrieval|status=success|pass_id=2a3e94a4-0a5c-42aa-ba53-9bbf39fa30c9|pass_status=downloaded
[2022-10-16, 14:45:33 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33
UTC level=INFO|message={}
|fileName=api_utils.py|funcName=_check_request_response|lineno=54|event=get_retry_request|status=success|url=https://scheduler.space.prod.gov.he2pi.com/api/schedule/passes/2a3e94a4-0a5c-42aa-ba53-9bbf39fa30c9/status?status=Stored
[2022-10-16, 14:45:33 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33
UTC level=INFO|message=
|fileName=scheduler_api.py|funcName=get_pass_id_status_timestamp|lineno=31|event=pass_id_status_retrieval|status=success|pass_id=2a3e94a4-0a5c-42aa-ba53-9bbf39fa30c9|pass_status=Stored
[2022-10-16, 14:45:33 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33
UTC level=INFO|message={}
|fileName=api_utils.py|funcName=_check_request_response|lineno=54|event=get_retry_request|status=success|url=https://scheduler.space.prod.gov.he2pi.com/api/schedule/passes/2a3e94a4-0a5c-42aa-ba53-9bbf39fa30c9/status?status=inspected
[2022-10-16, 14:45:33 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33
UTC level=INFO|message=
|fileName=scheduler_api.py|funcName=get_pass_id_status_timestamp|lineno=31|event=pass_id_status_retrieval|status=success|pass_id=2a3e94a4-0a5c-42aa-ba53-9bbf39fa30c9|pass_status=inspected
[2022-10-16, 14:45:33 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33
UTC level=INFO|message={}
|fileName=api_utils.py|funcName=_check_request_response|lineno=54|event=get_retry_request|status=success|url=https://scheduler.space.prod.gov.he2pi.com/api/schedule/passes/63ef6f27-97b7-4c16-88e6-26c1a6e2581e/status?status=downloaded
[2022-10-16, 14:45:33 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33
UTC level=INFO|message=
|fileName=scheduler_api.py|funcName=get_pass_id_status_timestamp|lineno=31|event=pass_id_status_retrieval|status=success|pass_id=63ef6f27-97b7-4c16-88e6-26c1a6e2581e|pass_status=downloaded
[2022-10-16, 14:45:33 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33
UTC level=INFO|message={}
|fileName=api_utils.py|funcName=_check_request_response|lineno=54|event=get_retry_request|status=success|url=https://scheduler.space.prod.gov.he2pi.com/api/schedule/passes/63ef6f27-97b7-4c16-88e6-26c1a6e2581e/status?status=Stored
[2022-10-16, 14:45:33 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33
UTC level=INFO|message=
|fileName=scheduler_api.py|funcName=get_pass_id_status_timestamp|lineno=31|event=pass_id_status_retrieval|status=success|pass_id=63ef6f27-97b7-4c16-88e6-26c1a6e2581e|pass_status=Stored
[2022-10-16, 14:45:33 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33
UTC level=INFO|message={}
|fileName=api_utils.py|funcName=_check_request_response|lineno=54|event=get_retry_request|status=success|url=https://scheduler.space.prod.gov.he2pi.com/api/schedule/passes/63ef6f27-97b7-4c16-88e6-26c1a6e2581e/status?status=inspected
[2022-10-16, 14:45:33 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33
UTC level=INFO|message=
|fileName=scheduler_api.py|funcName=get_pass_id_status_timestamp|lineno=31|event=pass_id_status_retrieval|status=success|pass_id=63ef6f27-97b7-4c16-88e6-26c1a6e2581e|pass_status=inspected
[2022-10-16, 14:45:33 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33
UTC level=INFO|message={}
|fileName=api_utils.py|funcName=_check_request_response|lineno=54|event=get_retry_request|status=success|url=https://scheduler.space.prod.gov.he2pi.com/api/schedule/passes/beb79165-3d77-4615-b86d-37bb5b81ae64/status?status=downloaded
[2022-10-16, 14:45:33 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33
UTC level=INFO|message=
|fileName=scheduler_api.py|funcName=get_pass_id_status_timestamp|lineno=31|event=pass_id_status_retrieval|status=success|pass_id=beb79165-3d77-4615-b86d-37bb5b81ae64|pass_status=downloaded
[2022-10-16, 14:45:33 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33
UTC level=INFO|message={}
|fileName=api_utils.py|funcName=_check_request_response|lineno=54|event=get_retry_request|status=success|url=https://scheduler.space.prod.gov.he2pi.com/api/schedule/passes/beb79165-3d77-4615-b86d-37bb5b81ae64/status?status=Stored
[2022-10-16, 14:45:33 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33
UTC level=INFO|message=
|fileName=scheduler_api.py|funcName=get_pass_id_status_timestamp|lineno=31|event=pass_id_status_retrieval|status=success|pass_id=beb79165-3d77-4615-b86d-37bb5b81ae64|pass_status=Stored
[2022-10-16, 14:45:33 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33
UTC level=INFO|message={}
|fileName=api_utils.py|funcName=_check_request_response|lineno=54|event=get_retry_request|status=success|url=https://scheduler.space.prod.gov.he2pi.com/api/schedule/passes/beb79165-3d77-4615-b86d-37bb5b81ae64/status?status=inspected
[2022-10-16, 14:45:33 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33
UTC level=INFO|message=
|fileName=scheduler_api.py|funcName=get_pass_id_status_timestamp|lineno=31|event=pass_id_status_retrieval|status=success|pass_id=beb79165-3d77-4615-b86d-37bb5b81ae64|pass_status=inspected
[2022-10-16, 14:45:33 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33
UTC level=INFO|message=
|fileName=promote_processing_run.py|funcName=log_status_time_intervals|lineno=121|run_name=e5b06226_geoloc_pipeline_radar_caf_1665924984.629687|run_id=e5b06226_geoloc_pipeline_radar_caf_1665924984.629687|platform_count=3|group_id=e5b06226-b7a2-4174-9301-b243b54e0df0|schedule_id=e5b06226-b7a2-4174-9301-b243b54e0df0|task_type=xband_collect|event=collection_end_to_download|collection_end_time=1665887200|downloaded_time=1665904990|latency_minutes=296.5|collection_end_fallback=False|downloaded_fallback=False
[2022-10-16, 14:45:33 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33
UTC level=INFO|message=
|fileName=promote_processing_run.py|funcName=log_status_time_intervals|lineno=133|run_name=e5b06226_geoloc_pipeline_radar_caf_1665924984.629687|run_id=e5b06226_geoloc_pipeline_radar_caf_1665924984.629687|platform_count=3|group_id=e5b06226-b7a2-4174-9301-b243b54e0df0|schedule_id=e5b06226-b7a2-4174-9301-b243b54e0df0|task_type=xband_collect|event=download_to_stored|downloaded_time=1665904990|stored_time=1665924817|latency_minutes=330.45|downloaded_fallback=False|stored_fallback=False
[2022-10-16, 14:45:33 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33
UTC level=INFO|message=
|fileName=promote_processing_run.py|funcName=log_status_time_intervals|lineno=145|run_name=e5b06226_geoloc_pipeline_radar_caf_1665924984.629687|run_id=e5b06226_geoloc_pipeline_radar_caf_1665924984.629687|platform_count=3|group_id=e5b06226-b7a2-4174-9301-b243b54e0df0|schedule_id=e5b06226-b7a2-4174-9301-b243b54e0df0|task_type=xband_collect|event=stored_to_inspected|stored_time=1665924817|inspected_time=1665924984|latency_minutes=2.783333333333333|stored_fallback=False|inspected_fallback=False
[2022-10-16, 14:45:33 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33
UTC level=INFO|message=
|fileName=promote_processing_run.py|funcName=log_status_time_intervals|lineno=155|run_name=e5b06226_geoloc_pipeline_radar_caf_1665924984.629687|run_id=e5b06226_geoloc_pipeline_radar_caf_1665924984.629687|platform_count=3|group_id=e5b06226-b7a2-4174-9301-b243b54e0df0|schedule_id=e5b06226-b7a2-4174-9301-b243b54e0df0|task_type=xband_collect|event=inspected_to_promoted|inspected_time=1665924984|promotion_time=1665931533|latency_minutes=109.15|inspected_fallback=False|status=delivered-3ball
[2022-10-16, 14:45:33 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33
UTC level=INFO|message=
|fileName=common.py|funcName=stop|lineno=47|run_name=e5b06226_geoloc_pipeline_radar_caf_1665924984.629687|run_id=e5b06226_geoloc_pipeline_radar_caf_1665924984.629687|platform_count=3|group_id=e5b06226-b7a2-4174-9301-b243b54e0df0|schedule_id=e5b06226-b7a2-4174-9301-b243b54e0df0|task_type=xband_collect|event=ending_heartbeat
[2022-10-16, 14:45:35 UTC] {pod_manager.py:251} WARNING - Pod
geoloc-promote-processing-run-bdf89b8515e84aefa0fbb2cbe90b29a6 log read
interrupted but container base still running
[2022-10-16, 14:45:36 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33
UTC level=INFO|message={}
|fileName=api_utils.py|funcName=_check_request_response|lineno=54|event=get_retry_request|status=success|url=https://scheduler.space.prod.gov.he2pi.com/api/schedule/e5b06226-b7a2-4174-9301-b243b54e0df0
[2022-10-16, 14:45:36 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33
UTC level=INFO|message={}
|fileName=api_utils.py|funcName=_check_request_response|lineno=54|event=get_retry_request|status=success|url=https://scheduler.space.prod.gov.he2pi.com/api/schedule/passes/2a3e94a4-0a5c-42aa-ba53-9bbf39fa30c9/status?status=downloaded
[2022-10-16, 14:45:36 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33
UTC level=INFO|message=
|fileName=scheduler_api.py|funcName=get_pass_id_status_timestamp|lineno=31|event=pass_id_status_retrieval|status=success|pass_id=2a3e94a4-0a5c-42aa-ba53-9bbf39fa30c9|pass_status=downloaded
[2022-10-16, 14:45:36 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33
UTC level=INFO|message={}
|fileName=api_utils.py|funcName=_check_request_response|lineno=54|event=get_retry_request|status=success|url=https://scheduler.space.prod.gov.he2pi.com/api/schedule/passes/2a3e94a4-0a5c-42aa-ba53-9bbf39fa30c9/status?status=Stored
[2022-10-16, 14:45:36 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33
UTC level=INFO|message=
|fileName=scheduler_api.py|funcName=get_pass_id_status_timestamp|lineno=31|event=pass_id_status_retrieval|status=success|pass_id=2a3e94a4-0a5c-42aa-ba53-9bbf39fa30c9|pass_status=Stored
[2022-10-16, 14:45:36 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33
UTC level=INFO|message={}
|fileName=api_utils.py|funcName=_check_request_response|lineno=54|event=get_retry_request|status=success|url=https://scheduler.space.prod.gov.he2pi.com/api/schedule/passes/2a3e94a4-0a5c-42aa-ba53-9bbf39fa30c9/status?status=inspected
[2022-10-16, 14:45:36 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33
UTC level=INFO|message=
|fileName=scheduler_api.py|funcName=get_pass_id_status_timestamp|lineno=31|event=pass_id_status_retrieval|status=success|pass_id=2a3e94a4-0a5c-42aa-ba53-9bbf39fa30c9|pass_status=inspected
[2022-10-16, 14:45:36 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33
UTC level=INFO|message={}
|fileName=api_utils.py|funcName=_check_request_response|lineno=54|event=get_retry_request|status=success|url=https://scheduler.space.prod.gov.he2pi.com/api/schedule/passes/63ef6f27-97b7-4c16-88e6-26c1a6e2581e/status?status=downloaded
[2022-10-16, 14:45:36 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33
UTC level=INFO|message=
|fileName=scheduler_api.py|funcName=get_pass_id_status_timestamp|lineno=31|event=pass_id_status_retrieval|status=success|pass_id=63ef6f27-97b7-4c16-88e6-26c1a6e2581e|pass_status=downloaded
[2022-10-16, 14:45:36 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33
UTC level=INFO|message={}
|fileName=api_utils.py|funcName=_check_request_response|lineno=54|event=get_retry_request|status=success|url=https://scheduler.space.prod.gov.he2pi.com/api/schedule/passes/63ef6f27-97b7-4c16-88e6-26c1a6e2581e/status?status=Stored
[2022-10-16, 14:45:36 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33
UTC level=INFO|message=
|fileName=scheduler_api.py|funcName=get_pass_id_status_timestamp|lineno=31|event=pass_id_status_retrieval|status=success|pass_id=63ef6f27-97b7-4c16-88e6-26c1a6e2581e|pass_status=Stored
[2022-10-16, 14:45:36 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33
UTC level=INFO|message={}
|fileName=api_utils.py|funcName=_check_request_response|lineno=54|event=get_retry_request|status=success|url=https://scheduler.space.prod.gov.he2pi.com/api/schedule/passes/63ef6f27-97b7-4c16-88e6-26c1a6e2581e/status?status=inspected
[2022-10-16, 14:45:36 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33
UTC level=INFO|message=
|fileName=scheduler_api.py|funcName=get_pass_id_status_timestamp|lineno=31|event=pass_id_status_retrieval|status=success|pass_id=63ef6f27-97b7-4c16-88e6-26c1a6e2581e|pass_status=inspected
[2022-10-16, 14:45:36 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33
UTC level=INFO|message={}
|fileName=api_utils.py|funcName=_check_request_response|lineno=54|event=get_retry_request|status=success|url=https://scheduler.space.prod.gov.he2pi.com/api/schedule/passes/beb79165-3d77-4615-b86d-37bb5b81ae64/status?status=downloaded
[2022-10-16, 14:45:36 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33
UTC level=INFO|message=
|fileName=scheduler_api.py|funcName=get_pass_id_status_timestamp|lineno=31|event=pass_id_status_retrieval|status=success|pass_id=beb79165-3d77-4615-b86d-37bb5b81ae64|pass_status=downloaded
[2022-10-16, 14:45:36 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33
UTC level=INFO|message={}
|fileName=api_utils.py|funcName=_check_request_response|lineno=54|event=get_retry_request|status=success|url=https://scheduler.space.prod.gov.he2pi.com/api/schedule/passes/beb79165-3d77-4615-b86d-37bb5b81ae64/status?status=Stored
[2022-10-16, 14:45:36 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33
UTC level=INFO|message=
|fileName=scheduler_api.py|funcName=get_pass_id_status_timestamp|lineno=31|event=pass_id_status_retrieval|status=success|pass_id=beb79165-3d77-4615-b86d-37bb5b81ae64|pass_status=Stored
[2022-10-16, 14:45:36 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33
UTC level=INFO|message={}
|fileName=api_utils.py|funcName=_check_request_response|lineno=54|event=get_retry_request|status=success|url=https://scheduler.space.prod.gov.he2pi.com/api/schedule/passes/beb79165-3d77-4615-b86d-37bb5b81ae64/status?status=inspected
[2022-10-16, 14:45:36 UTC] {pod_manager.py:226} INFO - 2022-10-16, 14:45:33
UTC level=INFO|message=
|fileName=scheduler_api.py|funcName=get_pass_id_status_timestamp|lineno=31|event=pass_id_status_retrieval|status=success|pass_id=beb79165-3d77-4615-b86d-37bb5b81ae64|pass_status=inspected
```
### What you think should happen instead
I believe the following code should fix this issue.
```
def fetch_container_logs(
    self, pod: V1Pod, container_name: str, *, follow=False, since_time: DateTime | None = None
) -> PodLoggingStatus:
    """
    Follows the logs of container and streams to airflow logging.
    Returns when container exits.
    """

    def consume_logs(*, since_time: DateTime | None = None, follow: bool = True) -> DateTime | None:
        """
        Tries to follow container logs until container completes.
        For a long-running container, sometimes the log read may be interrupted
        Such errors of this kind are suppressed.
        Returns the last timestamp observed in logs.
        """
        timestamp = None
        try:
            logs = self.read_pod_logs(
                pod=pod,
                container_name=container_name,
                timestamps=True,
                since_seconds=(
                    math.ceil((pendulum.now() - since_time).total_seconds()) if since_time else None
                ),
                follow=follow,
            )
            for raw_line in logs:
                line = raw_line.decode('utf-8', errors="backslashreplace")
                tmp_timestamp, message = self.parse_log_line(line)
                # Keep only the newest timestamp seen so far. The None check is
                # needed because comparing a DateTime with None raises TypeError.
                if timestamp is None or tmp_timestamp > timestamp:
                    timestamp = tmp_timestamp
                self.log.info(message)
        except BaseHTTPError as e:
            self.log.warning(
                "Reading of logs interrupted with error %r; will retry. "
                "Set log level to DEBUG for traceback.",
                e,
            )
            self.log.debug(
                "Traceback for interrupted logs read for pod %r",
                pod.metadata.name,
                exc_info=True,
            )
        return timestamp or since_time
```
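Because the request above still rounds `since_seconds` up, a complementary guard might be to drop any line whose timestamp is not newer than the last one already logged. The following is a minimal sketch of that idea only, not part of the proposal above and not the provider's code: `skip_already_seen` is a hypothetical helper, and it assumes the lines have already been parsed into `(timestamp, message)` pairs.

```python
from __future__ import annotations

from datetime import datetime, timezone
from typing import Iterable, Iterator


def skip_already_seen(
    lines: Iterable[tuple[datetime, str]], last_seen: datetime | None
) -> Iterator[tuple[datetime, str]]:
    """Yield only lines stamped strictly after `last_seen` (hypothetical helper)."""
    for timestamp, message in lines:
        # Lines at or before the last logged timestamp were emitted on a
        # previous read, so they are skipped instead of being logged again.
        if last_seen is not None and timestamp <= last_seen:
            continue
        yield timestamp, message


def at(second: int, microsecond: int = 0) -> datetime:
    """Build a made-up UTC instant on 2022-10-16 at 14:45 for the demo."""
    return datetime(2022, 10, 16, 14, 45, second, microsecond, tzinfo=timezone.utc)


# Made-up lines returned after a reconnect; the first two were already logged.
replayed = [(at(33, 100000), "a"), (at(33, 500000), "b"), (at(34), "c")]
for timestamp, message in skip_already_seen(replayed, last_seen=at(33, 500000)):
    print(timestamp.time(), message)
```

One caveat of this sketch: a line that shares exactly the same timestamp as `last_seen` but was never delivered would also be dropped, so whether a strict comparison is acceptable depends on the precision of the timestamps in the stream.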
### How to reproduce
Please don't close my issue. I don't know how to provide reproducible code
for this. I hope that my example, which points to exactly where the problem is,
together with my proposed solution, is sufficient to get this bug fixed.
### Anything else
This bug appears fairly frequently in our setup, which has lots of
long-running processes on Kubernetes pods. It impacts some of our reporting and
metrics, and we would like to be able to correct it. Please let me know if there
is any way I can be of assistance as you work to correct the issue.
### Are you willing to submit PR?
- [X] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)