[AIRFLOW-1314] Improve k8s support

Add kubernetes config section in airflow.cfg and inject GCP secrets upon executor start. (#17)
Update Airflow to pass configuration to k8s containers, add some Py3 … (#9)

* Update Airflow to pass configuration to k8s containers, add some Py3 compat., create git-sync pod

* Undo changes to display-source config setter for to_dict

* WIP Secrets and Configmaps

* Improve secrets support for multiple secrets. Add support for registry secrets. Add support for RBAC service accounts.

* Swap order of variables, overlooked very basic issue

* Secret env var names must be upper

* Update logging

* Revert spothero test code in setup.py

* WIP Fix tests

* Worker should be using local executor

* Consolidate worker setup and address code review comments

* Reconfigure airflow script to use new secrets method


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/ad4e67ce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/ad4e67ce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/ad4e67ce

Branch: refs/heads/master
Commit: ad4e67ce1be114f248ecd5ea1468ccea79366590
Parents: a9d90dc
Author: fenglu-g <[email protected]>
Authored: Mon Oct 23 09:52:06 2017 -0700
Committer: Fokko Driesprong <[email protected]>
Committed: Sun Apr 22 10:23:06 2018 +0200

----------------------------------------------------------------------
 airflow/config_templates/default_airflow.cfg    |  63 ++++-
 .../contrib/executors/kubernetes_executor.py    | 239 ++++++++++---------
 .../contrib/kubernetes/kubernetes_factory.py    |   2 -
 .../kubernetes_request_factory.py               |  25 +-
 .../pod_request_factory.py                      |  28 ++-
 airflow/contrib/kubernetes/pod.py               |  13 +-
 airflow/contrib/kubernetes/pod_launcher.py      |  28 ++-
 airflow/contrib/kubernetes/secret.py            |  33 ++-
 .../contrib/kubernetes/worker_configuration.py  | 158 ++++++++++++
 airflow/jobs.py                                 |   5 +-
 airflow/utils/dag_processing.py                 |   2 +-
 scripts/ci/kubernetes/docker/Dockerfile         |   6 -
 .../ci/kubernetes/kube/airflow.yaml.template    | 134 +++++++----
 scripts/ci/kubernetes/kube/deploy.sh            |   7 +-
 14 files changed, 513 insertions(+), 230 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ad4e67ce/airflow/config_templates/default_airflow.cfg
----------------------------------------------------------------------
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index 400bcc0..6a966cb 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -5,9 +5,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -498,3 +498,62 @@ hide_sensitive_variable_fields = True
 
 [elasticsearch]
 elasticsearch_host =
+[kubernetes]
+# The repository and tag of the Kubernetes image for the worker to run
+worker_container_repository =
+worker_container_tag =
+
+# If True (default), worker pods will be deleted upon termination
+delete_worker_pods = True
+
+# The Kubernetes namespace where airflow workers should be created. Defaults to `default`
+namespace = default
+
+# The name of the Kubernetes ConfigMap containing the Airflow configuration (this file)
+airflow_configmap =
+
+# For either git sync or volume mounted DAGs, the worker will look in this subpath for DAGs
+dags_volume_subpath =
+
+# For DAGs mounted via a volume claim (mutually exclusive with git sync)
+dags_volume_claim =
+
+# Git credentials and repository for DAGs mounted via Git (mutually exclusive with volume claim)
+git_repo =
+git_branch =
+git_user =
+git_password =
+
+# For cloning DAGs from git repositories into volumes: https://github.com/kubernetes/git-sync
+git_sync_container_repository = gcr.io/google-containers/git-sync-amd64
+git_sync_container_tag = v2.0.5
+git_sync_init_container_name = git-sync-clone
+
+# The name of the Kubernetes service account to be associated with airflow workers, if any.
+# Service accounts are required for workers that require access to secrets or cluster resources.
+# See the Kubernetes RBAC documentation for more:
+#   https://kubernetes.io/docs/admin/authorization/rbac/
+worker_service_account_name =
+
+# Any image pull secrets to be given to worker pods. If more than one secret is
+# required, provide a comma separated list: secret_a,secret_b
+image_pull_secrets =
+
+# GCP Service Account Keys to be provided to tasks run on Kubernetes Executors
+# Should be supplied in the format: key-name-1=key-path-1,key-name-2=key-path-2
+gcp_service_account_keys =
+
+[kubernetes_secrets]
+# The scheduler mounts the following secrets into your workers as they are launched by the
+# scheduler. You may define as many secrets as needed and the kubernetes launcher will parse the
+# defined secrets and mount them as secret environment variables in the launched workers.
+# Secrets in this section are defined as follows
+#     <environment_variable_mount> = <kubernetes_secret_object>=<kubernetes_secret_key>
+#
+# For example if you wanted to mount a kubernetes secret key named `postgres_password` from the
+# kubernetes secret object `airflow-secret` as the environment variable `POSTGRES_PASSWORD` into
+# your workers you would follow the following format:
+#     POSTGRES_PASSWORD = airflow-secret=postgres_password
+#
+# Additionally you may override worker airflow settings with the AIRFLOW__<SECTION>__<KEY>
+# formatting as supported by airflow normally.

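To make the [kubernetes_secrets] format concrete, the parsing the launcher performs reduces to roughly the following (a minimal sketch mirroring WorkerConfiguration._get_secrets from this commit; the sample entry is illustrative):

    from airflow.contrib.kubernetes.secret import Secret

    # A [kubernetes_secrets] entry of the form:
    #     POSTGRES_PASSWORD = airflow-secret=postgres_password
    env_var_name = 'POSTGRES_PASSWORD'
    obj_key_pair = 'airflow-secret=postgres_password'

    # Split the value into the secret object name and the key within it,
    # then mount it as an environment variable in the worker pod.
    k8s_secret_obj, k8s_secret_key = obj_key_pair.split('=')
    secret = Secret('env', env_var_name, k8s_secret_obj, k8s_secret_key)
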
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ad4e67ce/airflow/contrib/executors/kubernetes_executor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/executors/kubernetes_executor.py b/airflow/contrib/executors/kubernetes_executor.py
index 8989add..9675e81 100644
--- a/airflow/contrib/executors/kubernetes_executor.py
+++ b/airflow/contrib/executors/kubernetes_executor.py
@@ -12,10 +12,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import calendar
-import logging
+import base64
 import os
 import multiprocessing
+import six
 from queue import Queue
 from dateutil import parser
 from uuid import uuid4
@@ -23,13 +23,13 @@ from kubernetes import watch, client
 from kubernetes.client.rest import ApiException
 from airflow.contrib.kubernetes.pod_launcher import PodLauncher
 from airflow.contrib.kubernetes.kube_client import get_kube_client
+from airflow.contrib.kubernetes.worker_configuration import WorkerConfiguration
 from airflow.executors.base_executor import BaseExecutor
 from airflow.models import TaskInstance, KubeResourceVersion
 from airflow.utils.state import State
 from airflow import configuration, settings
 from airflow.exceptions import AirflowConfigException
-from airflow.contrib.kubernetes.pod import Pod
-
+from airflow.utils.log.logging_mixin import LoggingMixin
 
 class KubeConfig:
     core_section = "core"
@@ -50,98 +50,84 @@ class KubeConfig:
             return default
 
     def __init__(self):
+        configuration_dict = configuration.as_dict(display_sensitive=True)
+        self.core_configuration = configuration_dict['core']
+        self.kube_secrets = configuration_dict.get('kubernetes_secrets', {})
+        self.airflow_home = configuration.get(self.core_section, 'airflow_home')
         self.dags_folder = configuration.get(self.core_section, 'dags_folder')
         self.parallelism = configuration.getint(self.core_section, 'PARALLELISM')
-        self.kube_image = configuration.get(self.kubernetes_section, 'container_image')
+        self.worker_container_repository = configuration.get(
+            self.kubernetes_section, 'worker_container_repository')
+        self.worker_container_tag = configuration.get(
+            self.kubernetes_section, 'worker_container_tag')
+        self.kube_image = '{}:{}'.format(
+            self.worker_container_repository, self.worker_container_tag)
         self.delete_worker_pods = self.safe_getboolean(self.kubernetes_section, 'delete_worker_pods', True)
-        self.kube_namespace = os.environ.get('AIRFLOW_KUBE_NAMESPACE', 'default')
 
-        # These two props must be set together
+        self.worker_service_account_name = self.safe_get(
+            self.kubernetes_section, 'worker_service_account_name', 'default')
+        self.image_pull_secrets = self.safe_get(
+            self.kubernetes_section, 'image_pull_secrets', '')
+
+        # NOTE: `git_repo` and `git_branch` must be specified together as a pair
+        # The http URL of the git repository to clone from
         self.git_repo = self.safe_get(self.kubernetes_section, 'git_repo', None)
+        # The branch of the repository to be checked out
         self.git_branch = self.safe_get(self.kubernetes_section, 'git_branch', None)
+        # Optionally, the directory in the git repository containing the dags
+        self.git_subpath = self.safe_get(self.kubernetes_section, 'git_subpath', '')
+
+        # Optionally a user may supply a `git_user` and `git_password` for private repositories
+        self.git_user = self.safe_get(self.kubernetes_section, 'git_user', None)
+        self.git_password = self.safe_get(self.kubernetes_section, 'git_password', None)
 
-        # Or this one prop
+        # NOTE: The user may optionally use a volume claim to mount a PV containing DAGs directly
         self.dags_volume_claim = self.safe_get(self.kubernetes_section, 'dags_volume_claim', None)
-        # And optionally this prop
+
+        # This prop may optionally be set for PV Claims and is used to locate DAGs on a SubPath
         self.dags_volume_subpath = self.safe_get(self.kubernetes_section, 'dags_volume_subpath', None)
 
+        # The Kubernetes Namespace in which the Scheduler and Webserver reside. Note that if your
+        # cluster has RBAC enabled, your scheduler may need service account permissions to
+        # create, watch, get, and delete pods in this namespace.
+        self.kube_namespace = self.safe_get(self.kubernetes_section, 'namespace', 'default')
+        # The Kubernetes Namespace in which pods will be created by the executor. Note that if your
+        # cluster has RBAC enabled, your workers may need service account permissions to
+        # interact with cluster components.
+        self.executor_namespace = self.safe_get(self.kubernetes_section, 'namespace', 'default')
+        # Task secrets managed by KubernetesExecutor.
+        self.gcp_service_account_keys = self.safe_get(self.kubernetes_section, 'gcp_service_account_keys', None)
+
+        # If the user is using the git-sync container to clone their repository via git,
+        # allow them to specify repository, tag, and pod name for the init container.
+        self.git_sync_container_repository = self.safe_get(
+            self.kubernetes_section, 'git_sync_container_repository',
+            'gcr.io/google-containers/git-sync-amd64')
+
+        self.git_sync_container_tag = self.safe_get(
+            self.kubernetes_section, 'git_sync_container_tag', 'v2.0.5')
+        self.git_sync_container = '{}:{}'.format(
+            self.git_sync_container_repository, self.git_sync_container_tag)
+
+        self.git_sync_init_container_name = self.safe_get(
+            self.kubernetes_section, 'git_sync_init_container_name', 'git-sync-clone')
+
+        # The worker pod may optionally have a valid Airflow config loaded via a configmap
+        self.airflow_configmap = self.safe_get(self.kubernetes_section, 'airflow_configmap', None)
+
         self._validate()
 
     def _validate(self):
-        if self.dags_volume_claim:
-            # do volume things
-            pass
-        elif self.git_repo and self.git_branch:
-            # do git things
-            pass
-        else:
+        if not self.dags_volume_claim and (not self.git_repo or not self.git_branch):
             raise AirflowConfigException(
                 "In kubernetes mode you must set the following configs in the 
`kubernetes` section: "
                 "`dags_volume_claim` or "
-                "`git_repo and git_branch`"
+                "`git_repo and git_branch` "
             )
 
 
-class PodMaker:
-    def __init__(self, kube_config):
-        self.logger = logging.getLogger(__name__)
-        self.kube_config = kube_config
-
-    def _get_volumes_and_mounts(self):
-        volume_name = "airflow-dags"
-
-        if self.kube_config.dags_volume_claim:
-            volumes = [{
-                "name": volume_name, "persistentVolumeClaim": {"claimName": 
self.kube_config.dags_volume_claim}
-            }]
-            volume_mounts = [{
-                "name": volume_name, "mountPath": self.kube_config.dags_folder,
-                "readOnly": True
-            }]
-            if self.kube_config.dags_volume_subpath:
-                volume_mounts[0]["subPath"] = self.kube_config.dags_volume_subpath
-
-            return volumes, volume_mounts
-        else:
-            return [], []
-
-    def _get_args(self, airflow_command):
-        if self.kube_config.dags_volume_claim:
-            self.logger.info("Using k8s_dags_volume_claim for airflow dags")
-            return [airflow_command]
-        else:
-            self.logger.info("Using git-syncher for airflow dags")
-            cmd_args = "mkdir -p {dags_folder} && cd {dags_folder} &&" \
-                       "git init && git remote add origin {git_repo} && git 
pull origin {git_branch} --depth=1 &&" \
-                       
"{command}".format(dags_folder=self.kube_config.dags_folder, 
git_repo=self.kube_config.git_repo,
-                                          
git_branch=self.kube_config.git_branch, command=airflow_command)
-            return [cmd_args]
-
-    def make_pod(self, namespace, pod_id, dag_id, task_id, execution_date, airflow_command):
-        volumes, volume_mounts = self._get_volumes_and_mounts()
-
-        pod = Pod(
-            namespace=namespace,
-            name=pod_id,
-            image=self.kube_config.kube_image,
-            cmds=["bash", "-cx", "--"],
-            args=self._get_args(airflow_command),
-            labels={
-                "airflow-slave": "",
-                "dag_id": dag_id,
-                "task_id": task_id,
-                "execution_date": execution_date
-            },
-            envs={"AIRFLOW__CORE__EXECUTOR": "LocalExecutor"},
-            volumes=volumes,
-            volume_mounts=volume_mounts
-        )
-        return pod
-
-
-class KubernetesJobWatcher(multiprocessing.Process, object):
+class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin, object):
     def __init__(self, namespace, watcher_queue, resource_version):
-        self.logger = logging.getLogger(__name__)
         multiprocessing.Process.__init__(self)
         self.namespace = namespace
         self.watcher_queue = watcher_queue
@@ -153,14 +139,14 @@ class KubernetesJobWatcher(multiprocessing.Process, object):
             try:
                 self.resource_version = self._run(kube_client, self.resource_version)
             except Exception:
-                self.logger.exception("Unknown error in KubernetesJobWatcher. Failing")
+                self.log.exception("Unknown error in KubernetesJobWatcher. Failing")
                 raise
             else:
-                self.logger.warn("Watch died gracefully, starting back up with: "
-                                 "last resource_version: {}".format(self.resource_version))
+                self.log.warn("Watch died gracefully, starting back up with: "
+                              "last resource_version: {}".format(self.resource_version))
 
     def _run(self, kube_client, resource_version):
-        self.logger.info("Event: and now my watch begins starting at 
resource_version: {}".format(resource_version))
+        self.log.info("Event: and now my watch begins starting at 
resource_version: {}".format(resource_version))
         watcher = watch.Watch()
 
         kwargs = {"label_selector": "airflow-slave"}
@@ -170,7 +156,7 @@ class KubernetesJobWatcher(multiprocessing.Process, object):
         last_resource_version = None
         for event in watcher.stream(kube_client.list_namespaced_pod, self.namespace, **kwargs):
             task = event['object']
-            self.logger.info("Event: {} had an event of type 
{}".format(task.metadata.name, event['type']))
+            self.log.info("Event: {} had an event of type 
{}".format(task.metadata.name, event['type']))
             self.process_status(
                task.metadata.name, task.status.phase, task.metadata.labels, task.metadata.resource_version
             )
@@ -180,32 +166,31 @@ class KubernetesJobWatcher(multiprocessing.Process, object):
 
     def process_status(self, pod_id, status, labels, resource_version):
         if status == 'Pending':
-            self.logger.info("Event: {} Pending".format(pod_id))
+            self.log.info("Event: {} Pending".format(pod_id))
         elif status == 'Failed':
-            self.logger.info("Event: {} Failed".format(pod_id))
+            self.log.info("Event: {} Failed".format(pod_id))
+            self.watcher_queue.put((pod_id, State.FAILED, labels, resource_version))
         elif status == 'Succeeded':
-            self.logger.info("Event: {} Succeeded".format(pod_id))
+            self.log.info("Event: {} Succeeded".format(pod_id))
             self.watcher_queue.put((pod_id, None, labels, resource_version))
         elif status == 'Running':
-            self.logger.info("Event: {} is Running".format(pod_id))
+            self.log.info("Event: {} is Running".format(pod_id))
         else:
-            self.logger.warn("Event: Invalid state: {} on pod: {} with labels: 
{} "
+            self.log.warn("Event: Invalid state: {} on pod: {} with labels: {} 
"
                              "with resource_version: {}".format(status, 
pod_id, labels, resource_version))
 
 
-class AirflowKubernetesScheduler(object):
+class AirflowKubernetesScheduler(LoggingMixin, object):
     def __init__(self, kube_config, task_queue, result_queue, session, kube_client):
-        self.logger = logging.getLogger(__name__)
-        self.logger.info("creating kubernetes executor")
-        self.kube_config = KubeConfig()
+        self.log.debug("creating kubernetes executor")
+        self.kube_config = kube_config
         self.task_queue = task_queue
-        self.pending_jobs = set()
-        self.namespace = os.environ['k8s_POD_NAMESPACE']
-        self.logger.info("k8s: using namespace {}".format(self.namespace))
+        self.result_queue = result_queue
+        self.namespace = self.kube_config.kube_namespace
+        self.log.debug("k8s: using namespace {}".format(self.namespace))
         self.kube_client = kube_client
         self.launcher = PodLauncher(kube_client=self.kube_client)
-        self.pod_maker = PodMaker(kube_config=self.kube_config)
+        self.worker_configuration = WorkerConfiguration(kube_config=self.kube_config)
         self.watcher_queue = multiprocessing.Queue()
         self._session = session
         self.kube_watcher = self._make_kube_watcher()
@@ -220,7 +205,7 @@ class AirflowKubernetesScheduler(object):
         if self.kube_watcher.is_alive():
             pass
         else:
-            self.logger.error("Error while health checking kube watcher process. Process died for unknown reasons")
+            self.log.error("Error while health checking kube watcher process. Process died for unknown reasons")
             self.kube_watcher = self._make_kube_watcher()
 
     def run_next(self, next_job):
@@ -234,19 +219,19 @@ class AirflowKubernetesScheduler(object):
         :return: 
 
         """
-        self.logger.info('k8s: job is {}'.format(str(next_job)))
+        self.log.debug('k8s: job is {}'.format(str(next_job)))
         key, command = next_job
         dag_id, task_id, execution_date = key
-        self.logger.info("k8s: running for command {}".format(command))
-        self.logger.info("k8s: launching image 
{}".format(self.kube_config.kube_image))
-        pod = self.pod_maker.make_pod(
+        self.log.debug("k8s: running for command {}".format(command))
+        self.log.debug("k8s: launching image 
{}".format(self.kube_config.kube_image))
+        pod = self.worker_configuration.make_pod(
             namespace=self.namespace, pod_id=self._create_pod_id(dag_id, 
task_id),
             dag_id=dag_id, task_id=task_id, 
execution_date=self._datetime_to_label_safe_datestring(execution_date),
             airflow_command=command
         )
         # the watcher will monitor pods, so we do not block.
         self.launcher.run_pod_async(pod)
-        self.logger.info("k8s: Job created!")
+        self.log.debug("k8s: Job created!")
 
     def delete_pod(self, pod_id):
         if self.kube_config.delete_worker_pods:
@@ -272,10 +257,10 @@ class AirflowKubernetesScheduler(object):
 
     def process_watcher_task(self):
         pod_id, state, labels, resource_version = self.watcher_queue.get()
-        logging.info("Attempting to finish pod; pod_id: {}; state: {}; labels: 
{}".format(pod_id, state, labels))
+        self.log.info("Attempting to finish pod; pod_id: {}; state: {}; 
labels: {}".format(pod_id, state, labels))
         key = self._labels_to_key(labels)
         if key:
-            self.logger.info("finishing job {}".format(key))
+            self.log.debug("finishing job {} - {} ({})".format(key, state, 
pod_id))
             self.result_queue.put((key, state, pod_id, resource_version))
 
     @staticmethod
@@ -336,13 +321,13 @@ class AirflowKubernetesScheduler(object):
         try:
             return labels["dag_id"], labels["task_id"], self._label_safe_datestring_to_datetime(labels["execution_date"])
         except Exception as e:
-            self.logger.warn("Error while converting labels to key; labels: 
{}; exception: {}".format(
+            self.log.warn("Error while converting labels to key; labels: {}; 
exception: {}".format(
                 labels, e
             ))
             return None
 
 
-class KubernetesExecutor(BaseExecutor):
+class KubernetesExecutor(BaseExecutor, LoggingMixin):
     def __init__(self):
         super(KubernetesExecutor, self).__init__(parallelism=PARALLELISM)
         self.task_queue = None
@@ -362,7 +347,7 @@ class KubernetesExecutor(BaseExecutor):
         :return: None
         """
         queued_tasks = self._session.query(TaskInstance).filter(TaskInstance.state == State.QUEUED).all()
-        self.logger.info("When executor started up, found {} queued task instances".format(len(queued_tasks)))
+        self.log.info("When executor started up, found {} queued task instances".format(len(queued_tasks)))
 
         for t in queued_tasks:
             kwargs = dict(label_selector="dag_id={},task_id={},execution_date={}".format(
@@ -370,7 +355,7 @@ class KubernetesExecutor(BaseExecutor):
             ))
             pod_list = self.kube_client.list_namespaced_pod(self.kube_config.kube_namespace, **kwargs)
             if len(pod_list.items) == 0:
-                self.logger.info("TaskInstance: {} found in queued state but 
was not launched, rescheduling".format(t))
+                self.log.info("TaskInstance: {} found in queued state but was 
not launched, rescheduling".format(t))
                 self._session.query(TaskInstance).filter(
                     TaskInstance.dag_id == t.dag_id,
                     TaskInstance.task_id == t.task_id,
@@ -379,8 +364,37 @@ class KubernetesExecutor(BaseExecutor):
 
         self._session.commit()
 
+    def _inject_secrets(self):
+        def _create_or_update_secret(secret_name, secret_path):
+            try:
+                return self.kube_client.create_namespaced_secret(
+                    self.kube_config.executor_namespace, client.V1Secret(
+                        data={'key.json': base64.b64encode(open(secret_path, 'r').read())},
+                        metadata=client.V1ObjectMeta(name=secret_name)))
+            except ApiException as e:
+                if e.status == 409:
+                    return self.kube_client.replace_namespaced_secret(
+                        secret_name, self.kube_config.executor_namespace,
+                        client.V1Secret(
+                            data={'key.json': base64.b64encode(open(secret_path, 'r').read())},
+                            metadata=client.V1ObjectMeta(name=secret_name)))
+                self.log.exception("Exception while trying to inject secret. "
+                                   "Secret name: {}, error details: {}.".format(secret_name, e))
+                raise
+
+        # For each GCP service account key, inject it as a secret in the executor
+        # namespace with the specific secret name configured in the airflow.cfg.
+        # We let exceptions pass through to users.
+        if self.kube_config.gcp_service_account_keys:
+            name_path_pair_list = [
+                {'name': account_spec.strip().split('=')[0],
+                 'path': account_spec.strip().split('=')[1]}
+                for account_spec in self.kube_config.gcp_service_account_keys.split(',')]
+            for service_account in name_path_pair_list:
+                _create_or_update_secret(service_account['name'], service_account['path'])
+
     def start(self):
-        self.logger.info('k8s: starting kubernetes executor')
+        self.log.info('k8s: starting kubernetes executor')
         self._session = settings.Session()
         self.task_queue = Queue()
         self.result_queue = Queue()
@@ -388,15 +402,16 @@ class KubernetesExecutor(BaseExecutor):
         self.kube_scheduler = AirflowKubernetesScheduler(
             self.kube_config, self.task_queue, self.result_queue, self._session, self.kube_client
         )
+        self._inject_secrets()
         self.clear_not_launched_queued_tasks()
 
     def execute_async(self, key, command, queue=None):
-        self.logger.info("k8s: adding task {} with command {}".format(key, 
command))
+        self.log.info("k8s: adding task {} with command {}".format(key, 
command))
         self.task_queue.put((key, command))
 
     def sync(self):
-        self.logger.info("self.running: {}".format(self.running))
-        self.logger.info("self.queued: {}".format(self.queued_tasks))
+        self.log.info("self.running: {}".format(self.running))
+        self.log.info("self.queued: {}".format(self.queued_tasks))
         self.kube_scheduler.sync()
 
         last_resource_version = None
@@ -404,7 +419,7 @@ class KubernetesExecutor(BaseExecutor):
             results = self.result_queue.get()
             key, state, pod_id, resource_version = results
             last_resource_version = resource_version
-            self.logger.info("Changing state of {}".format(results))
+            self.log.info("Changing state of {}".format(results))
             self._change_state(key, state, pod_id)
 
         KubeResourceVersion.checkpoint_resource_version(last_resource_version, session=self._session)
@@ -434,7 +449,7 @@ class KubernetesExecutor(BaseExecutor):
             self._session.commit()
 
     def end(self):
-        self.logger.info('ending kube executor')
+        self.log.info('ending kube executor')
         self.task_queue.join()
 
     def execute_async(self, key, command, queue=None):

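For reference, the create-or-replace flow in _inject_secrets above can be exercised standalone roughly as follows (a sketch, assuming a reachable cluster; the secret name and key file path are hypothetical):

    import base64

    from kubernetes import client, config
    from kubernetes.client.rest import ApiException

    config.load_kube_config()
    api = client.CoreV1Api()

    with open('/path/to/key.json', 'rb') as f:
        encoded = base64.b64encode(f.read()).decode('ascii')
    body = client.V1Secret(
        data={'key.json': encoded},
        metadata=client.V1ObjectMeta(name='gcp-key'))

    try:
        api.create_namespaced_secret('default', body)
    except ApiException as e:
        if e.status == 409:  # secret already exists, so replace it
            api.replace_namespaced_secret('gcp-key', 'default', body)
        else:
            raise
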
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ad4e67ce/airflow/contrib/kubernetes/kubernetes_factory.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kubernetes_factory.py b/airflow/contrib/kubernetes/kubernetes_factory.py
index 90b6f6c..715075a 100644
--- a/airflow/contrib/kubernetes/kubernetes_factory.py
+++ b/airflow/contrib/kubernetes/kubernetes_factory.py
@@ -12,7 +12,6 @@
 # See the License for the specific language governing permissions and
 
 from airflow.contrib.kubernetes.kubernetes_request_factory import KubernetesRequestFactory
-import logging
 
 
 class KubernetesResourceBuilder:
@@ -31,7 +30,6 @@ class KubernetesResourceBuilder:
         self.cmds = cmds
         self.kub_req_factory = kub_req_factory
         self.namespace = namespace
-        self.logger = logging.getLogger(self.__class__.__name__)
         self.envs = {}
         self.labels = {}
         self.secrets = {}

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ad4e67ce/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
index 7ea9bc2..9cfd77f 100644
--- a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
+++ b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
@@ -13,6 +13,7 @@
 # limitations under the License.
 
 from abc import ABCMeta, abstractmethod
+import six
 
 class KubernetesRequestFactory():
     """
@@ -69,8 +70,9 @@ class KubernetesRequestFactoryHelper(object):
 
     @staticmethod
     def extract_labels(pod, req):
-        for k in pod.labels.keys():
-            req['metadata']['labels'][k] = pod.labels[k]
+        req['metadata']['labels'] = req['metadata'].get('labels', {})
+        for k, v in six.iteritems(pod.labels):
+            req['metadata']['labels'][k] = v
 
     @staticmethod
     def extract_cmds(pod, req):
@@ -114,7 +116,7 @@ class KubernetesRequestFactoryHelper(object):
             })
 
     @staticmethod
-    def extract_secrets(pod, req):
+    def extract_env_and_secrets(pod, req):
         env_secrets = [s for s in pod.secrets if s.deploy_type == 'env']
         if len(pod.envs) > 0 or len(env_secrets) > 0:
             env = []
@@ -123,3 +125,20 @@ class KubernetesRequestFactoryHelper(object):
             for secret in env_secrets:
                 KubernetesRequestFactory.add_secret_to_env(env, secret)
             req['spec']['containers'][0]['env'] = env
+
+    @staticmethod
+    def extract_init_containers(pod, req):
+        if pod.init_containers:
+            req['spec']['initContainers'] = pod.init_containers
+
+    @staticmethod
+    def extract_service_account_name(pod, req):
+        if pod.service_account_name:
+            req['spec']['serviceAccountName'] = pod.service_account_name
+
+    @staticmethod
+    def extract_image_pull_secrets(pod, req):
+        if pod.image_pull_secrets:
+            req['spec']['imagePullSecrets'] = [{
+                'name': pull_secret
+            } for pull_secret in pod.image_pull_secrets.split(',')]

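As a concrete illustration of the new extractors, extract_image_pull_secrets expands the comma-separated config value into the list form the Kubernetes API expects (values below are made up):

    # Given:
    #     pod.image_pull_secrets = 'secret_a,secret_b'
    # after extract_image_pull_secrets(pod, req) the request contains:
    #     req['spec']['imagePullSecrets'] == [{'name': 'secret_a'},
    #                                         {'name': 'secret_b'}]
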
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ad4e67ce/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py b/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
index 89631e0..dfa247f 100644
--- a/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
+++ b/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
@@ -12,11 +12,11 @@
 # See the License for the specific language governing permissions and
 
 import yaml
-import airflow.contrib.kubernetes.kubernetes_request_factory.kubernetes_request_factory as kreq
+from airflow.contrib.kubernetes.kubernetes_request_factory.kubernetes_request_factory import KubernetesRequestFactory
 from airflow.contrib.kubernetes.pod import Pod
 
 
-class SimplePodRequestFactory(kreq.KubernetesRequestFactory):
+class SimplePodRequestFactory(KubernetesRequestFactory):
     """
         Request generator for a simple pod.
     """
@@ -41,16 +41,18 @@ spec:
     def create(self, pod):
         # type: (Pod) -> dict
         req = yaml.load(self._yaml)
-        kreq.extract_name(pod, req)
-        kreq.extract_labels(pod, req)
-        kreq.extract_image(pod, req)
-        if pod.image_pull_policy:
-            kreq.extract_image_pull_policy(pod, req)
-        kreq.extract_cmds(pod, req)
-        kreq.extract_args(pod, req)
-        if len(pod.node_selectors) > 0:
-            self.extract_node_selector(pod, req)
-        self.extract_secrets(pod, req)
+        self.extract_name(pod, req)
+        self.extract_labels(pod, req)
+        self.extract_image(pod, req)
+        self.extract_image_pull_policy(pod, req)
+        self.extract_cmds(pod, req)
+        self.extract_args(pod, req)
+        self.extract_node_selector(pod, req)
+        self.extract_env_and_secrets(pod, req)
         self.extract_volume_secrets(pod, req)
-        self.attach_volume_mounts(req=req, pod=pod)
+        self.attach_volumes(pod, req)
+        self.attach_volume_mounts(pod, req)
+        self.extract_service_account_name(pod, req)
+        self.extract_init_containers(pod, req)
+        self.extract_image_pull_secrets(pod, req)
         return req

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ad4e67ce/airflow/contrib/kubernetes/pod.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/pod.py b/airflow/contrib/kubernetes/pod.py
index 0200afa..be99bbf 100644
--- a/airflow/contrib/kubernetes/pod.py
+++ b/airflow/contrib/kubernetes/pod.py
@@ -12,7 +12,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import logging
 
 class Pod:
     """
@@ -43,7 +42,11 @@ class Pod:
             name=None,
             volumes = [],
             namespace='default',
-            result=None):
+            result=None,
+            image_pull_policy="IfNotPresent",
+            image_pull_secrets=None,
+            init_containers=None,
+            service_account_name=None):
         self.image = image
         self.envs = envs if envs else {}
         self.cmds = cmds
@@ -54,5 +57,7 @@ class Pod:
         self.volumes = volumes
         self.node_selectors = node_selectors if node_selectors else []
         self.namespace = namespace
-        self.logger = logging.getLogger(self.__class__.__name__)
-
+        self.image_pull_policy = image_pull_policy
+        self.image_pull_secrets = image_pull_secrets
+        self.init_containers = init_containers
+        self.service_account_name = service_account_name

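With the new constructor arguments, a Pod can be built directly, e.g. (an illustrative sketch; every value below is hypothetical):

    from airflow.contrib.kubernetes.pod import Pod

    pod = Pod(
        image='my-registry/airflow-worker:latest',
        envs={'AIRFLOW__CORE__EXECUTOR': 'LocalExecutor'},
        cmds=['bash', '-cx', '--'],
        args=['airflow run my_dag my_task 2017-10-23T09:52:06'],
        namespace='default',
        image_pull_policy='IfNotPresent',
        image_pull_secrets='registry-secret',
        service_account_name='airflow-worker')
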
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ad4e67ce/airflow/contrib/kubernetes/pod_launcher.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/pod_launcher.py b/airflow/contrib/kubernetes/pod_launcher.py
index e435a12..1fcdb10 100644
--- a/airflow/contrib/kubernetes/pod_launcher.py
+++ b/airflow/contrib/kubernetes/pod_launcher.py
@@ -11,28 +11,34 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import json
+
 from airflow.contrib.kubernetes.pod import Pod
 from airflow.contrib.kubernetes.kubernetes_request_factory.pod_request_factory import SimplePodRequestFactory
+from airflow.utils.log.logging_mixin import LoggingMixin
+from airflow.utils.state import State
 from kubernetes import watch
 from kubernetes.client import V1Pod
-from airflow.utils.state import State
-import json
-import logging
+from kubernetes.client.rest import ApiException
 
 from .kube_client import get_kube_client
 
 
-class PodLauncher:
+class PodLauncher(LoggingMixin):
     def __init__(self, kube_client=None):
         self.kube_req_factory = SimplePodRequestFactory()
         self._client = kube_client or get_kube_client()
         self._watch = watch.Watch()
-        self.logger = logging.getLogger(__name__)
 
     def run_pod_async(self, pod):
         req = self.kube_req_factory.create(pod)
-        print(json.dumps(req))
-        resp = self._client.create_namespaced_pod(body=req, namespace=pod.namespace)
+        self.log.debug('Pod Creation Request: \n{}'.format(json.dumps(req, indent=2)))
+        try:
+            resp = self._client.create_namespaced_pod(body=req, namespace=pod.namespace)
+            self.log.debug('Pod Creation Response: {}'.format(resp))
+        except ApiException:
+            self.log.exception('Exception when attempting to create Namespaced Pod.')
+            raise
         return resp
 
     def run_pod(self, pod):
@@ -54,7 +60,7 @@ class PodLauncher:
     def _task_status(self, event):
         # type: (V1Pod) -> State
         task = event['object']
-        self.logger.info(
+        self.log.info(
             "Event: {} had an event of type {}".format(task.metadata.name,
                                                        event['type']))
         status = self.process_status(task.metadata.name, task.status.phase)
@@ -67,13 +73,13 @@ class PodLauncher:
         if status == 'Pending':
             return State.QUEUED
         elif status == 'Failed':
-            self.logger.info("Event: {} Failed".format(job_id))
+            self.log.info("Event: {} Failed".format(job_id))
             return State.FAILED
         elif status == 'Succeeded':
-            self.logger.info("Event: {} Succeeded".format(job_id))
+            self.log.info("Event: {} Succeeded".format(job_id))
             return State.SUCCESS
         elif status == 'Running':
             return State.RUNNING
         else:
-            self.logger.info("Event: Invalid state {} on job 
{}".format(status, job_id))
+            self.log.info("Event: Invalid state {} on job {}".format(status, 
job_id))
             return State.FAILED

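Launching the pod then reduces to handing it to the launcher (a sketch; assumes a Pod object as above and kube credentials resolvable by get_kube_client):

    from airflow.contrib.kubernetes.pod_launcher import PodLauncher

    launcher = PodLauncher()             # defaults to get_kube_client()
    resp = launcher.run_pod_async(pod)   # non-blocking; the job watcher tracks status
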
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ad4e67ce/airflow/contrib/kubernetes/secret.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/secret.py b/airflow/contrib/kubernetes/secret.py
index ec5d51c..a798a24 100644
--- a/airflow/contrib/kubernetes/secret.py
+++ b/airflow/contrib/kubernetes/secret.py
@@ -1,30 +1,25 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
+# -*- coding: utf-8 -*-
 #
-#   http://www.apache.org/licenses/LICENSE-2.0
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
 #
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
 
 
 class Secret:
-    """Defines Kubernetes Secret Volume"""
+    """Defines Kubernetes Secret Containers"""
 
     def __init__(self, deploy_type, deploy_target, secret, key):
-        """Initialize a Kubernetes Secret Object. Used to track requested 
secrets from
-        the user.
+        """Initialize a Kubernetes Secret Object. Used to track requested 
secrets from the user.
 
-        :param deploy_type: The type of secret deploy in Kubernetes, either `env` or
-            `volume`
+        :param deploy_type: The type of secret deploy in Kubernetes, either `env` or `volume`
         :type deploy_type: ``str``
         :param deploy_target: The environment variable to be created in the worker.
         :type deploy_target: ``str``

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ad4e67ce/airflow/contrib/kubernetes/worker_configuration.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/worker_configuration.py b/airflow/contrib/kubernetes/worker_configuration.py
new file mode 100644
index 0000000..5e87941
--- /dev/null
+++ b/airflow/contrib/kubernetes/worker_configuration.py
@@ -0,0 +1,158 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import copy
+import os
+import six
+
+from airflow.contrib.kubernetes.pod import Pod
+from airflow.contrib.kubernetes.secret import Secret
+
+
+class WorkerConfiguration:
+    """Contains Kubernetes Airflow Worker configuration logic"""
+
+    def __init__(self, kube_config):
+        self.kube_config = kube_config
+
+    def _get_init_containers(self, volume_mounts):
+        """When using git to retrieve the DAGs, use the GitSync Init 
Container"""
+        # If we're using volume claims to mount the dags, no init container is 
needed
+        if self.kube_config.dags_volume_claim:
+            return []
+
+        # Otherwise, define a git-sync init container
+        init_environment = [{
+                'name': 'GIT_SYNC_REPO',
+                'value': self.kube_config.git_repo
+            }, {
+                'name': 'GIT_SYNC_BRANCH',
+                'value': self.kube_config.git_branch
+            }, {
+                'name': 'GIT_SYNC_ROOT',
+                'value': '/tmp'
+            }, {
+                'name': 'GIT_SYNC_DEST',
+                'value': 'dags'
+            }, {
+                'name': 'GIT_SYNC_ONE_TIME',
+                'value': 'true'
+            }]
+        if self.kube_config.git_user:
+            init_environment.append({
+                'name': 'GIT_SYNC_USERNAME',
+                'value': self.kube_config.git_user
+            })
+        if self.kube_config.git_password:
+            init_environment.append({
+                'name': 'GIT_SYNC_PASSWORD',
+                'value': self.kube_config.git_password
+            })
+
+        volume_mounts[0]['readOnly'] = False
+        return [{
+            'name': self.kube_config.git_sync_init_container_name,
+            'image': self.kube_config.git_sync_container,
+            'securityContext': {'runAsUser': 0},
+            'env': init_environment,
+            'volumeMounts': volume_mounts
+        }]
+
+    def _get_volumes_and_mounts(self):
+        """Determine volumes and volume mounts for Airflow workers"""
+        dags_volume_name = "airflow-dags"
+        dags_path = os.path.join(self.kube_config.dags_folder, self.kube_config.git_subpath)
+        volumes = [{
+            'name': dags_volume_name
+        }]
+        volume_mounts = [{
+            'name': dags_volume_name,
+            'mountPath': dags_path,
+            'readOnly': True
+        }]
+
+        # Mount the airflow.cfg file via a configmap the user has specified
+        if self.kube_config.airflow_configmap:
+            config_volume_name = "airflow-config"
+            config_path = '{}/airflow.cfg'.format(self.kube_config.airflow_home)
+            volumes.append({
+                'name': config_volume_name,
+                'configMap': {
+                    'name': self.kube_config.airflow_configmap
+                }
+            })
+            volume_mounts.append({
+                'name': config_volume_name,
+                'mountPath': config_path,
+                'subPath': 'airflow.cfg',
+                'readOnly': True
+            })
+
+        # A PV with the DAGs should be mounted
+        if self.kube_config.dags_volume_claim:
+            volumes[0]['persistentVolumeClaim'] = {"claimName": self.kube_config.dags_volume_claim}
+            if self.kube_config.dags_volume_subpath:
+                volume_mounts[0]["subPath"] = self.kube_config.dags_volume_subpath
+        else:
+            # Create a Shared Volume for the Git-Sync module to populate
+            volumes[0]["emptyDir"] = {}
+        return volumes, volume_mounts
+
+    def _get_environment(self):
+        """Defines any necessary environment variables for the pod executor"""
+        env = {
+            'AIRFLOW__CORE__DAGS_FOLDER': '/tmp/dags',
+            'AIRFLOW__CORE__EXECUTOR': 'LocalExecutor'
+        }
+        if self.kube_config.airflow_configmap:
+            env['AIRFLOW__CORE__AIRFLOW_HOME'] = self.kube_config.airflow_home
+        return env
+
+    def _get_secrets(self):
+        """Defines any necessary secrets for the pod executor"""
+        worker_secrets = []
+        for env_var_name, obj_key_pair in six.iteritems(self.kube_config.kube_secrets):
+            k8s_secret_obj, k8s_secret_key = obj_key_pair.split('=')
+            worker_secrets.append(Secret('env', env_var_name, k8s_secret_obj, k8s_secret_key))
+        return worker_secrets
+
+    def _get_image_pull_secrets(self):
+        """Extracts any image pull secrets for fetching container(s)"""
+        if not self.kube_config.image_pull_secrets:
+            return []
+        return self.kube_config.image_pull_secrets.split(',')
+
+    def make_pod(self, namespace, pod_id, dag_id, task_id, execution_date, airflow_command):
+        volumes, volume_mounts = self._get_volumes_and_mounts()
+        worker_init_container_spec = self._get_init_containers(copy.deepcopy(volume_mounts))
+        return Pod(
+            namespace=namespace,
+            name=pod_id,
+            image=self.kube_config.kube_image,
+            cmds=["bash", "-cx", "--"],
+            args=[airflow_command],
+            labels={
+                "airflow-slave": "",
+                "dag_id": dag_id,
+                "task_id": task_id,
+                "execution_date": execution_date
+            },
+            envs=self._get_environment(),
+            secrets=self._get_secrets(),
+            service_account_name=self.kube_config.worker_service_account_name,
+            image_pull_secrets=self.kube_config.image_pull_secrets,
+            init_containers=worker_init_container_spec,
+            volumes=volumes,
+            volume_mounts=volume_mounts
+        )

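Tying it together, the executor builds worker pods through this class roughly as follows (a sketch; the pod_id, dag/task names, and the label-safe date string are illustrative):

    from airflow.contrib.kubernetes.worker_configuration import WorkerConfiguration

    worker_config = WorkerConfiguration(kube_config=kube_config)  # a KubeConfig instance
    pod = worker_config.make_pod(
        namespace=kube_config.kube_namespace,
        pod_id='mydag-mytask-1a2b3c',
        dag_id='my_dag',
        task_id='my_task',
        execution_date='2017_10_23T09_52_06_000000',
        airflow_command='airflow run my_dag my_task 2017-10-23T09:52:06')
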
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ad4e67ce/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 9754415..fe76fbd 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -1587,7 +1587,9 @@ class SchedulerJob(BaseJob):
                 timeout = 5
                 self.log.info("Waiting up to %s seconds for processes to 
exit...", timeout)
                 try:
-                    psutil.wait_procs(child_processes, timeout)
+                    psutil.wait_procs(
+                        child_processes, timeout=timeout,
+                        callback=lambda x: self.log.info('Terminated PID %s', x.pid))
                 except psutil.TimeoutExpired:
                     self.log.debug("Ran out of time while waiting for 
processes to exit")
 
@@ -1595,6 +1597,7 @@ class SchedulerJob(BaseJob):
                child_processes = [x for x in this_process.children(recursive=True)
                                    if x.is_running() and x.pid in pids_to_kill]
                 if len(child_processes) > 0:
+                    self.log.info("SIGKILL processes that did not terminate 
gracefully")
                     for child in child_processes:
                         self.log.info("Killing child PID: %s", child.pid)
                         child.kill()

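The terminate-then-wait-then-kill sequence above is the standard psutil pattern; condensed, the same idea looks like this (a sketch):

    import psutil

    procs = psutil.Process().children(recursive=True)
    for p in procs:
        p.terminate()  # ask politely first
    gone, alive = psutil.wait_procs(
        procs, timeout=5,
        callback=lambda p: print('Terminated PID %s' % p.pid))
    for p in alive:
        p.kill()  # SIGKILL anything that did not exit in time
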
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ad4e67ce/airflow/utils/dag_processing.py
----------------------------------------------------------------------
diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py
index f75b756..543eb41 100644
--- a/airflow/utils/dag_processing.py
+++ b/airflow/utils/dag_processing.py
@@ -461,7 +461,7 @@ class DagFileProcessorManager(LoggingMixin):
     def heartbeat(self):
         """
         This should be periodically called by the scheduler. This method will
-        kick of new processes to process DAG definition files and read the
+        kick off new processes to process DAG definition files and read the
         results from the finished processors.
 
         :return: a list of SimpleDags that were produced by processors that

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ad4e67ce/scripts/ci/kubernetes/docker/Dockerfile
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/docker/Dockerfile b/scripts/ci/kubernetes/docker/Dockerfile
index a3b05b0..b1bc493 100644
--- a/scripts/ci/kubernetes/docker/Dockerfile
+++ b/scripts/ci/kubernetes/docker/Dockerfile
@@ -41,12 +41,6 @@ RUN pip install kubernetes && \
 COPY airflow.tar.gz /tmp/airflow.tar.gz
 RUN pip install /tmp/airflow.tar.gz
 
-# prep airflow
-ENV AIRFLOW_HOME=/root/airflow
-ENV AIRFLOW__CORE__EXECUTOR=KubernetesExecutor
-ENV AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql+psycopg2://root:root@postgres-airflow:5432/airflow
-
-
 COPY bootstrap.sh /bootstrap.sh
 RUN chmod +x /bootstrap.sh
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ad4e67ce/scripts/ci/kubernetes/kube/airflow.yaml.template
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/kube/airflow.yaml.template b/scripts/ci/kubernetes/kube/airflow.yaml.template
index a297b95..af54175 100644
--- a/scripts/ci/kubernetes/kube/airflow.yaml.template
+++ b/scripts/ci/kubernetes/kube/airflow.yaml.template
@@ -52,29 +52,31 @@ spec:
     metadata:
       labels:
         name: airflow
-      annotations:
-        pod.beta.kubernetes.io/init-containers: '[
-          {
-              "name": "init",
-              "image": "{{docker_image}}",
-              "imagePullPolicy": "IfNotPresent",
-              "command": [
-                "bash", "-cx", "cd 
/usr/local/lib/python2.7/dist-packages/airflow && cp -R example_dags/* 
$AIRFLOW_HOME/dags/ && airflow initdb && alembic upgrade head"
-              ],
-              "env": [
-                {"name": "AIRFLOW__KUBERNETES__CONTAINER_IMAGE", "value": ""},
-                {"name": "AIRFLOW__KUBERNETES__DAGS_VOLUME_CLAIM", "value": 
"airflow-dags"},
-                {"name": "AIRFLOW__KUBERNETES__DAGS_VOLUME_SUBPATH", "value": 
"git"}
-              ],
-              "volumeMounts": [
-                {"name": "airflow-dags", "mountPath": "/root/airflow/dags"}
-              ]
-          }
-      ]'
     spec:
+      initContainers:
+      - name: "init"
+        image: "{{docker_image}}:{{docker_tag}}"
+        imagePullPolicy: "IfNotPresent"
+        volumeMounts:
+        - name: airflow-configmap
+          mountPath: /root/airflow/airflow.cfg
+          subPath: airflow.cfg
+        - name: airflow-dags
+          mountPath: /root/airflow/dags
+        env:
+        - name: SQL_ALCHEMY_CONN
+          valueFrom:
+            secretKeyRef:
+              name: airflow-secrets
+              key: sql_alchemy_conn
+        command:
+          - "bash"
+        args:
+          - "-cx"
+          - "cd /usr/local/lib/python2.7/dist-packages/airflow && cp -R 
example_dags/* /root/airflow/dags/ && airflow initdb && alembic upgrade head"
       containers:
       - name: web
-        image: {{docker_image}}
+        image: {{docker_image}}:{{docker_tag}}
         imagePullPolicy: IfNotPresent
         ports:
         - name: web
@@ -85,22 +87,15 @@ spec:
           valueFrom:
             fieldRef:
               fieldPath: metadata.namespace
-        - name: AIRFLOW__CORE__EXECUTOR
-          value: KubernetesExecutor
-        - name: AIRFLOW__KUBERNETES__CONTAINER_IMAGE
-          value: {{docker_image}}
-        - name: AIRFLOW__KUBERNETES__DELETE_WORKER_PODS
-          value: "True"
-        # set these two confs
-        - name: AIRFLOW__KUBERNETES__GIT_REPO
-          value: https://github.com/grantnicholas/testdags.git
-        - name: AIRFLOW__KUBERNETES__GIT_BRANCH
-          value: master
-        # or this one
-        - name: AIRFLOW__KUBERNETES__DAGS_VOLUME_CLAIM
-          value: airflow-dags
-        #
+        - name: SQL_ALCHEMY_CONN
+          valueFrom:
+            secretKeyRef:
+              name: airflow-secrets
+              key: sql_alchemy_conn
         volumeMounts:
+        - name: airflow-configmap
+          mountPath: /root/airflow/airflow.cfg
+          subPath: airflow.cfg
         - name: airflow-dags
           mountPath: /root/airflow/dags
         readinessProbe:
@@ -126,30 +121,24 @@ spec:
           valueFrom:
             fieldRef:
               fieldPath: metadata.namespace
-        - name: AIRFLOW__CORE__EXECUTOR
-          value: KubernetesExecutor
-        - name: AIRFLOW__KUBERNETES__CONTAINER_IMAGE
-          value: {{docker_image}}
-        - name: AIRFLOW__KUBERNETES__DELETE_WORKER_PODS
-          value: "True"
-        # set these two confs
-        - name: AIRFLOW__KUBERNETES__GIT_REPO
-          value: https://github.com/grantnicholas/testdags.git
-        - name: AIRFLOW__KUBERNETES__GIT_BRANCH
-          value: master
-        # or set this one
-        - name: AIRFLOW__KUBERNETES__DAGS_VOLUME_CLAIM
-          value: airflow-dags
-        #
-        - name: AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL
-          value: "60"
+        - name: SQL_ALCHEMY_CONN
+          valueFrom:
+            secretKeyRef:
+              name: airflow-secrets
+              key: sql_alchemy_conn
         volumeMounts:
+        - name: airflow-configmap
+          mountPath: /root/airflow/airflow.cfg
+          subPath: airflow.cfg
         - name: airflow-dags
           mountPath: /root/airflow/dags
       volumes:
       - name: airflow-dags
         persistentVolumeClaim:
           claimName: airflow-dags
+      - name: airflow-configmap
+        configMap:
+          name: airflow-configmap
 ---
 apiVersion: v1
 kind: Service
@@ -162,4 +151,45 @@ spec:
       nodePort: 30809
   selector:
     name: airflow
+---
+apiVersion: v1
+kind: Secret
+metadata:
+  name: airflow-secrets
+type: Opaque
+data:
+  # The sql_alchemy_conn value is a base64 encoded representation of this connection string:
+  # postgresql+psycopg2://root:root@postgres-airflow:5432/airflow
+  sql_alchemy_conn: cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6cm9vdEBwb3N0Z3Jlcy1haXJmbG93OjU0MzIvYWlyZmxvdwo=
+---
+apiVersion: v1
+kind: ConfigMap
+metadata:
+  name: airflow-configmap
+data:
+  airflow.cfg: |
+    [core]
+    airflow_home = /root/airflow
+    dags_folder = /root/airflow/dags
+    base_log_folder = /root/airflow/logs
+    logging_level = INFO
+    executor = KubernetesExecutor
+    parallelism = 32
+    plugins_folder = /root/airflow/plugins
+    sql_alchemy_conn = $SQL_ALCHEMY_CONN
+
+    [scheduler]
+    dag_dir_list_interval = 60
+    child_process_log_directory = /root/airflow/logs/scheduler
+
+    [kubernetes]
+    airflow_configmap = airflow-configmap
+    worker_container_repository = {{docker_image}}
+    worker_container_tag = {{docker_tag}}
+    delete_worker_pods = True
+    git_repo = https://github.com/grantnicholas/testdags.git
+    git_branch = master
+    dags_volume_claim = airflow-dags
 
+    [kubernetes_secrets]
+    SQL_ALCHEMY_CONN = airflow-secrets=sql_alchemy_conn

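The base64 value in the Secret above can be reproduced in Python; note the trailing newline, which the committed value includes:

    import base64

    conn = b'postgresql+psycopg2://root:root@postgres-airflow:5432/airflow\n'
    print(base64.b64encode(conn).decode('ascii'))
    # cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6cm9vdEBwb3N0Z3Jlcy1haXJmbG93OjU0MzIvYWlyZmxvdwo=
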
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ad4e67ce/scripts/ci/kubernetes/kube/deploy.sh
----------------------------------------------------------------------
diff --git a/scripts/ci/kubernetes/kube/deploy.sh b/scripts/ci/kubernetes/kube/deploy.sh
index 2532d83..953f50f 100755
--- a/scripts/ci/kubernetes/kube/deploy.sh
+++ b/scripts/ci/kubernetes/kube/deploy.sh
@@ -24,8 +24,7 @@ sudo mkdir -p /data/postgres-airflow
 
 mkdir -p $DIRNAME/.generated
 kubectl apply -f $DIRNAME/postgres.yaml
-sed "s#{{docker_image}}#$IMAGE:$TAG#g" $DIRNAME/airflow.yaml.template > 
$DIRNAME/.generated/airflow.yaml && kubectl apply -f 
$DIRNAME/.generated/airflow.yaml
-
+sed -e "s#{{docker_image}}#$IMAGE#g" -e "s#{{docker_tag}}#$TAG#g" 
$DIRNAME/airflow.yaml.template > $DIRNAME/.generated/airflow.yaml && kubectl 
apply -f $DIRNAME/.generated/airflow.yaml
 
 # wait for up to 10 minutes for everything to be deployed
 for i in {1..150}
@@ -33,8 +32,8 @@ do
   echo "------- Running kubectl get pods -------"
   PODS=$(kubectl get pods | awk 'NR>1 {print $0}')
   echo "$PODS"
-  NUM_AIRFLOW_READY=$(echo $PODS | grep airflow | awk '{print $2}' | grep -E '([0-9])\/(\1)' | wc -l)
-  NUM_POSTGRES_READY=$(echo $PODS | grep postgres | awk '{print $2}' | grep -E '([0-9])\/(\1)' | wc -l)
+  NUM_AIRFLOW_READY=$(echo $PODS | grep airflow | awk '{print $2}' | grep -E '([0-9])\/(\1)' | wc -l | xargs)
+  NUM_POSTGRES_READY=$(echo $PODS | grep postgres | awk '{print $2}' | grep -E '([0-9])\/(\1)' | wc -l | xargs)
   if [ "$NUM_AIRFLOW_READY" == "1" ] && [ "$NUM_POSTGRES_READY" == "1" ]; then
     break
   fi
