Author: chirino
Date: Wed Oct 20 14:12:41 2010
New Revision: 1025592
URL: http://svn.apache.org/viewvc?rev=1025592&view=rev
Log:
Fixing nack/redelivery bug.
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1025592&r1=1025591&r2=1025592&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Oct 20 14:12:41 2010
@@ -29,7 +29,7 @@ import collection.mutable.ListBuffer
import org.apache.activemq.apollo.store._
import org.apache.activemq.apollo.util._
import org.apache.activemq.apollo.util.list._
-import org.fusesource.hawtdispatch.{ListEventAggregator, DispatchQueue, BaseRetained}
+import org.fusesource.hawtdispatch.{Dispatch, ListEventAggregator, DispatchQueue, BaseRetained}
object Queue extends Log {
val subcsription_counter = new AtomicInteger(0)
@@ -377,7 +377,7 @@ class Queue(val host: VirtualHost, var i
// Combine flushed items into flushed ranges
if( flushed_items > tune_flush_range_size*2 ) {
- println("Looking for flushed entries to combine")
+ debug("Looking for flushed entries to combine")
var distance_from_sub = tune_flush_range_size;
var cur = entries.getHead
@@ -405,7 +405,7 @@ class Queue(val host: VirtualHost, var i
cur = next
}
- println("combined "+combine_counter+" entries")
+ debug("combined %d entries", combine_counter)
}
@@ -880,7 +880,6 @@ class QueueEntry(val queue:Queue, val se
*/
override def dispatch() = {
if( parked != Nil ) {
-
advance(parked)
parked = Nil
true
@@ -1370,10 +1369,6 @@ class Subscription(queue:Queue) extends
pos -= this
pos = null
- session.refiller = null
- session.close
- session = null
-
// nack all the acquired entries.
var next = acquired.getHead
while( next !=null ) {
@@ -1381,6 +1376,10 @@ class Subscription(queue:Queue) extends
next = next.getNext
cur.nack // this unlinks the entry.
}
+
+ session.refiller = null
+ session.close
+ session = null
}
/**
@@ -1411,8 +1410,10 @@ class Subscription(queue:Queue) extends
*/
def rewind(value:QueueEntry):Unit = {
assert(value!=null)
+ pos -= this
+ value ::= this
pos = value
- session.refiller = pos
+ session.refiller = value
queue.dispatchQueue << value // queue up the entry to get dispatched..
}