Author: chirino
Date: Mon Feb 13 19:31:26 2012
New Revision: 1243663
URL: http://svn.apache.org/viewvc?rev=1243663&view=rev
Log:
Perf optimization: avoid parsing the destination address for every message
that comes in.
Modified:
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
Modified:
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1243663&r1=1243662&r2=1243663&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
Mon Feb 13 19:31:26 2012
@@ -553,9 +553,9 @@ class StompProtocolHandler extends Proto
var closed = false
var consumers = Map[AsciiBuffer, StompConsumer]()
- var producerRoutes = new LRUCache[List[ConnectAddress],
DeliveryProducerRoute](10) {
- override def onCacheEviction(eldest: Entry[List[ConnectAddress],
DeliveryProducerRoute]) = {
- host.router.disconnect(eldest.getKey.toArray, eldest.getValue)
+ var producerRoutes = new LRUCache[AsciiBuffer, StompProducerRoute](10) {
+ override def onCacheEviction(eldest: Entry[AsciiBuffer,
StompProducerRoute]) = {
+ host.router.disconnect(eldest.getValue.addresses, eldest.getValue)
}
}
@@ -722,10 +722,9 @@ class StompProtocolHandler extends Proto
dead = true;
import collection.JavaConversions._
- producerRoutes.foreach{ case(dests,route)=>
- val addresses = dests.toArray
+ producerRoutes.values().foreach{ route=>
host.dispatch_queue {
- host.router.disconnect(addresses, route)
+ host.router.disconnect(route.addresses, route)
}
}
producerRoutes.clear
@@ -1003,27 +1002,32 @@ class StompProtocolHandler extends Proto
}
}
- def perform_send(frame:StompFrame, uow:StoreUOW=null): Unit = {
- val addresses = decode_addresses(get(frame.headers, DESTINATION).get)
+ class StompProducerRoute(dest: AsciiBuffer) extends
DeliveryProducerRoute(host.router) {
+
+ val addresses = decode_addresses(dest)
val key = addresses.toList
- producerRoutes.get(key) match {
- case null =>
- // create the producer route...
- val route = new DeliveryProducerRoute(host.router) {
- override def send_buffer_size = codec.read_buffer_size
- override def connection = Some(StompProtocolHandler.this.connection)
- override def dispatch_queue = queue
+ override def send_buffer_size = codec.read_buffer_size
- refiller = ^{
- resume_read
- }
- }
+ override def connection = Some(StompProtocolHandler.this.connection)
+
+ override def dispatch_queue = queue
- // don't process frames until producer is connected...
+ refiller = ^ {
+ resume_read
+ }
+ }
+
+
+ def perform_send(frame:StompFrame, uow:StoreUOW=null): Unit = {
+ val dest = get(frame.headers, DESTINATION).get
+ producerRoutes.get(dest) match {
+ case null =>
+ // create the producer route...
+ val route = new StompProducerRoute(dest) // don't process frames
until producer is connected...
connection.transport.suspendRead
host.dispatch_queue {
- val rc = host.router.connect(addresses, route, security_context)
+ val rc = host.router.connect(route.addresses, route,
security_context)
dispatchQueue {
rc match {
case Some(failure) =>
@@ -1031,8 +1035,8 @@ class StompProtocolHandler extends Proto
case None =>
if (!connection.stopped) {
resume_read
- producerRoutes.put(key, route)
- send_via_route(addresses, route, frame, uow)
+ producerRoutes.put(dest, route)
+ send_via_route(route.addresses, route, frame, uow)
}
}
}
@@ -1040,7 +1044,7 @@ class StompProtocolHandler extends Proto
case route =>
// we can re-use the existing producer route
- send_via_route(addresses, route, frame, uow)
+ send_via_route(route.addresses, route, frame, uow)
}
}