[AIRFLOW-1314] Improve error handling: handle too-old resource versions and throw exceptions on errors
- K8s API errors will now throw Airflow exceptions - Add scheduler uuid to worker pod labels to match the two Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/317b6c7b Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/317b6c7b Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/317b6c7b Branch: refs/heads/master Commit: 317b6c7bd54099ac3f38ce895fd6ec571635312c Parents: cdb43cb Author: Jordan Zucker <[email protected]> Authored: Tue Apr 10 21:56:14 2018 -0700 Committer: Fokko Driesprong <[email protected]> Committed: Sun Apr 22 10:23:06 2018 +0200 ---------------------------------------------------------------------- .../contrib/executors/kubernetes_executor.py | 58 +++++++++++++++----- .../pod_request_factory.py | 2 +- airflow/contrib/kubernetes/pod_generator.py | 7 ++- .../contrib/kubernetes/worker_configuration.py | 4 +- ...215c0_add_kubernetes_scheduler_uniqueness.py | 49 +++++++++++++++++ airflow/models.py | 34 ++++++++++++ .../ci/kubernetes/kube/airflow.yaml.template | 17 +++++- .../ci/kubernetes/minikube/start_minikube.sh | 2 +- 8 files changed, 150 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/317b6c7b/airflow/contrib/executors/kubernetes_executor.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/executors/kubernetes_executor.py b/airflow/contrib/executors/kubernetes_executor.py index b497387..1a50d85 100644 --- a/airflow/contrib/executors/kubernetes_executor.py +++ b/airflow/contrib/executors/kubernetes_executor.py @@ -28,10 +28,10 @@ from airflow.contrib.kubernetes.kube_client import get_kube_client from airflow.contrib.kubernetes.worker_configuration import WorkerConfiguration from airflow.executors.base_executor import BaseExecutor from airflow.executors import 
Executors -from airflow.models import TaskInstance, KubeResourceVersion +from airflow.models import TaskInstance, KubeResourceVersion, KubeWorkerIdentifier from airflow.utils.state import State from airflow import configuration, settings -from airflow.exceptions import AirflowConfigException +from airflow.exceptions import AirflowConfigException, AirflowException from airflow.utils.log.logging_mixin import LoggingMixin @@ -197,9 +197,10 @@ class KubeConfig: class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin, object): - def __init__(self, namespace, watcher_queue, resource_version): + def __init__(self, namespace, watcher_queue, resource_version, worker_uuid): multiprocessing.Process.__init__(self) self.namespace = namespace + self.worker_uuid = worker_uuid self.watcher_queue = watcher_queue self.resource_version = resource_version @@ -207,7 +208,8 @@ class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin, object): kube_client = get_kube_client() while True: try: - self.resource_version = self._run(kube_client, self.resource_version) + self.resource_version = self._run(kube_client, self.resource_version, + self.worker_uuid) except Exception: self.log.exception("Unknown error in KubernetesJobWatcher. 
Failing") raise @@ -215,13 +217,13 @@ class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin, object): self.log.warn("Watch died gracefully, starting back up with: " "last resource_version: {}".format(self.resource_version)) - def _run(self, kube_client, resource_version): + def _run(self, kube_client, resource_version, worker_uuid): self.log.info( "Event: and now my watch begins starting at resource_version: {}" .format(resource_version)) watcher = watch.Watch() - kwargs = {"label_selector": "airflow-slave"} + kwargs = {"label_selector": "airflow-worker={}".format(worker_uuid)} if resource_version: kwargs["resource_version"] = resource_version @@ -232,6 +234,8 @@ class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin, object): self.log.info( "Event: {} had an event of type {}".format(task.metadata.name, event['type'])) + if event['type'] == 'ERROR': + return self.process_error(event) self.process_status( task.metadata.name, task.status.phase, task.metadata.labels, task.metadata.resource_version @@ -240,6 +244,19 @@ class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin, object): return last_resource_version + def process_error(self, event): + self.log.error("Encountered Error response from k8s list namespaced pod " + "stream => {}".format(event)) + raw_object = event['raw_object'] + if raw_object['code'] == 410: + self.log.info('Kubernetes resource version is too old, must ' + 'reset to 0 => {}'.format(raw_object['message'])) + # Return resource version 0 + return '0' + raise AirflowException( + 'Kubernetes failure for {} with code {} and message: {}' + .format(raw_object['reason'], raw_object['code'], raw_object['message'])) + def process_status(self, pod_id, status, labels, resource_version): if status == 'Pending': self.log.info("Event: {} Pending".format(pod_id)) @@ -258,7 +275,8 @@ class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin, object): class AirflowKubernetesScheduler(LoggingMixin, object): - def 
__init__(self, kube_config, task_queue, result_queue, session, kube_client): + def __init__(self, kube_config, task_queue, result_queue, + session, kube_client, worker_uuid): self.log.debug("creating kubernetes executor") self.kube_config = kube_config self.task_queue = task_queue @@ -270,12 +288,13 @@ class AirflowKubernetesScheduler(LoggingMixin, object): self.worker_configuration = WorkerConfiguration(kube_config=self.kube_config) self.watcher_queue = multiprocessing.Queue() self._session = session + self.worker_uuid = worker_uuid self.kube_watcher = self._make_kube_watcher() def _make_kube_watcher(self): resource_version = KubeResourceVersion.get_current_resource_version(self._session) watcher = KubernetesJobWatcher(self.namespace, self.watcher_queue, - resource_version) + resource_version, self.worker_uuid) watcher.start() return watcher @@ -302,7 +321,8 @@ class AirflowKubernetesScheduler(LoggingMixin, object): self.log.debug("k8s: running for command {}".format(command)) self.log.debug("k8s: launching image {}".format(self.kube_config.kube_image)) pod = self.worker_configuration.make_pod( - namespace=self.namespace, pod_id=self._create_pod_id(dag_id, task_id), + namespace=self.namespace, worker_uuid=self.worker_uuid, + pod_id=self._create_pod_id(dag_id, task_id), dag_id=dag_id, task_id=task_id, execution_date=self._datetime_to_label_safe_datestring(execution_date), airflow_command=command, kube_executor_config=kube_executor_config @@ -436,6 +456,7 @@ class KubernetesExecutor(BaseExecutor, LoggingMixin): self.result_queue = None self.kube_scheduler = None self.kube_client = None + self.worker_uuid = None super(KubernetesExecutor, self).__init__(parallelism=self.kube_config.parallelism) def clear_not_launched_queued_tasks(self): @@ -461,11 +482,11 @@ class KubernetesExecutor(BaseExecutor, LoggingMixin): len(queued_tasks))) for t in queued_tasks: - kwargs = dict(label_selector="dag_id={},task_id={},execution_date={}".format( - t.dag_id, t.task_id, - 
AirflowKubernetesScheduler._datetime_to_label_safe_datestring( - t.execution_date) - )) + dict_string = "dag_id={},task_id={},execution_date={},airflow-worker={}"\ + .format(t.dag_id, t.task_id, + AirflowKubernetesScheduler._datetime_to_label_safe_datestring( + t.execution_date), self.worker_uuid) + kwargs = dict(label_selector=dict_string) pod_list = self.kube_client.list_namespaced_pod( self.kube_config.kube_namespace, **kwargs) if len(pod_list.items) == 0: @@ -516,12 +537,19 @@ class KubernetesExecutor(BaseExecutor, LoggingMixin): def start(self): self.log.info('k8s: starting kubernetes executor') self._session = settings.Session() + self.worker_uuid = KubeWorkerIdentifier.get_or_create_current_kube_worker_uuid( + self._session) + self.log.debug('k8s: starting with worker_uuid: {}'.format(self.worker_uuid)) + # always need to reset resource version since we don't know + # when we last started, note for behavior below + # https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/CoreV1Api.md#list_namespaced_pod + KubeResourceVersion.reset_resource_version(self._session) self.task_queue = Queue() self.result_queue = Queue() self.kube_client = get_kube_client() self.kube_scheduler = AirflowKubernetesScheduler( self.kube_config, self.task_queue, self.result_queue, self._session, - self.kube_client + self.kube_client, self.worker_uuid ) self._inject_secrets() self.clear_not_launched_queued_tasks() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/317b6c7b/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py b/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py index 106a6be..0f06d49 100644 --- a/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py +++ 
b/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py @@ -31,7 +31,7 @@ metadata: spec: containers: - name: base - image: airflow-slave:latest + image: airflow-worker:latest command: ["/usr/local/airflow/entrypoint.sh", "/bin/bash sleep 25"] restartPolicy: Never """ http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/317b6c7b/airflow/contrib/kubernetes/pod_generator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/kubernetes/pod_generator.py b/airflow/contrib/kubernetes/pod_generator.py index cf85092..82ac7c4 100644 --- a/airflow/contrib/kubernetes/pod_generator.py +++ b/airflow/contrib/kubernetes/pod_generator.py @@ -201,9 +201,9 @@ class WorkerGenerator(PodGenerator): volumes[0]["emptyDir"] = {} return volumes, volume_mounts - def _init_labels(self, dag_id, task_id, execution_date): + def _init_labels(self, dag_id, task_id, execution_date, worker_uuid): return { - "airflow-slave": "", + "airflow-worker": worker_uuid, "dag_id": dag_id, "task_id": task_id, "execution_date": execution_date @@ -264,6 +264,7 @@ class WorkerGenerator(PodGenerator): def make_worker_pod(self, namespace, + worker_uuid, pod_id, dag_id, task_id, @@ -271,7 +272,7 @@ class WorkerGenerator(PodGenerator): airflow_command, kube_executor_config): cmds = ["bash", "-cx", "--"] - labels = self._init_labels(dag_id, task_id, execution_date) + labels = self._init_labels(dag_id, task_id, execution_date, worker_uuid) PodGenerator.make_pod(self, namespace=namespace, pod_id=pod_id, http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/317b6c7b/airflow/contrib/kubernetes/worker_configuration.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/kubernetes/worker_configuration.py b/airflow/contrib/kubernetes/worker_configuration.py index 988f4a5..e97f5a3 100644 --- a/airflow/contrib/kubernetes/worker_configuration.py +++ 
b/airflow/contrib/kubernetes/worker_configuration.py @@ -139,7 +139,7 @@ class WorkerConfiguration: return [] return self.kube_config.image_pull_secrets.split(',') - def make_pod(self, namespace, pod_id, dag_id, task_id, execution_date, + def make_pod(self, namespace, worker_uuid, pod_id, dag_id, task_id, execution_date, airflow_command, kube_executor_config): volumes, volume_mounts = self._get_volumes_and_mounts() worker_init_container_spec = self._get_init_containers( @@ -162,7 +162,7 @@ class WorkerConfiguration: cmds=["bash", "-cx", "--"], args=[airflow_command], labels={ - "airflow-slave": "", + "airflow-worker": worker_uuid, "dag_id": dag_id, "task_id": task_id, "execution_date": execution_date http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/317b6c7b/airflow/migrations/versions/86770d1215c0_add_kubernetes_scheduler_uniqueness.py ---------------------------------------------------------------------- diff --git a/airflow/migrations/versions/86770d1215c0_add_kubernetes_scheduler_uniqueness.py b/airflow/migrations/versions/86770d1215c0_add_kubernetes_scheduler_uniqueness.py new file mode 100644 index 0000000..6bc48f1 --- /dev/null +++ b/airflow/migrations/versions/86770d1215c0_add_kubernetes_scheduler_uniqueness.py @@ -0,0 +1,49 @@ +# flake8: noqa +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +"""add kubernetes scheduler uniqueness + +Revision ID: 86770d1215c0 +Revises: 27c6a30d7c24 +Create Date: 2018-04-03 15:31:20.814328 + +""" + +# revision identifiers, used by Alembic. +revision = '86770d1215c0' +down_revision = '27c6a30d7c24' +branch_labels = None +depends_on = None + +from alembic import op +import sqlalchemy as sa + + +RESOURCE_TABLE = "kube_worker_uuid" + + +def upgrade(): + table = op.create_table( + RESOURCE_TABLE, + sa.Column("one_row_id", sa.Boolean, server_default=sa.true(), primary_key=True), + sa.Column("worker_uuid", sa.String(255)), + sa.CheckConstraint("one_row_id", name="kube_worker_one_row_id") + ) + op.bulk_insert(table, [ + {"worker_uuid": ""} + ]) + + +def downgrade(): + op.drop_table(RESOURCE_TABLE) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/317b6c7b/airflow/models.py ---------------------------------------------------------------------- diff --git a/airflow/models.py b/airflow/models.py index 2de1ade..18e9e26 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -50,6 +50,7 @@ import traceback import warnings import hashlib +import uuid from datetime import datetime from urllib.parse import urlparse, quote @@ -5137,3 +5138,36 @@ class KubeResourceVersion(Base): KubeResourceVersion.resource_version: resource_version }) session.commit() + + @staticmethod + @provide_session + def reset_resource_version(session=None): + session.query(KubeResourceVersion).update({ + KubeResourceVersion.resource_version: '0' + }) + session.commit() + return '0' + + +class KubeWorkerIdentifier(Base): + __tablename__ = "kube_worker_uuid" + one_row_id = Column(Boolean, server_default=sqltrue(), primary_key=True) + worker_uuid = Column(String(255)) + + @staticmethod + @provide_session + def get_or_create_current_kube_worker_uuid(session=None): + (worker_uuid,) = session.query(KubeWorkerIdentifier.worker_uuid).one() + if worker_uuid == '': + worker_uuid = str(uuid.uuid4()) + 
KubeWorkerIdentifier.checkpoint_kube_worker_uuid(worker_uuid, session) + return worker_uuid + + @staticmethod + @provide_session + def checkpoint_kube_worker_uuid(worker_uuid, session=None): + if worker_uuid: + session.query(KubeWorkerIdentifier).update({ + KubeWorkerIdentifier.worker_uuid: worker_uuid + }) + session.commit() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/317b6c7b/scripts/ci/kubernetes/kube/airflow.yaml.template ---------------------------------------------------------------------- diff --git a/scripts/ci/kubernetes/kube/airflow.yaml.template b/scripts/ci/kubernetes/kube/airflow.yaml.template index ae00983..e16cc08 100644 --- a/scripts/ci/kubernetes/kube/airflow.yaml.template +++ b/scripts/ci/kubernetes/kube/airflow.yaml.template @@ -42,6 +42,21 @@ spec: requests: storage: 10Gi --- +apiVersion: rbac.authorization.k8s.io/v1beta1 +kind: ClusterRoleBinding +metadata: + name: admin-rbac +subjects: + - kind: ServiceAccount + # Reference to upper's `metadata.name` + name: default + # Reference to upper's `metadata.namespace` + namespace: default +roleRef: + kind: ClusterRole + name: cluster-admin + apiGroup: rbac.authorization.k8s.io +--- apiVersion: extensions/v1beta1 kind: Deployment metadata: @@ -173,7 +188,7 @@ data: dags_folder = /root/airflow/dags base_log_folder = /root/airflow/logs logging_level = INFO - executor = KubernetesExecutor + executor = KubernetesExecutor parallelism = 32 plugins_folder = /root/airflow/plugins sql_alchemy_conn = $SQL_ALCHEMY_CONN http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/317b6c7b/scripts/ci/kubernetes/minikube/start_minikube.sh ---------------------------------------------------------------------- diff --git a/scripts/ci/kubernetes/minikube/start_minikube.sh b/scripts/ci/kubernetes/minikube/start_minikube.sh index be370cf..525529b 100755 --- a/scripts/ci/kubernetes/minikube/start_minikube.sh +++ b/scripts/ci/kubernetes/minikube/start_minikube.sh @@ -23,7 +23,7 @@ if [ $? 
-eq 0 ]; then fi # -curl -Lo minikube https://storage.googleapis.com/minikube/releases/v0.24.1/minikube-linux-amd64 && chmod +x minikube +curl -Lo minikube https://storage.googleapis.com/minikube/releases/v0.26.0/minikube-linux-amd64 && chmod +x minikube curl -Lo kubectl https://storage.googleapis.com/kubernetes-release/release/${KUBERNETES_VERSION}/bin/linux/amd64/kubectl && chmod +x kubectl sudo mkdir -p /usr/local/bin
