This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new 124ed87 ARTEMIS-2702 QuorumVoteServerConnect with requestToStayLive is voting order sensitive
new 67b1b01 This closes #3068
124ed87 is described below
commit 124ed872a6f42e6ff65fc302eb61867d18fa65e6
Author: Francesco Nigro <[email protected]>
AuthorDate: Mon Apr 6 19:32:31 2020 +0200
ARTEMIS-2702 QuorumVoteServerConnect with requestToStayLive is voting order sensitive
---
.../artemis/core/server/ActiveMQServerLogger.java | 7 +-
.../cluster/qourum/QuorumVoteServerConnect.java | 62 +++----
.../failover/QuorumVoteServerConnectTest.java | 178 +++++++++++++++++++++
3 files changed, 215 insertions(+), 32 deletions(-)
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index 5024c82..a40230c 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -43,6 +43,7 @@ import
org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.cluster.Bridge;
import org.apache.activemq.artemis.core.server.cluster.impl.BridgeImpl;
import
org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionImpl;
+import
org.apache.activemq.artemis.core.server.cluster.qourum.ServerConnectVote;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl;
import org.apache.activemq.artemis.core.server.management.Notification;
@@ -81,6 +82,10 @@ public interface ActiveMQServerLogger extends BasicLogger {
@Message(id = 223000, value = "Received Interrupt Exception whilst waiting
for component to shutdown: {0}", format = Message.Format.MESSAGE_FORMAT)
void interruptWhilstStoppingComponent(String componentClassName);
+ @LogMessage(level = Logger.Level.DEBUG)
+ @Message(id = 223001, value = "Ignored quorum vote due to quorum reached or
vote casted: {0}", format = Message.Format.MESSAGE_FORMAT)
+ void ignoredQuorumVote(ServerConnectVote vote);
+
@LogMessage(level = Logger.Level.INFO)
@Message(id = 221000, value = "{0} Message Broker is starting with
configuration {1}", format = Message.Format.MESSAGE_FORMAT)
void serverStarting(String type, Configuration configuration);
@@ -2041,7 +2046,7 @@ public interface ActiveMQServerLogger extends BasicLogger
{
@LogMessage(level = Logger.Level.INFO)
@Message(id = 224098, value = "Received a vote saying the backup is live
with connector: {0}", format = Message.Format.MESSAGE_FORMAT)
- void qourumBackupIsLive(String liveConnector);
+ void quorumBackupIsLive(String liveConnector);
@LogMessage(level = Logger.Level.WARN)
@Message(id = 224099, value = "Message with ID {0} has a header too large.
More information available on debug level for class {1}",
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/QuorumVoteServerConnect.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/QuorumVoteServerConnect.java
index 32fea40..299835a 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/QuorumVoteServerConnect.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/QuorumVoteServerConnect.java
@@ -29,18 +29,14 @@ import
org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
public class QuorumVoteServerConnect extends QuorumVote<ServerConnectVote,
Boolean> {
public static final SimpleString LIVE_FAILOVER_VOTE = new
SimpleString("LiveFailoverQuorumVote");
- private final CountDownLatch latch;
+ // this flag mark the end of the vote
+ private final CountDownLatch voteCompleted;
private final String targetNodeId;
private final String liveConnector;
-
private int votesNeeded;
- private int total = 0;
-
- private boolean decision = false;
-
// Is this the live requesting to stay live, or a backup requesting to
become live.
- private boolean requestToStayLive = false;
+ private final boolean requestToStayLive;
/**
* live nodes | remaining nodes | majority | votes needed
@@ -65,9 +61,9 @@ public class QuorumVoteServerConnect extends
QuorumVote<ServerConnectVote, Boole
}
//votes needed could be say 2.5 so we add 1 in this case
votesNeeded = (int) majority;
- latch = new CountDownLatch(votesNeeded);
+ voteCompleted = new CountDownLatch(1);
if (votesNeeded == 0) {
- decision = true;
+ voteCompleted.countDown();
}
this.requestToStayLive = requestToStayLive;
}
@@ -108,42 +104,46 @@ public class QuorumVoteServerConnect extends
QuorumVote<ServerConnectVote, Boole
*/
@Override
public synchronized void vote(ServerConnectVote vote) {
- if (decision)
+ if (voteCompleted.getCount() == 0) {
+ ActiveMQServerLogger.LOGGER.ignoredQuorumVote(vote);
return;
- if (!requestToStayLive && vote.getVote()) {
- total++;
- latch.countDown();
- if (total >= votesNeeded) {
- decision = true;
- }//do the opposite, if it says there is a node connected it means the
backup has come live
- } else if (requestToStayLive && vote.getVote()) {
- total++;
- latch.countDown();
- if (liveConnector != null &&
!liveConnector.equals(vote.getTransportConfiguration())) {
- ActiveMQServerLogger.LOGGER.qourumBackupIsLive(liveConnector);
- return;
- }
- if (total >= votesNeeded) {
- decision = true;
+ }
+ if (vote.getVote()) {
+ if (!requestToStayLive) {
+ acceptPositiveVote();
+ } else if (liveConnector.equals(vote.getTransportConfiguration())) {
+ acceptPositiveVote();
+ } else {
+
ActiveMQServerLogger.LOGGER.quorumBackupIsLive(vote.getTransportConfiguration());
}
}
}
+ private synchronized void acceptPositiveVote() {
+ if (voteCompleted.getCount() == 0) {
+ throw new IllegalStateException("Cannot accept any new positive vote
if the vote is completed or the decision is already taken");
+ }
+ votesNeeded--;
+ if (votesNeeded == 0) {
+ voteCompleted.countDown();
+ }
+ }
+
@Override
- public void allVotesCast(Topology voteTopology) {
- while (latch.getCount() > 0) {
- latch.countDown();
+ public synchronized void allVotesCast(Topology voteTopology) {
+ if (voteCompleted.getCount() > 0) {
+ voteCompleted.countDown();
}
}
@Override
- public Boolean getDecision() {
- return decision;
+ public synchronized Boolean getDecision() {
+ return votesNeeded == 0;
}
public void await(int latchTimeout, TimeUnit unit) throws
InterruptedException {
ActiveMQServerLogger.LOGGER.waitingForQuorumVoteResults(latchTimeout,
unit.toString().toLowerCase());
- if (latch.await(latchTimeout, unit))
+ if (voteCompleted.await(latchTimeout, unit))
ActiveMQServerLogger.LOGGER.receivedAllQuorumVotes();
else
ActiveMQServerLogger.LOGGER.timeoutWaitingForQuorumVoteResponses();
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/QuorumVoteServerConnectTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/QuorumVoteServerConnectTest.java
index 0d11bcf..a2ed117 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/QuorumVoteServerConnectTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/QuorumVoteServerConnectTest.java
@@ -18,10 +18,20 @@ package
org.apache.activemq.artemis.tests.integration.cluster.failover;
import java.util.Arrays;
import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import
org.apache.activemq.artemis.core.server.cluster.qourum.QuorumVoteServerConnect;
import
org.apache.activemq.artemis.core.server.cluster.qourum.ServerConnectVote;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.hamcrest.Matchers;
+import org.junit.Assert;
+import org.junit.Assume;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -44,6 +54,174 @@ public class QuorumVoteServerConnectTest extends
ActiveMQTestBase {
}
@Test
+ public void testVoteOnRequestToStay() {
+ Assume.assumeThat(trueVotes, Matchers.greaterThan(0));
+ Assume.assumeThat(size, Matchers.greaterThan(trueVotes));
+ final String liveConnector = "live";
+ final String backupConnector = "backup";
+ QuorumVoteServerConnect quorum = new QuorumVoteServerConnect(size,
"foo", true, liveConnector);
+ quorum.vote(new ServerConnectVote("foo", true, backupConnector));
+ Assert.assertFalse(quorum.getDecision());
+ for (int i = 0; i < trueVotes - 1; i++) {
+ quorum.vote(new ServerConnectVote("foo", true, liveConnector));
+ Assert.assertFalse(quorum.getDecision());
+ }
+ quorum.vote(new ServerConnectVote("foo", true, liveConnector));
+ Assert.assertTrue(quorum.getDecision());
+ }
+
+ @Test
+ public void testAllVoteCastFreezeNotRequestToStayDecision() {
+ QuorumVoteServerConnect quorum = new QuorumVoteServerConnect(size,
"foo");
+ Assert.assertFalse(quorum.isRequestToStayLive());
+ final boolean decisionBeforeVoteCompleted = quorum.getDecision();
+ quorum.allVotesCast(null);
+ for (int i = 0; i < trueVotes; i++) {
+ quorum.vote(new ServerConnectVote("foo", true, null));
+ }
+ Assert.assertEquals(decisionBeforeVoteCompleted, quorum.getDecision());
+ }
+
+ @Test
+ public void testAllVoteCastFreezeRequestToStayDecision() {
+ final String liveConnector = "live";
+ QuorumVoteServerConnect quorum = new QuorumVoteServerConnect(size,
"foo", true, liveConnector);
+ Assert.assertTrue(quorum.isRequestToStayLive());
+ final boolean decisionBeforeVoteCompleted = quorum.getDecision();
+ quorum.allVotesCast(null);
+ for (int i = 0; i < trueVotes; i++) {
+ quorum.vote(new ServerConnectVote("foo", true, liveConnector));
+ }
+ Assert.assertEquals(decisionBeforeVoteCompleted, quorum.getDecision());
+ }
+
+ @Test
+ public void testAllVoteCastUnblockAwait() throws InterruptedException {
+ Assume.assumeThat(trueVotes, Matchers.greaterThan(0));
+ Assume.assumeThat(size, Matchers.greaterThan(trueVotes));
+ QuorumVoteServerConnect quorum = new QuorumVoteServerConnect(size,
"foo");
+ Assert.assertFalse(quorum.getDecision());
+ CountDownLatch taskStarted = new CountDownLatch(1);
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+ try {
+ final Future<InterruptedException> waitingTaskResult =
executor.submit(() -> {
+ taskStarted.countDown();
+ try {
+ quorum.await(1, TimeUnit.DAYS);
+ return null;
+ } catch (InterruptedException e) {
+ return e;
+ }
+ });
+ // realistic expectation of the max time to start a Thread
+ Assert.assertTrue(taskStarted.await(10, TimeUnit.SECONDS));
+ Assert.assertFalse(waitingTaskResult.isDone());
+ quorum.allVotesCast(null);
+ try {
+ Assert.assertNull(waitingTaskResult.get(5, TimeUnit.SECONDS));
+ } catch (TimeoutException ex) {
+ Assert.fail("allVoteCast hasn't unblocked the waiting task");
+ } catch (ExecutionException ex) {
+ Assert.fail("This shouldn't really happen: the wait task shouldn't
throw any exception: " + ex);
+ }
+ Assert.assertTrue(waitingTaskResult.isDone());
+ Assert.assertFalse(quorum.getDecision());
+ } finally {
+ executor.shutdownNow();
+ }
+ }
+
+ @Test
+ public void testRequestToStayQuorumUnblockAwait() throws
InterruptedException {
+ Assume.assumeThat(trueVotes, Matchers.greaterThan(0));
+ Assume.assumeThat(size, Matchers.greaterThan(trueVotes));
+ final String liveConnector = "live";
+ final String backupConnector = "backup";
+ QuorumVoteServerConnect quorum = new QuorumVoteServerConnect(size,
"foo", true, liveConnector);
+ Assert.assertFalse(quorum.getDecision());
+ CountDownLatch taskStarted = new CountDownLatch(1);
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+ try {
+ final Future<InterruptedException> waitingTaskResult =
executor.submit(() -> {
+ taskStarted.countDown();
+ try {
+ quorum.await(1, TimeUnit.DAYS);
+ return null;
+ } catch (InterruptedException e) {
+ return e;
+ }
+ });
+ // realistic expectation of the max time to start a Thread
+ Assert.assertTrue(taskStarted.await(10, TimeUnit.SECONDS));
+ quorum.vote(new ServerConnectVote("foo", true, backupConnector));
+ Assert.assertFalse(waitingTaskResult.isDone());
+ Assert.assertFalse(quorum.getDecision());
+ for (int i = 0; i < trueVotes - 1; i++) {
+ quorum.vote(new ServerConnectVote("foo", true, liveConnector));
+ Assert.assertFalse(waitingTaskResult.isDone());
+ Assert.assertFalse(quorum.getDecision());
+ }
+ quorum.vote(new ServerConnectVote("foo", true, liveConnector));
+ Assert.assertTrue(quorum.getDecision());
+ try {
+ Assert.assertNull(waitingTaskResult.get(5, TimeUnit.SECONDS));
+ } catch (TimeoutException ex) {
+ Assert.fail("allVoteCast hasn't unblocked the waiting task");
+ } catch (ExecutionException ex) {
+ Assert.fail("This shouldn't really happen: the wait task shouldn't
throw any exception: " + ex);
+ }
+ Assert.assertTrue(waitingTaskResult.isDone());
+ Assert.assertTrue(quorum.getDecision());
+ } finally {
+ executor.shutdownNow();
+ }
+ }
+
+ @Test
+ public void testNotRequestToStayQuorumUnblockAwait() throws
InterruptedException {
+ Assume.assumeThat(trueVotes, Matchers.greaterThan(0));
+ Assume.assumeThat(size, Matchers.greaterThan(trueVotes));
+ QuorumVoteServerConnect quorum = new QuorumVoteServerConnect(size,
"foo");
+ Assert.assertFalse(quorum.getDecision());
+ CountDownLatch taskStarted = new CountDownLatch(1);
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+ try {
+ final Future<InterruptedException> waitingTaskResult =
executor.submit(() -> {
+ taskStarted.countDown();
+ try {
+ quorum.await(1, TimeUnit.DAYS);
+ return null;
+ } catch (InterruptedException e) {
+ return e;
+ }
+ });
+ // realistic expectation of the max time to start a Thread
+ Assert.assertTrue(taskStarted.await(10, TimeUnit.SECONDS));
+ quorum.vote(new ServerConnectVote("foo", false, null));
+ Assert.assertFalse(waitingTaskResult.isDone());
+ Assert.assertFalse(quorum.getDecision());
+ for (int i = 0; i < trueVotes - 1; i++) {
+ quorum.vote(new ServerConnectVote("foo", true, null));
+ Assert.assertFalse(waitingTaskResult.isDone());
+ Assert.assertFalse(quorum.getDecision());
+ }
+ quorum.vote(new ServerConnectVote("foo", true, null));
+ Assert.assertTrue(quorum.getDecision());
+ try {
+ Assert.assertNull(waitingTaskResult.get(5, TimeUnit.SECONDS));
+ } catch (TimeoutException ex) {
+ Assert.fail("allVoteCast hasn't unblocked the waiting task");
+ } catch (ExecutionException ex) {
+ Assert.fail("This shouldn't really happen: the wait task shouldn't
throw any exception: " + ex);
+ }
+ Assert.assertTrue(waitingTaskResult.isDone());
+ Assert.assertTrue(quorum.getDecision());
+ } finally {
+ executor.shutdownNow();
+ }
+ }
+
+ @Test
public void testSuccessfulVote() {
QuorumVoteServerConnect quorum = new QuorumVoteServerConnect(size,
"foo");
for (int i = 0; i < trueVotes - 1; i++) {