Repository: incubator-ariatosca Updated Branches: refs/heads/ARIA-11-wf-cancel a4b7894a5 -> 391d6fc5e
change, fix and add some Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/391d6fc5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/391d6fc5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/391d6fc5 Branch: refs/heads/ARIA-11-wf-cancel Commit: 391d6fc5e54ed9da6b44dd677cd230c42c373c89 Parents: a4b7894 Author: Dan Kilman <[email protected]> Authored: Thu Nov 10 13:52:05 2016 +0200 Committer: Dan Kilman <[email protected]> Committed: Thu Nov 10 15:38:04 2016 +0200 ---------------------------------------------------------------------- aria/context/workflow.py | 26 ++++++++++------- aria/events/__init__.py | 2 +- aria/events/builtin_event_handler.py | 4 +-- aria/events/workflow_engine_event_handler.py | 4 +-- aria/storage/models.py | 2 +- aria/storage/structures.py | 4 +++ aria/tools/validation.py | 11 ------- aria/workflows/core/engine.py | 9 ++---- tests/context/test_workflow.py | 3 ++ tests/workflows/core/test_engine.py | 35 ++++++++++++----------- 10 files changed, 49 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/391d6fc5/aria/context/workflow.py ---------------------------------------------------------------------- diff --git a/aria/context/workflow.py b/aria/context/workflow.py index 9c5abdc..329757a 100644 --- a/aria/context/workflow.py +++ b/aria/context/workflow.py @@ -61,20 +61,12 @@ class WorkflowContext(logger.LoggerMixin): self.parameters = parameters or {} self.task_max_retries = task_max_retries self.task_retry_interval = task_retry_interval - + # TODO: execution creation should happen somewhere else + # should be moved there, when such logical place exists try: self.model.execution.get(self.execution_id) except exceptions.StorageError: - execution_cls = self.model.execution.model_cls - execution = self.model.execution.model_cls( - id=self.execution_id, - deployment_id=self.deployment_id, - workflow_id=self.workflow_id, - blueprint_id=self.blueprint_id, - status=execution_cls.PENDING, - parameters=self.parameters, - ) - self.model.execution.store(execution) + self._create_execution() def __repr__(self): return ( @@ -83,6 +75,18 @@ class WorkflowContext(logger.LoggerMixin): 'execution_id={self.execution_id})'.format( name=self.__class__.__name__, self=self)) + def _create_execution(self): + execution_cls = self.model.execution.model_cls + execution = self.model.execution.model_cls( + id=self.execution_id, + deployment_id=self.deployment_id, + workflow_id=self.workflow_id, + blueprint_id=self.blueprint_id, + status=execution_cls.PENDING, + parameters=self.parameters, + ) + self.model.execution.store(execution) + @property def blueprint_id(self): """ http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/391d6fc5/aria/events/__init__.py ---------------------------------------------------------------------- diff --git a/aria/events/__init__.py b/aria/events/__init__.py index a158308..2e88733 100644 --- a/aria/events/__init__.py +++ b/aria/events/__init__.py @@ -47,7 +47,7 @@ on_failure_task_signal = signal('failure_task_signal') # workflow engine workflow signals: start_workflow_signal = signal('start_workflow_signal') on_cancelling_workflow_signal = signal('on_cancelling_workflow_signal') -on_cancel_workflow_signal = signal('on_cancel_workflow_signal') +on_cancelled_workflow_signal = signal('on_cancelled_workflow_signal') on_success_workflow_signal = signal('on_success_workflow_signal') on_failure_workflow_signal = signal('on_failure_workflow_signal') http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/391d6fc5/aria/events/builtin_event_handler.py ---------------------------------------------------------------------- diff --git a/aria/events/builtin_event_handler.py b/aria/events/builtin_event_handler.py index 11ee744..3d73f2b 100644 --- a/aria/events/builtin_event_handler.py +++ b/aria/events/builtin_event_handler.py @@ -30,7 +30,7 @@ from . import ( start_workflow_signal, on_success_workflow_signal, on_failure_workflow_signal, - on_cancel_workflow_signal, + on_cancelled_workflow_signal, on_cancelling_workflow_signal, sent_task_signal, start_task_signal, @@ -96,7 +96,7 @@ def _workflow_succeeded(workflow_context, *args, **kwargs): workflow_context.execution = execution -@on_cancel_workflow_signal.connect +@on_cancelled_workflow_signal.connect def _workflow_cancelled(workflow_context, *args, **kwargs): execution = workflow_context.execution execution.status = execution.CANCELLED http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/391d6fc5/aria/events/workflow_engine_event_handler.py ---------------------------------------------------------------------- diff --git a/aria/events/workflow_engine_event_handler.py b/aria/events/workflow_engine_event_handler.py index 4067674..7df11d1 100644 --- a/aria/events/workflow_engine_event_handler.py +++ b/aria/events/workflow_engine_event_handler.py @@ -28,7 +28,7 @@ from . import ( start_workflow_signal, on_success_workflow_signal, on_failure_workflow_signal, - on_cancel_workflow_signal, + on_cancelled_workflow_signal, on_cancelling_workflow_signal, ) @@ -64,7 +64,7 @@ def _success_workflow_handler(context, **kwargs): context.logger.debug('Event: Workflow success: {context.name}'.format(context=context)) -@on_cancel_workflow_signal.connect +@on_cancelled_workflow_signal.connect def _cancel_workflow_handler(context, **kwargs): context.logger.debug('Event: Workflow cancelled: {context.name}'.format(context=context)) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/391d6fc5/aria/storage/models.py ---------------------------------------------------------------------- diff --git a/aria/storage/models.py b/aria/storage/models.py index eb943aa..a63f0e9 100644 --- a/aria/storage/models.py +++ b/aria/storage/models.py @@ -207,11 +207,11 @@ class Execution(Model): deployment_id = Field(type=basestring) workflow_id = Field(type=basestring) blueprint_id = Field(type=basestring) + created_at = Field(type=datetime, default=datetime.utcnow) started_at = Field(type=datetime, default=None) ended_at = Field(type=datetime, default=None) error = Field(type=basestring, default=None) parameters = Field() - is_system_workflow = Field(type=bool, default=False) class Relationship(Model): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/391d6fc5/aria/storage/structures.py ---------------------------------------------------------------------- diff --git a/aria/storage/structures.py b/aria/storage/structures.py index c692d36..9a1524a 100644 --- a/aria/storage/structures.py +++ b/aria/storage/structures.py @@ -72,6 +72,7 @@ class Field(ValidatorMixin): self, type=None, choices=(), + validation_func=None, default=NO_DEFAULT, **kwargs): """ @@ -85,6 +86,7 @@ class Field(ValidatorMixin): self.type = type self.choices = choices self.default = default + self.validation_func = validation_func super(Field, self).__init__(**kwargs) def __get__(self, instance, owner): @@ -120,6 +122,8 @@ class Field(ValidatorMixin): self.validate_instance(name, value, self.type) if self.choices: self.validate_in_choice(name, value, self.choices) + if self.validation_func: + pass def _field_name(self, instance): """ http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/391d6fc5/aria/tools/validation.py ---------------------------------------------------------------------- diff --git a/aria/tools/validation.py b/aria/tools/validation.py index ea1dae7..a33f7a2 100644 --- a/aria/tools/validation.py +++ b/aria/tools/validation.py @@ -24,20 +24,9 @@ class ValidatorMixin(object): """ _ARGUMENT_TYPE_MESSAGE = '{name} argument must be {type} based, got {arg!r}' - _ACTION_MESSAGE = 'action arg options: {actions}, got {action}' _ARGUMENT_CHOICE_MESSAGE = '{name} argument must be in {choices}, got {arg!r}' @classmethod - def validate_actions(cls, action): - """ - Validate action is defined in the class ``ACTIONS`` attribute - """ - # todo: remove this and use validate choice - if action not in cls.ACTIONS: - raise TypeError(cls._ACTION_MESSAGE.format( - actions=cls.ACTIONS, action=action)) - - @classmethod def validate_in_choice(cls, name, argument, choices): """ Validate ``argument`` is in ``choices`` http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/391d6fc5/aria/workflows/core/engine.py ---------------------------------------------------------------------- diff --git a/aria/workflows/core/engine.py b/aria/workflows/core/engine.py index c4a049c..26354eb 100644 --- a/aria/workflows/core/engine.py +++ b/aria/workflows/core/engine.py @@ -58,22 +58,19 @@ class Engine(logger.LoggerMixin): else: time.sleep(0.1) if self._is_cancelling(): - events.on_cancel_workflow_signal.send(self._workflow_context) + events.on_cancelled_workflow_signal.send(self._workflow_context) else: events.on_success_workflow_signal.send(self._workflow_context) except BaseException as e: events.on_failure_workflow_signal.send(self._workflow_context, exception=e) raise - def cancel_request(self): + def cancel_execution(self): """ Send a cancel request to the engine. If execution already started, execution status will be modified to 'cancelling' status. If execution is in pending mode, execution status - will be modified to 'cancelled' directly. If execution is in one if its ended state, an - AriaEngineError will be raised + will be modified to 'cancelled' directly. """ - if self._workflow_context.execution.status not in models.Execution.ACTIVE_STATES: - raise exceptions.AriaEngineError('Execution already ended') events.on_cancelling_workflow_signal.send(self._workflow_context) def _is_cancelling(self): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/391d6fc5/tests/context/test_workflow.py ---------------------------------------------------------------------- diff --git a/tests/context/test_workflow.py b/tests/context/test_workflow.py index 2e19aa2..b40a66d 100644 --- a/tests/context/test_workflow.py +++ b/tests/context/test_workflow.py @@ -13,6 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +from datetime import datetime + import pytest from aria import context, application_model_storage @@ -32,6 +34,7 @@ class TestWorkflowContext(object): assert execution.blueprint_id == models.BLUEPRINT_ID assert execution.status == storage.execution.model_cls.PENDING assert execution.parameters == {} + assert execution.created_at <= datetime.utcnow() def test_subsequent_workflow_context_creation_do_not_fail(self, storage): self._create_ctx(storage) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/391d6fc5/tests/workflows/core/test_engine.py ---------------------------------------------------------------------- diff --git a/tests/workflows/core/test_engine.py b/tests/workflows/core/test_engine.py index 748281e..3ddbaf0 100644 --- a/tests/workflows/core/test_engine.py +++ b/tests/workflows/core/test_engine.py @@ -94,7 +94,7 @@ class BaseTest(object): events.start_workflow_signal.connect(start_workflow_handler) events.on_success_workflow_signal.connect(success_workflow_handler) events.on_failure_workflow_signal.connect(failure_workflow_handler) - events.on_cancel_workflow_signal.connect(cancel_workflow_handler) + events.on_cancelled_workflow_signal.connect(cancel_workflow_handler) events.sent_task_signal.connect(sent_task_handler) try: yield @@ -102,7 +102,7 @@ class BaseTest(object): events.start_workflow_signal.disconnect(start_workflow_handler) events.on_success_workflow_signal.disconnect(success_workflow_handler) events.on_failure_workflow_signal.disconnect(failure_workflow_handler) - events.on_cancel_workflow_signal.disconnect(cancel_workflow_handler) + events.on_cancelled_workflow_signal.disconnect(cancel_workflow_handler) events.sent_task_signal.disconnect(sent_task_handler) @pytest.fixture(scope='function') @@ -152,6 +152,10 @@ class TestEngine(BaseTest): assert workflow_context.states == ['start', 'success'] assert workflow_context.exception is None assert 'sent_task_signal_calls' not in global_test_holder + execution = workflow_context.execution + assert execution.started_at <= execution.ended_at <= datetime.utcnow() + assert execution.error is None + assert execution.status == models.Execution.TERMINATED def test_single_task_successful_execution(self, workflow_context, executor): @workflow @@ -177,6 +181,10 @@ class TestEngine(BaseTest): assert workflow_context.states == ['start', 'failure'] assert isinstance(workflow_context.exception, exceptions.ExecutorException) assert global_test_holder.get('sent_task_signal_calls') == 1 + execution = workflow_context.execution + assert execution.started_at <= execution.ended_at <= datetime.utcnow() + assert execution.error is not None + assert execution.status == models.Execution.FAILED def test_two_tasks_execution_order(self, workflow_context, executor): @workflow @@ -225,22 +233,16 @@ class TestEngine(BaseTest): t = threading.Thread(target=eng.execute) t.start() time.sleep(1) - eng.cancel_request() + eng.cancel_execution() t.join(timeout=30) assert workflow_context.states == ['start', 'cancel'] assert workflow_context.exception is None invocations = global_test_holder.get('invocations', []) assert 0 < len(invocations) < number_of_tasks - - def test_invalid_cancel_ended_execution(self, workflow_context, executor): - @workflow - def mock_workflow(**_): - pass - eng = self._execute(workflow_func=mock_workflow, - workflow_context=workflow_context, - executor=executor) - with pytest.raises(exceptions.AriaEngineError): - eng.cancel_request() + execution = workflow_context.execution + assert execution.started_at <= execution.ended_at <= datetime.utcnow() + assert execution.error is None + assert execution.status == models.Execution.CANCELLED def test_cancel_pending_execution(self, workflow_context, executor): @workflow @@ -249,10 +251,9 @@ class TestEngine(BaseTest): eng = self._engine(workflow_func=mock_workflow, workflow_context=workflow_context, executor=executor) - eng.cancel_request() - # sanity to verify previous cancel request actually did something - with pytest.raises(exceptions.AriaEngineError): - eng.cancel_request() + eng.cancel_execution() + execution = workflow_context.execution + assert execution.status == models.Execution.CANCELLED class TestRetries(BaseTest):
