ARTEMIS-1543 Fix Quorum Vote with remain live setting
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/0e3468ce Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/0e3468ce Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/0e3468ce Branch: refs/heads/master Commit: 0e3468cefbf15afc6a01458c133fa2db966d6444 Parents: 27f448f Author: Martyn Taylor <[email protected]> Authored: Fri Dec 8 12:47:03 2017 +0000 Committer: Andy Taylor <[email protected]> Committed: Mon Dec 18 10:36:11 2017 +0000 ---------------------------------------------------------------------- .../server/cluster/qourum/QuorumManager.java | 1 + .../cluster/qourum/QuorumVoteServerConnect.java | 16 +- .../server/impl/ServerConnectVoteHandler.java | 5 +- .../impl/SharedNothingLiveActivation.java | 2 +- .../failover/ClusterWithBackupFailoverTest.java | 344 +++++++++++++++++++ .../ClusterWithBackupFailoverTestBase.java | 328 +----------------- .../DiscoveryClusterWithBackupFailoverTest.java | 2 +- .../LiveVoteOnBackupFailureClusterTest.java | 132 +++++++ 8 files changed, 497 insertions(+), 333 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0e3468ce/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/QuorumManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/QuorumManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/QuorumManager.java index f8b3908..77a7d18 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/QuorumManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/QuorumManager.java @@ -299,6 +299,7 @@ public final class QuorumManager implements 
ClusterTopologyListener, ActiveMQCom clusterControl.authorize(); //if we are successful get the vote and check whether we need to send it to the target server, //just connecting may be enough + vote = quorumVote.connected(); if (vote.isRequestServerVote()) { vote = clusterControl.sendQuorumVote(quorumVote.getName(), vote); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0e3468ce/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/QuorumVoteServerConnect.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/QuorumVoteServerConnect.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/QuorumVoteServerConnect.java index f6c608e..dcc1892 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/QuorumVoteServerConnect.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/QuorumVoteServerConnect.java @@ -38,6 +38,9 @@ public class QuorumVoteServerConnect extends QuorumVote<ServerConnectVote, Boole private boolean decision = false; + // Is this the live requesting to stay live, or a backup requesting to become live. 
+ private boolean requestToStayLive = false; + /** * live nodes | remaining nodes | majority | votes needed * 1 | 0 | 0 | 0 @@ -48,7 +51,7 @@ public class QuorumVoteServerConnect extends QuorumVote<ServerConnectVote, Boole * 5 | 4 | 3.5 | 3 * 6 | 5 | 4 | 4 */ - public QuorumVoteServerConnect(int size, String targetNodeId) { + public QuorumVoteServerConnect(int size, String targetNodeId, boolean requestToStayLive) { super(LIVE_FAILOVER_VOTE); this.targetNodeId = targetNodeId; double majority; @@ -64,8 +67,12 @@ public class QuorumVoteServerConnect extends QuorumVote<ServerConnectVote, Boole if (votesNeeded == 0) { decision = true; } + this.requestToStayLive = requestToStayLive; } + public QuorumVoteServerConnect(int size, String targetNodeId) { + this(size, targetNodeId, false); + } /** * if we can connect to a node * @@ -73,9 +80,8 @@ public class QuorumVoteServerConnect extends QuorumVote<ServerConnectVote, Boole */ @Override public Vote connected() { - return new ServerConnectVote(targetNodeId); + return new ServerConnectVote(targetNodeId, requestToStayLive); } - /** * if we cant connect to the node * @@ -130,4 +136,8 @@ public class QuorumVoteServerConnect extends QuorumVote<ServerConnectVote, Boole else ActiveMQServerLogger.LOGGER.timeoutWaitingForQuorumVoteResponses(); } + + public boolean isRequestToStayLive() { + return requestToStayLive; + } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0e3468ce/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConnectVoteHandler.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConnectVoteHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConnectVoteHandler.java index ef0438a..3fdfaa5 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConnectVoteHandler.java +++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConnectVoteHandler.java @@ -37,12 +37,13 @@ public class ServerConnectVoteHandler implements QuorumVoteHandler { ServerConnectVote serverConnectVote = (ServerConnectVote) vote; String nodeid = serverConnectVote.getNodeId(); TopologyMemberImpl member = server.getClusterManager().getDefaultConnection(null).getTopology().getMember(nodeid); + if (member != null && member.getLive() != null) { ActiveMQServerLogger.LOGGER.nodeFoundInClusterTopology(nodeid); - return new ServerConnectVote(nodeid, false); + return new ServerConnectVote(nodeid, (Boolean) vote.getVote()); } ActiveMQServerLogger.LOGGER.nodeNotFoundInClusterTopology(nodeid); - return new ServerConnectVote(nodeid, true); + return new ServerConnectVote(nodeid, !((Boolean) vote.getVote())); } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0e3468ce/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java index 920366a..2e289b5 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java @@ -255,7 +255,7 @@ public class SharedNothingLiveActivation extends LiveActivation { QuorumManager quorumManager = activeMQServer.getClusterManager().getQuorumManager(); int size = replicatedPolicy.getQuorumSize() == -1 ? 
quorumManager.getMaxClusterSize() : replicatedPolicy.getQuorumSize(); - QuorumVoteServerConnect quorumVote = new QuorumVoteServerConnect(size, activeMQServer.getNodeID().toString()); + QuorumVoteServerConnect quorumVote = new QuorumVoteServerConnect(size, activeMQServer.getNodeID().toString(), true); quorumManager.vote(quorumVote); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0e3468ce/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ClusterWithBackupFailoverTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ClusterWithBackupFailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ClusterWithBackupFailoverTest.java new file mode 100644 index 0000000..646e073 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ClusterWithBackupFailoverTest.java @@ -0,0 +1,344 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.artemis.tests.integration.cluster.failover; + +import org.junit.Test; + +public abstract class ClusterWithBackupFailoverTest extends ClusterWithBackupFailoverTestBase { + @Test + public void testFailLiveNodes() throws Throwable { + setupCluster(); + + startServers(3, 4, 5, 0, 1, 2); + //startServers(0, 1, 2, 3, 4, 5); + + for (int i = 0; i < 3; i++) { + waitForTopology(servers[i], 3, 3); + } + + waitForFailoverTopology(3, 0, 1, 2); + waitForFailoverTopology(4, 0, 1, 2); + waitForFailoverTopology(5, 0, 1, 2); + + setupSessionFactory(0, 3, isNetty(), false); + setupSessionFactory(1, 4, isNetty(), false); + setupSessionFactory(2, 5, isNetty(), false); + + createQueue(0, QUEUES_TESTADDRESS, QUEUE_NAME, null, true); + createQueue(1, QUEUES_TESTADDRESS, QUEUE_NAME, null, true); + createQueue(2, QUEUES_TESTADDRESS, QUEUE_NAME, null, true); + + addConsumer(0, 0, QUEUE_NAME, null); + waitForBindings(0, QUEUES_TESTADDRESS, 1, 1, true); + addConsumer(1, 1, QUEUE_NAME, null); + waitForBindings(1, QUEUES_TESTADDRESS, 1, 1, true); + addConsumer(2, 2, QUEUE_NAME, null); + waitForBindings(2, QUEUES_TESTADDRESS, 1, 1, true); + + waitForBindings(); + + send(0, QUEUES_TESTADDRESS, 10, false, null); + verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2); + + send(1, QUEUES_TESTADDRESS, 10, false, null); + verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2); + + send(2, QUEUES_TESTADDRESS, 10, false, null); + verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2); + Thread.sleep(1000); + log.info("######### Topology on client = " + locators[0].getTopology().describe() + " locator = " + locators[0]); + log.info("######### Crashing it........., sfs[0] = " + sfs[0]); + failNode(0); + + waitForFailoverTopology(4, 3, 1, 2); + waitForFailoverTopology(5, 3, 1, 2); + + // live nodes + waitForBindings(1, QUEUES_TESTADDRESS, 1, 1, true); + waitForBindings(2, QUEUES_TESTADDRESS, 1, 1, true); + // activated backup nodes + waitForBindings(3, QUEUES_TESTADDRESS, 
1, 1, true); + + // live nodes + waitForBindings(1, QUEUES_TESTADDRESS, 2, 2, false); + waitForBindings(2, QUEUES_TESTADDRESS, 2, 2, false); + // activated backup nodes + waitForBindings(3, QUEUES_TESTADDRESS, 2, 2, false); + + ClusterWithBackupFailoverTestBase.log.info("** now sending"); + + send(0, QUEUES_TESTADDRESS, 10, false, null); + verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2); + + send(1, QUEUES_TESTADDRESS, 10, false, null); + verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2); + + send(2, QUEUES_TESTADDRESS, 10, false, null); + verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2); + + failNode(1); + + waitForFailoverTopology(5, 3, 4, 2); + + Thread.sleep(1000); + // live nodes + waitForBindings(2, QUEUES_TESTADDRESS, 1, 1, true); + // activated backup nodes + waitForBindings(3, QUEUES_TESTADDRESS, 1, 1, true); + waitForBindings(4, QUEUES_TESTADDRESS, 1, 1, true); + + // live nodes + waitForBindings(2, QUEUES_TESTADDRESS, 2, 2, false); + // activated backup nodes + waitForBindings(3, QUEUES_TESTADDRESS, 2, 2, false); + waitForBindings(4, QUEUES_TESTADDRESS, 2, 2, false); + + send(0, QUEUES_TESTADDRESS, 10, false, null); + verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2); + + send(1, QUEUES_TESTADDRESS, 10, false, null); + verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2); + + send(2, QUEUES_TESTADDRESS, 10, false, null); + verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2); + + failNode(2); + + Thread.sleep(1000); + // activated backup nodes + waitForBindings(3, QUEUES_TESTADDRESS, 1, 1, true); + waitForBindings(4, QUEUES_TESTADDRESS, 1, 1, true); + waitForBindings(5, QUEUES_TESTADDRESS, 1, 1, true); + + // activated backup nodes + waitForBindings(3, QUEUES_TESTADDRESS, 2, 2, false); + waitForBindings(4, QUEUES_TESTADDRESS, 2, 2, false); + waitForBindings(5, QUEUES_TESTADDRESS, 2, 2, false); + + send(0, QUEUES_TESTADDRESS, 10, false, null); + verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2); + + send(1, QUEUES_TESTADDRESS, 10, 
false, null); + verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2); + + send(2, QUEUES_TESTADDRESS, 10, false, null); + verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2); + + removeConsumer(0); + removeConsumer(1); + removeConsumer(2); + } + + @Test + public void testFailBackupNodes() throws Exception { + setupCluster(); + + startServers(3, 4, 5, 0, 1, 2); + + for (int i = 0; i < 3; i++) { + waitForTopology(servers[i], 3, 3); + } + + setupSessionFactory(0, 3, isNetty(), false); + setupSessionFactory(1, 4, isNetty(), false); + setupSessionFactory(2, 5, isNetty(), false); + + createQueue(0, QUEUES_TESTADDRESS, QUEUE_NAME, null, true); + createQueue(1, QUEUES_TESTADDRESS, QUEUE_NAME, null, true); + createQueue(2, QUEUES_TESTADDRESS, QUEUE_NAME, null, true); + + addConsumer(0, 0, QUEUE_NAME, null); + addConsumer(1, 1, QUEUE_NAME, null); + addConsumer(2, 2, QUEUE_NAME, null); + + waitForBindings(); + + send(0, QUEUES_TESTADDRESS, 10, false, null); + verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2); + + send(1, QUEUES_TESTADDRESS, 10, false, null); + verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2); + + send(2, QUEUES_TESTADDRESS, 10, false, null); + verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2); + + failNode(3); + + waitForBindings(); + + send(0, QUEUES_TESTADDRESS, 10, false, null); + verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2); + + send(1, QUEUES_TESTADDRESS, 10, false, null); + verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2); + + send(2, QUEUES_TESTADDRESS, 10, false, null); + verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2); + + failNode(4); + + waitForBindings(); + + send(0, QUEUES_TESTADDRESS, 10, false, null); + verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2); + + send(1, QUEUES_TESTADDRESS, 10, false, null); + verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2); + + send(2, QUEUES_TESTADDRESS, 10, false, null); + verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2); + + failNode(5); + + waitForBindings(); + 
+ send(0, QUEUES_TESTADDRESS, 10, false, null); + verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2); + + send(1, QUEUES_TESTADDRESS, 10, false, null); + verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2); + + send(2, QUEUES_TESTADDRESS, 10, false, null); + verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2); + + removeConsumer(0); + removeConsumer(1); + removeConsumer(2); + } + + @Test + public void testFailAllNodes() throws Exception { + setupCluster(); + + startServers(0, 1, 2, 3, 4, 5); + + setupSessionFactory(0, 3, isNetty(), false); + setupSessionFactory(1, 4, isNetty(), false); + setupSessionFactory(2, 5, isNetty(), false); + + waitForFailoverTopology(3, 0, 1, 2); + waitForFailoverTopology(4, 0, 1, 2); + waitForFailoverTopology(5, 0, 1, 2); + + createQueue(0, QUEUES_TESTADDRESS, QUEUE_NAME, null, true); + createQueue(1, QUEUES_TESTADDRESS, QUEUE_NAME, null, true); + createQueue(2, QUEUES_TESTADDRESS, QUEUE_NAME, null, true); + + addConsumer(0, 0, QUEUE_NAME, null); + addConsumer(1, 1, QUEUE_NAME, null); + addConsumer(2, 2, QUEUE_NAME, null); + + waitForBindings(); + + send(0, QUEUES_TESTADDRESS, 10, false, null); + verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2); + + send(1, QUEUES_TESTADDRESS, 10, false, null); + verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2); + + send(2, QUEUES_TESTADDRESS, 10, false, null); + verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2); + + failNode(0); + + waitForFailoverTopology(4, 3, 1, 2); + waitForFailoverTopology(5, 3, 1, 2); + // live nodes + waitForBindings(1, QUEUES_TESTADDRESS, 1, 1, true); + waitForBindings(2, QUEUES_TESTADDRESS, 1, 1, true); + // activated backup nodes + waitForBindings(3, QUEUES_TESTADDRESS, 1, 1, true); + + // live nodes + waitForBindings(1, QUEUES_TESTADDRESS, 2, 2, false); + waitForBindings(2, QUEUES_TESTADDRESS, 2, 2, false); + // activated backup nodes + waitForBindings(3, QUEUES_TESTADDRESS, 2, 2, false); + + ClusterWithBackupFailoverTestBase.log.info("** now sending"); + 
+ send(0, QUEUES_TESTADDRESS, 10, false, null); + verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2); + + send(1, QUEUES_TESTADDRESS, 10, false, null); + verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2); + + send(2, QUEUES_TESTADDRESS, 10, false, null); + verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2); + + removeConsumer(0); + failNode(3); + + // live nodes + waitForBindings(1, QUEUES_TESTADDRESS, 1, 1, true); + waitForBindings(2, QUEUES_TESTADDRESS, 1, 1, true); + + // live nodes + waitForBindings(1, QUEUES_TESTADDRESS, 1, 1, false); + waitForBindings(2, QUEUES_TESTADDRESS, 1, 1, false); + + send(1, QUEUES_TESTADDRESS, 10, false, null); + verifyReceiveRoundRobinInSomeOrder(true, 10, 1, 2); + + send(2, QUEUES_TESTADDRESS, 10, false, null); + verifyReceiveRoundRobinInSomeOrder(true, 10, 1, 2); + + failNode(1); + + waitForFailoverTopology(5, 2, 4); + // live nodes + waitForBindings(2, QUEUES_TESTADDRESS, 1, 1, true); + // activated backup nodes + waitForBindings(4, QUEUES_TESTADDRESS, 1, 1, true); + + // live nodes + waitForBindings(2, QUEUES_TESTADDRESS, 1, 1, false); + // activated backup nodes + waitForBindings(4, QUEUES_TESTADDRESS, 1, 1, false); + + send(1, QUEUES_TESTADDRESS, 10, false, null); + verifyReceiveRoundRobinInSomeOrder(true, 10, 1, 2); + + send(2, QUEUES_TESTADDRESS, 10, false, null); + verifyReceiveRoundRobinInSomeOrder(true, 10, 1, 2); + + removeConsumer(1); + + // live nodes + waitForBindings(2, QUEUES_TESTADDRESS, 1, 1, true); + // live nodes + waitForBindings(2, QUEUES_TESTADDRESS, 1, 0, false); + + failNode(4, 1); + + send(2, QUEUES_TESTADDRESS, 10, false, null); + verifyReceiveRoundRobinInSomeOrder(true, 10, 2); + + failNode(2); + + // live nodes + waitForBindings(5, QUEUES_TESTADDRESS, 1, 1, true); + // live nodes + waitForBindings(5, QUEUES_TESTADDRESS, 0, 0, false); + + send(2, QUEUES_TESTADDRESS, 10, false, null); + verifyReceiveRoundRobinInSomeOrder(true, 10, 2); + + removeConsumer(2); + failNode(5); + } +} 
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0e3468ce/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ClusterWithBackupFailoverTestBase.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ClusterWithBackupFailoverTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ClusterWithBackupFailoverTestBase.java index 745b84c..28746da 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ClusterWithBackupFailoverTestBase.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ClusterWithBackupFailoverTestBase.java @@ -26,13 +26,12 @@ import org.apache.activemq.artemis.tests.integration.cluster.distribution.Cluste import org.apache.activemq.artemis.tests.integration.cluster.util.SameProcessActiveMQServer; import org.apache.activemq.artemis.tests.integration.cluster.util.TestableServer; import org.junit.Before; -import org.junit.Test; public abstract class ClusterWithBackupFailoverTestBase extends ClusterTestBase { protected static final String QUEUE_NAME = "queue0"; protected static final String QUEUES_TESTADDRESS = "queues.testaddress"; - private static final IntegrationTestLogger log = IntegrationTestLogger.LOGGER; + protected static final IntegrationTestLogger log = IntegrationTestLogger.LOGGER; protected abstract void setupCluster(MessageLoadBalancingType messageLoadBalancingType) throws Exception; @@ -49,131 +48,7 @@ public abstract class ClusterWithBackupFailoverTestBase extends ClusterTestBase return false; } - @Test - public void testFailLiveNodes() throws Throwable { - setupCluster(); - - startServers(3, 4, 5, 0, 1, 2); - //startServers(0, 1, 2, 3, 4, 5); - - for (int i = 0; i < 3; i++) 
{ - waitForTopology(servers[i], 3, 3); - } - - waitForFailoverTopology(3, 0, 1, 2); - waitForFailoverTopology(4, 0, 1, 2); - waitForFailoverTopology(5, 0, 1, 2); - - setupSessionFactory(0, 3, isNetty(), false); - setupSessionFactory(1, 4, isNetty(), false); - setupSessionFactory(2, 5, isNetty(), false); - - createQueue(0, QUEUES_TESTADDRESS, QUEUE_NAME, null, true); - createQueue(1, QUEUES_TESTADDRESS, QUEUE_NAME, null, true); - createQueue(2, QUEUES_TESTADDRESS, QUEUE_NAME, null, true); - - addConsumer(0, 0, QUEUE_NAME, null); - waitForBindings(0, QUEUES_TESTADDRESS, 1, 1, true); - addConsumer(1, 1, QUEUE_NAME, null); - waitForBindings(1, QUEUES_TESTADDRESS, 1, 1, true); - addConsumer(2, 2, QUEUE_NAME, null); - waitForBindings(2, QUEUES_TESTADDRESS, 1, 1, true); - - waitForBindings(); - - send(0, QUEUES_TESTADDRESS, 10, false, null); - verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2); - - send(1, QUEUES_TESTADDRESS, 10, false, null); - verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2); - - send(2, QUEUES_TESTADDRESS, 10, false, null); - verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2); - Thread.sleep(1000); - log.info("######### Topology on client = " + locators[0].getTopology().describe() + " locator = " + locators[0]); - log.info("######### Crashing it........., sfs[0] = " + sfs[0]); - failNode(0); - - waitForFailoverTopology(4, 3, 1, 2); - waitForFailoverTopology(5, 3, 1, 2); - - // live nodes - waitForBindings(1, QUEUES_TESTADDRESS, 1, 1, true); - waitForBindings(2, QUEUES_TESTADDRESS, 1, 1, true); - // activated backup nodes - waitForBindings(3, QUEUES_TESTADDRESS, 1, 1, true); - - // live nodes - waitForBindings(1, QUEUES_TESTADDRESS, 2, 2, false); - waitForBindings(2, QUEUES_TESTADDRESS, 2, 2, false); - // activated backup nodes - waitForBindings(3, QUEUES_TESTADDRESS, 2, 2, false); - - ClusterWithBackupFailoverTestBase.log.info("** now sending"); - - send(0, QUEUES_TESTADDRESS, 10, false, null); - verifyReceiveRoundRobinInSomeOrder(true, 10, 
0, 1, 2); - - send(1, QUEUES_TESTADDRESS, 10, false, null); - verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2); - - send(2, QUEUES_TESTADDRESS, 10, false, null); - verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2); - - failNode(1); - - waitForFailoverTopology(5, 3, 4, 2); - - Thread.sleep(1000); - // live nodes - waitForBindings(2, QUEUES_TESTADDRESS, 1, 1, true); - // activated backup nodes - waitForBindings(3, QUEUES_TESTADDRESS, 1, 1, true); - waitForBindings(4, QUEUES_TESTADDRESS, 1, 1, true); - - // live nodes - waitForBindings(2, QUEUES_TESTADDRESS, 2, 2, false); - // activated backup nodes - waitForBindings(3, QUEUES_TESTADDRESS, 2, 2, false); - waitForBindings(4, QUEUES_TESTADDRESS, 2, 2, false); - - send(0, QUEUES_TESTADDRESS, 10, false, null); - verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2); - - send(1, QUEUES_TESTADDRESS, 10, false, null); - verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2); - - send(2, QUEUES_TESTADDRESS, 10, false, null); - verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2); - - failNode(2); - - Thread.sleep(1000); - // activated backup nodes - waitForBindings(3, QUEUES_TESTADDRESS, 1, 1, true); - waitForBindings(4, QUEUES_TESTADDRESS, 1, 1, true); - waitForBindings(5, QUEUES_TESTADDRESS, 1, 1, true); - - // activated backup nodes - waitForBindings(3, QUEUES_TESTADDRESS, 2, 2, false); - waitForBindings(4, QUEUES_TESTADDRESS, 2, 2, false); - waitForBindings(5, QUEUES_TESTADDRESS, 2, 2, false); - - send(0, QUEUES_TESTADDRESS, 10, false, null); - verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2); - - send(1, QUEUES_TESTADDRESS, 10, false, null); - verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2); - - send(2, QUEUES_TESTADDRESS, 10, false, null); - verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2); - - removeConsumer(0); - removeConsumer(1); - removeConsumer(2); - } - - private void waitForBindings() throws Exception { + protected void waitForBindings() throws Exception { waitForBindings(0, 
QUEUES_TESTADDRESS, 1, 1, true); waitForBindings(1, QUEUES_TESTADDRESS, 1, 1, true); waitForBindings(2, QUEUES_TESTADDRESS, 1, 1, true); @@ -183,83 +58,6 @@ public abstract class ClusterWithBackupFailoverTestBase extends ClusterTestBase waitForBindings(2, QUEUES_TESTADDRESS, 2, 2, false); } - @Test - public void testFailBackupNodes() throws Exception { - setupCluster(); - - startServers(3, 4, 5, 0, 1, 2); - - for (int i = 0; i < 3; i++) { - waitForTopology(servers[i], 3, 3); - } - - setupSessionFactory(0, 3, isNetty(), false); - setupSessionFactory(1, 4, isNetty(), false); - setupSessionFactory(2, 5, isNetty(), false); - - createQueue(0, QUEUES_TESTADDRESS, QUEUE_NAME, null, true); - createQueue(1, QUEUES_TESTADDRESS, QUEUE_NAME, null, true); - createQueue(2, QUEUES_TESTADDRESS, QUEUE_NAME, null, true); - - addConsumer(0, 0, QUEUE_NAME, null); - addConsumer(1, 1, QUEUE_NAME, null); - addConsumer(2, 2, QUEUE_NAME, null); - - waitForBindings(); - - send(0, QUEUES_TESTADDRESS, 10, false, null); - verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2); - - send(1, QUEUES_TESTADDRESS, 10, false, null); - verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2); - - send(2, QUEUES_TESTADDRESS, 10, false, null); - verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2); - - failNode(3); - - waitForBindings(); - - send(0, QUEUES_TESTADDRESS, 10, false, null); - verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2); - - send(1, QUEUES_TESTADDRESS, 10, false, null); - verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2); - - send(2, QUEUES_TESTADDRESS, 10, false, null); - verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2); - - failNode(4); - - waitForBindings(); - - send(0, QUEUES_TESTADDRESS, 10, false, null); - verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2); - - send(1, QUEUES_TESTADDRESS, 10, false, null); - verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2); - - send(2, QUEUES_TESTADDRESS, 10, false, null); - verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2); 
- - failNode(5); - - waitForBindings(); - - send(0, QUEUES_TESTADDRESS, 10, false, null); - verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2); - - send(1, QUEUES_TESTADDRESS, 10, false, null); - verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2); - - send(2, QUEUES_TESTADDRESS, 10, false, null); - verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2); - - removeConsumer(0); - removeConsumer(1); - removeConsumer(2); - } - protected void setupCluster() throws Exception { setupCluster(MessageLoadBalancingType.ON_DEMAND); } @@ -297,126 +95,4 @@ public abstract class ClusterWithBackupFailoverTestBase extends ClusterTestBase ClientSession[] sessionsArray = sessions.toArray(new ClientSession[sessions.size()]); return sessionsArray; } - - @Test - public void testFailAllNodes() throws Exception { - setupCluster(); - - startServers(0, 1, 2, 3, 4, 5); - - setupSessionFactory(0, 3, isNetty(), false); - setupSessionFactory(1, 4, isNetty(), false); - setupSessionFactory(2, 5, isNetty(), false); - - waitForFailoverTopology(3, 0, 1, 2); - waitForFailoverTopology(4, 0, 1, 2); - waitForFailoverTopology(5, 0, 1, 2); - - createQueue(0, QUEUES_TESTADDRESS, QUEUE_NAME, null, true); - createQueue(1, QUEUES_TESTADDRESS, QUEUE_NAME, null, true); - createQueue(2, QUEUES_TESTADDRESS, QUEUE_NAME, null, true); - - addConsumer(0, 0, QUEUE_NAME, null); - addConsumer(1, 1, QUEUE_NAME, null); - addConsumer(2, 2, QUEUE_NAME, null); - - waitForBindings(); - - send(0, QUEUES_TESTADDRESS, 10, false, null); - verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2); - - send(1, QUEUES_TESTADDRESS, 10, false, null); - verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2); - - send(2, QUEUES_TESTADDRESS, 10, false, null); - verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2); - - failNode(0); - - waitForFailoverTopology(4, 3, 1, 2); - waitForFailoverTopology(5, 3, 1, 2); - // live nodes - waitForBindings(1, QUEUES_TESTADDRESS, 1, 1, true); - waitForBindings(2, QUEUES_TESTADDRESS, 1, 1, true); - // 
activated backup nodes - waitForBindings(3, QUEUES_TESTADDRESS, 1, 1, true); - - // live nodes - waitForBindings(1, QUEUES_TESTADDRESS, 2, 2, false); - waitForBindings(2, QUEUES_TESTADDRESS, 2, 2, false); - // activated backup nodes - waitForBindings(3, QUEUES_TESTADDRESS, 2, 2, false); - - ClusterWithBackupFailoverTestBase.log.info("** now sending"); - - send(0, QUEUES_TESTADDRESS, 10, false, null); - verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2); - - send(1, QUEUES_TESTADDRESS, 10, false, null); - verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2); - - send(2, QUEUES_TESTADDRESS, 10, false, null); - verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2); - - removeConsumer(0); - failNode(3); - - // live nodes - waitForBindings(1, QUEUES_TESTADDRESS, 1, 1, true); - waitForBindings(2, QUEUES_TESTADDRESS, 1, 1, true); - - // live nodes - waitForBindings(1, QUEUES_TESTADDRESS, 1, 1, false); - waitForBindings(2, QUEUES_TESTADDRESS, 1, 1, false); - - send(1, QUEUES_TESTADDRESS, 10, false, null); - verifyReceiveRoundRobinInSomeOrder(true, 10, 1, 2); - - send(2, QUEUES_TESTADDRESS, 10, false, null); - verifyReceiveRoundRobinInSomeOrder(true, 10, 1, 2); - - failNode(1); - - waitForFailoverTopology(5, 2, 4); - // live nodes - waitForBindings(2, QUEUES_TESTADDRESS, 1, 1, true); - // activated backup nodes - waitForBindings(4, QUEUES_TESTADDRESS, 1, 1, true); - - // live nodes - waitForBindings(2, QUEUES_TESTADDRESS, 1, 1, false); - // activated backup nodes - waitForBindings(4, QUEUES_TESTADDRESS, 1, 1, false); - - send(1, QUEUES_TESTADDRESS, 10, false, null); - verifyReceiveRoundRobinInSomeOrder(true, 10, 1, 2); - - send(2, QUEUES_TESTADDRESS, 10, false, null); - verifyReceiveRoundRobinInSomeOrder(true, 10, 1, 2); - - removeConsumer(1); - - // live nodes - waitForBindings(2, QUEUES_TESTADDRESS, 1, 1, true); - // live nodes - waitForBindings(2, QUEUES_TESTADDRESS, 1, 0, false); - - failNode(4, 1); - - send(2, QUEUES_TESTADDRESS, 10, false, null); - 
verifyReceiveRoundRobinInSomeOrder(true, 10, 2); - - failNode(2); - - // live nodes - waitForBindings(5, QUEUES_TESTADDRESS, 1, 1, true); - // live nodes - waitForBindings(5, QUEUES_TESTADDRESS, 0, 0, false); - - send(2, QUEUES_TESTADDRESS, 10, false, null); - verifyReceiveRoundRobinInSomeOrder(true, 10, 2); - - removeConsumer(2); - failNode(5); - } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0e3468ce/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/DiscoveryClusterWithBackupFailoverTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/DiscoveryClusterWithBackupFailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/DiscoveryClusterWithBackupFailoverTest.java index 0f33681..ded7829 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/DiscoveryClusterWithBackupFailoverTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/DiscoveryClusterWithBackupFailoverTest.java @@ -18,7 +18,7 @@ package org.apache.activemq.artemis.tests.integration.cluster.failover; import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; -public class DiscoveryClusterWithBackupFailoverTest extends ClusterWithBackupFailoverTestBase { +public class DiscoveryClusterWithBackupFailoverTest extends ClusterWithBackupFailoverTest { protected final String groupAddress = getUDPDiscoveryAddress(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0e3468ce/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/LiveVoteOnBackupFailureClusterTest.java ---------------------------------------------------------------------- diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/LiveVoteOnBackupFailureClusterTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/LiveVoteOnBackupFailureClusterTest.java new file mode 100644 index 0000000..546415a --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/LiveVoteOnBackupFailureClusterTest.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.activemq.artemis.tests.integration.cluster.failover;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.core.config.ha.ReplicaPolicyConfiguration;
+import org.apache.activemq.artemis.core.config.ha.ReplicatedPolicyConfiguration;
+import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
+import org.apache.activemq.artemis.core.remoting.FailureListener;
+import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
+import org.apache.activemq.artemis.junit.Wait;
+import org.apache.activemq.artemis.tests.integration.cluster.util.BackupSyncDelay;
+import org.junit.Test;
+
+/**
+ * Checks that a live server configured with {@code setVoteOnReplicationFailure(true)} is still
+ * started after its backup has been stopped.
+ * <p>
+ * Servers 0-2 are set up as lives and servers 3-5 as backups; server {@code N + 3} shares its
+ * group name with server {@code N}.
+ */
+public class LiveVoteOnBackupFailureClusterTest extends ClusterWithBackupFailoverTestBase {
+
+   /** Number of live servers; the backup of live {@code N} is server {@code N + LIVE_COUNT}. */
+   private static final int LIVE_COUNT = 3;
+
+   /**
+    * Registers the cluster connections "cluster0".."cluster2" for servers 0-2 and again for
+    * servers 3-5, so that server {@code N + 3} uses the same connection name and target list as
+    * server {@code N}.
+    *
+    * @param messageLoadBalancingType load balancing type passed through to every connection
+    */
+   @Override
+   protected void setupCluster(final MessageLoadBalancingType messageLoadBalancingType) throws Exception {
+      setupClusterConnectionWithBackups("cluster0", "queues", messageLoadBalancingType, 1, isNetty(), 0, new int[]{1, 2});
+
+      setupClusterConnectionWithBackups("cluster1", "queues", messageLoadBalancingType, 1, isNetty(), 1, new int[]{0, 2});
+
+      setupClusterConnectionWithBackups("cluster2", "queues", messageLoadBalancingType, 1, isNetty(), 2, new int[]{0, 1});
+
+      setupClusterConnectionWithBackups("cluster0", "queues", messageLoadBalancingType, 1, isNetty(), 3, new int[]{1, 2});
+
+      setupClusterConnectionWithBackups("cluster1", "queues", messageLoadBalancingType, 1, isNetty(), 4, new int[]{0, 2});
+
+      setupClusterConnectionWithBackups("cluster2", "queues", messageLoadBalancingType, 1, isNetty(), 5, new int[]{0, 1});
+   }
+
+   /**
+    * Creates the three backups (3-5) and the three lives (0-2), then gives each live/backup pair
+    * a common group name and enables vote-on-replication-failure on every live.
+    */
+   @Override
+   protected void setupServers() throws Exception {
+      // The backups
+      setupBackupServer(3, 0, isFileStorage(), isSharedStorage(), isNetty());
+      setupBackupServer(4, 1, isFileStorage(), isSharedStorage(), isNetty());
+      setupBackupServer(5, 2, isFileStorage(), isSharedStorage(), isNetty());
+
+      // The lives
+      setupLiveServer(0, isFileStorage(), isSharedStorage(), isNetty(), false);
+      setupLiveServer(1, isFileStorage(), isSharedStorage(), isNetty(), false);
+      setupLiveServer(2, isFileStorage(), isSharedStorage(), isNetty(), false);
+
+      for (int live = 0; live < LIVE_COUNT; live++) {
+         // We need to know which backup is connected to which live, so each pair shares a group name.
+         final String groupName = "group" + live;
+         liveHAPolicy(live).setGroupName(groupName);
+         backupHAPolicy(live + LIVE_COUNT).setGroupName(groupName);
+
+         // Configure to vote to stay live, when backup dies
+         liveHAPolicy(live).setVoteOnReplicationFailure(true);
+      }
+   }
+
+   /** Returns the HA policy configuration of the given live server, cast to its replicated form. */
+   private ReplicatedPolicyConfiguration liveHAPolicy(final int node) {
+      return (ReplicatedPolicyConfiguration) servers[node].getConfiguration().getHAPolicyConfiguration();
+   }
+
+   /** Returns the HA policy configuration of the given backup server, cast to its replica form. */
+   private ReplicaPolicyConfiguration backupHAPolicy(final int node) {
+      return (ReplicaPolicyConfiguration) servers[node].getConfiguration().getHAPolicyConfiguration();
+   }
+
+   /** This test does not use shared storage. */
+   protected boolean isSharedStorage() {
+      return false;
+   }
+
+   /**
+    * Stops the backup of server 0 (server 3), waits until server 0 has seen the replication
+    * connection fail, and then asserts that server 0 is still started.
+    */
+   @Test
+   public void testLiveVoteSucceedsAfterBackupFailure() throws Exception {
+      startCluster();
+
+      // Wait for servers to start
+      for (int i = 0; i < servers.length; i++) {
+         waitForServerToStart(servers[i]);
+      }
+
+      // Wait for the backup that is about to be stopped to sync replication. The original looped
+      // over an unused index and waited on servers[3] each time, ignoring the result.
+      assertTrue("backup 3 did not sync with its live", Wait.waitFor(() -> servers[3].isReplicaSync()));
+
+      // Register failure listener to detect when live recognises the backup has died.
+      final CountDownLatch latch = new CountDownLatch(1);
+      servers[0].getReplicationManager().getBackupTransportConnection().addFailureListener(new FailureListener() {
+         @Override
+         public void connectionFailed(ActiveMQException exception, boolean failedOver) {
+            latch.countDown();
+         }
+
+         @Override
+         public void connectionFailed(ActiveMQException exception, boolean failedOver, String scaleDownTargetNodeID) {
+            latch.countDown();
+         }
+      });
+
+      servers[3].stop();
+
+      // Wait for live to notice backup is down; fail instead of silently carrying on if it never does.
+      assertTrue("live 0 did not detect that its backup went down", latch.await(30, TimeUnit.SECONDS));
+
+      // The quorum vote time out is hardcoded 5s. Wait for double the time then check server is live
+      Thread.sleep(10000);
+      assertTrue(servers[0].isStarted());
+   }
+
+   /**
+    * Sets up the cluster, starts the lives before the backups, and waits until the topology and
+    * failover topology are reported for all servers.
+    */
+   private void startCluster() throws Exception {
+      int[] liveServerIDs = new int[]{0, 1, 2};
+      setupCluster();
+      startServers(0, 1, 2);
+      // NOTE(review): constructed only for its side effect on servers[4]/servers[1]; presumably it
+      // installs an interceptor for REPLICATION_SCHEDULED_FAILOVER packets - confirm in BackupSyncDelay.
+      new BackupSyncDelay(servers[4], servers[1], PacketImpl.REPLICATION_SCHEDULED_FAILOVER);
+      startServers(3, 4, 5);
+
+      for (int i : liveServerIDs) {
+         waitForTopology(servers[i], 3, 3);
+      }
+
+      waitForFailoverTopology(3, 0, 1, 2);
+      waitForFailoverTopology(4, 0, 1, 2);
+      waitForFailoverTopology(5, 0, 1, 2);
+   }
+}
