Simplify RouterProducer interface, make it more generic (it is not just for cluster singleton routers).
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/00aeed8d Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/00aeed8d Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/00aeed8d Branch: refs/heads/release-2.1.1 Commit: 00aeed8d9b7ed2c59a8052722b682b35eff0b2ff Parents: 06cc50f Author: Dave Johnson <[email protected]> Authored: Tue Jul 26 11:18:20 2016 -0400 Committer: Dave Johnson <[email protected]> Committed: Tue Jul 26 11:18:20 2016 -0400 ---------------------------------------------------------------------- .../actorsystem/ActorSystemManagerImpl.java | 40 +++++++------------- .../persistence/actorsystem/RouterProducer.java | 30 ++++++--------- .../actorsystem/ActorServiceServiceTest.java | 4 +- .../uniquevalues/UniqueValuesServiceImpl.java | 40 +------------------- 4 files changed, 28 insertions(+), 86 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/00aeed8d/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java index bef9335..8dcb550 100644 --- a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java +++ b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java @@ -130,10 +130,6 @@ public class ActorSystemManagerImpl implements ActorSystemManager { } - public void registerMessageType(Class messageType, String routerPath) { - } - - @Override public ActorRef getClientActor() { return clientActor; @@ -188,23 +184,12 @@ public class ActorSystemManagerImpl implements ActorSystemManager { logger.info("Initializing Akka for hostname {} region {} regionList {} seeds {}", hostname, currentRegion, regionList, actorSystemFig.getSeeds() ); - Config config = readClusterSystemConfig(); + Config config = createConfiguration(); - clusterSystem = createClusterSystemsFromConfigs( config ); + clusterSystem = createClusterSystem( config ); createClientActors( clusterSystem ); - for ( RouterProducer routerProducer : routerProducers ) { - - routerProducer.createLocalSystemActors( clusterSystem ); - - Iterator<Class> messageTypes = routerProducer.getMessageTypes().iterator(); - while ( messageTypes.hasNext() ) { - Class messageType = messageTypes.next(); - routersByMessageType.put( messageType, routerProducer.getRouterPath() ); - } - } - mediator = DistributedPubSub.get( clusterSystem ).mediator(); } @@ -277,7 +262,7 @@ public class ActorSystemManagerImpl implements ActorSystemManager { /** * Read cluster config and add seed nodes to it. */ - private Config readClusterSystemConfig() { + private Config createConfiguration() { Config config = null; @@ -337,9 +322,9 @@ public class ActorSystemManagerImpl implements ActorSystemManager { /** - * Create actor system for this region, with cluster singleton manager & proxy. + * Create cluster system for this the current region */ - private ActorSystem createClusterSystemsFromConfigs( Config config ) { + private ActorSystem createClusterSystem( Config config ) { // there is only 1 akka system for a Usergrid cluster final String clusterName = "ClusterSystem"; @@ -351,16 +336,19 @@ public class ActorSystemManagerImpl implements ActorSystemManager { clusterSystem = ActorSystem.create( clusterName, config ); for ( RouterProducer routerProducer : routerProducers ) { - logger.info("Creating router producer [{}] for region [{}]", routerProducer.getName(), currentRegion ); - routerProducer.createClusterSingletonManager( clusterSystem ); + logger.info("Creating router [{}] for region [{}]", routerProducer.getRouterPath(), currentRegion ); + routerProducer.produceRouter( clusterSystem, "io" ); } for ( RouterProducer routerProducer : routerProducers ) { - logger.info("Creating [{}] proxy for region [{}] role 'io'", routerProducer.getName(), currentRegion); - routerProducer.createClusterSingletonProxy( clusterSystem, "io" ); + Iterator<Class> messageTypes = routerProducer.getMessageTypes().iterator(); + while ( messageTypes.hasNext() ) { + Class messageType = messageTypes.next(); + routersByMessageType.put( messageType, routerProducer.getRouterPath() ); + } } - //add a shutdown hook to clean all actor systems if the JVM exits without the servlet container knowing + // add a shutdown hook to clean all actor systems if the JVM exits without the servlet container knowing Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { @@ -454,8 +442,6 @@ public class ActorSystemManagerImpl implements ActorSystemManager { logger.info("Shutting down Akka cluster: {}", clusterSystem.name()); clusterSystem.shutdown(); - - } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/00aeed8d/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/RouterProducer.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/RouterProducer.java b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/RouterProducer.java index 9c3ce3d..5c14c6b 100644 --- a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/RouterProducer.java +++ b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/RouterProducer.java @@ -24,36 +24,30 @@ import java.util.Collection; import java.util.Map; +/** + * Interface used by ActorSystemManager to configure and create an Akka router. + */ public interface RouterProducer { - String getName(); - - String getRouterPath(); - - /** - * Create cluster single manager for current region. - * Will be called once per router per JVM. - */ - void createClusterSingletonManager( ActorSystem system ); - /** - * Create cluster singleton proxy for region. - * Will be called once per router per JVM per region. + * Path to be used to send messages to this router. */ - void createClusterSingletonProxy( ActorSystem system, String role ); + String getRouterPath(); /** - * Create other actors needed to support the router produced by the implementation. + * Returns all message types that should be sent to this router for routing. */ - void createLocalSystemActors( ActorSystem localSystem ); + Collection<Class> getMessageTypes(); /** - * Add configuration for the router to configuration map + * Add configuration for the router to existing ActorSystem configuration. + * Called before ActorSystem is created. */ void addConfiguration(Map<String, Object> configMap ); /** - * Get all message types that should be sent to this router. + * Produce router and any supporting objects. + * Called after ActorSystem is created. */ - Collection<Class> getMessageTypes(); + void produceRouter( ActorSystem system, String role ); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/00aeed8d/stack/corepersistence/actorsystem/src/test/java/org/apache/usergrid/persistence/actorsystem/ActorServiceServiceTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/actorsystem/src/test/java/org/apache/usergrid/persistence/actorsystem/ActorServiceServiceTest.java b/stack/corepersistence/actorsystem/src/test/java/org/apache/usergrid/persistence/actorsystem/ActorServiceServiceTest.java index f1a3197..c20b9a1 100644 --- a/stack/corepersistence/actorsystem/src/test/java/org/apache/usergrid/persistence/actorsystem/ActorServiceServiceTest.java +++ b/stack/corepersistence/actorsystem/src/test/java/org/apache/usergrid/persistence/actorsystem/ActorServiceServiceTest.java @@ -60,10 +60,8 @@ public class ActorServiceServiceTest { actorSystemManager.start( "localhost", 2770, "us-east" ); actorSystemManager.waitForClientActor(); - verify( routerProducer ).createClusterSingletonManager( any() ); - verify( routerProducer ).createClusterSingletonProxy( any(), eq("io") ); - verify( routerProducer ).createLocalSystemActors( any() ); verify( routerProducer ).addConfiguration( any() ); + verify( routerProducer ).produceRouter( any(), eq("io") ); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/00aeed8d/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java index 0edc9ff..777029f 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java @@ -77,12 +77,6 @@ public class UniqueValuesServiceImpl implements UniqueValuesService { @Override - public String getName() { - return "UniqueValues ClusterSingleton Router"; - } - - - @Override public String getRouterPath() { return "/user/uvProxy"; } @@ -153,25 +147,6 @@ public class UniqueValuesServiceImpl implements UniqueValuesService { } - // TODO: do we need this or can we rely on UniqueCleanup + Cassandra replication? - -// @Override -// public void releaseUniqueValues(ApplicationScope scope, Id entityId, UUID version, String region) -// throws UniqueValueException { -// -// ready(); -// -// TODO: need to replicate logic from UniqueCleanup and make sure it happens in Authoritative Region -// -// Iterator<UniqueValue> iterator = table.getUniqueValues( scope, entityId ); -// -// while ( iterator.hasNext() ) { -// UniqueValue uniqueValue = iterator.next(); -// cancelUniqueField( scope, entityId, uniqueValue.getEntityVersion(), uniqueValue.getField(), region ); -// } -// } - - private void reserveUniqueField( ApplicationScope scope, Entity entity, UUID version, Field field, String region ) throws UniqueValueException { @@ -311,9 +286,8 @@ public class UniqueValuesServiceImpl implements UniqueValuesService { @Override - public void createClusterSingletonManager(ActorSystem system) { + public void produceRouter( ActorSystem system, String role ) { - // create cluster singleton supervisor for actor system ClusterSingletonManagerSettings settings = ClusterSingletonManagerSettings.create( system ).withRole("io"); @@ -321,22 +295,12 @@ public class UniqueValuesServiceImpl implements UniqueValuesService { Props.create( GuiceActorProducer.class, injector, UniqueValuesRouter.class ), PoisonPill.getInstance(), settings ), "uvRouter" ); - } - - - @Override - public void createClusterSingletonProxy( ActorSystem system, String role ) { - ClusterSingletonProxySettings proxySettings = ClusterSingletonProxySettings.create( system ).withRole( role ); system.actorOf( ClusterSingletonProxy.props( "/user/uvRouter", proxySettings ), "uvProxy" ); - } - - @Override - public void createLocalSystemActors( ActorSystem localSystem ) { - subscribeToReservations( localSystem ); + subscribeToReservations( system ); }
