This is an automated email from the ASF dual-hosted git repository.

robbie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new ca66028b2a ARTEMIS-4132 AMQP Receiver default to ANYCAST when creating 
an address
ca66028b2a is described below

commit ca66028b2a596a4ab5a61ec57633fed3e7b85b22
Author: Timothy Bish <[email protected]>
AuthorDate: Mon Jan 23 12:38:16 2023 -0500

    ARTEMIS-4132 AMQP Receiver default to ANYCAST when creating an address
    
    When an AMQP client subscribes to a new address (non-existing) with a 
receiver link, the
    address is created with routing type ANYCAST regardless of the default 
address creation
    configuration of the broker, and ignores even the broker-wide default of 
MULTICAST.
---
 .../amqp/proton/ProtonServerSenderContext.java     |  12 +-
 .../AutoCreateWithDefaultRoutingTypesTest.java     | 205 +++++++++++++++++++++
 .../amqp/BrokerDefinedAnycastConsumerTest.java     |   1 +
 .../amqp/ClientDefinedAnycastConsumerTest.java     |  42 ++++-
 4 files changed, 256 insertions(+), 4 deletions(-)

diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index 98d62c46cb..40af705c38 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -25,6 +25,8 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.PooledByteBufAllocator;
+
+import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
 import 
org.apache.activemq.artemis.api.core.ActiveMQQueueMaxConsumerLimitReached;
@@ -960,7 +962,6 @@ public class ProtonServerSenderContext extends 
ProtonInitializable implements Pr
       SimpleString tempQueueName;
       String selector;
 
-      private final RoutingType defaultRoutingType = RoutingType.ANYCAST;
       private RoutingType routingTypeToUse = RoutingType.ANYCAST;
 
       private boolean isVolatile = false;
@@ -1110,8 +1111,15 @@ public class ProtonServerSenderContext extends 
ProtonInitializable implements Pr
             } else {
                // if not we look up the address
                AddressQueryResult addressQueryResult = null;
+
+               // Set this to the broker configured default for the address 
prior to the lookup so that
+               // an auto create will actually use the configured defaults.  
The actual query result will
+               // contain the true answer on what routing type the address 
actually has though.
+               final RoutingType routingType = 
sessionSPI.getDefaultRoutingType(addressToUse);
+               routingTypeToUse = routingType == null ? 
ActiveMQDefaultConfiguration.getDefaultRoutingType() : routingType;
+
                try {
-                  addressQueryResult = sessionSPI.addressQuery(addressToUse, 
defaultRoutingType, true);
+                  addressQueryResult = sessionSPI.addressQuery(addressToUse, 
routingTypeToUse, true);
                } catch (ActiveMQSecurityException e) {
                   throw 
ActiveMQAMQPProtocolMessageBundle.BUNDLE.securityErrorCreatingConsumer(e.getMessage());
                } catch (ActiveMQAMQPException e) {
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AutoCreateWithDefaultRoutingTypesTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AutoCreateWithDefaultRoutingTypesTest.java
new file mode 100644
index 0000000000..d62293233e
--- /dev/null
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AutoCreateWithDefaultRoutingTypesTest.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import static 
org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper.QUEUE_CAPABILITY;
+import static 
org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper.TOPIC_CAPABILITY;
+
+import java.lang.invoke.MethodHandles;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.AddressQueryResult;
+import org.apache.activemq.artemis.core.server.JournalType;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSender;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.qpid.proton.amqp.messaging.Source;
+import org.apache.qpid.proton.amqp.messaging.Target;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@RunWith(Parameterized.class)
+public class AutoCreateWithDefaultRoutingTypesTest extends 
JMSClientTestSupport {
+
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   @Parameterized.Parameters(name = "routingType={0}")
+   public static Collection<Object[]> parameters() {
+      return Arrays.asList(new Object[][] {
+         {RoutingType.ANYCAST}, {RoutingType.MULTICAST}
+      });
+   }
+
+   @Parameterized.Parameter(0)
+   public RoutingType routingType;
+
+   @Override
+   protected String getConfiguredProtocols() {
+      return "AMQP";
+   }
+
+   @Override
+   protected void createAddressAndQueues(ActiveMQServer server) throws 
Exception {
+      // Don't create anything by default since we are testing auto create
+   }
+
+   @Override
+   protected void configureAddressPolicy(ActiveMQServer server) {
+      Configuration serverConfig = server.getConfiguration();
+      serverConfig.setJournalType(JournalType.NIO);
+      Map<String, AddressSettings> map = serverConfig.getAddressSettings();
+      if (map.size() == 0) {
+         AddressSettings as = new AddressSettings();
+         map.put("#", as);
+      }
+      Map.Entry<String, AddressSettings> entry = 
map.entrySet().iterator().next();
+      AddressSettings settings = entry.getValue();
+      settings.setAutoCreateQueues(true);
+      settings.setDefaultAddressRoutingType(routingType);
+      settings.setDefaultQueueRoutingType(routingType);
+      logger.info("server config, isauto? {}", 
entry.getValue().isAutoCreateQueues());
+      logger.info("server config, default queue routing type? {}", 
entry.getValue().getDefaultQueueRoutingType());
+      logger.info("server config, default address routing type? {}", 
entry.getValue().getDefaultAddressRoutingType());
+   }
+
+   @Test(timeout = 30_000)
+   public void testCreateSender() throws Exception {
+      final String addressName = "sender-address";
+
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      AmqpSender sender = session.createSender(addressName);
+
+      AddressQueryResult address = getProxyToAddress(addressName);
+
+      assertNotNull(address);
+      assertEquals(Set.of(routingType), address.getRoutingTypes());
+
+      sender.close();
+      connection.close();
+   }
+
+   @Test(timeout = 30_000)
+   public void testCreateReceiver() throws Exception {
+      final String addressName = "receiver-address";
+
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      AmqpReceiver receiver = session.createReceiver(addressName);
+
+      AddressQueryResult address = getProxyToAddress(addressName);
+
+      assertNotNull(address);
+      assertEquals(Set.of(routingType), address.getRoutingTypes());
+
+      receiver.close();
+      connection.close();
+   }
+
+   @Test(timeout = 30_000)
+   public void testCreateSenderThatRequestsMultiCast() throws Exception {
+      dotestCreateSenderThatRequestsSpecificRoutingType(RoutingType.MULTICAST);
+   }
+
+   @Test(timeout = 30_000)
+   public void testCreateSenderThatRequestsAnyCast() throws Exception {
+      dotestCreateSenderThatRequestsSpecificRoutingType(RoutingType.ANYCAST);
+   }
+
+   private void dotestCreateSenderThatRequestsSpecificRoutingType(RoutingType 
routingType) throws Exception {
+      final String addressName = "sender-defined-address";
+
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      Target target = new Target();
+      target.setAddress(addressName);
+      if (routingType == RoutingType.ANYCAST) {
+         target.setCapabilities(QUEUE_CAPABILITY);
+      } else {
+         target.setCapabilities(TOPIC_CAPABILITY);
+      }
+
+      AmqpSender sender = session.createSender(target);
+
+      AddressQueryResult address = getProxyToAddress(addressName);
+
+      assertNotNull(address);
+      assertEquals(Set.of(routingType), address.getRoutingTypes());
+
+      sender.close();
+      connection.close();
+   }
+
+   @Test(timeout = 30_000)
+   public void testCreateReceiverThatRequestsMultiCast() throws Exception {
+      
dotestCreateReceiverThatRequestsSpecificRoutingType(RoutingType.MULTICAST);
+   }
+
+   @Test(timeout = 30_000)
+   public void testCreateReceiverThatRequestsAnyCast() throws Exception {
+      dotestCreateReceiverThatRequestsSpecificRoutingType(RoutingType.ANYCAST);
+   }
+
+   private void 
dotestCreateReceiverThatRequestsSpecificRoutingType(RoutingType routingType) 
throws Exception {
+      final String addressName = "receiver-defined-address";
+
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      Source source = new Source();
+      source.setAddress(addressName);
+      if (routingType == RoutingType.ANYCAST) {
+         source.setCapabilities(QUEUE_CAPABILITY);
+      } else {
+         source.setCapabilities(TOPIC_CAPABILITY);
+      }
+
+      AmqpReceiver receiver = session.createReceiver(source);
+
+      AddressQueryResult address = getProxyToAddress(addressName);
+
+      assertNotNull(address);
+      assertEquals(Set.of(routingType), address.getRoutingTypes());
+
+      receiver.close();
+      connection.close();
+   }
+
+   public AddressQueryResult getProxyToAddress(String addressName) throws 
Exception {
+      return server.addressQuery(SimpleString.toSimpleString(addressName));
+   }
+}
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedAnycastConsumerTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedAnycastConsumerTest.java
index 00b48a19f4..927ec3ec5d 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedAnycastConsumerTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedAnycastConsumerTest.java
@@ -197,6 +197,7 @@ public class BrokerDefinedAnycastConsumerTest extends 
AmqpClientTestSupport  {
       server.getAddressSettingsRepository().clear();
       AddressSettings settings = new AddressSettings();
       settings.setAutoCreateAddresses(true);
+      settings.setDefaultAddressRoutingType(RoutingType.ANYCAST);
       server.getAddressSettingsRepository().addMatch(address.toString(), 
settings);
       AmqpClient client = createAmqpClient();
       AmqpConnection connection = addConnection(client.connect());
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedAnycastConsumerTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedAnycastConsumerTest.java
index 3e504d7895..66c4b106b8 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedAnycastConsumerTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedAnycastConsumerTest.java
@@ -16,15 +16,23 @@
  */
 package org.apache.activemq.artemis.tests.integration.amqp;
 
+import static 
org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper.QUEUE_CAPABILITY;
+import static 
org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper.TOPIC_CAPABILITY;
+
 import java.util.concurrent.TimeUnit;
 
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.postoffice.Bindings;
+import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
+import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.impl.QueueImpl;
 import org.apache.activemq.transport.amqp.client.AmqpClient;
 import org.apache.activemq.transport.amqp.client.AmqpConnection;
 import org.apache.activemq.transport.amqp.client.AmqpMessage;
 import org.apache.activemq.transport.amqp.client.AmqpReceiver;
 import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.qpid.proton.amqp.messaging.Source;
 import org.junit.Test;
 
 public class ClientDefinedAnycastConsumerTest  extends AmqpClientTestSupport  {
@@ -33,12 +41,15 @@ public class ClientDefinedAnycastConsumerTest  extends 
AmqpClientTestSupport  {
 
    @Test(timeout = 60000)
    public void testConsumeFromSingleQueueOnAddressSameName() throws Exception {
-
       AmqpClient client = createAmqpClient();
       AmqpConnection connection = addConnection(client.connect());
       AmqpSession session = connection.createSession();
 
-      AmqpReceiver receiver = session.createReceiver(address.toString());
+      Source source = new Source();
+      source.setAddress(address.toString());
+      source.setCapabilities(QUEUE_CAPABILITY);
+
+      AmqpReceiver receiver = session.createReceiver(source);
       sendMessages(address.toString(), 1);
       receiver.flow(1);
       AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS);
@@ -48,4 +59,31 @@ public class ClientDefinedAnycastConsumerTest  extends 
AmqpClientTestSupport  {
       receiver.close();
       connection.close();
    }
+
+   @Test(timeout = 60000)
+   public void testConsumeFromSingleQueueOnAddressSameNameNegativeValidation() 
throws Exception {
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      Source source = new Source();
+      source.setAddress(address.toString());
+      source.setCapabilities(TOPIC_CAPABILITY);
+
+      AmqpReceiver receiver = session.createReceiver(source);
+      sendMessages(address.toString(), 1);
+      receiver.flow(1);
+      AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS);
+      assertNotNull(amqpMessage);
+      Bindings bindings = 
server.getPostOffice().getBindingsForAddress(address);
+      assertEquals(1, bindings.getBindings().size());
+      bindings.getBindings().forEach((binding) -> {
+         final Queue localQueue = ((LocalQueueBinding) binding).getQueue();
+         assertEquals(1, localQueue.getConsumerCount());
+         assertEquals(RoutingType.MULTICAST, localQueue.getRoutingType());
+      });
+
+      receiver.close();
+      connection.close();
+   }
 }

Reply via email to