This is an automated email from the ASF dual-hosted git repository.
nfilotto pushed a commit to branch camel-3.18.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-3.18.x by this push:
new 20cfedbdd98 CAMEL-18260: debugger - Reflect exchange changes on
BacklogTracerEventMessage (#7991) (#7994)
20cfedbdd98 is described below
commit 20cfedbdd987b09205f1ce425ef2716bf6e4ca99
Author: Nicolas Filotto <[email protected]>
AuthorDate: Fri Jul 8 19:18:30 2022 +0200
CAMEL-18260: debugger - Reflect exchange changes on
BacklogTracerEventMessage (#7991) (#7994)
## Motivation
While investigating
https://github.com/camel-tooling/camel-idea-plugin/issues/734, I realized that
it was actually due to the fact that when a change is made on an exchange, the
corresponding backlog tracer event message is not modified, so the change
is not reflected in the debugger variables.
## Modifications:
* Adds the new method `refreshBacklogTracerEventMessage` to refresh the
content of the message when the exchange is modified
* Fixes warnings raised by IntelliJ (not related to the issue)
---
.../camel/impl/debugger/BacklogDebugger.java | 107 ++++++++++++++-------
.../camel/management/BacklogDebuggerTest.java | 22 +++++
2 files changed, 93 insertions(+), 36 deletions(-)
diff --git
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/BacklogDebugger.java
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/BacklogDebugger.java
index f95315ed42f..fdc3398b3e4 100644
---
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/BacklogDebugger.java
+++
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/BacklogDebugger.java
@@ -107,7 +107,7 @@ public final class BacklogDebugger extends ServiceSupport {
private final CountDownLatch latch;
/**
- * @param exchange the suspend exchange
+ * @param exchange the suspended exchange
* @param latch the latch to use to continue routing the exchange
*/
private SuspendedExchange(Exchange exchange, CountDownLatch latch) {
@@ -338,11 +338,9 @@ public final class BacklogDebugger extends ServiceSupport {
private void resumeBreakpoint(String nodeId, boolean stepMode) {
logger.log("Resume breakpoint " + nodeId);
- if (!stepMode) {
- if (singleStepExchangeId != null) {
- debugger.stopSingleStepExchange(singleStepExchangeId);
- singleStepExchangeId = null;
- }
+ if (!stepMode && singleStepExchangeId != null) {
+ debugger.stopSingleStepExchange(singleStepExchangeId);
+ singleStepExchangeId = null;
}
// remember to remove the dumped message as its no longer in need
@@ -360,8 +358,8 @@ public final class BacklogDebugger extends ServiceSupport {
if (remove) {
removeMessageBodyOnBreakpoint(nodeId);
} else {
- Class<?> oldType = se.getExchange().getMessage().getBody() !=
null
- ? se.getExchange().getMessage().getBody().getClass() :
null;
+ Class<?> oldType = se.getExchange().getMessage().getBody() ==
null
+ ? null :
se.getExchange().getMessage().getBody().getClass();
setMessageBodyOnBreakpoint(nodeId, body, oldType);
}
}
@@ -374,14 +372,15 @@ public final class BacklogDebugger extends ServiceSupport
{
if (remove) {
removeMessageBodyOnBreakpoint(nodeId);
} else {
- logger.log("Breakpoint at node " + nodeId + " is updating
message body on exchangeId: "
- + se.getExchange().getExchangeId() + " with new
body: " + body);
+ logger.log(String.format("Breakpoint at node %s is updating
message body on exchangeId: %s with new body: %s",
+ nodeId, se.getExchange().getExchangeId(), body));
// preserve type
- if (type != null) {
- se.getExchange().getMessage().setBody(body, type);
- } else {
+ if (type == null) {
se.getExchange().getMessage().setBody(body);
+ } else {
+ se.getExchange().getMessage().setBody(body, type);
}
+ refreshBacklogTracerEventMessage(nodeId, se);
}
}
}
@@ -389,9 +388,10 @@ public final class BacklogDebugger extends ServiceSupport {
public void removeMessageBodyOnBreakpoint(String nodeId) {
SuspendedExchange se = suspendedBreakpoints.get(nodeId);
if (se != null) {
- logger.log("Breakpoint at node " + nodeId + " is removing message
body on exchangeId: "
- + se.getExchange().getExchangeId());
+ logger.log(String.format("Breakpoint at node %s is removing
message body on exchangeId: %s", nodeId,
+ se.getExchange().getExchangeId()));
se.getExchange().getMessage().setBody(null);
+ refreshBacklogTracerEventMessage(nodeId, se);
}
}
@@ -399,8 +399,8 @@ public final class BacklogDebugger extends ServiceSupport {
throws NoTypeConversionAvailableException {
SuspendedExchange se = suspendedBreakpoints.get(nodeId);
if (se != null) {
- Class<?> oldType =
se.getExchange().getMessage().getHeader(headerName) != null
- ?
se.getExchange().getMessage().getHeader(headerName).getClass() : null;
+ Class<?> oldType =
se.getExchange().getMessage().getHeader(headerName) == null
+ ? null :
se.getExchange().getMessage().getHeader(headerName).getClass();
setMessageHeaderOnBreakpoint(nodeId, headerName, value, oldType);
}
}
@@ -411,13 +411,14 @@ public final class BacklogDebugger extends ServiceSupport
{
if (se != null) {
logger.log("Breakpoint at node " + nodeId + " is updating message
header on exchangeId: "
+ se.getExchange().getExchangeId() + " with header: " +
headerName + " and value: " + value);
- if (type != null) {
+ if (type == null) {
+ se.getExchange().getMessage().setHeader(headerName, value);
+ } else {
Object convertedValue
=
se.getExchange().getContext().getTypeConverter().mandatoryConvertTo(type,
se.getExchange(), value);
se.getExchange().getMessage().setHeader(headerName,
convertedValue);
- } else {
- se.getExchange().getMessage().setHeader(headerName, value);
}
+ refreshBacklogTracerEventMessage(nodeId, se);
}
}
@@ -435,6 +436,7 @@ public final class BacklogDebugger extends ServiceSupport {
logger.log("Breakpoint at node " + nodeId + " is removing message
header on exchangeId: "
+ se.getExchange().getExchangeId() + " with header: " +
headerName);
se.getExchange().getMessage().removeHeader(headerName);
+ refreshBacklogTracerEventMessage(nodeId, se);
}
}
@@ -496,7 +498,7 @@ public final class BacklogDebugger extends ServiceSupport {
*/
public Exchange getSuspendedExchange(String id) {
SuspendedExchange suspendedExchange = suspendedBreakpoints.get(id);
- return suspendedExchange != null ? suspendedExchange.getExchange() :
null;
+ return suspendedExchange == null ? null :
suspendedExchange.getExchange();
}
public void disableBreakpoint(String nodeId) {
@@ -542,11 +544,10 @@ public final class BacklogDebugger extends ServiceSupport
{
public String dumpTracedMessagesAsXml(String nodeId) {
logger.log("Dump trace message from breakpoint " + nodeId);
BacklogTracerEventMessage msg =
suspendedBreakpointMessages.get(nodeId);
- if (msg != null) {
- return msg.toXml(0);
- } else {
+ if (msg == null) {
return null;
}
+ return msg.toXml(0);
}
public long getDebugCounter() {
@@ -586,7 +587,7 @@ public final class BacklogDebugger extends ServiceSupport {
}
private void clearBreakpoints() {
- // make sure to clear state and latches is counted down so we wont
have hanging threads
+ // make sure to clear state and latches is counted down, so we won't
have hanging threads
breakpoints.clear();
for (SuspendedExchange se : suspendedBreakpoints.values()) {
se.getLatch().countDown();
@@ -595,6 +596,34 @@ public final class BacklogDebugger extends ServiceSupport {
suspendedBreakpointMessages.clear();
}
+ /**
+ * Refresh the content of the existing backlog tracer event message
corresponding to the given node id with the new
+ * content of exchange.
+ *
+ * @param nodeId the node id for the breakpoint
+ * @param suspendedExchange the content of the new suspended exchange to
use to refresh the backlog tracer event
+ * message.
+ */
+ private void refreshBacklogTracerEventMessage(String nodeId,
SuspendedExchange suspendedExchange) {
+ suspendedBreakpointMessages.computeIfPresent(
+ nodeId,
+ (nId, message) -> new DefaultBacklogTracerEventMessage(
+ message.getUid(), message.getTimestamp(),
message.getRouteId(), message.getToNode(),
+ message.getExchangeId(),
+ dumpAsXml(suspendedExchange.getExchange())));
+ }
+
+ /**
+ * Dumps the message as a generic XML structure.
+ *
+ * @param exchange the exchange to dump as XML
+ * @return the XML
+ */
+ private String dumpAsXml(Exchange exchange) {
+ return MessageHelper.dumpAsXml(exchange.getIn(), true, 2,
isBodyIncludeStreams(), isBodyIncludeFiles(),
+ getBodyMaxChars());
+ }
+
/**
* Represents a {@link org.apache.camel.spi.Breakpoint} that has a {@link
Condition} on a specific node id.
*/
@@ -623,8 +652,7 @@ public final class BacklogDebugger extends ServiceSupport {
String toNode = definition.getId();
String routeId = CamelContextHelper.getRouteId(definition);
String exchangeId = exchange.getExchangeId();
- String messageAsXml = MessageHelper.dumpAsXml(exchange.getIn(),
true, 2, isBodyIncludeStreams(),
- isBodyIncludeFiles(), getBodyMaxChars());
+ String messageAsXml = dumpAsXml(exchange);
long uid = debugCounter.incrementAndGet();
BacklogTracerEventMessage msg
@@ -635,17 +663,21 @@ public final class BacklogDebugger extends ServiceSupport
{
final SuspendedExchange se = suspendedBreakpoints.get(nodeId);
if (se != null) {
// now wait until we should continue
- logger.log("NodeBreakpoint at node " + toNode + " is waiting
to continue for exchangeId: " + exchangeId);
+ logger.log(String.format("NodeBreakpoint at node %s is waiting
to continue for exchangeId: %s", toNode,
+ exchangeId));
try {
boolean hit = se.getLatch().await(fallbackTimeout,
TimeUnit.SECONDS);
if (!hit) {
- logger.log("NodeBreakpoint at node " + toNode + "
timed out and is continued exchangeId: " + exchangeId,
+ logger.log(
+ String.format("NodeBreakpoint at node %s timed
out and is continued exchangeId: %s", toNode,
+ exchangeId),
LoggingLevel.WARN);
} else {
- logger.log("NodeBreakpoint at node " + toNode + " is
continued exchangeId: " + exchangeId);
+ logger.log(String.format("NodeBreakpoint at node %s is
continued exchangeId: %s", toNode, exchangeId));
}
} catch (InterruptedException e) {
// ignore
+ Thread.currentThread().interrupt();
}
}
}
@@ -686,8 +718,7 @@ public final class BacklogDebugger extends ServiceSupport {
String toNode = definition.getId();
String routeId = CamelContextHelper.getRouteId(definition);
String exchangeId = exchange.getExchangeId();
- String messageAsXml = MessageHelper.dumpAsXml(exchange.getIn(),
true, 2, isBodyIncludeStreams(),
- isBodyIncludeFiles(), getBodyMaxChars());
+ String messageAsXml = dumpAsXml(exchange);
long uid = debugCounter.incrementAndGet();
BacklogTracerEventMessage msg
@@ -700,18 +731,22 @@ public final class BacklogDebugger extends ServiceSupport
{
// now wait until we should continue
logger.log(
- "StepBreakpoint at node " + toNode + " is waiting to
continue for exchangeId: " + exchange.getExchangeId());
+ String.format("StepBreakpoint at node %s is waiting to
continue for exchangeId: %s", toNode,
+ exchange.getExchangeId()));
try {
boolean hit = se.getLatch().await(fallbackTimeout,
TimeUnit.SECONDS);
if (!hit) {
- logger.log("StepBreakpoint at node " + toNode + " timed
out and is continued exchangeId: "
- + exchange.getExchangeId(),
+ logger.log(
+ String.format("StepBreakpoint at node %s timed out
and is continued exchangeId: %s", toNode,
+ exchange.getExchangeId()),
LoggingLevel.WARN);
} else {
- logger.log("StepBreakpoint at node " + toNode + " is
continued exchangeId: " + exchange.getExchangeId());
+ logger.log(String.format("StepBreakpoint at node %s is
continued exchangeId: %s", toNode,
+ exchange.getExchangeId()));
}
} catch (InterruptedException e) {
// ignore
+ Thread.currentThread().interrupt();
}
}
diff --git
a/core/camel-management/src/test/java/org/apache/camel/management/BacklogDebuggerTest.java
b/core/camel-management/src/test/java/org/apache/camel/management/BacklogDebuggerTest.java
index 21386b3526c..1d1cc1c443c 100644
---
a/core/camel-management/src/test/java/org/apache/camel/management/BacklogDebuggerTest.java
+++
b/core/camel-management/src/test/java/org/apache/camel/management/BacklogDebuggerTest.java
@@ -250,6 +250,28 @@ public class BacklogDebuggerTest extends
ManagementTestSupport {
"Should contain our added header");
assertTrue(xml.contains("<exchangeProperty name=\"food\"
type=\"java.lang.Integer\">987</exchangeProperty>"),
"Should contain our added exchange property");
+
+ // update body and header
+ mbeanServer.invoke(on, "setMessageBodyOnBreakpoint", new Object[] {
"bar", "555", "java.lang.Integer" },
+ new String[] { "java.lang.String", "java.lang.Object",
"java.lang.String" });
+ mbeanServer.invoke(on, "setMessageHeaderOnBreakpoint", new Object[] {
"bar", "wine", "456", "java.lang.Integer" },
+ new String[] { "java.lang.String", "java.lang.String",
"java.lang.Object", "java.lang.String" });
+ mbeanServer.invoke(on, "setExchangePropertyOnBreakpoint", new Object[]
{ "bar", "drink", "798", "java.lang.Integer" },
+ new String[] { "java.lang.String", "java.lang.String",
"java.lang.Object", "java.lang.String" });
+
+ // the message should be updated
+ xml = (String) mbeanServer.invoke(on, "dumpTracedMessagesAsXml", new
Object[] { "bar", true },
+ new String[] { "java.lang.String", "boolean" });
+ assertNotNull(xml);
+ log.info(xml);
+
+ assertTrue(xml.contains("555"), "Should contain our body");
+ assertTrue(xml.contains("<toNode>bar</toNode>"), "Should contain bar
node");
+ assertTrue(xml.contains("<header key=\"wine\"
type=\"java.lang.Integer\">456</header>"),
+ "Should contain our added header");
+ assertTrue(xml.contains("<exchangeProperty name=\"drink\"
type=\"java.lang.Integer\">798</exchangeProperty>"),
+ "Should contain our added exchange property");
+
resetMocks();
mock.expectedMessageCount(1);