Author: chirino
Date: Thu Feb 16 20:32:17 2012
New Revision: 1245159
URL: http://svn.apache.org/viewvc?rev=1245159&view=rev
Log:
Fixes APLO-157 : Unsubscribing durable subscriptions is not successful
Modified:
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
Modified:
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1245159&r1=1245158&r2=1245159&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
Thu Feb 16 20:32:17 2012
@@ -1311,9 +1311,25 @@ class StompProtocolHandler extends Proto
consumers.get(id) match {
case None=>
- die("The subscription '%s' not found.".format(id))
+ if( persistent ) {
+ // The client wants to delete a durable sub that it has not subscribed
+ // to in this session, so there is no local consumer registered for it.
+ host.dispatch_queue {
+ var addresses =
Array[DestinationAddress](SubscriptionAddress(destination_parser.decode_path(decode_header(id)),
null, Array[BindAddress]()))
+ host.router.delete(addresses, security_context) match {
+ case Some(error)=>
+ dispatchQueue {
+ async_die(error)
+ }
+ case None =>
+ send_receipt(headers)
+ }
+ }
+ } else {
+ die("The subscription '%s' not found.".format(id))
+ }
+
case Some(consumer)=>
-
// consumer gets disposed after all producer stop sending to it...
consumer.setDisposer(^{ send_receipt(headers) })
consumers -= id
Modified:
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala?rev=1245159&r1=1245158&r2=1245159&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
Thu Feb 16 20:32:17 2012
@@ -125,12 +125,13 @@ class StompTestSupport extends FunSuiteS
wait_for_receipt(""+rid, c)
}
- def unsubscribe(id:String, c: StompClient = client) = {
+ def unsubscribe(id:String, headers:String="", c: StompClient=client) = {
val rid = receipt_counter.incrementAndGet()
client.write(
"UNSUBSCRIBE\n" +
"id:"+id+"\n" +
"receipt:"+rid+"\n" +
+ headers+
"\n")
wait_for_receipt(""+rid, c)
}
@@ -1182,6 +1183,31 @@ class DurableSubscriptionOnLevelDBTest e
override val broker_config_uri: String =
"xml:classpath:apollo-stomp-leveldb.xml"
+ test("You connection then unsubscribe form existing durable sub (APLO-157)")
{
+ connect("1.1")
+ subscribe("APLO-157", "/topic/APLO-157", "auto", true)
+ client.close()
+
+ // Make sure the durable sub exists.
+ connect("1.1")
+ sync_send("/topic/APLO-157", "1")
+ subscribe("APLO-157", "/topic/APLO-157", "client", true)
+ assert_received("1")
+ client.close()
+
+ // Delete the durable sub..
+ connect("1.1")
+ unsubscribe("APLO-157", "persistent:true\n")
+ client.close()
+
+ // Make sure the durable sub does not exist.
+ connect("1.1")
+ subscribe("APLO-157", "/topic/APLO-157", "client", true)
+ async_send("/topic/APLO-157", "2")
+ assert_received("2")
+ unsubscribe("APLO-157", "persistent:true\n")
+
+ }
test("Can create dsubs with dots in them") {
connect("1.1")