This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 3bdef0e8e1 ARTEMIS-4512 JMS q consumer can wrongly connect to multicast queue
3bdef0e8e1 is described below
commit 3bdef0e8e1316fb37223e9ac5fd9e4764e5caa06
Author: Justin Bertram <[email protected]>
AuthorDate: Mon Nov 20 16:16:24 2023 -0600
ARTEMIS-4512 JMS q consumer can wrongly connect to multicast queue
---
.../artemis/core/client/impl/QueueQueryImpl.java | 2 +-
.../activemq/artemis/utils/AutoCreateUtil.java | 16 +++++++++---
.../protocol/amqp/broker/AMQPSessionCallback.java | 4 +++
.../amqp/proton/ProtonServerReceiverContext.java | 2 +-
.../core/server/impl/ServerSessionImpl.java | 11 ++++++--
.../jms/multiprotocol/JMSMessageConsumerTest.java | 30 ++++++++++++++++++++++
.../resources/reload-divert-undeploy-after.xml | 4 +--
.../resources/reload-divert-undeploy-before.xml | 4 +--
.../artemis/tests/smoke/console/QueuesTest.java | 4 +--
9 files changed, 64 insertions(+), 13 deletions(-)
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/QueueQueryImpl.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/QueueQueryImpl.java
index 5529448a4c..862624a4cf 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/QueueQueryImpl.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/QueueQueryImpl.java
@@ -103,7 +103,7 @@ public class QueueQueryImpl implements
ClientSession.QueueQuery {
final SimpleString name,
final boolean exists,
final boolean autoCreateQueues) {
- this(durable, temporary, consumerCount, messageCount, filterString,
address, name, exists, autoCreateQueues, -1, false, false,
RoutingType.MULTICAST);
+ this(durable, temporary, consumerCount, messageCount, filterString,
address, name, exists, autoCreateQueues, -1, false, false, null);
}
public QueueQueryImpl(final boolean durable,
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/AutoCreateUtil.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/AutoCreateUtil.java
index 33b2003352..da8f48d6fc 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/AutoCreateUtil.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/AutoCreateUtil.java
@@ -16,18 +16,21 @@
*/
package org.apache.activemq.artemis.utils;
+import java.lang.invoke.MethodHandles;
+
import org.apache.activemq.artemis.api.core.ActiveMQException;
-import static
org.apache.activemq.artemis.api.core.ActiveMQExceptionType.QUEUE_DOES_NOT_EXIST;
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSession.AddressQuery;
+import org.apache.activemq.artemis.api.core.client.ClientSession.QueueQuery;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.lang.invoke.MethodHandles;
+
+import static
org.apache.activemq.artemis.api.core.ActiveMQExceptionType.QUEUE_DOES_NOT_EXIST;
/**
* Utility class to create queues 'automatically'.
@@ -58,6 +61,13 @@ public class AutoCreateUtil {
} else {
throw new ActiveMQException("Destination " + destAddress + " does not exist", QUEUE_DOES_NOT_EXIST);
}
+ } else {
+ QueueQuery queueQueryResult = session.queueQuery(queueName);
+ // the routing type might be null if the server is very old in which case we default to the old behavior
+ RoutingType routingType = queueQueryResult.getRoutingType();
+ if (routingType != null && routingType != RoutingType.ANYCAST &&
!CompositeAddress.isFullyQualified(destAddress)) {
+ throw new ActiveMQException("Destination " + destAddress + " does not support JMS queue semantics", QUEUE_DOES_NOT_EXIST);
+ }
}
}
@@ -70,7 +80,7 @@ public class AutoCreateUtil {
* @param filter to apply on the queue
* @param durable if queue is durable
*/
- public static void setRequiredQueueConfigurationIfNotSet(QueueConfiguration
queueConfiguration, ClientSession.AddressQuery addressQuery, RoutingType
routingType, SimpleString filter, boolean durable) {
+ public static void setRequiredQueueConfigurationIfNotSet(QueueConfiguration
queueConfiguration, AddressQuery addressQuery, RoutingType routingType,
SimpleString filter, boolean durable) {
if (queueConfiguration.getRoutingType() == null) {
queueConfiguration.setRoutingType(routingType);
}
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index 8ab49f6b85..d8d1bb05f3 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -766,6 +766,10 @@ public class AMQPSessionCallback implements
SessionCallback {
return
manager.getServer().getAddressSettingsRepository().getMatch(address.toString()).getDefaultAddressRoutingType();
}
+ public RoutingType getRoutingTypeFromPrefix(SimpleString address,
RoutingType defaultRoutingType) {
+ return serverSession.getRoutingTypeFromPrefix(address,
defaultRoutingType);
+ }
+
public void check(SimpleString address, CheckType checkType, SecurityAuth
session) throws Exception {
manager.getServer().getSecurityStore().check(address, checkType,
session);
}
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
index 57e2af6bd8..f8edea304f 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
@@ -176,7 +176,7 @@ public class ProtonServerReceiverContext extends
ProtonAbstractReceiver {
}
private RoutingType getDefaultRoutingType(SimpleString address) {
- RoutingType defaultRoutingType =
sessionSPI.getDefaultRoutingType(address);
+ RoutingType defaultRoutingType =
sessionSPI.getRoutingTypeFromPrefix(address,
sessionSPI.getDefaultRoutingType(address));
return defaultRoutingType == null ?
ActiveMQDefaultConfiguration.getDefaultRoutingType() : defaultRoutingType;
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index 41c25d25db..d2f7eba3bc 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -1838,7 +1838,8 @@ public class ServerSessionImpl implements ServerSession,
FailureListener {
}
if (queueConfig.getRoutingType() == RoutingType.ANYCAST ||
queueConfig.isFqqn()) {
- if (server.locateQueue(unPrefixedQueue) == null) {
+ Queue q = server.locateQueue(unPrefixedQueue);
+ if (q == null) {
// The queue doesn't exist.
Bindings bindings =
server.getPostOffice().lookupBindingsForAddress(unPrefixedAddress);
if (bindings != null && bindings.hasLocalBinding() &&
!queueConfig.isFqqn()) {
@@ -1858,7 +1859,13 @@ public class ServerSessionImpl implements ServerSession,
FailureListener {
}
} else {
// The queue exists.
- result = AutoCreateResult.EXISTED;
+ if (q.getRoutingType() != RoutingType.ANYCAST &&
!queueConfig.isFqqn()) {
+ // The queue exists, but it does not support the requested routing type, and it's not FQQN.
+ return AutoCreateResult.NOT_FOUND;
+ } else {
+ // The queue exists, and it supports the requested routing type or it's FQQN so it doesn't matter.
+ result = AutoCreateResult.EXISTED;
+ }
}
}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSMessageConsumerTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSMessageConsumerTest.java
index 988fdc14f0..5db14ed8f0 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSMessageConsumerTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSMessageConsumerTest.java
@@ -98,6 +98,36 @@ public class JMSMessageConsumerTest extends
MultiprotocolJMSClientTestSupport {
}
}
+ @Test(timeout = 30000)
+ public void testQueueRoutingTypeMismatchCore() throws Exception {
+ testQueueRoutingTypeMismatch(createCoreConnection());
+ }
+
+ @Test(timeout = 30000)
+ public void testQueueRoutingTypeMismatchOpenWire() throws Exception {
+ testQueueRoutingTypeMismatch(createOpenWireConnection());
+ }
+
+ @Test(timeout = 30000)
+ public void testQueueRoutingTypeMismatchAMQP() throws Exception {
+ testQueueRoutingTypeMismatch(createConnection());
+ }
+
+ private void testQueueRoutingTypeMismatch(Connection connection) throws
Exception {
+
server.getAddressSettingsRepository().getMatch("#").setAutoCreateQueues(false).setAutoCreateAddresses(false);
+ String name = getTopicName();
+ server.createQueue(new
QueueConfiguration(name).setAddress(name).setRoutingType(RoutingType.MULTICAST).setAutoCreateAddress(true));
+ try {
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ session.createConsumer(session.createQueue(name));
+ fail("Should have thrown a JMSException!");
+ } catch (JMSException e) {
+ // expected
+ } finally {
+ connection.close();
+ }
+ }
+
@Test(timeout = 30000)
public void testPriorityAMQPProducerCoreConsumer() throws Exception {
Connection connection = createConnection(); //AMQP
diff --git
a/tests/integration-tests/src/test/resources/reload-divert-undeploy-after.xml
b/tests/integration-tests/src/test/resources/reload-divert-undeploy-after.xml
index c6889a37ef..521382fdb6 100644
---
a/tests/integration-tests/src/test/resources/reload-divert-undeploy-after.xml
+++
b/tests/integration-tests/src/test/resources/reload-divert-undeploy-after.xml
@@ -83,9 +83,9 @@ under the License.
</anycast>
</address>
<address name="target">
- <multicast>
+ <anycast>
<queue name="target"/>
- </multicast>
+ </anycast>
</address>
</addresses>
diff --git
a/tests/integration-tests/src/test/resources/reload-divert-undeploy-before.xml
b/tests/integration-tests/src/test/resources/reload-divert-undeploy-before.xml
index 34d0f8c04d..0df992e5bb 100644
---
a/tests/integration-tests/src/test/resources/reload-divert-undeploy-before.xml
+++
b/tests/integration-tests/src/test/resources/reload-divert-undeploy-before.xml
@@ -83,9 +83,9 @@ under the License.
</anycast>
</address>
<address name="target">
- <multicast>
+ <anycast>
<queue name="target"/>
- </multicast>
+ </anycast>
</address>
</addresses>
diff --git
a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/console/QueuesTest.java
b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/console/QueuesTest.java
index d4eef874ec..a1cb199f69 100644
---
a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/console/QueuesTest.java
+++
b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/console/QueuesTest.java
@@ -218,8 +218,8 @@ public class QueuesTest extends ConsoleTest {
createQueueCommand.setUser(SERVER_ADMIN_USERNAME);
createQueueCommand.setPassword(SERVER_ADMIN_PASSWORD);
createQueueCommand.setName(queueName);
- createQueueCommand.setMulticast(true);
- createQueueCommand.setAnycast(false);
+ createQueueCommand.setMulticast(false);
+ createQueueCommand.setAnycast(true);
createQueueCommand.setAutoCreateAddress(true);
createQueueCommand.execute(new ActionContext());