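Kludgy workaround to avoid passing the Guice Injector to Akka actor Props, because Akka cannot serialize it (NotSerializableException: no configured serialization-bindings for InjectorImpl).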
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/0c5deac1 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/0c5deac1 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/0c5deac1 Branch: refs/heads/release-2.1.1 Commit: 0c5deac1e04f5bedeeac355d71085351f9cbdd48 Parents: e0d95bf Author: Dave Johnson <[email protected]> Authored: Fri Jun 17 12:41:54 2016 -0400 Committer: Dave Johnson <[email protected]> Committed: Fri Jun 17 12:41:54 2016 -0400 ---------------------------------------------------------------------- .../uniquevalues/ClusterSingletonRouter.java | 19 ++++- .../uniquevalues/UniqueValueActor.java | 10 ++- .../uniquevalues/UniqueValuesServiceImpl.java | 90 ++++++++++---------- .../uniquevalues/UniqueValuesTableImpl.java | 2 + 4 files changed, 69 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/0c5deac1/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ClusterSingletonRouter.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ClusterSingletonRouter.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ClusterSingletonRouter.java index 7cc24eb..f2f80bf 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ClusterSingletonRouter.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ClusterSingletonRouter.java @@ -23,19 +23,34 @@ import akka.routing.ConsistentHashingRouter; import akka.routing.FromConfig; import com.google.inject.Inject; import com.google.inject.Injector; +import 
org.apache.commons.lang.RandomStringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Uses a consistent hash to route Unique Value requests to UniqueValueActors. */ public class ClusterSingletonRouter extends UntypedActor { + private static final Logger logger = LoggerFactory.getLogger( UniqueValueActor.class ); + + private final String name = RandomStringUtils.randomAlphanumeric( 4 ); private final ActorRef router; @Inject public ClusterSingletonRouter( Injector injector ) { - router = getContext().actorOf( FromConfig.getInstance().props( - Props.create( GuiceActorProducer.class, injector, UniqueValueActor.class)), "router"); + + router = getContext().actorOf( + FromConfig.getInstance().props(Props.create(UniqueValueActor.class)), "router"); + + // TODO: is there some way to pass the injector here without getting this exception: + // NotSerializableException: No configured serialization-bindings for class [InjectorImpl] + //router = getContext().actorOf( + //FromConfig.getInstance().props( Props.create( GuiceActorProducer.class, injector, UniqueValueActor.class)), + //"router" ); + + logger.info("ClusterSingletonRouter {} is live with injector {}", name, injector); } @Override http://git-wip-us.apache.org/repos/asf/usergrid/blob/0c5deac1/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java index fe2e356..1e7879a 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java +++ 
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java @@ -20,7 +20,6 @@ import akka.actor.ActorRef; import akka.actor.UntypedActor; import akka.cluster.pubsub.DistributedPubSub; import akka.cluster.pubsub.DistributedPubSubMediator; -import com.google.inject.Inject; import org.apache.commons.lang3.RandomStringUtils; import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.model.entity.Id; @@ -40,13 +39,16 @@ public class UniqueValueActor extends UntypedActor { final private ActorRef mediator = DistributedPubSub.get(getContext().system()).mediator(); + private final UniqueValuesTable table; + private int count = 0; - @Inject - UniqueValuesTable table; + public UniqueValueActor() { - public UniqueValueActor( ) { + // TODO: is there a way to avoid this ugly kludge? see also: ClusterSingletonRouter + this.table = UniqueValuesServiceImpl.injector.getInstance( UniqueValuesTable.class ); + logger.info("UniqueValueActor {} is live with table {}", name, table); } @Override http://git-wip-us.apache.org/repos/asf/usergrid/blob/0c5deac1/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java index 5c12165..670fffd 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java @@ -53,7 +53,8 @@ import 
java.util.concurrent.TimeUnit; public class UniqueValuesServiceImpl implements UniqueValuesService { private static final Logger logger = LoggerFactory.getLogger( UniqueValuesServiceImpl.class ); - private final Injector injector; + static Injector injector; + AkkaFig akkaFig; UniqueValuesTable table; private String hostname; @@ -61,7 +62,8 @@ public class UniqueValuesServiceImpl implements UniqueValuesService { private String currentRegion; private Map<String, ActorRef> requestActorsByRegion; - private Map<String, String> regionsByType = new HashMap<>(); + + //private Map<String, String> regionsByType = new HashMap<>(); // private final MetricRegistry metrics = new MetricRegistry(); // @@ -76,19 +78,15 @@ public class UniqueValuesServiceImpl implements UniqueValuesService { private ReservationCache reservationCache; - private final boolean disableUniqueValues; - @Inject - public UniqueValuesServiceImpl(Injector injector, AkkaFig akkaFig, UniqueValuesTable table ) { - this.injector = injector; + public UniqueValuesServiceImpl(Injector inj, AkkaFig akkaFig, UniqueValuesTable table ) { + injector = inj; this.akkaFig = akkaFig; this.table = table; ReservationCache.init( akkaFig.getUniqueValueCacheTtl() ); this.reservationCache = ReservationCache.getInstance(); - - this.disableUniqueValues = false; } @@ -124,9 +122,9 @@ public class UniqueValuesServiceImpl implements UniqueValuesService { } - private Map<String, String> getRegionsByType() { - return regionsByType; - } +// private Map<String, String> getRegionsByType() { +// return regionsByType; +// } // public Counter getDupCounter() { // return dupCounter; @@ -173,24 +171,28 @@ public class UniqueValuesServiceImpl implements UniqueValuesService { throw new RuntimeException( "No value specified for " + AkkaFig.AKKA_REGION_SEEDS); } + if ( StringUtils.isEmpty( akkaFig.getAkkaAuthoritativeRegion() )) { + throw new RuntimeException( "No value specified for " + AkkaFig.AKKA_AUTHORITATIVE_REGION); + } + List regionList = 
Arrays.asList( akkaFig.getRegionList().toLowerCase().split(",") ); logger.info("Initializing Akka for hostname {} region {} regionList {} seeds {}", hostname, currentRegion, regionList, akkaFig.getRegionSeeds() ); - String typesValue = akkaFig.getRegionTypes(); - String[] regionTypes = StringUtils.isEmpty( typesValue ) ? new String[0] : typesValue.split(","); - for ( String regionType : regionTypes ) { - String[] parts = regionType.toLowerCase().split(":"); - String typeRegion = parts[0]; - String type = parts[1]; - - if ( !regionList.contains( typeRegion) ) { - throw new RuntimeException( - "'collection.akka.region.seeds' references unknown region: " + typeRegion ); - } - this.regionsByType.put( type, typeRegion ); - } +// String typesValue = akkaFig.getRegionTypes(); +// String[] regionTypes = StringUtils.isEmpty( typesValue ) ? new String[0] : typesValue.split(","); +// for ( String regionType : regionTypes ) { +// String[] parts = regionType.toLowerCase().split(":"); +// String typeRegion = parts[0]; +// String type = parts[1]; +// +// if ( !regionList.contains( typeRegion) ) { +// throw new RuntimeException( +// "'collection.akka.region.seeds' references unknown region: " + typeRegion ); +// } +// this.regionsByType.put( type, typeRegion ); +// } final Map<String, ActorSystem> systemMap = new HashMap<>(); @@ -395,11 +397,12 @@ public class UniqueValuesServiceImpl implements UniqueValuesService { String clusterRole = currentRegion.equals( region ) ? 
"io" : "client"; logger.info( "Config for region {} is:\n" + - "poc Akka Hostname {}\n" + - "poc Akka Seeds {}\n" + - "poc Akka Port {}\n" + - "poc UniqueValueActors per node {}", - region, hostname, seeds, port, numInstancesPerNode ); + " AkkaUV Hostname {}\n" + + " AkkaUV Seeds {}\n" + + " AkkaUV Port {}\n" + + " AkkaUV UniqueValueActors per node {}\n" + + " AkkaUV Authoritative Region {}", + region, hostname, seeds, port, numInstancesPerNode, akkaFig.getAkkaAuthoritativeRegion() ); Map<String, Object> configMap = new HashMap<String, Object>() {{ put( "akka", new HashMap<String, Object>() {{ @@ -515,15 +518,10 @@ public class UniqueValuesServiceImpl implements UniqueValuesService { private void reserveUniqueField( ApplicationScope scope, Entity entity, UUID version, Field field, String region ) throws UniqueValueException { - final ActorRef requestActor; - if ( region != null ) { - requestActor = getRequestActorsByRegion().get( region ); - } else { - requestActor = lookupRequestActorForType( entity.getId().getType() ); - } + final ActorRef requestActor = getRequestActorsByRegion().get( region ); if ( requestActor == null ) { - throw new RuntimeException( "No request actor for region or type, cannot verify unique fields!" ); + throw new RuntimeException( "No request actor for region " + region); } UniqueValueActor.Request request = new UniqueValueActor.Reservation( @@ -544,7 +542,7 @@ public class UniqueValuesServiceImpl implements UniqueValuesService { private void confirmUniqueField( ApplicationScope scope, Entity entity, UUID version, Field field, String region) throws UniqueValueException { - ActorRef requestActor = lookupRequestActorForType( entity.getId().getType() ); + final ActorRef requestActor = getRequestActorsByRegion().get( region ); if ( requestActor == null ) { throw new RuntimeException( "No request actor for type, cannot verify unique fields!" 
); @@ -560,7 +558,7 @@ public class UniqueValuesServiceImpl implements UniqueValuesService { private void cancelUniqueField( ApplicationScope scope, Entity entity, UUID version, Field field, String region ) throws UniqueValueException { - ActorRef requestActor = lookupRequestActorForType( entity.getId().getType() ); + final ActorRef requestActor = getRequestActorsByRegion().get( region ); if ( requestActor == null ) { throw new RuntimeException( "No request actor for type, cannot verify unique fields!" ); @@ -573,14 +571,14 @@ public class UniqueValuesServiceImpl implements UniqueValuesService { } - private ActorRef lookupRequestActorForType( String type ) { - final String region = getRegionsByType().get( type ); - ActorRef requestActor = getRequestActorsByRegion().get( region == null ? currentRegion : region ); - if ( requestActor == null ) { - throw new RuntimeException( "No request actor available for region: " + region ); - } - return requestActor; - } +// private ActorRef lookupRequestActorForType( String type ) { +// final String region = getRegionsByType().get( type ); +// ActorRef requestActor = getRequestActorsByRegion().get( region == null ? 
currentRegion : region ); +// if ( requestActor == null ) { +// throw new RuntimeException( "No request actor available for region: " + region ); +// } +// return requestActor; +// } private void sendUniqueValueRequest( http://git-wip-us.apache.org/repos/asf/usergrid/blob/0c5deac1/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesTableImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesTableImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesTableImpl.java index 8ba7cbd..c0fa390 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesTableImpl.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesTableImpl.java @@ -19,6 +19,7 @@ package org.apache.usergrid.persistence.collection.uniquevalues; import com.google.inject.Inject; +import com.google.inject.Singleton; import com.netflix.astyanax.MutationBatch; import com.netflix.astyanax.connectionpool.exceptions.ConnectionException; import org.apache.usergrid.persistence.collection.serialization.UniqueValue; @@ -35,6 +36,7 @@ import java.util.Collections; import java.util.UUID; +@Singleton public class UniqueValuesTableImpl implements UniqueValuesTable { private static final Logger logger = LoggerFactory.getLogger( UniqueValuesTableImpl.class );
