Author: chirino
Date: Fri Oct 15 16:27:30 2010
New Revision: 1023000
URL: http://svn.apache.org/viewvc?rev=1023000&view=rev
Log:
- Consolidated the Stomp constants
- Started Stomp 1.1 implementation
Removed:
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/Stomp.java
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=1023000&r1=1022999&r2=1023000&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
Fri Oct 15 16:27:30 2010
@@ -30,6 +30,7 @@ import path.PathFilter
import ReporterLevel._
import org.fusesource.hawtbuf.{Buffer, AsciiBuffer}
import collection.JavaConversions
+import java.util.concurrent.atomic.AtomicLong
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -93,6 +94,8 @@ class VirtualHost(val broker: Broker, va
var transactionManager:TransactionManagerX = new TransactionManagerX
val queue_id_counter = new LongCounter
+ val session_counter = new AtomicLong(0)
+
override def toString = if (config==null) "virtual-host" else "virtual-host:
"+config.id
/**
Modified:
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala?rev=1023000&r1=1022999&r2=1023000&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
Fri Oct 15 16:27:30 2010
@@ -21,11 +21,9 @@ import _root_.org.apache.activemq.apollo
import java.nio.ByteBuffer
import collection.mutable.{ListBuffer, HashMap}
import Stomp._
-import Stomp.Headers._
import BufferConversions._
import _root_.scala.collection.JavaConversions._
-import StompFrameConstants._
import java.io.{EOFException, DataOutput, DataInput, IOException}
import java.nio.channels.{SocketChannel, WritableByteChannel,
ReadableByteChannel}
import org.apache.activemq.apollo.transport._
@@ -48,7 +46,7 @@ object StompCodec extends Log {
val frame = message.frame
val rc = new MessageRecord
- rc.protocol = StompConstants.PROTOCOL
+ rc.protocol = PROTOCOL
rc.size = frame.size
rc.expiration = message.expiration
@@ -465,7 +463,7 @@ class StompCodec extends ProtocolCodec w
// lets try to keep the content of big message outside of the JVM's
garbage collection
// to keep the number of GCs down when moving big messages.
- def is_message = action == Commands.SEND || action ==
Responses.MESSAGE
+ def is_message = action == SEND || action == MESSAGE
if( length > 1024 && memory_pool!=null && is_message) {
val ma = memory_pool.alloc(length+1)
Modified:
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala?rev=1023000&r1=1022999&r2=1023000&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
Fri Oct 15 16:27:30 2010
@@ -30,17 +30,9 @@ import java.io.{OutputStream, DataOutput
*
* @author <a href="http://hiramchirino.com">chirino</a>
*/
-object StompFrameConstants {
- type HeaderMap = List[(AsciiBuffer, AsciiBuffer)]
- type HeaderMapBuffer = ListBuffer[(AsciiBuffer, AsciiBuffer)]
- val NO_DATA = new Buffer(0);
-
-}
-
-import StompFrameConstants._
-import StompConstants._;
import BufferConversions._
import Buffer._
+import Stomp._
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -82,15 +74,15 @@ case class StompFrameMessage(frame:Stomp
for( header <- (frame.updated_headers ::: frame.headers).reverse ) {
header match {
- case (Stomp.Headers.Message.MESSAGE_ID, value) =>
+ case (MESSAGE_ID, value) =>
id = value
- case (Stomp.Headers.Send.PRIORITY, value) =>
+ case (PRIORITY, value) =>
priority = java.lang.Integer.parseInt(value).toByte
- case (Stomp.Headers.Send.DESTINATION, value) =>
+ case (DESTINATION, value) =>
destination = value
- case (Stomp.Headers.Send.EXPIRATION_TIME, value) =>
+ case (EXPIRATION_TIME, value) =>
expiration = java.lang.Long.parseLong(value)
- case (Stomp.Headers.Send.PERSISTENT, value) =>
+ case (PERSISTENT, value) =>
persistent = java.lang.Boolean.parseBoolean(value)
case _ =>
}
@@ -301,6 +293,139 @@ case class StompFrame(action:AsciiBuffer
).map(_._2).getOrElse(null)
}
+ def append_headers(value:HeaderMap) = StompFrame(action, headers, content,
value ::: updated_headers)
+
def retain = content.retain
def release = content.release
}
+
+object Stomp {
+
+ val PROTOCOL = "stomp"
+ val DURABLE_PREFIX = ascii("durable:")
+ val DURABLE_QUEUE_KIND = ascii("stomp:sub")
+
+ val options = new ParserOptions
+ options.queuePrefix = ascii("/queue/")
+ options.topicPrefix = ascii("/topic/")
+
+ options.defaultDomain = Router.QUEUE_DOMAIN
+
+ implicit def toDestination(value:AsciiBuffer):Destination = {
+ val d = DestinationParser.parse(value, options)
+ if( d==null ) {
+ throw new ProtocolException("Invalid stomp destiantion name: "+value);
+ }
+ d
+ }
+
+ type HeaderMap = List[(AsciiBuffer, AsciiBuffer)]
+ type HeaderMapBuffer = ListBuffer[(AsciiBuffer, AsciiBuffer)]
+ val NO_DATA = new Buffer(0);
+
+ ///////////////////////////////////////////////////////////////////
+ // Framing
+ ///////////////////////////////////////////////////////////////////
+
+ val EMPTY_BUFFER = new Buffer(0)
+ val NULL: Byte = 0
+ val NULL_BUFFER = new Buffer(Array(NULL))
+ val NEWLINE: Byte = '\n'
+ val COMMA: Byte = ','
+ val NEWLINE_BUFFER = new Buffer(Array(NEWLINE))
+ val END_OF_FRAME_BUFFER = new Buffer(Array(NULL, NEWLINE))
+ val SEPERATOR: Byte = ':'
+ val SEPERATOR_BUFFER = new Buffer(Array(SEPERATOR))
+
+ ///////////////////////////////////////////////////////////////////
+ // Frame Commands
+ ///////////////////////////////////////////////////////////////////
+ val STOMP = ascii("STOMP")
+ val CONNECT = ascii("CONNECT")
+ val SEND = ascii("SEND")
+ val DISCONNECT = ascii("DISCONNECT")
+ val SUBSCRIBE = ascii("SUBSCRIBE")
+ val UNSUBSCRIBE = ascii("UNSUBSCRIBE")
+
+ val BEGIN_TRANSACTION = ascii("BEGIN")
+ val COMMIT_TRANSACTION = ascii("COMMIT")
+ val ABORT_TRANSACTION = ascii("ABORT")
+ val BEGIN = ascii("BEGIN")
+ val COMMIT = ascii("COMMIT")
+ val ABORT = ascii("ABORT")
+ val ACK = ascii("ACK")
+
+ ///////////////////////////////////////////////////////////////////
+ // Frame Responses
+ ///////////////////////////////////////////////////////////////////
+ val CONNECTED = ascii("CONNECTED")
+ val ERROR = ascii("ERROR")
+ val MESSAGE = ascii("MESSAGE")
+ val RECEIPT = ascii("RECEIPT")
+
+ ///////////////////////////////////////////////////////////////////
+ // Frame Headers
+ ///////////////////////////////////////////////////////////////////
+ val RECEIPT_REQUESTED = ascii("receipt")
+ val TRANSACTION = ascii("transaction")
+ val CONTENT_LENGTH = ascii("content-length")
+ val TRANSFORMATION = ascii("transformation")
+ val TRANSFORMATION_ERROR = ascii("transformation-error")
+
+ val RECEIPT_ID = ascii("receipt-id")
+
+ val DESTINATION = ascii("destination")
+ val CORRELATION_ID = ascii("correlation-id")
+ val REPLY_TO = ascii("reply-to")
+ val EXPIRATION_TIME = ascii("expires")
+ val PRIORITY = ascii("priority")
+ val TYPE = ascii("type")
+ val PERSISTENT = ascii("persistent")
+
+ val MESSAGE_ID = ascii("message-id")
+ val PRORITY = ascii("priority")
+ val REDELIVERED = ascii("redelivered")
+ val TIMESTAMP = ascii("timestamp")
+ val SUBSCRIPTION = ascii("subscription")
+
+ val ACK_MODE = ascii("ack")
+ val ID = ascii("id")
+ val SELECTOR = ascii("selector")
+
+ val LOGIN = ascii("login")
+ val PASSCODE = ascii("passcode")
+ val CLIENT_ID = ascii("client-id")
+ val REQUEST_ID = ascii("request-id")
+ val ACCEPT_VERSION = ascii("accept-version")
+ val HOST = ascii("host")
+
+ val MESSAGE_HEADER = ascii("message")
+ val VERSION = ascii("version")
+ val SESSION = ascii("session")
+ val RESPONSE_ID = ascii("response-id")
+
+ ///////////////////////////////////////////////////////////////////
+ // Common Values
+ ///////////////////////////////////////////////////////////////////
+ val TRUE = ascii("true")
+ val FALSE = ascii("false")
+ val AUTO = ascii("auto")
+ val CLIENT = ascii("client")
+ val INDIVIDUAL = ascii("client-individual")
+ val V1_0 = ascii("1.0")
+ val V1_1 = ascii("1.1")
+
+ val SUPPORTED_PROTOCOL_VERSIONS = Set(V1_0,V1_1)
+
+ // public enum Transformations {
+ // JMS_BYTE, JMS_OBJECT_XML, JMS_OBJECT_JSON, JMS_MAP_XML,
JMS_MAP_JSON
+ //
+ // public String toString() {
+ // return name().replaceAll("_", "-").toLowerCase()
+ // }
+ //
+ // public static Transformations getValue(String value) {
+ // return valueOf(value.replaceAll("-", "_").toUpperCase())
+ // }
+ // }
+}
\ No newline at end of file
Modified:
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=1023000&r1=1022999&r2=1023000&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
Fri Oct 15 16:27:30 2010
@@ -27,7 +27,6 @@ import protocol.{ProtocolFactory, Protoc
import java.lang.String
import Stomp._
import BufferConversions._
-import StompFrameConstants._
import java.io.IOException
import org.apache.activemq.apollo.selector.SelectorParser
import org.apache.activemq.apollo.filter.{BooleanExpression, FilterException}
@@ -39,27 +38,6 @@ import org.apache.activemq.apollo.dto.{B
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-object StompConstants {
-
- val PROTOCOL = "stomp"
- val DURABLE_PREFIX = ascii("durable:")
- val DURABLE_QUEUE_KIND = ascii("stomp:sub")
-
- val options = new ParserOptions
- options.queuePrefix = ascii("/queue/")
- options.topicPrefix = ascii("/topic/")
-
- options.defaultDomain = Router.QUEUE_DOMAIN
-
- implicit def toDestination(value:AsciiBuffer):Destination = {
- val d = DestinationParser.parse(value, options)
- if( d==null ) {
- throw new ProtocolException("Invalid stomp destiantion name: "+value);
- }
- d
- }
-
-}
/**
* Creates StompCodec objects that encode/decode the
* <a href="http://activemq.apache.org/stomp/">Stomp</a> protocol.
@@ -67,10 +45,8 @@ object StompConstants {
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
class StompProtocolCodecFactory extends ProtocolCodecFactory.Provider {
- import Stomp.Commands.CONNECT
- import Stomp.Commands.STOMP
- def protocol = StompConstants.PROTOCOL
+ def protocol = PROTOCOL
def createProtocolCodec() = new StompCodec();
@@ -116,8 +92,6 @@ object StompProtocol extends StompProtoc
}
-import StompConstants._
-
object StompProtocolHandler extends Log
/**
@@ -131,7 +105,7 @@ class StompProtocolHandler extends Proto
protected def dispatchQueue:DispatchQueue = connection.dispatchQueue
- class StompConsumer(val destination:Destination, val ackMode:AsciiBuffer,
val selector:(AsciiBuffer, BooleanExpression), val binding:BindingDTO) extends
BaseRetained with DeliveryConsumer {
+ class StompConsumer(val subscription_id:Option[AsciiBuffer], val
destination:Destination, val ackMode:AsciiBuffer, val selector:(AsciiBuffer,
BooleanExpression), val binding:BindingDTO) extends BaseRetained with
DeliveryConsumer {
val dispatchQueue = StompProtocolHandler.this.dispatchQueue
dispatchQueue.retain
@@ -174,7 +148,7 @@ class StompProtocolHandler extends Proto
false
} else {
if( delivery.ack!=null) {
- if( ackMode eq Headers.Subscribe.AckModeValues.AUTO ) {
+ if( ackMode eq AUTO ) {
delivery.ack(null)
} else {
// switch the the queue context.. this method is in the
producer's context.
@@ -185,7 +159,10 @@ class StompProtocolHandler extends Proto
}
}
}
- val frame = delivery.message.asInstanceOf[StompFrameMessage].frame
+ var frame = delivery.message.asInstanceOf[StompFrameMessage].frame
+ if( subscription_id != None ) {
+ frame = frame.append_headers((SUBSCRIPTION,
subscription_id.get)::Nil)
+ }
frame.retain
val rc = session.offer(frame)
assert(rc, "offer should be accepted since it was not full")
@@ -216,17 +193,7 @@ class StompProtocolHandler extends Proto
session_manager = new SinkMux[StompFrame](
MapSink(connection.transportSink){x=>x}, dispatchQueue, StompFrame)
connection_sink = new OverflowSink(session_manager.open(dispatchQueue));
connection_sink.refiller = ^{}
-
- connection.connector.broker.getDefaultVirtualHost(
- queue.wrap { (host)=>
- this.host=host
- if( this.host.direct_buffer_pool!=null ) {
- val wf =
connection.transport.getProtocolCodec.asInstanceOf[StompCodec]
- wf.memory_pool = this.host.direct_buffer_pool
- }
- connection.transport.resumeRead
- }
- )
+ connection.transport.resumeRead
}
override def onTransportDisconnected() = {
@@ -255,25 +222,30 @@ class StompProtocolHandler extends Proto
override def onTransportCommand(command:Any) = {
try {
command match {
- case StompFrame(Commands.SEND, _, _, _) =>
+ case StompFrame(SEND, _, _, _) =>
on_stomp_send(command.asInstanceOf[StompFrame])
- case StompFrame(Commands.ACK, headers, content, _) =>
+ case StompFrame(ACK, headers, content, _) =>
on_stomp_ack(command.asInstanceOf[StompFrame])
- case StompFrame(Commands.BEGIN, headers, content, _) =>
+ case StompFrame(BEGIN, headers, content, _) =>
on_stomp_begin(headers)
- case StompFrame(Commands.COMMIT, headers, content, _) =>
+ case StompFrame(COMMIT, headers, content, _) =>
on_stomp_commit(headers)
- case StompFrame(Commands.ABORT, headers, content, _) =>
+ case StompFrame(ABORT, headers, content, _) =>
on_stomp_abort(headers)
- case StompFrame(Commands.SUBSCRIBE, headers, content, _) =>
+ case StompFrame(SUBSCRIBE, headers, content, _) =>
info("got command: %s", command)
on_stomp_subscribe(headers)
- case StompFrame(Commands.CONNECT, headers, _, _) =>
+
+ case StompFrame(STOMP, headers, _, _) =>
+ info("got command: %s", command)
+ on_stomp_connect(headers)
+ case StompFrame(CONNECT, headers, _, _) =>
info("got command: %s", command)
on_stomp_connect(headers)
- case StompFrame(Commands.DISCONNECT, headers, content, _t) =>
+
+ case StompFrame(DISCONNECT, headers, content, _t) =>
info("got command: %s", command)
connection.stop
case s:StompCodec =>
@@ -286,13 +258,69 @@ class StompProtocolHandler extends Proto
}
} catch {
case e:Exception =>
- die("Unexpected error: "+e);
+ die("Unexpected Error", e.toString);
}
}
+ var session_id:Option[AsciiBuffer] = None
+ var protocol_version:Option[AsciiBuffer] = None
+
def on_stomp_connect(headers:HeaderMap) = {
- connection_sink.offer(StompFrame(Responses.CONNECTED))
+
+ protocol_version = get(headers,
ACCEPT_VERSION).getOrElse(V1_0).split(COMMA).map(_.ascii).reverse.find{v=>
+ SUPPORTED_PROTOCOL_VERSIONS.contains(v)
+ }
+
+
+ protocol_version match {
+ case None =>
+ val supported_versions = SUPPORTED_PROTOCOL_VERSIONS.mkString(",")
+
+ _die((MESSAGE_HEADER, ascii("version not supported"))::
+ (VERSION, ascii(supported_versions))::Nil,
+ "Supported protocol versions are %s".format(supported_versions))
+
+ case Some(x) =>
+ connection.transport.suspendRead
+
+ val host_header = get(headers, HOST)
+ val cb: (VirtualHost)=>Unit = queue.wrap { (host)=>
+
+ if(host!=null) {
+ this.host=host
+
+ session_id = Some(ascii(this.host.config.id +
":"+this.host.session_counter.incrementAndGet))
+ connection_sink.offer(
+ StompFrame(CONNECTED, List(
+ (VERSION, protocol_version.get),
+ (SESSION, session_id.get)
+ )))
+
+ if( this.host.direct_buffer_pool!=null ) {
+ val wf =
connection.transport.getProtocolCodec.asInstanceOf[StompCodec]
+ wf.memory_pool = this.host.direct_buffer_pool
+ }
+ connection.transport.resumeRead
+
+ } else {
+ die("Invalid virtual host: "+host_header.get)
+ }
+ }
+
+ host_header match {
+ case None=>
+ connection.connector.broker.getDefaultVirtualHost(cb)
+ case Some(host)=>
+ connection.connector.broker.getVirtualHost(host, cb)
+ }
+
+ }
+
+ }
+
+ def get(headers:HeaderMap,
names:List[AsciiBuffer]):List[Option[AsciiBuffer]] = {
+ names.map(x=>get(headers, x))
}
def get(headers:HeaderMap, name:AsciiBuffer):Option[AsciiBuffer] = {
@@ -308,14 +336,14 @@ class StompProtocolHandler extends Proto
def on_stomp_send(frame:StompFrame) = {
- get(frame.headers, Headers.Send.DESTINATION) match {
+ get(frame.headers, DESTINATION) match {
case None=>
frame.release
die("destination not set.")
case Some(dest)=>
- get(frame.headers, Headers.TRANSACTION) match {
+ get(frame.headers, TRANSACTION) match {
case None=>
perform_send(frame)
case Some(txid)=>
@@ -329,7 +357,7 @@ class StompProtocolHandler extends Proto
def perform_send(frame:StompFrame, uow:StoreUOW=null): Unit = {
- val destiantion: Destination = get(frame.headers,
Headers.Send.DESTINATION).get
+ val destiantion: Destination = get(frame.headers, DESTINATION).get
producerRoutes.get(destiantion) match {
case None =>
// create the producer route...
@@ -372,18 +400,18 @@ class StompProtocolHandler extends Proto
def send_via_route(route:DeliveryProducerRoute, frame:StompFrame,
uow:StoreUOW) = {
var storeBatch:StoreUOW=null
// User might be asking for ack that we have processed the message..
- val receipt = frame.header(Stomp.Headers.RECEIPT_REQUESTED)
+ val receipt = frame.header(RECEIPT_REQUESTED)
if( !route.targets.isEmpty ) {
// We may need to add some headers..
- var message = get( frame.headers, Stomp.Headers.Message.MESSAGE_ID)
match {
+ var message = get( frame.headers, MESSAGE_ID) match {
case None=>
var updated_headers:HeaderMap=Nil;
- updated_headers ::= (Stomp.Headers.Message.MESSAGE_ID,
next_message_id)
- StompFrameMessage(StompFrame(Stomp.Responses.MESSAGE, frame.headers,
frame.content, updated_headers))
+ updated_headers ::= (MESSAGE_ID, next_message_id)
+ StompFrameMessage(StompFrame(MESSAGE, frame.headers, frame.content,
updated_headers))
case Some(id)=>
- StompFrameMessage(StompFrame(Stomp.Responses.MESSAGE, frame.headers,
frame.content))
+ StompFrameMessage(StompFrame(MESSAGE, frame.headers, frame.content))
}
val delivery = new Delivery
@@ -393,7 +421,7 @@ class StompProtocolHandler extends Proto
if( receipt!=null ) {
delivery.ack = { storeTx =>
- connection_sink.offer(StompFrame(Responses.RECEIPT,
List((Stomp.Headers.Response.RECEIPT_ID, receipt))))
+ connection_sink.offer(StompFrame(RECEIPT, List((RECEIPT_ID,
receipt))))
}
}
@@ -409,20 +437,28 @@ class StompProtocolHandler extends Proto
} else {
// info("Dropping message. No consumers interested in message.")
if( receipt!=null ) {
- connection_sink.offer(StompFrame(Responses.RECEIPT,
List((Stomp.Headers.Response.RECEIPT_ID, receipt))))
+ connection_sink.offer(StompFrame(RECEIPT, List((RECEIPT_ID, receipt))))
}
}
frame.release
}
def on_stomp_subscribe(headers:HeaderMap) = {
- get(headers, Headers.Subscribe.DESTINATION) match {
+ get(headers, DESTINATION) match {
case Some(dest)=>
val destination:Destination = dest
+ val subscription_id = get(headers, ID)
+ var id:AsciiBuffer = subscription_id match {
+ case None =>
+ if( protocol_version.get == V1_0 )
+ // in 1.0 it's ok if the client does not send us the
+ // the id header
+ dest
+ else
+ die("The id header is missing from the SUBSCRIBE frame");
+ null
- var id:AsciiBuffer = get(headers, Headers.Subscribe.ID) match {
- case None => dest
case Some(x:AsciiBuffer)=> x
}
@@ -434,16 +470,16 @@ class StompProtocolHandler extends Proto
null
}
- val ack:AsciiBuffer = get(headers, Headers.Subscribe.ACK_MODE) match {
- case None=> Headers.Subscribe.AckModeValues.AUTO
+ val ack:AsciiBuffer = get(headers, ACK_MODE) match {
+ case None=> AUTO
case Some(x)=> x match {
- case Headers.Subscribe.AckModeValues.AUTO=>
Headers.Subscribe.AckModeValues.AUTO
- case Headers.Subscribe.AckModeValues.CLIENT=>
Headers.Subscribe.AckModeValues.CLIENT
+ case AUTO=>AUTO
+ case CLIENT=> CLIENT
case ack:AsciiBuffer => die("Unsuported ack mode: "+ack); null
}
}
- val selector = get(headers, Headers.Subscribe.SELECTOR) match {
+ val selector = get(headers, SELECTOR) match {
case None=> null
case Some(x)=> x
try {
@@ -481,7 +517,7 @@ class StompProtocolHandler extends Proto
}
}
- val consumer = new StompConsumer(destination, ack, selector,
binding);
+ val consumer = new StompConsumer(subscription_id, destination,
ack, selector, binding);
consumers += (id -> consumer)
if( binding==null ) {
@@ -516,11 +552,11 @@ class StompProtocolHandler extends Proto
def on_stomp_ack(frame:StompFrame) = {
val headers = frame.headers
- get(headers, Headers.Ack.MESSAGE_ID) match {
+ get(headers, MESSAGE_ID) match {
case Some(messageId)=>
pendingAcks.get(messageId) match {
case Some(ack) =>
- get(headers, Headers.TRANSACTION) match {
+ get(headers, TRANSACTION) match {
case None=>
perform_ack(frame)
case Some(txid)=>
@@ -540,18 +576,22 @@ class StompProtocolHandler extends Proto
}
def perform_ack(frame: StompFrame, uow:StoreUOW=null) = {
- val msgid = get(frame.headers, Headers.Ack.MESSAGE_ID).get
+ val msgid = get(frame.headers, MESSAGE_ID).get
pendingAcks.remove(msgid) match {
case Some(ack) => ack(uow)
case None => die("message allready acked: %s".format(msgid))
}
}
- private def die(msg:String) = {
+ private def die(msg:String, explained:String="") = {
+ info("Shutting connection down due to: "+msg)
+ _die((MESSAGE_HEADER, ascii(msg))::Nil, explained)
+ }
+
+ private def _die(headers:HeaderMap, explained:String="") = {
if( !connection.stopped ) {
- info("Shutting connection down due to: "+msg)
connection.transport.suspendRead
- connection.transport.offer(StompFrame(Responses.ERROR, Nil,
BufferContent(ascii(msg))) )
+ connection.transport.offer(StompFrame(ERROR, headers,
BufferContent(ascii(explained))) )
^ {
connection.stop()
} >>: queue
@@ -568,7 +608,7 @@ class StompProtocolHandler extends Proto
def
require_transaction_header[T](headers:HeaderMap)(proc:(AsciiBuffer)=>T):Option[T]
= {
- get(headers, Headers.TRANSACTION) match {
+ get(headers, TRANSACTION) match {
case None=> die("transaction header not set")
None
case Some(txid)=> Some(proc(txid))
@@ -589,9 +629,9 @@ class StompProtocolHandler extends Proto
def send_receipt(headers:HeaderMap) = {
- get(headers, Stomp.Headers.RECEIPT_REQUESTED) match {
+ get(headers, RECEIPT_REQUESTED) match {
case Some(receipt)=>
- connection_sink.offer(StompFrame(Responses.RECEIPT,
List((Stomp.Headers.Response.RECEIPT_ID, receipt))))
+ connection_sink.offer(StompFrame(RECEIPT, List((RECEIPT_ID, receipt))))
case None=>
}
}
@@ -616,9 +656,9 @@ class StompProtocolHandler extends Proto
queue.foreach { frame=>
frame.action match {
- case Commands.SEND =>
+ case SEND =>
perform_send(frame, uow)
- case Commands.ACK =>
+ case ACK =>
perform_ack(frame, uow)
case _ => throw new java.lang.AssertionError("assertion failed: only
send or ack frames are transactional")
}
Modified:
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala?rev=1023000&r1=1022999&r2=1023000&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
Fri Oct 15 16:27:30 2010
@@ -22,20 +22,83 @@ import org.apache.activemq.apollo.util.F
import org.apache.activemq.apollo.broker.{Broker, BrokerFactory}
class StompTest extends FunSuiteSupport with ShouldMatchers {
+ var broker: Broker = null
- var broker:Broker = null
+
+ test("Stomp 1.0 CONNECT") {
+ val client = new StompClient
+ client.open("localhost", 61613)
+
+ client.send(
+ "CONNECT\n" +
+ "\n")
+ val frame = client.receive()
+ frame should startWith("CONNECTED\n")
+ frame should include regex("""session:.+?\n""")
+ frame should include("version:1.0\n")
+ }
- test("Stomp Connect") {
+ test("Stomp 1.1 CONNECT") {
val client = new StompClient
client.open("localhost", 61613)
- client.send("""CONNECT
-""")
- val frame = client.receive()
- frame should startWith ("CONNECTED")
+ client.send(
+ "CONNECT\n" +
+ "accept-version:1.0,1.1\n" +
+ "host:default\n" +
+ "\n")
+ val frame = client.receive()
+ frame should startWith("CONNECTED\n")
+ frame should include regex("""session:.+?\n""")
+ frame should include("version:1.1\n")
}
+ test("Stomp 1.1 CONNECT /w STOMP Action") {
+ val client = new StompClient
+ client.open("localhost", 61613)
+
+ client.send(
+ "STOMP\n" +
+ "accept-version:1.0,1.1\n" +
+ "host:default\n" +
+ "\n")
+ val frame = client.receive()
+ frame should startWith("CONNECTED\n")
+ frame should include regex("""session:.+?\n""")
+ frame should include("version:1.1\n")
+ }
+
+ test("Stomp 1.1 CONNECT /w Version Fallback") {
+ val client = new StompClient
+ client.open("localhost", 61613)
+
+ client.send(
+ "CONNECT\n" +
+ "accept-version:1.0,10.0\n" +
+ "host:default\n" +
+ "\n")
+ val frame = client.receive()
+ frame should startWith("CONNECTED\n")
+ frame should include regex("""session:.+?\n""")
+ frame should include("version:1.0\n")
+ }
+
+ test("Stomp CONNECT /w invalid virtual host") {
+ val client = new StompClient
+ client.open("localhost", 61613)
+
+ client.send(
+ "CONNECT\n" +
+ "accept-version:1.0,1.1\n" +
+ "host:invalid\n" +
+ "\n")
+ val frame = client.receive()
+ frame should startWith("ERROR\n")
+ frame should include regex("""message:.+?\n""")
+ }
+
+
override protected def beforeAll() = {
val uri = "xml:classpath:activemq-stomp.xml"
info("Loading broker configuration from the classpath with URI: " + uri)
Modified:
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala?rev=1023000&r1=1022999&r2=1023000&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala
Fri Oct 15 16:27:30 2010
@@ -102,18 +102,18 @@ class StompRemoteConsumer extends Remote
ascii("/topic/" + destination.getName().toString());
}
- var frame = StompFrame(Stomp.Commands.CONNECT);
+ var frame = StompFrame(CONNECT);
outboundSink.offer(frame);
var headers: List[(AsciiBuffer, AsciiBuffer)] = Nil
- headers ::= (Stomp.Headers.Subscribe.DESTINATION, stompDestination)
- headers ::= (Stomp.Headers.Subscribe.ID, ascii("stomp-sub-" + name))
+ headers ::= (DESTINATION, stompDestination)
+ headers ::= (ID, ascii("stomp-sub-" + name))
if( persistent ) {
- headers ::= (Stomp.Headers.Subscribe.ACK_MODE,
Stomp.Headers.Subscribe.AckModeValues.CLIENT)
+ headers ::= (ACK_MODE, CLIENT)
}
- frame = StompFrame(Stomp.Commands.SUBSCRIBE, headers);
+ frame = StompFrame(SUBSCRIBE, headers);
outboundSink.offer(frame);
watchdog(messageCount)
}
@@ -121,17 +121,17 @@ class StompRemoteConsumer extends Remote
override def onTransportCommand(command: Object) = {
var frame = command.asInstanceOf[StompFrame]
frame match {
- case StompFrame(Responses.CONNECTED, headers, _, _) =>
- case StompFrame(Responses.MESSAGE, headers, content, _) =>
+ case StompFrame(CONNECTED, headers, _, _) =>
+ case StompFrame(MESSAGE, headers, content, _) =>
messageReceived();
// we client ack if persistent messages are being used.
if( persistent ) {
- var rc = List((Stomp.Headers.Ack.MESSAGE_ID,
frame.header(Stomp.Headers.Message.MESSAGE_ID)))
- outboundSink.offer(StompFrame(Stomp.Commands.ACK, rc));
+ var rc = List((MESSAGE_ID, frame.header(MESSAGE_ID)))
+ outboundSink.offer(StompFrame(ACK, rc));
}
- case StompFrame(Responses.ERROR, headers, content, _) =>
+ case StompFrame(ERROR, headers, content, _) =>
onFailure(new Exception("Server reported an error: " + frame.content));
case _ =>
onFailure(new Exception("Unexpected stomp command: " + frame.action));
@@ -155,7 +155,7 @@ class StompRemoteConsumer extends Remote
}
override def doStop() = {
- outboundSink.offer(StompFrame(Stomp.Commands.DISCONNECT));
+ outboundSink.offer(StompFrame(DISCONNECT));
dispatchQueue.dispatchAfter(5, TimeUnit.SECONDS, ^ {
transport.stop
stop
@@ -170,12 +170,12 @@ class StompRemoteProducer extends Remote
def send_next: Unit = {
var headers: List[(AsciiBuffer, AsciiBuffer)] = Nil
- headers ::= (Stomp.Headers.Send.DESTINATION, stompDestination);
+ headers ::= (DESTINATION, stompDestination);
if (property != null) {
headers ::= (ascii(property), ascii(property));
}
if( persistent ) {
- headers ::= ((Stomp.Headers.RECEIPT_REQUESTED, ascii("x")));
+ headers ::= ((RECEIPT_REQUESTED, ascii("x")));
}
// var p = this.priority;
// if (priorityMod > 0) {
@@ -183,7 +183,7 @@ class StompRemoteProducer extends Remote
// }
var content = ascii(createPayload());
- frame = StompFrame(Stomp.Commands.SEND, headers, BufferContent(content))
+ frame = StompFrame(SEND, headers, BufferContent(content))
drain()
}
@@ -222,21 +222,21 @@ class StompRemoteProducer extends Remote
} else {
stompDestination = ascii("/topic/" + destination.getName().toString());
}
- outboundSink.offer(StompFrame(Stomp.Commands.CONNECT));
+ outboundSink.offer(StompFrame(CONNECT));
send_next
}
override def onTransportCommand(command: Object) = {
var frame = command.asInstanceOf[StompFrame]
frame match {
- case StompFrame(Responses.RECEIPT, headers, _, _) =>
+ case StompFrame(RECEIPT, headers, _, _) =>
assert( persistent )
// we got the ack for the previous message we sent.. now send the next
one.
incrementMessageCount
send_next
- case StompFrame(Responses.CONNECTED, headers, _, _) =>
- case StompFrame(Responses.ERROR, headers, content, _) =>
+ case StompFrame(CONNECTED, headers, _, _) =>
+ case StompFrame(ERROR, headers, content, _) =>
onFailure(new Exception("Server reported an error: " +
frame.content.utf8));
case _ =>
onFailure(new Exception("Unexpected stomp command: " + frame.action));
@@ -244,7 +244,7 @@ class StompRemoteProducer extends Remote
}
override def doStop() = {
- outboundSink.offer(StompFrame(Stomp.Commands.DISCONNECT));
+ outboundSink.offer(StompFrame(DISCONNECT));
dispatchQueue.dispatchAfter(5, TimeUnit.SECONDS, ^ {
transport.stop
stop
Modified:
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala?rev=1023000&r1=1022999&r2=1023000&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala
Fri Oct 15 16:27:30 2010
@@ -27,7 +27,8 @@ import java.net.{ProtocolException, Inet
import java.lang.String._
import java.util.concurrent.TimeUnit._
import collection.mutable.Map
-import org.apache.activemq.apollo.stomp.{StompClient, Stomp}
+import org.apache.activemq.apollo.stomp.StompClient
+import org.apache.activemq.apollo.stomp.Stomp._
/**
*
@@ -289,10 +290,10 @@ object StompLoadClient {
while (!done.get) {
if( clientAck ) {
val msg = client.receiveAscii()
- val start = msg.indexOf(Stomp.Headers.Message.MESSAGE_ID)
+ val start = msg.indexOf(MESSAGE_ID)
assert( start >= 0 )
val end = msg.indexOf("\n", start)
- val msgId =
msg.slice(start+Stomp.Headers.Message.MESSAGE_ID.length+1, end).ascii
+ val msgId = msg.slice(start+MESSAGE_ID.length+1, end).ascii
client.send("""
ACK
message-id:"""+msgId+"""