Repository: giraph
Updated Branches:
  refs/heads/trunk f69f77d20 -> d41221966

JIRA-1211 closes #93

Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/d4122196
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/d4122196
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/d4122196

Branch: refs/heads/trunk
Commit: d41221966cece5ed8c029a7e941b5d621486ddb1
Parents: f69f77d
Author: Maja Kabiljo <majakabi...@fb.com>
Authored: Fri Nov 9 11:42:35 2018 -0800
Committer: Maja Kabiljo <majakabi...@fb.com>
Committed: Fri Nov 9 11:42:35 2018 -0800

----------------------------------------------------------------------
 .../apache/giraph/comm/netty/NettyClient.java   | 23 ++++++++++++++++----
 .../org/apache/giraph/conf/GiraphConstants.java |  9 ++++++++
 2 files changed, 28 insertions(+), 4 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/giraph/blob/d4122196/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
index 7e9fa87..74011b9 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
@@ -94,6 +94,7 @@ import static org.apache.giraph.conf.GiraphConstants.NETTY_CLIENT_EXECUTION_AFTE
 import static org.apache.giraph.conf.GiraphConstants.NETTY_CLIENT_EXECUTION_THREADS;
 import static org.apache.giraph.conf.GiraphConstants.NETTY_CLIENT_USE_EXECUTION_HANDLER;
 import static org.apache.giraph.conf.GiraphConstants.NETTY_MAX_CONNECTION_FAILURES;
+import static org.apache.giraph.conf.GiraphConstants.RESEND_TIMED_OUT_REQUESTS;
 import static org.apache.giraph.conf.GiraphConstants.WAIT_TIME_BETWEEN_CONNECTION_RETRIES_MS;
 import static
org.apache.giraph.conf.GiraphConstants.WAITING_REQUEST_MSECS; @@ -188,6 +189,11 @@ public class NettyClient { private final long waitTimeBetweenConnectionRetriesMs; /** Maximum number of milliseconds for a request */ private final int maxRequestMilliseconds; + /** + * Whether to resend request which timed out or fail the job if timeout + * happens + */ + private final boolean resendTimedOutRequests; /** Waiting interval for checking outstanding requests msecs */ private final int waitingRequestMsecs; /** Timed logger for printing request debugging */ @@ -278,6 +284,7 @@ public class NettyClient { NETWORK_REQUESTS_RESENT_FOR_CONNECTION_FAILURE_NAME)); maxRequestMilliseconds = MAX_REQUEST_MILLISECONDS.get(conf); + resendTimedOutRequests = RESEND_TIMED_OUT_REQUESTS.get(conf); maxConnectionFailures = NETTY_MAX_CONNECTION_FAILURES.get(conf); waitTimeBetweenConnectionRetriesMs = WAIT_TIME_BETWEEN_CONNECTION_RETRIES_MS.get(conf); @@ -997,7 +1004,7 @@ public class NettyClient { // If the request is taking too long, re-establish and resend return requestInfo.getElapsedMsecs() > maxRequestMilliseconds; } - }, networkRequestsResentForTimeout); + }, networkRequestsResentForTimeout, resendTimedOutRequests); resendRequestsWhenNeeded(new Predicate<RequestInfo>() { @Override public boolean apply(RequestInfo requestInfo) { @@ -1006,7 +1013,7 @@ public class NettyClient { return writeFuture != null && (!writeFuture.channel().isActive() || (writeFuture.isDone() && !writeFuture.isSuccess())); } - }, networkRequestsResentForConnectionFailure); + }, networkRequestsResentForConnectionFailure, true); } /** @@ -1014,10 +1021,13 @@ public class NettyClient { * @param shouldResendRequestPredicate Predicate to use to check whether * request should be resent * @param counter Counter to increment for every resent network request + * @param resendProblematicRequest Whether to resend problematic request or + * fail the job if such request is found */ private void resendRequestsWhenNeeded( 
Predicate<RequestInfo> shouldResendRequestPredicate, - GiraphHadoopCounter counter) { + GiraphHadoopCounter counter, + boolean resendProblematicRequest) { // Check if there are open requests which have been sent a long time ago, // and if so, resend them. List<ClientRequestId> addedRequestIds = Lists.newArrayList(); @@ -1028,6 +1038,11 @@ public class NettyClient { RequestInfo requestInfo = entry.getValue(); // If request should be resent if (shouldResendRequestPredicate.apply(requestInfo)) { + if (!resendProblematicRequest) { + throw new IllegalStateException("Problem with request id " + + entry.getKey() + " for " + requestInfo.getDestinationAddress() + + ", failing the job"); + } ChannelFuture writeFuture = requestInfo.getWriteFuture(); String logMessage; if (writeFuture == null) { @@ -1135,7 +1150,7 @@ public class NettyClient { return requestInfo.getDestinationAddress().equals( channel.remoteAddress()); } - }, networkRequestsResentForChannelFailure); + }, networkRequestsResentForChannelFailure, true); } /** http://git-wip-us.apache.org/repos/asf/giraph/blob/d4122196/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java index 4c02fff..e9a5f32 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java @@ -690,6 +690,15 @@ public interface GiraphConstants { new IntConfOption("giraph.maxRequestMilliseconds", MINUTES.toMillis(10), "Milliseconds for a request to complete (or else resend)"); + /** + * Whether to resend request which timed out or fail the job if timeout + * happens + */ + BooleanConfOption RESEND_TIMED_OUT_REQUESTS = + new BooleanConfOption("giraph.resendTimedOutRequests", true, + "Whether to resend request which 
timed out or fail the job if " + + "timeout happens"); + /** Netty max connection failures */ IntConfOption NETTY_MAX_CONNECTION_FAILURES = new IntConfOption("giraph.nettyMaxConnectionFailures", 1000,