This is an automated email from the ASF dual-hosted git repository.

robbie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 7555319dd0 ARTEMIS-5153 Mark federation events and control queues as 
internal
7555319dd0 is described below

commit 7555319dd01b44536189f34f0c6fc7117cb467ff
Author: Timothy Bish <[email protected]>
AuthorDate: Wed Nov 13 12:28:05 2024 -0500

    ARTEMIS-5153 Mark federation events and control queues as internal
    
    In order to better indicate their nature as broker feature specific queues
    we can mark the temporary queues created for AMQP federation events and
    control link messages as internal.
---
 .../protocol/amqp/broker/AMQPSessionCallback.java  | 26 +++++++++++++++++-----
 .../AMQPFederationCommandDispatcher.java           |  2 +-
 .../federation/AMQPFederationEventDispatcher.java  |  2 +-
 .../amqp/connect/AMQPFederationConnectTest.java    | 19 ++++++++++++++++
 4 files changed, 41 insertions(+), 8 deletions(-)

diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index 819b3311bb..b93a1280bd 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -306,11 +306,15 @@ public class AMQPSessionCallback implements 
SessionCallback {
       createTemporaryQueue(queueName, queueName, routingType, null, 
maxConsumers);
    }
 
+   public void createTemporaryQueue(SimpleString queueName, RoutingType 
routingType, Integer maxConsumers, Boolean internal) throws Exception {
+      createTemporaryQueue(queueName, queueName, routingType, null, 
maxConsumers, internal);
+   }
+
    public void createTemporaryQueue(SimpleString address,
                                     SimpleString queueName,
                                     RoutingType routingType,
                                     SimpleString filter) throws Exception {
-      createTemporaryQueue(address, queueName, routingType, filter, null);
+      createTemporaryQueue(address, queueName, routingType, filter, null, 
null);
    }
 
    public void createTemporaryQueue(SimpleString address,
@@ -318,13 +322,23 @@ public class AMQPSessionCallback implements 
SessionCallback {
                                     RoutingType routingType,
                                     SimpleString filter,
                                     Integer maxConsumers) throws Exception {
+      createTemporaryQueue(address, queueName, routingType, filter, 
maxConsumers, null);
+   }
+
+   public void createTemporaryQueue(SimpleString address,
+                                    SimpleString queueName,
+                                    RoutingType routingType,
+                                    SimpleString filter,
+                                    Integer maxConsumers,
+                                    Boolean internal) throws Exception {
       try {
          
serverSession.createQueue(QueueConfiguration.of(queueName).setAddress(address)
-                                                                    
.setRoutingType(routingType)
-                                                                    
.setFilterString(filter)
-                                                                    
.setTemporary(true)
-                                                                    
.setDurable(false)
-                                                                    
.setMaxConsumers(maxConsumers));
+                                                                   
.setRoutingType(routingType)
+                                                                   
.setFilterString(filter)
+                                                                   
.setTemporary(true)
+                                                                   
.setDurable(false)
+                                                                   
.setMaxConsumers(maxConsumers)
+                                                                   
.setInternal(internal));
       } catch (ActiveMQSecurityException se) {
          throw 
ActiveMQAMQPProtocolMessageBundle.BUNDLE.securityErrorCreatingTempDestination(se.getMessage());
       }
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationCommandDispatcher.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationCommandDispatcher.java
index c44c75d4c8..b4fbc9fdb5 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationCommandDispatcher.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationCommandDispatcher.java
@@ -125,7 +125,7 @@ public class AMQPFederationCommandDispatcher implements 
SenderController {
       controlAddress = 
federation.prefixControlLinkQueueName(sender.getRemoteTarget().getAddress());
 
       try {
-         
session.createTemporaryQueue(SimpleString.of(getControlLinkAddress()), 
RoutingType.ANYCAST, 1);
+         
session.createTemporaryQueue(SimpleString.of(getControlLinkAddress()), 
RoutingType.ANYCAST, 1, true);
       } catch (Exception e) {
          throw 
ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage());
       }
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationEventDispatcher.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationEventDispatcher.java
index d7e92a72c7..9b4f23832b 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationEventDispatcher.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationEventDispatcher.java
@@ -136,7 +136,7 @@ public class AMQPFederationEventDispatcher implements 
SenderController, ActiveMQ
       }
 
       try {
-         session.createTemporaryQueue(SimpleString.of(getEventsLinkAddress()), 
RoutingType.ANYCAST, 1);
+         session.createTemporaryQueue(SimpleString.of(getEventsLinkAddress()), 
RoutingType.ANYCAST, 1, true);
       } catch (Exception e) {
          throw 
ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage());
       }
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationConnectTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationConnectTest.java
index f2b2c61ef4..43bb4cd5d6 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationConnectTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationConnectTest.java
@@ -47,6 +47,9 @@ import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPF
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.IGNORE_QUEUE_CONSUMER_PRIORITIES;
 import static org.hamcrest.CoreMatchers.notNullValue;
 import static org.hamcrest.CoreMatchers.nullValue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.CoreMatchers.allOf;
 
@@ -66,11 +69,13 @@ import javax.jms.JMSException;
 import javax.jms.Session;
 
 import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.SimpleString;
 import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
 import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFederatedBrokerConnectionElement;
 import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFederationAddressPolicyElement;
 import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFederationQueuePolicyElement;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
 import 
org.apache.activemq.artemis.protocol.amqp.connect.federation.ActiveMQServerAMQPFederationPlugin;
 import 
org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumerInfo;
 import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
@@ -870,6 +875,13 @@ public class AMQPFederationConnectTest extends 
AmqpClientTestSupport {
 
          Wait.assertTrue(() -> 
server.locateQueue(federationControlSenderAddress) != null);
 
+         final Queue result = 
server.locateQueue(SimpleString.of(federationControlSenderAddress));
+
+         assertNotNull(result);
+         assertTrue(result.isTemporary());
+         assertTrue(result.isInternalQueue());
+         assertEquals(1, result.getMaxConsumers());
+
          // Try and bind to the control address which should be rejected as 
the queue
          // was created with max consumers of one.
          peer.expectAttach().ofSender()
@@ -1027,6 +1039,13 @@ public class AMQPFederationConnectTest extends 
AmqpClientTestSupport {
          // the server to allow sends of events beyond currently available 
credit.
          Wait.assertTrue(() -> 
server.locateQueue(federationEventsSenderAddress) != null);
 
+         final Queue result = 
server.locateQueue(SimpleString.of(federationEventsSenderAddress));
+
+         assertNotNull(result);
+         assertTrue(result.isTemporary());
+         assertTrue(result.isInternalQueue());
+         assertEquals(1, result.getMaxConsumers());
+
          // Try and bind to the events address which should be rejected as the 
queue
          // was created with max consumers of one.
          peer.expectAttach().ofSender()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to