Author: rajdavies
Date: Thu Mar 13 04:36:43 2008
New Revision: 636724
URL: http://svn.apache.org/viewvc?rev=636724&view=rev
Log:
Added properties for caching of temp destinations
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractTempRegion.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractTempRegion.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractTempRegion.java?rev=636724&r1=636723&r2=636724&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractTempRegion.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractTempRegion.java
Thu Mar 13 04:36:43 2008
@@ -31,14 +31,18 @@
import org.apache.commons.logging.LogFactory;
/**
- *
+ *
*/
public abstract class AbstractTempRegion extends AbstractRegion {
- private static int TIME_BEFORE_PURGE = 60000;
private static final Log LOG = LogFactory.getLog(TempQueueRegion.class);
- private Map<CachedDestination,Destination> cachedDestinations = new
HashMap<CachedDestination,Destination>();
- private final Timer purgeTimer;
- private final TimerTask purgeTask;
+
+ private Map<CachedDestination, Destination> cachedDestinations = new
HashMap<CachedDestination, Destination>();
+ private final boolean doCacheTempDestinations;
+ private final int purgeTime;
+ private Timer purgeTimer;
+ private TimerTask purgeTask;
+
+
/**
* @param broker
* @param destinationStatistics
@@ -52,56 +56,70 @@
DestinationFactory destinationFactory) {
super(broker, destinationStatistics, memoryManager, taskRunnerFactory,
destinationFactory);
- this.purgeTimer = new Timer(true);
- this.purgeTask = new TimerTask() {
- public void run() {
- doPurge();
- }
-
- };
- this.purgeTimer.schedule(purgeTask,
TIME_BEFORE_PURGE,TIME_BEFORE_PURGE);
- }
+
this.doCacheTempDestinations=broker.getBrokerService().isCacheTempDestinations();
+ this.purgeTime =
broker.getBrokerService().getTimeBeforePurgeTempDestinations();
+ if (this.doCacheTempDestinations) { // purge timer only exists when caching is enabled
+ this.purgeTimer = new Timer(true);
+ this.purgeTask = new TimerTask() {
+ public void run() {
+ doPurge();
+ }
+ };
+ this.purgeTimer.schedule(purgeTask, purgeTime, purgeTime); // inside the guard: avoids NPE when caching is off
+ }
+ }
+
public void stop() throws Exception {
super.stop();
if (purgeTimer != null) {
purgeTimer.cancel();
}
}
-
- protected abstract Destination doCreateDestination(ConnectionContext
context, ActiveMQDestination destination) throws Exception;
- protected synchronized Destination createDestination(ConnectionContext
context, ActiveMQDestination destination) throws Exception {
- Destination result = cachedDestinations.remove(new
CachedDestination(destination));
- if (result==null) {
+ protected abstract Destination doCreateDestination(
+ ConnectionContext context, ActiveMQDestination destination)
+ throws Exception;
+
+ protected synchronized Destination createDestination(
+ ConnectionContext context, ActiveMQDestination destination)
+ throws Exception {
+ Destination result = cachedDestinations.remove(new CachedDestination(
+ destination));
+ if (result == null) {
result = doCreateDestination(context, destination);
}
return result;
}
-
- protected final synchronized void dispose(ConnectionContext
context,Destination dest) throws Exception {
- //add to cache
- cachedDestinations.put(new
CachedDestination(dest.getActiveMQDestination()), dest);
+
+ protected final synchronized void dispose(ConnectionContext context,
+ Destination dest) throws Exception {
+ // add to cache
+ if (this.doCacheTempDestinations) {
+ cachedDestinations.put(new CachedDestination(dest
+ .getActiveMQDestination()), dest);
+ }
}
-
+
private void doDispose(Destination dest) {
ConnectionContext context = new ConnectionContext();
try {
dest.dispose(context);
dest.stop();
} catch (Exception e) {
- LOG.warn("Failed to dispose of " + dest,e);
+ LOG.warn("Failed to dispose of " + dest, e);
}
-
+
}
-
+
private synchronized void doPurge() {
long currentTime = System.currentTimeMillis();
if (cachedDestinations.size() > 0) {
- Set<CachedDestination> tmp = new
HashSet<CachedDestination>(cachedDestinations.keySet());
- for(CachedDestination key: tmp) {
- if ((key.timeStamp + TIME_BEFORE_PURGE) < currentTime) {
+ Set<CachedDestination> tmp = new HashSet<CachedDestination>(
+ cachedDestinations.keySet());
+ for (CachedDestination key : tmp) {
+ if ((key.timeStamp + purgeTime) < currentTime) {
Destination dest = cachedDestinations.remove(key);
if (dest != null) {
doDispose(dest);
@@ -110,20 +128,21 @@
}
}
}
-
- static class CachedDestination{
+
+ static class CachedDestination {
long timeStamp;
+
ActiveMQDestination destination;
-
- CachedDestination(ActiveMQDestination destination){
- this.destination=destination;
- this.timeStamp=System.currentTimeMillis();
+
+ CachedDestination(ActiveMQDestination destination) {
+ this.destination = destination;
+ this.timeStamp = System.currentTimeMillis();
}
-
+
public int hashCode() {
return destination.hashCode();
}
-
+
public boolean equals(Object o) {
if (o instanceof CachedDestination) {
CachedDestination other = (CachedDestination) o;
@@ -131,7 +150,7 @@
}
return false;
}
-
+
}
}