This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new ab3b40c6bd ARTEMIS-5452 Address counts extra messages on hierarchical
topic cases
ab3b40c6bd is described below
commit ab3b40c6bd0dd07f30ba0332c1bdf35707c0907c
Author: Clebert Suconic <[email protected]>
AuthorDate: Fri Apr 25 12:37:56 2025 -0400
ARTEMIS-5452 Address counts extra messages on hierarchical topic cases
---
.../artemis/api/core/QueueConfiguration.java | 7 ++
.../amqp/broker/ProtonProtocolManager.java | 4 +-
.../artemis/core/server/impl/QueueImpl.java | 22 +++----
.../amqp/connect/AMQPMirrorExpiryQueueTest.java | 3 +-
.../integration/amqp/connect/AMQPReplicaTest.java | 77 +++++++++++++++++++++-
.../integration/amqp/connect/AckManagerTest.java | 3 +-
6 files changed, 97 insertions(+), 19 deletions(-)
diff --git
a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/QueueConfiguration.java
b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/QueueConfiguration.java
index db768d7614..83250e1416 100644
---
a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/QueueConfiguration.java
+++
b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/QueueConfiguration.java
@@ -43,6 +43,9 @@ import org.apache.activemq.artemis.utils.JsonLoader;
*/
public class QueueConfiguration implements Serializable {
+ // The prefix for Mirror SNF Queues
+ public static final String MIRROR_ADDRESS = "$ACTIVEMQ_ARTEMIS_MIRROR";
+
private static final long serialVersionUID = 2601016432150225938L;
public static final String ID = "id";
@@ -916,6 +919,10 @@ public class QueueConfiguration implements Serializable {
return true;
}
+ public boolean isMirrorQueue() {
+ return isInternal() && name != null &&
name.toString().startsWith(MIRROR_ADDRESS);
+ }
+
@Override
public int hashCode() {
int result = Objects.hashCode(id);
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
index af3398f243..2b8f5ef09f 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
@@ -26,12 +26,12 @@ import java.util.concurrent.Executor;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.BaseInterceptor;
import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import
org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
-import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.apache.activemq.artemis.core.server.management.Notification;
import org.apache.activemq.artemis.core.server.management.NotificationListener;
import
org.apache.activemq.artemis.protocol.amqp.client.ProtonClientProtocolManager;
@@ -65,7 +65,7 @@ public class ProtonProtocolManager extends
AbstractProtocolManager<AMQPMessage,
private static final List<String> websocketRegistryNames =
Arrays.asList("amqp");
- public static final String MIRROR_ADDRESS = QueueImpl.MIRROR_ADDRESS;
+ public static final String MIRROR_ADDRESS =
QueueConfiguration.MIRROR_ADDRESS;
private final List<AmqpInterceptor> incomingInterceptors = new
ArrayList<>();
private final List<AmqpInterceptor> outgoingInterceptors = new
ArrayList<>();
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 528effc12b..2f077a0ac0 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -141,9 +141,6 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
protected static final int CRITICAL_CONSUMER = 3;
protected static final int CRITICAL_CHECK_DEPAGE = 4;
- // The prefix for Mirror SNF Queues
- public static final String MIRROR_ADDRESS = "$ACTIVEMQ_ARTEMIS_MIRROR";
-
private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final AtomicIntegerFieldUpdater<QueueImpl>
dispatchingUpdater = AtomicIntegerFieldUpdater.newUpdater(QueueImpl.class,
"dispatching");
private static final AtomicLongFieldUpdater<QueueImpl>
dispatchStartTimeUpdater = AtomicLongFieldUpdater.newUpdater(QueueImpl.class,
"dispatchStartTime");
@@ -389,6 +386,9 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
this.createdTimestamp = System.currentTimeMillis();
this.queueConfiguration = QueueConfiguration.of(queueConfiguration);
+
+ this.mirrorController = queueConfiguration.isMirrorQueue();
+
QueueConfigurationUtils.applyStaticDefaults(this.queueConfiguration);
this.refCountForConsumers = this.queueConfiguration.isTransient() ? new
TransientQueueManagerImpl(server, this.queueConfiguration.getName()) : new
QueueManagerImpl(server, this.queueConfiguration.getName());
@@ -747,10 +747,9 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
}
}
if (pagingStore != null) {
- if (owner != null && pagingStore != owner) {
- // If an AMQP message parses its properties, its size might be
updated and the address will receive more bytes.
- // However, in this case, we should always use the original
estimate.
- // Otherwise, we might get incorrect sizes after the update.
+ if (isMirrorController() && owner != null && pagingStore != owner) {
+ // When using mirror in this situation, it means the address
belongs to another queue
+ // it's acting as if the message is being copied
pagingStore.addSize(messageReference.getMessage().getOriginalEstimate(), false,
false);
}
@@ -766,10 +765,9 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
owner.addSize(-messageReference.getMessageMemoryEstimate(), false);
}
if (pagingStore != null) {
- if (owner != null && pagingStore != owner) {
- // If an AMQP message parses its properties, its size might be
updated and the address will receive more bytes.
- // However, in this case, we should always use the original
estimate.
- // Otherwise, we might get incorrect sizes after the update.
+ if (isMirrorController() && owner != null && pagingStore != owner) {
+ // When using mirror in this situation, it means the address
belongs to another queue
+ // it's acting as if the message is being copied
pagingStore.addSize(-messageReference.getMessage().getOriginalEstimate(),
false, false);
}
pagingStore.refDown(messageReference.getMessage(), count);
@@ -2291,7 +2289,7 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
return true;
}
- if (isInternalQueue() &&
queueConfiguration.getName().toString().startsWith(MIRROR_ADDRESS)) {
+ if (isMirrorController()) {
logger.trace("Mirror SNF queues are not supposed to expire messages.
Address={}, Queue={}", queueConfiguration.getAddress(),
queueConfiguration.getName());
return true;
}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPMirrorExpiryQueueTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPMirrorExpiryQueueTest.java
index d2825d2411..49dbe75a41 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPMirrorExpiryQueueTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPMirrorExpiryQueueTest.java
@@ -33,7 +33,6 @@ import
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirror
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
-import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
@@ -93,7 +92,7 @@ public class AMQPMirrorExpiryQueueTest extends
ActiveMQTestBase {
serverA.createQueue(QueueConfiguration.of(queueName).setName(queueName).setRoutingType(RoutingType.ANYCAST));
Queue expiryA =
serverA.createQueue(QueueConfiguration.of(EXPIRY_QUEUE).setName(EXPIRY_QUEUE).setRoutingType(RoutingType.ANYCAST));
- Queue snfQueue = serverA.locateQueue(QueueImpl.MIRROR_ADDRESS + "_" +
getTestMethodName() + "_willNeverConnect");
+ Queue snfQueue = serverA.locateQueue(QueueConfiguration.MIRROR_ADDRESS +
"_" + getTestMethodName() + "_willNeverConnect");
assertNotNull(snfQueue);
assertNotNull(expiryA);
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java
index 53b822c0f3..05fe7252b0 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java
@@ -31,6 +31,7 @@ import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
+import javax.jms.TextMessage;
import javax.jms.Topic;
import java.lang.invoke.MethodHandles;
@@ -50,6 +51,7 @@ import
org.apache.activemq.artemis.api.core.management.SimpleManagement;
import
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
import
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType;
import
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
+import org.apache.activemq.artemis.core.paging.impl.PagingStoreImpl;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
@@ -1410,7 +1412,7 @@ public class AMQPReplicaTest extends
AmqpClientTestSupport {
// We create the address to avoid auto delete on the queue
server_2.addAddressInfo(new
AddressInfo(getQueueName()).addRoutingType(RoutingType.ANYCAST).setAutoCreated(false));
- server_2.createQueue(new
QueueConfiguration(getQueueName()).setRoutingType(RoutingType.ANYCAST).setAddress(getQueueName()).setAutoCreated(false));
+
server_2.createQueue(QueueConfiguration.of(getQueueName()).setRoutingType(RoutingType.ANYCAST).setAddress(getQueueName()).setAutoCreated(false));
ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP",
"tcp://localhost:" + AMQP_PORT_2);
Connection connection = factory.createConnection();
@@ -1463,4 +1465,77 @@ public class AMQPReplicaTest extends
AmqpClientTestSupport {
assertSame(snfreplica, server_2.locateQueue(replica.getMirrorSNF()));
}
+ @Test
+ public void topicHierarchy() throws Exception {
+
+ String brokerConnectionName = "brokerConnectionName:" +
UUIDGenerator.getInstance().generateStringUUID();
+
+ server.setIdentity("sourceServer");
+
+ server_2 = createServer(AMQP_PORT_2, false);
+ server_2.setIdentity("targetServer");
+ server_2.getConfiguration().setName("server2");
+
+ AMQPBrokerConnectConfiguration amqpConnection = new
AMQPBrokerConnectConfiguration(brokerConnectionName, "tcp://localhost:" +
AMQP_PORT_2).setReconnectAttempts(-1).setRetryInterval(100);
+ AMQPMirrorBrokerConnectionElement replica = new
AMQPMirrorBrokerConnectionElement().setMessageAcknowledgements(true).setDurable(true);
+ replica.setName("theReplica");
+ amqpConnection.addElement(replica);
+ server.getConfiguration().addAMQPConnection(amqpConnection);
+
+ server_2.start();
+
+ server.start();
+ Connection connection = null;
+ try {
+ ConnectionFactory cf = CFUtil.createConnectionFactory("AMQP",
"tcp://localhost:" + AMQP_PORT);
+ connection = cf.createConnection();
+ connection.setClientID("myID");
+
+ SimpleString rootName = SimpleString.of("root.#");
+ SimpleString topicAName = SimpleString.of("root.A");
+
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+
+ Topic rootTopic = session.createTopic(rootName.toString());
+
+ MessageConsumer messageConsumer =
session.createDurableConsumer(rootTopic, "iAmDurable");
+
+ MessageProducer producer = session.createProducer(null);
+ TextMessage messageToA = session.createTextMessage("messageToA");
+
+ Topic topicA = session.createTopic(topicAName.toString());
+
+ producer.send(topicA, messageToA);
+ validSizeStore(1, topicAName);
+ validSizeStore(0, rootName);
+
+ connection.start();
+ TextMessage messageReceived1 = (TextMessage)
messageConsumer.receive(5000);
+ assertNotNull(messageReceived1);
+ assertNull(messageConsumer.receiveNoWait());
+
+ validSizeStore(0, topicAName);
+ validSizeStore(0, rootName);
+
+ } finally {
+ // Be sure to close our resources!
+ if (connection != null) {
+ connection.close();
+ }
+ }
+ }
+
+ void validSizeStore(int expectedMessages, SimpleString name) throws
Exception {
+ {
+ PagingStoreImpl store = (PagingStoreImpl)
server_2.getPagingManager().getPageStore(name);
+ assertNotNull(store);
+ Wait.assertEquals((long)expectedMessages, () ->
store.getAddressElements(), 5000, 100);
+ }
+
+ {
+ PagingStoreImpl store = (PagingStoreImpl)
server.getPagingManager().getPageStore(name);
+ assertNotNull(store);
+ Wait.assertEquals((long)expectedMessages, () ->
store.getAddressElements(), 5000, 100);
+ }
+ }
}
\ No newline at end of file
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AckManagerTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AckManagerTest.java
index a185b9c4a2..15761c5d04 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AckManagerTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AckManagerTest.java
@@ -46,7 +46,6 @@ import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
-import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import
org.apache.activemq.artemis.protocol.amqp.broker.ActiveMQProtonRemotingConnection;
@@ -507,7 +506,7 @@ public class AckManagerTest extends ActiveMQTestBase {
// this is simulating a mirror connection...
// we play with a direct sender here to make sure flow control is
working as expected when the records are beyond capacity.
- AmqpSender sender = session.createSender(QueueImpl.MIRROR_ADDRESS, true,
new Symbol[]{Symbol.getSymbol("amq.mirror")}, new
Symbol[]{Symbol.getSymbol("amq.mirror")}, properties);
+ AmqpSender sender =
session.createSender(QueueConfiguration.MIRROR_ADDRESS, true, new
Symbol[]{Symbol.getSymbol("amq.mirror")}, new
Symbol[]{Symbol.getSymbol("amq.mirror")}, properties);
AMQPMirrorControllerTarget mirrorControllerTarget =
Wait.assertNotNull(() -> locateMirrorTarget(server1), 5000, 100);
assertEquals(100,
mirrorControllerTarget.getConnection().getProtocolManager().getMirrorMaxPendingAcks());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact