This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 74bc8728fb5 Fix logs with leading spaces in the Docker operator (#33692) (#43840)
74bc8728fb5 is described below

commit 74bc8728fb58f73d65b9283e33577f234e7b8bb3
Author: Mark Andreev <[email protected]>
AuthorDate: Sat Nov 9 14:29:10 2024 +0000

    Fix logs with leading spaces in the Docker operator (#33692) (#43840)
    
    Python 3.11’s multi-line error arrows don’t display correctly in Airflow’s DockerOperator logs due to leading spaces being removed, making error messages hard to read.
    
    Before fix:
    return self.main(*args, **kwargs)
    ^^^^^^^^^^^^^^^^
    
    After fix:
    return self.main(*args, **kwargs)
           ^^^^^^^^^^^^^^^^
    
    Fixes: #33692
---
 .../airflow/providers/docker/operators/docker.py   | 21 +++++++++-----
 providers/tests/docker/operators/test_docker.py    | 32 +++++++++++++++++++++-
 2 files changed, 45 insertions(+), 8 deletions(-)

diff --git a/providers/src/airflow/providers/docker/operators/docker.py b/providers/src/airflow/providers/docker/operators/docker.py
index f9d3ea472ff..01d07ffd981 100644
--- a/providers/src/airflow/providers/docker/operators/docker.py
+++ b/providers/src/airflow/providers/docker/operators/docker.py
@@ -47,6 +47,8 @@ from airflow.providers.docker.hooks.docker import DockerHook
 from airflow.utils.types import NOTSET, ArgNotSet
 
 if TYPE_CHECKING:
+    from logging import Logger
+
     from docker import APIClient
     from docker.types import DeviceRequest
 
@@ -62,6 +64,16 @@ def stringify(line: str | bytes):
         return line
 
 
+def fetch_logs(log_stream, log: Logger):
+    log_lines = []
+    for log_chunk in log_stream:
+        log_chunk = stringify(log_chunk).rstrip()
+        log_lines.append(log_chunk)
+        for log_chunk_line in log_chunk.split("\n"):
+            log.info("%s", log_chunk_line)
+    return log_lines
+
+
 class DockerOperator(BaseOperator):
     """
     Execute a command inside a docker container.
@@ -426,16 +438,11 @@ class DockerOperator(BaseOperator):
             tty=self.tty,
             hostname=self.hostname,
         )
-        logstream = self.cli.attach(container=self.container["Id"], stdout=True, stderr=True, stream=True)
+        log_stream = self.cli.attach(container=self.container["Id"], stdout=True, stderr=True, stream=True)
         try:
             self.cli.start(self.container["Id"])
 
-            log_lines = []
-            for log_chunk in logstream:
-                log_chunk = stringify(log_chunk).strip()
-                log_lines.append(log_chunk)
-                for log_chunk_line in log_chunk.split("\n"):
-                    self.log.info("%s", log_chunk_line)
+            log_lines = fetch_logs(log_stream, self.log)
 
             result = self.cli.wait(self.container["Id"])
             if result["StatusCode"] in self.skip_on_exit_code:
diff --git a/providers/tests/docker/operators/test_docker.py b/providers/tests/docker/operators/test_docker.py
index 9dbc84657f4..8919fc0962d 100644
--- a/providers/tests/docker/operators/test_docker.py
+++ b/providers/tests/docker/operators/test_docker.py
@@ -28,7 +28,7 @@ from docker.types import DeviceRequest, LogConfig, Mount, Ulimit
 
 from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning, AirflowSkipException
 from airflow.providers.docker.exceptions import DockerContainerFailedException
-from airflow.providers.docker.operators.docker import DockerOperator
+from airflow.providers.docker.operators.docker import DockerOperator, fetch_logs
 from airflow.utils.task_instance_session import set_current_task_instance_session
 
 TEST_CONN_ID = "docker_test_connection"
@@ -865,3 +865,33 @@ class TestDockerOperator:
                     pytest.raises(ValueError, match="Conflicting `skip_on_exit_code` provided"),
                 ):
                     ti.render_templates()
+
+    @pytest.mark.parametrize(
+        "log_lines, expected_lines",
+        [
+            pytest.param(
+                [
+                    "return self.main(*args, **kwargs)",
+                    "                 ^^^^^^^^^^^^^^^^",
+                ],
+                [
+                    "return self.main(*args, **kwargs)",
+                    "                 ^^^^^^^^^^^^^^^^",
+                ],
+                id="should-not-remove-leading-spaces",
+            ),
+            pytest.param(
+                [
+                    "   ^^^^^^^^^^^^^^^^   ",
+                ],
+                [
+                    "   ^^^^^^^^^^^^^^^^",
+                ],
+                id="should-remove-trailing-spaces",
+            ),
+        ],
+    )
+    @mock.patch("logging.Logger")
+    def test_fetch_logs(self, logger_mock, log_lines, expected_lines):
+        fetch_logs(log_lines, logger_mock)
+        assert logger_mock.info.call_args_list == [call("%s", line) for line in expected_lines]

Reply via email to