Repository: incubator-ratis
Updated Branches:
  refs/heads/master 06002e67a -> ddb82cd19


RATIS-90. After hitting IOException, RaftClient should randomly pick a known 
server as leader. Contributed by Jing Zhao.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/ddb82cd1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/ddb82cd1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/ddb82cd1

Branch: refs/heads/master
Commit: ddb82cd190f91997d37b6937db1630a96737112c
Parents: 06002e6
Author: Jing Zhao <[email protected]>
Authored: Mon May 29 10:18:57 2017 -0700
Committer: Jing Zhao <[email protected]>
Committed: Mon May 29 10:18:57 2017 -0700

----------------------------------------------------------------------
 .../ratis/client/impl/RaftClientImpl.java       |  5 ++--
 .../org/apache/ratis/util/CollectionUtils.java  | 24 ++++++++++++++++++++
 .../ratis/grpc/client/AppendStreamer.java       |  3 ++-
 .../ratis/RaftNotLeaderExceptionBaseTest.java   | 20 +++++++++++++++-
 4 files changed, 47 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ddb82cd1/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index 40e670d..75ef2a4 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -26,12 +26,10 @@ import org.apache.ratis.protocol.*;
 
 import java.io.IOException;
 import java.io.InterruptedIOException;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Supplier;
-import java.util.stream.Collector;
 import java.util.stream.Collectors;
 
 /** A client who sends requests to a raft service. */
@@ -177,7 +175,8 @@ final class RaftClientImpl implements RaftClient {
 
     final RaftPeerId oldLeader = request.getServerId();
     if (newLeader == null && oldLeader.equals(leaderId)) {
-      newLeader = CollectionUtils.next(oldLeader, CollectionUtils.as(peers, RaftPeer::getId));
+      newLeader = CollectionUtils.random(oldLeader,
+          peers.stream().map(RaftPeer::getId).collect(Collectors.toList()));
     }
     if (newLeader != null && oldLeader.equals(leaderId)) {
       LOG.debug("{}: change Leader from {} to {}", clientId, oldLeader, newLeader);

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ddb82cd1/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java
index 05fa2fb..5f68d47 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java
@@ -20,11 +20,16 @@
 
 package org.apache.ratis.util;
 
+import java.util.Collection;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Objects;
+import java.util.Random;
 import java.util.function.Function;
 
 public interface CollectionUtils {
+  Random random = new Random();
+
   /**
    *  @return the next element in the iteration right after the given element;
    *          if the given element is not in the iteration, return the first one
@@ -45,6 +50,25 @@ public interface CollectionUtils {
     return first;
   }
 
+  /**
+   * Picks a random element of the list that differs from the given one.
+   * Elements are compared with {@link Object#equals(Object)}, so a distinct
+   * but equal instance of {@code given} is never selected.
+   *
+   * @param given the element to avoid; must not be null
+   * @param list the candidates; must be neither null nor empty
+   * @return an element chosen uniformly at random among the elements not
+   *         equal to {@code given}; the first element of the list if every
+   *         element equals {@code given}
+   */
+  static <T> T random(final T given, List<T> list) {
+    Objects.requireNonNull(given, "given == null");
+    Preconditions.assertTrue(list != null && !list.isEmpty(),
+        "c is null or empty");
+
+    // Count the candidates up front so that the selection always terminates,
+    // even when every element of the list equals the given one.
+    int candidates = 0;
+    for (T element : list) {
+      if (!given.equals(element)) {
+        candidates++;
+      }
+    }
+    if (candidates == 0) {
+      // Nothing else to choose from: fall back to the (given) first element.
+      return list.get(0);
+    }
+
+    // Walk to the k-th candidate, where k is drawn uniformly at random.
+    int remaining = random.nextInt(candidates);
+    for (T element : list) {
+      if (!given.equals(element) && remaining-- == 0) {
+        return element;
+      }
+    }
+    // Only reachable if the list changed between the two passes.
+    throw new IllegalStateException("list was modified concurrently");
+  }
+
   static <INPUT, OUTPUT> Iterable<OUTPUT> as(
       Iterable<INPUT> iteration, Function<INPUT, OUTPUT> converter) {
     return () -> new Iterator<OUTPUT>() {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ddb82cd1/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java
index c92820c..f9ff00a 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java
@@ -115,7 +115,8 @@ public class AppendStreamer implements Closeable {
       if (oldLeader == null) {
         leaderId = peers.keySet().iterator().next();
       } else {
-        leaderId = CollectionUtils.next(oldLeader, peers.keySet());
+        leaderId = CollectionUtils.random(oldLeader,
+            new ArrayList<>(peers.keySet()));
       }
     }
     LOG.debug("{} switches leader from {} to {}. suggested leader: {}", this,

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ddb82cd1/ratis-server/src/test/java/org/apache/ratis/RaftNotLeaderExceptionBaseTest.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftNotLeaderExceptionBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/RaftNotLeaderExceptionBaseTest.java
index fe38778..83c88f5 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftNotLeaderExceptionBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftNotLeaderExceptionBaseTest.java
@@ -46,7 +46,7 @@ public abstract class RaftNotLeaderExceptionBaseTest {
 
   public static final Logger LOG =
       LoggerFactory.getLogger(RaftNotLeaderExceptionBaseTest.class);
-  public static final int NUM_PEERS = 3;
+  public static final int NUM_PEERS = 5;
 
   @Rule
   public Timeout globalTimeout = new Timeout(60 * 1000);
@@ -70,6 +70,19 @@ public abstract class RaftNotLeaderExceptionBaseTest {
 
   @Test
   public void testHandleNotLeaderException() throws Exception {
+    testHandleNotLeaderException(false);
+  }
+
+  /**
+   * Test handle both IOException and NotLeaderException
+   */
+  @Test
+  public void testHandleNotLeaderAndIOException() throws Exception {
+    testHandleNotLeaderException(true);
+  }
+
+  private void testHandleNotLeaderException(boolean killNewLeader)
+      throws Exception {
     RaftTestUtil.waitForLeader(cluster);
     final RaftPeerId leaderId = cluster.getLeader().getId();
     final RaftClient client = cluster.createClient(leaderId);
@@ -81,6 +94,11 @@ public abstract class RaftNotLeaderExceptionBaseTest {
     RaftPeerId newLeader = RaftTestUtil.changeLeader(cluster, leaderId);
     Assert.assertNotEquals(leaderId, newLeader);
 
+    if (killNewLeader) {
+      // kill the new leader
+      cluster.killServer(newLeader);
+    }
+
     RaftClientRpc rpc = client.getClientRpc();
     reply= null;
     for (int i = 0; reply == null && i < 10; i++) {

Reply via email to