Fix authoritative region logic and add property for specifying the system-wide authoritative region.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/58fc657c Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/58fc657c Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/58fc657c Branch: refs/heads/release-2.1.1 Commit: 58fc657cb501312934ba7d9888ed31affbe3fcb7 Parents: 0c5deac Author: Dave Johnson <[email protected]> Authored: Fri Jun 17 12:43:18 2016 -0400 Committer: Dave Johnson <[email protected]> Committed: Fri Jun 17 12:43:18 2016 -0400 ---------------------------------------------------------------------- .../main/resources/usergrid-default.properties | 7 +++++-- .../corepersistence/CpEntityManager.java | 14 ++++++++++--- .../corepersistence/CpEntityManagerFactory.java | 22 ++++++++++++++------ .../service/ApplicationServiceImpl.java | 9 +++++--- .../collection/EntityCollectionManager.java | 3 ++- .../impl/EntityCollectionManagerImpl.java | 6 +++--- .../mvcc/stage/write/WriteCommit.java | 6 +++++- .../mvcc/stage/write/WriteUniqueVerify.java | 6 +++++- .../collection/uniquevalues/AkkaFig.java | 10 ++++----- .../uniquevalues/ReservationCache.java | 5 +++++ .../collection/EntityCollectionManagerIT.java | 2 +- 11 files changed, 63 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/58fc657c/stack/config/src/main/resources/usergrid-default.properties ---------------------------------------------------------------------- diff --git a/stack/config/src/main/resources/usergrid-default.properties b/stack/config/src/main/resources/usergrid-default.properties index d7a3311..ffe5fc1 100644 --- a/stack/config/src/main/resources/usergrid-default.properties +++ b/stack/config/src/main/resources/usergrid-default.properties @@ -443,16 +443,19 @@ collection.akka.region.seeds=us-east-1:localhost:2551 # The number of unique value actors to start on each Usergrid instance. collection.akka.uniquevalue.actors=300 -# TTL of unique value reservastion in in-memory cache +# TTL of unique value reservation in in-memory cache collection.akka.uniquevalue.cache.ttl=10 # TTL of a unique value reservation when written to Cassandra collection.akka.uniquevalue.reservation.ttl=10 +# If no region specified for type, use the authoritative region +#collection.akka.uniquevalue.authoritative.region= + ############################## Usergrid Scheduler ########################### # -# Usergrid uses a scheduler for some functions such as scheduled push notificatins. +# Usergrid uses a scheduler for some functions such as scheduled push notifications. # Use the below settings to configure the scheduler. # http://git-wip-us.apache.org/repos/asf/usergrid/blob/58fc657c/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java index b33321b..b0d3f59 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java @@ -43,6 +43,7 @@ import org.apache.usergrid.persistence.collection.EntityCollectionManager; import org.apache.usergrid.persistence.collection.EntitySet; import org.apache.usergrid.persistence.collection.FieldSet; import org.apache.usergrid.persistence.collection.exception.WriteUniqueVerifyException; +import org.apache.usergrid.persistence.collection.uniquevalues.AkkaFig; import org.apache.usergrid.persistence.core.metrics.MetricsFactory; import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.entities.*; @@ -104,6 +105,8 @@ public class CpEntityManager implements EntityManager { private final UUID applicationId; private final EntityManagerFig entityManagerFig; + private final AkkaFig akkaFig; + private Application application; @@ -168,6 +171,7 @@ public class CpEntityManager implements EntityManager { final AsyncEventService indexService, final ManagerCache managerCache, final MetricsFactory metricsFactory, + final AkkaFig akkaFig, final EntityManagerFig entityManagerFig, final GraphManagerFactory graphManagerFactory, final CollectionService collectionService, @@ -176,6 +180,7 @@ public class CpEntityManager implements EntityManager { final UUID applicationId ) { this.entityManagerFig = entityManagerFig; + this.akkaFig = akkaFig; Preconditions.checkNotNull( cass, "cass must not be null" ); Preconditions.checkNotNull( counterUtils, "counterUtils must not be null" ); @@ -249,8 +254,7 @@ public class CpEntityManager implements EntityManager { */ org.apache.usergrid.persistence.model.entity.Entity load( Id entityId ) { - return ecm .load( entityId ).toBlocking() - .lastOrDefault(null); + return ecm .load( entityId ).toBlocking().lastOrDefault(null); } @@ -605,7 +609,8 @@ public class CpEntityManager implements EntityManager { Id entityId = new SimpleId( entityRef.getUuid(), entityRef.getType() ); //Step 1 & 2 of delete - return ecm.mark( entityId ).mergeWith( gm.markNode( entityId, createGraphOperationTimestamp() ) ); + String region = this.lookupRegionForType( entityRef.getType() ); + return ecm.mark( entityId, region ).mergeWith( gm.markNode( entityId, createGraphOperationTimestamp() ) ); } @@ -3058,6 +3063,8 @@ public class CpEntityManager implements EntityManager { private String lookupRegionForType( String type ) { String region = null; + + // get collection settings for type MapManager mm = getMapManagerForTypes(); CollectionSettingsCache collectionSettingsCache = collectionSettingsCacheFactory.getInstance( mm ); String collectionName = Schema.defaultCollectionName( type ); @@ -3068,6 +3075,7 @@ public class CpEntityManager implements EntityManager { if ( collectionSettings.isPresent() && collectionSettings.get().get("region") != null ) { region = collectionSettings.get().get("region").toString(); } + return region; } http://git-wip-us.apache.org/repos/asf/usergrid/blob/58fc657c/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java index 8e8c5e8..bc1b335 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java @@ -80,7 +80,9 @@ import static org.apache.usergrid.persistence.Schema.TYPE_APPLICATION; public class CpEntityManagerFactory implements EntityManagerFactory, ApplicationContextAware { private static final Logger logger = LoggerFactory.getLogger( CpEntityManagerFactory.class ); + private final EntityManagerFig entityManagerFig; + private final AkkaFig akkaFig; private ApplicationContext applicationContext; @@ -123,6 +125,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application this.injector = injector; this.reIndexService = injector.getInstance(ReIndexService.class); this.entityManagerFig = injector.getInstance(EntityManagerFig.class); + this.akkaFig = injector.getInstance( AkkaFig.class ); this.managerCache = injector.getInstance( ManagerCache.class ); this.metricsFactory = injector.getInstance( MetricsFactory.class ); this.indexService = injector.getInstance( AsyncEventService.class ); @@ -134,9 +137,6 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application Properties properties = cassandraService.getProperties(); this.entityManagers = createEntityManagerCache( properties ); - AkkaFig akkaFig = injector.getInstance( AkkaFig.class ); - - logger.info("EntityManagerFactoring starting..."); if ( akkaFig.getAkkaEnabled() ) { @@ -354,9 +354,19 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application private EntityManager _getEntityManager( UUID applicationId ) { - EntityManager em = new CpEntityManager( cassandraService, counterUtils, indexService, managerCache, - metricsFactory, entityManagerFig, graphManagerFactory, collectionService, connectionService, - collectionSettingsCacheFactory, applicationId ); + EntityManager em = new CpEntityManager( + cassandraService, + counterUtils, + indexService, + managerCache, + metricsFactory, + akkaFig, + entityManagerFig, + graphManagerFactory, + collectionService, + connectionService, + collectionSettingsCacheFactory, + applicationId ); return em; } http://git-wip-us.apache.org/repos/asf/usergrid/blob/58fc657c/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ApplicationServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ApplicationServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ApplicationServiceImpl.java index 91f11f5..c6b3b15 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ApplicationServiceImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ApplicationServiceImpl.java @@ -31,6 +31,7 @@ import org.apache.usergrid.persistence.Schema; import org.apache.usergrid.persistence.collection.EntityCollectionManager; import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope; +import org.apache.usergrid.persistence.collection.uniquevalues.AkkaFig; import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.graph.GraphManager; import org.apache.usergrid.persistence.graph.GraphManagerFactory; @@ -60,7 +61,7 @@ public class ApplicationServiceImpl implements ApplicationService{ private final MapManagerFactory mapManagerFactory; private final GraphManagerFactory graphManagerFactory; private final CollectionSettingsCacheFactory collectionSettingsCacheFactory; - + private final AkkaFig akkaFig; @Inject @@ -70,7 +71,8 @@ public class ApplicationServiceImpl implements ApplicationService{ EventBuilder eventBuilder, MapManagerFactory mapManagerFactory, GraphManagerFactory graphManagerFactory, - CollectionSettingsCacheFactory collectionSettingsCacheFactory + CollectionSettingsCacheFactory collectionSettingsCacheFactory, + AkkaFig akkaFig ){ this.allEntityIdsObservable = allEntityIdsObservable; @@ -80,6 +82,7 @@ public class ApplicationServiceImpl implements ApplicationService{ this.mapManagerFactory = mapManagerFactory; this.graphManagerFactory = graphManagerFactory; this.collectionSettingsCacheFactory = collectionSettingsCacheFactory; + this.akkaFig = akkaFig; } @@ -112,7 +115,7 @@ public class ApplicationServiceImpl implements ApplicationService{ } countObservable = countObservable.map(id -> { - entityCollectionManager.mark((Id) id) + entityCollectionManager.mark((Id) id, null ) .mergeWith(graphManager.markNode((Id) id, createGraphOperationTimestamp())).toBlocking().last(); return id; }) http://git-wip-us.apache.org/repos/asf/usergrid/blob/58fc657c/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java index 0dad0d7..b6056b5 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java @@ -51,9 +51,10 @@ public interface EntityCollectionManager { * @param entityId MarkCommit the entity as deleted. Will not actually remove it from cassandra. This operation will * also remove all unique properties for this entity * + * @param region * @return The observable of the id after the operation has completed */ - Observable<Id> mark( Id entityId ); + Observable<Id> mark(Id entityId, String region); /** * @param entityId The entity id to load. http://git-wip-us.apache.org/repos/asf/usergrid/blob/58fc657c/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java index 09bd01b..3cc4a07 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java @@ -214,14 +214,14 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager { @Override - public Observable<Id> mark( final Id entityId ) { + public Observable<Id> mark(final Id entityId, String region) { Preconditions.checkNotNull( entityId, "Entity id is required in this stage" ); Preconditions.checkNotNull( entityId.getUuid(), "Entity id is required in this stage" ); Preconditions.checkNotNull( entityId.getType(), "Entity type is required in this stage" ); - Observable<Id> o = Observable.just( new CollectionIoEvent<>( applicationScope, entityId ) ).map( markStart ) - .doOnNext( markCommit ).compose( uniqueCleanup ).map( + Observable<Id> o = Observable.just( new CollectionIoEvent<>( applicationScope, entityId, region ) ) + .map( markStart ).doOnNext( markCommit ).compose( uniqueCleanup ).map( entityEvent -> entityEvent.getEvent().getId() ); return ObservableTimer.time( o, deleteTimer ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/58fc657c/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java index abb54c9..44028ae 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java @@ -131,7 +131,11 @@ public class WriteCommit implements Func1<CollectionIoEvent<MvccEntity>, Collect // akkaFig may be null when this is called from JUnit tests if ( akkaFig != null && akkaFig.getAkkaEnabled() ) { - confirmUniqueFieldsAkka( mvccEntity, version, applicationScope, ioEvent.getRegion() ); + String region = ioEvent.getRegion(); + if ( region == null ) { + region = akkaFig.getAkkaAuthoritativeRegion(); + } + confirmUniqueFieldsAkka( mvccEntity, version, applicationScope, region ); } else { confirmUniqueFields( mvccEntity, version, applicationScope, logMutation ); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/58fc657c/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java index acfc5f6..da394f7 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java @@ -122,8 +122,12 @@ public class WriteUniqueVerify implements Action1<CollectionIoEvent<MvccEntity>> final ApplicationScope applicationScope = ioevent.getEntityCollection(); + String region = ioevent.getRegion(); + if ( region == null ) { + region = akkaFig.getAkkaAuthoritativeRegion(); + } try { - akkaUvService.reserveUniqueValues( applicationScope, entity, mvccEntity.getVersion(), ioevent.getRegion() ); + akkaUvService.reserveUniqueValues( applicationScope, entity, mvccEntity.getVersion(), region ); } catch (UniqueValueException e) { Map<String, Field> violations = new HashMap<>(); http://git-wip-us.apache.org/repos/asf/usergrid/blob/58fc657c/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/AkkaFig.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/AkkaFig.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/AkkaFig.java index e709920..0f97403 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/AkkaFig.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/AkkaFig.java @@ -47,8 +47,7 @@ public interface AkkaFig extends GuicyFig, Serializable { String AKKA_UNIQUEVALUE_RESERVATION_TTL= "collection.akka.uniquevalue.reservation.ttl"; - String AKKA_UNIQUEVALUE_REGION_TYPES = "collection.akka.uniquevalue.region.types"; - + String AKKA_AUTHORITATIVE_REGION = "collection.akka.uniquevalue.authoritative.region"; /** * Use Akka or nah @@ -96,11 +95,10 @@ public interface AkkaFig extends GuicyFig, Serializable { String getRegionSeeds(); /** - * Authoritative regions may be specified for types - * Comma-separated lists of region types each with format {region}:{type} + * If no region specified for type, use the authoritative region */ - @Key(AKKA_UNIQUEVALUE_REGION_TYPES) - String getRegionTypes(); + @Key(AKKA_AUTHORITATIVE_REGION) + String getAkkaAuthoritativeRegion(); /** * Unique Value cache TTL in seconds. http://git-wip-us.apache.org/repos/asf/usergrid/blob/58fc657c/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ReservationCache.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ReservationCache.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ReservationCache.java index 8dba606..24b7f6e 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ReservationCache.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ReservationCache.java @@ -29,11 +29,13 @@ public class ReservationCache { private static final Logger logger = LoggerFactory.getLogger( RequestActor.class ); Cache<String, UniqueValueActor.Reservation> cache; + long ttl; // use hokey old-style singleton because its not that easy to get Guice into an actor private static ReservationCache instance = null; ReservationCache( long ttl ) { + this.ttl = ttl; cache = CacheBuilder.newBuilder() .maximumSize(1000) .concurrencyLevel( 300 ) @@ -54,15 +56,18 @@ public class ReservationCache { } public UniqueValueActor.Reservation get( String rowKey ) { + if ( ttl == 0 ) { return null; } UniqueValueActor.Reservation res = cache.getIfPresent( rowKey ); return res; } public void cacheReservation( UniqueValueActor.Reservation reservation ) { + if ( ttl == 0 ) { return; } cache.put( reservation.getConsistentHashKey(), reservation ); } public void cancelReservation( UniqueValueActor.Cancellation cancellation ) { + if ( ttl == 0 ) { return; } cache.invalidate( cancellation.getConsistentHashKey() ); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/58fc657c/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java index 14565c2..d94b7b5 100644 --- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java +++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java @@ -190,7 +190,7 @@ public class EntityCollectionManagerIT { assertEquals( "Same value", createReturned, loadReturned ); - manager.mark( createReturned.getId() ).toBlocking().last(); + manager.mark( createReturned.getId(), null ).toBlocking().last(); loadObservable = manager.load( createReturned.getId() );
