[AIRFLOW-1314] Add support for volume mounts & Secrets in Kubernetes Executor
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/c177d6e8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/c177d6e8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/c177d6e8 Branch: refs/heads/master Commit: c177d6e8632e75db82b21c6633c8648e7428990a Parents: 5821320 Author: nyeganeh <[email protected]> Authored: Wed Jul 26 19:31:33 2017 +0000 Committer: Fokko Driesprong <[email protected]> Committed: Sun Apr 22 10:17:39 2018 +0200 ---------------------------------------------------------------------- .gitignore | 3 + airflow/bin/cli.py | 1794 ---------------- airflow/bin/cli/__init__.py | 3 + airflow/bin/cli/cli.py | 1915 ++++++++++++++++++ airflow/bin/cli/cli_factory.py | 491 +++++ .../contrib/executors/kubernetes_executor.py | 117 +- airflow/contrib/kubernetes/__init__.py | 3 - .../contrib/kubernetes/kubernetes_factory.py | 63 + .../kubernetes/kubernetes_job_builder.py | 9 +- .../kubernetes/kubernetes_pod_builder.py | 74 - .../kubernetes_request_factory/__init__.py | 1 - .../job_request_factory.py | 33 +- .../kubernetes_request_factory.py | 141 +- .../pod_request_factory.py | 45 +- airflow/contrib/kubernetes/pod.py | 49 +- airflow/contrib/kubernetes/pod_launcher.py | 183 +- .../k8s_pod_operator/k8s_pod_operator.py | 82 +- .../operators/k8s_pod_operator/op_context.py | 104 - .../contrib/operators/kubernetes/__init__.py | 13 + .../operators/kubernetes/pod_operator.py | 104 + airflow/example_dags/example_pod_operator.py | 91 + docker/Dockerfile | 34 + docker/bootstrap.sh | 13 + docker/build.sh | 12 + kube/airflow.yaml.template | 107 + kube/deploy.sh | 6 + kube/postgres.yaml | 94 + 27 files changed, 3219 insertions(+), 2365 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c177d6e8/.gitignore ---------------------------------------------------------------------- diff --git a/.gitignore b/.gitignore index 5e7b24d..46df849 100644 --- a/.gitignore +++ b/.gitignore @@ -133,3 +133,6 @@ rat-results.txt # Git stuff .gitattributes +# Kubernetes generated templated files +kube/.generated/ +airflow.tar.gz http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c177d6e8/airflow/bin/cli.py ---------------------------------------------------------------------- diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py deleted file mode 100755 index 7dc3b92..0000000 --- a/airflow/bin/cli.py +++ /dev/null @@ -1,1794 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
- -from __future__ import print_function -import logging - -import reprlib - -import os -import socket -import subprocess -import textwrap -from importlib import import_module - -import argparse -from builtins import input -from collections import namedtuple -from airflow.utils.timezone import parse as parsedate -import json -from tabulate import tabulate - -import daemon -from daemon.pidfile import TimeoutPIDLockFile -import signal -import sys -import threading -import traceback -import time -import psutil -import re -import getpass -from urllib.parse import urlunparse - -import airflow -from airflow import api -from airflow import jobs, settings -from airflow import configuration as conf -from airflow.exceptions import AirflowException, AirflowWebServerTimeout -from airflow.executors import GetDefaultExecutor -from airflow.models import (DagModel, DagBag, TaskInstance, - DagPickle, DagRun, Variable, DagStat, - Connection, DAG) - -from airflow.ti_deps.dep_context import (DepContext, SCHEDULER_DEPS) -from airflow.utils import cli as cli_utils -from airflow.utils import db as db_utils -from airflow.utils.net import get_hostname -from airflow.utils.log.logging_mixin import (LoggingMixin, redirect_stderr, - redirect_stdout) -from airflow.www.app import (cached_app, create_app) -from airflow.www_rbac.app import cached_app as cached_app_rbac -from airflow.www_rbac.app import create_app as create_app_rbac -from airflow.www_rbac.app import cached_appbuilder - -from sqlalchemy import func -from sqlalchemy.orm import exc - -api.load_auth() -api_module = import_module(conf.get('cli', 'api_client')) -api_client = api_module.Client(api_base_url=conf.get('cli', 'endpoint_url'), - auth=api.api_auth.client_auth) - -log = LoggingMixin().log - - -def sigint_handler(sig, frame): - sys.exit(0) - - -def sigquit_handler(sig, frame): - """Helps debug deadlocks by printing stacktraces when this gets a SIGQUIT - e.g. 
kill -s QUIT <PID> or CTRL+\ - """ - print("Dumping stack traces for all threads in PID {}".format(os.getpid())) - id_to_name = dict([(th.ident, th.name) for th in threading.enumerate()]) - code = [] - for thread_id, stack in sys._current_frames().items(): - code.append("\n# Thread: {}({})" - .format(id_to_name.get(thread_id, ""), thread_id)) - for filename, line_number, name, line in traceback.extract_stack(stack): - code.append('File: "{}", line {}, in {}' - .format(filename, line_number, name)) - if line: - code.append(" {}".format(line.strip())) - print("\n".join(code)) - - -def setup_logging(filename): - root = logging.getLogger() - handler = logging.FileHandler(filename) - formatter = logging.Formatter(settings.SIMPLE_LOG_FORMAT) - handler.setFormatter(formatter) - root.addHandler(handler) - root.setLevel(settings.LOGGING_LEVEL) - - return handler.stream - - -def setup_locations(process, pid=None, stdout=None, stderr=None, log=None): - if not stderr: - stderr = os.path.join(os.path.expanduser(settings.AIRFLOW_HOME), "airflow-{}.err".format(process)) - if not stdout: - stdout = os.path.join(os.path.expanduser(settings.AIRFLOW_HOME), "airflow-{}.out".format(process)) - if not log: - log = os.path.join(os.path.expanduser(settings.AIRFLOW_HOME), "airflow-{}.log".format(process)) - if not pid: - pid = os.path.join(os.path.expanduser(settings.AIRFLOW_HOME), "airflow-{}.pid".format(process)) - - return pid, stdout, stderr, log - - -def process_subdir(subdir): - if subdir: - subdir = subdir.replace('DAGS_FOLDER', settings.DAGS_FOLDER) - subdir = os.path.abspath(os.path.expanduser(subdir)) - return subdir - - -def get_dag(args): - dagbag = DagBag(process_subdir(args.subdir)) - if args.dag_id not in dagbag.dags: - raise AirflowException( - 'dag_id could not be found: {}. Either the dag did not exist or it failed to ' - 'parse.'.format(args.dag_id)) - return dagbag.dags[args.dag_id] - - -def get_dags(args): - if not args.dag_regex: - return [get_dag(args)] - dagbag = DagBag(process_subdir(args.subdir)) - matched_dags = [dag for dag in dagbag.dags.values() if re.search( - args.dag_id, dag.dag_id)] - if not matched_dags: - raise AirflowException( - 'dag_id could not be found with regex: {}. 
Either the dag did not exist ' - 'or it failed to parse.'.format(args.dag_id)) - return matched_dags - - -@cli_utils.action_logging -def backfill(args, dag=None): - logging.basicConfig( - level=settings.LOGGING_LEVEL, - format=settings.SIMPLE_LOG_FORMAT) - - dag = dag or get_dag(args) - - if not args.start_date and not args.end_date: - raise AirflowException("Provide a start_date and/or end_date") - - # If only one date is passed, using same as start and end - args.end_date = args.end_date or args.start_date - args.start_date = args.start_date or args.end_date - - if args.task_regex: - dag = dag.sub_dag( - task_regex=args.task_regex, - include_upstream=not args.ignore_dependencies) - - if args.dry_run: - print("Dry run of DAG {0} on {1}".format(args.dag_id, - args.start_date)) - for task in dag.tasks: - print("Task {0}".format(task.task_id)) - ti = TaskInstance(task, args.start_date) - ti.dry_run() - else: - dag.run( - start_date=args.start_date, - end_date=args.end_date, - mark_success=args.mark_success, - include_adhoc=args.include_adhoc, - local=args.local, - donot_pickle=(args.donot_pickle or - conf.getboolean('core', 'donot_pickle')), - ignore_first_depends_on_past=args.ignore_first_depends_on_past, - ignore_task_deps=args.ignore_dependencies, - pool=args.pool, - delay_on_limit_secs=args.delay_on_limit) - - -@cli_utils.action_logging -def trigger_dag(args): - """ - Creates a dag run for the specified dag - :param args: - :return: - """ - log = LoggingMixin().log - try: - message = api_client.trigger_dag(dag_id=args.dag_id, - run_id=args.run_id, - conf=args.conf, - execution_date=args.exec_date) - except IOError as err: - log.error(err) - raise AirflowException(err) - log.info(message) - - -@cli_utils.action_logging -def delete_dag(args): - """ - Deletes all DB records related to the specified dag - :param args: - :return: - """ - log = LoggingMixin().log - if args.yes or input( - "This will drop all existing records related to the specified DAG. " - "Proceed? 
(y/n)").upper() == "Y": - try: - message = api_client.delete_dag(dag_id=args.dag_id) - except IOError as err: - log.error(err) - raise AirflowException(err) - log.info(message) - else: - print("Bail.") - - -@cli_utils.action_logging -def pool(args): - log = LoggingMixin().log - - def _tabulate(pools): - return "\n%s" % tabulate(pools, ['Pool', 'Slots', 'Description'], - tablefmt="fancy_grid") - - try: - if args.get is not None: - pools = [api_client.get_pool(name=args.get)] - elif args.set: - pools = [api_client.create_pool(name=args.set[0], - slots=args.set[1], - description=args.set[2])] - elif args.delete: - pools = [api_client.delete_pool(name=args.delete)] - else: - pools = api_client.get_pools() - except (AirflowException, IOError) as err: - log.error(err) - else: - log.info(_tabulate(pools=pools)) - - -@cli_utils.action_logging -def variables(args): - if args.get: - try: - var = Variable.get(args.get, - deserialize_json=args.json, - default_var=args.default) - print(var) - except ValueError as e: - print(e) - if args.delete: - session = settings.Session() - session.query(Variable).filter_by(key=args.delete).delete() - session.commit() - session.close() - if args.set: - Variable.set(args.set[0], args.set[1]) - # Work around 'import' as a reserved keyword - imp = getattr(args, 'import') - if imp: - if os.path.exists(imp): - import_helper(imp) - else: - print("Missing variables file.") - if args.export: - export_helper(args.export) - if not (args.set or args.get or imp or args.export or args.delete): - # list all variables - session = settings.Session() - vars = session.query(Variable) - msg = "\n".join(var.key for var in vars) - print(msg) - - -def import_helper(filepath): - with open(filepath, 'r') as varfile: - var = varfile.read() - - try: - d = json.loads(var) - except Exception: - print("Invalid variables file.") - else: - try: - n = 0 - for k, v in d.items(): - if isinstance(v, dict): - Variable.set(k, v, serialize_json=True) - else: - Variable.set(k, v) - n += 1 - except Exception: - pass - finally: - print("{} of {} variables successfully updated.".format(n, len(d))) - - -def export_helper(filepath): - session = settings.Session() - qry = session.query(Variable).all() - session.close() - - var_dict = {} - d = json.JSONDecoder() - for var in qry: - val = None - try: - val = d.decode(var.val) - except Exception: - val = var.val - var_dict[var.key] = val - - with open(filepath, 'w') as varfile: - varfile.write(json.dumps(var_dict, sort_keys=True, indent=4)) - print("{} variables successfully exported to {}".format(len(var_dict), filepath)) - - -@cli_utils.action_logging -def pause(args, dag=None): - set_is_paused(True, args, dag) - - -@cli_utils.action_logging -def unpause(args, dag=None): - set_is_paused(False, args, dag) - - -def set_is_paused(is_paused, args, dag=None): - dag = dag or get_dag(args) - - session = settings.Session() - dm = session.query(DagModel).filter( - DagModel.dag_id == dag.dag_id).first() - dm.is_paused = is_paused - session.commit() - - msg = "Dag: {}, paused: {}".format(dag, str(dag.is_paused)) - print(msg) - - -def _run(args, dag, ti): - if args.local: - run_job = jobs.LocalTaskJob( - task_instance=ti, - mark_success=args.mark_success, - pickle_id=args.pickle, - ignore_all_deps=args.ignore_all_dependencies, - ignore_depends_on_past=args.ignore_depends_on_past, - ignore_task_deps=args.ignore_dependencies, - ignore_ti_state=args.force, - pool=args.pool) - run_job.run() - elif args.raw: - ti._run_raw_task( - mark_success=args.mark_success, - 
job_id=args.job_id, - pool=args.pool, - ) - else: - pickle_id = None - if args.ship_dag: - try: - # Running remotely, so pickling the DAG - session = settings.Session() - pickle = DagPickle(dag) - session.add(pickle) - session.commit() - pickle_id = pickle.id - # TODO: This should be written to a log - print('Pickled dag {dag} as pickle_id:{pickle_id}' - .format(**locals())) - except Exception as e: - print('Could not pickle the DAG') - print(e) - raise e - - executor = GetDefaultExecutor() - executor.start() - print("Sending to executor.") - executor.queue_task_instance( - ti, - mark_success=args.mark_success, - pickle_id=pickle_id, - ignore_all_deps=args.ignore_all_dependencies, - ignore_depends_on_past=args.ignore_depends_on_past, - ignore_task_deps=args.ignore_dependencies, - ignore_ti_state=args.force, - pool=args.pool) - executor.heartbeat() - executor.end() - - -@cli_utils.action_logging -def run(args, dag=None): - # Disable connection pooling to reduce the # of connections on the DB - # while it's waiting for the task to finish. - settings.configure_orm(disable_connection_pool=True) - - if dag: - args.dag_id = dag.dag_id - - log = LoggingMixin().log - - # Load custom airflow config - if args.cfg_path: - with open(args.cfg_path, 'r') as conf_file: - conf_dict = json.load(conf_file) - - if os.path.exists(args.cfg_path): - os.remove(args.cfg_path) - - for section, config in conf_dict.items(): - for option, value in config.items(): - conf.set(section, option, value) - settings.configure_vars() - settings.configure_orm() - - if not args.pickle and not dag: - dag = get_dag(args) - elif not dag: - session = settings.Session() - log.info('Loading pickle id {args.pickle}'.format(args=args)) - dag_pickle = session.query( - DagPickle).filter(DagPickle.id == args.pickle).first() - if not dag_pickle: - raise AirflowException("Who hid the pickle!? [missing pickle]") - dag = dag_pickle.pickle - - task = dag.get_task(task_id=args.task_id) - ti = TaskInstance(task, args.execution_date) - ti.refresh_from_db() - - ti.init_run_context(raw=args.raw) - - hostname = get_hostname() - log.info("Running %s on host %s", ti, hostname) - - if args.interactive: - _run(args, dag, ti) - else: - with redirect_stdout(ti.log, logging.INFO), redirect_stderr(ti.log, logging.WARN): - _run(args, dag, ti) - - -@cli_utils.action_logging -def task_failed_deps(args): - """ - Returns the unmet dependencies for a task instance from the perspective of the - scheduler (i.e. why a task instance doesn't get scheduled and then queued by the - scheduler, and then run by an executor). - - >>> airflow task_failed_deps tutorial sleep 2015-01-01 - Task instance dependencies not met: - Dagrun Running: Task instance's dagrun did not exist: Unknown reason - Trigger Rule: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). - """ - dag = get_dag(args) - task = dag.get_task(task_id=args.task_id) - ti = TaskInstance(task, args.execution_date) - - dep_context = DepContext(deps=SCHEDULER_DEPS) - failed_deps = list(ti.get_failed_dep_statuses(dep_context=dep_context)) - # TODO, Do we want to print or log this - if failed_deps: - print("Task instance dependencies not met:") - for dep in failed_deps: - print("{}: {}".format(dep.dep_name, dep.reason)) - else: - print("Task instance dependencies are all met.") - - -@cli_utils.action_logging -def task_state(args): - """ - Returns the state of a TaskInstance at the command line. 
- - >>> airflow task_state tutorial sleep 2015-01-01 - success - """ - dag = get_dag(args) - task = dag.get_task(task_id=args.task_id) - ti = TaskInstance(task, args.execution_date) - print(ti.current_state()) - - -@cli_utils.action_logging -def dag_state(args): - """ - Returns the state of a DagRun at the command line. - - >>> airflow dag_state tutorial 2015-01-01T00:00:00.000000 - running - """ - dag = get_dag(args) - dr = DagRun.find(dag.dag_id, execution_date=args.execution_date) - print(dr[0].state if len(dr) > 0 else None) - - -@cli_utils.action_logging -def list_dags(args): - dagbag = DagBag(process_subdir(args.subdir)) - s = textwrap.dedent("""\n - ------------------------------------------------------------------- - DAGS - ------------------------------------------------------------------- - {dag_list} - """) - dag_list = "\n".join(sorted(dagbag.dags)) - print(s.format(dag_list=dag_list)) - if args.report: - print(dagbag.dagbag_report()) - - -@cli_utils.action_logging -def list_tasks(args, dag=None): - dag = dag or get_dag(args) - if args.tree: - dag.tree_view() - else: - tasks = sorted([t.task_id for t in dag.tasks]) - print("\n".join(sorted(tasks))) - - -@cli_utils.action_logging -def test(args, dag=None): - dag = dag or get_dag(args) - - task = dag.get_task(task_id=args.task_id) - # Add CLI provided task_params to task.params - if args.task_params: - passed_in_params = json.loads(args.task_params) - task.params.update(passed_in_params) - ti = TaskInstance(task, args.execution_date) - - if args.dry_run: - ti.dry_run() - else: - ti.run(ignore_task_deps=True, ignore_ti_state=True, test_mode=True) - - -@cli_utils.action_logging -def render(args): - dag = get_dag(args) - task = dag.get_task(task_id=args.task_id) - ti = TaskInstance(task, args.execution_date) - ti.render_templates() - for attr in task.__class__.template_fields: - print(textwrap.dedent("""\ - # ---------------------------------------------------------- - # property: {} - # ---------------------------------------------------------- - {} - """.format(attr, getattr(task, attr)))) - - -@cli_utils.action_logging -def clear(args): - logging.basicConfig( - level=settings.LOGGING_LEVEL, - format=settings.SIMPLE_LOG_FORMAT) - dags = get_dags(args) - - if args.task_regex: - for idx, dag in enumerate(dags): - dags[idx] = dag.sub_dag( - task_regex=args.task_regex, - include_downstream=args.downstream, - include_upstream=args.upstream) - - DAG.clear_dags( - dags, - start_date=args.start_date, - end_date=args.end_date, - only_failed=args.only_failed, - only_running=args.only_running, - confirm_prompt=not args.no_confirm, - include_subdags=not args.exclude_subdags) - - -def get_num_ready_workers_running(gunicorn_master_proc): - workers = psutil.Process(gunicorn_master_proc.pid).children() - - def ready_prefix_on_cmdline(proc): - try: - cmdline = proc.cmdline() - if len(cmdline) > 0: - return settings.GUNICORN_WORKER_READY_PREFIX in cmdline[0] - except psutil.NoSuchProcess: - pass - return False - - ready_workers = [proc for proc in workers if ready_prefix_on_cmdline(proc)] - return len(ready_workers) - - -def get_num_workers_running(gunicorn_master_proc): - workers = psutil.Process(gunicorn_master_proc.pid).children() - return len(workers) - - -def restart_workers(gunicorn_master_proc, num_workers_expected, master_timeout): - """ - Runs forever, monitoring the child processes of @gunicorn_master_proc and - restarting workers occasionally. 
- - Each iteration of the loop traverses one edge of this state transition - diagram, where each state (node) represents - [ num_ready_workers_running / num_workers_running ]. We expect most time to - be spent in [n / n]. `bs` is the setting webserver.worker_refresh_batch_size. - - The horizontal transition at ? happens after the new worker parses all the - dags (so it could take a while!) - - V âââââââââââââââââââââââââââââââââââââââââââââââââââââââââââââââââââââââââ - [n / n] ââTTINââ> [ [n, n+bs) / n + bs ] ââââ?âââ> [n + bs / n + bs] ââTTOUââ - ^ ^ââââââââââââââââ - â - â âââââââââââââââââv - ââââââââ´ââââââ [ [0, n) / n ] <âââ start - - We change the number of workers by sending TTIN and TTOU to the gunicorn - master process, which increases and decreases the number of child workers - respectively. Gunicorn guarantees that on TTOU workers are terminated - gracefully and that the oldest worker is terminated. - """ - - def wait_until_true(fn, timeout=0): - """ - Sleeps until fn is true - """ - t = time.time() - while not fn(): - if 0 < timeout and timeout <= time.time() - t: - raise AirflowWebServerTimeout( - "No response from gunicorn master within {0} seconds" - .format(timeout)) - time.sleep(0.1) - - def start_refresh(gunicorn_master_proc): - batch_size = conf.getint('webserver', 'worker_refresh_batch_size') - log.debug('%s doing a refresh of %s workers', state, batch_size) - sys.stdout.flush() - sys.stderr.flush() - - excess = 0 - for _ in range(batch_size): - gunicorn_master_proc.send_signal(signal.SIGTTIN) - excess += 1 - wait_until_true(lambda: num_workers_expected + excess == - get_num_workers_running(gunicorn_master_proc), - master_timeout) - - try: - wait_until_true(lambda: num_workers_expected == - get_num_workers_running(gunicorn_master_proc), - master_timeout) - while True: - num_workers_running = get_num_workers_running(gunicorn_master_proc) - num_ready_workers_running = \ - get_num_ready_workers_running(gunicorn_master_proc) - - state = '[{0} / {1}]'.format(num_ready_workers_running, num_workers_running) - - # Whenever some workers are not ready, wait until all workers are ready - if num_ready_workers_running < num_workers_running: - log.debug('%s some workers are starting up, waiting...', state) - sys.stdout.flush() - time.sleep(1) - - # Kill a worker gracefully by asking gunicorn to reduce number of workers - elif num_workers_running > num_workers_expected: - excess = num_workers_running - num_workers_expected - log.debug('%s killing %s workers', state, excess) - - for _ in range(excess): - gunicorn_master_proc.send_signal(signal.SIGTTOU) - excess -= 1 - wait_until_true(lambda: num_workers_expected + excess == - get_num_workers_running(gunicorn_master_proc), - master_timeout) - - # Start a new worker by asking gunicorn to increase number of workers - elif num_workers_running == num_workers_expected: - refresh_interval = conf.getint('webserver', 'worker_refresh_interval') - log.debug( - '%s sleeping for %ss starting doing a refresh...', - state, refresh_interval - ) - time.sleep(refresh_interval) - start_refresh(gunicorn_master_proc) - - else: - # num_ready_workers_running == num_workers_running < num_workers_expected - log.error(( - "%s some workers seem to have died and gunicorn" - "did not restart them as expected" - ), state) - time.sleep(10) - if len( - psutil.Process(gunicorn_master_proc.pid).children() - ) < num_workers_expected: - start_refresh(gunicorn_master_proc) - except (AirflowWebServerTimeout, OSError) as err: - log.error(err) - log.error("Shutting down 
webserver") - try: - gunicorn_master_proc.terminate() - gunicorn_master_proc.wait() - finally: - sys.exit(1) - - -@cli_utils.action_logging -def webserver(args): - print(settings.HEADER) - - access_logfile = args.access_logfile or conf.get('webserver', 'access_logfile') - error_logfile = args.error_logfile or conf.get('webserver', 'error_logfile') - num_workers = args.workers or conf.get('webserver', 'workers') - worker_timeout = (args.worker_timeout or - conf.get('webserver', 'web_server_worker_timeout')) - ssl_cert = args.ssl_cert or conf.get('webserver', 'web_server_ssl_cert') - ssl_key = args.ssl_key or conf.get('webserver', 'web_server_ssl_key') - if not ssl_cert and ssl_key: - raise AirflowException( - 'An SSL certificate must also be provided for use with ' + ssl_key) - if ssl_cert and not ssl_key: - raise AirflowException( - 'An SSL key must also be provided for use with ' + ssl_cert) - - if args.debug: - print( - "Starting the web server on port {0} and host {1}.".format( - args.port, args.hostname)) - app = create_app_rbac(conf) if settings.RBAC else create_app(conf) - app.run(debug=True, port=args.port, host=args.hostname, - ssl_context=(ssl_cert, ssl_key) if ssl_cert and ssl_key else None) - else: - app = cached_app_rbac(conf) if settings.RBAC else cached_app(conf) - pid, stdout, stderr, log_file = setup_locations( - "webserver", args.pid, args.stdout, args.stderr, args.log_file) - if args.daemon: - handle = setup_logging(log_file) - stdout = open(stdout, 'w+') - stderr = open(stderr, 'w+') - - print( - textwrap.dedent('''\ - Running the Gunicorn Server with: - Workers: {num_workers} {args.workerclass} - Host: {args.hostname}:{args.port} - Timeout: {worker_timeout} - Logfiles: {access_logfile} {error_logfile} - =================================================================\ - '''.format(**locals()))) - - run_args = [ - 'gunicorn', - '-w', str(num_workers), - '-k', str(args.workerclass), - '-t', str(worker_timeout), - '-b', args.hostname + ':' + str(args.port), - '-n', 'airflow-webserver', - '-p', str(pid), - '-c', 'python:airflow.www.gunicorn_config', - ] - - if args.access_logfile: - run_args += ['--access-logfile', str(args.access_logfile)] - - if args.error_logfile: - run_args += ['--error-logfile', str(args.error_logfile)] - - if args.daemon: - run_args += ['-D'] - - if ssl_cert: - run_args += ['--certfile', ssl_cert, '--keyfile', ssl_key] - - webserver_module = 'www_rbac' if settings.RBAC else 'www' - run_args += ["airflow." + webserver_module + ".app:cached_app()"] - - gunicorn_master_proc = None - - def kill_proc(dummy_signum, dummy_frame): - gunicorn_master_proc.terminate() - gunicorn_master_proc.wait() - sys.exit(0) - - def monitor_gunicorn(gunicorn_master_proc): - # These run forever until SIG{INT, TERM, KILL, ...} signal is sent - if conf.getint('webserver', 'worker_refresh_interval') > 0: - master_timeout = conf.getint('webserver', 'web_server_master_timeout') - restart_workers(gunicorn_master_proc, num_workers, master_timeout) - else: - while True: - time.sleep(1) - - if args.daemon: - base, ext = os.path.splitext(pid) - ctx = daemon.DaemonContext( - pidfile=TimeoutPIDLockFile(base + "-monitor" + ext, -1), - files_preserve=[handle], - stdout=stdout, - stderr=stderr, - signal_map={ - signal.SIGINT: kill_proc, - signal.SIGTERM: kill_proc - }, - ) - with ctx: - subprocess.Popen(run_args, close_fds=True) - - # Reading pid file directly, since Popen#pid doesn't - # seem to return the right value with DaemonContext. 
- while True: - try: - with open(pid) as f: - gunicorn_master_proc_pid = int(f.read()) - break - except IOError: - log.debug("Waiting for gunicorn's pid file to be created.") - time.sleep(0.1) - - gunicorn_master_proc = psutil.Process(gunicorn_master_proc_pid) - monitor_gunicorn(gunicorn_master_proc) - - stdout.close() - stderr.close() - else: - gunicorn_master_proc = subprocess.Popen(run_args, close_fds=True) - - signal.signal(signal.SIGINT, kill_proc) - signal.signal(signal.SIGTERM, kill_proc) - - monitor_gunicorn(gunicorn_master_proc) - - -@cli_utils.action_logging -def scheduler(args): - print(settings.HEADER) - job = jobs.SchedulerJob( - dag_id=args.dag_id, - subdir=process_subdir(args.subdir), - run_duration=args.run_duration, - num_runs=args.num_runs, - do_pickle=args.do_pickle) - - if args.daemon: - pid, stdout, stderr, log_file = setup_locations("scheduler", args.pid, args.stdout, args.stderr, args.log_file) - handle = setup_logging(log_file) - stdout = open(stdout, 'w+') - stderr = open(stderr, 'w+') - - ctx = daemon.DaemonContext( - pidfile=TimeoutPIDLockFile(pid, -1), - files_preserve=[handle], - stdout=stdout, - stderr=stderr, - ) - with ctx: - job.run() - - stdout.close() - stderr.close() - else: - signal.signal(signal.SIGINT, sigint_handler) - signal.signal(signal.SIGTERM, sigint_handler) - signal.signal(signal.SIGQUIT, sigquit_handler) - job.run() - - -@cli_utils.action_logging -def serve_logs(args): - print("Starting flask") - import flask - flask_app = flask.Flask(__name__) - - @flask_app.route('/log/<path:filename>') - def serve_logs(filename): # noqa - log = os.path.expanduser(conf.get('core', 'BASE_LOG_FOLDER')) - return flask.send_from_directory( - log, - filename, - mimetype="application/json", - as_attachment=False) - - WORKER_LOG_SERVER_PORT = \ - int(conf.get('celery', 'WORKER_LOG_SERVER_PORT')) - flask_app.run( - host='0.0.0.0', port=WORKER_LOG_SERVER_PORT) - - -@cli_utils.action_logging -def worker(args): - env = os.environ.copy() - env['AIRFLOW_HOME'] = settings.AIRFLOW_HOME - - # Celery worker - from airflow.executors.celery_executor import app as celery_app - from celery.bin import worker - - worker = worker.worker(app=celery_app) - options = { - 'optimization': 'fair', - 'O': 'fair', - 'queues': args.queues, - 'concurrency': args.concurrency, - 'hostname': args.celery_hostname, - } - - if args.daemon: - pid, stdout, stderr, log_file = setup_locations("worker", args.pid, args.stdout, args.stderr, args.log_file) - handle = setup_logging(log_file) - stdout = open(stdout, 'w+') - stderr = open(stderr, 'w+') - - ctx = daemon.DaemonContext( - pidfile=TimeoutPIDLockFile(pid, -1), - files_preserve=[handle], - stdout=stdout, - stderr=stderr, - ) - with ctx: - sp = subprocess.Popen(['airflow', 'serve_logs'], env=env, close_fds=True) - worker.run(**options) - sp.kill() - - stdout.close() - stderr.close() - else: - signal.signal(signal.SIGINT, sigint_handler) - signal.signal(signal.SIGTERM, sigint_handler) - - sp = subprocess.Popen(['airflow', 'serve_logs'], env=env, close_fds=True) - - worker.run(**options) - sp.kill() - - -@cli_utils.action_logging -def initdb(args): # noqa - print("DB: " + repr(settings.engine.url)) - db_utils.initdb(settings.RBAC) - print("Done.") - - -@cli_utils.action_logging -def resetdb(args): - print("DB: " + repr(settings.engine.url)) - if args.yes or input( - "This will drop existing tables if they exist. " - "Proceed? 
(y/n)").upper() == "Y": - db_utils.resetdb(settings.RBAC) - else: - print("Bail.") - - -@cli_utils.action_logging -def upgradedb(args): # noqa - print("DB: " + repr(settings.engine.url)) - db_utils.upgradedb() - - # Populate DagStats table - session = settings.Session() - ds_rows = session.query(DagStat).count() - if not ds_rows: - qry = ( - session.query(DagRun.dag_id, DagRun.state, func.count('*')) - .group_by(DagRun.dag_id, DagRun.state) - ) - for dag_id, state, count in qry: - session.add(DagStat(dag_id=dag_id, state=state, count=count)) - session.commit() - - -@cli_utils.action_logging -def version(args): # noqa - print(settings.HEADER + " v" + airflow.__version__) - - -alternative_conn_specs = ['conn_type', 'conn_host', - 'conn_login', 'conn_password', 'conn_schema', 'conn_port'] - - -@cli_utils.action_logging -def connections(args): - if args.list: - # Check that no other flags were passed to the command - invalid_args = list() - for arg in ['conn_id', 'conn_uri', 'conn_extra'] + alternative_conn_specs: - if getattr(args, arg) is not None: - invalid_args.append(arg) - if invalid_args: - msg = ('\n\tThe following args are not compatible with the ' + - '--list flag: {invalid!r}\n') - msg = msg.format(invalid=invalid_args) - print(msg) - return - - session = settings.Session() - conns = session.query(Connection.conn_id, Connection.conn_type, - Connection.host, Connection.port, - Connection.is_encrypted, - Connection.is_extra_encrypted, - Connection.extra).all() - conns = [map(reprlib.repr, conn) for conn in conns] - print(tabulate(conns, ['Conn Id', 'Conn Type', 'Host', 'Port', - 'Is Encrypted', 'Is Extra Encrypted', 'Extra'], - tablefmt="fancy_grid")) - return - - if args.delete: - # Check that only the `conn_id` arg was passed to the command - invalid_args = list() - for arg in ['conn_uri', 'conn_extra'] + alternative_conn_specs: - if getattr(args, arg) is not None: - invalid_args.append(arg) - if invalid_args: - msg = ('\n\tThe following args are not compatible with the ' + - '--delete flag: {invalid!r}\n') - msg = msg.format(invalid=invalid_args) - print(msg) - return - - if args.conn_id is None: - print('\n\tTo delete a connection, you Must provide a value for ' + - 'the --conn_id flag.\n') - return - - session = settings.Session() - try: - to_delete = (session - .query(Connection) - .filter(Connection.conn_id == args.conn_id) - .one()) - except exc.NoResultFound: - msg = '\n\tDid not find a connection with `conn_id`={conn_id}\n' - msg = msg.format(conn_id=args.conn_id) - print(msg) - return - except exc.MultipleResultsFound: - msg = ('\n\tFound more than one connection with ' + - '`conn_id`={conn_id}\n') - msg = msg.format(conn_id=args.conn_id) - print(msg) - return - else: - deleted_conn_id = to_delete.conn_id - session.delete(to_delete) - session.commit() - msg = '\n\tSuccessfully deleted `conn_id`={conn_id}\n' - msg = msg.format(conn_id=deleted_conn_id) - print(msg) - return - - if args.add: - # Check that the conn_id and conn_uri args were passed to the command: - missing_args = list() - invalid_args = list() - if not args.conn_id: - missing_args.append('conn_id') - if args.conn_uri: - for arg in alternative_conn_specs: - if getattr(args, arg) is not None: - invalid_args.append(arg) - elif not args.conn_type: - missing_args.append('conn_uri or conn_type') - if missing_args: - msg = ('\n\tThe following args are required to add a connection:' + - ' {missing!r}\n'.format(missing=missing_args)) - print(msg) - if invalid_args: - msg = ('\n\tThe following args are not compatible 
with the ' + - '--add flag and --conn_uri flag: {invalid!r}\n') - msg = msg.format(invalid=invalid_args) - print(msg) - if missing_args or invalid_args: - return - - if args.conn_uri: - new_conn = Connection(conn_id=args.conn_id, uri=args.conn_uri) - else: - new_conn = Connection(conn_id=args.conn_id, conn_type=args.conn_type, host=args.conn_host, - login=args.conn_login, password=args.conn_password, schema=args.conn_schema, port=args.conn_port) - if args.conn_extra is not None: - new_conn.set_extra(args.conn_extra) - - session = settings.Session() - if not (session - .query(Connection) - .filter(Connection.conn_id == new_conn.conn_id).first()): - session.add(new_conn) - session.commit() - msg = '\n\tSuccessfully added `conn_id`={conn_id} : {uri}\n' - msg = msg.format(conn_id=new_conn.conn_id, uri=args.conn_uri or urlunparse((args.conn_type, '{login}:{password}@{host}:{port}'.format( - login=args.conn_login or '', password=args.conn_password or '', host=args.conn_host or '', port=args.conn_port or ''), args.conn_schema or '', '', '', ''))) - print(msg) - else: - msg = '\n\tA connection with `conn_id`={conn_id} already exists\n' - msg = msg.format(conn_id=new_conn.conn_id) - print(msg) - - return - - -@cli_utils.action_logging -def flower(args): - broka = conf.get('celery', 'BROKER_URL') - address = '--address={}'.format(args.hostname) - port = '--port={}'.format(args.port) - api = '' - if args.broker_api: - api = '--broker_api=' + args.broker_api - - url_prefix = '' - if args.url_prefix: - url_prefix = '--url-prefix=' + args.url_prefix - - flower_conf = '' - if args.flower_conf: - flower_conf = '--conf=' + args.flower_conf - - if args.daemon: - pid, stdout, stderr, log_file = setup_locations("flower", args.pid, args.stdout, args.stderr, args.log_file) - stdout = open(stdout, 'w+') - stderr = open(stderr, 'w+') - - ctx = daemon.DaemonContext( - pidfile=TimeoutPIDLockFile(pid, -1), - stdout=stdout, - stderr=stderr, - ) - - with ctx: - os.execvp("flower", ['flower', '-b', - broka, address, port, api, flower_conf, url_prefix]) - - stdout.close() - stderr.close() - else: - signal.signal(signal.SIGINT, sigint_handler) - signal.signal(signal.SIGTERM, sigint_handler) - - os.execvp("flower", ['flower', '-b', - broka, address, port, api, flower_conf, url_prefix]) - - -@cli_utils.action_logging -def kerberos(args): # noqa - print(settings.HEADER) - import airflow.security.kerberos - - if args.daemon: - pid, stdout, stderr, log_file = setup_locations("kerberos", - args.pid, - args.stdout, - args.stderr, - args.log_file) - stdout = open(stdout, 'w+') - stderr = open(stderr, 'w+') - - ctx = daemon.DaemonContext( - pidfile=TimeoutPIDLockFile(pid, -1), - stdout=stdout, - stderr=stderr, - ) - - with ctx: - airflow.security.kerberos.run() - - stdout.close() - stderr.close() - else: - airflow.security.kerberos.run() - - -@cli_utils.action_logging -def create_user(args): - fields = { - 'role': args.role, - 'username': args.username, - 'email': args.email, - 'firstname': args.firstname, - 'lastname': args.lastname, - } - empty_fields = [k for k, v in fields.items() if not v] - if empty_fields: - print('Missing arguments: {}.'.format(', '.join(empty_fields))) - sys.exit(0) - - appbuilder = cached_appbuilder() - role = appbuilder.sm.find_role(args.role) - if not role: - print('{} is not a valid role.'.format(args.role)) - sys.exit(0) - - password = getpass.getpass('Password:') - password_confirmation = getpass.getpass('Repeat for confirmation:') - if password != password_confirmation: - print('Passwords did not 
match!') - sys.exit(0) - - user = appbuilder.sm.add_user(args.username, args.firstname, args.lastname, - args.email, role, password) - if user: - print('{} user {} created.'.format(args.role, args.username)) - else: - print('Failed to create user.') - - -Arg = namedtuple( - 'Arg', ['flags', 'help', 'action', 'default', 'nargs', 'type', 'choices', 'metavar']) -Arg.__new__.__defaults__ = (None, None, None, None, None, None, None) - - -class CLIFactory(object): - args = { - # Shared - 'dag_id': Arg(("dag_id",), "The id of the dag"), - 'task_id': Arg(("task_id",), "The id of the task"), - 'execution_date': Arg( - ("execution_date",), help="The execution date of the DAG", - type=parsedate), - 'task_regex': Arg( - ("-t", "--task_regex"), - "The regex to filter specific task_ids to backfill (optional)"), - 'subdir': Arg( - ("-sd", "--subdir"), - "File location or directory from which to look for the dag", - default=settings.DAGS_FOLDER), - 'start_date': Arg( - ("-s", "--start_date"), "Override start_date YYYY-MM-DD", - type=parsedate), - 'end_date': Arg( - ("-e", "--end_date"), "Override end_date YYYY-MM-DD", - type=parsedate), - 'dry_run': Arg( - ("-dr", "--dry_run"), "Perform a dry run", "store_true"), - 'pid': Arg( - ("--pid",), "PID file location", - nargs='?'), - 'daemon': Arg( - ("-D", "--daemon"), "Daemonize instead of running " - "in the foreground", - "store_true"), - 'stderr': Arg( - ("--stderr",), "Redirect stderr to this file"), - 'stdout': Arg( - ("--stdout",), "Redirect stdout to this file"), - 'log_file': Arg( - ("-l", "--log-file"), "Location of the log file"), - 'yes': Arg( - ("-y", "--yes"), - "Do not prompt to confirm reset. Use with care!", - "store_true", - default=False), - - # backfill - 'mark_success': Arg( - ("-m", "--mark_success"), - "Mark jobs as succeeded without running them", "store_true"), - 'local': Arg( - ("-l", "--local"), - "Run the task using the LocalExecutor", "store_true"), - 'donot_pickle': Arg( - ("-x", "--donot_pickle"), ( - "Do not attempt to pickle the DAG object to send over " - "to the workers, just tell the workers to run their version " - "of the code."), - "store_true"), - 'include_adhoc': Arg( - ("-a", "--include_adhoc"), - "Include dags with the adhoc parameter.", "store_true"), - 'bf_ignore_dependencies': Arg( - ("-i", "--ignore_dependencies"), - ( - "Skip upstream tasks, run only the tasks " - "matching the regexp. 
Only works in conjunction " - "with task_regex"), - "store_true"), - 'bf_ignore_first_depends_on_past': Arg( - ("-I", "--ignore_first_depends_on_past"), - ( - "Ignores depends_on_past dependencies for the first " - "set of tasks only (subsequent executions in the backfill " - "DO respect depends_on_past)."), - "store_true"), - 'pool': Arg(("--pool",), "Resource pool to use"), - 'delay_on_limit': Arg( - ("--delay_on_limit",), - help=("Amount of time in seconds to wait when the limit " - "on maximum active dag runs (max_active_runs) has " - "been reached before trying to execute a dag run " - "again."), - type=float, - default=1.0), - # list_tasks - 'tree': Arg(("-t", "--tree"), "Tree view", "store_true"), - # list_dags - 'report': Arg( - ("-r", "--report"), "Show DagBag loading report", "store_true"), - # clear - 'upstream': Arg( - ("-u", "--upstream"), "Include upstream tasks", "store_true"), - 'only_failed': Arg( - ("-f", "--only_failed"), "Only failed jobs", "store_true"), - 'only_running': Arg( - ("-r", "--only_running"), "Only running jobs", "store_true"), - 'downstream': Arg( - ("-d", "--downstream"), "Include downstream tasks", "store_true"), - 'no_confirm': Arg( - ("-c", "--no_confirm"), - "Do not request confirmation", "store_true"), - 'exclude_subdags': Arg( - ("-x", "--exclude_subdags"), - "Exclude subdags", "store_true"), - 'dag_regex': Arg( - ("-dx", "--dag_regex"), - "Search dag_id as regex instead of exact string", "store_true"), - # trigger_dag - 'run_id': Arg(("-r", "--run_id"), "Helps to identify this run"), - 'conf': Arg( - ('-c', '--conf'), - "JSON string that gets pickled into the DagRun's conf attribute"), - 'exec_date': Arg( - ("-e", "--exec_date"), help="The execution date of the DAG", - type=parsedate), - # pool - 'pool_set': Arg( - ("-s", "--set"), - nargs=3, - metavar=('NAME', 'SLOT_COUNT', 'POOL_DESCRIPTION'), - help="Set pool slot count and description, respectively"), - 'pool_get': Arg( - ("-g", "--get"), - metavar='NAME', - help="Get pool info"), - 'pool_delete': Arg( - ("-x", "--delete"), - metavar="NAME", - help="Delete a pool"), - # variables - 'set': Arg( - ("-s", "--set"), - nargs=2, - metavar=('KEY', 'VAL'), - help="Set a variable"), - 'get': Arg( - ("-g", "--get"), - metavar='KEY', - help="Get value of a variable"), - 'default': Arg( - ("-d", "--default"), - metavar="VAL", - default=None, - help="Default value returned if variable does not exist"), - 'json': Arg( - ("-j", "--json"), - help="Deserialize JSON variable", - action="store_true"), - 'var_import': Arg( - ("-i", "--import"), - metavar="FILEPATH", - help="Import variables from JSON file"), - 'var_export': Arg( - ("-e", "--export"), - metavar="FILEPATH", - help="Export variables to JSON file"), - 'var_delete': Arg( - ("-x", "--delete"), - metavar="KEY", - help="Delete a variable"), - # kerberos - 'principal': Arg( - ("principal",), "kerberos principal", - nargs='?', default=conf.get('kerberos', 'principal')), - 'keytab': Arg( - ("-kt", "--keytab"), "keytab", - nargs='?', default=conf.get('kerberos', 'keytab')), - # run - # TODO(aoen): "force" is a poor choice of name here since it implies it overrides - # all dependencies (not just past success), e.g. the ignore_depends_on_past - # dependency. This flag should be deprecated and renamed to 'ignore_ti_state' and - # the "ignore_all_dependencies" command should be called the"force" command - # instead. 
- 'interactive': Arg( - ('-int', '--interactive'), - help='Do not capture standard output and error streams ' - '(useful for interactive debugging)', - action='store_true'), - 'force': Arg( - ("-f", "--force"), - "Ignore previous task instance state, rerun regardless if task already " - "succeeded/failed", - "store_true"), - 'raw': Arg(("-r", "--raw"), argparse.SUPPRESS, "store_true"), - 'ignore_all_dependencies': Arg( - ("-A", "--ignore_all_dependencies"), - "Ignores all non-critical dependencies, including ignore_ti_state and " - "ignore_task_deps", - "store_true"), - # TODO(aoen): ignore_dependencies is a poor choice of name here because it is too - # vague (e.g. a task being in the appropriate state to be run is also a dependency - # but is not ignored by this flag), the name 'ignore_task_dependencies' is - # slightly better (as it ignores all dependencies that are specific to the task), - # so deprecate the old command name and use this instead. - 'ignore_dependencies': Arg( - ("-i", "--ignore_dependencies"), - "Ignore task-specific dependencies, e.g. upstream, depends_on_past, and " - "retry delay dependencies", - "store_true"), - 'ignore_depends_on_past': Arg( - ("-I", "--ignore_depends_on_past"), - "Ignore depends_on_past dependencies (but respect " - "upstream dependencies)", - "store_true"), - 'ship_dag': Arg( - ("--ship_dag",), - "Pickles (serializes) the DAG and ships it to the worker", - "store_true"), - 'pickle': Arg( - ("-p", "--pickle"), - "Serialized pickle object of the entire dag (used internally)"), - 'job_id': Arg(("-j", "--job_id"), argparse.SUPPRESS), - 'cfg_path': Arg( - ("--cfg_path",), "Path to config file to use instead of airflow.cfg"), - # webserver - 'port': Arg( - ("-p", "--port"), - default=conf.get('webserver', 'WEB_SERVER_PORT'), - type=int, - help="The port on which to run the server"), - 'ssl_cert': Arg( - ("--ssl_cert",), - default=conf.get('webserver', 'WEB_SERVER_SSL_CERT'), - help="Path to the SSL certificate for the webserver"), - 'ssl_key': Arg( - ("--ssl_key",), - default=conf.get('webserver', 'WEB_SERVER_SSL_KEY'), - help="Path to the key to use with the SSL certificate"), - 'workers': Arg( - ("-w", "--workers"), - default=conf.get('webserver', 'WORKERS'), - type=int, - help="Number of workers to run the webserver on"), - 'workerclass': Arg( - ("-k", "--workerclass"), - default=conf.get('webserver', 'WORKER_CLASS'), - choices=['sync', 'eventlet', 'gevent', 'tornado'], - help="The worker class to use for Gunicorn"), - 'worker_timeout': Arg( - ("-t", "--worker_timeout"), - default=conf.get('webserver', 'WEB_SERVER_WORKER_TIMEOUT'), - type=int, - help="The timeout for waiting on webserver workers"), - 'hostname': Arg( - ("-hn", "--hostname"), - default=conf.get('webserver', 'WEB_SERVER_HOST'), - help="Set the hostname on which to run the web server"), - 'debug': Arg( - ("-d", "--debug"), - "Use the server that ships with Flask in debug mode", - "store_true"), - 'access_logfile': Arg( - ("-A", "--access_logfile"), - default=conf.get('webserver', 'ACCESS_LOGFILE'), - help="The logfile to store the webserver access log. Use '-' to print to " - "stderr."), - 'error_logfile': Arg( - ("-E", "--error_logfile"), - default=conf.get('webserver', 'ERROR_LOGFILE'), - help="The logfile to store the webserver error log. 
Use '-' to print to " - "stderr."), - # scheduler - 'dag_id_opt': Arg(("-d", "--dag_id"), help="The id of the dag to run"), - 'run_duration': Arg( - ("-r", "--run-duration"), - default=None, type=int, - help="Set number of seconds to execute before exiting"), - 'num_runs': Arg( - ("-n", "--num_runs"), - default=-1, type=int, - help="Set the number of runs to execute before exiting"), - # worker - 'do_pickle': Arg( - ("-p", "--do_pickle"), - default=False, - help=( - "Attempt to pickle the DAG object to send over " - "to the workers, instead of letting workers run their version " - "of the code."), - action="store_true"), - 'queues': Arg( - ("-q", "--queues"), - help="Comma delimited list of queues to serve", - default=conf.get('celery', 'DEFAULT_QUEUE')), - 'concurrency': Arg( - ("-c", "--concurrency"), - type=int, - help="The number of worker processes", - default=conf.get('celery', 'worker_concurrency')), - 'celery_hostname': Arg( - ("-cn", "--celery_hostname"), - help=("Set the hostname of celery worker " - "if you have multiple workers on a single machine.")), - # flower - 'broker_api': Arg(("-a", "--broker_api"), help="Broker api"), - 'flower_hostname': Arg( - ("-hn", "--hostname"), - default=conf.get('celery', 'FLOWER_HOST'), - help="Set the hostname on which to run the server"), - 'flower_port': Arg( - ("-p", "--port"), - default=conf.get('celery', 'FLOWER_PORT'), - type=int, - help="The port on which to run the server"), - 'flower_conf': Arg( - ("-fc", "--flower_conf"), - help="Configuration file for flower"), - 'flower_url_prefix': Arg( - ("-u", "--url_prefix"), - default=conf.get('celery', 'FLOWER_URL_PREFIX'), - help="URL prefix for Flower"), - 'task_params': Arg( - ("-tp", "--task_params"), - help="Sends a JSON params dict to the task"), - # connections - 'list_connections': Arg( - ('-l', '--list'), - help='List all connections', - action='store_true'), - 'add_connection': Arg( - ('-a', '--add'), - help='Add a connection', - action='store_true'), - 'delete_connection': Arg( - ('-d', '--delete'), - help='Delete a connection', - action='store_true'), - 'conn_id': Arg( - ('--conn_id',), - help='Connection id, required to add/delete a connection', - type=str), - 'conn_uri': Arg( - ('--conn_uri',), - help='Connection URI, required to add a connection without conn_type', - type=str), - 'conn_type': Arg( - ('--conn_type',), - help='Connection type, required to add a connection without conn_uri', - type=str), - 'conn_host': Arg( - ('--conn_host',), - help='Connection host, optional when adding a connection', - type=str), - 'conn_login': Arg( - ('--conn_login',), - help='Connection login, optional when adding a connection', - type=str), - 'conn_password': Arg( - ('--conn_password',), - help='Connection password, optional when adding a connection', - type=str), - 'conn_schema': Arg( - ('--conn_schema',), - help='Connection schema, optional when adding a connection', - type=str), - 'conn_port': Arg( - ('--conn_port',), - help='Connection port, optional when adding a connection', - type=str), - 'conn_extra': Arg( - ('--conn_extra',), - help='Connection `Extra` field, optional when adding a connection', - type=str), - # create_user - 'role': Arg( - ('-r', '--role',), - help='Role of the user. 
Existing roles include Admin, ' - 'User, Op, Viewer, and Public', - type=str), - 'firstname': Arg( - ('-f', '--firstname',), - help='First name of the user', - type=str), - 'lastname': Arg( - ('-l', '--lastname',), - help='Last name of the user', - type=str), - 'email': Arg( - ('-e', '--email',), - help='Email of the user', - type=str), - 'username': Arg( - ('-u', '--username',), - help='Username of the user', - type=str), - } - subparsers = ( - { - 'func': backfill, - 'help': "Run subsections of a DAG for a specified date range", - 'args': ( - 'dag_id', 'task_regex', 'start_date', 'end_date', - 'mark_success', 'local', 'donot_pickle', 'include_adhoc', - 'bf_ignore_dependencies', 'bf_ignore_first_depends_on_past', - 'subdir', 'pool', 'delay_on_limit', 'dry_run') - }, { - 'func': list_tasks, - 'help': "List the tasks within a DAG", - 'args': ('dag_id', 'tree', 'subdir'), - }, { - 'func': clear, - 'help': "Clear a set of task instance, as if they never ran", - 'args': ( - 'dag_id', 'task_regex', 'start_date', 'end_date', 'subdir', - 'upstream', 'downstream', 'no_confirm', 'only_failed', - 'only_running', 'exclude_subdags', 'dag_regex'), - }, { - 'func': pause, - 'help': "Pause a DAG", - 'args': ('dag_id', 'subdir'), - }, { - 'func': unpause, - 'help': "Resume a paused DAG", - 'args': ('dag_id', 'subdir'), - }, { - 'func': trigger_dag, - 'help': "Trigger a DAG run", - 'args': ('dag_id', 'subdir', 'run_id', 'conf', 'exec_date'), - }, { - 'func': delete_dag, - 'help': "Delete all DB records related to the specified DAG", - 'args': ('dag_id', 'yes',), - }, { - 'func': pool, - 'help': "CRUD operations on pools", - "args": ('pool_set', 'pool_get', 'pool_delete'), - }, { - 'func': variables, - 'help': "CRUD operations on variables", - "args": ('set', 'get', 'json', 'default', 'var_import', 'var_export', 'var_delete'), - }, { - 'func': kerberos, - 'help': "Start a kerberos ticket renewer", - 'args': ('principal', 'keytab', 'pid', - 'daemon', 'stdout', 'stderr', 'log_file'), - }, { - 'func': render, - 'help': "Render a task instance's template(s)", - 'args': ('dag_id', 'task_id', 'execution_date', 'subdir'), - }, { - 'func': run, - 'help': "Run a single task instance", - 'args': ( - 'dag_id', 'task_id', 'execution_date', 'subdir', - 'mark_success', 'force', 'pool', 'cfg_path', - 'local', 'raw', 'ignore_all_dependencies', 'ignore_dependencies', - 'ignore_depends_on_past', 'ship_dag', 'pickle', 'job_id', 'interactive',), - }, { - 'func': initdb, - 'help': "Initialize the metadata database", - 'args': tuple(), - }, { - 'func': list_dags, - 'help': "List all the DAGs", - 'args': ('subdir', 'report'), - }, { - 'func': dag_state, - 'help': "Get the status of a dag run", - 'args': ('dag_id', 'execution_date', 'subdir'), - }, { - 'func': task_failed_deps, - 'help': ( - "Returns the unmet dependencies for a task instance from the perspective " - "of the scheduler. In other words, why a task instance doesn't get " - "scheduled and then queued by the scheduler, and then run by an " - "executor)."), - 'args': ('dag_id', 'task_id', 'execution_date', 'subdir'), - }, { - 'func': task_state, - 'help': "Get the status of a task instance", - 'args': ('dag_id', 'task_id', 'execution_date', 'subdir'), - }, { - 'func': serve_logs, - 'help': "Serve logs generate by worker", - 'args': tuple(), - }, { - 'func': test, - 'help': ( - "Test a task instance. 
This will run a task without checking for " - "dependencies or recording its state in the database."), - 'args': ( - 'dag_id', 'task_id', 'execution_date', 'subdir', 'dry_run', - 'task_params'), - }, { - 'func': webserver, - 'help': "Start a Airflow webserver instance", - 'args': ('port', 'workers', 'workerclass', 'worker_timeout', 'hostname', - 'pid', 'daemon', 'stdout', 'stderr', 'access_logfile', - 'error_logfile', 'log_file', 'ssl_cert', 'ssl_key', 'debug'), - }, { - 'func': resetdb, - 'help': "Burn down and rebuild the metadata database", - 'args': ('yes',), - }, { - 'func': upgradedb, - 'help': "Upgrade the metadata database to latest version", - 'args': tuple(), - }, { - 'func': scheduler, - 'help': "Start a scheduler instance", - 'args': ('dag_id_opt', 'subdir', 'run_duration', 'num_runs', - 'do_pickle', 'pid', 'daemon', 'stdout', 'stderr', - 'log_file'), - }, { - 'func': worker, - 'help': "Start a Celery worker node", - 'args': ('do_pickle', 'queues', 'concurrency', 'celery_hostname', - 'pid', 'daemon', 'stdout', 'stderr', 'log_file'), - }, { - 'func': flower, - 'help': "Start a Celery Flower", - 'args': ('flower_hostname', 'flower_port', 'flower_conf', 'flower_url_prefix', - 'broker_api', 'pid', 'daemon', 'stdout', 'stderr', 'log_file'), - }, { - 'func': version, - 'help': "Show the version", - 'args': tuple(), - }, { - 'func': connections, - 'help': "List/Add/Delete connections", - 'args': ('list_connections', 'add_connection', 'delete_connection', - 'conn_id', 'conn_uri', 'conn_extra') + tuple(alternative_conn_specs), - }, { - 'func': create_user, - 'help': "Create an admin account", - 'args': ('role', 'username', 'email', 'firstname', 'lastname'), - }, - ) - subparsers_dict = {sp['func'].__name__: sp for sp in subparsers} - dag_subparsers = ( - 'list_tasks', 'backfill', 'test', 'run', 'pause', 'unpause') - - @classmethod - def get_parser(cls, dag_parser=False): - parser = argparse.ArgumentParser() - subparsers = parser.add_subparsers( - help='sub-command help', dest='subcommand') - subparsers.required = True - - subparser_list = cls.dag_subparsers if dag_parser else cls.subparsers_dict.keys() - for sub in subparser_list: - sub = cls.subparsers_dict[sub] - sp = subparsers.add_parser(sub['func'].__name__, help=sub['help']) - for arg in sub['args']: - if 'dag_id' in arg and dag_parser: - continue - arg = cls.args[arg] - kwargs = { - f: getattr(arg, f) - for f in arg._fields if f != 'flags' and getattr(arg, f)} - sp.add_argument(*arg.flags, **kwargs) - sp.set_defaults(func=sub['func']) - return parser - - -def get_parser(): - return CLIFactory.get_parser() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c177d6e8/airflow/bin/cli/__init__.py ---------------------------------------------------------------------- diff --git a/airflow/bin/cli/__init__.py b/airflow/bin/cli/__init__.py new file mode 100644 index 0000000..026f941 --- /dev/null +++ b/airflow/bin/cli/__init__.py @@ -0,0 +1,3 @@ +from cli import * +from cli_factory import * +from airflow import *
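----------------------------------------------------------------------
Context for the subject line (illustrative only, not code from this commit): a
minimal sketch of the kind of Kubernetes pod specification the executor's pod
request factory would need to emit once volume mounts and Secrets are
supported. All names below (dags-volume, airflow-dags, airflow-secrets,
SQL_ALCHEMY_CONN) are hypothetical placeholders, not values taken from this
change.

# Illustrative sketch only. A plain Kubernetes pod spec, expressed as a Python
# dict, showing the two capabilities named in the subject: a volume mounted
# into the worker container and a Secret key exposed as an environment
# variable.
worker_pod = {
    "apiVersion": "v1",
    "kind": "Pod",
    "metadata": {"name": "airflow-worker"},
    "spec": {
        "containers": [{
            "name": "base",
            "image": "airflow:latest",
            "command": ["airflow", "run", "example_dag", "task_1", "2017-07-26T00:00:00"],
            # Mount a volume into the worker container, e.g. for DAG files.
            "volumeMounts": [{
                "name": "dags-volume",
                "mountPath": "/usr/local/airflow/dags",
                "readOnly": True,
            }],
            # Expose a key from a Kubernetes Secret as an environment variable.
            "env": [{
                "name": "SQL_ALCHEMY_CONN",
                "valueFrom": {
                    "secretKeyRef": {
                        "name": "airflow-secrets",
                        "key": "sql_alchemy_conn",
                    },
                },
            }],
        }],
        # Back the mount with a persistent volume claim (one common choice).
        "volumes": [{
            "name": "dags-volume",
            "persistentVolumeClaim": {"claimName": "airflow-dags"},
        }],
        "restartPolicy": "Never",
    },
}

Submitting a spec of this shape (for example via the Kubernetes Python
client's CoreV1Api.create_namespaced_pod) gives each worker pod access to
shared DAG files and to its database credentials without baking either into
the container image, which is the motivation for wiring volumes and Secrets
through the executor.
----------------------------------------------------------------------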
