ARIA-79-concurrent-modifications
Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/96581d9f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/96581d9f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/96581d9f

Branch: refs/heads/ARIA-106-Create-sqla-logging-handler
Commit: 96581d9f9b00736d3c83ecda20d2e9f81073d4da
Parents: 787d7e7
Author: Dan Kilman <d...@gigaspaces.com>
Authored: Mon Jan 30 16:49:00 2017 +0200
Committer: mxmrlv <mxm...@gmail.com>
Committed: Fri Feb 17 21:12:36 2017 +0200

----------------------------------------------------------------------
 aria/.pylintrc                                  |   2 +-
 aria/orchestrator/workflows/executor/process.py | 162 +++++++++++-------
 aria/storage/instrumentation.py                 |  73 +++++++--
 aria/storage/modeling/instance_elements.py      |   5 +-
 aria/storage/sql_mapi.py                        |   4 +
 aria/storage_initializer.py                     |   1 -
 tests/mock/models.py                            |   2 -
 ...process_executor_concurrent_modifications.py | 164 +++++++++++++++++++
 tests/requirements.txt                          |   3 +-
 tests/storage/test_structures.py                |   1 -
 10 files changed, 338 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/96581d9f/aria/.pylintrc
----------------------------------------------------------------------
diff --git a/aria/.pylintrc b/aria/.pylintrc
index ee4d0ef..b7656a3 100644
--- a/aria/.pylintrc
+++ b/aria/.pylintrc
@@ -250,7 +250,7 @@ docstring-min-length=-1
 [ELIF]
 
 # Maximum number of nested blocks for function / method body
-max-nested-blocks=5
+max-nested-blocks=7
 
 [SIMILARITIES]


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/96581d9f/aria/orchestrator/workflows/executor/process.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py
index 560ac43..75bbbce 100644
--- a/aria/orchestrator/workflows/executor/process.py
+++ b/aria/orchestrator/workflows/executor/process.py
@@ -53,6 +53,8 @@ _IS_WIN = os.name == 'nt'
 _INT_FMT = 'I'
 _INT_SIZE = struct.calcsize(_INT_FMT)
 
+UPDATE_TRACKED_CHANGES_FAILED_STR = \
+    'Some changes failed writing to storage. For more info refer to the log.'
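The two constants retained as context above (_INT_FMT, _INT_SIZE) are the entire wire format between the executor and its task subprocesses: each message is a native unsigned-int length header followed by a jsonpickle payload. A minimal sketch of that framing, with an illustrative payload (not an actual executor message):

    import struct

    _INT_FMT = 'I'
    _INT_SIZE = struct.calcsize(_INT_FMT)  # typically 4 bytes

    payload = b'{"type": "started", "task_id": "42"}'  # illustrative
    frame = struct.pack(_INT_FMT, len(payload)) + payload

    # a reader consumes the fixed-size header first, then exactly that many bytes
    (length,) = struct.unpack(_INT_FMT, frame[:_INT_SIZE])
    assert frame[_INT_SIZE:_INT_SIZE + length] == payload

Relying on native byte order and int size is safe here only because the parent and its subprocesses always run on the same host.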
 class ProcessExecutor(base.BaseExecutor):
@@ -74,6 +76,13 @@ class ProcessExecutor(base.BaseExecutor):
         # Contains reference to all currently running tasks
         self._tasks = {}
 
+        self._request_handlers = {
+            'started': self._handle_task_started_request,
+            'succeeded': self._handle_task_succeeded_request,
+            'failed': self._handle_task_failed_request,
+            'apply_tracked_changes': self._handle_apply_tracked_changes_request
+        }
+
         # Server socket used to accept task status messages from subprocesses
         self._server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
         self._server_socket.bind(('localhost', 0))
@@ -131,58 +140,6 @@ class ProcessExecutor(base.BaseExecutor):
     def _remove_task(self, task_id):
         return self._tasks.pop(task_id)
 
-    def _listener(self):
-        # Notify __init__ method this thread has actually started
-        self._listener_started.put(True)
-        while not self._stopped:
-            try:
-                # Accept messages written to the server socket
-                with contextlib.closing(self._server_socket.accept()[0]) as connection:
-                    message = self._recv_message(connection)
-                    message_type = message['type']
-                    if message_type == 'closed':
-                        break
-                    task_id = message['task_id']
-                    if message_type == 'started':
-                        self._task_started(self._tasks[task_id])
-                    elif message_type == 'apply_tracked_changes':
-                        task = self._tasks[task_id]
-                        instrumentation.apply_tracked_changes(
-                            tracked_changes=message['tracked_changes'],
-                            model=task.context.model)
-                    elif message_type == 'succeeded':
-                        task = self._remove_task(task_id)
-                        instrumentation.apply_tracked_changes(
-                            tracked_changes=message['tracked_changes'],
-                            model=task.context.model)
-                        self._task_succeeded(task)
-                    elif message_type == 'failed':
-                        task = self._remove_task(task_id)
-                        instrumentation.apply_tracked_changes(
-                            tracked_changes=message['tracked_changes'],
-                            model=task.context.model)
-                        self._task_failed(task, exception=message['exception'])
-                    else:
-                        raise RuntimeError('Invalid state')
-            except BaseException as e:
-                self.logger.debug('Error in process executor listener: {0}'.format(e))
-
-    def _recv_message(self, connection):
-        message_len, = struct.unpack(_INT_FMT, self._recv_bytes(connection, _INT_SIZE))
-        return jsonpickle.loads(self._recv_bytes(connection, message_len))
-
-    @staticmethod
-    def _recv_bytes(connection, count):
-        result = io.BytesIO()
-        while True:
-            if not count:
-                return result.getvalue()
-            read = connection.recv(count)
-            if not read:
-                return result.getvalue()
-            result.write(read)
-            count -= len(read)
-
     def _check_closed(self):
         if self._stopped:
             raise RuntimeError('Executor closed')
@@ -231,6 +188,90 @@ class ProcessExecutor(base.BaseExecutor):
                                  os.pathsep,
                                  env.get('PYTHONPATH', ''))
 
+    def _listener(self):
+        # Notify __init__ method this thread has actually started
+        self._listener_started.put(True)
+        while not self._stopped:
+            try:
+                with self._accept_request() as (request, response):
+                    request_type = request['type']
+                    if request_type == 'closed':
+                        break
+                    request_handler = self._request_handlers.get(request_type)
+                    if not request_handler:
+                        raise RuntimeError('Invalid request type: {0}'.format(request_type))
+                    request_handler(task_id=request['task_id'], request=request, response=response)
+            except BaseException as e:
+                self.logger.debug('Error in process executor listener: {0}'.format(e))
+
+    @contextlib.contextmanager
+    def _accept_request(self):
+        with contextlib.closing(self._server_socket.accept()[0]) as connection:
+            message = _recv_message(connection)
+            response = {}
+            yield message, response
+            _send_message(connection, response)
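The listener rewrite swaps the old if/elif chain for the _request_handlers lookup table set up in __init__, and _accept_request guarantees that every request gets a response frame once its handler returns. A hedged, standalone reduction of the dispatch pattern (the handler body is a stand-in, not executor code):

    def handle_started(task_id, request, response):
        print('task {0} started'.format(task_id))  # stand-in for real handling

    request_handlers = {'started': handle_started}

    def dispatch(request):
        response = {}
        handler = request_handlers.get(request['type'])
        if not handler:
            raise RuntimeError('Invalid request type: {0}'.format(request['type']))
        handler(task_id=request['task_id'], request=request, response=response)
        return response  # an empty response means success

    dispatch({'type': 'started', 'task_id': 7})

Handlers mutate the shared response dict, so a failure can be reported back to the subprocess without changing the transport.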
+    def _handle_task_started_request(self, task_id, **kwargs):
+        self._task_started(self._tasks[task_id])
+
+    def _handle_task_succeeded_request(self, task_id, request, **kwargs):
+        task = self._remove_task(task_id)
+        try:
+            self._apply_tracked_changes(task, request)
+        except BaseException as e:
+            e.message += UPDATE_TRACKED_CHANGES_FAILED_STR
+            self._task_failed(task, exception=e)
+        else:
+            self._task_succeeded(task)
+
+    def _handle_task_failed_request(self, task_id, request, **kwargs):
+        task = self._remove_task(task_id)
+        try:
+            self._apply_tracked_changes(task, request)
+        except BaseException as e:
+            e.message += 'Task failed due to {0}.'.format(request['exception']) + \
+                         UPDATE_TRACKED_CHANGES_FAILED_STR
+            self._task_failed(task, exception=e)
+        else:
+            self._task_failed(task, exception=request['exception'])
+
+    def _handle_apply_tracked_changes_request(self, task_id, request, response):
+        task = self._tasks[task_id]
+        try:
+            self._apply_tracked_changes(task, request)
+        except BaseException as e:
+            response['exception'] = exceptions.wrap_if_needed(e)
+
+    @staticmethod
+    def _apply_tracked_changes(task, request):
+        instrumentation.apply_tracked_changes(
+            tracked_changes=request['tracked_changes'],
+            model=task.context.model)
+
+
+def _send_message(connection, message):
+    data = jsonpickle.dumps(message)
+    connection.send(struct.pack(_INT_FMT, len(data)))
+    connection.sendall(data)
+
+
+def _recv_message(connection):
+    message_len, = struct.unpack(_INT_FMT, _recv_bytes(connection, _INT_SIZE))
+    return jsonpickle.loads(_recv_bytes(connection, message_len))
+
+
+def _recv_bytes(connection, count):
+    result = io.BytesIO()
+    while True:
+        if not count:
+            return result.getvalue()
+        read = connection.recv(count)
+        if not read:
+            return result.getvalue()
+        result.write(read)
+        count -= len(read)
+
 
 class _Messenger(object):
 
@@ -261,17 +302,16 @@ class _Messenger(object):
         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
         sock.connect(('localhost', self.port))
         try:
-            data = jsonpickle.dumps({
+            _send_message(sock, {
                 'type': type,
                 'task_id': self.task_id,
                 'exception': exceptions.wrap_if_needed(exception),
                 'tracked_changes': tracked_changes
             })
-            sock.send(struct.pack(_INT_FMT, len(data)))
-            sock.sendall(data)
-            # send message will block until the server side closes the connection socket
-            # because we want it to be synchronous
-            sock.recv(1)
+            response = _recv_message(sock)
+            response_exception = response.get('exception')
+            if response_exception:
+                raise response_exception
         finally:
             sock.close()
 
@@ -294,12 +334,17 @@ def _patch_session(ctx, messenger, instrument):
         messenger.apply_tracked_changes(instrument.tracked_changes)
         instrument.clear()
 
+    def patched_rollback():
+        # Rollback is performed in the parent process when a commit fails
+        pass
+
     # when autoflush is set to true (the default), refreshing an object will trigger
     # an auto flush by sqlalchemy, this autoflush will attempt to commit changes made so
     # far on the session. this is not the desired behavior in the subprocess
     session.autoflush = False
 
     session.commit = patched_commit
+    session.rollback = patched_rollback
     session.refresh = patched_refresh
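The subprocess side of the protocol changed accordingly: _Messenger used to block on a bare sock.recv(1) purely as a synchronization barrier; now it reads a structured response and re-raises any exception the parent hit while applying tracked changes. A reduced sketch of that round trip — it reuses the module-level _send_message/_recv_message helpers introduced above, and the port/request values are illustrative:

    import socket

    def notify_parent(port, request):
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.connect(('localhost', port))
        try:
            _send_message(sock, request)    # length-prefixed jsonpickle frame
            response = _recv_message(sock)  # blocks until the parent replies
            if response.get('exception'):
                # a storage failure in the parent surfaces inside the subprocess
                raise response['exception']
        finally:
            sock.close()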
@@ -324,7 +369,6 @@ def _main():
     # This is required for the instrumentation to work properly.
     # See docstring of `remove_mutable_association_listener` for further details
     storage_type.remove_mutable_association_listener()
-
     with instrumentation.track_changes() as instrument:
         try:
             ctx = context_dict['context_cls'].deserialize_from_dict(**context_dict['context'])


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/96581d9f/aria/storage/instrumentation.py
----------------------------------------------------------------------
diff --git a/aria/storage/instrumentation.py b/aria/storage/instrumentation.py
index 57fe9bd..8fb9d82 100644
--- a/aria/storage/instrumentation.py
+++ b/aria/storage/instrumentation.py
@@ -14,11 +14,16 @@
 # limitations under the License.
 
 import copy
+import json
 
+import sqlalchemy
 import sqlalchemy.event
 
+from . import exceptions
+
 from .modeling import model as _model
 
+_VERSION_ID_COL = 'version'
 _STUB = object()
 _INSTRUMENTED = {
     _model.Node.runtime_properties: dict
@@ -92,6 +97,11 @@ class _Instrumentation(object):
         mapi_name = instrumented_class.__modelname__
         tracked_instances = self.tracked_changes.setdefault(mapi_name, {})
         tracked_attributes = tracked_instances.setdefault(target.id, {})
+        if hasattr(target, _VERSION_ID_COL):
+            # We want to keep track of the initial version id so it can be compared
+            # with the committed version id when the tracked changes are applied
+            tracked_attributes.setdefault(_VERSION_ID_COL,
+                                          _Value(_STUB, getattr(target, _VERSION_ID_COL)))
         for attribute_name, attribute_type in instrumented_attributes.items():
             if attribute_name not in tracked_attributes:
                 initial = getattr(target, attribute_name)
@@ -143,7 +153,11 @@ class _Value(object):
         return self.initial == other.initial and self.current == other.current
 
     def __hash__(self):
-        return hash(self.initial) ^ hash(self.current)
+        return hash((self.initial, self.current))
+
+    @property
+    def dict(self):
+        return {'initial': self.initial, 'current': self.current}.copy()
 
 
 def apply_tracked_changes(tracked_changes, model):
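A side note on the __hash__ fix: XOR is commutative, so the old implementation hashed a _Value whose initial and current were swapped to exactly the same result; hashing the tuple keeps the order significant. Illustration:

    initial, current = 'old', 'new'

    # old implementation: symmetric, so swapped values always collide
    assert hash(initial) ^ hash(current) == hash(current) ^ hash(initial)

    # new implementation: tuple hashing is order-sensitive, so the two
    # orderings hash differently (barring ordinary hash collisions)
    print(hash((initial, current)) == hash((current, initial)))  # False in practice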
\n' + 'The successful updates were: \n ' + '{0}'.format(json.dumps(successfully_updated_changes, indent=4))) + + raise + + +def _validate_version_id(instance, mapi): + version_id = sqlalchemy.inspect(instance).committed_state.get(_VERSION_ID_COL) + # There are two version conflict code paths: + # 1. The instance committed state loaded already holds a newer version, + # in this case, we manually raise the error + # 2. The UPDATE statement is executed with version validation and sqlalchemy + # will raise a StateDataError if there is a version mismatch. + if version_id and getattr(instance, _VERSION_ID_COL) != version_id: + object_version_id = getattr(instance, _VERSION_ID_COL) + mapi._session.rollback() + raise exceptions.StorageError( + 'Version conflict: committed and object {0} differ ' + '[committed {0}={1}, object {0}={2}]' + .format(_VERSION_ID_COL, + version_id, + object_version_id)) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/96581d9f/aria/storage/modeling/instance_elements.py ---------------------------------------------------------------------- diff --git a/aria/storage/modeling/instance_elements.py b/aria/storage/modeling/instance_elements.py index 0666c8a..2b102f1 100644 --- a/aria/storage/modeling/instance_elements.py +++ b/aria/storage/modeling/instance_elements.py @@ -553,7 +553,7 @@ class PolicyBase(structure.ModelMixin): # region many-to-one relationships @declared_attr - def service_instnce(cls): + def service_instance(cls): return cls.many_to_one_relationship('service_instance') # region many-to-many relationships @@ -851,6 +851,8 @@ class NodeBase(structure.ModelMixin): * :code:`relationships`: List of :class:`Relationship` """ __tablename__ = 'node' + version = Column(Integer, nullable=False) + __mapper_args__ = {'version_id_col': version} __private_fields__ = ['service_instance_fk', 'host_fk', @@ -878,7 +880,6 @@ class NodeBase(structure.ModelMixin): runtime_properties = Column(aria_types.Dict) scaling_groups = Column(aria_types.List) state = Column(Text, nullable=False) - version = Column(Integer, default=1) @declared_attr def plugins(cls): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/96581d9f/aria/storage/sql_mapi.py ---------------------------------------------------------------------- diff --git a/aria/storage/sql_mapi.py b/aria/storage/sql_mapi.py index b80ac8e..2711aee 100644 --- a/aria/storage/sql_mapi.py +++ b/aria/storage/sql_mapi.py @@ -23,6 +23,7 @@ from sqlalchemy import ( orm, ) from sqlalchemy.exc import SQLAlchemyError +from sqlalchemy.orm.exc import StaleDataError from aria.utils.collections import OrderedDict from . 
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/96581d9f/aria/storage/modeling/instance_elements.py
----------------------------------------------------------------------
diff --git a/aria/storage/modeling/instance_elements.py b/aria/storage/modeling/instance_elements.py
index 0666c8a..2b102f1 100644
--- a/aria/storage/modeling/instance_elements.py
+++ b/aria/storage/modeling/instance_elements.py
@@ -553,7 +553,7 @@ class PolicyBase(structure.ModelMixin):
     # region many-to-one relationships
 
     @declared_attr
-    def service_instnce(cls):
+    def service_instance(cls):
         return cls.many_to_one_relationship('service_instance')
 
     # region many-to-many relationships
@@ -851,6 +851,8 @@ class NodeBase(structure.ModelMixin):
     * :code:`relationships`: List of :class:`Relationship`
     """
     __tablename__ = 'node'
+    version = Column(Integer, nullable=False)
+    __mapper_args__ = {'version_id_col': version}
 
     __private_fields__ = ['service_instance_fk',
                           'host_fk',
@@ -878,7 +880,6 @@ class NodeBase(structure.ModelMixin):
     runtime_properties = Column(aria_types.Dict)
     scaling_groups = Column(aria_types.List)
     state = Column(Text, nullable=False)
-    version = Column(Integer, default=1)
 
     @declared_attr
     def plugins(cls):


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/96581d9f/aria/storage/sql_mapi.py
----------------------------------------------------------------------
diff --git a/aria/storage/sql_mapi.py b/aria/storage/sql_mapi.py
index b80ac8e..2711aee 100644
--- a/aria/storage/sql_mapi.py
+++ b/aria/storage/sql_mapi.py
@@ -23,6 +23,7 @@ from sqlalchemy import (
     orm,
 )
 from sqlalchemy.exc import SQLAlchemyError
+from sqlalchemy.orm.exc import StaleDataError
 
 from aria.utils.collections import OrderedDict
 from . import (
@@ -162,6 +163,9 @@ class SQLAlchemyModelAPI(api.ModelAPI):
         """
         try:
             self._session.commit()
+        except StaleDataError as e:
+            self._session.rollback()
+            raise exceptions.StorageError('Version conflict: {0}'.format(str(e)))
         except (SQLAlchemyError, ValueError) as e:
             self._session.rollback()
             raise exceptions.StorageError('SQL Storage error: {0}'.format(str(e)))
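The version_id_col mapper argument on NodeBase above is stock SQLAlchemy versioning: every UPDATE is emitted with a WHERE version = <value read> guard and increments the counter, and a zero-row match raises StaleDataError — which the new except clause in sql_mapi.py translates into a StorageError. A minimal standalone demonstration (in-memory SQLite, SQLAlchemy 1.x-era APIs; not ARIA code):

    from sqlalchemy import Column, Integer, Text, create_engine
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy.orm import sessionmaker
    from sqlalchemy.orm.exc import StaleDataError

    Base = declarative_base()

    class Node(Base):
        __tablename__ = 'node'
        id = Column(Integer, primary_key=True)
        state = Column(Text)
        version = Column(Integer, nullable=False)
        __mapper_args__ = {'version_id_col': version}

    engine = create_engine('sqlite://')
    Base.metadata.create_all(engine)
    Session = sessionmaker(bind=engine)

    session1, session2 = Session(), Session()
    session1.add(Node(id=1, state='initial'))
    session1.commit()                    # SQLAlchemy sets version=1 itself

    node1 = session1.query(Node).get(1)
    node2 = session2.query(Node).get(1)  # both writers read version 1

    node1.state = 'first writer'
    session1.commit()                    # UPDATE ... WHERE version = 1; bumps to 2

    node2.state = 'second writer'
    try:
        session2.commit()                # UPDATE ... WHERE version = 1 matches nothing
    except StaleDataError:
        session2.rollback()
        print('version conflict detected')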
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/96581d9f/aria/storage_initializer.py
----------------------------------------------------------------------
diff --git a/aria/storage_initializer.py b/aria/storage_initializer.py
index aea5ec8..175ec22 100644
--- a/aria/storage_initializer.py
+++ b/aria/storage_initializer.py
@@ -95,7 +95,6 @@ def _create_node_instance(service_instance, node, node_model):
         service_instance=service_instance,
         name=node_model.id,
         runtime_properties={},
-        version=None,
         node_template=node,
         state='',
         scaling_groups=[]


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/96581d9f/tests/mock/models.py
----------------------------------------------------------------------
diff --git a/tests/mock/models.py b/tests/mock/models.py
index 047526a..301fc01 100644
--- a/tests/mock/models.py
+++ b/tests/mock/models.py
@@ -50,7 +50,6 @@ def get_dependency_node_instance(dependency_node, deployment):
         name=DEPENDENCY_NODE_INSTANCE_NAME,
         service_instance=deployment,
         runtime_properties={'ip': '1.1.1.1'},
-        version=None,
         node_template=dependency_node,
         state='',
         scaling_groups=[]
@@ -96,7 +95,6 @@ def get_dependent_node_instance(dependent_node, deployment):
         name=DEPENDENT_NODE_INSTANCE_NAME,
         service_instance=deployment,
         runtime_properties={},
-        version=None,
         node_template=dependent_node,
         state='',
         scaling_groups=[],


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/96581d9f/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py b/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py
new file mode 100644
index 0000000..ad3cb76
--- /dev/null
+++ b/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py
@@ -0,0 +1,164 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import time
+
+import fasteners
+import pytest
+
+from aria.storage.exceptions import StorageError
+from aria.orchestrator import events
+from aria.orchestrator.workflows.exceptions import ExecutorException
+from aria.orchestrator.workflows import api
+from aria.orchestrator.workflows.executor import process
+from aria.orchestrator import workflow, operation
+
+import tests
+from tests.orchestrator.context import execute as execute_workflow
+from tests.orchestrator.workflows.helpers import events_collector
+from tests import mock
+from tests import storage
+
+
+def test_concurrent_modification_on_task_succeeded(context, executor, lock_files):
+    _test(context, executor, lock_files, _test_task_succeeded, expected_failure=True)
+
+
+@operation
+def _test_task_succeeded(ctx, lock_files, key, first_value, second_value):
+    _concurrent_update(lock_files, ctx.node, key, first_value, second_value)
+
+
+def test_concurrent_modification_on_task_failed(context, executor, lock_files):
+    _test(context, executor, lock_files, _test_task_failed, expected_failure=True)
+
+
+@operation
+def _test_task_failed(ctx, lock_files, key, first_value, second_value):
+    first = _concurrent_update(lock_files, ctx.node, key, first_value, second_value)
+    if not first:
+        raise RuntimeError('MESSAGE')
+
+
+def test_concurrent_modification_on_update_and_refresh(context, executor, lock_files):
+    _test(context, executor, lock_files, _test_update_and_refresh, expected_failure=False)
+
+
+@operation
+def _test_update_and_refresh(ctx, lock_files, key, first_value, second_value):
+    node = ctx.node
+    first = _concurrent_update(lock_files, node, key, first_value, second_value)
+    if not first:
+        try:
+            ctx.model.node.update(node)
+        except StorageError as e:
+            assert 'Version conflict' in str(e)
+            ctx.model.node.refresh(node)
+        else:
+            raise RuntimeError('Unexpected')
+
+
+def _test(context, executor, lock_files, func, expected_failure):
+    def _node(ctx):
+        return ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_INSTANCE_NAME)
+
+    op_name = mock.operations.NODE_OPERATIONS_INSTALL[0]
+
+    key = 'key'
+    first_value = 'value1'
+    second_value = 'value2'
+    inputs = {
+        'lock_files': lock_files,
+        'key': key,
+        'first_value': first_value,
+        'second_value': second_value
+    }
+
+    node = _node(context)
+    node.interfaces = [mock.models.get_interface(
+        op_name,
+        operation_kwargs=dict(implementation='{0}.{1}'.format(__name__, func.__name__))
+    )]
+    context.model.node.update(node)
+
+    @workflow
+    def mock_workflow(graph, **_):
+        graph.add_tasks(
+            api.task.OperationTask.node(instance=node, name=op_name, inputs=inputs),
+            api.task.OperationTask.node(instance=node, name=op_name, inputs=inputs)
+        )
+
+    signal = events.on_failure_task_signal
+    with events_collector(signal) as collected:
+        try:
+            execute_workflow(mock_workflow, context, executor)
+        except ExecutorException:
+            pass
+
+    props = _node(context).runtime_properties
+    assert props[key] == first_value
+
+    exceptions = [event['kwargs']['exception'] for event in collected.get(signal, [])]
+    if expected_failure:
+        assert exceptions
+        exception = exceptions[-1]
+        assert isinstance(exception, StorageError)
+        assert 'Version conflict' in str(exception)
+    else:
+        assert not exceptions
+
+
+@pytest.fixture
+def executor():
+    result = process.ProcessExecutor(python_path=[tests.ROOT_DIR])
+    yield result
+    result.close()
+
+
+@pytest.fixture
+def context(tmpdir):
+    result = mock.context.simple(str(tmpdir))
+    yield result
+    storage.release_sqlite_storage(result.model)
+
+
+@pytest.fixture
+def lock_files(tmpdir):
+    return str(tmpdir.join('first_lock_file')), str(tmpdir.join('second_lock_file'))
+
+
+def _concurrent_update(lock_files, node, key, first_value, second_value):
+
+    locker1 = fasteners.InterProcessLock(lock_files[0])
+    locker2 = fasteners.InterProcessLock(lock_files[1])
+
+    first = locker1.acquire(blocking=False)
+
+    if first:
+        # Give both processes a chance to acquire their locks
+        while locker2.acquire(blocking=False):
+            locker2.release()
+            time.sleep(0.01)
+    else:
+        locker2.acquire()
+
+    node.runtime_properties[key] = first_value if first else second_value
+
+    if first:
+        locker1.release()
+    else:
+        with locker1:
+            locker2.release()
+
+    return first
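A note on why _test's assertions hold: _concurrent_update uses the two lock files to force a real overlap between the two subprocesses. The process that wins the first lock spins until its peer holds the second one, so both writes happen concurrently on separate in-memory copies of the same node. The first writer then releases its lock and returns, so its tracked changes reach the parent and bump the node's version; the second writer still blocks on `with locker1:` until the first has released it, so its own changes arrive carrying a stale version id and are rejected. That is why storage ends up holding first_value, and why, in the expected_failure scenarios, the collected failure event is a StorageError whose message contains 'Version conflict'.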
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/96581d9f/tests/requirements.txt
----------------------------------------------------------------------
diff --git a/tests/requirements.txt b/tests/requirements.txt
index 0e4740f..2f0245a 100644
--- a/tests/requirements.txt
+++ b/tests/requirements.txt
@@ -11,8 +11,9 @@
 # limitations under the License.
 
 testtools
+fasteners==0.13.0
 mock==1.0.1
 pylint==1.6.4
 pytest==3.0.2
 pytest-cov==2.3.1
-pytest-mock==1.2
\ No newline at end of file
+pytest-mock==1.2


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/96581d9f/tests/storage/test_structures.py
----------------------------------------------------------------------
diff --git a/tests/storage/test_structures.py b/tests/storage/test_structures.py
index 30f0064..666256e 100644
--- a/tests/storage/test_structures.py
+++ b/tests/storage/test_structures.py
@@ -125,7 +125,6 @@ def test_relationship_model_ordering(context):
         name='new_node_instance',
         runtime_properties={},
         service_instance=service_instance,
-        version=None,
         node_template=new_node_template,
         state='',
         scaling_groups=[]
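The fasteners==0.13.0 pin above exists solely for the new concurrency test, which relies on a small slice of the library's real API: InterProcessLock with blocking and non-blocking acquire. In brief (the lock path is illustrative):

    import tempfile

    import fasteners

    lock = fasteners.InterProcessLock(tempfile.mktemp())

    if lock.acquire(blocking=False):  # returns immediately: True if we got it
        try:
            pass  # "first" process: do work while holding the lock
        finally:
            lock.release()
    else:
        lock.acquire()  # "second" process: block until the lock frees up

InterProcessLock also works as a context manager, which is what `with locker1:` uses in the test above.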