---
.../apollo/stomp/perf/StompRemoteClients.scala | 65 +++++++++++++------
1 files changed, 44 insertions(+), 21 deletions(-)
diff --git a/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala b/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala
index c5fce11..322c1ae 100644
--- a/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala
+++ b/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala
@@ -32,7 +32,6 @@ import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
class StompRemoteConsumer extends RemoteConsumer with Logging {
var outboundSink: OverflowSink[StompFrame] = null
- var messageCount = 0
def watchdog(lastMessageCount: Int) : Unit = {
val seconds = 10
@@ -56,18 +55,18 @@ class StompRemoteConsumer extends RemoteConsumer with Logging {
ascii("/topic/" + destination.getName().toString());
}
- var frame = StompFrame(Stomp.Commands.CONNECT);
+ var frame = StompFrame(CONNECT);
outboundSink.offer(frame);
var headers: List[(AsciiBuffer, AsciiBuffer)] = Nil
- headers ::= (Stomp.Headers.Subscribe.DESTINATION, stompDestination)
- headers ::= (Stomp.Headers.Subscribe.ID, ascii("stomp-sub-" + name))
+ headers ::= (DESTINATION, stompDestination)
+ headers ::= (ID, ascii("stomp-sub-" + name))
if( persistent ) {
- headers ::= (Stomp.Headers.Subscribe.ACK_MODE, Stomp.Headers.Subscribe.AckModeValues.CLIENT)
+ headers ::= (ACK_MODE, CLIENT)
}
- frame = StompFrame(Stomp.Commands.SUBSCRIBE, headers);
+ frame = StompFrame(SUBSCRIBE, headers);
outboundSink.offer(frame);
watchdog(messageCount)
}
@@ -75,17 +74,17 @@ class StompRemoteConsumer extends RemoteConsumer with Logging {
override def onTransportCommand(command: Object) = {
var frame = command.asInstanceOf[StompFrame]
frame match {
- case StompFrame(Responses.CONNECTED, headers, _, _) =>
- case StompFrame(Responses.MESSAGE, headers, content, _) =>
+ case StompFrame(CONNECTED, headers, _, _) =>
+ case StompFrame(MESSAGE, headers, content, _) =>
messageReceived();
// we client ack if persistent messages are being used.
if( persistent ) {
- var rc = List((Stomp.Headers.Ack.MESSAGE_ID, frame.header(Stomp.Headers.Message.MESSAGE_ID)))
- outboundSink.offer(StompFrame(Stomp.Commands.ACK, rc));
+ var rc = List((MESSAGE_ID, frame.header(MESSAGE_ID)))
+ outboundSink.offer(StompFrame(ACK, rc));
}
- case StompFrame(Responses.ERROR, headers, content, _) =>
+ case StompFrame(ERROR, headers, content, _) =>
onFailure(new Exception("Server reported an error: " + frame.content));
case _ =>
onFailure(new Exception("Unexpected stomp command: " + frame.action));
@@ -96,20 +95,17 @@ class StompRemoteConsumer extends RemoteConsumer with Logging {
if (thinkTime > 0) {
transport.suspendRead
dispatchQueue.dispatchAfter(thinkTime, TimeUnit.MILLISECONDS, ^ {
- messageCount += 1
rate.increment();
if (!stopped) {
transport.resumeRead
}
})
} else {
- messageCount += 1
rate.increment
}
}
}
-
class StompRemoteProducer extends RemoteProducer with Logging {
var outboundSink: OverflowSink[StompFrame] = null
var stompDestination: AsciiBuffer = null
@@ -117,12 +113,12 @@ class StompRemoteProducer extends RemoteProducer with Logging {
def send_next: Unit = {
var headers: List[(AsciiBuffer, AsciiBuffer)] = Nil
- headers ::= (Stomp.Headers.Send.DESTINATION, stompDestination);
+ headers ::= (DESTINATION, stompDestination);
if (property != null) {
headers ::= (ascii(property), ascii(property));
}
if( persistent ) {
- headers ::= ((Stomp.Headers.RECEIPT_REQUESTED, ascii("x")));
+ headers ::= ((RECEIPT_REQUESTED, ascii("x")));
}
// var p = this.priority;
// if (priorityMod > 0) {
@@ -130,7 +126,7 @@ class StompRemoteProducer extends RemoteProducer with Logging {
// }
var content = ascii(createPayload());
- frame = StompFrame(Stomp.Commands.SEND, headers, BufferContent(content))
+ frame = StompFrame(SEND, headers, BufferContent(content))
drain()
}
@@ -168,20 +164,20 @@ class StompRemoteProducer extends RemoteProducer with Logging {
} else {
stompDestination = ascii("/topic/" + destination.getName().toString());
}
- outboundSink.offer(StompFrame(Stomp.Commands.CONNECT));
+ outboundSink.offer(StompFrame(CONNECT));
send_next
}
override def onTransportCommand(command: Object) = {
var frame = command.asInstanceOf[StompFrame]
frame match {
- case StompFrame(Responses.RECEIPT, headers, _, _) =>
+ case StompFrame(RECEIPT, headers, _, _) =>
assert( persistent )
// we got the ack for the previous message we sent.. now send the next one.
send_next
- case StompFrame(Responses.CONNECTED, headers, _, _) =>
- case StompFrame(Responses.ERROR, headers, content, _) =>
+ case StompFrame(CONNECTED, headers, _, _) =>
+ case StompFrame(ERROR, headers, content, _) =>
onFailure(new Exception("Server reported an error: " + frame.content.utf8));
case _ =>
onFailure(new Exception("Unexpected stomp command: " + frame.action));
@@ -189,3 +185,30 @@ class StompRemoteProducer extends RemoteProducer with Logging {
}
}
+trait Watchog extends RemoteConsumer { // Stackable mix-in: counts received messages and stops the consumer when they stall. NOTE(review): name looks like a typo of "Watchdog" - confirm before renaming, mix-in sites are not visible here.
+ var messageCount = 0 // Running total, incremented by the messageReceived() override below.
+ // Schedules a check on dispatchQueue `seconds` from now; lastMessageCount is the count observed when the check was scheduled.
+ def watchdog(lastMessageCount: Int): Unit = {
+ val seconds = 10 // Fixed interval between checks, in seconds.
+ dispatchQueue.dispatchAfter(seconds, TimeUnit.SECONDS, ^ {
+ if (messageCount == lastMessageCount) { // No new message was counted during the interval.
+ warn("Messages have stopped arriving after " + seconds + "s, stopping consumer")
+ stop
+ } else {
+ watchdog(messageCount) // Progress was made: re-arm with the current count as the new baseline.
+ }
+ }) // NOTE(review): the callback does not check whether the consumer was already stopped - confirm that is intended.
+ }
+ // Runs the underlying messageReceived first, then counts the message for the watchdog.
+ abstract override protected def messageReceived() = {
+ super.messageReceived
+ messageCount += 1
+ }
+ // Runs the underlying onConnected first, then arms the first watchdog check with the current count.
+ abstract override protected def onConnected() = {
+ super.onConnected
+ watchdog(messageCount)
+ }
+
+}
+