This is an automated email from the ASF dual-hosted git repository. jbonofre pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-liminal.git
commit 75965668ccf2e75b2f261ce0ca2d5d26ce117852 Author: zionrubin <[email protected]> AuthorDate: Sun Mar 22 10:47:52 2020 +0200 Add job_start and job_end tasks --- rainbow/runners/airflow/dag/rainbow_dags.py | 21 ++++-- .../{ => airflow/tasks/defaults}/__init__.py | 0 .../tasks/{job_end.py => defaults/default_task.py} | 14 +++- .../airflow/tasks/{ => defaults}/job_end.py | 21 ++++-- .../tasks/{job_end.py => defaults/job_end.py~HEAD} | 21 ++++-- .../airflow/tasks/{ => defaults}/job_start.py | 20 ++++-- .../{job_start.py => defaults/job_start.py~HEAD} | 20 ++++-- tests/runners/airflow/dag/test_rainbow_dags.py | 31 +++++++-- tests/runners/airflow/rainbow/rainbow.yml | 8 ++- .../runners/airflow/tasks/defaults}/__init__.py | 0 .../runners/airflow/tasks/defaults/test_job_end.py | 77 ++++++++++++++++++++++ .../airflow/tasks/defaults/test_job_start.py | 77 ++++++++++++++++++++++ 12 files changed, 276 insertions(+), 34 deletions(-) diff --git a/rainbow/runners/airflow/dag/rainbow_dags.py b/rainbow/runners/airflow/dag/rainbow_dags.py index 15b7d9a..71d18d2 100644 --- a/rainbow/runners/airflow/dag/rainbow_dags.py +++ b/rainbow/runners/airflow/dag/rainbow_dags.py @@ -22,8 +22,11 @@ import yaml from airflow import DAG from airflow.models import Variable -from rainbow.core.util import files_util, class_util +from rainbow.core.util import class_util +from rainbow.core.util import files_util from rainbow.runners.airflow.model.task import Task +from rainbow.runners.airflow.tasks.defaults.job_end import JobEndTask +from rainbow.runners.airflow.tasks.defaults.job_start import JobStartTask def register_dags(configs_path): @@ -42,8 +45,6 @@ def register_dags(configs_path): config = yaml.safe_load(stream) for pipeline in config['pipelines']: - parent = None - pipeline_name = pipeline['pipeline'] default_args = { @@ -58,6 +59,9 @@ def register_dags(configs_path): catchup=False ) + job_start_task = JobStartTask(dag, pipeline_name, None, pipeline, 'all_success') + parent = job_start_task.apply_task_to_dag() + trigger_rule = 'all_success' if 'always_run' in config and config['always_run']: trigger_rule = 'all_done' @@ -70,12 +74,15 @@ def register_dags(configs_path): parent = task_instance.apply_task_to_dag() - print(f'{pipeline_name}: {dag.tasks}') + job_end_task = JobEndTask(dag, pipeline_name, parent, pipeline, 'all_done') + job_end_task.apply_task_to_dag() + + print(f'{pipeline_name}: {dag.tasks}') - globals()[pipeline_name] = dag + globals()[pipeline_name] = dag - dags.append(dag) - return dags + dags.append(dag) + return dags print(f'Loading task implementations..') diff --git a/rainbow/runners/__init__.py b/rainbow/runners/airflow/tasks/defaults/__init__.py similarity index 100% copy from rainbow/runners/__init__.py copy to rainbow/runners/airflow/tasks/defaults/__init__.py diff --git a/rainbow/runners/airflow/tasks/job_end.py b/rainbow/runners/airflow/tasks/defaults/default_task.py similarity index 74% copy from rainbow/runners/airflow/tasks/job_end.py copy to rainbow/runners/airflow/tasks/defaults/default_task.py index 42b5e7f..0e901fc 100644 --- a/rainbow/runners/airflow/tasks/job_end.py +++ b/rainbow/runners/airflow/tasks/defaults/default_task.py @@ -15,17 +15,25 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +""" +Default base task. +""" +from abc import abstractmethod -from rainbow.runners.airflow.model import task +from rainbow.runners.airflow.model.task import Task -class JobEndTask(task.Task): +class DefaultTask(Task): """ - Job end task. Reports job end metrics. + Default Base task. """ def __init__(self, dag, pipeline_name, parent, config, trigger_rule): super().__init__(dag, pipeline_name, parent, config, trigger_rule) + metrics = self.config.get('metrics', {}) + self.metrics_namespace = metrics.get('namespace', '') + self.metrics_backends = metrics.get('backends', []) + @abstractmethod def apply_task_to_dag(self): pass diff --git a/rainbow/runners/airflow/tasks/job_end.py b/rainbow/runners/airflow/tasks/defaults/job_end.py similarity index 61% copy from rainbow/runners/airflow/tasks/job_end.py copy to rainbow/runners/airflow/tasks/defaults/job_end.py index 42b5e7f..e177ccc 100644 --- a/rainbow/runners/airflow/tasks/job_end.py +++ b/rainbow/runners/airflow/tasks/defaults/job_end.py @@ -16,16 +16,29 @@ # specific language governing permissions and limitations # under the License. -from rainbow.runners.airflow.model import task +from rainbow.runners.airflow.operators.job_status_operator import JobEndOperator +from rainbow.runners.airflow.tasks.defaults.default_task import DefaultTask -class JobEndTask(task.Task): +class JobEndTask(DefaultTask): """ - Job end task. Reports job end metrics. + Job end task. Reports job end metrics. """ def __init__(self, dag, pipeline_name, parent, config, trigger_rule): super().__init__(dag, pipeline_name, parent, config, trigger_rule) def apply_task_to_dag(self): - pass + job_end_task = JobEndOperator( + task_id='end', + namespace=self.metrics_namespace, + application_name=self.pipeline_name, + backends=self.metrics_backends, + dag=self.dag, + trigger_rule=self.trigger_rule + ) + + if self.parent: + self.parent.set_downstream(job_end_task) + + return job_end_task diff --git a/rainbow/runners/airflow/tasks/job_end.py b/rainbow/runners/airflow/tasks/defaults/job_end.py~HEAD similarity index 61% rename from rainbow/runners/airflow/tasks/job_end.py rename to rainbow/runners/airflow/tasks/defaults/job_end.py~HEAD index 42b5e7f..e177ccc 100644 --- a/rainbow/runners/airflow/tasks/job_end.py +++ b/rainbow/runners/airflow/tasks/defaults/job_end.py~HEAD @@ -16,16 +16,29 @@ # specific language governing permissions and limitations # under the License. -from rainbow.runners.airflow.model import task +from rainbow.runners.airflow.operators.job_status_operator import JobEndOperator +from rainbow.runners.airflow.tasks.defaults.default_task import DefaultTask -class JobEndTask(task.Task): +class JobEndTask(DefaultTask): """ - Job end task. Reports job end metrics. + Job end task. Reports job end metrics. """ def __init__(self, dag, pipeline_name, parent, config, trigger_rule): super().__init__(dag, pipeline_name, parent, config, trigger_rule) def apply_task_to_dag(self): - pass + job_end_task = JobEndOperator( + task_id='end', + namespace=self.metrics_namespace, + application_name=self.pipeline_name, + backends=self.metrics_backends, + dag=self.dag, + trigger_rule=self.trigger_rule + ) + + if self.parent: + self.parent.set_downstream(job_end_task) + + return job_end_task diff --git a/rainbow/runners/airflow/tasks/job_start.py b/rainbow/runners/airflow/tasks/defaults/job_start.py similarity index 63% copy from rainbow/runners/airflow/tasks/job_start.py copy to rainbow/runners/airflow/tasks/defaults/job_start.py index 64a2f4a..e196919 100644 --- a/rainbow/runners/airflow/tasks/job_start.py +++ b/rainbow/runners/airflow/tasks/defaults/job_start.py @@ -15,11 +15,11 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +from rainbow.runners.airflow.operators.job_status_operator import JobStartOperator +from rainbow.runners.airflow.tasks.defaults.default_task import DefaultTask -from rainbow.runners.airflow.model import task - -class JobStartTask(task.Task): +class JobStartTask(DefaultTask): """ Job start task. Reports job start metrics. """ @@ -28,4 +28,16 @@ class JobStartTask(task.Task): super().__init__(dag, pipeline_name, parent, config, trigger_rule) def apply_task_to_dag(self): - pass + job_start_task = JobStartOperator( + task_id='start', + namespace=self.metrics_namespace, + application_name=self.pipeline_name, + backends=self.metrics_backends, + dag=self.dag, + trigger_rule=self.trigger_rule + ) + + if self.parent: + self.parent.set_downstream(job_start_task) + + return job_start_task diff --git a/rainbow/runners/airflow/tasks/job_start.py b/rainbow/runners/airflow/tasks/defaults/job_start.py~HEAD similarity index 63% rename from rainbow/runners/airflow/tasks/job_start.py rename to rainbow/runners/airflow/tasks/defaults/job_start.py~HEAD index 64a2f4a..e196919 100644 --- a/rainbow/runners/airflow/tasks/job_start.py +++ b/rainbow/runners/airflow/tasks/defaults/job_start.py~HEAD @@ -15,11 +15,11 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +from rainbow.runners.airflow.operators.job_status_operator import JobStartOperator +from rainbow.runners.airflow.tasks.defaults.default_task import DefaultTask -from rainbow.runners.airflow.model import task - -class JobStartTask(task.Task): +class JobStartTask(DefaultTask): """ Job start task. Reports job start metrics. """ @@ -28,4 +28,16 @@ class JobStartTask(task.Task): super().__init__(dag, pipeline_name, parent, config, trigger_rule) def apply_task_to_dag(self): - pass + job_start_task = JobStartOperator( + task_id='start', + namespace=self.metrics_namespace, + application_name=self.pipeline_name, + backends=self.metrics_backends, + dag=self.dag, + trigger_rule=self.trigger_rule + ) + + if self.parent: + self.parent.set_downstream(job_start_task) + + return job_start_task diff --git a/tests/runners/airflow/dag/test_rainbow_dags.py b/tests/runners/airflow/dag/test_rainbow_dags.py index c8f2e38..d8c1afc 100644 --- a/tests/runners/airflow/dag/test_rainbow_dags.py +++ b/tests/runners/airflow/dag/test_rainbow_dags.py @@ -1,17 +1,38 @@ import os +import unittest from unittest import TestCase from rainbow.runners.airflow.dag import rainbow_dags -import unittest +from rainbow.runners.airflow.operators.job_status_operator import JobEndOperator, JobStartOperator class Test(TestCase): def test_register_dags(self): - base_path = os.path.join(os.path.dirname(__file__), '../rainbow') - dags = rainbow_dags.register_dags(base_path) + dags = self.get_register_dags() + self.assertEqual(len(dags), 1) - # TODO: elaborate test - pass + + test_pipeline = dags[0] + self.assertEqual(test_pipeline.dag_id, 'my_pipeline') + + def test_default_start_task(self): + dags = self.get_register_dags() + + task_dict = dags[0].task_dict + + self.assertIsInstance(task_dict['start'], JobStartOperator) + + def test_default_end_task(self): + dags = self.get_register_dags() + + task_dict = dags[0].task_dict + + self.assertIsInstance(task_dict['end'], JobEndOperator) + + @staticmethod + def get_register_dags(): + base_path = os.path.join(os.path.dirname(__file__), '../rainbow') + return rainbow_dags.register_dags(base_path) if __name__ == '__main__': diff --git a/tests/runners/airflow/rainbow/rainbow.yml b/tests/runners/airflow/rainbow/rainbow.yml index e9f9045..27507fd 100644 --- a/tests/runners/airflow/rainbow/rainbow.yml +++ b/tests/runners/airflow/rainbow/rainbow.yml @@ -23,7 +23,9 @@ pipelines: start_date: 1970-01-01 timeout-minutes: 45 schedule: 0 * 1 * * - metrics-namespace: TestNamespace + metrics: + namespace: TestNamespace + backends: [ 'cloudwatch' ] tasks: - task: my_static_input_task type: python @@ -36,7 +38,7 @@ pipelines: input_type: static input_path: '[ { "foo": "bar" }, { "foo": "baz" } ]' output_path: /output.json - cmd: python -u helloworld.py + cmd: python -u hello_world.py # - task: my_parallelized_static_input_task # type: python # description: parallelized static input task @@ -59,7 +61,7 @@ pipelines: env2: "b" input_type: task input_path: my_static_input_task - cmd: python -u helloworld.py + cmd: python -u hello_world.py services: - service: name: my_python_server diff --git a/rainbow/runners/__init__.py b/tests/runners/airflow/tasks/defaults/__init__.py similarity index 100% rename from rainbow/runners/__init__.py rename to tests/runners/airflow/tasks/defaults/__init__.py diff --git a/tests/runners/airflow/tasks/defaults/test_job_end.py b/tests/runners/airflow/tasks/defaults/test_job_end.py new file mode 100644 index 0000000..9a2c398 --- /dev/null +++ b/tests/runners/airflow/tasks/defaults/test_job_end.py @@ -0,0 +1,77 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import unittest +from unittest import TestCase + +from rainbow.runners.airflow.tasks.defaults import job_end +from tests.util import dag_test_utils + + +class TestJobEndTask(TestCase): + + def test_apply_task_to_dag(self): + conf = { + 'pipeline': 'my_pipeline', + 'metrics': {'namespace': 'EndJobNameSpace', 'backends': ['cloudwatch']}, + } + + dag = dag_test_utils.create_dag() + + task0 = job_end.JobEndTask(dag, 'my_end_pipeline', None, conf, 'all_done') + task0.apply_task_to_dag() + + self.assertEqual(len(dag.tasks), 1) + dag_task0 = dag.tasks[0] + + self.assertEqual(dag_task0.namespace, 'EndJobNameSpace') + self.assertEqual(dag_task0.backends, ['cloudwatch']) + + self.assertEqual(dag_task0.task_id, 'end') + + def test_apply_task_to_dag_missing_metrics(self): + conf = {'pipeline': 'my_pipeline'} + dag = dag_test_utils.create_dag() + + task0 = job_end.JobEndTask(dag, 'my_end_pipeline', None, conf, 'all_done') + task0.apply_task_to_dag() + + self.assertEqual(len(dag.tasks), 1) + dag_task0 = dag.tasks[0] + + self.assertEqual(dag_task0.namespace, '') + self.assertEqual(dag_task0.backends, []) + self.assertEqual(dag_task0.trigger_rule, 'all_done') + + def test_apply_task_to_dag_with_partial_configuration(self): + conf = {'pipeline': 'my_pipeline', 'metrics': {'namespace': 'EndJobNameSpace'}} + dag = dag_test_utils.create_dag() + + task0 = job_end.JobEndTask(dag, 'my_end_pipeline', None, conf, 'all_done') + task0.apply_task_to_dag() + + self.assertEqual(len(dag.tasks), 1) + dag_task0 = dag.tasks[0] + + self.assertEqual(dag_task0.namespace, 'EndJobNameSpace') + self.assertEqual(dag_task0.backends, []) + self.assertEqual(dag_task0.trigger_rule, 'all_done') + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/runners/airflow/tasks/defaults/test_job_start.py b/tests/runners/airflow/tasks/defaults/test_job_start.py new file mode 100644 index 0000000..d07cf4b --- /dev/null +++ b/tests/runners/airflow/tasks/defaults/test_job_start.py @@ -0,0 +1,77 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import unittest +from unittest import TestCase + +from rainbow.runners.airflow.tasks.defaults import job_end, job_start +from tests.util import dag_test_utils + + +class TestJobStartTask(TestCase): + + def test_apply_task_to_dag(self): + conf = { + 'pipeline': 'my_pipeline', + 'metrics': {'namespace': 'StartJobNameSpace', 'backends': ['cloudwatch']}, + } + + dag = dag_test_utils.create_dag() + + task0 = job_start.JobStartTask(dag, 'my_start_pipeline', None, conf, 'all_success') + task0.apply_task_to_dag() + + self.assertEqual(len(dag.tasks), 1) + dag_task0 = dag.tasks[0] + + self.assertEqual(dag_task0.namespace, 'StartJobNameSpace') + self.assertEqual(dag_task0.backends, ['cloudwatch']) + + self.assertEqual(dag_task0.task_id, 'start') + + def test_apply_task_to_dag_missing_metrics(self): + conf = {'pipeline': 'my_pipeline'} + + dag = dag_test_utils.create_dag() + + task0 = job_start.JobStartTask(dag, 'my_end_pipeline', None, conf, 'all_success') + task0.apply_task_to_dag() + + self.assertEqual(len(dag.tasks), 1) + dag_task0 = dag.tasks[0] + + self.assertEqual(dag_task0.namespace, '') + self.assertEqual(dag_task0.backends, []) + self.assertEqual(dag_task0.trigger_rule, 'all_success') + + def test_apply_task_to_dag_with_partial_configuration(self): + conf = {'pipeline': 'my_pipeline', 'metrics': {'namespace': 'StartJobNameSpace'}} + dag = dag_test_utils.create_dag() + + task0 = job_start.JobStartTask(dag, 'my_start_pipeline', None, conf, 'all_success') + task0.apply_task_to_dag() + + self.assertEqual(len(dag.tasks), 1) + dag_task0 = dag.tasks[0] + + self.assertEqual(dag_task0.namespace, 'StartJobNameSpace') + self.assertEqual(dag_task0.backends, []) + + +if __name__ == '__main__': + unittest.main()
