Author: kwall
Date: Thu Dec 27 10:16:13 2012
New Revision: 1426152

URL: http://svn.apache.org/viewvc?rev=1426152&view=rev
Log:
NO-JIRA: [Java Broker] Perf Tests - tweak queue drain algorithm to better 
handle a slow broker (exposed by new batch size tests)

Modified:
    
qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ConsumerParticipant.java
    
qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java

Modified: 
qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ConsumerParticipant.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ConsumerParticipant.java?rev=1426152&r1=1426151&r2=1426152&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ConsumerParticipant.java
 (original)
+++ 
qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ConsumerParticipant.java
 Thu Dec 27 10:16:13 2012
@@ -103,16 +103,22 @@ public class ConsumerParticipant impleme
         }
 
         Date end = new Date();
-        int numberOfMessagesSent = _totalNumberOfMessagesReceived.get();
+        int numberOfMessagesReceived = _totalNumberOfMessagesReceived.get();
         long totalPayloadSize = _totalPayloadSizeOfAllMessagesReceived.get();
         int payloadSize = 
getPayloadSizeForResultIfConstantOrZeroOtherwise(_allConsumedPayloadSizes);
 
+        if (LOGGER.isInfoEnabled())
+        {
+            LOGGER.info("Consumer {} finished consuming. Number of messages 
consumed: {}",
+                        getName(), numberOfMessagesReceived);
+        }
+
         ConsumerParticipantResult result = _resultFactory.createForConsumer(
                 getName(),
                 registeredClientName,
                 _command,
                 acknowledgeMode,
-                numberOfMessagesSent,
+                numberOfMessagesReceived,
                 payloadSize,
                 totalPayloadSize,
                 start, end, _messageLatencies);

Modified: 
qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java?rev=1426152&r1=1426151&r2=1426152&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java
 (original)
+++ 
qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java
 Thu Dec 27 10:16:13 2012
@@ -25,7 +25,6 @@ import javax.jms.JMSException;
 import javax.jms.MessageConsumer;
 import javax.jms.Session;
 
-import org.apache.qpid.AMQException;
 import org.apache.qpid.client.AMQDestination;
 import org.apache.qpid.client.AMQSession;
 import org.apache.qpid.disttest.DistributedTestException;
@@ -34,12 +33,13 @@ import org.apache.qpid.framing.AMQShortS
 import org.apache.qpid.framing.FieldTable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 public class QpidQueueCreator implements QueueCreator
 {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(QpidQueueCreator.class);
     private static final FieldTable EMPTY_QUEUE_BIND_ARGUMENTS = new 
FieldTable();
     private static final String QUEUE_CREATOR_DRAIN_POLL_TIMEOUT = 
"qpid.disttest.queue.creator.drainPollTime";
-    private static int _drainPollTimeout = 
Integer.getInteger(QUEUE_CREATOR_DRAIN_POLL_TIMEOUT, 5000);
+    private static int _drainPollTimeout = 
Integer.getInteger(QUEUE_CREATOR_DRAIN_POLL_TIMEOUT, 500);
 
     @Override
     public void createQueues(Connection connection, Session session, 
List<QueueConfig> configs)
@@ -61,10 +61,8 @@ public class QpidQueueCreator implements
 
             // drainQueue method is added because deletion of queue with a lot
             // of messages takes time and might cause the timeout exception
-            if (queueHasMessages(amqSession, destination))
-            {
-                drainQueue(connection, destination);
-            }
+            drainQueue(connection, destination);
+
             deleteQueue(amqSession, destination.getAMQQueueName());
         }
     }
@@ -81,13 +79,12 @@ public class QpidQueueCreator implements
         }
     }
 
-    private boolean queueHasMessages(AMQSession<?, ?> amqSession, 
AMQDestination destination)
+    private long getQueueDepth(AMQSession<?, ?> amqSession, AMQDestination 
destination)
     {
         try
         {
             long queueDepth = amqSession.getQueueDepth(destination);
-            LOGGER.info("Queue {} has {} message(s)", 
destination.getQueueName(), queueDepth);
-            return queueDepth > 0;
+            return queueDepth;
         }
         catch (Exception e)
         {
@@ -103,10 +100,19 @@ public class QpidQueueCreator implements
             LOGGER.debug("About to drain the queue {}", 
destination.getQueueName());
             noAckSession = connection.createSession(false, 
org.apache.qpid.jms.Session.NO_ACKNOWLEDGE);
             MessageConsumer messageConsumer = 
noAckSession.createConsumer(destination);
+
+            long currentQueueDepth = 
getQueueDepth((AMQSession<?,?>)noAckSession, destination);
             int counter = 0;
-            while(messageConsumer.receive(_drainPollTimeout) != null)
+            while (currentQueueDepth > 0)
             {
-                counter++;
+                LOGGER.info("Queue {} has {} message(s)", 
destination.getQueueName(), currentQueueDepth);
+
+                while(messageConsumer.receive(_drainPollTimeout) != null)
+                {
+                    counter++;
+                }
+
+                currentQueueDepth = 
getQueueDepth((AMQSession<?,?>)noAckSession, destination);
             }
             LOGGER.info("Drained {} message(s) from queue {} ", counter, 
destination.getQueueName());
             messageConsumer.close();



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to