This is an automated email from the ASF dual-hosted git repository.

jbonofre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-liminal.git

commit 9e646df8d50cd5a9b9b2d966d79ce08df71efebb
Author: aviemzur <[email protected]>
AuthorDate: Tue Mar 10 12:16:41 2020 +0200

    Tasks stubs
---
 rainbow/runners/airflow/dag/rainbow_dags.py        |  4 +-
 rainbow/runners/airflow/model/task.py              |  7 ++
 .../create_cloudformation_stack.py}                | 22 +++---
 .../delete_cloudformation_stack.py}                | 22 +++---
 .../airflow/{model/task.py => tasks/job_end.py}    | 22 +++---
 .../airflow/{model/task.py => tasks/job_start.py}  | 23 +++---
 rainbow/runners/airflow/tasks/python.py            | 81 ++++++++++------------
 .../airflow/{model/task.py => tasks/spark.py}      | 22 +++---
 .../airflow/{model/task.py => tasks/sql.py}        | 22 +++---
 rainbow/sql/__init__.py                            |  1 +
 10 files changed, 101 insertions(+), 125 deletions(-)

diff --git a/rainbow/runners/airflow/dag/rainbow_dags.py b/rainbow/runners/airflow/dag/rainbow_dags.py
index 577da07..6bdf66b 100644
--- a/rainbow/runners/airflow/dag/rainbow_dags.py
+++ b/rainbow/runners/airflow/dag/rainbow_dags.py
@@ -15,17 +15,15 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-# TODO: Iterate over each pipeline and create a DAG for it. \
-#  Within every pipeline iterate over tasks and apply them to DAG.
 
 import os
 import pprint
+from datetime import datetime
 
 import yaml
 from airflow import DAG
 
 from rainbow.runners.airflow.tasks.python import PythonTask
-from datetime import datetime
 
 
 def register_dags(path):
diff --git a/rainbow/runners/airflow/model/task.py b/rainbow/runners/airflow/model/task.py
index e74085d..2650aa1 100644
--- a/rainbow/runners/airflow/model/task.py
+++ b/rainbow/runners/airflow/model/task.py
@@ -25,6 +25,13 @@ class Task:
     Task.
     """
 
+    def __init__(self, dag, pipeline_name, parent, config, trigger_rule):
+        self.dag = dag
+        self.pipeline_name = pipeline_name
+        self.parent = parent
+        self.config = config
+        self.trigger_rule = trigger_rule
+
     def setup(self):
         """
         Setup method for task.
diff --git a/rainbow/runners/airflow/model/task.py b/rainbow/runners/airflow/tasks/create_cloudformation_stack.py
similarity index 73%
copy from rainbow/runners/airflow/model/task.py
copy to rainbow/runners/airflow/tasks/create_cloudformation_stack.py
index e74085d..9304167 100644
--- a/rainbow/runners/airflow/model/task.py
+++ b/rainbow/runners/airflow/tasks/create_cloudformation_stack.py
@@ -15,24 +15,20 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-"""
-Base task.
-"""
 
+from rainbow.runners.airflow.model import task
 
-class Task:
+
+class CreateCloudFormationStackTask(task.Task):
     """
-    Task.
+    # TODO: Creates cloud_formation stack.
     """
 
+    def __init__(self, dag, pipeline_name, parent, config, trigger_rule):
+        super().__init__(dag, pipeline_name, parent, config, trigger_rule)
+
     def setup(self):
-        """
-        Setup method for task.
-        """
-        raise NotImplementedError()
+        pass
 
     def apply_task_to_dag(self):
-        """
-        Registers Airflow operator to parent task.
-        """
-        raise NotImplementedError()
+        pass
diff --git a/rainbow/runners/airflow/model/task.py b/rainbow/runners/airflow/tasks/delete_cloudformation_stack.py
similarity index 73%
copy from rainbow/runners/airflow/model/task.py
copy to rainbow/runners/airflow/tasks/delete_cloudformation_stack.py
index e74085d..66d5783 100644
--- a/rainbow/runners/airflow/model/task.py
+++ b/rainbow/runners/airflow/tasks/delete_cloudformation_stack.py
@@ -15,24 +15,20 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-"""
-Base task.
-"""
 
+from rainbow.runners.airflow.model import task
 
-class Task:
+
+class DeleteCloudFormationStackTask(task.Task):
     """
-    Task.
+    # TODO: Deletes cloud_formation stack.
     """
 
+    def __init__(self, dag, pipeline_name, parent, config, trigger_rule):
+        super().__init__(dag, pipeline_name, parent, config, trigger_rule)
+
     def setup(self):
-        """
-        Setup method for task.
-        """
-        raise NotImplementedError()
+        pass
 
     def apply_task_to_dag(self):
-        """
-        Registers Airflow operator to parent task.
-        """
-        raise NotImplementedError()
+        pass
diff --git a/rainbow/runners/airflow/model/task.py b/rainbow/runners/airflow/tasks/job_end.py
similarity index 73%
copy from rainbow/runners/airflow/model/task.py
copy to rainbow/runners/airflow/tasks/job_end.py
index e74085d..b3244c4 100644
--- a/rainbow/runners/airflow/model/task.py
+++ b/rainbow/runners/airflow/tasks/job_end.py
@@ -15,24 +15,20 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-"""
-Base task.
-"""
 
+from rainbow.runners.airflow.model import task
 
-class Task:
+
+class JobEndTask(task.Task):
     """
-    Task.
+    # TODO: Job end task. Reports job end metrics.
     """
 
+    def __init__(self, dag, pipeline_name, parent, config, trigger_rule):
+        super().__init__(dag, pipeline_name, parent, config, trigger_rule)
+
     def setup(self):
-        """
-        Setup method for task.
-        """
-        raise NotImplementedError()
+        pass
 
     def apply_task_to_dag(self):
-        """
-        Registers Airflow operator to parent task.
-        """
-        raise NotImplementedError()
+        pass
diff --git a/rainbow/runners/airflow/model/task.py b/rainbow/runners/airflow/tasks/job_start.py
similarity index 71%
copy from rainbow/runners/airflow/model/task.py
copy to rainbow/runners/airflow/tasks/job_start.py
index e74085d..f794e09 100644
--- a/rainbow/runners/airflow/model/task.py
+++ b/rainbow/runners/airflow/tasks/job_start.py
@@ -15,24 +15,21 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-"""
-Base task.
-"""
 
+from rainbow.runners.airflow.model import task
 
-class Task:
+
+class JobStartTask(task.Task):
     """
-    Task.
+    # TODO: Job start task. Reports job start metrics.
     """
 
+    def __init__(self, dag, pipeline_name, parent, config, trigger_rule):
+        super().__init__(dag, pipeline_name, parent, config, trigger_rule)
+
     def setup(self):
-        """
-        Setup method for task.
-        """
-        raise NotImplementedError()
+        pass
 
     def apply_task_to_dag(self):
-        """
-        Registers Airflow operator to parent task.
-        """
-        raise NotImplementedError()
+        # TODO: job start task
+        pass
diff --git a/rainbow/runners/airflow/tasks/python.py b/rainbow/runners/airflow/tasks/python.py
index 727e11c..983ce0c 100644
--- a/rainbow/runners/airflow/tasks/python.py
+++ b/rainbow/runners/airflow/tasks/python.py
@@ -32,22 +32,18 @@ class PythonTask(task.Task):
     """
 
     def __init__(self, dag, pipeline_name, parent, config, trigger_rule):
-        self.dag = dag
-        self.parent = parent
-        self.config = config
-        self.trigger_rule = trigger_rule
-        self.input_type = config['input_type']
-        self.input_path = config['input_path']
-        self.task_name = config['task']
+        super().__init__(dag, pipeline_name, parent, config, trigger_rule)
+
+        self.input_type = self.config['input_type']
+        self.input_path = self.config['input_path']
+        self.task_name = self.config['task']
         self.image = self.config['image']
-        self.resources = self.__resources_config(config)
-        self.env_vars = self.__env_vars(pipeline_name, config)
-        self.kubernetes_kwargs = self.__kubernetes_kwargs(
-            dag, self.env_vars, self.resources, self.task_name
-        )
-        self.cmds, self.arguments = self.__kubernetes_cmds_and_arguments(config)
+        self.resources = self.__kubernetes_resources()
+        self.env_vars = self.__env_vars()
+        self.kubernetes_kwargs = self.__kubernetes_kwargs()
+        self.cmds, self.arguments = self.__kubernetes_cmds_and_arguments()
         self.config_task_id = self.task_name + '_input'
-        self.executors = self.__executors(config)
+        self.executors = self.__executors()
 
     def setup(self):
         # TODO: build docker image if needed.
@@ -126,65 +122,62 @@ class PythonTask(task.Task):
 
             return end_task
 
-    @staticmethod
-    def __executors(config):
+    def __executors(self):
         executors = 1
-        if 'executors' in config:
-            executors = config['executors']
+        if 'executors' in self.config:
+            executors = self.config['executors']
         return executors
 
-    @staticmethod
-    def __kubernetes_cmds_and_arguments(config):
+    def __kubernetes_cmds_and_arguments(self):
         cmds = ['/bin/bash', '-c']
         arguments = [
             f'''sh container-setup.sh && \
-            {config['cmd']} && \
-            sh container-teardown.sh {config['output_path']}'''
+            {self.config['cmd']} && \
+            sh container-teardown.sh {self.config['output_path']}'''
         ]
         return cmds, arguments
 
-    @staticmethod
-    def __kubernetes_kwargs(dag, env_vars, resources, task_name):
+    def __kubernetes_kwargs(self):
         kubernetes_kwargs = {
             'namespace': Variable.get('kubernetes_namespace', default_var='default'),
-            'name': task_name.replace('_', '-'),
+            'name': self.task_name.replace('_', '-'),
             'in_cluster': Variable.get('in_kubernetes_cluster', default_var=False),
             'image_pull_policy': Variable.get('image_pull_policy', default_var='IfNotPresent'),
             'get_logs': True,
-            'env_vars': env_vars,
+            'env_vars': self.env_vars,
             'do_xcom_push': True,
             'is_delete_operator_pod': True,
             'startup_timeout_seconds': 300,
             'image_pull_secrets': 'regcred',
-            'resources': resources,
-            'dag': dag
+            'resources': self.resources,
+            'dag': self.dag
         }
         return kubernetes_kwargs
 
-    @staticmethod
-    def __env_vars(pipeline_name, config):
+    def __env_vars(self):
         env_vars = {}
-        if 'env_vars' in config:
-            env_vars = config['env_vars']
+        if 'env_vars' in self.config:
+            env_vars = self.config['env_vars']
         airflow_configuration_variable = Variable.get(
-            f'''{pipeline_name}_dag_configuration''',
+            f'''{self.pipeline_name}_dag_configuration''',
             default_var=None)
         if airflow_configuration_variable:
             airflow_configs = json.loads(airflow_configuration_variable)
-            environment_variables_key = f'''{self.pipeline}_environment_variables'''
+            environment_variables_key = f'''{self.pipeline_name}_environment_variables'''
             if environment_variables_key in airflow_configs:
                 env_vars = airflow_configs[environment_variables_key]
         return env_vars
 
-    @staticmethod
-    def __resources_config(config):
+    def __kubernetes_resources(self):
         resources = {}
-        if 'request_cpu' in config:
-            resources['request_cpu'] = config['request_cpu']
-        if 'request_memory' in config:
-            resources['request_memory'] = config['request_memory']
-        if 'limit_cpu' in config:
-            resources['limit_cpu'] = config['limit_cpu']
-        if 'limit_memory' in config:
-            resources['limit_memory'] = config['limit_memory']
+
+        if 'request_cpu' in self.config:
+            resources['request_cpu'] = self.config['request_cpu']
+        if 'request_memory' in self.config:
+            resources['request_memory'] = self.config['request_memory']
+        if 'limit_cpu' in self.config:
+            resources['limit_cpu'] = self.config['limit_cpu']
+        if 'limit_memory' in self.config:
+            resources['limit_memory'] = self.config['limit_memory']
+
         return resources
diff --git a/rainbow/runners/airflow/model/task.py b/rainbow/runners/airflow/tasks/spark.py
similarity index 74%
copy from rainbow/runners/airflow/model/task.py
copy to rainbow/runners/airflow/tasks/spark.py
index e74085d..ebae64e 100644
--- a/rainbow/runners/airflow/model/task.py
+++ b/rainbow/runners/airflow/tasks/spark.py
@@ -15,24 +15,20 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-"""
-Base task.
-"""
 
+from rainbow.runners.airflow.model import task
 
-class Task:
+
+class SparkTask(task.Task):
     """
-    Task.
+    # TODO: Executes a Spark application.
     """
 
+    def __init__(self, dag, pipeline_name, parent, config, trigger_rule):
+        super().__init__(dag, pipeline_name, parent, config, trigger_rule)
+
     def setup(self):
-        """
-        Setup method for task.
-        """
-        raise NotImplementedError()
+        pass
 
     def apply_task_to_dag(self):
-        """
-        Registers Airflow operator to parent task.
-        """
-        raise NotImplementedError()
+        pass
diff --git a/rainbow/runners/airflow/model/task.py b/rainbow/runners/airflow/tasks/sql.py
similarity index 74%
copy from rainbow/runners/airflow/model/task.py
copy to rainbow/runners/airflow/tasks/sql.py
index e74085d..6dfc0f1 100644
--- a/rainbow/runners/airflow/model/task.py
+++ b/rainbow/runners/airflow/tasks/sql.py
@@ -15,24 +15,20 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-"""
-Base task.
-"""
 
+from rainbow.runners.airflow.model import task
 
-class Task:
+
+class SqlTask(task.Task):
     """
-    Task.
+    # TODO: Executes an SQL application.
     """
 
+    def __init__(self, dag, pipeline_name, parent, config, trigger_rule):
+        super().__init__(dag, pipeline_name, parent, config, trigger_rule)
+
     def setup(self):
-        """
-        Setup method for task.
-        """
-        raise NotImplementedError()
+        pass
 
     def apply_task_to_dag(self):
-        """
-        Registers Airflow operator to parent task.
-        """
-        raise NotImplementedError()
+        pass
diff --git a/rainbow/sql/__init__.py b/rainbow/sql/__init__.py
index 217e5db..495bf9c 100644
--- a/rainbow/sql/__init__.py
+++ b/rainbow/sql/__init__.py
@@ -15,3 +15,4 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+# TODO: SQL (Scala? Python?)

Reply via email to