---
 .../apollo/broker/perf/BrokerPerfSupport.scala     |   68 ++++---
 .../apollo/broker/perf/DeepQueueScenarios.scala    |   52 +-----
 .../apollo/broker/perf/LargeInitialDB.scala        |  123 ++++++++++++
 .../apollo/broker/perf/PersistentScenario.scala    |    6 +
 .../apollo/stomp/perf/StompBrokerPerfTest.scala    |  205 +-------------------
 .../apollo/stomp/perf/StompRemoteClients.scala     |  191 ++++++++++++++++++
 .../apache/activemq/apollo/util/FileSupport.scala  |   48 +++++
 .../activemq/apollo/util/FileSupportTest.scala     |   60 ++++++
 8 files changed, 484 insertions(+), 269 deletions(-)
 create mode 100644 apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/LargeInitialDB.scala
 create mode 100644 apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala
 create mode 100644 apollo-util/src/main/scala/org/apache/activemq/apollo/util/FileSupport.scala
 create mode 100644 apollo-util/src/test/scala/org/apache/activemq/apollo/util/FileSupportTest.scala

diff --git a/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala b/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala
index bc841a7..07b2847 100644
--- a/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala
+++ b/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala
@@ -211,6 +211,24 @@ abstract class BrokerPerfSupport extends FunSuiteSupport with BeforeAndAfterEach
     config
   }
 
+  def createDestinations(destCount: Int) : Array[Destination] = {
+    var dests = new Array[Destination](destCount)
+
+    for (i <- 0 until destCount) {
+      val domain = if (PTP) {Router.QUEUE_DOMAIN} else {Router.TOPIC_DOMAIN}
+      val name = new AsciiBuffer("dest" + (i + 1))
+      var bean = new SingleDestination(domain, name)
+      dests(i) = bean
+      //        if (PTP) {
+      //          sendBroker.defaultVirtualHost.createQueue(dests(i))
+      //          if (MULTI_BROKER) {
+      //            rcvBroker.defaultVirtualHost.createQueue(dests(i))
+      //          }
+      //        }
+    }
+    dests
+  }
+
   def createConnections() = {
 
     if (MULTI_BROKER) {
@@ -229,24 +247,11 @@ abstract class BrokerPerfSupport extends FunSuiteSupport with BeforeAndAfterEach
 
     startBrokers()
 
-    var dests = new Array[Destination](destCount)
-
-    for (i <- 0 until destCount) {
-      val domain = if (PTP) {Router.QUEUE_DOMAIN} else {Router.TOPIC_DOMAIN}
-      val name = new AsciiBuffer("dest" + (i + 1))
-      var bean = new SingleDestination(domain, name)
-      dests(i) = bean
-//        if (PTP) {
-//          sendBroker.defaultVirtualHost.createQueue(dests(i))
-//          if (MULTI_BROKER) {
-//            rcvBroker.defaultVirtualHost.createQueue(dests(i))
-//          }
-//        }
-    }
+    val dests: Array[Destination] = createDestinations(destCount)
 
     for (i <- 0 until producerCount) {
       var destination = dests(i % destCount)
-      var producer = _createProducer(i, destination)
+      var producer = _createProducer(i, MESSAGE_SIZE,  destination)
       producer.persistent = PERSISTENT
       producers += (producer)
     }
@@ -286,7 +291,7 @@ abstract class BrokerPerfSupport extends FunSuiteSupport with BeforeAndAfterEach
   }
 
 
-  def _createProducer(id: Int, destination: Destination): RemoteProducer = {
+  def _createProducer(id: Int, messageSize: Int, destination: Destination): RemoteProducer = {
     var producer = createProducer()
     producer.stopping = stopping
 
@@ -296,7 +301,7 @@ abstract class BrokerPerfSupport extends FunSuiteSupport with BeforeAndAfterEach
     producer.destination = destination
     producer.messageIdGenerator = msgIdGenerator
     producer.rateAggregator = totalProducerRate
-    producer.payloadSize = MESSAGE_SIZE
+    producer.payloadSize = messageSize
     producer.init
     producer
   }
@@ -329,7 +334,6 @@ abstract class BrokerPerfSupport extends FunSuiteSupport with BeforeAndAfterEach
     tracker.await
   }
 
-
   def startClients() = {
     var tracker = new LoggingTracker("test consumer startup")
     for (connection <- consumers) {
@@ -415,6 +419,25 @@ abstract class BrokerPerfSupport extends FunSuiteSupport with BeforeAndAfterEach
 
 
 }
+
+trait FixedSampling extends BrokerPerfSupport {
+  val MIN_MESSAGES = 100000L
+
+  override def fixed_sampling = false
+
+  override def keep_sampling:Boolean = {
+    if( producerCount > 0 && totalMessageSent < MIN_MESSAGES ) {
+      println("Waiting for producers: %s/%s".format(totalMessageSent, MIN_MESSAGES));
+      return true
+    }
+    if ( consumerCount > 0 && totalMessageReceived < MIN_MESSAGES ) {
+      println("Waiting for consumers: %s/%s".format(totalMessageReceived, MIN_MESSAGES));
+      return true
+    }
+    return false
+  }
+}
+
 abstract class RemoteConnection extends Connection {
   var uri: String = null
   var name:String = null
@@ -425,8 +448,6 @@ abstract class RemoteConnection extends Connection {
   var stopping:AtomicBoolean = null
   var destination: Destination = null
 
-  var messageCount = 0
-
   def init = {
     if( rate.getName == null ) {
       rate.name(name + " Rate")
@@ -472,13 +493,6 @@ abstract class RemoteConnection extends Connection {
       }
     }
   }
-
-  protected def doStop()
-
-  protected def incrementMessageCount() = {
-    messageCount = messageCount + 1
-  }
-
 }
 
 abstract class RemoteConsumer extends RemoteConnection {
diff --git a/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/DeepQueueScenarios.scala b/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/DeepQueueScenarios.scala
index 3d58cb9..505ab8a 100644
--- a/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/DeepQueueScenarios.scala
+++ b/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/DeepQueueScenarios.scala
@@ -16,8 +16,6 @@
  */
 package org.apache.activemq.apollo.broker.perf
 
-import java.net.URL
-
 /**
  * <p>
  * </p>
@@ -26,30 +24,9 @@ import java.net.URL
  */
 trait DeepQueueScenarios extends PersistentScenario {
 
-  PERSISTENT = true
-  val MIN_MESSAGES = 100000
-
-  override def fixed_sampling = false
-
-  override def keep_sampling:Boolean = {
-    if( producerCount > 0 && totalMessageSent < MIN_MESSAGES ) {
-      println("Waiting for producers: %s/%s".format(totalMessageSent, MIN_MESSAGES));
-      return true
-    }
-    if ( consumerCount > 0 && totalMessageReceived < MIN_MESSAGES ) {
-      println("Waiting for consumers: %s/%s".format(totalMessageReceived, MIN_MESSAGES));
-      return true
-    }
-    return false
-  }
-
-  override def reportResourceTemplate():URL = { classOf[DeepQueueScenarios].getResource("persistent-report.html") }
-
-  //override def partitionedLoad = List(1, 2, 4, 8, 10)
   override def highContention = 100
-  //override def messageSizes = List(20, 1024, 1024*256)
 
-  for ( load <- partitionedLoad ; messageSize <- List(20,1024)  ) {
+  for ( count <- partitionedLoad ; messageSize <- messageSizes  ) {
 
     def benchmark(name: String)(func: => Unit) {
       test(name) {
@@ -59,12 +36,12 @@ trait DeepQueueScenarios extends PersistentScenario {
         func
       }
     }
+    
+    val prefix = "queue " + (if((messageSize%1024)==0) (messageSize/1024)+"k" else messageSize+"b" ) + " "
+    val suffix = "" //(if( durable ) " durable" else "")
 
-    val info = "queue " + MIN_MESSAGES + " " + (if((messageSize%1024)==0) (messageSize/1024)+"k" else messageSize+"b" ) + " with " + load + " "
-
-    benchmark("En" + info + "producer(s)") {
-      PURGE_STORE = true
-      producerCount = load;
+    benchmark(format("%s%d%s", prefix, count, suffix)) {
+      producerCount = count;
       createConnections();
 
       // Start 'em up.
@@ -74,23 +51,6 @@ trait DeepQueueScenarios extends PersistentScenario {
       } finally {
         stopServices();
       }
-      this.assert(totalMessageSent > MIN_MESSAGES, "Unexpected number of messages sent!")
-    }
-
-    benchmark("De" + info + "consumer(s)") {
-      PURGE_STORE = false
-      consumerCount = load;
-      createConnections();
-
-      // Start 'em up.
-      startClients();
-      try {
-        reportRates();
-      } finally {
-        stopServices();
-      }
-      this.assert(totalMessageReceived > MIN_MESSAGES, "Unexpected number of messages received!")
     }
   }
-
 }
\ No newline at end of file
diff --git a/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/LargeInitialDB.scala b/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/LargeInitialDB.scala
new file mode 100644
index 0000000..3574bef
--- /dev/null
+++ b/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/LargeInitialDB.scala
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.activemq.apollo.broker.perf
+
+import org.apache.activemq.apollo.broker.{Destination, Broker}
+import tools.nsc.io.Directory
+import org.apache.activemq.apollo.util.metric.MetricAggregator
+import org.apache.activemq.apollo.util.{FileSupport, LoggingTracker}
+
+
+trait LargeInitialDB extends PersistentScenario {
+
+  PURGE_STORE = false
+
+  var original: Directory = null
+  var backup: Directory = null;
+
+  // delete existing data file and copy new data file over
+  override protected def beforeEach() = {
+    println("Restoring DB")
+    restoreDB
+    super.beforeEach
+  }
+
+  // start a broker, connect a producer and dump a bunch of messages
+  // into a destination
+  override protected def beforeAll(configMap: Map[String, Any]) = {
+    super.beforeAll(configMap)
+
+    sendBroker = new Broker()
+    sendBroker.config = createBrokerConfig("Broker", sendBrokerBindURI, sendBrokerConnectURI)
+    val store = sendBroker.config.virtual_hosts.get(0).store
+
+    original = new Directory(storeDirectory)
+    if ( original.exists ) {
+      original.deleteRecursively
+      original.createDirectory(true)      
+    }
+    val backupLocation = FileSupport.toDirectory(storeDirectory.getParent)./(FileSupport.toDirectory("backup"))
+    backup = backupLocation
+    cleanBackup
+
+    println("Using store at " + original + " and backup at " + backup)
+
+    var tracker = new LoggingTracker("initial db broker startup")
+    tracker.start(sendBroker)
+    tracker.await
+
+    PTP = true
+    val dests: Array[Destination] = createDestinations(1)
+    totalProducerRate = new MetricAggregator().name("Aggregate Producer Rate").unit("items")    
+    val producer: RemoteProducer = _createProducer(0, 1024, dests(0))
+    producer.persistent = true
+
+    tracker = new LoggingTracker("initial db producer startup")
+    tracker.start(producer)
+    tracker.await
+
+    val messages = 1000000L
+
+    println("Filling broker with " + messages + " 1k messages")
+    while (producer.rate.counter() < messages) {
+      println("Waiting for producer " + producer.rate.counter() + "/" + messages)
+      Thread.sleep(5000)
+    }
+
+    tracker = new LoggingTracker("producer shutdown")
+    tracker.stop(producer)
+    tracker.await
+    tracker = new LoggingTracker("broker shutdown")
+    tracker.stop(sendBroker)
+    tracker.await
+
+    Thread.sleep(10000)
+
+    saveDB
+  }
+
+  def saveDB {
+    println("Copying contents of " + original + " to " + backup)
+    cleanBackup
+    FileSupport.recursiveCopy(original, backup)
+    printStores
+  }
+
+  def printStores {
+    println("\nOriginal store")
+    original.deepList().foreach(println)
+    println("\n\nBackup store")
+    backup.deepList().foreach(println)
+  }
+
+  def restoreDB {
+    original.deleteRecursively
+    println("Copying contents of " + backup + " to " + original)
+    FileSupport.recursiveCopy(backup, original)
+    printStores
+  }
+
+  def cleanBackup {
+    if (backup.exists) {
+      backup.deleteRecursively
+    }
+    backup.createDirectory(true)
+    printStores
+  }
+
+}
\ No newline at end of file
diff --git a/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/PersistentScenario.scala b/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/PersistentScenario.scala
index 859d196..efcfb4e 100644
--- a/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/PersistentScenario.scala
+++ b/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/PersistentScenario.scala
@@ -16,6 +16,9 @@
  */
 package org.apache.activemq.apollo.broker.perf
 
+import java.net.URL
+import java.io.File
+
 /**
  * <p>
  * </p>
@@ -24,6 +27,9 @@ package org.apache.activemq.apollo.broker.perf
  */
 trait PersistentScenario extends BrokerPerfSupport {
 
+  var storeDirectory: File = null
+
+  override def reportResourceTemplate():URL = { classOf[PersistentScenario].getResource("persistent-report.html") }
   PERSISTENT = true
 
 }
\ No newline at end of file
diff --git a/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala b/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala
index 78b60f1..b429b07 100644
--- a/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala
+++ b/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala
@@ -16,19 +16,7 @@
  */
 package org.apache.activemq.apollo.stomp.perf
 
-import _root_.java.util.concurrent.TimeUnit
-import _root_.org.apache.activemq.apollo.broker._
 import _root_.org.apache.activemq.apollo.broker.perf._
-import _root_.org.apache.activemq.apollo.stomp._
-import _root_.org.apache.activemq.apollo.util._
-
-import _root_.org.fusesource.hawtbuf._
-import collection.mutable.{ListBuffer, HashMap}
-
-import AsciiBuffer._
-import Stomp._
-import _root_.org.apache.activemq.apollo.stomp.StompFrame
-import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
 import java.io.File
 import org.apache.activemq.apollo.dto.{BrokerDTO, HawtDBStoreDTO}
 import org.apache.activemq.apollo.store.bdb.dto.BDBStoreDTO
@@ -42,11 +30,11 @@ class BasicHawtDBTest extends BasicScenarios with PersistentScenario with HawtDB
   override def description = "Using the STOMP protocol over TCP"
 }
 
-class DeepQueueHawtDBTest extends DeepQueueScenarios with HawtDBScenario with StompScenario {
+class DeepQueueHawtDBTest extends DeepQueueScenarios with LargeInitialDB with HawtDBScenario with StompScenario {
   override def description = "Using the STOMP protocol over TCP persisting to the HawtDB store."
 }
 
-class DeepQueueBDBTest extends DeepQueueScenarios with BDBScenario with StompScenario {
+class DeepQueueBDBTest extends DeepQueueScenarios with LargeInitialDB with BDBScenario with StompScenario {
   override def description = "Using the STOMP protocol over TCP persisting to the BerkleyDB store."
 }
 
@@ -56,200 +44,25 @@ trait StompScenario extends BrokerPerfSupport {
   override def getRemoteProtocolName() = "stomp"
 }
 
-trait HawtDBScenario extends BrokerPerfSupport {
+trait HawtDBScenario extends PersistentScenario {
   override def createBrokerConfig(name: String, bindURI: String, connectUri: String): BrokerDTO = {
     val rc = super.createBrokerConfig(name, bindURI, connectUri)
     val store = new HawtDBStoreDTO
-    store.directory = new File(new File(testDataDir, getClass.getName), name)
+    storeDirectory = new File(new File(testDataDir, getClass.getName), name)
+    store.directory = storeDirectory
     rc.virtual_hosts.get(0).store = store
     rc
   }
 }
-trait BDBScenario extends BrokerPerfSupport {
+
+trait BDBScenario extends PersistentScenario {
   override def createBrokerConfig(name: String, bindURI: String, connectUri: String): BrokerDTO = {
     val rc = super.createBrokerConfig(name, bindURI, connectUri)
-
     val store = new BDBStoreDTO
-    store.directory = new File(new File(testDataDir, getClass.getName), name)
-
+    storeDirectory = new File(new File(testDataDir, getClass.getName), name)
+    store.directory = storeDirectory
     rc.virtual_hosts.get(0).store = store
     rc
   }
 }
 
-class StompRemoteConsumer extends RemoteConsumer with Logging {
-  var outboundSink: OverflowSink[StompFrame] = null
-
-  def watchdog(lastMessageCount: Int) : Unit = {
-    val seconds = 10
-    dispatchQueue.dispatchAfter(seconds, TimeUnit.SECONDS, ^ {
-          if (messageCount == lastMessageCount) {
-            warn("Messages have stopped arriving after " + seconds + "s, stopping consumer")
-            stop
-          } else {
-            watchdog(messageCount)
-          }
-        })
-  }
-
-  def onConnected() = {
-    outboundSink = new OverflowSink[StompFrame](MapSink(transportSink){ x=>x })
-    outboundSink.refiller = ^{}
-
-    val stompDestination = if (destination.getDomain() == Router.QUEUE_DOMAIN) {
-      ascii("/queue/" + destination.getName().toString());
-    } else {
-      ascii("/topic/" + destination.getName().toString());
-    }
-
-    var frame = StompFrame(CONNECT);
-    outboundSink.offer(frame);
-
-    var headers: List[(AsciiBuffer, AsciiBuffer)] = Nil
-    headers ::= (DESTINATION, stompDestination)
-    headers ::= (ID, ascii("stomp-sub-" + name))
-
-    if( persistent ) {
-      headers ::= (ACK_MODE, CLIENT)
-    }
-
-    frame = StompFrame(SUBSCRIBE, headers);
-    outboundSink.offer(frame);
-    watchdog(messageCount)
-  }
-
-  override def onTransportCommand(command: Object) = {
-    var frame = command.asInstanceOf[StompFrame]
-    frame match {
-      case StompFrame(CONNECTED, headers, _, _) =>
-      case StompFrame(MESSAGE, headers, content, _) =>
-          messageReceived();
-
-          // we client ack if persistent messages are being used.
-          if( persistent ) {
-            var rc = List((MESSAGE_ID, frame.header(MESSAGE_ID)))
-            outboundSink.offer(StompFrame(ACK, rc));
-          }
-
-      case StompFrame(ERROR, headers, content, _) =>
-        onFailure(new Exception("Server reported an error: " + frame.content));
-      case _ =>
-        onFailure(new Exception("Unexpected stomp command: " + frame.action));
-    }
-  }
-
-  protected def messageReceived() {
-      if (thinkTime > 0) {
-        transport.suspendRead
-        dispatchQueue.dispatchAfter(thinkTime, TimeUnit.MILLISECONDS, ^ {
-          incrementMessageCount          
-          rate.increment();
-          if (!stopped) {
-            transport.resumeRead
-          }
-        })
-      } else {
-        incrementMessageCount
-        rate.increment
-      }
-  }
-
-  override def doStop() = {
-    outboundSink.offer(StompFrame(DISCONNECT));
-    dispatchQueue.dispatchAfter(5, TimeUnit.SECONDS, ^ {
-        transport.stop
-        stop
-      })
-  }
-}
-
-class StompRemoteProducer extends RemoteProducer with Logging {
-  var outboundSink: OverflowSink[StompFrame] = null
-  var stompDestination: AsciiBuffer = null
-  var frame:StompFrame = null
-
-  def send_next: Unit = {
-      var headers: List[(AsciiBuffer, AsciiBuffer)] = Nil
-      headers ::= (DESTINATION, stompDestination);
-      if (property != null) {
-        headers ::= (ascii(property), ascii(property));
-      }
-      if( persistent ) {
-        headers ::= ((RECEIPT_REQUESTED, ascii("x")));
-      }
-      //    var p = this.priority;
-      //    if (priorityMod > 0) {
-      //        p = if ((counter % priorityMod) == 0) { 0 } else { priority }
-      //    }
-
-      var content = ascii(createPayload());
-      frame = StompFrame(SEND, headers, BufferContent(content))
-      drain()
-  }
-
-  def drain() = {
-    if( frame!=null ) {
-      if( !outboundSink.full ) {
-        outboundSink.offer(frame)
-        frame = null
-        rate.increment
-        val task = ^ {
-          if (!stopped) {
-            incrementMessageCount
-            send_next
-          }
-        }
-
-        if( !persistent ) {
-          // if we are not going to wait for an ack back from the server,
-          // then jut send the next one...
-          if (thinkTime > 0) {
-            dispatchQueue.dispatchAfter(thinkTime, TimeUnit.MILLISECONDS, task)
-          } else {
-            dispatchQueue << task
-          }
-        }
-      }
-    }
-  }
-
-  override def onConnected() = {
-    outboundSink = new OverflowSink[StompFrame](MapSink(transportSink){ x=>x })
-    outboundSink.refiller = ^ { drain }
-
-    if (destination.getDomain() == Router.QUEUE_DOMAIN) {
-      stompDestination = ascii("/queue/" + destination.getName().toString());
-    } else {
-      stompDestination = ascii("/topic/" + destination.getName().toString());
-    }
-    outboundSink.offer(StompFrame(CONNECT));
-    send_next
-  }
-
-  override def onTransportCommand(command: Object) = {
-    var frame = command.asInstanceOf[StompFrame]
-    frame match {
-      case StompFrame(RECEIPT, headers, _, _) =>
-        assert( persistent )
-        // we got the ack for the previous message we sent.. now send the next one.
-        incrementMessageCount
-        send_next
-
-      case StompFrame(CONNECTED, headers, _, _) =>
-      case StompFrame(ERROR, headers, content, _) =>
-        onFailure(new Exception("Server reported an error: " + frame.content.utf8));
-      case _ =>
-        onFailure(new Exception("Unexpected stomp command: " + frame.action));
-    }
-  }
-
-  override def doStop() = {
-    outboundSink.offer(StompFrame(DISCONNECT));
-    dispatchQueue.dispatchAfter(5, TimeUnit.SECONDS, ^ {
-        transport.stop
-        stop
-      })
-  }  
-
-}
-
diff --git a/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala b/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala
new file mode 100644
index 0000000..c5fce11
--- /dev/null
+++ b/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.apollo.stomp.perf
+
+import _root_.java.util.concurrent.TimeUnit
+import _root_.org.apache.activemq.apollo.broker._
+import _root_.org.apache.activemq.apollo.broker.perf._
+import _root_.org.apache.activemq.apollo.stomp._
+import _root_.org.apache.activemq.apollo.util._
+
+import _root_.org.fusesource.hawtbuf._
+
+import AsciiBuffer._
+import Stomp._
+import _root_.org.apache.activemq.apollo.stomp.StompFrame
+import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
+
+class StompRemoteConsumer extends RemoteConsumer with Logging {
+  var outboundSink: OverflowSink[StompFrame] = null
+  var messageCount = 0
+
+  def watchdog(lastMessageCount: Int) : Unit = {
+    val seconds = 10
+    dispatchQueue.dispatchAfter(seconds, TimeUnit.SECONDS, ^ {
+          if (messageCount == lastMessageCount) {
+            warn("Messages have stopped arriving after " + seconds + "s, stopping consumer")
+            stop
+          } else {
+            watchdog(messageCount)
+          }
+        })
+  }
+
+  def onConnected() = {
+    outboundSink = new OverflowSink[StompFrame](MapSink(transportSink){ x=>x })
+    outboundSink.refiller = ^{}
+
+    val stompDestination = if (destination.getDomain() == Router.QUEUE_DOMAIN) {
+      ascii("/queue/" + destination.getName().toString());
+    } else {
+      ascii("/topic/" + destination.getName().toString());
+    }
+
+    var frame = StompFrame(Stomp.Commands.CONNECT);
+    outboundSink.offer(frame);
+
+    var headers: List[(AsciiBuffer, AsciiBuffer)] = Nil
+    headers ::= (Stomp.Headers.Subscribe.DESTINATION, stompDestination)
+    headers ::= (Stomp.Headers.Subscribe.ID, ascii("stomp-sub-" + name))
+
+    if( persistent ) {
+      headers ::= (Stomp.Headers.Subscribe.ACK_MODE, Stomp.Headers.Subscribe.AckModeValues.CLIENT)
+    }
+
+    frame = StompFrame(Stomp.Commands.SUBSCRIBE, headers);
+    outboundSink.offer(frame);
+    watchdog(messageCount)
+  }
+
+  override def onTransportCommand(command: Object) = {
+    var frame = command.asInstanceOf[StompFrame]
+    frame match {
+      case StompFrame(Responses.CONNECTED, headers, _, _) =>
+      case StompFrame(Responses.MESSAGE, headers, content, _) =>
+          messageReceived();
+
+          // we client ack if persistent messages are being used.
+          if( persistent ) {
+            var rc = List((Stomp.Headers.Ack.MESSAGE_ID, frame.header(Stomp.Headers.Message.MESSAGE_ID)))
+            outboundSink.offer(StompFrame(Stomp.Commands.ACK, rc));
+          }
+
+      case StompFrame(Responses.ERROR, headers, content, _) =>
+        onFailure(new Exception("Server reported an error: " + frame.content));
+      case _ =>
+        onFailure(new Exception("Unexpected stomp command: " + frame.action));
+    }
+  }
+
+  protected def messageReceived() {
+      if (thinkTime > 0) {
+        transport.suspendRead
+        dispatchQueue.dispatchAfter(thinkTime, TimeUnit.MILLISECONDS, ^ {
+          messageCount += 1
+          rate.increment();
+          if (!stopped) {
+            transport.resumeRead
+          }
+        })
+      } else {
+        messageCount += 1
+        rate.increment
+      }
+  }
+}
+
+
+class StompRemoteProducer extends RemoteProducer with Logging {
+  var outboundSink: OverflowSink[StompFrame] = null
+  var stompDestination: AsciiBuffer = null
+  var frame:StompFrame = null
+
+  def send_next: Unit = {
+      var headers: List[(AsciiBuffer, AsciiBuffer)] = Nil
+      headers ::= (Stomp.Headers.Send.DESTINATION, stompDestination);
+      if (property != null) {
+        headers ::= (ascii(property), ascii(property));
+      }
+      if( persistent ) {
+        headers ::= ((Stomp.Headers.RECEIPT_REQUESTED, ascii("x")));
+      }
+      //    var p = this.priority;
+      //    if (priorityMod > 0) {
+      //        p = if ((counter % priorityMod) == 0) { 0 } else { priority }
+      //    }
+
+      var content = ascii(createPayload());
+      frame = StompFrame(Stomp.Commands.SEND, headers, BufferContent(content))
+      drain()
+  }
+
+  def drain() = {
+    if( frame!=null ) {
+      if( !outboundSink.full ) {
+        outboundSink.offer(frame)
+        frame = null
+        rate.increment
+        val task = ^ {
+          if (!stopped) {
+            send_next
+          }
+        }
+
+        if( !persistent ) {
+          // if we are not going to wait for an ack back from the server,
+          // then just send the next one...
+          if (thinkTime > 0) {
+            dispatchQueue.dispatchAfter(thinkTime, TimeUnit.MILLISECONDS, task)
+          } else {
+            dispatchQueue << task
+          }
+        }
+      }
+    }
+  }
+
+  override def onConnected() = {
+    outboundSink = new OverflowSink[StompFrame](MapSink(transportSink){ x=>x })
+    outboundSink.refiller = ^ { drain }
+
+    if (destination.getDomain() == Router.QUEUE_DOMAIN) {
+      stompDestination = ascii("/queue/" + destination.getName().toString());
+    } else {
+      stompDestination = ascii("/topic/" + destination.getName().toString());
+    }
+    outboundSink.offer(StompFrame(Stomp.Commands.CONNECT));
+    send_next
+  }
+
+  override def onTransportCommand(command: Object) = {
+    var frame = command.asInstanceOf[StompFrame]
+    frame match {
+      case StompFrame(Responses.RECEIPT, headers, _, _) =>
+        assert( persistent )
+        // we got the ack for the previous message we sent.. now send the next one.
+        send_next
+
+      case StompFrame(Responses.CONNECTED, headers, _, _) =>
+      case StompFrame(Responses.ERROR, headers, content, _) =>
+        onFailure(new Exception("Server reported an error: " + frame.content.utf8));
+      case _ =>
+        onFailure(new Exception("Unexpected stomp command: " + frame.action));
+    }
+  }
+}
+
diff --git a/apollo-util/src/main/scala/org/apache/activemq/apollo/util/FileSupport.scala b/apollo-util/src/main/scala/org/apache/activemq/apollo/util/FileSupport.scala
new file mode 100644
index 0000000..f8c8192
--- /dev/null
+++ b/apollo-util/src/main/scala/org/apache/activemq/apollo/util/FileSupport.scala
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.util
+
+import tools.nsc.io.{File, Path, Directory}
+
+object FileSupport {
+
+  def recursiveCopy(source: Path, target: Path) : Unit = {
+    require(source.isDirectory, source.path + " must be a directory.")
+    if ( !target.exists ) {
+      target.toDirectory.createDirectory()
+    }
+
+    def createOrCopy(file: Path) : Unit = {
+      val newTarget = target / FileSupport.toDirectory(file.name)
+      if (file.isDirectory) {
+        recursiveCopy(file.toDirectory, newTarget)
+      } else {
+        file.toFile.copyTo(newTarget)
+      }
+    }
+    source.toDirectory.list.foreach(createOrCopy)
+  }
+
+  def toDirectory(name: String) : Directory = {
+    new Directory(new java.io.File(name))
+  }
+
+  def toFile(name: String) : File = {
+    new File(new java.io.File(name))
+  }
+  
+}
\ No newline at end of file
diff --git a/apollo-util/src/test/scala/org/apache/activemq/apollo/util/FileSupportTest.scala b/apollo-util/src/test/scala/org/apache/activemq/apollo/util/FileSupportTest.scala
new file mode 100644
index 0000000..267f68e
--- /dev/null
+++ b/apollo-util/src/test/scala/org/apache/activemq/apollo/util/FileSupportTest.scala
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.activemq.apollo.util
+
+import org.scalatest.matchers.ShouldMatchers
+import tools.nsc.io.{Path, File, Directory}
+import collection.mutable.ArrayBuffer
+
+class FileSupportTest extends FunSuiteSupport with ShouldMatchers {
+
+  test("recursive file copy test") {
+
+    val base = new Directory(baseDir)
+    var target = base / FileSupport.toDirectory("target")
+
+    val sourceDir: Directory = target / FileSupport.toDirectory("sourceDir")
+    if ( sourceDir.exists ) {
+      sourceDir.deleteRecursively
+    }
+    sourceDir.createDirectory(false)
+
+    val subDir: Directory = sourceDir / FileSupport.toDirectory("subDir")
+    subDir.createDirectory(false)
+
+    val someFile: File = subDir / FileSupport.toFile("someFile")
+    someFile.createFile(false)
+
+    val targetDir: Directory = target / FileSupport.toDirectory("targetDir")
+    if ( targetDir.exists ) {
+      targetDir.deleteRecursively
+    }
+
+    FileSupport.recursiveCopy(sourceDir, targetDir)
+
+    val listing = new ArrayBuffer[String]
+
+    targetDir.deepList().foreach(file => listing.append(file.toString))
+
+    listing should contain("./target/targetDir/subDir")
+    listing should contain("./target/targetDir/subDir/someFile")
+
+  }
+
+  
+}
\ No newline at end of file

Reply via email to