Expose the option to perform uniqueIndexRepair to callers higher up the stack. This allows better control over when the repair happens on the write path.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/82428785 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/82428785 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/82428785 Branch: refs/heads/master Commit: 824287853137334cca134efbcfc61e6c797e7e0e Parents: a6c307b Author: Michael Russo <[email protected]> Authored: Wed Jun 29 23:03:02 2016 -0700 Committer: Michael Russo <[email protected]> Committed: Wed Jun 29 23:03:02 2016 -0700 ---------------------------------------------------------------------- .../usergrid/corepersistence/CpEntityManager.java | 9 +++++---- .../usergrid/persistence/EntityManager.java | 4 ++-- .../apache/usergrid/persistence/PathQuery.java | 3 ++- .../impl/EntityCollectionManagerImpl.java | 8 +++++--- .../mvcc/stage/write/WriteUniqueVerify.java | 18 +++++++++++------- .../UniqueValueSerializationStrategy.java | 18 +++++++++++++++++- .../UniqueValueSerializationStrategyImpl.java | 8 ++++++++ ...UniqueValueSerializationStrategyProxyImpl.java | 13 +++++++++++++ .../services/AbstractCollectionService.java | 9 ++++++--- .../services/AbstractConnectionsService.java | 5 ++++- 10 files changed, 73 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/82428785/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java index 3dc0d13..1ed05bd 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java @@ -853,7 +853,7 @@ public class CpEntityManager implements 
EntityManager { } @Override - public Entity getUniqueEntityFromAlias(String collectionType, String aliasType, boolean useReadRepair){ + public Entity getUniqueEntityFromAlias(String collectionType, String aliasType, boolean uniqueIndexRepair){ String collName = Schema.defaultCollectionName( collectionType ); String propertyName = Schema.getDefaultSchema().aliasProperty( collName ); @@ -866,7 +866,7 @@ public class CpEntityManager implements EntityManager { StringField uniqueLookupRepairField = new StringField( propertyName, aliasType.toString()); Observable<FieldSet> fieldSetObservable = ecm.getEntitiesFromFields( - Inflector.getInstance().singularize( collectionType ), Arrays.<Field>asList( uniqueLookupRepairField ), useReadRepair); + Inflector.getInstance().singularize( collectionType ), Arrays.<Field>asList( uniqueLookupRepairField ), uniqueIndexRepair); if(fieldSetObservable == null){ @@ -893,14 +893,15 @@ public class CpEntityManager implements EntityManager { } @Override - public UUID getUniqueIdFromAlias( String collectionType, String aliasType ){ + public UUID getUniqueIdFromAlias(String collectionType, String aliasType, boolean uniqueIndexRepair){ String collName = Schema.defaultCollectionName( collectionType ); String propertyName = Schema.getDefaultSchema().aliasProperty( collName ); StringField uniqueLookupRepairField = new StringField( propertyName, aliasType); Observable<FieldSet> fieldSetObservable = ecm.getEntitiesFromFields( - Inflector.getInstance().singularize( collectionType ), Collections.singletonList(uniqueLookupRepairField), true); + Inflector.getInstance().singularize( collectionType ), + Collections.singletonList(uniqueLookupRepairField), uniqueIndexRepair); if(fieldSetObservable == null){ http://git-wip-us.apache.org/repos/asf/usergrid/blob/82428785/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java ---------------------------------------------------------------------- diff --git 
a/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java b/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java index 7e25a80..874e618 100644 --- a/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java @@ -721,9 +721,9 @@ public interface EntityManager { public void flushManagerCaches(); - public Entity getUniqueEntityFromAlias(String aliasType, String aliasValue, boolean useReadRepair); + public Entity getUniqueEntityFromAlias(String aliasType, String aliasValue, boolean uniqueIndexRepair); - public UUID getUniqueIdFromAlias( String aliasType, String aliasValue ); + public UUID getUniqueIdFromAlias(String aliasType, String aliasValue, boolean uniqueIndexRepair); /** http://git-wip-us.apache.org/repos/asf/usergrid/blob/82428785/stack/core/src/main/java/org/apache/usergrid/persistence/PathQuery.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/PathQuery.java b/stack/core/src/main/java/org/apache/usergrid/persistence/PathQuery.java index 30636ab..c5833af 100644 --- a/stack/core/src/main/java/org/apache/usergrid/persistence/PathQuery.java +++ b/stack/core/src/main/java/org/apache/usergrid/persistence/PathQuery.java @@ -128,7 +128,8 @@ public class PathQuery<E> { String name = query.getSingleNameOrEmailIdentifier(); String entityType = InflectionUtils.singularize(query.getCollection()); - UUID entityId = em.getUniqueIdFromAlias( entityType, name ); + // don't use unique index repair on read only logic + UUID entityId = em.getUniqueIdFromAlias( entityType, name, false); if( entityId == null){ throw new http://git-wip-us.apache.org/repos/asf/usergrid/blob/82428785/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java 
---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java index 523b4df..658c68c 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java @@ -325,14 +325,16 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager { * Retrieves all entities that correspond to each field given in the Collection. */ @Override - public Observable<FieldSet> getEntitiesFromFields(final String type, final Collection<Field> fields, boolean useReadRepair) { + public Observable<FieldSet> getEntitiesFromFields(final String type, final Collection<Field> fields, + boolean uniqueIndexRepair) { final Observable<FieldSet> fieldSetObservable = Observable.just( fields ).map( fields1 -> { try { final UUID startTime = UUIDGenerator.newTimeUUID(); //Get back set of unique values that correspond to collection of fields - UniqueValueSet set = uniqueValueSerializationStrategy.load( applicationScope, type, fields1 ); + UniqueValueSet set = + uniqueValueSerializationStrategy.load( applicationScope, type, fields1 , uniqueIndexRepair); //Short circuit if we don't have any uniqueValues from the given fields. 
if ( !set.iterator().hasNext() ) { @@ -407,7 +409,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager { response.addEntity( expectedUnique.getField(), entity ); } - if ( useReadRepair && deleteBatch.getRowCount() > 0 ) { + if ( deleteBatch.getRowCount() > 0 ) { deleteBatch.execute(); http://git-wip-us.apache.org/repos/asf/usergrid/blob/82428785/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java index adbe03d..0ebb6a1 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java @@ -122,9 +122,9 @@ public class WriteUniqueVerify implements Action1<CollectionIoEvent<MvccEntity>> try { // loading will retrieve the oldest unique value entry for the field - // purposely enable the read repair here to clean up before we write - UniqueValueSet set = uniqueValueStrat.load(scope, cassandraFig.getReadCL(), - written.getEntityId().getType(), Collections.singletonList(written.getField()), true); + // don't use read repair on this pre-write check + UniqueValueSet set = uniqueValueStrat.load(scope, written.getEntityId().getType(), + Collections.singletonList(written.getField()), false); set.forEach(uniqueValue -> { @@ -143,11 +143,15 @@ public class WriteUniqueVerify implements Action1<CollectionIoEvent<MvccEntity>> throw new RuntimeException("Error connecting to cassandra", e); } - // use TTL in case something goes 
wrong before entity is finally committed - final MutationBatch mb = uniqueValueStrat.write( scope, written, serializationFig.getTimeout() ); + // only build the batch statement if we don't have a violation for the field + if( preWriteUniquenessViolations.get(field.getName()) != null) { - batch.mergeShallow( mb ); - uniqueFields.add(field); + // use TTL in case something goes wrong before entity is finally committed + final MutationBatch mb = uniqueValueStrat.write(scope, written, serializationFig.getTimeout()); + + batch.mergeShallow(mb); + uniqueFields.add(field); + } } if(preWriteUniquenessViolations.size() > 0 ){ http://git-wip-us.apache.org/repos/asf/usergrid/blob/82428785/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java index 35bb1b8..c6c70b9 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java @@ -70,7 +70,23 @@ public interface UniqueValueSerializationStrategy extends Migration, VersionedDa * * @throws ConnectionException on error connecting to Cassandra */ - UniqueValueSet load( ApplicationScope applicationScope, String type, Collection<Field> fields ) throws ConnectionException; + UniqueValueSet load( ApplicationScope applicationScope, String type, Collection<Field> fields ) + throws ConnectionException; + + /** + * Load UniqueValue that matches field from 
collection or null if that value does not exist. Returns the oldest + * unique value entry if more than 1 exists + * + * @param applicationScope scope in which to look for field name/value + * @param type The type the unique value exists within + * @param fields Field name/value to search for + * + * @return UniqueValueSet containing fields from the collection that exist in cassandra + * + * @throws ConnectionException on error connecting to Cassandra + */ + UniqueValueSet load( ApplicationScope applicationScope, String type, Collection<Field> fields, + boolean useReadRepair ) throws ConnectionException; /** * Load UniqueValue that matches field from collection or null if that value does not exist. http://git-wip-us.apache.org/repos/asf/usergrid/blob/82428785/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java index db93272..4456123 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java @@ -240,6 +240,14 @@ public abstract class UniqueValueSerializationStrategyImpl<FieldKey, EntityKey> return load( colScope, ConsistencyLevel.valueOf( cassandraFig.getReadCL() ), type, fields, false); } + @Override + public UniqueValueSet load( final ApplicationScope colScope, final String type, final Collection<Field> fields, + boolean 
useReadRepair) + throws ConnectionException { + return load( colScope, ConsistencyLevel.valueOf( cassandraFig.getReadCL() ), type, fields, useReadRepair); + } + + @Override public UniqueValueSet load(final ApplicationScope appScope, final ConsistencyLevel consistencyLevel, http://git-wip-us.apache.org/repos/asf/usergrid/blob/82428785/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyProxyImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyProxyImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyProxyImpl.java index b9c9999..f971b23 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyProxyImpl.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyProxyImpl.java @@ -113,6 +113,19 @@ public class UniqueValueSerializationStrategyProxyImpl implements UniqueValueSer return migration.to.load( applicationScope, type, fields ); } + @Override + public UniqueValueSet load( final ApplicationScope applicationScope, final String type, + final Collection<Field> fields, boolean useReadRepair ) throws ConnectionException { + + final MigrationRelationship<UniqueValueSerializationStrategy> migration = getMigrationRelationShip(); + + if ( migration.needsMigration() ) { + return migration.from.load( applicationScope, type, fields, useReadRepair ); + } + + return migration.to.load( applicationScope, type, fields, useReadRepair ); + } + @Override public UniqueValueSet load(final ApplicationScope applicationScope, final 
ConsistencyLevel consistencyLevel, http://git-wip-us.apache.org/repos/asf/usergrid/blob/82428785/stack/services/src/main/java/org/apache/usergrid/services/AbstractCollectionService.java ---------------------------------------------------------------------- diff --git a/stack/services/src/main/java/org/apache/usergrid/services/AbstractCollectionService.java b/stack/services/src/main/java/org/apache/usergrid/services/AbstractCollectionService.java index 1ed7255..8be86ab 100644 --- a/stack/services/src/main/java/org/apache/usergrid/services/AbstractCollectionService.java +++ b/stack/services/src/main/java/org/apache/usergrid/services/AbstractCollectionService.java @@ -82,7 +82,7 @@ public class AbstractCollectionService extends AbstractService { nameProperty = "name"; } - Entity entity = em.getUniqueEntityFromAlias( getEntityType(), name, true); + Entity entity = em.getUniqueEntityFromAlias( getEntityType(), name, false); if ( entity != null ) { entity = importEntity( request, entity ); } @@ -172,7 +172,8 @@ public class AbstractCollectionService extends AbstractService { public ServiceResults getItemByName( ServiceContext context, String name ) throws Exception { // just get the UUID and then getItemById such that same results are being returned in both cases - UUID entityId = em.getUniqueIdFromAlias( getEntityType(), name ); + // don't use uniqueIndexRepair on read only logic + UUID entityId = em.getUniqueIdFromAlias( getEntityType(), name, false); if ( entityId == null ) { @@ -302,7 +303,7 @@ public class AbstractCollectionService extends AbstractService { return getItemByName( context, name ); } - // EntityRef ref = em.getAlias( getEntityType(), name ); + // use unique index repair here before any write logic if there are problems Entity entity = em.getUniqueEntityFromAlias( getEntityType(), name, true); if ( entity == null ) { // null entity ref means we tried to put a non-existing entity @@ -516,6 +517,7 @@ public class AbstractCollectionService extends 
AbstractService { return super.postItemByName( context, name ); } + // use unique index repair here before any write logic if there are problems Entity entity = em.getUniqueEntityFromAlias( getEntityType(), name, true); if ( entity == null ) { throw new ServiceResourceNotFoundException( context ); @@ -570,6 +572,7 @@ public class AbstractCollectionService extends AbstractService { return getItemByName( context, name ); } + // use unique index repair here before any write logic if there are problems Entity entity = em.getUniqueEntityFromAlias( getEntityType(), name, true); if ( entity == null ) { throw new ServiceResourceNotFoundException( context ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/82428785/stack/services/src/main/java/org/apache/usergrid/services/AbstractConnectionsService.java ---------------------------------------------------------------------- diff --git a/stack/services/src/main/java/org/apache/usergrid/services/AbstractConnectionsService.java b/stack/services/src/main/java/org/apache/usergrid/services/AbstractConnectionsService.java index 5f7ca6b..3791644 100644 --- a/stack/services/src/main/java/org/apache/usergrid/services/AbstractConnectionsService.java +++ b/stack/services/src/main/java/org/apache/usergrid/services/AbstractConnectionsService.java @@ -271,7 +271,8 @@ public class AbstractConnectionsService extends AbstractService { //TODO T.N. 
USERGRID-1919 actually validate this is connected - Entity entity = em.getUniqueEntityFromAlias( query.getEntityType(), name, true); + // this is purely read only, don't use unique index repair + Entity entity = em.getUniqueEntityFromAlias( query.getEntityType(), name, false); if ( entity == null ) { return null; } @@ -372,6 +373,7 @@ public class AbstractConnectionsService extends AbstractService { if ( query.containsSingleNameOrEmailIdentifier() ) { String name = query.getSingleNameOrEmailIdentifier(); + // use unique index repair here before any write logic if there are problems entity = em.getUniqueEntityFromAlias( query.getEntityType(), name, true); if ( entity == null ) { throw new ServiceResourceNotFoundException( context ); @@ -529,6 +531,7 @@ public class AbstractConnectionsService extends AbstractService { nameProperty = "name"; } + // use unique index repair here before any write logic if there are problems Entity entity = em.getUniqueEntityFromAlias( query.getEntityType(), name, true); if ( entity == null ) { throw new ServiceResourceNotFoundException( context );
