This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new fb2a57f3ed ARTEMIS-4941 Remove lazy update after application 
properties as it's no longer needed
fb2a57f3ed is described below

commit fb2a57f3ed25895681d3636e3f5fb9d9b9d0a053
Author: Clebert Suconic <[email protected]>
AuthorDate: Mon Jul 22 12:25:03 2024 -0400

    ARTEMIS-4941 Remove lazy update after application properties as it's no 
longer needed
---
 .../apache/activemq/artemis/api/core/Message.java  |  6 --
 .../protocol/amqp/broker/AMQPLargeMessage.java     |  1 -
 .../artemis/protocol/amqp/broker/AMQPMessage.java  | 20 ------
 .../protocol/amqp/broker/AMQPStandardMessage.java  |  1 -
 .../core/paging/cursor/PagedReferenceImpl.java     | 14 ++--
 .../artemis/core/server/impl/QueueImpl.java        |  4 +-
 .../integration/amqp/paging/AmqpPagingTest.java    | 83 ----------------------
 .../jms/multiprotocol/JMSMessageConsumerTest.java  | 39 ++++++++++
 8 files changed, 46 insertions(+), 122 deletions(-)

diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
index 2c6beaefda..88abfa5531 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
@@ -809,12 +809,6 @@ public interface Message {
 
    int getMemoryEstimate();
 
-   /** The first estimate that's been calculated without any updates. */
-   default int getOriginalEstimate() {
-      // For Core Protocol we always use the same estimate
-      return getMemoryEstimate();
-   }
-
    /**
     * This is the size of the message when persisted on disk which is used for 
metrics tracking
     * Note that even if the message itself is not persisted on disk (ie 
non-durable) this value is
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java
index 777f3752ec..19621b1c59 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java
@@ -585,7 +585,6 @@ public class AMQPLargeMessage extends AMQPMessage 
implements LargeServerMessage
    public int getMemoryEstimate() {
       if (memoryEstimate == -1) {
          memoryEstimate = memoryOffset * 2 + (extraProperties != null ? 
extraProperties.getEncodeSize() : 0);
-         originalEstimate = memoryEstimate;
       }
       return memoryEstimate;
    }
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index 4c43401fac..86e6794d00 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -43,7 +43,6 @@ import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import 
org.apache.activemq.artemis.core.message.openmbean.CompositeDataConstants;
 import 
org.apache.activemq.artemis.core.message.openmbean.MessageOpenTypeFactory;
-import org.apache.activemq.artemis.core.paging.PagingStore;
 import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
 import org.apache.activemq.artemis.core.persistence.Persister;
 import org.apache.activemq.artemis.core.server.MessageReference;
@@ -204,7 +203,6 @@ public abstract class AMQPMessage extends RefCountMessage 
implements org.apache.
    protected long messageID;
    protected SimpleString address;
    protected volatile int memoryEstimate = -1;
-   protected volatile int originalEstimate = -1;
    protected long expiration;
    protected boolean expirationReload = false;
    protected long scheduledTime = -1;
@@ -545,13 +543,6 @@ public abstract class AMQPMessage extends RefCountMessage 
implements org.apache.
    protected ApplicationProperties 
lazyDecodeApplicationProperties(ReadableBuffer data) {
       if (applicationProperties == null && applicationPropertiesPosition != 
VALUE_NOT_PRESENT) {
          applicationProperties = scanForMessageSection(data, 
applicationPropertiesPosition, ApplicationProperties.class);
-         if (owner != null && memoryEstimate != -1) {
-            // the memory has already been tracked and needs to be updated to 
reflect the new decoding
-            int addition = 
unmarshalledApplicationPropertiesMemoryEstimateFromData(data);
-            ((PagingStore)owner).addSize(addition, false);
-            final int updatedEstimate = memoryEstimate + addition;
-            memoryEstimate = updatedEstimate;
-         }
       }
 
       return applicationProperties;
@@ -675,7 +666,6 @@ public abstract class AMQPMessage extends RefCountMessage 
implements org.apache.
       }
       encodedHeaderSize = 0;
       memoryEstimate = -1;
-      originalEstimate = -1;
       scheduledTime = -1;
       encodedDeliveryAnnotationsSize = 0;
       headerPosition = VALUE_NOT_PRESENT;
@@ -871,16 +861,6 @@ public abstract class AMQPMessage extends RefCountMessage 
implements org.apache.
    @Override
    public abstract int getMemoryEstimate();
 
-   @Override
-   public int getOriginalEstimate() {
-      if (originalEstimate < 0) {
-         // getMemoryEstimate should initialize originalEstimate
-         return getMemoryEstimate();
-      } else {
-         return originalEstimate;
-      }
-   }
-
    @Override
    public Map<String, Object> toPropertyMap(int valueSizeLimit) {
       return toPropertyMap(false, valueSizeLimit);
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPStandardMessage.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPStandardMessage.java
index 245a838ad7..178b96f884 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPStandardMessage.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPStandardMessage.java
@@ -189,7 +189,6 @@ public class AMQPStandardMessage extends AMQPMessage {
    public int getMemoryEstimate() {
       if (memoryEstimate == -1) {
          memoryEstimate = memoryOffset + (data != null ? data.capacity() + 
unmarshalledApplicationPropertiesMemoryEstimateFromData(data) : 0);
-         originalEstimate = memoryEstimate;
       }
 
       return memoryEstimate;
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
index 873485b961..74b5598e0a 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
@@ -47,8 +47,6 @@ public class PagedReferenceImpl extends 
AbstractProtocolReference implements Pag
 
    private int persistedCount;
 
-   private int messageEstimate = -1;
-
    // this is a cached position returned on getPosition.
    // just to avoid creating on object on each call
    PagePosition cachedPositionObject;
@@ -164,14 +162,12 @@ public class PagedReferenceImpl extends 
AbstractProtocolReference implements Pag
 
    @Override
    public int getMessageMemoryEstimate() {
-      if (messageEstimate <= 0) {
-         try {
-            messageEstimate = getMessage().getMemoryEstimate();
-         } catch (Throwable e) {
-            ActiveMQServerLogger.LOGGER.errorCalculateMessageMemoryEstimate(e);
-         }
+      try {
+         return getMessage().getMemoryEstimate();
+      } catch (Throwable e) {
+         ActiveMQServerLogger.LOGGER.errorCalculateMessageMemoryEstimate(e);
+         return 0;
       }
-      return messageEstimate;
    }
 
    @Override
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 249c7e44a9..f1751011ac 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -1052,7 +1052,7 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
             // If an AMQP message parses its properties, its size might be 
updated and the address will receive more bytes.
             // However, in this case, we should always use the original 
estimate.
             // Otherwise, we might get incorrect sizes after the update.
-            
pagingStore.addSize(messageReference.getMessage().getOriginalEstimate(), false, 
false);
+            
pagingStore.addSize(messageReference.getMessage().getMemoryEstimate(), false, 
false);
          }
 
          pagingStore.refUp(messageReference.getMessage(), count);
@@ -1071,7 +1071,7 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
             // If an AMQP message parses its properties, its size might be 
updated and the address will receive more bytes.
             // However, in this case, we should always use the original 
estimate.
             // Otherwise, we might get incorrect sizes after the update.
-            
pagingStore.addSize(-messageReference.getMessage().getOriginalEstimate(), 
false, false);
+            
pagingStore.addSize(-messageReference.getMessage().getMemoryEstimate(), false, 
false);
          }
          pagingStore.refDown(messageReference.getMessage(), count);
       }
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/paging/AmqpPagingTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/paging/AmqpPagingTest.java
index 9a41b04b62..d92ebf931a 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/paging/AmqpPagingTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/paging/AmqpPagingTest.java
@@ -121,87 +121,4 @@ public class AmqpPagingTest extends AmqpClientTestSupport {
       connection.close();
    }
 
-
-   @TestTemplate
-   @Timeout(60)
-   public void testSizeCalculationsForApplicationProperties() throws Exception 
{
-      final int MSG_SIZE = 1000;
-      final StringBuilder builder = new StringBuilder();
-      for (int i = 0; i < MSG_SIZE; i++) {
-         builder.append('0');
-      }
-      final String data = builder.toString();
-      final int MSG_COUNT = 1;
-
-      AmqpClient client = createAmqpClient();
-      AmqpConnection connection = addConnection(client.connect());
-      AmqpSession session = connection.createSession();
-
-      AmqpSender sender = session.createSender(getQueueName(), true);
-
-      // use selector expression that references a property to force decode of 
application properties
-      AmqpReceiver receiver = session.createReceiver(getQueueName(), "myData 
IS NOT NULL");
-      receiver.setPresettle(true);
-      receiver.flow(10);
-      assertNull(receiver.receiveNoWait(), "somehow the queue had messages 
from a previous test");
-      receiver.flow(0);
-
-      AmqpMessage message = new AmqpMessage();
-      message.setText(data);
-
-      // large message property also
-      message.setApplicationProperty("myData", data);
-
-      if (durable != null) {
-         message.setDurable(durable);
-      }
-      sender.send(message);
-
-      PagingStore pagingStore = 
server.getPagingManager().getPageStore(SimpleString.of(getQueueName()));
-
-      // verify page usage reflects data + 2*application properties (encoded 
and decoded)
-      assertTrue(Wait.waitFor(() -> {
-         return pagingStore.getAddressSize() > 3000;
-      }));
-
-      receiver.flow(MSG_COUNT);
-      AmqpMessage receive = receiver.receive(10, TimeUnit.MINUTES);
-      assertNotNull(receive, "Not received anything after receive");
-      receive.accept();
-
-      assertTrue(Wait.waitFor(() -> {
-         return pagingStore.getAddressSize() == 0;
-      }));
-
-      // send another with duplicate id property, to force early decode
-      message = new AmqpMessage();
-      message.setText(data);
-
-      // ensures application properties are referenced
-      message.setApplicationProperty("_AMQ_DUPL_ID", "1");
-
-      // large message property also
-      message.setApplicationProperty("myData", data);
-
-      if (durable != null) {
-         message.setDurable(durable);
-      }
-      sender.send(message);
-
-      sender.close();
-
-      // verify page usage reflects data + 2*application properties (encoded 
and decoded)
-      assertTrue(Wait.waitFor(() -> {
-         return pagingStore.getAddressSize() > 3000;
-      }));
-
-      receiver.flow(MSG_COUNT);
-      receive = receiver.receive(10, TimeUnit.MINUTES);
-      assertNotNull(receive, "Not received anything after receive");
-      receive.accept();
-
-      receiver.close();
-      connection.close();
-   }
-
 }
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSMessageConsumerTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSMessageConsumerTest.java
index 060f8862ee..bde0955955 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSMessageConsumerTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSMessageConsumerTest.java
@@ -35,9 +35,14 @@ import java.lang.invoke.MethodHandles;
 
 import org.apache.activemq.artemis.api.core.QueueConfiguration;
 import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.management.AddressControl;
+import org.apache.activemq.artemis.api.core.management.ResourceNames;
+import org.apache.activemq.artemis.core.paging.PagingStore;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.utils.DestinationUtil;
 import org.apache.activemq.artemis.utils.RandomUtil;
+import org.apache.activemq.artemis.utils.Wait;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 import org.slf4j.Logger;
@@ -398,4 +403,38 @@ public class JMSMessageConsumerTest extends 
MultiprotocolJMSClientTestSupport {
          consumerConnection.close();
       }
    }
+
+
+   @Test
+   public void testConvertedAndPaging() throws Exception {
+      final int MESSAGE_COUNT = 1;
+      
server.createQueue(QueueConfiguration.of(getQueueName()).setRoutingType(RoutingType.ANYCAST));
+      PagingStore store = 
server.getPagingManager().getPageStore(SimpleString.of(getQueueName()));
+      store.startPaging();
+      try (Connection senderConnection = createConnection(); Connection 
consumerConnection = createCoreConnection()) {
+         Session consumerSession = consumerConnection.createSession(true, 
Session.SESSION_TRANSACTED);
+         MessageConsumer consumer = 
consumerSession.createConsumer(consumerSession.createQueue(getQueueName()));
+
+         Session senderSession = senderConnection.createSession(true, 
Session.SESSION_TRANSACTED);
+         MessageProducer producer = 
senderSession.createProducer(senderSession.createQueue(getQueueName()));
+         for (int i = 0; i < MESSAGE_COUNT; i++) {
+            Message message = senderSession.createMessage();
+            message.setIntProperty("count", i); // test will also pass if this 
is removed
+            producer.send(message);
+         }
+         senderSession.commit();
+
+         for (int i = 0; i < MESSAGE_COUNT; i++) {
+            Message received = consumer.receive(1000);
+            assertNotNull(received);
+         }
+         consumerSession.commit();
+         consumer.close();
+
+         assertEquals(0, server.locateQueue(getQueueName()).getMessageCount());
+         Wait.assertEquals(0, store::getAddressSize, 5000);
+         assertEquals(0, ((AddressControl) 
server.getManagementService().getResource(ResourceNames.ADDRESS + 
getQueueName())).getAddressSize());
+      }
+   }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to