This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 2bf989763f5 Feat: support docker operator arg `labels` (#46643)
2bf989763f5 is described below
commit 2bf989763f5a02b74f8074d3c3c5233e1e49bc50
Author: phi-friday <[email protected]>
AuthorDate: Thu Feb 13 05:11:18 2025 +0900
Feat: support docker operator arg `labels` (#46643)
* feat: add args labels
* fix: add arg labels stub
* test: add labels test
* fix: assert_called_once_with error
* fix: create_container.assert_* error
* fix: rm create_host_config labels
---
airflow/decorators/__init__.pyi | 3 +++
.../docker/src/airflow/providers/docker/operators/docker.py | 5 +++++
.../tests/provider_tests/docker/operators/test_docker.py | 13 +++++++++++++
3 files changed, 21 insertions(+)
diff --git a/airflow/decorators/__init__.pyi b/airflow/decorators/__init__.pyi
index ab8f636cb05..34f110983cb 100644
--- a/airflow/decorators/__init__.pyi
+++ b/airflow/decorators/__init__.pyi
@@ -404,6 +404,7 @@ class TaskDecoratorCollection:
skip_on_exit_code: int | Container[int] | None = None,
port_bindings: dict | None = None,
ulimits: list[dict] | None = None,
+ labels: dict[str, str] | list[str] | None = None,
**kwargs,
) -> TaskDecorator:
"""Create a decorator to convert the decorated callable to a Docker task.
@@ -508,6 +509,8 @@ class TaskDecoratorCollection:
Incompatible with ``"host"`` in ``network_mode``.
:param ulimits: List of ulimit options to set for the container. Each item should
be a :py:class:`docker.types.Ulimit` instance.
+ :param labels: A dictionary of name-value labels (e.g. ``{"label1": "value1", "label2": "value2"}``)
+ or a list of names of labels to set with empty values (e.g. ``["label1", "label2"]``)
"""
# [END decorator_signature]
def kubernetes(
diff --git a/providers/docker/src/airflow/providers/docker/operators/docker.py b/providers/docker/src/airflow/providers/docker/operators/docker.py
index a175b3ef4c9..dd9bd630bec 100644
--- a/providers/docker/src/airflow/providers/docker/operators/docker.py
+++ b/providers/docker/src/airflow/providers/docker/operators/docker.py
@@ -192,6 +192,8 @@ class DockerOperator(BaseOperator):
Incompatible with ``"host"`` in ``network_mode``.
:param ulimits: List of ulimit options to set for the container. Each item should
be a :py:class:`docker.types.Ulimit` instance.
+ :param labels: A dictionary of name-value labels (e.g. ``{"label1": "value1", "label2": "value2"}``)
+ or a list of names of labels to set with empty values (e.g. ``["label1", "label2"]``)
"""
# !!! Changes in DockerOperator's arguments should be also reflected in !!!
@@ -255,6 +257,7 @@ class DockerOperator(BaseOperator):
skip_on_exit_code: int | Container[int] | None = None,
port_bindings: dict | None = None,
ulimits: list[Ulimit] | None = None,
+ labels: dict[str, str] | list[str] | None = None,
**kwargs,
) -> None:
super().__init__(**kwargs)
@@ -301,6 +304,7 @@ class DockerOperator(BaseOperator):
self.cap_add = cap_add
self.extra_hosts = extra_hosts
self.ulimits = ulimits or []
+ self.labels = labels
self.container: dict = None # type: ignore[assignment]
self.retrieve_output = retrieve_output
@@ -407,6 +411,7 @@ class DockerOperator(BaseOperator):
working_dir=self.working_dir,
tty=self.tty,
hostname=self.hostname,
+ labels=self.labels,
)
log_stream = self.cli.attach(container=self.container["Id"], stdout=True, stderr=True, stream=True)
try:
diff --git a/providers/docker/tests/provider_tests/docker/operators/test_docker.py b/providers/docker/tests/provider_tests/docker/operators/test_docker.py
index 2e9b0019592..50ff7e4f553 100644
--- a/providers/docker/tests/provider_tests/docker/operators/test_docker.py
+++ b/providers/docker/tests/provider_tests/docker/operators/test_docker.py
@@ -226,6 +226,7 @@ class TestDockerOperator:
tty=True,
hostname=TEST_CONTAINER_HOSTNAME,
ports=[],
+ labels=None,
)
self.client_mock.create_host_config.assert_called_once_with(
mounts=[
@@ -299,6 +300,7 @@ class TestDockerOperator:
tty=True,
hostname=TEST_CONTAINER_HOSTNAME,
ports=[],
+ labels=None,
)
self.client_mock.create_host_config.assert_called_once_with(
mounts=[
@@ -392,6 +394,7 @@ class TestDockerOperator:
tty=True,
hostname=None,
ports=[],
+ labels=None,
),
call(
command="env",
@@ -405,6 +408,7 @@ class TestDockerOperator:
tty=True,
hostname=None,
ports=[],
+ labels=None,
),
]
)
@@ -508,6 +512,7 @@ class TestDockerOperator:
tty=True,
hostname=None,
ports=[],
+ labels=None,
)
stringio_mock.assert_called_once_with("UNIT=FILE\nPRIVATE=FILE\nVAR=VALUE")
self.dotenv_mock.assert_called_once_with(stream="UNIT=FILE\nPRIVATE=FILE\nVAR=VALUE")
@@ -781,3 +786,11 @@ class TestDockerOperator:
def test_fetch_logs(self, logger_mock, log_lines, expected_lines):
fetch_logs(log_lines, logger_mock)
assert logger_mock.info.call_args_list == [call("%s", line) for line in expected_lines]
+
+ @pytest.mark.parametrize("labels", ({"key": "value"}, ["key=value"]))
+ def test_labels(self, labels: dict[str, str] | list[str]):
+ operator = DockerOperator(task_id="test", image="test", labels=labels)
+ operator.execute(None)
+ self.client_mock.create_container.assert_called_once()
+ assert "labels" in self.client_mock.create_container.call_args.kwargs
+ assert labels == self.client_mock.create_container.call_args.kwargs["labels"]