Repository: giraph Updated Branches: refs/heads/trunk 24664b939 -> d827c97fc
Improve out-of-core metrics Summary: For the metric showing the percentage of the graph in memory, it makes more sense to show the lowest fraction of the graph that was in memory during a superstep. Basically, a user is more interested in seeing how bad the out-of-core execution was, and how many more machines they need to use to run the job entirely in memory. Test Plan: mvn clean verify; visual check, looking at the Hadoop metric and the per-worker metric Reviewers: sergey.edunov, dionysis.logothetis, maja.kabiljo Reviewed By: dionysis.logothetis, maja.kabiljo Differential Revision: https://reviews.facebook.net/D59451 Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/d827c97f Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/d827c97f Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/d827c97f Branch: refs/heads/trunk Commit: d827c97fc244dd89ccee46e686eaf008889550ed Parents: 24664b9 Author: Hassan Eslami <[email protected]> Authored: Mon Jun 20 12:23:42 2016 -0700 Committer: Maja Kabiljo <[email protected]> Committed: Mon Jun 20 12:23:52 2016 -0700 ---------------------------------------------------------------------- .../org/apache/giraph/counters/GiraphStats.java | 14 ++++++++- .../org/apache/giraph/graph/GlobalStats.java | 17 ++++++++++ .../giraph/job/CombinedWorkerProgress.java | 18 +++++++++++ .../apache/giraph/master/BspServiceMaster.java | 11 +++++++ .../giraph/metrics/WorkerSuperstepMetrics.java | 4 +-- .../org/apache/giraph/ooc/OutOfCoreEngine.java | 13 +++----- .../giraph/ooc/data/MetaPartitionManager.java | 33 ++++++++++++++++++++ .../apache/giraph/worker/WorkerProgress.java | 22 +++++++++++++ .../giraph/worker/WorkerProgressStats.java | 3 ++ 9 files changed, 122 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/giraph/blob/d827c97f/giraph-core/src/main/java/org/apache/giraph/counters/GiraphStats.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/counters/GiraphStats.java b/giraph-core/src/main/java/org/apache/giraph/counters/GiraphStats.java index 0cb8486..7e22d48 100644 --- a/giraph-core/src/main/java/org/apache/giraph/counters/GiraphStats.java +++ b/giraph-core/src/main/java/org/apache/giraph/counters/GiraphStats.java @@ -62,6 +62,9 @@ public class GiraphStats extends HadoopCountersBase { /** aggregate bytes stored to local disks in out-of-core */ public static final String OOC_BYTES_STORED_NAME = "Aggregate bytes stored to local disks (out-of-core)"; + /** lowest percentage of graph in memory throughout the execution */ + public static final String LOWEST_GRAPH_PERCENTAGE_IN_MEMORY_NAME = + "Lowest percentage of graph in memory so far (out-of-core)"; /** Singleton instance for everyone to use */ private static GiraphStats INSTANCE; @@ -92,8 +95,10 @@ public class GiraphStats extends HadoopCountersBase { private static final int OOC_BYTES_LOADED = 11; /** Aggregate OOC stored bytes counter */ private static final int OOC_BYTES_STORED = 12; + /** Lowest percentage of graph in memory over time */ + private static final int LOWEST_GRAPH_PERCENTAGE_IN_MEMORY = 13; /** Number of counters in this class */ - private static final int NUM_COUNTERS = 13; + private static final int NUM_COUNTERS = 14; /** All the counters stored */ private final GiraphHadoopCounter[] counters; @@ -123,6 +128,9 @@ public class GiraphStats extends HadoopCountersBase { getCounter(AGGREGATE_SENT_MESSAGE_BYTES_NAME); counters[OOC_BYTES_LOADED] = getCounter(OOC_BYTES_LOADED_NAME); counters[OOC_BYTES_STORED] = getCounter(OOC_BYTES_STORED_NAME); + counters[LOWEST_GRAPH_PERCENTAGE_IN_MEMORY] = + getCounter(LOWEST_GRAPH_PERCENTAGE_IN_MEMORY_NAME); + 
counters[LOWEST_GRAPH_PERCENTAGE_IN_MEMORY].setValue(100); } /** @@ -260,6 +268,10 @@ public class GiraphStats extends HadoopCountersBase { return counters[OOC_BYTES_STORED]; } + public GiraphHadoopCounter getLowestGraphPercentageInMemory() { + return counters[LOWEST_GRAPH_PERCENTAGE_IN_MEMORY]; + } + @Override public Iterator<GiraphHadoopCounter> iterator() { return Arrays.asList(counters).iterator(); http://git-wip-us.apache.org/repos/asf/giraph/blob/d827c97f/giraph-core/src/main/java/org/apache/giraph/graph/GlobalStats.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GlobalStats.java b/giraph-core/src/main/java/org/apache/giraph/graph/GlobalStats.java index dab3c2f..5636260 100644 --- a/giraph-core/src/main/java/org/apache/giraph/graph/GlobalStats.java +++ b/giraph-core/src/main/java/org/apache/giraph/graph/GlobalStats.java @@ -46,6 +46,8 @@ public class GlobalStats implements Writable { private long oocStoreBytesCount = 0; /** Bytes of data loaded to disk in the last superstep */ private long oocLoadBytesCount = 0; + /** Lowest percentage of graph in memory throughout the execution */ + private int lowestGraphPercentageInMemory = 100; /** * Master's decision on whether we should checkpoint and * what to do next. @@ -108,6 +110,15 @@ public class GlobalStats implements Writable { this.checkpointStatus = checkpointStatus; } + public int getLowestGraphPercentageInMemory() { + return lowestGraphPercentageInMemory; + } + + public void setLowestGraphPercentageInMemory( + int lowestGraphPercentageInMemory) { + this.lowestGraphPercentageInMemory = lowestGraphPercentageInMemory; + } + /** * Add bytes loaded to the global stats. 
* @@ -151,6 +162,9 @@ public class GlobalStats implements Writable { edgeCount = input.readLong(); messageCount = input.readLong(); messageBytesCount = input.readLong(); + oocLoadBytesCount = input.readLong(); + oocStoreBytesCount = input.readLong(); + lowestGraphPercentageInMemory = input.readInt(); haltComputation = input.readBoolean(); if (input.readBoolean()) { checkpointStatus = CheckpointStatus.values()[input.readInt()]; @@ -166,6 +180,9 @@ public class GlobalStats implements Writable { output.writeLong(edgeCount); output.writeLong(messageCount); output.writeLong(messageBytesCount); + output.writeLong(oocLoadBytesCount); + output.writeLong(oocStoreBytesCount); + output.writeInt(lowestGraphPercentageInMemory); output.writeBoolean(haltComputation); output.writeBoolean(checkpointStatus != null); if (checkpointStatus != null) { http://git-wip-us.apache.org/repos/asf/giraph/blob/d827c97f/giraph-core/src/main/java/org/apache/giraph/job/CombinedWorkerProgress.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/job/CombinedWorkerProgress.java b/giraph-core/src/main/java/org/apache/giraph/job/CombinedWorkerProgress.java index e931a99..e265163 100644 --- a/giraph-core/src/main/java/org/apache/giraph/job/CombinedWorkerProgress.java +++ b/giraph-core/src/main/java/org/apache/giraph/job/CombinedWorkerProgress.java @@ -64,6 +64,13 @@ public class CombinedWorkerProgress extends WorkerProgressStats { private int workerWithMinFreeMemory; /** Minimum fraction of free memory on a worker */ private double minFreeMemoryFraction = Double.MAX_VALUE; + /** + * Minimum percentage of graph in memory in any worker so far in the + * computation + */ + private int minGraphPercentageInMemory = 100; + /** Id of the worker with min percentage of graph in memory */ + private int workerWithMinGraphPercentageInMemory = -1; /** * Constructor @@ -116,6 +123,11 @@ public class CombinedWorkerProgress extends 
WorkerProgressStats { minFreeMemoryFraction = Math.min(minFreeMemoryFraction, workerProgress.getFreeMemoryFraction()); freeMemoryMB += workerProgress.getFreeMemoryMB(); + int percentage = workerProgress.getLowestGraphPercentageInMemory(); + if (percentage < minGraphPercentageInMemory) { + minGraphPercentageInMemory = percentage; + workerWithMinGraphPercentageInMemory = workerProgress.getTaskId(); + } } if (!Iterables.isEmpty(workerProgresses)) { freeMemoryMB /= Iterables.size(workerProgresses); @@ -164,6 +176,12 @@ public class CombinedWorkerProgress extends WorkerProgressStats { if (minFreeMemoryFraction < normalFreeMemoryFraction) { sb.append(", ******* YOUR JOB IS RUNNING LOW ON MEMORY *******"); } + if (minGraphPercentageInMemory < 100) { + sb.append(" Spilling ") + .append(100 - minGraphPercentageInMemory) + .append("% of data to external storage on worker ") + .append(workerWithMinGraphPercentageInMemory); + } return sb.toString(); } } http://git-wip-us.apache.org/repos/asf/giraph/blob/d827c97f/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java index 8372bd3..605e818 100644 --- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java +++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java @@ -946,6 +946,12 @@ public class BspServiceMaster<I extends WritableComparable, workerMetrics.getBytesLoadedFromDisk()); globalStats.addOocStoreBytesCount( workerMetrics.getBytesStoredOnDisk()); + // Find the lowest percentage of graph in memory across all workers + // for one superstep + globalStats.setLowestGraphPercentageInMemory( + Math.min(globalStats.getLowestGraphPercentageInMemory(), + (int) Math.round( + workerMetrics.getGraphPercentageInMemory()))); 
aggregatedMetrics.add(workerMetrics, hostnamePartitionId); } } catch (JSONException e) { @@ -2058,5 +2064,10 @@ public class BspServiceMaster<I extends WritableComparable, .increment(globalStats.getOocLoadBytesCount()); gs.getAggregateOOCBytesStored() .increment(globalStats.getOocStoreBytesCount()); + // Updating the lowest percentage of graph in memory throughout the + // execution across all the supersteps + int percentage = (int) gs.getLowestGraphPercentageInMemory().getValue(); + gs.getLowestGraphPercentageInMemory().setValue( + Math.min(percentage, globalStats.getLowestGraphPercentageInMemory())); } } http://git-wip-us.apache.org/repos/asf/giraph/blob/d827c97f/giraph-core/src/main/java/org/apache/giraph/metrics/WorkerSuperstepMetrics.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/metrics/WorkerSuperstepMetrics.java b/giraph-core/src/main/java/org/apache/giraph/metrics/WorkerSuperstepMetrics.java index 219bcbd..e4281d9 100644 --- a/giraph-core/src/main/java/org/apache/giraph/metrics/WorkerSuperstepMetrics.java +++ b/giraph-core/src/main/java/org/apache/giraph/metrics/WorkerSuperstepMetrics.java @@ -68,7 +68,7 @@ public class WorkerSuperstepMetrics implements Writable { superstepGCTimer.setTimeUnit(TimeUnit.MILLISECONDS); bytesLoadedFromDisk = 0; bytesStoredOnDisk = 0; - graphPercentageInMemory = 0; + graphPercentageInMemory = 100; } /** @@ -93,8 +93,6 @@ public class WorkerSuperstepMetrics implements Writable { registry.getExistingGauge(OutOfCoreEngine.GRAPH_PERCENTAGE_IN_MEMORY); if (gauge != null) { graphPercentageInMemory = gauge.value(); - } else { - graphPercentageInMemory = 100; } return this; } http://git-wip-us.apache.org/repos/asf/giraph/blob/d827c97f/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java ---------------------------------------------------------------------- diff --git 
a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java index d4c2de5..2037abe 100644 --- a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java @@ -19,7 +19,7 @@ package org.apache.giraph.ooc; import com.sun.management.GarbageCollectionNotificationInfo; -import com.yammer.metrics.util.PercentGauge; +import com.yammer.metrics.core.Gauge; import org.apache.giraph.bsp.BspService; import org.apache.giraph.bsp.CentralizedServiceWorker; import org.apache.giraph.comm.ServerData; @@ -485,15 +485,10 @@ public class OutOfCoreEngine implements ResetSuperstepMetricsObserver { @Override public void newSuperstep(SuperstepMetricsRegistry superstepMetrics) { - superstepMetrics.getGauge(GRAPH_PERCENTAGE_IN_MEMORY, new PercentGauge() { + superstepMetrics.getGauge(GRAPH_PERCENTAGE_IN_MEMORY, new Gauge<Double>() { @Override - protected double getNumerator() { - return metaPartitionManager.getNumInMemoryPartitions(); - } - - @Override - protected double getDenominator() { - return metaPartitionManager.getNumPartitions(); + public Double value() { + return metaPartitionManager.getLowestGraphFractionInMemory() * 100; } }); } http://git-wip-us.apache.org/repos/asf/giraph/blob/d827c97f/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java b/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java index 784d578..1332a3a 100644 --- a/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java @@ -20,9 +20,11 @@ package org.apache.giraph.ooc.data; import com.google.common.collect.Maps; import 
com.google.common.collect.Sets; +import com.google.common.util.concurrent.AtomicDouble; import org.apache.giraph.bsp.BspService; import org.apache.giraph.ooc.OutOfCoreEngine; import org.apache.giraph.worker.BspServiceWorker; +import org.apache.giraph.worker.WorkerProgress; import org.apache.log4j.Logger; import java.util.ArrayList; @@ -87,6 +89,16 @@ public class MetaPartitionManager { * processing */ private final Random randomGenerator; + /** + * What is the lowest fraction of partitions in memory, relative to the total + * number of available partitions? This is an indirect estimation of the + * amount of graph in memory, which can be used to estimate how many more + * machines needed to avoid out-of-core execution. At the beginning all the + * graph is in memory, so the fraction is 1. This fraction is calculated per + * superstep. + */ + private final AtomicDouble lowestGraphFractionInMemory = + new AtomicDouble(1); /** * Constructor @@ -125,6 +137,24 @@ public class MetaPartitionManager { return partitions.size(); } + public double getLowestGraphFractionInMemory() { + return lowestGraphFractionInMemory.get(); + } + + /** + * Update the lowest fraction of graph in memory so to have a more accurate + * information in one of the counters. 
+ */ + private synchronized void updateGraphFractionInMemory() { + double graphInMemory = + (double) getNumInMemoryPartitions() / getNumPartitions(); + if (graphInMemory < lowestGraphFractionInMemory.get()) { + lowestGraphFractionInMemory.set(graphInMemory); + WorkerProgress.get().updateLowestGraphPercentageInMemory( + (int) (graphInMemory * 100)); + } + } + /** * Whether a given partition is available * @@ -592,6 +622,7 @@ public class MetaPartitionManager { */ public void doneOffloadingPartition(int partitionId) { numInMemoryPartitions.getAndDecrement(); + updateGraphFractionInMemory(); MetaPartition meta = partitions.get(partitionId); int owner = oocEngine.getIOScheduler().getOwnerThreadId(partitionId); synchronized (meta) { @@ -618,6 +649,8 @@ public class MetaPartitionManager { dictionary.reset(); } numPartitionsProcessed.set(0); + lowestGraphFractionInMemory.set((double) getNumInMemoryPartitions() / + getNumPartitions()); } /** http://git-wip-us.apache.org/repos/asf/giraph/blob/d827c97f/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgress.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgress.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgress.java index eb543cd..4065869 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgress.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgress.java @@ -181,6 +181,17 @@ public final class WorkerProgress extends WorkerProgressStats { freeMemoryFraction = MemoryUtils.freeMemoryFraction(); } + /** + * Update lowest percentage of graph which stayed in memory so far in the + * execution + * + * @param fraction the fraction of graph in memory so far in this superstep + */ + public synchronized void updateLowestGraphPercentageInMemory(int fraction) { + lowestGraphPercentageInMemory = + Math.min(lowestGraphPercentageInMemory, fraction); + } + 
@ThriftField(1) public synchronized long getCurrentSuperstep() { return currentSuperstep; @@ -281,6 +292,11 @@ public final class WorkerProgress extends WorkerProgressStats { return freeMemoryFraction; } + @ThriftField(21) + public synchronized int getLowestGraphPercentageInMemory() { + return lowestGraphPercentageInMemory; + } + public synchronized boolean isInputSuperstep() { return currentSuperstep == -1; } @@ -392,4 +408,10 @@ public final class WorkerProgress extends WorkerProgressStats { public synchronized void setTaskId(int taskId) { this.taskId = taskId; } + + @ThriftField + public synchronized void setLowestGraphPercentageInMemory( + int lowestGraphPercentageInMemory) { + this.lowestGraphPercentageInMemory = lowestGraphPercentageInMemory; + } } http://git-wip-us.apache.org/repos/asf/giraph/blob/d827c97f/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgressStats.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgressStats.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgressStats.java index 04ed2ea..583b073 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgressStats.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgressStats.java @@ -72,6 +72,9 @@ public class WorkerProgressStats { /** Fraction of memory that's free */ protected double freeMemoryFraction; + /** Lowest percentage of graph in memory throughout the execution so far */ + protected int lowestGraphPercentageInMemory = 100; + public boolean isInputSuperstep() { return currentSuperstep == -1; }
