This is an automated email from the ASF dual-hosted git repository. nfilotto pushed a commit to branch CAMEL-18260/reflect-changes-in-backlog-tracer-event-message-3.18 in repository https://gitbox.apache.org/repos/asf/camel.git
commit 01d6567497b6f357e9607abbe4cc7248553bd1f2 Author: Nicolas Filotto <[email protected]> AuthorDate: Fri Jul 8 19:08:58 2022 +0200 CAMEL-18260: debugger - Reflect exchange changes on BacklogTracerEventMessage (#7991) ## Motivation While investigating on https://github.com/camel-tooling/camel-idea-plugin/issues/734, I realized that is was actually due to the fact that when a change is made on an exchange, the corresponding backlog tracer event message is not modified such that the change is not reflected in the debugger variables. ## Modifications: * Adds the new method `refreshBacklogTracerEventMessage` to refresh the content of the message when the exchange is modified * Fixes warnings raised by IntelliJ (not related to the issue) --- .../camel/impl/debugger/BacklogDebugger.java | 107 ++++++++++++++------- .../camel/management/BacklogDebuggerTest.java | 22 +++++ 2 files changed, 93 insertions(+), 36 deletions(-) diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/BacklogDebugger.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/BacklogDebugger.java index f95315ed42f..fdc3398b3e4 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/BacklogDebugger.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/BacklogDebugger.java @@ -107,7 +107,7 @@ public final class BacklogDebugger extends ServiceSupport { private final CountDownLatch latch; /** - * @param exchange the suspend exchange + * @param exchange the suspended exchange * @param latch the latch to use to continue routing the exchange */ private SuspendedExchange(Exchange exchange, CountDownLatch latch) { @@ -338,11 +338,9 @@ public final class BacklogDebugger extends ServiceSupport { private void resumeBreakpoint(String nodeId, boolean stepMode) { logger.log("Resume breakpoint " + nodeId); - if (!stepMode) { - if (singleStepExchangeId != null) { - debugger.stopSingleStepExchange(singleStepExchangeId); - singleStepExchangeId = null; - } + if (!stepMode && singleStepExchangeId != null) { + debugger.stopSingleStepExchange(singleStepExchangeId); + singleStepExchangeId = null; } // remember to remove the dumped message as its no longer in need @@ -360,8 +358,8 @@ public final class BacklogDebugger extends ServiceSupport { if (remove) { removeMessageBodyOnBreakpoint(nodeId); } else { - Class<?> oldType = se.getExchange().getMessage().getBody() != null - ? se.getExchange().getMessage().getBody().getClass() : null; + Class<?> oldType = se.getExchange().getMessage().getBody() == null + ? null : se.getExchange().getMessage().getBody().getClass(); setMessageBodyOnBreakpoint(nodeId, body, oldType); } } @@ -374,14 +372,15 @@ public final class BacklogDebugger extends ServiceSupport { if (remove) { removeMessageBodyOnBreakpoint(nodeId); } else { - logger.log("Breakpoint at node " + nodeId + " is updating message body on exchangeId: " - + se.getExchange().getExchangeId() + " with new body: " + body); + logger.log(String.format("Breakpoint at node %s is updating message body on exchangeId: %s with new body: %s", + nodeId, se.getExchange().getExchangeId(), body)); // preserve type - if (type != null) { - se.getExchange().getMessage().setBody(body, type); - } else { + if (type == null) { se.getExchange().getMessage().setBody(body); + } else { + se.getExchange().getMessage().setBody(body, type); } + refreshBacklogTracerEventMessage(nodeId, se); } } } @@ -389,9 +388,10 @@ public final class BacklogDebugger extends ServiceSupport { public void removeMessageBodyOnBreakpoint(String nodeId) { SuspendedExchange se = suspendedBreakpoints.get(nodeId); if (se != null) { - logger.log("Breakpoint at node " + nodeId + " is removing message body on exchangeId: " - + se.getExchange().getExchangeId()); + logger.log(String.format("Breakpoint at node %s is removing message body on exchangeId: %s", nodeId, + se.getExchange().getExchangeId())); se.getExchange().getMessage().setBody(null); + refreshBacklogTracerEventMessage(nodeId, se); } } @@ -399,8 +399,8 @@ public final class BacklogDebugger extends ServiceSupport { throws NoTypeConversionAvailableException { SuspendedExchange se = suspendedBreakpoints.get(nodeId); if (se != null) { - Class<?> oldType = se.getExchange().getMessage().getHeader(headerName) != null - ? se.getExchange().getMessage().getHeader(headerName).getClass() : null; + Class<?> oldType = se.getExchange().getMessage().getHeader(headerName) == null + ? null : se.getExchange().getMessage().getHeader(headerName).getClass(); setMessageHeaderOnBreakpoint(nodeId, headerName, value, oldType); } } @@ -411,13 +411,14 @@ public final class BacklogDebugger extends ServiceSupport { if (se != null) { logger.log("Breakpoint at node " + nodeId + " is updating message header on exchangeId: " + se.getExchange().getExchangeId() + " with header: " + headerName + " and value: " + value); - if (type != null) { + if (type == null) { + se.getExchange().getMessage().setHeader(headerName, value); + } else { Object convertedValue = se.getExchange().getContext().getTypeConverter().mandatoryConvertTo(type, se.getExchange(), value); se.getExchange().getMessage().setHeader(headerName, convertedValue); - } else { - se.getExchange().getMessage().setHeader(headerName, value); } + refreshBacklogTracerEventMessage(nodeId, se); } } @@ -435,6 +436,7 @@ public final class BacklogDebugger extends ServiceSupport { logger.log("Breakpoint at node " + nodeId + " is removing message header on exchangeId: " + se.getExchange().getExchangeId() + " with header: " + headerName); se.getExchange().getMessage().removeHeader(headerName); + refreshBacklogTracerEventMessage(nodeId, se); } } @@ -496,7 +498,7 @@ public final class BacklogDebugger extends ServiceSupport { */ public Exchange getSuspendedExchange(String id) { SuspendedExchange suspendedExchange = suspendedBreakpoints.get(id); - return suspendedExchange != null ? suspendedExchange.getExchange() : null; + return suspendedExchange == null ? null : suspendedExchange.getExchange(); } public void disableBreakpoint(String nodeId) { @@ -542,11 +544,10 @@ public final class BacklogDebugger extends ServiceSupport { public String dumpTracedMessagesAsXml(String nodeId) { logger.log("Dump trace message from breakpoint " + nodeId); BacklogTracerEventMessage msg = suspendedBreakpointMessages.get(nodeId); - if (msg != null) { - return msg.toXml(0); - } else { + if (msg == null) { return null; } + return msg.toXml(0); } public long getDebugCounter() { @@ -586,7 +587,7 @@ public final class BacklogDebugger extends ServiceSupport { } private void clearBreakpoints() { - // make sure to clear state and latches is counted down so we wont have hanging threads + // make sure to clear state and latches is counted down, so we won't have hanging threads breakpoints.clear(); for (SuspendedExchange se : suspendedBreakpoints.values()) { se.getLatch().countDown(); @@ -595,6 +596,34 @@ public final class BacklogDebugger extends ServiceSupport { suspendedBreakpointMessages.clear(); } + /** + * Refresh the content of the existing backlog tracer event message corresponding to the given node id with the new + * content of exchange. + * + * @param nodeId the node id for the breakpoint + * @param suspendedExchange the content of the new suspended exchange to use to refresh the backlog tracer event + * message. + */ + private void refreshBacklogTracerEventMessage(String nodeId, SuspendedExchange suspendedExchange) { + suspendedBreakpointMessages.computeIfPresent( + nodeId, + (nId, message) -> new DefaultBacklogTracerEventMessage( + message.getUid(), message.getTimestamp(), message.getRouteId(), message.getToNode(), + message.getExchangeId(), + dumpAsXml(suspendedExchange.getExchange()))); + } + + /** + * Dumps the message as a generic XML structure. + * + * @param exchange the exchange to dump as XML + * @return the XML + */ + private String dumpAsXml(Exchange exchange) { + return MessageHelper.dumpAsXml(exchange.getIn(), true, 2, isBodyIncludeStreams(), isBodyIncludeFiles(), + getBodyMaxChars()); + } + /** * Represents a {@link org.apache.camel.spi.Breakpoint} that has a {@link Condition} on a specific node id. */ @@ -623,8 +652,7 @@ public final class BacklogDebugger extends ServiceSupport { String toNode = definition.getId(); String routeId = CamelContextHelper.getRouteId(definition); String exchangeId = exchange.getExchangeId(); - String messageAsXml = MessageHelper.dumpAsXml(exchange.getIn(), true, 2, isBodyIncludeStreams(), - isBodyIncludeFiles(), getBodyMaxChars()); + String messageAsXml = dumpAsXml(exchange); long uid = debugCounter.incrementAndGet(); BacklogTracerEventMessage msg @@ -635,17 +663,21 @@ public final class BacklogDebugger extends ServiceSupport { final SuspendedExchange se = suspendedBreakpoints.get(nodeId); if (se != null) { // now wait until we should continue - logger.log("NodeBreakpoint at node " + toNode + " is waiting to continue for exchangeId: " + exchangeId); + logger.log(String.format("NodeBreakpoint at node %s is waiting to continue for exchangeId: %s", toNode, + exchangeId)); try { boolean hit = se.getLatch().await(fallbackTimeout, TimeUnit.SECONDS); if (!hit) { - logger.log("NodeBreakpoint at node " + toNode + " timed out and is continued exchangeId: " + exchangeId, + logger.log( + String.format("NodeBreakpoint at node %s timed out and is continued exchangeId: %s", toNode, + exchangeId), LoggingLevel.WARN); } else { - logger.log("NodeBreakpoint at node " + toNode + " is continued exchangeId: " + exchangeId); + logger.log(String.format("NodeBreakpoint at node %s is continued exchangeId: %s", toNode, exchangeId)); } } catch (InterruptedException e) { // ignore + Thread.currentThread().interrupt(); } } } @@ -686,8 +718,7 @@ public final class BacklogDebugger extends ServiceSupport { String toNode = definition.getId(); String routeId = CamelContextHelper.getRouteId(definition); String exchangeId = exchange.getExchangeId(); - String messageAsXml = MessageHelper.dumpAsXml(exchange.getIn(), true, 2, isBodyIncludeStreams(), - isBodyIncludeFiles(), getBodyMaxChars()); + String messageAsXml = dumpAsXml(exchange); long uid = debugCounter.incrementAndGet(); BacklogTracerEventMessage msg @@ -700,18 +731,22 @@ public final class BacklogDebugger extends ServiceSupport { // now wait until we should continue logger.log( - "StepBreakpoint at node " + toNode + " is waiting to continue for exchangeId: " + exchange.getExchangeId()); + String.format("StepBreakpoint at node %s is waiting to continue for exchangeId: %s", toNode, + exchange.getExchangeId())); try { boolean hit = se.getLatch().await(fallbackTimeout, TimeUnit.SECONDS); if (!hit) { - logger.log("StepBreakpoint at node " + toNode + " timed out and is continued exchangeId: " - + exchange.getExchangeId(), + logger.log( + String.format("StepBreakpoint at node %s timed out and is continued exchangeId: %s", toNode, + exchange.getExchangeId()), LoggingLevel.WARN); } else { - logger.log("StepBreakpoint at node " + toNode + " is continued exchangeId: " + exchange.getExchangeId()); + logger.log(String.format("StepBreakpoint at node %s is continued exchangeId: %s", toNode, + exchange.getExchangeId())); } } catch (InterruptedException e) { // ignore + Thread.currentThread().interrupt(); } } diff --git a/core/camel-management/src/test/java/org/apache/camel/management/BacklogDebuggerTest.java b/core/camel-management/src/test/java/org/apache/camel/management/BacklogDebuggerTest.java index 21386b3526c..1d1cc1c443c 100644 --- a/core/camel-management/src/test/java/org/apache/camel/management/BacklogDebuggerTest.java +++ b/core/camel-management/src/test/java/org/apache/camel/management/BacklogDebuggerTest.java @@ -250,6 +250,28 @@ public class BacklogDebuggerTest extends ManagementTestSupport { "Should contain our added header"); assertTrue(xml.contains("<exchangeProperty name=\"food\" type=\"java.lang.Integer\">987</exchangeProperty>"), "Should contain our added exchange property"); + + // update body and header + mbeanServer.invoke(on, "setMessageBodyOnBreakpoint", new Object[] { "bar", "555", "java.lang.Integer" }, + new String[] { "java.lang.String", "java.lang.Object", "java.lang.String" }); + mbeanServer.invoke(on, "setMessageHeaderOnBreakpoint", new Object[] { "bar", "wine", "456", "java.lang.Integer" }, + new String[] { "java.lang.String", "java.lang.String", "java.lang.Object", "java.lang.String" }); + mbeanServer.invoke(on, "setExchangePropertyOnBreakpoint", new Object[] { "bar", "drink", "798", "java.lang.Integer" }, + new String[] { "java.lang.String", "java.lang.String", "java.lang.Object", "java.lang.String" }); + + // the message should be updated + xml = (String) mbeanServer.invoke(on, "dumpTracedMessagesAsXml", new Object[] { "bar", true }, + new String[] { "java.lang.String", "boolean" }); + assertNotNull(xml); + log.info(xml); + + assertTrue(xml.contains("555"), "Should contain our body"); + assertTrue(xml.contains("<toNode>bar</toNode>"), "Should contain bar node"); + assertTrue(xml.contains("<header key=\"wine\" type=\"java.lang.Integer\">456</header>"), + "Should contain our added header"); + assertTrue(xml.contains("<exchangeProperty name=\"drink\" type=\"java.lang.Integer\">798</exchangeProperty>"), + "Should contain our added exchange property"); + resetMocks(); mock.expectedMessageCount(1);
