Repository: incubator-airflow
Updated Branches:
  refs/heads/master b48bbbd6f -> 1abe7f6d5


[AIRFLOW-1517] Kubernetes Operator


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/78ff2fc1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/78ff2fc1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/78ff2fc1

Branch: refs/heads/master
Commit: 78ff2fc180808a38c53cc643bd87c509d7540b4a
Parents: c0dffb5
Author: Daniel Imberman <[email protected]>
Authored: Thu Dec 7 09:41:05 2017 -0600
Committer: Daniel Imberman <[email protected]>
Committed: Tue Dec 26 08:45:31 2017 -0800

----------------------------------------------------------------------
 .travis.yml                                     |   5 +-
 airflow/contrib/kubernetes/__init__.py          |  13 +
 airflow/contrib/kubernetes/kube_client.py       |  34 +++
 .../kubernetes_request_factory/__init__.py      |  12 +
 .../kubernetes_request_factory.py               | 162 +++++++++++
 .../pod_request_factory.py                      |  56 ++++
 airflow/contrib/kubernetes/pod.py               |  92 ++++++
 airflow/contrib/kubernetes/pod_generator.py     | 278 +++++++++++++++++++
 airflow/contrib/kubernetes/pod_launcher.py      | 119 ++++++++
 airflow/contrib/kubernetes/secret.py            |  36 +++
 .../operators/kubernetes_pod_operator.py        |  71 +++++
 .../ci/kubernetes/minikube/start_minikube.sh    |  53 ++++
 scripts/ci/kubernetes/setup_kubernetes.sh       |  28 ++
 scripts/ci/requirements.txt                     |   1 +
 scripts/ci/travis_script.sh                     |  39 +++
 setup.py                                        |   6 +-
 .../operators/test_kubernetes_pod_operator.py   |  69 +++++
 17 files changed, 1069 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/78ff2fc1/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index 9b173a9..6b45153 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -89,9 +89,6 @@ before_script:
   - psql -c 'create database airflow;' -U postgres
   - export PATH=${PATH}:/tmp/hive/bin
 script:
-  - pip --version
-  - ls -l $HOME/.wheelhouse
-  - tox --version
-  - tox -e $TOX_ENV
+  - ./scripts/ci/travis_script.sh
 after_success:
   - codecov

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/78ff2fc1/airflow/contrib/kubernetes/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/__init__.py b/airflow/contrib/kubernetes/__init__.py
new file mode 100644
index 0000000..9d7677a
--- /dev/null
+++ b/airflow/contrib/kubernetes/__init__.py
@@ -0,0 +1,13 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/78ff2fc1/airflow/contrib/kubernetes/kube_client.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kube_client.py b/airflow/contrib/kubernetes/kube_client.py
new file mode 100644
index 0000000..7dc895e
--- /dev/null
+++ b/airflow/contrib/kubernetes/kube_client.py
@@ -0,0 +1,34 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+def load_kube_config(in_cluster=True):
+    from kubernetes import config, client
+    if in_cluster:
+        config.load_incluster_config()
+    else:
+        try:
+            config.load_kube_config()
+            return client.CoreV1Api()
+        except NotImplementedError:
+            raise NotImplementedError(
+                "requires incluster config or defined configuration in airflow.cfg")
+
+
+def get_kube_client(in_cluster=True):
+    # TODO: This should also allow people to point to a cluster.
+
+    from kubernetes import client
+    load_kube_config(in_cluster)
+    return client.CoreV1Api()

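A minimal usage sketch for the client helper above (the namespace value is illustrative; a reachable cluster and the kubernetes Python package are assumed):

    from airflow.contrib.kubernetes.kube_client import get_kube_client

    # Returns a kubernetes.client.CoreV1Api instance; in_cluster=False falls back
    # to the local kubeconfig (e.g. ~/.kube/config).
    api = get_kube_client(in_cluster=False)
    print([p.metadata.name for p in api.list_namespaced_pod('default').items])
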
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/78ff2fc1/airflow/contrib/kubernetes/kubernetes_request_factory/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/__init__.py b/airflow/contrib/kubernetes/kubernetes_request_factory/__init__.py
new file mode 100644
index 0000000..9921696
--- /dev/null
+++ b/airflow/contrib/kubernetes/kubernetes_request_factory/__init__.py
@@ -0,0 +1,12 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/78ff2fc1/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
new file mode 100644
index 0000000..9398bef
--- /dev/null
+++ b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
@@ -0,0 +1,162 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from abc import ABCMeta, abstractmethod
+import six
+
+
+class KubernetesRequestFactory:
+    """
+    Create requests to be sent to the Kubernetes API.
+    Extend this class to talk to Kubernetes and generate your specific resources.
+    This is the equivalent of generating yaml files that can be used by `kubectl`.
+    """
+    __metaclass__ = ABCMeta
+
+    @abstractmethod
+    def create(self, pod):
+        """
+        Creates the request for kubernetes API.
+
+        :param pod: The pod object
+        """
+        pass
+
+    @staticmethod
+    def extract_image(pod, req):
+        req['spec']['containers'][0]['image'] = pod.image
+
+    @staticmethod
+    def extract_image_pull_policy(pod, req):
+        if pod.image_pull_policy:
+            req['spec']['containers'][0]['imagePullPolicy'] = pod.image_pull_policy
+
+    @staticmethod
+    def add_secret_to_env(env, secret):
+        env.append({
+            'name': secret.deploy_target,
+            'valueFrom': {
+                'secretKeyRef': {
+                    'name': secret.secret,
+                    'key': secret.key
+                }
+            }
+        })
+
+    @staticmethod
+    def extract_labels(pod, req):
+        req['metadata']['labels'] = req['metadata'].get('labels', {})
+        for k, v in six.iteritems(pod.labels):
+            req['metadata']['labels'][k] = v
+
+    @staticmethod
+    def extract_cmds(pod, req):
+        req['spec']['containers'][0]['command'] = pod.cmds
+
+    @staticmethod
+    def extract_args(pod, req):
+        req['spec']['containers'][0]['args'] = pod.args
+
+    @staticmethod
+    def extract_node_selector(pod, req):
+        if len(pod.node_selectors) > 0:
+            req['spec']['nodeSelector'] = pod.node_selectors
+
+    @staticmethod
+    def attach_volumes(pod, req):
+        req['spec']['volumes'] = pod.volumes
+
+    @staticmethod
+    def attach_volume_mounts(pod, req):
+        if len(pod.volume_mounts) > 0:
+            req['spec']['containers'][0]['volumeMounts'] = (
+                req['spec']['containers'][0].get('volumeMounts', []))
+            req['spec']['containers'][0]['volumeMounts'].extend(pod.volume_mounts)
+
+    @staticmethod
+    def extract_name(pod, req):
+        req['metadata']['name'] = pod.name
+
+    @staticmethod
+    def extract_volume_secrets(pod, req):
+        vol_secrets = [s for s in pod.secrets if s.deploy_type == 'volume']
+        if any(vol_secrets):
+            req['spec']['containers'][0]['volumeMounts'] = []
+            req['spec']['volumes'] = []
+        for idx, vol in enumerate(vol_secrets):
+            vol_id = 'secretvol' + str(idx)
+            req['spec']['containers'][0]['volumeMounts'].append({
+                'mountPath': vol.deploy_target,
+                'name': vol_id,
+                'readOnly': True
+            })
+            req['spec']['volumes'].append({
+                'name': vol_id,
+                'secret': {
+                    'secretName': vol.secret
+                }
+            })
+
+    @staticmethod
+    def extract_env_and_secrets(pod, req):
+        env_secrets = [s for s in pod.secrets if s.deploy_type == 'env']
+        if len(pod.envs) > 0 or len(env_secrets) > 0:
+            env = []
+            for k in pod.envs.keys():
+                env.append({'name': k, 'value': pod.envs[k]})
+            for secret in env_secrets:
+                KubernetesRequestFactory.add_secret_to_env(env, secret)
+            req['spec']['containers'][0]['env'] = env
+
+    @staticmethod
+    def extract_resources(pod, req):
+        if not pod.resources or pod.resources.is_empty_resource_request():
+            return
+
+        req['spec']['containers'][0]['resources'] = {}
+
+        if pod.resources.has_requests():
+            req['spec']['containers'][0]['resources']['requests'] = {}
+            if pod.resources.request_memory:
+                req['spec']['containers'][0]['resources']['requests'][
+                    'memory'] = pod.resources.request_memory
+            if pod.resources.request_cpu:
+                req['spec']['containers'][0]['resources']['requests'][
+                    'cpu'] = pod.resources.request_cpu
+
+        if pod.resources.has_limits():
+            req['spec']['containers'][0]['resources']['limits'] = {}
+            if pod.resources.limit_memory:
+                req['spec']['containers'][0]['resources']['limits'][
+                    'memory'] = pod.resources.limit_memory
+            if pod.resources.limit_cpu:
+                req['spec']['containers'][0]['resources']['limits'][
+                    'cpu'] = pod.resources.limit_cpu
+
+    @staticmethod
+    def extract_init_containers(pod, req):
+        if pod.init_containers:
+            req['spec']['initContainers'] = pod.init_containers
+
+    @staticmethod
+    def extract_service_account_name(pod, req):
+        if pod.service_account_name:
+            req['spec']['serviceAccountName'] = pod.service_account_name
+
+    @staticmethod
+    def extract_image_pull_secrets(pod, req):
+        if pod.image_pull_secrets:
+            req['spec']['imagePullSecrets'] = [{
+                'name': pull_secret
+            } for pull_secret in pod.image_pull_secrets.split(',')]

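Each static helper above mutates a plain request dict in place. A small illustration (FakePod is a hypothetical stand-in; only the attributes used here are assumed):

    from airflow.contrib.kubernetes.kubernetes_request_factory.kubernetes_request_factory \
        import KubernetesRequestFactory

    class FakePod(object):  # hypothetical stand-in for the Pod class added in this commit
        image = 'ubuntu:16.04'
        labels = {'dag_id': 'example_dag'}

    req = {'metadata': {}, 'spec': {'containers': [{}]}}
    KubernetesRequestFactory.extract_image(FakePod(), req)
    KubernetesRequestFactory.extract_labels(FakePod(), req)
    # req now carries spec.containers[0].image and metadata.labels
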
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/78ff2fc1/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py b/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
new file mode 100644
index 0000000..3be1a13
--- /dev/null
+++ b/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
@@ -0,0 +1,56 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+
+import yaml
+from airflow.contrib.kubernetes.kubernetes_request_factory.kubernetes_request_factory \
+    import KubernetesRequestFactory
+
+
+class SimplePodRequestFactory(KubernetesRequestFactory):
+    """
+    Request generator for a simple pod.
+    """
+    _yaml = """apiVersion: v1
+kind: Pod
+metadata:
+  name: name
+spec:
+  containers:
+    - name: base
+      image: airflow-slave:latest
+      command: ["/usr/local/airflow/entrypoint.sh", "/bin/bash sleep 25"]
+  restartPolicy: Never
+    """
+
+    def __init__(self):
+        pass
+
+    def create(self, pod):
+        # type: (Pod) -> dict
+        req = yaml.load(self._yaml)
+        self.extract_name(pod, req)
+        self.extract_labels(pod, req)
+        self.extract_image(pod, req)
+        self.extract_image_pull_policy(pod, req)
+        self.extract_cmds(pod, req)
+        self.extract_args(pod, req)
+        self.extract_node_selector(pod, req)
+        self.extract_env_and_secrets(pod, req)
+        self.extract_volume_secrets(pod, req)
+        self.attach_volumes(pod, req)
+        self.attach_volume_mounts(pod, req)
+        self.extract_resources(pod, req)
+        self.extract_service_account_name(pod, req)
+        self.extract_init_containers(pod, req)
+        self.extract_image_pull_secrets(pod, req)
+        return req

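A sketch of how SimplePodRequestFactory turns a Pod into the dict that is later posted to the Kubernetes API (all values are illustrative):

    import json

    from airflow.contrib.kubernetes.kubernetes_request_factory.pod_request_factory \
        import SimplePodRequestFactory
    from airflow.contrib.kubernetes.pod import Pod

    pod = Pod(image='ubuntu:16.04',
              envs={'GREETING': 'hello'},
              cmds=['bash', '-cx'],
              args=['echo $GREETING'],
              name='example-pod')
    req = SimplePodRequestFactory().create(pod)
    # The resulting dict mirrors what a kubectl yaml spec would describe.
    print(json.dumps(req, indent=2))
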
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/78ff2fc1/airflow/contrib/kubernetes/pod.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/pod.py b/airflow/contrib/kubernetes/pod.py
new file mode 100644
index 0000000..6a9f76d
--- /dev/null
+++ b/airflow/contrib/kubernetes/pod.py
@@ -0,0 +1,92 @@
+
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+class Resources:
+    def __init__(
+            self,
+            request_memory=None,
+            request_cpu=None,
+            limit_memory=None,
+            limit_cpu=None):
+        self.request_memory = request_memory
+        self.request_cpu = request_cpu
+        self.limit_memory = limit_memory
+        self.limit_cpu = limit_cpu
+
+    def is_empty_resource_request(self):
+        return not self.has_limits() and not self.has_requests()
+
+    def has_limits(self):
+        return self.limit_cpu is not None or self.limit_memory is not None
+
+    def has_requests(self):
+        return self.request_cpu is not None or self.request_memory is not None
+
+
+class Pod:
+    """
+    Represents a kubernetes pod and manages execution of a single pod.
+    :param image: The docker image
+    :type image: str
+    :param envs: A dict containing the environment variables
+    :type envs: dict
+    :param cmds: The command to be run on the pod
+    :type cmds: list[str]
+    :param secrets: Secrets to be launched to the pod
+    :type secrets: list[Secret]
+    :param result: The result that will be returned to the operator after
+                   successful execution of the pod
+    :type result: any
+    """
+    pod_timeout = 3600
+
+    def __init__(
+            self,
+            image,
+            envs,
+            cmds,
+            args=None,
+            secrets=None,
+            labels=None,
+            node_selectors=None,
+            name=None,
+            volumes=None,
+            volume_mounts=None,
+            namespace='default',
+            result=None,
+            image_pull_policy="IfNotPresent",
+            image_pull_secrets=None,
+            init_containers=None,
+            service_account_name=None,
+            resources=None
+    ):
+        self.image = image
+        self.envs = envs or {}
+        self.cmds = cmds
+        self.args = args or []
+        self.secrets = secrets or []
+        self.result = result
+        self.labels = labels or {}
+        self.name = name
+        self.volumes = volumes or []
+        self.volume_mounts = volume_mounts or []
+        self.node_selectors = node_selectors or []
+        self.namespace = namespace
+        self.image_pull_policy = image_pull_policy
+        self.image_pull_secrets = image_pull_secrets
+        self.init_containers = init_containers
+        self.service_account_name = service_account_name
+        self.resources = resources or Resources()

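For illustration, a Pod carrying resource requests and limits might be built like this (all values are arbitrary):

    from airflow.contrib.kubernetes.pod import Pod, Resources

    resources = Resources(request_memory='256Mi', request_cpu='500m',
                          limit_memory='512Mi', limit_cpu='1')
    pod = Pod(image='ubuntu:16.04',
              envs={'ENVIRONMENT': 'dev'},
              cmds=['bash', '-cx'],
              args=['echo hello'],
              name='resource-example',
              resources=resources)
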
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/78ff2fc1/airflow/contrib/kubernetes/pod_generator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/pod_generator.py b/airflow/contrib/kubernetes/pod_generator.py
new file mode 100644
index 0000000..685be37
--- /dev/null
+++ b/airflow/contrib/kubernetes/pod_generator.py
@@ -0,0 +1,278 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+
+from airflow.contrib.kubernetes.pod import Pod
+import uuid
+
+
+class PodGenerator:
+    """Contains Kubernetes Airflow Worker configuration logic"""
+
+    def __init__(self, kube_config=None):
+        self.kube_config = kube_config
+        self.env_vars = {}
+        self.volumes = []
+        self.volume_mounts = []
+        self.init_containers = []
+        self.secrets = []
+
+    def add_init_container(self,
+                           name,
+                           image,
+                           securityContext,
+                           init_environment,
+                           volume_mounts
+                           ):
+        """
+        Adds an init container to the launched pod; useful for work that must
+        run before the main container starts (e.g. the git-sync container used
+        below to fetch DAGs).
+
+        Args:
+            name (str):
+            image (str):
+            securityContext (dict):
+            init_environment (dict):
+            volume_mounts (dict):
+
+        Returns:
+
+        """
+        self.init_containers.append(
+            {
+                'name': name,
+                'image': image,
+                'securityContext': securityContext,
+                'env': init_environment,
+                'volumeMounts': volume_mounts
+            }
+        )
+
+    def _get_init_containers(self):
+        return self.init_containers
+
+    def add_volume(self, name):
+        """
+
+        Args:
+            name (str):
+
+        Returns:
+
+        """
+        self.volumes.append({'name': name})
+
+    def add_volume_with_configmap(self, name, config_map):
+        self.volumes.append(
+            {
+                'name': name,
+                'configMap': config_map
+            }
+        )
+
+    def add_mount(self,
+                  name,
+                  mount_path,
+                  sub_path,
+                  read_only):
+        """
+
+        Args:
+            name (str):
+            mount_path (str):
+            sub_path (str):
+            read_only:
+
+        Returns:
+
+        """
+
+        self.volume_mounts.append({
+            'name': name,
+            'mountPath': mount_path,
+            'subPath': sub_path,
+            'readOnly': read_only
+        })
+
+    def _get_volumes_and_mounts(self):
+        return self.volumes, self.volume_mounts
+
+    def _get_image_pull_secrets(self):
+        """Extracts any image pull secrets for fetching container(s)"""
+        if not self.kube_config.image_pull_secrets:
+            return []
+        return self.kube_config.image_pull_secrets.split(',')
+
+    def make_pod(self, namespace, image, pod_id, cmds,
+                 arguments, labels, kube_executor_config=None):
+        volumes, volume_mounts = self._get_volumes_and_mounts()
+        worker_init_container_spec = self._get_init_containers()
+
+        # resources = Resources(
+        #     request_memory=kube_executor_config.request_memory,
+        #     request_cpu=kube_executor_config.request_cpu,
+        #     limit_memory=kube_executor_config.limit_memory,
+        #     limit_cpu=kube_executor_config.limit_cpu
+        # )
+
+        return Pod(
+            namespace=namespace,
+            name=pod_id + "-" + str(uuid.uuid1())[:8],
+            image=image,
+            cmds=cmds,
+            args=arguments,
+            labels=labels,
+            envs=self.env_vars,
+            secrets={},
+            # service_account_name=self.kube_config.worker_service_account_name,
+            # image_pull_secrets=self.kube_config.image_pull_secrets,
+            init_containers=worker_init_container_spec,
+            volumes=volumes,
+            volume_mounts=volume_mounts,
+            resources=None
+        )
+
+
+'''
+This class is a necessary building block for the Kubernetes executor, which
+will be PR'd shortly.
+'''
+
+
+class WorkerGenerator(PodGenerator):
+    def __init__(self, kube_config):
+        PodGenerator.__init__(self, kube_config)
+        self.volumes, self.volume_mounts = self._init_volumes_and_mounts()
+        self.init_containers = self._init_init_containers(self.volume_mounts)
+
+    def _init_volumes_and_mounts(self):
+        dags_volume_name = "airflow-dags"
+        dags_path = os.path.join(self.kube_config.dags_folder,
+                                 self.kube_config.git_subpath)
+        volumes = [{
+            'name': dags_volume_name
+        }]
+        volume_mounts = [{
+            'name': dags_volume_name,
+            'mountPath': dags_path,
+            'readOnly': True
+        }]
+
+        # Mount the airflow.cfg file via a configmap the user has specified
+        if self.kube_config.airflow_configmap:
+            config_volume_name = "airflow-config"
+            config_path = '{}/airflow.cfg'.format(self.kube_config.airflow_home)
+            volumes.append({
+                'name': config_volume_name,
+                'configMap': {
+                    'name': self.kube_config.airflow_configmap
+                }
+            })
+            volume_mounts.append({
+                'name': config_volume_name,
+                'mountPath': config_path,
+                'subPath': 'airflow.cfg',
+                'readOnly': True
+            })
+
+        # A PV with the DAGs should be mounted
+        if self.kube_config.dags_volume_claim:
+            volumes[0]['persistentVolumeClaim'] = {
+                "claimName": self.kube_config.dags_volume_claim}
+            if self.kube_config.dags_volume_subpath:
+                volume_mounts[0]["subPath"] = 
self.kube_config.dags_volume_subpath
+        else:
+            # Create a Shared Volume for the Git-Sync module to populate
+            volumes[0]["emptyDir"] = {}
+        return volumes, volume_mounts
+
+    def _init_labels(self, dag_id, task_id, execution_date):
+        return {
+            "airflow-slave": "",
+            "dag_id": dag_id,
+            "task_id": task_id,
+            "execution_date": execution_date
+        }
+
+    def _get_environment(self):
+        """Defines any necessary environment variables for the pod executor"""
+        env = self.env_vars
+        env['AIRFLOW__CORE__DAGS_FOLDER'] = '/tmp/dags'
+        env['AIRFLOW__CORE__EXECUTOR'] = 'LocalExecutor'
+
+        if self.kube_config.airflow_configmap:
+            env['AIRFLOW__CORE__AIRFLOW_HOME'] = self.kube_config.airflow_home
+        return env
+
+    def _init_init_containers(self, volume_mounts):
+        """When using git to retrieve the DAGs, use the GitSync Init 
Container"""
+        # If we're using volume claims to mount the dags, no init container is 
needed
+        if self.kube_config.dags_volume_claim:
+            return []
+
+        # Otherwise, define a git-sync init container
+        init_environment = [{
+            'name': 'GIT_SYNC_REPO',
+            'value': self.kube_config.git_repo
+        }, {
+            'name': 'GIT_SYNC_BRANCH',
+            'value': self.kube_config.git_branch
+        }, {
+            'name': 'GIT_SYNC_ROOT',
+            'value': '/tmp'
+        }, {
+            'name': 'GIT_SYNC_DEST',
+            'value': 'dags'
+        }, {
+            'name': 'GIT_SYNC_ONE_TIME',
+            'value': 'true'
+        }]
+        if self.kube_config.git_user:
+            init_environment.append({
+                'name': 'GIT_SYNC_USERNAME',
+                'value': self.kube_config.git_user
+            })
+        if self.kube_config.git_password:
+            init_environment.append({
+                'name': 'GIT_SYNC_PASSWORD',
+                'value': self.kube_config.git_password
+            })
+
+        volume_mounts[0]['readOnly'] = False
+        return [{
+            'name': self.kube_config.git_sync_init_container_name,
+            'image': self.kube_config.git_sync_container,
+            'securityContext': {'runAsUser': 0},
+            'env': init_environment,
+            'volumeMounts': volume_mounts
+        }]
+
+    def make_worker_pod(self,
+                        namespace,
+                        pod_id,
+                        dag_id,
+                        task_id,
+                        execution_date,
+                        airflow_command,
+                        kube_executor_config):
+        cmds = ["bash", "-cx", "--"]
+        labels = self._init_labels(dag_id, task_id, execution_date)
+        return PodGenerator.make_pod(self,
+                              namespace=namespace,
+                              pod_id=pod_id,
+                              cmds=cmds,
+                              arguments=airflow_command,
+                              labels=labels,
+                              kube_executor_config=kube_executor_config)

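A short sketch of the PodGenerator builder above (names and paths are illustrative; make_pod itself does not need a kube_config):

    from airflow.contrib.kubernetes.pod_generator import PodGenerator

    gen = PodGenerator()
    gen.add_volume('shared-data')
    gen.add_mount(name='shared-data', mount_path='/shared',
                  sub_path=None, read_only=False)
    pod = gen.make_pod(namespace='default',
                       image='ubuntu:16.04',
                       pod_id='example',
                       cmds=['bash', '-cx'],
                       arguments=['echo hello'],
                       labels={'foo': 'bar'})
    # pod.name gets a random suffix, e.g. "example-1a2b3c4d"
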
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/78ff2fc1/airflow/contrib/kubernetes/pod_launcher.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/pod_launcher.py b/airflow/contrib/kubernetes/pod_launcher.py
new file mode 100644
index 0000000..c910929
--- /dev/null
+++ b/airflow/contrib/kubernetes/pod_launcher.py
@@ -0,0 +1,119 @@
+
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import time
+from airflow.utils.log.logging_mixin import LoggingMixin
+from airflow.utils.state import State
+from datetime import datetime as dt
+from airflow.contrib.kubernetes.kubernetes_request_factory import \
+    pod_request_factory as pod_fac
+from kubernetes import watch
+from kubernetes.client.rest import ApiException
+from airflow import AirflowException
+
+from .kube_client import get_kube_client
+
+
+class PodStatus(object):
+    PENDING = 'pending'
+    RUNNING = 'running'
+    FAILED = 'failed'
+    SUCCEEDED = 'succeeded'
+
+
+class PodLauncher(LoggingMixin):
+    def __init__(self, kube_client=None):
+        super(PodLauncher, self).__init__()
+        self._client = kube_client or get_kube_client()
+        self._watch = watch.Watch()
+        self.kube_req_factory = pod_fac.SimplePodRequestFactory()
+
+    def run_pod_async(self, pod):
+        req = self.kube_req_factory.create(pod)
+        self.log.debug('Pod Creation Request: \n{}'.format(json.dumps(req, indent=2)))
+        try:
+            resp = self._client.create_namespaced_pod(body=req, namespace=pod.namespace)
+            self.log.debug('Pod Creation Response: {}'.format(resp))
+        except ApiException:
+            self.log.exception('Exception when attempting to create Namespaced Pod.')
+            raise
+        return resp
+
+    def run_pod(self, pod, startup_timeout=120):
+        # type: (Pod) -> State
+        """
+        Launches the pod synchronously and waits for completion.
+
+        Args:
+            pod (Pod):
+            startup_timeout (int): Timeout for startup of the pod (if the pod
+                is pending for too long, the task is considered a failure)
+        """
+        resp = self.run_pod_async(pod)
+        curr_time = dt.now()
+        if resp.status.start_time is None:
+            while self.pod_not_started(pod):
+                delta = dt.now() - curr_time
+                if delta.seconds >= startup_timeout:
+                    raise AirflowException("Pod took too long to start")
+                time.sleep(1)
+            self.log.debug('Pod not yet started')
+
+        final_status = self._monitor_pod(pod)
+        return final_status
+
+    def _monitor_pod(self, pod):
+        # type: (Pod) -> State
+
+        while self.pod_is_running(pod):
+            self.log.info("Pod {} has state {}".format(pod.name, 
State.RUNNING))
+            time.sleep(2)
+        return self._task_status(self.read_pod(pod))
+
+    def _task_status(self, event):
+        # type: (V1Pod) -> State
+        self.log.info(
+            "Event: {} had an event of type {}".format(event.metadata.name,
+                                                       event.status.phase))
+        status = self.process_status(event.metadata.name, event.status.phase)
+        return status
+
+    def pod_not_started(self, pod):
+        state = self._task_status(self.read_pod(pod))
+        return state == State.QUEUED
+
+    def pod_is_running(self, pod):
+        state = self._task_status(self.read_pod(pod))
+        return state != State.SUCCESS and state != State.FAILED
+
+    def read_pod(self, pod):
+        return self._client.read_namespaced_pod(pod.name, pod.namespace)
+
+    def process_status(self, job_id, status):
+        status = status.lower()
+        if status == PodStatus.PENDING:
+            return State.QUEUED
+        elif status == PodStatus.FAILED:
+            self.log.info("Event: {} Failed".format(job_id))
+            return State.FAILED
+        elif status == PodStatus.SUCCEEDED:
+            self.log.info("Event: {} Succeeded".format(job_id))
+            return State.SUCCESS
+        elif status == PodStatus.RUNNING:
+            return State.RUNNING
+        else:
+            self.log.info("Event: Invalid state {} on job {}".format(status, 
job_id))
+            return State.FAILED

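PodLauncher ties the request factory and the kube client together; a blocking run could look like this (assumes a reachable cluster, illustrative values):

    from airflow.contrib.kubernetes.pod import Pod
    from airflow.contrib.kubernetes.pod_launcher import PodLauncher
    from airflow.utils.state import State

    pod = Pod(image='ubuntu:16.04', envs={}, cmds=['bash', '-cx'],
              args=['echo hello'], name='launcher-example')
    launcher = PodLauncher()  # builds its own CoreV1Api via get_kube_client()
    final_state = launcher.run_pod(pod, startup_timeout=120)  # blocks until the pod finishes
    assert final_state == State.SUCCESS
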
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/78ff2fc1/airflow/contrib/kubernetes/secret.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/secret.py b/airflow/contrib/kubernetes/secret.py
new file mode 100644
index 0000000..15f070e
--- /dev/null
+++ b/airflow/contrib/kubernetes/secret.py
@@ -0,0 +1,36 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+class Secret:
+    """Defines Kubernetes Secret Volume"""
+
+    def __init__(self, deploy_type, deploy_target, secret, key):
+        """Initialize a Kubernetes Secret Object. Used to track requested 
secrets from
+        the user.
+
+        :param deploy_type: The type of secret deploy in Kubernetes, either `env` or
+            `volume`
+        :type deploy_type: ``str``
+        :param deploy_target: The environment variable to be created in the worker.
+        :type deploy_target: ``str``
+        :param secret: Name of the secrets object in Kubernetes
+        :type secret: ``str``
+        :param key: Key of the secret within the Kubernetes Secret
+        :type key: ``str``
+        """
+        self.deploy_type = deploy_type
+        self.deploy_target = deploy_target.upper()
+        self.secret = secret
+        self.key = key

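For example, an 'env' secret exposing one key of a Kubernetes secret as an environment variable (names are illustrative; note the constructor upper-cases deploy_target):

    from airflow.contrib.kubernetes.secret import Secret

    # Exposes key 'sql_alchemy_conn' of the Kubernetes secret 'airflow-secrets'
    # as the environment variable SQL_CONN inside the container.
    sql_conn_secret = Secret(deploy_type='env',
                             deploy_target='sql_conn',
                             secret='airflow-secrets',
                             key='sql_alchemy_conn')
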
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/78ff2fc1/airflow/contrib/operators/kubernetes_pod_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/kubernetes_pod_operator.py b/airflow/contrib/operators/kubernetes_pod_operator.py
new file mode 100644
index 0000000..f09a25c
--- /dev/null
+++ b/airflow/contrib/operators/kubernetes_pod_operator.py
@@ -0,0 +1,71 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.utils.decorators import apply_defaults
+from airflow.contrib.kubernetes import kube_client, pod_generator, pod_launcher
+from airflow.utils.state import State
+
+
+class KubernetesPodOperator(BaseOperator):
+    template_fields = ('templates_dict',)
+    template_ext = tuple()
+    ui_color = '#ffefeb'
+
+    def execute(self, context):
+        try:
+
+            client = kube_client.get_kube_client(in_cluster=self.in_cluster)
+            gen = pod_generator.PodGenerator()
+
+            pod = gen.make_pod(namespace=self.namespace,
+                               image=self.image,
+                               pod_id=self.name,
+                               cmds=self.cmds,
+                               arguments=self.arguments,
+                               labels=self.labels,
+                               kube_executor_config=self.kube_executor_config
+                               )
+
+            launcher = pod_launcher.PodLauncher(client)
+            final_state = launcher.run_pod(pod, self.startup_timeout_seconds)
+            if final_state != State.SUCCESS:
+                raise AirflowException("Pod returned a failure")
+        except AirflowException as ex:
+            raise AirflowException("Pod Launching failed: 
{error}".format(error=ex))
+
+    @apply_defaults
+    def __init__(self,
+                 namespace,
+                 image,
+                 cmds,
+                 arguments,
+                 name,
+                 in_cluster=False,
+                 labels=None,
+                 startup_timeout_seconds=120,
+                 kube_executor_config=None,
+                 *args,
+                 **kwargs):
+        super(KubernetesPodOperator, self).__init__(*args, **kwargs)
+        self.kube_executor_config = kube_executor_config or {}
+        self.image = image
+        self.namespace = namespace
+        self.cmds = cmds
+        self.arguments = arguments
+        self.labels = labels or {}
+        self.startup_timeout_seconds = startup_timeout_seconds
+        self.name = name
+        self.in_cluster = in_cluster

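Used from a DAG, the operator might look like this (mirroring the test below; dag_id, schedule and task names are illustrative):

    from datetime import datetime

    from airflow import DAG
    from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

    dag = DAG('kubernetes_pod_example',
              start_date=datetime(2017, 12, 1),
              schedule_interval=None)

    echo_task = KubernetesPodOperator(namespace='default',
                                      image='ubuntu:16.04',
                                      cmds=['bash', '-cx'],
                                      arguments=['echo', '10'],
                                      labels={'foo': 'bar'},
                                      name='airflow-test-pod',
                                      in_cluster=False,
                                      task_id='echo_task',
                                      dag=dag)
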
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/78ff2fc1/scripts/ci/kubernetes/minikube/start_minikube.sh
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/minikube/start_minikube.sh b/scripts/ci/kubernetes/minikube/start_minikube.sh
new file mode 100755
index 0000000..f78cb3a
--- /dev/null
+++ b/scripts/ci/kubernetes/minikube/start_minikube.sh
@@ -0,0 +1,53 @@
+#!/usr/bin/env bash
+#  Licensed to the Apache Software Foundation (ASF) under one   *
+#  or more contributor license agreements.  See the NOTICE file *
+#  distributed with this work for additional information        *
+#  regarding copyright ownership.  The ASF licenses this file   *
+#  to you under the Apache License, Version 2.0 (the            *
+#  "License"); you may not use this file except in compliance   *
+#  with the License.  You may obtain a copy of the License at   *
+#                                                               *
+#    http://www.apache.org/licenses/LICENSE-2.0                 *
+#                                                               *
+#  Unless required by applicable law or agreed to in writing,   *
+#  software distributed under the License is distributed on an  *
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+#  KIND, either express or implied.  See the License for the    *
+#  specific language governing permissions and limitations      *
+#  under the License.                                           *
+
+# Guard against a kubernetes cluster already being up
+kubectl get pods &> /dev/null
+if [ $? -eq 0 ]; then
+  echo "kubectl get pods returned 0 exit code, exiting early"
+  exit 0
+fi
+#
+
+curl -Lo minikube https://storage.googleapis.com/minikube/releases/latest/minikube-linux-amd64 && chmod +x minikube
+curl -Lo kubectl https://storage.googleapis.com/kubernetes-release/release/v1.7.0/bin/linux/amd64/kubectl && chmod +x kubectl
+
+sudo mkdir -p /usr/local/bin
+sudo mv minikube /usr/local/bin/minikube
+sudo mv kubectl /usr/local/bin/kubectl
+
+export MINIKUBE_WANTUPDATENOTIFICATION=false
+export MINIKUBE_WANTREPORTERRORPROMPT=false
+export MINIKUBE_HOME=$HOME
+export CHANGE_MINIKUBE_NONE_USER=true
+mkdir $HOME/.kube || true
+touch $HOME/.kube/config
+
+export KUBECONFIG=$HOME/.kube/config
+sudo -E minikube start --vm-driver=none
+
+# this for loop waits until kubectl can access the api server that minikube has created
+for i in {1..150} # timeout for 5 minutes
+do
+  echo "------- Running kubectl get pods -------"
+  kubectl get po &> /dev/null
+  if [ $? -ne 1 ]; then
+    break
+  fi
+  sleep 2
+done

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/78ff2fc1/scripts/ci/kubernetes/setup_kubernetes.sh
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/setup_kubernetes.sh b/scripts/ci/kubernetes/setup_kubernetes.sh
new file mode 100755
index 0000000..fa4e523
--- /dev/null
+++ b/scripts/ci/kubernetes/setup_kubernetes.sh
@@ -0,0 +1,28 @@
+#  Licensed to the Apache Software Foundation (ASF) under one   *
+#  or more contributor license agreements.  See the NOTICE file *
+#  distributed with this work for additional information        *
+#  regarding copyright ownership.  The ASF licenses this file   *
+#  to you under the Apache License, Version 2.0 (the            *
+#  "License"); you may not use this file except in compliance   *
+#  with the License.  You may obtain a copy of the License at   *
+#                                                               *
+#    http://www.apache.org/licenses/LICENSE-2.0                 *
+#                                                               *
+#  Unless required by applicable law or agreed to in writing,   *
+#  software distributed under the License is distributed on an  *
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+#  KIND, either express or implied.  See the License for the    *
+#  specific language governing permissions and limitations      *
+#  under the License.                                           *
+
+set -o xtrace
+set -e
+
+echo "This script downloads minikube, starts a driver=None minikube cluster, 
builds the airflow source and docker image, and then deploys airflow onto 
kubernetes"
+echo "For development, start minikube yourself (ie: minikube start) then run 
this script as you probably do not want a driver=None minikube cluster"
+
+DIRNAME=$(cd "$(dirname "$0")"; pwd)
+
+$DIRNAME/minikube/start_minikube.sh
+
+echo "Airflow environment on kubernetes is good to go!"

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/78ff2fc1/scripts/ci/requirements.txt
----------------------------------------------------------------------
diff --git a/scripts/ci/requirements.txt b/scripts/ci/requirements.txt
index 2b5a8c9..b6ed49c 100644
--- a/scripts/ci/requirements.txt
+++ b/scripts/ci/requirements.txt
@@ -93,3 +93,4 @@ thrift
 thrift_sasl
 unicodecsv
 zdesk
+kubernetes

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/78ff2fc1/scripts/ci/travis_script.sh
----------------------------------------------------------------------
diff --git a/scripts/ci/travis_script.sh b/scripts/ci/travis_script.sh
new file mode 100755
index 0000000..a51e742
--- /dev/null
+++ b/scripts/ci/travis_script.sh
@@ -0,0 +1,39 @@
+#  Licensed to the Apache Software Foundation (ASF) under one   *
+#  or more contributor license agreements.  See the NOTICE file *
+#  distributed with this work for additional information        *
+#  regarding copyright ownership.  The ASF licenses this file   *
+#  to you under the Apache License, Version 2.0 (the            *
+#  "License"); you may not use this file except in compliance   *
+#  with the License.  You may obtain a copy of the License at   *
+#                                                               *
+#    http://www.apache.org/licenses/LICENSE-2.0                 *
+#                                                               *
+#  Unless required by applicable law or agreed to in writing,   *
+#  software distributed under the License is distributed on an  *
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+#  KIND, either express or implied.  See the License for the    *
+#  specific language governing permissions and limitations      *
+#  under the License.                                           *
+
+DIRNAME=$(cd "$(dirname "$0")"; pwd)
+AIRFLOW_ROOT="$DIRNAME/../.."
+cd $AIRFLOW_ROOT && pip --version && ls -l $HOME/.wheelhouse && tox --version
+
+if [ -z "$RUN_KUBE_INTEGRATION" ];
+then
+  $DIRNAME/kubernetes/setup_kubernetes.sh
+  tox -e $TOX_ENV
+else
+  $DIRNAME/kubernetes/setup_kubernetes.sh && \
+  tox -e $TOX_ENV -- tests.contrib.executors.integration \
+                     --with-coverage \
+                     --cover-erase \
+                     --cover-html \
+                     --cover-package=airflow \
+                     --cover-html-dir=airflow/www/static/coverage \
+                     --with-ignore-docstrings \
+                     --rednose \
+                     --with-timer \
+                     -v \
+                     --logging-level=DEBUG
+fi

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/78ff2fc1/setup.py
----------------------------------------------------------------------
diff --git a/setup.py b/setup.py
index 77e79bf..ba5b031 100644
--- a/setup.py
+++ b/setup.py
@@ -162,6 +162,8 @@ github_enterprise = ['Flask-OAuthlib>=0.9.1']
 qds = ['qds-sdk>=1.9.6']
 cloudant = ['cloudant>=0.5.9,<2.0'] # major update coming soon, clamp to 0.x
 redis = ['redis>=2.10.5']
+kubernetes = ['kubernetes>=3.0.0',
+              'cryptography>=2.0.0']
 
 all_dbs = postgres + mysql + hive + mssql + hdfs + vertica + cloudant
 devel = [
@@ -182,7 +184,8 @@ devel = [
 ]
 devel_minreq = devel + mysql + doc + password + s3 + cgroups
 devel_hadoop = devel_minreq + hive + hdfs + webhdfs + kerberos
-devel_all = devel + all_dbs + doc + samba + s3 + slack + crypto + oracle + docker + ssh
+devel_all = (devel + all_dbs + doc + samba + s3 + slack + crypto + oracle + docker + ssh +
+             kubernetes)
 
 
 def do_setup():
@@ -278,6 +281,7 @@ def do_setup():
             'webhdfs': webhdfs,
             'jira': jira,
             'redis': redis,
+            'kubernetes': kubernetes
         },
         classifiers=[
             'Development Status :: 5 - Production/Stable',

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/78ff2fc1/tests/contrib/operators/test_kubernetes_pod_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_kubernetes_pod_operator.py b/tests/contrib/operators/test_kubernetes_pod_operator.py
new file mode 100644
index 0000000..205f183
--- /dev/null
+++ b/tests/contrib/operators/test_kubernetes_pod_operator.py
@@ -0,0 +1,69 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
+from airflow import AirflowException
+
+
+class KubernetesPodOperatorTest(unittest.TestCase):
+
+    def test_working_pod(self):
+        k = KubernetesPodOperator(namespace='default',
+                                  image="ubuntu:16.04",
+                                  cmds=["bash", "-cx"],
+                                  arguments=["echo", "10"],
+                                  labels={"foo": "bar"},
+                                  name="test",
+                                  task_id="task"
+                                  )
+
+        k.execute(None)
+
+    def test_faulty_image(self):
+        bad_image_name = "foobar"
+        k = KubernetesPodOperator(namespace='default',
+                                  image=bad_image_name,
+                                  cmds=["bash", "-cx"],
+                                  arguments=["echo", "10"],
+                                  labels={"foo": "bar"},
+                                  name="test",
+                                  task_id="task",
+                                  startup_timeout_seconds=5
+                                  )
+        with self.assertRaises(AirflowException) as cm:
+            k.execute(None)
+
+        print("exception: {}".format(cm.exception))
+
+    def test_pod_failure(self):
+        """
+            Tests that the task fails when a pod reports a failure
+        """
+
+        bad_internal_command = "foobar"
+        k = KubernetesPodOperator(namespace='default',
+                                  image="ubuntu:16.04",
+                                  cmds=["bash", "-cx"],
+                                  arguments=[bad_internal_command, "10"],
+                                  labels={"foo": "bar"},
+                                  name="test",
+                                  task_id="task"
+                                  )
+
+        with self.assertRaises(AirflowException):
+            k.execute(None)
