http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/airflow/contrib/operators/k8s_pod_operator/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/k8s_pod_operator/__init__.py b/airflow/contrib/operators/k8s_pod_operator/__init__.py
deleted file mode 100644
index 50c7b86..0000000
--- a/airflow/contrib/operators/k8s_pod_operator/__init__.py
+++ /dev/null
@@ -1,13 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-from .k8s_pod_operator import *
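
Note: the package removed above re-exported PodOperator and ReturnValuePodOperator. For reference, a minimal sketch of the usage pattern this commit deletes, adapted from the example_pod_operator.py DAG removed further down (the image name is a placeholder, not the one from the example):

    import airflow
    from airflow.models import DAG
    from airflow.contrib.kubernetes.pod import Pod
    from airflow.contrib.operators.k8s_pod_operator import PodOperator

    args = {'owner': 'airflow', 'start_date': airflow.utils.dates.days_ago(2)}
    dag = DAG(dag_id='example_pod_operator', default_args=args,
              schedule_interval=None)

    def pod_factory(context):
        # Build the Pod object the operator launches; image is a placeholder.
        return Pod(image='ubuntu:latest',
                   cmds=['/bin/bash', '-c', 'echo "Hello"'])

    hello_task = PodOperator(dag=dag,
                             task_id='hello-kube',
                             dag_run_id='run-1',
                             pod_factory=pod_factory)
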
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/airflow/contrib/operators/k8s_pod_operator/k8s_pod_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/k8s_pod_operator/k8s_pod_operator.py b/airflow/contrib/operators/k8s_pod_operator/k8s_pod_operator.py deleted file mode 100644 index 6af66ea..0000000 --- a/airflow/contrib/operators/k8s_pod_operator/k8s_pod_operator.py +++ /dev/null @@ -1,120 +0,0 @@ -# -*- coding: utf-8 -*- -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import logging - -from airflow.exceptions import AirflowException -from airflow.operators.python_operator import PythonOperator -from airflow.utils.decorators import apply_defaults -from airflow.contrib.kubernetes.pod_launcher import KubernetesLauncher, \ - KubernetesCommunicationService, incluster_namespace -from airflow.contrib.kubernetes.kubernetes_request_factory import \ - SimplePodRequestFactory, \ - ReturnValuePodRequestFactory - - -class PodOperator(PythonOperator): - """ - Executes a pod and waits for the job to finish. - - :param dag_run_id: The unique run ID that would be attached to the pod as a label - :type dag_run_id: str - :param pod_factory: Reference to the function that creates the pod with format: - function (OpContext) => Pod - :type pod_factory: callable - """ - # template_fields = tuple('dag_run_id') - ui_color = '#8da7be' - - @apply_defaults - def __init__( - self, - dag_run_id, - pod_factory, - kube_request_factory=None, - *args, **kwargs): - super(PodOperator, self).__init__(python_callable=lambda _: 1, provide_context=True, *args, **kwargs) - self.logger = logging.getLogger(self.__class__.__name__) - if not callable(pod_factory): - raise AirflowException('`pod_factory` param must be callable') - self.dag_run_id = dag_run_id - self.pod_factory = pod_factory - self.kwargs = kwargs - self._kube_request_factory = kube_request_factory or SimplePodRequestFactory - - def execute(self, context): - pod = self.get_pod_object(context) - - # Customize the pod - pod.name = self.task_id - pod.labels['run_id'] = self.dag_run_id - try: - pod.namespace = self.dag.default_args.get('namespace', pod.namespace) or incluster_namespace() - except: - # Used default namespace - pass - - # Launch the pod and wait for it to finish - KubernetesLauncher(pod, self._kube_request_factory).launch() - result = pod.result - context['ti'].xcom_push(key='result', value=result) - - custom_return_value = self.on_pod_success(context) - self.set_custom_return_value(context, custom_return_value) - return result - - def on_pod_success(self, context): - """ - Called when pod is executed successfully. - :return: Returns a custom return value for pod which will - be stored in xcom - """ - pass - - def get_pod_object(self, context): - """ - Returns a pod object. 
Overwrite this method to define custom objects - :param context: The task context - :return: The pod object - """ - return self.pod_factory(context) - - def set_custom_return_value(self, context, custom_return_value): - if custom_return_value: - context['ti'].xcom_push(key='custom_result', value=custom_return_value) - - -class ReturnValuePodOperator(PodOperator): - """ - This pod operators is a normal pod operator with the addition of - reading custom return value back from kubernetes. - """ - def __init__(self, - result_data_file, - kube_com_service_factory=None, - *args, **kwargs): - super(ReturnValuePodOperator, self).__init__(*args, **kwargs) - kube_com_service_factory = kube_com_service_factory or ( - lambda: KubernetesCommunicationService.from_dag_default_args(self.dag)) - if not isinstance(kube_com_service_factory(), KubernetesCommunicationService): - raise AirflowException('`kube_com_service_factory` must be of type KubernetesCommunicationService') - self._kube_com_service_factory = kube_com_service_factory - self._result_data_file = result_data_file - self._kube_request_factory = self._return_value_kube_request # Overwrite the default request factory - - def on_pod_success(self, context): - return self._kube_com_service_factory().pod_return_data(self.task_id) - - def _return_value_kube_request(self): - return ReturnValuePodRequestFactory(self._kube_com_service_factory, self._result_data_file) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/airflow/contrib/operators/kubernetes/__init__.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/kubernetes/__init__.py b/airflow/contrib/operators/kubernetes/__init__.py deleted file mode 100644 index 9d7677a..0000000 --- a/airflow/contrib/operators/kubernetes/__init__.py +++ /dev/null @@ -1,13 +0,0 @@ -# -*- coding: utf-8 -*- -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/airflow/contrib/operators/kubernetes/pod_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/kubernetes/pod_operator.py b/airflow/contrib/operators/kubernetes/pod_operator.py deleted file mode 100644 index 0db8c6d..0000000 --- a/airflow/contrib/operators/kubernetes/pod_operator.py +++ /dev/null @@ -1,100 +0,0 @@ -# -*- coding: utf-8 -*- -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -from airflow.exceptions import AirflowException -from airflow.operators.python_operator import PythonOperator -from airflow.utils.decorators import apply_defaults -from airflow.contrib.kubernetes.pod_launcher import PodLauncher -from airflow.contrib.kubernetes.pod import Pod -from airflow.utils.state import State - - -class PodOperator(PythonOperator): - """ - Executes a pod and waits for the job to finish. - - :param dag_run_id: The unique run ID that would be attached to the pod as a label - :type dag_run_id: str - :param pod_factory: Reference to the function that creates the pod with format: - function (OpContext) => Pod - :type pod_factory: callable - :param cache_output: If set to true, the output of the pod would be saved in a - cache object using md5 hash of all the pod parameters and in case of success, the cached - results will be returned on consecutive calls. Only use this - """ - # template_fields = tuple('dag_run_id') - ui_color = '#8da7be' - - def blank_func(self, context): - return None - - @apply_defaults - def __init__( - self, - dag_run_id, - pod, - on_pod_success_func = blank_func, - *args, - **kwargs - ): - # type: (str, Pod) -> PodOperator - super(PodOperator, self).__init__( - python_callable=lambda _:1, - provide_context=True, - *args, - **kwargs) - self.pod = pod - self.dag_run_id = dag_run_id - self.pod_launcher = PodLauncher() - self.kwargs = kwargs - self._on_pod_success_func = on_pod_success_func - - def execute(self, context): - task_instance = context.get('task_instance') - if task_instance is None: - raise AirflowException('`task_instance` is empty! This should not happen') - - pod = self.pod - - # Customize the pod - pod.name = self.task_id - pod.labels['run_id'] = self.dag_run_id - pod.namespace = self.dag.default_args.get('namespace', pod.namespace) - - pod_result = self.pod_launcher.run_pod(pod) - - if pod_result == State.FAILED: - raise AirflowException("Pod returned a failed status") - - # Launch the pod and wait for it to finish - self.op_context.result = pod.result - if pod_result == State.FAILED: - raise AirflowException("Pod failed") - - # Cache the output - custom_return_value = self.on_pod_success(context) - if custom_return_value: - return custom_return_value - - def on_pod_success(self, context): - """ - Called when pod is executed successfully. - - If you want to access return values for XCOM, place values - in accessible file system or DB and override this function. - - :return: Returns a custom return value for pod which will - be stored in xcom - """ - return self._on_pod_success_func(context=context) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/airflow/dag_importer/__init__.py ---------------------------------------------------------------------- diff --git a/airflow/dag_importer/__init__.py b/airflow/dag_importer/__init__.py deleted file mode 100644 index f0a792d..0000000 --- a/airflow/dag_importer/__init__.py +++ /dev/null @@ -1,83 +0,0 @@ -# -*- coding: utf-8 -*- -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -import logging -from airflow import configuration - - -def _integrate_plugins(): - pass - - -dag_import_spec = {} - - -def import_dags(): - logging.info("importing dags") - if configuration.has_option('core', 'k8s_mode'): - mode = configuration.get('core', 'k8s_mode') - dag_import_func(mode)() - else: - _import_hostpath() - - -def dag_import_func(mode): - return { - 'git': _import_git, - 'cinder': _import_cinder, - }.get(mode, _import_hostpath) - - -def _import_hostpath(): - - logging.info("importing dags locally") - spec = {'name': 'shared-data', 'hostPath': {}} - spec['hostPath']['path'] = '/tmp/dags' - global dag_import_spec - dag_import_spec = spec - - -def _import_cinder(): - ''' - kind: StorageClass - apiVersion: storage.k8s.io/v1 - metadata: - name: gold - provisioner: kubernetes.io/cinder - parameters: - type: fast - availability: nova - :return: - ''' - global dag_import_spec - spec = {} - - spec['kind'] = 'StorageClass' - spec['apiVersion'] = 'storage.k8s.io/v1' - spec['metatdata']['name'] = 'gold' - spec['provisioner'] = 'kubernetes.io/cinder' - spec['parameters']['type'] = 'fast' - spec['availability'] = 'nova' - - -def _import_git(): - logging.info("importing dags from github") - global dag_import_spec - git_link = configuration.get('core', 'k8s_git_link') - spec = {'name': 'shared-data', 'gitRepo': {}} - spec['gitRepo']['repository'] = git_link - if configuration.has_option('core','k8s_git_revision'): - revision = configuration.get('core', 'k8s_git_revision') - spec['gitRepo']['revision'] = revision - dag_import_spec = spec http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/airflow/example_dags/example_kubernetes_executor.py ---------------------------------------------------------------------- diff --git a/airflow/example_dags/example_kubernetes_executor.py b/airflow/example_dags/example_kubernetes_executor.py index 31bc2fb..2a02ef6 100644 --- a/airflow/example_dags/example_kubernetes_executor.py +++ b/airflow/example_dags/example_kubernetes_executor.py @@ -15,10 +15,8 @@ from __future__ import print_function import airflow from airflow.operators.python_operator import PythonOperator from airflow.models import DAG -from airflow.contrib.executors.kubernetes_executor import KubernetesExecutorConfig import os - args = { 'owner': 'airflow', 'start_date': airflow.utils.dates.days_ago(2) @@ -39,7 +37,6 @@ def use_zip_binary(): assert rc == 0 - # You don't have to use any special KubernetesExecutor configuration if you don't want to start_task = PythonOperator( task_id="start_task", python_callable=print_stuff, dag=dag @@ -60,7 +57,8 @@ two_task = PythonOperator( # Limit resources on this operator/task three_task = PythonOperator( task_id="three_task", python_callable=print_stuff, dag=dag, - executor_config={"KubernetesExecutor": {"request_memory": "128Mi", "limit_memory": "128Mi"}} + executor_config={ + "KubernetesExecutor": {"request_memory": "128Mi", "limit_memory": "128Mi"}} ) start_task.set_downstream([one_task, two_task, three_task]) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/airflow/example_dags/example_pod_operator.py ---------------------------------------------------------------------- diff --git a/airflow/example_dags/example_pod_operator.py b/airflow/example_dags/example_pod_operator.py deleted file mode 100644 index ec62aaf..0000000 --- a/airflow/example_dags/example_pod_operator.py +++ /dev/null @@ -1,91 +0,0 @@ -# -*- coding: utf-8 -*- -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may 
not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -""" -Example of the PodOperator and ReturnValuePodOperator which would execute -pods on a Kubernetes cluster. PodOperator would only work if airflow is -deployed within kubernetes. -""" -import os - -import airflow -import random -from airflow.contrib.kubernetes.pod import Pod, Config -from airflow.contrib.operators.k8s_pod_operator import ReturnValuePodOperator, PodOperator -from airflow.models import DAG -from airflow.utils.trigger_rule import TriggerRule - -# TODO: Replace the etcd endpoint with your own etcd endpoint -args = { - 'owner': 'airflow', - 'etcd_endpoint': os.environ.get('AIRFLOWSVC_SERVICE_HOST') + ':' + - os.environ.get('AIRFLOWSVC_SERVICE_PORT_ETCDSVC_PORT'), - 'start_date': airflow.utils.dates.days_ago(2) -} - -docker_image = 'artprod.dev.bloomberg.com/ds/molecula-python:1.0.0.0-SNAPSHOT' # Replace with 'ubuntu:latest' -dag = DAG( - dag_id='example_pod_operator', default_args=args, - schedule_interval=None) - - -def pod_that_returns_hello(context): - """ - Returns a Pod object given the airflow context. - """ - image = docker_image - cmds = ['/bin/bash', '-c', 'echo "Hello $RANDOM" > /tmp/result.txt'] - return Pod(image=image, cmds=cmds) - - -hello_kube_step1 = ReturnValuePodOperator(dag=dag, - task_id='hello-kube-step1', - dag_run_id='run-1', - pod_factory=pod_that_returns_hello, - result_data_file='/tmp/result.txt') - - -def pod_that_reads_upstream_result(context): - up_task_id = 'hello-kube-step1' - # The message including a random number generated inside the upstream pod will be read here - return_val = context['ti'].xcom_pull(key='custom_result', task_ids=up_task_id) - image = docker_image - cmds = ['/bin/bash', '-c', 'echo ' + return_val] - return Pod(image=image, cmds=cmds) - - -hello_kube_step2 = PodOperator(dag=dag, - task_id='hello-kube-step2', - dag_run_id='run_1', - pod_factory=pod_that_reads_upstream_result) -hello_kube_step2.set_upstream(hello_kube_step1) - -def pod_that_injects_configs(context): - """ - The returning pod object has a configs map which tells the operator to inject some JSON objects as - config files - """ - image = docker_image # Replace with 'ubuntu:latest' - configs = [ Config('/configs/c1.json', { 'random_val': str(random.random()) }), Config('/configs/c2.json', { 'my_db': 'conn_str' }) ] - cmds = ['/bin/bash', '-c', 'sleep 3; cat /configs/c2.json'] - return Pod(image=image, cmds=cmds, configs=configs) - -hello_kube_step3 = ReturnValuePodOperator(dag=dag, - task_id='hello-kube-step3', - dag_run_id='run_1', - pod_factory=pod_that_injects_configs, - result_data_file='/configs/c1.json') -hello_kube_step3.set_upstream(hello_kube_step1) - - - http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/airflow/executors/__init__.py ---------------------------------------------------------------------- diff --git a/airflow/executors/__init__.py b/airflow/executors/__init__.py index 047da6f..7ae396c 100644 --- a/airflow/executors/__init__.py +++ b/airflow/executors/__init__.py @@ -17,7 +17,7 @@ # specific language governing permissions and limitations # under the 
License. import sys - +from airflow.utils.log.logging_mixin import LoggingMixin from airflow import configuration from airflow.exceptions import AirflowException from airflow.executors.base_executor import BaseExecutor http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/airflow/executors/base_executor.py ---------------------------------------------------------------------- diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py index 7f00e93..0648f9b 100644 --- a/airflow/executors/base_executor.py +++ b/airflow/executors/base_executor.py @@ -133,7 +133,10 @@ class BaseExecutor(LoggingMixin): ti.refresh_from_db() if ti.state != State.RUNNING: self.running[key] = command - self.execute_async(key, command=command, queue=queue, executor_config=ti.executor_config) + self.execute_async(key=key, + command=command, + queue=queue, + executor_config=ti.executor_config) else: self.logger.info( 'Task is already running, not sending to ' @@ -144,6 +147,7 @@ class BaseExecutor(LoggingMixin): self.sync() def change_state(self, key, state): + print("popping: {}".format(key)) self.running.pop(key) self.event_buffer[key] = state @@ -174,7 +178,11 @@ class BaseExecutor(LoggingMixin): return cleared_events - def execute_async(self, key, command, queue=None, executor_config=None): # pragma: no cover + def execute_async(self, + key, + command, + queue=None, + executor_config=None): # pragma: no cover """ This method will execute the command asynchronously. """ http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/airflow/executors/celery_executor.py ---------------------------------------------------------------------- diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py index 70d0088..2de7c46 100644 --- a/airflow/executors/celery_executor.py +++ b/airflow/executors/celery_executor.py @@ -79,7 +79,8 @@ class CeleryExecutor(BaseExecutor): self.last_state = {} def execute_async(self, key, command, - queue=DEFAULT_CELERY_CONFIG['task_default_queue'], executor_config=None): + queue=DEFAULT_CELERY_CONFIG['task_default_queue'], + executor_config=None): self.log.info( "[celery] queuing {key} through celery, " "queue={queue}".format(**locals())) self.tasks[key] = execute_command.apply_async( http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/airflow/executors/local_executor.py ---------------------------------------------------------------------- diff --git a/airflow/executors/local_executor.py b/airflow/executors/local_executor.py index 4ac25f5..9f75948 100644 --- a/airflow/executors/local_executor.py +++ b/airflow/executors/local_executor.py @@ -222,7 +222,7 @@ class LocalExecutor(BaseExecutor): self.impl.start() def execute_async(self, key, command, queue=None, executor_config=None): - self.queue.put((key, command)) + self.impl.execute_async(key=key, command=command) def sync(self): self.impl.sync() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/airflow/migrations/versions/27c6a30d7c24_add_executor_config_to_task_instance.py ---------------------------------------------------------------------- diff --git a/airflow/migrations/versions/27c6a30d7c24_add_executor_config_to_task_instance.py b/airflow/migrations/versions/27c6a30d7c24_add_executor_config_to_task_instance.py index 84c41ec..b7213a3 100644 --- a/airflow/migrations/versions/27c6a30d7c24_add_executor_config_to_task_instance.py +++ b/airflow/migrations/versions/27c6a30d7c24_add_executor_config_to_task_instance.py @@ 
-1,3 +1,4 @@ +# flake8: noqa # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/airflow/migrations/versions/33ae817a1ff4_add_kubernetes_resource_checkpointing.py ---------------------------------------------------------------------- diff --git a/airflow/migrations/versions/33ae817a1ff4_add_kubernetes_resource_checkpointing.py b/airflow/migrations/versions/33ae817a1ff4_add_kubernetes_resource_checkpointing.py index d642476..4347bae 100644 --- a/airflow/migrations/versions/33ae817a1ff4_add_kubernetes_resource_checkpointing.py +++ b/airflow/migrations/versions/33ae817a1ff4_add_kubernetes_resource_checkpointing.py @@ -1,3 +1,4 @@ +# flake8: noqa # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -25,7 +26,6 @@ down_revision = 'd2ae31099d61' branch_labels = None depends_on = None - from alembic import op import sqlalchemy as sa http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/airflow/models.py ---------------------------------------------------------------------- diff --git a/airflow/models.py b/airflow/models.py index ae387b6..2de1ade 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -2223,10 +2223,17 @@ class BaseOperator(LoggingMixin): :param task_concurrency: When set, a task will be able to limit the concurrent runs across execution_dates :type task_concurrency: int - :param executor_config: Additional task-level configuration parameters that are - interpreted by a specific executor. Parameters are namespaced by the name of executor. - ``example: to run this task in a specific docker container through the KubernetesExecutor - MyOperator(..., executor_config={"KubernetesExecutor": {"image": "myCustomDockerImage"}})`` + :param executor_config: Additional task-level configuration parameters that are + interpreted by a specific executor. Parameters are namespaced by the name of + executor. 
+ ``example: to run this task in a specific docker container through + the KubernetesExecutor + MyOperator(..., + executor_config={ + "KubernetesExecutor": + {"image": "myCustomDockerImage"} + } + )`` :type executor_config: dict """ http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/kubectl ---------------------------------------------------------------------- diff --git a/kubectl b/kubectl deleted file mode 100644 index e69de29..0000000 http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/scripts/ci/kubernetes/docker/Dockerfile ---------------------------------------------------------------------- diff --git a/scripts/ci/kubernetes/docker/Dockerfile b/scripts/ci/kubernetes/docker/Dockerfile index b1bc493..ad7919f 100644 --- a/scripts/ci/kubernetes/docker/Dockerfile +++ b/scripts/ci/kubernetes/docker/Dockerfile @@ -33,6 +33,9 @@ RUN apt-get update -y && apt-get install -y \ RUN pip install -U setuptools && \ pip install -U pip +COPY requirements.txt /tmp/requirements.txt +RUN pip install -r /tmp/requirements.txt + RUN pip install kubernetes && \ pip install cryptography && \ pip install psycopg2==2.7.3.1 # I had issues with older versions of psycopg2, just a warning http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/scripts/ci/kubernetes/docker/requirements.txt ---------------------------------------------------------------------- diff --git a/scripts/ci/kubernetes/docker/requirements.txt b/scripts/ci/kubernetes/docker/requirements.txt new file mode 100644 index 0000000..6b823d9 --- /dev/null +++ b/scripts/ci/kubernetes/docker/requirements.txt @@ -0,0 +1,35 @@ +alembic +bleach +configparser +croniter +dill +flask +flask-admin +flask-caching +flask-login +flask-swagger +flask-wtf +funcsigs +future +gitpython +gunicorn +iso8601 +jinja2 +lxml +markdown +pandas +pendulum +psutil +pygments +python-daemon +python-dateutil +python-nvd3 +requests +setproctitle +sqlalchemy +sqlalchemy-utc +tabulate +thrift +tzlocal +werkzeug +zope.deprecation http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/scripts/ci/kubernetes/kube/airflow.yaml.template ---------------------------------------------------------------------- diff --git a/scripts/ci/kubernetes/kube/airflow.yaml.template b/scripts/ci/kubernetes/kube/airflow.yaml.template index af54175..ae00983 100644 --- a/scripts/ci/kubernetes/kube/airflow.yaml.template +++ b/scripts/ci/kubernetes/kube/airflow.yaml.template @@ -73,7 +73,7 @@ spec: - "bash" args: - "-cx" - - "cd /usr/local/lib/python2.7/dist-packages/airflow && cp -R example_dags/* /root/airflow/dags/ && airflow initdb && alembic upgrade head" + - "cd /usr/local/lib/python2.7/dist-packages/airflow && cp -R example_dags/* /root/airflow/dags/ && airflow initdb && alembic upgrade heads" containers: - name: web image: {{docker_image}}:{{docker_tag}} @@ -113,7 +113,7 @@ spec: path: /admin port: 8080 - name: scheduler - image: {{docker_image}} + image: {{docker_image}}:{{docker_tag}} imagePullPolicy: IfNotPresent args: ["scheduler"] env: @@ -173,7 +173,7 @@ data: dags_folder = /root/airflow/dags base_log_folder = /root/airflow/logs logging_level = INFO - executor = KubernetesExecutor + executor = KubernetesExecutor parallelism = 32 plugins_folder = /root/airflow/plugins sql_alchemy_conn = $SQL_ALCHEMY_CONN http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/scripts/ci/kubernetes/kube/deploy.sh ---------------------------------------------------------------------- diff --git 
a/scripts/ci/kubernetes/kube/deploy.sh b/scripts/ci/kubernetes/kube/deploy.sh index 953f50f..27c707f 100755 --- a/scripts/ci/kubernetes/kube/deploy.sh +++ b/scripts/ci/kubernetes/kube/deploy.sh @@ -20,7 +20,7 @@ TAG=${2:-latest} DIRNAME=$(cd "$(dirname "$0")"; pwd) # create an emptydir for postgres to store it's volume data in -sudo mkdir -p /data/postgres-airflow +#sudo mkdir -p /data/postgres-airflow mkdir -p $DIRNAME/.generated kubectl apply -f $DIRNAME/postgres.yaml @@ -39,3 +39,13 @@ do fi sleep 4 done + +POD=$(kubectl get pods -o go-template --template '{{range .items}}{{.metadata.name}}{{"\n"}}{{end}}' | grep airflow | head -1) + +echo "------- pod description -------" +kubectl describe pod $POD +echo "------- web logs -------" +kubectl logs $POD web +echo "------- scheduler logs -------" +kubectl logs $POD scheduler +echo "--------------" http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/scripts/ci/kubernetes/kube/postgres.yaml ---------------------------------------------------------------------- diff --git a/scripts/ci/kubernetes/kube/postgres.yaml b/scripts/ci/kubernetes/kube/postgres.yaml index 79366d0..e0bbdff 100644 --- a/scripts/ci/kubernetes/kube/postgres.yaml +++ b/scripts/ci/kubernetes/kube/postgres.yaml @@ -14,30 +14,31 @@ # KIND, either express or implied. See the License for the * # specific language governing permissions and limitations * # under the License. * - -apiVersion: v1 -kind: PersistentVolume -metadata: - name: postgres-airflow -spec: - accessModes: - - ReadWriteOnce - capacity: - storage: 5Gi - hostPath: - path: /data/postgres-airflow ---- -kind: PersistentVolumeClaim -apiVersion: v1 -metadata: - name: postgres-airflow -spec: - accessModes: - - ReadWriteOnce - resources: - requests: - storage: 5Gi ---- +# +#apiVersion: v1 +#kind: PersistentVolume +#metadata: +# name: postgres-airflow +#spec: +# accessModes: +# - ReadWriteOnce +# capacity: +# storage: 5Gi +# hostPath: +# path: /data/postgres-airflow +# +#--- +#kind: PersistentVolumeClaim +#apiVersion: v1 +#metadata: +# name: postgres-airflow +#spec: +# accessModes: +# - ReadWriteOnce +# resources: +# requests: +# storage: 5Gi +#--- kind: Deployment apiVersion: extensions/v1beta1 metadata: @@ -95,8 +96,9 @@ spec: cpu: .5 volumes: - name: dbvol - persistentVolumeClaim: - claimName: postgres-airflow + emptyDir: {} +# persistentVolumeClaim: +# claimName: postgres-airflow --- apiVersion: v1 kind: Service http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/scripts/ci/kubernetes/minikube/start_minikube.sh ---------------------------------------------------------------------- diff --git a/scripts/ci/kubernetes/minikube/start_minikube.sh b/scripts/ci/kubernetes/minikube/start_minikube.sh index 8a27d75..be370cf 100755 --- a/scripts/ci/kubernetes/minikube/start_minikube.sh +++ b/scripts/ci/kubernetes/minikube/start_minikube.sh @@ -1,19 +1,19 @@ -# Licensed to the Apache Software Foundation (ASF) under one * -# or more contributor license agreements. See the NOTICE file * -# distributed with this work for additional information * -# regarding copyright ownership. The ASF licenses this file * -# to you under the Apache License, Version 2.0 (the * -# "License"); you may not use this file except in compliance * -# with the License. 
You may obtain a copy of the License at * -# * -# http://www.apache.org/licenses/LICENSE-2.0 * -# * -# Unless required by applicable law or agreed to in writing, * -# software distributed under the License is distributed on an * -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * -# KIND, either express or implied. See the License for the * -# specific language governing permissions and limitations * -# under the License. * +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. # Guard against a kubernetes cluster already being up kubectl get pods &> /dev/null @@ -23,8 +23,8 @@ if [ $? -eq 0 ]; then fi # -curl -Lo minikube https://storage.googleapis.com/minikube/releases/latest/minikube-linux-amd64 && chmod +x minikube -curl -Lo kubectl https://storage.googleapis.com/kubernetes-release/release/v1.7.0/bin/linux/amd64/kubectl && chmod +x kubectl +curl -Lo minikube https://storage.googleapis.com/minikube/releases/v0.24.1/minikube-linux-amd64 && chmod +x minikube +curl -Lo kubectl https://storage.googleapis.com/kubernetes-release/release/${KUBERNETES_VERSION}/bin/linux/amd64/kubectl && chmod +x kubectl sudo mkdir -p /usr/local/bin sudo mv minikube /usr/local/bin/minikube @@ -38,15 +38,43 @@ mkdir $HOME/.kube || true touch $HOME/.kube/config export KUBECONFIG=$HOME/.kube/config -sudo -E minikube start --vm-driver=none - -# this for loop waits until kubectl can access the api server that minikube has created -for i in {1..150} # timeout for 5 minutes -do - echo "------- Running kubectl get pods -------" - kubectl get po &> /dev/null - if [ $? -ne 1 ]; then - break - fi - sleep 2 -done + +start_minikube(){ + sudo -E minikube start --vm-driver=none --kubernetes-version="${KUBERNETES_VERSION}" + + # this for loop waits until kubectl can access the api server that minikube has created + for i in {1..90} # timeout 3 minutes + do + echo "------- Running kubectl get pods -------" + STDERR=$(kubectl get pods 2>&1 >/dev/null) + if [ $? -eq 0 ]; then + echo $STDERR + + # We do not need dynamic hostpath provisioning, so disable the default storageclass + sudo -E minikube addons disable default-storageclass && kubectl delete storageclasses --all + + # We need to give permission to watch pods to the airflow scheduler. + # The easiest way to do that is by giving admin access to the default serviceaccount (NOT SAFE!) + kubectl create clusterrolebinding add-on-cluster-admin --clusterrole=cluster-admin --serviceaccount=default:default + exit 0 + fi + echo $STDERR + sleep 2 + done +} + +cleanup_minikube(){ + sudo -E minikube stop + sudo -E minikube delete + docker stop $(docker ps -a -q) || true + docker rm $(docker ps -a -q) || true + sleep 1 +} + +start_minikube +echo "Minikube cluster creation timedout. Attempting to restart the minikube cluster." 
+cleanup_minikube +start_minikube +echo "Minikube cluster creation timedout a second time. Failing." + +exit 1 http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/scripts/ci/travis_script.sh ---------------------------------------------------------------------- diff --git a/scripts/ci/travis_script.sh b/scripts/ci/travis_script.sh index 5b2a198..8766e94 100755 --- a/scripts/ci/travis_script.sh +++ b/scripts/ci/travis_script.sh @@ -21,12 +21,12 @@ DIRNAME=$(cd "$(dirname "$0")"; pwd) AIRFLOW_ROOT="$DIRNAME/../.." cd $AIRFLOW_ROOT && pip --version && ls -l $HOME/.wheelhouse && tox --version -if [ -z "$RUN_KUBE_INTEGRATION" ]; +if [ -z "$KUBERNETES_VERSION" ]; then tox -e $TOX_ENV else - $DIRNAME/kubernetes/setup_kubernetes.sh && \ - tox -e $TOX_ENV -- tests.contrib.executors.integration \ + KUBERNETES_VERSION=${KUBERNETES_VERSION} $DIRNAME/kubernetes/setup_kubernetes.sh && \ + tox -e $TOX_ENV -- tests.contrib.minikube_tests \ --with-coverage \ --cover-erase \ --cover-html \ http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/tests/contrib/executors/integration/__init__.py ---------------------------------------------------------------------- diff --git a/tests/contrib/executors/integration/__init__.py b/tests/contrib/executors/integration/__init__.py deleted file mode 100644 index 9d7677a..0000000 --- a/tests/contrib/executors/integration/__init__.py +++ /dev/null @@ -1,13 +0,0 @@ -# -*- coding: utf-8 -*- -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/tests/contrib/executors/integration/airflow_controller.py ---------------------------------------------------------------------- diff --git a/tests/contrib/executors/integration/airflow_controller.py b/tests/contrib/executors/integration/airflow_controller.py deleted file mode 100644 index 499adb4..0000000 --- a/tests/contrib/executors/integration/airflow_controller.py +++ /dev/null @@ -1,114 +0,0 @@ -# -*- coding: utf-8 -*- -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -import subprocess -import time - - -class RunCommandError(Exception): - pass - - -class TimeoutError(Exception): - pass - - -class DagRunState: - SUCCESS = "success" - FAILED = "failed" - RUNNING = "running" - - -def run_command(command): - process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - stdout, stderr = process.communicate() - if process.returncode != 0: - raise RunCommandError("Error while running command: {}; Stdout: {}; Stderr: {}".format( - command, stdout, stderr - )) - return stdout, stderr - - -def run_command_in_pod(pod_name, container_name, command): - return run_command("kubectl exec {pod_name} -c {container_name} -- {command}".format( - pod_name=pod_name, container_name=container_name, command=command - )) - -def _unpause_dag(dag_id, airflow_pod=None): - airflow_pod = airflow_pod or _get_airflow_pod() - return run_command_in_pod(airflow_pod, "scheduler", "airflow unpause {dag_id}".format(dag_id=dag_id)) - -def run_dag(dag_id, run_id, airflow_pod=None): - airflow_pod = airflow_pod or _get_airflow_pod() - _unpause_dag(dag_id, airflow_pod) - return run_command_in_pod(airflow_pod, "scheduler", "airflow trigger_dag {dag_id} -r {run_id}".format( - dag_id=dag_id, run_id=run_id - )) - - -def _get_pod_by_grep(grep_phrase): - stdout, stderr = run_command("kubectl get pods | grep {grep_phrase} | awk '{{print $1}}'".format( - grep_phrase=grep_phrase - )) - pod_name = stdout.strip() - return pod_name - - -def _get_airflow_pod(): - return _get_pod_by_grep("^airflow") - - -def _get_postgres_pod(): - return _get_pod_by_grep("^postgres") - - -def _parse_state(stdout): - end_line = "(1 row)" - prev_line = None - for line in stdout.split("\n"): - if end_line in line: - return prev_line.strip() - prev_line = line - - raise Exception("Unknown psql output: {}".format(stdout)) - -def get_dag_run_state(dag_id, run_id, postgres_pod=None): - postgres_pod = postgres_pod or _get_postgres_pod() - stdout, stderr = run_command_in_pod( - postgres_pod, "postgres", - """psql airflow -c "select state from dag_run where dag_id='{dag_id}' and run_id='{run_id}'" """.format( - dag_id=dag_id, run_id=run_id - ) - ) - return _parse_state(stdout) - - -def dag_final_state(dag_id, run_id, postgres_pod=None, poll_interval=1, timeout=120): - postgres_pod = postgres_pod or _get_postgres_pod() - for _ in range(0, timeout / poll_interval): - dag_state = get_dag_run_state(dag_id, run_id, postgres_pod) - if dag_state != DagRunState.RUNNING: - return dag_state - time.sleep(poll_interval) - - raise TimeoutError("Timed out while waiting for DagRun with dag_id: {} run_id: {}".format(dag_id, run_id)) - - -def _kill_pod(pod_name): - return run_command("kubectl delete pod {pod_name}".format(pod_name=pod_name)) - - -def kill_scheduler(): - airflow_pod = _get_pod_by_grep("^airflow") - return _kill_pod(airflow_pod) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/tests/contrib/executors/integration/test_kubernetes_executor_integration.py ---------------------------------------------------------------------- diff --git a/tests/contrib/executors/integration/test_kubernetes_executor_integration.py b/tests/contrib/executors/integration/test_kubernetes_executor_integration.py deleted file mode 100644 index 97949ae..0000000 --- a/tests/contrib/executors/integration/test_kubernetes_executor_integration.py +++ /dev/null @@ -1,65 +0,0 @@ -# -*- coding: utf-8 -*- -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in 
compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import unittest -import time -from uuid import uuid4 -from tests.contrib.executors.integration.airflow_controller import ( - run_command, RunCommandError, - run_dag, get_dag_run_state, dag_final_state, DagRunState, - kill_scheduler -) - - -try: - run_command("kubectl get pods") -except RunCommandError: - SKIP_KUBE = True -else: - SKIP_KUBE = False - - -class KubernetesExecutorTest(unittest.TestCase): - - @unittest.skipIf(SKIP_KUBE, 'Kubernetes integration tests are unsupported by this configuration') - def test_kubernetes_executor_dag_runs_successfully(self): - dag_id, run_id = "example_python_operator", uuid4().hex - run_dag(dag_id, run_id) - state = dag_final_state(dag_id, run_id, timeout=120) - self.assertEquals(state, DagRunState.SUCCESS) - - @unittest.skipIf(SKIP_KUBE, 'Kubernetes integration tests are unsupported by this configuration') - def test_start_dag_then_kill_scheduler_then_ensure_dag_succeeds(self): - dag_id, run_id = "example_python_operator", uuid4().hex - run_dag(dag_id, run_id) - - self.assertEquals(get_dag_run_state(dag_id, run_id), DagRunState.RUNNING) - - time.sleep(10) - - kill_scheduler() - - self.assertEquals(dag_final_state(dag_id, run_id, timeout=180), DagRunState.SUCCESS) - - @unittest.skipIf(SKIP_KUBE, 'Kubernetes integration tests are unsupported by this configuration') - def test_kubernetes_executor_config_works(self): - dag_id, run_id = "example_kubernetes_executor", uuid4().hex - run_dag(dag_id, run_id) - - self.assertEquals(get_dag_run_state(dag_id, run_id), DagRunState.RUNNING) - self.assertEquals(dag_final_state(dag_id, run_id, timeout=180), DagRunState.SUCCESS) - - -if __name__ == "__main__": - unittest.main() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/tests/contrib/executors/test_kubernetes_executor.py ---------------------------------------------------------------------- diff --git a/tests/contrib/executors/test_kubernetes_executor.py b/tests/contrib/executors/test_kubernetes_executor.py index 4c9728e..0a38920 100644 --- a/tests/contrib/executors/test_kubernetes_executor.py +++ b/tests/contrib/executors/test_kubernetes_executor.py @@ -26,7 +26,6 @@ except ImportError: class TestAirflowKubernetesScheduler(unittest.TestCase): - def _gen_random_string(self, str_len): return ''.join([random.choice(string.printable) for _ in range(str_len)]) @@ -36,7 +35,7 @@ class TestAirflowKubernetesScheduler(unittest.TestCase): ("my.dag.id", "my.task.id"), ("MYDAGID", "MYTASKID"), ("my_dag_id", "my_task_id"), - ("mydagid"*200, "my_task_id"*200) + ("mydagid" * 200, "my_task_id" * 200) ] cases.extend([ @@ -53,17 +52,22 @@ class TestAirflowKubernetesScheduler(unittest.TestCase): all(ch.lower() == ch for ch in name) and re.match(regex, name)) - @unittest.skipIf(AirflowKubernetesScheduler is None, 'kubernetes python package is not installed') + @unittest.skipIf(AirflowKubernetesScheduler is None, + 'kubernetes python package is not installed') def test_create_pod_id(self): for dag_id, task_id in self._cases(): pod_name = AirflowKubernetesScheduler._create_pod_id(dag_id, task_id) 
self.assertTrue(self._is_valid_name(pod_name)) - @unittest.skipIf(AirflowKubernetesScheduler is None, "kubernetes python package is not installed") + @unittest.skipIf(AirflowKubernetesScheduler is None, + "kubernetes python package is not installed") def test_execution_date_serialize_deserialize(self): datetime_obj = datetime.now() - serialized_datetime = AirflowKubernetesScheduler._datetime_to_label_safe_datestring(datetime_obj) - new_datetime_obj = AirflowKubernetesScheduler._label_safe_datestring_to_datetime(serialized_datetime) + serialized_datetime = \ + AirflowKubernetesScheduler._datetime_to_label_safe_datestring( + datetime_obj) + new_datetime_obj = AirflowKubernetesScheduler._label_safe_datestring_to_datetime( + serialized_datetime) self.assertEquals(datetime_obj, new_datetime_obj) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/tests/contrib/kubernetes/test_kubernetes_job.py ---------------------------------------------------------------------- diff --git a/tests/contrib/kubernetes/test_kubernetes_job.py b/tests/contrib/kubernetes/test_kubernetes_job.py deleted file mode 100644 index 9921696..0000000 --- a/tests/contrib/kubernetes/test_kubernetes_job.py +++ /dev/null @@ -1,12 +0,0 @@ -# -*- coding: utf-8 -*- -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/tests/contrib/kubernetes/test_kubernetes_job_launcher.py ---------------------------------------------------------------------- diff --git a/tests/contrib/kubernetes/test_kubernetes_job_launcher.py b/tests/contrib/kubernetes/test_kubernetes_job_launcher.py deleted file mode 100644 index 3353390..0000000 --- a/tests/contrib/kubernetes/test_kubernetes_job_launcher.py +++ /dev/null @@ -1,59 +0,0 @@ -# -*- coding: utf-8 -*- -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
-# See the License for the specific language governing permissions and - -import unittest -from airflow.contrib.kubernetes.kubernetes_job_builder import KubernetesJobBuilder -from airflow.contrib.kubernetes.kubernetes_request_factory import SimpleJobRequestFactory -from airflow import configuration -import json - -secrets = {} -labels = {} -base_job = {'kind': 'Job', - 'spec': { - 'template': { - 'spec': { - 'restartPolicy': 'Never', - 'volumes': [{'hostPath': {'path': '/tmp/dags'}, 'name': 'shared-data'}], - 'containers': [ - {'command': ['try', 'this', 'first'], - 'image': 'foo.image', 'volumeMounts': [ - { - 'mountPath': '/usr/local/airflow/dags', - 'name': 'shared-data'} - ], - 'name': 'base', - 'imagePullPolicy': 'Never'} - ] - }, - 'metadata': {'name': 'name'} - } - }, - 'apiVersion': 'batch/v1', 'metadata': {'name': None} - } - - -class KubernetesJobRequestTest(unittest.TestCase): - job_to_load = None - job_req_factory = SimpleJobRequestFactory() - - def setUp(self): - configuration.load_test_config() - self.job_to_load = KubernetesJobBuilder( - image='foo.image', - cmds=['try', 'this', 'first'] - ) - - def test_job_creation_with_base_values(self): - base_job_result = self.job_req_factory.create(self.job_to_load) - self.assertEqual(base_job_result, base_job) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/tests/contrib/minikube_tests/integration/__init__.py ---------------------------------------------------------------------- diff --git a/tests/contrib/minikube_tests/integration/__init__.py b/tests/contrib/minikube_tests/integration/__init__.py new file mode 100644 index 0000000..9d7677a --- /dev/null +++ b/tests/contrib/minikube_tests/integration/__init__.py @@ -0,0 +1,13 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/tests/contrib/minikube_tests/integration/airflow_controller.py ---------------------------------------------------------------------- diff --git a/tests/contrib/minikube_tests/integration/airflow_controller.py b/tests/contrib/minikube_tests/integration/airflow_controller.py new file mode 100644 index 0000000..5604652 --- /dev/null +++ b/tests/contrib/minikube_tests/integration/airflow_controller.py @@ -0,0 +1,166 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import subprocess +import time + + +class RunCommandError(Exception): + pass + + +class TimeoutError(Exception): + pass + + +class DagRunState: + SUCCESS = "success" + FAILED = "failed" + RUNNING = "running" + + +def run_command(command): + process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + stdout, stderr = process.communicate() + if process.returncode != 0: + raise RunCommandError( + "Error while running command: {}; Stdout: {}; Stderr: {}".format( + command, stdout, stderr + )) + return stdout, stderr + + +def run_command_in_pod(pod_name, container_name, command): + return run_command("kubectl exec {pod_name} -c {container_name} -- {command}".format( + pod_name=pod_name, container_name=container_name, command=command + )) + + +def get_scheduler_logs(airflow_pod=None): + airflow_pod = airflow_pod or _get_airflow_pod() + + return run_command("kubectl logs {pod_name} scheduler" + .format(pod_name=airflow_pod)) + + +def _unpause_dag(dag_id, airflow_pod=None): + airflow_pod = airflow_pod or _get_airflow_pod() + return run_command_in_pod(airflow_pod, "scheduler", + "airflow unpause {dag_id}".format(dag_id=dag_id)) + + +def run_dag(dag_id, run_id, airflow_pod=None): + airflow_pod = airflow_pod or _get_airflow_pod() + _unpause_dag(dag_id, airflow_pod) + return run_command_in_pod(airflow_pod, "scheduler", + "airflow trigger_dag {dag_id} -r {run_id}".format( + dag_id=dag_id, run_id=run_id + )) + + +def _get_pod_by_grep(grep_phrase): + stdout, stderr = run_command( + "kubectl get pods | grep {grep_phrase} | awk '{{print $1}}'".format( + grep_phrase=grep_phrase + )) + pod_name = stdout.strip() + return pod_name + + +def _get_airflow_pod(): + return _get_pod_by_grep("^airflow") + + +def _get_postgres_pod(): + return _get_pod_by_grep("^postgres") + + +def _parse_state(stdout): + end_line = "(1 row)" + prev_line = None + for line in stdout.split("\n"): + if end_line in line: + return prev_line.strip() + prev_line = line + + raise Exception("Unknown psql output: {}".format(stdout)) + + +def get_dag_run_table(postgres_pod=None): + postgres_pod = postgres_pod or _get_postgres_pod() + stdout, stderr = run_command_in_pod( + postgres_pod, "postgres", + """psql airflow -c "select * from dag_run" """ + ) + return stdout + + +def get_task_instance_table(postgres_pod=None): + postgres_pod = postgres_pod or _get_postgres_pod() + stdout, stderr = run_command_in_pod( + postgres_pod, "postgres", + """psql airflow -c "select * from task_instance" """ + ) + return stdout + + +def get_dag_run_state(dag_id, run_id, postgres_pod=None): + postgres_pod = postgres_pod or _get_postgres_pod() + stdout, stderr = run_command_in_pod( + postgres_pod, "postgres", + """psql airflow -c "select state from dag_run where dag_id='{dag_id}' and + run_id='{run_id}'" """.format( + dag_id=dag_id, run_id=run_id + ) + ) + return _parse_state(stdout) + + +def dag_final_state(dag_id, run_id, postgres_pod=None, poll_interval=1, timeout=120): + postgres_pod = postgres_pod or _get_postgres_pod() + for _ in range(0, timeout / poll_interval): + dag_state = get_dag_run_state(dag_id, run_id, postgres_pod) + if dag_state != DagRunState.RUNNING: + capture_logs_for_failure(dag_state) + return dag_state + time.sleep(poll_interval) + + raise TimeoutError( + "Timed out while waiting for DagRun with dag_id: {} run_id: {}".format(dag_id, + run_id)) + + +def _kill_pod(pod_name): + return run_command("kubectl delete pod {pod_name}".format(pod_name=pod_name)) + + +def kill_scheduler(): + airflow_pod = 
_get_pod_by_grep("^airflow") + return _kill_pod(airflow_pod) + + +def capture_logs_for_failure(state): + if state != DagRunState.SUCCESS: + stdout, stderr = get_scheduler_logs() + print("stdout:") + for line in stdout.split('\n'): + print(line) + print("stderr:") + for line in stderr.split('\n'): + print(line) + print("dag_run:") + print(get_dag_run_table()) + print("task_instance") + print(get_task_instance_table()) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/tests/contrib/minikube_tests/integration/test_kubernetes_executor_integration.py ---------------------------------------------------------------------- diff --git a/tests/contrib/minikube_tests/integration/test_kubernetes_executor_integration.py b/tests/contrib/minikube_tests/integration/test_kubernetes_executor_integration.py new file mode 100644 index 0000000..602a717 --- /dev/null +++ b/tests/contrib/minikube_tests/integration/test_kubernetes_executor_integration.py @@ -0,0 +1,67 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import time +import unittest +from uuid import uuid4 + +from tests.contrib.minikube_tests.integration.airflow_controller\ + import DagRunState, RunCommandError, \ + dag_final_state, get_dag_run_state, kill_scheduler, run_command, run_dag + +try: + run_command("kubectl get pods") +except RunCommandError: + SKIP_KUBE = True +else: + SKIP_KUBE = False + + +class KubernetesExecutorTest(unittest.TestCase): + @unittest.skipIf(SKIP_KUBE, + 'Kubernetes integration tests are unsupported by this configuration') + def test_kubernetes_executor_dag_runs_successfully(self): + dag_id, run_id = "example_python_operator", uuid4().hex + run_dag(dag_id, run_id) + state = dag_final_state(dag_id, run_id, timeout=120) + self.assertEquals(state, DagRunState.SUCCESS) + + @unittest.skipIf(SKIP_KUBE, + 'Kubernetes integration tests are unsupported by this configuration') + def test_start_dag_then_kill_scheduler_then_ensure_dag_succeeds(self): + dag_id, run_id = "example_python_operator", uuid4().hex + run_dag(dag_id, run_id) + + self.assertEquals(get_dag_run_state(dag_id, run_id), DagRunState.RUNNING) + + time.sleep(10) + + kill_scheduler() + + self.assertEquals(dag_final_state(dag_id, run_id, timeout=180), + DagRunState.SUCCESS) + + @unittest.skipIf(SKIP_KUBE, + 'Kubernetes integration tests are unsupported by this configuration') + def test_kubernetes_executor_config_works(self): + dag_id, run_id = "example_kubernetes_executor", uuid4().hex + run_dag(dag_id, run_id) + + self.assertEquals(get_dag_run_state(dag_id, run_id), DagRunState.RUNNING) + self.assertEquals(dag_final_state(dag_id, run_id, timeout=300), + DagRunState.SUCCESS) + + +if __name__ == "__main__": + unittest.main() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/tests/core.py ---------------------------------------------------------------------- diff --git a/tests/core.py b/tests/core.py index e4dffcf..24c24fe 100644 --- a/tests/core.py +++ b/tests/core.py @@ -71,7 +71,7 @@ from jinja2 import 
UndefinedError import six -NUM_EXAMPLE_DAGS = 19 +NUM_EXAMPLE_DAGS = 20 DEV_NULL = '/dev/null' TEST_DAG_FOLDER = os.path.join( os.path.dirname(os.path.realpath(__file__)), 'dags') http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/tests/jobs.py ---------------------------------------------------------------------- diff --git a/tests/jobs.py b/tests/jobs.py index 615ca9a..9eb166b 100644 --- a/tests/jobs.py +++ b/tests/jobs.py @@ -161,7 +161,8 @@ class BackfillJobTest(unittest.TestCase): 'example_trigger_target_dag', 'example_trigger_controller_dag', # tested above 'test_utils', # sleeps forever - 'example_kubernetes_executor' # requires kubernetes cluster + 'example_kubernetes_executor', # requires kubernetes cluster + 'example_kubernetes_operator' # requires kubernetes cluster ] logger = logging.getLogger('BackfillJobTest.test_backfill_examples')
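
The task-level replacement for the deleted operators is the executor_config parameter documented in airflow/models.py above. A minimal self-contained sketch, adapted from the example_kubernetes_executor DAG touched by this commit (print_stuff here is a stand-in for the callable defined in that DAG file):

    import airflow
    from airflow.models import DAG
    from airflow.operators.python_operator import PythonOperator

    args = {'owner': 'airflow', 'start_date': airflow.utils.dates.days_ago(2)}
    dag = DAG(dag_id='example_kubernetes_executor', default_args=args)

    def print_stuff():
        print("stuff!")

    # Request and cap memory for this task's worker pod via the
    # KubernetesExecutor namespace, mirroring three_task in the example DAG.
    three_task = PythonOperator(
        task_id="three_task", python_callable=print_stuff, dag=dag,
        executor_config={
            "KubernetesExecutor": {"request_memory": "128Mi",
                                   "limit_memory": "128Mi"}})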