Move cluster-singleton router config into RouterProducer and out of actorsystem module.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/fb1d78d0 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/fb1d78d0 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/fb1d78d0 Branch: refs/heads/release-2.1.1 Commit: fb1d78d0469c33067ba415cafa44540c5185dd61 Parents: 343ac51 Author: Dave Johnson <[email protected]> Authored: Wed Jun 22 10:33:30 2016 -0400 Committer: Dave Johnson <[email protected]> Committed: Wed Jun 22 10:33:30 2016 -0400 ---------------------------------------------------------------------- .../persistence/actorsystem/ActorSystemFig.java | 29 ++------- .../actorsystem/ActorSystemManagerImpl.java | 38 +++++------ .../persistence/actorsystem/RouterProducer.java | 6 +- .../src/main/resources/application.conf | 25 ++++++-- .../collection/guice/CollectionModule.java | 1 + .../uniquevalues/UniqueValuesFig.java | 67 ++++++++++++++++++++ .../uniquevalues/UniqueValuesServiceImpl.java | 42 ++++++++++-- .../uniquevalues/UniqueValuesTableImpl.java | 8 +-- .../src/test/resources/usergrid.properties | 2 +- 9 files changed, 160 insertions(+), 58 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/fb1d78d0/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemFig.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemFig.java b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemFig.java index 50e860b..ec010d0 100644 --- a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemFig.java +++ b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemFig.java @@ -26,6 +26,7 @@ import org.safehaus.guicyfig.Key; import java.io.Serializable; + @FigSingleton public interface ActorSystemFig extends GuicyFig, Serializable { @@ -39,13 +40,10 @@ public interface ActorSystemFig extends GuicyFig, Serializable { String AKKA_REGION_SEEDS = "collection.akka.region.seeds"; - String AKKA_UNIQUEVALUE_ACTORS = "collection.akka.uniquevalue.actors"; - - String AKKA_UNIQUEVALUE_CACHE_TTL = "collection.akka.uniquevalue.cache.ttl"; + String AKKA_AUTHORITATIVE_REGION = "collection.akka.authoritative.region"; - String AKKA_UNIQUEVALUE_RESERVATION_TTL= "collection.akka.uniquevalue.reservation.ttl"; + String AKKA_INSTANCES_PER_NODE = "collection.akka.instances-per-node"; - String AKKA_AUTHORITATIVE_REGION = "collection.akka.uniquevalue.authoritative.region"; /** * Use Akka or nah @@ -73,13 +71,6 @@ public interface ActorSystemFig extends GuicyFig, Serializable { String getRegionList(); /** - * Number of UniqueValueActors to be started on each node - */ - @Key(AKKA_UNIQUEVALUE_ACTORS) - @Default("300") - int getUniqueValueActors(); - - /** * Comma-separated lists of seeds each with format {region}:{hostname}:{port}. * Regions MUST be listed in the 'usergrid.queue.regionList' */ @@ -92,17 +83,11 @@ public interface ActorSystemFig extends GuicyFig, Serializable { @Key(AKKA_AUTHORITATIVE_REGION) String getAkkaAuthoritativeRegion(); - /** - * Unique Value cache TTL in seconds. - */ - @Key(AKKA_UNIQUEVALUE_CACHE_TTL) - @Default("10") - int getUniqueValueCacheTtl(); /** - * Unique Value Reservation TTL in seconds. + * Number of actor instances to create on each node for each router. */ - @Key(AKKA_UNIQUEVALUE_RESERVATION_TTL) - @Default("10") - int getUniqueValueReservationTtl(); + @Key(AKKA_INSTANCES_PER_NODE) + @Default("300") + int getInstancesPerNode(); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/fb1d78d0/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java index b3af978..1f7bf70 100644 --- a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java +++ b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java @@ -223,7 +223,7 @@ public class ActorSystemManagerImpl implements ActorSystemManager { } } - int numInstancesPerNode = actorSystemFig.getUniqueValueActors(); + int numInstancesPerNode = actorSystemFig.getInstancesPerNode(); // read config file once for each region @@ -236,15 +236,16 @@ public class ActorSystemManagerImpl implements ActorSystemManager { // cluster singletons only run role "io" nodes and NOT on "client" nodes of other regions String clusterRole = currentRegion.equals( region ) ? "io" : "client"; - logger.info( "Config for region {} is:\n" + - " Akka Hostname {}\n" + - " Akka Seeds {}\n" + - " Akka UniqueValueActors per node {}\n" + - " Akka Authoritative Region {}", - region, hostname, seeds, port, numInstancesPerNode, actorSystemFig.getAkkaAuthoritativeRegion() ); + logger.info( "Akka Config for region {} is:\n" + + " Hostname {}\n" + + " Seeds {}\n" + + " Authoritative Region {}\n", + region, hostname, seeds, actorSystemFig.getAkkaAuthoritativeRegion() ); Map<String, Object> configMap = new HashMap<String, Object>() {{ + put( "akka", new HashMap<String, Object>() {{ + put( "remote", new HashMap<String, Object>() {{ put( "netty.tcp", new HashMap<String, Object>() {{ put( "hostname", hostname ); @@ -252,8 +253,9 @@ public class ActorSystemManagerImpl implements ActorSystemManager { put( "port", regionPort ); }} ); }} ); + put( "cluster", new HashMap<String, Object>() {{ - put( "max-nr-of-instances-per-node", numInstancesPerNode ); + put( "max-nr-of-instances-per-node", 300); put( "roles", Collections.singletonList(clusterRole) ); put( "seed-nodes", new ArrayList<String>() {{ for (String seed : seeds) { @@ -262,24 +264,16 @@ public class ActorSystemManagerImpl implements ActorSystemManager { }} ); }} ); - // TODO: allow RouterProducers to add in router-specific stuff like this: - put( "actor", new HashMap<String, Object>() {{ - put( "deployment", new HashMap<String, Object>() {{ - put( "/uvRouter/singleton/router", new HashMap<String, Object>() {{ - put( "cluster", new HashMap<String, Object>() {{ - //put( "roles", Collections.singletonList(role) ); - put( "max-nr-of-instances-per-node", numInstancesPerNode ); - }} ); - }} ); - }} ); - }} ); }} ); }}; - Config config = ConfigFactory - .parseMap( configMap ) + for ( RouterProducer routerProducer : routerProducers ) { + routerProducer.addConfiguration( configMap ); + } + + Config config = ConfigFactory.parseMap( configMap ) .withFallback( ConfigFactory.parseString( "akka.cluster.roles = [io]" ) ) - .withFallback( ConfigFactory.load( "cluster-singleton" ) ); + .withFallback( ConfigFactory.load( "application.conf" ) ); configs.put( region, config ); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/fb1d78d0/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/RouterProducer.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/RouterProducer.java b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/RouterProducer.java index ac2c7ee..3aa91cf 100644 --- a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/RouterProducer.java +++ b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/RouterProducer.java @@ -19,7 +19,6 @@ package org.apache.usergrid.persistence.actorsystem; import akka.actor.ActorSystem; - import java.util.Map; @@ -42,4 +41,9 @@ public interface RouterProducer { */ void createLocalSystemActors( ActorSystem localSystem, Map<String, ActorSystem> systemMap ); + /** + * Add configuration for the router to configuration map + */ + void addConfiguration( Map<String, Object> configMap ); + } http://git-wip-us.apache.org/repos/asf/usergrid/blob/fb1d78d0/stack/corepersistence/actorsystem/src/main/resources/application.conf ---------------------------------------------------------------------- diff --git a/stack/corepersistence/actorsystem/src/main/resources/application.conf b/stack/corepersistence/actorsystem/src/main/resources/application.conf index 93854f9..a243163 100644 --- a/stack/corepersistence/actorsystem/src/main/resources/application.conf +++ b/stack/corepersistence/actorsystem/src/main/resources/application.conf @@ -1,13 +1,30 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + akka { - + loggers = ["akka.event.slf4j.Slf4jLogger"] loglevel = "ERROR" logging-filter = "akka.event.slf4j.Slf4jLoggingFilter" - + actor { provider = "akka.cluster.ClusterActorRefProvider" } - + remote { log-remote-lifecycle-events = off netty.tcp { @@ -24,5 +41,5 @@ akka.cluster.metrics.enabled=off akka.extensions=["akka.cluster.metrics.ClusterMetricsExtension", "akka.cluster.pubsub.DistributedPubSub"] # Sigar native library extract location during tests. -# Note: use per-jvm-instance folder when running multiple jvm on one host. +# Note: use per-jvm-instance folder when running multiple jvm on one host. akka.cluster.metrics.native-library-extract-folder=${user.dir}/target/native http://git-wip-us.apache.org/repos/asf/usergrid/blob/fb1d78d0/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java index ae73e47..daf3fdc 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java @@ -59,6 +59,7 @@ public abstract class CollectionModule extends AbstractModule { // noinspection unchecked install( new GuicyFigModule( SerializationFig.class ) ); install( new GuicyFigModule( CollectionSchedulerFig.class ) ); + install( new GuicyFigModule( UniqueValuesFig.class ) ); install( new SerializationModule() ); install( new ServiceModule() ); install( new ActorSystemModule() ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/fb1d78d0/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesFig.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesFig.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesFig.java new file mode 100644 index 0000000..c99824f --- /dev/null +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesFig.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.usergrid.persistence.collection.uniquevalues; + +import org.safehaus.guicyfig.Default; +import org.safehaus.guicyfig.FigSingleton; +import org.safehaus.guicyfig.GuicyFig; +import org.safehaus.guicyfig.Key; +import java.io.Serializable; + + +@FigSingleton +public interface UniqueValuesFig extends GuicyFig, Serializable { + + String AKKA_UNIQUEVALUE_ACTORS = "collection.akka.uniquevalue.actors"; + + String AKKA_UNIQUEVALUE_CACHE_TTL = "collection.akka.uniquevalue.cache.ttl"; + + String AKKA_UNIQUEVALUE_RESERVATION_TTL= "collection.akka.uniquevalue.reservation.ttl"; + + String AKKA_UNIQUEVALUE_INSTANCES_PER_NODE = "collection.akka.uniquevalue.instances-per-node"; + + + /** + * Number of UniqueValueActors to be started on each node + */ + @Key(AKKA_UNIQUEVALUE_ACTORS) + @Default("300") + int getUniqueValueActors(); + + /** + * Unique Value cache TTL in seconds. + */ + @Key(AKKA_UNIQUEVALUE_CACHE_TTL) + @Default("10") + int getUniqueValueCacheTtl(); + + /** + * Unique Value Reservation TTL in seconds. + */ + @Key(AKKA_UNIQUEVALUE_RESERVATION_TTL) + @Default("10") + int getUniqueValueReservationTtl(); + + /** + * Number of actor instances to create on each. + */ + @Key(AKKA_UNIQUEVALUE_INSTANCES_PER_NODE) + @Default("300") + int getUniqueValueInstancesPerNode(); +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/fb1d78d0/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java index 85b9d1a..6035e04 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java @@ -42,6 +42,7 @@ import org.slf4j.LoggerFactory; import scala.concurrent.Await; import scala.concurrent.Future; +import java.util.HashMap; import java.util.Map; import java.util.UUID; import java.util.concurrent.TimeUnit; @@ -52,7 +53,7 @@ public class UniqueValuesServiceImpl implements UniqueValuesService { private static final Logger logger = LoggerFactory.getLogger( UniqueValuesServiceImpl.class ); static Injector injector; - ActorSystemFig actorSystemFig; + UniqueValuesFig uniqueValuesFig; ActorSystemManager actorSystemManager; UniqueValuesTable table; private ReservationCache reservationCache; @@ -61,16 +62,16 @@ public class UniqueValuesServiceImpl implements UniqueValuesService { @Inject public UniqueValuesServiceImpl( Injector inj, - ActorSystemFig actorSystemFig, + UniqueValuesFig uniqueValuesFig, ActorSystemManager actorSystemManager, UniqueValuesTable table ) { injector = inj; this.actorSystemManager = actorSystemManager; - this.actorSystemFig = actorSystemFig; + this.uniqueValuesFig = uniqueValuesFig; this.table = table; - ReservationCache.init( actorSystemFig.getUniqueValueCacheTtl() ); + ReservationCache.init( uniqueValuesFig.getUniqueValueCacheTtl() ); this.reservationCache = ReservationCache.getInstance(); } @@ -300,4 +301,37 @@ public class UniqueValuesServiceImpl implements UniqueValuesService { public void createLocalSystemActors( ActorSystem localSystem, Map<String, ActorSystem> systemMap ) { subscribeToReservations( localSystem, systemMap ); } + + @Override + public void addConfiguration(Map<String, Object> configMap) { + + int numInstancesPerNode = uniqueValuesFig.getUniqueValueInstancesPerNode(); + + // TODO: will the below overwrite things other routers have added under "actor.deployment"? + + Map<String, Object> akka = (Map<String, Object>)configMap.get("akka"); + + akka.put( "actor", new HashMap<String, Object>() {{ + put( "deployment", new HashMap<String, Object>() {{ + put( "/uvRouter/singleton/router", new HashMap<String, Object>() {{ + put( "router", "consistent-hashing-pool" ); + put( "cluster", new HashMap<String, Object>() {{ + put( "enabled", "on" ); + put( "allow-local-routees", "on" ); + put( "user-role", "io" ); + put( "max-nr-of-instances-per-node", numInstancesPerNode ); + put( "failure-detector", new HashMap<String, Object>() {{ + put( "threshold", "" ); + put( "acceptable-heartbeat-pause", "3 s" ); + put( "heartbeat-interval", "1 s" ); + put( "heartbeat-request", new HashMap<String, Object>() {{ + put( "expected-response-after", "3 s" ); + }} ); + }} ); + }} ); + }} ); + }} ); + }} ); + + } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/fb1d78d0/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesTableImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesTableImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesTableImpl.java index de326dd..9cb13be 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesTableImpl.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesTableImpl.java @@ -42,12 +42,12 @@ public class UniqueValuesTableImpl implements UniqueValuesTable { private static final Logger logger = LoggerFactory.getLogger( UniqueValuesTableImpl.class ); final UniqueValueSerializationStrategy strat; - final ActorSystemFig actorSystemFig; + final UniqueValuesFig uniqueValuesFig; @Inject - public UniqueValuesTableImpl( final UniqueValueSerializationStrategy strat, ActorSystemFig actorSystemFig) { + public UniqueValuesTableImpl( final UniqueValueSerializationStrategy strat, UniqueValuesFig uniqueValuesFig) { this.strat = strat; - this.actorSystemFig = actorSystemFig; + this.uniqueValuesFig = uniqueValuesFig; } @@ -63,7 +63,7 @@ public class UniqueValuesTableImpl implements UniqueValuesTable { public void reserve( ApplicationScope scope, Id owner, UUID version, Field field ) throws ConnectionException { UniqueValue uv = new UniqueValueImpl( field, owner, version); - final MutationBatch write = strat.write( scope, uv, actorSystemFig.getUniqueValueReservationTtl() ); + final MutationBatch write = strat.write( scope, uv, uniqueValuesFig.getUniqueValueReservationTtl() ); write.execute(); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/fb1d78d0/stack/corepersistence/collection/src/test/resources/usergrid.properties ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/test/resources/usergrid.properties b/stack/corepersistence/collection/src/test/resources/usergrid.properties index f20dfe8..759a3b3 100644 --- a/stack/corepersistence/collection/src/test/resources/usergrid.properties +++ b/stack/corepersistence/collection/src/test/resources/usergrid.properties @@ -7,7 +7,7 @@ collection.akka.hostname=localhost collection.akka.port=2551 collection.akka.region=us-east usergrid.queue.regionList=us-east -collection.akka.uniquevalue.authoritative.region=us-east +collection.akka.authoritative.region=us-east collection.akka.region.seeds=us-east\:localhost\:2551 collection.akka.uniquevalue.actors=300
