This is an automated email from the ASF dual-hosted git repository. gtully pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
commit e985df77fbab2e81eecd9f10f60adcb5824c317a Author: gtully <[email protected]> AuthorDate: Mon Jul 26 22:54:36 2021 +0100 ARTEMIS-3223 - ensure distribution uses the address from the message, rather than the address from the queue which may be a wildcard sub and not valid for publishing on, fix and test --- .../core/postoffice/impl/PostOfficeImpl.java | 19 +--- .../artemis/core/server/ActiveMQServerLogger.java | 4 + .../core/server/cluster/impl/Redistributor.java | 1 + .../amqp/AmqpBridgeClusterRedistributionTest.java | 4 +- .../mqtt/imported/MqttClusterWildcardTest.java | 126 +++++++++++++++++++++ 5 files changed, 135 insertions(+), 19 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index af7dda6..5d5bcd9 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -1289,29 +1289,14 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding public Pair<RoutingContext, Message> redistribute(final Message message, final Queue originatingQueue, final Transaction tx) throws Exception { - Bindings bindings = addressManager.getBindingsForRoutingAddress(originatingQueue.getAddress()); + Bindings bindings = addressManager.getBindingsForRoutingAddress(message.getAddressSimpleString()); if (bindings != null && bindings.allowRedistribute()) { // We have to copy the message and store it separately, otherwise we may lose remote bindings in case of restart before the message // arrived the target node // as described on https://issues.jboss.org/browse/JBPAPP-6130 Message copyRedistribute = message.copy(storageManager.generateID()); - copyRedistribute.setAddress(originatingQueue.getAddress()); - - if (tx != null) { - 
tx.addOperation(new TransactionOperationAbstract() { - @Override - public void afterRollback(Transaction tx) { - try { - //this will cause large message file to be - //cleaned up - // copyRedistribute.refDown(); - } catch (Exception e) { - logger.warn("Failed to clean up message: " + copyRedistribute); - } - } - }); - } + copyRedistribute.setAddress(message.getAddress()); RoutingContext context = new RoutingContextImpl(tx); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java index 6bae295..6e62c72 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java @@ -1755,6 +1755,10 @@ public interface ActiveMQServerLogger extends BasicLogger { @Message(id = 222302, value = "Failed to deal with property {0} when converting message from core to OpenWire: {1}", format = Message.Format.MESSAGE_FORMAT) void failedToDealWithObjectProperty(SimpleString property, String exceptionMessage); + @LogMessage(level = Logger.Level.WARN) + @Message(id = 222303, value = "Redistribution by {0} of messageID = {1} failed", format = Message.Format.MESSAGE_FORMAT) + void errorRedistributing(@Cause Throwable t, String queueName, long m); + @LogMessage(level = Logger.Level.ERROR) @Message(id = 224000, value = "Failure in initialisation", format = Message.Format.MESSAGE_FORMAT) void initializationError(@Cause Throwable e); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java index 7982018..15a1b54 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java +++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java @@ -178,6 +178,7 @@ public class Redistributor implements Consumer { queue.deliverAsync(); } } catch (Exception e) { + ActiveMQServerLogger.LOGGER.errorRedistributing(e, toManagementString(), reference.getMessageID()); try { tx.rollback(); } catch (Exception e2) { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpBridgeClusterRedistributionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpBridgeClusterRedistributionTest.java index 91f6a24..52fe95f 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpBridgeClusterRedistributionTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpBridgeClusterRedistributionTest.java @@ -136,8 +136,8 @@ public class AmqpBridgeClusterRedistributionTest extends AmqpClientTestSupport { bridgeNotificationsQueue = SimpleString.toSimpleString("BridgeNotifications"); notificationsQueue = SimpleString.toSimpleString("Notifications"); - setupClusterConnection("cluster0", "", MessageLoadBalancingType.ON_DEMAND, 1, true, 1, 2); - setupClusterConnection("cluster1", "", MessageLoadBalancingType.ON_DEMAND, 1, true, 2, 1); + setupClusterConnection("cluster-1->2", "", MessageLoadBalancingType.ON_DEMAND, 1, true, 1, 2); + setupClusterConnection("cluster-2->1", "", MessageLoadBalancingType.ON_DEMAND, 1, true, 2, 1); server0.start(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MqttClusterWildcardTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MqttClusterWildcardTest.java index ab8d30c..5a8d314 100644 --- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MqttClusterWildcardTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MqttClusterWildcardTest.java @@ -20,6 +20,7 @@ import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.core.config.WildcardConfiguration; import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase; import org.fusesource.mqtt.client.BlockingConnection; import org.fusesource.mqtt.client.MQTT; @@ -100,6 +101,101 @@ public class MqttClusterWildcardTest extends ClusterTestBase { assertEquals(payload2, new String(message5.getPayload())); assertEquals(payload3, new String(message6.getPayload())); + assertNonWildcardTopic(message1); + assertNonWildcardTopic(message2); + assertNonWildcardTopic(message3); + assertNonWildcardTopic(message4); + assertNonWildcardTopic(message5); + assertNonWildcardTopic(message6); + + + } finally { + String[] topics = new String[]{TOPIC}; + if (connection1 != null) { + connection1.unsubscribe(topics); + connection1.disconnect(); + } + if (connection2 != null) { + connection2.unsubscribe(topics); + connection2.disconnect(); + } + } + } + + @Test + public void verifyRedistribution() throws Exception { + final String TOPIC = "test/+/some/#"; + final String clientId = "SubOne"; + + WildcardConfiguration wildcardConfiguration = new WildcardConfiguration(); + wildcardConfiguration.setAnyWords('#'); + wildcardConfiguration.setDelimiter('/'); + wildcardConfiguration.setRoutingEnabled(true); + wildcardConfiguration.setSingleWord('+'); + + setupServer(0, false, isNetty()); + servers[0].getConfiguration().setWildCardConfiguration(wildcardConfiguration); + + setupServer(1, false, isNetty()); + 
servers[1].getConfiguration().setWildCardConfiguration(wildcardConfiguration); + + // allow redistribution + AddressSettings addressSettings = new AddressSettings(); + addressSettings.setRedistributionDelay(0); + servers[0].getConfiguration().addAddressesSetting("#", addressSettings); + servers[1].getConfiguration().addAddressesSetting("#", addressSettings); + + setupClusterConnection("cluster0", "", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1); + setupClusterConnection("cluster1", "", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0); + + startServers(0, 1); + + BlockingConnection connection1 = null; + BlockingConnection connection2 = null; + try { + connection1 = retrieveMQTTConnection("tcp://localhost:61616"); + connection2 = retrieveMQTTConnection("tcp://localhost:61617", clientId); + + // Subscribe to topics + Topic[] topics = {new Topic(TOPIC, QoS.EXACTLY_ONCE)}; + connection2.subscribe(topics); + + waitForBindings(0, TOPIC, 0, 0, true); + waitForBindings(1, TOPIC, 1, 1, true); + + waitForBindings(0, TOPIC, 1, 1, false); + waitForBindings(1, TOPIC, 0, 0, false); + + // Publish Messages + String payload1 = "This is message 1"; + String payload2 = "This is message 2"; + String payload3 = "This is message 3"; + + connection1.publish("test/1/some/la", payload1.getBytes(), QoS.EXACTLY_ONCE, false); + connection1.publish("test/1/some/la", payload2.getBytes(), QoS.EXACTLY_ONCE, false); + connection1.publish("test/1/some/la", payload3.getBytes(), QoS.EXACTLY_ONCE, false); + + + waitForMessages(1, TOPIC, 3); + + connection2.disconnect(); + + // force redistribution + connection2 = retrieveMQTTConnection("tcp://localhost:61616", clientId); + connection2.subscribe(topics); + + Message message4 = connection2.receive(15, TimeUnit.SECONDS); + Message message5 = connection2.receive(5, TimeUnit.SECONDS); + Message message6 = connection2.receive(5, TimeUnit.SECONDS); + + assertEquals(payload1, new String(message4.getPayload())); + assertEquals(payload2, new 
String(message5.getPayload())); + assertEquals(payload3, new String(message6.getPayload())); + + assertNonWildcardTopic(message4); + assertNonWildcardTopic(message5); + assertNonWildcardTopic(message6); + } finally { String[] topics = new String[]{TOPIC}; if (connection1 != null) { @@ -189,6 +285,14 @@ public class MqttClusterWildcardTest extends ClusterTestBase { assertEquals(payload2, new String(message5.getPayload())); assertEquals(payload3, new String(message6.getPayload())); + assertNonWildcardTopic(message1); + assertNonWildcardTopic(message2); + assertNonWildcardTopic(message3); + assertNonWildcardTopic(message4); + assertNonWildcardTopic(message5); + assertNonWildcardTopic(message6); + + } finally { String[] topics = new String[]{TOPIC}; if (connection1 != null) { @@ -202,9 +306,31 @@ public class MqttClusterWildcardTest extends ClusterTestBase { } } + private void assertNonWildcardTopic(Message message1) { + assertNotNull(message1); + String payload = new String(message1.getPayload()); + System.err.println("got payload: " + payload); + + assertTrue(payload.contains("message")); + String topic = message1.getTopic(); + System.err.println("got topic: " + topic); + assertTrue(!topic.contains("+")); + assertTrue(!topic.contains("*")); + assertTrue(!topic.contains("#")); + } + + private static BlockingConnection retrieveMQTTConnection(String host) throws Exception { + return retrieveMQTTConnection(host, null); + } + + private static BlockingConnection retrieveMQTTConnection(String host, String clientId) throws Exception { MQTT mqtt = new MQTT(); mqtt.setHost(host); + if (clientId != null) { + mqtt.setClientId(clientId); + mqtt.setCleanSession(false); + } BlockingConnection connection = mqtt.blockingConnection(); connection.connect(); return connection;
