Updated Branches: refs/heads/camel-2.12.x 6e8689a2d -> f2749a941 refs/heads/master d6646e648 -> 5f726d0b9
CAMEL-7163: BacklogDebugger - Should not change body/header type to string Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/5f726d0b Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/5f726d0b Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/5f726d0b Branch: refs/heads/master Commit: 5f726d0b995f90f59489b5ebf32aad09ca80f951 Parents: d6646e6 Author: Claus Ibsen <[email protected]> Authored: Mon Feb 3 20:13:57 2014 +0100 Committer: Claus Ibsen <[email protected]> Committed: Mon Feb 3 20:13:57 2014 +0100 ---------------------------------------------------------------------- .../mbean/ManagedBacklogDebuggerMBean.java | 20 ++- .../mbean/ManagedBacklogDebugger.java | 38 ++++- .../processor/interceptor/BacklogDebugger.java | 94 ++++++++++- .../camel/management/BacklogDebuggerTest.java | 160 ++++++++++++++++++- 4 files changed, 296 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/5f726d0b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedBacklogDebuggerMBean.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedBacklogDebuggerMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedBacklogDebuggerMBean.java index 02d2d96..036ef3d 100644 --- a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedBacklogDebuggerMBean.java +++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedBacklogDebuggerMBean.java @@ -53,11 +53,23 @@ public interface ManagedBacklogDebuggerMBean { @ManagedOperation(description = "Resume running from the suspended breakpoint at the given node id") void resumeBreakpoint(String nodeId); - @ManagedOperation(description = "Updates the message body on the suspended breakpoint at the given node id") - void setMessageBodyOnBreakpoint(String nodeId, String body); + @ManagedOperation(description = "Updates the message body (uses same type as old body) on the suspended breakpoint at the given node id") + void setMessageBodyOnBreakpoint(String nodeId, Object body); - @ManagedOperation(description = "Updates/adds the message header on the suspended breakpoint at the given node id") - void setMessageHeaderOnBreakpoint(String nodeId, String headerName, String value); + @ManagedOperation(description = "Updates the message body (with a new type) on the suspended breakpoint at the given node id") + void setMessageBodyOnBreakpoint(String nodeId, Object body, String type); + + @ManagedOperation(description = "Removes the message body on the suspended breakpoint at the given node id") + void removeMessageBodyOnBreakpoint(String nodeId); + + @ManagedOperation(description = "Updates/adds the message header (uses same type as old header value) on the suspended breakpoint at the given node id") + void setMessageHeaderOnBreakpoint(String nodeId, String headerName, Object value); + + @ManagedOperation(description = "Removes the message header on the suspended breakpoint at the given node id") + void removeMessageHeaderOnBreakpoint(String nodeId, String headerName); + + @ManagedOperation(description = "Updates/adds the message header (with a new type) on the suspended breakpoint at the given node id") + void setMessageHeaderOnBreakpoint(String nodeId, String headerName, Object value, String type); @ManagedOperation(description = "Resume running any suspended breakpoints, and exits step mode") void resumeAll(); http://git-wip-us.apache.org/repos/asf/camel/blob/5f726d0b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedBacklogDebugger.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedBacklogDebugger.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedBacklogDebugger.java index 33f1310..9f3a94c 100644 --- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedBacklogDebugger.java +++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedBacklogDebugger.java @@ -19,10 +19,12 @@ package org.apache.camel.management.mbean; import java.util.Set; import org.apache.camel.CamelContext; +import org.apache.camel.NoTypeConversionAvailableException; import org.apache.camel.api.management.ManagedResource; import org.apache.camel.api.management.mbean.ManagedBacklogDebuggerMBean; import org.apache.camel.processor.interceptor.BacklogDebugger; import org.apache.camel.spi.ManagementStrategy; +import org.apache.camel.util.ObjectHelper; @ManagedResource(description = "Managed BacklogDebugger") public class ManagedBacklogDebugger implements ManagedBacklogDebuggerMBean { @@ -91,12 +93,42 @@ public class ManagedBacklogDebugger implements ManagedBacklogDebuggerMBean { backlogDebugger.resumeBreakpoint(nodeId); } - public void setMessageBodyOnBreakpoint(String nodeId, String body) { + public void setMessageBodyOnBreakpoint(String nodeId, Object body) { backlogDebugger.setMessageBodyOnBreakpoint(nodeId, body); } - public void setMessageHeaderOnBreakpoint(String nodeId, String headerName, String value) { - backlogDebugger.setMessageHeaderOnBreakpoint(nodeId, headerName, value); + public void setMessageBodyOnBreakpoint(String nodeId, Object body, String type) { + try { + Class<?> classType = camelContext.getClassResolver().resolveMandatoryClass(type); + backlogDebugger.setMessageBodyOnBreakpoint(nodeId, body, classType); + } catch (ClassNotFoundException e) { + throw ObjectHelper.wrapRuntimeCamelException(e); + } + } + + public void removeMessageBodyOnBreakpoint(String nodeId) { + backlogDebugger.removeMessageBodyOnBreakpoint(nodeId); + } + + public void setMessageHeaderOnBreakpoint(String nodeId, String headerName, Object value) { + try { + backlogDebugger.setMessageHeaderOnBreakpoint(nodeId, headerName, value); + } catch (NoTypeConversionAvailableException e) { + throw ObjectHelper.wrapRuntimeCamelException(e); + } + } + + public void setMessageHeaderOnBreakpoint(String nodeId, String headerName, Object value, String type) { + try { + Class<?> classType = camelContext.getClassResolver().resolveMandatoryClass(type); + backlogDebugger.setMessageHeaderOnBreakpoint(nodeId, headerName, value, classType); + } catch (Exception e) { + throw ObjectHelper.wrapRuntimeCamelException(e); + } + } + + public void removeMessageHeaderOnBreakpoint(String nodeId, String headerName) { + backlogDebugger.removeMessageHeaderOnBreakpoint(nodeId, headerName); } public void resumeAll() { http://git-wip-us.apache.org/repos/asf/camel/blob/5f726d0b/camel-core/src/main/java/org/apache/camel/processor/interceptor/BacklogDebugger.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/interceptor/BacklogDebugger.java b/camel-core/src/main/java/org/apache/camel/processor/interceptor/BacklogDebugger.java index 3c2e290..571b174 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/interceptor/BacklogDebugger.java +++ b/camel-core/src/main/java/org/apache/camel/processor/interceptor/BacklogDebugger.java @@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.camel.CamelContext; import org.apache.camel.Exchange; import org.apache.camel.LoggingLevel; +import org.apache.camel.NoTypeConversionAvailableException; import org.apache.camel.Predicate; import org.apache.camel.Processor; import org.apache.camel.api.management.mbean.BacklogTracerEventMessage; @@ -268,26 +269,105 @@ public class BacklogDebugger extends ServiceSupport implements InterceptStrategy } } - public void setMessageBodyOnBreakpoint(String nodeId, String body) { + public void setMessageBodyOnBreakpoint(String nodeId, Object body) { SuspendedExchange se = suspendedBreakpoints.get(nodeId); if (se != null) { - logger.log("Breakpoint at node " + nodeId + " is updating message body on exchangeId: " + se.getExchange().getExchangeId() + " with new body: " + body); + boolean remove = body == null; + if (remove) { + removeMessageBodyOnBreakpoint(nodeId); + } else { + Class oldType; + if (se.getExchange().hasOut()) { + oldType = se.getExchange().getOut().getBody() != null ? se.getExchange().getOut().getBody().getClass() : null; + } else { + oldType = se.getExchange().getIn().getBody() != null ? se.getExchange().getIn().getBody().getClass() : null; + } + setMessageBodyOnBreakpoint(nodeId, body, oldType); + } + } + } + + public void setMessageBodyOnBreakpoint(String nodeId, Object body, Class type) { + SuspendedExchange se = suspendedBreakpoints.get(nodeId); + if (se != null) { + boolean remove = body == null; + if (remove) { + removeMessageBodyOnBreakpoint(nodeId); + } else { + logger.log("Breakpoint at node " + nodeId + " is updating message body on exchangeId: " + se.getExchange().getExchangeId() + " with new body: " + body); + if (se.getExchange().hasOut()) { + // preserve type + if (type != null) { + se.getExchange().getOut().setBody(body, type); + } else { + se.getExchange().getOut().setBody(body); + } + } else { + if (type != null) { + se.getExchange().getIn().setBody(body, type); + } else { + se.getExchange().getIn().setBody(body); + } + } + } + } + } + + public void removeMessageBodyOnBreakpoint(String nodeId) { + SuspendedExchange se = suspendedBreakpoints.get(nodeId); + if (se != null) { + logger.log("Breakpoint at node " + nodeId + " is removing message body on exchangeId: " + se.getExchange().getExchangeId()); if (se.getExchange().hasOut()) { - se.getExchange().getOut().setBody(body); + se.getExchange().getOut().setBody(null); } else { - se.getExchange().getIn().setBody(body); + se.getExchange().getIn().setBody(null); } } } - public void setMessageHeaderOnBreakpoint(String nodeId, String headerName, String value) { + public void setMessageHeaderOnBreakpoint(String nodeId, String headerName, Object value) throws NoTypeConversionAvailableException { + SuspendedExchange se = suspendedBreakpoints.get(nodeId); + if (se != null) { + Class oldType; + if (se.getExchange().hasOut()) { + oldType = se.getExchange().getOut().getHeader(headerName) != null ? se.getExchange().getOut().getHeader(headerName).getClass() : null; + } else { + oldType = se.getExchange().getIn().getHeader(headerName) != null ? se.getExchange().getIn().getHeader(headerName).getClass() : null; + } + setMessageHeaderOnBreakpoint(nodeId, headerName, value, oldType); + } + } + + public void setMessageHeaderOnBreakpoint(String nodeId, String headerName, Object value, Class type) throws NoTypeConversionAvailableException { SuspendedExchange se = suspendedBreakpoints.get(nodeId); if (se != null) { logger.log("Breakpoint at node " + nodeId + " is updating message header on exchangeId: " + se.getExchange().getExchangeId() + " with header: " + headerName + " and value: " + value); if (se.getExchange().hasOut()) { - se.getExchange().getOut().setHeader(headerName, value); + if (type != null) { + Object convertedValue = se.getExchange().getContext().getTypeConverter().mandatoryConvertTo(type, se.getExchange(), value); + se.getExchange().getOut().setHeader(headerName, convertedValue); + } else { + se.getExchange().getOut().setHeader(headerName, value); + } + } else { + if (type != null) { + Object convertedValue = se.getExchange().getContext().getTypeConverter().mandatoryConvertTo(type, se.getExchange(), value); + se.getExchange().getIn().setHeader(headerName, convertedValue); + } else { + se.getExchange().getIn().setHeader(headerName, value); + } + } + } + } + + public void removeMessageHeaderOnBreakpoint(String nodeId, String headerName) { + SuspendedExchange se = suspendedBreakpoints.get(nodeId); + if (se != null) { + logger.log("Breakpoint at node " + nodeId + " is removing message header on exchangeId: " + se.getExchange().getExchangeId() + " with header: " + headerName); + if (se.getExchange().hasOut()) { + se.getExchange().getOut().removeHeader(headerName); } else { - se.getExchange().getIn().setHeader(headerName, value); + se.getExchange().getIn().removeHeader(headerName); } } } http://git-wip-us.apache.org/repos/asf/camel/blob/5f726d0b/camel-core/src/test/java/org/apache/camel/management/BacklogDebuggerTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/management/BacklogDebuggerTest.java b/camel-core/src/test/java/org/apache/camel/management/BacklogDebuggerTest.java index 6d9140c..d717fa4 100644 --- a/camel-core/src/test/java/org/apache/camel/management/BacklogDebuggerTest.java +++ b/camel-core/src/test/java/org/apache/camel/management/BacklogDebuggerTest.java @@ -125,8 +125,8 @@ public class BacklogDebuggerTest extends ManagementTestSupport { assertEquals("foo", nodes.iterator().next()); // update body and header - mbeanServer.invoke(on, "setMessageBodyOnBreakpoint", new Object[]{"foo", "Changed body"}, new String[]{"java.lang.String", "java.lang.String"}); - mbeanServer.invoke(on, "setMessageHeaderOnBreakpoint", new Object[]{"foo", "beer", "Carlsberg"}, new String[]{"java.lang.String", "java.lang.String", "java.lang.String"}); + mbeanServer.invoke(on, "setMessageBodyOnBreakpoint", new Object[]{"foo", "Changed body"}, new String[]{"java.lang.String", "java.lang.Object"}); + mbeanServer.invoke(on, "setMessageHeaderOnBreakpoint", new Object[]{"foo", "beer", "Carlsberg"}, new String[]{"java.lang.String", "java.lang.String", "java.lang.Object"}); // resume breakpoint mbeanServer.invoke(on, "resumeBreakpoint", new Object[]{"foo"}, new String[]{"java.lang.String"}); @@ -163,6 +163,162 @@ public class BacklogDebuggerTest extends ManagementTestSupport { } @SuppressWarnings("unchecked") + public void testBacklogDebuggerUpdateBodyAndHeaderType() throws Exception { + // JMX tests dont work well on AIX CI servers (hangs them) + if (isPlatform("aix")) { + return; + } + + MBeanServer mbeanServer = getMBeanServer(); + ObjectName on = new ObjectName("org.apache.camel:context=camel-1,type=tracer,name=BacklogDebugger"); + assertNotNull(on); + mbeanServer.isRegistered(on); + + Boolean enabled = (Boolean) mbeanServer.getAttribute(on, "Enabled"); + assertEquals("Should not be enabled", Boolean.FALSE, enabled); + + // enable debugger + mbeanServer.invoke(on, "enableDebugger", null, null); + + enabled = (Boolean) mbeanServer.getAttribute(on, "Enabled"); + assertEquals("Should be enabled", Boolean.TRUE, enabled); + + // add breakpoint at bar + mbeanServer.invoke(on, "addBreakpoint", new Object[]{"foo"}, new String[]{"java.lang.String"}); + mbeanServer.invoke(on, "addBreakpoint", new Object[]{"bar"}, new String[]{"java.lang.String"}); + + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMessageCount(0); + mock.setSleepForEmptyTest(1000); + + template.sendBody("seda:start", "Hello World"); + + assertMockEndpointsSatisfied(); + + // add breakpoint at bar + Set<String> nodes = (Set<String>) mbeanServer.invoke(on, "getSuspendedBreakpointNodeIds", null, null); + assertNotNull(nodes); + assertEquals(1, nodes.size()); + assertEquals("foo", nodes.iterator().next()); + + // update body and header + mbeanServer.invoke(on, "setMessageBodyOnBreakpoint", new Object[]{"foo", "444", "java.lang.Integer"}, + new String[]{"java.lang.String", "java.lang.Object", "java.lang.String"}); + mbeanServer.invoke(on, "setMessageHeaderOnBreakpoint", new Object[]{"foo", "beer", "123", "java.lang.Integer"}, + new String[]{"java.lang.String", "java.lang.String", "java.lang.Object", "java.lang.String"}); + + // resume breakpoint + mbeanServer.invoke(on, "resumeBreakpoint", new Object[]{"foo"}, new String[]{"java.lang.String"}); + + Thread.sleep(1000); + + // add breakpoint at bar + nodes = (Set<String>) mbeanServer.invoke(on, "getSuspendedBreakpointNodeIds", null, null); + assertNotNull(nodes); + assertEquals(1, nodes.size()); + assertEquals("bar", nodes.iterator().next()); + + // the message should be ours + String xml = (String) mbeanServer.invoke(on, "dumpTracedMessagesAsXml", new Object[]{"bar"}, new String[]{"java.lang.String"}); + assertNotNull(xml); + log.info(xml); + + assertTrue("Should contain our body", xml.contains("444")); + assertTrue("Should contain bar node", xml.contains("<toNode>bar</toNode>")); + assertTrue("Should contain our added header", xml.contains("<header key=\"beer\" type=\"java.lang.Integer\">123</header>")); + + resetMocks(); + mock.expectedMessageCount(1); + + // resume breakpoint + mbeanServer.invoke(on, "resumeBreakpoint", new Object[]{"bar"}, new String[]{"java.lang.String"}); + + assertMockEndpointsSatisfied(); + + // and no suspended anymore + nodes = (Set<String>) mbeanServer.invoke(on, "getSuspendedBreakpointNodeIds", null, null); + assertNotNull(nodes); + assertEquals(0, nodes.size()); + } + + @SuppressWarnings("unchecked") + public void testBacklogDebuggerRemoveBodyAndHeader() throws Exception { + // JMX tests dont work well on AIX CI servers (hangs them) + if (isPlatform("aix")) { + return; + } + + MBeanServer mbeanServer = getMBeanServer(); + ObjectName on = new ObjectName("org.apache.camel:context=camel-1,type=tracer,name=BacklogDebugger"); + assertNotNull(on); + mbeanServer.isRegistered(on); + + Boolean enabled = (Boolean) mbeanServer.getAttribute(on, "Enabled"); + assertEquals("Should not be enabled", Boolean.FALSE, enabled); + + // enable debugger + mbeanServer.invoke(on, "enableDebugger", null, null); + + enabled = (Boolean) mbeanServer.getAttribute(on, "Enabled"); + assertEquals("Should be enabled", Boolean.TRUE, enabled); + + // add breakpoint at bar + mbeanServer.invoke(on, "addBreakpoint", new Object[]{"foo"}, new String[]{"java.lang.String"}); + mbeanServer.invoke(on, "addBreakpoint", new Object[]{"bar"}, new String[]{"java.lang.String"}); + + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMessageCount(0); + mock.setSleepForEmptyTest(1000); + + template.sendBody("seda:start", "Hello World"); + + assertMockEndpointsSatisfied(); + + // add breakpoint at bar + Set<String> nodes = (Set<String>) mbeanServer.invoke(on, "getSuspendedBreakpointNodeIds", null, null); + assertNotNull(nodes); + assertEquals(1, nodes.size()); + assertEquals("foo", nodes.iterator().next()); + + // update body and header + mbeanServer.invoke(on, "removeMessageBodyOnBreakpoint", new Object[]{"foo"}, new String[]{"java.lang.String"}); + mbeanServer.invoke(on, "removeMessageHeaderOnBreakpoint", new Object[]{"foo", "beer"}, new String[]{"java.lang.String", "java.lang.String"}); + + // resume breakpoint + mbeanServer.invoke(on, "resumeBreakpoint", new Object[]{"foo"}, new String[]{"java.lang.String"}); + + Thread.sleep(1000); + + // add breakpoint at bar + nodes = (Set<String>) mbeanServer.invoke(on, "getSuspendedBreakpointNodeIds", null, null); + assertNotNull(nodes); + assertEquals(1, nodes.size()); + assertEquals("bar", nodes.iterator().next()); + + // the message should be ours + String xml = (String) mbeanServer.invoke(on, "dumpTracedMessagesAsXml", new Object[]{"bar"}, new String[]{"java.lang.String"}); + assertNotNull(xml); + log.info(xml); + + assertTrue("Should not contain our body", xml.contains("<body>[Body is null]</body>")); + assertTrue("Should contain bar node", xml.contains("<toNode>bar</toNode>")); + assertFalse("Should not contain any headers", xml.contains("<header")); + + resetMocks(); + mock.expectedMessageCount(1); + + // resume breakpoint + mbeanServer.invoke(on, "resumeBreakpoint", new Object[]{"bar"}, new String[]{"java.lang.String"}); + + assertMockEndpointsSatisfied(); + + // and no suspended anymore + nodes = (Set<String>) mbeanServer.invoke(on, "getSuspendedBreakpointNodeIds", null, null); + assertNotNull(nodes); + assertEquals(0, nodes.size()); + } + + @SuppressWarnings("unchecked") public void testBacklogDebuggerSuspendOnlyOneAtBreakpoint() throws Exception { // JMX tests dont work well on AIX CI servers (hangs them) if (isPlatform("aix")) {
