Repository: incubator-airflow Updated Branches: refs/heads/master 0b6a7000c -> 8fa0bbd56 (forced update)
[AIRFLOW-2460] Users can now use volume mounts and volumes When launching pods using k8s operator Closes #3356 from dimberman/k8s-mounts Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/8fa0bbd5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/8fa0bbd5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/8fa0bbd5 Branch: refs/heads/master Commit: 8fa0bbd56e2a7adbdbbad120ab38f5f126d900bd Parents: 6c19468 Author: Daniel Imberman <[email protected]> Authored: Mon May 14 21:58:39 2018 +0200 Committer: Fokko Driesprong <[email protected]> Committed: Mon May 14 21:59:59 2018 +0200 ---------------------------------------------------------------------- airflow/contrib/kubernetes/pod_generator.py | 43 ++++++++++++++++---- airflow/contrib/kubernetes/volume.py | 33 +++++++++++++++ airflow/contrib/kubernetes/volume_mount.py | 37 +++++++++++++++++ .../operators/kubernetes_pod_operator.py | 24 +++++++++-- docs/kubernetes.rst | 15 ++++++- scripts/ci/kubernetes/docker/Dockerfile | 2 +- scripts/ci/kubernetes/docker/airflow-init.sh | 24 ----------- .../kubernetes/docker/airflow-test-env-init.sh | 25 ++++++++++++ scripts/ci/kubernetes/kube/airflow.yaml | 7 +++- scripts/ci/kubernetes/kube/volumes.yaml | 24 ++++++++++- .../minikube/test_kubernetes_pod_operator.py | 40 +++++++++++++++--- 11 files changed, 230 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8fa0bbd5/airflow/contrib/kubernetes/pod_generator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/kubernetes/pod_generator.py b/airflow/contrib/kubernetes/pod_generator.py index 78d3926..0f9dabd 100644 --- a/airflow/contrib/kubernetes/pod_generator.py +++ b/airflow/contrib/kubernetes/pod_generator.py @@ -19,6 +19,8 @@ import os from airflow.contrib.kubernetes.pod import Pod import uuid +from airflow.contrib.kubernetes.volume_mount import VolumeMount # noqa +from airflow.contrib.kubernetes.volume import Volume # noqa class PodGenerator: @@ -64,16 +66,30 @@ class PodGenerator: def _get_init_containers(self): return self.init_containers - def add_volume(self, name): + def add_volume(self, volume): + """ + Args: + volume (Volume): + """ + + self._add_volume(name=volume.name, configs=volume.configs) + + def _add_volume(self, name, configs): """ Args: name (str): + configs (dict): Configurations for the volume. + Could be used to define PersistentVolumeClaim, ConfigMap, etc... Returns: """ - self.volumes.append({'name': name}) + volume_map = {'name': name} + for k, v in configs.items(): + volume_map[k] = v + + self.volumes.append(volume_map) def add_volume_with_configmap(self, name, config_map): self.volumes.append( @@ -83,11 +99,11 @@ class PodGenerator: } ) - def add_mount(self, - name, - mount_path, - sub_path, - read_only): + def _add_mount(self, + name, + mount_path, + sub_path, + read_only): """ Args: @@ -107,6 +123,19 @@ class PodGenerator: 'readOnly': read_only }) + def add_mount(self, + volume_mount): + """ + Args: + volume_mount (VolumeMount): + """ + self._add_mount( + name=volume_mount.name, + mount_path=volume_mount.mount_path, + sub_path=volume_mount.sub_path, + read_only=volume_mount.read_only + ) + def _get_volumes_and_mounts(self): return self.volumes, self.volume_mounts http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8fa0bbd5/airflow/contrib/kubernetes/volume.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/kubernetes/volume.py b/airflow/contrib/kubernetes/volume.py new file mode 100644 index 0000000..d5b4f60 --- /dev/null +++ b/airflow/contrib/kubernetes/volume.py @@ -0,0 +1,33 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + + +class Volume: + """Defines Kubernetes Volume""" + + def __init__(self, name, configs): + """ Adds Kubernetes Volume to pod. allows pod to access features like ConfigMaps + and Persistent Volumes + :param name: the name of the volume mount + :type: name: str + :param configs: dictionary of any features needed for volume. + We purposely keep this vague since there are multiple volume types with changing + configs. + :type: configs: dict + """ + self.name = name + self.configs = configs http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8fa0bbd5/airflow/contrib/kubernetes/volume_mount.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/kubernetes/volume_mount.py b/airflow/contrib/kubernetes/volume_mount.py new file mode 100644 index 0000000..4bdf09c --- /dev/null +++ b/airflow/contrib/kubernetes/volume_mount.py @@ -0,0 +1,37 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + + +class VolumeMount: + """Defines Kubernetes Volume Mount""" + + def __init__(self, name, mount_path, sub_path, read_only): + """Initialize a Kubernetes Volume Mount. Used to mount pod level volumes to + running container. + :param name: the name of the volume mount + :type name: str + :param mount_path: + :type mount_path: str + :param sub_path: subpath within the volume mount + :type sub_path: str + :param read_only: whether to access pod with read-only mode + :type read_only: bool + """ + self.name = name + self.mount_path = mount_path + self.sub_path = sub_path + self.read_only = read_only http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8fa0bbd5/airflow/contrib/operators/kubernetes_pod_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/kubernetes_pod_operator.py b/airflow/contrib/operators/kubernetes_pod_operator.py index 0f9c943..ffdc2cf 100644 --- a/airflow/contrib/operators/kubernetes_pod_operator.py +++ b/airflow/contrib/operators/kubernetes_pod_operator.py @@ -21,6 +21,9 @@ from airflow.utils.decorators import apply_defaults from airflow.contrib.kubernetes import kube_client, pod_generator, pod_launcher from airflow.contrib.kubernetes.pod import Resources from airflow.utils.state import State +from airflow.contrib.kubernetes.volume_mount import VolumeMount # noqa +from airflow.contrib.kubernetes.volume import Volume # noqa +from airflow.contrib.kubernetes.secret import Secret # noqa template_fields = ('templates_dict',) template_ext = tuple() @@ -37,10 +40,14 @@ class KubernetesPodOperator(BaseOperator): :type: namespace: str :param cmds: entrypoint of the container. The docker images's entrypoint is used if this is not provide. - :type cmds: list + :type cmds: list of str :param arguments: arguments of to the entrypoint. The docker image's CMD is used if this is not provided. - :type arguments: list + :type arguments: list of str + :param volume_mounts: volumeMounts for launched pod + :type volume_mounts: list of VolumeMount + :param volumes: volumes for launched pod. Includes ConfigMaps and PersistentVolumes + :type volumes: list of Volume :param labels: labels to apply to the Pod :type labels: dict :param startup_timeout_seconds: timeout in seconds to startup the pod @@ -52,7 +59,7 @@ class KubernetesPodOperator(BaseOperator): :type env_vars: dict :param secrets: Kubernetes secrets to inject in the container, They can be exposed as environment vars or files in a volume. - :type secrets: list + :type secrets: list of Secret :param in_cluster: run kubernetes client with in_cluster configuration :type in_cluster: bool :param get_logs: get the stdout of the container as logs of the tasks @@ -65,13 +72,18 @@ class KubernetesPodOperator(BaseOperator): client = kube_client.get_kube_client(in_cluster=self.in_cluster) gen = pod_generator.PodGenerator() + for mount in self.volume_mounts: + gen.add_mount(mount) + for volume in self.volumes: + gen.add_volume(volume) + pod = gen.make_pod( namespace=self.namespace, image=self.image, pod_id=self.name, cmds=self.cmds, arguments=self.arguments, - labels=self.labels + labels=self.labels, ) pod.secrets = self.secrets @@ -99,6 +111,8 @@ class KubernetesPodOperator(BaseOperator): name, cmds=None, arguments=None, + volume_mounts=None, + volumes=None, env_vars=None, secrets=None, in_cluster=False, @@ -119,6 +133,8 @@ class KubernetesPodOperator(BaseOperator): self.startup_timeout_seconds = startup_timeout_seconds self.name = name self.env_vars = env_vars or {} + self.volume_mounts = volume_mounts or [] + self.volumes = volumes or [] self.secrets = secrets or [] self.in_cluster = in_cluster self.get_logs = get_logs http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8fa0bbd5/docs/kubernetes.rst ---------------------------------------------------------------------- diff --git a/docs/kubernetes.rst b/docs/kubernetes.rst index 4b83fc0..cb0ad87 100644 --- a/docs/kubernetes.rst +++ b/docs/kubernetes.rst @@ -19,13 +19,26 @@ Kubernetes Operator secret_file = Secret('volume', '/etc/sql_conn', 'airflow-secrets', 'sql_alchemy_conn') secret_env = Secret('env', 'SQL_CONN', 'airflow-secrets', 'sql_alchemy_conn') - + volume_mount = VolumeMount('test-volume', + mount_path='/root/mount_file', + sub_path=None, + read_only=True) + + volume_config= { + 'persistentVolumeClaim': + { + 'claimName': 'test-volume' + } + } + volume = Volume(name='test-volume', configs=volume_config) k = KubernetesPodOperator(namespace='default', image="ubuntu:16.04", cmds=["bash", "-cx"], arguments=["echo", "10"], labels={"foo": "bar"}, secrets=[secret_file,secret_env] + volume=[volume], + volume_mounts=[volume_mount] name="test", task_id="task" ) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8fa0bbd5/scripts/ci/kubernetes/docker/Dockerfile ---------------------------------------------------------------------- diff --git a/scripts/ci/kubernetes/docker/Dockerfile b/scripts/ci/kubernetes/docker/Dockerfile index 6d2c62d..498c47b 100644 --- a/scripts/ci/kubernetes/docker/Dockerfile +++ b/scripts/ci/kubernetes/docker/Dockerfile @@ -46,7 +46,7 @@ RUN pip install -U setuptools && \ COPY airflow.tar.gz /tmp/airflow.tar.gz RUN pip install /tmp/airflow.tar.gz -COPY airflow-init.sh /tmp/airflow-init.sh +COPY airflow-test-env-init.sh /tmp/airflow-test-env-init.sh COPY bootstrap.sh /bootstrap.sh RUN chmod +x /bootstrap.sh http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8fa0bbd5/scripts/ci/kubernetes/docker/airflow-init.sh ---------------------------------------------------------------------- diff --git a/scripts/ci/kubernetes/docker/airflow-init.sh b/scripts/ci/kubernetes/docker/airflow-init.sh deleted file mode 100755 index cbd1c98..0000000 --- a/scripts/ci/kubernetes/docker/airflow-init.sh +++ /dev/null @@ -1,24 +0,0 @@ -#!/usr/bin/env bash -# -# Licensed to the Apache Software Foundation (ASF) under one * -# or more contributor license agreements. See the NOTICE file * -# distributed with this work for additional information * -# regarding copyright ownership. The ASF licenses this file * -# to you under the Apache License, Version 2.0 (the * -# "License"); you may not use this file except in compliance * -# with the License. You may obtain a copy of the License at * -# * -# http://www.apache.org/licenses/LICENSE-2.0 * -# * -# Unless required by applicable law or agreed to in writing, * -# software distributed under the License is distributed on an * -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * -# KIND, either express or implied. See the License for the * -# specific language governing permissions and limitations * -# under the License. - -cd /usr/local/lib/python2.7/dist-packages/airflow && \ -cp -R example_dags/* /root/airflow/dags/ && \ -airflow initdb && \ -alembic upgrade heads && \ -(airflow create_user -u airflow -l airflow -f jon -e [email protected] -r Admin -p airflow || true) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8fa0bbd5/scripts/ci/kubernetes/docker/airflow-test-env-init.sh ---------------------------------------------------------------------- diff --git a/scripts/ci/kubernetes/docker/airflow-test-env-init.sh b/scripts/ci/kubernetes/docker/airflow-test-env-init.sh new file mode 100755 index 0000000..aa86da7 --- /dev/null +++ b/scripts/ci/kubernetes/docker/airflow-test-env-init.sh @@ -0,0 +1,25 @@ +#!/usr/bin/env bash +# +# Licensed to the Apache Software Foundation (ASF) under one * +# or more contributor license agreements. See the NOTICE file * +# distributed with this work for additional information * +# regarding copyright ownership. The ASF licenses this file * +# to you under the Apache License, Version 2.0 (the * +# "License"); you may not use this file except in compliance * +# with the License. You may obtain a copy of the License at * +# * +# http://www.apache.org/licenses/LICENSE-2.0 * +# * +# Unless required by applicable law or agreed to in writing, * +# software distributed under the License is distributed on an * +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * +# KIND, either express or implied. See the License for the * +# specific language governing permissions and limitations * +# under the License. + +cd /usr/local/lib/python2.7/dist-packages/airflow && \ +cp -R example_dags/* /root/airflow/dags/ && \ +airflow initdb && \ +alembic upgrade heads && \ +(airflow create_user -u airflow -l airflow -f jon -e [email protected] -r Admin -p airflow || true) && \ +echo "retrieved from mount" > /root/test_volume/test.txt http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8fa0bbd5/scripts/ci/kubernetes/kube/airflow.yaml ---------------------------------------------------------------------- diff --git a/scripts/ci/kubernetes/kube/airflow.yaml b/scripts/ci/kubernetes/kube/airflow.yaml index 09bbcd8..4f451ba 100644 --- a/scripts/ci/kubernetes/kube/airflow.yaml +++ b/scripts/ci/kubernetes/kube/airflow.yaml @@ -51,6 +51,8 @@ spec: subPath: airflow.cfg - name: airflow-dags mountPath: /root/airflow/dags + - name: test-volume + mountPath: /root/test_volume env: - name: SQL_ALCHEMY_CONN valueFrom: @@ -61,7 +63,7 @@ spec: - "bash" args: - "-cx" - - "./tmp/airflow-init.sh" + - "./tmp/airflow-test-env-init.sh" containers: - name: webserver image: airflow @@ -128,6 +130,9 @@ spec: - name: airflow-dags persistentVolumeClaim: claimName: airflow-dags + - name: test-volume + persistentVolumeClaim: + claimName: test-volume - name: airflow-logs persistentVolumeClaim: claimName: airflow-logs http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8fa0bbd5/scripts/ci/kubernetes/kube/volumes.yaml ---------------------------------------------------------------------- diff --git a/scripts/ci/kubernetes/kube/volumes.yaml b/scripts/ci/kubernetes/kube/volumes.yaml index 58ad368..b5488e7 100644 --- a/scripts/ci/kubernetes/kube/volumes.yaml +++ b/scripts/ci/kubernetes/kube/volumes.yaml @@ -62,4 +62,26 @@ spec: resources: requests: storage: 2Gi - +--- +kind: PersistentVolume +apiVersion: v1 +metadata: + name: test-volume +spec: + accessModes: + - ReadWriteOnce + capacity: + storage: 2Gi + hostPath: + path: /airflow-dags/ +--- +kind: PersistentVolumeClaim +apiVersion: v1 +metadata: + name: test-volume +spec: + accessModes: + - ReadWriteMany + resources: + requests: + storage: 2Gi http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8fa0bbd5/tests/contrib/minikube/test_kubernetes_pod_operator.py ---------------------------------------------------------------------- diff --git a/tests/contrib/minikube/test_kubernetes_pod_operator.py b/tests/contrib/minikube/test_kubernetes_pod_operator.py index 081fc04..8d888a3 100644 --- a/tests/contrib/minikube/test_kubernetes_pod_operator.py +++ b/tests/contrib/minikube/test_kubernetes_pod_operator.py @@ -21,6 +21,8 @@ from airflow import AirflowException from subprocess import check_call import mock from airflow.contrib.kubernetes.pod_launcher import PodLauncher +from airflow.contrib.kubernetes.volume_mount import VolumeMount +from airflow.contrib.kubernetes.volume import Volume try: check_call(["kubectl", "get", "pods"]) @@ -37,7 +39,7 @@ class KubernetesPodOperatorTest(unittest.TestCase): namespace='default', image="ubuntu:16.04", cmds=["bash", "-cx"], - arguments=["echo", "10"], + arguments=["echo 10"], labels={"foo": "bar"}, name="test", task_id="task" @@ -50,14 +52,42 @@ class KubernetesPodOperatorTest(unittest.TestCase): namespace='default', image="ubuntu:16.04", cmds=["bash", "-cx"], - arguments=["echo", "10"], + arguments=["echo 10"], labels={"foo": "bar"}, name="test", task_id="task", get_logs=True ) k.execute(None) - mock_logger.info.assert_any_call(b"+ echo\n") + mock_logger.info.assert_any_call(b"+ echo 10\n") + + def test_volume_mount(self): + with mock.patch.object(PodLauncher, 'log') as mock_logger: + volume_mount = VolumeMount('test-volume', + mount_path='/root/mount_file', + sub_path=None, + read_only=True) + + volume_config = { + 'persistentVolumeClaim': + { + 'claimName': 'test-volume' + } + } + volume = Volume(name='test-volume', configs=volume_config) + k = KubernetesPodOperator( + namespace='default', + image="ubuntu:16.04", + cmds=["bash", "-cx"], + arguments=["cat /root/mount_file/test.txt"], + labels={"foo": "bar"}, + volume_mounts=[volume_mount], + volumes=[volume], + name="test", + task_id="task" + ) + k.execute(None) + mock_logger.info.assert_any_call(b"retrieved from mount\n") def test_faulty_image(self): bad_image_name = "foobar" @@ -65,7 +95,7 @@ class KubernetesPodOperatorTest(unittest.TestCase): namespace='default', image=bad_image_name, cmds=["bash", "-cx"], - arguments=["echo", "10"], + arguments=["echo 10"], labels={"foo": "bar"}, name="test", task_id="task", @@ -85,7 +115,7 @@ class KubernetesPodOperatorTest(unittest.TestCase): namespace='default', image="ubuntu:16.04", cmds=["bash", "-cx"], - arguments=[bad_internal_command, "10"], + arguments=[bad_internal_command + " 10"], labels={"foo": "bar"}, name="test", task_id="task"
