Author: chirino
Date: Fri May 18 15:17:12 2012
New Revision: 1340125
URL: http://svn.apache.org/viewvc?rev=1340125&view=rev
Log:
Improving APLO-199 fix. Previous fix attempt cause a different test case to
fail.
Modified:
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
Modified:
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1340125&r1=1340124&r2=1340125&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
Fri May 18 15:17:12 2012
@@ -214,7 +214,9 @@ class StompProtocolHandler extends Proto
if( delivery.ack!=null ) {
delivery.ack(Consumed, null)
}
- credit_window_source.merge((delivery.size, 1))
+ if( !dead ) {
+ credit_window_source.merge((delivery.size, 1))
+ }
}
}
@@ -390,10 +392,7 @@ class StompProtocolHandler extends Proto
val consumer_sink = sink_manager.open()
val credit_window_filter = new
CreditWindowFilter[Delivery](consumer_sink.map { delivery =>
-
- if( !dead ) {
- ack_handler.track(delivery)
- }
+ ack_handler.track(delivery)
val message = delivery.message
var frame = if( message.protocol eq StompProtocol ) {
Modified:
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala?rev=1340125&r1=1340124&r2=1340125&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
Fri May 18 15:17:12 2012
@@ -543,8 +543,7 @@ class StompPersistentQueueTest extends S
var counter = 0
for( i <- 0 until 100 ) {
connect("1.1")
- // Use exclusive to avoid 2 concurrent subs (disconnect is async..)
- subscribe("1", "/queue/BIGQUEUE", "client", false, "exclusive:true\n",
false)
+ subscribe("1", "/queue/BIGQUEUE", "client", false, "", false)
for( j <- 0 until 100 ) {
assert_received("message #"+counter)(true)
counter+=1
@@ -555,7 +554,10 @@ class StompPersistentQueueTest extends S
"\n")
wait_for_receipt("disco", client, true)
client.close
- Thread.sleep(200)
+ within(2, SECONDS) {
+ val status = queue_status("BIGQUEUE")
+ status.consumers.size() should be(0)
+ }
}
connect("1.1")