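Require each RouterProducer to provide the list of message types that it handles, along with its router path, so that ActorSystemManager can register the message-type routing itself instead of relying on callers to invoke registerMessageType().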
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/06cc50f2 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/06cc50f2 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/06cc50f2 Branch: refs/heads/master Commit: 06cc50f29196600fbecacbd20b615ad0cb8f9f02 Parents: 5dc3324 Author: Dave Johnson <[email protected]> Authored: Mon Jul 25 15:37:29 2016 -0400 Committer: Dave Johnson <[email protected]> Committed: Mon Jul 25 15:37:29 2016 -0400 ---------------------------------------------------------------------- .../corepersistence/CpEntityManagerFactory.java | 4 - .../corepersistence/index/IndexServiceTest.java | 4 - .../actorsystem/ActorSystemManager.java | 9 +-- .../actorsystem/ActorSystemManagerImpl.java | 14 ++-- .../persistence/actorsystem/RouterProducer.java | 8 ++ .../actorsystem/ActorServiceServiceTest.java | 4 - .../uniquevalues/UniqueValuesServiceImpl.java | 85 ++++++++++++++------ .../collection/AbstractUniqueValueTest.java | 4 - 8 files changed, 76 insertions(+), 56 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/06cc50f2/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java index a419e58..8055740 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java @@ -150,10 +150,6 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application this.actorSystemManager = injector.getInstance( ActorSystemManager.class ); 
actorSystemManager.registerRouterProducer( uniqueValuesService ); - actorSystemManager.registerMessageType( UniqueValueActor.Request.class, "/user/uvProxy" ); - actorSystemManager.registerMessageType( UniqueValueActor.Reservation.class, "/user/uvProxy" ); - actorSystemManager.registerMessageType( UniqueValueActor.Cancellation.class, "/user/uvProxy" ); - actorSystemManager.registerMessageType( UniqueValueActor.Confirmation.class, "/user/uvProxy" ); actorSystemManager.start(); actorSystemManager.waitForClientActor(); http://git-wip-us.apache.org/repos/asf/usergrid/blob/06cc50f2/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexServiceTest.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexServiceTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexServiceTest.java index adecd9d..ecc2b46 100644 --- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexServiceTest.java +++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexServiceTest.java @@ -106,10 +106,6 @@ public class IndexServiceTest { if ( startedAkka.get(port) == null ) { actorSystemManager.registerRouterProducer( uniqueValuesService ); - actorSystemManager.registerMessageType( UniqueValueActor.Request.class, "/user/uvProxy" ); - actorSystemManager.registerMessageType( UniqueValueActor.Reservation.class, "/user/uvProxy" ); - actorSystemManager.registerMessageType( UniqueValueActor.Cancellation.class, "/user/uvProxy" ); - actorSystemManager.registerMessageType( UniqueValueActor.Confirmation.class, "/user/uvProxy" ); actorSystemManager.start( "localhost", port, "us-east" ); actorSystemManager.waitForClientActor(); http://git-wip-us.apache.org/repos/asf/usergrid/blob/06cc50f2/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManager.java 
---------------------------------------------------------------------- diff --git a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManager.java b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManager.java index c7322dd..17754f0 100644 --- a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManager.java +++ b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManager.java @@ -51,13 +51,6 @@ public interface ActorSystemManager { void registerRouterProducer( RouterProducer routerProducer ); /** - * MUST be called before start() to register any messages to be sent. - * @param messageType Class of message. - * @param routerPath Router-path to which such messages are to be sent. - */ - void registerMessageType( Class messageType, String routerPath ); - - /** * Local client for ActorSystem, send all local messages here for routing. */ ActorRef getClientActor(); @@ -75,7 +68,7 @@ public interface ActorSystemManager { /** * Get all regions known to system. */ - public Set<String> getRegions(); + Set<String> getRegions(); /** * Publish message to all topic subscribers in all regions. 
http://git-wip-us.apache.org/repos/asf/usergrid/blob/06cc50f2/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java index d8d284f..bef9335 100644 --- a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java +++ b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java @@ -69,7 +69,6 @@ public class ActorSystemManagerImpl implements ActorSystemManager { private ActorSystem clusterSystem = null; - @Inject public ActorSystemManagerImpl( ActorSystemFig actorSystemFig ) { this.actorSystemFig = actorSystemFig; @@ -131,9 +130,7 @@ public class ActorSystemManagerImpl implements ActorSystemManager { } - @Override public void registerMessageType(Class messageType, String routerPath) { - routersByMessageType.put( messageType, routerPath ); } @@ -198,7 +195,14 @@ public class ActorSystemManagerImpl implements ActorSystemManager { createClientActors( clusterSystem ); for ( RouterProducer routerProducer : routerProducers ) { + routerProducer.createLocalSystemActors( clusterSystem ); + + Iterator<Class> messageTypes = routerProducer.getMessageTypes().iterator(); + while ( messageTypes.hasNext() ) { + Class messageType = messageTypes.next(); + routersByMessageType.put( messageType, routerProducer.getRouterPath() ); + } } mediator = DistributedPubSub.get( clusterSystem ).mediator(); @@ -337,12 +341,10 @@ public class ActorSystemManagerImpl implements ActorSystemManager { */ private ActorSystem createClusterSystemsFromConfigs( Config config ) { - // there is only 1 akka 
system for a Usergrid cluster final String clusterName = "ClusterSystem"; - - if( clusterSystem == null) { + if ( clusterSystem == null) { logger.info("Class: {}. ActorSystem [{}] not initialized, creating...", this, clusterName); http://git-wip-us.apache.org/repos/asf/usergrid/blob/06cc50f2/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/RouterProducer.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/RouterProducer.java b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/RouterProducer.java index d849dd9..9c3ce3d 100644 --- a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/RouterProducer.java +++ b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/RouterProducer.java @@ -19,6 +19,8 @@ package org.apache.usergrid.persistence.actorsystem; import akka.actor.ActorSystem; + +import java.util.Collection; import java.util.Map; @@ -26,6 +28,8 @@ public interface RouterProducer { String getName(); + String getRouterPath(); + /** * Create cluster single manager for current region. * Will be called once per router per JVM. @@ -48,4 +52,8 @@ public interface RouterProducer { */ void addConfiguration(Map<String, Object> configMap ); + /** + * Get all message types that should be sent to this router. 
+ */ + Collection<Class> getMessageTypes(); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/06cc50f2/stack/corepersistence/actorsystem/src/test/java/org/apache/usergrid/persistence/actorsystem/ActorServiceServiceTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/actorsystem/src/test/java/org/apache/usergrid/persistence/actorsystem/ActorServiceServiceTest.java b/stack/corepersistence/actorsystem/src/test/java/org/apache/usergrid/persistence/actorsystem/ActorServiceServiceTest.java index 7ac7b12..f1a3197 100644 --- a/stack/corepersistence/actorsystem/src/test/java/org/apache/usergrid/persistence/actorsystem/ActorServiceServiceTest.java +++ b/stack/corepersistence/actorsystem/src/test/java/org/apache/usergrid/persistence/actorsystem/ActorServiceServiceTest.java @@ -57,10 +57,6 @@ public class ActorServiceServiceTest { RouterProducer routerProducer = Mockito.mock( RouterProducer.class ); actorSystemManager.registerRouterProducer( routerProducer ); - actorSystemManager.registerMessageType( String.class, "/users/path" ); - actorSystemManager.registerMessageType( Integer.class, "/users/path" ); - actorSystemManager.registerMessageType( Long.class, "/users/path" ); - actorSystemManager.start( "localhost", 2770, "us-east" ); actorSystemManager.waitForClientActor(); http://git-wip-us.apache.org/repos/asf/usergrid/blob/06cc50f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java index 50114be..0edc9ff 100644 --- 
a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java @@ -35,7 +35,6 @@ import com.google.inject.Singleton; import org.apache.commons.lang3.StringUtils; import org.apache.usergrid.persistence.actorsystem.ActorSystemManager; import org.apache.usergrid.persistence.actorsystem.GuiceActorProducer; -import org.apache.usergrid.persistence.collection.serialization.UniqueValue; import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.model.entity.Entity; import org.apache.usergrid.persistence.model.entity.Id; @@ -45,10 +44,7 @@ import org.slf4j.LoggerFactory; import scala.concurrent.Await; import scala.concurrent.Future; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; -import java.util.UUID; +import java.util.*; import java.util.concurrent.TimeUnit; @@ -86,6 +82,12 @@ public class UniqueValuesServiceImpl implements UniqueValuesService { } + @Override + public String getRouterPath() { + return "/user/uvProxy"; + } + + private void subscribeToReservations( ActorSystem localSystem ) { logger.info("Starting ReservationCacheUpdater"); localSystem.actorOf( Props.create( ReservationCacheActor.class ), "subscriber"); @@ -337,36 +339,67 @@ public class UniqueValuesServiceImpl implements UniqueValuesService { subscribeToReservations( localSystem ); } + @Override public void addConfiguration( Map<String, Object> configMap ) { int numInstancesPerNode = uniqueValuesFig.getUniqueValueInstancesPerNode(); - Map<String, Object> akka = (Map<String, Object>)configMap.get("akka"); - - // TODO: replace this configuration stuff with equivalent Java code in the above "create" methods - - akka.put( "actor", new HashMap<String, Object>() {{ - put( "deployment", new HashMap<String, Object>() {{ - put( 
"/uvRouter/singleton/router", new HashMap<String, Object>() {{ - put( "router", "consistent-hashing-pool" ); - put( "cluster", new HashMap<String, Object>() {{ - put( "enabled", "on" ); - put( "allow-local-routees", "on" ); - put( "use-role", "io" ); - put( "max-nr-of-instances-per-node", numInstancesPerNode ); - put( "failure-detector", new HashMap<String, Object>() {{ - put( "threshold", "10" ); - put( "acceptable-heartbeat-pause", "3 s" ); - put( "heartbeat-interval", "1 s" ); - put( "heartbeat-request", new HashMap<String, Object>() {{ - put( "expected-response-after", "3 s" ); - }} ); - }} ); + // TODO: replace this configuration stuff with equivalent Java code in the above "create" methods? + + // be careful not to overwrite configurations that other router producers may have added + + Map<String, Object> akka = (Map<String, Object>) configMap.get( "akka" ); + final Map<String, Object> deploymentMap; + + if ( akka.get( "actor" ) == null ) { + + // nobody has created anything under "actor" yet, so create it now + deploymentMap = new HashMap<>(); + akka.put( "actor", new HashMap<String, Object>() {{ + put( "deployment", deploymentMap ); + }} ); + + } else if (((Map) akka.get( "actor" )).get( "deployment" ) == null) { + + // nobody has created anything under "actor/deployment" yet, so create it now + deploymentMap = new HashMap<>(); + ((Map) akka.get( "actor" )).put( "deployment", deploymentMap ); + + } else { + + // somebody else already created "actor/deployment" config so use it + deploymentMap = (Map<String, Object>) ((Map) akka.get( "actor" )).get( "deployment" ); + } + + deploymentMap.put( "/uvRouter/singleton/router", new HashMap<String, Object>() {{ + put( "router", "consistent-hashing-pool" ); + put( "cluster", new HashMap<String, Object>() {{ + put( "enabled", "on" ); + put( "allow-local-routees", "on" ); + put( "use-role", "io" ); + put( "max-nr-of-instances-per-node", numInstancesPerNode ); + put( "failure-detector", new HashMap<String, Object>() {{ 
+ put( "threshold", "10" ); + put( "acceptable-heartbeat-pause", "3 s" ); + put( "heartbeat-interval", "1 s" ); + put( "heartbeat-request", new HashMap<String, Object>() {{ + put( "expected-response-after", "3 s" ); }} ); }} ); }} ); }} ); } + + + @Override + public Collection<Class> getMessageTypes() { + List<Class> messageTypes = new ArrayList<>(); + messageTypes.add( UniqueValueActor.Request.class); + messageTypes.add( UniqueValueActor.Reservation.class); + messageTypes.add( UniqueValueActor.Cancellation.class); + messageTypes.add( UniqueValueActor.Confirmation.class); + return messageTypes; + } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/06cc50f2/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/AbstractUniqueValueTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/AbstractUniqueValueTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/AbstractUniqueValueTest.java index 3bfc48b..cff70ee 100644 --- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/AbstractUniqueValueTest.java +++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/AbstractUniqueValueTest.java @@ -36,10 +36,6 @@ public class AbstractUniqueValueTest { if ( startedAkka.get(port) == null ) { actorSystemManager.registerRouterProducer( uniqueValuesService ); - actorSystemManager.registerMessageType( UniqueValueActor.Request.class, "/user/uvProxy" ); - actorSystemManager.registerMessageType( UniqueValueActor.Reservation.class, "/user/uvProxy" ); - actorSystemManager.registerMessageType( UniqueValueActor.Cancellation.class, "/user/uvProxy" ); - actorSystemManager.registerMessageType( UniqueValueActor.Confirmation.class, "/user/uvProxy" ); actorSystemManager.start( "localhost", port, "us-east" ); 
actorSystemManager.waitForClientActor();
