This is an automated email from the ASF dual-hosted git repository.
jbertram pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 00cae02ca4 ARTEMIS-4196 - set message routing type in mqtt publish
00cae02ca4 is described below
commit 00cae02ca46c60045f04c5778ce203c941222c15
Author: Gary Tully <[email protected]>
AuthorDate: Mon Mar 6 11:30:58 2023 +0000
ARTEMIS-4196 - set message routing type in mqtt publish
---
.../core/protocol/mqtt/MQTTPublishManager.java | 8 +-
.../artemis/core/postoffice/impl/BindingsImpl.java | 6 +-
.../artemis/tests/integration/mqtt/MQTTTest.java | 4 +-
...ttClusterRemoteSubscribeLoadBalanceOffTest.java | 155 +++++++++++++++++++++
.../mqtt/MqttClusterRemoteSubscribeTest.java | 54 +------
5 files changed, 172 insertions(+), 55 deletions(-)
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
index 0059140a31..31a5409a44 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
@@ -43,6 +43,7 @@ import org.apache.activemq.artemis.core.protocol.mqtt.exceptions.DisconnectExcep
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerProducer;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.utils.UUIDGenerator;
@@ -223,8 +224,13 @@ public class MQTTPublishManager {
Transaction tx = session.getServerSession().newTransaction();
try {
- if (session.getServer().getAddressInfo(address) == null && session.getServer().getAddressSettingsRepository().getMatch(coreAddress).isAutoCreateAddresses()) {
+ AddressInfo addressInfo = session.getServer().getAddressInfo(address);
+ if (addressInfo == null && session.getServer().getAddressSettingsRepository().getMatch(coreAddress).isAutoCreateAddresses()) {
session.getServerSession().createAddress(address, RoutingType.MULTICAST, true);
+ serverMessage.setRoutingType(RoutingType.MULTICAST);
+ }
+ if (addressInfo != null) {
+ serverMessage.setRoutingType(addressInfo.getRoutingType());
}
session.getServerSession().send(tx, serverMessage, true, senderName, false);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
index bad83c59bd..1a7bbcf718 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
@@ -416,8 +416,10 @@ public final class BindingsImpl implements Bindings {
private static boolean matchBinding(final Message message,
final Binding binding,
final MessageLoadBalancingType loadBalancingType) {
- if ((loadBalancingType.equals(MessageLoadBalancingType.OFF) || loadBalancingType.equals(MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION) && !Objects.equals(message.getRoutingType(), RoutingType.MULTICAST)) && binding instanceof RemoteQueueBinding) {
- return false;
+ if (loadBalancingType.equals(MessageLoadBalancingType.OFF) || loadBalancingType.equals(MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION)) {
+ if (!Objects.equals(message.getRoutingType(), RoutingType.MULTICAST) && binding instanceof RemoteQueueBinding) {
+ return false;
+ }
}
final Filter filter = binding.getFilter();
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTest.java
index aee3ba78be..7ceb43fa87 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTest.java
@@ -1350,8 +1350,8 @@ public class MQTTTest extends MQTTTestSupport {
connection.start();
Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- javax.jms.Queue jmsQueue = s.createQueue(jmsTopicAddress);
- MessageConsumer consumer = s.createConsumer(jmsQueue);
+ javax.jms.Topic jmsTopic = s.createTopic(jmsTopicAddress);
+ MessageConsumer consumer = s.createConsumer(jmsTopic);
provider.publish(address, RETAINED.getBytes(), AT_LEAST_ONCE, true);
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MqttClusterRemoteSubscribeLoadBalanceOffTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MqttClusterRemoteSubscribeLoadBalanceOffTest.java
new file mode 100644
index 0000000000..dcaab64b68
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MqttClusterRemoteSubscribeLoadBalanceOffTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.mqtt;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
+import org.apache.activemq.artemis.core.config.WildcardConfiguration;
+import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.fusesource.mqtt.client.BlockingConnection;
+import org.fusesource.mqtt.client.MQTT;
+import org.fusesource.mqtt.client.Message;
+import org.fusesource.mqtt.client.QoS;
+import org.fusesource.mqtt.client.Topic;
+import org.junit.Test;
+
+public class MqttClusterRemoteSubscribeLoadBalanceOffTest extends ClusterTestBase {
+
+ @Override
+ protected boolean isResolveProtocols() {
+ return true;
+ }
+
+ public boolean isNetty() {
+ return true;
+ }
+
+ @Test
+ public void testPub0Sub1() throws Exception {
+ final String TOPIC = "test/1";
+ final String clientId1 = "clientId1";
+ final String clientId2 = "clientId2";
+ Topic[] topics = {new Topic(TOPIC, QoS.AT_MOST_ONCE)};
+
+ setupServers(TOPIC);
+
+ startServers(0, 1);
+
+ final BlockingConnection connection1 = retrieveMQTTConnection("tcp://localhost:61616", clientId1);
+ final BlockingConnection connection2 = retrieveMQTTConnection("tcp://localhost:61617", clientId2);
+
+ assertTrue("Should be connected", Wait.waitFor(() -> connection1.isConnected(), 5000, 100));
+ assertTrue("Should be connected", Wait.waitFor(() -> connection2.isConnected(), 5000, 100));
+
+ waitForTopology(servers[0], "cluster0", 2, 5000);
+ waitForTopology(servers[1], "cluster1", 2, 5000);
+
+ // Subscribe to topics
+ connection1.subscribe(topics);
+ connection2.subscribe(topics);
+
+
+ waitForBindings(0, TOPIC, 1, 1, false);
+ waitForBindings(1, TOPIC, 1, 1, false);
+
+ // Publish Messages
+ String payload1 = "This is message 1";
+ String payload2 = "This is message 2";
+
+ connection1.publish(TOPIC, payload1.getBytes(), QoS.AT_LEAST_ONCE, false);
+ connection2.publish(TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, false);
+
+ Message message1 = connection1.receive(5, TimeUnit.SECONDS);
+ message1.ack();
+ Message message2 = connection1.receive(5, TimeUnit.SECONDS);
+ message2.ack();
+
+ message1 = connection2.receive(5, TimeUnit.SECONDS);
+ message1.ack();
+ message2 = connection2.receive(5, TimeUnit.SECONDS);
+ message2.ack();
+
+ String[] topicsStrings = new String[]{TOPIC};
+ if (connection1 != null && connection1.isConnected()) {
+ connection1.unsubscribe(topicsStrings);
+ connection1.disconnect();
+ }
+ if (connection2 != null && connection2.isConnected()) {
+ connection2.unsubscribe(topicsStrings);
+ connection2.disconnect();
+ }
+ }
+
+ private static BlockingConnection retrieveMQTTConnection(String host, String clientId) throws Exception {
+ MQTT mqtt = new MQTT();
+ mqtt.setHost(host);
+ mqtt.setClientId(clientId);
+ mqtt.setConnectAttemptsMax(0);
+ mqtt.setReconnectAttemptsMax(0);
+ BlockingConnection connection = mqtt.blockingConnection();
+ connection.connect();
+ return connection;
+ }
+
+ private void setupServers(String address) throws Exception {
+
+ WildcardConfiguration wildcardConfiguration = createWildCardConfiguration();
+ CoreAddressConfiguration coreAddressConfiguration = createAddressConfiguration(address);
+ AddressSettings addressSettings = createAddressSettings();
+
+ setupServer(0, false, isNetty());
+ servers[0].getConfiguration().addAddressConfiguration(coreAddressConfiguration);
+ servers[0].getConfiguration().addAddressSetting("#", addressSettings);
+ servers[0].getConfiguration().setWildCardConfiguration(wildcardConfiguration);
+
+ setupServer(1, false, isNetty());
+ servers[1].getConfiguration().addAddressConfiguration(coreAddressConfiguration);
+ servers[1].getConfiguration().addAddressSetting("#", addressSettings);
+ servers[1].getConfiguration().setWildCardConfiguration(wildcardConfiguration);
+
+ setupClusterConnection("cluster0", "", MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION, 1, isNetty(), 0, 1);
+ setupClusterConnection("cluster1", "", MessageLoadBalancingType.OFF, 1, isNetty(), 1, 0);
+ }
+
+ private AddressSettings createAddressSettings() {
+ AddressSettings addressSettings = new AddressSettings();
+ addressSettings.setRedistributionDelay(0);
+ addressSettings.setDefaultAddressRoutingType(RoutingType.MULTICAST);
+ return addressSettings;
+ }
+
+ private CoreAddressConfiguration createAddressConfiguration(String TOPIC) {
+ CoreAddressConfiguration coreAddressConfiguration = new CoreAddressConfiguration();
+ coreAddressConfiguration.addRoutingType(RoutingType.MULTICAST);
+ coreAddressConfiguration.setName(TOPIC);
+ return coreAddressConfiguration;
+ }
+
+ private WildcardConfiguration createWildCardConfiguration() {
+ WildcardConfiguration wildcardConfiguration = new WildcardConfiguration();
+ wildcardConfiguration.setAnyWords('#');
+ wildcardConfiguration.setDelimiter('/');
+ wildcardConfiguration.setRoutingEnabled(true);
+ wildcardConfiguration.setSingleWord('+');
+ return wildcardConfiguration;
+ }
+}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MqttClusterRemoteSubscribeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MqttClusterRemoteSubscribeTest.java
index e550d2499b..4f7d35b1d8 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MqttClusterRemoteSubscribeTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MqttClusterRemoteSubscribeTest.java
@@ -483,17 +483,8 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase {
pubConnection.publish("anycast/test/1/some/la", payload2.getBytes(), QoS.AT_MOST_ONCE, false);
pubConnection.publish("anycast/test/1/some/la", payload3.getBytes(), QoS.AT_MOST_ONCE, false);
- Message message1 = subConnection2.receive(5, TimeUnit.SECONDS);
- message1.ack();
- Message message2 = subConnection2.receive(5, TimeUnit.SECONDS);
- message2.ack();
- Message message3 = subConnection2.receive(5, TimeUnit.SECONDS);
- message3.ack();
-
- assertEquals(payload1, new String(message1.getPayload()));
- assertEquals(payload2, new String(message2.getPayload()));
- assertEquals(payload3, new String(message3.getPayload()));
-
+ // pub queue gets auto created, the routing type set on the message to reflect that and the
+ // message does not get routed to the sub queue that has anycast
subConnection2.unsubscribe(new String[]{ANYCAST_TOPIC});
waitForBindings(0, ANYCAST_TOPIC, 1, 0, true);
@@ -506,13 +497,6 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase {
pubConnection.publish("anycast/test/1/some/la", payload2.getBytes(), QoS.AT_MOST_ONCE, false);
pubConnection.publish("anycast/test/1/some/la", payload3.getBytes(), QoS.AT_MOST_ONCE, false);
- Message message11 = subConnection2.receive(100, TimeUnit.MILLISECONDS);
- assertNull(message11);
- Message message21 = subConnection2.receive(100, TimeUnit.MILLISECONDS);
- assertNull(message21);
- Message message31 = subConnection2.receive(100, TimeUnit.MILLISECONDS);
- assertNull(message31);
-
} finally {
String[] topics = new String[]{ANYCAST_TOPIC};
if (subConnection1 != null && subConnection1.isConnected()) {
@@ -576,20 +560,8 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase {
connection1.publish("anycast/test/1/some/la", payload3.getBytes(), QoS.AT_MOST_ONCE, false);
- Message message1 = connection1.receive(5, TimeUnit.SECONDS);
- assertNotNull(message1);
- message1.ack();
- Message message2 = connection2.receive(5, TimeUnit.SECONDS);
- assertNotNull(message2);
- message2.ack();
- Message message3 = connection1.receive(5, TimeUnit.SECONDS);
- assertNotNull(message3);
- message3.ack();
-
- assertEquals(payload1, new String(message1.getPayload()));
- assertEquals(payload2, new String(message2.getPayload()));
- assertEquals(payload3, new String(message3.getPayload()));
-
+ // the pub queue is auto created and the message multicast routing type won't match the anycast sub queue
+ // so nothing gets routed to this queue
connection2.unsubscribe(new String[]{ANYCAST_TOPIC});
@@ -603,24 +575,6 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase {
connection1.publish("anycast/test/1/some/la", payload2.getBytes(), QoS.AT_MOST_ONCE, false);
connection1.publish("anycast/test/1/some/la", payload3.getBytes(), QoS.AT_MOST_ONCE, false);
- Message message11 = connection1.receive(5, TimeUnit.SECONDS);
- assertNotNull(message11);
- message11.ack();
- Message message21 = connection1.receive(5, TimeUnit.SECONDS);
- assertNotNull(message21);
- message21.ack();
- Message message31 = connection1.receive(5, TimeUnit.SECONDS);
- assertNotNull(message31);
- message31.ack();
-
-
- String message11String = new String(message11.getPayload());
- String message21String = new String(message21.getPayload());
- String message31String = new String(message31.getPayload());
- assertTrue(payload1.equals(message11String) || payload1.equals(message21String) || payload1.equals(message31String) );
- assertTrue(payload2.equals(message11String) || payload2.equals(message21String) || payload2.equals(message31String) );
- assertTrue(payload3.equals(message11String) || payload3.equals(message21String) || payload3.equals(message31String) );
-
} finally {
String[] topics = new String[]{ANYCAST_TOPIC};
if (connection1 != null && connection1.isConnected()) {