Repository: incubator-airflow Updated Branches: refs/heads/v1-9-test 08bada522 -> 1b210337e
[AIRFLOW-1554] Fix wrong DagFileProcessor termination method call Closes #2821 from pdambrauskas/fix/wrong_termination_call (cherry picked from commit ff0d75f062a34c267f0902c1d4c7b148ba4b490a) Signed-off-by: Bolke de Bruin <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/1b210337 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/1b210337 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/1b210337 Branch: refs/heads/v1-9-test Commit: 1b210337e024b044c9c11f2950a9f3c2368ee47c Parents: 08bada5 Author: Paulius <[email protected]> Authored: Tue Dec 5 19:39:25 2017 +0100 Committer: Bolke de Bruin <[email protected]> Committed: Tue Dec 5 19:39:41 2017 +0100 ---------------------------------------------------------------------- airflow/utils/dag_processing.py | 2 +- tests/utils/test_dag_processing.py | 51 +++++++++++++++++++++++++++++++++ 2 files changed, 52 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1b210337/airflow/utils/dag_processing.py ---------------------------------------------------------------------- diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py index 5e92f0e..a945290 100644 --- a/airflow/utils/dag_processing.py +++ b/airflow/utils/dag_processing.py @@ -430,7 +430,7 @@ class DagFileProcessorManager(LoggingMixin): filtered_processors[file_path] = processor else: self.log.warning("Stopping processor for %s", file_path) - processor.stop() + processor.terminate() self._processors = filtered_processors def processing_count(self): http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1b210337/tests/utils/test_dag_processing.py ---------------------------------------------------------------------- diff --git a/tests/utils/test_dag_processing.py b/tests/utils/test_dag_processing.py new file mode 100644 index 0000000..2b60cd0 --- /dev/null +++ b/tests/utils/test_dag_processing.py @@ -0,0 +1,51 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest + +from mock import MagicMock + +from airflow.utils.dag_processing import DagFileProcessorManager + + +class TestDagFileProcessorManager(unittest.TestCase): + def test_set_file_paths_when_processor_file_path_not_in_new_file_paths(self): + manager = DagFileProcessorManager(dag_directory='directory', file_paths=['abc.txt'], + parallelism=1, process_file_interval=1, + max_runs=1, processor_factory=MagicMock().return_value) + + mock_processor = MagicMock() + mock_processor.stop.side_effect = AttributeError( + 'DagFileProcessor object has no attribute stop') + mock_processor.terminate.side_effect = None + + manager._processors['missing_file.txt'] = mock_processor + + manager.set_file_paths(['abc.txt']) + self.assertDictEqual(manager._processors, {}) + + def test_set_file_paths_when_processor_file_path_is_in_new_file_paths(self): + manager = DagFileProcessorManager(dag_directory='directory', file_paths=['abc.txt'], + parallelism=1, process_file_interval=1, + max_runs=1, processor_factory=MagicMock().return_value) + + mock_processor = MagicMock() + mock_processor.stop.side_effect = AttributeError( + 'DagFileProcessor object has no attribute stop') + mock_processor.terminate.side_effect = None + + manager._processors['abc.txt'] = mock_processor + + manager.set_file_paths(['abc.txt']) + self.assertDictEqual(manager._processors, {'abc.txt': mock_processor})
