This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new fa80c03  ARTEMIS-3234 - fix and test; the existing tests suffered from suppressInternalManagementObjects defaulting to true. Credit accounting is now independent of the ack list so that preack for advisories can work
     new f580ecb  This closes #3534
fa80c03 is described below

commit fa80c03049d1faaca5e3f00a68b5ac438cd40548
Author: gtully <[email protected]>
AuthorDate: Fri Apr 9 19:51:09 2021 +0100

    ARTEMIS-3234 - fix and test; the existing tests suffered from suppressInternalManagementObjects defaulting to true. Credit accounting is now independent of the ack list so that preack for advisories can work
---
 .../core/protocol/openwire/amq/AMQConsumer.java    |  98 ++++++++++---------
 .../integration/openwire/AdvisoryOpenWireTest.java | 106 ++++++++++++---------
 2 files changed, 114 insertions(+), 90 deletions(-)

diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index 6a00f02..89eafe7 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -70,7 +70,7 @@ public class AMQConsumer {
 
    private int prefetchSize;
    private final AtomicInteger currentWindow;
-   private int deliveredAcks;
+   private int deliveredAcksCreditExtension = 0;
    private long messagePullSequence = 0;
    private final AtomicReference<MessagePullHandler> messagePullHandler = new 
AtomicReference<>(null);
    //internal means we don't expose
@@ -90,7 +90,6 @@ public class AMQConsumer {
       this.scheduledPool = scheduledPool;
       this.prefetchSize = info.getPrefetchSize();
       this.currentWindow = new AtomicInteger(prefetchSize);
-      this.deliveredAcks = 0;
       if (prefetchSize == 0) {
          messagePullHandler.set(new MessagePullHandler());
       }
@@ -295,6 +294,28 @@ public class AMQConsumer {
     */
    public void acknowledge(MessageAck ack) throws Exception {
 
+      if (ack.isRedeliveredAck()) {
+         // we don't mind if the client thinks it is a redelivery
+         return;
+      }
+
+      final int ackMessageCount = ack.getMessageCount();
+      acquireCredit(ackMessageCount);
+
+      if (ack.isDeliveredAck()) {
+         deliveredAcksCreditExtension += ackMessageCount;
+         // our work is done
+         return;
+      }
+
+      // some sort of real ack, rebalance deliveredAcksCreditExtension
+      if (deliveredAcksCreditExtension > 0) {
+         deliveredAcksCreditExtension -= ackMessageCount;
+         if (deliveredAcksCreditExtension >= 0) {
+            currentWindow.addAndGet(-ackMessageCount);
+         }
+      }
+
       final MessageId startID, lastID;
 
       if (ack.getFirstMessageId() == null) {
@@ -309,59 +330,42 @@ public class AMQConsumer {
       if (serverConsumer.getQueue().isNonDestructive()) {
          removeReferences = false;
       }
-      if (ack.isRedeliveredAck() || ack.isDeliveredAck() || 
ack.isExpiredAck()) {
-         removeReferences = false;
-      }
-
-      List<MessageReference> ackList = 
serverConsumer.scanDeliveringReferences(removeReferences, reference -> 
startID.equals(reference.getProtocolData()), reference -> 
lastID.equals(reference.getProtocolData()));
-
-      if (removeReferences && (ack.isIndividualAck() || ack.isStandardAck() || 
ack.isPoisonAck())) {
-         if (deliveredAcks < ackList.size()) {
-            acquireCredit(ackList.size() - deliveredAcks);
-            deliveredAcks = 0;
-         } else {
-            deliveredAcks -= ackList.size();
-         }
-      } else {
-         if (ack.isDeliveredAck()) {
-            this.deliveredAcks += ack.getMessageCount();
-         }
-
-         acquireCredit(ack.getMessageCount());
-      }
 
-      if (removeReferences) {
+      final List<MessageReference> ackList = 
serverConsumer.scanDeliveringReferences(removeReferences, reference -> 
startID.equals(reference.getProtocolData()), reference -> 
lastID.equals(reference.getProtocolData()));
 
-         Transaction originalTX = 
session.getCoreSession().getCurrentTransaction();
-         Transaction transaction;
+      if (!ackList.isEmpty()) {
+         if (ack.isExpiredAck()) {
+            for (MessageReference ref : ackList) {
+               ref.getQueue().expire(ref, serverConsumer);
+            }
+         } else if (removeReferences) {
 
-         if (originalTX == null) {
-            transaction = session.getCoreSession().newTransaction();
-         } else {
-            transaction = originalTX;
-         }
+            Transaction originalTX = 
session.getCoreSession().getCurrentTransaction();
+            Transaction transaction;
 
-         if (ack.isIndividualAck() || ack.isStandardAck()) {
-            for (MessageReference ref : ackList) {
-               ref.acknowledge(transaction, serverConsumer);
+            if (originalTX == null) {
+               transaction = session.getCoreSession().newTransaction();
+            } else {
+               transaction = originalTX;
             }
-         } else if (ack.isPoisonAck()) {
-            for (MessageReference ref : ackList) {
-               Throwable poisonCause = ack.getPoisonCause();
-               if (poisonCause != null) {
-                  
ref.getMessage().putStringProperty(OpenWireMessageConverter.AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY,
 new SimpleString(poisonCause.toString()));
+
+            if (ack.isIndividualAck() || ack.isStandardAck()) {
+               for (MessageReference ref : ackList) {
+                  ref.acknowledge(transaction, serverConsumer);
+               }
+            } else if (ack.isPoisonAck()) {
+               for (MessageReference ref : ackList) {
+                  Throwable poisonCause = ack.getPoisonCause();
+                  if (poisonCause != null) {
+                     
ref.getMessage().putStringProperty(OpenWireMessageConverter.AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY,
 new SimpleString(poisonCause.toString()));
+                  }
+                  ref.getQueue().sendToDeadLetterAddress(transaction, ref);
                }
-               ref.getQueue().sendToDeadLetterAddress(transaction, ref);
             }
-         }
 
-         if (originalTX == null) {
-            transaction.commit(true);
-         }
-      }
-      if (ack.isExpiredAck()) {
-         for (MessageReference ref : ackList) {
-            ref.getQueue().expire(ref, serverConsumer);
+            if (originalTX == null) {
+               transaction.commit(true);
+            }
          }
       }
    }
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/AdvisoryOpenWireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/AdvisoryOpenWireTest.java
index a76f5e3..fb80e2e 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/AdvisoryOpenWireTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/AdvisoryOpenWireTest.java
@@ -18,7 +18,6 @@ package 
org.apache.activemq.artemis.tests.integration.openwire;
 
 import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.artemis.api.core.management.AddressControl;
-import org.apache.activemq.artemis.api.core.management.QueueControl;
 import org.apache.activemq.artemis.tests.util.Wait;
 import org.junit.Before;
 import org.junit.Test;
@@ -30,6 +29,8 @@ import javax.jms.TemporaryTopic;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.activemq.artemis.core.config.Configuration;
+
 public class AdvisoryOpenWireTest extends BasicOpenWireTest {
 
    @Override
@@ -44,12 +45,17 @@ public class AdvisoryOpenWireTest extends BasicOpenWireTest 
{
       super.setUp();
    }
 
+   @Override
+   protected void extraServerConfig(Configuration serverConfig) {
+      // ensure advisory addresses are visible
+      
serverConfig.getAcceptorConfigurations().iterator().next().getExtraParams().put("suppressInternalManagementObjects",
 "false");
+      super.extraServerConfig(serverConfig);
+   }
+
    @Test
    public void testTempTopicLeak() throws Exception {
-      Connection connection = null;
 
-      try {
-         connection = factory.createConnection();
+      try (Connection connection = factory.createConnection()) {
          connection.start();
 
          Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
@@ -57,32 +63,32 @@ public class AdvisoryOpenWireTest extends BasicOpenWireTest 
{
          TemporaryTopic temporaryTopic = session.createTemporaryTopic();
          temporaryTopic.delete();
 
-         Object[] queueResources = 
server.getManagementService().getResources(QueueControl.class);
+         AddressControl advisoryAddress = 
assertNonNullAddressControl("ActiveMQ.Advisory.TempTopic");
+         Wait.waitFor(() -> advisoryAddress.getMessageCount() == 0);
 
-         for (Object queueResource : queueResources) {
+         Wait.assertEquals(0, advisoryAddress::getMessageCount);
+         Wait.assertEquals(2, advisoryAddress::getRoutedMessageCount);
 
-            if (((QueueControl) 
queueResource).getAddress().equals("ActiveMQ.Advisory.TempTopic")) {
-               QueueControl queueControl = (QueueControl) queueResource;
-               Wait.waitFor(() -> queueControl.getMessageCount() == 0);
-               assertNotNull("addressControl for temp advisory", queueControl);
+      }
+   }
 
-               Wait.assertEquals(0, queueControl::getMessageCount);
-               Wait.assertEquals(2, queueControl::getMessagesAdded);
-            }
-         }
-      } finally {
-         if (connection != null) {
-            connection.close();
+   private AddressControl assertNonNullAddressControl(String match) {
+      AddressControl advisoryAddressControl = null;
+      Object[] addressResources = 
server.getManagementService().getResources(AddressControl.class);
+
+      for (Object addressResource : addressResources) {
+         if (((AddressControl) addressResource).getAddress().equals(match)) {
+            advisoryAddressControl = (AddressControl) addressResource;
          }
       }
+      assertNotNull("addressControl for temp advisory", 
advisoryAddressControl);
+      return advisoryAddressControl;
    }
 
    @Test
    public void testTempQueueLeak() throws Exception {
-      Connection connection = null;
 
-      try {
-         connection = factory.createConnection();
+      try (Connection connection = factory.createConnection()) {
          connection.start();
 
          Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
@@ -90,23 +96,12 @@ public class AdvisoryOpenWireTest extends BasicOpenWireTest 
{
          TemporaryQueue temporaryQueue = session.createTemporaryQueue();
          temporaryQueue.delete();
 
-         Object[] queueResources = 
server.getManagementService().getResources(QueueControl.class);
-
-         for (Object queueResource : queueResources) {
+         AddressControl advisoryAddress = 
assertNonNullAddressControl("ActiveMQ.Advisory.TempQueue");
+         Wait.waitFor(() -> advisoryAddress.getMessageCount() == 0);
 
-            if (((QueueControl) 
queueResource).getAddress().equals("ActiveMQ.Advisory.TempQueue")) {
-               QueueControl queueControl = (QueueControl) queueResource;
-               Wait.waitFor(() -> queueControl.getMessageCount() == 0);
-               assertNotNull("addressControl for temp advisory", queueControl);
-               Wait.assertEquals(0, queueControl::getMessageCount);
-               Wait.assertEquals(2, queueControl::getMessagesAdded);
+         Wait.assertEquals(0, advisoryAddress::getMessageCount);
+         Wait.assertEquals(2, advisoryAddress::getRoutedMessageCount);
 
-            }
-         }
-      } finally {
-         if (connection != null) {
-            connection.close();
-         }
       }
    }
 
@@ -127,20 +122,45 @@ public class AdvisoryOpenWireTest extends 
BasicOpenWireTest {
             temporaryQueue.delete();
          }
 
-         Object[] addressResources = 
server.getManagementService().getResources(AddressControl.class);
+         AddressControl advisoryAddress = 
assertNonNullAddressControl("ActiveMQ.Advisory.TempQueue");
 
-         for (Object addressResource : addressResources) {
+         Wait.waitFor(() -> advisoryAddress.getMessageCount() == 0);
+         Wait.assertEquals(0, advisoryAddress::getMessageCount);
 
-            if (((AddressControl) 
addressResource).getAddress().equals("ActiveMQ.Advisory.TempQueue")) {
-               AddressControl addressControl = (AddressControl) 
addressResource;
-               Wait.waitFor(() -> addressControl.getMessageCount() == 0);
-               assertNotNull("addressControl for temp advisory", 
addressControl);
-               Wait.assertEquals(0, addressControl::getMessageCount);
+      } finally {
+         for (Connection conn : connections) {
+            if (conn != null) {
+               conn.close();
             }
          }
+      }
+   }
+
+   @Test
+   public void testLongLivedConnectionGetsAllPastPrefetch() throws Exception {
+      final Connection[] connections = new Connection[2];
+
+      final int numTempDestinations = 600;  // such that 2x exceeds default 1k 
prefetch for advisory consumer
+      try {
+         for (int i = 0; i < connections.length; i++) {
+            connections[i] = factory.createConnection();
+            connections[i].start();
+         }
+
+         Session session = connections[0].createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+
+         for (int i = 0; i < numTempDestinations; i++) {
+            TemporaryQueue temporaryQueue = session.createTemporaryQueue();
+            temporaryQueue.delete();
+         }
+
+         AddressControl advisoryAddress = 
assertNonNullAddressControl("ActiveMQ.Advisory.TempQueue");
+         Wait.waitFor(() -> advisoryAddress.getMessageCount() == 0);
+         Wait.assertEquals(0, advisoryAddress::getMessageCount);
 
+         // there is an advisory for create and another for delete
+         assertEquals("all routed", numTempDestinations * 2, 
advisoryAddress.getRoutedMessageCount());
 
-         //sleep a bit to allow message count to go down.
       } finally {
          for (Connection conn : connections) {
             if (conn != null) {

Reply via email to