Author: chirino
Date: Mon Oct 18 20:26:29 2010
New Revision: 1023996
URL: http://svn.apache.org/viewvc?rev=1023996&view=rev
Log:
placing producer routes in an LRU cache
Modified:
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/LRUCache.java
Modified:
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=1023996&r1=1023995&r2=1023996&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
Mon Oct 18 20:26:29 2010
@@ -35,6 +35,7 @@ import org.apache.activemq.apollo.store.
import org.apache.activemq.apollo.util._
import org.apache.activemq.apollo.dto.{BindingDTO,
DurableSubscriptionBindingDTO, PointToPointBindingDTO}
import java.util.concurrent.TimeUnit
+import java.util.Map.Entry
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -252,7 +253,12 @@ class StompProtocolHandler extends Proto
var closed = false
var consumers = Map[AsciiBuffer, StompConsumer]()
- var producerRoutes = Map[Destination, DeliveryProducerRoute]()
+ var producerRoutes = new LRUCache[Destination, DeliveryProducerRoute](10) {
+ override def onCacheEviction(eldest: Entry[Destination,
DeliveryProducerRoute]) = {
+ host.router.disconnect(eldest.getValue)
+ }
+ }
+
var host:VirtualHost = null
private def queue = connection.dispatchQueue
@@ -277,10 +283,12 @@ class StompProtocolHandler extends Proto
if( !closed ) {
heart_beat_monitor.stop
closed=true;
+
+ import collection.JavaConversions._
producerRoutes.foreach{
case(_,route)=> host.router.disconnect(route)
}
- producerRoutes = Map()
+ producerRoutes.clear
consumers.foreach {
case (_,consumer)=>
if( consumer.binding==null ) {
@@ -487,7 +495,7 @@ class StompProtocolHandler extends Proto
val destiantion: Destination = get(frame.headers, DESTINATION).get
producerRoutes.get(destiantion) match {
- case None =>
+ case null =>
// create the producer route...
val producer = new DeliveryProducer() {
@@ -505,12 +513,12 @@ class StompProtocolHandler extends Proto
route.refiller = ^ {
connection.transport.resumeRead
}
- producerRoutes += destiantion -> route
+ producerRoutes.put(destiantion, route)
send_via_route(route, frame, uow)
}
}
- case Some(route) =>
+ case route =>
// we can re-use the existing producer route
send_via_route(route, frame, uow)
Modified:
activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/LRUCache.java
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/LRUCache.java?rev=1023996&r1=1023995&r2=1023996&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/LRUCache.java
(original)
+++
activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/LRUCache.java
Mon Oct 18 20:26:29 2010
@@ -79,6 +79,13 @@ public class LRUCache<K, V> extends Link
}
protected boolean removeEldestEntry(Map.Entry<K,V> eldest) {
- return size() > maxCacheSize;
+ if( size() > maxCacheSize ) {
+ onCacheEviction(eldest);
+ return true;
+ }
+ return false;
+ }
+
+ protected void onCacheEviction(Map.Entry<K,V> eldest) {
}
}