Taragolis commented on code in PR #28336:
URL: https://github.com/apache/airflow/pull/28336#discussion_r1052084840
##########
airflow/providers/cncf/kubernetes/utils/pod_manager.py:
##########
@@ -91,6 +98,62 @@ def get_container_termination_message(pod: V1Pod,
container_name: str):
return container_status.state.terminated.message if container_status
else None
+class PodLogsConsumer:
+ """
+ PodLogsConsumer is responsible for pulling pod logs from a stream with
checking a container status before
+ reading data.
+ This class is a workaround for the issue
https://github.com/apache/airflow/issues/23497
+ """
+
+ def __init__(
+ self,
+ response: HTTPResponse,
+ pod: V1Pod,
+ pod_manager: PodManager,
+ container_name: str,
+ timeout: int = 120,
+ ):
+ self.response = response
+ self.pod = pod
+ self.pod_manager = pod_manager
+ self.container_name = container_name
+ self.timeout = timeout
+
+ def __iter__(self) -> Generator[bytes, None, None]:
+ messages: list[bytes] = []
+ if self.logs_available():
+ for chunk in self.response.stream(amt=None, decode_content=True):
+ if b"\n" in chunk:
+ chunks = chunk.split(b"\n")
+ yield b"".join(messages) + chunks[0] + b"\n"
+ for x in chunks[1:-1]:
+ yield x + b"\n"
+ if chunks[-1]:
+ messages = [chunks[-1]]
+ else:
+ messages = []
+ else:
+ messages.append(chunk)
+ if not self.logs_available():
+ break
+ if messages:
+ yield b"".join(messages)
+
+ def logs_available(self):
+ remote_pod = self.pod_manager.read_pod(self.pod)
+ if container_is_running(pod=remote_pod,
container_name=self.container_name):
+ return True
+ container_status = get_container_status(pod=remote_pod,
container_name=self.container_name)
+ state = container_status.state if container_status else None
+ terminated = state.terminated if state else None
+ if terminated:
+ termination_time = terminated.finished_at
+ if termination_time:
+ now = datetime.now(tz=timezone.utc)
+ return termination_time + timedelta(seconds=self.timeout) > now
Review Comment:
The general approach for getting the current datetime in the UTC timezone in
Airflow is to use this helper:
https://github.com/apache/airflow/blob/e377e869da9f0e42ac1e0a615347cf7cd6565d54/airflow/utils/timezone.py#L55-L67
##########
tests/providers/cncf/kubernetes/utils/test_pod_manager.py:
##########
@@ -403,3 +421,77 @@ def test_container_is_running(remote_pod, result):
an object `e` such that `e.status.container_statuses` is None, and so on.
This test
verifies the expected behavior."""
assert container_is_running(remote_pod, "base") is result
+
+
+class TestPodLogsConsumer:
+ @pytest.mark.parametrize(
+ "chunks, expected_logs",
+ [
+ ([b"message"], [b"message"]),
+ ([b"message1\nmessage2"], [b"message1\n", b"message2"]),
+ ([b"message1\n", b"message2"], [b"message1\n", b"message2"]),
+ ([b"first_part", b"_second_part"], [b"first_part_second_part"]),
+ ([b""], [b""]),
+ ],
+ )
+ def test_chunks(self, chunks, expected_logs):
+ with mock.patch.object(PodLogsConsumer, "logs_available") as
logs_available:
+ logs_available.return_value = True
+ consumer = PodLogsConsumer(
+
response=mock.MagicMock(stream=mock.MagicMock(return_value=chunks)),
+ pod=mock.MagicMock(),
+
pod_manager=mock.MagicMock(container_is_running=mock.MagicMock(return_value=True)),
+ container_name="base",
+ )
+ assert list(consumer) == expected_logs
+
+ def test_container_is_not_running(self):
+ with mock.patch.object(PodLogsConsumer, "logs_available") as
logs_available:
+ logs_available.return_value = False
+ consumer = PodLogsConsumer(
+
response=mock.MagicMock(stream=mock.MagicMock(return_value=[b"message1",
b"message2"])),
+ pod=mock.MagicMock(),
+
pod_manager=mock.MagicMock(container_is_running=mock.MagicMock(return_value=False)),
+ container_name="base",
+ )
+ assert list(consumer) == []
+
+ @pytest.mark.parametrize(
+ "container_run, termination_time, now_time, timeout,
expected_logs_available",
+ [
+ (False, datetime(2022, 1, 1, 0, 0, 0, 0), datetime(2022, 1, 1, 0,
1, 0, 0), 120, True),
+ (False, datetime(2022, 1, 1, 0, 0, 0, 0), datetime(2022, 1, 1, 0,
2, 0, 0), 120, False),
+ (False, datetime(2022, 1, 1, 0, 0, 0, 0), datetime(2022, 1, 1, 0,
5, 0, 0), 120, False),
+ (True, datetime(2022, 1, 1, 0, 0, 0, 0), datetime(2022, 1, 1, 0,
1, 0, 0), 120, True),
+ (True, datetime(2022, 1, 1, 0, 0, 0, 0), datetime(2022, 1, 1, 0,
2, 0, 0), 120, True),
+ (True, datetime(2022, 1, 1, 0, 0, 0, 0), datetime(2022, 1, 1, 0,
5, 0, 0), 120, True),
+ ],
+ )
+
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.container_is_running")
+
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.get_container_status")
+ def test_logs_available(
+ self,
+ mock_get_container_status,
+ mock_container_is_running,
+ container_run,
+ termination_time,
+ now_time,
+ timeout,
+ expected_logs_available,
+ ):
+ mock_container_is_running.return_value = container_run
+ mock_get_container_status.return_value = mock.MagicMock(
+
state=mock.MagicMock(terminated=mock.MagicMock(finished_at=termination_time))
+ )
+ with mock.patch(
+ "airflow.providers.cncf.kubernetes.utils.pod_manager.datetime",
+ mock.MagicMock(now=mock.MagicMock(return_value=now_time),
fromtimestamp=datetime.fromtimestamp),
+ ):
Review Comment:
In general we use
[`time-machine`](https://github.com/adamchainz/time-machine) (previously
`freezegun`) to mock everything related to the current datetime, rather than
mocking it manually.
There are certainly some existing tests that mock datetime/time directly;
however, it is better to use time-machine (as a context manager or fixture).
In addition, I'm not familiar with k8s objects, and I do not know whether
[`V1ContainerStateTerminated.finished_at`](https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/V1ContainerStateTerminated.md)
returns a timezone-naive or a timezone-aware datetime.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]