Repository: usergrid
Updated Branches:
  refs/heads/usergrid-1268-akka-211 [created] 9be069b15


Initial commit of Akka for Unique Values, Akka init works but much more work is 
needed.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/52ee2fb7
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/52ee2fb7
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/52ee2fb7

Branch: refs/heads/usergrid-1268-akka-211
Commit: 52ee2fb70e0672fba651edd2d998247e095e11fc
Parents: 4aa73aa
Author: Dave Johnson <[email protected]>
Authored: Tue Apr 12 07:38:40 2016 -0400
Committer: Dave Johnson <[email protected]>
Committed: Mon Apr 25 14:31:39 2016 -0400

----------------------------------------------------------------------
 stack/corepersistence/collection/pom.xml        | 116 ++--
 .../collection/guice/CollectionModule.java      |   6 +
 .../EntityCollectionManagerFactoryImpl.java     |  51 +-
 .../collection/uniquevalues/AkkaFig.java        |  95 +++
 .../uniquevalues/ClusterSingletonRouter.java    |  37 ++
 .../collection/uniquevalues/RequestActor.java   | 170 ++++++
 .../uniquevalues/ReservationCache.java          |  52 ++
 .../uniquevalues/ReservationCacheActor.java     |  73 +++
 .../uniquevalues/UniqueValueActor.java          | 245 ++++++++
 .../uniquevalues/UniqueValueException.java      |   7 +
 .../uniquevalues/UniqueValuesService.java       |  49 ++
 .../uniquevalues/UniqueValuesServiceImpl.java   | 587 +++++++++++++++++++
 .../uniquevalues/UniqueValuesTable.java         |  33 ++
 .../uniquevalues/UniqueValuesTableImpl.java     |  46 ++
 .../src/main/resources/application.conf         |  28 +
 .../src/main/resources/cluster-singleton.conf   |  25 +
 .../collection/guice/TestCollectionModule.java  |  17 +
 .../collection/uniquevalues/AkkaFigTest.java    |  41 ++
 .../uniquevalues/LocalPreventDupsTest.java      | 141 +++++
 .../src/test/resources/usergrid.properties      |  14 +
 20 files changed, 1774 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/52ee2fb7/stack/corepersistence/collection/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/pom.xml 
b/stack/corepersistence/collection/pom.xml
index 1b77735..ad9cefd 100644
--- a/stack/corepersistence/collection/pom.xml
+++ b/stack/corepersistence/collection/pom.xml
@@ -3,46 +3,82 @@
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
 
-  <parent>
-    <artifactId>persistence</artifactId>
-    <groupId>org.apache.usergrid</groupId>
-    <version>2.1.1-SNAPSHOT</version>
-  </parent>
-
-  <modelVersion>4.0.0</modelVersion>
-  <description>The module for handling all scope I/O</description>
-
-  <artifactId>collection</artifactId>
-  <name>Usergrid Collection</name>
-
-  <dependencies>
-
-    <!-- Google Guice Integration Test Injectors -->
-
-    <dependency>
-      <groupId>org.apache.usergrid</groupId>
-      <artifactId>common</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-
-    <!-- tests -->
-
-    <dependency>
-      <groupId>org.apache.usergrid</groupId>
-      <artifactId>common</artifactId>
-      <version>${project.version}</version>
-      <classifier>tests</classifier>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.mockito</groupId>
-      <artifactId>mockito-core</artifactId>
-      <version>${mockito.version}</version>
-      <scope>test</scope>
-    </dependency>
-
-  </dependencies>
+    <parent>
+        <artifactId>persistence</artifactId>
+        <groupId>org.apache.usergrid</groupId>
+        <version>2.1.1-SNAPSHOT</version>
+    </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+    <description>The module for handling all scope I/O</description>
+
+    <artifactId>collection</artifactId>
+    <name>Usergrid Collection</name>
+
+    <dependencies>
+
+        <!-- Google Guice Integration Test Injectors -->
+
+        <dependency>
+            <groupId>org.apache.usergrid</groupId>
+            <artifactId>common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.typesafe.akka</groupId>
+            <artifactId>akka-actor_2.11</artifactId>
+            <version>2.4.0</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.typesafe.akka</groupId>
+            <artifactId>akka-remote_2.11</artifactId>
+            <version>2.4.0</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.typesafe.akka</groupId>
+            <artifactId>akka-cluster_2.11</artifactId>
+            <version>2.4.0</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.typesafe.akka</groupId>
+            <artifactId>akka-cluster-tools_2.11</artifactId>
+            <version>2.4.0</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.typesafe.akka</groupId>
+            <artifactId>akka-cluster-metrics_2.11</artifactId>
+            <version>2.4.0</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.typesafe.akka</groupId>
+            <artifactId>akka-slf4j_2.11</artifactId>
+            <version>2.4.0</version>
+        </dependency>
+
+        <!-- tests -->
+
+        <dependency>
+            <groupId>org.apache.usergrid</groupId>
+            <artifactId>common</artifactId>
+            <version>${project.version}</version>
+            <classifier>tests</classifier>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <version>${mockito.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+    </dependencies>
 
     <!--
         <profiles>

http://git-wip-us.apache.org/repos/asf/usergrid/blob/52ee2fb7/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
 
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
index d788174..3d794d1 100644
--- 
a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
+++ 
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
@@ -20,6 +20,9 @@ package org.apache.usergrid.persistence.collection.guice;
 
 import java.util.concurrent.ThreadPoolExecutor;
 
+import org.apache.usergrid.persistence.collection.uniquevalues.AkkaFig;
+import 
org.apache.usergrid.persistence.collection.uniquevalues.UniqueValuesService;
+import 
org.apache.usergrid.persistence.collection.uniquevalues.UniqueValuesServiceImpl;
 import org.safehaus.guicyfig.GuicyFigModule;
 
 import 
org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
@@ -54,6 +57,7 @@ public abstract class CollectionModule extends AbstractModule 
{
     protected void configure() {
 
         // noinspection unchecked
+        install( new GuicyFigModule( AkkaFig.class ) );
         install( new GuicyFigModule( SerializationFig.class ) );
         install( new GuicyFigModule( CollectionSchedulerFig.class ) );
         install( new SerializationModule() );
@@ -66,6 +70,8 @@ public abstract class CollectionModule extends AbstractModule 
{
         //bind this to our factory
         install( new GuicyFigModule( EntityCacheFig.class ) );
 
+        bind( UniqueValuesService.class ).to( UniqueValuesServiceImpl.class );
+
         bind( ChangeLogGenerator.class).to( ChangeLogGeneratorImpl.class);
 
         configureMigrationProvider();

http://git-wip-us.apache.org/repos/asf/usergrid/blob/52ee2fb7/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
 
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
index a52ee9c..6ba23b6 100644
--- 
a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
+++ 
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
@@ -82,14 +82,25 @@ public class EntityCollectionManagerFactoryImpl implements 
EntityCollectionManag
         CacheBuilder.newBuilder().maximumSize( 1000 )
                     .build( new CacheLoader<ApplicationScope, 
EntityCollectionManager>() {
                         public EntityCollectionManager load( ApplicationScope 
scope ) {
-                                  //create the target EM that will perform 
logic
+                            //create the target EM that will perform logic
                             final EntityCollectionManager target = new 
EntityCollectionManagerImpl(
-                                writeStart, writeVerifyUnique,
-                                writeOptimisticVerify, writeCommit, rollback, 
markStart, markCommit,  uniqueCleanup, versionCompact,
-                                entitySerializationStrategy, 
uniqueValueSerializationStrategy,
-                                mvccLogEntrySerializationStrategy, keyspace,
-                                metricsFactory, serializationFig,
-                                rxTaskScheduler, scope );
+                                writeStart,
+                                writeVerifyUnique,
+                                writeOptimisticVerify,
+                                writeCommit,
+                                rollback,
+                                markStart,
+                                markCommit,
+                                uniqueCleanup,
+                                versionCompact,
+                                entitySerializationStrategy,
+                                uniqueValueSerializationStrategy,
+                                mvccLogEntrySerializationStrategy,
+                                keyspace,
+                                metricsFactory,
+                                serializationFig,
+                                rxTaskScheduler,
+                                scope );
 
                             return target;
                         }
@@ -97,17 +108,19 @@ public class EntityCollectionManagerFactoryImpl implements 
EntityCollectionManag
 
 
     @Inject
-    public EntityCollectionManagerFactoryImpl( final WriteStart writeStart, 
final WriteUniqueVerify writeVerifyUnique,
-                                               final WriteOptimisticVerify 
writeOptimisticVerify,
-                                               final WriteCommit writeCommit, 
final RollbackAction rollback,
-                                               final MarkStart markStart, 
final MarkCommit markCommit,
-                                               final UniqueCleanup 
uniqueCleanup, final VersionCompact versionCompact,
-                                               final SerializationFig 
serializationFig, final
-                                                   
MvccEntitySerializationStrategy entitySerializationStrategy,
-                                               final 
UniqueValueSerializationStrategy uniqueValueSerializationStrategy,
-                                               final 
MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy,
-                                               final Keyspace keyspace, final 
EntityCacheFig entityCacheFig,
-                                               final MetricsFactory 
metricsFactory, @CollectionExecutorScheduler  final RxTaskScheduler 
rxTaskScheduler ) {
+    public EntityCollectionManagerFactoryImpl(
+            final WriteStart writeStart, final WriteUniqueVerify 
writeVerifyUnique,
+            final WriteOptimisticVerify writeOptimisticVerify,
+            final WriteCommit writeCommit, final RollbackAction rollback,
+            final MarkStart markStart, final MarkCommit markCommit,
+            final UniqueCleanup uniqueCleanup, final VersionCompact 
versionCompact,
+            final SerializationFig serializationFig,
+            final MvccEntitySerializationStrategy entitySerializationStrategy,
+            final UniqueValueSerializationStrategy 
uniqueValueSerializationStrategy,
+            final MvccLogEntrySerializationStrategy 
mvccLogEntrySerializationStrategy,
+            final Keyspace keyspace, final EntityCacheFig entityCacheFig,
+            final MetricsFactory metricsFactory, @CollectionExecutorScheduler
+            final RxTaskScheduler rxTaskScheduler ) {
 
         this.writeStart = writeStart;
         this.writeVerifyUnique = writeVerifyUnique;
@@ -126,6 +139,7 @@ public class EntityCollectionManagerFactoryImpl implements 
EntityCollectionManag
         this.metricsFactory = metricsFactory;
         this.rxTaskScheduler = rxTaskScheduler;
     }
+
     @Override
     public EntityCollectionManager createCollectionManager(ApplicationScope 
applicationScope) {
         Preconditions.checkNotNull(applicationScope);
@@ -141,5 +155,4 @@ public class EntityCollectionManagerFactoryImpl implements 
EntityCollectionManag
     public void invalidate() {
         ecmCache.invalidateAll();
     }
-
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/52ee2fb7/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/AkkaFig.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/AkkaFig.java
 
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/AkkaFig.java
new file mode 100644
index 0000000..3bb9fcf
--- /dev/null
+++ 
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/AkkaFig.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.collection.uniquevalues;
+
+
+import org.safehaus.guicyfig.Default;
+import org.safehaus.guicyfig.FigSingleton;
+import org.safehaus.guicyfig.GuicyFig;
+import org.safehaus.guicyfig.Key;
+
+@FigSingleton
+public interface AkkaFig extends GuicyFig {
+
+    String AKKA_HOSTNAME = "collection.akka.hostname";
+
+    String AKKA_PORT = "collection.akka.port";
+
+    String AKKA_REGION = "collection.akka.region";
+
+    String AKKA_REGIONS = "collection.akka.regions";
+
+    String AKKA_UNIQUE_VALUE_ACTORS = "collection.akka.unique.value.actors";
+
+    String AKKA_REGION_SEEDS = "collection.akka.region.seeds";
+
+    String AKKA_REGION_TYPES = "collection.akka.region.types";
+
+
+    /**
+     * Hostname to be used in Akka configuration.
+     */
+    @Key(AKKA_HOSTNAME)
+    @Default("localhost")
+    String getHostname();
+
+    /**
+     * Local port to be used in Akka configuration.
+     */
+    @Key(AKKA_PORT)
+    @Default("2551")
+    int getPort();
+
+    /**
+     * Local region to be used in Akka configuration.
+     */
+    @Key(AKKA_REGION)
+    @Default("us-east")
+    String getRegion();
+
+    /**
+     * Comma-separated list of all regions to be used in Akka configuration.
+     */
+    @Key(AKKA_REGIONS)
+    @Default("us-east")
+    String getRegions();
+
+    /**
+     * Number of UniqueValueActors to be started on each node
+     */
+    @Key(AKKA_UNIQUE_VALUE_ACTORS)
+    @Default("300")
+    int getUniqueValueActors();
+
+    /**
+     * Comma-separated list of seeds, each with the format 
{region}:{hostname}:{port}
+     */
+    @Key(AKKA_REGION_SEEDS)
+    @Default("us-east:localhost:2551")
+    String getRegionSeeds();
+
+    /**
+     * Authoritative regions may be specified for types.
+     * Comma-separated list of region types, each with the format {region}:{type}
+     */
+    // TODO: allow this to be set via REST API
+    @Key(AKKA_REGION_TYPES)
+    @Default("")
+    String getRegionTypes();
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/52ee2fb7/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ClusterSingletonRouter.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ClusterSingletonRouter.java
 
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ClusterSingletonRouter.java
new file mode 100644
index 0000000..8cd0ab0
--- /dev/null
+++ 
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ClusterSingletonRouter.java
@@ -0,0 +1,37 @@
+package org.apache.usergrid.persistence.collection.uniquevalues;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import akka.routing.ConsistentHashingRouter;
+import akka.routing.FromConfig;
+
+
+/**
+ * Uses a consistent hash to route Unique Value requests to UniqueValueActors.
+ */
+public class ClusterSingletonRouter extends UntypedActor {
+
+    private final ActorRef router;
+
+
+    public ClusterSingletonRouter( String injectorName ) {
+        router = getContext().actorOf(
+                
FromConfig.getInstance().props(Props.create(UniqueValueActor.class, 
injectorName )), "router");
+    }
+
+    @Override
+    public void onReceive(Object message) {
+
+        if ( message instanceof UniqueValueActor.Request) {
+            UniqueValueActor.Request request = 
(UniqueValueActor.Request)message;
+
+            ConsistentHashingRouter.ConsistentHashableEnvelope envelope =
+                    new ConsistentHashingRouter.ConsistentHashableEnvelope( 
message, request.getRowKey() );
+            router.tell( envelope, getSender());
+
+        } else {
+            unhandled(message);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/52ee2fb7/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/RequestActor.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/RequestActor.java
 
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/RequestActor.java
new file mode 100644
index 0000000..c27d4e1
--- /dev/null
+++ 
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/RequestActor.java
@@ -0,0 +1,170 @@
+package org.apache.usergrid.persistence.collection.uniquevalues;
+
+import akka.actor.ActorSelection;
+import akka.actor.Address;
+import akka.actor.UntypedActor;
+import akka.cluster.Cluster;
+import akka.cluster.ClusterEvent;
+import akka.cluster.Member;
+import akka.cluster.MemberStatus;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+
+
+/**
+ * Once notified of nodes, sends unique value requests to 
ClusterSingletonRouter via its local proxy.
+ */
+class RequestActor extends UntypedActor {
+    private static final Logger logger = LoggerFactory.getLogger( 
RequestActor.class );
+
+    private final String name = RandomStringUtils.randomAlphanumeric( 4 );
+
+    private final Set<Address> nodes = new HashSet<>();
+
+    private final Cluster cluster = Cluster.get(getContext().system());
+    private final String routerProxyPath;
+
+    private boolean ready = false;
+
+
+    public RequestActor(String routerProxyPath ) {
+        this.routerProxyPath = routerProxyPath;
+    }
+
+    // subscribe to cluster membership (MemberEvent) and reachability (ReachabilityEvent) changes
+    @Override
+    public void preStart() {
+        logger.debug("{} role {} address {}:{} starting up, subscribing to 
cluster events...", name,
+            cluster.getSelfRoles().iterator().next(),
+            cluster.readView().selfAddress().host(),
+            cluster.readView().selfAddress().hostPort());
+        cluster.subscribe(getSelf(), ClusterEvent.MemberEvent.class, 
ClusterEvent.ReachabilityEvent.class);
+    }
+
+    // unsubscribe from cluster events when this actor stops
+    @Override
+    public void postStop() {
+        cluster.unsubscribe(getSelf());
+    }
+
+    @Override
+    public void onReceive(Object message) {
+
+        int startSize = nodes.size();
+
+        if ( message instanceof UniqueValueActor.Request && ready ) {
+
+            // just pick any node, the ClusterSingletonRouter will do the 
consistent hash routing
+            List<Address> nodesList = new ArrayList<>( nodes );
+            Address address = nodesList.get( 
ThreadLocalRandom.current().nextInt( nodesList.size() ) );
+            ActorSelection service = getContext().actorSelection( address + 
routerProxyPath );
+            service.tell( message, getSender() );
+
+        } else if ( message instanceof UniqueValueActor.Request && !ready ) {
+            logger.debug("{} responding with status unknown", name);
+
+            getSender().tell( new UniqueValueActor.Response(
+                    UniqueValueActor.Response.Status.ERROR ) , getSender() );
+
+        } else if ( message instanceof StatusRequest ) {
+            if ( ready ) {
+                getSender().tell( new StatusMessage( name, 
StatusMessage.Status.READY ), getSender() );
+            } else {
+                getSender().tell( new StatusMessage( name, 
StatusMessage.Status.INITIALIZING), getSender() );
+            }
+            return;
+
+        } else {
+            processAsClusterEvent( message );
+        }
+
+        if ( logger.isDebugEnabled() && startSize != nodes.size() ) {
+            logger.debug( "{} now knows {} nodes", name, nodes.size() );
+        }
+
+        if (!nodes.isEmpty() && !ready) {
+            logger.debug( name + " is ready" );
+            ready = true;
+
+        } else if (nodes.isEmpty() && ready) {
+            ready = false;
+        }
+    }
+
+    /**
+     * Process messages about nodes up, down, reachable and unreachable.
+     */
+    private void processAsClusterEvent(Object message) {
+
+        if (message instanceof ClusterEvent.CurrentClusterState) {
+            ClusterEvent.CurrentClusterState state = 
(ClusterEvent.CurrentClusterState) message;
+            nodes.clear();
+            for (Member member : state.getMembers()) {
+                if (member.hasRole("io") && member.status().equals( 
MemberStatus.up())) {
+                    nodes.add(member.address());
+                    logger.debug("RequestActor {} received cluster-state 
member-up for {}", name, member.address());
+                }
+            }
+
+        } else if (message instanceof ClusterEvent.MemberUp) {
+            ClusterEvent.MemberUp mUp = (ClusterEvent.MemberUp) message;
+            if (mUp.member().hasRole("io")) {
+                nodes.add( mUp.member().address() );
+            }
+            logger.debug("{} received member-up for {}", name, 
mUp.member().address());
+
+        } else if (message instanceof ClusterEvent.MemberEvent) {
+            ClusterEvent.MemberEvent other = (ClusterEvent.MemberEvent) 
message;
+            nodes.remove(other.member().address());
+
+        } else if (message instanceof ClusterEvent.UnreachableMember) {
+            ClusterEvent.UnreachableMember unreachable = 
(ClusterEvent.UnreachableMember) message;
+            nodes.remove(unreachable.member().address());
+            logger.debug("{} received un-reachable for {}", name, 
unreachable.member().address());
+
+        } else if (message instanceof ClusterEvent.ReachableMember) {
+            ClusterEvent.ReachableMember reachable = 
(ClusterEvent.ReachableMember) message;
+            if (reachable.member().hasRole("io")) {
+                nodes.add( reachable.member().address() );
+            }
+            logger.debug("{} received reachable for {}", name, 
reachable.member().address());
+
+        } else {
+            logger.error("{}: unhandled message: {}", name, 
message.toString());
+            unhandled(message);
+        }
+    }
+
+    /**
+     * RequestActor responds to StatusRequests.
+     */
+    static class StatusRequest implements Serializable { }
+
+    /**
+     * RequestActor responds with, and sometimes unilaterally sends 
StatusMessages.
+     */
+    static class StatusMessage implements Serializable {
+        final String name;
+        public enum Status { INITIALIZING, READY }
+        final Status status;
+        public StatusMessage(String name, Status status) {
+            this.name = name;
+            this.status = status;
+        }
+        public String getName() {
+            return name;
+        }
+        public boolean isReady() {
+            return status.equals( Status.READY );
+        }
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/usergrid/blob/52ee2fb7/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ReservationCache.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ReservationCache.java
 
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ReservationCache.java
new file mode 100644
index 0000000..d5c67c3
--- /dev/null
+++ 
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ReservationCache.java
@@ -0,0 +1,52 @@
+package org.apache.usergrid.persistence.collection.uniquevalues;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheStats;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeUnit;
+
+
+// cannot be a Guice singleton, must be shared across injectors
+// @com.google.inject.Singleton
+public class ReservationCache {
+    private static final Logger logger = LoggerFactory.getLogger( 
RequestActor.class );
+
+    Cache<String, UniqueValueActor.Reservation> cache = 
CacheBuilder.newBuilder()
+       .maximumSize(1000)
+       .concurrencyLevel( 300 )
+       .expireAfterWrite(30, TimeUnit.SECONDS)
+       .recordStats()
+       .build();
+
+    private static ReservationCache instance = new ReservationCache();
+
+    public static ReservationCache getInstance() {
+        return instance;
+    }
+
+    private ReservationCache() {}
+
+    public UniqueValueActor.Reservation get( String rowKey ) {
+        UniqueValueActor.Reservation res = cache.getIfPresent( rowKey );
+        return res;
+    }
+
+    public void cacheReservation( UniqueValueActor.Reservation reservation ) {
+        cache.put( reservation.getRowKey(), reservation );
+    }
+
+    public void cancelReservation( UniqueValueActor.Cancellation cancellation 
) {
+        cache.invalidate( cancellation.getRowKey() );
+    }
+
+    public CacheStats getStats() {
+        return cache.stats();
+    }
+
+    public long getSize() {
+        return cache.size();
+    }
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/52ee2fb7/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ReservationCacheActor.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ReservationCacheActor.java
 
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ReservationCacheActor.java
new file mode 100644
index 0000000..add33fa
--- /dev/null
+++ 
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ReservationCacheActor.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.collection.uniquevalues;
+
+import akka.actor.ActorRef;
+import akka.actor.UntypedActor;
+import akka.cluster.pubsub.DistributedPubSub;
+import akka.cluster.pubsub.DistributedPubSubMediator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Updates local unique values cache based on reservations and cancellations.
+ */
+public class ReservationCacheActor extends UntypedActor {
+    private static final Logger logger = LoggerFactory.getLogger( 
ReservationCacheActor.class );
+
+    int reservationCount = 0;
+    int cancellationCount = 0;
+
+    public ReservationCacheActor(String injectorName ) {
+
+        logger.info("Starting for {}", injectorName);
+
+        // subscribe to the topic named "content"
+        ActorRef mediator = 
DistributedPubSub.get(getContext().system()).mediator();
+        mediator.tell(new DistributedPubSubMediator.Subscribe("content", 
getSelf()), getSelf());
+    }
+
+    public void onReceive( Object msg ) {
+
+        if ( msg instanceof UniqueValueActor.Reservation ) {
+            UniqueValueActor.Reservation res = 
(UniqueValueActor.Reservation)msg;
+            ReservationCache.getInstance().cacheReservation( res );
+
+            if ( ++reservationCount % 10 == 0 ) {
+                logger.debug("Received {} reservations cache size {}",
+                        reservationCount, 
ReservationCache.getInstance().getSize());
+            }
+
+        } else if ( msg instanceof UniqueValueActor.Cancellation ) {
+            UniqueValueActor.Cancellation can = 
(UniqueValueActor.Cancellation)msg;
+            ReservationCache.getInstance().cancelReservation( can );
+
+            if ( ++cancellationCount % 10 == 0 ) {
+                logger.debug("Received {} cancellations", cancellationCount);
+            }
+
+        } else if (msg instanceof DistributedPubSubMediator.SubscribeAck) {
+            logger.debug( "subscribing" );
+
+        } else {
+            unhandled( msg );
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/52ee2fb7/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java
 
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java
new file mode 100644
index 0000000..faf0433
--- /dev/null
+++ 
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java
@@ -0,0 +1,245 @@
+package org.apache.usergrid.persistence.collection.uniquevalues;
+
+import akka.actor.ActorRef;
+import akka.actor.UntypedActor;
+import akka.cluster.pubsub.DistributedPubSub;
+import akka.cluster.pubsub.DistributedPubSubMediator;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.UUID;
+
+public class UniqueValueActor extends UntypedActor {
+    private static final Logger logger = LoggerFactory.getLogger( 
UniqueValueActor.class );
+
+    private final String name = RandomStringUtils.randomAlphanumeric( 4 );
+
+    //private MetricsService metricsService;
+
+    private UniqueValuesTable table = new UniqueValuesTableImpl();
+
+    private ActorRef mediator = 
DistributedPubSub.get(getContext().system()).mediator();
+
+    private int count = 0;
+
+
+    /**
+     * Creates the actor. The constructor body currently performs no work: all of its
+     * statements are commented out.
+     *
+     * @param injectorName currently unused; the commented-out code below passed it to
+     *                     GuiceModule.getInjector() to obtain services
+     */
+    public UniqueValueActor( String injectorName ) {
+
+//        UniqueValuesService uniqueValuesService =
+//                GuiceModule.getInjector( injectorName ).getInstance( UniqueValuesService.class );
+//
+//        terminateOnError = Boolean.parseBoolean( uniqueValuesService.getProperties()
+//                .getProperty( "akka.unique-value-actor-terminate-on-error", "false" ) );
+//
+//        chaos = Boolean.parseBoolean( uniqueValuesService.getProperties()
+//                .getProperty( "akka.test.chaos", "false" ) );
+
+//        metricsService =
+//                GuiceModule.getInjector( injectorName ).getInstance( MetricsService.class );
+    }
+
+    /**
+     * Dispatches unique value messages.
+     * <ul>
+     *   <li>{@link Reservation}: reserves the value unless another entity owns it.</li>
+     *   <li>{@link Confirmation}: commits a value that the requester reserved earlier.</li>
+     *   <li>{@link Cancellation}: releases a value held by the requester.</li>
+     * </ul>
+     * Each of these is answered with a {@link Response}, and every successful state change is
+     * published on the "content" pub-sub topic. Any other message is passed to
+     * {@code unhandled}.
+     *
+     * @param message message delivered to this actor
+     */
+    @Override
+    public void onReceive(Object message) {
+
+        if ( message instanceof Request ) {
+            count++;
+            if (count % 10 == 0) {
+                logger.debug( "UniqueValueActor {} processed {} requests", name, count );
+            }
+        }
+
+        if ( message instanceof Reservation ) {
+            handleReservation( (Reservation) message );
+
+        } else if ( message instanceof Confirmation ) {
+            handleConfirmation( (Confirmation) message );
+
+        } else if ( message instanceof Cancellation ) {
+            handleCancellation( (Cancellation) message );
+
+        } else {
+            unhandled( message );
+        }
+    }
+
+    /** Reserves the value for the requester unless another entity already owns it. */
+    private void handleReservation( Reservation res ) {
+        try {
+            UUID owner = table.lookupOwner( res.getType(), res.getPropertyName(), res.getPropertyValue() );
+
+            if ( owner != null ) {
+                // the requester already owning the value is fine, any other owner is a duplicate
+                reply( owner.equals( res.getUuid() )
+                        ? Response.Status.IS_UNIQUE : Response.Status.NOT_UNIQUE );
+                return;
+            }
+
+            table.reserve( res.getUuid(), res.getType(), res.getPropertyName(), res.getPropertyValue() );
+
+            reply( Response.Status.IS_UNIQUE );
+            publish( new Reservation( res ) );
+
+        } catch (Throwable t) {
+            reply( Response.Status.ERROR );
+            logger.error( "Error processing request", t );
+        }
+    }
+
+    /** Commits a value that the requester reserved earlier. */
+    private void handleConfirmation( Confirmation commit ) {
+        try {
+            UUID owner = table.lookupOwner( commit.getType(), commit.getPropertyName(), commit.getPropertyValue() );
+
+            if ( owner == null ) {
+                // cannot commit without first reserving
+                reply( Response.Status.BAD_REQUEST );
+                return;
+            }
+            if ( !owner.equals( commit.getUuid() ) ) {
+                // cannot commit, somebody else owns the unique value
+                reply( Response.Status.NOT_UNIQUE );
+                return;
+            }
+
+            table.commit( commit.getUuid(), commit.getType(), commit.getPropertyName(), commit.getPropertyValue() );
+
+            reply( Response.Status.IS_UNIQUE );
+
+            // published as a Reservation: the cache subscriber only understands
+            // Reservation and Cancellation messages
+            publish( new Reservation( commit ) );
+
+        } catch (Throwable t) {
+            reply( Response.Status.ERROR );
+            logger.error( "Error processing request", t );
+        }
+    }
+
+    /** Releases a value that is currently held by the requester. */
+    private void handleCancellation( Cancellation can ) {
+        try {
+            UUID owner = table.lookupOwner( can.getType(), can.getPropertyName(), can.getPropertyValue() );
+
+            if ( owner == null ) {
+                // cannot cancel unique value that does not exist
+                reply( Response.Status.BAD_REQUEST );
+                return;
+            }
+            if ( !owner.equals( can.getUuid() ) ) {
+                // cannot cancel, somebody else owns the unique value
+                reply( Response.Status.NOT_UNIQUE );
+                return;
+            }
+
+            table.cancel( can.getType(), can.getPropertyName(), can.getPropertyValue() );
+
+            reply( Response.Status.SUCCESS );
+
+            // must be published as a Cancellation so that subscribers drop the value from
+            // their caches; publishing a Reservation here would re-cache the cancelled value
+            publish( new Cancellation( can ) );
+
+        } catch (Throwable t) {
+            reply( Response.Status.ERROR );
+            logger.error( "Error processing request", t );
+        }
+    }
+
+    /** Answers the sender of the current message, identifying this actor as the replier. */
+    private void reply( Response.Status status ) {
+        getSender().tell( new Response( status ), getSelf() );
+    }
+
+    /** Publishes a state change on the "content" topic via the pub-sub mediator. */
+    private void publish( Request event ) {
+        mediator.tell( new DistributedPubSubMediator.Publish( "content", event ), getSelf() );
+    }
+
+
+    /**
+     * Base class of the messages that the UniqueValue actor receives and processes.
+     * A request names one property value of one entity; the row key is derived from the
+     * type, property name and property value joined by colons.
+     */
+    public abstract static class Request implements Serializable {
+        final UUID uuid;
+        final String type;
+        final String propertyName;
+        final String propertyValue;
+        final String rowKey;
+
+        /**
+         * @param uuid         UUID of the entity making the request
+         * @param type         type name the value belongs to
+         * @param propertyName name of the property
+         * @param value        value of the property
+         */
+        public Request(UUID uuid, String type, String propertyName, String value) {
+            this.uuid = uuid;
+            this.type = type;
+            this.propertyName = propertyName;
+            this.propertyValue = value;
+            this.rowKey = getType() + ":" + getPropertyName() + ":" + getPropertyValue();
+        }
+
+        /**
+         * Copy constructor, also used to turn one kind of request into another.
+         *
+         * @param req request whose values are copied
+         */
+        public Request( Request req ) {
+            this( req.uuid, req.type, req.propertyName, req.propertyValue );
+        }
+
+        public UUID getUuid() {
+            return uuid;
+        }
+
+        public String getType() {
+            return type;
+        }
+
+        public String getPropertyName() {
+            return propertyName;
+        }
+
+        public String getPropertyValue() {
+            return propertyValue;
+        }
+
+        public String getRowKey() {
+            return rowKey;
+        }
+    }
+
+    /**
+     * UniqueValue actor creates and sends Responses.
+     * A Response carries only a status describing the outcome of one request.
+     */
+    public static class Response implements Serializable {
+        // As sent by UniqueValueActor.onReceive:
+        //   IS_UNIQUE   - value was reserved or committed, or the requester already owns it
+        //   NOT_UNIQUE  - a different entity owns the value
+        //   SUCCESS     - a cancellation was carried out
+        //   ERROR       - an exception was thrown while processing the request
+        //   BAD_REQUEST - confirmation or cancellation of a value that has no owner
+        public enum Status { IS_UNIQUE, NOT_UNIQUE, SUCCESS, ERROR, 
BAD_REQUEST }
+        final Status status;
+
+        /** @param status outcome to report to the requester */
+        public Response(Status status) {
+            this.status = status;
+        }
+        /** @return outcome reported by the actor */
+        public Status getStatus() {
+            return status;
+        }
+    }
+
+    /**
+     * Request to reserve a unique value for an entity. Also the message type that the
+     * actor publishes on the pub-sub topic after a value was reserved or committed.
+     */
+    public static class Reservation extends Request implements Serializable {
+        /** Creates a Reservation carrying the same values as the given request. */
+        public Reservation( Request req ) {
+            super( req );
+        }
+        /** Note: the third argument is passed on as the property name, despite being called username. */
+        public Reservation(UUID uuid, String type, String username, String 
value) {
+            super( uuid, type, username, value );
+        }
+    }
+
+    /**
+     * Request to release a unique value held by an entity.
+     */
+    public static class Cancellation extends Request implements Serializable {
+        /** Creates a Cancellation carrying the same values as the given request. */
+        public Cancellation( Request req ) {
+            super( req );
+        }
+        /** Note: the third argument is passed on as the property name, despite being called username. */
+        public Cancellation(UUID uuid, String type, String username, String 
value) {
+            super( uuid, type, username, value );
+        }
+    }
+
+    /**
+     * Request to commit a unique value that the same entity reserved earlier.
+     */
+    public static class Confirmation extends Request implements Serializable {
+        /** Creates a Confirmation carrying the same values as the given request. */
+        public Confirmation(Request req ) {
+            super( req );
+        }
+        /** Note: the third argument is passed on as the property name, despite being called username. */
+        public Confirmation(UUID uuid, String type, String username, String 
value) {
+            super( uuid, type, username, value );
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/52ee2fb7/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueException.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueException.java
 
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueException.java
new file mode 100644
index 0000000..5df8237
--- /dev/null
+++ 
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueException.java
@@ -0,0 +1,7 @@
+package org.apache.usergrid.persistence.collection.uniquevalues;
+
+/**
+ * Checked exception declared by the reserve and confirm operations of
+ * {@link UniqueValuesService}.
+ */
+public class UniqueValueException extends Exception {
+    /** @param message description of why the unique value operation failed */
+    public UniqueValueException(String message) {
+        super( message );
+    }
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/52ee2fb7/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesService.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesService.java
 
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesService.java
new file mode 100644
index 0000000..2219df6
--- /dev/null
+++ 
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesService.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.collection.uniquevalues;
+
+
+import org.apache.usergrid.persistence.model.entity.Entity;
+
+/**
+ * Service that reserves and confirms unique values.
+ */
+public interface UniqueValuesService {
+
+    /**
+     * Check that unique values are unique and reserve them for a limited time.
+     * If the reservations are not confirmed, they will expire.
+     *
+     * @param entity entity whose unique fields are to be reserved
+     * @throws UniqueValueException if a value cannot be reserved, for example because it is not unique
+     */
+    void reserveUniqueValues( Entity entity ) throws UniqueValueException;
+
+    /**
+     * Confirm unique values that were reserved earlier.
+     *
+     * @param entity entity whose reserved unique fields are to be confirmed
+     * @throws UniqueValueException if a value cannot be confirmed
+     */
+    void confirmUniqueValues( Entity entity ) throws UniqueValueException;
+
+    /**
+     * For test purposes only.
+     * Starts the service with explicit settings instead of configured ones.
+     *
+     * @param hostname hostname to use for Akka
+     * @param port     port to use for Akka
+     * @param region   name of the region this node belongs to
+     */
+    void start( String hostname, Integer port, String region );
+
+    /**
+     * For test purposes only.
+     * Waits until the request actors are ready.
+     */
+    void waitForRequestActors();
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/52ee2fb7/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java
 
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java
new file mode 100644
index 0000000..8897091
--- /dev/null
+++ 
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java
@@ -0,0 +1,587 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.collection.uniquevalues;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import akka.cluster.singleton.ClusterSingletonManager;
+import akka.cluster.singleton.ClusterSingletonManagerSettings;
+import akka.cluster.singleton.ClusterSingletonProxy;
+import akka.cluster.singleton.ClusterSingletonProxySettings;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
+import com.google.inject.Inject;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.field.Field;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+
+public class UniqueValuesServiceImpl implements UniqueValuesService {
+    private static final Logger logger = LoggerFactory.getLogger( 
UniqueValuesServiceImpl.class );
+
+    @Inject
+    AkkaFig akkaFig;
+
+    private String hostname;
+    private Integer port;
+    private String currentRegion;
+
+    private Map<String, ActorRef> requestActorsByRegion;
+    private Map<String, String> regionsByType = new HashMap<>();
+
+//    private final MetricRegistry metrics = new MetricRegistry();
+//
+//    private final Timer getTimer     = metrics.timer( "get" );
+//    private final Timer   saveTimer    = metrics.timer( "save" );
+//
+//    private final Counter cacheCounter = metrics.counter( "cache" );
+//    private final Counter dupCounter   = metrics.counter( "duplicates" );
+//
+//    private final Timer   reservationTimer = metrics.timer( "reservation" );
+//    private final Timer   commitmentTimer  = metrics.timer( "commitment" );
+
+    private ReservationCache reservationCache;
+
+    private final boolean disableUniqueValues;
+
+
+    /**
+     * Creates the service backed by the ReservationCache singleton and sets
+     * disableUniqueValues to false. Akka is not started here; see the start methods.
+     */
+    public UniqueValuesServiceImpl() {
+        this.reservationCache = ReservationCache.getInstance();
+        this.disableUniqueValues = false;
+    }
+
+
+    /**
+     * Init Akka ActorSystems and wait for request actors to start.
+     * Hostname, port and region are taken from the injected AkkaFig.
+     */
+    public void start() {
+        this.hostname = akkaFig.getHostname();
+        this.port = akkaFig.getPort();
+        this.currentRegion = akkaFig.getRegion();
+
+        initAkka();
+        // blocks until every request actor reports ready, throws if one does not start in time
+        waitForRequestActors();
+    }
+
+    /**
+     * For testing purposes only; does not wait for request actors to start.
+     *
+     * @param hostname      hostname to use instead of the configured one
+     * @param port          port to use instead of the configured one
+     * @param currentRegion region to use instead of the configured one
+     */
+    public void start( String hostname, Integer port, String currentRegion ) {
+        this.hostname = hostname;
+        this.port = port;
+        this.currentRegion = currentRegion;
+
+        initAkka();
+    }
+
+    /** @return request actors keyed by region name; null until createRequestActors() has run */
+    private Map<String, ActorRef> getRequestActorsByRegion() {
+        return requestActorsByRegion;
+    }
+
+    /** @return region names keyed by type name, as filled in by initAkka() */
+    private Map<String, String> getRegionsByType() {
+        return regionsByType;
+    }
+
+//    public Counter getDupCounter() {
+//        return dupCounter;
+//    }
+//
+//    public Counter getCacheCounter() {
+//        return cacheCounter;
+//    }
+//
+//    public Timer getReservationTimer() {
+//        return reservationTimer;
+//    }
+//
+//    public Timer getCommitmentTimer() {
+//        return commitmentTimer;
+//    }
+//
+//    public Timer getSaveTimer() {
+//        return saveTimer;
+//    }
+//
+//    public Timer getGetTimer() {
+//        return getTimer;
+//    }
+
+    /**
+     * Validates the Akka settings and records which region is responsible for which type.
+     * It then creates the actor systems with their singleton proxies, the request actors
+     * and the reservation subscribers.
+     *
+     * @throws RuntimeException if hostname, region or regions is not set, or if a region
+     *                          types entry is not of the form region:type
+     */
+    private void initAkka() {
+        logger.info("Initializing Akka");
+
+        if ( StringUtils.isEmpty( hostname )) {
+            throw new RuntimeException( "No value specified for akka.hostname");
+        }
+
+        if ( StringUtils.isEmpty( currentRegion )) {
+            throw new RuntimeException( "No value specified for akka.region");
+        }
+
+        if ( StringUtils.isEmpty( akkaFig.getRegions() )) {
+            throw new RuntimeException( "No value specified for akka.regions");
+        }
+
+        // Region types are a comma-separated list of region:type pairs. They do not depend on
+        // the list of regions, so they are parsed once rather than once per region.
+        String typesValue = akkaFig.getRegionTypes();
+        if ( !StringUtils.isEmpty( typesValue ) ) {
+            for ( String regionType : typesValue.split( "," ) ) {
+                String[] parts = regionType.split( ":" );
+                if ( parts.length < 2 ) {
+                    throw new RuntimeException(
+                            "Malformed region type '" + regionType + "', expected region:type" );
+                }
+                String typeRegion = parts[0];
+                String type = parts[1];
+                this.regionsByType.put( type, typeRegion );
+            }
+        }
+
+        // Create one actor system with request actor for each region
+        final Map<String, ActorSystem> systemMap = new HashMap<>();
+
+        ActorSystem localSystem = createClusterSingletonProxies( readClusterSingletonConfigs(), systemMap );
+
+        createRequestActors( systemMap );
+
+        subscribeToReservations( localSystem, systemMap );
+    }
+
+    /**
+     * Starts a ReservationCacheActor named "subscriber" in every actor system except the
+     * local one.
+     *
+     * @param localSystem actor system of the current region, which gets no subscriber
+     * @param systemMap   actor systems keyed by region
+     */
+    private void subscribeToReservations( ActorSystem localSystem, Map<String, ActorSystem> systemMap ) {
+
+        for ( Map.Entry<String, ActorSystem> regionSystem : systemMap.entrySet() ) {
+            ActorSystem remoteCandidate = regionSystem.getValue();
+            if ( remoteCandidate.equals( localSystem ) ) {
+                continue;
+            }
+
+            String regionName = regionSystem.getKey();
+            logger.info("Starting ReservationCacheUpdater for {}", regionName );
+            remoteCandidate.actorOf( Props.create( ReservationCacheActor.class, regionName ), "subscriber");
+        }
+    }
+
+    /**
+     * Create ActorSystem and ClusterSingletonProxy for every region.
+     * Create ClusterSingletonManager for the current region.
+     *
+     * @param configMap Configurations to be used to create ActorSystems
+     * @param systemMap Map of ActorSystems created by this method
+     *
+     * @return ActorSystem for this region.
+     */
+    private ActorSystem createClusterSingletonProxies(
+            Map<String, Config> configMap, Map<String, ActorSystem> systemMap 
) {
+
+        ActorSystem localSystem = null;
+
+        for ( String region : configMap.keySet() ) {
+            Config config = configMap.get( region );
+
+            ActorSystem system = ActorSystem.create( "ClusterSystem", config );
+            systemMap.put( region, system );
+
+            // cluster singletons only run role "io" nodes and NOT on "client" 
nodes of other regions
+            if ( currentRegion.equals( region ) ) {
+
+                localSystem = system;
+
+                // create cluster singleton supervisor for actor system
+                ClusterSingletonManagerSettings settings =
+                        ClusterSingletonManagerSettings.create( system 
).withRole("io");
+                system.actorOf( ClusterSingletonManager.props(
+                        Props.create( ClusterSingletonRouter.class, region ),
+                        PoisonPill.getInstance(), settings ), "uvRouter");
+            }
+
+            // create proxy for sending messages to singleton
+            ClusterSingletonProxySettings proxySettings =
+                    ClusterSingletonProxySettings.create( system 
).withRole("io");
+            system.actorOf( ClusterSingletonProxy.props( "/user/uvRouter", 
proxySettings ), "uvProxy" );
+        }
+
+        return localSystem;
+    }
+
+    /**
+     * Creates a RequestActor named "requestActor" in the ActorSystem of every region and
+     * remembers it by region, replacing any previously stored request actors.
+     *
+     * @param systemMap Map of regions to ActorSystems.
+     */
+    private void createRequestActors( Map<String, ActorSystem> systemMap ) {
+
+        requestActorsByRegion = new HashMap<>();
+
+        for ( Map.Entry<String, ActorSystem> regionSystem : systemMap.entrySet() ) {
+
+            // each RequestActor is given the path of the ClusterSingletonProxy it talks to
+            Props requestActorProps = Props.create( RequestActor.class, "/user/uvProxy" );
+            ActorRef created = regionSystem.getValue().actorOf( requestActorProps, "requestActor" );
+
+            requestActorsByRegion.put( regionSystem.getKey(), created );
+        }
+    }
+
+    /**
+     * Waits, one region after the other, until the request actor of every region is ready.
+     */
+    public void waitForRequestActors() {
+
+        for ( ActorRef requestActor : requestActorsByRegion.values() ) {
+            waitForRequestActor( requestActor );
+        }
+    }
+
+    private void waitForRequestActor( ActorRef ra ) {
+
+        logger.info( "Waiting on request actor {}...", ra.path() );
+
+        boolean started = false;
+        int retries = 0;
+        int maxRetries = 60;
+        while (retries < maxRetries) {
+            Timeout t = new Timeout( 10, TimeUnit.SECONDS );
+
+            Future<Object> fut = Patterns.ask( ra, new 
RequestActor.StatusRequest(), t );
+            try {
+                RequestActor.StatusMessage result = 
(RequestActor.StatusMessage) Await.result( fut, t.duration() );
+
+                if (result.status.equals( 
RequestActor.StatusMessage.Status.READY )) {
+                    started = true;
+                    break;
+                }
+                logger.info( "Waiting for request actor {} region {} ({}s)", 
ra.path(), currentRegion, retries );
+                Thread.sleep( 1000 );
+
+            } catch (Exception e) {
+                logger.error( "Error: Timeout waiting for requestActor" );
+            }
+            retries++;
+        }
+
+        if (started) {
+            logger.info( "RequestActor has started" );
+        } else {
+            throw new RuntimeException( "RequestActor did not start in time" );
+        }
+    }
+
+
+    /**
+     * Read configuration and create a Config for each region.
+     *
+     * When a port has been set explicitly (testing), the only region is the current one and
+     * its only seed is this node. Otherwise the seeds come from the region seeds setting, a
+     * comma-separated list of region:hostname:port entries.
+     *
+     * @return Map of regions to Configs.
+     * @throws RuntimeException if the seeds cannot be parsed or a Config cannot be built
+     */
+    private Map<String, Config> readClusterSingletonConfigs() {
+
+        Map<String, Config> configs = new HashMap<>();
+
+        ListMultimap<String, String> seedsByRegion = ArrayListMultimap.create();
+
+        try {
+
+            if ( port != null ) {
+
+                // we are testing
+                String seed = "akka.tcp://ClusterSystem@" + hostname + ":" + port;
+                seedsByRegion.put( currentRegion, seed );
+
+            } else {
+
+                // the seeds setting is only needed, and therefore only read, when not testing
+                for ( String regionSeed : akkaFig.getRegionSeeds().split( "," ) ) {
+
+                    String[] parts = regionSeed.split( ":" );
+                    String region = parts[0];
+                    String seedHostname = parts[1];
+                    int seedPort = Integer.parseInt( parts[2] );
+
+                    String seed = "akka.tcp://ClusterSystem@" + seedHostname + ":" + seedPort;
+
+                    seedsByRegion.put( region, seed );
+                }
+
+                if (seedsByRegion.keySet().isEmpty()) {
+                    throw new RuntimeException( "No seeds listed in 'collection.akka.region.seeds' property." );
+                }
+            }
+
+            int numInstancesPerNode = akkaFig.getUniqueValueActors();
+
+            for ( String region : seedsByRegion.keySet() ) {
+
+                List<String> seeds = seedsByRegion.get( region );
+
+                final Integer regionPort;
+
+                if (port == null) {
+                    // all seeds in same region must use same port
+                    // we assume 0th seed has the right port
+                    String firstSeed = seeds.get( 0 );
+                    regionPort = Integer.parseInt( firstSeed.substring( firstSeed.lastIndexOf( ":" ) + 1 ) );
+                } else {
+                    regionPort = port; // unless we are testing
+                }
+
+                // cluster singletons only run role "io" nodes and NOT on "client" nodes of other regions
+                String clusterRole = currentRegion.equals( region ) ? "io" : "client";
+
+                // log the port that is actually used; the port field is null outside of tests
+                logger.info( "Config for region {} is:\npoc Akka Hostname {}\npoc Akka Seeds {}\n" +
+                                "poc Akka Port {}\npoc UniqueValueActors per node {}",
+                        region, hostname, seeds, regionPort, numInstancesPerNode );
+
+                // akka.remote."netty.tcp"
+                Map<String, Object> nettyTcp = new HashMap<>();
+                nettyTcp.put( "hostname", hostname );
+                nettyTcp.put( "bind-hostname", hostname );
+                nettyTcp.put( "port", regionPort );
+
+                Map<String, Object> remote = new HashMap<>();
+                remote.put( "netty.tcp", nettyTcp );
+
+                // akka.cluster
+                Map<String, Object> cluster = new HashMap<>();
+                cluster.put( "max-nr-of-instances-per-node", numInstancesPerNode );
+                cluster.put( "roles", Collections.singletonList( clusterRole ) );
+                cluster.put( "seed-nodes", new ArrayList<>( seeds ) );
+
+                // akka.actor.deployment."/uvRouter/singleton/router".cluster
+                Map<String, Object> routerCluster = new HashMap<>();
+                routerCluster.put( "max-nr-of-instances-per-node", numInstancesPerNode );
+
+                Map<String, Object> router = new HashMap<>();
+                router.put( "cluster", routerCluster );
+
+                Map<String, Object> deployment = new HashMap<>();
+                deployment.put( "/uvRouter/singleton/router", router );
+
+                Map<String, Object> actor = new HashMap<>();
+                actor.put( "deployment", deployment );
+
+                Map<String, Object> akka = new HashMap<>();
+                akka.put( "remote", remote );
+                akka.put( "cluster", cluster );
+                akka.put( "actor", actor );
+
+                Map<String, Object> configMap = new HashMap<>();
+                configMap.put( "akka", akka );
+
+                // values set above take precedence over both fallbacks
+                Config config = ConfigFactory
+                        .parseMap( configMap )
+                        .withFallback( ConfigFactory.parseString( "akka.cluster.roles = [io]" ) )
+                        .withFallback( ConfigFactory.load( "cluster-singleton" ) );
+
+                configs.put( region, config );
+            }
+
+        } catch ( Exception e ) {
+            throw new RuntimeException("Error parsing 'collection.akka.region.seeds' property", e );
+        }
+
+        return configs;
+    }
+
+
+
+    /**
+     * Reserves the value of every unique field of the entity. If one reservation fails, a
+     * cancellation is sent for each unique field before the failure is rethrown.
+     *
+     * @param entity entity whose unique fields are to be reserved
+     * @throws UniqueValueException if the value of a unique field cannot be reserved
+     */
+    @Override
+    public void reserveUniqueValues(Entity entity) throws UniqueValueException {
+
+        try {
+            for (Field field : entity.getFields()) {
+                if (field.isUnique()) {
+                    reserveUniqueField( entity, field.getName(), field.getValue().toString() );
+                }
+            }
+
+        } catch ( UniqueValueException e ) {
+
+            // roll back; only unique fields can have been reserved, so skip all other fields
+            for (Field field : entity.getFields()) {
+                if ( !field.isUnique() ) {
+                    continue;
+                }
+                try {
+                    cancelUniqueField( entity, field.getName(), field.getValue().toString() );
+                } catch (Throwable ignored) {
+                    // best effort: a failed rollback must not hide the original failure
+                    logger.debug( "Error canceling unique field", ignored );
+                }
+            }
+            throw e;
+        }
+
+    }
+
+
+    /**
+     * Confirms the value of every unique field of the entity. If one confirmation fails, a
+     * cancellation is sent for each unique field before the failure is rethrown.
+     *
+     * @param entity entity whose unique fields are to be confirmed
+     * @throws UniqueValueException if the value of a unique field cannot be confirmed
+     */
+    @Override
+    public void confirmUniqueValues(Entity entity) throws UniqueValueException {
+
+        try {
+            for (Field field : entity.getFields()) {
+                if (field.isUnique()) {
+                    confirmUniqueField( entity, field.getName(), field.getValue().toString() );
+                }
+            }
+
+        } catch ( UniqueValueException e ) {
+
+            // roll back; only unique fields take part in unique value handling
+            for (Field field : entity.getFields()) {
+                if ( !field.isUnique() ) {
+                    continue;
+                }
+                try {
+                    cancelUniqueField( entity, field.getName(), field.getValue().toString() );
+                } catch (Throwable ignored) {
+                    // best effort: a failed rollback must not hide the original failure
+                    logger.debug( "Error canceling unique field", ignored );
+                }
+            }
+            throw e;
+        }
+
+    }
+
+
+    /**
+     * Reserves one property value for the entity, using the type name "user".
+     * The local reservation cache is consulted first, so a value known to be held by another
+     * entity is rejected without asking the request actor.
+     *
+     * @param entity        entity that wants to own the value
+     * @param propertyName  name of the unique property
+     * @param propertyValue value to reserve
+     * @throws UniqueValueException if the cache shows the value reserved by a different entity,
+     *                              or if sending the request fails with that exception
+     */
+    private void reserveUniqueField(
+        Entity entity, String propertyName, String propertyValue ) throws UniqueValueException {
+
+        ActorRef requestActor = getRequestActorsByRegion().get( getRegionsByType().get("user") );
+        if ( requestActor == null ) {
+            throw new RuntimeException( "No request actor for type, cannot verify unique fields!" );
+        }
+
+        UniqueValueActor.Request request = new UniqueValueActor.Reservation(
+            entity.getId().getUuid(), "user",  propertyName, propertyValue );
+
+        // a cached reservation held by the same entity is no conflict
+        UniqueValueActor.Reservation cached = reservationCache.get( request.getRowKey() );
+        boolean heldByOtherEntity = cached != null && !cached.getUuid().equals( request.getUuid() );
+        if ( heldByOtherEntity ) {
+            throw new UniqueValueException( "Error property not unique (cache)" );
+        }
+
+        sendUniqueValueRequest( entity, requestActor, request );
+    }
+
+    /**
+     * Confirms one property value for the entity, using the type name "user".
+     *
+     * @param entity        entity that reserved the value
+     * @param propertyName  name of the unique property
+     * @param propertyValue value to confirm
+     * @throws UniqueValueException if sending the request fails with that exception
+     */
+    private void confirmUniqueField(
+        Entity entity, String propertyName, String propertyValue ) throws UniqueValueException {
+
+        ActorRef requestActor = getRequestActorsByRegion().get( getRegionsByType().get("user") );
+        if ( requestActor == null ) {
+            throw new RuntimeException( "No request actor for type, cannot verify unique fields!" );
+        }
+
+        UniqueValueActor.Confirmation confirmation = new UniqueValueActor.Confirmation(
+                entity.getId().getUuid(), "user",  propertyName, propertyValue );
+
+        sendUniqueValueRequest( entity, requestActor, confirmation );
+    }
+
+    /**
+     * Asks for one property value of the entity to be released, using the type name "user".
+     * The cancellation is sent without waiting for an answer.
+     *
+     * @param entity        entity that holds the value
+     * @param propertyName  name of the unique property
+     * @param propertyValue value to release
+     * @throws RuntimeException if no region or request actor is available for the type
+     */
+    private void cancelUniqueField(
+        Entity entity, String propertyName, String propertyValue ) throws UniqueValueException {
+
+        // throws instead of returning null, so no null check is needed
+        ActorRef requestActor = lookupRequestActorForType( "user" );
+
+        // a Cancellation, not a Confirmation: a Confirmation would commit the value
+        // instead of releasing it
+        UniqueValueActor.Cancellation request = new UniqueValueActor.Cancellation(
+                entity.getId().getUuid(), "user",  propertyName, propertyValue );
+
+        // fire and forget, there is nobody to receive a reply
+        requestActor.tell( request, ActorRef.noSender() );
+    }
+
+    private ActorRef lookupRequestActorForType( String type ) {
+        String region = getRegionsByType().get( type );
+        if ( region == null ) {
+            throw new RuntimeException( "No region specified for type: " + 
type );
+        }
+        ActorRef requestActor = getRequestActorsByRegion().get(region);
+        if ( requestActor == null ) {
+            throw new RuntimeException( "No request actor available for 
region: " + region );
+        }
+        return requestActor;
+    }
+
+    private void sendUniqueValueRequest(
+           Entity entity, ActorRef requestActor, UniqueValueActor.Request 
request) throws UniqueValueException {
+
+        int maxRetries = 5;
+        int retries = 0;
+
+        UniqueValueActor.Response response = null;
+        while ( retries++ < maxRetries ) {
+            try {
+                Timeout t = new Timeout( 1, TimeUnit.SECONDS );
+
+                // ask RequestActor and wait (up to timeout) for response
+
+                Future<Object> fut = Patterns.ask( requestActor, request, t );
+                response = (UniqueValueActor.Response) Await.result( fut, 
t.duration() );
+
+                if ( response != null && (
+                        response.getStatus().equals( 
UniqueValueActor.Response.Status.IS_UNIQUE )
+                                || response.getStatus().equals( 
UniqueValueActor.Response.Status.NOT_UNIQUE ))) {
+                    if ( retries > 1 ) {
+                        logger.debug("IS_UNIQUE after retrying {} for entity 
{} rowkey {}",
+                                retries, entity.getId().getUuid(), 
request.getRowKey());
+                    }
+                    break;
+
+                } else if ( response != null  ) {
+                    logger.debug("ERROR status retrying {} entity {} rowkey 
{}",
+                            retries, entity.getId().getUuid(), 
request.getRowKey());
+                } else {
+                    logger.debug("Timed-out retrying {} entity {} rowkey",
+                            retries, entity.getId().getUuid(), 
request.getRowKey());
+                }
+
+            } catch ( Exception e ) {
+                logger.debug("{} caused retry {} for entity {} rowkey {}",
+                        e.getClass().getSimpleName(), retries, 
entity.getId().getUuid(), request.getRowKey());
+            }
+        }
+
+        if ( response == null || response.getStatus().equals( 
UniqueValueActor.Response.Status.ERROR )) {
+            logger.debug("ERROR after retrying {} for entity {} rowkey {}",
+                    retries, entity.getId().getUuid(), request.getRowKey());
+
+            // should result in an HTTP 503
+            throw new RuntimeException( "Error verifying unique value after " 
+ retries + " retries");
+        }
+
+        if ( response.getStatus().equals( 
UniqueValueActor.Response.Status.NOT_UNIQUE )) {
+
+            // should result in an HTTP 409 (conflict)
+            throw new UniqueValueException( "Error property not unique" );
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/52ee2fb7/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesTable.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesTable.java
 
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesTable.java
new file mode 100644
index 0000000..4309eb0
--- /dev/null
+++ 
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesTable.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.collection.uniquevalues;
+
+import java.util.UUID;
+
+
+/**
+ * Operations on a table of unique property values, keyed by entity type, property
+ * name and property value.
+ *
+ * NOTE(review): the descriptions below are inferred from the method names and
+ * signatures only - confirm the intended contract against a real implementation.
+ */
+public interface UniqueValuesTable {
+
+    /**
+     * Presumably returns the UUID of the entity that owns the given unique value.
+     * TODO confirm what is returned when the value has no owner.
+     */
+    UUID lookupOwner(String entityType, String propertyName, String 
propertyValue);
+
+    /**
+     * Presumably records a reservation of the given unique value for the owner -
+     * TODO confirm.
+     */
+    void reserve(UUID owner, String entityType, String propertyName, String 
propertyValue);
+
+    /**
+     * Presumably commits the owner's claim on the given unique value - TODO confirm.
+     */
+    void commit(UUID owner, String entityType, String propertyName, String 
propertyValue);
+
+    /**
+     * Presumably releases the given unique value; note that no owner is passed -
+     * TODO confirm.
+     */
+    void cancel(String entityType, String propertyName, String propertyValue);
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/52ee2fb7/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesTableImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesTableImpl.java
 
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesTableImpl.java
new file mode 100644
index 0000000..ee3d621
--- /dev/null
+++ 
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesTableImpl.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.collection.uniquevalues;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.UUID;
+
+
+/**
+ * Placeholder implementation of UniqueValuesTable: lookupOwner always returns null
+ * and reserve, commit and cancel have empty bodies, so nothing is stored.
+ */
+public class UniqueValuesTableImpl implements UniqueValuesTable {
+    // not referenced by any method in this class yet
+    private static final Logger logger = LoggerFactory.getLogger( 
UniqueValuesTableImpl.class );
+
+    /** Stub: always returns null, whatever the arguments. */
+    @Override
+    public UUID lookupOwner(String entityType, String propertyName, String 
propertyValue) {
+        return null;
+    }
+
+    /** Stub: does nothing. */
+    @Override
+    public void reserve(UUID owner, String entityType, String propertyName, 
String propertyValue) {
+    }
+
+    /** Stub: does nothing. */
+    @Override
+    public void commit(UUID owner, String entityType, String propertyName, 
String propertyValue) {
+    }
+
+    /** Stub: does nothing. */
+    @Override
+    public void cancel(String entityType, String propertyName, String 
propertyValue) {
+    }
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/52ee2fb7/stack/corepersistence/collection/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/collection/src/main/resources/application.conf 
b/stack/corepersistence/collection/src/main/resources/application.conf
new file mode 100644
index 0000000..93854f9
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/resources/application.conf
@@ -0,0 +1,28 @@
+akka {
+  
+  loggers = ["akka.event.slf4j.Slf4jLogger"]
+  loglevel = "ERROR"
+  logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
+  
+  actor {
+    provider = "akka.cluster.ClusterActorRefProvider"
+  }
+  
+  remote {
+    log-remote-lifecycle-events = off
+    netty.tcp {
+      hostname = "127.0.0.1"
+      port = 0
+    }
+  }
+}
+
+# Disable legacy metrics in akka-cluster.
+akka.cluster.metrics.enabled=off
+
+# Enable metrics extension in akka-cluster-metrics.
+akka.extensions=["akka.cluster.metrics.ClusterMetricsExtension", 
"akka.cluster.pubsub.DistributedPubSub"]
+
+# Sigar native library extract location during tests.
+# Note: use a per-JVM-instance folder when running multiple JVMs on one host.
+akka.cluster.metrics.native-library-extract-folder=${user.dir}/target/native

http://git-wip-us.apache.org/repos/asf/usergrid/blob/52ee2fb7/stack/corepersistence/collection/src/main/resources/cluster-singleton.conf
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/collection/src/main/resources/cluster-singleton.conf 
b/stack/corepersistence/collection/src/main/resources/cluster-singleton.conf
new file mode 100644
index 0000000..907aebb
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/resources/cluster-singleton.conf
@@ -0,0 +1,25 @@
+include "application"
+
+akka.actor.deployment {
+  /uvRouter/singleton/router {
+    router = consistent-hashing-pool
+    cluster {
+      enabled = on
+      allow-local-routees = on
+      
+      # singleton will only run on nodes with role "io"
+      use-role = io
+
+      # more forgiving failure detector
+      failure-detector {
+        threshold = 10
+        acceptable-heartbeat-pause = 3 s
+        heartbeat-interval = 1 s
+        heartbeat-request {
+          expected-response-after = 3 s
+        }
+      }
+
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/52ee2fb7/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/guice/TestCollectionModule.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/guice/TestCollectionModule.java
 
b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/guice/TestCollectionModule.java
index 5718e4f..dd618a9 100644
--- 
a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/guice/TestCollectionModule.java
+++ 
b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/guice/TestCollectionModule.java
@@ -20,6 +20,8 @@
 package org.apache.usergrid.persistence.collection.guice;
 
 
+import com.google.inject.Guice;
+import com.google.inject.Injector;
 import 
org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
 import org.apache.usergrid.persistence.core.guice.CommonModule;
 import org.apache.usergrid.persistence.core.guice.TestModule;
@@ -28,6 +30,9 @@ import 
org.apache.usergrid.persistence.core.migration.data.TestMigrationDataProv
 
 import com.google.inject.TypeLiteral;
 
+import java.util.HashMap;
+import java.util.Map;
+
 
 public class TestCollectionModule extends TestModule {
 
@@ -51,4 +56,16 @@ public class TestCollectionModule extends TestModule {
 //        install(new MaxMigrationModule());
 
     }
+
+
+    private static Map<String, Injector> injectorsByName = new HashMap<>();
+
+    /**
+     * Returns the injector found in injectorsByName under the given name, or builds a
+     * new one from a fresh TestCollectionModule when the map has no entry for it.
+     *
+     * NOTE(review): the newly built injector is not stored in injectorsByName, and no
+     * other write to that map is visible here, so each call appears to create a new
+     * injector. Confirm whether caching by name was intended before relying on it.
+     *
+     * @param name key to look up in injectorsByName
+     * @return the mapped injector, or a newly created one
+     */
+    public static Injector getInjector( String name ) {
+        final Injector existing = injectorsByName.get( name );
+        return existing != null
+            ? existing
+            : Guice.createInjector( new TestCollectionModule() );
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/52ee2fb7/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/uniquevalues/AkkaFigTest.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/uniquevalues/AkkaFigTest.java
 
b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/uniquevalues/AkkaFigTest.java
new file mode 100644
index 0000000..59f76de
--- /dev/null
+++ 
b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/uniquevalues/AkkaFigTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.collection.uniquevalues;
+
+import com.google.inject.Inject;
+import org.apache.usergrid.persistence.collection.guice.TestCollectionModule;
+import org.apache.usergrid.persistence.core.test.ITRunner;
+import org.apache.usergrid.persistence.core.test.UseModules;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+
+/**
+ * Smoke test: checks that an AkkaFig instance is injected when the test runs with
+ * TestCollectionModule.
+ */
+@RunWith( ITRunner.class )
+@UseModules( TestCollectionModule.class )
+public class AkkaFigTest {
+
+    // populated by injection before the test method runs
+    @Inject
+    AkkaFig akkaFig;
+
+    /** Passes when the injected AkkaFig is not null; no configuration values are checked. */
+    @Test
+    public void testBasicOperation() {
+        Assert.assertNotNull( akkaFig );
+    }
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/52ee2fb7/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/uniquevalues/LocalPreventDupsTest.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/uniquevalues/LocalPreventDupsTest.java
 
b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/uniquevalues/LocalPreventDupsTest.java
new file mode 100644
index 0000000..bee47eb
--- /dev/null
+++ 
b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/uniquevalues/LocalPreventDupsTest.java
@@ -0,0 +1,141 @@
+package org.apache.usergrid.persistence.collection.uniquevalues;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
+import com.google.inject.Inject;
+import org.apache.usergrid.persistence.collection.EntityCollectionManager;
+import 
org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
+import org.apache.usergrid.persistence.collection.guice.TestCollectionModule;
+import org.apache.usergrid.persistence.core.guice.MigrationManagerRule;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
+import org.apache.usergrid.persistence.core.test.ITRunner;
+import org.apache.usergrid.persistence.core.test.UseModules;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.apache.usergrid.persistence.model.field.StringField;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import rx.Observable;
+
+import java.util.Collection;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Prevent-dups test that writes entities through an EntityCollectionManager rather
+ * than through REST. Several threads try to create entities with the same unique
+ * username and email; the test asserts that exactly one write per username succeeds.
+ */
+@RunWith( ITRunner.class )
+@UseModules( TestCollectionModule.class )
+public class LocalPreventDupsTest {
+    private static final Logger logger = LoggerFactory.getLogger( LocalPreventDupsTest.class );
+
+    @Inject
+    private EntityCollectionManagerFactory factory;
+
+    @Inject
+    @Rule
+    public MigrationManagerRule migrationManagerRule;
+
+
+    // number of writes that completed without throwing
+    private static final AtomicInteger successCounter = new AtomicInteger( 0 );
+
+    // number of writes that failed with anything other than UniqueValueException
+    private static final AtomicInteger errorCounter = new AtomicInteger( 0 );
+
+    /**
+     * Starts the unique values service on ports 2551 and 2552, runs the concurrent
+     * writes and checks that every username was created exactly once with no
+     * unexpected errors.
+     */
+    @Test
+    public void testBasicOperation() throws Exception {
+
+        UniqueValuesService appEast1 =
+            TestCollectionModule.getInjector( "us-east" ).getInstance( UniqueValuesService.class );
+        appEast1.start("127.0.0.1", 2551, "us-east");
+
+        UniqueValuesService appEast2 =
+            TestCollectionModule.getInjector( "us-east" ).getInstance( UniqueValuesService.class );
+        appEast2.start("127.0.0.1", 2552, "us-east");
+
+        appEast1.waitForRequestActors();
+        appEast2.waitForRequestActors();
+
+        int numUsers = 100;
+        Multimap<String, Entity> usersCreated = generateDuplicateUsers( numUsers );
+
+        // count distinct usernames, and how many of them were created more than once
+        int userCount = 0;
+        int usernamesWithDuplicates = 0;
+        for ( String username : usersCreated.keySet() ) {
+            Collection<Entity> users = usersCreated.get( username );
+            if ( users.size() > 1 ) {
+                usernamesWithDuplicates++;
+            }
+            userCount++;
+        }
+
+        Assert.assertEquals( 0, usernamesWithDuplicates );
+        Assert.assertEquals( numUsers, successCounter.get() );
+        Assert.assertEquals( 0, errorCounter.get() );
+        Assert.assertEquals( numUsers, usersCreated.size() );
+        Assert.assertEquals( numUsers, userCount );
+    }
+
+    /**
+     * Submits five write attempts per username to a ten-thread pool and waits for all
+     * of them to finish.
+     *
+     * @param numUsers number of distinct usernames to attempt
+     * @return the entities whose write completed without throwing, keyed by username
+     */
+    private Multimap<String, Entity> generateDuplicateUsers(int numUsers ) {
+
+        ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
+
+        EntityCollectionManager manager = factory.createCollectionManager( context );
+
+        Multimap<String, Entity> usersCreated =
+                Multimaps.synchronizedListMultimap( ArrayListMultimap.create() );
+
+        ExecutorService execService = Executors.newFixedThreadPool( 10 );
+
+        for (int i = 0; i < numUsers; i++) {
+
+            // multiple threads simultaneously trying to create a user with the same
+            // username and email values
+            for (int j = 0; j < 5; j++) {
+                String username = "user_" + i;
+
+                execService.submit( () -> {
+
+                    try {
+                        Entity newEntity = new Entity( new SimpleId( "user" ) );
+                        newEntity.setField( new StringField( "username", username, true ) );
+                        newEntity.setField( new StringField( "email", username + "@example.org", true ) );
+
+                        // block until the write completes; a failed write throws here
+                        Observable<Entity> observable = manager.write( newEntity );
+                        observable.toBlocking().lastOrDefault( null );
+
+                        usersCreated.put( username, newEntity );
+                        successCounter.incrementAndGet();
+
+                        logger.debug("Created user {}", username);
+
+                    } catch ( Throwable t ) {
+                        if ( t instanceof UniqueValueException ) {
+                            // we expect lots of these
+                        } else {
+                            errorCounter.incrementAndGet();
+                            logger.error( "Error creating user " + username, t );
+                        }
+                    }
+
+                } );
+            }
+        }
+        execService.shutdown();
+
+        try {
+            while (!execService.awaitTermination( 60, TimeUnit.SECONDS )) {
+                logger.info( "Waiting..." );
+            }
+        } catch (InterruptedException e) {
+            // keep the interrupt visible to the test runner instead of swallowing it
+            Thread.currentThread().interrupt();
+            logger.error( "Interrupted while waiting for user creation tasks to finish", e );
+        }
+
+        return usersCreated;
+    }
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/52ee2fb7/stack/corepersistence/collection/src/test/resources/usergrid.properties
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/collection/src/test/resources/usergrid.properties 
b/stack/corepersistence/collection/src/test/resources/usergrid.properties
index 015c681..9059f0e 100644
--- a/stack/corepersistence/collection/src/test/resources/usergrid.properties
+++ b/stack/corepersistence/collection/src/test/resources/usergrid.properties
@@ -1,2 +1,16 @@
 # This property is required to be set and cannot be defaulted anywhere
 usergrid.cluster_name=usergrid
+
+collection.akka.hostname=localhost
+
+collection.akka.port=2551
+
+collection.akka.region=us-east
+
+collection.akka.regions=us-east
+
+collection.akka.region.seeds=us-east:localhost:2551,us-east:localhost:2552
+
+collection.akka.region.types=us-east:users,us-east:cats
+
+collection.akka.unique.value.actors=400

Reply via email to