Author: chirino
Date: Thu Apr 14 16:27:37 2011
New Revision: 1092388
URL: http://svn.apache.org/viewvc?rev=1092388&view=rev
Log:
Simplify setting up the session refiller.
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1092388&r1=1092387&r2=1092388&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Thu Apr 14 16:27:37 2011
@@ -674,7 +674,8 @@ class QueueEntry(val queue:Queue, val se
* Dispatches this entry to the consumers and continues dispatching subsequent
* entries as long as the dispatch results in advancing in their dispatch position.
*/
- def run() = queue.dispatch_queue {
+ def run() = {
+ queue.assert_executing
var next = this;
while( next!=null && next.dispatch) {
next = next.getNext
@@ -1371,7 +1372,7 @@ class Subscription(val queue:Queue, val
assert(pos!=null)
session = consumer.connect(this)
- session.refiller = pos
+ session.refiller = dispatch_queue.runnable { pos.run }
queue.head_entry ::= this
queue.all_subscriptions += consumer -> this
@@ -1431,7 +1432,6 @@ class Subscription(val queue:Queue, val
advanced_size += pos.size
pos = value
- session.refiller = pos
if( tail_parked ) {
tail_parkings += 0
@@ -1450,7 +1450,6 @@ class Subscription(val queue:Queue, val
pos -= this
value ::= this
pos = value
- session.refiller = value
queue.dispatch_queue << value // queue up the entry to get dispatched..
}