[AIRFLOW-1517] addressed PR comments
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/d5b13a3d Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/d5b13a3d Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/d5b13a3d Branch: refs/heads/master Commit: d5b13a3dadb4632006fabfd96ea062dd70f612de Parents: 12b725d Author: Daniel Imberman <[email protected]> Authored: Tue Jan 2 09:38:30 2018 -0800 Committer: Daniel Imberman <[email protected]> Committed: Thu Jan 11 15:29:27 2018 -0800 ---------------------------------------------------------------------- airflow/contrib/kubernetes/kube_client.py | 1 + airflow/contrib/kubernetes/pod.py | 2 -- airflow/contrib/kubernetes/pod_launcher.py | 8 +++-- docs/kubernetes.rst | 28 ++++++--------- .../ci/kubernetes/minikube/start_minikube.sh | 36 ++++++++++---------- .../test_kubernetes_pod_operator.py | 6 ++-- 6 files changed, 39 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d5b13a3d/airflow/contrib/kubernetes/kube_client.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/kubernetes/kube_client.py b/airflow/contrib/kubernetes/kube_client.py index ecb3d55..d1a63a2 100644 --- a/airflow/contrib/kubernetes/kube_client.py +++ b/airflow/contrib/kubernetes/kube_client.py @@ -25,6 +25,7 @@ def _load_kube_config(in_cluster): config.load_kube_config() return client.CoreV1Api() + def get_kube_client(in_cluster=True): # TODO: This should also allow people to point to a cluster. return _load_kube_config(in_cluster) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d5b13a3d/airflow/contrib/kubernetes/pod.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/kubernetes/pod.py b/airflow/contrib/kubernetes/pod.py index d80b626..b4eb5a1 100644 --- a/airflow/contrib/kubernetes/pod.py +++ b/airflow/contrib/kubernetes/pod.py @@ -53,8 +53,6 @@ class Pod: successful execution of the pod :type result: any """ - pod_timeout = 3600 - def __init__( self, image, http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d5b13a3d/airflow/contrib/kubernetes/pod_launcher.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/kubernetes/pod_launcher.py b/airflow/contrib/kubernetes/pod_launcher.py index d2d3af7..51f443b 100644 --- a/airflow/contrib/kubernetes/pod_launcher.py +++ b/airflow/contrib/kubernetes/pod_launcher.py @@ -25,7 +25,7 @@ from airflow.contrib.kubernetes.kubernetes_request_factory import \ from kubernetes import watch from kubernetes.client.rest import ApiException from airflow import AirflowException - +from requests.exceptions import HTTPError from .kube_client import get_kube_client @@ -102,7 +102,11 @@ class PodLauncher(LoggingMixin): return state != State.SUCCESS and state != State.FAILED def read_pod(self, pod): - return self._client.read_namespaced_pod(pod.name, pod.namespace) + try: + return self._client.read_namespaced_pod(pod.name, pod.namespace) + except HTTPError as e: + raise AirflowException("There was an error reading the kubernetes API: {}" + .format(e)) def process_status(self, job_id, status): status = status.lower() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d5b13a3d/docs/kubernetes.rst ---------------------------------------------------------------------- diff --git a/docs/kubernetes.rst b/docs/kubernetes.rst index 8d57028..21d8501 100644 --- a/docs/kubernetes.rst +++ b/docs/kubernetes.rst @@ -17,20 +17,14 @@ Kubernetes Operator -+--------------+----------------------------------------------------------------+---------------+ -| name | description | -+==============+================================================================+===============+ -| ``@namespace`` | The namespace is your isolated work environment within kubernetes| -+--------------+----------------------------------------------------------------+---------------+ -| ``@image`` | docker image you wish to launch. Defaults to dockerhub.io, but fully qualified URLS will point to custom repositories | -+--------------+----------------------------------------------------------------+---------------+ -| ``@cmnds`` | To start a task in a docker image, we need to tell it what to do. the cmds array is the space seperated bash command that will define the task completed by the container | -+--------------+----------------------------------------------------------------+---------------+ -| ``arguments`` | arguments for your bash command | -+--------------+----------------------------------------------------------------+---------------+ -| ``@labels`` | Labels are an important element of launching kubernetes pods, as it tells -| | kubernetes what pods a service can route to. For example, if you launch 5 postgres pods with the label {'postgres':'foo'} | -| | and create a postgres service with the same label, kubernetes will know that any time that service is queried, it can pick any of those 5 postgres instances as the endpoint for that service. | -+--------------+----------------------------------------------------------------+---------------+ -| ``@name`` | name of the task you want to run, will be used to generate a pod id | -+--------------+----------------------------------------------------------------+---------------+ +================================= ==================================== +Variable Description +================================= ==================================== +``@namespace`` The namespace is your isolated work environment within kubernetes +``@image`` docker image you wish to launch. Defaults to dockerhub.io, but fully qualified URLS will point to custom repositories + ``@cmds`` To start a task in a docker image, we need to tell it what to do. the cmds array is the space seperated bash command that will define the task completed by the container +``arguments`` arguments for your bash command +``@labels`` Labels are an important element of launching kubernetes pods, as it tells kubernetes what pods a service can route to. For example, if you launch 5 postgres pods with the label {'postgres':'foo'} and create a postgres service with the same label, kubernetes will know that any time that service is queried, it can pick any of those 5 postgres instances as the endpoint for that service. +``@name`` name of the task you want to run, will be used to generate a pod id +================================= ==================================== + http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d5b13a3d/scripts/ci/kubernetes/minikube/start_minikube.sh ---------------------------------------------------------------------- diff --git a/scripts/ci/kubernetes/minikube/start_minikube.sh b/scripts/ci/kubernetes/minikube/start_minikube.sh index 1da23d0..349b210 100755 --- a/scripts/ci/kubernetes/minikube/start_minikube.sh +++ b/scripts/ci/kubernetes/minikube/start_minikube.sh @@ -1,21 +1,21 @@ -# Licensed to the Apache Software Foundation (ASF) under one * -# or more contributor license agreements. See the NOTICE file * -# distributed with this work for additional information * -# regarding copyright ownership. The ASF licenses this file * -# to you under the Apache License, Version 2.0 (the * -# "License"); you may not use this file except in compliance * -# with the License. You may obtain a copy of the License at * -# * -# http://www.apache.org/licenses/LICENSE-2.0 * -# * -# Unless required by applicable law or agreed to in writing, * -# software distributed under the License is distributed on an * -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * -# KIND, either express or implied. See the License for the * -# specific language governing permissions and limitations * -# under the License. * - #!/usr/bin/env bash +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + # Guard against a kubernetes cluster already being up kubectl get pods &> /dev/null if [ $? -eq 0 ]; then @@ -54,7 +54,7 @@ start_minikube(){ # We do not need dynamic hostpath provisioning, so disable the default storageclass sudo -E minikube addons disable default-storageclass && kubectl delete storageclasses --all - # We need to give permission to watch pods to the airflow scheduler. + # We need to give permission to watch pods to the airflow scheduler. # The easiest way to do that is by giving admin access to the default serviceaccount (NOT SAFE!) kubectl create clusterrolebinding add-on-cluster-admin --clusterrole=cluster-admin --serviceaccount=default:default exit 0 http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d5b13a3d/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py ---------------------------------------------------------------------- diff --git a/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py b/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py index 5cdd819..a9a8e97 100644 --- a/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py +++ b/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py @@ -19,16 +19,16 @@ import unittest from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator from airflow import AirflowException from subprocess import check_call +import logging try: check_call(["kubectl", "get", "pods"]) -except: +except Exception as e: raise unittest.SkipTest( - "Kubernetes integration tests require a minikube cluster; Skipping tests" + "Kubernetes integration tests require a minikube cluster; Skipping tests {}".format(e) ) - class KubernetesPodOperatorTest(unittest.TestCase): def test_working_pod(self):
