Reduce SQS hop for entity write/update indexing events.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/b4634dc4 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/b4634dc4 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/b4634dc4 Branch: refs/heads/master Commit: b4634dc42f767a982892362bbd4aa66059bf1998 Parents: d4c7a3c Author: Michael Russo <[email protected]> Authored: Fri Feb 19 13:35:48 2016 -0800 Committer: Michael Russo <[email protected]> Committed: Fri Feb 19 13:35:48 2016 -0800 ---------------------------------------------------------------------- .../corepersistence/CpEntityManager.java | 4 +- .../asyncevents/AmazonAsyncEventService.java | 844 ------------------- .../asyncevents/AsyncEventService.java | 5 +- .../asyncevents/AsyncEventServiceImpl.java | 798 ++++++++++++++++++ .../asyncevents/AsyncIndexProvider.java | 4 +- .../asyncevents/EventBuilder.java | 12 +- .../asyncevents/EventBuilderImpl.java | 15 +- .../asyncevents/IndexDocNotFoundException.java | 37 + .../corepersistence/index/IndexServiceImpl.java | 4 +- .../read/search/CandidateEntityFilter.java | 10 +- .../index/AmazonAsyncEventServiceTest.java | 103 --- .../index/AsyncEventServiceImplTest.java | 103 +++ .../index/AsyncIndexServiceTest.java | 3 +- .../index/impl/EsIndexProducerImpl.java | 5 +- .../usergrid/persistence/queue/QueueFig.java | 4 + .../usergrid/services/ServiceManager.java | 5 +- 16 files changed, 972 insertions(+), 984 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/b4634dc4/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java index 6c2ef0b..b677f79 100644 --- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java @@ -590,7 +590,7 @@ public class CpEntityManager implements EntityManager { // update in all containing collections and connection indexes - indexService.queueEntityIndexUpdate( applicationScope, cpEntity ); + indexService.queueEntityIndexUpdate( applicationScope, cpEntity, 0); } @@ -1107,7 +1107,7 @@ public class CpEntityManager implements EntityManager { //Adding graphite metrics - indexService.queueEntityIndexUpdate(applicationScope, cpEntity); + indexService.queueEntityIndexUpdate(applicationScope, cpEntity, 0); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/b4634dc4/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java deleted file mode 100644 index 00dc69a..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java +++ /dev/null @@ -1,844 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.corepersistence.asyncevents; - - -import java.io.IOException; -import java.io.Serializable; -import java.util.*; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -import org.apache.usergrid.persistence.index.impl.*; -import org.elasticsearch.action.index.IndexRequest; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.usergrid.corepersistence.asyncevents.model.AsyncEvent; -import org.apache.usergrid.corepersistence.asyncevents.model.EdgeDeleteEvent; -import org.apache.usergrid.corepersistence.asyncevents.model.EdgeIndexEvent; -import org.apache.usergrid.corepersistence.asyncevents.model.ElasticsearchIndexEvent; -import org.apache.usergrid.corepersistence.asyncevents.model.EntityDeleteEvent; -import org.apache.usergrid.corepersistence.asyncevents.model.EntityIndexEvent; -import org.apache.usergrid.corepersistence.asyncevents.model.InitializeApplicationIndexEvent; -import org.apache.usergrid.corepersistence.index.EntityIndexOperation; -import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory; -import org.apache.usergrid.corepersistence.index.IndexProcessorFig; -import org.apache.usergrid.corepersistence.index.ReplicatedIndexLocationStrategy; -import org.apache.usergrid.corepersistence.rx.impl.EdgeScope; -import org.apache.usergrid.corepersistence.util.CpNamingUtils; -import org.apache.usergrid.corepersistence.util.ObjectJsonSerializer; -import 
org.apache.usergrid.persistence.collection.EntityCollectionManager; -import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; -import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope; -import org.apache.usergrid.persistence.core.metrics.MetricsFactory; -import org.apache.usergrid.persistence.core.rx.RxTaskScheduler; -import org.apache.usergrid.persistence.core.scope.ApplicationScope; -import org.apache.usergrid.persistence.graph.Edge; -import org.apache.usergrid.persistence.index.EntityIndex; -import org.apache.usergrid.persistence.index.EntityIndexFactory; -import org.apache.usergrid.persistence.index.IndexLocationStrategy; -import org.apache.usergrid.persistence.map.MapManager; -import org.apache.usergrid.persistence.map.MapManagerFactory; -import org.apache.usergrid.persistence.map.MapScope; -import org.apache.usergrid.persistence.map.impl.MapScopeImpl; -import org.apache.usergrid.persistence.model.entity.Entity; -import org.apache.usergrid.persistence.model.entity.Id; -import org.apache.usergrid.persistence.model.util.UUIDGenerator; -import org.apache.usergrid.persistence.queue.QueueFig; -import org.apache.usergrid.persistence.queue.QueueManager; -import org.apache.usergrid.persistence.queue.QueueManagerFactory; -import org.apache.usergrid.persistence.queue.QueueMessage; -import org.apache.usergrid.persistence.queue.QueueScope; -import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl; - -import com.codahale.metrics.Counter; -import com.codahale.metrics.Gauge; -import com.codahale.metrics.Histogram; -import com.codahale.metrics.Timer; -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; -import com.google.inject.Inject; -import com.google.inject.Singleton; - -import rx.Observable; -import rx.Subscriber; -import rx.Subscription; -import rx.schedulers.Schedulers; - - -/** - * TODO, this whole class is becoming a nightmare. 
We need to remove all consume from this class and refactor it into the following manner. - * - * 1. Produce. Keep the code in the handle as is - * 2. Consume: Move the code into a refactored system - * 2.1 A central dispatcher - * 2.2 An interface that produces an observable of type BatchOperation. Any handler will be refactored into it's own - * impl that will then emit a stream of batch operations to perform - * 2.3 The central dispatcher will then subscribe to these events and merge them. Handing them off to a batch handler - * 2.4 The batch handler will roll up the operations into a batch size, and then queue them - * 2.5 The receive batch handler will execute the batch operations - * - * TODO determine how we error handle? - * - */ -@Singleton -public class AmazonAsyncEventService implements AsyncEventService { - - - private static final Logger logger = LoggerFactory.getLogger(AmazonAsyncEventService.class); - - // SQS maximum receive messages is 10 - public int MAX_TAKE = 10; - public static final String QUEUE_NAME = "index"; //keep this short as AWS limits queue name size to 80 chars - - private final QueueManager queue; - private final IndexProcessorFig indexProcessorFig; - private final QueueFig queueFig; - private final IndexProducer indexProducer; - private final EntityCollectionManagerFactory entityCollectionManagerFactory; - private final IndexLocationStrategyFactory indexLocationStrategyFactory; - private final EntityIndexFactory entityIndexFactory; - private final EventBuilder eventBuilder; - private final RxTaskScheduler rxTaskScheduler; - - private final Timer readTimer; - private final Timer writeTimer; - private final Timer ackTimer; - - /** - * This mutex is used to start/stop workers to ensure we're not concurrently modifying our subscriptions - */ - private final Object mutex = new Object(); - - private final Counter indexErrorCounter; - private final AtomicLong counter = new AtomicLong(); - private final AtomicLong inFlight = new 
AtomicLong(); - private final Histogram messageCycle; - private final MapManager esMapPersistence; - - //the actively running subscription - private List<Subscription> subscriptions = new ArrayList<>(); - - - @Inject - public AmazonAsyncEventService( final QueueManagerFactory queueManagerFactory, - final IndexProcessorFig indexProcessorFig, - final IndexProducer indexProducer, - final MetricsFactory metricsFactory, - final EntityCollectionManagerFactory entityCollectionManagerFactory, - final IndexLocationStrategyFactory indexLocationStrategyFactory, - final EntityIndexFactory entityIndexFactory, - final EventBuilder eventBuilder, - final MapManagerFactory mapManagerFactory, - final QueueFig queueFig, - @EventExecutionScheduler - final RxTaskScheduler rxTaskScheduler ) { - this.indexProducer = indexProducer; - - this.entityCollectionManagerFactory = entityCollectionManagerFactory; - this.indexLocationStrategyFactory = indexLocationStrategyFactory; - this.entityIndexFactory = entityIndexFactory; - this.eventBuilder = eventBuilder; - - final MapScope mapScope = new MapScopeImpl( CpNamingUtils.getManagementApplicationId(), "indexEvents"); - - this.esMapPersistence = mapManagerFactory.createMapManager( mapScope ); - - this.rxTaskScheduler = rxTaskScheduler; - - QueueScope queueScope = new QueueScopeImpl(QUEUE_NAME, QueueScope.RegionImplementation.ALL); - this.queue = queueManagerFactory.getQueueManager(queueScope); - - this.indexProcessorFig = indexProcessorFig; - this.queueFig = queueFig; - - this.writeTimer = metricsFactory.getTimer(AmazonAsyncEventService.class, "async_event.write"); - this.readTimer = metricsFactory.getTimer(AmazonAsyncEventService.class, "async_event.read"); - this.ackTimer = metricsFactory.getTimer(AmazonAsyncEventService.class, "async_event.ack"); - this.indexErrorCounter = metricsFactory.getCounter(AmazonAsyncEventService.class, "async_event.error"); - this.messageCycle = metricsFactory.getHistogram(AmazonAsyncEventService.class, 
"async_event.message_cycle"); - - - //wire up the gauge of inflight message - metricsFactory.addGauge(AmazonAsyncEventService.class, "async-event.inflight", new Gauge<Long>() { - @Override - public Long getValue() { - return inFlight.longValue(); - } - }); - - start(); - } - - - /** - * Offer the EntityIdScope to SQS - */ - private void offer(final Serializable operation) { - final Timer.Context timer = this.writeTimer.time(); - - try { - //signal to SQS - this.queue.sendMessage( operation ); - } catch (IOException e) { - throw new RuntimeException("Unable to queue message", e); - } finally { - timer.stop(); - } - } - - - private void offerTopic( final Serializable operation ) { - final Timer.Context timer = this.writeTimer.time(); - - try { - //signal to SQS - this.queue.sendMessageToTopic( operation ); - } - catch ( IOException e ) { - throw new RuntimeException( "Unable to queue message", e ); - } - finally { - timer.stop(); - } - } - - private void offerBatch(final List operations){ - final Timer.Context timer = this.writeTimer.time(); - - try { - //signal to SQS - this.queue.sendMessages(operations); - } catch (IOException e) { - throw new RuntimeException("Unable to queue message", e); - } finally { - timer.stop(); - } - } - - - /** - * Take message from SQS - */ - private List<QueueMessage> take() { - - final Timer.Context timer = this.readTimer.time(); - - try { - return queue.getMessages(MAX_TAKE, - indexProcessorFig.getIndexQueueVisibilityTimeout(), - indexProcessorFig.getIndexQueueTimeout(), - AsyncEvent.class); - } - //stop our timer - finally { - timer.stop(); - } - } - - - - /** - * Ack message in SQS - */ - public void ack(final List<QueueMessage> messages) { - - final Timer.Context timer = this.ackTimer.time(); - - try{ - queue.commitMessages( messages ); - - //decrement our in-flight counter - inFlight.decrementAndGet(); - - }catch(Exception e){ - throw new RuntimeException("Unable to ack messages", e); - }finally { - timer.stop(); - } - - - } - - 
/** - * calls the event handlers and returns a result with information on whether it needs to be ack'd and whether it needs to be indexed - * @param messages - * @return - */ - private List<IndexEventResult> callEventHandlers(final List<QueueMessage> messages) { - - if (logger.isDebugEnabled()) { - logger.debug("callEventHandlers with {} message", messages.size()); - } - - Stream<IndexEventResult> indexEventResults = messages.stream().map(message -> { - AsyncEvent event = null; - try { - event = (AsyncEvent) message.getBody(); - } catch (ClassCastException cce) { - logger.error("Failed to deserialize message body", cce); - } - - if (event == null) { - logger.error("AsyncEvent type or event is null!"); - return new IndexEventResult(Optional.fromNullable(message), Optional.<IndexOperationMessage>absent(), - System.currentTimeMillis()); - } - - final AsyncEvent thisEvent = event; - - if (logger.isDebugEnabled()) { - logger.debug("Processing {} event", event); - } - - try { - //check for empty sets if this is true - boolean validateEmptySets = true; - Observable<IndexOperationMessage> indexoperationObservable; - //merge each operation to a master observable; - if ( event instanceof EdgeDeleteEvent ) { - indexoperationObservable = handleEdgeDelete( message ); - } - else if ( event instanceof EdgeIndexEvent ) { - indexoperationObservable = handleEdgeIndex( message ); - } - else if ( event instanceof EntityDeleteEvent ) { - indexoperationObservable = handleEntityDelete( message ); - validateEmptySets = false; // do not check this one for an empty set b/c it can be empty - - } - else if ( event instanceof EntityIndexEvent ) { - indexoperationObservable = handleEntityIndexUpdate( message ); - } - else if ( event instanceof InitializeApplicationIndexEvent ) { - //does not return observable - handleInitializeApplicationIndex(event, message); - indexoperationObservable = Observable.just(new IndexOperationMessage()); - validateEmptySets = false; //do not check this one for an 
empty set b/c it will be empty. - } else if (event instanceof ElasticsearchIndexEvent) { - handleIndexOperation((ElasticsearchIndexEvent) event); - indexoperationObservable = Observable.just(new IndexOperationMessage()); - validateEmptySets = false; //do not check this one for an empty set b/c it will be empty. - } else { - throw new Exception("Unknown EventType");//TODO: print json instead - } - - //collect all of the - IndexOperationMessage indexOperationMessage = indexoperationObservable - .collect(() -> new IndexOperationMessage(), (collector, single) -> collector.ingest(single)) - .toBlocking().lastOrDefault(null); - - if (validateEmptySets && (indexOperationMessage == null || indexOperationMessage.isEmpty())) { - logger.error("Received empty index sequence message:({}), body:({}) ", message.getMessageId(), - message.getStringBody()); - throw new Exception("Received empty index sequence."); - } - - //return type that can be indexed and ack'd later - return new IndexEventResult(Optional.fromNullable(message), - Optional.fromNullable(indexOperationMessage), thisEvent.getCreationTime()); - } catch (Exception e) { - logger.error("Failed to index message: {} {}", message.getMessageId(), message.getStringBody(), e); - return new IndexEventResult(Optional.absent(), Optional.<IndexOperationMessage>absent(), - event.getCreationTime()); - } - }); - - - return indexEventResults.collect(Collectors.toList()); - } - - @Override - public void queueInitializeApplicationIndex( final ApplicationScope applicationScope) { - IndexLocationStrategy indexLocationStrategy = indexLocationStrategyFactory.getIndexLocationStrategy( - applicationScope); - offerTopic( new InitializeApplicationIndexEvent( queueFig.getPrimaryRegion(), - new ReplicatedIndexLocationStrategy( indexLocationStrategy ) ) ); - } - - - @Override - public void queueEntityIndexUpdate(final ApplicationScope applicationScope, - final Entity entity) { - - offer(new EntityIndexEvent(queueFig.getPrimaryRegion(),new 
EntityIdScope(applicationScope, entity.getId()), 0)); - } - - - public Observable<IndexOperationMessage> handleEntityIndexUpdate(final QueueMessage message) { - - Preconditions.checkNotNull( message, "Queue Message cannot be null for handleEntityIndexUpdate" ); - - final AsyncEvent event = ( AsyncEvent ) message.getBody(); - - Preconditions.checkNotNull(message, "QueueMessage Body cannot be null for handleEntityIndexUpdate"); - Preconditions.checkArgument(event instanceof EntityIndexEvent, String.format("Event Type for handleEntityIndexUpdate must be ENTITY_INDEX, got %s", event.getClass())); - - final EntityIndexEvent entityIndexEvent = (EntityIndexEvent) event; - - - //process the entity immediately - //only process the same version, otherwise ignore - final EntityIdScope entityIdScope = entityIndexEvent.getEntityIdScope(); - final ApplicationScope applicationScope = entityIdScope.getApplicationScope(); - final Id entityId = entityIdScope.getId(); - final long updatedAfter = entityIndexEvent.getUpdatedAfter(); - - final EntityIndexOperation entityIndexOperation = new EntityIndexOperation( applicationScope, entityId, updatedAfter); - - final Observable<IndexOperationMessage> observable = eventBuilder.buildEntityIndex( entityIndexOperation ); - return observable; - } - - - @Override - public void queueNewEdge(final ApplicationScope applicationScope, - final Entity entity, - final Edge newEdge) { - - EdgeIndexEvent operation = new EdgeIndexEvent(queueFig.getPrimaryRegion(), applicationScope, entity.getId(), newEdge); - - offer( operation ); - } - - public Observable<IndexOperationMessage> handleEdgeIndex(final QueueMessage message) { - - Preconditions.checkNotNull( message, "Queue Message cannot be null for handleEdgeIndex" ); - - final AsyncEvent event = (AsyncEvent) message.getBody(); - - Preconditions.checkNotNull( message, "QueueMessage Body cannot be null for handleEdgeIndex" ); - Preconditions.checkArgument(event instanceof EdgeIndexEvent, String.format("Event 
Type for handleEdgeIndex must be EDGE_INDEX, got %s", event.getClass())); - - final EdgeIndexEvent edgeIndexEvent = ( EdgeIndexEvent ) event; - - final ApplicationScope applicationScope = edgeIndexEvent.getApplicationScope(); - final Edge edge = edgeIndexEvent.getEdge(); - - - - final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager( applicationScope ); - - final Observable<IndexOperationMessage> edgeIndexObservable = ecm.load( edgeIndexEvent.getEntityId() ).flatMap( - entity -> eventBuilder.buildNewEdge(applicationScope, entity, edge)); - return edgeIndexObservable; - } - - @Override - public void queueDeleteEdge(final ApplicationScope applicationScope, - final Edge edge) { - - offer( new EdgeDeleteEvent( queueFig.getPrimaryRegion(), applicationScope, edge ) ); - } - - public Observable<IndexOperationMessage> handleEdgeDelete(final QueueMessage message) { - - Preconditions.checkNotNull( message, "Queue Message cannot be null for handleEdgeDelete" ); - - final AsyncEvent event = (AsyncEvent) message.getBody(); - - Preconditions.checkNotNull( message, "QueueMessage Body cannot be null for handleEdgeDelete" ); - Preconditions.checkArgument(event instanceof EdgeDeleteEvent, String.format("Event Type for handleEdgeDelete must be EDGE_DELETE, got %s", event.getClass())); - - - final EdgeDeleteEvent edgeDeleteEvent = ( EdgeDeleteEvent ) event; - - final ApplicationScope applicationScope = edgeDeleteEvent.getApplicationScope(); - final Edge edge = edgeDeleteEvent.getEdge(); - - if (logger.isDebugEnabled()) { - logger.debug("Deleting in app scope {} with edge {}", applicationScope, edge); - } - - return eventBuilder.buildDeleteEdge(applicationScope, edge); - } - - - @Override - public void queueEntityDelete(final ApplicationScope applicationScope, final Id entityId) { - - offer( new EntityDeleteEvent(queueFig.getPrimaryRegion(), new EntityIdScope( applicationScope, entityId ) ) ); - } - - - /** - * Queue up an indexOperationMessage for 
multi region execution - * @param indexOperationMessage - */ - public void queueIndexOperationMessage( final IndexOperationMessage indexOperationMessage ) { - - // don't try to produce something with nothing - if(indexOperationMessage.isEmpty()){ - return; - } - - final String jsonValue = ObjectJsonSerializer.INSTANCE.toString( indexOperationMessage ); - - final UUID newMessageId = UUIDGenerator.newTimeUUID(); - - final int expirationTimeInSeconds = - ( int ) TimeUnit.MILLISECONDS.toSeconds( indexProcessorFig.getIndexMessageTtl() ); - - //write to the map in ES - esMapPersistence.putString( newMessageId.toString(), jsonValue, expirationTimeInSeconds ); - - - - //now queue up the index message - - final ElasticsearchIndexEvent elasticsearchIndexEvent = - new ElasticsearchIndexEvent(queueFig.getPrimaryRegion(), newMessageId ); - - //send to the topic so all regions index the batch - - offerTopic( elasticsearchIndexEvent ); - } - - public void handleIndexOperation(final ElasticsearchIndexEvent elasticsearchIndexEvent){ - Preconditions.checkNotNull( elasticsearchIndexEvent, "elasticsearchIndexEvent cannot be null" ); - - final UUID messageId = elasticsearchIndexEvent.getIndexBatchId(); - - Preconditions.checkNotNull( messageId, "messageId must not be null" ); - - - //load the entity - - final String message = esMapPersistence.getString( messageId.toString() ); - - final IndexOperationMessage indexOperationMessage; - - if(message == null){ - logger.warn( "Received message with id {} to process, unable to find it, reading with higher consistency level", - messageId); - - final String highConsistency = esMapPersistence.getStringHighConsistency( messageId.toString() ); - - if(highConsistency == null){ - logger.error( "Unable to find the ES batch with id {} to process at a higher consistency level", - messageId); - - throw new RuntimeException( "Unable to find the ES batch to process with message id " + messageId ); - } - - indexOperationMessage = 
ObjectJsonSerializer.INSTANCE.fromString( highConsistency, IndexOperationMessage.class ); - - } else{ - indexOperationMessage = ObjectJsonSerializer.INSTANCE.fromString( message, IndexOperationMessage.class ); - } - - initializeEntityIndexes(indexOperationMessage); - - //NOTE that we intentionally do NOT delete from the map. We can't know when all regions have consumed the message - //so we'll let compaction on column expiration handle deletion - - //read the value from the string - - Preconditions.checkNotNull( indexOperationMessage, "indexOperationMessage cannot be null" ); - Preconditions.checkArgument( !indexOperationMessage.isEmpty() , "queued indexOperationMessage messages should not be empty" ); - - - //now execute it - indexProducer.put(indexOperationMessage).toBlocking().last(); - - } - - /** - * this method will call initialize for each message, since we are caching the entity indexes, - * we don't worry about aggregating by app id - * @param indexOperationMessage - */ - private void initializeEntityIndexes(final IndexOperationMessage indexOperationMessage) { - - // create a set so we can have a unique list of appIds for which we call createEntityIndex - Set<UUID> appIds = new HashSet<>(); - - // loop through all indexRequests and add the appIds to the set - indexOperationMessage.getIndexRequests().forEach(req -> { - UUID appId = IndexingUtils.getApplicationIdFromIndexDocId(req.documentId); - appIds.add(appId); - }); - - // loop through all deindexRequests and add the appIds to the set - indexOperationMessage.getDeIndexRequests().forEach(req -> { - UUID appId = IndexingUtils.getApplicationIdFromIndexDocId(req.documentId); - appIds.add(appId); - }); - - // for each of the appIds in the unique set, call create entity index to ensure the aliases are created - appIds.forEach(appId -> { - ApplicationScope appScope = CpNamingUtils.getApplicationScope(appId); - entityIndexFactory.createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(appScope)); 
- } - ); - } - - - @Override - public long getQueueDepth() { - return queue.getQueueDepth(); - } - - public Observable<IndexOperationMessage> handleEntityDelete(final QueueMessage message) { - - Preconditions.checkNotNull(message, "Queue Message cannot be null for handleEntityDelete"); - - final AsyncEvent event = (AsyncEvent) message.getBody(); - Preconditions.checkNotNull( message, "QueueMessage Body cannot be null for handleEntityDelete" ); - Preconditions.checkArgument( event instanceof EntityDeleteEvent, - String.format( "Event Type for handleEntityDelete must be ENTITY_DELETE, got %s", event.getClass() ) ); - - - final EntityDeleteEvent entityDeleteEvent = ( EntityDeleteEvent ) event; - final ApplicationScope applicationScope = entityDeleteEvent.getEntityIdScope().getApplicationScope(); - final Id entityId = entityDeleteEvent.getEntityIdScope().getId(); - - if (logger.isDebugEnabled()) - logger.debug("Deleting entity id from index in app scope {} with entityId {}", applicationScope, entityId); - - final EventBuilderImpl.EntityDeleteResults - entityDeleteResults = eventBuilder.buildEntityDelete( applicationScope, entityId ); - - - // Delete the entities and remove from graph separately - entityDeleteResults.getEntitiesDeleted().toBlocking().lastOrDefault(null); - - entityDeleteResults.getCompactedNode().toBlocking().lastOrDefault(null); - - return entityDeleteResults.getIndexObservable(); - } - - - public void handleInitializeApplicationIndex(final AsyncEvent event, final QueueMessage message) { - Preconditions.checkNotNull(message, "Queue Message cannot be null for handleInitializeApplicationIndex"); - Preconditions.checkArgument(event instanceof InitializeApplicationIndexEvent, String.format("Event Type for handleInitializeApplicationIndex must be APPLICATION_INDEX, got %s", event.getClass())); - - final InitializeApplicationIndexEvent initializeApplicationIndexEvent = - ( InitializeApplicationIndexEvent ) event; - - final IndexLocationStrategy 
indexLocationStrategy = initializeApplicationIndexEvent.getIndexLocationStrategy(); - final EntityIndex index = entityIndexFactory.createEntityIndex( indexLocationStrategy ); - index.initialize(); - } - - /** - * Loop through and start the workers - */ - public void start() { - final int count = indexProcessorFig.getWorkerCount(); - - for (int i = 0; i < count; i++) { - startWorker(); - } - } - - - /** - * Stop the workers - */ - public void stop() { - synchronized (mutex) { - //stop consuming - - for (final Subscription subscription : subscriptions) { - subscription.unsubscribe(); - } - } - } - - - private void startWorker() { - synchronized (mutex) { - - Observable<List<QueueMessage>> consumer = - Observable.create( new Observable.OnSubscribe<List<QueueMessage>>() { - @Override - public void call( final Subscriber<? super List<QueueMessage>> subscriber ) { - - //name our thread so it's easy to see - Thread.currentThread().setName( "QueueConsumer_" + counter.incrementAndGet() ); - - List<QueueMessage> drainList = null; - - do { - try { - drainList = take(); - //emit our list in it's entity to hand off to a worker pool - subscriber.onNext(drainList); - - //take since we're in flight - inFlight.addAndGet( drainList.size() ); - } - catch ( Throwable t ) { - final long sleepTime = indexProcessorFig.getFailureRetryTime(); - - logger.error( "Failed to dequeue. 
Sleeping for {} milliseconds", sleepTime, t ); - - if ( drainList != null ) { - inFlight.addAndGet( -1 * drainList.size() ); - } - - - try { - Thread.sleep( sleepTime ); - } - catch ( InterruptedException ie ) { - //swallow - } - - indexErrorCounter.inc(); - } - } - while ( true ); - } - } ) //this won't block our read loop, just reads and proceeds - .flatMap( sqsMessages -> { - - //do this on a different schedule, and introduce concurrency with flatmap for faster processing - return Observable.just( sqsMessages ) - - .map( messages -> { - if ( messages == null || messages.size() == 0 ) { - return null; - } - - try { - List<IndexEventResult> indexEventResults = - callEventHandlers( messages ); - List<QueueMessage> messagesToAck = - submitToIndex( indexEventResults ); - if ( messagesToAck == null || messagesToAck.size() == 0 ) { - logger.error( - "No messages came back from the queue operation, should have seen {} messages", - messages.size() ); - return messagesToAck; - } - if ( messagesToAck.size() < messages.size() ) { - logger.error( "Missing messages from queue post operation", - messages, messagesToAck ); - } - //ack each message, but only if we didn't error. 
- ack( messagesToAck ); - return messagesToAck; - } - catch ( Exception e ) { - logger.error( "failed to ack messages to sqs", e ); - return null; - //do not rethrow so we can process all of them - } - } ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() ); - //end flatMap - }, indexProcessorFig.getEventConcurrencyFactor() ); - - //start in the background - - final Subscription subscription = consumer.subscribeOn(Schedulers.newThread()).subscribe(); - - subscriptions.add(subscription); - } - } - - /** - * Submit results to index and return the queue messages to be ack'd - * @param indexEventResults - * @return - */ - private List<QueueMessage> submitToIndex( List<IndexEventResult> indexEventResults) { - //if nothing came back then return null - if(indexEventResults==null){ - return null; - } - - final IndexOperationMessage combined = new IndexOperationMessage(); - - //stream and filer the messages - List<QueueMessage> messagesToAck = indexEventResults.stream() - .map(indexEventResult -> { - //collect into the index submission - if (indexEventResult.getIndexOperationMessage().isPresent()) { - combined.ingest(indexEventResult.getIndexOperationMessage().get()); - } - return indexEventResult; - }) - //filter out the ones that need to be ack'd - .filter(indexEventResult -> indexEventResult.getQueueMessage().isPresent()) - .map(indexEventResult -> { - //record the cycle time - messageCycle.update(System.currentTimeMillis() - indexEventResult.getCreationTime()); - return indexEventResult; - }) - //ack after successful completion of the operation. 
- .map(result -> result.getQueueMessage().get()) - .collect(Collectors.toList()); - - queueIndexOperationMessage( combined ); - - return messagesToAck; - } - - public void index(final ApplicationScope applicationScope, final Id id, final long updatedSince) { - //change to id scope to avoid serialization issues - offer( new EntityIndexEvent(queueFig.getPrimaryRegion(), new EntityIdScope( applicationScope, id ), updatedSince ) ); - } - - public void indexBatch(final List<EdgeScope> edges, final long updatedSince) { - - List batch = new ArrayList<EdgeScope>(); - for ( EdgeScope e : edges){ - //change to id scope to avoid serialization issues - batch.add(new EntityIndexEvent(queueFig.getPrimaryRegion(), new EntityIdScope(e.getApplicationScope(), e.getEdge().getTargetNode()), updatedSince)); - } - offerBatch( batch ); - } - - - public class IndexEventResult{ - private final Optional<QueueMessage> queueMessage; - private final Optional<IndexOperationMessage> indexOperationMessage; - private final long creationTime; - - - public IndexEventResult(Optional<QueueMessage> queueMessage, Optional<IndexOperationMessage> indexOperationMessage, long creationTime){ - - this.queueMessage = queueMessage; - this.indexOperationMessage = indexOperationMessage; - - this.creationTime = creationTime; - } - - - public Optional<QueueMessage> getQueueMessage() { - return queueMessage; - } - - public Optional<IndexOperationMessage> getIndexOperationMessage() { - return indexOperationMessage; - } - - public long getCreationTime() { - return creationTime; - } - } - - -} http://git-wip-us.apache.org/repos/asf/usergrid/blob/b4634dc4/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java index dbf8996..288fb12 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java @@ -22,9 +22,7 @@ package org.apache.usergrid.corepersistence.asyncevents; import org.apache.usergrid.corepersistence.index.ReIndexAction; import org.apache.usergrid.persistence.core.scope.ApplicationScope; -import org.apache.usergrid.persistence.entities.Application; import org.apache.usergrid.persistence.graph.Edge; -import org.apache.usergrid.persistence.index.IndexLocationStrategy; import org.apache.usergrid.persistence.index.impl.IndexOperationMessage; import org.apache.usergrid.persistence.model.entity.Entity; import org.apache.usergrid.persistence.model.entity.Id; @@ -48,8 +46,9 @@ public interface AsyncEventService extends ReIndexAction { * After SQS is removed, the tests should be enhanced to ensure that we're processing our queues correctly. * @param applicationScope * @param entity The entity to index. 
Should be fired when an entity is updated + * @param updatedAfter */ - void queueEntityIndexUpdate( final ApplicationScope applicationScope, final Entity entity); + void queueEntityIndexUpdate(final ApplicationScope applicationScope, final Entity entity, long updatedAfter); /** http://git-wip-us.apache.org/repos/asf/usergrid/blob/b4634dc4/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java new file mode 100644 index 0000000..e101761 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java @@ -0,0 +1,798 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.usergrid.corepersistence.asyncevents; + + +import java.io.IOException; +import java.io.Serializable; +import java.util.*; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.apache.usergrid.persistence.index.impl.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.usergrid.corepersistence.asyncevents.model.AsyncEvent; +import org.apache.usergrid.corepersistence.asyncevents.model.EdgeDeleteEvent; +import org.apache.usergrid.corepersistence.asyncevents.model.ElasticsearchIndexEvent; +import org.apache.usergrid.corepersistence.asyncevents.model.EntityDeleteEvent; +import org.apache.usergrid.corepersistence.asyncevents.model.EntityIndexEvent; +import org.apache.usergrid.corepersistence.asyncevents.model.InitializeApplicationIndexEvent; +import org.apache.usergrid.corepersistence.index.EntityIndexOperation; +import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory; +import org.apache.usergrid.corepersistence.index.IndexProcessorFig; +import org.apache.usergrid.corepersistence.index.ReplicatedIndexLocationStrategy; +import org.apache.usergrid.corepersistence.rx.impl.EdgeScope; +import org.apache.usergrid.corepersistence.util.CpNamingUtils; +import org.apache.usergrid.corepersistence.util.ObjectJsonSerializer; +import org.apache.usergrid.persistence.collection.EntityCollectionManager; +import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; +import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope; +import org.apache.usergrid.persistence.core.metrics.MetricsFactory; +import org.apache.usergrid.persistence.core.rx.RxTaskScheduler; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.graph.Edge; +import org.apache.usergrid.persistence.index.EntityIndex; +import 
org.apache.usergrid.persistence.index.EntityIndexFactory; +import org.apache.usergrid.persistence.index.IndexLocationStrategy; +import org.apache.usergrid.persistence.map.MapManager; +import org.apache.usergrid.persistence.map.MapManagerFactory; +import org.apache.usergrid.persistence.map.MapScope; +import org.apache.usergrid.persistence.map.impl.MapScopeImpl; +import org.apache.usergrid.persistence.model.entity.Entity; +import org.apache.usergrid.persistence.model.entity.Id; +import org.apache.usergrid.persistence.model.util.UUIDGenerator; +import org.apache.usergrid.persistence.queue.QueueFig; +import org.apache.usergrid.persistence.queue.QueueManager; +import org.apache.usergrid.persistence.queue.QueueManagerFactory; +import org.apache.usergrid.persistence.queue.QueueMessage; +import org.apache.usergrid.persistence.queue.QueueScope; +import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl; + +import com.codahale.metrics.Counter; +import com.codahale.metrics.Gauge; +import com.codahale.metrics.Histogram; +import com.codahale.metrics.Timer; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.inject.Inject; +import com.google.inject.Singleton; + +import rx.Observable; +import rx.Subscriber; +import rx.Subscription; +import rx.schedulers.Schedulers; + + +/** + * TODO, this whole class is becoming a nightmare. We need to remove all consume from this class and refactor it into the following manner. + * + * 1. Produce. Keep the code in the handle as is + * 2. Consume: Move the code into a refactored system + * 2.1 A central dispatcher + * 2.2 An interface that produces an observable of type BatchOperation. Any handler will be refactored into it's own + * impl that will then emit a stream of batch operations to perform + * 2.3 The central dispatcher will then subscribe to these events and merge them. 
Handing them off to a batch handler + * 2.4 The batch handler will roll up the operations into a batch size, and then queue them + * 2.5 The receive batch handler will execute the batch operations + * + * TODO determine how we error handle? + * + */ +@Singleton +public class AsyncEventServiceImpl implements AsyncEventService { + + + private static final Logger logger = LoggerFactory.getLogger(AsyncEventServiceImpl.class); + + // SQS maximum receive messages is 10 + public int MAX_TAKE = 10; + public static final String QUEUE_NAME = "index"; //keep this short as AWS limits queue name size to 80 chars + + private final QueueManager queue; + private final IndexProcessorFig indexProcessorFig; + private final QueueFig queueFig; + private final IndexProducer indexProducer; + private final EntityCollectionManagerFactory entityCollectionManagerFactory; + private final IndexLocationStrategyFactory indexLocationStrategyFactory; + private final EntityIndexFactory entityIndexFactory; + private final EventBuilder eventBuilder; + private final RxTaskScheduler rxTaskScheduler; + + private final Timer readTimer; + private final Timer writeTimer; + private final Timer ackTimer; + + /** + * This mutex is used to start/stop workers to ensure we're not concurrently modifying our subscriptions + */ + private final Object mutex = new Object(); + + private final Counter indexErrorCounter; + private final AtomicLong counter = new AtomicLong(); + private final AtomicLong inFlight = new AtomicLong(); + private final Histogram messageCycle; + private final MapManager esMapPersistence; + + //the actively running subscription + private List<Subscription> subscriptions = new ArrayList<>(); + + + @Inject + public AsyncEventServiceImpl(final QueueManagerFactory queueManagerFactory, + final IndexProcessorFig indexProcessorFig, + final IndexProducer indexProducer, + final MetricsFactory metricsFactory, + final EntityCollectionManagerFactory entityCollectionManagerFactory, + final 
IndexLocationStrategyFactory indexLocationStrategyFactory, + final EntityIndexFactory entityIndexFactory, + final EventBuilder eventBuilder, + final MapManagerFactory mapManagerFactory, + final QueueFig queueFig, + @EventExecutionScheduler + final RxTaskScheduler rxTaskScheduler ) { + this.indexProducer = indexProducer; + + this.entityCollectionManagerFactory = entityCollectionManagerFactory; + this.indexLocationStrategyFactory = indexLocationStrategyFactory; + this.entityIndexFactory = entityIndexFactory; + this.eventBuilder = eventBuilder; + + final MapScope mapScope = new MapScopeImpl( CpNamingUtils.getManagementApplicationId(), "indexEvents"); + + this.esMapPersistence = mapManagerFactory.createMapManager( mapScope ); + + this.rxTaskScheduler = rxTaskScheduler; + + QueueScope queueScope = new QueueScopeImpl(QUEUE_NAME, QueueScope.RegionImplementation.ALL); + this.queue = queueManagerFactory.getQueueManager(queueScope); + + this.indexProcessorFig = indexProcessorFig; + this.queueFig = queueFig; + + this.writeTimer = metricsFactory.getTimer(AsyncEventServiceImpl.class, "async_event.write"); + this.readTimer = metricsFactory.getTimer(AsyncEventServiceImpl.class, "async_event.read"); + this.ackTimer = metricsFactory.getTimer(AsyncEventServiceImpl.class, "async_event.ack"); + this.indexErrorCounter = metricsFactory.getCounter(AsyncEventServiceImpl.class, "async_event.error"); + this.messageCycle = metricsFactory.getHistogram(AsyncEventServiceImpl.class, "async_event.message_cycle"); + + + //wire up the gauge of inflight message + metricsFactory.addGauge(AsyncEventServiceImpl.class, "async-event.inflight", new Gauge<Long>() { + @Override + public Long getValue() { + return inFlight.longValue(); + } + }); + + start(); + } + + + /** + * Offer the EntityIdScope to SQS + */ + private void offer(final Serializable operation) { + final Timer.Context timer = this.writeTimer.time(); + + try { + //signal to SQS + this.queue.sendMessage( operation ); + } catch (IOException e) 
{ + throw new RuntimeException("Unable to queue message", e); + } finally { + timer.stop(); + } + } + + + private void offerTopic( final Serializable operation ) { + final Timer.Context timer = this.writeTimer.time(); + + try { + //signal to SQS + this.queue.sendMessageToTopic( operation ); + } + catch ( IOException e ) { + throw new RuntimeException( "Unable to queue message", e ); + } + finally { + timer.stop(); + } + } + + private void offerBatch(final List operations){ + final Timer.Context timer = this.writeTimer.time(); + + try { + //signal to SQS + this.queue.sendMessages(operations); + } catch (IOException e) { + throw new RuntimeException("Unable to queue message", e); + } finally { + timer.stop(); + } + } + + + /** + * Take message from SQS + */ + private List<QueueMessage> take() { + + final Timer.Context timer = this.readTimer.time(); + + try { + return queue.getMessages(MAX_TAKE, + indexProcessorFig.getIndexQueueVisibilityTimeout(), + indexProcessorFig.getIndexQueueTimeout(), + AsyncEvent.class); + } + //stop our timer + finally { + timer.stop(); + } + } + + + + /** + * Ack message in SQS + */ + public void ack(final List<QueueMessage> messages) { + + final Timer.Context timer = this.ackTimer.time(); + + try{ + queue.commitMessages( messages ); + + //decrement our in-flight counter + inFlight.decrementAndGet(); + + }catch(Exception e){ + throw new RuntimeException("Unable to ack messages", e); + }finally { + timer.stop(); + } + + + } + + /** + * calls the event handlers and returns a result with information on whether it needs to be ack'd and whether it needs to be indexed + * @param messages + * @return + */ + private List<IndexEventResult> callEventHandlers(final List<QueueMessage> messages) { + + if (logger.isDebugEnabled()) { + logger.debug("callEventHandlers with {} message", messages.size()); + } + + Stream<IndexEventResult> indexEventResults = messages.parallelStream().map(message -> + + { + AsyncEvent event = null; + try { + event = 
(AsyncEvent) message.getBody(); + + } catch (ClassCastException cce) { + logger.error("Failed to deserialize message body", cce); + return new IndexEventResult(Optional.absent(), System.currentTimeMillis()); + } + + if (event == null) { + logger.error("AsyncEvent type or event is null!"); + return new IndexEventResult(Optional.absent(), System.currentTimeMillis()); + } + + final AsyncEvent thisEvent = event; + + if (logger.isDebugEnabled()) { + logger.debug("Processing {} event", event); + } + + try { + + // deletes are 2-part, actual IO to delete data, then queue up a de-index + if ( event instanceof EdgeDeleteEvent ) { + + handleEdgeDelete( message ); + } + // deletes are 2-part, actual IO to delete data, then queue up a de-index + else if ( event instanceof EntityDeleteEvent ) { + + handleEntityDelete( message ); + } + // application initialization has special logic, therefore a special event type + else if ( event instanceof InitializeApplicationIndexEvent ) { + + handleInitializeApplicationIndex(event, message); + } + // this is the main event that pulls the index doc from map persistence and hands to the index producer + else if (event instanceof ElasticsearchIndexEvent) { + + handleIndexOperation((ElasticsearchIndexEvent) event); + + } else { + + throw new Exception("Unknown EventType for message: "+ message.getStringBody()); + } + + + //return type that can be indexed and ack'd later + return new IndexEventResult(Optional.of(message), thisEvent.getCreationTime()); + + } catch (IndexDocNotFoundException e){ + + // this exception is thrown when we wait before trying quorum read on map persistence. 
+ // return empty event result so the event's message doesn't get ack'd + logger.info(e.getMessage()); + return new IndexEventResult(Optional.absent(), event.getCreationTime()); + + } catch (Exception e) { + + // if the event fails to process, log the message and return empty event result so it doesn't get ack'd + logger.error("Failed to process message: {} {}", message.getMessageId(), message.getStringBody(), e); + return new IndexEventResult(Optional.absent(), event.getCreationTime()); + } + }); + + + return indexEventResults.collect(Collectors.toList()); + } + + @Override + public void queueInitializeApplicationIndex( final ApplicationScope applicationScope) { + IndexLocationStrategy indexLocationStrategy = indexLocationStrategyFactory.getIndexLocationStrategy( + applicationScope); + offerTopic( new InitializeApplicationIndexEvent( queueFig.getPrimaryRegion(), + new ReplicatedIndexLocationStrategy( indexLocationStrategy ) ) ); + } + + + @Override + public void queueEntityIndexUpdate(final ApplicationScope applicationScope, + final Entity entity, long updatedAfter) { + + + final EntityIndexOperation entityIndexOperation = + new EntityIndexOperation( applicationScope, entity.getId(), updatedAfter); + + final IndexOperationMessage indexMessage = + eventBuilder.buildEntityIndex( entityIndexOperation ).toBlocking().lastOrDefault(null); + + queueIndexOperationMessage( indexMessage ); + + } + + + @Override + public void queueNewEdge(final ApplicationScope applicationScope, + final Entity entity, + final Edge newEdge) { + + final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager( applicationScope ); + + final IndexOperationMessage indexMessage = ecm.load( entity.getId() ) + .flatMap( loadedEntity -> eventBuilder.buildNewEdge(applicationScope, entity, newEdge) ) + .toBlocking().lastOrDefault(null); + + queueIndexOperationMessage( indexMessage ); + + } + + + @Override + public void queueDeleteEdge(final ApplicationScope applicationScope, 
+ final Edge edge) { + + offer( new EdgeDeleteEvent( queueFig.getPrimaryRegion(), applicationScope, edge ) ); + } + + public void handleEdgeDelete(final QueueMessage message) { + + Preconditions.checkNotNull( message, "Queue Message cannot be null for handleEdgeDelete" ); + + final AsyncEvent event = (AsyncEvent) message.getBody(); + + Preconditions.checkNotNull( message, "QueueMessage Body cannot be null for handleEdgeDelete" ); + Preconditions.checkArgument(event instanceof EdgeDeleteEvent, String.format("Event Type for handleEdgeDelete must be EDGE_DELETE, got %s", event.getClass())); + + + final EdgeDeleteEvent edgeDeleteEvent = ( EdgeDeleteEvent ) event; + + final ApplicationScope applicationScope = edgeDeleteEvent.getApplicationScope(); + final Edge edge = edgeDeleteEvent.getEdge(); + + if (logger.isDebugEnabled()) { + logger.debug("Deleting in app scope {} with edge {}", applicationScope, edge); + } + + IndexOperationMessage indexMessage = + eventBuilder.buildDeleteEdge(applicationScope, edge).toBlocking().lastOrDefault(null); + + queueIndexOperationMessage(indexMessage); + + } + + + + /** + * Queue up an indexOperationMessage for multi region execution + * @param indexOperationMessage + */ + public void queueIndexOperationMessage( final IndexOperationMessage indexOperationMessage ) { + + // don't try to produce something with nothing + if(indexOperationMessage == null || indexOperationMessage.isEmpty()){ + return; + } + + final String jsonValue = ObjectJsonSerializer.INSTANCE.toString( indexOperationMessage ); + + final UUID newMessageId = UUIDGenerator.newTimeUUID(); + + final int expirationTimeInSeconds = + ( int ) TimeUnit.MILLISECONDS.toSeconds( indexProcessorFig.getIndexMessageTtl() ); + + //write to the map in ES + esMapPersistence.putString( newMessageId.toString(), jsonValue, expirationTimeInSeconds ); + + + + //now queue up the index message + + final ElasticsearchIndexEvent elasticsearchIndexEvent = + new 
ElasticsearchIndexEvent(queueFig.getPrimaryRegion(), newMessageId ); + + //send to the topic so all regions index the batch + + offerTopic( elasticsearchIndexEvent ); + } + + public void handleIndexOperation(final ElasticsearchIndexEvent elasticsearchIndexEvent){ + Preconditions.checkNotNull( elasticsearchIndexEvent, "elasticsearchIndexEvent cannot be null" ); + + final UUID messageId = elasticsearchIndexEvent.getIndexBatchId(); + + Preconditions.checkNotNull( messageId, "messageId must not be null" ); + + + //load the entity + + final String message = esMapPersistence.getString( messageId.toString() ); + + final IndexOperationMessage indexOperationMessage; + + if(message == null) { + + if ( System.currentTimeMillis() > elasticsearchIndexEvent.getCreationTime() + queueFig.getLocalQuorumTimeout() ) { + + logger.warn("Received message with id {} to process, unable to find it, reading with higher consistency level", + messageId); + + final String highConsistency = esMapPersistence.getStringHighConsistency(messageId.toString()); + + if (highConsistency == null) { + logger.error("Unable to find the ES batch with id {} to process at a higher consistency level", + messageId); + + throw new RuntimeException("Unable to find the ES batch to process with message id " + messageId); + } + + indexOperationMessage = ObjectJsonSerializer.INSTANCE.fromString(highConsistency, IndexOperationMessage.class); + + } else{ + + throw new IndexDocNotFoundException(elasticsearchIndexEvent.getIndexBatchId()); + + } + + } else{ + indexOperationMessage = ObjectJsonSerializer.INSTANCE.fromString( message, IndexOperationMessage.class ); + } + + initializeEntityIndexes(indexOperationMessage); + + //NOTE that we intentionally do NOT delete from the map. 
We can't know when all regions have consumed the message + //so we'll let compaction on column expiration handle deletion + + //read the value from the string + + Preconditions.checkNotNull( indexOperationMessage, "indexOperationMessage cannot be null" ); + Preconditions.checkArgument( !indexOperationMessage.isEmpty() , "queued indexOperationMessage messages should not be empty" ); + + + //now execute it + indexProducer.put(indexOperationMessage).toBlocking().last(); + + } + + /** + * this method will call initialize for each message, since we are caching the entity indexes, + * we don't worry about aggregating by app id + * @param indexOperationMessage + */ + private void initializeEntityIndexes(final IndexOperationMessage indexOperationMessage) { + + // create a set so we can have a unique list of appIds for which we call createEntityIndex + Set<UUID> appIds = new HashSet<>(); + + // loop through all indexRequests and add the appIds to the set + indexOperationMessage.getIndexRequests().forEach(req -> { + UUID appId = IndexingUtils.getApplicationIdFromIndexDocId(req.documentId); + appIds.add(appId); + }); + + // loop through all deindexRequests and add the appIds to the set + indexOperationMessage.getDeIndexRequests().forEach(req -> { + UUID appId = IndexingUtils.getApplicationIdFromIndexDocId(req.documentId); + appIds.add(appId); + }); + + // for each of the appIds in the unique set, call create entity index to ensure the aliases are created + appIds.forEach(appId -> { + ApplicationScope appScope = CpNamingUtils.getApplicationScope(appId); + entityIndexFactory.createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(appScope)); + } + ); + } + + + @Override + public long getQueueDepth() { + return queue.getQueueDepth(); + } + + @Override + public void queueEntityDelete(final ApplicationScope applicationScope, final Id entityId) { + + offer( new EntityDeleteEvent(queueFig.getPrimaryRegion(), new EntityIdScope( applicationScope, entityId ) ) ); + } + 
+ public void handleEntityDelete(final QueueMessage message) { + + Preconditions.checkNotNull(message, "Queue Message cannot be null for handleEntityDelete"); + + final AsyncEvent event = (AsyncEvent) message.getBody(); + Preconditions.checkNotNull( message, "QueueMessage Body cannot be null for handleEntityDelete" ); + Preconditions.checkArgument( event instanceof EntityDeleteEvent, + String.format( "Event Type for handleEntityDelete must be ENTITY_DELETE, got %s", event.getClass() ) ); + + + final EntityDeleteEvent entityDeleteEvent = ( EntityDeleteEvent ) event; + final ApplicationScope applicationScope = entityDeleteEvent.getEntityIdScope().getApplicationScope(); + final Id entityId = entityDeleteEvent.getEntityIdScope().getId(); + + if (logger.isDebugEnabled()) + logger.debug("Deleting entity id from index in app scope {} with entityId {}", applicationScope, entityId); + + final EventBuilderImpl.EntityDeleteResults + entityDeleteResults = eventBuilder.buildEntityDelete( applicationScope, entityId ); + + + // Delete the entities and remove from graph separately + entityDeleteResults.getEntitiesDeleted().toBlocking().lastOrDefault(null); + + entityDeleteResults.getCompactedNode().toBlocking().lastOrDefault(null); + + IndexOperationMessage indexMessage = entityDeleteResults.getIndexObservable().toBlocking().lastOrDefault(null); + + queueIndexOperationMessage(indexMessage); + + } + + + public void handleInitializeApplicationIndex(final AsyncEvent event, final QueueMessage message) { + Preconditions.checkNotNull(message, "Queue Message cannot be null for handleInitializeApplicationIndex"); + Preconditions.checkArgument(event instanceof InitializeApplicationIndexEvent, String.format("Event Type for handleInitializeApplicationIndex must be APPLICATION_INDEX, got %s", event.getClass())); + + final InitializeApplicationIndexEvent initializeApplicationIndexEvent = + ( InitializeApplicationIndexEvent ) event; + + final IndexLocationStrategy indexLocationStrategy = 
initializeApplicationIndexEvent.getIndexLocationStrategy(); + final EntityIndex index = entityIndexFactory.createEntityIndex( indexLocationStrategy ); + index.initialize(); + } + + /** + * Loop through and start the workers + */ + public void start() { + final int count = indexProcessorFig.getWorkerCount(); + + for (int i = 0; i < count; i++) { + startWorker(); + } + } + + + /** + * Stop the workers + */ + public void stop() { + synchronized (mutex) { + //stop consuming + + for (final Subscription subscription : subscriptions) { + subscription.unsubscribe(); + } + } + } + + + private void startWorker() { + synchronized (mutex) { + + Observable<List<QueueMessage>> consumer = + Observable.create( new Observable.OnSubscribe<List<QueueMessage>>() { + @Override + public void call( final Subscriber<? super List<QueueMessage>> subscriber ) { + + //name our thread so it's easy to see + Thread.currentThread().setName( "QueueConsumer_" + counter.incrementAndGet() ); + + List<QueueMessage> drainList = null; + + do { + try { + drainList = take(); + //emit our list in its entirety to hand off to a worker pool + subscriber.onNext(drainList); + + //take since we're in flight + inFlight.addAndGet( drainList.size() ); + } + catch ( Throwable t ) { + final long sleepTime = indexProcessorFig.getFailureRetryTime(); + + logger.error( "Failed to dequeue. 
Sleeping for {} milliseconds", sleepTime, t ); + + if ( drainList != null ) { + inFlight.addAndGet( -1 * drainList.size() ); + } + + + try { + Thread.sleep( sleepTime ); + } + catch ( InterruptedException ie ) { + //swallow + } + + indexErrorCounter.inc(); + } + } + while ( true ); + } + } ) //this won't block our read loop, just reads and proceeds + .flatMap( sqsMessages -> { + + //do this on a different schedule, and introduce concurrency with flatmap for faster processing + return Observable.just( sqsMessages ) + + .map( messages -> { + if ( messages == null || messages.size() == 0 ) { + return null; + } + + try { + List<IndexEventResult> indexEventResults = callEventHandlers( messages ); + List<QueueMessage> messagesToAck = ackMessages( indexEventResults ); + + if ( messagesToAck == null || messagesToAck.size() == 0 ) { + logger.error( + "No messages came back from the queue operation, should have seen {} messages", + messages.size() ); + return messagesToAck; + } + + if ( messagesToAck.size() < messages.size() ) { + logger.error( "Missing messages from queue post operation", + messages, messagesToAck ); + } + //ack each message, but only if we didn't error. 
+ ack( messagesToAck ); + return messagesToAck; + } + catch ( Exception e ) { + logger.error( "failed to ack messages to sqs", e ); + return null; + //do not rethrow so we can process all of them + } + } ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() ); + //end flatMap + }, indexProcessorFig.getEventConcurrencyFactor() ); + + //start in the background + + final Subscription subscription = consumer.subscribeOn(Schedulers.newThread()).subscribe(); + + subscriptions.add(subscription); + } + } + + /** + * Submit results to index and return the queue messages to be ack'd + * @param indexEventResults + * @return + */ + private List<QueueMessage> ackMessages(List<IndexEventResult> indexEventResults) { + //if nothing came back then return null + if(indexEventResults==null){ + return null; + } + + // stream the messages to record the cycle time + return indexEventResults.stream() + .map(indexEventResult -> { + //record the cycle time + messageCycle.update(System.currentTimeMillis() - indexEventResult.getCreationTime()); + return indexEventResult; + }) + // filter out messages that are not present, they were not processed and put into the results + .filter( result -> result.getQueueMessage().isPresent() ) + .map(result -> result.getQueueMessage().get()) + // collect + .collect(Collectors.toList()); + } + + public void index(final ApplicationScope applicationScope, final Id id, final long updatedSince) { + //change to id scope to avoid serialization issues + offer( new EntityIndexEvent(queueFig.getPrimaryRegion(), new EntityIdScope( applicationScope, id ), updatedSince ) ); + } + + public void indexBatch(final List<EdgeScope> edges, final long updatedSince) { + + List batch = new ArrayList<EdgeScope>(); + for ( EdgeScope e : edges){ + //change to id scope to avoid serialization issues + batch.add(new EntityIndexEvent(queueFig.getPrimaryRegion(), new EntityIdScope(e.getApplicationScope(), e.getEdge().getTargetNode()), updatedSince)); + } + offerBatch( batch ); + } + + + 
public class IndexEventResult{ + private final Optional<QueueMessage> queueMessage; + private final long creationTime; + + public IndexEventResult(Optional<QueueMessage> queueMessage, long creationTime){ + + this.queueMessage = queueMessage; + this.creationTime = creationTime; + } + + + public Optional<QueueMessage> getQueueMessage() { + return queueMessage; + } + + public long getCreationTime() { + return creationTime; + } + } + + +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/b4634dc4/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java index 96da2df..abd4ce1 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java @@ -104,14 +104,14 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> { switch (impl) { case LOCAL: - AmazonAsyncEventService eventService = new AmazonAsyncEventService(scope -> new LocalQueueManager(), indexProcessorFig, indexProducer, metricsFactory, + AsyncEventServiceImpl eventService = new AsyncEventServiceImpl(scope -> new LocalQueueManager(), indexProcessorFig, indexProducer, metricsFactory, entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder,mapManagerFactory, queueFig,rxTaskScheduler); eventService.MAX_TAKE = 1000; return eventService; case SQS: throw new IllegalArgumentException("Configuration value of SQS is no longer allowed. 
Use SNS instead with only a single region"); case SNS: - return new AmazonAsyncEventService(queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory, + return new AsyncEventServiceImpl(queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory, entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, mapManagerFactory, queueFig, rxTaskScheduler ); default: throw new IllegalArgumentException("Configuration value of " + getErrorValues() + " are allowed"); http://git-wip-us.apache.org/repos/asf/usergrid/blob/b4634dc4/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java index 480756f..a47ec77 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java @@ -39,14 +39,6 @@ import rx.Observable; public interface EventBuilder { /** - * Return the cold observable of entity index update operations - * @param applicationScope - * @param entity - * @return - */ - Observable<IndexOperationMessage> buildEntityIndexUpdate( ApplicationScope applicationScope, Entity entity ); - - /** * Return the cold observable of the new edge operation * @param applicationScope * @param entity @@ -69,7 +61,9 @@ public interface EventBuilder { * @param entityId * @return */ - EventBuilderImpl.EntityDeleteResults buildEntityDelete( ApplicationScope applicationScope, Id entityId ); + EntityDeleteResults buildEntityDelete(ApplicationScope applicationScope, Id entityId ); + + /** * Re-index an entity in the scope provided 
http://git-wip-us.apache.org/repos/asf/usergrid/blob/b4634dc4/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java index 4e476db..2edc668 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java @@ -73,19 +73,6 @@ public class EventBuilderImpl implements EventBuilder { } - @Override - public Observable<IndexOperationMessage> buildEntityIndexUpdate( final ApplicationScope applicationScope, - final Entity entity ) { - //process the entity immediately - //only process the same version, otherwise ignore - - if (logger.isDebugEnabled()) { - logger.debug("Indexing in app scope {} entity {}", entity, applicationScope); - } - - return indexService.indexEntity( applicationScope, entity ); - } - @Override public Observable<IndexOperationMessage> buildNewEdge( final ApplicationScope applicationScope, final Entity entity, @@ -118,7 +105,7 @@ public class EventBuilderImpl implements EventBuilder { //it'll need to be pushed up higher so we can do the marking that isn't async or does it not matter? 
@Override - public EntityDeleteResults buildEntityDelete( final ApplicationScope applicationScope, final Id entityId ) { + public EntityDeleteResults buildEntityDelete(final ApplicationScope applicationScope, final Id entityId ) { if (logger.isDebugEnabled()) { logger.debug("Deleting entity id from index in app scope {} with entityId {}", applicationScope, entityId); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/b4634dc4/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/IndexDocNotFoundException.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/IndexDocNotFoundException.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/IndexDocNotFoundException.java new file mode 100644 index 0000000..c0e022f --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/IndexDocNotFoundException.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.usergrid.corepersistence.asyncevents; + + +import java.util.UUID; + +public class IndexDocNotFoundException extends RuntimeException { + + final UUID batchId; + + public IndexDocNotFoundException(final UUID batchId){ + + super("Index batch ID "+batchId.toString()+" not found in map persistence"); + this.batchId = batchId; + + + } + +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/b4634dc4/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java index 68c398f..7512c90 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java @@ -22,6 +22,7 @@ package org.apache.usergrid.corepersistence.index; import java.util.Iterator; import java.util.UUID; +import java.util.concurrent.TimeUnit; import org.apache.usergrid.persistence.index.*; import org.apache.usergrid.utils.UUIDUtils; @@ -104,7 +105,8 @@ public class IndexServiceImpl implements IndexService { //do our observable for batching //try to send a whole batch if we can - final Observable<IndexOperationMessage> batches = sourceEdgesToIndex.buffer( indexFig.getIndexBatchSize() ) + final Observable<IndexOperationMessage> batches = sourceEdgesToIndex + .buffer( indexFig.getIndexBatchSize(), 1000, TimeUnit.MILLISECONDS ) //map into batches based on our buffer size .flatMap( buffer -> Observable.from( buffer ) http://git-wip-us.apache.org/repos/asf/usergrid/blob/b4634dc4/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java ---------------------------------------------------------------------- diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java index bf444b5..d47e96c 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java @@ -230,10 +230,16 @@ public class CandidateEntityFilter extends AbstractFilter<FilterResult<Candidate //entity is newer than ES version, could be an update or the entity is marked as deleted - if ( UUIDComparator.staticCompare( entityVersion, candidateVersion ) > 0 || !entity.getEntity().isPresent()) { + if ( UUIDComparator.staticCompare( entityVersion, candidateVersion ) > 0 || + !entity.getEntity().isPresent() || + entity.getStatus() == MvccEntity.Status.DELETED ) { - logger.warn( "Deindexing stale entity on edge {} for entityId {} and version {}", + // when updating entities, we don't delete previous versions from ES so this action is expected + if(logger.isDebugEnabled()){ + logger.debug( "Deindexing stale entity on edge {} for entityId {} and version {}", searchEdge, entityId, entityVersion); + } + batch.deindex( searchEdge, entityId, candidateVersion ); return; }
