Repository: incubator-airflow Updated Branches: refs/heads/master b48bbbd6f -> 1abe7f6d5
[AIRFLOW-1517] Kubernetes Operator Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/78ff2fc1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/78ff2fc1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/78ff2fc1 Branch: refs/heads/master Commit: 78ff2fc180808a38c53cc643bd87c509d7540b4a Parents: c0dffb5 Author: Daniel Imberman <[email protected]> Authored: Thu Dec 7 09:41:05 2017 -0600 Committer: Daniel Imberman <[email protected]> Committed: Tue Dec 26 08:45:31 2017 -0800 ---------------------------------------------------------------------- .travis.yml | 5 +- airflow/contrib/kubernetes/__init__.py | 13 + airflow/contrib/kubernetes/kube_client.py | 34 +++ .../kubernetes_request_factory/__init__.py | 12 + .../kubernetes_request_factory.py | 162 +++++++++++ .../pod_request_factory.py | 56 ++++ airflow/contrib/kubernetes/pod.py | 92 ++++++ airflow/contrib/kubernetes/pod_generator.py | 278 +++++++++++++++++++ airflow/contrib/kubernetes/pod_launcher.py | 119 ++++++++ airflow/contrib/kubernetes/secret.py | 36 +++ .../operators/kubernetes_pod_operator.py | 71 +++++ .../ci/kubernetes/minikube/start_minikube.sh | 53 ++++ scripts/ci/kubernetes/setup_kubernetes.sh | 28 ++ scripts/ci/requirements.txt | 1 + scripts/ci/travis_script.sh | 39 +++ setup.py | 6 +- .../operators/test_kubernetes_pod_operator.py | 69 +++++ 17 files changed, 1069 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/78ff2fc1/.travis.yml ---------------------------------------------------------------------- diff --git a/.travis.yml b/.travis.yml index 9b173a9..6b45153 100644 --- a/.travis.yml +++ b/.travis.yml @@ -89,9 +89,6 @@ before_script: - psql -c 'create database airflow;' -U postgres - export PATH=${PATH}:/tmp/hive/bin script: - - pip --version - - ls -l 
$HOME/.wheelhouse - - tox --version - - tox -e $TOX_ENV + - ./scripts/ci/travis_script.sh after_success: - codecov http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/78ff2fc1/airflow/contrib/kubernetes/__init__.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/kubernetes/__init__.py b/airflow/contrib/kubernetes/__init__.py new file mode 100644 index 0000000..9d7677a --- /dev/null +++ b/airflow/contrib/kubernetes/__init__.py @@ -0,0 +1,13 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/78ff2fc1/airflow/contrib/kubernetes/kube_client.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/kubernetes/kube_client.py b/airflow/contrib/kubernetes/kube_client.py new file mode 100644 index 0000000..7dc895e --- /dev/null +++ b/airflow/contrib/kubernetes/kube_client.py @@ -0,0 +1,34 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + + +def load_kube_config(in_cluster=True): + from kubernetes import config, client + if in_cluster: + config.load_incluster_config() + else: + try: + config.load_kube_config() + return client.CoreV1Api() + except NotImplementedError: + NotImplementedError( + "requires incluster config or defined configuration in airflow.cfg") + + +def get_kube_client(in_cluster=True): + # TODO: This should also allow people to point to a cluster. + + from kubernetes import client + load_kube_config(in_cluster) + return client.CoreV1Api() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/78ff2fc1/airflow/contrib/kubernetes/kubernetes_request_factory/__init__.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/__init__.py b/airflow/contrib/kubernetes/kubernetes_request_factory/__init__.py new file mode 100644 index 0000000..9921696 --- /dev/null +++ b/airflow/contrib/kubernetes/kubernetes_request_factory/__init__.py @@ -0,0 +1,12 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/78ff2fc1/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py new file mode 100644 index 0000000..9398bef --- /dev/null +++ b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py @@ -0,0 +1,162 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from abc import ABCMeta, abstractmethod +import six + + +class KubernetesRequestFactory: + """ + Create requests to be sent to kube API. + Extend this class to talk to kubernetes and generate your specific resources. + This is equivalent of generating yaml files that can be used by `kubectl` + """ + __metaclass__ = ABCMeta + + @abstractmethod + def create(self, pod): + """ + Creates the request for kubernetes API. 
+ + :param pod: The pod object + """ + pass + + @staticmethod + def extract_image(pod, req): + req['spec']['containers'][0]['image'] = pod.image + + @staticmethod + def extract_image_pull_policy(pod, req): + if pod.image_pull_policy: + req['spec']['containers'][0]['imagePullPolicy'] = pod.image_pull_policy + + @staticmethod + def add_secret_to_env(env, secret): + env.append({ + 'name': secret.deploy_target, + 'valueFrom': { + 'secretKeyRef': { + 'name': secret.secret, + 'key': secret.key + } + } + }) + + @staticmethod + def extract_labels(pod, req): + req['metadata']['labels'] = req['metadata'].get('labels', {}) + for k, v in six.iteritems(pod.labels): + req['metadata']['labels'][k] = v + + @staticmethod + def extract_cmds(pod, req): + req['spec']['containers'][0]['command'] = pod.cmds + + @staticmethod + def extract_args(pod, req): + req['spec']['containers'][0]['args'] = pod.args + + @staticmethod + def extract_node_selector(pod, req): + if len(pod.node_selectors) > 0: + req['spec']['nodeSelector'] = pod.node_selectors + + @staticmethod + def attach_volumes(pod, req): + req['spec']['volumes'] = pod.volumes + + @staticmethod + def attach_volume_mounts(pod, req): + if len(pod.volume_mounts) > 0: + req['spec']['containers'][0]['volumeMounts'] = ( + req['spec']['containers'][0].get('volumeMounts', [])) + req['spec']['containers'][0]['volumeMounts'].extend(pod.volume_mounts) + + @staticmethod + def extract_name(pod, req): + req['metadata']['name'] = pod.name + + @staticmethod + def extract_volume_secrets(pod, req): + vol_secrets = [s for s in pod.secrets if s.deploy_type == 'volume'] + if any(vol_secrets): + req['spec']['containers'][0]['volumeMounts'] = [] + req['spec']['volumes'] = [] + for idx, vol in enumerate(vol_secrets): + vol_id = 'secretvol' + str(idx) + req['spec']['containers'][0]['volumeMounts'].append({ + 'mountPath': vol.deploy_target, + 'name': vol_id, + 'readOnly': True + }) + req['spec']['volumes'].append({ + 'name': vol_id, + 'secret': { + 
'secretName': vol.secret + } + }) + + @staticmethod + def extract_env_and_secrets(pod, req): + env_secrets = [s for s in pod.secrets if s.deploy_type == 'env'] + if len(pod.envs) > 0 or len(env_secrets) > 0: + env = [] + for k in pod.envs.keys(): + env.append({'name': k, 'value': pod.envs[k]}) + for secret in env_secrets: + KubernetesRequestFactory.add_secret_to_env(env, secret) + req['spec']['containers'][0]['env'] = env + + @staticmethod + def extract_resources(pod, req): + if not pod.resources or pod.resources.is_empty_resource_request(): + return + + req['spec']['containers'][0]['resources'] = {} + + if pod.resources.has_requests(): + req['spec']['containers'][0]['resources']['requests'] = {} + if pod.resources.request_memory: + req['spec']['containers'][0]['resources']['requests'][ + 'memory'] = pod.resources.request_memory + if pod.resources.request_cpu: + req['spec']['containers'][0]['resources']['requests'][ + 'cpu'] = pod.resources.request_cpu + + if pod.resources.has_limits(): + req['spec']['containers'][0]['resources']['limits'] = {} + if pod.resources.request_memory: + req['spec']['containers'][0]['resources']['limits'][ + 'memory'] = pod.resources.limit_memory + if pod.resources.request_cpu: + req['spec']['containers'][0]['resources']['limits'][ + 'cpu'] = pod.resources.limit_cpu + + @staticmethod + def extract_init_containers(pod, req): + if pod.init_containers: + req['spec']['initContainers'] = pod.init_containers + + @staticmethod + def extract_service_account_name(pod, req): + if pod.service_account_name: + req['spec']['serviceAccountName'] = pod.service_account_name + + @staticmethod + def extract_image_pull_secrets(pod, req): + if pod.image_pull_secrets: + req['spec']['imagePullSecrets'] = [{ + 'name': pull_secret + } for pull_secret in pod.image_pull_secrets.split(',')] http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/78ff2fc1/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py 
---------------------------------------------------------------------- diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py b/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py new file mode 100644 index 0000000..3be1a13 --- /dev/null +++ b/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py @@ -0,0 +1,56 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and + +import yaml +from airflow.contrib.kubernetes.kubernetes_request_factory.kubernetes_request_factory \ + import KubernetesRequestFactory + + +class SimplePodRequestFactory(KubernetesRequestFactory): + """ + Request generator for a simple pod. 
+ """ + _yaml = """apiVersion: v1 +kind: Pod +metadata: + name: name +spec: + containers: + - name: base + image: airflow-slave:latest + command: ["/usr/local/airflow/entrypoint.sh", "/bin/bash sleep 25"] + restartPolicy: Never + """ + + def __init__(self): + pass + + def create(self, pod): + # type: (Pod) -> dict + req = yaml.load(self._yaml) + self.extract_name(pod, req) + self.extract_labels(pod, req) + self.extract_image(pod, req) + self.extract_image_pull_policy(pod, req) + self.extract_cmds(pod, req) + self.extract_args(pod, req) + self.extract_node_selector(pod, req) + self.extract_env_and_secrets(pod, req) + self.extract_volume_secrets(pod, req) + self.attach_volumes(pod, req) + self.attach_volume_mounts(pod, req) + self.extract_resources(pod, req) + self.extract_service_account_name(pod, req) + self.extract_init_containers(pod, req) + self.extract_image_pull_secrets(pod, req) + return req http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/78ff2fc1/airflow/contrib/kubernetes/pod.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/kubernetes/pod.py b/airflow/contrib/kubernetes/pod.py new file mode 100644 index 0000000..6a9f76d --- /dev/null +++ b/airflow/contrib/kubernetes/pod.py @@ -0,0 +1,92 @@ + +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + +class Resources: + def __init__( + self, + request_memory=None, + request_cpu=None, + limit_memory=None, + limit_cpu=None): + self.request_memory = request_memory + self.request_cpu = request_cpu + self.limit_memory = limit_memory + self.limit_cpu = limit_cpu + + def is_empty_resource_request(self): + return not self.has_limits() and not self.has_requests() + + def has_limits(self): + return self.limit_cpu is not None or self.limit_memory is not None + + def has_requests(self): + return self.request_cpu is not None or self.request_memory is not None + + +class Pod: + """ + Represents a kubernetes pod and manages execution of a single pod. + :param image: The docker image + :type image: str + :param env: A dict containing the environment variables + :type env: dict + :param cmds: The command to be run on the pod + :type cmd: list str + :param secrets: Secrets to be launched to the pod + :type secrets: list Secret + :param result: The result that will be returned to the operator after + successful execution of the pod + :type result: any + """ + pod_timeout = 3600 + + def __init__( + self, + image, + envs, + cmds, + args=None, + secrets=None, + labels=None, + node_selectors=None, + name=None, + volumes=None, + volume_mounts=None, + namespace='default', + result=None, + image_pull_policy="IfNotPresent", + image_pull_secrets=None, + init_containers=None, + service_account_name=None, + resources=None + ): + self.image = image + self.envs = envs or {} + self.cmds = cmds + self.args = args or [] + self.secrets = secrets or [] + self.result = result + self.labels = labels or {} + self.name = name + self.volumes = volumes or [] + self.volume_mounts = volume_mounts or [] + self.node_selectors = node_selectors or [] + self.namespace = namespace + self.image_pull_policy = image_pull_policy + self.image_pull_secrets = image_pull_secrets + self.init_containers = init_containers + self.service_account_name = service_account_name + self.resources = resources or Resources() 
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/78ff2fc1/airflow/contrib/kubernetes/pod_generator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/kubernetes/pod_generator.py b/airflow/contrib/kubernetes/pod_generator.py new file mode 100644 index 0000000..685be37 --- /dev/null +++ b/airflow/contrib/kubernetes/pod_generator.py @@ -0,0 +1,278 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os + +from airflow.contrib.kubernetes.pod import Pod +import uuid + + +class PodGenerator: + """Contains Kubernetes Airflow Worker configuration logic""" + + def __init__(self, kube_config=None): + self.kube_config = kube_config + self.env_vars = {} + self.volumes = [] + self.volume_mounts = [] + self.init_containers = [] + self.secrets = [] + + def add_init_container(self, + name, + image, + securityContext, + init_environment, + volume_mounts + ): + """ + + Adds an init container to the launched pod. 
useful for pre- + + Args: + name (str): + image (str): + securityContext (dict): + init_environment (dict): + volume_mounts (dict): + + Returns: + + """ + self.init_containers.append( + { + 'name': name, + 'image': image, + 'securityContext': securityContext, + 'env': init_environment, + 'volumeMounts': volume_mounts + } + ) + + def _get_init_containers(self): + return self.init_containers + + def add_volume(self, name): + """ + + Args: + name (str): + + Returns: + + """ + self.volumes.append({'name': name}) + + def add_volume_with_configmap(self, name, config_map): + self.volumes.append( + { + 'name': name, + 'configMap': config_map + } + ) + + def add_mount(self, + name, + mount_path, + sub_path, + read_only): + """ + + Args: + name (str): + mount_path (str): + sub_path (str): + read_only: + + Returns: + + """ + + self.volume_mounts.append({ + 'name': name, + 'mountPath': mount_path, + 'subPath': sub_path, + 'readOnly': read_only + }) + + def _get_volumes_and_mounts(self): + return self.volumes, self.volume_mounts + + def _get_image_pull_secrets(self): + """Extracts any image pull secrets for fetching container(s)""" + if not self.kube_config.image_pull_secrets: + return [] + return self.kube_config.image_pull_secrets.split(',') + + def make_pod(self, namespace, image, pod_id, cmds, + arguments, labels, kube_executor_config=None): + volumes, volume_mounts = self._get_volumes_and_mounts() + worker_init_container_spec = self._get_init_containers() + + # resources = Resources( + # request_memory=kube_executor_config.request_memory, + # request_cpu=kube_executor_config.request_cpu, + # limit_memory=kube_executor_config.limit_memory, + # limit_cpu=kube_executor_config.limit_cpu + # ) + + return Pod( + namespace=namespace, + name=pod_id + "-" + str(uuid.uuid1())[:8], + image=image, + cmds=cmds, + args=arguments, + labels=labels, + envs=self.env_vars, + secrets={}, + # service_account_name=self.kube_config.worker_service_account_name, + # 
image_pull_secrets=self.kube_config.image_pull_secrets, + init_containers=worker_init_container_spec, + volumes=volumes, + volume_mounts=volume_mounts, + resources=None + ) + + +''' +This class is a necessary building block to the kubernetes executor, which will be PR'd +shortly +''' + + +class WorkerGenerator(PodGenerator): + def __init__(self, kube_config): + PodGenerator.__init__(self, kube_config) + self.volumes, self.volume_mounts = self._init_volumes_and_mounts() + self.init_containers = self._init_init_containers() + + def _init_volumes_and_mounts(self): + dags_volume_name = "airflow-dags" + dags_path = os.path.join(self.kube_config.dags_folder, + self.kube_config.git_subpath) + volumes = [{ + 'name': dags_volume_name + }] + volume_mounts = [{ + 'name': dags_volume_name, + 'mountPath': dags_path, + 'readOnly': True + }] + + # Mount the airflow.cfg file via a configmap the user has specified + if self.kube_config.airflow_configmap: + config_volume_name = "airflow-config" + config_path = '{}/airflow.cfg'.format(self.kube_config.airflow_home) + volumes.append({ + 'name': config_volume_name, + 'configMap': { + 'name': self.kube_config.airflow_configmap + } + }) + volume_mounts.append({ + 'name': config_volume_name, + 'mountPath': config_path, + 'subPath': 'airflow.cfg', + 'readOnly': True + }) + + # A PV with the DAGs should be mounted + if self.kube_config.dags_volume_claim: + volumes[0]['persistentVolumeClaim'] = { + "claimName": self.kube_config.dags_volume_claim} + if self.kube_config.dags_volume_subpath: + volume_mounts[0]["subPath"] = self.kube_config.dags_volume_subpath + else: + # Create a Shared Volume for the Git-Sync module to populate + volumes[0]["emptyDir"] = {} + return volumes, volume_mounts + + def _init_labels(self, dag_id, task_id, execution_date): + return { + "airflow-slave": "", + "dag_id": dag_id, + "task_id": task_id, + "execution_date": execution_date + }, + + def _get_environment(self): + env = super(self, WorkerGenerator).env_vars + 
"""Defines any necessary environment variables for the pod executor""" + env['AIRFLOW__CORE__DAGS_FOLDER'] = '/tmp/dags' + env['AIRFLOW__CORE__EXECUTOR'] = 'LocalExecutor' + + if self.kube_config.airflow_configmap: + env['AIRFLOW__CORE__AIRFLOW_HOME'] = self.kube_config.airflow_home + return env + + def _init_init_containers(self, volume_mounts): + """When using git to retrieve the DAGs, use the GitSync Init Container""" + # If we're using volume claims to mount the dags, no init container is needed + if self.kube_config.dags_volume_claim: + return [] + + # Otherwise, define a git-sync init container + init_environment = [{ + 'name': 'GIT_SYNC_REPO', + 'value': self.kube_config.git_repo + }, { + 'name': 'GIT_SYNC_BRANCH', + 'value': self.kube_config.git_branch + }, { + 'name': 'GIT_SYNC_ROOT', + 'value': '/tmp' + }, { + 'name': 'GIT_SYNC_DEST', + 'value': 'dags' + }, { + 'name': 'GIT_SYNC_ONE_TIME', + 'value': 'true' + }] + if self.kube_config.git_user: + init_environment.append({ + 'name': 'GIT_SYNC_USERNAME', + 'value': self.kube_config.git_user + }) + if self.kube_config.git_password: + init_environment.append({ + 'name': 'GIT_SYNC_PASSWORD', + 'value': self.kube_config.git_password + }) + + volume_mounts[0]['readOnly'] = False + return [{ + 'name': self.kube_config.git_sync_init_container_name, + 'image': self.kube_config.git_sync_container, + 'securityContext': {'runAsUser': 0}, + 'env': init_environment, + 'volumeMounts': volume_mounts + }] + + def make_worker_pod(self, + namespace, + pod_id, + dag_id, + task_id, + execution_date, + airflow_command, + kube_executor_config): + cmds = ["bash", "-cx", "--"] + labels = self._init_labels(dag_id, task_id, execution_date) + PodGenerator.make_pod(self, + namespace=namespace, + pod_id=pod_id, + cmds=cmds, + arguments=airflow_command, + labels=labels, + kube_executor_config=kube_executor_config) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/78ff2fc1/airflow/contrib/kubernetes/pod_launcher.py 
---------------------------------------------------------------------- diff --git a/airflow/contrib/kubernetes/pod_launcher.py b/airflow/contrib/kubernetes/pod_launcher.py new file mode 100644 index 0000000..c910929 --- /dev/null +++ b/airflow/contrib/kubernetes/pod_launcher.py @@ -0,0 +1,119 @@ + +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +import time +from airflow.utils.log.logging_mixin import LoggingMixin +from airflow.utils.state import State +from datetime import datetime as dt +from airflow.contrib.kubernetes.kubernetes_request_factory import \ + pod_request_factory as pod_fac +from kubernetes import watch +from kubernetes.client.rest import ApiException +from airflow import AirflowException + +from .kube_client import get_kube_client + + +class PodStatus(object): + PENDING = 'pending' + RUNNING = 'running' + FAILED = 'failed' + SUCCEEDED = 'succeeded' + + +class PodLauncher(LoggingMixin): + def __init__(self, kube_client=None): + super(PodLauncher, self).__init__() + self._client = kube_client or get_kube_client() + self._watch = watch.Watch() + self.kube_req_factory = pod_fac.SimplePodRequestFactory() + + def run_pod_async(self, pod): + req = self.kube_req_factory.create(pod) + self.log.debug('Pod Creation Request: \n{}'.format(json.dumps(req, indent=2))) + try: + resp = self._client.create_namespaced_pod(body=req, namespace=pod.namespace) + self.log.debug('Pod Creation Response: {}'.format(resp)) + except 
ApiException: + self.log.exception('Exception when attempting to create Namespaced Pod.') + raise + return resp + + def run_pod(self, pod, startup_timeout=120): + # type: (Pod) -> State + """ + Launches the pod synchronously and waits for completion. + + Args: + pod (Pod): + startup_timeout (int): Timeout for startup of the pod (if pod is pending for + too long, considers task a failure + """ + resp = self.run_pod_async(pod) + curr_time = dt.now() + if resp.status.start_time is None: + while self.pod_not_started(pod): + delta = dt.now() - curr_time + if delta.seconds >= startup_timeout: + raise AirflowException("Pod took too long to start") + time.sleep(1) + self.log.debug('Pod not yet started') + + final_status = self._monitor_pod(pod) + return final_status + + def _monitor_pod(self, pod): + # type: (Pod) -> State + + while self.pod_is_running(pod): + self.log.info("Pod {} has state {}".format(pod.name, State.RUNNING)) + time.sleep(2) + return self._task_status(self.read_pod(pod)) + + def _task_status(self, event): + # type: (V1Pod) -> State + self.log.info( + "Event: {} had an event of type {}".format(event.metadata.name, + event.status.phase)) + status = self.process_status(event.metadata.name, event.status.phase) + return status + + def pod_not_started(self, pod): + state = self._task_status(self.read_pod(pod)) + return state == State.QUEUED + + def pod_is_running(self, pod): + state = self._task_status(self.read_pod(pod)) + return state != State.SUCCESS and state != State.FAILED + + def read_pod(self, pod): + return self._client.read_namespaced_pod(pod.name, pod.namespace) + + def process_status(self, job_id, status): + status = status.lower() + if status == PodStatus.PENDING: + return State.QUEUED + elif status == PodStatus.FAILED: + self.log.info("Event: {} Failed".format(job_id)) + return State.FAILED + elif status == PodStatus.SUCCEEDED: + self.log.info("Event: {} Succeeded".format(job_id)) + return State.SUCCESS + elif status == PodStatus.RUNNING: + 
return State.RUNNING + else: + self.log.info("Event: Invalid state {} on job {}".format(status, job_id)) + return State.FAILED http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/78ff2fc1/airflow/contrib/kubernetes/secret.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/kubernetes/secret.py b/airflow/contrib/kubernetes/secret.py new file mode 100644 index 0000000..15f070e --- /dev/null +++ b/airflow/contrib/kubernetes/secret.py @@ -0,0 +1,36 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +class Secret: + """Defines Kubernetes Secret Volume""" + + def __init__(self, deploy_type, deploy_target, secret, key): + """Initialize a Kubernetes Secret Object. Used to track requested secrets from + the user. + + :param deploy_type: The type of secret deploy in Kubernetes, either `env` or + `volume` + :type deploy_type: ``str`` + :param deploy_target: The environment variable to be created in the worker. 
+ :type deploy_target: ``str`` + :param secret: Name of the secrets object in Kubernetes + :type secret: ``str`` + :param key: Key of the secret within the Kubernetes Secret + :type key: ``str`` + """ + self.deploy_type = deploy_type + self.deploy_target = deploy_target.upper() + self.secret = secret + self.key = key http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/78ff2fc1/airflow/contrib/operators/kubernetes_pod_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/kubernetes_pod_operator.py b/airflow/contrib/operators/kubernetes_pod_operator.py new file mode 100644 index 0000000..f09a25c --- /dev/null +++ b/airflow/contrib/operators/kubernetes_pod_operator.py @@ -0,0 +1,71 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from airflow.exceptions import AirflowException +from airflow.models import BaseOperator +from airflow.utils.decorators import apply_defaults +from airflow.contrib.kubernetes import kube_client, pod_generator, pod_launcher +from airflow.utils.state import State + +template_fields = ('templates_dict',) +template_ext = tuple() +ui_color = '#ffefeb' + + +class KubernetesPodOperator(BaseOperator): + def execute(self, context): + try: + + client = kube_client.get_kube_client(in_cluster=self.in_cluster) + gen = pod_generator.PodGenerator() + + pod = gen.make_pod(namespace=self.namespace, + image=self.image, + pod_id=self.name, + cmds=self.cmds, + arguments=self.arguments, + labels=self.labels, + kube_executor_config=self.kube_executor_config + ) + + launcher = pod_launcher.PodLauncher(client) + final_state = launcher.run_pod(pod, self.startup_timeout_seconds) + if final_state != State.SUCCESS: + raise AirflowException("Pod returned a failure") + except AirflowException as ex: + raise AirflowException("Pod Launching failed: {error}".format(error=ex)) + + @apply_defaults + def __init__(self, + namespace, + image, + cmds, + arguments, + name, + in_cluster=False, + labels=None, + startup_timeout_seconds=120, + kube_executor_config=None, + *args, + **kwargs): + super(KubernetesPodOperator, self).__init__(*args, **kwargs) + self.kube_executor_config = kube_executor_config or {} + self.image = image + self.namespace = namespace + self.cmds = cmds + self.arguments = arguments + self.labels = labels or {} + self.startup_timeout_seconds = startup_timeout_seconds + self.name = name + self.in_cluster = in_cluster http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/78ff2fc1/scripts/ci/kubernetes/minikube/start_minikube.sh ---------------------------------------------------------------------- diff --git a/scripts/ci/kubernetes/minikube/start_minikube.sh b/scripts/ci/kubernetes/minikube/start_minikube.sh new file mode 100755 index 0000000..f78cb3a --- /dev/null +++ 
# (commit-email patch header, continued) b/scripts/ci/kubernetes/minikube/start_minikube.sh @@ -0,0 +1,53 @@
#!/usr/bin/env bash
#  Licensed to the Apache Software Foundation (ASF) under one   *
#  or more contributor license agreements.  See the NOTICE file *
#  distributed with this work for additional information        *
#  regarding copyright ownership.  The ASF licenses this file   *
#  to you under the Apache License, Version 2.0 (the            *
#  "License"); you may not use this file except in compliance   *
#  with the License.  You may obtain a copy of the License at   *
#                                                               *
#    http://www.apache.org/licenses/LICENSE-2.0                 *
#                                                               *
#  Unless required by applicable law or agreed to in writing,   *
#  software distributed under the License is distributed on an  *
#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
#  KIND, either express or implied.  See the License for the    *
#  specific language governing permissions and limitations      *
#  under the License.                                           *

# Downloads minikube and kubectl, starts a --vm-driver=none minikube cluster
# and waits until kubectl can reach its API server.
# The shebang is the first line of the script: this file uses bash-only
# syntax (`&>`, `{1..150}`), so the interpreter must not be left to chance.

# Guard against a kubernetes cluster already being up
if kubectl get pods &> /dev/null; then
  echo "kubectl get pods returned 0 exit code, exiting early"
  exit 0
fi

curl -Lo minikube https://storage.googleapis.com/minikube/releases/latest/minikube-linux-amd64 && chmod +x minikube
curl -Lo kubectl https://storage.googleapis.com/kubernetes-release/release/v1.7.0/bin/linux/amd64/kubectl && chmod +x kubectl

sudo mkdir -p /usr/local/bin
sudo mv minikube /usr/local/bin/minikube
sudo mv kubectl /usr/local/bin/kubectl

export MINIKUBE_WANTUPDATENOTIFICATION=false
export MINIKUBE_WANTREPORTERRORPROMPT=false
export MINIKUBE_HOME=$HOME
export CHANGE_MINIKUBE_NONE_USER=true
# -p tolerates an existing directory without hiding every other mkdir error.
mkdir -p "$HOME/.kube"
touch "$HOME/.kube/config"

export KUBECONFIG=$HOME/.kube/config
sudo -E minikube start --vm-driver=none

# Wait until kubectl can access the api server that minikube has created.
# 150 attempts with a 2 second pause between them: roughly 5 minutes.
cluster_ready=false
for i in {1..150}
do
  echo "------- Running kubectl get pods -------"
  # Only a successful call means the API server is reachable; any non-zero
  # status (not just 1) keeps us waiting.
  if kubectl get po &> /dev/null; then
    cluster_ready=true
    break
  fi
  sleep 2
done

if [ "$cluster_ready" != true ]; then
  # Surface the timeout instead of reporting a cluster that never came up.
  echo "Timed out waiting for kubectl to reach the minikube API server" >&2
  exit 1
fi

# --- commit-email patch header for the next file (kept verbatim) ---
# http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/78ff2fc1/scripts/ci/kubernetes/setup_kubernetes.sh
# ----------------------------------------------------------------------
# diff --git a/scripts/ci/kubernetes/setup_kubernetes.sh b/scripts/ci/kubernetes/setup_kubernetes.sh
# new file mode 100755
# index 0000000..fa4e523
# --- /dev/null
# +++ b/scripts/ci/kubernetes/setup_kubernetes.sh
# @@ -0,0 +1,28 @@
#!/usr/bin/env bash
#  Licensed to the Apache Software Foundation (ASF) under one   *
#  or more contributor license agreements.  See the NOTICE file *
#  distributed with this work for additional information        *
#  regarding copyright ownership.  The ASF licenses this file   *
#  to you under the Apache License, Version 2.0 (the            *
#  "License"); you may not use this file except in compliance   *
#  with the License.  You may obtain a copy of the License at   *
#                                                               *
#    http://www.apache.org/licenses/LICENSE-2.0                 *
#                                                               *
#  Unless required by applicable law or agreed to in writing,   *
#  software distributed under the License is distributed on an  *
#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
#  KIND, either express or implied.  See the License for the    *
#  specific language governing permissions and limitations      *
#  under the License.                                           *

# Entry point for CI: runs minikube/start_minikube.sh from this script's own
# directory. `set -e` makes a failed cluster start abort the setup.
set -o xtrace
set -e

echo "This script downloads minikube, starts a driver=None minikube cluster, builds the airflow source and docker image, and then deploys airflow onto kubernetes"
echo "For development, start minikube yourself (ie: minikube start) then run this script as you probably do not want a driver=None minikube cluster"

# Resolve the directory of this script so it can be invoked from anywhere.
DIRNAME=$(cd "$(dirname "$0")" && pwd)

"$DIRNAME/minikube/start_minikube.sh"

echo "Airflow environment on kubernetes is good to go!"
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/78ff2fc1/scripts/ci/requirements.txt ---------------------------------------------------------------------- diff --git a/scripts/ci/requirements.txt b/scripts/ci/requirements.txt index 2b5a8c9..b6ed49c 100644 --- a/scripts/ci/requirements.txt +++ b/scripts/ci/requirements.txt @@ -93,3 +93,4 @@ thrift thrift_sasl unicodecsv zdesk +kubernetes http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/78ff2fc1/scripts/ci/travis_script.sh ---------------------------------------------------------------------- diff --git a/scripts/ci/travis_script.sh b/scripts/ci/travis_script.sh new file mode 100755 index 0000000..a51e742 --- /dev/null +++ b/scripts/ci/travis_script.sh @@ -0,0 +1,39 @@ +# Licensed to the Apache Software Foundation (ASF) under one * +# or more contributor license agreements. See the NOTICE file * +# distributed with this work for additional information * +# regarding copyright ownership. The ASF licenses this file * +# to you under the Apache License, Version 2.0 (the * +# "License"); you may not use this file except in compliance * +# with the License. You may obtain a copy of the License at * +# * +# http://www.apache.org/licenses/LICENSE-2.0 * +# * +# Unless required by applicable law or agreed to in writing, * +# software distributed under the License is distributed on an * +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * +# KIND, either express or implied. See the License for the * +# specific language governing permissions and limitations * +# under the License. * + +DIRNAME=$(cd "$(dirname "$0")"; pwd) +AIRFLOW_ROOT="$DIRNAME/../.." 
+cd $AIRFLOW_ROOT && pip --version && ls -l $HOME/.wheelhouse && tox --version + +if [ -z "$RUN_KUBE_INTEGRATION" ]; +then + $DIRNAME/kubernetes/setup_kubernetes.sh + tox -e $TOX_ENV +else + $DIRNAME/kubernetes/setup_kubernetes.sh && \ + tox -e $TOX_ENV -- tests.contrib.executors.integration \ + --with-coverage \ + --cover-erase \ + --cover-html \ + --cover-package=airflow \ + --cover-html-dir=airflow/www/static/coverage \ + --with-ignore-docstrings \ + --rednose \ + --with-timer \ + -v \ + --logging-level=DEBUG +fi http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/78ff2fc1/setup.py ---------------------------------------------------------------------- diff --git a/setup.py b/setup.py index 77e79bf..ba5b031 100644 --- a/setup.py +++ b/setup.py @@ -162,6 +162,8 @@ github_enterprise = ['Flask-OAuthlib>=0.9.1'] qds = ['qds-sdk>=1.9.6'] cloudant = ['cloudant>=0.5.9,<2.0'] # major update coming soon, clamp to 0.x redis = ['redis>=2.10.5'] +kubernetes = ['kubernetes>=3.0.0', + 'cryptography>=2.0.0'] all_dbs = postgres + mysql + hive + mssql + hdfs + vertica + cloudant devel = [ @@ -182,7 +184,8 @@ devel = [ ] devel_minreq = devel + mysql + doc + password + s3 + cgroups devel_hadoop = devel_minreq + hive + hdfs + webhdfs + kerberos -devel_all = devel + all_dbs + doc + samba + s3 + slack + crypto + oracle + docker + ssh +devel_all = (devel + all_dbs + doc + samba + s3 + slack + crypto + oracle + docker + ssh + + kubernetes) def do_setup(): @@ -278,6 +281,7 @@ def do_setup(): 'webhdfs': webhdfs, 'jira': jira, 'redis': redis, + 'kubernetes': kubernetes }, classifiers=[ 'Development Status :: 5 - Production/Stable', http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/78ff2fc1/tests/contrib/operators/test_kubernetes_pod_operator.py ---------------------------------------------------------------------- diff --git a/tests/contrib/operators/test_kubernetes_pod_operator.py b/tests/contrib/operators/test_kubernetes_pod_operator.py new file mode 100644 index 
# (commit-email patch header, continued) 0000000..205f183
# --- /dev/null
# +++ b/tests/contrib/operators/test_kubernetes_pod_operator.py
# @@ -0,0 +1,69 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

import unittest
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow import AirflowException


class KubernetesPodOperatorTest(unittest.TestCase):
    """
    End-to-end checks for ``KubernetesPodOperator.execute``.

    NOTE(review): every test calls ``execute`` for real, so these presumably
    need a reachable Kubernetes cluster (CI starts minikube) -- confirm.
    """

    def test_working_pod(self):
        """
        A pod running a valid command on a valid image completes without
        ``execute`` raising.
        """
        k = KubernetesPodOperator(namespace='default',
                                  image="ubuntu:16.04",
                                  cmds=["bash", "-cx"],
                                  arguments=["echo", "10"],
                                  labels={"foo": "bar"},
                                  name="test",
                                  task_id="task"
                                  )

        k.execute(None)

    def test_faulty_image(self):
        """
        An image named ``foobar`` with a 5 second startup timeout makes
        ``execute`` raise ``AirflowException``.
        """
        bad_image_name = "foobar"
        k = KubernetesPodOperator(namespace='default',
                                  image=bad_image_name,
                                  cmds=["bash", "-cx"],
                                  arguments=["echo", "10"],
                                  labels={"foo": "bar"},
                                  name="test",
                                  task_id="task",
                                  startup_timeout_seconds=5
                                  )
        with self.assertRaises(AirflowException) as cm:
            k.execute(None)

        # Report the exception that was caught, not the context manager.
        print("exception: {}".format(cm.exception))

    def test_pod_failure(self):
        """
        Tests that the task fails when a pod reports a failure
        """

        bad_internal_command = "foobar"
        k = KubernetesPodOperator(namespace='default',
                                  image="ubuntu:16.04",
                                  cmds=["bash", "-cx"],
                                  arguments=[bad_internal_command, "10"],
                                  labels={"foo": "bar"},
                                  name="test",
                                  task_id="task"
                                  )

        with self.assertRaises(AirflowException):
            k.execute(None)
