This is an automated email from the ASF dual-hosted git repository.
gtully pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new a56ade3 ARTEMIS-3135 - track possible change in memory estimate when
messages are converted to maps for JMX or UI display, follows up from
ARTEMIS-3067
a56ade3 is described below
commit a56ade38b4e6cad18418ecb78c9d4f92fe06df4a
Author: gtully <[email protected]>
AuthorDate: Mon Feb 22 22:08:23 2021 +0000
ARTEMIS-3135 - track possible change in memory estimate when messages are
converted to maps for JMX or UI display, follows up from ARTEMIS-3067
---
.../core/management/impl/QueueControlImpl.java | 7 +++
.../core/server/impl/MessageReferenceImpl.java | 10 +++++
.../artemis/core/server/impl/QueueImpl.java | 14 +-----
.../integration/amqp/AmqpScheduledMessageTest.java | 19 ++++++++
.../tests/integration/amqp/JMXManagementTest.java | 52 ++++++++++++++++++++++
5 files changed, 90 insertions(+), 12 deletions(-)
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
index 5fb5a7d..cec65b2 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
@@ -51,6 +51,7 @@ import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;
+import org.apache.activemq.artemis.core.server.impl.MessageReferenceImpl;
import org.apache.activemq.artemis.core.server.impl.RefsOperation;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
@@ -792,7 +793,9 @@ public class QueueControlImpl extends AbstractControl
implements QueueControl {
int i = 0;
for (MessageReference ref : refs) {
Message message = ref.getMessage();
+ final int currentMemoryEstimate = message.getMemoryEstimate();
messages[i++] = message.toMap();
+ MessageReferenceImpl.accountForChangeInMemoryEstimate(ref,
currentMemoryEstimate);
}
return messages;
}
@@ -853,7 +856,9 @@ public class QueueControlImpl extends AbstractControl
implements QueueControl {
MessageReference ref = iterator.next();
if (filter == null || filter.match(ref.getMessage())) {
Message message = ref.getMessage();
+ final int currentMemoryEstimate =
message.getMemoryEstimate();
messages.add(message.toMap());
+
MessageReferenceImpl.accountForChangeInMemoryEstimate(ref,
currentMemoryEstimate);
}
}
} catch (NoSuchElementException ignored) {
@@ -898,7 +903,9 @@ public class QueueControlImpl extends AbstractControl
implements QueueControl {
if (iterator.hasNext()) {
MessageReference ref = iterator.next();
Message message = ref.getMessage();
+ final int currentMemoryEstimate = message.getMemoryEstimate();
messages.add(message.toMap());
+ MessageReferenceImpl.accountForChangeInMemoryEstimate(ref,
currentMemoryEstimate);
}
return messages.toArray(new Map[1]);
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
index a7903fd..dfbeb87 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
@@ -179,6 +179,16 @@ public class MessageReferenceImpl extends
LinkedListImpl.Node<MessageReferenceIm
return MessageReferenceImpl.memoryOffset;
}
+ public static void accountForChangeInMemoryEstimate(final MessageReference
ref, final int existingMemoryEstimate) {
+ final int delta = ref.getMessageMemoryEstimate() -
existingMemoryEstimate;
+ if (delta > 0) {
+ PagingStore pageStore = ref.getOwner();
+ if (pageStore != null) {
+ pageStore.addSize(delta);
+ }
+ }
+ }
+
@Override
public int getDeliveryCount() {
return DELIVERY_COUNT_UPDATER.get(this);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 809c00c..61f1475 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -3072,7 +3072,7 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
}
if (existingMemoryEstimate > 0 ) {
- accountForChangeInMemoryEstimate(ref, existingMemoryEstimate);
+ MessageReferenceImpl.accountForChangeInMemoryEstimate(ref,
existingMemoryEstimate);
}
}
@@ -3702,7 +3702,7 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
HandleStatus status = handle(ref, consumer);
- accountForChangeInMemoryEstimate(ref, existingMemoryEstimate);
+ MessageReferenceImpl.accountForChangeInMemoryEstimate(ref,
existingMemoryEstimate);
if (status == HandleStatus.HANDLED) {
final MessageReference reference;
@@ -3733,16 +3733,6 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
}
}
- private static void accountForChangeInMemoryEstimate(final MessageReference
ref, final int existingMemoryEstimate) {
- final int delta = ref.getMessageMemoryEstimate() -
existingMemoryEstimate;
- if (delta > 0) {
- PagingStore pageStore = ref.getOwner();
- if (pageStore != null) {
- pageStore.addSize(delta);
- }
- }
- }
-
private Consumer getGroupConsumer(SimpleString groupID) {
Consumer groupConsumer = null;
if (exclusive) {
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpScheduledMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpScheduledMessageTest.java
index 6459e76..e488a21 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpScheduledMessageTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpScheduledMessageTest.java
@@ -18,7 +18,13 @@ package org.apache.activemq.artemis.tests.integration.amqp;
import java.util.concurrent.TimeUnit;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.management.QueueControl;
+import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.server.Queue;
+import
org.apache.activemq.artemis.tests.integration.management.ManagementControlHelper;
+import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
@@ -79,15 +85,26 @@ public class AmqpScheduledMessageTest extends
AmqpClientTestSupport {
final Queue queueView = getProxyToQueue(getQueueName());
assertNotNull(queueView);
+ final SimpleString queueNameSS =
SimpleString.toSimpleString(getQueueName());
+ PagingStore targetPagingStore =
server.getPagingManager().getPageStore(queueNameSS);
+ assertNotNull(targetPagingStore);
+
+ QueueControl queueControl =
ManagementControlHelper.createQueueControl(queueNameSS, queueNameSS,
RoutingType.ANYCAST, this.mBeanServer);
+ assertNotNull(queueControl);
+
AmqpMessage message = new AmqpMessage();
long deliveryTime = System.currentTimeMillis() + 6000;
message.setMessageAnnotation("x-opt-delivery-time", deliveryTime);
message.setText("Test-Message");
+ message.setApplicationProperty("OneOfThose", "Please");
sender.send(message);
sender.close();
assertEquals(1, queueView.getScheduledCount());
+ assertTrue(targetPagingStore.getAddressSize() > 0);
+ assertEquals(1, queueControl.listScheduledMessages().length);
+
AmqpReceiver receiver = session.createReceiver(getQueueName());
receiver.flow(1);
@@ -99,6 +116,8 @@ public class AmqpScheduledMessageTest extends
AmqpClientTestSupport {
received = receiver.receive(10, TimeUnit.SECONDS);
assertNotNull(received);
received.accept();
+
+ Wait.assertEquals(0L, targetPagingStore::getAddressSize);
} finally {
connection.close();
}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMXManagementTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMXManagementTest.java
index 0a63bde..fd6a781 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMXManagementTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMXManagementTest.java
@@ -16,10 +16,13 @@
*/
package org.apache.activemq.artemis.tests.integration.amqp;
+import org.apache.activemq.artemis.api.core.JsonUtil;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.QueueControl;
+import org.apache.activemq.artemis.core.paging.PagingStore;
import
org.apache.activemq.artemis.tests.integration.management.ManagementControlHelper;
+import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
@@ -34,6 +37,8 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
+import javax.json.JsonArray;
+import javax.json.JsonObject;
import java.util.Map;
public class JMXManagementTest extends JMSClientTestSupport {
@@ -116,6 +121,53 @@ public class JMXManagementTest extends
JMSClientTestSupport {
}
}
+ @Test
+ public void testAddressSizeOnDelete() throws Exception {
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+
+ try {
+ AmqpSession session = connection.createSession();
+ AmqpSender sender = session.createSender(getQueueName());
+
+ session.begin();
+ AmqpMessage message = new AmqpMessage();
+ message.setApplicationProperty("TEST_STRING", "TEST");
+ message.setTimeToLive(100);
+ message.setText("TEST");
+ // send 2 so we can verify getFirstMessage and List
+ sender.send(message);
+ sender.send(message);
+ session.commit();
+
+ PagingStore targetPagingStore =
server.getPagingManager().getPageStore(SimpleString.toSimpleString(getQueueName()));
+ assertNotNull(targetPagingStore);
+
+ assertTrue(targetPagingStore.getAddressSize() > 0);
+
+ SimpleString queue = new SimpleString(getQueueName());
+ QueueControl queueControl = createManagementControl(queue, queue);
+
+ Assert.assertEquals(2, queueControl.getMessageCount());
+
+ JsonArray array =
JsonUtil.readJsonArray(queueControl.getFirstMessageAsJSON());
+ JsonObject object = (JsonObject) array.get(0);
+
queueControl.removeMessage(object.getJsonNumber("messageID").longValue());
+
+ Wait.assertEquals(1L, queueControl::getMessageCount);
+
+ Map<String, Object>[] messages = queueControl.listMessages("");
+ Assert.assertEquals(1, messages.length);
+ queueControl.removeMessage((Long) messages[0].get("messageID"));
+
+ Assert.assertEquals(0, queueControl.getMessageCount());
+ Wait.assertEquals(0L, targetPagingStore::getAddressSize);
+
+ } finally {
+ connection.close();
+ }
+ }
+
protected QueueControl createManagementControl(final SimpleString address,
final SimpleString queue)
throws Exception {
QueueControl queueControl =
ManagementControlHelper.createQueueControl(address, queue, RoutingType.ANYCAST,
this.mBeanServer);