This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new fb2a57f3ed ARTEMIS-4941 Remove lazy update after application
properties as it's no longer needed
fb2a57f3ed is described below
commit fb2a57f3ed25895681d3636e3f5fb9d9b9d0a053
Author: Clebert Suconic <[email protected]>
AuthorDate: Mon Jul 22 12:25:03 2024 -0400
ARTEMIS-4941 Remove lazy update after application properties as it's no
longer needed
---
.../apache/activemq/artemis/api/core/Message.java | 6 --
.../protocol/amqp/broker/AMQPLargeMessage.java | 1 -
.../artemis/protocol/amqp/broker/AMQPMessage.java | 20 ------
.../protocol/amqp/broker/AMQPStandardMessage.java | 1 -
.../core/paging/cursor/PagedReferenceImpl.java | 14 ++--
.../artemis/core/server/impl/QueueImpl.java | 4 +-
.../integration/amqp/paging/AmqpPagingTest.java | 83 ----------------------
.../jms/multiprotocol/JMSMessageConsumerTest.java | 39 ++++++++++
8 files changed, 46 insertions(+), 122 deletions(-)
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
index 2c6beaefda..88abfa5531 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
@@ -809,12 +809,6 @@ public interface Message {
int getMemoryEstimate();
- /** The first estimate that's been calculated without any updates. */
- default int getOriginalEstimate() {
- // For Core Protocol we always use the same estimate
- return getMemoryEstimate();
- }
-
/**
* This is the size of the message when persisted on disk which is used for
metrics tracking
* Note that even if the message itself is not persisted on disk (ie
non-durable) this value is
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java
index 777f3752ec..19621b1c59 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java
@@ -585,7 +585,6 @@ public class AMQPLargeMessage extends AMQPMessage
implements LargeServerMessage
public int getMemoryEstimate() {
if (memoryEstimate == -1) {
memoryEstimate = memoryOffset * 2 + (extraProperties != null ?
extraProperties.getEncodeSize() : 0);
- originalEstimate = memoryEstimate;
}
return memoryEstimate;
}
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index 4c43401fac..86e6794d00 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -43,7 +43,6 @@ import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import
org.apache.activemq.artemis.core.message.openmbean.CompositeDataConstants;
import
org.apache.activemq.artemis.core.message.openmbean.MessageOpenTypeFactory;
-import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.server.MessageReference;
@@ -204,7 +203,6 @@ public abstract class AMQPMessage extends RefCountMessage
implements org.apache.
protected long messageID;
protected SimpleString address;
protected volatile int memoryEstimate = -1;
- protected volatile int originalEstimate = -1;
protected long expiration;
protected boolean expirationReload = false;
protected long scheduledTime = -1;
@@ -545,13 +543,6 @@ public abstract class AMQPMessage extends RefCountMessage
implements org.apache.
protected ApplicationProperties
lazyDecodeApplicationProperties(ReadableBuffer data) {
if (applicationProperties == null && applicationPropertiesPosition !=
VALUE_NOT_PRESENT) {
applicationProperties = scanForMessageSection(data,
applicationPropertiesPosition, ApplicationProperties.class);
- if (owner != null && memoryEstimate != -1) {
- // the memory has already been tracked and needs to be updated to
reflect the new decoding
- int addition =
unmarshalledApplicationPropertiesMemoryEstimateFromData(data);
- ((PagingStore)owner).addSize(addition, false);
- final int updatedEstimate = memoryEstimate + addition;
- memoryEstimate = updatedEstimate;
- }
}
return applicationProperties;
@@ -675,7 +666,6 @@ public abstract class AMQPMessage extends RefCountMessage
implements org.apache.
}
encodedHeaderSize = 0;
memoryEstimate = -1;
- originalEstimate = -1;
scheduledTime = -1;
encodedDeliveryAnnotationsSize = 0;
headerPosition = VALUE_NOT_PRESENT;
@@ -871,16 +861,6 @@ public abstract class AMQPMessage extends RefCountMessage
implements org.apache.
@Override
public abstract int getMemoryEstimate();
- @Override
- public int getOriginalEstimate() {
- if (originalEstimate < 0) {
- // getMemoryEstimate should initialize originalEstimate
- return getMemoryEstimate();
- } else {
- return originalEstimate;
- }
- }
-
@Override
public Map<String, Object> toPropertyMap(int valueSizeLimit) {
return toPropertyMap(false, valueSizeLimit);
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPStandardMessage.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPStandardMessage.java
index 245a838ad7..178b96f884 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPStandardMessage.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPStandardMessage.java
@@ -189,7 +189,6 @@ public class AMQPStandardMessage extends AMQPMessage {
public int getMemoryEstimate() {
if (memoryEstimate == -1) {
memoryEstimate = memoryOffset + (data != null ? data.capacity() +
unmarshalledApplicationPropertiesMemoryEstimateFromData(data) : 0);
- originalEstimate = memoryEstimate;
}
return memoryEstimate;
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
index 873485b961..74b5598e0a 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
@@ -47,8 +47,6 @@ public class PagedReferenceImpl extends
AbstractProtocolReference implements Pag
private int persistedCount;
- private int messageEstimate = -1;
-
// this is a cached position returned on getPosition.
// just to avoid creating on object on each call
PagePosition cachedPositionObject;
@@ -164,14 +162,12 @@ public class PagedReferenceImpl extends
AbstractProtocolReference implements Pag
@Override
public int getMessageMemoryEstimate() {
- if (messageEstimate <= 0) {
- try {
- messageEstimate = getMessage().getMemoryEstimate();
- } catch (Throwable e) {
- ActiveMQServerLogger.LOGGER.errorCalculateMessageMemoryEstimate(e);
- }
+ try {
+ return getMessage().getMemoryEstimate();
+ } catch (Throwable e) {
+ ActiveMQServerLogger.LOGGER.errorCalculateMessageMemoryEstimate(e);
+ return 0;
}
- return messageEstimate;
}
@Override
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 249c7e44a9..f1751011ac 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -1052,7 +1052,7 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
// If an AMQP message parses its properties, its size might be
updated and the address will receive more bytes.
// However, in this case, we should always use the original
estimate.
// Otherwise, we might get incorrect sizes after the update.
-
pagingStore.addSize(messageReference.getMessage().getOriginalEstimate(), false,
false);
+
pagingStore.addSize(messageReference.getMessage().getMemoryEstimate(), false,
false);
}
pagingStore.refUp(messageReference.getMessage(), count);
@@ -1071,7 +1071,7 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
// If an AMQP message parses its properties, its size might be
updated and the address will receive more bytes.
// However, in this case, we should always use the original
estimate.
// Otherwise, we might get incorrect sizes after the update.
-
pagingStore.addSize(-messageReference.getMessage().getOriginalEstimate(),
false, false);
+
pagingStore.addSize(-messageReference.getMessage().getMemoryEstimate(), false,
false);
}
pagingStore.refDown(messageReference.getMessage(), count);
}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/paging/AmqpPagingTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/paging/AmqpPagingTest.java
index 9a41b04b62..d92ebf931a 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/paging/AmqpPagingTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/paging/AmqpPagingTest.java
@@ -121,87 +121,4 @@ public class AmqpPagingTest extends AmqpClientTestSupport {
connection.close();
}
-
- @TestTemplate
- @Timeout(60)
- public void testSizeCalculationsForApplicationProperties() throws Exception
{
- final int MSG_SIZE = 1000;
- final StringBuilder builder = new StringBuilder();
- for (int i = 0; i < MSG_SIZE; i++) {
- builder.append('0');
- }
- final String data = builder.toString();
- final int MSG_COUNT = 1;
-
- AmqpClient client = createAmqpClient();
- AmqpConnection connection = addConnection(client.connect());
- AmqpSession session = connection.createSession();
-
- AmqpSender sender = session.createSender(getQueueName(), true);
-
- // use selector expression that references a property to force decode of
application properties
- AmqpReceiver receiver = session.createReceiver(getQueueName(), "myData
IS NOT NULL");
- receiver.setPresettle(true);
- receiver.flow(10);
- assertNull(receiver.receiveNoWait(), "somehow the queue had messages
from a previous test");
- receiver.flow(0);
-
- AmqpMessage message = new AmqpMessage();
- message.setText(data);
-
- // large message property also
- message.setApplicationProperty("myData", data);
-
- if (durable != null) {
- message.setDurable(durable);
- }
- sender.send(message);
-
- PagingStore pagingStore =
server.getPagingManager().getPageStore(SimpleString.of(getQueueName()));
-
- // verify page usage reflects data + 2*application properties (encoded
and decoded)
- assertTrue(Wait.waitFor(() -> {
- return pagingStore.getAddressSize() > 3000;
- }));
-
- receiver.flow(MSG_COUNT);
- AmqpMessage receive = receiver.receive(10, TimeUnit.MINUTES);
- assertNotNull(receive, "Not received anything after receive");
- receive.accept();
-
- assertTrue(Wait.waitFor(() -> {
- return pagingStore.getAddressSize() == 0;
- }));
-
- // send another with duplicate id property, to force early decode
- message = new AmqpMessage();
- message.setText(data);
-
- // ensures application properties are referenced
- message.setApplicationProperty("_AMQ_DUPL_ID", "1");
-
- // large message property also
- message.setApplicationProperty("myData", data);
-
- if (durable != null) {
- message.setDurable(durable);
- }
- sender.send(message);
-
- sender.close();
-
- // verify page usage reflects data + 2*application properties (encoded
and decoded)
- assertTrue(Wait.waitFor(() -> {
- return pagingStore.getAddressSize() > 3000;
- }));
-
- receiver.flow(MSG_COUNT);
- receive = receiver.receive(10, TimeUnit.MINUTES);
- assertNotNull(receive, "Not received anything after receive");
- receive.accept();
-
- receiver.close();
- connection.close();
- }
-
}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSMessageConsumerTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSMessageConsumerTest.java
index 060f8862ee..bde0955955 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSMessageConsumerTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSMessageConsumerTest.java
@@ -35,9 +35,14 @@ import java.lang.invoke.MethodHandles;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.management.AddressControl;
+import org.apache.activemq.artemis.api.core.management.ResourceNames;
+import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.utils.DestinationUtil;
import org.apache.activemq.artemis.utils.RandomUtil;
+import org.apache.activemq.artemis.utils.Wait;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
@@ -398,4 +403,38 @@ public class JMSMessageConsumerTest extends
MultiprotocolJMSClientTestSupport {
consumerConnection.close();
}
}
+
+
+ @Test
+ public void testConvertedAndPaging() throws Exception {
+ final int MESSAGE_COUNT = 1;
+
server.createQueue(QueueConfiguration.of(getQueueName()).setRoutingType(RoutingType.ANYCAST));
+ PagingStore store =
server.getPagingManager().getPageStore(SimpleString.of(getQueueName()));
+ store.startPaging();
+ try (Connection senderConnection = createConnection(); Connection
consumerConnection = createCoreConnection()) {
+ Session consumerSession = consumerConnection.createSession(true,
Session.SESSION_TRANSACTED);
+ MessageConsumer consumer =
consumerSession.createConsumer(consumerSession.createQueue(getQueueName()));
+
+ Session senderSession = senderConnection.createSession(true,
Session.SESSION_TRANSACTED);
+ MessageProducer producer =
senderSession.createProducer(senderSession.createQueue(getQueueName()));
+ for (int i = 0; i < MESSAGE_COUNT; i++) {
+ Message message = senderSession.createMessage();
+ message.setIntProperty("count", i); // test will also pass if this
is removed
+ producer.send(message);
+ }
+ senderSession.commit();
+
+ for (int i = 0; i < MESSAGE_COUNT; i++) {
+ Message received = consumer.receive(1000);
+ assertNotNull(received);
+ }
+ consumerSession.commit();
+ consumer.close();
+
+ assertEquals(0, server.locateQueue(getQueueName()).getMessageCount());
+ Wait.assertEquals(0, store::getAddressSize, 5000);
+ assertEquals(0, ((AddressControl)
server.getManagementService().getResource(ResourceNames.ADDRESS +
getQueueName())).getAddressSize());
+ }
+ }
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact