Added another SQLAlchemy (sqla) event handler - needs rework, since we currently listen to the init event and not to commit
Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/41cd1bee Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/41cd1bee Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/41cd1bee Branch: refs/heads/ARIA-213-Sporadic-tests-failures-over-locked-database-issue Commit: 41cd1bee4e09d1872113a1ddae2d4cc17387c5e1 Parents: c8896ab Author: max-orlov <[email protected]> Authored: Wed May 10 18:38:36 2017 +0300 Committer: max-orlov <[email protected]> Committed: Thu May 11 14:28:49 2017 +0300 ---------------------------------------------------------------------- aria/orchestrator/context/common.py | 3 +- aria/orchestrator/workflows/executor/process.py | 43 +++++++--- aria/storage/instrumentation.py | 83 ++++++++++++-------- aria/storage/sql_mapi.py | 2 +- examples/hello-world/scripts/configure.sh | 4 +- examples/hello-world/scripts/start.sh | 6 +- examples/hello-world/scripts/stop.sh | 4 +- 7 files changed, 89 insertions(+), 56 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/41cd1bee/aria/orchestrator/context/common.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/common.py b/aria/orchestrator/context/common.py index c0047e9..0854a27 100644 --- a/aria/orchestrator/context/common.py +++ b/aria/orchestrator/context/common.py @@ -192,7 +192,6 @@ class BaseContext(object): def _render_resource(self, resource_content, variables): variables = variables or {} - if 'ctx' not in variables: - variables['ctx'] = self + variables.setdefault('ctx', self) resource_template = jinja2.Template(resource_content) return resource_template.render(variables) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/41cd1bee/aria/orchestrator/workflows/executor/process.py 
---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py index e464f7d..52da26d 100644 --- a/aria/orchestrator/workflows/executor/process.py +++ b/aria/orchestrator/workflows/executor/process.py @@ -229,6 +229,7 @@ class ProcessExecutor(base.BaseExecutor): def _apply_tracked_changes(task, request): instrumentation.apply_tracked_changes( tracked_changes=request['tracked_changes'], + new_instances=request['new_instances'], model=task.context.model) @@ -277,22 +278,28 @@ class _Messenger(object): """Task started message""" self._send_message(type='started') - def succeeded(self, tracked_changes): + def succeeded(self, tracked_changes, new_instances): """Task succeeded message""" - self._send_message(type='succeeded', tracked_changes=tracked_changes) + self._send_message( + type='succeeded', tracked_changes=tracked_changes, new_instances=new_instances) - def failed(self, tracked_changes, exception): + def failed(self, tracked_changes, new_instances, exception): """Task failed message""" - self._send_message(type='failed', tracked_changes=tracked_changes, exception=exception) + self._send_message(type='failed', + tracked_changes=tracked_changes, + new_instances=new_instances, + exception=exception) - def apply_tracked_changes(self, tracked_changes): - self._send_message(type='apply_tracked_changes', tracked_changes=tracked_changes) + def apply_tracked_changes(self, tracked_changes, new_instances): + self._send_message(type='apply_tracked_changes', + tracked_changes=tracked_changes, + new_instances=new_instances) def closed(self): """Executor closed message""" self._send_message(type='closed') - def _send_message(self, type, tracked_changes=None, exception=None): + def _send_message(self, type, tracked_changes=None, new_instances=None, exception=None): sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.connect(('localhost', self.port)) 
try: @@ -301,7 +308,8 @@ class _Messenger(object): 'task_id': self.task_id, 'exception': exceptions.wrap_if_needed(exception), 'traceback': exceptions.get_exception_as_string(*sys.exc_info()), - 'tracked_changes': tracked_changes + 'tracked_changes': tracked_changes, + 'new_instances': new_instances or {} }) response = _recv_message(sock) response_exception = response.get('exception') @@ -311,7 +319,7 @@ class _Messenger(object): sock.close() -def _patch_session(ctx, messenger, instrument): +def _patch_ctx(ctx, messenger, instrument): # model will be None only in tests that test the executor component directly if not ctx.model: return @@ -326,17 +334,23 @@ def _patch_session(ctx, messenger, instrument): original_refresh(target) def patched_commit(): - messenger.apply_tracked_changes(instrument.tracked_changes) + messenger.apply_tracked_changes(instrument.tracked_changes, instrument.new_instances) instrument.clear() def patched_rollback(): # Rollback is performed on parent process when commit fails pass + def patched_put(_): + # TODO: currently we need to add signal to the put event (or commit per model), but currently we just use the init event. + pass + # when autoflush is set to true (the default), refreshing an object will trigger # an auto flush by sqlalchemy, this autoflush will attempt to commit changes made so # far on the session. 
this is not the desired behavior in the subprocess session.autoflush = False + # for model_name in instrument.new_instances: + ctx.model.log.put = patched_put session.commit = patched_commit session.rollback = patched_rollback @@ -368,15 +382,18 @@ def _main(): try: messenger.started() ctx = context_dict['context_cls'].deserialize_from_dict(**context_dict['context']) - _patch_session(ctx=ctx, messenger=messenger, instrument=instrument) + _patch_ctx(ctx=ctx, messenger=messenger, instrument=instrument) task_func = imports.load_attribute(implementation) aria.install_aria_extensions() for decorate in process_executor.decorate(): task_func = decorate(task_func) task_func(ctx=ctx, **operation_inputs) - messenger.succeeded(tracked_changes=instrument.tracked_changes) + messenger.succeeded(tracked_changes=instrument.tracked_changes, + new_instances=instrument.new_instances) except BaseException as e: - messenger.failed(exception=e, tracked_changes=instrument.tracked_changes) + messenger.failed(exception=e, + tracked_changes=instrument.tracked_changes, + new_instances=instrument.new_instances) if __name__ == '__main__': http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/41cd1bee/aria/storage/instrumentation.py ---------------------------------------------------------------------- diff --git a/aria/storage/instrumentation.py b/aria/storage/instrumentation.py index 79f821a..6ca79ce 100644 --- a/aria/storage/instrumentation.py +++ b/aria/storage/instrumentation.py @@ -24,22 +24,17 @@ from ..storage.exceptions import StorageError _VERSION_ID_COL = 'version' _STUB = object() _INSTRUMENTED = { - # Node related stuff - _models.Node.runtime_properties: dict, - _models.Node.state: str, - - # # Task related stuff - _models.Task.status: str, - - # # Log related stuff - _models.Log.level: str, - _models.Log.msg: str, - _models.Log.traceback: str, - _models.Log.created_at: lambda date: date, - _models.Log.execution_fk: int, - _models.Log.task_fk: int, + 'modified': { + 
_models.Node.runtime_properties: dict, + _models.Node.state: str, + # _models.Task.status: str, + }, + 'new': (_models.Log, ) + } +_NEW_INSTANCE = 'NEW_INSTANCE' + def track_changes(instrumented=None): """Track changes in the specified model columns @@ -71,23 +66,45 @@ class _Instrumentation(object): def __init__(self, instrumented): self.tracked_changes = {} + self.new_instances = {} self.listeners = [] self._track_changes(instrumented) + self._new_instance_index = 0 + + @property + def _new_instance_id(self): + self._new_instance_index += 1 + return '{prefix}_{index}'.format(prefix=_NEW_INSTANCE, index=self._new_instance_index - 1) def _track_changes(self, instrumented): - instrumented_classes = {} - for instrumented_attribute, attribute_type in instrumented.items(): + instrumented_attribute_classes = {} + for instrumented_attribute, attribute_type in instrumented.get('modified', {}).items(): self._register_set_attribute_listener( instrumented_attribute=instrumented_attribute, attribute_type=attribute_type) instrumented_class = instrumented_attribute.parent.entity - instrumented_class_attributes = instrumented_classes.setdefault(instrumented_class, {}) + instrumented_class_attributes = instrumented_attribute_classes.setdefault( + instrumented_class, {}) instrumented_class_attributes[instrumented_attribute.key] = attribute_type - for instrumented_class, instrumented_attributes in instrumented_classes.items(): - self._register_instance_listeners( + for instrumented_class, instrumented_attributes in instrumented_attribute_classes.items(): + self._register_instance_attribute_listeners( instrumented_class=instrumented_class, instrumented_attributes=instrumented_attributes) + # instrument creation of new instances + for instrumented_class in instrumented.get('new', {}): + self._register_instance_listener(instrumented_class) + + def _register_instance_listener(self, instrumented_class): + def listener(target, *args, **_): + mapi_name = target.__modelname__ + 
tracked_instances = self.new_instances.setdefault(mapi_name, {}) + tracked_attributes = tracked_instances.setdefault(self._new_instance_id, {}) + tracked_attributes.update(**args[1]) + listener_args = (instrumented_class, 'init', listener) + sqlalchemy.event.listen(*listener_args) + self.listeners.append(listener_args) + def _register_set_attribute_listener(self, instrumented_attribute, attribute_type): def listener(target, value, *_): mapi_name = target.__modelname__ @@ -103,7 +120,7 @@ class _Instrumentation(object): sqlalchemy.event.listen(*listener_args, retval=True) self.listeners.append(listener_args) - def _register_instance_listeners(self, instrumented_class, instrumented_attributes): + def _register_instance_attribute_listeners(self, instrumented_class, instrumented_attributes): def listener(target, *_): mapi_name = instrumented_class.__modelname__ tracked_instances = self.tracked_changes.setdefault(mapi_name, {}) @@ -136,6 +153,8 @@ class _Instrumentation(object): else: self.tracked_changes.clear() + self.new_instances.clear() + def restore(self): """Remove all listeners registered by this instrumentation""" for listener_args in self.listeners: @@ -171,7 +190,7 @@ class _Value(object): return {'initial': self.initial, 'current': self.current}.copy() -def apply_tracked_changes(tracked_changes, model): +def apply_tracked_changes(tracked_changes, new_instances, model): """Write tracked changes back to the database using provided model storage :param tracked_changes: The ``tracked_changes`` attribute of the instrumentation context @@ -188,23 +207,21 @@ def apply_tracked_changes(tracked_changes, model): instance = None for attribute_name, value in tracked_attributes.items(): if value.initial != value.current: - if not instance: - # The object can be entirely new (Log is an example of this use case, - # its id is None (or 'null'), thus we need to create it from scratch, - # and not just update it. 
- instance = \ - mapi.model_cls() if instance_id == 'null' else mapi.get(instance_id) - + instance = instance or mapi.get(instance_id) setattr(instance, attribute_name, value.current) if instance: _validate_version_id(instance, mapi) - # This follows the same logic as the same comment regarding 'null' - if instance_id in ('null', None): - mapi.put(instance) - else: - mapi.update(instance) + mapi.update(instance) successfully_updated_changes[mapi_name][instance_id] = [ v.dict for v in tracked_attributes.values()] + + for mapi_name, new_instance in new_instances.items(): + successfully_updated_changes[mapi_name] = dict() + mapi = getattr(model, mapi_name) + for new_instance_kwargs in sorted(new_instance.values()): + instance = mapi.model_cls(**new_instance_kwargs) + mapi.put(instance) + successfully_updated_changes[mapi_name][instance.id] = new_instance_kwargs except BaseException: for key, value in successfully_updated_changes.items(): if not value: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/41cd1bee/aria/storage/sql_mapi.py ---------------------------------------------------------------------- diff --git a/aria/storage/sql_mapi.py b/aria/storage/sql_mapi.py index 730d007..a22c4b1 100644 --- a/aria/storage/sql_mapi.py +++ b/aria/storage/sql_mapi.py @@ -397,7 +397,7 @@ def init_storage(base_dir, filename='db.sqlite'): path=os.path.join(base_dir, filename)) - engine = create_engine(uri) + engine = create_engine(uri, ) session_factory = orm.sessionmaker(bind=engine) session = orm.scoped_session(session_factory=session_factory) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/41cd1bee/examples/hello-world/scripts/configure.sh ---------------------------------------------------------------------- diff --git a/examples/hello-world/scripts/configure.sh b/examples/hello-world/scripts/configure.sh index 6e93053..b55ec17 100755 --- a/examples/hello-world/scripts/configure.sh +++ b/examples/hello-world/scripts/configure.sh @@ -8,7 +8,7 @@ if 
[ -d ${PYTHON_FILE_SERVER_ROOT} ]; then echo "Removing file server root folder ${PYTHON_FILE_SERVER_ROOT}" rm -rf ${PYTHON_FILE_SERVER_ROOT} fi -#ctx logger info "Creating HTTP server root directory at ${PYTHON_FILE_SERVER_ROOT}" +ctx logger info "Creating HTTP server root directory at ${PYTHON_FILE_SERVER_ROOT}" mkdir -p ${PYTHON_FILE_SERVER_ROOT} @@ -17,7 +17,7 @@ cd ${PYTHON_FILE_SERVER_ROOT} index_path="index.html" image_path="images/aria-logo.png" -#ctx logger info "Downloading blueprint resources..." +ctx logger info "Downloading blueprint resources..." ctx download-resource-and-render ${PYTHON_FILE_SERVER_ROOT}/index.html ${index_path} ctx download-resource ${PYTHON_FILE_SERVER_ROOT}/aria-logo.png ${image_path} http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/41cd1bee/examples/hello-world/scripts/start.sh ---------------------------------------------------------------------- diff --git a/examples/hello-world/scripts/start.sh b/examples/hello-world/scripts/start.sh index 03c59d4..41b6ec7 100755 --- a/examples/hello-world/scripts/start.sh +++ b/examples/hello-world/scripts/start.sh @@ -6,16 +6,16 @@ TEMP_DIR="/tmp" PYTHON_FILE_SERVER_ROOT=${TEMP_DIR}/python-simple-http-webserver PID_FILE="server.pid" -#ctx logger info "Starting HTTP server from ${PYTHON_FILE_SERVER_ROOT}" +ctx logger info "Starting HTTP server from ${PYTHON_FILE_SERVER_ROOT}" port=$(ctx node properties port) cd ${PYTHON_FILE_SERVER_ROOT} -#ctx logger info "Starting SimpleHTTPServer" +ctx logger info "Starting SimpleHTTPServer" nohup python -m SimpleHTTPServer ${port} > /dev/null 2>&1 & echo $! 
> ${PID_FILE} -#ctx logger info "Waiting for server to launch on port ${port}" +ctx logger info "Waiting for server to launch on port ${port}" url="http://localhost:${port}" server_is_up() { http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/41cd1bee/examples/hello-world/scripts/stop.sh ---------------------------------------------------------------------- diff --git a/examples/hello-world/scripts/stop.sh b/examples/hello-world/scripts/stop.sh index 63e1971..5461caf 100755 --- a/examples/hello-world/scripts/stop.sh +++ b/examples/hello-world/scripts/stop.sh @@ -8,8 +8,8 @@ PID_FILE="server.pid" PID=`cat ${PYTHON_FILE_SERVER_ROOT}/${PID_FILE}` -#ctx logger info "Shutting down file server. pid = ${PID}" +ctx logger info "Shutting down file server. pid = ${PID}" kill -9 ${PID} || exit $? -#ctx logger info "Deleting file server root directory (${PYTHON_FILE_SERVER_ROOT})" +ctx logger info "Deleting file server root directory (${PYTHON_FILE_SERVER_ROOT})" rm -rf ${PYTHON_FILE_SERVER_ROOT}
