Author: chirino
Date: Wed Mar 20 19:20:34 2013
New Revision: 1459006
URL: http://svn.apache.org/r1459006
Log:
Fixes APLO-313: Avoid blocking producers if consumers are not likely to catch
up within a few seconds.
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1459006&r1=1459005&r2=1459006&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
Wed Mar 20 19:20:34 2013
@@ -878,6 +878,7 @@ class Queue(val router: LocalRouter, val
// swap out messages.
cur = entries.getHead.getNext
var dropping_head_entries = is_topic_queue
+ var distance_from_last_prefetch = 0L
while( cur!=null ) {
val next = cur.getNext
if ( dropping_head_entries ) {
@@ -897,7 +898,9 @@ class Queue(val router: LocalRouter, val
if( cur.prefetched ) {
// Prefteched entries need to get loaded..
cur.load(consumer_swapped_in)
+ distance_from_last_prefetch = 0
} else {
+
// This is a non-prefetched entry.. entires ahead and behind the
// consumer subscriptions.
val loaded = cur.as_loaded
@@ -911,8 +914,17 @@ class Queue(val router: LocalRouter, val
// about what gets swapped out..
if (cur.memory_space eq producer_swapped_in ) {
- // Entry will be used soon..
- cur.load(producer_swapped_in)
+ // If we think we can catch up in seconds.. lets keep it in
producer_swapped_in to
+ // pause the producer.
+ val max_distance = delivery_rate * 2;
+ if( distance_from_last_prefetch < max_distance ) {
+ // Looks like the entry will be used soon..
+ cur.load(producer_swapped_in)
+ } else {
+ // Does not look to be anywhere close to the consumer.. so
get
+ // rid of it asap.
+ cur.swap(true)
+ }
} else if ( cur.is_acquired ) {
// Entry was just used...
cur.load(consumer_swapped_in)
@@ -924,6 +936,8 @@ class Queue(val router: LocalRouter, val
}
}
}
+
+ distance_from_last_prefetch += cur.size
}
}
cur = next
@@ -971,12 +985,13 @@ class Queue(val router: LocalRouter, val
}
def swapped_out_size = queue_size - (producer_swapped_in.size +
consumer_swapped_in.size)
+ var delivery_rate = 0
def queue_maintenance:Unit = {
var elapsed = System.currentTimeMillis-now
now += elapsed
- var delivery_rate = 0
+ delivery_rate = 0
var avg_browser_delivery_rate = 0
var avg_sub_stall_ms = 0L