This is an automated email from the ASF dual-hosted git repository. clebertsuconic pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push: new 2583ac6d79 ARTEMIS-5523 Fixing race between on estimating AMQP Message Memory Size 2583ac6d79 is described below commit 2583ac6d79b325e8fb2ebc21a9358c054206ae11 Author: Clebert Suconic <clebertsuco...@apache.org> AuthorDate: Thu Jun 5 14:40:18 2025 -0400 ARTEMIS-5523 Fixing race between on estimating AMQP Message Memory Size --- .../apache/activemq/artemis/api/core/Message.java | 3 + .../protocol/amqp/broker/AMQPLargeMessage.java | 2 +- .../artemis/protocol/amqp/broker/AMQPMessage.java | 13 +- .../protocol/amqp/broker/AMQPStandardMessage.java | 2 +- .../artemis/core/server/impl/QueueImpl.java | 1 + .../openwire/amq/ValidateAddressSizeTest.java | 212 +++++++++++++++++++++ 6 files changed, 229 insertions(+), 4 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java index 7bf8fbfbd7..0c87c08d9d 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java @@ -801,6 +801,9 @@ public interface Message { int durableDown(); + default void routed() { + } + /** * {@return Returns the message in Map form, useful when encoding to JSON} */ diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java index c43fe56bc6..7aca908ad0 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java @@ -601,7 +601,7 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage @Override - public int getMemoryEstimate() { + public synchronized int getMemoryEstimate() { if (memoryEstimate == -1) { memoryEstimate = memoryOffset * 2 + (extraProperties != null ? extraProperties.getEncodeSize() : 0); originalEstimate = memoryEstimate; diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java index dc46ae80ba..e07c38e60b 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java @@ -211,6 +211,12 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache. protected byte priority = DEFAULT_MESSAGE_PRIORITY; protected boolean isPaged; + protected volatile boolean routed = false; + + @Override + public void routed() { + this.routed = true; + } // The Proton based AMQP message section that are retained in memory, these are the // mutable portions of the Message as the broker sees it, although AMQP defines that @@ -537,7 +543,8 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache. return lazyDecodeApplicationProperties(getData().duplicate().position(0)); } - protected ApplicationProperties lazyDecodeApplicationProperties(ReadableBuffer data) { + // need to synchronize access to lazyDecodeApplicationProperties to avoid clashes with getMemoryEstimate + protected synchronized ApplicationProperties lazyDecodeApplicationProperties(ReadableBuffer data) { if (applicationProperties == null && applicationPropertiesPosition != VALUE_NOT_PRESENT) { applicationProperties = scanForMessageSection(data, applicationPropertiesPosition, ApplicationProperties.class); if (owner != null && memoryEstimate != -1) { @@ -546,7 +553,9 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache. // it is difficult to track the updates for paged messages // for that reason we won't do it if paged - if (!isPaged) { + // we also only do the update if the message was previously routed + // so if a debug method or an interceptor changed the size before routing we would get a different size + if (!isPaged && routed) { ((PagingStore) owner).addSize(addition, false); final int updatedEstimate = memoryEstimate + addition; memoryEstimate = updatedEstimate; diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPStandardMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPStandardMessage.java index c5f2e14fe5..c514009bce 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPStandardMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPStandardMessage.java @@ -190,7 +190,7 @@ public class AMQPStandardMessage extends AMQPMessage { } @Override - public int getMemoryEstimate() { + public synchronized int getMemoryEstimate() { if (memoryEstimate == -1) { if (isPaged) { // When the message is paged, we don't take the unmarshalled application properties because it could be diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 2f077a0ac0..8acf1f755d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -744,6 +744,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { if (count == 1) { if (owner != null) { owner.addSize(messageReference.getMessageMemoryEstimate(), false); + messageReference.getMessage().routed(); } } if (pagingStore != null) { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/ValidateAddressSizeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/ValidateAddressSizeTest.java new file mode 100644 index 0000000000..6e1b096f00 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/ValidateAddressSizeTest.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.openwire.amq; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; + +import java.lang.invoke.MethodHandles; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.activemq.ActiveMQSession; +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.TransportConfiguration; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.paging.impl.PagingStoreImpl; +import org.apache.activemq.artemis.core.server.RoutingContext; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerMessagePlugin; +import org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest; +import org.apache.activemq.artemis.tests.util.CFUtil; +import org.apache.activemq.artemis.tests.util.Wait; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class ValidateAddressSizeTest extends BasicOpenWireTest { + + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + @BeforeEach + @Override + public void setUp() throws Exception { + // making data persistent makes it easier to debug it with print-data + this.realStore = true; + super.setUp(); + } + + + @Override + protected void extraServerConfig(Configuration serverConfig) { + Set<TransportConfiguration> acceptors = serverConfig.getAcceptorConfigurations(); + for (TransportConfiguration tc : acceptors) { + if (tc.getName().equals("netty")) { + tc.getExtraParams().put("virtualTopicConsumerWildcards", "Consumer.*.>;2,C.*.>;2;selectorAware=true"); + } + } + } + + @Test + public void testValidateSizeAfterConsumption() throws Exception { + internalTest(false); + } + + @Test + public void testValidateSizeChangeMessageEstimate() throws Exception { + internalTest(true); + } + + private void internalTest(boolean changeWithPlugin) throws Exception { + ConnectionFactory amqpCF = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:61616"); + ConnectionFactory owCF = CFUtil.createConnectionFactory("OPENWIRE", "tcp://localhost:61616"); + + String topicName = "topicTest"; + + server.addAddressInfo(new AddressInfo(topicName).addRoutingType(RoutingType.MULTICAST)); + + int messageBodySize = 100; + int largeMessageSize = 101 * 1024; + String messageBody = "a".repeat(messageBodySize); + String largeMessageBody = "a".repeat(largeMessageSize); + + int consumers = 10; + int numberOfMessages = 10; + int numberOfLargeMessages = 5; + + ExecutorService executorService = Executors.newFixedThreadPool(consumers); + runAfter(executorService::shutdownNow); + + CyclicBarrier startFlag = new CyclicBarrier(consumers + 1); + + String endbody = "EndNow"; + + CountDownLatch done = new CountDownLatch(consumers); + + AtomicInteger errors = new AtomicInteger(0); + + AtomicBoolean running = new AtomicBoolean(true); + runAfter(() -> running.set(false)); + + for (int i = 1; i <= consumers; i++) { + final int consumerID = i; + executorService.execute(() -> { + try (Connection owConnection = owCF.createConnection()) { + owConnection.setClientID("ow" + consumerID); + Session owSession = owConnection.createSession(false, ActiveMQSession.CLIENT_ACKNOWLEDGE); + owConnection.start(); + MessageConsumer consumer; + Topic topic = owSession.createTopic(topicName); + consumer = owSession.createDurableSubscriber(topic, "cons_" + consumerID); + startFlag.await(10, TimeUnit.SECONDS); + while (running.get()) { + TextMessage message = (TextMessage) consumer.receive(1000); + if (message != null) { + message.acknowledge(); + if (message.getText().equals(endbody)) { + break; + } + } + } + } catch (Throwable e) { + logger.warn(e.getMessage(), e); + errors.incrementAndGet(); + } finally { + done.countDown(); + } + }); + } + + // aligning everybody after consumers are created + startFlag.await(10, TimeUnit.SECONDS); + + PagingStoreImpl store = (PagingStoreImpl) server.getPagingManager().getPageStore(SimpleString.of(topicName)); + assertNotNull(store); + + if (changeWithPlugin) { + server.getBrokerMessagePlugins().add(new ActiveMQServerMessagePlugin() { + @Override + public void beforeMessageRoute(Message message, + RoutingContext context, + boolean direct, + boolean rejectDuplicates) throws ActiveMQException { + // this is introducing a race that could happen + message.getMemoryEstimate(); + message.setOwner(store); + // to force lazy decode + message.getPropertyNames(); + // it's meant to force a toString, what would happen on a debug message + logger.debug("message {}", String.valueOf(message)); + } + }); + } + + try (Connection amqpConnection = amqpCF.createConnection()) { + Session amqpSession = amqpConnection.createSession(true, Session.SESSION_TRANSACTED); + MessageProducer producer = amqpSession.createProducer(amqpSession.createTopic(topicName)); + for (int i = 0; i < numberOfMessages + numberOfLargeMessages; i++) { + TextMessage message; + if (i < numberOfMessages) { + logger.info("message {}", i); + message = amqpSession.createTextMessage(messageBody); + } else { + logger.info("largemessage {}", i); + message = amqpSession.createTextMessage(largeMessageBody); + } + message.setIntProperty("i", i); + message.setStringProperty("myvalue", "2"); + producer.send(message); + } + TextMessage endMessage = amqpSession.createTextMessage(endbody); + endMessage.setStringProperty("end", "theEnd"); + endMessage.setStringProperty("myvalue", "2"); + producer.send(endMessage); + amqpSession.commit(); + } + + assertTrue(done.await(1, TimeUnit.MINUTES)); + assertEquals(0, errors.get()); + running.set(false); + + try { + Wait.assertEquals(0L, store::getAddressSize, 5000, 100); + } catch (Throwable e) { + logger.warn("error -> {}", e.getMessage(), e); + throw e; + } + + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@activemq.apache.org For additional commands, e-mail: commits-h...@activemq.apache.org For further information, visit: https://activemq.apache.org/contact