This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 74bc8728fb5 Fix logs with leading spaces in the Docker operator
(#33692) (#43840)
74bc8728fb5 is described below
commit 74bc8728fb58f73d65b9283e33577f234e7b8bb3
Author: Mark Andreev <[email protected]>
AuthorDate: Sat Nov 9 14:29:10 2024 +0000
Fix logs with leading spaces in the Docker operator (#33692) (#43840)
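Python 3.11’s fine-grained error locations (the `^^^^` caret markers printed
under the failing expression in a traceback) don’t display correctly in
Airflow’s DockerOperator logs: the operator stripped leading whitespace from
each log chunk, so the carets lost their alignment and the error messages
became hard to read.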
Before fix:
return self.main(*args, **kwargs)
^^^^^^^^^^^^^^^^
After fix:
return self.main(*args, **kwargs)
       ^^^^^^^^^^^^^^^^
Fixes: #33692
---
.../airflow/providers/docker/operators/docker.py | 21 +++++++++-----
providers/tests/docker/operators/test_docker.py | 32 +++++++++++++++++++++-
2 files changed, 45 insertions(+), 8 deletions(-)
diff --git a/providers/src/airflow/providers/docker/operators/docker.py
b/providers/src/airflow/providers/docker/operators/docker.py
index f9d3ea472ff..01d07ffd981 100644
--- a/providers/src/airflow/providers/docker/operators/docker.py
+++ b/providers/src/airflow/providers/docker/operators/docker.py
@@ -47,6 +47,8 @@ from airflow.providers.docker.hooks.docker import DockerHook
from airflow.utils.types import NOTSET, ArgNotSet
if TYPE_CHECKING:
+ from logging import Logger
+
from docker import APIClient
from docker.types import DeviceRequest
@@ -62,6 +64,16 @@ def stringify(line: str | bytes):
return line
+def fetch_logs(log_stream, log: Logger):
+ log_lines = []
+ for log_chunk in log_stream:
+ log_chunk = stringify(log_chunk).rstrip()
+ log_lines.append(log_chunk)
+ for log_chunk_line in log_chunk.split("\n"):
+ log.info("%s", log_chunk_line)
+ return log_lines
+
+
class DockerOperator(BaseOperator):
"""
Execute a command inside a docker container.
@@ -426,16 +438,11 @@ class DockerOperator(BaseOperator):
tty=self.tty,
hostname=self.hostname,
)
- logstream = self.cli.attach(container=self.container["Id"],
stdout=True, stderr=True, stream=True)
+ log_stream = self.cli.attach(container=self.container["Id"],
stdout=True, stderr=True, stream=True)
try:
self.cli.start(self.container["Id"])
- log_lines = []
- for log_chunk in logstream:
- log_chunk = stringify(log_chunk).strip()
- log_lines.append(log_chunk)
- for log_chunk_line in log_chunk.split("\n"):
- self.log.info("%s", log_chunk_line)
+ log_lines = fetch_logs(log_stream, self.log)
result = self.cli.wait(self.container["Id"])
if result["StatusCode"] in self.skip_on_exit_code:
diff --git a/providers/tests/docker/operators/test_docker.py
b/providers/tests/docker/operators/test_docker.py
index 9dbc84657f4..8919fc0962d 100644
--- a/providers/tests/docker/operators/test_docker.py
+++ b/providers/tests/docker/operators/test_docker.py
@@ -28,7 +28,7 @@ from docker.types import DeviceRequest, LogConfig, Mount,
Ulimit
from airflow.exceptions import AirflowException,
AirflowProviderDeprecationWarning, AirflowSkipException
from airflow.providers.docker.exceptions import DockerContainerFailedException
-from airflow.providers.docker.operators.docker import DockerOperator
+from airflow.providers.docker.operators.docker import DockerOperator,
fetch_logs
from airflow.utils.task_instance_session import
set_current_task_instance_session
TEST_CONN_ID = "docker_test_connection"
@@ -865,3 +865,33 @@ class TestDockerOperator:
pytest.raises(ValueError, match="Conflicting
`skip_on_exit_code` provided"),
):
ti.render_templates()
+
+ @pytest.mark.parametrize(
+ "log_lines, expected_lines",
+ [
+ pytest.param(
+ [
+ "return self.main(*args, **kwargs)",
+ " ^^^^^^^^^^^^^^^^",
+ ],
+ [
+ "return self.main(*args, **kwargs)",
+ " ^^^^^^^^^^^^^^^^",
+ ],
+ id="should-not-remove-leading-spaces",
+ ),
+ pytest.param(
+ [
+ " ^^^^^^^^^^^^^^^^ ",
+ ],
+ [
+ " ^^^^^^^^^^^^^^^^",
+ ],
+ id="should-remove-trailing-spaces",
+ ),
+ ],
+ )
+ @mock.patch("logging.Logger")
+ def test_fetch_logs(self, logger_mock, log_lines, expected_lines):
+ fetch_logs(log_lines, logger_mock)
+ assert logger_mock.info.call_args_list == [call("%s", line) for line
in expected_lines]