This is an automated email from the ASF dual-hosted git repository.

jbonofre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-liminal.git

commit 326a042ce87e979c4d97bb6f1e9c8e89ec6c09c0
Author: aviemzur <[email protected]>
AuthorDate: Sun Mar 15 15:23:57 2020 +0200

    Change PythonTask config to input/output enhancement
---
 rainbow/build/python/container-setup.sh            |   2 +-
 .../airflow/operators/kubernetes_pod_operator.py   | 140 -------------------
 .../kubernetes_pod_operator_with_input_output.py   | 148 +++++++++++++++++++++
 rainbow/runners/airflow/tasks/python.py            |  57 ++++----
 .../airflow/build/python/test_python_image.py      |  14 +-
 tests/runners/airflow/build/test_build_rainbow.py  |   2 +-
 .../airflow/rainbow/hello_world/hello_world.py     |   9 ++
 tests/runners/airflow/rainbow/rainbow.yml          |  35 +++--
 tests/runners/airflow/tasks/test_python.py         |   6 +-
 9 files changed, 225 insertions(+), 188 deletions(-)

diff --git a/rainbow/build/python/container-setup.sh b/rainbow/build/python/container-setup.sh
index 4e20fc2..883f1e1 100755
--- a/rainbow/build/python/container-setup.sh
+++ b/rainbow/build/python/container-setup.sh
@@ -1,6 +1,6 @@
 #!/bin/sh
 
-echo """$RAINBOW_INPUT""" > rainbow_input.json
+echo """$RAINBOW_INPUT""" > /rainbow_input.json
 
 AIRFLOW_RETURN_FILE=/airflow/xcom/return.json
 
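
The setup script now writes the serialized RAINBOW_INPUT to /rainbow_input.json
at the filesystem root, so task code can find it regardless of the container's
working directory. A minimal sketch of how a task script might consume it (the
helper name read_task_input is illustrative, not part of this commit):

    import json

    def read_task_input(path='/rainbow_input.json'):
        # container-setup.sh writes the RAINBOW_INPUT env var to this file.
        with open(path) as f:
            return json.load(f)

    print(read_task_input())
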
diff --git a/rainbow/runners/airflow/operators/kubernetes_pod_operator.py b/rainbow/runners/airflow/operators/kubernetes_pod_operator.py
deleted file mode 100644
index a7b0bdd..0000000
--- a/rainbow/runners/airflow/operators/kubernetes_pod_operator.py
+++ /dev/null
@@ -1,140 +0,0 @@
-from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
-import json
-import traceback
-from airflow.models import DAG, TaskInstance
-from airflow.utils import timezone
-from random import randint
-
-
-def split_list(seq, num):
-    avg = len(seq) / float(num)
-    out = []
-    last = 0.0
-
-    while last < len(seq):
-        out.append(seq[int(last):int(last + avg)])
-        last += avg
-
-    return out
-
-
-class ConfigureParallelExecutionOperator(KubernetesPodOperator):
-
-    def __init__(self,
-                 config_type=None,
-                 config_path=None,
-                 executors=1,
-                 *args,
-                 **kwargs):
-        namespace = kwargs['namespace']
-        image = kwargs['image']
-        name = kwargs['name']
-
-        del kwargs['namespace']
-        del kwargs['image']
-        del kwargs['name']
-
-        super().__init__(
-            namespace=namespace,
-            image=image,
-            name=name,
-            *args,
-            **kwargs)
-        self.config_type = config_type
-        self.config_path = config_path
-        self.executors = executors
-
-    def execute(self, context):
-        config_dict = {}
-
-        self.log.info(f'config type: {self.config_type}')
-
-        if self.config_type:
-            if self.config_type == 'file':
-                config_dict = {}  # future feature: return config from file
-            elif self.config_type == 'sql':
-                config_dict = {}  # future feature: return from sql config
-            elif self.config_type == 'task':
-                ti = context['task_instance']
-                self.log.info(self.config_path)
-                config_dict = ti.xcom_pull(task_ids=self.config_path)
-            elif self.config_type == 'static':
-                config_dict = json.loads(self.config_path)
-            else:
-                raise ValueError(f'Unknown config type: {self.config_type}')
-
-        run_id = context['dag_run'].run_id
-
-        return_conf = {'config_type': self.config_type,
-                       'splits': {'0': {'run_id': run_id, 'configs': []}}}
-
-        if config_dict:
-            self.log.info(f'configs dict: {config_dict}')
-
-            configs = config_dict['configs']
-
-            self.log.info(f'configs: {configs}')
-
-            config_splits = split_list(configs, self.executors)
-
-            for i in range(self.executors):
-                return_conf['splits'][str(i)] = {'run_id': run_id, 'configs': config_splits[i]}
-
-        return return_conf
-
-    def run_pod(self, context):
-        return super().execute(context)
-
-
-class ConfigurableKubernetesPodOperator(KubernetesPodOperator):
-
-    def __init__(self,
-                 config_task_id,
-                 task_split,
-                 *args,
-                 **kwargs):
-        namespace = kwargs['namespace']
-        image = kwargs['image']
-        name = kwargs['name']
-
-        del kwargs['namespace']
-        del kwargs['image']
-        del kwargs['name']
-
-        super().__init__(
-            namespace=namespace,
-            image=image,
-            name=name,
-            *args,
-            **kwargs)
-
-        self.config_task_id = config_task_id
-        self.task_split = task_split
-
-    def execute(self, context):
-        if self.config_task_id:
-            ti = context['task_instance']
-
-            config = ti.xcom_pull(task_ids=self.config_task_id)
-
-            if config:
-                split = {}
-
-                if 'configs' in config:
-                    split = configs
-                else:
-                    split = config['splits'][str(self.task_split)]
-
-                self.log.info(split)
-
-                if split and split['configs']:
-                    self.env_vars.update({'DATA_PIPELINE_CONFIG': json.dumps(split)})
-                    return super().execute(context)
-                else:
-                    self.log.info(
-                        f'Empty split config for split {self.task_split}. split config: {split}. config: {config}')
-            else:
-                raise ValueError('Config not found in task: ' + self.config_task_id)
-        else:
-            self.env_vars.update({'DATA_PIPELINE_CONFIG': '{}'})
-            return super().execute(context)
diff --git a/rainbow/runners/airflow/operators/kubernetes_pod_operator_with_input_output.py b/rainbow/runners/airflow/operators/kubernetes_pod_operator_with_input_output.py
new file mode 100644
index 0000000..eb6fa83
--- /dev/null
+++ b/rainbow/runners/airflow/operators/kubernetes_pod_operator_with_input_output.py
@@ -0,0 +1,148 @@
+import json
+
+from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
+
+
+def split_list(seq, num):
+    avg = len(seq) / float(num)
+    out = []
+    last = 0.0
+
+    while last < len(seq):
+        out.append(seq[int(last):int(last + avg)])
+        last += avg
+
+    return out
+
+
+_IS_SPLIT_KEY = 'is_split'
+
+
+class PrepareInputOperator(KubernetesPodOperator):
+
+    def __init__(self,
+                 input_type=None,
+                 input_path=None,
+                 split_input=False,
+                 executors=1,
+                 *args,
+                 **kwargs):
+        namespace = kwargs['namespace']
+        image = kwargs['image']
+        name = kwargs['name']
+
+        del kwargs['namespace']
+        del kwargs['image']
+        del kwargs['name']
+
+        super().__init__(
+            namespace=namespace,
+            image=image,
+            name=name,
+            *args,
+            **kwargs)
+
+        self.input_type = input_type
+        self.input_path = input_path
+        self.executors = executors
+        self.split_input = split_input
+
+    def execute(self, context):
+        input_dict = {}
+
+        self.log.info(f'config type: {self.input_type}')
+
+        ti = context['task_instance']
+
+        if self.input_type:
+            if self.input_type == 'file':
+                input_dict = {}  # future feature: return config from file
+            elif self.input_type == 'sql':
+                input_dict = {}  # future feature: return from sql config
+            elif self.input_type == 'task':
+                self.log.info(self.input_path)
+                input_dict = ti.xcom_pull(task_ids=self.input_path)
+            elif self.input_type == 'static':
+                input_dict = json.loads(self.input_path)
+            else:
+                raise ValueError(f'Unknown config type: {self.input_type}')
+
+        # TODO: pass run_id as well as env var
+        run_id = context['dag_run'].run_id
+        print(f'run_id = {run_id}')
+
+        if input_dict:
+            self.log.info(f'Generated input: {input_dict}')
+
+            if self.split_input:
+                input_splits = split_list(input_dict, self.executors)
+
+                ti.xcom_push(key=_IS_SPLIT_KEY, value=True)
+
+                return input_splits
+            else:
+                return input_dict
+        else:
+            return {}
+
+    def run_pod(self, context):
+        return super().execute(context)
+
+
+class KubernetesPodOperatorWithInputAndOutput(KubernetesPodOperator):
+    """
+    TODO: pydoc
+    """
+
+    _RAINBOW_INPUT_ENV_VAR = 'RAINBOW_INPUT'
+
+    def __init__(self,
+                 task_split,
+                 input_task_id=None,
+                 *args,
+                 **kwargs):
+        namespace = kwargs['namespace']
+        image = kwargs['image']
+        name = kwargs['name']
+
+        del kwargs['namespace']
+        del kwargs['image']
+        del kwargs['name']
+
+        super().__init__(
+            namespace=namespace,
+            image=image,
+            name=name,
+            *args,
+            **kwargs)
+
+        self.input_task_id = input_task_id
+        self.task_split = task_split
+
+    def execute(self, context):
+        task_input = {}
+
+        if self.input_task_id:
+            ti = context['task_instance']
+
+            self.log.info(f'Fetching input for task {self.task_split}.')
+
+            task_input = ti.xcom_pull(task_ids=self.input_task_id)
+
+            is_split = ti.xcom_pull(task_ids=self.input_task_id, key=_IS_SPLIT_KEY)
+            self.log.info(f'is_split = {is_split}')
+            if is_split:
+                self.log.info(f'Fetching split {self.task_split} of input.')
+
+                task_input = task_input[self.task_split]
+
+        if task_input:
+            self.log.info(f'task input = {task_input}')
+
+            self.env_vars.update({self._RAINBOW_INPUT_ENV_VAR: json.dumps(task_input)})
+        else:
+            self.env_vars.update({self._RAINBOW_INPUT_ENV_VAR: '{}'})
+
+            self.log.info(f'Empty input for task {self.task_split}.')
+
+        return super().execute(context)
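
In the new operator file, split_list divides the input into num roughly equal
contiguous slices, one per executor; PrepareInputOperator pushes the slices as
its XCom result and sets the is_split key so each downstream pod pulls only
its own slice. A quick standalone check of the splitting behavior (sample
data is illustrative):

    def split_list(seq, num):
        avg = len(seq) / float(num)
        out = []
        last = 0.0
        while last < len(seq):
            out.append(seq[int(last):int(last + avg)])
            last += avg
        return out

    # Five inputs across two executors: slice sizes differ by at most one.
    print(split_list([{'id': i} for i in range(5)], 2))
    # [[{'id': 0}, {'id': 1}], [{'id': 2}, {'id': 3}, {'id': 4}]]
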
diff --git a/rainbow/runners/airflow/tasks/python.py b/rainbow/runners/airflow/tasks/python.py
index ac46d0b..8bd11cf 100644
--- a/rainbow/runners/airflow/tasks/python.py
+++ b/rainbow/runners/airflow/tasks/python.py
@@ -21,9 +21,9 @@ from airflow.models import Variable
 from airflow.operators.dummy_operator import DummyOperator
 
 from rainbow.runners.airflow.model import task
-from rainbow.runners.airflow.operators.kubernetes_pod_operator import \
-    ConfigurableKubernetesPodOperator, \
-    ConfigureParallelExecutionOperator
+from rainbow.runners.airflow.operators.kubernetes_pod_operator_with_input_output import \
+    KubernetesPodOperatorWithInputAndOutput, \
+    PrepareInputOperator
 
 
 class PythonTask(task.Task):
@@ -42,25 +42,24 @@ class PythonTask(task.Task):
         self.env_vars = self.__env_vars()
         self.kubernetes_kwargs = self.__kubernetes_kwargs()
         self.cmds, self.arguments = self.__kubernetes_cmds_and_arguments()
-        self.config_task_id = self.task_name + '_input'
+        self.input_task_id = self.task_name + '_input'
         self.executors = self.__executors()
 
     def apply_task_to_dag(self):
-
-        config_task = None
+        input_task = None
 
         if self.input_type in ['static', 'task']:
-            config_task = self.__config_task(config_task)
+            input_task = self.__input_task()
 
         if self.executors == 1:
-            return self.__apply_task_to_dag_single_executor(config_task)
+            return self.__apply_task_to_dag_single_executor(input_task)
         else:
-            return self.__apply_task_to_dag_multiple_executors(config_task)
+            return self.__apply_task_to_dag_multiple_executors(input_task)
 
-    def __apply_task_to_dag_multiple_executors(self, config_task):
-        if not config_task:
-            config_task = DummyOperator(
-                task_id=self.config_task_id,
+    def __apply_task_to_dag_multiple_executors(self, input_task):
+        if not input_task:
+            input_task = DummyOperator(
+                task_id=self.input_task_id,
                 trigger_rule=self.trigger_rule,
                 dag=self.dag
             )
@@ -71,7 +70,7 @@ class PythonTask(task.Task):
         )
 
         if self.parent:
-            self.parent.set_downstream(config_task)
+            self.parent.set_downstream(input_task)
 
             for i in range(self.executors):
                 split_task = self.__create_pod_operator(
@@ -80,51 +79,51 @@ class PythonTask(task.Task):
                     image=self.image
                 )
 
-                config_task.set_downstream(split_task)
+                input_task.set_downstream(split_task)
 
                 split_task.set_downstream(end_task)
 
         return end_task
 
     def __create_pod_operator(self, task_id, task_split, image):
-        return ConfigurableKubernetesPodOperator(
+        return KubernetesPodOperatorWithInputAndOutput(
             task_id=task_id,
-            config_task_id=self.config_task_id,
-            task_split=task_split,
+            input_task_id=self.input_task_id,
+            task_split=task_split if task_split else 0,
             image=image,
             cmds=self.cmds,
             arguments=self.arguments,
             **self.kubernetes_kwargs
         )
 
-    def __apply_task_to_dag_single_executor(self, config_task):
+    def __apply_task_to_dag_single_executor(self, input_task):
         pod_task = self.__create_pod_operator(
             task_id=f'{self.task_name}',
-            task_split=0,
+            task_split=None,
             image=f'''{self.image}'''
         )
 
         first_task = pod_task
 
-        if config_task:
-            first_task = config_task
+        if input_task:
+            first_task = input_task
             first_task.set_downstream(pod_task)
         if self.parent:
             self.parent.set_downstream(first_task)
 
         return pod_task
 
-    def __config_task(self, config_task):
-        self.env_vars.update({'DATA_PIPELINE_INPUT': self.input_path})
-        config_task = ConfigureParallelExecutionOperator(
-            task_id=self.config_task_id,
+    def __input_task(self):
+        return PrepareInputOperator(
+            task_id=self.input_task_id,
             image=self.image,
-            config_type=self.input_type,
-            config_path=self.input_path,
+            input_type=self.input_type,
+            input_path=self.input_path,
+            split_input=True if 'split_input' in self.config and
+                                self.config['split_input'] else False,
             executors=self.executors,
             **self.kubernetes_kwargs
         )
-        return config_task
 
     def __executors(self):
         executors = 1
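
With executors > 1, apply_task_to_dag expands the logical task into a fan-out:
the input task feeds one pod operator per split, and every split converges on
a terminating DummyOperator. A dependency-only sketch of the resulting shape
(task naming is illustrative; the real wiring uses set_downstream on Airflow
operators):

    def fan_out_edges(task_name, executors):
        # Mirrors __apply_task_to_dag_multiple_executors: input -> splits -> end.
        input_task = task_name + '_input'
        edges = []
        for i in range(executors):
            split_task = '%s_%s' % (task_name, i)
            edges.append((input_task, split_task))
            edges.append((split_task, task_name))
        return edges

    print(fan_out_edges('my_task', 3))
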
diff --git a/tests/runners/airflow/build/python/test_python_image.py b/tests/runners/airflow/build/python/test_python_image.py
index 368b05d..d190fba 100644
--- a/tests/runners/airflow/build/python/test_python_image.py
+++ b/tests/runners/airflow/build/python/test_python_image.py
@@ -29,16 +29,24 @@ class TestPythonImage(TestCase):
 
         image_name = config['image']
 
-        PythonImage().build('tests/runners/airflow/rainbow', 'hello_world', 'image_name')
+        PythonImage().build('tests/runners/airflow/rainbow', 'hello_world', image_name)
 
         # TODO: elaborate test of image, validate input/output
 
         docker_client = docker.from_env()
         docker_client.images.get(image_name)
-        container_log = docker_client.containers.run(image_name, "python hello_world.py")
+
+        cmd = 'export RAINBOW_INPUT="{}" && ' + \
+              'sh container-setup.sh && ' + \
+              'python hello_world.py && ' + \
+              'sh container-teardown.sh'
+        cmds = ['/bin/bash', '-c', cmd]
+
+        container_log = docker_client.containers.run(image_name, cmds)
+
         docker_client.close()
 
-        self.assertEqual("b'Hello world!\\n'", str(container_log))
+        self.assertEqual("b'Hello world!\\n\\n{}\\n'", str(container_log))
 
     @staticmethod
     def __create_conf(task_id):
diff --git a/tests/runners/airflow/build/test_build_rainbow.py b/tests/runners/airflow/build/test_build_rainbow.py
index 533848f..0817d6c 100644
--- a/tests/runners/airflow/build/test_build_rainbow.py
+++ b/tests/runners/airflow/build/test_build_rainbow.py
@@ -9,7 +9,7 @@ class TestBuildRainbow(TestCase):
 
     def test_build_rainbow(self):
         docker_client = docker.client.from_env()
-        image_names = ['rainbow_image', 'rainbow_image2']
+        image_names = ['my_static_input_task_image', 'my_task_output_input_task_image']
 
         for image_name in image_names:
             if len(docker_client.images.list(image_name)) > 0:
diff --git a/tests/runners/airflow/rainbow/hello_world/hello_world.py b/tests/runners/airflow/rainbow/hello_world/hello_world.py
index 9b87c05..3eae465 100644
--- a/tests/runners/airflow/rainbow/hello_world/hello_world.py
+++ b/tests/runners/airflow/rainbow/hello_world/hello_world.py
@@ -15,4 +15,13 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+import json
+
 print('Hello world!')
+print()
+
+with open('/rainbow_input.json') as file:
+    print(json.loads(file.readline()))
+
+with open('/output.json', 'w') as file:
+    file.write(json.dumps({'a': 1, 'b': 2}))
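
The test task now exercises both sides of the contract: it reads the input
that container-setup.sh wrote to /rainbow_input.json and writes its result to
/output.json (the output_path declared in rainbow.yml). container-teardown.sh
is not shown in this diff; assuming it publishes the output file to the
AIRFLOW_RETURN_FILE path set in container-setup.sh, a Python equivalent would
be roughly:

    import shutil

    # Assumption: teardown copies the task's output file to the Airflow
    # sidecar return path so it becomes the pod's XCom result.
    def publish_output(output_path='/output.json',
                       return_file='/airflow/xcom/return.json'):
        shutil.copyfile(output_path, return_file)
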
diff --git a/tests/runners/airflow/rainbow/rainbow.yml b/tests/runners/airflow/rainbow/rainbow.yml
index 3e3ec4b..2000621 100644
--- a/tests/runners/airflow/rainbow/rainbow.yml
+++ b/tests/runners/airflow/rainbow/rainbow.yml
@@ -25,28 +25,41 @@ pipelines:
     schedule: 0 * 1 * *
     metrics-namespace: TestNamespace
     tasks:
-      - task: my_static_config_task
+      - task: my_static_input_task
         type: python
-        description: my 1st ds task
-        image: rainbow_image
+        description: static input task
+        image: my_static_input_task_image
         source: hello_world
         env_vars:
           env1: "a"
           env2: "b"
         input_type: static
-        input_path: "{\"configs\": [ { \"campaign_id\": 10 }, { 
\"campaign_id\": 20 } ]}"
-        cmd: 'python hello_world.py'
-      - task: my_static_config_task2
+        input_path: '[ { "foo": "bar" }, { "foo": "baz" } ]'
+        output_path: /output.json
+        cmd: python hello_world.py
+#      - task: my_parallelized_static_input_task
+#        type: python
+#        description: parallelized static input task
+#        image: my_static_input_task_image
+#        env_vars:
+#          env1: "a"
+#          env2: "b"
+#        input_type: static
+#        input_path: '[ { "foo": "bar" }, { "foo": "baz" } ]'
+#        split_input: True
+#        executors: 2
+#        cmd: python hello_world.py
+      - task: my_task_output_input_task
         type: python
-        description: my 1st ds task
-        image: rainbow_image2
+        description: task output input task
+        image: my_task_output_input_task_image
         source: hello_world
         env_vars:
           env1: "a"
           env2: "b"
-        input_type: static
-        input_path: "{\"configs\": [ { \"campaign_id\": 10 }, { 
\"campaign_id\": 20 } ]}"
-        cmd: 'python hello_world.py'
+        input_type: task
+        input_path: my_static_input_task
+        cmd: python hello_world.py
 services:
   - service:
     name: myserver1
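
With input_type: static, input_path is a JSON literal that PrepareInputOperator
parses via json.loads; with input_type: task, input_path names the upstream
task whose XCom result becomes this task's input, so my_task_output_input_task
consumes whatever my_static_input_task returned. A quick check of the static
form used above:

    import json

    # The same JSON literal used as input_path for my_static_input_task.
    input_path = '[ { "foo": "bar" }, { "foo": "baz" } ]'
    task_input = json.loads(input_path)
    assert task_input == [{'foo': 'bar'}, {'foo': 'baz'}]
    print(task_input)
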
diff --git a/tests/runners/airflow/tasks/test_python.py b/tests/runners/airflow/tasks/test_python.py
index ffdcac3..260f71d 100644
--- a/tests/runners/airflow/tasks/test_python.py
+++ b/tests/runners/airflow/tasks/test_python.py
@@ -19,8 +19,8 @@
 import unittest
 from unittest import TestCase
 
-from rainbow.runners.airflow.operators.kubernetes_pod_operator import \
-    ConfigurableKubernetesPodOperator
+from rainbow.runners.airflow.operators.kubernetes_pod_operator_with_input_output import \
+    KubernetesPodOperatorWithInputAndOutput
 from rainbow.runners.airflow.tasks import python
 from tests.util import dag_test_utils
 
@@ -41,7 +41,7 @@ class TestPythonTask(TestCase):
         self.assertEqual(len(dag.tasks), 1)
         dag_task0 = dag.tasks[0]
 
-        self.assertIsInstance(dag_task0, ConfigurableKubernetesPodOperator)
+        self.assertIsInstance(dag_task0, KubernetesPodOperatorWithInputAndOutput)
         self.assertEqual(dag_task0.task_id, task_id)
 
     @staticmethod
