Make compatible with 1.8
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/8df046bf Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/8df046bf Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/8df046bf Branch: refs/heads/v1-8-test Commit: 8df046bfbec670a253139c83c6174bb88f25ee7f Parents: 2b26a5d Author: Bolke de Bruin <[email protected]> Authored: Sun Mar 12 10:11:15 2017 -0700 Committer: Bolke de Bruin <[email protected]> Committed: Sun Mar 12 10:11:15 2017 -0700 ---------------------------------------------------------------------- tests/executors/__init__.py | 13 ++++++++ tests/executors/test_executor.py | 56 +++++++++++++++++++++++++++++++++++ 2 files changed, 69 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8df046bf/tests/executors/__init__.py ---------------------------------------------------------------------- diff --git a/tests/executors/__init__.py b/tests/executors/__init__.py new file mode 100644 index 0000000..a85b772 --- /dev/null +++ b/tests/executors/__init__.py @@ -0,0 +1,13 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8df046bf/tests/executors/test_executor.py ---------------------------------------------------------------------- diff --git a/tests/executors/test_executor.py b/tests/executors/test_executor.py new file mode 100644 index 0000000..9ec6cd4 --- /dev/null +++ b/tests/executors/test_executor.py @@ -0,0 +1,56 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from airflow.executors.base_executor import BaseExecutor +from airflow.utils.state import State + +from airflow import settings + + +class TestExecutor(BaseExecutor): + """ + TestExecutor is used for unit testing purposes. + """ + def __init__(self, do_update=False, *args, **kwargs): + self.do_update = do_update + self._running = [] + self.history = [] + + super(TestExecutor, self).__init__(*args, **kwargs) + + def execute_async(self, key, command, queue=None): + self.logger.debug("{} running task instances".format(len(self.running))) + self.logger.debug("{} in queue".format(len(self.queued_tasks))) + + def heartbeat(self): + session = settings.Session() + if self.do_update: + self.history.append(list(self.queued_tasks.values())) + while len(self._running) > 0: + ti = self._running.pop() + ti.set_state(State.SUCCESS, session) + for key, val in list(self.queued_tasks.items()): + (command, priority, queue, ti) = val + ti.set_state(State.RUNNING, session) + self._running.append(ti) + self.queued_tasks.pop(key) + + session.commit() + session.close() + + def terminate(self): + pass + + def end(self): + self.sync() +
