This is an automated email from the ASF dual-hosted git repository.
jbertram pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new e8337b9c2e ARTEMIS-3933 - ScaleDown NPE on DLA resources with multiple destinations
new 451514f915 This closes #4175
e8337b9c2e is described below
commit e8337b9c2e7c8d7f3028dc018a069542d4b90dbc
Author: AntonRoskvist <[email protected]>
AuthorDate: Thu Aug 11 20:29:59 2022 +0200
ARTEMIS-3933 - ScaleDown NPE on DLA resources with multiple destinations
---
.../artemis/core/server/impl/ScaleDownHandler.java | 8 ++--
.../tests/integration/server/ScaleDownTest.java | 47 ++++++++++++++++++++++
2 files changed, 52 insertions(+), 3 deletions(-)
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java
index ec12f74f35..3dabea54d0 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java
@@ -576,7 +576,7 @@ public class ScaleDownHandler {
MessageReference initialRef = null;
for (int i = 0; i < numberOfScans; i++) {
- logger.debug("iterating on queue " + queue + " while looking for reference " + reference);
+ logger.debug("Iterating on queue " + queue + " while looking for reference " + reference);
memoryIterator = queue.iterator();
while (memoryIterator.hasNext()) {
@@ -610,8 +610,10 @@ public class ScaleDownHandler {
// if we reached two iterations without finding anything.. we just go away by cleaning everything up
lastRef = null;
- memoryIterator.close();
- memoryIterator = null;
+ if (memoryIterator != null) {
+ memoryIterator.close();
+ memoryIterator = null;
+ }
return false;
}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownTest.java
index 9b4f1df545..9101df875f 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.tests.integration.server;
+import javax.management.MBeanServer;
import java.lang.reflect.Field;
import java.net.URI;
import java.util.Arrays;
@@ -28,6 +29,7 @@ import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
@@ -36,6 +38,7 @@ import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl;
import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl;
import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
@@ -48,6 +51,7 @@ import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancing
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
+import org.apache.activemq.artemis.tests.integration.management.ManagementControlHelper;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.transport.amqp.client.AmqpClient;
@@ -796,4 +800,47 @@ public class ScaleDownTest extends ClusterTestBase {
consumer.close();
}
+
+ @Test
+ public void testScaleDownPagedMessageWithMultipleAutoCreatedDLAResources() throws Exception {
+ final SimpleString dla = new SimpleString("DLA");
+ final SimpleString qName = new SimpleString("Q");
+ final SimpleString adName = new SimpleString("ADDR");
+ final String sampleText = "Put me on DLQ";
+ final int messageCount = 10;
+ final int numQueues = 2;
+
+ AddressSettings addressSettings = new AddressSettings().setDeadLetterAddress(dla).setAutoCreateDeadLetterResources(true).setDeadLetterQueuePrefix(dla);
+ AddressSettings dlaAddressSettings = new AddressSettings().setDeadLetterAddress(dla).setMaxSizeBytes(200L).setAutoCreateQueues(true);
+
+ servers[0].getAddressSettingsRepository().addMatch( "#", addressSettings);
+ servers[0].getAddressSettingsRepository().addMatch(dla.toString(), dlaAddressSettings);
+
+ ClientSessionFactory sf = sfs[0];
+ ClientSession session = addClientSession(sf.createSession(true, true));
+ MBeanServer mbeanServer = servers[0].getMBeanServer();
+
+ for (int i = 0; i < numQueues; i++) {
+ SimpleString curAddr = adName.concat(String.valueOf(i));
+ SimpleString curQ = qName.concat(String.valueOf(i));
+ SimpleString dlq = dla.concat(curAddr);
+
+ session.createQueue(new QueueConfiguration(curQ).setAddress(curAddr).setDurable(true));
+ ClientProducer producer = session.createProducer(curAddr);
+
+ for (int p = 0; p < messageCount; p++) {
+ producer.send(createTextMessage(session, sampleText));
+ }
+
+ QueueControl queueControl = ManagementControlHelper.createQueueControl(curAddr, curQ, mbeanServer);
+ Assert.assertEquals(messageCount, queueControl.sendMessagesToDeadLetterAddress(null));
+ Assert.assertEquals(0, queueControl.getMessageCount());
+ Wait.assertTrue(servers[0].locateQueue(dlq).getPagingStore()::isPaging);
+ }
+
+ servers[0].getActiveMQServerControl().scaleDown(servers[0].getConfiguration().getClusterConfigurations().iterator().next().getStaticConnectors().get(0));
+ Wait.assertEquals((messageCount * numQueues), () -> servers[1].getTotalMessageCount());
+
+ }
+
}