http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c177d6e8/airflow/bin/cli/cli_factory.py
----------------------------------------------------------------------
diff --git a/airflow/bin/cli/cli_factory.py b/airflow/bin/cli/cli_factory.py
new file mode 100644
index 0000000..46efcf7
--- /dev/null
+++ b/airflow/bin/cli/cli_factory.py
@@ -0,0 +1,491 @@
+from collections import namedtuple
+from cli import *
+from dateutil.parser import parse as parsedate
+
+
+Arg = namedtuple(
+    'Arg', ['flags', 'help', 'action', 'default', 'nargs', 'type', 'choices', 'metavar'])
+Arg.__new__.__defaults__ = (None, None, None, None, None, None, None)
+
+
+class CLIFactory(object):
+    args = {
+        # Shared
+        'dag_id': Arg(("dag_id",), "The id of the dag"),
+        'task_id': Arg(("task_id",), "The id of the task"),
+        'execution_date': Arg(
+            ("execution_date",), help="The execution date of the DAG",
+            type=parsedate),
+        'task_regex': Arg(
+            ("-t", "--task_regex"),
+            "The regex to filter specific task_ids to backfill (optional)"),
+        'subdir': Arg(
+            ("-sd", "--subdir"),
+            "File location or directory from which to look for the dag",
+            default=settings.DAGS_FOLDER),
+        'start_date': Arg(
+            ("-s", "--start_date"), "Override start_date YYYY-MM-DD",
+            type=parsedate),
+        'end_date': Arg(
+            ("-e", "--end_date"), "Override end_date YYYY-MM-DD",
+            type=parsedate),
+        'dry_run': Arg(
+            ("-dr", "--dry_run"), "Perform a dry run", "store_true"),
+        'pid': Arg(
+            ("--pid", ), "PID file location",
+            nargs='?'),
+        'daemon': Arg(
+            ("-D", "--daemon"), "Daemonize instead of running "
+                                "in the foreground",
+            "store_true"),
+        'stderr': Arg(
+            ("--stderr", ), "Redirect stderr to this file"),
+        'stdout': Arg(
+            ("--stdout", ), "Redirect stdout to this file"),
+        'log_file': Arg(
+            ("-l", "--log-file"), "Location of the log file"),
+
+        # backfill
+        'mark_success': Arg(
+            ("-m", "--mark_success"),
+            "Mark jobs as succeeded without running them", "store_true"),
+        'local': Arg(
+            ("-l", "--local"),
+            "Run the task using the LocalExecutor", "store_true"),
+        'donot_pickle': Arg(
+            ("-x", "--donot_pickle"), (
+                "Do not attempt to pickle the DAG object to send over "
+                "to the workers, just tell the workers to run their version "
+                "of the code."),
+            "store_true"),
+        'include_adhoc': Arg(
+            ("-a", "--include_adhoc"),
+            "Include dags with the adhoc parameter.", "store_true"),
+        'bf_ignore_dependencies': Arg(
+            ("-i", "--ignore_dependencies"),
+            (
+                "Skip upstream tasks, run only the tasks "
+                "matching the regexp. Only works in conjunction "
+                "with task_regex"),
+            "store_true"),
+        'bf_ignore_first_depends_on_past': Arg(
+            ("-I", "--ignore_first_depends_on_past"),
+            (
+                "Ignores depends_on_past dependencies for the first "
+                "set of tasks only (subsequent executions in the backfill "
+                "DO respect depends_on_past)."),
+            "store_true"),
+        'pool': Arg(("--pool",), "Resource pool to use"),
+        # list_tasks
+        'tree': Arg(("-t", "--tree"), "Tree view", "store_true"),
+        # list_dags
+        'report': Arg(
+            ("-r", "--report"), "Show DagBag loading report", "store_true"),
+        # clear
+        'upstream': Arg(
+            ("-u", "--upstream"), "Include upstream tasks", "store_true"),
+        'only_failed': Arg(
+            ("-f", "--only_failed"), "Only failed jobs", "store_true"),
+        'only_running': Arg(
+            ("-r", "--only_running"), "Only running jobs", "store_true"),
+        'downstream': Arg(
+            ("-d", "--downstream"), "Include downstream tasks", "store_true"),
+        'no_confirm': Arg(
+            ("-c", "--no_confirm"),
+            "Do not request confirmation", "store_true"),
+        'exclude_subdags': Arg(
+            ("-x", "--exclude_subdags"),
+            "Exclude subdags", "store_true"),
+        # trigger_dag
+        'run_id': Arg(("-r", "--run_id"), "Helps to identify this run"),
+        'conf': Arg(
+            ('-c', '--conf'),
+            "JSON string that gets pickled into the DagRun's conf attribute"),
+        'exec_date': Arg(
+            ("-e", "--exec_date"), help="The execution date of the DAG",
+            type=parsedate),
+        # pool
+        'pool_set': Arg(
+            ("-s", "--set"),
+            nargs=3,
+            metavar=('NAME', 'SLOT_COUNT', 'POOL_DESCRIPTION'),
+            help="Set pool slot count and description, respectively"),
+        'pool_get': Arg(
+            ("-g", "--get"),
+            metavar='NAME',
+            help="Get pool info"),
+        'pool_delete': Arg(
+            ("-x", "--delete"),
+            metavar="NAME",
+            help="Delete a pool"),
+        # variables
+        'set': Arg(
+            ("-s", "--set"),
+            nargs=2,
+            metavar=('KEY', 'VAL'),
+            help="Set a variable"),
+        'get': Arg(
+            ("-g", "--get"),
+            metavar='KEY',
+            help="Get value of a variable"),
+        'default': Arg(
+            ("-d", "--default"),
+            metavar="VAL",
+            default=None,
+            help="Default value returned if variable does not exist"),
+        'json': Arg(
+            ("-j", "--json"),
+            help="Deserialize JSON variable",
+            action="store_true"),
+        'var_import': Arg(
+            ("-i", "--import"),
+            metavar="FILEPATH",
+            help="Import variables from JSON file"),
+        'var_export': Arg(
+            ("-e", "--export"),
+            metavar="FILEPATH",
+            help="Export variables to JSON file"),
+        'var_delete': Arg(
+            ("-x", "--delete"),
+            metavar="KEY",
+            help="Delete a variable"),
+        # kerberos
+        'principal': Arg(
+            ("principal",), "kerberos principal",
+            nargs='?', default=conf.get('kerberos', 'principal')),
+        'keytab': Arg(
+            ("-kt", "--keytab"), "keytab",
+            nargs='?', default=conf.get('kerberos', 'keytab')),
+        # run
+        # TODO(aoen): "force" is a poor choice of name here since it implies it overrides
+        # all dependencies (not just past success), e.g. the ignore_depends_on_past
+        # dependency. This flag should be deprecated and renamed to 'ignore_ti_state' and
+        # the "ignore_all_dependencies" command should be called the "force" command
+        # instead.
+        'force': Arg(
+            ("-f", "--force"),
+            "Ignore previous task instance state, rerun regardless if task 
already "
+            "succeeded/failed",
+            "store_true"),
+        'raw': Arg(("-r", "--raw"), argparse.SUPPRESS, "store_true"),
+        'ignore_all_dependencies': Arg(
+            ("-A", "--ignore_all_dependencies"),
+            "Ignores all non-critical dependencies, including ignore_ti_state 
and "
+            "ignore_task_deps",
+            "store_true"),
+        # TODO(aoen): ignore_dependencies is a poor choice of name here because it is too
+        # vague (e.g. a task being in the appropriate state to be run is also a dependency
+        # but is not ignored by this flag), the name 'ignore_task_dependencies' is
+        # slightly better (as it ignores all dependencies that are specific to the task),
+        # so deprecate the old command name and use this instead.
+        'ignore_dependencies': Arg(
+            ("-i", "--ignore_dependencies"),
+            "Ignore task-specific dependencies, e.g. upstream, 
depends_on_past, and "
+            "retry delay dependencies",
+            "store_true"),
+        'ignore_depends_on_past': Arg(
+            ("-I", "--ignore_depends_on_past"),
+            "Ignore depends_on_past dependencies (but respect "
+            "upstream dependencies)",
+            "store_true"),
+        'ship_dag': Arg(
+            ("--ship_dag",),
+            "Pickles (serializes) the DAG and ships it to the worker",
+            "store_true"),
+        'kubernetes_mode': Arg(
+            ("-km", "--kubernetes_mode"),
+            "Pod will not contact postgres instance. should only be used for 
kubernetes tasks",
+            "store_true"),
+        'pickle': Arg(
+            ("-p", "--pickle"),
+            "Serialized pickle object of the entire dag (used internally)"),
+        'job_id': Arg(("-j", "--job_id"), argparse.SUPPRESS),
+        'cfg_path': Arg(
+            ("--cfg_path", ), "Path to config file to use instead of 
airflow.cfg"),
+        # webserver
+        'port': Arg(
+            ("-p", "--port"),
+            default=conf.get('webserver', 'WEB_SERVER_PORT'),
+            type=int,
+            help="The port on which to run the server"),
+        'ssl_cert': Arg(
+            ("--ssl_cert", ),
+            default=conf.get('webserver', 'WEB_SERVER_SSL_CERT'),
+            help="Path to the SSL certificate for the webserver"),
+        'ssl_key': Arg(
+            ("--ssl_key", ),
+            default=conf.get('webserver', 'WEB_SERVER_SSL_KEY'),
+            help="Path to the key to use with the SSL certificate"),
+        'workers': Arg(
+            ("-w", "--workers"),
+            default=conf.get('webserver', 'WORKERS'),
+            type=int,
+            help="Number of workers to run the webserver on"),
+        'workerclass': Arg(
+            ("-k", "--workerclass"),
+            default=conf.get('webserver', 'WORKER_CLASS'),
+            choices=['sync', 'eventlet', 'gevent', 'tornado'],
+            help="The worker class to use for Gunicorn"),
+        'worker_timeout': Arg(
+            ("-t", "--worker_timeout"),
+            default=conf.get('webserver', 'WEB_SERVER_WORKER_TIMEOUT'),
+            type=int,
+            help="The timeout for waiting on webserver workers"),
+        'hostname': Arg(
+            ("-hn", "--hostname"),
+            default=conf.get('webserver', 'WEB_SERVER_HOST'),
+            help="Set the hostname on which to run the web server"),
+        'debug': Arg(
+            ("-d", "--debug"),
+            "Use the server that ships with Flask in debug mode",
+            "store_true"),
+        'access_logfile': Arg(
+            ("-A", "--access_logfile"),
+            default=conf.get('webserver', 'ACCESS_LOGFILE'),
+            help="The logfile to store the webserver access log. Use '-' to 
print to "
+                 "stderr."),
+        'error_logfile': Arg(
+            ("-E", "--error_logfile"),
+            default=conf.get('webserver', 'ERROR_LOGFILE'),
+            help="The logfile to store the webserver error log. Use '-' to 
print to "
+                 "stderr."),
+        # resetdb
+        'yes': Arg(
+            ("-y", "--yes"),
+            "Do not prompt to confirm reset. Use with care!",
+            "store_true",
+            default=False),
+        # scheduler
+        'dag_id_opt': Arg(("-d", "--dag_id"), help="The id of the dag to run"),
+        'run_duration': Arg(
+            ("-r", "--run-duration"),
+            default=None, type=int,
+            help="Set number of seconds to execute before exiting"),
+        'num_runs': Arg(
+            ("-n", "--num_runs"),
+            default=-1, type=int,
+            help="Set the number of runs to execute before exiting"),
+        # worker
+        'do_pickle': Arg(
+            ("-p", "--do_pickle"),
+            default=False,
+            help=(
+                "Attempt to pickle the DAG object to send over "
+                "to the workers, instead of letting workers run their version "
+                "of the code."),
+            action="store_true"),
+        'queues': Arg(
+            ("-q", "--queues"),
+            help="Comma delimited list of queues to serve",
+            default=conf.get('celery', 'DEFAULT_QUEUE')),
+        'concurrency': Arg(
+            ("-c", "--concurrency"),
+            type=int,
+            help="The number of worker processes",
+            default=conf.get('celery', 'celeryd_concurrency')),
+        # flower
+        'broker_api': Arg(("-a", "--broker_api"), help="Broker api"),
+        'flower_hostname': Arg(
+            ("-hn", "--hostname"),
+            default=conf.get('celery', 'FLOWER_HOST'),
+            help="Set the hostname on which to run the server"),
+        'flower_port': Arg(
+            ("-p", "--port"),
+            default=conf.get('celery', 'FLOWER_PORT'),
+            type=int,
+            help="The port on which to run the server"),
+        'flower_conf': Arg(
+            ("-fc", "--flower_conf"),
+            help="Configuration file for flower"),
+        'task_params': Arg(
+            ("-tp", "--task_params"),
+            help="Sends a JSON params dict to the task"),
+        # connections
+        'list_connections': Arg(
+            ('-l', '--list'),
+            help='List all connections',
+            action='store_true'),
+        'add_connection': Arg(
+            ('-a', '--add'),
+            help='Add a connection',
+            action='store_true'),
+        'delete_connection': Arg(
+            ('-d', '--delete'),
+            help='Delete a connection',
+            action='store_true'),
+        'conn_id': Arg(
+            ('--conn_id',),
+            help='Connection id, required to add/delete a connection',
+            type=str),
+        'conn_uri': Arg(
+            ('--conn_uri',),
+            help='Connection URI, required to add a connection',
+            type=str),
+        'conn_extra': Arg(
+            ('--conn_extra',),
+            help='Connection `Extra` field, optional when adding a connection',
+            type=str),
+    }
+    subparsers = (
+        {
+            'func': backfill,
+            'help': "Run subsections of a DAG for a specified date range",
+            'args': (
+                'dag_id', 'task_regex', 'start_date', 'end_date',
+                'mark_success', 'local', 'donot_pickle', 'include_adhoc',
+                'bf_ignore_dependencies', 'bf_ignore_first_depends_on_past',
+                'subdir', 'pool', 'dry_run')
+        }, {
+            'func': list_tasks,
+            'help': "List the tasks within a DAG",
+            'args': ('dag_id', 'tree', 'subdir'),
+        }, {
+            'func': clear,
+            'help': "Clear a set of task instance, as if they never ran",
+            'args': (
+                'dag_id', 'task_regex', 'start_date', 'end_date', 'subdir',
+                'upstream', 'downstream', 'no_confirm', 'only_failed',
+                'only_running', 'exclude_subdags'),
+        }, {
+            'func': pause,
+            'help': "Pause a DAG",
+            'args': ('dag_id', 'subdir'),
+        }, {
+            'func': unpause,
+            'help': "Resume a paused DAG",
+            'args': ('dag_id', 'subdir'),
+        }, {
+            'func': trigger_dag,
+            'help': "Trigger a DAG run",
+            'args': ('dag_id', 'subdir', 'run_id', 'conf', 'exec_date'),
+        }, {
+            'func': pool,
+            'help': "CRUD operations on pools",
+            "args": ('pool_set', 'pool_get', 'pool_delete'),
+        }, {
+            'func': variables,
+            'help': "CRUD operations on variables",
+            "args": ('set', 'get', 'json', 'default', 'var_import', 
'var_export', 'var_delete'),
+        }, {
+            'func': kerberos,
+            'help': "Start a kerberos ticket renewer",
+            'args': ('principal', 'keytab', 'pid',
+                     'daemon', 'stdout', 'stderr', 'log_file'),
+        }, {
+            'func': render,
+            'help': "Render a task instance's template(s)",
+            'args': ('dag_id', 'task_id', 'execution_date', 'subdir'),
+        }, {
+            'func': run,
+            'help': "Run a single task instance",
+            'args': (
+                'dag_id', 'task_id', 'execution_date', 'subdir',
+                'mark_success', 'force', 'pool', 'cfg_path',
+                'local', 'raw', 'ignore_all_dependencies', 'ignore_dependencies',
+                'ignore_depends_on_past', 'ship_dag', 'pickle', 'job_id', 'kubernetes_mode'),
+        }, {
+            'func': initdb,
+            'help': "Initialize the metadata database",
+            'args': tuple(),
+        }, {
+            'func': list_dags,
+            'help': "List all the DAGs",
+            'args': ('subdir', 'report'),
+        }, {
+            'func': dag_state,
+            'help': "Get the status of a dag run",
+            'args': ('dag_id', 'execution_date', 'subdir'),
+        }, {
+            'func': task_failed_deps,
+            'help': (
+                "Returns the unmet dependencies for a task instance from the 
perspective "
+                "of the scheduler. In other words, why a task instance doesn't 
get "
+                "scheduled and then queued by the scheduler, and then run by 
an "
+                "executor)."),
+            'args': ('dag_id', 'task_id', 'execution_date', 'subdir'),
+        }, {
+            'func': task_state,
+            'help': "Get the status of a task instance",
+            'args': ('dag_id', 'task_id', 'execution_date', 'subdir'),
+        }, {
+            'func': serve_logs,
+            'help': "Serve logs generate by worker",
+            'args': tuple(),
+        }, {
+            'func': test,
+            'help': (
+                "Test a task instance. This will run a task without checking 
for "
+                "dependencies or recording it's state in the database."),
+            'args': (
+                'dag_id', 'task_id', 'execution_date', 'subdir', 'dry_run',
+                'task_params'),
+        }, {
+            'func': webserver,
+            'help': "Start a Airflow webserver instance",
+            'args': ('port', 'workers', 'workerclass', 'worker_timeout', 'hostname',
+                     'pid', 'daemon', 'stdout', 'stderr', 'access_logfile',
+                     'error_logfile', 'log_file', 'ssl_cert', 'ssl_key', 'debug'),
+        }, {
+            'func': resetdb,
+            'help': "Burn down and rebuild the metadata database",
+            'args': ('yes',),
+        }, {
+            'func': upgradedb,
+            'help': "Upgrade the metadata database to latest version",
+            'args': tuple(),
+        }, {
+            'func': scheduler,
+            'help': "Start a scheduler instance",
+            'args': ('dag_id_opt', 'subdir', 'run_duration', 'num_runs',
+                     'do_pickle', 'pid', 'daemon', 'stdout', 'stderr',
+                     'log_file'),
+        }, {
+            'func': worker,
+            'help': "Start a Celery worker node",
+            'args': ('do_pickle', 'queues', 'concurrency',
+                     'pid', 'daemon', 'stdout', 'stderr', 'log_file'),
+        }, {
+            'func': flower,
+            'help': "Start a Celery Flower",
+            'args': ('flower_hostname', 'flower_port', 'flower_conf', 'broker_api',
+                     'pid', 'daemon', 'stdout', 'stderr', 'log_file'),
+        }, {
+            'func': version,
+            'help': "Show the version",
+            'args': tuple(),
+        }, {
+            'func': connections,
+            'help': "List/Add/Delete connections",
+            'args': ('list_connections', 'add_connection', 'delete_connection',
+                     'conn_id', 'conn_uri', 'conn_extra'),
+        },
+    )
+    subparsers_dict = {sp['func'].__name__: sp for sp in subparsers}
+    dag_subparsers = (
+        'list_tasks', 'backfill', 'test', 'run', 'pause', 'unpause')
+
+    @classmethod
+    def get_parser(cls, dag_parser=False):
+        parser = argparse.ArgumentParser()
+        subparsers = parser.add_subparsers(
+            help='sub-command help', dest='subcommand')
+        subparsers.required = True
+
+        subparser_list = cls.dag_subparsers if dag_parser else cls.subparsers_dict.keys()
+        for sub in subparser_list:
+            sub = cls.subparsers_dict[sub]
+            sp = subparsers.add_parser(sub['func'].__name__, help=sub['help'])
+            for arg in sub['args']:
+                if 'dag_id' in arg and dag_parser:
+                    continue
+                arg = cls.args[arg]
+                kwargs = {
+                    f: getattr(arg, f)
+                    for f in arg._fields if f != 'flags' and getattr(arg, f)}
+                sp.add_argument(*arg.flags, **kwargs)
+            sp.set_defaults(func=sub['func'])
+        return parser
+
+
+def get_parser():
+    return CLIFactory.get_parser()

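A note on the design above: CLIFactory centralizes every flag in the args dict and
builds one argparse subparser per entry in subparsers, dispatching through
set_defaults(func=...). A minimal usage sketch (module path and subcommand names
assumed from this diff, not part of the patch):

    from airflow.bin.cli.cli_factory import CLIFactory

    parser = CLIFactory.get_parser()
    # 'list_dags' takes -sd/--subdir and -r/--report per the args table above
    args = parser.parse_args(['list_dags', '--report'])
    args.func(args)  # invokes the list_dags handler registered via set_defaults
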
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c177d6e8/airflow/contrib/executors/kubernetes_executor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/executors/kubernetes_executor.py b/airflow/contrib/executors/kubernetes_executor.py
index 0a3e9f2..8eb2186 100644
--- a/airflow/contrib/executors/kubernetes_executor.py
+++ b/airflow/contrib/executors/kubernetes_executor.py
@@ -17,20 +17,20 @@ import logging
 import time
 import os
 import multiprocessing
-from airflow.contrib.kubernetes.kubernetes_pod_builder import KubernetesPodBuilder
-from airflow.contrib.kubernetes.kubernetes_helper import KubernetesHelper
 from queue import Queue
 from kubernetes import watch
 from airflow import settings
-from airflow.contrib.kubernetes.kubernetes_request_factory import SimplePodRequestFactory
+from airflow.contrib.kubernetes.pod_launcher import PodLauncher
 from airflow.executors.base_executor import BaseExecutor
 from airflow.models import TaskInstance
+from airflow.contrib.kubernetes.pod import Pod
 from airflow.utils.state import State
 from airflow import configuration
-import json
-# TODO this is just for proof of concept. remove before merging.
-
+from kubernetes import client
 
+class KubeConfig:
+    kube_image = configuration.get('core', 'k8s_image')
+    git_repo = configuration.get('core', 'k8s_git_repo')
 
 
 def _prep_command_for_container(command):
@@ -48,6 +48,9 @@ def _prep_command_for_container(command):
     return '"' + '","'.join(command.split(' ')[1:]) + '"'
 
 
+PARALLELISM = configuration.getint('core', 'PARALLELISM')
+
+
 class KubernetesJobWatcher(multiprocessing.Process, object):
     def __init__(self, watch_function, namespace, result_queue, watcher_queue):
         self.logger = logging.getLogger(__name__)
@@ -60,26 +63,26 @@ class KubernetesJobWatcher(multiprocessing.Process, object):
 
     def run(self):
         self.logger.info("Event: and now my watch begins")
-        self.logger.info("Event: proof of image change")
-        self.logger.info("Event: running {} with 
{}".format(str(self._watch_function),
-                                                     self.namespace))
-        for event in self._watch.stream(self._watch_function, self.namespace):
-            task= event['object']
-            self.logger.info("Event: {} had an event of type 
{}".format(task.metadata.name,
-                                                                        
event['type']))
+        for event in self._watch.stream(self._watch_function, self.namespace,
+                                        label_selector='airflow-slave'):
+            task = event['object']
+            self.logger.info(
+                "Event: {} had an event of type {}".format(task.metadata.name,
+                                                           event['type']))
             self.process_status(task.metadata.name, task.status.phase)
 
     def process_status(self, job_id, status):
         if status == 'Pending':
             self.logger.info("Event: {} Pending".format(job_id))
         elif status == 'Failed':
-            self.logger.info("Event: {} Failed".format(job_id))
+            # self.logger.info("Event: {} Failed".format(job_id))
             self.watcher_queue.put((job_id, State.FAILED))
         elif status == 'Succeeded':
-            self.logger.info("Event: {} Succeeded".format(job_id))
+            # self.logger.info("Event: {} Succeeded".format(job_id))
             self.watcher_queue.put((job_id, None))
         elif status == 'Running':
-            self.logger.info("Event: {} is Running".format(job_id))
+            # self.logger.info("Event: {} is Running".format(job_id))
+            self.watcher_queue.put((job_id, State.RUNNING))
         else:
             self.logger.info("Event: Invalid state {} on job 
{}".format(status, job_id))
 
@@ -92,15 +95,19 @@ class AirflowKubernetesScheduler(object):
         self.logger = logging.getLogger(__name__)
         self.logger.info("creating kubernetes executor")
         self.task_queue = task_queue
+        self.pending_jobs = set()
         self.namespace = os.environ['k8s_POD_NAMESPACE']
         self.logger.info("k8s: using namespace {}".format(self.namespace))
         self.result_queue = result_queue
+        self.launcher = PodLauncher()
         self.current_jobs = {}
         self.running = running
         self._task_counter = 0
         self.watcher_queue = multiprocessing.Queue()
-        self.helper = KubernetesHelper()
-        w = KubernetesJobWatcher(self.helper.pod_api.list_namespaced_pod, self.namespace,
+        self.api = client.CoreV1Api()
+
+        watch_function = self.api.read_namespaced_pod
+        w = KubernetesJobWatcher(watch_function, self.namespace,
                                  self.result_queue, self.watcher_queue)
         w.start()
 
@@ -119,29 +126,29 @@ class AirflowKubernetesScheduler(object):
         (key, command) = next_job
         self.logger.info("running for command {}".format(command))
         epoch_time = calendar.timegm(time.gmtime())
-        command_list = ["/usr/local/airflow/entrypoint.sh"] + 
command.split()[1:] + \
-                       ['-km']
-        self._set_host_id(key)
+        cmd_args = "mkdir -p $AIRFLOW_HOME/dags/synched/git && " \
+                   "git clone {} /tmp/tmp_git && " \
+                   "mv /tmp/tmp_git/* $AIRFLOW_HOME/dags/synched/git/ &&" \
+                   "rm -rf /tmp/tmp_git &&" \
+                   "{} -km".format(KubeConfig.git_repo, command)
         pod_id = self._create_job_id_from_key(key=key, epoch_time=epoch_time)
         self.current_jobs[pod_id] = key
 
-        image = configuration.get('core','k8s_image')
-        print("k8s: launching image {}".format(image))
         pod = KubernetesPodBuilder(
-            image= image,
-            cmds=command_list,
+            image=KubeConfig.kube_image,
+            cmds=["bash", "-cx", "--"],
+            args=[cmd_args],
             kub_req_factory=SimplePodRequestFactory(),
-            namespace=self.namespace)
+            namespace=self.namespace
+        )
         pod.add_name(pod_id)
         pod.launch()
         self._task_counter += 1
 
+        # the watcher will monitor pods, so we do not block.
+        self.launcher.run_pod_async(pod)
         self.logger.info("k8s: Job created!")
 
-    def delete_job(self, key):
-        job_id = self.current_jobs[key]
-        self.helper.delete_job(job_id, namespace=self.namespace)
-
     def sync(self):
         """
 
@@ -153,17 +160,21 @@ class AirflowKubernetesScheduler(object):
 
         """
         while not self.watcher_queue.empty():
-            self.end_task()
+            self.process_watcher_task()
 
-    def end_task(self):
+    def process_watcher_task(self):
         job_id, state = self.watcher_queue.get()
-        if job_id in self.current_jobs:
-            key = self.current_jobs[job_id]
-            self.logger.info("finishing job {}".format(key))
-            if state:
-                self.result_queue.put((key, state))
-            self.current_jobs.pop(job_id)
-            self.running.pop(key)
+        if state == State.RUNNING and job_id in self.pending_jobs:
+            self.pending_jobs.remove(job_id)
+        elif job_id in self.current_jobs:
+            self._complete_job(job_id, state)
+
+    def _complete_job(self, job_id, state):
+        key = self.current_jobs[job_id]
+        self.logger.info("finishing job {}".format(key))
+        self.result_queue.put((key, state))
+        self.current_jobs.pop(job_id)
+        self.running.pop(key)
 
     def _create_job_id_from_key(self, key, epoch_time):
         """
@@ -186,23 +197,22 @@ class AirflowKubernetesScheduler(object):
         job_id = unformatted_job_id.replace('_', '-')
         return job_id
 
-    def _set_host_id(self, key):
-        (dag_id, task_id, ex_time) = key
-        session = settings.Session()
-        item = session.query(TaskInstance) \
-            .filter_by(dag_id=dag_id, task_id=task_id, execution_date=ex_time).one()
-
-        host_id = item.hostname
-        print("host is {}".format(host_id))
-
 
 class KubernetesExecutor(BaseExecutor):
+    def __init__(self):
+        super(KubernetesExecutor, self).__init__(parallelism=PARALLELISM)
+        self.task_queue = None
+        self._session = None
+        self.result_queue = None
+        self.pending_tasks = None
+        self.kub_client = None
 
     def start(self):
         self.logger.info('k8s: starting kubernetes executor')
         self.task_queue = Queue()
         self._session = settings.Session()
         self.result_queue = Queue()
+        self.pending_tasks = {}
         self.kub_client = AirflowKubernetesScheduler(self.task_queue,
                                                      self.result_queue,
                                                      running=self.running)
@@ -215,7 +225,7 @@ class KubernetesExecutor(BaseExecutor):
             self.change_state(*results)
 
         # TODO this could be a job_counter based on max jobs a user wants
-        if len(self.kub_client.current_jobs) > 3:
+        if self.job_queue_full() or self.cluster_at_capacity():
             self.logger.info("currently a job is running")
         else:
             self.logger.info("queue ready, running next")
@@ -223,13 +233,20 @@ class KubernetesExecutor(BaseExecutor):
                 (key, command) = self.task_queue.get()
                 self.kub_client.run_next((key, command))
 
+    def job_queue_full(self):
+        return len(self.kub_client.current_jobs) > PARALLELISM
+
+    def cluster_at_capacity(self):
+        return len(self.pending_tasks) > 5
+
     def terminate(self):
         pass
 
     def change_state(self, key, state):
         self.logger.info("k8s: setting state of {} to {}".format(key, state))
         if state != State.RUNNING:
-            self.kub_client.delete_job(key)
+            # self.kub_client.delete_job(key)
+            self.logger.info("current running {}".format(self.running))
             self.running.pop(key)
         self.event_buffer[key] = state
         (dag_id, task_id, ex_time) = key

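The control flow above is a small queue protocol: KubernetesJobWatcher streams pod
events and pushes (job_id, state) tuples onto watcher_queue, and
process_watcher_task drains them, using State.RUNNING only to clear a job from the
pending set and treating anything else as completion. A standalone sketch of that
protocol (plain queues and literal values stand in for the real ones):

    from queue import Queue
    from airflow.utils.state import State

    watcher_queue = Queue()
    pending_jobs = {'pod-1'}
    current_jobs = {'pod-1': ('dag_id', 'task_id', 'exec_date')}

    watcher_queue.put(('pod-1', State.RUNNING))  # pod left Pending
    watcher_queue.put(('pod-1', None))           # pod Succeeded

    while not watcher_queue.empty():
        job_id, state = watcher_queue.get()
        if state == State.RUNNING and job_id in pending_jobs:
            pending_jobs.remove(job_id)
        elif job_id in current_jobs:
            key = current_jobs.pop(job_id)
            print("finishing job {} with state {}".format(key, state))
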
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c177d6e8/airflow/contrib/kubernetes/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/__init__.py b/airflow/contrib/kubernetes/__init__.py
index 59eeddf..c82f579 100644
--- a/airflow/contrib/kubernetes/__init__.py
+++ b/airflow/contrib/kubernetes/__init__.py
@@ -12,6 +12,3 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from airflow import dag_importer
-
-dag_importer.import_dags()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c177d6e8/airflow/contrib/kubernetes/kubernetes_factory.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kubernetes_factory.py b/airflow/contrib/kubernetes/kubernetes_factory.py
new file mode 100644
index 0000000..fc840bb
--- /dev/null
+++ b/airflow/contrib/kubernetes/kubernetes_factory.py
@@ -0,0 +1,63 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+
+from airflow.contrib.kubernetes.kubernetes_request_factory import KubernetesRequestFactory
+import logging
+
+
+class KubernetesResourceBuilder:
+    def __init__(
+        self,
+        image,
+        cmds,
+        args,
+        namespace,
+        kub_req_factory=None
+    ):
+        # type: (str, list, str, KubernetesRequestFactory) -> 
KubernetesResourceBuilder
+
+        self.image = image
+        self.args = args
+        self.cmds = cmds
+        self.kub_req_factory = kub_req_factory
+        self.namespace = namespace
+        self.logger = logging.getLogger(self.__class__.__name__)
+        self.envs = {}
+        self.labels = {}
+        self.secrets = []
+        self.node_selectors = []
+        self.name = None
+
+    def add_env_variables(self, env):
+        self.envs = env
+
+    def add_secret(self, secret):
+        self.secrets = self.secrets + [secret]
+
+    def add_secrets(self, secrets):
+        self.secrets = secrets
+
+    def add_labels(self, labels):
+        self.labels = labels
+
+    def add_name(self, name):
+        self.name = name
+
+    def set_namespace(self, namespace):
+        self.namespace = namespace
+
+
+class KubernetesPodBuilder(KubernetesResourceBuilder):
+    def __init__(self, image, cmds, args, namespace, kub_req_factory=None):
+        # type: (str, list, list, str, KubernetesRequestFactory) -> None
+        KubernetesResourceBuilder.__init__(self, image, cmds, args, namespace, kub_req_factory)

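With the args parameter threaded through to the parent class, the builder matches
the call made by AirflowKubernetesScheduler.run_next above, roughly as follows
(image and command values are illustrative stand-ins for KubeConfig.kube_image and
the generated git-clone wrapper; the pod is then handed to PodLauncher.run_pod_async):

    from airflow.contrib.kubernetes.kubernetes_factory import KubernetesPodBuilder
    from airflow.contrib.kubernetes.kubernetes_request_factory import SimplePodRequestFactory

    pod = KubernetesPodBuilder(
        image='airflow-slave:latest',   # stand-in for KubeConfig.kube_image
        cmds=['bash', '-cx', '--'],
        args=['airflow run my_dag my_task 2017-01-01 -km'],
        namespace='default',
        kub_req_factory=SimplePodRequestFactory())
    pod.add_name('mydag-mytask-1483228800')
    pod.add_labels({'airflow-slave': ''})
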
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c177d6e8/airflow/contrib/kubernetes/kubernetes_job_builder.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kubernetes_job_builder.py b/airflow/contrib/kubernetes/kubernetes_job_builder.py
index 65237ff..4171dbc 100644
--- a/airflow/contrib/kubernetes/kubernetes_job_builder.py
+++ b/airflow/contrib/kubernetes/kubernetes_job_builder.py
@@ -61,10 +61,6 @@ class KubernetesJobBuilder:
         self.logger.info("Job created. status='%s', yaml:\n%s",
                          str(resp.status), str(req))
 
-    def _kube_client(self):
-        config.load_incluster_config()
-        return client.BatchV1Api()
-
     def _execution_finished(self):
         k8s_beta = self._kube_client()
         resp = k8s_beta.read_namespaced_job_status(self.name, namespace=self.namespace)
@@ -72,3 +68,8 @@ class KubernetesJobBuilder:
         if resp.status.phase == 'Failed':
             raise Exception("Job " + self.name + " failed!")
         return resp.status.phase != 'Running'
+
+    @staticmethod
+    def _kube_client():
+        config.load_incluster_config()
+        return client.BatchV1Api()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c177d6e8/airflow/contrib/kubernetes/kubernetes_pod_builder.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kubernetes_pod_builder.py b/airflow/contrib/kubernetes/kubernetes_pod_builder.py
deleted file mode 100644
index 2b0a9e4..0000000
--- a/airflow/contrib/kubernetes/kubernetes_pod_builder.py
+++ /dev/null
@@ -1,74 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-
-from kubernetes import client, config
-import json
-import logging
-
-
-class KubernetesPodBuilder:
-    def __init__(
-        self,
-        image,
-        cmds,
-        namespace,
-        kub_req_factory=None
-    ):
-        self.image = image
-        self.cmds = cmds
-        self.kub_req_factory = kub_req_factory
-        self.namespace = namespace
-        self.logger = logging.getLogger(self.__class__.__name__)
-        self.envs = {}
-        self.labels = {}
-        self.secrets = {}
-        self.node_selectors = []
-        self.name = None
-
-    def add_env_variables(self, env):
-        self.envs = env
-
-    def add_secrets(self, secrets):
-        self.secrets = secrets
-
-    def add_labels(self, labels):
-        self.labels = labels
-
-    def add_name(self, name):
-        self.name = name
-
-    def set_namespace(self, namespace):
-        self.namespace = namespace
-
-    def launch(self):
-        """
-            Launches the pod synchronously and waits for completion.
-        """
-        k8s_beta = self._kube_client()
-        req = self.kub_req_factory.create(self)
-        print(json.dumps(req))
-        resp = k8s_beta.create_namespaced_pod(body=req, namespace=self.namespace)
-        self.logger.info("Job created. status='%s', yaml:\n%s",
-                         str(resp.status), str(req))
-
-    def _kube_client(self):
-        config.load_incluster_config()
-        return client.CoreV1Api()
-
-    def _execution_finished(self):
-        k8s_beta = self._kube_client()
-        resp = k8s_beta.read_namespaced_job_status(self.name, namespace=self.namespace)
-        self.logger.info('status : ' + str(resp.status))
-        if resp.status.phase == 'Failed':
-            raise Exception("Job " + self.name + " failed!")
-        return resp.status.phase != 'Running'

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c177d6e8/airflow/contrib/kubernetes/kubernetes_request_factory/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/__init__.py b/airflow/contrib/kubernetes/kubernetes_request_factory/__init__.py
index 676245c..d2344a2 100644
--- a/airflow/contrib/kubernetes/kubernetes_request_factory/__init__.py
+++ b/airflow/contrib/kubernetes/kubernetes_request_factory/__init__.py
@@ -12,5 +12,4 @@
 # See the License for the specific language governing permissions and
 
 from .kubernetes_request_factory import *
-from .job_request_factory import *
 from .pod_request_factory import *

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c177d6e8/airflow/contrib/kubernetes/kubernetes_request_factory/job_request_factory.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/job_request_factory.py b/airflow/contrib/kubernetes/kubernetes_request_factory/job_request_factory.py
index fda488e..a06b434 100644
--- a/airflow/contrib/kubernetes/kubernetes_request_factory/job_request_factory.py
+++ b/airflow/contrib/kubernetes/kubernetes_request_factory/job_request_factory.py
@@ -10,9 +10,9 @@
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
-
+import logging
 import yaml
-from .kubernetes_request_factory import *
+from .kubernetes_request_factory import KubernetesRequestFactory, KubernetesRequestFactoryHelper as kreq
 
 
 class SimpleJobRequestFactory(KubernetesRequestFactory):
@@ -21,7 +21,7 @@ class SimpleJobRequestFactory(KubernetesRequestFactory):
     """
 
     def __init__(self):
-        pass
+        super(SimpleJobRequestFactory, self).__init__()
 
     _yaml = """apiVersion: batch/v1
 kind: Job
@@ -36,25 +36,22 @@ spec:
       - name: base
         image: airflow-slave:latest
         command: ["/usr/local/airflow/entrypoint.sh", "/bin/bash sleep 25"]
-        volumeMounts:
-          - name: shared-data
-            mountPath: "/usr/local/airflow/dags"
       restartPolicy: Never
     """
 
-    def create(self, pod):
+    def create_body(self, pod):
         req = yaml.load(self._yaml)
-        sub_req = req['spec']['template']
-        extract_name(pod, sub_req)
-        extract_labels(pod, sub_req)
-        extract_image(pod, sub_req)
-        extract_cmds(pod, sub_req)
+        kreq.extract_name(pod, req)
+        kreq.extract_labels(pod, req)
+        kreq.extract_image(pod, req)
+        kreq.extract_cmds(pod, req)
+        kreq.extract_args(pod, req)
         if len(pod.node_selectors) > 0:
-            extract_node_selector(pod, sub_req)
-        extract_secrets(pod, sub_req)
-        print("attaching volume mounts")
-        attach_volume_mounts(sub_req)
+            kreq.extract_node_selector(pod, req)
+        kreq.extract_secrets(pod, req)
+        logging.info("attaching volume mounts")
+        kreq.attach_volume_mounts(pod, req)
         return req
 
-
-
+    def after_create(self, body, pod):
+        pass

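Both request factories follow the same pattern: parse a YAML template into a dict,
then let the extract_* helpers overwrite fields from the Pod object. A
self-contained sketch of that merge (PyYAML assumed; template trimmed from the one
above, field values illustrative):

    import yaml

    template = """
    apiVersion: v1
    kind: Pod
    metadata:
      name: name
    spec:
      containers:
        - name: base
          image: airflow-slave:latest
          command: []
    """
    req = yaml.safe_load(template)
    req['metadata']['name'] = 'example-pod'                    # extract_name
    req['spec']['containers'][0]['image'] = 'my-image:1.0'     # extract_image
    req['spec']['containers'][0]['command'] = ['bash', '-cx']  # extract_cmds
    print(req['spec']['containers'][0]['image'])
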
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c177d6e8/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
index a103fd9..2382561 100644
--- a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
+++ b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
@@ -12,10 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import logging
 from abc import ABCMeta, abstractmethod
-from airflow import dag_importer
-
 
 class KubernetesRequestFactory():
     """
@@ -35,73 +32,75 @@ class KubernetesRequestFactory():
         """
         pass
 
-
-def extract_image(pod, req):
-    req['spec']['containers'][0]['image'] = pod.image
-
-
-def add_secret_to_env(env, secret):
-    env.append({
-        'name': secret.deploy_target,
-        'valueFrom': {
-            'secretKeyRef': {
-                'name': secret.secret,
-                'key': secret.key
-            }
-        }
-    })
-
-
-def extract_labels(pod, req):
-    for k in pod.labels.keys():
-        req['metadata']['labels'][k] = pod.labels[k]
-
-
-def extract_cmds(pod, req):
-    req['spec']['containers'][0]['command'] = pod.cmds
-
-
-def extract_node_selector(pod, req):
-    req['spec']['nodeSelector'] = pod.node_selectors
-
-
-def extract_secrets(pod, req):
-    env_secrets = [s for s in pod.secrets if s.deploy_type == 'env']
-    if len(pod.envs) > 0 or len(env_secrets) > 0:
-        env = []
-        for k in pod.envs.keys():
-            env.append({'name': k, 'value': pod.envs[k]})
-        for secret in env_secrets:
-            add_secret_to_env(env, secret)
-        req['spec']['containers'][0]['env'] = env
-
-
-def attach_volume_mounts(req):
-    logging.info("preparing to import dags")
-    dag_importer.import_dags()
-    logging.info("using file mount {}".format(dag_importer.dag_import_spec))
-    req['spec']['volumes'] = [dag_importer.dag_import_spec]
-
-
-def extract_name(pod, req):
-    req['metadata']['name'] = pod.name
-
-
-def extract_volume_secrets(pod, req):
-    vol_secrets = [s for s in pod.secrets if s.deploy_type == 'volume']
-    if any(vol_secrets):
-        req['spec']['containers'][0]['volumeMounts'] = []
-        req['spec']['volumes'] = []
-    for idx, vol in enumerate(vol_secrets):
-        vol_id = 'secretvol' + str(idx)
-        req['spec']['containers'][0]['volumeMounts'].append({
-            'mountPath': vol.deploy_target,
-            'name': vol_id,
-            'readOnly': True
-        })
-        req['spec']['volumes'].append({
-            'name': vol_id,
-            'secret': {
-                'secretName': vol.secret
+    @staticmethod
+    def extract_image(pod, req):
+        req['spec']['containers'][0]['image'] = pod.image
+
+    @staticmethod
+    def add_secret_to_env(env, secret):
+        env.append({
+            'name': secret.deploy_target,
+            'valueFrom': {
+                'secretKeyRef': {
+                    'name': secret.secret,
+                    'key': secret.key
+                }
             }
         })
+
+    @staticmethod
+    def extract_labels(pod, req):
+        for k in pod.labels.keys():
+            req['metadata']['labels'][k] = pod.labels[k]
+
+    @staticmethod
+    def extract_cmds(pod, req):
+        req['spec']['containers'][0]['command'] = pod.cmds
+
+    @staticmethod
+    def extract_args(pod, req):
+        req['spec']['containers'][0]['args'] = pod.args
+
+    @staticmethod
+    def extract_node_selector(pod, req):
+        req['spec']['nodeSelector'] = pod.node_selectors
+
+    @staticmethod
+    def attach_volume_mounts(pod, req):
+        req['spec']['volumes'] = pod.volumes
+
+    @staticmethod
+    def extract_name(pod, req):
+        req['metadata']['name'] = pod.name
+
+    @staticmethod
+    def extract_volume_secrets(pod, req):
+        vol_secrets = [s for s in pod.secrets if s.deploy_type == 'volume']
+        if any(vol_secrets):
+            req['spec']['containers'][0]['volumeMounts'] = []
+            req['spec']['volumes'] = []
+        for idx, vol in enumerate(vol_secrets):
+            vol_id = 'secretvol' + str(idx)
+            req['spec']['containers'][0]['volumeMounts'].append({
+                'mountPath': vol.deploy_target,
+                'name': vol_id,
+                'readOnly': True
+            })
+            req['spec']['volumes'].append({
+                'name': vol_id,
+                'secret': {
+                    'secretName': vol.secret
+                }
+            })
+
+    @staticmethod
+    def extract_secrets(pod, req):
+        env_secrets = [s for s in pod.secrets if s.deploy_type == 'env']
+        if len(pod.envs) > 0 or len(env_secrets) > 0:
+            env = []
+            for k in pod.envs.keys():
+                env.append({'name': k, 'value': pod.envs[k]})
+            for secret in env_secrets:
+                KubernetesRequestFactory.add_secret_to_env(env, secret)
+            req['spec']['containers'][0]['env'] = env

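The helpers are now namespaced as static methods, so callers mutate a request dict
through the class. A minimal sketch exercising two of them against a stub pod
object (the stub is illustrative; attribute names come from the diff, and the
import path follows the package __init__ above):

    from airflow.contrib.kubernetes.kubernetes_request_factory import KubernetesRequestFactory

    class StubPod(object):
        name = 'example-pod'
        image = 'my-image:1.0'

    req = {'metadata': {'name': None}, 'spec': {'containers': [{}]}}
    KubernetesRequestFactory.extract_name(StubPod, req)
    KubernetesRequestFactory.extract_image(StubPod, req)
    assert req['metadata']['name'] == 'example-pod'
    assert req['spec']['containers'][0]['image'] == 'my-image:1.0'
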
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c177d6e8/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py b/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
index 466972b..4a0cbeb 100644
--- a/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
+++ b/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
@@ -13,6 +13,7 @@
 
 import kubernetes_request_factory as kreq
 import yaml
+from airflow.contrib.kubernetes.pod import Pod
 from airflow import AirflowException
 
 
@@ -39,48 +40,16 @@ spec:
         pass
 
     def create(self, pod):
+        # type: (Pod) -> dict
         req = yaml.load(self._yaml)
         kreq.extract_name(pod, req)
         kreq.extract_labels(pod, req)
         kreq.extract_image(pod, req)
         kreq.extract_cmds(pod, req)
+        kreq.extract_args(pod, req)
         if len(pod.node_selectors) > 0:
-            kreq.extract_node_selector(pod, req)
-        kreq.extract_secrets(pod, req)
-        kreq.extract_volume_secrets(pod, req)
-        kreq.attach_volume_mounts(req)
+            self.extract_node_selector(pod, req)
+        self.extract_secrets(pod, req)
+        self.extract_volume_secrets(pod, req)
+        self.attach_volume_mounts(req=req, pod=pod)
         return req
-
-
-class ReturnValuePodRequestFactory(SimplePodRequestFactory):
-    """
-    Pod request factory with a PreStop hook to upload return value
-    to the system's etcd service.
-    :param kube_com_service_factory: Kubernetes Communication Service factory
-    :type kube_com_service_factory: () => KubernetesCommunicationService
-    """
-
-    def __init__(self, kube_com_service_factory, result_data_file):
-        super(ReturnValuePodRequestFactory, self).__init__()
-        self._kube_com_service_factory = kube_com_service_factory
-        self._result_data_file = result_data_file
-
-    def after_create(self, body, pod):
-        """
-            Augment the pod with hyper-parameterized specific logic
-            Adds a Kubernetes PreStop hook to upload the model training
-            metrics to the Kubernetes communication engine (probably
-            an etcd service running with airflow)
-        """
-        container = body['spec']['containers'][0]
-        pre_stop_hook = self._kube_com_service_factory() \
-            .pod_pre_stop_hook(self._result_data_file, pod.name)
-        # Pre-stop hook only works on containers that are deleted. If the container
-        # naturally exists there would be no pre-stop hook execution. Therefore we
-        # simulate the hook by wrapping the exe command inside a script
-        if "'" in ' '.join(container['command']):
-            raise AirflowException('Please do not include single quote '
-                                   'in your command for hyperparameterized pods')
-        cmd = ' '.join(["'" + c + "'" if " " in c else c for c in 
container['command']])
-        container['command'] = ['/bin/bash', '-c', "({}) ; ({})"
-            .format(cmd, pre_stop_hook)]

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c177d6e8/airflow/contrib/kubernetes/pod.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/pod.py b/airflow/contrib/kubernetes/pod.py
index c38783c..0200afa 100644
--- a/airflow/contrib/kubernetes/pod.py
+++ b/airflow/contrib/kubernetes/pod.py
@@ -12,12 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from kubernetes import client, config
-from kubernetes_request_factory import KubernetesRequestFactory, SimplePodRequestFactory
 import logging
-from airflow import AirflowException
-import time
-import json
 
 class Pod:
     """
@@ -43,49 +38,21 @@ class Pod:
             envs,
             cmds,
             secrets,
-            labels,
-            node_selectors,
-            kube_req_factory,
-            name,
+            labels=None,
+            node_selectors=None,
+            name=None,
+            volumes=None,
             namespace='default',
             result=None):
         self.image = image
-        self.envs = envs
+        self.envs = envs if envs else {}
         self.cmds = cmds
         self.secrets = secrets
         self.result = result
-        self.labels = labels
+        self.labels = labels if labels else {}
         self.name = name
-        self.node_selectors = node_selectors
-        self.kube_req_factory = (kube_req_factory or SimplePodRequestFactory)()
+        self.volumes = volumes if volumes else []
+        self.node_selectors = node_selectors if node_selectors else []
         self.namespace = namespace
         self.logger = logging.getLogger(self.__class__.__name__)
-        if not isinstance(self.kube_req_factory, KubernetesRequestFactory):
-            raise AirflowException('`kube_req_factory`'
-                                   '  should implement KubernetesRequestFactory')
 
-    def launch(self):
-        """
-            Launches the pod synchronously and waits for completion.
-        """
-        k8s_beta = self._kube_client()
-        req = self.kube_req_factory.create(self)
-        print(json.dumps(req))
-        resp = k8s_beta.create_namespaced_job(body=req, namespace=self.namespace)
-        self.logger.info("Job created. status='%s', yaml:\n%s"
-                         % (str(resp.status), str(req)))
-        while not self._execution_finished():
-            time.sleep(10)
-        return self.result
-
-    def _kube_client(self):
-        config.load_incluster_config()
-        return client.BatchV1Api()
-
-    def _execution_finished(self):
-        k8s_beta = self._kube_client()
-        resp = k8s_beta.read_namespaced_job_status(self.name, namespace=self.namespace)
-        self.logger.info('status : ' + str(resp.status))
-        if resp.status.phase == 'Failed':
-            raise Exception("Job " + self.name + " failed!")
-        return resp.status.phase != 'Running'

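After this slimming, Pod is a plain value object consumed by the request factories
and PodLauncher rather than launching itself. A construction sketch using the new
defaulted signature (all values illustrative):

    from airflow.contrib.kubernetes.pod import Pod

    pod = Pod(
        image='my-image:1.0',
        envs={'AIRFLOW_HOME': '/usr/local/airflow'},
        cmds=['bash', '-cx', '--'],
        secrets=[],    # env- or volume-type secrets read by the factories
        name='example-pod',
        namespace='default')
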
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c177d6e8/airflow/contrib/kubernetes/pod_launcher.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/pod_launcher.py b/airflow/contrib/kubernetes/pod_launcher.py
index a774d79..e92ae5c 100644
--- a/airflow/contrib/kubernetes/pod_launcher.py
+++ b/airflow/contrib/kubernetes/pod_launcher.py
@@ -11,135 +11,72 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-import base64
+from airflow.contrib.kubernetes.pod import Pod
+from airflow.contrib.kubernetes.kubernetes_request_factory import SimplePodRequestFactory
+from kubernetes import config, client, watch
+from kubernetes.client import V1Pod
+from airflow.utils.state import State
 import json
 import logging
-import time
-import urllib2
-
-from kubernetes import client, config
-
-from kubernetes_request_factory import KubernetesRequestFactory
-from pod import Pod
-
-
-def kube_client():
-    config.load_incluster_config()
-    return client.CoreV1Api()
 
 
-def incluster_namespace():
-    """
-    :return: The incluster namespace.
-    """
-    config.load_incluster_config()
-    k8s_configuration = config.incluster_config.configuration
-    encoded_namespace = k8s_configuration.api_key['authorization'].split(' ')[-1]
-    api_key = str(base64.b64decode(encoded_namespace))
-    key_with_namespace = [k for k in api_key.split(',') if 'namespace' in k][0]
-    unformatted_namespace = key_with_namespace.split(':')[-1]
-    return unformatted_namespace.replace('"', '')
+class PodLauncher:
+    def __init__(self):
+        self.kube_req_factory = SimplePodRequestFactory()
+        self._client = self._kube_client()
+        self._watch = watch.Watch()
+        self.logger = logging.getLogger(__name__)
 
+    def run_pod_async(self, pod):
+        req = self.kube_req_factory.create(pod)
+        self.logger.info(json.dumps(req))
+        resp = self._client.create_namespaced_pod(body=req, namespace=pod.namespace)
+        return resp
 
-class KubernetesLauncher:
-    """
-    This class is responsible for launching objects to Kubernetes.
-    Extend this class to launch exotic objects.
-    Before trying to extend this method check if augmenting the request factory
-    is enough for your use-case
-    :param kube_object: A pod or anything that represents a Kubernetes object
-    :type kube_object: Pod
-    :param request_factory: A factory method to create kubernetes requests.
-    """
-
-    pod_timeout = 3600
-
-    def __init__(self, kube_object, request_factory):
-        if not isinstance(kube_object, Pod):
-            raise Exception('`kube_object` must inherit from Pod')
-        if not isinstance(request_factory, KubernetesRequestFactory):
-            raise Exception('`request_factory` must inherit from '
-                            'KubernetesRequestFactory')
-        self.pod = kube_object
-        self.request_factory = request_factory
-
-    def launch(self):
+    def run_pod(self, pod):
+        # type: (Pod) -> State
         """
             Launches the pod synchronously and waits for completion.
-            No return value from execution. Will raise an exception if things failed
         """
-        k8s_beta = kube_client()
-        req = self.request_factory.create(self)
-        logging.info(json.dumps(req))
-        resp = k8s_beta.create_namespaced_pod(body=req, namespace=self.pod.namespace)
-        logging.info("Job created. status='%s', yaml:\n%s"
-                     % (str(resp.status), str(req)))
-        for i in range(1, self.pod_timeout):
-            time.sleep(10)
-            logging.info('Waiting for success')
-            if self._execution_finished():
-                logging.info('Job finished!')
-                return
-        raise Exception("Job timed out!")
-
-    def _execution_finished(self):
-        k8s_beta = kube_client()
-        resp = k8s_beta.read_namespaced_pod_status(
-            self.pod.name,
-            namespace=self.pod.namespace)
-        logging.info('status : ' + str(resp.status))
-        logging.info('phase : i' + str(resp.status.phase))
-        if resp.status.phase == 'Failed':
-            raise Exception("Job " + self.pod.name + " failed!")
-        return resp.status.phase != 'Running'
-
-
-class KubernetesCommunicationService:
-    """
-    A service that manages communications between pods in Kubernetes and the airflow dagrun.
-    Note that the etcd service runs side by side with airflow on the same machine
-    using kubernetes magic, so on the airflow side we use localhost, and on the remote side
-    we use the provided etcd host.
-    """
-
-    def __init__(self, etcd_host, etcd_port):
-        self.etcd_host = etcd_host
-        self.etcd_port = etcd_port
-        self.url = 'http://localhost:{}'.format(self.etcd_port)
-
-    def pod_pre_stop_hook(self, return_data_file, task_id):
-        return 'echo value=$(cat %s) | curl -d "@-" -X PUT %s:%s/v2/keys/pod_metrics/%s' \
-               % (
-                   return_data_file, self.etcd_host, self.etcd_port, task_id)
-
-    def pod_return_data(self, task_id):
-        """
-            Returns the pod's return data. The pod_pre_stop_hook is responsible for uploading
-            the return data to etcd.
-
-            If the return_data_file is generated by the application, the pre stop hook
-            will upload it to etcd and we will download it back to airflow.
-        """
-        logging.info('querying {} for task id {}'.format(self.url, task_id))
-        try:
-            result = urllib2.urlopen(self.url + '/v2/keys/pod_metrics/' + task_id).read()
-            logging.info('result for querying {} for task id {}: {}'
-                         .format(self.url, task_id, result))
-            result = json.loads(result)['node']['value']
-            return result
-        except urllib2.HTTPError as err:
-            if err.code == 404:
-                return None  # Data not found
-            raise
-
-    @staticmethod
-    def from_dag_default_args(dag):
-        (etcd_host, etcd_port) = dag.default_args.get('etcd_endpoint', ':').split(':')
-        logging.info('Setting etcd endpoint from dag default args {}:{}'
-                     .format(etcd_host, etcd_port))
-        if not etcd_host:
-            raise Exception('`KubernetesCommunicationService` '
-                            'requires etcd endpoint. Please define it in dag '
-                            'default_args')
-        return KubernetesCommunicationService(etcd_host, etcd_port)
+        resp = self.run_pod_async(pod)
+        final_status = self._monitor_pod(pod)
+        return final_status
+
+    def _kube_client(self):
+        # TODO: This should also allow people to point to a cluster.
+        config.load_incluster_config()
+        return client.CoreV1Api()
+
+    def _monitor_pod(self, pod):
+        # type: (Pod) -> State
+        # stream() expects a callable plus its arguments, not an already
+        # fetched pod object, so hand it the read function itself.
+        for event in self._watch.stream(self._client.read_namespaced_pod,
+                                        pod.name, pod.namespace):
+            status = self._task_status(event)
+            if status == State.SUCCESS or status == State.FAILED:
+                return status
+
+    def _task_status(self, event):
+        # type: (V1Pod) -> State
+        task = event['object']
+        self.logger.info(
+            "Event: {} had an event of type {}".format(task.metadata.name,
+                                                       event['type']))
+        status = self.process_status(task.metadata.name, task.status.phase)
+        return status
+
+    def read_pod(self, pod):
+        return self._client.read_namespaced_pod(pod.name, pod.namespace)
+
+    def process_status(self, job_id, status):
+        if status == 'Pending':
+            return State.QUEUED
+        elif status == 'Failed':
+            self.logger.info("Event: {} Failed".format(job_id))
+            return State.FAILED
+        elif status == 'Succeeded':
+            self.logger.info("Event: {} Succeeded".format(job_id))
+            return State.SUCCESS
+        elif status == 'Running':
+            return State.RUNNING
+        else:
+            self.logger.info("Event: Invalid state {} on job 
{}".format(status, job_id))
+            return State.FAILED
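
For orientation, a minimal sketch of driving the new PodLauncher directly,
assuming the Pod constructor accepts image and cmds as in the example DAG
further down (names here are illustrative, not part of the commit):

    from airflow.contrib.kubernetes.pod import Pod
    from airflow.contrib.kubernetes.pod_launcher import PodLauncher
    from airflow.utils.state import State

    # Hypothetical pod; PodOperator normally sets name/labels from the task
    pod = Pod(image='ubuntu:latest',
              cmds=['/bin/bash', '-c', 'echo hello'])
    pod.name = 'smoke-test'
    pod.namespace = 'default'

    launcher = PodLauncher()              # loads the in-cluster kube config
    final_state = launcher.run_pod(pod)   # blocks until Succeeded or Failed
    if final_state == State.FAILED:
        raise RuntimeError('pod %s failed' % pod.name)

Note this only works from inside a cluster, since _kube_client() calls
config.load_incluster_config().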

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c177d6e8/airflow/contrib/operators/k8s_pod_operator/k8s_pod_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/k8s_pod_operator/k8s_pod_operator.py b/airflow/contrib/operators/k8s_pod_operator/k8s_pod_operator.py
index bf7b048..6af66ea 100644
--- a/airflow/contrib/operators/k8s_pod_operator/k8s_pod_operator.py
+++ b/airflow/contrib/operators/k8s_pod_operator/k8s_pod_operator.py
@@ -18,74 +18,61 @@ from airflow.exceptions import AirflowException
 from airflow.operators.python_operator import PythonOperator
 from airflow.utils.decorators import apply_defaults
 from airflow.contrib.kubernetes.pod_launcher import KubernetesLauncher, \
-    KubernetesCommunicationService
+    KubernetesCommunicationService, incluster_namespace
 from airflow.contrib.kubernetes.kubernetes_request_factory import \
     SimplePodRequestFactory, \
     ReturnValuePodRequestFactory
-from .op_context import OpContext
 
 
 class PodOperator(PythonOperator):
     """
         Executes a pod and waits for the job to finish.
+
         :param dag_run_id: The unique run ID that would be attached to the pod as a label
         :type dag_run_id: str
         :param pod_factory: Reference to the function that creates the pod with format:
                             function (OpContext) => Pod
         :type pod_factory: callable
-        :param cache_output: If set to true, the output of the pod would be saved in a
-                            cache object using md5 hash of all the pod parameters
-                            and in case of success, the cached results will be returned
-                            on consecutive calls. Only use this
     """
     # template_fields = tuple('dag_run_id')
     ui_color = '#8da7be'
 
     @apply_defaults
     def __init__(
-        self,
-        dag_run_id,
-        pod_factory,
-        cache_output,
-        kube_request_factory=None,
-        *args,
-        **kwargs
-    ):
-        super(PodOperator, self).__init__(
-            python_callable=lambda _: 1,
-            provide_context=True,
-            *args,
-            **kwargs)
+            self,
+            dag_run_id,
+            pod_factory,
+            kube_request_factory=None,
+            *args, **kwargs):
+        super(PodOperator, self).__init__(
+            python_callable=lambda _: 1, provide_context=True, *args, **kwargs)
         self.logger = logging.getLogger(self.__class__.__name__)
         if not callable(pod_factory):
             raise AirflowException('`pod_factory` param must be callable')
         self.dag_run_id = dag_run_id
         self.pod_factory = pod_factory
-        self._cache_output = cache_output
-        self.op_context = OpContext(self.task_id)
         self.kwargs = kwargs
+        self._kube_request_factory = kube_request_factory or SimplePodRequestFactory
 
     def execute(self, context):
-        task_instance = context.get('task_instance')
-        if task_instance is None:
-            raise AirflowException('`task_instance` is empty! This should not happen')
-        self.op_context.set_xcom_instance(task_instance)
-        pod = self.pod_factory(self.op_context, context)
+        pod = self.get_pod_object(context)
+
         # Customize the pod
         pod.name = self.task_id
         pod.labels['run_id'] = self.dag_run_id
-        pod.namespace = self.dag.default_args.get('namespace', pod.namespace)
+        try:
+            pod.namespace = (self.dag.default_args.get('namespace', pod.namespace)
+                             or incluster_namespace())
+        except Exception:
+            # Fall back to the pod's default namespace
+            pass
 
         # Launch the pod and wait for it to finish
         KubernetesLauncher(pod, self._kube_request_factory).launch()
-        self.op_context.result = pod.result
+        result = pod.result
+        context['ti'].xcom_push(key='result', value=result)
 
-        # Cache the output
         custom_return_value = self.on_pod_success(context)
-        if custom_return_value:
-            self.op_context.custom_return_value = custom_return_value
-        return self.op_context.result
+        self.set_custom_return_value(context, custom_return_value)
+        return result
 
     def on_pod_success(self, context):
         """
@@ -95,32 +82,39 @@ class PodOperator(PythonOperator):
         """
         pass
 
+    def get_pod_object(self, context):
+        """
+            Returns a pod object. Override this method to define custom objects
+        :param context: The task context
+        :return: The pod object
+        """
+        return self.pod_factory(context)
+
+    def set_custom_return_value(self, context, custom_return_value):
+        if custom_return_value:
+            context['ti'].xcom_push(key='custom_result',
+                                    value=custom_return_value)
+
 
 class ReturnValuePodOperator(PodOperator):
     """
     This pod operator is a normal pod operator with the addition of
     reading a custom return value back from kubernetes.
     """
-
     def __init__(self,
-                 kube_com_service_factory,
                  result_data_file,
+                 kube_com_service_factory=None,
                  *args, **kwargs):
         super(ReturnValuePodOperator, self).__init__(*args, **kwargs)
+        kube_com_service_factory = kube_com_service_factory or (
+            lambda: KubernetesCommunicationService.from_dag_default_args(self.dag))
+        if not isinstance(kube_com_service_factory(),
+                          KubernetesCommunicationService):
-            raise AirflowException(
-                '`kube_com_service_factory` must be of type '
-                'KubernetesCommunicationService')
+            raise AirflowException(
+                '`kube_com_service_factory` must be of type '
+                'KubernetesCommunicationService')
         self._kube_com_service_factory = kube_com_service_factory
         self._result_data_file = result_data_file
-        self._kube_request_factory = self._return_value_kube_request  # Overwrite the
-        # default request factory
+        self._kube_request_factory = self._return_value_kube_request  # Overwrite the default request factory
 
     def on_pod_success(self, context):
-        return_val = self._kube_com_service_factory().pod_return_data(self.task_id)
-        self.op_context.result = return_val  # We also overwrite the results
-        return return_val
+        return self._kube_com_service_factory().pod_return_data(self.task_id)
 
     def _return_value_kube_request(self):
-        return ReturnValuePodRequestFactory(self._kube_com_service_factory,
-                                            self._result_data_file)
+        return ReturnValuePodRequestFactory(self._kube_com_service_factory, self._result_data_file)
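
With OpContext gone, downstream tasks read a pod's output through ordinary
XCom pulls. A sketch against the keys pushed above ('result' in execute(),
'custom_result' in set_custom_return_value()); the task id is illustrative:

    def read_upstream_pod_output(**context):
        ti = context['ti']
        # pod.result as pushed by PodOperator.execute()
        result = ti.xcom_pull(task_ids='hello-kube-step1', key='result')
        # value returned by on_pod_success(), if any
        custom = ti.xcom_pull(task_ids='hello-kube-step1', key='custom_result')
        return custom if custom is not None else result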

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c177d6e8/airflow/contrib/operators/k8s_pod_operator/op_context.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/k8s_pod_operator/op_context.py b/airflow/contrib/operators/k8s_pod_operator/op_context.py
deleted file mode 100644
index 55a3b00..0000000
--- a/airflow/contrib/operators/k8s_pod_operator/op_context.py
+++ /dev/null
@@ -1,104 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-
-from airflow import AirflowException
-import logging
-
-
-class OpContext(object):
-    """
-        Data model for operation context of a pod operator with hyper parameters.
-        OpContext is able to communicate the context between PodOperators by
-        encapsulating XCom communication
-        Note: do not directly modify the upstreams
-        Also note: xcom_instance MUST be set before any attribute of this class can be
-        read.
-        :param: task_id             The task ID
-    """
-    _supported_attributes = {'hyper_parameters', 'custom_return_value'}
-
-    def __init__(self, task_id):
-        self.task_id = task_id
-        self._upstream = []
-        self._result = '__not_set__'
-        self._data = {}
-        self._xcom_instance = None
-        self._parent = None
-
-    def __str__(self):
-        return 'upstream: [' + \
-               ','.join([u.task_id for u in self._upstream]) + ']\n' + \
-               'params:' + ','.join(
-            [k + '=' + str(self._data[k]) for k in self._data.keys()])
-
-    def __setattr__(self, name, value):
-        if name in self._data:
-            raise AirflowException('`{}` is already set'.format(name))
-        if name not in self._supported_attributes:
-            logging.warn(
-                '`{}` is not in the supported attribute list for OpContext'.format(name))
-        self.get_xcom_instance().xcom_push(key=name, value=value)
-        self._data[name] = value
-
-    def __getattr__(self, item):
-        if item not in self._supported_attributes:
-            logging.warn(
-                '`{}` is not in the supported attribute list for OpContext'.format(item))
-        if item not in self._data:
-            self._data[item] = self.get_xcom_instance().xcom_pull(key=item,
-                                                                  task_ids=self.task_id)
-        return self._data[item]
-
-    @property
-    def result(self):
-        if self._result == '__not_set__':
-            self._result = self.get_xcom_instance().xcom_pull(task_ids=self.task_id)
-        return self._result
-
-    @result.setter
-    def result(self, value):
-        if self._result != '__not_set__':
-            raise AirflowException('`result` is already set')
-        self._result = value
-
-    @property
-    def upstream(self):
-        return self._upstream
-
-    def append_upstream(self, upstream_op_contexes):
-        """
-        Appends a list of op_contexts to the upstream. It will create new instances and
-        set the task_id.
-        All the upstream op_contexts will share the same xcom_instance with this
-        op_context
-        :param upstream_op_contexes: List of upstream op_contexts
-        """
-        for up in upstream_op_contexes:
-            op_context = OpContext(up.tak_id)
-            op_context._parent = self
-            self._upstream.append(op_context)
-
-    def set_xcom_instance(self, xcom_instance):
-        """
-        Sets the xcom_instance for this op_context and upstreams
-        :param xcom_instance: The Airflow TaskInstance for communication through XCom
-        :type xcom_instance: airflow.models.TaskInstance
-        """
-        self._xcom_instance = xcom_instance
-
-    def get_xcom_instance(self):
-        if self._xcom_instance is None and self._parent is None:
-            raise AirflowException(
-                'Trying to access attributes from OpContext before setting the '
-                'xcom_instance')
-        return self._xcom_instance or self._parent.get_xcom_instance()
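
The attribute magic this deleted class provided maps one-to-one onto the
plain XCom calls the new operators use. Roughly, given the TaskInstance that
get_xcom_instance() returned (a sketch, not part of the commit):

    # op_context.custom_return_value = value  used to expand to:
    task_instance.xcom_push(key='custom_return_value', value=value)

    # reading op_context.custom_return_value  used to expand to:
    value = task_instance.xcom_pull(key='custom_return_value',
                                    task_ids=op_context.task_id)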

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c177d6e8/airflow/contrib/operators/kubernetes/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/kubernetes/__init__.py b/airflow/contrib/operators/kubernetes/__init__.py
new file mode 100644
index 0000000..9d7677a
--- /dev/null
+++ b/airflow/contrib/operators/kubernetes/__init__.py
@@ -0,0 +1,13 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c177d6e8/airflow/contrib/operators/kubernetes/pod_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/kubernetes/pod_operator.py b/airflow/contrib/operators/kubernetes/pod_operator.py
new file mode 100644
index 0000000..8b7a55f
--- /dev/null
+++ b/airflow/contrib/operators/kubernetes/pod_operator.py
@@ -0,0 +1,104 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from airflow.exceptions import AirflowException
+from airflow.operators.python_operator import PythonOperator
+from airflow.utils.decorators import apply_defaults
+from airflow.contrib.kubernetes.pod_launcher import PodLauncher
+from airflow.contrib.kubernetes.pod import Pod
+from airflow.utils.state import State
+
+
+class PodOperator(PythonOperator):
+    """
+        Executes a pod and waits for the job to finish.
+        :param dag_run_id: The unique run ID that would be attached to the pod as a label
+        :type dag_run_id: str
+        :param pod: The pod object to launch
+        :type pod: Pod
+        :param on_pod_success_func: Optional callable invoked with the task context
+                                    after the pod succeeds; its return value, if any,
+                                    is returned from execute()
+        :type on_pod_success_func: callable
+    """
+    # template_fields = tuple('dag_run_id')
+    ui_color = '#8da7be'
+
+    @apply_defaults
+    def __init__(
+        self,
+        dag_run_id,
+        pod,
+        on_pod_success_func=None,
+        *args,
+        **kwargs
+    ):
+        # type: (str, Pod) -> None
+        super(PodOperator, self).__init__(
+            python_callable=lambda _: 1,
+            provide_context=True,
+            *args,
+            **kwargs)
+        self.logger = logging.getLogger(self.__class__.__name__)
+        self.pod = pod
+        self.dag_run_id = dag_run_id
+        self.pod_launcher = PodLauncher()
+        self.kwargs = kwargs
+        self._on_pod_success_func = on_pod_success_func
+
+    def execute(self, context):
+        task_instance = context.get('task_instance')
+        if task_instance is None:
+            raise AirflowException('`task_instance` is empty! This should not happen')
+
+        pod = self.pod
+
+        # Customize the pod
+        pod.name = self.task_id
+        pod.labels['run_id'] = self.dag_run_id
+        pod.namespace = self.dag.default_args.get('namespace', pod.namespace)
+
+        pod_result = self.pod_launcher.run_pod(pod)
+
+        if pod_result == State.FAILED:
+            raise AirflowException("Pod returned a failed status")
+
+        # Invoke the success callback and return any custom value it produces
+        custom_return_value = self.on_pod_success(context)
+        if custom_return_value:
+            return custom_return_value
+
+    def on_pod_success(self, context):
+        """
+            Called when pod is executed successfully.
+            
+            If you want to access return values for XCOM, place values
+            in accessible file system or DB and override this function.
+            
+            :return: Returns a custom return value for pod which will
+                     be stored in xcom
+                     
+        """
+        if self._on_pod_success_func is None:
+            return None
+        return self._on_pod_success_func(context=context)
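
A short usage sketch for this variant, which takes a ready-made Pod rather
than a factory (the Pod keyword arguments and the `dag` object are assumed,
per the example DAG in this commit):

    from airflow.contrib.kubernetes.pod import Pod
    from airflow.contrib.operators.kubernetes.pod_operator import PodOperator

    def fetch_output(context):
        # Optional hook: return a value to hand back from execute()
        return 'done'

    run_pod = PodOperator(dag=dag,
                          task_id='run-pod',
                          dag_run_id='run-1',
                          pod=Pod(image='ubuntu:latest',
                                  cmds=['/bin/bash', '-c', 'echo hello']),
                          on_pod_success_func=fetch_output)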

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c177d6e8/airflow/example_dags/example_pod_operator.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_pod_operator.py b/airflow/example_dags/example_pod_operator.py
new file mode 100644
index 0000000..ec62aaf
--- /dev/null
+++ b/airflow/example_dags/example_pod_operator.py
@@ -0,0 +1,91 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+Example of the PodOperator and ReturnValuePodOperator which would execute
+pods on a Kubernetes cluster. PodOperator would only work if airflow is
+deployed within kubernetes.
+"""
+import os
+
+import airflow
+import random
+from airflow.contrib.kubernetes.pod import Pod, Config
+from airflow.contrib.operators.k8s_pod_operator import ReturnValuePodOperator, PodOperator
+from airflow.models import DAG
+from airflow.utils.trigger_rule import TriggerRule
+
+# TODO: Replace the etcd endpoint with your own etcd endpoint
+args = {
+    'owner': 'airflow',
+    'etcd_endpoint': os.environ.get('AIRFLOWSVC_SERVICE_HOST') + ':' +
+                     os.environ.get('AIRFLOWSVC_SERVICE_PORT_ETCDSVC_PORT'),
+    'start_date': airflow.utils.dates.days_ago(2)
+}
+
+docker_image = 'artprod.dev.bloomberg.com/ds/molecula-python:1.0.0.0-SNAPSHOT'  # Replace with 'ubuntu:latest'
+dag = DAG(
+    dag_id='example_pod_operator', default_args=args,
+    schedule_interval=None)
+
+
+def pod_that_returns_hello(context):
+    """
+    Returns a Pod object given the airflow context.
+    """
+    image = docker_image
+    cmds = ['/bin/bash', '-c', 'echo "Hello $RANDOM" > /tmp/result.txt']
+    return Pod(image=image, cmds=cmds)
+
+
+hello_kube_step1 = ReturnValuePodOperator(dag=dag,
+                                          task_id='hello-kube-step1',
+                                          dag_run_id='run-1',
+                                          pod_factory=pod_that_returns_hello,
+                                          result_data_file='/tmp/result.txt')
+
+
+def pod_that_reads_upstream_result(context):
+    up_task_id = 'hello-kube-step1'
+    # The message including a random number generated inside the upstream pod will be read here
+    return_val = context['ti'].xcom_pull(key='custom_result', task_ids=up_task_id)
+    image = docker_image
+    cmds = ['/bin/bash', '-c', 'echo ' + return_val]
+    return Pod(image=image, cmds=cmds)
+
+
+hello_kube_step2 = PodOperator(dag=dag,
+                               task_id='hello-kube-step2',
+                               dag_run_id='run_1',
+                               pod_factory=pod_that_reads_upstream_result)
+hello_kube_step2.set_upstream(hello_kube_step1)
+
+def pod_that_injects_configs(context):
+    """
+    The returned pod object has a configs map which tells the operator to inject
+    some JSON objects as config files
+    """
+    image = docker_image  # Replace with 'ubuntu:latest'
+    configs = [Config('/configs/c1.json', {'random_val': str(random.random())}),
+               Config('/configs/c2.json', {'my_db': 'conn_str'})]
+    cmds = ['/bin/bash', '-c', 'sleep 3; cat /configs/c2.json']
+    return Pod(image=image, cmds=cmds, configs=configs)
+
+hello_kube_step3 = ReturnValuePodOperator(dag=dag,
+                               task_id='hello-kube-step3',
+                               dag_run_id='run_1',
+                               pod_factory=pod_that_injects_configs,
+                               result_data_file='/configs/c1.json')
+hello_kube_step3.set_upstream(hello_kube_step1)
+
+
+

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c177d6e8/docker/Dockerfile
----------------------------------------------------------------------
diff --git a/docker/Dockerfile b/docker/Dockerfile
new file mode 100644
index 0000000..2004e97
--- /dev/null
+++ b/docker/Dockerfile
@@ -0,0 +1,34 @@
+FROM ubuntu:16.04
+
+# install deps
+RUN apt-get update -y && apt-get install -y \
+        wget \
+        python-dev \
+        python-pip \
+        libczmq-dev \
+        libcurlpp-dev \
+        curl \
+        libssl-dev \
+        git \
+        inetutils-telnet \
+        bind9utils
+
+RUN pip install -U setuptools && \
+    pip install -U pip
+
+RUN pip install kubernetes && \
+    pip install cryptography && \
+    pip install psycopg2==2.7.1
+
+# install airflow
+COPY airflow.tar.gz /tmp/airflow.tar.gz
+RUN pip install /tmp/airflow.tar.gz
+
+# prep airflow
+ENV AIRFLOW_HOME=/root/airflow
+ENV AIRFLOW__CORE__EXECUTOR=KubernetesExecutor
+
+COPY bootstrap.sh /bootstrap.sh
+RUN chmod +x /bootstrap.sh
+
+ENTRYPOINT ["/bootstrap.sh"]

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c177d6e8/docker/bootstrap.sh
----------------------------------------------------------------------
diff --git a/docker/bootstrap.sh b/docker/bootstrap.sh
new file mode 100644
index 0000000..82124ac
--- /dev/null
+++ b/docker/bootstrap.sh
@@ -0,0 +1,13 @@
+#!/bin/bash
+
+# launch the appropriate process
+
+if [ "$1" = "webserver" ]
+then
+       exec airflow webserver
+fi
+
+if [ "$1" = "scheduler" ]
+then
+       exec airflow scheduler
+fi

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c177d6e8/docker/build.sh
----------------------------------------------------------------------
diff --git a/docker/build.sh b/docker/build.sh
new file mode 100755
index 0000000..f2a942e
--- /dev/null
+++ b/docker/build.sh
@@ -0,0 +1,12 @@
+IMAGE=grantnicholas/kubeairflow
+TAG=${1:-latest}
+
+if [ -f airflow.tar.gz ]; then
+    echo "Not rebuilding airflow source"
+else
+    cd ../ && python setup.py sdist && cd docker && \
+    cp ../dist/apache-airflow-1.9.0.dev0+incubating.tar.gz airflow.tar.gz
+fi
+
+docker build . --tag=${IMAGE}:${TAG}
+docker push ${IMAGE}:${TAG}

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c177d6e8/kube/airflow.yaml.template
----------------------------------------------------------------------
diff --git a/kube/airflow.yaml.template b/kube/airflow.yaml.template
new file mode 100644
index 0000000..c5877ca
--- /dev/null
+++ b/kube/airflow.yaml.template
@@ -0,0 +1,107 @@
+apiVersion: extensions/v1beta1
+kind: Deployment
+metadata:
+  name: airflow
+spec:
+  replicas: 1
+  template:
+    metadata:
+      labels:
+        name: airflow
+      annotations:
+        pod.beta.kubernetes.io/init-containers: '[
+          {
+              "name": "init",
+              "image": "{{docker_image}}",
+              "command": ["bash", "-c", "cd 
/usr/local/lib/python2.7/dist-packages/airflow && airflow initdb && alembic 
upgrade head"],
+              "env": [
+                {"name": "AIRFLOW__CORE__SQL_ALCHEMY_CONN", "value": 
"postgresql+psycopg2://root:root@postgres-airflow:5432/airflow"},
+                {"name": "AIRFLOW__CORE__K8S_IMAGE", "value": 
"{{docker_image}}"},
+                {"name": "AIRFLOW__CORE__K8S_GIT_REPO", "value": 
"https://github.com/grantnicholas/testdags.git"}
+              ]
+          }
+      ]'
+    spec:
+      initContainers:
+      - name: init
+        image: {{docker_image}}
+        command: [
+          "bash", "-c", "cd /usr/local/lib/python2.7/dist-packages/airflow && 
airflow initdb && alembic upgrade head"
+        ]
+        env:
+        - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
+          value: postgresql+psycopg2://root:root@postgres-airflow:5432/airflow
+        - name: AIRFLOW__CORE__K8S_IMAGE
+          value: {{docker_image}}
+        - name: AIRFLOW__CORE__K8S_GIT_REPO
+          value: https://github.com/grantnicholas/testdags.git
+      containers:
+      - name: web
+        image: {{docker_image}}
+        ports:
+        - name: web
+          containerPort: 8080
+        args: ["webserver"]
+        env:
+        - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
+          value: postgresql+psycopg2://root:root@postgres-airflow:5432/airflow
+        - name: AIRFLOW__CORE__K8S_IMAGE
+          value: {{docker_image}}
+        - name: AIRFLOW__CORE__K8S_GIT_REPO
+          value: https://github.com/grantnicholas/testdags.git
+        volumeMounts:
+        - name: dags
+          mountPath: /root/airflow/dags/synched
+        readinessProbe:
+          initialDelaySeconds: 5
+          timeoutSeconds: 5
+          periodSeconds: 5
+          httpGet:
+            path: /admin
+            port: 8080
+        livenessProbe:
+          initialDelaySeconds: 5
+          timeoutSeconds: 5
+          failureThreshold: 5
+          httpGet:
+            path: /admin
+            port: 8080
+      - name: scheduler
+        image: {{docker_image}}
+        args: ["scheduler"]
+        env:
+        - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
+          value: postgresql+psycopg2://root:root@postgres-airflow:5432/airflow
+        - name: AIRFLOW__CORE__K8S_IMAGE
+          value: {{docker_image}}
+        - name: AIRFLOW__CORE__K8S_GIT_REPO
+          value: https://github.com/grantnicholas/testdags.git
+        volumeMounts:
+        - name: dags
+          mountPath: /root/airflow/dags/synched
+      - name: sync
+        image: gcr.io/google_containers/git-sync:v2.0.4
+        env:
+        - name: GIT_SYNC_REPO
+          value: https://github.com/grantnicholas/testdags.git
+        - name: GIT_SYNC_DEST
+          value: git
+        volumeMounts:
+        - name: dags
+          mountPath: /git
+      volumes:
+      - name: dags
+        emptyDir: {}
+---
+apiVersion: v1
+kind: Service
+metadata:
+  name: airflow
+spec:
+  type: NodePort
+  ports:
+    - port: 8080
+      nodePort: 30809
+  selector:
+    name: airflow
+

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c177d6e8/kube/deploy.sh
----------------------------------------------------------------------
diff --git a/kube/deploy.sh b/kube/deploy.sh
new file mode 100755
index 0000000..28b58ca
--- /dev/null
+++ b/kube/deploy.sh
@@ -0,0 +1,6 @@
+IMAGE=${1:-grantnicholas/kubeairflow}
+TAG=${2:-latest}
+
+mkdir -p .generated
+kubectl apply -f postgres.yaml
+sed "s#{{docker_image}}#$IMAGE:$TAG#g" airflow.yaml.template > 
.generated/airflow.yaml && kubectl apply -f .generated/airflow.yaml
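
For reference, the sed substitution above amounts to this small Python
sketch (not part of the commit):

    # Render airflow.yaml.template by replacing the {{docker_image}} marker
    image = 'grantnicholas/kubeairflow:latest'
    with open('airflow.yaml.template') as src:
        rendered = src.read().replace('{{docker_image}}', image)
    with open('.generated/airflow.yaml', 'w') as dst:
        dst.write(rendered)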

