This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new 124ed87  ARTEMIS-2702 QuorumVoteServerConnect with requestToStayLive 
is voting order sensitive
     new 67b1b01  This closes #3068
124ed87 is described below

commit 124ed872a6f42e6ff65fc302eb61867d18fa65e6
Author: Francesco Nigro <[email protected]>
AuthorDate: Mon Apr 6 19:32:31 2020 +0200

    ARTEMIS-2702 QuorumVoteServerConnect with requestToStayLive is voting order 
sensitive
---
 .../artemis/core/server/ActiveMQServerLogger.java  |   7 +-
 .../cluster/qourum/QuorumVoteServerConnect.java    |  62 +++----
 .../failover/QuorumVoteServerConnectTest.java      | 178 +++++++++++++++++++++
 3 files changed, 215 insertions(+), 32 deletions(-)

diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index 5024c82..a40230c 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -43,6 +43,7 @@ import 
org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
 import org.apache.activemq.artemis.core.server.cluster.Bridge;
 import org.apache.activemq.artemis.core.server.cluster.impl.BridgeImpl;
 import 
org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionImpl;
+import 
org.apache.activemq.artemis.core.server.cluster.qourum.ServerConnectVote;
 import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
 import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl;
 import org.apache.activemq.artemis.core.server.management.Notification;
@@ -81,6 +82,10 @@ public interface ActiveMQServerLogger extends BasicLogger {
    @Message(id = 223000, value = "Received Interrupt Exception whilst waiting 
for component to shutdown: {0}", format = Message.Format.MESSAGE_FORMAT)
    void interruptWhilstStoppingComponent(String componentClassName);
 
+   @LogMessage(level = Logger.Level.DEBUG)
+   @Message(id = 223001, value = "Ignored quorum vote due to quorum reached or 
vote casted: {0}", format = Message.Format.MESSAGE_FORMAT)
+   void ignoredQuorumVote(ServerConnectVote vote);
+
    @LogMessage(level = Logger.Level.INFO)
    @Message(id = 221000, value = "{0} Message Broker is starting with 
configuration {1}", format = Message.Format.MESSAGE_FORMAT)
    void serverStarting(String type, Configuration configuration);
@@ -2041,7 +2046,7 @@ public interface ActiveMQServerLogger extends BasicLogger 
{
 
    @LogMessage(level = Logger.Level.INFO)
    @Message(id = 224098, value = "Received a vote saying the backup is live 
with connector: {0}", format = Message.Format.MESSAGE_FORMAT)
-   void qourumBackupIsLive(String liveConnector);
+   void quorumBackupIsLive(String liveConnector);
 
    @LogMessage(level = Logger.Level.WARN)
    @Message(id = 224099, value = "Message with ID {0} has a header too large. 
More information available on debug level for class {1}",
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/QuorumVoteServerConnect.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/QuorumVoteServerConnect.java
index 32fea40..299835a 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/QuorumVoteServerConnect.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/QuorumVoteServerConnect.java
@@ -29,18 +29,14 @@ import 
org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 public class QuorumVoteServerConnect extends QuorumVote<ServerConnectVote, 
Boolean> {
 
    public static final SimpleString LIVE_FAILOVER_VOTE = new 
SimpleString("LiveFailoverQuorumVote");
-   private final CountDownLatch latch;
+   // this flag mark the end of the vote
+   private final CountDownLatch voteCompleted;
    private final String targetNodeId;
    private final String liveConnector;
-
    private int votesNeeded;
 
-   private int total = 0;
-
-   private boolean decision = false;
-
    // Is this the live requesting to stay live, or a backup requesting to 
become live.
-   private boolean requestToStayLive = false;
+   private final boolean requestToStayLive;
 
    /**
     * live nodes | remaining nodes |  majority   | votes needed
@@ -65,9 +61,9 @@ public class QuorumVoteServerConnect extends 
QuorumVote<ServerConnectVote, Boole
       }
       //votes needed could be say 2.5 so we add 1 in this case
       votesNeeded = (int) majority;
-      latch = new CountDownLatch(votesNeeded);
+      voteCompleted = new CountDownLatch(1);
       if (votesNeeded == 0) {
-         decision = true;
+         voteCompleted.countDown();
       }
       this.requestToStayLive = requestToStayLive;
    }
@@ -108,42 +104,46 @@ public class QuorumVoteServerConnect extends 
QuorumVote<ServerConnectVote, Boole
     */
    @Override
    public synchronized void vote(ServerConnectVote vote) {
-      if (decision)
+      if (voteCompleted.getCount() == 0) {
+         ActiveMQServerLogger.LOGGER.ignoredQuorumVote(vote);
          return;
-      if (!requestToStayLive && vote.getVote()) {
-         total++;
-         latch.countDown();
-         if (total >= votesNeeded) {
-            decision = true;
-         }//do the opposite, if it says there is a node connected it means the 
backup has come live
-      } else if (requestToStayLive && vote.getVote()) {
-         total++;
-         latch.countDown();
-         if (liveConnector != null && 
!liveConnector.equals(vote.getTransportConfiguration())) {
-            ActiveMQServerLogger.LOGGER.qourumBackupIsLive(liveConnector);
-            return;
-         }
-         if (total >= votesNeeded) {
-            decision = true;
+      }
+      if (vote.getVote()) {
+         if (!requestToStayLive) {
+            acceptPositiveVote();
+         } else if (liveConnector.equals(vote.getTransportConfiguration())) {
+            acceptPositiveVote();
+         } else {
+            
ActiveMQServerLogger.LOGGER.quorumBackupIsLive(vote.getTransportConfiguration());
          }
       }
    }
 
+   private synchronized void acceptPositiveVote() {
+      if (voteCompleted.getCount() == 0) {
+         throw new IllegalStateException("Cannot accept any new positive vote 
if the vote is completed or the decision is already taken");
+      }
+      votesNeeded--;
+      if (votesNeeded == 0) {
+         voteCompleted.countDown();
+      }
+   }
+
    @Override
-   public void allVotesCast(Topology voteTopology) {
-      while (latch.getCount() > 0) {
-         latch.countDown();
+   public synchronized void allVotesCast(Topology voteTopology) {
+      if (voteCompleted.getCount() > 0) {
+         voteCompleted.countDown();
       }
    }
 
    @Override
-   public Boolean getDecision() {
-      return decision;
+   public synchronized Boolean getDecision() {
+      return votesNeeded == 0;
    }
 
    public void await(int latchTimeout, TimeUnit unit) throws 
InterruptedException {
       ActiveMQServerLogger.LOGGER.waitingForQuorumVoteResults(latchTimeout, 
unit.toString().toLowerCase());
-      if (latch.await(latchTimeout, unit))
+      if (voteCompleted.await(latchTimeout, unit))
          ActiveMQServerLogger.LOGGER.receivedAllQuorumVotes();
       else
          ActiveMQServerLogger.LOGGER.timeoutWaitingForQuorumVoteResponses();
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/QuorumVoteServerConnectTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/QuorumVoteServerConnectTest.java
index 0d11bcf..a2ed117 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/QuorumVoteServerConnectTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/QuorumVoteServerConnectTest.java
@@ -18,10 +18,20 @@ package 
org.apache.activemq.artemis.tests.integration.cluster.failover;
 
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import 
org.apache.activemq.artemis.core.server.cluster.qourum.QuorumVoteServerConnect;
 import 
org.apache.activemq.artemis.core.server.cluster.qourum.ServerConnectVote;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.hamcrest.Matchers;
+import org.junit.Assert;
+import org.junit.Assume;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -44,6 +54,174 @@ public class QuorumVoteServerConnectTest extends 
ActiveMQTestBase {
    }
 
    @Test
+   public void testVoteOnRequestToStay() {
+      Assume.assumeThat(trueVotes, Matchers.greaterThan(0));
+      Assume.assumeThat(size, Matchers.greaterThan(trueVotes));
+      final String liveConnector = "live";
+      final String backupConnector = "backup";
+      QuorumVoteServerConnect quorum = new QuorumVoteServerConnect(size, 
"foo", true, liveConnector);
+      quorum.vote(new ServerConnectVote("foo", true, backupConnector));
+      Assert.assertFalse(quorum.getDecision());
+      for (int i = 0; i < trueVotes - 1; i++) {
+         quorum.vote(new ServerConnectVote("foo", true, liveConnector));
+         Assert.assertFalse(quorum.getDecision());
+      }
+      quorum.vote(new ServerConnectVote("foo", true, liveConnector));
+      Assert.assertTrue(quorum.getDecision());
+   }
+
+   @Test
+   public void testAllVoteCastFreezeNotRequestToStayDecision() {
+      QuorumVoteServerConnect quorum = new QuorumVoteServerConnect(size, 
"foo");
+      Assert.assertFalse(quorum.isRequestToStayLive());
+      final boolean decisionBeforeVoteCompleted = quorum.getDecision();
+      quorum.allVotesCast(null);
+      for (int i = 0; i < trueVotes; i++) {
+         quorum.vote(new ServerConnectVote("foo", true, null));
+      }
+      Assert.assertEquals(decisionBeforeVoteCompleted, quorum.getDecision());
+   }
+
+   @Test
+   public void testAllVoteCastFreezeRequestToStayDecision() {
+      final String liveConnector = "live";
+      QuorumVoteServerConnect quorum = new QuorumVoteServerConnect(size, 
"foo", true, liveConnector);
+      Assert.assertTrue(quorum.isRequestToStayLive());
+      final boolean decisionBeforeVoteCompleted = quorum.getDecision();
+      quorum.allVotesCast(null);
+      for (int i = 0; i < trueVotes; i++) {
+         quorum.vote(new ServerConnectVote("foo", true, liveConnector));
+      }
+      Assert.assertEquals(decisionBeforeVoteCompleted, quorum.getDecision());
+   }
+
+   @Test
+   public void testAllVoteCastUnblockAwait() throws InterruptedException {
+      Assume.assumeThat(trueVotes, Matchers.greaterThan(0));
+      Assume.assumeThat(size, Matchers.greaterThan(trueVotes));
+      QuorumVoteServerConnect quorum = new QuorumVoteServerConnect(size, 
"foo");
+      Assert.assertFalse(quorum.getDecision());
+      CountDownLatch taskStarted = new CountDownLatch(1);
+      ExecutorService executor = Executors.newSingleThreadExecutor();
+      try {
+         final Future<InterruptedException> waitingTaskResult = 
executor.submit(() -> {
+            taskStarted.countDown();
+            try {
+               quorum.await(1, TimeUnit.DAYS);
+               return null;
+            } catch (InterruptedException e) {
+               return e;
+            }
+         });
+         // realistic expectation of the max time to start a Thread
+         Assert.assertTrue(taskStarted.await(10, TimeUnit.SECONDS));
+         Assert.assertFalse(waitingTaskResult.isDone());
+         quorum.allVotesCast(null);
+         try {
+            Assert.assertNull(waitingTaskResult.get(5, TimeUnit.SECONDS));
+         } catch (TimeoutException ex) {
+            Assert.fail("allVoteCast hasn't unblocked the waiting task");
+         } catch (ExecutionException ex) {
+            Assert.fail("This shouldn't really happen: the wait task shouldn't 
throw any exception: " + ex);
+         }
+         Assert.assertTrue(waitingTaskResult.isDone());
+         Assert.assertFalse(quorum.getDecision());
+      } finally {
+         executor.shutdownNow();
+      }
+   }
+
+   @Test
+   public void testRequestToStayQuorumUnblockAwait() throws 
InterruptedException {
+      Assume.assumeThat(trueVotes, Matchers.greaterThan(0));
+      Assume.assumeThat(size, Matchers.greaterThan(trueVotes));
+      final String liveConnector = "live";
+      final String backupConnector = "backup";
+      QuorumVoteServerConnect quorum = new QuorumVoteServerConnect(size, 
"foo", true, liveConnector);
+      Assert.assertFalse(quorum.getDecision());
+      CountDownLatch taskStarted = new CountDownLatch(1);
+      ExecutorService executor = Executors.newSingleThreadExecutor();
+      try {
+         final Future<InterruptedException> waitingTaskResult = 
executor.submit(() -> {
+            taskStarted.countDown();
+            try {
+               quorum.await(1, TimeUnit.DAYS);
+               return null;
+            } catch (InterruptedException e) {
+               return e;
+            }
+         });
+         // realistic expectation of the max time to start a Thread
+         Assert.assertTrue(taskStarted.await(10, TimeUnit.SECONDS));
+         quorum.vote(new ServerConnectVote("foo", true, backupConnector));
+         Assert.assertFalse(waitingTaskResult.isDone());
+         Assert.assertFalse(quorum.getDecision());
+         for (int i = 0; i < trueVotes - 1; i++) {
+            quorum.vote(new ServerConnectVote("foo", true, liveConnector));
+            Assert.assertFalse(waitingTaskResult.isDone());
+            Assert.assertFalse(quorum.getDecision());
+         }
+         quorum.vote(new ServerConnectVote("foo", true, liveConnector));
+         Assert.assertTrue(quorum.getDecision());
+         try {
+            Assert.assertNull(waitingTaskResult.get(5, TimeUnit.SECONDS));
+         } catch (TimeoutException ex) {
+            Assert.fail("allVoteCast hasn't unblocked the waiting task");
+         } catch (ExecutionException ex) {
+            Assert.fail("This shouldn't really happen: the wait task shouldn't 
throw any exception: " + ex);
+         }
+         Assert.assertTrue(waitingTaskResult.isDone());
+         Assert.assertTrue(quorum.getDecision());
+      } finally {
+         executor.shutdownNow();
+      }
+   }
+
+   @Test
+   public void testNotRequestToStayQuorumUnblockAwait() throws 
InterruptedException {
+      Assume.assumeThat(trueVotes, Matchers.greaterThan(0));
+      Assume.assumeThat(size, Matchers.greaterThan(trueVotes));
+      QuorumVoteServerConnect quorum = new QuorumVoteServerConnect(size, 
"foo");
+      Assert.assertFalse(quorum.getDecision());
+      CountDownLatch taskStarted = new CountDownLatch(1);
+      ExecutorService executor = Executors.newSingleThreadExecutor();
+      try {
+         final Future<InterruptedException> waitingTaskResult = 
executor.submit(() -> {
+            taskStarted.countDown();
+            try {
+               quorum.await(1, TimeUnit.DAYS);
+               return null;
+            } catch (InterruptedException e) {
+               return e;
+            }
+         });
+         // realistic expectation of the max time to start a Thread
+         Assert.assertTrue(taskStarted.await(10, TimeUnit.SECONDS));
+         quorum.vote(new ServerConnectVote("foo", false, null));
+         Assert.assertFalse(waitingTaskResult.isDone());
+         Assert.assertFalse(quorum.getDecision());
+         for (int i = 0; i < trueVotes - 1; i++) {
+            quorum.vote(new ServerConnectVote("foo", true, null));
+            Assert.assertFalse(waitingTaskResult.isDone());
+            Assert.assertFalse(quorum.getDecision());
+         }
+         quorum.vote(new ServerConnectVote("foo", true, null));
+         Assert.assertTrue(quorum.getDecision());
+         try {
+            Assert.assertNull(waitingTaskResult.get(5, TimeUnit.SECONDS));
+         } catch (TimeoutException ex) {
+            Assert.fail("allVoteCast hasn't unblocked the waiting task");
+         } catch (ExecutionException ex) {
+            Assert.fail("This shouldn't really happen: the wait task shouldn't 
throw any exception: " + ex);
+         }
+         Assert.assertTrue(waitingTaskResult.isDone());
+         Assert.assertTrue(quorum.getDecision());
+      } finally {
+         executor.shutdownNow();
+      }
+   }
+
+   @Test
    public void testSuccessfulVote() {
       QuorumVoteServerConnect quorum = new QuorumVoteServerConnect(size, 
"foo");
       for (int i = 0; i < trueVotes - 1; i++) {

Reply via email to