Repository: incubator-ariatosca Updated Branches: refs/heads/ARIA-11-wf-cancel 391d6fc5e -> fa1b5e96a (forced update)
change, fix and add some Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/fa1b5e96 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/fa1b5e96 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/fa1b5e96 Branch: refs/heads/ARIA-11-wf-cancel Commit: fa1b5e96a0d51964344417fa84aa5236509eeff1 Parents: a4b7894 Author: Dan Kilman <[email protected]> Authored: Thu Nov 10 13:52:05 2016 +0200 Committer: Dan Kilman <[email protected]> Committed: Sun Nov 13 16:43:18 2016 +0200 ---------------------------------------------------------------------- aria/context/workflow.py | 26 ++++++---- aria/events/__init__.py | 2 +- aria/events/builtin_event_handler.py | 10 ++-- aria/events/workflow_engine_event_handler.py | 4 +- aria/storage/models.py | 22 +++++++- aria/storage/structures.py | 11 ++-- aria/tools/validation.py | 11 ---- aria/workflows/core/engine.py | 22 ++++---- tests/context/test_workflow.py | 3 ++ tests/storage/test_field.py | 24 +++++++-- tests/storage/test_models.py | 62 ++++++++++++++++++++++- tests/workflows/core/test_engine.py | 38 +++++++------- 12 files changed, 169 insertions(+), 66 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/fa1b5e96/aria/context/workflow.py ---------------------------------------------------------------------- diff --git a/aria/context/workflow.py b/aria/context/workflow.py index 9c5abdc..329757a 100644 --- a/aria/context/workflow.py +++ b/aria/context/workflow.py @@ -61,20 +61,12 @@ class WorkflowContext(logger.LoggerMixin): self.parameters = parameters or {} self.task_max_retries = task_max_retries self.task_retry_interval = task_retry_interval - + # TODO: execution creation should happen somewhere else + # should be moved there, when such logical place exists try: self.model.execution.get(self.execution_id) except exceptions.StorageError: - execution_cls = self.model.execution.model_cls - execution = self.model.execution.model_cls( - id=self.execution_id, - deployment_id=self.deployment_id, - workflow_id=self.workflow_id, - blueprint_id=self.blueprint_id, - status=execution_cls.PENDING, - parameters=self.parameters, - ) - self.model.execution.store(execution) + self._create_execution() def __repr__(self): return ( @@ -83,6 +75,18 @@ class WorkflowContext(logger.LoggerMixin): 'execution_id={self.execution_id})'.format( name=self.__class__.__name__, self=self)) + def _create_execution(self): + execution_cls = self.model.execution.model_cls + execution = self.model.execution.model_cls( + id=self.execution_id, + deployment_id=self.deployment_id, + workflow_id=self.workflow_id, + blueprint_id=self.blueprint_id, + status=execution_cls.PENDING, + parameters=self.parameters, + ) + self.model.execution.store(execution) + @property def blueprint_id(self): """ http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/fa1b5e96/aria/events/__init__.py ---------------------------------------------------------------------- diff --git a/aria/events/__init__.py b/aria/events/__init__.py index a158308..2e88733 100644 --- a/aria/events/__init__.py +++ b/aria/events/__init__.py @@ -47,7 +47,7 @@ on_failure_task_signal = signal('failure_task_signal') # workflow engine workflow signals: start_workflow_signal = signal('start_workflow_signal') on_cancelling_workflow_signal = signal('on_cancelling_workflow_signal') -on_cancel_workflow_signal = signal('on_cancel_workflow_signal') +on_cancelled_workflow_signal = signal('on_cancelled_workflow_signal') on_success_workflow_signal = signal('on_success_workflow_signal') on_failure_workflow_signal = signal('on_failure_workflow_signal') http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/fa1b5e96/aria/events/builtin_event_handler.py ---------------------------------------------------------------------- diff --git a/aria/events/builtin_event_handler.py b/aria/events/builtin_event_handler.py index 11ee744..ac65978 100644 --- a/aria/events/builtin_event_handler.py +++ b/aria/events/builtin_event_handler.py @@ -30,7 +30,7 @@ from . import ( start_workflow_signal, on_success_workflow_signal, on_failure_workflow_signal, - on_cancel_workflow_signal, + on_cancelled_workflow_signal, on_cancelling_workflow_signal, sent_task_signal, start_task_signal, @@ -96,9 +96,13 @@ def _workflow_succeeded(workflow_context, *args, **kwargs): workflow_context.execution = execution -@on_cancel_workflow_signal.connect +@on_cancelled_workflow_signal.connect def _workflow_cancelled(workflow_context, *args, **kwargs): execution = workflow_context.execution + # _workflow_cancelling function may have called this function + # already + if execution.status == execution.CANCELLED: + return execution.status = execution.CANCELLED execution.ended_at = datetime.utcnow() workflow_context.execution = execution @@ -107,8 +111,6 @@ def _workflow_cancelled(workflow_context, *args, **kwargs): @on_cancelling_workflow_signal.connect def _workflow_cancelling(workflow_context, *args, **kwargs): execution = workflow_context.execution - # TODO: handle concurrency, locks may not be enough as this may - # be modified somewhere else entirely if execution.status == execution.PENDING: return _workflow_cancelled(workflow_context=workflow_context) execution.status = execution.CANCELLING http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/fa1b5e96/aria/events/workflow_engine_event_handler.py ---------------------------------------------------------------------- diff --git a/aria/events/workflow_engine_event_handler.py b/aria/events/workflow_engine_event_handler.py index 4067674..7df11d1 100644 --- a/aria/events/workflow_engine_event_handler.py +++ b/aria/events/workflow_engine_event_handler.py @@ -28,7 +28,7 @@ from . import ( start_workflow_signal, on_success_workflow_signal, on_failure_workflow_signal, - on_cancel_workflow_signal, + on_cancelled_workflow_signal, on_cancelling_workflow_signal, ) @@ -64,7 +64,7 @@ def _success_workflow_handler(context, **kwargs): context.logger.debug('Event: Workflow success: {context.name}'.format(context=context)) -@on_cancel_workflow_signal.connect +@on_cancelled_workflow_signal.connect def _cancel_workflow_handler(context, **kwargs): context.logger.debug('Event: Workflow cancelled: {context.name}'.format(context=context)) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/fa1b5e96/aria/storage/models.py ---------------------------------------------------------------------- diff --git a/aria/storage/models.py b/aria/storage/models.py index eb943aa..0c704b8 100644 --- a/aria/storage/models.py +++ b/aria/storage/models.py @@ -181,6 +181,18 @@ class DeploymentUpdate(Model): steps = IterPointerField(type=DeploymentUpdateStep, default=()) +def _execution_status_transition_validation(_, value, instance): + try: + current_status = instance.status + except AttributeError: + return + valid_transitions = Execution.VALID_TRANSITIONS.get(current_status, []) + if current_status != value and value not in valid_transitions: + raise ValueError('Cannot change execution status from {current} to {new}'.format( + current=current_status, + new=value)) + + class Execution(Model): """ A Model which represents an execution @@ -201,17 +213,23 @@ class Execution(Model): ) END_STATES = [TERMINATED, FAILED, CANCELLED] ACTIVE_STATES = [state for state in STATES if state not in END_STATES] + VALID_TRANSITIONS = { + PENDING: [STARTED, CANCELLED], + STARTED: END_STATES + [CANCELLING], + CANCELLING: END_STATES + } id = Field(type=basestring, default=uuid_generator) - status = Field(type=basestring, choices=STATES) + status = Field(type=basestring, choices=STATES, + validation_func=_execution_status_transition_validation) deployment_id = Field(type=basestring) workflow_id = Field(type=basestring) blueprint_id = Field(type=basestring) + created_at = Field(type=datetime, default=datetime.utcnow) started_at = Field(type=datetime, default=None) ended_at = Field(type=datetime, default=None) error = Field(type=basestring, default=None) parameters = Field() - is_system_workflow = Field(type=bool, default=False) class Relationship(Model): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/fa1b5e96/aria/storage/structures.py ---------------------------------------------------------------------- diff --git a/aria/storage/structures.py b/aria/storage/structures.py index c692d36..a26e7eb 100644 --- a/aria/storage/structures.py +++ b/aria/storage/structures.py @@ -72,6 +72,7 @@ class Field(ValidatorMixin): self, type=None, choices=(), + validation_func=None, default=NO_DEFAULT, **kwargs): """ @@ -85,6 +86,7 @@ class Field(ValidatorMixin): self.type = type self.choices = choices self.default = default + self.validation_func = validation_func super(Field, self).__init__(**kwargs) def __get__(self, instance, owner): @@ -104,15 +106,16 @@ class Field(ValidatorMixin): def __set__(self, instance, value): field_name = self._field_name(instance) - self.validate_value(field_name, value) + self.validate_value(field_name, value, instance) setattr(instance, self._ATTRIBUTE_NAME(field_name), value) - def validate_value(self, name, value): + def validate_value(self, name, value, instance): """ Validates the value of the field. :param name: the name of the field. :param value: the value of the field. + :param instance: the instance containing the field. """ if self.default != self.NO_DEFAULT and value == self.default: return @@ -120,6 +123,8 @@ class Field(ValidatorMixin): self.validate_instance(name, value, self.type) if self.choices: self.validate_in_choice(name, value, self.choices) + if self.validation_func: + self.validation_func(name, value, instance) def _field_name(self, instance): """ @@ -147,7 +152,7 @@ class IterField(Field): """ super(IterField, self).__init__(choices=(), **kwargs) - def validate_value(self, name, values): + def validate_value(self, name, values, *args): """ Validates the value of each iterable value. http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/fa1b5e96/aria/tools/validation.py ---------------------------------------------------------------------- diff --git a/aria/tools/validation.py b/aria/tools/validation.py index ea1dae7..a33f7a2 100644 --- a/aria/tools/validation.py +++ b/aria/tools/validation.py @@ -24,20 +24,9 @@ class ValidatorMixin(object): """ _ARGUMENT_TYPE_MESSAGE = '{name} argument must be {type} based, got {arg!r}' - _ACTION_MESSAGE = 'action arg options: {actions}, got {action}' _ARGUMENT_CHOICE_MESSAGE = '{name} argument must be in {choices}, got {arg!r}' @classmethod - def validate_actions(cls, action): - """ - Validate action is defined in the class ``ACTIONS`` attribute - """ - # todo: remove this and use validate choice - if action not in cls.ACTIONS: - raise TypeError(cls._ACTION_MESSAGE.format( - actions=cls.ACTIONS, action=action)) - - @classmethod def validate_in_choice(cls, name, argument, choices): """ Validate ``argument`` is in ``choices`` http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/fa1b5e96/aria/workflows/core/engine.py ---------------------------------------------------------------------- diff --git a/aria/workflows/core/engine.py b/aria/workflows/core/engine.py index c4a049c..9578870 100644 --- a/aria/workflows/core/engine.py +++ b/aria/workflows/core/engine.py @@ -48,7 +48,12 @@ class Engine(logger.LoggerMixin): """ try: events.start_workflow_signal.send(self._workflow_context) - while not self._is_cancelling(): + execution_status = None + cancel_statuses = [models.Execution.CANCELLING, models.Execution.CANCELLED] + while True: + execution_status = self._get_execution_status() + if execution_status in cancel_statuses: + break for task in self._ended_tasks(): self._handle_ended_tasks(task) for task in self._executable_tasks(): @@ -57,27 +62,24 @@ class Engine(logger.LoggerMixin): break else: time.sleep(0.1) - if self._is_cancelling(): - events.on_cancel_workflow_signal.send(self._workflow_context) + if execution_status in cancel_statuses: + events.on_cancelled_workflow_signal.send(self._workflow_context) else: events.on_success_workflow_signal.send(self._workflow_context) except BaseException as e: events.on_failure_workflow_signal.send(self._workflow_context, exception=e) raise - def cancel_request(self): + def cancel_execution(self): """ Send a cancel request to the engine. If execution already started, execution status will be modified to 'cancelling' status. If execution is in pending mode, execution status - will be modified to 'cancelled' directly. If execution is in one if its ended state, an - AriaEngineError will be raised + will be modified to 'cancelled' directly. """ - if self._workflow_context.execution.status not in models.Execution.ACTIVE_STATES: - raise exceptions.AriaEngineError('Execution already ended') events.on_cancelling_workflow_signal.send(self._workflow_context) - def _is_cancelling(self): - return self._workflow_context.execution.status == models.Execution.CANCELLING + def _get_execution_status(self): + return self._workflow_context.execution.status def _executable_tasks(self): now = datetime.utcnow() http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/fa1b5e96/tests/context/test_workflow.py ---------------------------------------------------------------------- diff --git a/tests/context/test_workflow.py b/tests/context/test_workflow.py index 2e19aa2..b40a66d 100644 --- a/tests/context/test_workflow.py +++ b/tests/context/test_workflow.py @@ -13,6 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +from datetime import datetime + import pytest from aria import context, application_model_storage @@ -32,6 +34,7 @@ class TestWorkflowContext(object): assert execution.blueprint_id == models.BLUEPRINT_ID assert execution.status == storage.execution.model_cls.PENDING assert execution.parameters == {} + assert execution.created_at <= datetime.utcnow() def test_subsequent_workflow_context_creation_do_not_fail(self, storage): self._create_ctx(storage) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/fa1b5e96/tests/storage/test_field.py ---------------------------------------------------------------------- diff --git a/tests/storage/test_field.py b/tests/storage/test_field.py index fcabe80..cab218f 100644 --- a/tests/storage/test_field.py +++ b/tests/storage/test_field.py @@ -90,9 +90,9 @@ def test_pointer_field(): vars(PointerField(type=Model, choices=(), default=Field.NO_DEFAULT)) with pytest.raises(AssertionError): PointerField(type=list) - pointer_field.validate_value('pointer_field', test_model) + pointer_field.validate_value('pointer_field', test_model, None) with pytest.raises(TypeError): - pointer_field.validate_value('pointer_field', int) + pointer_field.validate_value('pointer_field', int, None) def test_iterable_pointer_field(): @@ -103,6 +103,22 @@ def test_iterable_pointer_field(): with pytest.raises(AssertionError): IterPointerField(type=list) - iter_pointer_field.validate_value('iter_pointer_field', [test_model, test_model]) + iter_pointer_field.validate_value('iter_pointer_field', [test_model, test_model], None) with pytest.raises(TypeError): - iter_pointer_field.validate_value('iter_pointer_field', [int, test_model]) + iter_pointer_field.validate_value('iter_pointer_field', [int, test_model], None) + + +def test_custom_field_validation(): + def validation_func(name, value, instance): + assert name == 'id' + assert value == 'value' + assert isinstance(instance, TestModel) + + class TestModel(Model): + id = Field(default='_', validation_func=validation_func) + + obj = TestModel() + obj.id = 'value' + + with pytest.raises(AssertionError): + obj.id = 'not_value' http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/fa1b5e96/tests/storage/test_models.py ---------------------------------------------------------------------- diff --git a/tests/storage/test_models.py b/tests/storage/test_models.py index 19262bb..e524b61 100644 --- a/tests/storage/test_models.py +++ b/tests/storage/test_models.py @@ -26,7 +26,8 @@ from aria.storage.models import ( RelationshipInstance, Node, NodeInstance, - Blueprint) + Blueprint, + Execution) # TODO: add tests per model @@ -273,3 +274,62 @@ def test_relationship_instance(): assert set(relationship_instances) == set(chain( node_instance.relationships_by_target('target_0'), node_instance.relationships_by_target('target_1'))) + + +def test_execution_status_transition(): + def create_execution(status): + return Execution( + id='e_id', + deployment_id='d_id', + workflow_id='w_id', + blueprint_id='b_id', + status=status, + parameters={} + ) + + valid_transitions = { + Execution.PENDING: [Execution.STARTED, + Execution.CANCELLED], + Execution.STARTED: [Execution.FAILED, + Execution.TERMINATED, + Execution.CANCELLED, + Execution.CANCELLING], + Execution.CANCELLING: [Execution.FAILED, + Execution.TERMINATED, + Execution.CANCELLED] + } + + invalid_transitions = { + Execution.PENDING: [Execution.FAILED, + Execution.TERMINATED, + Execution.CANCELLING], + Execution.STARTED: [Execution.PENDING], + Execution.CANCELLING: [Execution.PENDING, + Execution.STARTED], + Execution.FAILED: [Execution.PENDING, + Execution.STARTED, + Execution.TERMINATED, + Execution.CANCELLED, + Execution.CANCELLING], + Execution.TERMINATED: [Execution.PENDING, + Execution.STARTED, + Execution.FAILED, + Execution.CANCELLED, + Execution.CANCELLING], + Execution.CANCELLED: [Execution.PENDING, + Execution.STARTED, + Execution.FAILED, + Execution.TERMINATED, + Execution.CANCELLING], + } + + for current_status, valid_transitioned_statues in valid_transitions.items(): + for transitioned_status in valid_transitioned_statues: + execution = create_execution(current_status) + execution.status = transitioned_status + + for current_status, invalid_transitioned_statues in invalid_transitions.items(): + for transitioned_status in invalid_transitioned_statues: + execution = create_execution(current_status) + with pytest.raises(ValueError): + execution.status = transitioned_status http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/fa1b5e96/tests/workflows/core/test_engine.py ---------------------------------------------------------------------- diff --git a/tests/workflows/core/test_engine.py b/tests/workflows/core/test_engine.py index 748281e..984d5da 100644 --- a/tests/workflows/core/test_engine.py +++ b/tests/workflows/core/test_engine.py @@ -94,7 +94,7 @@ class BaseTest(object): events.start_workflow_signal.connect(start_workflow_handler) events.on_success_workflow_signal.connect(success_workflow_handler) events.on_failure_workflow_signal.connect(failure_workflow_handler) - events.on_cancel_workflow_signal.connect(cancel_workflow_handler) + events.on_cancelled_workflow_signal.connect(cancel_workflow_handler) events.sent_task_signal.connect(sent_task_handler) try: yield @@ -102,7 +102,7 @@ class BaseTest(object): events.start_workflow_signal.disconnect(start_workflow_handler) events.on_success_workflow_signal.disconnect(success_workflow_handler) events.on_failure_workflow_signal.disconnect(failure_workflow_handler) - events.on_cancel_workflow_signal.disconnect(cancel_workflow_handler) + events.on_cancelled_workflow_signal.disconnect(cancel_workflow_handler) events.sent_task_signal.disconnect(sent_task_handler) @pytest.fixture(scope='function') @@ -152,6 +152,10 @@ class TestEngine(BaseTest): assert workflow_context.states == ['start', 'success'] assert workflow_context.exception is None assert 'sent_task_signal_calls' not in global_test_holder + execution = workflow_context.execution + assert execution.started_at <= execution.ended_at <= datetime.utcnow() + assert execution.error is None + assert execution.status == models.Execution.TERMINATED def test_single_task_successful_execution(self, workflow_context, executor): @workflow @@ -177,6 +181,10 @@ class TestEngine(BaseTest): assert workflow_context.states == ['start', 'failure'] assert isinstance(workflow_context.exception, exceptions.ExecutorException) assert global_test_holder.get('sent_task_signal_calls') == 1 + execution = workflow_context.execution + assert execution.started_at <= execution.ended_at <= datetime.utcnow() + assert execution.error is not None + assert execution.status == models.Execution.FAILED def test_two_tasks_execution_order(self, workflow_context, executor): @workflow @@ -212,6 +220,9 @@ class TestEngine(BaseTest): assert global_test_holder.get('invocations') == [1, 2] assert global_test_holder.get('sent_task_signal_calls') == 2 + +class TestCancel(BaseTest): + def test_cancel_started_execution(self, workflow_context, executor): number_of_tasks = 100 @@ -225,22 +236,16 @@ class TestEngine(BaseTest): t = threading.Thread(target=eng.execute) t.start() time.sleep(1) - eng.cancel_request() + eng.cancel_execution() t.join(timeout=30) assert workflow_context.states == ['start', 'cancel'] assert workflow_context.exception is None invocations = global_test_holder.get('invocations', []) assert 0 < len(invocations) < number_of_tasks - - def test_invalid_cancel_ended_execution(self, workflow_context, executor): - @workflow - def mock_workflow(**_): - pass - eng = self._execute(workflow_func=mock_workflow, - workflow_context=workflow_context, - executor=executor) - with pytest.raises(exceptions.AriaEngineError): - eng.cancel_request() + execution = workflow_context.execution + assert execution.started_at <= execution.ended_at <= datetime.utcnow() + assert execution.error is None + assert execution.status == models.Execution.CANCELLED def test_cancel_pending_execution(self, workflow_context, executor): @workflow @@ -249,10 +254,9 @@ class TestEngine(BaseTest): eng = self._engine(workflow_func=mock_workflow, workflow_context=workflow_context, executor=executor) - eng.cancel_request() - # sanity to verify previous cancel request actually did something - with pytest.raises(exceptions.AriaEngineError): - eng.cancel_request() + eng.cancel_execution() + execution = workflow_context.execution + assert execution.status == models.Execution.CANCELLED class TestRetries(BaseTest):
