This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new ea1341b05 RATIS-2012. Client should not retry after close. (#1025)
ea1341b05 is described below

commit ea1341b05dbaedb7b9775fa870522bf1de052a0e
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Tue Jan 23 10:01:01 2024 -0800

    RATIS-2012. Client should not retry after close. (#1025)
---
 .../org/apache/ratis/client/impl/BlockingImpl.java |  12 +-
 .../org/apache/ratis/client/impl/OrderedAsync.java | 101 ++++------
 .../apache/ratis/client/impl/RaftClientImpl.java   |  42 ++--
 .../apache/ratis/client/impl/UnorderedAsync.java   |  14 +-
 .../ratis/client/retry/ClientRetryEvent.java       |  24 +--
 .../java/org/apache/ratis/util/PeerProxyMap.java   |  17 +-
 .../test/java/org/apache/ratis/RaftAsyncTests.java |   2 +-
 .../test/java/org/apache/ratis/RaftBasicTests.java |  14 +-
 .../test/java/org/apache/ratis/RaftTestUtil.java   |  30 ++-
 .../org/apache/ratis/RaftLogTruncateTests.java     | 216 +++++++++++++++++++++
 .../ratis/grpc/TestRaftLogTruncateWithGrpc.java    |  24 +++
 .../org/apache/ratis/grpc/TestRaftWithGrpc.java    |   6 -
 .../ratis/retry/TestExceptionDependentRetry.java   |   2 +-
 .../org/apache/ratis/retry/TestRetryPolicy.java    |  20 +-
 .../apache/ratis/server/ServerRestartTests.java    |   2 +-
 15 files changed, 380 insertions(+), 146 deletions(-)

diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java
index 4be9fa327..76987801b 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java
@@ -119,16 +119,18 @@ class BlockingImpl implements BlockingApi {
         ioe = e;
       }
 
-      pending.incrementExceptionCount(ioe);
-      ClientRetryEvent event = new ClientRetryEvent(request, ioe, pending);
+      if (client.isClosed()) {
+        throw new AlreadyClosedException(this + " is closed.");
+      }
+
+      final ClientRetryEvent event = pending.newClientRetryEvent(request, ioe);
       final RetryPolicy retryPolicy = client.getRetryPolicy();
       final RetryPolicy.Action action = 
retryPolicy.handleAttemptFailure(event);
-      TimeDuration sleepTime = client.getEffectiveSleepTime(ioe, 
action.getSleepTime());
-
       if (!action.shouldRetry()) {
-        throw (IOException)client.noMoreRetries(event);
+        throw client.noMoreRetries(event);
       }
 
+      final TimeDuration sleepTime = client.getEffectiveSleepTime(ioe, 
action.getSleepTime());
       try {
         sleepTime.sleep();
       } catch (InterruptedException e) {
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
index a1aa58681..34dc3be11 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
@@ -149,10 +149,6 @@ public final class OrderedAsync {
     
getSlidingWindow(request).fail(request.getSlidingWindowEntry().getSeqNum(), t);
   }
 
-  private void handleAsyncRetryFailure(ClientRetryEvent event) {
-    failAllAsyncRequests(event.getRequest(), client.noMoreRetries(event));
-  }
-
   CompletableFuture<RaftClientReply> send(RaftClientRequest.Type type, Message 
message, RaftPeerId server) {
     if (!type.is(TypeCase.WATCH) && !type.is(TypeCase.MESSAGESTREAM)) {
       Objects.requireNonNull(message, "message == null");
@@ -187,85 +183,68 @@ public final class OrderedAsync {
     if (pending == null) {
       return;
     }
-
-    final CompletableFuture<RaftClientReply> f = pending.getReplyFuture();
-    if (f.isDone()) {
+    if (pending.getReplyFuture().isDone()) {
       return;
     }
 
-    final RaftClientRequest request = pending.newRequestImpl();
+    final RaftClientRequest request = pending.newRequest();
     if (request == null) { // already done
-      LOG.debug("{} newRequestImpl returns null", pending);
+      LOG.debug("{} newRequest returns null", pending);
       return;
     }
 
-    final RetryPolicy retryPolicy = client.getRetryPolicy();
-    sendRequest(pending).exceptionally(e -> {
-      if (e instanceof CompletionException) {
-        e = JavaUtils.unwrapCompletionException(e);
-        scheduleWithTimeout(pending, request, retryPolicy, e);
-        return null;
-      }
-      f.completeExceptionally(e);
-      return null;
-    });
-  }
-
-  private void scheduleWithTimeout(PendingOrderedRequest pending,
-      RaftClientRequest request, RetryPolicy retryPolicy, Throwable e) {
-    final int attempt = pending.getAttemptCount();
-    final ClientRetryEvent event = new ClientRetryEvent(request, e, pending);
-    final TimeDuration sleepTime = client.getEffectiveSleepTime(e,
-        retryPolicy.handleAttemptFailure(event).getSleepTime());
-    LOG.debug("schedule* attempt #{} with sleep {} and policy {} for {}", 
attempt, sleepTime, retryPolicy, request);
-    scheduleWithTimeout(pending, sleepTime, getSlidingWindow(request));
-  }
-
-  private void scheduleWithTimeout(PendingOrderedRequest pending, TimeDuration 
sleepTime,
-      SlidingWindow.Client<PendingOrderedRequest, RaftClientReply> 
slidingWindow) {
-    client.getScheduler().onTimeout(sleepTime,
-        () -> slidingWindow.retry(pending, this::sendRequestWithRetry),
-        LOG, () -> "Failed* to retry " + pending);
-  }
-
-  private CompletableFuture<RaftClientReply> sendRequest(PendingOrderedRequest 
pending) {
-    final RetryPolicy retryPolicy = client.getRetryPolicy();
-    final RaftClientRequest request;
     if (getSlidingWindow((RaftPeerId) null).isFirst(pending.getSeqNum())) {
       pending.setFirstRequest();
     }
-    request = pending.newRequest();
     LOG.debug("{}: send* {}", client.getId(), request);
-    return client.getClientRpc().sendRequestAsync(request).thenApply(reply -> {
+    client.getClientRpc().sendRequestAsync(request).thenAccept(reply -> {
       LOG.debug("{}: receive* {}", client.getId(), reply);
       Objects.requireNonNull(reply, "reply == null");
       client.handleReply(request, reply);
       getSlidingWindow(request).receiveReply(
           request.getSlidingWindowEntry().getSeqNum(), reply, 
this::sendRequestWithRetry);
-      return reply;
     }).exceptionally(e -> {
       LOG.error(client.getId() + ": Failed* " + request, e);
-      e = JavaUtils.unwrapCompletionException(e);
-      if (e instanceof IOException && !(e instanceof GroupMismatchException)) {
-        pending.incrementExceptionCount(e);
-        final ClientRetryEvent event = new ClientRetryEvent(request, e, 
pending);
-        if (!retryPolicy.handleAttemptFailure(event).shouldRetry()) {
-          handleAsyncRetryFailure(event);
-        } else {
-          if (e instanceof NotLeaderException) {
-            NotLeaderException nle = (NotLeaderException)e;
-            client.handleNotLeaderException(request, nle, 
this::resetSlidingWindow);
-          } else {
-            client.handleIOException(request, (IOException) e, null, 
this::resetSlidingWindow);
-          }
-        }
-        throw new CompletionException(e);
-      }
-      failAllAsyncRequests(request, e);
+      handleException(pending, request, e);
       return null;
     });
   }
 
+  private void handleException(PendingOrderedRequest pending, 
RaftClientRequest request, Throwable e) {
+    final RetryPolicy retryPolicy = client.getRetryPolicy();
+    if (client.isClosed()) {
+      failAllAsyncRequests(request, new AlreadyClosedException(client + " is 
closed."));
+      return;
+    }
+
+    e = JavaUtils.unwrapCompletionException(e);
+    if (!(e instanceof IOException) || e instanceof GroupMismatchException) {
+      // non-retryable exceptions
+      failAllAsyncRequests(request, e);
+      return;
+    }
+
+    final ClientRetryEvent event = pending.newClientRetryEvent(request, e);
+    final RetryPolicy.Action action = retryPolicy.handleAttemptFailure(event);
+    if (!action.shouldRetry()) {
+      failAllAsyncRequests(request, client.noMoreRetries(event));
+      return;
+    }
+
+    if (e instanceof NotLeaderException) {
+      client.handleNotLeaderException(request, (NotLeaderException) e, 
this::resetSlidingWindow);
+    } else {
+      client.handleIOException(request, (IOException) e, null, 
this::resetSlidingWindow);
+    }
+    final TimeDuration sleepTime = client.getEffectiveSleepTime(e, 
action.getSleepTime());
+    LOG.debug("schedule* retry with sleep {} for attempt #{} of {}, {}",
+        sleepTime, event.getAttemptCount(), request, retryPolicy);
+    final SlidingWindow.Client<PendingOrderedRequest, RaftClientReply> 
slidingWindow = getSlidingWindow(request);
+    client.getScheduler().onTimeout(sleepTime,
+        () -> slidingWindow.retry(pending, this::sendRequestWithRetry),
+        LOG, () -> "Failed* to retry " + pending);
+  }
+
   void assertRequestSemaphore(int expectedAvailablePermits, int 
expectedQueueLength) {
     Preconditions.assertSame(expectedAvailablePermits, 
requestSemaphore.availablePermits(), "availablePermits");
     Preconditions.assertSame(expectedQueueLength, 
requestSemaphore.getQueueLength(), "queueLength");
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index f42391947..1b82709da 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -44,11 +44,13 @@ import org.apache.ratis.retry.RetryPolicy;
 import org.apache.ratis.thirdparty.com.google.common.cache.Cache;
 import org.apache.ratis.thirdparty.com.google.common.cache.CacheBuilder;
 import org.apache.ratis.util.CollectionUtils;
+import org.apache.ratis.util.IOUtils;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.MemoizedSupplier;
 import org.apache.ratis.util.Preconditions;
 import org.apache.ratis.util.TimeDuration;
 import org.apache.ratis.util.TimeoutExecutor;
+import org.apache.ratis.util.Timestamp;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -65,6 +67,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
@@ -79,10 +82,10 @@ public final class RaftClientImpl implements RaftClient {
       .build();
 
   public abstract static class PendingClientRequest {
-    private final long creationTimeInMs = System.currentTimeMillis();
+    private final Timestamp creationTime = Timestamp.currentTime();
     private final CompletableFuture<RaftClientReply> replyFuture = new 
CompletableFuture<>();
     private final AtomicInteger attemptCount = new AtomicInteger();
-    private final Map<Class<?>, Integer> exceptionCount = new 
ConcurrentHashMap<>();
+    private final Map<Class<?>, Integer> exceptionCounts = new 
ConcurrentHashMap<>();
 
     public abstract RaftClientRequest newRequestImpl();
 
@@ -101,19 +104,10 @@ public final class RaftClientImpl implements RaftClient {
       return attemptCount.get();
     }
 
-    int incrementExceptionCount(Throwable t) {
-      return t != null ? exceptionCount.compute(t.getClass(), (k, v) -> v != 
null ? v + 1 : 1) : 0;
-    }
-
-    public int getExceptionCount(Throwable t) {
-      return t != null ? 
Optional.ofNullable(exceptionCount.get(t.getClass())).orElse(0) : 0;
-    }
-
-    public boolean isRequestTimeout(TimeDuration timeout) {
-      if (timeout == null) {
-        return false;
-      }
-      return System.currentTimeMillis() - creationTimeInMs > 
timeout.toLong(TimeUnit.MILLISECONDS);
+    public ClientRetryEvent newClientRetryEvent(RaftClientRequest request, 
Throwable throwable) {
+      final int exceptionCount = throwable == null? 0
+          : exceptionCounts.compute(throwable.getClass(), (k, v) -> v == null? 
1: v+1);
+      return new ClientRetryEvent(getAttemptCount(), request, exceptionCount, 
throwable, creationTime);
     }
   }
 
@@ -196,6 +190,8 @@ public final class RaftClientImpl implements RaftClient {
   private final ConcurrentMap<RaftPeerId, LeaderElectionManagementApi>
       leaderElectionManagement = new ConcurrentHashMap<>();
 
+  private final AtomicBoolean closed = new AtomicBoolean();
+
   @SuppressWarnings("checkstyle:ParameterNumber")
   RaftClientImpl(ClientId clientId, RaftGroup group, RaftPeerId leaderId, 
RaftPeer primaryDataStreamServer,
       RaftClientRpc clientRpc, RetryPolicy retryPolicy, RaftProperties 
properties, Parameters parameters) {
@@ -346,11 +342,11 @@ public final class RaftClientImpl implements RaftClient {
     return dataStreamApi.get();
   }
 
-  Throwable noMoreRetries(ClientRetryEvent event) {
+  IOException noMoreRetries(ClientRetryEvent event) {
     final int attemptCount = event.getAttemptCount();
     final Throwable throwable = event.getCause();
     if (attemptCount == 1 && throwable != null) {
-      return throwable;
+      return IOUtils.asIOException(throwable);
     }
     return new RaftRetryFailureException(event.getRequest(), attemptCount, 
retryPolicy, throwable);
   }
@@ -418,8 +414,7 @@ public final class RaftClientImpl implements RaftClient {
 
   void handleIOException(RaftClientRequest request, IOException ioe,
       RaftPeerId newLeader, Consumer<RaftClientRequest> handler) {
-    LOG.debug("{}: suggested new leader: {}. Failed {} with {}",
-        clientId, newLeader, request, ioe);
+    LOG.debug("{}: suggested new leader: {}. Failed {}", clientId, newLeader, 
request, ioe);
     if (LOG.isTraceEnabled()) {
       LOG.trace("Stack trace", new Throwable("TRACE"));
     }
@@ -456,8 +451,17 @@ public final class RaftClientImpl implements RaftClient {
     return clientRpc;
   }
 
+  boolean isClosed() {
+    return closed.get();
+  }
+
   @Override
   public void close() throws IOException {
+    if (!closed.compareAndSet(false, true)) {
+      return;
+    }
+
+    LOG.debug("close {}", getId());
     clientRpc.close();
     if (dataStreamApi.isInitialized()) {
       dataStreamApi.get().close();
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
index 84b817b58..eccda4dbd 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
@@ -22,6 +22,7 @@ import org.apache.ratis.client.impl.RaftClientImpl.PendingClientRequest;
 import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.protocol.Message;
 import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
 import org.apache.ratis.protocol.exceptions.GroupMismatchException;
 import org.apache.ratis.protocol.exceptions.NotLeaderException;
 import org.apache.ratis.protocol.RaftClientReply;
@@ -89,11 +90,14 @@ public interface UnorderedAsync {
         }
 
         final Throwable cause = replyException != null ? replyException : e;
-        pending.incrementExceptionCount(cause);
-        final ClientRetryEvent event = new ClientRetryEvent(request, cause, 
pending);
+        if (client.isClosed()) {
+          f.completeExceptionally(new AlreadyClosedException(client + " is 
closed"));
+          return;
+        }
+
+        final ClientRetryEvent event = pending.newClientRetryEvent(request, 
cause);
         RetryPolicy retryPolicy = client.getRetryPolicy();
         final RetryPolicy.Action action = 
retryPolicy.handleAttemptFailure(event);
-        TimeDuration sleepTime = client.getEffectiveSleepTime(cause, 
action.getSleepTime());
         if (!action.shouldRetry()) {
           f.completeExceptionally(client.noMoreRetries(event));
           return;
@@ -124,7 +128,9 @@ public interface UnorderedAsync {
           }
         }
 
-        LOG.debug("schedule retry for attempt #{}, policy={}, request={}", 
attemptCount, retryPolicy, request);
+        final TimeDuration sleepTime = client.getEffectiveSleepTime(cause, 
action.getSleepTime());
+        LOG.debug("schedule~ attempt #{} with sleep {} and policy {} for {}",
+            attemptCount, sleepTime, retryPolicy, request);
         client.getScheduler().onTimeout(sleepTime,
             () -> sendRequestWithRetry(pending, client), LOG, () -> clientId + 
": Failed~ to retry " + request);
       } catch (Exception ex) {
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/retry/ClientRetryEvent.java b/ratis-client/src/main/java/org/apache/ratis/client/retry/ClientRetryEvent.java
index f0c38efb9..c6a8beb06 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/retry/ClientRetryEvent.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/retry/ClientRetryEvent.java
@@ -17,12 +17,11 @@
  */
 package org.apache.ratis.client.retry;
 
-import org.apache.ratis.client.impl.RaftClientImpl.PendingClientRequest;
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.retry.RetryPolicy;
-import 
org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.TimeDuration;
+import org.apache.ratis.util.Timestamp;
 
 /** An {@link RetryPolicy.Event} specific to client request failure. */
 public class ClientRetryEvent implements RetryPolicy.Event {
@@ -30,23 +29,15 @@ public class ClientRetryEvent implements RetryPolicy.Event {
   private final int causeCount;
   private final RaftClientRequest request;
   private final Throwable cause;
-  private PendingClientRequest pending;
+  private final Timestamp pendingRequestCreationTime;
 
-  @VisibleForTesting
-  public ClientRetryEvent(int attemptCount, RaftClientRequest request, 
Throwable cause) {
-    this(attemptCount, request, attemptCount, cause);
-  }
-
-  public ClientRetryEvent(RaftClientRequest request, Throwable t, 
PendingClientRequest pending) {
-    this(pending.getAttemptCount(), request, pending.getExceptionCount(t), t);
-    this.pending = pending;
-  }
-
-  private ClientRetryEvent(int attemptCount, RaftClientRequest request, int 
causeCount, Throwable cause) {
+  public ClientRetryEvent(int attemptCount, RaftClientRequest request, int 
causeCount, Throwable cause,
+      Timestamp pendingRequestCreationTime) {
     this.attemptCount = attemptCount;
     this.causeCount = causeCount;
     this.request = request;
     this.cause = cause;
+    this.pendingRequestCreationTime = pendingRequestCreationTime;
   }
 
   @Override
@@ -69,7 +60,7 @@ public class ClientRetryEvent implements RetryPolicy.Event {
   }
 
   boolean isRequestTimeout(TimeDuration timeout) {
-    return pending != null && pending.isRequestTimeout(timeout);
+    return timeout != null && 
pendingRequestCreationTime.elapsedTime().compareTo(timeout) >= 0;
   }
 
   @Override
@@ -77,6 +68,7 @@ public class ClientRetryEvent implements RetryPolicy.Event {
     return JavaUtils.getClassSimpleName(getClass())
         + ":attempt=" + attemptCount
         + ",request=" + request
-        + ",cause=" + cause;
+        + ",cause=" + cause
+        + ",causeCount=" + causeCount;
   }
 }
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java b/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java
index 105ecbfb4..0ce0595fa 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java
@@ -36,6 +36,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /** A map from peer id to peer and its proxy. */
 public class PeerProxyMap<PROXY extends Closeable> implements RaftPeer.Add, 
Closeable {
@@ -65,7 +66,7 @@ public class PeerProxyMap<PROXY extends Closeable> implements RaftPeer.Add, Clos
               throw new AlreadyClosedException(name + " is already " + 
current);
             }
             lifeCycle.startAndTransition(
-                () -> proxy = createProxy.apply(peer), IOException.class);
+                () -> proxy = createProxyImpl(peer), IOException.class);
           }
         }
       }
@@ -92,6 +93,7 @@ public class PeerProxyMap<PROXY extends Closeable> implements RaftPeer.Add, Clos
   private final Object resetLock = new Object();
 
   private final CheckedFunction<RaftPeer, PROXY, IOException> createProxy;
+  private final AtomicBoolean closed = new AtomicBoolean();
 
   public PeerProxyMap(String name, CheckedFunction<RaftPeer, PROXY, 
IOException> createProxy) {
     this.name = name;
@@ -102,6 +104,13 @@ public class PeerProxyMap<PROXY extends Closeable> implements RaftPeer.Add, Clos
     return name;
   }
 
+  private PROXY createProxyImpl(RaftPeer peer) throws IOException {
+    if (closed.get()) {
+      throw new AlreadyClosedException(name + ": Failed to create proxy for " 
+ peer);
+    }
+    return createProxy.apply(peer);
+  }
+
   public PROXY getProxy(RaftPeerId id) throws IOException {
     Objects.requireNonNull(id, "id == null");
     PeerAndProxy p = peers.get(id);
@@ -161,6 +170,10 @@ public class PeerProxyMap<PROXY extends Closeable> implements RaftPeer.Add, Clos
 
   @Override
   public void close() {
+    if (!closed.compareAndSet(false, true)) {
+      return;
+    }
+
     final List<IOException> exceptions = Collections.synchronizedList(new 
ArrayList<>());
     ConcurrentUtils.parallelForEachAsync(peers.values(),
         pp -> pp.setNullProxyAndClose().map(proxy -> closeProxy(proxy, 
pp)).ifPresent(exceptions::add),
@@ -180,7 +193,7 @@ public class PeerProxyMap<PROXY extends Closeable> implements RaftPeer.Add, Clos
 
   private IOException closeProxy(PROXY proxy, PeerAndProxy pp) {
     try {
-      LOG.debug("{}: Closing proxy for peer {}", name, pp);
+      LOG.debug("{}: Closing proxy {} {} for peer {}", name, 
proxy.getClass().getSimpleName(), proxy, pp);
       proxy.close();
       return null;
     } catch (IOException e) {
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
index 260f6013e..71c5c5ef0 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
@@ -372,7 +372,7 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
   @Test
   public void testStateMachineMetrics() throws Exception {
     runWithNewCluster(NUM_SERVERS, cluster ->
-        RaftBasicTests.testStateMachineMetrics(true, cluster, LOG));
+        RaftBasicTests.runTestStateMachineMetrics(true, cluster));
   }
 
   @Test
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
index 47c9b0e08..4ff9681f0 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
@@ -168,12 +168,10 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster>
 
     final List<RaftServer.Division> divisions = 
cluster.getServerAliveStream().collect(Collectors.toList());
     for(RaftServer.Division impl: divisions) {
-        JavaUtils.attempt(() -> RaftTestUtil.assertLogEntries(impl, term, 
messages),
-            50, TimeDuration.valueOf(1, TimeUnit.SECONDS), impl.getId() + " 
assertLogEntries", LOG);
+      RaftTestUtil.assertLogEntries(impl, term, messages, 50, LOG);
     }
   }
 
-
   @Test
   public void testOldLeaderCommit() throws Exception {
     runWithNewCluster(NUM_SERVERS, this::runTestOldLeaderCommit);
@@ -218,7 +216,7 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster>
 
     cluster.getServerAliveStream()
         .map(RaftServer.Division::getRaftLog)
-        .forEach(log -> RaftTestUtil.assertLogEntries(log, term, messages));
+        .forEach(log -> RaftTestUtil.assertLogEntries(log, term, messages, 
System.out::println));
   }
 
   @Test
@@ -453,8 +451,12 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster>
     }
   }
 
-  public static void testStateMachineMetrics(boolean async,
-      MiniRaftCluster cluster, Logger LOG) throws Exception {
+  @Test
+  public void testStateMachineMetrics() throws Exception {
+    runWithNewCluster(NUM_SERVERS, cluster -> 
runTestStateMachineMetrics(false, cluster));
+  }
+
+  static void runTestStateMachineMetrics(boolean async, MiniRaftCluster 
cluster) throws Exception {
     RaftServer.Division leader = waitForLeader(cluster);
     try (final RaftClient client = cluster.createClient()) {
       Gauge appliedIndexGauge = getStatemachineGaugeWithName(leader,
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
index fa4188716..41a431149 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
@@ -249,19 +249,16 @@ public interface RaftTestUtil {
     }
   }
 
-  static void assertLogEntries(RaftServer.Division server, long expectedTerm, 
SimpleMessage... expectedMessages) {
-    LOG.info("checking raft log for {}", server.getMemberId());
-    final RaftLog log = server.getRaftLog();
-    try {
-      RaftTestUtil.assertLogEntries(log, expectedTerm, expectedMessages);
-    } catch (AssertionError e) {
-      LOG.error("Unexpected raft log in {}", server.getMemberId(), e);
-      throw e;
-    }
+  static void assertLogEntries(RaftServer.Division server, long expectedTerm, 
SimpleMessage[] expectedMessages,
+      int numAttempts, Logger log) throws Exception {
+    final String name = server.getId() + " assertLogEntries";
+    final Function<Integer, Consumer<String>> print = i -> i < numAttempts? s 
-> {}: System.out::println;
+    JavaUtils.attempt(i -> assertLogEntries(server.getRaftLog(), expectedTerm, 
expectedMessages, print.apply(i)),
+        numAttempts, TimeDuration.ONE_SECOND, () -> name, log);
   }
 
   static Iterable<LogEntryProto> getLogEntryProtos(RaftLog log) {
-    return CollectionUtils.as(log.getEntries(0, Long.MAX_VALUE), ti -> {
+    return CollectionUtils.as(log.getEntries(0, 
log.getLastEntryTermIndex().getIndex() + 1), ti -> {
       try {
         return log.get(ti.getIndex());
       } catch (IOException exception) {
@@ -270,17 +267,17 @@ public interface RaftTestUtil {
     });
   }
 
-  static List<LogEntryProto> getStateMachineLogEntries(RaftLog log) {
+  static List<LogEntryProto> getStateMachineLogEntries(RaftLog log, 
Consumer<String> print) {
     final List<LogEntryProto> entries = new ArrayList<>();
     for (LogEntryProto e : getLogEntryProtos(log)) {
       final String s = LogProtoUtils.toLogEntryString(e);
       if (e.hasStateMachineLogEntry()) {
-        LOG.info(s + ", " + 
e.getStateMachineLogEntry().toString().trim().replace("\n", ", "));
+        print.accept(entries.size() + ") " + s);
         entries.add(e);
       } else if (e.hasConfigurationEntry()) {
-        LOG.info("Found {}, ignoring it.", s);
+        print.accept("Ignoring " + s);
       } else if (e.hasMetadataEntry()) {
-        LOG.info("Found {}, ignoring it.", s);
+        print.accept("Ignoring " + s);
       } else {
         throw new AssertionError("Unexpected LogEntryBodyCase " + 
e.getLogEntryBodyCase() + " at " + s);
       }
@@ -288,13 +285,14 @@ public interface RaftTestUtil {
     return entries;
   }
 
-  static void assertLogEntries(RaftLog log, long expectedTerm, 
SimpleMessage... expectedMessages) {
-    final List<LogEntryProto> entries = getStateMachineLogEntries(log);
+  static Void assertLogEntries(RaftLog log, long expectedTerm, SimpleMessage[] 
expectedMessages, Consumer<String> print) {
+    final List<LogEntryProto> entries = getStateMachineLogEntries(log, print);
     try {
       assertLogEntries(entries, expectedTerm, expectedMessages);
     } catch(Exception t) {
       throw new AssertionError("entries: " + entries, t);
     }
+    return null;
   }
 
   static void assertLogEntries(List<LogEntryProto> entries, long expectedTerm, 
SimpleMessage... expectedMessages) {
diff --git a/ratis-test/src/test/java/org/apache/ratis/RaftLogTruncateTests.java b/ratis-test/src/test/java/org/apache/ratis/RaftLogTruncateTests.java
new file mode 100644
index 000000000..80c57741c
--- /dev/null
+++ b/ratis-test/src/test/java/org/apache/ratis/RaftLogTruncateTests.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis;
+
+import org.apache.ratis.RaftTestUtil.SimpleMessage;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.client.RaftClientConfigKeys;
+import org.apache.ratis.client.impl.OrderedAsync;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.impl.MiniRaftCluster;
+import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLog;
+import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.statemachine.impl.SimpleStateMachine4Testing;
+import org.apache.ratis.util.Slf4jUtils;
+import org.apache.ratis.util.TimeDuration;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.event.Level;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.ratis.RaftTestUtil.waitForLeader;
+
+public abstract class RaftLogTruncateTests<CLUSTER extends MiniRaftCluster> 
extends BaseTest
+    implements MiniRaftCluster.Factory.Get<CLUSTER> {
+  public static final int NUM_SERVERS = 5;
+  final TimeDuration MIN_TIMEOUT = TimeDuration.valueOf(3, TimeUnit.SECONDS);
+
+  static SimpleMessage[] arraycopy(SimpleMessage[] src1, SimpleMessage[] src2) 
{
+    final SimpleMessage[] dst = new SimpleMessage[src1.length + src2.length];
+    System.arraycopy(src1, 0, dst, 0, src1.length);
+    System.arraycopy(src2, 0, dst, src1.length, src2.length);
+    return dst;
+  }
+
+  {
+    Slf4jUtils.setLogLevel(OrderedAsync.LOG, Level.ERROR);
+    Slf4jUtils.setLogLevel(RaftServerConfigKeys.LOG, Level.ERROR);
+    Slf4jUtils.setLogLevel(RaftClientConfigKeys.LOG, Level.ERROR);
+
+    final RaftProperties p = getProperties();
+    p.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY, 
SimpleStateMachine4Testing.class, StateMachine.class);
+
+    // set a long rpc timeout so, when the leader does not have the majority, 
it won't step down fast.
+    RaftServerConfigKeys.Rpc.setTimeoutMin(p, MIN_TIMEOUT);
+    RaftServerConfigKeys.Rpc.setTimeoutMax(p, MIN_TIMEOUT.multiply(2));
+    RaftServerConfigKeys.Rpc.setFirstElectionTimeoutMin(p, 
TimeDuration.ONE_SECOND);
+    RaftServerConfigKeys.Rpc.setFirstElectionTimeoutMax(p, 
TimeDuration.ONE_SECOND.multiply(2));
+  }
+
+  @Override
+  public int getGlobalTimeoutSeconds() {
+    return 200;
+  }
+
+  @Test
+  public void testLogTruncate() throws Exception {
+    runWithNewCluster(NUM_SERVERS, this::runTestLogTruncate);
+  }
+
+  void runTestLogTruncate(MiniRaftCluster cluster) throws Exception {
+    final RaftServer.Division oldLeader = waitForLeader(cluster);
+    final List<RaftServer.Division> oldFollowers = cluster.getFollowers();
+    final List<RaftPeerId> killedPeers = new ArrayList<>();
+    final List<RaftPeerId> remainingPeers = new ArrayList<>();
+
+    final int majorityIndex = NUM_SERVERS / 2 + 1;
+    Assert.assertEquals(NUM_SERVERS - 1, oldFollowers.size());
+    Assert.assertTrue(majorityIndex < oldFollowers.size());
+
+    for (int i = 0; i < majorityIndex; i++) {
+      killedPeers.add(oldFollowers.get(i).getId());
+    }
+    remainingPeers.add(oldLeader.getId());
+    for (int i = majorityIndex; i < oldFollowers.size(); i++) {
+      remainingPeers.add(oldFollowers.get(i).getId());
+    }
+
+    try {
+      runTestLogTruncate(cluster, oldLeader, killedPeers, remainingPeers);
+    } catch (Throwable e) {
+      LOG.info("killedPeers   : {}", killedPeers);
+      LOG.info("remainingPeers: {}", remainingPeers);
+      throw e;
+    }
+  }
+
+  void runTestLogTruncate(MiniRaftCluster cluster, RaftServer.Division 
oldLeader,
+      List<RaftPeerId> killedPeers, List<RaftPeerId> remainingPeers) throws 
Exception {
+    final List<Throwable> exceptions = Collections.synchronizedList(new 
ArrayList<>());
+    final long oldLeaderTerm = oldLeader.getInfo().getCurrentTerm();
+    LOG.info("oldLeader: {}, term={}", oldLeader.getId(), oldLeaderTerm);
+
+    final SimpleMessage[] firstBatch = SimpleMessage.create(5, "first");
+    final SimpleMessage[] secondBatch = SimpleMessage.create(4, "second");
+
+    try (final RaftClient client = cluster.createClient(oldLeader.getId())) {
+      // send some messages
+      for (SimpleMessage batch : firstBatch) {
+        final RaftClientReply reply = client.io().send(batch);
+        Assert.assertTrue(reply.isSuccess());
+      }
+      for (RaftServer.Division f : cluster.getFollowers()) {
+        assertLogEntries(f, oldLeaderTerm, firstBatch);
+      }
+
+      // kill a majority of followers
+      LOG.info("Before killServer {}: {}", killedPeers, 
cluster.printServers());
+      for (RaftPeerId f : killedPeers) {
+        cluster.killServer(f);
+      }
+      LOG.info("After killServer {}: {}", killedPeers, cluster.printServers());
+
+      // send more messages, but they won't be committed due to not enough 
followers
+      final SimpleMessage[] messagesToBeTruncated = SimpleMessage.create(3, 
"messagesToBeTruncated");
+      final AtomicBoolean done = new AtomicBoolean();
+      for (SimpleMessage message : messagesToBeTruncated) {
+        client.async().send(message).whenComplete((r, e) -> {
+          if (!done.get()) {
+            exceptions.add(new IllegalStateException(message + " is completed: 
reply=" + r, e));
+          }
+        });
+      }
+
+      // check log messages
+      final SimpleMessage[] expectedMessages = arraycopy(firstBatch, 
messagesToBeTruncated);
+      for (RaftPeerId f : remainingPeers) {
+        assertLogEntries(cluster.getDivision(f), oldLeaderTerm, 
expectedMessages);
+      }
+      done.set(true);
+      LOG.info("done");
+    }
+
+    // kill the remaining servers
+    LOG.info("Before killServer {}: {}", remainingPeers, 
cluster.printServers());
+    for (RaftPeerId f : remainingPeers) {
+      cluster.killServer(f);
+    }
+    LOG.info("After killServer {}: {}", remainingPeers, 
cluster.printServers());
+
+    // restart the earlier followers
+    for (RaftPeerId f : killedPeers) {
+      cluster.restartServer(f, false);
+    }
+
+    // The new leader should be one of the earlier followers
+    final RaftServer.Division newLeader = waitForLeader(cluster);
+    LOG.info("After restartServer {}: {}", killedPeers, 
cluster.printServers());
+    final long newLeaderTerm = newLeader.getInfo().getCurrentTerm();
+
+    final SegmentedRaftLog newLeaderLog = (SegmentedRaftLog) 
newLeader.getRaftLog();
+    LOG.info("newLeader: {}, term {}, last={}", newLeader.getId(), 
newLeaderTerm,
+        newLeaderLog.getLastEntryTermIndex());
+    Assert.assertTrue(killedPeers.contains(newLeader.getId()));
+
+    // restart the remaining servers
+    for (RaftPeerId f : remainingPeers) {
+      cluster.restartServer(f, false);
+    }
+
+    // check RaftLog truncate
+    for (RaftPeerId f : remainingPeers) {
+      assertLogEntries(cluster.getDivision(f), oldLeaderTerm, firstBatch);
+    }
+
+    try (final RaftClient client = cluster.createClient(newLeader.getId())) {
+      // send more messages
+      for (SimpleMessage batch : secondBatch) {
+        final RaftClientReply reply = client.io().send(batch);
+        Assert.assertTrue(reply.isSuccess());
+      }
+    }
+
+    // check log messages -- it should be truncated and then append the new 
messages
+    final SimpleMessage[] expectedMessages = arraycopy(firstBatch, 
secondBatch);
+    for (RaftPeerId f : killedPeers) {
+      assertLogEntries(cluster.getDivision(f), oldLeaderTerm, 
expectedMessages);
+    }
+
+    if (!exceptions.isEmpty()) {
+      LOG.info("{} exceptions", exceptions.size());
+      for(int i = 0 ; i < exceptions.size(); i++) {
+        LOG.info("exception {})", i, exceptions.get(i));
+      }
+      Assert.fail();
+    }
+  }
+
+  private void assertLogEntries(RaftServer.Division server, long term, 
SimpleMessage[] expectedMessages)
+      throws Exception {
+    RaftTestUtil.assertLogEntries(server, term, expectedMessages, 30, LOG);
+  }
+}
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftLogTruncateWithGrpc.java
 
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftLogTruncateWithGrpc.java
new file mode 100644
index 000000000..dc2846374
--- /dev/null
+++ 
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftLogTruncateWithGrpc.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc;
+
+import org.apache.ratis.RaftLogTruncateTests;
+
+public class TestRaftLogTruncateWithGrpc extends 
RaftLogTruncateTests<MiniRaftClusterWithGrpc>
+    implements MiniRaftClusterWithGrpc.FactoryGet {
+}
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java 
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
index bc0061f5f..046453d58 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
@@ -73,12 +73,6 @@ public class TestRaftWithGrpc
     runWithNewCluster(NUM_SERVERS, cluster -> testRequestTimeout(false, 
cluster, LOG));
   }
 
-  @Test
-  public void testStateMachineMetrics() throws Exception {
-    runWithNewCluster(NUM_SERVERS, cluster ->
-        testStateMachineMetrics(false, cluster, LOG));
-  }
-
   @Test
   public void testUpdateViaHeartbeat() throws Exception {
     runWithNewCluster(NUM_SERVERS, this::runTestUpdateViaHeartbeat);
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/retry/TestExceptionDependentRetry.java
 
b/ratis-test/src/test/java/org/apache/ratis/retry/TestExceptionDependentRetry.java
index 264db8946..36e6dfbcc 100644
--- 
a/ratis-test/src/test/java/org/apache/ratis/retry/TestExceptionDependentRetry.java
+++ 
b/ratis-test/src/test/java/org/apache/ratis/retry/TestExceptionDependentRetry.java
@@ -154,7 +154,7 @@ public class TestExceptionDependentRetry extends BaseTest 
implements MiniRaftClu
       long sleepTime) {
     for (int i = 0; i < retries + 1; i++) {
       RetryPolicy.Action action = exceptionDependentRetry
-          .handleAttemptFailure(new ClientRetryEvent(i, null, exception));
+          .handleAttemptFailure(TestRetryPolicy.newClientRetryEvent(i, null, 
exception));
 
       final boolean expected = i < retries && i < maxAttempts;
       Assert.assertEquals(expected, action.shouldRetry());
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/retry/TestRetryPolicy.java 
b/ratis-test/src/test/java/org/apache/ratis/retry/TestRetryPolicy.java
index d69cd1a2e..1b9536b4b 100644
--- a/ratis-test/src/test/java/org/apache/ratis/retry/TestRetryPolicy.java
+++ b/ratis-test/src/test/java/org/apache/ratis/retry/TestRetryPolicy.java
@@ -33,6 +33,7 @@ import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.protocol.exceptions.TimeoutIOException;
 import org.apache.ratis.protocol.exceptions.ResourceUnavailableException;
 import org.apache.ratis.util.TimeDuration;
+import org.apache.ratis.util.Timestamp;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -70,6 +71,10 @@ public class TestRetryPolicy extends BaseTest {
     }
   }
 
+  static ClientRetryEvent newClientRetryEvent(int attemptCount, 
RaftClientRequest request, Throwable cause) {
+    return new ClientRetryEvent(attemptCount, request, attemptCount, cause, 
Timestamp.currentTime());
+  }
+
   @Test
   public void testRequestTypeDependentRetry() {
     final RequestTypeDependentRetryPolicy.Builder b = 
RequestTypeDependentRetryPolicy.newBuilder();
@@ -88,7 +93,7 @@ public class TestRetryPolicy extends BaseTest {
         RaftClientRequest.watchRequestType(1, ReplicationLevel.MAJORITY));
     for(int i = 1; i < 2*n; i++) {
       { //write
-        final ClientRetryEvent event = new ClientRetryEvent(i, writeRequest, 
null);
+        final ClientRetryEvent event = newClientRetryEvent(i, writeRequest, 
null);
         final RetryPolicy.Action action = policy.handleAttemptFailure(event);
 
         final boolean expected = i < n;
@@ -101,21 +106,21 @@ public class TestRetryPolicy extends BaseTest {
       }
 
       { //read and stale read are using default
-        final ClientRetryEvent event = new ClientRetryEvent(i, readRequest, 
null);
+        final ClientRetryEvent event = newClientRetryEvent(i, readRequest, 
null);
         final RetryPolicy.Action action = policy.handleAttemptFailure(event);
         Assert.assertTrue(action.shouldRetry());
         Assert.assertEquals(0L, action.getSleepTime().getDuration());
       }
 
       {
-        final ClientRetryEvent event = new ClientRetryEvent(i, 
staleReadRequest, null);
+        final ClientRetryEvent event = newClientRetryEvent(i, 
staleReadRequest, null);
         final RetryPolicy.Action action = policy.handleAttemptFailure(event);
         Assert.assertTrue(action.shouldRetry());
         Assert.assertEquals(0L, action.getSleepTime().getDuration());
       }
 
       { //watch has no retry
-        final ClientRetryEvent event = new ClientRetryEvent(i, watchRequest, 
null);
+        final ClientRetryEvent event = newClientRetryEvent(i, watchRequest, 
null);
         final RetryPolicy.Action action = policy.handleAttemptFailure(event);
         Assert.assertFalse(action.shouldRetry());
         Assert.assertEquals(0L, action.getSleepTime().getDuration());
@@ -148,7 +153,7 @@ public class TestRetryPolicy extends BaseTest {
     };
 
     for (RaftClientRequest request : requests) {
-      final ClientRetryEvent event = new ClientRetryEvent(request, new 
Exception(), pending);
+      final ClientRetryEvent event = pending.newClientRetryEvent(request, new 
Exception());
       final RetryPolicy.Action action = policy.handleAttemptFailure(event);
       Assert.assertTrue(action.shouldRetry());
       Assert.assertEquals(0L, action.getSleepTime().getDuration());
@@ -156,7 +161,7 @@ public class TestRetryPolicy extends BaseTest {
 
     timeout.sleep();
     for (RaftClientRequest request : requests) {
-      final ClientRetryEvent event = new ClientRetryEvent(request, new 
Exception(), pending);
+      final ClientRetryEvent event = pending.newClientRetryEvent(request, new 
Exception());
       final RetryPolicy.Action action = policy.handleAttemptFailure(event);
       Assert.assertFalse(action.shouldRetry());
     }
@@ -218,8 +223,7 @@ public class TestRetryPolicy extends BaseTest {
    */
   private void checkEvent(int exceptionAttemptCount, RetryPolicy retryPolicy, 
RaftClientRequest raftClientRequest,
       Throwable exception, Pair exceptionPolicyPair) {
-    final ClientRetryEvent event =
-        new ClientRetryEvent(exceptionAttemptCount, raftClientRequest, 
exception);
+    final ClientRetryEvent event = newClientRetryEvent(exceptionAttemptCount, 
raftClientRequest, exception);
     final RetryPolicy.Action action = retryPolicy.handleAttemptFailure(event);
 
     final boolean expected = exceptionAttemptCount < 
exceptionPolicyPair.retries;
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java 
b/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java
index 73ff1eb53..2f3edf781 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java
@@ -261,7 +261,7 @@ public abstract class ServerRestartTests<CLUSTER extends 
MiniRaftCluster>
     final RaftPeerId leaderId = leader.getId();
     ids.add(leaderId);
 
-    RaftTestUtil.getStateMachineLogEntries(leaderLog);
+    RaftTestUtil.getStateMachineLogEntries(leaderLog, LOG::info);
 
     // check that the last metadata entry is written to the log
     JavaUtils.attempt(() -> assertLastLogEntry(leader), 20, HUNDRED_MILLIS, 
"leader last metadata entry", LOG);


Reply via email to