This is an automated email from the ASF dual-hosted git repository.

gtully pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new a56ade3  ARTEMIS-3135 - track possible change in memory estimate when 
messages are converted to maps for JMX or UI display, follows up from 
ARTEMIS-3067
a56ade3 is described below

commit a56ade38b4e6cad18418ecb78c9d4f92fe06df4a
Author: gtully <[email protected]>
AuthorDate: Mon Feb 22 22:08:23 2021 +0000

    ARTEMIS-3135 - track possible change in memory estimate when messages are 
converted to maps for JMX or UI display, follows up from ARTEMIS-3067
---
 .../core/management/impl/QueueControlImpl.java     |  7 +++
 .../core/server/impl/MessageReferenceImpl.java     | 10 +++++
 .../artemis/core/server/impl/QueueImpl.java        | 14 +-----
 .../integration/amqp/AmqpScheduledMessageTest.java | 19 ++++++++
 .../tests/integration/amqp/JMXManagementTest.java  | 52 ++++++++++++++++++++++
 5 files changed, 90 insertions(+), 12 deletions(-)

diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
index 5fb5a7d..cec65b2 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
@@ -51,6 +51,7 @@ import org.apache.activemq.artemis.core.server.Consumer;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
+import org.apache.activemq.artemis.core.server.impl.MessageReferenceImpl;
 import org.apache.activemq.artemis.core.server.impl.RefsOperation;
 import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
@@ -792,7 +793,9 @@ public class QueueControlImpl extends AbstractControl 
implements QueueControl {
       int i = 0;
       for (MessageReference ref : refs) {
          Message message = ref.getMessage();
+         final int currentMemoryEstimate = message.getMemoryEstimate();
          messages[i++] = message.toMap();
+         MessageReferenceImpl.accountForChangeInMemoryEstimate(ref, 
currentMemoryEstimate);
       }
       return messages;
    }
@@ -853,7 +856,9 @@ public class QueueControlImpl extends AbstractControl 
implements QueueControl {
                   MessageReference ref = iterator.next();
                   if (filter == null || filter.match(ref.getMessage())) {
                      Message message = ref.getMessage();
+                     final int currentMemoryEstimate = 
message.getMemoryEstimate();
                      messages.add(message.toMap());
+                     
MessageReferenceImpl.accountForChangeInMemoryEstimate(ref, 
currentMemoryEstimate);
                   }
                }
             } catch (NoSuchElementException ignored) {
@@ -898,7 +903,9 @@ public class QueueControlImpl extends AbstractControl 
implements QueueControl {
             if (iterator.hasNext()) {
                MessageReference ref = iterator.next();
                Message message = ref.getMessage();
+               final int currentMemoryEstimate = message.getMemoryEstimate();
                messages.add(message.toMap());
+               MessageReferenceImpl.accountForChangeInMemoryEstimate(ref, 
currentMemoryEstimate);
             }
             return messages.toArray(new Map[1]);
          }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
index a7903fd..dfbeb87 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
@@ -179,6 +179,16 @@ public class MessageReferenceImpl extends 
LinkedListImpl.Node<MessageReferenceIm
       return MessageReferenceImpl.memoryOffset;
    }
 
+   public static void accountForChangeInMemoryEstimate(final MessageReference 
ref, final int existingMemoryEstimate) {
+      final int delta = ref.getMessageMemoryEstimate() - 
existingMemoryEstimate;
+      if (delta > 0) {
+         PagingStore pageStore = ref.getOwner();
+         if (pageStore != null) {
+            pageStore.addSize(delta);
+         }
+      }
+   }
+
    @Override
    public int getDeliveryCount() {
       return DELIVERY_COUNT_UPDATER.get(this);
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 809c00c..61f1475 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -3072,7 +3072,7 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
          }
 
          if (existingMemoryEstimate > 0 ) {
-            accountForChangeInMemoryEstimate(ref, existingMemoryEstimate);
+            MessageReferenceImpl.accountForChangeInMemoryEstimate(ref, 
existingMemoryEstimate);
          }
       }
 
@@ -3702,7 +3702,7 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
 
             HandleStatus status = handle(ref, consumer);
 
-            accountForChangeInMemoryEstimate(ref, existingMemoryEstimate);
+            MessageReferenceImpl.accountForChangeInMemoryEstimate(ref, 
existingMemoryEstimate);
 
             if (status == HandleStatus.HANDLED) {
                final MessageReference reference;
@@ -3733,16 +3733,6 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
       }
    }
 
-   private static void accountForChangeInMemoryEstimate(final MessageReference 
ref, final int existingMemoryEstimate) {
-      final int delta = ref.getMessageMemoryEstimate() - 
existingMemoryEstimate;
-      if (delta > 0) {
-         PagingStore pageStore = ref.getOwner();
-         if (pageStore != null) {
-            pageStore.addSize(delta);
-         }
-      }
-   }
-
    private Consumer getGroupConsumer(SimpleString groupID) {
       Consumer groupConsumer = null;
       if (exclusive) {
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpScheduledMessageTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpScheduledMessageTest.java
index 6459e76..e488a21 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpScheduledMessageTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpScheduledMessageTest.java
@@ -18,7 +18,13 @@ package org.apache.activemq.artemis.tests.integration.amqp;
 
 import java.util.concurrent.TimeUnit;
 
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.management.QueueControl;
+import org.apache.activemq.artemis.core.paging.PagingStore;
 import org.apache.activemq.artemis.core.server.Queue;
+import 
org.apache.activemq.artemis.tests.integration.management.ManagementControlHelper;
+import org.apache.activemq.artemis.tests.util.Wait;
 import org.apache.activemq.transport.amqp.client.AmqpClient;
 import org.apache.activemq.transport.amqp.client.AmqpConnection;
 import org.apache.activemq.transport.amqp.client.AmqpMessage;
@@ -79,15 +85,26 @@ public class AmqpScheduledMessageTest extends 
AmqpClientTestSupport {
          final Queue queueView = getProxyToQueue(getQueueName());
          assertNotNull(queueView);
 
+         final SimpleString queueNameSS = 
SimpleString.toSimpleString(getQueueName());
+         PagingStore targetPagingStore = 
server.getPagingManager().getPageStore(queueNameSS);
+         assertNotNull(targetPagingStore);
+
+         QueueControl queueControl = 
ManagementControlHelper.createQueueControl(queueNameSS, queueNameSS, 
RoutingType.ANYCAST, this.mBeanServer);
+         assertNotNull(queueControl);
+
          AmqpMessage message = new AmqpMessage();
          long deliveryTime = System.currentTimeMillis() + 6000;
          message.setMessageAnnotation("x-opt-delivery-time", deliveryTime);
          message.setText("Test-Message");
+         message.setApplicationProperty("OneOfThose", "Please");
          sender.send(message);
          sender.close();
 
          assertEquals(1, queueView.getScheduledCount());
 
+         assertTrue(targetPagingStore.getAddressSize() > 0);
+         assertEquals(1, queueControl.listScheduledMessages().length);
+
          AmqpReceiver receiver = session.createReceiver(getQueueName());
          receiver.flow(1);
 
@@ -99,6 +116,8 @@ public class AmqpScheduledMessageTest extends 
AmqpClientTestSupport {
          received = receiver.receive(10, TimeUnit.SECONDS);
          assertNotNull(received);
          received.accept();
+
+         Wait.assertEquals(0L, targetPagingStore::getAddressSize);
       } finally {
          connection.close();
       }
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMXManagementTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMXManagementTest.java
index 0a63bde..fd6a781 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMXManagementTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMXManagementTest.java
@@ -16,10 +16,13 @@
  */
 package org.apache.activemq.artemis.tests.integration.amqp;
 
+import org.apache.activemq.artemis.api.core.JsonUtil;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.management.QueueControl;
+import org.apache.activemq.artemis.core.paging.PagingStore;
 import 
org.apache.activemq.artemis.tests.integration.management.ManagementControlHelper;
+import org.apache.activemq.artemis.tests.util.Wait;
 import org.apache.activemq.transport.amqp.client.AmqpClient;
 import org.apache.activemq.transport.amqp.client.AmqpConnection;
 import org.apache.activemq.transport.amqp.client.AmqpMessage;
@@ -34,6 +37,8 @@ import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
 import javax.jms.TextMessage;
+import javax.json.JsonArray;
+import javax.json.JsonObject;
 import java.util.Map;
 
 public class JMXManagementTest extends JMSClientTestSupport {
@@ -116,6 +121,53 @@ public class JMXManagementTest extends 
JMSClientTestSupport {
       }
    }
 
+   @Test
+   public void testAddressSizeOnDelete() throws Exception {
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+
+      try {
+         AmqpSession session = connection.createSession();
+         AmqpSender sender = session.createSender(getQueueName());
+
+         session.begin();
+         AmqpMessage message = new AmqpMessage();
+         message.setApplicationProperty("TEST_STRING", "TEST");
+         message.setTimeToLive(100);
+         message.setText("TEST");
+         // send 2 so we can verify getFirstMessage and List
+         sender.send(message);
+         sender.send(message);
+         session.commit();
+
+         PagingStore targetPagingStore = 
server.getPagingManager().getPageStore(SimpleString.toSimpleString(getQueueName()));
+         assertNotNull(targetPagingStore);
+
+         assertTrue(targetPagingStore.getAddressSize() > 0);
+
+         SimpleString queue = new SimpleString(getQueueName());
+         QueueControl queueControl = createManagementControl(queue, queue);
+
+         Assert.assertEquals(2, queueControl.getMessageCount());
+
+         JsonArray array = 
JsonUtil.readJsonArray(queueControl.getFirstMessageAsJSON());
+         JsonObject object = (JsonObject) array.get(0);
+         
queueControl.removeMessage(object.getJsonNumber("messageID").longValue());
+
+         Wait.assertEquals(1L, queueControl::getMessageCount);
+
+         Map<String, Object>[] messages = queueControl.listMessages("");
+         Assert.assertEquals(1, messages.length);
+         queueControl.removeMessage((Long) messages[0].get("messageID"));
+
+         Assert.assertEquals(0, queueControl.getMessageCount());
+         Wait.assertEquals(0L, targetPagingStore::getAddressSize);
+
+      } finally {
+         connection.close();
+      }
+   }
+
    protected QueueControl createManagementControl(final SimpleString address,
                                                   final SimpleString queue) 
throws Exception {
       QueueControl queueControl = 
ManagementControlHelper.createQueueControl(address, queue, RoutingType.ANYCAST, 
this.mBeanServer);

Reply via email to