Repository: incubator-ratis
Updated Branches:
  refs/heads/master 27c2dfe6e -> 8055e5d6e


RATIS-381. RaftTestUtil.waitForLeader should not return null.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/8055e5d6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/8055e5d6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/8055e5d6

Branch: refs/heads/master
Commit: 8055e5d6e4b885b05aee302cd5f99fa1622e63c3
Parents: 27c2dfe
Author: Tsz Wo Nicholas Sze <[email protected]>
Authored: Sat Nov 3 20:29:36 2018 +0800
Committer: Tsz Wo Nicholas Sze <[email protected]>
Committed: Sat Nov 3 20:29:36 2018 +0800

----------------------------------------------------------------------
 .../org/apache/ratis/util/CheckedRunnable.java  | 10 +-
 .../org/apache/ratis/util/CheckedSupplier.java  | 10 +-
 .../java/org/apache/ratis/util/JavaUtils.java   | 98 +++++++++++---------
 .../org/apache/ratis/util/TimeDuration.java     | 20 +++-
 .../org/apache/ratis/grpc/TestRaftStream.java   |  2 +-
 .../java/org/apache/ratis/MiniRaftCluster.java  | 75 ++++++++++++---
 .../java/org/apache/ratis/RaftTestUtil.java     | 82 ++++++++--------
 .../java/org/apache/ratis/RetryCacheTests.java  | 11 ++-
 .../apache/ratis/server/ServerRestartTests.java |  6 ++
 .../server/impl/GroupManagementBaseTest.java    |  9 +-
 .../ratis/server/impl/LeaderElectionTests.java  | 46 ++++++---
 .../impl/RaftReconfigurationBaseTest.java       |  2 +-
 .../ratis/server/impl/RaftServerTestUtil.java   |  9 +-
 .../server/impl/ServerInformationBaseTest.java  |  4 +-
 14 files changed, 243 insertions(+), 141 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8055e5d6/ratis-common/src/main/java/org/apache/ratis/util/CheckedRunnable.java
----------------------------------------------------------------------
diff --git 
a/ratis-common/src/main/java/org/apache/ratis/util/CheckedRunnable.java 
b/ratis-common/src/main/java/org/apache/ratis/util/CheckedRunnable.java
index b6e90b9..2911254 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/CheckedRunnable.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/CheckedRunnable.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -25,4 +25,12 @@ public interface CheckedRunnable<THROWABLE extends 
Throwable> {
    * except that this method is declared with a throws-clause.
    */
   void run() throws THROWABLE;
+
+  static <THROWABLE extends Throwable> CheckedSupplier<?, THROWABLE> 
asCheckedSupplier(
+      CheckedRunnable<THROWABLE> runnable) {
+    return () -> {
+      runnable.run();
+      return null;
+    };
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8055e5d6/ratis-common/src/main/java/org/apache/ratis/util/CheckedSupplier.java
----------------------------------------------------------------------
diff --git 
a/ratis-common/src/main/java/org/apache/ratis/util/CheckedSupplier.java 
b/ratis-common/src/main/java/org/apache/ratis/util/CheckedSupplier.java
index 06abe4c..9bbb009 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/CheckedSupplier.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/CheckedSupplier.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -27,12 +27,4 @@ public interface CheckedSupplier<OUTPUT, THROWABLE extends 
Throwable> {
    * except that this method is declared with a throws-clause.
    */
   OUTPUT get() throws THROWABLE;
-
-  static <THROWABLE extends Throwable> CheckedSupplier<?, THROWABLE> valueOf(
-      CheckedRunnable<THROWABLE> runnable) {
-    return () -> {
-      runnable.run();
-      return null;
-    };
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8055e5d6/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java 
b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
index 923e03d..b855b2a 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
@@ -1,23 +1,20 @@
 /*
- * *
- *  * Licensed to the Apache Software Foundation (ASF) under one
- *  * or more contributor license agreements.  See the NOTICE file
- *  * distributed with this work for additional information
- *  * regarding copyright ownership.  The ASF licenses this file
- *  * to you under the Apache License, Version 2.0 (the
- *  * "License"); you may not use this file except in compliance
- *  * with the License.  You may obtain a copy of the License at
- *  *
- *  *     http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
-
 package org.apache.ratis.util;
 
 import org.slf4j.Logger;
@@ -49,6 +46,7 @@ public interface JavaUtils {
   Logger LOG = LoggerFactory.getLogger(JavaUtils.class);
 
   DateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss,SSS");
+  CompletableFuture[] EMPTY_COMPLETABLE_FUTURE_ARRAY = {};
 
   static String date() {
     return DATE_FORMAT.format(new Date());
@@ -92,20 +90,6 @@ public interface JavaUtils {
   }
 
   /**
-   * Get the value from the future and then consume it.
-   */
-  static <T> void getAndConsume(CompletableFuture<T> future, Consumer<T> 
consumer) {
-    final T t;
-    try {
-      t = future.get();
-    } catch (Exception ignored) {
-      LOG.warn("Failed to get()", ignored);
-      return;
-    }
-    consumer.accept(t);
-  }
-
-  /**
    * Create a memoized supplier which gets a value by invoking the initializer 
once
    * and then keeps returning the same value as its supplied results.
    *
@@ -129,14 +113,23 @@ public interface JavaUtils {
     return ROOT_THREAD_GROUP.get();
   }
 
-  /** Attempt to get a return value from the given supplier multiple times. */
+  /** @deprecated use {@link #attempt(CheckedSupplier, int, TimeDuration, 
String, Logger)} */
+  @Deprecated
   static <RETURN, THROWABLE extends Throwable> RETURN attempt(
       CheckedSupplier<RETURN, THROWABLE> supplier,
       int numAttempts, long sleepMs, String name, Logger log)
       throws THROWABLE, InterruptedException {
+    return attempt(supplier, numAttempts, TimeDuration.valueOf(sleepMs, 
TimeUnit.MILLISECONDS), name, log);
+  }
+
+  /** Attempt to get a return value from the given supplier multiple times. */
+  static <RETURN, THROWABLE extends Throwable> RETURN attempt(
+      CheckedSupplier<RETURN, THROWABLE> supplier,
+      int numAttempts, TimeDuration sleepTime, String name, Logger log)
+      throws THROWABLE, InterruptedException {
     Objects.requireNonNull(supplier, "supplier == null");
     Preconditions.assertTrue(numAttempts > 0, () -> "numAttempts = " + 
numAttempts + " <= 0");
-    Preconditions.assertTrue(sleepMs >= 0L, () -> "sleepMs = " + sleepMs + " < 
0");
+    Preconditions.assertTrue(!sleepTime.isNegative(), () -> "sleepTime = " + 
sleepTime + " < 0");
 
     for(int i = 1; i <= numAttempts; i++) {
       try {
@@ -147,31 +140,45 @@ public interface JavaUtils {
         }
         if (log != null && log.isWarnEnabled()) {
           log.warn("FAILED " + name + " attempt #" + i + "/" + numAttempts
-              + ": " + t + ", sleep " + sleepMs + "ms and then retry.", t);
+              + ": " + t + ", sleep " + sleepTime + " and then retry.", t);
         }
       }
 
-      if (sleepMs > 0) {
-        Thread.sleep(sleepMs);
-      }
+      sleepTime.sleep();
     }
     throw new IllegalStateException("BUG: this line should be unreachable.");
   }
 
-  /** Attempt to run the given op multiple times. */
+  /** @deprecated use {@link #attempt(CheckedRunnable, int, TimeDuration, 
String, Logger)} */
+  @Deprecated
   static <THROWABLE extends Throwable> void attempt(
       CheckedRunnable<THROWABLE> op, int numAttempts, long sleepMs, String 
name, Logger log)
       throws THROWABLE, InterruptedException {
-    attempt(CheckedSupplier.valueOf(op), numAttempts, sleepMs, name, log);
+    attempt(op, numAttempts, TimeDuration.valueOf(sleepMs, 
TimeUnit.MILLISECONDS), name, log);
   }
 
-  /** Attempt to wait the given condition to return true multiple times. */
+  /** Attempt to run the given op multiple times. */
+  static <THROWABLE extends Throwable> void attempt(
+      CheckedRunnable<THROWABLE> runnable, int numAttempts, TimeDuration 
sleepTime, String name, Logger log)
+      throws THROWABLE, InterruptedException {
+    attempt(CheckedRunnable.asCheckedSupplier(runnable), numAttempts, 
sleepTime, name, log);
+  }
+
+  /** @deprecated use {@link #attempt(BooleanSupplier, int, TimeDuration, 
String, Logger)} */
+  @Deprecated
   static void attempt(
       BooleanSupplier condition, int numAttempts, long sleepMs, String name, 
Logger log)
       throws InterruptedException {
+    attempt(condition, numAttempts, TimeDuration.valueOf(sleepMs, 
TimeUnit.MILLISECONDS), name, log);
+  }
+
+  /** Attempt to wait the given condition to return true multiple times. */
+  static void attempt(
+      BooleanSupplier condition, int numAttempts, TimeDuration sleepTime, 
String name, Logger log)
+      throws InterruptedException {
     Objects.requireNonNull(condition, "condition == null");
     Preconditions.assertTrue(numAttempts > 0, () -> "numAttempts = " + 
numAttempts + " <= 0");
-    Preconditions.assertTrue(sleepMs >= 0L, () -> "sleepMs = " + sleepMs + " < 
0");
+    Preconditions.assertTrue(!sleepTime.isNegative(), () -> "sleepTime = " + 
sleepTime + " < 0");
 
     for(int i = 1; i <= numAttempts; i++) {
       if (condition.getAsBoolean()) {
@@ -179,11 +186,10 @@ public interface JavaUtils {
       }
       if (log != null && log.isWarnEnabled()) {
         log.warn("FAILED " + name + " attempt #" + i + "/" + numAttempts
-            + ": sleep " + sleepMs + "ms and then retry.");
-      }
-      if (sleepMs > 0) {
-        Thread.sleep(sleepMs);
+            + ": sleep " + sleepTime + " and then retry.");
       }
+
+      sleepTime.sleep();
     }
 
     if (!condition.getAsBoolean()) {
@@ -222,7 +228,7 @@ public interface JavaUtils {
   }
 
   static <T> CompletableFuture<Void> allOf(List<CompletableFuture<T>> futures) 
{
-    return CompletableFuture.allOf(futures.toArray(new 
CompletableFuture<?>[futures.size()]));
+    return 
CompletableFuture.allOf(futures.toArray(EMPTY_COMPLETABLE_FUTURE_ARRAY));
   }
 
   static <OUTPUT, THROWABLE extends Throwable> OUTPUT 
supplyAndWrapAsCompletionException(

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8055e5d6/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java 
b/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java
index 8a7c44a..2fad806 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,11 +17,18 @@
  */
 package org.apache.ratis.util;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
 import java.util.concurrent.TimeUnit;
+import java.util.function.LongUnaryOperator;
 
 /**
  * Time duration is represented together with a {@link TimeUnit}.
+ *
+ * This class is immutable.
  */
 public class TimeDuration implements Comparable<TimeDuration> {
 
@@ -119,6 +126,15 @@ public class TimeDuration implements 
Comparable<TimeDuration> {
     return Math.toIntExact(toLong(targetUnit));
   }
 
+  /**
+   * Apply the given operator to the duration value of this object.
+   *
+   * @return a new object with the new duration value and the same unit of 
this object.
+   */
+  public TimeDuration apply(LongUnaryOperator operator) {
+    return valueOf(operator.applyAsLong(duration), unit);
+  }
+
   public boolean isNegative() {
     return duration < 0;
   }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8055e5d6/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java 
b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java
index b3f6a41..ba31b2b 100644
--- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java
+++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java
@@ -291,7 +291,7 @@ public class TestRaftStream extends BaseTest {
 
     // force change the leader
     Thread.sleep(500);
-    RaftTestUtil.waitAndKillLeader(cluster, true);
+    RaftTestUtil.waitAndKillLeader(cluster);
     final RaftServerImpl newLeader = waitForLeader(cluster);
     Assert.assertNotEquals(leader.getId(), newLeader.getId());
     Thread.sleep(500);

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8055e5d6/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java 
b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
index 3dd0612..0e352f4 100644
--- a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
@@ -46,6 +46,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -143,6 +144,10 @@ public abstract class MiniRaftCluster implements Closeable 
{
     return ids;
   }
 
+  public static int getIdIndex(String id) {
+    return Integer.parseInt(id.substring(1));
+  }
+
   protected RaftGroup group;
   protected final RaftProperties properties;
   protected final Parameters parameters;
@@ -226,10 +231,16 @@ public abstract class MiniRaftCluster implements 
Closeable {
     start();
   }
 
+  /** @deprecated use {@link #getTimeoutMax()}. */
+  @Deprecated
   public int getMaxTimeout() {
     return 
RaftServerConfigKeys.Rpc.timeoutMax(properties).toInt(TimeUnit.MILLISECONDS);
   }
 
+  public TimeDuration getTimeoutMax() {
+    return RaftServerConfigKeys.Rpc.timeoutMax(properties);
+  }
+
   private RaftServerProxy newRaftServer(RaftPeerId id, RaftGroup group, 
boolean format) {
     LOG.info("newRaftServer: {}, {}, format? {}", id, group, format);
     try {
@@ -410,15 +421,60 @@ public abstract class MiniRaftCluster implements 
Closeable {
     return leader;
   }
 
+  IllegalStateException newIllegalStateExceptionForNoLeaders(RaftGroupId 
groupId) {
+    final String g = groupId == null? "": " for " + groupId;
+    return new IllegalStateException("No leader yet " + g + ": " + 
printServers(groupId));
+  }
+
+  IllegalStateException newIllegalStateExceptionForMultipleLeaders(RaftGroupId 
groupId, List<RaftServerImpl> leaders) {
+    final String g = groupId == null? "": " for " + groupId;
+    return new IllegalStateException("Found multiple leaders" + g
+        + " at the same term (=" + leaders.get(0).getState().getCurrentTerm()
+        + "), leaders.size() = " + leaders.size() + " > 1, leaders = " + 
leaders
+        + ": " + printServers(groupId));
+  }
+
+  /**
+   * Get leader for the single group case.
+   * Do not use this method if this cluster has multiple groups.
+   *
+   * @return the unique leader with the highest term. Or, return null if there 
is no leader.
+   * @throws IllegalStateException if there are multiple leaders with the same 
highest term.
+   */
   public RaftServerImpl getLeader() {
-    return getLeader((RaftGroupId)null);
+    return getLeader(getLeaders(null), null, leaders -> {
+      throw newIllegalStateExceptionForMultipleLeaders(null, leaders);
+    });
   }
 
-  public RaftServerImpl getLeader(RaftGroupId groupId) {
-    return getLeader(getServerAliveStream(groupId));
+  RaftServerImpl getLeader(RaftGroupId groupId, Runnable handleNoLeaders,
+      Consumer<List<RaftServerImpl>> handleMultipleLeaders) {
+    return getLeader(getLeaders(groupId), handleNoLeaders, 
handleMultipleLeaders);
   }
 
-  static RaftServerImpl getLeader(Stream<RaftServerImpl> serverAliveStream) {
+  static RaftServerImpl getLeader(List<RaftServerImpl> leaders, Runnable 
handleNoLeaders,
+      Consumer<List<RaftServerImpl>> handleMultipleLeaders) {
+    if (leaders.isEmpty()) {
+      if (handleNoLeaders != null) {
+        handleNoLeaders.run();
+      }
+      return null;
+    } else if (leaders.size() > 1) {
+      if (handleMultipleLeaders != null) {
+        handleMultipleLeaders.accept(leaders);
+      }
+      return null;
+    } else {
+      return leaders.get(0);
+    }
+  }
+
+  /**
+   * @return the list of leaders with the highest term (i.e. leaders with a 
lower term are not included)
+   *         from the given group.
+   */
+  private List<RaftServerImpl> getLeaders(RaftGroupId groupId) {
+    final Stream<RaftServerImpl> serverAliveStream = 
getServerAliveStream(groupId);
     final List<RaftServerImpl> leaders = new ArrayList<>();
     serverAliveStream.filter(RaftServerImpl::isLeader).forEach(s -> {
       if (leaders.isEmpty()) {
@@ -434,13 +490,7 @@ public abstract class MiniRaftCluster implements Closeable 
{
         }
       }
     });
-    if (leaders.isEmpty()) {
-      return null;
-    } else if (leaders.size() > 1) {
-      throw new IllegalStateException(leaders
-          + ", leaders.size() = " + leaders.size() + " > 1");
-    }
-    return leaders.get(0);
+    return leaders;
   }
 
   boolean isLeader(String leaderId) {
@@ -469,7 +519,8 @@ public abstract class MiniRaftCluster implements Closeable {
 
   private Stream<RaftServerImpl> getServerStream(RaftGroupId groupId) {
     final Stream<RaftServerProxy> stream = getRaftServerProxyStream(groupId);
-    return groupId != null? stream.map(s -> 
RaftServerTestUtil.getRaftServerImpl(s, groupId))
+    return groupId != null?
+        stream.filter(s -> s.containsGroup(groupId)).map(s -> 
RaftServerTestUtil.getRaftServerImpl(s, groupId))
         : stream.flatMap(s -> 
RaftServerTestUtil.getRaftServerImpls(s).stream());
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8055e5d6/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java 
b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
index 78e3768..e4d9c8c 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
@@ -36,6 +36,7 @@ import 
org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.Preconditions;
 import org.apache.ratis.util.ProtoUtils;
+import org.apache.ratis.util.TimeDuration;
 import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -46,8 +47,11 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BooleanSupplier;
+import java.util.function.Consumer;
 import java.util.function.IntSupplier;
 import java.util.function.Predicate;
 
@@ -57,60 +61,54 @@ public interface RaftTestUtil {
 
   static RaftServerImpl waitForLeader(MiniRaftCluster cluster)
       throws InterruptedException {
-    return waitForLeader(cluster, false);
+    return waitForLeader(cluster, null);
   }
 
-  static RaftServerImpl waitForLeader(
-      MiniRaftCluster cluster, boolean tolerateMultipleLeaders) throws 
InterruptedException {
-    return waitForLeader(cluster, tolerateMultipleLeaders, null);
+  static RaftServerImpl waitForLeader(MiniRaftCluster cluster, RaftGroupId 
groupId)
+      throws InterruptedException {
+    return waitForLeader(cluster, groupId, true);
   }
 
   static RaftServerImpl waitForLeader(
-      MiniRaftCluster cluster, boolean tolerateMultipleLeaders, RaftGroupId 
groupId)
+      MiniRaftCluster cluster, RaftGroupId groupId, boolean expectLeader)
       throws InterruptedException {
-    final long sleepTime = (cluster.getMaxTimeout() * 3) >> 1;
-    LOG.info(cluster.printServers(groupId));
-    RaftServerImpl leader = null;
-    for(int i = 0; leader == null && i < 10; i++) {
-      Thread.sleep(sleepTime);
-      try {
-        leader = cluster.getLeader(groupId);
-      } catch(IllegalStateException e) {
-        if (!tolerateMultipleLeaders) {
-          throw e;
-        }
-      }
-    }
+    final String name = "waitForLeader-" + groupId + "-(expectLeader? " + 
expectLeader + ")";
+    final int numAttempts = expectLeader? 100: 10;
+    final TimeDuration sleepTime = cluster.getTimeoutMax().apply(d -> (d * 3) 
>> 1);
     LOG.info(cluster.printServers(groupId));
-    return leader;
-  }
 
-  static RaftServerImpl waitForLeader(
-      MiniRaftCluster cluster, final String leaderId) throws 
InterruptedException {
-    LOG.info(cluster.printServers());
-    for(int i = 0; !cluster.tryEnforceLeader(leaderId) && i < 10; i++) {
-      RaftServerImpl currLeader = cluster.getLeader();
-      LOG.info("try enforcing leader to " + leaderId + " but " +
-          (currLeader == null ? "no leader for this round" : "new leader is " 
+ currLeader.getId()));
-    }
-    LOG.info(cluster.printServers());
+    final AtomicReference<IllegalStateException> exception = new 
AtomicReference<>();
+    final Runnable handleNoLeaders = () -> {
+      throw cluster.newIllegalStateExceptionForNoLeaders(groupId);
+    };
+    final Consumer<List<RaftServerImpl>> handleMultipleLeaders = leaders -> {
+      final IllegalStateException ise = 
cluster.newIllegalStateExceptionForMultipleLeaders(groupId, leaders);
+      exception.set(ise);
+    };
 
-    final RaftServerImpl leader = cluster.getLeader();
-    Assert.assertEquals(leaderId, leader.getId().toString());
-    return leader;
-  }
+    final RaftServerImpl leader = JavaUtils.attempt(
+        () -> cluster.getLeader(groupId, handleNoLeaders, 
handleMultipleLeaders),
+        numAttempts, sleepTime, name, LOG);
 
-  static String waitAndKillLeader(MiniRaftCluster cluster,
-      boolean expectLeader) throws InterruptedException {
-    final RaftServerImpl leader = waitForLeader(cluster);
-    if (!expectLeader) {
-      Assert.assertNull(leader);
+    LOG.info(cluster.printServers(groupId));
+    if (expectLeader) {
+      return Optional.ofNullable(leader).orElseThrow(exception::get);
     } else {
-      Assert.assertNotNull(leader);
-      LOG.info("killing leader = " + leader);
-      cluster.killServer(leader.getId());
+      if (leader == null) {
+        return null;
+      } else {
+        throw new IllegalStateException("expectLeader = " + expectLeader + " 
but leader = " + leader);
+      }
     }
-    return leader != null ? leader.getId().toString() : null;
+  }
+
+  static String waitAndKillLeader(MiniRaftCluster cluster) throws 
InterruptedException {
+    final RaftServerImpl leader = waitForLeader(cluster);
+    Assert.assertNotNull(leader);
+
+    LOG.info("killing leader = " + leader);
+    cluster.killServer(leader.getId());
+    return leader.getId().toString();
   }
 
   static boolean logEntriesContains(RaftLog log, SimpleMessage... 
expectedMessages) {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8055e5d6/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java 
b/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java
index c962481..a77e6e4 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java
@@ -31,6 +31,8 @@ import org.apache.ratis.server.impl.RaftServerImpl;
 import org.apache.ratis.server.impl.RaftServerTestUtil;
 import org.apache.ratis.server.storage.RaftLog;
 import org.apache.ratis.server.storage.RaftLogIOException;
+import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.TimeDuration;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -38,7 +40,7 @@ import org.junit.Test;
 
 import java.io.IOException;
 import java.util.Arrays;
-import java.util.stream.LongStream;
+import java.util.concurrent.TimeUnit;
 
 import static java.util.Arrays.asList;
 
@@ -154,8 +156,11 @@ public abstract class RetryCacheTests extends BaseTest {
     // trigger setConfiguration
     cluster.setConfiguration(allPeers);
 
-    RaftTestUtil.waitForLeader(cluster);
-    final RaftPeerId newLeaderId = cluster.getLeader().getId();
+    final RaftPeerId newLeaderId = JavaUtils.attempt(() -> {
+      final RaftPeerId id = RaftTestUtil.waitForLeader(cluster).getId();
+      Assert.assertNotEquals(leaderId, id);
+      return id;
+    }, 10, TimeDuration.valueOf(100, TimeUnit.MILLISECONDS), "wait for a 
leader different than " + leaderId, LOG);
     Assert.assertNotEquals(leaderId, newLeaderId);
     // same clientId and callId in the request
     r = cluster.newRaftClientRequest(client.getId(), newLeaderId,

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8055e5d6/ratis-server/src/test/java/org/apache/ratis/server/ServerRestartTests.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/server/ServerRestartTests.java 
b/ratis-server/src/test/java/org/apache/ratis/server/ServerRestartTests.java
index 531d2e2..c4cfe48 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/ServerRestartTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/ServerRestartTests.java
@@ -42,6 +42,7 @@ import org.apache.ratis.util.LogUtils;
 import org.apache.ratis.util.Preconditions;
 import org.apache.ratis.util.SizeInBytes;
 import org.apache.ratis.util.StringUtils;
+import org.apache.ratis.util.TimeDuration;
 import org.junit.Assert;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -50,6 +51,7 @@ import java.io.File;
 import java.io.RandomAccessFile;
 import java.nio.file.Path;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
@@ -180,6 +182,10 @@ public abstract class ServerRestartTests<CLUSTER extends 
MiniRaftCluster>
   static void runTestRestartWithCorruptedLogHeader(MiniRaftCluster cluster, 
Logger LOG) throws Exception {
     cluster.start();
     RaftTestUtil.waitForLeader(cluster);
+    for(RaftServerImpl impl : cluster.iterateServerImpls()) {
+      JavaUtils.attempt(() -> getOpenLogFile(impl), 10, 
TimeDuration.valueOf(100, TimeUnit.MILLISECONDS),
+          impl.getId() + ": wait for log file creation", LOG);
+    }
 
     // shutdown all servers
     cluster.getServers().forEach(RaftServerProxy::close);

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8055e5d6/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java
 
b/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java
index 66e3c61..bc3b764 100644
--- 
a/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java
+++ 
b/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java
@@ -28,7 +28,6 @@ import org.apache.ratis.protocol.RaftGroup;
 import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
-import org.apache.ratis.server.storage.RaftStorageDirectory;
 import org.apache.ratis.util.CheckedBiConsumer;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.LogUtils;
@@ -89,7 +88,7 @@ public abstract class GroupManagementBaseTest extends 
BaseTest {
     for(RaftPeer p : newGroup.getPeers()) {
       client.groupAdd(newGroup, p.getId());
     }
-    Assert.assertNotNull(RaftTestUtil.waitForLeader(cluster, true));
+    Assert.assertNotNull(RaftTestUtil.waitForLeader(cluster));
     TimeUnit.SECONDS.sleep(1);
 
     // restart the servers with null group
@@ -99,7 +98,7 @@ public abstract class GroupManagementBaseTest extends 
BaseTest {
     }
 
     // the servers should retrieve the conf from the log.
-    Assert.assertNotNull(RaftTestUtil.waitForLeader(cluster, true));
+    Assert.assertNotNull(RaftTestUtil.waitForLeader(cluster));
 
     cluster.shutdown();
   }
@@ -173,7 +172,7 @@ public abstract class GroupManagementBaseTest extends 
BaseTest {
           client.groupAdd(groups[i], p.getId());
         }
       }
-      Assert.assertNotNull(RaftTestUtil.waitForLeader(cluster, true, gid));
+      Assert.assertNotNull(RaftTestUtil.waitForLeader(cluster, gid));
       checker.accept(cluster, groups[i]);
     }
     printThreadCount(type, "start groups");
@@ -220,7 +219,7 @@ public abstract class GroupManagementBaseTest extends 
BaseTest {
       client.setConfiguration(allPeers.toArray(RaftPeer.emptyArray()));
     }
 
-    Assert.assertNotNull(RaftTestUtil.waitForLeader(cluster, true));
+    Assert.assertNotNull(RaftTestUtil.waitForLeader(cluster));
     checker.accept(cluster, groups[chosen]);
     LOG.info("update groups: " + cluster.printServers());
     printThreadCount(type, "update groups");

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8055e5d6/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
 
b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
index 54fbf5f..a48edc4 100644
--- 
a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
+++ 
b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
@@ -30,12 +30,12 @@ import org.apache.ratis.util.LogUtils;
 import org.apache.ratis.util.TimeDuration;
 import org.junit.Assert;
 import org.junit.Test;
+import org.slf4j.Logger;
 
 import java.util.Iterator;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 
-import static org.apache.ratis.RaftTestUtil.waitAndKillLeader;
 import static org.apache.ratis.RaftTestUtil.waitForLeader;
 
 public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster>
@@ -51,10 +51,12 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster>
     LOG.info("Running testBasicLeaderElection");
     final MiniRaftCluster cluster = newCluster(5);
     cluster.start();
-    waitAndKillLeader(cluster, true);
-    waitAndKillLeader(cluster, true);
-    waitAndKillLeader(cluster, true);
-    waitAndKillLeader(cluster, false);
+    RaftTestUtil.waitAndKillLeader(cluster);
+    RaftTestUtil.waitAndKillLeader(cluster);
+    RaftTestUtil.waitAndKillLeader(cluster);
+    testFailureCase("waitForLeader after killed a majority of servers",
+        () -> RaftTestUtil.waitForLeader(cluster, null, false),
+        IllegalStateException.class);
     cluster.shutdown();
   }
 
@@ -76,15 +78,33 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster>
 
   @Test
   public void testEnforceLeader() throws Exception {
-    final int numServer = 3;
     LOG.info("Running testEnforceLeader");
-    final String leader = "s" + ThreadLocalRandom.current().nextInt(numServer);
-    LOG.info("enforce leader to " + leader);
-    final MiniRaftCluster cluster = newCluster(numServer);
-    cluster.start();
-    waitForLeader(cluster);
-    waitForLeader(cluster, leader);
-    cluster.shutdown();
+    final int numServer = 5;
+    try(final MiniRaftCluster cluster = newCluster(numServer)) {
+      cluster.start();
+
+      final RaftPeerId firstLeader = waitForLeader(cluster).getId();
+      LOG.info("firstLeader = {}", firstLeader);
+      final int first = MiniRaftCluster.getIdIndex(firstLeader.toString());
+
+      final int random = ThreadLocalRandom.current().nextInt(numServer - 1);
+      final String newLeader = "s" + (random < first? random: random + 1);
+      LOG.info("enforce leader to {}", newLeader);
+      enforceLeader(cluster, newLeader, LOG);
+    }
+  }
+
+  static void enforceLeader(MiniRaftCluster cluster, final String newLeader, Logger LOG) throws InterruptedException {
+    LOG.info(cluster.printServers());
+    for(int i = 0; !cluster.tryEnforceLeader(newLeader) && i < 10; i++) {
+      RaftServerImpl currLeader = cluster.getLeader();
+      LOG.info("try enforcing leader to " + newLeader + " but " +
+          (currLeader == null ? "no leader for round " + i : "new leader is " + currLeader.getId()));
+    }
+    LOG.info(cluster.printServers());
+
+    final RaftServerImpl leader = cluster.getLeader();
+    Assert.assertEquals(newLeader, leader.getId().toString());
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8055e5d6/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
index 246a9a2..7fde1c5 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
@@ -375,7 +375,7 @@ public abstract class RaftReconfigurationBaseTest extends BaseTest {
           committedIndex <= 1);
 
       LOG.info("kill the current leader");
-      final String oldLeaderId = RaftTestUtil.waitAndKillLeader(cluster, true);
+      final String oldLeaderId = RaftTestUtil.waitAndKillLeader(cluster);
       LOG.info("start new peers: {}", Arrays.asList(c1.newPeers));
       for (RaftPeer np : c1.newPeers) {
         cluster.restartServer(np.getId(), false);

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8055e5d6/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
index 827117e..ee9008a 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
@@ -22,10 +22,12 @@ import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.TimeDuration;
 import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.stream.Stream;
@@ -36,13 +38,13 @@ public class RaftServerTestUtil {
   public static void waitAndCheckNewConf(MiniRaftCluster cluster,
       RaftPeer[] peers, int numOfRemovedPeers, Collection<String> deadPeers)
       throws Exception {
-    final long sleepMs = cluster.getMaxTimeout() * (numOfRemovedPeers + 2);
+    final TimeDuration sleepTime = cluster.getTimeoutMax().apply(n -> n * (numOfRemovedPeers + 2));
     JavaUtils.attempt(() -> waitAndCheckNewConf(cluster, peers, deadPeers),
-        3, sleepMs, "waitAndCheckNewConf", LOG);
+        10, sleepTime, "waitAndCheckNewConf", LOG);
   }
   private static void waitAndCheckNewConf(MiniRaftCluster cluster,
       RaftPeer[] peers, Collection<String> deadPeers) {
-    LOG.info(cluster.printServers());
+    LOG.info("waitAndCheckNewConf: peers={}, deadPeers={}, {}", Arrays.asList(peers), deadPeers, cluster.printServers());
     Assert.assertNotNull(cluster.getLeader());
 
     int numIncluded = 0;
@@ -50,6 +52,7 @@ public class RaftServerTestUtil {
     final RaftConfiguration current = RaftConfiguration.newBuilder()
         .setConf(peers).setLogEntryIndex(0).build();
     for (RaftServerImpl server : cluster.iterateServerImpls()) {
+      LOG.info("checking {}", server);
       if (deadPeers != null && deadPeers.contains(server.getId().toString())) {
         if (current.containsInConf(server.getId())) {
           deadIncluded++;

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8055e5d6/ratis-server/src/test/java/org/apache/ratis/server/impl/ServerInformationBaseTest.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/ServerInformationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/ServerInformationBaseTest.java
index 30aae33..77a4209 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/ServerInformationBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/ServerInformationBaseTest.java
@@ -20,7 +20,6 @@ package org.apache.ratis.server.impl;
 import org.apache.log4j.Level;
 import org.apache.ratis.BaseTest;
 import org.apache.ratis.MiniRaftCluster;
-import org.apache.ratis.RaftTestUtil;
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.protocol.*;
 import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
@@ -141,9 +140,8 @@ public abstract class ServerInformationBaseTest<CLUSTER extends MiniRaftCluster>
 
   RaftClientReply sendMessages(int n, MiniRaftCluster cluster) throws Exception {
     LOG.info("sendMessages: " + n);
-    final RaftPeerId leader = RaftTestUtil.waitForLeader(cluster, true, cluster.getGroupId()).getId();
     RaftClientReply reply = null;
-    try(final RaftClient client = cluster.createClient(leader)) {
+    try(final RaftClient client = cluster.createClient()) {
       for(int i = 0; i < n; i++) {
         reply = client.send(Message.valueOf("m" + i));
       }


Reply via email to