Author: cutting Date: Wed May 24 15:36:55 2006 New Revision: 409261 URL: http://svn.apache.org/viewvc?rev=409261&view=rev Log: HADOOP-195. Improve performance of transfer of map outputs to reduce nodes by performing multiple transfers in parallel, each using a separate socket.
Modified: lucene/hadoop/trunk/CHANGES.txt lucene/hadoop/trunk/conf/hadoop-default.xml lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java lucene/hadoop/trunk/src/webapps/job/jobdetails.jsp Modified: lucene/hadoop/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=409261&r1=409260&r2=409261&view=diff ============================================================================== --- lucene/hadoop/trunk/CHANGES.txt (original) +++ lucene/hadoop/trunk/CHANGES.txt Wed May 24 15:36:55 2006 @@ -58,6 +58,10 @@ 15. HADOOP-247. Fix sort progress to better handle exceptions. (Mahadev Konar via cutting) +16. HADOOP-195. Improve performance of the transfer of map outputs to + reduce nodes by performing multiple transfers in parallel, each on + a separate socket. (Sameer Paranjpye via cutting) + Release 0.2.1 - 2006-05-12 Modified: lucene/hadoop/trunk/conf/hadoop-default.xml URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/conf/hadoop-default.xml?rev=409261&r1=409260&r2=409261&view=diff ============================================================================== --- lucene/hadoop/trunk/conf/hadoop-default.xml (original) +++ lucene/hadoop/trunk/conf/hadoop-default.xml Wed May 24 15:36:55 2006 @@ -230,6 +230,14 @@ </property> <property> + <name>mapred.reduce.parallel.copies</name> + <value>5</value> + <description>The default number of parallel transfers run by reduce + during the copy(shuffle) phase. 
+ </description> +</property> + +<property> <name>mapred.task.timeout</name> <value>600000</value> <description>The number of milliseconds before a task will be Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java?rev=409261&r1=409260&r2=409261&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java Wed May 24 15:36:55 2006 @@ -23,6 +23,7 @@ import java.lang.reflect.InvocationTargetException; import java.net.InetSocketAddress; +import java.net.Socket; import java.util.logging.*; import java.io.*; @@ -184,6 +185,64 @@ values[i] = ((ObjectWritable)wrappedValues[i]).get(); return values; + } + + + /** Expert: Make an RPC call over the specified socket. Assumes that no other calls + * are in flight on this connection. 
*/ + public static Object callRaw(Method method, Object[] params, + Socket sock, Configuration conf) + throws IOException { + + Invocation inv = new Invocation(method, params); + DataInputStream in = + new DataInputStream(new BufferedInputStream(sock.getInputStream())); + DataOutputStream out = + new DataOutputStream(new BufferedOutputStream(sock.getOutputStream())); + String name = new String("Client connection to " + + sock.getInetAddress().getHostName() + + ":" + sock.getPort()); + + try { + if (LOG.isLoggable(Level.FINE)) { + LOG.fine(name + " sending #0"); + } + + // write out method invocation + out.writeInt(0); + inv.write(out); + out.flush(); + + // read return value + int callId = in.readInt(); + + if (LOG.isLoggable(Level.FINE)) { + LOG.fine(name + " got response to call #" + callId); + } + + boolean isError = in.readBoolean(); + if (isError) { + throw new RemoteException(WritableUtils.readString(in), + WritableUtils.readString(in)); + } + else { + + Writable wrappedValue = (Writable)ObjectWritable.class.newInstance(); + if (wrappedValue instanceof Configurable) { + ((Configurable) wrappedValue).setConf(conf); + } + wrappedValue.readFields(in); + + return method.getReturnType() != Void.TYPE ? 
+ ((ObjectWritable)wrappedValue).get() : null; + } + } + catch (InstantiationException e) { + throw new IOException(e.toString()); + } + catch (IllegalAccessException e) { + throw new IOException(e.toString()); + } } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java?rev=409261&r1=409260&r2=409261&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java Wed May 24 15:36:55 2006 @@ -101,12 +101,27 @@ public void run() { LOG.info(getName() + ": starting"); while (running) { + Socket acceptedSock = null; try { - new Connection(socket.accept()).start(); // start a new connection + acceptedSock = socket.accept(); + new Connection(acceptedSock).start(); // start a new connection } catch (SocketTimeoutException e) { // ignore timeouts - } catch (Exception e) { // log all other exceptions - LOG.log(Level.INFO, getName() + " caught: " + e, e); + } catch (OutOfMemoryError e) { + // we can run out of memory if we have too many threads + // log the event and sleep for a minute and give + // some thread(s) a chance to finish + LOG.log(Level.WARNING, + getName() + " out of memory, sleeping...", e); + try { + acceptedSock.close(); + Thread.sleep(60000); + } catch (InterruptedException ie) { // ignore interrupts + } catch (IOException ioe) { // ignore IOexceptions + } } + catch (Exception e) { // log all other exceptions + LOG.log(Level.INFO, getName() + " caught: " + e, e); + } } try { socket.close(); Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java?rev=409261&r1=409260&r2=409261&view=diff 
============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java Wed May 24 15:36:55 2006 @@ -41,6 +41,7 @@ private String reduceTaskId; private int mapId; private int partition; + private long size; /** Permits reporting of file copy progress. */ public interface ProgressReporter { @@ -101,6 +102,10 @@ private FileSystem getLocalFs() throws IOException { return FileSystem.getNamed("local", this.jobConf); } + + public long getSize() { + return size; + } public void write(DataOutput out) throws IOException { UTF8.writeString(out, mapTaskId); @@ -112,7 +117,8 @@ FSDataInputStream in = null; try { // write the length-prefixed file content to the wire - out.writeLong(getLocalFs().getLength(file)); + this.size = getLocalFs().getLength(file); + out.writeLong(this.size); in = getLocalFs().open(file); } catch (FileNotFoundException e) { TaskTracker.LOG.log(Level.SEVERE, "Can't open map output:" + file, e); @@ -120,7 +126,7 @@ throw e; } try { - byte[] buffer = new byte[8192]; + byte[] buffer = new byte[65536]; int l = 0; while (l != -1) { @@ -149,11 +155,13 @@ // read the length-prefixed file content into a local file Path file = getInputFile(mapId, reduceTaskId); long length = in.readLong(); + this.size = length; + float progPerByte = 1.0f / length; long unread = length; FSDataOutputStream out = getLocalFs().create(file); try { - byte[] buffer = new byte[8192]; + byte[] buffer = new byte[65536]; while (unread > 0) { int bytesToRead = (int)Math.min(unread, buffer.length); in.readFully(buffer, 0, bytesToRead); Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java?rev=409261&r1=409260&r2=409261&view=diff 
============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java Wed May 24 15:36:55 2006 @@ -21,122 +21,423 @@ import java.io.*; import java.net.*; import java.util.*; -import java.util.logging.*; +import java.lang.reflect.Method; +import java.text.DecimalFormat; + /** Runs a reduce task. */ class ReduceTaskRunner extends TaskRunner { + + /** + * for cleaning up old map outputs + */ private MapOutputFile mapOutputFile; + + /** + * our reduce task instance + */ + private ReduceTask reduceTask; + + /** + * the list of map outputs currently being copied + */ + private List scheduledCopies; + + /** + * the results of dispatched copy attempts + */ + private List copyResults; + + /** + * the number of outputs to copy in parallel + */ + private int numCopiers; + + /** + * timeout for copy operations + */ + private int copyTimeout; + + /** + * the maximum amount of time (less 1 minute) to wait to + * contact a host after a copy from it fails. We wait for (1 min + + * Random.nextInt(maxBackoff)) seconds. 
+ */ + private int maxBackoff; + + /** + * busy hosts from which copies are being backed off + * Map of host -> next contact time + */ + private Map penaltyBox; + + /** + * the set of unique hosts from which we are copying + */ + private Set uniqueHosts; + + /** + * the last time we polled the job tracker + */ + private long lastPollTime; + + /** + * the minimum interval between jobtracker polls + */ + private static final long MIN_POLL_INTERVAL = 5000; + + /** + * the number of map output locations to poll for at one time + */ + private static final int PROBE_SAMPLE_SIZE = 50; + + // initialization code to resolve "getFile" to a method object + private static Method getFileMethod = null; + static { + Class[] paramTypes = { String.class, String.class, + int.class, int.class }; + try { + getFileMethod = + MapOutputProtocol.class.getDeclaredMethod("getFile", paramTypes); + } + catch (NoSuchMethodException e) { + LOG.severe(StringUtils.stringifyException(e)); + throw new RuntimeException("Can't find \"getFile\" method " + + "of MapOutputProtocol", e); + } + } + + /** Represents the result of an attempt to copy a map output */ + private class CopyResult { + + // the map output location against which a copy attempt was made + private final MapOutputLocation loc; + + // the size of the file copied, -1 if the transfer failed + private final long size; + + CopyResult(MapOutputLocation loc, long size) { + this.loc = loc; + this.size = size; + } + + public int getMapId() { return loc.getMapId(); } + public boolean getSuccess() { return size >= 0; } + public long getSize() { return size; } + public String getHost() { return loc.getHost(); } + public MapOutputLocation getLocation() { return loc; } + } + + /** Copies map outputs as they become available */ + private class MapOutputCopier extends Thread { + public MapOutputCopier() { + } + + /** Loop forever and fetch map outputs as they become available. 
+ * The thread exits when it is interrupted by the {@link ReduceTaskRunner} + */ + public void run() { + try { + while (true) { + MapOutputLocation loc = null; + long size = -1; + + synchronized (scheduledCopies) { + while (scheduledCopies.isEmpty()) { + scheduledCopies.wait(); + } + loc = (MapOutputLocation)scheduledCopies.remove(0); + } + + try { + size = copyOutput(loc); + } catch (IOException e) { + LOG.warning(reduceTask.getTaskId() + " copy failed: " + + loc.getMapTaskId() + " from " + loc.getHost()); + LOG.warning(StringUtils.stringifyException(e)); + } + + synchronized (copyResults) { + copyResults.add(new CopyResult(loc, size)); + copyResults.notifyAll(); + } + } + } catch (InterruptedException e) { } // ALL DONE! + } + + /** Copies a a map output from a remote host, using raw RPC. + * @param loc the map output location to be copied + * @return the size of the copied file + * @throws IOException if there is an error copying the file + */ + private long copyOutput(MapOutputLocation loc) + throws IOException { + + Object[] params = new Object[4]; + params[0] = loc.getMapTaskId(); + params[1] = reduceTask.getTaskId(); + params[2] = new Integer(loc.getMapId()); + params[3] = new Integer(reduceTask.getPartition()); + + LOG.info(reduceTask.getTaskId() + " copy started: " + + loc.getMapTaskId() + " from " + loc.getHost()); + + Socket sock = new Socket(loc.getHost(), loc.getPort()); + try { + sock.setSoTimeout(copyTimeout); + + // this copies the map output file + MapOutputFile file = + (MapOutputFile)RPC.callRaw(getFileMethod, params, sock, conf); + + LOG.info(reduceTask.getTaskId() + " copy finished: " + + loc.getMapTaskId() + " from " + loc.getHost()); + + return file.getSize(); + } + finally { + try { + sock.close(); + } catch (IOException e) { } // IGNORE + } + } + + } + public ReduceTaskRunner(Task task, TaskTracker tracker, JobConf conf) { super(task, tracker, conf); this.mapOutputFile = new MapOutputFile(); this.mapOutputFile.setConf(conf); + + 
this.reduceTask = (ReduceTask)getTask(); + this.scheduledCopies = new ArrayList(100); + this.copyResults = new ArrayList(100); + this.numCopiers = conf.getInt("mapred.reduce.parallel.copies", 5); + this.copyTimeout = conf.getInt("ipc.client.timeout", 10000); + this.maxBackoff = conf.getInt("mapred.reduce.copy.backoff", 300); + + // hosts -> next contact time + this.penaltyBox = new Hashtable(); + + // hostnames + this.uniqueHosts = new HashSet(); + + this.lastPollTime = 0; } - /** Assemble all of the map output files. */ + /** Assemble all of the map output files */ public boolean prepare() throws IOException { - ReduceTask task = ((ReduceTask)getTask()); - this.mapOutputFile.removeAll(task.getTaskId()); // cleanup from failures - int numMaps = task.getNumMaps(); + + // cleanup from failures + this.mapOutputFile.removeAll(reduceTask.getTaskId()); + + final int numOutputs = reduceTask.getNumMaps(); + List neededOutputs = new ArrayList(numOutputs); + List knownOutputs = new ArrayList(100); + int numInFlight = 0, numCopied = 0; + int lowThreshold = numCopiers*2; + long bytesTransferred = 0; + DecimalFormat mbpsFormat = new DecimalFormat("0.00"); + Random backoff = new Random(); final Progress copyPhase = getTask().getProgress().phase(); - - // we need input from every map task - List needed = new ArrayList(numMaps); - for (int i = 0; i < numMaps; i++) { - needed.add(new Integer(i)); - copyPhase.addPhase(); // add sub-phase per file + + for (int i = 0; i < numOutputs; i++) { + neededOutputs.add(new Integer(i)); + copyPhase.addPhase(); // add sub-phase per file } InterTrackerProtocol jobClient = getTracker().getJobClient(); - while (needed.size() > 0) { - LOG.info(task.getTaskId()+" Need "+needed.size()+" map output(s)."); - getTask().reportProgress(getTracker()); - - // query for a just a random subset of needed segments so that we don't - // overwhelm jobtracker. 
ideally perhaps we could send a more compact - // representation of all needed, i.e., a bit-vector - int checkSize = Math.min(10, needed.size()); - int[] neededIds = new int[checkSize]; - Collections.shuffle(needed); - ListIterator itr = needed.listIterator(); - for (int i = 0; i < checkSize; i++) { - neededIds[i] = ((Integer) itr.next()).intValue(); - } - MapOutputLocation[] locs = null; - try { - locs = jobClient.locateMapOutputs(task.getJobId().toString(), - neededIds, task.getPartition()); - } catch (IOException ie) { - LOG.info("Problem locating map outputs: " + - StringUtils.stringifyException(ie)); - } - if (locs == null || locs.length == 0) { - try { - if (killed) { - return false; - } - LOG.info(task.getTaskId()+" No map outputs available; sleeping..."); - Thread.sleep(10000); - } catch (InterruptedException e) { - } - continue; - } - - LOG.info(task.getTaskId()+" Got "+locs.length+" map output locations."); - - // try each of these locations - for (int i = 0; i < locs.length; i++) { - MapOutputLocation loc = locs[i]; - InetSocketAddress addr = - new InetSocketAddress(loc.getHost(), loc.getPort()); - MapOutputProtocol client = - (MapOutputProtocol)RPC.getProxy(MapOutputProtocol.class, addr, this.conf); - - this.mapOutputFile.setProgressReporter(new MapOutputFile.ProgressReporter() { - public void progress(float progress) { - copyPhase.phase().set(progress); - try { - getTask().reportProgress(getTracker()); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - }); + MapOutputCopier[] copiers = new MapOutputCopier[numCopiers]; - getTask().reportProgress(getTracker()); + // start all the copying threads + for (int i=0; i < copiers.length; i++) { + copiers[i] = new MapOutputCopier(); + copiers[i].start(); + } + + // start the clock for bandwidth measurement + long startTime = System.currentTimeMillis(); + long currentTime = startTime; + + // loop until we get all required outputs or are killed + while (!killed && numCopied < numOutputs) { + + 
LOG.info(reduceTask.getTaskId() + " Need " + (numOutputs-numCopied) + + " map output(s)"); + + if (!neededOutputs.isEmpty()) { + LOG.info(reduceTask.getTaskId() + " Need " + neededOutputs.size() + + " map output location(s)"); try { - copyPhase.phase().setStatus(loc.toString()); + MapOutputLocation[] locs = queryJobTracker(neededOutputs, jobClient); - LOG.info(task.getTaskId()+" Copying "+loc.getMapTaskId() - +" output from "+loc.getHost()+"."); - client.getFile(loc.getMapTaskId(), task.getTaskId(), - loc.getMapId(), - task.getPartition()); - - // Success: remove from 'needed' - for (Iterator it = needed.iterator(); it.hasNext(); ) { - int mapId = ((Integer) it.next()).intValue(); - if (mapId == loc.getMapId()) { - it.remove(); - break; - } + // remove discovered outputs from needed list + // and put them on the known list + for (int i=0; i < locs.length; i++) { + neededOutputs.remove(new Integer(locs[i].getMapId())); + knownOutputs.add(locs[i]); + } + LOG.info(reduceTask.getTaskId() + + " Got " + (locs == null ? 
0 : locs.length) + + " map outputs from jobtracker"); + } + catch (IOException ie) { + LOG.warning(reduceTask.getTaskId() + + " Problem locating map outputs: " + + StringUtils.stringifyException(ie)); + } + } + + // now walk through the cache and schedule what we can + int numKnown = knownOutputs.size(), numScheduled = 0; + int numSlow = 0, numDups = 0; + + LOG.info(reduceTask.getTaskId() + " Got " + numKnown + + " known map output location(s); scheduling..."); + + synchronized (scheduledCopies) { + ListIterator locIt = knownOutputs.listIterator(); + + currentTime = System.currentTimeMillis(); + while (locIt.hasNext()) { + + MapOutputLocation loc = (MapOutputLocation)locIt.next(); + Long penaltyEnd = (Long)penaltyBox.get(loc.getHost()); + boolean penalized = false, duplicate = false; + + if (penaltyEnd != null && currentTime < penaltyEnd.longValue()) { + penalized = true; numSlow++; } + if (uniqueHosts.contains(loc.getHost())) { + duplicate = true; numDups++; + } + + if (!penalized && !duplicate) { + uniqueHosts.add(loc.getHost()); + scheduledCopies.add(loc); + locIt.remove(); // remove from knownOutputs + numInFlight++; numScheduled++; + } + } + scheduledCopies.notifyAll(); + } + LOG.info(reduceTask.getTaskId() + " Scheduled " + numScheduled + + " of " + numKnown + " known outputs (" + numSlow + + " slow hosts and " + numDups + " dup hosts)"); + + // if we have no copies in flight and we can't schedule anything + // new, just wait for a bit + try { + if (numInFlight == 0 && numScheduled == 0) { + Thread.sleep(5000); + } + } catch (InterruptedException e) { } // IGNORE + + while (!killed && numInFlight > 0) { + CopyResult cr = getCopyResult(); + + if (cr.getSuccess()) { // a successful copy + numCopied++; + bytesTransferred += cr.getSize(); + + long secsSinceStart = (System.currentTimeMillis()-startTime)/1000+1; + float mbs = ((float)bytesTransferred)/(1024*1024); + float transferRate = mbs/secsSinceStart; + copyPhase.startNextPhase(); + copyPhase.setStatus("copy 
(" + numCopied + " of " + numOutputs + " at " + + mbpsFormat.format(transferRate) + " MB/s)"); + getTask().reportProgress(getTracker()); + } + else { + // this copy failed, put it back onto neededOutputs + neededOutputs.add(new Integer(cr.getMapId())); - } catch (IOException e) { // failed: try again later - LOG.log(Level.WARNING, - task.getTaskId()+" copy failed: " - +loc.getMapTaskId()+" from "+addr, - e); - } finally { - this.mapOutputFile.setProgressReporter(null); + // wait a random amount of time for next contact + currentTime = System.currentTimeMillis(); + long nextContact = currentTime + 60 * 1000 + + backoff.nextInt(maxBackoff*1000); + penaltyBox.put(cr.getHost(), new Long(nextContact)); + LOG.warning(reduceTask.getTaskId() + " adding host " + + cr.getHost() + " to penalty box, next contact in " + + ((nextContact-currentTime)/1000) + " seconds"); + } + uniqueHosts.remove(cr.getHost()); + numInFlight--; + + // ensure we have enough to keep us busy + if (numInFlight < lowThreshold && (numOutputs-numCopied) > PROBE_SAMPLE_SIZE) { + break; } } + + } + + // all done, inform the copiers to exit + synchronized (scheduledCopies) { + for (int i=0; i < copiers.length; i++) { + copiers[i].interrupt(); + } + } + + return numCopied == numOutputs && !killed; + } + + + private CopyResult getCopyResult() { + CopyResult cr = null; + + synchronized (copyResults) { + while (copyResults.isEmpty()) { + try { + copyResults.wait(); + } catch (InterruptedException e) { } + } + cr = (CopyResult)copyResults.remove(0); + } + return cr; + } + + /** Queries the job tracker for a set of outputs ready to be copied + * @param neededOutputs the list of currently unknown outputs + * @param jobClient the job tracker + * @return a set of locations to copy outputs from + * @throws IOException + */ + private MapOutputLocation[] queryJobTracker(List neededOutputs, + InterTrackerProtocol jobClient) + throws IOException { + + // query for a just a random subset of needed segments so that we 
don't + // overwhelm jobtracker. ideally perhaps we could send a more compact + // representation of all needed, i.e., a bit-vector + int checkSize = Math.min(PROBE_SAMPLE_SIZE, neededOutputs.size()); + int neededIds[] = new int[checkSize]; + + Collections.shuffle(neededOutputs); + + ListIterator itr = neededOutputs.listIterator(); + for (int i=0; i < checkSize; i++) { + neededIds[i] = ((Integer)itr.next()).intValue(); } - getTask().reportProgress(getTracker()); - return true; + + long currentTime = System.currentTimeMillis(); + long pollTime = lastPollTime + MIN_POLL_INTERVAL; + while (currentTime < pollTime) { + try { + Thread.sleep(pollTime-currentTime); + } catch (InterruptedException ie) { } // IGNORE + currentTime = System.currentTimeMillis(); + } + lastPollTime = pollTime; + + return jobClient.locateMapOutputs(reduceTask.getJobId().toString(), + neededIds, + reduceTask.getPartition()); } + /** Delete all of the temporary map output files. */ public void close() throws IOException { getTask().getProgress().setStatus("closed"); Modified: lucene/hadoop/trunk/src/webapps/job/jobdetails.jsp URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/webapps/job/jobdetails.jsp?rev=409261&r1=409260&r2=409261&view=diff ============================================================================== --- lucene/hadoop/trunk/src/webapps/job/jobdetails.jsp (original) +++ lucene/hadoop/trunk/src/webapps/job/jobdetails.jsp Wed May 24 15:36:55 2006 @@ -90,7 +90,10 @@ %> <html> +<head> +<meta http-equiv="refresh" content=60> <title>Hadoop <%=jobid%> on <%=trackerName%></title> +</head> <body> <h1>Hadoop <%=jobid%> on <a href="/jobtracker.jsp"><%=trackerName%></a></h1>