This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 882108862d Fix `enable_logging=True` not working in 
`DockerSwarmOperator` (#35677)
882108862d is described below

commit 882108862dcaf08e7f5da519b3d186048d4ec7f9
Author: stavdav <[email protected]>
AuthorDate: Wed Dec 6 23:07:43 2023 +0100

    Fix `enable_logging=True` not working in `DockerSwarmOperator` (#35677)
    
    * Fixes #28452: "TaskInstances do not succeed when using 
enable_logging=True option in DockerSwarmOperator"
    It introduces logging of Docker Swarm services which was previously not
    working.
    
    * tty=True/False to be chosen by the user as was the case before this 
fix (#28452)
    
    * Follow=true for logs will always result in tasks not ending. This is  
standard and provided upstream by the Docker API.
    Therefore in DockerSwarmOperator follow is always false.
    
    * service_logs called multiple times as we continuously poll the Docker API 
for logs. As we indicated in the previous commit,
    the docker client malfunctions when we try to get the logs with 
follow=True. Therefore we make multiple calls to the API (every 2 seconds), to 
fetch the new logs.
    
    * service_logs called multiple times. In this test the tasks increase (6 
instead of 5) as we check if the service has terminated (+1). As this assertion 
makes less sense in a situation where we do multiple calls to the Docker API 
(polling), we might think of removing it or replacing it with something more 
suitable.
    
    * Final commit of this PR marking the test case that validates logging in 
the Docker Swarm Operator. We log two times a different message and we assert 
that the two lines are given back in the logs in the expected sequence.
    
    * Formatting ruff
    
    * Reverting as GitHub Actions don't run this test as a swarm node:
    
    docker.errors.APIError: 503 Server Error for 
http+docker://localhost/v1.43/services/create: Service Unavailable ("This node 
is not a swarm manager. Use "docker swarm init" or "docker swarm join" to 
connect this node to swarm and try again.")
    
    Revert "Final commit of this PR marking the test case that validates 
logging in the Docker Swarm Operator. We log two times a different message and 
we assert that the two lines are given back in the logs in the expected 
sequence."
    
    This reverts commit 048ba1ece25fcd02c5ccc752d63759c76b098e44.
    
    * Logging "since" timestamp to avoid memory issues
    Fix for #28452
    
    * Formatting - Fix for #28452
---
 airflow/providers/docker/operators/docker_swarm.py | 48 ++++++++++++++--------
 .../docker/operators/test_docker_swarm.py          |  8 ++--
 2 files changed, 35 insertions(+), 21 deletions(-)

diff --git a/airflow/providers/docker/operators/docker_swarm.py 
b/airflow/providers/docker/operators/docker_swarm.py
index 7a8a419921..5622d87437 100644
--- a/airflow/providers/docker/operators/docker_swarm.py
+++ b/airflow/providers/docker/operators/docker_swarm.py
@@ -17,6 +17,9 @@
 """Run ephemeral Docker Swarm services."""
 from __future__ import annotations
 
+import re
+from datetime import datetime
+from time import sleep
 from typing import TYPE_CHECKING
 
 from docker import types
@@ -179,23 +182,34 @@ class DockerSwarmOperator(DockerOperator):
     def _stream_logs_to_output(self) -> None:
         if not self.service:
             raise Exception("The 'service' should be initialized before!")
-        logs = self.cli.service_logs(
-            self.service["ID"], follow=True, stdout=True, stderr=True, 
is_tty=self.tty
-        )
-        line = ""
-        for log in logs:
-            try:
-                log = log.decode()
-            except UnicodeDecodeError:
-                continue
-            if log == "\n":
-                self.log.info(line)
-                line = ""
-            else:
-                line += log
-        # flush any remaining log stream
-        if line:
-            self.log.info(line)
+        last_line_logged, last_timestamp = "", 0
+
+        def stream_new_logs(last_line_logged, since=0):
+            logs = self.cli.service_logs(
+                self.service["ID"],
+                follow=False,
+                stdout=True,
+                stderr=True,
+                is_tty=self.tty,
+                since=since,
+                timestamps=True,
+            )
+            logs = b"".join(logs).decode().splitlines()
+            if last_line_logged in logs:
+                logs = logs[logs.index(last_line_logged) + 1 :]
+            for line in logs:
+                match = 
re.match(r"(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}.\d{6,}Z) (.*)", line)
+                timestamp, message = match.groups()
+                self.log.info(message)
+            # Floor nanoseconds to microseconds
+            last_timestamp = re.sub(r"(\.\d{6})\d+Z", r"\1Z", timestamp)
+            last_timestamp = datetime.strptime(last_timestamp, 
"%Y-%m-%dT%H:%M:%S.%fZ")
+            last_timestamp = last_timestamp.timestamp()
+            return last_line_logged, last_timestamp
+
+        while not self._has_service_terminated():
+            sleep(2)
+            last_line_logged, last_timestamp = 
stream_new_logs(last_line_logged, since=last_timestamp)
 
     def on_kill(self) -> None:
         if self.hook.client_created and self.service is not None:
diff --git a/tests/providers/docker/operators/test_docker_swarm.py 
b/tests/providers/docker/operators/test_docker_swarm.py
index e14a7aa8b9..7bc74749dc 100644
--- a/tests/providers/docker/operators/test_docker_swarm.py
+++ b/tests/providers/docker/operators/test_docker_swarm.py
@@ -40,7 +40,7 @@ class TestDockerSwarmOperator:
                 yield [{"Status": {"State": "complete"}}]
 
         def _client_service_logs_effect():
-            yield b"Testing is awesome."
+            yield b"2023-12-05T00:00:00.000000000Z Testing is awesome."
 
         client_mock = mock.Mock(spec=APIClient)
         client_mock.create_service.return_value = {"ID": "some_id"}
@@ -98,8 +98,8 @@ class TestDockerSwarmOperator:
             base_url="unix://var/run/docker.sock", tls=False, version="1.19", 
timeout=DEFAULT_TIMEOUT_SECONDS
         )
 
-        client_mock.service_logs.assert_called_once_with(
-            "some_id", follow=True, stdout=True, stderr=True, is_tty=True
+        client_mock.service_logs.assert_called_with(
+            "some_id", follow=False, stdout=True, stderr=True, is_tty=True, 
since=0, timestamps=True
         )
 
         csargs, cskwargs = client_mock.create_service.call_args_list[0]
@@ -108,7 +108,7 @@ class TestDockerSwarmOperator:
         assert cskwargs["labels"] == {"name": 
"airflow__adhoc_airflow__unittest"}
         assert cskwargs["name"].startswith("airflow-")
         assert cskwargs["mode"] == types.ServiceMode(mode="replicated", 
replicas=3)
-        assert client_mock.tasks.call_count == 5
+        assert client_mock.tasks.call_count == 6
         client_mock.remove_service.assert_called_once_with("some_id")
 
     @mock.patch("airflow.providers.docker.operators.docker_swarm.types")

Reply via email to