Author: chirino
Date: Mon Feb 20 10:03:17 2012
New Revision: 1291174

URL: http://svn.apache.org/viewvc?rev=1291174&view=rev
Log:
Support configuring the size of the tail buffer on a queue.

Modified:
    
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java
    
activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md

Modified: 
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: 
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1291174&r1=1291173&r2=1291174&view=diff
==============================================================================
--- 
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
 (original)
+++ 
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
 Mon Feb 20 10:03:17 2012
@@ -184,9 +184,6 @@ class Queue(val router: LocalRouter, val
   var swapping_out_size = 0
 
   val producer_swapped_in = new MemorySpace
-  // To allow overflow to drain into the queue even when there are no 
producers.
-  producer_swapped_in.size_max = 1024
-
   val consumer_swapped_in = new MemorySpace
 
   var swap_out_item_counter = 0L
@@ -220,26 +217,31 @@ class Queue(val router: LocalRouter, val
   var loaded_size = 0
   def swapped_in_size_max = this.producer_swapped_in.size_max + 
this.consumer_swapped_in.size_max
 
-  def configure(c:QueueDTO) = {
-    config = c
-    tune_persistent = virtual_host.store !=null && 
config.persistent.getOrElse(true)
-    tune_swap = tune_persistent && config.swap.getOrElse(true)
-    tune_swap_range_size = config.swap_range_size.getOrElse(10000)
-    tune_consumer_buffer = 
Option(config.consumer_buffer).map(MemoryPropertyEditor.parse(_).toInt).getOrElse(256*1024)
-    tune_fast_delivery_rate = 
Option(config.fast_delivery_rate).map(MemoryPropertyEditor.parse(_).toInt).getOrElse(1024*1024)
-    tune_catchup_enqueue_rate = 
Option(config.catchup_enqueue_rate).map(MemoryPropertyEditor.parse(_).toInt).getOrElse(-1)
-    tune_max_enqueue_rate = 
Option(config.max_enqueue_rate).map(MemoryPropertyEditor.parse(_).toInt).getOrElse(-1)
-
-    tune_quota = 
Option(config.quota).map(MemoryPropertyEditor.parse(_)).getOrElse(-1)
+  def configure(update:QueueDTO) = {
+    def mem_size(value:String, default:Int) = 
Option(value).map(MemoryPropertyEditor.parse(_).toInt).getOrElse(default)
 
-    auto_delete_after = config.auto_delete_after.getOrElse(30)
+    producer_swapped_in.size_max += mem_size(update.tail_buffer, 1024*64) - 
Option(config).map{ config=>
+      mem_size(config.tail_buffer, 1024*64)
+    }.getOrElse(0)
+
+    tune_persistent = virtual_host.store !=null && 
update.persistent.getOrElse(true)
+    tune_swap = tune_persistent && update.swap.getOrElse(true)
+    tune_swap_range_size = update.swap_range_size.getOrElse(10000)
+    tune_consumer_buffer = mem_size(update.consumer_buffer, 256*1024)
+    tune_fast_delivery_rate = mem_size(update.fast_delivery_rate,1024*1024)
+    tune_catchup_enqueue_rate = mem_size(update.catchup_enqueue_rate,-1)
+    tune_max_enqueue_rate = mem_size(update.max_enqueue_rate,-1)
+    tune_quota = mem_size(update.quota,-1)
+    auto_delete_after = update.auto_delete_after.getOrElse(30)
     if( auto_delete_after!= 0 ) {
       // we don't auto delete explicitly configured queues,
       // non destination queues, or mirrored queues.
-      if( config.mirrored.getOrElse(false) || 
!binding.isInstanceOf[QueueDomainQueueBinding] || 
!LocalRouter.is_wildcard_config(config) ) {
+      if( update.mirrored.getOrElse(false) || 
!binding.isInstanceOf[QueueDomainQueueBinding] || 
!LocalRouter.is_wildcard_config(update) ) {
         auto_delete_after = 0
       }
     }
+
+    config = update
   }
   configure(config)
 

Modified: 
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java
URL: 
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java?rev=1291174&r1=1291173&r2=1291174&view=diff
==============================================================================
--- 
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java
 (original)
+++ 
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java
 Mon Feb 20 10:03:17 2012
@@ -51,6 +51,13 @@ public class QueueDTO extends StringIdDT
     public Boolean mirrored;
 
     /**
+     *  The amount of memory buffer space to use for swapping messages
+     *  out.
+     */
+    @XmlAttribute(name="tail_buffer")
+    public String tail_buffer;
+
+    /**
      *  The amount of memory buffer space to use per consumer.
      */
     @XmlAttribute(name="consumer_buffer")

Modified: 
activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md
URL: 
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md?rev=1291174&r1=1291173&r2=1291174&view=diff
==============================================================================
--- 
activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md 
(original)
+++ 
activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md 
Mon Feb 20 10:03:17 2012
@@ -313,6 +313,9 @@ A `queue` element may be configured with
   [Mirrored Queues](Mirrored_Queues) documentation for more 
   details.  Defaults to false.
 
+* `tail_buffer` : The amount of memory buffer space allocated for holding
+freshly enqueued message.  Defaults to 64k.
+
 * `consumer_buffer` : The amount of memory buffer space allocated to each
 subscription for receiving messages.  Defaults to 256k.
 


Reply via email to