require RouterProducers to provide list of the Message Types that they handle, 
so that ActorSystemManager can set them up.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/06cc50f2
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/06cc50f2
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/06cc50f2

Branch: refs/heads/release-2.1.1
Commit: 06cc50f29196600fbecacbd20b615ad0cb8f9f02
Parents: 5dc3324
Author: Dave Johnson <[email protected]>
Authored: Mon Jul 25 15:37:29 2016 -0400
Committer: Dave Johnson <[email protected]>
Committed: Mon Jul 25 15:37:29 2016 -0400

----------------------------------------------------------------------
 .../corepersistence/CpEntityManagerFactory.java |  4 -
 .../corepersistence/index/IndexServiceTest.java |  4 -
 .../actorsystem/ActorSystemManager.java         |  9 +--
 .../actorsystem/ActorSystemManagerImpl.java     | 14 ++--
 .../persistence/actorsystem/RouterProducer.java |  8 ++
 .../actorsystem/ActorServiceServiceTest.java    |  4 -
 .../uniquevalues/UniqueValuesServiceImpl.java   | 85 ++++++++++++++------
 .../collection/AbstractUniqueValueTest.java     |  4 -
 8 files changed, 76 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/06cc50f2/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
index a419e58..8055740 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
@@ -150,10 +150,6 @@ public class CpEntityManagerFactory implements 
EntityManagerFactory, Application
                 this.actorSystemManager = injector.getInstance( 
ActorSystemManager.class );
 
                 actorSystemManager.registerRouterProducer( uniqueValuesService 
);
-                actorSystemManager.registerMessageType( 
UniqueValueActor.Request.class, "/user/uvProxy" );
-                actorSystemManager.registerMessageType( 
UniqueValueActor.Reservation.class, "/user/uvProxy" );
-                actorSystemManager.registerMessageType( 
UniqueValueActor.Cancellation.class, "/user/uvProxy" );
-                actorSystemManager.registerMessageType( 
UniqueValueActor.Confirmation.class, "/user/uvProxy" );
                 actorSystemManager.start();
                 actorSystemManager.waitForClientActor();
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/06cc50f2/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexServiceTest.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexServiceTest.java
 
b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexServiceTest.java
index adecd9d..ecc2b46 100644
--- 
a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexServiceTest.java
+++ 
b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexServiceTest.java
@@ -106,10 +106,6 @@ public class IndexServiceTest {
         if ( startedAkka.get(port) == null ) {
 
             actorSystemManager.registerRouterProducer( uniqueValuesService );
-            actorSystemManager.registerMessageType( 
UniqueValueActor.Request.class, "/user/uvProxy" );
-            actorSystemManager.registerMessageType( 
UniqueValueActor.Reservation.class, "/user/uvProxy" );
-            actorSystemManager.registerMessageType( 
UniqueValueActor.Cancellation.class, "/user/uvProxy" );
-            actorSystemManager.registerMessageType( 
UniqueValueActor.Confirmation.class, "/user/uvProxy" );
             actorSystemManager.start( "localhost", port, "us-east" );
             actorSystemManager.waitForClientActor();
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/06cc50f2/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManager.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManager.java
 
b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManager.java
index c7322dd..17754f0 100644
--- 
a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManager.java
+++ 
b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManager.java
@@ -51,13 +51,6 @@ public interface ActorSystemManager {
     void registerRouterProducer( RouterProducer routerProducer );
 
     /**
-     * MUST be called before start() to register any messages to be sent.
-     * @param messageType Class of message.
-     * @param routerPath Router-path to which such messages are to be sent.
-     */
-    void registerMessageType( Class messageType, String routerPath );
-
-    /**
      * Local client for ActorSystem, send all local messages here for routing.
      */
     ActorRef getClientActor();
@@ -75,7 +68,7 @@ public interface ActorSystemManager {
     /**
      * Get all regions known to system.
      */
-    public Set<String> getRegions();
+    Set<String> getRegions();
 
     /**
      * Publish message to all topic subscribers in all regions.

http://git-wip-us.apache.org/repos/asf/usergrid/blob/06cc50f2/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
 
b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
index d8d284f..bef9335 100644
--- 
a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
+++ 
b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
@@ -69,7 +69,6 @@ public class ActorSystemManagerImpl implements 
ActorSystemManager {
     private ActorSystem clusterSystem = null;
 
 
-
     @Inject
     public ActorSystemManagerImpl( ActorSystemFig actorSystemFig ) {
         this.actorSystemFig = actorSystemFig;
@@ -131,9 +130,7 @@ public class ActorSystemManagerImpl implements 
ActorSystemManager {
     }
 
 
-    @Override
     public void registerMessageType(Class messageType, String routerPath) {
-        routersByMessageType.put( messageType, routerPath );
     }
 
 
@@ -198,7 +195,14 @@ public class ActorSystemManagerImpl implements 
ActorSystemManager {
         createClientActors( clusterSystem );
 
         for ( RouterProducer routerProducer : routerProducers ) {
+
             routerProducer.createLocalSystemActors( clusterSystem );
+
+            Iterator<Class> messageTypes = 
routerProducer.getMessageTypes().iterator();
+            while ( messageTypes.hasNext() ) {
+                Class messageType = messageTypes.next();
+                routersByMessageType.put( messageType, 
routerProducer.getRouterPath() );
+            }
         }
 
         mediator = DistributedPubSub.get( clusterSystem ).mediator();
@@ -337,12 +341,10 @@ public class ActorSystemManagerImpl implements 
ActorSystemManager {
      */
     private ActorSystem createClusterSystemsFromConfigs( Config config ) {
 
-
         // there is only 1 akka system for a Usergrid cluster
         final String clusterName = "ClusterSystem";
 
-
-        if( clusterSystem == null) {
+        if ( clusterSystem == null) {
 
             logger.info("Class: {}. ActorSystem [{}] not initialized, 
creating...", this, clusterName);
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/06cc50f2/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/RouterProducer.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/RouterProducer.java
 
b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/RouterProducer.java
index d849dd9..9c3ce3d 100644
--- 
a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/RouterProducer.java
+++ 
b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/RouterProducer.java
@@ -19,6 +19,8 @@
 package org.apache.usergrid.persistence.actorsystem;
 
 import akka.actor.ActorSystem;
+
+import java.util.Collection;
 import java.util.Map;
 
 
@@ -26,6 +28,8 @@ public interface RouterProducer {
 
     String getName();
 
+    String getRouterPath();
+
     /**
      * Create cluster single manager for current region.
      * Will be called once per router per JVM.
@@ -48,4 +52,8 @@ public interface RouterProducer {
      */
     void addConfiguration(Map<String, Object> configMap );
 
+    /**
+     * Get all message types that should be sent to this router.
+     */
+    Collection<Class> getMessageTypes();
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/06cc50f2/stack/corepersistence/actorsystem/src/test/java/org/apache/usergrid/persistence/actorsystem/ActorServiceServiceTest.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/actorsystem/src/test/java/org/apache/usergrid/persistence/actorsystem/ActorServiceServiceTest.java
 
b/stack/corepersistence/actorsystem/src/test/java/org/apache/usergrid/persistence/actorsystem/ActorServiceServiceTest.java
index 7ac7b12..f1a3197 100644
--- 
a/stack/corepersistence/actorsystem/src/test/java/org/apache/usergrid/persistence/actorsystem/ActorServiceServiceTest.java
+++ 
b/stack/corepersistence/actorsystem/src/test/java/org/apache/usergrid/persistence/actorsystem/ActorServiceServiceTest.java
@@ -57,10 +57,6 @@ public class ActorServiceServiceTest {
         RouterProducer routerProducer = Mockito.mock( RouterProducer.class );
         actorSystemManager.registerRouterProducer( routerProducer );
 
-        actorSystemManager.registerMessageType( String.class, "/users/path" );
-        actorSystemManager.registerMessageType( Integer.class, "/users/path" );
-        actorSystemManager.registerMessageType( Long.class, "/users/path" );
-
         actorSystemManager.start( "localhost", 2770, "us-east" );
         actorSystemManager.waitForClientActor();
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/06cc50f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java
 
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java
index 50114be..0edc9ff 100644
--- 
a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java
+++ 
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java
@@ -35,7 +35,6 @@ import com.google.inject.Singleton;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.usergrid.persistence.actorsystem.ActorSystemManager;
 import org.apache.usergrid.persistence.actorsystem.GuiceActorProducer;
-import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
@@ -45,10 +44,7 @@ import org.slf4j.LoggerFactory;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.UUID;
+import java.util.*;
 import java.util.concurrent.TimeUnit;
 
 
@@ -86,6 +82,12 @@ public class UniqueValuesServiceImpl implements 
UniqueValuesService {
     }
 
 
+    @Override
+    public String getRouterPath() {
+        return "/user/uvProxy";
+    }
+
+
     private void subscribeToReservations( ActorSystem localSystem ) {
         logger.info("Starting ReservationCacheUpdater");
         localSystem.actorOf( Props.create( ReservationCacheActor.class ), 
"subscriber");
@@ -337,36 +339,67 @@ public class UniqueValuesServiceImpl implements 
UniqueValuesService {
         subscribeToReservations( localSystem );
     }
 
+
     @Override
     public void addConfiguration( Map<String, Object> configMap ) {
 
         int numInstancesPerNode = 
uniqueValuesFig.getUniqueValueInstancesPerNode();
 
-        Map<String, Object> akka = (Map<String, Object>)configMap.get("akka");
-
-        // TODO: replace this configuration stuff with equivalent Java code in 
the above "create" methods
-
-        akka.put( "actor", new HashMap<String, Object>() {{
-            put( "deployment", new HashMap<String, Object>() {{
-                put( "/uvRouter/singleton/router", new HashMap<String, 
Object>() {{
-                    put( "router", "consistent-hashing-pool" );
-                    put( "cluster", new HashMap<String, Object>() {{
-                        put( "enabled", "on" );
-                        put( "allow-local-routees", "on" );
-                        put( "use-role", "io" );
-                        put( "max-nr-of-instances-per-node", 
numInstancesPerNode );
-                        put( "failure-detector", new HashMap<String, Object>() 
{{
-                            put( "threshold", "10" );
-                            put( "acceptable-heartbeat-pause", "3 s" );
-                            put( "heartbeat-interval", "1 s" );
-                            put( "heartbeat-request", new HashMap<String, 
Object>() {{
-                                put( "expected-response-after", "3 s" );
-                            }} );
-                        }} );
+        // TODO: replace this configuration stuff with equivalent Java code in 
the above "create" methods?
+
+        // be careful not to overwrite configurations that other router 
producers may have added
+
+        Map<String, Object> akka = (Map<String, Object>) configMap.get( "akka" 
);
+        final Map<String, Object> deploymentMap;
+
+        if ( akka.get( "actor" ) == null ) {
+
+            // nobody has created anything under "actor" yet, so create it now
+            deploymentMap = new HashMap<>();
+            akka.put( "actor", new HashMap<String, Object>() {{
+                put( "deployment", deploymentMap );
+            }} );
+
+        } else if (((Map) akka.get( "actor" )).get( "deployment" ) == null) {
+
+            // nobody has created anything under "actor/deployment" yet, so 
create it now
+            deploymentMap = new HashMap<>();
+            ((Map) akka.get( "actor" )).put( "deployment", deploymentMap );
+
+        } else {
+
+            // somebody else already created "actor/deployment" config so use 
it
+            deploymentMap = (Map<String, Object>) ((Map) akka.get( "actor" 
)).get( "deployment" );
+        }
+
+        deploymentMap.put( "/uvRouter/singleton/router", new HashMap<String, 
Object>() {{
+            put( "router", "consistent-hashing-pool" );
+            put( "cluster", new HashMap<String, Object>() {{
+                put( "enabled", "on" );
+                put( "allow-local-routees", "on" );
+                put( "use-role", "io" );
+                put( "max-nr-of-instances-per-node", numInstancesPerNode );
+                put( "failure-detector", new HashMap<String, Object>() {{
+                    put( "threshold", "10" );
+                    put( "acceptable-heartbeat-pause", "3 s" );
+                    put( "heartbeat-interval", "1 s" );
+                    put( "heartbeat-request", new HashMap<String, Object>() {{
+                        put( "expected-response-after", "3 s" );
                     }} );
                 }} );
             }} );
         }} );
 
     }
+
+
+    @Override
+    public Collection<Class> getMessageTypes() {
+        List<Class> messageTypes = new ArrayList<>();
+        messageTypes.add( UniqueValueActor.Request.class);
+        messageTypes.add( UniqueValueActor.Reservation.class);
+        messageTypes.add( UniqueValueActor.Cancellation.class);
+        messageTypes.add( UniqueValueActor.Confirmation.class);
+        return messageTypes;
+    }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/06cc50f2/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/AbstractUniqueValueTest.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/AbstractUniqueValueTest.java
 
b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/AbstractUniqueValueTest.java
index 3bfc48b..cff70ee 100644
--- 
a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/AbstractUniqueValueTest.java
+++ 
b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/AbstractUniqueValueTest.java
@@ -36,10 +36,6 @@ public class AbstractUniqueValueTest {
         if ( startedAkka.get(port) == null ) {
 
             actorSystemManager.registerRouterProducer( uniqueValuesService );
-            actorSystemManager.registerMessageType( 
UniqueValueActor.Request.class, "/user/uvProxy" );
-            actorSystemManager.registerMessageType( 
UniqueValueActor.Reservation.class, "/user/uvProxy" );
-            actorSystemManager.registerMessageType( 
UniqueValueActor.Cancellation.class, "/user/uvProxy" );
-            actorSystemManager.registerMessageType( 
UniqueValueActor.Confirmation.class, "/user/uvProxy" );
             actorSystemManager.start( "localhost", port, "us-east" );
             actorSystemManager.waitForClientActor();
 

Reply via email to