Author: chirino
Date: Mon Feb 20 10:03:17 2012
New Revision: 1291174
URL: http://svn.apache.org/viewvc?rev=1291174&view=rev
Log:
Support configuring the size of the tail buffer on a queue.
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java
activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1291174&r1=1291173&r2=1291174&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
Mon Feb 20 10:03:17 2012
@@ -184,9 +184,6 @@ class Queue(val router: LocalRouter, val
var swapping_out_size = 0
val producer_swapped_in = new MemorySpace
- // To allow overflow to drain into the queue even when there are no
producers.
- producer_swapped_in.size_max = 1024
-
val consumer_swapped_in = new MemorySpace
var swap_out_item_counter = 0L
@@ -220,26 +217,31 @@ class Queue(val router: LocalRouter, val
var loaded_size = 0
def swapped_in_size_max = this.producer_swapped_in.size_max +
this.consumer_swapped_in.size_max
- def configure(c:QueueDTO) = {
- config = c
- tune_persistent = virtual_host.store !=null &&
config.persistent.getOrElse(true)
- tune_swap = tune_persistent && config.swap.getOrElse(true)
- tune_swap_range_size = config.swap_range_size.getOrElse(10000)
- tune_consumer_buffer =
Option(config.consumer_buffer).map(MemoryPropertyEditor.parse(_).toInt).getOrElse(256*1024)
- tune_fast_delivery_rate =
Option(config.fast_delivery_rate).map(MemoryPropertyEditor.parse(_).toInt).getOrElse(1024*1024)
- tune_catchup_enqueue_rate =
Option(config.catchup_enqueue_rate).map(MemoryPropertyEditor.parse(_).toInt).getOrElse(-1)
- tune_max_enqueue_rate =
Option(config.max_enqueue_rate).map(MemoryPropertyEditor.parse(_).toInt).getOrElse(-1)
-
- tune_quota =
Option(config.quota).map(MemoryPropertyEditor.parse(_)).getOrElse(-1)
+ def configure(update:QueueDTO) = {
+ def mem_size(value:String, default:Int) =
Option(value).map(MemoryPropertyEditor.parse(_).toInt).getOrElse(default)
- auto_delete_after = config.auto_delete_after.getOrElse(30)
+ producer_swapped_in.size_max += mem_size(update.tail_buffer, 1024*64) -
Option(config).map{ config=>
+ mem_size(config.tail_buffer, 1024*64)
+ }.getOrElse(0)
+
+ tune_persistent = virtual_host.store !=null &&
update.persistent.getOrElse(true)
+ tune_swap = tune_persistent && update.swap.getOrElse(true)
+ tune_swap_range_size = update.swap_range_size.getOrElse(10000)
+ tune_consumer_buffer = mem_size(update.consumer_buffer, 256*1024)
+ tune_fast_delivery_rate = mem_size(update.fast_delivery_rate,1024*1024)
+ tune_catchup_enqueue_rate = mem_size(update.catchup_enqueue_rate,-1)
+ tune_max_enqueue_rate = mem_size(update.max_enqueue_rate,-1)
+ tune_quota = mem_size(update.quota,-1)
+ auto_delete_after = update.auto_delete_after.getOrElse(30)
if( auto_delete_after!= 0 ) {
// we don't auto delete explicitly configured queues,
// non destination queues, or mirrored queues.
- if( config.mirrored.getOrElse(false) ||
!binding.isInstanceOf[QueueDomainQueueBinding] ||
!LocalRouter.is_wildcard_config(config) ) {
+ if( update.mirrored.getOrElse(false) ||
!binding.isInstanceOf[QueueDomainQueueBinding] ||
!LocalRouter.is_wildcard_config(update) ) {
auto_delete_after = 0
}
}
+
+ config = update
}
configure(config)
Modified:
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java?rev=1291174&r1=1291173&r2=1291174&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java
(original)
+++
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java
Mon Feb 20 10:03:17 2012
@@ -51,6 +51,13 @@ public class QueueDTO extends StringIdDT
public Boolean mirrored;
/**
+ * The amount of memory buffer space to use for swapping messages
+ * out.
+ */
+ @XmlAttribute(name="tail_buffer")
+ public String tail_buffer;
+
+ /**
* The amount of memory buffer space to use per consumer.
*/
@XmlAttribute(name="consumer_buffer")
Modified:
activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md?rev=1291174&r1=1291173&r2=1291174&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md
(original)
+++
activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md
Mon Feb 20 10:03:17 2012
@@ -313,6 +313,9 @@ A `queue` element may be configured with
[Mirrored Queues](Mirrored_Queues) documentation for more
details. Defaults to false.
+* `tail_buffer` : The amount of memory buffer space allocated for holding
+freshly enqueued message. Defaults to 64k.
+
* `consumer_buffer` : The amount of memory buffer space allocated to each
subscription for receiving messages. Defaults to 256k.