Author: omalley
Date: Tue Aug 12 13:15:38 2008
New Revision: 685290
URL: http://svn.apache.org/viewvc?rev=685290&view=rev
Log:
HADOOP-657. Free disk space should be modelled and used by the scheduler
to make scheduling decisions. (Ari Rabkin via omalley)
Added:
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ResourceEstimator.java
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskStatus.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java
Modified: hadoop/core/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=685290&r1=685289&r2=685290&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue Aug 12 13:15:38 2008
@@ -72,6 +72,9 @@
HADOOP-153. Provides a way to skip bad records. (Sharad Agarwal via ddas)
+ HADOOP-657. Free disk space should be modelled and used by the scheduler
+ to make scheduling decisions. (Ari Rabkin via omalley)
+
IMPROVEMENTS
HADOOP-3732. Delay initialization of datanode block verification till
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java?rev=685290&r1=685289&r2=685290&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java
(original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java Tue
Aug 12 13:15:38 2008
@@ -800,7 +800,8 @@
private String splitClass;
private BytesWritable bytes = new BytesWritable();
private String[] locations;
-
+ long dataLength;
+
public void setBytes(byte[] data, int offset, int length) {
bytes.set(data, offset, length);
}
@@ -831,6 +832,7 @@
public void readFields(DataInput in) throws IOException {
splitClass = Text.readString(in);
+ dataLength = in.readLong();
bytes.readFields(in);
int len = WritableUtils.readVInt(in);
locations = new String[len];
@@ -841,12 +843,21 @@
public void write(DataOutput out) throws IOException {
Text.writeString(out, splitClass);
+ out.writeLong(dataLength);
bytes.write(out);
WritableUtils.writeVInt(out, locations.length);
for(int i = 0; i < locations.length; i++) {
Text.writeString(out, locations[i]);
}
}
+
+ public long getDataLength() {
+ return dataLength;
+ }
+ public void setDataLength(long l) {
+ dataLength = l;
+ }
+
}
private static final int CURRENT_SPLIT_FILE_VERSION = 0;
@@ -871,6 +882,7 @@
rawSplit.setClassName(split.getClass().getName());
buffer.reset();
split.write(buffer);
+ rawSplit.setDataLength(split.getLength());
rawSplit.setBytes(buffer.getData(), 0, buffer.getLength());
rawSplit.setLocations(split.getLocations());
rawSplit.write(out);
Modified:
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=685290&r1=685289&r2=685290&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
(original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
Tue Aug 12 13:15:38 2008
@@ -119,6 +119,9 @@
private Map<String, Integer> trackerToFailuresMap =
new TreeMap<String, Integer>();
+ //Confine estimation algorithms to an "oracle" class that JIP queries.
+ private ResourceEstimator resourceEstimator;
+
long startTime;
long finishTime;
@@ -129,6 +132,7 @@
private JobID jobId;
private boolean hasSpeculativeMaps;
private boolean hasSpeculativeReduces;
+ private long inputLength = 0;
// Per-job counters
public static enum Counter {
@@ -220,6 +224,7 @@
this.runningMapCache = new IdentityHashMap<Node, Set<TaskInProgress>>();
this.nonRunningReduces = new LinkedList<TaskInProgress>();
this.runningReduces = new LinkedHashSet<TaskInProgress>();
+ this.resourceEstimator = new ResourceEstimator(this);
}
/**
@@ -335,10 +340,12 @@
numMapTasks = splits.length;
maps = new TaskInProgress[numMapTasks];
for(int i=0; i < numMapTasks; ++i) {
+ inputLength += splits[i].getDataLength();
maps[i] = new TaskInProgress(jobId, jobFile,
splits[i],
jobtracker, conf, this, i);
}
+ LOG.info("Input size for job "+ jobId + " = " + inputLength);
if (numMapTasks > 0) {
LOG.info("Split info for job:" + jobId);
nonRunningMapCache = createCache(splits, maxLevel);
@@ -434,6 +441,10 @@
this.priority = priority;
}
}
+
+ long getInputLength() {
+ return inputLength;
+ }
/**
* Get the list of map tasks
@@ -1076,6 +1087,17 @@
Node node = jobtracker.getNode(tts.getHost());
Node nodeParentAtMaxLevel = null;
+
+ long outSize = resourceEstimator.getEstimatedMapOutputSize();
+ if(tts.getAvailableSpace() < outSize) {
+ LOG.warn("No room for map task. Node " + node +
+ " has " + tts.getAvailableSpace() +
+ " bytes free; but we expect map to take " + outSize);
+
+ return -1; //see if a different TIP might work better.
+ }
+
+
// For scheduling a map task, we have two caches and a list (optional)
// I) one for non-running task
// II) one for running task (this is for handling speculation)
@@ -1272,6 +1294,15 @@
return -1;
}
+ long outSize = resourceEstimator.getEstimatedReduceInputSize();
+ if(tts.getAvailableSpace() < outSize) {
+ LOG.warn("No room for reduce task. Node " + taskTracker + " has " +
+ tts.getAvailableSpace() +
+ " bytes free; but we expect reduce input to take " + outSize);
+
+ return -1; //see if a different TIP might work better.
+ }
+
// 1. check for a never-executed reduce tip
// reducers don't have a cache and so pass -1 to explicitly call that out
tip = findTaskFromList(nonRunningReduces, tts, numUniqueHosts, false);
@@ -1342,6 +1373,7 @@
// Mark the TIP as complete
tip.completed(taskid);
+ resourceEstimator.updateWithCompletedTask(status, tip);
// Update jobhistory
String taskTrackerName = jobtracker.getNode(jobtracker.getTaskTracker(
Added:
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ResourceEstimator.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ResourceEstimator.java?rev=685290&view=auto
==============================================================================
---
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ResourceEstimator.java
(added)
+++
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ResourceEstimator.java
Tue Aug 12 13:15:38 2008
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.util.concurrent.atomic.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Class responsible for modeling the resource consumption of running tasks.
+ *
+ * For now, we just do temp space for maps
+ *
+ * There is one ResourceEstimator per JobInProgress
+ *
+ */
+public class ResourceEstimator {
+
+ //Log with JobInProgress
+ private static final Log LOG = LogFactory.getLog(
+ "org.apache.hadoop.mapred.ResourceEstimator");
+
+
+ /**
+ * Estimated ratio of output to input size for map tasks.
+ */
+ private double mapBlowupRatio;
+ private double estimateWeight;
+ private JobInProgress job;
+
+  //initial guess: map output size equals input size. (The extra factor of
+  //two for merge-time temp space is applied in getEstimatedMapOutputSize.)
+ public static final double INITIAL_BLOWUP_GUESS = 1;
+
+ //initial estimate is weighted as much as this fraction of the real
datapoints
+ static final double INITIAL_EST_WEIGHT_PERCENT = 0.05;
+
+
+ public ResourceEstimator(JobInProgress job) {
+ mapBlowupRatio = INITIAL_BLOWUP_GUESS;
+ this.job = job;
+ estimateWeight = INITIAL_EST_WEIGHT_PERCENT * job.desiredMaps();
+ }
+
+
+  /**
+   * Private accessor methods that encapsulate the synchronization
+   * around mapBlowupRatio.
+   */
+ private synchronized double getBlowupRatio() {
+ return mapBlowupRatio;
+ }
+
+ private synchronized void setBlowupRatio(double b) {
+ mapBlowupRatio = b;
+ }
+
+
+
+ public void updateWithCompletedTask(TaskStatus ts, TaskInProgress tip) {
+
+ //-1 indicates error, which we don't average in.
+ if(tip.isMapTask() && ts.getOutputSize() != -1) {
+ double blowupOnThisTask = ts.getOutputSize() /
+ (double) tip.getMapInputSize();
+
+ LOG.info("measured blowup on " + tip.getTIPId() + " was " +
+ ts.getOutputSize() + "/" +tip.getMapInputSize() + " = "
+ + blowupOnThisTask);
+
+ double newEstimate = blowupOnThisTask / estimateWeight +
+ ((estimateWeight - 1) / estimateWeight) * getBlowupRatio();
+ estimateWeight++;
+ setBlowupRatio(newEstimate);
+ }
+ }
+
+  /**
+   * @return estimated length of this job's average map output, padded by a
+   * factor of two to allow for merge-time temp space.
+   */
+ public long getEstimatedMapOutputSize() {
+ double blowup =getBlowupRatio();
+ long estimate =
+ (long) (job.getInputLength() * blowup / job.desiredMaps() * 2.0);
+ LOG.info("estimate map will take " + estimate +
+ " bytes. (blowup = 2*" + blowup + ")");
+ return estimate;
+ }
+
+
+ //estimate that each reduce gets an equal share of total map output
+ public long getEstimatedReduceInputSize() {
+ return
+ getEstimatedMapOutputSize() * job.desiredMaps() / job.desiredReduces();
+ }
+
+
+}
Modified:
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java?rev=685290&r1=685289&r2=685290&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
(original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
Tue Aug 12 13:15:38 2008
@@ -820,6 +820,14 @@
return ret.toString();
}
+ public long getMapInputSize() {
+ if(isMapTask()) {
+ return rawSplit.getDataLength();
+ } else {
+ return 0;
+ }
+ }
+
public void clearSplit() {
rawSplit.clearBytes();
}
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskStatus.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskStatus.java?rev=685290&r1=685289&r2=685290&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskStatus.java
(original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskStatus.java Tue
Aug 12 13:15:38 2008
@@ -52,6 +52,7 @@
private long startTime;
private long finishTime;
+ private long outputSize;
private Phase phase = Phase.STARTING;
private Counters counters;
@@ -225,6 +226,21 @@
}
/**
+ * Returns the number of bytes of output from this map.
+ */
+ public long getOutputSize() {
+ return outputSize;
+ }
+
+ /**
+ * Set the size on disk of this task's output.
+ * @param l the number of map output bytes
+ */
+ void setOutputSize(long l) {
+ outputSize = l;
+ }
+
+ /**
* Get the list of maps from which output-fetches failed.
*
* @return the list of maps from which output-fetches failed.
@@ -278,6 +294,7 @@
this.phase = status.getPhase();
this.counters = status.getCounters();
+ this.outputSize = status.outputSize;
}
/**
@@ -313,6 +330,7 @@
out.writeLong(startTime);
out.writeLong(finishTime);
out.writeBoolean(includeCounters);
+ out.writeLong(outputSize);
if (includeCounters) {
counters.write(out);
}
@@ -330,6 +348,7 @@
this.finishTime = in.readLong();
counters = new Counters();
this.includeCounters = in.readBoolean();
+ this.outputSize = in.readLong();
if (includeCounters) {
counters.readFields(in);
}
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=685290&r1=685289&r2=685290&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
(original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Tue
Aug 12 13:15:38 2008
@@ -1025,6 +1025,7 @@
if (askForNewTask) {
checkLocalDirs(fConf.getLocalDirs());
askForNewTask = enoughFreeSpace(localMinSpaceStart);
+ status.setAvailableSpace( getFreeSpace() );
}
//
@@ -1243,8 +1244,8 @@
}
/**
- * Check if all of the local directories have enough
- * free space
+ * Check if any of the local directories has enough
+ * free space (more than minSpace)
*
* If not, do not try to get a new task assigned
* @return
@@ -1254,6 +1255,11 @@
if (minSpace == 0) {
return true;
}
+ return minSpace < getFreeSpace();
+ }
+
+ private long getFreeSpace() throws IOException {
+ long biggestSeenSoFar = 0;
String[] localDirs = fConf.getLocalDirs();
for (int i = 0; i < localDirs.length; i++) {
DF df = null;
@@ -1264,14 +1270,52 @@
localDirsDf.put(localDirs[i], df);
}
- if (df.getAvailable() > minSpace)
- return true;
+ long availOnThisVol = df.getAvailable();
+ if (availOnThisVol > biggestSeenSoFar) {
+ biggestSeenSoFar = availOnThisVol;
+ }
}
-
- return false;
+
+ //Should ultimately hold back the space we expect running tasks to use but
+ //that estimate isn't currently being passed down to the TaskTrackers
+ return biggestSeenSoFar;
}
/**
+   * Try to get the size of output for this task.
+   * @return the size in bytes of the task's output, or -1 if it can't
+   * be found.
+   */
+ long tryToGetOutputSize(TaskAttemptID taskId, JobConf conf) {
+
+ try{
+ TaskInProgress tip;
+ synchronized(this) {
+ tip = tasks.get(taskId);
+ }
+ if(tip == null)
+ return -1;
+
+ MapOutputFile mapOutputFile = new MapOutputFile();
+ mapOutputFile.setJobId(taskId.getJobID());
+ mapOutputFile.setConf(conf);
+
+ Path tmp_output = mapOutputFile.getOutputFile(taskId);
+ if(tmp_output == null)
+ return 0;
+ FileSystem localFS = FileSystem.getLocal(conf);
+ FileStatus stat = localFS.getFileStatus(tmp_output);
+ if(stat == null)
+ return 0;
+ else
+ return stat.getLen();
+ } catch(IOException e) {
+ LOG.info(e);
+ return -1;
+ }
+ }
+
+ /**
* Start a new task.
* All exceptions are handled locally, so that we don't mess up the
* task tracker.
@@ -1596,6 +1640,8 @@
this.done = true;
LOG.info("Task " + task.getTaskID() + " is done.");
+ LOG.info("reported output size for " + task.getTaskID() + " was " +
taskStatus.getOutputSize());
+
}
/**
@@ -1660,7 +1706,7 @@
localJobConf). toString());
} catch (IOException e) {
LOG.warn("Working Directory of the task " + task.getTaskID() +
- "doesnt exist. Throws expetion " +
+ "doesnt exist. Caught exception " +
StringUtils.stringifyException(e));
}
// Build the command
@@ -2195,6 +2241,7 @@
for(TaskInProgress tip: runningTasks.values()) {
TaskStatus status = tip.getStatus();
status.setIncludeCounters(sendCounters);
+ status.setOutputSize(tryToGetOutputSize(status.getTaskID(), fConf));
// send counters for finished or failed tasks.
if (status.getRunState() != TaskStatus.State.RUNNING) {
status.setIncludeCounters(true);
Modified:
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java?rev=685290&r1=685289&r2=685290&view=diff
==============================================================================
---
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java
(original)
+++
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java
Tue Aug 12 13:15:38 2008
@@ -48,11 +48,13 @@
volatile long lastSeen;
private int maxMapTasks;
private int maxReduceTasks;
+ long availableSpace; //space available on this node
/**
*/
public TaskTrackerStatus() {
taskReports = new ArrayList<TaskStatus>();
+ this.availableSpace = Long.MAX_VALUE; //not measured by default.
}
/**
@@ -69,6 +71,7 @@
this.failures = failures;
this.maxMapTasks = maxMapTasks;
this.maxReduceTasks = maxReduceTasks;
+ this.availableSpace = Long.MAX_VALUE; //not measured by default.
}
/**
@@ -166,6 +169,20 @@
public int getMaxReduceTasks() {
return maxReduceTasks;
}
+
+ /**
+   * Will return Long.MAX_VALUE if space hasn't been measured yet.
+ * @return bytes of available local disk space on this tasktracker.
+ */
+ public long getAvailableSpace() {
+ return availableSpace;
+ }
+
+ public void setAvailableSpace(long a) {
+ availableSpace = a;
+ }
+
+
///////////////////////////////////////////
// Writable
///////////////////////////////////////////
@@ -177,6 +194,7 @@
out.writeInt(maxMapTasks);
out.writeInt(maxReduceTasks);
out.writeInt(taskReports.size());
+ out.writeLong(availableSpace);
for (TaskStatus taskStatus : taskReports) {
TaskStatus.writeTaskStatus(out, taskStatus);
}
@@ -191,6 +209,7 @@
this.maxReduceTasks = in.readInt();
taskReports.clear();
int numTasks = in.readInt();
+ this.availableSpace = in.readLong();
for (int i = 0; i < numTasks; i++) {
taskReports.add(TaskStatus.readTaskStatus(in));
}