Modified: hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/TaskRunner.java URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/TaskRunner.java?rev=785794&r1=785793&r2=785794&view=diff ============================================================================== --- hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/TaskRunner.java (original) +++ hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/TaskRunner.java Wed Jun 17 20:55:51 2009 @@ -17,21 +17,30 @@ */ package org.apache.hadoop.mapred; -import org.apache.commons.logging.*; - -import org.apache.hadoop.fs.*; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.filecache.*; -import org.apache.hadoop.util.*; - -import java.io.*; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.OutputStream; +import java.io.PrintStream; import java.net.InetSocketAddress; +import java.net.URI; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Vector; -import java.net.URI; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.filecache.DistributedCache; +import org.apache.hadoop.fs.FSError; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.LocalDirAllocator; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.util.Shell; +import org.apache.hadoop.util.StringUtils; /** Base class that runs a task in a separate process. 
Tasks are run in a * separate process in order to isolate the map/reduce system code from bugs in @@ -49,6 +58,9 @@ private int exitCode = -1; private boolean exitCodeSet = false; + private static String SYSTEM_PATH_SEPARATOR = System.getProperty("path.separator"); + + private TaskTracker tracker; protected JobConf conf; @@ -108,163 +120,40 @@ //all the archives TaskAttemptID taskid = t.getTaskID(); LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir"); - File jobCacheDir = null; - if (conf.getJar() != null) { - jobCacheDir = new File( - new Path(conf.getJar()).getParent().toString()); - } - File workDir = new File(lDirAlloc.getLocalPathToRead( - TaskTracker.getLocalTaskDir( - t.getJobID().toString(), - t.getTaskID().toString(), - t.isTaskCleanupTask()) - + Path.SEPARATOR + MRConstants.WORKDIR, - conf). toString()); - + File workDir = formWorkDir(lDirAlloc, taskid, t.isTaskCleanupTask(), conf); + URI[] archives = DistributedCache.getCacheArchives(conf); URI[] files = DistributedCache.getCacheFiles(conf); - FileStatus fileStatus; - FileSystem fileSystem; - Path localPath; - String baseDir; - - if ((archives != null) || (files != null)) { - if (archives != null) { - String[] archivesTimestamps = - DistributedCache.getArchiveTimestamps(conf); - Path[] p = new Path[archives.length]; - for (int i = 0; i < archives.length;i++){ - fileSystem = FileSystem.get(archives[i], conf); - fileStatus = fileSystem.getFileStatus( - new Path(archives[i].getPath())); - String cacheId = DistributedCache.makeRelative(archives[i],conf); - String cachePath = TaskTracker.getCacheSubdir() + - Path.SEPARATOR + cacheId; - - localPath = lDirAlloc.getLocalPathForWrite(cachePath, - fileStatus.getLen(), conf); - baseDir = localPath.toString().replace(cacheId, ""); - p[i] = DistributedCache.getLocalCache(archives[i], conf, - new Path(baseDir), - fileStatus, - true, Long.parseLong( - archivesTimestamps[i]), - new Path(workDir. 
- getAbsolutePath()), - false); - - } - DistributedCache.setLocalArchives(conf, stringifyPathArray(p)); - } - if ((files != null)) { - String[] fileTimestamps = DistributedCache.getFileTimestamps(conf); - Path[] p = new Path[files.length]; - for (int i = 0; i < files.length;i++){ - fileSystem = FileSystem.get(files[i], conf); - fileStatus = fileSystem.getFileStatus( - new Path(files[i].getPath())); - String cacheId = DistributedCache.makeRelative(files[i], conf); - String cachePath = TaskTracker.getCacheSubdir() + - Path.SEPARATOR + cacheId; - - localPath = lDirAlloc.getLocalPathForWrite(cachePath, - fileStatus.getLen(), conf); - baseDir = localPath.toString().replace(cacheId, ""); - p[i] = DistributedCache.getLocalCache(files[i], conf, - new Path(baseDir), - fileStatus, - false, Long.parseLong( - fileTimestamps[i]), - new Path(workDir. - getAbsolutePath()), - false); - } - DistributedCache.setLocalFiles(conf, stringifyPathArray(p)); - } - Path localTaskFile = new Path(t.getJobFile()); - FileSystem localFs = FileSystem.getLocal(conf); - localFs.delete(localTaskFile, true); - OutputStream out = localFs.create(localTaskFile); - try { - conf.writeXml(out); - } finally { - out.close(); - } - } + setupDistributedCache(lDirAlloc, workDir, archives, files); if (!prepare()) { return; } - String sep = System.getProperty("path.separator"); - StringBuffer classPath = new StringBuffer(); + // Accumulates class paths for child. 
+ List<String> classPaths = new ArrayList<String>(); // start with same classpath as parent process - classPath.append(System.getProperty("java.class.path")); - classPath.append(sep); + appendSystemClasspaths(classPaths); + if (!workDir.mkdirs()) { if (!workDir.isDirectory()) { LOG.fatal("Mkdirs failed to create " + workDir.toString()); } } - - String jar = conf.getJar(); - if (jar != null) { - // if jar exists, it into workDir - File[] libs = new File(jobCacheDir, "lib").listFiles(); - if (libs != null) { - for (int i = 0; i < libs.length; i++) { - classPath.append(sep); // add libs from jar to classpath - classPath.append(libs[i]); - } - } - classPath.append(sep); - classPath.append(new File(jobCacheDir, "classes")); - classPath.append(sep); - classPath.append(jobCacheDir); - - } // include the user specified classpath + appendJobJarClasspaths(conf.getJar(), classPaths); - //archive paths - Path[] archiveClasspaths = DistributedCache.getArchiveClassPaths(conf); - if (archiveClasspaths != null && archives != null) { - Path[] localArchives = DistributedCache - .getLocalCacheArchives(conf); - if (localArchives != null){ - for (int i=0;i<archives.length;i++){ - for(int j=0;j<archiveClasspaths.length;j++){ - if (archives[i].getPath().equals( - archiveClasspaths[j].toString())){ - classPath.append(sep); - classPath.append(localArchives[i] - .toString()); - } - } - } - } - } - //file paths - Path[] fileClasspaths = DistributedCache.getFileClassPaths(conf); - if (fileClasspaths!=null && files != null) { - Path[] localFiles = DistributedCache - .getLocalCacheFiles(conf); - if (localFiles != null) { - for (int i = 0; i < files.length; i++) { - for (int j = 0; j < fileClasspaths.length; j++) { - if (files[i].getPath().equals( - fileClasspaths[j].toString())) { - classPath.append(sep); - classPath.append(localFiles[i].toString()); - } - } - } - } - } - - classPath.append(sep); - classPath.append(workDir); - // Build exec child jmv args. 
+ // Distributed cache paths + appendDistributedCacheClasspaths(conf, archives, files, classPaths); + + // Include the working dir too + classPaths.add(workDir.toString()); + + // Build classpath + + + // Build exec child JVM args. Vector<String> vargs = new Vector<String>(8); File jvm = // use same jvm as parent new File(new File(System.getProperty("java.home"), "bin"), "java"); @@ -308,12 +197,12 @@ if (libraryPath == null) { libraryPath = workDir.getAbsolutePath(); } else { - libraryPath += sep + workDir; + libraryPath += SYSTEM_PATH_SEPARATOR + workDir; } boolean hasUserLDPath = false; for(int i=0; i<javaOptsSplit.length ;i++) { if(javaOptsSplit[i].startsWith("-Djava.library.path=")) { - javaOptsSplit[i] += sep + libraryPath; + javaOptsSplit[i] += SYSTEM_PATH_SEPARATOR + libraryPath; hasUserLDPath = true; break; } @@ -342,7 +231,8 @@ // Add classpath. vargs.add("-classpath"); - vargs.add(classPath.toString()); + String classPath = StringUtils.join(SYSTEM_PATH_SEPARATOR, classPaths); + vargs.add(classPath); // Setup the log4j prop long logSize = TaskLog.getTaskLogLength(conf); @@ -396,7 +286,7 @@ String oldLdLibraryPath = null; oldLdLibraryPath = System.getenv("LD_LIBRARY_PATH"); if (oldLdLibraryPath != null) { - ldLibraryPath.append(sep); + ldLibraryPath.append(SYSTEM_PATH_SEPARATOR); ldLibraryPath.append(oldLdLibraryPath); } env.put("LD_LIBRARY_PATH", ldLibraryPath.toString()); @@ -494,6 +384,156 @@ tip.reportTaskFinished(); } } + + /** Creates the working directory pathname for a task attempt. 
*/ + static File formWorkDir(LocalDirAllocator lDirAlloc, + TaskAttemptID task, boolean isCleanup, JobConf conf) + throws IOException { + File workDir = new File(lDirAlloc.getLocalPathToRead( + TaskTracker.getLocalTaskDir(task.getJobID().toString(), + task.toString(), isCleanup) + + Path.SEPARATOR + MRConstants.WORKDIR, conf).toString()); + return workDir; + } + + private void setupDistributedCache(LocalDirAllocator lDirAlloc, File workDir, + URI[] archives, URI[] files) throws IOException { + FileStatus fileStatus; + FileSystem fileSystem; + Path localPath; + String baseDir; + if ((archives != null) || (files != null)) { + if (archives != null) { + String[] archivesTimestamps = + DistributedCache.getArchiveTimestamps(conf); + Path[] p = new Path[archives.length]; + for (int i = 0; i < archives.length;i++){ + fileSystem = FileSystem.get(archives[i], conf); + fileStatus = fileSystem.getFileStatus( + new Path(archives[i].getPath())); + String cacheId = DistributedCache.makeRelative(archives[i],conf); + String cachePath = TaskTracker.getCacheSubdir() + + Path.SEPARATOR + cacheId; + + localPath = lDirAlloc.getLocalPathForWrite(cachePath, + fileStatus.getLen(), conf); + baseDir = localPath.toString().replace(cacheId, ""); + p[i] = DistributedCache.getLocalCache(archives[i], conf, + new Path(baseDir), + fileStatus, + true, Long.parseLong( + archivesTimestamps[i]), + new Path(workDir. 
+ getAbsolutePath()), + false); + + } + DistributedCache.setLocalArchives(conf, stringifyPathArray(p)); + } + if ((files != null)) { + String[] fileTimestamps = DistributedCache.getFileTimestamps(conf); + Path[] p = new Path[files.length]; + for (int i = 0; i < files.length;i++){ + fileSystem = FileSystem.get(files[i], conf); + fileStatus = fileSystem.getFileStatus( + new Path(files[i].getPath())); + String cacheId = DistributedCache.makeRelative(files[i], conf); + String cachePath = TaskTracker.getCacheSubdir() + + Path.SEPARATOR + cacheId; + + localPath = lDirAlloc.getLocalPathForWrite(cachePath, + fileStatus.getLen(), conf); + baseDir = localPath.toString().replace(cacheId, ""); + p[i] = DistributedCache.getLocalCache(files[i], conf, + new Path(baseDir), + fileStatus, + false, Long.parseLong( + fileTimestamps[i]), + new Path(workDir. + getAbsolutePath()), + false); + } + DistributedCache.setLocalFiles(conf, stringifyPathArray(p)); + } + Path localTaskFile = new Path(t.getJobFile()); + FileSystem localFs = FileSystem.getLocal(conf); + localFs.delete(localTaskFile, true); + OutputStream out = localFs.create(localTaskFile); + try { + conf.writeXml(out); + } finally { + out.close(); + } + } + } + + private void appendDistributedCacheClasspaths(JobConf conf, URI[] archives, + URI[] files, List<String> classPaths) throws IOException { + // Archive paths + Path[] archiveClasspaths = DistributedCache.getArchiveClassPaths(conf); + if (archiveClasspaths != null && archives != null) { + Path[] localArchives = DistributedCache.getLocalCacheArchives(conf); + if (localArchives != null){ + for (int i=0;i<archives.length;i++){ + for(int j=0;j<archiveClasspaths.length;j++){ + if (archives[i].getPath().equals( + archiveClasspaths[j].toString())){ + classPaths.add(localArchives[i].toString()); + } + } + } + } + } + + //file paths + Path[] fileClasspaths = DistributedCache.getFileClassPaths(conf); + if (fileClasspaths!=null && files != null) { + Path[] localFiles = DistributedCache 
+ .getLocalCacheFiles(conf); + if (localFiles != null) { + for (int i = 0; i < files.length; i++) { + for (int j = 0; j < fileClasspaths.length; j++) { + if (files[i].getPath().equals( + fileClasspaths[j].toString())) { + classPaths.add(localFiles[i].toString()); + } + } + } + } + } + } + + private void appendSystemClasspaths(List<String> classPaths) { + for (String c : System.getProperty("java.class.path").split(SYSTEM_PATH_SEPARATOR)) { + classPaths.add(c); + } + } + + /** + * Given a "jobJar" (typically retrieved via {@link Configuration.getJar()}), + * appends classpath entries for it, as well as its lib/ and classes/ + * subdirectories. + * + * @param jobJar Job jar from configuration + * @param classPaths Accumulator for class paths + */ + static void appendJobJarClasspaths(String jobJar, List<String> classPaths) { + if (jobJar == null) { + return; + + } + File jobCacheDir = new File(new Path(jobJar).getParent().toString()); + + // if jar exists, it into workDir + File[] libs = new File(jobCacheDir, "lib").listFiles(); + if (libs != null) { + for (File l : libs) { + classPaths.add(l.toString()); + } + } + classPaths.add(new File(jobCacheDir, "classes").toString()); + classPaths.add(jobCacheDir.toString()); + } //Mostly for setting up the symlinks. Note that when we setup the distributed //cache, we didn't create the symlinks. This is done on a per task basis
Modified: hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/TaskStatus.java URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/TaskStatus.java?rev=785794&r1=785793&r2=785794&view=diff ============================================================================== --- hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/TaskStatus.java (original) +++ hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/TaskStatus.java Wed Jun 17 20:55:51 2009 @@ -50,7 +50,7 @@ private String stateString; private String taskTracker; - private long startTime; + private long startTime; //in ms private long finishTime; private long outputSize; @@ -81,7 +81,9 @@ public TaskAttemptID getTaskID() { return taskid; } public abstract boolean getIsMap(); public float getProgress() { return progress; } - public void setProgress(float progress) { this.progress = progress; } + public void setProgress(float progress) { + this.progress = progress; + } public State getRunState() { return runState; } public String getTaskTracker() {return taskTracker;} public void setTaskTracker(String tracker) { this.taskTracker = tracker;} @@ -279,7 +281,7 @@ public List<TaskAttemptID> getFetchFailedMaps() { return null; } - + /** * Add to the list of maps from which output-fetches failed. 
* @@ -310,7 +312,7 @@ * @param status updated status */ synchronized void statusUpdate(TaskStatus status) { - this.progress = status.getProgress(); + setProgress (status.getProgress()); this.runState = status.getRunState(); this.stateString = status.getStateString(); this.nextRecordRange = status.getNextRecordRange(); @@ -397,7 +399,7 @@ public void readFields(DataInput in) throws IOException { this.taskid.readFields(in); - this.progress = in.readFloat(); + setProgress(in.readFloat()); this.runState = WritableUtils.readEnum(in, State.class); this.diagnosticInfo = Text.readString(in); this.stateString = Text.readString(in); Modified: hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/TaskTracker.java URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=785794&r1=785793&r2=785794&view=diff ============================================================================== --- hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/TaskTracker.java (original) +++ hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/TaskTracker.java Wed Jun 17 20:55:51 2009 @@ -778,7 +778,7 @@ // job-specific shared directory for use as scratch space Path workDir = lDirAlloc.getLocalPathForWrite( (getLocalJobDir(jobId.toString()) - + Path.SEPARATOR + "work"), fConf); + + Path.SEPARATOR + MRConstants.WORKDIR), fConf); if (!localFs.mkdirs(workDir)) { throw new IOException("Mkdirs failed to create " + workDir.toString()); Modified: hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java?rev=785794&r1=785793&r2=785794&view=diff ============================================================================== --- 
hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java (original) +++ hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java Wed Jun 17 20:55:51 2009 @@ -192,6 +192,12 @@ taskReports = new ArrayList<TaskStatus>(); resStatus = new ResourceStatus(); } + + public TaskTrackerStatus(String trackerName, String host) { + this(); + this.trackerName = trackerName; + this.host = host; + } /** */ Modified: hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/tools/MRAdmin.java URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/tools/MRAdmin.java?rev=785794&r1=785793&r2=785794&view=diff ============================================================================== --- hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/tools/MRAdmin.java (original) +++ hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/tools/MRAdmin.java Wed Jun 17 20:55:51 2009 @@ -30,7 +30,6 @@ import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.UnixUserGroupInformation; import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol; -import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; @@ -54,15 +53,15 @@ private static void printHelp(String cmd) { String summary = "hadoop mradmin is the command to execute Map-Reduce administrative commands.\n" + "The full syntax is: \n\n" + - "hadoop mradmin [-refreshServiceAcl] [-refreshQueueAcls] [-help [cmd]] " + "hadoop mradmin [-refreshServiceAcl] [-refreshQueues] [-help [cmd]] " + "[-refreshNodes]\n"; String refreshServiceAcl = "-refreshServiceAcl: Reload the service-level authorization policy file\n" + "\t\tJobtracker will reload the authorization policy file.\n"; - String refreshQueueAcls = - "-refreshQueueAcls: Reload the queue acls\n" - + "\t\tJobTracker 
will reload the mapred-queue-acls.xml file.\n"; + String refreshQueues = + "-refreshQueues: Reload the queue acls and state.\n" + + "\t\tJobTracker will reload the mapred-queues.xml file.\n"; String refreshNodes = "-refreshNodes: Refresh the hosts information at the jobtracker.\n"; @@ -72,8 +71,8 @@ if ("refreshServiceAcl".equals(cmd)) { System.out.println(refreshServiceAcl); - } else if ("refreshQueueAcls".equals(cmd)) { - System.out.println(refreshQueueAcls); + } else if ("refreshQueues".equals(cmd)) { + System.out.println(refreshQueues); } else if ("refreshNodes".equals(cmd)) { System.out.println(refreshNodes); } else if ("help".equals(cmd)) { @@ -81,7 +80,7 @@ } else { System.out.println(summary); System.out.println(refreshServiceAcl); - System.out.println(refreshQueueAcls); + System.out.println(refreshQueues); System.out.println(refreshNodes); System.out.println(help); System.out.println(); @@ -97,14 +96,14 @@ private static void printUsage(String cmd) { if ("-refreshServiceAcl".equals(cmd)) { System.err.println("Usage: java MRAdmin" + " [-refreshServiceAcl]"); - } else if ("-refreshQueueAcls".equals(cmd)) { - System.err.println("Usage: java MRAdmin" + " [-refreshQueueAcls]"); + } else if ("-refreshQueues".equals(cmd)) { + System.err.println("Usage: java MRAdmin" + " [-refreshQueues]"); } else if ("-refreshNodes".equals(cmd)) { System.err.println("Usage: java MRAdmin" + " [-refreshNodes]"); } else { System.err.println("Usage: java MRAdmin"); System.err.println(" [-refreshServiceAcl]"); - System.err.println(" [-refreshQueueAcls]"); + System.err.println(" [-refreshQueues]"); System.err.println(" [-refreshNodes]"); System.err.println(" [-help [cmd]]"); System.err.println(); @@ -143,7 +142,7 @@ return 0; } - private int refreshQueueAcls() throws IOException { + private int refreshQueues() throws IOException { // Get the current configuration Configuration conf = getConf(); @@ -157,7 +156,7 @@ AdminOperationsProtocol.class)); // Refresh the queue properties - 
adminOperationsProtocol.refreshQueueAcls(); + adminOperationsProtocol.refreshQueues(); return 0; } @@ -201,7 +200,7 @@ // // verify that we have enough command line parameters // - if ("-refreshServiceAcl".equals(cmd) || "-refreshQueueAcls".equals(cmd) + if ("-refreshServiceAcl".equals(cmd) || "-refreshQueues".equals(cmd) || "-refreshNodes".equals(cmd)) { if (args.length != 1) { printUsage(cmd); @@ -213,8 +212,8 @@ try { if ("-refreshServiceAcl".equals(cmd)) { exitCode = refreshAuthorizationPolicy(); - } else if ("-refreshQueueAcls".equals(cmd)) { - exitCode = refreshQueueAcls(); + } else if ("-refreshQueues".equals(cmd)) { + exitCode = refreshQueues(); } else if ("-refreshNodes".equals(cmd)) { exitCode = refreshNodes(); } else if ("-help".equals(cmd)) { Propchange: hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Wed Jun 17 20:55:51 2009 @@ -1,2 +1,2 @@ /hadoop/core/branches/branch-0.19/mapred/src/test/mapred:713112 -/hadoop/core/trunk/src/test/mapred:776175-784663 +/hadoop/core/trunk/src/test/mapred:776175-785643 Modified: hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java?rev=785794&r1=785793&r2=785794&view=diff ============================================================================== --- hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java (original) +++ hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java Wed Jun 17 20:55:51 2009 @@ -31,6 +31,7 @@ import junit.framework.TestCase; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import 
org.apache.hadoop.fs.permission.FsPermission; @@ -813,6 +814,7 @@ } finally { if (mr != null) { + cleanupLocalFiles(mr); mr.shutdown(); } } @@ -855,7 +857,7 @@ assertTrue("User log file " + logFile + " does not exist", fileSys.exists(logFile)); } - else if (conf.get("hadoop.job.history.user.location") == "none") { + else if ("none".equals(conf.get("hadoop.job.history.user.location"))) { // history file should not exist in the output path assertFalse("Unexpected. User log file exists in output dir when " + "hadoop.job.history.user.location is set to \"none\"", @@ -920,11 +922,24 @@ } finally { if (mr != null) { + cleanupLocalFiles(mr); mr.shutdown(); } } } + private void cleanupLocalFiles(MiniMRCluster mr) + throws IOException { + Configuration conf = mr.createJobConf(); + JobTracker jt = mr.getJobTrackerRunner().getJobTracker(); + Path sysDir = new Path(jt.getSystemDir()); + FileSystem fs = sysDir.getFileSystem(conf); + fs.delete(sysDir, true); + Path jobHistoryDir = JobHistory.getJobHistoryLocation(); + fs = jobHistoryDir.getFileSystem(conf); + fs.delete(jobHistoryDir, true); + } + /** * Checks if the history file has expected job status * @param id job id @@ -1001,6 +1016,7 @@ } finally { if (mr != null) { + cleanupLocalFiles(mr); mr.shutdown(); } } Modified: hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueInformation.java URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueInformation.java?rev=785794&r1=785793&r2=785794&view=diff ============================================================================== --- hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueInformation.java (original) +++ hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueInformation.java Wed Jun 17 20:55:51 2009 @@ -108,6 +108,8 @@ assertNotNull(queueInfos); assertEquals(1, 
queueInfos.length); assertEquals("default", queueInfos[0].getQueueName()); + assertEquals(Queue.QueueState.RUNNING.getStateName(), + queueInfos[0].getQueueState()); JobConf conf = mrCluster.createJobConf(); FileSystem fileSys = dfsCluster.getFileSystem(); Modified: hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java?rev=785794&r1=785793&r2=785794&view=diff ============================================================================== --- hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java (original) +++ hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java Wed Jun 17 20:55:51 2009 @@ -45,7 +45,7 @@ public FakeJobInProgress(JobConf jobConf, FakeTaskTrackerManager taskTrackerManager) throws IOException { - super(new JobID("test", ++jobCounter), jobConf); + super(new JobID("test", ++jobCounter), jobConf, null); this.taskTrackerManager = taskTrackerManager; this.startTime = System.currentTimeMillis(); this.status = new JobStatus(getJobID(), 0f, 0f, JobStatus.PREP); Modified: hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestParallelInitialization.java URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestParallelInitialization.java?rev=785794&r1=785793&r2=785794&view=diff ============================================================================== --- hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestParallelInitialization.java (original) +++ hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestParallelInitialization.java Wed Jun 17 20:55:51 2009 @@ -42,7 +42,7 @@ public 
FakeJobInProgress(JobConf jobConf, FakeTaskTrackerManager taskTrackerManager) throws IOException { - super(new JobID("test", ++jobCounter), jobConf); + super(new JobID("test", ++jobCounter), jobConf, null); this.startTime = System.currentTimeMillis(); this.status = new JobStatus(getJobID(), 0f, 0f, JobStatus.PREP); this.status.setJobPriority(JobPriority.NORMAL); Modified: hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestQueueAclsForCurrentUser.java URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestQueueAclsForCurrentUser.java?rev=785794&r1=785793&r2=785794&view=diff ============================================================================== --- hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestQueueAclsForCurrentUser.java (original) +++ hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestQueueAclsForCurrentUser.java Wed Jun 17 20:55:51 2009 @@ -32,8 +32,8 @@ private QueueManager queueManager; private JobConf conf = null; UserGroupInformation currentUGI = null; - String submitAcl = QueueManager.QueueOperation.SUBMIT_JOB.getAclName(); - String adminAcl = QueueManager.QueueOperation.ADMINISTER_JOBS.getAclName(); + String submitAcl = Queue.QueueOperation.SUBMIT_JOB.getAclName(); + String adminAcl = Queue.QueueOperation.ADMINISTER_JOBS.getAclName(); private void setupConfForNoAccess() throws IOException,LoginException { currentUGI = UnixUserGroupInformation.login(); Modified: hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestQueueManager.java URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestQueueManager.java?rev=785794&r1=785793&r2=785794&view=diff ============================================================================== --- 
hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestQueueManager.java (original) +++ hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestQueueManager.java Wed Jun 17 20:55:51 2009 @@ -222,7 +222,7 @@ String queueConfigPath = System.getProperty("test.build.extraconf", "build/test/extraconf"); File queueConfigFile = - new File(queueConfigPath, QueueManager.QUEUE_ACLS_FILE_NAME); + new File(queueConfigPath, QueueManager.QUEUE_CONF_FILE_NAME); File hadoopConfigFile = new File(queueConfigPath, "mapred-site.xml"); try { //Setting up default mapred-site.xml @@ -248,13 +248,13 @@ UserGroupInformation ugi = UnixUserGroupInformation.getCurrentUGI(); //Job Submission should fail because ugi to be used is set to blank. assertFalse("User Job Submission Succeeded before refresh.", - queueManager.hasAccess("default", QueueManager.QueueOperation. + queueManager.hasAccess("default", Queue.QueueOperation. SUBMIT_JOB, ugi)); assertFalse("User Job Submission Succeeded before refresh.", - queueManager.hasAccess("q1", QueueManager.QueueOperation. + queueManager.hasAccess("q1", Queue.QueueOperation. SUBMIT_JOB, ugi)); assertFalse("User Job Submission Succeeded before refresh.", - queueManager.hasAccess("q2", QueueManager.QueueOperation. + queueManager.hasAccess("q2", Queue.QueueOperation. SUBMIT_JOB, ugi)); //Test job submission as alternate user. @@ -263,7 +263,7 @@ UserGroupInformation alternateUgi = UserGroupInformation.readFrom(alternateUserConfig); assertTrue("Alternate User Job Submission failed before refresh.", - queueManager.hasAccess("q2", QueueManager.QueueOperation. + queueManager.hasAccess("q2", Queue.QueueOperation. SUBMIT_JOB, alternateUgi)); //Set acl for the current user. @@ -273,19 +273,19 @@ //write out queue-acls.xml. 
UtilsForTests.setUpConfigFile(queueConfProps, queueConfigFile); //refresh configuration - queueManager.refreshAcls(conf); + queueManager.refreshQueues(conf); //Submission should succeed assertTrue("User Job Submission failed after refresh.", - queueManager.hasAccess("default", QueueManager.QueueOperation. + queueManager.hasAccess("default", Queue.QueueOperation. SUBMIT_JOB, ugi)); assertTrue("User Job Submission failed after refresh.", - queueManager.hasAccess("q1", QueueManager.QueueOperation. + queueManager.hasAccess("q1", Queue.QueueOperation. SUBMIT_JOB, ugi)); assertTrue("User Job Submission failed after refresh.", - queueManager.hasAccess("q2", QueueManager.QueueOperation. + queueManager.hasAccess("q2", Queue.QueueOperation. SUBMIT_JOB, ugi)); assertFalse("Alternate User Job Submission succeeded after refresh.", - queueManager.hasAccess("q2", QueueManager.QueueOperation. + queueManager.hasAccess("q2", Queue.QueueOperation. SUBMIT_JOB, alternateUgi)); //delete the ACL file. queueConfigFile.delete(); @@ -294,9 +294,9 @@ hadoopConfProps.put("mapred.acls.enabled", "true"); hadoopConfProps.put("mapred.queue.default.acl-submit-job", ugi.getUserName()); UtilsForTests.setUpConfigFile(hadoopConfProps, hadoopConfigFile); - queueManager.refreshAcls(conf); + queueManager.refreshQueues(conf); assertTrue("User Job Submission failed after refresh and no queue acls file.", - queueManager.hasAccess("default", QueueManager.QueueOperation. + queueManager.hasAccess("default", Queue.QueueOperation. SUBMIT_JOB, ugi)); } finally{ if(queueConfigFile.exists()) { @@ -308,11 +308,98 @@ } } + + + /** + * Test to verify refreshing of queue properties by using MRAdmin tool. 
+ * + * @throws Exception + */ + public void testStateRefresh() throws Exception { + String queueConfigPath = + System.getProperty("test.build.extraconf", "build/test/extraconf"); + File queueConfigFile = + new File(queueConfigPath, QueueManager.QUEUE_CONF_FILE_NAME); + try { + //Setting up default mapred-site.xml + Properties queueConfProps = new Properties(); + //these properties should be retained. + queueConfProps.put("mapred.queue.names", "default,qu1"); + queueConfProps.put("mapred.acls.enabled", "true"); + //These property should always be overridden + queueConfProps.put("mapred.queue.default.state", "running"); + queueConfProps.put("mapred.queue.qu1.state", "stopped"); + UtilsForTests.setUpConfigFile(queueConfProps, queueConfigFile); + + //Create a new configuration to be used with QueueManager + JobConf conf = new JobConf(); + setUpCluster(conf); + QueueManager queueManager = + this.miniMRCluster.getJobTrackerRunner().getJobTracker().getQueueManager(); + + try{ + Job job = submitSleepJob(10, 2, 10, 10, true,null, "default" ); + assert(job.isSuccessful()); + }catch(Exception e){ + fail("submit job in default queue should be sucessful "); + } + + try{ + submitSleepJob(10, 2, 10, 10, true,null, "qu1" ); + fail("submit job in default queue should be failed "); + }catch(Exception e){ + assert(e.getMessage().contains("Queue \"" + "qu1" + "\" is not running")); + } + + // verify state of queues before refresh + JobQueueInfo queueInfo = queueManager.getJobQueueInfo("default"); + assertEquals(Queue.QueueState.RUNNING.getStateName(), + queueInfo.getQueueState()); + queueInfo = queueManager.getJobQueueInfo("qu1"); + assertEquals(Queue.QueueState.STOPPED.getStateName(), + queueInfo.getQueueState()); + + queueConfProps.put("mapred.queue.default.state", "stopped"); + queueConfProps.put("mapred.queue.qu1.state", "running"); + UtilsForTests.setUpConfigFile(queueConfProps, queueConfigFile); + + //refresh configuration + queueManager.refreshQueues(conf); + + //Job 
Submission should pass now because ugi to be used is set to blank. + try{ + submitSleepJob(10, 2, 10, 10, true,null,"qu1"); + }catch(Exception e){ + fail("submit job in qu1 queue should be sucessful "); + } + + try{ + submitSleepJob(10, 2, 10, 10, true,null, "default" ); + fail("submit job in default queue should be failed "); + }catch(Exception e){ + assert(e.getMessage().contains("Queue \"" + "default" + "\" is not running")); + } + + // verify state of queues after refresh + queueInfo = queueManager.getJobQueueInfo("default"); + assertEquals(Queue.QueueState.STOPPED.getStateName(), + queueInfo.getQueueState()); + queueInfo = queueManager.getJobQueueInfo("qu1"); + assertEquals(Queue.QueueState.RUNNING.getStateName(), + queueInfo.getQueueState()); + } finally{ + if(queueConfigFile.exists()) { + queueConfigFile.delete(); + } + this.tearDownCluster(); + } + } + public void testQueueAclRefreshWithInvalidConfFile() throws IOException { String queueConfigPath = System.getProperty("test.build.extraconf", "build/test/extraconf"); File queueConfigFile = - new File(queueConfigPath, QueueManager.QUEUE_ACLS_FILE_NAME); + new File(queueConfigPath, QueueManager.QUEUE_CONF_FILE_NAME); File hadoopConfigFile = new File(queueConfigPath, "hadoop-site.xml"); try { // queue properties with which the cluster is started. @@ -333,13 +420,13 @@ QueueManager queueManager = new QueueManager(conf); //Testing access to queue. assertTrue("User Job Submission failed.", - queueManager.hasAccess("default", QueueManager.QueueOperation. + queueManager.hasAccess("default", Queue.QueueOperation. SUBMIT_JOB, ugi)); assertTrue("User Job Submission failed.", - queueManager.hasAccess("q1", QueueManager.QueueOperation. + queueManager.hasAccess("q1", Queue.QueueOperation. SUBMIT_JOB, ugi)); assertTrue("User Job Submission failed.", - queueManager.hasAccess("q2", QueueManager.QueueOperation. + queueManager.hasAccess("q2", Queue.QueueOperation. 
SUBMIT_JOB, ugi)); //Write out a new incomplete invalid configuration file. @@ -351,18 +438,18 @@ try { //Exception to be thrown by queue manager because configuration passed //is invalid. - queueManager.refreshAcls(conf); + queueManager.refreshQueues(conf); fail("Refresh of ACLs should have failed with invalid conf file."); } catch (Exception e) { } assertTrue("User Job Submission failed after invalid conf file refresh.", - queueManager.hasAccess("default", QueueManager.QueueOperation. + queueManager.hasAccess("default", Queue.QueueOperation. SUBMIT_JOB, ugi)); assertTrue("User Job Submission failed after invalid conf file refresh.", - queueManager.hasAccess("q1", QueueManager.QueueOperation. + queueManager.hasAccess("q1", Queue.QueueOperation. SUBMIT_JOB, ugi)); assertTrue("User Job Submission failed after invalid conf file refresh.", - queueManager.hasAccess("q2", QueueManager.QueueOperation. + queueManager.hasAccess("q2", Queue.QueueOperation. SUBMIT_JOB, ugi)); } finally { //Cleanup the configuration files in all cases Modified: hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestResourceEstimation.java URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestResourceEstimation.java?rev=785794&r1=785793&r2=785794&view=diff ============================================================================== --- hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestResourceEstimation.java (original) +++ hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestResourceEstimation.java Wed Jun 17 20:55:51 2009 @@ -32,7 +32,7 @@ jc.setNumMapTasks(maps); jc.setNumReduceTasks(reduces); - JobInProgress jip = new JobInProgress(jid, jc); + JobInProgress jip = new JobInProgress(jid, jc, null); //unfortunately, we can't set job input size from here. 
ResourceEstimator re = new ResourceEstimator(jip); @@ -64,7 +64,7 @@ jc.setNumMapTasks(maps); jc.setNumReduceTasks(reduces); - JobInProgress jip = new JobInProgress(jid, jc) { + JobInProgress jip = new JobInProgress(jid, jc, null) { long getInputLength() { return singleMapInputSize*desiredMaps(); } Modified: hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java?rev=785794&r1=785793&r2=785794&view=diff ============================================================================== --- hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java (original) +++ hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java Wed Jun 17 20:55:51 2009 @@ -24,9 +24,6 @@ import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; -import java.io.BufferedReader; -import java.io.FileReader; -import java.io.FileNotFoundException; import java.text.DecimalFormat; import java.util.Arrays; import java.util.Enumeration; @@ -35,7 +32,6 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.examples.RandomWriter; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DFSTestUtil; @@ -650,7 +646,18 @@ return job; } + static class FakeClock extends Clock { + long time = 0; + + public void advance(long millis) { + time += millis; + } + @Override + long getTime() { + return time; + } + } // Mapper that fails static class FailMapper extends MapReduceBase implements Mapper<WritableComparable, Writable, WritableComparable, Writable> { Propchange: hadoop/core/branches/HADOOP-4687/mapred/src/webapps/job/ ------------------------------------------------------------------------------ --- svn:mergeinfo 
(original) +++ svn:mergeinfo Wed Jun 17 20:55:51 2009 @@ -1,2 +1,2 @@ /hadoop/core/branches/branch-0.19/mapred/src/webapps/job:713112 -/hadoop/core/trunk/src/webapps/job:776175-784663 +/hadoop/core/trunk/src/webapps/job:776175-785643 Modified: hadoop/core/branches/HADOOP-4687/mapred/src/webapps/job/jobqueue_details.jsp URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/webapps/job/jobqueue_details.jsp?rev=785794&r1=785793&r2=785794&view=diff ============================================================================== --- hadoop/core/branches/HADOOP-4687/mapred/src/webapps/job/jobqueue_details.jsp (original) +++ hadoop/core/branches/HADOOP-4687/mapred/src/webapps/job/jobqueue_details.jsp Wed Jun 17 20:55:51 2009 @@ -56,6 +56,7 @@ <a href="jobtracker.jsp"><%=trackerName%></a> </h1> <div> +State : <%= schedInfo.getQueueState() %> <br/> Scheduling Information : <%= schedulingInfoString.replaceAll("\n","<br/>") %> </div> <hr/> Modified: hadoop/core/branches/HADOOP-4687/mapred/src/webapps/job/jobtracker.jsp URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/webapps/job/jobtracker.jsp?rev=785794&r1=785793&r2=785794&view=diff ============================================================================== --- hadoop/core/branches/HADOOP-4687/mapred/src/webapps/job/jobtracker.jsp (original) +++ hadoop/core/branches/HADOOP-4687/mapred/src/webapps/job/jobtracker.jsp Wed Jun 17 20:55:51 2009 @@ -124,6 +124,7 @@ <thead style="font-weight: bold"> <tr> <td> Queue Name </td> +<td> State </td> <td> Scheduling Information</td> </tr> </thead> @@ -131,6 +132,7 @@ <% for(JobQueueInfo queue: queues) { String queueName = queue.getQueueName(); + String state = queue.getQueueState(); String schedulingInformation = queue.getSchedulingInfo(); if(schedulingInformation == null || schedulingInformation.trim().equals("")) { schedulingInformation = "NA"; @@ -138,6 +140,7 @@ %> <tr> <td><a 
href="jobqueue_details.jsp?queueName=<%=queueName%>"><%=queueName%></a></td> +<td><%=state%></td> <td><%=schedulingInformation.replaceAll("\n","<br/>") %> </td> </tr>
