Author: gertv
Date: Thu Aug 21 06:28:31 2008
New Revision: 687751
URL: http://svn.apache.org/viewvc?rev=687751&view=rev
Log:
SM-1518: Aggregator keeps a list of closed aggregations causing a slight memory
leak
Modified:
servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java
Modified:
servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java
URL:
http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java?rev=687751&r1=687750&r2=687751&view=diff
==============================================================================
---
servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java
(original)
+++
servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java
Thu Aug 21 06:28:31 2008
@@ -29,9 +29,13 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.servicemix.eip.EIPEndpoint;
import org.apache.servicemix.common.JbiConstants;
import org.apache.servicemix.common.util.MessageUtil;
+import org.apache.servicemix.eip.EIPEndpoint;
+import org.apache.servicemix.store.Store;
+import org.apache.servicemix.store.StoreFactory;
+import org.apache.servicemix.store.memory.MemoryStore;
+import org.apache.servicemix.store.memory.MemoryStoreFactory;
import org.apache.servicemix.timers.Timer;
import org.apache.servicemix.timers.TimerListener;
@@ -41,7 +45,9 @@
* <a
href="http://www.enterpriseintegrationpatterns.com/Aggregator.html">Aggregator</a>
* pattern.
*
- * TODO: keep list of closed aggregations for a certain time
+ * Closed aggregations are being kept in a {@link Store}. By
default, we will use a simple
+ * {@link MemoryStore}, but you can set your own {@link
StoreFactory} to use other implementations.
+ *
* TODO: distributed lock manager
* TODO: persistent / transactional timer
*
@@ -58,7 +64,8 @@
private boolean synchronous;
- private ConcurrentMap<String, Boolean> closedAggregates = new
ConcurrentHashMap<String, Boolean>();
+ private Store closedAggregates;
+ private StoreFactory closedAggregatesStoreFactory;
private boolean copyProperties = true;
@@ -128,6 +135,24 @@
protected void processSync(MessageExchange exchange) throws Exception {
throw new IllegalStateException();
}
+
+ /**
+ * Access the currently configured {@link StoreFactory} for
storing closed aggregations
+ */
+ public StoreFactory getClosedAggregatesStoreFactory() {
+ return closedAggregatesStoreFactory;
+ }
+
+ /**
+ * Set a new {@link StoreFactory} for creating the {@link
Store} to hold closed aggregations
+ *
+ * If it hasn't been set, a simple {@link MemoryStoreFactory}
will be used by default.
+ *
+ * @param closedAggregatesStoreFactory
+ */
+ public void setClosedAggregatesStoreFactory(StoreFactory
closedAggregatesStoreFactory) {
+ this.closedAggregatesStoreFactory = closedAggregatesStoreFactory;
+ }
/* (non-Javadoc)
* @see
org.apache.servicemix.eip.EIPEndpoint#processAsync(javax.jbi.messaging.MessageExchange)
@@ -135,6 +160,15 @@
protected void processAsync(MessageExchange exchange) throws Exception {
throw new IllegalStateException();
}
+
+ @Override
+ public void start() throws Exception {
+ super.start();
+ if (closedAggregatesStoreFactory == null) {
+ closedAggregatesStoreFactory = new MemoryStoreFactory();
+ }
+ closedAggregates =
closedAggregatesStoreFactory.open(getService().toString() + getEndpoint() +
"-closed-aggregates");
+ }
/* (non-Javadoc)
* @see
org.apache.servicemix.common.ExchangeProcessor#process(javax.jbi.messaging.MessageExchange)
@@ -261,19 +295,25 @@
*
* @param correlationId
* @return
+ * @throws Exception
*/
- protected boolean isAggregationClosed(String correlationId) {
+ protected boolean isAggregationClosed(String correlationId) throws
Exception {
// TODO: implement this using a persistent / cached behavior
- return closedAggregates.containsKey(correlationId);
+ Object data = store.load(correlationId);
+ if (data != null) {
+ store.store(correlationId, data);
+ }
+ return data != null;
}
/**
* Mark an aggregation as closed
* @param correlationId
+ * @throws Exception
*/
- protected void closeAggregation(String correlationId) {
+ protected void closeAggregation(String correlationId) throws Exception {
// TODO: implement this using a persistent / cached behavior
- closedAggregates.put(correlationId, Boolean.TRUE);
+ closedAggregates.store(correlationId, Boolean.TRUE);
}
private boolean isSynchronous(MessageExchange exchange) {