This is an automated email from the ASF dual-hosted git repository. nfilotto pushed a commit to branch CAMEL-18260/reflect-changes-in-backlog-tracer-event-message in repository https://gitbox.apache.org/repos/asf/camel.git
commit f560a5f43dd24453dd374d659a97c087351fa2d0 Author: Nicolas Filotto <[email protected]> AuthorDate: Fri Jul 8 16:26:10 2022 +0200 CAMEL-18260: debugger - Reflect exchange changes on BacklogTracerEventMessage --- .../camel/impl/debugger/BacklogDebugger.java | 92 +++++++++++++++------- .../camel/management/BacklogDebuggerTest.java | 22 ++++++ 2 files changed, 85 insertions(+), 29 deletions(-) diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/BacklogDebugger.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/BacklogDebugger.java index f95315ed42f..4b68faecc34 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/BacklogDebugger.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/BacklogDebugger.java @@ -107,7 +107,7 @@ public final class BacklogDebugger extends ServiceSupport { private final CountDownLatch latch; /** - * @param exchange the suspend exchange + * @param exchange the suspended exchange * @param latch the latch to use to continue routing the exchange */ private SuspendedExchange(Exchange exchange, CountDownLatch latch) { @@ -338,11 +338,9 @@ public final class BacklogDebugger extends ServiceSupport { private void resumeBreakpoint(String nodeId, boolean stepMode) { logger.log("Resume breakpoint " + nodeId); - if (!stepMode) { - if (singleStepExchangeId != null) { - debugger.stopSingleStepExchange(singleStepExchangeId); - singleStepExchangeId = null; - } + if (!stepMode && singleStepExchangeId != null) { + debugger.stopSingleStepExchange(singleStepExchangeId); + singleStepExchangeId = null; } // remember to remove the dumped message as its no longer in need @@ -374,14 +372,15 @@ public final class BacklogDebugger extends ServiceSupport { if (remove) { removeMessageBodyOnBreakpoint(nodeId); } else { - logger.log("Breakpoint at node " + nodeId + " is updating message body on exchangeId: " - + se.getExchange().getExchangeId() + " with new body: " + 
body); + logger.log(String.format("Breakpoint at node %s is updating message body on exchangeId: %s with new body: %s", + nodeId, se.getExchange().getExchangeId(), body)); // preserve type - if (type != null) { - se.getExchange().getMessage().setBody(body, type); - } else { + if (type == null) { se.getExchange().getMessage().setBody(body); + } else { + se.getExchange().getMessage().setBody(body, type); } + refreshBacklogTracerEventMessage(nodeId, se); } } } @@ -389,9 +388,10 @@ public final class BacklogDebugger extends ServiceSupport { public void removeMessageBodyOnBreakpoint(String nodeId) { SuspendedExchange se = suspendedBreakpoints.get(nodeId); if (se != null) { - logger.log("Breakpoint at node " + nodeId + " is removing message body on exchangeId: " - + se.getExchange().getExchangeId()); + logger.log(String.format("Breakpoint at node %s is removing message body on exchangeId: %s", nodeId, + se.getExchange().getExchangeId())); se.getExchange().getMessage().setBody(null); + refreshBacklogTracerEventMessage(nodeId, se); } } @@ -399,8 +399,8 @@ public final class BacklogDebugger extends ServiceSupport { throws NoTypeConversionAvailableException { SuspendedExchange se = suspendedBreakpoints.get(nodeId); if (se != null) { - Class<?> oldType = se.getExchange().getMessage().getHeader(headerName) != null - ? se.getExchange().getMessage().getHeader(headerName).getClass() : null; + Class<?> oldType = se.getExchange().getMessage().getHeader(headerName) == null + ? 
null : se.getExchange().getMessage().getHeader(headerName).getClass(); setMessageHeaderOnBreakpoint(nodeId, headerName, value, oldType); } } @@ -418,6 +418,7 @@ public final class BacklogDebugger extends ServiceSupport { } else { se.getExchange().getMessage().setHeader(headerName, value); } + refreshBacklogTracerEventMessage(nodeId, se); } } @@ -435,6 +436,7 @@ public final class BacklogDebugger extends ServiceSupport { logger.log("Breakpoint at node " + nodeId + " is removing message header on exchangeId: " + se.getExchange().getExchangeId() + " with header: " + headerName); se.getExchange().getMessage().removeHeader(headerName); + refreshBacklogTracerEventMessage(nodeId, se); } } @@ -542,11 +544,10 @@ public final class BacklogDebugger extends ServiceSupport { public String dumpTracedMessagesAsXml(String nodeId) { logger.log("Dump trace message from breakpoint " + nodeId); BacklogTracerEventMessage msg = suspendedBreakpointMessages.get(nodeId); - if (msg != null) { - return msg.toXml(0); - } else { + if (msg == null) { return null; } + return msg.toXml(0); } public long getDebugCounter() { @@ -595,6 +596,33 @@ public final class BacklogDebugger extends ServiceSupport { suspendedBreakpointMessages.clear(); } + /** + * Refresh the content of the existing backlog tracer event message corresponding to the given node id with the new + * content of the exchange. + * + * @param nodeId the node id for the breakpoint + * @param se the suspended exchange whose content refreshes the backlog tracer event message + */ + private void refreshBacklogTracerEventMessage(String nodeId, SuspendedExchange se) { + suspendedBreakpointMessages.computeIfPresent( + nodeId, + (nId, message) -> new DefaultBacklogTracerEventMessage( + message.getUid(), message.getTimestamp(), message.getRouteId(), message.getToNode(), + message.getExchangeId(), + dumpAsXml(se.getExchange()))); + } + + /** + * Dumps the message as a generic XML structure.
+ * + * @param exchange the exchange to dump as XML + * @return the XML + */ + private String dumpAsXml(Exchange exchange) { + return MessageHelper.dumpAsXml(exchange.getIn(), true, 2, isBodyIncludeStreams(), isBodyIncludeFiles(), + getBodyMaxChars()); + } + /** * Represents a {@link org.apache.camel.spi.Breakpoint} that has a {@link Condition} on a specific node id. */ @@ -623,8 +651,7 @@ public final class BacklogDebugger extends ServiceSupport { String toNode = definition.getId(); String routeId = CamelContextHelper.getRouteId(definition); String exchangeId = exchange.getExchangeId(); - String messageAsXml = MessageHelper.dumpAsXml(exchange.getIn(), true, 2, isBodyIncludeStreams(), - isBodyIncludeFiles(), getBodyMaxChars()); + String messageAsXml = dumpAsXml(exchange); long uid = debugCounter.incrementAndGet(); BacklogTracerEventMessage msg @@ -635,17 +662,21 @@ public final class BacklogDebugger extends ServiceSupport { final SuspendedExchange se = suspendedBreakpoints.get(nodeId); if (se != null) { // now wait until we should continue - logger.log("NodeBreakpoint at node " + toNode + " is waiting to continue for exchangeId: " + exchangeId); + logger.log(String.format("NodeBreakpoint at node %s is waiting to continue for exchangeId: %s", toNode, + exchangeId)); try { boolean hit = se.getLatch().await(fallbackTimeout, TimeUnit.SECONDS); if (!hit) { - logger.log("NodeBreakpoint at node " + toNode + " timed out and is continued exchangeId: " + exchangeId, + logger.log( + String.format("NodeBreakpoint at node %s timed out and is continued exchangeId: %s", toNode, + exchangeId), LoggingLevel.WARN); } else { - logger.log("NodeBreakpoint at node " + toNode + " is continued exchangeId: " + exchangeId); + logger.log(String.format("NodeBreakpoint at node %s is continued exchangeId: %s", toNode, exchangeId)); } } catch (InterruptedException e) { // ignore + Thread.currentThread().interrupt(); } } } @@ -686,8 +717,7 @@ public final class BacklogDebugger extends 
ServiceSupport { String toNode = definition.getId(); String routeId = CamelContextHelper.getRouteId(definition); String exchangeId = exchange.getExchangeId(); - String messageAsXml = MessageHelper.dumpAsXml(exchange.getIn(), true, 2, isBodyIncludeStreams(), - isBodyIncludeFiles(), getBodyMaxChars()); + String messageAsXml = dumpAsXml(exchange); long uid = debugCounter.incrementAndGet(); BacklogTracerEventMessage msg @@ -700,18 +730,22 @@ public final class BacklogDebugger extends ServiceSupport { // now wait until we should continue logger.log( - "StepBreakpoint at node " + toNode + " is waiting to continue for exchangeId: " + exchange.getExchangeId()); + String.format("StepBreakpoint at node %s is waiting to continue for exchangeId: %s", toNode, + exchange.getExchangeId())); try { boolean hit = se.getLatch().await(fallbackTimeout, TimeUnit.SECONDS); if (!hit) { - logger.log("StepBreakpoint at node " + toNode + " timed out and is continued exchangeId: " - + exchange.getExchangeId(), + logger.log( + String.format("StepBreakpoint at node %s timed out and is continued exchangeId: %s", toNode, + exchange.getExchangeId()), LoggingLevel.WARN); } else { - logger.log("StepBreakpoint at node " + toNode + " is continued exchangeId: " + exchange.getExchangeId()); + logger.log(String.format("StepBreakpoint at node %s is continued exchangeId: %s", toNode, + exchange.getExchangeId())); } } catch (InterruptedException e) { // ignore + Thread.currentThread().interrupt(); } } diff --git a/core/camel-management/src/test/java/org/apache/camel/management/BacklogDebuggerTest.java b/core/camel-management/src/test/java/org/apache/camel/management/BacklogDebuggerTest.java index 21386b3526c..1d1cc1c443c 100644 --- a/core/camel-management/src/test/java/org/apache/camel/management/BacklogDebuggerTest.java +++ b/core/camel-management/src/test/java/org/apache/camel/management/BacklogDebuggerTest.java @@ -250,6 +250,28 @@ public class BacklogDebuggerTest extends ManagementTestSupport { "Should 
contain our added header"); assertTrue(xml.contains("<exchangeProperty name=\"food\" type=\"java.lang.Integer\">987</exchangeProperty>"), "Should contain our added exchange property"); + + // update body and header + mbeanServer.invoke(on, "setMessageBodyOnBreakpoint", new Object[] { "bar", "555", "java.lang.Integer" }, + new String[] { "java.lang.String", "java.lang.Object", "java.lang.String" }); + mbeanServer.invoke(on, "setMessageHeaderOnBreakpoint", new Object[] { "bar", "wine", "456", "java.lang.Integer" }, + new String[] { "java.lang.String", "java.lang.String", "java.lang.Object", "java.lang.String" }); + mbeanServer.invoke(on, "setExchangePropertyOnBreakpoint", new Object[] { "bar", "drink", "798", "java.lang.Integer" }, + new String[] { "java.lang.String", "java.lang.String", "java.lang.Object", "java.lang.String" }); + + // the message should be updated + xml = (String) mbeanServer.invoke(on, "dumpTracedMessagesAsXml", new Object[] { "bar", true }, + new String[] { "java.lang.String", "boolean" }); + assertNotNull(xml); + log.info(xml); + + assertTrue(xml.contains("555"), "Should contain our body"); + assertTrue(xml.contains("<toNode>bar</toNode>"), "Should contain bar node"); + assertTrue(xml.contains("<header key=\"wine\" type=\"java.lang.Integer\">456</header>"), + "Should contain our added header"); + assertTrue(xml.contains("<exchangeProperty name=\"drink\" type=\"java.lang.Integer\">798</exchangeProperty>"), + "Should contain our added exchange property"); + resetMocks(); mock.expectedMessageCount(1);
