This is an automated email from the ASF dual-hosted git repository. jbonofre pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-liminal.git
commit a32d4eb722511db7b2f29228fae8e08ca5de8e81 Author: aviemzur <[email protected]> AuthorDate: Thu Mar 12 10:08:43 2020 +0200 Add build module --- README.md | 4 ++ rainbow/docker/__init__.py | 1 - rainbow/docker/python/Dockerfile | 19 +++++++ rainbow/docker/{ => python}/__init__.py | 1 - rainbow/docker/python/python_image.py | 61 ++++++++++++++++++++++ rainbow/{ => runners/airflow}/build/__init__.py | 0 .../rainbow_dags.py => build/build_rainbow.py} | 18 +++---- .../airflow/build/python/container-setup.sh | 9 ++++ .../airflow/build/python/container-teardown.sh | 6 +++ rainbow/runners/airflow/dag/rainbow_dags.py | 10 ++-- rainbow/runners/airflow/model/task.py | 4 +- .../airflow/tasks/create_cloudformation_stack.py | 2 +- .../airflow/tasks/delete_cloudformation_stack.py | 2 +- rainbow/runners/airflow/tasks/job_end.py | 2 +- rainbow/runners/airflow/tasks/job_start.py | 2 +- rainbow/runners/airflow/tasks/python.py | 13 +++-- rainbow/runners/airflow/tasks/spark.py | 2 +- rainbow/runners/airflow/tasks/sql.py | 2 +- requirements.txt | 3 ++ tests/runners/airflow/dag/test_rainbow_dags.py | 5 ++ .../runners/airflow/tasks/hello_world}/__init__.py | 1 - .../airflow/tasks/hello_world/hello_world.py | 2 +- tests/runners/airflow/tasks/test_python.py | 45 +++++++++++++--- 23 files changed, 175 insertions(+), 39 deletions(-) diff --git a/README.md b/README.md index 7168564..d8b9a23 100644 --- a/README.md +++ b/README.md @@ -1 +1,5 @@ # rainbow + +``` +ln -s "/Applications/Docker.app/Contents//Resources/bin/docker-credential-desktop" "/usr/local/bin/docker-credential-desktop" +``` \ No newline at end of file diff --git a/rainbow/docker/__init__.py b/rainbow/docker/__init__.py index 8bb1ec2..217e5db 100644 --- a/rainbow/docker/__init__.py +++ b/rainbow/docker/__init__.py @@ -15,4 +15,3 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -# TODO: docker diff --git a/rainbow/docker/python/Dockerfile b/rainbow/docker/python/Dockerfile new file mode 100644 index 0000000..d4e3ed2 --- /dev/null +++ b/rainbow/docker/python/Dockerfile @@ -0,0 +1,19 @@ +# Use an official Python runtime as a parent image +FROM python:3.7-slim + +# Install aptitude build-essential +#RUN apt-get install -y --reinstall build-essential + +# Set the working directory to /app +WORKDIR /app + +# Order of operations is important here for docker's caching & incremental build performance. ! +# Be careful when changing this code. ! + +# Install any needed packages specified in requirements.txt +COPY ./requirements.txt /app +RUN pip install -r requirements.txt + +# Copy the current directory contents into the container at /app +RUN echo "Copying source code.." +COPY . /app diff --git a/rainbow/docker/__init__.py b/rainbow/docker/python/__init__.py similarity index 98% copy from rainbow/docker/__init__.py copy to rainbow/docker/python/__init__.py index 8bb1ec2..217e5db 100644 --- a/rainbow/docker/__init__.py +++ b/rainbow/docker/python/__init__.py @@ -15,4 +15,3 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -# TODO: docker diff --git a/rainbow/docker/python/python_image.py b/rainbow/docker/python/python_image.py new file mode 100644 index 0000000..d66dfbe --- /dev/null +++ b/rainbow/docker/python/python_image.py @@ -0,0 +1,61 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +import os +import shutil +import tempfile +import docker + + +def build(source_path, tag, extra_files=None): + if extra_files is None: + extra_files = [] + + print(f'Building image {tag}') + + temp_dir = tempfile.mkdtemp() + # Delete dir for shutil.copytree to work + os.rmdir(temp_dir) + + __copy_source(source_path, temp_dir) + + requirements_file_path = os.path.join(temp_dir, 'requirements.txt') + if not os.path.exists(requirements_file_path): + with open(requirements_file_path, 'w'): + pass + + dockerfile_path = os.path.join(os.path.dirname(__file__), 'Dockerfile') + + for file in extra_files + [dockerfile_path]: + __copy_file(file, temp_dir) + + print(temp_dir, os.listdir(temp_dir)) + + docker_client = docker.from_env() + docker_client.images.build(path=temp_dir, tag=tag) + + docker_client.close() + + shutil.rmtree(temp_dir) + + +def __copy_source(source_path, destination_path): + shutil.copytree(source_path, destination_path) + + +def __copy_file(source_file_path, destination_file_path): + shutil.copy2(source_file_path, destination_file_path) diff --git a/rainbow/build/__init__.py b/rainbow/runners/airflow/build/__init__.py similarity index 100% rename from rainbow/build/__init__.py rename to rainbow/runners/airflow/build/__init__.py diff --git a/rainbow/runners/airflow/dag/rainbow_dags.py b/rainbow/runners/airflow/build/build_rainbow.py similarity index 86% copy from rainbow/runners/airflow/dag/rainbow_dags.py copy to rainbow/runners/airflow/build/build_rainbow.py index 6bdf66b..222ea5f 100644 --- a/rainbow/runners/airflow/dag/rainbow_dags.py +++ b/rainbow/runners/airflow/build/build_rainbow.py @@ -26,7 +26,10 @@ from airflow import DAG from rainbow.runners.airflow.tasks.python import PythonTask -def register_dags(path): +def build_rainbow(path): + """ + TODO: doc for build_rainbow + """ files = [] for r, d, f in os.walk(path): for file in f: @@ -38,7 +41,7 @@ def register_dags(path): dags = [] for config_file in files: - print(f'Registering DAG for file: f{config_file}') + print(f'Building artifacts file: f{config_file}') with open(config_file) as stream: # TODO: validate config @@ -64,12 +67,7 @@ def register_dags(path): task_instance = get_task_class(task_type)( dag, pipeline['pipeline'], parent if parent else None, task, 'all_success' ) - parent = task_instance.apply_task_to_dag() - - print(f'{parent}{{{task_type}}}') - - dags.append(dag) - return dags + parent = task_instance.build() # TODO: task class registry @@ -83,6 +81,4 @@ def get_task_class(task_type): if __name__ == '__main__': - # TODO: configurable yaml dir - path = 'tests/runners/airflow/dag/rainbow' - register_dags(path) + register_dags('') diff --git a/rainbow/runners/airflow/build/python/container-setup.sh b/rainbow/runners/airflow/build/python/container-setup.sh new file mode 100755 index 0000000..6e8d242 --- /dev/null +++ b/rainbow/runners/airflow/build/python/container-setup.sh @@ -0,0 +1,9 @@ +#!/bin/bash + +echo """$RAINBOW_INPUT""" > rainbow_input.json + +AIRFLOW_RETURN_FILE=/airflow/xcom/return.json + +mkdir -p /airflow/xcom/ + +echo {} > $AIRFLOW_RETURN_FILE diff --git a/rainbow/runners/airflow/build/python/container-teardown.sh b/rainbow/runners/airflow/build/python/container-teardown.sh new file mode 100755 index 0000000..1219407 --- /dev/null +++ b/rainbow/runners/airflow/build/python/container-teardown.sh @@ -0,0 +1,6 @@ +#!/bin/bash + +USER_CONFIG_OUTPUT_FILE=$1 +if [ "$USER_CONFIG_OUTPUT_FILE" != "" ]; then + cp ${USER_CONFIG_OUTPUT_FILE} /airflow/xcom/return.json +fi diff --git a/rainbow/runners/airflow/dag/rainbow_dags.py b/rainbow/runners/airflow/dag/rainbow_dags.py index 6bdf66b..c564737 100644 --- a/rainbow/runners/airflow/dag/rainbow_dags.py +++ b/rainbow/runners/airflow/dag/rainbow_dags.py @@ -23,10 +23,13 @@ from datetime import datetime import yaml from airflow import DAG -from rainbow.runners.airflow.tasks.python import PythonTask +from rainbow.runners.airflow.build import build_rainbow def register_dags(path): + """ + TODO: doc for register_dags + """ files = [] for r, d, f in os.walk(path): for file in f: @@ -72,10 +75,7 @@ def register_dags(path): return dags -# TODO: task class registry -task_classes = { - 'python': PythonTask -} +task_classes = build_rainbow.task_classes def get_task_class(task_type): diff --git a/rainbow/runners/airflow/model/task.py b/rainbow/runners/airflow/model/task.py index 2650aa1..25656ee 100644 --- a/rainbow/runners/airflow/model/task.py +++ b/rainbow/runners/airflow/model/task.py @@ -32,9 +32,9 @@ class Task: self.config = config self.trigger_rule = trigger_rule - def setup(self): + def build(self): """ - Setup method for task. + Build task's artifacts. """ raise NotImplementedError() diff --git a/rainbow/runners/airflow/tasks/create_cloudformation_stack.py b/rainbow/runners/airflow/tasks/create_cloudformation_stack.py index 9304167..c478dc7 100644 --- a/rainbow/runners/airflow/tasks/create_cloudformation_stack.py +++ b/rainbow/runners/airflow/tasks/create_cloudformation_stack.py @@ -27,7 +27,7 @@ class CreateCloudFormationStackTask(task.Task): def __init__(self, dag, pipeline_name, parent, config, trigger_rule): super().__init__(dag, pipeline_name, parent, config, trigger_rule) - def setup(self): + def build(self): pass def apply_task_to_dag(self): diff --git a/rainbow/runners/airflow/tasks/delete_cloudformation_stack.py b/rainbow/runners/airflow/tasks/delete_cloudformation_stack.py index 66d5783..d172284 100644 --- a/rainbow/runners/airflow/tasks/delete_cloudformation_stack.py +++ b/rainbow/runners/airflow/tasks/delete_cloudformation_stack.py @@ -27,7 +27,7 @@ class DeleteCloudFormationStackTask(task.Task): def __init__(self, dag, pipeline_name, parent, config, trigger_rule): super().__init__(dag, pipeline_name, parent, config, trigger_rule) - def setup(self): + def build(self): pass def apply_task_to_dag(self): diff --git a/rainbow/runners/airflow/tasks/job_end.py b/rainbow/runners/airflow/tasks/job_end.py index b3244c4..a6c5ef2 100644 --- a/rainbow/runners/airflow/tasks/job_end.py +++ b/rainbow/runners/airflow/tasks/job_end.py @@ -27,7 +27,7 @@ class JobEndTask(task.Task): def __init__(self, dag, pipeline_name, parent, config, trigger_rule): super().__init__(dag, pipeline_name, parent, config, trigger_rule) - def setup(self): + def build(self): pass def apply_task_to_dag(self): diff --git a/rainbow/runners/airflow/tasks/job_start.py b/rainbow/runners/airflow/tasks/job_start.py index f794e09..7338363 100644 --- a/rainbow/runners/airflow/tasks/job_start.py +++ b/rainbow/runners/airflow/tasks/job_start.py @@ -27,7 +27,7 @@ class JobStartTask(task.Task): def __init__(self, dag, pipeline_name, parent, config, trigger_rule): super().__init__(dag, pipeline_name, parent, config, trigger_rule) - def setup(self): + def build(self): pass def apply_task_to_dag(self): diff --git a/rainbow/runners/airflow/tasks/python.py b/rainbow/runners/airflow/tasks/python.py index 983ce0c..8317854 100644 --- a/rainbow/runners/airflow/tasks/python.py +++ b/rainbow/runners/airflow/tasks/python.py @@ -16,10 +16,12 @@ # specific language governing permissions and limitations # under the License. import json +import os from airflow.models import Variable from airflow.operators.dummy_operator import DummyOperator +from rainbow.docker.python import python_image from rainbow.runners.airflow.model import task from rainbow.runners.airflow.operators.kubernetes_pod_operator import \ ConfigurableKubernetesPodOperator, \ @@ -45,9 +47,14 @@ class PythonTask(task.Task): self.config_task_id = self.task_name + '_input' self.executors = self.__executors() - def setup(self): - # TODO: build docker image if needed. - pass + def build(self): + if 'source' in self.config: + script_dir = os.path.dirname(__file__) + + python_image.build(self.config['source'], self.image, [ + os.path.join(script_dir, '../build/python/container-setup.sh'), + os.path.join(script_dir, '../build/python/container-teardown.sh') + ]) def apply_task_to_dag(self): diff --git a/rainbow/runners/airflow/tasks/spark.py b/rainbow/runners/airflow/tasks/spark.py index ebae64e..8846f97 100644 --- a/rainbow/runners/airflow/tasks/spark.py +++ b/rainbow/runners/airflow/tasks/spark.py @@ -27,7 +27,7 @@ class SparkTask(task.Task): def __init__(self, dag, pipeline_name, parent, config, trigger_rule): super().__init__(dag, pipeline_name, parent, config, trigger_rule) - def setup(self): + def build(self): pass def apply_task_to_dag(self): diff --git a/rainbow/runners/airflow/tasks/sql.py b/rainbow/runners/airflow/tasks/sql.py index 6dfc0f1..23458a9 100644 --- a/rainbow/runners/airflow/tasks/sql.py +++ b/rainbow/runners/airflow/tasks/sql.py @@ -27,7 +27,7 @@ class SparkTask(task.Task): def __init__(self, dag, pipeline_name, parent, config, trigger_rule): super().__init__(dag, pipeline_name, parent, config, trigger_rule) - def setup(self): + def build(self): pass def apply_task_to_dag(self): diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..f22c0a7 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,3 @@ +docker:4.2.0 +apache-airflow:1.10.9 +docker-pycreds:0.4.0 diff --git a/tests/runners/airflow/dag/test_rainbow_dags.py b/tests/runners/airflow/dag/test_rainbow_dags.py index 41bea09..c66e3bc 100644 --- a/tests/runners/airflow/dag/test_rainbow_dags.py +++ b/tests/runners/airflow/dag/test_rainbow_dags.py @@ -1,6 +1,7 @@ from unittest import TestCase from rainbow.runners.airflow.dag import rainbow_dags +import unittest class Test(TestCase): @@ -9,3 +10,7 @@ class Test(TestCase): self.assertEqual(len(dags), 1) # TODO: elaborate test pass + + +if __name__ == '__main__': + unittest.main() diff --git a/rainbow/docker/__init__.py b/tests/runners/airflow/tasks/hello_world/__init__.py similarity index 98% copy from rainbow/docker/__init__.py copy to tests/runners/airflow/tasks/hello_world/__init__.py index 8bb1ec2..217e5db 100644 --- a/rainbow/docker/__init__.py +++ b/tests/runners/airflow/tasks/hello_world/__init__.py @@ -15,4 +15,3 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -# TODO: docker diff --git a/rainbow/docker/__init__.py b/tests/runners/airflow/tasks/hello_world/hello_world.py similarity index 97% copy from rainbow/docker/__init__.py copy to tests/runners/airflow/tasks/hello_world/hello_world.py index 8bb1ec2..9b87c05 100644 --- a/rainbow/docker/__init__.py +++ b/tests/runners/airflow/tasks/hello_world/hello_world.py @@ -15,4 +15,4 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -# TODO: docker +print('Hello world!') diff --git a/tests/runners/airflow/tasks/test_python.py b/tests/runners/airflow/tasks/test_python.py index 4f5808b..4bbbe9c 100644 --- a/tests/runners/airflow/tasks/test_python.py +++ b/tests/runners/airflow/tasks/test_python.py @@ -16,8 +16,11 @@ # specific language governing permissions and limitations # under the License. +import unittest from unittest import TestCase +import docker + from rainbow.runners.airflow.operators.kubernetes_pod_operator import \ ConfigurableKubernetesPodOperator from rainbow.runners.airflow.tasks import python @@ -25,20 +28,14 @@ from tests.util import dag_test_utils class TestPythonTask(TestCase): + def test_apply_task_to_dag(self): # TODO: elaborate tests dag = dag_test_utils.create_dag() task_id = 'my_task' - config = { - 'task': task_id, - 'cmd': 'foo bar', - 'image': 'my_image', - 'input_type': 'my_input_type', - 'input_path': 'my_input', - 'output_path': '/my_output.json' - } + config = self.__create_conf(task_id) task0 = python.PythonTask(dag, 'my_pipeline', None, config, 'all_success') task0.apply_task_to_dag() @@ -48,3 +45,35 @@ class TestPythonTask(TestCase): self.assertIsInstance(dag_task0, ConfigurableKubernetesPodOperator) self.assertEqual(dag_task0.task_id, task_id) + + def test_build(self): + config = self.__create_conf('my_task') + + task0 = python.PythonTask(None, None, None, config, None) + task0.build() + + # TODO: elaborate test of image, validate input/output + image_name = config['image'] + + docker_client = docker.from_env() + docker_client.images.get(image_name) + container_log = docker_client.containers.run(image_name, "python hello_world.py") + docker_client.close() + + self.assertEqual("b'Hello world!\\n'", str(container_log)) + + @staticmethod + def __create_conf(task_id): + return { + 'task': task_id, + 'cmd': 'foo bar', + 'image': 'my_image', + 'source': 'tests/runners/airflow/tasks/hello_world', + 'input_type': 'my_input_type', + 'input_path': 'my_input', + 'output_path': '/my_output.json' + } + + +if __name__ == '__main__': + unittest.main()
