Repository: camel Updated Branches: refs/heads/camel-2.15.x bb3e39951 -> 77fc1550a refs/heads/master 64a13a75b -> d063f471f
CAMEL-8626: Fixed leaking exchangesInFlightKeys in ManagedRoute if exchange failed. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d063f471 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d063f471 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d063f471 Branch: refs/heads/master Commit: d063f471f8d1f7595d30527b8ee9bdc94e217173 Parents: 64a13a7 Author: Claus Ibsen <[email protected]> Authored: Tue Apr 14 08:17:34 2015 +0200 Committer: Claus Ibsen <[email protected]> Committed: Tue Apr 14 08:17:34 2015 +0200 ---------------------------------------------------------------------- .../camel/management/mbean/ManagedRoute.java | 13 ++++- .../ManagedInflightStatisticsTest.java | 58 ++++++++++++++++++++ 2 files changed, 69 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/d063f471/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java index d31e4e5..c4b6d85 100644 --- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java +++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java @@ -417,8 +417,8 @@ public class ManagedRoute extends ManagedPerformanceCounter implements TimerList @Override public void init(ManagementStrategy strategy) { - super.init(strategy); exchangesInFlightStartTimestamps.clear(); + super.init(strategy); } @Override @@ -438,14 +438,23 @@ public class ManagedRoute extends ManagedPerformanceCounter implements TimerList super.completedExchange(exchange, time); } + @Override + public synchronized void failedExchange(Exchange exchange) { + InFlightKey key = exchangesInFlightKeys.remove(exchange.getExchangeId()); + if (key != null) { + exchangesInFlightStartTimestamps.remove(key); + } + super.failedExchange(exchange); + } + private static class InFlightKey implements Comparable<InFlightKey> { private final Long timeStamp; private final String exchangeId; InFlightKey(Long timeStamp, String exchangeId) { - this.exchangeId = exchangeId; this.timeStamp = timeStamp; + this.exchangeId = exchangeId; } @Override http://git-wip-us.apache.org/repos/asf/camel/blob/d063f471/camel-core/src/test/java/org/apache/camel/management/ManagedInflightStatisticsTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/management/ManagedInflightStatisticsTest.java b/camel-core/src/test/java/org/apache/camel/management/ManagedInflightStatisticsTest.java index d0599bf..aa6fbe7 100644 --- a/camel-core/src/test/java/org/apache/camel/management/ManagedInflightStatisticsTest.java +++ b/camel-core/src/test/java/org/apache/camel/management/ManagedInflightStatisticsTest.java @@ -90,7 +90,60 @@ public class ManagedInflightStatisticsTest extends ManagementTestSupport { assertNull(ts); id = (String) mbeanServer.getAttribute(on, "OldestInflightExchangeId"); assertNull(id); + } + + public void testManageStatisticsFailed() throws Exception { + // JMX tests dont work well on AIX CI servers (hangs them) + if (isPlatform("aix")) { + return; + } + + // get the stats for the route + MBeanServer mbeanServer = getMBeanServer(); + + Set<ObjectName> set = mbeanServer.queryNames(new ObjectName("*:type=routes,*"), null); + assertEquals(1, set.size()); + ObjectName on = set.iterator().next(); + + Long inflight = (Long) mbeanServer.getAttribute(on, "ExchangesInflight"); + assertEquals(0, inflight.longValue()); + Long ts = (Long) mbeanServer.getAttribute(on, "OldestInflightDuration"); + assertNull(ts); + String id = (String) mbeanServer.getAttribute(on, "OldestInflightExchangeId"); + assertNull(id); + + MockEndpoint result = getMockEndpoint("mock:result"); + result.expectedMessageCount(1); + + // start some exchanges. + template.asyncSendBody("direct:start", 1000L); + Thread.sleep(500); + try { + template.sendBody("direct:start", "Kaboom"); + fail("Should have thrown exception"); + } catch (Exception e) { + // expected + } + + inflight = (Long) mbeanServer.getAttribute(on, "ExchangesInflight"); + assertEquals(1, inflight.longValue()); + ts = (Long) mbeanServer.getAttribute(on, "OldestInflightDuration"); + assertNotNull(ts); + id = (String) mbeanServer.getAttribute(on, "OldestInflightExchangeId"); + assertNotNull(id); + + assertMockEndpointsSatisfied(); + + // Lets wait for all the exchanges to complete. + Thread.sleep(500); + + inflight = (Long) mbeanServer.getAttribute(on, "ExchangesInflight"); + assertEquals(0, inflight.longValue()); + ts = (Long) mbeanServer.getAttribute(on, "OldestInflightDuration"); + assertNull(ts); + id = (String) mbeanServer.getAttribute(on, "OldestInflightExchangeId"); + assertNull(id); } @Override @@ -102,6 +155,11 @@ public class ManagedInflightStatisticsTest extends ManagementTestSupport { .process(new Processor() { @Override public void process(Exchange exchange) throws Exception { + String body = exchange.getIn().getBody(String.class); + if ("Kaboom".equals(body)) { + throw new IllegalArgumentException("Forced"); + } + Long delay = (Long) exchange.getIn().getBody(); Thread.sleep(delay.longValue()); }
