Repository: incubator-airflow
Updated Branches:
  refs/heads/master 8482b208b -> fff87b5cf


[AIRFLOW-2397] Support affinity policies for Kubernetes executor/operator

KubernetesPodOperator now accept a dict type
parameter called "affinity", which represents a
group of affinity scheduling rules (nodeAffinity,
podAffinity, podAntiAffinity).

API reference: https://kubernetes.io/docs/referenc
e/generated/kubernetes-api/v1.10/#affinity-v1-core

Closes #3369 from imroc/AIRFLOW-2397


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/fff87b5c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/fff87b5c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/fff87b5c

Branch: refs/heads/master
Commit: fff87b5cfdfac904c9ddd8ca84e0aa192379080f
Parents: 8482b20
Author: roc <[email protected]>
Authored: Sat May 19 00:47:53 2018 +0200
Committer: Fokko Driesprong <[email protected]>
Committed: Sat May 19 00:47:53 2018 +0200

----------------------------------------------------------------------
 .../kubernetes_request_factory.py               |  6 +++
 .../pod_request_factory.py                      |  1 +
 airflow/contrib/kubernetes/pod.py               |  6 ++-
 .../operators/kubernetes_pod_operator.py        |  5 ++
 docs/kubernetes.rst                             | 53 +++++++++++++++++++-
 5 files changed, 69 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fff87b5c/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
----------------------------------------------------------------------
diff --git 
a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
 
b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
index 12d05ec..7133125 100644
--- 
a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
+++ 
b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
@@ -70,6 +70,12 @@ class KubernetesRequestFactory:
             req['metadata']['annotations'][k] = v
 
     @staticmethod
+    def extract_affinity(pod, req):
+        req['spec']['affinity'] = req['spec'].get('affinity', {})
+        for k, v in six.iteritems(pod.affinity):
+            req['spec']['affinity'][k] = v
+
+    @staticmethod
     def extract_cmds(pod, req):
         req['spec']['containers'][0]['command'] = pod.cmds
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fff87b5c/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
----------------------------------------------------------------------
diff --git 
a/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py 
b/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
index 0f06d49..b41ee8a 100644
--- 
a/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
+++ 
b/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
@@ -58,4 +58,5 @@ spec:
         self.extract_init_containers(pod, req)
         self.extract_image_pull_secrets(pod, req)
         self.extract_annotations(pod, req)
+        self.extract_affinity(pod, req)
         return req

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fff87b5c/airflow/contrib/kubernetes/pod.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/pod.py 
b/airflow/contrib/kubernetes/pod.py
index e740bae..c422214 100644
--- a/airflow/contrib/kubernetes/pod.py
+++ b/airflow/contrib/kubernetes/pod.py
@@ -54,6 +54,8 @@ class Pod:
     :type result: any
     :param image_pull_policy: Specify a policy to cache or always pull an image
     :type image_pull_policy: str
+    :param affinity: A dict containing a group of affinity scheduling rules
+    :type affinity: dict
     """
     def __init__(
             self,
@@ -74,7 +76,8 @@ class Pod:
             init_containers=None,
             service_account_name=None,
             resources=None,
-            annotations=None
+            annotations=None,
+            affinity=None
     ):
         self.image = image
         self.envs = envs or {}
@@ -94,3 +97,4 @@ class Pod:
         self.service_account_name = service_account_name
         self.resources = resources or Resources()
         self.annotations = annotations or {}
+        self.affinity = affinity or {}

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fff87b5c/airflow/contrib/operators/kubernetes_pod_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/kubernetes_pod_operator.py 
b/airflow/contrib/operators/kubernetes_pod_operator.py
index ffdc2cf..31ffd92 100644
--- a/airflow/contrib/operators/kubernetes_pod_operator.py
+++ b/airflow/contrib/operators/kubernetes_pod_operator.py
@@ -64,6 +64,8 @@ class KubernetesPodOperator(BaseOperator):
     :type in_cluster: bool
     :param get_logs: get the stdout of the container as logs of the tasks
     :type get_logs: bool
+    :param affinity: A dict containing a group of affinity scheduling rules
+    :type affinity: dict
     """
     template_fields = ('cmds', 'arguments', 'env_vars')
 
@@ -91,6 +93,7 @@ class KubernetesPodOperator(BaseOperator):
             pod.image_pull_policy = self.image_pull_policy
             pod.annotations = self.annotations
             pod.resources = self.resources
+            pod.affinity = self.affinity
 
             launcher = pod_launcher.PodLauncher(client)
             final_state = launcher.run_pod(
@@ -122,6 +125,7 @@ class KubernetesPodOperator(BaseOperator):
                  image_pull_policy='IfNotPresent',
                  annotations=None,
                  resources=None,
+                 affinity=None,
                  *args,
                  **kwargs):
         super(KubernetesPodOperator, self).__init__(*args, **kwargs)
@@ -140,4 +144,5 @@ class KubernetesPodOperator(BaseOperator):
         self.get_logs = get_logs
         self.image_pull_policy = image_pull_policy
         self.annotations = annotations or {}
+        self.affinity = affinity or {}
         self.resources = resources or Resources()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fff87b5c/docs/kubernetes.rst
----------------------------------------------------------------------
diff --git a/docs/kubernetes.rst b/docs/kubernetes.rst
index 9358250..a491685 100644
--- a/docs/kubernetes.rst
+++ b/docs/kubernetes.rst
@@ -31,6 +31,56 @@ Kubernetes Operator
           }
         }
     volume = Volume(name='test-volume', configs=volume_config)
+
+    affinity = {
+        'nodeAffinity': {
+          'preferredDuringSchedulingIgnoredDuringExecution': [
+            {
+              "weight": 1,
+              "preference": {
+                "matchExpressions": [
+                  "key": "disktype",
+                  "operator": "In",
+                  "values": ["ssd"]
+                ]
+              }
+            }
+          ]
+        },
+        "podAffinity": {
+          "requiredDuringSchedulingIgnoredDuringExecution": [
+            {
+              "labelSelector": {
+                "matchExpressions": [
+                  {
+                    "key": "security",
+                    "operator": "In",
+                    "values": ["S1"]
+                  }
+                ]
+              },
+              "topologyKey": "failure-domain.beta.kubernetes.io/zone"
+            }
+          ]
+        },
+        "podAntiAffinity": {
+          "requiredDuringSchedulingIgnoredDuringExecution": [
+            {
+              "labelSelector": {
+                "matchExpressions": [
+                  {
+                    "key": "security",
+                    "operator": "In",
+                    "values": ["S2"]
+                  }
+                ]
+              },
+              "topologyKey": "kubernetes.io/hostname"
+            }
+          ]
+        }
+    }
+
     k = KubernetesPodOperator(namespace='default',
                               image="ubuntu:16.04",
                               cmds=["bash", "-cx"],
@@ -40,7 +90,8 @@ Kubernetes Operator
                               volume=[volume],
                               volume_mounts=[volume_mount]
                               name="test",
-                              task_id="task"
+                              task_id="task",
+                              affinity=affinity
                               )
 
 

Reply via email to