Fix REST test issues and implement a way to shut down the Akka actor system, with hooks for JVM stop/servlet stop.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/5f39ee0a Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/5f39ee0a Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/5f39ee0a Branch: refs/heads/release-2.1.1 Commit: 5f39ee0af119305ef1a63c7fb5b19e08db8a7890 Parents: 31b2040 Author: Michael Russo <[email protected]> Authored: Fri Jul 8 15:33:26 2016 -0700 Committer: Michael Russo <[email protected]> Committed: Fri Jul 8 15:33:26 2016 -0700 ---------------------------------------------------------------------- .../src/test/resources/usergrid-test.properties | 5 +- .../actorsystem/ActorSystemManager.java | 2 + .../actorsystem/ActorSystemManagerImpl.java | 81 +++++++++++++------- .../uniquevalues/ReservationCacheActor.java | 25 ++++-- .../uniquevalues/UniqueValueActor.java | 4 +- .../apache/usergrid/rest/ShutdownListener.java | 11 +++ .../applications/ApplicationResourceIT.java | 2 +- .../resources/usergrid-custom-test.properties | 6 +- .../resources/usergrid-rest-deploy-context.xml | 1 - 9 files changed, 96 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/5f39ee0a/stack/config/src/test/resources/usergrid-test.properties ---------------------------------------------------------------------- diff --git a/stack/config/src/test/resources/usergrid-test.properties b/stack/config/src/test/resources/usergrid-test.properties index 368c585..085a11d 100644 --- a/stack/config/src/test/resources/usergrid-test.properties +++ b/stack/config/src/test/resources/usergrid-test.properties @@ -30,6 +30,9 @@ # This property is required to be set and cannot be defaulted anywhere usergrid.cluster_name=usergrid +# Set this for testing purposesly only +usergrid.test=true + # Whether to user the remote Cassandra cluster or not cassandra.use_remote=false @@ -153,7 +156,7 @@ groupid=counter_group 
autooffset.reset=smallest # set high batch size to minimize count overhead -usergrid.counter.batch.size=10000 +usergrid.counter.batch.size=1 usergrid.recaptcha.public= usergrid.recaptcha.private= http://git-wip-us.apache.org/repos/asf/usergrid/blob/5f39ee0a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManager.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManager.java b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManager.java index 893afca..c7322dd 100644 --- a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManager.java +++ b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManager.java @@ -81,4 +81,6 @@ public interface ActorSystemManager { * Publish message to all topic subscribers in all regions. 
*/ void publishToAllRegions( String topic, Object message, ActorRef sender ); + + void shutdownAll(); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/5f39ee0a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java index 8399979..d8d284f 100644 --- a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java +++ b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java @@ -66,6 +66,8 @@ public class ActorSystemManagerImpl implements ActorSystemManager { private ListMultimap<String, String> seedsByRegion; + private ActorSystem clusterSystem = null; + @Inject @@ -173,15 +175,15 @@ public class ActorSystemManagerImpl implements ActorSystemManager { // Create one actor system with request actor for each region if ( StringUtils.isEmpty( currentRegion )) { - throw new RuntimeException( "No value specified for " + ActorSystemFig.CLUSTER_REGIONS_LOCAL ); + throw new RuntimeException( "No value specified for: " + ActorSystemFig.CLUSTER_REGIONS_LOCAL ); } if ( StringUtils.isEmpty( actorSystemFig.getRegionsList() )) { - throw new RuntimeException( "No value specified for " + ActorSystemFig.CLUSTER_REGIONS_LIST ); + throw new RuntimeException( "No value specified for: " + ActorSystemFig.CLUSTER_REGIONS_LIST ); } if ( StringUtils.isEmpty( actorSystemFig.getSeeds() )) { - throw new RuntimeException( "No value specified for " + ActorSystemFig.CLUSTER_SEEDS ); + throw new RuntimeException( "No value specified for: " + ActorSystemFig.CLUSTER_SEEDS ); } List regionList = 
Arrays.asList( actorSystemFig.getRegionsList().toLowerCase().split(",") ); @@ -191,15 +193,15 @@ public class ActorSystemManagerImpl implements ActorSystemManager { Config config = readClusterSystemConfig(); - ActorSystem localSystem = createClusterSystemsFromConfigs( config ); + clusterSystem = createClusterSystemsFromConfigs( config ); - createClientActors( localSystem ); + createClientActors( clusterSystem ); for ( RouterProducer routerProducer : routerProducers ) { - routerProducer.createLocalSystemActors( localSystem ); + routerProducer.createLocalSystemActors( clusterSystem ); } - mediator = DistributedPubSub.get( localSystem ).mediator(); + mediator = DistributedPubSub.get( clusterSystem ).mediator(); } @@ -214,7 +216,7 @@ public class ActorSystemManagerImpl implements ActorSystemManager { String[] regionSeeds = actorSystemFig.getSeeds().split( "," ); - logger.info( "Found region {} seeds {}", regionSeeds.length, regionSeeds ); + logger.info( "Found region [{}] seeds [{}]", regionSeeds.length, regionSeeds ); try { @@ -248,7 +250,7 @@ public class ActorSystemManagerImpl implements ActorSystemManager { String seed = "akka.tcp://ClusterSystem" + "@" + hostname + ":" + regionPort; - logger.info( "Adding seed {} for region {}", seed, region ); + logger.info( "Adding seed [{}] for region [{}]", seed, region ); seedsByRegion.put( region, seed ); } @@ -283,7 +285,7 @@ public class ActorSystemManagerImpl implements ActorSystemManager { List<String> seeds = getSeedsByRegion().get( region ); - logger.info( "Akka Config for region {} is:\n" + " Hostname {}\n" + " Seeds {}\n", + logger.info( "Akka Config for region [{}] is:\n" + " Hostname [{}]\n" + " Seeds [{}]\n", region, hostname, seeds ); int lastColon = seeds.get(0).lastIndexOf(":") + 1; @@ -335,19 +337,38 @@ public class ActorSystemManagerImpl implements ActorSystemManager { */ private ActorSystem createClusterSystemsFromConfigs( Config config ) { - ActorSystem system = ActorSystem.create( "ClusterSystem", config 
); - for ( RouterProducer routerProducer : routerProducers ) { - logger.info("Creating {} for region {}", routerProducer.getName(), currentRegion ); - routerProducer.createClusterSingletonManager( system ); - } + // there is only 1 akka system for a Usergrid cluster + final String clusterName = "ClusterSystem"; + + + if( clusterSystem == null) { + + logger.info("Class: {}. ActorSystem [{}] not initialized, creating...", this, clusterName); + + clusterSystem = ActorSystem.create( clusterName, config ); + + for ( RouterProducer routerProducer : routerProducers ) { + logger.info("Creating router producer [{}] for region [{}]", routerProducer.getName(), currentRegion ); + routerProducer.createClusterSingletonManager( clusterSystem ); + } + + for ( RouterProducer routerProducer : routerProducers ) { + logger.info("Creating [{}] proxy for region [{}] role 'io'", routerProducer.getName(), currentRegion); + routerProducer.createClusterSingletonProxy( clusterSystem, "io" ); + } + + //add a shutdown hook to clean all actor systems if the JVM exits without the servlet container knowing + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + shutdownAll(); + } + }); - for ( RouterProducer routerProducer : routerProducers ) { - logger.info("Creating {} proxy for region {} role 'io'", routerProducer.getName(), currentRegion); - routerProducer.createClusterSingletonProxy( system, "io" ); } - return system; + return clusterSystem; } @@ -360,7 +381,7 @@ public class ActorSystemManagerImpl implements ActorSystemManager { if ( currentRegion.equals( region )) { - logger.info( "Creating clientActor for region {}", region ); + logger.info( "Creating clientActor for region [{}]", region ); // Each clientActor needs to know path to ClusterSingletonProxy and region clientActor = system.actorOf( @@ -381,7 +402,6 @@ public class ActorSystemManagerImpl implements ActorSystemManager { clusterClientsByRegion.put( region, clusterClient ); } - } } @@ -394,7 
+414,7 @@ public class ActorSystemManagerImpl implements ActorSystemManager { private void waitForClientActor( ActorRef ra ) { - logger.info( "Waiting on request actor {}...", ra.path() ); + logger.info( "Waiting on RequestActor [{}]...", ra.path() ); started = false; @@ -411,20 +431,29 @@ public class ActorSystemManagerImpl implements ActorSystemManager { started = true; break; } - logger.info( "Waiting for request actor {} region {} ({}s)", ra.path(), currentRegion, retries ); + logger.info( "Waiting for RequestActor [{}] region [{}] for [{}s]", ra.path(), currentRegion, retries ); Thread.sleep( 1000 ); } catch (Exception e) { - logger.error( "Error: Timeout waiting for requestActor" ); + logger.error( "Error: Timeout waiting for RequestActor [{}]", ra.path() ); } retries++; } if (started) { - logger.info( "RequestActor has started" ); + logger.info( "RequestActor [{}] has started", ra.path() ); } else { - throw new RuntimeException( "RequestActor did not start in time" ); + throw new RuntimeException( "RequestActor ["+ra.path()+"] did not start in time" ); } } + @Override + public void shutdownAll(){ + + logger.info("Shutting down Akka cluster: {}", clusterSystem.name()); + clusterSystem.shutdown(); + + + } + } http://git-wip-us.apache.org/repos/asf/usergrid/blob/5f39ee0a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ReservationCacheActor.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ReservationCacheActor.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ReservationCacheActor.java index 158b099..3998eb6 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ReservationCacheActor.java +++ 
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ReservationCacheActor.java @@ -49,27 +49,36 @@ public class ReservationCacheActor extends UntypedActor { ReservationCache.getInstance().cacheReservation( res ); if ( ++reservationCount % 10 == 0 ) { - logger.info("Received {} reservations cache size {}", + if(logger.isDebugEnabled()) { + logger.debug("Received {} reservations cache size {}", reservationCount, ReservationCache.getInstance().getSize()); + } } } else if ( msg instanceof UniqueValueActor.Cancellation ) { UniqueValueActor.Cancellation can = (UniqueValueActor.Cancellation) msg; - ReservationCache.getInstance().cancelReservation( can ); - - if (++cancellationCount % 10 == 0) { - logger.info( "Received {} cancellations", cancellationCount ); + ReservationCache.getInstance().cancelReservation(can); + if (logger.isDebugEnabled()) { + if (++cancellationCount % 10 == 0) { + logger.debug("Received {} cancellations", cancellationCount); + } else { + logger.debug("Removing cancelled {} from reservation cache", can.getConsistentHashKey()); + } } - logger.debug("Removing cancelled {} from reservation cache", can.getConsistentHashKey()); + } else if ( msg instanceof UniqueValueActor.Response ) { UniqueValueActor.Response response = (UniqueValueActor.Response) msg; ReservationCache.getInstance().cancelReservation( response ); - logger.info("Removing completed {} from reservation cache", response.getConsistentHashKey()); + if(logger.isDebugEnabled()) { + logger.debug("Removing completed {} from reservation cache", response.getConsistentHashKey()); + } } else if (msg instanceof DistributedPubSubMediator.SubscribeAck) { - logger.debug( "subscribing" ); + if(logger.isDebugEnabled()) { + logger.debug("subscribing"); + } } else { unhandled( msg ); 
http://git-wip-us.apache.org/repos/asf/usergrid/blob/5f39ee0a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java index 501037f..74f45eb 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java @@ -57,8 +57,8 @@ public class UniqueValueActor extends UntypedActor { Request req = (Request) message; count++; - if (count % 10 == 0) { - logger.info( "UniqueValueActor {} processed {} requests", name, count ); + if (count % 10 == 0 && logger.isDebugEnabled()) { + logger.debug( "UniqueValueActor {} processed {} requests", name, count ); } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/5f39ee0a/stack/rest/src/main/java/org/apache/usergrid/rest/ShutdownListener.java ---------------------------------------------------------------------- diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/ShutdownListener.java b/stack/rest/src/main/java/org/apache/usergrid/rest/ShutdownListener.java index f9f5421..f9f1653 100644 --- a/stack/rest/src/main/java/org/apache/usergrid/rest/ShutdownListener.java +++ b/stack/rest/src/main/java/org/apache/usergrid/rest/ShutdownListener.java @@ -18,8 +18,10 @@ package org.apache.usergrid.rest; +import com.google.inject.Injector; import org.apache.usergrid.batch.service.JobSchedulerService; import org.apache.usergrid.batch.service.SchedulerService; +import org.apache.usergrid.persistence.actorsystem.ActorSystemManager; import 
org.apache.usergrid.persistence.cassandra.CassandraService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,6 +65,15 @@ public class ShutdownListener implements ServletContextListener { logger.info("ShutdownListener invoked"); + ApplicationContext ctx = WebApplicationContextUtils + .getWebApplicationContext(sce.getServletContext()); + + Injector injector = ctx.getBean(Injector.class); + ActorSystemManager actorSystemManager = injector.getInstance(ActorSystemManager.class); + + // stop the Akka actor system + actorSystemManager.shutdownAll(); + boolean started = Boolean.parseBoolean( properties.getProperty(JobServiceBoostrap.START_SCHEDULER_PROP, "true")); http://git-wip-us.apache.org/repos/asf/usergrid/blob/5f39ee0a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/ApplicationResourceIT.java ---------------------------------------------------------------------- diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/ApplicationResourceIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/ApplicationResourceIT.java index 06615df..9f4f8aa 100644 --- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/ApplicationResourceIT.java +++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/ApplicationResourceIT.java @@ -279,7 +279,7 @@ public class ApplicationResourceIT extends AbstractRestIT { .get(ApiResponse.class); // assert that the response returns the correct URI - assertEquals(apiResponse.getUri(), String.format("http://sometestvalue/%s/%s", orgName, appName)); + assertEquals(String.format("http://localhost:8080/%s/%s", orgName, appName), apiResponse.getUri()); //unmarshal the application from the response Application application = new Application(apiResponse); http://git-wip-us.apache.org/repos/asf/usergrid/blob/5f39ee0a/stack/rest/src/test/resources/usergrid-custom-test.properties ---------------------------------------------------------------------- diff --git 
a/stack/rest/src/test/resources/usergrid-custom-test.properties b/stack/rest/src/test/resources/usergrid-custom-test.properties index f20f1e5..d845fcc 100644 --- a/stack/rest/src/test/resources/usergrid-custom-test.properties +++ b/stack/rest/src/test/resources/usergrid-custom-test.properties @@ -35,8 +35,6 @@ collection.stage.transient.timeout=5 # other... usergrid.mongo.disable=true -usergrid.counter.batch.size=1 -usergrid.api.url.base=http://sometestvalue usergrid.notifications.listener.run=false @@ -63,5 +61,9 @@ usergrid.cluster.region.local=us-east usergrid.cluster.region.list=us-east usergrid.cluster.seeds=us-east\:localhost +# Use random port here for REST tests run outside embedded tomcat because these will get an instance of Spring that +# starts the Akka cluster, then the embedded tomcat will also try when it starts ( but using default props and port 2551) +usergrid.cluster.port=2555 + collection.uniquevalues.actors=300 collection.uniquevalues.authoritative.region=us-east http://git-wip-us.apache.org/repos/asf/usergrid/blob/5f39ee0a/stack/rest/src/test/resources/usergrid-rest-deploy-context.xml ---------------------------------------------------------------------- diff --git a/stack/rest/src/test/resources/usergrid-rest-deploy-context.xml b/stack/rest/src/test/resources/usergrid-rest-deploy-context.xml index 9cc5ea6..07215ab 100644 --- a/stack/rest/src/test/resources/usergrid-rest-deploy-context.xml +++ b/stack/rest/src/test/resources/usergrid-rest-deploy-context.xml @@ -29,7 +29,6 @@ <list> <value>classpath:/usergrid-default.properties</value> <value>classpath:/usergrid-test.properties</value> - <value>classpath:/usergrid-custom-test-rest.properties</value> </list> </property> </bean>
