WIP squash
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/adf19ed6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/adf19ed6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/adf19ed6 Branch: refs/heads/USERGRID-509 Commit: adf19ed63bcbc532a7896ee36805289b391a9571 Parents: 8fe7267 Author: Todd Nine <[email protected]> Authored: Tue Mar 24 16:33:35 2015 -0600 Committer: Todd Nine <[email protected]> Committed: Tue Mar 24 16:33:35 2015 -0600 ---------------------------------------------------------------------- .../rx/impl/AllEntitiesInSystemImpl.java | 2 +- .../migration/EntityTypeMappingMigrationIT.java | 5 +- .../persistence/collection/CollectionScope.java | 6 +- .../collection/EntityCollectionManager.java | 39 ++-- .../EntityCollectionManagerFactory.java | 9 +- .../persistence/collection/ScopeSet.java | 50 +++++ .../cache/CachedEntityCollectionManager.java | 22 +- .../exception/CollectionRuntimeException.java | 36 ++- .../exception/WriteCommitException.java | 18 +- .../WriteOptimisticVerifyException.java | 17 +- .../exception/WriteStartException.java | 23 +- .../exception/WriteUniqueVerifyException.java | 7 +- .../EntityCollectionManagerFactoryImpl.java | 13 +- .../impl/EntityCollectionManagerImpl.java | 217 +++++++++++-------- .../collection/impl/EntityDeletedTask.java | 20 +- .../impl/EntityVersionCleanupTask.java | 29 +-- .../mvcc/MvccLogEntrySerializationStrategy.java | 16 +- .../mvcc/stage/CollectionIoEvent.java | 15 +- .../mvcc/stage/EntityUpdateEvent.java | 5 +- .../collection/mvcc/stage/delete/MarkStart.java | 8 +- .../mvcc/stage/write/RollbackAction.java | 6 +- .../mvcc/stage/write/WriteCommit.java | 10 +- .../mvcc/stage/write/WriteOptimisticVerify.java | 8 +- .../collection/mvcc/stage/write/WriteStart.java | 10 +- .../mvcc/stage/write/WriteUniqueVerify.java | 25 ++- .../MvccEntitySerializationStrategy.java | 18 +- 
.../UniqueValueSerializationStrategy.java | 18 +- .../serialization/UniqueValueSet.java | 4 +- .../serialization/impl/LogEntryIterator.java | 7 +- .../MvccEntitySerializationStrategyImpl.java | 61 +++--- ...vccEntitySerializationStrategyProxyImpl.java | 50 +++-- .../MvccEntitySerializationStrategyV3Impl.java | 55 +++-- .../MvccLogEntrySerializationStrategyImpl.java | 44 ++-- .../serialization/impl/ScopeSetImpl.java | 67 ++++++ .../UniqueValueSerializationStrategyImpl.java | 40 ++-- .../serialization/impl/UniqueValueSetImpl.java | 31 ++- .../impl/migration/EntityIdScope.java | 11 +- .../migration/MvccEntityDataMigrationImpl.java | 17 +- .../impl/EntityVersionCleanupTaskTest.java | 27 +-- .../mvcc/stage/AbstractEntityStageTest.java | 2 +- .../mvcc/stage/AbstractIdStageTest.java | 2 +- .../mvcc/stage/AbstractMvccEntityStageTest.java | 2 +- .../mvcc/stage/delete/MarkCommitTest.java | 2 +- .../mvcc/stage/delete/MarkStartTest.java | 2 +- .../mvcc/stage/write/WriteCommitTest.java | 2 +- .../stage/write/WriteOptimisticVerifyTest.java | 16 +- .../mvcc/stage/write/WriteStartTest.java | 4 +- .../mvcc/stage/write/WriteUniqueVerifyTest.java | 2 +- ...ctMvccEntityDataMigrationV1ToV3ImplTest.java | 3 +- 49 files changed, 680 insertions(+), 423 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/adf19ed6/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntitiesInSystemImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntitiesInSystemImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntitiesInSystemImpl.java index 758a8aa..90eda53 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntitiesInSystemImpl.java +++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntitiesInSystemImpl.java @@ -55,7 +55,7 @@ public class AllEntitiesInSystemImpl extends AbstractGraphVisitorImpl<EntityIdSc CollectionScope scope = CpNamingUtils.getCollectionScopeNameFromEntityType( applicationScope.getApplication(), nodeId.getType() ); - final EntityIdScope idScope = new EntityIdScope( scope, nodeId ); + final EntityIdScope idScope = new EntityIdScope( applicationScope, scope, nodeId ); return idScope; } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/adf19ed6/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java index 88f56c8..77ff882 100644 --- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java +++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java @@ -22,7 +22,6 @@ package org.apache.usergrid.corepersistence.migration; import org.junit.Test; -import org.apache.usergrid.AbstractCoreIT; import org.apache.usergrid.corepersistence.ManagerCache; import org.apache.usergrid.corepersistence.util.CpNamingUtils; import org.apache.usergrid.persistence.collection.CollectionScope; @@ -60,7 +59,7 @@ public class EntityTypeMappingMigrationIT { final Id entityId1 = createId("thing"); - final EntityIdScope idScope1 = new EntityIdScope(scope1, entityId1 ); + final EntityIdScope idScope1 = new EntityIdScope( applicationScope, scope1, entityId1 ); final MapScope mapScope1 = new MapScopeImpl(applicationId, CpNamingUtils.TYPES_BY_UUID_MAP ); @@ -72,7 +71,7 @@ public class EntityTypeMappingMigrationIT { final Id entityId2 = 
createId("foo"); - final EntityIdScope idScope2 = new EntityIdScope( scope2, entityId2 ); + final EntityIdScope idScope2 = new EntityIdScope( applicationScope, scope2, entityId2 ); final MapScope mapScope2 = new MapScopeImpl(applicationId, CpNamingUtils.TYPES_BY_UUID_MAP ); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/adf19ed6/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/CollectionScope.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/CollectionScope.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/CollectionScope.java index 7fda54e..f4eb970 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/CollectionScope.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/CollectionScope.java @@ -28,18 +28,18 @@ import org.apache.usergrid.persistence.model.entity.Id; * organization. Data encapsulated within instances of a scope are mutually exclusive from instances with other ids and * names. */ -public interface CollectionScope extends ApplicationScope { +public interface CollectionScope { /** * @return The name of the collection. If you use pluralization for you names vs types, * you must keep the consistent or you will be unable to load data */ - public String getName(); + String getName(); /** * @return A uuid that is unique to this context. It can be any uuid (time uuid preferred). 
Usually an application * Id, but could be an entity Id that is the parent of another collection */ - public Id getOwner(); + Id getOwner(); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/adf19ed6/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java index 6532ee6..3a13b77 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java @@ -37,48 +37,59 @@ public interface EntityCollectionManager { * completely overwrite the previous values, if it exists. 
* @param entity The entity to update */ - public Observable<Entity> write( Entity entity ); + Observable<Entity> write( CollectionScope collectionScope, Entity entity ); /** * MarkCommit the entity and remove it's indexes with the given entity id + * @param collectionScope The scope this entity exists in + * @param entityId The entity id to delete */ - public Observable<Id> delete( Id entityId ); + Observable<Id> delete( CollectionScope collectionScope, Id entityId ); /** * Load the entity with the given entity Id + * @param collectionScope The scope this entity exists in + * @param entityId The entity id to load */ - public Observable<Entity> load( Id entityId ); + Observable<Entity> load(CollectionScope collectionScope, Id entityId ); /** * Return the latest versions of the specified entityIds + * + * @param entityId A collection of scopes with the entity Ids set */ - public Observable<VersionSet> getLatestVersion( Collection<Id> entityId ); + Observable<VersionSet> getLatestVersion( Collection<ScopeSet<Id>> entityId ); - public Observable<FieldSet> getEntitiesFromFields( Collection<Field> fields ); + /** + * + * @param fields The collection of scopes for fields to use + * @return + */ + Observable<FieldSet> getEntitiesFromFields( Collection<ScopeSet<Field>> fields ); /** * Gets the Id for a field + * + * @param collectionScope The scope for the field + * @param field The field for the scope + * * @return most likely a single Id, watch for onerror events */ - public Observable<Id> getIdField(final Field field); + Observable<Id> getIdField(CollectionScope collectionScope, Field field); /** - * Audit a unique field, and remove any stale entries in the system - * @param field The field to audit within this collection scope. 
- - public Observable<Integer> auditUniqueField(final Field field); - */ - /** * Load all the entityIds into the observable entity set + * + * @param entityIds The entity ids */ - public Observable<EntitySet> load(Collection<Id> entityIds); + Observable<EntitySet> load(Collection<ScopeSet<Id>> entityIds); /** * Returns health of entity data store. */ - public Health getHealth(); + Health getHealth(); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/adf19ed6/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerFactory.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerFactory.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerFactory.java index 16045ed..9a621e0 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerFactory.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerFactory.java @@ -19,6 +19,9 @@ package org.apache.usergrid.persistence.collection; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; + + /** * A basic factory that creates a collection manager with the given context. * Each instance of this factory should exist for a Single ApplicationScope @@ -31,13 +34,13 @@ public interface EntityCollectionManagerFactory { * and will shard responses. The returned instance should not be shared * among threads it will not be guaranteed to be thread safe. 
* - * @param collectionScope The collectionScope collectionScope to use + * @param applicationScope The application scope collectionScope to use * when creating the collectionScope manager * * @return The collectionScope manager to perform operations within the provided context */ - public EntityCollectionManager - createCollectionManager( CollectionScope collectionScope ); + EntityCollectionManager + createCollectionManager( ApplicationScope applicationScope ); void invalidate(); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/adf19ed6/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/ScopeSet.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/ScopeSet.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/ScopeSet.java new file mode 100644 index 0000000..059c7cb --- /dev/null +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/ScopeSet.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.usergrid.persistence.collection; + + +import java.util.Collection; + +import org.apache.usergrid.persistence.model.entity.Id; + + +/** + * A set that contains the collection scope and the ids + */ +public interface ScopeSet<T> { + + /** + * Get the scope for this set of ids + * @return + */ + CollectionScope getScope(); + + /** + * Get the set of Ids to load for this scope + * @return + */ + Collection<T> getIdentifiers(); + + /** + * Add the identifier to the list + * @param identifier + */ + void addIdentifier(T identifier); +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/adf19ed6/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/cache/CachedEntityCollectionManager.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/cache/CachedEntityCollectionManager.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/cache/CachedEntityCollectionManager.java index 955f0b7..20a269b 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/cache/CachedEntityCollectionManager.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/cache/CachedEntityCollectionManager.java @@ -70,19 +70,19 @@ public class CachedEntityCollectionManager implements EntityCollectionManager { } @Override - public Observable<FieldSet> getEntitiesFromFields( final Collection<Field> fields ) { + public Observable<FieldSet> getEntitiesFromFields( final Collection<ScopeSet<Field>> fields ) { return targetEntityCollectionManager.getEntitiesFromFields( fields ); } @Override - public Observable<Entity> write( final Entity entity ) { - return targetEntityCollectionManager.write( entity ).doOnNext( cacheAdd ); + public Observable<Entity> write( final CollectionScope 
collectionScope, final Entity entity ) { + return targetEntityCollectionManager.write(collectionScope, entity ).doOnNext( cacheAdd ); } @Override - public Observable<Id> delete( final Id entityId ) { - return targetEntityCollectionManager.delete( entityId ).doOnNext( new Action1<Id>() { + public Observable<Id> delete( final CollectionScope collectionScope, final Id entityId ) { + return targetEntityCollectionManager.delete( collectionScope, entityId ).doOnNext( new Action1<Id>() { @Override public void call( final Id id ) { entityCache.invalidate( id ); @@ -92,32 +92,32 @@ public class CachedEntityCollectionManager implements EntityCollectionManager { @Override - public Observable<Entity> load( final Id entityId ) { + public Observable<Entity> load(final CollectionScope collectionScope, final Id entityId ) { final Entity entity = entityCache.getIfPresent( entityId ); if ( entity != null ) { return Observable.just( entity ); } - return targetEntityCollectionManager.load( entityId ).doOnNext( cacheAdd ); + return targetEntityCollectionManager.load(collectionScope, entityId ).doOnNext( cacheAdd ); } @Override - public Observable<VersionSet> getLatestVersion( final Collection<Id> entityId ) { + public Observable<VersionSet> getLatestVersion( final Collection<ScopeSet<Id>> entityId ) { return targetEntityCollectionManager.getLatestVersion( entityId ); } @Override - public Observable<Id> getIdField( final Field field ) { - return targetEntityCollectionManager.getIdField( field ); + public Observable<Id> getIdField(final CollectionScope collectionScope, final Field field ) { + return targetEntityCollectionManager.getIdField( collectionScope, field ); } @Override - public Observable<EntitySet> load( final Collection<Id> entityIds ) { + public Observable<EntitySet> load( final Collection<ScopeSet<Id>> entityIds ) { return targetEntityCollectionManager.load( entityIds ); } 
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/adf19ed6/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/CollectionRuntimeException.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/CollectionRuntimeException.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/CollectionRuntimeException.java index 416cb9f..fc4cac5 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/CollectionRuntimeException.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/CollectionRuntimeException.java @@ -17,53 +17,67 @@ */ package org.apache.usergrid.persistence.collection.exception; + import org.apache.usergrid.persistence.collection.CollectionScope; import org.apache.usergrid.persistence.collection.MvccEntity; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; public class CollectionRuntimeException extends RuntimeException { private MvccEntity entity; + private ApplicationScope applicationScope; private CollectionScope collectionScope; - public CollectionRuntimeException( MvccEntity entity, CollectionScope scope, final String message ) { - super( message ); + public CollectionRuntimeException(final MvccEntity entity,final ApplicationScope applicationScope, final CollectionScope collectionScope, final String message ) { + super( message ); this.entity = entity; - this.collectionScope = scope; + this.applicationScope = applicationScope; + this.collectionScope = collectionScope; } - public CollectionRuntimeException( MvccEntity entity, CollectionScope scope, final String message, final Throwable cause ) { + public CollectionRuntimeException(final MvccEntity entity, final ApplicationScope 
applicationScope,final CollectionScope collectionScope, final String message, final Throwable cause ) { super( message, cause ); this.entity = entity; - this.collectionScope = scope; + this.applicationScope = applicationScope; + this.collectionScope = collectionScope; } - public CollectionRuntimeException( MvccEntity entity, CollectionScope scope, final Throwable cause ) { + public CollectionRuntimeException(final MvccEntity entity, final ApplicationScope applicationScope,final CollectionScope collectionScope, final Throwable cause ) { super( cause ); this.entity = entity; - this.collectionScope = scope; + this.applicationScope = applicationScope; + this.collectionScope = collectionScope; } - public CollectionRuntimeException( MvccEntity entity, CollectionScope scope, + public CollectionRuntimeException( final MvccEntity entity, final ApplicationScope applicationScope, + final CollectionScope collectionScope, final String message, final Throwable cause, final boolean enableSuppression, final boolean writableStackTrace ) { super( message, cause, enableSuppression, writableStackTrace ); this.entity = entity; - this.collectionScope = scope; + this.applicationScope = applicationScope; + this.collectionScope = collectionScope; + } + + + public ApplicationScope getApplicationScope() { + return applicationScope; } - + public CollectionScope getCollectionScope() { return collectionScope; } + /** * Entity involved in operation. - * @return Entity or null if entity not instantiated yet in operation. + * @return Entity or null if entity not instantiated yet in operation. 
*/ public MvccEntity getEntity() { return entity; http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/adf19ed6/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/WriteCommitException.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/WriteCommitException.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/WriteCommitException.java index b0b8b9c..9e4cac3 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/WriteCommitException.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/WriteCommitException.java @@ -19,27 +19,15 @@ package org.apache.usergrid.persistence.collection.exception; import org.apache.usergrid.persistence.collection.CollectionScope; import org.apache.usergrid.persistence.collection.MvccEntity; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; public class WriteCommitException extends CollectionRuntimeException { - public WriteCommitException( MvccEntity entity, CollectionScope scope, final String message ) { - super( entity, scope, message ); - } - - - public WriteCommitException( MvccEntity entity, CollectionScope scope, final String message, final Throwable cause ) { - super( entity, scope, message, cause ); - } - - public WriteCommitException( MvccEntity entity, CollectionScope scope, final Throwable cause ) { - super( entity, scope, cause ); + public WriteCommitException( MvccEntity entity, ApplicationScope scope, final CollectionScope collectionScope, final String message, final Throwable cause ) { + super( entity, scope, collectionScope, message, cause ); } - public WriteCommitException( MvccEntity entity, CollectionScope scope, final String message, final 
Throwable cause, final boolean enableSuppression, - final boolean writableStackTrace ) { - super( entity, scope, message, cause, enableSuppression, writableStackTrace ); - } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/adf19ed6/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/WriteOptimisticVerifyException.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/WriteOptimisticVerifyException.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/WriteOptimisticVerifyException.java index ca9c7aa..cd13cd6 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/WriteOptimisticVerifyException.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/WriteOptimisticVerifyException.java @@ -19,29 +19,30 @@ package org.apache.usergrid.persistence.collection.exception; import org.apache.usergrid.persistence.collection.CollectionScope; import org.apache.usergrid.persistence.collection.MvccEntity; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; public class WriteOptimisticVerifyException extends CollectionRuntimeException { - public WriteOptimisticVerifyException( MvccEntity entity, CollectionScope scope, final String message ) { - super( entity, scope, message ); + public WriteOptimisticVerifyException( MvccEntity entity, ApplicationScope scope,final CollectionScope collectionScope, final String message ) { + super( entity, scope, collectionScope, message ); } - public WriteOptimisticVerifyException( MvccEntity entity, CollectionScope scope, final String message, final Throwable cause ) { - super( entity, scope, message, cause ); + public WriteOptimisticVerifyException( MvccEntity entity, 
ApplicationScope scope, final CollectionScope collectionScope, final String message, final Throwable cause ) { + super( entity, scope,collectionScope, message, cause ); } - public WriteOptimisticVerifyException( MvccEntity entity, CollectionScope scope, final Throwable cause ) { - super( entity, scope, cause ); + public WriteOptimisticVerifyException( MvccEntity entity, ApplicationScope scope, final CollectionScope collectionScope, final Throwable cause ) { + super( entity, scope,collectionScope, cause ); } - public WriteOptimisticVerifyException( MvccEntity entity, CollectionScope scope, + public WriteOptimisticVerifyException( MvccEntity entity, ApplicationScope scope, final CollectionScope collectionScope, final String message, final Throwable cause, final boolean enableSuppression, final boolean writableStackTrace ) { - super( entity, scope, message, cause, enableSuppression, writableStackTrace ); + super( entity, scope, collectionScope, message, cause, enableSuppression, writableStackTrace ); } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/adf19ed6/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/WriteStartException.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/WriteStartException.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/WriteStartException.java index f8a90df..98bb132 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/WriteStartException.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/WriteStartException.java @@ -19,28 +19,33 @@ package org.apache.usergrid.persistence.collection.exception; import org.apache.usergrid.persistence.collection.CollectionScope; import 
org.apache.usergrid.persistence.collection.MvccEntity; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; public class WriteStartException extends CollectionRuntimeException { - public WriteStartException( MvccEntity entity, CollectionScope scope, final String message ) { - super( entity, scope, message ); + public WriteStartException( final MvccEntity entity, final ApplicationScope applicationScope, + final CollectionScope collectionScope, final String message ) { + super( entity, applicationScope, collectionScope, message ); } - public WriteStartException( MvccEntity entity, CollectionScope scope, final String message, final Throwable cause ) { - super( entity, scope, message, cause ); + public WriteStartException( final MvccEntity entity, final ApplicationScope applicationScope, + final CollectionScope collectionScope, final String message, final Throwable cause ) { + super( entity, applicationScope, collectionScope, message, cause ); } - public WriteStartException( MvccEntity entity, CollectionScope scope, final Throwable cause ) { - super( entity, scope, cause ); + public WriteStartException( final MvccEntity entity, final ApplicationScope applicationScope, + final CollectionScope collectionScope, final Throwable cause ) { + super( entity, applicationScope, collectionScope, cause ); } - public WriteStartException( MvccEntity entity, CollectionScope scope, final String message, final Throwable cause, final boolean enableSuppression, - final boolean writableStackTrace ) { - super( entity, scope, message, cause, enableSuppression, writableStackTrace ); + public WriteStartException( final MvccEntity entity, final ApplicationScope applicationScope, + final CollectionScope collectionScope, final String message, final Throwable cause, + final boolean enableSuppression, final boolean writableStackTrace ) { + super( entity, applicationScope, collectionScope, message, cause, enableSuppression, writableStackTrace ); } } 
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/adf19ed6/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/WriteUniqueVerifyException.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/WriteUniqueVerifyException.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/WriteUniqueVerifyException.java index a20e090..c441585 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/WriteUniqueVerifyException.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/WriteUniqueVerifyException.java @@ -22,6 +22,7 @@ import java.util.Map; import org.apache.usergrid.persistence.collection.CollectionScope; import org.apache.usergrid.persistence.collection.MvccEntity; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.model.field.Field; @@ -31,9 +32,9 @@ import org.apache.usergrid.persistence.model.field.Field; public class WriteUniqueVerifyException extends CollectionRuntimeException { private Map<String, Field> violations; - - public WriteUniqueVerifyException( MvccEntity entity, CollectionScope scope, Map<String, Field> violations ) { - super( entity, scope, "Error: one or more duplicate fields detected"); + + public WriteUniqueVerifyException( MvccEntity entity, ApplicationScope scope, CollectionScope collectionScope, Map<String, Field> violations ) { + super( entity, scope, collectionScope, "Error: one or more duplicate fields detected"); this.violations = violations; } 
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/adf19ed6/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java index 409467c..9e51e38 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java @@ -42,6 +42,7 @@ import org.apache.usergrid.persistence.collection.serialization.SerializationFig import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy; import org.apache.usergrid.persistence.core.guice.ProxyImpl; import org.apache.usergrid.persistence.core.metrics.MetricsFactory; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.core.task.TaskExecutor; import com.google.common.base.Preconditions; @@ -78,10 +79,10 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag private final EntityCacheFig entityCacheFig; private final MetricsFactory metricsFactory; - private LoadingCache<CollectionScope, EntityCollectionManager> ecmCache = + private LoadingCache<ApplicationScope, EntityCollectionManager> ecmCache = CacheBuilder.newBuilder().maximumSize( 1000 ) - .build( new CacheLoader<CollectionScope, EntityCollectionManager>() { - public EntityCollectionManager load( CollectionScope scope ) { + .build( new CacheLoader<ApplicationScope, EntityCollectionManager>() { + public 
EntityCollectionManager load( ApplicationScope scope ) { //create the target EM that will perform logic final EntityCollectionManager target = new EntityCollectionManagerImpl( writeStart, writeVerifyUnique, @@ -129,10 +130,10 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag this.metricsFactory = metricsFactory; } @Override - public EntityCollectionManager createCollectionManager(CollectionScope collectionScope) { - Preconditions.checkNotNull(collectionScope); + public EntityCollectionManager createCollectionManager(ApplicationScope applicationScope) { + Preconditions.checkNotNull(applicationScope, "applicationScope cannot be null"); try{ - return ecmCache.get(collectionScope); + return ecmCache.get(applicationScope); }catch (ExecutionException ee){ throw new RuntimeException(ee); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/adf19ed6/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java index 70b5a3a..9453c65 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java @@ -28,15 +28,17 @@ import java.util.UUID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.commons.lang.NotImplementedException; + import org.apache.usergrid.persistence.collection.CollectionScope; import org.apache.usergrid.persistence.collection.EntityCollectionManager; import 
org.apache.usergrid.persistence.collection.EntitySet; import org.apache.usergrid.persistence.collection.FieldSet; import org.apache.usergrid.persistence.collection.MvccEntity; +import org.apache.usergrid.persistence.collection.ScopeSet; import org.apache.usergrid.persistence.collection.VersionSet; import org.apache.usergrid.persistence.collection.guice.CollectionTaskExecutor; import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy; -import org.apache.usergrid.persistence.collection.mvcc.entity.MvccValidationUtils; import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent; import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkCommit; import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkStart; @@ -50,8 +52,10 @@ import org.apache.usergrid.persistence.collection.serialization.UniqueValue; import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy; import org.apache.usergrid.persistence.collection.serialization.UniqueValueSet; import org.apache.usergrid.persistence.collection.serialization.impl.MutableFieldSet; +import org.apache.usergrid.persistence.collection.serialization.impl.ScopeSetImpl; import org.apache.usergrid.persistence.core.guice.ProxyImpl; import org.apache.usergrid.persistence.core.metrics.MetricsFactory; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.core.task.Task; import org.apache.usergrid.persistence.core.task.TaskExecutor; import org.apache.usergrid.persistence.core.util.Health; @@ -91,7 +95,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager { private static final Logger logger = LoggerFactory.getLogger( EntityCollectionManagerImpl.class ); - private final CollectionScope collectionScope; + private final ApplicationScope applicationScope; //start stages @@ -141,14 +145,15 @@ public class EntityCollectionManagerImpl implements 
EntityCollectionManager { final Keyspace keyspace, final EntityVersionTaskFactory entityVersionTaskFactory, @CollectionTaskExecutor final TaskExecutor taskExecutor, - @Assisted final CollectionScope collectionScope, + @Assisted final ApplicationScope applicationScope, final MetricsFactory metricsFactory ) { this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy; this.entitySerializationStrategy = entitySerializationStrategy; - MvccValidationUtils.validateCollectionScope( collectionScope ); + + ValidationUtils.validateApplicationScope( applicationScope ); this.writeStart = writeStart; this.writeVerifyUnique = writeVerifyUnique; @@ -165,7 +170,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager { this.entityVersionTaskFactory = entityVersionTaskFactory; this.taskExecutor = taskExecutor; - this.collectionScope = collectionScope; + this.applicationScope = applicationScope; this.mvccLogEntrySerializationStrategy = mvccLogEntrySerializationStrategy; this.writeTimer = metricsFactory.getTimer(EntityCollectionManagerImpl.class,"write.timer"); this.writeMeter = metricsFactory.getMeter(EntityCollectionManagerImpl.class, "write.meter"); @@ -181,7 +186,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager { @Override - public Observable<Entity> write( final Entity entity ) { + public Observable<Entity> write( final CollectionScope collectionScope, final Entity entity ) { //do our input validation Preconditions.checkNotNull( entity, "Entity is required in the new stage of the mvcc write" ); @@ -192,7 +197,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager { // create our observable and start the write - final CollectionIoEvent<Entity> writeData = new CollectionIoEvent<Entity>( collectionScope, entity ); + final CollectionIoEvent<Entity> writeData = new CollectionIoEvent<Entity>( applicationScope, collectionScope, entity ); Observable<CollectionIoEvent<MvccEntity>> observable = 
stageRunner( writeData, writeStart ); @@ -228,14 +233,14 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager { @Override - public Observable<Id> delete( final Id entityId ) { + public Observable<Id> delete(final CollectionScope collectionScope, final Id entityId ) { Preconditions.checkNotNull( entityId, "Entity id is required in this stage" ); Preconditions.checkNotNull( entityId.getUuid(), "Entity id is required in this stage" ); Preconditions.checkNotNull( entityId.getType(), "Entity type is required in this stage" ); final Timer.Context timer = deleteTimer.time(); - Observable<Id> o = Observable.just( new CollectionIoEvent<Id>( collectionScope, entityId ) ) + Observable<Id> o = Observable.just( new CollectionIoEvent<Id>( applicationScope, collectionScope, entityId ) ) .map(markStart) .doOnNext( markCommit ) .map(new Func1<CollectionIoEvent<MvccEntity>, Id>() { @@ -268,14 +273,17 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager { @Override - public Observable<Entity> load( final Id entityId ) { + public Observable<Entity> load(final CollectionScope collectionScope, final Id entityId ) { Preconditions.checkNotNull( entityId, "Entity id required in the load stage" ); Preconditions.checkNotNull( entityId.getUuid(), "Entity id uuid required in load stage" ); Preconditions.checkNotNull( entityId.getType(), "Entity id type required in load stage" ); final Timer.Context timer = loadTimer.time(); - return load( Collections.singleton( entityId ) ).flatMap(new Func1<EntitySet, Observable<Entity>>() { + + final ScopeSet<Id> entityScopeSet = new ScopeSetImpl<>( collectionScope, Collections.singleton( entityId ) ); + + return load( Collections.singleton( entityScopeSet) ).flatMap(new Func1<EntitySet, Observable<Entity>>() { @Override public Observable<Entity> call(final EntitySet entitySet) { final MvccEntity entity = entitySet.getEntity(entityId); @@ -306,7 +314,7 @@ public class EntityCollectionManagerImpl implements 
EntityCollectionManager { @Override - public Observable<EntitySet> load( final Collection<Id> entityIds ) { + public Observable<EntitySet> load( final Collection<ScopeSet<Id>> entityIds ) { Preconditions.checkNotNull( entityIds, "entityIds cannot be null" ); @@ -318,7 +326,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager { public void call( final Subscriber<? super EntitySet> subscriber ) { try { final EntitySet results = - entitySerializationStrategy.load( collectionScope, entityIds, UUIDGenerator.newTimeUUID() ); + entitySerializationStrategy.load( applicationScope, entityIds, UUIDGenerator.newTimeUUID() ); subscriber.onNext( results ); subscriber.onCompleted(); @@ -344,14 +352,16 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager { @Override - public Observable<Id> getIdField( final Field field ) { - final List<Field> fields = Collections.singletonList( field ); - return rx.Observable.from( fields ).map( new Func1<Field, Id>() { + public Observable<Id> getIdField(final CollectionScope collectionScope, final Field field ) { + + return rx.Observable.just( field ).map( new Func1<Field, Id>() { @Override public Id call( Field field ) { try { - final UniqueValueSet set = uniqueValueSerializationStrategy.load( collectionScope, fields ); - final UniqueValue value = set.getValue( field.getName() ); + + final ScopeSet<Field> scopeSet = new ScopeSetImpl<>( collectionScope, field ); + final UniqueValueSet set = uniqueValueSerializationStrategy.load(applicationScope, Collections.singleton( scopeSet ) ); + final UniqueValue value = set.getValue(collectionScope, field.getName() ); return value == null ? 
null : value.getEntityId(); } catch ( ConnectionException e ) { @@ -369,77 +379,102 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager { * @return */ @Override - public Observable<FieldSet> getEntitiesFromFields( final Collection<Field> fields ) { - return rx.Observable.just(fields).map( new Func1<Collection<Field>, FieldSet>() { - @Override - public FieldSet call( Collection<Field> fields ) { - try { - - final UUID startTime = UUIDGenerator.newTimeUUID(); - - //Get back set of unique values that correspond to collection of fields - UniqueValueSet set = uniqueValueSerializationStrategy.load( collectionScope, fields ); - - //Short circut if we don't have any uniqueValues from the given fields. - if(!set.iterator().hasNext()){ - return new MutableFieldSet( 0 ); - } - - - //loop through each field, and construct an entity load - List<Id> entityIds = new ArrayList<>(fields.size()); - List<UniqueValue> uniqueValues = new ArrayList<>(fields.size()); - - for(final Field expectedField: fields) { - - UniqueValue value = set.getValue(expectedField.getName()); - - if(value ==null){ - logger.debug( "Field does not correspond to a unique value" ); - } - - entityIds.add(value.getEntityId()); - uniqueValues.add(value); - } - - //Load a entity for each entityId we retrieved. - final EntitySet entitySet = entitySerializationStrategy.load(collectionScope, entityIds, startTime); - - //now loop through and ensure the entities are there. 
- final MutationBatch deleteBatch = keyspace.prepareMutationBatch(); - - final MutableFieldSet response = new MutableFieldSet(fields.size()); - - for(final UniqueValue expectedUnique: uniqueValues) { - final MvccEntity entity = entitySet.getEntity(expectedUnique.getEntityId()); - - //bad unique value, delete this, it's inconsistent - if(entity == null || !entity.getEntity().isPresent()){ - final MutationBatch valueDelete = uniqueValueSerializationStrategy.delete(collectionScope, expectedUnique); - deleteBatch.mergeShallow(valueDelete); - continue; - } - - - //else add it to our result set - response.addEntity(expectedUnique.getField(),entity); - - } - - //TODO: explore making this an Async process - //We'll repair it again if we have to - deleteBatch.execute(); - - return response; - - - } - catch ( ConnectionException e ) { - logger.error( "Failed to getIdField", e ); - throw new RuntimeException( e ); - } - } - } ); + public Observable<FieldSet> getEntitiesFromFields( final Collection<ScopeSet<Field>> fields ) { +// return rx.Observable.just(fields).map( new Func1<Collection<ScopeSet<Field>>, FieldSet>() { +// @Override +// public FieldSet call( Collection<ScopeSet<Field>> fields ) { +// try { +// +// final UUID startTime = UUIDGenerator.newTimeUUID(); +// +// //Get back set of unique values that correspond to collection of fields +// UniqueValueSet set = uniqueValueSerializationStrategy.load( applicationScope, fields ); +// +// //Short circut if we don't have any uniqueValues from the given fields. 
+// if(!set.iterator().hasNext()){ +// return new MutableFieldSet( 0 ); +// } +// +// +// //loop through each field, and construct an entity load +// +// List<ScopeSet<Id>> scopes = new ArrayList<ScopeSet<Id>>( fields.size() ); +// +// final List<Id> entityIds = new ArrayList<>( fields.size() ); +// final List<ScopeSet<UniqueValue>> uniqueValues = new ArrayList<>( fields.size() ); +// +// +// +// for(ScopeSet<Field> scope: fields) { +// +// +// +// +// final CollectionScope collectionScope = scope.getScope(); +// +// final ScopeSet<UniqueValue> scopedValue = new Sc +// +// +// +// for ( final Field expectedField : scope.getIdentifiers() ) { +// +// UniqueValue value = set.getValue( collectionScope, expectedField.getName() ); +// +// if ( value == null ) { +// logger.debug( "Field does not correspond to a unique value" ); +// } +// +// entityIds.add( value.getEntityId() ); +// uniqueValues.add( value ); +// } +// +// final ScopeSet<Id> scopeSet = new ScopeSetImpl<Id>( collectionScope, entityIds ); +// +// scopes.add( scopeSet ); +// } +// +// +// //Load a entity for each entityId we retrieved. +// final EntitySet entitySet = entitySerializationStrategy.load(applicationScope, scopes, startTime); +// +// //now loop through and ensure the entities are there. 
+// final MutationBatch deleteBatch = keyspace.prepareMutationBatch(); +// +// final MutableFieldSet response = new MutableFieldSet(fields.size()); +// +// for(final UniqueValue expectedUnique: uniqueValues) { +// final MvccEntity entity = entitySet.getEntity( expectedUnique.getEntityId() ); +// +// //bad unique value, delete this, it's inconsistent +// if(entity == null || !entity.getEntity().isPresent()){ +// final MutationBatch valueDelete = uniqueValueSerializationStrategy.delete(collectionScope, expectedUnique); +// deleteBatch.mergeShallow( valueDelete ); +// continue; +// } +// +// +// //else add it to our result set +// response.addEntity( expectedUnique.getField(), entity ); +// +// } +// +// +// //TODO: explore making this an Async process +// //We'll repair it again if we have to +// deleteBatch.execute(); +// +// return response; +// +// +// } +// catch ( ConnectionException e ) { +// logger.error( "Failed to getIdField", e ); +// throw new RuntimeException( e ); +// } +// } +// } ); + + throw new NotImplementedException( "Do this" ); } @@ -473,7 +508,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager { @Override - public Observable<VersionSet> getLatestVersion( final Collection<Id> entityIds ) { + public Observable<VersionSet> getLatestVersion( final Collection<ScopeSet<Id>> entityIds ) { final Timer.Context timer = getLatestTimer.time(); return Observable.create( new Observable.OnSubscribe<VersionSet>() { @@ -482,7 +517,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager { public void call( final Subscriber<? 
super VersionSet> subscriber ) { try { final VersionSet logEntries = mvccLogEntrySerializationStrategy - .load( collectionScope, entityIds, UUIDGenerator.newTimeUUID() ); + .load( applicationScope, entityIds, UUIDGenerator.newTimeUUID() ); subscriber.onNext( logEntries ); subscriber.onCompleted(); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/adf19ed6/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java index 7620907..55c0bfc 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java @@ -25,6 +25,7 @@ import org.apache.usergrid.persistence.collection.CollectionScope; import org.apache.usergrid.persistence.collection.event.EntityDeleted; import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy; import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.core.task.Task; import org.apache.usergrid.persistence.model.entity.Id; import org.slf4j.Logger; @@ -52,25 +53,24 @@ public class EntityDeletedTask implements Task<Void> { private final MvccLogEntrySerializationStrategy logEntrySerializationStrategy; private final MvccEntitySerializationStrategy entitySerializationStrategy; private final Set<EntityDeleted> listeners; + private final ApplicationScope applicationScope; private final CollectionScope 
collectionScope; private final Id entityId; private final UUID version; @Inject - public EntityDeletedTask( - EntityVersionTaskFactory entityVersionTaskFactory, - final MvccLogEntrySerializationStrategy logEntrySerializationStrategy, - @ProxyImpl final MvccEntitySerializationStrategy entitySerializationStrategy, - final Set<EntityDeleted> listeners, // MUST be a set or Guice will not inject - @Assisted final CollectionScope collectionScope, - @Assisted final Id entityId, - @Assisted final UUID version) { + public EntityDeletedTask( EntityVersionTaskFactory entityVersionTaskFactory, final MvccLogEntrySerializationStrategy logEntrySerializationStrategy, + @ProxyImpl final MvccEntitySerializationStrategy entitySerializationStrategy, + final Set<EntityDeleted> listeners, // MUST be a set or Guice will not inject + @Assisted final ApplicationScope applicationScope, @Assisted final CollectionScope collectionScope, + @Assisted final Id entityId, @Assisted final UUID version ) { this.entityVersionTaskFactory = entityVersionTaskFactory; this.logEntrySerializationStrategy = logEntrySerializationStrategy; this.entitySerializationStrategy = entitySerializationStrategy; this.listeners = listeners; + this.applicationScope = applicationScope; this.collectionScope = collectionScope; this.entityId = entityId; this.version = version; @@ -103,8 +103,8 @@ public class EntityDeletedTask implements Task<Void> { entityVersionTaskFactory.getCleanupTask( collectionScope, entityId, version, true ).call(); fireEvents(); - final MutationBatch entityDelete = entitySerializationStrategy.delete(collectionScope, entityId, version); - final MutationBatch logDelete = logEntrySerializationStrategy.delete(collectionScope, entityId, version); + final MutationBatch entityDelete = entitySerializationStrategy.delete(applicationScope, collectionScope, entityId, version); + final MutationBatch logDelete = logEntrySerializationStrategy.delete(applicationScope, collectionScope, entityId, version); 
entityDelete.execute(); logDelete.execute(); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/adf19ed6/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java index 1a7b86b..a10d899 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java @@ -39,6 +39,7 @@ import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerialization import org.apache.usergrid.persistence.collection.serialization.SerializationFig; import org.apache.usergrid.persistence.collection.serialization.impl.LogEntryIterator; import org.apache.usergrid.persistence.core.rx.ObservableIterator; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.core.task.Task; import org.apache.usergrid.persistence.model.entity.Id; @@ -46,6 +47,7 @@ import com.netflix.astyanax.Keyspace; import com.netflix.astyanax.MutationBatch; import com.netflix.astyanax.connectionpool.exceptions.ConnectionException; +import javafx.application.Application; import rx.Observable; import rx.functions.Action1; import rx.functions.Func1; @@ -69,6 +71,7 @@ public class EntityVersionCleanupTask implements Task<Void> { private final SerializationFig serializationFig; + private final ApplicationScope applicationScope; private final CollectionScope scope; private final Id entityId; private final UUID version; @@ -76,22 +79,20 @@ public class 
EntityVersionCleanupTask implements Task<Void> { @Inject - public EntityVersionCleanupTask( - final SerializationFig serializationFig, - final MvccLogEntrySerializationStrategy logEntrySerializationStrategy, - final UniqueValueSerializationStrategy uniqueValueSerializationStrategy, - final Keyspace keyspace, - final Set<EntityVersionDeleted> listeners, // MUST be a set or Guice will not inject - @Assisted final CollectionScope scope, - @Assisted final Id entityId, - @Assisted final UUID version, - @Assisted final boolean includeVersion) { + public EntityVersionCleanupTask( final SerializationFig serializationFig, final MvccLogEntrySerializationStrategy logEntrySerializationStrategy, + final UniqueValueSerializationStrategy uniqueValueSerializationStrategy, + final Keyspace keyspace, final Set<EntityVersionDeleted> listeners, + // MUST be a set or Guice will not inject + @Assisted final ApplicationScope applicationScope, @Assisted final CollectionScope scope, + @Assisted final Id entityId, @Assisted final UUID version, + @Assisted final boolean includeVersion ) { this.serializationFig = serializationFig; this.logEntrySerializationStrategy = logEntrySerializationStrategy; this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy; this.keyspace = keyspace; this.listeners = listeners; + this.applicationScope = applicationScope; this.scope = scope; this.entityId = entityId; this.version = version; @@ -130,7 +131,7 @@ public class EntityVersionCleanupTask implements Task<Void> { Observable.create( new ObservableIterator<UniqueValue>( "Unique value load" ) { @Override protected Iterator<UniqueValue> getIterator() { - return uniqueValueSerializationStrategy.getAllUniqueFields( scope, entityId ); + return uniqueValueSerializationStrategy.getAllUniqueFields(applicationScope, scope, entityId ); } } ) @@ -149,7 +150,7 @@ public class EntityVersionCleanupTask implements Task<Void> { for ( UniqueValue value : uniqueValues ) { - uniqueCleanupBatch.mergeShallow( 
uniqueValueSerializationStrategy.delete( scope, value ) ); + uniqueCleanupBatch.mergeShallow( uniqueValueSerializationStrategy.delete(applicationScope, scope, value ) ); } try { @@ -169,7 +170,7 @@ public class EntityVersionCleanupTask implements Task<Void> { @Override protected Iterator<MvccLogEntry> getIterator() { - return new LogEntryIterator( logEntrySerializationStrategy, scope, entityId, version, + return new LogEntryIterator( logEntrySerializationStrategy, applicationScope, scope, entityId, version, serializationFig.getBufferSize() ); } } ) @@ -191,7 +192,7 @@ public class EntityVersionCleanupTask implements Task<Void> { for ( MvccLogEntry entry : mvccEntities ) { - logCleanupBatch.mergeShallow( logEntrySerializationStrategy.delete( scope, entityId, entry.getVersion() )); + logCleanupBatch.mergeShallow( logEntrySerializationStrategy.delete( applicationScope, scope, entityId, entry.getVersion() )); } try { http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/adf19ed6/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/MvccLogEntrySerializationStrategy.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/MvccLogEntrySerializationStrategy.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/MvccLogEntrySerializationStrategy.java index 4baef84..9f85f8c 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/MvccLogEntrySerializationStrategy.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/MvccLogEntrySerializationStrategy.java @@ -25,8 +25,10 @@ import java.util.UUID; import org.apache.usergrid.persistence.collection.CollectionScope; import org.apache.usergrid.persistence.collection.MvccLogEntry; +import 
org.apache.usergrid.persistence.collection.ScopeSet; import org.apache.usergrid.persistence.collection.VersionSet; import org.apache.usergrid.persistence.core.migration.schema.Migration; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.model.entity.Id; import com.netflix.astyanax.MutationBatch; @@ -40,26 +42,29 @@ public interface MvccLogEntrySerializationStrategy extends Migration { /** * Serialize the entity to the data store with the given collection context * + * @param applicationScope The applicationScope + * @param context The collection scope to write to * @param entry the entry to write * * @return The mutation batch with the mutation operations for this write. */ - public MutationBatch write( final CollectionScope context, MvccLogEntry entry ); + MutationBatch write( ApplicationScope applicationScope, CollectionScope context, MvccLogEntry entry ); /** * Load and return the stage with the given id and a version that is <= the version provided * - * @param context The context to persist the entity into + * @param applicationScope The context to persist the entity into * @param entityIds The entity id to load * @param version The max version to load. 
This will return the first version <= the given version * * @return The deserialized version of the log entry */ - public VersionSet load( final CollectionScope context, final Collection<Id> entityIds, final UUID version ); + VersionSet load( ApplicationScope applicationScope, Collection<ScopeSet<Id>> entityIds, UUID version ); /** * Load a list, from highest to lowest of the stage with versions <= version up to maxSize elements * + * @param applicationScope The applicationScope * @param context The context to load the entity from * @param entityId The entity id to load * @param version The max version to seek from @@ -67,14 +72,15 @@ public interface MvccLogEntrySerializationStrategy extends Migration { * * @return A list of entities up to max size ordered from max(UUID)=> min(UUID) */ - public List<MvccLogEntry> load( CollectionScope context, Id entityId, UUID version, int maxSize ); + List<MvccLogEntry> load( ApplicationScope applicationScope, CollectionScope context, Id entityId, UUID version, int maxSize ); /** * MarkCommit the stage from the context with the given entityId and version * + * @param applicationScope The applicationScope * @param context The context that contains the entity * @param entityId The entity id to delete * @param version The version to delete */ - public MutationBatch delete( CollectionScope context, Id entityId, UUID version ); + MutationBatch delete( ApplicationScope applicationScope, CollectionScope context, Id entityId, UUID version ); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/adf19ed6/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/CollectionIoEvent.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/CollectionIoEvent.java 
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/CollectionIoEvent.java index d54f69a..e8cdce9 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/CollectionIoEvent.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/CollectionIoEvent.java @@ -21,6 +21,7 @@ package org.apache.usergrid.persistence.collection.mvcc.stage; import java.io.Serializable; import org.apache.usergrid.persistence.collection.CollectionScope; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; /** @@ -28,12 +29,15 @@ import org.apache.usergrid.persistence.collection.CollectionScope; */ public class CollectionIoEvent<T> implements Serializable { - private CollectionScope context; + private final ApplicationScope applicationScope; - private T event; + private final CollectionScope context; + private final T event; - public CollectionIoEvent( final CollectionScope context, final T event ) { + + public CollectionIoEvent( final ApplicationScope applicationScope, final CollectionScope context, final T event ) { + this.applicationScope = applicationScope; this.context = context; this.event = event; } @@ -44,6 +48,11 @@ public class CollectionIoEvent<T> implements Serializable { } + public ApplicationScope getApplicationScope() { + return applicationScope; + } + + public T getEvent() { return event; } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/adf19ed6/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/EntityUpdateEvent.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/EntityUpdateEvent.java 
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/EntityUpdateEvent.java index 3951226..8b2e562 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/EntityUpdateEvent.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/EntityUpdateEvent.java @@ -20,6 +20,7 @@ package org.apache.usergrid.persistence.collection.mvcc.stage; import org.apache.usergrid.persistence.collection.CollectionScope; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.model.entity.Id; @@ -28,7 +29,7 @@ import org.apache.usergrid.persistence.model.entity.Id; */ public class EntityUpdateEvent extends CollectionIoEvent<Id> { - public EntityUpdateEvent( final CollectionScope context, final Id event ) { - super( context, event ); + public EntityUpdateEvent( final ApplicationScope applicationScope, final CollectionScope context, final Id event ) { + super( applicationScope, context, event ); } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/adf19ed6/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkStart.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkStart.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkStart.java index c80b076..6829374 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkStart.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkStart.java @@ -49,8 +49,8 @@ import rx.functions.Func1; /** - * This is the first stage and 
should be invoked immediately when a write is started. - * It should persist the start of a new write in the data store for + * This is the first stage and should be invoked immediately when a write is started. + * It should persist the start of a new write in the data store for * a checkpoint and recovery */ @Singleton @@ -99,7 +99,7 @@ public class MarkStart implements Func1<CollectionIoEvent<Id>, CollectionIoEvent } catch ( ConnectionException e ) { LOG.error( "Failed to execute write asynchronously ", e ); - throw new CollectionRuntimeException( null, collectionScope, + throw new CollectionRuntimeException( null, collectionScope, "Failed to execute write asynchronously ", e ); } @@ -109,6 +109,6 @@ public class MarkStart implements Func1<CollectionIoEvent<Id>, CollectionIoEvent entityId, version, MvccEntity.Status.COMPLETE, Optional.<Entity>absent() ); - return new CollectionIoEvent<MvccEntity>( collectionScope, nextStage ); + return new CollectionIoEvent<MvccEntity>( applicationScope, collectionScope, nextStage ); } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/adf19ed6/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/RollbackAction.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/RollbackAction.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/RollbackAction.java index e1becf8..1cd3c1b 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/RollbackAction.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/RollbackAction.java @@ -28,6 +28,7 @@ import org.apache.usergrid.persistence.collection.MvccEntity; import 
org.apache.usergrid.persistence.collection.serialization.UniqueValue; import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueImpl; import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.model.entity.Entity; import org.apache.usergrid.persistence.model.field.Field; @@ -70,6 +71,7 @@ public class RollbackAction implements Action1<Throwable> { CollectionRuntimeException cre = ( CollectionRuntimeException ) t; final MvccEntity mvccEntity = cre.getEntity(); + final ApplicationScope applicationScope = cre.getApplicationScope(); final CollectionScope scope = cre.getCollectionScope(); // one batch to handle rollback @@ -85,7 +87,7 @@ public class RollbackAction implements Action1<Throwable> { UniqueValue toDelete = new UniqueValueImpl( field, entity.get().getId(), mvccEntity.getVersion() ); - MutationBatch deleteMb = uniqueValueStrat.delete(scope, toDelete ); + MutationBatch deleteMb = uniqueValueStrat.delete(applicationScope, scope, toDelete ); if ( rollbackMb == null ) { rollbackMb = deleteMb; @@ -106,7 +108,7 @@ public class RollbackAction implements Action1<Throwable> { } } - logEntryStrat.delete( scope, entity.get().getId(), mvccEntity.getVersion() ); + logEntryStrat.delete( applicationScope, scope, entity.get().getId(), mvccEntity.getVersion() ); } } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/adf19ed6/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java index 65ba0b4..5637fe4 
100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java @@ -36,6 +36,7 @@ import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent; import org.apache.usergrid.persistence.collection.serialization.UniqueValue; import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueImpl; import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.model.util.EntityUtils; import org.apache.usergrid.persistence.core.guice.ProxyImpl; import org.apache.usergrid.persistence.core.util.ValidationUtils; @@ -93,6 +94,7 @@ public class WriteCommit implements Func1<CollectionIoEvent<MvccEntity>, Entity> final Id entityId = mvccEntity.getId(); final UUID version = mvccEntity.getVersion(); final CollectionScope collectionScope = ioEvent.getEntityCollection(); + final ApplicationScope applicationScope = ioEvent.getApplicationScope(); //set the version into the entity EntityUtils.setVersion( mvccEntity.getEntity().get(), version ); @@ -102,10 +104,10 @@ public class WriteCommit implements Func1<CollectionIoEvent<MvccEntity>, Entity> final MvccLogEntry startEntry = new MvccLogEntryImpl( entityId, version, Stage.COMMITTED, MvccLogEntry.State.COMPLETE ); - MutationBatch logMutation = logEntryStrat.write( collectionScope, startEntry ); + MutationBatch logMutation = logEntryStrat.write( applicationScope, collectionScope, startEntry ); // now get our actual insert into the entity data - MutationBatch entityMutation = entityStrat.write( collectionScope, mvccEntity ); + MutationBatch entityMutation = entityStrat.write( applicationScope, collectionScope, mvccEntity ); // merge the 2 into 1 mutation 
logMutation.mergeShallow( entityMutation ); @@ -116,7 +118,7 @@ public class WriteCommit implements Func1<CollectionIoEvent<MvccEntity>, Entity> UniqueValue written = new UniqueValueImpl( field, entityId,version); - MutationBatch mb = uniqueValueStrat.write(collectionScope, written ); + MutationBatch mb = uniqueValueStrat.write(applicationScope, collectionScope, written ); LOG.debug("Finalizing {} unqiue value {}", field.getName(), field.getValue().toString()); @@ -130,7 +132,7 @@ public class WriteCommit implements Func1<CollectionIoEvent<MvccEntity>, Entity> } catch ( ConnectionException e ) { LOG.error( "Failed to execute write asynchronously ", e ); - throw new WriteCommitException( mvccEntity, collectionScope, + throw new WriteCommitException( mvccEntity, applicationScope, collectionScope, "Failed to execute write asynchronously ", e ); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/adf19ed6/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteOptimisticVerify.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteOptimisticVerify.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteOptimisticVerify.java index b031237..86a3f03 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteOptimisticVerify.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteOptimisticVerify.java @@ -31,6 +31,7 @@ import org.apache.usergrid.persistence.collection.MvccLogEntry; import org.apache.usergrid.persistence.collection.mvcc.entity.MvccValidationUtils; import org.apache.usergrid.persistence.collection.mvcc.entity.Stage; import 
org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.model.entity.Entity; import com.google.inject.Inject; @@ -69,14 +70,15 @@ public class WriteOptimisticVerify implements Action1<CollectionIoEvent<MvccEnti MvccEntity mvccEntity = ioevent.getEvent(); final Entity entity = mvccEntity.getEntity().get(); - CollectionScope collectionScope = ioevent.getEntityCollection(); + final CollectionScope collectionScope = ioevent.getEntityCollection(); + final ApplicationScope applicationScope = ioevent.getApplicationScope(); if ( entity.getVersion() == null ) { return; } - List<MvccLogEntry> versions = logEntryStrat.load( collectionScope, entity.getId(), entity.getVersion(), 2 ); + List<MvccLogEntry> versions = logEntryStrat.load(applicationScope, collectionScope, entity.getId(), entity.getVersion(), 2 ); // Previous log entry must be committed, otherwise somebody is already writing if ( versions.size() > 1 && versions.get( 1 ).getStage().ordinal() < Stage.COMMITTED.ordinal() ) { @@ -84,7 +86,7 @@ public class WriteOptimisticVerify implements Action1<CollectionIoEvent<MvccEnti log.debug( "Conflict writing entity id {} version {}", entity.getId().toString(), entity.getVersion().toString() ); - throw new WriteOptimisticVerifyException( mvccEntity, collectionScope, + throw new WriteOptimisticVerifyException( mvccEntity, applicationScope, collectionScope, "Change conflict, not first writer" ); } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/adf19ed6/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteStart.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteStart.java 
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteStart.java index 92dc69d..7af54b9 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteStart.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteStart.java @@ -15,6 +15,7 @@ import org.apache.usergrid.persistence.collection.mvcc.entity.Stage; import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccEntityImpl; import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccLogEntryImpl; import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.model.entity.Entity; import org.apache.usergrid.persistence.model.entity.Id; import org.apache.usergrid.persistence.model.util.UUIDGenerator; @@ -56,6 +57,7 @@ public class WriteStart implements Func1<CollectionIoEvent<Entity>, CollectionIo { final Entity entity = ioEvent.getEvent(); final CollectionScope collectionScope = ioEvent.getEntityCollection(); + final ApplicationScope applicationScope = ioEvent.getApplicationScope(); final Id entityId = entity.getId(); @@ -65,7 +67,7 @@ public class WriteStart implements Func1<CollectionIoEvent<Entity>, CollectionIo final MvccLogEntry startEntry = new MvccLogEntryImpl( entityId, newVersion, Stage.ACTIVE, MvccLogEntry.State.COMPLETE); - MutationBatch write = logStrategy.write( collectionScope, startEntry ); + MutationBatch write = logStrategy.write(applicationScope, collectionScope, startEntry ); final MvccEntityImpl nextStage = new MvccEntityImpl( entityId, newVersion, MvccEntity.Status.COMPLETE, entity ); if(ioEvent.getEvent().hasVersion()) { @@ -73,11 +75,11 @@ public class WriteStart implements Func1<CollectionIoEvent<Entity>, CollectionIo write.execute(); } catch 
(ConnectionException e) { LOG.error("Failed to execute write ", e); - throw new WriteStartException(nextStage, collectionScope, + throw new WriteStartException(nextStage, applicationScope, collectionScope, "Failed to execute write ", e); } catch (NullPointerException e) { LOG.error("Failed to execute write ", e); - throw new WriteStartException(nextStage, collectionScope, + throw new WriteStartException(nextStage, applicationScope, collectionScope, "Failed to execute write", e); } } @@ -85,7 +87,7 @@ public class WriteStart implements Func1<CollectionIoEvent<Entity>, CollectionIo //create the mvcc entity for the next stage //TODO: we need to create a complete or partial update here (or sooner) - return new CollectionIoEvent<MvccEntity>( collectionScope, nextStage ); + return new CollectionIoEvent<MvccEntity>( applicationScope, collectionScope, nextStage ); } } }
