Author: ffang
Date: Mon Feb 9 09:43:21 2009
New Revision: 742358
URL: http://svn.apache.org/viewvc?rev=742358&view=rev
Log:
[SM-1792] LockManager impl causes memory leak in ServiceMix EIP
Modified:
servicemix/smx3/branches/servicemix-3.2/core/servicemix-services/src/main/java/org/apache/servicemix/locks/LockManager.java
servicemix/smx3/branches/servicemix-3.2/core/servicemix-services/src/main/java/org/apache/servicemix/locks/impl/SimpleLockManager.java
servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-file/src/main/java/org/apache/servicemix/file/FilePollerEndpoint.java
servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-ftp/src/main/java/org/apache/servicemix/ftp/FtpPollerEndpoint.java
servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-truezip/src/main/java/org/apache/servicemix/truezip/TrueZipPollerEndpoint.java
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/StaticRecipientList.java
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/WireTapTest.java
Modified:
servicemix/smx3/branches/servicemix-3.2/core/servicemix-services/src/main/java/org/apache/servicemix/locks/LockManager.java
URL:
http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/core/servicemix-services/src/main/java/org/apache/servicemix/locks/LockManager.java?rev=742358&r1=742357&r2=742358&view=diff
==============================================================================
---
servicemix/smx3/branches/servicemix-3.2/core/servicemix-services/src/main/java/org/apache/servicemix/locks/LockManager.java
(original)
+++
servicemix/smx3/branches/servicemix-3.2/core/servicemix-services/src/main/java/org/apache/servicemix/locks/LockManager.java
Mon Feb 9 09:43:21 2009
@@ -22,4 +22,6 @@
Lock getLock(String id);
+ void removeLock(String id);
+
}
Modified:
servicemix/smx3/branches/servicemix-3.2/core/servicemix-services/src/main/java/org/apache/servicemix/locks/impl/SimpleLockManager.java
URL:
http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/core/servicemix-services/src/main/java/org/apache/servicemix/locks/impl/SimpleLockManager.java?rev=742358&r1=742357&r2=742358&view=diff
==============================================================================
---
servicemix/smx3/branches/servicemix-3.2/core/servicemix-services/src/main/java/org/apache/servicemix/locks/impl/SimpleLockManager.java
(original)
+++
servicemix/smx3/branches/servicemix-3.2/core/servicemix-services/src/main/java/org/apache/servicemix/locks/impl/SimpleLockManager.java
Mon Feb 9 09:43:21 2009
@@ -38,5 +38,9 @@
}
return lock;
}
+
+ public void removeLock(String id) {
+ locks.remove(id);
+ }
}
Modified:
servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-file/src/main/java/org/apache/servicemix/file/FilePollerEndpoint.java
URL:
http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-file/src/main/java/org/apache/servicemix/file/FilePollerEndpoint.java?rev=742358&r1=742357&r2=742358&view=diff
==============================================================================
---
servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-file/src/main/java/org/apache/servicemix/file/FilePollerEndpoint.java
(original)
+++
servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-file/src/main/java/org/apache/servicemix/file/FilePollerEndpoint.java
Mon Feb 9 09:43:21 2009
@@ -310,7 +310,13 @@
unlock = processFileAndDelete(aFile);
} finally {
if (unlock) {
- lock.unlock();
+ try {
+ lock.unlock();
+ } catch (Exception ex) {
+ //can't release the lock
+ logger.error(ex);
+ }
+ lockManager.removeLock(uri);
}
}
} else {
Modified:
servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-ftp/src/main/java/org/apache/servicemix/ftp/FtpPollerEndpoint.java
URL:
http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-ftp/src/main/java/org/apache/servicemix/ftp/FtpPollerEndpoint.java?rev=742358&r1=742357&r2=742358&view=diff
==============================================================================
---
servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-ftp/src/main/java/org/apache/servicemix/ftp/FtpPollerEndpoint.java
(original)
+++
servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-ftp/src/main/java/org/apache/servicemix/ftp/FtpPollerEndpoint.java
Mon Feb 9 09:43:21 2009
@@ -265,7 +265,14 @@
unlock = processFileAndDelete(file);
} finally {
if (unlock) {
- lock.unlock();
+ try {
+ lock.unlock();
+ } catch (Exception ex) {
+ // can't release the lock
+ logger.error(ex);
+ }
+ lockManager.removeLock(file);
+
}
}
}
Modified:
servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-truezip/src/main/java/org/apache/servicemix/truezip/TrueZipPollerEndpoint.java
URL:
http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-truezip/src/main/java/org/apache/servicemix/truezip/TrueZipPollerEndpoint.java?rev=742358&r1=742357&r2=742358&view=diff
==============================================================================
---
servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-truezip/src/main/java/org/apache/servicemix/truezip/TrueZipPollerEndpoint.java
(original)
+++
servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-truezip/src/main/java/org/apache/servicemix/truezip/TrueZipPollerEndpoint.java
Mon Feb 9 09:43:21 2009
@@ -206,12 +206,19 @@
try {
processFileAndDelete(aFile);
} finally {
- lock.unlock();
+ try {
+ lock.unlock();
+ } catch (Exception ex) {
+ // can't release the lock
+ logger.error(ex);
+ }
+ lockManager.removeLock(uri);
}
} else {
if (logger.isDebugEnabled()) {
logger.debug("Unable to acquire lock on " + aFile);
}
+ lockManager.removeLock(uri);
}
}
});
Modified:
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/StaticRecipientList.java
URL:
http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/StaticRecipientList.java?rev=742358&r1=742357&r2=742358&view=diff
==============================================================================
---
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/StaticRecipientList.java
(original)
+++
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/StaticRecipientList.java
Mon Feb 9 09:43:21 2009
@@ -25,6 +25,8 @@
import javax.jbi.messaging.NormalizedMessage;
import javax.jbi.messaging.RobustInOnly;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.servicemix.eip.EIPEndpoint;
import org.apache.servicemix.eip.support.ExchangeTarget;
import org.apache.servicemix.jbi.util.MessageUtil;
@@ -50,6 +52,7 @@
/**
* List of recipients
*/
+ private static final Log LOG = LogFactory.getLog(StaticRecipientList.class);
private ExchangeTarget[] recipients;
/**
* Indicates if faults and errors from recipients should be sent
@@ -145,10 +148,12 @@
if (exchange.getRole() == MessageExchange.Role.CONSUMER) {
String corrId = (String) exchange.getMessage("in").getProperty(RECIPIENT_LIST_CORRID);
int count = (Integer) exchange.getMessage("in").getProperty(RECIPIENT_LIST_COUNT);
+ Integer acks = null;
Lock lock = lockManager.getLock(corrId);
lock.lock();
+ boolean removeLock = true;
try {
- Integer acks = (Integer) store.load(corrId + ".acks");
+ acks = (Integer) store.load(corrId + ".acks");
if (exchange.getStatus() == ExchangeStatus.DONE) {
// If the acks integer is not here anymore, the message response has been sent already
if (acks != null) {
@@ -157,6 +162,7 @@
done(me);
} else {
store.store(corrId + ".acks", Integer.valueOf(acks + 1));
+ removeLock = false;
}
}
} else if (exchange.getStatus() == ExchangeStatus.ERROR) {
@@ -170,6 +176,7 @@
done(me);
} else {
store.store(corrId + ".acks", Integer.valueOf(acks + 1));
+ removeLock = false;
}
}
} else if (exchange.getFault() != null) {
@@ -185,13 +192,21 @@
done(me);
} else {
store.store(corrId + ".acks", Integer.valueOf(acks + 1));
+ removeLock = false;
}
} else {
done(exchange);
}
}
} finally {
- lock.unlock();
+ try {
+ lock.unlock();
+ } catch (Exception ex) {
+ LOG.info("Caught exception while attempting to release lock", ex);
+ }
+ if (removeLock) {
+ lockManager.removeLock(corrId);
+ }
}
} else {
if (!(exchange instanceof InOnly) && !(exchange instanceof RobustInOnly)) {
Modified:
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java
URL:
http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java?rev=742358&r1=742357&r2=742358&view=diff
==============================================================================
---
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java
(original)
+++
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java
Mon Feb 9 09:43:21 2009
@@ -287,6 +287,7 @@
// Load existing aggregation
Lock lock = getLockManager().getLock(correlationId);
lock.lock();
+ boolean removeLock = true;
try {
Object aggregation = store.load(correlationId);
Date timeout = null;
@@ -310,6 +311,7 @@
}
exchanges.add(exchange);
store.store(correlationId + "-exchanges", exchanges);
+ removeLock = false;
}
if (addMessage(aggregation, in, exchange)) {
sendAggregate(processCorrelationId, correlationId, aggregation, false, isSynchronous(exchange));
@@ -326,6 +328,7 @@
}, timeout);
timers.put(correlationId, t);
}
+ removeLock = false;
}
if (!reportErrors) {
done(exchange);
@@ -338,7 +341,14 @@
}
}
} finally {
- lock.unlock();
+ try {
+ lock.unlock();
+ } catch (Exception ex) {
+ LOG.info("Caught exception while attempting to release aggregation lock", ex);
+ }
+ if (removeLock) {
+ lockManager.removeLock(correlationId);
+ }
}
}
@@ -403,7 +413,12 @@
} catch (Exception e) {
LOG.info("Caught exception while processing timeout aggregation", e);
} finally {
- lock.unlock();
+ try {
+ lock.unlock();
+ } catch (Exception ex) {
+ LOG.info("Caught exception while attempting to release aggregation lock", ex);
+ }
+ lockManager.removeLock(correlationId);
}
}
Modified:
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/WireTapTest.java
URL:
http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/WireTapTest.java?rev=742358&r1=742357&r2=742358&view=diff
==============================================================================
---
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/WireTapTest.java
(original)
+++
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/WireTapTest.java
Mon Feb 9 09:43:21 2009
@@ -125,10 +125,19 @@
assertEquals(ExchangeStatus.ACTIVE, me.getStatus());
assertNotNull(me.getFault());
client.done(me);
+
+ me = client.createRobustInOnlyExchange();
+ me.setService(new QName("wireTap"));
+ me.getInMessage().setContent(createSource("<hello/>"));
+ client.send(me);
+ me = (RobustInOnly) client.receive();
+ assertEquals(ExchangeStatus.ACTIVE, me.getStatus());
+ assertNotNull(me.getFault());
+ client.done(me);
- inReceiver.getMessageList().assertMessagesReceived(1);
+ inReceiver.getMessageList().assertMessagesReceived(2);
outReceiver.getMessageList().assertMessagesReceived(0);
- faultReceiver.getMessageList().assertMessagesReceived(1);
+ faultReceiver.getMessageList().assertMessagesReceived(2);
listener.assertExchangeCompleted();
}
@@ -144,9 +153,19 @@
assertNotNull(me.getFault());
client.fail(me, new Exception("I do not like faults"));
- inReceiver.getMessageList().assertMessagesReceived(1);
+ me = client.createRobustInOnlyExchange();
+ me.setService(new QName("wireTap"));
+ me.getInMessage().setContent(createSource("<hello/>"));
+ client.send(me);
+
+ me = (RobustInOnly) client.receive();
+ assertEquals(ExchangeStatus.ACTIVE, me.getStatus());
+ assertNotNull(me.getFault());
+ client.fail(me, new Exception("I do not like faults"));
+
+ inReceiver.getMessageList().assertMessagesReceived(2);
outReceiver.getMessageList().assertMessagesReceived(0);
- faultReceiver.getMessageList().assertMessagesReceived(1);
+ faultReceiver.getMessageList().assertMessagesReceived(2);
listener.assertExchangeCompleted();
}
@@ -416,4 +435,5 @@
listener.assertExchangeCompleted();
}
+
}