---
.../apollo/broker/perf/BrokerPerfSupport.scala | 68 ++++---
.../apollo/broker/perf/DeepQueueScenarios.scala | 52 +-----
.../apollo/broker/perf/LargeInitialDB.scala | 123 ++++++++++++
.../apollo/broker/perf/PersistentScenario.scala | 6 +
.../apollo/stomp/perf/StompBrokerPerfTest.scala | 205 +-------------------
.../apollo/stomp/perf/StompRemoteClients.scala | 191 ++++++++++++++++++
.../apache/activemq/apollo/util/FileSupport.scala | 48 +++++
.../activemq/apollo/util/FileSupportTest.scala | 60 ++++++
8 files changed, 484 insertions(+), 269 deletions(-)
create mode 100644 apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/LargeInitialDB.scala
create mode 100644 apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala
create mode 100644 apollo-util/src/main/scala/org/apache/activemq/apollo/util/FileSupport.scala
create mode 100644 apollo-util/src/test/scala/org/apache/activemq/apollo/util/FileSupportTest.scala
diff --git a/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala b/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala
index bc841a7..07b2847 100644
--- a/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala
+++ b/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala
@@ -211,6 +211,24 @@ abstract class BrokerPerfSupport extends FunSuiteSupport with BeforeAndAfterEach
config
}
+ def createDestinations(destCount: Int) : Array[Destination] = {
+ var dests = new Array[Destination](destCount)
+
+ for (i <- 0 until destCount) {
+ val domain = if (PTP) {Router.QUEUE_DOMAIN} else {Router.TOPIC_DOMAIN}
+ val name = new AsciiBuffer("dest" + (i + 1))
+ var bean = new SingleDestination(domain, name)
+ dests(i) = bean
+ // if (PTP) {
+ // sendBroker.defaultVirtualHost.createQueue(dests(i))
+ // if (MULTI_BROKER) {
+ // rcvBroker.defaultVirtualHost.createQueue(dests(i))
+ // }
+ // }
+ }
+ dests
+ }
+
def createConnections() = {
if (MULTI_BROKER) {
@@ -229,24 +247,11 @@ abstract class BrokerPerfSupport extends FunSuiteSupport with BeforeAndAfterEach
startBrokers()
- var dests = new Array[Destination](destCount)
-
- for (i <- 0 until destCount) {
- val domain = if (PTP) {Router.QUEUE_DOMAIN} else {Router.TOPIC_DOMAIN}
- val name = new AsciiBuffer("dest" + (i + 1))
- var bean = new SingleDestination(domain, name)
- dests(i) = bean
-// if (PTP) {
-// sendBroker.defaultVirtualHost.createQueue(dests(i))
-// if (MULTI_BROKER) {
-// rcvBroker.defaultVirtualHost.createQueue(dests(i))
-// }
-// }
- }
+ val dests: Array[Destination] = createDestinations(destCount)
for (i <- 0 until producerCount) {
var destination = dests(i % destCount)
- var producer = _createProducer(i, destination)
+ var producer = _createProducer(i, MESSAGE_SIZE, destination)
producer.persistent = PERSISTENT
producers += (producer)
}
@@ -286,7 +291,7 @@ abstract class BrokerPerfSupport extends FunSuiteSupport with BeforeAndAfterEach
}
- def _createProducer(id: Int, destination: Destination): RemoteProducer = {
+ def _createProducer(id: Int, messageSize: Int, destination: Destination): RemoteProducer = {
var producer = createProducer()
producer.stopping = stopping
@@ -296,7 +301,7 @@ abstract class BrokerPerfSupport extends FunSuiteSupport with BeforeAndAfterEach
producer.destination = destination
producer.messageIdGenerator = msgIdGenerator
producer.rateAggregator = totalProducerRate
- producer.payloadSize = MESSAGE_SIZE
+ producer.payloadSize = messageSize
producer.init
producer
}
@@ -329,7 +334,6 @@ abstract class BrokerPerfSupport extends FunSuiteSupport with BeforeAndAfterEach
tracker.await
}
-
def startClients() = {
var tracker = new LoggingTracker("test consumer startup")
for (connection <- consumers) {
@@ -415,6 +419,25 @@ abstract class BrokerPerfSupport extends FunSuiteSupport with BeforeAndAfterEach
}
+
+trait FixedSampling extends BrokerPerfSupport {
+ val MIN_MESSAGES = 100000L
+
+ override def fixed_sampling = false
+
+ override def keep_sampling:Boolean = {
+ if( producerCount > 0 && totalMessageSent < MIN_MESSAGES ) {
+ println("Waiting for producers: %s/%s".format(totalMessageSent, MIN_MESSAGES));
+ return true
+ }
+ if ( consumerCount > 0 && totalMessageReceived < MIN_MESSAGES ) {
+ println("Waiting for consumers: %s/%s".format(totalMessageReceived, MIN_MESSAGES));
+ return true
+ }
+ return false
+ }
+}
+
abstract class RemoteConnection extends Connection {
var uri: String = null
var name:String = null
@@ -425,8 +448,6 @@ abstract class RemoteConnection extends Connection {
var stopping:AtomicBoolean = null
var destination: Destination = null
- var messageCount = 0
-
def init = {
if( rate.getName == null ) {
rate.name(name + " Rate")
@@ -472,13 +493,6 @@ abstract class RemoteConnection extends Connection {
}
}
}
-
- protected def doStop()
-
- protected def incrementMessageCount() = {
- messageCount = messageCount + 1
- }
-
}
abstract class RemoteConsumer extends RemoteConnection {
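The hunks above extract destination setup into `createDestinations`, thread the message size through `_createProducer`, and factor the message-count-based sampling check into the reusable `FixedSampling` trait. A minimal sketch of how a scenario could combine these pieces — the scenario name and producer count are hypothetical, the helpers are the ones defined in BrokerPerfSupport above:

```scala
package org.apache.activemq.apollo.broker.perf

// Hypothetical scenario sketch; createConnections() now builds destinations via
// createDestinations(destCount) and sizes producers via _createProducer(id, MESSAGE_SIZE, dest).
trait SmallMessagePerfScenario extends BrokerPerfSupport with FixedSampling {
  test("enqueue small messages") {
    producerCount = 4          // illustrative load
    createConnections()
    startClients()
    try {
      // with FixedSampling mixed in, keep_sampling holds the run open
      // until MIN_MESSAGES have been sent/received
      reportRates()
    } finally {
      stopServices()
    }
  }
}
```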
diff --git a/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/DeepQueueScenarios.scala b/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/DeepQueueScenarios.scala
index 3d58cb9..505ab8a 100644
--- a/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/DeepQueueScenarios.scala
+++ b/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/DeepQueueScenarios.scala
@@ -16,8 +16,6 @@
*/
package org.apache.activemq.apollo.broker.perf
-import java.net.URL
-
/**
* <p>
* </p>
@@ -26,30 +24,9 @@ import java.net.URL
*/
trait DeepQueueScenarios extends PersistentScenario {
- PERSISTENT = true
- val MIN_MESSAGES = 100000
-
- override def fixed_sampling = false
-
- override def keep_sampling:Boolean = {
- if( producerCount > 0 && totalMessageSent < MIN_MESSAGES ) {
- println("Waiting for producers: %s/%s".format(totalMessageSent, MIN_MESSAGES));
- return true
- }
- if ( consumerCount > 0 && totalMessageReceived < MIN_MESSAGES ) {
- println("Waiting for consumers: %s/%s".format(totalMessageReceived, MIN_MESSAGES));
- return true
- }
- return false
- }
-
- override def reportResourceTemplate():URL = { classOf[DeepQueueScenarios].getResource("persistent-report.html") }
-
- //override def partitionedLoad = List(1, 2, 4, 8, 10)
override def highContention = 100
- //override def messageSizes = List(20, 1024, 1024*256)
- for ( load <- partitionedLoad ; messageSize <- List(20,1024) ) {
+ for ( count <- partitionedLoad ; messageSize <- messageSizes ) {
def benchmark(name: String)(func: => Unit) {
test(name) {
@@ -59,12 +36,12 @@ trait DeepQueueScenarios extends PersistentScenario {
func
}
}
+
+ val prefix = "queue " + (if((messageSize%1024)==0) (messageSize/1024)+"k" else messageSize+"b" ) + " "
+ val suffix = "" //(if( durable ) " durable" else "")
- val info = "queue " + MIN_MESSAGES + " " + (if((messageSize%1024)==0) (messageSize/1024)+"k" else messageSize+"b" ) + " with " + load + " "
-
- benchmark("En" + info + "producer(s)") {
- PURGE_STORE = true
- producerCount = load;
+ benchmark(format("%s%d%s", prefix, count, suffix)) {
+ producerCount = count;
createConnections();
// Start 'em up.
@@ -74,23 +51,6 @@ trait DeepQueueScenarios extends PersistentScenario {
} finally {
stopServices();
}
- this.assert(totalMessageSent > MIN_MESSAGES, "Unexpected number of messages sent!")
- }
-
- benchmark("De" + info + "consumer(s)") {
- PURGE_STORE = false
- consumerCount = load;
- createConnections();
-
- // Start 'em up.
- startClients();
- try {
- reportRates();
- } finally {
- stopServices();
- }
- this.assert(totalMessageReceived > MIN_MESSAGES, "Unexpected number of messages received!")
}
}
-
}
\ No newline at end of file
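With the refactored loop above iterating over `partitionedLoad` and `messageSizes`, each generated benchmark is named by the `prefix`/`count`/`suffix` pattern. A small illustration of the resulting names, using example values of 1024 bytes and 4 producers:

```scala
// Illustrative only: reproduces the naming logic from the loop above.
val messageSize = 1024   // example value from messageSizes
val count = 4            // example value from partitionedLoad
val prefix = "queue " + (if ((messageSize % 1024) == 0) (messageSize / 1024) + "k" else messageSize + "b") + " "
val suffix = ""
val name = "%s%d%s".format(prefix, count, suffix)   // yields "queue 1k 4"
```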
diff --git a/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/LargeInitialDB.scala b/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/LargeInitialDB.scala
new file mode 100644
index 0000000..3574bef
--- /dev/null
+++ b/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/LargeInitialDB.scala
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.apollo.broker.perf
+
+import org.apache.activemq.apollo.broker.{Destination, Broker}
+import tools.nsc.io.Directory
+import org.apache.activemq.apollo.util.metric.MetricAggregator
+import org.apache.activemq.apollo.util.{FileSupport, LoggingTracker}
+
+
+trait LargeInitialDB extends PersistentScenario {
+
+ PURGE_STORE = false
+
+ var original: Directory = null
+ var backup: Directory = null;
+
+ // delete existing data file and copy new data file over
+ override protected def beforeEach() = {
+ println("Restoring DB")
+ restoreDB
+ super.beforeEach
+ }
+
+ // start a broker, connect a producer and dump a bunch of messages
+ // into a destination
+ override protected def beforeAll(configMap: Map[String, Any]) = {
+ super.beforeAll(configMap)
+
+ sendBroker = new Broker()
+ sendBroker.config = createBrokerConfig("Broker", sendBrokerBindURI, sendBrokerConnectURI)
+ val store = sendBroker.config.virtual_hosts.get(0).store
+
+ original = new Directory(storeDirectory)
+ if ( original.exists ) {
+ original.deleteRecursively
+ original.createDirectory(true)
+ }
+ val backupLocation = FileSupport.toDirectory(storeDirectory.getParent)./(FileSupport.toDirectory("backup"))
+ backup = backupLocation
+ cleanBackup
+
+ println("Using store at " + original + " and backup at " + backup)
+
+ var tracker = new LoggingTracker("initial db broker startup")
+ tracker.start(sendBroker)
+ tracker.await
+
+ PTP = true
+ val dests: Array[Destination] = createDestinations(1)
+ totalProducerRate = new MetricAggregator().name("Aggregate Producer Rate").unit("items")
+ val producer: RemoteProducer = _createProducer(0, 1024, dests(0))
+ producer.persistent = true
+
+ tracker = new LoggingTracker("initial db producer startup")
+ tracker.start(producer)
+ tracker.await
+
+ val messages = 1000000L
+
+ println("Filling broker with " + messages + " 1k messages")
+ while (producer.rate.counter() < messages) {
+ println("Waiting for producer " + producer.rate.counter() + "/" + messages)
+ Thread.sleep(5000)
+ }
+
+ tracker = new LoggingTracker("producer shutdown")
+ tracker.stop(producer)
+ tracker.await
+ tracker = new LoggingTracker("broker shutdown")
+ tracker.stop(sendBroker)
+ tracker.await
+
+ Thread.sleep(10000)
+
+ saveDB
+ }
+
+ def saveDB {
+ println("Copying contents of " + original + " to " + backup)
+ cleanBackup
+ FileSupport.recursiveCopy(original, backup)
+ printStores
+ }
+
+ def printStores {
+ println("\nOriginal store")
+ original.deepList().foreach(println)
+ println("\n\nBackup store")
+ backup.deepList().foreach(println)
+ }
+
+ def restoreDB {
+ original.deleteRecursively
+ println("Copying contents of " + backup + " to " + original)
+ FileSupport.recursiveCopy(backup, original)
+ printStores
+ }
+
+ def cleanBackup {
+ if (backup.exists) {
+ backup.deleteRecursively
+ }
+ backup.createDirectory(true)
+ printStores
+ }
+
+}
\ No newline at end of file
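`LargeInitialDB` only manages the store contents; a concrete benchmark still has to mix it in alongside a persistent store scenario, which is what the StompBrokerPerfTest changes below do. A minimal composition sketch — the class name and description here are hypothetical, the traits come from this patch:

```scala
// Hypothetical composition mirroring DeepQueueHawtDBTest further down in this patch.
// HawtDBScenario and StompScenario live in the apollo-stomp test module.
class MyDeepQueueHawtDBTest extends DeepQueueScenarios with LargeInitialDB with HawtDBScenario with StompScenario {
  override def description = "Deep queue benchmark against a pre-filled HawtDB store."
}
```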
diff --git a/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/PersistentScenario.scala b/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/PersistentScenario.scala
index 859d196..efcfb4e 100644
--- a/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/PersistentScenario.scala
+++ b/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/PersistentScenario.scala
@@ -16,6 +16,9 @@
*/
package org.apache.activemq.apollo.broker.perf
+import java.net.URL
+import java.io.File
+
/**
* <p>
* </p>
@@ -24,6 +27,9 @@ package org.apache.activemq.apollo.broker.perf
*/
trait PersistentScenario extends BrokerPerfSupport {
+ var storeDirectory: File = null
+
+ override def reportResourceTemplate():URL = { classOf[PersistentScenario].getResource("persistent-report.html") }
PERSISTENT = true
}
\ No newline at end of file
diff --git a/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala b/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala
index 78b60f1..b429b07 100644
--- a/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala
+++ b/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala
@@ -16,19 +16,7 @@
*/
package org.apache.activemq.apollo.stomp.perf
-import _root_.java.util.concurrent.TimeUnit
-import _root_.org.apache.activemq.apollo.broker._
import _root_.org.apache.activemq.apollo.broker.perf._
-import _root_.org.apache.activemq.apollo.stomp._
-import _root_.org.apache.activemq.apollo.util._
-
-import _root_.org.fusesource.hawtbuf._
-import collection.mutable.{ListBuffer, HashMap}
-
-import AsciiBuffer._
-import Stomp._
-import _root_.org.apache.activemq.apollo.stomp.StompFrame
-import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
import java.io.File
import org.apache.activemq.apollo.dto.{BrokerDTO, HawtDBStoreDTO}
import org.apache.activemq.apollo.store.bdb.dto.BDBStoreDTO
@@ -42,11 +30,11 @@ class BasicHawtDBTest extends BasicScenarios with PersistentScenario with HawtDB
override def description = "Using the STOMP protocol over TCP"
}
-class DeepQueueHawtDBTest extends DeepQueueScenarios with HawtDBScenario with StompScenario {
+class DeepQueueHawtDBTest extends DeepQueueScenarios with LargeInitialDB with HawtDBScenario with StompScenario {
override def description = "Using the STOMP protocol over TCP persisting to the HawtDB store."
}
-class DeepQueueBDBTest extends DeepQueueScenarios with BDBScenario with StompScenario {
+class DeepQueueBDBTest extends DeepQueueScenarios with LargeInitialDB with BDBScenario with StompScenario {
override def description = "Using the STOMP protocol over TCP persisting to the BerkleyDB store."
}
@@ -56,200 +44,25 @@ trait StompScenario extends BrokerPerfSupport {
override def getRemoteProtocolName() = "stomp"
}
-trait HawtDBScenario extends BrokerPerfSupport {
+trait HawtDBScenario extends PersistentScenario {
override def createBrokerConfig(name: String, bindURI: String, connectUri: String): BrokerDTO = {
val rc = super.createBrokerConfig(name, bindURI, connectUri)
val store = new HawtDBStoreDTO
- store.directory = new File(new File(testDataDir, getClass.getName), name)
+ storeDirectory = new File(new File(testDataDir, getClass.getName), name)
+ store.directory = storeDirectory
rc.virtual_hosts.get(0).store = store
rc
}
}
-trait BDBScenario extends BrokerPerfSupport {
+
+trait BDBScenario extends PersistentScenario {
override def createBrokerConfig(name: String, bindURI: String, connectUri: String): BrokerDTO = {
val rc = super.createBrokerConfig(name, bindURI, connectUri)
-
val store = new BDBStoreDTO
- store.directory = new File(new File(testDataDir, getClass.getName), name)
-
+ storeDirectory = new File(new File(testDataDir, getClass.getName), name)
+ store.directory = storeDirectory
rc.virtual_hosts.get(0).store = store
rc
}
}
-class StompRemoteConsumer extends RemoteConsumer with Logging {
- var outboundSink: OverflowSink[StompFrame] = null
-
- def watchdog(lastMessageCount: Int) : Unit = {
- val seconds = 10
- dispatchQueue.dispatchAfter(seconds, TimeUnit.SECONDS, ^ {
- if (messageCount == lastMessageCount) {
- warn("Messages have stopped arriving after " + seconds + "s, stopping consumer")
- stop
- } else {
- watchdog(messageCount)
- }
- })
- }
-
- def onConnected() = {
- outboundSink = new OverflowSink[StompFrame](MapSink(transportSink){ x=>x })
- outboundSink.refiller = ^{}
-
- val stompDestination = if (destination.getDomain() == Router.QUEUE_DOMAIN) {
- ascii("/queue/" + destination.getName().toString());
- } else {
- ascii("/topic/" + destination.getName().toString());
- }
-
- var frame = StompFrame(CONNECT);
- outboundSink.offer(frame);
-
- var headers: List[(AsciiBuffer, AsciiBuffer)] = Nil
- headers ::= (DESTINATION, stompDestination)
- headers ::= (ID, ascii("stomp-sub-" + name))
-
- if( persistent ) {
- headers ::= (ACK_MODE, CLIENT)
- }
-
- frame = StompFrame(SUBSCRIBE, headers);
- outboundSink.offer(frame);
- watchdog(messageCount)
- }
-
- override def onTransportCommand(command: Object) = {
- var frame = command.asInstanceOf[StompFrame]
- frame match {
- case StompFrame(CONNECTED, headers, _, _) =>
- case StompFrame(MESSAGE, headers, content, _) =>
- messageReceived();
-
- // we client ack if persistent messages are being used.
- if( persistent ) {
- var rc = List((MESSAGE_ID, frame.header(MESSAGE_ID)))
- outboundSink.offer(StompFrame(ACK, rc));
- }
-
- case StompFrame(ERROR, headers, content, _) =>
- onFailure(new Exception("Server reported an error: " + frame.content));
- case _ =>
- onFailure(new Exception("Unexpected stomp command: " + frame.action));
- }
- }
-
- protected def messageReceived() {
- if (thinkTime > 0) {
- transport.suspendRead
- dispatchQueue.dispatchAfter(thinkTime, TimeUnit.MILLISECONDS, ^ {
- incrementMessageCount
- rate.increment();
- if (!stopped) {
- transport.resumeRead
- }
- })
- } else {
- incrementMessageCount
- rate.increment
- }
- }
-
- override def doStop() = {
- outboundSink.offer(StompFrame(DISCONNECT));
- dispatchQueue.dispatchAfter(5, TimeUnit.SECONDS, ^ {
- transport.stop
- stop
- })
- }
-}
-
-class StompRemoteProducer extends RemoteProducer with Logging {
- var outboundSink: OverflowSink[StompFrame] = null
- var stompDestination: AsciiBuffer = null
- var frame:StompFrame = null
-
- def send_next: Unit = {
- var headers: List[(AsciiBuffer, AsciiBuffer)] = Nil
- headers ::= (DESTINATION, stompDestination);
- if (property != null) {
- headers ::= (ascii(property), ascii(property));
- }
- if( persistent ) {
- headers ::= ((RECEIPT_REQUESTED, ascii("x")));
- }
- // var p = this.priority;
- // if (priorityMod > 0) {
- // p = if ((counter % priorityMod) == 0) { 0 } else { priority }
- // }
-
- var content = ascii(createPayload());
- frame = StompFrame(SEND, headers, BufferContent(content))
- drain()
- }
-
- def drain() = {
- if( frame!=null ) {
- if( !outboundSink.full ) {
- outboundSink.offer(frame)
- frame = null
- rate.increment
- val task = ^ {
- if (!stopped) {
- incrementMessageCount
- send_next
- }
- }
-
- if( !persistent ) {
- // if we are not going to wait for an ack back from the server,
- // then jut send the next one...
- if (thinkTime > 0) {
- dispatchQueue.dispatchAfter(thinkTime, TimeUnit.MILLISECONDS, task)
- } else {
- dispatchQueue << task
- }
- }
- }
- }
- }
-
- override def onConnected() = {
- outboundSink = new OverflowSink[StompFrame](MapSink(transportSink){ x=>x })
- outboundSink.refiller = ^ { drain }
-
- if (destination.getDomain() == Router.QUEUE_DOMAIN) {
- stompDestination = ascii("/queue/" + destination.getName().toString());
- } else {
- stompDestination = ascii("/topic/" + destination.getName().toString());
- }
- outboundSink.offer(StompFrame(CONNECT));
- send_next
- }
-
- override def onTransportCommand(command: Object) = {
- var frame = command.asInstanceOf[StompFrame]
- frame match {
- case StompFrame(RECEIPT, headers, _, _) =>
- assert( persistent )
- // we got the ack for the previous message we sent.. now send the next one.
- incrementMessageCount
- send_next
-
- case StompFrame(CONNECTED, headers, _, _) =>
- case StompFrame(ERROR, headers, content, _) =>
- onFailure(new Exception("Server reported an error: " + frame.content.utf8));
- case _ =>
- onFailure(new Exception("Unexpected stomp command: " + frame.action));
- }
- }
-
- override def doStop() = {
- outboundSink.offer(StompFrame(DISCONNECT));
- dispatchQueue.dispatchAfter(5, TimeUnit.SECONDS, ^ {
- transport.stop
- stop
- })
- }
-
-}
-
diff --git a/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala b/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala
new file mode 100644
index 0000000..c5fce11
--- /dev/null
+++ b/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.apollo.stomp.perf
+
+import _root_.java.util.concurrent.TimeUnit
+import _root_.org.apache.activemq.apollo.broker._
+import _root_.org.apache.activemq.apollo.broker.perf._
+import _root_.org.apache.activemq.apollo.stomp._
+import _root_.org.apache.activemq.apollo.util._
+
+import _root_.org.fusesource.hawtbuf._
+
+import AsciiBuffer._
+import Stomp._
+import _root_.org.apache.activemq.apollo.stomp.StompFrame
+import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
+
+class StompRemoteConsumer extends RemoteConsumer with Logging {
+ var outboundSink: OverflowSink[StompFrame] = null
+ var messageCount = 0
+
+ def watchdog(lastMessageCount: Int) : Unit = {
+ val seconds = 10
+ dispatchQueue.dispatchAfter(seconds, TimeUnit.SECONDS, ^ {
+ if (messageCount == lastMessageCount) {
+ warn("Messages have stopped arriving after " + seconds + "s, stopping consumer")
+ stop
+ } else {
+ watchdog(messageCount)
+ }
+ })
+ }
+
+ def onConnected() = {
+ outboundSink = new OverflowSink[StompFrame](MapSink(transportSink){ x=>x })
+ outboundSink.refiller = ^{}
+
+ val stompDestination = if (destination.getDomain() == Router.QUEUE_DOMAIN) {
+ ascii("/queue/" + destination.getName().toString());
+ } else {
+ ascii("/topic/" + destination.getName().toString());
+ }
+
+ var frame = StompFrame(Stomp.Commands.CONNECT);
+ outboundSink.offer(frame);
+
+ var headers: List[(AsciiBuffer, AsciiBuffer)] = Nil
+ headers ::= (Stomp.Headers.Subscribe.DESTINATION, stompDestination)
+ headers ::= (Stomp.Headers.Subscribe.ID, ascii("stomp-sub-" + name))
+
+ if( persistent ) {
+ headers ::= (Stomp.Headers.Subscribe.ACK_MODE, Stomp.Headers.Subscribe.AckModeValues.CLIENT)
+ }
+
+ frame = StompFrame(Stomp.Commands.SUBSCRIBE, headers);
+ outboundSink.offer(frame);
+ watchdog(messageCount)
+ }
+
+ override def onTransportCommand(command: Object) = {
+ var frame = command.asInstanceOf[StompFrame]
+ frame match {
+ case StompFrame(Responses.CONNECTED, headers, _, _) =>
+ case StompFrame(Responses.MESSAGE, headers, content, _) =>
+ messageReceived();
+
+ // client-ack when persistent messages are being used.
+ if( persistent ) {
+ var rc = List((Stomp.Headers.Ack.MESSAGE_ID, frame.header(Stomp.Headers.Message.MESSAGE_ID)))
+ outboundSink.offer(StompFrame(Stomp.Commands.ACK, rc));
+ }
+
+ case StompFrame(Responses.ERROR, headers, content, _) =>
+ onFailure(new Exception("Server reported an error: " + frame.content));
+ case _ =>
+ onFailure(new Exception("Unexpected stomp command: " + frame.action));
+ }
+ }
+
+ protected def messageReceived() {
+ if (thinkTime > 0) {
+ transport.suspendRead
+ dispatchQueue.dispatchAfter(thinkTime, TimeUnit.MILLISECONDS, ^ {
+ messageCount += 1
+ rate.increment();
+ if (!stopped) {
+ transport.resumeRead
+ }
+ })
+ } else {
+ messageCount += 1
+ rate.increment
+ }
+ }
+}
+
+
+class StompRemoteProducer extends RemoteProducer with Logging {
+ var outboundSink: OverflowSink[StompFrame] = null
+ var stompDestination: AsciiBuffer = null
+ var frame:StompFrame = null
+
+ def send_next: Unit = {
+ var headers: List[(AsciiBuffer, AsciiBuffer)] = Nil
+ headers ::= (Stomp.Headers.Send.DESTINATION, stompDestination);
+ if (property != null) {
+ headers ::= (ascii(property), ascii(property));
+ }
+ if( persistent ) {
+ headers ::= ((Stomp.Headers.RECEIPT_REQUESTED, ascii("x")));
+ }
+ // var p = this.priority;
+ // if (priorityMod > 0) {
+ // p = if ((counter % priorityMod) == 0) { 0 } else { priority }
+ // }
+
+ var content = ascii(createPayload());
+ frame = StompFrame(Stomp.Commands.SEND, headers, BufferContent(content))
+ drain()
+ }
+
+ def drain() = {
+ if( frame!=null ) {
+ if( !outboundSink.full ) {
+ outboundSink.offer(frame)
+ frame = null
+ rate.increment
+ val task = ^ {
+ if (!stopped) {
+ send_next
+ }
+ }
+
+ if( !persistent ) {
+ // if we are not going to wait for an ack back from the server,
+ // then just send the next one...
+ if (thinkTime > 0) {
+ dispatchQueue.dispatchAfter(thinkTime, TimeUnit.MILLISECONDS, task)
+ } else {
+ dispatchQueue << task
+ }
+ }
+ }
+ }
+ }
+
+ override def onConnected() = {
+ outboundSink = new OverflowSink[StompFrame](MapSink(transportSink){ x=>x })
+ outboundSink.refiller = ^ { drain }
+
+ if (destination.getDomain() == Router.QUEUE_DOMAIN) {
+ stompDestination = ascii("/queue/" + destination.getName().toString());
+ } else {
+ stompDestination = ascii("/topic/" + destination.getName().toString());
+ }
+ outboundSink.offer(StompFrame(Stomp.Commands.CONNECT));
+ send_next
+ }
+
+ override def onTransportCommand(command: Object) = {
+ var frame = command.asInstanceOf[StompFrame]
+ frame match {
+ case StompFrame(Responses.RECEIPT, headers, _, _) =>
+ assert( persistent )
+ // we got the ack for the previous message we sent; now send the next one.
+ send_next
+
+ case StompFrame(Responses.CONNECTED, headers, _, _) =>
+ case StompFrame(Responses.ERROR, headers, content, _) =>
+ onFailure(new Exception("Server reported an error: " + frame.content.utf8));
+ case _ =>
+ onFailure(new Exception("Unexpected stomp command: " + frame.action));
+ }
+ }
+}
+
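For reference, a self-contained sketch of the subscription frame the consumer above builds — the destination and subscription names are illustrative; the `StompFrame` constructor, header constants, and `ascii` helper are the same ones imported by the file above:

```scala
package org.apache.activemq.apollo.stomp.perf

import org.apache.activemq.apollo.stomp._
import org.fusesource.hawtbuf._
import AsciiBuffer._

// Sketch only: the SUBSCRIBE frame a persistent queue consumer named "consumer-0"
// would offer for destination "dest1" (both names are illustrative).
object SubscribeFrameSketch {
  val frame = StompFrame(Stomp.Commands.SUBSCRIBE, List(
    (Stomp.Headers.Subscribe.DESTINATION, ascii("/queue/dest1")),
    (Stomp.Headers.Subscribe.ID, ascii("stomp-sub-consumer-0")),
    (Stomp.Headers.Subscribe.ACK_MODE, Stomp.Headers.Subscribe.AckModeValues.CLIENT)))
}
```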
diff --git a/apollo-util/src/main/scala/org/apache/activemq/apollo/util/FileSupport.scala b/apollo-util/src/main/scala/org/apache/activemq/apollo/util/FileSupport.scala
new file mode 100644
index 0000000..f8c8192
--- /dev/null
+++ b/apollo-util/src/main/scala/org/apache/activemq/apollo/util/FileSupport.scala
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.util
+
+import tools.nsc.io.{File, Path, Directory}
+
+object FileSupport {
+
+ def recursiveCopy(source: Path, target: Path) : Unit = {
+ require(source.isDirectory, source.path + " must be a directory.")
+ if ( !target.exists ) {
+ target.toDirectory.createDirectory()
+ }
+
+ def createOrCopy(file: Path) : Unit = {
+ val newTarget = target / FileSupport.toDirectory(file.name)
+ if (file.isDirectory) {
+ recursiveCopy(file.toDirectory, newTarget)
+ } else {
+ file.toFile.copyTo(newTarget)
+ }
+ }
+ source.toDirectory.list.foreach(createOrCopy)
+ }
+
+ def toDirectory(name: String) : Directory = {
+ new Directory(new java.io.File(name))
+ }
+
+ def toFile(name: String) : File = {
+ new File(new java.io.File(name))
+ }
+
+}
\ No newline at end of file
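A quick usage sketch of the new helper (the paths are illustrative); `FileSupportTest` below exercises the same API under ScalaTest:

```scala
import org.apache.activemq.apollo.util.FileSupport

// Copy everything under data/source into data/backup (created if missing).
// Both paths are illustrative; recursiveCopy requires the source to be a directory.
val source = FileSupport.toDirectory("data/source")
val backup = FileSupport.toDirectory("data/backup")
FileSupport.recursiveCopy(source, backup)
```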
diff --git a/apollo-util/src/test/scala/org/apache/activemq/apollo/util/FileSupportTest.scala b/apollo-util/src/test/scala/org/apache/activemq/apollo/util/FileSupportTest.scala
new file mode 100644
index 0000000..267f68e
--- /dev/null
+++ b/apollo-util/src/test/scala/org/apache/activemq/apollo/util/FileSupportTest.scala
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.apollo.util
+
+import org.scalatest.matchers.ShouldMatchers
+import tools.nsc.io.{Path, File, Directory}
+import collection.mutable.ArrayBuffer
+
+class FileSupportTest extends FunSuiteSupport with ShouldMatchers {
+
+ test("recursive file copy test") {
+
+ val base = new Directory(baseDir)
+ var target = base / FileSupport.toDirectory("target")
+
+ val sourceDir: Directory = target / FileSupport.toDirectory("sourceDir")
+ if ( sourceDir.exists ) {
+ sourceDir.deleteRecursively
+ }
+ sourceDir.createDirectory(false)
+
+ val subDir: Directory = sourceDir / FileSupport.toDirectory("subDir")
+ subDir.createDirectory(false)
+
+ val someFile: File = subDir / FileSupport.toFile("someFile")
+ someFile.createFile(false)
+
+ val targetDir: Directory = target / FileSupport.toDirectory("targetDir")
+ if ( targetDir.exists ) {
+ targetDir.deleteRecursively
+ }
+
+ FileSupport.recursiveCopy(sourceDir, targetDir)
+
+ val listing = new ArrayBuffer[String]
+
+ targetDir.deepList().foreach(file => listing.append(file.toString))
+
+ listing should contain("./target/targetDir/subDir")
+ listing should contain("./target/targetDir/subDir/someFile")
+
+ }
+
+
+}
\ No newline at end of file