Repository: incubator-airflow Updated Branches: refs/heads/master 8482b208b -> fff87b5cf
[AIRFLOW-2397] Support affinity policies for Kubernetes executor/operator KubernetesPodOperator now accept a dict type parameter called "affinity", which represents a group of affinity scheduling rules (nodeAffinity, podAffinity, podAntiAffinity). API reference: https://kubernetes.io/docs/referenc e/generated/kubernetes-api/v1.10/#affinity-v1-core Closes #3369 from imroc/AIRFLOW-2397 Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/fff87b5c Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/fff87b5c Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/fff87b5c Branch: refs/heads/master Commit: fff87b5cfdfac904c9ddd8ca84e0aa192379080f Parents: 8482b20 Author: roc <[email protected]> Authored: Sat May 19 00:47:53 2018 +0200 Committer: Fokko Driesprong <[email protected]> Committed: Sat May 19 00:47:53 2018 +0200 ---------------------------------------------------------------------- .../kubernetes_request_factory.py | 6 +++ .../pod_request_factory.py | 1 + airflow/contrib/kubernetes/pod.py | 6 ++- .../operators/kubernetes_pod_operator.py | 5 ++ docs/kubernetes.rst | 53 +++++++++++++++++++- 5 files changed, 69 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fff87b5c/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py index 12d05ec..7133125 100644 --- a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py +++ b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py @@ -70,6 +70,12 @@ class KubernetesRequestFactory: req['metadata']['annotations'][k] = v @staticmethod + def extract_affinity(pod, req): + req['spec']['affinity'] = req['spec'].get('affinity', {}) + for k, v in six.iteritems(pod.affinity): + req['spec']['affinity'][k] = v + + @staticmethod def extract_cmds(pod, req): req['spec']['containers'][0]['command'] = pod.cmds http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fff87b5c/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py b/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py index 0f06d49..b41ee8a 100644 --- a/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py +++ b/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py @@ -58,4 +58,5 @@ spec: self.extract_init_containers(pod, req) self.extract_image_pull_secrets(pod, req) self.extract_annotations(pod, req) + self.extract_affinity(pod, req) return req http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fff87b5c/airflow/contrib/kubernetes/pod.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/kubernetes/pod.py b/airflow/contrib/kubernetes/pod.py index e740bae..c422214 100644 --- a/airflow/contrib/kubernetes/pod.py +++ b/airflow/contrib/kubernetes/pod.py @@ -54,6 +54,8 @@ class Pod: :type result: any :param image_pull_policy: Specify a policy to cache or always pull an image :type image_pull_policy: str + :param affinity: A dict containing a group of affinity scheduling rules + :type affinity: dict """ def __init__( self, @@ -74,7 +76,8 @@ class Pod: init_containers=None, service_account_name=None, resources=None, - annotations=None + annotations=None, + affinity=None ): self.image = image self.envs = envs or {} @@ -94,3 +97,4 @@ class Pod: self.service_account_name = service_account_name self.resources = resources or Resources() self.annotations = annotations or {} + self.affinity = affinity or {} http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fff87b5c/airflow/contrib/operators/kubernetes_pod_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/kubernetes_pod_operator.py b/airflow/contrib/operators/kubernetes_pod_operator.py index ffdc2cf..31ffd92 100644 --- a/airflow/contrib/operators/kubernetes_pod_operator.py +++ b/airflow/contrib/operators/kubernetes_pod_operator.py @@ -64,6 +64,8 @@ class KubernetesPodOperator(BaseOperator): :type in_cluster: bool :param get_logs: get the stdout of the container as logs of the tasks :type get_logs: bool + :param affinity: A dict containing a group of affinity scheduling rules + :type affinity: dict """ template_fields = ('cmds', 'arguments', 'env_vars') @@ -91,6 +93,7 @@ class KubernetesPodOperator(BaseOperator): pod.image_pull_policy = self.image_pull_policy pod.annotations = self.annotations pod.resources = self.resources + pod.affinity = self.affinity launcher = pod_launcher.PodLauncher(client) final_state = launcher.run_pod( @@ -122,6 +125,7 @@ class KubernetesPodOperator(BaseOperator): image_pull_policy='IfNotPresent', annotations=None, resources=None, + affinity=None, *args, **kwargs): super(KubernetesPodOperator, self).__init__(*args, **kwargs) @@ -140,4 +144,5 @@ class KubernetesPodOperator(BaseOperator): self.get_logs = get_logs self.image_pull_policy = image_pull_policy self.annotations = annotations or {} + self.affinity = affinity or {} self.resources = resources or Resources() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fff87b5c/docs/kubernetes.rst ---------------------------------------------------------------------- diff --git a/docs/kubernetes.rst b/docs/kubernetes.rst index 9358250..a491685 100644 --- a/docs/kubernetes.rst +++ b/docs/kubernetes.rst @@ -31,6 +31,56 @@ Kubernetes Operator } } volume = Volume(name='test-volume', configs=volume_config) + + affinity = { + 'nodeAffinity': { + 'preferredDuringSchedulingIgnoredDuringExecution': [ + { + "weight": 1, + "preference": { + "matchExpressions": [ + "key": "disktype", + "operator": "In", + "values": ["ssd"] + ] + } + } + ] + }, + "podAffinity": { + "requiredDuringSchedulingIgnoredDuringExecution": [ + { + "labelSelector": { + "matchExpressions": [ + { + "key": "security", + "operator": "In", + "values": ["S1"] + } + ] + }, + "topologyKey": "failure-domain.beta.kubernetes.io/zone" + } + ] + }, + "podAntiAffinity": { + "requiredDuringSchedulingIgnoredDuringExecution": [ + { + "labelSelector": { + "matchExpressions": [ + { + "key": "security", + "operator": "In", + "values": ["S2"] + } + ] + }, + "topologyKey": "kubernetes.io/hostname" + } + ] + } + } + k = KubernetesPodOperator(namespace='default', image="ubuntu:16.04", cmds=["bash", "-cx"], @@ -40,7 +90,8 @@ Kubernetes Operator volume=[volume], volume_mounts=[volume_mount] name="test", - task_id="task" + task_id="task", + affinity=affinity )
