ARTEMIS-1654 - fix bridge reconnect logic

Make sure that if a bridge disconnects and there is no record in the topology, 
it uses the original bridge connector to reconnect.

Originally the live broker that disconnected was left in the Topology; this 
broke quorum voting because, when the vote happened, all brokers, when asked, 
thought the target broker was still alive.
The fix for this was to remove the target live broker from the Topology. Since 
the bridge reconnect logic relied on that Topology record to reconnect in a 
non-HA environment, bridge reconnection stopped working.
The fix now uses the original target connector (or backup) to reconnect in the 
case where the broker was actually removed from the cluster.

https://issues.apache.org/jira/browse/ARTEMIS-1654


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/032210a7
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/032210a7
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/032210a7

Branch: refs/heads/master
Commit: 032210a7c692d26baa13a80f30a3cf62c5df594e
Parents: 2a72923
Author: Andy Taylor <andy.tayl...@gmail.com>
Authored: Thu Feb 1 13:22:59 2018 +0000
Committer: Clebert Suconic <clebertsuco...@apache.org>
Committed: Fri Feb 2 12:17:41 2018 -0500

----------------------------------------------------------------------
 .../core/server/cluster/impl/BridgeImpl.java    |  11 +-
 .../cluster/impl/ClusterConnectionBridge.java   |  14 +-
 .../bridge/ClusteredBridgeReconnectTest.java    | 143 +++++++++++++++++++
 3 files changed, 154 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/032210a7/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
index 4790fda..2c4db3e 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
@@ -506,10 +506,6 @@ public class BridgeImpl implements Bridge, 
SessionFailureListener, SendAcknowled
       }
    }
 
-   protected boolean isPlainCoreBridge() {
-      return true;
-   }
-
    /* Hook for processing message before forwarding */
    protected Message beforeForward(final Message message, final SimpleString 
forwardingAddress) {
       if (useDuplicateDetection) {
@@ -824,7 +820,7 @@ public class BridgeImpl implements Bridge, 
SessionFailureListener, SendAcknowled
       return csf;
    }
 
-   private ClientSessionFactoryInternal reconnectOnOriginalNode() throws 
Exception {
+   protected ClientSessionFactoryInternal reconnectOnOriginalNode() throws 
Exception {
       String targetNodeIdUse = targetNodeID;
       TopologyMember nodeUse = targetNode;
       if (targetNodeIdUse != null && nodeUse != null) {
@@ -916,10 +912,7 @@ public class BridgeImpl implements Bridge, 
SessionFailureListener, SendAcknowled
 
             ActiveMQServerLogger.LOGGER.bridgeConnected(this);
 
-            // We only do this on plain core bridges
-            if (isPlainCoreBridge()) {
-               serverLocator.addClusterTopologyListener(new 
TopologyListener());
-            }
+            serverLocator.addClusterTopologyListener(new TopologyListener());
 
             keepConnecting = false;
             return;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/032210a7/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
index 7ebc273..cf17bbe 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
@@ -36,6 +36,7 @@ import 
org.apache.activemq.artemis.api.core.management.ManagementHelper;
 import org.apache.activemq.artemis.api.core.management.ResourceNames;
 import 
org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
 import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
+import org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl;
 import org.apache.activemq.artemis.core.filter.Filter;
 import org.apache.activemq.artemis.core.postoffice.BindingType;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
@@ -79,6 +80,7 @@ public class ClusterConnectionBridge extends BridgeImpl {
    private final ServerLocatorInternal discoveryLocator;
 
    private final String storeAndForwardPrefix;
+   private TopologyMemberImpl member;
 
    public ClusterConnectionBridge(final ClusterConnection clusterConnection,
                                   final ClusterManager clusterManager,
@@ -139,6 +141,13 @@ public class ClusterConnectionBridge extends BridgeImpl {
    protected ClientSessionFactoryInternal createSessionFactory() throws 
Exception {
       
serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(serverLocator));
       ClientSessionFactoryInternal factory = (ClientSessionFactoryInternal) 
serverLocator.createSessionFactory(targetNodeID);
+      //if it is null then its possible the broker was removed after a 
disconnect so lets try the original connectors
+      if (factory == null) {
+         factory = reconnectOnOriginalNode();
+         if (factory == null) {
+            return null;
+         }
+      }
       setSessionFactory(factory);
 
       if (factory == null) {
@@ -372,9 +381,4 @@ public class ClusterConnectionBridge extends BridgeImpl {
          clusterConnection.disconnectRecord(targetNodeID);
       }
    }
-
-   @Override
-   protected boolean isPlainCoreBridge() {
-      return false;
-   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/032210a7/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/ClusteredBridgeReconnectTest.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/ClusteredBridgeReconnectTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/ClusteredBridgeReconnectTest.java
new file mode 100644
index 0000000..a280b85
--- /dev/null
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/ClusteredBridgeReconnectTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.cluster.bridge;
+
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.api.core.client.ClientProducer;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.core.server.cluster.MessageFlowRecord;
+import 
org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionBridge;
+import 
org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionImpl;
+import 
org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
+import 
org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * This will simulate a failure of a failure.
+ * The bridge could eventually during a race or multiple failures not be able 
to reconnect because it failed again.
+ * this should make the bridge to always reconnect itself.
+ */
+
+public class ClusteredBridgeReconnectTest extends ClusterTestBase {
+
+   @Test
+   public void testReconnectBridge() throws Exception {
+      setupServer(0, isFileStorage(), isNetty());
+      setupServer(1, isFileStorage(), isNetty());
+
+      setupClusterConnection("cluster0", "queues", 
MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1);
+
+      setupClusterConnection("cluster1", "queues", 
MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0);
+
+      startServers(0, 1);
+
+      setupSessionFactory(0, isNetty());
+      setupSessionFactory(1, isNetty());
+
+      createQueue(0, "queues.testaddress", "queue0", null, true);
+      createQueue(1, "queues.testaddress", "queue0", null, true);
+
+      addConsumer(0, 0, "queue0", null);
+      addConsumer(1, 1, "queue0", null);
+
+      waitForBindings(0, "queues.testaddress", 1, 1, true);
+      waitForBindings(1, "queues.testaddress", 1, 1, true);
+
+      waitForBindings(0, "queues.testaddress", 1, 1, false);
+      waitForBindings(1, "queues.testaddress", 1, 1, false);
+
+      ClientSession session0 = sfs[0].createSession();
+      ClientSession session1 = sfs[0].createSession();
+
+      session0.start();
+      session1.start();
+
+      ClientProducer producer = session0.createProducer("queues.testaddress");
+
+      int NUMBER_OF_MESSAGES = 100;
+
+      Assert.assertEquals(1, 
servers[0].getClusterManager().getClusterConnections().size());
+
+      ClusterConnectionImpl connection = 
servers[0].getClusterManager().getClusterConnections().toArray(new 
ClusterConnectionImpl[0])[0];
+      Assert.assertEquals(1, connection.getRecords().size());
+
+      MessageFlowRecord record = connection.getRecords().values().toArray(new 
MessageFlowRecord[1])[0];
+      ClusterConnectionBridge bridge = (ClusterConnectionBridge) 
record.getBridge();
+
+      for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+         ClientMessage msg = session0.createMessage(true);
+         producer.send(msg);
+         session0.commit();
+
+         if (i == 17) {
+            bridge.getSessionFactory().getConnection().fail(new 
ActiveMQException("failed once!"));
+         }
+      }
+
+      int cons0Count = 0, cons1Count = 0;
+
+      while (true) {
+         ClientMessage msg = consumers[0].getConsumer().receive(1000);
+         if (msg == null) {
+            break;
+         }
+         cons0Count++;
+         msg.acknowledge();
+         session0.commit();
+      }
+
+      while (true) {
+         ClientMessage msg = consumers[1].getConsumer().receive(1000);
+         if (msg == null) {
+            break;
+         }
+         cons1Count++;
+         msg.acknowledge();
+         session1.commit();
+      }
+
+      Assert.assertEquals("cons0 = " + cons0Count + ", cons1 = " + cons1Count, 
NUMBER_OF_MESSAGES, cons0Count + cons1Count);
+
+      session0.commit();
+      session1.commit();
+
+      connection = 
servers[0].getClusterManager().getClusterConnections().toArray(new 
ClusterConnectionImpl[0])[0];
+      Assert.assertEquals(1, connection.getRecords().size());
+      Assert.assertNotNull(bridge.getSessionFactory());
+
+      stopServers(0, 1);
+
+   }
+
+
+
+   @Override
+   @After
+   public void tearDown() throws Exception {
+      closeAllConsumers();
+      closeAllSessionFactories();
+      closeAllServerLocatorsFactories();
+      super.tearDown();
+   }
+
+   public boolean isNetty() {
+      return true;
+   }
+}

Reply via email to