Repository: camel
Updated Branches:
  refs/heads/camel-2.15.x bb3e39951 -> 77fc1550a
  refs/heads/master 64a13a75b -> d063f471f


CAMEL-8626: Fixed leaking exchangesInFlightKeys in ManagedRoute if exchange 
failed.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d063f471
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d063f471
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d063f471

Branch: refs/heads/master
Commit: d063f471f8d1f7595d30527b8ee9bdc94e217173
Parents: 64a13a7
Author: Claus Ibsen <[email protected]>
Authored: Tue Apr 14 08:17:34 2015 +0200
Committer: Claus Ibsen <[email protected]>
Committed: Tue Apr 14 08:17:34 2015 +0200

----------------------------------------------------------------------
 .../camel/management/mbean/ManagedRoute.java    | 13 ++++-
 .../ManagedInflightStatisticsTest.java          | 58 ++++++++++++++++++++
 2 files changed, 69 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/d063f471/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java 
b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java
index d31e4e5..c4b6d85 100644
--- 
a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java
+++ 
b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java
@@ -417,8 +417,8 @@ public class ManagedRoute extends ManagedPerformanceCounter 
implements TimerList
 
     @Override
     public void init(ManagementStrategy strategy) {
-        super.init(strategy);
         exchangesInFlightStartTimestamps.clear();
+        super.init(strategy);
     }
 
     @Override
@@ -438,14 +438,23 @@ public class ManagedRoute extends 
ManagedPerformanceCounter implements TimerList
         super.completedExchange(exchange, time);
     }
 
+    @Override
+    public synchronized void failedExchange(Exchange exchange) {
+        InFlightKey key = 
exchangesInFlightKeys.remove(exchange.getExchangeId());
+        if (key != null) {
+            exchangesInFlightStartTimestamps.remove(key);
+        }
+        super.failedExchange(exchange);
+    }
+
     private static class InFlightKey implements Comparable<InFlightKey> {
 
         private final Long timeStamp;
         private final String exchangeId;
 
         InFlightKey(Long timeStamp, String exchangeId) {
-            this.exchangeId = exchangeId;
             this.timeStamp = timeStamp;
+            this.exchangeId = exchangeId;
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/camel/blob/d063f471/camel-core/src/test/java/org/apache/camel/management/ManagedInflightStatisticsTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/management/ManagedInflightStatisticsTest.java
 
b/camel-core/src/test/java/org/apache/camel/management/ManagedInflightStatisticsTest.java
index d0599bf..aa6fbe7 100644
--- 
a/camel-core/src/test/java/org/apache/camel/management/ManagedInflightStatisticsTest.java
+++ 
b/camel-core/src/test/java/org/apache/camel/management/ManagedInflightStatisticsTest.java
@@ -90,7 +90,60 @@ public class ManagedInflightStatisticsTest extends 
ManagementTestSupport {
         assertNull(ts);
         id = (String) mbeanServer.getAttribute(on, "OldestInflightExchangeId");
         assertNull(id);
+    }
+
+    public void testManageStatisticsFailed() throws Exception {
+        // JMX tests don't work well on AIX CI servers (hangs them)
+        if (isPlatform("aix")) {
+            return;
+        }
+
+        // get the stats for the route
+        MBeanServer mbeanServer = getMBeanServer();
+
+        Set<ObjectName> set = mbeanServer.queryNames(new 
ObjectName("*:type=routes,*"), null);
+        assertEquals(1, set.size());
+        ObjectName on = set.iterator().next();
+
+        Long inflight = (Long) mbeanServer.getAttribute(on, 
"ExchangesInflight");
+        assertEquals(0, inflight.longValue());
+        Long ts = (Long) mbeanServer.getAttribute(on, 
"OldestInflightDuration");
+        assertNull(ts);
+        String id = (String) mbeanServer.getAttribute(on, 
"OldestInflightExchangeId");
+        assertNull(id);
+
+        MockEndpoint result = getMockEndpoint("mock:result");
+        result.expectedMessageCount(1);
+
+        // start some exchanges.
+        template.asyncSendBody("direct:start", 1000L);
+        Thread.sleep(500);
+        try {
+            template.sendBody("direct:start", "Kaboom");
+            fail("Should have thrown exception");
+        } catch (Exception e) {
+            // expected
+        }
+
+        inflight = (Long) mbeanServer.getAttribute(on, "ExchangesInflight");
+        assertEquals(1, inflight.longValue());
 
+        ts = (Long) mbeanServer.getAttribute(on, "OldestInflightDuration");
+        assertNotNull(ts);
+        id = (String) mbeanServer.getAttribute(on, "OldestInflightExchangeId");
+        assertNotNull(id);
+
+        assertMockEndpointsSatisfied();
+
+        // Let's wait for all the exchanges to complete.
+        Thread.sleep(500);
+
+        inflight = (Long) mbeanServer.getAttribute(on, "ExchangesInflight");
+        assertEquals(0, inflight.longValue());
+        ts = (Long) mbeanServer.getAttribute(on, "OldestInflightDuration");
+        assertNull(ts);
+        id = (String) mbeanServer.getAttribute(on, "OldestInflightExchangeId");
+        assertNull(id);
     }
 
     @Override
@@ -102,6 +155,11 @@ public class ManagedInflightStatisticsTest extends 
ManagementTestSupport {
                         .process(new Processor() {
                             @Override
                             public void process(Exchange exchange) throws 
Exception {
+                                String body = 
exchange.getIn().getBody(String.class);
+                                if ("Kaboom".equals(body)) {
+                                    throw new 
IllegalArgumentException("Forced");
+                                }
+
                                 Long delay = (Long) exchange.getIn().getBody();
                                 Thread.sleep(delay.longValue());
                             }

Reply via email to