This is an automated email from the ASF dual-hosted git repository.
robbie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 7555319dd0 ARTEMIS-5153 Mark federation events and control queues as
internal
7555319dd0 is described below
commit 7555319dd01b44536189f34f0c6fc7117cb467ff
Author: Timothy Bish <[email protected]>
AuthorDate: Wed Nov 13 12:28:05 2024 -0500
ARTEMIS-5153 Mark federation events and control queues as internal
In order to better indicate their nature as broker feature specific queues
we can mark the temporary queues created for AMQP federation events and
control link messages as internal.
---
.../protocol/amqp/broker/AMQPSessionCallback.java | 26 +++++++++++++++++-----
.../AMQPFederationCommandDispatcher.java | 2 +-
.../federation/AMQPFederationEventDispatcher.java | 2 +-
.../amqp/connect/AMQPFederationConnectTest.java | 19 ++++++++++++++++
4 files changed, 41 insertions(+), 8 deletions(-)
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index 819b3311bb..b93a1280bd 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -306,11 +306,15 @@ public class AMQPSessionCallback implements
SessionCallback {
createTemporaryQueue(queueName, queueName, routingType, null,
maxConsumers);
}
+ public void createTemporaryQueue(SimpleString queueName, RoutingType
routingType, Integer maxConsumers, Boolean internal) throws Exception {
+ createTemporaryQueue(queueName, queueName, routingType, null,
maxConsumers, internal);
+ }
+
public void createTemporaryQueue(SimpleString address,
SimpleString queueName,
RoutingType routingType,
SimpleString filter) throws Exception {
- createTemporaryQueue(address, queueName, routingType, filter, null);
+ createTemporaryQueue(address, queueName, routingType, filter, null,
null);
}
public void createTemporaryQueue(SimpleString address,
@@ -318,13 +322,23 @@ public class AMQPSessionCallback implements
SessionCallback {
RoutingType routingType,
SimpleString filter,
Integer maxConsumers) throws Exception {
+ createTemporaryQueue(address, queueName, routingType, filter, null,
null);
+ }
+
+ public void createTemporaryQueue(SimpleString address,
+ SimpleString queueName,
+ RoutingType routingType,
+ SimpleString filter,
+ Integer maxConsumers,
+ Boolean internal) throws Exception {
try {
serverSession.createQueue(QueueConfiguration.of(queueName).setAddress(address)
-
.setRoutingType(routingType)
-
.setFilterString(filter)
-
.setTemporary(true)
-
.setDurable(false)
-
.setMaxConsumers(maxConsumers));
+
.setRoutingType(routingType)
+
.setFilterString(filter)
+
.setTemporary(true)
+
.setDurable(false)
+
.setMaxConsumers(maxConsumers)
+
.setInternal(internal));
} catch (ActiveMQSecurityException se) {
throw
ActiveMQAMQPProtocolMessageBundle.BUNDLE.securityErrorCreatingTempDestination(se.getMessage());
}
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationCommandDispatcher.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationCommandDispatcher.java
index c44c75d4c8..b4fbc9fdb5 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationCommandDispatcher.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationCommandDispatcher.java
@@ -125,7 +125,7 @@ public class AMQPFederationCommandDispatcher implements
SenderController {
controlAddress =
federation.prefixControlLinkQueueName(sender.getRemoteTarget().getAddress());
try {
-
session.createTemporaryQueue(SimpleString.of(getControlLinkAddress()),
RoutingType.ANYCAST, 1);
+
session.createTemporaryQueue(SimpleString.of(getControlLinkAddress()),
RoutingType.ANYCAST, 1, true);
} catch (Exception e) {
throw
ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage());
}
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationEventDispatcher.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationEventDispatcher.java
index d7e92a72c7..9b4f23832b 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationEventDispatcher.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationEventDispatcher.java
@@ -136,7 +136,7 @@ public class AMQPFederationEventDispatcher implements
SenderController, ActiveMQ
}
try {
- session.createTemporaryQueue(SimpleString.of(getEventsLinkAddress()),
RoutingType.ANYCAST, 1);
+ session.createTemporaryQueue(SimpleString.of(getEventsLinkAddress()),
RoutingType.ANYCAST, 1, true);
} catch (Exception e) {
throw
ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage());
}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationConnectTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationConnectTest.java
index f2b2c61ef4..43bb4cd5d6 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationConnectTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationConnectTest.java
@@ -47,6 +47,9 @@ import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPF
import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.IGNORE_QUEUE_CONSUMER_PRIORITIES;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.CoreMatchers.nullValue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.allOf;
@@ -66,11 +69,13 @@ import javax.jms.JMSException;
import javax.jms.Session;
import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.SimpleString;
import
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
import
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFederatedBrokerConnectionElement;
import
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFederationAddressPolicyElement;
import
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFederationQueuePolicyElement;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
import
org.apache.activemq.artemis.protocol.amqp.connect.federation.ActiveMQServerAMQPFederationPlugin;
import
org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumerInfo;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
@@ -870,6 +875,13 @@ public class AMQPFederationConnectTest extends
AmqpClientTestSupport {
Wait.assertTrue(() ->
server.locateQueue(federationControlSenderAddress) != null);
+ final Queue result =
server.locateQueue(SimpleString.of(federationControlSenderAddress));
+
+ assertNotNull(result);
+ assertTrue(result.isTemporary());
+ assertTrue(result.isInternalQueue());
+ assertEquals(1, result.getMaxConsumers());
+
// Try and bind to the control address which should be rejected as
the queue
// was created with max consumers of one.
peer.expectAttach().ofSender()
@@ -1027,6 +1039,13 @@ public class AMQPFederationConnectTest extends
AmqpClientTestSupport {
// the server to allow sends of events beyond currently available
credit.
Wait.assertTrue(() ->
server.locateQueue(federationEventsSenderAddress) != null);
+ final Queue result =
server.locateQueue(SimpleString.of(federationEventsSenderAddress));
+
+ assertNotNull(result);
+ assertTrue(result.isTemporary());
+ assertTrue(result.isInternalQueue());
+ assertEquals(1, result.getMaxConsumers());
+
// Try and bind to the events address which should be rejected as the
queue
// was created with max consumers of one.
peer.expectAttach().ofSender()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact