Author: chirino
Date: Wed Oct 20 14:12:54 2010
New Revision: 1025593
URL: http://svn.apache.org/viewvc?rev=1025593&view=rev
Log:
Implemented all the Stomp 1.1 ack modes. Added more stomp test cases (most
features are now exercised)
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompClient.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=1025593&r1=1025592&r2=1025593&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
Wed Oct 20 14:12:54 2010
@@ -190,7 +190,7 @@ class Router(val host:VirtualHost) exten
cb(queues.get(binding))
} >>: dispatchQueue
- def bind(destination:Destination, consumer:DeliveryConsumer) =
retaining(consumer) {
+ def bind(destination:Destination, consumer:DeliveryConsumer,
on_complete:Runnable = ^{} ) = retaining(consumer) {
assert( is_topic(destination) )
@@ -206,6 +206,8 @@ class Router(val host:VirtualHost) exten
)
broadcast_consumers.put(name, consumer)
+ on_complete.run
+
} >>: dispatchQueue
def unbind(destination:Destination, consumer:DeliveryConsumer) =
releasing(consumer) {
Modified:
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala?rev=1025593&r1=1025592&r2=1025593&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
Wed Oct 20 14:12:54 2010
@@ -410,14 +410,20 @@ object Stomp {
///////////////////////////////////////////////////////////////////
val TRUE = ascii("true")
val FALSE = ascii("false")
- val AUTO = ascii("auto")
- val CLIENT = ascii("client")
- val INDIVIDUAL = ascii("client-individual")
+
+ val ACK_MODE_AUTO = ascii("auto")
+ val ACK_MODE_NONE = ascii("none")
+
+ val ACK_MODE_CLIENT = ascii("client")
+ val ACK_MODE_SESSION = ascii("session")
+
+ val ACK_MODE_MESSAGE = ascii("message")
+
val V1_0 = ascii("1.0")
val V1_1 = ascii("1.1")
val DEFAULT_HEAT_BEAT = ascii("0,0")
- val SUPPORTED_PROTOCOL_VERSIONS = Set(V1_0,V1_1)
+ val SUPPORTED_PROTOCOL_VERSIONS = List(V1_1, V1_0)
// public enum Transformations {
// JMS_BYTE, JMS_OBJECT_XML, JMS_OBJECT_JSON, JMS_MAP_XML,
JMS_MAP_JSON
Modified:
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=1025593&r1=1025592&r2=1025593&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
Wed Oct 20 14:12:54 2010
@@ -176,9 +176,105 @@ class StompProtocolHandler extends Proto
protected def dispatchQueue:DispatchQueue = connection.dispatchQueue
- class StompConsumer(val subscription_id:Option[AsciiBuffer], val
destination:Destination, val ackMode:AsciiBuffer, val selector:(AsciiBuffer,
BooleanExpression), val binding:BindingDTO) extends BaseRetained with
DeliveryConsumer {
+
+ trait AckHandler { // per-subscription strategy for acknowledging deliveries
+ def track(delivery:Delivery):Unit // invoked by the consumer for each delivery it accepts
+ def perform_ack(msgid: AsciiBuffer, uow:StoreUOW=null):Unit // invoked for an ACK frame naming msgid; uow is null when the ACK is not transactional
+ }
+
+ class AutoAckHandler extends AckHandler { // acks each delivery as soon as it is tracked; client ACK frames are rejected
+ def track(delivery:Delivery) = {
+ if( delivery.ack!=null ) { // the ack callback may be absent (null)
+ delivery.ack(null) // ack right away, without a store unit of work
+ }
+ }
+
+ def perform_ack(msgid: AsciiBuffer, uow:StoreUOW=null) = {
+ die("The subscription ack mode does not expect ACK frames") // any ACK frame is treated as an error in this mode
+ }
+ }
+
+ /**
+  * Cumulative ack handler: an ACK for a message id acknowledges that
+  * delivery and every delivery tracked before it on this subscription.
+  */
+ class SessionAckHandler extends AckHandler{
+   // Deliveries still waiting for an ACK, in the order they were tracked.
+   var consumer_acks = ListBuffer[(AsciiBuffer, (StoreUOW)=>Unit)]()
+
+   /**
+    * Remembers the delivery's ack callback until the client acks it.
+    * The bookkeeping is executed on the connection's dispatch queue.
+    */
+   def track(delivery:Delivery) = {
+     queue {
+       if( protocol_version eq V1_0 ) {
+         // register on the connection since 1.0 acks may not include the subscription id
+         connection_ack_handlers += ( delivery.message.id-> this )
+       }
+       consumer_acks += (( delivery.message.id, delivery.ack ))
+     }
+   }
+
+   /**
+    * Acks the delivery identified by msgid together with all deliveries
+    * tracked before it. When msgid is not pending, nothing is acked and
+    * die() is invoked.
+    *
+    * @param msgid id of the last message covered by this ACK
+    * @param uow   store unit of work handed to each ack callback; may be null
+    */
+   def perform_ack(msgid: AsciiBuffer, uow:StoreUOW=null) = {
+     // Locate the message first: an unknown id must not ack anything.
+     val index = consumer_acks.indexWhere{ case (id, _) => id == msgid }
+     if( index < 0 ) {
+       die("ACK failed, invalid message id: %s".format(msgid))
+     } else {
+       // session acks ack all previously received messages..
+       val (acked, not_acked) = consumer_acks.splitAt(index+1)
+       consumer_acks = not_acked
+       acked.foreach{ case (id, ack)=>
+         if( protocol_version eq V1_0 ) {
+           // track() registered every one of these ids on the connection,
+           // so drop all of them, not just msgid.
+           connection_ack_handlers.remove(id)
+         }
+         if( ack!=null ) {
+           ack(uow)
+         }
+       }
+     }
+   }
+ }
+ class MessageAckHandler extends AckHandler { // acks deliveries one at a time, by message id
+ var consumer_acks = HashMap[AsciiBuffer, (StoreUOW)=>Unit]() // pending ack callbacks keyed by message id
+
+ def track(delivery:Delivery) = { // remembers the delivery's ack callback until the client acks it
+ queue { // bookkeeping is executed on the connection's dispatch queue
+ if( protocol_version eq V1_0 ) {
+ // register on the connection since 1.0 acks may not include the
subscription id
+ connection_ack_handlers += ( delivery.message.id-> this )
+ }
+ consumer_acks += ( delivery.message.id -> delivery.ack )
+ }
+ }
+
+ def perform_ack(msgid: AsciiBuffer, uow:StoreUOW=null) = { // acks only the delivery identified by msgid
+ consumer_acks.remove(msgid) match {
+ case Some(ack) =>
+ if( ack!=null ) {
+ ack(uow)
+ }
+ case None => die("ACK failed, invalid message id: %s".format(msgid)) // msgid is not pending on this subscription
+ }
+
+ if( protocol_version eq V1_0 ) { // undo the connection-level registration made by track
+ connection_ack_handlers.remove(msgid)
+ }
+ }
+ }
+
+ class StompConsumer(val subscription_id:Option[AsciiBuffer], val
destination:Destination, val ack_handler:AckHandler, val selector:(AsciiBuffer,
BooleanExpression), val binding:BindingDTO) extends BaseRetained with
DeliveryConsumer {
val dispatchQueue = StompProtocolHandler.this.dispatchQueue
+
dispatchQueue.retain
setDisposer(^{
session_manager.release
@@ -218,18 +314,7 @@ class StompProtocolHandler extends Proto
if( session.full ) {
false
} else {
- if( delivery.ack!=null) {
- if( ackMode eq AUTO ) {
- delivery.ack(null)
- } else {
- // switch the the queue context.. this method is in the
producer's context.
- queue {
- // we need to correlate acks from the client.. to invoke the
- // delivery ack.
- pendingAcks += ( delivery.message.id->delivery.ack )
- }
- }
- }
+ ack_handler.track(delivery)
var frame = delivery.message.asInstanceOf[StompFrameMessage].frame
if( subscription_id != None ) {
frame = frame.append_headers((SUBSCRIPTION,
subscription_id.get)::Nil)
@@ -262,11 +347,12 @@ class StompProtocolHandler extends Proto
var host:VirtualHost = null
private def queue = connection.dispatchQueue
- var pendingAcks = HashMap[AsciiBuffer, (StoreUOW)=>Unit]()
+ // used by STOMP 1.0 clients
+ var connection_ack_handlers = HashMap[AsciiBuffer, AckHandler]()
- var session_id:Option[AsciiBuffer] = None
- var protocol_version:Option[AsciiBuffer] = None
+ var session_id:AsciiBuffer = _
+ var protocol_version:AsciiBuffer = _
var heart_beat_monitor:HeartBeatMonitor = new HeartBeatMonitor
@@ -313,7 +399,7 @@ class StompProtocolHandler extends Proto
// so we know which wire format is being used.
case frame:StompFrame=>
- if( protocol_version eq None ) {
+ if( protocol_version == null ) {
info("got command: %s", frame)
frame.action match {
@@ -364,9 +450,15 @@ class StompProtocolHandler extends Proto
def on_stomp_connect(headers:HeaderMap):Unit = {
-
- protocol_version = get(headers,
ACCEPT_VERSION).getOrElse(V1_0).split(COMMA).map(_.ascii).reverse.find{v=>
- SUPPORTED_PROTOCOL_VERSIONS.contains(v)
+ val accept_versions = get(headers,
ACCEPT_VERSION).getOrElse(V1_0).split(COMMA).map(_.ascii)
+ protocol_version = SUPPORTED_PROTOCOL_VERSIONS.find( v=>
accept_versions.contains(v) ) match {
+ case Some(x) => x
+ case None=>
+ val supported_versions = SUPPORTED_PROTOCOL_VERSIONS.mkString(",")
+ _die((MESSAGE_HEADER, ascii("version not supported"))::
+ (VERSION, ascii(supported_versions))::Nil,
+ "Supported protocol versions are %s".format(supported_versions))
+ return
}
val heart_beat = get(headers, HEART_BEAT).getOrElse(DEFAULT_HEAT_BEAT)
@@ -406,52 +498,40 @@ class StompProtocolHandler extends Proto
return
}
- protocol_version match {
- case None =>
- val supported_versions = SUPPORTED_PROTOCOL_VERSIONS.mkString(",")
-
- _die((MESSAGE_HEADER, ascii("version not supported"))::
- (VERSION, ascii(supported_versions))::Nil,
- "Supported protocol versions are %s".format(supported_versions))
- return
-
- case Some(x) =>
- connection.transport.suspendRead
-
- val host_header = get(headers, HOST)
- val cb: (VirtualHost)=>Unit = (host)=>
- queue {
- if(host!=null) {
- this.host=host
-
- val outbound_heart_beat_header =
ascii("%d,%d".format(outbound_heartbeat,inbound_heartbeat))
- session_id = Some(ascii(this.host.config.id +
":"+this.host.session_counter.incrementAndGet))
-
- connection_sink.offer(
- StompFrame(CONNECTED, List(
- (VERSION, protocol_version.get),
- (SESSION, session_id.get),
- (HEART_BEAT, outbound_heart_beat_header)
- )))
-
- if( this.host.direct_buffer_pool!=null ) {
- val wf =
connection.transport.getProtocolCodec.asInstanceOf[StompCodec]
- wf.memory_pool = this.host.direct_buffer_pool
- }
- connection.transport.resumeRead
+ connection.transport.suspendRead
- } else {
- die("Invalid virtual host: "+host_header.get)
- }
+ val host_header = get(headers, HOST)
+ val cb: (VirtualHost)=>Unit = (host)=>
+ queue {
+ if(host!=null) {
+ this.host=host
+
+ val outbound_heart_beat_header =
ascii("%d,%d".format(outbound_heartbeat,inbound_heartbeat))
+ session_id = ascii(this.host.config.id +
":"+this.host.session_counter.incrementAndGet)
+
+ connection_sink.offer(
+ StompFrame(CONNECTED, List(
+ (VERSION, protocol_version),
+ (SESSION, session_id),
+ (HEART_BEAT, outbound_heart_beat_header)
+ )))
+
+ if( this.host.direct_buffer_pool!=null ) {
+ val wf =
connection.transport.getProtocolCodec.asInstanceOf[StompCodec]
+ wf.memory_pool = this.host.direct_buffer_pool
}
+ connection.transport.resumeRead
- host_header match {
- case None=>
- connection.connector.broker.getDefaultVirtualHost(cb)
- case Some(host)=>
- connection.connector.broker.getVirtualHost(host, cb)
+ } else {
+ die("Invalid virtual host: "+host_header.get)
}
+ }
+ host_header match {
+ case None=>
+ connection.connector.broker.getDefaultVirtualHost(cb)
+ case Some(host)=>
+ connection.connector.broker.getVirtualHost(host, cb)
}
}
@@ -485,7 +565,7 @@ class StompProtocolHandler extends Proto
perform_send(frame)
case Some(txid)=>
get_or_create_tx_queue(txid){ txqueue=>
- txqueue.add(frame)
+ txqueue.add(frame, (uow)=>{perform_send(frame, uow)} )
}
}
@@ -580,7 +660,8 @@ class StompProtocolHandler extends Proto
frame.release
}
- def on_stomp_subscribe(headers:HeaderMap) = {
+ def on_stomp_subscribe(headers:HeaderMap):Unit = {
+ val receipt = get(headers, RECEIPT_REQUESTED)
get(headers, DESTINATION) match {
case Some(dest)=>
@@ -588,13 +669,13 @@ class StompProtocolHandler extends Proto
val subscription_id = get(headers, ID)
var id:AsciiBuffer = subscription_id match {
case None =>
- if( protocol_version.get == V1_0 )
+ if( protocol_version eq V1_0 )
// in 1.0 it's ok if the client does not send us the
// the id header
dest
else
die("The id header is missing from the SUBSCRIBE frame");
- null
+ return
case Some(x:AsciiBuffer)=> x
}
@@ -607,11 +688,14 @@ class StompProtocolHandler extends Proto
null
}
- val ack:AsciiBuffer = get(headers, ACK_MODE) match {
- case None=> AUTO
+ val ack = get(headers, ACK_MODE) match {
+ case None=> new AutoAckHandler
case Some(x)=> x match {
- case AUTO=>AUTO
- case CLIENT=> CLIENT
+ case ACK_MODE_AUTO=>new AutoAckHandler
+ case ACK_MODE_NONE=>new AutoAckHandler
+ case ACK_MODE_CLIENT=> new SessionAckHandler
+ case ACK_MODE_SESSION=> new SessionAckHandler
+ case ACK_MODE_MESSAGE=> new MessageAckHandler
case ack:AsciiBuffer => die("Unsuported ack mode: "+ack); null
}
}
@@ -660,7 +744,11 @@ class StompProtocolHandler extends Proto
if( binding==null ) {
// consumer is bind bound as a topic
- host.router.bind(destination, consumer)
+ host.router.bind(destination, consumer, ^{
+ receipt.foreach{ receipt =>
+ connection_sink.offer(StompFrame(RECEIPT, List((RECEIPT_ID,
receipt))))
+ }
+ })
consumer.release
} else {
@@ -670,6 +758,9 @@ class StompProtocolHandler extends Proto
x match {
case Some(queue:Queue) =>
queue.bind(consumer::Nil)
+ receipt.foreach{ receipt =>
+ connection_sink.offer(StompFrame(RECEIPT,
List((RECEIPT_ID, receipt))))
+ }
consumer.release
case None => throw new RuntimeException("case not yet
implemented.")
}
@@ -687,36 +778,55 @@ class StompProtocolHandler extends Proto
}
- def on_stomp_ack(frame:StompFrame) = {
+ def on_stomp_ack(frame:StompFrame):Unit = {
val headers = frame.headers
get(headers, MESSAGE_ID) match {
case Some(messageId)=>
- pendingAcks.get(messageId) match {
- case Some(ack) =>
- get(headers, TRANSACTION) match {
+
+ val subscription_id = get(headers, SUBSCRIPTION);
+ if( subscription_id == None && !(protocol_version eq V1_0) ) {
+ die("The subscription header is required")
+ return
+ }
+
+ val handler = subscription_id match {
+ case None=>
+
+ connection_ack_handlers.get(messageId) match {
+ case None =>
+ die("Not expecting ack for message id '%s'".format(messageId))
+ None
+ case Some(handler) =>
+ Some(handler)
+ }
+
+ case Some(id) =>
+ consumers.get(id) match {
case None=>
- perform_ack(frame)
- case Some(txid)=>
- get_or_create_tx_queue(txid){ txqueue=>
- txqueue.add(frame)
- }
+ die("The subscription '%s' does not exist".format(id))
+ None
+ case Some(consumer)=>
+ Some(consumer.ack_handler)
}
+ }
+ handler.foreach{ handler=>
+
+ get(headers, TRANSACTION) match {
+ case None=>
+ handler.perform_ack(messageId, null)
+ case Some(txid)=>
+ get_or_create_tx_queue(txid){ _.add(frame, (uow)=>{
handler.perform_ack(messageId, uow)} ) }
+ }
+
+ get(headers, RECEIPT_REQUESTED).foreach { receipt =>
+ connection_sink.offer(StompFrame(RECEIPT, List((RECEIPT_ID,
receipt))))
+ }
- case None =>
- // This can easily happen if the consumer is doing client acks on
something like
- // a non-durable topic.
- // trace("The specified message id is not waiting for a client
ack: %s", messageId)
}
- case None=> die("message id header not set")
- }
- }
- def perform_ack(frame: StompFrame, uow:StoreUOW=null) = {
- val msgid = get(frame.headers, MESSAGE_ID).get
- pendingAcks.remove(msgid) match {
- case Some(ack) => ack(uow)
- case None => die("message allready acked: %s".format(msgid))
+
+ case None=> die("message id header not set")
}
}
@@ -779,10 +889,10 @@ class StompProtocolHandler extends Proto
// TODO: eventually we want to back this /w a broker Queue which
// can provides persistence and memory swapping.
- val queue = ListBuffer[StompFrame]()
+ val queue = ListBuffer[(StompFrame, (StoreUOW)=>Unit)]()
- def add(frame:StompFrame) = {
- queue += frame
+ def add(frame:StompFrame, proc:(StoreUOW)=>Unit) = {
+ queue += ( frame->proc )
}
def commit(onComplete: => Unit) = {
@@ -793,14 +903,15 @@ class StompProtocolHandler extends Proto
null
}
- queue.foreach { frame=>
- frame.action match {
- case SEND =>
- perform_send(frame, uow)
- case ACK =>
- perform_ack(frame, uow)
- case _ => throw new java.lang.AssertionError("assertion failed: only
send or ack frames are transactional")
- }
+ queue.foreach { case (frame, proc) =>
+ proc(uow)
+// frame.action match {
+// case SEND =>
+// perform_send(frame, uow)
+// case ACK =>
+// perform_ack(frame, uow)
+// case _ => throw new java.lang.AssertionError("assertion failed:
only send or ack frames are transactional")
+// }
}
if( uow!=null ) {
uow.onComplete(^{
@@ -826,7 +937,9 @@ class StompProtocolHandler extends Proto
if ( transactions.contains(txid) ) {
die("transaction allready started")
} else {
- proc( transactions.put(txid, new TransactionQueue).get )
+ val queue = new TransactionQueue
+ transactions.put(txid, queue)
+ proc( queue )
}
}
Modified:
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompClient.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompClient.scala?rev=1025593&r1=1025592&r2=1025593&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompClient.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompClient.scala
Wed Oct 20 14:12:54 2010
@@ -45,14 +45,14 @@ import java.io._
socket.close
}
- def send(frame:String) = {
+ def write(frame:String) = {
out.write(frame.getBytes("UTF-8"))
out.write(0)
out.write('\n')
out.flush
}
- def send(frame:Array[Byte]) = {
+ def write(frame:Array[Byte]) = {
out.write(frame)
out.write(0)
out.write('\n')
Modified:
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala?rev=1025593&r1=1025592&r2=1025593&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
Wed Oct 20 14:12:54 2010
@@ -19,8 +19,9 @@ package org.apache.activemq.apollo.stomp
import org.scalatest.matchers.ShouldMatchers
import org.apache.activemq.apollo.util.FunSuiteSupport
import org.apache.activemq.apollo.broker.{Broker, BrokerFactory}
+import org.scalatest.BeforeAndAfterEach
-class StompTestSupport extends FunSuiteSupport with ShouldMatchers {
+class StompTestSupport extends FunSuiteSupport with ShouldMatchers with
BeforeAndAfterEach {
var broker: Broker = null
override protected def beforeAll() = {
@@ -30,26 +31,51 @@ class StompTestSupport extends FunSuiteS
Thread.sleep(1000); //TODO implement waitUntilStarted
}
+ var client = new StompClient
+
override protected def afterAll() = {
broker.stop
}
+ override protected def afterEach() = {
+ super.afterEach
+ client.close
+ }
+
+ /**
+  * Opens `c` against localhost:61613, sends the CONNECT frame for the
+  * requested STOMP version ("1.0" or "1.1") and checks the CONNECTED
+  * reply (session header present, negotiated version matches).
+  *
+  * @param version protocol version to negotiate
+  * @param c       client to connect; defaults to the shared test client
+  * @return the connected client `c`
+  * @throws RuntimeException when version is neither "1.0" nor "1.1"
+  */
+ def connect(version:String, c: StompClient = client) = {
+   c.open("localhost", 61613)
+   version match {
+     case "1.0"=>
+       c.write(
+         "CONNECT\n" +
+         "\n")
+     case "1.1"=>
+       c.write(
+         "CONNECT\n" +
+         "accept-version:1.1\n" +
+         "host:default\n" +
+         "\n")
+     // %s rather than %f: x is a String, and %f would raise
+     // IllegalFormatConversionException instead of this error.
+     case x=> throw new RuntimeException("invalid version: %s".format(x))
+   }
+   val frame = c.receive()
+   frame should startWith("CONNECTED\n")
+   frame should include regex("""session:.+?\n""")
+   frame should include("version:"+version+"\n")
+   c
+ }
+
+ def wait_for_receipt(id:String, c: StompClient = client): Unit = { // reads the next frame from c and checks it is the RECEIPT for id
+ val frame = c.receive()
+ frame should startWith("RECEIPT\n")
+ frame should include("receipt-id:"+id+"\n")
+ }
}
class Stomp10ConnectTest extends StompTestSupport {
test("Stomp 1.0 CONNECT") {
- val client = new StompClient
- client.open("localhost", 61613)
-
- client.send(
- "CONNECT\n" +
- "\n")
- val frame = client.receive()
- frame should startWith("CONNECTED\n")
- frame should include regex("""session:.+?\n""")
- frame should include("version:1.0\n")
+ connect("1.0")
}
}
@@ -57,25 +83,14 @@ class Stomp10ConnectTest extends StompTe
class Stomp11ConnectTest extends StompTestSupport {
test("Stomp 1.1 CONNECT") {
- val client = new StompClient
- client.open("localhost", 61613)
-
- client.send(
- "CONNECT\n" +
- "accept-version:1.0,1.1\n" +
- "host:default\n" +
- "\n")
- val frame = client.receive()
- frame should startWith("CONNECTED\n")
- frame should include regex("""session:.+?\n""")
- frame should include("version:1.1\n")
+ connect("1.1")
}
test("Stomp 1.1 CONNECT /w STOMP Action") {
- val client = new StompClient
+
client.open("localhost", 61613)
- client.send(
+ client.write(
"STOMP\n" +
"accept-version:1.0,1.1\n" +
"host:default\n" +
@@ -87,10 +102,10 @@ class Stomp11ConnectTest extends StompTe
}
test("Stomp 1.1 CONNECT /w valid version fallback") {
- val client = new StompClient
+
client.open("localhost", 61613)
- client.send(
+ client.write(
"CONNECT\n" +
"accept-version:1.0,10.0\n" +
"host:default\n" +
@@ -102,10 +117,10 @@ class Stomp11ConnectTest extends StompTe
}
test("Stomp 1.1 CONNECT /w invalid version fallback") {
- val client = new StompClient
+
client.open("localhost", 61613)
- client.send(
+ client.write(
"CONNECT\n" +
"accept-version:9.0,10.0\n" +
"host:default\n" +
@@ -117,10 +132,10 @@ class Stomp11ConnectTest extends StompTe
}
test("Stomp CONNECT /w invalid virtual host") {
- val client = new StompClient
+
client.open("localhost", 61613)
- client.send(
+ client.write(
"CONNECT\n" +
"accept-version:1.0,1.1\n" +
"host:invalid\n" +
@@ -135,10 +150,10 @@ class Stomp11ConnectTest extends StompTe
class Stomp11HeartBeatTest extends StompTestSupport {
test("Stomp 1.1 Broker sends heart-beat") {
- val client = new StompClient
+
client.open("localhost", 61613)
- client.send(
+ client.write(
"CONNECT\n" +
"accept-version:1.1\n" +
"host:default\n" +
@@ -164,10 +179,10 @@ class Stomp11HeartBeatTest extends Stomp
test("Stomp 1.1 Broker times out idle connection") {
StompProtocolHandler.inbound_heartbeat = 1000L
try {
- val client = new StompClient
+
client.open("localhost", 61613)
- client.send(
+ client.write(
"CONNECT\n" +
"accept-version:1.1\n" +
"host:default\n" +
@@ -192,4 +207,421 @@ class Stomp11HeartBeatTest extends Stomp
}
}
+}
+
+class StompDestinationTest extends StompTestSupport {
+
+ test("Queue order preserved") {
+ connect("1.1")
+
+ def put(id:Int) = {
+ client.write(
+ "SEND\n" +
+ "destination:/queue/example\n" +
+ "\n" +
+ "message:"+id+"\n")
+ }
+ put(1)
+ put(2)
+ put(3)
+
+ client.write(
+ "SUBSCRIBE\n" +
+ "destination:/queue/example\n" +
+ "id:0\n" +
+ "\n")
+
+ def get(id:Int) = {
+ val frame = client.receive()
+ println(frame)
+ frame should startWith("MESSAGE\n")
+ frame should include ("subscription:0\n")
+ frame should endWith regex("\n\nmessage:"+id+"\n")
+ }
+ get(1)
+ get(2)
+ get(3)
+ }
+
+ test("Topic drops messages sent before before subscription is established") {
+ connect("1.1")
+
+ def put(id:Int) = {
+ client.write(
+ "SEND\n" +
+ "destination:/topic/updates\n" +
+ "\n" +
+ "message:"+id+"\n")
+ }
+ put(1)
+
+ client.write(
+ "SUBSCRIBE\n" +
+ "destination:/topic/updates\n" +
+ "id:0\n" +
+ "receipt:0\n" +
+ "\n")
+ wait_for_receipt("0")
+
+ put(2)
+ put(3)
+
+ def get(id:Int) = {
+ val frame = client.receive()
+ println(frame)
+ frame should startWith("MESSAGE\n")
+ frame should include ("subscription:0\n")
+ frame should endWith regex("\n\nmessage:"+id+"\n")
+ }
+
+ // note that the put(1) message gets dropped.
+ get(2)
+ get(3)
+ }
+
+ test("Topic /w Durable sub retains messages.") {
+ connect("1.1")
+
+ def put(id:Int) = {
+ client.write(
+ "SEND\n" +
+ "destination:/topic/updates\n" +
+ "\n" +
+ "message:"+id+"\n")
+ }
+
+ client.write(
+ "SUBSCRIBE\n" +
+ "destination:/topic/updates\n" +
+ "id:durable:my-sub-name\n" +
+ "receipt:0\n" +
+ "\n")
+ wait_for_receipt("0")
+ client.close
+
+ // Close him out.. since his id started /w durable: then
+ // the topic subscription will be persistent accross client
+ // connections.
+
+ connect("1.1")
+ put(1)
+ put(2)
+ put(3)
+
+ client.write(
+ "SUBSCRIBE\n" +
+ "destination:/topic/updates\n" +
+ "id:durable:my-sub-name\n" +
+ "\n")
+
+ def get(id:Int) = {
+ val frame = client.receive()
+ println(frame)
+ frame should startWith("MESSAGE\n")
+ frame should include ("subscription:durable:my-sub-name\n")
+ frame should endWith regex("\n\nmessage:"+id+"\n")
+ }
+
+ get(1)
+ get(2)
+ get(3)
+ }
+
+ test("Queue and a selector") {
+ connect("1.1")
+
+ def put(id:Int, color:String) = {
+ client.write(
+ "SEND\n" +
+ "destination:/queue/selected\n" +
+ "color:"+color+"\n" +
+ "\n" +
+ "message:"+id+"\n")
+ }
+ put(1, "red")
+ put(2, "blue")
+ put(3, "red")
+
+ client.write(
+ "SUBSCRIBE\n" +
+ "destination:/queue/selected\n" +
+ "selector:color='red'\n" +
+ "id:0\n" +
+ "\n")
+
+ def get(id:Int) = {
+ val frame = client.receive()
+ println(frame)
+ frame should startWith("MESSAGE\n")
+ frame should endWith regex("\n\nmessage:"+id+"\n")
+ }
+ get(1)
+ get(3)
+ }
+
+ test("Topic and a selector") {
+ connect("1.1")
+
+ def put(id:Int, color:String) = {
+ client.write(
+ "SEND\n" +
+ "destination:/topic/selected\n" +
+ "color:"+color+"\n" +
+ "\n" +
+ "message:"+id+"\n")
+ }
+
+ client.write(
+ "SUBSCRIBE\n" +
+ "destination:/topic/selected\n" +
+ "selector:color='red'\n" +
+ "id:0\n" +
+ "receipt:0\n" +
+ "\n")
+ wait_for_receipt("0")
+
+ put(1, "red")
+ put(2, "blue")
+ put(3, "red")
+
+ def get(id:Int) = {
+ val frame = client.receive()
+ println(frame)
+ frame should startWith("MESSAGE\n")
+ frame should endWith regex("\n\nmessage:"+id+"\n")
+ }
+ get(1)
+ get(3)
+ }
+}
+
+class StompTransactionTest extends StompTestSupport {
+
+
+ test("Queue and a transacted send") {
+ connect("1.1")
+
+ def put(id:Int, tx:String=null) = {
+ client.write(
+ "SEND\n" +
+ "destination:/queue/transacted\n" +
+ { if(tx!=null) { "transaction:"+tx+"\n" } else { "" } }+
+ "\n" +
+ "message:"+id+"\n")
+ }
+
+ put(1)
+ client.write(
+ "BEGIN\n" +
+ "transaction:x\n" +
+ "\n")
+ put(2, "x")
+ put(3)
+
+ client.write(
+ "SUBSCRIBE\n" +
+ "destination:/queue/transacted\n" +
+ "id:0\n" +
+ "\n")
+
+ def get(id:Int) = {
+ val frame = client.receive()
+ println(frame)
+ frame should startWith("MESSAGE\n")
+ frame should endWith regex("\n\nmessage:"+id+"\n")
+ }
+ get(1)
+ get(3)
+
+ client.write(
+ "COMMIT\n" +
+ "transaction:x\n" +
+ "\n")
+
+ get(2)
+
+ }
+
+ test("Topic and a transacted send") {
+ connect("1.1")
+
+ def put(id:Int, tx:String=null) = {
+ client.write(
+ "SEND\n" +
+ "destination:/topic/transacted\n" +
+ { if(tx!=null) { "transaction:"+tx+"\n" } else { "" } }+
+ "\n" +
+ "message:"+id+"\n")
+ }
+
+ client.write(
+ "SUBSCRIBE\n" +
+ "destination:/topic/transacted\n" +
+ "id:0\n" +
+ "receipt:0\n" +
+ "\n")
+ wait_for_receipt("0")
+
+ put(1)
+ client.write(
+ "BEGIN\n" +
+ "transaction:x\n" +
+ "\n")
+ put(2, "x")
+ put(3)
+
+ def get(id:Int) = {
+ val frame = client.receive()
+ println(frame)
+ frame should startWith("MESSAGE\n")
+ frame should endWith regex("\n\nmessage:"+id+"\n")
+ }
+
+ get(1)
+ get(3)
+
+ client.write(
+ "COMMIT\n" +
+ "transaction:x\n" +
+ "\n")
+
+ get(2)
+
+ }
+
+}
+
+
+class StompAckModeTest extends StompTestSupport {
+
+ test("ack:session redelivers on client disconnect") {
+ connect("1.1")
+
+ def put(id:Int) = {
+ client.write(
+ "SEND\n" +
+ "destination:/queue/ackmode-session\n" +
+ "\n" +
+ "message:"+id+"\n")
+ }
+ put(1)
+ put(2)
+ put(3)
+
+ client.write(
+ "SUBSCRIBE\n" +
+ "destination:/queue/ackmode-session\n" +
+ "ack:session\n" +
+ "id:0\n" +
+ "\n")
+
+ def get(id:Int) = {
+ val frame = client.receive()
+ println(frame)
+ frame should startWith("MESSAGE\n")
+ frame should include ("subscription:0\n")
+ frame should include regex("message-id:.+?\n")
+ frame should endWith regex("\n\nmessage:"+id+"\n")
+
+ val p = """(?s).*?\nmessage-id:(.+?)\n.*""".r
+ frame match {
+ case p(x) => x
+ case _=> null
+ }
+ }
+
+ get(1)
+ val mid = get(2)
+ get(3)
+
+ // Ack the first 2 messages..
+ client.write(
+ "ACK\n" +
+ "subscription:0\n" +
+ "message-id:"+mid+"\n" +
+ "receipt:0\n"+
+ "\n")
+
+ wait_for_receipt("0")
+ client.close
+
+ connect("1.1")
+
+ client.write(
+ "SUBSCRIBE\n" +
+ "destination:/queue/ackmode-session\n" +
+ "ack:session\n" +
+ "id:0\n" +
+ "\n")
+ get(3)
+
+
+ }
+
+
+ test("ack:message redelivers on client disconnect") {
+ connect("1.1")
+
+ def put(id:Int) = {
+ client.write(
+ "SEND\n" +
+ "destination:/queue/ackmode-message\n" +
+ "\n" +
+ "message:"+id+"\n")
+ }
+ put(1)
+ put(2)
+ put(3)
+
+ client.write(
+ "SUBSCRIBE\n" +
+ "destination:/queue/ackmode-message\n" +
+ "ack:message\n" +
+ "id:0\n" +
+ "\n")
+
+ def get(id:Int) = {
+ val frame = client.receive()
+ println(frame)
+ frame should startWith("MESSAGE\n")
+ frame should include ("subscription:0\n")
+ frame should include regex("message-id:.+?\n")
+ frame should endWith regex("\n\nmessage:"+id+"\n")
+
+ val p = """(?s).*?\nmessage-id:(.+?)\n.*""".r
+ frame match {
+ case p(x) => x
+ case _=> null
+ }
+ }
+
+ get(1)
+ val mid = get(2)
+ get(3)
+
+ // Ack only the 2nd message.. messages 1 and 3 stay un-acked and are redelivered below.
+ client.write(
+ "ACK\n" +
+ "subscription:0\n" +
+ "message-id:"+mid+"\n" +
+ "receipt:0\n"+
+ "\n")
+
+ wait_for_receipt("0")
+ client.close
+
+ connect("1.1")
+
+ client.write(
+ "SUBSCRIBE\n" +
+ "destination:/queue/ackmode-message\n" +
+ "ack:session\n" +
+ "id:0\n" +
+ "\n")
+ get(1)
+ get(3)
+
+
+ }
+
}
\ No newline at end of file
Modified:
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala?rev=1025593&r1=1025592&r2=1025593&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala
Wed Oct 20 14:12:54 2010
@@ -186,7 +186,7 @@ object StompLoadClient {
try {
val connectUri = new URI(uri)
client.open(connectUri.getHost(), connectUri.getPort())
- client.send("""CONNECT
+ client.write("""CONNECT
""")
client.receive("CONNECTED")
@@ -235,7 +235,7 @@ object StompLoadClient {
this.client=client
var i =0
while (!done.get) {
- client.send(content)
+ client.write(content)
if( syncSend ) {
// waits for the reply..
client.skip
@@ -272,7 +272,7 @@ object StompLoadClient {
while (!done.get) {
connect {
val headers = Map[AsciiBuffer, AsciiBuffer]()
- client.send(
+ client.write(
"SUBSCRIBE\n" +
(if(!durable) {""} else {"id:durable:mysub-"+id+"\n"}) +
(if(selector==null) {""} else {"selector: "+selector+"\n"}) +
@@ -294,7 +294,7 @@ object StompLoadClient {
assert( start >= 0 )
val end = msg.indexOf("\n", start)
val msgId = msg.slice(start+MESSAGE_ID.length+1, end).ascii
- client.send("""
+ client.write("""
ACK
message-id:"""+msgId+"""
Modified:
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala?rev=1025593&r1=1025592&r2=1025593&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala
Wed Oct 20 14:12:54 2010
@@ -52,7 +52,7 @@ class StompRemoteConsumer extends Remote
headers ::= (ID, ascii("stomp-sub-" + name))
if( persistent ) {
- headers ::= (ACK_MODE, CLIENT)
+ headers ::= (ACK_MODE, ACK_MODE_CLIENT)
}
frame = StompFrame(SUBSCRIBE, headers);