This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new ab3b40c6bd ARTEMIS-5452 Address counts extra messages on hierarchical topic cases
ab3b40c6bd is described below

commit ab3b40c6bd0dd07f30ba0332c1bdf35707c0907c
Author: Clebert Suconic <[email protected]>
AuthorDate: Fri Apr 25 12:37:56 2025 -0400

    ARTEMIS-5452 Address counts extra messages on hierarchical topic cases
---
 .../artemis/api/core/QueueConfiguration.java       |  7 ++
 .../amqp/broker/ProtonProtocolManager.java         |  4 +-
 .../artemis/core/server/impl/QueueImpl.java        | 22 +++----
 .../amqp/connect/AMQPMirrorExpiryQueueTest.java    |  3 +-
 .../integration/amqp/connect/AMQPReplicaTest.java  | 77 +++++++++++++++++++++-
 .../integration/amqp/connect/AckManagerTest.java   |  3 +-
 6 files changed, 97 insertions(+), 19 deletions(-)

diff --git 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/QueueConfiguration.java
 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/QueueConfiguration.java
index db768d7614..83250e1416 100644
--- 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/QueueConfiguration.java
+++ 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/QueueConfiguration.java
@@ -43,6 +43,9 @@ import org.apache.activemq.artemis.utils.JsonLoader;
  */
 public class QueueConfiguration implements Serializable {
 
+   // The prefix for Mirror SNF Queues
+   public static final String MIRROR_ADDRESS = "$ACTIVEMQ_ARTEMIS_MIRROR";
+
    private static final long serialVersionUID = 2601016432150225938L;
 
    public static final String ID = "id";
@@ -916,6 +919,10 @@ public class QueueConfiguration implements Serializable {
       return true;
    }
 
+   public boolean isMirrorQueue() {
+      return isInternal() && name != null && 
name.toString().startsWith(MIRROR_ADDRESS);
+   }
+
    @Override
    public int hashCode() {
       int result = Objects.hashCode(id);
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
index af3398f243..2b8f5ef09f 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
@@ -26,12 +26,12 @@ import java.util.concurrent.Executor;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.BaseInterceptor;
 import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
 import 
org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
-import org.apache.activemq.artemis.core.server.impl.QueueImpl;
 import org.apache.activemq.artemis.core.server.management.Notification;
 import org.apache.activemq.artemis.core.server.management.NotificationListener;
 import 
org.apache.activemq.artemis.protocol.amqp.client.ProtonClientProtocolManager;
@@ -65,7 +65,7 @@ public class ProtonProtocolManager extends 
AbstractProtocolManager<AMQPMessage,
 
    private static final List<String> websocketRegistryNames = 
Arrays.asList("amqp");
 
-   public static final String MIRROR_ADDRESS = QueueImpl.MIRROR_ADDRESS;
+   public static final String MIRROR_ADDRESS = 
QueueConfiguration.MIRROR_ADDRESS;
 
    private final List<AmqpInterceptor> incomingInterceptors = new 
ArrayList<>();
    private final List<AmqpInterceptor> outgoingInterceptors = new 
ArrayList<>();
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 528effc12b..2f077a0ac0 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -141,9 +141,6 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
    protected static final int CRITICAL_CONSUMER = 3;
    protected static final int CRITICAL_CHECK_DEPAGE = 4;
 
-   // The prefix for Mirror SNF Queues
-   public static final String MIRROR_ADDRESS = "$ACTIVEMQ_ARTEMIS_MIRROR";
-
    private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    private static final AtomicIntegerFieldUpdater<QueueImpl> 
dispatchingUpdater = AtomicIntegerFieldUpdater.newUpdater(QueueImpl.class, 
"dispatching");
    private static final AtomicLongFieldUpdater<QueueImpl> 
dispatchStartTimeUpdater = AtomicLongFieldUpdater.newUpdater(QueueImpl.class, 
"dispatchStartTime");
@@ -389,6 +386,9 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
       this.createdTimestamp = System.currentTimeMillis();
 
       this.queueConfiguration = QueueConfiguration.of(queueConfiguration);
+
+      this.mirrorController = queueConfiguration.isMirrorQueue();
+
       QueueConfigurationUtils.applyStaticDefaults(this.queueConfiguration);
 
       this.refCountForConsumers = this.queueConfiguration.isTransient() ? new 
TransientQueueManagerImpl(server, this.queueConfiguration.getName()) : new 
QueueManagerImpl(server, this.queueConfiguration.getName());
@@ -747,10 +747,9 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
          }
       }
       if (pagingStore != null) {
-         if (owner != null && pagingStore != owner) {
-            // If an AMQP message parses its properties, its size might be 
updated and the address will receive more bytes.
-            // However, in this case, we should always use the original 
estimate.
-            // Otherwise, we might get incorrect sizes after the update.
+         if (isMirrorController() && owner != null && pagingStore != owner) {
+            // When using mirror in this situation, it means the address 
belong to another queue
+            // it's acting as if the message is being copied
             
pagingStore.addSize(messageReference.getMessage().getOriginalEstimate(), false, 
false);
          }
 
@@ -766,10 +765,9 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
          owner.addSize(-messageReference.getMessageMemoryEstimate(), false);
       }
       if (pagingStore != null) {
-         if (owner != null && pagingStore != owner) {
-            // If an AMQP message parses its properties, its size might be 
updated and the address will receive more bytes.
-            // However, in this case, we should always use the original 
estimate.
-            // Otherwise, we might get incorrect sizes after the update.
+         if (isMirrorController() && owner != null && pagingStore != owner) {
+            // When using mirror in this situation, it means the address 
belong to another queue
+            // it's acting as if the message is being copied
             
pagingStore.addSize(-messageReference.getMessage().getOriginalEstimate(), 
false, false);
          }
          pagingStore.refDown(messageReference.getMessage(), count);
@@ -2291,7 +2289,7 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
          return true;
       }
 
-      if (isInternalQueue() && 
queueConfiguration.getName().toString().startsWith(MIRROR_ADDRESS)) {
+      if (isMirrorController()) {
          logger.trace("Mirror SNF queues are not supposed to expire messages. 
Address={}, Queue={}", queueConfiguration.getAddress(), 
queueConfiguration.getName());
          return true;
       }
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPMirrorExpiryQueueTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPMirrorExpiryQueueTest.java
index d2825d2411..49dbe75a41 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPMirrorExpiryQueueTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPMirrorExpiryQueueTest.java
@@ -33,7 +33,6 @@ import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirror
 import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.Queue;
-import org.apache.activemq.artemis.core.server.impl.QueueImpl;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
 import org.apache.activemq.artemis.tests.util.CFUtil;
@@ -93,7 +92,7 @@ public class AMQPMirrorExpiryQueueTest extends 
ActiveMQTestBase {
 
       
serverA.createQueue(QueueConfiguration.of(queueName).setName(queueName).setRoutingType(RoutingType.ANYCAST));
       Queue expiryA = 
serverA.createQueue(QueueConfiguration.of(EXPIRY_QUEUE).setName(EXPIRY_QUEUE).setRoutingType(RoutingType.ANYCAST));
-      Queue snfQueue = serverA.locateQueue(QueueImpl.MIRROR_ADDRESS + "_" + 
getTestMethodName() + "_willNeverConnect");
+      Queue snfQueue = serverA.locateQueue(QueueConfiguration.MIRROR_ADDRESS + 
"_" + getTestMethodName() + "_willNeverConnect");
       assertNotNull(snfQueue);
       assertNotNull(expiryA);
 
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java
index 53b822c0f3..05fe7252b0 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java
@@ -31,6 +31,7 @@ import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
+import javax.jms.TextMessage;
 import javax.jms.Topic;
 
 import java.lang.invoke.MethodHandles;
@@ -50,6 +51,7 @@ import 
org.apache.activemq.artemis.api.core.management.SimpleManagement;
 import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
 import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType;
 import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
+import org.apache.activemq.artemis.core.paging.impl.PagingStoreImpl;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
@@ -1410,7 +1412,7 @@ public class AMQPReplicaTest extends 
AmqpClientTestSupport {
 
       // We create the address to avoid auto delete on the queue
       server_2.addAddressInfo(new 
AddressInfo(getQueueName()).addRoutingType(RoutingType.ANYCAST).setAutoCreated(false));
-      server_2.createQueue(new 
QueueConfiguration(getQueueName()).setRoutingType(RoutingType.ANYCAST).setAddress(getQueueName()).setAutoCreated(false));
+      
server_2.createQueue(QueueConfiguration.of(getQueueName()).setRoutingType(RoutingType.ANYCAST).setAddress(getQueueName()).setAutoCreated(false));
 
       ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", 
"tcp://localhost:" + AMQP_PORT_2);
       Connection connection = factory.createConnection();
@@ -1463,4 +1465,77 @@ public class AMQPReplicaTest extends 
AmqpClientTestSupport {
       assertSame(snfreplica, server_2.locateQueue(replica.getMirrorSNF()));
    }
 
+   @Test
+   public void topicHierarchy() throws Exception {
+
+      String brokerConnectionName = "brokerConnectionName:" + 
UUIDGenerator.getInstance().generateStringUUID();
+
+      server.setIdentity("sourceServer");
+
+      server_2 = createServer(AMQP_PORT_2, false);
+      server_2.setIdentity("targetServer");
+      server_2.getConfiguration().setName("server2");
+
+      AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration(brokerConnectionName, "tcp://localhost:" + 
AMQP_PORT_2).setReconnectAttempts(-1).setRetryInterval(100);
+      AMQPMirrorBrokerConnectionElement replica = new 
AMQPMirrorBrokerConnectionElement().setMessageAcknowledgements(true).setDurable(true);
+      replica.setName("theReplica");
+      amqpConnection.addElement(replica);
+      server.getConfiguration().addAMQPConnection(amqpConnection);
+
+      server_2.start();
+
+      server.start();
+      Connection connection = null;
+      try {
+         ConnectionFactory cf = CFUtil.createConnectionFactory("AMQP", 
"tcp://localhost:" + AMQP_PORT);
+         connection = cf.createConnection();
+         connection.setClientID("myID");
+
+         SimpleString rootName = SimpleString.of("root.#");
+         SimpleString topicAName = SimpleString.of("root.A");
+
+         Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+
+         Topic rootTopic = session.createTopic(rootName.toString());
+
+         MessageConsumer messageConsumer = 
session.createDurableConsumer(rootTopic, "iAmDurable");
+
+         MessageProducer producer = session.createProducer(null);
+         TextMessage messageToA = session.createTextMessage("messageToA");
+
+         Topic topicA = session.createTopic(topicAName.toString());
+
+         producer.send(topicA, messageToA);
+         validSizeStore(1, topicAName);
+         validSizeStore(0, rootName);
+
+         connection.start();
+         TextMessage messageReceived1 = (TextMessage) 
messageConsumer.receive(5000);
+         assertNotNull(messageReceived1);
+         assertNull(messageConsumer.receiveNoWait());
+
+         validSizeStore(0, topicAName);
+         validSizeStore(0, rootName);
+
+      } finally {
+         // Step 12. Be sure to close our resources!
+         if (connection != null) {
+            connection.close();
+         }
+      }
+   }
+
+   void validSizeStore(int expectedMessages, SimpleString name) throws 
Exception {
+      {
+         PagingStoreImpl store = (PagingStoreImpl) 
server_2.getPagingManager().getPageStore(name);
+         assertNotNull(store);
+         Wait.assertEquals((long)expectedMessages, () -> 
store.getAddressElements(), 5000, 100);
+      }
+
+      {
+         PagingStoreImpl store = (PagingStoreImpl) 
server.getPagingManager().getPageStore(name);
+         assertNotNull(store);
+         Wait.assertEquals((long)expectedMessages, () -> 
store.getAddressElements(), 5000, 100);
+      }
+   }
 }
\ No newline at end of file
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AckManagerTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AckManagerTest.java
index a185b9c4a2..15761c5d04 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AckManagerTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AckManagerTest.java
@@ -46,7 +46,6 @@ import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.impl.AckReason;
 import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
-import org.apache.activemq.artemis.core.server.impl.QueueImpl;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
 import 
org.apache.activemq.artemis.protocol.amqp.broker.ActiveMQProtonRemotingConnection;
@@ -507,7 +506,7 @@ public class AckManagerTest extends ActiveMQTestBase {
 
       // this is simulating a mirror connection...
       // we play with a direct sender here to make sure flow control is 
working as expected when the records are beyond capacity.
-      AmqpSender sender = session.createSender(QueueImpl.MIRROR_ADDRESS, true, 
new Symbol[]{Symbol.getSymbol("amq.mirror")}, new 
Symbol[]{Symbol.getSymbol("amq.mirror")}, properties);
+      AmqpSender sender = 
session.createSender(QueueConfiguration.MIRROR_ADDRESS, true, new 
Symbol[]{Symbol.getSymbol("amq.mirror")}, new 
Symbol[]{Symbol.getSymbol("amq.mirror")}, properties);
 
       AMQPMirrorControllerTarget mirrorControllerTarget = 
Wait.assertNotNull(() -> locateMirrorTarget(server1), 5000, 100);
       assertEquals(100, 
mirrorControllerTarget.getConnection().getProtocolManager().getMirrorMaxPendingAcks());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to