ARTEMIS-1680 - Synchronize message load balacing type between brokers

This guarantees the update of message load balancing type between addresses and 
linked adresses


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/13e07115
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/13e07115
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/13e07115

Branch: refs/heads/master
Commit: 13e071158da465aad0f2608076f078143d9ce83c
Parents: 8dfa345
Author: raul.valdoleiros <raul.valdolei...@ceiia.com>
Authored: Tue Feb 13 15:55:46 2018 +0000
Committer: Clebert Suconic <clebertsuco...@apache.org>
Committed: Mon Feb 19 11:33:09 2018 -0500

----------------------------------------------------------------------
 .../artemis/core/postoffice/AddressManager.java |  3 +
 .../artemis/core/postoffice/PostOffice.java     |  3 +
 .../core/postoffice/impl/PostOfficeImpl.java    |  6 ++
 .../postoffice/impl/SimpleAddressManager.java   |  6 ++
 .../postoffice/impl/WildcardAddressManager.java | 19 ++++
 .../cluster/impl/ClusterConnectionImpl.java     | 14 +--
 .../mqtt/imported/MqttClusterWildcardTest.java  | 93 ++++++++++++++++++++
 .../core/server/impl/fakes/FakePostOffice.java  |  6 ++
 8 files changed, 145 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/13e07115/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/AddressManager.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/AddressManager.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/AddressManager.java
index 858754d..c8c0428 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/AddressManager.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/AddressManager.java
@@ -22,6 +22,7 @@ import java.util.Set;
 
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.RoutingType;
+import 
org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 
@@ -76,4 +77,6 @@ public interface AddressManager {
 
    AddressInfo getAddressInfo(SimpleString address);
 
+   void updateMessageLoadBalancingTypeForAddress(SimpleString address, 
MessageLoadBalancingType messageLoadBalancingType) throws Exception;
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/13e07115/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
index b78883f..5d081a3 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
@@ -29,6 +29,7 @@ import 
org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.RoutingContext;
 import org.apache.activemq.artemis.api.core.RoutingType;
+import 
org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 
@@ -150,4 +151,6 @@ public interface PostOffice extends ActiveMQComponent {
    boolean isAddressBound(SimpleString address) throws Exception;
 
    Set<SimpleString> getAddresses();
+
+   void updateMessageLoadBalancingTypeForAddress(SimpleString  address, 
MessageLoadBalancingType messageLoadBalancingType) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/13e07115/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index 73d6953..3f9356c 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -69,6 +69,7 @@ import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.QueueFactory;
 import org.apache.activemq.artemis.core.server.RouteContextList;
 import org.apache.activemq.artemis.core.server.RoutingContext;
+import 
org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
 import org.apache.activemq.artemis.core.server.group.GroupingHandler;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
@@ -1006,6 +1007,11 @@ public class PostOfficeImpl implements PostOffice, 
NotificationListener, Binding
    }
 
    @Override
+   public void updateMessageLoadBalancingTypeForAddress(SimpleString  address, 
MessageLoadBalancingType messageLoadBalancingType) throws Exception {
+      addressManager.updateMessageLoadBalancingTypeForAddress(address, 
messageLoadBalancingType);
+   }
+
+   @Override
    public SimpleString getMatchingQueue(SimpleString address, RoutingType 
routingType) throws Exception {
       return addressManager.getMatchingQueue(address, routingType);
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/13e07115/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java
index 054b536..aa94de2 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java
@@ -36,6 +36,7 @@ import 
org.apache.activemq.artemis.core.postoffice.BindingsFactory;
 import org.apache.activemq.artemis.core.postoffice.QueueBinding;
 import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
 import org.apache.activemq.artemis.api.core.RoutingType;
+import 
org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.utils.CompositeAddress;
@@ -346,4 +347,9 @@ public class SimpleAddressManager implements AddressManager 
{
    public AddressInfo getAddressInfo(SimpleString addressName) {
       return addressInfoMap.get(addressName);
    }
+
+   @Override
+   public void updateMessageLoadBalancingTypeForAddress(SimpleString  address, 
MessageLoadBalancingType messageLoadBalancingType) throws Exception {
+      
getBindingsForRoutingAddress(address).setMessageLoadBalancingType(messageLoadBalancingType);
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/13e07115/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/WildcardAddressManager.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/WildcardAddressManager.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/WildcardAddressManager.java
index eb242f3..2180e0b 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/WildcardAddressManager.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/WildcardAddressManager.java
@@ -23,6 +23,7 @@ import org.apache.activemq.artemis.core.postoffice.Address;
 import org.apache.activemq.artemis.core.postoffice.Binding;
 import org.apache.activemq.artemis.core.postoffice.Bindings;
 import org.apache.activemq.artemis.core.postoffice.BindingsFactory;
+import 
org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 
@@ -107,6 +108,24 @@ public class WildcardAddressManager extends 
SimpleAddressManager {
       return exists;
    }
 
+   @Override
+   public void updateMessageLoadBalancingTypeForAddress(SimpleString address, 
MessageLoadBalancingType messageLoadBalancingType) throws Exception {
+      Address add = addAndUpdateAddressMap(address);
+      Bindings bindingsForRoutingAddress = 
super.getBindingsForRoutingAddress(address);
+      if (bindingsForRoutingAddress != null) {
+         
bindingsForRoutingAddress.setMessageLoadBalancingType(messageLoadBalancingType);
+      }
+      if (add.containsWildCard()) {
+         for (Address destAdd : add.getLinkedAddresses()) {
+            
getBindingsForRoutingAddress(destAdd.getAddress()).setMessageLoadBalancingType(messageLoadBalancingType);
+         }
+      } else {
+         for (Address destAdd : add.getLinkedAddresses()) {
+            
super.getBindingsForRoutingAddress(destAdd.getAddress()).setMessageLoadBalancingType(messageLoadBalancingType);
+         }
+      }
+   }
+
    /**
     * If the address is a wild card then the binding will be removed from the 
actual mappings for any linked address.
     * otherwise it will be removed as normal.

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/13e07115/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
index 454ba6f..70923be 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
@@ -48,7 +48,6 @@ import org.apache.activemq.artemis.core.client.impl.Topology;
 import org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl;
 import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
 import org.apache.activemq.artemis.core.postoffice.Binding;
-import org.apache.activemq.artemis.core.postoffice.Bindings;
 import org.apache.activemq.artemis.core.postoffice.PostOffice;
 import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
 import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
@@ -1233,9 +1232,13 @@ public final class ClusterConnectionImpl implements 
ClusterConnection, AfterConn
          } catch (Exception ignore) {
          }
 
-         Bindings theBindings = postOffice.getBindingsForAddress(queueAddress);
-
-         theBindings.setMessageLoadBalancingType(messageLoadBalancingType);
+         try {
+            postOffice.updateMessageLoadBalancingTypeForAddress(queueAddress, 
messageLoadBalancingType);
+         } catch (Exception e) {
+            if (logger.isTraceEnabled()) {
+               logger.trace(e.getLocalizedMessage(), e);
+            }
+         }
 
       }
 
@@ -1256,7 +1259,8 @@ public final class ClusterConnectionImpl implements 
ClusterConnection, AfterConn
          RemoteQueueBinding binding = bindings.remove(clusterName);
 
          if (binding == null) {
-            throw new IllegalStateException("Cannot find binding for queue " + 
clusterName);
+            logger.warn("Cannot remove binding, because cannot find binding 
for queue " + clusterName);
+            return;
          }
 
          postOffice.removeBinding(binding.getUniqueName(), null, false);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/13e07115/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MqttClusterWildcardTest.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MqttClusterWildcardTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MqttClusterWildcardTest.java
index 5485f57..105e7d7 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MqttClusterWildcardTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MqttClusterWildcardTest.java
@@ -112,6 +112,99 @@ public class MqttClusterWildcardTest extends 
ClusterTestBase {
       }
    }
 
+   @Test
+   public void wildcardsWithBroker1Disconnected() throws Exception {
+      BlockingConnection connection1 = null;
+      BlockingConnection connection2 = null;
+      BlockingConnection connection3 = null;
+      final String TOPIC = "test/+/some/#";
+      try {
+
+         WildcardConfiguration wildcardConfiguration = new 
WildcardConfiguration();
+         wildcardConfiguration.setAnyWords('#');
+         wildcardConfiguration.setDelimiter('/');
+         wildcardConfiguration.setRoutingEnabled(true);
+         wildcardConfiguration.setSingleWord('+');
+
+         setupServer(0, false, isNetty());
+         
servers[0].getConfiguration().setWildCardConfiguration(wildcardConfiguration);
+
+
+         setupClusterConnection("cluster0", "", 
MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1);
+
+         startServers(0);
+
+
+         connection1 = retrieveMQTTConnection("tcp://localhost:61616");
+
+
+         // Subscribe to topics
+         Topic[] topics = {new Topic(TOPIC, QoS.AT_MOST_ONCE)};
+         connection1.subscribe(topics);
+
+         waitForBindings(0, TOPIC, 1, 1, true);
+         waitForBindings(0, TOPIC, 0, 0, false);
+
+         // Publish Messages
+         String payload1 = "This is message 1";
+         String payload2 = "This is message 2";
+         String payload3 = "This is message 3";
+
+         connection1.publish("test/1/some/la", payload1.getBytes(), 
QoS.AT_LEAST_ONCE, false);
+         connection1.publish("test/1/some/la", payload2.getBytes(), 
QoS.AT_MOST_ONCE, false);
+         connection1.publish("test/1/some/la", payload3.getBytes(), 
QoS.AT_MOST_ONCE, false);
+
+         Message message1 = connection1.receive(5, TimeUnit.SECONDS);
+
+         setupServer(1, false, isNetty());
+         
servers[1].getConfiguration().setWildCardConfiguration(wildcardConfiguration);
+
+         setupClusterConnection("cluster1", "", 
MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0);
+         startServers(1);
+
+         connection2 = retrieveMQTTConnection("tcp://localhost:61617");
+         connection3 = retrieveMQTTConnection("tcp://localhost:61617");
+         connection2.subscribe(topics);
+         connection3.subscribe(new Topic[]{new Topic("teste/1/some/1", 
QoS.AT_MOST_ONCE)});
+
+         waitForBindings(1, TOPIC, 1, 1, false);
+         waitForBindings(1, TOPIC, 1, 1, true);
+
+         connection1.publish("test/1/some/la", payload1.getBytes(), 
QoS.AT_LEAST_ONCE, false);
+         connection1.publish("test/1/some/la", payload2.getBytes(), 
QoS.AT_MOST_ONCE, false);
+         connection1.publish("test/1/some/la", payload3.getBytes(), 
QoS.AT_MOST_ONCE, false);
+
+
+         Message message2 = connection1.receive(5, TimeUnit.SECONDS);
+         Message message3 = connection1.receive(5, TimeUnit.SECONDS);
+         Message message4 = connection2.receive(5, TimeUnit.SECONDS);
+         Message message5 = connection2.receive(5, TimeUnit.SECONDS);
+         Message message6 = connection2.receive(5, TimeUnit.SECONDS);
+
+         assertEquals(payload1, new String(message1.getPayload()));
+         assertEquals(payload2, new String(message2.getPayload()));
+         assertEquals(payload3, new String(message3.getPayload()));
+         assertEquals(payload1, new String(message4.getPayload()));
+         assertEquals(payload2, new String(message5.getPayload()));
+         assertEquals(payload3, new String(message6.getPayload()));
+
+      } finally {
+         String[] topics = new String[]{TOPIC};
+         if (connection1 != null) {
+            connection1.unsubscribe(topics);
+            connection1.disconnect();
+         }
+         if (connection2 != null) {
+            connection2.unsubscribe(topics);
+            connection2.disconnect();
+         }
+         if (connection3 != null) {
+            connection3.unsubscribe(new String[]{"teste/1/some/1"});
+            connection3.disconnect();
+         }
+      }
+   }
+
    private static BlockingConnection retrieveMQTTConnection(String host) 
throws Exception {
       MQTT mqtt = new MQTT();
       mqtt.setHost(host);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/13e07115/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
----------------------------------------------------------------------
diff --git 
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
 
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
index 96c7451..e455e41 100644
--- 
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
+++ 
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
@@ -36,6 +36,7 @@ import 
org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.RoutingContext;
 import org.apache.activemq.artemis.api.core.RoutingType;
+import 
org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.server.impl.MessageReferenceImpl;
 import org.apache.activemq.artemis.core.transaction.Transaction;
@@ -69,6 +70,11 @@ public class FakePostOffice implements PostOffice {
    }
 
    @Override
+   public void updateMessageLoadBalancingTypeForAddress(SimpleString  address, 
MessageLoadBalancingType messageLoadBalancingType) throws Exception {
+
+   }
+
+   @Override
    public SimpleString getMatchingQueue(SimpleString address, RoutingType 
routingType) {
 
       return null;

Reply via email to