Repository: incubator-usergrid Updated Branches: refs/heads/two-dot-o-dev ba5c49141 -> 9427b4ce4
Updates amazon async service wiring to wire to event builder Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/d67c220d Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/d67c220d Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/d67c220d Branch: refs/heads/two-dot-o-dev Commit: d67c220dd7221131a5c7bbcfebcd60f78cddffb1 Parents: b9d2d35 Author: Todd Nine <tn...@apigee.com> Authored: Tue Aug 4 12:30:27 2015 -0600 Committer: Todd Nine <tn...@apigee.com> Committed: Tue Aug 4 12:30:27 2015 -0600 ---------------------------------------------------------------------- .../asyncevents/AmazonAsyncEventService.java | 132 +++++++++---------- .../asyncevents/AsyncIndexProvider.java | 11 +- .../asyncevents/EventBuilder.java | 11 +- .../asyncevents/EventBuilderImpl.java | 12 +- .../asyncevents/InMemoryAsyncEventService.java | 14 +- .../asyncevents/model/EntityIndexEvent.java | 16 ++- .../index/AmazonAsyncEventServiceTest.java | 7 +- 7 files changed, 107 insertions(+), 96 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d67c220d/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java index ec43ab7..0429af3 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java @@ -27,14 +27,15 @@ import java.util.concurrent.atomic.AtomicLong; import com.codahale.metrics.Histogram; import com.google.common.base.Preconditions; -import org.apache.usergrid.corepersistence.CpEntityManager; + import org.apache.usergrid.corepersistence.asyncevents.model.*; import org.apache.usergrid.corepersistence.index.*; import org.apache.usergrid.corepersistence.rx.impl.EdgeScope; import org.apache.usergrid.persistence.index.EntityIndex; import org.apache.usergrid.persistence.index.EntityIndexFactory; import org.apache.usergrid.persistence.index.IndexLocationStrategy; -import org.apache.usergrid.utils.UUIDUtils; +import org.apache.usergrid.persistence.index.impl.IndexOperationMessage; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -78,15 +79,19 @@ public class AmazonAsyncEventService implements AsyncEventService { private final QueueManager queue; private final QueueScope queueScope; private final IndexProcessorFig indexProcessorFig; - private final IndexService indexService; private final EntityCollectionManagerFactory entityCollectionManagerFactory; private final IndexLocationStrategyFactory indexLocationStrategyFactory; private final EntityIndexFactory entityIndexFactory; + private final EventBuilder eventBuilder; + private final RxTaskScheduler rxTaskScheduler; private final Timer readTimer; private final Timer writeTimer; private final Timer ackTimer; + /** + * This mutex is used to start/stop workers to ensure we're not concurrently modifying our subscriptions + */ private final Object mutex = new Object(); private final Counter indexErrorCounter; @@ -99,19 +104,17 @@ public class AmazonAsyncEventService implements AsyncEventService { @Inject - public AmazonAsyncEventService(final QueueManagerFactory queueManagerFactory, - final IndexProcessorFig indexProcessorFig, - final MetricsFactory metricsFactory, - final IndexService indexService, - final EntityCollectionManagerFactory entityCollectionManagerFactory, - final IndexLocationStrategyFactory indexLocationStrategyFactory, - final EntityIndexFactory entityIndexFactory - ) { - - this.indexService = indexService; + public AmazonAsyncEventService( final QueueManagerFactory queueManagerFactory, final IndexProcessorFig indexProcessorFig, + final MetricsFactory metricsFactory, final EntityCollectionManagerFactory entityCollectionManagerFactory, + final IndexLocationStrategyFactory indexLocationStrategyFactory, final EntityIndexFactory entityIndexFactory, + final EventBuilder eventBuilder, + final RxTaskScheduler rxTaskScheduler ) { + this.entityCollectionManagerFactory = entityCollectionManagerFactory; this.indexLocationStrategyFactory = indexLocationStrategyFactory; this.entityIndexFactory = entityIndexFactory; + this.eventBuilder = eventBuilder; + this.rxTaskScheduler = rxTaskScheduler; this.queueScope = new QueueScopeImpl(QUEUE_NAME, QueueScope.RegionImplementation.ALLREGIONS); this.queue = queueManagerFactory.getQueueManager(queueScope); @@ -144,7 +147,7 @@ public class AmazonAsyncEventService implements AsyncEventService { try { //signal to SQS - this.queue.sendMessage(operation); + this.queue.sendMessage( operation ); } catch (IOException e) { throw new RuntimeException("Unable to queue message", e); } finally { @@ -157,7 +160,7 @@ public class AmazonAsyncEventService implements AsyncEventService { try { //signal to SQS - this.queue.sendMessages(operations); + this.queue.sendMessages( operations ); } catch (IOException e) { throw new RuntimeException("Unable to queue message", e); } finally { @@ -189,33 +192,6 @@ public class AmazonAsyncEventService implements AsyncEventService { /** * Ack message in SQS */ - public void ack(final List<QueueMessage> messages) { - - final Timer.Context timer = this.ackTimer.time(); - - try{ - // no op - if (messages.size() == 0) { - return; - } - queue.commitMessages(messages); - - //decrement our in-flight counter - inFlight.addAndGet(-1 * messages.size()); - - }catch(Exception e){ - throw new RuntimeException("Unable to ack messages", e); - } - finally { - timer.stop(); - } - - - } - - /** - * Ack message in SQS - */ public void ack(final QueueMessage message) { final Timer.Context timer = this.ackTimer.time(); @@ -290,7 +266,7 @@ public class AmazonAsyncEventService implements AsyncEventService { public void queueEntityIndexUpdate(final ApplicationScope applicationScope, final Entity entity) { - offer(new EntityIndexEvent(new EntityIdScope(applicationScope, entity.getId()))); + offer(new EntityIndexEvent(new EntityIdScope(applicationScope, entity.getId()), 0)); } @@ -298,7 +274,7 @@ public class AmazonAsyncEventService implements AsyncEventService { Preconditions.checkNotNull(message, "Queue Message cannot be null for handleEntityIndexUpdate"); - final AsyncEvent event = (AsyncEvent) message.getBody(); + final EntityIndexEvent event = (EntityIndexEvent) message.getBody(); Preconditions.checkNotNull(message, "QueueMessage Body cannot be null for handleEntityIndexUpdate"); Preconditions.checkArgument(event.getEventType() == AsyncEvent.EventType.ENTITY_INDEX, String.format("Event Type for handleEntityIndexUpdate must be ENTITY_INDEX, got %s", event.getEventType())); @@ -307,13 +283,14 @@ public class AmazonAsyncEventService implements AsyncEventService { //only process the same version, otherwise ignore final EntityIdScope entityIdScope = event.getEntityIdScope(); final ApplicationScope applicationScope = entityIdScope.getApplicationScope(); + final Id entityId = entityIdScope.getId(); + final long updatedAfter = event.getUpdatedAfter(); + + final EntityIndexOperation entityIndexOperation = new EntityIndexOperation( applicationScope, entityId, updatedAfter); - final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager(applicationScope); + final Observable<IndexOperationMessage> observable = eventBuilder.buildEntityIndex( entityIndexOperation ); - ecm.load(entityIdScope.getId()) - .first() - .flatMap(entity -> indexService.indexEntity(applicationScope, entity)) - .doOnNext(ignore -> ack(message)).subscribe(); + subscibeAndAck( observable, message ); } @@ -324,7 +301,7 @@ public class AmazonAsyncEventService implements AsyncEventService { EdgeIndexEvent operation = new EdgeIndexEvent(applicationScope, entity.getId(), newEdge); - offer(operation); + offer( operation ); } public void handleEdgeIndex(final QueueMessage message) { @@ -339,18 +316,21 @@ public class AmazonAsyncEventService implements AsyncEventService { final ApplicationScope applicationScope = event.getApplicationScope(); final Edge edge = event.getEdge(); - final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager(applicationScope); - ecm.load(event.getEntityId()) - .flatMap(entity -> indexService.indexEdge(applicationScope, entity, edge)) - .doOnNext(ignore -> ack(message)).subscribe(); + + final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager( applicationScope ); + + final Observable<IndexOperationMessage> edgeIndexObservable = ecm.load(event.getEntityId()).flatMap( entity -> eventBuilder.buildNewEdge( + applicationScope, entity, edge ) ); + + subscibeAndAck( edgeIndexObservable, message ); } @Override public void queueDeleteEdge(final ApplicationScope applicationScope, final Edge edge) { - offer(new EdgeDeleteEvent(applicationScope, edge)); + offer( new EdgeDeleteEvent( applicationScope, edge ) ); } public void handleEdgeDelete(final QueueMessage message) { @@ -367,15 +347,16 @@ public class AmazonAsyncEventService implements AsyncEventService { if (logger.isDebugEnabled()) logger.debug("Deleting in app scope {} with edge {}", applicationScope, edge); - indexService.deleteIndexEdge(applicationScope, edge) - .doOnNext(ignore -> ack(message)).subscribe(); + final Observable<IndexOperationMessage> observable = eventBuilder.buildDeleteEdge( applicationScope, edge ); + + subscibeAndAck( observable, message ); } @Override public void queueEntityDelete(final ApplicationScope applicationScope, final Id entityId) { - offer(new EntityDeleteEvent(new EntityIdScope(applicationScope, entityId))); + offer( new EntityDeleteEvent( new EntityIdScope( applicationScope, entityId ) ) ); } public void handleEntityDelete(final QueueMessage message) { @@ -384,7 +365,8 @@ public class AmazonAsyncEventService implements AsyncEventService { final AsyncEvent event = (AsyncEvent) message.getBody(); Preconditions.checkNotNull(message, "QueueMessage Body cannot be null for handleEntityDelete"); - Preconditions.checkArgument(event.getEventType() == AsyncEvent.EventType.ENTITY_DELETE, String.format("Event Type for handleEntityDelete must be ENTITY_DELETE, got %s", event.getEventType())); + Preconditions.checkArgument( event.getEventType() == AsyncEvent.EventType.ENTITY_DELETE, + String.format( "Event Type for handleEntityDelete must be ENTITY_DELETE, got %s", event.getEventType() ) ); final ApplicationScope applicationScope = event.getApplicationScope(); final Id entityId = event.getEntityId(); @@ -392,10 +374,16 @@ public class AmazonAsyncEventService implements AsyncEventService { if (logger.isDebugEnabled()) logger.debug("Deleting entity id from index in app scope {} with entityId {}", applicationScope, entityId); - ack(message); + ack( message ); + + final EventBuilderImpl.EntityDeleteResults + entityDeleteResults = eventBuilder.buildEntityDelete( applicationScope, entityId ); + + + final Observable merged = Observable.merge( entityDeleteResults.getEntitiesCompacted(), + entityDeleteResults.getIndexObservable() ); - indexService.deleteEntityIndexes(applicationScope, entityId, UUIDUtils.maxTimeUUID(Long.MAX_VALUE)) - .doOnNext(ignore -> ack(message)).subscribe(); + subscibeAndAck( merged, message ); } @@ -407,9 +395,9 @@ public class AmazonAsyncEventService implements AsyncEventService { Preconditions.checkArgument(event.getEventType() == AsyncEvent.EventType.APPLICATION_INDEX, String.format("Event Type for handleInitializeApplicationIndex must be APPLICATION_INDEX, got %s", event.getEventType())); final IndexLocationStrategy indexLocationStrategy = event.getIndexLocationStrategy(); - final EntityIndex index = entityIndexFactory.createEntityIndex(indexLocationStrategy); + final EntityIndex index = entityIndexFactory.createEntityIndex( indexLocationStrategy ); index.initialize(); - ack(message); + ack( message ); } /** @@ -494,7 +482,7 @@ public class AmazonAsyncEventService implements AsyncEventService { public void index(final ApplicationScope applicationScope, final Id id, final long updatedSince) { //change to id scope to avoid serialization issues - offer(new EntityIndexEvent(new EntityIdScope(applicationScope, id))); + offer(new EntityIndexEvent(new EntityIdScope(applicationScope, id), updatedSince)); } public void indexBatch(final List<EdgeScope> edges, final long updatedSince) { @@ -502,8 +490,18 @@ public class AmazonAsyncEventService implements AsyncEventService { List batch = new ArrayList<EdgeScope>(); for ( EdgeScope e : edges){ //change to id scope to avoid serialization issues - batch.add(new EntityIndexEvent(new EntityIdScope(e.getApplicationScope(), e.getEdge().getTargetNode()))); + batch.add(new EntityIndexEvent(new EntityIdScope(e.getApplicationScope(), e.getEdge().getTargetNode()), updatedSince)); } - offerBatch(batch); + offerBatch( batch ); + } + + + /** + * Subscribes to the observable and acks the message via SQS on completion + * @param observable + * @param message + */ + private void subscibeAndAck( final Observable<?> observable, final QueueMessage message ){ + observable.doOnCompleted( ()-> ack(message) ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() ).subscribe(); } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d67c220d/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java index 0a58369..0e773cf 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java @@ -44,7 +44,6 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> { private final QueueManagerFactory queueManagerFactory; private final MetricsFactory metricsFactory; - private final IndexService indexService; private final RxTaskScheduler rxTaskScheduler; private final EntityCollectionManagerFactory entityCollectionManagerFactory; private final EventBuilder eventBuilder; @@ -58,7 +57,6 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> { public AsyncIndexProvider(final IndexProcessorFig indexProcessorFig, final QueueManagerFactory queueManagerFactory, final MetricsFactory metricsFactory, - final IndexService indexService, final RxTaskScheduler rxTaskScheduler, final EntityCollectionManagerFactory entityCollectionManagerFactory, final EventBuilder eventBuilder, @@ -68,7 +66,6 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> { this.indexProcessorFig = indexProcessorFig; this.queueManagerFactory = queueManagerFactory; this.metricsFactory = metricsFactory; - this.indexService = indexService; this.rxTaskScheduler = rxTaskScheduler; this.entityCollectionManagerFactory = entityCollectionManagerFactory; this.eventBuilder = eventBuilder; @@ -97,11 +94,11 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> { case LOCAL: return new InMemoryAsyncEventService(eventBuilder, rxTaskScheduler, indexProcessorFig.resolveSynchronously()); case SQS: - return new AmazonAsyncEventService(queueManagerFactory, indexProcessorFig, metricsFactory, indexService, - entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory ); + return new AmazonAsyncEventService(queueManagerFactory, indexProcessorFig, metricsFactory, + entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, rxTaskScheduler ); case SNS: - return new AmazonAsyncEventService(queueManagerFactory, indexProcessorFig, metricsFactory, indexService, - entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory ); + return new AmazonAsyncEventService(queueManagerFactory, indexProcessorFig, metricsFactory, + entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, rxTaskScheduler ); default: throw new IllegalArgumentException("Configuration value of " + getErrorValues() + " are allowed"); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d67c220d/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java index f9f157e..d246e2f 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java @@ -37,13 +37,14 @@ import rx.Observable; * Interface for constructing an observable stream to perform asynchonous events */ public interface EventBuilder { + /** * Return the cold observable of entity index update operations * @param applicationScope * @param entity * @return */ - Observable<IndexOperationMessage> queueEntityIndexUpdate( ApplicationScope applicationScope, Entity entity ); + Observable<IndexOperationMessage> buildEntityIndexUpdate( ApplicationScope applicationScope, Entity entity ); /** * Return the cold observable of the new edge operation @@ -52,7 +53,7 @@ public interface EventBuilder { * @param newEdge * @return */ - Observable<IndexOperationMessage> queueNewEdge( ApplicationScope applicationScope, Entity entity, Edge newEdge ); + Observable<IndexOperationMessage> buildNewEdge( ApplicationScope applicationScope, Entity entity, Edge newEdge ); /** * Return the cold observable of the deleted edge operations @@ -60,7 +61,7 @@ public interface EventBuilder { * @param edge * @return */ - Observable<IndexOperationMessage> queueDeleteEdge( ApplicationScope applicationScope, Edge edge ); + Observable<IndexOperationMessage> buildDeleteEdge( ApplicationScope applicationScope, Edge edge ); /** * Return a ben with 2 obervable streams for entity delete. @@ -68,14 +69,14 @@ public interface EventBuilder { * @param entityId * @return */ - EventBuilderImpl.EntityDeleteResults queueEntityDelete( ApplicationScope applicationScope, Id entityId ); + EventBuilderImpl.EntityDeleteResults buildEntityDelete( ApplicationScope applicationScope, Id entityId ); /** * Re-index an entity in the scope provided * @param entityIndexOperation * @return */ - Observable<IndexOperationMessage> index( EntityIndexOperation entityIndexOperation ); + Observable<IndexOperationMessage> buildEntityIndex( EntityIndexOperation entityIndexOperation ); /** * A bean to hold both our observables so the caller can choose the subscription mechanism. Note that http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d67c220d/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java index e4cd4b5..46cec2e 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java @@ -73,7 +73,7 @@ public class EventBuilderImpl implements EventBuilder { @Override - public Observable<IndexOperationMessage> queueEntityIndexUpdate( final ApplicationScope applicationScope, + public Observable<IndexOperationMessage> buildEntityIndexUpdate( final ApplicationScope applicationScope, final Entity entity ) { //process the entity immediately @@ -89,7 +89,7 @@ public class EventBuilderImpl implements EventBuilder { @Override - public Observable<IndexOperationMessage> queueNewEdge( final ApplicationScope applicationScope, final Entity entity, + public Observable<IndexOperationMessage> buildNewEdge( final ApplicationScope applicationScope, final Entity entity, final Edge newEdge ) { log.debug( "Indexing in app scope {} with entity {} and new edge {}", @@ -103,8 +103,8 @@ public class EventBuilderImpl implements EventBuilder { @Override - public Observable<IndexOperationMessage> queueDeleteEdge( final ApplicationScope applicationScope, - final Edge edge ) { + public Observable<IndexOperationMessage> buildDeleteEdge( final ApplicationScope applicationScope, final Edge + edge ) { log.debug( "Deleting in app scope {} with edge {} }", applicationScope, edge ); final Observable<IndexOperationMessage> edgeObservable = @@ -121,7 +121,7 @@ public class EventBuilderImpl implements EventBuilder { //it'll need to be pushed up higher so we can do the marking that isn't async or does it not matter? @Override - public EntityDeleteResults queueEntityDelete( final ApplicationScope applicationScope, final Id entityId ) { + public EntityDeleteResults buildEntityDelete( final ApplicationScope applicationScope, final Id entityId ) { log.debug( "Deleting entity id from index in app scope {} with entityId {} }", applicationScope, entityId ); final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager( applicationScope ); @@ -163,7 +163,7 @@ public class EventBuilderImpl implements EventBuilder { @Override - public Observable<IndexOperationMessage> index( final EntityIndexOperation entityIndexOperation ) { + public Observable<IndexOperationMessage> buildEntityIndex( final EntityIndexOperation entityIndexOperation ) { final ApplicationScope applicationScope = entityIndexOperation.getApplicationScope(); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d67c220d/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java index adb4a90..830033d 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java @@ -20,8 +20,6 @@ package org.apache.usergrid.corepersistence.asyncevents; -import com.amazonaws.services.opsworks.model.App; -import org.apache.usergrid.persistence.index.IndexLocationStrategy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -77,19 +75,19 @@ public class InMemoryAsyncEventService implements AsyncEventService { //only process the same version, otherwise ignore - run( eventBuilder.queueEntityIndexUpdate(applicationScope, entity) ); + run( eventBuilder.buildEntityIndexUpdate( applicationScope, entity ) ); } @Override public void queueNewEdge( final ApplicationScope applicationScope, final Entity entity, final Edge newEdge ) { - run( eventBuilder.queueNewEdge(applicationScope, entity, newEdge) ); + run( eventBuilder.buildNewEdge( applicationScope, entity, newEdge ) ); } @Override public void queueDeleteEdge( final ApplicationScope applicationScope, final Edge edge ) { - run( eventBuilder.queueDeleteEdge(applicationScope, edge) ); + run( eventBuilder.buildDeleteEdge( applicationScope, edge ) ); } @@ -97,7 +95,7 @@ public class InMemoryAsyncEventService implements AsyncEventService { public void queueEntityDelete( final ApplicationScope applicationScope, final Id entityId ) { final EventBuilderImpl.EntityDeleteResults results = - eventBuilder.queueEntityDelete( applicationScope, entityId ); + eventBuilder.buildEntityDelete( applicationScope, entityId ); run( results.getIndexObservable() ); run( results.getEntitiesCompacted() ); @@ -107,7 +105,7 @@ public class InMemoryAsyncEventService implements AsyncEventService { public void index( final ApplicationScope applicationScope, final Id id, final long updatedSince ) { final EntityIndexOperation entityIndexOperation = new EntityIndexOperation( applicationScope, id, updatedSince ); - run(eventBuilder.index( entityIndexOperation )); + run(eventBuilder.buildEntityIndex( entityIndexOperation )); } public void indexBatch(final List<EdgeScope> edges, final long updatedSince) { @@ -115,7 +113,7 @@ public class InMemoryAsyncEventService implements AsyncEventService { final EntityIndexOperation entityIndexOperation = new EntityIndexOperation(e.getApplicationScope(), e.getEdge().getTargetNode(), updatedSince); - run(eventBuilder.index (entityIndexOperation)); + run(eventBuilder.buildEntityIndex( entityIndexOperation )); } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d67c220d/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityIndexEvent.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityIndexEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityIndexEvent.java index 7b79987..81961a0 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityIndexEvent.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityIndexEvent.java @@ -27,10 +27,24 @@ import org.apache.usergrid.persistence.collection.serialization.impl.migration.E */ @JsonDeserialize(as = AsyncEvent.class) public final class EntityIndexEvent extends AsyncEvent { + + private long updatedAfter; + public EntityIndexEvent() { } - public EntityIndexEvent(EntityIdScope entityIdScope) { + public EntityIndexEvent(EntityIdScope entityIdScope, final long updatedAfter ) { super(EventType.ENTITY_INDEX, entityIdScope); + this.updatedAfter = updatedAfter; + } + + + public long getUpdatedAfter() { + return updatedAfter; + } + + + public void setUpdatedAfter( long updatedAfter ) { + this.updatedAfter = updatedAfter; } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d67c220d/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java index 9cf896c..d37701b 100644 --- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java +++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java @@ -20,6 +20,7 @@ package org.apache.usergrid.corepersistence.index; +import org.apache.usergrid.corepersistence.asyncevents.EventBuilder; import org.apache.usergrid.persistence.index.EntityIndexFactory; import org.junit.Rule; import org.junit.runner.RunWith; @@ -71,6 +72,9 @@ public class AmazonAsyncEventServiceTest extends AsyncIndexServiceTest { @Inject public RxTaskScheduler rxTaskScheduler; + @Inject + public EventBuilder eventBuilder; + @Inject public IndexLocationStrategyFactory indexLocationStrategyFactory; @@ -81,8 +85,7 @@ public class AmazonAsyncEventServiceTest extends AsyncIndexServiceTest { @Override protected AsyncEventService getAsyncEventService() { - return new AmazonAsyncEventService( queueManagerFactory, indexProcessorFig, metricsFactory, indexService, - entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory ); + return new AmazonAsyncEventService( queueManagerFactory, indexProcessorFig, metricsFactory, entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder, rxTaskScheduler ); }