Repository: incubator-airflow Updated Branches: refs/heads/master 4f459b64e -> d165377d2
[AIRFLOW-1234] Cover utils.operator_helpers with UTs Closes #2317 from skudriashev/airflow-1234 Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/d165377d Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/d165377d Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/d165377d Branch: refs/heads/master Commit: d165377d2fa84763b36b6becbea2c89f0e18c4e1 Parents: 4f459b6 Author: Stanislav Kudriashev <[email protected]> Authored: Mon May 22 11:46:44 2017 +0200 Committer: Bolke de Bruin <[email protected]> Committed: Mon May 22 11:46:44 2017 +0200 ---------------------------------------------------------------------- airflow/utils/operator_helpers.py | 9 +++-- tests/utils/test_operator_helpers.py | 60 +++++++++++++++++++++++++++++++ 2 files changed, 67 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d165377d/airflow/utils/operator_helpers.py ---------------------------------------------------------------------- diff --git a/airflow/utils/operator_helpers.py b/airflow/utils/operator_helpers.py index 7381fb3..fb9f74a 100644 --- a/airflow/utils/operator_helpers.py +++ b/airflow/utils/operator_helpers.py @@ -22,18 +22,23 @@ def context_to_airflow_vars(context): :param context: The context for the task_instance of interest :type context: dict """ - params = dict() + params = {} dag = context.get('dag') if dag and dag.dag_id: params['airflow.ctx.dag.dag_id'] = dag.dag_id + dag_run = context.get('dag_run') if dag_run and dag_run.execution_date: params['airflow.ctx.dag_run.execution_date'] = dag_run.execution_date.isoformat() + task = context.get('task') if task and task.task_id: params['airflow.ctx.task.task_id'] = task.task_id + task_instance = context.get('task_instance') if task_instance and task_instance.execution_date: - params['airflow.ctx.task_instance.execution_date'] = \ + params['airflow.ctx.task_instance.execution_date'] = ( task_instance.execution_date.isoformat() + ) + return params http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d165377d/tests/utils/test_operator_helpers.py ---------------------------------------------------------------------- diff --git a/tests/utils/test_operator_helpers.py b/tests/utils/test_operator_helpers.py new file mode 100644 index 0000000..bda9ec2 --- /dev/null +++ b/tests/utils/test_operator_helpers.py @@ -0,0 +1,60 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from datetime import datetime +import mock +import unittest + +from airflow.utils import operator_helpers + + +class TestOperatorHelpers(unittest.TestCase): + + def setUp(self): + super(TestOperatorHelpers, self).setUp() + self.dag_id = 'dag_id' + self.task_id = 'task_id' + self.execution_date = '2017-05-21T00:00:00' + self.context = { + 'dag': mock.MagicMock(name='dag', dag_id=self.dag_id), + 'dag_run': mock.MagicMock( + name='dag_run', + execution_date=datetime.strptime(self.execution_date, + '%Y-%m-%dT%H:%M:%S'), + ), + 'task': mock.MagicMock(name='task', task_id=self.task_id), + 'task_instance': mock.MagicMock( + name='task_instance', + execution_date=datetime.strptime(self.execution_date, + '%Y-%m-%dT%H:%M:%S'), + ), + } + + def test_context_to_airflow_vars_empty_context(self): + self.assertDictEqual(operator_helpers.context_to_airflow_vars({}), {}) + + def test_context_to_airflow_vars_all_context(self): + self.assertDictEqual( + operator_helpers.context_to_airflow_vars(self.context), + { + 'airflow.ctx.dag.dag_id': self.dag_id, + 'airflow.ctx.dag_run.execution_date': self.execution_date, + 'airflow.ctx.task.task_id': self.task_id, + 'airflow.ctx.task_instance.execution_date': self.execution_date, + } + ) + + +if __name__ == '__main__': + unittest.main()
