---
.../apollo/broker/perf/BrokerPerfSupport.scala | 150 +++-----------------
.../apollo/broker/perf/LargeInitialDB.scala | 11 +-
.../apollo/broker/perf/RemoteConnection.scala | 142 ++++++++++++++++++
.../apollo/stomp/perf/StompRemoteClients.scala | 46 +++++--
.../activemq/apollo/util/ServiceController.scala | 24 +++-
5 files changed, 222 insertions(+), 151 deletions(-)
create mode 100644
apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/RemoteConnection.scala
diff --git a/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala b/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala
index 05f3260..977728c 100644
--- a/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala
+++ b/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala
@@ -21,21 +21,18 @@ import _root_.java.lang.String
import org.apache.activemq.apollo.broker._
import org.scalatest._
-import java.io.{File, IOException}
-import org.apache.activemq.apollo.util.metric.{Period, MetricAggregator, MetricCounter}
+import java.io.File
+import org.apache.activemq.apollo.util.metric.{Period, MetricAggregator}
import org.fusesource.hawtbuf.AsciiBuffer
import collection.mutable.ListBuffer
import java.net.URL
-import java.util.concurrent.TimeUnit
-import org.fusesource.hawtdispatch.ScalaDispatch._
import org.apache.activemq.apollo.dto.BrokerDTO
-import org.apache.activemq.apollo.transport.TransportFactory
import org.apache.activemq.apollo.util._
/**
*
*/
-abstract class BrokerPerfSupport extends FunSuiteSupport with BeforeAndAfterEach with ServiceController {
+abstract class BrokerPerfSupport extends FunSuiteSupport with BeforeAndAfterEach {
var PERFORMANCE_SAMPLES = Integer.parseInt(System.getProperty("PERFORMANCE_SAMPLES", "6"))
var SAMPLE_PERIOD = java.lang.Long.parseLong(System.getProperty("SAMPLE_PERIOD", "1000"))
@@ -293,7 +290,6 @@ abstract class BrokerPerfSupport extends FunSuiteSupport with BeforeAndAfterEach
broker.config.connectors.get(0).advertise
}
-
def _createProducer(id: Int, messageSize: Int, destination: Destination): RemoteProducer = {
var producer = createProducer()
producer.stopping = stopping
@@ -313,22 +309,21 @@ abstract class BrokerPerfSupport extends FunSuiteSupport with BeforeAndAfterEach
println("waiting for services to stop")
stopping.set(true)
- controlServices(false, producers, "producer shutdown")
- controlServices(false, consumers, "consumer shutdown")
- controlServices(false, brokers, "broker shutdown")
+ ServiceControl.stop(producers, "producer shutdown")
+ ServiceControl.stop(consumers, "consumer shutdown")
+ ServiceControl.stop(brokers, "broker shutdown")
}
def startBrokers() = {
- controlServices(true, brokers, "test broker startup")
+ ServiceControl.start(brokers, "test broker startup")
}
-
def startClients() = {
- controlServices(true, consumers, "test consumer startup")
+ ServiceControl.start(consumers, "test consumer startup")
// let the consumers drain the destination for a bit...
Thread.sleep(1000)
- controlServices(true, producers, "test producer startup")
+ ServiceControl.start(producers, "test producer startup")
}
def fixed_sampling = true
@@ -367,11 +362,15 @@ abstract class BrokerPerfSupport extends FunSuiteSupport with BeforeAndAfterEach
// either we want to do x number of samples or we want to keep sampling while some condition is true.
if (fixed_sampling) {
- // Do 1 period of warm up that's not counted...
- println("Warming up...")
- Thread.sleep(SAMPLE_PERIOD)
- totalMessageSent += totalProducerRate.reset()
- totalMessageSent += totalConsumerRate.reset()
+ // Do 1 period of warm up that's not counted, wait for the broker to load up it's store...
+ var messagesForWarmup = totalMessageSent
+
+ while (messagesForWarmup == totalMessageSent) {
+ println("Warming up...")
+ Thread.sleep(SAMPLE_PERIOD)
+ totalMessageSent += totalProducerRate.reset()
+ totalMessageSent += totalConsumerRate.reset()
+ }
println("Sampling rates")
for (i <- 0 until PERFORMANCE_SAMPLES) {
@@ -421,116 +420,3 @@ trait FixedSampling extends BrokerPerfSupport {
}
}
-abstract class RemoteConnection extends Connection {
- var uri: String = null
- var name: String = null
-
- val rate = new MetricCounter()
- var rateAggregator: MetricAggregator = null
-
- var stopping: AtomicBoolean = null
- var destination: Destination = null
-
- def init = {
- if (rate.getName == null) {
- rate.name(name + " Rate")
- }
- rateAggregator.add(rate)
- }
-
- var callbackWhenConnected: Runnable = null
-
- override protected def _start(onComplete: Runnable) = {
- callbackWhenConnected = onComplete
- transport = TransportFactory.connect(uri)
- super._start(^ {})
- }
-
- override def onTransportConnected() = {
- onConnected()
- transport.resumeRead
- callbackWhenConnected.run
- callbackWhenConnected = null
- }
-
- protected def onConnected()
-
- override def onTransportFailure(error: IOException) = {
- if (!stopped) {
- if (stopping.get()) {
- transport.stop
- } else {
- onFailure(error)
- if (callbackWhenConnected != null) {
- warn("connect attempt failed. will retry connection..")
- dispatchQueue.dispatchAfter(50, TimeUnit.MILLISECONDS, ^ {
- if (stopping.get()) {
- callbackWhenConnected.run
- } else {
- // try to connect again...
- transport = TransportFactory.connect(uri)
- super._start(^ {})
- }
- })
- }
- }
- }
- }
-}
-
-abstract class RemoteConsumer extends RemoteConnection {
- var thinkTime: Long = 0
- var selector: String = null
- var durable = false
- var persistent = false
-}
-
-
-abstract class RemoteProducer extends RemoteConnection {
- var messageIdGenerator: AtomicLong = null
- var priority = 0
- var persistent = false
- var priorityMod = 0
- var counter = 0
- var producerId = 0
- var property: String = null
- var next: Delivery = null
- var thinkTime: Long = 0
-
- var filler: String = null
- var payloadSize = 20
-
- override def init = {
- super.init
-
- if (payloadSize > 0) {
- var sb = new StringBuilder(payloadSize)
- for (i <- 0 until payloadSize) {
- sb.append(('a' + (i % 26)).toChar)
- }
- filler = sb.toString()
- }
- }
-
- def createPayload(): String = {
- if (payloadSize >= 0) {
- var sb = new StringBuilder(payloadSize)
- sb.append(name)
- sb.append(':')
- counter += 1
- sb.append(counter)
- sb.append(':')
- var length = sb.length
- if (length <= payloadSize) {
- sb.append(filler.subSequence(0, payloadSize - length))
- return sb.toString()
- } else {
- return sb.substring(0, payloadSize)
- }
- } else {
- counter += 1
- return name + ":" + (counter)
- }
- }
-
-}
\ No newline at end of file
diff --git a/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/LargeInitialDB.scala b/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/LargeInitialDB.scala
index bc9351b..9c5b810 100644
--- a/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/LargeInitialDB.scala
+++ b/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/LargeInitialDB.scala
@@ -20,8 +20,7 @@ package org.apache.activemq.apollo.broker.perf
import org.apache.activemq.apollo.broker.Destination
import tools.nsc.io.Directory
import org.apache.activemq.apollo.util.metric.MetricAggregator
-import org.apache.activemq.apollo.util.FileSupport
-
+import org.apache.activemq.apollo.util.{ServiceControl, FileSupport}
trait LargeInitialDB extends PersistentScenario {
PURGE_STORE = false
@@ -54,7 +53,7 @@ trait LargeInitialDB extends PersistentScenario {
println("Using store at " + original + " and backup at " + backup)
- controlService(true, sendBroker, "initial db broker startup")
+ ServiceControl.start(sendBroker, "initial db broker startup")
PTP = true
val dests: Array[Destination] = createDestinations(1)
@@ -62,7 +61,7 @@ trait LargeInitialDB extends PersistentScenario {
val producer: RemoteProducer = _createProducer(0, 20, dests(0))
producer.persistent = true
- controlService(true, producer, "initial db producer startup")
+ ServiceControl.start(producer, "initial db producer startup")
val messages = 1000000L
@@ -72,8 +71,8 @@ trait LargeInitialDB extends PersistentScenario {
Thread.sleep(5000)
}
- controlService(false, producer, "producer shutdown")
- controlService(false, sendBroker, "broker shutdown")
+ ServiceControl.stop(producer, "producer shutdown")
+ ServiceControl.stop(sendBroker, "broker shutdown")
saveDB
}
diff --git a/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/RemoteConnection.scala b/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/RemoteConnection.scala
new file mode 100644
index 0000000..28e5d1b
--- /dev/null
+++ b/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/RemoteConnection.scala
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.apollo.broker.perf
+
+import org.apache.activemq.apollo.util.metric._
+import org.apache.activemq.apollo.broker.{Destination, Delivery, Connection}
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
+import java.util.concurrent.TimeUnit
+import org.fusesource.hawtdispatch.ScalaDispatch._
+import java.io.IOException
+import org.apache.activemq.apollo.transport.TransportFactory
+
+abstract class RemoteConnection extends Connection {
+ var uri: String = null
+ var name: String = null
+
+ val rate = new MetricCounter()
+ var rateAggregator: MetricAggregator = null
+
+ var stopping: AtomicBoolean = null
+ var destination: Destination = null
+
+ def init = {
+ if (rate.getName == null) {
+ rate.name(name + " Rate")
+ }
+ rateAggregator.add(rate)
+ }
+
+ var callbackWhenConnected: Runnable = null
+
+ override protected def _start(onComplete: Runnable) = {
+ callbackWhenConnected = onComplete
+ transport = TransportFactory.connect(uri)
+ super._start(^ {})
+ }
+
+ override def onTransportConnected() = {
+ onConnected()
+ transport.resumeRead
+ callbackWhenConnected.run
+ callbackWhenConnected = null
+ }
+
+ protected def onConnected()
+
+ override def onTransportFailure(error: IOException) = {
+ if (!stopped) {
+ if (stopping.get()) {
+ transport.stop
+ } else {
+ onFailure(error)
+ if (callbackWhenConnected != null) {
+ warn("connect attempt failed. will retry connection..")
+ dispatchQueue.dispatchAfter(50, TimeUnit.MILLISECONDS, ^ {
+ if (stopping.get()) {
+ callbackWhenConnected.run
+ } else {
+ // try to connect again...
+ transport = TransportFactory.connect(uri)
+ super._start(^ {})
+ }
+ })
+ }
+ }
+ }
+ }
+}
+
+abstract class RemoteConsumer extends RemoteConnection {
+ var thinkTime: Long = 0
+ var selector: String = null
+ var durable = false
+ var persistent = false
+
+ protected def messageReceived()
+}
+
+
+abstract class RemoteProducer extends RemoteConnection {
+ var messageIdGenerator: AtomicLong = null
+ var priority = 0
+ var persistent = false
+ var priorityMod = 0
+ var counter = 0
+ var producerId = 0
+ var property: String = null
+ var next: Delivery = null
+ var thinkTime: Long = 0
+
+ var filler: String = null
+ var payloadSize = 20
+
+ override def init = {
+ super.init
+
+ if (payloadSize > 0) {
+ var sb = new StringBuilder(payloadSize)
+ for (i <- 0 until payloadSize) {
+ sb.append(('a' + (i % 26)).toChar)
+ }
+ filler = sb.toString()
+ }
+ }
+
+ def createPayload(): String = {
+ if (payloadSize >= 0) {
+ var sb = new StringBuilder(payloadSize)
+ sb.append(name)
+ sb.append(':')
+ counter += 1
+ sb.append(counter)
+ sb.append(':')
+ var length = sb.length
+ if (length <= payloadSize) {
+ sb.append(filler.subSequence(0, payloadSize - length))
+ return sb.toString()
+ } else {
+ return sb.substring(0, payloadSize)
+ }
+ } else {
+ counter += 1
+ return name + ":" + (counter)
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala b/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala
index 1b91d06..c2cdff3 100644
--- a/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala
+++ b/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala
@@ -30,12 +30,13 @@ import Stomp._
import _root_.org.apache.activemq.apollo.stomp.StompFrame
import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
-class StompRemoteConsumer extends RemoteConsumer with Logging {
+
+class StompRemoteConsumer extends RemoteConsumer {
var outboundSink: OverflowSink[StompFrame] = null
def onConnected() = {
- outboundSink = new OverflowSink[StompFrame](MapSink(transportSink){ x=>x })
- outboundSink.refiller = ^{}
+ outboundSink = new OverflowSink[StompFrame](MapSink(transportSink) {x => x})
+ outboundSink.refiller = ^ {}
val stompDestination = if (destination.getDomain() == Router.QUEUE_DOMAIN) {
ascii("/queue/" + destination.getName().toString());
@@ -96,7 +97,7 @@ class StompRemoteConsumer extends RemoteConsumer with Logging {
class StompRemoteProducer extends RemoteProducer with Logging {
var outboundSink: OverflowSink[StompFrame] = null
var stompDestination: AsciiBuffer = null
- var frame:StompFrame = null
+ var frame: StompFrame = null
def send_next: Unit = {
var headers: List[(AsciiBuffer, AsciiBuffer)] = Nil
@@ -118,8 +119,8 @@ class StompRemoteProducer extends RemoteProducer with Logging {
}
def drain() = {
- if( frame!=null ) {
- if( !outboundSink.full ) {
+ if (frame != null) {
+ if (!outboundSink.full) {
outboundSink.offer(frame)
frame = null
rate.increment
@@ -129,7 +130,7 @@ class StompRemoteProducer extends RemoteProducer with Logging {
}
}
- if( !persistent ) {
+ if (!persistent) {
// if we are not going to wait for an ack back from the server,
// then jut send the next one...
if (thinkTime > 0) {
@@ -143,8 +144,8 @@ class StompRemoteProducer extends RemoteProducer with Logging {
}
override def onConnected() = {
- outboundSink = new OverflowSink[StompFrame](MapSink(transportSink){ x=>x })
- outboundSink.refiller = ^ { drain }
+ outboundSink = new OverflowSink[StompFrame](MapSink(transportSink) {x => x})
+ outboundSink.refiller = ^ {drain}
if (destination.getDomain() == Router.QUEUE_DOMAIN) {
stompDestination = ascii("/queue/" + destination.getName().toString());
@@ -172,3 +173,30 @@ class StompRemoteProducer extends RemoteProducer with Logging {
}
}
+trait Watchog extends RemoteConsumer {
+ var messageCount = 0
+
+ def watchdog(lastMessageCount: Int): Unit = {
+ val seconds = 10
+ dispatchQueue.dispatchAfter(seconds, TimeUnit.SECONDS, ^ {
+ if (messageCount == lastMessageCount) {
+ warn("Messages have stopped arriving after " + seconds + "s, stopping consumer")
+ stop
+ } else {
+ watchdog(messageCount)
+ }
+ })
+ }
+
+ abstract override protected def messageReceived() = {
+ super.messageReceived
+ messageCount += 1
+ }
+
+ abstract override protected def onConnected() = {
+ super.onConnected
+ watchdog(messageCount)
+ }
+
+}
+
diff --git a/apollo-util/src/main/scala/org/apache/activemq/apollo/util/ServiceController.scala b/apollo-util/src/main/scala/org/apache/activemq/apollo/util/ServiceController.scala
index 887a433..1b65308 100644
--- a/apollo-util/src/main/scala/org/apache/activemq/apollo/util/ServiceController.scala
+++ b/apollo-util/src/main/scala/org/apache/activemq/apollo/util/ServiceController.scala
@@ -23,21 +23,37 @@ import collection.mutable.ListBuffer
Simple trait to cut down on the code necessary to manage BaseService instances
*/
-trait ServiceController {
+object ServiceControl {
// start or stop a single service
- def controlService(start: Boolean, service: Service, action: String) = {
+ private def controlService(start: Boolean, service: Service, action: String) = {
val tracker = new LoggingTracker(action)
if (start) tracker.start(service) else tracker.stop(service)
tracker.await
}
// start or stop a bunch of services in one go
- def controlServices(start: Boolean, services: ListBuffer[Service], action: String) = {
+ private def controlServices(start: Boolean, services: ListBuffer[Service], action: String) = {
val tracker = new LoggingTracker(action)
- services.foreach((service: Service) => {if (start) tracker.start(service) else tracker.stop(service)})
+ services.foreach(service => {if (start) tracker.start(service) else tracker.stop(service)})
tracker.await
}
+ def start(services: ListBuffer[Service], action: String) = {
+ controlServices(true, services, action)
+ }
+
+ def stop(services: ListBuffer[Service], action: String) = {
+ controlServices(false, services, action)
+ }
+
+ def start(service: Service, action: String) = {
+ controlService(true, service, action)
+ }
+
+ def stop(service: Service, action: String) = {
+ controlService(false, service, action)
+ }
+
}
\ No newline at end of file