[AIRFLOW-1314] Use VolumeClaim for transporting DAGs - fix issue where watcher process randomly dies - fixed alembic head, was pointing to two tips
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/a9d90dc9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/a9d90dc9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/a9d90dc9 Branch: refs/heads/master Commit: a9d90dc9a5bb251e1490390b6dfba309ecef48a8 Parents: 29daa58 Author: grantnicholas <[email protected]> Authored: Thu Sep 7 12:04:36 2017 -0500 Committer: Fokko Driesprong <[email protected]> Committed: Sun Apr 22 10:22:44 2018 +0200 ---------------------------------------------------------------------- .gitignore | 4 +- .travis.yml | 38 +- .../contrib/executors/kubernetes_executor.py | 369 ++++++++++++++----- airflow/contrib/kubernetes/kube_client.py | 38 +- .../kubernetes_request_factory/__init__.py | 3 - .../pod_request_factory.py | 3 +- airflow/contrib/kubernetes/pod_launcher.py | 15 +- airflow/executors/base_executor.py | 5 +- ...ff4_add_kubernetes_resource_checkpointing.py | 50 +++ airflow/models.py | 45 ++- airflow/plugins_manager.py | 1 - docker/Dockerfile | 40 -- docker/bootstrap.sh | 13 - docker/build.sh | 12 - kube/airflow.yaml.template | 103 ------ kube/deploy.sh | 6 - kube/postgres.yaml | 94 ----- scripts/ci/kubernetes/docker/Dockerfile | 53 +++ scripts/ci/kubernetes/docker/bootstrap.sh | 29 ++ scripts/ci/kubernetes/docker/build.sh | 29 ++ .../ci/kubernetes/kube/airflow.yaml.template | 165 +++++++++ scripts/ci/kubernetes/kube/deploy.sh | 42 +++ scripts/ci/kubernetes/kube/postgres.yaml | 111 ++++++ .../ci/kubernetes/minikube/start_minikube.sh | 88 ++--- scripts/ci/kubernetes/setup_kubernetes.sh | 2 + scripts/ci/run_tests.sh | 10 +- scripts/ci/travis_script.sh | 6 +- tests/contrib/__init__.py | 1 - tests/contrib/executors/__init__.py | 13 + tests/contrib/executors/integration/__init__.py | 13 + .../executors/integration/airflow_controller.py | 114 ++++++ .../test_kubernetes_executor_integration.py | 57 +++ 
.../executors/test_kubernetes_executor.py | 71 ++++ tst.txt | 0 34 files changed, 1134 insertions(+), 509 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a9d90dc9/.gitignore ---------------------------------------------------------------------- diff --git a/.gitignore b/.gitignore index 46df849..638ab19 100644 --- a/.gitignore +++ b/.gitignore @@ -134,5 +134,5 @@ rat-results.txt # Git stuff .gitattributes # Kubernetes generated templated files -kube/.generated/ -airflow.tar.gz +*.generated +*.tar.gz http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a9d90dc9/.travis.yml ---------------------------------------------------------------------- diff --git a/.travis.yml b/.travis.yml index 883473d..ec2d44d 100644 --- a/.travis.yml +++ b/.travis.yml @@ -51,15 +51,20 @@ env: # does not work with python 3 - BOTO_CONFIG=/tmp/bogusvalue matrix: - - TOX_ENV=py27-backend_mysql - - TOX_ENV=py27-backend_sqlite - - TOX_ENV=py27-backend_postgres - - TOX_ENV=py35-backend_mysql - - TOX_ENV=py35-backend_sqlite - - TOX_ENV=py35-backend_postgres - - TOX_ENV=flake8 - - TOX_ENV=py27-backend_postgres KUBERNETES_VERSION=v1.7.0 - - TOX_ENV=py27-backend_postgres KUBERNETES_VERSION=v1.8.0 + - TOX_ENV=py27-cdh-airflow_backend_mysql + - TOX_ENV=py27-cdh-airflow_backend_sqlite + - TOX_ENV=py27-cdh-airflow_backend_postgres +# - TOX_ENV=py27-hdp-airflow_backend_mysql +# - TOX_ENV=py27-hdp-airflow_backend_sqlite +# - TOX_ENV=py27-hdp-airflow_backend_postgres + - TOX_ENV=py34-cdh-airflow_backend_mysql + - TOX_ENV=py34-cdh-airflow_backend_sqlite + - TOX_ENV=py34-cdh-airflow_backend_postgres +# - TOX_ENV=py34-hdp-airflow_backend_mysql +# - TOX_ENV=py34-hdp-airflow_backend_sqlite +# - TOX_ENV=py34-hdp-airflow_backend_postgres + # Run integration tests on minikube for the KubernetesExecutor + - TOX_ENV=py27-cdh-airflow_backend_postgres RUN_KUBE_INTEGRATION=true matrix: exclude: - python: "3.5" @@ 
-75,14 +80,15 @@ matrix: - python: "2.7" env: TOX_ENV=py35-backend_postgres - python: "2.7" - env: TOX_ENV=flake8 - - python: "3.5" - env: TOX_ENV=py27-backend_postgres KUBERNETES_VERSION=v1.7.0 - - python: "3.5" - env: TOX_ENV=py27-backend_postgres KUBERNETES_VERSION=v1.8.0 + env: TOX_ENV=py34-hdp-airflow_backend_mysql + - python: "2.7" + env: TOX_ENV=py34-hdp-airflow_backend_sqlite + - python: "2.7" + env: TOX_ENV=py34-hdp-airflow_backend_postgres + - python: "3.4" + env: TOX_ENV=py27-cdh-airflow_backend_postgres RUN_KUBE_INTEGRATION=true allow_failures: - - env: TOX_ENV=py27-backend_postgres KUBERNETES_VERSION=v1.7.0 - - env: TOX_ENV=py27-backend_postgres KUBERNETES_VERSION=v1.8.0 + - env: TOX_ENV=py27-cdh-airflow_backend_postgres RUN_KUBE_INTEGRATION=true cache: directories: - $HOME/.wheelhouse/ http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a9d90dc9/airflow/contrib/executors/kubernetes_executor.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/executors/kubernetes_executor.py b/airflow/contrib/executors/kubernetes_executor.py index 63aa696..8989add 100644 --- a/airflow/contrib/executors/kubernetes_executor.py +++ b/airflow/contrib/executors/kubernetes_executor.py @@ -14,64 +14,188 @@ import calendar import logging -import time import os import multiprocessing from queue import Queue -from datetime import datetime -from kubernetes import watch -from airflow import settings +from dateutil import parser +from uuid import uuid4 +from kubernetes import watch, client +from kubernetes.client.rest import ApiException from airflow.contrib.kubernetes.pod_launcher import PodLauncher +from airflow.contrib.kubernetes.kube_client import get_kube_client from airflow.executors.base_executor import BaseExecutor -from airflow.models import TaskInstance -from airflow.contrib.kubernetes.pod import Pod +from airflow.models import TaskInstance, KubeResourceVersion from airflow.utils.state import State -from airflow 
import configuration -from kubernetes import client +from airflow import configuration, settings +from airflow.exceptions import AirflowConfigException +from airflow.contrib.kubernetes.pod import Pod + class KubeConfig: + core_section = "core" + kubernetes_section = "kubernetes" + + @staticmethod + def safe_get(section, option, default): + try: + return configuration.get(section, option) + except AirflowConfigException: + return default + + @staticmethod + def safe_getboolean(section, option, default): + try: + return configuration.getboolean(section, option) + except AirflowConfigException: + return default + def __init__(self): - self.kube_image = configuration.get('core', 'k8s_image') - self.git_repo = configuration.get('core', 'k8s_git_repo') - self.git_branch = configuration.get('core', 'k8s_git_branch') + self.dags_folder = configuration.get(self.core_section, 'dags_folder') + self.parallelism = configuration.getint(self.core_section, 'PARALLELISM') + self.kube_image = configuration.get(self.kubernetes_section, 'container_image') + self.delete_worker_pods = self.safe_getboolean(self.kubernetes_section, 'delete_worker_pods', True) + self.kube_namespace = os.environ.get('AIRFLOW_KUBE_NAMESPACE', 'default') + + # These two props must be set together + self.git_repo = self.safe_get(self.kubernetes_section, 'git_repo', None) + self.git_branch = self.safe_get(self.kubernetes_section, 'git_branch', None) + + # Or this one prop + self.dags_volume_claim = self.safe_get(self.kubernetes_section, 'dags_volume_claim', None) + # And optionally this prop + self.dags_volume_subpath = self.safe_get(self.kubernetes_section, 'dags_volume_subpath', None) + + self._validate() + + def _validate(self): + if self.dags_volume_claim: + # do volume things + pass + elif self.git_repo and self.git_branch: + # do git things + pass + else: + raise AirflowConfigException( + "In kubernetes mode you must set the following configs in the `kubernetes` section: " + "`dags_volume_claim` or " + 
"`git_repo and git_branch`" + ) + + +class PodMaker: + def __init__(self, kube_config): + self.logger = logging.getLogger(__name__) + self.kube_config = kube_config + + def _get_volumes_and_mounts(self): + volume_name = "airflow-dags" + + if self.kube_config.dags_volume_claim: + volumes = [{ + "name": volume_name, "persistentVolumeClaim": {"claimName": self.kube_config.dags_volume_claim} + }] + volume_mounts = [{ + "name": volume_name, "mountPath": self.kube_config.dags_folder, + "readOnly": True + }] + if self.kube_config.dags_volume_subpath: + volume_mounts[0]["subPath"] = self.kube_config.dags_volume_subpath + + return volumes, volume_mounts + else: + return [], [] + + def _get_args(self, airflow_command): + if self.kube_config.dags_volume_claim: + self.logger.info("Using k8s_dags_volume_claim for airflow dags") + return [airflow_command] + else: + self.logger.info("Using git-syncher for airflow dags") + cmd_args = "mkdir -p {dags_folder} && cd {dags_folder} &&" \ + "git init && git remote add origin {git_repo} && git pull origin {git_branch} --depth=1 &&" \ + "{command}".format(dags_folder=self.kube_config.dags_folder, git_repo=self.kube_config.git_repo, + git_branch=self.kube_config.git_branch, command=airflow_command) + return [cmd_args] + + def make_pod(self, namespace, pod_id, dag_id, task_id, execution_date, airflow_command): + volumes, volume_mounts = self._get_volumes_and_mounts() + + pod = Pod( + namespace=namespace, + name=pod_id, + image=self.kube_config.kube_image, + cmds=["bash", "-cx", "--"], + args=self._get_args(airflow_command), + labels={ + "airflow-slave": "", + "dag_id": dag_id, + "task_id": task_id, + "execution_date": execution_date + }, + envs={"AIRFLOW__CORE__EXECUTOR": "LocalExecutor"}, + volumes=volumes, + volume_mounts=volume_mounts + ) + return pod class KubernetesJobWatcher(multiprocessing.Process, object): - def __init__(self, watch_function, namespace, watcher_queue): + def __init__(self, namespace, watcher_queue, 
resource_version): self.logger = logging.getLogger(__name__) multiprocessing.Process.__init__(self) - self._watch_function = watch_function - self._watch = watch.Watch() self.namespace = namespace self.watcher_queue = watcher_queue + self.resource_version = resource_version def run(self): - self.logger.info("Event: and now my watch begins") - for event in self._watch.stream(self._watch_function, self.namespace, - label_selector='airflow-slave'): + kube_client = get_kube_client() + while True: + try: + self.resource_version = self._run(kube_client, self.resource_version) + except Exception: + self.logger.exception("Unknown error in KubernetesJobWatcher. Failing") + raise + else: + self.logger.warn("Watch died gracefully, starting back up with: " + "last resource_version: {}".format(self.resource_version)) + + def _run(self, kube_client, resource_version): + self.logger.info("Event: and now my watch begins starting at resource_version: {}".format(resource_version)) + watcher = watch.Watch() + + kwargs = {"label_selector": "airflow-slave"} + if resource_version: + kwargs["resource_version"] = resource_version + + last_resource_version = None + for event in watcher.stream(kube_client.list_namespaced_pod, self.namespace, **kwargs): task = event['object'] - self.logger.info("Event: {} had an event of type {}".format(task.metadata.name, - event['type'])) - self.process_status(task.metadata.name, task.status.phase, task.metadata.labels) + self.logger.info("Event: {} had an event of type {}".format(task.metadata.name, event['type'])) + self.process_status( + task.metadata.name, task.status.phase, task.metadata.labels, task.metadata.resource_version + ) + last_resource_version = task.metadata.resource_version - def process_status(self, job_id, status, labels): + return last_resource_version + + def process_status(self, pod_id, status, labels, resource_version): if status == 'Pending': - self.logger.info("Event: {} Pending".format(job_id)) + self.logger.info("Event: {} 
Pending".format(pod_id)) elif status == 'Failed': - self.logger.info("Event: {} Failed".format(job_id)) - self.watcher_queue.put((job_id, State.FAILED, labels)) + self.logger.info("Event: {} Failed".format(pod_id)) + self.watcher_queue.put((pod_id, State.FAILED, labels, resource_version)) elif status == 'Succeeded': - self.logger.info("Event: {} Succeeded".format(job_id)) - self.watcher_queue.put((job_id, None, labels)) + self.logger.info("Event: {} Succeeded".format(pod_id)) + self.watcher_queue.put((pod_id, None, labels, resource_version)) elif status == 'Running': - # self.logger.info("Event: {} is Running".format(job_id)) - self.watcher_queue.put((job_id, State.RUNNING)) + self.logger.info("Event: {} is Running".format(pod_id)) else: - self.logger.info("Event: Invalid state: {} on job: {} with labels: {}".format(status, job_id, labels)) + self.logger.warn("Event: Invalid state: {} on pod: {} with labels: {} " + "with resource_version: {}".format(status, pod_id, labels, resource_version)) class AirflowKubernetesScheduler(object): - def __init__(self, task_queue, result_queue): + def __init__(self, kube_config, task_queue, result_queue, session, kube_client): self.logger = logging.getLogger(__name__) self.logger.info("creating kubernetes executor") self.kube_config = KubeConfig() @@ -79,11 +203,25 @@ class AirflowKubernetesScheduler(object): self.pending_jobs = set() self.namespace = os.environ['k8s_POD_NAMESPACE'] self.logger.info("k8s: using namespace {}".format(self.namespace)) - self.result_queue = result_queue + self.kube_client = kube_client + self.launcher = PodLauncher(kube_client=self.kube_client) + self.pod_maker = PodMaker(kube_config=self.kube_config) self.watcher_queue = multiprocessing.Queue() - self.helper = KubernetesHelper() - w = KubernetesJobWatcher(self.helper.pod_api.list_namespaced_pod, self.namespace, self.watcher_queue) - w.start() + self._session = session + self.kube_watcher = self._make_kube_watcher() + + def _make_kube_watcher(self): + 
resource_version = KubeResourceVersion.get_current_resource_version(self._session) + watcher = KubernetesJobWatcher(self.namespace, self.watcher_queue, resource_version) + watcher.start() + return watcher + + def _health_check_kube_watcher(self): + if self.kube_watcher.is_alive(): + pass + else: + self.logger.error("Error while health checking kube watcher process. Process died for unknown reasons") + self.kube_watcher = self._make_kube_watcher() def run_next(self, next_job): """ @@ -99,36 +237,24 @@ class AirflowKubernetesScheduler(object): self.logger.info('k8s: job is {}'.format(str(next_job))) key, command = next_job dag_id, task_id, execution_date = key - self.logger.info("running for command {}".format(command)) - cmd_args = "mkdir -p $AIRFLOW_HOME/dags/synched/git && cd $AIRFLOW_HOME/dags/synched/git &&" \ - "git init && git remote add origin {git_repo} && git pull origin {git_branch} --depth=1 &&" \ - "{command}".format(git_repo=self.kube_config.git_repo, git_branch=self.kube_config.git_branch, - command=command) - pod_id = self._create_job_id_from_key(key=key) - pod = KubernetesPodBuilder( - image=self.kube_config.kube_image, - cmds=["bash", "-cx", "--"], - args=[cmd_args], - kub_req_factory=SimplePodRequestFactory(), - namespace=self.namespace + self.logger.info("k8s: running for command {}".format(command)) + self.logger.info("k8s: launching image {}".format(self.kube_config.kube_image)) + pod = self.pod_maker.make_pod( + namespace=self.namespace, pod_id=self._create_pod_id(dag_id, task_id), + dag_id=dag_id, task_id=task_id, execution_date=self._datetime_to_label_safe_datestring(execution_date), + airflow_command=command ) - pod.set_image_pull_policy("IfNotPresent") - pod.add_env_variables({"AIRFLOW__CORE__EXECUTOR": "LocalExecutor"}) - pod.add_name(pod_id) - pod.add_labels({ - "dag_id": dag_id, - "task_id": task_id, - "execution_date": self._datetime_to_label_safe_datestring(execution_date) - }) - pod.launch() - # the watcher will monitor pods, so we do 
not block. self.launcher.run_pod_async(pod) self.logger.info("k8s: Job created!") - def delete_job(self, key): - job_id = self._create_job_id_from_key(key) - self.helper.delete_pod(job_id, namespace=self.namespace) + def delete_pod(self, pod_id): + if self.kube_config.delete_worker_pods: + try: + self.kube_client.delete_namespaced_pod(pod_id, self.namespace, body=client.V1DeleteOptions()) + except ApiException as e: + if e.status != 404: + raise def sync(self): """ @@ -140,24 +266,53 @@ class AirflowKubernetesScheduler(object): :return: """ + self._health_check_kube_watcher() while not self.watcher_queue.empty(): self.process_watcher_task() - def end_task(self): - job_id, state, labels = self.watcher_queue.get() - logging.info("Attempting to finish job; job_id: {}; state: {}; labels: {}".format(job_id, state, labels)) + def process_watcher_task(self): + pod_id, state, labels, resource_version = self.watcher_queue.get() + logging.info("Attempting to finish pod; pod_id: {}; state: {}; labels: {}".format(pod_id, state, labels)) key = self._labels_to_key(labels) if key: self.logger.info("finishing job {}".format(key)) - self.result_queue.put((key, state)) + self.result_queue.put((key, state, pod_id, resource_version)) + + @staticmethod + def _strip_unsafe_kubernetes_special_chars(string): + """ + Kubernetes only supports lowercase alphanumeric characters and "-" and "." in the pod name + However, there are special rules about how "-" and "." 
can be used so let's only keep alphanumeric chars + see here for detail: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/ + :param string: + :return: + """ + return ''.join(ch.lower() for ind, ch in enumerate(string) if ch.isalnum()) + + @staticmethod + def _make_safe_pod_id(safe_dag_id, safe_task_id, safe_uuid): + """ + Kubernetes pod names must be <= 253 chars and must pass the following regex for validation + "^[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*$" + :param safe_dag_id: a dag_id with only alphanumeric characters + :param safe_task_id: a task_id with only alphanumeric characters + :param random_uuid: a uuid + :return: + """ + MAX_POD_ID_LEN = 253 + + safe_key = safe_dag_id + safe_task_id + + safe_pod_id = safe_key[:MAX_POD_ID_LEN-len(safe_uuid)-1] + "-" + safe_uuid + + return safe_pod_id @staticmethod - def _create_job_id_from_key(key): - keystr = '-'.join([str(x).replace(' ', '-') for x in key[:2]]) - job_fields = [keystr] - unformatted_job_id = '-'.join(job_fields) - job_id = unformatted_job_id.replace('_', '-') - return job_id + def _create_pod_id(dag_id, task_id): + safe_dag_id = AirflowKubernetesScheduler._strip_unsafe_kubernetes_special_chars(dag_id) + safe_task_id = AirflowKubernetesScheduler._strip_unsafe_kubernetes_special_chars(task_id) + safe_uuid = AirflowKubernetesScheduler._strip_unsafe_kubernetes_special_chars(uuid4().hex) + return AirflowKubernetesScheduler._make_safe_pod_id(safe_dag_id, safe_task_id, safe_uuid) @staticmethod def _label_safe_datestring_to_datetime(string): @@ -166,7 +321,7 @@ class AirflowKubernetesScheduler(object): :param string: string :return: datetime.datetime object """ - return datetime.strptime(string.replace("_", ":"), "%Y-%m-%dT%H:%M:%S") + return parser.parse(string.replace("_", ":")) @staticmethod def _datetime_to_label_safe_datestring(datetime_obj): @@ -193,40 +348,74 @@ class KubernetesExecutor(BaseExecutor): self.task_queue = None self._session = None 
self.result_queue = None - self.pending_tasks = None - self.kub_client = None + self.kube_scheduler = None + self.kube_client = None + super(KubernetesExecutor, self).__init__(parallelism=self.kube_config.parallelism) + + def clear_not_launched_queued_tasks(self): + """ + If the airflow scheduler restarts with pending "Queued" tasks, the tasks may or may not have been launched + Thus, on starting up the scheduler let's check every "Queued" task to see if it has been launched + (ie: if there is a corresponding pod on kubernetes) + If it has been launched then do nothing, otherwise reset the state to "None" so the task will be rescheduled + This will not be necessary in a future version of airflow in which there is proper support for State.LAUNCHED + :return: None + """ + queued_tasks = self._session.query(TaskInstance).filter(TaskInstance.state == State.QUEUED).all() + self.logger.info("When executor started up, found {} queued task instances".format(len(queued_tasks))) + + for t in queued_tasks: + kwargs = dict(label_selector="dag_id={},task_id={},execution_date={}".format( + t.dag_id, t.task_id, AirflowKubernetesScheduler._datetime_to_label_safe_datestring(t.execution_date) + )) + pod_list = self.kube_client.list_namespaced_pod(self.kube_config.kube_namespace, **kwargs) + if len(pod_list.items) == 0: + self.logger.info("TaskInstance: {} found in queued state but was not launched, rescheduling".format(t)) + self._session.query(TaskInstance).filter( + TaskInstance.dag_id == t.dag_id, + TaskInstance.task_id == t.task_id, + TaskInstance.execution_date == t.execution_date + ).update({TaskInstance.state: State.NONE}) + + self._session.commit() def start(self): self.logger.info('k8s: starting kubernetes executor') self._session = settings.Session() self.task_queue = Queue() self.result_queue = Queue() - self.kub_client = AirflowKubernetesScheduler(self.task_queue, self.result_queue) + self.kube_client = get_kube_client() + self.kube_scheduler = 
AirflowKubernetesScheduler( + self.kube_config, self.task_queue, self.result_queue, self._session, self.kube_client + ) + self.clear_not_launched_queued_tasks() + + def execute_async(self, key, command, queue=None): + self.logger.info("k8s: adding task {} with command {}".format(key, command)) + self.task_queue.put((key, command)) def sync(self): - self.kub_client.sync() + self.logger.info("self.running: {}".format(self.running)) + self.logger.info("self.queued: {}".format(self.queued_tasks)) + self.kube_scheduler.sync() + + last_resource_version = None while not self.result_queue.empty(): results = self.result_queue.get() - self.logger.info("reporting {}".format(results)) - self.change_state(*results) + key, state, pod_id, resource_version = results + last_resource_version = resource_version + self.logger.info("Changing state of {}".format(results)) + self._change_state(key, state, pod_id) - if not self.task_queue.empty(): - (key, command) = self.task_queue.get() - self.kub_client.run_next((key, command)) + KubeResourceVersion.checkpoint_resource_version(last_resource_version, session=self._session) - def job_queue_full(self): - return len(self.kub_client.current_jobs) > PARALLELISM - - def cluster_at_capacity(self): - return len(self.pending_tasks) > 5 - - def terminate(self): - pass + if not self.task_queue.empty(): + key, command = self.task_queue.get() + self.kube_scheduler.run_next((key, command)) - def change_state(self, key, state): - self.logger.info("k8s: setting state of {} to {}".format(key, state)) + def _change_state(self, key, state, pod_id): if state != State.RUNNING: - self.kub_client.delete_job(key) + self.kube_scheduler.delete_pod(pod_id) try: self.running.pop(key) except KeyError: http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a9d90dc9/airflow/contrib/kubernetes/kube_client.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/kubernetes/kube_client.py 
b/airflow/contrib/kubernetes/kube_client.py index d1a63a2..b01e14d 100644 --- a/airflow/contrib/kubernetes/kube_client.py +++ b/airflow/contrib/kubernetes/kube_client.py @@ -1,31 +1,25 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at +# -*- coding: utf-8 -*- # -# http://www.apache.org/licenses/LICENSE-2.0 +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at # -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. -def _load_kube_config(in_cluster): +def get_kube_client(in_cluster=True): + # TODO: This should also allow people to point to a cluster. + from kubernetes import config, client + if in_cluster: config.load_incluster_config() return client.CoreV1Api() else: - config.load_kube_config() - return client.CoreV1Api() - - -def get_kube_client(in_cluster=True): - # TODO: This should also allow people to point to a cluster. 
- return _load_kube_config(in_cluster) + NotImplementedError("Running kubernetes jobs from not within the cluster is not supported at this time") http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a9d90dc9/airflow/contrib/kubernetes/kubernetes_request_factory/__init__.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/__init__.py b/airflow/contrib/kubernetes/kubernetes_request_factory/__init__.py index d2344a2..9921696 100644 --- a/airflow/contrib/kubernetes/kubernetes_request_factory/__init__.py +++ b/airflow/contrib/kubernetes/kubernetes_request_factory/__init__.py @@ -10,6 +10,3 @@ # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and - -from .kubernetes_request_factory import * -from .pod_request_factory import * http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a9d90dc9/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py b/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py index d013016..89631e0 100644 --- a/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py +++ b/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py @@ -11,10 +11,9 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
# See the License for the specific language governing permissions and -import kubernetes_request_factory as kreq import yaml +import airflow.contrib.kubernetes.kubernetes_request_factory.kubernetes_request_factory as kreq from airflow.contrib.kubernetes.pod import Pod -from airflow import AirflowException class SimplePodRequestFactory(kreq.KubernetesRequestFactory): http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a9d90dc9/airflow/contrib/kubernetes/pod_launcher.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/kubernetes/pod_launcher.py b/airflow/contrib/kubernetes/pod_launcher.py index e92ae5c..e435a12 100644 --- a/airflow/contrib/kubernetes/pod_launcher.py +++ b/airflow/contrib/kubernetes/pod_launcher.py @@ -12,18 +12,20 @@ # See the License for the specific language governing permissions and # limitations under the License. from airflow.contrib.kubernetes.pod import Pod -from airflow.contrib.kubernetes.kubernetes_request_factory import SimplePodRequestFactory -from kubernetes import config, client, watch +from airflow.contrib.kubernetes.kubernetes_request_factory.pod_request_factory import SimplePodRequestFactory +from kubernetes import watch from kubernetes.client import V1Pod from airflow.utils.state import State import json import logging +from .kube_client import get_kube_client + class PodLauncher: - def __init__(self): + def __init__(self, kube_client=None): self.kube_req_factory = SimplePodRequestFactory() - self._client = self._kube_client() + self._client = kube_client or get_kube_client() self._watch = watch.Watch() self.logger = logging.getLogger(__name__) @@ -42,11 +44,6 @@ class PodLauncher: final_status = self._monitor_pod(pod) return final_status - def _kube_client(self): - #TODO: This should also allow people to point to a cluster. 
- config.load_incluster_config() - return client.CoreV1Api() - def _monitor_pod(self, pod): # type: (Pod) -> State for event in self._watch.stream(self.read_pod(pod), pod.namespace): http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a9d90dc9/airflow/executors/base_executor.py ---------------------------------------------------------------------- diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py index d5e958f..4515dac 100644 --- a/airflow/executors/base_executor.py +++ b/airflow/executors/base_executor.py @@ -39,7 +39,6 @@ class BaseExecutor(LoggingMixin): self.queued_tasks = {} self.running = {} self.event_buffer = {} - self.logger.setLevel(10) def start(self): # pragma: no cover """ @@ -106,7 +105,7 @@ class BaseExecutor(LoggingMixin): """ pass - def heartbeat(self, km=False): + def heartbeat(self): # Triggering new jobs if not self.parallelism: open_slots = len(self.queued_tasks) @@ -132,7 +131,7 @@ class BaseExecutor(LoggingMixin): # does NOT eliminate it. self.queued_tasks.pop(key) ti.refresh_from_db() - if ti.state != State.RUNNING or km: + if ti.state != State.RUNNING: self.running[key] = command self.execute_async(key, command=command, queue=queue) else: http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a9d90dc9/airflow/migrations/versions/33ae817a1ff4_add_kubernetes_resource_checkpointing.py ---------------------------------------------------------------------- diff --git a/airflow/migrations/versions/33ae817a1ff4_add_kubernetes_resource_checkpointing.py b/airflow/migrations/versions/33ae817a1ff4_add_kubernetes_resource_checkpointing.py new file mode 100644 index 0000000..d642476 --- /dev/null +++ b/airflow/migrations/versions/33ae817a1ff4_add_kubernetes_resource_checkpointing.py @@ -0,0 +1,50 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""kubernetes_resource_checkpointing + +Revision ID: 33ae817a1ff4 +Revises: 947454bf1dff +Create Date: 2017-09-11 15:26:47.598494 + +""" + +# revision identifiers, used by Alembic. +revision = '33ae817a1ff4' +down_revision = 'd2ae31099d61' +branch_labels = None +depends_on = None + + +from alembic import op +import sqlalchemy as sa + + +RESOURCE_TABLE = "kube_resource_version" + + +def upgrade(): + table = op.create_table( + RESOURCE_TABLE, + sa.Column("one_row_id", sa.Boolean, server_default=sa.true(), primary_key=True), + sa.Column("resource_version", sa.String(255)), + sa.CheckConstraint("one_row_id", name="kube_resource_version_one_row_id") + ) + op.bulk_insert(table, [ + {"resource_version": ""} + ]) + + +def downgrade(): + op.drop_table(RESOURCE_TABLE) + http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a9d90dc9/airflow/models.py ---------------------------------------------------------------------- diff --git a/airflow/models.py b/airflow/models.py index 02409dd..d03c363 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -56,7 +56,7 @@ from urllib.parse import urlparse, quote from sqlalchemy import ( Column, Integer, String, DateTime, Text, Boolean, ForeignKey, PickleType, Index, Float, LargeBinary) -from sqlalchemy import func, or_, and_ +from sqlalchemy import func, or_, and_, true as sqltrue from sqlalchemy.ext.declarative import declarative_base, declared_attr from sqlalchemy.dialects.mysql import LONGTEXT from sqlalchemy.orm import reconstructor, relationship, synonym @@ -1118,28 +1118,6 @@ class TaskInstance(Base, 
LoggingMixin): session.commit() @provide_session - def update_hostname(self, hostname, session=None): - """ - For use in kubernetes mode. Update the session to allow heartbeating to SQL - :param session: - - :return: - - """ - t_i = TaskInstance - - qry = session.query(t_i).filter( - t_i.dag_id == self.dag_id, - t_i.task_id == self.task_id, - t_i.execution_date == self.execution_date) - - ti = qry.first() - if ti: - ti.hostname = hostname - session.add(ti) - session.commit() - - @provide_session def refresh_from_db(self, session=None, lock_for_update=False): """ Refreshes the task instance from the database based on the primary key @@ -5121,3 +5099,24 @@ class ImportError(Base): timestamp = Column(UtcDateTime) filename = Column(String(1024)) stacktrace = Column(Text) + + +class KubeResourceVersion(Base): + __tablename__ = "kube_resource_version" + one_row_id = Column(Boolean, server_default=sqltrue(), primary_key=True) + resource_version = Column(String(255)) + + @staticmethod + @provide_session + def get_current_resource_version(session=None): + (resource_version,) = session.query(KubeResourceVersion.resource_version).one() + return resource_version + + @staticmethod + @provide_session + def checkpoint_resource_version(resource_version, session=None): + if resource_version: + session.query(KubeResourceVersion).update({ + KubeResourceVersion.resource_version: resource_version + }) + session.commit() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a9d90dc9/airflow/plugins_manager.py ---------------------------------------------------------------------- diff --git a/airflow/plugins_manager.py b/airflow/plugins_manager.py index a7adda6..735f2de 100644 --- a/airflow/plugins_manager.py +++ b/airflow/plugins_manager.py @@ -48,7 +48,6 @@ class AirflowPlugin(object): admin_views = [] flask_blueprints = [] menu_links = [] - dag_importer = None @classmethod def validate(cls): 
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a9d90dc9/docker/Dockerfile ---------------------------------------------------------------------- diff --git a/docker/Dockerfile b/docker/Dockerfile deleted file mode 100644 index 38e7c7c..0000000 --- a/docker/Dockerfile +++ /dev/null @@ -1,40 +0,0 @@ -FROM ubuntu:16.04 - -# install deps -RUN apt-get update -y && apt-get install -y \ - wget \ - python-dev \ - python-pip \ - libczmq-dev \ - libcurlpp-dev \ - curl \ - libssl-dev \ - git \ - inetutils-telnet \ - bind9utils - -RUN pip install -U setuptools && \ - pip install -U pip - -RUN pip install kubernetes && \ - pip install cryptography && \ - pip install psycopg2==2.7.1 - -# install airflow -COPY airflow.tar.gz /tmp/airflow.tar.gz -RUN pip install /tmp/airflow.tar.gz - -# prep airflow -ENV AIRFLOW_HOME=/root/airflow -ENV AIRFLOW__CORE__EXECUTOR=KubernetesExecutor -ENV AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql+psycopg2://root:root@postgres-airflow:5432/airflow - -# Don't read the raw revisions git-sync puts in the volume, we always want to read from the symlink -# A hack, how else can we do this -RUN mkdir -p $AIRFLOW_HOME/dags && echo "rev-[a-zA-Z0-9]+" > $AIRFLOW_HOME/dags/.airflowignore - - -COPY bootstrap.sh /bootstrap.sh -RUN chmod +x /bootstrap.sh - -ENTRYPOINT ["/bootstrap.sh"] http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a9d90dc9/docker/bootstrap.sh ---------------------------------------------------------------------- diff --git a/docker/bootstrap.sh b/docker/bootstrap.sh deleted file mode 100644 index 82124ac..0000000 --- a/docker/bootstrap.sh +++ /dev/null @@ -1,13 +0,0 @@ -#!/bin/bash - -# launch the appropriate process - -if [ "$1" = "webserver" ] -then - exec airflow webserver -fi - -if [ "$1" = "scheduler" ] -then - exec airflow scheduler -fi http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a9d90dc9/docker/build.sh ---------------------------------------------------------------------- diff --git 
a/docker/build.sh b/docker/build.sh deleted file mode 100755 index f2a942e..0000000 --- a/docker/build.sh +++ /dev/null @@ -1,12 +0,0 @@ -IMAGE=grantnicholas/kubeairflow -TAG=${1:-latest} - -if [ -f airflow.tar.gz ]; then - echo "Not rebuilding airflow source" -else - cd ../ && python setup.py sdist && cd docker && \ - cp ../dist/apache-airflow-1.9.0.dev0+incubating.tar.gz airflow.tar.gz -fi - -docker build . --tag=${IMAGE}:${TAG} -docker push ${IMAGE}:${TAG} http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a9d90dc9/kube/airflow.yaml.template ---------------------------------------------------------------------- diff --git a/kube/airflow.yaml.template b/kube/airflow.yaml.template deleted file mode 100644 index eca6d3c..0000000 --- a/kube/airflow.yaml.template +++ /dev/null @@ -1,103 +0,0 @@ -apiVersion: extensions/v1beta1 -kind: Deployment -metadata: - name: airflow -spec: - replicas: 1 - template: - metadata: - labels: - name: airflow - annotations: - pod.beta.kubernetes.io/init-containers: '[ - { - "name": "init", - "image": "{{docker_image}}", - "command": ["bash", "-c", "cd /usr/local/lib/python2.7/dist-packages/airflow && airflow initdb && alembic upgrade head"] - } - ]' - spec: - initContainers: - - name: init - image: {{docker_image}} - command: [ - "bash", "-c", "cd /usr/local/lib/python2.7/dist-packages/airflow && airflow initdb && alembic upgrade head" - ] - containers: - - name: web - image: {{docker_image}} - ports: - - name: web - containerPort: 8080 - args: ["webserver"] - env: - - name: AIRFLOW__CORE__EXECUTOR - value: KubernetesExecutor - - name: AIRFLOW__CORE__K8S_IMAGE - value: {{docker_image}} - - name: AIRFLOW__CORE__K8S_GIT_REPO - value: https://github.com/grantnicholas/testdags.git - - name: AIRFLOW__CORE__K8S_GIT_BRANCH - value: master - volumeMounts: - - name: dags - mountPath: /root/airflow/dags/synched - readinessProbe: - initialDelaySeconds: 5 - timeoutSeconds: 5 - periodSeconds: 5 - httpGet: - path: /admin - port: 8080 - 
livenessProbe: - initialDelaySeconds: 5 - timeoutSeconds: 5 - failureThreshold: 5 - httpGet: - path: /admin - port: 8080 - - name: scheduler - image: {{docker_image}} - args: ["scheduler"] - env: - - name: AIRFLOW__CORE__EXECUTOR - value: KubernetesExecutor - - name: AIRFLOW__CORE__K8S_IMAGE - value: {{docker_image}} - - name: AIRFLOW__CORE__K8S_GIT_REPO - value: https://github.com/grantnicholas/testdags.git - - name: AIRFLOW__CORE__K8S_GIT_BRANCH - value: master - - name: AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL - value: "60" - volumeMounts: - - name: dags - mountPath: /root/airflow/dags/synched - - name: sync - image: gcr.io/google_containers/git-sync:v2.0.4 - env: - - name: GIT_SYNC_REPO - value: https://github.com/grantnicholas/testdags.git - - name: GIT_SYNC_BRANCH - value: master - - name: GIT_SYNC_DEST - value: git - volumeMounts: - - name: dags - mountPath: /git - volumes: - - name: dags - emptyDir: {} ---- -apiVersion: v1 -kind: Service -metadata: - name: airflow -spec: - type: NodePort - ports: - - port: 8080 - nodePort: 30809 - selector: - name: airflow - http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a9d90dc9/kube/deploy.sh ---------------------------------------------------------------------- diff --git a/kube/deploy.sh b/kube/deploy.sh deleted file mode 100755 index 28b58ca..0000000 --- a/kube/deploy.sh +++ /dev/null @@ -1,6 +0,0 @@ -IMAGE=${1:-grantnicholas/kubeairflow} -TAG=${2:-latest} - -mkdir -p .generated -kubectl apply -f postgres.yaml -sed "s#{{docker_image}}#$IMAGE:$TAG#g" airflow.yaml.template > .generated/airflow.yaml && kubectl apply -f .generated/airflow.yaml http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a9d90dc9/kube/postgres.yaml ---------------------------------------------------------------------- diff --git a/kube/postgres.yaml b/kube/postgres.yaml deleted file mode 100644 index a7298d1..0000000 --- a/kube/postgres.yaml +++ /dev/null @@ -1,94 +0,0 @@ -apiVersion: v1 -kind: PersistentVolume -metadata: - 
name: postgres-airflow -spec: - accessModes: - - ReadWriteOnce - capacity: - storage: 5Gi - hostPath: - path: /data/postgres-airflow ---- -kind: PersistentVolumeClaim -apiVersion: v1 -metadata: - name: postgres-airflow -spec: - accessModes: - - ReadWriteOnce - resources: - requests: - storage: 5Gi ---- -kind: Deployment -apiVersion: extensions/v1beta1 -metadata: - name: postgres-airflow -spec: - replicas: 1 - template: - metadata: - labels: - name: postgres-airflow - spec: - restartPolicy: Always - containers: - - name: postgres - image: postgres - ports: - - containerPort: 5432 - protocol: TCP - volumeMounts: - - name: dbvol - mountPath: /var/lib/postgresql/data/pgdata - subPath: pgdata - env: - - name: POSTGRES_USER - value: root - - name: POSTGRES_PASSWORD - value: root - - name: POSTGRES_DB - value: airflow - - name: PGDATA - value: /var/lib/postgresql/data/pgdata - - name: POD_IP - valueFrom: { fieldRef: { fieldPath: status.podIP } } - livenessProbe: - initialDelaySeconds: 60 - timeoutSeconds: 5 - failureThreshold: 5 - exec: - command: - - /bin/sh - - -c - - exec pg_isready --host $POD_IP || if [[ $(psql -qtAc --host $POD_IP 'SELECT pg_is_in_recovery') != "f" ]]; then exit 0 else; exit 1; fi - readinessProbe: - initialDelaySeconds: 5 - timeoutSeconds: 5 - periodSeconds: 5 - exec: - command: - - /bin/sh - - -c - - exec pg_isready --host $POD_IP - resources: - requests: - memory: .5Gi - cpu: .5 - volumes: - - name: dbvol - persistentVolumeClaim: - claimName: postgres-airflow ---- -apiVersion: v1 -kind: Service -metadata: - name: postgres-airflow -spec: - clusterIP: None - ports: - - port: 5432 - targetPort: 5432 - selector: - name: postgres-airflow http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a9d90dc9/scripts/ci/kubernetes/docker/Dockerfile ---------------------------------------------------------------------- diff --git a/scripts/ci/kubernetes/docker/Dockerfile b/scripts/ci/kubernetes/docker/Dockerfile new file mode 100644 index 
0000000..a3b05b0 --- /dev/null +++ b/scripts/ci/kubernetes/docker/Dockerfile @@ -0,0 +1,53 @@ +# Licensed to the Apache Software Foundation (ASF) under one * +# or more contributor license agreements. See the NOTICE file * +# distributed with this work for additional information * +# regarding copyright ownership. The ASF licenses this file * +# to you under the Apache License, Version 2.0 (the * +# "License"); you may not use this file except in compliance * +# with the License. You may obtain a copy of the License at * +# * +# http://www.apache.org/licenses/LICENSE-2.0 * +# * +# Unless required by applicable law or agreed to in writing, * +# software distributed under the License is distributed on an * +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * +# KIND, either express or implied. See the License for the * +# specific language governing permissions and limitations * +# under the License. * + +FROM ubuntu:16.04 + +# install deps +RUN apt-get update -y && apt-get install -y \ + wget \ + python-dev \ + python-pip \ + libczmq-dev \ + libcurlpp-dev \ + curl \ + libssl-dev \ + git \ + inetutils-telnet \ + bind9utils + +RUN pip install -U setuptools && \ + pip install -U pip + +RUN pip install kubernetes && \ + pip install cryptography && \ + pip install psycopg2==2.7.3.1 # I had issues with older versions of psycopg2, just a warning + +# install airflow +COPY airflow.tar.gz /tmp/airflow.tar.gz +RUN pip install /tmp/airflow.tar.gz + +# prep airflow +ENV AIRFLOW_HOME=/root/airflow +ENV AIRFLOW__CORE__EXECUTOR=KubernetesExecutor +ENV AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql+psycopg2://root:root@postgres-airflow:5432/airflow + + +COPY bootstrap.sh /bootstrap.sh +RUN chmod +x /bootstrap.sh + +ENTRYPOINT ["/bootstrap.sh"] http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a9d90dc9/scripts/ci/kubernetes/docker/bootstrap.sh ---------------------------------------------------------------------- diff --git a/scripts/ci/kubernetes/docker/bootstrap.sh 
b/scripts/ci/kubernetes/docker/bootstrap.sh new file mode 100644 index 0000000..b8e54e6 --- /dev/null +++ b/scripts/ci/kubernetes/docker/bootstrap.sh @@ -0,0 +1,29 @@ +#!/bin/bash +# Licensed to the Apache Software Foundation (ASF) under one * +# or more contributor license agreements. See the NOTICE file * +# distributed with this work for additional information * +# regarding copyright ownership. The ASF licenses this file * +# to you under the Apache License, Version 2.0 (the * +# "License"); you may not use this file except in compliance * +# with the License. You may obtain a copy of the License at * +# * +# http://www.apache.org/licenses/LICENSE-2.0 * +# * +# Unless required by applicable law or agreed to in writing, * +# software distributed under the License is distributed on an * +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * +# KIND, either express or implied. See the License for the * +# specific language governing permissions and limitations * +# under the License. * + +# launch the appropriate process + +if [ "$1" = "webserver" ] +then + exec airflow webserver +fi + +if [ "$1" = "scheduler" ] +then + exec airflow scheduler +fi http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a9d90dc9/scripts/ci/kubernetes/docker/build.sh ---------------------------------------------------------------------- diff --git a/scripts/ci/kubernetes/docker/build.sh b/scripts/ci/kubernetes/docker/build.sh new file mode 100755 index 0000000..d36ea86 --- /dev/null +++ b/scripts/ci/kubernetes/docker/build.sh @@ -0,0 +1,29 @@ +# Licensed to the Apache Software Foundation (ASF) under one * +# or more contributor license agreements. See the NOTICE file * +# distributed with this work for additional information * +# regarding copyright ownership. The ASF licenses this file * +# to you under the Apache License, Version 2.0 (the * +# "License"); you may not use this file except in compliance * +# with the License. 
You may obtain a copy of the License at * +# * +# http://www.apache.org/licenses/LICENSE-2.0 * +# * +# Unless required by applicable law or agreed to in writing, * +# software distributed under the License is distributed on an * +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * +# KIND, either express or implied. See the License for the * +# specific language governing permissions and limitations * +# under the License. * + +IMAGE=${1:-airflow/ci} +TAG=${2:-latest} +DIRNAME=$(cd "$(dirname "$0")"; pwd) +AIRFLOW_ROOT="$DIRNAME/../../../.." + +ENVCONFIG=$(minikube docker-env) +if [ $? -eq 0 ]; then + eval $ENVCONFIG +fi + +cd $AIRFLOW_ROOT && python setup.py sdist && cp $AIRFLOW_ROOT/dist/*.tar.gz $DIRNAME/airflow.tar.gz && \ +cd $DIRNAME && docker build $DIRNAME --tag=${IMAGE}:${TAG} http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a9d90dc9/scripts/ci/kubernetes/kube/airflow.yaml.template ---------------------------------------------------------------------- diff --git a/scripts/ci/kubernetes/kube/airflow.yaml.template b/scripts/ci/kubernetes/kube/airflow.yaml.template new file mode 100644 index 0000000..a297b95 --- /dev/null +++ b/scripts/ci/kubernetes/kube/airflow.yaml.template @@ -0,0 +1,165 @@ +# Licensed to the Apache Software Foundation (ASF) under one * +# or more contributor license agreements. See the NOTICE file * +# distributed with this work for additional information * +# regarding copyright ownership. The ASF licenses this file * +# to you under the Apache License, Version 2.0 (the * +# "License"); you may not use this file except in compliance * +# with the License. You may obtain a copy of the License at * +# * +# http://www.apache.org/licenses/LICENSE-2.0 * +# * +# Unless required by applicable law or agreed to in writing, * +# software distributed under the License is distributed on an * +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * +# KIND, either express or implied. 
See the License for the * +# specific language governing permissions and limitations * +# under the License. * + +# The backing volume can be anything you want, it just needs to be `ReadWriteOnce` +# I'm using hostPath since minikube is nice for testing, but any (non-local) volume will work on a real cluster +kind: PersistentVolume +apiVersion: v1 +metadata: + name: airflow-dags + labels: + type: local +spec: + capacity: + storage: 10Gi + accessModes: + - ReadWriteOnce + hostPath: + path: "/data/airflow-dags" +--- +kind: PersistentVolumeClaim +apiVersion: v1 +metadata: + name: airflow-dags +spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: 10Gi +--- +apiVersion: extensions/v1beta1 +kind: Deployment +metadata: + name: airflow +spec: + replicas: 1 + template: + metadata: + labels: + name: airflow + annotations: + pod.beta.kubernetes.io/init-containers: '[ + { + "name": "init", + "image": "{{docker_image}}", + "imagePullPolicy": "IfNotPresent", + "command": [ + "bash", "-cx", "cd /usr/local/lib/python2.7/dist-packages/airflow && cp -R example_dags/* $AIRFLOW_HOME/dags/ && airflow initdb && alembic upgrade head" + ], + "env": [ + {"name": "AIRFLOW__KUBERNETES__CONTAINER_IMAGE", "value": ""}, + {"name": "AIRFLOW__KUBERNETES__DAGS_VOLUME_CLAIM", "value": "airflow-dags"}, + {"name": "AIRFLOW__KUBERNETES__DAGS_VOLUME_SUBPATH", "value": "git"} + ], + "volumeMounts": [ + {"name": "airflow-dags", "mountPath": "/root/airflow/dags"} + ] + } + ]' + spec: + containers: + - name: web + image: {{docker_image}} + imagePullPolicy: IfNotPresent + ports: + - name: web + containerPort: 8080 + args: ["webserver"] + env: + - name: AIRFLOW_KUBE_NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + - name: AIRFLOW__CORE__EXECUTOR + value: KubernetesExecutor + - name: AIRFLOW__KUBERNETES__CONTAINER_IMAGE + value: {{docker_image}} + - name: AIRFLOW__KUBERNETES__DELETE_WORKER_PODS + value: "True" + # set these two confs + - name: 
AIRFLOW__KUBERNETES__GIT_REPO + value: https://github.com/grantnicholas/testdags.git + - name: AIRFLOW__KUBERNETES__GIT_BRANCH + value: master + # or this one + - name: AIRFLOW__KUBERNETES__DAGS_VOLUME_CLAIM + value: airflow-dags + # + volumeMounts: + - name: airflow-dags + mountPath: /root/airflow/dags + readinessProbe: + initialDelaySeconds: 5 + timeoutSeconds: 5 + periodSeconds: 5 + httpGet: + path: /admin + port: 8080 + livenessProbe: + initialDelaySeconds: 5 + timeoutSeconds: 5 + failureThreshold: 5 + httpGet: + path: /admin + port: 8080 + - name: scheduler + image: {{docker_image}} + imagePullPolicy: IfNotPresent + args: ["scheduler"] + env: + - name: AIRFLOW_KUBE_NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + - name: AIRFLOW__CORE__EXECUTOR + value: KubernetesExecutor + - name: AIRFLOW__KUBERNETES__CONTAINER_IMAGE + value: {{docker_image}} + - name: AIRFLOW__KUBERNETES__DELETE_WORKER_PODS + value: "True" + # set these two confs + - name: AIRFLOW__KUBERNETES__GIT_REPO + value: https://github.com/grantnicholas/testdags.git + - name: AIRFLOW__KUBERNETES__GIT_BRANCH + value: master + # or set this one + - name: AIRFLOW__KUBERNETES__DAGS_VOLUME_CLAIM + value: airflow-dags + # + - name: AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL + value: "60" + volumeMounts: + - name: airflow-dags + mountPath: /root/airflow/dags + volumes: + - name: airflow-dags + persistentVolumeClaim: + claimName: airflow-dags +--- +apiVersion: v1 +kind: Service +metadata: + name: airflow +spec: + type: NodePort + ports: + - port: 8080 + nodePort: 30809 + selector: + name: airflow + http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a9d90dc9/scripts/ci/kubernetes/kube/deploy.sh ---------------------------------------------------------------------- diff --git a/scripts/ci/kubernetes/kube/deploy.sh b/scripts/ci/kubernetes/kube/deploy.sh new file mode 100755 index 0000000..2532d83 --- /dev/null +++ b/scripts/ci/kubernetes/kube/deploy.sh @@ -0,0 +1,42 @@ +# Licensed 
to the Apache Software Foundation (ASF) under one * +# or more contributor license agreements. See the NOTICE file * +# distributed with this work for additional information * +# regarding copyright ownership. The ASF licenses this file * +# to you under the Apache License, Version 2.0 (the * +# "License"); you may not use this file except in compliance * +# with the License. You may obtain a copy of the License at * +# * +# http://www.apache.org/licenses/LICENSE-2.0 * +# * +# Unless required by applicable law or agreed to in writing, * +# software distributed under the License is distributed on an * +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * +# KIND, either express or implied. See the License for the * +# specific language governing permissions and limitations * +# under the License. * + +IMAGE=${1:-airflow/ci} +TAG=${2:-latest} +DIRNAME=$(cd "$(dirname "$0")"; pwd) + +# create an emptydir for postgres to store it's volume data in +sudo mkdir -p /data/postgres-airflow + +mkdir -p $DIRNAME/.generated +kubectl apply -f $DIRNAME/postgres.yaml +sed "s#{{docker_image}}#$IMAGE:$TAG#g" $DIRNAME/airflow.yaml.template > $DIRNAME/.generated/airflow.yaml && kubectl apply -f $DIRNAME/.generated/airflow.yaml + + +# wait for up to 10 minutes for everything to be deployed +for i in {1..150} +do + echo "------- Running kubectl get pods -------" + PODS=$(kubectl get pods | awk 'NR>1 {print $0}') + echo "$PODS" + NUM_AIRFLOW_READY=$(echo $PODS | grep airflow | awk '{print $2}' | grep -E '([0-9])\/(\1)' | wc -l) + NUM_POSTGRES_READY=$(echo $PODS | grep postgres | awk '{print $2}' | grep -E '([0-9])\/(\1)' | wc -l) + if [ "$NUM_AIRFLOW_READY" == "1" ] && [ "$NUM_POSTGRES_READY" == "1" ]; then + break + fi + sleep 4 +done http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a9d90dc9/scripts/ci/kubernetes/kube/postgres.yaml ---------------------------------------------------------------------- diff --git a/scripts/ci/kubernetes/kube/postgres.yaml 
b/scripts/ci/kubernetes/kube/postgres.yaml new file mode 100644 index 0000000..79366d0 --- /dev/null +++ b/scripts/ci/kubernetes/kube/postgres.yaml @@ -0,0 +1,111 @@ +# Licensed to the Apache Software Foundation (ASF) under one * +# or more contributor license agreements. See the NOTICE file * +# distributed with this work for additional information * +# regarding copyright ownership. The ASF licenses this file * +# to you under the Apache License, Version 2.0 (the * +# "License"); you may not use this file except in compliance * +# with the License. You may obtain a copy of the License at * +# * +# http://www.apache.org/licenses/LICENSE-2.0 * +# * +# Unless required by applicable law or agreed to in writing, * +# software distributed under the License is distributed on an * +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * +# KIND, either express or implied. See the License for the * +# specific language governing permissions and limitations * +# under the License. * + +apiVersion: v1 +kind: PersistentVolume +metadata: + name: postgres-airflow +spec: + accessModes: + - ReadWriteOnce + capacity: + storage: 5Gi + hostPath: + path: /data/postgres-airflow +--- +kind: PersistentVolumeClaim +apiVersion: v1 +metadata: + name: postgres-airflow +spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: 5Gi +--- +kind: Deployment +apiVersion: extensions/v1beta1 +metadata: + name: postgres-airflow +spec: + replicas: 1 + template: + metadata: + labels: + name: postgres-airflow + spec: + restartPolicy: Always + containers: + - name: postgres + image: postgres + ports: + - containerPort: 5432 + protocol: TCP + volumeMounts: + - name: dbvol + mountPath: /var/lib/postgresql/data/pgdata + subPath: pgdata + env: + - name: POSTGRES_USER + value: root + - name: POSTGRES_PASSWORD + value: root + - name: POSTGRES_DB + value: airflow + - name: PGDATA + value: /var/lib/postgresql/data/pgdata + - name: POD_IP + valueFrom: { fieldRef: { fieldPath: status.podIP } 
} + livenessProbe: + initialDelaySeconds: 60 + timeoutSeconds: 5 + failureThreshold: 5 + exec: + command: + - /bin/sh + - -c + - exec pg_isready --host $POD_IP || if [[ $(psql -qtAc --host $POD_IP 'SELECT pg_is_in_recovery') != "f" ]]; then exit 0 else; exit 1; fi + readinessProbe: + initialDelaySeconds: 5 + timeoutSeconds: 5 + periodSeconds: 5 + exec: + command: + - /bin/sh + - -c + - exec pg_isready --host $POD_IP + resources: + requests: + memory: .5Gi + cpu: .5 + volumes: + - name: dbvol + persistentVolumeClaim: + claimName: postgres-airflow +--- +apiVersion: v1 +kind: Service +metadata: + name: postgres-airflow +spec: + clusterIP: None + ports: + - port: 5432 + targetPort: 5432 + selector: + name: postgres-airflow http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a9d90dc9/scripts/ci/kubernetes/minikube/start_minikube.sh ---------------------------------------------------------------------- diff --git a/scripts/ci/kubernetes/minikube/start_minikube.sh b/scripts/ci/kubernetes/minikube/start_minikube.sh index be370cf..8a27d75 100755 --- a/scripts/ci/kubernetes/minikube/start_minikube.sh +++ b/scripts/ci/kubernetes/minikube/start_minikube.sh @@ -1,19 +1,19 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
+# Licensed to the Apache Software Foundation (ASF) under one * +# or more contributor license agreements. See the NOTICE file * +# distributed with this work for additional information * +# regarding copyright ownership. The ASF licenses this file * +# to you under the Apache License, Version 2.0 (the * +# "License"); you may not use this file except in compliance * +# with the License. You may obtain a copy of the License at * +# * +# http://www.apache.org/licenses/LICENSE-2.0 * +# * +# Unless required by applicable law or agreed to in writing, * +# software distributed under the License is distributed on an * +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * +# KIND, either express or implied. See the License for the * +# specific language governing permissions and limitations * +# under the License. * # Guard against a kubernetes cluster already being up kubectl get pods &> /dev/null @@ -23,8 +23,8 @@ if [ $? -eq 0 ]; then fi # -curl -Lo minikube https://storage.googleapis.com/minikube/releases/v0.24.1/minikube-linux-amd64 && chmod +x minikube -curl -Lo kubectl https://storage.googleapis.com/kubernetes-release/release/${KUBERNETES_VERSION}/bin/linux/amd64/kubectl && chmod +x kubectl +curl -Lo minikube https://storage.googleapis.com/minikube/releases/latest/minikube-linux-amd64 && chmod +x minikube +curl -Lo kubectl https://storage.googleapis.com/kubernetes-release/release/v1.7.0/bin/linux/amd64/kubectl && chmod +x kubectl sudo mkdir -p /usr/local/bin sudo mv minikube /usr/local/bin/minikube @@ -38,43 +38,15 @@ mkdir $HOME/.kube || true touch $HOME/.kube/config export KUBECONFIG=$HOME/.kube/config - -start_minikube(){ - sudo -E minikube start --vm-driver=none --kubernetes-version="${KUBERNETES_VERSION}" - - # this for loop waits until kubectl can access the api server that minikube has created - for i in {1..90} # timeout 3 minutes - do - echo "------- Running kubectl get pods -------" - STDERR=$(kubectl get pods 2>&1 >/dev/null) - if [ $? 
-eq 0 ]; then - echo $STDERR - - # We do not need dynamic hostpath provisioning, so disable the default storageclass - sudo -E minikube addons disable default-storageclass && kubectl delete storageclasses --all - - # We need to give permission to watch pods to the airflow scheduler. - # The easiest way to do that is by giving admin access to the default serviceaccount (NOT SAFE!) - kubectl create clusterrolebinding add-on-cluster-admin --clusterrole=cluster-admin --serviceaccount=default:default - exit 0 - fi - echo $STDERR - sleep 2 - done -} - -cleanup_minikube(){ - sudo -E minikube stop - sudo -E minikube delete - docker stop $(docker ps -a -q) || true - docker rm $(docker ps -a -q) || true - sleep 1 -} - -start_minikube -echo "Minikube cluster creation timedout. Attempting to restart the minikube cluster." -cleanup_minikube -start_minikube -echo "Minikube cluster creation timedout a second time. Failing." - -exit 1 +sudo -E minikube start --vm-driver=none + +# this for loop waits until kubectl can access the api server that minikube has created +for i in {1..150} # timeout for 5 minutes +do + echo "------- Running kubectl get pods -------" + kubectl get po &> /dev/null + if [ $? -ne 1 ]; then + break + fi + sleep 2 +done http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a9d90dc9/scripts/ci/kubernetes/setup_kubernetes.sh ---------------------------------------------------------------------- diff --git a/scripts/ci/kubernetes/setup_kubernetes.sh b/scripts/ci/kubernetes/setup_kubernetes.sh index fa4e523..ea559a0 100755 --- a/scripts/ci/kubernetes/setup_kubernetes.sh +++ b/scripts/ci/kubernetes/setup_kubernetes.sh @@ -24,5 +24,7 @@ echo "For development, start minikube yourself (ie: minikube start) then run thi DIRNAME=$(cd "$(dirname "$0")"; pwd) $DIRNAME/minikube/start_minikube.sh +$DIRNAME/docker/build.sh +$DIRNAME/kube/deploy.sh echo "Airflow environment on kubernetes is good to go!" 
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a9d90dc9/scripts/ci/run_tests.sh ---------------------------------------------------------------------- diff --git a/scripts/ci/run_tests.sh b/scripts/ci/run_tests.sh index 9542305..d5e7655 100755 --- a/scripts/ci/run_tests.sh +++ b/scripts/ci/run_tests.sh @@ -43,11 +43,5 @@ if [ "${TRAVIS}" ]; then kinit -kt ${KRB5_KTNAME} airflow fi -if [[ "$RUN_FLAKE8" == "true" ]]; then - ./flake8_diff.sh -fi - -if [[ "$SKIP_TESTS" != "true" ]]; then - echo Backend: $AIRFLOW__CORE__SQL_ALCHEMY_CONN - ./run_unit_tests.sh $@ -fi +echo Backend: $AIRFLOW__CORE__SQL_ALCHEMY_CONN +./run_unit_tests.sh $@ http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a9d90dc9/scripts/ci/travis_script.sh ---------------------------------------------------------------------- diff --git a/scripts/ci/travis_script.sh b/scripts/ci/travis_script.sh index 8766e94..5b2a198 100755 --- a/scripts/ci/travis_script.sh +++ b/scripts/ci/travis_script.sh @@ -21,12 +21,12 @@ DIRNAME=$(cd "$(dirname "$0")"; pwd) AIRFLOW_ROOT="$DIRNAME/../.." 
cd $AIRFLOW_ROOT && pip --version && ls -l $HOME/.wheelhouse && tox --version -if [ -z "$KUBERNETES_VERSION" ]; +if [ -z "$RUN_KUBE_INTEGRATION" ]; then tox -e $TOX_ENV else - KUBERNETES_VERSION=${KUBERNETES_VERSION} $DIRNAME/kubernetes/setup_kubernetes.sh && \ - tox -e $TOX_ENV -- tests.contrib.minikube_tests \ + $DIRNAME/kubernetes/setup_kubernetes.sh && \ + tox -e $TOX_ENV -- tests.contrib.executors.integration \ --with-coverage \ --cover-erase \ --cover-html \ http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a9d90dc9/tests/contrib/__init__.py ---------------------------------------------------------------------- diff --git a/tests/contrib/__init__.py b/tests/contrib/__init__.py index 008677e..50b2e1d 100644 --- a/tests/contrib/__init__.py +++ b/tests/contrib/__init__.py @@ -20,4 +20,3 @@ from __future__ import absolute_import from .operators import * from .sensors import * -from .kubernetes import * http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a9d90dc9/tests/contrib/executors/__init__.py ---------------------------------------------------------------------- diff --git a/tests/contrib/executors/__init__.py b/tests/contrib/executors/__init__.py new file mode 100644 index 0000000..9d7677a --- /dev/null +++ b/tests/contrib/executors/__init__.py @@ -0,0 +1,13 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a9d90dc9/tests/contrib/executors/integration/__init__.py ---------------------------------------------------------------------- diff --git a/tests/contrib/executors/integration/__init__.py b/tests/contrib/executors/integration/__init__.py new file mode 100644 index 0000000..9d7677a --- /dev/null +++ b/tests/contrib/executors/integration/__init__.py @@ -0,0 +1,13 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a9d90dc9/tests/contrib/executors/integration/airflow_controller.py ---------------------------------------------------------------------- diff --git a/tests/contrib/executors/integration/airflow_controller.py b/tests/contrib/executors/integration/airflow_controller.py new file mode 100644 index 0000000..499adb4 --- /dev/null +++ b/tests/contrib/executors/integration/airflow_controller.py @@ -0,0 +1,114 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +import subprocess +import time + + +class RunCommandError(Exception): + pass + + +class TimeoutError(Exception): + pass + + +class DagRunState: + SUCCESS = "success" + FAILED = "failed" + RUNNING = "running" + + +def run_command(command): + process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + stdout, stderr = process.communicate() + if process.returncode != 0: + raise RunCommandError("Error while running command: {}; Stdout: {}; Stderr: {}".format( + command, stdout, stderr + )) + return stdout, stderr + + +def run_command_in_pod(pod_name, container_name, command): + return run_command("kubectl exec {pod_name} -c {container_name} -- {command}".format( + pod_name=pod_name, container_name=container_name, command=command + )) + +def _unpause_dag(dag_id, airflow_pod=None): + airflow_pod = airflow_pod or _get_airflow_pod() + return run_command_in_pod(airflow_pod, "scheduler", "airflow unpause {dag_id}".format(dag_id=dag_id)) + +def run_dag(dag_id, run_id, airflow_pod=None): + airflow_pod = airflow_pod or _get_airflow_pod() + _unpause_dag(dag_id, airflow_pod) + return run_command_in_pod(airflow_pod, "scheduler", "airflow trigger_dag {dag_id} -r {run_id}".format( + dag_id=dag_id, run_id=run_id + )) + + +def _get_pod_by_grep(grep_phrase): + stdout, stderr = run_command("kubectl get pods | grep {grep_phrase} | awk '{{print $1}}'".format( + grep_phrase=grep_phrase + )) + pod_name = stdout.strip() + return pod_name + + +def _get_airflow_pod(): + return _get_pod_by_grep("^airflow") + + +def _get_postgres_pod(): + return _get_pod_by_grep("^postgres") + + +def _parse_state(stdout): + end_line = "(1 row)" + prev_line = None + for line in stdout.split("\n"): + if end_line in line: + return prev_line.strip() + prev_line = line + + raise Exception("Unknown psql output: {}".format(stdout)) + +def get_dag_run_state(dag_id, run_id, 
postgres_pod=None): + postgres_pod = postgres_pod or _get_postgres_pod() + stdout, stderr = run_command_in_pod( + postgres_pod, "postgres", + """psql airflow -c "select state from dag_run where dag_id='{dag_id}' and run_id='{run_id}'" """.format( + dag_id=dag_id, run_id=run_id + ) + ) + return _parse_state(stdout) + + +def dag_final_state(dag_id, run_id, postgres_pod=None, poll_interval=1, timeout=120): + postgres_pod = postgres_pod or _get_postgres_pod() + for _ in range(0, timeout / poll_interval): + dag_state = get_dag_run_state(dag_id, run_id, postgres_pod) + if dag_state != DagRunState.RUNNING: + return dag_state + time.sleep(poll_interval) + + raise TimeoutError("Timed out while waiting for DagRun with dag_id: {} run_id: {}".format(dag_id, run_id)) + + +def _kill_pod(pod_name): + return run_command("kubectl delete pod {pod_name}".format(pod_name=pod_name)) + + +def kill_scheduler(): + airflow_pod = _get_pod_by_grep("^airflow") + return _kill_pod(airflow_pod) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a9d90dc9/tests/contrib/executors/integration/test_kubernetes_executor_integration.py ---------------------------------------------------------------------- diff --git a/tests/contrib/executors/integration/test_kubernetes_executor_integration.py b/tests/contrib/executors/integration/test_kubernetes_executor_integration.py new file mode 100644 index 0000000..709ae6a --- /dev/null +++ b/tests/contrib/executors/integration/test_kubernetes_executor_integration.py @@ -0,0 +1,57 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Integration tests for the Kubernetes executor.

These tests require a reachable Kubernetes cluster running the Airflow and
Postgres pods deployed by ``scripts/ci/kubernetes``; they are skipped when
``kubectl`` cannot list pods.
"""

import time
import unittest
from uuid import uuid4

from tests.contrib.executors.integration.airflow_controller import (
    run_command, RunCommandError,
    run_dag, get_dag_run_state, dag_final_state, DagRunState,
    kill_scheduler
)


# Probe the cluster once at import time; skip the whole suite when absent.
try:
    run_command("kubectl get pods")
except RunCommandError:
    SKIP_KUBE = True
else:
    SKIP_KUBE = False


class KubernetesExecutorTest(unittest.TestCase):

    @unittest.skipIf(SKIP_KUBE, 'Kubernetes integration tests are unsupported by this configuration')
    def test_kubernetes_executor_dag_runs_successfully(self):
        """A triggered example DAG must reach the SUCCESS state."""
        dag_id, run_id = "example_python_operator", uuid4().hex
        run_dag(dag_id, run_id)
        state = dag_final_state(dag_id, run_id, timeout=120)
        # assertEqual: assertEquals is a deprecated alias.
        self.assertEqual(state, DagRunState.SUCCESS)

    @unittest.skipIf(SKIP_KUBE, 'Kubernetes integration tests are unsupported by this configuration')
    def test_start_dag_then_kill_scheduler_then_ensure_dag_succeeds(self):
        """Killing the scheduler mid-run must not prevent the DAG from succeeding."""
        dag_id, run_id = "example_python_operator", uuid4().hex
        run_dag(dag_id, run_id)

        self.assertEqual(get_dag_run_state(dag_id, run_id), DagRunState.RUNNING)

        # Give the scheduler time to hand work to the executor before killing it.
        time.sleep(10)

        kill_scheduler()

        self.assertEqual(dag_final_state(dag_id, run_id, timeout=180), DagRunState.SUCCESS)


if __name__ == "__main__":
    unittest.main()
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Unit tests for AirflowKubernetesScheduler pod-name generation and
execution-date label round-tripping. Skipped when the kubernetes python
package (and thus the executor module) is not importable."""

import unittest
import re
import string
import random
from datetime import datetime

try:
    from airflow.contrib.executors.kubernetes_executor import AirflowKubernetesScheduler
except ImportError:
    AirflowKubernetesScheduler = None


class TestAirflowKubernetesScheduler(unittest.TestCase):

    def _gen_random_string(self, str_len):
        # string.printable deliberately includes upper case, punctuation and
        # whitespace so these strings fuzz the pod-id sanitizer.
        return ''.join([random.choice(string.printable) for _ in range(str_len)])

    def _cases(self):
        """Fixed edge cases plus 100 random fuzz cases of (dag_id, task_id)."""
        cases = [
            ("my_dag_id", "my-task-id"),
            ("my.dag.id", "my.task.id"),
            ("MYDAGID", "MYTASKID"),
            ("my_dag_id", "my_task_id"),
            ("mydagid" * 200, "my_task_id" * 200)
        ]

        cases.extend([
            (self._gen_random_string(200), self._gen_random_string(200))
            for _ in range(100)
        ])

        return cases

    def _is_valid_name(self, name):
        """True-ish iff *name* is a valid DNS-1123 subdomain (k8s pod name)."""
        # Raw string: the original non-raw "\." relied on unknown escapes
        # passing through, a DeprecationWarning since Python 3.6.
        regex = r"^[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*$"
        return len(name) <= 253 and \
            all(ch.lower() == ch for ch in name) and \
            re.match(regex, name)

    @unittest.skipIf(AirflowKubernetesScheduler is None, 'kubernetes python package is not installed')
    def test_create_pod_id(self):
        for dag_id, task_id in self._cases():
            pod_name = AirflowKubernetesScheduler._create_pod_id(dag_id, task_id)
            # unittest assertion instead of a bare assert: reports the failing
            # value and is not stripped when running under python -O.
            self.assertTrue(self._is_valid_name(pod_name))

    @unittest.skipIf(AirflowKubernetesScheduler is None, "kubernetes python package is not installed")
    def test_execution_date_serialize_deserialize(self):
        datetime_obj = datetime.now()
        serialized_datetime = \
            AirflowKubernetesScheduler._datetime_to_label_safe_datestring(datetime_obj)
        new_datetime_obj = \
            AirflowKubernetesScheduler._label_safe_datestring_to_datetime(serialized_datetime)

        self.assertEqual(datetime_obj, new_datetime_obj)


if __name__ == '__main__':
    unittest.main()
