This is an automated email from the ASF dual-hosted git repository.

jbonofre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-liminal.git

commit 9e646df8d50cd5a9b9b2d966d79ce08df71efebb
Author: aviemzur <[email protected]>
AuthorDate: Tue Mar 10 12:16:41 2020 +0200

    Tasks stubs
---
 rainbow/runners/airflow/dag/rainbow_dags.py        |  4 +-
 rainbow/runners/airflow/model/task.py              |  7 ++
 .../create_cloudformation_stack.py}                | 22 +++---
 .../delete_cloudformation_stack.py}                | 22 +++---
 .../airflow/{model/task.py => tasks/job_end.py}    | 22 +++---
 .../airflow/{model/task.py => tasks/job_start.py}  | 23 +++---
 rainbow/runners/airflow/tasks/python.py            | 81 ++++++++++------
 .../airflow/{model/task.py => tasks/spark.py}      | 22 +++---
 .../airflow/{model/task.py => tasks/sql.py}        | 22 +++---
 rainbow/sql/__init__.py                            |  1 +
 10 files changed, 101 insertions(+), 125 deletions(-)

diff --git a/rainbow/runners/airflow/dag/rainbow_dags.py b/rainbow/runners/airflow/dag/rainbow_dags.py
index 577da07..6bdf66b 100644
--- a/rainbow/runners/airflow/dag/rainbow_dags.py
+++ b/rainbow/runners/airflow/dag/rainbow_dags.py
@@ -15,17 +15,15 @@
 # KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations
 # under the License.
-# TODO: Iterate over each pipeline and create a DAG for it. \
-# Within every pipeline iterate over tasks and apply them to DAG.
 
 import os
 import pprint
+from datetime import datetime
 
 import yaml
 from airflow import DAG
 
 from rainbow.runners.airflow.tasks.python import PythonTask
-from datetime import datetime
 
 
 def register_dags(path):
diff --git a/rainbow/runners/airflow/model/task.py b/rainbow/runners/airflow/model/task.py
index e74085d..2650aa1 100644
--- a/rainbow/runners/airflow/model/task.py
+++ b/rainbow/runners/airflow/model/task.py
@@ -25,6 +25,13 @@ class Task:
     Task.
     """
 
+    def __init__(self, dag, pipeline_name, parent, config, trigger_rule):
+        self.dag = dag
+        self.pipeline_name = pipeline_name
+        self.parent = parent
+        self.config = config
+        self.trigger_rule = trigger_rule
+
     def setup(self):
         """
         Setup method for task.
diff --git a/rainbow/runners/airflow/model/task.py b/rainbow/runners/airflow/tasks/create_cloudformation_stack.py
similarity index 73%
copy from rainbow/runners/airflow/model/task.py
copy to rainbow/runners/airflow/tasks/create_cloudformation_stack.py
index e74085d..9304167 100644
--- a/rainbow/runners/airflow/model/task.py
+++ b/rainbow/runners/airflow/tasks/create_cloudformation_stack.py
@@ -15,24 +15,20 @@
 # KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations
 # under the License.
-"""
-Base task.
-"""
+from rainbow.runners.airflow.model import task
 
 
-class Task:
+
+class CreateCloudFormationStackTask(task.Task):
     """
-    Task.
+    # TODO: Creates cloud_formation stack.
     """
 
+    def __init__(self, dag, pipeline_name, parent, config, trigger_rule):
+        super().__init__(dag, pipeline_name, parent, config, trigger_rule)
+
     def setup(self):
-        """
-        Setup method for task.
-        """
-        raise NotImplementedError()
+        pass
 
     def apply_task_to_dag(self):
-        """
-        Registers Airflow operator to parent task.
-        """
-        raise NotImplementedError()
+        pass
diff --git a/rainbow/runners/airflow/model/task.py b/rainbow/runners/airflow/tasks/delete_cloudformation_stack.py
similarity index 73%
copy from rainbow/runners/airflow/model/task.py
copy to rainbow/runners/airflow/tasks/delete_cloudformation_stack.py
index e74085d..66d5783 100644
--- a/rainbow/runners/airflow/model/task.py
+++ b/rainbow/runners/airflow/tasks/delete_cloudformation_stack.py
@@ -15,24 +15,20 @@
 # KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations
 # under the License.
-"""
-Base task.
-"""
+from rainbow.runners.airflow.model import task
 
 
-class Task:
+
+class DeleteCloudFormationStackTask(task.Task):
     """
-    Task.
+    # TODO: Deletes cloud_formation stack.
     """
 
+    def __init__(self, dag, pipeline_name, parent, config, trigger_rule):
+        super().__init__(dag, pipeline_name, parent, config, trigger_rule)
+
     def setup(self):
-        """
-        Setup method for task.
-        """
-        raise NotImplementedError()
+        pass
 
     def apply_task_to_dag(self):
-        """
-        Registers Airflow operator to parent task.
-        """
-        raise NotImplementedError()
+        pass
diff --git a/rainbow/runners/airflow/model/task.py b/rainbow/runners/airflow/tasks/job_end.py
similarity index 73%
copy from rainbow/runners/airflow/model/task.py
copy to rainbow/runners/airflow/tasks/job_end.py
index e74085d..b3244c4 100644
--- a/rainbow/runners/airflow/model/task.py
+++ b/rainbow/runners/airflow/tasks/job_end.py
@@ -15,24 +15,20 @@
 # KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations
 # under the License.
-"""
-Base task.
-"""
+from rainbow.runners.airflow.model import task
 
 
-class Task:
+
+class JobEndTask(task.Task):
     """
-    Task.
+    # TODO: Job end task. Reports job end metrics.
     """
 
+    def __init__(self, dag, pipeline_name, parent, config, trigger_rule):
+        super().__init__(dag, pipeline_name, parent, config, trigger_rule)
+
     def setup(self):
-        """
-        Setup method for task.
-        """
-        raise NotImplementedError()
+        pass
 
     def apply_task_to_dag(self):
-        """
-        Registers Airflow operator to parent task.
-        """
-        raise NotImplementedError()
+        pass
diff --git a/rainbow/runners/airflow/model/task.py b/rainbow/runners/airflow/tasks/job_start.py
similarity index 71%
copy from rainbow/runners/airflow/model/task.py
copy to rainbow/runners/airflow/tasks/job_start.py
index e74085d..f794e09 100644
--- a/rainbow/runners/airflow/model/task.py
+++ b/rainbow/runners/airflow/tasks/job_start.py
@@ -15,24 +15,21 @@
 # KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations
 # under the License.
-"""
-Base task.
-"""
+from rainbow.runners.airflow.model import task
 
 
-class Task:
+
+class JobStartTask(task.Task):
     """
-    Task.
+    # TODO: Job start task. Reports job start metrics.
     """
 
+    def __init__(self, dag, pipeline_name, parent, config, trigger_rule):
+        super().__init__(dag, pipeline_name, parent, config, trigger_rule)
+
     def setup(self):
-        """
-        Setup method for task.
-        """
-        raise NotImplementedError()
+        pass
 
     def apply_task_to_dag(self):
-        """
-        Registers Airflow operator to parent task.
-        """
-        raise NotImplementedError()
+        # TODO: job start task
+        pass
diff --git a/rainbow/runners/airflow/tasks/python.py b/rainbow/runners/airflow/tasks/python.py
index 727e11c..983ce0c 100644
--- a/rainbow/runners/airflow/tasks/python.py
+++ b/rainbow/runners/airflow/tasks/python.py
@@ -32,22 +32,18 @@ class PythonTask(task.Task):
     """
 
     def __init__(self, dag, pipeline_name, parent, config, trigger_rule):
-        self.dag = dag
-        self.parent = parent
-        self.config = config
-        self.trigger_rule = trigger_rule
-        self.input_type = config['input_type']
-        self.input_path = config['input_path']
-        self.task_name = config['task']
+        super().__init__(dag, pipeline_name, parent, config, trigger_rule)
+
+        self.input_type = self.config['input_type']
+        self.input_path = self.config['input_path']
+        self.task_name = self.config['task']
         self.image = self.config['image']
-        self.resources = self.__resources_config(config)
-        self.env_vars = self.__env_vars(pipeline_name, config)
-        self.kubernetes_kwargs = self.__kubernetes_kwargs(
-            dag, self.env_vars, self.resources, self.task_name
-        )
-        self.cmds, self.arguments = self.__kubernetes_cmds_and_arguments(config)
+        self.resources = self.__kubernetes_resources()
+        self.env_vars = self.__env_vars()
+        self.kubernetes_kwargs = self.__kubernetes_kwargs()
+        self.cmds, self.arguments = self.__kubernetes_cmds_and_arguments()
         self.config_task_id = self.task_name + '_input'
-        self.executors = self.__executors(config)
+        self.executors = self.__executors()
 
     def setup(self):
         # TODO: build docker image if needed.
@@ -126,65 +122,62 @@ class PythonTask(task.Task):
 
         return end_task
 
-    @staticmethod
-    def __executors(config):
+    def __executors(self):
         executors = 1
-        if 'executors' in config:
-            executors = config['executors']
+        if 'executors' in self.config:
+            executors = self.config['executors']
         return executors
 
-    @staticmethod
-    def __kubernetes_cmds_and_arguments(config):
+    def __kubernetes_cmds_and_arguments(self):
         cmds = ['/bin/bash', '-c']
         arguments = [
             f'''sh container-setup.sh && \
-            {config['cmd']} && \
-            sh container-teardown.sh {config['output_path']}'''
+            {self.config['cmd']} && \
+            sh container-teardown.sh {self.config['output_path']}'''
         ]
         return cmds, arguments
 
-    @staticmethod
-    def __kubernetes_kwargs(dag, env_vars, resources, task_name):
+    def __kubernetes_kwargs(self):
         kubernetes_kwargs = {
             'namespace': Variable.get('kubernetes_namespace', default_var='default'),
-            'name': task_name.replace('_', '-'),
+            'name': self.task_name.replace('_', '-'),
             'in_cluster': Variable.get('in_kubernetes_cluster', default_var=False),
             'image_pull_policy': Variable.get('image_pull_policy', default_var='IfNotPresent'),
             'get_logs': True,
-            'env_vars': env_vars,
+            'env_vars': self.env_vars,
             'do_xcom_push': True,
             'is_delete_operator_pod': True,
             'startup_timeout_seconds': 300,
             'image_pull_secrets': 'regcred',
-            'resources': resources,
-            'dag': dag
+            'resources': self.resources,
+            'dag': self.dag
         }
         return kubernetes_kwargs
 
-    @staticmethod
-    def __env_vars(pipeline_name, config):
+    def __env_vars(self):
        env_vars = {}
-        if 'env_vars' in config:
-            env_vars = config['env_vars']
+        if 'env_vars' in self.config:
+            env_vars = self.config['env_vars']
         airflow_configuration_variable = Variable.get(
-            f'''{pipeline_name}_dag_configuration''',
+            f'''{self.pipeline_name}_dag_configuration''',
             default_var=None)
         if airflow_configuration_variable:
             airflow_configs = json.loads(airflow_configuration_variable)
-            environment_variables_key = f'''{self.pipeline}_environment_variables'''
+            environment_variables_key = f'''{self.pipeline_name}_environment_variables'''
             if environment_variables_key in airflow_configs:
                 env_vars = airflow_configs[environment_variables_key]
         return env_vars
 
-    @staticmethod
-    def __resources_config(config):
+    def __kubernetes_resources(self):
         resources = {}
-        if 'request_cpu' in config:
-            resources['request_cpu'] = config['request_cpu']
-        if 'request_memory' in config:
-            resources['request_memory'] = config['request_memory']
-        if 'limit_cpu' in config:
-            resources['limit_cpu'] = config['limit_cpu']
-        if 'limit_memory' in config:
-            resources['limit_memory'] = config['limit_memory']
+
+        if 'request_cpu' in self.config:
+            resources['request_cpu'] = self.config['request_cpu']
+        if 'request_memory' in self.config:
+            resources['request_memory'] = self.config['request_memory']
+        if 'limit_cpu' in self.config:
+            resources['limit_cpu'] = self.config['limit_cpu']
+        if 'limit_memory' in self.config:
+            resources['limit_memory'] = self.config['limit_memory']
+
         return resources
diff --git a/rainbow/runners/airflow/model/task.py b/rainbow/runners/airflow/tasks/spark.py
similarity index 74%
copy from rainbow/runners/airflow/model/task.py
copy to rainbow/runners/airflow/tasks/spark.py
index e74085d..ebae64e 100644
--- a/rainbow/runners/airflow/model/task.py
+++ b/rainbow/runners/airflow/tasks/spark.py
@@ -15,24 +15,20 @@
 # KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations
 # under the License.
-"""
-Base task.
-"""
+from rainbow.runners.airflow.model import task
 
 
-class Task:
+
+class SparkTask(task.Task):
     """
-    Task.
+    # TODO: Executes a Spark application.
     """
 
+    def __init__(self, dag, pipeline_name, parent, config, trigger_rule):
+        super().__init__(dag, pipeline_name, parent, config, trigger_rule)
+
     def setup(self):
-        """
-        Setup method for task.
-        """
-        raise NotImplementedError()
+        pass
 
     def apply_task_to_dag(self):
-        """
-        Registers Airflow operator to parent task.
-        """
-        raise NotImplementedError()
+        pass
diff --git a/rainbow/runners/airflow/model/task.py b/rainbow/runners/airflow/tasks/sql.py
similarity index 74%
copy from rainbow/runners/airflow/model/task.py
copy to rainbow/runners/airflow/tasks/sql.py
index e74085d..6dfc0f1 100644
--- a/rainbow/runners/airflow/model/task.py
+++ b/rainbow/runners/airflow/tasks/sql.py
@@ -15,24 +15,20 @@
 # KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations
 # under the License.
-"""
-Base task.
-"""
+from rainbow.runners.airflow.model import task
 
 
-class Task:
+
+class SqlTask(task.Task):
     """
-    Task.
+    # TODO: Executes an SQL application.
     """
 
+    def __init__(self, dag, pipeline_name, parent, config, trigger_rule):
+        super().__init__(dag, pipeline_name, parent, config, trigger_rule)
+
     def setup(self):
-        """
-        Setup method for task.
-        """
-        raise NotImplementedError()
+        pass
 
     def apply_task_to_dag(self):
-        """
-        Registers Airflow operator to parent task.
-        """
-        raise NotImplementedError()
+        pass
diff --git a/rainbow/sql/__init__.py b/rainbow/sql/__init__.py
index 217e5db..495bf9c 100644
--- a/rainbow/sql/__init__.py
+++ b/rainbow/sql/__init__.py
@@ -15,3 +15,4 @@
 # KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations
 # under the License.
+# TODO: SQL (Scala? Python?)
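
For readers skimming the stubs, here is a minimal sketch of how one of them would be wired up, following the constructor contract (dag, pipeline_name, parent, config, trigger_rule) that this commit adds to the Task base class. The DAG arguments, the config contents, the trigger rule and the parent=None placeholder are illustrative assumptions, not taken from rainbow_dags.py:

    from datetime import datetime

    from airflow import DAG

    from rainbow.runners.airflow.tasks.job_start import JobStartTask

    # Hypothetical DAG and pipeline config, for illustration only.
    dag = DAG('example_pipeline', start_date=datetime(2020, 3, 10), schedule_interval=None)
    config = {'pipeline': 'example_pipeline'}

    # All task stubs in this commit share the base constructor signature:
    # (dag, pipeline_name, parent, config, trigger_rule).
    job_start = JobStartTask(dag, 'example_pipeline', None, config, 'all_success')
    job_start.apply_task_to_dag()  # still a no-op stub in this commit

PythonTask is refactored in the same commit to call super().__init__() and read its settings from self.config, so the concrete task and the new stubs now share this single constructor contract.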
