This is an automated email from the ASF dual-hosted git repository. jbonofre pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-liminal.git
commit 0817e973ec27f9a25a8cacc464b0a043d8d47428 Author: aviemzur <[email protected]> AuthorDate: Thu Mar 12 11:13:58 2020 +0200 Refactor build --- rainbow/{runners/airflow => }/build/__init__.py | 0 rainbow/build/build_rainbow.py | 57 +++++++++++++++ .../airflow => }/build/python/container-setup.sh | 0 .../build/python/container-teardown.sh | 0 rainbow/core/__init__.py | 1 - .../hello_world => rainbow/core/util}/__init__.py | 0 rainbow/core/{__init__.py => util/files_util.py} | 12 +++- rainbow/docker/python/python_image.py | 61 +++++++++------- rainbow/runners/airflow/build/build_rainbow.py | 84 ---------------------- rainbow/runners/airflow/dag/rainbow_dags.py | 23 +++--- rainbow/runners/airflow/model/task.py | 6 -- .../airflow/tasks/create_cloudformation_stack.py | 3 - .../airflow/tasks/delete_cloudformation_stack.py | 3 - rainbow/runners/airflow/tasks/job_end.py | 3 - rainbow/runners/airflow/tasks/job_start.py | 3 - rainbow/runners/airflow/tasks/python.py | 9 --- rainbow/runners/airflow/tasks/spark.py | 3 - rainbow/runners/airflow/tasks/sql.py | 3 - requirements.txt | 2 + .../{tasks/hello_world => build}/__init__.py | 0 .../hello_world => build/python}/__init__.py | 0 .../airflow/build/python/test_python_image.py | 26 ++++--- tests/runners/airflow/build/test_build_rainbow.py | 8 +++ tests/runners/airflow/dag/test_rainbow_dags.py | 2 +- .../{tasks/hello_world => rainbow}/__init__.py | 0 .../{tasks => rainbow}/hello_world/__init__.py | 0 .../{tasks => rainbow}/hello_world/hello_world.py | 0 .../runners/airflow/{dag => }/rainbow/rainbow.yml | 24 +++++-- tests/runners/airflow/tasks/test_python.py | 18 +---- 29 files changed, 161 insertions(+), 190 deletions(-) diff --git a/rainbow/runners/airflow/build/__init__.py b/rainbow/build/__init__.py similarity index 100% rename from rainbow/runners/airflow/build/__init__.py rename to rainbow/build/__init__.py diff --git a/rainbow/build/build_rainbow.py b/rainbow/build/build_rainbow.py new file mode 100644 index 0000000..280b862 --- /dev/null +++ b/rainbow/build/build_rainbow.py @@ -0,0 +1,57 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +import os + +import yaml + +from rainbow.core.util import files_util +from rainbow.docker.python.python_image import PythonImage + + +def build_rainbow(path): + """ + TODO: doc for build_rainbow + """ + + config_files = files_util.find_config_files(path) + + for config_file in config_files: + print(f'Building artifacts file: f{config_file}') + + with open(config_file) as stream: + # TODO: validate config + config = yaml.safe_load(stream) + + for pipeline in config['pipelines']: + for task in pipeline['tasks']: + task_type = task['type'] + task_instance = get_build_class(task_type)() + task_instance.build(base_path=os.path.dirname(config_file), + relative_source_path=task['source'], + tag=task['image']) + + +# TODO: task class registry +build_classes = { + 'python': PythonImage + +} + + +def get_build_class(task_type): + return build_classes[task_type] diff --git a/rainbow/runners/airflow/build/python/container-setup.sh b/rainbow/build/python/container-setup.sh similarity index 100% rename from rainbow/runners/airflow/build/python/container-setup.sh rename to rainbow/build/python/container-setup.sh diff --git a/rainbow/runners/airflow/build/python/container-teardown.sh b/rainbow/build/python/container-teardown.sh similarity index 100% rename from rainbow/runners/airflow/build/python/container-teardown.sh rename to rainbow/build/python/container-teardown.sh diff --git a/rainbow/core/__init__.py b/rainbow/core/__init__.py index 2162b08..217e5db 100644 --- a/rainbow/core/__init__.py +++ b/rainbow/core/__init__.py @@ -15,4 +15,3 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -# TODO: core diff --git a/tests/runners/airflow/tasks/hello_world/__init__.py b/rainbow/core/util/__init__.py similarity index 100% copy from tests/runners/airflow/tasks/hello_world/__init__.py copy to rainbow/core/util/__init__.py diff --git a/rainbow/core/__init__.py b/rainbow/core/util/files_util.py similarity index 76% copy from rainbow/core/__init__.py copy to rainbow/core/util/files_util.py index 2162b08..e5a8e09 100644 --- a/rainbow/core/__init__.py +++ b/rainbow/core/util/files_util.py @@ -15,4 +15,14 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -# TODO: core + +import os + + +def find_config_files(path): + files = [] + for r, d, f in os.walk(path): + for file in f: + if file[file.rfind('.') + 1:] in ['yml', 'yaml']: + files.append(os.path.join(r, file)) + return files diff --git a/rainbow/docker/python/python_image.py b/rainbow/docker/python/python_image.py index d66dfbe..2cd3594 100644 --- a/rainbow/docker/python/python_image.py +++ b/rainbow/docker/python/python_image.py @@ -18,44 +18,57 @@ import os import shutil import tempfile + import docker -def build(source_path, tag, extra_files=None): - if extra_files is None: - extra_files = [] +class PythonImage: + + def build(self, base_path, relative_source_path, tag, extra_files=None): + """ + TODO: pydoc - print(f'Building image {tag}') + :param base_path: + :param relative_source_path: + :param tag: + :param extra_files: + :return: + """ - temp_dir = tempfile.mkdtemp() - # Delete dir for shutil.copytree to work - os.rmdir(temp_dir) + if extra_files is None: + extra_files = [] - __copy_source(source_path, temp_dir) + print(f'Building image {tag}') - requirements_file_path = os.path.join(temp_dir, 'requirements.txt') - if not os.path.exists(requirements_file_path): - with open(requirements_file_path, 'w'): - pass + temp_dir = tempfile.mkdtemp() + # Delete dir for shutil.copytree to work + os.rmdir(temp_dir) - dockerfile_path = os.path.join(os.path.dirname(__file__), 'Dockerfile') + self.__copy_source(os.path.join(base_path, relative_source_path), temp_dir) - for file in extra_files + [dockerfile_path]: - __copy_file(file, temp_dir) + requirements_file_path = os.path.join(temp_dir, 'requirements.txt') + if not os.path.exists(requirements_file_path): + with open(requirements_file_path, 'w'): + pass - print(temp_dir, os.listdir(temp_dir)) + dockerfile_path = os.path.join(os.path.dirname(__file__), 'Dockerfile') - docker_client = docker.from_env() - docker_client.images.build(path=temp_dir, tag=tag) + for file in extra_files + [dockerfile_path]: + self.__copy_file(file, temp_dir) - docker_client.close() + print(temp_dir, os.listdir(temp_dir)) - shutil.rmtree(temp_dir) + docker_client = docker.from_env() + docker_client.images.build(path=temp_dir, tag=tag) + docker_client.close() -def __copy_source(source_path, destination_path): - shutil.copytree(source_path, destination_path) + shutil.rmtree(temp_dir) + @staticmethod + def __copy_source(source_path, destination_path): + shutil.copytree(source_path, destination_path) -def __copy_file(source_file_path, destination_file_path): - shutil.copy2(source_file_path, destination_file_path) + @staticmethod + def __copy_file(source_file_path, destination_file_path): + shutil.copy2(source_file_path, destination_file_path) diff --git a/rainbow/runners/airflow/build/build_rainbow.py b/rainbow/runners/airflow/build/build_rainbow.py deleted file mode 100644 index 222ea5f..0000000 --- a/rainbow/runners/airflow/build/build_rainbow.py +++ /dev/null @@ -1,84 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -import os -import pprint -from datetime import datetime - -import yaml -from airflow import DAG - -from rainbow.runners.airflow.tasks.python import PythonTask - - -def build_rainbow(path): - """ - TODO: doc for build_rainbow - """ - files = [] - for r, d, f in os.walk(path): - for file in f: - if file[file.rfind('.') + 1:] in ['yml', 'yaml']: - files.append(os.path.join(r, file)) - - print(files) - - dags = [] - - for config_file in files: - print(f'Building artifacts file: f{config_file}') - - with open(config_file) as stream: - # TODO: validate config - config = yaml.safe_load(stream) - pp = pprint.PrettyPrinter(indent=4) - # pp.pprint(config) - - for pipeline in config['pipelines']: - parent = None - - default_args = { - 'owner': config['owner'], - 'start_date': datetime.combine(pipeline['start_date'], datetime.min.time()) - } - # TODO: add all relevant airflow args - dag = DAG( - dag_id='test_dag', - default_args=default_args - ) - - for task in pipeline['tasks']: - task_type = task['type'] - task_instance = get_task_class(task_type)( - dag, pipeline['pipeline'], parent if parent else None, task, 'all_success' - ) - parent = task_instance.build() - - -# TODO: task class registry -task_classes = { - 'python': PythonTask -} - - -def get_task_class(task_type): - return task_classes[task_type] - - -if __name__ == '__main__': - register_dags('') diff --git a/rainbow/runners/airflow/dag/rainbow_dags.py b/rainbow/runners/airflow/dag/rainbow_dags.py index c564737..639f0cc 100644 --- a/rainbow/runners/airflow/dag/rainbow_dags.py +++ b/rainbow/runners/airflow/dag/rainbow_dags.py @@ -16,38 +16,30 @@ # specific language governing permissions and limitations # under the License. -import os -import pprint from datetime import datetime import yaml from airflow import DAG -from rainbow.runners.airflow.build import build_rainbow +from rainbow.core.util import files_util +from rainbow.runners.airflow.tasks.python import PythonTask -def register_dags(path): +def register_dags(configs_path): """ TODO: doc for register_dags """ - files = [] - for r, d, f in os.walk(path): - for file in f: - if file[file.rfind('.') + 1:] in ['yml', 'yaml']: - files.append(os.path.join(r, file)) - print(files) + config_files = files_util.find_config_files(configs_path) dags = [] - for config_file in files: + for config_file in config_files: print(f'Registering DAG for file: f{config_file}') with open(config_file) as stream: # TODO: validate config config = yaml.safe_load(stream) - pp = pprint.PrettyPrinter(indent=4) - # pp.pprint(config) for pipeline in config['pipelines']: parent = None @@ -75,7 +67,10 @@ def register_dags(path): return dags -task_classes = build_rainbow.task_classes +# TODO: task class registry +task_classes = { + 'python': PythonTask +} def get_task_class(task_type): diff --git a/rainbow/runners/airflow/model/task.py b/rainbow/runners/airflow/model/task.py index 25656ee..8163117 100644 --- a/rainbow/runners/airflow/model/task.py +++ b/rainbow/runners/airflow/model/task.py @@ -32,12 +32,6 @@ class Task: self.config = config self.trigger_rule = trigger_rule - def build(self): - """ - Build task's artifacts. - """ - raise NotImplementedError() - def apply_task_to_dag(self): """ Registers Airflow operator to parent task. diff --git a/rainbow/runners/airflow/tasks/create_cloudformation_stack.py b/rainbow/runners/airflow/tasks/create_cloudformation_stack.py index c478dc7..ca8482a 100644 --- a/rainbow/runners/airflow/tasks/create_cloudformation_stack.py +++ b/rainbow/runners/airflow/tasks/create_cloudformation_stack.py @@ -27,8 +27,5 @@ class CreateCloudFormationStackTask(task.Task): def __init__(self, dag, pipeline_name, parent, config, trigger_rule): super().__init__(dag, pipeline_name, parent, config, trigger_rule) - def build(self): - pass - def apply_task_to_dag(self): pass diff --git a/rainbow/runners/airflow/tasks/delete_cloudformation_stack.py b/rainbow/runners/airflow/tasks/delete_cloudformation_stack.py index d172284..8ac4e8b 100644 --- a/rainbow/runners/airflow/tasks/delete_cloudformation_stack.py +++ b/rainbow/runners/airflow/tasks/delete_cloudformation_stack.py @@ -27,8 +27,5 @@ class DeleteCloudFormationStackTask(task.Task): def __init__(self, dag, pipeline_name, parent, config, trigger_rule): super().__init__(dag, pipeline_name, parent, config, trigger_rule) - def build(self): - pass - def apply_task_to_dag(self): pass diff --git a/rainbow/runners/airflow/tasks/job_end.py b/rainbow/runners/airflow/tasks/job_end.py index a6c5ef2..53e1eef 100644 --- a/rainbow/runners/airflow/tasks/job_end.py +++ b/rainbow/runners/airflow/tasks/job_end.py @@ -27,8 +27,5 @@ class JobEndTask(task.Task): def __init__(self, dag, pipeline_name, parent, config, trigger_rule): super().__init__(dag, pipeline_name, parent, config, trigger_rule) - def build(self): - pass - def apply_task_to_dag(self): pass diff --git a/rainbow/runners/airflow/tasks/job_start.py b/rainbow/runners/airflow/tasks/job_start.py index 7338363..5c82e1c 100644 --- a/rainbow/runners/airflow/tasks/job_start.py +++ b/rainbow/runners/airflow/tasks/job_start.py @@ -27,9 +27,6 @@ class JobStartTask(task.Task): def __init__(self, dag, pipeline_name, parent, config, trigger_rule): super().__init__(dag, pipeline_name, parent, config, trigger_rule) - def build(self): - pass - def apply_task_to_dag(self): # TODO: job start task pass diff --git a/rainbow/runners/airflow/tasks/python.py b/rainbow/runners/airflow/tasks/python.py index eb00c0e..b2769c8 100644 --- a/rainbow/runners/airflow/tasks/python.py +++ b/rainbow/runners/airflow/tasks/python.py @@ -47,15 +47,6 @@ class PythonTask(task.Task): self.config_task_id = self.task_name + '_input' self.executors = self.__executors() - def build(self): - if 'source' in self.config: - script_dir = os.path.dirname(__file__) - - python_image.build(self.config['source'], self.image, [ - os.path.join(script_dir, '../build/python/container-setup.sh'), - os.path.join(script_dir, '../build/python/container-teardown.sh') - ]) - def apply_task_to_dag(self): config_task = None diff --git a/rainbow/runners/airflow/tasks/spark.py b/rainbow/runners/airflow/tasks/spark.py index 8846f97..5822e92 100644 --- a/rainbow/runners/airflow/tasks/spark.py +++ b/rainbow/runners/airflow/tasks/spark.py @@ -27,8 +27,5 @@ class SparkTask(task.Task): def __init__(self, dag, pipeline_name, parent, config, trigger_rule): super().__init__(dag, pipeline_name, parent, config, trigger_rule) - def build(self): - pass - def apply_task_to_dag(self): pass diff --git a/rainbow/runners/airflow/tasks/sql.py b/rainbow/runners/airflow/tasks/sql.py index 23458a9..42c02ce 100644 --- a/rainbow/runners/airflow/tasks/sql.py +++ b/rainbow/runners/airflow/tasks/sql.py @@ -27,8 +27,5 @@ class SparkTask(task.Task): def __init__(self, dag, pipeline_name, parent, config, trigger_rule): super().__init__(dag, pipeline_name, parent, config, trigger_rule) - def build(self): - pass - def apply_task_to_dag(self): pass diff --git a/requirements.txt b/requirements.txt index e952e4c..6e05d98 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,5 @@ +botocore +PyYAML docker==4.2.0 apache-airflow==1.10.9 docker-pycreds==0.4.0 diff --git a/tests/runners/airflow/tasks/hello_world/__init__.py b/tests/runners/airflow/build/__init__.py similarity index 100% copy from tests/runners/airflow/tasks/hello_world/__init__.py copy to tests/runners/airflow/build/__init__.py diff --git a/tests/runners/airflow/tasks/hello_world/__init__.py b/tests/runners/airflow/build/python/__init__.py similarity index 100% copy from tests/runners/airflow/tasks/hello_world/__init__.py copy to tests/runners/airflow/build/python/__init__.py diff --git a/rainbow/runners/airflow/tasks/delete_cloudformation_stack.py b/tests/runners/airflow/build/python/test_python_image.py similarity index 58% copy from rainbow/runners/airflow/tasks/delete_cloudformation_stack.py copy to tests/runners/airflow/build/python/test_python_image.py index d172284..c290720 100644 --- a/rainbow/runners/airflow/tasks/delete_cloudformation_stack.py +++ b/tests/runners/airflow/build/python/test_python_image.py @@ -16,19 +16,23 @@ # specific language governing permissions and limitations # under the License. -from rainbow.runners.airflow.model import task +import docker +from rainbow.docker.python import python_image -class DeleteCloudFormationStackTask(task.Task): - """ - # TODO: Deletes cloud_formation stack. - """ - def __init__(self, dag, pipeline_name, parent, config, trigger_rule): - super().__init__(dag, pipeline_name, parent, config, trigger_rule) +def test_build(self): + config = self.__create_conf('my_task') - def build(self): - pass + image_name = config['image'] - def apply_task_to_dag(self): - pass + python_image.build('tests/runners/airflow/rainbow', 'hello_world', 'image_name') + + # TODO: elaborate test of image, validate input/output + + docker_client = docker.from_env() + docker_client.images.get(image_name) + container_log = docker_client.containers.run(image_name, "python hello_world.py") + docker_client.close() + + self.assertEqual("b'Hello world!\\n'", str(container_log)) diff --git a/tests/runners/airflow/build/test_build_rainbow.py b/tests/runners/airflow/build/test_build_rainbow.py new file mode 100644 index 0000000..c8fec6e --- /dev/null +++ b/tests/runners/airflow/build/test_build_rainbow.py @@ -0,0 +1,8 @@ +from unittest import TestCase + +from rainbow.build import build_rainbow + + +class Test(TestCase): + def test_build_rainbow(self): + build_rainbow.build_rainbow('tests/runners/airflow/rainbow') diff --git a/tests/runners/airflow/dag/test_rainbow_dags.py b/tests/runners/airflow/dag/test_rainbow_dags.py index c66e3bc..2a65f31 100644 --- a/tests/runners/airflow/dag/test_rainbow_dags.py +++ b/tests/runners/airflow/dag/test_rainbow_dags.py @@ -6,7 +6,7 @@ import unittest class Test(TestCase): def test_register_dags(self): - dags = rainbow_dags.register_dags("tests/runners/airflow/dag/rainbow") + dags = rainbow_dags.register_dags('tests/runners/airflow/rainbow') self.assertEqual(len(dags), 1) # TODO: elaborate test pass diff --git a/tests/runners/airflow/tasks/hello_world/__init__.py b/tests/runners/airflow/rainbow/__init__.py similarity index 100% copy from tests/runners/airflow/tasks/hello_world/__init__.py copy to tests/runners/airflow/rainbow/__init__.py diff --git a/tests/runners/airflow/tasks/hello_world/__init__.py b/tests/runners/airflow/rainbow/hello_world/__init__.py similarity index 100% rename from tests/runners/airflow/tasks/hello_world/__init__.py rename to tests/runners/airflow/rainbow/hello_world/__init__.py diff --git a/tests/runners/airflow/tasks/hello_world/hello_world.py b/tests/runners/airflow/rainbow/hello_world/hello_world.py similarity index 100% rename from tests/runners/airflow/tasks/hello_world/hello_world.py rename to tests/runners/airflow/rainbow/hello_world/hello_world.py diff --git a/tests/runners/airflow/dag/rainbow/rainbow.yml b/tests/runners/airflow/rainbow/rainbow.yml similarity index 59% rename from tests/runners/airflow/dag/rainbow/rainbow.yml rename to tests/runners/airflow/rainbow/rainbow.yml index 07afd08..fd30028 100644 --- a/tests/runners/airflow/dag/rainbow/rainbow.yml +++ b/tests/runners/airflow/rainbow/rainbow.yml @@ -1,4 +1,20 @@ - +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. --- name: MyPipeline owner: Bosco Albert Baracus @@ -12,8 +28,8 @@ pipelines: - task: my_static_config_task type: python description: my 1st ds task - image: mytask1artifactid - source: mytask1folder + image: my_image + source: hello_world env_vars: env1: "a" env2: "b" @@ -25,7 +41,7 @@ pipelines: type: python description: my 1st ds task image: mytask1artifactid - source: mytask1folder + source: hello_world env_vars: env1: "a" env2: "b" diff --git a/tests/runners/airflow/tasks/test_python.py b/tests/runners/airflow/tasks/test_python.py index 4bbbe9c..37a325a 100644 --- a/tests/runners/airflow/tasks/test_python.py +++ b/tests/runners/airflow/tasks/test_python.py @@ -46,29 +46,13 @@ class TestPythonTask(TestCase): self.assertIsInstance(dag_task0, ConfigurableKubernetesPodOperator) self.assertEqual(dag_task0.task_id, task_id) - def test_build(self): - config = self.__create_conf('my_task') - - task0 = python.PythonTask(None, None, None, config, None) - task0.build() - - # TODO: elaborate test of image, validate input/output - image_name = config['image'] - - docker_client = docker.from_env() - docker_client.images.get(image_name) - container_log = docker_client.containers.run(image_name, "python hello_world.py") - docker_client.close() - - self.assertEqual("b'Hello world!\\n'", str(container_log)) - @staticmethod def __create_conf(task_id): return { 'task': task_id, 'cmd': 'foo bar', 'image': 'my_image', - 'source': 'tests/runners/airflow/tasks/hello_world', + 'source': 'tests/runners/airflow/rainbow/hello_world', 'input_type': 'my_input_type', 'input_path': 'my_input', 'output_path': '/my_output.json'
