This is an automated email from the ASF dual-hosted git repository.

jbertram pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 00cae02ca4 ARTEMIS-4196 - set message routing type in mqtt publish
00cae02ca4 is described below

commit 00cae02ca46c60045f04c5778ce203c941222c15
Author: Gary Tully <[email protected]>
AuthorDate: Mon Mar 6 11:30:58 2023 +0000

    ARTEMIS-4196 - set message routing type in mqtt publish
---
 .../core/protocol/mqtt/MQTTPublishManager.java     |   8 +-
 .../artemis/core/postoffice/impl/BindingsImpl.java |   6 +-
 .../artemis/tests/integration/mqtt/MQTTTest.java   |   4 +-
 ...ttClusterRemoteSubscribeLoadBalanceOffTest.java | 155 +++++++++++++++++++++
 .../mqtt/MqttClusterRemoteSubscribeTest.java       |  54 +------
 5 files changed, 172 insertions(+), 55 deletions(-)

diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
index 0059140a31..31a5409a44 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
@@ -43,6 +43,7 @@ import 
org.apache.activemq.artemis.core.protocol.mqtt.exceptions.DisconnectExcep
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
 import org.apache.activemq.artemis.core.server.ServerProducer;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.utils.UUIDGenerator;
@@ -223,8 +224,13 @@ public class MQTTPublishManager {
 
             Transaction tx = session.getServerSession().newTransaction();
             try {
-               if (session.getServer().getAddressInfo(address) == null && 
session.getServer().getAddressSettingsRepository().getMatch(coreAddress).isAutoCreateAddresses())
 {
+               AddressInfo addressInfo = 
session.getServer().getAddressInfo(address);
+               if (addressInfo == null && 
session.getServer().getAddressSettingsRepository().getMatch(coreAddress).isAutoCreateAddresses())
 {
                   session.getServerSession().createAddress(address, 
RoutingType.MULTICAST, true);
+                  serverMessage.setRoutingType(RoutingType.MULTICAST);
+               }
+               if (addressInfo != null) {
+                  serverMessage.setRoutingType(addressInfo.getRoutingType());
                }
                session.getServerSession().send(tx, serverMessage, true, 
senderName, false);
 
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
index bad83c59bd..1a7bbcf718 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
@@ -416,8 +416,10 @@ public final class BindingsImpl implements Bindings {
    private static boolean matchBinding(final Message message,
                                        final Binding binding,
                                        final MessageLoadBalancingType 
loadBalancingType) {
-      if ((loadBalancingType.equals(MessageLoadBalancingType.OFF) || 
loadBalancingType.equals(MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION) && 
!Objects.equals(message.getRoutingType(), RoutingType.MULTICAST)) && binding 
instanceof RemoteQueueBinding) {
-         return false;
+      if (loadBalancingType.equals(MessageLoadBalancingType.OFF) || 
loadBalancingType.equals(MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION)) {
+         if (!Objects.equals(message.getRoutingType(), RoutingType.MULTICAST) 
&& binding instanceof RemoteQueueBinding) {
+            return false;
+         }
       }
 
       final Filter filter = binding.getFilter();
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTest.java
index aee3ba78be..7ceb43fa87 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTest.java
@@ -1350,8 +1350,8 @@ public class MQTTTest extends MQTTTestSupport {
       connection.start();
 
       Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      javax.jms.Queue jmsQueue = s.createQueue(jmsTopicAddress);
-      MessageConsumer consumer = s.createConsumer(jmsQueue);
+      javax.jms.Topic jmsTopic = s.createTopic(jmsTopicAddress);
+      MessageConsumer consumer = s.createConsumer(jmsTopic);
 
       provider.publish(address, RETAINED.getBytes(), AT_LEAST_ONCE, true);
 
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MqttClusterRemoteSubscribeLoadBalanceOffTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MqttClusterRemoteSubscribeLoadBalanceOffTest.java
new file mode 100644
index 0000000000..dcaab64b68
--- /dev/null
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MqttClusterRemoteSubscribeLoadBalanceOffTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.mqtt;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
+import org.apache.activemq.artemis.core.config.WildcardConfiguration;
+import 
org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import 
org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.fusesource.mqtt.client.BlockingConnection;
+import org.fusesource.mqtt.client.MQTT;
+import org.fusesource.mqtt.client.Message;
+import org.fusesource.mqtt.client.QoS;
+import org.fusesource.mqtt.client.Topic;
+import org.junit.Test;
+
+public class MqttClusterRemoteSubscribeLoadBalanceOffTest extends 
ClusterTestBase {
+
+   @Override
+   protected boolean isResolveProtocols() {
+      return true;
+   }
+
+   public boolean isNetty() {
+      return true;
+   }
+
+   @Test
+   public void testPub0Sub1() throws Exception {
+      final String TOPIC = "test/1";
+      final String clientId1 = "clientId1";
+      final String clientId2 = "clientId2";
+      Topic[] topics = {new Topic(TOPIC, QoS.AT_MOST_ONCE)};
+
+      setupServers(TOPIC);
+
+      startServers(0, 1);
+
+      final BlockingConnection connection1 = 
retrieveMQTTConnection("tcp://localhost:61616", clientId1);
+      final BlockingConnection connection2 = 
retrieveMQTTConnection("tcp://localhost:61617", clientId2);
+
+      assertTrue("Should be connected", Wait.waitFor(() -> 
connection1.isConnected(), 5000, 100));
+      assertTrue("Should be connected", Wait.waitFor(() -> 
connection2.isConnected(), 5000, 100));
+
+      waitForTopology(servers[0], "cluster0", 2, 5000);
+      waitForTopology(servers[1], "cluster1", 2, 5000);
+
+      // Subscribe to topics
+      connection1.subscribe(topics);
+      connection2.subscribe(topics);
+
+
+      waitForBindings(0, TOPIC, 1, 1, false);
+      waitForBindings(1, TOPIC, 1, 1, false);
+
+      // Publish Messages
+      String payload1 = "This is message 1";
+      String payload2 = "This is message 2";
+
+      connection1.publish(TOPIC, payload1.getBytes(), QoS.AT_LEAST_ONCE, 
false);
+      connection2.publish(TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, false);
+
+      Message message1 = connection1.receive(5, TimeUnit.SECONDS);
+      message1.ack();
+      Message message2 = connection1.receive(5, TimeUnit.SECONDS);
+      message2.ack();
+
+      message1 = connection2.receive(5, TimeUnit.SECONDS);
+      message1.ack();
+      message2 = connection2.receive(5, TimeUnit.SECONDS);
+      message2.ack();
+
+      String[] topicsStrings = new String[]{TOPIC};
+      if (connection1 != null && connection1.isConnected()) {
+         connection1.unsubscribe(topicsStrings);
+         connection1.disconnect();
+      }
+      if (connection2 != null && connection2.isConnected()) {
+         connection2.unsubscribe(topicsStrings);
+         connection2.disconnect();
+      }
+   }
+
+   private static BlockingConnection retrieveMQTTConnection(String host, 
String clientId) throws Exception {
+      MQTT mqtt = new MQTT();
+      mqtt.setHost(host);
+      mqtt.setClientId(clientId);
+      mqtt.setConnectAttemptsMax(0);
+      mqtt.setReconnectAttemptsMax(0);
+      BlockingConnection connection = mqtt.blockingConnection();
+      connection.connect();
+      return connection;
+   }
+
+   private void setupServers(String address) throws Exception {
+
+      WildcardConfiguration wildcardConfiguration = 
createWildCardConfiguration();
+      CoreAddressConfiguration coreAddressConfiguration = 
createAddressConfiguration(address);
+      AddressSettings addressSettings = createAddressSettings();
+
+      setupServer(0, false, isNetty());
+      
servers[0].getConfiguration().addAddressConfiguration(coreAddressConfiguration);
+      servers[0].getConfiguration().addAddressSetting("#", addressSettings);
+      
servers[0].getConfiguration().setWildCardConfiguration(wildcardConfiguration);
+
+      setupServer(1, false, isNetty());
+      
servers[1].getConfiguration().addAddressConfiguration(coreAddressConfiguration);
+      servers[1].getConfiguration().addAddressSetting("#", addressSettings);
+      
servers[1].getConfiguration().setWildCardConfiguration(wildcardConfiguration);
+
+      setupClusterConnection("cluster0", "", 
MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION, 1, isNetty(), 0, 1);
+      setupClusterConnection("cluster1", "", MessageLoadBalancingType.OFF, 1, 
isNetty(), 1, 0);
+   }
+
+   private AddressSettings createAddressSettings() {
+      AddressSettings addressSettings = new AddressSettings();
+      addressSettings.setRedistributionDelay(0);
+      addressSettings.setDefaultAddressRoutingType(RoutingType.MULTICAST);
+      return addressSettings;
+   }
+
+   private CoreAddressConfiguration createAddressConfiguration(String TOPIC) {
+      CoreAddressConfiguration coreAddressConfiguration = new 
CoreAddressConfiguration();
+      coreAddressConfiguration.addRoutingType(RoutingType.MULTICAST);
+      coreAddressConfiguration.setName(TOPIC);
+      return coreAddressConfiguration;
+   }
+
+   private WildcardConfiguration createWildCardConfiguration() {
+      WildcardConfiguration wildcardConfiguration = new 
WildcardConfiguration();
+      wildcardConfiguration.setAnyWords('#');
+      wildcardConfiguration.setDelimiter('/');
+      wildcardConfiguration.setRoutingEnabled(true);
+      wildcardConfiguration.setSingleWord('+');
+      return wildcardConfiguration;
+   }
+}
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MqttClusterRemoteSubscribeTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MqttClusterRemoteSubscribeTest.java
index e550d2499b..4f7d35b1d8 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MqttClusterRemoteSubscribeTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MqttClusterRemoteSubscribeTest.java
@@ -483,17 +483,8 @@ public class MqttClusterRemoteSubscribeTest extends 
ClusterTestBase {
          pubConnection.publish("anycast/test/1/some/la", payload2.getBytes(), 
QoS.AT_MOST_ONCE, false);
          pubConnection.publish("anycast/test/1/some/la", payload3.getBytes(), 
QoS.AT_MOST_ONCE, false);
 
-         Message message1 = subConnection2.receive(5, TimeUnit.SECONDS);
-         message1.ack();
-         Message message2 = subConnection2.receive(5, TimeUnit.SECONDS);
-         message2.ack();
-         Message message3 = subConnection2.receive(5, TimeUnit.SECONDS);
-         message3.ack();
-
-         assertEquals(payload1, new String(message1.getPayload()));
-         assertEquals(payload2, new String(message2.getPayload()));
-         assertEquals(payload3, new String(message3.getPayload()));
-
+         // pub queue gets auto created, the routing type set on the message 
to reflect that and the
+         // message does not get routed to the sub queue that has anycast
          subConnection2.unsubscribe(new String[]{ANYCAST_TOPIC});
 
          waitForBindings(0, ANYCAST_TOPIC, 1, 0, true);
@@ -506,13 +497,6 @@ public class MqttClusterRemoteSubscribeTest extends 
ClusterTestBase {
          pubConnection.publish("anycast/test/1/some/la", payload2.getBytes(), 
QoS.AT_MOST_ONCE, false);
          pubConnection.publish("anycast/test/1/some/la", payload3.getBytes(), 
QoS.AT_MOST_ONCE, false);
 
-         Message message11 = subConnection2.receive(100, 
TimeUnit.MILLISECONDS);
-         assertNull(message11);
-         Message message21 = subConnection2.receive(100, 
TimeUnit.MILLISECONDS);
-         assertNull(message21);
-         Message message31 = subConnection2.receive(100, 
TimeUnit.MILLISECONDS);
-         assertNull(message31);
-
       } finally {
          String[] topics = new String[]{ANYCAST_TOPIC};
          if (subConnection1 != null && subConnection1.isConnected()) {
@@ -576,20 +560,8 @@ public class MqttClusterRemoteSubscribeTest extends 
ClusterTestBase {
          connection1.publish("anycast/test/1/some/la", payload3.getBytes(), 
QoS.AT_MOST_ONCE, false);
 
 
-         Message message1 = connection1.receive(5, TimeUnit.SECONDS);
-         assertNotNull(message1);
-         message1.ack();
-         Message message2 = connection2.receive(5, TimeUnit.SECONDS);
-         assertNotNull(message2);
-         message2.ack();
-         Message message3 = connection1.receive(5, TimeUnit.SECONDS);
-         assertNotNull(message3);
-         message3.ack();
-
-         assertEquals(payload1, new String(message1.getPayload()));
-         assertEquals(payload2, new String(message2.getPayload()));
-         assertEquals(payload3, new String(message3.getPayload()));
-
+         // the pub queue is auto created and the message multicast routing 
type won't match the anycast sub queue
+         // so nothing gets routed to this queue
 
          connection2.unsubscribe(new String[]{ANYCAST_TOPIC});
 
@@ -603,24 +575,6 @@ public class MqttClusterRemoteSubscribeTest extends 
ClusterTestBase {
          connection1.publish("anycast/test/1/some/la", payload2.getBytes(), 
QoS.AT_MOST_ONCE, false);
          connection1.publish("anycast/test/1/some/la", payload3.getBytes(), 
QoS.AT_MOST_ONCE, false);
 
-         Message message11 = connection1.receive(5, TimeUnit.SECONDS);
-         assertNotNull(message11);
-         message11.ack();
-         Message message21 = connection1.receive(5, TimeUnit.SECONDS);
-         assertNotNull(message21);
-         message21.ack();
-         Message message31 = connection1.receive(5, TimeUnit.SECONDS);
-         assertNotNull(message31);
-         message31.ack();
-
-
-         String message11String = new String(message11.getPayload());
-         String message21String = new String(message21.getPayload());
-         String message31String = new String(message31.getPayload());
-         assertTrue(payload1.equals(message11String) || 
payload1.equals(message21String) || payload1.equals(message31String) );
-         assertTrue(payload2.equals(message11String) || 
payload2.equals(message21String) || payload2.equals(message31String) );
-         assertTrue(payload3.equals(message11String) || 
payload3.equals(message21String) || payload3.equals(message31String) );
-
       } finally {
          String[] topics = new String[]{ANYCAST_TOPIC};
          if (connection1 != null && connection1.isConnected()) {

Reply via email to