[AIRFLOW-1314] Rebasing against master

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/b9a87a07
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/b9a87a07
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/b9a87a07

Branch: refs/heads/master
Commit: b9a87a07e3b6dbd965c91f09d0f1c10281f628f9
Parents: 309f764
Author: Daniel Imberman <[email protected]>
Authored: Tue Oct 31 14:29:27 2017 -0700
Committer: Fokko Driesprong <[email protected]>
Committed: Sun Apr 22 10:23:06 2018 +0200

----------------------------------------------------------------------
 .travis.yml                                     |   38 +-
 airflow/__init__.py                             |    1 -
 airflow/bin/cli.py                              | 1796 ++++++++++++++++
 airflow/bin/cli/__init__.py                     |    3 -
 airflow/bin/cli/cli.py                          | 1915 ------------------
 airflow/bin/cli/cli_factory.py                  |  493 -----
 .../contrib/executors/kubernetes_executor.py    |  204 +-
 airflow/contrib/kubernetes/__init__.py          |   26 +-
 airflow/contrib/kubernetes/kube_client.py       |   39 +-
 .../contrib/kubernetes/kubernetes_factory.py    |   79 -
 airflow/contrib/kubernetes/kubernetes_helper.py |   45 -
 .../kubernetes/kubernetes_job_builder.py        |   75 -
 .../kubernetes_request_factory/__init__.py      |   24 +-
 .../job_request_factory.py                      |   57 -
 .../kubernetes_request_factory.py               |  100 +-
 .../pod_request_factory.py                      |   32 +-
 airflow/contrib/kubernetes/pod.py               |   54 +-
 airflow/contrib/kubernetes/pod_launcher.py      |  104 +-
 airflow/contrib/kubernetes/secret.py            |   34 +-
 .../contrib/kubernetes/worker_configuration.py  |   70 +-
 .../operators/k8s_pod_operator/__init__.py      |   13 -
 .../k8s_pod_operator/k8s_pod_operator.py        |  120 --
 .../contrib/operators/kubernetes/__init__.py    |   13 -
 .../operators/kubernetes/pod_operator.py        |  100 -
 airflow/dag_importer/__init__.py                |   83 -
 .../example_dags/example_kubernetes_executor.py |    6 +-
 airflow/example_dags/example_pod_operator.py    |   91 -
 airflow/executors/__init__.py                   |    2 +-
 airflow/executors/base_executor.py              |   12 +-
 airflow/executors/celery_executor.py            |    3 +-
 airflow/executors/local_executor.py             |    2 +-
 ...7c24_add_executor_config_to_task_instance.py |    1 +
 ...ff4_add_kubernetes_resource_checkpointing.py |    2 +-
 airflow/models.py                               |   15 +-
 kubectl                                         |    0
 scripts/ci/kubernetes/docker/Dockerfile         |    3 +
 scripts/ci/kubernetes/docker/requirements.txt   |   35 +
 .../ci/kubernetes/kube/airflow.yaml.template    |    6 +-
 scripts/ci/kubernetes/kube/deploy.sh            |   12 +-
 scripts/ci/kubernetes/kube/postgres.yaml        |   54 +-
 .../ci/kubernetes/minikube/start_minikube.sh    |   88 +-
 scripts/ci/travis_script.sh                     |    6 +-
 tests/contrib/executors/integration/__init__.py |   13 -
 .../executors/integration/airflow_controller.py |  114 --
 .../test_kubernetes_executor_integration.py     |   65 -
 .../executors/test_kubernetes_executor.py       |   16 +-
 tests/contrib/kubernetes/test_kubernetes_job.py |   12 -
 .../kubernetes/test_kubernetes_job_launcher.py  |   59 -
 .../minikube_tests/integration/__init__.py      |   13 +
 .../integration/airflow_controller.py           |  166 ++
 .../test_kubernetes_executor_integration.py     |   67 +
 tests/core.py                                   |    2 +-
 tests/jobs.py                                   |    3 +-
 53 files changed, 2641 insertions(+), 3745 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index 6fd2d50..daae12c 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -51,20 +51,15 @@ env:
     # does not work with python 3
     - BOTO_CONFIG=/tmp/bogusvalue
   matrix:
-    - TOX_ENV=py27-cdh-airflow_backend_mysql
-    - TOX_ENV=py27-cdh-airflow_backend_sqlite
-    - TOX_ENV=py27-cdh-airflow_backend_postgres
-#    - TOX_ENV=py27-hdp-airflow_backend_mysql
-#    - TOX_ENV=py27-hdp-airflow_backend_sqlite
-#    - TOX_ENV=py27-hdp-airflow_backend_postgres
-    - TOX_ENV=py34-cdh-airflow_backend_mysql
-    - TOX_ENV=py34-cdh-airflow_backend_sqlite
-    - TOX_ENV=py34-cdh-airflow_backend_postgres
-#    - TOX_ENV=py34-hdp-airflow_backend_mysql
-#    - TOX_ENV=py34-hdp-airflow_backend_sqlite
-#    - TOX_ENV=py34-hdp-airflow_backend_postgres
-    # Run integration tests on minikube for the KubernetesExecutor
-    - TOX_ENV=py27-cdh-airflow_backend_postgres RUN_KUBE_INTEGRATION=true
+    - TOX_ENV=py27-backend_mysql
+    - TOX_ENV=py27-backend_sqlite
+    - TOX_ENV=py27-backend_postgres
+    - TOX_ENV=py35-backend_mysql
+    - TOX_ENV=py35-backend_sqlite
+    - TOX_ENV=py35-backend_postgres
+    - TOX_ENV=flake8
+    - TOX_ENV=py27-backend_postgres KUBERNETES_VERSION=v1.8.0
+    - TOX_ENV=py27-backend_postgres KUBERNETES_VERSION=v1.9.0
 matrix:
   exclude:
     - python: "3.5"
@@ -80,15 +75,14 @@ matrix:
     - python: "2.7"
       env: TOX_ENV=py35-backend_postgres
     - python: "2.7"
-      env: TOX_ENV=py34-hdp-airflow_backend_mysql
-    - python: "2.7"
-      env: TOX_ENV=py34-hdp-airflow_backend_sqlite
-    - python: "2.7"
-      env: TOX_ENV=py34-hdp-airflow_backend_postgres
-    - python: "3.4"
-      env: TOX_ENV=py34-cdh-airflow_backend_postgres RUN_KUBE_INTEGRATION=true
+      env: TOX_ENV=flake8
+    - python: "3.5"
+      env: TOX_ENV=py27-backend_postgres KUBERNETES_VERSION=v1.8.0
+    - python: "3.5"
+      env: TOX_ENV=py27-backend_postgres KUBERNETES_VERSION=v1.9.0
   allow_failures:
-    - env: TOX_ENV=py27-cdh-airflow_backend_postgres RUN_KUBE_INTEGRATION=true
+    - env: TOX_ENV=py27-backend_postgres KUBERNETES_VERSION=v1.8.0
+    - env: TOX_ENV=py27-backend_postgres KUBERNETES_VERSION=v1.9.0
 cache:
   directories:
     - $HOME/.wheelhouse/

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/airflow/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/__init__.py b/airflow/__init__.py
index 296b67c..f40b08a 100644
--- a/airflow/__init__.py
+++ b/airflow/__init__.py
@@ -85,7 +85,6 @@ from airflow import sensors  # noqa: E402
 from airflow import hooks
 from airflow import executors
 from airflow import macros
-from airflow import contrib
 
 operators._integrate_plugins()
 sensors._integrate_plugins()  # noqa: E402

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/airflow/bin/cli.py
----------------------------------------------------------------------
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
new file mode 100644
index 0000000..20e7496
--- /dev/null
+++ b/airflow/bin/cli.py
@@ -0,0 +1,1796 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import print_function
+import logging
+
+import reprlib
+
+import os
+import socket
+import subprocess
+import textwrap
+from importlib import import_module
+
+import daemon
+import psutil
+import re
+import getpass
+from urllib.parse import urlunparse
+import reprlib
+import argparse
+from builtins import input
+from collections import namedtuple
+from airflow.utils.timezone import parse as parsedate
+import json
+from tabulate import tabulate
+
+import daemon
+from daemon.pidfile import TimeoutPIDLockFile
+import signal
+import sys
+import threading
+import traceback
+import time
+import psutil
+import re
+from urllib.parse import urlunparse
+
+import airflow
+from airflow import api
+from airflow import jobs, settings
+from airflow import configuration as conf
+from airflow.exceptions import AirflowException, AirflowWebServerTimeout
+from airflow.executors import GetDefaultExecutor
+from airflow.models import (DagModel, DagBag, TaskInstance,
+                            DagPickle, DagRun, Variable, DagStat,
+                            Connection, DAG)
+
+from airflow.ti_deps.dep_context import (DepContext, SCHEDULER_DEPS)
+from airflow.utils import cli as cli_utils
+from airflow.utils import db as db_utils
+from airflow.utils.net import get_hostname
+from airflow.utils.log.logging_mixin import (LoggingMixin, redirect_stderr,
+                                             redirect_stdout)
+from airflow.www.app import (cached_app, create_app)
+from airflow.www_rbac.app import cached_app as cached_app_rbac
+from airflow.www_rbac.app import create_app as create_app_rbac
+from airflow.www_rbac.app import cached_appbuilder
+
+from sqlalchemy import func
+from sqlalchemy.orm import exc
+
+api.load_auth()
+api_module = import_module(conf.get('cli', 'api_client'))
+api_client = api_module.Client(api_base_url=conf.get('cli', 'endpoint_url'),
+                               auth=api.api_auth.client_auth)
+
+log = LoggingMixin().log
+
+
+def sigint_handler(sig, frame):
+    sys.exit(0)
+
+
+def sigquit_handler(sig, frame):
+    """Helps debug deadlocks by printing stacktraces when this gets a SIGQUIT
+    e.g. kill -s QUIT <PID> or CTRL+\
+    """
+    print("Dumping stack traces for all threads in PID {}".format(os.getpid()))
+    id_to_name = dict([(th.ident, th.name) for th in threading.enumerate()])
+    code = []
+    for thread_id, stack in sys._current_frames().items():
+        code.append("\n# Thread: {}({})"
+                    .format(id_to_name.get(thread_id, ""), thread_id))
+        for filename, line_number, name, line in 
traceback.extract_stack(stack):
+            code.append('File: "{}", line {}, in {}'
+                        .format(filename, line_number, name))
+            if line:
+                code.append("  {}".format(line.strip()))
+    print("\n".join(code))
+
+
+def setup_logging(filename):
+    root = logging.getLogger()
+    handler = logging.FileHandler(filename)
+    formatter = logging.Formatter(settings.SIMPLE_LOG_FORMAT)
+    handler.setFormatter(formatter)
+    root.addHandler(handler)
+    root.setLevel(settings.LOGGING_LEVEL)
+
+    return handler.stream
+
+
+def setup_locations(process, pid=None, stdout=None, stderr=None, log=None):
+    if not stderr:
+        stderr = os.path.join(os.path.expanduser(settings.AIRFLOW_HOME), 
"airflow-{}.err".format(process))
+    if not stdout:
+        stdout = os.path.join(os.path.expanduser(settings.AIRFLOW_HOME), 
"airflow-{}.out".format(process))
+    if not log:
+        log = os.path.join(os.path.expanduser(settings.AIRFLOW_HOME), 
"airflow-{}.log".format(process))
+    if not pid:
+        pid = os.path.join(os.path.expanduser(settings.AIRFLOW_HOME), 
"airflow-{}.pid".format(process))
+
+    return pid, stdout, stderr, log
+
+
+def process_subdir(subdir):
+    if subdir:
+        subdir = subdir.replace('DAGS_FOLDER', settings.DAGS_FOLDER)
+        subdir = os.path.abspath(os.path.expanduser(subdir))
+        return subdir
+
+
+def get_dag(args):
+    dagbag = DagBag(process_subdir(args.subdir))
+    if args.dag_id not in dagbag.dags:
+        raise AirflowException(
+            'dag_id could not be found: {}. Either the dag did not exist or it 
failed to '
+            'parse.'.format(args.dag_id))
+    return dagbag.dags[args.dag_id]
+
+
+def get_dags(args):
+    if not args.dag_regex:
+        return [get_dag(args)]
+    dagbag = DagBag(process_subdir(args.subdir))
+    matched_dags = [dag for dag in dagbag.dags.values() if re.search(
+        args.dag_id, dag.dag_id)]
+    if not matched_dags:
+        raise AirflowException(
+            'dag_id could not be found with regex: {}. Either the dag did not 
exist '
+            'or it failed to parse.'.format(args.dag_id))
+    return matched_dags
+
+
+@cli_utils.action_logging
+def backfill(args, dag=None):
+    logging.basicConfig(
+        level=settings.LOGGING_LEVEL,
+        format=settings.SIMPLE_LOG_FORMAT)
+
+    dag = dag or get_dag(args)
+
+    if not args.start_date and not args.end_date:
+        raise AirflowException("Provide a start_date and/or end_date")
+
+    # If only one date is passed, using same as start and end
+    args.end_date = args.end_date or args.start_date
+    args.start_date = args.start_date or args.end_date
+
+    if args.task_regex:
+        dag = dag.sub_dag(
+            task_regex=args.task_regex,
+            include_upstream=not args.ignore_dependencies)
+
+    if args.dry_run:
+        print("Dry run of DAG {0} on {1}".format(args.dag_id,
+                                                 args.start_date))
+        for task in dag.tasks:
+            print("Task {0}".format(task.task_id))
+            ti = TaskInstance(task, args.start_date)
+            ti.dry_run()
+    else:
+        dag.run(
+            start_date=args.start_date,
+            end_date=args.end_date,
+            mark_success=args.mark_success,
+            include_adhoc=args.include_adhoc,
+            local=args.local,
+            donot_pickle=(args.donot_pickle or
+                          conf.getboolean('core', 'donot_pickle')),
+            ignore_first_depends_on_past=args.ignore_first_depends_on_past,
+            ignore_task_deps=args.ignore_dependencies,
+            pool=args.pool,
+            delay_on_limit_secs=args.delay_on_limit)
+
+
+@cli_utils.action_logging
+def trigger_dag(args):
+    """
+    Creates a dag run for the specified dag
+    :param args:
+    :return:
+    """
+    log = LoggingMixin().log
+    try:
+        message = api_client.trigger_dag(dag_id=args.dag_id,
+                                         run_id=args.run_id,
+                                         conf=args.conf,
+                                         execution_date=args.exec_date)
+    except IOError as err:
+        log.error(err)
+        raise AirflowException(err)
+    log.info(message)
+
+
+@cli_utils.action_logging
+def delete_dag(args):
+    """
+    Deletes all DB records related to the specified dag
+    :param args:
+    :return:
+    """
+    log = LoggingMixin().log
+    if args.yes or input(
+            "This will drop all existing records related to the specified DAG. 
"
+            "Proceed? (y/n)").upper() == "Y":
+        try:
+            message = api_client.delete_dag(dag_id=args.dag_id)
+        except IOError as err:
+            log.error(err)
+            raise AirflowException(err)
+        log.info(message)
+    else:
+        print("Bail.")
+
+
+@cli_utils.action_logging
+def pool(args):
+    log = LoggingMixin().log
+
+    def _tabulate(pools):
+        return "\n%s" % tabulate(pools, ['Pool', 'Slots', 'Description'],
+                                 tablefmt="fancy_grid")
+
+    try:
+        if args.get is not None:
+            pools = [api_client.get_pool(name=args.get)]
+        elif args.set:
+            pools = [api_client.create_pool(name=args.set[0],
+                                            slots=args.set[1],
+                                            description=args.set[2])]
+        elif args.delete:
+            pools = [api_client.delete_pool(name=args.delete)]
+        else:
+            pools = api_client.get_pools()
+    except (AirflowException, IOError) as err:
+        log.error(err)
+    else:
+        log.info(_tabulate(pools=pools))
+
+<<<<<<< HEAD:airflow/bin/cli/cli.py
+@cli_utils.action_logging
+def variables(args):
+=======
+>>>>>>> [AIRFLOW-1314] Rebasing against master:airflow/bin/cli.py
+
+def variables(args):
+    if args.get:
+        try:
+            var = Variable.get(args.get,
+                               deserialize_json=args.json,
+                               default_var=args.default)
+            print(var)
+        except ValueError as e:
+            print(e)
+    if args.delete:
+        session = settings.Session()
+        session.query(Variable).filter_by(key=args.delete).delete()
+        session.commit()
+        session.close()
+    if args.set:
+        Variable.set(args.set[0], args.set[1])
+    # Work around 'import' as a reserved keyword
+    imp = getattr(args, 'import')
+    if imp:
+        if os.path.exists(imp):
+            import_helper(imp)
+        else:
+            print("Missing variables file.")
+    if args.export:
+        export_helper(args.export)
+    if not (args.set or args.get or imp or args.export or args.delete):
+        # list all variables
+        session = settings.Session()
+        vars = session.query(Variable)
+        msg = "\n".join(var.key for var in vars)
+        print(msg)
+
+
+def import_helper(filepath):
+    with open(filepath, 'r') as varfile:
+        var = varfile.read()
+
+    try:
+        d = json.loads(var)
+    except Exception:
+        print("Invalid variables file.")
+    else:
+        try:
+            n = 0
+            for k, v in d.items():
+                if isinstance(v, dict):
+                    Variable.set(k, v, serialize_json=True)
+                else:
+                    Variable.set(k, v)
+                n += 1
+        except Exception:
+            pass
+        finally:
+            print("{} of {} variables successfully updated.".format(n, len(d)))
+
+
+def export_helper(filepath):
+    session = settings.Session()
+    qry = session.query(Variable).all()
+    session.close()
+
+    var_dict = {}
+    d = json.JSONDecoder()
+    for var in qry:
+        val = None
+        try:
+            val = d.decode(var.val)
+        except Exception:
+            val = var.val
+        var_dict[var.key] = val
+
+    with open(filepath, 'w') as varfile:
+        varfile.write(json.dumps(var_dict, sort_keys=True, indent=4))
+    print("{} variables successfully exported to {}".format(len(var_dict), 
filepath))
+
+
+@cli_utils.action_logging
+def pause(args, dag=None):
+    set_is_paused(True, args, dag)
+
+
+@cli_utils.action_logging
+def unpause(args, dag=None):
+    set_is_paused(False, args, dag)
+
+
+def set_is_paused(is_paused, args, dag=None):
+    dag = dag or get_dag(args)
+
+    session = settings.Session()
+    dm = session.query(DagModel).filter(
+        DagModel.dag_id == dag.dag_id).first()
+    dm.is_paused = is_paused
+    session.commit()
+
+    msg = "Dag: {}, paused: {}".format(dag, str(dag.is_paused))
+    print(msg)
+
+
+def _run(args, dag, ti):
+    if args.local:
+        run_job = jobs.LocalTaskJob(
+            task_instance=ti,
+            mark_success=args.mark_success,
+            pickle_id=args.pickle,
+            ignore_all_deps=args.ignore_all_dependencies,
+            ignore_depends_on_past=args.ignore_depends_on_past,
+            ignore_task_deps=args.ignore_dependencies,
+            ignore_ti_state=args.force,
+            pool=args.pool)
+        run_job.run()
+    elif args.raw:
+        ti._run_raw_task(
+            mark_success=args.mark_success,
+            job_id=args.job_id,
+            pool=args.pool,
+        )
+    else:
+        pickle_id = None
+        if args.ship_dag:
+            try:
+                # Running remotely, so pickling the DAG
+                session = settings.Session()
+                pickle = DagPickle(dag)
+                session.add(pickle)
+                session.commit()
+                pickle_id = pickle.id
+                # TODO: This should be written to a log
+                print('Pickled dag {dag} as pickle_id:{pickle_id}'
+                      .format(**locals()))
+            except Exception as e:
+                print('Could not pickle the DAG')
+                print(e)
+                raise e
+
+        executor = GetDefaultExecutor()
+        executor.start()
+        print("Sending to executor.")
+        executor.queue_task_instance(
+            ti,
+            mark_success=args.mark_success,
+            pickle_id=pickle_id,
+            ignore_all_deps=args.ignore_all_dependencies,
+            ignore_depends_on_past=args.ignore_depends_on_past,
+            ignore_task_deps=args.ignore_dependencies,
+            ignore_ti_state=args.force,
+            pool=args.pool)
+        executor.heartbeat()
+        executor.end()
+
+
+@cli_utils.action_logging
+def run(args, dag=None):
+    # Disable connection pooling to reduce the # of connections on the DB
+    # while it's waiting for the task to finish.
+    settings.configure_orm(disable_connection_pool=True)
+
+    if dag:
+        args.dag_id = dag.dag_id
+
+    log = LoggingMixin().log
+
+    # Load custom airflow config
+    if args.cfg_path:
+        with open(args.cfg_path, 'r') as conf_file:
+            conf_dict = json.load(conf_file)
+
+        if os.path.exists(args.cfg_path):
+            os.remove(args.cfg_path)
+
+        for section, config in conf_dict.items():
+            for option, value in config.items():
+                conf.set(section, option, value)
+        settings.configure_vars()
+        settings.configure_orm()
+
+    if not args.pickle and not dag:
+        dag = get_dag(args)
+    elif not dag:
+        session = settings.Session()
+        log.info('Loading pickle id {args.pickle}'.format(args=args))
+        dag_pickle = session.query(
+            DagPickle).filter(DagPickle.id == args.pickle).first()
+        if not dag_pickle:
+            raise AirflowException("Who hid the pickle!? [missing pickle]")
+        dag = dag_pickle.pickle
+
+    task = dag.get_task(task_id=args.task_id)
+    ti = TaskInstance(task, args.execution_date)
+    ti.refresh_from_db()
+
+    ti.init_run_context(raw=args.raw)
+
+    hostname = get_hostname()
+    log.info("Running %s on host %s", ti, hostname)
+
+    if args.interactive:
+        _run(args, dag, ti)
+    else:
+        with redirect_stdout(ti.log, logging.INFO), redirect_stderr(ti.log, 
logging.WARN):
+            _run(args, dag, ti)
+
+
+@cli_utils.action_logging
+def task_failed_deps(args):
+    """
+    Returns the unmet dependencies for a task instance from the perspective of 
the
+    scheduler (i.e. why a task instance doesn't get scheduled and then queued 
by the
+    scheduler, and then run by an executor).
+    >>> airflow task_failed_deps tutorial sleep 2015-01-01
+    Task instance dependencies not met:
+    Dagrun Running: Task instance's dagrun did not exist: Unknown reason
+    Trigger Rule: Task's trigger rule 'all_success' requires all upstream 
tasks to have succeeded, but found 1 non-success(es).
+    """
+    dag = get_dag(args)
+    task = dag.get_task(task_id=args.task_id)
+    ti = TaskInstance(task, args.execution_date)
+
+    dep_context = DepContext(deps=SCHEDULER_DEPS)
+    failed_deps = list(ti.get_failed_dep_statuses(dep_context=dep_context))
+    # TODO, Do we want to print or log this
+    if failed_deps:
+        print("Task instance dependencies not met:")
+        for dep in failed_deps:
+            print("{}: {}".format(dep.dep_name, dep.reason))
+    else:
+        print("Task instance dependencies are all met.")
+
+
+@cli_utils.action_logging
+def task_state(args):
+    """
+    Returns the state of a TaskInstance at the command line.
+    >>> airflow task_state tutorial sleep 2015-01-01
+    success
+    """
+    dag = get_dag(args)
+    task = dag.get_task(task_id=args.task_id)
+    ti = TaskInstance(task, args.execution_date)
+    print(ti.current_state())
+
+
+@cli_utils.action_logging
+def dag_state(args):
+    """
+    Returns the state of a DagRun at the command line.
+    >>> airflow dag_state tutorial 2015-01-01T00:00:00.000000
+    running
+    """
+    dag = get_dag(args)
+    dr = DagRun.find(dag.dag_id, execution_date=args.execution_date)
+    print(dr[0].state if len(dr) > 0 else None)
+
+
+@cli_utils.action_logging
+def list_dags(args):
+    dagbag = DagBag(process_subdir(args.subdir))
+    s = textwrap.dedent("""\n
+    -------------------------------------------------------------------
+    DAGS
+    -------------------------------------------------------------------
+    {dag_list}
+    """)
+    dag_list = "\n".join(sorted(dagbag.dags))
+    print(s.format(dag_list=dag_list))
+    if args.report:
+        print(dagbag.dagbag_report())
+
+
+@cli_utils.action_logging
+def list_tasks(args, dag=None):
+    dag = dag or get_dag(args)
+    if args.tree:
+        dag.tree_view()
+    else:
+        tasks = sorted([t.task_id for t in dag.tasks])
+        print("\n".join(sorted(tasks)))
+
+
+@cli_utils.action_logging
+def test(args, dag=None):
+    dag = dag or get_dag(args)
+
+    task = dag.get_task(task_id=args.task_id)
+    # Add CLI provided task_params to task.params
+    if args.task_params:
+        passed_in_params = json.loads(args.task_params)
+        task.params.update(passed_in_params)
+    ti = TaskInstance(task, args.execution_date)
+
+    if args.dry_run:
+        ti.dry_run()
+    else:
+        ti.run(ignore_task_deps=True, ignore_ti_state=True, test_mode=True)
+
+
+@cli_utils.action_logging
+def render(args):
+    dag = get_dag(args)
+    task = dag.get_task(task_id=args.task_id)
+    ti = TaskInstance(task, args.execution_date)
+    ti.render_templates()
+    for attr in task.__class__.template_fields:
+        print(textwrap.dedent("""\
+        # ----------------------------------------------------------
+        # property: {}
+        # ----------------------------------------------------------
+        {}
+        """.format(attr, getattr(task, attr))))
+
+
+@cli_utils.action_logging
+def clear(args):
+    logging.basicConfig(
+        level=settings.LOGGING_LEVEL,
+        format=settings.SIMPLE_LOG_FORMAT)
+    dags = get_dags(args)
+
+    if args.task_regex:
+        for idx, dag in enumerate(dags):
+            dags[idx] = dag.sub_dag(
+                task_regex=args.task_regex,
+                include_downstream=args.downstream,
+                include_upstream=args.upstream)
+
+    DAG.clear_dags(
+        dags,
+        start_date=args.start_date,
+        end_date=args.end_date,
+        only_failed=args.only_failed,
+        only_running=args.only_running,
+        confirm_prompt=not args.no_confirm,
+        include_subdags=not args.exclude_subdags)
+
+
+def get_num_ready_workers_running(gunicorn_master_proc):
+    workers = psutil.Process(gunicorn_master_proc.pid).children()
+
+    def ready_prefix_on_cmdline(proc):
+        try:
+            cmdline = proc.cmdline()
+            if len(cmdline) > 0:
+                return settings.GUNICORN_WORKER_READY_PREFIX in cmdline[0]
+        except psutil.NoSuchProcess:
+            pass
+        return False
+
+    ready_workers = [proc for proc in workers if ready_prefix_on_cmdline(proc)]
+    return len(ready_workers)
+
+
+def get_num_workers_running(gunicorn_master_proc):
+    workers = psutil.Process(gunicorn_master_proc.pid).children()
+    return len(workers)
+
+
+def restart_workers(gunicorn_master_proc, num_workers_expected, 
master_timeout):
+    """
+    Runs forever, monitoring the child processes of @gunicorn_master_proc and
+    restarting workers occasionally.
+    Each iteration of the loop traverses one edge of this state transition
+    diagram, where each state (node) represents
+    [ num_ready_workers_running / num_workers_running ]. We expect most time to
+    be spent in [n / n]. `bs` is the setting 
webserver.worker_refresh_batch_size.
+    The horizontal transition at ? happens after the new worker parses all the
+    dags (so it could take a while!)
+       V 
────────────────────────────────────────────────────────────────────────┐
+    [n / n] ──TTIN──> [ [n, n+bs) / n + bs ]  ────?───> 
[n + bs / n + bs] ──TTOU─┘
+       ^                          
^───────────────┘
+       │
+       │      ┌────────────────v
+       └──────┴────── [ [0, n) / n ] <─── 
start
+    We change the number of workers by sending TTIN and TTOU to the gunicorn
+    master process, which increases and decreases the number of child workers
+    respectively. Gunicorn guarantees that on TTOU workers are terminated
+    gracefully and that the oldest worker is terminated.
+    """
+
+    def wait_until_true(fn, timeout=0):
+        """
+        Sleeps until fn is true
+        """
+        t = time.time()
+        while not fn():
+            if 0 < timeout and timeout <= time.time() - t:
+                raise AirflowWebServerTimeout(
+                    "No response from gunicorn master within {0} seconds"
+                    .format(timeout))
+            time.sleep(0.1)
+
+    def start_refresh(gunicorn_master_proc):
+        batch_size = conf.getint('webserver', 'worker_refresh_batch_size')
+        log.debug('%s doing a refresh of %s workers', state, batch_size)
+        sys.stdout.flush()
+        sys.stderr.flush()
+
+        excess = 0
+        for _ in range(batch_size):
+            gunicorn_master_proc.send_signal(signal.SIGTTIN)
+            excess += 1
+            wait_until_true(lambda: num_workers_expected + excess ==
+                            get_num_workers_running(gunicorn_master_proc),
+                            master_timeout)
+
+    try:
+        wait_until_true(lambda: num_workers_expected ==
+                        get_num_workers_running(gunicorn_master_proc),
+                        master_timeout)
+        while True:
+            num_workers_running = get_num_workers_running(gunicorn_master_proc)
+            num_ready_workers_running = \
+                get_num_ready_workers_running(gunicorn_master_proc)
+
+            state = '[{0} / {1}]'.format(num_ready_workers_running, 
num_workers_running)
+
+            # Whenever some workers are not ready, wait until all workers are 
ready
+            if num_ready_workers_running < num_workers_running:
+                log.debug('%s some workers are starting up, waiting...', state)
+                sys.stdout.flush()
+                time.sleep(1)
+
+            # Kill a worker gracefully by asking gunicorn to reduce number of 
workers
+            elif num_workers_running > num_workers_expected:
+                excess = num_workers_running - num_workers_expected
+                log.debug('%s killing %s workers', state, excess)
+
+                for _ in range(excess):
+                    gunicorn_master_proc.send_signal(signal.SIGTTOU)
+                    excess -= 1
+                    wait_until_true(lambda: num_workers_expected + excess ==
+                                    
get_num_workers_running(gunicorn_master_proc),
+                                    master_timeout)
+
+            # Start a new worker by asking gunicorn to increase number of 
workers
+            elif num_workers_running == num_workers_expected:
+                refresh_interval = conf.getint('webserver', 
'worker_refresh_interval')
+                log.debug(
+                    '%s sleeping for %ss starting doing a refresh...',
+                    state, refresh_interval
+                )
+                time.sleep(refresh_interval)
+                start_refresh(gunicorn_master_proc)
+
+            else:
+                # num_ready_workers_running == num_workers_running < 
num_workers_expected
+                log.error((
+                    "%s some workers seem to have died and gunicorn"
+                    "did not restart them as expected"
+                ), state)
+                time.sleep(10)
+                if len(
+                    psutil.Process(gunicorn_master_proc.pid).children()
+                ) < num_workers_expected:
+                    start_refresh(gunicorn_master_proc)
+    except (AirflowWebServerTimeout, OSError) as err:
+        log.error(err)
+        log.error("Shutting down webserver")
+        try:
+            gunicorn_master_proc.terminate()
+            gunicorn_master_proc.wait()
+        finally:
+            sys.exit(1)
+
+
+@cli_utils.action_logging
+def webserver(args):
+    print(settings.HEADER)
+
+    access_logfile = args.access_logfile or conf.get('webserver', 
'access_logfile')
+    error_logfile = args.error_logfile or conf.get('webserver', 
'error_logfile')
+    num_workers = args.workers or conf.get('webserver', 'workers')
+    worker_timeout = (args.worker_timeout or
+                      conf.get('webserver', 'web_server_worker_timeout'))
+    ssl_cert = args.ssl_cert or conf.get('webserver', 'web_server_ssl_cert')
+    ssl_key = args.ssl_key or conf.get('webserver', 'web_server_ssl_key')
+    if not ssl_cert and ssl_key:
+        raise AirflowException(
+            'An SSL certificate must also be provided for use with ' + ssl_key)
+    if ssl_cert and not ssl_key:
+        raise AirflowException(
+            'An SSL key must also be provided for use with ' + ssl_cert)
+
+    if args.debug:
+        print(
+            "Starting the web server on port {0} and host {1}.".format(
+                args.port, args.hostname))
+        app = create_app_rbac(conf) if settings.RBAC else create_app(conf)
+        app.run(debug=True, port=args.port, host=args.hostname,
+                ssl_context=(ssl_cert, ssl_key) if ssl_cert and ssl_key else 
None)
+    else:
+        app = cached_app_rbac(conf) if settings.RBAC else cached_app(conf)
+        pid, stdout, stderr, log_file = setup_locations(
+            "webserver", args.pid, args.stdout, args.stderr, args.log_file)
+        if args.daemon:
+            handle = setup_logging(log_file)
+            stdout = open(stdout, 'w+')
+            stderr = open(stderr, 'w+')
+
+        print(
+            textwrap.dedent('''\
+                Running the Gunicorn Server with:
+                Workers: {num_workers} {args.workerclass}
+                Host: {args.hostname}:{args.port}
+                Timeout: {worker_timeout}
+                Logfiles: {access_logfile} {error_logfile}
+                
=================================================================\
+            '''.format(**locals())))
+
+        run_args = [
+            'gunicorn',
+            '-w', str(num_workers),
+            '-k', str(args.workerclass),
+            '-t', str(worker_timeout),
+            '-b', args.hostname + ':' + str(args.port),
+            '-n', 'airflow-webserver',
+            '-p', str(pid),
+            '-c', 'python:airflow.www.gunicorn_config',
+        ]
+
+        if args.access_logfile:
+            run_args += ['--access-logfile', str(args.access_logfile)]
+
+        if args.error_logfile:
+            run_args += ['--error-logfile', str(args.error_logfile)]
+
+        if args.daemon:
+            run_args += ['-D']
+
+        if ssl_cert:
+            run_args += ['--certfile', ssl_cert, '--keyfile', ssl_key]
+
+        webserver_module = 'www_rbac' if settings.RBAC else 'www'
+        run_args += ["airflow." + webserver_module + ".app:cached_app()"]
+
+        gunicorn_master_proc = None
+
+        def kill_proc(dummy_signum, dummy_frame):
+            gunicorn_master_proc.terminate()
+            gunicorn_master_proc.wait()
+            sys.exit(0)
+
+        def monitor_gunicorn(gunicorn_master_proc):
+            # These run forever until SIG{INT, TERM, KILL, ...} signal is sent
+            if conf.getint('webserver', 'worker_refresh_interval') > 0:
+                master_timeout = conf.getint('webserver', 
'web_server_master_timeout')
+                restart_workers(gunicorn_master_proc, num_workers, 
master_timeout)
+            else:
+                while True:
+                    time.sleep(1)
+
+        if args.daemon:
+            base, ext = os.path.splitext(pid)
+            ctx = daemon.DaemonContext(
+                pidfile=TimeoutPIDLockFile(base + "-monitor" + ext, -1),
+                files_preserve=[handle],
+                stdout=stdout,
+                stderr=stderr,
+                signal_map={
+                    signal.SIGINT: kill_proc,
+                    signal.SIGTERM: kill_proc
+                },
+            )
+            with ctx:
+                subprocess.Popen(run_args, close_fds=True)
+
+                # Reading pid file directly, since Popen#pid doesn't
+                # seem to return the right value with DaemonContext.
+                while True:
+                    try:
+                        with open(pid) as f:
+                            gunicorn_master_proc_pid = int(f.read())
+                            break
+                    except IOError:
+                        log.debug("Waiting for gunicorn's pid file to be 
created.")
+                        time.sleep(0.1)
+
+                gunicorn_master_proc = psutil.Process(gunicorn_master_proc_pid)
+                monitor_gunicorn(gunicorn_master_proc)
+
+            stdout.close()
+            stderr.close()
+        else:
+            gunicorn_master_proc = subprocess.Popen(run_args, close_fds=True)
+
+            signal.signal(signal.SIGINT, kill_proc)
+            signal.signal(signal.SIGTERM, kill_proc)
+
+            monitor_gunicorn(gunicorn_master_proc)
+
+
+@cli_utils.action_logging
+def scheduler(args):
+    print(settings.HEADER)
+    job = jobs.SchedulerJob(
+        dag_id=args.dag_id,
+        subdir=process_subdir(args.subdir),
+        run_duration=args.run_duration,
+        num_runs=args.num_runs,
+        do_pickle=args.do_pickle)
+
+    if args.daemon:
+        pid, stdout, stderr, log_file = setup_locations("scheduler", args.pid, 
args.stdout, args.stderr, args.log_file)
+        handle = setup_logging(log_file)
+        stdout = open(stdout, 'w+')
+        stderr = open(stderr, 'w+')
+
+        ctx = daemon.DaemonContext(
+            pidfile=TimeoutPIDLockFile(pid, -1),
+            files_preserve=[handle],
+            stdout=stdout,
+            stderr=stderr,
+        )
+        with ctx:
+            job.run()
+
+        stdout.close()
+        stderr.close()
+    else:
+        signal.signal(signal.SIGINT, sigint_handler)
+        signal.signal(signal.SIGTERM, sigint_handler)
+        signal.signal(signal.SIGQUIT, sigquit_handler)
+        job.run()
+
+
+@cli_utils.action_logging
+def serve_logs(args):
+    print("Starting flask")
+    import flask
+    flask_app = flask.Flask(__name__)
+
+    @flask_app.route('/log/<path:filename>')
+    def serve_logs(filename):  # noqa
+        log = os.path.expanduser(conf.get('core', 'BASE_LOG_FOLDER'))
+        return flask.send_from_directory(
+            log,
+            filename,
+            mimetype="application/json",
+            as_attachment=False)
+
+    WORKER_LOG_SERVER_PORT = \
+        int(conf.get('celery', 'WORKER_LOG_SERVER_PORT'))
+    flask_app.run(
+        host='0.0.0.0', port=WORKER_LOG_SERVER_PORT)
+
+
+@cli_utils.action_logging
+def worker(args):
+    env = os.environ.copy()
+    env['AIRFLOW_HOME'] = settings.AIRFLOW_HOME
+
+    # Celery worker
+    from airflow.executors.celery_executor import app as celery_app
+    from celery.bin import worker
+
+    worker = worker.worker(app=celery_app)
+    options = {
+        'optimization': 'fair',
+        'O': 'fair',
+        'queues': args.queues,
+        'concurrency': args.concurrency,
+        'hostname': args.celery_hostname,
+    }
+
+    if args.daemon:
+        pid, stdout, stderr, log_file = setup_locations("worker", args.pid, 
args.stdout, args.stderr, args.log_file)
+        handle = setup_logging(log_file)
+        stdout = open(stdout, 'w+')
+        stderr = open(stderr, 'w+')
+
+        ctx = daemon.DaemonContext(
+            pidfile=TimeoutPIDLockFile(pid, -1),
+            files_preserve=[handle],
+            stdout=stdout,
+            stderr=stderr,
+        )
+        with ctx:
+            sp = subprocess.Popen(['airflow', 'serve_logs'], env=env, 
close_fds=True)
+            worker.run(**options)
+            sp.kill()
+
+        stdout.close()
+        stderr.close()
+    else:
+        signal.signal(signal.SIGINT, sigint_handler)
+        signal.signal(signal.SIGTERM, sigint_handler)
+
+        sp = subprocess.Popen(['airflow', 'serve_logs'], env=env, 
close_fds=True)
+
+        worker.run(**options)
+        sp.kill()
+
+
+@cli_utils.action_logging
+def initdb(args):  # noqa
+    print("DB: " + repr(settings.engine.url))
+    db_utils.initdb(settings.RBAC)
+    print("Done.")
+
+
+@cli_utils.action_logging
+def resetdb(args):
+    print("DB: " + repr(settings.engine.url))
+    if args.yes or input(
+        "This will drop existing tables if they exist. "
+        "Proceed? (y/n)").upper() == "Y":
+        db_utils.resetdb(settings.RBAC)
+    else:
+        print("Bail.")
+
+
+@cli_utils.action_logging
+def upgradedb(args):  # noqa
+    print("DB: " + repr(settings.engine.url))
+    db_utils.upgradedb()
+
+    # Populate DagStats table
+    session = settings.Session()
+    ds_rows = session.query(DagStat).count()
+    if not ds_rows:
+        qry = (
+            session.query(DagRun.dag_id, DagRun.state, func.count('*'))
+                .group_by(DagRun.dag_id, DagRun.state)
+        )
+        for dag_id, state, count in qry:
+            session.add(DagStat(dag_id=dag_id, state=state, count=count))
+        session.commit()
+
+
+@cli_utils.action_logging
+def version(args):  # noqa
+    print(settings.HEADER + "  v" + airflow.__version__)
+
+
+alternative_conn_specs = ['conn_type', 'conn_host',
+                          'conn_login', 'conn_password', 'conn_schema', 
'conn_port']
+
+
+@cli_utils.action_logging
+def connections(args):
+    if args.list:
+        # Check that no other flags were passed to the command
+        invalid_args = list()
+        for arg in ['conn_id', 'conn_uri', 'conn_extra'] + 
alternative_conn_specs:
+            if getattr(args, arg) is not None:
+                invalid_args.append(arg)
+        if invalid_args:
+            msg = ('\n\tThe following args are not compatible with the ' +
+                   '--list flag: {invalid!r}\n')
+            msg = msg.format(invalid=invalid_args)
+            print(msg)
+            return
+
+        session = settings.Session()
+        conns = session.query(Connection.conn_id, Connection.conn_type,
+                              Connection.host, Connection.port,
+                              Connection.is_encrypted,
+                              Connection.is_extra_encrypted,
+                              Connection.extra).all()
+        conns = [map(reprlib.repr, conn) for conn in conns]
+        print(tabulate(conns, ['Conn Id', 'Conn Type', 'Host', 'Port',
+                               'Is Encrypted', 'Is Extra Encrypted', 'Extra'],
+                       tablefmt="fancy_grid"))
+        return
+
+    if args.delete:
+        # Check that only the `conn_id` arg was passed to the command
+        invalid_args = list()
+        for arg in ['conn_uri', 'conn_extra'] + alternative_conn_specs:
+            if getattr(args, arg) is not None:
+                invalid_args.append(arg)
+        if invalid_args:
+            msg = ('\n\tThe following args are not compatible with the ' +
+                   '--delete flag: {invalid!r}\n')
+            msg = msg.format(invalid=invalid_args)
+            print(msg)
+            return
+
+        if args.conn_id is None:
+            print('\n\tTo delete a connection, you Must provide a value for ' +
+                  'the --conn_id flag.\n')
+            return
+
+        session = settings.Session()
+        try:
+            to_delete = (session
+                         .query(Connection)
+                         .filter(Connection.conn_id == args.conn_id)
+                         .one())
+        except exc.NoResultFound:
+            msg = '\n\tDid not find a connection with `conn_id`={conn_id}\n'
+            msg = msg.format(conn_id=args.conn_id)
+            print(msg)
+            return
+        except exc.MultipleResultsFound:
+            msg = ('\n\tFound more than one connection with ' +
+                   '`conn_id`={conn_id}\n')
+            msg = msg.format(conn_id=args.conn_id)
+            print(msg)
+            return
+        else:
+            deleted_conn_id = to_delete.conn_id
+            session.delete(to_delete)
+            session.commit()
+            msg = '\n\tSuccessfully deleted `conn_id`={conn_id}\n'
+            msg = msg.format(conn_id=deleted_conn_id)
+            print(msg)
+        return
+
+    if args.add:
+        # Check that the conn_id and conn_uri args were passed to the command:
+        missing_args = list()
+        invalid_args = list()
+        if not args.conn_id:
+            missing_args.append('conn_id')
+        if args.conn_uri:
+            for arg in alternative_conn_specs:
+                if getattr(args, arg) is not None:
+                    invalid_args.append(arg)
+        elif not args.conn_type:
+            missing_args.append('conn_uri or conn_type')
+        if missing_args:
+            msg = ('\n\tThe following args are required to add a connection:' +
+                   ' {missing!r}\n'.format(missing=missing_args))
+            print(msg)
+        if invalid_args:
+            msg = ('\n\tThe following args are not compatible with the ' +
+                   '--add flag and --conn_uri flag: {invalid!r}\n')
+            msg = msg.format(invalid=invalid_args)
+            print(msg)
+        if missing_args or invalid_args:
+            return
+
+        if args.conn_uri:
+            new_conn = Connection(conn_id=args.conn_id, uri=args.conn_uri)
+        else:
+            new_conn = Connection(conn_id=args.conn_id, 
conn_type=args.conn_type, host=args.conn_host,
+                                  login=args.conn_login, 
password=args.conn_password, schema=args.conn_schema, port=args.conn_port)
+        if args.conn_extra is not None:
+            new_conn.set_extra(args.conn_extra)
+
+        session = settings.Session()
+        if not (session
+                    .query(Connection)
+                    .filter(Connection.conn_id == new_conn.conn_id).first()):
+            session.add(new_conn)
+            session.commit()
+            msg = '\n\tSuccessfully added `conn_id`={conn_id} : {uri}\n'
+            msg = msg.format(conn_id=new_conn.conn_id, uri=args.conn_uri or 
urlunparse((args.conn_type, '{login}:{password}@{host}:{port}'.format(
+                login=args.conn_login or '', password=args.conn_password or 
'', host=args.conn_host or '', port=args.conn_port or ''), args.conn_schema or 
'', '', '', '')))
+            print(msg)
+        else:
+            msg = '\n\tA connection with `conn_id`={conn_id} already exists\n'
+            msg = msg.format(conn_id=new_conn.conn_id)
+            print(msg)
+
+        return
+
+
+@cli_utils.action_logging
+def flower(args):
+    broka = conf.get('celery', 'BROKER_URL')
+    address = '--address={}'.format(args.hostname)
+    port = '--port={}'.format(args.port)
+    api = ''
+    if args.broker_api:
+        api = '--broker_api=' + args.broker_api
+
+    url_prefix = ''
+    if args.url_prefix:
+        url_prefix = '--url-prefix=' + args.url_prefix
+
+    flower_conf = ''
+    if args.flower_conf:
+        flower_conf = '--conf=' + args.flower_conf
+
+    if args.daemon:
+        pid, stdout, stderr, log_file = setup_locations("flower", args.pid, 
args.stdout, args.stderr, args.log_file)
+        stdout = open(stdout, 'w+')
+        stderr = open(stderr, 'w+')
+
+        ctx = daemon.DaemonContext(
+            pidfile=TimeoutPIDLockFile(pid, -1),
+            stdout=stdout,
+            stderr=stderr,
+        )
+
+        with ctx:
+            os.execvp("flower", ['flower', '-b',
+                                 broka, address, port, api, flower_conf, 
url_prefix])
+
+        stdout.close()
+        stderr.close()
+    else:
+        signal.signal(signal.SIGINT, sigint_handler)
+        signal.signal(signal.SIGTERM, sigint_handler)
+
+        os.execvp("flower", ['flower', '-b',
+                             broka, address, port, api, flower_conf, 
url_prefix])
+
+
+@cli_utils.action_logging
+def kerberos(args):  # noqa
+    print(settings.HEADER)
+    import airflow.security.kerberos
+
+    if args.daemon:
+        pid, stdout, stderr, log_file = setup_locations("kerberos",
+                                                        args.pid,
+                                                        args.stdout,
+                                                        args.stderr,
+                                                        args.log_file)
+        stdout = open(stdout, 'w+')
+        stderr = open(stderr, 'w+')
+
+        ctx = daemon.DaemonContext(
+            pidfile=TimeoutPIDLockFile(pid, -1),
+            stdout=stdout,
+            stderr=stderr,
+        )
+
+        with ctx:
+            airflow.security.kerberos.run()
+
+        stdout.close()
+        stderr.close()
+    else:
+        airflow.security.kerberos.run()
+
+
+@cli_utils.action_logging
+def create_user(args):
+    fields = {
+        'role': args.role,
+        'username': args.username,
+        'email': args.email,
+        'firstname': args.firstname,
+        'lastname': args.lastname,
+    }
+    empty_fields = [k for k, v in fields.items() if not v]
+    if empty_fields:
+        print('Missing arguments: {}.'.format(', '.join(empty_fields)))
+        sys.exit(0)
+
+    appbuilder = cached_appbuilder()
+    role = appbuilder.sm.find_role(args.role)
+    if not role:
+        print('{} is not a valid role.'.format(args.role))
+        sys.exit(0)
+
+    password = getpass.getpass('Password:')
+    password_confirmation = getpass.getpass('Repeat for confirmation:')
+    if password != password_confirmation:
+        print('Passwords did not match!')
+        sys.exit(0)
+
+    user = appbuilder.sm.add_user(args.username, args.firstname, args.lastname,
+                                  args.email, role, password)
+    if user:
+        print('{} user {} created.'.format(args.role, args.username))
+    else:
+        print('Failed to create user.')
+
+
+Arg = namedtuple(
+    'Arg', ['flags', 'help', 'action', 'default', 'nargs', 'type', 'choices', 
'metavar'])
+Arg.__new__.__defaults__ = (None, None, None, None, None, None, None)
+
+
+class CLIFactory(object):
+    args = {
+        # Shared
+        'dag_id': Arg(("dag_id",), "The id of the dag"),
+        'task_id': Arg(("task_id",), "The id of the task"),
+        'execution_date': Arg(
+            ("execution_date",), help="The execution date of the DAG",
+            type=parsedate),
+        'task_regex': Arg(
+            ("-t", "--task_regex"),
+            "The regex to filter specific task_ids to backfill (optional)"),
+        'subdir': Arg(
+            ("-sd", "--subdir"),
+            "File location or directory from which to look for the dag",
+            default=settings.DAGS_FOLDER),
+        'start_date': Arg(
+            ("-s", "--start_date"), "Override start_date YYYY-MM-DD",
+            type=parsedate),
+        'end_date': Arg(
+            ("-e", "--end_date"), "Override end_date YYYY-MM-DD",
+            type=parsedate),
+        'dry_run': Arg(
+            ("-dr", "--dry_run"), "Perform a dry run", "store_true"),
+        'pid': Arg(
+            ("--pid",), "PID file location",
+            nargs='?'),
+        'daemon': Arg(
+            ("-D", "--daemon"), "Daemonize instead of running "
+                                "in the foreground",
+            "store_true"),
+        'stderr': Arg(
+            ("--stderr",), "Redirect stderr to this file"),
+        'stdout': Arg(
+            ("--stdout",), "Redirect stdout to this file"),
+        'log_file': Arg(
+            ("-l", "--log-file"), "Location of the log file"),
+        'yes': Arg(
+            ("-y", "--yes"),
+            "Do not prompt to confirm reset. Use with care!",
+            "store_true",
+            default=False),
+
+        # backfill
+        'mark_success': Arg(
+            ("-m", "--mark_success"),
+            "Mark jobs as succeeded without running them", "store_true"),
+        'local': Arg(
+            ("-l", "--local"),
+            "Run the task using the LocalExecutor", "store_true"),
+        'donot_pickle': Arg(
+            ("-x", "--donot_pickle"), (
+                "Do not attempt to pickle the DAG object to send over "
+                "to the workers, just tell the workers to run their version "
+                "of the code."),
+            "store_true"),
+        'include_adhoc': Arg(
+            ("-a", "--include_adhoc"),
+            "Include dags with the adhoc parameter.", "store_true"),
+        'bf_ignore_dependencies': Arg(
+            ("-i", "--ignore_dependencies"),
+            (
+                "Skip upstream tasks, run only the tasks "
+                "matching the regexp. Only works in conjunction "
+                "with task_regex"),
+            "store_true"),
+        'bf_ignore_first_depends_on_past': Arg(
+            ("-I", "--ignore_first_depends_on_past"),
+            (
+                "Ignores depends_on_past dependencies for the first "
+                "set of tasks only (subsequent executions in the backfill "
+                "DO respect depends_on_past)."),
+            "store_true"),
+        'pool': Arg(("--pool",), "Resource pool to use"),
+        'delay_on_limit': Arg(
+            ("--delay_on_limit",),
+            help=("Amount of time in seconds to wait when the limit "
+                  "on maximum active dag runs (max_active_runs) has "
+                  "been reached before trying to execute a dag run "
+                  "again."),
+            type=float,
+            default=1.0),
+        # list_tasks
+        'tree': Arg(("-t", "--tree"), "Tree view", "store_true"),
+        # list_dags
+        'report': Arg(
+            ("-r", "--report"), "Show DagBag loading report", "store_true"),
+        # clear
+        'upstream': Arg(
+            ("-u", "--upstream"), "Include upstream tasks", "store_true"),
+        'only_failed': Arg(
+            ("-f", "--only_failed"), "Only failed jobs", "store_true"),
+        'only_running': Arg(
+            ("-r", "--only_running"), "Only running jobs", "store_true"),
+        'downstream': Arg(
+            ("-d", "--downstream"), "Include downstream tasks", "store_true"),
+        'no_confirm': Arg(
+            ("-c", "--no_confirm"),
+            "Do not request confirmation", "store_true"),
+        'exclude_subdags': Arg(
+            ("-x", "--exclude_subdags"),
+            "Exclude subdags", "store_true"),
+        'dag_regex': Arg(
+            ("-dx", "--dag_regex"),
+            "Search dag_id as regex instead of exact string", "store_true"),
+        # trigger_dag
+        'run_id': Arg(("-r", "--run_id"), "Helps to identify this run"),
+        'conf': Arg(
+            ('-c', '--conf'),
+            "JSON string that gets pickled into the DagRun's conf attribute"),
+        'exec_date': Arg(
+            ("-e", "--exec_date"), help="The execution date of the DAG",
+            type=parsedate),
+        # pool
+        'pool_set': Arg(
+            ("-s", "--set"),
+            nargs=3,
+            metavar=('NAME', 'SLOT_COUNT', 'POOL_DESCRIPTION'),
+            help="Set pool slot count and description, respectively"),
+        'pool_get': Arg(
+            ("-g", "--get"),
+            metavar='NAME',
+            help="Get pool info"),
+        'pool_delete': Arg(
+            ("-x", "--delete"),
+            metavar="NAME",
+            help="Delete a pool"),
+        # variables
+        'set': Arg(
+            ("-s", "--set"),
+            nargs=2,
+            metavar=('KEY', 'VAL'),
+            help="Set a variable"),
+        'get': Arg(
+            ("-g", "--get"),
+            metavar='KEY',
+            help="Get value of a variable"),
+        'default': Arg(
+            ("-d", "--default"),
+            metavar="VAL",
+            default=None,
+            help="Default value returned if variable does not exist"),
+        'json': Arg(
+            ("-j", "--json"),
+            help="Deserialize JSON variable",
+            action="store_true"),
+        'var_import': Arg(
+            ("-i", "--import"),
+            metavar="FILEPATH",
+            help="Import variables from JSON file"),
+        'var_export': Arg(
+            ("-e", "--export"),
+            metavar="FILEPATH",
+            help="Export variables to JSON file"),
+        'var_delete': Arg(
+            ("-x", "--delete"),
+            metavar="KEY",
+            help="Delete a variable"),
+        # kerberos
+        'principal': Arg(
+            ("principal",), "kerberos principal",
+            nargs='?', default=conf.get('kerberos', 'principal')),
+        'keytab': Arg(
+            ("-kt", "--keytab"), "keytab",
+            nargs='?', default=conf.get('kerberos', 'keytab')),
+        # run
+        # TODO(aoen): "force" is a poor choice of name here since it implies 
it overrides
+        # all dependencies (not just past success), e.g. the 
ignore_depends_on_past
+        # dependency. This flag should be deprecated and renamed to 
'ignore_ti_state' and
+        # the "ignore_all_dependencies" command should be called the"force" 
command
+        # instead.
+        'interactive': Arg(
+            ('-int', '--interactive'),
+            help='Do not capture standard output and error streams '
+                 '(useful for interactive debugging)',
+            action='store_true'),
+        'force': Arg(
+            ("-f", "--force"),
+            "Ignore previous task instance state, rerun regardless if task 
already "
+            "succeeded/failed",
+            "store_true"),
+        'raw': Arg(("-r", "--raw"), argparse.SUPPRESS, "store_true"),
+        'ignore_all_dependencies': Arg(
+            ("-A", "--ignore_all_dependencies"),
+            "Ignores all non-critical dependencies, including ignore_ti_state 
and "
+            "ignore_task_deps",
+            "store_true"),
+        # TODO(aoen): ignore_dependencies is a poor choice of name here 
because it is too
+        # vague (e.g. a task being in the appropriate state to be run is also 
a dependency
+        # but is not ignored by this flag), the name 
'ignore_task_dependencies' is
+        # slightly better (as it ignores all dependencies that are specific to 
the task),
+        # so deprecate the old command name and use this instead.
+        'ignore_dependencies': Arg(
+            ("-i", "--ignore_dependencies"),
+            "Ignore task-specific dependencies, e.g. upstream, 
depends_on_past, and "
+            "retry delay dependencies",
+            "store_true"),
+        'ignore_depends_on_past': Arg(
+            ("-I", "--ignore_depends_on_past"),
+            "Ignore depends_on_past dependencies (but respect "
+            "upstream dependencies)",
+            "store_true"),
+        'ship_dag': Arg(
+            ("--ship_dag",),
+            "Pickles (serializes) the DAG and ships it to the worker",
+            "store_true"),
+        'pickle': Arg(
+            ("-p", "--pickle"),
+            "Serialized pickle object of the entire dag (used internally)"),
+        'job_id': Arg(("-j", "--job_id"), argparse.SUPPRESS),
+        'cfg_path': Arg(
+            ("--cfg_path",), "Path to config file to use instead of 
airflow.cfg"),
+        # webserver
+        'port': Arg(
+            ("-p", "--port"),
+            default=conf.get('webserver', 'WEB_SERVER_PORT'),
+            type=int,
+            help="The port on which to run the server"),
+        'ssl_cert': Arg(
+            ("--ssl_cert",),
+            default=conf.get('webserver', 'WEB_SERVER_SSL_CERT'),
+            help="Path to the SSL certificate for the webserver"),
+        'ssl_key': Arg(
+            ("--ssl_key",),
+            default=conf.get('webserver', 'WEB_SERVER_SSL_KEY'),
+            help="Path to the key to use with the SSL certificate"),
+        'workers': Arg(
+            ("-w", "--workers"),
+            default=conf.get('webserver', 'WORKERS'),
+            type=int,
+            help="Number of workers to run the webserver on"),
+        'workerclass': Arg(
+            ("-k", "--workerclass"),
+            default=conf.get('webserver', 'WORKER_CLASS'),
+            choices=['sync', 'eventlet', 'gevent', 'tornado'],
+            help="The worker class to use for Gunicorn"),
+        'worker_timeout': Arg(
+            ("-t", "--worker_timeout"),
+            default=conf.get('webserver', 'WEB_SERVER_WORKER_TIMEOUT'),
+            type=int,
+            help="The timeout for waiting on webserver workers"),
+        'hostname': Arg(
+            ("-hn", "--hostname"),
+            default=conf.get('webserver', 'WEB_SERVER_HOST'),
+            help="Set the hostname on which to run the web server"),
+        'debug': Arg(
+            ("-d", "--debug"),
+            "Use the server that ships with Flask in debug mode",
+            "store_true"),
+        'access_logfile': Arg(
+            ("-A", "--access_logfile"),
+            default=conf.get('webserver', 'ACCESS_LOGFILE'),
+            help="The logfile to store the webserver access log. Use '-' to 
print to "
+                 "stderr."),
+        'error_logfile': Arg(
+            ("-E", "--error_logfile"),
+            default=conf.get('webserver', 'ERROR_LOGFILE'),
+            help="The logfile to store the webserver error log. Use '-' to 
print to "
+                 "stderr."),
+        # scheduler
+        'dag_id_opt': Arg(("-d", "--dag_id"), help="The id of the dag to run"),
+        'run_duration': Arg(
+            ("-r", "--run-duration"),
+            default=None, type=int,
+            help="Set number of seconds to execute before exiting"),
+        'num_runs': Arg(
+            ("-n", "--num_runs"),
+            default=-1, type=int,
+            help="Set the number of runs to execute before exiting"),
+        # worker
+        'do_pickle': Arg(
+            ("-p", "--do_pickle"),
+            default=False,
+            help=(
+                "Attempt to pickle the DAG object to send over "
+                "to the workers, instead of letting workers run their version "
+                "of the code."),
+            action="store_true"),
+        'queues': Arg(
+            ("-q", "--queues"),
+            help="Comma delimited list of queues to serve",
+            default=conf.get('celery', 'DEFAULT_QUEUE')),
+        'concurrency': Arg(
+            ("-c", "--concurrency"),
+            type=int,
+            help="The number of worker processes",
+            default=conf.get('celery', 'worker_concurrency')),
+        'celery_hostname': Arg(
+            ("-cn", "--celery_hostname"),
+            help=("Set the hostname of celery worker "
+                  "if you have multiple workers on a single machine.")),
+        # flower
+        'broker_api': Arg(("-a", "--broker_api"), help="Broker api"),
+        'flower_hostname': Arg(
+            ("-hn", "--hostname"),
+            default=conf.get('celery', 'FLOWER_HOST'),
+            help="Set the hostname on which to run the server"),
+        'flower_port': Arg(
+            ("-p", "--port"),
+            default=conf.get('celery', 'FLOWER_PORT'),
+            type=int,
+            help="The port on which to run the server"),
+        'flower_conf': Arg(
+            ("-fc", "--flower_conf"),
+            help="Configuration file for flower"),
+        'flower_url_prefix': Arg(
+            ("-u", "--url_prefix"),
+            default=conf.get('celery', 'FLOWER_URL_PREFIX'),
+            help="URL prefix for Flower"),
+        'task_params': Arg(
+            ("-tp", "--task_params"),
+            help="Sends a JSON params dict to the task"),
+        # connections
+        'list_connections': Arg(
+            ('-l', '--list'),
+            help='List all connections',
+            action='store_true'),
+        'add_connection': Arg(
+            ('-a', '--add'),
+            help='Add a connection',
+            action='store_true'),
+        'delete_connection': Arg(
+            ('-d', '--delete'),
+            help='Delete a connection',
+            action='store_true'),
+        'conn_id': Arg(
+            ('--conn_id',),
+            help='Connection id, required to add/delete a connection',
+            type=str),
+        'conn_uri': Arg(
+            ('--conn_uri',),
+            help='Connection URI, required to add a connection without 
conn_type',
+            type=str),
+        'conn_type': Arg(
+            ('--conn_type',),
+            help='Connection type, required to add a connection without 
conn_uri',
+            type=str),
+        'conn_host': Arg(
+            ('--conn_host',),
+            help='Connection host, optional when adding a connection',
+            type=str),
+        'conn_login': Arg(
+            ('--conn_login',),
+            help='Connection login, optional when adding a connection',
+            type=str),
+        'conn_password': Arg(
+            ('--conn_password',),
+            help='Connection password, optional when adding a connection',
+            type=str),
+        'conn_schema': Arg(
+            ('--conn_schema',),
+            help='Connection schema, optional when adding a connection',
+            type=str),
+        'conn_port': Arg(
+            ('--conn_port',),
+            help='Connection port, optional when adding a connection',
+            type=str),
+        'conn_extra': Arg(
+            ('--conn_extra',),
+            help='Connection `Extra` field, optional when adding a connection',
+            type=str),
+        # create_user
+        'role': Arg(
+            ('-r', '--role',),
+            help='Role of the user. Existing roles include Admin, '
+                 'User, Op, Viewer, and Public',
+            type=str),
+        'firstname': Arg(
+            ('-f', '--firstname',),
+            help='First name of the user',
+            type=str),
+        'lastname': Arg(
+            ('-l', '--lastname',),
+            help='Last name of the user',
+            type=str),
+        'email': Arg(
+            ('-e', '--email',),
+            help='Email of the user',
+            type=str),
+        'username': Arg(
+            ('-u', '--username',),
+            help='Username of the user',
+            type=str),
+    }
+    subparsers = (
+        {
+            'func': backfill,
+            'help': "Run subsections of a DAG for a specified date range",
+            'args': (
+                'dag_id', 'task_regex', 'start_date', 'end_date',
+                'mark_success', 'local', 'donot_pickle', 'include_adhoc',
+                'bf_ignore_dependencies', 'bf_ignore_first_depends_on_past',
+                'subdir', 'pool', 'delay_on_limit', 'dry_run')
+        }, {
+            'func': list_tasks,
+            'help': "List the tasks within a DAG",
+            'args': ('dag_id', 'tree', 'subdir'),
+        }, {
+            'func': clear,
+            'help': "Clear a set of task instance, as if they never ran",
+            'args': (
+                'dag_id', 'task_regex', 'start_date', 'end_date', 'subdir',
+                'upstream', 'downstream', 'no_confirm', 'only_failed',
+                'only_running', 'exclude_subdags', 'dag_regex'),
+        }, {
+            'func': pause,
+            'help': "Pause a DAG",
+            'args': ('dag_id', 'subdir'),
+        }, {
+            'func': unpause,
+            'help': "Resume a paused DAG",
+            'args': ('dag_id', 'subdir'),
+        }, {
+            'func': trigger_dag,
+            'help': "Trigger a DAG run",
+            'args': ('dag_id', 'subdir', 'run_id', 'conf', 'exec_date'),
+        }, {
+            'func': delete_dag,
+            'help': "Delete all DB records related to the specified DAG",
+            'args': ('dag_id', 'yes',),
+        }, {
+            'func': pool,
+            'help': "CRUD operations on pools",
+            "args": ('pool_set', 'pool_get', 'pool_delete'),
+        }, {
+            'func': variables,
+            'help': "CRUD operations on variables",
+            "args": ('set', 'get', 'json', 'default', 'var_import', 
'var_export', 'var_delete'),
+        }, {
+            'func': kerberos,
+            'help': "Start a kerberos ticket renewer",
+            'args': ('principal', 'keytab', 'pid',
+                     'daemon', 'stdout', 'stderr', 'log_file'),
+        }, {
+            'func': render,
+            'help': "Render a task instance's template(s)",
+            'args': ('dag_id', 'task_id', 'execution_date', 'subdir'),
+        }, {
+            'func': run,
+            'help': "Run a single task instance",
+            'args': (
+                'dag_id', 'task_id', 'execution_date', 'subdir',
+                'mark_success', 'force', 'pool', 'cfg_path',
+                'local', 'raw', 'ignore_all_dependencies', 
'ignore_dependencies',
+                'ignore_depends_on_past', 'ship_dag', 'pickle', 'job_id', 
'interactive',),
+        }, {
+            'func': initdb,
+            'help': "Initialize the metadata database",
+            'args': tuple(),
+        }, {
+            'func': list_dags,
+            'help': "List all the DAGs",
+            'args': ('subdir', 'report'),
+        }, {
+            'func': dag_state,
+            'help': "Get the status of a dag run",
+            'args': ('dag_id', 'execution_date', 'subdir'),
+        }, {
+            'func': task_failed_deps,
+            'help': (
+                "Returns the unmet dependencies for a task instance from the 
perspective "
+                "of the scheduler. In other words, why a task instance doesn't 
get "
+                "scheduled and then queued by the scheduler, and then run by 
an "
+                "executor)."),
+            'args': ('dag_id', 'task_id', 'execution_date', 'subdir'),
+        }, {
+            'func': task_state,
+            'help': "Get the status of a task instance",
+            'args': ('dag_id', 'task_id', 'execution_date', 'subdir'),
+        }, {
+            'func': serve_logs,
+            'help': "Serve logs generate by worker",
+            'args': tuple(),
+        }, {
+            'func': test,
+            'help': (
+                "Test a task instance. This will run a task without checking 
for "
+                "dependencies or recording its state in the database."),
+            'args': (
+                'dag_id', 'task_id', 'execution_date', 'subdir', 'dry_run',
+                'task_params'),
+        }, {
+            'func': webserver,
+            'help': "Start a Airflow webserver instance",
+            'args': ('port', 'workers', 'workerclass', 'worker_timeout', 
'hostname',
+                     'pid', 'daemon', 'stdout', 'stderr', 'access_logfile',
+                     'error_logfile', 'log_file', 'ssl_cert', 'ssl_key', 
'debug'),
+        }, {
+            'func': resetdb,
+            'help': "Burn down and rebuild the metadata database",
+            'args': ('yes',),
+        }, {
+            'func': upgradedb,
+            'help': "Upgrade the metadata database to latest version",
+            'args': tuple(),
+        }, {
+            'func': scheduler,
+            'help': "Start a scheduler instance",
+            'args': ('dag_id_opt', 'subdir', 'run_duration', 'num_runs',
+                     'do_pickle', 'pid', 'daemon', 'stdout', 'stderr',
+                     'log_file'),
+        }, {
+            'func': worker,
+            'help': "Start a Celery worker node",
+            'args': ('do_pickle', 'queues', 'concurrency', 'celery_hostname',
+                     'pid', 'daemon', 'stdout', 'stderr', 'log_file'),
+        }, {
+            'func': flower,
+            'help': "Start a Celery Flower",
+            'args': ('flower_hostname', 'flower_port', 'flower_conf', 
'flower_url_prefix',
+                     'broker_api', 'pid', 'daemon', 'stdout', 'stderr', 
'log_file'),
+        }, {
+            'func': version,
+            'help': "Show the version",
+            'args': tuple(),
+        }, {
+            'func': connections,
+            'help': "List/Add/Delete connections",
+            'args': ('list_connections', 'add_connection', 'delete_connection',
+                     'conn_id', 'conn_uri', 'conn_extra') + 
tuple(alternative_conn_specs),
+        }, {
+            'func': create_user,
+            'help': "Create an admin account",
+            'args': ('role', 'username', 'email', 'firstname', 'lastname'),
+        },
+    )
+    subparsers_dict = {sp['func'].__name__: sp for sp in subparsers}
+    dag_subparsers = (
+        'list_tasks', 'backfill', 'test', 'run', 'pause', 'unpause')
+
+    @classmethod
+    def get_parser(cls, dag_parser=False):
+        parser = argparse.ArgumentParser()
+        subparsers = parser.add_subparsers(
+            help='sub-command help', dest='subcommand')
+        subparsers.required = True
+
+        subparser_list = cls.dag_subparsers if dag_parser else 
cls.subparsers_dict.keys()
+        for sub in subparser_list:
+            sub = cls.subparsers_dict[sub]
+            sp = subparsers.add_parser(sub['func'].__name__, help=sub['help'])
+            for arg in sub['args']:
+                if 'dag_id' in arg and dag_parser:
+                    continue
+                arg = cls.args[arg]
+                kwargs = {
+                    f: getattr(arg, f)
+                    for f in arg._fields if f != 'flags' and getattr(arg, f)}
+                sp.add_argument(*arg.flags, **kwargs)
+            sp.set_defaults(func=sub['func'])
+        return parser
+
+
+def get_parser():
+    return CLIFactory.get_parser()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/airflow/bin/cli/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/bin/cli/__init__.py b/airflow/bin/cli/__init__.py
deleted file mode 100644
index 026f941..0000000
--- a/airflow/bin/cli/__init__.py
+++ /dev/null
@@ -1,3 +0,0 @@
-from cli import *
-from cli_factory import *
-from airflow import *

Reply via email to