Author: chirino
Date: Fri Dec 17 15:25:36 2010
New Revision: 1050420
URL: http://svn.apache.org/viewvc?rev=1050420&view=rev
Log:
Got queue browsers working from stomp.
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1050420&r1=1050419&r2=1050420&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
Fri Dec 17 15:25:36 2010
@@ -538,9 +538,7 @@ class Queue(val host: VirtualHost, var i
for (consumer <- values) {
all_subscriptions.get(consumer) match {
case Some(subscription) =>
- all_subscriptions -= consumer
subscription.close
- addCapacity( -tune_consumer_buffer )
case None =>
}
@@ -1011,7 +1009,7 @@ class QueueEntry(val queue:Queue, val se
} else {
if (sub.offer(delivery)) {
// advance: accepted...
- advancing == sub
+ advancing += sub
} else {
// hold back: flow controlled
heldBack += sub
@@ -1316,6 +1314,8 @@ class Subscription(val queue:Queue, val
// This opens up the consumer
pos = queue.head_entry;
+ assert(pos!=null)
+
session = consumer.connect(this)
session.refiller = pos
queue.head_entry ::= this
@@ -1327,22 +1327,27 @@ class Subscription(val queue:Queue, val
}
def close() = {
- pos -= this
- pos = null
-
- // nack all the acquired entries.
- var next = acquired.getHead
- while( next !=null ) {
- val cur = next;
- next = next.getNext
- cur.nack // this unlinks the entry.
- }
-
- session.refiller = NOOP
- session.close
- session = null
+ if(pos!=null) {
+ pos -= this
+ pos = null
+
+ queue.all_subscriptions -= consumer
+ queue.addCapacity( - queue.tune_consumer_buffer )
+
+ // nack all the acquired entries.
+ var next = acquired.getHead
+ while( next !=null ) {
+ val cur = next;
+ next = next.getNext
+ cur.nack // this unlinks the entry.
+ }
+
+ session.refiller = NOOP
+ session.close
+ session = null
- queue.trigger_swap
+ queue.trigger_swap
+ } else {}
}
/**
@@ -1352,9 +1357,6 @@ class Subscription(val queue:Queue, val
def advance(value:QueueEntry):Unit = {
assert(value!=null)
- if( pos == null ) {
- assert(pos!=null)
- }
advanced_size += pos.size
@@ -1363,6 +1365,9 @@ class Subscription(val queue:Queue, val
if( tail_parked ) {
tail_parkings += 0
+ if( browser ) {
+ close
+ }
}
}
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala?rev=1050420&r1=1050419&r2=1050420&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
Fri Dec 17 15:25:36 2010
@@ -187,17 +187,24 @@ class SinkMux[T](val downstream:Sink[T],
}
}
- def open(producer_queue:DispatchQueue):Sink[T] = {
+ def open(producer_queue:DispatchQueue,allow_overflow:Boolean=false):Sink[T]
= {
val session = createSession(producer_queue, session_max_credits)
sessions ::= session
- session
+ if( allow_overflow ) {
+ new OverflowSink(session)
+ } else {
+ session
+ }
}
- def close(session:Sink[T]) = {
- val s = session.asInstanceOf[Session[T]]
- sessions = sessions.filterNot( _ == s )
- s.producer_queue {
- s.close
+ def close(session:Sink[T]):Unit = {
+ session match {
+ case s:OverflowSink[T] => close(s.downstream)
+ case s:Session[T] =>
+ sessions = sessions.filterNot( _ == s )
+ s.producer_queue {
+ s.close
+ }
}
}
@@ -267,10 +274,12 @@ class Session[T](val producer_queue:Disp
}
def close = {
- assert(getCurrentQueue eq producer_queue)
- credit_adder.release
- downstream.release
- closed=true
+ if( !closed ) {
+ closed=true
+ assert(getCurrentQueue eq producer_queue)
+ credit_adder.release
+ downstream.release
+ }
}
}
Modified:
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala?rev=1050420&r1=1050419&r2=1050420&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
Fri Dec 17 15:25:36 2010
@@ -416,11 +416,15 @@ object Stomp {
val SESSION = ascii("session")
val RESPONSE_ID = ascii("response-id")
+ val BROWSER = ascii("browser")
+ val EXCLUSIVE = ascii("exclusive")
+
///////////////////////////////////////////////////////////////////
// Common Values
///////////////////////////////////////////////////////////////////
val TRUE = ascii("true")
val FALSE = ascii("false")
+ val END = ascii("end")
val ACK_MODE_AUTO = ascii("auto")
val ACK_MODE_NONE = ascii("none")
Modified:
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1050420&r1=1050419&r2=1050420&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
Fri Dec 17 15:25:36 2010
@@ -197,16 +197,18 @@ class StompProtocolHandler extends Proto
}
}
- class StompConsumer(val subscription_id:Option[AsciiBuffer], val
destination:Destination, val ack_handler:AckHandler, val selector:(String,
BooleanExpression), val binding:BindingDTO) extends BaseRetained with
DeliveryConsumer {
- val dispatchQueue = StompProtocolHandler.this.dispatchQueue
+ class StompConsumer(
+ val subscription_id:Option[AsciiBuffer],
+ val destination:Destination,
+ val ack_handler:AckHandler,
+ val selector:(String, BooleanExpression),
+ val binding:BindingDTO,
+ override val browser:Boolean
+ ) extends BaseRetained with DeliveryConsumer {
- dispatchQueue.retain
- setDisposer(^{
- session_manager.release
- dispatchQueue.release
- })
+ val dispatchQueue = StompProtocolHandler.this.dispatchQueue
override def connection = Some(StompProtocolHandler.this.connection)
@@ -225,16 +227,48 @@ class StompProtocolHandler extends Proto
}
def connect(p:DeliveryProducer) = new DeliverySession {
+
+ // This session object should only be used from the dispatch queue
context
+ // of the producer.
+
retain
def producer = p
def consumer = StompConsumer.this
+ var closed = false
val session = session_manager.open(producer.dispatchQueue)
def close = {
- session_manager.close(session)
- release
+ assert(getCurrentQueue == producer.dispatchQueue)
+ if( !closed ) {
+ closed = true
+ if( browser ) {
+ // Then send the end of browse message.
+ var frame = StompFrame(MESSAGE, (BROWSER, END)::Nil,
BufferContent(EMPTY_BUFFER))
+ if( subscription_id != None ) {
+ frame = frame.append_headers((SUBSCRIPTION,
subscription_id.get)::Nil)
+ }
+
+ if( session.full ) {
+ // session is full so use an overflow sink so to hold the
message,
+ // and then trigger closing the session once it empties out.
+ val sink = new OverflowSink(session)
+ sink.refiller = ^{
+ session_manager.close(session)
+ release
+ }
+ sink.offer(frame)
+ } else {
+ session.offer(frame)
+ session_manager.close(session)
+ release
+ }
+ } else {
+ session_manager.close(session)
+ release
+ }
+ }
}
// Delegate all the flow control stuff to the session
@@ -736,6 +770,7 @@ class StompProtocolHandler extends Proto
val topic = destination.domain == Router.TOPIC_DOMAIN
var persistent = get(headers, PERSISTENT).map( _ == TRUE ).getOrElse(false)
+ var browser = get(headers, BROWSER).map( _ == TRUE ).getOrElse(false)
val ack = get(headers, ACK_MODE) match {
case None=> new AutoAckHandler
@@ -788,7 +823,7 @@ class StompProtocolHandler extends Proto
}
}
- val consumer = new StompConsumer(subscription_id, destination, ack,
selector, binding);
+ val consumer = new StompConsumer(subscription_id, destination, ack,
selector, binding, browser);
consumers += (id -> consumer)
if( binding==null ) {
Modified:
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala?rev=1050420&r1=1050419&r2=1050420&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
Fri Dec 17 15:25:36 2010
@@ -225,6 +225,61 @@ class Stomp11HeartBeatTest extends Stomp
class StompDestinationTest extends StompTestSupport {
+ test("Queue browsers don't consume the messages") {
+ connect("1.1")
+
+ def put(id:Int) = {
+ client.write(
+ "SEND\n" +
+ "destination:/queue/browsing\n" +
+ "receipt:0\n" +
+ "\n" +
+ "message:"+id+"\n")
+ wait_for_receipt("0")
+ }
+
+ put(1)
+ put(2)
+ put(3)
+
+ // create a browser subscription.
+ client.write(
+ "SUBSCRIBE\n" +
+ "destination:/queue/browsing\n" +
+ "browser:true\n" +
+ "id:0\n" +
+ "\n")
+
+ def get(sub:Int, id:Int) = {
+ val frame = client.receive()
+ info(frame)
+ frame should startWith("MESSAGE\n")
+ frame should include ("subscription:%d\n".format(sub))
+ frame should endWith regex("\n\nmessage:%d\n".format(id))
+ }
+ get(0,1)
+ get(0,2)
+ get(0,3)
+
+ // Should get a browse end message
+ val frame = client.receive()
+ frame should startWith("MESSAGE\n")
+ frame should include ("subscription:0\n")
+ frame should include ("browser:end\n")
+
+ // create a regular subscription.
+ client.write(
+ "SUBSCRIBE\n" +
+ "destination:/queue/browsing\n" +
+ "id:1\n" +
+ "\n")
+
+ get(1,1)
+ get(1,2)
+ get(1,3)
+
+ }
+
test("Queue order preserved") {
connect("1.1")
@@ -409,6 +464,7 @@ class StompDestinationTest extends Stomp
get(1)
get(3)
}
+
}
class StompSslDestinationTest extends StompDestinationTest {
Modified:
activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md?rev=1050420&r1=1050419&r2=1050420&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md
(original)
+++
activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md
Fri Dec 17 15:25:36 2010
@@ -718,6 +718,21 @@ ack mode to consume reliable messages. A
client which have not been acked when the client disconnects will get
redelivered to another subscribed client.
+### Browsing Subscriptions
+
+A normal subscription on a queue will consume messages so that no
+other subscription will get a copy of the message. If you want to
+browse all the messages on a queue in a non-destructive fashion, you
+can create a browsing subscription. To make a subscription a browsing
+subscription, just add the `browser:true` header. For example:
+
+ SUBSCRIBE
+ id:mysub
+ browser:true
+ destination:/queue/foo
+
+ ^@
+
### Topic Durable Subscriptions
A durable subscription is a queue which is subscribed to a topic so that
@@ -728,7 +743,8 @@ durable subscription and since it's back
will have the topic's messages load balanced across them.
To create or reattach to a durable subscription with STOMP, you uniquely name
-the durable subscription using the `id` header on the `SUBSCRIBE` frame and
+the durable subscription using the `id` header on the
+`SUBSCRIBE` frame and
also adding a `persistent:true` header. Example:
SUBSCRIBE