This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 31a2ea9a394 camel-jbang - Debug to work with split eip (#16643)
31a2ea9a394 is described below

commit 31a2ea9a394792e6f2c8254c718ed6e4046c8576
Author: Claus Ibsen <[email protected]>
AuthorDate: Mon Dec 23 16:39:29 2024 +0100

    camel-jbang - Debug to work with split eip (#16643)
    
    * CAMEL-21223: Debugger should support stepping inside Split EIP
    
    * camel-core: multicast/split EIP should remove internal exchange property for aggregation strategy if it is no longer in use
    
    * CAMEL-21223: Debugger should support stepping inside Split EIP
---
 .../impl/debugger/DefaultBacklogDebugger.java      | 40 ++++++++++++++--------
 .../camel/impl/debugger/DefaultDebugger.java       | 20 ++++++++---
 .../apache/camel/processor/MulticastProcessor.java |  4 +++
 .../camel/dsl/jbang/core/commands/Debug.java       | 14 ++------
 4 files changed, 49 insertions(+), 29 deletions(-)

diff --git 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogDebugger.java
 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogDebugger.java
index 51c9025ed05..c3714c06b96 100644
--- 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogDebugger.java
+++ 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogDebugger.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.impl.debugger;
 
+import java.util.ArrayDeque;
+import java.util.Deque;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Set;
@@ -75,7 +77,7 @@ public final class DefaultBacklogDebugger extends 
ServiceSupport implements Back
     private final ConcurrentMap<String, BacklogTracerEventMessage> 
suspendedBreakpointMessages = new ConcurrentHashMap<>();
 
     private final AtomicReference<CountDownLatch> suspend = new 
AtomicReference<>();
-    private volatile String singleStepExchangeId;
+    private final Deque<String> singleStepExchangeId = new ArrayDeque<>();
 
     private boolean suspendMode;
     private String initialBreakpoints;
@@ -232,7 +234,11 @@ public final class DefaultBacklogDebugger extends 
ServiceSupport implements Back
 
     @Override
     public boolean isSingleStepMode() {
-        return singleStepExchangeId != null;
+        return !singleStepExchangeId.isEmpty();
+    }
+
+    private boolean isSingleStepMode(String exchangeId) {
+        return singleStepExchangeId.contains(exchangeId);
     }
 
     @Override
@@ -350,7 +356,7 @@ public final class DefaultBacklogDebugger extends 
ServiceSupport implements Back
     @Override
     public void removeAllBreakpoints() {
         // stop single stepping
-        singleStepExchangeId = null;
+        singleStepExchangeId.clear();
 
         for (String nodeId : getSuspendedBreakpointNodeIds()) {
             removeBreakpoint(nodeId);
@@ -371,9 +377,9 @@ public final class DefaultBacklogDebugger extends 
ServiceSupport implements Back
     public void resumeBreakpoint(String nodeId, boolean stepMode) {
         logger.log("Resume breakpoint " + nodeId);
 
-        if (!stepMode && singleStepExchangeId != null) {
-            debugger.stopSingleStepExchange(singleStepExchangeId);
-            singleStepExchangeId = null;
+        if (!stepMode && !singleStepExchangeId.isEmpty()) {
+            singleStepExchangeId.forEach(debugger::stopSingleStepExchange);
+            singleStepExchangeId.clear();
         }
 
         // remember to remove the dumped message as its no longer in need
@@ -565,7 +571,7 @@ public final class DefaultBacklogDebugger extends 
ServiceSupport implements Back
     public void resumeAll() {
         logger.log("Resume all");
         // stop single stepping
-        singleStepExchangeId = null;
+        singleStepExchangeId.clear();
 
         for (String node : getSuspendedBreakpointNodeIds()) {
             // remember to remove the dumped message as its no longer in need
@@ -594,8 +600,11 @@ public final class DefaultBacklogDebugger extends 
ServiceSupport implements Back
             String nodeId = msg.getToNode();
             NodeBreakpoint breakpoint = breakpoints.get(nodeId);
             if (breakpoint != null) {
-                singleStepExchangeId = msg.getExchangeId();
-                if (debugger.startSingleStepExchange(singleStepExchangeId, new 
StepBreakpoint())) {
+                String tid = !singleStepExchangeId.isEmpty() ? 
singleStepExchangeId.peek() : null;
+                if (tid == null || !tid.equals(msg.getExchangeId())) {
+                    singleStepExchangeId.push(msg.getExchangeId());
+                }
+                if (debugger.startSingleStepExchange(msg.getExchangeId(), new 
StepBreakpoint())) {
                     // now resume
                     resumeBreakpoint(nodeId, true);
                 }
@@ -616,8 +625,11 @@ public final class DefaultBacklogDebugger extends 
ServiceSupport implements Back
         BacklogTracerEventMessage msg = 
suspendedBreakpointMessages.get(nodeId);
         NodeBreakpoint breakpoint = breakpoints.get(nodeId);
         if (msg != null && breakpoint != null) {
-            singleStepExchangeId = msg.getExchangeId();
-            if (debugger.startSingleStepExchange(singleStepExchangeId, new 
StepBreakpoint())) {
+            String tid = !singleStepExchangeId.isEmpty() ? 
singleStepExchangeId.peek() : null;
+            if (tid == null || !tid.equals(msg.getExchangeId())) {
+                singleStepExchangeId.push(msg.getExchangeId());
+            }
+            if (debugger.startSingleStepExchange(msg.getExchangeId(), new 
StepBreakpoint())) {
                 // now resume
                 resumeBreakpoint(nodeId, true);
             }
@@ -1000,13 +1012,13 @@ public final class DefaultBacklogDebugger extends 
ServiceSupport implements Back
                 NamedRoute route = getOriginalRoute(exchange);
                 String completedId = event.getExchange().getExchangeId();
                 try {
-                    if (isSingleStepIncludeStartEnd() && singleStepExchangeId 
!= null
-                            && singleStepExchangeId.equals(completedId)) {
+                    String tid = !singleStepExchangeId.isEmpty() ? 
singleStepExchangeId.peek() : null;
+                    if (isSingleStepIncludeStartEnd() && 
completedId.equals(tid)) {
                         doCompleted(exchange, definition, route, cause);
                     }
                 } finally {
+                    singleStepExchangeId.remove(completedId);
                     logger.log("ExchangeId: " + completedId + " is completed, 
so exiting single step mode.");
-                    singleStepExchangeId = null;
                 }
             }
         }
diff --git 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultDebugger.java
 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultDebugger.java
index b001120c5ac..4fcdbefc793 100644
--- 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultDebugger.java
+++ 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultDebugger.java
@@ -52,7 +52,7 @@ public class DefaultDebugger extends ServiceSupport 
implements Debugger, CamelCo
 
     private final EventNotifier debugEventNotifier = new DebugEventNotifier();
     private final List<BreakpointConditions> breakpoints = new 
CopyOnWriteArrayList<>();
-    private final int maxConcurrentSingleSteps = 1;
+    private final int maxConcurrentSingleSteps = 100;
     private final Map<String, Breakpoint> singleSteps = new 
HashMap<>(maxConcurrentSingleSteps);
     private CamelContext camelContext;
 
@@ -217,7 +217,7 @@ public class DefaultDebugger extends ServiceSupport 
implements Debugger, CamelCo
     @Override
     public boolean beforeProcess(Exchange exchange, Processor processor, 
NamedNode definition) {
         // is the exchange in single step mode?
-        Breakpoint singleStep = singleSteps.get(exchange.getExchangeId());
+        Breakpoint singleStep = getSingleStepBreakpoint(exchange);
         if (singleStep != null) {
             onBeforeProcess(exchange, processor, definition, singleStep);
             return true;
@@ -241,7 +241,7 @@ public class DefaultDebugger extends ServiceSupport 
implements Debugger, CamelCo
     @Override
     public boolean afterProcess(Exchange exchange, Processor processor, 
NamedNode definition, long timeTaken) {
         // is the exchange in single step mode?
-        Breakpoint singleStep = singleSteps.get(exchange.getExchangeId());
+        Breakpoint singleStep = getSingleStepBreakpoint(exchange);
         if (singleStep != null) {
             onAfterProcess(exchange, processor, definition, timeTaken, 
singleStep);
             return true;
@@ -265,7 +265,7 @@ public class DefaultDebugger extends ServiceSupport 
implements Debugger, CamelCo
     @Override
     public boolean onEvent(Exchange exchange, ExchangeEvent event) {
         // is the exchange in single step mode?
-        Breakpoint singleStep = singleSteps.get(exchange.getExchangeId());
+        Breakpoint singleStep = getSingleStepBreakpoint(exchange);
         if (singleStep != null) {
             onEvent(exchange, event, singleStep);
             return true;
@@ -317,6 +317,18 @@ public class DefaultDebugger extends ServiceSupport 
implements Debugger, CamelCo
         }
     }
 
+    private Breakpoint getSingleStepBreakpoint(Exchange exchange) {
+        Breakpoint answer = singleSteps.get(exchange.getExchangeId());
+        if (answer == null) {
+            // we may step into an EIP such as split so check via correlation 
id (parent exchange)
+            String id = 
exchange.getProperty(ExchangePropertyKey.CORRELATION_ID, String.class);
+            if (id != null) {
+                answer = singleSteps.get(id);
+            }
+        }
+        return answer;
+    }
+
     private boolean matchConditions(
             Exchange exchange, Processor processor, NamedNode definition, 
BreakpointConditions breakpoint, boolean before) {
         for (Condition condition : breakpoint.getConditions()) {
diff --git 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index f9514de06a1..3fd2143fdd8 100644
--- 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -1278,6 +1278,10 @@ public class MulticastProcessor extends 
AsyncProcessorSupport
         }
         // remove the strategy using this processor as the key
         map.remove(this);
+        // and remove map if its empty
+        if (map.isEmpty()) {
+            exchange.removeProperty(ExchangePropertyKey.AGGREGATION_STRATEGY);
+        }
     }
 
     /**
diff --git 
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/Debug.java
 
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/Debug.java
index 70fdf5b6427..f6c2b4faa21 100644
--- 
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/Debug.java
+++ 
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/Debug.java
@@ -219,7 +219,7 @@ public class Debug extends Run {
                             logUpdated.set(true);
                         }
                     }
-                    sendDebugCommand(spawnPid, "step", line, null);
+                    sendDebugCommand(spawnPid, "step", null);
                 }
                 // user have pressed ENTER so continue
                 waitForUser.set(false);
@@ -309,7 +309,7 @@ public class Debug extends Run {
         return 0;
     }
 
-    private void sendDebugCommand(long pid, String command, String argument, 
String breakpoint) {
+    private void sendDebugCommand(long pid, String command, String breakpoint) 
{
         // ensure output file is deleted before executing action
         File outputFile = getOutputFile(Long.toString(pid));
         FileUtil.deleteFile(outputFile);
@@ -319,14 +319,6 @@ public class Debug extends Run {
         if (command != null) {
             root.put("command", command);
         }
-        if (argument != null && !argument.isBlank()) {
-            if ("i".equals(argument)) {
-                argument = "into";
-            } else if ("o".equals(argument)) {
-                argument = "over";
-            }
-            root.put("argument", argument);
-        }
         if (breakpoint != null) {
             root.put("breakpoint", breakpoint);
         }
@@ -479,7 +471,7 @@ public class Debug extends Run {
                     }
                 }
 
-                String msg = "    Breakpoint suspended. Press ENTER to 
continue (i = step into (default), o = step over).";
+                String msg = "    Breakpoint suspended. Press ENTER to 
continue.";
                 if (loggingColor) {
                     
AnsiConsole.out().println(Ansi.ansi().a(Ansi.Attribute.INTENSITY_BOLD).a(msg).reset());
                 } else {

Reply via email to