Cherry picking Akka/UV changes into release-2.1.1
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/11aa1386 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/11aa1386 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/11aa1386 Branch: refs/heads/usergrid-1268-akka-211 Commit: 11aa13865d4f168bda9c394be791f8f635760f60 Parents: 3f4bd02 Author: Dave Johnson <[email protected]> Authored: Thu Apr 21 14:46:24 2016 -0400 Committer: Dave Johnson <[email protected]> Committed: Mon Apr 25 14:52:51 2016 -0400 ---------------------------------------------------------------------- .../main/resources/usergrid-default.properties | 36 +++ .../usergrid/corepersistence/CoreModule.java | 4 +- .../corepersistence/CpEntityManager.java | 295 ++++++++----------- .../corepersistence/CpEntityManagerFactory.java | 26 +- .../corepersistence/CpRelationManager.java | 14 +- .../corepersistence/EntityManagerFig.java | 9 + .../index/CollectionSettingsCacheFactory.java | 44 +++ .../index/IndexSchemaCacheFactory.java | 44 --- .../corepersistence/index/IndexServiceImpl.java | 29 +- .../index/ReIndexServiceImpl.java | 14 +- .../service/ApplicationServiceImpl.java | 10 +- .../index/AsyncIndexServiceTest.java | 2 +- .../corepersistence/index/IndexServiceTest.java | 8 +- .../collection/EntityCollectionManager.java | 3 +- .../impl/EntityCollectionManagerImpl.java | 25 +- .../mvcc/stage/CollectionIoEvent.java | 14 +- .../mvcc/stage/write/WriteCommit.java | 53 +++- .../mvcc/stage/write/WriteUniqueVerify.java | 2 +- .../collection/uniquevalues/AkkaFig.java | 21 +- .../uniquevalues/ReservationCache.java | 30 +- .../uniquevalues/ReservationCacheActor.java | 4 +- .../uniquevalues/UniqueValueActor.java | 6 - .../uniquevalues/UniqueValuesService.java | 27 +- .../uniquevalues/UniqueValuesServiceImpl.java | 64 ++-- .../collection/EntityCollectionManagerIT.java | 57 ++-- .../EntityCollectionManagerStressTest.java | 2 +- .../mvcc/stage/write/WriteUniqueVerifyIT.java | 14 +- .../uniquevalues/UniqueValuesServiceTest.java | 2 +- .../usergrid/rest/AbstractContextResource.java | 1 + .../rest/applications/CollectionResource.java | 27 +- .../collection/CollectionsResourceIT.java | 180 +++++------ .../resources/usergrid-services-context.xml | 2 +- 32 files changed, 561 insertions(+), 508 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/11aa1386/stack/config/src/main/resources/usergrid-default.properties ---------------------------------------------------------------------- diff --git a/stack/config/src/main/resources/usergrid-default.properties b/stack/config/src/main/resources/usergrid-default.properties index 5cd7c7a..4c4f29d 100644 --- a/stack/config/src/main/resources/usergrid-default.properties +++ b/stack/config/src/main/resources/usergrid-default.properties @@ -412,6 +412,42 @@ usergrid.queue.lock.timeout=5 +######################### Usergrid Unique Values Validation ################## +# +# Usergrid includes a distributed unique values validation that ensure that +# unique values rename unique across a distributed and multi-region system. +# This system is based on the Akka actor system and requires some additional +# configuration. +# +# The system uses consistent hashing to ensure that one single-threaded actor +# ever accesses a unique value record at one time. +# +# For more information: https://issues.apache.org/jira/browse/USERGRID-1268 +# + +collection.akka.enabled=false + +collection.akka.hostname=localhost + +collection.akka.port=2551 + +# the region MUST be in the region list specified in the 'usergrid.queue.regionList' property +collection.akka.region=us-east-1 + +# Comma-separated lists of seeds each with format {region}:{hostname}:{port}. +# Regions MUST be listed in the 'usergrid.queue.regionList' +collection.akka.region.seeds=us-east-1:localhost:2551 + +# The number of unique value actors to start on each Usergrid instance. +collection.akka.uniquevalue.actors=300 + +# +collection.akka.uniquevalue.cache.ttl=10 + +# TTL of a unique value reservation when written to Cassandra +collection.akka.uniquevalue.reservation.ttl=10 + + ############################## Usergrid Scheduler ########################### # # Usergrid uses a scheduler for some functions such as scheduled push notificatins. http://git-wip-us.apache.org/repos/asf/usergrid/blob/11aa1386/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java index 0457000..933090d 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java @@ -16,7 +16,7 @@ package org.apache.usergrid.corepersistence; -import org.apache.usergrid.corepersistence.index.IndexSchemaCacheFactory; +import org.apache.usergrid.corepersistence.index.CollectionSettingsCacheFactory; import org.apache.usergrid.corepersistence.index.IndexSchemaCacheFig; import org.apache.usergrid.locking.guice.LockModule; import org.apache.usergrid.persistence.cache.guice.CacheModule; @@ -130,7 +130,7 @@ public class CoreModule extends AbstractModule { bind( ManagerCache.class ).to( CpManagerCache.class ); bind( ApplicationIdCacheFactory.class ); - bind( IndexSchemaCacheFactory.class ); + bind( CollectionSettingsCacheFactory.class ); /** http://git-wip-us.apache.org/repos/asf/usergrid/blob/11aa1386/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java index 0db63be..fabe33b 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java @@ -16,76 +16,37 @@ package org.apache.usergrid.corepersistence; -import java.nio.ByteBuffer; -import java.time.Instant; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TreeMap; -import java.util.TreeSet; -import java.util.UUID; -import java.util.stream.Collectors; - -import org.apache.usergrid.persistence.collection.EntitySet; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.util.Assert; - +import com.codahale.metrics.Meter; +import com.codahale.metrics.Timer; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import me.prettyprint.hector.api.Keyspace; +import me.prettyprint.hector.api.beans.*; +import me.prettyprint.hector.api.factory.HFactory; +import me.prettyprint.hector.api.mutation.Mutator; +import me.prettyprint.hector.api.query.MultigetSliceCounterQuery; +import me.prettyprint.hector.api.query.QueryResult; +import me.prettyprint.hector.api.query.SliceCounterQuery; +import org.apache.commons.lang.NullArgumentException; import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService; import org.apache.usergrid.corepersistence.index.CollectionSettingsCache; -import org.apache.usergrid.corepersistence.index.IndexSchemaCacheFactory; +import org.apache.usergrid.corepersistence.index.CollectionSettingsCacheFactory; import org.apache.usergrid.corepersistence.service.CollectionService; import org.apache.usergrid.corepersistence.service.ConnectionService; import org.apache.usergrid.corepersistence.util.CpEntityMapUtils; import org.apache.usergrid.corepersistence.util.CpNamingUtils; -import org.apache.usergrid.persistence.AggregateCounter; -import org.apache.usergrid.persistence.AggregateCounterSet; -import org.apache.usergrid.persistence.CollectionRef; -import org.apache.usergrid.persistence.ConnectedEntityRef; -import org.apache.usergrid.persistence.ConnectionRef; -import org.apache.usergrid.persistence.Entity; -import org.apache.usergrid.persistence.EntityFactory; -import org.apache.usergrid.persistence.EntityManager; -import org.apache.usergrid.persistence.EntityRef; -import org.apache.usergrid.persistence.IndexBucketLocator; -import org.apache.usergrid.persistence.Query; +import org.apache.usergrid.persistence.*; import org.apache.usergrid.persistence.Query.Level; -import org.apache.usergrid.persistence.RelationManager; -import org.apache.usergrid.persistence.Results; -import org.apache.usergrid.persistence.Schema; -import org.apache.usergrid.persistence.SimpleEntityRef; -import org.apache.usergrid.persistence.SimpleRoleRef; -import org.apache.usergrid.persistence.TypedEntity; -import org.apache.usergrid.persistence.cassandra.ApplicationCF; -import org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils; -import org.apache.usergrid.persistence.cassandra.CassandraService; -import org.apache.usergrid.persistence.cassandra.ConnectionRefImpl; -import org.apache.usergrid.persistence.cassandra.CounterUtils; +import org.apache.usergrid.persistence.cassandra.*; import org.apache.usergrid.persistence.cassandra.util.TraceParticipant; import org.apache.usergrid.persistence.collection.EntityCollectionManager; +import org.apache.usergrid.persistence.collection.EntitySet; import org.apache.usergrid.persistence.collection.FieldSet; import org.apache.usergrid.persistence.collection.exception.WriteUniqueVerifyException; import org.apache.usergrid.persistence.core.metrics.MetricsFactory; import org.apache.usergrid.persistence.core.scope.ApplicationScope; -import org.apache.usergrid.persistence.entities.Application; -import org.apache.usergrid.persistence.entities.Event; -import org.apache.usergrid.persistence.entities.Group; -import org.apache.usergrid.persistence.entities.Role; -import org.apache.usergrid.persistence.entities.User; -import org.apache.usergrid.persistence.exceptions.DuplicateUniquePropertyExistsException; -import org.apache.usergrid.persistence.exceptions.EntityNotFoundException; -import org.apache.usergrid.persistence.exceptions.RequiredPropertyNotFoundException; -import org.apache.usergrid.persistence.exceptions.UnexpectedEntityTypeException; +import org.apache.usergrid.persistence.entities.*; +import org.apache.usergrid.persistence.exceptions.*; import org.apache.usergrid.persistence.graph.GraphManager; import org.apache.usergrid.persistence.graph.GraphManagerFactory; import org.apache.usergrid.persistence.graph.SearchEdgeType; @@ -99,75 +60,32 @@ import org.apache.usergrid.persistence.model.entity.SimpleId; import org.apache.usergrid.persistence.model.field.Field; import org.apache.usergrid.persistence.model.field.StringField; import org.apache.usergrid.persistence.model.util.UUIDGenerator; -import org.apache.usergrid.utils.ClassUtils; -import org.apache.usergrid.utils.CompositeUtils; -import org.apache.usergrid.utils.InflectionUtils; -import org.apache.usergrid.utils.Inflector; -import org.apache.usergrid.utils.JsonUtils; -import org.apache.usergrid.utils.StringUtils; -import org.apache.usergrid.utils.UUIDUtils; - -import com.codahale.metrics.Meter; -import com.codahale.metrics.Timer; -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; - -import me.prettyprint.hector.api.Keyspace; -import me.prettyprint.hector.api.beans.ColumnSlice; -import me.prettyprint.hector.api.beans.CounterRow; -import me.prettyprint.hector.api.beans.CounterRows; -import me.prettyprint.hector.api.beans.CounterSlice; -import me.prettyprint.hector.api.beans.DynamicComposite; -import me.prettyprint.hector.api.beans.HColumn; -import me.prettyprint.hector.api.beans.HCounterColumn; -import me.prettyprint.hector.api.factory.HFactory; -import me.prettyprint.hector.api.mutation.Mutator; -import me.prettyprint.hector.api.query.MultigetSliceCounterQuery; -import me.prettyprint.hector.api.query.QueryResult; -import me.prettyprint.hector.api.query.SliceCounterQuery; +import org.apache.usergrid.utils.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.util.Assert; import rx.Observable; +import java.nio.ByteBuffer; +import java.time.Instant; +import java.util.*; +import java.util.stream.Collectors; + import static java.lang.String.CASE_INSENSITIVE_ORDER; import static java.util.Arrays.asList; - import static me.prettyprint.hector.api.factory.HFactory.createCounterSliceQuery; import static me.prettyprint.hector.api.factory.HFactory.createMutator; import static org.apache.commons.lang.StringUtils.capitalize; import static org.apache.commons.lang.StringUtils.isBlank; import static org.apache.usergrid.corepersistence.util.CpEntityMapUtils.entityToCpEntity; -import static org.apache.usergrid.corepersistence.util.CpNamingUtils.createConnectionTypeSearch; -import static org.apache.usergrid.corepersistence.util.CpNamingUtils.createGraphOperationTimestamp; -import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getConnectionNameFromEdgeName; -import static org.apache.usergrid.persistence.Schema.COLLECTION_ROLES; -import static org.apache.usergrid.persistence.Schema.COLLECTION_USERS; -import static org.apache.usergrid.persistence.Schema.DICTIONARY_PERMISSIONS; -import static org.apache.usergrid.persistence.Schema.DICTIONARY_ROLENAMES; -import static org.apache.usergrid.persistence.Schema.DICTIONARY_ROLETIMES; -import static org.apache.usergrid.persistence.Schema.DICTIONARY_SETS; -import static org.apache.usergrid.persistence.Schema.PROPERTY_CREATED; -import static org.apache.usergrid.persistence.Schema.PROPERTY_INACTIVITY; -import static org.apache.usergrid.persistence.Schema.PROPERTY_MODIFIED; -import static org.apache.usergrid.persistence.Schema.PROPERTY_NAME; -import static org.apache.usergrid.persistence.Schema.PROPERTY_TIMESTAMP; -import static org.apache.usergrid.persistence.Schema.PROPERTY_TYPE; -import static org.apache.usergrid.persistence.Schema.PROPERTY_UUID; -import static org.apache.usergrid.persistence.Schema.TYPE_APPLICATION; -import static org.apache.usergrid.persistence.Schema.TYPE_ENTITY; +import static org.apache.usergrid.corepersistence.util.CpNamingUtils.*; +import static org.apache.usergrid.persistence.Schema.*; import static org.apache.usergrid.persistence.SimpleEntityRef.getUuid; -import static org.apache.usergrid.persistence.cassandra.ApplicationCF.APPLICATION_AGGREGATE_COUNTERS; -import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_COMPOSITE_DICTIONARIES; -import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_COUNTERS; -import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_DICTIONARIES; +import static org.apache.usergrid.persistence.cassandra.ApplicationCF.*; import static org.apache.usergrid.persistence.cassandra.CassandraService.ALL_COUNT; -import static org.apache.usergrid.persistence.cassandra.Serializers.be; -import static org.apache.usergrid.persistence.cassandra.Serializers.le; -import static org.apache.usergrid.persistence.cassandra.Serializers.se; -import static org.apache.usergrid.persistence.cassandra.Serializers.ue; +import static org.apache.usergrid.persistence.cassandra.Serializers.*; import static org.apache.usergrid.utils.ClassUtils.cast; -import static org.apache.usergrid.utils.ConversionUtils.bytebuffer; -import static org.apache.usergrid.utils.ConversionUtils.getLong; -import static org.apache.usergrid.utils.ConversionUtils.object; -import static org.apache.usergrid.utils.ConversionUtils.string; +import static org.apache.usergrid.utils.ConversionUtils.*; import static org.apache.usergrid.utils.InflectionUtils.singularize; @@ -188,7 +106,7 @@ public class CpEntityManager implements EntityManager { private final ManagerCache managerCache; - private final IndexSchemaCacheFactory indexSchemaCacheFactory; + private final CollectionSettingsCacheFactory collectionSettingsCacheFactory; private final ApplicationScope applicationScope; @@ -251,7 +169,7 @@ public class CpEntityManager implements EntityManager { final GraphManagerFactory graphManagerFactory, final CollectionService collectionService, final ConnectionService connectionService, - final IndexSchemaCacheFactory indexSchemaCacheFactory, + final CollectionSettingsCacheFactory collectionSettingsCacheFactory, final UUID applicationId ) { this.entityManagerFig = entityManagerFig; @@ -275,7 +193,7 @@ public class CpEntityManager implements EntityManager { this.managerCache = managerCache; this.applicationId = applicationId; this.indexService = indexService; - this.indexSchemaCacheFactory = indexSchemaCacheFactory; + this.collectionSettingsCacheFactory = collectionSettingsCacheFactory; applicationScope = CpNamingUtils.getApplicationScope( applicationId ); @@ -406,7 +324,7 @@ public class CpEntityManager implements EntityManager { * * @param entityType the entity type * @param entityClass the entity class - * @param properties the properties + * @param properties the newSettings * @param importId an existing external UUID to use as the id for the new entity * * @return new entity @@ -586,10 +504,12 @@ public class CpEntityManager implements EntityManager { cpEntity = CpEntityMapUtils.fromMap( cpEntity, entity.getProperties(), entity.getType(), true ); try { - cpEntity = ecm.write( cpEntity ).toBlocking().last(); + + String region = lookupRegionForType( entity.getType() ); + + cpEntity = ecm.write( cpEntity, region ).toBlocking().last(); + // cpEntity = ecm.update( cpEntity ).toBlockingObservable().last(); -// -// // // need to reload entity so bypass entity cache // cpEntity = ecm.load( entityId ).toBlockingObservable().last(); @@ -620,15 +540,15 @@ public class CpEntityManager implements EntityManager { boolean skipIndexing = false; MapManager mm = getMapManagerForTypes(); - CollectionSettingsCache collectionSettingsCache = indexSchemaCacheFactory.getInstance( mm ); + CollectionSettingsCache collectionSettingsCache = collectionSettingsCacheFactory.getInstance( mm ); String collectionName = Schema.defaultCollectionName( type ); - Optional<Map<String, Object>> collectionIndexingSchema = + Optional<Map<String, Object>> collectionSettings = collectionSettingsCache.getCollectionSettings( collectionName ); - if ( collectionIndexingSchema.isPresent()) { - Map jsonMapData = collectionIndexingSchema.get(); - final ArrayList fields = (ArrayList) jsonMapData.get( "fields" ); - if ( fields.size() == 1 && fields.get(0).equals("none")) { + if ( collectionSettings.isPresent()) { + Map jsonMapData = collectionSettings.get(); + Object fields = jsonMapData.get("fields"); + if ( fields != null && "none".equalsIgnoreCase( fields.toString() ) ) { skipIndexing = true; } } @@ -782,7 +702,7 @@ public class CpEntityManager implements EntityManager { Preconditions.checkNotNull(entityRef, "entityRef cannot be null"); CpRelationManager relationManager = new CpRelationManager( managerCache, indexService, collectionService, - connectionService, this, entityManagerFig, applicationId, indexSchemaCacheFactory, entityRef ); + connectionService, this, entityManagerFig, applicationId, collectionSettingsCacheFactory, entityRef ); return relationManager; } @@ -1157,8 +1077,18 @@ public class CpEntityManager implements EntityManager { cpEntity.getId().getType(), cpEntity.getId().getUuid(), cpEntity.getVersion() ); } + String region = null; + MapManager mm = getMapManagerForTypes(); + CollectionSettingsCache collectionSettingsCache = collectionSettingsCacheFactory.getInstance( mm ); + String collectionName = Schema.defaultCollectionName( entityRef.getType() ); + Optional<Map<String, Object>> collectionSettings = + collectionSettingsCache.getCollectionSettings( collectionName ); + if ( collectionSettings.isPresent() ) { + region = collectionSettings.get().get("region").toString(); + } + //TODO: does this call and others like it need a graphite reporter? - cpEntity = ecm.write( cpEntity ).toBlocking().last(); + cpEntity = ecm.write( cpEntity, region ).toBlocking().last(); if(logger.isTraceEnabled()){ logger.trace("Wrote {}:{} version {}", @@ -1779,68 +1709,69 @@ public class CpEntityManager implements EntityManager { } @Override - public Map createCollectionSettings( String collectionName, String owner ,Map<String, Object> properties ){ - + public Map createCollectionSettings(String collectionName, String owner, Map<String, Object> newSettings) { //TODO: change timeservice as below then use timeservice. Instant timeInstance = Instant.now(); Long epoch = timeInstance.toEpochMilli(); - Map<String,Object> schemaMap = new HashMap<>( ); - - schemaMap.put("lastUpdated",epoch); - //this needs the method that can extract the user from the token no matter the token. - //Possible values are app credentials, org credentials, or the user email(Admin tokens). - schemaMap.put("lastUpdateBy",owner); + Map<String, Object> updatedSettings = new HashMap<>(); + updatedSettings.put( "lastUpdated", epoch ); + // this needs the method that can extract the user from the token no matter the token. + // Possible values are app credentials, org credentials, or the user email(Admin tokens). + updatedSettings.put( "lastUpdateBy", owner ); MapManager mm = getMapManagerForTypes(); - CollectionSettingsCache collectionSettingsCache = indexSchemaCacheFactory.getInstance( mm ); + CollectionSettingsCache collectionSettingsCache = collectionSettingsCacheFactory.getInstance( mm ); - Optional<Map<String, Object>> collectionIndexingSchema = + Optional<Map<String, Object>> existingSettings = collectionSettingsCache.getCollectionSettings( collectionName ); - - //If there is an existing schema then take the lastReindexed time and keep it around.Otherwise initialize to 0. - if ( collectionIndexingSchema.isPresent() ) { - Map jsonMapData = collectionIndexingSchema.get(); - schemaMap.put( "lastReindexed", jsonMapData.get( "lastReindexed" ) ); - } - else { - schemaMap.put( "lastReindexed", 0 ); + // If there is an existing schema then take the lastReindexed time and keep it around. + // Otherwise initialize to 0. + if ( existingSettings.isPresent() ) { + Map<String, Object> jsonMapData = existingSettings.get(); + updatedSettings.put( "lastReindexed", jsonMapData.get( "lastReindexed" ) ); + } else { + updatedSettings.put( "lastReindexed", 0 ); } - ArrayList<String> fieldProperties = ( ArrayList<String> ) properties.get( "fields" ); + // if fields specified, then put in settings + if ( newSettings.get("fields") != null ) { + updatedSettings.put("fields", newSettings.get("fields")); + } - //do a check to see if you have a * field. If you do have a * field then ignore all other fields - //and only accept the * field. - if ( fieldProperties.contains( "*" )) { - ArrayList<String> wildCardArrayList = new ArrayList<>(); - wildCardArrayList.add( "*" ); - schemaMap.put( "fields", wildCardArrayList ); + // if region specified + Object region = newSettings.get("region"); + if ( region != null ) { - } else if ( fieldProperties.contains( "none" )) { - ArrayList<String> wildCardArrayList = new ArrayList<>(); - wildCardArrayList.add( "none" ); - schemaMap.put( "fields", wildCardArrayList ); + // passing an empty string causes region to be removed from settings + if ( region.toString().trim().isEmpty() ) { + updatedSettings.remove("region"); - } else { - schemaMap.putAll( properties ); + } else { + // make sure region is in the configured region list + List regionList = Arrays.asList( entityManagerFig.getRegionList().toLowerCase().split( "," ) ); + if (!regionList.contains( region )) { + throw new NullArgumentException( "Region " + region + " not in region list" ); + } + updatedSettings.put("region", region); + } } - collectionSettingsCache.putCollectionSettings( collectionName, JsonUtils.mapToJsonString( schemaMap ) ); - - return schemaMap; + collectionSettingsCache.putCollectionSettings( collectionName, JsonUtils.mapToJsonString( updatedSettings ) ); + return updatedSettings; } @Override public void deleteCollectionSettings( String collectionName ){ MapManager mm = getMapManagerForTypes(); - CollectionSettingsCache collectionSettingsCache = indexSchemaCacheFactory.getInstance( mm ); + CollectionSettingsCache collectionSettingsCache = collectionSettingsCacheFactory.getInstance( mm ); collectionSettingsCache.deleteCollectionSettings( collectionName ); @@ -1848,18 +1779,17 @@ public class CpEntityManager implements EntityManager { @Override - public Object getCollectionSettings( String collectionName ){ + public Object getCollectionSettings(String collectionName) { MapManager mm = getMapManagerForTypes(); - CollectionSettingsCache collectionSettingsCache = indexSchemaCacheFactory.getInstance( mm ); + CollectionSettingsCache collectionSettingsCache = collectionSettingsCacheFactory.getInstance( mm ); Optional<Map<String, Object>> collectionIndexingSchema = collectionSettingsCache.getCollectionSettings( collectionName ); - if(collectionIndexingSchema.isPresent()){ + if (collectionIndexingSchema.isPresent()) { return collectionIndexingSchema.get(); - } - else{ + } else { return null; } } @@ -2763,7 +2693,7 @@ public class CpEntityManager implements EntityManager { properties.put( PROPERTY_MODIFIED, ( long ) ( timestamp / 1000 ) ); } - // special case timestamp and published properties + // special case timestamp and published newSettings // and dictionary their timestamp values if not set // this is sure to break something for someone someday @@ -2811,13 +2741,16 @@ public class CpEntityManager implements EntityManager { try { - if(logger.isTraceEnabled()) { + if ( logger.isTraceEnabled()) { logger.trace( "About to Write {}:{} version {}", cpEntity.getId().getType(), cpEntity.getId().getUuid(), cpEntity.getVersion() ); } + String region = lookupRegionForType( entity.getType() ); + //this does the write so before adding to a collection everything already exists already. - cpEntity = ecm.write( cpEntity ).toBlocking().last(); + cpEntity = ecm.write( cpEntity, region ).toBlocking().last(); + entity.setSize(cpEntity.getSize()); if(logger.isTraceEnabled()) { @@ -3117,6 +3050,24 @@ public class CpEntityManager implements EntityManager { } } + + + private String lookupRegionForType( String type ) { + + String region = null; + MapManager mm = getMapManagerForTypes(); + CollectionSettingsCache collectionSettingsCache = collectionSettingsCacheFactory.getInstance( mm ); + String collectionName = Schema.defaultCollectionName( type ); + + Optional<Map<String, Object>> collectionSettings = + collectionSettingsCache.getCollectionSettings( collectionName ); + + if ( collectionSettings.isPresent() && collectionSettings.get().get("region") != null ) { + region = collectionSettings.get().get("region").toString(); + } + return region; + } + } http://git-wip-us.apache.org/repos/asf/usergrid/blob/11aa1386/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java index c3bcaf6..b95f143 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java @@ -22,6 +22,8 @@ import java.util.Set; import java.util.TreeMap; import java.util.UUID; +import org.apache.usergrid.persistence.collection.uniquevalues.AkkaFig; +import org.apache.usergrid.persistence.collection.uniquevalues.UniqueValuesService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.BeansException; @@ -31,7 +33,7 @@ import org.springframework.context.ApplicationContextAware; import org.apache.commons.lang.StringUtils; import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService; -import org.apache.usergrid.corepersistence.index.IndexSchemaCacheFactory; +import org.apache.usergrid.corepersistence.index.CollectionSettingsCacheFactory; import org.apache.usergrid.corepersistence.index.ReIndexRequestBuilder; import org.apache.usergrid.corepersistence.index.ReIndexService; import org.apache.usergrid.corepersistence.service.CollectionService; @@ -111,7 +113,6 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application }); private final ApplicationIdCache applicationIdCache; - //private final IndexSchemaCache indexSchemaCache; private ManagerCache managerCache; @@ -124,7 +125,8 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application private final CollectionService collectionService; private final ConnectionService connectionService; private final GraphManagerFactory graphManagerFactory; - private final IndexSchemaCacheFactory indexSchemaCacheFactory; + private final CollectionSettingsCacheFactory collectionSettingsCacheFactory; + private UniqueValuesService uniqueValuesService; public CpEntityManagerFactory( final CassandraService cassandraService, final CounterUtils counterUtils, final Injector injector ) { @@ -140,14 +142,18 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application this.graphManagerFactory = injector.getInstance( GraphManagerFactory.class ); this.collectionService = injector.getInstance( CollectionService.class ); this.connectionService = injector.getInstance( ConnectionService.class ); - this.indexSchemaCacheFactory = injector.getInstance( IndexSchemaCacheFactory.class ); - //this line always needs to be last due to the temporary cicular dependency until spring is removed + this.collectionSettingsCacheFactory = injector.getInstance( CollectionSettingsCacheFactory.class ); + AkkaFig akkaFig = injector.getInstance( AkkaFig.class ); + if ( akkaFig.getAkkaEnabled() ) { + this.uniqueValuesService = injector.getInstance( UniqueValuesService.class ); + this.uniqueValuesService.start(); + } + + // this line always needs to be last due to the temporary cicular dependency until spring is removed this.applicationIdCache = injector.getInstance(ApplicationIdCacheFactory.class).getInstance( getManagementEntityManager() ); - - } @@ -201,8 +207,10 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application private EntityManager _getEntityManager( UUID applicationId ) { - EntityManager em = new CpEntityManager(cassandraService, counterUtils, indexService, managerCache, - metricsFactory, entityManagerFig, graphManagerFactory, collectionService, connectionService,indexSchemaCacheFactory, applicationId ); + + EntityManager em = new CpEntityManager( cassandraService, counterUtils, indexService, managerCache, + metricsFactory, entityManagerFig, graphManagerFactory, collectionService, connectionService, + collectionSettingsCacheFactory, applicationId ); return em; } http://git-wip-us.apache.org/repos/asf/usergrid/blob/11aa1386/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java index 6c6dc55..e7b42b9 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java @@ -20,7 +20,7 @@ package org.apache.usergrid.corepersistence; import java.util.*; import org.apache.usergrid.corepersistence.index.CollectionSettingsCache; -import org.apache.usergrid.corepersistence.index.IndexSchemaCacheFactory; +import org.apache.usergrid.corepersistence.index.CollectionSettingsCacheFactory; import org.apache.usergrid.corepersistence.results.IdQueryExecutor; import org.apache.usergrid.persistence.map.MapManager; import org.apache.usergrid.persistence.map.MapScope; @@ -107,7 +107,7 @@ public class CpRelationManager implements RelationManager { private final AsyncEventService indexService; - private final IndexSchemaCacheFactory indexSchemaCacheFactory; + private final CollectionSettingsCacheFactory collectionSettingsCacheFactory; private final CollectionService collectionService; @@ -119,7 +119,7 @@ public class CpRelationManager implements RelationManager { final ConnectionService connectionService, final EntityManager em, final EntityManagerFig entityManagerFig, final UUID applicationId, - final IndexSchemaCacheFactory indexSchemaCacheFactory, + final CollectionSettingsCacheFactory collectionSettingsCacheFactory, final EntityRef headEntity) { @@ -159,7 +159,7 @@ public class CpRelationManager implements RelationManager { .format( "cpHeadEntity cannot be null for entity id %s, app id %s", entityId.getUuid(), applicationId ) ); this.indexService = indexService; - this.indexSchemaCacheFactory = indexSchemaCacheFactory; + this.collectionSettingsCacheFactory = collectionSettingsCacheFactory; } @@ -1087,15 +1087,15 @@ public class CpRelationManager implements RelationManager { boolean skipIndexing = false; MapManager mm = getMapManagerForTypes(); - CollectionSettingsCache collectionSettingsCache = indexSchemaCacheFactory.getInstance( mm ); + CollectionSettingsCache collectionSettingsCache = collectionSettingsCacheFactory.getInstance( mm ); String collectionName = Schema.defaultCollectionName( type ); Optional<Map<String, Object>> collectionIndexingSchema = collectionSettingsCache.getCollectionSettings( collectionName ); if ( collectionIndexingSchema.isPresent()) { Map jsonMapData = collectionIndexingSchema.get(); - final ArrayList fields = (ArrayList) jsonMapData.get( "fields" ); - if ( fields.size() == 1 && fields.get(0).equals("none")) { + final Object fields = jsonMapData.get( "fields" ); + if ( fields != null && fields instanceof String && "none".equalsIgnoreCase( fields.toString())) { skipIndexing = true; } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/11aa1386/stack/core/src/main/java/org/apache/usergrid/corepersistence/EntityManagerFig.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/EntityManagerFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/EntityManagerFig.java index 4c50aee..46c7a1d 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/EntityManagerFig.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/EntityManagerFig.java @@ -41,4 +41,13 @@ public interface EntityManagerFig extends GuicyFig { @Key( "usergrid.entityManager.enable_deindex_on_update" ) @Default( "false" ) boolean getDeindexOnUpdate(); + + /** + * Comma-separated list of one or more Amazon regions to use if multiregion + * is set to true. + */ + @Key( "usergrid.queue.regionList" ) + @Default("us-east-1") + String getRegionList(); + } http://git-wip-us.apache.org/repos/asf/usergrid/blob/11aa1386/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionSettingsCacheFactory.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionSettingsCacheFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionSettingsCacheFactory.java new file mode 100644 index 0000000..6c720ef --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionSettingsCacheFactory.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.usergrid.corepersistence.index; + + +import org.apache.usergrid.persistence.map.MapManager; + +import com.google.inject.Inject; +import com.google.inject.Singleton; + + +/** + * This can only be implemented after we have the impl for the application cache. + */ +@Singleton +public class CollectionSettingsCacheFactory { + private final IndexSchemaCacheFig fig; + + @Inject + public CollectionSettingsCacheFactory(IndexSchemaCacheFig fig){ + this.fig = fig; + } + + public CollectionSettingsCache getInstance(MapManager mapManager ){ + return new CollectionSettingsCacheImpl( mapManager,fig ); + } + + +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/11aa1386/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexSchemaCacheFactory.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexSchemaCacheFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexSchemaCacheFactory.java deleted file mode 100644 index 694132c..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexSchemaCacheFactory.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.usergrid.corepersistence.index; - - -import org.apache.usergrid.persistence.map.MapManager; - -import com.google.inject.Inject; -import com.google.inject.Singleton; - - -/** - * This can only be implemented after we have the impl for the application cache. - */ -@Singleton -public class IndexSchemaCacheFactory { - private final IndexSchemaCacheFig fig; - - @Inject - public IndexSchemaCacheFactory(IndexSchemaCacheFig fig){ - this.fig = fig; - } - - public CollectionSettingsCache getInstance(MapManager mapManager ){ - return new CollectionSettingsCacheImpl( mapManager,fig ); - } - - -} http://git-wip-us.apache.org/repos/asf/usergrid/blob/11aa1386/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java index bfe60fd..7b93abc 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java @@ -20,11 +20,7 @@ package org.apache.usergrid.corepersistence.index; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.Map; -import java.util.Set; -import java.util.UUID; +import java.util.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -82,7 +78,7 @@ public class IndexServiceImpl implements IndexService { private final GraphManagerFactory graphManagerFactory; private final EntityIndexFactory entityIndexFactory; private final MapManagerFactory mapManagerFactory; - private final IndexSchemaCacheFactory indexSchemaCacheFactory; + private final CollectionSettingsCacheFactory collectionSettingsCacheFactory; private final EdgesObservable edgesObservable; private final IndexFig indexFig; private final IndexLocationStrategyFactory indexLocationStrategyFactory; @@ -93,7 +89,7 @@ public class IndexServiceImpl implements IndexService { @Inject public IndexServiceImpl( final GraphManagerFactory graphManagerFactory, final EntityIndexFactory entityIndexFactory, final MapManagerFactory mapManagerFactory, - final IndexSchemaCacheFactory indexSchemaCacheFactory, + final CollectionSettingsCacheFactory collectionSettingsCacheFactory, final EdgesObservable edgesObservable, final IndexFig indexFig, final IndexLocationStrategyFactory indexLocationStrategyFactory, final MetricsFactory metricsFactory ) { @@ -103,7 +99,7 @@ public class IndexServiceImpl implements IndexService { this.edgesObservable = edgesObservable; this.indexFig = indexFig; this.indexLocationStrategyFactory = indexLocationStrategyFactory; - this.indexSchemaCacheFactory = indexSchemaCacheFactory; + this.collectionSettingsCacheFactory = collectionSettingsCacheFactory; this.indexTimer = metricsFactory.getTimer( IndexServiceImpl.class, "index.update_all"); this.addTimer = metricsFactory.getTimer( IndexServiceImpl.class, "index.add" ); } @@ -204,11 +200,10 @@ public class IndexServiceImpl implements IndexService { MapManager mm = mapManagerFactory.createMapManager( ms ); Set<String> defaultProperties; - ArrayList fieldsToKeep; String collectionName = CpNamingUtils.getCollectionNameFromEdgeName( indexEdge.getEdgeName() ); - CollectionSettingsCache collectionSettingsCache = indexSchemaCacheFactory.getInstance( mm ); + CollectionSettingsCache collectionSettingsCache = collectionSettingsCacheFactory.getInstance( mm ); Optional<Map<String, Object>> collectionIndexingSchema = collectionSettingsCache.getCollectionSettings( collectionName ); @@ -219,18 +214,18 @@ public class IndexServiceImpl implements IndexService { Map jsonMapData = collectionIndexingSchema.get(); Schema schema = Schema.getDefaultSchema(); defaultProperties = schema.getRequiredProperties( collectionName ); - fieldsToKeep = ( ArrayList ) jsonMapData.get( "fields" ); - if(fieldsToKeep.contains( "*" )){ + Object fields = jsonMapData.get("fields"); + + if ( fields != null && fields instanceof String && "all".equalsIgnoreCase(fields.toString())) { return Optional.absent(); } - // never add "none" because it has special meaning, "none" disables indexing for a type - fieldsToKeep.remove("none"); + if ( fields != null && fields instanceof List ) { + defaultProperties.addAll( (List) fields ); + } - defaultProperties.addAll( fieldsToKeep ); - } - else { + } else { return Optional.absent(); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/11aa1386/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java index fc06100..93e6b90 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java @@ -83,7 +83,7 @@ public class ReIndexServiceImpl implements ReIndexService { private final MapManagerFactory mapManagerFactory; private final AsyncEventService indexService; private final EntityIndexFactory entityIndexFactory; - private final IndexSchemaCacheFactory indexSchemaCacheFactory; + private final CollectionSettingsCacheFactory collectionSettingsCacheFactory; @Inject @@ -93,7 +93,7 @@ public class ReIndexServiceImpl implements ReIndexService { final MapManagerFactory mapManagerFactory, final AllApplicationsObservable allApplicationsObservable, final IndexProcessorFig indexProcessorFig, - final IndexSchemaCacheFactory indexSchemaCacheFactory, + final CollectionSettingsCacheFactory collectionSettingsCacheFactory, final AsyncEventService indexService ) { this.entityIndexFactory = entityIndexFactory; this.indexLocationStrategyFactory = indexLocationStrategyFactory; @@ -101,7 +101,7 @@ public class ReIndexServiceImpl implements ReIndexService { this.allApplicationsObservable = allApplicationsObservable; this.indexProcessorFig = indexProcessorFig; this.indexService = indexService; - this.indexSchemaCacheFactory = indexSchemaCacheFactory; + this.collectionSettingsCacheFactory = collectionSettingsCacheFactory; this.mapManagerFactory = mapManagerFactory; this.mapManager = mapManagerFactory.createMapManager( RESUME_MAP_SCOPE ); } @@ -145,15 +145,15 @@ public class ReIndexServiceImpl implements ReIndexService { CpNamingUtils.getEntityTypeMapScope( appId.get().getApplication() ) ); CollectionSettingsCache collectionSettingsCache = - indexSchemaCacheFactory.getInstance( collectionMapStorage ); + collectionSettingsCacheFactory.getInstance( collectionMapStorage ); - Optional<Map<String, Object>> collectionIndexingSchema = + Optional<Map<String, Object>> collectionSettings = collectionSettingsCache.getCollectionSettings( collectionName ); // If we do have a schema then parse it and add it to a list of properties we want to keep.Otherwise return. - if ( collectionIndexingSchema.isPresent() ) { + if ( collectionSettings.isPresent() ) { - Map jsonMapData = collectionIndexingSchema.get(); + Map jsonMapData = collectionSettings.get(); jsonMapData.put( "lastReindexed", Instant.now().toEpochMilli() ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/11aa1386/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ApplicationServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ApplicationServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ApplicationServiceImpl.java index 82e0695..91f11f5 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ApplicationServiceImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ApplicationServiceImpl.java @@ -24,7 +24,7 @@ import com.google.inject.Inject; import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService; import org.apache.usergrid.corepersistence.asyncevents.EventBuilder; import org.apache.usergrid.corepersistence.index.CollectionSettingsCache; -import org.apache.usergrid.corepersistence.index.IndexSchemaCacheFactory; +import org.apache.usergrid.corepersistence.index.CollectionSettingsCacheFactory; import org.apache.usergrid.corepersistence.rx.impl.AllEntityIdsObservable; import org.apache.usergrid.corepersistence.util.CpNamingUtils; import org.apache.usergrid.persistence.Schema; @@ -59,7 +59,7 @@ public class ApplicationServiceImpl implements ApplicationService{ private final EventBuilder eventBuilder; private final MapManagerFactory mapManagerFactory; private final GraphManagerFactory graphManagerFactory; - private final IndexSchemaCacheFactory indexSchemaCacheFactory; + private final CollectionSettingsCacheFactory collectionSettingsCacheFactory; @@ -70,7 +70,7 @@ public class ApplicationServiceImpl implements ApplicationService{ EventBuilder eventBuilder, MapManagerFactory mapManagerFactory, GraphManagerFactory graphManagerFactory, - IndexSchemaCacheFactory indexSchemaCacheFactory + CollectionSettingsCacheFactory collectionSettingsCacheFactory ){ this.allEntityIdsObservable = allEntityIdsObservable; @@ -79,7 +79,7 @@ public class ApplicationServiceImpl implements ApplicationService{ this.eventBuilder = eventBuilder; this.mapManagerFactory = mapManagerFactory; this.graphManagerFactory = graphManagerFactory; - this.indexSchemaCacheFactory = indexSchemaCacheFactory; + this.collectionSettingsCacheFactory = collectionSettingsCacheFactory; } @@ -149,7 +149,7 @@ public class ApplicationServiceImpl implements ApplicationService{ boolean skipIndexing = false; MapManager mm = getMapManagerForTypes(applicationScope); - CollectionSettingsCache collectionSettingsCache = indexSchemaCacheFactory.getInstance( mm ); + CollectionSettingsCache collectionSettingsCache = collectionSettingsCacheFactory.getInstance( mm ); String collectionName = Schema.defaultCollectionName( type ); Optional<Map<String, Object>> collectionIndexingSchema = collectionSettingsCache.getCollectionSettings( collectionName ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/11aa1386/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java index 12a33cf..366ab34 100644 --- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java +++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java @@ -120,7 +120,7 @@ public abstract class AsyncIndexServiceTest { final EntityCollectionManager collectionManager = entityCollectionManagerFactory.createCollectionManager(applicationScope); - collectionManager.write(testEntity).toBlocking().last(); + collectionManager.write(testEntity, null ).toBlocking().last(); final GraphManager graphManager = graphManagerFactory.createEdgeManager( applicationScope ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/11aa1386/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexServiceTest.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexServiceTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexServiceTest.java index 90d6c5a..dab14ed 100644 --- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexServiceTest.java +++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexServiceTest.java @@ -20,8 +20,6 @@ package org.apache.usergrid.corepersistence.index; -import java.util.ArrayList; -import java.util.Iterator; import java.util.List; import java.util.Set; import java.util.UUID; @@ -155,7 +153,7 @@ public class IndexServiceTest { final EntityCollectionManager collectionManager = entityCollectionManagerFactory.createCollectionManager( applicationScope ); - collectionManager.write( testEntity ).toBlocking().last(); + collectionManager.write( testEntity, null ).toBlocking().last(); final GraphManager graphManager = graphManagerFactory.createEdgeManager( applicationScope ); @@ -228,7 +226,7 @@ public class IndexServiceTest { final EntityCollectionManager collectionManager = entityCollectionManagerFactory.createCollectionManager( applicationScope ); - collectionManager.write( testEntity ).toBlocking().last(); + collectionManager.write( testEntity, null ).toBlocking().last(); final GraphManager graphManager = graphManagerFactory.createEdgeManager( applicationScope ); @@ -475,7 +473,7 @@ public class IndexServiceTest { final EntityCollectionManager collectionManager = entityCollectionManagerFactory.createCollectionManager( applicationScope ); - collectionManager.write( testEntity ).toBlocking().last(); + collectionManager.write( testEntity, null ).toBlocking().last(); //create our collection edge final Edge collectionEdge = http://git-wip-us.apache.org/repos/asf/usergrid/blob/11aa1386/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java index 9de8f41..0fc5636 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java @@ -39,10 +39,11 @@ public interface EntityCollectionManager { * completely overwrite the previous values, if it exists. * * @param entity The entity to update + * @param region The authoritative region for the entity type or null to use current region. * * @return the Observable with the updated entity in the body */ - Observable<Entity> write( Entity entity ); + Observable<Entity> write( Entity entity, String region ); /** http://git-wip-us.apache.org/repos/asf/usergrid/blob/11aa1386/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java index e71e6bb..b7ea865 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java @@ -64,7 +64,6 @@ import org.apache.usergrid.persistence.core.util.ValidationUtils; import org.apache.usergrid.persistence.model.entity.Entity; import org.apache.usergrid.persistence.model.entity.Id; import org.apache.usergrid.persistence.model.field.Field; -import org.apache.usergrid.persistence.model.util.EntityUtils; import org.apache.usergrid.persistence.model.util.UUIDGenerator; import com.codahale.metrics.Timer; @@ -81,7 +80,6 @@ import com.netflix.astyanax.serializers.StringSerializer; import rx.Observable; import rx.Subscriber; -import rx.functions.Action0; /** @@ -171,7 +169,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager { @Override - public Observable<Entity> write( final Entity entity ) { + public Observable<Entity> write(final Entity entity, String region) { //do our input validation Preconditions.checkNotNull( entity, "Entity is required in the new stage of the mvcc write" ); @@ -182,20 +180,20 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager { // create our observable and start the write - final CollectionIoEvent<Entity> writeData = new CollectionIoEvent<Entity>( applicationScope, entity ); + final CollectionIoEvent<Entity> writeData = new CollectionIoEvent<Entity>( applicationScope, entity, region ); Observable<CollectionIoEvent<MvccEntity>> observable = stageRunner( writeData, writeStart ); - final Observable<Entity> write = observable.map( writeCommit ) - .map(ioEvent -> { - //fire this in the background so we don't block writes - Observable.just( ioEvent ).compose( uniqueCleanup ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() ).subscribe(); - return ioEvent; - } - ) - //now extract the ioEvent we need to return and update the version - .map( ioEvent -> ioEvent.getEvent().getEntity().get() ); + final Observable<Entity> write = observable.map( writeCommit ).map(ioEvent -> { + + // fire this in the background so we don't block writes + Observable.just( ioEvent ).compose( uniqueCleanup ) + .subscribeOn( rxTaskScheduler.getAsyncIOScheduler() ).subscribe(); + return ioEvent; + + }) // now extract the ioEvent we need to return and update the version + .map( ioEvent -> ioEvent.getEvent().getEntity().get() ); return ObservableTimer.time( write, writeTimer ); } @@ -212,7 +210,6 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager { .doOnNext( markCommit ).compose( uniqueCleanup ).map( entityEvent -> entityEvent.getEvent().getId() ); - return ObservableTimer.time( o, deleteTimer ); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/11aa1386/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/CollectionIoEvent.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/CollectionIoEvent.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/CollectionIoEvent.java index 4fc3f8e..e0388de 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/CollectionIoEvent.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/CollectionIoEvent.java @@ -29,14 +29,20 @@ import org.apache.usergrid.persistence.core.scope.ApplicationScope; */ public class CollectionIoEvent<T> implements Serializable { - private ApplicationScope context; + final private ApplicationScope context; - private T event; + final private T event; + final private String region; public CollectionIoEvent( final ApplicationScope context, final T event ) { + this( context, event, null ); + } + + public CollectionIoEvent( final ApplicationScope context, final T event, String region ) { this.context = context; this.event = event; + this.region = region; } @@ -48,4 +54,8 @@ public class CollectionIoEvent<T> implements Serializable { public T getEvent() { return event; } + + public String getRegion() { + return region; + } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/11aa1386/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java index 360e954..7556911 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java @@ -117,7 +117,8 @@ public class WriteCommit implements Func1<CollectionIoEvent<MvccEntity>, Collect final Entity entity = mvccEntity.getEntity().get(); try { - akkaUvService.confirmUniqueValues( applicationScope, entity, mvccEntity.getVersion() ); + akkaUvService.confirmUniqueValues( + applicationScope, entity, mvccEntity.getVersion(), ioEvent.getRegion() ); } catch (UniqueValueException e) { Map<String, Field> violations = new HashMap<>(); @@ -155,13 +156,37 @@ public class WriteCommit implements Func1<CollectionIoEvent<MvccEntity>, Collect // merge the 2 into 1 mutation logMutation.mergeShallow( entityMutation ); + // akkaFig may be null when this is called from JUnit tests + if ( akkaFig != null && akkaFig.getAkkaEnabled() ) { + confirmUniqueFieldsAkka( mvccEntity, version, applicationScope, ioEvent.getRegion() ); + } else { + confirmUniqueFields( mvccEntity, version, applicationScope, logMutation ); + } + + try { + logMutation.execute(); + } + catch ( ConnectionException e ) { + logger.error( "Failed to execute write asynchronously ", e ); + throw new WriteCommitException( mvccEntity, applicationScope, + "Failed to execute write asynchronously ", e ); + } + + return ioEvent; + } + + + private void confirmUniqueFields( + MvccEntity mvccEntity, UUID version, ApplicationScope scope, MutationBatch logMutation) { + + final Entity entity = mvccEntity.getEntity().get(); + // re-write the unique values but this time with no TTL for ( Field field : EntityUtils.getUniqueFields(mvccEntity.getEntity().get()) ) { - UniqueValue written = new UniqueValueImpl( field, - entityId,version); + UniqueValue written = new UniqueValueImpl( field, entity.getId(), version); - MutationBatch mb = uniqueValueStrat.write(applicationScope, written ); + MutationBatch mb = uniqueValueStrat.write(scope, written ); logger.debug("Finalizing {} unique value {}", field.getName(), field.getValue().toString()); @@ -174,10 +199,26 @@ public class WriteCommit implements Func1<CollectionIoEvent<MvccEntity>, Collect } catch ( ConnectionException e ) { logger.error( "Failed to execute write asynchronously ", e ); - throw new WriteCommitException( mvccEntity, applicationScope, + throw new WriteCommitException( mvccEntity, scope, "Failed to execute write asynchronously ", e ); } + } - return ioEvent; + + private void confirmUniqueFieldsAkka( + MvccEntity mvccEntity, UUID version, ApplicationScope scope, String region ) { + + final Entity entity = mvccEntity.getEntity().get(); + + try { + akkaUvService.confirmUniqueValues( scope, entity, version, region ); + + } catch (UniqueValueException e) { + + Map<String, Field> violations = new HashMap<>(); + violations.put( e.getField().getName(), e.getField() ); + + throw new WriteUniqueVerifyException( mvccEntity, scope, violations ); + } } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/11aa1386/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java index a37abf1..4fb8c39 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java @@ -123,7 +123,7 @@ public class WriteUniqueVerify implements Action1<CollectionIoEvent<MvccEntity>> final ApplicationScope applicationScope = ioevent.getEntityCollection(); try { - akkaUvService.reserveUniqueValues( applicationScope, entity, mvccEntity.getVersion() ); + akkaUvService.reserveUniqueValues( applicationScope, entity, mvccEntity.getVersion(), ioevent.getRegion() ); } catch (UniqueValueException e) { Map<String, Field> violations = new HashMap<>(); http://git-wip-us.apache.org/repos/asf/usergrid/blob/11aa1386/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/AkkaFig.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/AkkaFig.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/AkkaFig.java index 67db3e9..7514a32 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/AkkaFig.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/AkkaFig.java @@ -35,6 +35,8 @@ public interface AkkaFig extends GuicyFig { String AKKA_REGION = "collection.akka.region"; + String AKKA_REGION_LIST = "usergrid.queue.regionList"; // same region list used by queues + String AKKA_REGION_SEEDS = "collection.akka.region.seeds"; String AKKA_UNIQUEVALUE_ACTORS = "collection.akka.uniquevalue.actors"; @@ -75,6 +77,13 @@ public interface AkkaFig extends GuicyFig { String getRegion(); /** + * Comma separated list of regions known to cluster. + */ + @Key(AKKA_REGION_LIST) + @Default("us-east") + String getRegionList(); + + /** * Number of UniqueValueActors to be started on each node */ @Key(AKKA_UNIQUEVALUE_ACTORS) @@ -82,32 +91,32 @@ public interface AkkaFig extends GuicyFig { int getUniqueValueActors(); /** - * Comma-separated lists of seeds each with format {region}:{hostname}:{port} + * Comma-separated lists of seeds each with format {region}:{hostname}:{port}. + * Regions MUST be listed in the 'usergrid.queue.regionList' */ @Key(AKKA_REGION_SEEDS) - @Default("us-east:localhost:2551") + @Default("us-east-1:localhost:2551") String getRegionSeeds(); /** * Authoritative regions may be specified for types * Comma-separated lists of region types each with format {region}:{type} */ - // TODO: allow this to be set via REST API @Key(AKKA_UNIQUEVALUE_REGION_TYPES) - @Default("us-east:user") + @Default("us-east-1:user") String getRegionTypes(); /** * Unique Value cache TTL in seconds. */ @Key(AKKA_UNIQUEVALUE_CACHE_TTL) - @Default("5") + @Default("10") int getUniqueValueCacheTtl(); /** * Unique Value Reservation TTL in seconds. */ @Key(AKKA_UNIQUEVALUE_RESERVATION_TTL) - @Default("5") + @Default("10") int getUniqueValueReservationTtl(); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/11aa1386/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ReservationCache.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ReservationCache.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ReservationCache.java index c0911b0..3bbcbd9 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ReservationCache.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ReservationCache.java @@ -9,26 +9,34 @@ import org.slf4j.LoggerFactory; import java.util.concurrent.TimeUnit; -// cannot be a Guice singleton, must be shared across injectors -// @com.google.inject.Singleton public class ReservationCache { private static final Logger logger = LoggerFactory.getLogger( RequestActor.class ); - Cache<String, UniqueValueActor.Reservation> cache = CacheBuilder.newBuilder() - .maximumSize(1000) - .concurrencyLevel( 300 ) - .expireAfterWrite(30, TimeUnit.SECONDS) - .recordStats() - .build(); + Cache<String, UniqueValueActor.Reservation> cache; - private static ReservationCache instance = new ReservationCache(); + // use hokey old-style singleton because its not that easy to get Guice into an actor + private static ReservationCache instance = null; + + ReservationCache( long ttl ) { + cache = CacheBuilder.newBuilder() + .maximumSize(1000) + .concurrencyLevel( 300 ) + .expireAfterWrite(ttl, TimeUnit.SECONDS) + .recordStats() + .build(); + } + + public static void init( long ttl ) { + instance = new ReservationCache( ttl ); + } public static ReservationCache getInstance() { + if ( instance == null ) { + throw new IllegalStateException( "ReservationCache not initialized yet" ); + } return instance; } - private ReservationCache() {} - public UniqueValueActor.Reservation get( String rowKey ) { UniqueValueActor.Reservation res = cache.getIfPresent( rowKey ); return res; http://git-wip-us.apache.org/repos/asf/usergrid/blob/11aa1386/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ReservationCacheActor.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ReservationCacheActor.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ReservationCacheActor.java index add33fa..2912c7d 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ReservationCacheActor.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ReservationCacheActor.java @@ -35,9 +35,7 @@ public class ReservationCacheActor extends UntypedActor { int reservationCount = 0; int cancellationCount = 0; - public ReservationCacheActor(String injectorName ) { - - logger.info("Starting for {}", injectorName); + public ReservationCacheActor() { // subscribe to the topic named "content" ActorRef mediator = DistributedPubSub.get(getContext().system()).mediator(); http://git-wip-us.apache.org/repos/asf/usergrid/blob/11aa1386/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java index ce1d72f..7becd47 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java @@ -30,12 +30,6 @@ public class UniqueValueActor extends UntypedActor { public UniqueValueActor( UniqueValuesTable table ) { this.table = table; - -// chaos = Boolean.parseBoolean( uniqueValuesService.getProperties() -// .getProperty( "akka.test.chaos", "false" ) ); - -// metricsService = -// GuiceModule.getInjector( injectorName ).getInstance( MetricsService.class ); } @Override http://git-wip-us.apache.org/repos/asf/usergrid/blob/11aa1386/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesService.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesService.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesService.java index 06aea4b..744c5b9 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesService.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesService.java @@ -30,20 +30,35 @@ import java.util.UUID; public interface UniqueValuesService { /** + * Initialize and start service. + */ + void start(); + + /** * Check that unique values are unique and reserve them for a limited time. * If the reservations are not confirmed, they will expire. + * + * @param scope Application scope of entity. + * @param entity Entity with unique values to be confirmed. + * @param version Version of entity claiming unique values. + * @param region Authoritative Region to be used for this entity or null to use current region. + * @throws UniqueValueException if unique values cannot be confirmed. */ - void reserveUniqueValues( ApplicationScope scope, Entity entity, UUID version ) throws UniqueValueException; + void reserveUniqueValues( ApplicationScope scope, Entity entity, UUID version, String region ) + throws UniqueValueException; /** * Confirm unique values that were reserved earlier. + * + * @param scope Application scope of entity. + * @param entity Entity with unique values to be reserved. + * @param version Version of entity claiming unique values. + * @param region Authoritative Region to be used for this entity or null to use current region. + * @throws UniqueValueException if unique values cannot be reserved. */ - void confirmUniqueValues( ApplicationScope scope, Entity entity, UUID version ) throws UniqueValueException; + void confirmUniqueValues( ApplicationScope scope, Entity entity, UUID version , String region ) + throws UniqueValueException; - /** - * Initialize and wait for Akka actors to start. - */ - void start(); /** * For test purposes only. http://git-wip-us.apache.org/repos/asf/usergrid/blob/11aa1386/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java index bd357ae..9f06baa 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java @@ -51,13 +51,8 @@ import java.util.concurrent.TimeUnit; public class UniqueValuesServiceImpl implements UniqueValuesService { private static final Logger logger = LoggerFactory.getLogger( UniqueValuesServiceImpl.class ); - @Inject AkkaFig akkaFig; - - @Inject UniqueValuesTable table; - - private String hostname; private Integer port; private String currentRegion; @@ -81,8 +76,14 @@ public class UniqueValuesServiceImpl implements UniqueValuesService { private final boolean disableUniqueValues; - public UniqueValuesServiceImpl() { + @Inject + public UniqueValuesServiceImpl( AkkaFig akkaFig, UniqueValuesTable table ) { + this.akkaFig = akkaFig; + this.table = table; + + ReservationCache.init( akkaFig.getUniqueValueCacheTtl() ); this.reservationCache = ReservationCache.getInstance(); + this.disableUniqueValues = false; } @@ -91,6 +92,7 @@ public class UniqueValuesServiceImpl implements UniqueValuesService { * Init Akka ActorSystems and wait for request actors to start. */ public void start() { + this.hostname = akkaFig.getHostname(); this.port = akkaFig.getPort(); this.currentRegion = akkaFig.getRegion(); @@ -104,6 +106,7 @@ public class UniqueValuesServiceImpl implements UniqueValuesService { * For testing purposes only; does not wait for request actors to start. */ public void start( String hostname, Integer port, String currentRegion ) { + this.hostname = hostname; this.port = port; this.currentRegion = currentRegion; @@ -158,12 +161,19 @@ public class UniqueValuesServiceImpl implements UniqueValuesService { throw new RuntimeException( "No value specified for akka.region"); } + List regionList = Arrays.asList( akkaFig.getRegionList().toLowerCase().split(",") ); + String typesValue = akkaFig.getRegionTypes(); String[] regionTypes = StringUtils.isEmpty( typesValue ) ? new String[0] : typesValue.split(","); for ( String regionType : regionTypes ) { - String[] parts = regionType.split(":"); + String[] parts = regionType.toLowerCase().split(":"); String typeRegion = parts[0]; String type = parts[1]; + + if ( !regionList.contains( typeRegion) ) { + throw new RuntimeException( + "'collection.akka.region.seeds' references unknown region: " + typeRegion ); + } this.regionsByType.put( type, typeRegion ); } @@ -419,12 +429,13 @@ public class UniqueValuesServiceImpl implements UniqueValuesService { @Override - public void reserveUniqueValues( ApplicationScope scope, Entity entity, UUID version) throws UniqueValueException { + public void reserveUniqueValues( + ApplicationScope scope, Entity entity, UUID version, String region ) throws UniqueValueException { try { for (Field field : entity.getFields()) { if (field.isUnique()) { - reserveUniqueField( scope, entity, version, field ); + reserveUniqueField( scope, entity, version, field, region ); } } @@ -432,7 +443,7 @@ public class UniqueValuesServiceImpl implements UniqueValuesService { for (Field field : entity.getFields()) { try { - cancelUniqueField( scope, entity, version, field ); + cancelUniqueField( scope, entity, version, field, region ); } catch (Throwable ignored) { logger.debug( "Error canceling unique field", ignored ); } @@ -444,12 +455,13 @@ public class UniqueValuesServiceImpl implements UniqueValuesService { @Override - public void confirmUniqueValues( ApplicationScope scope, Entity entity, UUID version ) throws UniqueValueException { + public void confirmUniqueValues( + ApplicationScope scope, Entity entity, UUID version, String region ) throws UniqueValueException { try { for (Field field : entity.getFields()) { if (field.isUnique()) { - confirmUniqueField( scope, entity, version, field ); + confirmUniqueField( scope, entity, version, field, region ); } } @@ -457,7 +469,7 @@ public class UniqueValuesServiceImpl implements UniqueValuesService { for (Field field : entity.getFields()) { try { - cancelUniqueField( scope, entity, version, field ); + cancelUniqueField( scope, entity, version, field, region) ; } catch (Throwable ignored) { logger.debug( "Error canceling unique field", ignored ); } @@ -469,12 +481,17 @@ public class UniqueValuesServiceImpl implements UniqueValuesService { private void reserveUniqueField( - ApplicationScope scope, Entity entity, UUID version, Field field ) throws UniqueValueException { + ApplicationScope scope, Entity entity, UUID version, Field field, String region ) throws UniqueValueException { - ActorRef requestActor = lookupRequestActorForType( entity.getId().getType() ); + final ActorRef requestActor; + if ( region != null ) { + requestActor = getRequestActorsByRegion().get( region ); + } else { + requestActor = lookupRequestActorForType( entity.getId().getType() ); + } if ( requestActor == null ) { - throw new RuntimeException( "No request actor for type, cannot verify unique fields!" ); + throw new RuntimeException( "No request actor for region or type, cannot verify unique fields!" ); } UniqueValueActor.Request request = new UniqueValueActor.Reservation( @@ -488,12 +505,12 @@ public class UniqueValuesServiceImpl implements UniqueValuesService { throw new UniqueValueException( "Error property not unique (cache)", field); } - sendUniqueValueRequest( entity, requestActor, request ); + sendUniqueValueRequest( entity, requestActor, request ); } private void confirmUniqueField( - ApplicationScope scope, Entity entity, UUID version, Field field ) throws UniqueValueException { + ApplicationScope scope, Entity entity, UUID version, Field field, String region) throws UniqueValueException { ActorRef requestActor = lookupRequestActorForType( entity.getId().getType() ); @@ -509,7 +526,7 @@ public class UniqueValuesServiceImpl implements UniqueValuesService { private void cancelUniqueField( - ApplicationScope scope, Entity entity, UUID version, Field field ) throws UniqueValueException { + ApplicationScope scope, Entity entity, UUID version, Field field, String region ) throws UniqueValueException { ActorRef requestActor = lookupRequestActorForType( entity.getId().getType() ); @@ -525,11 +542,8 @@ public class UniqueValuesServiceImpl implements UniqueValuesService { private ActorRef lookupRequestActorForType( String type ) { - String region = getRegionsByType().get( type ); - if ( region == null ) { - region = currentRegion; - } - ActorRef requestActor = getRequestActorsByRegion().get(region); + final String region = getRegionsByType().get( type ); + ActorRef requestActor = getRequestActorsByRegion().get( region == null ? currentRegion : region ); if ( requestActor == null ) { throw new RuntimeException( "No request actor available for region: " + region ); } @@ -538,7 +552,7 @@ public class UniqueValuesServiceImpl implements UniqueValuesService { private void sendUniqueValueRequest( - Entity entity, ActorRef requestActor, UniqueValueActor.Request request) throws UniqueValueException { + Entity entity, ActorRef requestActor, UniqueValueActor.Request request ) throws UniqueValueException { int maxRetries = 5; int retries = 0;
