Author: chirino
Date: Tue Oct 19 00:39:02 2010
New Revision: 1024072
URL: http://svn.apache.org/viewvc?rev=1024072&view=rev
Log:
Send persistent messages async when creating the large initial DB so that it
gets created faster.
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/LargeInitialDB.scala
activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/RemoteConnection.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/LargeInitialDB.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/LargeInitialDB.scala?rev=1024072&r1=1024071&r2=1024072&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/LargeInitialDB.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/LargeInitialDB.scala Tue Oct 19 00:39:02 2010
@@ -29,6 +29,9 @@ trait LargeInitialDB extends PersistentS
var original: Directory = null
var backup: Directory = null;
+ // Keep it simple.. we are only creating 1 queue with a large number of entries.
+ override def partitionedLoad = List(1)
+
// delete existing data file and copy new data file over
override protected def beforeEach() = {
println("Restoring DB")
@@ -57,21 +60,26 @@ trait LargeInitialDB extends PersistentS
PTP = true
val dests: Array[Destination] = createDestinations(1)
+
totalProducerRate = new MetricAggregator().name("Aggregate Producer Rate").unit("items")
+
val producer: RemoteProducer = _createProducer(0, 20, dests(0))
+
producer.persistent = true
+ producer.sync_persistent_send = false // this should speed things up.
ServiceControl.start(producer, "initial db producer startup")
val messages = 1000000L
- println("Filling broker with " + messages + " 1k messages")
+ println("Sending %d messages".format(messages))
while (producer.rate.counter() < messages) {
println("Waiting for producer " + producer.rate.counter() + "/" + messages)
Thread.sleep(5000)
}
ServiceControl.stop(producer, "producer shutdown")
+
ServiceControl.stop(sendBroker, "broker shutdown")
saveDB
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/RemoteConnection.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/RemoteConnection.scala?rev=1024072&r1=1024071&r2=1024072&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/RemoteConnection.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/RemoteConnection.scala Tue Oct 19 00:39:02 2010
@@ -96,6 +96,7 @@ abstract class RemoteProducer extends Re
var messageIdGenerator: AtomicLong = null
var priority = 0
var persistent = false
+ var sync_persistent_send = true
var priorityMod = 0
var counter = 0
var producerId = 0
Modified:
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala?rev=1024072&r1=1024071&r2=1024072&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala Tue Oct 19 00:39:02 2010
@@ -99,13 +99,15 @@ class StompRemoteProducer extends Remote
var stompDestination: AsciiBuffer = null
var frame: StompFrame = null
+ def use_send_receipt = persistent && sync_persistent_send
+
def send_next: Unit = {
var headers: List[(AsciiBuffer, AsciiBuffer)] = Nil
headers ::= (DESTINATION, stompDestination);
if (property != null) {
headers ::= (ascii(property), ascii(property));
}
- if( persistent ) {
+ if( use_send_receipt ) {
headers ::= ((RECEIPT_REQUESTED, ascii("x")));
}
// var p = this.priority;
@@ -130,7 +132,7 @@ class StompRemoteProducer extends Remote
}
}
- if (!persistent) {
+ if (!use_send_receipt) {
// if we are not going to wait for an ack back from the server,
// then jut send the next one...
if (thinkTime > 0) {
@@ -160,7 +162,7 @@ class StompRemoteProducer extends Remote
var frame = command.asInstanceOf[StompFrame]
frame match {
case StompFrame(RECEIPT, headers, _, _) =>
- assert( persistent )
+ assert( use_send_receipt )
// we got the ack for the previous message we sent.. now send the next one.
send_next