This is an automated email from the ASF dual-hosted git repository.

robbie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new b11945e0c7 ARTEMIS-4410 - process deliveries before removing consumer on session close, ensure strict order for a single consumer
b11945e0c7 is described below

commit b11945e0c7376db01f25aaf1b61934bda95cb80e
Author: Gary Tully <[email protected]>
AuthorDate: Wed Aug 30 17:52:33 2023 +0100

    ARTEMIS-4410 - process deliveries before removing consumer on session close, ensure strict order for a single consumer
---
 .../core/server/impl/ServerConsumerImpl.java       |  5 +-
 .../tests/integration/client/JMSOrderTest.java     | 19 ++++--
 .../PrefetchRedeliveryCountOpenwireTest.java       | 75 +++++++++++++++++++++-
 3 files changed, 92 insertions(+), 7 deletions(-)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index b720baf64c..fa389b0912 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -573,8 +573,6 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
          del.finish();
       }
 
-      removeItself();
-
       List<MessageReference> refs = cancelRefs(failed, false, null);
 
       Transaction tx = new TransactionImpl(storageManager);
@@ -587,6 +585,9 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
 
       tx.rollback();
 
+      // started is false, leaving remove till after cancel ensures order for a single exclusive consumer
+      removeItself();
+
       addLingerRefs();
 
       if (!browseOnly) {
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JMSOrderTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JMSOrderTest.java
index 333bcda901..5e642d2562 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JMSOrderTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JMSOrderTest.java
@@ -35,6 +35,9 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.artemis.api.core.QueueConfiguration;
 import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.tests.util.CFUtil;
 import org.apache.activemq.artemis.tests.util.JMSTestBase;
 import org.apache.activemq.artemis.tests.util.Wait;
@@ -50,11 +53,12 @@ import static 
org.apache.activemq.artemis.tests.util.CFUtil.createConnectionFact
 public class JMSOrderTest extends JMSTestBase {
 
    String protocol;
-
+   boolean exclusive;
    ConnectionFactory protocolCF;
 
-   public JMSOrderTest(String protocol) {
+   public JMSOrderTest(String protocol, boolean exclusive) {
       this.protocol = protocol;
+      this.exclusive = exclusive;
    }
 
    @Before
@@ -62,9 +66,16 @@ public class JMSOrderTest extends JMSTestBase {
       protocolCF = createConnectionFactory(protocol, "tcp://localhost:61616");
    }
 
-   @Parameterized.Parameters(name = "protocol={0}")
+   @Parameterized.Parameters(name = "protocol={0}&exclusive={1}")
    public static Collection getParameters() {
-      return Arrays.asList(new Object[][]{{"AMQP"}, {"OPENWIRE"}, {"CORE"}});
+      return Arrays.asList(new Object[][]{{"AMQP", true}, {"AMQP", false}, 
{"OPENWIRE", true},  {"OPENWIRE", false}, {"CORE", true}, {"CORE", false}});
+   }
+
+   @Override
+   protected void extraServerConfig(ActiveMQServer server) {
+      if (exclusive) {
+         server.getConfiguration().getAddressSettings().put("#", new 
AddressSettings().setAutoCreateQueues(true).setAutoCreateAddresses(true).setDeadLetterAddress(new
 SimpleString("ActiveMQ.DLQ")).setDefaultExclusiveQueue(true));
+      }
    }
 
    protected void sendToAmqQueue(int count) throws Exception {
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/PrefetchRedeliveryCountOpenwireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/PrefetchRedeliveryCountOpenwireTest.java
index 5fef3c4a9c..b5dab5f344 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/PrefetchRedeliveryCountOpenwireTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/PrefetchRedeliveryCountOpenwireTest.java
@@ -26,11 +26,14 @@ import javax.jms.TextMessage;
 import java.util.Map;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQPrefetchPolicy;
+import org.apache.activemq.RedeliveryPolicy;
 import org.apache.activemq.artemis.api.core.QueueConfiguration;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.transport.failover.FailoverTransport;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -46,7 +49,9 @@ public class PrefetchRedeliveryCountOpenwireTest extends 
OpenWireTestBase {
    protected void configureAddressSettings(Map<String, AddressSettings> 
addressSettingsMap) {
       super.configureAddressSettings(addressSettingsMap);
       // force send to dlq early
-      addressSettingsMap.get("#").setMaxDeliveryAttempts(2);
+      addressSettingsMap.put("exampleQueue", new 
AddressSettings().setAutoCreateQueues(false).setAutoCreateAddresses(false).setDeadLetterAddress(new
 
SimpleString("ActiveMQ.DLQ")).setAutoCreateAddresses(true).setMaxDeliveryAttempts(2));
+      // force send to dlq late
+      addressSettingsMap.put("exampleQueueTwo", new 
AddressSettings().setAutoCreateQueues(false).setAutoCreateAddresses(false).setDeadLetterAddress(new
 
SimpleString("ActiveMQ.DLQ")).setAutoCreateAddresses(true).setMaxDeliveryAttempts(4000));
    }
 
    @Test(timeout = 60_000)
@@ -93,4 +98,72 @@ public class PrefetchRedeliveryCountOpenwireTest extends 
OpenWireTestBase {
          }
       }
    }
+
+   @Test(timeout = 60_000)
+   public void testExclusiveConsumerOrderOnReconnectionLargePrefetch() throws 
Exception {
+      Connection exConn = null;
+
+      SimpleString durableQueue = new SimpleString("exampleQueueTwo");
+      this.server.createQueue(new 
QueueConfiguration(durableQueue).setRoutingType(RoutingType.ANYCAST).setExclusive(true));
+
+      try {
+         ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory();
+         exFact.setWatchTopicAdvisories(false);
+
+         ActiveMQPrefetchPolicy prefetchPastMaxDeliveriesInLoop = new 
ActiveMQPrefetchPolicy();
+         prefetchPastMaxDeliveriesInLoop.setAll(2000);
+         exFact.setPrefetchPolicy(prefetchPastMaxDeliveriesInLoop);
+
+         RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
+         redeliveryPolicy.setMaximumRedeliveries(4000);
+         exFact.setRedeliveryPolicy(redeliveryPolicy);
+
+         Queue queue = new ActiveMQQueue("exampleQueueTwo");
+
+         exConn = exFact.createConnection();
+
+         exConn.start();
+
+         Session session = exConn.createSession(true, 
Session.AUTO_ACKNOWLEDGE);
+
+         MessageProducer producer = session.createProducer(queue);
+
+         TextMessage message = session.createTextMessage("This is a text 
message");
+
+         int numMessages = 2000;
+         for (int i = 0; i < numMessages; i++) {
+            message.setIntProperty("SEQ", i);
+            producer.send(message);
+         }
+         session.commit();
+         exConn.close();
+
+         final int batch = 100;
+         for (int i = 0; i < numMessages; i += batch) {
+            // connection per batch
+            exConn = exFact.createConnection();
+            exConn.start();
+
+            session = exConn.createSession(true, Session.SESSION_TRANSACTED);
+
+            MessageConsumer messageConsumer = session.createConsumer(queue);
+            TextMessage messageReceived = null;
+            for (int j = 0; j < batch; j++) { // a small batch
+               messageReceived = (TextMessage) messageConsumer.receive(5000);
+               Assert.assertNotNull("null @ i=" + i, messageReceived);
+               Assert.assertEquals(i + j, 
messageReceived.getIntProperty("SEQ"));
+
+               assertEquals("This is a text message", 
messageReceived.getText());
+            }
+            session.commit();
+
+            
((FailoverTransport)((org.apache.activemq.ActiveMQConnection)exConn).getTransport().narrow(FailoverTransport.class)).stop();
+            exConn.close();
+         }
+      } finally {
+         if (exConn != null) {
+            exConn.close();
+         }
+      }
+   }
 }

Reply via email to