This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new cbb04e5133 Remove duplicated logs by reusing PodLogsConsumer (#34127)
cbb04e5133 is described below
commit cbb04e513352e237baf6075ff8c6a59cc88d1122
Author: Dongwon Kim <[email protected]>
AuthorDate: Sun Oct 1 18:14:11 2023 +0900
Remove duplicated logs by reusing PodLogsConsumer (#34127)
* Remove duplicated logs by reusing PodLogsConsumer
* reuse the kubernetes connection so that the logs are not re-consumed
* If a failure occurs while consuming logs through `PodLogsConsumer`, a
new `PodLogsConsumer` is created.
* But at that point duplicated logs are emitted, even though they have
already been consumed.
* To fix the duplicated logs, the `PodLogsConsumer` instance is created
once up front and is reused
when a failure occurs, preventing duplicate logs from occurring.
Signed-off-by: 김동원 <[email protected]>
* Move the try inside the connection creation
Signed-off-by: 김동원 <[email protected]>
* Validate reusing PodLogsConsumer instance
Signed-off-by: 김동원 <[email protected]>
* Check there are duplicated logs
Signed-off-by: 김동원 <[email protected]>
---------
Signed-off-by: 김동원 <[email protected]>
Co-authored-by: 김동원 <[email protected]>
Co-authored-by: Elad Kalif <[email protected]>
---
.../providers/cncf/kubernetes/utils/pod_manager.py | 39 +++++++++++++---------
.../cncf/kubernetes/utils/test_pod_manager.py | 25 ++++++++++++++
2 files changed, 49 insertions(+), 15 deletions(-)
diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
index cfb262cdec..05829a1615 100644
--- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py
+++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
@@ -392,8 +392,12 @@ class PodManager(LoggingMixin):
before=before_log(self.log, logging.INFO),
)
def consume_logs(
- *, since_time: DateTime | None = None, follow: bool = True, termination_timeout: int = 120
- ) -> DateTime | None:
+ *,
+ since_time: DateTime | None = None,
+ follow: bool = True,
+ termination_timeout: int = 120,
+ logs: PodLogsConsumer | None,
+ ) -> tuple[DateTime | None, PodLogsConsumer | None]:
"""
Tries to follow container logs until container completes.
@@ -404,16 +408,17 @@ class PodManager(LoggingMixin):
"""
last_captured_timestamp = None
try:
- logs = self.read_pod_logs(
- pod=pod,
- container_name=container_name,
- timestamps=True,
- since_seconds=(
- math.ceil((pendulum.now() - since_time).total_seconds()) if since_time else None
- ),
- follow=follow,
- post_termination_timeout=termination_timeout,
- )
+ if not logs:
+ logs = self.read_pod_logs(
+ pod=pod,
+ container_name=container_name,
+ timestamps=True,
+ since_seconds=(
+ math.ceil((pendulum.now() - since_time).total_seconds()) if since_time else None
+ ),
+ follow=follow,
+ post_termination_timeout=post_termination_timeout,
+ )
for raw_line in logs:
line = raw_line.decode("utf-8", errors="backslashreplace")
line_timestamp, message = self.parse_log_line(line)
@@ -434,15 +439,19 @@ class PodManager(LoggingMixin):
pod.metadata.name,
exc_info=True,
)
- return last_captured_timestamp or since_time
+ return last_captured_timestamp or since_time, logs
# note: `read_pod_logs` follows the logs, so we shouldn't necessarily *need* to
# loop as we do here. But in a long-running process we might temporarily lose connectivity.
# So the looping logic is there to let us resume following the logs.
+ logs = None
last_log_time = since_time
while True:
- last_log_time = consume_logs(
- since_time=last_log_time, follow=follow, termination_timeout=post_termination_timeout
+ last_log_time, logs = consume_logs(
+ since_time=last_log_time,
+ follow=follow,
+ termination_timeout=post_termination_timeout,
+ logs=logs,
)
if not self.container_is_running(pod, container_name=container_name):
return PodLoggingStatus(running=False, last_log_time=last_log_time)
diff --git a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py
index dfe06d9a74..42e2a5c610 100644
--- a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py
+++ b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py
@@ -284,6 +284,31 @@ class TestPodManager:
self.pod_manager.fetch_container_logs(mock.MagicMock(), mock.MagicMock(), follow=True)
self.mock_progress_callback.assert_has_calls([mock.call(message), mock.call(no_ts_message)])
+
+ @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.container_is_running")
+ def test_fetch_container_logs_failures(self, mock_container_is_running):
+ last_timestamp_string = "2020-10-08T14:18:17.793417674Z"
+ messages = [
+ bytes("2020-10-08T14:16:17.793417674Z message", "utf-8"),
+ bytes("2020-10-08T14:17:17.793417674Z message", "utf-8"),
+ None,
+ bytes(f"{last_timestamp_string} message", "utf-8"),
+ ]
+ expected_call_count = len([message for message in messages if message is not None])
+
+ def consumer_iter():
+ while messages:
+ message = messages.pop(0)
+ if message is None:
+ raise BaseHTTPError("Boom")
+ yield message
+
+ with mock.patch.object(PodLogsConsumer, "__iter__") as mock_consumer_iter:
+ mock_consumer_iter.side_effect = consumer_iter
+ mock_container_is_running.side_effect = [True, True, False]
+ status = self.pod_manager.fetch_container_logs(mock.MagicMock(), mock.MagicMock(), follow=True)
+ assert status.last_log_time == cast(DateTime, pendulum.parse(last_timestamp_string))
+ assert self.mock_progress_callback.call_count == expected_call_count
+
def test_parse_invalid_log_line(self, caplog):
with caplog.at_level(logging.INFO):
self.pod_manager.parse_log_line("2020-10-08T14:16:17.793417674ZInvalidmessage\n")