Repository: incubator-airflow Updated Branches: refs/heads/master 0ce068e08 -> adb648c94
[AIRFLOW-2662][AIRFLOW-2397] Add k8s node_selectors and affinity Add the ability to set the node selection and the affinity for the k8s executor Closes #3535 from Cplo/affinity Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/adb648c9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/adb648c9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/adb648c9 Branch: refs/heads/master Commit: adb648c9497d38789f43a2a941d7d887b6cee84e Parents: 0ce068e Author: pengchen <[email protected]> Authored: Mon Jun 25 13:09:14 2018 +0200 Committer: Fokko Driesprong <[email protected]> Committed: Mon Jun 25 13:09:16 2018 +0200 ---------------------------------------------------------------------- airflow/config_templates/default_airflow.cfg | 5 ++ .../contrib/executors/kubernetes_executor.py | 19 ++++++-- .../kubernetes_request_factory.py | 5 +- airflow/contrib/kubernetes/pod.py | 2 +- .../contrib/kubernetes/worker_configuration.py | 5 +- .../operators/kubernetes_pod_operator.py | 5 ++ scripts/ci/kubernetes/kube/configmaps.yaml | 5 ++ .../minikube/test_kubernetes_pod_operator.py | 48 ++++++++++++++++++++ 8 files changed, 85 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/adb648c9/airflow/config_templates/default_airflow.cfg ---------------------------------------------------------------------- diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index cd139d3..fe99ece 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -611,6 +611,11 @@ gcp_service_account_keys = # It will raise an exception if called from a process not running in a kubernetes environment. 
in_cluster = True +[kubernetes_node_selectors] +# Key-value pairs applied as the node selector of worker pods. +# Worker pods are only scheduled onto nodes whose labels match all of the specified key-value pairs. +# Should be supplied in the format: key = value + [kubernetes_secrets] # The scheduler mounts the following secrets into your workers as they are launched by the # scheduler. You may define as many secrets as needed and the kubernetes launcher will parse the http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/adb648c9/airflow/contrib/executors/kubernetes_executor.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/executors/kubernetes_executor.py b/airflow/contrib/executors/kubernetes_executor.py index 65053bd..4ea52c4 100644 --- a/airflow/contrib/executors/kubernetes_executor.py +++ b/airflow/contrib/executors/kubernetes_executor.py @@ -39,7 +39,7 @@ from airflow.utils.log.logging_mixin import LoggingMixin class KubernetesExecutorConfig: def __init__(self, image=None, image_pull_policy=None, request_memory=None, request_cpu=None, limit_memory=None, limit_cpu=None, - gcp_service_account_key=None): + gcp_service_account_key=None, node_selectors=None, affinity=None): self.image = image self.image_pull_policy = image_pull_policy self.request_memory = request_memory @@ -47,13 +47,17 @@ class KubernetesExecutorConfig: self.limit_memory = limit_memory self.limit_cpu = limit_cpu self.gcp_service_account_key = gcp_service_account_key + self.node_selectors = node_selectors + self.affinity = affinity def __repr__(self): return "{}(image={}, image_pull_policy={}, request_memory={}, request_cpu={}, " \ - "limit_memory={}, limit_cpu={}, gcp_service_account_key={})" \ + "limit_memory={}, limit_cpu={}, gcp_service_account_key={}, " \ + "node_selectors={}, affinity={})" \ .format(KubernetesExecutorConfig.__name__, self.image, self.image_pull_policy, self.request_memory, self.request_cpu, self.limit_memory, - self.limit_cpu, 
self.gcp_service_account_key) + self.limit_cpu, self.gcp_service_account_key, self.node_selectors, + self.affinity) @staticmethod def from_dict(obj): @@ -73,7 +77,9 @@ class KubernetesExecutorConfig: request_cpu=namespaced.get('request_cpu', None), limit_memory=namespaced.get('limit_memory', None), limit_cpu=namespaced.get('limit_cpu', None), - gcp_service_account_key=namespaced.get('gcp_service_account_key', None) + gcp_service_account_key=namespaced.get('gcp_service_account_key', None), + node_selectors=namespaced.get('node_selectors', None), + affinity=namespaced.get('affinity', None) ) def as_dict(self): @@ -84,7 +90,9 @@ class KubernetesExecutorConfig: 'request_cpu': self.request_cpu, 'limit_memory': self.limit_memory, 'limit_cpu': self.limit_cpu, - 'gcp_service_account_key': self.gcp_service_account_key + 'gcp_service_account_key': self.gcp_service_account_key, + 'node_selectors': self.node_selectors, + 'affinity': self.affinity } @@ -108,6 +116,7 @@ class KubeConfig: self.kube_image_pull_policy = configuration.get( self.kubernetes_section, "worker_container_image_pull_policy" ) + self.kube_node_selectors = configuration_dict.get('kubernetes_node_selectors', {}) self.delete_worker_pods = conf.getboolean( self.kubernetes_section, 'delete_worker_pods') http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/adb648c9/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py index 7133125..27e0ebd 100644 --- a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py +++ b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py @@ -85,8 +85,9 @@ class KubernetesRequestFactory: @staticmethod def extract_node_selector(pod, req): - 
if len(pod.node_selectors) > 0: - req['spec']['nodeSelector'] = pod.node_selectors + req['spec']['nodeSelector'] = req['spec'].get('nodeSelector', {}) + for k, v in six.iteritems(pod.node_selectors): + req['spec']['nodeSelector'][k] = v @staticmethod def attach_volumes(pod, req): http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/adb648c9/airflow/contrib/kubernetes/pod.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/kubernetes/pod.py b/airflow/contrib/kubernetes/pod.py index c422214..6fcf354 100644 --- a/airflow/contrib/kubernetes/pod.py +++ b/airflow/contrib/kubernetes/pod.py @@ -89,7 +89,7 @@ class Pod: self.name = name self.volumes = volumes or [] self.volume_mounts = volume_mounts or [] - self.node_selectors = node_selectors or [] + self.node_selectors = node_selectors or {} self.namespace = namespace self.image_pull_policy = image_pull_policy self.image_pull_secrets = image_pull_secrets http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/adb648c9/airflow/contrib/kubernetes/worker_configuration.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/kubernetes/worker_configuration.py b/airflow/contrib/kubernetes/worker_configuration.py index 784bb77..059b352 100644 --- a/airflow/contrib/kubernetes/worker_configuration.py +++ b/airflow/contrib/kubernetes/worker_configuration.py @@ -215,5 +215,8 @@ class WorkerConfiguration(LoggingMixin): volumes=volumes, volume_mounts=volume_mounts, resources=resources, - annotations=annotations + annotations=annotations, + node_selectors=(kube_executor_config.node_selectors or + self.kube_config.kube_node_selectors), + affinity=kube_executor_config.affinity ) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/adb648c9/airflow/contrib/operators/kubernetes_pod_operator.py ---------------------------------------------------------------------- diff --git 
a/airflow/contrib/operators/kubernetes_pod_operator.py b/airflow/contrib/operators/kubernetes_pod_operator.py index bf656f1..fb90562 100644 --- a/airflow/contrib/operators/kubernetes_pod_operator.py +++ b/airflow/contrib/operators/kubernetes_pod_operator.py @@ -70,6 +70,8 @@ class KubernetesPodOperator(BaseOperator): :type get_logs: bool :param affinity: A dict containing a group of affinity scheduling rules :type affinity: dict + :param node_selectors: A dict containing a group of scheduling rules + :type node_selectors: dict :param config_file: The path to the Kubernetes config file :type config_file: str :param xcom_push: If xcom_push is True, the content of the file @@ -106,6 +108,7 @@ class KubernetesPodOperator(BaseOperator): pod.annotations = self.annotations pod.resources = self.resources pod.affinity = self.affinity + pod.node_selectors = self.node_selectors launcher = pod_launcher.PodLauncher(kube_client=client, extract_xcom=self.xcom_push) @@ -144,6 +147,7 @@ class KubernetesPodOperator(BaseOperator): affinity=None, config_file=None, xcom_push=False, + node_selectors=None, *args, **kwargs): super(KubernetesPodOperator, self).__init__(*args, **kwargs) @@ -162,6 +166,7 @@ class KubernetesPodOperator(BaseOperator): self.cluster_context = cluster_context self.get_logs = get_logs self.image_pull_policy = image_pull_policy + self.node_selectors = node_selectors or {} self.annotations = annotations or {} self.affinity = affinity or {} self.xcom_push = xcom_push http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/adb648c9/scripts/ci/kubernetes/kube/configmaps.yaml ---------------------------------------------------------------------- diff --git a/scripts/ci/kubernetes/kube/configmaps.yaml b/scripts/ci/kubernetes/kube/configmaps.yaml index 7b91aa2..97556bf 100644 --- a/scripts/ci/kubernetes/kube/configmaps.yaml +++ b/scripts/ci/kubernetes/kube/configmaps.yaml @@ -198,6 +198,11 @@ data: git_sync_container_tag = v2.0.5 git_sync_init_container_name = 
git-sync-clone + [kubernetes_node_selectors] + # Key-value pairs applied as the node selector of worker pods. + # Worker pods are only scheduled onto nodes whose labels match all of the specified key-value pairs. + # Should be supplied in the format: key = value + [kubernetes_secrets] SQL_ALCHEMY_CONN = airflow-secrets=sql_alchemy_conn http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/adb648c9/tests/contrib/minikube/test_kubernetes_pod_operator.py ---------------------------------------------------------------------- diff --git a/tests/contrib/minikube/test_kubernetes_pod_operator.py b/tests/contrib/minikube/test_kubernetes_pod_operator.py index 5c799f4..531343e 100644 --- a/tests/contrib/minikube/test_kubernetes_pod_operator.py +++ b/tests/contrib/minikube/test_kubernetes_pod_operator.py @@ -91,6 +91,54 @@ class KubernetesPodOperatorTest(unittest.TestCase): ) k.execute(None) + def test_pod_node_selectors(self): + node_selectors = { + 'beta.kubernetes.io/os': 'linux' + } + k = KubernetesPodOperator( + namespace='default', + image="ubuntu:16.04", + cmds=["bash", "-cx"], + arguments=["echo", "10"], + labels={"foo": "bar"}, + name="test", + task_id="task", + node_selectors=node_selectors, + executor_config={'KubernetesExecutor': {'node_selectors': node_selectors}} + ) + k.execute(None) + + def test_pod_affinity(self): + affinity = { + 'nodeAffinity': { + 'requiredDuringSchedulingIgnoredDuringExecution': { + 'nodeSelectorTerms': [ + { + 'matchExpressions': [ + { + 'key': 'beta.kubernetes.io/os', + 'operator': 'In', + 'values': ['linux'] + } + ] + } + ] + } + } + } + k = KubernetesPodOperator( + namespace='default', + image="ubuntu:16.04", + cmds=["bash", "-cx"], + arguments=["echo", "10"], + labels={"foo": "bar"}, + name="test", + task_id="task", + affinity=affinity, + executor_config={'KubernetesExecutor': {'affinity': affinity}} + ) + k.execute(None) + def test_logging(self): with mock.patch.object(PodLauncher, 'log') as mock_logger: k = KubernetesPodOperator(
