This is an automated email from the ASF dual-hosted git repository. nfilotto pushed a commit to branch CAMEL-18217/suspend-messages in repository https://gitbox.apache.org/repos/asf/camel.git
commit 22afcbd041824f70e5ed66bd01dd7edf0ce82af1 Author: Nicolas Filotto <[email protected]> AuthorDate: Thu Jun 23 12:16:31 2022 +0200 CAMEL-18217: debugger - Allow to suspend messages --- camel-dependencies/pom.xml | 1 + .../camel/impl/debugger/BacklogDebugger.java | 103 +++++++++++++++++++-- .../camel/impl/engine/CamelInternalProcessor.java | 10 +- .../mbean/ManagedBacklogDebuggerMBean.java | 6 ++ core/camel-management/pom.xml | 5 + .../management/mbean/ManagedBacklogDebugger.java | 10 ++ .../camel/management/BacklogDebuggerTest.java | 47 ++++++++++ docs/user-manual/modules/ROOT/pages/debugger.adoc | 7 +- parent/pom.xml | 6 ++ 9 files changed, 179 insertions(+), 16 deletions(-) diff --git a/camel-dependencies/pom.xml b/camel-dependencies/pom.xml index 822dcd37cef..a9b2b7b1744 100644 --- a/camel-dependencies/pom.xml +++ b/camel-dependencies/pom.xml @@ -355,6 +355,7 @@ <jt400-version>11.0</jt400-version> <jta-api-1.2-version>1.2</jta-api-1.2-version> <junit-jupiter-version>5.8.2</junit-jupiter-version> + <junit-pioneer-version>1.7.1</junit-pioneer-version> <junit-toolbox-version>2.4</junit-toolbox-version> <junit-version>4.13.2</junit-version> <jxmpp-version>0.6.4</jxmpp-version> diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/BacklogDebugger.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/BacklogDebugger.java index a7e92b902c3..db6a759afde 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/BacklogDebugger.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/BacklogDebugger.java @@ -24,6 +24,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import org.apache.camel.CamelContext; import org.apache.camel.Exchange; @@ -44,6 +45,7 @@ import 
org.apache.camel.support.CamelContextHelper; import org.apache.camel.support.MessageHelper; import org.apache.camel.support.service.ServiceHelper; import org.apache.camel.support.service.ServiceSupport; +import org.apache.camel.util.StopWatch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,6 +63,11 @@ import org.slf4j.LoggerFactory; */ public final class BacklogDebugger extends ServiceSupport { + /** + * The name of the environment variable that contains the value of the flag indicating whether Camel should suspend + * processing the messages and wait for a debugger to attach or not. + */ + public static final String SUSPEND_MODE_ENV_VAR_NAME = "CAMEL_DEBUGGER_SUSPEND"; private static final Logger LOG = LoggerFactory.getLogger(BacklogDebugger.class); private long fallbackTimeout = 300; @@ -73,6 +80,14 @@ public final class BacklogDebugger extends ServiceSupport { private final ConcurrentMap<String, NodeBreakpoint> breakpoints = new ConcurrentHashMap<>(); private final ConcurrentMap<String, SuspendedExchange> suspendedBreakpoints = new ConcurrentHashMap<>(); private final ConcurrentMap<String, BacklogTracerEventMessage> suspendedBreakpointMessages = new ConcurrentHashMap<>(); + /** + * Indicates whether the suspend mode is enabled or not. + */ + private final boolean suspendMode; + /** + * The reference to the {@code CountDownLatch} used to suspend Camel from processing the incoming messages. + */ + private final AtomicReference<CountDownLatch> suspend = new AtomicReference<>(); private volatile String singleStepExchangeId; private int bodyMaxChars = 128 * 1024; private boolean bodyIncludeStreams; @@ -103,19 +118,31 @@ public final class BacklogDebugger extends ServiceSupport { } } - private BacklogDebugger(CamelContext camelContext) { + /** + * Constructs a {@code BacklogDebugger} with the given parameters. + * + * @param camelContext the camel context + * @param suspendMode Indicates whether the suspend mode is enabled or not. 
If {@code true} the message processing + * is immediately suspended until the {@link #attach()} is called. + */ + private BacklogDebugger(CamelContext camelContext, boolean suspendMode) { this.camelContext = camelContext; this.debugger = new DefaultDebugger(camelContext); + this.suspendMode = suspendMode; + detach(); } /** * Creates a new backlog debugger. + * <p> + * In case the environment variable {@link #SUSPEND_MODE_ENV_VAR_NAME} has been set to {@code true}, the message + * processing is directly suspended. * * @param context Camel context * @return a new backlog debugger */ public static BacklogDebugger createDebugger(CamelContext context) { - return new BacklogDebugger(context); + return new BacklogDebugger(context, Boolean.parseBoolean(System.getenv(SUSPEND_MODE_ENV_VAR_NAME))); } /** @@ -169,6 +196,65 @@ public final class BacklogDebugger extends ServiceSupport { return singleStepExchangeId != null; } + /** + * Attach the debugger which will resume the message processing in case the <i>suspend mode</i> is active. Do + * nothing otherwise. + */ + public void attach() { + if (suspendMode) { + logger.log("A debugger has been attached"); + resumeMessageProcessing(); + } + } + + /** + * Detach the debugger which will suspend the message processing in case the <i>suspend mode</i> is active. Do + * nothing otherwise. + */ + public void detach() { + if (suspendMode) { + logger.log("Waiting for a debugger to attach"); + suspendMessageProcessing(); + } + } + + /** + * Suspend the current thread from processing the message if the <i>suspend mode</i> is active as long as the method + * {@link #attach()} is not called. Do nothing otherwise. 
+ */ + private void suspendIfNeeded() { + final CountDownLatch countDownLatch = suspend.get(); + if (countDownLatch != null) { + logger.log("Incoming message suspended"); + try { + countDownLatch.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + + /** + * Make Camel suspend processing of the incoming messages. + */ + private void suspendMessageProcessing() { + suspend.compareAndSet(null, new CountDownLatch(1)); + } + + /** + * Resume the processing of the incoming messages. + */ + private void resumeMessageProcessing() { + for (;;) { + final CountDownLatch countDownLatch = suspend.get(); + if (countDownLatch == null) { + break; + } else if (suspend.compareAndSet(countDownLatch, null)) { + countDownLatch.countDown(); + } + } + } + public void addBreakpoint(String nodeId) { NodeBreakpoint breakpoint = breakpoints.get(nodeId); if (breakpoint == null) { @@ -454,13 +540,18 @@ public final class BacklogDebugger extends ServiceSupport { debugCounter.set(0); } - public boolean beforeProcess(Exchange exchange, Processor processor, NamedNode definition) { - return debugger.beforeProcess(exchange, processor, definition); + public StopWatch beforeProcess(Exchange exchange, Processor processor, NamedNode definition) { + suspendIfNeeded(); + if (isEnabled() && (hasBreakpoint(definition.getId()) || isSingleStepMode())) { + StopWatch watch = new StopWatch(); + debugger.beforeProcess(exchange, processor, definition); + return watch; + } + return null; } - public boolean afterProcess(Exchange exchange, Processor processor, NamedNode definition, long timeTaken) { + public void afterProcess(Exchange exchange, Processor processor, NamedNode definition, long timeTaken) { // noop - return false; } @Override diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java index 6a031a1f402..7babd956ea8 
100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java @@ -639,24 +639,16 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In private final BacklogDebugger backlogDebugger; private final Processor target; private final NamedNode definition; - private final String nodeId; public BacklogDebuggerAdvice(BacklogDebugger backlogDebugger, Processor target, NamedNode definition) { this.backlogDebugger = backlogDebugger; this.target = target; this.definition = definition; - this.nodeId = definition.getId(); } @Override public StopWatch before(Exchange exchange) throws Exception { - if (backlogDebugger.isEnabled() && (backlogDebugger.hasBreakpoint(nodeId) || backlogDebugger.isSingleStepMode())) { - StopWatch watch = new StopWatch(); - backlogDebugger.beforeProcess(exchange, target, definition); - return watch; - } else { - return null; - } + return backlogDebugger.beforeProcess(exchange, target, definition); } @Override diff --git a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedBacklogDebuggerMBean.java b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedBacklogDebuggerMBean.java index b824a84c356..57c2f7573e8 100644 --- a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedBacklogDebuggerMBean.java +++ b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedBacklogDebuggerMBean.java @@ -175,4 +175,10 @@ public interface ManagedBacklogDebuggerMBean { @ManagedOperation(description = "Returns the message history at the given node id as XML") String messageHistoryOnBreakpointAsXml(String nodeId); + + @ManagedOperation(description = "Attach the debugger") + void attach(); + + @ManagedOperation(description = "Detach the debugger") + void detach(); } diff --git 
a/core/camel-management/pom.xml b/core/camel-management/pom.xml index 577572d5cef..1e225b27d4c 100644 --- a/core/camel-management/pom.xml +++ b/core/camel-management/pom.xml @@ -69,6 +69,11 @@ <artifactId>awaitility</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.junit-pioneer</groupId> + <artifactId>junit-pioneer</artifactId> + <scope>test</scope> + </dependency> <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-core</artifactId> diff --git a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedBacklogDebugger.java b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedBacklogDebugger.java index 0f78390a6fc..0fb57d6fc0b 100644 --- a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedBacklogDebugger.java +++ b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedBacklogDebugger.java @@ -441,6 +441,16 @@ public class ManagedBacklogDebugger implements ManagedBacklogDebuggerMBean { return messageHistoryBuffer.toString(); } + @Override + public void attach() { + backlogDebugger.attach(); + } + + @Override + public void detach() { + backlogDebugger.detach(); + } + private String dumpExchangePropertiesAsXml(String id) { StringBuilder sb = new StringBuilder(); sb.append(" <exchangeProperties>\n"); diff --git a/core/camel-management/src/test/java/org/apache/camel/management/BacklogDebuggerTest.java b/core/camel-management/src/test/java/org/apache/camel/management/BacklogDebuggerTest.java index 01b8702f5d8..24a1e03f758 100644 --- a/core/camel-management/src/test/java/org/apache/camel/management/BacklogDebuggerTest.java +++ b/core/camel-management/src/test/java/org/apache/camel/management/BacklogDebuggerTest.java @@ -24,9 +24,11 @@ import javax.management.ObjectName; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.impl.debugger.BacklogDebugger; import 
org.junit.jupiter.api.Test; import org.junit.jupiter.api.condition.DisabledOnOs; import org.junit.jupiter.api.condition.OS; +import org.junitpioneer.jupiter.SetEnvironmentVariable; import static org.awaitility.Awaitility.await; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -890,6 +892,51 @@ public class BacklogDebuggerTest extends ManagementTestSupport { assertEquals(0, nodes.size()); } + /** + * Ensure that the suspend mode works as expected. + */ + @Test + @SetEnvironmentVariable(key = BacklogDebugger.SUSPEND_MODE_ENV_VAR_NAME, value = "true") + public void testSuspendMode() throws Exception { + MBeanServer mbeanServer = getMBeanServer(); + ObjectName on = new ObjectName( + "org.apache.camel:context=" + context.getManagementName() + ",type=tracer,name=BacklogDebugger"); + assertNotNull(on); + mbeanServer.isRegistered(on); + + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMessageCount(0); + mock.setSleepForEmptyTest(100); + + template.sendBody("seda:start", "Hello World"); + assertMockEndpointsSatisfied(); + + resetMocks(); + + // Attach debugger + mbeanServer.invoke(on, "attach", null, null); + + mock.expectedMessageCount(1); + + resetMocks(); + + // Detach debugger + mbeanServer.invoke(on, "detach", null, null); + + mock.expectedMessageCount(0); + mock.setSleepForEmptyTest(100); + + template.sendBody("seda:start", "Hello World 2"); + assertMockEndpointsSatisfied(); + + resetMocks(); + + // Attach debugger + mbeanServer.invoke(on, "attach", null, null); + + mock.expectedMessageCount(1); + } + @Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { diff --git a/docs/user-manual/modules/ROOT/pages/debugger.adoc b/docs/user-manual/modules/ROOT/pages/debugger.adoc index 1ac971f9b30..845ed592305 100644 --- a/docs/user-manual/modules/ROOT/pages/debugger.adoc +++ b/docs/user-manual/modules/ROOT/pages/debugger.adoc @@ -90,7 +90,12 @@ which can be used to extend for custom implementations. 
=== JMX debugger -There is also a xref:backlog-debugger.adoc[Backlog Debugger] which allows debugging from JMX. +There is also a xref:backlog-debugger.adoc[Backlog Debugger], included in `camel-debug`, which allows debugging from JMX. + +To have enough time to add your breakpoints, you may need to suspend the message processing of Camel to make sure +that you do not miss any messages. To do so, set the environment variable `CAMEL_DEBUGGER_SUSPEND` to `true` +within the context of your application; the `Backlog Debugger` then suspends the message processing until the JMX operation `attach` is called. Calling the JMX operation `detach` suspends the message processing again. + Several 3rd party tooling are using it: - https://hawt.io/[hawtio] uses this for its web based debugging functionality - https://marketplace.visualstudio.com/items?itemName=redhat.vscode-debug-adapter-apache-camel[VS Code Debug Adapter for Camel] diff --git a/parent/pom.xml b/parent/pom.xml index 67d70ff8e80..772cc4ed68b 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -341,6 +341,7 @@ <junit-toolbox-version>2.4</junit-toolbox-version> <junit-version>4.13.2</junit-version> <junit-jupiter-version>5.8.2</junit-jupiter-version> + <junit-pioneer-version>1.7.1</junit-pioneer-version> <jxmpp-version>0.6.4</jxmpp-version> <jython-version>2.7.2</jython-version> <jython-standalone-version>2.7.2</jython-standalone-version> @@ -3614,6 +3615,11 @@ <type>pom</type> <scope>import</scope> </dependency> + <dependency> + <groupId>org.junit-pioneer</groupId> + <artifactId>junit-pioneer</artifactId> + <version>${junit-pioneer-version}</version> + </dependency> <dependency> <groupId>org.awaitility</groupId> <artifactId>awaitility</artifactId>
