Simplify and rename Actorsystem configuration properties to be more generic, e.g. starting with "usergrid.cluster" instead of "collection.akka"
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/f0c9fd4b Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/f0c9fd4b Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/f0c9fd4b Branch: refs/heads/master Commit: f0c9fd4bd91a271ee1e9a93a6fa70bf69159f7db Parents: 2d5ad05 Author: Dave Johnson <[email protected]> Authored: Fri Jul 1 11:09:37 2016 -0400 Committer: Dave Johnson <[email protected]> Committed: Fri Jul 1 11:09:37 2016 -0400 ---------------------------------------------------------------------- .../main/resources/usergrid-default.properties | 56 ++--- .../corepersistence/CpEntityManagerFactory.java | 2 +- .../persistence/actorsystem/ActorSystemFig.java | 60 ++--- .../actorsystem/ActorSystemManagerImpl.java | 53 ++--- .../mvcc/stage/write/WriteCommit.java | 10 +- .../mvcc/stage/write/WriteUniqueVerify.java | 10 +- .../uniquevalues/UniqueValuesFig.java | 28 +-- .../mvcc/stage/delete/MarkCommitTest.java | 2 +- .../mvcc/stage/write/WriteCommitTest.java | 8 +- .../mvcc/stage/write/WriteUniqueVerifyTest.java | 2 +- .../org/apache/usergrid/rest/UniqueCatsIT.java | 237 +++++++++++++++++++ 11 files changed, 344 insertions(+), 124 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/f0c9fd4b/stack/config/src/main/resources/usergrid-default.properties ---------------------------------------------------------------------- diff --git a/stack/config/src/main/resources/usergrid-default.properties b/stack/config/src/main/resources/usergrid-default.properties index 29b8d36..fe70569 100644 --- a/stack/config/src/main/resources/usergrid-default.properties +++ b/stack/config/src/main/resources/usergrid-default.properties @@ -410,60 +410,44 @@ usergrid.queue.lock.timeout=5 #usergrid.queue.publish.queuesize=850000 -######################### Akka Actor System Configiuration ################### 
+######################### Usergrid Cluster Configuration ################### # -# Usergrid includes Akka, an Actor-based system that allows for the -# distribution of work across multiple Usergrid instances and multiple regions. -# -# All properties are required. If Akka is enabled then all properties in this -# section MUST be specified. -# -# For more information: https://issues.apache.org/jira/browse/USERGRID-1268 +# Usergrid includes a multi-region clustering system. +# To use it you must specify your region, the list of regions and seeds for each region. # -# Currently, Akka is disable and not required for Usergrid -collection.akka.enabled=false +# This is an experimental new feature, disabled by default +usergrid.cluster.enabled=false -# host name of this machine -collection.akka.hostname=localhost +# Comma-separated list of regions to be considered +usergrid.cluster.region.list=default -# The region of this Usergrid installation -# Region MUST be in the region list specified in the 'usergrid.queue.regionList' property -collection.akka.region= +# The region of this local instance of Usergrid +usergrid.cluster.region.local=default -# Comma-separated lists of Akka seeds each with format {region}:{hostname}:{port}. -# All regions MUST be listed in the 'usergrid.queue.regionList' -collection.akka.region.seeds= - -# The default authoritative region for when is not specified elsewhere -# Region MUST be in the region list specified in the 'usergrid.queue.regionList' property -collection.akka.authoritative.region= +# Comma-separated lists of cluster seeds each with format {region}:{hostname} +usergrid.cluster.seeds=default:localhost -# Default number of Akka actors to start per instance / router producer -collection.akka.instances-per-node=300 +# Port used for cluster communications. 
+usergrid.cluster.port=2551 ######################### Usergrid Unique Values Validation ################## # -# Usergrid includes a distributed unique values validation that ensure that -# unique values rename unique across a distributed and multi-region system. -# This system is based on the Akka actor system and requires some additional -# configuration. -# -# The system uses consistent hashing to ensure that one single-threaded actor -# ever accesses a unique value record at one time. -# -# For more information: https://issues.apache.org/jira/browse/USERGRID-1268 +# These only apply if the above Usergrid cluster system is enabled. # # The number of unique value actors to start on each Usergrid instance. -collection.akka.uniquevalue.actors=300 +collection.uniquevalues.actors=300 # TTL of unique value reservation in in-memory cache -collection.akka.uniquevalue.cache.ttl=10 +collection.uniquevalues.cache.ttl=10 # TTL of a unique value reservation when written to Cassandra -collection.akka.uniquevalue.reservation.ttl=10 +collection.uniquevalues.reservation.ttl=10 + +# The default authoritative region, used when none is specified elsewhere +collection.uniquevalues.authoritative.region=default ############################## Usergrid Scheduler ########################### http://git-wip-us.apache.org/repos/asf/usergrid/blob/f0c9fd4b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java index eca5927..e70a6fd 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java @@ -142,7 +142,7 @@ public class CpEntityManagerFactory implements 
EntityManagerFactory, Application logger.info("EntityManagerFactoring starting..."); - if ( actorSystemFig.getAkkaEnabled() ) { + if ( actorSystemFig.getEnabled() ) { try { logger.info("Akka cluster starting..."); http://git-wip-us.apache.org/repos/asf/usergrid/blob/f0c9fd4b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemFig.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemFig.java b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemFig.java index ec010d0..5d7b6aa 100644 --- a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemFig.java +++ b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemFig.java @@ -30,64 +30,54 @@ import java.io.Serializable; @FigSingleton public interface ActorSystemFig extends GuicyFig, Serializable { - String AKKA_ENABLED = "collection.akka.enabled"; + String CLUSTER_ENABLED = "usergrid.cluster.enabled"; - String AKKA_HOSTNAME = "collection.akka.hostname"; + String CLUSTER_REGIONS_LIST = "usergrid.cluster.region.list"; - String AKKA_REGION = "collection.akka.region"; + String CLUSTER_REGIONS_LOCAL = "usergrid.cluster.region.local"; - String AKKA_REGION_LIST = "usergrid.queue.regionList"; // same region list used by queues + String CLUSTER_SEEDS = "usergrid.cluster.seeds"; - String AKKA_REGION_SEEDS = "collection.akka.region.seeds"; - - String AKKA_AUTHORITATIVE_REGION = "collection.akka.authoritative.region"; - - String AKKA_INSTANCES_PER_NODE = "collection.akka.instances-per-node"; + String CLUSTER_PORT = "usergrid.cluster.port"; /** - * Use Akka or nah + * Use Cluster or nah */ - @Key(AKKA_ENABLED) + @Key(CLUSTER_ENABLED) @Default("true") - boolean getAkkaEnabled(); - - /** - * Hostname 
to be used in Akka configuration. - */ - @Key(AKKA_HOSTNAME) - String getHostname(); + boolean getEnabled(); /** * Local region to be used in Akka configuration. */ - @Key(AKKA_REGION) - String getRegion(); + @Key(CLUSTER_REGIONS_LOCAL) + @Default("default") + String getRegionLocal(); /** * Comma separated list of regions known to cluster. */ - @Key(AKKA_REGION_LIST) - String getRegionList(); + @Key(CLUSTER_REGIONS_LIST) + @Default("default") + String getRegionsList(); /** - * Comma-separated lists of seeds each with format {region}:{hostname}:{port}. - * Regions MUST be listed in the 'usergrid.queue.regionList' + * Comma-separated lists of seeds each with format {region}:{hostname} */ - @Key(AKKA_REGION_SEEDS) - String getRegionSeeds(); + @Key(CLUSTER_SEEDS) + @Default("default:localhost") + String getSeeds(); /** - * If no region specified for type, use the authoritative region + * Port for cluster comms. */ - @Key(AKKA_AUTHORITATIVE_REGION) - String getAkkaAuthoritativeRegion(); + @Key(CLUSTER_PORT) + @Default("2551") + String getPort(); - /** - * Number of actor instances to create on each node for each router. 
- */ - @Key(AKKA_INSTANCES_PER_NODE) - @Default("300") - int getInstancesPerNode(); + @Key("usergrid.cluster.hostname") + @Default("") + String getHostname(); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/f0c9fd4b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java index 05f837d..a79f447 100644 --- a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java +++ b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java @@ -39,6 +39,8 @@ import org.slf4j.LoggerFactory; import scala.concurrent.Await; import scala.concurrent.Future; +import java.net.InetAddress; +import java.net.UnknownHostException; import java.util.*; import java.util.concurrent.TimeUnit; @@ -77,8 +79,17 @@ public class ActorSystemManagerImpl implements ActorSystemManager { @Override public void start() { - this.hostname = actorSystemFig.getHostname(); - this.currentRegion = actorSystemFig.getRegion(); + if ( !StringUtils.isEmpty( actorSystemFig.getHostname()) ) { + this.hostname = actorSystemFig.getHostname(); + } else { + try { + this.hostname = InetAddress.getLocalHost().getHostName(); + } catch (UnknownHostException e) { + logger.error("Cannot get hostname, defaulting to 'localhost': " + e.getMessage()); + } + } + + this.currentRegion = actorSystemFig.getRegionLocal(); this.port = null; initAkka(); @@ -155,32 +166,22 @@ public class ActorSystemManagerImpl implements ActorSystemManager { // Create one actor system with request actor for each region - if ( StringUtils.isEmpty( 
hostname )) { - throw new RuntimeException( "No value specified for " + ActorSystemFig.AKKA_HOSTNAME ); - } - if ( StringUtils.isEmpty( currentRegion )) { - throw new RuntimeException( "No value specified for " + ActorSystemFig.AKKA_REGION ); + throw new RuntimeException( "No value specified for " + ActorSystemFig.CLUSTER_REGIONS_LOCAL ); } - if ( StringUtils.isEmpty( actorSystemFig.getRegionList() )) { - throw new RuntimeException( "No value specified for " + ActorSystemFig.AKKA_REGION_LIST ); + if ( StringUtils.isEmpty( actorSystemFig.getRegionsList() )) { + throw new RuntimeException( "No value specified for " + ActorSystemFig.CLUSTER_REGIONS_LIST ); } - if ( StringUtils.isEmpty( actorSystemFig.getRegionSeeds() )) { - throw new RuntimeException( "No value specified for " + ActorSystemFig.AKKA_REGION_SEEDS); + if ( StringUtils.isEmpty( actorSystemFig.getSeeds() )) { + throw new RuntimeException( "No value specified for " + ActorSystemFig.CLUSTER_SEEDS ); } - if ( StringUtils.isEmpty( actorSystemFig.getAkkaAuthoritativeRegion() )) { - logger.warn("No value for {} specified, will use current region as authoriative region", - ActorSystemFig.AKKA_AUTHORITATIVE_REGION); - //throw new RuntimeException( "No value specified for " + ActorSystemFig.AKKA_AUTHORITATIVE_REGION); - } - - List regionList = Arrays.asList( actorSystemFig.getRegionList().toLowerCase().split(",") ); + List regionList = Arrays.asList( actorSystemFig.getRegionsList().toLowerCase().split(",") ); logger.info("Initializing Akka for hostname {} region {} regionList {} seeds {}", - hostname, currentRegion, regionList, actorSystemFig.getRegionSeeds() ); + hostname, currentRegion, regionList, actorSystemFig.getSeeds() ); Config config = readClusterSystemConfig(); @@ -205,7 +206,7 @@ public class ActorSystemManagerImpl implements ActorSystemManager { seedsByRegion = ArrayListMultimap.create(); - String[] regionSeeds = actorSystemFig.getRegionSeeds().split( "," ); + String[] regionSeeds = 
actorSystemFig.getSeeds().split( "," ); logger.info( "Found region {} seeds {}", regionSeeds.length, regionSeeds ); @@ -226,7 +227,8 @@ public class ActorSystemManagerImpl implements ActorSystemManager { String[] parts = regionSeed.split( ":" ); String region = parts[0]; String hostname = parts[1]; - String regionPortString = parts[2]; + + String regionPortString = parts.length > 2 ? parts[2] : actorSystemFig.getPort(); // all seeds in same region must use same port // we assume 0th seed has the right port @@ -269,7 +271,7 @@ public class ActorSystemManagerImpl implements ActorSystemManager { try { - int numInstancesPerNode = actorSystemFig.getInstancesPerNode(); + int numInstancesPerNode = 300; // expect this to be overridden by RouterProducers String region = currentRegion; @@ -277,11 +279,8 @@ public class ActorSystemManagerImpl implements ActorSystemManager { int lastColon = seeds.get(0).lastIndexOf(":") + 1; final Integer regionPort = Integer.parseInt( seeds.get(0).substring( lastColon )); - logger.info( "Akka Config for region {} is:\n" + - " Hostname {}\n" + - " Seeds {}\n" + - " Authoritative Region {}\n", - region, hostname, seeds, actorSystemFig.getAkkaAuthoritativeRegion() ); + logger.info( "Akka Config for region {} is:\n" + " Hostname {}\n" + " Seeds {}\n", + region, hostname, seeds ); Map<String, Object> configMap = new HashMap<String, Object>() {{ http://git-wip-us.apache.org/repos/asf/usergrid/blob/f0c9fd4b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java index 65d1734..5b98ca5 100644 --- 
a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java @@ -25,6 +25,7 @@ import java.util.UUID; import org.apache.usergrid.persistence.actorsystem.ActorSystemFig; import org.apache.usergrid.persistence.collection.exception.WriteUniqueVerifyException; import org.apache.usergrid.persistence.collection.uniquevalues.UniqueValueException; +import org.apache.usergrid.persistence.collection.uniquevalues.UniqueValuesFig; import org.apache.usergrid.persistence.collection.uniquevalues.UniqueValuesService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,6 +68,7 @@ public class WriteCommit implements Func1<CollectionIoEvent<MvccEntity>, Collect private static final Logger logger = LoggerFactory.getLogger( WriteCommit.class ); ActorSystemFig actorSystemFig; + UniqueValuesFig uniqueValuesFig; UniqueValuesService akkaUvService; @Inject @@ -82,6 +84,7 @@ public class WriteCommit implements Func1<CollectionIoEvent<MvccEntity>, Collect final MvccEntitySerializationStrategy entryStrat, final UniqueValueSerializationStrategy uniqueValueStrat, final ActorSystemFig actorSystemFig, + final UniqueValuesFig uniqueValuesFig, final UniqueValuesService akkaUvService ) { Preconditions.checkNotNull( logStrat, "MvccLogEntrySerializationStrategy is required" ); @@ -92,6 +95,7 @@ public class WriteCommit implements Func1<CollectionIoEvent<MvccEntity>, Collect this.entityStrat = entryStrat; this.uniqueValueStrat = uniqueValueStrat; this.actorSystemFig = actorSystemFig; + this.uniqueValuesFig = uniqueValuesFig; this.akkaUvService = akkaUvService; } @@ -130,13 +134,13 @@ public class WriteCommit implements Func1<CollectionIoEvent<MvccEntity>, Collect logMutation.mergeShallow( entityMutation ); // akkaFig may be null when this is called from JUnit tests - if ( actorSystemFig != null && 
actorSystemFig.getAkkaEnabled() ) { + if ( actorSystemFig != null && actorSystemFig.getEnabled() ) { String region = ioEvent.getRegion(); if ( region == null ) { - region = actorSystemFig.getAkkaAuthoritativeRegion(); + region = uniqueValuesFig.getAuthoritativeRegion(); } if ( region == null ) { - region = actorSystemFig.getRegion(); + region = actorSystemFig.getRegionLocal(); } confirmUniqueFieldsAkka( mvccEntity, version, applicationScope, region ); } else { http://git-wip-us.apache.org/repos/asf/usergrid/blob/f0c9fd4b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java index f159096..985137b 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java @@ -40,6 +40,7 @@ import org.apache.usergrid.persistence.collection.serialization.UniqueValueSeria import org.apache.usergrid.persistence.collection.serialization.UniqueValueSet; import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueImpl; import org.apache.usergrid.persistence.collection.uniquevalues.UniqueValueException; +import org.apache.usergrid.persistence.collection.uniquevalues.UniqueValuesFig; import org.apache.usergrid.persistence.collection.uniquevalues.UniqueValuesService; import org.apache.usergrid.persistence.core.astyanax.CassandraConfig; import org.apache.usergrid.persistence.core.scope.ApplicationScope; @@ -63,6 +64,7 @@ public class 
WriteUniqueVerify implements Action1<CollectionIoEvent<MvccEntity>> private static final Logger logger = LoggerFactory.getLogger( WriteUniqueVerify.class ); ActorSystemFig actorSystemFig; + UniqueValuesFig uniqueValuesFig; UniqueValuesService akkaUvService; private final UniqueValueSerializationStrategy uniqueValueStrat; @@ -83,11 +85,13 @@ public class WriteUniqueVerify implements Action1<CollectionIoEvent<MvccEntity>> final Keyspace keyspace, final CassandraConfig cassandraFig, final ActorSystemFig actorSystemFig, + final UniqueValuesFig uniqueValuesFig, final UniqueValuesService akkaUvService ) { this.keyspace = keyspace; this.cassandraFig = cassandraFig; this.actorSystemFig = actorSystemFig; + this.uniqueValuesFig = uniqueValuesFig; this.akkaUvService = akkaUvService; Preconditions.checkNotNull( uniqueValueSerializiationStrategy, "uniqueValueSerializationStrategy is required" ); @@ -102,7 +106,7 @@ public class WriteUniqueVerify implements Action1<CollectionIoEvent<MvccEntity>> @Override public void call( final CollectionIoEvent<MvccEntity> ioevent ) { - if ( actorSystemFig != null && actorSystemFig.getAkkaEnabled() ) { + if ( actorSystemFig != null && actorSystemFig.getEnabled() ) { verifyUniqueFieldsAkka( ioevent ); } else { verifyUniqueFields( ioevent ); @@ -121,10 +125,10 @@ public class WriteUniqueVerify implements Action1<CollectionIoEvent<MvccEntity>> String region = ioevent.getRegion(); if ( region == null ) { - region = actorSystemFig.getAkkaAuthoritativeRegion(); + region = uniqueValuesFig.getAuthoritativeRegion(); } if ( region == null ) { - region = actorSystemFig.getRegion(); + region = actorSystemFig.getRegionLocal(); } try { akkaUvService.reserveUniqueValues( applicationScope, entity, mvccEntity.getVersion(), region ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/f0c9fd4b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesFig.java 
---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesFig.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesFig.java index c99824f..edd0cbe 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesFig.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesFig.java @@ -28,40 +28,40 @@ import java.io.Serializable; @FigSingleton public interface UniqueValuesFig extends GuicyFig, Serializable { - String AKKA_UNIQUEVALUE_ACTORS = "collection.akka.uniquevalue.actors"; + String UNIQUEVALUE_ACTORS = "collection.uniquevalues.actors"; - String AKKA_UNIQUEVALUE_CACHE_TTL = "collection.akka.uniquevalue.cache.ttl"; + String UNIQUEVALUE_CACHE_TTL = "collection.uniquevalues.cache.ttl"; - String AKKA_UNIQUEVALUE_RESERVATION_TTL= "collection.akka.uniquevalue.reservation.ttl"; + String UNIQUEVALUE_RESERVATION_TTL= "collection.uniquevalues.reservation.ttl"; - String AKKA_UNIQUEVALUE_INSTANCES_PER_NODE = "collection.akka.uniquevalue.instances-per-node"; + String UNIQUEVALUE_AUTHORITATIVE_REGION = "collection.uniquevalues.authoritative.region"; /** - * Number of UniqueValueActors to be started on each node - */ - @Key(AKKA_UNIQUEVALUE_ACTORS) - @Default("300") - int getUniqueValueActors(); - - /** * Unique Value cache TTL in seconds. */ - @Key(AKKA_UNIQUEVALUE_CACHE_TTL) + @Key(UNIQUEVALUE_CACHE_TTL) @Default("10") int getUniqueValueCacheTtl(); /** * Unique Value Reservation TTL in seconds. */ - @Key(AKKA_UNIQUEVALUE_RESERVATION_TTL) + @Key(UNIQUEVALUE_RESERVATION_TTL) @Default("10") int getUniqueValueReservationTtl(); /** * Number of actor instances to create on each. 
*/ - @Key(AKKA_UNIQUEVALUE_INSTANCES_PER_NODE) + @Key(UNIQUEVALUE_ACTORS) @Default("300") int getUniqueValueInstancesPerNode(); + + /** + * Primary authoritative region (used if none other specified). + */ + @Key(UNIQUEVALUE_AUTHORITATIVE_REGION) + @Default("default") + String getAuthoritativeRegion(); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/f0c9fd4b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommitTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommitTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommitTest.java index e7cee21..a0ee6be 100644 --- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommitTest.java +++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommitTest.java @@ -71,7 +71,7 @@ public class MarkCommitTest extends AbstractMvccEntityStageTest { //run the stage - WriteCommit newStage = new WriteCommit( logStrategy, mvccEntityStrategy, uniqueValueStrategy, null, null ); + WriteCommit newStage = new WriteCommit( logStrategy, mvccEntityStrategy, uniqueValueStrategy, null, null, null); //verify the observable is correct http://git-wip-us.apache.org/repos/asf/usergrid/blob/f0c9fd4b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommitTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommitTest.java 
b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommitTest.java index 8665ee9..dcc473c 100644 --- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommitTest.java +++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommitTest.java @@ -84,10 +84,12 @@ public class WriteCommitTest extends AbstractMvccEntityStageTest { //run the stage - WriteCommit newStage = new WriteCommit( logStrategy, mvccEntityStrategy, uniqueValueStrategy, null, null ); + WriteCommit newStage = + new WriteCommit( logStrategy, mvccEntityStrategy, uniqueValueStrategy, null, null, null ); - Entity result = newStage.call( new CollectionIoEvent<MvccEntity>( context, mvccEntityInput ) ).getEvent().getEntity().get(); + Entity result = newStage.call( + new CollectionIoEvent<MvccEntity>( context, mvccEntityInput ) ).getEvent().getEntity().get(); //verify the log entry is correct @@ -131,7 +133,7 @@ public class WriteCommitTest extends AbstractMvccEntityStageTest { when( mvccEntityStrategy.write( any( ApplicationScope.class ), any( MvccEntity.class ) ) ) .thenReturn( entityMutation ); - new WriteCommit( logStrategy, mvccEntityStrategy, uniqueValueStrategy, null, null ).call( event ); + new WriteCommit( logStrategy, mvccEntityStrategy, uniqueValueStrategy, null, null, null ).call( event ); } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/f0c9fd4b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java 
b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java index 635e262..46cfde1 100644 --- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java +++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java @@ -96,7 +96,7 @@ public class WriteUniqueVerifyTest extends AbstractUniqueValueTest { final MvccEntity mvccEntity = fromEntity( entity ); // run the stage - WriteUniqueVerify newStage = new WriteUniqueVerify( uvstrat, fig, keyspace, cassandraConfig, null, null ); + WriteUniqueVerify newStage = new WriteUniqueVerify( uvstrat, fig, keyspace, cassandraConfig, null, null, null ); newStage.call( new CollectionIoEvent<>( collectionScope, mvccEntity ) ) ; http://git-wip-us.apache.org/repos/asf/usergrid/blob/f0c9fd4b/stack/rest/src/test/java/org/apache/usergrid/rest/UniqueCatsIT.java ---------------------------------------------------------------------- diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/UniqueCatsIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/UniqueCatsIT.java new file mode 100644 index 0000000..0120660 --- /dev/null +++ b/stack/rest/src/test/java/org/apache/usergrid/rest/UniqueCatsIT.java @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.usergrid.rest; + +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.Timer; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Multimap; +import com.google.common.collect.Multimaps; +import org.apache.commons.lang.RandomStringUtils; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.ws.rs.ProcessingException; +import javax.ws.rs.client.Client; +import javax.ws.rs.client.ClientBuilder; +import javax.ws.rs.client.Entity; +import javax.ws.rs.client.WebTarget; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import java.net.ConnectException; +import java.text.DecimalFormat; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static com.codahale.metrics.MetricRegistry.name; + + +/** + * Tests that Catgrid will not allow creation of entities with duplicate names. + * + * Intended for use against a production-like cluster, not run during normal JUnit testing. + * + * Comment out the @Ignore annotation below and edit to add your target hosts. 
+ */ +public class UniqueCatsIT { + private static final Logger logger = LoggerFactory.getLogger( UniqueCatsIT.class ); + + private static final AtomicInteger successCounter = new AtomicInteger( 0 ); + private static final AtomicInteger errorCounter = new AtomicInteger( 0 ); + private static final AtomicInteger dupCounter = new AtomicInteger( 0 ); + + @Test + //@Ignore("Intended for use against prod-like cluster") + public void testDuplicatePrevention() throws Exception { + + int numThreads = 20; + int poolSize = 20; + int numCats = 100; + + Multimap<String, String> catsCreated = Multimaps.synchronizedMultimap( HashMultimap.create() ); + Multimap<String, Map<String, Object>> dupsRejected = Multimaps.synchronizedMultimap( HashMultimap.create() ); + + ExecutorService execService = Executors.newFixedThreadPool( poolSize ); + + Client client = ClientBuilder.newClient(); + + final MetricRegistry metrics = new MetricRegistry(); + final Timer responses = metrics.timer(name(UniqueCatsIT.class, "responses")); + long startTime = System.currentTimeMillis(); + + final AtomicBoolean failed = new AtomicBoolean(false); + + //String[] targetHosts = {"http://localhost:8080"}; + + String[] targetHosts = { + "https://ug21-west.e2e.apigee.net", + "https://ug21-east.e2e.apigee.net" + }; + + for (int i = 0; i < numCats; i++) { + + if ( failed.get() ) { break; } + + String randomizer = RandomStringUtils.randomAlphanumeric( 8 ); + + // multiple threads simultaneously trying to create a cat with the same propertyName + for (int j = 0; j < numThreads; j++) { + + if ( failed.get() ) { break; } + + final String name = "uv_test_cat_" + randomizer; + final String host = targetHosts[ j % targetHosts.length ]; + + execService.submit( () -> { + + Map<String, Object> form = new HashMap<String, Object>() {{ + put("name", name); + }}; + + Timer.Context time = responses.time(); + try { + WebTarget target = client.target( host ).path( + //"/test-organization/test-app/cats" ); + 
"/dmjohnson/sandbox/cats" ); + + //logger.info("Posting cat {} to host {}", catname, host); + + Response response = target.request() + //.post( Entity.entity( form, MediaType.APPLICATION_FORM_URLENCODED )); + .post( Entity.entity( form, MediaType.APPLICATION_JSON)); + + org.apache.usergrid.rest.test.resource.model.ApiResponse apiResponse = null; + String responseAsString = ""; + if ( response.getStatus() >= 400 ) { + responseAsString = response.readEntity( String.class ); + } else { + apiResponse = response.readEntity( + org.apache.usergrid.rest.test.resource.model.ApiResponse.class ); + } + + if ( response.getStatus() == 200 || response.getStatus() == 201 ) { + catsCreated.put( name, apiResponse.getEntity().getUuid().toString() ); + successCounter.incrementAndGet(); + + } else if ( response.getStatus() == 400 + && responseAsString.contains("DuplicateUniquePropertyExistsException")) { + dupsRejected.put( name, form ); + dupCounter.incrementAndGet(); + + } else { + logger.error("Cat creation failed status {} message {}", + response.getStatus(), responseAsString ); + errorCounter.incrementAndGet(); + } + + } catch ( ProcessingException e ) { + errorCounter.incrementAndGet(); + if ( e.getCause() instanceof ConnectException ) { + logger.error("Error connecting to " + host); + } else { + logger.error( "Error", e ); + } + + } catch ( Exception e ) { + errorCounter.incrementAndGet(); + logger.error("Error", e); + } + time.stop(); + + } ); + } + } + execService.shutdown(); + + try { + while (!execService.awaitTermination( 60, TimeUnit.SECONDS )) { + System.out.println( "Waiting..." 
); + } + } catch (InterruptedException e) { + e.printStackTrace(); + } + + long endTime = System.currentTimeMillis(); + + logger.info( "Total time {}s", (endTime - startTime) / 1000 ); + + DecimalFormat format = new DecimalFormat("##.###"); + + logger.info( "Timed {} requests:\n" + + "mean rate {}/s\n" + + "min {}s\n" + + "max {}s\n" + + "mean {}s", + responses.getCount(), + format.format( responses.getMeanRate() ), + format.format( (double)responses.getSnapshot().getMin() / 1000000000 ), + format.format( (double)responses.getSnapshot().getMax() / 1000000000 ), + format.format( responses.getSnapshot().getMean() / 1000000000 ) + ); + + logger.info( "Error count {} ratio = {}", + errorCounter.get(), (float) errorCounter.get() / (float) responses.getCount() ); + + logger.info( "Success count = {}", successCounter.get() ); + + logger.info( "Rejected dup count = {}", dupCounter.get() ); + +// for ( String catname : catsCreated.keys() ) { +// System.out.println( catname ); +// Collection<Cat> cats = catsCreated.get( catname ); +// for ( Cat cat : cats ) { +// System.out.println(" " + cat.getUuid() ); +// } +// } + +// int count = 0; +// for ( String catname : dupsRejected.keySet() ) { +// System.out.println( catname ); +// Collection<Cat> cats = dupsRejected.get( catname ); +// for ( Cat cat : cats ) { +// System.out.println(" " + (count++) + " rejected " + cat.getCatname() + ":" + cat.getUuid() ); +// } +// } + + int catCount = 0; + int catnamesWithDuplicates = 0; + for ( String name : catsCreated.keySet() ) { + //Collection<Map<String, String>> forms = + Collection<String> forms = catsCreated.get( name ); + if ( forms.size() > 1 ) { + catnamesWithDuplicates++; + logger.info("Duplicate " + name); + } + catCount++; + } + Assert.assertEquals( 0, catnamesWithDuplicates ); + Assert.assertEquals( 0, errorCounter.get() ); + Assert.assertEquals( numCats, successCounter.get() ); + Assert.assertEquals( numCats, catCount ); + + + } + +}
