This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 19d6f54704 Add logging options to docker operator (#26653)
19d6f54704 is described below
commit 19d6f54704949d017b028e644bbcf45f5b53120b
Author: Nischith Shetty <[email protected]>
AuthorDate: Tue Sep 27 20:12:37 2022 +0530
Add logging options to docker operator (#26653)
---
airflow/providers/docker/operators/docker.py | 18 +++++++++++++++++-
tests/providers/docker/operators/test_docker.py | 8 +++++++-
2 files changed, 24 insertions(+), 2 deletions(-)
diff --git a/airflow/providers/docker/operators/docker.py
b/airflow/providers/docker/operators/docker.py
index 6456348188..cd97b99f4e 100644
--- a/airflow/providers/docker/operators/docker.py
+++ b/airflow/providers/docker/operators/docker.py
@@ -29,7 +29,7 @@ from typing import TYPE_CHECKING, Iterable, Sequence
from docker import APIClient, tls # type: ignore[attr-defined]
from docker.constants import DEFAULT_TIMEOUT_SECONDS # type:
ignore[attr-defined]
from docker.errors import APIError # type: ignore[attr-defined]
-from docker.types import DeviceRequest, Mount # type: ignore[attr-defined]
+from docker.types import DeviceRequest, LogConfig, Mount # type:
ignore[attr-defined]
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
@@ -140,6 +140,12 @@ class DockerOperator(BaseOperator):
output that is not posted to logs
:param retrieve_output_path: path for output file that will be retrieved
and passed to xcom
:param device_requests: Expose host resources such as GPUs to the
container.
+ :param log_opts_max_size: The maximum size of the log before it is rolled.
+ A positive integer plus a modifier representing the unit of measure (k, m, or g),
+ e.g. 10m or 1g. If unset, the option is not passed and Docker's default of -1 (unlimited) applies.
+ :param log_opts_max_file: The maximum number of log files that can be present.
+ If rolling the logs creates excess files, the oldest file is removed.
+ Only effective when max-size is also set. A positive integer given as a string (e.g. '5'). If unset, Docker's default of 1 applies.
"""
template_fields: Sequence[str] = ('image', 'command', 'environment',
'container_name')
@@ -188,6 +194,8 @@ class DockerOperator(BaseOperator):
retrieve_output_path: str | None = None,
timeout: int = DEFAULT_TIMEOUT_SECONDS,
device_requests: list[DeviceRequest] | None = None,
+ log_opts_max_size: str | None = None,
+ log_opts_max_file: str | None = None,
**kwargs,
) -> None:
super().__init__(**kwargs)
@@ -244,6 +252,8 @@ class DockerOperator(BaseOperator):
self.retrieve_output_path = retrieve_output_path
self.timeout = timeout
self.device_requests = device_requests
+ self.log_opts_max_size = log_opts_max_size
+ self.log_opts_max_file = log_opts_max_file
def get_hook(self) -> DockerHook:
"""
@@ -289,6 +299,11 @@ class DockerOperator(BaseOperator):
self.environment.pop('AIRFLOW_TMP_DIR', None)
if not self.cli:
raise Exception("The 'cli' should be initialized before!")
+ docker_log_config = {}
+ if self.log_opts_max_size is not None:
+ docker_log_config['max-size'] = self.log_opts_max_size
+ if self.log_opts_max_file is not None:
+ docker_log_config['max-file'] = self.log_opts_max_file
self.container = self.cli.create_container(
command=self.format_command(self.command),
name=self.container_name,
@@ -306,6 +321,7 @@ class DockerOperator(BaseOperator):
extra_hosts=self.extra_hosts,
privileged=self.privileged,
device_requests=self.device_requests,
+ log_config=LogConfig(config=docker_log_config),
),
image=self.image,
user=self.user,
diff --git a/tests/providers/docker/operators/test_docker.py
b/tests/providers/docker/operators/test_docker.py
index 04e9676412..72b544e321 100644
--- a/tests/providers/docker/operators/test_docker.py
+++ b/tests/providers/docker/operators/test_docker.py
@@ -30,7 +30,7 @@ from airflow.exceptions import AirflowException
try:
from docker import APIClient
- from docker.types import DeviceRequest, Mount
+ from docker.types import DeviceRequest, LogConfig, Mount
from airflow.providers.docker.hooks.docker import DockerHook
from airflow.providers.docker.operators.docker import DockerOperator
@@ -91,6 +91,8 @@ class TestDockerOperator(unittest.TestCase):
container_name='test_container',
tty=True,
device_requests=[DeviceRequest(count=-1, capabilities=[['gpu']])],
+ log_opts_max_file='5',
+ log_opts_max_size='10m',
)
operator.execute(None)
@@ -125,6 +127,7 @@ class TestDockerOperator(unittest.TestCase):
extra_hosts=None,
privileged=False,
device_requests=[DeviceRequest(count=-1, capabilities=[['gpu']])],
+ log_config=LogConfig(config={'max-size': '10m', 'max-file': '5'}),
)
self.tempdir_mock.assert_called_once_with(dir='/host/airflow',
prefix='airflowtmp')
self.client_mock.images.assert_called_once_with(name='ubuntu:latest')
@@ -188,6 +191,7 @@ class TestDockerOperator(unittest.TestCase):
extra_hosts=None,
privileged=False,
device_requests=None,
+ log_config=LogConfig(config={}),
)
self.tempdir_mock.assert_not_called()
self.client_mock.images.assert_called_once_with(name='ubuntu:latest')
@@ -276,6 +280,7 @@ class TestDockerOperator(unittest.TestCase):
extra_hosts=None,
privileged=False,
device_requests=None,
+ log_config=LogConfig(config={}),
),
call(
mounts=[
@@ -292,6 +297,7 @@ class TestDockerOperator(unittest.TestCase):
extra_hosts=None,
privileged=False,
device_requests=None,
+ log_config=LogConfig(config={}),
),
]
)