http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/airflow/contrib/operators/k8s_pod_operator/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/k8s_pod_operator/__init__.py b/airflow/contrib/operators/k8s_pod_operator/__init__.py
deleted file mode 100644
index 50c7b86..0000000
--- a/airflow/contrib/operators/k8s_pod_operator/__init__.py
+++ /dev/null
@@ -1,13 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-from .k8s_pod_operator import *

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/airflow/contrib/operators/k8s_pod_operator/k8s_pod_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/k8s_pod_operator/k8s_pod_operator.py b/airflow/contrib/operators/k8s_pod_operator/k8s_pod_operator.py
deleted file mode 100644
index 6af66ea..0000000
--- a/airflow/contrib/operators/k8s_pod_operator/k8s_pod_operator.py
+++ /dev/null
@@ -1,120 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import logging
-
-from airflow.exceptions import AirflowException
-from airflow.operators.python_operator import PythonOperator
-from airflow.utils.decorators import apply_defaults
-from airflow.contrib.kubernetes.pod_launcher import KubernetesLauncher, \
-    KubernetesCommunicationService, incluster_namespace
-from airflow.contrib.kubernetes.kubernetes_request_factory import \
-    SimplePodRequestFactory, \
-    ReturnValuePodRequestFactory
-
-
-class PodOperator(PythonOperator):
-    """
-        Executes a pod and waits for the job to finish.
-
-        :param dag_run_id: The unique run ID that would be attached to the pod as a label
-        :type dag_run_id: str
-        :param pod_factory: Reference to the function that creates the pod with format:
-                            function (OpContext) => Pod
-        :type pod_factory: callable
-    """
-    # template_fields = tuple('dag_run_id')
-    ui_color = '#8da7be'
-
-    @apply_defaults
-    def __init__(
-            self,
-            dag_run_id,
-            pod_factory,
-            kube_request_factory=None,
-            *args, **kwargs):
-        super(PodOperator, self).__init__(python_callable=lambda _: 1, provide_context=True, *args, **kwargs)
-        self.logger = logging.getLogger(self.__class__.__name__)
-        if not callable(pod_factory):
-            raise AirflowException('`pod_factory` param must be callable')
-        self.dag_run_id = dag_run_id
-        self.pod_factory = pod_factory
-        self.kwargs = kwargs
-        self._kube_request_factory = kube_request_factory or SimplePodRequestFactory
-
-    def execute(self, context):
-        pod = self.get_pod_object(context)
-
-        # Customize the pod
-        pod.name = self.task_id
-        pod.labels['run_id'] = self.dag_run_id
-        try:
-            pod.namespace = self.dag.default_args.get('namespace', pod.namespace) or incluster_namespace()
-        except:
-            # Used default namespace
-            pass
-
-        # Launch the pod and wait for it to finish
-        KubernetesLauncher(pod, self._kube_request_factory).launch()
-        result = pod.result
-        context['ti'].xcom_push(key='result', value=result)
-
-        custom_return_value = self.on_pod_success(context)
-        self.set_custom_return_value(context, custom_return_value)
-        return result
-
-    def on_pod_success(self, context):
-        """
-            Called when pod is executed successfully.
-            :return: Returns a custom return value for pod which will
-                     be stored in xcom
-        """
-        pass
-
-    def get_pod_object(self, context):
-        """
-            Returns a pod object. Overwrite this method to define custom objects
-        :param context: The task context
-        :return: The pod object
-        """
-        return self.pod_factory(context)
-
-    def set_custom_return_value(self, context, custom_return_value):
-        if custom_return_value:
-            context['ti'].xcom_push(key='custom_result', value=custom_return_value)
-
-
-class ReturnValuePodOperator(PodOperator):
-    """
-     This pod operators is a normal pod operator with the addition of
-     reading custom return value back from kubernetes.
-    """
-    def __init__(self,
-                 result_data_file,
-                 kube_com_service_factory=None,
-                 *args, **kwargs):
-        super(ReturnValuePodOperator, self).__init__(*args, **kwargs)
-        kube_com_service_factory = kube_com_service_factory or (
-                 lambda: KubernetesCommunicationService.from_dag_default_args(self.dag))
-        if not isinstance(kube_com_service_factory(), KubernetesCommunicationService):
-            raise AirflowException('`kube_com_service_factory` must be of type KubernetesCommunicationService')
-        self._kube_com_service_factory = kube_com_service_factory
-        self._result_data_file = result_data_file
-        self._kube_request_factory = self._return_value_kube_request  # Overwrite the default request factory
-
-    def on_pod_success(self, context):
-        return self._kube_com_service_factory().pod_return_data(self.task_id)
-
-    def _return_value_kube_request(self):
-        return ReturnValuePodRequestFactory(self._kube_com_service_factory, self._result_data_file)
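
A minimal usage sketch of this removed factory-based API (the DAG object, image, and task id below are illustrative, not from this commit; the deleted example_pod_operator.py further down shows the full pattern):

    from airflow.contrib.kubernetes.pod import Pod
    from airflow.contrib.operators.k8s_pod_operator import ReturnValuePodOperator

    def pod_factory(context):
        # The command writes the value that the operator later reads back
        # from result_data_file and pushes to XCom under 'custom_result'.
        return Pod(image='ubuntu:latest',
                   cmds=['/bin/bash', '-c', 'echo hello > /tmp/result.txt'])

    step = ReturnValuePodOperator(dag=dag,  # assumes an existing DAG object
                                  task_id='hello-pod',
                                  dag_run_id='run-1',
                                  pod_factory=pod_factory,
                                  result_data_file='/tmp/result.txt')
    # Downstream tasks can read the value via
    # context['ti'].xcom_pull(key='custom_result', task_ids='hello-pod')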

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/airflow/contrib/operators/kubernetes/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/kubernetes/__init__.py b/airflow/contrib/operators/kubernetes/__init__.py
deleted file mode 100644
index 9d7677a..0000000
--- a/airflow/contrib/operators/kubernetes/__init__.py
+++ /dev/null
@@ -1,13 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/airflow/contrib/operators/kubernetes/pod_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/kubernetes/pod_operator.py b/airflow/contrib/operators/kubernetes/pod_operator.py
deleted file mode 100644
index 0db8c6d..0000000
--- a/airflow/contrib/operators/kubernetes/pod_operator.py
+++ /dev/null
@@ -1,100 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from airflow.exceptions import AirflowException
-from airflow.operators.python_operator import PythonOperator
-from airflow.utils.decorators import apply_defaults
-from airflow.contrib.kubernetes.pod_launcher import PodLauncher
-from airflow.contrib.kubernetes.pod import Pod
-from airflow.utils.state import State
-
-
-class PodOperator(PythonOperator):
-    """
-    Executes a pod and waits for the job to finish.
-
-    :param dag_run_id: The unique run ID that would be attached to the pod as a label
-    :type dag_run_id: str
-    :param pod_factory: Reference to the function that creates the pod with format:
-        function (OpContext) => Pod
-    :type pod_factory: callable
-    :param cache_output: If set to true, the output of the pod would be saved in a
-        cache object using md5 hash of all the pod parameters and in case of success, the cached
-        results will be returned on consecutive calls. Only use this
-    """
-    # template_fields = tuple('dag_run_id')
-    ui_color = '#8da7be'
-
-    def blank_func(self, context):
-        return None
-
-    @apply_defaults
-    def __init__(
-        self,
-        dag_run_id,
-        pod,
-        on_pod_success_func = blank_func,
-        *args,
-        **kwargs
-    ):
-        # type: (str, Pod) -> PodOperator
-        super(PodOperator, self).__init__(
-            python_callable=lambda _:1,
-            provide_context=True,
-            *args,
-            **kwargs)
-        self.pod = pod
-        self.dag_run_id = dag_run_id
-        self.pod_launcher = PodLauncher()
-        self.kwargs = kwargs
-        self._on_pod_success_func = on_pod_success_func
-
-    def execute(self, context):
-        task_instance = context.get('task_instance')
-        if task_instance is None:
-            raise AirflowException('`task_instance` is empty! This should not happen')
-
-        pod = self.pod
-
-        # Customize the pod
-        pod.name = self.task_id
-        pod.labels['run_id'] = self.dag_run_id
-        pod.namespace = self.dag.default_args.get('namespace', pod.namespace)
-
-        pod_result = self.pod_launcher.run_pod(pod)
-
-        if pod_result == State.FAILED:
-            raise AirflowException("Pod returned a failed status")
-
-        # Launch the pod and wait for it to finish
-        self.op_context.result = pod.result
-        if pod_result == State.FAILED:
-            raise AirflowException("Pod failed")
-
-        # Cache the output
-        custom_return_value = self.on_pod_success(context)
-        if custom_return_value:
-            return custom_return_value
-
-    def on_pod_success(self, context):
-        """
-        Called when pod is executed successfully.
-
-        If you want to access return values for XCOM, place values
-        in accessible file system or DB and override this function.
-
-        :return: Returns a custom return value for pod which will
-            be stored in xcom
-        """
-        return self._on_pod_success_func(context=context)
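
A corresponding sketch for this Pod-object variant of the operator (names are again illustrative; `dag` is assumed to exist):

    from airflow.contrib.kubernetes.pod import Pod
    from airflow.contrib.operators.kubernetes.pod_operator import PodOperator

    run_pod = PodOperator(dag=dag,
                          task_id='hello-pod',
                          dag_run_id='run-1',
                          pod=Pod(image='ubuntu:latest',
                                  cmds=['/bin/bash', '-c', 'echo hello']),
                          # optional hook, called after a successful run
                          on_pod_success_func=lambda context: 'done')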

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/airflow/dag_importer/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/dag_importer/__init__.py b/airflow/dag_importer/__init__.py
deleted file mode 100644
index f0a792d..0000000
--- a/airflow/dag_importer/__init__.py
+++ /dev/null
@@ -1,83 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import logging
-from airflow import configuration
-
-
-def _integrate_plugins():
-    pass
-
-
-dag_import_spec = {}
-
-
-def import_dags():
-    logging.info("importing dags")
-    if configuration.has_option('core', 'k8s_mode'):
-        mode = configuration.get('core', 'k8s_mode')
-        dag_import_func(mode)()
-    else:
-        _import_hostpath()
-
-
-def dag_import_func(mode):
-    return {
-        'git': _import_git,
-        'cinder': _import_cinder,
-    }.get(mode, _import_hostpath)
-
-
-def _import_hostpath():
-
-    logging.info("importing dags locally")
-    spec = {'name': 'shared-data', 'hostPath': {}}
-    spec['hostPath']['path'] = '/tmp/dags'
-    global dag_import_spec
-    dag_import_spec = spec
-
-
-def _import_cinder():
-    '''
-    kind: StorageClass
-    apiVersion: storage.k8s.io/v1
-    metadata:
-        name: gold
-    provisioner: kubernetes.io/cinder
-    parameters:
-        type: fast
-    availability: nova
-    :return: 
-    '''
-    global dag_import_spec
-    spec = {}
-
-    spec['kind'] = 'StorageClass'
-    spec['apiVersion'] = 'storage.k8s.io/v1'
-    spec['metatdata']['name'] = 'gold'
-    spec['provisioner'] = 'kubernetes.io/cinder'
-    spec['parameters']['type'] = 'fast'
-    spec['availability'] = 'nova'
-
-
-def _import_git():
-    logging.info("importing dags from github")
-    global dag_import_spec
-    git_link = configuration.get('core', 'k8s_git_link')
-    spec = {'name': 'shared-data', 'gitRepo': {}}
-    spec['gitRepo']['repository'] = git_link
-    if configuration.has_option('core','k8s_git_revision'):
-        revision = configuration.get('core', 'k8s_git_revision')
-        spec['gitRepo']['revision'] = revision
-    dag_import_spec = spec
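
The removed module's whole contract was a module-level dict: import_dags() mutates dag_import_spec, and a caller embeds it in a pod spec as a volume. A hedged sketch of that consumption (the surrounding pod-spec layout is an assumption, not shown in this commit):

    from airflow import dag_importer

    dag_importer.import_dags()  # populates dag_importer.dag_import_spec

    # Hypothetical consumer: mount the resulting volume spec into a worker pod
    pod_spec = {
        'spec': {
            'volumes': [dag_importer.dag_import_spec],
            'containers': [{
                'name': 'worker',
                'volumeMounts': [{'name': 'shared-data',
                                  'mountPath': '/root/airflow/dags'}],
            }],
        },
    }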

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/airflow/example_dags/example_kubernetes_executor.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_kubernetes_executor.py b/airflow/example_dags/example_kubernetes_executor.py
index 31bc2fb..2a02ef6 100644
--- a/airflow/example_dags/example_kubernetes_executor.py
+++ b/airflow/example_dags/example_kubernetes_executor.py
@@ -15,10 +15,8 @@ from __future__ import print_function
 import airflow
 from airflow.operators.python_operator import PythonOperator
 from airflow.models import DAG
-from airflow.contrib.executors.kubernetes_executor import KubernetesExecutorConfig
 import os
 
-
 args = {
     'owner': 'airflow',
     'start_date': airflow.utils.dates.days_ago(2)
@@ -39,7 +37,6 @@ def use_zip_binary():
     assert rc == 0
 
 
-
 # You don't have to use any special KubernetesExecutor configuration if you don't want to
 start_task = PythonOperator(
     task_id="start_task", python_callable=print_stuff, dag=dag
@@ -60,7 +57,8 @@ two_task = PythonOperator(
 # Limit resources on this operator/task
 three_task = PythonOperator(
     task_id="three_task", python_callable=print_stuff, dag=dag,
-    executor_config={"KubernetesExecutor": {"request_memory": "128Mi", 
"limit_memory": "128Mi"}}
+    executor_config={
+        "KubernetesExecutor": {"request_memory": "128Mi", "limit_memory": 
"128Mi"}}
 )
 
 start_task.set_downstream([one_task, two_task, three_task])

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/airflow/example_dags/example_pod_operator.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_pod_operator.py b/airflow/example_dags/example_pod_operator.py
deleted file mode 100644
index ec62aaf..0000000
--- a/airflow/example_dags/example_pod_operator.py
+++ /dev/null
@@ -1,91 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-"""
-Example of the PodOperator and ReturnValuePodOperator which would execute
-pods on a Kubernetes cluster. PodOperator would only work if airflow is
-deployed within kubernetes.
-"""
-import os
-
-import airflow
-import random
-from airflow.contrib.kubernetes.pod import Pod, Config
-from airflow.contrib.operators.k8s_pod_operator import ReturnValuePodOperator, PodOperator
-from airflow.models import DAG
-from airflow.utils.trigger_rule import TriggerRule
-
-# TODO: Replace the etcd endpoint with your own etcd endpoint
-args = {
-    'owner': 'airflow',
-    'etcd_endpoint': os.environ.get('AIRFLOWSVC_SERVICE_HOST') + ':' +
-                     os.environ.get('AIRFLOWSVC_SERVICE_PORT_ETCDSVC_PORT'),
-    'start_date': airflow.utils.dates.days_ago(2)
-}
-
-docker_image = 'artprod.dev.bloomberg.com/ds/molecula-python:1.0.0.0-SNAPSHOT'  # Replace with 'ubuntu:latest'
-dag = DAG(
-    dag_id='example_pod_operator', default_args=args,
-    schedule_interval=None)
-
-
-def pod_that_returns_hello(context):
-    """
-    Returns a Pod object given the airflow context.
-    """
-    image = docker_image
-    cmds = ['/bin/bash', '-c', 'echo "Hello $RANDOM" > /tmp/result.txt']
-    return Pod(image=image, cmds=cmds)
-
-
-hello_kube_step1 = ReturnValuePodOperator(dag=dag,
-                                          task_id='hello-kube-step1',
-                                          dag_run_id='run-1',
-                                          pod_factory=pod_that_returns_hello,
-                                          result_data_file='/tmp/result.txt')
-
-
-def pod_that_reads_upstream_result(context):
-    up_task_id = 'hello-kube-step1'
-    # The message including a random number generated inside the upstream pod will be read here
-    return_val = context['ti'].xcom_pull(key='custom_result', task_ids=up_task_id)
-    image = docker_image
-    cmds = ['/bin/bash', '-c', 'echo ' + return_val]
-    return Pod(image=image, cmds=cmds)
-
-
-hello_kube_step2 = PodOperator(dag=dag,
-                               task_id='hello-kube-step2',
-                               dag_run_id='run_1',
-                               pod_factory=pod_that_reads_upstream_result)
-hello_kube_step2.set_upstream(hello_kube_step1)
-
-def pod_that_injects_configs(context):
-    """
-    The returning pod object has a configs map which tells the operator to inject some JSON objects as
-    config files
-    """
-    image = docker_image  # Replace with 'ubuntu:latest'
-    configs = [ Config('/configs/c1.json', { 'random_val': str(random.random()) }), Config('/configs/c2.json', { 'my_db': 'conn_str' }) ]
-    cmds = ['/bin/bash', '-c', 'sleep 3; cat /configs/c2.json']
-    return Pod(image=image, cmds=cmds, configs=configs)
-
-hello_kube_step3 = ReturnValuePodOperator(dag=dag,
-                               task_id='hello-kube-step3',
-                               dag_run_id='run_1',
-                               pod_factory=pod_that_injects_configs,
-                               result_data_file='/configs/c1.json')
-hello_kube_step3.set_upstream(hello_kube_step1)
-
-
-

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/airflow/executors/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/executors/__init__.py b/airflow/executors/__init__.py
index 047da6f..7ae396c 100644
--- a/airflow/executors/__init__.py
+++ b/airflow/executors/__init__.py
@@ -17,7 +17,7 @@
 # specific language governing permissions and limitations
 # under the License.
 import sys
-
+from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow import configuration
 from airflow.exceptions import AirflowException
 from airflow.executors.base_executor import BaseExecutor

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/airflow/executors/base_executor.py
----------------------------------------------------------------------
diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py
index 7f00e93..0648f9b 100644
--- a/airflow/executors/base_executor.py
+++ b/airflow/executors/base_executor.py
@@ -133,7 +133,10 @@ class BaseExecutor(LoggingMixin):
             ti.refresh_from_db()
             if ti.state != State.RUNNING:
                 self.running[key] = command
-                self.execute_async(key, command=command, queue=queue, executor_config=ti.executor_config)
+                self.execute_async(key=key,
+                                   command=command,
+                                   queue=queue,
+                                   executor_config=ti.executor_config)
             else:
                 self.logger.info(
                     'Task is already running, not sending to '
@@ -144,6 +147,7 @@ class BaseExecutor(LoggingMixin):
         self.sync()
 
     def change_state(self, key, state):
+        print("popping: {}".format(key))
         self.running.pop(key)
         self.event_buffer[key] = state
 
@@ -174,7 +178,11 @@ class BaseExecutor(LoggingMixin):
 
         return cleared_events
 
-    def execute_async(self, key, command, queue=None, executor_config=None):  # pragma: no cover
+    def execute_async(self,
+                      key,
+                      command,
+                      queue=None,
+                      executor_config=None):  # pragma: no cover
         """
         This method will execute the command asynchronously.
         """

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/airflow/executors/celery_executor.py
----------------------------------------------------------------------
diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py
index 70d0088..2de7c46 100644
--- a/airflow/executors/celery_executor.py
+++ b/airflow/executors/celery_executor.py
@@ -79,7 +79,8 @@ class CeleryExecutor(BaseExecutor):
         self.last_state = {}
 
     def execute_async(self, key, command,
-                      queue=DEFAULT_CELERY_CONFIG['task_default_queue'], executor_config=None):
+                      queue=DEFAULT_CELERY_CONFIG['task_default_queue'],
+                      executor_config=None):
         self.log.info( "[celery] queuing {key} through celery, "
                        "queue={queue}".format(**locals()))
         self.tasks[key] = execute_command.apply_async(

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/airflow/executors/local_executor.py
----------------------------------------------------------------------
diff --git a/airflow/executors/local_executor.py b/airflow/executors/local_executor.py
index 4ac25f5..9f75948 100644
--- a/airflow/executors/local_executor.py
+++ b/airflow/executors/local_executor.py
@@ -222,7 +222,7 @@ class LocalExecutor(BaseExecutor):
         self.impl.start()
 
     def execute_async(self, key, command, queue=None, executor_config=None):
-        self.queue.put((key, command))
+        self.impl.execute_async(key=key, command=command)
 
     def sync(self):
         self.impl.sync()
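
The change above stops LocalExecutor from feeding the raw queue directly and instead delegates to the strategy object self.impl, which owns the worker model; a generic sketch of the delegation pattern (class names here are illustrative, not the module's):

    import multiprocessing

    class QueueBackedImpl(object):
        """Strategy feeding a shared queue consumed by long-lived workers."""
        def __init__(self, queue):
            self.queue = queue

        def execute_async(self, key, command):
            self.queue.put((key, command))  # the old LocalExecutor behaviour

    class DelegatingExecutor(object):
        def __init__(self):
            self.impl = QueueBackedImpl(multiprocessing.Queue())

        def execute_async(self, key, command, queue=None, executor_config=None):
            # one entry point, regardless of the worker strategy behind impl
            self.impl.execute_async(key=key, command=command)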

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/airflow/migrations/versions/27c6a30d7c24_add_executor_config_to_task_instance.py
----------------------------------------------------------------------
diff --git a/airflow/migrations/versions/27c6a30d7c24_add_executor_config_to_task_instance.py b/airflow/migrations/versions/27c6a30d7c24_add_executor_config_to_task_instance.py
index 84c41ec..b7213a3 100644
--- a/airflow/migrations/versions/27c6a30d7c24_add_executor_config_to_task_instance.py
+++ b/airflow/migrations/versions/27c6a30d7c24_add_executor_config_to_task_instance.py
@@ -1,3 +1,4 @@
+# flake8: noqa
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/airflow/migrations/versions/33ae817a1ff4_add_kubernetes_resource_checkpointing.py
----------------------------------------------------------------------
diff --git a/airflow/migrations/versions/33ae817a1ff4_add_kubernetes_resource_checkpointing.py b/airflow/migrations/versions/33ae817a1ff4_add_kubernetes_resource_checkpointing.py
index d642476..4347bae 100644
--- a/airflow/migrations/versions/33ae817a1ff4_add_kubernetes_resource_checkpointing.py
+++ b/airflow/migrations/versions/33ae817a1ff4_add_kubernetes_resource_checkpointing.py
@@ -1,3 +1,4 @@
+# flake8: noqa
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -25,7 +26,6 @@ down_revision = 'd2ae31099d61'
 branch_labels = None
 depends_on = None
 
-
 from alembic import op
 import sqlalchemy as sa
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index ae387b6..2de1ade 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -2223,10 +2223,17 @@ class BaseOperator(LoggingMixin):
     :param task_concurrency: When set, a task will be able to limit the concurrent
         runs across execution_dates
     :type task_concurrency: int
-    :param executor_config: Additional task-level configuration parameters that are
-        interpreted by a specific executor. Parameters are namespaced by the name of executor.
-        ``example: to run this task in a specific docker container through the KubernetesExecutor
-        MyOperator(..., executor_config={"KubernetesExecutor": {"image": "myCustomDockerImage"}})``
+    :param executor_config: Additional task-level configuration parameters that are
+        interpreted by a specific executor. Parameters are namespaced by the name of
+        executor.
+        ``example: to run this task in a specific docker container through
+        the KubernetesExecutor
+        MyOperator(...,
+            executor_config={
+            "KubernetesExecutor":
+                {"image": "myCustomDockerImage"}
+                }
+        )``
     :type executor_config: dict
     """
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/kubectl
----------------------------------------------------------------------
diff --git a/kubectl b/kubectl
deleted file mode 100644
index e69de29..0000000

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/scripts/ci/kubernetes/docker/Dockerfile
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/docker/Dockerfile b/scripts/ci/kubernetes/docker/Dockerfile
index b1bc493..ad7919f 100644
--- a/scripts/ci/kubernetes/docker/Dockerfile
+++ b/scripts/ci/kubernetes/docker/Dockerfile
@@ -33,6 +33,9 @@ RUN apt-get update -y && apt-get install -y \
 RUN pip install -U setuptools && \
     pip install -U pip
 
+COPY requirements.txt /tmp/requirements.txt
+RUN pip install -r /tmp/requirements.txt
+
 RUN pip install kubernetes && \
     pip install cryptography && \
     pip install psycopg2==2.7.3.1  # I had issues with older versions of psycopg2, just a warning

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/scripts/ci/kubernetes/docker/requirements.txt
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/docker/requirements.txt b/scripts/ci/kubernetes/docker/requirements.txt
new file mode 100644
index 0000000..6b823d9
--- /dev/null
+++ b/scripts/ci/kubernetes/docker/requirements.txt
@@ -0,0 +1,35 @@
+alembic
+bleach
+configparser
+croniter
+dill
+flask
+flask-admin
+flask-caching
+flask-login
+flask-swagger
+flask-wtf
+funcsigs
+future
+gitpython
+gunicorn
+iso8601
+jinja2
+lxml
+markdown
+pandas
+pendulum
+psutil
+pygments
+python-daemon
+python-dateutil
+python-nvd3
+requests
+setproctitle
+sqlalchemy
+sqlalchemy-utc
+tabulate
+thrift
+tzlocal
+werkzeug
+zope.deprecation

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/scripts/ci/kubernetes/kube/airflow.yaml.template
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/kube/airflow.yaml.template b/scripts/ci/kubernetes/kube/airflow.yaml.template
index af54175..ae00983 100644
--- a/scripts/ci/kubernetes/kube/airflow.yaml.template
+++ b/scripts/ci/kubernetes/kube/airflow.yaml.template
@@ -73,7 +73,7 @@ spec:
           - "bash"
         args:
           - "-cx"
-          - "cd /usr/local/lib/python2.7/dist-packages/airflow && cp -R 
example_dags/* /root/airflow/dags/ && airflow initdb && alembic upgrade head"
+          - "cd /usr/local/lib/python2.7/dist-packages/airflow && cp -R 
example_dags/* /root/airflow/dags/ && airflow initdb && alembic upgrade heads"
       containers:
       - name: web
         image: {{docker_image}}:{{docker_tag}}
@@ -113,7 +113,7 @@ spec:
             path: /admin
             port: 8080
       - name: scheduler
-        image: {{docker_image}}
+        image: {{docker_image}}:{{docker_tag}}
         imagePullPolicy: IfNotPresent
         args: ["scheduler"]
         env:
@@ -173,7 +173,7 @@ data:
     dags_folder = /root/airflow/dags
     base_log_folder = /root/airflow/logs
     logging_level = INFO
-    executor = KubernetesExecutor
+    executor = KubernetesExecutor 
     parallelism = 32
     plugins_folder = /root/airflow/plugins
     sql_alchemy_conn = $SQL_ALCHEMY_CONN

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/scripts/ci/kubernetes/kube/deploy.sh
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/kube/deploy.sh b/scripts/ci/kubernetes/kube/deploy.sh
index 953f50f..27c707f 100755
--- a/scripts/ci/kubernetes/kube/deploy.sh
+++ b/scripts/ci/kubernetes/kube/deploy.sh
@@ -20,7 +20,7 @@ TAG=${2:-latest}
 DIRNAME=$(cd "$(dirname "$0")"; pwd)
 
 # create an emptydir for postgres to store it's volume data in
-sudo mkdir -p /data/postgres-airflow
+#sudo mkdir -p /data/postgres-airflow
 
 mkdir -p $DIRNAME/.generated
 kubectl apply -f $DIRNAME/postgres.yaml
@@ -39,3 +39,13 @@ do
   fi
   sleep 4
 done
+
+POD=$(kubectl get pods -o go-template --template '{{range .items}}{{.metadata.name}}{{"\n"}}{{end}}' | grep airflow | head -1)
+
+echo "------- pod description -------"
+kubectl describe pod $POD
+echo "------- web logs -------"
+kubectl logs $POD web
+echo "------- scheduler logs -------"
+kubectl logs $POD scheduler
+echo "--------------"

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/scripts/ci/kubernetes/kube/postgres.yaml
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/kube/postgres.yaml b/scripts/ci/kubernetes/kube/postgres.yaml
index 79366d0..e0bbdff 100644
--- a/scripts/ci/kubernetes/kube/postgres.yaml
+++ b/scripts/ci/kubernetes/kube/postgres.yaml
@@ -14,30 +14,31 @@
 #  KIND, either express or implied.  See the License for the    *
 #  specific language governing permissions and limitations      *
 #  under the License.                                           *
-
-apiVersion: v1
-kind: PersistentVolume
-metadata:
-  name: postgres-airflow
-spec:
-  accessModes:
-    - ReadWriteOnce
-  capacity:
-    storage: 5Gi
-  hostPath:
-    path: /data/postgres-airflow
----
-kind: PersistentVolumeClaim
-apiVersion: v1
-metadata:
-  name: postgres-airflow
-spec:
-  accessModes:
-    - ReadWriteOnce
-  resources:
-    requests:
-      storage: 5Gi
----
+#
+#apiVersion: v1
+#kind: PersistentVolume
+#metadata:
+#  name: postgres-airflow
+#spec:
+#  accessModes:
+#    - ReadWriteOnce
+#  capacity:
+#    storage: 5Gi
+#  hostPath:
+#    path: /data/postgres-airflow
+#
+#---
+#kind: PersistentVolumeClaim
+#apiVersion: v1
+#metadata:
+#  name: postgres-airflow
+#spec:
+#  accessModes:
+#    - ReadWriteOnce
+#  resources:
+#    requests:
+#      storage: 5Gi
+#---
 kind: Deployment
 apiVersion: extensions/v1beta1
 metadata:
@@ -95,8 +96,9 @@ spec:
               cpu: .5
       volumes:
         - name: dbvol
-          persistentVolumeClaim:
-            claimName: postgres-airflow
+          emptyDir: {}
+#          persistentVolumeClaim:
+#            claimName: postgres-airflow
 ---
 apiVersion: v1
 kind: Service

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/scripts/ci/kubernetes/minikube/start_minikube.sh
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/minikube/start_minikube.sh b/scripts/ci/kubernetes/minikube/start_minikube.sh
index 8a27d75..be370cf 100755
--- a/scripts/ci/kubernetes/minikube/start_minikube.sh
+++ b/scripts/ci/kubernetes/minikube/start_minikube.sh
@@ -1,19 +1,19 @@
-#  Licensed to the Apache Software Foundation (ASF) under one   *
-#  or more contributor license agreements.  See the NOTICE file *
-#  distributed with this work for additional information        *
-#  regarding copyright ownership.  The ASF licenses this file   *
-#  to you under the Apache License, Version 2.0 (the            *
-#  "License"); you may not use this file except in compliance   *
-#  with the License.  You may obtain a copy of the License at   *
-#                                                               *
-#    http://www.apache.org/licenses/LICENSE-2.0                 *
-#                                                               *
-#  Unless required by applicable law or agreed to in writing,   *
-#  software distributed under the License is distributed on an  *
-#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
-#  KIND, either express or implied.  See the License for the    *
-#  specific language governing permissions and limitations      *
-#  under the License.                                           *
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
 
 # Guard against a kubernetes cluster already being up
 kubectl get pods &> /dev/null
@@ -23,8 +23,8 @@ if [ $? -eq 0 ]; then
 fi
 #
 
-curl -Lo minikube https://storage.googleapis.com/minikube/releases/latest/minikube-linux-amd64 && chmod +x minikube
-curl -Lo kubectl  https://storage.googleapis.com/kubernetes-release/release/v1.7.0/bin/linux/amd64/kubectl && chmod +x kubectl
+curl -Lo minikube https://storage.googleapis.com/minikube/releases/v0.24.1/minikube-linux-amd64 && chmod +x minikube
+curl -Lo kubectl  https://storage.googleapis.com/kubernetes-release/release/${KUBERNETES_VERSION}/bin/linux/amd64/kubectl && chmod +x kubectl
 
 sudo mkdir -p /usr/local/bin
 sudo mv minikube /usr/local/bin/minikube
@@ -38,15 +38,43 @@ mkdir $HOME/.kube || true
 touch $HOME/.kube/config
 
 export KUBECONFIG=$HOME/.kube/config
-sudo -E minikube start --vm-driver=none
-
-# this for loop waits until kubectl can access the api server that minikube has created
-for i in {1..150} # timeout for 5 minutes
-do
-  echo "------- Running kubectl get pods -------"
-  kubectl get po &> /dev/null
-  if [ $? -ne 1 ]; then
-    break
-  fi
-  sleep 2
-done
+
+start_minikube(){
+  sudo -E minikube start --vm-driver=none --kubernetes-version="${KUBERNETES_VERSION}"
+
+  # this for loop waits until kubectl can access the api server that minikube has created
+  for i in {1..90} # timeout 3 minutes
+  do
+    echo "------- Running kubectl get pods -------"
+    STDERR=$(kubectl get pods  2>&1 >/dev/null)
+    if [ $? -eq 0 ]; then
+      echo $STDERR
+
+      # We do not need dynamic hostpath provisioning, so disable the default storageclass
+      sudo -E minikube addons disable default-storageclass && kubectl delete storageclasses --all
+
+      # We need to give permission to watch pods to the airflow scheduler.
+      # The easiest way to do that is by giving admin access to the default serviceaccount (NOT SAFE!)
+      kubectl create clusterrolebinding add-on-cluster-admin   --clusterrole=cluster-admin   --serviceaccount=default:default
+      exit 0
+    fi
+    echo $STDERR
+    sleep 2
+  done
+}
+
+cleanup_minikube(){
+  sudo -E minikube stop
+  sudo -E minikube delete
+  docker stop $(docker ps -a -q) || true
+  docker rm $(docker ps -a -q) || true
+  sleep 1
+}
+
+start_minikube
+echo "Minikube cluster creation timedout. Attempting to restart the minikube 
cluster."
+cleanup_minikube
+start_minikube
+echo "Minikube cluster creation timedout a second time. Failing."
+
+exit 1

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/scripts/ci/travis_script.sh
----------------------------------------------------------------------
diff --git a/scripts/ci/travis_script.sh b/scripts/ci/travis_script.sh
index 5b2a198..8766e94 100755
--- a/scripts/ci/travis_script.sh
+++ b/scripts/ci/travis_script.sh
@@ -21,12 +21,12 @@ DIRNAME=$(cd "$(dirname "$0")"; pwd)
 AIRFLOW_ROOT="$DIRNAME/../.."
 cd $AIRFLOW_ROOT && pip --version && ls -l $HOME/.wheelhouse && tox --version
 
-if [ -z "$RUN_KUBE_INTEGRATION" ];
+if [ -z "$KUBERNETES_VERSION" ];
 then
   tox -e $TOX_ENV
 else
-  $DIRNAME/kubernetes/setup_kubernetes.sh && \
-  tox -e $TOX_ENV -- tests.contrib.executors.integration \
+  KUBERNETES_VERSION=${KUBERNETES_VERSION} $DIRNAME/kubernetes/setup_kubernetes.sh && \
+  tox -e $TOX_ENV -- tests.contrib.minikube_tests \
                      --with-coverage \
                      --cover-erase \
                      --cover-html \

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/tests/contrib/executors/integration/__init__.py
----------------------------------------------------------------------
diff --git a/tests/contrib/executors/integration/__init__.py b/tests/contrib/executors/integration/__init__.py
deleted file mode 100644
index 9d7677a..0000000
--- a/tests/contrib/executors/integration/__init__.py
+++ /dev/null
@@ -1,13 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/tests/contrib/executors/integration/airflow_controller.py
----------------------------------------------------------------------
diff --git a/tests/contrib/executors/integration/airflow_controller.py b/tests/contrib/executors/integration/airflow_controller.py
deleted file mode 100644
index 499adb4..0000000
--- a/tests/contrib/executors/integration/airflow_controller.py
+++ /dev/null
@@ -1,114 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import subprocess
-import time
-
-
-class RunCommandError(Exception):
-    pass
-
-
-class TimeoutError(Exception):
-    pass
-
-
-class DagRunState:
-    SUCCESS = "success"
-    FAILED = "failed"
-    RUNNING = "running"
-
-
-def run_command(command):
-    process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
-    stdout, stderr = process.communicate()
-    if process.returncode != 0:
-        raise RunCommandError("Error while running command: {}; Stdout: {}; Stderr: {}".format(
-            command, stdout, stderr
-        ))
-    return stdout, stderr
-
-
-def run_command_in_pod(pod_name, container_name, command):
-    return run_command("kubectl exec {pod_name} -c {container_name} -- 
{command}".format(
-        pod_name=pod_name, container_name=container_name, command=command
-    ))
-
-def _unpause_dag(dag_id, airflow_pod=None):
-    airflow_pod = airflow_pod or _get_airflow_pod()
-    return run_command_in_pod(airflow_pod, "scheduler", "airflow unpause 
{dag_id}".format(dag_id=dag_id))
-
-def run_dag(dag_id, run_id, airflow_pod=None):
-    airflow_pod = airflow_pod or _get_airflow_pod()
-    _unpause_dag(dag_id, airflow_pod)
-    return run_command_in_pod(airflow_pod, "scheduler", "airflow trigger_dag 
{dag_id} -r {run_id}".format(
-        dag_id=dag_id, run_id=run_id
-    ))
-
-
-def _get_pod_by_grep(grep_phrase):
-    stdout, stderr = run_command("kubectl get pods | grep {grep_phrase} | awk 
'{{print $1}}'".format(
-        grep_phrase=grep_phrase
-    ))
-    pod_name = stdout.strip()
-    return pod_name
-
-
-def _get_airflow_pod():
-    return _get_pod_by_grep("^airflow")
-
-
-def _get_postgres_pod():
-    return _get_pod_by_grep("^postgres")
-
-
-def _parse_state(stdout):
-    end_line = "(1 row)"
-    prev_line = None
-    for line in stdout.split("\n"):
-        if end_line in line:
-            return prev_line.strip()
-        prev_line = line
-
-    raise Exception("Unknown psql output: {}".format(stdout))
-
-def get_dag_run_state(dag_id, run_id, postgres_pod=None):
-    postgres_pod = postgres_pod or _get_postgres_pod()
-    stdout, stderr = run_command_in_pod(
-        postgres_pod, "postgres",
-        """psql airflow -c "select state from dag_run where dag_id='{dag_id}' 
and run_id='{run_id}'" """.format(
-            dag_id=dag_id, run_id=run_id
-        )
-    )
-    return _parse_state(stdout)
-
-
-def dag_final_state(dag_id, run_id, postgres_pod=None, poll_interval=1, timeout=120):
-    postgres_pod = postgres_pod or _get_postgres_pod()
-    for _ in range(0, timeout / poll_interval):
-        dag_state = get_dag_run_state(dag_id, run_id, postgres_pod)
-        if dag_state != DagRunState.RUNNING:
-            return dag_state
-        time.sleep(poll_interval)
-
-    raise TimeoutError("Timed out while waiting for DagRun with dag_id: {} 
run_id: {}".format(dag_id, run_id))
-
-
-def _kill_pod(pod_name):
-    return run_command("kubectl delete pod 
{pod_name}".format(pod_name=pod_name))
-
-
-def kill_scheduler():
-    airflow_pod = _get_pod_by_grep("^airflow")
-    return _kill_pod(airflow_pod)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/tests/contrib/executors/integration/test_kubernetes_executor_integration.py
----------------------------------------------------------------------
diff --git a/tests/contrib/executors/integration/test_kubernetes_executor_integration.py b/tests/contrib/executors/integration/test_kubernetes_executor_integration.py
deleted file mode 100644
index 97949ae..0000000
--- a/tests/contrib/executors/integration/test_kubernetes_executor_integration.py
+++ /dev/null
@@ -1,65 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import unittest
-import time
-from uuid import uuid4
-from tests.contrib.executors.integration.airflow_controller import (
-    run_command, RunCommandError,
-    run_dag, get_dag_run_state, dag_final_state, DagRunState,
-    kill_scheduler
-)
-
-
-try:
-    run_command("kubectl get pods")
-except RunCommandError:
-    SKIP_KUBE = True
-else:
-    SKIP_KUBE = False
-
-
-class KubernetesExecutorTest(unittest.TestCase):
-
-    @unittest.skipIf(SKIP_KUBE, 'Kubernetes integration tests are unsupported by this configuration')
-    def test_kubernetes_executor_dag_runs_successfully(self):
-        dag_id, run_id = "example_python_operator", uuid4().hex
-        run_dag(dag_id, run_id)
-        state = dag_final_state(dag_id, run_id, timeout=120)
-        self.assertEquals(state, DagRunState.SUCCESS)
-
-    @unittest.skipIf(SKIP_KUBE, 'Kubernetes integration tests are unsupported by this configuration')
-    def test_start_dag_then_kill_scheduler_then_ensure_dag_succeeds(self):
-        dag_id, run_id = "example_python_operator", uuid4().hex
-        run_dag(dag_id, run_id)
-
-        self.assertEquals(get_dag_run_state(dag_id, run_id), DagRunState.RUNNING)
-
-        time.sleep(10)
-
-        kill_scheduler()
-
-        self.assertEquals(dag_final_state(dag_id, run_id, timeout=180), DagRunState.SUCCESS)
-
-    @unittest.skipIf(SKIP_KUBE, 'Kubernetes integration tests are unsupported by this configuration')
-    def test_kubernetes_executor_config_works(self):
-        dag_id, run_id = "example_kubernetes_executor", uuid4().hex
-        run_dag(dag_id, run_id)
-
-        self.assertEquals(get_dag_run_state(dag_id, run_id), DagRunState.RUNNING)
-        self.assertEquals(dag_final_state(dag_id, run_id, timeout=180), DagRunState.SUCCESS)
-
-
-if __name__ == "__main__":
-    unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/tests/contrib/executors/test_kubernetes_executor.py
----------------------------------------------------------------------
diff --git a/tests/contrib/executors/test_kubernetes_executor.py b/tests/contrib/executors/test_kubernetes_executor.py
index 4c9728e..0a38920 100644
--- a/tests/contrib/executors/test_kubernetes_executor.py
+++ b/tests/contrib/executors/test_kubernetes_executor.py
@@ -26,7 +26,6 @@ except ImportError:
 
 
 class TestAirflowKubernetesScheduler(unittest.TestCase):
-
     def _gen_random_string(self, str_len):
         return ''.join([random.choice(string.printable) for _ in range(str_len)])
 
@@ -36,7 +35,7 @@ class TestAirflowKubernetesScheduler(unittest.TestCase):
             ("my.dag.id", "my.task.id"),
             ("MYDAGID", "MYTASKID"),
             ("my_dag_id", "my_task_id"),
-            ("mydagid"*200, "my_task_id"*200)
+            ("mydagid" * 200, "my_task_id" * 200)
         ]
 
         cases.extend([
@@ -53,17 +52,22 @@ class TestAirflowKubernetesScheduler(unittest.TestCase):
             all(ch.lower() == ch for ch in name) and
             re.match(regex, name))
 
-    @unittest.skipIf(AirflowKubernetesScheduler is None, 'kubernetes python package is not installed')
+    @unittest.skipIf(AirflowKubernetesScheduler is None,
+                     'kubernetes python package is not installed')
     def test_create_pod_id(self):
         for dag_id, task_id in self._cases():
             pod_name = AirflowKubernetesScheduler._create_pod_id(dag_id, task_id)
             self.assertTrue(self._is_valid_name(pod_name))
 
-    @unittest.skipIf(AirflowKubernetesScheduler is None, "kubernetes python 
package is not installed")
+    @unittest.skipIf(AirflowKubernetesScheduler is None,
+                     "kubernetes python package is not installed")
     def test_execution_date_serialize_deserialize(self):
         datetime_obj = datetime.now()
-        serialized_datetime = AirflowKubernetesScheduler._datetime_to_label_safe_datestring(datetime_obj)
-        new_datetime_obj = AirflowKubernetesScheduler._label_safe_datestring_to_datetime(serialized_datetime)
+        serialized_datetime = \
+            AirflowKubernetesScheduler._datetime_to_label_safe_datestring(
+                datetime_obj)
+        new_datetime_obj = AirflowKubernetesScheduler._label_safe_datestring_to_datetime(
+            serialized_datetime)
 
         self.assertEquals(datetime_obj, new_datetime_obj)
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/tests/contrib/kubernetes/test_kubernetes_job.py
----------------------------------------------------------------------
diff --git a/tests/contrib/kubernetes/test_kubernetes_job.py b/tests/contrib/kubernetes/test_kubernetes_job.py
deleted file mode 100644
index 9921696..0000000
--- a/tests/contrib/kubernetes/test_kubernetes_job.py
+++ /dev/null
@@ -1,12 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/tests/contrib/kubernetes/test_kubernetes_job_launcher.py
----------------------------------------------------------------------
diff --git a/tests/contrib/kubernetes/test_kubernetes_job_launcher.py b/tests/contrib/kubernetes/test_kubernetes_job_launcher.py
deleted file mode 100644
index 3353390..0000000
--- a/tests/contrib/kubernetes/test_kubernetes_job_launcher.py
+++ /dev/null
@@ -1,59 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-
-import unittest
-from airflow.contrib.kubernetes.kubernetes_job_builder import KubernetesJobBuilder
-from airflow.contrib.kubernetes.kubernetes_request_factory import SimpleJobRequestFactory
-from airflow import configuration
-import json
-
-secrets = {}
-labels = {}
-base_job = {'kind': 'Job',
-            'spec': {
-                'template': {
-                    'spec': {
-                        'restartPolicy': 'Never',
-                        'volumes': [{'hostPath': {'path': '/tmp/dags'}, 'name': 'shared-data'}],
-                        'containers': [
-                            {'command': ['try', 'this', 'first'],
-                             'image': 'foo.image', 'volumeMounts': [
-                                {
-                                    'mountPath': '/usr/local/airflow/dags',
-                                    'name': 'shared-data'}
-                            ],
-                             'name': 'base',
-                             'imagePullPolicy': 'Never'}
-                        ]
-                    },
-                    'metadata': {'name': 'name'}
-                }
-            },
-            'apiVersion': 'batch/v1', 'metadata': {'name': None}
-            }
-
-
-class KubernetesJobRequestTest(unittest.TestCase):
-    job_to_load = None
-    job_req_factory = SimpleJobRequestFactory()
-
-    def setUp(self):
-        configuration.load_test_config()
-        self.job_to_load = KubernetesJobBuilder(
-            image='foo.image',
-            cmds=['try', 'this', 'first']
-        )
-
-    def test_job_creation_with_base_values(self):
-        base_job_result = self.job_req_factory.create(self.job_to_load)
-        self.assertEqual(base_job_result, base_job)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/tests/contrib/minikube_tests/integration/__init__.py
----------------------------------------------------------------------
diff --git a/tests/contrib/minikube_tests/integration/__init__.py b/tests/contrib/minikube_tests/integration/__init__.py
new file mode 100644
index 0000000..9d7677a
--- /dev/null
+++ b/tests/contrib/minikube_tests/integration/__init__.py
@@ -0,0 +1,13 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/tests/contrib/minikube_tests/integration/airflow_controller.py
----------------------------------------------------------------------
diff --git a/tests/contrib/minikube_tests/integration/airflow_controller.py b/tests/contrib/minikube_tests/integration/airflow_controller.py
new file mode 100644
index 0000000..5604652
--- /dev/null
+++ b/tests/contrib/minikube_tests/integration/airflow_controller.py
@@ -0,0 +1,166 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import subprocess
+import time
+
+
+class RunCommandError(Exception):
+    pass
+
+
+class TimeoutError(Exception):
+    pass
+
+
+class DagRunState:
+    SUCCESS = "success"
+    FAILED = "failed"
+    RUNNING = "running"
+
+
+def run_command(command):
+    process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE,
+                               stderr=subprocess.PIPE)
+    stdout, stderr = process.communicate()
+    if process.returncode != 0:
+        raise RunCommandError(
+            "Error while running command: {}; Stdout: {}; Stderr: {}".format(
+                command, stdout, stderr
+            ))
+    return stdout, stderr
+
+
+def run_command_in_pod(pod_name, container_name, command):
+    return run_command("kubectl exec {pod_name} -c {container_name} -- 
{command}".format(
+        pod_name=pod_name, container_name=container_name, command=command
+    ))
+
+
+def get_scheduler_logs(airflow_pod=None):
+    airflow_pod = airflow_pod or _get_airflow_pod()
+
+    return run_command("kubectl logs {pod_name} scheduler"
+                       .format(pod_name=airflow_pod))
+
+
+def _unpause_dag(dag_id, airflow_pod=None):
+    airflow_pod = airflow_pod or _get_airflow_pod()
+    return run_command_in_pod(airflow_pod, "scheduler",
+                              "airflow unpause {dag_id}".format(dag_id=dag_id))
+
+
+def run_dag(dag_id, run_id, airflow_pod=None):
+    airflow_pod = airflow_pod or _get_airflow_pod()
+    _unpause_dag(dag_id, airflow_pod)
+    return run_command_in_pod(airflow_pod, "scheduler",
+                              "airflow trigger_dag {dag_id} -r 
{run_id}".format(
+                                  dag_id=dag_id, run_id=run_id
+                              ))
+
+
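+# Return the name of the first pod whose name matches grep_phrase (e.g. "^airflow").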
+def _get_pod_by_grep(grep_phrase):
+    stdout, stderr = run_command(
+        "kubectl get pods | grep {grep_phrase} | awk '{{print $1}}'".format(
+            grep_phrase=grep_phrase
+        ))
+    pod_name = stdout.strip()
+    return pod_name
+
+
+def _get_airflow_pod():
+    return _get_pod_by_grep("^airflow")
+
+
+def _get_postgres_pod():
+    return _get_pod_by_grep("^postgres")
+
+
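+# psql prints a header row, the value row, and a "(1 row)" footer; the state
+# is on the line immediately before the footer.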
+def _parse_state(stdout):
+    end_line = "(1 row)"
+    prev_line = None
+    for line in stdout.split("\n"):
+        if end_line in line:
+            return prev_line.strip()
+        prev_line = line
+
+    raise Exception("Unknown psql output: {}".format(stdout))
+
+
+def get_dag_run_table(postgres_pod=None):
+    postgres_pod = postgres_pod or _get_postgres_pod()
+    stdout, stderr = run_command_in_pod(
+        postgres_pod, "postgres",
+        """psql airflow -c "select * from dag_run" """
+    )
+    return stdout
+
+
+def get_task_instance_table(postgres_pod=None):
+    postgres_pod = postgres_pod or _get_postgres_pod()
+    stdout, stderr = run_command_in_pod(
+        postgres_pod, "postgres",
+        """psql airflow -c "select * from task_instance" """
+    )
+    return stdout
+
+
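+# Read the DagRun state straight from the metadata database in the postgres pod.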
+def get_dag_run_state(dag_id, run_id, postgres_pod=None):
+    postgres_pod = postgres_pod or _get_postgres_pod()
+    stdout, stderr = run_command_in_pod(
+        postgres_pod, "postgres",
+        """psql airflow -c "select state from dag_run where dag_id='{dag_id}' 
and
+         run_id='{run_id}'" """.format(
+            dag_id=dag_id, run_id=run_id
+        )
+    )
+    return _parse_state(stdout)
+
+
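+# Poll until the DagRun leaves the RUNNING state or the timeout expires.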
+def dag_final_state(dag_id, run_id, postgres_pod=None, poll_interval=1, timeout=120):
+    postgres_pod = postgres_pod or _get_postgres_pod()
+    for _ in range(0, int(timeout / poll_interval)):  # int() keeps range() valid under Python 3 division
+        dag_state = get_dag_run_state(dag_id, run_id, postgres_pod)
+        if dag_state != DagRunState.RUNNING:
+            capture_logs_for_failure(dag_state)
+            return dag_state
+        time.sleep(poll_interval)
+
+    raise TimeoutError(
+        "Timed out while waiting for DagRun with dag_id: {} run_id: 
{}".format(dag_id,
+                                                                               
run_id))
+
+
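+# Delete a pod with kubectl; used below to simulate a scheduler crash mid-run,
+# assuming the pod is managed by a controller that recreates it.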
+def _kill_pod(pod_name):
+    return run_command("kubectl delete pod 
{pod_name}".format(pod_name=pod_name))
+
+
+def kill_scheduler():
+    airflow_pod = _get_pod_by_grep("^airflow")
+    return _kill_pod(airflow_pod)
+
+
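+# Dump scheduler logs and the dag_run/task_instance tables whenever a run did not succeed.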
+def capture_logs_for_failure(state):
+    if state != DagRunState.SUCCESS:
+        stdout, stderr = get_scheduler_logs()
+        print("stdout:")
+        for line in stdout.split('\n'):
+            print(line)
+        print("stderr:")
+        for line in stderr.split('\n'):
+            print(line)
+        print("dag_run:")
+        print(get_dag_run_table())
+        print("task_instance")
+        print(get_task_instance_table())

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/tests/contrib/minikube_tests/integration/test_kubernetes_executor_integration.py
----------------------------------------------------------------------
diff --git a/tests/contrib/minikube_tests/integration/test_kubernetes_executor_integration.py b/tests/contrib/minikube_tests/integration/test_kubernetes_executor_integration.py
new file mode 100644
index 0000000..602a717
--- /dev/null
+++ b/tests/contrib/minikube_tests/integration/test_kubernetes_executor_integration.py
@@ -0,0 +1,69 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import time
+import unittest
+from uuid import uuid4
+
+from tests.contrib.minikube_tests.integration.airflow_controller import (
+    DagRunState, RunCommandError, dag_final_state, get_dag_run_state,
+    kill_scheduler, run_command, run_dag)
+
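+# Skip the whole suite when kubectl cannot reach a Kubernetes cluster.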
+try:
+    run_command("kubectl get pods")
+except RunCommandError:
+    SKIP_KUBE = True
+else:
+    SKIP_KUBE = False
+
+
+class KubernetesExecutorTest(unittest.TestCase):
+    @unittest.skipIf(SKIP_KUBE,
+                     'Kubernetes integration tests are unsupported by this configuration')
+    def test_kubernetes_executor_dag_runs_successfully(self):
+        dag_id, run_id = "example_python_operator", uuid4().hex
+        run_dag(dag_id, run_id)
+        state = dag_final_state(dag_id, run_id, timeout=120)
+        self.assertEqual(state, DagRunState.SUCCESS)
+
+    @unittest.skipIf(SKIP_KUBE,
+                     'Kubernetes integration tests are unsupported by this configuration')
+    def test_start_dag_then_kill_scheduler_then_ensure_dag_succeeds(self):
+        dag_id, run_id = "example_python_operator", uuid4().hex
+        run_dag(dag_id, run_id)
+
+        self.assertEqual(get_dag_run_state(dag_id, run_id), DagRunState.RUNNING)
+
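+        # Give the scheduler time to start launching task pods before it is killed below.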
+        time.sleep(10)
+
+        kill_scheduler()
+
+        self.assertEqual(dag_final_state(dag_id, run_id, timeout=180),
+                         DagRunState.SUCCESS)
+
+    @unittest.skipIf(SKIP_KUBE,
+                     'Kubernetes integration tests are unsupported by this configuration')
+    def test_kubernetes_executor_config_works(self):
+        dag_id, run_id = "example_kubernetes_executor", uuid4().hex
+        run_dag(dag_id, run_id)
+
+        self.assertEqual(get_dag_run_state(dag_id, run_id), DagRunState.RUNNING)
+        self.assertEqual(dag_final_state(dag_id, run_id, timeout=300),
+                         DagRunState.SUCCESS)
+
+
+if __name__ == "__main__":
+    unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/tests/core.py
----------------------------------------------------------------------
diff --git a/tests/core.py b/tests/core.py
index e4dffcf..24c24fe 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -71,7 +71,7 @@ from jinja2 import UndefinedError
 
 import six
 
-NUM_EXAMPLE_DAGS = 19
+NUM_EXAMPLE_DAGS = 20
 DEV_NULL = '/dev/null'
 TEST_DAG_FOLDER = os.path.join(
     os.path.dirname(os.path.realpath(__file__)), 'dags')

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index 615ca9a..9eb166b 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -161,7 +161,8 @@ class BackfillJobTest(unittest.TestCase):
             'example_trigger_target_dag',
             'example_trigger_controller_dag',  # tested above
             'test_utils',  # sleeps forever
-            'example_kubernetes_executor'  # requires kubernetes cluster
+            'example_kubernetes_executor',  # requires kubernetes cluster
+            'example_kubernetes_operator'  # requires kubernetes cluster
         ]
 
         logger = logging.getLogger('BackfillJobTest.test_backfill_examples')
