This is an automated email from the ASF dual-hosted git repository.

jbonofre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-liminal.git

commit 2d34195321dcc5ddd2e33a37b8042b73f749c09a
Author: aviemzur <[email protected]>
AuthorDate: Mon Mar 23 10:31:27 2020 +0200

    Update README with yml example
---
 README.md                                   | 69 +++++++++++++++++++++++++++++
 rainbow/build/build_rainbows.py             | 15 ++++---
 rainbow/runners/airflow/dag/rainbow_dags.py |  3 +-
 tests/runners/airflow/rainbow/rainbow.yml   | 26 +++++------
 4 files changed, 94 insertions(+), 19 deletions(-)

diff --git a/README.md b/README.md
index 257bb9a..62036e0 100644
--- a/README.md
+++ b/README.md
@@ -9,3 +9,72 @@ Rainbow's goal is to operationalize the machine learning process, allowing data
 quickly transition from a successful experiment to an automated pipeline of model training,
 validation, deployment and inference in production, freeing them from engineering and
 non-functional tasks, and allowing them to focus on machine learning code and artifacts.
+
+# Basics
+
+Using simple YAML configuration, create your own scheduled data pipelines (a sequence of tasks to
+perform), application servers, and more.
+
+## Example YAML config file
+
+name: MyPipeline
+owner: Bosco Albert Baracus
+pipelines:
+  - pipeline: my_pipeline
+    start_date: 1970-01-01
+    timeout_minutes: 45
+    schedule: 0 * 1 * *
+    metrics:
+     namespace: TestNamespace
+     backends: [ 'cloudwatch' ]
+    tasks:
+      - task: my_static_input_task
+        type: python
+        description: static input task
+        image: my_static_input_task_image
+        source: helloworld
+        env_vars:
+          env1: "a"
+          env2: "b"
+        input_type: static
+        input_path: '[ { "foo": "bar" }, { "foo": "baz" } ]'
+        output_path: /output.json
+        cmd: python -u hello_world.py
+      - task: my_parallelized_static_input_task
+        type: python
+        description: parallelized static input task
+        image: my_static_input_task_image
+        env_vars:
+          env1: "a"
+          env2: "b"
+        input_type: static
+        input_path: '[ { "foo": "bar" }, { "foo": "baz" } ]'
+        split_input: True
+        executors: 2
+        cmd: python -u helloworld.py
+      - task: my_task_output_input_task
+        type: python
+        description: parallelized static input task
+        image: my_task_output_input_task_image
+        source: helloworld
+        env_vars:
+          env1: "a"
+          env2: "b"
+        input_type: task
+        input_path: my_static_input_task
+        cmd: python -u hello_world.py
+services:
+  - service:
+    name: my_python_server
+    type: python_server
+    description: my python server
+    image: my_server_image
+    source: myserver
+    endpoints:
+      - endpoint: /myendpoint1
+        module: myserver.my_server
+        function: myendpoint1func
+
+# Installation
+
+TODO: installation.
diff --git a/rainbow/build/build_rainbows.py b/rainbow/build/build_rainbows.py
index fa3a922..4ed5bab 100644
--- a/rainbow/build/build_rainbows.py
+++ b/rainbow/build/build_rainbows.py
@@ -40,12 +40,17 @@ def build_rainbows(path):
 
             for pipeline in rainbow_config['pipelines']:
                 for task in pipeline['tasks']:
-                    task_type = task['type']
-                    builder_class = __get_task_build_class(task_type)
-                    if builder_class:
-                        __build_image(base_path, task, builder_class)
+                    task_name = task['task']
+
+                    if 'source' in task:
+                        task_type = task['type']
+                        builder_class = __get_task_build_class(task_type)
+                        if builder_class:
+                            __build_image(base_path, task, builder_class)
+                        else:
+                            raise ValueError(f'No such task type: {task_type}')
                     else:
-                        raise ValueError(f'No such task type: {task_type}')
+                        print(f'No source configured for task {task_name}, skipping build..')
 
                 for service in rainbow_config['services']:
                     service_type = service['type']
diff --git a/rainbow/runners/airflow/dag/rainbow_dags.py b/rainbow/runners/airflow/dag/rainbow_dags.py
index 71d18d2..17fd8d9 100644
--- a/rainbow/runners/airflow/dag/rainbow_dags.py
+++ b/rainbow/runners/airflow/dag/rainbow_dags.py
@@ -16,7 +16,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from datetime import datetime
+from datetime import datetime, timedelta
 
 import yaml
 from airflow import DAG
@@ -56,6 +56,7 @@ def register_dags(configs_path):
                 dag = DAG(
                     dag_id=pipeline_name,
                     default_args=default_args,
+                    dagrun_timeout=timedelta(minutes=pipeline['timeout_minutes']),
                     catchup=False
                 )
 
diff --git a/tests/runners/airflow/rainbow/rainbow.yml b/tests/runners/airflow/rainbow/rainbow.yml
index 27507fd..66e3dec 100644
--- a/tests/runners/airflow/rainbow/rainbow.yml
+++ b/tests/runners/airflow/rainbow/rainbow.yml
@@ -21,7 +21,7 @@ owner: Bosco Albert Baracus
 pipelines:
   - pipeline: my_pipeline
     start_date: 1970-01-01
-    timeout-minutes: 45
+    timeout_minutes: 45
     schedule: 0 * 1 * *
     metrics:
      namespace: TestNamespace
@@ -39,18 +39,18 @@ pipelines:
         input_path: '[ { "foo": "bar" }, { "foo": "baz" } ]'
         output_path: /output.json
         cmd: python -u hello_world.py
-#      - task: my_parallelized_static_input_task
-#        type: python
-#        description: parallelized static input task
-#        image: my_static_input_task_image
-#        env_vars:
-#          env1: "a"
-#          env2: "b"
-#        input_type: static
-#        input_path: '[ { "foo": "bar" }, { "foo": "baz" } ]'
-#        split_input: True
-#        executors: 2
-#        cmd: python -u helloworld.py
+      - task: my_parallelized_static_input_task
+        type: python
+        description: parallelized static input task
+        image: my_static_input_task_image
+        env_vars:
+          env1: "a"
+          env2: "b"
+        input_type: static
+        input_path: '[ { "foo": "bar" }, { "foo": "baz" } ]'
+        split_input: True
+        executors: 2
+        cmd: python -u helloworld.py
       - task: my_task_output_input_task
         type: python
         description: parallelized static input task

Reply via email to