Repository: giraph
Updated Branches:
  refs/heads/trunk daf6bd54a -> 160a0d35f


GIRAPH-1087: Retry requests after channel failure

Summary: We currently don't have a callback to retry requests after a channel
failure, so we either wait for the request to time out or, in places where we
don't wait for open requests, never retry the request at all.

Test Plan: The issue is hard to reproduce (I ran many jobs but was unable to
trigger it); we'll see whether the problem happens again in prod with this change.

Differential Revision: https://reviews.facebook.net/D60675


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/160a0d35
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/160a0d35
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/160a0d35

Branch: refs/heads/trunk
Commit: 160a0d35f64a52b9e31a725adc2672498d1f3f29
Parents: daf6bd5
Author: Maja Kabiljo <majakabi...@fb.com>
Authored: Tue Jul 12 10:27:47 2016 -0700
Committer: Maja Kabiljo <majakabi...@fb.com>
Committed: Tue Jul 19 08:51:21 2016 -0700

----------------------------------------------------------------------
 .../org/apache/giraph/comm/netty/NettyClient.java | 18 ++++++++++++++----
 1 file changed, 14 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/160a0d35/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
index 6afe329..785d906 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
@@ -69,6 +69,7 @@ import io.netty.bootstrap.Bootstrap;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
@@ -375,6 +376,14 @@ public class NettyClient {
             }
 /*end[HADOOP_NON_SECURE]*/
           }
+
+          @Override
+          public void channelUnregistered(ChannelHandlerContext ctx) throws
+              Exception {
+            super.channelUnregistered(ctx);
+            LOG.error("Channel failed " + ctx.channel());
+            checkRequestsAfterChannelFailure(ctx.channel());
+          }
         });
   }
 
@@ -1065,13 +1074,14 @@ public class NettyClient {
   /**
    * Resend requests related to channel which failed
    *
-   * @param future ChannelFuture of the failed channel
+   * @param channel Channel which failed
    */
-  private void checkRequestsAfterChannelFailure(final ChannelFuture future) {
+  private void checkRequestsAfterChannelFailure(final Channel channel) {
     resendRequestsWhenNeeded(new Predicate<RequestInfo>() {
       @Override
       public boolean apply(RequestInfo requestInfo) {
-        return requestInfo.getWriteFuture() == future;
+        return requestInfo.getWriteFuture().channel().remoteAddress().equals(
+            channel.remoteAddress());
       }
     });
   }
@@ -1087,7 +1097,7 @@ public class NettyClient {
     public void operationComplete(ChannelFuture future) throws Exception {
       if (future.isDone() && !future.isSuccess()) {
         LOG.error("Request failed", future.cause());
-        checkRequestsAfterChannelFailure(future);
+        checkRequestsAfterChannelFailure(future.channel());
       }
     }
   }

Reply via email to