Fixes to get Akka unique values code running and working in a JUnit test.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/eeb8a60a Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/eeb8a60a Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/eeb8a60a Branch: refs/heads/usergrid-1268-akka-211 Commit: eeb8a60a6a37eed3254583e525e88ef29884da39 Parents: 52ee2fb Author: Dave Johnson <[email protected]> Authored: Wed Apr 13 11:42:53 2016 -0400 Committer: Dave Johnson <[email protected]> Committed: Mon Apr 25 14:34:15 2016 -0400 ---------------------------------------------------------------------- .../collection/guice/CollectionModule.java | 6 +- .../mvcc/stage/write/WriteCommit.java | 49 +++++++- .../mvcc/stage/write/WriteUniqueVerify.java | 51 +++++++- .../collection/uniquevalues/AkkaFig.java | 46 ++++--- .../uniquevalues/ClusterSingletonRouter.java | 6 +- .../uniquevalues/ReservationCache.java | 4 +- .../uniquevalues/UniqueValueActor.java | 122 +++++++++++-------- .../uniquevalues/UniqueValueException.java | 28 ++++- .../uniquevalues/UniqueValuesService.java | 7 +- .../uniquevalues/UniqueValuesServiceImpl.java | 107 ++++++++-------- .../uniquevalues/UniqueValuesTable.java | 17 ++- .../uniquevalues/UniqueValuesTableImpl.java | 47 ++++++- .../mvcc/stage/delete/MarkCommitTest.java | 7 +- .../mvcc/stage/write/WriteCommitTest.java | 4 +- .../mvcc/stage/write/WriteUniqueVerifyTest.java | 2 +- .../uniquevalues/LocalPreventDupsTest.java | 18 ++- .../src/test/resources/usergrid.properties | 13 -- 17 files changed, 358 insertions(+), 176 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/eeb8a60a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java index 3d794d1..45e519e 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java @@ -20,9 +20,7 @@ package org.apache.usergrid.persistence.collection.guice; import java.util.concurrent.ThreadPoolExecutor; -import org.apache.usergrid.persistence.collection.uniquevalues.AkkaFig; -import org.apache.usergrid.persistence.collection.uniquevalues.UniqueValuesService; -import org.apache.usergrid.persistence.collection.uniquevalues.UniqueValuesServiceImpl; +import org.apache.usergrid.persistence.collection.uniquevalues.*; import org.safehaus.guicyfig.GuicyFigModule; import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; @@ -72,6 +70,8 @@ public abstract class CollectionModule extends AbstractModule { bind( UniqueValuesService.class ).to( UniqueValuesServiceImpl.class ); + bind( UniqueValuesTable.class ).to( UniqueValuesTableImpl.class ); + bind( ChangeLogGenerator.class).to( ChangeLogGeneratorImpl.class); configureMigrationProvider(); http://git-wip-us.apache.org/repos/asf/usergrid/blob/eeb8a60a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java index 7eb96e7..360e954 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java @@ -18,8 +18,14 @@ package org.apache.usergrid.persistence.collection.mvcc.stage.write; +import java.util.HashMap; +import java.util.Map; import java.util.UUID; +import org.apache.usergrid.persistence.collection.exception.WriteUniqueVerifyException; +import org.apache.usergrid.persistence.collection.uniquevalues.AkkaFig; +import org.apache.usergrid.persistence.collection.uniquevalues.UniqueValueException; +import org.apache.usergrid.persistence.collection.uniquevalues.UniqueValuesService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,6 +66,9 @@ public class WriteCommit implements Func1<CollectionIoEvent<MvccEntity>, Collect private static final Logger logger = LoggerFactory.getLogger( WriteCommit.class ); + AkkaFig akkaFig; + UniqueValuesService akkaUvService; + @Inject private UniqueValueSerializationStrategy uniqueValueStrat; @@ -71,7 +80,9 @@ public class WriteCommit implements Func1<CollectionIoEvent<MvccEntity>, Collect @Inject public WriteCommit( final MvccLogEntrySerializationStrategy logStrat, final MvccEntitySerializationStrategy entryStrat, - final UniqueValueSerializationStrategy uniqueValueStrat) { + final UniqueValueSerializationStrategy uniqueValueStrat, + final AkkaFig akkaFig, + final UniqueValuesService akkaUvService ) { Preconditions.checkNotNull( logStrat, "MvccLogEntrySerializationStrategy is required" ); Preconditions.checkNotNull( entryStrat, "MvccEntitySerializationStrategy is required" ); @@ -80,12 +91,44 @@ public class WriteCommit implements Func1<CollectionIoEvent<MvccEntity>, Collect this.logEntryStrat = logStrat; this.entityStrat = entryStrat; this.uniqueValueStrat = uniqueValueStrat; + this.akkaFig = akkaFig; + this.akkaUvService = akkaUvService; } @Override public CollectionIoEvent<MvccEntity> call( final CollectionIoEvent<MvccEntity> ioEvent ) { + if ( akkaFig.getAkkaEnabled() ) { + return confirmUniqueFieldsAkka( ioEvent ); + } + return confirmUniqueFields( ioEvent ); + } + + private CollectionIoEvent<MvccEntity> confirmUniqueFieldsAkka(CollectionIoEvent<MvccEntity> ioEvent) { + + final MvccEntity mvccEntity = ioEvent.getEvent(); + MvccValidationUtils.verifyMvccEntityWithEntity( mvccEntity ); + + final Id entityId = mvccEntity.getId(); + final UUID version = mvccEntity.getVersion(); + final ApplicationScope applicationScope = ioEvent.getEntityCollection(); + //set the version into the entity + final Entity entity = mvccEntity.getEntity().get(); + + try { + akkaUvService.confirmUniqueValues( applicationScope, entity, mvccEntity.getVersion() ); + + } catch (UniqueValueException e) { + Map<String, Field> violations = new HashMap<>(); + violations.put( e.getField().getName(), e.getField() ); + throw new WriteUniqueVerifyException( mvccEntity, applicationScope, violations ); + } + + return ioEvent; + } + + private CollectionIoEvent<MvccEntity> confirmUniqueFields(CollectionIoEvent<MvccEntity> ioEvent) { final MvccEntity mvccEntity = ioEvent.getEvent(); MvccValidationUtils.verifyMvccEntityWithEntity( mvccEntity ); @@ -101,7 +144,8 @@ public class WriteCommit implements Func1<CollectionIoEvent<MvccEntity>, Collect MvccValidationUtils.verifyMvccEntityWithEntity( ioEvent.getEvent() ); ValidationUtils.verifyTimeUuid( version ,"version" ); - final MvccLogEntry startEntry = new MvccLogEntryImpl( entityId, version, Stage.COMMITTED, MvccLogEntry.State.COMPLETE ); + final MvccLogEntry startEntry = + new MvccLogEntryImpl( entityId, version, Stage.COMMITTED, MvccLogEntry.State.COMPLETE ); MutationBatch logMutation = logEntryStrat.write( applicationScope, startEntry ); @@ -134,7 +178,6 @@ public class WriteCommit implements Func1<CollectionIoEvent<MvccEntity>, Collect "Failed to execute write asynchronously ", e ); } - return ioEvent; } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/eeb8a60a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java index d05f838..a37abf1 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java @@ -23,6 +23,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.usergrid.persistence.collection.uniquevalues.AkkaFig; +import org.apache.usergrid.persistence.collection.uniquevalues.UniqueValueException; +import org.apache.usergrid.persistence.collection.uniquevalues.UniqueValuesService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -64,6 +67,9 @@ public class WriteUniqueVerify implements Action1<CollectionIoEvent<MvccEntity>> private static final Logger logger = LoggerFactory.getLogger( WriteUniqueVerify.class ); + AkkaFig akkaFig; + UniqueValuesService akkaUvService; + private final UniqueValueSerializationStrategy uniqueValueStrat; public static int uniqueVerifyPoolSize = 100; @@ -75,10 +81,17 @@ public class WriteUniqueVerify implements Action1<CollectionIoEvent<MvccEntity>> @Inject - public WriteUniqueVerify( final UniqueValueSerializationStrategy uniqueValueSerializiationStrategy, - final SerializationFig serializationFig, final Keyspace keyspace, final CassandraConfig cassandraFig ) { + public WriteUniqueVerify(final UniqueValueSerializationStrategy uniqueValueSerializiationStrategy, + final SerializationFig serializationFig, + final Keyspace keyspace, + final CassandraConfig cassandraFig, + final AkkaFig akkaFig, + final UniqueValuesService akkaUvService ) { + this.keyspace = keyspace; this.cassandraFig = cassandraFig; + this.akkaFig = akkaFig; + this.akkaUvService = akkaUvService; Preconditions.checkNotNull( uniqueValueSerializiationStrategy, "uniqueValueSerializationStrategy is required" ); Preconditions.checkNotNull( serializationFig, "serializationFig is required" ); @@ -92,6 +105,34 @@ public class WriteUniqueVerify implements Action1<CollectionIoEvent<MvccEntity>> @Override public void call( final CollectionIoEvent<MvccEntity> ioevent ) { + if ( akkaFig.getAkkaEnabled() ) { + verifyUniqueFieldsAkka( ioevent ); + } else { + verifyUniqueFields( ioevent ); + } + } + + private void verifyUniqueFieldsAkka(CollectionIoEvent<MvccEntity> ioevent) { + + MvccValidationUtils.verifyMvccEntityWithEntity( ioevent.getEvent() ); + + final MvccEntity mvccEntity = ioevent.getEvent(); + + final Entity entity = mvccEntity.getEntity().get(); + + final ApplicationScope applicationScope = ioevent.getEntityCollection(); + + try { + akkaUvService.reserveUniqueValues( applicationScope, entity, mvccEntity.getVersion() ); + + } catch (UniqueValueException e) { + Map<String, Field> violations = new HashMap<>(); + violations.put( e.getField().getName(), e.getField() ); + throw new WriteUniqueVerifyException( mvccEntity, applicationScope, violations ); + } + } + + private void verifyUniqueFields(CollectionIoEvent<MvccEntity> ioevent) { MvccValidationUtils.verifyMvccEntityWithEntity( ioevent.getEvent() ); @@ -139,9 +180,10 @@ public class WriteUniqueVerify implements Action1<CollectionIoEvent<MvccEntity>> } // use simple thread pool to verify fields in parallel - ConsistentReplayCommand cmd = new ConsistentReplayCommand(uniqueValueStrat,cassandraFig,scope, entity.getId().getType(), uniqueFields,entity); + ConsistentReplayCommand cmd = new ConsistentReplayCommand( + uniqueValueStrat,cassandraFig,scope, entity.getId().getType(), uniqueFields,entity); - Map<String,Field> uniquenessViolations = cmd.execute(); + Map<String,Field> uniquenessViolations = cmd.execute(); //do we want to do this? @@ -151,6 +193,7 @@ public class WriteUniqueVerify implements Action1<CollectionIoEvent<MvccEntity>> } } + private static class ConsistentReplayCommand extends HystrixCommand<Map<String,Field>>{ private final UniqueValueSerializationStrategy uniqueValueSerializationStrategy; http://git-wip-us.apache.org/repos/asf/usergrid/blob/eeb8a60a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/AkkaFig.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/AkkaFig.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/AkkaFig.java index 3bb9fcf..67db3e9 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/AkkaFig.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/AkkaFig.java @@ -27,22 +27,33 @@ import org.safehaus.guicyfig.Key; @FigSingleton public interface AkkaFig extends GuicyFig { + String AKKA_ENABLED = "collection.akka.enabled"; + String AKKA_HOSTNAME = "collection.akka.hostname"; String AKKA_PORT = "collection.akka.port"; String AKKA_REGION = "collection.akka.region"; - String AKKA_REGIONS = "collection.akka.regions"; + String AKKA_REGION_SEEDS = "collection.akka.region.seeds"; - String AKKA_UNIQUE_VALUE_ACTORS = "collection.akka.unique.value.actors"; + String AKKA_UNIQUEVALUE_ACTORS = "collection.akka.uniquevalue.actors"; - String AKKA_REGION_SEEDS = "collection.akka.region.seeds"; + String AKKA_UNIQUEVALUE_CACHE_TTL = "collection.akka.uniquevalue.cache.ttl"; - String AKKA_REGION_TYPES = "collection.akka.region.types"; + String AKKA_UNIQUEVALUE_RESERVATION_TTL= "collection.akka.uniquevalue.reservation.ttl"; + + String AKKA_UNIQUEVALUE_REGION_TYPES = "collection.akka.uniquevalue.region.types"; /** + * Use Akka or nah + */ + @Key(AKKA_ENABLED) + @Default("true") + boolean getAkkaEnabled(); + + /** * Hostname to be used in Akka configuration. */ @Key(AKKA_HOSTNAME) @@ -64,16 +75,9 @@ public interface AkkaFig extends GuicyFig { String getRegion(); /** - * Comma-separated list of all regions to be used in Akka configuration. - */ - @Key(AKKA_REGIONS) - @Default("us-east") - String getRegions(); - - /** * Number of UniqueValueActors to be started on each node */ - @Key(AKKA_UNIQUE_VALUE_ACTORS) + @Key(AKKA_UNIQUEVALUE_ACTORS) @Default("300") int getUniqueValueActors(); @@ -89,7 +93,21 @@ public interface AkkaFig extends GuicyFig { * Comma-separated lists of region types each with format {region}:{type} */ // TODO: allow this to be set via REST API - @Key(AKKA_REGION_TYPES) - @Default("") + @Key(AKKA_UNIQUEVALUE_REGION_TYPES) + @Default("us-east:user") String getRegionTypes(); + + /** + * Unique Value cache TTL in seconds. + */ + @Key(AKKA_UNIQUEVALUE_CACHE_TTL) + @Default("5") + int getUniqueValueCacheTtl(); + + /** + * Unique Value Reservation TTL in seconds. + */ + @Key(AKKA_UNIQUEVALUE_RESERVATION_TTL) + @Default("5") + int getUniqueValueReservationTtl(); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/eeb8a60a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ClusterSingletonRouter.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ClusterSingletonRouter.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ClusterSingletonRouter.java index 8cd0ab0..d9c1aa4 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ClusterSingletonRouter.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ClusterSingletonRouter.java @@ -15,9 +15,9 @@ public class ClusterSingletonRouter extends UntypedActor { private final ActorRef router; - public ClusterSingletonRouter( String injectorName ) { + public ClusterSingletonRouter( UniqueValuesTable table ) { router = getContext().actorOf( - FromConfig.getInstance().props(Props.create(UniqueValueActor.class, injectorName )), "router"); + FromConfig.getInstance().props(Props.create(UniqueValueActor.class, table )), "router"); } @Override @@ -27,7 +27,7 @@ public class ClusterSingletonRouter extends UntypedActor { UniqueValueActor.Request request = (UniqueValueActor.Request)message; ConsistentHashingRouter.ConsistentHashableEnvelope envelope = - new ConsistentHashingRouter.ConsistentHashableEnvelope( message, request.getRowKey() ); + new ConsistentHashingRouter.ConsistentHashableEnvelope( message, request.getConsistentHashKey() ); router.tell( envelope, getSender()); } else { http://git-wip-us.apache.org/repos/asf/usergrid/blob/eeb8a60a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ReservationCache.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ReservationCache.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ReservationCache.java index d5c67c3..c0911b0 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ReservationCache.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ReservationCache.java @@ -35,11 +35,11 @@ public class ReservationCache { } public void cacheReservation( UniqueValueActor.Reservation reservation ) { - cache.put( reservation.getRowKey(), reservation ); + cache.put( reservation.getConsistentHashKey(), reservation ); } public void cancelReservation( UniqueValueActor.Cancellation cancellation ) { - cache.invalidate( cancellation.getRowKey() ); + cache.invalidate( cancellation.getConsistentHashKey() ); } public CacheStats getStats() { http://git-wip-us.apache.org/repos/asf/usergrid/blob/eeb8a60a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java index faf0433..ce1d72f 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java @@ -5,6 +5,9 @@ import akka.actor.UntypedActor; import akka.cluster.pubsub.DistributedPubSub; import akka.cluster.pubsub.DistributedPubSubMediator; import org.apache.commons.lang3.RandomStringUtils; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.model.entity.Id; +import org.apache.usergrid.persistence.model.field.Field; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -18,21 +21,16 @@ public class UniqueValueActor extends UntypedActor { //private MetricsService metricsService; - private UniqueValuesTable table = new UniqueValuesTableImpl(); + final private UniqueValuesTable table; - private ActorRef mediator = DistributedPubSub.get(getContext().system()).mediator(); + final private ActorRef mediator = DistributedPubSub.get(getContext().system()).mediator(); private int count = 0; - public UniqueValueActor( String injectorName ) { + public UniqueValueActor( UniqueValuesTable table ) { + this.table = table; -// UniqueValuesService uniqueValuesService = -// GuiceModule.getInjector( injectorName ).getInstance( UniqueValuesService.class ); -// -// terminateOnError = Boolean.parseBoolean( uniqueValuesService.getProperties() -// .getProperty( "akka.unique-value-actor-terminate-on-error", "false" ) ); -// // chaos = Boolean.parseBoolean( uniqueValuesService.getProperties() // .getProperty( "akka.test.chaos", "false" ) ); @@ -58,20 +56,20 @@ public class UniqueValueActor extends UntypedActor { // final Timer.Context context = metricsService.getReservationTimer().time(); try { - UUID owner = table.lookupOwner( res.getType(), res.getPropertyName(), res.getPropertyValue() ); + Id owner = table.lookupOwner( res.getApplicationScope(), res.getOwner().getType(), res.getField() ); - if ( owner != null && owner.equals( res.getUuid() )) { + if ( owner != null && owner.equals( res.getOwner() )) { // sender already owns this unique value getSender().tell( new Response( Response.Status.IS_UNIQUE ), getSender() ); return; - } else if ( owner != null && !owner.equals( res.getUuid() )) { + } else if ( owner != null && !owner.equals( res.getOwner() )) { // tell sender value is not unique getSender().tell( new Response( Response.Status.NOT_UNIQUE ), getSender() ); return; } - table.reserve( res.getUuid(), res.getType(), res.getPropertyName(), res.getPropertyValue() ); + table.reserve( res.getApplicationScope(), res.getOwner(), res.getOwnerVersion(), res.getField() ); getSender().tell( new Response( Response.Status.IS_UNIQUE ), getSender() ); @@ -89,14 +87,14 @@ public class UniqueValueActor extends UntypedActor { } } else if ( message instanceof Confirmation) { - Confirmation commit = (Confirmation) message; + Confirmation con = (Confirmation) message; // final Timer.Context context = metricsService.getCommitmentTimer().time(); try { - UUID owner = table.lookupOwner( commit.getType(), commit.getPropertyName(), commit.getPropertyValue() ); + Id owner = table.lookupOwner( con.getApplicationScope(), con.getOwner().getType(), con.getField() ); - if ( owner != null && !owner.equals( commit.getUuid() )) { + if ( owner != null && !owner.equals( con.getOwner() )) { // cannot reserve, somebody else owns the unique value getSender().tell( new Response( Response.Status.NOT_UNIQUE ), getSender() ); return; @@ -107,12 +105,12 @@ public class UniqueValueActor extends UntypedActor { return; } - table.commit( commit.getUuid(), commit.getType(), commit.getPropertyName(), commit.getPropertyValue() ); + table.confirm( con.getApplicationScope(), con.getOwner(), con.getOwnerVersion(), con.getField() ); getSender().tell( new Response( Response.Status.IS_UNIQUE ), getSender() ); mediator.tell( new DistributedPubSubMediator.Publish( "content", - new Reservation( commit ) ), getSelf() ); + new Reservation( con ) ), getSelf() ); } catch (Throwable t) { getSender().tell( new Response( Response.Status.ERROR ), getSender() ); @@ -127,9 +125,9 @@ public class UniqueValueActor extends UntypedActor { Cancellation can = (Cancellation) message; try { - UUID owner = table.lookupOwner( can.getType(), can.getPropertyName(), can.getPropertyValue() ); + Id owner = table.lookupOwner( can.getApplicationScope(), can.getOwner().getType(), can.getField() ); - if ( owner != null && !owner.equals( can.getUuid() )) { + if ( owner != null && !owner.equals( can.getOwner() )) { // cannot cancel, somebody else owns the unique value getSender().tell( new Response( Response.Status.NOT_UNIQUE ), getSender() ); return; @@ -140,7 +138,7 @@ public class UniqueValueActor extends UntypedActor { return; } - table.cancel( can.getType(), can.getPropertyName(), can.getPropertyValue() ); + table.confirm( can.getApplicationScope(), can.getOwner(), can.getOwnerVersion(), can.getField() ); getSender().tell( new Response( Response.Status.SUCCESS ), getSender() ); @@ -162,41 +160,57 @@ public class UniqueValueActor extends UntypedActor { * UniqueValue actor receives and processes Requests. */ public abstract static class Request implements Serializable { - final UUID uuid; - final String type; - final String propertyName; - final String propertyValue; - final String rowKey; - - public Request(UUID uuid, String type, String propertyName, String value) { - this.uuid = uuid; - this.type = type; - this.propertyName = propertyName; - this.propertyValue = value; - this.rowKey = getType() + ":" + getPropertyName() + ":" + getPropertyValue(); + final ApplicationScope applicationScope; + final Id owner; + final UUID ownerVersion; + final Field field; + final String consistentHashKey; + + public Request( ApplicationScope applicationScope, Id owner, UUID ownerVersion, Field field ) { + this.applicationScope = applicationScope; + this.owner = owner; + this.ownerVersion = ownerVersion; + this.field = field; + StringBuilder sb = new StringBuilder(); + sb.append( applicationScope.getApplication() ); + sb.append(":"); + sb.append( owner.getType() ); + sb.append(":"); + sb.append( field.getName() ); + sb.append(":"); + sb.append( field.getValue().toString() ); + this.consistentHashKey = sb.toString(); } public Request( Request req ) { - this.uuid = req.uuid; - this.type = req.type; - this.propertyName = req.propertyName; - this.propertyValue = req.propertyValue; - this.rowKey = getType() + ":" + getPropertyName() + ":" + getPropertyValue(); + this.applicationScope = req.applicationScope; + this.owner = req.owner; + this.ownerVersion = req.ownerVersion; + this.field = req.field; + StringBuilder sb = new StringBuilder(); + sb.append( req.applicationScope.getApplication() ); + sb.append(":"); + sb.append( req.owner.getType() ); + sb.append(":"); + sb.append( req.field.getName() ); + sb.append(":"); + sb.append( req.field.getValue().toString() ); + this.consistentHashKey = sb.toString(); } - public String getRowKey() { - return rowKey; + public ApplicationScope getApplicationScope() { + return applicationScope; } - public UUID getUuid() { - return uuid; + public Id getOwner() { + return owner; } - public String getType() { - return type; + public Field getField() { + return field; } - public String getPropertyName() { - return propertyName; + public String getConsistentHashKey() { + return consistentHashKey; } - public String getPropertyValue() { - return propertyValue; + public UUID getOwnerVersion() { + return ownerVersion; } } @@ -219,8 +233,8 @@ public class UniqueValueActor extends UntypedActor { public Reservation( Request req ) { super( req ); } - public Reservation(UUID uuid, String type, String username, String value) { - super( uuid, type, username, value ); + public Reservation( ApplicationScope applicationScope, Id owner, UUID ownerVersion, Field field) { + super( applicationScope, owner, ownerVersion, field ); } } @@ -228,8 +242,8 @@ public class UniqueValueActor extends UntypedActor { public Cancellation( Request req ) { super( req ); } - public Cancellation(UUID uuid, String type, String username, String value) { - super( uuid, type, username, value ); + public Cancellation( ApplicationScope applicationScope, Id owner, UUID ownerVersion, Field field) { + super( applicationScope, owner, ownerVersion, field ); } } @@ -237,8 +251,8 @@ public class UniqueValueActor extends UntypedActor { public Confirmation(Request req ) { super( req ); } - public Confirmation(UUID uuid, String type, String username, String value) { - super( uuid, type, username, value ); + public Confirmation( ApplicationScope applicationScope, Id owner, UUID ownerVersion, Field field) { + super( applicationScope, owner, ownerVersion, field ); } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/eeb8a60a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueException.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueException.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueException.java index 5df8237..5ecfb68 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueException.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueException.java @@ -1,7 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. The ASF licenses this file to You + * under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. For additional information regarding + * copyright in this work, please see the NOTICE file in the top level + * directory of this distribution. + */ package org.apache.usergrid.persistence.collection.uniquevalues; +import org.apache.usergrid.persistence.model.field.Field; + public class UniqueValueException extends Exception { - public UniqueValueException(String message) { + final Field field; + + public UniqueValueException(String message, Field field ) { super( message ); + this.field = field; + } + + public Field getField() { + return field; } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/eeb8a60a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesService.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesService.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesService.java index 2219df6..7ebab15 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesService.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesService.java @@ -19,8 +19,11 @@ package org.apache.usergrid.persistence.collection.uniquevalues; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.model.entity.Entity; +import java.util.UUID; + /** * Service that reserves and confirms unique values. */ @@ -30,12 +33,12 @@ public interface UniqueValuesService { * Check that unique values are unique and reserve them for a limited time. * If the reservations are not confirmed, they will expire. */ - void reserveUniqueValues( Entity entity ) throws UniqueValueException; + void reserveUniqueValues( ApplicationScope scope, Entity entity, UUID version ) throws UniqueValueException; /** * Confirm unique values that were reserved earlier. */ - void confirmUniqueValues( Entity entity ) throws UniqueValueException; + void confirmUniqueValues( ApplicationScope scope, Entity entity, UUID version ) throws UniqueValueException; /** * For test purposes only. http://git-wip-us.apache.org/repos/asf/usergrid/blob/eeb8a60a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java index 8897091..45909b8 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java @@ -31,9 +31,11 @@ import akka.util.Timeout; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ListMultimap; import com.google.inject.Inject; +import com.google.inject.Singleton; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import org.apache.commons.lang3.StringUtils; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.model.entity.Entity; import org.apache.usergrid.persistence.model.field.Field; import org.slf4j.Logger; @@ -45,12 +47,17 @@ import java.util.*; import java.util.concurrent.TimeUnit; +@Singleton public class UniqueValuesServiceImpl implements UniqueValuesService { private static final Logger logger = LoggerFactory.getLogger( UniqueValuesServiceImpl.class ); @Inject AkkaFig akkaFig; + @Inject + UniqueValuesTable table; + + private String hostname; private Integer port; private String currentRegion; @@ -92,6 +99,7 @@ public class UniqueValuesServiceImpl implements UniqueValuesService { waitForRequestActors(); } + /** * For testing purposes only; does not wait for request actors to start. */ @@ -103,10 +111,12 @@ public class UniqueValuesServiceImpl implements UniqueValuesService { initAkka(); } + private Map<String, ActorRef> getRequestActorsByRegion() { return requestActorsByRegion; } + private Map<String, String> getRegionsByType() { return regionsByType; } @@ -148,24 +158,13 @@ public class UniqueValuesServiceImpl implements UniqueValuesService { throw new RuntimeException( "No value specified for akka.region"); } - String regionsValue = akkaFig.getRegions(); - if ( StringUtils.isEmpty( regionsValue )) { - throw new RuntimeException( "No value specified for akka.regions"); - } - - String[] regions = regionsValue.split( "," ); - for ( String region : regions ) { - - akkaFig.getKeyByMethod( "" ); - - String typesValue = akkaFig.getRegionTypes(); - String[] regionTypes = StringUtils.isEmpty( typesValue ) ? new String[0] : typesValue.split(","); - for ( String regionType : regionTypes ) { - String[] parts = regionType.split(":"); - String typeRegion = parts[0]; - String type = parts[1]; - this.regionsByType.put( type, typeRegion ); - } + String typesValue = akkaFig.getRegionTypes(); + String[] regionTypes = StringUtils.isEmpty( typesValue ) ? new String[0] : typesValue.split(","); + for ( String regionType : regionTypes ) { + String[] parts = regionType.split(":"); + String typeRegion = parts[0]; + String type = parts[1]; + this.regionsByType.put( type, typeRegion ); } final Map<String, ActorSystem> systemMap = new HashMap<>(); @@ -177,6 +176,7 @@ public class UniqueValuesServiceImpl implements UniqueValuesService { subscribeToReservations( localSystem, systemMap ); } + private void subscribeToReservations( ActorSystem localSystem, Map<String, ActorSystem> systemMap ) { for ( String region : systemMap.keySet() ) { @@ -188,6 +188,7 @@ public class UniqueValuesServiceImpl implements UniqueValuesService { } } + /** * Create ActorSystem and ClusterSingletonProxy for every region. * Create ClusterSingletonManager for the current region. @@ -217,7 +218,7 @@ public class UniqueValuesServiceImpl implements UniqueValuesService { ClusterSingletonManagerSettings settings = ClusterSingletonManagerSettings.create( system ).withRole("io"); system.actorOf( ClusterSingletonManager.props( - Props.create( ClusterSingletonRouter.class, region ), + Props.create( ClusterSingletonRouter.class, table ), PoisonPill.getInstance(), settings ), "uvRouter"); } @@ -230,6 +231,7 @@ public class UniqueValuesServiceImpl implements UniqueValuesService { return localSystem; } + /** * Create RequestActor for each region. * @@ -249,6 +251,7 @@ public class UniqueValuesServiceImpl implements UniqueValuesService { } } + public void waitForRequestActors() { for ( String region : requestActorsByRegion.keySet() ) { @@ -257,6 +260,7 @@ public class UniqueValuesServiceImpl implements UniqueValuesService { } } + private void waitForRequestActor( ActorRef ra ) { logger.info( "Waiting on request actor {}...", ra.path() ); @@ -339,7 +343,8 @@ public class UniqueValuesServiceImpl implements UniqueValuesService { } if (seedsByRegion.keySet().isEmpty()) { - throw new RuntimeException( "No seeds listed in 'parsing collection.akka.region.seeds' property." ); + throw new RuntimeException( + "No seeds listed in 'parsing collection.akka.region.seeds' property." ); } } @@ -413,14 +418,13 @@ public class UniqueValuesServiceImpl implements UniqueValuesService { } - @Override - public void reserveUniqueValues(Entity entity) throws UniqueValueException { + public void reserveUniqueValues( ApplicationScope scope, Entity entity, UUID version) throws UniqueValueException { try { for (Field field : entity.getFields()) { if (field.isUnique()) { - reserveUniqueField( entity, field.getName(), field.getValue().toString() ); + reserveUniqueField( scope, entity, version, field ); } } @@ -428,7 +432,7 @@ public class UniqueValuesServiceImpl implements UniqueValuesService { for (Field field : entity.getFields()) { try { - cancelUniqueField( entity, field.getName(), field.getValue().toString() ); + cancelUniqueField( scope, entity, version, field ); } catch (Throwable ignored) { logger.debug( "Error canceling unique field", ignored ); } @@ -440,12 +444,12 @@ public class UniqueValuesServiceImpl implements UniqueValuesService { @Override - public void confirmUniqueValues(Entity entity) throws UniqueValueException { + public void confirmUniqueValues( ApplicationScope scope, Entity entity, UUID version ) throws UniqueValueException { try { for (Field field : entity.getFields()) { if (field.isUnique()) { - confirmUniqueField( entity, field.getName(), field.getValue().toString() ); + confirmUniqueField( scope, entity, version, field ); } } @@ -453,7 +457,7 @@ public class UniqueValuesServiceImpl implements UniqueValuesService { for (Field field : entity.getFields()) { try { - cancelUniqueField( entity, field.getName(), field.getValue().toString() ); + cancelUniqueField( scope, entity, version, field ); } catch (Throwable ignored) { logger.debug( "Error canceling unique field", ignored ); } @@ -465,9 +469,9 @@ public class UniqueValuesServiceImpl implements UniqueValuesService { private void reserveUniqueField( - Entity entity, String propertyName, String propertyValue ) throws UniqueValueException { + ApplicationScope scope, Entity entity, UUID version, Field field ) throws UniqueValueException { - String region = getRegionsByType().get("user"); + String region = getRegionsByType().get( entity.getId().getType() ); ActorRef requestActor = getRequestActorsByRegion().get(region); if ( requestActor == null ) { @@ -475,23 +479,24 @@ public class UniqueValuesServiceImpl implements UniqueValuesService { } UniqueValueActor.Request request = new UniqueValueActor.Reservation( - entity.getId().getUuid(), "user", propertyName, propertyValue ); + scope, entity.getId(), version, field ); - UniqueValueActor.Reservation res = reservationCache.get( request.getRowKey() ); + UniqueValueActor.Reservation res = reservationCache.get( request.getConsistentHashKey() ); // if ( res != null ) { // getCacheCounter().inc(); // } - if ( res != null && !res.getUuid().equals( request.getUuid() )) { - throw new UniqueValueException( "Error property not unique (cache)" ); + if ( res != null && !res.getOwner().equals( request.getOwner() )) { + throw new UniqueValueException( "Error property not unique (cache)", field); } - sendUniqueValueRequest( entity, requestActor, request ); + sendUniqueValueRequest( scope, entity, field, requestActor, request ); } + private void confirmUniqueField( - Entity entity, String propertyName, String propertyValue ) throws UniqueValueException { + ApplicationScope scope, Entity entity, UUID version, Field field ) throws UniqueValueException { - String region = getRegionsByType().get("user"); + String region = getRegionsByType().get( entity.getId().getType() ); ActorRef requestActor = getRequestActorsByRegion().get(region); if ( requestActor == null ) { @@ -499,26 +504,27 @@ public class UniqueValuesServiceImpl implements UniqueValuesService { } UniqueValueActor.Confirmation request = new UniqueValueActor.Confirmation( - entity.getId().getUuid(), "user", propertyName, propertyValue ); + scope, entity.getId(), version, field ); - sendUniqueValueRequest( entity, requestActor, request ); + sendUniqueValueRequest( scope, entity, field, requestActor, request ); } - private void cancelUniqueField( - Entity entity, String propertyName, String propertyValue ) throws UniqueValueException { - ActorRef requestActor = lookupRequestActorForType( "user" ); + private void cancelUniqueField( ApplicationScope scope, Entity entity, UUID version, Field field ) throws UniqueValueException { + + ActorRef requestActor = lookupRequestActorForType( entity.getId().getType() ); if ( requestActor == null ) { throw new RuntimeException( "No request actor for type, cannot verify unique fields!" ); } UniqueValueActor.Confirmation request = new UniqueValueActor.Confirmation( - entity.getId().getUuid(), "user", propertyName, propertyValue ); + scope, entity.getId(), version, field ); requestActor.tell( request, null ); } + private ActorRef lookupRequestActorForType( String type ) { String region = getRegionsByType().get( type ); if ( region == null ) { @@ -531,8 +537,9 @@ public class UniqueValuesServiceImpl implements UniqueValuesService { return requestActor; } - private void sendUniqueValueRequest( - Entity entity, ActorRef requestActor, UniqueValueActor.Request request) throws UniqueValueException { + + private void sendUniqueValueRequest( ApplicationScope scope, Entity entity, Field field, + ActorRef requestActor, UniqueValueActor.Request request) throws UniqueValueException { int maxRetries = 5; int retries = 0; @@ -552,27 +559,27 @@ public class UniqueValuesServiceImpl implements UniqueValuesService { || response.getStatus().equals( UniqueValueActor.Response.Status.NOT_UNIQUE ))) { if ( retries > 1 ) { logger.debug("IS_UNIQUE after retrying {} for entity {} rowkey {}", - retries, entity.getId().getUuid(), request.getRowKey()); + retries, entity.getId().getUuid(), request.getConsistentHashKey()); } break; } else if ( response != null ) { logger.debug("ERROR status retrying {} entity {} rowkey {}", - retries, entity.getId().getUuid(), request.getRowKey()); + retries, entity.getId().getUuid(), request.getConsistentHashKey()); } else { logger.debug("Timed-out retrying {} entity {} rowkey", - retries, entity.getId().getUuid(), request.getRowKey()); + retries, entity.getId().getUuid(), request.getConsistentHashKey()); } } catch ( Exception e ) { logger.debug("{} caused retry {} for entity {} rowkey {}", - e.getClass().getSimpleName(), retries, entity.getId().getUuid(), request.getRowKey()); + e.getClass().getSimpleName(), retries, entity.getId().getUuid(), request.getConsistentHashKey()); } } if ( response == null || response.getStatus().equals( UniqueValueActor.Response.Status.ERROR )) { logger.debug("ERROR after retrying {} for entity {} rowkey {}", - retries, entity.getId().getUuid(), request.getRowKey()); + retries, entity.getId().getUuid(), request.getConsistentHashKey()); // should result in an HTTP 503 throw new RuntimeException( "Error verifying unique value after " + retries + " retries"); @@ -581,7 +588,7 @@ public class UniqueValuesServiceImpl implements UniqueValuesService { if ( response.getStatus().equals( UniqueValueActor.Response.Status.NOT_UNIQUE )) { // should result in an HTTP 409 (conflict) - throw new UniqueValueException( "Error property not unique" ); + throw new UniqueValueException( "Error property not unique", field ); } } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/eeb8a60a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesTable.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesTable.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesTable.java index 4309eb0..0e69ef7 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesTable.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesTable.java @@ -18,16 +18,25 @@ */ package org.apache.usergrid.persistence.collection.uniquevalues; +import com.netflix.astyanax.connectionpool.exceptions.ConnectionException; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.model.entity.Id; +import org.apache.usergrid.persistence.model.field.Field; + import java.util.UUID; public interface UniqueValuesTable { - UUID lookupOwner(String entityType, String propertyName, String propertyValue); + Id lookupOwner( + ApplicationScope applicationScope, String type, Field field ) throws ConnectionException; - void reserve(UUID owner, String entityType, String propertyName, String propertyValue); + void reserve( + ApplicationScope applicationScope, Id owner, UUID version, Field field ) throws ConnectionException; - void commit(UUID owner, String entityType, String propertyName, String propertyValue); + void confirm( + ApplicationScope applicationScope, Id owner, UUID version, Field field ) throws ConnectionException; - void cancel(String entityType, String propertyName, String propertyValue); + void cancel( + ApplicationScope applicationScope, Id owner, UUID version, Field field ) throws ConnectionException; } http://git-wip-us.apache.org/repos/asf/usergrid/blob/eeb8a60a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesTableImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesTableImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesTableImpl.java index ee3d621..8ba7cbd 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesTableImpl.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesTableImpl.java @@ -18,29 +18,66 @@ */ package org.apache.usergrid.persistence.collection.uniquevalues; +import com.google.inject.Inject; +import com.netflix.astyanax.MutationBatch; +import com.netflix.astyanax.connectionpool.exceptions.ConnectionException; +import org.apache.usergrid.persistence.collection.serialization.UniqueValue; +import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy; +import org.apache.usergrid.persistence.collection.serialization.UniqueValueSet; +import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueImpl; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.model.entity.Id; +import org.apache.usergrid.persistence.model.field.Field; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Collections; import java.util.UUID; public class UniqueValuesTableImpl implements UniqueValuesTable { private static final Logger logger = LoggerFactory.getLogger( UniqueValuesTableImpl.class ); + final UniqueValueSerializationStrategy strat; + final AkkaFig akkaFig; + + @Inject + public UniqueValuesTableImpl( final UniqueValueSerializationStrategy strat, AkkaFig akkaFig ) { + this.strat = strat; + this.akkaFig = akkaFig; + } + + @Override - public UUID lookupOwner(String entityType, String propertyName, String propertyValue) { - return null; + public Id lookupOwner( ApplicationScope scope, String type, Field field) throws ConnectionException { + + UniqueValueSet set = strat.load( scope, type, Collections.singletonList( field ) ); + UniqueValue uv = set.getValue( field.getName() ); + return uv == null ? null : uv.getEntityId(); } @Override - public void reserve(UUID owner, String entityType, String propertyName, String propertyValue) { + public void reserve( ApplicationScope scope, Id owner, UUID version, Field field ) throws ConnectionException { + + UniqueValue uv = new UniqueValueImpl( field, owner, version); + final MutationBatch write = strat.write( scope, uv, akkaFig.getUniqueValueReservationTtl() ); + write.execute(); } @Override - public void commit(UUID owner, String entityType, String propertyName, String propertyValue) { + public void confirm( ApplicationScope scope, Id owner, UUID version, Field field) throws ConnectionException { + + UniqueValue uv = new UniqueValueImpl( field, owner, version); + final MutationBatch write = strat.write( scope, uv ); + write.execute(); + } @Override - public void cancel(String entityType, String propertyName, String propertyValue) { + public void cancel( ApplicationScope scope, Id owner, UUID version, Field field) throws ConnectionException { + + UniqueValue uv = new UniqueValueImpl( field, owner, version ); + final MutationBatch write = strat.delete( scope, uv ); + write.execute(); } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/eeb8a60a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommitTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommitTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommitTest.java index ad6eac6..e7cee21 100644 --- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommitTest.java +++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommitTest.java @@ -71,13 +71,12 @@ public class MarkCommitTest extends AbstractMvccEntityStageTest { //run the stage - WriteCommit newStage = new WriteCommit( logStrategy, mvccEntityStrategy, uniqueValueStrategy ); + WriteCommit newStage = new WriteCommit( logStrategy, mvccEntityStrategy, uniqueValueStrategy, null, null ); //verify the observable is correct - Entity result = newStage.call( new CollectionIoEvent<MvccEntity>( context, mvccEntityInput ) ).getEvent().getEntity().get(); - - + Entity result = newStage.call( + new CollectionIoEvent<MvccEntity>( context, mvccEntityInput ) ).getEvent().getEntity().get(); //verify the log entry is correct http://git-wip-us.apache.org/repos/asf/usergrid/blob/eeb8a60a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommitTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommitTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommitTest.java index 58642d3..8665ee9 100644 --- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommitTest.java +++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommitTest.java @@ -84,7 +84,7 @@ public class WriteCommitTest extends AbstractMvccEntityStageTest { //run the stage - WriteCommit newStage = new WriteCommit( logStrategy, mvccEntityStrategy, uniqueValueStrategy ); + WriteCommit newStage = new WriteCommit( logStrategy, mvccEntityStrategy, uniqueValueStrategy, null, null ); Entity result = newStage.call( new CollectionIoEvent<MvccEntity>( context, mvccEntityInput ) ).getEvent().getEntity().get(); @@ -131,7 +131,7 @@ public class WriteCommitTest extends AbstractMvccEntityStageTest { when( mvccEntityStrategy.write( any( ApplicationScope.class ), any( MvccEntity.class ) ) ) .thenReturn( entityMutation ); - new WriteCommit( logStrategy, mvccEntityStrategy, uniqueValueStrategy ).call( event ); + new WriteCommit( logStrategy, mvccEntityStrategy, uniqueValueStrategy, null, null ).call( event ); } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/eeb8a60a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java index b9a1565..1b0c7e4 100644 --- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java +++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java @@ -82,7 +82,7 @@ public class WriteUniqueVerifyTest { final MvccEntity mvccEntity = fromEntity( entity ); // run the stage - WriteUniqueVerify newStage = new WriteUniqueVerify( uvstrat, fig, keyspace,cassandraConfig ); + WriteUniqueVerify newStage = new WriteUniqueVerify( uvstrat, fig, keyspace,cassandraConfig, null, null ); newStage.call( new CollectionIoEvent<>( collectionScope, mvccEntity ) ) ; http://git-wip-us.apache.org/repos/asf/usergrid/blob/eeb8a60a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/uniquevalues/LocalPreventDupsTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/uniquevalues/LocalPreventDupsTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/uniquevalues/LocalPreventDupsTest.java index bee47eb..ef5c16f 100644 --- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/uniquevalues/LocalPreventDupsTest.java +++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/uniquevalues/LocalPreventDupsTest.java @@ -6,6 +6,7 @@ import com.google.common.collect.Multimaps; import com.google.inject.Inject; import org.apache.usergrid.persistence.collection.EntityCollectionManager; import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; +import org.apache.usergrid.persistence.collection.exception.WriteUniqueVerifyException; import org.apache.usergrid.persistence.collection.guice.TestCollectionModule; import org.apache.usergrid.persistence.core.guice.MigrationManagerRule; import org.apache.usergrid.persistence.core.scope.ApplicationScope; @@ -44,6 +45,9 @@ public class LocalPreventDupsTest { @Rule public MigrationManagerRule migrationManagerRule; + @Inject + UniqueValuesService uniqueValuesService; + private static final AtomicInteger successCounter = new AtomicInteger( 0 ); private static final AtomicInteger errorCounter = new AtomicInteger( 0 ); @@ -51,16 +55,8 @@ public class LocalPreventDupsTest { @Test public void testBasicOperation() throws Exception { - UniqueValuesService appEast1 = - TestCollectionModule.getInjector( "us-east" ).getInstance( UniqueValuesService.class ); - appEast1.start("127.0.0.1", 2551, "us-east"); - - UniqueValuesService appEast2 = - TestCollectionModule.getInjector( "us-east" ).getInstance( UniqueValuesService.class ); - appEast2.start("127.0.0.1", 2552, "us-east"); - - appEast1.waitForRequestActors(); - appEast2.waitForRequestActors(); + uniqueValuesService.start("127.0.0.1", 2551, "us-east"); + uniqueValuesService.waitForRequestActors(); int numUsers = 100; Multimap<String, Entity> usersCreated = generateDuplicateUsers( numUsers ); @@ -115,7 +111,7 @@ public class LocalPreventDupsTest { logger.debug("Created user {}", username); } catch ( Throwable t ) { - if ( t instanceof UniqueValueException ) { + if ( t instanceof WriteUniqueVerifyException) { // we expect lots of these } else { errorCounter.incrementAndGet(); http://git-wip-us.apache.org/repos/asf/usergrid/blob/eeb8a60a/stack/corepersistence/collection/src/test/resources/usergrid.properties ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/test/resources/usergrid.properties b/stack/corepersistence/collection/src/test/resources/usergrid.properties index 9059f0e..8de5c27 100644 --- a/stack/corepersistence/collection/src/test/resources/usergrid.properties +++ b/stack/corepersistence/collection/src/test/resources/usergrid.properties @@ -1,16 +1,3 @@ # This property is required to be set and cannot be defaulted anywhere usergrid.cluster_name=usergrid -collection.akka.hostname=localhost - -collection.akka.port=2551 - -collection.akka.region=us-east - -collection.akka.regions=us-east - -collection.akka.region.seeds=us-east:localhost:2551,us-east:localhost:2552 - -collection.akka.region.types=us-east:users,us-east:cats - -collection.akka.unique.value.actors=400
