http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c177d6e8/airflow/bin/cli/cli_factory.py ---------------------------------------------------------------------- diff --git a/airflow/bin/cli/cli_factory.py b/airflow/bin/cli/cli_factory.py new file mode 100644 index 0000000..46efcf7 --- /dev/null +++ b/airflow/bin/cli/cli_factory.py @@ -0,0 +1,491 @@ +from collections import namedtuple +from cli import * +from dateutil.parser import parse as parsedate + + +Arg = namedtuple( + 'Arg', ['flags', 'help', 'action', 'default', 'nargs', 'type', 'choices', 'metavar']) +Arg.__new__.__defaults__ = (None, None, None, None, None, None, None) + + +class CLIFactory(object): + args = { + # Shared + 'dag_id': Arg(("dag_id",), "The id of the dag"), + 'task_id': Arg(("task_id",), "The id of the task"), + 'execution_date': Arg( + ("execution_date",), help="The execution date of the DAG", + type=parsedate), + 'task_regex': Arg( + ("-t", "--task_regex"), + "The regex to filter specific task_ids to backfill (optional)"), + 'subdir': Arg( + ("-sd", "--subdir"), + "File location or directory from which to look for the dag", + default=settings.DAGS_FOLDER), + 'start_date': Arg( + ("-s", "--start_date"), "Override start_date YYYY-MM-DD", + type=parsedate), + 'end_date': Arg( + ("-e", "--end_date"), "Override end_date YYYY-MM-DD", + type=parsedate), + 'dry_run': Arg( + ("-dr", "--dry_run"), "Perform a dry run", "store_true"), + 'pid': Arg( + ("--pid", ), "PID file location", + nargs='?'), + 'daemon': Arg( + ("-D", "--daemon"), "Daemonize instead of running " + "in the foreground", + "store_true"), + 'stderr': Arg( + ("--stderr", ), "Redirect stderr to this file"), + 'stdout': Arg( + ("--stdout", ), "Redirect stdout to this file"), + 'log_file': Arg( + ("-l", "--log-file"), "Location of the log file"), + + # backfill + 'mark_success': Arg( + ("-m", "--mark_success"), + "Mark jobs as succeeded without running them", "store_true"), + 'local': Arg( + ("-l", "--local"), + "Run the task using the LocalExecutor", "store_true"), + 'donot_pickle': Arg( + ("-x", "--donot_pickle"), ( + "Do not attempt to pickle the DAG object to send over " + "to the workers, just tell the workers to run their version " + "of the code."), + "store_true"), + 'include_adhoc': Arg( + ("-a", "--include_adhoc"), + "Include dags with the adhoc parameter.", "store_true"), + 'bf_ignore_dependencies': Arg( + ("-i", "--ignore_dependencies"), + ( + "Skip upstream tasks, run only the tasks " + "matching the regexp. 
Only works in conjunction " + "with task_regex"), + "store_true"), + 'bf_ignore_first_depends_on_past': Arg( + ("-I", "--ignore_first_depends_on_past"), + ( + "Ignores depends_on_past dependencies for the first " + "set of tasks only (subsequent executions in the backfill " + "DO respect depends_on_past)."), + "store_true"), + 'pool': Arg(("--pool",), "Resource pool to use"), + # list_tasks + 'tree': Arg(("-t", "--tree"), "Tree view", "store_true"), + # list_dags + 'report': Arg( + ("-r", "--report"), "Show DagBag loading report", "store_true"), + # clear + 'upstream': Arg( + ("-u", "--upstream"), "Include upstream tasks", "store_true"), + 'only_failed': Arg( + ("-f", "--only_failed"), "Only failed jobs", "store_true"), + 'only_running': Arg( + ("-r", "--only_running"), "Only running jobs", "store_true"), + 'downstream': Arg( + ("-d", "--downstream"), "Include downstream tasks", "store_true"), + 'no_confirm': Arg( + ("-c", "--no_confirm"), + "Do not request confirmation", "store_true"), + 'exclude_subdags': Arg( + ("-x", "--exclude_subdags"), + "Exclude subdags", "store_true"), + # trigger_dag + 'run_id': Arg(("-r", "--run_id"), "Helps to identify this run"), + 'conf': Arg( + ('-c', '--conf'), + "JSON string that gets pickled into the DagRun's conf attribute"), + 'exec_date': Arg( + ("-e", "--exec_date"), help="The execution date of the DAG", + type=parsedate), + # pool + 'pool_set': Arg( + ("-s", "--set"), + nargs=3, + metavar=('NAME', 'SLOT_COUNT', 'POOL_DESCRIPTION'), + help="Set pool slot count and description, respectively"), + 'pool_get': Arg( + ("-g", "--get"), + metavar='NAME', + help="Get pool info"), + 'pool_delete': Arg( + ("-x", "--delete"), + metavar="NAME", + help="Delete a pool"), + # variables + 'set': Arg( + ("-s", "--set"), + nargs=2, + metavar=('KEY', 'VAL'), + help="Set a variable"), + 'get': Arg( + ("-g", "--get"), + metavar='KEY', + help="Get value of a variable"), + 'default': Arg( + ("-d", "--default"), + metavar="VAL", + default=None, + help="Default value returned if variable does not exist"), + 'json': Arg( + ("-j", "--json"), + help="Deserialize JSON variable", + action="store_true"), + 'var_import': Arg( + ("-i", "--import"), + metavar="FILEPATH", + help="Import variables from JSON file"), + 'var_export': Arg( + ("-e", "--export"), + metavar="FILEPATH", + help="Export variables to JSON file"), + 'var_delete': Arg( + ("-x", "--delete"), + metavar="KEY", + help="Delete a variable"), + # kerberos + 'principal': Arg( + ("principal",), "kerberos principal", + nargs='?', default=conf.get('kerberos', 'principal')), + 'keytab': Arg( + ("-kt", "--keytab"), "keytab", + nargs='?', default=conf.get('kerberos', 'keytab')), + # run + # TODO(aoen): "force" is a poor choice of name here since it implies it overrides + # all dependencies (not just past success), e.g. the ignore_depends_on_past + # dependency. This flag should be deprecated and renamed to 'ignore_ti_state' and + # the "ignore_all_dependencies" command should be called the"force" command + # instead. + 'force': Arg( + ("-f", "--force"), + "Ignore previous task instance state, rerun regardless if task already " + "succeeded/failed", + "store_true"), + 'raw': Arg(("-r", "--raw"), argparse.SUPPRESS, "store_true"), + 'ignore_all_dependencies': Arg( + ("-A", "--ignore_all_dependencies"), + "Ignores all non-critical dependencies, including ignore_ti_state and " + "ignore_task_deps", + "store_true"), + # TODO(aoen): ignore_dependencies is a poor choice of name here because it is too + # vague (e.g. 
a task being in the appropriate state to be run is also a dependency + # but is not ignored by this flag), the name 'ignore_task_dependencies' is + # slightly better (as it ignores all dependencies that are specific to the task), + # so deprecate the old command name and use this instead. + 'ignore_dependencies': Arg( + ("-i", "--ignore_dependencies"), + "Ignore task-specific dependencies, e.g. upstream, depends_on_past, and " + "retry delay dependencies", + "store_true"), + 'ignore_depends_on_past': Arg( + ("-I", "--ignore_depends_on_past"), + "Ignore depends_on_past dependencies (but respect " + "upstream dependencies)", + "store_true"), + 'ship_dag': Arg( + ("--ship_dag",), + "Pickles (serializes) the DAG and ships it to the worker", + "store_true"), + 'kubernetes_mode': Arg( + ("-km", "--kubernetes_mode"), + "Pod will not contact postgres instance. should only be used for kubernetes tasks", + "store_true"), + 'pickle': Arg( + ("-p", "--pickle"), + "Serialized pickle object of the entire dag (used internally)"), + 'job_id': Arg(("-j", "--job_id"), argparse.SUPPRESS), + 'cfg_path': Arg( + ("--cfg_path", ), "Path to config file to use instead of airflow.cfg"), + # webserver + 'port': Arg( + ("-p", "--port"), + default=conf.get('webserver', 'WEB_SERVER_PORT'), + type=int, + help="The port on which to run the server"), + 'ssl_cert': Arg( + ("--ssl_cert", ), + default=conf.get('webserver', 'WEB_SERVER_SSL_CERT'), + help="Path to the SSL certificate for the webserver"), + 'ssl_key': Arg( + ("--ssl_key", ), + default=conf.get('webserver', 'WEB_SERVER_SSL_KEY'), + help="Path to the key to use with the SSL certificate"), + 'workers': Arg( + ("-w", "--workers"), + default=conf.get('webserver', 'WORKERS'), + type=int, + help="Number of workers to run the webserver on"), + 'workerclass': Arg( + ("-k", "--workerclass"), + default=conf.get('webserver', 'WORKER_CLASS'), + choices=['sync', 'eventlet', 'gevent', 'tornado'], + help="The worker class to use for Gunicorn"), + 'worker_timeout': Arg( + ("-t", "--worker_timeout"), + default=conf.get('webserver', 'WEB_SERVER_WORKER_TIMEOUT'), + type=int, + help="The timeout for waiting on webserver workers"), + 'hostname': Arg( + ("-hn", "--hostname"), + default=conf.get('webserver', 'WEB_SERVER_HOST'), + help="Set the hostname on which to run the web server"), + 'debug': Arg( + ("-d", "--debug"), + "Use the server that ships with Flask in debug mode", + "store_true"), + 'access_logfile': Arg( + ("-A", "--access_logfile"), + default=conf.get('webserver', 'ACCESS_LOGFILE'), + help="The logfile to store the webserver access log. Use '-' to print to " + "stderr."), + 'error_logfile': Arg( + ("-E", "--error_logfile"), + default=conf.get('webserver', 'ERROR_LOGFILE'), + help="The logfile to store the webserver error log. Use '-' to print to " + "stderr."), + # resetdb + 'yes': Arg( + ("-y", "--yes"), + "Do not prompt to confirm reset. 
Use with care!", + "store_true", + default=False), + # scheduler + 'dag_id_opt': Arg(("-d", "--dag_id"), help="The id of the dag to run"), + 'run_duration': Arg( + ("-r", "--run-duration"), + default=None, type=int, + help="Set number of seconds to execute before exiting"), + 'num_runs': Arg( + ("-n", "--num_runs"), + default=-1, type=int, + help="Set the number of runs to execute before exiting"), + # worker + 'do_pickle': Arg( + ("-p", "--do_pickle"), + default=False, + help=( + "Attempt to pickle the DAG object to send over " + "to the workers, instead of letting workers run their version " + "of the code."), + action="store_true"), + 'queues': Arg( + ("-q", "--queues"), + help="Comma delimited list of queues to serve", + default=conf.get('celery', 'DEFAULT_QUEUE')), + 'concurrency': Arg( + ("-c", "--concurrency"), + type=int, + help="The number of worker processes", + default=conf.get('celery', 'celeryd_concurrency')), + # flower + 'broker_api': Arg(("-a", "--broker_api"), help="Broker api"), + 'flower_hostname': Arg( + ("-hn", "--hostname"), + default=conf.get('celery', 'FLOWER_HOST'), + help="Set the hostname on which to run the server"), + 'flower_port': Arg( + ("-p", "--port"), + default=conf.get('celery', 'FLOWER_PORT'), + type=int, + help="The port on which to run the server"), + 'flower_conf': Arg( + ("-fc", "--flower_conf"), + help="Configuration file for flower"), + 'task_params': Arg( + ("-tp", "--task_params"), + help="Sends a JSON params dict to the task"), + # connections + 'list_connections': Arg( + ('-l', '--list'), + help='List all connections', + action='store_true'), + 'add_connection': Arg( + ('-a', '--add'), + help='Add a connection', + action='store_true'), + 'delete_connection': Arg( + ('-d', '--delete'), + help='Delete a connection', + action='store_true'), + 'conn_id': Arg( + ('--conn_id',), + help='Connection id, required to add/delete a connection', + type=str), + 'conn_uri': Arg( + ('--conn_uri',), + help='Connection URI, required to add a connection', + type=str), + 'conn_extra': Arg( + ('--conn_extra',), + help='Connection `Extra` field, optional when adding a connection', + type=str), + } + subparsers = ( + { + 'func': backfill, + 'help': "Run subsections of a DAG for a specified date range", + 'args': ( + 'dag_id', 'task_regex', 'start_date', 'end_date', + 'mark_success', 'local', 'donot_pickle', 'include_adhoc', + 'bf_ignore_dependencies', 'bf_ignore_first_depends_on_past', + 'subdir', 'pool', 'dry_run') + }, { + 'func': list_tasks, + 'help': "List the tasks within a DAG", + 'args': ('dag_id', 'tree', 'subdir'), + }, { + 'func': clear, + 'help': "Clear a set of task instance, as if they never ran", + 'args': ( + 'dag_id', 'task_regex', 'start_date', 'end_date', 'subdir', + 'upstream', 'downstream', 'no_confirm', 'only_failed', + 'only_running', 'exclude_subdags'), + }, { + 'func': pause, + 'help': "Pause a DAG", + 'args': ('dag_id', 'subdir'), + }, { + 'func': unpause, + 'help': "Resume a paused DAG", + 'args': ('dag_id', 'subdir'), + }, { + 'func': trigger_dag, + 'help': "Trigger a DAG run", + 'args': ('dag_id', 'subdir', 'run_id', 'conf', 'exec_date'), + }, { + 'func': pool, + 'help': "CRUD operations on pools", + "args": ('pool_set', 'pool_get', 'pool_delete'), + }, { + 'func': variables, + 'help': "CRUD operations on variables", + "args": ('set', 'get', 'json', 'default', 'var_import', 'var_export', 'var_delete'), + }, { + 'func': kerberos, + 'help': "Start a kerberos ticket renewer", + 'args': ('principal', 'keytab', 'pid', + 'daemon', 'stdout', 
'stderr', 'log_file'), + }, { + 'func': render, + 'help': "Render a task instance's template(s)", + 'args': ('dag_id', 'task_id', 'execution_date', 'subdir'), + }, { + 'func': run, + 'help': "Run a single task instance", + 'args': ( + 'dag_id', 'task_id', 'execution_date', 'subdir', + 'mark_success', 'force', 'pool', 'cfg_path', + 'local', 'raw', 'ignore_all_dependencies', 'ignore_dependencies', + 'ignore_depends_on_past', 'ship_dag', 'pickle', 'job_id', 'kubernetes_mode'), + }, { + 'func': initdb, + 'help': "Initialize the metadata database", + 'args': tuple(), + }, { + 'func': list_dags, + 'help': "List all the DAGs", + 'args': ('subdir', 'report'), + }, { + 'func': dag_state, + 'help': "Get the status of a dag run", + 'args': ('dag_id', 'execution_date', 'subdir'), + }, { + 'func': task_failed_deps, + 'help': ( + "Returns the unmet dependencies for a task instance from the perspective " + "of the scheduler. In other words, why a task instance doesn't get " + "scheduled and then queued by the scheduler, and then run by an " + "executor)."), + 'args': ('dag_id', 'task_id', 'execution_date', 'subdir'), + }, { + 'func': task_state, + 'help': "Get the status of a task instance", + 'args': ('dag_id', 'task_id', 'execution_date', 'subdir'), + }, { + 'func': serve_logs, + 'help': "Serve logs generate by worker", + 'args': tuple(), + }, { + 'func': test, + 'help': ( + "Test a task instance. This will run a task without checking for " + "dependencies or recording it's state in the database."), + 'args': ( + 'dag_id', 'task_id', 'execution_date', 'subdir', 'dry_run', + 'task_params'), + }, { + 'func': webserver, + 'help': "Start a Airflow webserver instance", + 'args': ('port', 'workers', 'workerclass', 'worker_timeout', 'hostname', + 'pid', 'daemon', 'stdout', 'stderr', 'access_logfile', + 'error_logfile', 'log_file', 'ssl_cert', 'ssl_key', 'debug'), + }, { + 'func': resetdb, + 'help': "Burn down and rebuild the metadata database", + 'args': ('yes',), + }, { + 'func': upgradedb, + 'help': "Upgrade the metadata database to latest version", + 'args': tuple(), + }, { + 'func': scheduler, + 'help': "Start a scheduler instance", + 'args': ('dag_id_opt', 'subdir', 'run_duration', 'num_runs', + 'do_pickle', 'pid', 'daemon', 'stdout', 'stderr', + 'log_file'), + }, { + 'func': worker, + 'help': "Start a Celery worker node", + 'args': ('do_pickle', 'queues', 'concurrency', + 'pid', 'daemon', 'stdout', 'stderr', 'log_file'), + }, { + 'func': flower, + 'help': "Start a Celery Flower", + 'args': ('flower_hostname', 'flower_port', 'flower_conf', 'broker_api', + 'pid', 'daemon', 'stdout', 'stderr', 'log_file'), + }, { + 'func': version, + 'help': "Show the version", + 'args': tuple(), + }, { + 'func': connections, + 'help': "List/Add/Delete connections", + 'args': ('list_connections', 'add_connection', 'delete_connection', + 'conn_id', 'conn_uri', 'conn_extra'), + }, + ) + subparsers_dict = {sp['func'].__name__: sp for sp in subparsers} + dag_subparsers = ( + 'list_tasks', 'backfill', 'test', 'run', 'pause', 'unpause') + + @classmethod + def get_parser(cls, dag_parser=False): + parser = argparse.ArgumentParser() + subparsers = parser.add_subparsers( + help='sub-command help', dest='subcommand') + subparsers.required = True + + subparser_list = cls.dag_subparsers if dag_parser else cls.subparsers_dict.keys() + for sub in subparser_list: + sub = cls.subparsers_dict[sub] + sp = subparsers.add_parser(sub['func'].__name__, help=sub['help']) + for arg in sub['args']: + if 'dag_id' in arg and dag_parser: + continue + 
arg = cls.args[arg] + kwargs = { + f: getattr(arg, f) + for f in arg._fields if f != 'flags' and getattr(arg, f)} + sp.add_argument(*arg.flags, **kwargs) + sp.set_defaults(func=sub['func']) + return parser + + +def get_parser(): + return CLIFactory.get_parser()
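For orientation, the CLIFactory pattern above can be exercised in isolation: each Arg namedtuple describes one flag, and get_parser() forwards only the fields that are actually set on the tuple to argparse.add_argument(). The following is a minimal standalone sketch of that pattern; the 'serve' subcommand and its two flags are invented for illustration and are not part of this patch.

import argparse
from collections import namedtuple

# One namedtuple shape shared by every flag definition; unset fields default to None.
Arg = namedtuple(
    'Arg', ['flags', 'help', 'action', 'default', 'nargs', 'type', 'choices', 'metavar'])
Arg.__new__.__defaults__ = (None,) * (len(Arg._fields) - 1)

# Illustrative flag specs (not taken from the patch).
args = {
    'port': Arg(('-p', '--port'), 'Port to listen on', default=8080, type=int),
    'daemon': Arg(('-D', '--daemon'), 'Run in the background', 'store_true'),
}

parser = argparse.ArgumentParser()
subparsers = parser.add_subparsers(dest='subcommand')
sp = subparsers.add_parser('serve', help='Start a toy server')
for spec in (args['port'], args['daemon']):
    # Same filtering rule as CLIFactory.get_parser(): drop 'flags' and any unset field.
    kwargs = {f: getattr(spec, f)
              for f in spec._fields if f != 'flags' and getattr(spec, f)}
    sp.add_argument(*spec.flags, **kwargs)

ns = parser.parse_args(['serve', '--port', '9090'])
print('{} {}'.format(ns.port, ns.daemon))  # 9090 False

One side effect of the truthiness filter is that falsy values such as default=False are never forwarded to add_argument; the arguments defined in this patch do not appear to depend on forwarding a falsy default, but it is worth keeping in mind when new flags are added.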
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c177d6e8/airflow/contrib/executors/kubernetes_executor.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/executors/kubernetes_executor.py b/airflow/contrib/executors/kubernetes_executor.py index 0a3e9f2..8eb2186 100644 --- a/airflow/contrib/executors/kubernetes_executor.py +++ b/airflow/contrib/executors/kubernetes_executor.py @@ -17,20 +17,20 @@ import logging import time import os import multiprocessing -from airflow.contrib.kubernetes.kubernetes_pod_builder import KubernetesPodBuilder -from airflow.contrib.kubernetes.kubernetes_helper import KubernetesHelper from queue import Queue from kubernetes import watch from airflow import settings -from airflow.contrib.kubernetes.kubernetes_request_factory import SimplePodRequestFactory +from airflow.contrib.kubernetes.pod_launcher import PodLauncher from airflow.executors.base_executor import BaseExecutor from airflow.models import TaskInstance +from airflow.contrib.kubernetes.pod import Pod from airflow.utils.state import State from airflow import configuration -import json -# TODO this is just for proof of concept. remove before merging. - +from kubernetes import client +class KubeConfig: + kube_image = configuration.get('core', 'k8s_image') + git_repo = configuration.get('core', 'k8s_git_repo') def _prep_command_for_container(command): @@ -48,6 +48,9 @@ def _prep_command_for_container(command): return '"' + '","'.join(command.split(' ')[1:]) + '"' +PARALLELISM = configuration.getint('core', 'PARALLELISM') + + class KubernetesJobWatcher(multiprocessing.Process, object): def __init__(self, watch_function, namespace, result_queue, watcher_queue): self.logger = logging.getLogger(__name__) @@ -60,26 +63,26 @@ class KubernetesJobWatcher(multiprocessing.Process, object): def run(self): self.logger.info("Event: and now my watch begins") - self.logger.info("Event: proof of image change") - self.logger.info("Event: running {} with {}".format(str(self._watch_function), - self.namespace)) - for event in self._watch.stream(self._watch_function, self.namespace): - task= event['object'] - self.logger.info("Event: {} had an event of type {}".format(task.metadata.name, - event['type'])) + for event in self._watch.stream(self._watch_function, self.namespace, + label_selector='airflow-slave'): + task = event['object'] + self.logger.info( + "Event: {} had an event of type {}".format(task.metadata.name, + event['type'])) self.process_status(task.metadata.name, task.status.phase) def process_status(self, job_id, status): if status == 'Pending': self.logger.info("Event: {} Pending".format(job_id)) elif status == 'Failed': - self.logger.info("Event: {} Failed".format(job_id)) + # self.logger.info("Event: {} Failed".format(job_id)) self.watcher_queue.put((job_id, State.FAILED)) elif status == 'Succeeded': - self.logger.info("Event: {} Succeeded".format(job_id)) + # self.logger.info("Event: {} Succeeded".format(job_id)) self.watcher_queue.put((job_id, None)) elif status == 'Running': - self.logger.info("Event: {} is Running".format(job_id)) + # self.logger.info("Event: {} is Running".format(job_id)) + self.watcher_queue.put((job_id, State.RUNNING)) else: self.logger.info("Event: Invalid state {} on job {}".format(status, job_id)) @@ -92,15 +95,19 @@ class AirflowKubernetesScheduler(object): self.logger = logging.getLogger(__name__) self.logger.info("creating kubernetes executor") self.task_queue = task_queue + self.pending_jobs = set() self.namespace = 
os.environ['k8s_POD_NAMESPACE'] self.logger.info("k8s: using namespace {}".format(self.namespace)) self.result_queue = result_queue + self.launcher = PodLauncher() self.current_jobs = {} self.running = running self._task_counter = 0 self.watcher_queue = multiprocessing.Queue() - self.helper = KubernetesHelper() - w = KubernetesJobWatcher(self.helper.pod_api.list_namespaced_pod, self.namespace, + self.api = client.CoreV1Api() + + watch_function = self.api.read_namespaced_pod + w = KubernetesJobWatcher(watch_function, self.namespace, self.result_queue, self.watcher_queue) w.start() @@ -119,29 +126,29 @@ class AirflowKubernetesScheduler(object): (key, command) = next_job self.logger.info("running for command {}".format(command)) epoch_time = calendar.timegm(time.gmtime()) - command_list = ["/usr/local/airflow/entrypoint.sh"] + command.split()[1:] + \ - ['-km'] - self._set_host_id(key) + cmd_args = "mkdir -p $AIRFLOW_HOME/dags/synched/git && " \ + "git clone {} /tmp/tmp_git && " \ + "mv /tmp/tmp_git/* $AIRFLOW_HOME/dags/synched/git/ &&" \ + "rm -rf /tmp/tmp_git &&" \ + "{} -km".format(KubeConfig.git_repo, command) pod_id = self._create_job_id_from_key(key=key, epoch_time=epoch_time) self.current_jobs[pod_id] = key - image = configuration.get('core','k8s_image') - print("k8s: launching image {}".format(image)) pod = KubernetesPodBuilder( - image= image, - cmds=command_list, + image=KubeConfig.kube_image, + cmds=["bash", "-cx", "--"], + args=[cmd_args], kub_req_factory=SimplePodRequestFactory(), - namespace=self.namespace) + namespace=self.namespace + ) pod.add_name(pod_id) pod.launch() self._task_counter += 1 + # the watcher will monitor pods, so we do not block. + self.launcher.run_pod_async(pod) self.logger.info("k8s: Job created!") - def delete_job(self, key): - job_id = self.current_jobs[key] - self.helper.delete_job(job_id, namespace=self.namespace) - def sync(self): """ @@ -153,17 +160,21 @@ class AirflowKubernetesScheduler(object): """ while not self.watcher_queue.empty(): - self.end_task() + self.process_watcher_task() - def end_task(self): + def process_watcher_task(self): job_id, state = self.watcher_queue.get() - if job_id in self.current_jobs: - key = self.current_jobs[job_id] - self.logger.info("finishing job {}".format(key)) - if state: - self.result_queue.put((key, state)) - self.current_jobs.pop(job_id) - self.running.pop(key) + if state == State.RUNNING and job_id in self.pending_jobs: + self.pending_jobs.remove(job_id) + elif job_id in self.current_jobs: + self._complete_job(job_id, state) + + def _complete_job(self, job_id, state): + key = self.current_jobs[job_id] + self.logger.info("finishing job {}".format(key)) + self.result_queue.put((key, state)) + self.current_jobs.pop(job_id) + self.running.pop(key) def _create_job_id_from_key(self, key, epoch_time): """ @@ -186,23 +197,22 @@ class AirflowKubernetesScheduler(object): job_id = unformatted_job_id.replace('_', '-') return job_id - def _set_host_id(self, key): - (dag_id, task_id, ex_time) = key - session = settings.Session() - item = session.query(TaskInstance) \ - .filter_by(dag_id=dag_id, task_id=task_id, execution_date=ex_time).one() - - host_id = item.hostname - print("host is {}".format(host_id)) - class KubernetesExecutor(BaseExecutor): + def __init__(self): + super(KubernetesExecutor, self).__init__(parallelism=PARALLELISM) + self.task_queue = None + self._session = None + self.result_queue = None + self.pending_tasks = None + self.kub_client = None def start(self): self.logger.info('k8s: starting kubernetes 
executor') self.task_queue = Queue() self._session = settings.Session() self.result_queue = Queue() + self.pending_tasks = {} self.kub_client = AirflowKubernetesScheduler(self.task_queue, self.result_queue, running=self.running) @@ -215,7 +225,7 @@ class KubernetesExecutor(BaseExecutor): self.change_state(*results) # TODO this could be a job_counter based on max jobs a user wants - if len(self.kub_client.current_jobs) > 3: + if self.job_queue_full() or self.cluster_at_capacity(): self.logger.info("currently a job is running") else: self.logger.info("queue ready, running next") @@ -223,13 +233,20 @@ class KubernetesExecutor(BaseExecutor): (key, command) = self.task_queue.get() self.kub_client.run_next((key, command)) + def job_queue_full(self): + return len(self.kub_client.current_jobs) > PARALLELISM + + def cluster_at_capacity(self): + return len(self.pending_tasks) > 5 + def terminate(self): pass def change_state(self, key, state): self.logger.info("k8s: setting state of {} to {}".format(key, state)) if state != State.RUNNING: - self.kub_client.delete_job(key) + # self.kub_client.delete_job(key) + self.logger.info("current running {}".format(self.running)) self.running.pop(key) self.event_buffer[key] = state (dag_id, task_id, ex_time) = key http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c177d6e8/airflow/contrib/kubernetes/__init__.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/kubernetes/__init__.py b/airflow/contrib/kubernetes/__init__.py index 59eeddf..c82f579 100644 --- a/airflow/contrib/kubernetes/__init__.py +++ b/airflow/contrib/kubernetes/__init__.py @@ -12,6 +12,3 @@ # See the License for the specific language governing permissions and # limitations under the License. -from airflow import dag_importer - -dag_importer.import_dags() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c177d6e8/airflow/contrib/kubernetes/kubernetes_factory.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/kubernetes/kubernetes_factory.py b/airflow/contrib/kubernetes/kubernetes_factory.py new file mode 100644 index 0000000..fc840bb --- /dev/null +++ b/airflow/contrib/kubernetes/kubernetes_factory.py @@ -0,0 +1,63 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and + +from airflow.contrib.kubernetes.kubernetes_request_factory import KubernetesRequestFactory +import logging + + +class KubernetesResourceBuilder: + def __init__( + self, + image, + cmds, + args, + namespace, + kub_req_factory=None + ): + # type: (str, list, str, KubernetesRequestFactory) -> KubernetesResourceBuilder + + self.image = image + self.args = args + self.cmds = cmds + self.kub_req_factory = kub_req_factory + self.namespace = namespace + self.logger = logging.getLogger(self.__class__.__name__) + self.envs = {} + self.labels = {} + self.secrets = {} + self.node_selectors = [] + self.name = None + + def add_env_variables(self, env): + self.envs = env + + def add_secret(self, secret): + self.secrets = self.secrets + [secret] + + def add_secrets(self, secrets): + self.secrets = secrets + + def add_labels(self, labels): + self.labels = labels + + def add_name(self, name): + self.name = name + + def set_namespace(self, namespace): + self.namespace = namespace + + +class KubernetesPodBuilder(KubernetesResourceBuilder): + def __init__(self, image, cmds, namespace, kub_req_factory=None): + # type: (str, list, str, KubernetesRequestFactory) -> KubernetesPodBuilder + KubernetesResourceBuilder.__init__(self, image, cmds, namespace, kub_req_factory) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c177d6e8/airflow/contrib/kubernetes/kubernetes_job_builder.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/kubernetes/kubernetes_job_builder.py b/airflow/contrib/kubernetes/kubernetes_job_builder.py index 65237ff..4171dbc 100644 --- a/airflow/contrib/kubernetes/kubernetes_job_builder.py +++ b/airflow/contrib/kubernetes/kubernetes_job_builder.py @@ -61,10 +61,6 @@ class KubernetesJobBuilder: self.logger.info("Job created. status='%s', yaml:\n%s", str(resp.status), str(req)) - def _kube_client(self): - config.load_incluster_config() - return client.BatchV1Api() - def _execution_finished(self): k8s_beta = self._kube_client() resp = k8s_beta.read_namespaced_job_status(self.name, namespace=self.namespace) @@ -72,3 +68,8 @@ class KubernetesJobBuilder: if resp.status.phase == 'Failed': raise Exception("Job " + self.name + " failed!") return resp.status.phase != 'Running' + + @staticmethod + def _kube_client(): + config.load_incluster_config() + return client.BatchV1Api() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c177d6e8/airflow/contrib/kubernetes/kubernetes_pod_builder.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/kubernetes/kubernetes_pod_builder.py b/airflow/contrib/kubernetes/kubernetes_pod_builder.py deleted file mode 100644 index 2b0a9e4..0000000 --- a/airflow/contrib/kubernetes/kubernetes_pod_builder.py +++ /dev/null @@ -1,74 +0,0 @@ -# -*- coding: utf-8 -*- -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
-# See the License for the specific language governing permissions and - -from kubernetes import client, config -import json -import logging - - -class KubernetesPodBuilder: - def __init__( - self, - image, - cmds, - namespace, - kub_req_factory=None - ): - self.image = image - self.cmds = cmds - self.kub_req_factory = kub_req_factory - self.namespace = namespace - self.logger = logging.getLogger(self.__class__.__name__) - self.envs = {} - self.labels = {} - self.secrets = {} - self.node_selectors = [] - self.name = None - - def add_env_variables(self, env): - self.envs = env - - def add_secrets(self, secrets): - self.secrets = secrets - - def add_labels(self, labels): - self.labels = labels - - def add_name(self, name): - self.name = name - - def set_namespace(self, namespace): - self.namespace = namespace - - def launch(self): - """ - Launches the pod synchronously and waits for completion. - """ - k8s_beta = self._kube_client() - req = self.kub_req_factory.create(self) - print(json.dumps(req)) - resp = k8s_beta.create_namespaced_pod(body=req, namespace=self.namespace) - self.logger.info("Job created. status='%s', yaml:\n%s", - str(resp.status), str(req)) - - def _kube_client(self): - config.load_incluster_config() - return client.CoreV1Api() - - def _execution_finished(self): - k8s_beta = self._kube_client() - resp = k8s_beta.read_namespaced_job_status(self.name, namespace=self.namespace) - self.logger.info('status : ' + str(resp.status)) - if resp.status.phase == 'Failed': - raise Exception("Job " + self.name + " failed!") - return resp.status.phase != 'Running' http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c177d6e8/airflow/contrib/kubernetes/kubernetes_request_factory/__init__.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/__init__.py b/airflow/contrib/kubernetes/kubernetes_request_factory/__init__.py index 676245c..d2344a2 100644 --- a/airflow/contrib/kubernetes/kubernetes_request_factory/__init__.py +++ b/airflow/contrib/kubernetes/kubernetes_request_factory/__init__.py @@ -12,5 +12,4 @@ # See the License for the specific language governing permissions and from .kubernetes_request_factory import * -from .job_request_factory import * from .pod_request_factory import * http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c177d6e8/airflow/contrib/kubernetes/kubernetes_request_factory/job_request_factory.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/job_request_factory.py b/airflow/contrib/kubernetes/kubernetes_request_factory/job_request_factory.py index fda488e..a06b434 100644 --- a/airflow/contrib/kubernetes/kubernetes_request_factory/job_request_factory.py +++ b/airflow/contrib/kubernetes/kubernetes_request_factory/job_request_factory.py @@ -10,9 +10,9 @@ # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
# See the License for the specific language governing permissions and - +import logging import yaml -from .kubernetes_request_factory import * +from .kubernetes_request_factory import KubernetesRequestFactory, KubernetesRequestFactoryHelper as kreq class SimpleJobRequestFactory(KubernetesRequestFactory): @@ -21,7 +21,7 @@ class SimpleJobRequestFactory(KubernetesRequestFactory): """ def __init__(self): - pass + super(SimpleJobRequestFactory, self).__init__() _yaml = """apiVersion: batch/v1 kind: Job @@ -36,25 +36,22 @@ spec: - name: base image: airflow-slave:latest command: ["/usr/local/airflow/entrypoint.sh", "/bin/bash sleep 25"] - volumeMounts: - - name: shared-data - mountPath: "/usr/local/airflow/dags" restartPolicy: Never """ - def create(self, pod): + def create_body(self, pod): req = yaml.load(self._yaml) - sub_req = req['spec']['template'] - extract_name(pod, sub_req) - extract_labels(pod, sub_req) - extract_image(pod, sub_req) - extract_cmds(pod, sub_req) + kreq.extract_name(pod, req) + kreq.extract_labels(pod, req) + kreq.extract_image(pod, req) + kreq.extract_cmds(pod, req) + kreq.extract_args(pod, req) if len(pod.node_selectors) > 0: - extract_node_selector(pod, sub_req) - extract_secrets(pod, sub_req) - print("attaching volume mounts") - attach_volume_mounts(sub_req) + kreq.extract_node_selector(pod, req) + kreq.extract_secrets(pod, req) + logging.info("attaching volume mounts") + kreq.attach_volume_mounts(req) return req - - + def after_create(self, body, pod): + pass http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c177d6e8/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py index a103fd9..2382561 100644 --- a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py +++ b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py @@ -12,10 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-import logging from abc import ABCMeta, abstractmethod -from airflow import dag_importer - class KubernetesRequestFactory(): """ @@ -35,73 +32,75 @@ class KubernetesRequestFactory(): """ pass - -def extract_image(pod, req): - req['spec']['containers'][0]['image'] = pod.image - - -def add_secret_to_env(env, secret): - env.append({ - 'name': secret.deploy_target, - 'valueFrom': { - 'secretKeyRef': { - 'name': secret.secret, - 'key': secret.key - } - } - }) - - -def extract_labels(pod, req): - for k in pod.labels.keys(): - req['metadata']['labels'][k] = pod.labels[k] - - -def extract_cmds(pod, req): - req['spec']['containers'][0]['command'] = pod.cmds - - -def extract_node_selector(pod, req): - req['spec']['nodeSelector'] = pod.node_selectors - - -def extract_secrets(pod, req): - env_secrets = [s for s in pod.secrets if s.deploy_type == 'env'] - if len(pod.envs) > 0 or len(env_secrets) > 0: - env = [] - for k in pod.envs.keys(): - env.append({'name': k, 'value': pod.envs[k]}) - for secret in env_secrets: - add_secret_to_env(env, secret) - req['spec']['containers'][0]['env'] = env - - -def attach_volume_mounts(req): - logging.info("preparing to import dags") - dag_importer.import_dags() - logging.info("using file mount {}".format(dag_importer.dag_import_spec)) - req['spec']['volumes'] = [dag_importer.dag_import_spec] - - -def extract_name(pod, req): - req['metadata']['name'] = pod.name - - -def extract_volume_secrets(pod, req): - vol_secrets = [s for s in pod.secrets if s.deploy_type == 'volume'] - if any(vol_secrets): - req['spec']['containers'][0]['volumeMounts'] = [] - req['spec']['volumes'] = [] - for idx, vol in enumerate(vol_secrets): - vol_id = 'secretvol' + str(idx) - req['spec']['containers'][0]['volumeMounts'].append({ - 'mountPath': vol.deploy_target, - 'name': vol_id, - 'readOnly': True - }) - req['spec']['volumes'].append({ - 'name': vol_id, - 'secret': { - 'secretName': vol.secret + @staticmethod + def extract_image(pod, req): + req['spec']['containers'][0]['image'] = pod.image + + @staticmethod + def add_secret_to_env(env, secret): + env.append({ + 'name':secret.deploy_target, + 'valueFrom':{ + 'secretKeyRef':{ + 'name':secret.secret, + 'key':secret.key + } } }) + + @staticmethod + def extract_labels(pod, req): + for k in pod.labels.keys(): + req['metadata']['labels'][k] = pod.labels[k] + + @staticmethod + def extract_cmds(pod, req): + req['spec']['containers'][0]['command'] = pod.cmds + + @staticmethod + def extract_args(pod, req): + req['spec']['containers'][0]['args'] = pod.args + + @staticmethod + def extract_node_selector(pod, req): + req['spec']['nodeSelector'] = pod.node_selectors + + + @staticmethod + def attach_volume_mounts(pod, req): + req['spec']['volumes'] = pod.volumes + + @staticmethod + def extract_name(pod, req): + req['metadata']['name'] = pod.name + + @staticmethod + def extract_volume_secrets(pod, req): + vol_secrets = [s for s in pod.secrets if s.deploy_type == 'volume'] + if any(vol_secrets): + req['spec']['containers'][0]['volumeMounts'] = [] + req['spec']['volumes'] = [] + for idx, vol in enumerate(vol_secrets): + vol_id = 'secretvol' + str(idx) + req['spec']['containers'][0]['volumeMounts'].append({ + 'mountPath':vol.deploy_target, + 'name':vol_id, + 'readOnly':True + }) + req['spec']['volumes'].append({ + 'name':vol_id, + 'secret':{ + 'secretName':vol.secret + } + }) + + @staticmethod + def extract_secrets(pod, req): + env_secrets = [s for s in pod.secrets if s.deploy_type == 'env'] + if len(pod.envs) > 0 or len(env_secrets) > 0: + env = [] + for k in 
pod.envs.keys(): + env.append({'name':k, 'value':pod.envs[k]}) + for secret in env_secrets: + KubernetesRequestFactory.add_secret_to_env(env, secret) + req['spec']['containers'][0]['env'] = env http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c177d6e8/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py b/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py index 466972b..4a0cbeb 100644 --- a/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py +++ b/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py @@ -13,6 +13,7 @@ import kubernetes_request_factory as kreq import yaml +from airflow.contrib.kubernetes.pod import Pod from airflow import AirflowException @@ -39,48 +40,16 @@ spec: pass def create(self, pod): + # type: (Pod) -> dict req = yaml.load(self._yaml) kreq.extract_name(pod, req) kreq.extract_labels(pod, req) kreq.extract_image(pod, req) kreq.extract_cmds(pod, req) + kreq.extract_args(pod, req) if len(pod.node_selectors) > 0: - kreq.extract_node_selector(pod, req) - kreq.extract_secrets(pod, req) - kreq.extract_volume_secrets(pod, req) - kreq.attach_volume_mounts(req) + self.extract_node_selector(pod, req) + self.extract_secrets(pod, req) + self.extract_volume_secrets(pod, req) + self.attach_volume_mounts(req=req, pod=pod) return req - - -class ReturnValuePodRequestFactory(SimplePodRequestFactory): - """ - Pod request factory with a PreStop hook to upload return value - to the system's etcd service. - :param kube_com_service_factory: Kubernetes Communication Service factory - :type kube_com_service_factory: () => KubernetesCommunicationService - """ - - def __init__(self, kube_com_service_factory, result_data_file): - super(ReturnValuePodRequestFactory, self).__init__() - self._kube_com_service_factory = kube_com_service_factory - self._result_data_file = result_data_file - - def after_create(self, body, pod): - """ - Augment the pod with hyper-parameterized specific logic - Adds a Kubernetes PreStop hook to upload the model training - metrics to the Kubernetes communication engine (probably - an etcd service running with airflow) - """ - container = body['spec']['containers'][0] - pre_stop_hook = self._kube_com_service_factory() \ - .pod_pre_stop_hook(self._result_data_file, pod.name) - # Pre-stop hook only works on containers that are deleted. If the container - # naturally exists there would be no pre-stop hook execution. Therefore we - # simulate the hook by wrapping the exe command inside a script - if "'" in ' '.join(container['command']): - raise AirflowException('Please do not include single quote ' - 'in your command for hyperparameterized pods') - cmd = ' '.join(["'" + c + "'" if " " in c else c for c in container['command']]) - container['command'] = ['/bin/bash', '-c', "({}) ; ({})" - .format(cmd, pre_stop_hook)] http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c177d6e8/airflow/contrib/kubernetes/pod.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/kubernetes/pod.py b/airflow/contrib/kubernetes/pod.py index c38783c..0200afa 100644 --- a/airflow/contrib/kubernetes/pod.py +++ b/airflow/contrib/kubernetes/pod.py @@ -12,12 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-from kubernetes import client, config -from kubernetes_request_factory import KubernetesRequestFactory, SimplePodRequestFactory import logging -from airflow import AirflowException -import time -import json class Pod: """ @@ -43,49 +38,21 @@ class Pod: envs, cmds, secrets, - labels, - node_selectors, - kube_req_factory, - name, + labels=None, + node_selectors=None, + name=None, + volumes = [], namespace='default', result=None): self.image = image - self.envs = envs + self.envs = envs if envs else {} self.cmds = cmds self.secrets = secrets self.result = result - self.labels = labels + self.labels = labels if labels else [] self.name = name - self.node_selectors = node_selectors - self.kube_req_factory = (kube_req_factory or SimplePodRequestFactory)() + self.volumes = volumes + self.node_selectors = node_selectors if node_selectors else [] self.namespace = namespace self.logger = logging.getLogger(self.__class__.__name__) - if not isinstance(self.kube_req_factory, KubernetesRequestFactory): - raise AirflowException('`kube_req_factory`' - ' should implement KubernetesRequestFactory') - def launch(self): - """ - Launches the pod synchronously and waits for completion. - """ - k8s_beta = self._kube_client() - req = self.kube_req_factory.create(self) - print(json.dumps(req)) - resp = k8s_beta.create_namespaced_job(body=req, namespace=self.namespace) - self.logger.info("Job created. status='%s', yaml:\n%s" - % (str(resp.status), str(req))) - while not self._execution_finished(): - time.sleep(10) - return self.result - - def _kube_client(self): - config.load_incluster_config() - return client.BatchV1Api() - - def _execution_finished(self): - k8s_beta = self._kube_client() - resp = k8s_beta.read_namespaced_job_status(self.name, namespace=self.namespace) - self.logger.info('status : ' + str(resp.status)) - if resp.status.phase == 'Failed': - raise Exception("Job " + self.name + " failed!") - return resp.status.phase != 'Running' http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c177d6e8/airflow/contrib/kubernetes/pod_launcher.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/kubernetes/pod_launcher.py b/airflow/contrib/kubernetes/pod_launcher.py index a774d79..e92ae5c 100644 --- a/airflow/contrib/kubernetes/pod_launcher.py +++ b/airflow/contrib/kubernetes/pod_launcher.py @@ -11,135 +11,72 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - -import base64 +from airflow.contrib.kubernetes.pod import Pod +from airflow.contrib.kubernetes.kubernetes_request_factory import SimplePodRequestFactory +from kubernetes import config, client, watch +from kubernetes.client import V1Pod +from airflow.utils.state import State import json import logging -import time -import urllib2 - -from kubernetes import client, config - -from kubernetes_request_factory import KubernetesRequestFactory -from pod import Pod - - -def kube_client(): - config.load_incluster_config() - return client.CoreV1Api() -def incluster_namespace(): - """ - :return: The incluster namespace. 
- """ - config.load_incluster_config() - k8s_configuration = config.incluster_config.configuration - encoded_namespace = k8s_configuration.api_key['authorization'].split(' ')[-1] - api_key = str(base64.b64decode(encoded_namespace)) - key_with_namespace = [k for k in api_key.split(',') if 'namespace' in k][0] - unformatted_namespace = key_with_namespace.split(':')[-1] - return unformatted_namespace.replace('"', '') +class PodLauncher: + def __init__(self): + self.kube_req_factory = SimplePodRequestFactory() + self._client = self._kube_client() + self._watch = watch.Watch() + self.logger = logging.getLogger(__name__) + def run_pod_async(self, pod): + req = self.kube_req_factory.create(pod) + print(json.dumps(req)) + resp = self._client.create_namespaced_pod(body=req, namespace=pod.namespace) + return resp -class KubernetesLauncher: - """ - This class is responsible for launching objects to Kubernetes. - Extend this class to launch exotic objects. - Before trying to extend this method check if augmenting the request factory - is enough for your use-case - :param kube_object: A pod or anything that represents a Kubernetes object - :type kube_object: Pod - :param request_factory: A factory method to create kubernetes requests. - """ - - pod_timeout = 3600 - - def __init__(self, kube_object, request_factory): - if not isinstance(kube_object, Pod): - raise Exception('`kube_object` must inherit from Pod') - if not isinstance(request_factory, KubernetesRequestFactory): - raise Exception('`request_factory` must inherit from ' - 'KubernetesRequestFactory') - self.pod = kube_object - self.request_factory = request_factory - - def launch(self): + def run_pod(self, pod): + # type: (Pod) -> State """ Launches the pod synchronously and waits for completion. - No return value from execution. Will raise an exception if things failed """ - k8s_beta = kube_client() - req = self.request_factory.create(self) - logging.info(json.dumps(req)) - resp = k8s_beta.create_namespaced_pod(body=req, namespace=self.pod.namespace) - logging.info("Job created. status='%s', yaml:\n%s" - % (str(resp.status), str(req))) - for i in range(1, self.pod_timeout): - time.sleep(10) - logging.info('Waiting for success') - if self._execution_finished(): - logging.info('Job finished!') - return - raise Exception("Job timed out!") - - def _execution_finished(self): - k8s_beta = kube_client() - resp = k8s_beta.read_namespaced_pod_status( - self.pod.name, - namespace=self.pod.namespace) - logging.info('status : ' + str(resp.status)) - logging.info('phase : i' + str(resp.status.phase)) - if resp.status.phase == 'Failed': - raise Exception("Job " + self.pod.name + " failed!") - return resp.status.phase != 'Running' - - -class KubernetesCommunicationService: - """ - A service that manages communications between pods in Kubernetes and ariflow dagrun - Note that etcd service is running side by side of the airflow on the same machine - using kubernetes magic, so on airflow side we use localhost, and on the remote side - we use the provided etcd host. - """ - - def __init__(self, etcd_host, etcd_port): - self.etcd_host = etcd_host - self.etcd_port = etcd_port - self.url = 'http://localhost:{}'.format(self.etcd_port) - - def pod_pre_stop_hook(self, return_data_file, task_id): - return 'echo value=$(cat %s) | curl -d "@-" -X PUT %s:%s/v2/keys/pod_metrics/%s' \ - % ( - return_data_file, self.etcd_host, self.etcd_port, task_id) - - def pod_return_data(self, task_id): - """ - Returns the pod's return data. 
The pod_pre_stop_hook is responsible to upload - the return data to etcd. - - If the return_data_file is generated by the application, the pre stop hook - will upload it to etcd and we will be download it back to airflow. - """ - logging.info('querying {} for task id {}'.format(self.url, task_id)) - try: - result = urllib2.urlopen(self.url + '/v2/keys/pod_metrics/' + task_id).read() - logging.info('result for querying {} for task id {}: {}' - .format(self.url, task_id, result)) - result = json.loads(result)['node']['value'] - return result - except urllib2.HTTPError as err: - if err.code == 404: - return None # Data not found - raise - - @staticmethod - def from_dag_default_args(dag): - (etcd_host, etcd_port) = dag.default_args.get('etcd_endpoint', ':').split(':') - logging.info('Setting etcd endpoint from dag default args {}:{}' - .format(etcd_host, etcd_port)) - if not etcd_host: - raise Exception('`KubernetesCommunicationService` ' - 'requires etcd endpoint. Please defined it in dag ' - 'degault_args') - return KubernetesCommunicationService(etcd_host, etcd_port) + resp = self.run_pod_async(pod) + final_status = self._monitor_pod(pod) + return final_status + + def _kube_client(self): + #TODO: This should also allow people to point to a cluster. + config.load_incluster_config() + return client.CoreV1Api() + + def _monitor_pod(self, pod): + # type: (Pod) -> State + for event in self._watch.stream(self.read_pod(pod), pod.namespace): + status = self._task_status(event) + if status == State.SUCCESS or status == State.FAILED: + return status + + def _task_status(self, event): + # type: (V1Pod) -> State + task = event['object'] + self.logger.info( + "Event: {} had an event of type {}".format(task.metadata.name, + event['type'])) + status = self.process_status(task.metadata.name, task.status.phase) + return status + + def read_pod(self, pod): + return self._client.read_namespaced_pod(pod.name, pod.namespace) + + def process_status(self, job_id, status): + if status == 'Pending': + return State.QUEUED + elif status == 'Failed': + self.logger.info("Event: {} Failed".format(job_id)) + return State.FAILED + elif status == 'Succeeded': + self.logger.info("Event: {} Succeeded".format(job_id)) + return State.SUCCESS + elif status == 'Running': + return State.RUNNING + else: + self.logger.info("Event: Invalid state {} on job {}".format(status, job_id)) + return State.FAILED http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c177d6e8/airflow/contrib/operators/k8s_pod_operator/k8s_pod_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/k8s_pod_operator/k8s_pod_operator.py b/airflow/contrib/operators/k8s_pod_operator/k8s_pod_operator.py index bf7b048..6af66ea 100644 --- a/airflow/contrib/operators/k8s_pod_operator/k8s_pod_operator.py +++ b/airflow/contrib/operators/k8s_pod_operator/k8s_pod_operator.py @@ -18,74 +18,61 @@ from airflow.exceptions import AirflowException from airflow.operators.python_operator import PythonOperator from airflow.utils.decorators import apply_defaults from airflow.contrib.kubernetes.pod_launcher import KubernetesLauncher, \ - KubernetesCommunicationService + KubernetesCommunicationService, incluster_namespace from airflow.contrib.kubernetes.kubernetes_request_factory import \ SimplePodRequestFactory, \ ReturnValuePodRequestFactory -from .op_context import OpContext class PodOperator(PythonOperator): """ Executes a pod and waits for the job to finish. 
+ :param dag_run_id: The unique run ID that would be attached to the pod as a label :type dag_run_id: str :param pod_factory: Reference to the function that creates the pod with format: function (OpContext) => Pod :type pod_factory: callable - :param cache_output: If set to true, the output of the pod would be saved in a - cache object using md5 hash of all the pod parameters - and in case of success, the cached results will be returned - on consecutive calls. Only use this """ # template_fields = tuple('dag_run_id') ui_color = '#8da7be' @apply_defaults def __init__( - self, - dag_run_id, - pod_factory, - cache_output, - kube_request_factory=None, - *args, - **kwargs - ): - super(PodOperator, self).__init__( - python_callable=lambda _: 1, - provide_context=True, - *args, - **kwargs) + self, + dag_run_id, + pod_factory, + kube_request_factory=None, + *args, **kwargs): + super(PodOperator, self).__init__(python_callable=lambda _: 1, provide_context=True, *args, **kwargs) self.logger = logging.getLogger(self.__class__.__name__) if not callable(pod_factory): raise AirflowException('`pod_factory` param must be callable') self.dag_run_id = dag_run_id self.pod_factory = pod_factory - self._cache_output = cache_output - self.op_context = OpContext(self.task_id) self.kwargs = kwargs self._kube_request_factory = kube_request_factory or SimplePodRequestFactory def execute(self, context): - task_instance = context.get('task_instance') - if task_instance is None: - raise AirflowException('`task_instance` is empty! This should not happen') - self.op_context.set_xcom_instance(task_instance) - pod = self.pod_factory(self.op_context, context) + pod = self.get_pod_object(context) + # Customize the pod pod.name = self.task_id pod.labels['run_id'] = self.dag_run_id - pod.namespace = self.dag.default_args.get('namespace', pod.namespace) + try: + pod.namespace = self.dag.default_args.get('namespace', pod.namespace) or incluster_namespace() + except: + # Used default namespace + pass # Launch the pod and wait for it to finish KubernetesLauncher(pod, self._kube_request_factory).launch() - self.op_context.result = pod.result + result = pod.result + context['ti'].xcom_push(key='result', value=result) - # Cache the output custom_return_value = self.on_pod_success(context) - if custom_return_value: - self.op_context.custom_return_value = custom_return_value - return self.op_context.result + self.set_custom_return_value(context, custom_return_value) + return result def on_pod_success(self, context): """ @@ -95,32 +82,39 @@ class PodOperator(PythonOperator): """ pass + def get_pod_object(self, context): + """ + Returns a pod object. Overwrite this method to define custom objects + :param context: The task context + :return: The pod object + """ + return self.pod_factory(context) + + def set_custom_return_value(self, context, custom_return_value): + if custom_return_value: + context['ti'].xcom_push(key='custom_result', value=custom_return_value) + class ReturnValuePodOperator(PodOperator): """ This pod operators is a normal pod operator with the addition of reading custom return value back from kubernetes. 
""" - def __init__(self, - kube_com_service_factory, result_data_file, + kube_com_service_factory=None, *args, **kwargs): super(ReturnValuePodOperator, self).__init__(*args, **kwargs) + kube_com_service_factory = kube_com_service_factory or ( + lambda: KubernetesCommunicationService.from_dag_default_args(self.dag)) if not isinstance(kube_com_service_factory(), KubernetesCommunicationService): - raise AirflowException( - '`kube_com_service_factory` must be of type ' - 'KubernetesCommunicationService') + raise AirflowException('`kube_com_service_factory` must be of type KubernetesCommunicationService') self._kube_com_service_factory = kube_com_service_factory self._result_data_file = result_data_file - self._kube_request_factory = self._return_value_kube_request # Overwrite the - # default request factory + self._kube_request_factory = self._return_value_kube_request # Overwrite the default request factory def on_pod_success(self, context): - return_val = self._kube_com_service_factory().pod_return_data(self.task_id) - self.op_context.result = return_val # We also overwrite the results - return return_val + return self._kube_com_service_factory().pod_return_data(self.task_id) def _return_value_kube_request(self): - return ReturnValuePodRequestFactory(self._kube_com_service_factory, - self._result_data_file) + return ReturnValuePodRequestFactory(self._kube_com_service_factory, self._result_data_file) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c177d6e8/airflow/contrib/operators/k8s_pod_operator/op_context.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/k8s_pod_operator/op_context.py b/airflow/contrib/operators/k8s_pod_operator/op_context.py deleted file mode 100644 index 55a3b00..0000000 --- a/airflow/contrib/operators/k8s_pod_operator/op_context.py +++ /dev/null @@ -1,104 +0,0 @@ -# -*- coding: utf-8 -*- -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and - -from airflow import AirflowException -import logging - - -class OpContext(object): - """ - Data model for operation context of a pod operator with hyper parameters. - OpContext is able to communicate the context between PodOperators by - encapsulating XCom communication - Note: do not directly modify the upstreams - Also note: xcom_instance MUST be set before any attribute of this class can be - read. 
- :param: task_id The task ID - """ - _supported_attributes = {'hyper_parameters', 'custom_return_value'} - - def __init__(self, task_id): - self.task_id = task_id - self._upstream = [] - self._result = '__not_set__' - self._data = {} - self._xcom_instance = None - self._parent = None - - def __str__(self): - return 'upstream: [' + \ - ','.join([u.task_id for u in self._upstream]) + ']\n' + \ - 'params:' + ','.join( - [k + '=' + str(self._data[k]) for k in self._data.keys()]) - - def __setattr__(self, name, value): - if name in self._data: - raise AirflowException('`{}` is already set'.format(name)) - if name not in self._supported_attributes: - logging.warn( - '`{}` is not in the supported attribute list for OpContext'.format(name)) - self.get_xcom_instance().xcom_push(key=name, value=value) - self._data[name] = value - - def __getattr__(self, item): - if item not in self._supported_attributes: - logging.warn( - '`{}` is not in the supported attribute list for OpContext'.format(item)) - if item not in self._data: - self._data[item] = self.get_xcom_instance().xcom_pull(key=item, - task_ids=self.task_id) - return self._data[item] - - @property - def result(self): - if self._result == '__not_set__': - self._result = self.get_xcom_instance().xcom_pull(task_ids=self.task_id) - return self._result - - @result.setter - def result(self, value): - if self._result != '__not_set__': - raise AirflowException('`result` is already set') - self._result = value - - @property - def upstream(self): - return self._upstream - - def append_upstream(self, upstream_op_contexes): - """ - Appends a list of op_contexts to the upstream. It will create new instances and - set the task_id. - All the upstream op_contextes will share the same xcom_instance with this - op_context - :param upstream_op_contexes: List of upstream op_contextes - """ - for up in upstream_op_contexes: - op_context = OpContext(up.tak_id) - op_context._parent = self - self._upstream.append(op_context) - - def set_xcom_instance(self, xcom_instance): - """ - Sets the xcom_instance for this op_context and upstreams - :param xcom_instance: The Airflow TaskInstance for communication through XCom - :type xcom_instance: airflow.models.TaskInstance - """ - self._xcom_instance = xcom_instance - - def get_xcom_instance(self): - if self._xcom_instance is None and self._parent is None: - raise AirflowException( - 'Trying to access attribtues from OpContext before setting the ' - 'xcom_instance') - return self._xcom_instance or self._parent.get_xcom_instance() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c177d6e8/airflow/contrib/operators/kubernetes/__init__.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/kubernetes/__init__.py b/airflow/contrib/operators/kubernetes/__init__.py new file mode 100644 index 0000000..9d7677a --- /dev/null +++ b/airflow/contrib/operators/kubernetes/__init__.py @@ -0,0 +1,13 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
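With OpContext deleted above, results now flow between pod operators through plain XCom: the refactored execute() pushes the pod's result under key='result', and set_custom_return_value() pushes any on_pod_success() output under key='custom_result'. A minimal sketch (not part of this patch; the task_id 'hello-kube-step1' and the dag object are illustrative assumptions) of a downstream task reading those values:

    from airflow.operators.python_operator import PythonOperator

    def read_pod_output(**context):
        ti = context['ti']
        # execute() pushed the raw pod result under key='result'
        pod_result = ti.xcom_pull(key='result', task_ids='hello-kube-step1')
        # on_pod_success() output, if any, was pushed under key='custom_result'
        custom = ti.xcom_pull(key='custom_result', task_ids='hello-kube-step1')
        return pod_result, custom

    read_results = PythonOperator(
        task_id='read-pod-output',
        python_callable=read_pod_output,
        provide_context=True,
        dag=dag)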
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c177d6e8/airflow/contrib/operators/kubernetes/pod_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/kubernetes/pod_operator.py b/airflow/contrib/operators/kubernetes/pod_operator.py new file mode 100644 index 0000000..8b7a55f --- /dev/null +++ b/airflow/contrib/operators/kubernetes/pod_operator.py @@ -0,0 +1,104 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from airflow.exceptions import AirflowException +from airflow.operators.python_operator import PythonOperator +from airflow.utils.decorators import apply_defaults +from airflow.contrib.kubernetes.pod_launcher import PodLauncher +from airflow.contrib.kubernetes.pod import Pod +from airflow.utils.state import State + + +class PodOperator(PythonOperator): + """ + Executes a pod and waits for the job to finish. + :param dag_run_id: The unique run ID that would be attached to the pod as a label + :type dag_run_id: str + :param pod: The fully configured pod object to launch + :type pod: Pod + :param on_pod_success_func: Optional callable invoked with the task context + after the pod succeeds; its return value becomes the operator's return value + :type on_pod_success_func: callable + """ + # template_fields = tuple('dag_run_id') + ui_color = '#8da7be' + + @apply_defaults + def __init__( + self, + dag_run_id, + pod, + on_pod_success_func=None, + *args, + **kwargs + ): + # type: (str, Pod, callable) -> None + super(PodOperator, self).__init__( + python_callable=lambda _: 1, + provide_context=True, + *args, + **kwargs) + self.logger = logging.getLogger(self.__class__.__name__) + self.pod = pod + self.dag_run_id = dag_run_id + self.pod_launcher = PodLauncher() + self.kwargs = kwargs + self._on_pod_success_func = on_pod_success_func or (lambda context: None) + + def execute(self, context): + task_instance = context.get('task_instance') + if task_instance is None: + raise AirflowException('`task_instance` is empty! This should not happen') + + pod = self.pod + + # Customize the pod + pod.name = self.task_id + pod.labels['run_id'] = self.dag_run_id + pod.namespace = self.dag.default_args.get('namespace', pod.namespace) + + # Launch the pod and wait for it to finish + pod_result = self.pod_launcher.run_pod(pod) + + if pod_result == State.FAILED: + raise AirflowException("Pod returned a failed status") + + custom_return_value = self.on_pod_success(context) + if custom_return_value: + return custom_return_value + + def on_pod_success(self, context): + """ + Called when pod is executed successfully.
+ + If you want to access return values for XCOM, place values + in accessible file system or DB and override this function. + + :return: Returns a custom return value for pod which will + be stored in xcom + + """ + return self._on_pod_success_func(context=context) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c177d6e8/airflow/example_dags/example_pod_operator.py ---------------------------------------------------------------------- diff --git a/airflow/example_dags/example_pod_operator.py b/airflow/example_dags/example_pod_operator.py new file mode 100644 index 0000000..ec62aaf --- /dev/null +++ b/airflow/example_dags/example_pod_operator.py @@ -0,0 +1,91 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Example of the PodOperator and ReturnValuePodOperator which would execute +pods on a Kubernetes cluster. PodOperator would only work if airflow is +deployed within kubernetes. +""" +import os + +import airflow +import random +from airflow.contrib.kubernetes.pod import Pod, Config +from airflow.contrib.operators.k8s_pod_operator import ReturnValuePodOperator, PodOperator +from airflow.models import DAG +from airflow.utils.trigger_rule import TriggerRule + +# TODO: Replace the etcd endpoint with your own etcd endpoint +args = { + 'owner': 'airflow', + 'etcd_endpoint': os.environ.get('AIRFLOWSVC_SERVICE_HOST') + ':' + + os.environ.get('AIRFLOWSVC_SERVICE_PORT_ETCDSVC_PORT'), + 'start_date': airflow.utils.dates.days_ago(2) +} + +docker_image = 'artprod.dev.bloomberg.com/ds/molecula-python:1.0.0.0-SNAPSHOT' # Replace with 'ubuntu:latest' +dag = DAG( + dag_id='example_pod_operator', default_args=args, + schedule_interval=None) + + +def pod_that_returns_hello(context): + """ + Returns a Pod object given the airflow context. 
+ """ + image = docker_image + cmds = ['/bin/bash', '-c', 'echo "Hello $RANDOM" > /tmp/result.txt'] + return Pod(image=image, cmds=cmds) + + +hello_kube_step1 = ReturnValuePodOperator(dag=dag, + task_id='hello-kube-step1', + dag_run_id='run-1', + pod_factory=pod_that_returns_hello, + result_data_file='/tmp/result.txt') + + +def pod_that_reads_upstream_result(context): + up_task_id = 'hello-kube-step1' + # The message including a random number generated inside the upstream pod will be read here + return_val = context['ti'].xcom_pull(key='custom_result', task_ids=up_task_id) + image = docker_image + cmds = ['/bin/bash', '-c', 'echo ' + return_val] + return Pod(image=image, cmds=cmds) + + +hello_kube_step2 = PodOperator(dag=dag, + task_id='hello-kube-step2', + dag_run_id='run_1', + pod_factory=pod_that_reads_upstream_result) +hello_kube_step2.set_upstream(hello_kube_step1) + +def pod_that_injects_configs(context): + """ + The returning pod object has a configs map which tells the operator to inject some JSON objects as + config files + """ + image = docker_image # Replace with 'ubuntu:latest' + configs = [ Config('/configs/c1.json', { 'random_val': str(random.random()) }), Config('/configs/c2.json', { 'my_db': 'conn_str' }) ] + cmds = ['/bin/bash', '-c', 'sleep 3; cat /configs/c2.json'] + return Pod(image=image, cmds=cmds, configs=configs) + +hello_kube_step3 = ReturnValuePodOperator(dag=dag, + task_id='hello-kube-step3', + dag_run_id='run_1', + pod_factory=pod_that_injects_configs, + result_data_file='/configs/c1.json') +hello_kube_step3.set_upstream(hello_kube_step1) + + + http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c177d6e8/docker/Dockerfile ---------------------------------------------------------------------- diff --git a/docker/Dockerfile b/docker/Dockerfile new file mode 100644 index 0000000..2004e97 --- /dev/null +++ b/docker/Dockerfile @@ -0,0 +1,34 @@ +FROM ubuntu:16.04 + +# install deps +RUN apt-get update -y && apt-get install -y \ + wget \ + python-dev \ + python-pip \ + libczmq-dev \ + libcurlpp-dev \ + curl \ + libssl-dev \ + git \ + inetutils-telnet \ + bind9utils + +RUN pip install -U setuptools && \ + pip install -U pip + +RUN pip install kubernetes && \ + pip install cryptography && \ + pip install psycopg2==2.7.1 + +# install airflow +COPY airflow.tar.gz /tmp/airflow.tar.gz +RUN pip install /tmp/airflow.tar.gz + +# prep airflow +ENV AIRFLOW_HOME=/root/airflow +ENV AIRFLOW__CORE__EXECUTOR=KubernetesExecutor + +COPY bootstrap.sh /bootstrap.sh +RUN chmod +x /bootstrap.sh + +ENTRYPOINT ["/bootstrap.sh"] http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c177d6e8/docker/bootstrap.sh ---------------------------------------------------------------------- diff --git a/docker/bootstrap.sh b/docker/bootstrap.sh new file mode 100644 index 0000000..82124ac --- /dev/null +++ b/docker/bootstrap.sh @@ -0,0 +1,13 @@ +#!/bin/bash + +# launch the appropriate process + +if [ "$1" = "webserver" ] +then + exec airflow webserver +fi + +if [ "$1" = "scheduler" ] +then + exec airflow scheduler +fi http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c177d6e8/docker/build.sh ---------------------------------------------------------------------- diff --git a/docker/build.sh b/docker/build.sh new file mode 100755 index 0000000..f2a942e --- /dev/null +++ b/docker/build.sh @@ -0,0 +1,12 @@ +IMAGE=grantnicholas/kubeairflow +TAG=${1:-latest} + +if [ -f airflow.tar.gz ]; then + echo "Not rebuilding airflow source" +else + cd ../ && python setup.py sdist && cd docker 
&& \ + cp ../dist/apache-airflow-1.9.0.dev0+incubating.tar.gz airflow.tar.gz +fi + +docker build . --tag=${IMAGE}:${TAG} +docker push ${IMAGE}:${TAG} http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c177d6e8/kube/airflow.yaml.template ---------------------------------------------------------------------- diff --git a/kube/airflow.yaml.template b/kube/airflow.yaml.template new file mode 100644 index 0000000..c5877ca --- /dev/null +++ b/kube/airflow.yaml.template @@ -0,0 +1,107 @@ +apiVersion: extensions/v1beta1 +kind: Deployment +metadata: + name: airflow +spec: + replicas: 1 + template: + metadata: + labels: + name: airflow + annotations: + pod.beta.kubernetes.io/init-containers: '[ + { + "name": "init", + "image": "{{docker_image}}", + "command": ["bash", "-c", "cd /usr/local/lib/python2.7/dist-packages/airflow && airflow initdb && alembic upgrade head"], + "env": [ + {"name": "AIRFLOW__CORE__SQL_ALCHEMY_CONN", "value": "postgresql+psycopg2://root:root@postgres-airflow:5432/airflow"}, + {"name": "AIRFLOW__CORE__K8S_IMAGE", "value": "{{docker_image}}"}, + {"name": "AIRFLOW__CORE__K8S_GIT_REPO", "value": "https://github.com/grantnicholas/testdags.git"} + ] + } + ]' + spec: + initContainers: + - name: init + image: {{docker_image}} + command: [ + "bash", "-c", "cd /usr/local/lib/python2.7/dist-packages/airflow && airflow initdb && alembic upgrade head" + ] + env: + - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN + value: postgresql+psycopg2://root:root@postgres-airflow:5432/airflow + - name: AIRFLOW__CORE__K8S_IMAGE + value: {{docker_image}} + - name: AIRFLOW__CORE__K8S_GIT_REPO + value: https://github.com/grantnicholas/testdags.git + containers: + - name: web + image: {{docker_image}} + ports: + - name: web + containerPort: 8080 + args: ["webserver"] + env: + - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN + value: postgresql+psycopg2://root:root@postgres-airflow:5432/airflow + - name: AIRFLOW__CORE__K8S_IMAGE + value: {{docker_image}} + - name: AIRFLOW__CORE__K8S_GIT_REPO + value: https://github.com/grantnicholas/testdags.git + volumeMounts: + - name: dags + mountPath: /root/airflow/dags/synched + readinessProbe: + initialDelaySeconds: 5 + timeoutSeconds: 5 + periodSeconds: 5 + httpGet: + path: /admin + port: 8080 + livenessProbe: + initialDelaySeconds: 5 + timeoutSeconds: 5 + failureThreshold: 5 + httpGet: + path: /admin + port: 8080 + - name: scheduler + image: {{docker_image}} + args: ["scheduler"] + env: + - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN + value: postgresql+psycopg2://root:root@postgres-airflow:5432/airflow + - name: AIRFLOW__CORE__K8S_IMAGE + value: {{docker_image}} + - name: AIRFLOW__CORE__K8S_GIT_REPO + value: https://github.com/grantnicholas/testdags.git + volumeMounts: + - name: dags + mountPath: /root/airflow/dags/synched + - name: sync + image: gcr.io/google_containers/git-sync:v2.0.4 + env: + - name: GIT_SYNC_REPO + value: https://github.com/grantnicholas/testdags.git + - name: GIT_SYNC_DEST + value: git + volumeMounts: + - name: dags + mountPath: /git + volumes: + - name: dags + emptyDir: {} +--- +apiVersion: v1 +kind: Service +metadata: + name: airflow +spec: + type: NodePort + ports: + - port: 8080 + nodePort: 30809 + selector: + name: airflow + http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c177d6e8/kube/deploy.sh ---------------------------------------------------------------------- diff --git a/kube/deploy.sh b/kube/deploy.sh new file mode 100755 index 0000000..28b58ca --- /dev/null +++ b/kube/deploy.sh @@ -0,0 +1,6 @@ 
+IMAGE=${1:-grantnicholas/kubeairflow} +TAG=${2:-latest} + +mkdir -p .generated +kubectl apply -f postgres.yaml +sed "s#{{docker_image}}#$IMAGE:$TAG#g" airflow.yaml.template > .generated/airflow.yaml && kubectl apply -f .generated/airflow.yaml
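The example DAG above only exercises the k8s_pod_operator variant, which takes a pod_factory callable. The second operator added in airflow/contrib/operators/kubernetes/pod_operator.py instead takes a fully built Pod plus an optional success callback. A minimal usage sketch (the dag object, image, and task_id are assumptions, not part of the patch):

    from airflow.contrib.kubernetes.pod import Pod
    from airflow.contrib.operators.kubernetes.pod_operator import PodOperator

    def summarize(context):
        # Whatever is returned here becomes the operator's return value
        return 'pod for %s finished' % context['ti'].task_id

    say_hello = PodOperator(
        task_id='say-hello',
        dag_run_id='run-1',
        pod=Pod(image='ubuntu:16.04', cmds=['/bin/bash', '-c', 'echo hello']),
        on_pod_success_func=summarize,
        dag=dag)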

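Once deploy.sh has rendered and applied the manifests, the deployment can be sanity-checked with the kubernetes Python client that the Dockerfile already installs. A rough sketch, assuming the default namespace and a local kubeconfig (note the template targets the older extensions/v1beta1 Deployment API, so older clusters and clients may need ExtensionsV1beta1Api instead of AppsV1Api):

    from kubernetes import client, config

    config.load_kube_config()  # or config.load_incluster_config() when run inside the cluster

    apps = client.AppsV1Api()
    core = client.CoreV1Api()

    # The Deployment and Service are both named "airflow" in airflow.yaml.template
    dep = apps.read_namespaced_deployment(name='airflow', namespace='default')
    svc = core.read_namespaced_service(name='airflow', namespace='default')

    print('ready replicas:', dep.status.ready_replicas)
    print('webserver nodePort:', svc.spec.ports[0].node_port)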