This is an automated email from the ASF dual-hosted git repository.
robbie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new b11945e0c7 ARTEMIS-4410 - process deliveries before removing consumer
on session close, ensure strict order for a single consumer
b11945e0c7 is described below
commit b11945e0c7376db01f25aaf1b61934bda95cb80e
Author: Gary Tully <[email protected]>
AuthorDate: Wed Aug 30 17:52:33 2023 +0100
ARTEMIS-4410 - process deliveries before removing consumer on session
close, ensure strict order for a single consumer
---
.../core/server/impl/ServerConsumerImpl.java | 5 +-
.../tests/integration/client/JMSOrderTest.java | 19 ++++--
.../PrefetchRedeliveryCountOpenwireTest.java | 75 +++++++++++++++++++++-
3 files changed, 92 insertions(+), 7 deletions(-)
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index b720baf64c..fa389b0912 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -573,8 +573,6 @@ public class ServerConsumerImpl implements ServerConsumer,
ReadyListener {
del.finish();
}
- removeItself();
-
List<MessageReference> refs = cancelRefs(failed, false, null);
Transaction tx = new TransactionImpl(storageManager);
@@ -587,6 +585,9 @@ public class ServerConsumerImpl implements ServerConsumer,
ReadyListener {
tx.rollback();
+ // started is false, leaving remove till after cancel ensures order for a single exclusive consumer
+ removeItself();
+
addLingerRefs();
if (!browseOnly) {
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JMSOrderTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JMSOrderTest.java
index 333bcda901..5e642d2562 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JMSOrderTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JMSOrderTest.java
@@ -35,6 +35,9 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.JMSTestBase;
import org.apache.activemq.artemis.tests.util.Wait;
@@ -50,11 +53,12 @@ import static
org.apache.activemq.artemis.tests.util.CFUtil.createConnectionFact
public class JMSOrderTest extends JMSTestBase {
String protocol;
-
+ boolean exclusive;
ConnectionFactory protocolCF;
- public JMSOrderTest(String protocol) {
+ public JMSOrderTest(String protocol, boolean exclusive) {
this.protocol = protocol;
+ this.exclusive = exclusive;
}
@Before
@@ -62,9 +66,16 @@ public class JMSOrderTest extends JMSTestBase {
protocolCF = createConnectionFactory(protocol, "tcp://localhost:61616");
}
- @Parameterized.Parameters(name = "protocol={0}")
+ @Parameterized.Parameters(name = "protocol={0}&exclusive={1}")
public static Collection getParameters() {
- return Arrays.asList(new Object[][]{{"AMQP"}, {"OPENWIRE"}, {"CORE"}});
+ return Arrays.asList(new Object[][]{{"AMQP", true}, {"AMQP", false},
{"OPENWIRE", true}, {"OPENWIRE", false}, {"CORE", true}, {"CORE", false}});
+ }
+
+ @Override
+ protected void extraServerConfig(ActiveMQServer server) {
+ if (exclusive) {
+ server.getConfiguration().getAddressSettings().put("#", new
AddressSettings().setAutoCreateQueues(true).setAutoCreateAddresses(true).setDeadLetterAddress(new
SimpleString("ActiveMQ.DLQ")).setDefaultExclusiveQueue(true));
+ }
}
protected void sendToAmqQueue(int count) throws Exception {
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/PrefetchRedeliveryCountOpenwireTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/PrefetchRedeliveryCountOpenwireTest.java
index 5fef3c4a9c..b5dab5f344 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/PrefetchRedeliveryCountOpenwireTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/PrefetchRedeliveryCountOpenwireTest.java
@@ -26,11 +26,14 @@ import javax.jms.TextMessage;
import java.util.Map;
import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQPrefetchPolicy;
+import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.transport.failover.FailoverTransport;
import org.junit.Assert;
import org.junit.Test;
@@ -46,7 +49,9 @@ public class PrefetchRedeliveryCountOpenwireTest extends
OpenWireTestBase {
protected void configureAddressSettings(Map<String, AddressSettings>
addressSettingsMap) {
super.configureAddressSettings(addressSettingsMap);
// force send to dlq early
- addressSettingsMap.get("#").setMaxDeliveryAttempts(2);
+ addressSettingsMap.put("exampleQueue", new
AddressSettings().setAutoCreateQueues(false).setAutoCreateAddresses(false).setDeadLetterAddress(new
SimpleString("ActiveMQ.DLQ")).setAutoCreateAddresses(true).setMaxDeliveryAttempts(2));
+ // force send to dlq late
+ addressSettingsMap.put("exampleQueueTwo", new
AddressSettings().setAutoCreateQueues(false).setAutoCreateAddresses(false).setDeadLetterAddress(new
SimpleString("ActiveMQ.DLQ")).setAutoCreateAddresses(true).setMaxDeliveryAttempts(4000));
}
@Test(timeout = 60_000)
@@ -93,4 +98,72 @@ public class PrefetchRedeliveryCountOpenwireTest extends
OpenWireTestBase {
}
}
}
+
+ @Test(timeout = 60_000)
+ public void testExclusiveConsumerOrderOnReconnectionLargePrefetch() throws
Exception {
+ Connection exConn = null;
+
+ SimpleString durableQueue = new SimpleString("exampleQueueTwo");
+ this.server.createQueue(new
QueueConfiguration(durableQueue).setRoutingType(RoutingType.ANYCAST).setExclusive(true));
+
+ try {
+ ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory();
+ exFact.setWatchTopicAdvisories(false);
+
+ ActiveMQPrefetchPolicy prefetchPastMaxDeliveriesInLoop = new
ActiveMQPrefetchPolicy();
+ prefetchPastMaxDeliveriesInLoop.setAll(2000);
+ exFact.setPrefetchPolicy(prefetchPastMaxDeliveriesInLoop);
+
+ RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
+ redeliveryPolicy.setMaximumRedeliveries(4000);
+ exFact.setRedeliveryPolicy(redeliveryPolicy);
+
+ Queue queue = new ActiveMQQueue("exampleQueueTwo");
+
+ exConn = exFact.createConnection();
+
+ exConn.start();
+
+ Session session = exConn.createSession(true,
Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer producer = session.createProducer(queue);
+
+ TextMessage message = session.createTextMessage("This is a text
message");
+
+ int numMessages = 2000;
+ for (int i = 0; i < numMessages; i++) {
+ message.setIntProperty("SEQ", i);
+ producer.send(message);
+ }
+ session.commit();
+ exConn.close();
+
+ final int batch = 100;
+ for (int i = 0; i < numMessages; i += batch) {
+ // connection per batch
+ exConn = exFact.createConnection();
+ exConn.start();
+
+ session = exConn.createSession(true, Session.SESSION_TRANSACTED);
+
+ MessageConsumer messageConsumer = session.createConsumer(queue);
+ TextMessage messageReceived = null;
+ for (int j = 0; j < batch; j++) { // a small batch
+ messageReceived = (TextMessage) messageConsumer.receive(5000);
+ Assert.assertNotNull("null @ i=" + i, messageReceived);
+ Assert.assertEquals(i + j,
messageReceived.getIntProperty("SEQ"));
+
+ assertEquals("This is a text message",
messageReceived.getText());
+ }
+ session.commit();
+
+
((FailoverTransport)((org.apache.activemq.ActiveMQConnection)exConn).getTransport().narrow(FailoverTransport.class)).stop();
+ exConn.close();
+ }
+ } finally {
+ if (exConn != null) {
+ exConn.close();
+ }
+ }
+ }
}