Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-414-Current-events-handler-mechanism-relies-on-sqlalchemy b9d94f576 -> c77c0e448 (forced update)
ARIA-414 Current events handler mechanism relies on sqlalchemy

Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/c77c0e44
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/c77c0e44
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/c77c0e44

Branch: refs/heads/ARIA-414-Current-events-handler-mechanism-relies-on-sqlalchemy
Commit: c77c0e448d1241689d0154cee2dc5b44e72a83cd
Parents: e71ddc9
Author: max-orlov <[email protected]>
Authored: Tue Nov 28 17:19:48 2017 +0200
Committer: max-orlov <[email protected]>
Committed: Wed Nov 29 19:04:22 2017 +0200

----------------------------------------------------------------------
 aria/orchestrator/context/operation.py | 7 -
 aria/orchestrator/context/workflow.py | 6 -
 .../workflows/core/events_handler.py | 164 ++++++++++---------
 aria/orchestrator/workflows/executor/base.py | 5 +-
 aria/orchestrator/workflows/executor/dry.py | 51 +++---
 .../orchestrator/workflows/executor/__init__.py | 20 +--
 .../workflows/executor/test_executor.py | 59 +++----
 .../workflows/executor/test_process_executor.py | 21 +--
 tests/orchestrator/workflows/helpers.py | 18 ++
 9 files changed, 183 insertions(+), 168 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c77c0e44/aria/orchestrator/context/operation.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/operation.py b/aria/orchestrator/context/operation.py
index 8613ec3..2961aa2 100644
--- a/aria/orchestrator/context/operation.py
+++ b/aria/orchestrator/context/operation.py
@@ -18,7 +18,6 @@ Operation contexts.
""" import threading -from contextlib import contextmanager import aria from aria.utils import file @@ -106,12 +105,6 @@ class BaseOperationContext(common.BaseContext): self.model.log._session.remove() self.model.log._engine.dispose() - @property - @contextmanager - def persist_changes(self): - yield - self.model.task.update(self.task) - class NodeOperationContext(BaseOperationContext): """ http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c77c0e44/aria/orchestrator/context/workflow.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/workflow.py b/aria/orchestrator/context/workflow.py index 738d2fd..ba66a78 100644 --- a/aria/orchestrator/context/workflow.py +++ b/aria/orchestrator/context/workflow.py @@ -93,12 +93,6 @@ class WorkflowContext(BaseContext): } ) - @property - @contextmanager - def persist_changes(self): - yield - self._model.execution.update(self.execution) - class _CurrentContext(threading.local): """ http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c77c0e44/aria/orchestrator/workflows/core/events_handler.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/events_handler.py b/aria/orchestrator/workflows/core/events_handler.py index 473475e..2867216 100644 --- a/aria/orchestrator/workflows/core/events_handler.py +++ b/aria/orchestrator/workflows/core/events_handler.py @@ -28,126 +28,130 @@ from ... 
import exceptions @events.sent_task_signal.connect def _task_sent(ctx, *args, **kwargs): - with ctx.persist_changes: - ctx.task.status = ctx.task.SENT + task = ctx.task + task.status = ctx.task.SENT + ctx.model.task.update(task) @events.start_task_signal.connect def _task_started(ctx, *args, **kwargs): - with ctx.persist_changes: - ctx.task.started_at = datetime.utcnow() - ctx.task.status = ctx.task.STARTED - _update_node_state_if_necessary(ctx, is_transitional=True) + task = ctx.task + ctx.task.started_at = datetime.utcnow() + ctx.task.status = ctx.task.STARTED + _update_node_state_if_necessary(ctx, is_transitional=True) + ctx.model.task.update(task) @events.on_failure_task_signal.connect def _task_failed(ctx, exception, *args, **kwargs): - with ctx.persist_changes: - should_retry = all([ - not isinstance(exception, exceptions.TaskAbortException), - ctx.task.attempts_count < ctx.task.max_attempts or - ctx.task.max_attempts == ctx.task.INFINITE_RETRIES, - # ignore_failure check here means the task will not be retried and it will be marked - # as failed. The engine will also look at ignore_failure so it won't fail the - # workflow. - not ctx.task.ignore_failure - ]) - if should_retry: - retry_interval = None - if isinstance(exception, exceptions.TaskRetryException): - retry_interval = exception.retry_interval - if retry_interval is None: - retry_interval = ctx.task.retry_interval - ctx.task.status = ctx.task.RETRYING - ctx.task.attempts_count += 1 - ctx.task.due_at = datetime.utcnow() + timedelta(seconds=retry_interval) - else: - ctx.task.ended_at = datetime.utcnow() - ctx.task.status = ctx.task.FAILED + task = ctx.task + should_retry = all([ + not isinstance(exception, exceptions.TaskAbortException), + task.attempts_count < task.max_attempts or + task.max_attempts == task.INFINITE_RETRIES, + # ignore_failure check here means the task will not be retried and it will be marked + # as failed. 
The engine will also look at ignore_failure so it won't fail the + # workflow. + not task.ignore_failure + ]) + if should_retry: + retry_interval = None + if isinstance(exception, exceptions.TaskRetryException): + retry_interval = exception.retry_interval + if retry_interval is None: + retry_interval = ctx.task.retry_interval + task.status = task.RETRYING + task.attempts_count += 1 + task.due_at = datetime.utcnow() + timedelta(seconds=retry_interval) + else: + task.ended_at = datetime.utcnow() + task.status = task.FAILED + ctx.model.task.update(task) @events.on_success_task_signal.connect def _task_succeeded(ctx, *args, **kwargs): - with ctx.persist_changes: - ctx.task.ended_at = datetime.utcnow() - ctx.task.status = ctx.task.SUCCESS - ctx.task.attempts_count += 1 + task = ctx.task + ctx.task.ended_at = datetime.utcnow() + ctx.task.status = ctx.task.SUCCESS + ctx.task.attempts_count += 1 - _update_node_state_if_necessary(ctx) + _update_node_state_if_necessary(ctx) + ctx.model.task.update(task) @events.start_workflow_signal.connect def _workflow_started(workflow_context, *args, **kwargs): - with workflow_context.persist_changes: - execution = workflow_context.execution - # the execution may already be in the process of cancelling - if execution.status in (execution.CANCELLING, execution.CANCELLED): - return - execution.status = execution.STARTED - execution.started_at = datetime.utcnow() + execution = workflow_context.execution + # the execution may already be in the process of cancelling + if execution.status in (execution.CANCELLING, execution.CANCELLED): + return + execution.status = execution.STARTED + execution.started_at = datetime.utcnow() + workflow_context.model.execution.update(execution) @events.on_failure_workflow_signal.connect def _workflow_failed(workflow_context, exception, *args, **kwargs): - with workflow_context.persist_changes: - execution = workflow_context.execution - execution.error = str(exception) - execution.status = execution.FAILED - 
execution.ended_at = datetime.utcnow() + execution = workflow_context.execution + execution.error = str(exception) + execution.status = execution.FAILED + execution.ended_at = datetime.utcnow() + workflow_context.model.execution.update(execution) @events.on_success_workflow_signal.connect def _workflow_succeeded(workflow_context, *args, **kwargs): - with workflow_context.persist_changes: - execution = workflow_context.execution - execution.status = execution.SUCCEEDED - execution.ended_at = datetime.utcnow() + execution = workflow_context.execution + execution.status = execution.SUCCEEDED + execution.ended_at = datetime.utcnow() + workflow_context.model.execution.update(execution) @events.on_cancelled_workflow_signal.connect def _workflow_cancelled(workflow_context, *args, **kwargs): - with workflow_context.persist_changes: - execution = workflow_context.execution - # _workflow_cancelling function may have called this function already - if execution.status == execution.CANCELLED: - return - # the execution may have already been finished - elif execution.status in (execution.SUCCEEDED, execution.FAILED): - _log_tried_to_cancel_execution_but_it_already_ended(workflow_context, execution.status) - else: - execution.status = execution.CANCELLED - execution.ended_at = datetime.utcnow() + execution = workflow_context.execution + # _workflow_cancelling function may have called this function already + if execution.status == execution.CANCELLED: + return + # the execution may have already been finished + elif execution.status in (execution.SUCCEEDED, execution.FAILED): + _log_tried_to_cancel_execution_but_it_already_ended(workflow_context, execution.status) + else: + execution.status = execution.CANCELLED + execution.ended_at = datetime.utcnow() + workflow_context.model.execution.update(execution) @events.on_resume_workflow_signal.connect def _workflow_resume(workflow_context, retry_failed=False, *args, **kwargs): - with workflow_context.persist_changes: - execution = 
workflow_context.execution - execution.status = execution.PENDING - # Any non ended task would be put back to pending state + execution = workflow_context.execution + execution.status = execution.PENDING + # Any non ended task would be put back to pending state + for task in execution.tasks: + if not task.has_ended(): + task.status = task.PENDING + + if retry_failed: for task in execution.tasks: - if not task.has_ended(): + if task.status == task.FAILED and not task.ignore_failure: + task.attempts_count = 0 task.status = task.PENDING - - if retry_failed: - for task in execution.tasks: - if task.status == task.FAILED and not task.ignore_failure: - task.attempts_count = 0 - task.status = task.PENDING + workflow_context.model.execution.update(execution) @events.on_cancelling_workflow_signal.connect def _workflow_cancelling(workflow_context, *args, **kwargs): - with workflow_context.persist_changes: - execution = workflow_context.execution - if execution.status == execution.PENDING: - return _workflow_cancelled(workflow_context=workflow_context) - # the execution may have already been finished - elif execution.status in (execution.SUCCEEDED, execution.FAILED): - _log_tried_to_cancel_execution_but_it_already_ended(workflow_context, execution.status) - else: - execution.status = execution.CANCELLING + execution = workflow_context.execution + if execution.status == execution.PENDING: + return _workflow_cancelled(workflow_context=workflow_context) + # the execution may have already been finished + elif execution.status in (execution.SUCCEEDED, execution.FAILED): + _log_tried_to_cancel_execution_but_it_already_ended(workflow_context, execution.status) + else: + execution.status = execution.CANCELLING + workflow_context.model.execution.update(execution) def _update_node_state_if_necessary(ctx, is_transitional=False): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c77c0e44/aria/orchestrator/workflows/executor/base.py 
---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/base.py b/aria/orchestrator/workflows/executor/base.py index e7d03ea..d550b53 100644 --- a/aria/orchestrator/workflows/executor/base.py +++ b/aria/orchestrator/workflows/executor/base.py @@ -71,5 +71,6 @@ class BaseExecutor(logger.LoggerMixin): class StubTaskExecutor(BaseExecutor): # pylint: disable=abstract-method def execute(self, ctx, *args, **kwargs): - with ctx.persist_changes: - ctx.task.status = ctx.task.SUCCESS + task = ctx.task + task.status = ctx.task.SUCCESS + ctx.model.task.update(task) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c77c0e44/aria/orchestrator/workflows/executor/dry.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/dry.py b/aria/orchestrator/workflows/executor/dry.py index 9314e5d..628d9cb 100644 --- a/aria/orchestrator/workflows/executor/dry.py +++ b/aria/orchestrator/workflows/executor/dry.py @@ -27,28 +27,29 @@ class DryExecutor(base.BaseExecutor): Dry task executor: prints task information without causing any side effects. 
""" def execute(self, ctx): - with ctx.persist_changes: - # updating the task manually instead of calling self._task_started(task), - # to avoid any side effects raising that event might cause - ctx.task.started_at = datetime.utcnow() - ctx.task.status = ctx.task.STARTED - - dry_msg = '<dry> {name} {task.interface_name}.{task.operation_name} {suffix}' - logger = ctx.logger.info if ctx.task.function else ctx.logger.debug - - if hasattr(ctx.task.actor, 'source_node'): - name = '{source_node.name}->{target_node.name}'.format( - source_node=ctx.task.actor.source_node, target_node=ctx.task.actor.target_node) - else: - name = ctx.task.actor.name - - if ctx.task.function: - logger(dry_msg.format(name=name, task=ctx.task, suffix='started...')) - logger(dry_msg.format(name=name, task=ctx.task, suffix='successful')) - else: - logger(dry_msg.format(name=name, task=ctx.task, suffix='has no implementation')) - - # updating the task manually instead of calling self._task_succeeded(task), - # to avoid any side effects raising that event might cause - ctx.task.ended_at = datetime.utcnow() - ctx.task.status = ctx.task.SUCCESS + task = ctx.task + # updating the task manually instead of calling self._task_started(task), + # to avoid any side effects raising that event might cause + task.started_at = datetime.utcnow() + task.status = task.STARTED + + dry_msg = '<dry> {name} {task.interface_name}.{task.operation_name} {suffix}' + logger = ctx.logger.info if task.function else ctx.logger.debug + + if hasattr(task.actor, 'source_node'): + name = '{source_node.name}->{target_node.name}'.format( + source_node=task.actor.source_node, target_node=task.actor.target_node) + else: + name = task.actor.name + + if task.function: + logger(dry_msg.format(name=name, task=ctx.task, suffix='started...')) + logger(dry_msg.format(name=name, task=ctx.task, suffix='successful')) + else: + logger(dry_msg.format(name=name, task=ctx.task, suffix='has no implementation')) + + # updating the task manually 
instead of calling self._task_succeeded(task), + # to avoid any side effects raising that event might cause + task.ended_at = datetime.utcnow() + task.status = task.SUCCESS + ctx.model.task.update(task) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c77c0e44/tests/orchestrator/workflows/executor/__init__.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/executor/__init__.py b/tests/orchestrator/workflows/executor/__init__.py index 99d0b39..5c25fa2 100644 --- a/tests/orchestrator/workflows/executor/__init__.py +++ b/tests/orchestrator/workflows/executor/__init__.py @@ -25,10 +25,12 @@ class MockContext(object): INSTRUMENTATION_FIELDS = BaseContext.INSTRUMENTATION_FIELDS - def __init__(self, storage, task_kwargs=None): + def __init__(self, storage=None, task_kwargs=None): self.logger = logging.getLogger('mock_logger') self._task_kwargs = task_kwargs or {} - self._storage = storage + import mock + self._storage = storage or mock.MagicMock() + self._storage_kwargs = self._storage.serialization_dict if storage else None self.task = MockTask(storage, **task_kwargs) self.states = [] self.exception = None @@ -38,7 +40,7 @@ class MockContext(object): return { 'context_cls': self.__class__, 'context': { - 'storage_kwargs': self._storage.serialization_dict, + 'storage_kwargs': self._storage_kwargs, 'task_kwargs': self._task_kwargs } } @@ -55,13 +57,11 @@ class MockContext(object): @classmethod def instantiate_from_dict(cls, storage_kwargs=None, task_kwargs=None): - return cls(storage=aria.application_model_storage(**(storage_kwargs or {})), - task_kwargs=(task_kwargs or {})) - - @property - @contextmanager - def persist_changes(self): - yield + if storage_kwargs: + return cls(storage=aria.application_model_storage(**(storage_kwargs or {})), + task_kwargs=task_kwargs) + else: + return cls(task_kwargs=task_kwargs) class MockActor(object): 
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c77c0e44/tests/orchestrator/workflows/executor/test_executor.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/executor/test_executor.py b/tests/orchestrator/workflows/executor/test_executor.py index 32a68e0..11aacec 100644 --- a/tests/orchestrator/workflows/executor/test_executor.py +++ b/tests/orchestrator/workflows/executor/test_executor.py @@ -28,6 +28,7 @@ except ImportError: import aria from aria.modeling import models from aria.orchestrator import events +from aria.orchestrator.workflows.core import events_handler from aria.orchestrator.workflows.executor import ( thread, process, @@ -35,38 +36,40 @@ from aria.orchestrator.workflows.executor import ( ) import tests +from .. import helpers from . import MockContext + def _get_function(func): return '{module}.{func.__name__}'.format(module=__name__, func=func) def execute_and_assert(executor, storage=None): expected_value = 'value' - successful_task = MockContext( + successful_ctx = MockContext( storage, task_kwargs=dict(function=_get_function(mock_successful_task)) ) - failing_task = MockContext( + failing_ctx = MockContext( storage, task_kwargs=dict(function=_get_function(mock_failing_task)) ) - task_with_inputs = MockContext( + task_with_inputs_ctx = MockContext( storage, task_kwargs=dict(function=_get_function(mock_task_with_input), arguments={'input': models.Argument.wrap('input', 'value')}) ) - for task in [successful_task, failing_task, task_with_inputs]: - executor.execute(task) + for ctx in [successful_ctx, failing_ctx, task_with_inputs_ctx]: + executor.execute(ctx) @retrying.retry(stop_max_delay=10000, wait_fixed=100) def assertion(): - assert successful_task.states == ['start', 'success'] - assert failing_task.states == ['start', 'failure'] - assert task_with_inputs.states == ['start', 'failure'] - assert isinstance(failing_task.exception, MockException) - assert 
isinstance(task_with_inputs.exception, MockException) - assert task_with_inputs.exception.message == expected_value + assert successful_ctx.states == ['start', 'success'] + assert failing_ctx.states == ['start', 'failure'] + assert task_with_inputs_ctx.states == ['start', 'failure'] + assert isinstance(failing_ctx.exception, MockException) + assert isinstance(task_with_inputs_ctx.exception, MockException) + assert task_with_inputs_ctx.exception.message == expected_value assertion() @@ -130,20 +133,20 @@ def process_executor(): @pytest.fixture(autouse=True) def register_signals(): - def start_handler(task, *args, **kwargs): - task.states.append('start') - - def success_handler(task, *args, **kwargs): - task.states.append('success') - - def failure_handler(task, exception, *args, **kwargs): - task.states.append('failure') - task.exception = exception - - events.start_task_signal.connect(start_handler) - events.on_success_task_signal.connect(success_handler) - events.on_failure_task_signal.connect(failure_handler) - yield - events.start_task_signal.disconnect(start_handler) - events.on_success_task_signal.disconnect(success_handler) - events.on_failure_task_signal.disconnect(failure_handler) + def start_handler(ctx, *args, **kwargs): + ctx.states.append('start') + + def success_handler(ctx, *args, **kwargs): + ctx.states.append('success') + + def failure_handler(ctx, exception, *args, **kwargs): + ctx.states.append('failure') + ctx.exception = exception + with helpers.disconnect_event_handlers(): + events.start_task_signal.connect(start_handler) + events.on_success_task_signal.connect(success_handler) + events.on_failure_task_signal.connect(failure_handler) + yield + events.start_task_signal.disconnect(start_handler) + events.on_success_task_signal.disconnect(success_handler) + events.on_failure_task_signal.disconnect(failure_handler) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c77c0e44/tests/orchestrator/workflows/executor/test_process_executor.py 
---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/executor/test_process_executor.py b/tests/orchestrator/workflows/executor/test_process_executor.py index e050d18..94c3585 100644 --- a/tests/orchestrator/workflows/executor/test_process_executor.py +++ b/tests/orchestrator/workflows/executor/test_process_executor.py @@ -33,11 +33,12 @@ from aria.orchestrator.workflows.executor import process import tests.storage import tests.resources -from tests.helpers import FilesystemDataHolder from tests.fixtures import ( # pylint: disable=unused-import plugins_dir, plugin_manager, ) +from tests.helpers import FilesystemDataHolder +from ..helpers import disconnect_event_handlers from . import MockContext @@ -48,7 +49,6 @@ class TestProcessExecutor(object): model, task_kwargs=dict(function='mock_plugin1.operation', plugin_fk=mock_plugin.id) ) - executor.execute(ctx) error = queue.get(timeout=60) # tests/resources/plugins/mock-plugin1 is the plugin installed @@ -85,7 +85,6 @@ while True: model.argument.put(holder_path_argument) model.argument.put(script_path_argument) ctx = MockContext( - model, task_kwargs=dict( function='{0}.{1}'.format(__name__, freezing_task.__name__), arguments=dict(holder_path=holder_path_argument, @@ -124,13 +123,15 @@ def queue(): def handler(_, exception=None, **kwargs): _queue.put(exception) - events.on_success_task_signal.connect(handler) - events.on_failure_task_signal.connect(handler) - try: - yield _queue - finally: - events.on_success_task_signal.disconnect(handler) - events.on_failure_task_signal.disconnect(handler) + with disconnect_event_handlers(): + + events.on_success_task_signal.connect(handler) + events.on_failure_task_signal.connect(handler) + try: + yield _queue + finally: + events.on_success_task_signal.disconnect(handler) + events.on_failure_task_signal.disconnect(handler) @pytest.fixture 
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c77c0e44/tests/orchestrator/workflows/helpers.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/helpers.py b/tests/orchestrator/workflows/helpers.py index 8e3f9b1..b059542 100644 --- a/tests/orchestrator/workflows/helpers.py +++ b/tests/orchestrator/workflows/helpers.py @@ -15,6 +15,9 @@ from contextlib import contextmanager +from aria.orchestrator import events +from aria.orchestrator.workflows.core import events_handler + @contextmanager def events_collector(*signals): @@ -35,3 +38,18 @@ def events_collector(*signals): finally: for signal in signals: signal.disconnect(handlers[signal]) + + +@contextmanager +def disconnect_event_handlers(): + # disconnect the system events handler + events.start_task_signal.disconnect(events_handler._task_started) + events.on_success_task_signal.disconnect(events_handler._task_succeeded) + events.on_failure_task_signal.disconnect(events_handler._task_failed) + try: + yield + finally: + # reconnect the system events handler + events.start_task_signal.connect(events_handler._task_started) + events.on_success_task_signal.connect(events_handler._task_succeeded) + events.on_failure_task_signal.connect(events_handler._task_failed) \ No newline at end of file
