Repository: giraph Updated Branches: refs/heads/trunk daf6bd54a -> 160a0d35f
GIRAPH-1087: Retry requests after channel failure Summary: We currently don't have a callback to retry requests after channel failure, so we either wait for the request timeout or, in places where we don't wait for open requests, never retry the request at all. Test Plan: Hard to reproduce the issue (ran many jobs but was unable to); we'll see whether the problem happens again in prod with this change. Differential Revision: https://reviews.facebook.net/D60675 Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/160a0d35 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/160a0d35 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/160a0d35 Branch: refs/heads/trunk Commit: 160a0d35f64a52b9e31a725adc2672498d1f3f29 Parents: daf6bd5 Author: Maja Kabiljo <majakabi...@fb.com> Authored: Tue Jul 12 10:27:47 2016 -0700 Committer: Maja Kabiljo <majakabi...@fb.com> Committed: Tue Jul 19 08:51:21 2016 -0700 ---------------------------------------------------------------------- .../org/apache/giraph/comm/netty/NettyClient.java | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/160a0d35/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java index 6afe329..785d906 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java @@ -69,6 +69,7 @@ import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; +import 
io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; @@ -375,6 +376,14 @@ public class NettyClient { } /*end[HADOOP_NON_SECURE]*/ } + + @Override + public void channelUnregistered(ChannelHandlerContext ctx) throws + Exception { + super.channelUnregistered(ctx); + LOG.error("Channel failed " + ctx.channel()); + checkRequestsAfterChannelFailure(ctx.channel()); + } }); } @@ -1065,13 +1074,14 @@ public class NettyClient { /** * Resend requests related to channel which failed * - * @param future ChannelFuture of the failed channel + * @param channel Channel which failed */ - private void checkRequestsAfterChannelFailure(final ChannelFuture future) { + private void checkRequestsAfterChannelFailure(final Channel channel) { resendRequestsWhenNeeded(new Predicate<RequestInfo>() { @Override public boolean apply(RequestInfo requestInfo) { - return requestInfo.getWriteFuture() == future; + return requestInfo.getWriteFuture().channel().remoteAddress().equals( + channel.remoteAddress()); } }); } @@ -1087,7 +1097,7 @@ public class NettyClient { public void operationComplete(ChannelFuture future) throws Exception { if (future.isDone() && !future.isSuccess()) { LOG.error("Request failed", future.cause()); - checkRequestsAfterChannelFailure(future); + checkRequestsAfterChannelFailure(future.channel()); } } }