[AIRFLOW=1314] Basic Kubernetes Mode
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/58213208 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/58213208 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/58213208 Branch: refs/heads/master Commit: 582132088079a6ca89199af4f41944259565287a Parents: f520990 Author: dimberman <[email protected]> Authored: Tue Jun 27 09:55:19 2017 -0700 Committer: Fokko Driesprong <[email protected]> Committed: Sun Apr 22 10:17:39 2018 +0200 ---------------------------------------------------------------------- airflow/__init__.py | 1 + .../contrib/executors/kubernetes_executor.py | 252 ++++++++++++++++++ airflow/contrib/kubernetes/__init__.py | 29 +- airflow/contrib/kubernetes/kubernetes_helper.py | 35 +++ .../kubernetes/kubernetes_job_builder.py | 74 ++++++ .../kubernetes/kubernetes_pod_builder.py | 74 ++++++ .../kubernetes_request_factory/__init__.py | 28 +- .../job_request_factory.py | 60 +++++ .../kubernetes_request_factory.py | 234 +++++++--------- .../pod_request_factory.py | 94 ++++--- airflow/contrib/kubernetes/pod.py | 147 +++++------ airflow/contrib/kubernetes/pod_launcher.py | 264 ++++++++++--------- .../operators/k8s_pod_operator/__init__.py | 13 + .../k8s_pod_operator/k8s_pod_operator.py | 126 +++++++++ .../operators/k8s_pod_operator/op_context.py | 104 ++++++++ airflow/dag_importer/__init__.py | 83 ++++++ airflow/executors/__init__.py | 9 +- airflow/executors/base_executor.py | 20 +- airflow/models.py | 22 ++ airflow/plugins_manager.py | 1 + kubectl | 0 scripts/ci/kubernetes/docker/airflow.tar.gz | Bin 0 -> 2374737 bytes .../ci/kubernetes/kube/.generated/airflow.yaml | 195 ++++++++++++++ scripts/ci/requirements.txt | 96 +++++++ tests/contrib/__init__.py | 2 +- tests/contrib/kubernetes/__init__.py | 14 + tests/contrib/kubernetes/test_kubernetes_job.py | 12 + .../kubernetes/test_kubernetes_job_launcher.py | 59 +++++ 28 files changed, 
1625 insertions(+), 423 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/58213208/airflow/__init__.py ---------------------------------------------------------------------- diff --git a/airflow/__init__.py b/airflow/__init__.py index f40b08a..296b67c 100644 --- a/airflow/__init__.py +++ b/airflow/__init__.py @@ -85,6 +85,7 @@ from airflow import sensors # noqa: E402 from airflow import hooks from airflow import executors from airflow import macros +from airflow import contrib operators._integrate_plugins() sensors._integrate_plugins() # noqa: E402 http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/58213208/airflow/contrib/executors/kubernetes_executor.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/executors/kubernetes_executor.py b/airflow/contrib/executors/kubernetes_executor.py new file mode 100644 index 0000000..0a3e9f2 --- /dev/null +++ b/airflow/contrib/executors/kubernetes_executor.py @@ -0,0 +1,252 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import calendar +import logging +import time +import os +import multiprocessing +from airflow.contrib.kubernetes.kubernetes_pod_builder import KubernetesPodBuilder +from airflow.contrib.kubernetes.kubernetes_helper import KubernetesHelper +from queue import Queue +from kubernetes import watch +from airflow import settings +from airflow.contrib.kubernetes.kubernetes_request_factory import SimplePodRequestFactory +from airflow.executors.base_executor import BaseExecutor +from airflow.models import TaskInstance +from airflow.utils.state import State +from airflow import configuration +import json +# TODO this is just for proof of concept. remove before merging. + + + + +def _prep_command_for_container(command): + """ + When creating a kubernetes pod, the yaml expects the command + in the form of ["cmd","arg","arg","arg"...] + This function splits the command string into tokens + and then matches it to the convention. + + :param command: + + :return: + + """ + return '"' + '","'.join(command.split(' ')[1:]) + '"' + + +class KubernetesJobWatcher(multiprocessing.Process, object): + def __init__(self, watch_function, namespace, result_queue, watcher_queue): + self.logger = logging.getLogger(__name__) + multiprocessing.Process.__init__(self) + self.result_queue = result_queue + self._watch_function = watch_function + self._watch = watch.Watch() + self.namespace = namespace + self.watcher_queue = watcher_queue + + def run(self): + self.logger.info("Event: and now my watch begins") + self.logger.info("Event: proof of image change") + self.logger.info("Event: running {} with {}".format(str(self._watch_function), + self.namespace)) + for event in self._watch.stream(self._watch_function, self.namespace): + task= event['object'] + self.logger.info("Event: {} had an event of type {}".format(task.metadata.name, + event['type'])) + self.process_status(task.metadata.name, task.status.phase) + + def process_status(self, job_id, status): + if status == 'Pending': + 
self.logger.info("Event: {} Pending".format(job_id)) + elif status == 'Failed': + self.logger.info("Event: {} Failed".format(job_id)) + self.watcher_queue.put((job_id, State.FAILED)) + elif status == 'Succeeded': + self.logger.info("Event: {} Succeeded".format(job_id)) + self.watcher_queue.put((job_id, None)) + elif status == 'Running': + self.logger.info("Event: {} is Running".format(job_id)) + else: + self.logger.info("Event: Invalid state {} on job {}".format(status, job_id)) + + +class AirflowKubernetesScheduler(object): + def __init__(self, + task_queue, + result_queue, + running): + self.logger = logging.getLogger(__name__) + self.logger.info("creating kubernetes executor") + self.task_queue = task_queue + self.namespace = os.environ['k8s_POD_NAMESPACE'] + self.logger.info("k8s: using namespace {}".format(self.namespace)) + self.result_queue = result_queue + self.current_jobs = {} + self.running = running + self._task_counter = 0 + self.watcher_queue = multiprocessing.Queue() + self.helper = KubernetesHelper() + w = KubernetesJobWatcher(self.helper.pod_api.list_namespaced_pod, self.namespace, + self.result_queue, self.watcher_queue) + w.start() + + def run_next(self, next_job): + """ + + The run_next command will check the task_queue for any un-run jobs. 
+ It will then create a unique job-id, launch that job in the cluster, + and store relevent info in the current_jobs map so we can track the job's + status + + :return: + + """ + self.logger.info('k8s: job is {}'.format(str(next_job))) + (key, command) = next_job + self.logger.info("running for command {}".format(command)) + epoch_time = calendar.timegm(time.gmtime()) + command_list = ["/usr/local/airflow/entrypoint.sh"] + command.split()[1:] + \ + ['-km'] + self._set_host_id(key) + pod_id = self._create_job_id_from_key(key=key, epoch_time=epoch_time) + self.current_jobs[pod_id] = key + + image = configuration.get('core','k8s_image') + print("k8s: launching image {}".format(image)) + pod = KubernetesPodBuilder( + image= image, + cmds=command_list, + kub_req_factory=SimplePodRequestFactory(), + namespace=self.namespace) + pod.add_name(pod_id) + pod.launch() + self._task_counter += 1 + + self.logger.info("k8s: Job created!") + + def delete_job(self, key): + job_id = self.current_jobs[key] + self.helper.delete_job(job_id, namespace=self.namespace) + + def sync(self): + """ + + The sync function checks the status of all currently running kubernetes jobs. + If a job is completed, it's status is placed in the result queue to + be sent back to the scheduler. + + :return: + + """ + while not self.watcher_queue.empty(): + self.end_task() + + def end_task(self): + job_id, state = self.watcher_queue.get() + if job_id in self.current_jobs: + key = self.current_jobs[job_id] + self.logger.info("finishing job {}".format(key)) + if state: + self.result_queue.put((key, state)) + self.current_jobs.pop(job_id) + self.running.pop(key) + + def _create_job_id_from_key(self, key, epoch_time): + """ + + Kubernetes pod names must unique and match specific conventions + (i.e. no spaces, period, etc.) 
+ This function creates a unique name using the epoch time and internal counter + + :param key: + + :param epoch_time: + + :return: + + """ + + keystr = '-'.join([str(x).replace(' ', '-') for x in key[:2]]) + job_fields = [keystr, str(self._task_counter), str(epoch_time)] + unformatted_job_id = '-'.join(job_fields) + job_id = unformatted_job_id.replace('_', '-') + return job_id + + def _set_host_id(self, key): + (dag_id, task_id, ex_time) = key + session = settings.Session() + item = session.query(TaskInstance) \ + .filter_by(dag_id=dag_id, task_id=task_id, execution_date=ex_time).one() + + host_id = item.hostname + print("host is {}".format(host_id)) + + +class KubernetesExecutor(BaseExecutor): + + def start(self): + self.logger.info('k8s: starting kubernetes executor') + self.task_queue = Queue() + self._session = settings.Session() + self.result_queue = Queue() + self.kub_client = AirflowKubernetesScheduler(self.task_queue, + self.result_queue, + running=self.running) + + def sync(self): + self.kub_client.sync() + while not self.result_queue.empty(): + results = self.result_queue.get() + self.logger.info("reporting {}".format(results)) + self.change_state(*results) + + # TODO this could be a job_counter based on max jobs a user wants + if len(self.kub_client.current_jobs) > 3: + self.logger.info("currently a job is running") + else: + self.logger.info("queue ready, running next") + if not self.task_queue.empty(): + (key, command) = self.task_queue.get() + self.kub_client.run_next((key, command)) + + def terminate(self): + pass + + def change_state(self, key, state): + self.logger.info("k8s: setting state of {} to {}".format(key, state)) + if state != State.RUNNING: + self.kub_client.delete_job(key) + self.running.pop(key) + self.event_buffer[key] = state + (dag_id, task_id, ex_time) = key + item = self._session.query(TaskInstance).filter_by( + dag_id=dag_id, + task_id=task_id, + execution_date=ex_time).one() + + if item.state == State.RUNNING or item.state == 
State.QUEUED: + item.state = state + self._session.add(item) + self._session.commit() + + def end(self): + self.logger.info('ending kube executor') + self.task_queue.join() + + def execute_async(self, key, command, queue=None): + self.logger.info("k8s: adding task {} with command {}".format(key, command)) + self.task_queue.put((key, command)) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/58213208/airflow/contrib/kubernetes/__init__.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/kubernetes/__init__.py b/airflow/contrib/kubernetes/__init__.py index 13a8339..59eeddf 100644 --- a/airflow/contrib/kubernetes/__init__.py +++ b/airflow/contrib/kubernetes/__init__.py @@ -1,16 +1,17 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at +# -*- coding: utf-8 -*- # -# http://www.apache.org/licenses/LICENSE-2.0 +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at # -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
+# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from airflow import dag_importer + +dag_importer.import_dags() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/58213208/airflow/contrib/kubernetes/kubernetes_helper.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/kubernetes/kubernetes_helper.py b/airflow/contrib/kubernetes/kubernetes_helper.py new file mode 100644 index 0000000..862d76a --- /dev/null +++ b/airflow/contrib/kubernetes/kubernetes_helper.py @@ -0,0 +1,35 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import yaml +from kubernetes import client, config + + +class KubernetesHelper(object): + def __init__(self): + config.load_incluster_config() + self.job_api = client.BatchV1Api() + self.pod_api = client.CoreV1Api() + + def launch_job(self, pod_info, namespace): + dep = yaml.load(pod_info) + resp = self.job_api.create_namespaced_job(body=dep, namespace=namespace) + return resp + + def get_status(self, pod_id, namespace): + return self.job_api.read_namespaced_job(pod_id, namespace).status + + def delete_job(self, job_id, namespace): + body = client.V1DeleteOptions() + self.job_api.delete_namespaced_job(name=job_id, namespace=namespace, body=body) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/58213208/airflow/contrib/kubernetes/kubernetes_job_builder.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/kubernetes/kubernetes_job_builder.py b/airflow/contrib/kubernetes/kubernetes_job_builder.py new file mode 100644 index 0000000..65237ff --- /dev/null +++ b/airflow/contrib/kubernetes/kubernetes_job_builder.py @@ -0,0 +1,74 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and + +from kubernetes import client, config +import json +import logging + + +class KubernetesJobBuilder: + def __init__( + self, + image, + cmds, + namespace, + kub_req_factory=None + ): + self.image = image + self.cmds = cmds + self.kub_req_factory = kub_req_factory + self.namespace = namespace + self.logger = logging.getLogger(self.__class__.__name__) + self.envs = {} + self.labels = {} + self.secrets = {} + self.node_selectors = [] + self.name = None + + def add_env_variables(self, env): + self.envs = env + + def add_secrets(self, secrets): + self.secrets = secrets + + def add_labels(self, labels): + self.labels = labels + + def add_name(self, name): + self.name = name + + def set_namespace(self, namespace): + self.namespace = namespace + + def launch(self): + """ + Launches the pod synchronously and waits for completion. + """ + k8s_beta = self._kube_client() + req = self.kub_req_factory.create(self) + print(json.dumps(req)) + resp = k8s_beta.create_namespaced_job(body=req, namespace=self.namespace) + self.logger.info("Job created. 
status='%s', yaml:\n%s", + str(resp.status), str(req)) + + def _kube_client(self): + config.load_incluster_config() + return client.BatchV1Api() + + def _execution_finished(self): + k8s_beta = self._kube_client() + resp = k8s_beta.read_namespaced_job_status(self.name, namespace=self.namespace) + self.logger.info('status : ' + str(resp.status)) + if resp.status.phase == 'Failed': + raise Exception("Job " + self.name + " failed!") + return resp.status.phase != 'Running' http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/58213208/airflow/contrib/kubernetes/kubernetes_pod_builder.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/kubernetes/kubernetes_pod_builder.py b/airflow/contrib/kubernetes/kubernetes_pod_builder.py new file mode 100644 index 0000000..2b0a9e4 --- /dev/null +++ b/airflow/contrib/kubernetes/kubernetes_pod_builder.py @@ -0,0 +1,74 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and + +from kubernetes import client, config +import json +import logging + + +class KubernetesPodBuilder: + def __init__( + self, + image, + cmds, + namespace, + kub_req_factory=None + ): + self.image = image + self.cmds = cmds + self.kub_req_factory = kub_req_factory + self.namespace = namespace + self.logger = logging.getLogger(self.__class__.__name__) + self.envs = {} + self.labels = {} + self.secrets = {} + self.node_selectors = [] + self.name = None + + def add_env_variables(self, env): + self.envs = env + + def add_secrets(self, secrets): + self.secrets = secrets + + def add_labels(self, labels): + self.labels = labels + + def add_name(self, name): + self.name = name + + def set_namespace(self, namespace): + self.namespace = namespace + + def launch(self): + """ + Launches the pod synchronously and waits for completion. + """ + k8s_beta = self._kube_client() + req = self.kub_req_factory.create(self) + print(json.dumps(req)) + resp = k8s_beta.create_namespaced_pod(body=req, namespace=self.namespace) + self.logger.info("Job created. 
status='%s', yaml:\n%s", + str(resp.status), str(req)) + + def _kube_client(self): + config.load_incluster_config() + return client.CoreV1Api() + + def _execution_finished(self): + k8s_beta = self._kube_client() + resp = k8s_beta.read_namespaced_job_status(self.name, namespace=self.namespace) + self.logger.info('status : ' + str(resp.status)) + if resp.status.phase == 'Failed': + raise Exception("Job " + self.name + " failed!") + return resp.status.phase != 'Running' http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/58213208/airflow/contrib/kubernetes/kubernetes_request_factory/__init__.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/__init__.py b/airflow/contrib/kubernetes/kubernetes_request_factory/__init__.py index 13a8339..676245c 100644 --- a/airflow/contrib/kubernetes/kubernetes_request_factory/__init__.py +++ b/airflow/contrib/kubernetes/kubernetes_request_factory/__init__.py @@ -1,16 +1,16 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at +# -*- coding: utf-8 -*- # -# http://www.apache.org/licenses/LICENSE-2.0 +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at # -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. 
See the License for the -# specific language governing permissions and limitations -# under the License. +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and + +from .kubernetes_request_factory import * +from .job_request_factory import * +from .pod_request_factory import * http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/58213208/airflow/contrib/kubernetes/kubernetes_request_factory/job_request_factory.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/job_request_factory.py b/airflow/contrib/kubernetes/kubernetes_request_factory/job_request_factory.py new file mode 100644 index 0000000..fda488e --- /dev/null +++ b/airflow/contrib/kubernetes/kubernetes_request_factory/job_request_factory.py @@ -0,0 +1,60 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and + +import yaml +from .kubernetes_request_factory import * + + +class SimpleJobRequestFactory(KubernetesRequestFactory): + """ + Request generator for a simple pod. 
+ """ + + def __init__(self): + pass + + _yaml = """apiVersion: batch/v1 +kind: Job +metadata: + name: name +spec: + template: + metadata: + name: name + spec: + containers: + - name: base + image: airflow-slave:latest + command: ["/usr/local/airflow/entrypoint.sh", "/bin/bash sleep 25"] + volumeMounts: + - name: shared-data + mountPath: "/usr/local/airflow/dags" + restartPolicy: Never + """ + + def create(self, pod): + req = yaml.load(self._yaml) + sub_req = req['spec']['template'] + extract_name(pod, sub_req) + extract_labels(pod, sub_req) + extract_image(pod, sub_req) + extract_cmds(pod, sub_req) + if len(pod.node_selectors) > 0: + extract_node_selector(pod, sub_req) + extract_secrets(pod, sub_req) + print("attaching volume mounts") + attach_volume_mounts(sub_req) + return req + + + http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/58213208/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py index cbf3fce..a103fd9 100644 --- a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py +++ b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py @@ -1,165 +1,107 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. 
You may obtain a copy of the License at +# -*- coding: utf-8 -*- # -# http://www.apache.org/licenses/LICENSE-2.0 +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at # -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import logging from abc import ABCMeta, abstractmethod -import six +from airflow import dag_importer -class KubernetesRequestFactory: +class KubernetesRequestFactory(): """ - Create requests to be sent to kube API. - Extend this class to talk to kubernetes and generate your specific resources. - This is equivalent of generating yaml files that can be used by `kubectl` + Create requests to be sent to kube API. Extend this class + to talk to kubernetes and generate your specific resources. + This is equivalent of generating yaml files that can be used + by `kubectl` """ __metaclass__ = ABCMeta @abstractmethod def create(self, pod): """ - Creates the request for kubernetes API. + Creates the request for kubernetes API. 
- :param pod: The pod object + :param pod: The pod object """ pass - @staticmethod - def extract_image(pod, req): - req['spec']['containers'][0]['image'] = pod.image - - @staticmethod - def extract_image_pull_policy(pod, req): - if pod.image_pull_policy: - req['spec']['containers'][0]['imagePullPolicy'] = pod.image_pull_policy - - @staticmethod - def add_secret_to_env(env, secret): - env.append({ - 'name': secret.deploy_target, - 'valueFrom': { - 'secretKeyRef': { - 'name': secret.secret, - 'key': secret.key - } + +def extract_image(pod, req): + req['spec']['containers'][0]['image'] = pod.image + + +def add_secret_to_env(env, secret): + env.append({ + 'name': secret.deploy_target, + 'valueFrom': { + 'secretKeyRef': { + 'name': secret.secret, + 'key': secret.key } - }) + } + }) + + +def extract_labels(pod, req): + for k in pod.labels.keys(): + req['metadata']['labels'][k] = pod.labels[k] + + +def extract_cmds(pod, req): + req['spec']['containers'][0]['command'] = pod.cmds + + +def extract_node_selector(pod, req): + req['spec']['nodeSelector'] = pod.node_selectors - @staticmethod - def extract_labels(pod, req): - req['metadata']['labels'] = req['metadata'].get('labels', {}) - for k, v in six.iteritems(pod.labels): - req['metadata']['labels'][k] = v - - @staticmethod - def extract_cmds(pod, req): - req['spec']['containers'][0]['command'] = pod.cmds - - @staticmethod - def extract_args(pod, req): - req['spec']['containers'][0]['args'] = pod.args - - @staticmethod - def extract_node_selector(pod, req): - if len(pod.node_selectors) > 0: - req['spec']['nodeSelector'] = pod.node_selectors - - @staticmethod - def attach_volumes(pod, req): - req['spec']['volumes'] = pod.volumes - - @staticmethod - def attach_volume_mounts(pod, req): - if len(pod.volume_mounts) > 0: - req['spec']['containers'][0]['volumeMounts'] = ( - req['spec']['containers'][0].get('volumeMounts', [])) - req['spec']['containers'][0]['volumeMounts'].extend(pod.volume_mounts) - - @staticmethod - def 
extract_name(pod, req): - req['metadata']['name'] = pod.name - - @staticmethod - def extract_volume_secrets(pod, req): - vol_secrets = [s for s in pod.secrets if s.deploy_type == 'volume'] - if any(vol_secrets): - req['spec']['containers'][0]['volumeMounts'] = [] - req['spec']['volumes'] = [] - for idx, vol in enumerate(vol_secrets): - vol_id = 'secretvol' + str(idx) - req['spec']['containers'][0]['volumeMounts'].append({ - 'mountPath': vol.deploy_target, - 'name': vol_id, - 'readOnly': True - }) - req['spec']['volumes'].append({ - 'name': vol_id, - 'secret': { - 'secretName': vol.secret - } - }) - - @staticmethod - def extract_env_and_secrets(pod, req): - env_secrets = [s for s in pod.secrets if s.deploy_type == 'env'] - if len(pod.envs) > 0 or len(env_secrets) > 0: - env = [] - for k in pod.envs.keys(): - env.append({'name': k, 'value': pod.envs[k]}) - for secret in env_secrets: - KubernetesRequestFactory.add_secret_to_env(env, secret) - req['spec']['containers'][0]['env'] = env - - @staticmethod - def extract_resources(pod, req): - if not pod.resources or pod.resources.is_empty_resource_request(): - return - - req['spec']['containers'][0]['resources'] = {} - - if pod.resources.has_requests(): - req['spec']['containers'][0]['resources']['requests'] = {} - if pod.resources.request_memory: - req['spec']['containers'][0]['resources']['requests'][ - 'memory'] = pod.resources.request_memory - if pod.resources.request_cpu: - req['spec']['containers'][0]['resources']['requests'][ - 'cpu'] = pod.resources.request_cpu - - if pod.resources.has_limits(): - req['spec']['containers'][0]['resources']['limits'] = {} - if pod.resources.request_memory: - req['spec']['containers'][0]['resources']['limits'][ - 'memory'] = pod.resources.limit_memory - if pod.resources.request_cpu: - req['spec']['containers'][0]['resources']['limits'][ - 'cpu'] = pod.resources.limit_cpu - - @staticmethod - def extract_init_containers(pod, req): - if pod.init_containers: - 
req['spec']['initContainers'] = pod.init_containers - - @staticmethod - def extract_service_account_name(pod, req): - if pod.service_account_name: - req['spec']['serviceAccountName'] = pod.service_account_name - - @staticmethod - def extract_image_pull_secrets(pod, req): - if pod.image_pull_secrets: - req['spec']['imagePullSecrets'] = [{ - 'name': pull_secret - } for pull_secret in pod.image_pull_secrets.split(',')] + +def extract_secrets(pod, req): + env_secrets = [s for s in pod.secrets if s.deploy_type == 'env'] + if len(pod.envs) > 0 or len(env_secrets) > 0: + env = [] + for k in pod.envs.keys(): + env.append({'name': k, 'value': pod.envs[k]}) + for secret in env_secrets: + add_secret_to_env(env, secret) + req['spec']['containers'][0]['env'] = env + + +def attach_volume_mounts(req): + logging.info("preparing to import dags") + dag_importer.import_dags() + logging.info("using file mount {}".format(dag_importer.dag_import_spec)) + req['spec']['volumes'] = [dag_importer.dag_import_spec] + + +def extract_name(pod, req): + req['metadata']['name'] = pod.name + + +def extract_volume_secrets(pod, req): + vol_secrets = [s for s in pod.secrets if s.deploy_type == 'volume'] + if any(vol_secrets): + req['spec']['containers'][0]['volumeMounts'] = [] + req['spec']['volumes'] = [] + for idx, vol in enumerate(vol_secrets): + vol_id = 'secretvol' + str(idx) + req['spec']['containers'][0]['volumeMounts'].append({ + 'mountPath': vol.deploy_target, + 'name': vol_id, + 'readOnly': True + }) + req['spec']['volumes'].append({ + 'name': vol_id, + 'secret': { + 'secretName': vol.secret + } + }) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/58213208/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py b/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py index 
44b05dd..466972b 100644 --- a/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py +++ b/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py @@ -1,28 +1,24 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at +# -*- coding: utf-8 -*- # -# http://www.apache.org/licenses/LICENSE-2.0 +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at # -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +import kubernetes_request_factory as kreq import yaml -from airflow.contrib.kubernetes.kubernetes_request_factory.kubernetes_request_factory \ - import KubernetesRequestFactory +from airflow import AirflowException -class SimplePodRequestFactory(KubernetesRequestFactory): +class SimplePodRequestFactory(kreq.KubernetesRequestFactory): """ - Request generator for a simple pod. + Request generator for a simple pod. 
""" _yaml = """apiVersion: v1 kind: Pod @@ -33,6 +29,9 @@ spec: - name: base image: airflow-slave:latest command: ["/usr/local/airflow/entrypoint.sh", "/bin/bash sleep 25"] + volumeMounts: + - name: shared-data + mountPath: "/usr/local/airflow/dags" restartPolicy: Never """ @@ -40,21 +39,48 @@ spec: pass def create(self, pod): - # type: (Pod) -> dict req = yaml.load(self._yaml) - self.extract_name(pod, req) - self.extract_labels(pod, req) - self.extract_image(pod, req) - self.extract_image_pull_policy(pod, req) - self.extract_cmds(pod, req) - self.extract_args(pod, req) - self.extract_node_selector(pod, req) - self.extract_env_and_secrets(pod, req) - self.extract_volume_secrets(pod, req) - self.attach_volumes(pod, req) - self.attach_volume_mounts(pod, req) - self.extract_resources(pod, req) - self.extract_service_account_name(pod, req) - self.extract_init_containers(pod, req) - self.extract_image_pull_secrets(pod, req) + kreq.extract_name(pod, req) + kreq.extract_labels(pod, req) + kreq.extract_image(pod, req) + kreq.extract_cmds(pod, req) + if len(pod.node_selectors) > 0: + kreq.extract_node_selector(pod, req) + kreq.extract_secrets(pod, req) + kreq.extract_volume_secrets(pod, req) + kreq.attach_volume_mounts(req) return req + + +class ReturnValuePodRequestFactory(SimplePodRequestFactory): + """ + Pod request factory with a PreStop hook to upload return value + to the system's etcd service. 
+ :param kube_com_service_factory: Kubernetes Communication Service factory + :type kube_com_service_factory: () => KubernetesCommunicationService + """ + + def __init__(self, kube_com_service_factory, result_data_file): + super(ReturnValuePodRequestFactory, self).__init__() + self._kube_com_service_factory = kube_com_service_factory + self._result_data_file = result_data_file + + def after_create(self, body, pod): + """ + Augment the pod with hyper-parameterized specific logic + Adds a Kubernetes PreStop hook to upload the model training + metrics to the Kubernetes communication engine (probably + an etcd service running with airflow) + """ + container = body['spec']['containers'][0] + pre_stop_hook = self._kube_com_service_factory() \ + .pod_pre_stop_hook(self._result_data_file, pod.name) + # Pre-stop hook only works on containers that are deleted. If the container + # naturally exists there would be no pre-stop hook execution. Therefore we + # simulate the hook by wrapping the exe command inside a script + if "'" in ' '.join(container['command']): + raise AirflowException('Please do not include single quote ' + 'in your command for hyperparameterized pods') + cmd = ' '.join(["'" + c + "'" if " " in c else c for c in container['command']]) + container['command'] = ['/bin/bash', '-c', "({}) ; ({})" + .format(cmd, pre_stop_hook)] http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/58213208/airflow/contrib/kubernetes/pod.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/kubernetes/pod.py b/airflow/contrib/kubernetes/pod.py index b4eb5a1..c38783c 100644 --- a/airflow/contrib/kubernetes/pod.py +++ b/airflow/contrib/kubernetes/pod.py @@ -1,92 +1,91 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. 
The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at +# -*- coding: utf-8 -*- # -# http://www.apache.org/licenses/LICENSE-2.0 +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at # -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - - -class Resources: - def __init__( - self, - request_memory=None, - request_cpu=None, - limit_memory=None, - limit_cpu=None): - self.request_memory = request_memory - self.request_cpu = request_cpu - self.limit_memory = limit_memory - self.limit_cpu = limit_cpu - - def is_empty_resource_request(self): - return not self.has_limits() and not self.has_requests() - - def has_limits(self): - return self.limit_cpu is not None or self.limit_memory is not None - - def has_requests(self): - return self.request_cpu is not None or self.request_memory is not None +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+from kubernetes import client, config +from kubernetes_request_factory import KubernetesRequestFactory, SimplePodRequestFactory +import logging +from airflow import AirflowException +import time +import json class Pod: """ - Represents a kubernetes pod and manages execution of a single pod. - :param image: The docker image - :type image: str - :param env: A dict containing the environment variables - :type env: dict - :param cmds: The command to be run on the pod - :type cmd: list str - :param secrets: Secrets to be launched to the pod - :type secrets: list Secret - :param result: The result that will be returned to the operator after - successful execution of the pod - :type result: any + Represents a kubernetes pod and manages execution of a single pod. + :param image: The docker image + :type image: str + :param env: A dict containing the environment variables + :type env: dict + :param cmds: The command to be run on the pod + :type cmd: list str + :param secrets: Secrets to be launched to the pod + :type secrets: list Secret + :param result: The result that will be returned to the operator after + successful execution of the pod + :type result: any + """ + pod_timeout = 3600 + def __init__( self, image, envs, cmds, - args=None, - secrets=None, - labels=None, - node_selectors=None, - name=None, - volumes=None, - volume_mounts=None, + secrets, + labels, + node_selectors, + kube_req_factory, + name, namespace='default', - result=None, - image_pull_policy="IfNotPresent", - image_pull_secrets=None, - init_containers=None, - service_account_name=None, - resources=None - ): + result=None): self.image = image - self.envs = envs or {} + self.envs = envs self.cmds = cmds - self.args = args or [] - self.secrets = secrets or [] + self.secrets = secrets self.result = result - self.labels = labels or {} + self.labels = labels self.name = name - self.volumes = volumes or [] - self.volume_mounts = volume_mounts or [] - self.node_selectors = node_selectors or [] + 
self.node_selectors = node_selectors + self.kube_req_factory = (kube_req_factory or SimplePodRequestFactory)() self.namespace = namespace - self.image_pull_policy = image_pull_policy - self.image_pull_secrets = image_pull_secrets - self.init_containers = init_containers - self.service_account_name = service_account_name - self.resources = resources or Resources() + self.logger = logging.getLogger(self.__class__.__name__) + if not isinstance(self.kube_req_factory, KubernetesRequestFactory): + raise AirflowException('`kube_req_factory`' + ' should implement KubernetesRequestFactory') + + def launch(self): + """ + Launches the pod synchronously and waits for completion. + """ + k8s_beta = self._kube_client() + req = self.kube_req_factory.create(self) + print(json.dumps(req)) + resp = k8s_beta.create_namespaced_job(body=req, namespace=self.namespace) + self.logger.info("Job created. status='%s', yaml:\n%s" + % (str(resp.status), str(req))) + while not self._execution_finished(): + time.sleep(10) + return self.result + + def _kube_client(self): + config.load_incluster_config() + return client.BatchV1Api() + + def _execution_finished(self): + k8s_beta = self._kube_client() + resp = k8s_beta.read_namespaced_job_status(self.name, namespace=self.namespace) + self.logger.info('status : ' + str(resp.status)) + if resp.status.phase == 'Failed': + raise Exception("Job " + self.name + " failed!") + return resp.status.phase != 'Running' http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/58213208/airflow/contrib/kubernetes/pod_launcher.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/kubernetes/pod_launcher.py b/airflow/contrib/kubernetes/pod_launcher.py index a765986..a774d79 100644 --- a/airflow/contrib/kubernetes/pod_launcher.py +++ b/airflow/contrib/kubernetes/pod_launcher.py @@ -1,135 +1,145 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. 
See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at +# -*- coding: utf-8 -*- # -# http://www.apache.org/licenses/LICENSE-2.0 +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at # -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+import base64 import json +import logging import time -from airflow.utils.log.logging_mixin import LoggingMixin -from airflow.utils.state import State -from datetime import datetime as dt -from airflow.contrib.kubernetes.kubernetes_request_factory import \ - pod_request_factory as pod_fac -from kubernetes import watch -from kubernetes.client.rest import ApiException -from airflow import AirflowException -from requests.exceptions import HTTPError -from .kube_client import get_kube_client - - -class PodStatus(object): - PENDING = 'pending' - RUNNING = 'running' - FAILED = 'failed' - SUCCEEDED = 'succeeded' - - -class PodLauncher(LoggingMixin): - def __init__(self, kube_client=None): - super(PodLauncher, self).__init__() - self._client = kube_client or get_kube_client() - self._watch = watch.Watch() - self.kube_req_factory = pod_fac.SimplePodRequestFactory() - - def run_pod_async(self, pod): - req = self.kube_req_factory.create(pod) - self.log.debug('Pod Creation Request: \n{}'.format(json.dumps(req, indent=2))) - try: - resp = self._client.create_namespaced_pod(body=req, namespace=pod.namespace) - self.log.debug('Pod Creation Response: {}'.format(resp)) - except ApiException: - self.log.exception('Exception when attempting to create Namespaced Pod.') - raise - return resp - - def run_pod(self, pod, startup_timeout=120, get_logs=True): - # type: (Pod) -> State +import urllib2 + +from kubernetes import client, config + +from kubernetes_request_factory import KubernetesRequestFactory +from pod import Pod + + +def kube_client(): + config.load_incluster_config() + return client.CoreV1Api() + + +def incluster_namespace(): + """ + :return: The incluster namespace. 
+ """ + config.load_incluster_config() + k8s_configuration = config.incluster_config.configuration + encoded_namespace = k8s_configuration.api_key['authorization'].split(' ')[-1] + api_key = str(base64.b64decode(encoded_namespace)) + key_with_namespace = [k for k in api_key.split(',') if 'namespace' in k][0] + unformatted_namespace = key_with_namespace.split(':')[-1] + return unformatted_namespace.replace('"', '') + + +class KubernetesLauncher: + """ + This class is responsible for launching objects to Kubernetes. + Extend this class to launch exotic objects. + Before trying to extend this method check if augmenting the request factory + is enough for your use-case + :param kube_object: A pod or anything that represents a Kubernetes object + :type kube_object: Pod + :param request_factory: A factory method to create kubernetes requests. + """ + + pod_timeout = 3600 + + def __init__(self, kube_object, request_factory): + if not isinstance(kube_object, Pod): + raise Exception('`kube_object` must inherit from Pod') + if not isinstance(request_factory, KubernetesRequestFactory): + raise Exception('`request_factory` must inherit from ' + 'KubernetesRequestFactory') + self.pod = kube_object + self.request_factory = request_factory + + def launch(self): """ - Launches the pod synchronously and waits for completion. - - Args: - pod (Pod): - startup_timeout (int): Timeout for startup of the pod (if pod is pending for - too long, considers task a failure + Launches the pod synchronously and waits for completion. + No return value from execution. 
Will raise an exception if things failed """ - resp = self.run_pod_async(pod) - curr_time = dt.now() - if resp.status.start_time is None: - while self.pod_not_started(pod): - delta = dt.now() - curr_time - if delta.seconds >= startup_timeout: - raise AirflowException("Pod took too long to start") - time.sleep(1) - self.log.debug('Pod not yet started') - - final_status = self._monitor_pod(pod, get_logs) - return final_status - - def _monitor_pod(self, pod, get_logs): - # type: (Pod) -> State - - if get_logs: - logs = self._client.read_namespaced_pod_log( - name=pod.name, - namespace=pod.namespace, - follow=True, - tail_lines=10, - _preload_content=False) - for line in logs: - self.log.info(line) - else: - while self.pod_is_running(pod): - self.log.info("Pod {} has state {}".format(pod.name, State.RUNNING)) - time.sleep(2) - return self._task_status(self.read_pod(pod)) - - def _task_status(self, event): - # type: (V1Pod) -> State - self.log.info( - "Event: {} had an event of type {}".format(event.metadata.name, - event.status.phase)) - status = self.process_status(event.metadata.name, event.status.phase) - return status - - def pod_not_started(self, pod): - state = self._task_status(self.read_pod(pod)) - return state == State.QUEUED - - def pod_is_running(self, pod): - state = self._task_status(self.read_pod(pod)) - return state != State.SUCCESS and state != State.FAILED - - def read_pod(self, pod): + k8s_beta = kube_client() + req = self.request_factory.create(self) + logging.info(json.dumps(req)) + resp = k8s_beta.create_namespaced_pod(body=req, namespace=self.pod.namespace) + logging.info("Job created. 
status='%s', yaml:\n%s" + % (str(resp.status), str(req))) + for i in range(1, self.pod_timeout): + time.sleep(10) + logging.info('Waiting for success') + if self._execution_finished(): + logging.info('Job finished!') + return + raise Exception("Job timed out!") + + def _execution_finished(self): + k8s_beta = kube_client() + resp = k8s_beta.read_namespaced_pod_status( + self.pod.name, + namespace=self.pod.namespace) + logging.info('status : ' + str(resp.status)) + logging.info('phase : ' + str(resp.status.phase)) + if resp.status.phase == 'Failed': + raise Exception("Job " + self.pod.name + " failed!") + return resp.status.phase != 'Running' + + +class KubernetesCommunicationService: + """ + A service that manages communications between pods in Kubernetes and airflow dagrun + Note that etcd service is running side by side of the airflow on the same machine + using kubernetes magic, so on airflow side we use localhost, and on the remote side + we use the provided etcd host. + """ + + def __init__(self, etcd_host, etcd_port): + self.etcd_host = etcd_host + self.etcd_port = etcd_port + self.url = 'http://localhost:{}'.format(self.etcd_port) + + def pod_pre_stop_hook(self, return_data_file, task_id): + return 'echo value=$(cat %s) | curl -d "@-" -X PUT %s:%s/v2/keys/pod_metrics/%s' \ + % ( + return_data_file, self.etcd_host, self.etcd_port, task_id) + + def pod_return_data(self, task_id): + """ + Returns the pod's return data. The pod_pre_stop_hook is responsible to upload + the return data to etcd. + + If the return_data_file is generated by the application, the pre stop hook + will upload it to etcd and we will download it back to airflow. 
+ """ + logging.info('querying {} for task id {}'.format(self.url, task_id)) try: - return self._client.read_namespaced_pod(pod.name, pod.namespace) - except HTTPError as e: - raise AirflowException("There was an error reading the kubernetes API: {}" - .format(e)) - - def process_status(self, job_id, status): - status = status.lower() - if status == PodStatus.PENDING: - return State.QUEUED - elif status == PodStatus.FAILED: - self.log.info("Event: {} Failed".format(job_id)) - return State.FAILED - elif status == PodStatus.SUCCEEDED: - self.log.info("Event: {} Succeeded".format(job_id)) - return State.SUCCESS - elif status == PodStatus.RUNNING: - return State.RUNNING - else: - self.log.info("Event: Invalid state {} on job {}".format(status, job_id)) - return State.FAILED + result = urllib2.urlopen(self.url + '/v2/keys/pod_metrics/' + task_id).read() + logging.info('result for querying {} for task id {}: {}' + .format(self.url, task_id, result)) + result = json.loads(result)['node']['value'] + return result + except urllib2.HTTPError as err: + if err.code == 404: + return None # Data not found + raise + + @staticmethod + def from_dag_default_args(dag): + (etcd_host, etcd_port) = dag.default_args.get('etcd_endpoint', ':').split(':') + logging.info('Setting etcd endpoint from dag default args {}:{}' + .format(etcd_host, etcd_port)) + if not etcd_host: + raise Exception('`KubernetesCommunicationService` ' + 'requires etcd endpoint. 
Please defined it in dag ' + 'degault_args') + return KubernetesCommunicationService(etcd_host, etcd_port) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/58213208/airflow/contrib/operators/k8s_pod_operator/__init__.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/k8s_pod_operator/__init__.py b/airflow/contrib/operators/k8s_pod_operator/__init__.py new file mode 100644 index 0000000..50c7b86 --- /dev/null +++ b/airflow/contrib/operators/k8s_pod_operator/__init__.py @@ -0,0 +1,13 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +from .k8s_pod_operator import * http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/58213208/airflow/contrib/operators/k8s_pod_operator/k8s_pod_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/k8s_pod_operator/k8s_pod_operator.py b/airflow/contrib/operators/k8s_pod_operator/k8s_pod_operator.py new file mode 100644 index 0000000..bf7b048 --- /dev/null +++ b/airflow/contrib/operators/k8s_pod_operator/k8s_pod_operator.py @@ -0,0 +1,126 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from airflow.exceptions import AirflowException +from airflow.operators.python_operator import PythonOperator +from airflow.utils.decorators import apply_defaults +from airflow.contrib.kubernetes.pod_launcher import KubernetesLauncher, \ + KubernetesCommunicationService +from airflow.contrib.kubernetes.kubernetes_request_factory import \ + SimplePodRequestFactory, \ + ReturnValuePodRequestFactory +from .op_context import OpContext + + +class PodOperator(PythonOperator): + """ + Executes a pod and waits for the job to finish. + :param dag_run_id: The unique run ID that would be attached to the pod as a label + :type dag_run_id: str + :param pod_factory: Reference to the function that creates the pod with format: + function (OpContext) => Pod + :type pod_factory: callable + :param cache_output: If set to true, the output of the pod would be saved in a + cache object using md5 hash of all the pod parameters + and in case of success, the cached results will be returned + on consecutive calls. 
Only use this + """ + # template_fields = tuple('dag_run_id') + ui_color = '#8da7be' + + @apply_defaults + def __init__( + self, + dag_run_id, + pod_factory, + cache_output, + kube_request_factory=None, + *args, + **kwargs + ): + super(PodOperator, self).__init__( + python_callable=lambda _: 1, + provide_context=True, + *args, + **kwargs) + self.logger = logging.getLogger(self.__class__.__name__) + if not callable(pod_factory): + raise AirflowException('`pod_factory` param must be callable') + self.dag_run_id = dag_run_id + self.pod_factory = pod_factory + self._cache_output = cache_output + self.op_context = OpContext(self.task_id) + self.kwargs = kwargs + self._kube_request_factory = kube_request_factory or SimplePodRequestFactory + + def execute(self, context): + task_instance = context.get('task_instance') + if task_instance is None: + raise AirflowException('`task_instance` is empty! This should not happen') + self.op_context.set_xcom_instance(task_instance) + pod = self.pod_factory(self.op_context, context) + # Customize the pod + pod.name = self.task_id + pod.labels['run_id'] = self.dag_run_id + pod.namespace = self.dag.default_args.get('namespace', pod.namespace) + + # Launch the pod and wait for it to finish + KubernetesLauncher(pod, self._kube_request_factory).launch() + self.op_context.result = pod.result + + # Cache the output + custom_return_value = self.on_pod_success(context) + if custom_return_value: + self.op_context.custom_return_value = custom_return_value + return self.op_context.result + + def on_pod_success(self, context): + """ + Called when pod is executed successfully. + :return: Returns a custom return value for pod which will + be stored in xcom + """ + pass + + +class ReturnValuePodOperator(PodOperator): + """ + This pod operators is a normal pod operator with the addition of + reading custom return value back from kubernetes. 
+ """ + + def __init__(self, + kube_com_service_factory, + result_data_file, + *args, **kwargs): + super(ReturnValuePodOperator, self).__init__(*args, **kwargs) + if not isinstance(kube_com_service_factory(), KubernetesCommunicationService): + raise AirflowException( + '`kube_com_service_factory` must be of type ' + 'KubernetesCommunicationService') + self._kube_com_service_factory = kube_com_service_factory + self._result_data_file = result_data_file + self._kube_request_factory = self._return_value_kube_request # Overwrite the + # default request factory + + def on_pod_success(self, context): + return_val = self._kube_com_service_factory().pod_return_data(self.task_id) + self.op_context.result = return_val # We also overwrite the results + return return_val + + def _return_value_kube_request(self): + return ReturnValuePodRequestFactory(self._kube_com_service_factory, + self._result_data_file) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/58213208/airflow/contrib/operators/k8s_pod_operator/op_context.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/k8s_pod_operator/op_context.py b/airflow/contrib/operators/k8s_pod_operator/op_context.py new file mode 100644 index 0000000..55a3b00 --- /dev/null +++ b/airflow/contrib/operators/k8s_pod_operator/op_context.py @@ -0,0 +1,104 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and + +from airflow import AirflowException +import logging + + +class OpContext(object): + """ + Data model for operation context of a pod operator with hyper parameters. + OpContext is able to communicate the context between PodOperators by + encapsulating XCom communication + Note: do not directly modify the upstreams + Also note: xcom_instance MUST be set before any attribute of this class can be + read. + :param: task_id The task ID + """ + _supported_attributes = {'hyper_parameters', 'custom_return_value'} + + def __init__(self, task_id): + self.task_id = task_id + self._upstream = [] + self._result = '__not_set__' + self._data = {} + self._xcom_instance = None + self._parent = None + + def __str__(self): + return 'upstream: [' + \ + ','.join([u.task_id for u in self._upstream]) + ']\n' + \ + 'params:' + ','.join( + [k + '=' + str(self._data[k]) for k in self._data.keys()]) + + def __setattr__(self, name, value): + if name in self._data: + raise AirflowException('`{}` is already set'.format(name)) + if name not in self._supported_attributes: + logging.warn( + '`{}` is not in the supported attribute list for OpContext'.format(name)) + self.get_xcom_instance().xcom_push(key=name, value=value) + self._data[name] = value + + def __getattr__(self, item): + if item not in self._supported_attributes: + logging.warn( + '`{}` is not in the supported attribute list for OpContext'.format(item)) + if item not in self._data: + self._data[item] = self.get_xcom_instance().xcom_pull(key=item, + task_ids=self.task_id) + return self._data[item] + + @property + def result(self): + if self._result == '__not_set__': + self._result = self.get_xcom_instance().xcom_pull(task_ids=self.task_id) + return self._result + + @result.setter + def result(self, value): + if self._result != '__not_set__': + raise AirflowException('`result` is already set') + self._result = value + + @property + def upstream(self): + return 
self._upstream + + def append_upstream(self, upstream_op_contexes): + """ + Appends a list of op_contexts to the upstream. It will create new instances and + set the task_id. + All the upstream op_contextes will share the same xcom_instance with this + op_context + :param upstream_op_contexes: List of upstream op_contextes + """ + for up in upstream_op_contexes: + op_context = OpContext(up.task_id) + op_context._parent = self + self._upstream.append(op_context) + + def set_xcom_instance(self, xcom_instance): + """ + Sets the xcom_instance for this op_context and upstreams + :param xcom_instance: The Airflow TaskInstance for communication through XCom + :type xcom_instance: airflow.models.TaskInstance + """ + self._xcom_instance = xcom_instance + + def get_xcom_instance(self): + if self._xcom_instance is None and self._parent is None: + raise AirflowException( + 'Trying to access attributes from OpContext before setting the ' + 'xcom_instance') + return self._xcom_instance or self._parent.get_xcom_instance() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/58213208/airflow/dag_importer/__init__.py ---------------------------------------------------------------------- diff --git a/airflow/dag_importer/__init__.py b/airflow/dag_importer/__init__.py new file mode 100644 index 0000000..f0a792d --- /dev/null +++ b/airflow/dag_importer/__init__.py @@ -0,0 +1,83 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import logging +from airflow import configuration + + +def _integrate_plugins(): + pass + + +dag_import_spec = {} + + +def import_dags(): + logging.info("importing dags") + if configuration.has_option('core', 'k8s_mode'): + mode = configuration.get('core', 'k8s_mode') + dag_import_func(mode)() + else: + _import_hostpath() + + +def dag_import_func(mode): + return { + 'git': _import_git, + 'cinder': _import_cinder, + }.get(mode, _import_hostpath) + + +def _import_hostpath(): + + logging.info("importing dags locally") + spec = {'name': 'shared-data', 'hostPath': {}} + spec['hostPath']['path'] = '/tmp/dags' + global dag_import_spec + dag_import_spec = spec + + +def _import_cinder(): + ''' + kind: StorageClass + apiVersion: storage.k8s.io/v1 + metadata: + name: gold + provisioner: kubernetes.io/cinder + parameters: + type: fast + availability: nova + :return: + ''' + global dag_import_spec + spec = {} + + spec['kind'] = 'StorageClass' + spec['apiVersion'] = 'storage.k8s.io/v1' + spec['metatdata']['name'] = 'gold' + spec['provisioner'] = 'kubernetes.io/cinder' + spec['parameters']['type'] = 'fast' + spec['availability'] = 'nova' + + +def _import_git(): + logging.info("importing dags from github") + global dag_import_spec + git_link = configuration.get('core', 'k8s_git_link') + spec = {'name': 'shared-data', 'gitRepo': {}} + spec['gitRepo']['repository'] = git_link + if configuration.has_option('core','k8s_git_revision'): + revision = configuration.get('core', 'k8s_git_revision') + spec['gitRepo']['revision'] = revision + dag_import_spec = spec http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/58213208/airflow/executors/__init__.py ---------------------------------------------------------------------- diff --git a/airflow/executors/__init__.py b/airflow/executors/__init__.py index 8aa15e4..e8d1c32 100644 --- a/airflow/executors/__init__.py +++ b/airflow/executors/__init__.py @@ -19,13 +19,11 @@ import sys from airflow import configuration +from 
airflow.exceptions import AirflowException from airflow.executors.base_executor import BaseExecutor from airflow.executors.local_executor import LocalExecutor from airflow.executors.sequential_executor import SequentialExecutor -from airflow.exceptions import AirflowException -from airflow.utils.log.logging_mixin import LoggingMixin - DEFAULT_EXECUTOR = None def _integrate_plugins(): @@ -52,6 +50,8 @@ def GetDefaultExecutor(): return DEFAULT_EXECUTOR + + def _get_executor(executor_name): """ Creates a new instance of the named executor. In case the executor name is not know in airflow, @@ -70,6 +70,9 @@ def _get_executor(executor_name): elif executor_name == 'MesosExecutor': from airflow.contrib.executors.mesos_executor import MesosExecutor return MesosExecutor() + elif executor_name == 'KubernetesExecutor': + from airflow.contrib.executors.kubernetes_executor import KubernetesExecutor + return KubernetesExecutor() else: # Loading plugins _integrate_plugins() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/58213208/airflow/executors/base_executor.py ---------------------------------------------------------------------- diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py index 30ecee0..d5e958f 100644 --- a/airflow/executors/base_executor.py +++ b/airflow/executors/base_executor.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. 
You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -21,7 +21,6 @@ from builtins import range from airflow import configuration from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.state import State - PARALLELISM = configuration.conf.getint('core', 'PARALLELISM') @@ -40,6 +39,7 @@ class BaseExecutor(LoggingMixin): self.queued_tasks = {} self.running = {} self.event_buffer = {} + self.log.setLevel(10) def start(self): # pragma: no cover """ @@ -53,6 +53,8 @@ if key not in self.queued_tasks and key not in self.running: self.log.info("Adding to queue: %s", command) self.queued_tasks[key] = (command, priority, queue, task_instance) + else: + self.log.info("could not queue task %s", key) def queue_task_instance( self, @@ -104,8 +106,7 @@ """ pass - def heartbeat(self): - + def heartbeat(self, km=False): # Triggering new jobs if not self.parallelism: open_slots = len(self.queued_tasks) @@ -131,14 +132,13 @@ # does NOT eliminate it. 
self.queued_tasks.pop(key) ti.refresh_from_db() - if ti.state != State.RUNNING: + if ti.state != State.RUNNING or km: self.running[key] = command self.execute_async(key, command=command, queue=queue) else: - self.log.debug( - 'Task is already running, not sending to executor: %s', - key - ) + self.log.info( + 'Task is already running, not sending to ' + 'executor: %s', key) # Calling child class sync method self.log.debug("Calling the %s sync method", self.__class__) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/58213208/airflow/models.py ---------------------------------------------------------------------- diff --git a/airflow/models.py b/airflow/models.py index ee92689..02409dd 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -1118,6 +1118,28 @@ class TaskInstance(Base, LoggingMixin): session.commit() @provide_session + def update_hostname(self, hostname, session=None): + """ + For use in kubernetes mode. Update the session to allow heartbeating to SQL + :param hostname: the hostname to record for this task instance + :param session: SQLAlchemy ORM session + + :return: + + """ + t_i = TaskInstance + + qry = session.query(t_i).filter( + t_i.dag_id == self.dag_id, + t_i.task_id == self.task_id, + t_i.execution_date == self.execution_date) + + ti = qry.first() + if ti: + ti.hostname = hostname + session.add(ti) + session.commit() + + @provide_session def refresh_from_db(self, session=None, lock_for_update=False): """ Refreshes the task instance from the database based on the primary key http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/58213208/airflow/plugins_manager.py ---------------------------------------------------------------------- diff --git a/airflow/plugins_manager.py b/airflow/plugins_manager.py index 735f2de..a7adda6 100644 --- a/airflow/plugins_manager.py +++ b/airflow/plugins_manager.py @@ -48,6 +48,7 @@ class AirflowPlugin(object): admin_views = [] flask_blueprints = [] menu_links = [] + dag_importer = None @classmethod def validate(cls): 
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/58213208/kubectl ---------------------------------------------------------------------- diff --git a/kubectl b/kubectl new file mode 100644 index 0000000..e69de29 http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/58213208/scripts/ci/kubernetes/docker/airflow.tar.gz ---------------------------------------------------------------------- diff --git a/scripts/ci/kubernetes/docker/airflow.tar.gz b/scripts/ci/kubernetes/docker/airflow.tar.gz new file mode 100644 index 0000000..114c5b4 Binary files /dev/null and b/scripts/ci/kubernetes/docker/airflow.tar.gz differ http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/58213208/scripts/ci/kubernetes/kube/.generated/airflow.yaml ---------------------------------------------------------------------- diff --git a/scripts/ci/kubernetes/kube/.generated/airflow.yaml b/scripts/ci/kubernetes/kube/.generated/airflow.yaml new file mode 100644 index 0000000..bd197d3 --- /dev/null +++ b/scripts/ci/kubernetes/kube/.generated/airflow.yaml @@ -0,0 +1,195 @@ +# Licensed to the Apache Software Foundation (ASF) under one * +# or more contributor license agreements. See the NOTICE file * +# distributed with this work for additional information * +# regarding copyright ownership. The ASF licenses this file * +# to you under the Apache License, Version 2.0 (the * +# "License"); you may not use this file except in compliance * +# with the License. You may obtain a copy of the License at * +# * +# http://www.apache.org/licenses/LICENSE-2.0 * +# * +# Unless required by applicable law or agreed to in writing, * +# software distributed under the License is distributed on an * +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * +# KIND, either express or implied. See the License for the * +# specific language governing permissions and limitations * +# under the License. 
* + +# The backing volume can be anything you want, it just needs to be `ReadWriteOnce` +# I'm using hostPath since minikube is nice for testing, but any (non-local) volume will work on a real cluster +kind: PersistentVolume +apiVersion: v1 +metadata: + name: airflow-dags + labels: + type: local +spec: + capacity: + storage: 10Gi + accessModes: + - ReadWriteOnce + hostPath: + path: "/data/airflow-dags" +--- +kind: PersistentVolumeClaim +apiVersion: v1 +metadata: + name: airflow-dags +spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: 10Gi +--- +apiVersion: extensions/v1beta1 +kind: Deployment +metadata: + name: airflow +spec: + replicas: 1 + template: + metadata: + labels: + name: airflow + spec: + initContainers: + - name: "init" + image: "airflow/ci:latest" + imagePullPolicy: "IfNotPresent" + volumeMounts: + - name: airflow-configmap + mountPath: /root/airflow/airflow.cfg + subPath: airflow.cfg + - name: airflow-dags + mountPath: /root/airflow/dags + env: + - name: SQL_ALCHEMY_CONN + valueFrom: + secretKeyRef: + name: airflow-secrets + key: sql_alchemy_conn + command: + - "bash" + args: + - "-cx" + - "cd /usr/local/lib/python2.7/dist-packages/airflow && cp -R example_dags/* /root/airflow/dags/ && airflow initdb && alembic upgrade heads" + containers: + - name: web + image: airflow/ci:latest + imagePullPolicy: IfNotPresent + ports: + - name: web + containerPort: 8080 + args: ["webserver"] + env: + - name: AIRFLOW_KUBE_NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + - name: SQL_ALCHEMY_CONN + valueFrom: + secretKeyRef: + name: airflow-secrets + key: sql_alchemy_conn + volumeMounts: + - name: airflow-configmap + mountPath: /root/airflow/airflow.cfg + subPath: airflow.cfg + - name: airflow-dags + mountPath: /root/airflow/dags + readinessProbe: + initialDelaySeconds: 5 + timeoutSeconds: 5 + periodSeconds: 5 + httpGet: + path: /admin + port: 8080 + livenessProbe: + initialDelaySeconds: 5 + timeoutSeconds: 5 + 
failureThreshold: 5 + httpGet: + path: /admin + port: 8080 + - name: scheduler + image: airflow/ci:latest + imagePullPolicy: IfNotPresent + args: ["scheduler"] + env: + - name: AIRFLOW_KUBE_NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + - name: SQL_ALCHEMY_CONN + valueFrom: + secretKeyRef: + name: airflow-secrets + key: sql_alchemy_conn + volumeMounts: + - name: airflow-configmap + mountPath: /root/airflow/airflow.cfg + subPath: airflow.cfg + - name: airflow-dags + mountPath: /root/airflow/dags + volumes: + - name: airflow-dags + persistentVolumeClaim: + claimName: airflow-dags + - name: airflow-configmap + configMap: + name: airflow-configmap +--- +apiVersion: v1 +kind: Service +metadata: + name: airflow +spec: + type: NodePort + ports: + - port: 8080 + nodePort: 30809 + selector: + name: airflow +--- +apiVersion: v1 +kind: Secret +metadata: + name: airflow-secrets +type: Opaque +data: + # The sql_alchemy_conn value is a base64 encoded representation of this connection string: + # postgresql+psycopg2://root:root@postgres-airflow:5432/airflow + sql_alchemy_conn: cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6cm9vdEBwb3N0Z3Jlcy1haXJmbG93OjU0MzIvYWlyZmxvdwo= +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: airflow-configmap +data: + airflow.cfg: | + [core] + airflow_home = /root/airflow + dags_folder = /root/airflow/dags + base_log_folder = /root/airflow/logs + logging_level = INFO + executor = KubernetesExecutor + parallelism = 32 + plugins_folder = /root/airflow/plugins + sql_alchemy_conn = $SQL_ALCHEMY_CONN + + [scheduler] + dag_dir_list_interval = 60 + child_process_log_directory = /root/airflow/logs/scheduler + + [kubernetes] + airflow_configmap = airflow-configmap + worker_container_repository = airflow/ci + worker_container_tag = latest + delete_worker_pods = False + git_repo = https://github.com/grantnicholas/testdags.git + git_branch = master + dags_volume_claim = airflow-dags + + [kubernetes_secrets] + SQL_ALCHEMY_CONN = 
airflow-secrets=sql_alchemy_conn http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/58213208/scripts/ci/requirements.txt ---------------------------------------------------------------------- diff --git a/scripts/ci/requirements.txt b/scripts/ci/requirements.txt new file mode 100644 index 0000000..bba5d29 --- /dev/null +++ b/scripts/ci/requirements.txt @@ -0,0 +1,96 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +alembic +azure-storage>=0.34.0 +bcrypt +bleach +boto +boto3 +celery +cgroupspy +chartkick +cloudant +coverage +coveralls +croniter>=0.3.17 +cryptography +datadog +dill +distributed +docker-py +filechunkio +flake8 +flask +flask-admin +flask-bcrypt +flask-cache +flask-login==0.2.11 +Flask-WTF +flower +freezegun +future +google-api-python-client>=1.5.0,<1.6.0 +gunicorn +hdfs +hive-thrift-py +impyla +ipython +jaydebeapi +jinja2<2.9.0 +jira +ldap3 +lxml +markdown +mock +moto==1.1.19 +mysqlclient +nose +nose-exclude +nose-ignore-docstring==0.2 +nose-timer +oauth2client>=2.0.2,<2.1.0 +pandas +pandas-gbq +parameterized +paramiko>=2.1.1 +pendulum>=1.3.2 +psutil>=4.2.0, <5.0.0 +psycopg2 +pygments +pyhive +pykerberos +PyOpenSSL +PySmbClient +python-daemon +python-dateutil +qds-sdk>=1.9.6 +redis +rednose +requests +requests-kerberos +requests_mock +sendgrid +setproctitle +slackclient +sphinx +sphinx-argparse +Sphinx-PyPI-upload +sphinx_rtd_theme +sqlalchemy>=1.1.15, <1.2.0 +statsd +thrift +thrift_sasl +unicodecsv +zdesk +kubernetes 
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/58213208/tests/contrib/__init__.py ---------------------------------------------------------------------- diff --git a/tests/contrib/__init__.py b/tests/contrib/__init__.py index 03b4dcc..008677e 100644 --- a/tests/contrib/__init__.py +++ b/tests/contrib/__init__.py @@ -20,4 +20,5 @@ from __future__ import absolute_import from .operators import * from .sensors import * from .utils import * +from .kubernetes import *
