This is an automated email from the ASF dual-hosted git repository.

jbonofre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-liminal.git
commit aa63e16674d6bfa904a2b6764dd2257d2f3827de
Author: aviemzur <[email protected]>
AuthorDate: Tue Mar 10 11:57:25 2020 +0200

    rainbow_dags dag creation + python task
---
 rainbow/cli/__init__.py                            |   1 +
 rainbow/core/__init__.py                           |   1 +
 rainbow/docker/__init__.py                         |   1 +
 rainbow/http/__init__.py                           |   1 +
 rainbow/monitoring/__init__.py                     |   1 +
 .../runners/airflow/compiler/rainbow_compiler.py   |   8 +-
 rainbow/runners/airflow/dag/rainbow_dags.py        |  90 ++++++++++
 .../airflow/model}/__init__.py                     |   0
 .../rainbow_compiler.py => model/task.py}          |  22 ++-
 .../airflow/tasks}/__init__.py                     |   0
 rainbow/runners/airflow/tasks/python.py            | 190 +++++++++++++++++++++
 tests/runners/airflow/compiler/rainbow.yml         | 115 -------------
 .../airflow/compiler/test_rainbow_compiler.py      |  33 ----
 .../runners/airflow/dag}/__init__.py               |   0
 tests/runners/airflow/dag/rainbow/rainbow.yml      |  51 ++++++
 tests/runners/airflow/dag/test_rainbow_dags.py     |  11 ++
 .../runners/airflow/tasks}/__init__.py             |   0
 tests/runners/airflow/tasks/test_python.py         |  50 ++++++
 {rainbow/monitoring => tests/util}/__init__.py     |   0
 .../util/dag_test_utils.py                         |  21 ++-
 20 files changed, 429 insertions(+), 167 deletions(-)

diff --git a/rainbow/cli/__init__.py b/rainbow/cli/__init__.py
index 217e5db..c24b2fa 100644
--- a/rainbow/cli/__init__.py
+++ b/rainbow/cli/__init__.py
@@ -15,3 +15,4 @@
 # KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations
 # under the License.
+# TODO: cli
diff --git a/rainbow/core/__init__.py b/rainbow/core/__init__.py
index 217e5db..2162b08 100644
--- a/rainbow/core/__init__.py
+++ b/rainbow/core/__init__.py
@@ -15,3 +15,4 @@
 # KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations
 # under the License.
+# TODO: core
diff --git a/rainbow/docker/__init__.py b/rainbow/docker/__init__.py
index 217e5db..8bb1ec2 100644
--- a/rainbow/docker/__init__.py
+++ b/rainbow/docker/__init__.py
@@ -15,3 +15,4 @@
 # KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations
 # under the License.
+# TODO: docker
diff --git a/rainbow/http/__init__.py b/rainbow/http/__init__.py
index 217e5db..d723ae2 100644
--- a/rainbow/http/__init__.py
+++ b/rainbow/http/__init__.py
@@ -15,3 +15,4 @@
 # KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations
 # under the License.
+# TODO: http
diff --git a/rainbow/monitoring/__init__.py b/rainbow/monitoring/__init__.py
index 217e5db..8df8694 100644
--- a/rainbow/monitoring/__init__.py
+++ b/rainbow/monitoring/__init__.py
@@ -15,3 +15,4 @@
 # KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations
 # under the License.
+# TODO: monitoring
diff --git a/rainbow/runners/airflow/compiler/rainbow_compiler.py b/rainbow/runners/airflow/compiler/rainbow_compiler.py
index 818fdc5..bed1efd 100644
--- a/rainbow/runners/airflow/compiler/rainbow_compiler.py
+++ b/rainbow/runners/airflow/compiler/rainbow_compiler.py
@@ -16,11 +16,5 @@
 # specific language governing permissions and limitations
 # under the License.
 """
-Compiler for rainbows.
+TODO: compiler for rainbows.
""" -import yaml - - -def parse_yaml(path): - with open(path, 'r') as stream: - return yaml.safe_load(stream) diff --git a/rainbow/runners/airflow/dag/rainbow_dags.py b/rainbow/runners/airflow/dag/rainbow_dags.py new file mode 100644 index 0000000..577da07 --- /dev/null +++ b/rainbow/runners/airflow/dag/rainbow_dags.py @@ -0,0 +1,90 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# TODO: Iterate over each pipeline and create a DAG for it. \ +# Within every pipeline iterate over tasks and apply them to DAG. + +import os +import pprint + +import yaml +from airflow import DAG + +from rainbow.runners.airflow.tasks.python import PythonTask +from datetime import datetime + + +def register_dags(path): + files = [] + for r, d, f in os.walk(path): + for file in f: + if file[file.rfind('.') + 1:] in ['yml', 'yaml']: + files.append(os.path.join(r, file)) + + print(files) + + dags = [] + + for config_file in files: + print(f'Registering DAG for file: f{config_file}') + + with open(config_file) as stream: + # TODO: validate config + config = yaml.safe_load(stream) + pp = pprint.PrettyPrinter(indent=4) + # pp.pprint(config) + + for pipeline in config['pipelines']: + parent = None + + default_args = { + 'owner': config['owner'], + 'start_date': datetime.combine(pipeline['start_date'], datetime.min.time()) + } + # TODO: add all relevant airflow args + dag = DAG( + dag_id='test_dag', + default_args=default_args + ) + + for task in pipeline['tasks']: + task_type = task['type'] + task_instance = get_task_class(task_type)( + dag, pipeline['pipeline'], parent if parent else None, task, 'all_success' + ) + parent = task_instance.apply_task_to_dag() + + print(f'{parent}{{{task_type}}}') + + dags.append(dag) + return dags + + +# TODO: task class registry +task_classes = { + 'python': PythonTask +} + + +def get_task_class(task_type): + return task_classes[task_type] + + +if __name__ == '__main__': + # TODO: configurable yaml dir + path = 'tests/runners/airflow/dag/rainbow' + register_dags(path) diff --git a/rainbow/monitoring/__init__.py b/rainbow/runners/airflow/model/__init__.py similarity index 100% copy from rainbow/monitoring/__init__.py copy to rainbow/runners/airflow/model/__init__.py diff --git a/rainbow/runners/airflow/compiler/rainbow_compiler.py b/rainbow/runners/airflow/model/task.py similarity index 72% copy from rainbow/runners/airflow/compiler/rainbow_compiler.py copy to rainbow/runners/airflow/model/task.py index 818fdc5..e74085d 100644 --- a/rainbow/runners/airflow/compiler/rainbow_compiler.py +++ b/rainbow/runners/airflow/model/task.py @@ -16,11 +16,23 @@ # specific language governing permissions and limitations # under the License. """ -Compiler for rainbows. +Base task. 
""" -import yaml -def parse_yaml(path): - with open(path, 'r') as stream: - return yaml.safe_load(stream) +class Task: + """ + Task. + """ + + def setup(self): + """ + Setup method for task. + """ + raise NotImplementedError() + + def apply_task_to_dag(self): + """ + Registers Airflow operator to parent task. + """ + raise NotImplementedError() diff --git a/rainbow/monitoring/__init__.py b/rainbow/runners/airflow/tasks/__init__.py similarity index 100% copy from rainbow/monitoring/__init__.py copy to rainbow/runners/airflow/tasks/__init__.py diff --git a/rainbow/runners/airflow/tasks/python.py b/rainbow/runners/airflow/tasks/python.py new file mode 100644 index 0000000..727e11c --- /dev/null +++ b/rainbow/runners/airflow/tasks/python.py @@ -0,0 +1,190 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +import json + +from airflow.models import Variable +from airflow.operators.dummy_operator import DummyOperator + +from rainbow.runners.airflow.model import task +from rainbow.runners.airflow.operators.kubernetes_pod_operator import \ + ConfigurableKubernetesPodOperator, \ + ConfigureParallelExecutionOperator + + +class PythonTask(task.Task): + """ + Python task. + """ + + def __init__(self, dag, pipeline_name, parent, config, trigger_rule): + self.dag = dag + self.parent = parent + self.config = config + self.trigger_rule = trigger_rule + self.input_type = config['input_type'] + self.input_path = config['input_path'] + self.task_name = config['task'] + self.image = self.config['image'] + self.resources = self.__resources_config(config) + self.env_vars = self.__env_vars(pipeline_name, config) + self.kubernetes_kwargs = self.__kubernetes_kwargs( + dag, self.env_vars, self.resources, self.task_name + ) + self.cmds, self.arguments = self.__kubernetes_cmds_and_arguments(config) + self.config_task_id = self.task_name + '_input' + self.executors = self.__executors(config) + + def setup(self): + # TODO: build docker image if needed. 
+        pass
+
+    def apply_task_to_dag(self):
+
+        def create_pod_operator(task_id, task_split, image):
+            return ConfigurableKubernetesPodOperator(
+                task_id=task_id,
+                config_task_id=self.config_task_id,
+                task_split=task_split,
+                image=image,
+                cmds=self.cmds,
+                arguments=self.arguments,
+                **self.kubernetes_kwargs
+            )
+
+        config_task = None
+
+        if self.input_type in ['static', 'task']:
+            self.env_vars.update({'DATA_PIPELINE_INPUT': self.input_path})
+
+            config_task = ConfigureParallelExecutionOperator(
+                task_id=self.config_task_id,
+                image=self.image,
+                config_type=self.input_type,
+                config_path=self.input_path,
+                executors=self.executors,
+                **self.kubernetes_kwargs
+            )
+
+        if self.executors == 1:
+            pod_task = create_pod_operator(
+                task_id=self.task_name,
+                task_split=0,
+                image=self.image
+            )
+
+            first_task = pod_task
+
+            if config_task:
+                first_task = config_task
+                first_task.set_downstream(pod_task)
+
+            if self.parent:
+                self.parent.set_downstream(first_task)
+
+            return pod_task
+        else:
+            if not config_task:
+                config_task = DummyOperator(
+                    task_id=self.config_task_id,
+                    trigger_rule=self.trigger_rule,
+                    dag=self.dag
+                )
+
+            end_task = DummyOperator(
+                task_id=self.task_name,
+                dag=self.dag
+            )
+
+            if self.parent:
+                self.parent.set_downstream(config_task)
+
+            for i in range(self.executors):
+                split_task = create_pod_operator(
+                    task_id=f'''{self.task_name}_{i}''',
+                    task_split=i,
+                    image=self.image
+                )
+
+                config_task.set_downstream(split_task)
+
+                split_task.set_downstream(end_task)
+
+            return end_task
+
+    @staticmethod
+    def __executors(config):
+        executors = 1
+        if 'executors' in config:
+            executors = config['executors']
+        return executors
+
+    @staticmethod
+    def __kubernetes_cmds_and_arguments(config):
+        cmds = ['/bin/bash', '-c']
+        arguments = [
+            f'''sh container-setup.sh && \
+            {config['cmd']} && \
+            sh container-teardown.sh {config['output_path']}'''
+        ]
+        return cmds, arguments
+
+    @staticmethod
+    def __kubernetes_kwargs(dag, env_vars, resources, task_name):
+        kubernetes_kwargs = {
+            'namespace': Variable.get('kubernetes_namespace', default_var='default'),
+            'name': task_name.replace('_', '-'),
+            'in_cluster': Variable.get('in_kubernetes_cluster', default_var=False),
+            'image_pull_policy': Variable.get('image_pull_policy', default_var='IfNotPresent'),
+            'get_logs': True,
+            'env_vars': env_vars,
+            'do_xcom_push': True,
+            'is_delete_operator_pod': True,
+            'startup_timeout_seconds': 300,
+            'image_pull_secrets': 'regcred',
+            'resources': resources,
+            'dag': dag
+        }
+        return kubernetes_kwargs
+
+    @staticmethod
+    def __env_vars(pipeline_name, config):
+        env_vars = {}
+        if 'env_vars' in config:
+            env_vars = config['env_vars']
+        airflow_configuration_variable = Variable.get(
+            f'''{pipeline_name}_dag_configuration''',
+            default_var=None)
+        if airflow_configuration_variable:
+            airflow_configs = json.loads(airflow_configuration_variable)
+            environment_variables_key = f'''{pipeline_name}_environment_variables'''
+            if environment_variables_key in airflow_configs:
+                env_vars = airflow_configs[environment_variables_key]
+        return env_vars
+
+    @staticmethod
+    def __resources_config(config):
+        resources = {}
+        if 'request_cpu' in config:
+            resources['request_cpu'] = config['request_cpu']
+        if 'request_memory' in config:
+            resources['request_memory'] = config['request_memory']
+        if 'limit_cpu' in config:
+            resources['limit_cpu'] = config['limit_cpu']
+        if 'limit_memory' in config:
+            resources['limit_memory'] = config['limit_memory']
+        return resources
diff --git a/tests/runners/airflow/compiler/rainbow.yml b/tests/runners/airflow/compiler/rainbow.yml
deleted file mode 100644
index 45333a8..0000000
--- a/tests/runners/airflow/compiler/rainbow.yml
+++ /dev/null
@@ -1,115 +0,0 @@
-
----
-name: MyPipeline
-owner: Bosco Albert Baracus
-pipeline:
-  timeout-minutes: 45
-  schedule: 0 * 1 * *
-  metrics-namespace: TestNamespace
-  tasks:
-    - name: mytask1
-      type: sql
-      description: mytask1 is cool
-      query: "select * from mytable"
-      overrides:
-        - prod:
-          partition-columns: dt
-          output-table: test.test_impression_prod
-          output-path: s3://mybucket/myproject-test/impression
-          emr-cluster-name: spark-playground-prod
-        - stg:
-          query: "select * from mytable"
-          partition-columns: dt
-          output-table: test.test_impression_stg
-          output-path: s3://mybucket/haya-test/impression
-          emr-cluster-name: spark-playground-staging
-  tasks:
-    - name: my_static_config_task
-      type: python
-      description: my 1st ds task
-      artifact-id: mytask1artifactid
-      source: mytask1folder
-      env-vars:
-        env1: "a"
-        env2: "b"
-      config-type: static
-      config-path: "{\"configs\": [ { \"campaign_id\": 10 }, { \"campaign_id\": 20 } ]}"
-      cmd: python -u my_app.py
-    - task:
-      name: my_no_config_task
-      type: python
-      description: my 2nd ds task
-      artifact-id: mytask1artifactid
-      env-vars:
-        env1: "a"
-        env2: "b"
-      request-cpu: 100m
-      request-memory: 65M
-      cmd: python -u my_app.py foo bar
-    - task:
-      name: my_create_custom_config_task
-      type: python
-      description: my 2nd ds task
-      artifact-id: myconftask
-      source: myconftask
-      output-config-path: /my_conf.json
-      env-vars:
-        env1: "a"
-        env2: "b"
-      cmd: python -u my_app.py foo bar
-    - task:
-      name: my_custom_config_task
-      type: python
-      description: my 2nd ds task
-      artifact-id: mytask1artifactid
-      config-type: task
-      config-path: my_create_custom_config_task
-      env-vars:
-        env1: "a"
-        env2: "b"
-      cmd: python -u my_app.py foo bar
-    - task:
-      name: my_parallelized_static_config_task
-      type: python
-      description: my 3rd ds task
-      artifact-id: mytask1artifactid
-      executors: 5
-      env-vars:
-        env1: "x"
-        env2: "y"
-        myconf: $CONFIG_FILE
-      config-type: static
-      config-path: "{\"configs\": [ { \"campaign_id\": 10 }, { \"campaign_id\": 20 }, { \"campaign_id\": 30 }, { \"campaign_id\": 40 }, { \"campaign_id\": 50 }, { \"campaign_id\": 60 }, { \"campaign_id\": 70 }, { \"campaign_id\": 80 } ]}"
-      cmd: python -u my_app.py $CONFIG_FILE
-    - task:
-      name: my_parallelized_custom_config_task
-      type: python
-      description: my 4th ds task
-      artifact-id: mytask1artifactid
-      executors: 5
-      config-type: task
-      config-path: my_create_custom_config_task
-      cmd: python -u my_app.py
-    - task:
-      name: my_parallelized_no_config_task
-      type: python
-      description: my 4th ds task
-      artifact-id: mytask1artifactid
-      executors: 5
-      cmd: python -u my_app.py
-services:
-  - service:
-      name: myserver1
-      type: python-server
-      description: my python server
-      artifact-id: myserver1artifactid
-      source: myserver1logicfolder
-      endpoints:
-        - endpoint:
-          path: /myendpoint1
-          module: mymodule1
-          function: myfun1
-        - endpoint:
-          path: /myendpoint2
-          module: mymodule2
-          function: myfun2
diff --git a/tests/runners/airflow/compiler/test_rainbow_compiler.py b/tests/runners/airflow/compiler/test_rainbow_compiler.py
deleted file mode 100644
index 6e73d8f..0000000
--- a/tests/runners/airflow/compiler/test_rainbow_compiler.py
+++ /dev/null
@@ -1,33 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import unittest
-
-from rainbow.runners.airflow.compiler import rainbow_compiler
-
-
-class TestRainbowCompiler(unittest.TestCase):
-
-    def test_parse(self):
-        expected = {'name': 'MyPipeline', 'owner': 'Bosco Albert Baracus', 'pipeline': {'timeout-minutes': 45, 'schedule': '0 * 1 * *', 'metrics-namespace': 'TestNamespace', 'tasks': [{'name': 'mytask1', 'type': 'sql', 'description': 'mytask1 is cool', 'query': 'select * from mytable', 'overrides': [{'prod': None, 'partition-columns': 'dt', 'output-table': 'test.test_impression_prod', 'output-path': 's3://mybucket/myproject-test/impression', 'emr-cluster-name': 'spark-playground-prod'}, [...]
-        actual = rainbow_compiler.parse_yaml('tests/runners/airflow/compiler/rainbow.yml')
-        self.assertEqual(expected, actual)
-
-
-if __name__ == '__main__':
-    unittest.main()
diff --git a/rainbow/monitoring/__init__.py b/tests/runners/airflow/dag/__init__.py
similarity index 100%
copy from rainbow/monitoring/__init__.py
copy to tests/runners/airflow/dag/__init__.py
diff --git a/tests/runners/airflow/dag/rainbow/rainbow.yml b/tests/runners/airflow/dag/rainbow/rainbow.yml
new file mode 100644
index 0000000..07afd08
--- /dev/null
+++ b/tests/runners/airflow/dag/rainbow/rainbow.yml
@@ -0,0 +1,51 @@
+
+---
+name: MyPipeline
+owner: Bosco Albert Baracus
+pipelines:
+  - pipeline: my_pipeline
+    start_date: 1970-01-01
+    timeout-minutes: 45
+    schedule: 0 * 1 * *
+    metrics-namespace: TestNamespace
+    tasks:
+      - task: my_static_config_task
+        type: python
+        description: my 1st ds task
+        image: mytask1artifactid
+        source: mytask1folder
+        env_vars:
+          env1: "a"
+          env2: "b"
+        input_type: static
+        input_path: "{\"configs\": [ { \"campaign_id\": 10 }, { \"campaign_id\": 20 } ]}"
+        output_path: 'baz'
+        cmd: 'foo bar'
+      - task: my_static_config_task2
+        type: python
+        description: my 1st ds task
+        image: mytask1artifactid
+        source: mytask1folder
+        env_vars:
+          env1: "a"
+          env2: "b"
+        input_type: static
+        input_path: "{\"configs\": [ { \"campaign_id\": 10 }, { \"campaign_id\": 20 } ]}"
+        output_path: 'baz'
+        cmd: 'foo bar'
+services:
+  - service:
+      name: myserver1
+      type: python-server
+      description: my python server
+      artifact-id: myserver1artifactid
+      source: myserver1logicfolder
+      endpoints:
+        - endpoint:
+          path: /myendpoint1
+          module: mymodule1
+          function: myfun1
+        - endpoint:
+          path: /myendpoint2
+          module: mymodule2
+          function: myfun2
diff --git a/tests/runners/airflow/dag/test_rainbow_dags.py b/tests/runners/airflow/dag/test_rainbow_dags.py
new file mode 100644
index 0000000..41bea09
--- /dev/null
+++ b/tests/runners/airflow/dag/test_rainbow_dags.py
@@ -0,0 +1,11 @@
+from unittest import TestCase
+
+from rainbow.runners.airflow.dag import rainbow_dags
+
+
+class Test(TestCase):
+    def test_register_dags(self):
+        dags = rainbow_dags.register_dags("tests/runners/airflow/dag/rainbow")
+        self.assertEqual(len(dags), 1)
+        # TODO: elaborate test
+        pass
diff --git a/rainbow/monitoring/__init__.py b/tests/runners/airflow/tasks/__init__.py
similarity index 100%
copy from rainbow/monitoring/__init__.py
copy to tests/runners/airflow/tasks/__init__.py
diff --git a/tests/runners/airflow/tasks/test_python.py b/tests/runners/airflow/tasks/test_python.py
new file mode 100644
index 0000000..4f5808b
--- /dev/null
+++ b/tests/runners/airflow/tasks/test_python.py
@@ -0,0 +1,50 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from unittest import TestCase
+
+from rainbow.runners.airflow.operators.kubernetes_pod_operator import \
+    ConfigurableKubernetesPodOperator
+from rainbow.runners.airflow.tasks import python
+from tests.util import dag_test_utils
+
+
+class TestPythonTask(TestCase):
+    def test_apply_task_to_dag(self):
+        # TODO: elaborate tests
+        dag = dag_test_utils.create_dag()
+
+        task_id = 'my_task'
+
+        config = {
+            'task': task_id,
+            'cmd': 'foo bar',
+            'image': 'my_image',
+            'input_type': 'my_input_type',
+            'input_path': 'my_input',
+            'output_path': '/my_output.json'
+        }
+
+        task0 = python.PythonTask(dag, 'my_pipeline', None, config, 'all_success')
+        task0.apply_task_to_dag()
+
+        self.assertEqual(len(dag.tasks), 1)
+        dag_task0 = dag.tasks[0]
+
+        self.assertIsInstance(dag_task0, ConfigurableKubernetesPodOperator)
+        self.assertEqual(dag_task0.task_id, task_id)
diff --git a/rainbow/monitoring/__init__.py b/tests/util/__init__.py
similarity index 100%
copy from rainbow/monitoring/__init__.py
copy to tests/util/__init__.py
diff --git a/rainbow/runners/airflow/compiler/rainbow_compiler.py b/tests/util/dag_test_utils.py
similarity index 76%
copy from rainbow/runners/airflow/compiler/rainbow_compiler.py
copy to tests/util/dag_test_utils.py
index 818fdc5..b1fbcab 100644
--- a/rainbow/runners/airflow/compiler/rainbow_compiler.py
+++ b/tests/util/dag_test_utils.py
@@ -15,12 +15,19 @@
 # KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations
 # under the License.
-"""
-Compiler for rainbows.
-"""
-import yaml
+
+from datetime import datetime
+
+from airflow import DAG
 
 
-def parse_yaml(path):
-    with open(path, 'r') as stream:
-        return yaml.safe_load(stream)
+def create_dag():
+    """
+    Test util to create a basic DAG for testing.
+    """
+
+    return DAG(
+        dag_id='test_dag',
+        default_args={'start_date': datetime(1970, 1, 1)}
+    )
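
Editor's note: `register_dags()` returns the DAG objects it builds, but the `__main__` block above only targets the test fixtures (the TODO mentions making the yaml dir configurable). A minimal sketch of a loader module dropped into the Airflow DAGs folder might look like the following; the `RAINBOW_YML_DIR` environment variable and its default are assumptions for illustration, not part of this commit.

```python
# loader.py -- hypothetical module placed in the Airflow DAGs folder.
# Not part of this commit; assumes register_dags() keeps returning the
# list of DAG objects it creates.
import os

from rainbow.runners.airflow.dag import rainbow_dags

# Airflow's DagBag only collects DAG objects bound to module-level names,
# so expose each returned DAG in globals() under its dag_id.
for dag in rainbow_dags.register_dags(
        os.environ.get('RAINBOW_YML_DIR', 'tests/runners/airflow/dag/rainbow')):
    globals()[dag.dag_id] = dag
```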
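The new `Task` base class in `rainbow/runners/airflow/model/task.py` defines the contract a task type must satisfy: `setup()` for pre-flight work and `apply_task_to_dag()` to attach an operator and return it as the next parent, which is how `register_dags()` chains tasks. A minimal sketch of a hypothetical additional task type (the `DummyTask` name and behavior are invented for illustration):

```python
# Hypothetical no-op task type illustrating the Task contract; not part of
# this commit. apply_task_to_dag() must attach an operator to the DAG,
# chain it to the parent, and return the operator as the next parent.
from airflow.operators.dummy_operator import DummyOperator

from rainbow.runners.airflow.model import task


class DummyTask(task.Task):
    # Constructor signature matches how rainbow_dags instantiates tasks:
    # get_task_class(task_type)(dag, pipeline_name, parent, config, trigger_rule)
    def __init__(self, dag, pipeline_name, parent, config, trigger_rule):
        self.dag = dag
        self.parent = parent
        self.config = config
        self.trigger_rule = trigger_rule

    def setup(self):
        # Nothing to prepare for a no-op task.
        pass

    def apply_task_to_dag(self):
        op = DummyOperator(
            task_id=self.config['task'],
            trigger_rule=self.trigger_rule,
            dag=self.dag
        )
        if self.parent:
            self.parent.set_downstream(op)
        return op
```

Wiring it up would then be one more entry in the `task_classes` registry in `rainbow_dags.py` (e.g. `'dummy': DummyTask`).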
