Author: chirino
Date: Tue Dec 21 15:33:59 2010
New Revision: 1051530
URL: http://svn.apache.org/viewvc?rev=1051530&view=rev
Log:
protect against an empty range load.
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1051530&r1=1051529&r2=1051530&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Tue Dec 21 15:33:59 2010
@@ -1215,41 +1215,45 @@ class QueueEntry(val queue:Queue, val se
if( !loading ) {
loading = true
queue.host.store.listQueueEntries(queue.id, seq, last) { records =>
- queue.dispatchQueue {
+ if( !records.isEmpty ) {
+ queue.dispatchQueue {
- var item_count=0
- var size_count=0
+ var item_count=0
+ var size_count=0
- val tmpList = new LinkedNodeList[QueueEntry]()
- records.foreach { record =>
- val entry = new QueueEntry(queue, record.queueSeq).init(record)
- tmpList.addLast(entry)
- item_count += 1
- size_count += record.size
- }
-
- // we may need to adjust the enqueue count if entries
- // were dropped at the store level
- var item_delta = (count - item_count)
- val size_delta: Int = size - size_count
-
- if ( item_delta!=0 || size_delta!=0 ) {
- info("Detected store change in range %d to %d. %d message(s) and %d bytes", seq, last, item_delta, size_delta)
- queue.enqueue_item_counter += item_delta
- queue.enqueue_size_counter += size_delta
- }
+ val tmpList = new LinkedNodeList[QueueEntry]()
+ records.foreach { record =>
+ val entry = new QueueEntry(queue, record.queueSeq).init(record)
+ tmpList.addLast(entry)
+ item_count += 1
+ size_count += record.size
+ }
+
+ // we may need to adjust the enqueue count if entries
+ // were dropped at the store level
+ var item_delta = (count - item_count)
+ val size_delta: Int = size - size_count
+
+ if ( item_delta!=0 || size_delta!=0 ) {
+ info("Detected store change in range %d to %d. %d message(s) and %d bytes", seq, last, item_delta, size_delta)
+ queue.enqueue_item_counter += item_delta
+ queue.enqueue_size_counter += size_delta
+ }
+
+ linkAfter(tmpList)
+ val next = getNext
+
+ // move the subs to the first entry that we just loaded.
+ parked.foreach(_.advance(next))
+ next :::= parked
+ queue.trigger_swap
- linkAfter(tmpList)
- val next = getNext
+ unlink
- // move the subs to the first entry that we just loaded.
- parked.foreach(_.advance(next))
- next :::= parked
- queue.trigger_swap
-
- unlink
-
- // TODO: refill prefetches
+ // TODO: refill prefetches
+ }
+ } else {
+ warn("range load failed")
}
}
}