Author: chirino
Date: Wed Sep 29 17:40:19 2010
New Revision: 1002779
URL: http://svn.apache.org/viewvc?rev=1002779&view=rev
Log:
Decoupling the deep queue scenario a bit from the base class.
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala
activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/DeepQueueScenarios.scala
activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/metric/Metric.java
activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/metric/MetricAggregator.java
activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/metric/MetricCounter.java
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala?rev=1002779&r1=1002778&r2=1002779&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala
Wed Sep 29 17:40:19 2010
@@ -40,8 +40,6 @@ abstract class BrokerPerfSupport extends
var PERFORMANCE_SAMPLES =
Integer.parseInt(System.getProperty("PERFORMANCE_SAMPLES", "6"))
var SAMPLE_PERIOD =
java.lang.Long.parseLong(System.getProperty("SAMPLE_PERIOD", "1000"))
- var MAX_MESSAGES = 0
-
protected var TCP = true // Set to use tcp IO
var USE_KAHA_DB = true
@@ -69,7 +67,9 @@ abstract class BrokerPerfSupport extends
protected var destCount = 0
protected var totalProducerRate:MetricAggregator = null
- protected var totalConsumerRate:MetricAggregator = null
+ protected var totalConsumerRate:MetricAggregator = null
+ var totalMessageSent = 0L
+ var totalMessageReceived = 0L
protected var sendBroker: Broker = null
protected var rcvBroker: Broker = null
@@ -97,7 +97,9 @@ abstract class BrokerPerfSupport extends
sendBroker=null
producerCount = 0
consumerCount = 0
- destCount =0
+ destCount = 0
+ totalMessageSent = 0
+ totalMessageReceived = 0
}
override protected def beforeAll(configMap: Map[String, Any]) = {
@@ -274,7 +276,6 @@ abstract class BrokerPerfSupport extends
consumer.destination = destination
consumer.name = "Consumer:" + (i + 1)
consumer.rateAggregator = totalConsumerRate
- consumer.maxMessages = MAX_MESSAGES
consumer.init
return consumer
@@ -296,7 +297,6 @@ abstract class BrokerPerfSupport extends
producer.messageIdGenerator = msgIdGenerator
producer.rateAggregator = totalProducerRate
producer.payloadSize = MESSAGE_SIZE
- producer.maxMessages = MAX_MESSAGES
producer.init
producer
}
@@ -345,33 +345,14 @@ abstract class BrokerPerfSupport extends
tracker.await
}
- def messagesSent() : Long = {
- var sum = 0
- producers.foreach((producer:RemoteConnection) => sum +=
producer.messageCount)
- sum
- }
+ def fixed_sampling = true
+ def keep_sampling = false
- def messagesReceived() : Long = {
- var sum = 0
- consumers.foreach((consumer:RemoteConnection) => sum +=
consumer.messageCount)
- sum
- }
-
def reportRates() = {
- println("Warming up...")
- Thread.sleep(SAMPLE_PERIOD)
- totalProducerRate.reset()
- totalConsumerRate.reset()
-
- println("Sampling rates")
-
case class Summary(producer:java.lang.Float, pdev:java.lang.Float,
consumer:java.lang.Float, cdev:java.lang.Float)
-
var best = 0
-
import scala.collection.mutable.ArrayBuffer
-
val sample_rates = new ArrayBuffer[Summary]()
def fillRateSummary(i: Int): Unit = {
@@ -392,32 +373,29 @@ abstract class BrokerPerfSupport extends
best = i
}
- totalProducerRate.reset()
- totalConsumerRate.reset()
+ totalMessageSent += totalProducerRate.reset()
+ totalMessageReceived += totalConsumerRate.reset()
}
- // either we want to do x number of samples or sample over the course of x
number of messages
- if ( MAX_MESSAGES == 0 ) {
+ // either we want to do x number of samples or we want to keep sampling
while some condition is true.
+ if ( fixed_sampling ) {
+
+ // Do 1 period of warm up that's not counted as a rate sample; the
+ // counts drained by reset() are still folded into the running totals.
+ println("Warming up...")
+ Thread.sleep(SAMPLE_PERIOD)
+ totalMessageSent += totalProducerRate.reset()
+ // The consumer aggregator feeds the received total, not the sent total.
+ totalMessageReceived += totalConsumerRate.reset()
+
+ println("Sampling rates")
for (i <- 0 until PERFORMANCE_SAMPLES) {
fillRateSummary(i)
}
} else {
- var clientsRunning = true
+ println("Sampling rates")
var i = 0
-
- while (clientsRunning) {
+ while( keep_sampling ) {
fillRateSummary(i)
- i = i + 1
- clientsRunning = false
-
- def checkForRunningClients(connection: Connection) = {
- if (connection.stopped == false) {
- clientsRunning = true
- }
- }
-
- producers.foreach(checkForRunningClients)
- consumers.foreach(checkForRunningClients)
+ i += 1
}
}
@@ -448,7 +426,6 @@ abstract class RemoteConnection extends
var destination: Destination = null
var messageCount = 0
- var maxMessages = 0
def init = {
if( rate.getName == null ) {
@@ -500,16 +477,6 @@ abstract class RemoteConnection extends
protected def incrementMessageCount() = {
messageCount = messageCount + 1
- if( maxMessages > 0 ) {
- if ( messageCount % (maxMessages / 10) == 0 ) {
- trace(name + " message count : " + messageCount)
- }
- if (messageCount == maxMessages) {
- trace(name + " message count (" + messageCount + ") max (" +
maxMessages + ") reached, stopping connection")
- doStop
- }
-
- }
}
}
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/DeepQueueScenarios.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/DeepQueueScenarios.scala?rev=1002779&r1=1002778&r2=1002779&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/DeepQueueScenarios.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/DeepQueueScenarios.scala
Wed Sep 29 17:40:19 2010
@@ -27,6 +27,21 @@ import java.net.URL
trait DeepQueueScenarios extends PersistentScenario {
PERSISTENT = true
+ val MIN_MESSAGES = 100000
+
+ override def fixed_sampling = false
+
+ /**
+  * Reports whether another sample period is required.
+  *
+  * Producers are checked first: while there are producers and fewer than
+  * MIN_MESSAGES have been sent, a progress line is printed and true is
+  * returned. Otherwise the same check is made for consumers against the
+  * received total. Returns false once neither side is below MIN_MESSAGES.
+  */
+ override def keep_sampling:Boolean = {
+   val producersBehind = producerCount > 0 && totalMessageSent < MIN_MESSAGES
+   if( producersBehind ) {
+     println("Waiting for producers: %s/%s".format(totalMessageSent, MIN_MESSAGES))
+     true
+   } else {
+     // Only look at the consumer side once the producer side is satisfied.
+     val consumersBehind = consumerCount > 0 && totalMessageReceived < MIN_MESSAGES
+     if( consumersBehind ) {
+       println("Waiting for consumers: %s/%s".format(totalMessageReceived, MIN_MESSAGES))
+     }
+     consumersBehind
+   }
+ }
override def reportResourceTemplate():URL = {
classOf[DeepQueueScenarios].getResource("persistent-report.html") }
@@ -36,12 +51,8 @@ trait DeepQueueScenarios extends Persist
for ( load <- partitionedLoad ; messageSize <- List(20,1024) ) {
- val totalMessages = 100000
- val numMessages = totalMessages / load
-
def benchmark(name: String)(func: => Unit) {
test(name) {
- MAX_MESSAGES = numMessages
PTP = true
MESSAGE_SIZE = messageSize
destCount = 1;
@@ -49,7 +60,7 @@ trait DeepQueueScenarios extends Persist
}
}
- val info = "queue " + numMessages + " " + (if((messageSize%1024)==0)
(messageSize/1024)+"k" else messageSize+"b" ) + " with " + load + " "
+ val info = "queue " + MIN_MESSAGES + " " + (if((messageSize%1024)==0)
(messageSize/1024)+"k" else messageSize+"b" ) + " with " + load + " "
benchmark("En" + info + "producer(s)") {
PURGE_STORE = true
@@ -63,7 +74,7 @@ trait DeepQueueScenarios extends Persist
} finally {
stopServices();
}
- this.assert(messagesSent == totalMessages, "Unexpected number of
messages sent!")
+ // keep_sampling stops as soon as the sent total is no longer below
+ // MIN_MESSAGES, so landing exactly on MIN_MESSAGES is a valid outcome.
+ this.assert(totalMessageSent >= MIN_MESSAGES, "Unexpected number of
messages sent!")
}
benchmark("De" + info + "consumer(s)") {
@@ -78,7 +89,7 @@ trait DeepQueueScenarios extends Persist
} finally {
stopServices();
}
- this.assert(messagesReceived == totalMessages, "Unexpected number of
messages received!")
+ // keep_sampling stops as soon as the received total is no longer below
+ // MIN_MESSAGES, so landing exactly on MIN_MESSAGES is a valid outcome.
+ this.assert(totalMessageReceived >= MIN_MESSAGES, "Unexpected number of
messages received!")
}
}
Modified:
activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/metric/Metric.java
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/metric/Metric.java?rev=1002779&r1=1002778&r2=1002779&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/metric/Metric.java
(original)
+++
activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/metric/Metric.java
Wed Sep 29 17:40:19 2010
@@ -53,6 +53,6 @@ abstract public class Metric {
return String.format("%s: %(,.2f %s/s", name, period.rate(counter()),
unit);
}
- abstract public void reset();
+ abstract public long reset();
}
Modified:
activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/metric/MetricAggregator.java
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/metric/MetricAggregator.java?rev=1002779&r1=1002778&r2=1002779&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/metric/MetricAggregator.java
(original)
+++
activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/metric/MetricAggregator.java
Wed Sep 29 17:40:19 2010
@@ -142,10 +142,12 @@ public class MetricAggregator extends Me
}
@Override
- public void reset() {
+ public long reset() {
+ long rc = 0;
for (Metric metric : metrics) {
- metric.reset();
+ rc += metric.reset();
}
+ return rc;
}
}
Modified:
activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/metric/MetricCounter.java
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/metric/MetricCounter.java?rev=1002779&r1=1002778&r2=1002779&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/metric/MetricCounter.java
(original)
+++
activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/metric/MetricCounter.java
Wed Sep 29 17:40:19 2010
@@ -40,8 +40,8 @@ public class MetricCounter extends Metri
}
@Override
- public void reset() {
- counter.set(0);
+ public long reset() {
+ return counter.getAndSet(0);
}
}