[AIRFLOW-1517] Add minikube for kubernetes integration tests Add better support for minikube integration tests; By default minikube integration tests will run with kubernetes 1.7 and kubernetes 1.8
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/cde3a5fe Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/cde3a5fe Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/cde3a5fe Branch: refs/heads/master Commit: cde3a5fecd825c90847bc563255707caafdd9336 Parents: 361dad9 Author: GRANT NICHOLAS <[email protected]> Authored: Wed Dec 27 14:21:20 2017 -0600 Committer: Daniel Imberman <[email protected]> Committed: Thu Jan 11 15:28:32 2018 -0800 ---------------------------------------------------------------------- .travis.yml | 9 +++ airflow/contrib/kubernetes/kube_client.py | 8 +- .../ci/kubernetes/minikube/start_minikube.sh | 58 +++++++++++---- scripts/ci/run_tests.sh | 2 +- scripts/ci/travis_script.sh | 7 +- tests/contrib/minikube_tests/__init__.py | 13 ++++ .../test_kubernetes_pod_operator.py | 78 ++++++++++++++++++++ .../operators/test_kubernetes_pod_operator.py | 69 ----------------- 8 files changed, 150 insertions(+), 94 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/cde3a5fe/.travis.yml ---------------------------------------------------------------------- diff --git a/.travis.yml b/.travis.yml index 6b45153..dec9181 100644 --- a/.travis.yml +++ b/.travis.yml @@ -54,6 +54,8 @@ env: - TOX_ENV=py35-backend_sqlite - TOX_ENV=py35-backend_postgres - TOX_ENV=flake8 + - TOX_ENV=py27-backend_postgres KUBERNETES_VERSION=v1.7.0 + - TOX_ENV=py27-backend_postgres KUBERNETES_VERSION=v1.8.0 matrix: exclude: - python: "3.5" @@ -70,6 +72,13 @@ matrix: env: TOX_ENV=py35-backend_postgres - python: "2.7" env: TOX_ENV=flake8 + - python: "3.5" + env: TOX_ENV=py27-backend_postgres KUBERNETES_VERSION=v1.7.0 + - python: "3.5" + env: TOX_ENV=py27-backend_postgres KUBERNETES_VERSION=v1.8.0 + allow_failures: + - env: TOX_ENV=py27-backend_postgres KUBERNETES_VERSION=v1.7.0 + - env: TOX_ENV=py27-backend_postgres KUBERNETES_VERSION=v1.8.0 cache: directories: - $HOME/.wheelhouse/ http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/cde3a5fe/airflow/contrib/kubernetes/kube_client.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/kubernetes/kube_client.py b/airflow/contrib/kubernetes/kube_client.py index 7dc895e..9db8641 100644 --- a/airflow/contrib/kubernetes/kube_client.py +++ b/airflow/contrib/kubernetes/kube_client.py @@ -13,10 +13,11 @@ # limitations under the License. -def load_kube_config(in_cluster=True): +def _load_kube_config(in_cluster): from kubernetes import config, client if in_cluster: config.load_incluster_config() + return client.CoreV1Api() else: try: config.load_kube_config() @@ -28,7 +29,4 @@ def load_kube_config(in_cluster=True): def get_kube_client(in_cluster=True): # TODO: This should also allow people to point to a cluster. - - from kubernetes import client - load_kube_config(in_cluster) - return client.CoreV1Api() + return _load_kube_config(in_cluster) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/cde3a5fe/scripts/ci/kubernetes/minikube/start_minikube.sh ---------------------------------------------------------------------- diff --git a/scripts/ci/kubernetes/minikube/start_minikube.sh b/scripts/ci/kubernetes/minikube/start_minikube.sh index f78cb3a..1da23d0 100755 --- a/scripts/ci/kubernetes/minikube/start_minikube.sh +++ b/scripts/ci/kubernetes/minikube/start_minikube.sh @@ -15,8 +15,8 @@ # specific language governing permissions and limitations * # under the License. * -# Guard against a kubernetes cluster already being up #!/usr/bin/env bash +# Guard against a kubernetes cluster already being up kubectl get pods &> /dev/null if [ $? -eq 0 ]; then echo "kubectl get pods returned 0 exit code, exiting early" @@ -24,8 +24,8 @@ if [ $? -eq 0 ]; then fi # -curl -Lo minikube https://storage.googleapis.com/minikube/releases/latest/minikube-linux-amd64 && chmod +x minikube -curl -Lo kubectl https://storage.googleapis.com/kubernetes-release/release/v1.7.0/bin/linux/amd64/kubectl && chmod +x kubectl +curl -Lo minikube https://storage.googleapis.com/minikube/releases/v0.24.1/minikube-linux-amd64 && chmod +x minikube +curl -Lo kubectl https://storage.googleapis.com/kubernetes-release/release/${KUBERNETES_VERSION}/bin/linux/amd64/kubectl && chmod +x kubectl sudo mkdir -p /usr/local/bin sudo mv minikube /usr/local/bin/minikube @@ -39,15 +39,43 @@ mkdir $HOME/.kube || true touch $HOME/.kube/config export KUBECONFIG=$HOME/.kube/config -sudo -E minikube start --vm-driver=none - -# this for loop waits until kubectl can access the api server that minikube has created -for i in {1..150} # timeout for 5 minutes -do - echo "------- Running kubectl get pods -------" - kubectl get po &> /dev/null - if [ $? -ne 1 ]; then - break - fi - sleep 2 -done + +start_minikube(){ + sudo -E minikube start --vm-driver=none --kubernetes-version="${KUBERNETES_VERSION}" + + # this for loop waits until kubectl can access the api server that minikube has created + for i in {1..90} # timeout 3 minutes + do + echo "------- Running kubectl get pods -------" + STDERR=$(kubectl get pods 2>&1 >/dev/null) + if [ $? -ne 1 ]; then + echo $STDERR + + # We do not need dynamic hostpath provisioning, so disable the default storageclass + sudo -E minikube addons disable default-storageclass && kubectl delete storageclasses --all + + # We need to give permission to watch pods to the airflow scheduler. + # The easiest way to do that is by giving admin access to the default serviceaccount (NOT SAFE!) + kubectl create clusterrolebinding add-on-cluster-admin --clusterrole=cluster-admin --serviceaccount=default:default + exit 0 + fi + echo $STDERR + sleep 2 + done +} + +cleanup_minikube(){ + sudo -E minikube stop + sudo -E minikube delete + docker stop $(docker ps -a -q) || true + docker rm $(docker ps -a -q) || true + sleep 1 +} + +start_minikube +echo "Minikube cluster creation timedout. Attempting to restart the minikube cluster." +cleanup_minikube +start_minikube +echo "Minikube cluster creation timedout a second time. Failing." + +exit 1 http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/cde3a5fe/scripts/ci/run_tests.sh ---------------------------------------------------------------------- diff --git a/scripts/ci/run_tests.sh b/scripts/ci/run_tests.sh index 1253686..8c47ee8 100755 --- a/scripts/ci/run_tests.sh +++ b/scripts/ci/run_tests.sh @@ -44,5 +44,5 @@ fi if [[ "$SKIP_TESTS" != "true" ]]; then echo Backend: $AIRFLOW__CORE__SQL_ALCHEMY_CONN - ./run_unit_tests.sh + ./run_unit_tests.sh $@ fi http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/cde3a5fe/scripts/ci/travis_script.sh ---------------------------------------------------------------------- diff --git a/scripts/ci/travis_script.sh b/scripts/ci/travis_script.sh index a51e742..86c086a 100755 --- a/scripts/ci/travis_script.sh +++ b/scripts/ci/travis_script.sh @@ -19,13 +19,12 @@ DIRNAME=$(cd "$(dirname "$0")"; pwd) AIRFLOW_ROOT="$DIRNAME/../.." cd $AIRFLOW_ROOT && pip --version && ls -l $HOME/.wheelhouse && tox --version -if [ -z "$RUN_KUBE_INTEGRATION" ]; +if [ -z "$KUBERNETES_VERSION" ]; then - $DIRNAME/kubernetes/setup_kubernetes.sh tox -e $TOX_ENV else - $DIRNAME/kubernetes/setup_kubernetes.sh && \ - tox -e $TOX_ENV -- tests.contrib.executors.integration \ + KUBERNETES_VERSION=${KUBERNETES_VERSION} $DIRNAME/kubernetes/setup_kubernetes.sh && \ + tox -e $TOX_ENV -- tests.contrib.minikube_tests \ --with-coverage \ --cover-erase \ --cover-html \ http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/cde3a5fe/tests/contrib/minikube_tests/__init__.py ---------------------------------------------------------------------- diff --git a/tests/contrib/minikube_tests/__init__.py b/tests/contrib/minikube_tests/__init__.py new file mode 100644 index 0000000..9d7677a --- /dev/null +++ b/tests/contrib/minikube_tests/__init__.py @@ -0,0 +1,13 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/cde3a5fe/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py ---------------------------------------------------------------------- diff --git a/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py b/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py new file mode 100644 index 0000000..18e614b --- /dev/null +++ b/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py @@ -0,0 +1,78 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import unittest +from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator +from airflow import AirflowException +from subprocess import check_call, CalledProcessError + + +try: + check_call(["kubectl", "get", "pods"]) +except CalledProcessError: + raise unittest.SkipTest( + "Kubernetes integration tests require a minikube cluster; Skipping tests" + ) + + +class KubernetesPodOperatorTest(unittest.TestCase): + + def test_working_pod(self): + k = KubernetesPodOperator(namespace='default', + image="ubuntu:16.04", + cmds=["bash", "-cx"], + arguments=["echo", "10"], + labels={"foo": "bar"}, + name="test", + task_id="task" + ) + + k.execute(None) + + def test_faulty_image(self): + bad_image_name = "foobar" + k = KubernetesPodOperator(namespace='default', + image=bad_image_name, + cmds=["bash", "-cx"], + arguments=["echo", "10"], + labels={"foo": "bar"}, + name="test", + task_id="task", + startup_timeout_seconds=5 + ) + with self.assertRaises(AirflowException) as cm: + k.execute(None), + + print("exception: {}".format(cm)) + + def test_pod_failure(self): + """ + Tests that the task fails when a pod reports a failure + """ + + bad_internal_command = "foobar" + k = KubernetesPodOperator(namespace='default', + image="ubuntu:16.04", + cmds=["bash", "-cx"], + arguments=[bad_internal_command, "10"], + labels={"foo": "bar"}, + name="test", + task_id="task" + ) + + with self.assertRaises(AirflowException): + k.execute(None) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/cde3a5fe/tests/contrib/operators/test_kubernetes_pod_operator.py ---------------------------------------------------------------------- diff --git a/tests/contrib/operators/test_kubernetes_pod_operator.py b/tests/contrib/operators/test_kubernetes_pod_operator.py deleted file mode 100644 index 205f183..0000000 --- a/tests/contrib/operators/test_kubernetes_pod_operator.py +++ /dev/null @@ -1,69 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -import unittest -from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator -from airflow import AirflowException - - -class KubernetesPodOperatorTest(unittest.TestCase): - - def test_working_pod(self): - k = KubernetesPodOperator(namespace='default', - image="ubuntu:16.04", - cmds=["bash", "-cx"], - arguments=["echo", "10"], - labels={"foo": "bar"}, - name="test", - task_id="task" - ) - - k.execute(None) - - def test_faulty_image(self): - bad_image_name = "foobar" - k = KubernetesPodOperator(namespace='default', - image=bad_image_name, - cmds=["bash", "-cx"], - arguments=["echo", "10"], - labels={"foo": "bar"}, - name="test", - task_id="task", - startup_timeout_seconds=5 - ) - with self.assertRaises(AirflowException) as cm: - k.execute(None), - - print("exception: {}".format(cm)) - - def test_pod_failure(self): - """ - Tests that the task fails when a pod reports a failure - """ - - bad_internal_command = "foobar" - k = KubernetesPodOperator(namespace='default', - image="ubuntu:16.04", - cmds=["bash", "-cx"], - arguments=[bad_internal_command, "10"], - labels={"foo": "bar"}, - name="test", - task_id="task" - ) - - with self.assertRaises(AirflowException): - k.execute(None)
