[AIRFLOW-1314] Git Mode to pull in DAGs for Kubernetes Executor

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/bb1e05c3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/bb1e05c3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/bb1e05c3

Branch: refs/heads/master
Commit: bb1e05c3fa9319d6e81fcd3f7ec646a29ecf5185
Parents: c177d6e
Author: grantnicholas <[email protected]>
Authored: Thu Aug 31 17:53:45 2017 -0500
Committer: Fokko Driesprong <[email protected]>
Committed: Sun Apr 22 10:17:39 2018 +0200

----------------------------------------------------------------------
 .../contrib/executors/kubernetes_executor.py    | 190 +++++++++----------
 .../contrib/kubernetes/kubernetes_factory.py    |  18 ++
 airflow/contrib/kubernetes/kubernetes_helper.py |  10 +
 .../kubernetes_request_factory.py               |  19 ++
 .../pod_request_factory.py                      |   2 +
 docker/Dockerfile                               |   6 +
 kube/airflow.yaml.template                      |  30 ++-
 7 files changed, 155 insertions(+), 120 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/bb1e05c3/airflow/contrib/executors/kubernetes_executor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/executors/kubernetes_executor.py 
b/airflow/contrib/executors/kubernetes_executor.py
index 8eb2186..5e4afba 100644
--- a/airflow/contrib/executors/kubernetes_executor.py
+++ b/airflow/contrib/executors/kubernetes_executor.py
@@ -18,6 +18,7 @@ import time
 import os
 import multiprocessing
 from queue import Queue
+from datetime import datetime
 from kubernetes import watch
 from airflow import settings
 from airflow.contrib.kubernetes.pod_launcher import PodLauncher
@@ -29,33 +30,16 @@ from airflow import configuration
 from kubernetes import client
 
 class KubeConfig:
-    kube_image = configuration.get('core', 'k8s_image')
-    git_repo = configuration.get('core', 'k8s_git_repo')
-
-
-def _prep_command_for_container(command):
-    """  
-    When creating a kubernetes pod, the yaml expects the command
-    in the form of ["cmd","arg","arg","arg"...]
-    This function splits the command string into tokens 
-    and then matches it to the convention.
-
-    :param command:
-
-    :return:
-
-    """
-    return '"' + '","'.join(command.split(' ')[1:]) + '"'
-
-
-PARALLELISM = configuration.getint('core', 'PARALLELISM')
+    def __init__(self):
+        self.kube_image = configuration.get('core', 'k8s_image')
+        self.git_repo = configuration.get('core', 'k8s_git_repo')
+        self.git_branch = configuration.get('core', 'k8s_git_branch')
 
 
 class KubernetesJobWatcher(multiprocessing.Process, object):
-    def __init__(self, watch_function, namespace, result_queue, watcher_queue):
+    def __init__(self, watch_function, namespace, watcher_queue):
         self.logger = logging.getLogger(__name__)
         multiprocessing.Process.__init__(self)
-        self.result_queue = result_queue
         self._watch_function = watch_function
         self._watch = watch.Watch()
         self.namespace = namespace
@@ -66,49 +50,39 @@ class KubernetesJobWatcher(multiprocessing.Process, object):
         for event in self._watch.stream(self._watch_function, self.namespace,
                                         label_selector='airflow-slave'):
             task = event['object']
-            self.logger.info(
-                "Event: {} had an event of type {}".format(task.metadata.name,
-                                                           event['type']))
-            self.process_status(task.metadata.name, task.status.phase)
+            self.logger.info("Event: {} had an event of type 
{}".format(task.metadata.name,
+                                                                        
event['type']))
+            self.process_status(task.metadata.name, task.status.phase, 
task.metadata.labels)
 
-    def process_status(self, job_id, status):
+    def process_status(self, job_id, status, labels):
         if status == 'Pending':
             self.logger.info("Event: {} Pending".format(job_id))
         elif status == 'Failed':
-            # self.logger.info("Event: {} Failed".format(job_id))
-            self.watcher_queue.put((job_id, State.FAILED))
+            self.logger.info("Event: {} Failed".format(job_id))
+            self.watcher_queue.put((job_id, State.FAILED, labels))
         elif status == 'Succeeded':
-            # self.logger.info("Event: {} Succeeded".format(job_id))
-            self.watcher_queue.put((job_id, None))
+            self.logger.info("Event: {} Succeeded".format(job_id))
+            self.watcher_queue.put((job_id, None, labels))
         elif status == 'Running':
             # self.logger.info("Event: {} is Running".format(job_id))
             self.watcher_queue.put((job_id, State.RUNNING))
         else:
-            self.logger.info("Event: Invalid state {} on job 
{}".format(status, job_id))
+            self.logger.info("Event: Invalid state: {} on job: {} with labels: 
{}".format(status, job_id, labels))
 
 
 class AirflowKubernetesScheduler(object):
-    def __init__(self,
-                 task_queue,
-                 result_queue,
-                 running):
+    def __init__(self, task_queue, result_queue):
         self.logger = logging.getLogger(__name__)
         self.logger.info("creating kubernetes executor")
+        self.kube_config = KubeConfig()
         self.task_queue = task_queue
         self.pending_jobs = set()
         self.namespace = os.environ['k8s_POD_NAMESPACE']
         self.logger.info("k8s: using namespace {}".format(self.namespace))
         self.result_queue = result_queue
-        self.launcher = PodLauncher()
-        self.current_jobs = {}
-        self.running = running
-        self._task_counter = 0
         self.watcher_queue = multiprocessing.Queue()
-        self.api = client.CoreV1Api()
-
-        watch_function = self.api.read_namespaced_pod
-        w = KubernetesJobWatcher(watch_function, self.namespace,
-                                 self.result_queue, self.watcher_queue)
+        self.helper = KubernetesHelper()
+        w = KubernetesJobWatcher(self.helper.pod_api.list_namespaced_pod, 
self.namespace, self.watcher_queue)
         w.start()
 
     def run_next(self, next_job):
@@ -123,32 +97,39 @@ class AirflowKubernetesScheduler(object):
 
         """
         self.logger.info('k8s: job is {}'.format(str(next_job)))
-        (key, command) = next_job
+        key, command = next_job
+        dag_id, task_id, execution_date = key
         self.logger.info("running for command {}".format(command))
-        epoch_time = calendar.timegm(time.gmtime())
-        cmd_args = "mkdir -p $AIRFLOW_HOME/dags/synched/git && " \
-                   "git clone {} /tmp/tmp_git && " \
-                   "mv /tmp/tmp_git/* $AIRFLOW_HOME/dags/synched/git/ &&" \
-                   "rm -rf /tmp/tmp_git &&" \
-                   "{} -km".format(KubeConfig.git_repo, command)
-        pod_id = self._create_job_id_from_key(key=key, epoch_time=epoch_time)
-        self.current_jobs[pod_id] = key
-
+        cmd_args = "mkdir -p $AIRFLOW_HOME/dags/synched/git && cd 
$AIRFLOW_HOME/dags/synched/git &&" \
+                   "git init && git remote add origin {git_repo} && git pull 
origin {git_branch} --depth=1 &&" \
+                   "{command} -km".format(git_repo=self.kube_config.git_repo, 
git_branch=self.kube_config.git_branch,
+                                          command=command)
+        pod_id = self._create_job_id_from_key(key=key)
         pod = KubernetesPodBuilder(
-            image=KubeConfig.kube_image,
+            image=self.kube_config.kube_image,
             cmds=["bash", "-cx", "--"],
             args=[cmd_args],
             kub_req_factory=SimplePodRequestFactory(),
             namespace=self.namespace
         )
+        pod.set_image_pull_policy("IfNotPresent")
+        pod.add_env_variables({"AIRFLOW__CORE__EXECUTOR": "LocalExecutor"})
         pod.add_name(pod_id)
+        pod.add_labels({
+            "dag_id": dag_id,
+            "task_id": task_id,
+            "execution_date": 
self._datetime_to_label_safe_datestring(execution_date)
+        })
         pod.launch()
-        self._task_counter += 1
 
         # the watcher will monitor pods, so we do not block.
         self.launcher.run_pod_async(pod)
         self.logger.info("k8s: Job created!")
 
+    def delete_job(self, key):
+        job_id = self._create_job_id_from_key(key)
+        self.helper.delete_pod(job_id, namespace=self.namespace)
+
     def sync(self):
         """
 
@@ -162,41 +143,49 @@ class AirflowKubernetesScheduler(object):
         while not self.watcher_queue.empty():
             self.process_watcher_task()
 
-    def process_watcher_task(self):
-        job_id, state = self.watcher_queue.get()
-        if state == State.RUNNING and job_id in self.pending_jobs:
-            self.pending_jobs.remove(job_id)
-        elif job_id in self.current_jobs:
-            self._complete_job(job_id, state)
-
-    def _complete_job(self, job_id, state):
-        key = self.current_jobs[job_id]
-        self.logger.info("finishing job {}".format(key))
-        self.result_queue.put((key, state))
-        self.current_jobs.pop(job_id)
-        self.running.pop(key)
-
-    def _create_job_id_from_key(self, key, epoch_time):
-        """
-
-        Kubernetes pod names must unique and match specific conventions 
-        (i.e. no spaces, period, etc.)
-        This function creates a unique name using the epoch time and internal 
counter
-
-        :param key: 
-
-        :param epoch_time: 
-
-        :return:
-
-        """
+    def end_task(self):
+        job_id, state, labels = self.watcher_queue.get()
+        logging.info("Attempting to finish job; job_id: {}; state: {}; labels: 
{}".format(job_id, state, labels))
+        key = self._labels_to_key(labels)
+        if key:
+            self.logger.info("finishing job {}".format(key))
+            self.result_queue.put((key, state))
 
+    @staticmethod
+    def _create_job_id_from_key(key):
         keystr = '-'.join([str(x).replace(' ', '-') for x in key[:2]])
-        job_fields = [keystr, str(self._task_counter), str(epoch_time)]
+        job_fields = [keystr]
         unformatted_job_id = '-'.join(job_fields)
         job_id = unformatted_job_id.replace('_', '-')
         return job_id
 
+    @staticmethod
+    def _label_safe_datestring_to_datetime(string):
+        """
+        Kubernetes doesn't like ":" in labels; since the ISO datetime format 
uses ":" but not "_", let's replace ":" with "_"
+        :param string: string
+        :return: datetime.datetime object
+        """
+        return datetime.strptime(string.replace("_", ":"), "%Y-%m-%dT%H:%M:%S")
+
+    @staticmethod
+    def _datetime_to_label_safe_datestring(datetime_obj):
+        """
+        Kubernetes doesn't like ":" in labels; since the ISO datetime format 
uses ":" but not "_", let's replace ":" with "_"
+        :param datetime_obj: datetime.datetime object
+        :return: ISO-like string representing the datetime
+        """
+        return datetime_obj.isoformat().replace(":", "_")
+
+    def _labels_to_key(self, labels):
+        try:
+            return labels["dag_id"], labels["task_id"], 
self._label_safe_datestring_to_datetime(labels["execution_date"])
+        except Exception as e:
+            self.logger.warn("Error while converting labels to key; labels: 
{}; exception: {}".format(
+                labels, e
+            ))
+            return None
+
 
 class KubernetesExecutor(BaseExecutor):
     def __init__(self):
@@ -209,13 +198,10 @@ class KubernetesExecutor(BaseExecutor):
 
     def start(self):
         self.logger.info('k8s: starting kubernetes executor')
-        self.task_queue = Queue()
         self._session = settings.Session()
+        self.task_queue = Queue()
         self.result_queue = Queue()
-        self.pending_tasks = {}
-        self.kub_client = AirflowKubernetesScheduler(self.task_queue,
-                                                     self.result_queue,
-                                                     running=self.running)
+        self.kub_client = AirflowKubernetesScheduler(self.task_queue, 
self.result_queue)
 
     def sync(self):
         self.kub_client.sync()
@@ -224,14 +210,9 @@ class KubernetesExecutor(BaseExecutor):
             self.logger.info("reporting {}".format(results))
             self.change_state(*results)
 
-        # TODO this could be a job_counter based on max jobs a user wants
-        if self.job_queue_full() or self.cluster_at_capacity():
-            self.logger.info("currently a job is running")
-        else:
-            self.logger.info("queue ready, running next")
-            if not self.task_queue.empty():
-                (key, command) = self.task_queue.get()
-                self.kub_client.run_next((key, command))
+        if not self.task_queue.empty():
+            (key, command) = self.task_queue.get()
+            self.kub_client.run_next((key, command))
 
     def job_queue_full(self):
         return len(self.kub_client.current_jobs) > PARALLELISM
@@ -245,15 +226,18 @@ class KubernetesExecutor(BaseExecutor):
     def change_state(self, key, state):
         self.logger.info("k8s: setting state of {} to {}".format(key, state))
         if state != State.RUNNING:
-            # self.kub_client.delete_job(key)
-            self.logger.info("current running {}".format(self.running))
-            self.running.pop(key)
+            self.kub_client.delete_job(key)
+            try:
+                self.running.pop(key)
+            except KeyError:
+                pass
         self.event_buffer[key] = state
         (dag_id, task_id, ex_time) = key
         item = self._session.query(TaskInstance).filter_by(
             dag_id=dag_id,
             task_id=task_id,
-            execution_date=ex_time).one()
+            execution_date=ex_time
+        ).one()
 
         if item.state == State.RUNNING or item.state == State.QUEUED:
             item.state = state

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/bb1e05c3/airflow/contrib/kubernetes/kubernetes_factory.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kubernetes_factory.py 
b/airflow/contrib/kubernetes/kubernetes_factory.py
index fc840bb..90b6f6c 100644
--- a/airflow/contrib/kubernetes/kubernetes_factory.py
+++ b/airflow/contrib/kubernetes/kubernetes_factory.py
@@ -37,6 +37,7 @@ class KubernetesResourceBuilder:
         self.secrets = {}
         self.node_selectors = []
         self.name = None
+        self.image_pull_policy = None
 
     def add_env_variables(self, env):
         self.envs = env
@@ -56,6 +57,23 @@ class KubernetesResourceBuilder:
     def set_namespace(self, namespace):
         self.namespace = namespace
 
+    def set_image_pull_policy(self, image_pull_policy):
+        self.image_pull_policy = image_pull_policy
+
+    def launch(self):
+        """
+            Launches the pod synchronously and waits for completion.
+        """
+        k8s_beta = self._kube_client()
+        req = self.kub_req_factory.create(self)
+        self.logger.info(json.dumps(req))
+        resp = k8s_beta.create_namespaced_pod(body=req, 
namespace=self.namespace)
+        self.logger.info("Job created. status='%s', yaml:\n%s",
+                         str(resp.status), str(req))
+
+    def _kube_client(self):
+        config.load_incluster_config()
+        return client.CoreV1Api()
 
 class KubernetesPodBuilder(KubernetesResourceBuilder):
     def __init__(self, image, cmds, namespace, kub_req_factory=None):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/bb1e05c3/airflow/contrib/kubernetes/kubernetes_helper.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kubernetes_helper.py 
b/airflow/contrib/kubernetes/kubernetes_helper.py
index 862d76a..cad7917 100644
--- a/airflow/contrib/kubernetes/kubernetes_helper.py
+++ b/airflow/contrib/kubernetes/kubernetes_helper.py
@@ -14,6 +14,8 @@
 
 import yaml
 from kubernetes import client, config
+from kubernetes.client.rest import ApiException
+import kubernetes
 
 
 class KubernetesHelper(object):
@@ -33,3 +35,11 @@ class KubernetesHelper(object):
     def delete_job(self, job_id, namespace):
         body = client.V1DeleteOptions()
         self.job_api.delete_namespaced_job(name=job_id, namespace=namespace, 
body=body)
+
+    def delete_pod(self, pod_id, namespace):
+        body = client.V1DeleteOptions()
+        try:
+            self.pod_api.delete_namespaced_pod(pod_id, namespace, body=body)
+        except ApiException as e:
+            if e.status != 404:
+                raise

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/bb1e05c3/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
----------------------------------------------------------------------
diff --git 
a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
 
b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
index 2382561..7ea9bc2 100644
--- 
a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
+++ 
b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
@@ -32,6 +32,25 @@ class KubernetesRequestFactory():
         """
         pass
 
+    @abstractmethod
+    def after_create(self, body, pod):
+        """
+            Is called after the create to augment the body.
+
+            :param body: The request body
+            :param pod:  The pod
+        """
+        pass
+
+
+class KubernetesRequestFactoryHelper(object):
+    """
+    Helper methods to build a request for kubernetes
+    """
+    @staticmethod
+    def extract_image_pull_policy(pod, req):
+        req['spec']['containers'][0]['imagePullPolicy'] = pod.image_pull_policy
+
     @staticmethod
     def extract_image(pod, req):
         req['spec']['containers'][0]['image'] = pod.image

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/bb1e05c3/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
----------------------------------------------------------------------
diff --git 
a/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py 
b/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
index 4a0cbeb..d013016 100644
--- 
a/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
+++ 
b/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
@@ -45,6 +45,8 @@ spec:
         kreq.extract_name(pod, req)
         kreq.extract_labels(pod, req)
         kreq.extract_image(pod, req)
+        if pod.image_pull_policy:
+            kreq.extract_image_pull_policy(pod, req)
         kreq.extract_cmds(pod, req)
         kreq.extract_args(pod, req)
         if len(pod.node_selectors) > 0:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/bb1e05c3/docker/Dockerfile
----------------------------------------------------------------------
diff --git a/docker/Dockerfile b/docker/Dockerfile
index 2004e97..38e7c7c 100644
--- a/docker/Dockerfile
+++ b/docker/Dockerfile
@@ -27,6 +27,12 @@ RUN pip install /tmp/airflow.tar.gz
 # prep airflow
 ENV AIRFLOW_HOME=/root/airflow
 ENV AIRFLOW__CORE__EXECUTOR=KubernetesExecutor
+ENV 
AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql+psycopg2://root:root@postgres-airflow:5432/airflow
+
+# Don't read the raw revisions git-sync puts in the volume; we always want to 
read from the symlink
+# A hack — how else can we do this?
+RUN mkdir -p $AIRFLOW_HOME/dags && echo "rev-[a-zA-Z0-9]+" > 
$AIRFLOW_HOME/dags/.airflowignore
+
 
 COPY bootstrap.sh /bootstrap.sh
 RUN chmod +x /bootstrap.sh

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/bb1e05c3/kube/airflow.yaml.template
----------------------------------------------------------------------
diff --git a/kube/airflow.yaml.template b/kube/airflow.yaml.template
index c5877ca..eca6d3c 100644
--- a/kube/airflow.yaml.template
+++ b/kube/airflow.yaml.template
@@ -13,12 +13,7 @@ spec:
           {
               "name": "init",
               "image": "{{docker_image}}",
-              "command": ["bash", "-c", "cd 
/usr/local/lib/python2.7/dist-packages/airflow && airflow initdb && alembic 
upgrade head"],
-              "env": [
-                {"name": "AIRFLOW__CORE__SQL_ALCHEMY_CONN", "value": 
"postgresql+psycopg2://root:root@postgres-airflow:5432/airflow"},
-                {"name": "AIRFLOW__CORE__K8S_IMAGE", "value": 
"{{docker_image}}"},
-                {"name": "AIRFLOW__CORE__K8S_GIT_REPO", "value": 
"https://github.com/grantnicholas/testdags.git"}
-              ]
+              "command": ["bash", "-c", "cd 
/usr/local/lib/python2.7/dist-packages/airflow && airflow initdb && alembic 
upgrade head"]
           }
       ]'
     spec:
@@ -28,13 +23,6 @@ spec:
         command: [
           "bash", "-c", "cd /usr/local/lib/python2.7/dist-packages/airflow && 
airflow initdb && alembic upgrade head"
         ]
-        env:
-        - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
-          value: postgresql+psycopg2://root:root@postgres-airflow:5432/airflow
-        - name: AIRFLOW__CORE__K8S_IMAGE
-          value: {{docker_image}}
-        - name: AIRFLOW__CORE__K8S_GIT_REPO
-          value: https://github.com/grantnicholas/testdags.git
       containers:
       - name: web
         image: {{docker_image}}
@@ -43,12 +31,14 @@ spec:
           containerPort: 8080
         args: ["webserver"]
         env:
-        - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
-          value: postgresql+psycopg2://root:root@postgres-airflow:5432/airflow
+        - name: AIRFLOW__CORE__EXECUTOR
+          value: KubernetesExecutor
         - name: AIRFLOW__CORE__K8S_IMAGE
           value: {{docker_image}}
         - name: AIRFLOW__CORE__K8S_GIT_REPO
           value: https://github.com/grantnicholas/testdags.git
+        - name: AIRFLOW__CORE__K8S_GIT_BRANCH
+          value: master
         volumeMounts:
         - name: dags
           mountPath: /root/airflow/dags/synched
@@ -70,12 +60,16 @@ spec:
         image: {{docker_image}}
         args: ["scheduler"]
         env:
-        - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
-          value: postgresql+psycopg2://root:root@postgres-airflow:5432/airflow
+        - name: AIRFLOW__CORE__EXECUTOR
+          value: KubernetesExecutor
         - name: AIRFLOW__CORE__K8S_IMAGE
           value: {{docker_image}}
         - name: AIRFLOW__CORE__K8S_GIT_REPO
           value: https://github.com/grantnicholas/testdags.git
+        - name: AIRFLOW__CORE__K8S_GIT_BRANCH
+          value: master
+        - name: AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL
+          value: "60"
         volumeMounts:
         - name: dags
           mountPath: /root/airflow/dags/synched
@@ -84,6 +78,8 @@ spec:
         env:
         - name: GIT_SYNC_REPO
           value: https://github.com/grantnicholas/testdags.git
+        - name: GIT_SYNC_BRANCH
+          value: master
         - name: GIT_SYNC_DEST
           value: git
         volumeMounts:

Reply via email to