Use ClusterClient feature instead of roles to ensure that all writes are done only in the appropriate authoritative region.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/d12307bd Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/d12307bd Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/d12307bd Branch: refs/heads/release-2.1.1 Commit: d12307bdb3219ac87550147cb23cbb0e14155200 Parents: 841409f Author: Dave Johnson <[email protected]> Authored: Thu Jun 23 17:33:33 2016 -0400 Committer: Dave Johnson <[email protected]> Committed: Thu Jun 23 17:33:33 2016 -0400 ---------------------------------------------------------------------- .../corepersistence/CpEntityManagerFactory.java | 2 +- .../actorsystem/ActorSystemManager.java | 8 +- .../actorsystem/ActorSystemManagerImpl.java | 297 ++++++++++--------- .../persistence/actorsystem/RouterProducer.java | 8 +- .../src/main/resources/application.conf | 7 +- .../src/main/resources/cluster-singleton.conf | 25 -- .../uniquevalues/UniqueValueActor.java | 2 +- .../uniquevalues/UniqueValuesServiceImpl.java | 95 +++--- 8 files changed, 223 insertions(+), 221 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/d12307bd/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java index 9bd589a..060ec18 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java @@ -155,7 +155,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application actorSystemManager.registerMessageType( UniqueValueActor.Cancellation.class, "/user/uvProxy" ); 
actorSystemManager.registerMessageType( UniqueValueActor.Confirmation.class, "/user/uvProxy" ); actorSystemManager.start(); - actorSystemManager.waitForRequestActors(); + actorSystemManager.waitForClientActors(); } catch (Throwable t) { logger.error("Error starting Akka", t); http://git-wip-us.apache.org/repos/asf/usergrid/blob/d12307bd/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManager.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManager.java b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManager.java index e2c2913..c45ccac 100644 --- a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManager.java +++ b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManager.java @@ -27,7 +27,7 @@ public interface ActorSystemManager { void start(String hostname, Integer port, String currentRegion); - void waitForRequestActors(); + void waitForClientActors(); boolean isReady(); @@ -35,5 +35,9 @@ public interface ActorSystemManager { void registerMessageType( Class messageType, String routerPath ); - ActorRef getClientActor(String region ); + ActorRef getClientActor(); + + ActorRef getClusterClient(String region ); + + String getCurrentRegion(); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/d12307bd/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java 
b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java index 1f7bf70..5e23c20 100644 --- a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java +++ b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java @@ -19,15 +19,15 @@ package org.apache.usergrid.persistence.actorsystem; -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.actor.Props; +import akka.actor.*; +import akka.cluster.client.ClusterClient; +import akka.cluster.client.ClusterClientReceptionist; +import akka.cluster.client.ClusterClientSettings; import akka.pattern.Patterns; import akka.util.Timeout; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ListMultimap; import com.google.inject.Inject; -import com.google.inject.Injector; import com.google.inject.Singleton; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; @@ -45,22 +45,25 @@ import java.util.concurrent.TimeUnit; public class ActorSystemManagerImpl implements ActorSystemManager { private static final Logger logger = LoggerFactory.getLogger( ActorSystemManagerImpl.class ); + private boolean started = false; + private String hostname; private Integer port; private String currentRegion; - private static Injector injector; private final ActorSystemFig actorSystemFig; - private final Map<String, ActorRef> requestActorsByRegion; private final List<RouterProducer> routerProducers = new ArrayList<>(); private final Map<Class, String> routersByMessageType = new HashMap<>(); + private final Map<String, ActorRef> clusterClientsByRegion = new HashMap<String, ActorRef>(20); + + private ActorRef clientActor; + + private ListMultimap<String, String> seedsByRegion; @Inject - public ActorSystemManagerImpl(Injector inj, ActorSystemFig actorSystemFig) { - injector = inj; + public 
ActorSystemManagerImpl( ActorSystemFig actorSystemFig ) { this.actorSystemFig = actorSystemFig; - this.requestActorsByRegion = new HashMap<>(); } @@ -75,7 +78,7 @@ public class ActorSystemManagerImpl implements ActorSystemManager { this.port = null; initAkka(); - waitForRequestActors(); + waitForClientActors(); } @@ -95,7 +98,7 @@ public class ActorSystemManagerImpl implements ActorSystemManager { @Override public boolean isReady() { - return !getRequestActorsByRegion().isEmpty(); + return started; } @@ -112,13 +115,20 @@ public class ActorSystemManagerImpl implements ActorSystemManager { @Override - public ActorRef getClientActor(String region) { - return getRequestActorsByRegion().get( region ); + public ActorRef getClientActor() { + return clientActor; + } + + + @Override + public ActorRef getClusterClient(String region) { + return clusterClientsByRegion.get( region ); } - private Map<String, ActorRef> getRequestActorsByRegion() { - return requestActorsByRegion; + @Override + public String getCurrentRegion() { + return currentRegion; } @@ -152,215 +162,214 @@ public class ActorSystemManagerImpl implements ActorSystemManager { logger.info("Initializing Akka for hostname {} region {} regionList {} seeds {}", hostname, currentRegion, regionList, actorSystemFig.getRegionSeeds() ); - final Map<String, ActorSystem> systemMap = new HashMap<>(); - - Map<String, Config> configMap = readClusterSingletonConfigs(); + Config config = readClusterSystemConfig(); - ActorSystem localSystem = createClusterSingletonProxies( configMap, systemMap ); + ActorSystem localSystem = createClusterSystemsFromConfigs( config ); - createRequestActors( systemMap ); + createClientActors( localSystem ); for ( RouterProducer routerProducer : routerProducers ) { - routerProducer.createLocalSystemActors( localSystem, systemMap ); + routerProducer.createLocalSystemActors( localSystem ); } } /** - * Read configuration and create a Config for each region. - * - * @return Map of regions to Configs. 
+ * Read Usergrid's list of seeds, put them in handy multi-map. */ - private Map<String, Config> readClusterSingletonConfigs() { + private ListMultimap<String, String> getSeedsByRegion() { - Map<String, Config> configs = new HashMap<>(); + if ( seedsByRegion == null ) { - ListMultimap<String, String> seedsByRegion = ArrayListMultimap.create(); + seedsByRegion = ArrayListMultimap.create(); - String[] regionSeeds = actorSystemFig.getRegionSeeds().split( "," ); + String[] regionSeeds = actorSystemFig.getRegionSeeds().split( "," ); - logger.info("Found region {} seeds {}", regionSeeds.length, regionSeeds); + logger.info( "Found region {} seeds {}", regionSeeds.length, regionSeeds ); - try { + try { - if ( port != null ) { + if (port != null) { - // we are testing, create seeds-by-region map for one region, one seed + // we are testing, create seeds-by-region map for one region, one seed - String seed = "akka.tcp://ClusterSystem@" + hostname + ":" + port; - seedsByRegion.put( currentRegion, seed ); - logger.info("Akka testing, only starting one seed"); + String seed = "akka.tcp://ClusterSystem" + "@" + hostname + ":" + port; + seedsByRegion.put( currentRegion, seed ); + logger.info( "Akka testing, only starting one seed" ); - } else { // create seeds-by-region map + } else { // create seeds-by-region map - for (String regionSeed : regionSeeds) { + for (String regionSeed : regionSeeds) { - String[] parts = regionSeed.split( ":" ); - String region = parts[0]; - String hostname = parts[1]; - String regionPortString = parts[2]; + String[] parts = regionSeed.split( ":" ); + String region = parts[0]; + String hostname = parts[1]; + String regionPortString = parts[2]; - // all seeds in same region must use same port - // we assume 0th seed has the right port - final Integer regionPort; + // all seeds in same region must use same port + // we assume 0th seed has the right port + final Integer regionPort; - if (port == null) { - regionPort = Integer.parseInt( regionPortString ); 
- } else { - regionPort = port; // unless we are testing - } + if (port == null) { + regionPort = Integer.parseInt( regionPortString ); + } else { + regionPort = port; // unless we are testing + } - String seed = "akka.tcp://ClusterSystem@" + hostname + ":" + regionPort; + String seed = "akka.tcp://ClusterSystem" + "@" + hostname + ":" + regionPort; - logger.info("Adding seed {} for region {}", seed, region ); + logger.info( "Adding seed {} for region {}", seed, region ); - seedsByRegion.put( region, seed ); - } + seedsByRegion.put( region, seed ); + } - if (seedsByRegion.keySet().isEmpty()) { - throw new RuntimeException( - "No seeds listed in 'parsing collection.akka.region.seeds' property." ); + if (seedsByRegion.keySet().isEmpty()) { + throw new RuntimeException( + "No seeds listed in 'parsing collection.akka.region.seeds' property." ); + } } + + } catch (Exception e) { + throw new RuntimeException( "Error 'parsing collection.akka.region.seeds' property", e ); } + } + + return seedsByRegion; + } - int numInstancesPerNode = actorSystemFig.getInstancesPerNode(); - // read config file once for each region + /** + * Read cluster config and add seed nodes to it. + */ + private Config readClusterSystemConfig() { - for ( String region : seedsByRegion.keySet() ) { + Config config = null; - List<String> seeds = seedsByRegion.get( region ); - int lastColon = seeds.get(0).lastIndexOf(":") + 1; - final Integer regionPort = Integer.parseInt( seeds.get(0).substring( lastColon )); + try { - // cluster singletons only run role "io" nodes and NOT on "client" nodes of other regions - String clusterRole = currentRegion.equals( region ) ? 
"io" : "client"; + int numInstancesPerNode = actorSystemFig.getInstancesPerNode(); - logger.info( "Akka Config for region {} is:\n" + - " Hostname {}\n" + - " Seeds {}\n" + - " Authoritative Region {}\n", - region, hostname, seeds, actorSystemFig.getAkkaAuthoritativeRegion() ); + String region = currentRegion; - Map<String, Object> configMap = new HashMap<String, Object>() {{ + List<String> seeds = getSeedsByRegion().get( region ); + int lastColon = seeds.get(0).lastIndexOf(":") + 1; + final Integer regionPort = Integer.parseInt( seeds.get(0).substring( lastColon )); - put( "akka", new HashMap<String, Object>() {{ + logger.info( "Akka Config for region {} is:\n" + + " Hostname {}\n" + + " Seeds {}\n" + + " Authoritative Region {}\n", + region, hostname, seeds, actorSystemFig.getAkkaAuthoritativeRegion() ); - put( "remote", new HashMap<String, Object>() {{ - put( "netty.tcp", new HashMap<String, Object>() {{ - put( "hostname", hostname ); - put( "bind-hostname", hostname ); - put( "port", regionPort ); - }} ); - }} ); + Map<String, Object> configMap = new HashMap<String, Object>() {{ - put( "cluster", new HashMap<String, Object>() {{ - put( "max-nr-of-instances-per-node", 300); - put( "roles", Collections.singletonList(clusterRole) ); - put( "seed-nodes", new ArrayList<String>() {{ - for (String seed : seeds) { - add( seed ); - } - }} ); - }} ); + put( "akka", new HashMap<String, Object>() {{ + put( "remote", new HashMap<String, Object>() {{ + put( "netty.tcp", new HashMap<String, Object>() {{ + put( "hostname", hostname ); + put( "bind-hostname", hostname ); + put( "port", regionPort ); + }} ); }} ); - }}; - for ( RouterProducer routerProducer : routerProducers ) { - routerProducer.addConfiguration( configMap ); - } + put( "cluster", new HashMap<String, Object>() {{ + put( "max-nr-of-instances-per-node", numInstancesPerNode); + put( "roles", Collections.singletonList("io") ); + put( "seed-nodes", new ArrayList<String>() {{ + for (String seed : seeds) { + add( seed 
); + } + }} ); + }} ); - Config config = ConfigFactory.parseMap( configMap ) - .withFallback( ConfigFactory.parseString( "akka.cluster.roles = [io]" ) ) - .withFallback( ConfigFactory.load( "application.conf" ) ); + }} ); + }}; - configs.put( region, config ); + for ( RouterProducer routerProducer : routerProducers ) { + routerProducer.addConfiguration( configMap ); } + config = ConfigFactory.parseMap( configMap ) + .withFallback( ConfigFactory.load( "application.conf" ) ); + + } catch ( Exception e ) { - throw new RuntimeException("Error 'parsing collection.akka.region.seeds' property", e ); + throw new RuntimeException("Error reading and adding to cluster config", e ); } - return configs; + return config; } /** - * Create ActorSystem and ClusterSingletonProxy for every region. - * Create ClusterSingletonManager for the current region. - * - * @param configMap Configurations to be used to create ActorSystems - * @param systemMap Map of ActorSystems created by this method - * - * @return ActorSystem for this region. + * Create actor system for this region, with cluster singleton manager & proxy. 
*/ - private ActorSystem createClusterSingletonProxies( - Map<String, Config> configMap, Map<String, ActorSystem> systemMap ) { - - ActorSystem localSystem = null; + private ActorSystem createClusterSystemsFromConfigs( Config config ) { - for ( String region : configMap.keySet() ) { - Config config = configMap.get( region ); + ActorSystem system = ActorSystem.create( "ClusterSystem", config ); - ActorSystem system = ActorSystem.create( "ClusterSystem", config ); - systemMap.put( region, system ); - - // cluster singletons only run role "io" nodes and NOT on "client" nodes of other regions - if ( currentRegion.equals( region ) ) { - - localSystem = system; - - for ( RouterProducer routerProducer : routerProducers ) { - routerProducer.createClusterSingletonManager( system ); - } - } + for ( RouterProducer routerProducer : routerProducers ) { + logger.info("Creating {} for region {}", routerProducer.getName(), currentRegion ); + routerProducer.createClusterSingletonManager( system ); + } - for ( RouterProducer routerProducer : routerProducers ) { - routerProducer.createClusterSingletonProxy( system ); - } + for ( RouterProducer routerProducer : routerProducers ) { + logger.info("Creating {} proxy for region {} role 'io'", routerProducer.getName(), currentRegion); + routerProducer.createClusterSingletonProxy( system, "io" ); } - return localSystem; + return system; } /** * Create RequestActor for each region. - * - * @param systemMap Map of regions to ActorSystems. 
*/ - private void createRequestActors( Map<String, ActorSystem> systemMap ) { + private void createClientActors( ActorSystem system ) { + + for ( String region : getSeedsByRegion().keySet() ) { + + if ( currentRegion.equals( region )) { + + logger.info( "Creating clientActor for region {}", region ); + + // Each clientActor needs to know path to ClusterSingletonProxy and region + clientActor = system.actorOf( + Props.create( ClientActor.class, routersByMessageType ), "clientActor" ); - for ( String region : systemMap.keySet() ) { + ClusterClientReceptionist.get(system).registerService( clientActor ); - logger.info("Creating request actor for region {}", region); + } else { - // Each RequestActor needs to know path to ClusterSingletonProxy and region - ActorRef requestActor = systemMap.get( region ).actorOf( - //Props.create( ClientActor.class, "/user/uvProxy" ), "requestActor" ); - Props.create( ClientActor.class, routersByMessageType ), "requestActor" ); + List<String> regionSeeds = getSeedsByRegion().get( region ); + Set<ActorPath> seedPaths = new HashSet<>(20); + for ( String seed : getSeedsByRegion().get( region ) ) { + seedPaths.add( ActorPaths.fromString( seed + "/system/receptionist") ); + } + + ActorRef clusterClient = system.actorOf( ClusterClient.props( + ClusterClientSettings.create(system).withInitialContacts( seedPaths )), "client"); + + clusterClientsByRegion.put( region, clusterClient ); + } - requestActorsByRegion.put( region, requestActor ); } } @Override - public void waitForRequestActors() { + public void waitForClientActors() { - for ( String region : requestActorsByRegion.keySet() ) { - ActorRef ra = requestActorsByRegion.get( region ); - waitForRequestActor( ra ); - } + waitForClientActor( clientActor ); } - - private void waitForRequestActor( ActorRef ra ) { + private void waitForClientActor( ActorRef ra ) { logger.info( "Waiting on request actor {}...", ra.path() ); - boolean started = false; + started = false; + int retries = 0; int 
maxRetries = 60; while (retries < maxRetries) { http://git-wip-us.apache.org/repos/asf/usergrid/blob/d12307bd/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/RouterProducer.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/RouterProducer.java b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/RouterProducer.java index 3aa91cf..d849dd9 100644 --- a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/RouterProducer.java +++ b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/RouterProducer.java @@ -24,6 +24,8 @@ import java.util.Map; public interface RouterProducer { + String getName(); + /** * Create cluster single manager for current region. * Will be called once per router per JVM. @@ -34,16 +36,16 @@ public interface RouterProducer { * Create cluster singleton proxy for region. * Will be called once per router per JVM per region. */ - void createClusterSingletonProxy( ActorSystem system ); + void createClusterSingletonProxy( ActorSystem system, String role ); /** * Create other actors needed to support the router produced by the implementation. 
*/ - void createLocalSystemActors( ActorSystem localSystem, Map<String, ActorSystem> systemMap ); + void createLocalSystemActors( ActorSystem localSystem ); /** * Add configuration for the router to configuration map */ - void addConfiguration( Map<String, Object> configMap ); + void addConfiguration(Map<String, Object> configMap ); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/d12307bd/stack/corepersistence/actorsystem/src/main/resources/application.conf ---------------------------------------------------------------------- diff --git a/stack/corepersistence/actorsystem/src/main/resources/application.conf b/stack/corepersistence/actorsystem/src/main/resources/application.conf index a243163..5706610 100644 --- a/stack/corepersistence/actorsystem/src/main/resources/application.conf +++ b/stack/corepersistence/actorsystem/src/main/resources/application.conf @@ -38,7 +38,12 @@ akka { akka.cluster.metrics.enabled=off # Enable metrics extension in akka-cluster-metrics. -akka.extensions=["akka.cluster.metrics.ClusterMetricsExtension", "akka.cluster.pubsub.DistributedPubSub"] +akka.extensions=[ + "akka.cluster.metrics.ClusterMetricsExtension", + "akka.cluster.pubsub.DistributedPubSub", + "akka.cluster.client.ClusterClientReceptionist" +] + # Sigar native library extract location during tests. # Note: use per-jvm-instance folder when running multiple jvm on one host. 
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d12307bd/stack/corepersistence/actorsystem/src/main/resources/cluster-singleton.conf ---------------------------------------------------------------------- diff --git a/stack/corepersistence/actorsystem/src/main/resources/cluster-singleton.conf b/stack/corepersistence/actorsystem/src/main/resources/cluster-singleton.conf deleted file mode 100644 index 907aebb..0000000 --- a/stack/corepersistence/actorsystem/src/main/resources/cluster-singleton.conf +++ /dev/null @@ -1,25 +0,0 @@ -include "application" - -akka.actor.deployment { - /uvRouter/singleton/router { - router = consistent-hashing-pool - cluster { - enabled = on - allow-local-routees = on - - # singleton will only run on nodes with role "io" - use-role = io - - # more forgiving failure detector - failure-detector { - threshold = 10 - acceptable-heartbeat-pause = 3 s - heartbeat-interval = 1 s - heartbeat-request { - expected-response-after = 3 s - } - } - - } - } -} http://git-wip-us.apache.org/repos/asf/usergrid/blob/d12307bd/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java index bb30b92..e53710c 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java @@ -59,7 +59,7 @@ public class UniqueValueActor extends UntypedActor { count++; if (count % 10 == 0) { - logger.debug( "UniqueValueActor {} processed {} requests", name, count ); + logger.info( 
"UniqueValueActor {} processed {} requests", name, count ); } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/d12307bd/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java index 119d6f6..b888b1f 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java @@ -22,6 +22,7 @@ import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.PoisonPill; import akka.actor.Props; +import akka.cluster.client.ClusterClient; import akka.cluster.singleton.ClusterSingletonManager; import akka.cluster.singleton.ClusterSingletonManagerSettings; import akka.cluster.singleton.ClusterSingletonProxy; @@ -75,18 +76,25 @@ public class UniqueValuesServiceImpl implements UniqueValuesService { } - private void subscribeToReservations( ActorSystem localSystem, Map<String, ActorSystem> systemMap ) { - - for ( String region : systemMap.keySet() ) { - ActorSystem actorSystem = systemMap.get( region ); - if ( !actorSystem.equals( localSystem ) ) { - logger.info("Starting ReservationCacheUpdater for {}", region ); - actorSystem.actorOf( Props.create( ReservationCacheActor.class ), "subscriber"); - } - } + @Override + public String getName() { + return "UniqueValues ClusterSingleton Router"; } + // TODO: restore reservation cache +// private void subscribeToReservations( ActorSystem localSystem, Map<String, ActorSystem> systemMap ) { +// 
+// for ( String region : systemMap.keySet() ) { +// ActorSystem actorSystem = systemMap.get( region ); +// if ( !actorSystem.equals( localSystem ) ) { +// logger.info("Starting ReservationCacheUpdater for {}", region ); +// actorSystem.actorOf( Props.create( ReservationCacheActor.class ), "subscriber"); +// } +// } +// } + + @Override public void reserveUniqueValues( ApplicationScope scope, Entity entity, UUID version, String region ) throws UniqueValueException { @@ -154,12 +162,6 @@ public class UniqueValuesServiceImpl implements UniqueValuesService { private void reserveUniqueField( ApplicationScope scope, Entity entity, UUID version, Field field, String region ) throws UniqueValueException { - final ActorRef requestActor = actorSystemManager.getClientActor( region ); - - if ( requestActor == null ) { - throw new RuntimeException( "No request actor for region " + region); - } - UniqueValueActor.Request request = new UniqueValueActor.Reservation( scope, entity.getId(), version, field ); @@ -171,39 +173,35 @@ public class UniqueValuesServiceImpl implements UniqueValuesService { throw new UniqueValueException( "Error property not unique (cache)", field); } - sendUniqueValueRequest( entity, requestActor, request ); + sendUniqueValueRequest( entity, region, request ); } private void confirmUniqueField( ApplicationScope scope, Entity entity, UUID version, Field field, String region) throws UniqueValueException { - final ActorRef requestActor = actorSystemManager.getClientActor( region ); - - if ( requestActor == null ) { - throw new RuntimeException( "No request actor for type, cannot verify unique fields!" 
); - } - UniqueValueActor.Confirmation request = new UniqueValueActor.Confirmation( scope, entity.getId(), version, field ); - sendUniqueValueRequest( entity, requestActor, request ); + sendUniqueValueRequest( entity, region, request ); } private void cancelUniqueField( ApplicationScope scope, Entity entity, UUID version, Field field, String region ) throws UniqueValueException { - final ActorRef requestActor = actorSystemManager.getClientActor( region ); - - if ( requestActor == null ) { - throw new RuntimeException( "No request actor for type, cannot verify unique fields!" ); - } - UniqueValueActor.Cancellation request = new UniqueValueActor.Cancellation( scope, entity.getId(), version, field ); - requestActor.tell( request, null ); + if ( actorSystemManager.getCurrentRegion().equals( region ) ) { + ActorRef clientActor = actorSystemManager.getClientActor(); + clientActor.tell( request, null ); + + } else { + ActorRef clusterClient = actorSystemManager.getClusterClient( region ); + clusterClient.tell( new ClusterClient.Send("/user/clientActor", request), null ); + } + } @@ -218,7 +216,7 @@ public class UniqueValuesServiceImpl implements UniqueValuesService { private void sendUniqueValueRequest( - Entity entity, ActorRef requestActor, UniqueValueActor.Request request ) throws UniqueValueException { + Entity entity, String region, UniqueValueActor.Request request ) throws UniqueValueException { int maxRetries = 5; int retries = 0; @@ -230,7 +228,17 @@ public class UniqueValuesServiceImpl implements UniqueValuesService { // ask RequestActor and wait (up to timeout) for response - Future<Object> fut = Patterns.ask( requestActor, request, t ); + Future<Object> fut; + + if ( actorSystemManager.getCurrentRegion().equals( region ) ) { + ActorRef clientActor = actorSystemManager.getClientActor(); + fut = Patterns.ask( clientActor, request, t ); + + } else { + ActorRef clusterClient = actorSystemManager.getClusterClient( region ); + fut = Patterns.ask( clusterClient, new 
ClusterClient.Send("/user/clientActor", request), t ); + } + response = (UniqueValueActor.Response) Await.result( fut, t.duration() ); if ( response != null && ( @@ -280,34 +288,33 @@ public class UniqueValuesServiceImpl implements UniqueValuesService { ClusterSingletonManagerSettings.create( system ).withRole("io"); system.actorOf( ClusterSingletonManager.props( - //Props.create( ClusterSingletonRouter.class, table ), - Props.create( GuiceActorProducer.class, injector, UniqueValuesRouter.class), - PoisonPill.getInstance(), settings ), "uvRouter"); + Props.create( GuiceActorProducer.class, injector, UniqueValuesRouter.class ), + PoisonPill.getInstance(), settings ), "uvRouter" ); + } @Override - public void createClusterSingletonProxy(ActorSystem system) { + public void createClusterSingletonProxy( ActorSystem system, String role ) { ClusterSingletonProxySettings proxySettings = - ClusterSingletonProxySettings.create( system ).withRole("io"); + ClusterSingletonProxySettings.create( system ).withRole( role ); system.actorOf( ClusterSingletonProxy.props( "/user/uvRouter", proxySettings ), "uvProxy" ); } @Override - public void createLocalSystemActors( ActorSystem localSystem, Map<String, ActorSystem> systemMap ) { - subscribeToReservations( localSystem, systemMap ); + public void createLocalSystemActors( ActorSystem localSystem ) { + // TODO: restore reservation cache + //subscribeToReservations( localSystem ); } @Override - public void addConfiguration(Map<String, Object> configMap) { + public void addConfiguration( Map<String, Object> configMap ) { int numInstancesPerNode = uniqueValuesFig.getUniqueValueInstancesPerNode(); - // TODO: will the below overwrite things other routers have added under "actor.deployment"? 
- Map<String, Object> akka = (Map<String, Object>)configMap.get("akka"); akka.put( "actor", new HashMap<String, Object>() {{ @@ -317,10 +324,10 @@ public class UniqueValuesServiceImpl implements UniqueValuesService { put( "cluster", new HashMap<String, Object>() {{ put( "enabled", "on" ); put( "allow-local-routees", "on" ); - put( "user-role", "io" ); + put( "use-role", "io" ); put( "max-nr-of-instances-per-node", numInstancesPerNode ); put( "failure-detector", new HashMap<String, Object>() {{ - put( "threshold", "" ); + put( "threshold", "10" ); put( "acceptable-heartbeat-pause", "3 s" ); put( "heartbeat-interval", "1 s" ); put( "heartbeat-request", new HashMap<String, Object>() {{
