This is an automated email from the ASF dual-hosted git repository.

uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 45c70f3  Add support of placement in the DockerSwarmOperator (#18990)
45c70f3 is described below

commit 45c70f397afc54a931bf40ceb843c7b9a9cd75e3
Author: AmineB <[email protected]>
AuthorDate: Fri Oct 29 08:03:57 2021 +0200

    Add support of placement in the DockerSwarmOperator (#18990)
---
 airflow/providers/docker/operators/docker_swarm.py    | 6 ++++++
 tests/providers/docker/operators/test_docker_swarm.py | 2 ++
 2 files changed, 8 insertions(+)

diff --git a/airflow/providers/docker/operators/docker_swarm.py b/airflow/providers/docker/operators/docker_swarm.py
index a1f3f0b..9a50679 100644
--- a/airflow/providers/docker/operators/docker_swarm.py
+++ b/airflow/providers/docker/operators/docker_swarm.py
@@ -105,6 +105,9 @@ class DockerSwarmOperator(DockerOperator):
     :type mode: docker.types.ServiceMode
     :param networks: List of network names or IDs or NetworkAttachmentConfig to attach the service to.
     :type networks: List[Union[str, NetworkAttachmentConfig]]
+    :param placement: Placement instructions for the scheduler. If a list is passed instead,
+        it is assumed to be a list of constraints as part of a Placement object.
+    :type placement: Union[types.Placement, List[types.Placement]]
     """
 
     def __init__(
@@ -116,6 +119,7 @@ class DockerSwarmOperator(DockerOperator):
         secrets: Optional[List[types.SecretReference]] = None,
         mode: Optional[types.ServiceMode] = None,
         networks: Optional[List[Union[str, types.NetworkAttachmentConfig]]] = None,
+        placement: Optional[Union[types.Placement, List[types.Placement]]] = None,
         **kwargs,
     ) -> None:
         super().__init__(image=image, **kwargs)
@@ -126,6 +130,7 @@ class DockerSwarmOperator(DockerOperator):
         self.secrets = secrets
         self.mode = mode
         self.networks = networks
+        self.placement = placement
 
     def execute(self, context) -> None:
         self.cli = self._get_cli()
@@ -153,6 +158,7 @@ class DockerSwarmOperator(DockerOperator):
                 restart_policy=types.RestartPolicy(condition='none'),
                 resources=types.Resources(mem_limit=self.mem_limit),
                 networks=self.networks,
+                placement=self.placement,
             ),
             name=f'airflow-{get_random_string()}',
             labels={'name': f'airflow__{self.dag_id}__{self.task_id}'},
diff --git a/tests/providers/docker/operators/test_docker_swarm.py b/tests/providers/docker/operators/test_docker_swarm.py
index 09207b4..d2c0c9c 100644
--- a/tests/providers/docker/operators/test_docker_swarm.py
+++ b/tests/providers/docker/operators/test_docker_swarm.py
@@ -71,6 +71,7 @@ class TestDockerSwarmOperator(unittest.TestCase):
             secrets=[types.SecretReference(secret_id="dummy_secret_id", secret_name="dummy_secret_name")],
             mode=types.ServiceMode(mode="replicated", replicas=3),
             networks=["dummy_network"],
+            placement=types.Placement(constraints=["node.labels.region==east"]),
         )
         operator.execute(None)
 
@@ -79,6 +80,7 @@ class TestDockerSwarmOperator(unittest.TestCase):
             restart_policy=mock_obj,
             resources=mock_obj,
             networks=["dummy_network"],
+            placement=types.Placement(constraints=["node.labels.region==east"]),
         )
         types_mock.ContainerSpec.assert_called_once_with(
             image='ubuntu:latest',

Reply via email to