Author: chirino
Date: Fri Jun 22 15:37:45 2012
New Revision: 1352930
URL: http://svn.apache.org/viewvc?rev=1352930&view=rev
Log:
Fixes APLO-211: Incorrect dequeued counter for topics
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala?rev=1352930&r1=1352929&r2=1352930&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
Fri Jun 22 15:37:45 2012
@@ -439,7 +439,6 @@ class Topic(val router:LocalRouter, val
def unbind (consumer:DeliveryConsumer, persistent:Boolean) = {
for(proxy <- consumers.remove(consumer)) {
- add_dequeue_counters(topic_metrics, proxy.link)
val list = consumer_queues.remove(consumer) match {
case Some(queue) =>
queue.unbind(List(consumer))
@@ -465,6 +464,7 @@ class Topic(val router:LocalRouter, val
}
List(queue)
case None =>
+ add_dequeue_counters(topic_metrics, proxy.link)
List(consumer)
}
producers.keys.foreach({ r=>
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala?rev=1352930&r1=1352929&r2=1352930&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala
Fri Jun 22 15:37:45 2012
@@ -141,6 +141,8 @@ class BrokerFunSuiteSupport extends FunS
Option(broker.web_server).flatMap(_.uris().find(_.getScheme == scheme)).get
}
+ def json(value:Any) =
org.apache.activemq.apollo.dto.JsonCodec.encode(value).ascii().toString; // JsonCodec-encode `value`, return the ascii() rendering as a String
+
}
class MultiBrokerTestSupport extends FunSuiteSupport {
Modified:
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala?rev=1352930&r1=1352929&r2=1352930&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
Fri Jun 22 15:37:45 2012
@@ -21,13 +21,13 @@ import org.scalatest.BeforeAndAfterEach
import java.lang.String
import java.util.concurrent.TimeUnit._
import org.apache.activemq.apollo.util._
-import org.apache.activemq.apollo.dto.KeyStorageDTO
import java.util.concurrent.atomic.AtomicLong
import FileSupport._
import java.net.InetSocketAddress
import java.nio.channels.DatagramChannel
import org.fusesource.hawtbuf.AsciiBuffer
import org.apache.activemq.apollo.broker._
+import org.apache.activemq.apollo.dto.{TopicStatusDTO, KeyStorageDTO}
class StompTestSupport extends BrokerFunSuiteSupport with ShouldMatchers with
BeforeAndAfterEach {
@@ -72,6 +72,16 @@ class StompTestSupport extends BrokerFun
c
}
+ def disconnect(c: StompClient = client) = { // sends DISCONNECT with a receipt header on `c`, then closes `c`
+ val rid = receipt_counter.incrementAndGet() // fresh receipt id for this DISCONNECT frame
+ c.write( // write on the connection being disconnected, not the default `client`
+ "DISCONNECT\n" +
+ "receipt:"+rid+"\n" +
+ "\n")
+ wait_for_receipt(""+rid, c) // same id, same connection the frame was written to
+ close(c)
+ }
+
def close(c: StompClient = client) = c.close()
val receipt_counter = new AtomicLong()
@@ -171,6 +181,25 @@ class StompTestSupport extends BrokerFun
*/
class StompMetricsTest extends StompTestSupport {
+ test("slow_consumer_policy='queue' metrics stay consistent on consumer close
(APLO-211)") {
+ connect("1.1")
+
+ subscribe("0", "/topic/queued.APLO-211", "client");
+ async_send("/topic/queued.APLO-211", 1)
+ assert_received(1)(true)
+
+ val stat1 = topic_status("queued.APLO-211").metrics // metrics snapshot taken before disconnecting
+ disconnect()
+
+ within(3, SECONDS) {
+ val stat2 = topic_status("queued.APLO-211").metrics // metrics snapshot taken after disconnecting
+ stat2.producer_count should be(stat1.producer_count-1)
+ stat2.consumer_count should be(stat1.consumer_count-1)
+ stat2.enqueue_item_counter should be(stat1.enqueue_item_counter)
+ stat2.dequeue_item_counter should be(stat1.dequeue_item_counter) // APLO-211: dequeue counter must be unchanged by the disconnect
+ }
+ }
+
test("Deleted qeueus are removed to aggregate queue-stats") {
connect("1.1")