Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-214-Dry-execution-changes-the-state-of-non-implemented-operations 724b4db9c -> 57cc88867
wip

Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/57cc8886
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/57cc8886
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/57cc8886

Branch: refs/heads/ARIA-214-Dry-execution-changes-the-state-of-non-implemented-operations
Commit: 57cc888673365f87077a28aa04ff5db10fd6199b
Parents: 724b4db
Author: max-orlov <[email protected]>
Authored: Sun May 7 18:39:20 2017 +0300
Committer: max-orlov <[email protected]>
Committed: Sun May 7 18:39:20 2017 +0300

----------------------------------------------------------------------
 aria/orchestrator/workflows/executor/base.py    | 6 ++++--
 aria/orchestrator/workflows/executor/celery.py  | 2 --
 aria/orchestrator/workflows/executor/process.py | 5 +----
 aria/orchestrator/workflows/executor/thread.py  | 4 +---
 4 files changed, 6 insertions(+), 11 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/57cc8886/aria/orchestrator/workflows/executor/base.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/base.py b/aria/orchestrator/workflows/executor/base.py
index a95e58f..7c18586 100644
--- a/aria/orchestrator/workflows/executor/base.py
+++ b/aria/orchestrator/workflows/executor/base.py
@@ -25,16 +25,18 @@ class BaseExecutor(logger.LoggerMixin):
     """
     Base class for executors for running tasks
     """
+    def _execute(self, task):
+        raise NotImplementedError
 
     def execute(self, task):
         """
         Execute a task
         :param task: task to execute
         """
-        if task.model_task.implementation:
+        if hasattr(task, 'model_task') and task.model_task.implementation:
             self._execute_empty_task(task)
         else:
-            raise NotImplementedError
+            self._execute(task)
 
     def _execute_empty_task(self, task):
         self._task_started(task)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/57cc8886/aria/orchestrator/workflows/executor/celery.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/celery.py b/aria/orchestrator/workflows/executor/celery.py
index 0406d4e..7bd9b7c 100644
--- a/aria/orchestrator/workflows/executor/celery.py
+++ b/aria/orchestrator/workflows/executor/celery.py
@@ -43,8 +43,6 @@ class CeleryExecutor(BaseExecutor):
             self._started_queue.get(timeout=30)
 
     def execute(self, task):
-        if not task.implementation:
-            return self._execute_empty_task(task)
         self._tasks[task.id] = task
         inputs = dict(inp.unwrap() for inp in task.inputs.values())
         inputs['ctx'] = task.context

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/57cc8886/aria/orchestrator/workflows/executor/process.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py
index 3053c7e..25413c8 100644
--- a/aria/orchestrator/workflows/executor/process.py
+++ b/aria/orchestrator/workflows/executor/process.py
@@ -117,11 +117,8 @@ class ProcessExecutor(base.BaseExecutor):
         self._server_socket.close()
         self._listener_thread.join(timeout=60)
 
-    def execute(self, task):
+    def _execute(self, task):
         self._check_closed()
-
-        if not task.implementation:
-            return self._execute_empty_task(task)
         self._tasks[task.id] = task
 
         # Temporary file used to pass arguments to the started subprocess

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/57cc8886/aria/orchestrator/workflows/executor/thread.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/thread.py b/aria/orchestrator/workflows/executor/thread.py
index 8e16870..f53362a 100644
--- a/aria/orchestrator/workflows/executor/thread.py
+++ b/aria/orchestrator/workflows/executor/thread.py
@@ -46,9 +46,7 @@ class ThreadExecutor(BaseExecutor):
             thread.start()
             self._pool.append(thread)
 
-    def execute(self, task):
-        if not task.implementation:
-            return self._execute_empty_task(task)
+    def _execute(self, task):
         self._queue.put(task)
 
     def close(self):
