Repository: incubator-airflow
Updated Branches:
  refs/heads/master 0ce068e08 -> adb648c94


[AIRFLOW-2662][AIRFLOW-2397] Add k8s node_selectors and affinity

Add the ability to set node selectors and affinity for worker pods launched
by the k8s executor, and node selectors for the KubernetesPodOperator

Closes #3535 from Cplo/affinity


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/adb648c9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/adb648c9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/adb648c9

Branch: refs/heads/master
Commit: adb648c9497d38789f43a2a941d7d887b6cee84e
Parents: 0ce068e
Author: pengchen <[email protected]>
Authored: Mon Jun 25 13:09:14 2018 +0200
Committer: Fokko Driesprong <[email protected]>
Committed: Mon Jun 25 13:09:16 2018 +0200

----------------------------------------------------------------------
 airflow/config_templates/default_airflow.cfg    |  5 ++
 .../contrib/executors/kubernetes_executor.py    | 19 ++++++--
 .../kubernetes_request_factory.py               |  5 +-
 airflow/contrib/kubernetes/pod.py               |  2 +-
 .../contrib/kubernetes/worker_configuration.py  |  5 +-
 .../operators/kubernetes_pod_operator.py        |  5 ++
 scripts/ci/kubernetes/kube/configmaps.yaml      |  5 ++
 .../minikube/test_kubernetes_pod_operator.py    | 48 ++++++++++++++++++++
 8 files changed, 85 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/adb648c9/airflow/config_templates/default_airflow.cfg
----------------------------------------------------------------------
diff --git a/airflow/config_templates/default_airflow.cfg 
b/airflow/config_templates/default_airflow.cfg
index cd139d3..fe99ece 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -611,6 +611,11 @@ gcp_service_account_keys =
 # It will raise an exception if called from a process not running in a kubernetes environment.
 in_cluster = True
 
+[kubernetes_node_selectors]
+# The Key-value pairs to be given to worker pods.
+# The worker pods will be scheduled to the nodes of the specified key-value pairs.
+# Should be supplied in the format: key = value
+
 [kubernetes_secrets]
 # The scheduler mounts the following secrets into your workers as they are launched by the
 # scheduler. You may define as many secrets as needed and the kubernetes launcher will parse the

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/adb648c9/airflow/contrib/executors/kubernetes_executor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/executors/kubernetes_executor.py 
b/airflow/contrib/executors/kubernetes_executor.py
index 65053bd..4ea52c4 100644
--- a/airflow/contrib/executors/kubernetes_executor.py
+++ b/airflow/contrib/executors/kubernetes_executor.py
@@ -39,7 +39,7 @@ from airflow.utils.log.logging_mixin import LoggingMixin
 class KubernetesExecutorConfig:
     def __init__(self, image=None, image_pull_policy=None, request_memory=None,
                  request_cpu=None, limit_memory=None, limit_cpu=None,
-                 gcp_service_account_key=None):
+                 gcp_service_account_key=None, node_selectors=None, 
affinity=None):
         self.image = image
         self.image_pull_policy = image_pull_policy
         self.request_memory = request_memory
@@ -47,13 +47,17 @@ class KubernetesExecutorConfig:
         self.limit_memory = limit_memory
         self.limit_cpu = limit_cpu
         self.gcp_service_account_key = gcp_service_account_key
+        self.node_selectors = node_selectors
+        self.affinity = affinity
 
     def __repr__(self):
         return "{}(image={}, image_pull_policy={}, request_memory={}, 
request_cpu={}, " \
-               "limit_memory={}, limit_cpu={}, gcp_service_account_key={})" \
+               "limit_memory={}, limit_cpu={}, gcp_service_account_key={}, " \
+               "node_selectors={}, affinity={})" \
             .format(KubernetesExecutorConfig.__name__, self.image, 
self.image_pull_policy,
                     self.request_memory, self.request_cpu, self.limit_memory,
-                    self.limit_cpu, self.gcp_service_account_key)
+                    self.limit_cpu, self.gcp_service_account_key, 
self.node_selectors,
+                    self.affinity)
 
     @staticmethod
     def from_dict(obj):
@@ -73,7 +77,9 @@ class KubernetesExecutorConfig:
             request_cpu=namespaced.get('request_cpu', None),
             limit_memory=namespaced.get('limit_memory', None),
             limit_cpu=namespaced.get('limit_cpu', None),
-            gcp_service_account_key=namespaced.get('gcp_service_account_key', 
None)
+            gcp_service_account_key=namespaced.get('gcp_service_account_key', 
None),
+            node_selectors=namespaced.get('node_selectors', None),
+            affinity=namespaced.get('affinity', None)
         )
 
     def as_dict(self):
@@ -84,7 +90,9 @@ class KubernetesExecutorConfig:
             'request_cpu': self.request_cpu,
             'limit_memory': self.limit_memory,
             'limit_cpu': self.limit_cpu,
-            'gcp_service_account_key': self.gcp_service_account_key
+            'gcp_service_account_key': self.gcp_service_account_key,
+            'node_selectors': self.node_selectors,
+            'affinity': self.affinity
         }
 
 
@@ -108,6 +116,7 @@ class KubeConfig:
         self.kube_image_pull_policy = configuration.get(
             self.kubernetes_section, "worker_container_image_pull_policy"
         )
+        self.kube_node_selectors = 
configuration_dict.get('kubernetes_node_selectors', {})
         self.delete_worker_pods = conf.getboolean(
             self.kubernetes_section, 'delete_worker_pods')
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/adb648c9/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
----------------------------------------------------------------------
diff --git 
a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
 
b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
index 7133125..27e0ebd 100644
--- 
a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
+++ 
b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
@@ -85,8 +85,9 @@ class KubernetesRequestFactory:
 
     @staticmethod
     def extract_node_selector(pod, req):
-        if len(pod.node_selectors) > 0:
-            req['spec']['nodeSelector'] = pod.node_selectors
+        req['spec']['nodeSelector'] = req['spec'].get('nodeSelector', {})
+        for k, v in six.iteritems(pod.node_selectors):
+            req['spec']['nodeSelector'][k] = v
 
     @staticmethod
     def attach_volumes(pod, req):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/adb648c9/airflow/contrib/kubernetes/pod.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/pod.py 
b/airflow/contrib/kubernetes/pod.py
index c422214..6fcf354 100644
--- a/airflow/contrib/kubernetes/pod.py
+++ b/airflow/contrib/kubernetes/pod.py
@@ -89,7 +89,7 @@ class Pod:
         self.name = name
         self.volumes = volumes or []
         self.volume_mounts = volume_mounts or []
-        self.node_selectors = node_selectors or []
+        self.node_selectors = node_selectors or {}
         self.namespace = namespace
         self.image_pull_policy = image_pull_policy
         self.image_pull_secrets = image_pull_secrets

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/adb648c9/airflow/contrib/kubernetes/worker_configuration.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/worker_configuration.py 
b/airflow/contrib/kubernetes/worker_configuration.py
index 784bb77..059b352 100644
--- a/airflow/contrib/kubernetes/worker_configuration.py
+++ b/airflow/contrib/kubernetes/worker_configuration.py
@@ -215,5 +215,8 @@ class WorkerConfiguration(LoggingMixin):
             volumes=volumes,
             volume_mounts=volume_mounts,
             resources=resources,
-            annotations=annotations
+            annotations=annotations,
+            node_selectors=(kube_executor_config.node_selectors or
+                            self.kube_config.kube_node_selectors),
+            affinity=kube_executor_config.affinity
         )

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/adb648c9/airflow/contrib/operators/kubernetes_pod_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/kubernetes_pod_operator.py 
b/airflow/contrib/operators/kubernetes_pod_operator.py
index bf656f1..fb90562 100644
--- a/airflow/contrib/operators/kubernetes_pod_operator.py
+++ b/airflow/contrib/operators/kubernetes_pod_operator.py
@@ -70,6 +70,8 @@ class KubernetesPodOperator(BaseOperator):
     :type get_logs: bool
     :param affinity: A dict containing a group of affinity scheduling rules
     :type affinity: dict
+    :param node_selectors: A dict containing a group of scheduling rules
+    :type node_selectors: dict
     :param config_file: The path to the Kubernetes config file
     :type config_file: str
     :param xcom_push: If xcom_push is True, the content of the file
@@ -106,6 +108,7 @@ class KubernetesPodOperator(BaseOperator):
             pod.annotations = self.annotations
             pod.resources = self.resources
             pod.affinity = self.affinity
+            pod.node_selectors = self.node_selectors
 
             launcher = pod_launcher.PodLauncher(kube_client=client,
                                                 extract_xcom=self.xcom_push)
@@ -144,6 +147,7 @@ class KubernetesPodOperator(BaseOperator):
                  affinity=None,
                  config_file=None,
                  xcom_push=False,
+                 node_selectors=None,
                  *args,
                  **kwargs):
         super(KubernetesPodOperator, self).__init__(*args, **kwargs)
@@ -162,6 +166,7 @@ class KubernetesPodOperator(BaseOperator):
         self.cluster_context = cluster_context
         self.get_logs = get_logs
         self.image_pull_policy = image_pull_policy
+        self.node_selectors = node_selectors or {}
         self.annotations = annotations or {}
         self.affinity = affinity or {}
         self.xcom_push = xcom_push

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/adb648c9/scripts/ci/kubernetes/kube/configmaps.yaml
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/kube/configmaps.yaml 
b/scripts/ci/kubernetes/kube/configmaps.yaml
index 7b91aa2..97556bf 100644
--- a/scripts/ci/kubernetes/kube/configmaps.yaml
+++ b/scripts/ci/kubernetes/kube/configmaps.yaml
@@ -198,6 +198,11 @@ data:
     git_sync_container_tag = v2.0.5
     git_sync_init_container_name = git-sync-clone
 
+    [kubernetes_node_selectors]
+    # The Key-value pairs to be given to worker pods.
+    # The worker pods will be scheduled to the nodes of the specified key-value pairs.
+    # Should be supplied in the format: key = value
+
     [kubernetes_secrets]
     SQL_ALCHEMY_CONN = airflow-secrets=sql_alchemy_conn
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/adb648c9/tests/contrib/minikube/test_kubernetes_pod_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/minikube/test_kubernetes_pod_operator.py 
b/tests/contrib/minikube/test_kubernetes_pod_operator.py
index 5c799f4..531343e 100644
--- a/tests/contrib/minikube/test_kubernetes_pod_operator.py
+++ b/tests/contrib/minikube/test_kubernetes_pod_operator.py
@@ -91,6 +91,54 @@ class KubernetesPodOperatorTest(unittest.TestCase):
         )
         k.execute(None)
 
+    def test_pod_node_selectors(self):
+        node_selectors = {
+            'beta.kubernetes.io/os': 'linux'
+        }
+        k = KubernetesPodOperator(
+            namespace='default',
+            image="ubuntu:16.04",
+            cmds=["bash", "-cx"],
+            arguments=["echo", "10"],
+            labels={"foo": "bar"},
+            name="test",
+            task_id="task",
+            node_selectors=node_selectors,
+            executor_config={'KubernetesExecutor': {'node_selectors': 
node_selectors}}
+        )
+        k.execute(None)
+
+    def test_pod_affinity(self):
+        affinity = {
+            'nodeAffinity': {
+                'requiredDuringSchedulingIgnoredDuringExecution': {
+                    'nodeSelectorTerms': [
+                        {
+                            'matchExpressions': [
+                                {
+                                    'key': 'beta.kubernetes.io/os',
+                                    'operator': 'In',
+                                    'values': ['linux']
+                                }
+                            ]
+                        }
+                    ]
+                }
+            }
+        }
+        k = KubernetesPodOperator(
+            namespace='default',
+            image="ubuntu:16.04",
+            cmds=["bash", "-cx"],
+            arguments=["echo", "10"],
+            labels={"foo": "bar"},
+            name="test",
+            task_id="task",
+            affinity=affinity,
+            executor_config={'KubernetesExecutor': {'affinity': affinity}}
+        )
+        k.execute(None)
+
     def test_logging(self):
         with mock.patch.object(PodLauncher, 'log') as mock_logger:
             k = KubernetesPodOperator(

Reply via email to