This is an automated email from the ASF dual-hosted git repository.

jbertram pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new e82d95f  ARTEMIS-2338 - Live server does not shutdown when using vote-on-replication-failure
     new 55e59fe  This closes #2709
e82d95f is described below

commit e82d95fff640e697f5104a325e66e40bd2b1c69b
Author: Andy <[email protected]>
AuthorDate: Wed May 15 10:48:18 2019 +0100

    ARTEMIS-2338 - Live server does not shutdown when using vote-on-replication-failure
    
    https://issues.apache.org/jira/browse/ARTEMIS-2338
---
 .../artemis/core/server/ActiveMQServerLogger.java  |   4 +
 .../cluster/qourum/QuorumVoteServerConnect.java    |  20 +++-
 .../server/cluster/qourum/ServerConnectVote.java   |  10 +-
 .../core/server/impl/ServerConnectVoteHandler.java |  16 ++-
 .../server/impl/SharedNothingLiveActivation.java   |  11 +-
 .../failover/QuorumFailOverLiveVotesTest.java      | 133 +++++++++++++++++++++
 .../failover/QuorumVoteServerConnectTest.java      |   8 +-
 7 files changed, 185 insertions(+), 17 deletions(-)

diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index a2eb1ea..70ee3ec 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -1998,4 +1998,8 @@ public interface ActiveMQServerLogger extends BasicLogger 
{
    @LogMessage(level = Logger.Level.ERROR)
    @Message(id = 224097, value = "Failed to start server", format = 
Message.Format.MESSAGE_FORMAT)
    void failedToStartServer(@Cause Throwable t);
+
+   @LogMessage(level = Logger.Level.INFO)
+   @Message(id = 224098, value = "Received a vote saying the backup is live 
with connector: {0}", format = Message.Format.MESSAGE_FORMAT)
+   void qourumBackupIsLive(String liveConnector);
 }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/QuorumVoteServerConnect.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/QuorumVoteServerConnect.java
index dcc1892..32fea40 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/QuorumVoteServerConnect.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/QuorumVoteServerConnect.java
@@ -31,6 +31,7 @@ public class QuorumVoteServerConnect extends 
QuorumVote<ServerConnectVote, Boole
    public static final SimpleString LIVE_FAILOVER_VOTE = new 
SimpleString("LiveFailoverQuorumVote");
    private final CountDownLatch latch;
    private final String targetNodeId;
+   private final String liveConnector;
 
    private int votesNeeded;
 
@@ -51,9 +52,10 @@ public class QuorumVoteServerConnect extends 
QuorumVote<ServerConnectVote, Boole
     * 5      |       4         |     3.5     |      3
     * 6      |       5         |      4      |      4
     */
-   public QuorumVoteServerConnect(int size, String targetNodeId, boolean 
requestToStayLive) {
+   public QuorumVoteServerConnect(int size, String targetNodeId, boolean 
requestToStayLive, String liveConnector) {
       super(LIVE_FAILOVER_VOTE);
       this.targetNodeId = targetNodeId;
+      this.liveConnector = liveConnector;
       double majority;
       if (size <= 2) {
          majority = ((double) size) / 2;
@@ -71,7 +73,7 @@ public class QuorumVoteServerConnect extends 
QuorumVote<ServerConnectVote, Boole
    }
 
    public QuorumVoteServerConnect(int size, String targetNodeId) {
-      this(size, targetNodeId, false);
+      this(size, targetNodeId, false, null);
    }
    /**
     * if we can connect to a node
@@ -80,7 +82,7 @@ public class QuorumVoteServerConnect extends 
QuorumVote<ServerConnectVote, Boole
     */
    @Override
    public Vote connected() {
-      return new ServerConnectVote(targetNodeId, requestToStayLive);
+      return new ServerConnectVote(targetNodeId, requestToStayLive, null);
    }
    /**
     * if we cant connect to the node
@@ -108,11 +110,21 @@ public class QuorumVoteServerConnect extends 
QuorumVote<ServerConnectVote, Boole
    public synchronized void vote(ServerConnectVote vote) {
       if (decision)
          return;
-      if (vote.getVote()) {
+      if (!requestToStayLive && vote.getVote()) {
          total++;
          latch.countDown();
          if (total >= votesNeeded) {
             decision = true;
+         }//do the opposite, if it says there is a node connected it means the 
backup has come live
+      } else if (requestToStayLive && vote.getVote()) {
+         total++;
+         latch.countDown();
+         if (liveConnector != null && 
!liveConnector.equals(vote.getTransportConfiguration())) {
+            ActiveMQServerLogger.LOGGER.qourumBackupIsLive(liveConnector);
+            return;
+         }
+         if (total >= votesNeeded) {
+            decision = true;
          }
       }
    }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/ServerConnectVote.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/ServerConnectVote.java
index 9f108e0..8a54fc3 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/ServerConnectVote.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/ServerConnectVote.java
@@ -23,6 +23,7 @@ import java.util.Map;
 public class ServerConnectVote extends BooleanVote {
 
    private String nodeId;
+   private String transportConfiguration;
 
    public ServerConnectVote(String nodeId) {
       super(false);
@@ -33,9 +34,10 @@ public class ServerConnectVote extends BooleanVote {
       super(false);
    }
 
-   public ServerConnectVote(String nodeid, boolean isLive) {
+   public ServerConnectVote(String nodeid, boolean isLive, String 
transportConfiguration) {
       super(isLive);
       this.nodeId = nodeid;
+      this.transportConfiguration = transportConfiguration;
    }
 
    @Override
@@ -52,12 +54,18 @@ public class ServerConnectVote extends BooleanVote {
    public void encode(ActiveMQBuffer buff) {
       super.encode(buff);
       buff.writeString(nodeId);
+      buff.writeNullableString(transportConfiguration);
+   }
+
+   public String getTransportConfiguration() {
+      return transportConfiguration;
    }
 
    @Override
    public void decode(ActiveMQBuffer buff) {
       super.decode(buff);
       nodeId = buff.readString();
+      transportConfiguration = buff.readNullableString();
    }
 
    public String getNodeId() {
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConnectVoteHandler.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConnectVoteHandler.java
index 3fdfaa5..7c573bb 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConnectVoteHandler.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConnectVoteHandler.java
@@ -36,14 +36,18 @@ public class ServerConnectVoteHandler implements 
QuorumVoteHandler {
    public Vote vote(Vote vote) {
       ServerConnectVote serverConnectVote = (ServerConnectVote) vote;
       String nodeid = serverConnectVote.getNodeId();
-      TopologyMemberImpl member = 
server.getClusterManager().getDefaultConnection(null).getTopology().getMember(nodeid);
+      try {
+         TopologyMemberImpl member = 
server.getClusterManager().getDefaultConnection(null).getTopology().getMember(nodeid);
 
-      if (member != null && member.getLive() != null) {
-         ActiveMQServerLogger.LOGGER.nodeFoundInClusterTopology(nodeid);
-         return new ServerConnectVote(nodeid, (Boolean) vote.getVote());
+         if (member != null && member.getLive() != null) {
+            ActiveMQServerLogger.LOGGER.nodeFoundInClusterTopology(nodeid);
+            return new ServerConnectVote(nodeid, (Boolean) vote.getVote(), 
member.getLive().toString());
+         }
+         ActiveMQServerLogger.LOGGER.nodeNotFoundInClusterTopology(nodeid);
+      } catch (Exception e) {
+         e.printStackTrace();
       }
-      ActiveMQServerLogger.LOGGER.nodeNotFoundInClusterTopology(nodeid);
-      return new ServerConnectVote(nodeid, !((Boolean) vote.getVote()));
+      return new ServerConnectVote(nodeid, !((Boolean) vote.getVote()), null);
    }
 
    @Override
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java
index a4d8db2..200d167 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java
@@ -254,8 +254,15 @@ public class SharedNothingLiveActivation extends 
LiveActivation {
                         if (failed && 
replicatedPolicy.isVoteOnReplicationFailure()) {
                            QuorumManager quorumManager = 
activeMQServer.getClusterManager().getQuorumManager();
                            int size = replicatedPolicy.getQuorumSize() == -1 ? 
quorumManager.getMaxClusterSize() : replicatedPolicy.getQuorumSize();
-
-                           QuorumVoteServerConnect quorumVote = new 
QuorumVoteServerConnect(size, activeMQServer.getNodeID().toString(), true);
+                           String liveConnector = null;
+                           List<ClusterConnectionConfiguration> 
clusterConfigurations = 
activeMQServer.getConfiguration().getClusterConfigurations();
+                           if (clusterConfigurations != null && 
clusterConfigurations.size() > 0) {
+                              ClusterConnectionConfiguration 
clusterConnectionConfiguration = clusterConfigurations.get(0);
+                              String connectorName = 
clusterConnectionConfiguration.getConnectorName();
+                              TransportConfiguration transportConfiguration = 
activeMQServer.getConfiguration().getConnectorConfigurations().get(connectorName);
+                              liveConnector = transportConfiguration != null ? 
transportConfiguration.toString() : null;
+                           }
+                           QuorumVoteServerConnect quorumVote = new 
QuorumVoteServerConnect(size, activeMQServer.getNodeID().toString(), true, 
liveConnector);
 
                            quorumManager.vote(quorumVote);
 
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/QuorumFailOverLiveVotesTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/QuorumFailOverLiveVotesTest.java
new file mode 100644
index 0000000..ded54d2
--- /dev/null
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/QuorumFailOverLiveVotesTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.cluster.failover;
+
+import org.apache.activemq.artemis.api.core.Pair;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.api.core.client.ClusterTopologyListener;
+import org.apache.activemq.artemis.api.core.client.TopologyMember;
+import org.apache.activemq.artemis.core.config.ha.ReplicaPolicyConfiguration;
+import 
org.apache.activemq.artemis.core.config.ha.ReplicatedPolicyConfiguration;
+import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
+import 
org.apache.activemq.artemis.core.server.impl.SharedNothingLiveActivation;
+import 
org.apache.activemq.artemis.tests.integration.cluster.util.BackupSyncDelay;
+import org.junit.Test;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+public class QuorumFailOverLiveVotesTest extends 
StaticClusterWithBackupFailoverTest {
+   @Override
+   protected void setupServers() throws Exception {
+      super.setupServers();
+      //we need to know who is connected to who
+      ((ReplicatedPolicyConfiguration) 
servers[0].getConfiguration().getHAPolicyConfiguration()).setGroupName("group0");
+      ((ReplicatedPolicyConfiguration) 
servers[1].getConfiguration().getHAPolicyConfiguration()).setGroupName("group1");
+      ((ReplicatedPolicyConfiguration) 
servers[2].getConfiguration().getHAPolicyConfiguration()).setGroupName("group2");
+      ((ReplicaPolicyConfiguration) 
servers[3].getConfiguration().getHAPolicyConfiguration()).setGroupName("group0");
+      ((ReplicaPolicyConfiguration) 
servers[4].getConfiguration().getHAPolicyConfiguration()).setGroupName("group1");
+      ((ReplicaPolicyConfiguration) 
servers[5].getConfiguration().getHAPolicyConfiguration()).setGroupName("group2");
+
+      //reduce the numbers so that the vote finishes faster
+      ((ReplicaPolicyConfiguration) 
servers[3].getConfiguration().getHAPolicyConfiguration()).setVoteRetries(5);
+      ((ReplicaPolicyConfiguration) 
servers[3].getConfiguration().getHAPolicyConfiguration()).setVoteRetryWait(500);
+      ((ReplicatedPolicyConfiguration) 
servers[0].getConfiguration().getHAPolicyConfiguration()).setVoteOnReplicationFailure(true);
+      ((ReplicatedPolicyConfiguration) 
servers[1].getConfiguration().getHAPolicyConfiguration()).setVoteOnReplicationFailure(true);
+      ((ReplicatedPolicyConfiguration) 
servers[2].getConfiguration().getHAPolicyConfiguration()).setVoteOnReplicationFailure(true);
+
+   }
+
+
+   @Test
+   public void testQuorumVotingLiveNotDead() throws Exception {
+      int[] liveServerIDs = new int[]{0, 1, 2};
+      setupCluster();
+      startServers(0, 1, 2);
+      new BackupSyncDelay(servers[4], servers[1], 
PacketImpl.REPLICATION_SCHEDULED_FAILOVER);
+      startServers(3, 4, 5);
+
+      for (int i : liveServerIDs) {
+         waitForTopology(servers[i], 3, 3);
+      }
+
+      waitForFailoverTopology(3, 0, 1, 2);
+      waitForFailoverTopology(4, 0, 1, 2);
+      waitForFailoverTopology(5, 0, 1, 2);
+
+      for (int i : liveServerIDs) {
+         setupSessionFactory(i, i + 3, isNetty(), false);
+         createQueue(i, QUEUES_TESTADDRESS, QUEUE_NAME, null, true);
+         addConsumer(i, i, QUEUE_NAME, null);
+      }
+
+      waitForBindings(0, QUEUES_TESTADDRESS, 1, 1, true);
+      waitForBindings(1, QUEUES_TESTADDRESS, 1, 1, true);
+      waitForBindings(2, QUEUES_TESTADDRESS, 1, 1, true);
+
+      send(0, QUEUES_TESTADDRESS, 10, false, null);
+      verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
+
+      final QuorumFailOverLiveVotesTest.TopologyListener liveTopologyListener 
= new QuorumFailOverLiveVotesTest.TopologyListener("LIVE-1");
+
+      locators[0].addClusterTopologyListener(liveTopologyListener);
+
+      assertTrue("we assume 3 is a backup", 
servers[3].getHAPolicy().isBackup());
+      assertFalse("no shared storage", 
servers[3].getHAPolicy().isSharedStore());
+
+      SharedNothingLiveActivation liveActivation = 
(SharedNothingLiveActivation) servers[0].getActivation();
+     // ;
+      servers[0].getRemotingService().freeze(null, null);
+      waitForFailoverTopology(4, 3, 1, 2);
+      waitForFailoverTopology(5, 3, 1, 2);
+
+      assertTrue(servers[3].waitForActivation(2, TimeUnit.SECONDS));
+      liveActivation.freezeReplication();
+      waitForServerToStop(servers[0]);
+   }
+
+   @Override
+   protected boolean isSharedStorage() {
+      return false;
+   }
+
+   private static class TopologyListener implements ClusterTopologyListener {
+
+      final String prefix;
+      final Map<String, Pair<TransportConfiguration, TransportConfiguration>> 
nodes = new ConcurrentHashMap<>();
+
+      private TopologyListener(String string) {
+         prefix = string;
+      }
+
+      @Override
+      public void nodeUP(TopologyMember topologyMember, boolean last) {
+         Pair<TransportConfiguration, TransportConfiguration> connectorPair = 
new Pair<>(topologyMember.getLive(), topologyMember.getBackup());
+         nodes.put(topologyMember.getBackupGroupName(), connectorPair);
+      }
+
+      @Override
+      public void nodeDown(long eventUID, String nodeID) {
+         nodes.remove(nodeID);
+      }
+
+      @Override
+      public String toString() {
+         return "TopologyListener(" + prefix + ", #=" + nodes.size() + ")";
+      }
+   }
+}
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/QuorumVoteServerConnectTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/QuorumVoteServerConnectTest.java
index 9c4b4f3..0d11bcf 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/QuorumVoteServerConnectTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/QuorumVoteServerConnectTest.java
@@ -47,7 +47,7 @@ public class QuorumVoteServerConnectTest extends 
ActiveMQTestBase {
    public void testSuccessfulVote() {
       QuorumVoteServerConnect quorum = new QuorumVoteServerConnect(size, 
"foo");
       for (int i = 0; i < trueVotes - 1; i++) {
-         quorum.vote(new ServerConnectVote("foo", true));
+         quorum.vote(new ServerConnectVote("foo", true, null));
       }
 
       if (size > 1) {
@@ -55,7 +55,7 @@ public class QuorumVoteServerConnectTest extends 
ActiveMQTestBase {
       }
       quorum = new QuorumVoteServerConnect(size, "foo");
       for (int i = 0; i < trueVotes; i++) {
-         quorum.vote(new ServerConnectVote("foo", true));
+         quorum.vote(new ServerConnectVote("foo", true, null));
       }
       assertTrue(quorum.getDecision());
    }
@@ -64,7 +64,7 @@ public class QuorumVoteServerConnectTest extends 
ActiveMQTestBase {
    public void testUnSuccessfulVote() {
       QuorumVoteServerConnect quorum = new QuorumVoteServerConnect(size, 
"foo");
       for (int i = 0; i < trueVotes - 1; i++) {
-         quorum.vote(new ServerConnectVote("foo", true));
+         quorum.vote(new ServerConnectVote("foo", true, null));
       }
 
       if (size > 1) {
@@ -72,7 +72,7 @@ public class QuorumVoteServerConnectTest extends 
ActiveMQTestBase {
       }
       quorum = new QuorumVoteServerConnect(size, "foo");
       for (int i = 0; i < trueVotes - 1; i++) {
-         quorum.vote(new ServerConnectVote("foo", true));
+         quorum.vote(new ServerConnectVote("foo", true, null));
       }
       if (size == 1) {
          assertTrue(quorum.getDecision());

Reply via email to