This is an automated email from the ASF dual-hosted git repository.
robbie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new ca66028b2a ARTEMIS-4132 AMQP Receiver default to ANYCAST when creating
an address
ca66028b2a is described below
commit ca66028b2a596a4ab5a61ec57633fed3e7b85b22
Author: Timothy Bish <[email protected]>
AuthorDate: Mon Jan 23 12:38:16 2023 -0500
ARTEMIS-4132 AMQP Receiver default to ANYCAST when creating an address
When an AMQP client subscribes to a new address (non-existing) with a
receiver link, the
address is created with routing type ANYCAST regardless of the default
address creation
configuration of the broker, and ignores even the broker-wide default of
MULTICAST.
---
.../amqp/proton/ProtonServerSenderContext.java | 12 +-
.../AutoCreateWithDefaultRoutingTypesTest.java | 205 +++++++++++++++++++++
.../amqp/BrokerDefinedAnycastConsumerTest.java | 1 +
.../amqp/ClientDefinedAnycastConsumerTest.java | 42 ++++-
4 files changed, 256 insertions(+), 4 deletions(-)
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index 98d62c46cb..40af705c38 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -25,6 +25,8 @@ import java.util.concurrent.atomic.AtomicInteger;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
+
+import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import
org.apache.activemq.artemis.api.core.ActiveMQQueueMaxConsumerLimitReached;
@@ -960,7 +962,6 @@ public class ProtonServerSenderContext extends
ProtonInitializable implements Pr
SimpleString tempQueueName;
String selector;
- private final RoutingType defaultRoutingType = RoutingType.ANYCAST;
private RoutingType routingTypeToUse = RoutingType.ANYCAST;
private boolean isVolatile = false;
@@ -1110,8 +1111,15 @@ public class ProtonServerSenderContext extends
ProtonInitializable implements Pr
} else {
// if not we look up the address
AddressQueryResult addressQueryResult = null;
+
+ // Set this to the broker configured default for the address
prior to the lookup so that
+ // an auto create will actually use the configured defaults.
The actual query result will
+ // contain the true answer on what routing type the address
actually has though.
+ final RoutingType routingType =
sessionSPI.getDefaultRoutingType(addressToUse);
+ routingTypeToUse = routingType == null ?
ActiveMQDefaultConfiguration.getDefaultRoutingType() : routingType;
+
try {
- addressQueryResult = sessionSPI.addressQuery(addressToUse,
defaultRoutingType, true);
+ addressQueryResult = sessionSPI.addressQuery(addressToUse,
routingTypeToUse, true);
} catch (ActiveMQSecurityException e) {
throw
ActiveMQAMQPProtocolMessageBundle.BUNDLE.securityErrorCreatingConsumer(e.getMessage());
} catch (ActiveMQAMQPException e) {
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AutoCreateWithDefaultRoutingTypesTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AutoCreateWithDefaultRoutingTypesTest.java
new file mode 100644
index 0000000000..d62293233e
--- /dev/null
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AutoCreateWithDefaultRoutingTypesTest.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import static
org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper.QUEUE_CAPABILITY;
+import static
org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper.TOPIC_CAPABILITY;
+
+import java.lang.invoke.MethodHandles;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.AddressQueryResult;
+import org.apache.activemq.artemis.core.server.JournalType;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSender;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.qpid.proton.amqp.messaging.Source;
+import org.apache.qpid.proton.amqp.messaging.Target;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@RunWith(Parameterized.class)
+public class AutoCreateWithDefaultRoutingTypesTest extends
JMSClientTestSupport {
+
+ private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ @Parameterized.Parameters(name = "routingType={0}")
+ public static Collection<Object[]> parameters() {
+ return Arrays.asList(new Object[][] {
+ {RoutingType.ANYCAST}, {RoutingType.MULTICAST}
+ });
+ }
+
+ @Parameterized.Parameter(0)
+ public RoutingType routingType;
+
+ @Override
+ protected String getConfiguredProtocols() {
+ return "AMQP";
+ }
+
+ @Override
+ protected void createAddressAndQueues(ActiveMQServer server) throws
Exception {
+ // Don't create anything by default since we are testing auto create
+ }
+
+ @Override
+ protected void configureAddressPolicy(ActiveMQServer server) {
+ Configuration serverConfig = server.getConfiguration();
+ serverConfig.setJournalType(JournalType.NIO);
+ Map<String, AddressSettings> map = serverConfig.getAddressSettings();
+ if (map.size() == 0) {
+ AddressSettings as = new AddressSettings();
+ map.put("#", as);
+ }
+ Map.Entry<String, AddressSettings> entry =
map.entrySet().iterator().next();
+ AddressSettings settings = entry.getValue();
+ settings.setAutoCreateQueues(true);
+ settings.setDefaultAddressRoutingType(routingType);
+ settings.setDefaultQueueRoutingType(routingType);
+ logger.info("server config, isauto? {}",
entry.getValue().isAutoCreateQueues());
+ logger.info("server config, default queue routing type? {}",
entry.getValue().getDefaultQueueRoutingType());
+ logger.info("server config, default address routing type? {}",
entry.getValue().getDefaultAddressRoutingType());
+ }
+
+ @Test(timeout = 30_000)
+ public void testCreateSender() throws Exception {
+ final String addressName = "sender-address";
+
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+ AmqpSession session = connection.createSession();
+
+ AmqpSender sender = session.createSender(addressName);
+
+ AddressQueryResult address = getProxyToAddress(addressName);
+
+ assertNotNull(address);
+ assertEquals(Set.of(routingType), address.getRoutingTypes());
+
+ sender.close();
+ connection.close();
+ }
+
+ @Test(timeout = 30_000)
+ public void testCreateReceiver() throws Exception {
+ final String addressName = "receiver-address";
+
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+ AmqpSession session = connection.createSession();
+
+ AmqpReceiver receiver = session.createReceiver(addressName);
+
+ AddressQueryResult address = getProxyToAddress(addressName);
+
+ assertNotNull(address);
+ assertEquals(Set.of(routingType), address.getRoutingTypes());
+
+ receiver.close();
+ connection.close();
+ }
+
+ @Test(timeout = 30_000)
+ public void testCreateSenderThatRequestsMultiCast() throws Exception {
+ dotestCreateSenderThatRequestsSpecificRoutingType(RoutingType.MULTICAST);
+ }
+
+ @Test(timeout = 30_000)
+ public void testCreateSenderThatRequestsAnyCast() throws Exception {
+ dotestCreateSenderThatRequestsSpecificRoutingType(RoutingType.ANYCAST);
+ }
+
+ private void dotestCreateSenderThatRequestsSpecificRoutingType(RoutingType
routingType) throws Exception {
+ final String addressName = "sender-defined-address";
+
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+ AmqpSession session = connection.createSession();
+
+ Target target = new Target();
+ target.setAddress(addressName);
+ if (routingType == RoutingType.ANYCAST) {
+ target.setCapabilities(QUEUE_CAPABILITY);
+ } else {
+ target.setCapabilities(TOPIC_CAPABILITY);
+ }
+
+ AmqpSender sender = session.createSender(target);
+
+ AddressQueryResult address = getProxyToAddress(addressName);
+
+ assertNotNull(address);
+ assertEquals(Set.of(routingType), address.getRoutingTypes());
+
+ sender.close();
+ connection.close();
+ }
+
+ @Test(timeout = 30_000)
+ public void testCreateReceiverThatRequestsMultiCast() throws Exception {
+
dotestCreateReceiverThatRequestsSpecificRoutingType(RoutingType.MULTICAST);
+ }
+
+ @Test(timeout = 30_000)
+ public void testCreateReceiverThatRequestsAnyCast() throws Exception {
+ dotestCreateReceiverThatRequestsSpecificRoutingType(RoutingType.ANYCAST);
+ }
+
+ private void
dotestCreateReceiverThatRequestsSpecificRoutingType(RoutingType routingType)
throws Exception {
+ final String addressName = "receiver-defined-address";
+
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+ AmqpSession session = connection.createSession();
+
+ Source source = new Source();
+ source.setAddress(addressName);
+ if (routingType == RoutingType.ANYCAST) {
+ source.setCapabilities(QUEUE_CAPABILITY);
+ } else {
+ source.setCapabilities(TOPIC_CAPABILITY);
+ }
+
+ AmqpReceiver receiver = session.createReceiver(source);
+
+ AddressQueryResult address = getProxyToAddress(addressName);
+
+ assertNotNull(address);
+ assertEquals(Set.of(routingType), address.getRoutingTypes());
+
+ receiver.close();
+ connection.close();
+ }
+
+ public AddressQueryResult getProxyToAddress(String addressName) throws
Exception {
+ return server.addressQuery(SimpleString.toSimpleString(addressName));
+ }
+}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedAnycastConsumerTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedAnycastConsumerTest.java
index 00b48a19f4..927ec3ec5d 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedAnycastConsumerTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedAnycastConsumerTest.java
@@ -197,6 +197,7 @@ public class BrokerDefinedAnycastConsumerTest extends
AmqpClientTestSupport {
server.getAddressSettingsRepository().clear();
AddressSettings settings = new AddressSettings();
settings.setAutoCreateAddresses(true);
+ settings.setDefaultAddressRoutingType(RoutingType.ANYCAST);
server.getAddressSettingsRepository().addMatch(address.toString(),
settings);
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedAnycastConsumerTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedAnycastConsumerTest.java
index 3e504d7895..66c4b106b8 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedAnycastConsumerTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedAnycastConsumerTest.java
@@ -16,15 +16,23 @@
*/
package org.apache.activemq.artemis.tests.integration.amqp;
+import static
org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper.QUEUE_CAPABILITY;
+import static
org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper.TOPIC_CAPABILITY;
+
import java.util.concurrent.TimeUnit;
+import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.postoffice.Bindings;
+import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
+import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.qpid.proton.amqp.messaging.Source;
import org.junit.Test;
public class ClientDefinedAnycastConsumerTest extends AmqpClientTestSupport {
@@ -33,12 +41,15 @@ public class ClientDefinedAnycastConsumerTest extends
AmqpClientTestSupport {
@Test(timeout = 60000)
public void testConsumeFromSingleQueueOnAddressSameName() throws Exception {
-
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
- AmqpReceiver receiver = session.createReceiver(address.toString());
+ Source source = new Source();
+ source.setAddress(address.toString());
+ source.setCapabilities(QUEUE_CAPABILITY);
+
+ AmqpReceiver receiver = session.createReceiver(source);
sendMessages(address.toString(), 1);
receiver.flow(1);
AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS);
@@ -48,4 +59,31 @@ public class ClientDefinedAnycastConsumerTest extends
AmqpClientTestSupport {
receiver.close();
connection.close();
}
+
+ @Test(timeout = 60000)
+ public void testConsumeFromSingleQueueOnAddressSameNameNegativeValidation()
throws Exception {
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+ AmqpSession session = connection.createSession();
+
+ Source source = new Source();
+ source.setAddress(address.toString());
+ source.setCapabilities(TOPIC_CAPABILITY);
+
+ AmqpReceiver receiver = session.createReceiver(source);
+ sendMessages(address.toString(), 1);
+ receiver.flow(1);
+ AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS);
+ assertNotNull(amqpMessage);
+ Bindings bindings =
server.getPostOffice().getBindingsForAddress(address);
+ assertEquals(1, bindings.getBindings().size());
+ bindings.getBindings().forEach((binding) -> {
+ final Queue localQueue = ((LocalQueueBinding) binding).getQueue();
+ assertEquals(1, localQueue.getConsumerCount());
+ assertEquals(RoutingType.MULTICAST, localQueue.getRoutingType());
+ });
+
+ receiver.close();
+ connection.close();
+ }
}