This is an automated email from the ASF dual-hosted git repository.
jbertram pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new bf83a9b ARTEMIS-3608 - Add distribution for Multicast messages to
OFF_WITH_REDISTRIBUTION to avoid message loss
new 124cd6c This closes #3884
bf83a9b is described below
commit bf83a9b3d1c9573bcbed01f6846ebbed8bfb96e6
Author: AntonRoskvist <[email protected]>
AuthorDate: Thu Dec 16 11:00:27 2021 +0100
ARTEMIS-3608 - Add distribution for Multicast messages to
OFF_WITH_REDISTRIBUTION to avoid message loss
---
.../artemis/core/postoffice/impl/BindingsImpl.java | 4 ++-
.../cluster/distribution/ClusterTestBase.java | 19 +++++++++++--
.../distribution/MessageRedistributionTest.java | 33 ++++++++++++++++++++++
3 files changed, 53 insertions(+), 3 deletions(-)
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
index e193296..75fb7b8 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
@@ -30,6 +31,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
+import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.postoffice.Binding;
@@ -408,7 +410,7 @@ public final class BindingsImpl implements Bindings {
private static boolean matchBinding(final Message message,
final Binding binding,
final MessageLoadBalancingType
loadBalancingType) {
- if ((loadBalancingType.equals(MessageLoadBalancingType.OFF) ||
loadBalancingType.equals(MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION)) &&
binding instanceof RemoteQueueBinding) {
+ if ((loadBalancingType.equals(MessageLoadBalancingType.OFF) ||
loadBalancingType.equals(MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION) &&
!Objects.equals(message.getRoutingType(), RoutingType.MULTICAST)) && binding
instanceof RemoteQueueBinding) {
return false;
}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java
index 939d916..6c5d176 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java
@@ -723,7 +723,7 @@ public abstract class ClusterTestBase extends
ActiveMQTestBase {
final int msgEnd,
final boolean durable,
final String filterVal) throws Exception {
- sendInRange(node, address, msgStart, msgEnd, durable, filterVal, null);
+ sendInRange(node, address, msgStart, msgEnd, durable, filterVal, null,
null);
}
protected void sendInRange(final int node,
@@ -732,6 +732,7 @@ public abstract class ClusterTestBase extends
ActiveMQTestBase {
final int msgEnd,
final boolean durable,
final String filterVal,
+ final RoutingType routingType,
final AtomicInteger duplicateDetectionSeq)
throws Exception {
ClientSessionFactory sf = sfs[node];
@@ -756,6 +757,10 @@ public abstract class ClusterTestBase extends
ActiveMQTestBase {
message.putStringProperty(Message.HDR_DUPLICATE_DETECTION_ID,
new SimpleString(str));
}
+ if (routingType != null) {
+ message.setRoutingType(routingType);
+ }
+
message.putIntProperty(ClusterTestBase.COUNT_PROP, i);
if (isLargeMessage()) {
@@ -853,7 +858,17 @@ public abstract class ClusterTestBase extends
ActiveMQTestBase {
final boolean durable,
final String filterVal,
final AtomicInteger duplicateDetectionCounter) throws
Exception {
- sendInRange(node, address, 0, numMessages, durable, filterVal,
duplicateDetectionCounter);
+ send(node, address, numMessages, durable, filterVal, null,
duplicateDetectionCounter);
+ }
+
+ protected void send(final int node,
+ final String address,
+ final int numMessages,
+ final boolean durable,
+ final String filterVal,
+ final RoutingType routingType,
+ final AtomicInteger duplicateDetectionCounter) throws
Exception {
+ sendInRange(node, address, 0, numMessages, durable, filterVal,
routingType, duplicateDetectionCounter);
}
protected void verifyReceiveAllInRange(final boolean ack,
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java
index 1ee78bb..360f1a0 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java
@@ -832,6 +832,39 @@ public class MessageRedistributionTest extends
ClusterTestBase {
}
@Test
+ public void
testRedistributionToRemoteMulticastConsumerLbOffWithRedistribution() throws
Exception {
+
+ String address = "test.address";
+ String queue = "queue";
+ String clusterAddress = "test";
+ AddressSettings settings = new
AddressSettings().setRedistributionDelay(0).setAutoCreateAddresses(true).setAutoCreateQueues(true);
+ RoutingType routingType = RoutingType.MULTICAST;
+
+ getServer(0).getAddressSettingsRepository().addMatch(address, settings);
+ getServer(1).getAddressSettingsRepository().addMatch(address, settings);
+
+ setupClusterConnection("cluster0", clusterAddress,
MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION, 1, isNetty(), 0, 1);
+ setupClusterConnection("cluster0", clusterAddress,
MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION, 1, isNetty(), 1, 0);
+
+ startServers(0, 1);
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+
+ createQueue(0, address, queue, null, false, routingType);
+ addConsumer(0, 0, queue, null);
+ waitForBindings(0, address, 1, 1, true);
+ waitForBindings(1, address, 1, 1, false);
+
+ createAddressInfo(1, address, routingType, 0, false);
+
+ final int noMessages = 10;
+ send(1, address, noMessages, false, null, routingType, null);
+ verifyReceiveAll(noMessages, 0);
+
+ }
+
+ @Test
public void testBackAndForth() throws Exception {
for (int i = 0; i < 10; i++) {
setupCluster(MessageLoadBalancingType.ON_DEMAND);