---
.../apollo/broker/perf/BrokerPerfSupport.scala | 196 +++++++++-----------
.../apollo/broker/perf/DeepQueueScenarios.scala | 9 +-
.../apollo/broker/perf/LargeInitialDB.scala | 41 ++---
.../apollo/broker/perf/PersistentScenario.scala | 4 +-
.../apollo/stomp/perf/StompBrokerPerfTest.scala | 12 +-
.../activemq/apollo/util/ServiceController.scala | 43 +++++
6 files changed, 160 insertions(+), 145 deletions(-)
create mode 100644
apollo-util/src/main/scala/org/apache/activemq/apollo/util/ServiceController.scala
diff --git a/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala b/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala
index 07b2847..05f3260 100644
--- a/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala
+++ b/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala
@@ -17,7 +17,7 @@
package org.apache.activemq.apollo.broker.perf
import _root_.java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
-import _root_.java.lang.{String}
+import _root_.java.lang.String
import org.apache.activemq.apollo.broker._
import org.scalatest._
@@ -33,10 +33,9 @@ import org.apache.activemq.apollo.transport.TransportFactory
import org.apache.activemq.apollo.util._
/**
- *
+ *
*/
-abstract class BrokerPerfSupport extends FunSuiteSupport with BeforeAndAfterEach {
-
+abstract class BrokerPerfSupport extends FunSuiteSupport with BeforeAndAfterEach with ServiceController {
var PERFORMANCE_SAMPLES = Integer.parseInt(System.getProperty("PERFORMANCE_SAMPLES", "6"))
var SAMPLE_PERIOD = java.lang.Long.parseLong(System.getProperty("SAMPLE_PERIOD", "1000"))
@@ -66,25 +65,27 @@ abstract class BrokerPerfSupport extends FunSuiteSupport with BeforeAndAfterEach
protected var consumerCount = 0
protected var destCount = 0
- protected var totalProducerRate:MetricAggregator = null
- protected var totalConsumerRate:MetricAggregator = null
+ protected var totalProducerRate: MetricAggregator = null
+ protected var totalConsumerRate: MetricAggregator = null
var totalMessageSent = 0L
var totalMessageReceived = 0L
protected var sendBroker: Broker = null
protected var rcvBroker: Broker = null
- protected val brokers = ListBuffer[Broker]()
+ protected val brokers = ListBuffer[Service]()
protected val msgIdGenerator = new AtomicLong()
val stopping = new AtomicBoolean()
- val producers = ListBuffer[RemoteProducer]()
- val consumers = ListBuffer[RemoteConsumer]()
+ val producers = ListBuffer[Service]()
+ val consumers = ListBuffer[Service]()
- var samples:List[(String, AnyRef)] = Nil
+ var samples: List[(String, AnyRef)] = Nil
def partitionedLoad = List(1, 2, 4, 8, 10)
+
def highContention = 10
- def messageSizes = List(20,1024,1024*256)
+
+ def messageSizes = List(20, 1024, 1024 * 256)
override protected def beforeEach() = {
totalProducerRate = new MetricAggregator().name("Aggregate Producer Rate").unit("items")
@@ -93,8 +94,8 @@ abstract class BrokerPerfSupport extends FunSuiteSupport with BeforeAndAfterEach
producers.clear
consumers.clear
stopping.set(false)
- rcvBroker=null
- sendBroker=null
+ rcvBroker = null
+ sendBroker = null
producerCount = 0
consumerCount = 0
destCount = 0
@@ -119,9 +120,9 @@ abstract class BrokerPerfSupport extends FunSuiteSupport with BeforeAndAfterEach
}
}
- def reportResourceTemplate:URL
+ def reportResourceTemplate: URL
- def reportTargetName = "perf-"+getClass.getName+".html"
+ def reportTargetName = "perf-" + getClass.getName + ".html"
override protected def afterAll() = {
val basedir = new File(System.getProperty("user.home", "."))
@@ -129,19 +130,17 @@ abstract class BrokerPerfSupport extends FunSuiteSupport with BeforeAndAfterEach
val report_parser = """(?s)(.*// DATA-START\r?\n)(.*)(// DATA-END.*<!-- DESCRIPTION-START -->)(.*)(<!-- DESCRIPTION-END -->.*)""".r
-
-
// Load the previous dataset if the file exists
var report_data = ""
- if( htmlFile.exists ) {
+ if (htmlFile.exists) {
IOHelper.readText(htmlFile) match {
case report_parser(_, data, _, _, _) =>
report_data = data.stripLineEnd
case _ =>
- println("could not parse existing report file: "+htmlFile)
+ println("could not parse existing report file: " + htmlFile)
val backup: File = new File(htmlFile.getParentFile, htmlFile.getName + ".bak")
- println("backing up to: "+backup)
- IOHelper.copyFile(htmlFile, backup )
+ println("backing up to: " + backup)
+ IOHelper.copyFile(htmlFile, backup)
}
}
@@ -150,56 +149,58 @@ abstract class BrokerPerfSupport extends FunSuiteSupport with BeforeAndAfterEach
template match {
case report_parser(report_header, _, report_mid, _, report_footer) =>
var notes = System.getProperty("notes")
- if( notes==null ) {
+ if (notes == null) {
val version = new String(ProcessSupport.system("git", "rev-list", "--max-count=1", "HEAD").toByteArray).trim
- notes = "commit "+version
+ notes = "commit " + version
}
- if( !report_data.isEmpty ) {
+ if (!report_data.isEmpty) {
report_data += ",\n"
}
- report_data += " ['"+jsescape(notes)+"', "+samples.map(x=>String.format("%.2f",x._2)).mkString(", ")+"]\n"
- IOHelper.writeText(htmlFile, report_header+report_data+report_mid+description+report_footer)
+ report_data += " ['" + jsescape(notes) + "', " + samples.map(x => String.format("%.2f", x._2)).mkString(", ") + "]\n"
+ IOHelper.writeText(htmlFile, report_header + report_data + report_mid + description + report_footer)
case _ =>
println("could not parse template report file")
}
- println("Updated: "+htmlFile)
+ println("Updated: " + htmlFile)
- if( DUMP_REPORT_COLS ) {
- samples.map(_._1).foreach{x=>
- println(" data.addColumn('number', '"+x+"')")
+ if (DUMP_REPORT_COLS) {
+ samples.map(_._1).foreach {
+ x =>
+ println(" data.addColumn('number', '" + x + "')")
}
}
}
def description = ""
- def jsescape(value:String) = {
+ def jsescape(value: String) = {
var rc = ""
- value.foreach{ c=>
- c match {
- case '\n'=> rc+="\\n"
- case '\r'=> rc+="\\r"
- case '\t'=> rc+="\\t"
- case '\''=> rc+="\\\'"
- case '\"'=> rc+="\\\""
- case _ => rc+=c
- }
+ value.foreach {
+ c =>
+ c match {
+ case '\n' => rc += "\\n"
+ case '\r' => rc += "\\r"
+ case '\t' => rc += "\\t"
+ case '\'' => rc += "\\\'"
+ case '\"' => rc += "\\\""
+ case _ => rc += c
+ }
}
rc
}
-
protected def createConsumer(): RemoteConsumer
+
protected def createProducer(): RemoteProducer
def getBrokerProtocolName() = "multi"
+
def getRemoteProtocolName(): String
def createBrokerConfig(name: String, bindURI: String, connectUri: String): BrokerDTO = {
-
val config = Broker.defaultConfig
val connector = config.connectors.get(0)
connector.bind = bindURI
@@ -211,7 +212,7 @@ abstract class BrokerPerfSupport extends FunSuiteSupport with BeforeAndAfterEach
config
}
- def createDestinations(destCount: Int) : Array[Destination] = {
+ def createDestinations(destCount: Int): Array[Destination] = {
var dests = new Array[Destination](destCount)
for (i <- 0 until destCount) {
@@ -229,8 +230,7 @@ abstract class BrokerPerfSupport extends FunSuiteSupport with BeforeAndAfterEach
dests
}
- def createConnections() = {
-
+ protected def initBrokers: Unit = {
if (MULTI_BROKER) {
sendBroker = new Broker()
sendBroker.config = createBrokerConfig("SendBroker", sendBrokerBindURI, sendBrokerConnectURI)
@@ -244,14 +244,17 @@ abstract class BrokerPerfSupport extends FunSuiteSupport with BeforeAndAfterEach
sendBroker.config = createBrokerConfig("Broker", sendBrokerBindURI, sendBrokerConnectURI)
brokers += (sendBroker)
}
+ }
- startBrokers()
+ def createConnections() = {
+ initBrokers
+ startBrokers
val dests: Array[Destination] = createDestinations(destCount)
for (i <- 0 until producerCount) {
var destination = dests(i % destCount)
- var producer = _createProducer(i, MESSAGE_SIZE, destination)
+ var producer = _createProducer(i, MESSAGE_SIZE, destination)
producer.persistent = PERSISTENT
producers += (producer)
}
@@ -282,11 +285,11 @@ abstract class BrokerPerfSupport extends FunSuiteSupport with BeforeAndAfterEach
consumer.name = "Consumer:" + (i + 1)
consumer.rateAggregator = totalConsumerRate
consumer.init
-
+
return consumer
}
- def connectUri(broker:Broker) = {
+ def connectUri(broker: Broker) = {
broker.config.connectors.get(0).advertise
}
@@ -309,52 +312,32 @@ abstract class BrokerPerfSupport extends FunSuiteSupport with BeforeAndAfterEach
def stopServices() = {
println("waiting for services to stop")
stopping.set(true)
- var tracker = new LoggingTracker("broker shutdown")
- for (broker <- brokers) {
- tracker.stop(broker)
- }
- tracker.await
- tracker = new LoggingTracker("producer shutdown")
- for (connection <- producers) {
- tracker.stop(connection)
- }
- tracker.await
- tracker = new LoggingTracker("consumer shutdown")
- for (connection <- consumers) {
- tracker.stop(connection)
- }
- tracker.await
+
+ controlServices(false, producers, "producer shutdown")
+ controlServices(false, consumers, "consumer shutdown")
+ controlServices(false, brokers, "broker shutdown")
}
+
def startBrokers() = {
- val tracker = new LoggingTracker("test broker startup")
- for (broker <- brokers) {
- tracker.start(broker)
- }
- tracker.await
+ controlServices(true, brokers, "test broker startup")
}
+
def startClients() = {
- var tracker = new LoggingTracker("test consumer startup")
- for (connection <- consumers) {
- tracker.start(connection)
- }
- tracker.await
+ controlServices(true, consumers, "test consumer startup")
// let the consumers drain the destination for a bit...
Thread.sleep(1000)
- tracker = new LoggingTracker("test producer startup")
- for (connection <- producers) {
- tracker.start(connection)
- }
- tracker.await
+ controlServices(true, producers, "test producer startup")
}
def fixed_sampling = true
+
def keep_sampling = false
def reportRates() = {
- case class Summary(producer:java.lang.Float, pdev:java.lang.Float, consumer:java.lang.Float, cdev:java.lang.Float)
+ case class Summary(producer: java.lang.Float, pdev: java.lang.Float, consumer: java.lang.Float, cdev: java.lang.Float)
var best = 0
import scala.collection.mutable.ArrayBuffer
val sample_rates = new ArrayBuffer[Summary]()
@@ -382,13 +365,13 @@ abstract class BrokerPerfSupport extends FunSuiteSupport with BeforeAndAfterEach
}
// either we want to do x number of samples or we want to keep sampling while some condition is true.
- if ( fixed_sampling ) {
+ if (fixed_sampling) {
// Do 1 period of warm up that's not counted...
println("Warming up...")
Thread.sleep(SAMPLE_PERIOD)
- totalMessageSent += totalProducerRate.reset()
- totalMessageSent += totalConsumerRate.reset()
+ totalMessageSent += totalProducerRate.reset()
+ totalMessageReceived += totalConsumerRate.reset() // consumer warm-up count belongs to the received total
println("Sampling rates")
for (i <- 0 until PERFORMANCE_SAMPLES) {
@@ -397,22 +380,22 @@ abstract class BrokerPerfSupport extends FunSuiteSupport with BeforeAndAfterEach
} else {
println("Sampling rates")
var i = 0
- while( keep_sampling ) {
+ while (keep_sampling) {
fillRateSummary(i)
i += 1
}
}
- if( producerCount > 0 ) {
- samples = samples ::: ( testName+" producer", sample_rates(best).producer ) :: Nil
- if( producerCount > 1 ) {
- samples = samples ::: ( testName+" producer sd", sample_rates(best).pdev ) :: Nil
+ if (producerCount > 0) {
+ samples = samples ::: (testName + " producer", sample_rates(best).producer) :: Nil
+ if (producerCount > 1) {
+ samples = samples ::: (testName + " producer sd", sample_rates(best).pdev) :: Nil
}
}
- if( consumerCount > 0 ) {
- samples = samples ::: ( testName+" consumer", sample_rates(best).consumer ) :: Nil
- if( consumerCount > 1 ) {
- samples = samples ::: ( testName+" consumer sd", sample_rates(best).cdev ) :: Nil
+ if (consumerCount > 0) {
+ samples = samples ::: (testName + " consumer", sample_rates(best).consumer) :: Nil
+ if (consumerCount > 1) {
+ samples = samples ::: (testName + " consumer sd", sample_rates(best).cdev) :: Nil
}
}
}
@@ -425,12 +408,12 @@ trait FixedSampling extends BrokerPerfSupport {
override def fixed_sampling = false
- override def keep_sampling:Boolean = {
- if( producerCount > 0 && totalMessageSent < MIN_MESSAGES ) {
+ override def keep_sampling: Boolean = {
+ if (producerCount > 0 && totalMessageSent < MIN_MESSAGES) {
println("Waiting for producers: %s/%s".format(totalMessageSent, MIN_MESSAGES));
return true
}
- if ( consumerCount > 0 && totalMessageReceived < MIN_MESSAGES ) {
+ if (consumerCount > 0 && totalMessageReceived < MIN_MESSAGES) {
println("Waiting for consumers: %s/%s".format(totalMessageReceived, MIN_MESSAGES));
return true
}
@@ -440,27 +423,27 @@ trait FixedSampling extends BrokerPerfSupport {
abstract class RemoteConnection extends Connection {
var uri: String = null
- var name:String = null
+ var name: String = null
val rate = new MetricCounter()
var rateAggregator: MetricAggregator = null
- var stopping:AtomicBoolean = null
+ var stopping: AtomicBoolean = null
var destination: Destination = null
def init = {
- if( rate.getName == null ) {
+ if (rate.getName == null) {
rate.name(name + " Rate")
}
rateAggregator.add(rate)
}
- var callbackWhenConnected:Runnable = null
+ var callbackWhenConnected: Runnable = null
- override protected def _start(onComplete:Runnable) = {
+ override protected def _start(onComplete: Runnable) = {
callbackWhenConnected = onComplete
transport = TransportFactory.connect(uri)
- super._start(^{ })
+ super._start(^ {})
}
override def onTransportConnected() = {
@@ -474,19 +457,19 @@ abstract class RemoteConnection extends Connection {
override def onTransportFailure(error: IOException) = {
if (!stopped) {
- if(stopping.get()) {
+ if (stopping.get()) {
transport.stop
} else {
onFailure(error)
- if( callbackWhenConnected!=null ) {
+ if (callbackWhenConnected != null) {
warn("connect attempt failed. will retry connection..")
- dispatchQueue.dispatchAfter(50, TimeUnit.MILLISECONDS, ^{
- if(stopping.get()) {
+ dispatchQueue.dispatchAfter(50, TimeUnit.MILLISECONDS, ^ {
+ if (stopping.get()) {
callbackWhenConnected.run
} else {
// try to connect again...
transport = TransportFactory.connect(uri)
- super._start(^{ })
+ super._start(^ {})
}
})
}
@@ -504,7 +487,6 @@ abstract class RemoteConsumer extends RemoteConnection {
abstract class RemoteProducer extends RemoteConnection {
-
var messageIdGenerator: AtomicLong = null
var priority = 0
var persistent = false
diff --git a/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/DeepQueueScenarios.scala b/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/DeepQueueScenarios.scala
index 505ab8a..96c9cd7 100644
--- a/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/DeepQueueScenarios.scala
+++ b/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/DeepQueueScenarios.scala
@@ -16,6 +16,8 @@
*/
package org.apache.activemq.apollo.broker.perf
+import java.net.URL
+
/**
* <p>
* </p>
@@ -23,10 +25,11 @@ package org.apache.activemq.apollo.broker.perf
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
trait DeepQueueScenarios extends PersistentScenario {
+ override def reportResourceTemplate(): URL = {classOf[PersistentScenario].getResource("persistent-report.html")}
override def highContention = 100
- for ( count <- partitionedLoad ; messageSize <- messageSizes ) {
+ for (count <- partitionedLoad; messageSize <- messageSizes) {
def benchmark(name: String)(func: => Unit) {
test(name) {
@@ -36,8 +39,8 @@ trait DeepQueueScenarios extends PersistentScenario {
func
}
}
-
- val prefix = "queue " + (if((messageSize%1024)==0) (messageSize/1024)+"k" else messageSize+"b" ) + " "
+
+ val prefix = "queue " + (if ((messageSize % 1024) == 0) (messageSize / 1024) + "k" else messageSize + "b") + " "
val suffix = "" //(if( durable ) " durable" else "")
benchmark(format("%s%d%s", prefix, count, suffix)) {
diff --git a/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/LargeInitialDB.scala b/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/LargeInitialDB.scala
index 3574bef..bc9351b 100644
--- a/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/LargeInitialDB.scala
+++ b/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/LargeInitialDB.scala
@@ -14,18 +14,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
+
package org.apache.activemq.apollo.broker.perf
-import org.apache.activemq.apollo.broker.{Destination, Broker}
+import org.apache.activemq.apollo.broker.Destination
import tools.nsc.io.Directory
import org.apache.activemq.apollo.util.metric.MetricAggregator
-import org.apache.activemq.apollo.util.{FileSupport, LoggingTracker}
+import org.apache.activemq.apollo.util.FileSupport
trait LargeInitialDB extends PersistentScenario {
-
PURGE_STORE = false
+ MULTI_BROKER = false
var original: Directory = null
var backup: Directory = null;
@@ -42,34 +42,27 @@ trait LargeInitialDB extends PersistentScenario {
override protected def beforeAll(configMap: Map[String, Any]) = {
super.beforeAll(configMap)
- sendBroker = new Broker()
- sendBroker.config = createBrokerConfig("Broker", sendBrokerBindURI, sendBrokerConnectURI)
- val store = sendBroker.config.virtual_hosts.get(0).store
+ initBrokers
original = new Directory(storeDirectory)
- if ( original.exists ) {
+ if (original.exists) {
original.deleteRecursively
- original.createDirectory(true)
+ original.createDirectory(true)
}
- val backupLocation = FileSupport.toDirectory(storeDirectory.getParent)./(FileSupport.toDirectory("backup"))
- backup = backupLocation
+ backup = FileSupport.toDirectory(storeDirectory.getParent)./(FileSupport.toDirectory("backup"))
cleanBackup
println("Using store at " + original + " and backup at " + backup)
- var tracker = new LoggingTracker("initial db broker startup")
- tracker.start(sendBroker)
- tracker.await
+ controlService(true, sendBroker, "initial db broker startup")
PTP = true
val dests: Array[Destination] = createDestinations(1)
- totalProducerRate = new MetricAggregator().name("Aggregate Producer Rate").unit("items")
- val producer: RemoteProducer = _createProducer(0, 1024, dests(0))
+ totalProducerRate = new MetricAggregator().name("Aggregate Producer Rate").unit("items")
+ val producer: RemoteProducer = _createProducer(0, 20, dests(0))
producer.persistent = true
- tracker = new LoggingTracker("initial db producer startup")
- tracker.start(producer)
- tracker.await
+ controlService(true, producer, "initial db producer startup")
val messages = 1000000L
@@ -79,14 +72,8 @@ trait LargeInitialDB extends PersistentScenario {
Thread.sleep(5000)
}
- tracker = new LoggingTracker("producer shutdown")
- tracker.stop(producer)
- tracker.await
- tracker = new LoggingTracker("broker shutdown")
- tracker.stop(sendBroker)
- tracker.await
-
- Thread.sleep(10000)
+ controlService(false, producer, "producer shutdown")
+ controlService(false, sendBroker, "broker shutdown")
saveDB
}
diff --git a/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/PersistentScenario.scala b/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/PersistentScenario.scala
index efcfb4e..53789c8 100644
--- a/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/PersistentScenario.scala
+++ b/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/PersistentScenario.scala
@@ -16,7 +16,6 @@
*/
package org.apache.activemq.apollo.broker.perf
-import java.net.URL
import java.io.File
/**
@@ -26,10 +25,9 @@ import java.io.File
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
trait PersistentScenario extends BrokerPerfSupport {
-
var storeDirectory: File = null
- override def reportResourceTemplate():URL = { classOf[PersistentScenario].getResource("persistent-report.html") }
+ //override def reportResourceTemplate():URL = { classOf[PersistentScenario].getResource("persistent-report.html") }
PERSISTENT = true
}
\ No newline at end of file
diff --git a/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala b/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala
index b429b07..df7731c 100644
--- a/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala
+++ b/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala
@@ -27,20 +27,22 @@ class BasicNonPersistentTest extends BasicScenarios with StompScenario {
}
class BasicHawtDBTest extends BasicScenarios with PersistentScenario with HawtDBScenario with StompScenario {
- override def description = "Using the STOMP protocol over TCP"
+ override def description = "Using the STOMP protocol over TCP persistent to the HawtDB store"
}
-class DeepQueueHawtDBTest extends DeepQueueScenarios with LargeInitialDB with HawtDBScenario with StompScenario {
- override def description = "Using the STOMP protocol over TCP persisting to the HawtDB store."
+class DeepQueueHawtDBTest extends BasicScenarios with LargeInitialDB with HawtDBScenario with StompScenario {
+ override def description = "Using the STOMP protocol over TCP persisting to the HawtDB store that contains 1M messages in a queue."
}
-class DeepQueueBDBTest extends DeepQueueScenarios with LargeInitialDB with BDBScenario with StompScenario {
- override def description = "Using the STOMP protocol over TCP persisting to the BerkleyDB store."
+class DeepQueueBDBTest extends BasicScenarios with LargeInitialDB with BDBScenario with StompScenario {
+ override def description = "Using the STOMP protocol over TCP persisting to the BerkleyDB store that contains 1M messages in a queue."
}
trait StompScenario extends BrokerPerfSupport {
override def createProducer() = new StompRemoteProducer()
+
override def createConsumer() = new StompRemoteConsumer()
+
override def getRemoteProtocolName() = "stomp"
}
diff --git a/apollo-util/src/main/scala/org/apache/activemq/apollo/util/ServiceController.scala b/apollo-util/src/main/scala/org/apache/activemq/apollo/util/ServiceController.scala
new file mode 100644
index 0000000..887a433
--- /dev/null
+++ b/apollo-util/src/main/scala/org/apache/activemq/apollo/util/ServiceController.scala
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.apollo.util
+
+import collection.mutable.ListBuffer
+
+/**
+ * Starts or stops Service instances, awaiting a LoggingTracker after each request.
+ */
+trait ServiceController {
+
+  /** Starts (start = true) or stops (start = false) one service; see controlServices. */
+  def controlService(start: Boolean, service: Service, action: String) =
+    controlServices(start, List(service), action)
+
+  /**
+   * Starts or stops every service in `services` under one LoggingTracker named
+   * `action`, then awaits that tracker. Any Iterable is accepted so callers can
+   * pass collections of Service subtypes without widening their element type.
+   */
+  def controlServices(start: Boolean, services: Iterable[Service], action: String) = {
+    val tracker = new LoggingTracker(action)
+    services.foreach { service =>
+      if (start) tracker.start(service) else tracker.stop(service)
+    }
+    tracker.await
+  }
+}
\ No newline at end of file