Repository: incubator-ariatosca Updated Branches: refs/heads/ARIA-10-task-retries 0b3c85d8a -> feacd9f66 (forced update)
ARIA-10 Implement task retries mechanism Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/feacd9f6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/feacd9f6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/feacd9f6 Branch: refs/heads/ARIA-10-task-retries Commit: feacd9f664cb068b7ebeebad3dda51a941889de3 Parents: 8947f72 Author: Dan Kilman <[email protected]> Authored: Tue Nov 8 11:03:08 2016 +0200 Committer: Dan Kilman <[email protected]> Committed: Tue Nov 8 16:34:14 2016 +0200 ---------------------------------------------------------------------- .gitignore | 2 +- aria/context/workflow.py | 5 +- aria/events/builtin_event_handler.py | 14 +- aria/storage/models.py | 6 +- aria/storage/structures.py | 10 +- aria/tools/application.py | 6 +- aria/workflows/api/task.py | 8 +- aria/workflows/core/engine.py | 6 +- aria/workflows/core/task.py | 52 ++-- aria/workflows/core/translation.py | 21 +- tests/mock/context.py | 4 +- tests/mock/models.py | 6 +- tests/requirements.txt | 2 +- tests/storage/test_models.py | 26 +- tests/workflows/api/__init__.py | 1 - tests/workflows/api/test_task.py | 20 +- tests/workflows/core/test_engine.py | 322 +++++++++++++++++++++++++ tests/workflows/core/test_executor.py | 136 ----------- tests/workflows/executor/__init__.py | 14 ++ tests/workflows/executor/test_executor.py | 138 +++++++++++ tests/workflows/test_engine.py | 187 -------------- 21 files changed, 596 insertions(+), 390 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/feacd9f6/.gitignore ---------------------------------------------------------------------- diff --git a/.gitignore b/.gitignore index 29c4e9c..482383a 100644 --- a/.gitignore +++ b/.gitignore @@ -29,7 +29,7 @@ pip-delete-this-directory.txt # Unit test / coverage reports htmlcov/ .tox/ -.coverage +.coverage* .cache nosetests.xml coverage.xml http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/feacd9f6/aria/context/workflow.py ---------------------------------------------------------------------- diff --git a/aria/context/workflow.py b/aria/context/workflow.py index 8183d42..44db38d 100644 --- a/aria/context/workflow.py +++ b/aria/context/workflow.py @@ -47,6 +47,8 @@ class WorkflowContext(logger.LoggerMixin): workflow_id, execution_id=None, parameters=None, + task_max_retries=0, + task_retry_interval=0, **kwargs): super(WorkflowContext, self).__init__(**kwargs) self.name = name @@ -57,6 +59,8 @@ class WorkflowContext(logger.LoggerMixin): self.workflow_id = workflow_id self.execution_id = execution_id or str(uuid4()) self.parameters = parameters or {} + self.task_max_retries = task_max_retries + self.task_retry_interval = task_retry_interval def __repr__(self): return ( @@ -187,4 +191,3 @@ class _CurrentContext(threading.local): self._set(prev_workflow_context) current = _CurrentContext() - http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/feacd9f6/aria/events/builtin_event_handler.py ---------------------------------------------------------------------- diff --git a/aria/events/builtin_event_handler.py b/aria/events/builtin_event_handler.py index 2abdd9f..280785e 100644 --- a/aria/events/builtin_event_handler.py +++ b/aria/events/builtin_event_handler.py @@ -21,7 +21,10 @@ Implementation of storage handlers for workflow and operation events. """ -from datetime import datetime +from datetime import ( + datetime, + timedelta, +) from . import ( start_workflow_signal, @@ -50,8 +53,13 @@ def _task_started(task, *args, **kwargs): @on_failure_task_signal.connect def _task_failed(task, *args, **kwargs): with task.update(): - task.ended_at = datetime.utcnow() - task.status = task.FAILED + if task.retry_count < task.max_retries: + task.status = task.RETRYING + task.retry_count += 1 + task.eta = datetime.utcnow() + timedelta(seconds=task.retry_interval) + else: + task.ended_at = datetime.utcnow() + task.status = task.FAILED @on_success_task_signal.connect http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/feacd9f6/aria/storage/models.py ---------------------------------------------------------------------- diff --git a/aria/storage/models.py b/aria/storage/models.py index 9aa7cf0..e74d733 100644 --- a/aria/storage/models.py +++ b/aria/storage/models.py @@ -376,27 +376,31 @@ class Task(Model): A Model which represents an task """ PENDING = 'pending' + RETRYING = 'retrying' SENT = 'sent' STARTED = 'started' SUCCESS = 'success' FAILED = 'failed' STATES = ( PENDING, + RETRYING, SENT, STARTED, SUCCESS, FAILED, ) + WAIT_STATES = [PENDING, RETRYING] END_STATES = [SUCCESS, FAILED] id = Field(type=basestring, default=uuid_generator) status = Field(type=basestring, choices=STATES, default=PENDING) execution_id = Field(type=basestring) - eta = Field(type=datetime, default=datetime.now) + eta = Field(type=datetime, default=datetime.utcnow) started_at = Field(type=datetime, default=None) ended_at = Field(type=datetime, default=None) max_retries = Field(type=int, default=1) retry_count = Field(type=int, default=0) + retry_interval = Field(type=(int, float), default=0) # Operation specific fields name = Field(type=basestring) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/feacd9f6/aria/storage/structures.py ---------------------------------------------------------------------- diff --git a/aria/storage/structures.py b/aria/storage/structures.py index ea4cf3a..c692d36 100644 --- a/aria/storage/structures.py +++ b/aria/storage/structures.py @@ -195,17 +195,17 @@ class Model(object): :param fields: each item is validated and transformed into instance attributes. """ self._assert_model_have_id_field(**fields) - missing_fileds, unexpected_fileds = self._setup_fields(fields) + missing_fields, unexpected_fields = self._setup_fields(fields) - if missing_fileds: + if missing_fields: raise StorageError( 'Model {name} got missing keyword arguments: {fields}'.format( - name=self.__class__.__name__, fields=missing_fileds)) + name=self.__class__.__name__, fields=missing_fields)) - if unexpected_fileds: + if unexpected_fields: raise StorageError( 'Model {name} got unexpected keyword arguments: {fields}'.format( - name=self.__class__.__name__, fields=unexpected_fileds)) + name=self.__class__.__name__, fields=unexpected_fields)) def __repr__(self): return '{name}(fields={0})'.format(sorted(self.fields), name=self.__class__.__name__) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/feacd9f6/aria/tools/application.py ---------------------------------------------------------------------- diff --git a/aria/tools/application.py b/aria/tools/application.py index ddc1317..360ba33 100644 --- a/aria/tools/application.py +++ b/aria/tools/application.py @@ -108,7 +108,7 @@ class StorageManager(LoggerMixin): self.logger.debug('created blueprint resource storage entry') self.logger.debug('creating blueprint model storage entry') - now = datetime.now() + now = datetime.utcnow() blueprint = self.model_storage.blueprint.model_cls( plan=self.blueprint_plan, id=self.blueprint_id, @@ -175,7 +175,7 @@ class StorageManager(LoggerMixin): self.logger.debug('created deployment resource storage entry') self.logger.debug('creating deployment model storage entry') - now = datetime.now() + now = datetime.utcnow() deployment = self.model_storage.deployment.model_cls( id=self.deployment_id, blueprint_id=self.blueprint_id, @@ -241,7 +241,7 @@ class StorageManager(LoggerMixin): self.logger.debug('creating plugin model storage entry') plugin = _load_plugin_from_archive(source) build_props = plugin.get('build_server_os_properties') - now = datetime.now() + now = datetime.utcnow() plugin = self.model_storage.plugin.model_cls( id=plugin_id, http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/feacd9f6/aria/workflows/api/task.py ---------------------------------------------------------------------- diff --git a/aria/workflows/api/task.py b/aria/workflows/api/task.py index 1070d99..ca62cf7 100644 --- a/aria/workflows/api/task.py +++ b/aria/workflows/api/task.py @@ -14,7 +14,7 @@ # limitations under the License. """ -Provides the tasks to be enterd into the task graph +Provides the tasks to be entered into the task graph """ from uuid import uuid4 @@ -58,6 +58,8 @@ class OperationTask(BaseTask): name, operation_details, node_instance, + max_retries=None, + retry_interval=None, inputs=None): """ Creates an operation task using the name, details, node instance and any additional kwargs. @@ -71,6 +73,10 @@ class OperationTask(BaseTask): self.operation_details = operation_details self.node_instance = node_instance self.inputs = inputs or {} + self.max_retries = (self.workflow_context.task_max_retries + if max_retries is None else max_retries) + self.retry_interval = (self.workflow_context.task_retry_interval + if retry_interval is None else retry_interval) class WorkflowTask(BaseTask): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/feacd9f6/aria/workflows/core/engine.py ---------------------------------------------------------------------- diff --git a/aria/workflows/core/engine.py b/aria/workflows/core/engine.py index a288757..65e17c3 100644 --- a/aria/workflows/core/engine.py +++ b/aria/workflows/core/engine.py @@ -63,9 +63,9 @@ class Engine(logger.LoggerMixin): raise def _executable_tasks(self): - now = datetime.now() + now = datetime.utcnow() return (task for task in self._tasks_iter() - if task.status == models.Task.PENDING and + if task.status in models.Task.WAIT_STATES and task.eta <= now and not self._task_has_dependencies(task)) @@ -82,7 +82,7 @@ class Engine(logger.LoggerMixin): return (data['task'] for _, data in self._execution_graph.nodes_iter(data=True)) def _handle_executable_task(self, task): - if isinstance(task, engine_task.BaseWorkflowTask): + if isinstance(task, engine_task.StubTask): task.status = models.Task.SUCCESS else: events.sent_task_signal.send(task) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/feacd9f6/aria/workflows/core/task.py ---------------------------------------------------------------------- diff --git a/aria/workflows/core/task.py b/aria/workflows/core/task.py index fc72b59..1cded79 100644 --- a/aria/workflows/core/task.py +++ b/aria/workflows/core/task.py @@ -41,39 +41,39 @@ class BaseTask(logger.LoggerMixin): return self._id -class BaseWorkflowTask(BaseTask): +class StubTask(BaseTask): """ - Base class for all workflow wrapping tasks + Base stub task for all tasks that don't actually run anything """ def __init__(self, *args, **kwargs): - super(BaseWorkflowTask, self).__init__(*args, **kwargs) + super(StubTask, self).__init__(*args, **kwargs) self.status = models.Task.PENDING - self.eta = datetime.now() + self.eta = datetime.utcnow() -class StartWorkflowTask(BaseWorkflowTask): +class StartWorkflowTask(StubTask): """ Tasks marking a workflow start """ pass -class EndWorkflowTask(BaseWorkflowTask): +class EndWorkflowTask(StubTask): """ Tasks marking a workflow end """ pass -class StartSubWorkflowTask(BaseWorkflowTask): +class StartSubWorkflowTask(StubTask): """ Tasks marking a subworkflow start """ pass -class EndSubWorkflowTask(BaseWorkflowTask): +class EndSubWorkflowTask(StubTask): """ Tasks marking a subworkflow end """ @@ -96,7 +96,8 @@ class OperationTask(BaseTask, logger.LoggerMixin): inputs=api_task.inputs, status=task_model.PENDING, execution_id=self.workflow_context.execution_id, - max_retries=self.workflow_context.parameters.get('max_retries', 1), + max_retries=api_task.max_retries, + retry_interval=api_task.retry_interval, ) self.workflow_context.model.task.store(task) self._task_id = task.id @@ -147,9 +148,7 @@ class OperationTask(BaseTask, logger.LoggerMixin): @status.setter def status(self, value): - if self._update_fields is None: - raise exceptions.TaskException("Task is not in update mode") - self._update_fields['status'] = value + self._update_property('status', value) @property def started_at(self): @@ -161,9 +160,7 @@ class OperationTask(BaseTask, logger.LoggerMixin): @started_at.setter def started_at(self, value): - if self._update_fields is None: - raise exceptions.TaskException("Task is not in update mode") - self._update_fields['started_at'] = value + self._update_property('started_at', value) @property def ended_at(self): @@ -175,9 +172,7 @@ class OperationTask(BaseTask, logger.LoggerMixin): @ended_at.setter def ended_at(self, value): - if self._update_fields is None: - raise exceptions.TaskException("Task is not in update mode") - self._update_fields['ended_at'] = value + self._update_property('ended_at', value) @property def retry_count(self): @@ -189,12 +184,27 @@ class OperationTask(BaseTask, logger.LoggerMixin): @retry_count.setter def retry_count(self, value): - if self._update_fields is None: - raise exceptions.TaskException("Task is not in update mode") - self._update_fields['retry_count'] = value + self._update_property('retry_count', value) + + @property + def eta(self): + """ + Returns the minimum datetime in which the task can be executed + :return: eta + """ + return self.context.eta + + @eta.setter + def eta(self, value): + self._update_property('eta', value) def __getattr__(self, attr): try: return getattr(self.context, attr) except AttributeError: return super(OperationTask, self).__getattribute__(attr) + + def _update_property(self, key, value): + if self._update_fields is None: + raise exceptions.TaskException("Task is not in update mode") + self._update_fields[key] = value http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/feacd9f6/aria/workflows/core/translation.py ---------------------------------------------------------------------- diff --git a/aria/workflows/core/translation.py b/aria/workflows/core/translation.py index cd9a0e6..b6cbdad 100644 --- a/aria/workflows/core/translation.py +++ b/aria/workflows/core/translation.py @@ -17,8 +17,8 @@ Translation of user graph's API to the execution graph """ -from . import task as core_task from .. import api +from . import task as core_task def build_execution_graph( @@ -47,12 +47,12 @@ def build_execution_graph( dependencies, default=[start_task]) - if _is_operation(api_task): + if isinstance(api_task, api.task.OperationTask): # Add the task an the dependencies operation_task = core_task.OperationTask(api_task) _add_task_and_dependencies(execution_graph, operation_task, operation_dependencies) - else: - # Built the graph recursively while adding start and end markers + elif isinstance(api_task, api.task.WorkflowTask): + # Build the graph recursively while adding start and end markers build_execution_graph( task_graph=api_task, execution_graph=execution_graph, @@ -60,6 +60,11 @@ def build_execution_graph( end_cls=core_task.EndSubWorkflowTask, depends_on=operation_dependencies ) + elif isinstance(api_task, api.task.StubTask): + stub_task = core_task.StubTask(id=api_task.id) + _add_task_and_dependencies(execution_graph, stub_task, operation_dependencies) + else: + raise RuntimeError('Undefined state') # Insert end marker workflow_dependencies = _get_tasks_from_dependencies( @@ -80,15 +85,13 @@ def _get_tasks_from_dependencies(execution_graph, dependencies, default=()): """ Returns task list from dependencies. """ - return [execution_graph.node[dependency.id if _is_operation(dependency) + return [execution_graph.node[dependency.id + if isinstance(dependency, (api.task.OperationTask, + api.task.StubTask)) else _end_graph_suffix(dependency.id)]['task'] for dependency in dependencies] or default -def _is_operation(task): - return isinstance(task, api.task.OperationTask) - - def _start_graph_suffix(id): return '{0}-Start'.format(id) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/feacd9f6/tests/mock/context.py ---------------------------------------------------------------------- diff --git a/tests/mock/context.py b/tests/mock/context.py index a89612e..13020f3 100644 --- a/tests/mock/context.py +++ b/tests/mock/context.py @@ -28,5 +28,7 @@ def simple(): resource_storage=None, deployment_id=models.DEPLOYMENT_ID, workflow_id=models.WORKFLOW_ID, - execution_id=models.EXECUTION_ID + execution_id=models.EXECUTION_ID, + task_max_retries=models.TASK_MAX_RETRIES, + task_retry_interval=models.TASK_RETRY_INTERVAL ) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/feacd9f6/tests/mock/models.py ---------------------------------------------------------------------- diff --git a/tests/mock/models.py b/tests/mock/models.py index 633adbb..5547321 100644 --- a/tests/mock/models.py +++ b/tests/mock/models.py @@ -23,6 +23,8 @@ DEPLOYMENT_ID = 'test_deployment_id' BLUEPRINT_ID = 'test_blueprint_id' WORKFLOW_ID = 'test_workflow_id' EXECUTION_ID = 'test_execution_id' +TASK_RETRY_INTERVAL = 1 +TASK_MAX_RETRIES = 1 def get_dependency_node(): @@ -115,13 +117,13 @@ def get_execution(): deployment_id=DEPLOYMENT_ID, workflow_id=WORKFLOW_ID, blueprint_id=BLUEPRINT_ID, - started_at=datetime.now(), + started_at=datetime.utcnow(), parameters=None ) def get_deployment(): - now = datetime.now() + now = datetime.utcnow() return models.Deployment( id=DEPLOYMENT_ID, description=None, http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/feacd9f6/tests/requirements.txt ---------------------------------------------------------------------- diff --git a/tests/requirements.txt b/tests/requirements.txt index 92b4e78..cda295a 100644 --- a/tests/requirements.txt +++ b/tests/requirements.txt @@ -12,7 +12,7 @@ testtools mock==1.0.1 -pylint==1.5.5 +pylint==1.6.4 pytest==3.0.2 pytest-cov==2.3.1 pytest-mock==1.2 http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/feacd9f6/tests/storage/test_models.py ---------------------------------------------------------------------- diff --git a/tests/storage/test_models.py b/tests/storage/test_models.py index fa7333b..19262bb 100644 --- a/tests/storage/test_models.py +++ b/tests/storage/test_models.py @@ -71,8 +71,8 @@ def test_blueprint_model(): plan={}, id='id', description='description', - created_at=datetime.now(), - updated_at=datetime.now(), + created_at=datetime.utcnow(), + updated_at=datetime.utcnow(), main_file_name='/path', ) with pytest.raises(TypeError): @@ -80,8 +80,8 @@ def test_blueprint_model(): plan=None, id='id', description='description', - created_at=datetime.now(), - updated_at=datetime.now(), + created_at=datetime.utcnow(), + updated_at=datetime.utcnow(), main_file_name='/path', ) with pytest.raises(TypeError): @@ -89,8 +89,8 @@ def test_blueprint_model(): plan={}, id=999, description='description', - created_at=datetime.now(), - updated_at=datetime.now(), + created_at=datetime.utcnow(), + updated_at=datetime.utcnow(), main_file_name='/path', ) with pytest.raises(TypeError): @@ -98,8 +98,8 @@ def test_blueprint_model(): plan={}, id='id', description=999, - created_at=datetime.now(), - updated_at=datetime.now(), + created_at=datetime.utcnow(), + updated_at=datetime.utcnow(), main_file_name='/path', ) with pytest.raises(TypeError): @@ -108,7 +108,7 @@ def test_blueprint_model(): id='id', description='description', created_at='error', - updated_at=datetime.now(), + updated_at=datetime.utcnow(), main_file_name='/path', ) with pytest.raises(TypeError): @@ -116,7 +116,7 @@ def test_blueprint_model(): plan={}, id='id', description='description', - created_at=datetime.now(), + created_at=datetime.utcnow(), updated_at=None, main_file_name='/path', ) @@ -125,15 +125,15 @@ def test_blueprint_model(): plan={}, id='id', description='description', - created_at=datetime.now(), + created_at=datetime.utcnow(), updated_at=None, main_file_name=88, ) Blueprint( plan={}, description='description', - created_at=datetime.now(), - updated_at=datetime.now(), + created_at=datetime.utcnow(), + updated_at=datetime.utcnow(), main_file_name='/path', ) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/feacd9f6/tests/workflows/api/__init__.py ---------------------------------------------------------------------- diff --git a/tests/workflows/api/__init__.py b/tests/workflows/api/__init__.py index 09697dc..ae1e83e 100644 --- a/tests/workflows/api/__init__.py +++ b/tests/workflows/api/__init__.py @@ -12,4 +12,3 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/feacd9f6/tests/workflows/api/test_task.py ---------------------------------------------------------------------- diff --git a/tests/workflows/api/test_task.py b/tests/workflows/api/test_task.py index 7119529..7ecd28c 100644 --- a/tests/workflows/api/test_task.py +++ b/tests/workflows/api/test_task.py @@ -67,17 +67,35 @@ class TestOperationTask(object): op_details = {'operation_details': True} node_instance = mock.models.get_dependency_node_instance() inputs = {'inputs': True} + max_retries = 10 + retry_interval = 10 with context.workflow.current.push(workflow_context): model_task = api.task.OperationTask(name=name, operation_details=op_details, node_instance=node_instance, - inputs=inputs) + inputs=inputs, + max_retries=max_retries, + retry_interval=retry_interval) assert model_task.name == name assert model_task.operation_details == op_details assert model_task.node_instance == node_instance assert model_task.inputs == inputs + assert model_task.retry_interval == retry_interval + assert model_task.max_retries == max_retries + + def test_operation_task_default_values(self): + workflow_context = mock.context.simple() + with context.workflow.current.push(workflow_context): + model_task = api.task.OperationTask( + name='stub', + operation_details={}, + node_instance=mock.models.get_dependency_node_instance()) + + assert model_task.inputs == {} + assert model_task.retry_interval == workflow_context.task_retry_interval + assert model_task.max_retries == workflow_context.task_max_retries class TestWorkflowTask(object): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/feacd9f6/tests/workflows/core/test_engine.py ---------------------------------------------------------------------- diff --git a/tests/workflows/core/test_engine.py b/tests/workflows/core/test_engine.py new file mode 100644 index 0000000..39c9cb2 --- /dev/null +++ b/tests/workflows/core/test_engine.py @@ -0,0 +1,322 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import time +from datetime import datetime + +import pytest + +import aria +from aria import events +from aria import workflow +from aria import context +from aria.storage import models +from aria.workflows import exceptions +from aria.workflows.executor import thread +from aria.workflows.core import engine +from aria.workflows import api + +from tests import mock + +import tests.storage + +logging.basicConfig() +global_test_holder = {} + + +class BaseTest(object): + + @staticmethod + def _execute(workflow_func, workflow_context, executor): + graph = workflow_func(ctx=workflow_context) + eng = engine.Engine(executor=executor, workflow_context=workflow_context, tasks_graph=graph) + eng.execute() + + @staticmethod + def _op(func, ctx, inputs=None, max_retries=None, retry_interval=None): + return api.task.OperationTask( + name='task', + operation_details={'operation': 'tests.workflows.core.test_engine.{name}'.format( + name=func.__name__)}, + node_instance=ctx.model.node_instance.get('dependency_node_instance'), + inputs=inputs, + max_retries=max_retries, + retry_interval=retry_interval + ) + + @pytest.fixture(scope='function', autouse=True) + def globals_cleanup(self): + try: + yield + finally: + global_test_holder.clear() + + @pytest.fixture(scope='function', autouse=True) + def signals_registration(self, ): + def sent_task_handler(*args, **kwargs): + calls = global_test_holder.setdefault('sent_task_signal_calls', 0) + global_test_holder['sent_task_signal_calls'] = calls + 1 + + def start_workflow_handler(workflow_context, *args, **kwargs): + workflow_context.states.append('start') + + def success_workflow_handler(workflow_context, *args, **kwargs): + workflow_context.states.append('success') + + def failure_workflow_handler(workflow_context, exception, *args, **kwargs): + workflow_context.states.append('failure') + workflow_context.exception = exception + + events.start_workflow_signal.connect(start_workflow_handler) + events.on_success_workflow_signal.connect(success_workflow_handler) + events.on_failure_workflow_signal.connect(failure_workflow_handler) + events.sent_task_signal.connect(sent_task_handler) + try: + yield + finally: + events.start_workflow_signal.disconnect(start_workflow_handler) + events.on_success_workflow_signal.disconnect(success_workflow_handler) + events.on_failure_workflow_signal.disconnect(failure_workflow_handler) + events.sent_task_signal.disconnect(sent_task_handler) + + @pytest.fixture(scope='function') + def executor(self): + result = thread.ThreadExecutor() + try: + yield result + finally: + result.close() + + @pytest.fixture(scope='function') + def workflow_context(self): + model_storage = aria.application_model_storage(tests.storage.InMemoryModelDriver()) + model_storage.setup() + deployment = models.Deployment( + id='d1', + blueprint_id='b1', + description=None, + created_at=datetime.utcnow(), + updated_at=datetime.utcnow(), + workflows={}) + model_storage.deployment.store(deployment) + node = mock.models.get_dependency_node() + node_instance = mock.models.get_dependency_node_instance(node) + model_storage.node.store(node) + model_storage.node_instance.store(node_instance) + result = context.workflow.WorkflowContext( + name='test', + model_storage=model_storage, + resource_storage=None, + deployment_id=deployment.id, + workflow_id='name') + result.states = [] + result.exception = None + return result + + +class TestEngine(BaseTest): + + def test_empty_graph_execution(self, workflow_context, executor): + @workflow + def mock_workflow(**_): + pass + self._execute(workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + assert workflow_context.states == ['start', 'success'] + assert workflow_context.exception is None + assert 'sent_task_signal_calls' not in global_test_holder + + def test_single_task_successful_execution(self, workflow_context, executor): + @workflow + def mock_workflow(ctx, graph): + graph.add_tasks(self._op(mock_success_task, ctx)) + self._execute( + workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + assert workflow_context.states == ['start', 'success'] + assert workflow_context.exception is None + assert global_test_holder.get('sent_task_signal_calls') == 1 + + def test_single_task_failed_execution(self, workflow_context, executor): + @workflow + def mock_workflow(ctx, graph): + graph.add_tasks(self._op(mock_failed_task, ctx)) + with pytest.raises(exceptions.ExecutorException): + self._execute( + workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + assert workflow_context.states == ['start', 'failure'] + assert isinstance(workflow_context.exception, exceptions.ExecutorException) + assert global_test_holder.get('sent_task_signal_calls') == 1 + + def test_two_tasks_execution_order(self, workflow_context, executor): + @workflow + def mock_workflow(ctx, graph): + op1 = self._op(mock_ordered_task, ctx, inputs={'counter': 1}) + op2 = self._op(mock_ordered_task, ctx, inputs={'counter': 2}) + graph.sequence(op1, op2) + self._execute( + workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + assert workflow_context.states == ['start', 'success'] + assert workflow_context.exception is None + assert global_test_holder.get('invocations') == [1, 2] + assert global_test_holder.get('sent_task_signal_calls') == 2 + + def test_stub_and_subworkflow_execution(self, workflow_context, executor): + @workflow + def sub_workflow(ctx, graph): + op1 = self._op(mock_ordered_task, ctx, inputs={'counter': 1}) + op2 = api.task.StubTask() + op3 = self._op(mock_ordered_task, ctx, inputs={'counter': 2}) + graph.sequence(op1, op2, op3) + + @workflow + def mock_workflow(ctx, graph): + graph.add_tasks(api.task.WorkflowTask(sub_workflow, ctx=ctx)) + + self._execute(workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + + assert workflow_context.states == ['start', 'success'] + assert workflow_context.exception is None + assert global_test_holder.get('invocations') == [1, 2] + assert global_test_holder.get('sent_task_signal_calls') == 2 + + +class TestRetries(BaseTest): + + def test_one_max_retries_and_success_on_retry(self, workflow_context, executor): + @workflow + def mock_workflow(ctx, graph): + op = self._op(mock_conditional_failure_task, ctx, + inputs={'fail_on_invocations_less_than': 1}, + max_retries=1) + graph.add_tasks(op) + self._execute( + workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + assert workflow_context.states == ['start', 'success'] + assert workflow_context.exception is None + assert len(global_test_holder.get('invocations', [])) == 2 + assert global_test_holder.get('sent_task_signal_calls') == 2 + + def test_one_max_retries_and_failure_on_retry(self, workflow_context, executor): + @workflow + def mock_workflow(ctx, graph): + op = self._op(mock_conditional_failure_task, ctx, + inputs={'fail_on_invocations_less_than': 2}, + max_retries=1) + graph.add_tasks(op) + with pytest.raises(exceptions.ExecutorException): + self._execute( + workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + assert workflow_context.states == ['start', 'failure'] + assert isinstance(workflow_context.exception, exceptions.ExecutorException) + assert len(global_test_holder.get('invocations', [])) == 2 + assert global_test_holder.get('sent_task_signal_calls') == 2 + + def test_two_max_retries_and_success_on_first_retry(self, workflow_context, executor): + @workflow + def mock_workflow(ctx, graph): + op = self._op(mock_conditional_failure_task, ctx, + inputs={'fail_on_invocations_less_than': 1}, + max_retries=2) + graph.add_tasks(op) + self._execute( + workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + assert workflow_context.states == ['start', 'success'] + assert workflow_context.exception is None + assert len(global_test_holder.get('invocations', [])) == 2 + assert global_test_holder.get('sent_task_signal_calls') == 2 + + def test_two_max_retries_and_success_on_second_retry(self, workflow_context, executor): + @workflow + def mock_workflow(ctx, graph): + op = self._op(mock_conditional_failure_task, ctx, + inputs={'fail_on_invocations_less_than': 2}, + max_retries=2) + graph.add_tasks(op) + self._execute( + workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + assert workflow_context.states == ['start', 'success'] + assert workflow_context.exception is None + assert len(global_test_holder.get('invocations', [])) == 3 + assert global_test_holder.get('sent_task_signal_calls') == 3 + + def test_retry_interval_float(self, workflow_context, executor): + self._test_retry_interval(retry_interval=0.3, + workflow_context=workflow_context, + executor=executor) + + def test_retry_interval_int(self, workflow_context, executor): + self._test_retry_interval(retry_interval=1, + workflow_context=workflow_context, + executor=executor) + + def _test_retry_interval(self, retry_interval, workflow_context, executor): + @workflow + def mock_workflow(ctx, graph): + op = self._op(mock_conditional_failure_task, ctx, + inputs={'fail_on_invocations_less_than': 1}, + max_retries=1, + retry_interval=retry_interval) + graph.add_tasks(op) + self._execute( + workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + assert workflow_context.states == ['start', 'success'] + assert workflow_context.exception is None + invocations = global_test_holder.get('invocations', []) + assert len(invocations) == 2 + invocation1, invocation2 = invocations + assert invocation2 - invocation1 >= retry_interval + assert global_test_holder.get('sent_task_signal_calls') == 2 + + +def mock_success_task(): + pass + + +def mock_failed_task(): + raise RuntimeError + + +def mock_ordered_task(counter): + invocations = global_test_holder.setdefault('invocations', []) + invocations.append(counter) + + +def mock_conditional_failure_task(fail_on_invocations_less_than): + invocations = global_test_holder.setdefault('invocations', []) + try: + if len(invocations) < fail_on_invocations_less_than: + raise RuntimeError + finally: + invocations.append(time.time()) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/feacd9f6/tests/workflows/core/test_executor.py ---------------------------------------------------------------------- diff --git a/tests/workflows/core/test_executor.py b/tests/workflows/core/test_executor.py deleted file mode 100644 index 8ec0303..0000000 --- a/tests/workflows/core/test_executor.py +++ /dev/null @@ -1,136 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import logging -import uuid -from contextlib import contextmanager - -import pytest -import retrying - -from aria import events -from aria.storage import models -from aria.workflows.executor import ( - thread, - multiprocess, - blocking, - # celery -) - -try: - import celery as _celery - app = _celery.Celery() - app.conf.update(CELERY_RESULT_BACKEND='amqp://') -except ImportError: - _celery = None - app = None - - -class TestExecutor(object): - - @pytest.mark.parametrize('executor_cls,executor_kwargs', [ - (thread.ThreadExecutor, {'pool_size': 1}), - (thread.ThreadExecutor, {'pool_size': 2}), - (multiprocess.MultiprocessExecutor, {'pool_size': 1}), - (multiprocess.MultiprocessExecutor, {'pool_size': 2}), - (blocking.CurrentThreadBlockingExecutor, {}), - # (celery.CeleryExecutor, {'app': app}) - ]) - def test_execute(self, executor_cls, executor_kwargs): - self.executor = executor_cls(**executor_kwargs) - expected_value = 'value' - successful_task = MockTask(mock_successful_task) - failing_task = MockTask(mock_failing_task) - task_with_inputs = MockTask(mock_task_with_input, inputs={'input': expected_value}) - - for task in [successful_task, failing_task, task_with_inputs]: - self.executor.execute(task) - - @retrying.retry(stop_max_delay=10000, wait_fixed=100) - def assertion(): - assert successful_task.states == ['start', 'success'] - assert failing_task.states == ['start', 'failure'] - assert task_with_inputs.states == ['start', 'failure'] - assert isinstance(failing_task.exception, MockException) - assert isinstance(task_with_inputs.exception, MockException) - assert task_with_inputs.exception.message == expected_value - assertion() - - def setup_method(self): - events.start_task_signal.connect(start_handler) - events.on_success_task_signal.connect(success_handler) - events.on_failure_task_signal.connect(failure_handler) - - def teardown_method(self): - events.start_task_signal.disconnect(start_handler) - events.on_success_task_signal.disconnect(success_handler) - events.on_failure_task_signal.disconnect(failure_handler) - if hasattr(self, 'executor'): - self.executor.close() - - -def mock_successful_task(): - pass - - -def mock_failing_task(): - raise MockException - - -def mock_task_with_input(input): - raise MockException(input) - -if app: - mock_successful_task = app.task(mock_successful_task) - mock_failing_task = app.task(mock_failing_task) - mock_task_with_input = app.task(mock_task_with_input) - - -class MockException(Exception): - pass - - -class MockTask(object): - - def __init__(self, func, inputs=None): - self.states = [] - self.exception = None - self.id = str(uuid.uuid4()) - name = func.__name__ - operation = 'tests.workflows.core.test_executor.{name}'.format(name=name) - self.operation_details = {'operation': operation} - self.logger = logging.getLogger() - self.name = name - self.inputs = inputs or {} - - for state in models.Task.STATES: - setattr(self, state.upper(), state) - - @contextmanager - def update(self): - yield self - - -def start_handler(task, *args, **kwargs): - task.states.append('start') - - -def success_handler(task, *args, **kwargs): - task.states.append('success') - - -def failure_handler(task, exception, *args, **kwargs): - task.states.append('failure') - task.exception = exception http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/feacd9f6/tests/workflows/executor/__init__.py ---------------------------------------------------------------------- diff --git a/tests/workflows/executor/__init__.py b/tests/workflows/executor/__init__.py new file mode 100644 index 0000000..ae1e83e --- /dev/null +++ b/tests/workflows/executor/__init__.py @@ -0,0 +1,14 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/feacd9f6/tests/workflows/executor/test_executor.py ---------------------------------------------------------------------- diff --git a/tests/workflows/executor/test_executor.py b/tests/workflows/executor/test_executor.py new file mode 100644 index 0000000..bbbfbc7 --- /dev/null +++ b/tests/workflows/executor/test_executor.py @@ -0,0 +1,138 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import uuid +from contextlib import contextmanager + +import pytest +import retrying + +from aria import events +from aria.storage import models +from aria.workflows.executor import ( + thread, + multiprocess, + blocking, + # celery +) + +try: + import celery as _celery + app = _celery.Celery() + app.conf.update(CELERY_RESULT_BACKEND='amqp://') +except ImportError: + _celery = None + app = None + + +class TestExecutor(object): + + @pytest.mark.parametrize('executor_cls,executor_kwargs', [ + (thread.ThreadExecutor, {'pool_size': 1}), + (thread.ThreadExecutor, {'pool_size': 2}), + (multiprocess.MultiprocessExecutor, {'pool_size': 1}), + (multiprocess.MultiprocessExecutor, {'pool_size': 2}), + (blocking.CurrentThreadBlockingExecutor, {}), + # (celery.CeleryExecutor, {'app': app}) + ]) + def test_execute(self, executor_cls, executor_kwargs): + self.executor = executor_cls(**executor_kwargs) + expected_value = 'value' + successful_task = MockTask(mock_successful_task) + failing_task = MockTask(mock_failing_task) + task_with_inputs = MockTask(mock_task_with_input, inputs={'input': expected_value}) + + for task in [successful_task, failing_task, task_with_inputs]: + self.executor.execute(task) + + @retrying.retry(stop_max_delay=10000, wait_fixed=100) + def assertion(): + assert successful_task.states == ['start', 'success'] + assert failing_task.states == ['start', 'failure'] + assert task_with_inputs.states == ['start', 'failure'] + assert isinstance(failing_task.exception, MockException) + assert isinstance(task_with_inputs.exception, MockException) + assert task_with_inputs.exception.message == expected_value + assertion() + + def setup_method(self): + events.start_task_signal.connect(start_handler) + events.on_success_task_signal.connect(success_handler) + events.on_failure_task_signal.connect(failure_handler) + + def teardown_method(self): + events.start_task_signal.disconnect(start_handler) + events.on_success_task_signal.disconnect(success_handler) + events.on_failure_task_signal.disconnect(failure_handler) + if hasattr(self, 'executor'): + self.executor.close() + + +def mock_successful_task(): + pass + + +def mock_failing_task(): + raise MockException + + +def mock_task_with_input(input): + raise MockException(input) + +if app: + mock_successful_task = app.task(mock_successful_task) + mock_failing_task = app.task(mock_failing_task) + mock_task_with_input = app.task(mock_task_with_input) + + +class MockException(Exception): + pass + + +class MockTask(object): + + def __init__(self, func, inputs=None): + self.states = [] + self.exception = None + self.id = str(uuid.uuid4()) + name = func.__name__ + operation = 'tests.workflows.executor.test_executor.{name}'.format(name=name) + self.operation_details = {'operation': operation} + self.logger = logging.getLogger() + self.name = name + self.inputs = inputs or {} + self.retry_count = 0 + self.max_retries = 0 + + for state in models.Task.STATES: + setattr(self, state.upper(), state) + + @contextmanager + def update(self): + yield self + + +def start_handler(task, *args, **kwargs): + task.states.append('start') + + +def success_handler(task, *args, **kwargs): + task.states.append('success') + + +def failure_handler(task, exception, *args, **kwargs): + task.states.append('failure') + task.exception = exception http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/feacd9f6/tests/workflows/test_engine.py ---------------------------------------------------------------------- diff --git a/tests/workflows/test_engine.py b/tests/workflows/test_engine.py deleted file mode 100644 index ea703f5..0000000 --- a/tests/workflows/test_engine.py +++ /dev/null @@ -1,187 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from datetime import datetime - -import pytest - -import aria -from aria import events -from aria import workflow -from aria import context -from aria.storage import models -from aria.workflows import exceptions -from aria.workflows.executor import thread -from aria.workflows.core import engine -from aria.workflows import api - -from .. import mock - -import tests.storage - - -global_test_holder = {} - - -class TestEngine(object): - - def test_empty_graph_execution(self, workflow_context, executor): - @workflow - def mock_workflow(**_): - pass - self._execute(workflow_func=mock_workflow, - workflow_context=workflow_context, - executor=executor) - assert workflow_context.states == ['start', 'success'] - assert workflow_context.exception is None - assert 'sent_task_signal_calls' not in global_test_holder - - def test_single_task_successful_execution(self, workflow_context, executor): - @workflow - def mock_workflow(ctx, graph): - graph.add_tasks(self._op(mock_success_task, ctx)) - self._execute( - workflow_func=mock_workflow, - workflow_context=workflow_context, - executor=executor) - assert workflow_context.states == ['start', 'success'] - assert workflow_context.exception is None - assert global_test_holder.get('sent_task_signal_calls') == 1 - - def test_single_task_failed_execution(self, workflow_context, executor): - @workflow - def mock_workflow(ctx, graph): - graph.add_tasks(self._op(mock_failed_task, ctx)) - with pytest.raises(exceptions.ExecutorException): - self._execute( - workflow_func=mock_workflow, - workflow_context=workflow_context, - executor=executor) - assert workflow_context.states == ['start', 'failure'] - assert isinstance(workflow_context.exception, exceptions.ExecutorException) - assert global_test_holder.get('sent_task_signal_calls') == 1 - - def test_two_tasks_execution_order(self, workflow_context, executor): - @workflow - def mock_workflow(ctx, graph): - op1 = self._op(mock_ordered_task, ctx, inputs={'counter': 1}) - op2 = self._op(mock_ordered_task, ctx, inputs={'counter': 2}) - graph.sequence(op1, op2) - self._execute( - workflow_func=mock_workflow, - workflow_context=workflow_context, - executor=executor) - assert workflow_context.states == ['start', 'success'] - assert workflow_context.exception is None - assert global_test_holder.get('invocations') == [1, 2] - assert global_test_holder.get('sent_task_signal_calls') == 2 - - @staticmethod - def _execute(workflow_func, workflow_context, executor): - graph = workflow_func(ctx=workflow_context) - eng = engine.Engine(executor=executor, workflow_context=workflow_context, tasks_graph=graph) - eng.execute() - - @staticmethod - def _op(func, ctx, inputs=None): - return api.task.OperationTask( - name='task', - operation_details={'operation': 'tests.workflows.test_engine.{name}'.format( - name=func.__name__)}, - node_instance=ctx.model.node_instance.get('dependency_node_instance'), - inputs=inputs - ) - - @pytest.fixture(scope='function', autouse=True) - def globals_cleanup(self): - try: - yield - finally: - global_test_holder.clear() - - @pytest.fixture(scope='function', autouse=True) - def signals_registration(self, ): - def sent_task_handler(*args, **kwargs): - calls = global_test_holder.setdefault('sent_task_signal_calls', 0) - global_test_holder['sent_task_signal_calls'] = calls + 1 - - def start_workflow_handler(workflow_context, *args, **kwargs): - workflow_context.states.append('start') - - def success_workflow_handler(workflow_context, *args, **kwargs): - workflow_context.states.append('success') - - def failure_workflow_handler(workflow_context, exception, *args, **kwargs): - workflow_context.states.append('failure') - workflow_context.exception = exception - - events.start_workflow_signal.connect(start_workflow_handler) - events.on_success_workflow_signal.connect(success_workflow_handler) - events.on_failure_workflow_signal.connect(failure_workflow_handler) - events.sent_task_signal.connect(sent_task_handler) - try: - yield - finally: - events.start_workflow_signal.disconnect(start_workflow_handler) - events.on_success_workflow_signal.disconnect(success_workflow_handler) - events.on_failure_workflow_signal.disconnect(failure_workflow_handler) - events.sent_task_signal.disconnect(sent_task_handler) - - @pytest.fixture(scope='function') - def executor(self): - result = thread.ThreadExecutor() - try: - yield result - finally: - result.close() - - @pytest.fixture(scope='function') - def workflow_context(self): - model_storage = aria.application_model_storage(tests.storage.InMemoryModelDriver()) - model_storage.setup() - deployment = models.Deployment( - id='d1', - blueprint_id='b1', - description=None, - created_at=datetime.now(), - updated_at=datetime.now(), - workflows={}) - model_storage.deployment.store(deployment) - node = mock.models.get_dependency_node() - node_instance = mock.models.get_dependency_node_instance(node) - model_storage.node.store(node) - model_storage.node_instance.store(node_instance) - result = context.workflow.WorkflowContext( - name='test', - model_storage=model_storage, - resource_storage=None, - deployment_id=deployment.id, - workflow_id='name') - result.states = [] - result.exception = None - return result - - -def mock_success_task(): - pass - - -def mock_failed_task(): - raise RuntimeError - - -def mock_ordered_task(counter): - invocations = global_test_holder.setdefault('invocations', []) - invocations.append(counter)
