http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/airflow/bin/cli/cli_factory.py
----------------------------------------------------------------------
diff --git a/airflow/bin/cli/cli_factory.py b/airflow/bin/cli/cli_factory.py
deleted file mode 100644
index 167643b..0000000
--- a/airflow/bin/cli/cli_factory.py
+++ /dev/null
@@ -1,493 +0,0 @@
-from collections import namedtuple
-from cli import *
-from dateutil.parser import parse as parsedate
-from airflow import settings
-from airflow import conf
-
-
-Arg = namedtuple(
-    'Arg', ['flags', 'help', 'action', 'default', 'nargs', 'type', 'choices', 'metavar'])
-Arg.__new__.__defaults__ = (None, None, None, None, None, None, None)
-
-
-class CLIFactory(object):
-    args = {
-        # Shared
-        'dag_id': Arg(("dag_id",), "The id of the dag"),
-        'task_id': Arg(("task_id",), "The id of the task"),
-        'execution_date': Arg(
-            ("execution_date",), help="The execution date of the DAG",
-            type=parsedate),
-        'task_regex': Arg(
-            ("-t", "--task_regex"),
-            "The regex to filter specific task_ids to backfill (optional)"),
-        'subdir': Arg(
-            ("-sd", "--subdir"),
-            "File location or directory from which to look for the dag",
-            default=settings.DAGS_FOLDER),
-        'start_date': Arg(
-            ("-s", "--start_date"), "Override start_date YYYY-MM-DD",
-            type=parsedate),
-        'end_date': Arg(
-            ("-e", "--end_date"), "Override end_date YYYY-MM-DD",
-            type=parsedate),
-        'dry_run': Arg(
-            ("-dr", "--dry_run"), "Perform a dry run", "store_true"),
-        'pid': Arg(
-            ("--pid", ), "PID file location",
-            nargs='?'),
-        'daemon': Arg(
-            ("-D", "--daemon"), "Daemonize instead of running "
-                                "in the foreground",
-            "store_true"),
-        'stderr': Arg(
-            ("--stderr", ), "Redirect stderr to this file"),
-        'stdout': Arg(
-            ("--stdout", ), "Redirect stdout to this file"),
-        'log_file': Arg(
-            ("-l", "--log-file"), "Location of the log file"),
-
-        # backfill
-        'mark_success': Arg(
-            ("-m", "--mark_success"),
-            "Mark jobs as succeeded without running them", "store_true"),
-        'local': Arg(
-            ("-l", "--local"),
-            "Run the task using the LocalExecutor", "store_true"),
-        'donot_pickle': Arg(
-            ("-x", "--donot_pickle"), (
-                "Do not attempt to pickle the DAG object to send over "
-                "to the workers, just tell the workers to run their version "
-                "of the code."),
-            "store_true"),
-        'include_adhoc': Arg(
-            ("-a", "--include_adhoc"),
-            "Include dags with the adhoc parameter.", "store_true"),
-        'bf_ignore_dependencies': Arg(
-            ("-i", "--ignore_dependencies"),
-            (
-                "Skip upstream tasks, run only the tasks "
-                "matching the regexp. Only works in conjunction "
-                "with task_regex"),
-            "store_true"),
-        'bf_ignore_first_depends_on_past': Arg(
-            ("-I", "--ignore_first_depends_on_past"),
-            (
-                "Ignores depends_on_past dependencies for the first "
-                "set of tasks only (subsequent executions in the backfill "
-                "DO respect depends_on_past)."),
-            "store_true"),
-        'pool': Arg(("--pool",), "Resource pool to use"),
-        # list_tasks
-        'tree': Arg(("-t", "--tree"), "Tree view", "store_true"),
-        # list_dags
-        'report': Arg(
-            ("-r", "--report"), "Show DagBag loading report", "store_true"),
-        # clear
-        'upstream': Arg(
-            ("-u", "--upstream"), "Include upstream tasks", "store_true"),
-        'only_failed': Arg(
-            ("-f", "--only_failed"), "Only failed jobs", "store_true"),
-        'only_running': Arg(
-            ("-r", "--only_running"), "Only running jobs", "store_true"),
-        'downstream': Arg(
-            ("-d", "--downstream"), "Include downstream tasks", "store_true"),
-        'no_confirm': Arg(
-            ("-c", "--no_confirm"),
-            "Do not request confirmation", "store_true"),
-        'exclude_subdags': Arg(
-            ("-x", "--exclude_subdags"),
-            "Exclude subdags", "store_true"),
-        # trigger_dag
-        'run_id': Arg(("-r", "--run_id"), "Helps to identify this run"),
-        'conf': Arg(
-            ('-c', '--conf'),
-            "JSON string that gets pickled into the DagRun's conf attribute"),
-        'exec_date': Arg(
-            ("-e", "--exec_date"), help="The execution date of the DAG",
-            type=parsedate),
-        # pool
-        'pool_set': Arg(
-            ("-s", "--set"),
-            nargs=3,
-            metavar=('NAME', 'SLOT_COUNT', 'POOL_DESCRIPTION'),
-            help="Set pool slot count and description, respectively"),
-        'pool_get': Arg(
-            ("-g", "--get"),
-            metavar='NAME',
-            help="Get pool info"),
-        'pool_delete': Arg(
-            ("-x", "--delete"),
-            metavar="NAME",
-            help="Delete a pool"),
-        # variables
-        'set': Arg(
-            ("-s", "--set"),
-            nargs=2,
-            metavar=('KEY', 'VAL'),
-            help="Set a variable"),
-        'get': Arg(
-            ("-g", "--get"),
-            metavar='KEY',
-            help="Get value of a variable"),
-        'default': Arg(
-            ("-d", "--default"),
-            metavar="VAL",
-            default=None,
-            help="Default value returned if variable does not exist"),
-        'json': Arg(
-            ("-j", "--json"),
-            help="Deserialize JSON variable",
-            action="store_true"),
-        'var_import': Arg(
-            ("-i", "--import"),
-            metavar="FILEPATH",
-            help="Import variables from JSON file"),
-        'var_export': Arg(
-            ("-e", "--export"),
-            metavar="FILEPATH",
-            help="Export variables to JSON file"),
-        'var_delete': Arg(
-            ("-x", "--delete"),
-            metavar="KEY",
-            help="Delete a variable"),
-        # kerberos
-        'principal': Arg(
-            ("principal",), "kerberos principal",
-            nargs='?', default=conf.get('kerberos', 'principal')),
-        'keytab': Arg(
-            ("-kt", "--keytab"), "keytab",
-            nargs='?', default=conf.get('kerberos', 'keytab')),
-        # run
-        # TODO(aoen): "force" is a poor choice of name here since it implies 
it overrides
-        # all dependencies (not just past success), e.g. the 
ignore_depends_on_past
-        # dependency. This flag should be deprecated and renamed to 
'ignore_ti_state' and
-        # the "ignore_all_dependencies" command should be called the"force" 
command
-        # instead.
-        'force': Arg(
-            ("-f", "--force"),
-            "Ignore previous task instance state, rerun regardless if task 
already "
-            "succeeded/failed",
-            "store_true"),
-        'raw': Arg(("-r", "--raw"), argparse.SUPPRESS, "store_true"),
-        'ignore_all_dependencies': Arg(
-            ("-A", "--ignore_all_dependencies"),
-            "Ignores all non-critical dependencies, including ignore_ti_state 
and "
-            "ignore_task_deps",
-            "store_true"),
-        # TODO(aoen): ignore_dependencies is a poor choice of name here because it is too
-        # vague (e.g. a task being in the appropriate state to be run is also a dependency
-        # but is not ignored by this flag), the name 'ignore_task_dependencies' is
-        # slightly better (as it ignores all dependencies that are specific to the task),
-        # so deprecate the old command name and use this instead.
-        'ignore_dependencies': Arg(
-            ("-i", "--ignore_dependencies"),
-            "Ignore task-specific dependencies, e.g. upstream, 
depends_on_past, and "
-            "retry delay dependencies",
-            "store_true"),
-        'ignore_depends_on_past': Arg(
-            ("-I", "--ignore_depends_on_past"),
-            "Ignore depends_on_past dependencies (but respect "
-            "upstream dependencies)",
-            "store_true"),
-        'ship_dag': Arg(
-            ("--ship_dag",),
-            "Pickles (serializes) the DAG and ships it to the worker",
-            "store_true"),
-        'kubernetes_mode': Arg(
-            ("-km", "--kubernetes_mode"),
-            "Pod will not contact postgres instance. should only be used for 
kubernetes tasks",
-            "store_true"),
-        'pickle': Arg(
-            ("-p", "--pickle"),
-            "Serialized pickle object of the entire dag (used internally)"),
-        'job_id': Arg(("-j", "--job_id"), argparse.SUPPRESS),
-        'cfg_path': Arg(
-            ("--cfg_path", ), "Path to config file to use instead of 
airflow.cfg"),
-        # webserver
-        'port': Arg(
-            ("-p", "--port"),
-            default=conf.get('webserver', 'WEB_SERVER_PORT'),
-            type=int,
-            help="The port on which to run the server"),
-        'ssl_cert': Arg(
-            ("--ssl_cert", ),
-            default=conf.get('webserver', 'WEB_SERVER_SSL_CERT'),
-            help="Path to the SSL certificate for the webserver"),
-        'ssl_key': Arg(
-            ("--ssl_key", ),
-            default=conf.get('webserver', 'WEB_SERVER_SSL_KEY'),
-            help="Path to the key to use with the SSL certificate"),
-        'workers': Arg(
-            ("-w", "--workers"),
-            default=conf.get('webserver', 'WORKERS'),
-            type=int,
-            help="Number of workers to run the webserver on"),
-        'workerclass': Arg(
-            ("-k", "--workerclass"),
-            default=conf.get('webserver', 'WORKER_CLASS'),
-            choices=['sync', 'eventlet', 'gevent', 'tornado'],
-            help="The worker class to use for Gunicorn"),
-        'worker_timeout': Arg(
-            ("-t", "--worker_timeout"),
-            default=conf.get('webserver', 'WEB_SERVER_WORKER_TIMEOUT'),
-            type=int,
-            help="The timeout for waiting on webserver workers"),
-        'hostname': Arg(
-            ("-hn", "--hostname"),
-            default=conf.get('webserver', 'WEB_SERVER_HOST'),
-            help="Set the hostname on which to run the web server"),
-        'debug': Arg(
-            ("-d", "--debug"),
-            "Use the server that ships with Flask in debug mode",
-            "store_true"),
-        'access_logfile': Arg(
-            ("-A", "--access_logfile"),
-            default=conf.get('webserver', 'ACCESS_LOGFILE'),
-            help="The logfile to store the webserver access log. Use '-' to 
print to "
-                 "stderr."),
-        'error_logfile': Arg(
-            ("-E", "--error_logfile"),
-            default=conf.get('webserver', 'ERROR_LOGFILE'),
-            help="The logfile to store the webserver error log. Use '-' to 
print to "
-                 "stderr."),
-        # resetdb
-        'yes': Arg(
-            ("-y", "--yes"),
-            "Do not prompt to confirm reset. Use with care!",
-            "store_true",
-            default=False),
-        # scheduler
-        'dag_id_opt': Arg(("-d", "--dag_id"), help="The id of the dag to run"),
-        'run_duration': Arg(
-            ("-r", "--run-duration"),
-            default=None, type=int,
-            help="Set number of seconds to execute before exiting"),
-        'num_runs': Arg(
-            ("-n", "--num_runs"),
-            default=-1, type=int,
-            help="Set the number of runs to execute before exiting"),
-        # worker
-        'do_pickle': Arg(
-            ("-p", "--do_pickle"),
-            default=False,
-            help=(
-                "Attempt to pickle the DAG object to send over "
-                "to the workers, instead of letting workers run their version "
-                "of the code."),
-            action="store_true"),
-        'queues': Arg(
-            ("-q", "--queues"),
-            help="Comma delimited list of queues to serve",
-            default=conf.get('celery', 'DEFAULT_QUEUE')),
-        'concurrency': Arg(
-            ("-c", "--concurrency"),
-            type=int,
-            help="The number of worker processes",
-            default=conf.get('celery', 'celeryd_concurrency')),
-        # flower
-        'broker_api': Arg(("-a", "--broker_api"), help="Broker api"),
-        'flower_hostname': Arg(
-            ("-hn", "--hostname"),
-            default=conf.get('celery', 'FLOWER_HOST'),
-            help="Set the hostname on which to run the server"),
-        'flower_port': Arg(
-            ("-p", "--port"),
-            default=conf.get('celery', 'FLOWER_PORT'),
-            type=int,
-            help="The port on which to run the server"),
-        'flower_conf': Arg(
-            ("-fc", "--flower_conf"),
-            help="Configuration file for flower"),
-        'task_params': Arg(
-            ("-tp", "--task_params"),
-            help="Sends a JSON params dict to the task"),
-        # connections
-        'list_connections': Arg(
-            ('-l', '--list'),
-            help='List all connections',
-            action='store_true'),
-        'add_connection': Arg(
-            ('-a', '--add'),
-            help='Add a connection',
-            action='store_true'),
-        'delete_connection': Arg(
-            ('-d', '--delete'),
-            help='Delete a connection',
-            action='store_true'),
-        'conn_id': Arg(
-            ('--conn_id',),
-            help='Connection id, required to add/delete a connection',
-            type=str),
-        'conn_uri': Arg(
-            ('--conn_uri',),
-            help='Connection URI, required to add a connection',
-            type=str),
-        'conn_extra': Arg(
-            ('--conn_extra',),
-            help='Connection `Extra` field, optional when adding a connection',
-            type=str),
-    }
-    subparsers = (
-        {
-            'func': backfill,
-            'help': "Run subsections of a DAG for a specified date range",
-            'args': (
-                'dag_id', 'task_regex', 'start_date', 'end_date',
-                'mark_success', 'local', 'donot_pickle', 'include_adhoc',
-                'bf_ignore_dependencies', 'bf_ignore_first_depends_on_past',
-                'subdir', 'pool', 'dry_run')
-        }, {
-            'func': list_tasks,
-            'help': "List the tasks within a DAG",
-            'args': ('dag_id', 'tree', 'subdir'),
-        }, {
-            'func': clear,
-            'help': "Clear a set of task instance, as if they never ran",
-            'args': (
-                'dag_id', 'task_regex', 'start_date', 'end_date', 'subdir',
-                'upstream', 'downstream', 'no_confirm', 'only_failed',
-                'only_running', 'exclude_subdags'),
-        }, {
-            'func': pause,
-            'help': "Pause a DAG",
-            'args': ('dag_id', 'subdir'),
-        }, {
-            'func': unpause,
-            'help': "Resume a paused DAG",
-            'args': ('dag_id', 'subdir'),
-        }, {
-            'func': trigger_dag,
-            'help': "Trigger a DAG run",
-            'args': ('dag_id', 'subdir', 'run_id', 'conf', 'exec_date'),
-        }, {
-            'func': pool,
-            'help': "CRUD operations on pools",
-            "args": ('pool_set', 'pool_get', 'pool_delete'),
-        }, {
-            'func': variables,
-            'help': "CRUD operations on variables",
-            "args": ('set', 'get', 'json', 'default', 'var_import', 
'var_export', 'var_delete'),
-        }, {
-            'func': kerberos,
-            'help': "Start a kerberos ticket renewer",
-            'args': ('principal', 'keytab', 'pid',
-                     'daemon', 'stdout', 'stderr', 'log_file'),
-        }, {
-            'func': render,
-            'help': "Render a task instance's template(s)",
-            'args': ('dag_id', 'task_id', 'execution_date', 'subdir'),
-        }, {
-            'func': run,
-            'help': "Run a single task instance",
-            'args': (
-                'dag_id', 'task_id', 'execution_date', 'subdir',
-                'mark_success', 'force', 'pool', 'cfg_path',
-                'local', 'raw', 'ignore_all_dependencies', 'ignore_dependencies',
-                'ignore_depends_on_past', 'ship_dag', 'pickle', 'job_id', 'kubernetes_mode'),
-        }, {
-            'func': initdb,
-            'help': "Initialize the metadata database",
-            'args': tuple(),
-        }, {
-            'func': list_dags,
-            'help': "List all the DAGs",
-            'args': ('subdir', 'report'),
-        }, {
-            'func': dag_state,
-            'help': "Get the status of a dag run",
-            'args': ('dag_id', 'execution_date', 'subdir'),
-        }, {
-            'func': task_failed_deps,
-            'help': (
-                "Returns the unmet dependencies for a task instance from the 
perspective "
-                "of the scheduler. In other words, why a task instance doesn't 
get "
-                "scheduled and then queued by the scheduler, and then run by 
an "
-                "executor)."),
-            'args': ('dag_id', 'task_id', 'execution_date', 'subdir'),
-        }, {
-            'func': task_state,
-            'help': "Get the status of a task instance",
-            'args': ('dag_id', 'task_id', 'execution_date', 'subdir'),
-        }, {
-            'func': serve_logs,
-            'help': "Serve logs generate by worker",
-            'args': tuple(),
-        }, {
-            'func': test,
-            'help': (
-                "Test a task instance. This will run a task without checking 
for "
-                "dependencies or recording it's state in the database."),
-            'args': (
-                'dag_id', 'task_id', 'execution_date', 'subdir', 'dry_run',
-                'task_params'),
-        }, {
-            'func': webserver,
-            'help': "Start a Airflow webserver instance",
-            'args': ('port', 'workers', 'workerclass', 'worker_timeout', 'hostname',
-                     'pid', 'daemon', 'stdout', 'stderr', 'access_logfile',
-                     'error_logfile', 'log_file', 'ssl_cert', 'ssl_key', 'debug'),
-        }, {
-            'func': resetdb,
-            'help': "Burn down and rebuild the metadata database",
-            'args': ('yes',),
-        }, {
-            'func': upgradedb,
-            'help': "Upgrade the metadata database to latest version",
-            'args': tuple(),
-        }, {
-            'func': scheduler,
-            'help': "Start a scheduler instance",
-            'args': ('dag_id_opt', 'subdir', 'run_duration', 'num_runs',
-                     'do_pickle', 'pid', 'daemon', 'stdout', 'stderr',
-                     'log_file'),
-        }, {
-            'func': worker,
-            'help': "Start a Celery worker node",
-            'args': ('do_pickle', 'queues', 'concurrency',
-                     'pid', 'daemon', 'stdout', 'stderr', 'log_file'),
-        }, {
-            'func': flower,
-            'help': "Start a Celery Flower",
-            'args': ('flower_hostname', 'flower_port', 'flower_conf', 'broker_api',
-                     'pid', 'daemon', 'stdout', 'stderr', 'log_file'),
-        }, {
-            'func': version,
-            'help': "Show the version",
-            'args': tuple(),
-        }, {
-            'func': connections,
-            'help': "List/Add/Delete connections",
-            'args': ('list_connections', 'add_connection', 'delete_connection',
-                     'conn_id', 'conn_uri', 'conn_extra'),
-        },
-    )
-    subparsers_dict = {sp['func'].__name__: sp for sp in subparsers}
-    dag_subparsers = (
-        'list_tasks', 'backfill', 'test', 'run', 'pause', 'unpause')
-
-    @classmethod
-    def get_parser(cls, dag_parser=False):
-        parser = argparse.ArgumentParser()
-        subparsers = parser.add_subparsers(
-            help='sub-command help', dest='subcommand')
-        subparsers.required = True
-
-        subparser_list = cls.dag_subparsers if dag_parser else cls.subparsers_dict.keys()
-        for sub in subparser_list:
-            sub = cls.subparsers_dict[sub]
-            sp = subparsers.add_parser(sub['func'].__name__, help=sub['help'])
-            for arg in sub['args']:
-                if 'dag_id' in arg and dag_parser:
-                    continue
-                arg = cls.args[arg]
-                kwargs = {
-                    f: getattr(arg, f)
-                    for f in arg._fields if f != 'flags' and getattr(arg, f)}
-                sp.add_argument(*arg.flags, **kwargs)
-            sp.set_defaults(func=sub['func'])
-        return parser
-
-
-def get_parser():
-    return CLIFactory.get_parser()
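
The table-driven argparse pattern removed above (presumably in favor of the equivalent
CLIFactory in airflow/bin/cli.py) is worth seeing on its own: every flag is declared
once as an Arg namedtuple, and each subcommand just lists argument names from the
table. A minimal, self-contained sketch of the pattern -- the greet/name/loud entries
are hypothetical stand-ins, not Airflow's real arguments:

import argparse
from collections import namedtuple

Arg = namedtuple(
    'Arg', ['flags', 'help', 'action', 'default', 'nargs', 'type', 'choices', 'metavar'])
Arg.__new__.__defaults__ = (None,) * 7  # every field after 'flags' is optional

ARGS = {
    'name': Arg(('name',), 'Who to greet'),
    'loud': Arg(('-l', '--loud'), 'Shout the greeting', 'store_true'),
}

SUBCOMMANDS = ({'name': 'greet', 'help': 'Print a greeting', 'args': ('name', 'loud')},)

def get_parser():
    parser = argparse.ArgumentParser()
    subparsers = parser.add_subparsers(help='sub-command help', dest='subcommand')
    subparsers.required = True
    for sub in SUBCOMMANDS:
        sp = subparsers.add_parser(sub['name'], help=sub['help'])
        for arg_name in sub['args']:
            arg = ARGS[arg_name]
            # Forward only the fields that were set; 'is not None' tightens the
            # factory's truthiness test, which would also drop falsy defaults.
            kwargs = {f: getattr(arg, f)
                      for f in arg._fields if f != 'flags' and getattr(arg, f) is not None}
            sp.add_argument(*arg.flags, **kwargs)
    return parser

ns = get_parser().parse_args(['greet', 'world', '--loud'])
print(('HELLO {}!' if ns.loud else 'hello {}').format(ns.name))  # HELLO world!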

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/airflow/contrib/executors/kubernetes_executor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/executors/kubernetes_executor.py b/airflow/contrib/executors/kubernetes_executor.py
index a5aa1e1..49993a8 100644
--- a/airflow/contrib/executors/kubernetes_executor.py
+++ b/airflow/contrib/executors/kubernetes_executor.py
@@ -1,21 +1,22 @@
-# -*- coding: utf-8 -*-
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
 #
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
+#   http://www.apache.org/licenses/LICENSE-2.0
 #
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
 
 import base64
-import os
 import multiprocessing
-import six
 from queue import Queue
 from dateutil import parser
 from uuid import uuid4
@@ -31,13 +32,12 @@ from airflow.models import TaskInstance, KubeResourceVersion
 from airflow.utils.state import State
 from airflow import configuration, settings
 from airflow.exceptions import AirflowConfigException
-from airflow.contrib.kubernetes.pod import Pod, Resources
 from airflow.utils.log.logging_mixin import LoggingMixin
 
 
 class KubernetesExecutorConfig:
-
-    def __init__(self, image=None, request_memory=None, request_cpu=None, limit_memory=None, limit_cpu=None):
+    def __init__(self, image=None, request_memory=None, request_cpu=None,
+                 limit_memory=None, limit_cpu=None):
         self.image = image
         self.request_memory = request_memory
         self.request_cpu = request_cpu
@@ -45,10 +45,16 @@ class KubernetesExecutorConfig:
         self.limit_cpu = limit_cpu
 
     def __repr__(self):
-        return "{}(image={}, request_memory={} ,request_cpu={}, 
limit_memory={}, limit_cpu={})".format(
-            KubernetesExecutorConfig.__name__,
-            self.image, self.request_memory, self.request_cpu, 
self.limit_memory,self.limit_cpu
-        )
+        return "{}(image={}, request_memory={} ,request_cpu={}, 
limit_memory={}, " \
+               "limit_cpu={})"\
+            .format(
+                KubernetesExecutorConfig.__name__,
+                self.image,
+                self.request_memory,
+                self.request_cpu,
+                self.limit_memory,
+                self.limit_cpu
+            )
 
     @staticmethod
     def from_dict(obj):
@@ -56,7 +62,8 @@ class KubernetesExecutorConfig:
             return KubernetesExecutorConfig()
 
         if not isinstance(obj, dict):
-            raise TypeError("Cannot convert a non-dictionary object into a 
KubernetesExecutorConfig")
+            raise TypeError(
+                "Cannot convert a non-dictionary object into a 
KubernetesExecutorConfig")
 
         namespaced = obj.get(Executors.KubernetesExecutor, {})
 
@@ -125,25 +132,33 @@ class KubeConfig:
         # Optionally, the directory in the git repository containing the dags
         self.git_subpath = self.safe_get(self.kubernetes_section, 'git_subpath', '')
 
-        # Optionally a user may supply a `git_user` and `git_password` for private repositories
+        # Optionally a user may supply a `git_user` and `git_password` for private
+        # repositories
         self.git_user = self.safe_get(self.kubernetes_section, 'git_user', None)
         self.git_password = self.safe_get(self.kubernetes_section, 'git_password', None)
 
-        # NOTE: The user may optionally use a volume claim to mount a PV containing DAGs directly
-        self.dags_volume_claim = self.safe_get(self.kubernetes_section, 'dags_volume_claim', None)
+        # NOTE: The user may optionally use a volume claim to mount a PV containing
+        # DAGs directly
+        self.dags_volume_claim = self.safe_get(self.kubernetes_section,
+                                               'dags_volume_claim', None)
 
-        # This prop may optionally be set for PV Claims and is used to locate DAGs on a SubPath
+        # This prop may optionally be set for PV Claims and is used to locate DAGs on a
+        #  SubPath
         self.dags_volume_subpath = self.safe_get(
             self.kubernetes_section, 'dags_volume_subpath', None)
 
-        # The Kubernetes Namespace in which the Scheduler and Webserver reside. Note that if your
+        # The Kubernetes Namespace in which the Scheduler and Webserver reside. Note
+        # that if your
         # cluster has RBAC enabled, your scheduler may need service account permissions to
         # create, watch, get, and delete pods in this namespace.
-        self.kube_namespace = self.safe_get(self.kubernetes_section, 'namespace', 'default')
-        # The Kubernetes Namespace in which pods will be created by the executor. Note that if your
+        self.kube_namespace = self.safe_get(self.kubernetes_section, 'namespace',
+                                            'default')
+        # The Kubernetes Namespace in which pods will be created by the executor. Note
+        # that if your
         # cluster has RBAC enabled, your workers may need service account permissions to
         # interact with cluster components.
-        self.executor_namespace = self.safe_get(self.kubernetes_section, 'namespace', 'default')
+        self.executor_namespace = self.safe_get(self.kubernetes_section, 'namespace',
+                                                'default')
         # Task secrets managed by KubernetesExecutor.
         self.gcp_service_account_keys = self.safe_get(
             self.kubernetes_section, 'gcp_service_account_keys', None)
@@ -162,15 +177,18 @@ class KubeConfig:
         self.git_sync_init_container_name = self.safe_get(
             self.kubernetes_section, 'git_sync_init_container_name', 'git-sync-clone')
 
-        # The worker pod may optionally have a  valid Airflow config loaded via a configmap
-        self.airflow_configmap = self.safe_get(self.kubernetes_section, 'airflow_configmap', None)
+        # The worker pod may optionally have a  valid Airflow config loaded via a
+        # configmap
+        self.airflow_configmap = self.safe_get(self.kubernetes_section,
+                                               'airflow_configmap', None)
 
         self._validate()
 
     def _validate(self):
        if not self.dags_volume_claim and (not self.git_repo or not self.git_branch):
            raise AirflowConfigException(
-                "In kubernetes mode the following must be set in the `kubernetes` config section: "
+                "In kubernetes mode the following must be set in the `kubernetes` "
+                "config section: "
                 "`dags_volume_claim` or `git_repo and git_branch` ")
 
 
@@ -196,7 +214,7 @@ class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin, object):
     def _run(self, kube_client, resource_version):
         self.log.info(
             "Event: and now my watch begins starting at resource_version: {}"
-            .format(resource_version))
+                .format(resource_version))
         watcher = watch.Watch()
 
         kwargs = {"label_selector": "airflow-slave"}
@@ -204,10 +222,12 @@ class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin, object):
             kwargs["resource_version"] = resource_version
 
         last_resource_version = None
-        for event in watcher.stream(kube_client.list_namespaced_pod, self.namespace, **kwargs):
+        for event in watcher.stream(kube_client.list_namespaced_pod, self.namespace,
+                                    **kwargs):
             task = event['object']
             self.log.info(
-                "Event: {} had an event of type {}".format(task.metadata.name, 
event['type']))
+                "Event: {} had an event of type {}".format(task.metadata.name,
+                                                           event['type']))
             self.process_status(
                 task.metadata.name, task.status.phase, task.metadata.labels,
                 task.metadata.resource_version
@@ -224,7 +244,7 @@ class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin, object):
             self.watcher_queue.put((pod_id, State.FAILED, labels, resource_version))
         elif status == 'Succeeded':
             self.log.info("Event: {} Succeeded".format(pod_id))
-            self.watcher_queue.put((pod_id, None, labels, resource_version))
+            self.watcher_queue.put((pod_id, State.SUCCESS, labels, resource_version))
         elif status == 'Running':
             self.log.info("Event: {} is Running".format(pod_id))
         else:
@@ -250,7 +270,8 @@ class AirflowKubernetesScheduler(LoggingMixin, object):
 
     def _make_kube_watcher(self):
        resource_version = KubeResourceVersion.get_current_resource_version(self._session)
-        watcher = KubernetesJobWatcher(self.namespace, self.watcher_queue, resource_version)
+        watcher = KubernetesJobWatcher(self.namespace, self.watcher_queue,
+                                       resource_version)
         watcher.start()
         return watcher
 
@@ -278,7 +299,7 @@ class AirflowKubernetesScheduler(LoggingMixin, object):
         self.log.debug("k8s: launching image 
{}".format(self.kube_config.kube_image))
         pod = self.worker_configuration.make_pod(
            namespace=self.namespace, pod_id=self._create_pod_id(dag_id, task_id),
-            dag_id=dag_id, task_id=task_id, 
+            dag_id=dag_id, task_id=task_id,
            execution_date=self._datetime_to_label_safe_datestring(execution_date),
             airflow_command=command, kube_executor_config=kube_executor_config
         )
@@ -299,7 +320,7 @@ class AirflowKubernetesScheduler(LoggingMixin, object):
         """
 
        The sync function checks the status of all currently running kubernetes jobs.
-        If a job is completed, it's status is placed in the result queue to 
+        If a job is completed, it's status is placed in the result queue to
         be sent back to the scheduler.
 
         :return:
@@ -313,8 +334,8 @@ class AirflowKubernetesScheduler(LoggingMixin, object):
         pod_id, state, labels, resource_version = self.watcher_queue.get()
         self.log.info(
             "Attempting to finish pod; pod_id: {}; state: {}; labels: {}"
-            .format(pod_id, state, labels))
-        key = self._labels_to_key(labels)
+                .format(pod_id, state, labels))
+        key = self._labels_to_key(labels=labels)
         if key:
             self.log.debug("finishing job {} - {} ({})".format(key, state, 
pod_id))
             self.result_queue.put((key, state, pod_id, resource_version))
@@ -322,8 +343,10 @@ class AirflowKubernetesScheduler(LoggingMixin, object):
     @staticmethod
     def _strip_unsafe_kubernetes_special_chars(string):
         """
-        Kubernetes only supports lowercase alphanumeric characters and "-" and "." in the pod name
-        However, there are special rules about how "-" and "." can be used so let's only keep
+        Kubernetes only supports lowercase alphanumeric characters and "-" and "." in
+        the pod name
+        However, there are special rules about how "-" and "." can be used so let's
+        only keep
        alphanumeric chars  see here for detail:
        https://kubernetes.io/docs/concepts/overview/working-with-objects/names/
 
@@ -335,7 +358,8 @@ class AirflowKubernetesScheduler(LoggingMixin, object):
     @staticmethod
     def _make_safe_pod_id(safe_dag_id, safe_task_id, safe_uuid):
         """
-        Kubernetes pod names must be <= 253 chars and must pass the following regex for validation
+        Kubernetes pod names must be <= 253 chars and must pass the following regex for
+        validation
         "^[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*$"
 
         :param safe_dag_id: a dag_id with only alphanumeric characters
@@ -347,37 +371,45 @@ class AirflowKubernetesScheduler(LoggingMixin, object):
 
         safe_key = safe_dag_id + safe_task_id
 
-        safe_pod_id = safe_key[:MAX_POD_ID_LEN-len(safe_uuid)-1] + "-" + safe_uuid
+        safe_pod_id = safe_key[:MAX_POD_ID_LEN - len(safe_uuid) - 1] + "-" + safe_uuid
 
         return safe_pod_id
 
     @staticmethod
     def _create_pod_id(dag_id, task_id):
-        safe_dag_id = AirflowKubernetesScheduler._strip_unsafe_kubernetes_special_chars(dag_id)
-        safe_task_id = AirflowKubernetesScheduler._strip_unsafe_kubernetes_special_chars(task_id)
-        safe_uuid = AirflowKubernetesScheduler._strip_unsafe_kubernetes_special_chars(uuid4().hex)
-        return AirflowKubernetesScheduler._make_safe_pod_id(safe_dag_id, safe_task_id, safe_uuid)
+        safe_dag_id = AirflowKubernetesScheduler._strip_unsafe_kubernetes_special_chars(
+            dag_id)
+        safe_task_id = AirflowKubernetesScheduler._strip_unsafe_kubernetes_special_chars(
+            task_id)
+        safe_uuid = AirflowKubernetesScheduler._strip_unsafe_kubernetes_special_chars(
+            uuid4().hex)
+        return AirflowKubernetesScheduler._make_safe_pod_id(safe_dag_id, safe_task_id,
+                                                            safe_uuid)
 
     @staticmethod
     def _label_safe_datestring_to_datetime(string):
         """
-        Kubernetes doesn't permit ":" in labels. ISO datetime format uses ":" 
but not "_", let's
+        Kubernetes doesn't permit ":" in labels. ISO datetime format uses ":" 
but not
+        "_", let's
         replace ":" with "_"
 
         :param string: string
         :return: datetime.datetime object
         """
-        return parser.parse(string.replace("_", ":"))
+        print("unparsing date!")
+
+        return parser.parse(string.replace('_plus_', '+').replace("_", ":"))
 
     @staticmethod
     def _datetime_to_label_safe_datestring(datetime_obj):
         """
-        Kubernetes doesn't like ":" in labels, since ISO datetime format uses 
":" but not "_" let's
+        Kubernetes doesn't like ":" in labels, since ISO datetime format uses 
":" but
+        not "_" let's
         replace ":" with "_"
         :param datetime_obj: datetime.datetime object
         :return: ISO-like string representing the datetime
         """
-        return datetime_obj.isoformat().replace(":", "_")
+        return datetime_obj.isoformat().replace(":", "_").replace('+', '_plus_')
 
     def _labels_to_key(self, labels):
         try:
@@ -385,15 +417,16 @@ class AirflowKubernetesScheduler(LoggingMixin, object):
                 labels["dag_id"], labels["task_id"],
                 
self._label_safe_datestring_to_datetime(labels["execution_date"]))
         except Exception as e:
-            self.log.warn("Error while converting labels to key; labels: {}; 
exception: {}".format(
-                labels, e
-            ))
+            self.log.warn(
+                "Error while converting labels to key; labels: {}; exception: 
{}".format(
+                    labels, e
+                ))
             return None
 
 
 class KubernetesExecutor(BaseExecutor, LoggingMixin):
     def __init__(self):
-        super(KubernetesExecutor, self).__init__(parallelism=PARALLELISM)
+        self.kube_config = KubeConfig()
         self.task_queue = None
         self._session = None
         self.result_queue = None
@@ -403,32 +436,39 @@ class KubernetesExecutor(BaseExecutor, LoggingMixin):
 
     def clear_not_launched_queued_tasks(self):
         """
-        If the airflow scheduler restarts with pending "Queued" tasks, the tasks may or may not
-        have been launched Thus, on starting up the scheduler let's check every "Queued" task to
+        If the airflow scheduler restarts with pending "Queued" tasks, the tasks may or
+        may not
+        have been launched Thus, on starting up the scheduler let's check every
+        "Queued" task to
        see if it has been launched (ie: if there is a corresponding pod on kubernetes)
 
-        If it has been launched then do nothing, otherwise reset the state to "None" so the task
+        If it has been launched then do nothing, otherwise reset the state to "None" so
+        the task
         will be rescheduled
 
-        This will not be necessary in a future version of airflow in which there is proper support
+        This will not be necessary in a future version of airflow in which there is
+        proper support
         for State.LAUNCHED
         """
         queued_tasks = self._session.query(
             TaskInstance).filter(TaskInstance.state == State.QUEUED).all()
         self.log.info(
-            "When executor started up, found {} queued task 
instances".format(len(queued_tasks)))
+            "When executor started up, found {} queued task instances".format(
+                len(queued_tasks)))
 
         for t in queued_tasks:
            kwargs = dict(label_selector="dag_id={},task_id={},execution_date={}".format(
                 t.dag_id, t.task_id,
-                AirflowKubernetesScheduler._datetime_to_label_safe_datestring(t.execution_date)
+                AirflowKubernetesScheduler._datetime_to_label_safe_datestring(
+                    t.execution_date)
             ))
             pod_list = self.kube_client.list_namespaced_pod(
                 self.kube_config.kube_namespace, **kwargs)
             if len(pod_list.items) == 0:
                 self.log.info(
-                    "TaskInstance: {} found in queued state but was not 
launched, rescheduling"
-                    .format(t))
+                    "TaskInstance: {} found in queued state but was not 
launched, "
+                    "rescheduling"
+                        .format(t))
                 self._session.query(TaskInstance).filter(
                     TaskInstance.dag_id == t.dag_id,
                     TaskInstance.task_id == t.task_id,
@@ -442,17 +482,20 @@ class KubernetesExecutor(BaseExecutor, LoggingMixin):
             try:
                 return self.kube_client.create_namespaced_secret(
                    self.kube_config.executor_namespace, kubernetes.client.V1Secret(
-                        data={'key.json' : base64.b64encode(open(secret_path, 'r').read())},
+                        data={
+                            'key.json': base64.b64encode(open(secret_path, 'r').read())},
                        metadata=kubernetes.client.V1ObjectMeta(name=secret_name)))
             except ApiException as e:
                 if e.status == 409:
                     return self.kube_client.replace_namespaced_secret(
                         secret_name, self.kube_config.executor_namespace,
                         kubernetes.client.V1Secret(
-                            data={'key.json' : base64.b64encode(open(secret_path, 'r').read())},
+                            data={'key.json': base64.b64encode(
+                                open(secret_path, 'r').read())},
                            metadata=kubernetes.client.V1ObjectMeta(name=secret_name)))
                 self.log.exception("Exception while trying to inject secret."
-                                      "Secret name: {}, error details: 
{}.".format(secret_name, e))
+                                   "Secret name: {}, error details: {}."
+                                   .format(secret_name, e))
                 raise
 
         # For each GCP service account key, inject it as a secret in executor
@@ -460,8 +503,8 @@ class KubernetesExecutor(BaseExecutor, LoggingMixin):
         # We let exceptions to pass through to users.
         if self.kube_config.gcp_service_account_keys:
             name_path_pair_list = [
-                {'name' : account_spec.strip().split('=')[0],
-                 'path' : account_spec.strip().split('=')[1]}
+                {'name': account_spec.strip().split('=')[0],
+                 'path': account_spec.strip().split('=')[1]}
                for account_spec in self.kube_config.gcp_service_account_keys.split(',')]
             for service_account in name_path_pair_list:
                _create_or_update_secret(service_account['name'], service_account['path'])
@@ -473,16 +516,17 @@ class KubernetesExecutor(BaseExecutor, LoggingMixin):
         self.result_queue = Queue()
         self.kube_client = get_kube_client()
         self.kube_scheduler = AirflowKubernetesScheduler(
-            self.kube_config, self.task_queue, self.result_queue, self._session, self.kube_client
+            self.kube_config, self.task_queue, self.result_queue, self._session,
+            self.kube_client
         )
         self._inject_secrets()
         self.clear_not_launched_queued_tasks()
 
-
     def execute_async(self, key, command, queue=None, executor_config=None):
-        self.log.info("k8s: adding task {} with command {} with 
executor_config {}".format(
-            key, command, executor_config
-        ))
+        self.log.info(
+            "k8s: adding task {} with command {} with executor_config 
{}".format(
+                key, command, executor_config
+            ))
        kube_executor_config = KubernetesExecutorConfig.from_dict(executor_config)
         self.task_queue.put((key, command, kube_executor_config))
 
@@ -496,7 +540,7 @@ class KubernetesExecutor(BaseExecutor, LoggingMixin):
             results = self.result_queue.get()
             key, state, pod_id, resource_version = results
             last_resource_version = resource_version
-            self.log.info("Changing state of {}".format(results))
+            self.log.info("Changing state of {} to {}".format(results, state))
             self._change_state(key, state, pod_id)
 
         KubeResourceVersion.checkpoint_resource_version(
@@ -510,8 +554,10 @@ class KubernetesExecutor(BaseExecutor, LoggingMixin):
         if state != State.RUNNING:
             self.kube_scheduler.delete_pod(pod_id)
             try:
+                self.log.info("popping: {}".format(str(key)))
                 self.running.pop(key)
-            except KeyError:
+            except KeyError as k:
+                print("{}".format(k))
                 pass
         self.event_buffer[key] = state
         (dag_id, task_id, ex_time) = key
@@ -529,7 +575,3 @@ class KubernetesExecutor(BaseExecutor, LoggingMixin):
     def end(self):
         self.log.info('ending kube executor')
         self.task_queue.join()
-
-    def execute_async(self, key, command, queue=None):
-        self.logger.info("k8s: adding task {} with command {}".format(key, 
command))
-        self.task_queue.put((key, command))
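
Two behavioral fixes in the executor diff above are easy to miss among the re-wraps:
Succeeded pods are now reported as State.SUCCESS rather than None, and
_datetime_to_label_safe_datestring additionally encodes "+" as "_plus_" so that
execution dates with UTC offsets survive the Kubernetes label round-trip. A minimal
standalone sketch of that round-trip (to_label_safe/from_label_safe are illustrative
names, not the executor's):

from datetime import datetime, timedelta, timezone
from dateutil import parser

def to_label_safe(dt):
    # Kubernetes labels permit neither ":" (ISO 8601 time separator) nor
    # "+" (ISO 8601 offset sign), so encode both.
    return dt.isoformat().replace(":", "_").replace("+", "_plus_")

def from_label_safe(s):
    # Decode "+" first so "_plus_" is not shredded by the "_" -> ":" restore.
    return parser.parse(s.replace("_plus_", "+").replace("_", ":"))

dt = datetime(2018, 1, 1, 12, 30, tzinfo=timezone(timedelta(hours=5)))
label = to_label_safe(dt)  # '2018-01-01T12_30_00_plus_05_00'
assert from_label_safe(label) == dt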

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/airflow/contrib/kubernetes/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/__init__.py b/airflow/contrib/kubernetes/__init__.py
index c82f579..13a8339 100644
--- a/airflow/contrib/kubernetes/__init__.py
+++ b/airflow/contrib/kubernetes/__init__.py
@@ -1,14 +1,16 @@
-# -*- coding: utf-8 -*-
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
 #
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
+#   http://www.apache.org/licenses/LICENSE-2.0
 #
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/airflow/contrib/kubernetes/kube_client.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kube_client.py b/airflow/contrib/kubernetes/kube_client.py
index 9603963..d1a63a2 100644
--- a/airflow/contrib/kubernetes/kube_client.py
+++ b/airflow/contrib/kubernetes/kube_client.py
@@ -1,26 +1,31 @@
-# -*- coding: utf-8 -*-
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
 #
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
+#   http://www.apache.org/licenses/LICENSE-2.0
 #
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
 
 
-def get_kube_client(in_cluster=True):
-    # TODO: This should also allow people to point to a cluster.
-
+def _load_kube_config(in_cluster):
     from kubernetes import config, client
-
     if in_cluster:
         config.load_incluster_config()
         return client.CoreV1Api()
     else:
-        NotImplementedError(
-            "Running kubernetes jobs from not within the cluster is not 
supported at this time")
+        config.load_kube_config()
+        return client.CoreV1Api()
+
+
+def get_kube_client(in_cluster=True):
+    # TODO: This should also allow people to point to a cluster.
+    return _load_kube_config(in_cluster)
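
Note that the removed branch built a NotImplementedError without raising it, so
out-of-cluster callers previously got None back; the patched version loads the local
kubeconfig instead. A short usage sketch, assuming the kubernetes Python package and,
for in_cluster=False, a valid ~/.kube/config:

from airflow.contrib.kubernetes.kube_client import get_kube_client

kube = get_kube_client(in_cluster=False)  # a CoreV1Api handle either way
for pod in kube.list_namespaced_pod("default").items:
    print(pod.metadata.name, pod.status.phase)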

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/airflow/contrib/kubernetes/kubernetes_factory.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kubernetes_factory.py b/airflow/contrib/kubernetes/kubernetes_factory.py
deleted file mode 100644
index 715075a..0000000
--- a/airflow/contrib/kubernetes/kubernetes_factory.py
+++ /dev/null
@@ -1,79 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-
-from airflow.contrib.kubernetes.kubernetes_request_factory import 
KubernetesRequestFactory
-
-
-class KubernetesResourceBuilder:
-    def __init__(
-        self,
-        image,
-        cmds,
-        args,
-        namespace,
-        kub_req_factory=None
-    ):
-        # type: (str, list, str, KubernetesRequestFactory) -> KubernetesResourceBuilder
-
-        self.image = image
-        self.args = args
-        self.cmds = cmds
-        self.kub_req_factory = kub_req_factory
-        self.namespace = namespace
-        self.envs = {}
-        self.labels = {}
-        self.secrets = {}
-        self.node_selectors = []
-        self.name = None
-        self.image_pull_policy = None
-
-    def add_env_variables(self, env):
-        self.envs = env
-
-    def add_secret(self, secret):
-        self.secrets = self.secrets + [secret]
-
-    def add_secrets(self, secrets):
-        self.secrets = secrets
-
-    def add_labels(self, labels):
-        self.labels = labels
-
-    def add_name(self, name):
-        self.name = name
-
-    def set_namespace(self, namespace):
-        self.namespace = namespace
-
-    def set_image_pull_policy(self, image_pull_policy):
-        self.image_pull_policy = image_pull_policy
-
-    def launch(self):
-        """
-            Launches the pod synchronously and waits for completion.
-        """
-        k8s_beta = self._kube_client()
-        req = self.kub_req_factory.create(self)
-        self.logger.info(json.dumps(req))
-        resp = k8s_beta.create_namespaced_pod(body=req, namespace=self.namespace)
-        self.logger.info("Job created. status='%s', yaml:\n%s",
-                         str(resp.status), str(req))
-
-    def _kube_client(self):
-        config.load_incluster_config()
-        return client.CoreV1Api()
-
-class KubernetesPodBuilder(KubernetesResourceBuilder):
-    def __init__(self, image, cmds, namespace, kub_req_factory=None):
-        # type: (str, list, str, KubernetesRequestFactory) -> KubernetesPodBuilder
-        KubernetesResourceBuilder.__init__(self, image, cmds, namespace, kub_req_factory)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/airflow/contrib/kubernetes/kubernetes_helper.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kubernetes_helper.py b/airflow/contrib/kubernetes/kubernetes_helper.py
deleted file mode 100644
index cad7917..0000000
--- a/airflow/contrib/kubernetes/kubernetes_helper.py
+++ /dev/null
@@ -1,45 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import yaml
-from kubernetes import client, config
-from kubernetes.client.rest import ApiException
-import kubernetes
-
-
-class KubernetesHelper(object):
-    def __init__(self):
-        config.load_incluster_config()
-        self.job_api = client.BatchV1Api()
-        self.pod_api = client.CoreV1Api()
-
-    def launch_job(self, pod_info, namespace):
-        dep = yaml.load(pod_info)
-        resp = self.job_api.create_namespaced_job(body=dep, namespace=namespace)
-        return resp
-
-    def get_status(self, pod_id, namespace):
-        return self.job_api.read_namespaced_job(pod_id, namespace).status
-
-    def delete_job(self, job_id, namespace):
-        body = client.V1DeleteOptions()
-        self.job_api.delete_namespaced_job(name=job_id, namespace=namespace, body=body)
-
-    def delete_pod(self, pod_id, namespace):
-        body = client.V1DeleteOptions()
-        try:
-            self.pod_api.delete_namespaced_pod(pod_id, namespace, body=body)
-        except ApiException as e:
-            if e.status != 404:
-                raise
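
One idiom from this removed helper worth keeping: pod deletion treats HTTP 404 as
success, since a pod that is already gone is the desired end state. Extracted as a
standalone sketch (delete_pod here is illustrative, not an Airflow API; assumes the
kubernetes package with a client config already loaded):

from kubernetes import client
from kubernetes.client.rest import ApiException

def delete_pod(pod_api, pod_id, namespace):
    try:
        pod_api.delete_namespaced_pod(
            pod_id, namespace, body=client.V1DeleteOptions())
    except ApiException as e:
        if e.status != 404:  # already deleted counts as deleted
            raise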

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/airflow/contrib/kubernetes/kubernetes_job_builder.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kubernetes_job_builder.py b/airflow/contrib/kubernetes/kubernetes_job_builder.py
deleted file mode 100644
index 4171dbc..0000000
--- a/airflow/contrib/kubernetes/kubernetes_job_builder.py
+++ /dev/null
@@ -1,75 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-
-from kubernetes import client, config
-import json
-import logging
-
-
-class KubernetesJobBuilder:
-    def __init__(
-        self,
-        image,
-        cmds,
-        namespace,
-        kub_req_factory=None
-    ):
-        self.image = image
-        self.cmds = cmds
-        self.kub_req_factory = kub_req_factory
-        self.namespace = namespace
-        self.logger = logging.getLogger(self.__class__.__name__)
-        self.envs = {}
-        self.labels = {}
-        self.secrets = {}
-        self.node_selectors = []
-        self.name = None
-
-    def add_env_variables(self, env):
-        self.envs = env
-
-    def add_secrets(self, secrets):
-        self.secrets = secrets
-
-    def add_labels(self, labels):
-        self.labels = labels
-
-    def add_name(self, name):
-        self.name = name
-
-    def set_namespace(self, namespace):
-        self.namespace = namespace
-
-    def launch(self):
-        """
-            Launches the pod synchronously and waits for completion.
-        """
-        k8s_beta = self._kube_client()
-        req = self.kub_req_factory.create(self)
-        print(json.dumps(req))
-        resp = k8s_beta.create_namespaced_job(body=req, namespace=self.namespace)
-        self.logger.info("Job created. status='%s', yaml:\n%s",
-                         str(resp.status), str(req))
-
-    def _execution_finished(self):
-        k8s_beta = self._kube_client()
-        resp = k8s_beta.read_namespaced_job_status(self.name, namespace=self.namespace)
-        self.logger.info('status : ' + str(resp.status))
-        if resp.status.phase == 'Failed':
-            raise Exception("Job " + self.name + " failed!")
-        return resp.status.phase != 'Running'
-
-    @staticmethod
-    def _kube_client():
-        config.load_incluster_config()
-        return client.BatchV1Api()

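[Annotation] One pitfall in the removed builder: _execution_finished checked resp.status.phase, but a batch/v1 JobStatus exposes active/succeeded/failed counters rather than a phase (phase belongs to pod status). A hedged sketch of a completion poll using the Job fields; the name and namespace are placeholders:

import time
from kubernetes import client, config

def wait_for_job(name, namespace="default", poll_seconds=5):
    config.load_incluster_config()
    batch_api = client.BatchV1Api()
    while True:
        status = batch_api.read_namespaced_job(name, namespace).status
        if status.failed:                      # at least one pod of the Job failed
            raise RuntimeError("Job %s failed!" % name)
        if status.succeeded:                   # required completions reached
            return status
        time.sleep(poll_seconds)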
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/airflow/contrib/kubernetes/kubernetes_request_factory/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/__init__.py b/airflow/contrib/kubernetes/kubernetes_request_factory/__init__.py
index 9921696..13a8339 100644
--- a/airflow/contrib/kubernetes/kubernetes_request_factory/__init__.py
+++ b/airflow/contrib/kubernetes/kubernetes_request_factory/__init__.py
@@ -1,12 +1,16 @@
-# -*- coding: utf-8 -*-
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
 #
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
+#   http://www.apache.org/licenses/LICENSE-2.0
 #
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/airflow/contrib/kubernetes/kubernetes_request_factory/job_request_factory.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/job_request_factory.py b/airflow/contrib/kubernetes/kubernetes_request_factory/job_request_factory.py
deleted file mode 100644
index a06b434..0000000
--- a/airflow/contrib/kubernetes/kubernetes_request_factory/job_request_factory.py
+++ /dev/null
@@ -1,57 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-import logging
-import yaml
-from .kubernetes_request_factory import KubernetesRequestFactory, KubernetesRequestFactoryHelper as kreq
-
-
-class SimpleJobRequestFactory(KubernetesRequestFactory):
-    """
-        Request generator for a simple pod.
-    """
-
-    def __init__(self):
-        super(SimpleJobRequestFactory, self).__init__()
-
-    _yaml = """apiVersion: batch/v1
-kind: Job
-metadata:
-  name: name
-spec:
-  template:
-    metadata:
-      name: name
-    spec:
-      containers:
-      - name: base
-        image: airflow-slave:latest
-        command: ["/usr/local/airflow/entrypoint.sh", "/bin/bash sleep 25"]
-      restartPolicy: Never
-    """
-
-    def create_body(self, pod):
-        req = yaml.load(self._yaml)
-        kreq.extract_name(pod, req)
-        kreq.extract_labels(pod, req)
-        kreq.extract_image(pod, req)
-        kreq.extract_cmds(pod, req)
-        kreq.extract_args(pod, req)
-        if len(pod.node_selectors) > 0:
-            kreq.extract_node_selector(pod, req)
-            kreq.extract_secrets(pod, req)
-        logging.info("attaching volume mounts")
-        kreq.attach_volume_mounts(req)
-        return req
-
-    def after_create(self, body, pod):
-        pass

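[Annotation] The factory pattern above boils down to parsing a base manifest once and overwriting the per-task fields. Note that for a Job the containers live under spec.template.spec, one level deeper than the pod-style paths the shared extract_* helpers index. A standalone sketch indexing the Job paths directly; the concrete values are placeholders:

import yaml

BASE_JOB = """
apiVersion: batch/v1
kind: Job
metadata:
  name: name
spec:
  template:
    spec:
      containers:
      - name: base
        image: placeholder
        command: []
      restartPolicy: Never
"""

req = yaml.safe_load(BASE_JOB)
req['metadata']['name'] = 'example-task-run'
container = req['spec']['template']['spec']['containers'][0]
container['image'] = 'airflow-worker:latest'
container['command'] = ['airflow', 'run', 'example_dag', 'example_task',
                        '2017-01-01T00:00:00']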
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
index b5ab074..cbf3fce 100644
--- a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
+++ b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
@@ -1,25 +1,29 @@
-# -*- coding: utf-8 -*-
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
 #
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
+#   http://www.apache.org/licenses/LICENSE-2.0
 #
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
 
 from abc import ABCMeta, abstractmethod
 import six
 
-class KubernetesRequestFactory():
+
+class KubernetesRequestFactory:
     """
-    Create requests to be sent to kube API. Extend this class to talk to kubernetes and generate
-    your specific resources. This is equivalent of generating yaml files that can be used by
-    `kubectl`
+    Create requests to be sent to kube API.
+    Extend this class to talk to kubernetes and generate your specific resources.
+    This is the equivalent of generating yaml files that can be used by `kubectl`
     """
     __metaclass__ = ABCMeta
 
@@ -32,37 +36,23 @@ class KubernetesRequestFactory():
         """
         pass
 
-    @abstractmethod
-    def after_create(self, body, pod):
-        """
-            Is called after the create to augment the body.
-
-            :param body: The request body
-            :param pod:  The pod
-        """
-        pass
-
-
-class KubernetesRequestFactoryHelper(object):
-    """
-    Helper methods to build a request for kubernetes
-    """
-    @staticmethod
-    def extract_image_pull_policy(pod, req):
-        req['spec']['containers'][0]['imagePullPolicy'] = pod.image_pull_policy
-
     @staticmethod
     def extract_image(pod, req):
         req['spec']['containers'][0]['image'] = pod.image
 
     @staticmethod
+    def extract_image_pull_policy(pod, req):
+        if pod.image_pull_policy:
+            req['spec']['containers'][0]['imagePullPolicy'] = pod.image_pull_policy
+
+    @staticmethod
     def add_secret_to_env(env, secret):
         env.append({
-            'name':secret.deploy_target,
-            'valueFrom':{
-                'secretKeyRef':{
-                    'name':secret.secret,
-                    'key':secret.key
+            'name': secret.deploy_target,
+            'valueFrom': {
+                'secretKeyRef': {
+                    'name': secret.secret,
+                    'key': secret.key
                 }
             }
         })
@@ -83,8 +73,12 @@ class KubernetesRequestFactoryHelper(object):
 
     @staticmethod
     def extract_node_selector(pod, req):
-        req['spec']['nodeSelector'] = pod.node_selectors
+        if len(pod.node_selectors) > 0:
+            req['spec']['nodeSelector'] = pod.node_selectors
 
+    @staticmethod
+    def attach_volumes(pod, req):
+        req['spec']['volumes'] = pod.volumes
 
     @staticmethod
     def attach_volume_mounts(pod, req):
@@ -106,14 +100,14 @@ class KubernetesRequestFactoryHelper(object):
         for idx, vol in enumerate(vol_secrets):
             vol_id = 'secretvol' + str(idx)
             req['spec']['containers'][0]['volumeMounts'].append({
-                'mountPath':vol.deploy_target,
-                'name':vol_id,
-                'readOnly':True
+                'mountPath': vol.deploy_target,
+                'name': vol_id,
+                'readOnly': True
             })
             req['spec']['volumes'].append({
-                'name':vol_id,
-                'secret':{
-                    'secretName':vol.secret
+                'name': vol_id,
+                'secret': {
+                    'secretName': vol.secret
                 }
             })
 
@@ -123,7 +117,7 @@ class KubernetesRequestFactoryHelper(object):
         if len(pod.envs) > 0 or len(env_secrets) > 0:
             env = []
             for k in pod.envs.keys():
-                env.append({'name':k, 'value':pod.envs[k]})
+                env.append({'name': k, 'value': pod.envs[k]})
             for secret in env_secrets:
                 KubernetesRequestFactory.add_secret_to_env(env, secret)
             req['spec']['containers'][0]['env'] = env
@@ -138,16 +132,20 @@ class KubernetesRequestFactoryHelper(object):
         if pod.resources.has_requests():
             req['spec']['containers'][0]['resources']['requests'] = {}
             if pod.resources.request_memory:
-                req['spec']['containers'][0]['resources']['requests']['memory'] = pod.resources.request_memory
+                req['spec']['containers'][0]['resources']['requests'][
+                    'memory'] = pod.resources.request_memory
             if pod.resources.request_cpu:
-                req['spec']['containers'][0]['resources']['requests']['cpu'] = pod.resources.request_cpu
+                req['spec']['containers'][0]['resources']['requests'][
+                    'cpu'] = pod.resources.request_cpu
 
         if pod.resources.has_limits():
             req['spec']['containers'][0]['resources']['limits'] = {}
             if pod.resources.request_memory:
-                req['spec']['containers'][0]['resources']['limits']['memory'] = pod.resources.limit_memory
+                req['spec']['containers'][0]['resources']['limits'][
+                    'memory'] = pod.resources.limit_memory
             if pod.resources.request_cpu:
-                req['spec']['containers'][0]['resources']['limits']['cpu'] = pod.resources.limit_cpu
+                req['spec']['containers'][0]['resources']['limits'][
+                    'cpu'] = pod.resources.limit_cpu
 
     @staticmethod
     def extract_init_containers(pod, req):

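[Annotation] For readers less familiar with the kube API shapes: add_secret_to_env emits a standard env-from-secret entry. What one appended element looks like, with placeholder names:

env = []
env.append({
    'name': 'SQL_ALCHEMY_CONN',          # deploy_target: env var seen by the task
    'valueFrom': {
        'secretKeyRef': {
            'name': 'airflow-secrets',   # secret: the Kubernetes Secret object
            'key': 'sql_alchemy_conn'    # key: the entry within that Secret
        }
    }
})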
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py b/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
index ea6b94b..44b05dd 100644
--- a/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
+++ b/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
@@ -1,20 +1,23 @@
-# -*- coding: utf-8 -*-
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
 #
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
+#   http://www.apache.org/licenses/LICENSE-2.0
 #
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
 
 import yaml
-from airflow.contrib.kubernetes.kubernetes_request_factory.kubernetes_request_factory import (
-    KubernetesRequestFactory)
-from airflow.contrib.kubernetes.pod import Pod
+from airflow.contrib.kubernetes.kubernetes_request_factory.kubernetes_request_factory \
+    import KubernetesRequestFactory
 
 
 class SimplePodRequestFactory(KubernetesRequestFactory):
@@ -30,9 +33,6 @@ spec:
     - name: base
       image: airflow-slave:latest
       command: ["/usr/local/airflow/entrypoint.sh", "/bin/bash sleep 25"]
-      volumeMounts:
-        - name: shared-data
-          mountPath: "/usr/local/airflow/dags"
   restartPolicy: Never
     """
 

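[Annotation] With the hard-coded shared-data volumeMount removed from the template, DAG volumes are evidently attached per pod instead, along the lines of the attach_volumes/attach_volume_mounts helpers. A sketch of the resulting request fragments (names and paths are placeholders):

req = {'spec': {'containers': [{}], 'volumes': []}}

req['spec']['volumes'].append({'name': 'airflow-dags'})
req['spec']['containers'][0]['volumeMounts'] = [{
    'name': 'airflow-dags',
    'mountPath': '/usr/local/airflow/dags',
    'readOnly': True
}]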
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/airflow/contrib/kubernetes/pod.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/pod.py b/airflow/contrib/kubernetes/pod.py
index 1877da7..01d6760 100644
--- a/airflow/contrib/kubernetes/pod.py
+++ b/airflow/contrib/kubernetes/pod.py
@@ -1,20 +1,29 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
 #
-# http://www.apache.org/licenses/LICENSE-2.0
+#   http://www.apache.org/licenses/LICENSE-2.0
 #
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
 
 
 class Resources:
-    def __init__(self, request_memory=None, request_cpu=None, limit_memory=None, limit_cpu=None):
+    def __init__(
+            self,
+            request_memory=None,
+            request_cpu=None,
+            limit_memory=None,
+            limit_cpu=None):
         self.request_memory = request_memory
         self.request_cpu = request_cpu
         self.limit_memory = limit_memory
@@ -33,7 +42,6 @@ class Resources:
 class Pod:
     """
     Represents a kubernetes pod and manages execution of a single pod.
-
     :param image: The docker image
     :type image: str
     :param env: A dict containing the environment variables
@@ -46,18 +54,18 @@ class Pod:
                    successful execution of the pod
     :type result: any
     """
-    pod_timeout = 3600
-
     def __init__(
             self,
             image,
             envs,
             cmds,
-            secrets,
+            args=None,
+            secrets=None,
             labels=None,
             node_selectors=None,
             name=None,
-            volumes = [],
+            volumes=None,
+            volume_mounts=None,
             namespace='default',
             result=None,
             image_pull_policy="IfNotPresent",
@@ -67,14 +75,16 @@ class Pod:
             resources=None
     ):
         self.image = image
-        self.envs = envs if envs else {}
+        self.envs = envs or {}
         self.cmds = cmds
-        self.secrets = secrets
+        self.args = args or []
+        self.secrets = secrets or []
         self.result = result
-        self.labels = labels if labels else []
+        self.labels = labels or {}
         self.name = name
-        self.volumes = volumes
-        self.node_selectors = node_selectors if node_selectors else []
+        self.volumes = volumes or []
+        self.volume_mounts = volume_mounts or []
+        self.node_selectors = node_selectors or []
         self.namespace = namespace
         self.image_pull_policy = image_pull_policy
         self.image_pull_secrets = image_pull_secrets

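[Annotation] The signature change from volumes=[] to volumes=None plus `volumes or []` fixes the usual mutable-default pitfall: a default list is created once at function definition and then shared by every call. A minimal demonstration:

def bad(x, acc=[]):          # default list created once, shared across calls
    acc.append(x)
    return acc

def good(x, acc=None):       # fresh list per call
    acc = acc or []
    acc.append(x)
    return acc

print(bad(1), bad(2))        # [1, 2] [1, 2] -- the same list object twice
print(good(1), good(2))      # [1] [2]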
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/airflow/contrib/kubernetes/pod_launcher.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/pod_launcher.py b/airflow/contrib/kubernetes/pod_launcher.py
index 1903060..23d3a7c 100644
--- a/airflow/contrib/kubernetes/pod_launcher.py
+++ b/airflow/contrib/kubernetes/pod_launcher.py
@@ -1,28 +1,31 @@
-# -*- coding: utf-8 -*-
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
 #
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
+#   http://www.apache.org/licenses/LICENSE-2.0
 #
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
 
 import json
-
-from airflow.contrib.kubernetes.pod import Pod
-from airflow.contrib.kubernetes.kubernetes_request_factory.pod_request_factory import (
-    SimplePodRequestFactory)
+import time
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.state import State
+from datetime import datetime as dt
+from airflow.contrib.kubernetes.kubernetes_request_factory import \
+    pod_request_factory as pod_fac
 from kubernetes import watch
-from kubernetes.client import V1Pod
 from kubernetes.client.rest import ApiException
-
+from airflow import AirflowException
+from requests.exceptions import HTTPError
 from .kube_client import get_kube_client
 
 
@@ -34,10 +37,11 @@ class PodStatus(object):
 
 
 class PodLauncher(LoggingMixin):
-    def __init__(self, kube_client=None):
-        self.kube_req_factory = SimplePodRequestFactory()
-        self._client = kube_client or get_kube_client()
+    def __init__(self, kube_client=None, in_cluster=True):
+        super(PodLauncher, self).__init__()
+        self._client = kube_client or get_kube_client(in_cluster=in_cluster)
         self._watch = watch.Watch()
+        self.kube_req_factory = pod_fac.SimplePodRequestFactory()
 
     def run_pod_async(self, pod):
         req = self.kube_req_factory.create(pod)
@@ -50,35 +54,71 @@ class PodLauncher(LoggingMixin):
             raise
         return resp
 
-    def run_pod(self, pod):
+    def run_pod(self, pod, startup_timeout=120, get_logs=True):
         # type: (Pod) -> State
         """
         Launches the pod synchronously and waits for completion.
+        Args:
+            pod (Pod):
+            startup_timeout (int): Timeout for startup of the pod (if pod is pending for
+             too long, considers the task a failure)
         """
         resp = self.run_pod_async(pod)
-        final_status = self._monitor_pod(pod)
+        curr_time = dt.now()
+        if resp.status.start_time is None:
+            while self.pod_not_started(pod):
+                delta = dt.now() - curr_time
+                if delta.seconds >= startup_timeout:
+                    raise AirflowException("Pod took too long to start")
+                time.sleep(1)
+            self.log.debug('Pod not yet started')
+
+        final_status = self._monitor_pod(pod, get_logs)
         return final_status
 
-    def _monitor_pod(self, pod):
+    def _monitor_pod(self, pod, get_logs):
         # type: (Pod) -> State
-        for event in self._watch.stream(self.read_pod(pod), pod.namespace):
-            status = self._task_status(event)
-            if status == State.SUCCESS or status == State.FAILED:
-                return status
+
+        if get_logs:
+            logs = self._client.read_namespaced_pod_log(
+                name=pod.name,
+                namespace=pod.namespace,
+                follow=True,
+                tail_lines=10,
+                _preload_content=False)
+            for line in logs:
+                self.log.info(line)
+        else:
+            while self.pod_is_running(pod):
+                self.log.info("Pod {} has state {}".format(pod.name, 
State.RUNNING))
+                time.sleep(2)
+        return self._task_status(self.read_pod(pod))
 
     def _task_status(self, event):
         # type: (V1Pod) -> State
-        task = event['object']
         self.log.info(
-            "Event: {} had an event of type {}".format(task.metadata.name,
-                                                       event['type']))
-        status = self.process_status(task.metadata.name, task.status.phase)
+            "Event: {} had an event of type {}".format(event.metadata.name,
+                                                       event.status.phase))
+        status = self.process_status(event.metadata.name, event.status.phase)
         return status
 
+    def pod_not_started(self, pod):
+        state = self._task_status(self.read_pod(pod))
+        return state == State.QUEUED
+
+    def pod_is_running(self, pod):
+        state = self._task_status(self.read_pod(pod))
+        return state != State.SUCCESS and state != State.FAILED
+
     def read_pod(self, pod):
-        return self._client.read_namespaced_pod(pod.name, pod.namespace)
+        try:
+            return self._client.read_namespaced_pod(pod.name, pod.namespace)
+        except HTTPError as e:
+            raise AirflowException("There was an error reading the kubernetes API: {}"
+                                   .format(e))
 
     def process_status(self, job_id, status):
+        status = status.lower()
         if status == PodStatus.PENDING:
             return State.QUEUED
         elif status == PodStatus.FAILED:

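[Annotation] The startup-timeout logic added to run_pod reduces to a bounded poll. A standalone sketch; pod_pending is a stand-in for PodLauncher.pod_not_started, and the delta.seconds check mirrors the commit:

import time
from datetime import datetime as dt

def wait_until_started(pod_pending, startup_timeout=120):
    start = dt.now()
    while pod_pending():                       # callable returning True while pending
        if (dt.now() - start).seconds >= startup_timeout:
            raise RuntimeError("Pod took too long to start")
        time.sleep(1)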
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/airflow/contrib/kubernetes/secret.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/secret.py b/airflow/contrib/kubernetes/secret.py
index a798a24..23bfacc 100644
--- a/airflow/contrib/kubernetes/secret.py
+++ b/airflow/contrib/kubernetes/secret.py
@@ -1,25 +1,29 @@
-# -*- coding: utf-8 -*-
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
 #
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
+#   http://www.apache.org/licenses/LICENSE-2.0
 #
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
 
 
 class Secret:
-    """Defines Kubernetes Secret Containers"""
+    """Defines Kubernetes Secret Volume"""
 
     def __init__(self, deploy_type, deploy_target, secret, key):
-        """Initialize a Kubernetes Secret Object. Used to track requested 
secrets from the user.
-
-        :param deploy_type: The type of secret deploy in Kubernetes, either `env` or `volume`
+        """Initialize a Kubernetes Secret Object. Used to track requested 
secrets from
+        the user.
+        :param deploy_type: The type of secret deploy in Kubernetes, either `env` or
+            `volume`
         :type deploy_type: ``str``
         :param deploy_target: The environment variable to be created in the worker.
         :type deploy_target: ``str``

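[Annotation] Usage of the two deploy types, for reference; the secret names and keys below are placeholders, and the import path simply follows this tree:

from airflow.contrib.kubernetes.secret import Secret

# Exposed to the worker as an environment variable
env_secret = Secret(deploy_type='env',
                    deploy_target='SQL_ALCHEMY_CONN',
                    secret='airflow-secrets',
                    key='sql_alchemy_conn')

# Mounted into the worker as a read-only file under deploy_target
vol_secret = Secret('volume', '/etc/sql', 'airflow-secrets', 'sql_alchemy_conn')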
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/airflow/contrib/kubernetes/worker_configuration.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/worker_configuration.py b/airflow/contrib/kubernetes/worker_configuration.py
index f59576a..5cb92ef 100644
--- a/airflow/contrib/kubernetes/worker_configuration.py
+++ b/airflow/contrib/kubernetes/worker_configuration.py
@@ -1,16 +1,19 @@
-# -*- coding: utf-8 -*-
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
 #
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
+#   http://www.apache.org/licenses/LICENSE-2.0
 #
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
 
 import copy
 import os
@@ -34,21 +37,21 @@ class WorkerConfiguration:
 
         # Otherwise, define a git-sync init container
         init_environment = [{
-                'name': 'GIT_SYNC_REPO',
-                'value': self.kube_config.git_repo
-            }, {
-                'name': 'GIT_SYNC_BRANCH',
-                'value': self.kube_config.git_branch
-            }, {
-                'name': 'GIT_SYNC_ROOT',
-                'value': '/tmp'
-            }, {
-                'name': 'GIT_SYNC_DEST',
-                'value': 'dags'
-            }, {
-                'name': 'GIT_SYNC_ONE_TIME',
-                'value': 'true'
-            }]
+            'name': 'GIT_SYNC_REPO',
+            'value': self.kube_config.git_repo
+        }, {
+            'name': 'GIT_SYNC_BRANCH',
+            'value': self.kube_config.git_branch
+        }, {
+            'name': 'GIT_SYNC_ROOT',
+            'value': '/tmp'
+        }, {
+            'name': 'GIT_SYNC_DEST',
+            'value': 'dags'
+        }, {
+            'name': 'GIT_SYNC_ONE_TIME',
+            'value': 'true'
+        }]
         if self.kube_config.git_user:
             init_environment.append({
                 'name': 'GIT_SYNC_USERNAME',
@@ -72,7 +75,8 @@ class WorkerConfiguration:
     def _get_volumes_and_mounts(self):
         """Determine volumes and volume mounts for Airflow workers"""
         dags_volume_name = "airflow-dags"
-        dags_path = os.path.join(self.kube_config.dags_folder, self.kube_config.git_subpath)
+        dags_path = os.path.join(self.kube_config.dags_folder,
+                                 self.kube_config.git_subpath)
         volumes = [{
             'name': dags_volume_name
         }]
@@ -101,7 +105,8 @@ class WorkerConfiguration:
 
         # A PV with the DAGs should be mounted
         if self.kube_config.dags_volume_claim:
-            volumes[0]['persistentVolumeClaim'] = {"claimName": self.kube_config.dags_volume_claim}
+            volumes[0]['persistentVolumeClaim'] = {
+                "claimName": self.kube_config.dags_volume_claim}
             if self.kube_config.dags_volume_subpath:
                 volume_mounts[0]["subPath"] = 
self.kube_config.dags_volume_subpath
         else:
@@ -124,7 +129,8 @@ class WorkerConfiguration:
         worker_secrets = []
         for env_var_name, obj_key_pair in six.iteritems(self.kube_config.kube_secrets):
             k8s_secret_obj, k8s_secret_key = obj_key_pair.split('=')
-            worker_secrets.append(Secret('env', env_var_name, k8s_secret_obj, k8s_secret_key))
+            worker_secrets.append(
+                Secret('env', env_var_name, k8s_secret_obj, k8s_secret_key))
         return worker_secrets
 
     def _get_image_pull_secrets(self):
@@ -133,9 +139,11 @@ class WorkerConfiguration:
             return []
         return self.kube_config.image_pull_secrets.split(',')
 
-    def make_pod(self, namespace, pod_id, dag_id, task_id, execution_date, airflow_command, kube_executor_config):
+    def make_pod(self, namespace, pod_id, dag_id, task_id, execution_date,
+                 airflow_command, kube_executor_config):
         volumes, volume_mounts = self._get_volumes_and_mounts()
-        worker_init_container_spec = self._get_init_containers(copy.deepcopy(volume_mounts))
+        worker_init_container_spec = self._get_init_containers(
+            copy.deepcopy(volume_mounts))
         resources = Resources(
             request_memory=kube_executor_config.request_memory,
             request_cpu=kube_executor_config.request_cpu,

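[Annotation] The git-sync pattern in _get_init_containers, reduced: a fixed env list with credential entries appended only when configured. The values below are placeholders, and GIT_SYNC_PASSWORD is assumed by analogy with the GIT_SYNC_USERNAME branch shown in the diff:

git_user = 'deploy-bot'
git_password = None

init_environment = [
    {'name': 'GIT_SYNC_REPO', 'value': 'https://github.com/example/dags.git'},
    {'name': 'GIT_SYNC_BRANCH', 'value': 'master'},
    {'name': 'GIT_SYNC_ROOT', 'value': '/tmp'},
    {'name': 'GIT_SYNC_DEST', 'value': 'dags'},
    {'name': 'GIT_SYNC_ONE_TIME', 'value': 'true'},
]
if git_user:
    init_environment.append({'name': 'GIT_SYNC_USERNAME', 'value': git_user})
if git_password:
    init_environment.append({'name': 'GIT_SYNC_PASSWORD', 'value': git_password})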
