Author: chirino
Date: Sat Jan 21 04:21:11 2012
New Revision: 1234263
URL: http://svn.apache.org/viewvc?rev=1234263&view=rev
Log:
When a queue is being stopped, make sure you wait for any in flight message
store operations to complete before reporting the queue has been stopped.
This avoids an ordering issue where a queue could get deleted from a store but
then queue store operations are performed later.
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1234263&r1=1234262&r2=1234263&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
Sat Jan 21 04:21:11 2012
@@ -457,6 +457,8 @@ class Queue(val router: LocalRouter, val
}
}
+ var stop_listener_waiting_for_flush:Runnable = _
+
protected def _stop(on_completed: Runnable) = {
// Disconnect the producers..
producers.foreach { producer =>
@@ -469,15 +471,25 @@ class Queue(val router: LocalRouter, val
trigger_swap
- destination_dto match {
- case d:DurableSubscriptionDestinationDTO =>
-
DestinationMetricsSupport.add_destination_metrics(virtual_host.dead_dsub_metrics,
get_queue_metrics)
- case t:TopicDestinationDTO =>
- // metrics are taken care of by topic
- case _ =>
-
DestinationMetricsSupport.add_destination_metrics(virtual_host.dead_queue_metrics,
get_queue_metrics)
+ stop_listener_waiting_for_flush = on_completed
+ if( swapping_out_size==0 ) {
+ on_queue_flushed
+ }
+ }
+
+ def on_queue_flushed = {
+ if(stop_listener_waiting_for_flush!=null) {
+ destination_dto match {
+ case d:DurableSubscriptionDestinationDTO =>
+
DestinationMetricsSupport.add_destination_metrics(virtual_host.dead_dsub_metrics,
get_queue_metrics)
+ case t:TopicDestinationDTO =>
+ // metrics are taken care of by topic
+ case _ =>
+
DestinationMetricsSupport.add_destination_metrics(virtual_host.dead_queue_metrics,
get_queue_metrics)
+ }
+ stop_listener_waiting_for_flush.run()
+ stop_listener_waiting_for_flush = null
}
- on_completed.run
}
def might_unfill[T](func: =>T):T = {
@@ -1383,9 +1395,14 @@ class QueueEntry(val queue:Queue, val se
if(!storing) {
storing = true
delivery.uow.enqueue(toQueueEntryRecord)
+ queue.swapping_out_size+=size
delivery.uow.on_flush { canceled =>
queue.swap_out_completes_source.merge(^{
+ queue.swapping_out_size-=size
this.swapped_out(!canceled)
+ if( queue.swapping_out_size==0 ) {
+ queue.on_queue_flushed
+ }
})
}
}
@@ -1395,7 +1412,6 @@ class QueueEntry(val queue:Queue, val se
if( queue.tune_swap && !swapping_out ) {
swapping_out=true
- queue.swapping_out_size+=size
if( stored ) {
swapped_out(false)
} else {
@@ -1439,10 +1455,8 @@ class QueueEntry(val queue:Queue, val se
stored = true
delivery.uow = null
if( swapping_out ) {
-
swapping_out = false
space -= delivery
- queue.swapping_out_size-=size
if( store_wrote_to_disk ) {
queue.swap_out_size_counter += size
@@ -1474,10 +1488,7 @@ class QueueEntry(val queue:Queue, val se
this.space = space
this.space += delivery
}
- if( swapping_out ) {
- swapping_out = false
- queue.swapping_out_size-=size
- }
+ swapping_out = false
}
override def remove = {