This is an automated email from the ASF dual-hosted git repository.

gtully pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit e985df77fbab2e81eecd9f10f60adcb5824c317a
Author: gtully <[email protected]>
AuthorDate: Mon Jul 26 22:54:36 2021 +0100

    ARTEMIS-3223 - ensure distribution uses the address from the message, 
rather than the address from the queue which may be a wildcard sub and not 
valid for publishng on, fix and test
---
 .../core/postoffice/impl/PostOfficeImpl.java       |  19 +---
 .../artemis/core/server/ActiveMQServerLogger.java  |   4 +
 .../core/server/cluster/impl/Redistributor.java    |   1 +
 .../amqp/AmqpBridgeClusterRedistributionTest.java  |   4 +-
 .../mqtt/imported/MqttClusterWildcardTest.java     | 126 +++++++++++++++++++++
 5 files changed, 135 insertions(+), 19 deletions(-)

diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index af7dda6..5d5bcd9 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -1289,29 +1289,14 @@ public class PostOfficeImpl implements PostOffice, 
NotificationListener, Binding
    public Pair<RoutingContext, Message> redistribute(final Message message,
                                                      final Queue 
originatingQueue,
                                                      final Transaction tx) 
throws Exception {
-      Bindings bindings = 
addressManager.getBindingsForRoutingAddress(originatingQueue.getAddress());
+      Bindings bindings = 
addressManager.getBindingsForRoutingAddress(message.getAddressSimpleString());
 
       if (bindings != null && bindings.allowRedistribute()) {
          // We have to copy the message and store it separately, otherwise we 
may lose remote bindings in case of restart before the message
          // arrived the target node
          // as described on https://issues.jboss.org/browse/JBPAPP-6130
          Message copyRedistribute = message.copy(storageManager.generateID());
-         copyRedistribute.setAddress(originatingQueue.getAddress());
-
-         if (tx != null) {
-            tx.addOperation(new TransactionOperationAbstract() {
-               @Override
-               public void afterRollback(Transaction tx) {
-                  try {
-                     //this will cause large message file to be
-                     //cleaned up
-                     // copyRedistribute.refDown();
-                  } catch (Exception e) {
-                     logger.warn("Failed to clean up message: " + 
copyRedistribute);
-                  }
-               }
-            });
-         }
+         copyRedistribute.setAddress(message.getAddress());
 
          RoutingContext context = new RoutingContextImpl(tx);
 
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index 6bae295..6e62c72 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -1755,6 +1755,10 @@ public interface ActiveMQServerLogger extends 
BasicLogger {
    @Message(id = 222302, value = "Failed to deal with property {0} when 
converting message from core to OpenWire: {1}", format = 
Message.Format.MESSAGE_FORMAT)
    void failedToDealWithObjectProperty(SimpleString property, String 
exceptionMessage);
 
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 222303, value = "Redistribution by {0} of messageID = {1} 
failed", format = Message.Format.MESSAGE_FORMAT)
+   void errorRedistributing(@Cause Throwable t, String queueName, long m);
+
    @LogMessage(level = Logger.Level.ERROR)
    @Message(id = 224000, value = "Failure in initialisation", format = 
Message.Format.MESSAGE_FORMAT)
    void initializationError(@Cause Throwable e);
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
index 7982018..15a1b54 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
@@ -178,6 +178,7 @@ public class Redistributor implements Consumer {
                      queue.deliverAsync();
                   }
                } catch (Exception e) {
+                  ActiveMQServerLogger.LOGGER.errorRedistributing(e, 
toManagementString(), reference.getMessageID());
                   try {
                      tx.rollback();
                   } catch (Exception e2) {
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpBridgeClusterRedistributionTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpBridgeClusterRedistributionTest.java
index 91f6a24..52fe95f 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpBridgeClusterRedistributionTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpBridgeClusterRedistributionTest.java
@@ -136,8 +136,8 @@ public class AmqpBridgeClusterRedistributionTest extends 
AmqpClientTestSupport {
       bridgeNotificationsQueue = 
SimpleString.toSimpleString("BridgeNotifications");
       notificationsQueue = SimpleString.toSimpleString("Notifications");
 
-      setupClusterConnection("cluster0", "", 
MessageLoadBalancingType.ON_DEMAND, 1, true, 1, 2);
-      setupClusterConnection("cluster1", "", 
MessageLoadBalancingType.ON_DEMAND, 1, true, 2, 1);
+      setupClusterConnection("cluster-1->2", "", 
MessageLoadBalancingType.ON_DEMAND, 1, true, 1, 2);
+      setupClusterConnection("cluster-2->1", "", 
MessageLoadBalancingType.ON_DEMAND, 1, true, 2, 1);
 
       server0.start();
 
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MqttClusterWildcardTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MqttClusterWildcardTest.java
index ab8d30c..5a8d314 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MqttClusterWildcardTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MqttClusterWildcardTest.java
@@ -20,6 +20,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.artemis.core.config.WildcardConfiguration;
 import 
org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import 
org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
 import org.fusesource.mqtt.client.BlockingConnection;
 import org.fusesource.mqtt.client.MQTT;
@@ -100,6 +101,101 @@ public class MqttClusterWildcardTest extends 
ClusterTestBase {
          assertEquals(payload2, new String(message5.getPayload()));
          assertEquals(payload3, new String(message6.getPayload()));
 
+         assertNonWildcardTopic(message1);
+         assertNonWildcardTopic(message2);
+         assertNonWildcardTopic(message3);
+         assertNonWildcardTopic(message4);
+         assertNonWildcardTopic(message5);
+         assertNonWildcardTopic(message6);
+
+
+      } finally {
+         String[] topics = new String[]{TOPIC};
+         if (connection1 != null) {
+            connection1.unsubscribe(topics);
+            connection1.disconnect();
+         }
+         if (connection2 != null) {
+            connection2.unsubscribe(topics);
+            connection2.disconnect();
+         }
+      }
+   }
+
+   @Test
+   public void verifyRedistribution() throws Exception {
+      final String TOPIC = "test/+/some/#";
+      final String clientId = "SubOne";
+
+      WildcardConfiguration wildcardConfiguration = new 
WildcardConfiguration();
+      wildcardConfiguration.setAnyWords('#');
+      wildcardConfiguration.setDelimiter('/');
+      wildcardConfiguration.setRoutingEnabled(true);
+      wildcardConfiguration.setSingleWord('+');
+
+      setupServer(0, false, isNetty());
+      
servers[0].getConfiguration().setWildCardConfiguration(wildcardConfiguration);
+
+      setupServer(1, false, isNetty());
+      
servers[1].getConfiguration().setWildCardConfiguration(wildcardConfiguration);
+
+      // allow redistribution
+      AddressSettings addressSettings = new AddressSettings();
+      addressSettings.setRedistributionDelay(0);
+      servers[0].getConfiguration().addAddressesSetting("#", addressSettings);
+      servers[1].getConfiguration().addAddressesSetting("#", addressSettings);
+
+      setupClusterConnection("cluster0", "", 
MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1);
+      setupClusterConnection("cluster1", "", 
MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0);
+
+      startServers(0, 1);
+
+      BlockingConnection connection1 = null;
+      BlockingConnection connection2 = null;
+      try {
+         connection1 = retrieveMQTTConnection("tcp://localhost:61616");
+         connection2 = retrieveMQTTConnection("tcp://localhost:61617", 
clientId);
+
+         // Subscribe to topics
+         Topic[] topics = {new Topic(TOPIC, QoS.EXACTLY_ONCE)};
+         connection2.subscribe(topics);
+
+         waitForBindings(0, TOPIC, 0, 0, true);
+         waitForBindings(1, TOPIC, 1, 1, true);
+
+         waitForBindings(0, TOPIC, 1, 1, false);
+         waitForBindings(1, TOPIC, 0, 0, false);
+
+         // Publish Messages
+         String payload1 = "This is message 1";
+         String payload2 = "This is message 2";
+         String payload3 = "This is message 3";
+
+         connection1.publish("test/1/some/la", payload1.getBytes(), 
QoS.EXACTLY_ONCE, false);
+         connection1.publish("test/1/some/la", payload2.getBytes(), 
QoS.EXACTLY_ONCE, false);
+         connection1.publish("test/1/some/la", payload3.getBytes(), 
QoS.EXACTLY_ONCE, false);
+
+
+         waitForMessages(1, TOPIC, 3);
+
+         connection2.disconnect();
+
+         // force redistribution
+         connection2 = retrieveMQTTConnection("tcp://localhost:61616", 
clientId);
+         connection2.subscribe(topics);
+
+         Message message4 = connection2.receive(15, TimeUnit.SECONDS);
+         Message message5 = connection2.receive(5, TimeUnit.SECONDS);
+         Message message6 = connection2.receive(5, TimeUnit.SECONDS);
+
+         assertEquals(payload1, new String(message4.getPayload()));
+         assertEquals(payload2, new String(message5.getPayload()));
+         assertEquals(payload3, new String(message6.getPayload()));
+
+         assertNonWildcardTopic(message4);
+         assertNonWildcardTopic(message5);
+         assertNonWildcardTopic(message6);
+
       } finally {
          String[] topics = new String[]{TOPIC};
          if (connection1 != null) {
@@ -189,6 +285,14 @@ public class MqttClusterWildcardTest extends 
ClusterTestBase {
          assertEquals(payload2, new String(message5.getPayload()));
          assertEquals(payload3, new String(message6.getPayload()));
 
+         assertNonWildcardTopic(message1);
+         assertNonWildcardTopic(message2);
+         assertNonWildcardTopic(message3);
+         assertNonWildcardTopic(message4);
+         assertNonWildcardTopic(message5);
+         assertNonWildcardTopic(message6);
+
+
       } finally {
          String[] topics = new String[]{TOPIC};
          if (connection1 != null) {
@@ -202,9 +306,31 @@ public class MqttClusterWildcardTest extends 
ClusterTestBase {
       }
    }
 
+   private void assertNonWildcardTopic(Message message1) {
+      assertNotNull(message1);
+      String payload = new String(message1.getPayload());
+      System.err.println("got payload: " + payload);
+
+      assertTrue(payload.contains("message"));
+      String topic = message1.getTopic();
+      System.err.println("got topic: " + topic);
+      assertTrue(!topic.contains("+"));
+      assertTrue(!topic.contains("*"));
+      assertTrue(!topic.contains("#"));
+   }
+
+
    private static BlockingConnection retrieveMQTTConnection(String host) 
throws Exception {
+      return retrieveMQTTConnection(host, null);
+   }
+
+   private static BlockingConnection retrieveMQTTConnection(String host, 
String clientId) throws Exception {
       MQTT mqtt = new MQTT();
       mqtt.setHost(host);
+      if (clientId != null) {
+         mqtt.setClientId(clientId);
+         mqtt.setCleanSession(false);
+      }
       BlockingConnection connection = mqtt.blockingConnection();
       connection.connect();
       return connection;

Reply via email to