Repository: giraph Updated Branches: refs/heads/trunk bc2babcc6 -> 31026d4d2
GIRAPH-985: Add more metrics Summary: When the limit on the number of open requests is enabled, a significant part of the computation can be spent just waiting. This change adds a metric for the total time compute threads spend waiting on open requests, and a histogram of compute times per partition. These should help detect why a job is slower than expected (whether the slowdown comes from messaging or from compute). It also changes how we wait on open requests: we now resume at a lower threshold than the one at which we pause, so we don't get a huge number of very tiny pauses. Test Plan: Ran some jobs on the cluster and looked through these metrics. Reviewers: sergey.edunov, pavanka Subscribers: ikabiljo Differential Revision: https://reviews.facebook.net/D31683 Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/31026d4d Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/31026d4d Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/31026d4d Branch: refs/heads/trunk Commit: 31026d4d2a4ecc6f0af13a545d0ee633900ff2e5 Parents: bc2babc Author: Maja Kabiljo <[email protected]> Authored: Fri Jan 16 09:45:46 2015 -0800 Committer: Maja Kabiljo <[email protected]> Committed: Tue Jan 20 15:12:39 2015 -0800 ---------------------------------------------------------------------- CHANGELOG | 2 + .../org/apache/giraph/comm/WorkerClient.java | 2 +- .../apache/giraph/comm/netty/NettyClient.java | 71 ++++++++++++++------ .../giraph/comm/netty/NettyWorkerClient.java | 2 +- .../apache/giraph/graph/ComputeCallable.java | 9 +++ .../org/apache/giraph/metrics/MetricNames.java | 7 ++ .../metrics/SuperstepMetricsRegistry.java | 11 ++- 7 files changed, 79 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/31026d4d/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index 9adca87..527ac04 100644 --- 
a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,8 @@ Giraph Change Log Release 1.2.0 - unreleased + GIRAPH-985: Add more metrics (majakabiljo) + GIRAPH-986: Add more stuff to TypeOps (ikabiljo via majakabiljo) GIRAPH-962: TextAggregatorWriter with frequency AT_THE_END writes in every superstep (mju via majakabiljo) http://git-wip-us.apache.org/repos/asf/giraph/blob/31026d4d/giraph-core/src/main/java/org/apache/giraph/comm/WorkerClient.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/WorkerClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/WorkerClient.java index 3759f6b..a84a14d 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/WorkerClient.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/WorkerClient.java @@ -73,7 +73,7 @@ else[HADOOP_NON_SECURE]*/ * @param destTaskId Destination worker id * @param request Request to send */ - void sendWritableRequest(Integer destTaskId, WritableRequest request); + void sendWritableRequest(int destTaskId, WritableRequest request); /** * Wait until all the outstanding requests are completed. 
http://git-wip-us.apache.org/repos/asf/giraph/blob/31026d4d/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java index 97394bf..78e318e 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java @@ -30,9 +30,16 @@ import org.apache.giraph.comm.requests.RequestType; import org.apache.giraph.comm.requests.SaslTokenMessageRequest; /*end[HADOOP_NON_SECURE]*/ import org.apache.giraph.comm.requests.WritableRequest; +import org.apache.giraph.conf.BooleanConfOption; +import org.apache.giraph.conf.FloatConfOption; import org.apache.giraph.conf.GiraphConstants; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.conf.IntConfOption; import org.apache.giraph.graph.TaskInfo; +import org.apache.giraph.metrics.GiraphMetrics; +import org.apache.giraph.metrics.MetricNames; +import org.apache.giraph.metrics.ResetSuperstepMetricsObserver; +import org.apache.giraph.metrics.SuperstepMetricsRegistry; import org.apache.giraph.utils.PipelineUtils; import org.apache.giraph.utils.ProgressableUtils; import org.apache.giraph.utils.ThreadUtils; @@ -43,6 +50,7 @@ import org.apache.log4j.Logger; import com.google.common.collect.Lists; import com.google.common.collect.MapMaker; import com.google.common.collect.Maps; +import com.yammer.metrics.core.Counter; import java.io.IOException; import java.net.InetSocketAddress; @@ -84,17 +92,23 @@ import static org.apache.giraph.conf.GiraphConstants.WAITING_REQUEST_MSECS; /** * Netty client for sending requests. Thread-safe. 
*/ -public class NettyClient { +public class NettyClient implements ResetSuperstepMetricsObserver { /** Do we have a limit on number of open requests we can have */ - public static final String LIMIT_NUMBER_OF_OPEN_REQUESTS = - "giraph.waitForRequestsConfirmation"; - /** Default choice about having a limit on number of open requests */ - public static final boolean LIMIT_NUMBER_OF_OPEN_REQUESTS_DEFAULT = false; + public static final BooleanConfOption LIMIT_NUMBER_OF_OPEN_REQUESTS = + new BooleanConfOption("giraph.waitForRequestsConfirmation", false, + "Whether to have a limit on number of open requests or not"); /** Maximum number of requests without confirmation we should have */ - public static final String MAX_NUMBER_OF_OPEN_REQUESTS = - "giraph.maxNumberOfOpenRequests"; - /** Default maximum number of requests without confirmation */ - public static final int MAX_NUMBER_OF_OPEN_REQUESTS_DEFAULT = 10000; + public static final IntConfOption MAX_NUMBER_OF_OPEN_REQUESTS = + new IntConfOption("giraph.maxNumberOfOpenRequests", 10000, + "Maximum number of requests without confirmation we should have"); + /** + * After pausing a thread due to too large number of open requests, + * which fraction of these requests need to be closed before we continue + */ + public static final FloatConfOption + FRACTION_OF_REQUESTS_TO_CLOSE_BEFORE_PROCEEDING = + new FloatConfOption("giraph.fractionOfRequestsToCloseBeforeProceeding", + 0.2f, "Fraction of requsts to close before proceeding"); /** Maximum number of requests to list (for debugging) */ public static final int MAX_REQUESTS_TO_LIST = 10; /** @@ -147,6 +161,11 @@ public class NettyClient { private final boolean limitNumberOfOpenRequests; /** Maximum number of requests without confirmation we can have */ private final int maxNumberOfOpenRequests; + /** + * Maximum number of requests that can be open after the pause in order to + * proceed + */ + private final int numberOfRequestsToProceed; /** Maximum number of connection 
failures */ private final int maxConnectionFailures; /** Maximum number of milliseconds for a request */ @@ -181,6 +200,8 @@ public class NettyClient { */ private final LogOnErrorChannelFutureListener logErrorListener = new LogOnErrorChannelFutureListener(); + /** Counter for time spent waiting on too many open requests */ + private Counter timeWaitingOnOpenRequests; /** * Only constructor @@ -201,34 +222,32 @@ public class NettyClient { sendBufferSize = CLIENT_SEND_BUFFER_SIZE.get(conf); receiveBufferSize = CLIENT_RECEIVE_BUFFER_SIZE.get(conf); - limitNumberOfOpenRequests = conf.getBoolean( - LIMIT_NUMBER_OF_OPEN_REQUESTS, - LIMIT_NUMBER_OF_OPEN_REQUESTS_DEFAULT); + limitNumberOfOpenRequests = LIMIT_NUMBER_OF_OPEN_REQUESTS.get(conf); if (limitNumberOfOpenRequests) { - maxNumberOfOpenRequests = conf.getInt( - MAX_NUMBER_OF_OPEN_REQUESTS, - MAX_NUMBER_OF_OPEN_REQUESTS_DEFAULT); + maxNumberOfOpenRequests = MAX_NUMBER_OF_OPEN_REQUESTS.get(conf); + numberOfRequestsToProceed = (int) (maxNumberOfOpenRequests * + (1 - FRACTION_OF_REQUESTS_TO_CLOSE_BEFORE_PROCEEDING.get(conf))); if (LOG.isInfoEnabled()) { LOG.info("NettyClient: Limit number of open requests to " + - maxNumberOfOpenRequests); + maxNumberOfOpenRequests + " and proceed when <= " + + numberOfRequestsToProceed); } } else { maxNumberOfOpenRequests = -1; + numberOfRequestsToProceed = 0; } maxRequestMilliseconds = MAX_REQUEST_MILLISECONDS.get(conf); - maxConnectionFailures = NETTY_MAX_CONNECTION_FAILURES.get(conf); - waitingRequestMsecs = WAITING_REQUEST_MSECS.get(conf); - maxPoolSize = GiraphConstants.NETTY_CLIENT_THREADS.get(conf); - maxResolveAddressAttempts = MAX_RESOLVE_ADDRESS_ATTEMPTS.get(conf); clientRequestIdRequestInfoMap = new MapMaker().concurrencyLevel(maxPoolSize).makeMap(); + GiraphMetrics.get().addSuperstepResetObserver(this); + handlerToUseExecutionGroup = NETTY_CLIENT_EXECUTION_AFTER_HANDLER.get(conf); useExecutionGroup = NETTY_CLIENT_USE_EXECUTION_HANDLER.get(conf); @@ -354,6 +373,12 @@ public 
class NettyClient { }); } + @Override + public void newSuperstep(SuperstepMetricsRegistry metrics) { + timeWaitingOnOpenRequests = metrics.getCounter( + MetricNames.TIME_SPENT_WAITING_ON_TOO_MANY_OPEN_REQUESTS_MS); + } + /** * Pair object for connectAllAddresses(). */ @@ -673,7 +698,7 @@ public class NettyClient { * @param destTaskId Destination task id * @param request Request to send */ - public void sendWritableRequest(Integer destTaskId, + public void sendWritableRequest(int destTaskId, WritableRequest request) { InetSocketAddress remoteServer = taskIdAddressMap.get(destTaskId); if (clientRequestIdRequestInfoMap.isEmpty()) { @@ -709,7 +734,9 @@ public class NettyClient { if (limitNumberOfOpenRequests && clientRequestIdRequestInfoMap.size() > maxNumberOfOpenRequests) { - waitSomeRequests(maxNumberOfOpenRequests); + long startTime = System.currentTimeMillis(); + waitSomeRequests(numberOfRequestsToProceed); + timeWaitingOnOpenRequests.inc(System.currentTimeMillis() - startTime); } } http://git-wip-us.apache.org/repos/asf/giraph/blob/31026d4d/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java index c893a24..2a89109 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java @@ -138,7 +138,7 @@ public class NettyWorkerClient<I extends WritableComparable, } @Override - public void sendWritableRequest(Integer destTaskId, + public void sendWritableRequest(int destTaskId, WritableRequest request) { Counter counter = superstepRequestCounters.get(request.getType()); if (counter != null) { 
http://git-wip-us.apache.org/repos/asf/giraph/blob/31026d4d/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java index 33f2255..996159f 100644 --- a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java +++ b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java @@ -45,6 +45,7 @@ import org.apache.log4j.Logger; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.yammer.metrics.core.Counter; +import com.yammer.metrics.core.Histogram; import java.io.IOException; import java.util.Collection; @@ -99,6 +100,8 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable, private final Counter messagesSentCounter; /** Message bytes sent */ private final Counter messageBytesSentCounter; + /** Compute time per partition */ + private final Histogram histogramComputePerPartition; /** * Constructor @@ -127,6 +130,8 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable, messagesSentCounter = metrics.getCounter(MetricNames.MESSAGES_SENT); messageBytesSentCounter = metrics.getCounter(MetricNames.MESSAGE_BYTES_SENT); + histogramComputePerPartition = metrics.getUniformHistogram( + MetricNames.HISTOGRAM_COMPUTE_PER_PARTITION); } @Override @@ -150,6 +155,7 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable, Partition<I, V, E> partition = serviceWorker.getPartitionStore().getOrCreatePartition(partitionId); + long startTime = System.currentTimeMillis(); Computation<I, V, E, M1, M2> computation = (Computation<I, V, E, M1, M2>) configuration.createComputation(); @@ -183,6 +189,9 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable, } computation.postSuperstep(); + + 
histogramComputePerPartition.update( + System.currentTimeMillis() - startTime); } // Return VertexWriter after the usage http://git-wip-us.apache.org/repos/asf/giraph/blob/31026d4d/giraph-core/src/main/java/org/apache/giraph/metrics/MetricNames.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/metrics/MetricNames.java b/giraph-core/src/main/java/org/apache/giraph/metrics/MetricNames.java index f731bbc..ff46198 100644 --- a/giraph-core/src/main/java/org/apache/giraph/metrics/MetricNames.java +++ b/giraph-core/src/main/java/org/apache/giraph/metrics/MetricNames.java @@ -61,6 +61,10 @@ public interface MetricNames { /** Counter for sending aggregators from worker owner to other workers */ String SEND_AGGREGATORS_TO_WORKER_REQUESTS = "send-aggregators-to-worker-requests"; + + /** Counter for time spent waiting on too many open requests */ + String TIME_SPENT_WAITING_ON_TOO_MANY_OPEN_REQUESTS_MS = + "time-spent-waiting-on-too-many-open-requests-ms"; ////////////////////////////////////////////////////////////////////////////// // End of Request counters per superstep ////////////////////////////////////////////////////////////////////////////// @@ -91,4 +95,7 @@ public interface MetricNames { String VERTICES_FILTERED = "vertices-filtered"; /** Percent of vertices filtered out */ String VERTICES_FILTERED_PCT = "vertices-filtered-pct"; + + /** Name of metric for compute times per partition */ + String HISTOGRAM_COMPUTE_PER_PARTITION = "compute-per-partition-ms"; } http://git-wip-us.apache.org/repos/asf/giraph/blob/31026d4d/giraph-core/src/main/java/org/apache/giraph/metrics/SuperstepMetricsRegistry.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/metrics/SuperstepMetricsRegistry.java b/giraph-core/src/main/java/org/apache/giraph/metrics/SuperstepMetricsRegistry.java index 57e2431..3a22d69 100644 --- 
a/giraph-core/src/main/java/org/apache/giraph/metrics/SuperstepMetricsRegistry.java +++ b/giraph-core/src/main/java/org/apache/giraph/metrics/SuperstepMetricsRegistry.java @@ -21,6 +21,7 @@ package org.apache.giraph.metrics; import org.apache.giraph.conf.GiraphConfiguration; import org.apache.giraph.bsp.BspService; +import com.yammer.metrics.core.Histogram; import com.yammer.metrics.core.Metric; import com.yammer.metrics.core.MetricName; import com.yammer.metrics.core.MetricPredicate; @@ -111,6 +112,14 @@ public class SuperstepMetricsRegistry extends GiraphMetricsRegistry { return name.getType().equals(getType()); } }; - new ConsoleReporter(getInternalRegistry(), out, superstepFilter).run(); + new ConsoleReporter(getInternalRegistry(), out, superstepFilter) { + @Override + public void processHistogram(MetricName name, Histogram histogram, + PrintStream stream) { + super.processHistogram(name, histogram, stream); + stream.printf(" count = %d%n", histogram.count()); + stream.printf(" sum = %,2.2f%n", histogram.sum()); + } + } .run(); } }
