Author: chirino
Date: Thu Jun 9 01:12:33 2011
New Revision: 1133620
URL: http://svn.apache.org/viewvc?rev=1133620&view=rev
Log:
Tune the delivery sessions based on the connection's read/write buffer
sizes.
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=1133620&r1=1133619&r2=1133620&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
Thu Jun 9 01:12:33 2011
@@ -39,6 +39,8 @@ trait DeliveryProducer {
def connection:Option[BrokerConnection] = None
+ def send_buffer_size = 64*1024
+
def collocate(value:DispatchQueue):Unit = {
if( value.getTargetQueue ne dispatch_queue.getTargetQueue ) {
debug("co-locating %s with %s", dispatch_queue.getLabel, value.getLabel);
@@ -57,6 +59,8 @@ trait DeliveryConsumer extends Retained
def connection:Option[BrokerConnection] = None
+ def receive_buffer_size = 64*1024
+
def browser = false
def exclusive = false
def dispatch_queue:DispatchQueue;
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1133620&r1=1133619&r2=1133620&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
Thu Jun 9 01:12:33 2011
@@ -90,11 +90,6 @@ class Queue(val router: LocalRouter, val
//
/**
- * The amount of memory buffer space for receiving messages.
- */
- def tune_producer_buffer = config.producer_buffer.getOrElse(256*1024)
-
- /**
* The amount of memory buffer space for the queue..
*/
def tune_queue_buffer = config.queue_buffer.getOrElse(32*1024)
@@ -468,11 +463,12 @@ class Queue(val router: LocalRouter, val
override def producer = p
- val session = session_manager.open(producer.dispatch_queue)
+ val session_max = producer.send_buffer_size
+ val session = session_manager.open(producer.dispatch_queue, session_max)
dispatch_queue {
inbound_sessions += this
- addCapacity( tune_producer_buffer )
+ addCapacity( session_max )
}
def remaining_capacity = session.remaining_capacity
@@ -480,7 +476,7 @@ class Queue(val router: LocalRouter, val
def close = {
session_manager.close(session)
dispatch_queue {
- addCapacity( -tune_producer_buffer )
+ addCapacity( -session_max )
inbound_sessions -= this
}
release
@@ -592,6 +588,7 @@ class Queue(val router: LocalRouter, val
override def connection:Option[BrokerConnection] = None
+ override def send_buffer_size = tune_queue_buffer
/////////////////////////////////////////////////////////////////////
//
Modified:
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java?rev=1133620&r1=1133619&r2=1133620&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java
(original)
+++
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java
Thu Jun 9 01:12:33 2011
@@ -37,12 +37,6 @@ public class QueueDTO extends StringIdDT
public Boolean unified;
/**
- * The amount of memory buffer space for receiving messages.
- */
- @XmlAttribute(name="producer_buffer")
- public Integer producer_buffer;
-
- /**
* The amount of memory buffer space for the queue..
*/
@XmlAttribute(name="queue_buffer")
Modified:
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1133620&r1=1133619&r2=1133620&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
Thu Jun 9 01:12:33 2011
@@ -253,6 +253,7 @@ class StompProtocolHandler extends Proto
val dispatch_queue = StompProtocolHandler.this.dispatchQueue
override def connection = Some(StompProtocolHandler.this.connection)
+ override def receive_buffer_size = codec.write_buffer_size
def is_persistent = false
@@ -281,7 +282,7 @@ class StompProtocolHandler extends Proto
def consumer = StompConsumer.this
var closed = false
- val session = session_manager.open(producer.dispatch_queue)
+ val session = session_manager.open(producer.dispatch_queue,
codec.write_buffer_size)
def remaining_capacity = session.remaining_capacity
@@ -373,6 +374,8 @@ class StompProtocolHandler extends Proto
var destination_parser = Stomp.destination_parser
+ var codec:StompCodec = _
+
implicit def toDestinationDTO(value:AsciiBuffer):Array[DestinationDTO] = {
val rc = destination_parser.decode_destination(value.toString)
if( rc==null ) {
@@ -385,7 +388,7 @@ class StompProtocolHandler extends Proto
super.set_connection(connection)
import collection.JavaConversions._
- val codec = connection.transport.getProtocolCodec.asInstanceOf[StompCodec]
+ codec = connection.transport.getProtocolCodec.asInstanceOf[StompCodec]
config = connection.connector.config.protocols.find(
_.isInstanceOf[StompDTO]).map(_.asInstanceOf[StompDTO]).getOrElse(new StompDTO)
protocol_filters =
ProtocolFilter.create_filters(config.protocol_filters.toList, this)
@@ -758,6 +761,7 @@ class StompProtocolHandler extends Proto
// create the producer route...
val route = new DeliveryProducerRoute(host.router) {
+ override def send_buffer_size = codec.read_buffer_size
override def connection = Some(StompProtocolHandler.this.connection)
override def dispatch_queue = queue