This is an automated email from the ASF dual-hosted git repository.

jbonofre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-liminal.git

commit a32d4eb722511db7b2f29228fae8e08ca5de8e81
Author: aviemzur <[email protected]>
AuthorDate: Thu Mar 12 10:08:43 2020 +0200

    Add build module
---
 README.md                                          |  4 ++
 rainbow/docker/__init__.py                         |  1 -
 rainbow/docker/python/Dockerfile                   | 19 +++++++
 rainbow/docker/{ => python}/__init__.py            |  1 -
 rainbow/docker/python/python_image.py              | 61 ++++++++++++++++++++++
 rainbow/{ => runners/airflow}/build/__init__.py    |  0
 .../rainbow_dags.py => build/build_rainbow.py}     | 18 +++----
 .../airflow/build/python/container-setup.sh        |  9 ++++
 .../airflow/build/python/container-teardown.sh     |  6 +++
 rainbow/runners/airflow/dag/rainbow_dags.py        | 10 ++--
 rainbow/runners/airflow/model/task.py              |  4 +-
 .../airflow/tasks/create_cloudformation_stack.py   |  2 +-
 .../airflow/tasks/delete_cloudformation_stack.py   |  2 +-
 rainbow/runners/airflow/tasks/job_end.py           |  2 +-
 rainbow/runners/airflow/tasks/job_start.py         |  2 +-
 rainbow/runners/airflow/tasks/python.py            | 13 +++--
 rainbow/runners/airflow/tasks/spark.py             |  2 +-
 rainbow/runners/airflow/tasks/sql.py               |  2 +-
 requirements.txt                                   |  3 ++
 tests/runners/airflow/dag/test_rainbow_dags.py     |  5 ++
 .../runners/airflow/tasks/hello_world}/__init__.py |  1 -
 .../airflow/tasks/hello_world/hello_world.py       |  2 +-
 tests/runners/airflow/tasks/test_python.py         | 45 +++++++++++++---
 23 files changed, 175 insertions(+), 39 deletions(-)

diff --git a/README.md b/README.md
index 7168564..d8b9a23 100644
--- a/README.md
+++ b/README.md
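@@ -1 +1,6 @@
 # rainbow
+
+On macOS, link the Docker Desktop credential helper onto the `PATH` so that the Python Docker SDK used to build task images can find it:
+```
+ln -s "/Applications/Docker.app/Contents/Resources/bin/docker-credential-desktop" "/usr/local/bin/docker-credential-desktop"
+```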
\ No newline at end of file
diff --git a/rainbow/docker/__init__.py b/rainbow/docker/__init__.py
index 8bb1ec2..217e5db 100644
--- a/rainbow/docker/__init__.py
+++ b/rainbow/docker/__init__.py
@@ -15,4 +15,3 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-# TODO: docker
diff --git a/rainbow/docker/python/Dockerfile b/rainbow/docker/python/Dockerfile
new file mode 100644
index 0000000..d4e3ed2
--- /dev/null
+++ b/rainbow/docker/python/Dockerfile
@@ -0,0 +1,19 @@
+# Use an official Python runtime as a parent image
+FROM python:3.7-slim
+
+# Install aptitude build-essential
+#RUN apt-get install -y --reinstall build-essential
+
+# Set the working directory to /app
+WORKDIR /app
+
+# Order of operations is important here for docker's caching & incremental build performance.
+# Be careful when changing this code.
+
+# Install any needed packages specified in requirements.txt
+COPY ./requirements.txt /app
+RUN pip install -r requirements.txt
+
+# Copy the current directory contents into the container at /app
+RUN echo "Copying source code.."
+COPY . /app
diff --git a/rainbow/docker/__init__.py b/rainbow/docker/python/__init__.py
similarity index 98%
copy from rainbow/docker/__init__.py
copy to rainbow/docker/python/__init__.py
index 8bb1ec2..217e5db 100644
--- a/rainbow/docker/__init__.py
+++ b/rainbow/docker/python/__init__.py
@@ -15,4 +15,3 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-# TODO: docker
diff --git a/rainbow/docker/python/python_image.py 
b/rainbow/docker/python/python_image.py
new file mode 100644
index 0000000..d66dfbe
--- /dev/null
+++ b/rainbow/docker/python/python_image.py
@@ -0,0 +1,81 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import os
+import shutil
+import tempfile
+
+import docker
+
+
+def build(source_path, tag, extra_files=None):
+    """
+    Build a docker image from a python source directory.
+
+    The contents of source_path, the Dockerfile next to this module and any
+    extra_files are copied into a temporary build context, which is always
+    removed once the build finishes (successfully or not).
+
+    :param source_path: directory with the python sources to package.
+    :param tag: tag to give the built image.
+    :param extra_files: optional iterable of file paths to copy into the root
+        of the build context, next to the Dockerfile.
+    """
+    extra_files = list(extra_files) if extra_files is not None else []
+
+    print(f'Building image {tag}')
+
+    temp_dir = tempfile.mkdtemp()
+    # shutil.copytree needs a destination that does not exist yet; copy into a
+    # sub-directory of the fresh temp dir instead of deleting and re-using the
+    # temp dir path, which could be claimed by someone else in between.
+    build_context = os.path.join(temp_dir, 'context')
+    try:
+        __copy_source(source_path, build_context)
+
+        # The Dockerfile always runs `pip install -r requirements.txt`, so make
+        # sure the file exists even when the sources do not provide one.
+        requirements_file_path = os.path.join(build_context, 'requirements.txt')
+        if not os.path.exists(requirements_file_path):
+            with open(requirements_file_path, 'w'):
+                pass
+
+        dockerfile_path = os.path.join(os.path.dirname(__file__), 'Dockerfile')
+
+        for file in extra_files + [dockerfile_path]:
+            __copy_file(file, build_context)
+
+        print(build_context, os.listdir(build_context))
+
+        docker_client = docker.from_env()
+        try:
+            docker_client.images.build(path=build_context, tag=tag)
+        finally:
+            docker_client.close()
+    finally:
+        # Best-effort cleanup: never mask the original error from the build.
+        shutil.rmtree(temp_dir, ignore_errors=True)
+
+
+def __copy_source(source_path, destination_path):
+    """Recursively copy the source_path directory tree to destination_path."""
+    shutil.copytree(source_path, destination_path)
+
+
+def __copy_file(source_file_path, destination_file_path):
+    """Copy a single file, with its metadata, into destination_file_path."""
+    shutil.copy2(source_file_path, destination_file_path)
diff --git a/rainbow/build/__init__.py 
b/rainbow/runners/airflow/build/__init__.py
similarity index 100%
rename from rainbow/build/__init__.py
rename to rainbow/runners/airflow/build/__init__.py
diff --git a/rainbow/runners/airflow/dag/rainbow_dags.py 
b/rainbow/runners/airflow/build/build_rainbow.py
similarity index 86%
copy from rainbow/runners/airflow/dag/rainbow_dags.py
copy to rainbow/runners/airflow/build/build_rainbow.py
index 6bdf66b..222ea5f 100644
--- a/rainbow/runners/airflow/dag/rainbow_dags.py
+++ b/rainbow/runners/airflow/build/build_rainbow.py
@@ -26,7 +26,10 @@ from airflow import DAG
 from rainbow.runners.airflow.tasks.python import PythonTask
 
 
-def register_dags(path):
+def build_rainbow(path):
+    """
+    Build the artifacts of the tasks defined in the config files found under path.
+    """
     files = []
     for r, d, f in os.walk(path):
         for file in f:
@@ -38,7 +41,7 @@ def register_dags(path):
     dags = []
 
     for config_file in files:
-        print(f'Registering DAG for file: f{config_file}')
+        print(f'Building artifacts for file: {config_file}')
 
         with open(config_file) as stream:
             # TODO: validate config
@@ -64,12 +67,7 @@ def register_dags(path):
                     task_instance = get_task_class(task_type)(
                         dag, pipeline['pipeline'], parent if parent else None, task, 'all_success'
                     )
-                    parent = task_instance.apply_task_to_dag()
-
-                    print(f'{parent}{{{task_type}}}')
-
-                dags.append(dag)
-    return dags
+                    parent = task_instance.build()
 
 
 # TODO: task class registry
@@ -83,6 +81,6 @@ def get_task_class(task_type):
 
 
 if __name__ == '__main__':
     # TODO: configurable yaml dir
     path = 'tests/runners/airflow/dag/rainbow'
-    register_dags(path)
+    build_rainbow(path)
diff --git a/rainbow/runners/airflow/build/python/container-setup.sh 
b/rainbow/runners/airflow/build/python/container-setup.sh
new file mode 100755
index 0000000..6e8d242
--- /dev/null
+++ b/rainbow/runners/airflow/build/python/container-setup.sh
@@ -0,0 +1,13 @@
+#!/bin/bash
+# Prepare the container for a rainbow python task: write the task input to
+# rainbow_input.json and initialise the Airflow xcom return file.
+
+# printf (unlike echo) never treats input starting with '-' as an option.
+printf '%s\n' "$RAINBOW_INPUT" > rainbow_input.json
+
+AIRFLOW_RETURN_FILE=/airflow/xcom/return.json
+
+mkdir -p /airflow/xcom/
+
+# Start with an empty JSON object so a return file exists even if teardown copies nothing.
+echo '{}' > "$AIRFLOW_RETURN_FILE"
diff --git a/rainbow/runners/airflow/build/python/container-teardown.sh 
b/rainbow/runners/airflow/build/python/container-teardown.sh
new file mode 100755
index 0000000..1219407
--- /dev/null
+++ b/rainbow/runners/airflow/build/python/container-teardown.sh
@@ -0,0 +1,9 @@
+#!/bin/bash
+# Publish the task's output file as the Airflow xcom return value.
+# Usage: container-teardown.sh [OUTPUT_FILE]
+
+USER_CONFIG_OUTPUT_FILE=$1
+if [ -n "$USER_CONFIG_OUTPUT_FILE" ]; then
+    # Quoted so output paths containing spaces or glob characters are copied intact.
+    cp "${USER_CONFIG_OUTPUT_FILE}" /airflow/xcom/return.json
+fi
diff --git a/rainbow/runners/airflow/dag/rainbow_dags.py 
b/rainbow/runners/airflow/dag/rainbow_dags.py
index 6bdf66b..c564737 100644
--- a/rainbow/runners/airflow/dag/rainbow_dags.py
+++ b/rainbow/runners/airflow/dag/rainbow_dags.py
@@ -23,10 +23,13 @@ from datetime import datetime
 import yaml
 from airflow import DAG
 
-from rainbow.runners.airflow.tasks.python import PythonTask
+from rainbow.runners.airflow.build import build_rainbow
 
 
 def register_dags(path):
+    """
+    TODO: doc for register_dags
+    """
     files = []
     for r, d, f in os.walk(path):
         for file in f:
@@ -72,10 +75,7 @@ def register_dags(path):
     return dags
 
 
-# TODO: task class registry
-task_classes = {
-    'python': PythonTask
-}
+task_classes = build_rainbow.task_classes
 
 
 def get_task_class(task_type):
diff --git a/rainbow/runners/airflow/model/task.py 
b/rainbow/runners/airflow/model/task.py
index 2650aa1..25656ee 100644
--- a/rainbow/runners/airflow/model/task.py
+++ b/rainbow/runners/airflow/model/task.py
@@ -32,9 +32,9 @@ class Task:
         self.config = config
         self.trigger_rule = trigger_rule
 
-    def setup(self):
+    def build(self):
         """
-        Setup method for task.
+        Build task's artifacts.
         """
         raise NotImplementedError()
 
diff --git a/rainbow/runners/airflow/tasks/create_cloudformation_stack.py 
b/rainbow/runners/airflow/tasks/create_cloudformation_stack.py
index 9304167..c478dc7 100644
--- a/rainbow/runners/airflow/tasks/create_cloudformation_stack.py
+++ b/rainbow/runners/airflow/tasks/create_cloudformation_stack.py
@@ -27,7 +27,7 @@ class CreateCloudFormationStackTask(task.Task):
     def __init__(self, dag, pipeline_name, parent, config, trigger_rule):
         super().__init__(dag, pipeline_name, parent, config, trigger_rule)
 
-    def setup(self):
+    def build(self):
         pass
 
     def apply_task_to_dag(self):
diff --git a/rainbow/runners/airflow/tasks/delete_cloudformation_stack.py 
b/rainbow/runners/airflow/tasks/delete_cloudformation_stack.py
index 66d5783..d172284 100644
--- a/rainbow/runners/airflow/tasks/delete_cloudformation_stack.py
+++ b/rainbow/runners/airflow/tasks/delete_cloudformation_stack.py
@@ -27,7 +27,7 @@ class DeleteCloudFormationStackTask(task.Task):
     def __init__(self, dag, pipeline_name, parent, config, trigger_rule):
         super().__init__(dag, pipeline_name, parent, config, trigger_rule)
 
-    def setup(self):
+    def build(self):
         pass
 
     def apply_task_to_dag(self):
diff --git a/rainbow/runners/airflow/tasks/job_end.py 
b/rainbow/runners/airflow/tasks/job_end.py
index b3244c4..a6c5ef2 100644
--- a/rainbow/runners/airflow/tasks/job_end.py
+++ b/rainbow/runners/airflow/tasks/job_end.py
@@ -27,7 +27,7 @@ class JobEndTask(task.Task):
     def __init__(self, dag, pipeline_name, parent, config, trigger_rule):
         super().__init__(dag, pipeline_name, parent, config, trigger_rule)
 
-    def setup(self):
+    def build(self):
         pass
 
     def apply_task_to_dag(self):
diff --git a/rainbow/runners/airflow/tasks/job_start.py 
b/rainbow/runners/airflow/tasks/job_start.py
index f794e09..7338363 100644
--- a/rainbow/runners/airflow/tasks/job_start.py
+++ b/rainbow/runners/airflow/tasks/job_start.py
@@ -27,7 +27,7 @@ class JobStartTask(task.Task):
     def __init__(self, dag, pipeline_name, parent, config, trigger_rule):
         super().__init__(dag, pipeline_name, parent, config, trigger_rule)
 
-    def setup(self):
+    def build(self):
         pass
 
     def apply_task_to_dag(self):
diff --git a/rainbow/runners/airflow/tasks/python.py 
b/rainbow/runners/airflow/tasks/python.py
index 983ce0c..8317854 100644
--- a/rainbow/runners/airflow/tasks/python.py
+++ b/rainbow/runners/airflow/tasks/python.py
@@ -16,10 +16,12 @@
 # specific language governing permissions and limitations
 # under the License.
 import json
+import os
 
 from airflow.models import Variable
 from airflow.operators.dummy_operator import DummyOperator
 
+from rainbow.docker.python import python_image
 from rainbow.runners.airflow.model import task
 from rainbow.runners.airflow.operators.kubernetes_pod_operator import \
     ConfigurableKubernetesPodOperator, \
@@ -45,9 +47,20 @@ class PythonTask(task.Task):
         self.config_task_id = self.task_name + '_input'
         self.executors = self.__executors()
 
-    def setup(self):
-        # TODO: build docker image if needed.
-        pass
+    def build(self):
+        """
+        Build the task's docker image from its 'source' directory, if one is configured.
+
+        The container setup/teardown scripts are added to the image next to the sources.
+        """
+        if 'source' in self.config:
+            script_dir = os.path.dirname(__file__)
+            build_scripts_dir = os.path.join(script_dir, '..', 'build', 'python')
+
+            python_image.build(self.config['source'], self.image, [
+                os.path.join(build_scripts_dir, 'container-setup.sh'),
+                os.path.join(build_scripts_dir, 'container-teardown.sh')
+            ])
 
     def apply_task_to_dag(self):
 
diff --git a/rainbow/runners/airflow/tasks/spark.py 
b/rainbow/runners/airflow/tasks/spark.py
index ebae64e..8846f97 100644
--- a/rainbow/runners/airflow/tasks/spark.py
+++ b/rainbow/runners/airflow/tasks/spark.py
@@ -27,7 +27,7 @@ class SparkTask(task.Task):
     def __init__(self, dag, pipeline_name, parent, config, trigger_rule):
         super().__init__(dag, pipeline_name, parent, config, trigger_rule)
 
-    def setup(self):
+    def build(self):
         pass
 
     def apply_task_to_dag(self):
diff --git a/rainbow/runners/airflow/tasks/sql.py 
b/rainbow/runners/airflow/tasks/sql.py
index 6dfc0f1..23458a9 100644
--- a/rainbow/runners/airflow/tasks/sql.py
+++ b/rainbow/runners/airflow/tasks/sql.py
@@ -27,7 +27,7 @@ class SparkTask(task.Task):
     def __init__(self, dag, pipeline_name, parent, config, trigger_rule):
         super().__init__(dag, pipeline_name, parent, config, trigger_rule)
 
-    def setup(self):
+    def build(self):
         pass
 
     def apply_task_to_dag(self):
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..f22c0a7
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,3 @@
+docker==4.2.0
+apache-airflow==1.10.9
+docker-pycreds==0.4.0
diff --git a/tests/runners/airflow/dag/test_rainbow_dags.py 
b/tests/runners/airflow/dag/test_rainbow_dags.py
index 41bea09..c66e3bc 100644
--- a/tests/runners/airflow/dag/test_rainbow_dags.py
+++ b/tests/runners/airflow/dag/test_rainbow_dags.py
@@ -1,6 +1,7 @@
 from unittest import TestCase
 
 from rainbow.runners.airflow.dag import rainbow_dags
+import unittest
 
 
 class Test(TestCase):
@@ -9,3 +10,7 @@ class Test(TestCase):
         self.assertEqual(len(dags), 1)
         # TODO: elaborate test
         pass
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/rainbow/docker/__init__.py 
b/tests/runners/airflow/tasks/hello_world/__init__.py
similarity index 98%
copy from rainbow/docker/__init__.py
copy to tests/runners/airflow/tasks/hello_world/__init__.py
index 8bb1ec2..217e5db 100644
--- a/rainbow/docker/__init__.py
+++ b/tests/runners/airflow/tasks/hello_world/__init__.py
@@ -15,4 +15,3 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-# TODO: docker
diff --git a/rainbow/docker/__init__.py 
b/tests/runners/airflow/tasks/hello_world/hello_world.py
similarity index 97%
copy from rainbow/docker/__init__.py
copy to tests/runners/airflow/tasks/hello_world/hello_world.py
index 8bb1ec2..9b87c05 100644
--- a/rainbow/docker/__init__.py
+++ b/tests/runners/airflow/tasks/hello_world/hello_world.py
@@ -15,4 +15,4 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-# TODO: docker
+print('Hello world!')
diff --git a/tests/runners/airflow/tasks/test_python.py 
b/tests/runners/airflow/tasks/test_python.py
index 4f5808b..4bbbe9c 100644
--- a/tests/runners/airflow/tasks/test_python.py
+++ b/tests/runners/airflow/tasks/test_python.py
@@ -16,8 +16,11 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import unittest
 from unittest import TestCase
 
+import docker
+
 from rainbow.runners.airflow.operators.kubernetes_pod_operator import \
     ConfigurableKubernetesPodOperator
 from rainbow.runners.airflow.tasks import python
@@ -25,20 +28,14 @@ from tests.util import dag_test_utils
 
 
 class TestPythonTask(TestCase):
+
     def test_apply_task_to_dag(self):
         # TODO: elaborate tests
         dag = dag_test_utils.create_dag()
 
         task_id = 'my_task'
 
-        config = {
-            'task': task_id,
-            'cmd': 'foo bar',
-            'image': 'my_image',
-            'input_type': 'my_input_type',
-            'input_path': 'my_input',
-            'output_path': '/my_output.json'
-        }
+        config = self.__create_conf(task_id)
 
         task0 = python.PythonTask(dag, 'my_pipeline', None, config, 
'all_success')
         task0.apply_task_to_dag()
@@ -48,3 +45,35 @@ class TestPythonTask(TestCase):
 
         self.assertIsInstance(dag_task0, ConfigurableKubernetesPodOperator)
         self.assertEqual(dag_task0.task_id, task_id)
+
+    def test_build(self):
+        """Build the hello_world image and check that running it prints 'Hello world!'."""
+        config = self.__create_conf('my_task')
+        task0 = python.PythonTask(None, None, None, config, None)
+        task0.build()
+        # TODO: elaborate test of image, validate input/output
+        image_name = config['image']
+        docker_client = docker.from_env()
+        try:
+            docker_client.images.get(image_name)
+            # remove=True so each run does not leave an exited container behind.
+            container_log = docker_client.containers.run(image_name, 'python hello_world.py', remove=True)
+        finally:
+            docker_client.close()
+        self.assertEqual(b'Hello world!\n', container_log)
+
+    @staticmethod
+    def __create_conf(task_id):
+        return {
+            'task': task_id,
+            'cmd': 'foo bar',
+            'image': 'my_image',
+            'source': 'tests/runners/airflow/tasks/hello_world',
+            'input_type': 'my_input_type',
+            'input_path': 'my_input',
+            'output_path': '/my_output.json'
+        }
+
+
+if __name__ == '__main__':
+    unittest.main()

Reply via email to