Updated Branches: refs/heads/trunk 2347627d5 -> 71eab655e
GIRAPH-539: When having open requests log which workers are they sent to (majakabiljo) Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/71eab655 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/71eab655 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/71eab655 Branch: refs/heads/trunk Commit: 71eab655eea02b946fb88907650168d7c2d18bef Parents: 2347627 Author: Maja Kabiljo <[email protected]> Authored: Mon Feb 25 11:21:00 2013 -0800 Committer: Maja Kabiljo <[email protected]> Committed: Mon Feb 25 11:22:00 2013 -0800 ---------------------------------------------------------------------- CHANGELOG | 2 + .../org/apache/giraph/comm/netty/NettyClient.java | 83 ++++++++++++--- 2 files changed, 69 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/71eab655/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index ec34ba0..3323438 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,8 @@ Giraph Change Log Release 0.2.0 - unreleased + GIRAPH-539: When having open requests log which workers are they sent to (majakabiljo) + GIRAPH-532: Give an explanation when trying to use unregistered aggregators (majakabiljo) GIRAPH-453: Pure Hive I/O (nitay) http://git-wip-us.apache.org/repos/asf/giraph/blob/71eab655/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java index feae3e2..af76410 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java @@ -53,11 +53,14 @@ import org.jboss.netty.handler.execution.MemoryAwareThreadPoolExecutor; import com.google.common.collect.Lists; import com.google.common.collect.MapMaker; +import com.google.common.collect.Maps; import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.io.IOException; import java.net.InetSocketAddress; import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentMap; @@ -85,6 +88,11 @@ public class NettyClient { public static final int MAX_NUMBER_OF_OPEN_REQUESTS_DEFAULT = 10000; /** Maximum number of requests to list (for debugging) */ public static final int MAX_REQUESTS_TO_LIST = 10; + /** + * Maximum number of destination task ids with open requests to list + * (for debugging) + */ + public static final int MAX_DESTINATION_TASK_IDS_TO_LIST = 10; /** 30 seconds to connect by default */ public static final int MAX_CONNECTION_MILLISECONDS_DEFAULT = 30 * 1000; /*if_not[HADOOP_NON_SECURE]*/ @@ -685,21 +693,7 @@ public class NettyClient { private void waitSomeRequests(int maxOpenRequests) { while (clientRequestIdRequestInfoMap.size() > maxOpenRequests) { // Wait for requests to complete for some time - if (LOG.isInfoEnabled() && requestLogger.isPrintable()) { - LOG.info("waitSomeRequests: Waiting interval of " + - waitingRequestMsecs + " msecs, " + - clientRequestIdRequestInfoMap.size() + - " open requests, waiting for it to be <= " + maxOpenRequests + - ", " + byteCounter.getMetrics()); - - if (clientRequestIdRequestInfoMap.size() < MAX_REQUESTS_TO_LIST) { - for (Map.Entry<ClientRequestId, RequestInfo> entry : - clientRequestIdRequestInfoMap.entrySet()) { - LOG.info("waitSomeRequests: Waiting for request " + - entry.getKey() + " - " + entry.getValue()); - } - } - } + logInfoAboutOpenRequests(maxOpenRequests); synchronized (clientRequestIdRequestInfoMap) { if (clientRequestIdRequestInfoMap.size() <= maxOpenRequests) { break; @@ -707,7 +701,7 @@ public class NettyClient { try { clientRequestIdRequestInfoMap.wait(waitingRequestMsecs); } catch (InterruptedException e) { - LOG.error("waitFutures: Got unexpected InterruptedException", e); + LOG.error("waitSomeRequests: Got unexpected InterruptedException", e); } } // Make sure that waiting doesn't kill the job @@ -718,6 +712,63 @@ public class NettyClient { } /** + * Log the status of open requests. + * + * @param maxOpenRequests Maximum number of requests which can be not complete + */ + private void logInfoAboutOpenRequests(int maxOpenRequests) { + if (LOG.isInfoEnabled() && requestLogger.isPrintable()) { + LOG.info("logInfoAboutOpenRequests: Waiting interval of " + + waitingRequestMsecs + " msecs, " + + clientRequestIdRequestInfoMap.size() + + " open requests, waiting for it to be <= " + maxOpenRequests + + ", " + byteCounter.getMetrics()); + + if (clientRequestIdRequestInfoMap.size() < MAX_REQUESTS_TO_LIST) { + for (Map.Entry<ClientRequestId, RequestInfo> entry : + clientRequestIdRequestInfoMap.entrySet()) { + LOG.info("logInfoAboutOpenRequests: Waiting for request " + + entry.getKey() + " - " + entry.getValue()); + } + } + + // Count how many open requests each task has + Map<Integer, Integer> openRequestCounts = Maps.newHashMap(); + for (ClientRequestId clientRequestId : + clientRequestIdRequestInfoMap.keySet()) { + int taskId = clientRequestId.getDestinationTaskId(); + Integer currentCount = openRequestCounts.get(taskId); + openRequestCounts.put(taskId, + (currentCount == null ? 0 : currentCount) + 1); + } + // Sort it in decreasing order of number of open requests + List<Map.Entry<Integer, Integer>> sorted = + Lists.newArrayList(openRequestCounts.entrySet()); + Collections.sort(sorted, new Comparator<Map.Entry<Integer, Integer>>() { + @Override + public int compare(Map.Entry<Integer, Integer> entry1, + Map.Entry<Integer, Integer> entry2) { + int value1 = entry1.getValue(); + int value2 = entry2.getValue(); + return (value1 < value2) ? 1 : ((value1 == value2) ? 0 : -1); + } + }); + // Print task ids which have the most open requests + StringBuilder message = new StringBuilder(); + message.append("logInfoAboutOpenRequests: "); + int itemsToPrint = + Math.min(MAX_DESTINATION_TASK_IDS_TO_LIST, sorted.size()); + for (int i = 0; i < itemsToPrint; i++) { + message.append(sorted.get(i).getValue()) + .append(" requests for taskId=") + .append(sorted.get(i).getKey()) + .append(", "); + } + LOG.info(message); + } + } + + /** * Check if there are some open requests which have been sent a long time * ago, and if so resend them. */
