Repository: giraph Updated Branches: refs/heads/trunk 480475a51 -> 5def2380c
GIRAPH-976: More command line logging

Summary:
Add more logging to the command line:
- if a worker fails, log which worker failed
- if some worker is low on memory, warn the user about it

Test Plan:
Ran a job where a worker fails and saw:
FATAL 2014-12-29 11:29:25,879 ******* WORKERS [Worker(hostname=hadoop5247.frc1.facebook.com, MRtaskID=2, port=30002)] FAILED *******
Ran a job that was low on memory and saw:
INFO 2014-12-29 11:40:55,731 Data from 5 workers - Compute superstep 0: 1040758 out of 1302981 vertices computed; 20 out of 25 partitions computed; min free memory on worker 2 - 3775.79MB, average 94126.79MB, ******* YOUR JOB IS RUNNING LOW ON MEMORY *******
Ran a normal job and verified everything works as before.

Reviewers: pavanka, sergey.edunov
Differential Revision: https://reviews.facebook.net/D30729
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/5def2380 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/5def2380 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/5def2380 Branch: refs/heads/trunk Commit: 5def2380c35c4b42faf4fe30b7b81910a963d919 Parents: 480475a Author: Maja Kabiljo <[email protected]> Authored: Mon Dec 29 11:47:26 2014 -0800 Committer: Maja Kabiljo <[email protected]> Committed: Mon Dec 29 14:37:18 2014 -0800 ---------------------------------------------------------------------- CHANGELOG | 1 + .../giraph/job/CombinedWorkerProgress.java | 27 +++++++++++++++++++- .../giraph/job/JobProgressTrackerService.java | 2 +- .../apache/giraph/master/BspServiceMaster.java | 19 ++++++++++---- .../org/apache/giraph/utils/MemoryUtils.java | 16 ++++++++++++ .../apache/giraph/worker/WorkerProgress.java | 15 ++++++++++- 6 files changed, 72 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/5def2380/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG 
b/CHANGELOG index efa2878..60f64d1 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,7 @@ Giraph Change Log Release 1.2.0 - unreleased + GIRAPH-976: More command line logging (majakabiljo) GIRAPH-972: Race condition in checkpointing (edunov) http://git-wip-us.apache.org/repos/asf/giraph/blob/5def2380/giraph-core/src/main/java/org/apache/giraph/job/CombinedWorkerProgress.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/job/CombinedWorkerProgress.java b/giraph-core/src/main/java/org/apache/giraph/job/CombinedWorkerProgress.java index a0410b4..e5fef8a 100644 --- a/giraph-core/src/main/java/org/apache/giraph/job/CombinedWorkerProgress.java +++ b/giraph-core/src/main/java/org/apache/giraph/job/CombinedWorkerProgress.java @@ -18,7 +18,9 @@ package org.apache.giraph.job; +import org.apache.giraph.conf.FloatConfOption; import org.apache.giraph.worker.WorkerProgress; +import org.apache.hadoop.conf.Configuration; import com.google.common.collect.Iterables; @@ -34,8 +36,21 @@ import javax.annotation.concurrent.NotThreadSafe; public class CombinedWorkerProgress extends WorkerProgress { /** Decimal format which rounds numbers to two decimal places */ public static final DecimalFormat DECIMAL_FORMAT = new DecimalFormat("#.##"); + /** + * If free memory fraction on some worker goes below this value, + * warning will be printed + */ + public static final FloatConfOption NORMAL_FREE_MEMORY_FRACTION = + new FloatConfOption("giraph.normalFreeMemoryFraction", 0.1f, + "If free memory fraction on some worker goes below this value, " + + "warning will be printed"); /** + * If free memory fraction on some worker goes below this value, + * warning will be printed + */ + private double normalFreeMemoryFraction; + /** * How many workers have reported that they are in highest reported * superstep */ @@ -48,13 +63,18 @@ public class CombinedWorkerProgress extends WorkerProgress { private double minFreeMemoryMB 
= Double.MAX_VALUE; /** Name of the worker with min free memory */ private int workerWithMinFreeMemory; + /** Minimum fraction of free memory on a worker */ + private double minFreeMemoryFraction = Double.MAX_VALUE; /** * Constructor * * @param workerProgresses Worker progresses to combine + * @param conf Configuration */ - public CombinedWorkerProgress(Iterable<WorkerProgress> workerProgresses) { + public CombinedWorkerProgress(Iterable<WorkerProgress> workerProgresses, + Configuration conf) { + normalFreeMemoryFraction = NORMAL_FREE_MEMORY_FRACTION.get(conf); for (WorkerProgress workerProgress : workerProgresses) { if (workerProgress.getCurrentSuperstep() > currentSuperstep) { verticesToCompute = 0; @@ -94,6 +114,8 @@ public class CombinedWorkerProgress extends WorkerProgress { minFreeMemoryMB = workerProgress.getFreeMemoryMB(); workerWithMinFreeMemory = workerProgress.getTaskId(); } + minFreeMemoryFraction = Math.min(minFreeMemoryFraction, + workerProgress.getFreeMemoryFraction()); freeMemoryMB += workerProgress.getFreeMemoryMB(); } if (!Iterables.isEmpty(workerProgresses)) { @@ -140,6 +162,9 @@ public class CombinedWorkerProgress extends WorkerProgress { workerWithMinFreeMemory).append(" - ").append( DECIMAL_FORMAT.format(minFreeMemoryMB)).append("MB, average ").append( DECIMAL_FORMAT.format(freeMemoryMB)).append("MB"); + if (minFreeMemoryFraction < normalFreeMemoryFraction) { + sb.append(", ******* YOUR JOB IS RUNNING LOW ON MEMORY *******"); + } return sb.toString(); } } http://git-wip-us.apache.org/repos/asf/giraph/blob/5def2380/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTrackerService.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTrackerService.java b/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTrackerService.java index 49610de..064ed5b 100644 --- a/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTrackerService.java 
+++ b/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTrackerService.java @@ -89,7 +89,7 @@ public class JobProgressTrackerService implements JobProgressTracker { !workerProgresses.isEmpty()) { // Combine and log CombinedWorkerProgress combinedWorkerProgress = - new CombinedWorkerProgress(workerProgresses.values()); + new CombinedWorkerProgress(workerProgresses.values(), conf); if (LOG.isInfoEnabled()) { LOG.info(combinedWorkerProgress.toString()); } http://git-wip-us.apache.org/repos/asf/giraph/blob/5def2380/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java index 798f544..884dd83 100644 --- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java +++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java @@ -651,8 +651,8 @@ public class BspServiceMaster<I extends WritableComparable, "check input of " + inputFormat.getClass().getName() + "!"); getContext().setStatus("Failing job due to 0 input splits, " + "check input of " + inputFormat.getClass().getName() + "!"); - setJobStateFailed("Please check your input tables - partitions which " + - "you specified are missing. Failing the job!!!"); + setJobStateFailed("******* PLEASE CHECK YOUR INPUT TABLES - PARTITIONS " + + "WHICH YOU SPECIFIED ARE MISSING. 
FAILING THE JOB *******"); } if (minSplitCountHint > splitList.size()) { LOG.warn(logPrefix + ": Number of inputSplits=" + @@ -1410,9 +1410,18 @@ public class BspServiceMaster<I extends WritableComparable, workerInfoHealthyPath, workerInfoList)); if (!ignoreDeath && deadWorkers.size() > 0) { - LOG.error("barrierOnWorkerList: Missing chosen " + - "workers " + deadWorkers + - " on superstep " + getSuperstep()); + String errorMessage = "******* WORKERS " + deadWorkers + + " FAILED *******"; + // If checkpointing is not used, we should fail the job + if (!getConfiguration().useCheckpointing()) { + setJobStateFailed(errorMessage); + } else { + LOG.error("barrierOnWorkerList: Missing chosen " + + "workers " + deadWorkers + + " on superstep " + getSuperstep()); + // Log worker failure to command line + getGraphTaskManager().getJobProgressTracker().logInfo(errorMessage); + } return false; } } catch (KeeperException e) { http://git-wip-us.apache.org/repos/asf/giraph/blob/5def2380/giraph-core/src/main/java/org/apache/giraph/utils/MemoryUtils.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/MemoryUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/MemoryUtils.java index 072265b..18fb00e 100644 --- a/giraph-core/src/main/java/org/apache/giraph/utils/MemoryUtils.java +++ b/giraph-core/src/main/java/org/apache/giraph/utils/MemoryUtils.java @@ -65,6 +65,22 @@ public class MemoryUtils { } /** + * Get free plus unallocated memory in megabytes + * @return free plus unallocated memory in megabytes + */ + public static double freePlusUnallocatedMemoryMB() { + return freeMemoryMB() + maxMemoryMB() - totalMemoryMB(); + } + + /** + * Get fraction of memory that's free + * @return Fraction of memory that's free + */ + public static double freeMemoryFraction() { + return freePlusUnallocatedMemoryMB() / maxMemoryMB(); + } + + /** * Initialize metrics tracked by this helper. 
*/ public static void initMetrics() { http://git-wip-us.apache.org/repos/asf/giraph/blob/5def2380/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgress.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgress.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgress.java index 24f791b..3c25cfe 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgress.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgress.java @@ -79,6 +79,8 @@ public class WorkerProgress { /** Free memory */ protected double freeMemoryMB; + /** Fraction of memory that's free */ + protected double freeMemoryFraction; /** * Get singleton instance of WorkerProgress. @@ -216,7 +218,8 @@ public class WorkerProgress { * Update memory info */ public synchronized void updateMemory() { - freeMemoryMB = MemoryUtils.freeMemoryMB(); + freeMemoryMB = MemoryUtils.freePlusUnallocatedMemoryMB(); + freeMemoryFraction = MemoryUtils.freeMemoryFraction(); } @ThriftField(1) @@ -314,6 +317,11 @@ public class WorkerProgress { return freeMemoryMB; } + @ThriftField(20) + public synchronized double getFreeMemoryFraction() { + return freeMemoryFraction; + } + public synchronized boolean isInputSuperstep() { return currentSuperstep == -1; } @@ -417,6 +425,11 @@ public class WorkerProgress { } @ThriftField + public void setFreeMemoryFraction(double freeMemoryFraction) { + this.freeMemoryFraction = freeMemoryFraction; + } + + @ThriftField public synchronized void setTaskId(int taskId) { this.taskId = taskId; }
