Repository: usergrid Updated Branches: refs/heads/master b0fbf14eb -> 849df6746
Hook ReservationCache in via ClusterClients Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/2fb3ab32 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/2fb3ab32 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/2fb3ab32 Branch: refs/heads/master Commit: 2fb3ab32bca20be4c9133e340cb97145ca452a3c Parents: d12307b Author: Dave Johnson <[email protected]> Authored: Fri Jun 24 11:34:53 2016 -0400 Committer: Dave Johnson <[email protected]> Committed: Fri Jun 24 11:34:53 2016 -0400 ---------------------------------------------------------------------- .../corepersistence/CpEntityManagerFactory.java | 2 +- .../actorsystem/ActorSystemManager.java | 36 ++++++++++++++- .../actorsystem/ActorSystemManagerImpl.java | 24 +++++++++- .../uniquevalues/ReservationCacheActor.java | 4 +- .../uniquevalues/UniqueValueActor.java | 27 ++++++----- .../uniquevalues/UniqueValuesServiceImpl.java | 47 ++++++++------------ .../collection/AbstractUniqueValueTest.java | 2 +- 7 files changed, 96 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/2fb3ab32/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java index 060ec18..eca5927 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java @@ -155,7 +155,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application actorSystemManager.registerMessageType( UniqueValueActor.Cancellation.class, "/user/uvProxy" ); actorSystemManager.registerMessageType( UniqueValueActor.Confirmation.class, "/user/uvProxy" ); actorSystemManager.start(); - actorSystemManager.waitForClientActors(); + actorSystemManager.waitForClientActor(); } catch (Throwable t) { logger.error("Error starting Akka", t); http://git-wip-us.apache.org/repos/asf/usergrid/blob/2fb3ab32/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManager.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManager.java b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManager.java index c45ccac..cdb6caf 100644 --- a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManager.java +++ b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManager.java @@ -23,21 +23,55 @@ import akka.actor.ActorRef; public interface ActorSystemManager { + /** + * Start create and start all Akka Actors, ClusterClients Routers and etc. + */ void start(); + /** + * Start method used in JUnit tests. + */ void start(String hostname, Integer port, String currentRegion); - void waitForClientActors(); + /** + * Wait until ClientActor has seen some nodes and is ready for use. + */ + void waitForClientActor(); + /** + * True if ActorSystem and ClientActor are ready to be used. + */ boolean isReady(); + /** + * MUST be called before start() to register any router producers to be configured. + */ void registerRouterProducer( RouterProducer routerProducer ); + /** + * MUST be called before start() to register any messages to be sent. + * @param messageType Class of message. + * @param routerPath Router-path to which such messages are to be sent. + */ void registerMessageType( Class messageType, String routerPath ); + /** + * Local client for ActorSystem, send all local messages here for routing. + */ ActorRef getClientActor(); + /** + * Get ClientClient for specified remote region. + */ ActorRef getClusterClient(String region ); + /** + * Get name of of this, the current region. + */ String getCurrentRegion(); + + /** + * Publish message to all topic subscribers in all regions. + */ + void publishToAllRegions( String topic, Object message, ActorRef sender ); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/2fb3ab32/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java index 5e23c20..89980bc 100644 --- a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java +++ b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java @@ -23,6 +23,8 @@ import akka.actor.*; import akka.cluster.client.ClusterClient; import akka.cluster.client.ClusterClientReceptionist; import akka.cluster.client.ClusterClientSettings; +import akka.cluster.pubsub.DistributedPubSub; +import akka.cluster.pubsub.DistributedPubSubMediator; import akka.pattern.Patterns; import akka.util.Timeout; import com.google.common.collect.ArrayListMultimap; @@ -56,6 +58,8 @@ public class ActorSystemManagerImpl implements ActorSystemManager { private final Map<Class, String> routersByMessageType = new HashMap<>(); private final Map<String, ActorRef> clusterClientsByRegion = new HashMap<String, ActorRef>(20); + private ActorRef mediator; + private ActorRef clientActor; private ListMultimap<String, String> seedsByRegion; @@ -78,7 +82,7 @@ public class ActorSystemManagerImpl implements ActorSystemManager { this.port = null; initAkka(); - waitForClientActors(); + waitForClientActor(); } @@ -132,6 +136,20 @@ public class ActorSystemManagerImpl implements ActorSystemManager { } + @Override + public void publishToAllRegions( String topic, Object message, ActorRef sender ) { + + // send to local subscribers to topic + mediator.tell( new DistributedPubSubMediator.Publish( topic, message ), sender ); + + // send to each ClusterClient + for ( ActorRef clusterClient : clusterClientsByRegion.values() ) { + clusterClient.tell( new ClusterClient.Publish( topic, message ), sender ); + } + + } + + private void initAkka() { logger.info("Initializing Akka"); @@ -171,6 +189,8 @@ public class ActorSystemManagerImpl implements ActorSystemManager { for ( RouterProducer routerProducer : routerProducers ) { routerProducer.createLocalSystemActors( localSystem ); } + + mediator = DistributedPubSub.get( localSystem ).mediator(); } @@ -359,7 +379,7 @@ public class ActorSystemManagerImpl implements ActorSystemManager { @Override - public void waitForClientActors() { + public void waitForClientActor() { waitForClientActor( clientActor ); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/2fb3ab32/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ReservationCacheActor.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ReservationCacheActor.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ReservationCacheActor.java index 2912c7d..51f5c8c 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ReservationCacheActor.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ReservationCacheActor.java @@ -49,7 +49,7 @@ public class ReservationCacheActor extends UntypedActor { ReservationCache.getInstance().cacheReservation( res ); if ( ++reservationCount % 10 == 0 ) { - logger.debug("Received {} reservations cache size {}", + logger.info("Received {} reservations cache size {}", reservationCount, ReservationCache.getInstance().getSize()); } @@ -58,7 +58,7 @@ public class ReservationCacheActor extends UntypedActor { ReservationCache.getInstance().cancelReservation( can ); if ( ++cancellationCount % 10 == 0 ) { - logger.debug("Received {} cancellations", cancellationCount); + logger.info("Received {} cancellations", cancellationCount); } } else if (msg instanceof DistributedPubSubMediator.SubscribeAck) { http://git-wip-us.apache.org/repos/asf/usergrid/blob/2fb3ab32/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java index e53710c..a14c63e 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java @@ -21,6 +21,7 @@ import akka.actor.UntypedActor; import akka.cluster.pubsub.DistributedPubSub; import akka.cluster.pubsub.DistributedPubSubMediator; import org.apache.commons.lang3.RandomStringUtils; +import org.apache.usergrid.persistence.actorsystem.ActorSystemManager; import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.model.entity.Id; import org.apache.usergrid.persistence.model.field.Field; @@ -30,6 +31,7 @@ import org.slf4j.LoggerFactory; import java.io.Serializable; import java.util.UUID; + public class UniqueValueActor extends UntypedActor { private static final Logger logger = LoggerFactory.getLogger( UniqueValueActor.class ); @@ -37,7 +39,7 @@ public class UniqueValueActor extends UntypedActor { //private MetricsService metricsService; - final private ActorRef mediator = DistributedPubSub.get(getContext().system()).mediator(); + private final ActorSystemManager actorSystemManager; private final UniqueValuesTable table; @@ -48,6 +50,8 @@ public class UniqueValueActor extends UntypedActor { // TODO: is there a way to avoid this ugly kludge? see also: ClusterSingletonRouter this.table = UniqueValuesServiceImpl.injector.getInstance( UniqueValuesTable.class ); + this.actorSystemManager = UniqueValuesServiceImpl.injector.getInstance( ActorSystemManager.class ); + //logger.info("UniqueValueActor {} is live with table {}", name, table); } @@ -66,7 +70,7 @@ public class UniqueValueActor extends UntypedActor { if ( message instanceof Reservation ) { Reservation res = (Reservation) message; -// final Timer.Context context = metricsService.getReservationTimer().time(); + // final Timer.Context context = metricsService.getReservationTimer().time(); try { Id owner = table.lookupOwner( res.getApplicationScope(), res.getOwner().getType(), res.getField() ); @@ -86,8 +90,7 @@ public class UniqueValueActor extends UntypedActor { getSender().tell( new Response( Response.Status.IS_UNIQUE ), getSender() ); - mediator.tell( new DistributedPubSubMediator.Publish( "content", - new Reservation( res ) ), getSelf() ); + actorSystemManager.publishToAllRegions( "content", new Reservation( res ), getSelf() ); } catch (Throwable t) { @@ -96,13 +99,13 @@ public class UniqueValueActor extends UntypedActor { } finally { -// context.stop(); + // context.stop(); } } else if ( message instanceof Confirmation) { Confirmation con = (Confirmation) message; -// final Timer.Context context = metricsService.getCommitmentTimer().time(); + // final Timer.Context context = metricsService.getCommitmentTimer().time(); try { Id owner = table.lookupOwner( con.getApplicationScope(), con.getOwner().getType(), con.getField() ); @@ -122,15 +125,14 @@ public class UniqueValueActor extends UntypedActor { getSender().tell( new Response( Response.Status.IS_UNIQUE ), getSender() ); - mediator.tell( new DistributedPubSubMediator.Publish( "content", - new Reservation( con ) ), getSelf() ); + actorSystemManager.publishToAllRegions( "content", new Reservation( con ), getSelf() ); } catch (Throwable t) { getSender().tell( new Response( Response.Status.ERROR ), getSender() ); logger.error( "Error processing request", t ); } finally { -// context.stop(); + // context.stop(); } @@ -155,8 +157,7 @@ public class UniqueValueActor extends UntypedActor { getSender().tell( new Response( Response.Status.SUCCESS ), getSender() ); - mediator.tell( new DistributedPubSubMediator.Publish( "content", - new Reservation( can ) ), getSelf() ); + actorSystemManager.publishToAllRegions( "content", new Reservation( can ), getSelf() ); } catch (Throwable t) { getSender().tell( new Response( Response.Status.ERROR ), getSender() ); @@ -180,10 +181,12 @@ public class UniqueValueActor extends UntypedActor { final String consistentHashKey; public Request( ApplicationScope applicationScope, Id owner, UUID ownerVersion, Field field ) { + this.applicationScope = applicationScope; this.owner = owner; this.ownerVersion = ownerVersion; this.field = field; + StringBuilder sb = new StringBuilder(); sb.append( applicationScope.getApplication() ); sb.append(":"); @@ -195,10 +198,12 @@ public class UniqueValueActor extends UntypedActor { this.consistentHashKey = sb.toString(); } public Request( Request req ) { + this.applicationScope = req.applicationScope; this.owner = req.owner; this.ownerVersion = req.ownerVersion; this.field = req.field; + StringBuilder sb = new StringBuilder(); sb.append( req.applicationScope.getApplication() ); sb.append(":"); http://git-wip-us.apache.org/repos/asf/usergrid/blob/2fb3ab32/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java index b888b1f..352c2e5 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java @@ -82,17 +82,10 @@ public class UniqueValuesServiceImpl implements UniqueValuesService { } - // TODO: restore reservation cache -// private void subscribeToReservations( ActorSystem localSystem, Map<String, ActorSystem> systemMap ) { -// -// for ( String region : systemMap.keySet() ) { -// ActorSystem actorSystem = systemMap.get( region ); -// if ( !actorSystem.equals( localSystem ) ) { -// logger.info("Starting ReservationCacheUpdater for {}", region ); -// actorSystem.actorOf( Props.create( ReservationCacheActor.class ), "subscriber"); -// } -// } -// } + private void subscribeToReservations( ActorSystem localSystem ) { + logger.info("Starting ReservationCacheUpdater"); + localSystem.actorOf( Props.create( ReservationCacheActor.class ), "subscriber"); + } @Override @@ -166,9 +159,9 @@ public class UniqueValuesServiceImpl implements UniqueValuesService { scope, entity.getId(), version, field ); UniqueValueActor.Reservation res = reservationCache.get( request.getConsistentHashKey() ); -// if ( res != null ) { -// getCacheCounter().inc(); -// } + // if ( res != null ) { + // getCacheCounter().inc(); + // } if ( res != null && !res.getOwner().equals( request.getOwner() )) { throw new UniqueValueException( "Error property not unique (cache)", field); } @@ -194,10 +187,14 @@ public class UniqueValuesServiceImpl implements UniqueValuesService { scope, entity.getId(), version, field ); if ( actorSystemManager.getCurrentRegion().equals( region ) ) { + + // sending to current region, use local clientActor ActorRef clientActor = actorSystemManager.getClientActor(); clientActor.tell( request, null ); } else { + + // sending to remote region, send via cluster client for that region ActorRef clusterClient = actorSystemManager.getClusterClient( region ); clusterClient.tell( new ClusterClient.Send("/user/clientActor", request), null ); } @@ -205,16 +202,6 @@ public class UniqueValuesServiceImpl implements UniqueValuesService { } -// private ActorRef lookupRequestActorForType( String type ) { -// final String region = getRegionsByType().get( type ); -// ActorRef requestActor = getRequestActorsByRegion().get( region == null ? currentRegion : region ); -// if ( requestActor == null ) { -// throw new RuntimeException( "No request actor available for region: " + region ); -// } -// return requestActor; -// } - - private void sendUniqueValueRequest( Entity entity, String region, UniqueValueActor.Request request ) throws UniqueValueException { @@ -226,19 +213,22 @@ public class UniqueValuesServiceImpl implements UniqueValuesService { try { Timeout t = new Timeout( 1, TimeUnit.SECONDS ); - // ask RequestActor and wait (up to timeout) for response - Future<Object> fut; if ( actorSystemManager.getCurrentRegion().equals( region ) ) { + + // sending to current region, use local clientActor ActorRef clientActor = actorSystemManager.getClientActor(); fut = Patterns.ask( clientActor, request, t ); } else { + + // sending to remote region, send via cluster client for that region ActorRef clusterClient = actorSystemManager.getClusterClient( region ); fut = Patterns.ask( clusterClient, new ClusterClient.Send("/user/clientActor", request), t ); } + // wait (up to timeout) for response response = (UniqueValueActor.Response) Await.result( fut, t.duration() ); if ( response != null && ( @@ -306,8 +296,7 @@ public class UniqueValuesServiceImpl implements UniqueValuesService { @Override public void createLocalSystemActors( ActorSystem localSystem ) { - // TODO: restore reservation cache - //subscribeToReservations( localSystem ); + subscribeToReservations( localSystem ); } @Override @@ -317,6 +306,8 @@ public class UniqueValuesServiceImpl implements UniqueValuesService { Map<String, Object> akka = (Map<String, Object>)configMap.get("akka"); + // TODO: replace this configuration stuff with equivalent Java code in the above "create" methods + akka.put( "actor", new HashMap<String, Object>() {{ put( "deployment", new HashMap<String, Object>() {{ put( "/uvRouter/singleton/router", new HashMap<String, Object>() {{ http://git-wip-us.apache.org/repos/asf/usergrid/blob/2fb3ab32/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/AbstractUniqueValueTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/AbstractUniqueValueTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/AbstractUniqueValueTest.java index 041835b..e7b4450 100644 --- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/AbstractUniqueValueTest.java +++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/AbstractUniqueValueTest.java @@ -41,7 +41,7 @@ public class AbstractUniqueValueTest { actorSystemManager.registerMessageType( UniqueValueActor.Cancellation.class, "/user/uvProxy" ); actorSystemManager.registerMessageType( UniqueValueActor.Confirmation.class, "/user/uvProxy" ); actorSystemManager.start( "127.0.0.1", port, "us-east" ); - actorSystemManager.waitForRequestActors(); + actorSystemManager.waitForClientActor(); startedAkka.put( port, true ); }
