Author: chirino
Date: Mon Oct 18 20:25:23 2010
New Revision: 1023986
URL: http://svn.apache.org/viewvc?rev=1023986&view=rev
Log:
Create initial DB with 1M messages, save it, and re-use it; modified
DeepQueueScenarios to just push messages onto the queue and removed the fixed
sampling. Still needs a bit of work.
Added:
activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/LargeInitialDB.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala
- copied, changed from r1023948,
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala
activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/FileSupport.scala
activemq/activemq-apollo/trunk/apollo-util/src/test/scala/org/apache/activemq/apollo/util/FileSupportTest.scala
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala
activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/DeepQueueScenarios.scala
activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/PersistentScenario.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala?rev=1023986&r1=1023985&r2=1023986&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala
Mon Oct 18 20:25:23 2010
@@ -211,6 +211,24 @@ abstract class BrokerPerfSupport extends
config
}
+ def createDestinations(destCount: Int) : Array[Destination] = {
+ var dests = new Array[Destination](destCount)
+
+ for (i <- 0 until destCount) {
+ val domain = if (PTP) {Router.QUEUE_DOMAIN} else {Router.TOPIC_DOMAIN}
+ val name = new AsciiBuffer("dest" + (i + 1))
+ var bean = new SingleDestination(domain, name)
+ dests(i) = bean
+ // if (PTP) {
+ // sendBroker.defaultVirtualHost.createQueue(dests(i))
+ // if (MULTI_BROKER) {
+ // rcvBroker.defaultVirtualHost.createQueue(dests(i))
+ // }
+ // }
+ }
+ dests
+ }
+
def createConnections() = {
if (MULTI_BROKER) {
@@ -229,24 +247,11 @@ abstract class BrokerPerfSupport extends
startBrokers()
- var dests = new Array[Destination](destCount)
-
- for (i <- 0 until destCount) {
- val domain = if (PTP) {Router.QUEUE_DOMAIN} else {Router.TOPIC_DOMAIN}
- val name = new AsciiBuffer("dest" + (i + 1))
- var bean = new SingleDestination(domain, name)
- dests(i) = bean
-// if (PTP) {
-// sendBroker.defaultVirtualHost.createQueue(dests(i))
-// if (MULTI_BROKER) {
-// rcvBroker.defaultVirtualHost.createQueue(dests(i))
-// }
-// }
- }
+ val dests: Array[Destination] = createDestinations(destCount)
for (i <- 0 until producerCount) {
var destination = dests(i % destCount)
- var producer = _createProducer(i, destination)
+ var producer = _createProducer(i, MESSAGE_SIZE, destination)
producer.persistent = PERSISTENT
producers += (producer)
}
@@ -286,7 +291,7 @@ abstract class BrokerPerfSupport extends
}
- def _createProducer(id: Int, destination: Destination): RemoteProducer = {
+ def _createProducer(id: Int, messageSize: Int, destination: Destination):
RemoteProducer = {
var producer = createProducer()
producer.stopping = stopping
@@ -296,7 +301,7 @@ abstract class BrokerPerfSupport extends
producer.destination = destination
producer.messageIdGenerator = msgIdGenerator
producer.rateAggregator = totalProducerRate
- producer.payloadSize = MESSAGE_SIZE
+ producer.payloadSize = messageSize
producer.init
producer
}
@@ -329,7 +334,6 @@ abstract class BrokerPerfSupport extends
tracker.await
}
-
def startClients() = {
var tracker = new LoggingTracker("test consumer startup")
for (connection <- consumers) {
@@ -415,6 +419,25 @@ abstract class BrokerPerfSupport extends
}
+
+trait FixedSampling extends BrokerPerfSupport {
+ val MIN_MESSAGES = 100000L
+
+ override def fixed_sampling = false
+
+ override def keep_sampling:Boolean = {
+ if( producerCount > 0 && totalMessageSent < MIN_MESSAGES ) {
+ println("Waiting for producers: %s/%s".format(totalMessageSent,
MIN_MESSAGES));
+ return true
+ }
+ if ( consumerCount > 0 && totalMessageReceived < MIN_MESSAGES ) {
+ println("Waiting for consumers: %s/%s".format(totalMessageReceived,
MIN_MESSAGES));
+ return true
+ }
+ return false
+ }
+}
+
abstract class RemoteConnection extends Connection {
var uri: String = null
var name:String = null
@@ -425,8 +448,6 @@ abstract class RemoteConnection extends
var stopping:AtomicBoolean = null
var destination: Destination = null
- var messageCount = 0
-
def init = {
if( rate.getName == null ) {
rate.name(name + " Rate")
@@ -472,13 +493,6 @@ abstract class RemoteConnection extends
}
}
}
-
- protected def doStop()
-
- protected def incrementMessageCount() = {
- messageCount = messageCount + 1
- }
-
}
abstract class RemoteConsumer extends RemoteConnection {
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/DeepQueueScenarios.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/DeepQueueScenarios.scala?rev=1023986&r1=1023985&r2=1023986&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/DeepQueueScenarios.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/DeepQueueScenarios.scala
Mon Oct 18 20:25:23 2010
@@ -16,8 +16,6 @@
*/
package org.apache.activemq.apollo.broker.perf
-import java.net.URL
-
/**
* <p>
* </p>
@@ -26,30 +24,9 @@ import java.net.URL
*/
trait DeepQueueScenarios extends PersistentScenario {
- PERSISTENT = true
- val MIN_MESSAGES = 100000
-
- override def fixed_sampling = false
-
- override def keep_sampling:Boolean = {
- if( producerCount > 0 && totalMessageSent < MIN_MESSAGES ) {
- println("Waiting for producers: %s/%s".format(totalMessageSent,
MIN_MESSAGES));
- return true
- }
- if ( consumerCount > 0 && totalMessageReceived < MIN_MESSAGES ) {
- println("Waiting for consumers: %s/%s".format(totalMessageReceived,
MIN_MESSAGES));
- return true
- }
- return false
- }
-
- override def reportResourceTemplate():URL = {
classOf[DeepQueueScenarios].getResource("persistent-report.html") }
-
- //override def partitionedLoad = List(1, 2, 4, 8, 10)
override def highContention = 100
- //override def messageSizes = List(20, 1024, 1024*256)
- for ( load <- partitionedLoad ; messageSize <- List(20,1024) ) {
+ for ( count <- partitionedLoad ; messageSize <- messageSizes ) {
def benchmark(name: String)(func: => Unit) {
test(name) {
@@ -59,12 +36,12 @@ trait DeepQueueScenarios extends Persist
func
}
}
+
+ val prefix = "queue " + (if((messageSize%1024)==0) (messageSize/1024)+"k"
else messageSize+"b" ) + " "
+ val suffix = "" //(if( durable ) " durable" else "")
- val info = "queue " + MIN_MESSAGES + " " + (if((messageSize%1024)==0)
(messageSize/1024)+"k" else messageSize+"b" ) + " with " + load + " "
-
- benchmark("En" + info + "producer(s)") {
- PURGE_STORE = true
- producerCount = load;
+ benchmark(format("%s%d%s", prefix, count, suffix)) {
+ producerCount = count;
createConnections();
// Start 'em up.
@@ -74,23 +51,6 @@ trait DeepQueueScenarios extends Persist
} finally {
stopServices();
}
- this.assert(totalMessageSent > MIN_MESSAGES, "Unexpected number of
messages sent!")
- }
-
- benchmark("De" + info + "consumer(s)") {
- PURGE_STORE = false
- consumerCount = load;
- createConnections();
-
- // Start 'em up.
- startClients();
- try {
- reportRates();
- } finally {
- stopServices();
- }
- this.assert(totalMessageReceived > MIN_MESSAGES, "Unexpected number of
messages received!")
}
}
-
}
\ No newline at end of file
Added:
activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/LargeInitialDB.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/LargeInitialDB.scala?rev=1023986&view=auto
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/LargeInitialDB.scala
(added)
+++
activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/LargeInitialDB.scala
Mon Oct 18 20:25:23 2010
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.apollo.broker.perf
+
+import org.apache.activemq.apollo.broker.{Destination, Broker}
+import tools.nsc.io.Directory
+import org.apache.activemq.apollo.util.metric.MetricAggregator
+import org.apache.activemq.apollo.util.{FileSupport, LoggingTracker}
+
+
+trait LargeInitialDB extends PersistentScenario {
+
+ PURGE_STORE = false
+
+ var original: Directory = null
+ var backup: Directory = null;
+
+ // delete existing data file and copy new data file over
+ override protected def beforeEach() = {
+ println("Restoring DB")
+ restoreDB
+ super.beforeEach
+ }
+
+ // start a broker connect a producer and dump a bunch of messages
+ // into a destination
+ override protected def beforeAll(configMap: Map[String, Any]) = {
+ super.beforeAll(configMap)
+
+ sendBroker = new Broker()
+ sendBroker.config = createBrokerConfig("Broker", sendBrokerBindURI,
sendBrokerConnectURI)
+ val store = sendBroker.config.virtual_hosts.get(0).store
+
+ original = new Directory(storeDirectory)
+ if ( original.exists ) {
+ original.deleteRecursively
+ original.createDirectory(true)
+ }
+ val backupLocation =
FileSupport.toDirectory(storeDirectory.getParent)./(FileSupport.toDirectory("backup"))
+ backup = backupLocation
+ cleanBackup
+
+ println("Using store at " + original + " and backup at " + backup)
+
+ var tracker = new LoggingTracker("initial db broker startup")
+ tracker.start(sendBroker)
+ tracker.await
+
+ PTP = true
+ val dests: Array[Destination] = createDestinations(1)
+ totalProducerRate = new MetricAggregator().name("Aggregate Producer
Rate").unit("items")
+ val producer: RemoteProducer = _createProducer(0, 1024, dests(0))
+ producer.persistent = true
+
+ tracker = new LoggingTracker("initial db producer startup")
+ tracker.start(producer)
+ tracker.await
+
+ val messages = 1000000L
+
+ println("Filling broker with " + messages + " 1k messages")
+ while (producer.rate.counter() < messages) {
+ println("Waiting for producer " + producer.rate.counter() + "/" +
messages)
+ Thread.sleep(5000)
+ }
+
+ tracker = new LoggingTracker("producer shutdown")
+ tracker.stop(producer)
+ tracker.await
+ tracker = new LoggingTracker("broker shutdown")
+ tracker.stop(sendBroker)
+ tracker.await
+
+ Thread.sleep(10000)
+
+ saveDB
+ }
+
+ def saveDB {
+ println("Copying contents of " + original + " to " + backup)
+ cleanBackup
+ FileSupport.recursiveCopy(original, backup)
+ printStores
+ }
+
+ def printStores {
+ println("\nOriginal store")
+ original.deepList().foreach(println)
+ println("\n\nBackup store")
+ backup.deepList().foreach(println)
+ }
+
+ def restoreDB {
+ original.deleteRecursively
+ println("Copying contents of " + backup + " to " + original)
+ FileSupport.recursiveCopy(backup, original)
+ printStores
+ }
+
+ def cleanBackup {
+ if (backup.exists) {
+ backup.deleteRecursively
+ }
+ backup.createDirectory(true)
+ printStores
+ }
+
+}
\ No newline at end of file
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/PersistentScenario.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/PersistentScenario.scala?rev=1023986&r1=1023985&r2=1023986&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/PersistentScenario.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/PersistentScenario.scala
Mon Oct 18 20:25:23 2010
@@ -16,6 +16,9 @@
*/
package org.apache.activemq.apollo.broker.perf
+import java.net.URL
+import java.io.File
+
/**
* <p>
* </p>
@@ -24,6 +27,9 @@ package org.apache.activemq.apollo.broke
*/
trait PersistentScenario extends BrokerPerfSupport {
+ var storeDirectory: File = null
+
+ override def reportResourceTemplate():URL = {
classOf[PersistentScenario].getResource("persistent-report.html") }
PERSISTENT = true
}
\ No newline at end of file
Modified:
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala?rev=1023986&r1=1023985&r2=1023986&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala
Mon Oct 18 20:25:23 2010
@@ -16,19 +16,7 @@
*/
package org.apache.activemq.apollo.stomp.perf
-import _root_.java.util.concurrent.TimeUnit
-import _root_.org.apache.activemq.apollo.broker._
import _root_.org.apache.activemq.apollo.broker.perf._
-import _root_.org.apache.activemq.apollo.stomp._
-import _root_.org.apache.activemq.apollo.util._
-
-import _root_.org.fusesource.hawtbuf._
-import collection.mutable.{ListBuffer, HashMap}
-
-import AsciiBuffer._
-import Stomp._
-import _root_.org.apache.activemq.apollo.stomp.StompFrame
-import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
import java.io.File
import org.apache.activemq.apollo.dto.{BrokerDTO, HawtDBStoreDTO}
import org.apache.activemq.apollo.store.bdb.dto.BDBStoreDTO
@@ -42,11 +30,11 @@ class BasicHawtDBTest extends BasicScena
override def description = "Using the STOMP protocol over TCP"
}
-class DeepQueueHawtDBTest extends DeepQueueScenarios with HawtDBScenario with
StompScenario {
+class DeepQueueHawtDBTest extends DeepQueueScenarios with LargeInitialDB with
HawtDBScenario with StompScenario {
override def description = "Using the STOMP protocol over TCP persisting to
the HawtDB store."
}
-class DeepQueueBDBTest extends DeepQueueScenarios with BDBScenario with
StompScenario {
+class DeepQueueBDBTest extends DeepQueueScenarios with LargeInitialDB with
BDBScenario with StompScenario {
override def description = "Using the STOMP protocol over TCP persisting to
the BerkleyDB store."
}
@@ -56,200 +44,25 @@ trait StompScenario extends BrokerPerfSu
override def getRemoteProtocolName() = "stomp"
}
-trait HawtDBScenario extends BrokerPerfSupport {
+trait HawtDBScenario extends PersistentScenario {
override def createBrokerConfig(name: String, bindURI: String, connectUri:
String): BrokerDTO = {
val rc = super.createBrokerConfig(name, bindURI, connectUri)
val store = new HawtDBStoreDTO
- store.directory = new File(new File(testDataDir, getClass.getName), name)
+ storeDirectory = new File(new File(testDataDir, getClass.getName), name)
+ store.directory = storeDirectory
rc.virtual_hosts.get(0).store = store
rc
}
}
-trait BDBScenario extends BrokerPerfSupport {
+
+trait BDBScenario extends PersistentScenario {
override def createBrokerConfig(name: String, bindURI: String, connectUri:
String): BrokerDTO = {
val rc = super.createBrokerConfig(name, bindURI, connectUri)
-
val store = new BDBStoreDTO
- store.directory = new File(new File(testDataDir, getClass.getName), name)
-
+ storeDirectory = new File(new File(testDataDir, getClass.getName), name)
+ store.directory = storeDirectory
rc.virtual_hosts.get(0).store = store
rc
}
}
-class StompRemoteConsumer extends RemoteConsumer with Logging {
- var outboundSink: OverflowSink[StompFrame] = null
-
- def watchdog(lastMessageCount: Int) : Unit = {
- val seconds = 10
- dispatchQueue.dispatchAfter(seconds, TimeUnit.SECONDS, ^ {
- if (messageCount == lastMessageCount) {
- warn("Messages have stopped arriving after " + seconds + "s,
stopping consumer")
- stop
- } else {
- watchdog(messageCount)
- }
- })
- }
-
- def onConnected() = {
- outboundSink = new OverflowSink[StompFrame](MapSink(transportSink){ x=>x })
- outboundSink.refiller = ^{}
-
- val stompDestination = if (destination.getDomain() == Router.QUEUE_DOMAIN)
{
- ascii("/queue/" + destination.getName().toString());
- } else {
- ascii("/topic/" + destination.getName().toString());
- }
-
- var frame = StompFrame(CONNECT);
- outboundSink.offer(frame);
-
- var headers: List[(AsciiBuffer, AsciiBuffer)] = Nil
- headers ::= (DESTINATION, stompDestination)
- headers ::= (ID, ascii("stomp-sub-" + name))
-
- if( persistent ) {
- headers ::= (ACK_MODE, CLIENT)
- }
-
- frame = StompFrame(SUBSCRIBE, headers);
- outboundSink.offer(frame);
- watchdog(messageCount)
- }
-
- override def onTransportCommand(command: Object) = {
- var frame = command.asInstanceOf[StompFrame]
- frame match {
- case StompFrame(CONNECTED, headers, _, _) =>
- case StompFrame(MESSAGE, headers, content, _) =>
- messageReceived();
-
- // we client ack if persistent messages are being used.
- if( persistent ) {
- var rc = List((MESSAGE_ID, frame.header(MESSAGE_ID)))
- outboundSink.offer(StompFrame(ACK, rc));
- }
-
- case StompFrame(ERROR, headers, content, _) =>
- onFailure(new Exception("Server reported an error: " + frame.content));
- case _ =>
- onFailure(new Exception("Unexpected stomp command: " + frame.action));
- }
- }
-
- protected def messageReceived() {
- if (thinkTime > 0) {
- transport.suspendRead
- dispatchQueue.dispatchAfter(thinkTime, TimeUnit.MILLISECONDS, ^ {
- incrementMessageCount
- rate.increment();
- if (!stopped) {
- transport.resumeRead
- }
- })
- } else {
- incrementMessageCount
- rate.increment
- }
- }
-
- override def doStop() = {
- outboundSink.offer(StompFrame(DISCONNECT));
- dispatchQueue.dispatchAfter(5, TimeUnit.SECONDS, ^ {
- transport.stop
- stop
- })
- }
-}
-
-class StompRemoteProducer extends RemoteProducer with Logging {
- var outboundSink: OverflowSink[StompFrame] = null
- var stompDestination: AsciiBuffer = null
- var frame:StompFrame = null
-
- def send_next: Unit = {
- var headers: List[(AsciiBuffer, AsciiBuffer)] = Nil
- headers ::= (DESTINATION, stompDestination);
- if (property != null) {
- headers ::= (ascii(property), ascii(property));
- }
- if( persistent ) {
- headers ::= ((RECEIPT_REQUESTED, ascii("x")));
- }
- // var p = this.priority;
- // if (priorityMod > 0) {
- // p = if ((counter % priorityMod) == 0) { 0 } else { priority }
- // }
-
- var content = ascii(createPayload());
- frame = StompFrame(SEND, headers, BufferContent(content))
- drain()
- }
-
- def drain() = {
- if( frame!=null ) {
- if( !outboundSink.full ) {
- outboundSink.offer(frame)
- frame = null
- rate.increment
- val task = ^ {
- if (!stopped) {
- incrementMessageCount
- send_next
- }
- }
-
- if( !persistent ) {
- // if we are not going to wait for an ack back from the server,
- // then jut send the next one...
- if (thinkTime > 0) {
- dispatchQueue.dispatchAfter(thinkTime, TimeUnit.MILLISECONDS, task)
- } else {
- dispatchQueue << task
- }
- }
- }
- }
- }
-
- override def onConnected() = {
- outboundSink = new OverflowSink[StompFrame](MapSink(transportSink){ x=>x })
- outboundSink.refiller = ^ { drain }
-
- if (destination.getDomain() == Router.QUEUE_DOMAIN) {
- stompDestination = ascii("/queue/" + destination.getName().toString());
- } else {
- stompDestination = ascii("/topic/" + destination.getName().toString());
- }
- outboundSink.offer(StompFrame(CONNECT));
- send_next
- }
-
- override def onTransportCommand(command: Object) = {
- var frame = command.asInstanceOf[StompFrame]
- frame match {
- case StompFrame(RECEIPT, headers, _, _) =>
- assert( persistent )
- // we got the ack for the previous message we sent.. now send the next
one.
- incrementMessageCount
- send_next
-
- case StompFrame(CONNECTED, headers, _, _) =>
- case StompFrame(ERROR, headers, content, _) =>
- onFailure(new Exception("Server reported an error: " +
frame.content.utf8));
- case _ =>
- onFailure(new Exception("Unexpected stomp command: " + frame.action));
- }
- }
-
- override def doStop() = {
- outboundSink.offer(StompFrame(DISCONNECT));
- dispatchQueue.dispatchAfter(5, TimeUnit.SECONDS, ^ {
- transport.stop
- stop
- })
- }
-
-}
-
Copied:
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala
(from r1023948,
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala)
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala?p2=activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala&p1=activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala&r1=1023948&r2=1023986&rev=1023986&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala
Mon Oct 18 20:25:23 2010
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.activemq.apollo.stomp.perf
import _root_.java.util.concurrent.TimeUnit
@@ -23,62 +24,15 @@ import _root_.org.apache.activemq.apollo
import _root_.org.apache.activemq.apollo.util._
import _root_.org.fusesource.hawtbuf._
-import collection.mutable.{ListBuffer, HashMap}
import AsciiBuffer._
import Stomp._
import _root_.org.apache.activemq.apollo.stomp.StompFrame
import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
-import java.io.File
-import org.apache.activemq.apollo.dto.{BrokerDTO, HawtDBStoreDTO}
-import org.apache.activemq.apollo.store.bdb.dto.BDBStoreDTO
-
-
-class BasicNonPersistentTest extends BasicScenarios with StompScenario {
- override def description = "Using the STOMP protocol over TCP"
-}
-
-class BasicHawtDBTest extends BasicScenarios with PersistentScenario with
HawtDBScenario with StompScenario {
- override def description = "Using the STOMP protocol over TCP"
-}
-
-class DeepQueueHawtDBTest extends DeepQueueScenarios with HawtDBScenario with
StompScenario {
- override def description = "Using the STOMP protocol over TCP persisting to
the HawtDB store."
-}
-
-class DeepQueueBDBTest extends DeepQueueScenarios with BDBScenario with
StompScenario {
- override def description = "Using the STOMP protocol over TCP persisting to
the BerkleyDB store."
-}
-
-trait StompScenario extends BrokerPerfSupport {
- override def createProducer() = new StompRemoteProducer()
- override def createConsumer() = new StompRemoteConsumer()
- override def getRemoteProtocolName() = "stomp"
-}
-
-trait HawtDBScenario extends BrokerPerfSupport {
- override def createBrokerConfig(name: String, bindURI: String, connectUri:
String): BrokerDTO = {
- val rc = super.createBrokerConfig(name, bindURI, connectUri)
- val store = new HawtDBStoreDTO
- store.directory = new File(new File(testDataDir, getClass.getName), name)
- rc.virtual_hosts.get(0).store = store
- rc
- }
-}
-trait BDBScenario extends BrokerPerfSupport {
- override def createBrokerConfig(name: String, bindURI: String, connectUri:
String): BrokerDTO = {
- val rc = super.createBrokerConfig(name, bindURI, connectUri)
-
- val store = new BDBStoreDTO
- store.directory = new File(new File(testDataDir, getClass.getName), name)
-
- rc.virtual_hosts.get(0).store = store
- rc
- }
-}
class StompRemoteConsumer extends RemoteConsumer with Logging {
var outboundSink: OverflowSink[StompFrame] = null
+ var messageCount = 0
def watchdog(lastMessageCount: Int) : Unit = {
val seconds = 10
@@ -102,18 +56,18 @@ class StompRemoteConsumer extends Remote
ascii("/topic/" + destination.getName().toString());
}
- var frame = StompFrame(CONNECT);
+ var frame = StompFrame(Stomp.Commands.CONNECT);
outboundSink.offer(frame);
var headers: List[(AsciiBuffer, AsciiBuffer)] = Nil
- headers ::= (DESTINATION, stompDestination)
- headers ::= (ID, ascii("stomp-sub-" + name))
+ headers ::= (Stomp.Headers.Subscribe.DESTINATION, stompDestination)
+ headers ::= (Stomp.Headers.Subscribe.ID, ascii("stomp-sub-" + name))
if( persistent ) {
- headers ::= (ACK_MODE, CLIENT)
+ headers ::= (Stomp.Headers.Subscribe.ACK_MODE,
Stomp.Headers.Subscribe.AckModeValues.CLIENT)
}
- frame = StompFrame(SUBSCRIBE, headers);
+ frame = StompFrame(Stomp.Commands.SUBSCRIBE, headers);
outboundSink.offer(frame);
watchdog(messageCount)
}
@@ -121,17 +75,17 @@ class StompRemoteConsumer extends Remote
override def onTransportCommand(command: Object) = {
var frame = command.asInstanceOf[StompFrame]
frame match {
- case StompFrame(CONNECTED, headers, _, _) =>
- case StompFrame(MESSAGE, headers, content, _) =>
+ case StompFrame(Responses.CONNECTED, headers, _, _) =>
+ case StompFrame(Responses.MESSAGE, headers, content, _) =>
messageReceived();
// we client ack if persistent messages are being used.
if( persistent ) {
- var rc = List((MESSAGE_ID, frame.header(MESSAGE_ID)))
- outboundSink.offer(StompFrame(ACK, rc));
+ var rc = List((Stomp.Headers.Ack.MESSAGE_ID,
frame.header(Stomp.Headers.Message.MESSAGE_ID)))
+ outboundSink.offer(StompFrame(Stomp.Commands.ACK, rc));
}
- case StompFrame(ERROR, headers, content, _) =>
+ case StompFrame(Responses.ERROR, headers, content, _) =>
onFailure(new Exception("Server reported an error: " + frame.content));
case _ =>
onFailure(new Exception("Unexpected stomp command: " + frame.action));
@@ -142,27 +96,20 @@ class StompRemoteConsumer extends Remote
if (thinkTime > 0) {
transport.suspendRead
dispatchQueue.dispatchAfter(thinkTime, TimeUnit.MILLISECONDS, ^ {
- incrementMessageCount
+ messageCount += 1
rate.increment();
if (!stopped) {
transport.resumeRead
}
})
} else {
- incrementMessageCount
+ messageCount += 1
rate.increment
}
}
-
- override def doStop() = {
- outboundSink.offer(StompFrame(DISCONNECT));
- dispatchQueue.dispatchAfter(5, TimeUnit.SECONDS, ^ {
- transport.stop
- stop
- })
- }
}
+
class StompRemoteProducer extends RemoteProducer with Logging {
var outboundSink: OverflowSink[StompFrame] = null
var stompDestination: AsciiBuffer = null
@@ -170,12 +117,12 @@ class StompRemoteProducer extends Remote
def send_next: Unit = {
var headers: List[(AsciiBuffer, AsciiBuffer)] = Nil
- headers ::= (DESTINATION, stompDestination);
+ headers ::= (Stomp.Headers.Send.DESTINATION, stompDestination);
if (property != null) {
headers ::= (ascii(property), ascii(property));
}
if( persistent ) {
- headers ::= ((RECEIPT_REQUESTED, ascii("x")));
+ headers ::= ((Stomp.Headers.RECEIPT_REQUESTED, ascii("x")));
}
// var p = this.priority;
// if (priorityMod > 0) {
@@ -183,7 +130,7 @@ class StompRemoteProducer extends Remote
// }
var content = ascii(createPayload());
- frame = StompFrame(SEND, headers, BufferContent(content))
+ frame = StompFrame(Stomp.Commands.SEND, headers, BufferContent(content))
drain()
}
@@ -195,7 +142,6 @@ class StompRemoteProducer extends Remote
rate.increment
val task = ^ {
if (!stopped) {
- incrementMessageCount
send_next
}
}
@@ -222,34 +168,24 @@ class StompRemoteProducer extends Remote
} else {
stompDestination = ascii("/topic/" + destination.getName().toString());
}
- outboundSink.offer(StompFrame(CONNECT));
+ outboundSink.offer(StompFrame(Stomp.Commands.CONNECT));
send_next
}
override def onTransportCommand(command: Object) = {
var frame = command.asInstanceOf[StompFrame]
frame match {
- case StompFrame(RECEIPT, headers, _, _) =>
+ case StompFrame(Responses.RECEIPT, headers, _, _) =>
assert( persistent )
// we got the ack for the previous message we sent.. now send the next
one.
- incrementMessageCount
send_next
- case StompFrame(CONNECTED, headers, _, _) =>
- case StompFrame(ERROR, headers, content, _) =>
+ case StompFrame(Responses.CONNECTED, headers, _, _) =>
+ case StompFrame(Responses.ERROR, headers, content, _) =>
onFailure(new Exception("Server reported an error: " +
frame.content.utf8));
case _ =>
onFailure(new Exception("Unexpected stomp command: " + frame.action));
}
}
-
- override def doStop() = {
- outboundSink.offer(StompFrame(DISCONNECT));
- dispatchQueue.dispatchAfter(5, TimeUnit.SECONDS, ^ {
- transport.stop
- stop
- })
- }
-
}
Added:
activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/FileSupport.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/FileSupport.scala?rev=1023986&view=auto
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/FileSupport.scala
(added)
+++
activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/FileSupport.scala
Mon Oct 18 20:25:23 2010
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.util
+
+import tools.nsc.io.{File, Path, Directory}
+
+object FileSupport {
+
+ def recursiveCopy(source: Path, target: Path) : Unit = {
+ require(source.isDirectory, source.path + " must be a directory.")
+ if ( !target.exists ) {
+ target.toDirectory.createDirectory()
+ }
+
+ def createOrCopy(file: Path) : Unit = {
+ val newTarget = target / FileSupport.toDirectory(file.name)
+ if (file.isDirectory) {
+ recursiveCopy(file.toDirectory, newTarget)
+ } else {
+ file.toFile.copyTo(newTarget)
+ }
+ }
+ source.toDirectory.list.foreach(createOrCopy)
+ }
+
+ def toDirectory(name: String) : Directory = {
+ new Directory(new java.io.File(name))
+ }
+
+ def toFile(name: String) : File = {
+ new File(new java.io.File(name))
+ }
+
+}
\ No newline at end of file
Added:
activemq/activemq-apollo/trunk/apollo-util/src/test/scala/org/apache/activemq/apollo/util/FileSupportTest.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/test/scala/org/apache/activemq/apollo/util/FileSupportTest.scala?rev=1023986&view=auto
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-util/src/test/scala/org/apache/activemq/apollo/util/FileSupportTest.scala
(added)
+++
activemq/activemq-apollo/trunk/apollo-util/src/test/scala/org/apache/activemq/apollo/util/FileSupportTest.scala
Mon Oct 18 20:25:23 2010
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.apollo.util
+
+import org.scalatest.matchers.ShouldMatchers
+import tools.nsc.io.{Path, File, Directory}
+import collection.mutable.ArrayBuffer
+
+class FileSupportTest extends FunSuiteSupport with ShouldMatchers {
+
+ test("recursive file copy test") {
+
+ val base = new Directory(baseDir)
+ var target = base / FileSupport.toDirectory("target")
+
+ val sourceDir: Directory = target / FileSupport.toDirectory("sourceDir")
+ if ( sourceDir.exists ) {
+ sourceDir.deleteRecursively
+ }
+ sourceDir.createDirectory(false)
+
+ val subDir: Directory = sourceDir / FileSupport.toDirectory("subDir")
+ subDir.createDirectory(false)
+
+ val someFile: File = subDir / FileSupport.toFile("someFile")
+ someFile.createFile(false)
+
+ val targetDir: Directory = target / FileSupport.toDirectory("targetDir")
+ if ( targetDir.exists ) {
+ targetDir.deleteRecursively
+ }
+
+ FileSupport.recursiveCopy(sourceDir, targetDir)
+
+ val listing = new ArrayBuffer[String]
+
+ targetDir.deepList().foreach(file => listing.append(file.toString))
+
+ listing should contain("./target/targetDir/subDir")
+ listing should contain("./target/targetDir/subDir/someFile")
+
+ }
+
+
+}
\ No newline at end of file