Author: chirino
Date: Mon Nov 8 13:01:44 2010
New Revision: 1032562
URL: http://svn.apache.org/viewvc?rev=1032562&view=rev
Log:
Fixes problem where multiple receipts were being sent back to senders on topics
which had consumers.
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=1032562&r1=1032561&r2=1032562&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
Mon Nov 8 13:01:44 2010
@@ -25,10 +25,10 @@ import _root_.org.fusesource.hawtdispatc
import collection.JavaConversions
import org.apache.activemq.apollo.util._
import collection.mutable.{ListBuffer, HashMap}
-import org.apache.activemq.apollo.store.QueueRecord
import org.apache.activemq.apollo.dto.{PointToPointBindingDTO, BindingDTO}
import path.{PathFilter, PathMap}
import scala.collection.immutable.List
+import org.apache.activemq.apollo.store.{StoreUOW, QueueRecord}
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -404,6 +404,7 @@ case class DeliveryProducerRoute(val rou
// Dispatch.
//
+ var pendingAck: (StoreUOW)=>Unit = null
var overflow:Delivery=null
var overflowSessions = List[DeliverySession]()
var refiller:Runnable=null
@@ -416,42 +417,49 @@ case class DeliveryProducerRoute(val rou
} else {
// Do we need to store the message if we have a matching consumer?
- delivery.message.retain
+ pendingAck = delivery.ack
+ val copy = delivery.copy
+ copy.message.retain
+
targets.foreach { target=>
// only deliver to matching consumers
- if( target.consumer.matches(delivery) ) {
+ if( target.consumer.matches(copy) ) {
- if( delivery.storeKey == -1L && target.consumer.is_persistent &&
delivery.message.persistent ) {
- if( delivery.uow==null ) {
- delivery.uow = router.host.store.createStoreUOW
+ if( copy.storeKey == -1L && target.consumer.is_persistent &&
copy.message.persistent ) {
+ if( copy.uow==null ) {
+ copy.uow = router.host.store.createStoreUOW
} else {
- delivery.uow.retain
+ copy.uow.retain
}
- delivery.storeKey =
delivery.uow.store(delivery.createMessageRecord)
+ copy.storeKey = copy.uow.store(copy.createMessageRecord)
}
- if( !target.offer(delivery) ) {
+ if( !target.offer(copy) ) {
overflowSessions ::= target
}
}
}
if( overflowSessions!=Nil ) {
- overflow = delivery
+ overflow = copy
} else {
- delivered(delivery)
+ delivered(copy)
}
true
}
}
private def delivered(delivery: Delivery): Unit = {
- if (delivery.ack != null) {
+ if (pendingAck != null) {
if (delivery.uow != null) {
- delivery.uow.setDisposer(^ {delivery.ack(null)})
+ delivery.uow.setDisposer(^ {
+ pendingAck(null)
+ pendingAck=null
+ })
} else {
- delivery.ack(null)
+ pendingAck(null)
+ pendingAck==null
}
}
if (delivery.uow != null) {
Modified:
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala?rev=1032562&r1=1032561&r2=1032562&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
Mon Nov 8 13:01:44 2010
@@ -34,6 +34,7 @@ class StompTestSupport extends FunSuiteS
}
var client = new StompClient
+ var clients = List[StompClient]()
override protected def afterAll() = {
broker.stop
@@ -41,7 +42,8 @@ class StompTestSupport extends FunSuiteS
override protected def afterEach() = {
super.afterEach
- client.close
+ clients.foreach(_.close)
+ clients = Nil
}
def connect(version:String, c: StompClient = client) = {
@@ -63,6 +65,7 @@ class StompTestSupport extends FunSuiteS
frame should startWith("CONNECTED\n")
frame should include regex("""session:.+?\n""")
frame should include("version:"+version+"\n")
+ clients ::= c
c
}
@@ -399,6 +402,58 @@ class StompDestinationTest extends Stomp
}
}
+class StompReceiptTest extends StompTestSupport {
+
+ test("Receipts on SEND to unconsummed topic") {
+ connect("1.1")
+
+ def put(id:Int) = {
+ client.write(
+ "SEND\n" +
+ "destination:/topic/receipt-test\n" +
+ "receipt:"+id+"\n" +
+ "\n" +
+ "message:"+id+"\n")
+ }
+
+ put(1)
+ put(2)
+ wait_for_receipt("1")
+ wait_for_receipt("2")
+
+
+ }
+
+ test("Receipts on SEND to a consumed topic") {
+ connect("1.1")
+
+ def put(id:Int) = {
+ client.write(
+ "SEND\n" +
+ "destination:/topic/receipt-test\n" +
+ "receipt:"+id+"\n" +
+ "\n" +
+ "message:"+id+"\n")
+ }
+
+ // start a consumer on a different connection
+ var consumer = new StompClient
+ connect("1.1", consumer)
+ consumer.write(
+ "SUBSCRIBE\n" +
+ "destination:/topic/receipt-test\n" +
+ "id:0\n" +
+ "receipt:0\n" +
+ "\n")
+ wait_for_receipt("0", consumer)
+
+ put(1)
+ put(2)
+ wait_for_receipt("1")
+ wait_for_receipt("2")
+
+ }
+}
class StompTransactionTest extends StompTestSupport {