extended error msg
Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/5af1a8f2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/5af1a8f2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/5af1a8f2 Branch: refs/heads/ARIA-79-concurrent-storage-modifications Commit: 5af1a8f23237768516af4cca018cd418a9abe7b1 Parents: e6d68d7 Author: mxmrlv <mxm...@gmail.com> Authored: Thu Feb 16 14:49:19 2017 +0200 Committer: mxmrlv <mxm...@gmail.com> Committed: Thu Feb 16 16:24:00 2017 +0200 ---------------------------------------------------------------------- aria/orchestrator/workflows/executor/process.py | 8 +--- aria/storage/instrumentation.py | 40 +++++++++++------ aria/storage/modeling/instance_elements.py | 5 ++- aria/storage_initializer.py | 1 - tests/mock/models.py | 2 - ...process_executor_concurrent_modifications.py | 45 +++++++++++--------- tests/storage/test_structures.py | 1 - 7 files changed, 58 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5af1a8f2/aria/orchestrator/workflows/executor/process.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py index 655d75d..84f5f58 100644 --- a/aria/orchestrator/workflows/executor/process.py +++ b/aria/orchestrator/workflows/executor/process.py @@ -227,6 +227,8 @@ class ProcessExecutor(base.BaseExecutor): try: self._apply_tracked_changes(task, request) except BaseException as e: + e.message = \ + '{0} Remote task execution failed due: {1}'.format(str(e), request['exception']) self._task_failed(task, exception=e) else: self._task_failed(task, exception=request['exception']) @@ -364,9 +366,7 @@ def _main(): # This is required for the instrumentation work properly. # See docstring of `remove_mutable_association_listener` for further details storage_type.remove_mutable_association_listener() - with instrumentation.track_changes() as instrument: - # import pydevd; pydevd.settrace('localhost') try: ctx = context_dict['context_cls'].deserialize_from_dict(**context_dict['context']) _patch_session(ctx=ctx, messenger=messenger, instrument=instrument) @@ -377,10 +377,6 @@ def _main(): task_func(ctx=ctx, **operation_inputs) messenger.succeeded(tracked_changes=instrument.tracked_changes) except BaseException as e: - # import traceback - # with open('/home/maxim/Desktop/tmp_log', 'wr+') as f: - # traceback.print_exc(file=f) - messenger.failed(exception=e, tracked_changes=instrument.tracked_changes) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5af1a8f2/aria/storage/instrumentation.py ---------------------------------------------------------------------- diff --git a/aria/storage/instrumentation.py b/aria/storage/instrumentation.py index 41818b6..b3ca24a 100644 --- a/aria/storage/instrumentation.py +++ b/aria/storage/instrumentation.py @@ -14,6 +14,7 @@ # limitations under the License. import copy +import json import sqlalchemy import sqlalchemy.event @@ -22,7 +23,7 @@ from . import exceptions from .modeling import model as _model -_VERSION_ID_COL = 'version_id' +_VERSION_ID_COL = 'version' _STUB = object() _INSTRUMENTED = { _model.Node.runtime_properties: dict @@ -162,18 +163,31 @@ def apply_tracked_changes(tracked_changes, model): returned by calling ``track_changes()`` :param model: The model storage used to actually apply the changes """ - for mapi_name, tracked_instances in tracked_changes.items(): - mapi = getattr(model, mapi_name) - for instance_id, tracked_attributes in tracked_instances.items(): - instance = None - for attribute_name, value in tracked_attributes.items(): - if value.initial != value.current: - if not instance: - instance = mapi.get(instance_id) - setattr(instance, attribute_name, value.current) - if instance: - _validate_version_id(instance, mapi) - mapi.update(instance) + successfully_updated_instances = dict() + try: + for mapi_name, tracked_instances in tracked_changes.items(): + successfully_updated_instances[mapi_name] = list() + mapi = getattr(model, mapi_name) + for instance_id, tracked_attributes in tracked_instances.items(): + instance = None + for attribute_name, value in tracked_attributes.items(): + if value.initial != value.current: + if not instance: + instance = mapi.get(instance_id) + setattr(instance, attribute_name, value.current) + if instance: + _validate_version_id(instance, mapi) + mapi.update(instance) + successfully_updated_instances[mapi_name].append(instance_id) + except BaseException as e: + for key, value in successfully_updated_instances.items(): + if not value: + del successfully_updated_instances[key] + e.message = \ + 'Registering all the changes to the storage has failed. ' \ + 'The instances that were successfully updated : {0} .' \ + 'This was caused by {1}.'.format(json.dumps(successfully_updated_instances), str(e)) + raise e def _validate_version_id(instance, mapi): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5af1a8f2/aria/storage/modeling/instance_elements.py ---------------------------------------------------------------------- diff --git a/aria/storage/modeling/instance_elements.py b/aria/storage/modeling/instance_elements.py index 0666c8a..2b102f1 100644 --- a/aria/storage/modeling/instance_elements.py +++ b/aria/storage/modeling/instance_elements.py @@ -553,7 +553,7 @@ class PolicyBase(structure.ModelMixin): # region many-to-one relationships @declared_attr - def service_instnce(cls): + def service_instance(cls): return cls.many_to_one_relationship('service_instance') # region many-to-many relationships @@ -851,6 +851,8 @@ class NodeBase(structure.ModelMixin): * :code:`relationships`: List of :class:`Relationship` """ __tablename__ = 'node' + version = Column(Integer, nullable=False) + __mapper_args__ = {'version_id_col': version} __private_fields__ = ['service_instance_fk', 'host_fk', @@ -878,7 +880,6 @@ class NodeBase(structure.ModelMixin): runtime_properties = Column(aria_types.Dict) scaling_groups = Column(aria_types.List) state = Column(Text, nullable=False) - version = Column(Integer, default=1) @declared_attr def plugins(cls): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5af1a8f2/aria/storage_initializer.py ---------------------------------------------------------------------- diff --git a/aria/storage_initializer.py b/aria/storage_initializer.py index aea5ec8..175ec22 100644 --- a/aria/storage_initializer.py +++ b/aria/storage_initializer.py @@ -95,7 +95,6 @@ def _create_node_instance(service_instance, node, node_model): service_instance=service_instance, name=node_model.id, runtime_properties={}, - version=None, node_template=node, state='', scaling_groups=[] http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5af1a8f2/tests/mock/models.py ---------------------------------------------------------------------- diff --git a/tests/mock/models.py b/tests/mock/models.py index 047526a..301fc01 100644 --- a/tests/mock/models.py +++ b/tests/mock/models.py @@ -50,7 +50,6 @@ def get_dependency_node_instance(dependency_node, deployment): name=DEPENDENCY_NODE_INSTANCE_NAME, service_instance=deployment, runtime_properties={'ip': '1.1.1.1'}, - version=None, node_template=dependency_node, state='', scaling_groups=[] @@ -96,7 +95,6 @@ def get_dependent_node_instance(dependent_node, deployment): name=DEPENDENT_NODE_INSTANCE_NAME, service_instance=deployment, runtime_properties={}, - version=None, node_template=dependent_node, state='', scaling_groups=[], http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5af1a8f2/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py b/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py index e46921e..40deedd 100644 --- a/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py +++ b/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py @@ -38,7 +38,7 @@ def test_concurrent_modification_on_task_succeeded(context, executor, lock_files @operation def _test_task_succeeded(ctx, lock_files, key, first_value, second_value): - _concurrent_update(lock_files, ctx.node_instance, key, first_value, second_value) + _concurrent_update(lock_files, ctx.node, key, first_value, second_value) def test_concurrent_modification_on_task_failed(context, executor, lock_files): @@ -47,7 +47,7 @@ def test_concurrent_modification_on_task_failed(context, executor, lock_files): @operation def _test_task_failed(ctx, lock_files, key, first_value, second_value): - first = _concurrent_update(lock_files, ctx.node_instance, key, first_value, second_value) + first = _concurrent_update(lock_files, ctx.node, key, first_value, second_value) if not first: raise RuntimeError('MESSAGE') @@ -58,21 +58,23 @@ def test_concurrent_modification_on_update_and_refresh(context, executor, lock_f @operation def _test_update_and_refresh(ctx, lock_files, key, first_value, second_value): - node_instance = ctx.node_instance - first = _concurrent_update(lock_files, node_instance, key, first_value, second_value) + node = ctx.node + first = _concurrent_update(lock_files, node, key, first_value, second_value) if not first: try: - ctx.model.node_instance.update(node_instance) + ctx.model.node.update(node) except StorageError as e: assert 'Version conflict' in str(e) - ctx.model.node_instance.refresh(node_instance) + ctx.model.node.refresh(node) else: raise RuntimeError('Unexpected') def _test(context, executor, lock_files, func, expected_failure): - def _node_instance(ctx): - return ctx.model.node_instance.get_by_name(mock.models.DEPENDENCY_NODE_INSTANCE_NAME) + def _node(ctx): + return ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_INSTANCE_NAME) + + op_name = mock.operations.NODE_OPERATIONS_INSTALL[0] key = 'key' first_value = 'value1' @@ -84,14 +86,19 @@ def _test(context, executor, lock_files, func, expected_failure): 'second_value': second_value } + node = _node(context) + node.interfaces = [mock.models.get_interface( + op_name, + operation_kwargs=dict(implementation='{0}.{1}'.format(__name__, func.__name__)) + )] + context.model.node.update(node) + @workflow - def mock_workflow(ctx, graph): - op = 'test.op' - node_instance = _node_instance(ctx) - node_instance.node.operations[op] = {'operation': '{0}.{1}'.format(__name__, func.__name__)} + def mock_workflow(graph, **_): graph.add_tasks( - api.task.OperationTask.node_instance(instance=node_instance, name=op, inputs=inputs), - api.task.OperationTask.node_instance(instance=node_instance, name=op, inputs=inputs)) + api.task.OperationTask.node(instance=node, name=op_name, inputs=inputs), + api.task.OperationTask.node(instance=node, name=op_name, inputs=inputs) + ) signal = events.on_failure_task_signal with events_collector(signal) as collected: @@ -100,7 +107,7 @@ def _test(context, executor, lock_files, func, expected_failure): except ExecutorException: pass - props = _node_instance(context).runtime_properties + props = _node(context).runtime_properties assert props[key] == first_value exceptions = [event['kwargs']['exception'] for event in collected.get(signal, [])] @@ -122,7 +129,7 @@ def executor(): @pytest.fixture def context(tmpdir): - result = mock.context.simple(storage.get_sqlite_api_kwargs(str(tmpdir))) + result = mock.context.simple(str(tmpdir)) yield result storage.release_sqlite_storage(result.model) @@ -132,7 +139,7 @@ def lock_files(tmpdir): return str(tmpdir.join('first_lock_file')), str(tmpdir.join('second_lock_file')) -def _concurrent_update(lock_files, node_instance, key, first_value, second_value): +def _concurrent_update(lock_files, node, key, first_value, second_value): locker1 = fasteners.InterProcessLock(lock_files[0]) locker2 = fasteners.InterProcessLock(lock_files[1]) @@ -143,11 +150,11 @@ def _concurrent_update(lock_files, node_instance, key, first_value, second_value # Give chance for both processes to acquire locks while locker2.acquire(blocking=False): locker2.release() - time.sleep(0.1) + time.sleep(0.01) else: locker2.acquire() - node_instance.runtime_properties[key] = first_value if first else second_value + node.runtime_properties[key] = first_value if first else second_value if first: locker1.release() http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5af1a8f2/tests/storage/test_structures.py ---------------------------------------------------------------------- diff --git a/tests/storage/test_structures.py b/tests/storage/test_structures.py index 30f0064..666256e 100644 --- a/tests/storage/test_structures.py +++ b/tests/storage/test_structures.py @@ -125,7 +125,6 @@ def test_relationship_model_ordering(context): name='new_node_instance', runtime_properties={}, service_instance=service_instance, - version=None, node_template=new_node_template, state='', scaling_groups=[]