This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new 07a66cf4b0c CAMEL-18844: Possible memory leak in org.apache.camel.impl.console.EventConsole 07a66cf4b0c is described below commit 07a66cf4b0c995cc2a93714255dc6068d89bb5df Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Tue Dec 27 16:18:14 2022 +0100 CAMEL-18844: Possible memory leak in org.apache.camel.impl.console.EventConsole --- .../apache/camel/impl/console/EventConsole.java | 54 ++++++++++++++-------- 1 file changed, 34 insertions(+), 20 deletions(-) diff --git a/core/camel-console/src/main/java/org/apache/camel/impl/console/EventConsole.java b/core/camel-console/src/main/java/org/apache/camel/impl/console/EventConsole.java index 57b2b9fee66..360a55c390a 100644 --- a/core/camel-console/src/main/java/org/apache/camel/impl/console/EventConsole.java +++ b/core/camel-console/src/main/java/org/apache/camel/impl/console/EventConsole.java @@ -16,11 +16,11 @@ */ package org.apache.camel.impl.console; -import java.util.ArrayDeque; import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.Queue; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; import org.apache.camel.spi.CamelEvent; import org.apache.camel.spi.Configurer; @@ -37,9 +37,9 @@ public class EventConsole extends AbstractDevConsole { @Metadata(defaultValue = "25", description = "Maximum capacity of last number of events to capture") private int capacity = 25; - private Queue<CamelEvent> events; - private Queue<CamelEvent.RouteEvent> routeEvents; - private Queue<CamelEvent.ExchangeEvent> exchangeEvents; + private BlockingQueue<CamelEvent> events; + private BlockingQueue<CamelEvent.RouteEvent> routeEvents; + private BlockingQueue<CamelEvent.ExchangeEvent> exchangeEvents; private final ConsoleEventNotifier listener = new ConsoleEventNotifier(); public EventConsole() { @@ -56,9 +56,10 @@ public class EventConsole extends AbstractDevConsole { @Override 
protected void doInit() throws Exception { - this.events = new ArrayDeque<>(capacity); - this.routeEvents = new ArrayDeque<>(capacity); - this.exchangeEvents = new ArrayDeque<>(capacity); + // capacity capped queue using fair to make sure events are in correct order + this.events = new ArrayBlockingQueue<>(capacity, true); + this.routeEvents = new ArrayBlockingQueue<>(capacity, true); + this.exchangeEvents = new ArrayBlockingQueue<>(capacity, true); } @Override @@ -164,21 +165,34 @@ public class EventConsole extends AbstractDevConsole { @Override public void notify(CamelEvent event) throws Exception { + // offer new event; if the target queue is full (offer returns false), evict the head of that same queue and retry + if (event instanceof CamelEvent.ExchangeEvent) { - if (exchangeEvents.size() >= capacity) { - exchangeEvents.poll(); - } - exchangeEvents.add((CamelEvent.ExchangeEvent) event); + CamelEvent.ExchangeEvent ce = (CamelEvent.ExchangeEvent) event; + boolean added; + do { + added = exchangeEvents.offer(ce); + if (!added) { + exchangeEvents.poll(); + } + } while (!added); } else if (event instanceof CamelEvent.RouteEvent) { - if (routeEvents.size() >= capacity) { - routeEvents.poll(); - } - routeEvents.add((CamelEvent.RouteEvent) event); + CamelEvent.RouteEvent re = (CamelEvent.RouteEvent) event; + boolean added; + do { + added = routeEvents.offer(re); + if (!added) { + routeEvents.poll(); + } + } while (!added); } else { - if (events.size() >= capacity) { - events.poll(); - } - events.offer(event); + boolean added; + do { + added = events.offer(event); + if (!added) { + events.poll(); + } + } while (!added); } }