This is an automated email from the ASF dual-hosted git repository. jbonofre pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-liminal.git
commit 2d34195321dcc5ddd2e33a37b8042b73f749c09a Author: aviemzur <[email protected]> AuthorDate: Mon Mar 23 10:31:27 2020 +0200 Update README with yml example --- README.md | 69 +++++++++++++++++++++++++++++ rainbow/build/build_rainbows.py | 15 ++++--- rainbow/runners/airflow/dag/rainbow_dags.py | 3 +- tests/runners/airflow/rainbow/rainbow.yml | 26 +++++------ 4 files changed, 94 insertions(+), 19 deletions(-) diff --git a/README.md b/README.md index 257bb9a..62036e0 100644 --- a/README.md +++ b/README.md @@ -9,3 +9,72 @@ Rainbow's goal is to operationalize the machine learning process, allowing data quickly transition from a successful experiment to an automated pipeline of model training, validation, deployment and inference in production, freeing them from engineering and non-functional tasks, and allowing them to focus on machine learning code and artifacts. + +# Basics + +Using simple YAML configuration, create your own scheduled data pipelines (a sequence of tasks to +perform), application servers, and more. 
+ +## Example YAML config file + +name: MyPipeline +owner: Bosco Albert Baracus +pipelines: + - pipeline: my_pipeline + start_date: 1970-01-01 + timeout_minutes: 45 + schedule: 0 * 1 * * + metrics: + namespace: TestNamespace + backends: [ 'cloudwatch' ] + tasks: + - task: my_static_input_task + type: python + description: static input task + image: my_static_input_task_image + source: helloworld + env_vars: + env1: "a" + env2: "b" + input_type: static + input_path: '[ { "foo": "bar" }, { "foo": "baz" } ]' + output_path: /output.json + cmd: python -u hello_world.py + - task: my_parallelized_static_input_task + type: python + description: parallelized static input task + image: my_static_input_task_image + env_vars: + env1: "a" + env2: "b" + input_type: static + input_path: '[ { "foo": "bar" }, { "foo": "baz" } ]' + split_input: True + executors: 2 + cmd: python -u helloworld.py + - task: my_task_output_input_task + type: python + description: parallelized static input task + image: my_task_output_input_task_image + source: helloworld + env_vars: + env1: "a" + env2: "b" + input_type: task + input_path: my_static_input_task + cmd: python -u hello_world.py +services: + - service: + name: my_python_server + type: python_server + description: my python server + image: my_server_image + source: myserver + endpoints: + - endpoint: /myendpoint1 + module: myserver.my_server + function: myendpoint1func + +# Installation + +TODO: installation. 
diff --git a/rainbow/build/build_rainbows.py b/rainbow/build/build_rainbows.py index fa3a922..4ed5bab 100644 --- a/rainbow/build/build_rainbows.py +++ b/rainbow/build/build_rainbows.py @@ -40,12 +40,17 @@ def build_rainbows(path): for pipeline in rainbow_config['pipelines']: for task in pipeline['tasks']: - task_type = task['type'] - builder_class = __get_task_build_class(task_type) - if builder_class: - __build_image(base_path, task, builder_class) + task_name = task['task'] + + if 'source' in task: + task_type = task['type'] + builder_class = __get_task_build_class(task_type) + if builder_class: + __build_image(base_path, task, builder_class) + else: + raise ValueError(f'No such task type: {task_type}') else: - raise ValueError(f'No such task type: {task_type}') + print(f'No source configured for task {task_name}, skipping build..') for service in rainbow_config['services']: service_type = service['type'] diff --git a/rainbow/runners/airflow/dag/rainbow_dags.py b/rainbow/runners/airflow/dag/rainbow_dags.py index 71d18d2..17fd8d9 100644 --- a/rainbow/runners/airflow/dag/rainbow_dags.py +++ b/rainbow/runners/airflow/dag/rainbow_dags.py @@ -16,7 +16,7 @@ # specific language governing permissions and limitations # under the License. 
-from datetime import datetime +from datetime import datetime, timedelta import yaml from airflow import DAG @@ -56,6 +56,7 @@ def register_dags(configs_path): dag = DAG( dag_id=pipeline_name, default_args=default_args, + dagrun_timeout=timedelta(minutes=pipeline['timeout_minutes']), catchup=False ) diff --git a/tests/runners/airflow/rainbow/rainbow.yml b/tests/runners/airflow/rainbow/rainbow.yml index 27507fd..66e3dec 100644 --- a/tests/runners/airflow/rainbow/rainbow.yml +++ b/tests/runners/airflow/rainbow/rainbow.yml @@ -21,7 +21,7 @@ owner: Bosco Albert Baracus pipelines: - pipeline: my_pipeline start_date: 1970-01-01 - timeout-minutes: 45 + timeout_minutes: 45 schedule: 0 * 1 * * metrics: namespace: TestNamespace @@ -39,18 +39,18 @@ pipelines: input_path: '[ { "foo": "bar" }, { "foo": "baz" } ]' output_path: /output.json cmd: python -u hello_world.py -# - task: my_parallelized_static_input_task -# type: python -# description: parallelized static input task -# image: my_static_input_task_image -# env_vars: -# env1: "a" -# env2: "b" -# input_type: static -# input_path: '[ { "foo": "bar" }, { "foo": "baz" } ]' -# split_input: True -# executors: 2 -# cmd: python -u helloworld.py + - task: my_parallelized_static_input_task + type: python + description: parallelized static input task + image: my_static_input_task_image + env_vars: + env1: "a" + env2: "b" + input_type: static + input_path: '[ { "foo": "bar" }, { "foo": "baz" } ]' + split_input: True + executors: 2 + cmd: python -u helloworld.py - task: my_task_output_input_task type: python description: parallelized static input task
