This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 8078dd098c ARTEMIS-4171 Messages leaking thorugh AMQP Delivery
8078dd098c is described below

commit 8078dd098ccc38cd57eefa3e94ba1dfe50c6865b
Author: Clebert Suconic <[email protected]>
AuthorDate: Mon Feb 20 13:18:18 2023 -0500

    ARTEMIS-4171 Messages leaking thorugh AMQP Delivery
    
    There are two leaks here:
    
    * QueueImpl::delivery might create a new iterator if a delivery happens 
right after a consumer was removed, and that iterator might belong to a consumer 
that was already closed.
                 As a result of that, the iterator may leak messages and hold 
references until a reboot is done. I have seen scenarios where messages would 
not be delivered because of this.
    
    * ProtonTransaction holding references: the last transaction might hold 
messages in memory longer than expected. In tests I have performed, the 
messages were accumulating in memory, and I cleared them here.
---
 .../artemis/utils/collections/LinkedListImpl.java  | 13 +++-
 .../amqp/proton/AMQPConnectionContext.java         | 11 +--
 .../amqp/proton/ProtonServerSenderContext.java     |  4 +-
 .../proton/transaction/ProtonTransactionImpl.java  | 42 +++++++----
 .../activemq/artemis/core/server/Consumer.java     |  4 +
 .../core/server/impl/QueueConsumersImpl.java       |  2 +-
 .../artemis/core/server/impl/QueueImpl.java        | 21 ++++--
 .../core/server/impl/ServerConsumerImpl.java       | 16 +++-
 .../artemis/tests/leak/ConnectionLeakTest.java     | 86 ++++++++++++++++++++--
 .../artemis/tests/leak/LinkedListMemoryTest.java   | 71 ++++++++++++++++++
 .../artemis/tests/leak/MemoryAssertions.java       |  6 +-
 11 files changed, 231 insertions(+), 45 deletions(-)

diff --git 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java
 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java
index 5e36d333d2..121b5ba924 100644
--- 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java
+++ 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java
@@ -350,6 +350,9 @@ public class LinkedListImpl<E> implements LinkedList<E> {
 
    @Override
    public LinkedListIterator<E> iterator() {
+      if (logger.isTraceEnabled()) {
+         logger.trace("Creating new iterator at", new Exception("trace 
location"));
+      }
       return new Iterator();
    }
 
@@ -434,6 +437,9 @@ public class LinkedListImpl<E> implements LinkedList<E> {
    }
 
    private synchronized void removeIter(Iterator iter) {
+      if (logger.isTraceEnabled()) {
+         logger.trace("Removing iterator at", new Exception("trace location"));
+      }
       for (int i = 0; i < numIters; i++) {
          if (iter == iters[i]) {
             iters[i] = null;
@@ -449,8 +455,10 @@ public class LinkedListImpl<E> implements LinkedList<E> {
             if (numIters >= INITIAL_ITERATOR_ARRAY_SIZE && numIters == 
iters.length / 2) {
                resize(numIters);
             }
-
             nextIndex--;
+            if (nextIndex < iters.length) {
+               iters[nextIndex] = null;
+            }
 
             return;
          }
@@ -515,8 +523,7 @@ public class LinkedListImpl<E> implements LinkedList<E> {
       }
    }
 
-   private class Iterator implements LinkedListIterator<E> {
-
+   public class Iterator implements LinkedListIterator<E> {
       Node<E> last;
 
       Node<E> current = head.next;
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
index a81f0d8c20..45ab664053 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
@@ -743,14 +743,9 @@ public class AMQPConnectionContext extends 
ProtonInitializable implements EventH
          }
       }
 
-      /// we have to perform the link.close after the linkContext.close is 
finished.
-      // linkeContext.close will perform a few executions on the netty loop,
-      // this has to come next
-      runLater(() -> {
-         link.close();
-         link.free();
-         flush();
-      });
+      link.close();
+      link.free();
+      flush();
    }
 
    @Override
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index 1068e5c044..312b9bda8d 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -370,7 +370,9 @@ public class ProtonServerSenderContext extends 
ProtonInitializable implements Pr
       OperationContext oldContext = sessionSPI.recoverContext();
 
       try {
-         Message message = ((MessageReference) 
delivery.getContext()).getMessage();
+         MessageReference reference = (MessageReference) delivery.getContext();
+         Message message = reference != null ? reference.getMessage() : null;
+
          DeliveryState remoteState = delivery.getRemoteState();
 
          if (remoteState != null && remoteState.getType() == 
DeliveryStateType.Accepted) {
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionImpl.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionImpl.java
index 123dbb5d9b..0e3c4e63ea 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionImpl.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionImpl.java
@@ -45,26 +45,38 @@ public class ProtonTransactionImpl extends TransactionImpl {
       deliveries have been settled.  We also need to ensure we are settling on 
the correct link.  Hence why we keep a ref
       to the ProtonServerSenderContext here.
    */
-   private final Map<MessageReference, Pair<Delivery, 
ProtonServerSenderContext>> deliveries = new HashMap<>();
+   final Map<MessageReference, Pair<Delivery, ProtonServerSenderContext>> 
deliveries = new HashMap<>();
 
    private boolean discharged;
 
+   private static class TXOperations extends TransactionOperationAbstract {
+      final ProtonTransactionImpl protonTransaction;
+      final AMQPConnectionContext connection;
+
+      TXOperations(AMQPConnectionContext connection, ProtonTransactionImpl tx) 
{
+         this.protonTransaction = tx;
+         this.connection = connection;
+      }
+
+      @Override
+      public void afterCommit(Transaction tx) {
+         super.afterCommit(tx);
+         connection.runNow(() -> {
+            // Settle all unsettled deliveries if commit is successful
+            for (Pair<Delivery, ProtonServerSenderContext> p : 
protonTransaction.deliveries.values()) {
+               if (!p.getA().isSettled())
+                  p.getB().settle(p.getA());
+            }
+            connection.flush();
+            protonTransaction.deliveries.forEach((a, b) -> 
b.getA().setContext(null));
+            protonTransaction.deliveries.clear();
+         });
+      }
+   }
+
    public ProtonTransactionImpl(final Xid xid, final StorageManager 
storageManager, final int timeoutSeconds, final AMQPConnectionContext 
connection) {
       super(xid, storageManager, timeoutSeconds);
-      addOperation(new TransactionOperationAbstract() {
-         @Override
-         public void afterCommit(Transaction tx) {
-            super.afterCommit(tx);
-            connection.runNow(() -> {
-               // Settle all unsettled deliveries if commit is successful
-               for (Pair<Delivery, ProtonServerSenderContext> p : 
deliveries.values()) {
-                  if (!p.getA().isSettled())
-                     p.getB().settle(p.getA());
-               }
-               connection.flush();
-            });
-         }
-      });
+      addOperation(new TXOperations(connection, this));
    }
 
    @Override
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java
index f5178da111..7db22175da 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java
@@ -53,6 +53,10 @@ public interface Consumer extends PriorityAware {
    default void promptDelivery() {
    }
 
+   default boolean isClosed() {
+      return false;
+   }
+
    /**
     * This will proceed with the actual delivery.
     * Notice that handle should hold a readLock and proceedDelivery should 
release the readLock
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java
index e32bbccac6..7f16ceea3a 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java
@@ -93,9 +93,9 @@ public class QueueConsumersImpl<T extends PriorityAware> 
implements QueueConsume
 
    @Override
    public boolean remove(T t) {
-      iterator.removed(t);
       boolean result = consumers.remove(t);
       if (result) {
+         iterator.removed(t);
          iterator.update(consumers.resettableIterator());
          if (consumers.isEmpty()) {
             reset();
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 2aeb364588..738620b173 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -380,10 +380,11 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
       boolean foundRef = false;
 
       synchronized (this) {
-         Iterator<MessageReference> iter = messageReferences.iterator();
-         while (iter.hasNext()) {
-            foundRef = true;
-            out.println("reference = " + iter.next());
+         try (LinkedListIterator<MessageReference> iter = 
messageReferences.iterator()) {
+            while (iter.hasNext()) {
+               foundRef = true;
+               out.println("reference = " + iter.next());
+            }
          }
       }
 
@@ -1483,7 +1484,7 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
       logger.debug("Removing consumer {}", consumer);
 
       try (ArtemisCloseable metric = measureCritical(CRITICAL_CONSUMER)) {
-         synchronized (this) {
+         synchronized (QueueImpl.this) {
 
             boolean consumerRemoved = false;
             for (ConsumerHolder holder : consumers) {
@@ -3060,7 +3061,7 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
          MessageReference ref;
          Consumer handledconsumer = null;
 
-         synchronized (this) {
+         synchronized (QueueImpl.this) {
 
             if (queueDestroyed) {
                if (messageReferences.size() == 0) {
@@ -3094,6 +3095,14 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
             Consumer consumer = holder.consumer;
             Consumer groupConsumer = null;
 
+            // we remove the consumerHolder when the Consumer is closed
+            // however the QueueConsumerIterator may hold a reference until 
the reset is called, which
+            // could happen a little later.
+            if (consumer.isClosed()) {
+               deliverAsync(true);
+               return false;
+            }
+
             if (holder.iter == null) {
                holder.iter = messageReferences.iterator();
             }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index a13d362af1..d4c7e32272 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -163,6 +163,11 @@ public class ServerConsumerImpl implements ServerConsumer, 
ReadyListener {
 
    private boolean isClosed = false;
 
+   @Override
+   public boolean isClosed() {
+      return isClosed;
+   }
+
    ServerConsumerMetrics metrics = new ServerConsumerMetrics();
 
 
@@ -618,11 +623,14 @@ public class ServerConsumerImpl implements 
ServerConsumer, ReadyListener {
          server.callBrokerConsumerPlugins(plugin -> 
plugin.afterCloseConsumer(this, failed));
       }
 
-      protocolContext = null;
+      messageQueue.getExecutor().execute(() -> {
+         protocolContext = null;
 
-      callback = null;
+         callback = null;
+
+         session = null;
+      });
 
-      session = null;
    }
 
    private void addLingerRefs() throws Exception {
@@ -1116,7 +1124,7 @@ public class ServerConsumerImpl implements 
ServerConsumer, ReadyListener {
     */
    @Override
    public String toString() {
-      return "ServerConsumerImpl [id=" + id + ", filter=" + filter + ", 
binding=" + binding + "]";
+      return "ServerConsumerImpl [id=" + id + ", filter=" + filter + ", 
binding=" + binding + ", closed=" + isClosed + "]";
    }
 
    @Override
diff --git 
a/tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/ConnectionLeakTest.java
 
b/tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/ConnectionLeakTest.java
index 9cbcda36f6..1af125e141 100644
--- 
a/tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/ConnectionLeakTest.java
+++ 
b/tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/ConnectionLeakTest.java
@@ -24,19 +24,23 @@ import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
 import javax.jms.TextMessage;
-
 import java.lang.invoke.MethodHandles;
 
+import io.github.checkleak.core.CheckLeak;
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
 import 
org.apache.activemq.artemis.core.protocol.core.impl.RemotingConnectionImpl;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
+import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
 import org.apache.activemq.artemis.core.server.impl.ServerStatus;
 import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
 import org.apache.activemq.artemis.tests.util.CFUtil;
 import org.apache.activemq.artemis.utils.Wait;
-import io.github.checkleak.core.CheckLeak;
-import org.apache.activemq.artemis.core.server.ActiveMQServer;
-import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.utils.collections.LinkedListImpl;
+import org.apache.qpid.proton.engine.impl.DeliveryImpl;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Assume;
@@ -81,7 +85,7 @@ public class ConnectionLeakTest extends ActiveMQTestBase {
    @Override
    @Before
    public void setUp() throws Exception {
-      server = createServer(true, createDefaultConfig(1, true));
+      server = createServer(false, createDefaultConfig(1, true));
       server.getConfiguration().setJournalPoolFiles(4).setJournalMinFiles(2);
       server.start();
    }
@@ -102,6 +106,9 @@ public class ConnectionLeakTest extends ActiveMQTestBase {
    }
 
    private void doTest(String protocol) throws Exception {
+      CheckLeak checkLeak = new CheckLeak();
+      // Some protocols may create ServerConsumers
+      int originalConsumers = 
checkLeak.getAllObjects(ServerConsumerImpl.class).length;
       int REPEATS = 100;
       int MESSAGES = 20;
       basicMemoryAsserts();
@@ -143,12 +150,17 @@ public class ConnectionLeakTest extends ActiveMQTestBase {
                      targetProducer.send(m);
                   }
                   Assert.assertNull(sourceConsumer.receiveNoWait());
+                  consumerSession.commit();
+
+                  Wait.assertTrue(() -> validateClosedConsumers(checkLeak));
                }
-               consumerSession.commit();
             }
          }
       }
 
+      assertMemory(new CheckLeak(), 0, ServerConsumerImpl.class.getName());
+
+
       // this is just to drain the messages
       try (Connection targetConnection = cf.createConnection(); Connection 
consumerConnection = cf.createConnection()) {
          Session targetSession = targetConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
@@ -160,6 +172,9 @@ public class ConnectionLeakTest extends ActiveMQTestBase {
          }
 
          Assert.assertNull(consumer.receiveNoWait());
+         assertMemory(new CheckLeak(), 0, DeliveryImpl.class.getName());
+         Wait.assertTrue(() -> validateClosedConsumers(checkLeak));
+         consumer = null;
       }
 
       Queue sourceQueue = server.locateQueue("source");
@@ -173,6 +188,65 @@ public class ConnectionLeakTest extends ActiveMQTestBase {
       }
 
       basicMemoryAsserts();
+   }
+
+   @Test
+   public void testCheckIteratorsAMQP() throws Exception {
+      testCheckIterators("AMQP");
+   }
+
+   @Test
+   public void testCheckIteratorsOpenWire() throws Exception {
+      testCheckIterators("OPENWIRE");
+   }
 
+   @Test
+   public void testCheckIteratorsCORE() throws Exception {
+      testCheckIterators("CORE");
+   }
+
+   public void testCheckIterators(String protocol) throws Exception {
+      CheckLeak checkLeak = new CheckLeak();
+
+      String queueName = getName();
+
+      Queue queue = server.createQueue(new 
QueueConfiguration(queueName).setRoutingType(RoutingType.ANYCAST));
+
+      ConnectionFactory cf = CFUtil.createConnectionFactory(protocol, 
"tcp://localhost:61616");
+      for (int i = 0; i < 10; i++) {
+         Connection connection = cf.createConnection();
+         connection.start();
+         for (int j = 0; j < 10; j++) {
+            Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+            MessageProducer producer = 
session.createProducer(session.createQueue(queueName));
+            producer.send(session.createTextMessage("test"));
+            session.commit();
+            session.close();
+         }
+
+         for (int j = 0; j < 10; j++) {
+            Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+            MessageConsumer consumer = 
session.createConsumer(session.createQueue(queueName));
+            consumer.receiveNoWait(); // it doesn't matter if it received or 
not, just doing something in the queue to kick the iterators
+            session.commit();
+         }
+         connection.close();
+
+         assertMemory(checkLeak, 0, 1, 1, ServerConsumerImpl.class.getName());
+         assertMemory(checkLeak, 0, 2, 1, 
LinkedListImpl.Iterator.class.getName());
+      }
+   }
+
+
+   private boolean validateClosedConsumers(CheckLeak checkLeak) throws 
Exception {
+      Object[] objecs = checkLeak.getAllObjects(ServerConsumerImpl.class);
+      for (Object obj : objecs) {
+         ServerConsumerImpl consumer = (ServerConsumerImpl) obj;
+         if (consumer.isClosed()) {
+            logger.info("References to closedConsumer {}\n{}", consumer, 
checkLeak.exploreObjectReferences(3, 1, true, consumer));
+            return false;
+         }
+      }
+      return true;
    }
 }
\ No newline at end of file
diff --git 
a/tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/LinkedListMemoryTest.java
 
b/tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/LinkedListMemoryTest.java
new file mode 100644
index 0000000000..2663d94d0c
--- /dev/null
+++ 
b/tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/LinkedListMemoryTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.leak;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Random;
+
+import io.github.checkleak.core.CheckLeak;
+import org.apache.activemq.artemis.utils.collections.LinkedListImpl;
+import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LinkedListMemoryTest {
+
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   Random random = new Random();
+
+   CheckLeak checkLeak = new CheckLeak();
+
+   public int randomInt(int x, int y) {
+
+      int randomNumber = random.nextInt(y - x + 1) + x;
+
+      return randomNumber;
+   }
+
+   @Test
+   public void testRemoveIteratorsRandom() throws Exception {
+      LinkedListImpl<String> linkedList = new LinkedListImpl<>((a, b) -> 
a.compareTo(b));
+
+      linkedList.addSorted("Test");
+
+      int iterators = 100;
+      ArrayList<LinkedListIterator> listIerators = new ArrayList();
+
+      for (int i = 0; i < iterators; i++) {
+         listIerators.add(linkedList.iterator());
+      }
+
+      int countRemoved = 0;
+
+      while (listIerators.size() > 0) {
+         int removeElement = randomInt(0, listIerators.size() - 1);
+         countRemoved++;
+         LinkedListIterator toRemove = listIerators.remove(removeElement);
+         toRemove.close();
+         toRemove = null;
+         MemoryAssertions.assertMemory(checkLeak, iterators - countRemoved, 
LinkedListImpl.Iterator.class.getName());
+      }
+      MemoryAssertions.assertMemory(checkLeak, 0, 
LinkedListImpl.Iterator.class.getName());
+   }
+}
diff --git 
a/tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/MemoryAssertions.java
 
b/tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/MemoryAssertions.java
index 3e5eac93ea..47e4cab2b9 100644
--- 
a/tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/MemoryAssertions.java
+++ 
b/tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/MemoryAssertions.java
@@ -49,6 +49,10 @@ public class MemoryAssertions {
    }
 
    public static void assertMemory(CheckLeak checkLeak, int maxExpected, 
String clazz) throws Exception {
+      assertMemory(checkLeak, maxExpected, 10, 10, clazz);
+   }
+
+   public static void assertMemory(CheckLeak checkLeak, int maxExpected, int 
maxLevel, int maxObjects, String clazz) throws Exception {
       Wait.waitFor(() -> checkLeak.getAllObjects(clazz).length <= maxExpected, 
5000, 100);
 
       Object[] objects = checkLeak.getAllObjects(clazz);
@@ -56,7 +60,7 @@ public class MemoryAssertions {
          for (Object obj : objects) {
             logger.warn("Object {} still in the heap", obj);
          }
-         String report = checkLeak.exploreObjectReferences(10, 10, true, 
objects);
+         String report = checkLeak.exploreObjectReferences(maxLevel, 
maxObjects, true, objects);
          logger.info(report);
 
          Assert.fail("Class " + clazz + " has leaked " + objects.length + " 
objects\n" + report);

Reply via email to