This is an automated email from the ASF dual-hosted git repository.

runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new a8075e3  RATIS-1095. Move the related methods from RaftClientImpl to the corresponding impls. (#222)
a8075e3 is described below

commit a8075e3d6ae8610ce5e0c731cdca303f73d25839
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Tue Oct 13 18:56:08 2020 +0800

    RATIS-1095. Move the related methods from RaftClientImpl to the corresponding impls. (#222)
    
    * RATIS-1095. Move the related methods from RaftClientImpl to the corresponding impls.
    
    * Revise the javadoc in BlockingApi.
    
    * Revised the javadoc again.
    
    * Fixed checkstyle.
---
 .../java/org/apache/ratis/client/api/AsyncApi.java |  33 ++++++-
 .../org/apache/ratis/client/api/BlockingApi.java   |  33 ++++++-
 .../org/apache/ratis/client/impl/AsyncImpl.java    |  11 ++-
 .../org/apache/ratis/client/impl/BlockingImpl.java |  91 ++++++++++++++++-
 .../ratis/client/impl/GroupManagementImpl.java     |   8 +-
 .../ratis/client/impl/MessageStreamImpl.java       |  10 +-
 .../apache/ratis/client/impl/RaftClientImpl.java   | 109 ++++-----------------
 7 files changed, 181 insertions(+), 114 deletions(-)

diff --git a/ratis-client/src/main/java/org/apache/ratis/client/api/AsyncApi.java b/ratis-client/src/main/java/org/apache/ratis/client/api/AsyncApi.java
index de8320e..fd000ad 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/api/AsyncApi.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/api/AsyncApi.java
@@ -24,11 +24,14 @@ import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftPeerId;
 
 /**
- * APIs to support asynchronous operations such as send message, send 
(stale)read message and watch request.
+ * Asynchronous API to support operations
+ * such as sending message, read-message, stale-read-message and watch-request.
+ *
+ * Note that this API and {@link BlockingApi} support the same set of 
operations.
  */
 public interface AsyncApi {
   /**
-   * Async call to send the given message to the raft service.
+   * Send the given message asynchronously to the raft service.
    * The message may change the state of the service.
    * For readonly messages, use {@link #sendReadOnly(Message)} instead.
    *
@@ -37,12 +40,32 @@ public interface AsyncApi {
    */
   CompletableFuture<RaftClientReply> send(Message message);
 
-  /** Async call to send the given readonly message to the raft service. */
+  /**
+   * Send the given readonly message asynchronously to the raft service.
+   *
+   * @param message The request message.
+   * @return a future of the reply.
+   */
   CompletableFuture<RaftClientReply> sendReadOnly(Message message);
 
-  /** Async call to send the given stale-read message to the given server (not 
the raft service). */
+  /**
+   * Send the given stale-read message asynchronously to the given server (not 
the raft service).
+   * If the server commit index is larger than or equal to the given 
min-index, the request will be processed.
+   * Otherwise, the server returns a {@link 
org.apache.ratis.protocol.exceptions.StaleReadException}.
+   *
+   * @param message The request message.
+   * @param minIndex The minimum log index that the server log must have 
already committed.
+   * @param server The target server
+   * @return a future of the reply.
+   */
   CompletableFuture<RaftClientReply> sendStaleRead(Message message, long 
minIndex, RaftPeerId server);
 
-  /** Async call to watch the given index to satisfy the given replication 
level. */
+  /**
+   * Watch the given index asynchronously to satisfy the given replication 
level.
+   *
+   * @param index The log index to be watched.
+   * @param replication The replication level required.
+   * @return a future of the reply.
+   */
   CompletableFuture<RaftClientReply> watch(long index, ReplicationLevel 
replication);
 }
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/api/BlockingApi.java b/ratis-client/src/main/java/org/apache/ratis/client/api/BlockingApi.java
index dc14cd5..e1679b9 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/api/BlockingApi.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/api/BlockingApi.java
@@ -24,7 +24,10 @@ import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftPeerId;
 
 /**
- * APIs to support blocking operations such as send message, send (stale)read 
message and watch request.
+ * Blocking API to support operations
+ * such as sending message, read-message, stale-read-message and watch-request.
+ *
+ * Note that this API and {@link AsyncApi} support the same set of operations.
  */
 public interface BlockingApi {
   /**
@@ -37,12 +40,32 @@ public interface BlockingApi {
    */
   RaftClientReply send(Message message) throws IOException;
 
-  /** Send the given readonly message to the raft service. */
+  /**
+   * Send the given readonly message to the raft service.
+   *
+   * @param message The request message.
+   * @return the reply.
+   */
   RaftClientReply sendReadOnly(Message message) throws IOException;
 
-  /** Send the given stale-read message to the given server (not the raft 
service). */
+  /**
+   * Send the given stale-read message to the given server (not the raft 
service).
+   * If the server commit index is larger than or equal to the given 
min-index, the request will be processed.
+   * Otherwise, the server throws a {@link 
org.apache.ratis.protocol.exceptions.StaleReadException}.
+   *
+   * @param message The request message.
+   * @param minIndex The minimum log index that the server log must have 
already committed.
+   * @param server The target server
+   * @return the reply.
+   */
   RaftClientReply sendStaleRead(Message message, long minIndex, RaftPeerId 
server) throws IOException;
 
-  /** Watch the given index to satisfy the given replication level. */
+  /**
+   * Watch the given index to satisfy the given replication level.
+   *
+   * @param index The log index to be watched.
+   * @param replication The replication level required.
+   * @return the reply.
+   */
   RaftClientReply watch(long index, ReplicationLevel replication) throws 
IOException;
-}
+}
\ No newline at end of file
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/AsyncImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/AsyncImpl.java
index 09c3ae5..8537d35 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/AsyncImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/AsyncImpl.java
@@ -34,19 +34,24 @@ class AsyncImpl implements AsyncApi {
     this.client = Objects.requireNonNull(client, "client == null");
   }
 
+  CompletableFuture<RaftClientReply> send(
+      RaftClientRequest.Type type, Message message, RaftPeerId server) {
+    return client.getOrderedAsync().send(type, message, server);
+  }
+
   @Override
   public CompletableFuture<RaftClientReply> send(Message message) {
-    return client.sendAsync(RaftClientRequest.writeRequestType(), message, 
null);
+    return send(RaftClientRequest.writeRequestType(), message, null);
   }
 
   @Override
   public CompletableFuture<RaftClientReply> sendReadOnly(Message message) {
-    return client.sendAsync(RaftClientRequest.readRequestType(), message, 
null);
+    return send(RaftClientRequest.readRequestType(), message, null);
   }
 
   @Override
   public CompletableFuture<RaftClientReply> sendStaleRead(Message message, 
long minIndex, RaftPeerId server) {
-    return client.sendAsync(RaftClientRequest.staleReadRequestType(minIndex), 
message, server);
+    return send(RaftClientRequest.staleReadRequestType(minIndex), message, 
server);
   }
 
   @Override
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java
index d73c227..4b85b11 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -18,16 +18,30 @@
 package org.apache.ratis.client.impl;
 
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.util.Objects;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
 import org.apache.ratis.client.api.BlockingApi;
+import org.apache.ratis.client.retry.ClientRetryEvent;
+import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto.TypeCase;
 import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
 import org.apache.ratis.protocol.Message;
 import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.exceptions.GroupMismatchException;
+import org.apache.ratis.protocol.exceptions.StateMachineException;
+import org.apache.ratis.retry.RetryPolicy;
+import org.apache.ratis.util.TimeDuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** Blocking api implementations. */
 class BlockingImpl implements BlockingApi {
+  static final Logger LOG = LoggerFactory.getLogger(BlockingImpl.class);
+
   private final RaftClientImpl client;
 
   BlockingImpl(RaftClientImpl client) {
@@ -36,22 +50,89 @@ class BlockingImpl implements BlockingApi {
 
   @Override
   public RaftClientReply send(Message message) throws IOException {
-    return client.send(RaftClientRequest.writeRequestType(), message, null);
+    return send(RaftClientRequest.writeRequestType(), message, null);
   }
 
   @Override
   public RaftClientReply sendReadOnly(Message message) throws IOException {
-    return client.send(RaftClientRequest.readRequestType(), message, null);
+    return send(RaftClientRequest.readRequestType(), message, null);
   }
 
   @Override
   public RaftClientReply sendStaleRead(Message message, long minIndex, 
RaftPeerId server)
       throws IOException {
-    return client.send(RaftClientRequest.staleReadRequestType(minIndex), 
message, server);
+    return send(RaftClientRequest.staleReadRequestType(minIndex), message, 
server);
   }
 
   @Override
   public RaftClientReply watch(long index, ReplicationLevel replication) 
throws IOException {
-    return client.send(RaftClientRequest.watchRequestType(index, replication), 
null, null);
+    return send(RaftClientRequest.watchRequestType(index, replication), null, 
null);
+  }
+
+  private RaftClientReply send(RaftClientRequest.Type type, Message message, 
RaftPeerId server)
+      throws IOException {
+    if (!type.is(TypeCase.WATCH)) {
+      Objects.requireNonNull(message, "message == null");
+    }
+
+    final long callId = RaftClientImpl.nextCallId();
+    return sendRequestWithRetry(() -> client.newRaftClientRequest(server, 
callId, message, type, null));
+  }
+
+  RaftClientReply sendRequestWithRetry(Supplier<RaftClientRequest> supplier) 
throws IOException {
+    RaftClientImpl.PendingClientRequest pending = new 
RaftClientImpl.PendingClientRequest() {
+      @Override
+      public RaftClientRequest newRequestImpl() {
+        return supplier.get();
+      }
+    };
+    while (true) {
+      final RaftClientRequest request = pending.newRequest();
+      IOException ioe = null;
+      try {
+        final RaftClientReply reply = sendRequest(request);
+
+        if (reply != null) {
+          return reply;
+        }
+      } catch (GroupMismatchException | StateMachineException e) {
+        throw e;
+      } catch (IOException e) {
+        ioe = e;
+      }
+
+      pending.incrementExceptionCount(ioe);
+      ClientRetryEvent event = new ClientRetryEvent(request, ioe, pending);
+      final RetryPolicy retryPolicy = client.getRetryPolicy();
+      final RetryPolicy.Action action = 
retryPolicy.handleAttemptFailure(event);
+      TimeDuration sleepTime = client.getEffectiveSleepTime(ioe, 
action.getSleepTime());
+
+      if (!action.shouldRetry()) {
+        throw (IOException)client.noMoreRetries(event);
+      }
+
+      try {
+        sleepTime.sleep();
+      } catch (InterruptedException e) {
+        throw new InterruptedIOException("retry policy=" + retryPolicy);
+      }
+    }
+  }
+
+  RaftClientReply sendRequest(RaftClientRequest request) throws IOException {
+    LOG.debug("{}: send {}", client.getId(), request);
+    RaftClientReply reply;
+    try {
+      reply = client.getClientRpc().sendRequest(request);
+    } catch (GroupMismatchException gme) {
+      throw gme;
+    } catch (IOException ioe) {
+      client.handleIOException(request, ioe);
+      throw ioe;
+    }
+    LOG.debug("{}: receive {}", client.getId(), reply);
+    reply = client.handleLeaderException(request, reply);
+    reply = RaftClientImpl.handleRaftException(reply, Function.identity());
+    return reply;
   }
 }
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/GroupManagementImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/GroupManagementImpl.java
index 7ed407e..fab45b3 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/GroupManagementImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/GroupManagementImpl.java
@@ -47,7 +47,7 @@ class GroupManagementImpl implements GroupManagementApi {
 
     final long callId = RaftClientImpl.nextCallId();
     client.addServers(newGroup.getPeers().stream());
-    return client.sendRequest(GroupManagementRequest.newAdd(client.getId(), 
server, callId, newGroup));
+    return 
client.io().sendRequest(GroupManagementRequest.newAdd(client.getId(), server, 
callId, newGroup));
   }
 
   @Override
@@ -56,14 +56,14 @@ class GroupManagementImpl implements GroupManagementApi {
     Objects.requireNonNull(groupId, "groupId == null");
 
     final long callId = RaftClientImpl.nextCallId();
-    return client.sendRequest(GroupManagementRequest.newRemove(client.getId(), 
server,
+    return 
client.io().sendRequest(GroupManagementRequest.newRemove(client.getId(), server,
         callId, groupId, deleteDirectory, renameDirectory));
   }
 
   @Override
   public GroupListReply list() throws IOException {
     final long callId = RaftClientImpl.nextCallId();
-    final RaftClientReply reply = client.sendRequest(
+    final RaftClientReply reply = client.io().sendRequest(
         new GroupListRequest(client.getId(), server, client.getGroupId(), 
callId));
     Preconditions.assertTrue(reply instanceof GroupListReply, () -> 
"Unexpected reply: " + reply);
     return (GroupListReply)reply;
@@ -75,7 +75,7 @@ class GroupManagementImpl implements GroupManagementApi {
       groupId = client.getGroupId();
     }
     final long callId = RaftClientImpl.nextCallId();
-    final RaftClientReply reply = client.sendRequest(
+    final RaftClientReply reply = client.io().sendRequest(
         new GroupInfoRequest(client.getId(), server, groupId, callId));
     Preconditions.assertTrue(reply instanceof GroupInfoReply, () -> 
"Unexpected reply: " + reply);
     return (GroupInfoReply)reply;
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/MessageStreamImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/MessageStreamImpl.java
index 77a8932..e2971d6 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/MessageStreamImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/MessageStreamImpl.java
@@ -23,6 +23,8 @@ import org.apache.ratis.client.api.MessageStreamApi;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.Message;
 import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftClientRequest.Type;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.ratis.util.SizeInBytes;
 import org.slf4j.Logger;
@@ -48,14 +50,18 @@ public final class MessageStreamImpl implements 
MessageStreamApi {
       this.id = id;
     }
 
+    private Type getStreamRequestType(boolean endOfRequest) {
+      return RaftClientRequest.streamRequestType(id, 
messageId.getAndIncrement(), endOfRequest);
+    }
+
     @Override
     public CompletableFuture<RaftClientReply> sendAsync(Message message, 
boolean endOfRequest) {
-      return client.streamAsync(id, messageId.getAndIncrement(), message, 
endOfRequest);
+      return client.async().send(getStreamRequestType(endOfRequest), message, 
null);
     }
 
     @Override
     public CompletableFuture<RaftClientReply> closeAsync() {
-      return client.streamCloseAsync(id, messageId.getAndIncrement());
+      return client.async().send(getStreamRequestType(true), null, null);
     }
   }
 
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index 9d096ce..7105657 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -17,23 +17,27 @@
  */
 package org.apache.ratis.client.impl;
 
-import org.apache.ratis.client.api.AsyncApi;
-import org.apache.ratis.client.api.GroupManagementApi;
-import org.apache.ratis.client.retry.ClientRetryEvent;
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.client.RaftClientRpc;
+import org.apache.ratis.client.api.GroupManagementApi;
 import org.apache.ratis.client.api.MessageStreamApi;
+import org.apache.ratis.client.retry.ClientRetryEvent;
 import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto.TypeCase;
 import org.apache.ratis.proto.RaftProtos.SlidingWindowEntry;
-import org.apache.ratis.protocol.*;
-import org.apache.ratis.protocol.exceptions.GroupMismatchException;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.SetConfigurationRequest;
 import org.apache.ratis.protocol.exceptions.LeaderNotReadyException;
 import org.apache.ratis.protocol.exceptions.NotLeaderException;
 import org.apache.ratis.protocol.exceptions.RaftException;
 import org.apache.ratis.protocol.exceptions.RaftRetryFailureException;
 import org.apache.ratis.protocol.exceptions.ResourceUnavailableException;
-import org.apache.ratis.protocol.exceptions.StateMachineException;
 import org.apache.ratis.retry.RetryPolicy;
 import org.apache.ratis.util.CollectionUtils;
 import org.apache.ratis.util.JavaUtils;
@@ -42,7 +46,6 @@ import org.apache.ratis.util.TimeDuration;
 import org.apache.ratis.util.TimeoutScheduler;
 
 import java.io.IOException;
-import java.io.InterruptedIOException;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Map;
@@ -116,6 +119,8 @@ public final class RaftClientImpl implements RaftClient {
 
   private final Supplier<OrderedAsync> orderedAsync;
   private final Supplier<MessageStreamApi> streamApi;
+  private final Supplier<AsyncImpl> asyncApi;
+  private final Supplier<BlockingImpl> blockingApi;
 
   RaftClientImpl(ClientId clientId, RaftGroup group, RaftPeerId leaderId,
       RaftClientRpc clientRpc, RaftProperties properties, RetryPolicy 
retryPolicy) {
@@ -132,6 +137,8 @@ public final class RaftClientImpl implements RaftClient {
 
     this.orderedAsync = JavaUtils.memoize(() -> OrderedAsync.newInstance(this, 
properties));
     this.streamApi = JavaUtils.memoize(() -> 
MessageStreamImpl.newInstance(this, properties));
+    this.asyncApi = JavaUtils.memoize(() -> new AsyncImpl(this));
+    this.blockingApi = JavaUtils.memoize(() -> new BlockingImpl(this));
   }
 
   public RaftPeerId getLeaderId() {
@@ -186,19 +193,6 @@ public final class RaftClientImpl implements RaftClient {
     return streamApi.get();
   }
 
-  CompletableFuture<RaftClientReply> streamAsync(long streamId, long 
messageId, Message message, boolean endOfRequest) {
-    return sendAsync(RaftClientRequest.streamRequestType(streamId, messageId, 
endOfRequest), message, null);
-  }
-
-  CompletableFuture<RaftClientReply> streamCloseAsync(long streamId, long 
messageId) {
-    return sendAsync(RaftClientRequest.streamRequestType(streamId, messageId, 
true), null, null);
-  }
-
-  CompletableFuture<RaftClientReply> sendAsync(
-      RaftClientRequest.Type type, Message message, RaftPeerId server) {
-    return getOrderedAsync().send(type, message, server);
-  }
-
   RaftClientRequest newRaftClientRequest(
       RaftPeerId server, long callId, Message message, RaftClientRequest.Type 
type,
       SlidingWindowEntry slidingWindowEntry) {
@@ -206,15 +200,6 @@ public final class RaftClientImpl implements RaftClient {
         callId, message, type, slidingWindowEntry);
   }
 
-  RaftClientReply send(RaftClientRequest.Type type, Message message, 
RaftPeerId server)
-      throws IOException {
-    if (!type.is(TypeCase.WATCH)) {
-      Objects.requireNonNull(message, "message == null");
-    }
-
-    final long callId = nextCallId();
-    return sendRequestWithRetry(() -> newRaftClientRequest(server, callId, 
message, type, null));
-  }
 
   // TODO: change peersInNewConf to List<RaftPeer>
   @Override
@@ -225,13 +210,13 @@ public final class RaftClientImpl implements RaftClient {
     final long callId = nextCallId();
     // also refresh the rpc proxies for these peers
     addServers(Arrays.stream(peersInNewConf));
-    return sendRequestWithRetry(() -> new SetConfigurationRequest(
+    return io().sendRequestWithRetry(() -> new SetConfigurationRequest(
         clientId, leaderId, groupId, callId, Arrays.asList(peersInNewConf)));
   }
 
   @Override
-  public AsyncApi async() {
-    return new AsyncImpl(this);
+  public AsyncImpl async() {
+    return asyncApi.get();
   }
 
   @Override
@@ -241,7 +226,7 @@ public final class RaftClientImpl implements RaftClient {
 
   @Override
   public BlockingImpl io() {
-    return new BlockingImpl(this);
+    return blockingApi.get();
   }
 
   void addServers(Stream<RaftPeer> peersInNewConf) {
@@ -249,45 +234,6 @@ public final class RaftClientImpl implements RaftClient {
         peersInNewConf.filter(p -> !peers.contains(p))::iterator);
   }
 
-  private RaftClientReply sendRequestWithRetry(Supplier<RaftClientRequest> 
supplier) throws IOException {
-    PendingClientRequest pending = new PendingClientRequest() {
-      @Override
-      public RaftClientRequest newRequestImpl() {
-        return supplier.get();
-      }
-    };
-    while (true) {
-      final RaftClientRequest request = pending.newRequest();
-      IOException ioe = null;
-      try {
-        final RaftClientReply reply = sendRequest(request);
-
-        if (reply != null) {
-          return reply;
-        }
-      } catch (GroupMismatchException | StateMachineException e) {
-        throw e;
-      } catch (IOException e) {
-        ioe = e;
-      }
-
-      pending.incrementExceptionCount(ioe);
-      ClientRetryEvent event = new ClientRetryEvent(request, ioe, pending);
-      final RetryPolicy.Action action = 
retryPolicy.handleAttemptFailure(event);
-      TimeDuration sleepTime = getEffectiveSleepTime(ioe, 
action.getSleepTime());
-
-      if (!action.shouldRetry()) {
-        throw (IOException)noMoreRetries(event);
-      }
-
-      try {
-        sleepTime.sleep();
-      } catch (InterruptedException e) {
-        throw new InterruptedIOException("retry policy=" + retryPolicy);
-      }
-    }
-  }
-
   Throwable noMoreRetries(ClientRetryEvent event) {
     final int attemptCount = event.getAttemptCount();
     final Throwable throwable = event.getCause();
@@ -297,23 +243,6 @@ public final class RaftClientImpl implements RaftClient {
     return new RaftRetryFailureException(event.getRequest(), attemptCount, 
retryPolicy, throwable);
   }
 
-  RaftClientReply sendRequest(RaftClientRequest request) throws IOException {
-    LOG.debug("{}: send {}", clientId, request);
-    RaftClientReply reply;
-    try {
-      reply = clientRpc.sendRequest(request);
-    } catch (GroupMismatchException gme) {
-      throw gme;
-    } catch (IOException ioe) {
-      handleIOException(request, ioe);
-      throw ioe;
-    }
-    LOG.debug("{}: receive {}", clientId, reply);
-    reply = handleLeaderException(request, reply);
-    reply = handleRaftException(reply, Function.identity());
-    return reply;
-  }
-
   static <E extends Throwable> RaftClientReply handleRaftException(
       RaftClientReply reply, Function<RaftException, E> converter) throws E {
     if (reply != null) {

Reply via email to