[AIRFLOW-1517] Restore authorship of secrets and init container
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/361dad95 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/361dad95 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/361dad95 Branch: refs/heads/master Commit: 361dad957cca17eb37964c5945446bf62d74592f Parents: c5ced07 Author: Benjamin Goldberg <[email protected]> Authored: Wed Dec 27 08:30:04 2017 -0600 Committer: Benjamin Goldberg <[email protected]> Committed: Wed Dec 27 08:44:39 2017 -0600 ---------------------------------------------------------------------- .../kubernetes_request_factory.py | 48 +++++++++ .../pod_request_factory.py | 4 + airflow/contrib/kubernetes/pod.py | 3 + airflow/contrib/kubernetes/pod_generator.py | 105 +++++++++++++++++++ airflow/contrib/kubernetes/secret.py | 36 +++++++ 5 files changed, 196 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/361dad95/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py index 88d3f32..9398bef 100644 --- a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py +++ b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py @@ -89,6 +89,37 @@ class KubernetesRequestFactory: req['metadata']['name'] = pod.name @staticmethod + def extract_volume_secrets(pod, req): + vol_secrets = [s for s in pod.secrets if s.deploy_type == 'volume'] + if any(vol_secrets): + req['spec']['containers'][0]['volumeMounts'] = [] + req['spec']['volumes'] = [] + for idx, vol in enumerate(vol_secrets): + vol_id = 'secretvol' + str(idx) + req['spec']['containers'][0]['volumeMounts'].append({ + 'mountPath': vol.deploy_target, + 'name': vol_id, + 'readOnly': True + }) + req['spec']['volumes'].append({ + 'name': vol_id, + 'secret': { + 'secretName': vol.secret + } + }) + + @staticmethod + def extract_env_and_secrets(pod, req): + env_secrets = [s for s in pod.secrets if s.deploy_type == 'env'] + if len(pod.envs) > 0 or len(env_secrets) > 0: + env = [] + for k in pod.envs.keys(): + env.append({'name': k, 'value': pod.envs[k]}) + for secret in env_secrets: + KubernetesRequestFactory.add_secret_to_env(env, secret) + req['spec']['containers'][0]['env'] = env + + @staticmethod def extract_resources(pod, req): if not pod.resources or pod.resources.is_empty_resource_request(): return @@ -112,3 +143,20 @@ class KubernetesRequestFactory: if pod.resources.request_cpu: req['spec']['containers'][0]['resources']['limits'][ 'cpu'] = pod.resources.limit_cpu + + @staticmethod + def extract_init_containers(pod, req): + if pod.init_containers: + req['spec']['initContainers'] = pod.init_containers + + @staticmethod + def extract_service_account_name(pod, req): + if pod.service_account_name: + req['spec']['serviceAccountName'] = pod.service_account_name + + @staticmethod + def extract_image_pull_secrets(pod, req): + if pod.image_pull_secrets: + req['spec']['imagePullSecrets'] = [{ + 'name': pull_secret + } for pull_secret in pod.image_pull_secrets.split(',')] http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/361dad95/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py b/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py index ac9ded1..3be1a13 100644 --- a/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py +++ b/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py @@ -45,8 +45,12 @@ spec: self.extract_cmds(pod, req) self.extract_args(pod, req) self.extract_node_selector(pod, req) + self.extract_env_and_secrets(pod, req) self.extract_volume_secrets(pod, req) self.attach_volumes(pod, req) self.attach_volume_mounts(pod, req) self.extract_resources(pod, req) + self.extract_service_account_name(pod, req) + self.extract_init_containers(pod, req) + self.extract_image_pull_secrets(pod, req) return req http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/361dad95/airflow/contrib/kubernetes/pod.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/kubernetes/pod.py b/airflow/contrib/kubernetes/pod.py index f6a0583..6a9f76d 100644 --- a/airflow/contrib/kubernetes/pod.py +++ b/airflow/contrib/kubernetes/pod.py @@ -86,4 +86,7 @@ class Pod: self.node_selectors = node_selectors or [] self.namespace = namespace self.image_pull_policy = image_pull_policy + self.image_pull_secrets = image_pull_secrets + self.init_containers = init_containers + self.service_account_name = service_account_name self.resources = resources or Resources() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/361dad95/airflow/contrib/kubernetes/pod_generator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/kubernetes/pod_generator.py b/airflow/contrib/kubernetes/pod_generator.py index 69c6fcb..685be37 100644 --- a/airflow/contrib/kubernetes/pod_generator.py +++ b/airflow/contrib/kubernetes/pod_generator.py @@ -29,6 +29,39 @@ class PodGenerator: self.init_containers = [] self.secrets = [] + def add_init_container(self, + name, + image, + securityContext, + init_environment, + volume_mounts + ): + """ + + Adds an init container to the launched pod. useful for pre- + + Args: + name (str): + image (str): + securityContext (dict): + init_environment (dict): + volume_mounts (dict): + + Returns: + + """ + self.init_containers.append( + { + 'name': name, + 'image': image, + 'securityContext': securityContext, + 'env': init_environment, + 'volumeMounts': volume_mounts + } + ) + + def _get_init_containers(self): + return self.init_containers def add_volume(self, name): """ @@ -76,6 +109,12 @@ class PodGenerator: def _get_volumes_and_mounts(self): return self.volumes, self.volume_mounts + def _get_image_pull_secrets(self): + """Extracts any image pull secrets for fetching container(s)""" + if not self.kube_config.image_pull_secrets: + return [] + return self.kube_config.image_pull_secrets.split(',') + def make_pod(self, namespace, image, pod_id, cmds, arguments, labels, kube_executor_config=None): volumes, volume_mounts = self._get_volumes_and_mounts() @@ -97,6 +136,9 @@ class PodGenerator: labels=labels, envs=self.env_vars, secrets={}, + # service_account_name=self.kube_config.worker_service_account_name, + # image_pull_secrets=self.kube_config.image_pull_secrets, + init_containers=worker_init_container_spec, volumes=volumes, volume_mounts=volume_mounts, resources=None @@ -128,12 +170,32 @@ class WorkerGenerator(PodGenerator): 'readOnly': True }] + # Mount the airflow.cfg file via a configmap the user has specified + if self.kube_config.airflow_configmap: + config_volume_name = "airflow-config" + config_path = '{}/airflow.cfg'.format(self.kube_config.airflow_home) + volumes.append({ + 'name': config_volume_name, + 'configMap': { + 'name': self.kube_config.airflow_configmap + } + }) + volume_mounts.append({ + 'name': config_volume_name, + 'mountPath': config_path, + 'subPath': 'airflow.cfg', + 'readOnly': True + }) + # A PV with the DAGs should be mounted if self.kube_config.dags_volume_claim: volumes[0]['persistentVolumeClaim'] = { "claimName": self.kube_config.dags_volume_claim} if self.kube_config.dags_volume_subpath: volume_mounts[0]["subPath"] = self.kube_config.dags_volume_subpath + else: + # Create a Shared Volume for the Git-Sync module to populate + volumes[0]["emptyDir"] = {} return volumes, volume_mounts def _init_labels(self, dag_id, task_id, execution_date): @@ -154,6 +216,49 @@ class WorkerGenerator(PodGenerator): env['AIRFLOW__CORE__AIRFLOW_HOME'] = self.kube_config.airflow_home return env + def _init_init_containers(self, volume_mounts): + """When using git to retrieve the DAGs, use the GitSync Init Container""" + # If we're using volume claims to mount the dags, no init container is needed + if self.kube_config.dags_volume_claim: + return [] + + # Otherwise, define a git-sync init container + init_environment = [{ + 'name': 'GIT_SYNC_REPO', + 'value': self.kube_config.git_repo + }, { + 'name': 'GIT_SYNC_BRANCH', + 'value': self.kube_config.git_branch + }, { + 'name': 'GIT_SYNC_ROOT', + 'value': '/tmp' + }, { + 'name': 'GIT_SYNC_DEST', + 'value': 'dags' + }, { + 'name': 'GIT_SYNC_ONE_TIME', + 'value': 'true' + }] + if self.kube_config.git_user: + init_environment.append({ + 'name': 'GIT_SYNC_USERNAME', + 'value': self.kube_config.git_user + }) + if self.kube_config.git_password: + init_environment.append({ + 'name': 'GIT_SYNC_PASSWORD', + 'value': self.kube_config.git_password + }) + + volume_mounts[0]['readOnly'] = False + return [{ + 'name': self.kube_config.git_sync_init_container_name, + 'image': self.kube_config.git_sync_container, + 'securityContext': {'runAsUser': 0}, + 'env': init_environment, + 'volumeMounts': volume_mounts + }] + def make_worker_pod(self, namespace, pod_id, http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/361dad95/airflow/contrib/kubernetes/secret.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/kubernetes/secret.py b/airflow/contrib/kubernetes/secret.py index e69de29..15f070e 100644 --- a/airflow/contrib/kubernetes/secret.py +++ b/airflow/contrib/kubernetes/secret.py @@ -0,0 +1,36 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +class Secret: + """Defines Kubernetes Secret Volume""" + + def __init__(self, deploy_type, deploy_target, secret, key): + """Initialize a Kubernetes Secret Object. Used to track requested secrets from + the user. + + :param deploy_type: The type of secret deploy in Kubernetes, either `env` or + `volume` + :type deploy_type: ``str`` + :param deploy_target: The environment variable to be created in the worker. + :type deploy_target: ``str`` + :param secret: Name of the secrets object in Kubernetes + :type secret: ``str`` + :param key: Key of the secret within the Kubernetes Secret + :type key: ``str`` + """ + self.deploy_type = deploy_type + self.deploy_target = deploy_target.upper() + self.secret = secret + self.key = key
