Author: chirino
Date: Mon Oct 18 18:53:17 2010
New Revision: 1023948
URL: http://svn.apache.org/viewvc?rev=1023948&view=rev
Log:
Added the Stomp 1.1 heart-beat feature
Modified:
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompClient.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
Modified:
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala?rev=1023948&r1=1023947&r2=1023948&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
Mon Oct 18 18:53:17 2010
@@ -203,7 +203,12 @@ class StompCodec extends ProtocolCodec w
ProtocolCodec.BufferState.FULL
} else {
val was_empty = is_empty
- encode(command.asInstanceOf[StompFrame], next_write_buffer);
+ command match {
+ case buffer:Buffer=>
+ buffer.writeTo(next_write_buffer.asInstanceOf[DataOutput])
+ case frame:StompFrame=>
+ encode(frame, next_write_buffer);
+ }
if( was_empty ) {
ProtocolCodec.BufferState.WAS_EMPTY
} else {
Modified:
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala?rev=1023948&r1=1023947&r2=1023948&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
Mon Oct 18 18:53:17 2010
@@ -398,6 +398,7 @@ object Stomp {
val REQUEST_ID = ascii("request-id")
val ACCEPT_VERSION = ascii("accept-version")
val HOST = ascii("host")
+ val HEART_BEAT = ascii("heart-beat")
val MESSAGE_HEADER = ascii("message")
val VERSION = ascii("version")
@@ -414,6 +415,7 @@ object Stomp {
val INDIVIDUAL = ascii("client-individual")
val V1_0 = ascii("1.0")
val V1_1 = ascii("1.1")
+ val DEFAULT_HEAT_BEAT = ascii("0,0")
val SUPPORTED_PROTOCOL_VERSIONS = Set(V1_0,V1_1)
Modified:
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=1023948&r1=1023947&r2=1023948&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
Mon Oct 18 18:53:17 2010
@@ -93,17 +93,86 @@ object StompProtocol extends StompProtoc
}
-object StompProtocolHandler extends Log
+
+class HeartBeatMonitor() {
+
+ var transport:Transport = _
+ var write_interval = 0L
+ var read_interval = 0L
+
+ var on_keep_alive = ()=>{}
+ var on_dead = ()=>{}
+
+ var session = 0
+
+ def schedual_check_writes(session:Int):Unit = {
+ val last_write_counter = transport.getProtocolCodec.getWriteCounter()
+ transport.getDispatchQueue.after(write_interval, TimeUnit.MILLISECONDS) {
+ if( this.session == session ) {
+ if( last_write_counter==transport.getProtocolCodec.getWriteCounter ) {
+ on_keep_alive()
+ }
+ schedual_check_writes(session)
+ }
+ }
+ }
+
+ def schedual_check_reads(session:Int):Unit = {
+ val last_read_counter = transport.getProtocolCodec.getReadCounter()
+ transport.getDispatchQueue.after(read_interval, TimeUnit.MILLISECONDS) {
+ if( this.session == session ) {
+ if( last_read_counter==transport.getProtocolCodec.getReadCounter ) {
+ on_dead()
+ }
+ schedual_check_reads(session)
+ }
+ }
+ }
+
+ def start = {
+ session += 1
+ if( write_interval!=0 ) {
+ schedual_check_writes(session)
+ }
+ if( read_interval!=0 ) {
+ schedual_check_reads(session)
+ }
+ }
+
+ def stop = {
+ session += 1
+ }
+}
+
+object StompProtocolHandler extends Log {
+
+ // How long we hold a failed connection open so that the remote end
+ // can get the resulting error message.
+ val DEFAULT_DIE_DELAY = 5*1000L
+ var die_delay = DEFAULT_DIE_DELAY
+
+ // How often we can send heartbeats of the connection is idle.
+ val DEFAULT_OUTBOUND_HEARTBEAT = 100L
+ var outbound_heartbeat = DEFAULT_OUTBOUND_HEARTBEAT
+
+ // How often we want to get heartbeats from the peer if the connection is
idle.
+ val DEFAULT_INBOUND_HEARTBEAT = 10*1000L
+ var inbound_heartbeat = DEFAULT_INBOUND_HEARTBEAT
+
+}
+
+import StompProtocolHandler._
+
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
class StompProtocolHandler extends ProtocolHandler with DispatchLogging {
-
+
def protocol = "stomp"
override protected def log = StompProtocolHandler
-
+
protected def dispatchQueue:DispatchQueue = connection.dispatchQueue
class StompConsumer(val subscription_id:Option[AsciiBuffer], val
destination:Destination, val ackMode:AsciiBuffer, val selector:(AsciiBuffer,
BooleanExpression), val binding:BindingDTO) extends BaseRetained with
DeliveryConsumer {
@@ -115,7 +184,7 @@ class StompProtocolHandler extends Proto
dispatchQueue.release
})
- override def connection = Some(StompProtocolHandler.this.connection)
+ override def connection = Some(StompProtocolHandler.this.connection)
def matches(delivery:Delivery) = {
if( delivery.message.protocol eq StompProtocol ) {
@@ -170,7 +239,7 @@ class StompProtocolHandler extends Proto
true
}
}
-
+
def refiller = session.refiller
def refiller_=(value:Runnable) = { session.refiller=value }
@@ -189,16 +258,24 @@ class StompProtocolHandler extends Proto
private def queue = connection.dispatchQueue
var pendingAcks = HashMap[AsciiBuffer, (StoreUOW)=>Unit]()
+
+ var session_id:Option[AsciiBuffer] = None
+ var protocol_version:Option[AsciiBuffer] = None
+
+ var heart_beat_monitor:HeartBeatMonitor = new HeartBeatMonitor
+
override def onTransportConnected() = {
session_manager = new SinkMux[StompFrame](
MapSink(connection.transportSink){x=>x}, dispatchQueue, StompFrame)
connection_sink = new OverflowSink(session_manager.open(dispatchQueue));
connection_sink.refiller = ^{}
connection.transport.resumeRead
+
}
override def onTransportDisconnected() = {
if( !closed ) {
+ heart_beat_monitor.stop
closed=true;
producerRoutes.foreach{
case(_,route)=> host.router.disconnect(route)
@@ -223,56 +300,102 @@ class StompProtocolHandler extends Proto
override def onTransportCommand(command:Any) = {
try {
command match {
- case StompFrame(SEND, _, _, _) =>
- on_stomp_send(command.asInstanceOf[StompFrame])
- case StompFrame(ACK, headers, content, _) =>
- on_stomp_ack(command.asInstanceOf[StompFrame])
-
- case StompFrame(BEGIN, headers, content, _) =>
- on_stomp_begin(headers)
- case StompFrame(COMMIT, headers, content, _) =>
- on_stomp_commit(headers)
- case StompFrame(ABORT, headers, content, _) =>
- on_stomp_abort(headers)
-
- case StompFrame(SUBSCRIBE, headers, content, _) =>
- info("got command: %s", command)
- on_stomp_subscribe(headers)
-
- case StompFrame(STOMP, headers, _, _) =>
- info("got command: %s", command)
- on_stomp_connect(headers)
- case StompFrame(CONNECT, headers, _, _) =>
- info("got command: %s", command)
- on_stomp_connect(headers)
-
- case StompFrame(DISCONNECT, headers, content, _t) =>
- info("got command: %s", command)
- connection.stop
case s:StompCodec =>
// this is passed on to us by the protocol discriminator
// so we know which wire format is being used.
- case StompFrame(unknown, _, _, _) =>
- die("Unsupported STOMP command: "+unknown);
- case _ =>
- die("Unsupported command: "+command);
+ case frame:StompFrame=>
+
+ info("got command: %s", frame)
+ if( protocol_version eq None ) {
+ frame.action match {
+ case STOMP =>
+ on_stomp_connect(frame.headers)
+ case CONNECT =>
+ on_stomp_connect(frame.headers)
+ case DISCONNECT =>
+ connection.stop
+ case _ =>
+ die("Client must first send a connect frame");
+ }
+
+ } else {
+ frame.action match {
+ case SEND =>
+ on_stomp_send(frame)
+ case ACK =>
+ on_stomp_ack(frame)
+
+ case BEGIN =>
+ on_stomp_begin(frame.headers)
+ case COMMIT =>
+ on_stomp_commit(frame.headers)
+ case ABORT =>
+ on_stomp_abort(frame.headers)
+ case SUBSCRIBE =>
+ on_stomp_subscribe(frame.headers)
+
+ case DISCONNECT =>
+ connection.stop
+
+ case _ =>
+ die("Invalid frame: "+frame.action);
+ }
+ }
+
+ case _=>
+ warn("Internal Server Error: unexpected command type")
+ die("Internal Server Error");
}
} catch {
case e:Exception =>
- die("Unexpected Error", e.toString);
+ warn(e, "Internal Server Error")
+ die("Internal Server Error");
}
}
+ def on_stomp_connect(headers:HeaderMap):Unit = {
- var session_id:Option[AsciiBuffer] = None
- var protocol_version:Option[AsciiBuffer] = None
-
- def on_stomp_connect(headers:HeaderMap) = {
protocol_version = get(headers,
ACCEPT_VERSION).getOrElse(V1_0).split(COMMA).map(_.ascii).reverse.find{v=>
SUPPORTED_PROTOCOL_VERSIONS.contains(v)
}
+ val heart_beat = get(headers, HEART_BEAT).getOrElse(DEFAULT_HEAT_BEAT)
+ heart_beat.split(COMMA).map(_.ascii) match {
+ case Array(cx,cy) =>
+ try {
+ val can_send = cx.toString.toLong
+ val please_send = cy.toString.toLong
+
+ if( inbound_heartbeat>=0 && can_send > 0 ) {
+ heart_beat_monitor.read_interval = inbound_heartbeat.max(can_send)
+
+ // lets be a little forgiving to account to packet transmission
latency.
+ heart_beat_monitor.read_interval +=
heart_beat_monitor.read_interval.min(5000)
+
+ heart_beat_monitor.on_dead = () => {
+ die("Stale connection. Missed heartbeat.")
+ }
+ }
+ if( outbound_heartbeat>=0 && please_send > 0 ) {
+ heart_beat_monitor.write_interval =
outbound_heartbeat.max(please_send)
+ heart_beat_monitor.on_keep_alive = () => {
+ connection.transport.offer(NEWLINE_BUFFER)
+ }
+ }
+
+ heart_beat_monitor.transport = connection.transport
+ heart_beat_monitor.start
+
+ } catch {
+ case x:NumberFormatException=>
+ die("Invalid heart-beat header: "+heart_beat)
+ return
+ }
+ case _ =>
+ die("Invalid heart-beat header: "+heart_beat)
+ return
+ }
protocol_version match {
case None =>
@@ -281,21 +404,25 @@ class StompProtocolHandler extends Proto
_die((MESSAGE_HEADER, ascii("version not supported"))::
(VERSION, ascii(supported_versions))::Nil,
"Supported protocol versions are %s".format(supported_versions))
+ return
case Some(x) =>
connection.transport.suspendRead
val host_header = get(headers, HOST)
- val cb: (VirtualHost)=>Unit = queue.wrap { (host)=>
-
+ val cb: (VirtualHost)=>Unit = (host)=>
+ queue {
if(host!=null) {
this.host=host
+ val outbound_heart_beat_header =
ascii("%d,%d".format(outbound_heartbeat,inbound_heartbeat))
session_id = Some(ascii(this.host.config.id +
":"+this.host.session_counter.incrementAndGet))
+
connection_sink.offer(
StompFrame(CONNECTED, List(
(VERSION, protocol_version.get),
- (SESSION, session_id.get)
+ (SESSION, session_id.get),
+ (HEART_BEAT, outbound_heart_beat_header)
)))
if( this.host.direct_buffer_pool!=null ) {
@@ -595,7 +722,7 @@ class StompProtocolHandler extends Proto
connection.transport.offer(StompFrame(ERROR, headers,
BufferContent(ascii(explained))) )
// TODO: if there are too many open connections we should just close the
connection
// without waiting for the error to get sent to the client.
- queue.after(5, TimeUnit.SECONDS) {
+ queue.after(die_delay, TimeUnit.MILLISECONDS) {
connection.stop()
}
}
Modified:
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompClient.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompClient.scala?rev=1023948&r1=1023947&r2=1023948&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompClient.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompClient.scala
Mon Oct 18 18:53:17 2010
@@ -71,13 +71,17 @@ import java.io._
}
def receive():String = {
+ var start = true;
val buffer = new BAOS()
var c = in.read
while( c >= 0 ) {
if( c==0 ) {
return new String(buffer.toByteArray, "UTF-8")
}
- buffer.write(c)
+ if( !start || c!= Stomp.NEWLINE) {
+ start = false
+ buffer.write(c)
+ }
c = in.read()
}
throw new EOFException()
Modified:
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala?rev=1023948&r1=1023947&r2=1023948&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
Mon Oct 18 18:53:17 2010
@@ -37,7 +37,7 @@ class StompTestSupport extends FunSuiteS
}
-class Stomp10Test extends StompTestSupport {
+class Stomp10ConnectTest extends StompTestSupport {
test("Stomp 1.0 CONNECT") {
val client = new StompClient
@@ -54,7 +54,7 @@ class Stomp10Test extends StompTestSuppo
}
-class Stomp11Test extends StompTestSupport {
+class Stomp11ConnectTest extends StompTestSupport {
test("Stomp 1.1 CONNECT") {
val client = new StompClient
@@ -130,4 +130,66 @@ class Stomp11Test extends StompTestSuppo
frame should include regex("""message:.+?\n""")
}
+}
+
+class Stomp11HeartBeatTest extends StompTestSupport {
+
+ test("Stomp 1.1 Broker sends heart-beat") {
+ val client = new StompClient
+ client.open("localhost", 61613)
+
+ client.send(
+ "CONNECT\n" +
+ "accept-version:1.1\n" +
+ "host:default\n" +
+ "heart-beat:0,1000\n" +
+ "\n")
+ val frame = client.receive()
+ frame should startWith("CONNECTED\n")
+ frame should include regex("""heart-beat:.+?\n""")
+
+ def heart_beat_after(time:Long) {
+ var start = System.currentTimeMillis
+ val c = client.in.read()
+ c should be === (Stomp.NEWLINE)
+ var end = System.currentTimeMillis
+ (end - start) should be >= time
+ }
+ client.in.read()
+ heart_beat_after(900)
+ heart_beat_after(900)
+ }
+
+
+ test("Stomp 1.1 Broker times out idle connection") {
+ StompProtocolHandler.inbound_heartbeat = 1000L
+ try {
+ val client = new StompClient
+ client.open("localhost", 61613)
+
+ client.send(
+ "CONNECT\n" +
+ "accept-version:1.1\n" +
+ "host:default\n" +
+ "heart-beat:1000,0\n" +
+ "\n")
+
+ var frame = client.receive()
+ frame should startWith("CONNECTED\n")
+ frame should include regex("""heart-beat:.+?\n""")
+
+ var start = System.currentTimeMillis
+
+ frame = client.receive()
+ frame should startWith("ERROR\n")
+ frame should include regex("""message:.+?\n""")
+
+ var end = System.currentTimeMillis
+ (end - start) should be >= 1000L
+
+ } finally {
+ StompProtocolHandler.inbound_heartbeat =
StompProtocolHandler.DEFAULT_INBOUND_HEARTBEAT
+ }
+ }
+
}
\ No newline at end of file