Author: kwall
Date: Thu Dec 27 10:16:13 2012
New Revision: 1426152
URL: http://svn.apache.org/viewvc?rev=1426152&view=rev
Log:
NO-JIRA: [Java Broker] Perf Tests - tweak queue drain algorithm to better
handle a slow broker (exposed by new batch size tests)
Modified:
qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ConsumerParticipant.java
qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java
Modified:
qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ConsumerParticipant.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ConsumerParticipant.java?rev=1426152&r1=1426151&r2=1426152&view=diff
==============================================================================
---
qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ConsumerParticipant.java
(original)
+++
qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ConsumerParticipant.java
Thu Dec 27 10:16:13 2012
@@ -103,16 +103,22 @@ public class ConsumerParticipant impleme
}
Date end = new Date();
- int numberOfMessagesSent = _totalNumberOfMessagesReceived.get();
+ int numberOfMessagesReceived = _totalNumberOfMessagesReceived.get();
long totalPayloadSize = _totalPayloadSizeOfAllMessagesReceived.get();
int payloadSize =
getPayloadSizeForResultIfConstantOrZeroOtherwise(_allConsumedPayloadSizes);
+ if (LOGGER.isInfoEnabled())
+ {
+ LOGGER.info("Consumer {} finished consuming. Number of messages
consumed: {}",
+ getName(), numberOfMessagesReceived);
+ }
+
ConsumerParticipantResult result = _resultFactory.createForConsumer(
getName(),
registeredClientName,
_command,
acknowledgeMode,
- numberOfMessagesSent,
+ numberOfMessagesReceived,
payloadSize,
totalPayloadSize,
start, end, _messageLatencies);
Modified:
qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java?rev=1426152&r1=1426151&r2=1426152&view=diff
==============================================================================
---
qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java
(original)
+++
qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java
Thu Dec 27 10:16:13 2012
@@ -25,7 +25,6 @@ import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
-import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.disttest.DistributedTestException;
@@ -34,12 +33,13 @@ import org.apache.qpid.framing.AMQShortS
import org.apache.qpid.framing.FieldTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
public class QpidQueueCreator implements QueueCreator
{
private static final Logger LOGGER =
LoggerFactory.getLogger(QpidQueueCreator.class);
private static final FieldTable EMPTY_QUEUE_BIND_ARGUMENTS = new
FieldTable();
private static final String QUEUE_CREATOR_DRAIN_POLL_TIMEOUT =
"qpid.disttest.queue.creator.drainPollTime";
- private static int _drainPollTimeout =
Integer.getInteger(QUEUE_CREATOR_DRAIN_POLL_TIMEOUT, 5000);
+ private static int _drainPollTimeout =
Integer.getInteger(QUEUE_CREATOR_DRAIN_POLL_TIMEOUT, 500);
@Override
public void createQueues(Connection connection, Session session,
List<QueueConfig> configs)
@@ -61,10 +61,8 @@ public class QpidQueueCreator implements
// drainQueue method is added because deletion of queue with a lot
// of messages takes time and might cause the timeout exception
- if (queueHasMessages(amqSession, destination))
- {
- drainQueue(connection, destination);
- }
+ drainQueue(connection, destination);
+
deleteQueue(amqSession, destination.getAMQQueueName());
}
}
@@ -81,13 +79,12 @@ public class QpidQueueCreator implements
}
}
- private boolean queueHasMessages(AMQSession<?, ?> amqSession,
AMQDestination destination)
+ private long getQueueDepth(AMQSession<?, ?> amqSession, AMQDestination
destination)
{
try
{
long queueDepth = amqSession.getQueueDepth(destination);
- LOGGER.info("Queue {} has {} message(s)",
destination.getQueueName(), queueDepth);
- return queueDepth > 0;
+ return queueDepth;
}
catch (Exception e)
{
@@ -103,10 +100,19 @@ public class QpidQueueCreator implements
LOGGER.debug("About to drain the queue {}",
destination.getQueueName());
noAckSession = connection.createSession(false,
org.apache.qpid.jms.Session.NO_ACKNOWLEDGE);
MessageConsumer messageConsumer =
noAckSession.createConsumer(destination);
+
+ long currentQueueDepth =
getQueueDepth((AMQSession<?,?>)noAckSession, destination);
int counter = 0;
- while(messageConsumer.receive(_drainPollTimeout) != null)
+ while (currentQueueDepth > 0)
{
- counter++;
+ LOGGER.info("Queue {} has {} message(s)",
destination.getQueueName(), currentQueueDepth);
+
+ while(messageConsumer.receive(_drainPollTimeout) != null)
+ {
+ counter++;
+ }
+
+ currentQueueDepth =
getQueueDepth((AMQSession<?,?>)noAckSession, destination);
}
LOGGER.info("Drained {} message(s) from queue {} ", counter,
destination.getQueueName());
messageConsumer.close();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]