This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 882108862d Fix `enable_logging=True` not working in
`DockerSwarmOperator` (#35677)
882108862d is described below
commit 882108862dcaf08e7f5da519b3d186048d4ec7f9
Author: stavdav <[email protected]>
AuthorDate: Wed Dec 6 23:07:43 2023 +0100
Fix `enable_logging=True` not working in `DockerSwarmOperator` (#35677)
* Fixes #28452: "TaskInstances do not succeed when using
enable_logging=True option in DockerSwarmOperator"
It introduces logging of Docker Swarm services which was previously not
working.
* tty=True/False to be chosen by the user as was the case before this
fix (#28452)
* Follow=true for logs will always result in tasks not ending. This is
standard and provided upstream by the Docker API.
Therefore in DockerSwarmOperator follow is always false.
* service_logs called multiple times as we continuously poll the Docker API
for logs. As we indicated in the previous commit,
the docker client malfunctions when we try to get the logs with
follow=True. Therefore we make multiple calls to the API (every 2 seconds), to
fetch the new logs.
* service_logs called multiple times. In this test the tasks increase (6
instead of 5) as we check if the service has terminated (+1). As this assertion
makes less sense in a situation where we do multiple calls to the Docker API
(polling), we might think of removing it or replacing it with something more
suitable.
* Final commit of this PR marking the test case that validates logging in
the Docker Swarm Operator. We log two times a different message and we assert
that the two lines are given back in the logs in the expected sequence.
* Formatting ruff
* Reverting as GitHub Actions don't run this test as a swarm node:
docker.errors.APIError: 503 Server Error for
http+docker://localhost/v1.43/services/create: Service Unavailable ("This node
is not a swarm manager. Use "docker swarm init" or "docker swarm join" to
connect this node to swarm and try again.")
Revert "Final commit of this PR marking the test case that validates
logging in the Docker Swarm Operator. We log two times a different message and
we assert that the two lines are given back in the logs in the expected
sequence."
This reverts commit 048ba1ece25fcd02c5ccc752d63759c76b098e44.
* Logging "since" timestamp to avoid memory issues
Fix for #28452
* Formatting - Fix for #28452
---
airflow/providers/docker/operators/docker_swarm.py | 48 ++++++++++++++--------
.../docker/operators/test_docker_swarm.py | 8 ++--
2 files changed, 35 insertions(+), 21 deletions(-)
diff --git a/airflow/providers/docker/operators/docker_swarm.py
b/airflow/providers/docker/operators/docker_swarm.py
index 7a8a419921..5622d87437 100644
--- a/airflow/providers/docker/operators/docker_swarm.py
+++ b/airflow/providers/docker/operators/docker_swarm.py
@@ -17,6 +17,9 @@
"""Run ephemeral Docker Swarm services."""
from __future__ import annotations
+import re
+from datetime import datetime
+from time import sleep
from typing import TYPE_CHECKING
from docker import types
@@ -179,23 +182,34 @@ class DockerSwarmOperator(DockerOperator):
def _stream_logs_to_output(self) -> None:
if not self.service:
raise Exception("The 'service' should be initialized before!")
- logs = self.cli.service_logs(
- self.service["ID"], follow=True, stdout=True, stderr=True,
is_tty=self.tty
- )
- line = ""
- for log in logs:
- try:
- log = log.decode()
- except UnicodeDecodeError:
- continue
- if log == "\n":
- self.log.info(line)
- line = ""
- else:
- line += log
- # flush any remaining log stream
- if line:
- self.log.info(line)
+ last_line_logged, last_timestamp = "", 0
+
+ def stream_new_logs(last_line_logged, since=0):
+ logs = self.cli.service_logs(
+ self.service["ID"],
+ follow=False,
+ stdout=True,
+ stderr=True,
+ is_tty=self.tty,
+ since=since,
+ timestamps=True,
+ )
+ logs = b"".join(logs).decode().splitlines()
+ if last_line_logged in logs:
+ logs = logs[logs.index(last_line_logged) + 1 :]
+ for line in logs:
+ match =
re.match(r"(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}.\d{6,}Z) (.*)", line)
+ timestamp, message = match.groups()
+ self.log.info(message)
+ # Floor nanoseconds to microseconds
+ last_timestamp = re.sub(r"(\.\d{6})\d+Z", r"\1Z", timestamp)
+ last_timestamp = datetime.strptime(last_timestamp,
"%Y-%m-%dT%H:%M:%S.%fZ")
+ last_timestamp = last_timestamp.timestamp()
+ return last_line_logged, last_timestamp
+
+ while not self._has_service_terminated():
+ sleep(2)
+ last_line_logged, last_timestamp =
stream_new_logs(last_line_logged, since=last_timestamp)
def on_kill(self) -> None:
if self.hook.client_created and self.service is not None:
diff --git a/tests/providers/docker/operators/test_docker_swarm.py
b/tests/providers/docker/operators/test_docker_swarm.py
index e14a7aa8b9..7bc74749dc 100644
--- a/tests/providers/docker/operators/test_docker_swarm.py
+++ b/tests/providers/docker/operators/test_docker_swarm.py
@@ -40,7 +40,7 @@ class TestDockerSwarmOperator:
yield [{"Status": {"State": "complete"}}]
def _client_service_logs_effect():
- yield b"Testing is awesome."
+ yield b"2023-12-05T00:00:00.000000000Z Testing is awesome."
client_mock = mock.Mock(spec=APIClient)
client_mock.create_service.return_value = {"ID": "some_id"}
@@ -98,8 +98,8 @@ class TestDockerSwarmOperator:
base_url="unix://var/run/docker.sock", tls=False, version="1.19",
timeout=DEFAULT_TIMEOUT_SECONDS
)
- client_mock.service_logs.assert_called_once_with(
- "some_id", follow=True, stdout=True, stderr=True, is_tty=True
+ client_mock.service_logs.assert_called_with(
+ "some_id", follow=False, stdout=True, stderr=True, is_tty=True,
since=0, timestamps=True
)
csargs, cskwargs = client_mock.create_service.call_args_list[0]
@@ -108,7 +108,7 @@ class TestDockerSwarmOperator:
assert cskwargs["labels"] == {"name":
"airflow__adhoc_airflow__unittest"}
assert cskwargs["name"].startswith("airflow-")
assert cskwargs["mode"] == types.ServiceMode(mode="replicated",
replicas=3)
- assert client_mock.tasks.call_count == 5
+ assert client_mock.tasks.call_count == 6
client_mock.remove_service.assert_called_once_with("some_id")
@mock.patch("airflow.providers.docker.operators.docker_swarm.types")