Repository: giraph
Updated Branches:
  refs/heads/trunk f69f77d20 -> d41221966


JIRA-1211

closes #93


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/d4122196
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/d4122196
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/d4122196

Branch: refs/heads/trunk
Commit: d41221966cece5ed8c029a7e941b5d621486ddb1
Parents: f69f77d
Author: Maja Kabiljo <majakabi...@fb.com>
Authored: Fri Nov 9 11:42:35 2018 -0800
Committer: Maja Kabiljo <majakabi...@fb.com>
Committed: Fri Nov 9 11:42:35 2018 -0800

----------------------------------------------------------------------
 .../apache/giraph/comm/netty/NettyClient.java   | 23 ++++++++++++++++----
 .../org/apache/giraph/conf/GiraphConstants.java |  9 ++++++++
 2 files changed, 28 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/d4122196/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java 
b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
index 7e9fa87..74011b9 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
@@ -94,6 +94,7 @@ import static 
org.apache.giraph.conf.GiraphConstants.NETTY_CLIENT_EXECUTION_AFTE
 import static 
org.apache.giraph.conf.GiraphConstants.NETTY_CLIENT_EXECUTION_THREADS;
 import static 
org.apache.giraph.conf.GiraphConstants.NETTY_CLIENT_USE_EXECUTION_HANDLER;
 import static 
org.apache.giraph.conf.GiraphConstants.NETTY_MAX_CONNECTION_FAILURES;
+import static org.apache.giraph.conf.GiraphConstants.RESEND_TIMED_OUT_REQUESTS;
 import static 
org.apache.giraph.conf.GiraphConstants.WAIT_TIME_BETWEEN_CONNECTION_RETRIES_MS;
 import static org.apache.giraph.conf.GiraphConstants.WAITING_REQUEST_MSECS;
 
@@ -188,6 +189,11 @@ public class NettyClient {
   private final long waitTimeBetweenConnectionRetriesMs;
   /** Maximum number of milliseconds for a request */
   private final int maxRequestMilliseconds;
+  /**
+   * Whether to resend request which timed out or fail the job if timeout
+   * happens
+   */
+  private final boolean resendTimedOutRequests;
   /** Waiting interval for checking outstanding requests msecs */
   private final int waitingRequestMsecs;
   /** Timed logger for printing request debugging */
@@ -278,6 +284,7 @@ public class NettyClient {
         NETWORK_REQUESTS_RESENT_FOR_CONNECTION_FAILURE_NAME));
 
     maxRequestMilliseconds = MAX_REQUEST_MILLISECONDS.get(conf);
+    resendTimedOutRequests = RESEND_TIMED_OUT_REQUESTS.get(conf);
     maxConnectionFailures = NETTY_MAX_CONNECTION_FAILURES.get(conf);
     waitTimeBetweenConnectionRetriesMs =
         WAIT_TIME_BETWEEN_CONNECTION_RETRIES_MS.get(conf);
@@ -997,7 +1004,7 @@ public class NettyClient {
         // If the request is taking too long, re-establish and resend
         return requestInfo.getElapsedMsecs() > maxRequestMilliseconds;
       }
-    }, networkRequestsResentForTimeout);
+    }, networkRequestsResentForTimeout, resendTimedOutRequests);
     resendRequestsWhenNeeded(new Predicate<RequestInfo>() {
       @Override
       public boolean apply(RequestInfo requestInfo) {
@@ -1006,7 +1013,7 @@ public class NettyClient {
         return writeFuture != null && (!writeFuture.channel().isActive() ||
             (writeFuture.isDone() && !writeFuture.isSuccess()));
       }
-    }, networkRequestsResentForConnectionFailure);
+    }, networkRequestsResentForConnectionFailure, true);
   }
 
   /**
@@ -1014,10 +1021,13 @@ public class NettyClient {
    *  @param shouldResendRequestPredicate Predicate to use to check whether
    *                                     request should be resent
    * @param counter Counter to increment for every resent network request
+   * @param resendProblematicRequest Whether to resend problematic request or
+   *                                fail the job if such request is found
    */
   private void resendRequestsWhenNeeded(
       Predicate<RequestInfo> shouldResendRequestPredicate,
-      GiraphHadoopCounter counter) {
+      GiraphHadoopCounter counter,
+      boolean resendProblematicRequest) {
     // Check if there are open requests which have been sent a long time ago,
     // and if so, resend them.
     List<ClientRequestId> addedRequestIds = Lists.newArrayList();
@@ -1028,6 +1038,11 @@ public class NettyClient {
       RequestInfo requestInfo = entry.getValue();
       // If request should be resent
       if (shouldResendRequestPredicate.apply(requestInfo)) {
+        if (!resendProblematicRequest) {
+          throw new IllegalStateException("Problem with request id " +
+              entry.getKey() + " for " + requestInfo.getDestinationAddress() +
+              ", failing the job");
+        }
         ChannelFuture writeFuture = requestInfo.getWriteFuture();
         String logMessage;
         if (writeFuture == null) {
@@ -1135,7 +1150,7 @@ public class NettyClient {
         return requestInfo.getDestinationAddress().equals(
             channel.remoteAddress());
       }
-    }, networkRequestsResentForChannelFailure);
+    }, networkRequestsResentForChannelFailure, true);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/giraph/blob/d4122196/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java 
b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index 4c02fff..e9a5f32 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -690,6 +690,15 @@ public interface GiraphConstants {
       new IntConfOption("giraph.maxRequestMilliseconds", MINUTES.toMillis(10),
           "Milliseconds for a request to complete (or else resend)");
 
+  /**
+   * Whether to resend request which timed out or fail the job if timeout
+   * happens
+   */
+  BooleanConfOption RESEND_TIMED_OUT_REQUESTS =
+      new BooleanConfOption("giraph.resendTimedOutRequests", true,
+          "Whether to resend request which timed out or fail the job if " +
+              "timeout happens");
+
   /** Netty max connection failures */
   IntConfOption NETTY_MAX_CONNECTION_FAILURES =
       new IntConfOption("giraph.nettyMaxConnectionFailures", 1000,

Reply via email to