Repository: hadoop Updated Branches: refs/heads/trunk cfb915f3d -> 10cf5773b
HDDS-845. Create a new raftClient instance for every watch request for Ratis. Contributed by Shashikant Banerjee. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/10cf5773 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/10cf5773 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/10cf5773 Branch: refs/heads/trunk Commit: 10cf5773ba32566dd76730e32a3ccdf2b3bd4d09 Parents: cfb915f Author: Shashikant Banerjee <[email protected]> Authored: Mon Nov 19 14:38:51 2018 +0530 Committer: Shashikant Banerjee <[email protected]> Committed: Mon Nov 19 14:38:51 2018 +0530 ---------------------------------------------------------------------- .../hadoop/hdds/scm/XceiverClientGrpc.java | 3 +- .../hadoop/hdds/scm/XceiverClientRatis.java | 38 +++++++++++++++----- .../hadoop/hdds/scm/XceiverClientSpi.java | 3 +- 3 files changed, 33 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/10cf5773/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java index c6b19ab..5592c1d 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java @@ -290,7 +290,8 @@ public class XceiverClientGrpc extends XceiverClientSpi { @Override public void watchForCommit(long index, long timeout) - throws InterruptedException, ExecutionException, TimeoutException { + throws InterruptedException, ExecutionException, TimeoutException, + IOException { // there is no notion of watch for commit index in standalone pipeline }; http://git-wip-us.apache.org/repos/asf/hadoop/blob/10cf5773/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java index 6b3b001..b238a09 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java @@ -122,11 +122,15 @@ public final class XceiverClientRatis extends XceiverClientSpi { public void close() { final RaftClient c = client.getAndSet(null); if (c != null) { - try { - c.close(); - } catch (IOException e) { - throw new IllegalStateException(e); - } + closeRaftClient(c); + } + } + + private void closeRaftClient(RaftClient raftClient) { + try { + raftClient.close(); + } catch (IOException e) { + throw new IllegalStateException(e); } } @@ -145,19 +149,35 @@ public final class XceiverClientRatis extends XceiverClientSpi { @Override public void watchForCommit(long index, long timeout) - throws InterruptedException, ExecutionException, TimeoutException { - // TODO: Create a new Raft client instance to watch - CompletableFuture<RaftClientReply> replyFuture = getClient() + throws InterruptedException, ExecutionException, TimeoutException, + IOException { + LOG.debug("commit index : {} watch timeout : {}", index, timeout); + // create a new RaftClient instance for watch request + RaftClient raftClient = + RatisHelper.newRaftClient(rpcType, getPipeline(), retryPolicy); + CompletableFuture<RaftClientReply> replyFuture = raftClient .sendWatchAsync(index, RaftProtos.ReplicationLevel.ALL_COMMITTED); try { replyFuture.get(timeout, TimeUnit.MILLISECONDS); } catch (TimeoutException toe) { LOG.warn("3 way commit failed ", toe); - getClient() + + closeRaftClient(raftClient); + // generate a new raft client instance again so that next watch request + // does not get blocked for the previous one + + // TODO : need to remove the code to create the new RaftClient instance + // here once the watch request bypassing sliding window in Raft Client + // gets fixed. + raftClient = + RatisHelper.newRaftClient(rpcType, getPipeline(), retryPolicy); + raftClient .sendWatchAsync(index, RaftProtos.ReplicationLevel.MAJORITY_COMMITTED) .get(timeout, TimeUnit.MILLISECONDS); LOG.info("Could not commit " + index + " to all the nodes." + "Committed by majority."); + } finally { + closeRaftClient(raftClient); } } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/10cf5773/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java index 7000660..e9896dc 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java @@ -126,5 +126,6 @@ public abstract class XceiverClientSpi implements Closeable { public abstract HddsProtos.ReplicationType getPipelineType(); public abstract void watchForCommit(long index, long timeout) - throws InterruptedException, ExecutionException, TimeoutException; + throws InterruptedException, ExecutionException, TimeoutException, + IOException; } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
