[AIRFLOW-1314] Git Mode to pull in DAGs for Kubernetes Executor
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/bb1e05c3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/bb1e05c3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/bb1e05c3

Branch: refs/heads/master
Commit: bb1e05c3fa9319d6e81fcd3f7ec646a29ecf5185
Parents: c177d6e
Author: grantnicholas <[email protected]>
Authored: Thu Aug 31 17:53:45 2017 -0500
Committer: Fokko Driesprong <[email protected]>
Committed: Sun Apr 22 10:17:39 2018 +0200

----------------------------------------------------------------------
 .../contrib/executors/kubernetes_executor.py    | 190 +++++++++----------
 .../contrib/kubernetes/kubernetes_factory.py    |  18 ++
 airflow/contrib/kubernetes/kubernetes_helper.py |  10 +
 .../kubernetes_request_factory.py               |  19 ++
 .../pod_request_factory.py                      |   2 +
 docker/Dockerfile                               |   6 +
 kube/airflow.yaml.template                      |  30 ++-
 7 files changed, 155 insertions(+), 120 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/bb1e05c3/airflow/contrib/executors/kubernetes_executor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/executors/kubernetes_executor.py b/airflow/contrib/executors/kubernetes_executor.py
index 8eb2186..5e4afba 100644
--- a/airflow/contrib/executors/kubernetes_executor.py
+++ b/airflow/contrib/executors/kubernetes_executor.py
@@ -18,6 +18,7 @@ import time
 import os
 import multiprocessing
 from queue import Queue
+from datetime import datetime
 from kubernetes import watch
 from airflow import settings
 from airflow.contrib.kubernetes.pod_launcher import PodLauncher
@@ -29,33 +30,16 @@ from airflow import configuration
 from kubernetes import client


 class KubeConfig:
-    kube_image = configuration.get('core', 'k8s_image')
-    git_repo = configuration.get('core', 'k8s_git_repo')
-
-
-def _prep_command_for_container(command):
-    """
-    When creating a kubernetes pod, the yaml expects the command
-    in the form of ["cmd","arg","arg","arg"...]
-    This function splits the command string into tokens
-    and then matches it to the convention.
-
-    :param command:
-
-    :return:
-
-    """
-    return '"' + '","'.join(command.split(' ')[1:]) + '"'
-
-
-PARALLELISM = configuration.getint('core', 'PARALLELISM')
+    def __init__(self):
+        self.kube_image = configuration.get('core', 'k8s_image')
+        self.git_repo = configuration.get('core', 'k8s_git_repo')
+        self.git_branch = configuration.get('core', 'k8s_git_branch')


 class KubernetesJobWatcher(multiprocessing.Process, object):
-    def __init__(self, watch_function, namespace, result_queue, watcher_queue):
+    def __init__(self, watch_function, namespace, watcher_queue):
         self.logger = logging.getLogger(__name__)
         multiprocessing.Process.__init__(self)
-        self.result_queue = result_queue
         self._watch_function = watch_function
         self._watch = watch.Watch()
         self.namespace = namespace
@@ -66,49 +50,39 @@ class KubernetesJobWatcher(multiprocessing.Process, object):
         for event in self._watch.stream(self._watch_function, self.namespace,
                                         label_selector='airflow-slave'):
             task = event['object']
-            self.logger.info(
-                "Event: {} had an event of type {}".format(task.metadata.name,
-                                                           event['type']))
-            self.process_status(task.metadata.name, task.status.phase)
+            self.logger.info("Event: {} had an event of type {}".format(task.metadata.name,
+                                                                        event['type']))
+            self.process_status(task.metadata.name, task.status.phase, task.metadata.labels)

-    def process_status(self, job_id, status):
+    def process_status(self, job_id, status, labels):
         if status == 'Pending':
             self.logger.info("Event: {} Pending".format(job_id))
         elif status == 'Failed':
-            # self.logger.info("Event: {} Failed".format(job_id))
-            self.watcher_queue.put((job_id, State.FAILED))
+            self.logger.info("Event: {} Failed".format(job_id))
+            self.watcher_queue.put((job_id, State.FAILED, labels))
         elif status == 'Succeeded':
-            # self.logger.info("Event: {} Succeeded".format(job_id))
-            self.watcher_queue.put((job_id, None))
+            self.logger.info("Event: {} Succeeded".format(job_id))
+            self.watcher_queue.put((job_id, None, labels))
         elif status == 'Running':
             # self.logger.info("Event: {} is Running".format(job_id))
             self.watcher_queue.put((job_id, State.RUNNING))
         else:
-            self.logger.info("Event: Invalid state {} on job {}".format(status, job_id))
+            self.logger.info("Event: Invalid state: {} on job: {} with labels: {}".format(status, job_id, labels))


 class AirflowKubernetesScheduler(object):
-    def __init__(self,
-                 task_queue,
-                 result_queue,
-                 running):
+    def __init__(self, task_queue, result_queue):
         self.logger = logging.getLogger(__name__)
         self.logger.info("creating kubernetes executor")
+        self.kube_config = KubeConfig()
         self.task_queue = task_queue
         self.pending_jobs = set()
         self.namespace = os.environ['k8s_POD_NAMESPACE']
         self.logger.info("k8s: using namespace {}".format(self.namespace))
         self.result_queue = result_queue
-        self.launcher = PodLauncher()
-        self.current_jobs = {}
-        self.running = running
-        self._task_counter = 0
         self.watcher_queue = multiprocessing.Queue()
-        self.api = client.CoreV1Api()
-
-        watch_function = self.api.read_namespaced_pod
-        w = KubernetesJobWatcher(watch_function, self.namespace,
-                                 self.result_queue, self.watcher_queue)
+        self.helper = KubernetesHelper()
+        w = KubernetesJobWatcher(self.helper.pod_api.list_namespaced_pod, self.namespace, self.watcher_queue)
         w.start()

     def run_next(self, next_job):
@@ -123,32 +97,39 @@ class AirflowKubernetesScheduler(object):
         """
         self.logger.info('k8s: job is {}'.format(str(next_job)))
-        (key, command) = next_job
+        key, command = next_job
+        dag_id, task_id, execution_date = key
         self.logger.info("running for command {}".format(command))
-        epoch_time = calendar.timegm(time.gmtime())
-        cmd_args = "mkdir -p $AIRFLOW_HOME/dags/synched/git && " \
-                   "git clone {} /tmp/tmp_git && " \
-                   "mv /tmp/tmp_git/* $AIRFLOW_HOME/dags/synched/git/ &&" \
-                   "rm -rf /tmp/tmp_git &&" \
-                   "{} -km".format(KubeConfig.git_repo, command)
-        pod_id = self._create_job_id_from_key(key=key, epoch_time=epoch_time)
-        self.current_jobs[pod_id] = key
-
+        cmd_args = "mkdir -p $AIRFLOW_HOME/dags/synched/git && cd $AIRFLOW_HOME/dags/synched/git &&" \
+                   "git init && git remote add origin {git_repo} && git pull origin {git_branch} --depth=1 &&" \
+                   "{command} -km".format(git_repo=self.kube_config.git_repo, git_branch=self.kube_config.git_branch,
+                                          command=command)
+        pod_id = self._create_job_id_from_key(key=key)
         pod = KubernetesPodBuilder(
-            image=KubeConfig.kube_image,
+            image=self.kube_config.kube_image,
             cmds=["bash", "-cx", "--"],
             args=[cmd_args],
             kub_req_factory=SimplePodRequestFactory(),
             namespace=self.namespace
         )
+        pod.set_image_pull_policy("IfNotPresent")
+        pod.add_env_variables({"AIRFLOW__CORE__EXECUTOR": "LocalExecutor"})
         pod.add_name(pod_id)
+        pod.add_labels({
+            "dag_id": dag_id,
+            "task_id": task_id,
+            "execution_date": self._datetime_to_label_safe_datestring(execution_date)
+        })
         pod.launch()
-        self._task_counter += 1
         # the watcher will monitor pods, so we do not block.
         self.launcher.run_pod_async(pod)
         self.logger.info("k8s: Job created!")

+    def delete_job(self, key):
+        job_id = self._create_job_id_from_key(key)
+        self.helper.delete_pod(job_id, namespace=self.namespace)
+
     def sync(self):
         """
@@ -162,41 +143,49 @@ class AirflowKubernetesScheduler(object):
         while not self.watcher_queue.empty():
             self.process_watcher_task()

-    def process_watcher_task(self):
-        job_id, state = self.watcher_queue.get()
-        if state == State.RUNNING and job_id in self.pending_jobs:
-            self.pending_jobs.remove(job_id)
-        elif job_id in self.current_jobs:
-            self._complete_job(job_id, state)
-
-    def _complete_job(self, job_id, state):
-        key = self.current_jobs[job_id]
-        self.logger.info("finishing job {}".format(key))
-        self.result_queue.put((key, state))
-        self.current_jobs.pop(job_id)
-        self.running.pop(key)
-
-    def _create_job_id_from_key(self, key, epoch_time):
-        """
-
-        Kubernetes pod names must unique and match specific conventions
-        (i.e. no spaces, period, etc.)
-        This function creates a unique name using the epoch time and internal counter
-
-        :param key:
-
-        :param epoch_time:
-
-        :return:
-
-        """
+    def end_task(self):
+        job_id, state, labels = self.watcher_queue.get()
+        logging.info("Attempting to finish job; job_id: {}; state: {}; labels: {}".format(job_id, state, labels))
+        key = self._labels_to_key(labels)
+        if key:
+            self.logger.info("finishing job {}".format(key))
+            self.result_queue.put((key, state))
+
+    @staticmethod
+    def _create_job_id_from_key(key):
         keystr = '-'.join([str(x).replace(' ', '-') for x in key[:2]])
-        job_fields = [keystr, str(self._task_counter), str(epoch_time)]
+        job_fields = [keystr]
         unformatted_job_id = '-'.join(job_fields)
         job_id = unformatted_job_id.replace('_', '-')
         return job_id

+    @staticmethod
+    def _label_safe_datestring_to_datetime(string):
+        """
+        Kubernetes doesn't like ":" in labels, since ISO datetime format uses ":" but not "_" let's replace ":" with "_"
+        :param string: string
+        :return: datetime.datetime object
+        """
+        return datetime.strptime(string.replace("_", ":"), "%Y-%m-%dT%H:%M:%S")
+
+    @staticmethod
+    def _datetime_to_label_safe_datestring(datetime_obj):
+        """
+        Kubernetes doesn't like ":" in labels, since ISO datetime format uses ":" but not "_" let's replace ":" with "_"
+        :param datetime_obj: datetime.datetime object
+        :return: ISO-like string representing the datetime
+        """
+        return datetime_obj.isoformat().replace(":", "_")
+
+    def _labels_to_key(self, labels):
+        try:
+            return labels["dag_id"], labels["task_id"], self._label_safe_datestring_to_datetime(labels["execution_date"])
+        except Exception as e:
+            self.logger.warn("Error while converting labels to key; labels: {}; exception: {}".format(
+                labels, e
+            ))
+            return None
+

 class KubernetesExecutor(BaseExecutor):
     def __init__(self):
@@ -209,13 +198,10 @@ class KubernetesExecutor(BaseExecutor):

     def start(self):
         self.logger.info('k8s: starting kubernetes executor')
-        self.task_queue = Queue()
         self._session = settings.Session()
+        self.task_queue = Queue()
         self.result_queue = Queue()
-        self.pending_tasks = {}
-        self.kub_client = AirflowKubernetesScheduler(self.task_queue,
-                                                     self.result_queue,
-                                                     running=self.running)
+        self.kub_client = AirflowKubernetesScheduler(self.task_queue, self.result_queue)

     def sync(self):
         self.kub_client.sync()
@@ -224,14 +210,9 @@ class KubernetesExecutor(BaseExecutor):
             self.logger.info("reporting {}".format(results))
             self.change_state(*results)

-        # TODO this could be a job_counter based on max jobs a user wants
-        if self.job_queue_full() or self.cluster_at_capacity():
-            self.logger.info("currently a job is running")
-        else:
-            self.logger.info("queue ready, running next")
-            if not self.task_queue.empty():
-                (key, command) = self.task_queue.get()
-                self.kub_client.run_next((key, command))
+        if not self.task_queue.empty():
+            (key, command) = self.task_queue.get()
+            self.kub_client.run_next((key, command))

     def job_queue_full(self):
         return len(self.kub_client.current_jobs) > PARALLELISM
@@ -245,15 +226,18 @@ class KubernetesExecutor(BaseExecutor):
     def change_state(self, key, state):
         self.logger.info("k8s: setting state of {} to {}".format(key, state))
         if state != State.RUNNING:
-            # self.kub_client.delete_job(key)
-            self.logger.info("current running {}".format(self.running))
-            self.running.pop(key)
+            self.kub_client.delete_job(key)
+            try:
+                self.running.pop(key)
+            except KeyError:
+                pass
         self.event_buffer[key] = state
         (dag_id, task_id, ex_time) = key
         item = self._session.query(TaskInstance).filter_by(
             dag_id=dag_id,
             task_id=task_id,
-            execution_date=ex_time).one()
+            execution_date=ex_time
+        ).one()
         if item.state == State.RUNNING or item.state == State.QUEUED:
             item.state = state

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/bb1e05c3/airflow/contrib/kubernetes/kubernetes_factory.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kubernetes_factory.py b/airflow/contrib/kubernetes/kubernetes_factory.py
index fc840bb..90b6f6c 100644
--- a/airflow/contrib/kubernetes/kubernetes_factory.py
+++ b/airflow/contrib/kubernetes/kubernetes_factory.py
@@ -37,6 +37,7 @@ class KubernetesResourceBuilder:
         self.secrets = {}
         self.node_selectors = []
         self.name = None
+        self.image_pull_policy = None

     def add_env_variables(self, env):
         self.envs = env
@@ -56,6 +57,23 @@ class KubernetesResourceBuilder:
     def set_namespace(self, namespace):
         self.namespace = namespace

+    def set_image_pull_policy(self, image_pull_policy):
+        self.image_pull_policy = image_pull_policy
+
+    def launch(self):
+        """
+        Launches the pod synchronously and waits for completion.
+        """
+        k8s_beta = self._kube_client()
+        req = self.kub_req_factory.create(self)
+        self.logger.info(json.dumps(req))
+        resp = k8s_beta.create_namespaced_pod(body=req, namespace=self.namespace)
+        self.logger.info("Job created. status='%s', yaml:\n%s",
+                         str(resp.status), str(req))
+
+    def _kube_client(self):
+        config.load_incluster_config()
+        return client.CoreV1Api()

 class KubernetesPodBuilder(KubernetesResourceBuilder):
     def __init__(self, image, cmds, namespace, kub_req_factory=None):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/bb1e05c3/airflow/contrib/kubernetes/kubernetes_helper.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kubernetes_helper.py b/airflow/contrib/kubernetes/kubernetes_helper.py
index 862d76a..cad7917 100644
--- a/airflow/contrib/kubernetes/kubernetes_helper.py
+++ b/airflow/contrib/kubernetes/kubernetes_helper.py
@@ -14,6 +14,8 @@
 import yaml
 from kubernetes import client, config
+from kubernetes.client.rest import ApiException
+import kubernetes


 class KubernetesHelper(object):
@@ -33,3 +35,11 @@ class KubernetesHelper(object):
     def delete_job(self, job_id, namespace):
         body = client.V1DeleteOptions()
         self.job_api.delete_namespaced_job(name=job_id, namespace=namespace, body=body)
+
+    def delete_pod(self, pod_id, namespace):
+        body = client.V1DeleteOptions()
+        try:
+            self.pod_api.delete_namespaced_pod(pod_id, namespace, body=body)
+        except ApiException as e:
+            if e.status != 404:
+                raise

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/bb1e05c3/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
index 2382561..7ea9bc2 100644
--- a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
+++ b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
@@ -32,6 +32,25 @@ class KubernetesRequestFactory():
         """
         pass

+    @abstractmethod
+    def after_create(self, body, pod):
+        """
+        Is called after the create to augment the body.
+
+        :param body: The request body
+        :param pod: The pod
+        """
+        pass
+
+
+class KubernetesRequestFactoryHelper(object):
+    """
+    Helper methods to build a request for kubernetes
+    """
+    @staticmethod
+    def extract_image_pull_policy(pod, req):
+        req['spec']['containers'][0]['imagePullPolicy'] = pod.image_pull_policy
+
     @staticmethod
     def extract_image(pod, req):
         req['spec']['containers'][0]['image'] = pod.image

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/bb1e05c3/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py b/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
index 4a0cbeb..d013016 100644
--- a/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
+++ b/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
@@ -45,6 +45,8 @@ spec:
         kreq.extract_name(pod, req)
         kreq.extract_labels(pod, req)
         kreq.extract_image(pod, req)
+        if pod.image_pull_policy:
+            kreq.extract_image_pull_policy(pod, req)
         kreq.extract_cmds(pod, req)
         kreq.extract_args(pod, req)
         if len(pod.node_selectors) > 0:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/bb1e05c3/docker/Dockerfile
----------------------------------------------------------------------
diff --git a/docker/Dockerfile b/docker/Dockerfile
index 2004e97..38e7c7c 100644
--- a/docker/Dockerfile
+++ b/docker/Dockerfile
@@ -27,6 +27,12 @@ RUN pip install /tmp/airflow.tar.gz
 # prep airflow
 ENV AIRFLOW_HOME=/root/airflow
 ENV AIRFLOW__CORE__EXECUTOR=KubernetesExecutor
+ENV AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql+psycopg2://root:root@postgres-airflow:5432/airflow
+
+# Don't read the raw revisions git-sync puts in the volume, we always want to read from the symlink
+# A hack, how else can we do this
+RUN mkdir -p $AIRFLOW_HOME/dags && echo "rev-[a-zA-Z0-9]+" > $AIRFLOW_HOME/dags/.airflowignore
+
 COPY bootstrap.sh /bootstrap.sh
 RUN chmod +x /bootstrap.sh

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/bb1e05c3/kube/airflow.yaml.template
----------------------------------------------------------------------
diff --git a/kube/airflow.yaml.template b/kube/airflow.yaml.template
index c5877ca..eca6d3c 100644
--- a/kube/airflow.yaml.template
+++ b/kube/airflow.yaml.template
@@ -13,12 +13,7 @@ spec:
           {
             "name": "init",
             "image": "{{docker_image}}",
-            "command": ["bash", "-c", "cd /usr/local/lib/python2.7/dist-packages/airflow && airflow initdb && alembic upgrade head"],
-            "env": [
-              {"name": "AIRFLOW__CORE__SQL_ALCHEMY_CONN", "value": "postgresql+psycopg2://root:root@postgres-airflow:5432/airflow"},
-              {"name": "AIRFLOW__CORE__K8S_IMAGE", "value": "{{docker_image}}"},
-              {"name": "AIRFLOW__CORE__K8S_GIT_REPO", "value": "https://github.com/grantnicholas/testdags.git"}
-            ]
+            "command": ["bash", "-c", "cd /usr/local/lib/python2.7/dist-packages/airflow && airflow initdb && alembic upgrade head"]
           }
         ]'
     spec:
@@ -28,13 +23,6 @@ spec:
         command: [
           "bash",
          "-c",
          "cd /usr/local/lib/python2.7/dist-packages/airflow && airflow initdb && alembic upgrade head"
        ]
-        env:
-        - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
-          value: postgresql+psycopg2://root:root@postgres-airflow:5432/airflow
-        - name: AIRFLOW__CORE__K8S_IMAGE
-          value: {{docker_image}}
-        - name: AIRFLOW__CORE__K8S_GIT_REPO
-          value: https://github.com/grantnicholas/testdags.git
       containers:
       - name: web
         image: {{docker_image}}
@@ -43,12 +31,14 @@ spec:
           containerPort: 8080
         args: ["webserver"]
         env:
-        - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
-          value: postgresql+psycopg2://root:root@postgres-airflow:5432/airflow
+        - name: AIRFLOW__CORE__EXECUTOR
+          value: KubernetesExecutor
        - name: AIRFLOW__CORE__K8S_IMAGE
          value: {{docker_image}}
        - name: AIRFLOW__CORE__K8S_GIT_REPO
          value: https://github.com/grantnicholas/testdags.git
+        - name: AIRFLOW__CORE__K8S_GIT_BRANCH
+          value: master
         volumeMounts:
         - name: dags
           mountPath: /root/airflow/dags/synched
@@ -70,12 +60,16 @@ spec:
         image: {{docker_image}}
         args: ["scheduler"]
         env:
-        - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
-          value: postgresql+psycopg2://root:root@postgres-airflow:5432/airflow
+        - name: AIRFLOW__CORE__EXECUTOR
+          value: KubernetesExecutor
        - name: AIRFLOW__CORE__K8S_IMAGE
          value: {{docker_image}}
        - name: AIRFLOW__CORE__K8S_GIT_REPO
          value: https://github.com/grantnicholas/testdags.git
+        - name: AIRFLOW__CORE__K8S_GIT_BRANCH
+          value: master
+        - name: AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL
+          value: "60"
         volumeMounts:
         - name: dags
           mountPath: /root/airflow/dags/synched
@@ -84,6 +78,8 @@ spec:
         env:
         - name: GIT_SYNC_REPO
           value: https://github.com/grantnicholas/testdags.git
+        - name: GIT_SYNC_BRANCH
+          value: master
        - name: GIT_SYNC_DEST
          value: git
         volumeMounts:
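
A note on the pod label handling introduced above: Kubernetes rejects ":" in label values, so the
executor stores the execution date with ":" replaced by "_" and reverses the substitution when
mapping a finished pod back to a task key. A minimal standalone sketch of that round trip, using
only the stdlib; the helper names below are illustrative, the commit's versions live on
AirflowKubernetesScheduler as _datetime_to_label_safe_datestring / _label_safe_datestring_to_datetime:

    from datetime import datetime

    def datetime_to_label_safe(dt):
        # ISO format uses ":", which Kubernetes labels do not allow; "_" is fine.
        return dt.isoformat().replace(":", "_")

    def label_safe_to_datetime(label):
        # Reverse the substitution and parse the ISO-like string back to a datetime.
        return datetime.strptime(label.replace("_", ":"), "%Y-%m-%dT%H:%M:%S")

    execution_date = datetime(2017, 8, 31, 17, 53, 45)
    label = datetime_to_label_safe(execution_date)   # '2017-08-31T17_53_45'
    assert label_safe_to_datetime(label) == execution_date

As written in the commit, the parse format has second resolution, so an execution date carrying
microseconds would not round-trip through the label cleanly.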
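
On the git mode itself: run_next above now composes a single bash command that initializes a repo
in the synched DAGs folder, does a shallow pull of the configured branch, and then invokes the
airflow command in kube mode. A rough sketch of the rendered string, with hypothetical repo,
branch, and command values (not taken from the commit) and spacing normalized for readability:

    git_repo = "https://github.com/example/dags.git"   # hypothetical values
    git_branch = "master"
    command = "airflow run example_dag print_date 2017-08-31T17:53:45"

    cmd_args = ("mkdir -p $AIRFLOW_HOME/dags/synched/git && "
                "cd $AIRFLOW_HOME/dags/synched/git && "
                "git init && git remote add origin {git_repo} && "
                "git pull origin {git_branch} --depth=1 && "
                "{command} -km").format(git_repo=git_repo,
                                        git_branch=git_branch,
                                        command=command)

    # The worker pod runs ["bash", "-cx", "--"] with cmd_args as its single argument.
    print(cmd_args)

The --depth=1 pull keeps the checkout shallow, and initializing in place avoids the earlier
clone-into-/tmp-and-move dance.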