This is an automated email from the ASF dual-hosted git repository.
uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 45c70f3 Add support of placement in the DockerSwarmOperator (#18990)
45c70f3 is described below
commit 45c70f397afc54a931bf40ceb843c7b9a9cd75e3
Author: AmineB <[email protected]>
AuthorDate: Fri Oct 29 08:03:57 2021 +0200
Add support of placement in the DockerSwarmOperator (#18990)
---
airflow/providers/docker/operators/docker_swarm.py | 6 ++++++
tests/providers/docker/operators/test_docker_swarm.py | 2 ++
2 files changed, 8 insertions(+)
diff --git a/airflow/providers/docker/operators/docker_swarm.py b/airflow/providers/docker/operators/docker_swarm.py
index a1f3f0b..9a50679 100644
--- a/airflow/providers/docker/operators/docker_swarm.py
+++ b/airflow/providers/docker/operators/docker_swarm.py
@@ -105,6 +105,9 @@ class DockerSwarmOperator(DockerOperator):
:type mode: docker.types.ServiceMode
:param networks: List of network names or IDs or NetworkAttachmentConfig
to attach the service to.
:type networks: List[Union[str, NetworkAttachmentConfig]]
+ :param placement: Placement instructions for the scheduler. If a list is passed instead,
+ it is assumed to be a list of constraints as part of a Placement object.
+ :type placement: Union[types.Placement, List[types.Placement]]
"""
def __init__(
@@ -116,6 +119,7 @@ class DockerSwarmOperator(DockerOperator):
secrets: Optional[List[types.SecretReference]] = None,
mode: Optional[types.ServiceMode] = None,
networks: Optional[List[Union[str, types.NetworkAttachmentConfig]]] = None,
+ placement: Optional[Union[types.Placement, List[types.Placement]]] = None,
**kwargs,
) -> None:
super().__init__(image=image, **kwargs)
@@ -126,6 +130,7 @@ class DockerSwarmOperator(DockerOperator):
self.secrets = secrets
self.mode = mode
self.networks = networks
+ self.placement = placement
def execute(self, context) -> None:
self.cli = self._get_cli()
@@ -153,6 +158,7 @@ class DockerSwarmOperator(DockerOperator):
restart_policy=types.RestartPolicy(condition='none'),
resources=types.Resources(mem_limit=self.mem_limit),
networks=self.networks,
+ placement=self.placement,
),
name=f'airflow-{get_random_string()}',
labels={'name': f'airflow__{self.dag_id}__{self.task_id}'},
diff --git a/tests/providers/docker/operators/test_docker_swarm.py b/tests/providers/docker/operators/test_docker_swarm.py
index 09207b4..d2c0c9c 100644
--- a/tests/providers/docker/operators/test_docker_swarm.py
+++ b/tests/providers/docker/operators/test_docker_swarm.py
@@ -71,6 +71,7 @@ class TestDockerSwarmOperator(unittest.TestCase):
secrets=[types.SecretReference(secret_id="dummy_secret_id",
secret_name="dummy_secret_name")],
mode=types.ServiceMode(mode="replicated", replicas=3),
networks=["dummy_network"],
+ placement=types.Placement(constraints=["node.labels.region==east"]),
)
operator.execute(None)
@@ -79,6 +80,7 @@ class TestDockerSwarmOperator(unittest.TestCase):
restart_policy=mock_obj,
resources=mock_obj,
networks=["dummy_network"],
+ placement=types.Placement(constraints=["node.labels.region==east"]),
)
types_mock.ContainerSpec.assert_called_once_with(
image='ubuntu:latest',