This is an automated email from the ASF dual-hosted git repository.

jbertram pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new e8337b9c2e ARTEMIS-3933 - ScaleDown NPE on DLA resources with multiple destinations
     new 451514f915 This closes #4175
e8337b9c2e is described below

commit e8337b9c2e7c8d7f3028dc018a069542d4b90dbc
Author: AntonRoskvist <[email protected]>
AuthorDate: Thu Aug 11 20:29:59 2022 +0200

    ARTEMIS-3933 - ScaleDown NPE on DLA resources with multiple destinations
---
 .../artemis/core/server/impl/ScaleDownHandler.java |  8 ++--
 .../tests/integration/server/ScaleDownTest.java    | 47 ++++++++++++++++++++++
 2 files changed, 52 insertions(+), 3 deletions(-)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java
index ec12f74f35..3dabea54d0 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java
@@ -576,7 +576,7 @@ public class ScaleDownHandler {
 
             MessageReference initialRef = null;
             for (int i = 0; i < numberOfScans; i++) {
-               logger.debug("iterating on queue " + queue + " while looking for reference " + reference);
+               logger.debug("Iterating on queue " + queue + " while looking for reference " + reference);
                memoryIterator = queue.iterator();
 
                while (memoryIterator.hasNext()) {
@@ -610,8 +610,10 @@ public class ScaleDownHandler {
 
          // if we reached two iterations without finding anything.. we just go away by cleaning everything up
          lastRef = null;
-         memoryIterator.close();
-         memoryIterator = null;
+         if (memoryIterator != null) {
+            memoryIterator.close();
+            memoryIterator = null;
+         }
 
          return false;
       }
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownTest.java
index 9b4f1df545..9101df875f 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.artemis.tests.integration.server;
 
+import javax.management.MBeanServer;
 import java.lang.reflect.Field;
 import java.net.URI;
 import java.util.Arrays;
@@ -28,6 +29,7 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
@@ -36,6 +38,7 @@ import org.apache.activemq.artemis.api.core.client.ClientMessage;
 import org.apache.activemq.artemis.api.core.client.ClientProducer;
 import org.apache.activemq.artemis.api.core.client.ClientSession;
 import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.management.QueueControl;
 import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl;
 import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl;
 import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
@@ -48,6 +51,7 @@ import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancing
 import org.apache.activemq.artemis.core.server.impl.QueueImpl;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
+import org.apache.activemq.artemis.tests.integration.management.ManagementControlHelper;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
 import org.apache.activemq.artemis.tests.util.Wait;
 import org.apache.activemq.transport.amqp.client.AmqpClient;
@@ -796,4 +800,47 @@ public class ScaleDownTest extends ClusterTestBase {
 
       consumer.close();
    }
+
+   @Test
+   public void testScaleDownPagedMessageWithMultipleAutoCreatedDLAResources() throws Exception {
+      final SimpleString dla = new SimpleString("DLA");
+      final SimpleString qName = new SimpleString("Q");
+      final SimpleString adName = new SimpleString("ADDR");
+      final String sampleText = "Put me on DLQ";
+      final int messageCount = 10;
+      final int numQueues = 2;
+
+      AddressSettings addressSettings = new AddressSettings().setDeadLetterAddress(dla).setAutoCreateDeadLetterResources(true).setDeadLetterQueuePrefix(dla);
+      AddressSettings dlaAddressSettings = new AddressSettings().setDeadLetterAddress(dla).setMaxSizeBytes(200L).setAutoCreateQueues(true);
+
+      servers[0].getAddressSettingsRepository().addMatch( "#", addressSettings);
+      servers[0].getAddressSettingsRepository().addMatch(dla.toString(), dlaAddressSettings);
+
+      ClientSessionFactory sf = sfs[0];
+      ClientSession session = addClientSession(sf.createSession(true, true));
+      MBeanServer mbeanServer = servers[0].getMBeanServer();
+
+      for (int i = 0; i < numQueues; i++) {
+         SimpleString curAddr = adName.concat(String.valueOf(i));
+         SimpleString curQ = qName.concat(String.valueOf(i));
+         SimpleString dlq = dla.concat(curAddr);
+
+         session.createQueue(new QueueConfiguration(curQ).setAddress(curAddr).setDurable(true));
+         ClientProducer producer = session.createProducer(curAddr);
+
+         for (int p = 0; p < messageCount; p++) {
+            producer.send(createTextMessage(session, sampleText));
+         }
+
+         QueueControl queueControl = ManagementControlHelper.createQueueControl(curAddr, curQ, mbeanServer);
+         Assert.assertEquals(messageCount, queueControl.sendMessagesToDeadLetterAddress(null));
+         Assert.assertEquals(0, queueControl.getMessageCount());
+         Wait.assertTrue(servers[0].locateQueue(dlq).getPagingStore()::isPaging);
+      }
+
+      servers[0].getActiveMQServerControl().scaleDown(servers[0].getConfiguration().getClusterConfigurations().iterator().next().getStaticConnectors().get(0));
+      Wait.assertEquals((messageCount * numQueues), () -> servers[1].getTotalMessageCount());
+
+   }
+
 }

Reply via email to