Repository: usergrid
Updated Branches:
  refs/heads/release-2.1.1 36a01617c -> 843578310


Improvements around async event processing.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/84357831
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/84357831
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/84357831

Branch: refs/heads/release-2.1.1
Commit: 8435783104a5e01bdc0c6b257ebe29eac71765a3
Parents: 36a0161
Author: Michael Russo <[email protected]>
Authored: Thu Mar 10 20:44:40 2016 -0800
Committer: Michael Russo <[email protected]>
Committed: Thu Mar 10 20:44:40 2016 -0800

----------------------------------------------------------------------
 .../asyncevents/AsyncEventServiceImpl.java      | 103 ++++++++++---------
 .../usergrid/persistence/queue/QueueFig.java    |   2 +-
 .../persistence/queue/QueueMessage.java         |  10 ++
 .../queue/impl/SNSQueueManagerImpl.java         |   7 ++
 .../resources/usergrid-rest-deploy-context.xml  |  29 ++++--
 5 files changed, 89 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/84357831/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
index 200d1e6..4d78340 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
@@ -282,12 +282,18 @@ public class AsyncEventServiceImpl implements AsyncEventService {
     private List<IndexEventResult> callEventHandlers(final List<QueueMessage> messages) {
 
         if (logger.isDebugEnabled()) {
-            logger.debug("callEventHandlers with {} message", messages.size());
+            logger.debug("callEventHandlers with {} message(s)", messages.size());
         }
 
         Stream<IndexEventResult> indexEventResults = messages.stream().map(message ->
 
         {
+            if(logger.isDebugEnabled()){
+                logger.debug("Queue message with ID {} has been received {} time(s)",
+                    message.getMessageId(),
+                    message.getReceiveCount() );
+            }
+
             AsyncEvent event = null;
             try {
                 event = (AsyncEvent) message.getBody();
@@ -305,7 +311,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
             final AsyncEvent thisEvent = event;
 
             if (logger.isDebugEnabled()) {
-                logger.debug("Processing {} event", event);
+                logger.debug("Processing event with type {}", event.getClass().getSimpleName());
             }
 
             IndexOperationMessage indexOperationMessage = null;
@@ -333,7 +339,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
 
                 } else {
 
-                    throw new Exception("Unknown EventType for message: "+ message.getStringBody());
+                    throw new Exception("Unknown EventType for message: "+ message.getStringBody().trim());
                 }
 
 
@@ -345,13 +351,15 @@ public class AsyncEventServiceImpl implements AsyncEventService {
 
                 // this exception is throw when we wait before trying quorum read on map persistence.
                 // return empty event result so the event's message doesn't get ack'd
-                logger.info(e.getMessage());
+                if(logger.isDebugEnabled()){
+                    logger.debug(e.getMessage());
+                }
                 return new IndexEventResult(Optional.absent(), Optional.absent(), event.getCreationTime());
 
             } catch (Exception e) {
 
                 // if the event fails to process, log and return empty message result so it doesn't get ack'd
-                logger.error("Failed to process message: {} {}", message.getMessageId(), message.getStringBody(), e);
+                logger.error("{}. Failed to process message: {}", e.getMessage(), message.getStringBody().trim() );
                 return new IndexEventResult(Optional.absent(), Optional.absent(), event.getCreationTime());
             }
         });
@@ -471,59 +479,49 @@ public class AsyncEventServiceImpl implements AsyncEventService {
     }
 
     public IndexOperationMessage handleIndexOperation(final ElasticsearchIndexEvent elasticsearchIndexEvent){
-         Preconditions.checkNotNull( elasticsearchIndexEvent, "elasticsearchIndexEvent cannot be null" );
 
-        final UUID messageId = elasticsearchIndexEvent.getIndexBatchId();
+        Preconditions.checkNotNull( elasticsearchIndexEvent, "elasticsearchIndexEvent cannot be null" );
 
+        final UUID messageId = elasticsearchIndexEvent.getIndexBatchId();
         Preconditions.checkNotNull( messageId, "messageId must not be null" );
 
 
-        //load the entity
-
         final String message = esMapPersistence.getString( messageId.toString() );
 
-        final IndexOperationMessage indexOperationMessage;
 
+        final IndexOperationMessage indexOperationMessage;
         if(message == null) {
 
-           if ( System.currentTimeMillis() > elasticsearchIndexEvent.getCreationTime() + queueFig.getLocalQuorumTimeout() ) {
+            // provide some time back pressure before performing a quorum read
+            if ( System.currentTimeMillis() > elasticsearchIndexEvent.getCreationTime() + queueFig.getLocalQuorumTimeout() ) {
 
-               logger.warn("Received message with id {} to process, unable to find it, reading with higher consistency level",
-                   messageId);
-
-               final String highConsistency = esMapPersistence.getStringHighConsistency(messageId.toString());
+                if(logger.isDebugEnabled()){
+                    logger.debug("ES batch with id {} not found, reading with strong consistency", messageId);
+                }
 
-               if (highConsistency == null) {
-                   logger.error("Unable to find the ES batch with id {} to process at a higher consistency level",
-                       messageId);
+                final String highConsistency = esMapPersistence.getStringHighConsistency(messageId.toString());
+                if (highConsistency == null) {
 
-                   throw new RuntimeException("Unable to find the ES batch to process with message id " + messageId);
+                   throw new RuntimeException("ES batch with id "+messageId+" not found when reading with strong consistency");
                }
 
                indexOperationMessage = ObjectJsonSerializer.INSTANCE.fromString(highConsistency, IndexOperationMessage.class);
 
-           } else{
+           } else {
 
                throw new IndexDocNotFoundException(elasticsearchIndexEvent.getIndexBatchId());
 
            }
 
-        } else{
+        } else {
+
             indexOperationMessage = ObjectJsonSerializer.INSTANCE.fromString( message, IndexOperationMessage.class );
         }
 
-        initializeEntityIndexes(indexOperationMessage);
-
-        //NOTE that we intentionally do NOT delete from the map.  We can't know when all regions have consumed the message
-        //so we'll let compaction on column expiration handle deletion
-
-        //read the value from the string
-
-        Preconditions.checkNotNull( indexOperationMessage, "indexOperationMessage cannot be null" );
-        Preconditions.checkArgument( !indexOperationMessage.isEmpty() , "queued indexOperationMessage messages should not be empty" );
 
+        // always do a check to ensure the indexes are initialized for the index requests
+        initializeEntityIndexes(indexOperationMessage);
 
-        //now execute it
         return indexOperationMessage;
 
     }
@@ -694,30 +692,31 @@ public class AsyncEventServiceImpl implements AsyncEventService {
 
                                              .map( messages -> {
                                                  if ( messages == null || messages.size() == 0 ) {
+                                                     // no messages came from the queue, move on
                                                      return null;
                                                  }
 
                                                  try {
+                                                     // process the messages
                                                      List<IndexEventResult> indexEventResults = callEventHandlers( messages );
+
+                                                     // submit the processed messages to index producer
                                                      List<QueueMessage> messagesToAck = submitToIndex( indexEventResults );
 
-                                                     if ( messagesToAck == null || messagesToAck.size() == 0 ) {
-                                                         logger.error(
-                                                             "No messages came back from the queue operation, should have seen {} messages",
-                                                                 messages.size() );
-                                                         return messagesToAck;
+                                                     if ( messagesToAck.size() < messages.size() ) {
+                                                         logger.warn( "Missing {} message(s) from index processing",
+                                                            messages.size() - messagesToAck.size() );
                                                      }
 
-                                                     if ( messagesToAck.size() < messages.size() ) {
-                                                         logger.error( "Missing messages from queue post operation",
-                                                             messages, messagesToAck );
+                                                     // ack each message if making it to this point
+                                                     if( messagesToAck.size() > 0 ){
+                                                         ack( messagesToAck );
                                                      }
-                                                     //ack each message, but only if we didn't error.
-                                                     ack( messagesToAck );
+
                                                      return messagesToAck;
                                                  }
                                                  catch ( Exception e ) {
-                                                     logger.error( "failed to ack messages to sqs", e );
+                                                     logger.error( "Failed to ack messages", e );
                                                      return null;
                                                      //do not rethrow so we can process all of them
                                                  }
@@ -739,26 +738,30 @@ public class AsyncEventServiceImpl implements AsyncEventService {
      * @return
      */
     private List<QueueMessage> submitToIndex(List<IndexEventResult> indexEventResults) {
-        //if nothing came back then return null
+
+        // if nothing came back then return null
         if(indexEventResults==null){
             return null;
         }
-        IndexOperationMessage combined = new IndexOperationMessage();
 
-        // stream the messages to record the cycle time
+        IndexOperationMessage combined = new IndexOperationMessage();
         List<QueueMessage> queueMessages = indexEventResults.stream()
+
+            // filter out messages that are not present, they were not processed and put into the results
+            .filter( result -> result.getQueueMessage().isPresent() )
             .map(indexEventResult -> {
+
                 //record the cycle time
                 messageCycle.update(System.currentTimeMillis() - indexEventResult.getCreationTime());
+
+                // ingest each index op into our combined, single index op for the index producer
                 if(indexEventResult.getIndexOperationMessage().isPresent()){
                     combined.ingest(indexEventResult.getIndexOperationMessage().get());
                 }
-                return indexEventResult;
+
+                return indexEventResult.getQueueMessage().get();
             })
-            // filter out messages that are not present, they were not processed and put into the results
-            .filter( result -> result.getQueueMessage().isPresent() )
-            .map(result -> result.getQueueMessage().get())
-            // collect
+            // collect into a list of QueueMessages that can be ack'd later
             .collect(Collectors.toList());
 
         // sumbit the requests to Elasticsearch

http://git-wip-us.apache.org/repos/asf/usergrid/blob/84357831/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java
index a7d299e..ca6e011 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java
@@ -84,7 +84,7 @@ public interface QueueFig extends GuicyFig {
     int getVisibilityTimeout();
 
     @Key( "usergrid.queue.localquorum.timeout")
-    @Default("5000") // 5 seconds
+    @Default("30000") // 30 seconds
     int getLocalQuorumTimeout();
 
     @Key( "usergrid.queue.client.connection.timeout")

http://git-wip-us.apache.org/repos/asf/usergrid/blob/84357831/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueMessage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueMessage.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueMessage.java
index 55f79f4..f8ce6ef 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueMessage.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueMessage.java
@@ -23,6 +23,7 @@ public class QueueMessage {
     private final String handle;
     private final String type;
     private String stringBody;
+    private int receiveCount;
 
 
     public QueueMessage(String messageId, String handle, Object body,String type) {
@@ -31,6 +32,7 @@ public class QueueMessage {
         this.handle = handle;
         this.type = type;
         this.stringBody = "";
+        this.receiveCount = 1; // we'll always receive once if we're taking it off the in mem or AWS queue
     }
 
     public String getHandle() {
@@ -57,4 +59,12 @@ public class QueueMessage {
     public String getStringBody() {
         return stringBody;
     }
+
+    public void setReceiveCount(int receiveCount){
+        this.receiveCount = receiveCount;
+    }
+
+    public int getReceiveCount(){
+        return receiveCount;
+    }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/84357831/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
index 6c78035..0be5bd0 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
@@ -427,7 +427,12 @@ public class SNSQueueManagerImpl implements QueueManager {
             logger.trace( "Getting up to {} messages from {}", limit, url );
         }
 
+        ArrayList<String> requestMessageAttributeNames = new ArrayList<String>(1);
+        requestMessageAttributeNames.add("ApproximateReceiveCount");
+
+
         ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest( url );
+        receiveMessageRequest.setAttributeNames(requestMessageAttributeNames);
         receiveMessageRequest.setMaxNumberOfMessages( limit );
         receiveMessageRequest.setVisibilityTimeout(
             Math.max( MIN_VISIBILITY_TIMEOUT, fig.getVisibilityTimeout() / 1000 ) );
@@ -477,6 +482,8 @@ public class SNSQueueManagerImpl implements QueueManager {
                 QueueMessage queueMessage = new QueueMessage( message.getMessageId(), message.getReceiptHandle(), payload,
                     message.getAttributes().get( "type" ) );
                 queueMessage.setStringBody( originalBody );
+                int receiveCount = Integer.valueOf(message.getAttributes().get("ApproximateReceiveCount"));
+                queueMessage.setReceiveCount( receiveCount );
                 queueMessages.add( queueMessage );
             }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/84357831/stack/rest/src/main/resources/usergrid-rest-deploy-context.xml
----------------------------------------------------------------------
diff --git a/stack/rest/src/main/resources/usergrid-rest-deploy-context.xml b/stack/rest/src/main/resources/usergrid-rest-deploy-context.xml
index 4438bbd..012f0c8 100644
--- a/stack/rest/src/main/resources/usergrid-rest-deploy-context.xml
+++ b/stack/rest/src/main/resources/usergrid-rest-deploy-context.xml
@@ -26,16 +26,23 @@
        <import resource="classpath:/usergrid-rest-context.xml" />
 
 
-       <bean id="properties"
-               class="org.springframework.beans.factory.config.PropertiesFactoryBean">
-               <property name="singleton" value="true" />
-               <property name="ignoreResourceNotFound" value="true" />
-               <property name="locations">
-                       <list>
-                               <value>classpath:/usergrid-default.properties</value>
-                               <value>classpath:/usergrid-deployment.properties</value>
-                       </list>
-               </property>
-       </bean>
+    <bean id="properties"
+          class="org.springframework.beans.factory.config.PropertiesFactoryBean">
+        <property name="properties" ref="sysProps"/>
+        <property name="localOverride" value="true"/>
+        <property name="singleton" value="true" />
+        <property name="ignoreResourceNotFound" value="true" />
+        <property name="locations">
+            <list>
+                <value>classpath:/usergrid-default.properties</value>
+                <value>classpath:/usergrid-deployment.properties</value>
+            </list>
+        </property>
+    </bean>
+
+    <bean id="sysProps" class="org.springframework.beans.factory.config.MethodInvokingFactoryBean">
+        <property name="targetClass"><value>java.lang.System</value></property>
+        <property name="targetMethod"><value>getProperties</value></property>
+    </bean>
 
 </beans>

Reply via email to