Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-30-SQL-based-storage-implementation 0f15c99f1 -> 86f4aa835 (forced update)


tmp fix


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/86f4aa83
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/86f4aa83
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/86f4aa83

Branch: refs/heads/ARIA-30-SQL-based-storage-implementation
Commit: 86f4aa835e938e277cb7dd50acd6f35f826b5ae9
Parents: 2fa5b7b
Author: mxmrlv <[email protected]>
Authored: Wed Dec 7 18:59:02 2016 +0200
Committer: mxmrlv <[email protected]>
Committed: Wed Dec 7 19:31:15 2016 +0200

----------------------------------------------------------------------
 aria/orchestrator/workflows/core/engine.py             | 12 +++++-------
 tests/orchestrator/workflows/core/test_engine.py       |  3 +--
 tests/orchestrator/workflows/executor/test_executor.py |  4 ++--
 tests/storage/__init__.py                              |  7 +------
 4 files changed, 9 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/86f4aa83/aria/orchestrator/workflows/core/engine.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py
index 35aa976..2d26aeb 100644
--- a/aria/orchestrator/workflows/core/engine.py
+++ b/aria/orchestrator/workflows/core/engine.py
@@ -63,7 +63,6 @@ class Engine(logger.LoggerMixin):
                     break
                 else:
                     time.sleep(0.1)
-                self.refresh_tasks()
             if cancel:
                 events.on_cancelled_workflow_signal.send(self._workflow_context)
             else:
@@ -72,11 +71,6 @@ class Engine(logger.LoggerMixin):
             events.on_failure_workflow_signal.send(self._workflow_context, exception=e)
             raise
 
-    def refresh_tasks(self):
-        for task in self._tasks_iter():
-            if isinstance(task, engine_task.OperationTask):
-                self._workflow_context.model.task.refresh(task)
-
     def cancel_execution(self):
         """
         Send a cancel request to the engine. If execution already started, execution status
@@ -106,7 +100,11 @@ class Engine(logger.LoggerMixin):
         return len(self._execution_graph.node) == 0
 
     def _tasks_iter(self):
-        return (data['task'] for _, data in self._execution_graph.nodes_iter(data=True))
+        for _, data in self._execution_graph.nodes_iter(data=True):
+            task = data['task']
+            if isinstance(task, engine_task.OperationTask):
+                self._workflow_context.model.task.refresh(task.model_task)
+            yield task
 
     def _handle_executable_task(self, task):
         if isinstance(task, engine_task.StubTask):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/86f4aa83/tests/orchestrator/workflows/core/test_engine.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_engine.py b/tests/orchestrator/workflows/core/test_engine.py
index 8eaf0be..baded7f 100644
--- a/tests/orchestrator/workflows/core/test_engine.py
+++ b/tests/orchestrator/workflows/core/test_engine.py
@@ -214,7 +214,6 @@ class TestEngine(BaseTest):
 
 class TestCancel(BaseTest):
 
-    # TODO: what is up with this test?
     def test_cancel_started_execution(self, workflow_context, executor):
         number_of_tasks = 100
 
@@ -227,7 +226,7 @@ class TestCancel(BaseTest):
                             executor=executor)
         t = threading.Thread(target=eng.execute)
         t.start()
-        time.sleep(1)
+        time.sleep(10)
         eng.cancel_execution()
         t.join(timeout=30)
         assert workflow_context.states == ['start', 'cancel']

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/86f4aa83/tests/orchestrator/workflows/executor/test_executor.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_executor.py b/tests/orchestrator/workflows/executor/test_executor.py
index 078697e..a425799 100644
--- a/tests/orchestrator/workflows/executor/test_executor.py
+++ b/tests/orchestrator/workflows/executor/test_executor.py
@@ -43,8 +43,8 @@ class TestExecutor(object):
     @pytest.mark.parametrize('executor_cls,executor_kwargs', [
         (thread.ThreadExecutor, {'pool_size': 1}),
         (thread.ThreadExecutor, {'pool_size': 2}),
-        # (multiprocess.MultiprocessExecutor, {'pool_size': 1}),
-        # (multiprocess.MultiprocessExecutor, {'pool_size': 2}),
+        (multiprocess.MultiprocessExecutor, {'pool_size': 1}),
+        (multiprocess.MultiprocessExecutor, {'pool_size': 2}),
         (blocking.CurrentThreadBlockingExecutor, {}),
         # (celery.CeleryExecutor, {'app': app})
     ])

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/86f4aa83/tests/storage/__init__.py
----------------------------------------------------------------------
diff --git a/tests/storage/__init__.py b/tests/storage/__init__.py
index 849d4d6..edff982 100644
--- a/tests/storage/__init__.py
+++ b/tests/storage/__init__.py
@@ -20,11 +20,7 @@ from shutil import rmtree
 from sqlalchemy import (
     create_engine,
     orm)
-from sqlalchemy.orm import (
-    Session,
-    scoped_session
-)
-from sqlalchemy.orm.scoping import ScopedSession
+from sqlalchemy.orm import scoped_session
 from sqlalchemy.pool import StaticPool
 
 from aria.storage import structures
@@ -82,4 +78,3 @@ def release_sqlite_storage(storage):
             session.close()
         for engine in set(mapi._engine for mapi in mapis):
             structures.Model.metadata.drop_all(engine)
-
