Modified: hadoop/core/branches/HADOOP-4687/core/src/java/org/apache/hadoop/util/ProcessTree.java URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/core/src/java/org/apache/hadoop/util/ProcessTree.java?rev=784965&r1=784964&r2=784965&view=diff ============================================================================== --- hadoop/core/branches/HADOOP-4687/core/src/java/org/apache/hadoop/util/ProcessTree.java (original) +++ hadoop/core/branches/HADOOP-4687/core/src/java/org/apache/hadoop/util/ProcessTree.java Mon Jun 15 20:28:06 2009 @@ -54,73 +54,24 @@ } /** - * Kills the process(OR process group) by sending the signal SIGKILL - * in the current thread - * @param pid Process id(OR process group id) of to-be-deleted-process - * @param isProcessGroup Is pid a process group id of to-be-deleted-processes - * @param sleepTimeBeforeSigKill wait time before sending SIGKILL after - * sending SIGTERM - */ - private static void sigKillInCurrentThread(String pid, boolean isProcessGroup, - long sleepTimeBeforeSigKill) { - // Kill the subprocesses of root process(even if the root process is not - // alive) if process group is to be killed. - if (isProcessGroup || ProcessTree.isAlive(pid)) { - try { - // Sleep for some time before sending SIGKILL - Thread.sleep(sleepTimeBeforeSigKill); - } catch (InterruptedException i) { - LOG.warn("Thread sleep is interrupted."); - } - - ShellCommandExecutor shexec = null; - - try { - String pid_pgrpid; - if(isProcessGroup) {//kill the whole process group - pid_pgrpid = "-" + pid; - } - else {//kill single process - pid_pgrpid = pid; - } - - String[] args = { "kill", "-9", pid_pgrpid }; - shexec = new ShellCommandExecutor(args); - shexec.execute(); - } catch (IOException ioe) { - LOG.warn("Error executing shell command " + ioe); - } finally { - if(isProcessGroup) { - LOG.info("Killing process group" + pid + " with SIGKILL. Exit code " - + shexec.getExitCode()); - } - else { - LOG.info("Killing process " + pid + " with SIGKILL. Exit code " - + shexec.getExitCode()); - } - } - } - } - - /** Kills the process(OR process group) by sending the signal SIGKILL - * @param pid Process id(OR process group id) of to-be-deleted-process - * @param isProcessGroup Is pid a process group id of to-be-deleted-processes + * Destroy the process-tree. + * @param pid process id of the root process of the subtree of processes + * to be killed * @param sleeptimeBeforeSigkill The time to wait before sending SIGKILL * after sending SIGTERM + * @param isProcessGroup pid is a process group leader or not * @param inBackground Process is to be killed in the back ground with * a separate thread */ - private static void sigKill(String pid, boolean isProcessGroup, - long sleeptimeBeforeSigkill, boolean inBackground) { - - if(inBackground) { // use a separate thread for killing - SigKillThread sigKillThread = new SigKillThread(pid, isProcessGroup, - sleeptimeBeforeSigkill); - sigKillThread.setDaemon(true); - sigKillThread.start(); + public static void destroy(String pid, long sleeptimeBeforeSigkill, + boolean isProcessGroup, boolean inBackground) { + if(isProcessGroup) { + destroyProcessGroup(pid, sleeptimeBeforeSigkill, inBackground); } else { - sigKillInCurrentThread(pid, isProcessGroup, sleeptimeBeforeSigkill); + //TODO: Destroy all the processes in the subtree in this case also. + // For the time being, killing only the root process. 
+ destroyProcess(pid, sleeptimeBeforeSigkill, inBackground); } } @@ -133,6 +84,29 @@ */ protected static void destroyProcess(String pid, long sleeptimeBeforeSigkill, boolean inBackground) { + terminateProcess(pid); + sigKill(pid, false, sleeptimeBeforeSigkill, inBackground); + } + + /** Destroy the process group. + * @param pgrpId Process group id of to-be-killed-processes + * @param sleeptimeBeforeSigkill The time to wait before sending SIGKILL + * after sending SIGTERM + * @param inBackground Process group is to be killed in the background with + * a separate thread + */ + protected static void destroyProcessGroup(String pgrpId, + long sleeptimeBeforeSigkill, boolean inBackground) { + terminateProcessGroup(pgrpId); + sigKill(pgrpId, true, sleeptimeBeforeSigkill, inBackground); + } + + /** + * Sends terminate signal to the process, allowing it to gracefully exit. + * + * @param pid pid of the process to be sent SIGTERM + */ + public static void terminateProcess(String pid) { ShellCommandExecutor shexec = null; try { String[] args = { "kill", pid }; @@ -144,19 +118,15 @@ LOG.info("Killing process " + pid + " with SIGTERM. Exit code " + shexec.getExitCode()); } - - sigKill(pid, false, sleeptimeBeforeSigkill, inBackground); } - - /** Destroy the process group. - * @param pgrpId Process group id of to-be-killed-processes - * @param sleeptimeBeforeSigkill The time to wait before sending SIGKILL - * after sending SIGTERM - * @param inBackground Process group is to be killed in the back ground with - * a separate thread + + /** + * Sends terminate signal to all the processes belonging to the passed + * process group, allowing the group to gracefully exit. + * + * @param pgrpId process group id */ - protected static void destroyProcessGroup(String pgrpId, - long sleeptimeBeforeSigkill, boolean inBackground) { + public static void terminateProcessGroup(String pgrpId) { ShellCommandExecutor shexec = null; try { String[] args = { "kill", "--", "-" + pgrpId }; @@ -168,37 +138,115 @@ LOG.info("Killing all processes in the process group " + pgrpId + " with SIGTERM. Exit code " + shexec.getExitCode()); } - - sigKill(pgrpId, true, sleeptimeBeforeSigkill, inBackground); } /** - * Destroy the process-tree. - * @param pid process id of the root process of the subtree of processes - * to be killed + * Kills the process(OR process group) by sending the signal SIGKILL + * in the current thread + * @param pid Process id(OR process group id) of to-be-deleted-process + * @param isProcessGroup Is pid a process group id of to-be-deleted-processes + * @param sleepTimeBeforeSigKill wait time before sending SIGKILL after + * sending SIGTERM + */ + private static void sigKillInCurrentThread(String pid, boolean isProcessGroup, + long sleepTimeBeforeSigKill) { + // Kill the subprocesses of root process(even if the root process is not + // alive) if process group is to be killed.
+ if (isProcessGroup || ProcessTree.isAlive(pid)) { + try { + // Sleep for some time before sending SIGKILL + Thread.sleep(sleepTimeBeforeSigKill); + } catch (InterruptedException i) { + LOG.warn("Thread sleep is interrupted."); + } + if(isProcessGroup) { + killProcessGroup(pid); + } else { + killProcess(pid); + } + } + } + + + /** Kills the process(OR process group) by sending the signal SIGKILL + * @param pid Process id(OR process group id) of to-be-deleted-process + * @param isProcessGroup Is pid a process group id of to-be-deleted-processes * @param sleeptimeBeforeSigkill The time to wait before sending SIGKILL * after sending SIGTERM - * @param isProcessGroup pid is a process group leader or not * @param inBackground Process is to be killed in the back ground with * a separate thread */ - public static void destroy(String pid, long sleeptimeBeforeSigkill, - boolean isProcessGroup, boolean inBackground) { - if(isProcessGroup) { - destroyProcessGroup(pid, sleeptimeBeforeSigkill, inBackground); + private static void sigKill(String pid, boolean isProcessGroup, + long sleeptimeBeforeSigkill, boolean inBackground) { + + if(inBackground) { // use a separate thread for killing + SigKillThread sigKillThread = new SigKillThread(pid, isProcessGroup, + sleeptimeBeforeSigkill); + sigKillThread.setDaemon(true); + sigKillThread.start(); } else { - //TODO: Destroy all the processes in the subtree in this case also. - // For the time being, killing only the root process. - destroyProcess(pid, sleeptimeBeforeSigkill, inBackground); + sigKillInCurrentThread(pid, isProcessGroup, sleeptimeBeforeSigkill); + } + } + + /** + * Sends kill signal to the process, forcefully terminating it. + * + * @param pid process id + */ + public static void killProcess(String pid) { + + // If the process is not alive then return immediately. + if(!ProcessTree.isAlive(pid)) { + return; + } + String[] args = { "kill", "-9", pid }; + ShellCommandExecutor shexec = new ShellCommandExecutor(args); + try { + shexec.execute(); + } catch (IOException e) { + LOG.warn("Error sending SIGKILL to process "+ pid + " ."+ + StringUtils.stringifyException(e)); + } finally { + LOG.info("Killing process " + pid + " with SIGKILL. Exit code " + + shexec.getExitCode()); + } } + /** + * Sends kill signal to all processes belonging to the same process group, + * forcefully terminating the process group. + * + * @param pgrpId process group id + */ + public static void killProcessGroup(String pgrpId) { + + // If the process group is not alive then return immediately. + if(!ProcessTree.isProcessGroupAlive(pgrpId)) { + return; + } + String[] args = { "kill", "-9", "-"+pgrpId }; + ShellCommandExecutor shexec = new ShellCommandExecutor(args); + try { + shexec.execute(); + } catch (IOException e) { + LOG.warn("Error sending SIGKILL to process group "+ pgrpId + " ."+ + StringUtils.stringifyException(e)); + } finally { + LOG.info("Killing process group " + pgrpId + " with SIGKILL. Exit code " + + shexec.getExitCode()); + } + } + /** * Is the process with PID pid still alive? * This method assumes that isAlive is called on a pid that was alive not * too long ago, and hence assumes no chance of pid-wrapping-around. + * + * @param pid pid of the process to check. + * @return true if process is alive. */ public static boolean isAlive(String pid) { ShellCommandExecutor shexec = null; @@ -215,6 +263,32 @@ } return (shexec.getExitCode() == 0 ? true : false); } + + /** + * Is the process group still alive?
+ * + * This method assumes that isAlive is called on a pid that was alive not + * too long ago, and hence assumes no chance of pid-wrapping-around. + * + * @param pgrpId process group id + * @return true if any process in the group is alive. + */ + public static boolean isProcessGroupAlive(String pgrpId) { + ShellCommandExecutor shexec = null; + try { + String[] args = { "kill", "-0", "-"+pgrpId }; + shexec = new ShellCommandExecutor(args); + shexec.execute(); + } catch (ExitCodeException ee) { + return false; + } catch (IOException ioe) { + LOG.warn("Error executing shell command " + + Arrays.toString(shexec.getExecString()) + ioe); + return false; + } + return (shexec.getExitCode() == 0 ? true : false); + } + /** * Helper thread class that kills process-tree with SIGKILL in background
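Taken together, the ProcessTree.java change above splits the old private sigKill path into a small public API: terminateProcess/terminateProcessGroup send SIGTERM, killProcess/killProcessGroup send SIGKILL (each returning early if the target is no longer alive), and destroy() chains the two with a grace period in between. A minimal caller-side sketch of that API, assuming a Unix host with kill on the PATH; the pid value, the 5000 ms grace period, and the class name are invented for illustration:

    import org.apache.hadoop.util.ProcessTree;

    public class ProcessTreeExample {
      public static void main(String[] args) {
        // Hypothetical pid of a process group leader started under setsid.
        String pid = "12345";

        // SIGTERM the whole group, then SIGKILL it 5 seconds later if any
        // member is still alive, waiting out the grace period in this thread.
        ProcessTree.destroy(pid, 5000L, true /*isProcessGroup*/,
            false /*inBackground*/);

        // Equivalent two-phase sequence, minus the sleep that destroy()
        // inserts between the signals:
        ProcessTree.terminateProcessGroup(pid);      // runs: kill -- -12345
        if (ProcessTree.isProcessGroupAlive(pid)) {  // runs: kill -0 -12345
          ProcessTree.killProcessGroup(pid);         // runs: kill -9 -12345
        }
      }
    }

Passing inBackground=true instead moves the sleep-then-SIGKILL step into a daemon SigKillThread, so the caller is not blocked for the grace period.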
Modified: hadoop/core/branches/HADOOP-4687/core/src/java/org/apache/hadoop/util/ProcfsBasedProcessTree.java URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/core/src/java/org/apache/hadoop/util/ProcfsBasedProcessTree.java?rev=784965&r1=784964&r2=784965&view=diff ============================================================================== --- hadoop/core/branches/HADOOP-4687/core/src/java/org/apache/hadoop/util/ProcfsBasedProcessTree.java (original) +++ hadoop/core/branches/HADOOP-4687/core/src/java/org/apache/hadoop/util/ProcfsBasedProcessTree.java Mon Jun 15 20:28:06 2009 @@ -47,6 +47,10 @@ private static final Pattern PROCFS_STAT_FILE_FORMAT = Pattern .compile("^([0-9-]+)\\s([^\\s]+)\\s[^\\s]\\s([0-9-]+)\\s([0-9-]+)\\s([0-9-]+)\\s([0-9-]+)\\s([0-9-]+\\s){16}([0-9]+)(\\s[0-9-]+){16}"); + // to enable testing, using this variable which can be configured + // to a test directory. + private String procfsDir; + private Integer pid = -1; private boolean setsidUsed = false; private long sleeptimeBeforeSigkill = DEFAULT_SLEEPTIME_BEFORE_SIGKILL; @@ -59,11 +63,29 @@ public ProcfsBasedProcessTree(String pid, boolean setsidUsed, long sigkillInterval) { + this(pid, setsidUsed, sigkillInterval, PROCFS); + } + + /** + * Build a new process tree rooted at the pid. + * + * This method is provided mainly for testing purposes, where + * the root of the proc file system can be adjusted. + * + * @param pid root of the process tree + * @param setsidUsed true, if setsid was used for the root pid + * @param sigkillInterval how long to wait between a SIGTERM and SIGKILL + * when killing a process tree + * @param procfsDir the root of a proc file system - only used for testing. + */ + public ProcfsBasedProcessTree(String pid, boolean setsidUsed, + long sigkillInterval, String procfsDir) { this.pid = getValidPID(pid); this.setsidUsed = setsidUsed; sleeptimeBeforeSigkill = sigkillInterval; + this.procfsDir = procfsDir; } - + /** * Sets SIGKILL interval * @deprecated Use {@link ProcfsBasedProcessTree#ProcfsBasedProcessTree( @@ -108,13 +130,17 @@ List<Integer> processList = getProcessList(); Map<Integer, ProcessInfo> allProcessInfo = new HashMap<Integer, ProcessInfo>(); + + // cache the processTree to get the age for processes + Map<Integer, ProcessInfo> oldProcs = + new HashMap<Integer, ProcessInfo>(processTree); processTree.clear(); ProcessInfo me = null; for (Integer proc : processList) { // Get information for each process ProcessInfo pInfo = new ProcessInfo(proc); - if (constructProcessInfo(pInfo) != null) { + if (constructProcessInfo(pInfo, procfsDir) != null) { allProcessInfo.put(proc, pInfo); if (proc.equals(this.pid)) { me = pInfo; // cache 'me' @@ -150,6 +176,16 @@ pInfoQueue.addAll(pInfo.getChildren()); } + // update age values. + for (Map.Entry<Integer, ProcessInfo> procs : processTree.entrySet()) { + ProcessInfo oldInfo = oldProcs.get(procs.getKey()); + if (oldInfo != null) { + if (procs.getValue() != null) { + procs.getValue().updateAge(oldInfo); + } + } + } + if (LOG.isDebugEnabled()) { // Log.debug the ProcfsBasedProcessTree LOG.debug(this.toString()); @@ -269,15 +305,29 @@ * @return cumulative virtual memory used by the process-tree in bytes. */ public long getCumulativeVmem() { + // include all processes.. all processes will be older than 0. + return getCumulativeVmem(0); + } + + /** + * Get the cumulative virtual memory used by all the processes in the + * process-tree that are older than the passed in age.
+ * + * @param olderThanAge processes above this age are included in the + * memory addition + * @return cumulative virtual memory used by the process-tree in bytes, + * for processes older than this age. + */ + public long getCumulativeVmem(int olderThanAge) { long total = 0; for (ProcessInfo p : processTree.values()) { - if (p != null) { + if ((p != null) && (p.getAge() > olderThanAge)) { total += p.getVmem(); } } return total; } - + private static Integer getValidPID(String pid) { Integer retPid = -1; try { @@ -295,13 +345,13 @@ * Get the list of all processes in the system. */ private List<Integer> getProcessList() { - String[] processDirs = (new File(PROCFS)).list(); + String[] processDirs = (new File(procfsDir)).list(); List<Integer> processList = new ArrayList<Integer>(); for (String dir : processDirs) { try { int pd = Integer.parseInt(dir); - if ((new File(PROCFS + dir)).isDirectory()) { + if ((new File(procfsDir, dir)).isDirectory()) { processList.add(Integer.valueOf(pd)); } } catch (NumberFormatException n) { @@ -319,12 +369,29 @@ * same. Returns null on failing to read from procfs, */ private static ProcessInfo constructProcessInfo(ProcessInfo pinfo) { + return constructProcessInfo(pinfo, PROCFS); + } + + /** + * Construct the ProcessInfo using the process' PID and procfs rooted at the + * specified directory and return the same. It is provided mainly to assist + * testing purposes. + * + * Returns null on failing to read from procfs, + * + * @param pinfo ProcessInfo that needs to be updated + * @param procfsDir root of the proc file system + * @return updated ProcessInfo, null on errors. + */ + private static ProcessInfo constructProcessInfo(ProcessInfo pinfo, + String procfsDir) { ProcessInfo ret = null; - // Read "/proc/<pid>/stat" file + // Read "procfsDir/<pid>/stat" file - typically /proc/<pid>/stat BufferedReader in = null; FileReader fReader = null; try { - fReader = new FileReader(PROCFS + pinfo.getPid() + "/stat"); + File pidDir = new File(procfsDir, String.valueOf(pinfo.getPid())); + fReader = new FileReader(new File(pidDir, "/stat")); in = new BufferedReader(fReader); } catch (FileNotFoundException f) { // The process vanished in the interim! @@ -338,7 +405,7 @@ boolean mat = m.find(); if (mat) { // Set ( name ) ( ppid ) ( pgrpId ) (session ) (vsize ) - pinfo.update(m.group(2), Integer.parseInt(m.group(3)), Integer + pinfo.updateProcessInfo(m.group(2), Integer.parseInt(m.group(3)), Integer .parseInt(m.group(4)), Integer.parseInt(m.group(5)), Long .parseLong(m.group(7))); } @@ -365,7 +432,6 @@ return ret; } - /** * Returns a string printing PIDs of process present in the * ProcfsBasedProcessTree. Output format : [pid pid ..] @@ -391,10 +457,14 @@ private Integer ppid; // parent process-id private Integer sessionId; // session-id private Long vmem; // virtual memory usage + // how many times has this process been seen alive + private int age; private List<ProcessInfo> children = new ArrayList<ProcessInfo>(); // list of children public ProcessInfo(int pid) { this.pid = Integer.valueOf(pid); + // seeing this the first time. 
+ this.age = 1; + } public Integer getPid() { @@ -421,6 +491,10 @@ return vmem; } + public int getAge() { + return age; + } + public boolean isParent(ProcessInfo p) { if (pid.equals(p.getPpid())) { return true; @@ -428,7 +502,7 @@ return false; } - public void update(String name, Integer ppid, Integer pgrpId, + public void updateProcessInfo(String name, Integer ppid, Integer pgrpId, Integer sessionId, Long vmem) { this.name = name; this.ppid = ppid; @@ -437,6 +511,10 @@ this.vmem = vmem; } + public void updateAge(ProcessInfo oldInfo) { + this.age = oldInfo.age + 1; + } + public boolean addChild(ProcessInfo p) { return children.add(p); } Modified: hadoop/core/branches/HADOOP-4687/core/src/java/org/apache/hadoop/util/Progress.java URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/core/src/java/org/apache/hadoop/util/Progress.java?rev=784965&r1=784964&r2=784965&view=diff ============================================================================== --- hadoop/core/branches/HADOOP-4687/core/src/java/org/apache/hadoop/util/Progress.java (original) +++ hadoop/core/branches/HADOOP-4687/core/src/java/org/apache/hadoop/util/Progress.java Mon Jun 15 20:28:06 2009 @@ -20,19 +20,32 @@ import java.util.ArrayList; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + /** Utility to assist with generation of progress reports. Applications build * a hierarchy of {@link Progress} instances, each modelling a phase of * execution. The root is constructed with {@link #Progress()}. Nodes for * sub-phases are created by calling {@link #addPhase()}. */ public class Progress { + private static final Log LOG = LogFactory.getLog(Progress.class); private String status = ""; private float progress; private int currentPhase; private ArrayList<Progress> phases = new ArrayList<Progress>(); private Progress parent; - private float progressPerPhase; + // Each phase can have different progress weightage. For example, in + // Map Task, map phase accounts for 66.7% and sort phase for 33.3%. + // User needs to give weightages as parameters to all phases(when adding + // phases) in a Progress object, if he wants to give weightage to any of the + // phases. So when nodes are added without specifying weightage, it means + // fixed weightage for all phases. + private boolean fixedWeightageForAllPhases = false; + private float progressPerPhase = 0.0f; + private ArrayList<Float> progressWeightagesForPhases = new ArrayList<Float>(); + /** Creates a new root node. */ public Progress() {} @@ -43,15 +56,73 @@ return phase; } - /** Adds a node to the tree. */ + /** Adds a node to the tree. Gives equal weightage to all phases */ public synchronized Progress addPhase() { + Progress phase = addNewPhase(); + // set equal weightage for all phases + progressPerPhase = 1.0f / (float)phases.size(); + fixedWeightageForAllPhases = true; + return phase; + } + + /** Adds a new phase. Caller needs to set progress weightage */ + private synchronized Progress addNewPhase() { Progress phase = new Progress(); phases.add(phase); phase.setParent(this); - progressPerPhase = 1.0f / (float)phases.size(); return phase; } + /** Adds a named node with a specified progress weightage to the tree. */ + public Progress addPhase(String status, float weightage) { + Progress phase = addPhase(weightage); + phase.setStatus(status); + + return phase; + } + + /** Adds a node with a specified progress weightage to the tree.
*/ + public synchronized Progress addPhase(float weightage) { + Progress phase = new Progress(); + progressWeightagesForPhases.add(weightage); + phases.add(phase); + phase.setParent(this); + + // Ensure that the sum of weightages does not cross 1.0 + float sum = 0; + for (int i = 0; i < phases.size(); i++) { + sum += progressWeightagesForPhases.get(i); + } + if (sum > 1.0) { + LOG.warn("Sum of weightages can not be more than 1.0; But sum = " + sum); + } + + return phase; + } + + /** Adds n nodes to the tree. Gives equal weightage to all phases */ + public synchronized void addPhases(int n) { + for (int i = 0; i < n; i++) { + addNewPhase(); + } + // set equal weightage for all phases + progressPerPhase = 1.0f / (float)phases.size(); + fixedWeightageForAllPhases = true; + } + + /** + * returns progress weightage of the given phase + * @param phaseNum the phase number of the phase(child node) for which we need + * progress weightage + * @return returns the progress weightage of the specified phase + */ + float getProgressWeightage(int phaseNum) { + if (fixedWeightageForAllPhases) { + return progressPerPhase; // all phases are of equal weightage + } + return progressWeightagesForPhases.get(phaseNum); + } + synchronized Progress getParent() { return parent; } synchronized void setParent(Progress parent) { this.parent = parent; } @@ -89,8 +160,8 @@ } /** Returns the overall progress of the root. */ - // this method probably does not need to be synchronized as getINternal() is synchronized - // and the node's parent never changes. Still, it doesn't hurt. + // this method probably does not need to be synchronized as getInternal() is + // synchronized and the node's parent never changes. Still, it doesn't hurt. public synchronized float get() { Progress node = this; while (node.getParent() != null) { // find the root @@ -99,13 +170,37 @@ return node.getInternal(); } + /** + * Returns progress in this node. get() would give overall progress of the + * root node(not just given current node). + */ + public synchronized float getProgress() { + return getInternal(); + } + /** Computes progress in this node. */ private synchronized float getInternal() { int phaseCount = phases.size(); if (phaseCount != 0) { - float subProgress = - currentPhase < phaseCount ? phase().getInternal() : 0.0f; - return progressPerPhase*(currentPhase + subProgress); + float subProgress = 0.0f; + float progressFromCurrentPhase = 0.0f; + if (currentPhase < phaseCount) { + subProgress = phase().getInternal(); + progressFromCurrentPhase = + getProgressWeightage(currentPhase) * subProgress; + } + + float progressFromCompletedPhases = 0.0f; + if (fixedWeightageForAllPhases) { // same progress weightage for each phase + progressFromCompletedPhases = progressPerPhase * currentPhase; + } + else { + for (int i = 0; i < currentPhase; i++) { + // progress weightages of phases could be different. 
Add them + progressFromCompletedPhases += getProgressWeightage(i); + } + } + return progressFromCompletedPhases + progressFromCurrentPhase; } else { return progress; } Modified: hadoop/core/branches/HADOOP-4687/core/src/java/org/apache/hadoop/util/RunJar.java URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/core/src/java/org/apache/hadoop/util/RunJar.java?rev=784965&r1=784964&r2=784965&view=diff ============================================================================== --- hadoop/core/branches/HADOOP-4687/core/src/java/org/apache/hadoop/util/RunJar.java (original) +++ hadoop/core/branches/HADOOP-4687/core/src/java/org/apache/hadoop/util/RunJar.java Mon Jun 15 20:28:06 2009 @@ -108,7 +108,7 @@ File tmpDir = new File(new Configuration().get("hadoop.tmp.dir")); boolean b = tmpDir.mkdirs(); - if (!b || !tmpDir.isDirectory()) { + if (!b && !tmpDir.isDirectory()) { System.err.println("Mkdirs failed to create " + tmpDir); System.exit(-1); } @@ -119,7 +119,7 @@ System.exit(-1); } b = workDir.mkdirs(); - if (!b || !workDir.isDirectory()) { + if (!b && !workDir.isDirectory()) { System.err.println("Mkdirs failed to create " + workDir); System.exit(-1); } Modified: hadoop/core/branches/HADOOP-4687/core/src/java/org/apache/hadoop/util/StringUtils.java URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/core/src/java/org/apache/hadoop/util/StringUtils.java?rev=784965&r1=784964&r2=784965&view=diff ============================================================================== --- hadoop/core/branches/HADOOP-4687/core/src/java/org/apache/hadoop/util/StringUtils.java (original) +++ hadoop/core/branches/HADOOP-4687/core/src/java/org/apache/hadoop/util/StringUtils.java Mon Jun 15 20:28:06 2009 @@ -88,7 +88,8 @@ double result = number; String suffix = ""; if (absNumber < 1024) { - // nothing + // since no division has occurred, don't format with a decimal point + return String.valueOf(number); } else if (absNumber < 1024 * 1024) { result = number / 1024.0; suffix = "k"; Propchange: hadoop/core/branches/HADOOP-4687/core/src/test/core/ ------------------------------------------------------------------------------ --- svn:mergeinfo (added) +++ svn:mergeinfo Mon Jun 15 20:28:06 2009 @@ -0,0 +1,2 @@ +/hadoop/core/branches/branch-0.19/core/src/test/core:713112 +/hadoop/core/trunk/src/test/core:776175-784663 Modified: hadoop/core/branches/HADOOP-4687/core/src/test/core/org/apache/hadoop/filecache/TestDistributedCache.java URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/core/src/test/core/org/apache/hadoop/filecache/TestDistributedCache.java?rev=784965&r1=784964&r2=784965&view=diff ============================================================================== --- hadoop/core/branches/HADOOP-4687/core/src/test/core/org/apache/hadoop/filecache/TestDistributedCache.java (original) +++ hadoop/core/branches/HADOOP-4687/core/src/test/core/org/apache/hadoop/filecache/TestDistributedCache.java Mon Jun 15 20:28:06 2009 @@ -55,6 +55,15 @@ assertTrue("DistributedCache failed deleting old cache when the cache store is full.", dirStatuses.length > 1); } + + public void testFileSystemOtherThanDefault() throws Exception { + Configuration conf = new Configuration(); + conf.set("fs.fakefile.impl", conf.get("fs.file.impl")); + Path fileToCache = new Path("fakefile:///" + firstCacheFile.toUri().getPath()); + Path result = DistributedCache.getLocalCache(fileToCache.toUri(), conf, new Path(TEST_CACHE_BASE_DIR), + false, System.currentTimeMillis(), new Path(TEST_ROOT_DIR)); + 
assertNotNull("DistributedCache cached file on non-default filesystem.", result); + } private void createTempFile(FileSystem fs, Path p) throws IOException { FSDataOutputStream out = fs.create(p); Modified: hadoop/core/branches/HADOOP-4687/core/src/test/core/org/apache/hadoop/fs/kfs/KFSEmulationImpl.java URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/core/src/test/core/org/apache/hadoop/fs/kfs/KFSEmulationImpl.java?rev=784965&r1=784964&r2=784965&view=diff ============================================================================== --- hadoop/core/branches/HADOOP-4687/core/src/test/core/org/apache/hadoop/fs/kfs/KFSEmulationImpl.java (original) +++ hadoop/core/branches/HADOOP-4687/core/src/test/core/org/apache/hadoop/fs/kfs/KFSEmulationImpl.java Mon Jun 15 20:28:06 2009 @@ -12,7 +12,6 @@ * implied. See the License for the specific language governing * permissions and limitations under the License. * - * @author: Sriram Rao (Kosmix Corp.) * * We need to provide the ability to the code in fs/kfs without really * having a KFS deployment. For this purpose, use the LocalFileSystem Modified: hadoop/core/branches/HADOOP-4687/core/src/test/core/org/apache/hadoop/fs/kfs/TestKosmosFileSystem.java URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/core/src/test/core/org/apache/hadoop/fs/kfs/TestKosmosFileSystem.java?rev=784965&r1=784964&r2=784965&view=diff ============================================================================== --- hadoop/core/branches/HADOOP-4687/core/src/test/core/org/apache/hadoop/fs/kfs/TestKosmosFileSystem.java (original) +++ hadoop/core/branches/HADOOP-4687/core/src/test/core/org/apache/hadoop/fs/kfs/TestKosmosFileSystem.java Mon Jun 15 20:28:06 2009 @@ -12,7 +12,6 @@ * implied. See the License for the specific language governing * permissions and limitations under the License. * - * @author: Sriram Rao (Kosmix Corp.) * * Unit tests for testing the KosmosFileSystem API implementation. 
*/ Modified: hadoop/core/branches/HADOOP-4687/core/src/test/core/org/apache/hadoop/fs/s3/S3FileSystemContractBaseTest.java URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/core/src/test/core/org/apache/hadoop/fs/s3/S3FileSystemContractBaseTest.java?rev=784965&r1=784964&r2=784965&view=diff ============================================================================== --- hadoop/core/branches/HADOOP-4687/core/src/test/core/org/apache/hadoop/fs/s3/S3FileSystemContractBaseTest.java (original) +++ hadoop/core/branches/HADOOP-4687/core/src/test/core/org/apache/hadoop/fs/s3/S3FileSystemContractBaseTest.java Mon Jun 15 20:28:06 2009 @@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystemContractBaseTest; +import org.apache.hadoop.fs.Path; public abstract class S3FileSystemContractBaseTest extends FileSystemContractBaseTest { @@ -45,4 +46,15 @@ super.tearDown(); } + public void testBlockSize() throws Exception { + + long newBlockSize = fs.getDefaultBlockSize() * 2; + fs.getConf().setLong("fs.s3.block.size", newBlockSize); + + Path file = path("/test/hadoop/file"); + createFile(file); + assertEquals("Double default block size", newBlockSize, + fs.getFileStatus(file).getBlockSize()); + } + } Modified: hadoop/core/branches/HADOOP-4687/core/src/test/core/org/apache/hadoop/fs/s3native/InMemoryNativeFileSystemStore.java URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/core/src/test/core/org/apache/hadoop/fs/s3native/InMemoryNativeFileSystemStore.java?rev=784965&r1=784964&r2=784965&view=diff ============================================================================== --- hadoop/core/branches/HADOOP-4687/core/src/test/core/org/apache/hadoop/fs/s3native/InMemoryNativeFileSystemStore.java (original) +++ hadoop/core/branches/HADOOP-4687/core/src/test/core/org/apache/hadoop/fs/s3native/InMemoryNativeFileSystemStore.java Mon Jun 15 20:28:06 2009 @@ -19,6 +19,7 @@ package org.apache.hadoop.fs.s3native; import static org.apache.hadoop.fs.s3native.NativeS3FileSystem.PATH_DELIMITER; + import java.io.BufferedInputStream; import java.io.BufferedOutputStream; import java.io.ByteArrayOutputStream; @@ -122,19 +123,13 @@ public PartialListing list(String prefix, int maxListingLength) throws IOException { - return list(prefix, maxListingLength, null); + return list(prefix, maxListingLength, null, false); } public PartialListing list(String prefix, int maxListingLength, - String priorLastKey) throws IOException { - - return list(prefix, PATH_DELIMITER, maxListingLength, priorLastKey); - } - - public PartialListing listAll(String prefix, int maxListingLength, - String priorLastKey) throws IOException { + String priorLastKey, boolean recursive) throws IOException { - return list(prefix, null, maxListingLength, priorLastKey); + return list(prefix, recursive ? 
null : PATH_DELIMITER, maxListingLength, priorLastKey); } private PartialListing list(String prefix, String delimiter, @@ -174,9 +169,9 @@ dataMap.remove(key); } - public void rename(String srcKey, String dstKey) throws IOException { - metadataMap.put(dstKey, metadataMap.remove(srcKey)); - dataMap.put(dstKey, dataMap.remove(srcKey)); + public void copy(String srcKey, String dstKey) throws IOException { + metadataMap.put(dstKey, metadataMap.get(srcKey)); + dataMap.put(dstKey, dataMap.get(srcKey)); } public void purge(String prefix) throws IOException { Modified: hadoop/core/branches/HADOOP-4687/core/src/test/core/org/apache/hadoop/fs/s3native/NativeS3FileSystemContractBaseTest.java URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/core/src/test/core/org/apache/hadoop/fs/s3native/NativeS3FileSystemContractBaseTest.java?rev=784965&r1=784964&r2=784965&view=diff ============================================================================== --- hadoop/core/branches/HADOOP-4687/core/src/test/core/org/apache/hadoop/fs/s3native/NativeS3FileSystemContractBaseTest.java (original) +++ hadoop/core/branches/HADOOP-4687/core/src/test/core/org/apache/hadoop/fs/s3native/NativeS3FileSystemContractBaseTest.java Mon Jun 15 20:28:06 2009 @@ -56,4 +56,94 @@ assertEquals(path("/test"), paths[0].getPath()); } + public void testNoTrailingBackslashOnBucket() throws Exception { + assertTrue(fs.getFileStatus(new Path(fs.getUri().toString())).isDir()); + } + + private void createTestFiles(String base) throws IOException { + store.storeEmptyFile(base + "/file1"); + store.storeEmptyFile(base + "/dir/file2"); + store.storeEmptyFile(base + "/dir/file3"); + } + + public void testDirWithDifferentMarkersWorks() throws Exception { + + for (int i = 0; i < 3; i++) { + String base = "test/hadoop" + i; + Path path = path("/" + base); + + createTestFiles(base); + + if (i == 0 ) { + //do nothing, we are testing correctness with no markers + } + else if (i == 1) { + // test for _$folder$ marker + store.storeEmptyFile(base + "_$folder$"); + store.storeEmptyFile(base + "/dir_$folder$"); + } + else if (i == 2) { + // test the end slash file marker + store.storeEmptyFile(base + "/"); + store.storeEmptyFile(base + "/dir/"); + } + else if (i == 3) { + // test both markers + store.storeEmptyFile(base + "_$folder$"); + store.storeEmptyFile(base + "/dir_$folder$"); + store.storeEmptyFile(base + "/"); + store.storeEmptyFile(base + "/dir/"); + } + + assertTrue(fs.getFileStatus(path).isDir()); + assertEquals(2, fs.listStatus(path).length); + } + } + + public void testDeleteWithNoMarker() throws Exception { + String base = "test/hadoop"; + Path path = path("/" + base); + + createTestFiles(base); + + fs.delete(path, true); + + path = path("/test"); + assertTrue(fs.getFileStatus(path).isDir()); + assertEquals(0, fs.listStatus(path).length); + } + + public void testRenameWithNoMarker() throws Exception { + String base = "test/hadoop"; + Path dest = path("/test/hadoop2"); + + createTestFiles(base); + + fs.rename(path("/" + base), dest); + + Path path = path("/test"); + assertTrue(fs.getFileStatus(path).isDir()); + assertEquals(1, fs.listStatus(path).length); + assertTrue(fs.getFileStatus(dest).isDir()); + assertEquals(2, fs.listStatus(dest).length); + } + + public void testEmptyFile() throws Exception { + store.storeEmptyFile("test/hadoop/file1"); + fs.open(path("/test/hadoop/file1")).close(); + } + + public void testBlockSize() throws Exception { + Path file = path("/test/hadoop/file"); + createFile(file); + assertEquals("Default 
block size", fs.getDefaultBlockSize(), + fs.getFileStatus(file).getBlockSize()); + + // Block size is determined at read time + long newBlockSize = fs.getDefaultBlockSize() * 2; + fs.getConf().setLong("fs.s3n.block.size", newBlockSize); + assertEquals("Double default block size", newBlockSize, + fs.getFileStatus(file).getBlockSize()); + } + } Modified: hadoop/core/branches/HADOOP-4687/core/src/test/core/org/apache/hadoop/util/TestProcfsBasedProcessTree.java URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/core/src/test/core/org/apache/hadoop/util/TestProcfsBasedProcessTree.java?rev=784965&r1=784964&r2=784965&view=diff ============================================================================== --- hadoop/core/branches/HADOOP-4687/core/src/test/core/org/apache/hadoop/util/TestProcfsBasedProcessTree.java (original) +++ hadoop/core/branches/HADOOP-4687/core/src/test/core/org/apache/hadoop/util/TestProcfsBasedProcessTree.java Mon Jun 15 20:28:06 2009 @@ -19,6 +19,7 @@ package org.apache.hadoop.util; import java.io.BufferedReader; +import java.io.BufferedWriter; import java.io.File; import java.io.FileNotFoundException; import java.io.FileReader; @@ -29,6 +30,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.util.Shell.ExitCodeException; import org.apache.hadoop.util.Shell.ShellCommandExecutor; @@ -231,4 +233,236 @@ } return pid; } + + public static class ProcessStatInfo { + // sample stat in a single line : 3910 (gpm) S 1 3910 3910 0 -1 4194624 + // 83 0 0 0 0 0 0 0 16 0 1 0 7852 2408448 88 4294967295 134512640 + // 134590050 3220521392 3220520036 10975138 0 0 4096 134234626 + // 4294967295 0 0 17 1 0 0 + String pid; + String name; + String ppid; + String pgrpId; + String session; + String vmem; + + public ProcessStatInfo(String[] statEntries) { + pid = statEntries[0]; + name = statEntries[1]; + ppid = statEntries[2]; + pgrpId = statEntries[3]; + session = statEntries[4]; + vmem = statEntries[5]; + } + + // construct a line that mimics the procfs stat file. + // all unused numerical entries are set to 0. + public String getStatLine() { + return String.format("%s (%s) S %s %s %s 0 0 0" + + " 0 0 0 0 0 0 0 0 0 0 0 0 0 %s 0 0 0" + + " 0 0 0 0 0 0 0 0" + + " 0 0 0 0 0", + pid, name, ppid, pgrpId, session, vmem); + } + } + + /** + * A basic test that creates a few process directories and writes + * stat files. Verifies that the virtual memory is correctly + * computed. + * @throws IOException if there was a problem setting up the + * fake procfs directories or files. + */ + public void testVirtualMemoryForProcessTree() throws IOException { + + // test processes + String[] pids = { "100", "200", "300", "400" }; + // create the fake procfs root directory. + File procfsRootDir = new File(TEST_ROOT_DIR, "proc"); + + try { + setupProcfsRootDir(procfsRootDir); + setupPidDirs(procfsRootDir, pids); + + // create stat objects. + // assuming processes 100, 200, 300 are in tree and 400 is not. 
+ ProcessStatInfo[] procInfos = new ProcessStatInfo[4]; + procInfos[0] = new ProcessStatInfo(new String[] + {"100", "proc1", "1", "100", "100", "100000"}); + procInfos[1] = new ProcessStatInfo(new String[] + {"200", "proc2", "100", "100", "100", "200000"}); + procInfos[2] = new ProcessStatInfo(new String[] + {"300", "proc3", "200", "100", "100", "300000"}); + procInfos[3] = new ProcessStatInfo(new String[] + {"400", "proc4", "1", "400", "400", "400000"}); + + writeStatFiles(procfsRootDir, pids, procInfos); + + // crank up the process tree class. + ProcfsBasedProcessTree processTree = + new ProcfsBasedProcessTree("100", true, 100L, + procfsRootDir.getAbsolutePath()); + // build the process tree. + processTree.getProcessTree(); + + // verify cumulative memory + assertEquals("Cumulative memory does not match", + Long.parseLong("600000"), processTree.getCumulativeVmem()); + } finally { + FileUtil.fullyDelete(procfsRootDir); + } + } + + /** + * Tests that cumulative memory is computed only for + * processes older than a given age. + * @throws IOException if there was a problem setting up the + * fake procfs directories or files. + */ + public void testVMemForOlderProcesses() throws IOException { + // initial list of processes + String[] pids = { "100", "200", "300", "400" }; + // create the fake procfs root directory. + File procfsRootDir = new File(TEST_ROOT_DIR, "proc"); + + try { + setupProcfsRootDir(procfsRootDir); + setupPidDirs(procfsRootDir, pids); + + // create stat objects. + // assuming 100, 200 and 400 are in tree, 300 is not. + ProcessStatInfo[] procInfos = new ProcessStatInfo[4]; + procInfos[0] = new ProcessStatInfo(new String[] + {"100", "proc1", "1", "100", "100", "100000"}); + procInfos[1] = new ProcessStatInfo(new String[] + {"200", "proc2", "100", "100", "100", "200000"}); + procInfos[2] = new ProcessStatInfo(new String[] + {"300", "proc3", "1", "300", "300", "300000"}); + procInfos[3] = new ProcessStatInfo(new String[] + {"400", "proc4", "100", "100", "100", "400000"}); + + writeStatFiles(procfsRootDir, pids, procInfos); + + // crank up the process tree class. + ProcfsBasedProcessTree processTree = + new ProcfsBasedProcessTree("100", true, 100L, + procfsRootDir.getAbsolutePath()); + // build the process tree. + processTree.getProcessTree(); + + // verify cumulative memory + assertEquals("Cumulative memory does not match", + Long.parseLong("700000"), processTree.getCumulativeVmem()); + + // write one more process as child of 100. + String[] newPids = { "500" }; + setupPidDirs(procfsRootDir, newPids); + + ProcessStatInfo[] newProcInfos = new ProcessStatInfo[1]; + newProcInfos[0] = new ProcessStatInfo(new String[] + {"500", "proc5", "100", "100", "100", "500000"}); + writeStatFiles(procfsRootDir, newPids, newProcInfos); + + // check vmem includes the new process. 
+ processTree.getProcessTree(); + assertEquals("Cumulative memory does not include new process", + Long.parseLong("1200000"), processTree.getCumulativeVmem()); + + // however processes older than 1 iteration will retain the older value + assertEquals("Cumulative memory shouldn't have included new process", + Long.parseLong("700000"), processTree.getCumulativeVmem(1)); + + // one more process + newPids = new String[]{ "600" }; + setupPidDirs(procfsRootDir, newPids); + + newProcInfos = new ProcessStatInfo[1]; + newProcInfos[0] = new ProcessStatInfo(new String[] + {"600", "proc6", "100", "100", "100", "600000"}); + writeStatFiles(procfsRootDir, newPids, newProcInfos); + + // refresh process tree + processTree.getProcessTree(); + + // processes older than 2 iterations should be same as before. + assertEquals("Cumulative memory shouldn't have included new processes", + Long.parseLong("700000"), processTree.getCumulativeVmem(2)); + + // processes older than 1 iteration should not include new process, + // but include process 500 + assertEquals("Cumulative memory shouldn't have included new processes", + Long.parseLong("1200000"), processTree.getCumulativeVmem(1)); + + // no processes older than 3 iterations, this should be 0 + assertEquals("Getting non-zero vmem for processes older than 3 iterations", + 0L, processTree.getCumulativeVmem(3)); + } finally { + FileUtil.fullyDelete(procfsRootDir); + } + } + + /** + * Create a directory to mimic the procfs file system's root. + * @param procfsRootDir root directory to create. + * @throws IOException if could not delete the procfs root directory + */ + public static void setupProcfsRootDir(File procfsRootDir) + throws IOException { + // cleanup any existing process root dir. + if (procfsRootDir.exists()) { + assertTrue(FileUtil.fullyDelete(procfsRootDir)); + } + + // create afresh + assertTrue(procfsRootDir.mkdirs()); + } + + /** + * Create PID directories under the specified procfs root directory + * @param procfsRootDir root directory of procfs file system + * @param pids the PID directories to create. + * @throws IOException If PID dirs could not be created + */ + public static void setupPidDirs(File procfsRootDir, String[] pids) + throws IOException { + for (String pid : pids) { + File pidDir = new File(procfsRootDir, pid); + pidDir.mkdir(); + if (!pidDir.exists()) { + throw new IOException ("couldn't make process directory under " + + "fake procfs"); + } else { + LOG.info("created pid dir"); + } + } + } + + /** + * Write stat files under the specified pid directories with data + * setup in the corresponding ProcessStatInfo objects + * @param procfsRootDir root directory of procfs file system + * @param pids the PID directories under which to create the stat file + * @param procs corresponding ProcessStatInfo objects whose data should be + * written to the stat files. + * @throws IOException if stat files could not be written + */ + public static void writeStatFiles(File procfsRootDir, String[] pids, + ProcessStatInfo[] procs) throws IOException { + for (int i=0; i<pids.length; i++) { + File statFile = new File(new File(procfsRootDir, pids[i]), "stat"); + BufferedWriter bw = null; + try { + FileWriter fw = new FileWriter(statFile); + bw = new BufferedWriter(fw); + bw.write(procs[i].getStatLine()); + LOG.info("wrote stat file for " + pids[i] + + " with contents: " + procs[i].getStatLine()); + } finally { + // not handling exception - will throw an error and fail the test. + if (bw != null) { + bw.close(); + } + } + } + } }
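The age bookkeeping exercised by the two tests above deserves a short worked example: every ProcessInfo starts with age 1, each call to getProcessTree() bumps the age of any pid that was also present in the previous snapshot (via updateAge), and getCumulativeVmem(olderThanAge) sums only processes whose age is strictly greater than olderThanAge. A condensed sketch of that arithmetic, written as if it sat inside TestProcfsBasedProcessTree so that TEST_ROOT_DIR and the static helpers above are in scope; the pid and vmem values mirror testVMemForOlderProcesses:

    // Fake procfs with processes 100 (root), 200 and 400 in one tree.
    File procfsRootDir = new File(TEST_ROOT_DIR, "proc");
    setupProcfsRootDir(procfsRootDir);
    String[] pids = { "100", "200", "400" };
    setupPidDirs(procfsRootDir, pids);
    writeStatFiles(procfsRootDir, pids, new ProcessStatInfo[] {
        new ProcessStatInfo(new String[]
            {"100", "proc1", "1", "100", "100", "100000"}),
        new ProcessStatInfo(new String[]
            {"200", "proc2", "100", "100", "100", "200000"}),
        new ProcessStatInfo(new String[]
            {"400", "proc4", "100", "100", "100", "400000"})
    });

    ProcfsBasedProcessTree tree = new ProcfsBasedProcessTree(
        "100", true, 100L, procfsRootDir.getAbsolutePath());

    tree.getProcessTree();      // snapshot 1: all three pids have age 1
    tree.getCumulativeVmem();   // 700000 - every process is older than age 0
    tree.getCumulativeVmem(1);  // 0 - nothing has survived a refresh yet

    tree.getProcessTree();      // snapshot 2: surviving pids bumped to age 2
    tree.getCumulativeVmem(1);  // 700000 - all three are now older than age 1

getCumulativeVmem(1) thus reports only processes that have survived at least one refresh, which is why the tests see 0 for it immediately after the first snapshot.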
