This is an automated email from the ASF dual-hosted git repository. jbonofre pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-liminal.git
commit 253a15a16cef606085632d83ddf48a6640450b52 Author: Oded Rosenberg <[email protected]> AuthorDate: Thu Apr 9 15:06:03 2020 +0300 Add pipeline configuration as default arguments --- rainbow/runners/airflow/dag/rainbow_dags.py | 11 ++++++++--- tests/runners/airflow/dag/test_rainbow_dags.py | 9 +++++++++ tests/runners/airflow/rainbow/rainbow.yml | 7 ++++++- 3 files changed, 23 insertions(+), 4 deletions(-) diff --git a/rainbow/runners/airflow/dag/rainbow_dags.py b/rainbow/runners/airflow/dag/rainbow_dags.py index 6b071fd..d5e3be1 100644 --- a/rainbow/runners/airflow/dag/rainbow_dags.py +++ b/rainbow/runners/airflow/dag/rainbow_dags.py @@ -28,6 +28,8 @@ from rainbow.runners.airflow.model.task import Task from rainbow.runners.airflow.tasks.defaults.job_end import JobEndTask from rainbow.runners.airflow.tasks.defaults.job_start import JobStartTask +__DEPENDS_ON_PAST = 'depends_on_past' + def register_dags(configs_path): """ @@ -47,12 +49,15 @@ def register_dags(configs_path): for pipeline in config['pipelines']: pipeline_name = pipeline['pipeline'] - default_args = { - 'owner': config['owner'], + default_args = {k: v for k, v in pipeline.items()} + + override_args = { 'start_date': datetime.combine(pipeline['start_date'], datetime.min.time()), - 'depends_on_past': False, + __DEPENDS_ON_PAST: default_args[__DEPENDS_ON_PAST] if __DEPENDS_ON_PAST in default_args else False, } + default_args.update(override_args) + dag = DAG( dag_id=pipeline_name, default_args=default_args, diff --git a/tests/runners/airflow/dag/test_rainbow_dags.py b/tests/runners/airflow/dag/test_rainbow_dags.py index c744ce5..5ffdf07 100644 --- a/tests/runners/airflow/dag/test_rainbow_dags.py +++ b/tests/runners/airflow/dag/test_rainbow_dags.py @@ -31,6 +31,15 @@ class Test(TestCase): self.assertIsInstance(task_dict['end'], JobEndOperator) + def test_default_args(self): + dag = self.get_register_dags()[0] + default_args = dag.default_args + + keys = default_args.keys() + self.assertIn('default_arg_loaded', keys) + self.assertIn('default_array_loaded', keys) + self.assertIn('default_object_loaded', keys) + @staticmethod def get_register_dags(): base_path = os.path.join(os.path.dirname(__file__), '../rainbow') diff --git a/tests/runners/airflow/rainbow/rainbow.yml b/tests/runners/airflow/rainbow/rainbow.yml index 05c0a09..0b08a1f 100644 --- a/tests/runners/airflow/rainbow/rainbow.yml +++ b/tests/runners/airflow/rainbow/rainbow.yml @@ -17,12 +17,17 @@ # under the License. --- name: MyPipeline -owner: Bosco Albert Baracus pipelines: - pipeline: my_pipeline + owner: Bosco Albert Baracus start_date: 1970-01-01 timeout_minutes: 45 schedule: 0 * 1 * * + default_arg_loaded: check + default_array_loaded: [2, 3, 4] + default_object_loaded: + key1: val1 + key2: val2 metrics: namespace: TestNamespace backends: [ 'cloudwatch' ]
