Author: ffang
Date: Mon Feb  9 09:43:21 2009
New Revision: 742358

URL: http://svn.apache.org/viewvc?rev=742358&view=rev
Log:
[SM-1792]LockManager impl causes memory leak in ServiceMix EIP

Modified:
    
servicemix/smx3/branches/servicemix-3.2/core/servicemix-services/src/main/java/org/apache/servicemix/locks/LockManager.java
    
servicemix/smx3/branches/servicemix-3.2/core/servicemix-services/src/main/java/org/apache/servicemix/locks/impl/SimpleLockManager.java
    
servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-file/src/main/java/org/apache/servicemix/file/FilePollerEndpoint.java
    
servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-ftp/src/main/java/org/apache/servicemix/ftp/FtpPollerEndpoint.java
    
servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-truezip/src/main/java/org/apache/servicemix/truezip/TrueZipPollerEndpoint.java
    
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/StaticRecipientList.java
    
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java
    
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/WireTapTest.java

Modified: 
servicemix/smx3/branches/servicemix-3.2/core/servicemix-services/src/main/java/org/apache/servicemix/locks/LockManager.java
URL: 
http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/core/servicemix-services/src/main/java/org/apache/servicemix/locks/LockManager.java?rev=742358&r1=742357&r2=742358&view=diff
==============================================================================
--- 
servicemix/smx3/branches/servicemix-3.2/core/servicemix-services/src/main/java/org/apache/servicemix/locks/LockManager.java
 (original)
+++ 
servicemix/smx3/branches/servicemix-3.2/core/servicemix-services/src/main/java/org/apache/servicemix/locks/LockManager.java
 Mon Feb  9 09:43:21 2009
@@ -22,4 +22,6 @@
 
     Lock getLock(String id);
     
+    void removeLock(String id);
+
 }

Modified: 
servicemix/smx3/branches/servicemix-3.2/core/servicemix-services/src/main/java/org/apache/servicemix/locks/impl/SimpleLockManager.java
URL: 
http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/core/servicemix-services/src/main/java/org/apache/servicemix/locks/impl/SimpleLockManager.java?rev=742358&r1=742357&r2=742358&view=diff
==============================================================================
--- 
servicemix/smx3/branches/servicemix-3.2/core/servicemix-services/src/main/java/org/apache/servicemix/locks/impl/SimpleLockManager.java
 (original)
+++ 
servicemix/smx3/branches/servicemix-3.2/core/servicemix-services/src/main/java/org/apache/servicemix/locks/impl/SimpleLockManager.java
 Mon Feb  9 09:43:21 2009
@@ -38,5 +38,9 @@
         }
         return lock;
     }
+    
+    public void removeLock(String id) {
+        locks.remove(id);
+    }
 
 }

Modified: 
servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-file/src/main/java/org/apache/servicemix/file/FilePollerEndpoint.java
URL: 
http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-file/src/main/java/org/apache/servicemix/file/FilePollerEndpoint.java?rev=742358&r1=742357&r2=742358&view=diff
==============================================================================
--- 
servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-file/src/main/java/org/apache/servicemix/file/FilePollerEndpoint.java
 (original)
+++ 
servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-file/src/main/java/org/apache/servicemix/file/FilePollerEndpoint.java
 Mon Feb  9 09:43:21 2009
@@ -310,7 +310,13 @@
                         unlock = processFileAndDelete(aFile);
                     } finally {
                         if (unlock) {
-                            lock.unlock();
+                            try {
+                                lock.unlock();
+                            } catch (Exception ex) {
+                                //can't release the lock
+                                logger.error(ex);
+                            }
+                            lockManager.removeLock(uri);
                         }
                     }
                 } else {

Modified: 
servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-ftp/src/main/java/org/apache/servicemix/ftp/FtpPollerEndpoint.java
URL: 
http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-ftp/src/main/java/org/apache/servicemix/ftp/FtpPollerEndpoint.java?rev=742358&r1=742357&r2=742358&view=diff
==============================================================================
--- 
servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-ftp/src/main/java/org/apache/servicemix/ftp/FtpPollerEndpoint.java
 (original)
+++ 
servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-ftp/src/main/java/org/apache/servicemix/ftp/FtpPollerEndpoint.java
 Mon Feb  9 09:43:21 2009
@@ -265,7 +265,14 @@
                         unlock = processFileAndDelete(file);
                     } finally {
                         if (unlock) {
-                            lock.unlock();
+                            try {
+                                lock.unlock();
+                            } catch (Exception ex) {
+                                // can't release the lock
+                                logger.error(ex);
+                            }
+                            lockManager.removeLock(file);
+
                         }
                     }
                 }

Modified: 
servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-truezip/src/main/java/org/apache/servicemix/truezip/TrueZipPollerEndpoint.java
URL: 
http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-truezip/src/main/java/org/apache/servicemix/truezip/TrueZipPollerEndpoint.java?rev=742358&r1=742357&r2=742358&view=diff
==============================================================================
--- 
servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-truezip/src/main/java/org/apache/servicemix/truezip/TrueZipPollerEndpoint.java
 (original)
+++ 
servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-truezip/src/main/java/org/apache/servicemix/truezip/TrueZipPollerEndpoint.java
 Mon Feb  9 09:43:21 2009
@@ -206,12 +206,19 @@
                     try {
                         processFileAndDelete(aFile);
                     } finally {
-                        lock.unlock();
+                        try {
+                            lock.unlock();
+                        } catch (Exception ex) {
+                            // can't release the lock
+                            logger.error(ex);
+                        } 
+                        lockManager.removeLock(uri);
                     }
                 } else {
                     if (logger.isDebugEnabled()) {
                         logger.debug("Unable to acquire lock on " + aFile);
                     }
+                    lockManager.removeLock(uri);
                 }
             }
         });

Modified: 
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/StaticRecipientList.java
URL: 
http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/StaticRecipientList.java?rev=742358&r1=742357&r2=742358&view=diff
==============================================================================
--- 
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/StaticRecipientList.java
 (original)
+++ 
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/StaticRecipientList.java
 Mon Feb  9 09:43:21 2009
@@ -25,6 +25,8 @@
 import javax.jbi.messaging.NormalizedMessage;
 import javax.jbi.messaging.RobustInOnly;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.servicemix.eip.EIPEndpoint;
 import org.apache.servicemix.eip.support.ExchangeTarget;
 import org.apache.servicemix.jbi.util.MessageUtil;
@@ -50,6 +52,7 @@
     /**
      * List of recipients
      */
+    private static final Log LOG = 
LogFactory.getLog(StaticRecipientList.class);
     private ExchangeTarget[] recipients;
     /**
      * Indicates if faults and errors from recipients should be sent
@@ -145,10 +148,12 @@
         if (exchange.getRole() == MessageExchange.Role.CONSUMER) {
             String corrId = (String) 
exchange.getMessage("in").getProperty(RECIPIENT_LIST_CORRID);
             int count = (Integer) 
exchange.getMessage("in").getProperty(RECIPIENT_LIST_COUNT);
+            Integer acks = null;
             Lock lock = lockManager.getLock(corrId);
             lock.lock();
+            boolean removeLock = true;
             try {
-                Integer acks = (Integer) store.load(corrId + ".acks");
+                acks = (Integer) store.load(corrId + ".acks");
                 if (exchange.getStatus() == ExchangeStatus.DONE) {
                     // If the acks integer is not here anymore, the message 
response has been sent already
                     if (acks != null) {
@@ -157,6 +162,7 @@
                             done(me);
                         } else {
                             store.store(corrId + ".acks", Integer.valueOf(acks 
+ 1));
+                            removeLock = false;
                         }
                     }
                 } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
@@ -170,6 +176,7 @@
                             done(me);
                         } else {
                             store.store(corrId + ".acks", Integer.valueOf(acks 
+ 1));
+                            removeLock = false;
                         }
                     }
                 } else if (exchange.getFault() != null) {
@@ -185,13 +192,21 @@
                             done(me);
                         } else {
                             store.store(corrId + ".acks", Integer.valueOf(acks 
+ 1));
+                            removeLock = false;
                         }
                     } else {
                         done(exchange);
                     }
                 }
             } finally {
-                lock.unlock();
+                try {
+                    lock.unlock();
+                } catch (Exception ex) {
+                    LOG.info("Caught exception while attempting to release 
lock", ex);
+                }
+                if (removeLock) {
+                    lockManager.removeLock(corrId);
+                }
             }
         } else {
             if (!(exchange instanceof InOnly) && !(exchange instanceof 
RobustInOnly)) {

Modified: 
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java
URL: 
http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java?rev=742358&r1=742357&r2=742358&view=diff
==============================================================================
--- 
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java
 (original)
+++ 
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java
 Mon Feb  9 09:43:21 2009
@@ -287,6 +287,7 @@
         // Load existing aggregation
         Lock lock = getLockManager().getLock(correlationId);
         lock.lock();
+        boolean removeLock = true;
         try {
             Object aggregation = store.load(correlationId);
             Date timeout = null;
@@ -310,6 +311,7 @@
                     }
                     exchanges.add(exchange);
                     store.store(correlationId + "-exchanges", exchanges);
+                    removeLock = false;
                 }
                 if (addMessage(aggregation, in, exchange)) {
                     sendAggregate(processCorrelationId, correlationId, 
aggregation, false, isSynchronous(exchange));
@@ -326,6 +328,7 @@
                         }, timeout);
                         timers.put(correlationId, t);
                     }
+                    removeLock = false;
                 }
                 if (!reportErrors) {
                     done(exchange);
@@ -338,7 +341,14 @@
                 }
             }
         } finally {
-            lock.unlock();
+            try {
+                lock.unlock();
+            } catch (Exception ex) {
+                LOG.info("Caught exception while attempting to release 
aggregation lock", ex);
+            }
+            if (removeLock) {
+                lockManager.removeLock(correlationId);
+            }
         }
     }
 
@@ -403,7 +413,12 @@
         } catch (Exception e) {
             LOG.info("Caught exception while processing timeout aggregation", 
e);
         } finally {
-            lock.unlock();
+            try {
+                lock.unlock();
+            } catch (Exception ex) {
+                LOG.info("Caught exception while attempting to release 
aggregation lock", ex);
+            }
+            lockManager.removeLock(correlationId);
         }
     }
 

Modified: 
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/WireTapTest.java
URL: 
http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/WireTapTest.java?rev=742358&r1=742357&r2=742358&view=diff
==============================================================================
--- 
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/WireTapTest.java
 (original)
+++ 
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/WireTapTest.java
 Mon Feb  9 09:43:21 2009
@@ -125,10 +125,19 @@
         assertEquals(ExchangeStatus.ACTIVE, me.getStatus());
         assertNotNull(me.getFault());
         client.done(me);
+
+        me = client.createRobustInOnlyExchange();
+        me.setService(new QName("wireTap"));
+        me.getInMessage().setContent(createSource("<hello/>"));
+        client.send(me);
+        me = (RobustInOnly) client.receive();
+        assertEquals(ExchangeStatus.ACTIVE, me.getStatus());
+        assertNotNull(me.getFault());
+        client.done(me);
         
-        inReceiver.getMessageList().assertMessagesReceived(1);
+        inReceiver.getMessageList().assertMessagesReceived(2);
         outReceiver.getMessageList().assertMessagesReceived(0);
-        faultReceiver.getMessageList().assertMessagesReceived(1);
+        faultReceiver.getMessageList().assertMessagesReceived(2);
         
         listener.assertExchangeCompleted();
     }
@@ -144,9 +153,19 @@
         assertNotNull(me.getFault());
         client.fail(me, new Exception("I do not like faults"));
         
-        inReceiver.getMessageList().assertMessagesReceived(1);
+        me = client.createRobustInOnlyExchange();
+        me.setService(new QName("wireTap"));
+        me.getInMessage().setContent(createSource("<hello/>"));
+        client.send(me);
+        
+        me = (RobustInOnly) client.receive(); 
+        assertEquals(ExchangeStatus.ACTIVE, me.getStatus());
+        assertNotNull(me.getFault());
+        client.fail(me, new Exception("I do not like faults"));
+        
+        inReceiver.getMessageList().assertMessagesReceived(2);
         outReceiver.getMessageList().assertMessagesReceived(0);
-        faultReceiver.getMessageList().assertMessagesReceived(1);
+        faultReceiver.getMessageList().assertMessagesReceived(2);
         
         listener.assertExchangeCompleted();
     }
@@ -416,4 +435,5 @@
         
         listener.assertExchangeCompleted();
     }
+    
 }


Reply via email to