This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 31a2ea9a394 camel-jbang - Debug to work with split eip (#16643)
31a2ea9a394 is described below
commit 31a2ea9a394792e6f2c8254c718ed6e4046c8576
Author: Claus Ibsen <[email protected]>
AuthorDate: Mon Dec 23 16:39:29 2024 +0100
camel-jbang - Debug to work with split eip (#16643)
* CAMEL-21223: Debugger should support stepping inside Split EIP
* camel-core: multicast/split EIP should remove internal exchange property
for aggregation strategy if it's no longer in use
* CAMEL-21223: Debugger should support stepping inside Split EIP
---
.../impl/debugger/DefaultBacklogDebugger.java | 40 ++++++++++++++--------
.../camel/impl/debugger/DefaultDebugger.java | 20 ++++++++---
.../apache/camel/processor/MulticastProcessor.java | 4 +++
.../camel/dsl/jbang/core/commands/Debug.java | 14 ++------
4 files changed, 49 insertions(+), 29 deletions(-)
diff --git
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogDebugger.java
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogDebugger.java
index 51c9025ed05..c3714c06b96 100644
---
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogDebugger.java
+++
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogDebugger.java
@@ -16,6 +16,8 @@
*/
package org.apache.camel.impl.debugger;
+import java.util.ArrayDeque;
+import java.util.Deque;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
@@ -75,7 +77,7 @@ public final class DefaultBacklogDebugger extends
ServiceSupport implements Back
private final ConcurrentMap<String, BacklogTracerEventMessage>
suspendedBreakpointMessages = new ConcurrentHashMap<>();
private final AtomicReference<CountDownLatch> suspend = new
AtomicReference<>();
- private volatile String singleStepExchangeId;
+ private final Deque<String> singleStepExchangeId = new ArrayDeque<>();
private boolean suspendMode;
private String initialBreakpoints;
@@ -232,7 +234,11 @@ public final class DefaultBacklogDebugger extends
ServiceSupport implements Back
@Override
public boolean isSingleStepMode() {
- return singleStepExchangeId != null;
+ return !singleStepExchangeId.isEmpty();
+ }
+
+ private boolean isSingleStepMode(String exchangeId) {
+ return singleStepExchangeId.contains(exchangeId);
}
@Override
@@ -350,7 +356,7 @@ public final class DefaultBacklogDebugger extends
ServiceSupport implements Back
@Override
public void removeAllBreakpoints() {
// stop single stepping
- singleStepExchangeId = null;
+ singleStepExchangeId.clear();
for (String nodeId : getSuspendedBreakpointNodeIds()) {
removeBreakpoint(nodeId);
@@ -371,9 +377,9 @@ public final class DefaultBacklogDebugger extends
ServiceSupport implements Back
public void resumeBreakpoint(String nodeId, boolean stepMode) {
logger.log("Resume breakpoint " + nodeId);
- if (!stepMode && singleStepExchangeId != null) {
- debugger.stopSingleStepExchange(singleStepExchangeId);
- singleStepExchangeId = null;
+ if (!stepMode && !singleStepExchangeId.isEmpty()) {
+ singleStepExchangeId.forEach(debugger::stopSingleStepExchange);
+ singleStepExchangeId.clear();
}
// remember to remove the dumped message as its no longer in need
@@ -565,7 +571,7 @@ public final class DefaultBacklogDebugger extends
ServiceSupport implements Back
public void resumeAll() {
logger.log("Resume all");
// stop single stepping
- singleStepExchangeId = null;
+ singleStepExchangeId.clear();
for (String node : getSuspendedBreakpointNodeIds()) {
// remember to remove the dumped message as its no longer in need
@@ -594,8 +600,11 @@ public final class DefaultBacklogDebugger extends
ServiceSupport implements Back
String nodeId = msg.getToNode();
NodeBreakpoint breakpoint = breakpoints.get(nodeId);
if (breakpoint != null) {
- singleStepExchangeId = msg.getExchangeId();
- if (debugger.startSingleStepExchange(singleStepExchangeId, new
StepBreakpoint())) {
+ String tid = !singleStepExchangeId.isEmpty() ?
singleStepExchangeId.peek() : null;
+ if (tid == null || !tid.equals(msg.getExchangeId())) {
+ singleStepExchangeId.push(msg.getExchangeId());
+ }
+ if (debugger.startSingleStepExchange(msg.getExchangeId(), new
StepBreakpoint())) {
// now resume
resumeBreakpoint(nodeId, true);
}
@@ -616,8 +625,11 @@ public final class DefaultBacklogDebugger extends
ServiceSupport implements Back
BacklogTracerEventMessage msg =
suspendedBreakpointMessages.get(nodeId);
NodeBreakpoint breakpoint = breakpoints.get(nodeId);
if (msg != null && breakpoint != null) {
- singleStepExchangeId = msg.getExchangeId();
- if (debugger.startSingleStepExchange(singleStepExchangeId, new
StepBreakpoint())) {
+ String tid = !singleStepExchangeId.isEmpty() ?
singleStepExchangeId.peek() : null;
+ if (tid == null || !tid.equals(msg.getExchangeId())) {
+ singleStepExchangeId.push(msg.getExchangeId());
+ }
+ if (debugger.startSingleStepExchange(msg.getExchangeId(), new
StepBreakpoint())) {
// now resume
resumeBreakpoint(nodeId, true);
}
@@ -1000,13 +1012,13 @@ public final class DefaultBacklogDebugger extends
ServiceSupport implements Back
NamedRoute route = getOriginalRoute(exchange);
String completedId = event.getExchange().getExchangeId();
try {
- if (isSingleStepIncludeStartEnd() && singleStepExchangeId
!= null
- && singleStepExchangeId.equals(completedId)) {
+ String tid = !singleStepExchangeId.isEmpty() ?
singleStepExchangeId.peek() : null;
+ if (isSingleStepIncludeStartEnd() &&
completedId.equals(tid)) {
doCompleted(exchange, definition, route, cause);
}
} finally {
+ singleStepExchangeId.remove(completedId);
logger.log("ExchangeId: " + completedId + " is completed,
so exiting single step mode.");
- singleStepExchangeId = null;
}
}
}
diff --git
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultDebugger.java
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultDebugger.java
index b001120c5ac..4fcdbefc793 100644
---
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultDebugger.java
+++
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultDebugger.java
@@ -52,7 +52,7 @@ public class DefaultDebugger extends ServiceSupport
implements Debugger, CamelCo
private final EventNotifier debugEventNotifier = new DebugEventNotifier();
private final List<BreakpointConditions> breakpoints = new
CopyOnWriteArrayList<>();
- private final int maxConcurrentSingleSteps = 1;
+ private final int maxConcurrentSingleSteps = 100;
private final Map<String, Breakpoint> singleSteps = new
HashMap<>(maxConcurrentSingleSteps);
private CamelContext camelContext;
@@ -217,7 +217,7 @@ public class DefaultDebugger extends ServiceSupport
implements Debugger, CamelCo
@Override
public boolean beforeProcess(Exchange exchange, Processor processor,
NamedNode definition) {
// is the exchange in single step mode?
- Breakpoint singleStep = singleSteps.get(exchange.getExchangeId());
+ Breakpoint singleStep = getSingleStepBreakpoint(exchange);
if (singleStep != null) {
onBeforeProcess(exchange, processor, definition, singleStep);
return true;
@@ -241,7 +241,7 @@ public class DefaultDebugger extends ServiceSupport
implements Debugger, CamelCo
@Override
public boolean afterProcess(Exchange exchange, Processor processor,
NamedNode definition, long timeTaken) {
// is the exchange in single step mode?
- Breakpoint singleStep = singleSteps.get(exchange.getExchangeId());
+ Breakpoint singleStep = getSingleStepBreakpoint(exchange);
if (singleStep != null) {
onAfterProcess(exchange, processor, definition, timeTaken,
singleStep);
return true;
@@ -265,7 +265,7 @@ public class DefaultDebugger extends ServiceSupport
implements Debugger, CamelCo
@Override
public boolean onEvent(Exchange exchange, ExchangeEvent event) {
// is the exchange in single step mode?
- Breakpoint singleStep = singleSteps.get(exchange.getExchangeId());
+ Breakpoint singleStep = getSingleStepBreakpoint(exchange);
if (singleStep != null) {
onEvent(exchange, event, singleStep);
return true;
@@ -317,6 +317,18 @@ public class DefaultDebugger extends ServiceSupport
implements Debugger, CamelCo
}
}
+ private Breakpoint getSingleStepBreakpoint(Exchange exchange) {
+ Breakpoint answer = singleSteps.get(exchange.getExchangeId());
+ if (answer == null) {
+ // we may step into an EIP such as split so check via correlation
id (parent exchange)
+ String id =
exchange.getProperty(ExchangePropertyKey.CORRELATION_ID, String.class);
+ if (id != null) {
+ answer = singleSteps.get(id);
+ }
+ }
+ return answer;
+ }
+
private boolean matchConditions(
Exchange exchange, Processor processor, NamedNode definition,
BreakpointConditions breakpoint, boolean before) {
for (Condition condition : breakpoint.getConditions()) {
diff --git
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index f9514de06a1..3fd2143fdd8 100644
---
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -1278,6 +1278,10 @@ public class MulticastProcessor extends
AsyncProcessorSupport
}
// remove the strategy using this processor as the key
map.remove(this);
+ // and remove map if its empty
+ if (map.isEmpty()) {
+ exchange.removeProperty(ExchangePropertyKey.AGGREGATION_STRATEGY);
+ }
}
/**
diff --git
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/Debug.java
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/Debug.java
index 70fdf5b6427..f6c2b4faa21 100644
---
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/Debug.java
+++
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/Debug.java
@@ -219,7 +219,7 @@ public class Debug extends Run {
logUpdated.set(true);
}
}
- sendDebugCommand(spawnPid, "step", line, null);
+ sendDebugCommand(spawnPid, "step", null);
}
// user have pressed ENTER so continue
waitForUser.set(false);
@@ -309,7 +309,7 @@ public class Debug extends Run {
return 0;
}
- private void sendDebugCommand(long pid, String command, String argument,
String breakpoint) {
+ private void sendDebugCommand(long pid, String command, String breakpoint)
{
// ensure output file is deleted before executing action
File outputFile = getOutputFile(Long.toString(pid));
FileUtil.deleteFile(outputFile);
@@ -319,14 +319,6 @@ public class Debug extends Run {
if (command != null) {
root.put("command", command);
}
- if (argument != null && !argument.isBlank()) {
- if ("i".equals(argument)) {
- argument = "into";
- } else if ("o".equals(argument)) {
- argument = "over";
- }
- root.put("argument", argument);
- }
if (breakpoint != null) {
root.put("breakpoint", breakpoint);
}
@@ -479,7 +471,7 @@ public class Debug extends Run {
}
}
- String msg = " Breakpoint suspended. Press ENTER to
continue (i = step into (default), o = step over).";
+ String msg = " Breakpoint suspended. Press ENTER to
continue.";
if (loggingColor) {
AnsiConsole.out().println(Ansi.ansi().a(Ansi.Attribute.INTENSITY_BOLD).a(msg).reset());
} else {