Repository: usergrid Updated Branches: refs/heads/usergrid-1268-akka-211 [created] 9be069b15
Initial commit of Akka for Unique Values, Akka init works but much more work is needed. Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/52ee2fb7 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/52ee2fb7 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/52ee2fb7 Branch: refs/heads/usergrid-1268-akka-211 Commit: 52ee2fb70e0672fba651edd2d998247e095e11fc Parents: 4aa73aa Author: Dave Johnson <[email protected]> Authored: Tue Apr 12 07:38:40 2016 -0400 Committer: Dave Johnson <[email protected]> Committed: Mon Apr 25 14:31:39 2016 -0400 ---------------------------------------------------------------------- stack/corepersistence/collection/pom.xml | 116 ++-- .../collection/guice/CollectionModule.java | 6 + .../EntityCollectionManagerFactoryImpl.java | 51 +- .../collection/uniquevalues/AkkaFig.java | 95 +++ .../uniquevalues/ClusterSingletonRouter.java | 37 ++ .../collection/uniquevalues/RequestActor.java | 170 ++++++ .../uniquevalues/ReservationCache.java | 52 ++ .../uniquevalues/ReservationCacheActor.java | 73 +++ .../uniquevalues/UniqueValueActor.java | 245 ++++++++ .../uniquevalues/UniqueValueException.java | 7 + .../uniquevalues/UniqueValuesService.java | 49 ++ .../uniquevalues/UniqueValuesServiceImpl.java | 587 +++++++++++++++++++ .../uniquevalues/UniqueValuesTable.java | 33 ++ .../uniquevalues/UniqueValuesTableImpl.java | 46 ++ .../src/main/resources/application.conf | 28 + .../src/main/resources/cluster-singleton.conf | 25 + .../collection/guice/TestCollectionModule.java | 17 + .../collection/uniquevalues/AkkaFigTest.java | 41 ++ .../uniquevalues/LocalPreventDupsTest.java | 141 +++++ .../src/test/resources/usergrid.properties | 14 + 20 files changed, 1774 insertions(+), 59 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/52ee2fb7/stack/corepersistence/collection/pom.xml ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/pom.xml b/stack/corepersistence/collection/pom.xml index 1b77735..ad9cefd 100644 --- a/stack/corepersistence/collection/pom.xml +++ b/stack/corepersistence/collection/pom.xml @@ -3,46 +3,82 @@ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <parent> - <artifactId>persistence</artifactId> - <groupId>org.apache.usergrid</groupId> - <version>2.1.1-SNAPSHOT</version> - </parent> - - <modelVersion>4.0.0</modelVersion> - <description>The module for handling all scope I/O</description> - - <artifactId>collection</artifactId> - <name>Usergrid Collection</name> - - <dependencies> - - <!-- Google Guice Integration Test Injectors --> - - <dependency> - <groupId>org.apache.usergrid</groupId> - <artifactId>common</artifactId> - <version>${project.version}</version> - </dependency> - - <!-- tests --> - - <dependency> - <groupId>org.apache.usergrid</groupId> - <artifactId>common</artifactId> - <version>${project.version}</version> - <classifier>tests</classifier> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.mockito</groupId> - <artifactId>mockito-core</artifactId> - <version>${mockito.version}</version> - <scope>test</scope> - </dependency> - - </dependencies> + <parent> + <artifactId>persistence</artifactId> + <groupId>org.apache.usergrid</groupId> + <version>2.1.1-SNAPSHOT</version> + </parent> + + <modelVersion>4.0.0</modelVersion> + <description>The module for handling all scope I/O</description> + + <artifactId>collection</artifactId> + <name>Usergrid Collection</name> + + <dependencies> + + <!-- Google Guice Integration Test Injectors --> + + <dependency> + <groupId>org.apache.usergrid</groupId> + <artifactId>common</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>com.typesafe.akka</groupId> + <artifactId>akka-actor_2.11</artifactId> + <version>2.4.0</version> + </dependency> + + <dependency> + <groupId>com.typesafe.akka</groupId> + <artifactId>akka-remote_2.11</artifactId> + <version>2.4.0</version> + </dependency> + + <dependency> + <groupId>com.typesafe.akka</groupId> + <artifactId>akka-cluster_2.11</artifactId> + <version>2.4.0</version> + </dependency> + + <dependency> + <groupId>com.typesafe.akka</groupId> + <artifactId>akka-cluster-tools_2.11</artifactId> + <version>2.4.0</version> + </dependency> + + <dependency> + <groupId>com.typesafe.akka</groupId> + <artifactId>akka-cluster-metrics_2.11</artifactId> + <version>2.4.0</version> + </dependency> + + <dependency> + <groupId>com.typesafe.akka</groupId> + <artifactId>akka-slf4j_2.11</artifactId> + <version>2.4.0</version> + </dependency> + + <!-- tests --> + + <dependency> + <groupId>org.apache.usergrid</groupId> + <artifactId>common</artifactId> + <version>${project.version}</version> + <classifier>tests</classifier> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + <version>${mockito.version}</version> + <scope>test</scope> + </dependency> + + </dependencies> <!-- <profiles> http://git-wip-us.apache.org/repos/asf/usergrid/blob/52ee2fb7/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java index d788174..3d794d1 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java @@ -20,6 +20,9 @@ package org.apache.usergrid.persistence.collection.guice; import java.util.concurrent.ThreadPoolExecutor; +import org.apache.usergrid.persistence.collection.uniquevalues.AkkaFig; +import org.apache.usergrid.persistence.collection.uniquevalues.UniqueValuesService; +import org.apache.usergrid.persistence.collection.uniquevalues.UniqueValuesServiceImpl; import org.safehaus.guicyfig.GuicyFigModule; import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; @@ -54,6 +57,7 @@ public abstract class CollectionModule extends AbstractModule { protected void configure() { // noinspection unchecked + install( new GuicyFigModule( AkkaFig.class ) ); install( new GuicyFigModule( SerializationFig.class ) ); install( new GuicyFigModule( CollectionSchedulerFig.class ) ); install( new SerializationModule() ); @@ -66,6 +70,8 @@ public abstract class CollectionModule extends AbstractModule { //bind this to our factory install( new GuicyFigModule( EntityCacheFig.class ) ); + bind( UniqueValuesService.class ).to( UniqueValuesServiceImpl.class ); + bind( ChangeLogGenerator.class).to( ChangeLogGeneratorImpl.class); configureMigrationProvider(); http://git-wip-us.apache.org/repos/asf/usergrid/blob/52ee2fb7/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java index a52ee9c..6ba23b6 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java @@ -82,14 +82,25 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag CacheBuilder.newBuilder().maximumSize( 1000 ) .build( new CacheLoader<ApplicationScope, EntityCollectionManager>() { public EntityCollectionManager load( ApplicationScope scope ) { - //create the target EM that will perform logic + //create the target EM that will perform logic final EntityCollectionManager target = new EntityCollectionManagerImpl( - writeStart, writeVerifyUnique, - writeOptimisticVerify, writeCommit, rollback, markStart, markCommit, uniqueCleanup, versionCompact, - entitySerializationStrategy, uniqueValueSerializationStrategy, - mvccLogEntrySerializationStrategy, keyspace, - metricsFactory, serializationFig, - rxTaskScheduler, scope ); + writeStart, + writeVerifyUnique, + writeOptimisticVerify, + writeCommit, + rollback, + markStart, + markCommit, + uniqueCleanup, + versionCompact, + entitySerializationStrategy, + uniqueValueSerializationStrategy, + mvccLogEntrySerializationStrategy, + keyspace, + metricsFactory, + serializationFig, + rxTaskScheduler, + scope ); return target; } @@ -97,17 +108,19 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag @Inject - public EntityCollectionManagerFactoryImpl( final WriteStart writeStart, final WriteUniqueVerify writeVerifyUnique, - final WriteOptimisticVerify writeOptimisticVerify, - final WriteCommit writeCommit, final RollbackAction rollback, - final MarkStart markStart, final MarkCommit markCommit, - final UniqueCleanup uniqueCleanup, final VersionCompact versionCompact, - final SerializationFig serializationFig, final - MvccEntitySerializationStrategy entitySerializationStrategy, - final UniqueValueSerializationStrategy uniqueValueSerializationStrategy, - final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy, - final Keyspace keyspace, final EntityCacheFig entityCacheFig, - final MetricsFactory metricsFactory, @CollectionExecutorScheduler final RxTaskScheduler rxTaskScheduler ) { + public EntityCollectionManagerFactoryImpl( + final WriteStart writeStart, final WriteUniqueVerify writeVerifyUnique, + final WriteOptimisticVerify writeOptimisticVerify, + final WriteCommit writeCommit, final RollbackAction rollback, + final MarkStart markStart, final MarkCommit markCommit, + final UniqueCleanup uniqueCleanup, final VersionCompact versionCompact, + final SerializationFig serializationFig, + final MvccEntitySerializationStrategy entitySerializationStrategy, + final UniqueValueSerializationStrategy uniqueValueSerializationStrategy, + final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy, + final Keyspace keyspace, final EntityCacheFig entityCacheFig, + final MetricsFactory metricsFactory, @CollectionExecutorScheduler + final RxTaskScheduler rxTaskScheduler ) { this.writeStart = writeStart; this.writeVerifyUnique = writeVerifyUnique; @@ -126,6 +139,7 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag this.metricsFactory = metricsFactory; this.rxTaskScheduler = rxTaskScheduler; } + @Override public EntityCollectionManager createCollectionManager(ApplicationScope applicationScope) { Preconditions.checkNotNull(applicationScope); @@ -141,5 +155,4 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag public void invalidate() { ecmCache.invalidateAll(); } - } http://git-wip-us.apache.org/repos/asf/usergrid/blob/52ee2fb7/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/AkkaFig.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/AkkaFig.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/AkkaFig.java new file mode 100644 index 0000000..3bb9fcf --- /dev/null +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/AkkaFig.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.usergrid.persistence.collection.uniquevalues; + + +import org.safehaus.guicyfig.Default; +import org.safehaus.guicyfig.FigSingleton; +import org.safehaus.guicyfig.GuicyFig; +import org.safehaus.guicyfig.Key; + +@FigSingleton +public interface AkkaFig extends GuicyFig { + + String AKKA_HOSTNAME = "collection.akka.hostname"; + + String AKKA_PORT = "collection.akka.port"; + + String AKKA_REGION = "collection.akka.region"; + + String AKKA_REGIONS = "collection.akka.regions"; + + String AKKA_UNIQUE_VALUE_ACTORS = "collection.akka.unique.value.actors"; + + String AKKA_REGION_SEEDS = "collection.akka.region.seeds"; + + String AKKA_REGION_TYPES = "collection.akka.region.types"; + + + /** + * Hostname to be used in Akka configuration. + */ + @Key(AKKA_HOSTNAME) + @Default("localhost") + String getHostname(); + + /** + * local port to be used in Akka configuration. + */ + @Key(AKKA_PORT) + @Default("2551") + int getPort(); + + /** + * Local region to be used in Akka configuration. + */ + @Key(AKKA_REGION) + @Default("us-east") + String getRegion(); + + /** + * Comma-separated list of all regions to be used in Akka configuration. + */ + @Key(AKKA_REGIONS) + @Default("us-east") + String getRegions(); + + /** + * Number of UniqueValueActors to be started on each node + */ + @Key(AKKA_UNIQUE_VALUE_ACTORS) + @Default("300") + int getUniqueValueActors(); + + /** + * Comma-separated lists of seeds each with format {region}:{hostname}:{port} + */ + @Key(AKKA_REGION_SEEDS) + @Default("us-east:localhost:2551") + String getRegionSeeds(); + + /** + * Authoritative regions may be specified for types + * Comma-separated lists of region types each with format {region}:{type} + */ + // TODO: allow this to be set via REST API + @Key(AKKA_REGION_TYPES) + @Default("") + String getRegionTypes(); +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/52ee2fb7/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ClusterSingletonRouter.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ClusterSingletonRouter.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ClusterSingletonRouter.java new file mode 100644 index 0000000..8cd0ab0 --- /dev/null +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ClusterSingletonRouter.java @@ -0,0 +1,37 @@ +package org.apache.usergrid.persistence.collection.uniquevalues; + +import akka.actor.ActorRef; +import akka.actor.Props; +import akka.actor.UntypedActor; +import akka.routing.ConsistentHashingRouter; +import akka.routing.FromConfig; + + +/** + * Uses a consistent hash to route Unique Value requests to UniqueValueActors. + */ +public class ClusterSingletonRouter extends UntypedActor { + + private final ActorRef router; + + + public ClusterSingletonRouter( String injectorName ) { + router = getContext().actorOf( + FromConfig.getInstance().props(Props.create(UniqueValueActor.class, injectorName )), "router"); + } + + @Override + public void onReceive(Object message) { + + if ( message instanceof UniqueValueActor.Request) { + UniqueValueActor.Request request = (UniqueValueActor.Request)message; + + ConsistentHashingRouter.ConsistentHashableEnvelope envelope = + new ConsistentHashingRouter.ConsistentHashableEnvelope( message, request.getRowKey() ); + router.tell( envelope, getSender()); + + } else { + unhandled(message); + } + } +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/52ee2fb7/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/RequestActor.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/RequestActor.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/RequestActor.java new file mode 100644 index 0000000..c27d4e1 --- /dev/null +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/RequestActor.java @@ -0,0 +1,170 @@ +package org.apache.usergrid.persistence.collection.uniquevalues; + +import akka.actor.ActorSelection; +import akka.actor.Address; +import akka.actor.UntypedActor; +import akka.cluster.Cluster; +import akka.cluster.ClusterEvent; +import akka.cluster.Member; +import akka.cluster.MemberStatus; +import org.apache.commons.lang3.RandomStringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; + + +/** + * Once notified of nodes, sends unique propertyValue requests to ClusterSingletonRouter via it's local proxy. + */ +class RequestActor extends UntypedActor { + private static final Logger logger = LoggerFactory.getLogger( RequestActor.class ); + + private final String name = RandomStringUtils.randomAlphanumeric( 4 ); + + private final Set<Address> nodes = new HashSet<>(); + + private final Cluster cluster = Cluster.get(getContext().system()); + private final String routerProxyPath; + + private boolean ready = false; + + + public RequestActor(String routerProxyPath ) { + this.routerProxyPath = routerProxyPath; + } + + // subscribe to cluster changes, MemberEvent + @Override + public void preStart() { + logger.debug("{} role {} address {}:{} starting up, subscribing to cluster events...", name, + cluster.getSelfRoles().iterator().next(), + cluster.readView().selfAddress().host(), + cluster.readView().selfAddress().hostPort()); + cluster.subscribe(getSelf(), ClusterEvent.MemberEvent.class, ClusterEvent.ReachabilityEvent.class); + } + + // re-subscribe when restart + @Override + public void postStop() { + cluster.unsubscribe(getSelf()); + } + + @Override + public void onReceive(Object message) { + + int startSize = nodes.size(); + + if ( message instanceof UniqueValueActor.Request && ready ) { + + // just pick any node, the ClusterSingletonRouter will do the consistent hash routing + List<Address> nodesList = new ArrayList<>( nodes ); + Address address = nodesList.get( ThreadLocalRandom.current().nextInt( nodesList.size() ) ); + ActorSelection service = getContext().actorSelection( address + routerProxyPath ); + service.tell( message, getSender() ); + + } else if ( message instanceof UniqueValueActor.Request && !ready ) { + logger.debug("{} responding with status unknown", name); + + getSender().tell( new UniqueValueActor.Response( + UniqueValueActor.Response.Status.ERROR ) , getSender() ); + + } else if ( message instanceof StatusRequest ) { + if ( ready ) { + getSender().tell( new StatusMessage( name, StatusMessage.Status.READY ), getSender() ); + } else { + getSender().tell( new StatusMessage( name, StatusMessage.Status.INITIALIZING), getSender() ); + } + return; + + } else { + processAsClusterEvent( message ); + } + + if ( logger.isDebugEnabled() && startSize != nodes.size() ) { + logger.debug( "{} now knows {} nodes", name, nodes.size() ); + } + + if (!nodes.isEmpty() && !ready) { + logger.debug( name + " is ready" ); + ready = true; + + } else if (nodes.isEmpty() && ready) { + ready = false; + } + } + + /** + * Process messages about nodes up, down, reachable and unreachable. + */ + private void processAsClusterEvent(Object message) { + + if (message instanceof ClusterEvent.CurrentClusterState) { + ClusterEvent.CurrentClusterState state = (ClusterEvent.CurrentClusterState) message; + nodes.clear(); + for (Member member : state.getMembers()) { + if (member.hasRole("io") && member.status().equals( MemberStatus.up())) { + nodes.add(member.address()); + logger.debug("RequestActor {} received cluster-state member-up for {}", name, member.address()); + } + } + + } else if (message instanceof ClusterEvent.MemberUp) { + ClusterEvent.MemberUp mUp = (ClusterEvent.MemberUp) message; + if (mUp.member().hasRole("io")) { + nodes.add( mUp.member().address() ); + } + logger.debug("{} received member-up for {}", name, mUp.member().address()); + + } else if (message instanceof ClusterEvent.MemberEvent) { + ClusterEvent.MemberEvent other = (ClusterEvent.MemberEvent) message; + nodes.remove(other.member().address()); + + } else if (message instanceof ClusterEvent.UnreachableMember) { + ClusterEvent.UnreachableMember unreachable = (ClusterEvent.UnreachableMember) message; + nodes.remove(unreachable.member().address()); + logger.debug("{} received un-reachable for {}", name, unreachable.member().address()); + + } else if (message instanceof ClusterEvent.ReachableMember) { + ClusterEvent.ReachableMember reachable = (ClusterEvent.ReachableMember) message; + if (reachable.member().hasRole("io")) { + nodes.add( reachable.member().address() ); + } + logger.debug("{} received reachable for {}", name, reachable.member().address()); + + } else { + logger.error("{}: unhandled message: {}", name, message.toString()); + unhandled(message); + } + } + + /** + * RequestAction responds to StatusRequests. + */ + static class StatusRequest implements Serializable { } + + /** + * RequestActor responds with, and some times unilaterally sends StatusMessages. + */ + static class StatusMessage implements Serializable { + final String name; + public enum Status { INITIALIZING, READY } + final Status status; + public StatusMessage(String name, Status status) { + this.name = name; + this.status = status; + } + public String getName() { + return name; + } + public boolean isReady() { + return status.equals( Status.READY ); + } + } +} + http://git-wip-us.apache.org/repos/asf/usergrid/blob/52ee2fb7/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ReservationCache.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ReservationCache.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ReservationCache.java new file mode 100644 index 0000000..d5c67c3 --- /dev/null +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ReservationCache.java @@ -0,0 +1,52 @@ +package org.apache.usergrid.persistence.collection.uniquevalues; + +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheStats; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.TimeUnit; + + +// cannot be a Guice singleton, must be shared across injectors +// @com.google.inject.Singleton +public class ReservationCache { + private static final Logger logger = LoggerFactory.getLogger( RequestActor.class ); + + Cache<String, UniqueValueActor.Reservation> cache = CacheBuilder.newBuilder() + .maximumSize(1000) + .concurrencyLevel( 300 ) + .expireAfterWrite(30, TimeUnit.SECONDS) + .recordStats() + .build(); + + private static ReservationCache instance = new ReservationCache(); + + public static ReservationCache getInstance() { + return instance; + } + + private ReservationCache() {} + + public UniqueValueActor.Reservation get( String rowKey ) { + UniqueValueActor.Reservation res = cache.getIfPresent( rowKey ); + return res; + } + + public void cacheReservation( UniqueValueActor.Reservation reservation ) { + cache.put( reservation.getRowKey(), reservation ); + } + + public void cancelReservation( UniqueValueActor.Cancellation cancellation ) { + cache.invalidate( cancellation.getRowKey() ); + } + + public CacheStats getStats() { + return cache.stats(); + } + + public long getSize() { + return cache.size(); + } +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/52ee2fb7/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ReservationCacheActor.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ReservationCacheActor.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ReservationCacheActor.java new file mode 100644 index 0000000..add33fa --- /dev/null +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ReservationCacheActor.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.usergrid.persistence.collection.uniquevalues; + +import akka.actor.ActorRef; +import akka.actor.UntypedActor; +import akka.cluster.pubsub.DistributedPubSub; +import akka.cluster.pubsub.DistributedPubSubMediator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Updates local unique values cache based on reservations and cancellations. + */ +public class ReservationCacheActor extends UntypedActor { + private static final Logger logger = LoggerFactory.getLogger( ReservationCacheActor.class ); + + int reservationCount = 0; + int cancellationCount = 0; + + public ReservationCacheActor(String injectorName ) { + + logger.info("Starting for {}", injectorName); + + // subscribe to the topic named "content" + ActorRef mediator = DistributedPubSub.get(getContext().system()).mediator(); + mediator.tell(new DistributedPubSubMediator.Subscribe("content", getSelf()), getSelf()); + } + + public void onReceive( Object msg ) { + + if ( msg instanceof UniqueValueActor.Reservation ) { + UniqueValueActor.Reservation res = (UniqueValueActor.Reservation)msg; + ReservationCache.getInstance().cacheReservation( res ); + + if ( ++reservationCount % 10 == 0 ) { + logger.debug("Received {} reservations cache size {}", + reservationCount, ReservationCache.getInstance().getSize()); + } + + } else if ( msg instanceof UniqueValueActor.Cancellation ) { + UniqueValueActor.Cancellation can = (UniqueValueActor.Cancellation)msg; + ReservationCache.getInstance().cancelReservation( can ); + + if ( ++cancellationCount % 10 == 0 ) { + logger.debug("Received {} cancellations", cancellationCount); + } + + } else if (msg instanceof DistributedPubSubMediator.SubscribeAck) { + logger.debug( "subscribing" ); + + } else { + unhandled( msg ); + } + } +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/52ee2fb7/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java new file mode 100644 index 0000000..faf0433 --- /dev/null +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java @@ -0,0 +1,245 @@ +package org.apache.usergrid.persistence.collection.uniquevalues; + +import akka.actor.ActorRef; +import akka.actor.UntypedActor; +import akka.cluster.pubsub.DistributedPubSub; +import akka.cluster.pubsub.DistributedPubSubMediator; +import org.apache.commons.lang3.RandomStringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.UUID; + +public class UniqueValueActor extends UntypedActor { + private static final Logger logger = LoggerFactory.getLogger( UniqueValueActor.class ); + + private final String name = RandomStringUtils.randomAlphanumeric( 4 ); + + //private MetricsService metricsService; + + private UniqueValuesTable table = new UniqueValuesTableImpl(); + + private ActorRef mediator = DistributedPubSub.get(getContext().system()).mediator(); + + private int count = 0; + + + public UniqueValueActor( String injectorName ) { + +// UniqueValuesService uniqueValuesService = +// GuiceModule.getInjector( injectorName ).getInstance( UniqueValuesService.class ); +// +// terminateOnError = Boolean.parseBoolean( uniqueValuesService.getProperties() +// .getProperty( "akka.unique-value-actor-terminate-on-error", "false" ) ); +// +// chaos = Boolean.parseBoolean( uniqueValuesService.getProperties() +// .getProperty( "akka.test.chaos", "false" ) ); + +// metricsService = +// GuiceModule.getInjector( injectorName ).getInstance( MetricsService.class ); + } + + @Override + public void onReceive(Object message) { + + if ( message instanceof Request ) { + Request req = (Request) message; + + count++; + if (count % 10 == 0) { + logger.debug( "UniqueValueActor {} processed {} requests", name, count ); + } + } + + if ( message instanceof Reservation ) { + Reservation res = (Reservation) message; + +// final Timer.Context context = metricsService.getReservationTimer().time(); + + try { + UUID owner = table.lookupOwner( res.getType(), res.getPropertyName(), res.getPropertyValue() ); + + if ( owner != null && owner.equals( res.getUuid() )) { + // sender already owns this unique value + getSender().tell( new Response( Response.Status.IS_UNIQUE ), getSender() ); + return; + + } else if ( owner != null && !owner.equals( res.getUuid() )) { + // tell sender value is not unique + getSender().tell( new Response( Response.Status.NOT_UNIQUE ), getSender() ); + return; + } + + table.reserve( res.getUuid(), res.getType(), res.getPropertyName(), res.getPropertyValue() ); + + getSender().tell( new Response( Response.Status.IS_UNIQUE ), getSender() ); + + mediator.tell( new DistributedPubSubMediator.Publish( "content", + new Reservation( res ) ), getSelf() ); + + } catch (Throwable t) { + + getSender().tell( new Response( Response.Status.ERROR ), getSender() ); + logger.error( "Error processing request", t ); + + + } finally { +// context.stop(); + } + + } else if ( message instanceof Confirmation) { + Confirmation commit = (Confirmation) message; + +// final Timer.Context context = metricsService.getCommitmentTimer().time(); + + try { + UUID owner = table.lookupOwner( commit.getType(), commit.getPropertyName(), commit.getPropertyValue() ); + + if ( owner != null && !owner.equals( commit.getUuid() )) { + // cannot reserve, somebody else owns the unique value + getSender().tell( new Response( Response.Status.NOT_UNIQUE ), getSender() ); + return; + + } else if ( owner == null ) { + // cannot commit without first reserving + getSender().tell( new Response( Response.Status.BAD_REQUEST ), getSender() ); + return; + } + + table.commit( commit.getUuid(), commit.getType(), commit.getPropertyName(), commit.getPropertyValue() ); + + getSender().tell( new Response( Response.Status.IS_UNIQUE ), getSender() ); + + mediator.tell( new DistributedPubSubMediator.Publish( "content", + new Reservation( commit ) ), getSelf() ); + + } catch (Throwable t) { + getSender().tell( new Response( Response.Status.ERROR ), getSender() ); + logger.error( "Error processing request", t ); + + } finally { +// context.stop(); + } + + + } else if ( message instanceof Cancellation ) { + Cancellation can = (Cancellation) message; + + try { + UUID owner = table.lookupOwner( can.getType(), can.getPropertyName(), can.getPropertyValue() ); + + if ( owner != null && !owner.equals( can.getUuid() )) { + // cannot cancel, somebody else owns the unique value + getSender().tell( new Response( Response.Status.NOT_UNIQUE ), getSender() ); + return; + + } else if ( owner == null ) { + // cannot cancel unique value that does not exist + getSender().tell( new Response( Response.Status.BAD_REQUEST ), getSender() ); + return; + } + + table.cancel( can.getType(), can.getPropertyName(), can.getPropertyValue() ); + + getSender().tell( new Response( Response.Status.SUCCESS ), getSender() ); + + mediator.tell( new DistributedPubSubMediator.Publish( "content", + new Reservation( can ) ), getSelf() ); + + } catch (Throwable t) { + getSender().tell( new Response( Response.Status.ERROR ), getSender() ); + logger.error( "Error processing request", t ); + } + + } else { + unhandled( message ); + } + } + + + /** + * UniqueValue actor receives and processes Requests. + */ + public abstract static class Request implements Serializable { + final UUID uuid; + final String type; + final String propertyName; + final String propertyValue; + final String rowKey; + + public Request(UUID uuid, String type, String propertyName, String value) { + this.uuid = uuid; + this.type = type; + this.propertyName = propertyName; + this.propertyValue = value; + this.rowKey = getType() + ":" + getPropertyName() + ":" + getPropertyValue(); + } + public Request( Request req ) { + this.uuid = req.uuid; + this.type = req.type; + this.propertyName = req.propertyName; + this.propertyValue = req.propertyValue; + this.rowKey = getType() + ":" + getPropertyName() + ":" + getPropertyValue(); + + } + public String getRowKey() { + return rowKey; + } + public UUID getUuid() { + return uuid; + } + public String getType() { + return type; + } + public String getPropertyName() { + return propertyName; + } + public String getPropertyValue() { + return propertyValue; + } + } + + /** + * UniqueValue actor creates and sends Responses. + */ + public static class Response implements Serializable { + public enum Status { IS_UNIQUE, NOT_UNIQUE, SUCCESS, ERROR, BAD_REQUEST } + final Status status; + + public Response(Status status) { + this.status = status; + } + public Status getStatus() { + return status; + } + } + + public static class Reservation extends Request implements Serializable { + public Reservation( Request req ) { + super( req ); + } + public Reservation(UUID uuid, String type, String username, String value) { + super( uuid, type, username, value ); + } + } + + public static class Cancellation extends Request implements Serializable { + public Cancellation( Request req ) { + super( req ); + } + public Cancellation(UUID uuid, String type, String username, String value) { + super( uuid, type, username, value ); + } + } + + public static class Confirmation extends Request implements Serializable { + public Confirmation(Request req ) { + super( req ); + } + public Confirmation(UUID uuid, String type, String username, String value) { + super( uuid, type, username, value ); + } + } + +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/52ee2fb7/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueException.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueException.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueException.java new file mode 100644 index 0000000..5df8237 --- /dev/null +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueException.java @@ -0,0 +1,7 @@ +package org.apache.usergrid.persistence.collection.uniquevalues; + +public class UniqueValueException extends Exception { + public UniqueValueException(String message) { + super( message ); + } +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/52ee2fb7/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesService.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesService.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesService.java new file mode 100644 index 0000000..2219df6 --- /dev/null +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesService.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.usergrid.persistence.collection.uniquevalues; + + +import org.apache.usergrid.persistence.model.entity.Entity; + +/** + * Service that reserves and confirms unique values. + */ +public interface UniqueValuesService { + + /** + * Check that unique values are unique and reserve them for a limited time. + * If the reservations are not confirmed, they will expire. + */ + void reserveUniqueValues( Entity entity ) throws UniqueValueException; + + /** + * Confirm unique values that were reserved earlier. + */ + void confirmUniqueValues( Entity entity ) throws UniqueValueException; + + /** + * For test purposes only. + */ + void start( String hostname, Integer port, String region ); + + /** + * For test purposes only. + */ + void waitForRequestActors(); +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/52ee2fb7/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java new file mode 100644 index 0000000..8897091 --- /dev/null +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java @@ -0,0 +1,587 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.usergrid.persistence.collection.uniquevalues; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.PoisonPill; +import akka.actor.Props; +import akka.cluster.singleton.ClusterSingletonManager; +import akka.cluster.singleton.ClusterSingletonManagerSettings; +import akka.cluster.singleton.ClusterSingletonProxy; +import akka.cluster.singleton.ClusterSingletonProxySettings; +import akka.pattern.Patterns; +import akka.util.Timeout; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.ListMultimap; +import com.google.inject.Inject; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import org.apache.commons.lang3.StringUtils; +import org.apache.usergrid.persistence.model.entity.Entity; +import org.apache.usergrid.persistence.model.field.Field; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.Await; +import scala.concurrent.Future; + +import java.util.*; +import java.util.concurrent.TimeUnit; + + +public class UniqueValuesServiceImpl implements UniqueValuesService { + private static final Logger logger = LoggerFactory.getLogger( UniqueValuesServiceImpl.class ); + + @Inject + AkkaFig akkaFig; + + private String hostname; + private Integer port; + private String currentRegion; + + private Map<String, ActorRef> requestActorsByRegion; + private Map<String, String> regionsByType = new HashMap<>(); + +// private final MetricRegistry metrics = new MetricRegistry(); +// +// private final Timer getTimer = metrics.timer( "get" ); +// private final Timer saveTimer = metrics.timer( "save" ); +// +// private final Counter cacheCounter = metrics.counter( "cache" ); +// private final Counter dupCounter = metrics.counter( "duplicates" ); +// +// private final Timer reservationTimer = metrics.timer( "reservation" ); +// private final Timer commitmentTimer = metrics.timer( "commitment" ); + + private ReservationCache reservationCache; + + private final boolean disableUniqueValues; + + + public UniqueValuesServiceImpl() { + this.reservationCache = ReservationCache.getInstance(); + this.disableUniqueValues = false; + } + + + /** + * Init Akka ActorSystems and wait for request actors to start. + */ + public void start() { + this.hostname = akkaFig.getHostname(); + this.port = akkaFig.getPort(); + this.currentRegion = akkaFig.getRegion(); + + initAkka(); + waitForRequestActors(); + } + + /** + * For testing purposes only; does not wait for request actors to start. + */ + public void start( String hostname, Integer port, String currentRegion ) { + this.hostname = hostname; + this.port = port; + this.currentRegion = currentRegion; + + initAkka(); + } + + private Map<String, ActorRef> getRequestActorsByRegion() { + return requestActorsByRegion; + } + + private Map<String, String> getRegionsByType() { + return regionsByType; + } + +// public Counter getDupCounter() { +// return dupCounter; +// } +// +// public Counter getCacheCounter() { +// return cacheCounter; +// } +// +// public Timer getReservationTimer() { +// return reservationTimer; +// } +// +// public Timer getCommitmentTimer() { +// return commitmentTimer; +// } +// +// public Timer getSaveTimer() { +// return saveTimer; +// } +// +// public Timer getGetTimer() { +// return getTimer; +// } + + private void initAkka() { + logger.info("Initializing Akka"); + + // Create one actor system with request actor for each region + + if ( StringUtils.isEmpty( hostname )) { + throw new RuntimeException( "No value specified for akka.hostname"); + } + + if ( StringUtils.isEmpty( currentRegion )) { + throw new RuntimeException( "No value specified for akka.region"); + } + + String regionsValue = akkaFig.getRegions(); + if ( StringUtils.isEmpty( regionsValue )) { + throw new RuntimeException( "No value specified for akka.regions"); + } + + String[] regions = regionsValue.split( "," ); + for ( String region : regions ) { + + akkaFig.getKeyByMethod( "" ); + + String typesValue = akkaFig.getRegionTypes(); + String[] regionTypes = StringUtils.isEmpty( typesValue ) ? new String[0] : typesValue.split(","); + for ( String regionType : regionTypes ) { + String[] parts = regionType.split(":"); + String typeRegion = parts[0]; + String type = parts[1]; + this.regionsByType.put( type, typeRegion ); + } + } + + final Map<String, ActorSystem> systemMap = new HashMap<>(); + + ActorSystem localSystem = createClusterSingletonProxies( readClusterSingletonConfigs(), systemMap ); + + createRequestActors( systemMap ); + + subscribeToReservations( localSystem, systemMap ); + } + + private void subscribeToReservations( ActorSystem localSystem, Map<String, ActorSystem> systemMap ) { + + for ( String region : systemMap.keySet() ) { + ActorSystem actorSystem = systemMap.get( region ); + if ( !actorSystem.equals( localSystem ) ) { + logger.info("Starting ReservationCacheUpdater for {}", region ); + actorSystem.actorOf( Props.create( ReservationCacheActor.class, region ), "subscriber"); + } + } + } + + /** + * Create ActorSystem and ClusterSingletonProxy for every region. + * Create ClusterSingletonManager for the current region. + * + * @param configMap Configurations to be used to create ActorSystems + * @param systemMap Map of ActorSystems created by this method + * + * @return ActorSystem for this region. + */ + private ActorSystem createClusterSingletonProxies( + Map<String, Config> configMap, Map<String, ActorSystem> systemMap ) { + + ActorSystem localSystem = null; + + for ( String region : configMap.keySet() ) { + Config config = configMap.get( region ); + + ActorSystem system = ActorSystem.create( "ClusterSystem", config ); + systemMap.put( region, system ); + + // cluster singletons only run role "io" nodes and NOT on "client" nodes of other regions + if ( currentRegion.equals( region ) ) { + + localSystem = system; + + // create cluster singleton supervisor for actor system + ClusterSingletonManagerSettings settings = + ClusterSingletonManagerSettings.create( system ).withRole("io"); + system.actorOf( ClusterSingletonManager.props( + Props.create( ClusterSingletonRouter.class, region ), + PoisonPill.getInstance(), settings ), "uvRouter"); + } + + // create proxy for sending messages to singleton + ClusterSingletonProxySettings proxySettings = + ClusterSingletonProxySettings.create( system ).withRole("io"); + system.actorOf( ClusterSingletonProxy.props( "/user/uvRouter", proxySettings ), "uvProxy" ); + } + + return localSystem; + } + + /** + * Create RequestActor for each region. + * + * @param systemMap Map of regions to ActorSystems. + */ + private void createRequestActors( Map<String, ActorSystem> systemMap ) { + + requestActorsByRegion = new HashMap<>(); + + for ( String region : systemMap.keySet() ) { + + // Each RequestActor needs to know path to ClusterSingletonProxy and region + ActorRef requestActor = systemMap.get( region ).actorOf( + Props.create( RequestActor.class, "/user/uvProxy" ), "requestActor" ); + + requestActorsByRegion.put( region, requestActor ); + } + } + + public void waitForRequestActors() { + + for ( String region : requestActorsByRegion.keySet() ) { + ActorRef ra = requestActorsByRegion.get( region ); + waitForRequestActor( ra ); + } + } + + private void waitForRequestActor( ActorRef ra ) { + + logger.info( "Waiting on request actor {}...", ra.path() ); + + boolean started = false; + int retries = 0; + int maxRetries = 60; + while (retries < maxRetries) { + Timeout t = new Timeout( 10, TimeUnit.SECONDS ); + + Future<Object> fut = Patterns.ask( ra, new RequestActor.StatusRequest(), t ); + try { + RequestActor.StatusMessage result = (RequestActor.StatusMessage) Await.result( fut, t.duration() ); + + if (result.status.equals( RequestActor.StatusMessage.Status.READY )) { + started = true; + break; + } + logger.info( "Waiting for request actor {} region {} ({}s)", ra.path(), currentRegion, retries ); + Thread.sleep( 1000 ); + + } catch (Exception e) { + logger.error( "Error: Timeout waiting for requestActor" ); + } + retries++; + } + + if (started) { + logger.info( "RequestActor has started" ); + } else { + throw new RuntimeException( "RequestActor did not start in time" ); + } + } + + + /** + * Read configuration and create a Config for each region. + * + * @return Map of regions to Configs. + */ + private Map<String, Config> readClusterSingletonConfigs() { + + Map<String, Config> configs = new HashMap<>(); + + ListMultimap<String, String> seedsByRegion = ArrayListMultimap.create(); + + String[] regionSeeds = akkaFig.getRegionSeeds().split( "," ); + + try { + + if ( port != null ) { + + // we are testing + String seed = "akka.tcp://ClusterSystem@" + hostname + ":" + port; + seedsByRegion.put( currentRegion, seed ); + + } else { + + for (String regionSeed : regionSeeds) { + + String[] parts = regionSeed.split( ":" ); + String region = parts[0]; + String hostname = parts[1]; + String regionPortString = parts[2]; + + // all seeds in same region must use same port + // we assume 0th seed has the right port + final Integer regionPort; + + if (port == null) { + // we assume 0th seed has the right port + regionPort = Integer.parseInt( regionPortString ); + } else { + regionPort = port; // unless we are testing + } + + String seed = "akka.tcp://ClusterSystem@" + hostname + ":" + regionPort; + + seedsByRegion.put( region, seed ); + } + + if (seedsByRegion.keySet().isEmpty()) { + throw new RuntimeException( "No seeds listed in 'parsing collection.akka.region.seeds' property." ); + } + } + + int numInstancesPerNode = akkaFig.getUniqueValueActors(); + + for ( String region : seedsByRegion.keySet() ) { + + List<String> seeds = seedsByRegion.get( region ); + + final Integer regionPort; + + if (port == null) { + // we assume 0th seed has the right port + int lastColon = seeds.get(0).lastIndexOf(":") + 1; + regionPort = Integer.parseInt( seeds.get(0).substring( lastColon )); + } else { + regionPort = port; // unless we are testing + } + + // cluster singletons only run role "io" nodes and NOT on "client" nodes of other regions + String clusterRole = currentRegion.equals( region ) ? "io" : "client"; + + logger.info( "Config for region {} is:\npoc Akka Hostname {}\npoc Akka Seeds {}\n" + + "poc Akka Port {}\npoc UniqueValueActors per node {}", + region, hostname, seeds, port, numInstancesPerNode ); + + Map<String, Object> configMap = new HashMap<String, Object>() {{ + put( "akka", new HashMap<String, Object>() {{ + put( "remote", new HashMap<String, Object>() {{ + put( "netty.tcp", new HashMap<String, Object>() {{ + put( "hostname", hostname ); + put( "bind-hostname", hostname ); + put( "port", regionPort ); + }} ); + }} ); + put( "cluster", new HashMap<String, Object>() {{ + put( "max-nr-of-instances-per-node", numInstancesPerNode ); + put( "roles", Collections.singletonList(clusterRole) ); + put( "seed-nodes", new ArrayList<String>() {{ + for (String seed : seeds) { + add( seed ); + } + }} ); + }} ); + put( "actor", new HashMap<String, Object>() {{ + put( "deployment", new HashMap<String, Object>() {{ + put( "/uvRouter/singleton/router", new HashMap<String, Object>() {{ + put( "cluster", new HashMap<String, Object>() {{ + //put( "roles", Collections.singletonList(role) ); + put( "max-nr-of-instances-per-node", numInstancesPerNode ); + }} ); + }} ); + }} ); + }} ); + }} ); + }}; + + Config config = ConfigFactory + .parseMap( configMap ) + .withFallback( ConfigFactory.parseString( "akka.cluster.roles = [io]" ) ) + .withFallback( ConfigFactory.load( "cluster-singleton" ) ); + + configs.put( region, config ); + } + + } catch ( Exception e ) { + throw new RuntimeException("Error 'parsing collection.akka.region.seeds' property", e ); + } + + return configs; + } + + + + @Override + public void reserveUniqueValues(Entity entity) throws UniqueValueException { + + try { + for (Field field : entity.getFields()) { + if (field.isUnique()) { + reserveUniqueField( entity, field.getName(), field.getValue().toString() ); + } + } + + } catch ( UniqueValueException e ) { + + for (Field field : entity.getFields()) { + try { + cancelUniqueField( entity, field.getName(), field.getValue().toString() ); + } catch (Throwable ignored) { + logger.debug( "Error canceling unique field", ignored ); + } + } + throw e; + } + + } + + + @Override + public void confirmUniqueValues(Entity entity) throws UniqueValueException { + + try { + for (Field field : entity.getFields()) { + if (field.isUnique()) { + confirmUniqueField( entity, field.getName(), field.getValue().toString() ); + } + } + + } catch ( UniqueValueException e ) { + + for (Field field : entity.getFields()) { + try { + cancelUniqueField( entity, field.getName(), field.getValue().toString() ); + } catch (Throwable ignored) { + logger.debug( "Error canceling unique field", ignored ); + } + } + throw e; + } + + } + + + private void reserveUniqueField( + Entity entity, String propertyName, String propertyValue ) throws UniqueValueException { + + String region = getRegionsByType().get("user"); + ActorRef requestActor = getRequestActorsByRegion().get(region); + + if ( requestActor == null ) { + throw new RuntimeException( "No request actor for type, cannot verify unique fields!" ); + } + + UniqueValueActor.Request request = new UniqueValueActor.Reservation( + entity.getId().getUuid(), "user", propertyName, propertyValue ); + + UniqueValueActor.Reservation res = reservationCache.get( request.getRowKey() ); +// if ( res != null ) { +// getCacheCounter().inc(); +// } + if ( res != null && !res.getUuid().equals( request.getUuid() )) { + throw new UniqueValueException( "Error property not unique (cache)" ); + } + + sendUniqueValueRequest( entity, requestActor, request ); + } + + private void confirmUniqueField( + Entity entity, String propertyName, String propertyValue ) throws UniqueValueException { + + String region = getRegionsByType().get("user"); + ActorRef requestActor = getRequestActorsByRegion().get(region); + + if ( requestActor == null ) { + throw new RuntimeException( "No request actor for type, cannot verify unique fields!" ); + } + + UniqueValueActor.Confirmation request = new UniqueValueActor.Confirmation( + entity.getId().getUuid(), "user", propertyName, propertyValue ); + + sendUniqueValueRequest( entity, requestActor, request ); + } + + private void cancelUniqueField( + Entity entity, String propertyName, String propertyValue ) throws UniqueValueException { + + ActorRef requestActor = lookupRequestActorForType( "user" ); + + if ( requestActor == null ) { + throw new RuntimeException( "No request actor for type, cannot verify unique fields!" ); + } + + UniqueValueActor.Confirmation request = new UniqueValueActor.Confirmation( + entity.getId().getUuid(), "user", propertyName, propertyValue ); + + requestActor.tell( request, null ); + } + + private ActorRef lookupRequestActorForType( String type ) { + String region = getRegionsByType().get( type ); + if ( region == null ) { + throw new RuntimeException( "No region specified for type: " + type ); + } + ActorRef requestActor = getRequestActorsByRegion().get(region); + if ( requestActor == null ) { + throw new RuntimeException( "No request actor available for region: " + region ); + } + return requestActor; + } + + private void sendUniqueValueRequest( + Entity entity, ActorRef requestActor, UniqueValueActor.Request request) throws UniqueValueException { + + int maxRetries = 5; + int retries = 0; + + UniqueValueActor.Response response = null; + while ( retries++ < maxRetries ) { + try { + Timeout t = new Timeout( 1, TimeUnit.SECONDS ); + + // ask RequestActor and wait (up to timeout) for response + + Future<Object> fut = Patterns.ask( requestActor, request, t ); + response = (UniqueValueActor.Response) Await.result( fut, t.duration() ); + + if ( response != null && ( + response.getStatus().equals( UniqueValueActor.Response.Status.IS_UNIQUE ) + || response.getStatus().equals( UniqueValueActor.Response.Status.NOT_UNIQUE ))) { + if ( retries > 1 ) { + logger.debug("IS_UNIQUE after retrying {} for entity {} rowkey {}", + retries, entity.getId().getUuid(), request.getRowKey()); + } + break; + + } else if ( response != null ) { + logger.debug("ERROR status retrying {} entity {} rowkey {}", + retries, entity.getId().getUuid(), request.getRowKey()); + } else { + logger.debug("Timed-out retrying {} entity {} rowkey", + retries, entity.getId().getUuid(), request.getRowKey()); + } + + } catch ( Exception e ) { + logger.debug("{} caused retry {} for entity {} rowkey {}", + e.getClass().getSimpleName(), retries, entity.getId().getUuid(), request.getRowKey()); + } + } + + if ( response == null || response.getStatus().equals( UniqueValueActor.Response.Status.ERROR )) { + logger.debug("ERROR after retrying {} for entity {} rowkey {}", + retries, entity.getId().getUuid(), request.getRowKey()); + + // should result in an HTTP 503 + throw new RuntimeException( "Error verifying unique value after " + retries + " retries"); + } + + if ( response.getStatus().equals( UniqueValueActor.Response.Status.NOT_UNIQUE )) { + + // should result in an HTTP 409 (conflict) + throw new UniqueValueException( "Error property not unique" ); + } + } +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/52ee2fb7/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesTable.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesTable.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesTable.java new file mode 100644 index 0000000..4309eb0 --- /dev/null +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesTable.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.usergrid.persistence.collection.uniquevalues; + +import java.util.UUID; + + +public interface UniqueValuesTable { + + UUID lookupOwner(String entityType, String propertyName, String propertyValue); + + void reserve(UUID owner, String entityType, String propertyName, String propertyValue); + + void commit(UUID owner, String entityType, String propertyName, String propertyValue); + + void cancel(String entityType, String propertyName, String propertyValue); +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/52ee2fb7/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesTableImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesTableImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesTableImpl.java new file mode 100644 index 0000000..ee3d621 --- /dev/null +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesTableImpl.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.usergrid.persistence.collection.uniquevalues; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.UUID; + + +public class UniqueValuesTableImpl implements UniqueValuesTable { + private static final Logger logger = LoggerFactory.getLogger( UniqueValuesTableImpl.class ); + + @Override + public UUID lookupOwner(String entityType, String propertyName, String propertyValue) { + return null; + } + + @Override + public void reserve(UUID owner, String entityType, String propertyName, String propertyValue) { + } + + @Override + public void commit(UUID owner, String entityType, String propertyName, String propertyValue) { + } + + @Override + public void cancel(String entityType, String propertyName, String propertyValue) { + } +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/52ee2fb7/stack/corepersistence/collection/src/main/resources/application.conf ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/resources/application.conf b/stack/corepersistence/collection/src/main/resources/application.conf new file mode 100644 index 0000000..93854f9 --- /dev/null +++ b/stack/corepersistence/collection/src/main/resources/application.conf @@ -0,0 +1,28 @@ +akka { + + loggers = ["akka.event.slf4j.Slf4jLogger"] + loglevel = "ERROR" + logging-filter = "akka.event.slf4j.Slf4jLoggingFilter" + + actor { + provider = "akka.cluster.ClusterActorRefProvider" + } + + remote { + log-remote-lifecycle-events = off + netty.tcp { + hostname = "127.0.0.1" + port = 0 + } + } +} + +# Disable legacy metrics in akka-cluster. +akka.cluster.metrics.enabled=off + +# Enable metrics extension in akka-cluster-metrics. +akka.extensions=["akka.cluster.metrics.ClusterMetricsExtension", "akka.cluster.pubsub.DistributedPubSub"] + +# Sigar native library extract location during tests. +# Note: use per-jvm-instance folder when running multiple jvm on one host. +akka.cluster.metrics.native-library-extract-folder=${user.dir}/target/native http://git-wip-us.apache.org/repos/asf/usergrid/blob/52ee2fb7/stack/corepersistence/collection/src/main/resources/cluster-singleton.conf ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/resources/cluster-singleton.conf b/stack/corepersistence/collection/src/main/resources/cluster-singleton.conf new file mode 100644 index 0000000..907aebb --- /dev/null +++ b/stack/corepersistence/collection/src/main/resources/cluster-singleton.conf @@ -0,0 +1,25 @@ +include "application" + +akka.actor.deployment { + /uvRouter/singleton/router { + router = consistent-hashing-pool + cluster { + enabled = on + allow-local-routees = on + + # singleton will only run on nodes with role "io" + use-role = io + + # more forgiving failure detector + failure-detector { + threshold = 10 + acceptable-heartbeat-pause = 3 s + heartbeat-interval = 1 s + heartbeat-request { + expected-response-after = 3 s + } + } + + } + } +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/52ee2fb7/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/guice/TestCollectionModule.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/guice/TestCollectionModule.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/guice/TestCollectionModule.java index 5718e4f..dd618a9 100644 --- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/guice/TestCollectionModule.java +++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/guice/TestCollectionModule.java @@ -20,6 +20,8 @@ package org.apache.usergrid.persistence.collection.guice; +import com.google.inject.Guice; +import com.google.inject.Injector; import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope; import org.apache.usergrid.persistence.core.guice.CommonModule; import org.apache.usergrid.persistence.core.guice.TestModule; @@ -28,6 +30,9 @@ import org.apache.usergrid.persistence.core.migration.data.TestMigrationDataProv import com.google.inject.TypeLiteral; +import java.util.HashMap; +import java.util.Map; + public class TestCollectionModule extends TestModule { @@ -51,4 +56,16 @@ public class TestCollectionModule extends TestModule { // install(new MaxMigrationModule()); } + + + private static Map<String, Injector> injectorsByName = new HashMap<>(); + + public static Injector getInjector( String name ) { + Injector i = injectorsByName.get( name ); + if ( i == null ) { + i = Guice.createInjector( new TestCollectionModule() ); + } + return i; + } + } http://git-wip-us.apache.org/repos/asf/usergrid/blob/52ee2fb7/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/uniquevalues/AkkaFigTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/uniquevalues/AkkaFigTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/uniquevalues/AkkaFigTest.java new file mode 100644 index 0000000..59f76de --- /dev/null +++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/uniquevalues/AkkaFigTest.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.usergrid.persistence.collection.uniquevalues; + +import com.google.inject.Inject; +import org.apache.usergrid.persistence.collection.guice.TestCollectionModule; +import org.apache.usergrid.persistence.core.test.ITRunner; +import org.apache.usergrid.persistence.core.test.UseModules; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; + + +@RunWith( ITRunner.class ) +@UseModules( TestCollectionModule.class ) +public class AkkaFigTest { + + @Inject + AkkaFig akkaFig; + + @Test + public void testBasicOperation() { + Assert.assertNotNull( akkaFig ); + } +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/52ee2fb7/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/uniquevalues/LocalPreventDupsTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/uniquevalues/LocalPreventDupsTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/uniquevalues/LocalPreventDupsTest.java new file mode 100644 index 0000000..bee47eb --- /dev/null +++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/uniquevalues/LocalPreventDupsTest.java @@ -0,0 +1,141 @@ +package org.apache.usergrid.persistence.collection.uniquevalues; + +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.Multimap; +import com.google.common.collect.Multimaps; +import com.google.inject.Inject; +import org.apache.usergrid.persistence.collection.EntityCollectionManager; +import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; +import org.apache.usergrid.persistence.collection.guice.TestCollectionModule; +import org.apache.usergrid.persistence.core.guice.MigrationManagerRule; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl; +import org.apache.usergrid.persistence.core.test.ITRunner; +import org.apache.usergrid.persistence.core.test.UseModules; +import org.apache.usergrid.persistence.model.entity.Entity; +import org.apache.usergrid.persistence.model.entity.SimpleId; +import org.apache.usergrid.persistence.model.field.StringField; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import rx.Observable; + +import java.util.Collection; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Prevent dups test that uses UserManager and not REST. + */ +@RunWith( ITRunner.class ) +@UseModules( TestCollectionModule.class ) +public class LocalPreventDupsTest { + private static final Logger logger = LoggerFactory.getLogger( LocalPreventDupsTest.class ); + + @Inject + private EntityCollectionManagerFactory factory; + + @Inject + @Rule + public MigrationManagerRule migrationManagerRule; + + + private static final AtomicInteger successCounter = new AtomicInteger( 0 ); + private static final AtomicInteger errorCounter = new AtomicInteger( 0 ); + + @Test + public void testBasicOperation() throws Exception { + + UniqueValuesService appEast1 = + TestCollectionModule.getInjector( "us-east" ).getInstance( UniqueValuesService.class ); + appEast1.start("127.0.0.1", 2551, "us-east"); + + UniqueValuesService appEast2 = + TestCollectionModule.getInjector( "us-east" ).getInstance( UniqueValuesService.class ); + appEast2.start("127.0.0.1", 2552, "us-east"); + + appEast1.waitForRequestActors(); + appEast2.waitForRequestActors(); + + int numUsers = 100; + Multimap<String, Entity> usersCreated = generateDuplicateUsers( numUsers ); + + int userCount = 0; + int usernamesWithDuplicates = 0; + for ( String username : usersCreated.keySet() ) { + Collection<Entity> users = usersCreated.get( username ); + if ( users.size() > 1 ) { + usernamesWithDuplicates++; + } + userCount++; + } + + Assert.assertEquals( 0, usernamesWithDuplicates ); + Assert.assertEquals( numUsers, successCounter.get() ); + Assert.assertEquals( 0, errorCounter.get() ); + Assert.assertEquals( numUsers, usersCreated.size() ); + Assert.assertEquals( numUsers, userCount ); + } + + private Multimap<String, Entity> generateDuplicateUsers(int numUsers ) { + + ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) ); + + EntityCollectionManager manager = factory.createCollectionManager( context ); + + Multimap<String, Entity> usersCreated = + Multimaps.synchronizedListMultimap( ArrayListMultimap.create() ); + + ExecutorService execService = Executors.newFixedThreadPool( 10 ); + + for (int i = 0; i < numUsers; i++) { + + // multiple threads simultaneously trying to create a user with the same propertyName + for (int j = 0; j < 5; j++) { + String username = "user_" + i; + + execService.submit( () -> { + + try { + Entity newEntity = new Entity( new SimpleId( "user" ) ); + newEntity.setField( new StringField( "username", username, true ) ); + newEntity.setField( new StringField( "email", username + "@example.org", true ) ); + + Observable<Entity> observable = manager.write( newEntity ); + Entity returned = observable.toBlocking().lastOrDefault( null ); + + usersCreated.put( username, newEntity ); + successCounter.incrementAndGet(); + + logger.debug("Created user {}", username); + + } catch ( Throwable t ) { + if ( t instanceof UniqueValueException ) { + // we expect lots of these + } else { + errorCounter.incrementAndGet(); + logger.error( "Error creating user " + username, t ); + } + } + + } ); + } + } + execService.shutdown(); + + try { + while (!execService.awaitTermination( 60, TimeUnit.SECONDS )) { + System.out.println( "Waiting..." ); + } + } catch (InterruptedException e) { + e.printStackTrace(); + } + + return usersCreated; + } +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/52ee2fb7/stack/corepersistence/collection/src/test/resources/usergrid.properties ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/test/resources/usergrid.properties b/stack/corepersistence/collection/src/test/resources/usergrid.properties index 015c681..9059f0e 100644 --- a/stack/corepersistence/collection/src/test/resources/usergrid.properties +++ b/stack/corepersistence/collection/src/test/resources/usergrid.properties @@ -1,2 +1,16 @@ # This property is required to be set and cannot be defaulted anywhere usergrid.cluster_name=usergrid + +collection.akka.hostname=localhost + +collection.akka.port=2551 + +collection.akka.region=us-east + +collection.akka.regions=us-east + +collection.akka.region.seeds=us-east:localhost:2551,us-east:localhost:2552 + +collection.akka.region.types=us-east:users,us-east:cats + +collection.akka.unique.value.actors=400
