Improvements around async event processing.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/84357831 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/84357831 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/84357831 Branch: refs/heads/master Commit: 8435783104a5e01bdc0c6b257ebe29eac71765a3 Parents: 36a0161 Author: Michael Russo <[email protected]> Authored: Thu Mar 10 20:44:40 2016 -0800 Committer: Michael Russo <[email protected]> Committed: Thu Mar 10 20:44:40 2016 -0800 ---------------------------------------------------------------------- .../asyncevents/AsyncEventServiceImpl.java | 103 ++++++++++--------- .../usergrid/persistence/queue/QueueFig.java | 2 +- .../persistence/queue/QueueMessage.java | 10 ++ .../queue/impl/SNSQueueManagerImpl.java | 7 ++ .../resources/usergrid-rest-deploy-context.xml | 29 ++++-- 5 files changed, 89 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/84357831/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java index 200d1e6..4d78340 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java @@ -282,12 +282,18 @@ public class AsyncEventServiceImpl implements AsyncEventService { private List<IndexEventResult> callEventHandlers(final List<QueueMessage> messages) { if (logger.isDebugEnabled()) { - logger.debug("callEventHandlers with {} message", messages.size()); + logger.debug("callEventHandlers with {} message(s)", messages.size()); } Stream<IndexEventResult> indexEventResults = messages.stream().map(message -> { + if(logger.isDebugEnabled()){ + logger.debug("Queue message with ID {} has been received {} time(s)", + message.getMessageId(), + message.getReceiveCount() ); + } + AsyncEvent event = null; try { event = (AsyncEvent) message.getBody(); @@ -305,7 +311,7 @@ public class AsyncEventServiceImpl implements AsyncEventService { final AsyncEvent thisEvent = event; if (logger.isDebugEnabled()) { - logger.debug("Processing {} event", event); + logger.debug("Processing event with type {}", event.getClass().getSimpleName()); } IndexOperationMessage indexOperationMessage = null; @@ -333,7 +339,7 @@ public class AsyncEventServiceImpl implements AsyncEventService { } else { - throw new Exception("Unknown EventType for message: "+ message.getStringBody()); + throw new Exception("Unknown EventType for message: "+ message.getStringBody().trim()); } @@ -345,13 +351,15 @@ public class AsyncEventServiceImpl implements AsyncEventService { // this exception is throw when we wait before trying quorum read on map persistence. // return empty event result so the event's message doesn't get ack'd - logger.info(e.getMessage()); + if(logger.isDebugEnabled()){ + logger.debug(e.getMessage()); + } return new IndexEventResult(Optional.absent(), Optional.absent(), event.getCreationTime()); } catch (Exception e) { // if the event fails to process, log and return empty message result so it doesn't get ack'd - logger.error("Failed to process message: {} {}", message.getMessageId(), message.getStringBody(), e); + logger.error("{}. Failed to process message: {}", e.getMessage(), message.getStringBody().trim() ); return new IndexEventResult(Optional.absent(), Optional.absent(), event.getCreationTime()); } }); @@ -471,59 +479,49 @@ public class AsyncEventServiceImpl implements AsyncEventService { } public IndexOperationMessage handleIndexOperation(final ElasticsearchIndexEvent elasticsearchIndexEvent){ - Preconditions.checkNotNull( elasticsearchIndexEvent, "elasticsearchIndexEvent cannot be null" ); - final UUID messageId = elasticsearchIndexEvent.getIndexBatchId(); + Preconditions.checkNotNull( elasticsearchIndexEvent, "elasticsearchIndexEvent cannot be null" ); + final UUID messageId = elasticsearchIndexEvent.getIndexBatchId(); Preconditions.checkNotNull( messageId, "messageId must not be null" ); - //load the entity - final String message = esMapPersistence.getString( messageId.toString() ); - final IndexOperationMessage indexOperationMessage; + final IndexOperationMessage indexOperationMessage; if(message == null) { - if ( System.currentTimeMillis() > elasticsearchIndexEvent.getCreationTime() + queueFig.getLocalQuorumTimeout() ) { + // provide some time back pressure before performing a quorum read + if ( System.currentTimeMillis() > elasticsearchIndexEvent.getCreationTime() + queueFig.getLocalQuorumTimeout() ) { - logger.warn("Received message with id {} to process, unable to find it, reading with higher consistency level", - messageId); - - final String highConsistency = esMapPersistence.getStringHighConsistency(messageId.toString()); + if(logger.isDebugEnabled()){ + logger.debug("ES batch with id {} not found, reading with strong consistency", messageId); + } - if (highConsistency == null) { - logger.error("Unable to find the ES batch with id {} to process at a higher consistency level", - messageId); + final String highConsistency = esMapPersistence.getStringHighConsistency(messageId.toString()); + if (highConsistency == null) { - throw new RuntimeException("Unable to find the ES batch to process with message id " + messageId); + throw new RuntimeException("ES batch with id "+messageId+" not found when reading with strong consistency"); } indexOperationMessage = ObjectJsonSerializer.INSTANCE.fromString(highConsistency, IndexOperationMessage.class); - } else{ + } else { throw new IndexDocNotFoundException(elasticsearchIndexEvent.getIndexBatchId()); } - } else{ + } else { + indexOperationMessage = ObjectJsonSerializer.INSTANCE.fromString( message, IndexOperationMessage.class ); } - initializeEntityIndexes(indexOperationMessage); - - //NOTE that we intentionally do NOT delete from the map. We can't know when all regions have consumed the message - //so we'll let compaction on column expiration handle deletion - - //read the value from the string - - Preconditions.checkNotNull( indexOperationMessage, "indexOperationMessage cannot be null" ); - Preconditions.checkArgument( !indexOperationMessage.isEmpty() , "queued indexOperationMessage messages should not be empty" ); + // always do a check to ensure the indexes are initialized for the index requests + initializeEntityIndexes(indexOperationMessage); - //now execute it return indexOperationMessage; } @@ -694,30 +692,31 @@ public class AsyncEventServiceImpl implements AsyncEventService { .map( messages -> { if ( messages == null || messages.size() == 0 ) { + // no messages came from the queue, move on return null; } try { + // process the messages List<IndexEventResult> indexEventResults = callEventHandlers( messages ); + + // submit the processed messages to index producer List<QueueMessage> messagesToAck = submitToIndex( indexEventResults ); - if ( messagesToAck == null || messagesToAck.size() == 0 ) { - logger.error( - "No messages came back from the queue operation, should have seen {} messages", - messages.size() ); - return messagesToAck; + if ( messagesToAck.size() < messages.size() ) { + logger.warn( "Missing {} message(s) from index processing", + messages.size() - messagesToAck.size() ); } - if ( messagesToAck.size() < messages.size() ) { - logger.error( "Missing messages from queue post operation", - messages, messagesToAck ); + // ack each message if making it to this point + if( messagesToAck.size() > 0 ){ + ack( messagesToAck ); } - //ack each message, but only if we didn't error. - ack( messagesToAck ); + return messagesToAck; } catch ( Exception e ) { - logger.error( "failed to ack messages to sqs", e ); + logger.error( "Failed to ack messages", e ); return null; //do not rethrow so we can process all of them } @@ -739,26 +738,30 @@ public class AsyncEventServiceImpl implements AsyncEventService { * @return */ private List<QueueMessage> submitToIndex(List<IndexEventResult> indexEventResults) { - //if nothing came back then return null + + // if nothing came back then return null if(indexEventResults==null){ return null; } - IndexOperationMessage combined = new IndexOperationMessage(); - // stream the messages to record the cycle time + IndexOperationMessage combined = new IndexOperationMessage(); List<QueueMessage> queueMessages = indexEventResults.stream() + + // filter out messages that are not present, they were not processed and put into the results + .filter( result -> result.getQueueMessage().isPresent() ) .map(indexEventResult -> { + //record the cycle time messageCycle.update(System.currentTimeMillis() - indexEventResult.getCreationTime()); + + // ingest each index op into our combined, single index op for the index producer if(indexEventResult.getIndexOperationMessage().isPresent()){ combined.ingest(indexEventResult.getIndexOperationMessage().get()); } - return indexEventResult; + + return indexEventResult.getQueueMessage().get(); }) - // filter out messages that are not present, they were not processed and put into the results - .filter( result -> result.getQueueMessage().isPresent() ) - .map(result -> result.getQueueMessage().get()) - // collect + // collect into a list of QueueMessages that can be ack'd later .collect(Collectors.toList()); // sumbit the requests to Elasticsearch http://git-wip-us.apache.org/repos/asf/usergrid/blob/84357831/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java index a7d299e..ca6e011 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java @@ -84,7 +84,7 @@ public interface QueueFig extends GuicyFig { int getVisibilityTimeout(); @Key( "usergrid.queue.localquorum.timeout") - @Default("5000") // 5 seconds + @Default("30000") // 30 seconds int getLocalQuorumTimeout(); @Key( "usergrid.queue.client.connection.timeout") http://git-wip-us.apache.org/repos/asf/usergrid/blob/84357831/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueMessage.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueMessage.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueMessage.java index 55f79f4..f8ce6ef 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueMessage.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueMessage.java @@ -23,6 +23,7 @@ public class QueueMessage { private final String handle; private final String type; private String stringBody; + private int receiveCount; public QueueMessage(String messageId, String handle, Object body,String type) { @@ -31,6 +32,7 @@ public class QueueMessage { this.handle = handle; this.type = type; this.stringBody = ""; + this.receiveCount = 1; // we'll always receive once if we're taking it off the in mem or AWS queue } public String getHandle() { @@ -57,4 +59,12 @@ public class QueueMessage { public String getStringBody() { return stringBody; } + + public void setReceiveCount(int receiveCount){ + this.receiveCount = receiveCount; + } + + public int getReceiveCount(){ + return receiveCount; + } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/84357831/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java index 6c78035..0be5bd0 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java @@ -427,7 +427,12 @@ public class SNSQueueManagerImpl implements QueueManager { logger.trace( "Getting up to {} messages from {}", limit, url ); } + ArrayList<String> requestMessageAttributeNames = new ArrayList<String>(1); + requestMessageAttributeNames.add("ApproximateReceiveCount"); + + ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest( url ); + receiveMessageRequest.setAttributeNames(requestMessageAttributeNames); receiveMessageRequest.setMaxNumberOfMessages( limit ); receiveMessageRequest.setVisibilityTimeout( Math.max( MIN_VISIBILITY_TIMEOUT, fig.getVisibilityTimeout() / 1000 ) ); @@ -477,6 +482,8 @@ public class SNSQueueManagerImpl implements QueueManager { QueueMessage queueMessage = new QueueMessage( message.getMessageId(), message.getReceiptHandle(), payload, message.getAttributes().get( "type" ) ); queueMessage.setStringBody( originalBody ); + int receiveCount = Integer.valueOf(message.getAttributes().get("ApproximateReceiveCount")); + queueMessage.setReceiveCount( receiveCount ); queueMessages.add( queueMessage ); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/84357831/stack/rest/src/main/resources/usergrid-rest-deploy-context.xml ---------------------------------------------------------------------- diff --git a/stack/rest/src/main/resources/usergrid-rest-deploy-context.xml b/stack/rest/src/main/resources/usergrid-rest-deploy-context.xml index 4438bbd..012f0c8 100644 --- a/stack/rest/src/main/resources/usergrid-rest-deploy-context.xml +++ b/stack/rest/src/main/resources/usergrid-rest-deploy-context.xml @@ -26,16 +26,23 @@ <import resource="classpath:/usergrid-rest-context.xml" /> - <bean id="properties" - class="org.springframework.beans.factory.config.PropertiesFactoryBean"> - <property name="singleton" value="true" /> - <property name="ignoreResourceNotFound" value="true" /> - <property name="locations"> - <list> - <value>classpath:/usergrid-default.properties</value> - <value>classpath:/usergrid-deployment.properties</value> - </list> - </property> - </bean> + <bean id="properties" + class="org.springframework.beans.factory.config.PropertiesFactoryBean"> + <property name="properties" ref="sysProps"/> + <property name="localOverride" value="true"/> + <property name="singleton" value="true" /> + <property name="ignoreResourceNotFound" value="true" /> + <property name="locations"> + <list> + <value>classpath:/usergrid-default.properties</value> + <value>classpath:/usergrid-deployment.properties</value> + </list> + </property> + </bean> + + <bean id="sysProps" class="org.springframework.beans.factory.config.MethodInvokingFactoryBean"> + <property name="targetClass"><value>java.lang.System</value></property> + <property name="targetMethod"><value>getProperties</value></property> + </bean> </beans>
