Author: cutting
Date: Thu Mar 23 10:38:47 2006
New Revision: 388229

URL: http://svn.apache.org/viewcvs?rev=388229&view=rev
Log:
Fix for HADOOP-52.  Add username and working-directory to FileSystem and 
JobConf and use these to resolve relative paths.  Contributed by Owen O'Malley.
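
For orientation, a minimal sketch of how the new pieces fit together (the
paths and user name below are illustrative, not part of the patch):

    import java.io.File;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.mapred.JobConf;

    public class WorkingDirExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);

        // Relative paths now resolve against the file system's working
        // directory (on DFS it defaults to /user/<user.name>).
        fs.setWorkingDirectory(new File("/user/alice"));
        fs.mkdirs(new File("input"));            // creates /user/alice/input

        // The JobConf carries the submitting user and working directory so
        // that tasks resolve relative paths the same way the client did.
        JobConf job = new JobConf(conf);
        job.setUser(System.getProperty("user.name"));
        job.setWorkingDirectory(fs.getWorkingDirectory().toString());
      }
    }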

Added:
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestLocalDFS.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestLocalFileSystem.java
Modified:
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormatBase.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobProfile.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/ipc/TestRPC.java
    lucene/hadoop/trunk/src/webapps/mapred/jobtracker.jsp

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java?rev=388229&r1=388228&r2=388229&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java Thu Mar 23 10:38:47 2006
@@ -32,10 +32,9 @@
  * @author Mike Cafarella
  *****************************************************************/
 public class DistributedFileSystem extends FileSystem {
-    private static final String HOME_DIR =
-      "/user/" + System.getProperty("user.name") + "/";
+    private File workingDir = 
+      new File("/user/", System.getProperty("user.name"));
 
-    private Random r = new Random();
     private String name;
 
     DFSClient dfs;
@@ -50,11 +49,24 @@
 
     public String getName() { return name; }
 
-    private UTF8 getPath(File file) {
-      String path = getDFSPath(file);
-      if (!path.startsWith(DFSFile.DFS_FILE_SEPARATOR)) {
-        path = getDFSPath(new File(HOME_DIR, path)); // make absolute
+    public File getWorkingDirectory() {
+      return workingDir;
+    }
+    
+    private File makeAbsolute(File f) {
+      if (f.isAbsolute()) {
+        return f;
+      } else {
+        return new File(workingDir, f.toString());
       }
+    }
+    
+    public void setWorkingDirectory(File dir) {
+      workingDir = makeAbsolute(dir);
+    }
+    
+    private UTF8 getPath(File file) {
+      String path = getDFSPath(makeAbsolute(file));
       return new UTF8(path);
     }
 

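The resolution rule in makeAbsolute() above is plain java.io.File
composition; a small self-contained sketch of the same logic (the working
directory here is hypothetical):

    import java.io.File;

    public class MakeAbsoluteDemo {
      // mirrors DistributedFileSystem.makeAbsolute()
      static File makeAbsolute(File workingDir, File f) {
        return f.isAbsolute() ? f : new File(workingDir, f.toString());
      }

      public static void main(String[] args) {
        File wd = new File("/user/alice");
        System.out.println(makeAbsolute(wd, new File("data/part-0")));
        // prints /user/alice/data/part-0
        System.out.println(makeAbsolute(wd, new File("/tmp/x")));
        // prints /tmp/x -- absolute paths pass through untouched
      }
    }
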
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java?rev=388229&r1=388228&r2=388229&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java Thu Mar 23 10:38:47 2006
@@ -295,6 +295,19 @@
     }
 
     /**
+     * Set the current working directory for the given file system.
+     * All relative paths will be resolved relative to it.
+     * @param new_dir the new working directory
+     */
+    public abstract void setWorkingDirectory(File new_dir);
+    
+    /**
+     * Get the current working directory for the given file system.
+     * @return the directory pathname
+     */
+    public abstract File getWorkingDirectory();
+    
+    /**
      * Make the given file and all non-existent parents into
      * directories.
      */

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java?rev=388229&r1=388228&r2=388229&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java Thu Mar 23 10:38:47 2006
@@ -22,7 +22,6 @@
 
 import org.apache.hadoop.fs.DF;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.UTF8;
 
 /****************************************************************
  * Implement the FileSystem API for the native filesystem.
@@ -30,6 +29,7 @@
  * @author Mike Cafarella
  *****************************************************************/
 public class LocalFileSystem extends FileSystem {
+    private File workingDir = new File(System.getProperty("user.dir"));
     TreeMap sharedLockDataSet = new TreeMap();
     TreeMap nonsharedLockDataSet = new TreeMap();
     TreeMap lockObjSet = new TreeMap();
@@ -109,6 +109,7 @@
     }
     
     public FSInputStream openRaw(File f) throws IOException {
+        f = makeAbsolute(f);
         if (! f.exists()) {
             throw new FileNotFoundException(f.toString());
         }
@@ -151,8 +152,17 @@
       }
     }
 
+    private File makeAbsolute(File f) {
+      if (f.isAbsolute()) {
+        return f;
+      } else {
+        return new File(workingDir, f.toString());
+      }
+    }
+    
     public FSOutputStream createRaw(File f, boolean overwrite)
       throws IOException {
+        f = makeAbsolute(f);
         if (f.exists() && ! overwrite) {
             throw new IOException("File already exists:"+f);
         }
@@ -164,6 +174,8 @@
     }
 
     public boolean renameRaw(File src, File dst) throws IOException {
+        src = makeAbsolute(src);
+        dst = makeAbsolute(dst);
         if (useCopyForRename) {
             FileUtil.copyContents(this, src, dst, true, getConf());
             return fullyDelete(src);
@@ -171,32 +183,54 @@
     }
 
     public boolean deleteRaw(File f) throws IOException {
+        f = makeAbsolute(f);
         if (f.isFile()) {
             return f.delete();
         } else return fullyDelete(f);
     }
 
     public boolean exists(File f) throws IOException {
+        f = makeAbsolute(f);
         return f.exists();
     }
 
     public boolean isDirectory(File f) throws IOException {
+        f = makeAbsolute(f);
         return f.isDirectory();
     }
 
     public long getLength(File f) throws IOException {
+        f = makeAbsolute(f);
         return f.length();
     }
 
     public File[] listFilesRaw(File f) throws IOException {
+        f = makeAbsolute(f);
         return f.listFiles();
     }
 
     public void mkdirs(File f) throws IOException {
+        f = makeAbsolute(f);
         f.mkdirs();
     }
 
+    /**
+     * Set the working directory to the given directory.
+     * Sets both a local variable and the system property.
+     * Note that the system property is only used if the application explicitly
+     * calls java.io.File.getAbsolutePath().
+     */
+    public void setWorkingDirectory(File new_dir) {
+      workingDir = makeAbsolute(new_dir);
+      System.setProperty("user.dir", workingDir.toString());
+    }
+    
+    public File getWorkingDirectory() {
+      return workingDir;
+    }
+    
     public synchronized void lock(File f, boolean shared) throws IOException {
+        f = makeAbsolute(f);
         f.createNewFile();
 
         FileLock lockObj = null;
@@ -213,6 +247,7 @@
     }
 
     public synchronized void release(File f) throws IOException {
+        f = makeAbsolute(f);
         FileLock lockObj = (FileLock) lockObjSet.get(f);
        FileInputStream sharedLockData = (FileInputStream) sharedLockDataSet.get(f);
        FileOutputStream nonsharedLockData = (FileOutputStream) nonsharedLockDataSet.get(f);
@@ -238,6 +273,8 @@
     // In the case of the local filesystem, we can just rename the file.
     public void moveFromLocalFile(File src, File dst) throws IOException {
         if (! src.equals(dst)) {
+            src = makeAbsolute(src);
+            dst = makeAbsolute(dst);
             if (useCopyForRename) {
                 FileUtil.copyContents(this, src, dst, true, getConf());
                 fullyDelete(src);
@@ -248,6 +285,8 @@
     // Similar to moveFromLocalFile(), except the source is kept intact.
     public void copyFromLocalFile(File src, File dst) throws IOException {
         if (! src.equals(dst)) {
+            src = makeAbsolute(src);
+            dst = makeAbsolute(dst);
             FileUtil.copyContents(this, src, dst, true, getConf());
         }
     }
@@ -255,13 +294,15 @@
     // We can't delete the src file in this case.  Too bad.
     public void copyToLocalFile(File src, File dst) throws IOException {
         if (! src.equals(dst)) {
+            src = makeAbsolute(src);
+            dst = makeAbsolute(dst);
             FileUtil.copyContents(this, src, dst, true, getConf());
         }
     }
 
     // We can write output directly to the final location
    public File startLocalOutput(File fsOutputFile, File tmpLocalFile) throws IOException {
-        return fsOutputFile;
+        return makeAbsolute(fsOutputFile);
     }
 
     // It's in the right place - nothing to do.
@@ -270,7 +311,7 @@
 
     // We can read directly from the real local fs.
    public File startLocalInput(File fsInputFile, File tmpLocalFile) throws IOException {
-        return fsInputFile;
+        return makeAbsolute(fsInputFile);
     }
 
     // We're done reading.  Nothing to clean up.
@@ -292,6 +333,7 @@
      * @throws IOException
      */
     private boolean fullyDelete(File dir) throws IOException {
+        dir = makeAbsolute(dir);
         File contents[] = dir.listFiles();
         if (contents != null) {
             for (int i = 0; i < contents.length; i++) {
@@ -315,7 +357,7 @@
                                       long start, long length, int crc) {
       try {
         // canonicalize f   
-        f = f.getCanonicalFile();
+        f = makeAbsolute(f).getCanonicalFile();
       
         // find highest writable parent dir of f on the same device
         String device = new DF(f.toString()).getMount();

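A note on the user.dir trick in setWorkingDirectory() above: java.io.File
consults the user.dir system property when turning a relative path into an
absolute one, but some JVMs capture the property at startup, so this is
best-effort. A hedged illustration (the directory is hypothetical):

    import java.io.File;

    public class UserDirDemo {
      public static void main(String[] args) {
        System.setProperty("user.dir", "/tmp/work");
        // On JVMs that re-read user.dir this prints /tmp/work/data.txt;
        // others may still resolve against the original startup directory.
        System.out.println(new File("data.txt").getAbsolutePath());
      }
    }
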
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormatBase.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormatBase.java?rev=388229&r1=388228&r2=388229&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormatBase.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormatBase.java Thu Mar 23 10:38:47 2006
@@ -59,9 +59,15 @@
   protected File[] listFiles(FileSystem fs, JobConf job)
     throws IOException {
     File[] dirs = job.getInputDirs();
+    String workDir = job.getWorkingDirectory();
     String subdir = job.get("mapred.input.subdir");
     ArrayList result = new ArrayList();
     for (int i = 0; i < dirs.length; i++) {
+      // if it is relative, make it absolute using the directory from the 
+      // JobConf
+      if (workDir != null && !dirs[i].isAbsolute()) {
+        dirs[i] = new File(workDir, dirs[i].toString());
+      }
       File[] dir = fs.listFiles(dirs[i]);
       if (dir != null) {
         for (int j = 0; j < dir.length; j++) {

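The same resolution appears here at the job level: input directories from
the JobConf are made absolute against mapred.working.dir before listing. A
short sketch of the rewrite (values hypothetical):

    import java.io.File;

    public class InputDirResolution {
      public static void main(String[] args) {
        String workDir = "/user/alice";          // mapred.working.dir
        File dir = new File("books");            // a relative input dir
        if (workDir != null && !dir.isAbsolute()) {
          dir = new File(workDir, dir.toString());
        }
        System.out.println(dir);                 // /user/alice/books
      }
    }
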
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java?rev=388229&r1=388228&r2=388229&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java Thu Mar 23 10:38:47 2006
@@ -247,7 +247,14 @@
           getFs().copyFromLocalFile(new File(originalJarPath), submitJarFile);
         }
 
-        FileSystem fs = getFs();
+        FileSystem fileSys = getFs();
+
+        // Set the user's name and working directory
+        String user = System.getProperty("user.name");
+        job.setUser(user != null ? user : "Dr Who");
+        if (job.getWorkingDirectory() == null) {
+          job.setWorkingDirectory(fileSys.getWorkingDirectory().toString());
+        }
 
         // Ensure that the output directory is set and not already there
         File outDir = job.getOutputDir();
@@ -260,7 +267,7 @@
         }
 
         // Write job file to JobTracker's fs        
-        FSDataOutputStream out = fs.create(submitJobFile);
+        FSDataOutputStream out = fileSys.create(submitJobFile);
         try {
           job.write(out);
         } finally {

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java?rev=388229&r1=388228&r2=388229&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java Thu Mar 23 10:38:47 2006
@@ -143,6 +143,38 @@
     return result;
   }
 
+  /**
+   * Get the reported username for this job.
+   * @return the username
+   */
+  public String getUser() {
+    return get("user.name");
+  }
+  
+  /**
+   * Set the reported username for this job.
+   * @param user the username
+   */
+  public void setUser(String user) {
+    set("user.name", user);
+  }
+  
+  /**
+   * Set the current working directory for the default file system.
+   * @param dir the new current working directory
+   */
+  public void setWorkingDirectory(String dir) {
+    set("mapred.working.dir", dir);
+  }
+  
+  /**
+   * Get the current working directory for the default file system.
+   * @return the directory name
+   */
+  public String getWorkingDirectory() {
+    return get("mapred.working.dir"); 
+  }
+  
   public File getOutputDir() { 
     String name = get("mapred.output.dir");
     return name == null ? null: new File(name);

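The new accessors are thin wrappers over configuration properties
("user.name" and "mapred.working.dir"); a round trip looks like this
(values hypothetical):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapred.JobConf;

    public class JobConfDemo {
      public static void main(String[] args) {
        JobConf job = new JobConf(new Configuration());
        job.setUser("alice");                      // sets "user.name"
        job.setWorkingDirectory("/user/alice");    // sets "mapred.working.dir"
        System.out.println(job.getUser());              // alice
        System.out.println(job.getWorkingDirectory());  // /user/alice
      }
    }
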
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=388229&r1=388228&r2=388229&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Thu Mar 23 10:38:47 2006
@@ -62,7 +62,6 @@
         String jobid = "job_" + jobtracker.createUniqueId();
        String url = "http://" + jobtracker.getJobTrackerMachine() + ":" + jobtracker.getInfoPort() + "/jobdetails.jsp?jobid=" + jobid;
         this.jobtracker = jobtracker;
-        this.profile = new JobProfile(jobid, jobFile, url);
         this.status = new JobStatus(jobid, 0.0f, 0.0f, JobStatus.PREP);
         this.startTime = System.currentTimeMillis();
 
@@ -75,6 +74,7 @@
         fs.copyToLocalFile(new File(jobFile), localJobFile);
 
         conf = new JobConf(localJobFile);
+        this.profile = new JobProfile(conf.getUser(), jobid, jobFile, url);
 
         String jarFile = conf.getJar();
         if (jarFile != null) {

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobProfile.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobProfile.java?rev=388229&r1=388228&r2=388229&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobProfile.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobProfile.java Thu Mar 23 10:38:47 2006
@@ -36,6 +36,7 @@
          });
     }
 
+    String user;
     String jobid;
     String jobFile;
     String url;
@@ -47,13 +48,21 @@
 
     /**
      */
-    public JobProfile(String jobid, String jobFile, String url) {
+    public JobProfile(String user, String jobid, String jobFile, String url) {
+        this.user = user;
         this.jobid = jobid;
         this.jobFile = jobFile;
         this.url = url;
     }
 
     /**
+     * Get the user id.
+     */
+    public String getUser() {
+      return user;
+    }
+    
+    /**
      */
     public String getJobId() {
         return jobid;
@@ -83,11 +92,13 @@
         UTF8.writeString(out, jobid);
         UTF8.writeString(out, jobFile);
         UTF8.writeString(out, url);
+        UTF8.writeString(out, user);
     }
     public void readFields(DataInput in) throws IOException {
         this.jobid = UTF8.readString(in);
         this.jobFile = UTF8.readString(in);
         this.url = UTF8.readString(in);
+        this.user = UTF8.readString(in);
     }
 }
 

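Note the wire format: user is appended after jobid, jobFile, and url in both
write() and readFields(), so the trailing field is the only difference from
the old layout. A sketch of that ordering with plain DataOutput/DataInput
(JobProfile itself uses Hadoop's UTF8 helpers; the values are hypothetical):

    import java.io.*;

    public class FieldOrderDemo {
      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        out.writeUTF("job_0001");               // jobid
        out.writeUTF("/jobs/job.xml");          // jobFile
        out.writeUTF("http://localhost:8080/"); // url
        out.writeUTF("alice");                  // user, written last
        DataInputStream in =
            new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
        in.readUTF(); in.readUTF(); in.readUTF();  // skip the first three
        System.out.println(in.readUTF());          // alice
      }
    }
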
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java?rev=388229&r1=388228&r2=388229&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java Thu Mar 23 10:38:47 2006
@@ -17,7 +17,6 @@
 package org.apache.hadoop.mapred;
 
 import java.io.*;
-import java.util.*;
 
 /** 
 * Protocol that a JobClient and the central JobTracker use to communicate.  The

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?rev=388229&r1=388228&r2=388229&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Thu Mar 23 10:38:47 2006
@@ -44,6 +44,7 @@
     private JobStatus status = new JobStatus();
     private ArrayList mapIds = new ArrayList();
     private MapOutputFile mapoutputFile;
+    private JobProfile profile;
 
     public Job(String file, Configuration conf) throws IOException {
       this.file = file;
@@ -54,8 +55,8 @@
      File localFile = new JobConf(conf).getLocalFile("localRunner", id+".xml");
       fs.copyToLocalFile(new File(file), localFile);
       this.job = new JobConf(localFile);
-      
-      
+      profile = new JobProfile(job.getUser(), id, file, 
+                               "http://localhost:8080/");
       this.status.jobid = id;
       this.status.runState = JobStatus.RUNNING;
 
@@ -64,15 +65,30 @@
       this.start();
     }
 
+    JobProfile getProfile() {
+      return profile;
+    }
+    
+    private void setWorkingDirectory(JobConf conf, FileSystem fs) {
+      String dir = conf.getWorkingDirectory();
+      if (dir != null) {
+        fs.setWorkingDirectory(new File(dir));
+      }
+    }
+    
     public void run() {
       try {
         // split input into minimum number of splits
-        FileSplit[] splits = job.getInputFormat().getSplits(fs, job, 1);
+        FileSplit[] splits;
+        setWorkingDirectory(job, fs);
+        splits = job.getInputFormat().getSplits(fs, job, 1);
 
+        
         // run a map task for each split
         job.setNumReduceTasks(1);                 // force a single reduce task
         for (int i = 0; i < splits.length; i++) {
           mapIds.add("map_" + newId());
+          setWorkingDirectory(job, fs);
           MapTask map = new MapTask(file, (String)mapIds.get(i), splits[i]);
           map.setConf(job);
           map_tasks += 1;
@@ -97,10 +113,9 @@
         for (int i = 0; i < mapIds.size(); i++) {
             mapDependencies[i][0] = (String) mapIds.get(i);
         }
-        ReduceTask reduce =
-          new ReduceTask(file, reduceId,
-                         mapDependencies,
-                         0);
+        setWorkingDirectory(job, fs);
+        ReduceTask reduce = new ReduceTask(file, reduceId,
+                                           mapDependencies, 0);
         reduce.setConf(job);
         reduce_tasks += 1;
         reduce.run(job, this);
@@ -172,7 +187,7 @@
 
   public JobProfile getJobProfile(String id) {
     Job job = (Job)jobs.get(id);
-    return new JobProfile(id, job.file, "http://localhost:8080/");
+    return job.getProfile();
   }
 
   public TaskReport[] getMapTaskReports(String id) {

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=388229&r1=388228&r2=388229&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Thu Mar 23 10:38:47 2006
@@ -700,6 +700,12 @@
           startPinging(umbilical, taskid);        // start pinging parent
 
           try {
+              // If the user set a working directory, use it
+              String workDir = job.getWorkingDirectory();
+              if (workDir != null) {
+                FileSystem file_sys = FileSystem.get(job);
+                file_sys.setWorkingDirectory(new File(workDir));
+              }
               task.run(job, umbilical);           // run the task
           } catch (FSError e) {
             LOG.log(Level.SEVERE, "FSError from child", e);

Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java?rev=388229&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java Thu Mar 23 10:38:47 2006
@@ -0,0 +1,124 @@
+package org.apache.hadoop.dfs;
+
+import java.io.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+
+/**
+ * This class creates a single-process DFS cluster for junit testing.
+ * One thread is created for each server.
+ * The data directories for DFS are under the testing directory.
+ * @author Owen O'Malley
+ */
+public class MiniDFSCluster {
+
+  private Configuration conf;
+  private Thread nameNodeThread;
+  private Thread dataNodeThread;
+  private NameNodeRunner nameNode;
+  private DataNodeRunner dataNode;
+
+  /**
+   * An inner class that runs a name node.
+   */
+  class NameNodeRunner implements Runnable {
+    private NameNode node;
+    
+    /**
+     * Create the name node and run it.
+     */
+    public void run() {
+      try {
+        node = new NameNode(conf);
+      } catch (Throwable e) {
+        node = null;
+        System.err.println("Name node crashed:");
+        e.printStackTrace();
+      }
+    }
+    
+    /**
+     * Shutdown the name node and wait for it to finish.
+     */
+    public void shutdown() {
+      if (node != null) {
+        node.stop();
+        node.join();
+      }
+    }
+  }
+  
+  /**
+   * An inner class to run the data node.
+   */
+  class DataNodeRunner implements Runnable {
+    private DataNode node;
+    
+    /**
+     * Create and run the data node.
+     */
+    public void run() {
+      try {
+        File dataDir = new File(conf.get("dfs.data.dir"));
+        dataDir.mkdirs();
+        node = new DataNode(conf, dataDir.getPath());
+        node.run();
+      } catch (Throwable e) {
+        node = null;
+        System.err.println("Data node crashed:");
+        e.printStackTrace();
+      }
+    }
+
+    /**    
+     * Shut down the server and wait for it to finish.
+     */
+    public void shutdown() {
+      if (node != null) {
+        node.shutdown();
+      }
+    }
+  }
+  
+  /**
+   * Create the config and start up the servers.
+   */
+  public MiniDFSCluster(int namenodePort, Configuration conf) {
+    this.conf = conf;
+    conf.set("fs.default.name", 
+             "localhost:"+ Integer.toString(namenodePort));
+    File base_dir = new File(System.getProperty("test.build.data"),
+                             "dfs/");
+    conf.set("dfs.name.dir", new File(base_dir, "name").getPath());
+    conf.set("dfs.data.dir", new File(base_dir, "data").getPath());
+    conf.setInt("dfs.replication", 1);
+    // this timeout seems to control the minimum time for the test, so
+    // set it down to 5 seconds.
+    conf.setInt("ipc.client.timeout", 5000);
+    nameNode = new NameNodeRunner();
+    nameNodeThread = new Thread(nameNode);
+    nameNodeThread.start();
+    dataNode = new DataNodeRunner();
+    dataNodeThread = new Thread(dataNode);
+    dataNodeThread.start();
+    try {                                     // let daemons get started
+      Thread.sleep(1000);
+    } catch(InterruptedException e) {
+    }
+  }
+  
+  /**
+   * Shut down the servers.
+   */
+  public void shutdown() {
+    nameNode.shutdown();
+    dataNode.shutdown();
+  }
+  
+  /**
+   * Get a client handle to the DFS cluster.
+   */
+  public FileSystem getFileSystem() throws IOException {
+    return FileSystem.get(conf);
+  }
+}

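A typical use brackets the cluster around a test body, as TestLocalDFS below
does (the port number is arbitrary):

    Configuration conf = new Configuration();
    MiniDFSCluster cluster = new MiniDFSCluster(65312, conf);
    try {
      FileSystem fs = cluster.getFileSystem();
      // ... exercise the file system ...
    } finally {
      cluster.shutdown();
    }
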
Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestLocalDFS.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestLocalDFS.java?rev=388229&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestLocalDFS.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestLocalDFS.java Thu Mar 23 10:38:47 2006
@@ -0,0 +1,64 @@
+package org.apache.hadoop.dfs;
+
+import junit.framework.TestCase;
+import java.io.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+
+/**
+ * This class tests the DFS class via the FileSystem interface in a single node
+ * mini-cluster.
+ * @author Owen O'Malley
+ */
+public class TestLocalDFS extends TestCase {
+
+  private void writeFile(FileSystem fileSys, File name) throws IOException {
+    DataOutputStream stm = fileSys.create(name);
+    stm.writeBytes("oom");
+    stm.close();
+  }
+  
+  private void readFile(FileSystem fileSys, File name) throws IOException {
+    DataInputStream stm = fileSys.open(name);
+    byte[] buffer = new byte[4];
+    int bytesRead = stm.read(buffer, 0, 4);
+    assertEquals("oom", new String(buffer, 0, bytesRead));
+    stm.close();
+  }
+  
+  private void cleanupFile(FileSystem fileSys, File name) throws IOException {
+    assertTrue(fileSys.exists(name));
+    fileSys.delete(name);
+    assertTrue(!fileSys.exists(name));
+  }
+  
+  /**
+   * Tests get/set working directory in DFS.
+   */
+  public void testWorkingDirectory() throws IOException {
+    Configuration conf = new Configuration();
+    MiniDFSCluster cluster = new MiniDFSCluster(65312, conf);
+    FileSystem fileSys = cluster.getFileSystem();
+    try {
+      File orig_path = fileSys.getWorkingDirectory();
+      assertTrue(orig_path.isAbsolute());
+      File file1 = new File("somewhat/random.txt");
+      writeFile(fileSys, file1);
+      assertTrue(fileSys.exists(new File(orig_path, file1.getPath())));
+      fileSys.delete(file1);
+      File subdir1 = new File("/somewhere");
+      fileSys.setWorkingDirectory(subdir1);
+      writeFile(fileSys, file1);
+      cleanupFile(fileSys, new File(subdir1, file1.getPath()));
+      File subdir2 = new File("else");
+      fileSys.setWorkingDirectory(subdir2);
+      writeFile(fileSys, file1);
+      readFile(fileSys, file1);
+      cleanupFile(fileSys, new File(new File(subdir1, subdir2.getPath()),
+                                     file1.getPath()));
+    } finally {
+      fileSys.close();
+      cluster.shutdown();
+    }
+  }
+}

Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestLocalFileSystem.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestLocalFileSystem.java?rev=388229&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestLocalFileSystem.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestLocalFileSystem.java Thu Mar 23 10:38:47 2006
@@ -0,0 +1,85 @@
+package org.apache.hadoop.fs;
+
+import org.apache.hadoop.conf.Configuration;
+import java.io.*;
+import junit.framework.*;
+
+/**
+ * This class tests the local file system via the FileSystem abstraction.
+ * @author Owen O'Malley
+ */
+public class TestLocalFileSystem extends TestCase {
+
+  private void writeFile(FileSystem fs, File name) throws IOException {
+    FSDataOutputStream stm = fs.create(name);
+    stm.writeBytes("42\n");
+    stm.close();
+  }
+  
+  private void cleanupFile(FileSystem fs, File name) throws IOException {
+    assertTrue(fs.exists(name));
+    fs.delete(name);
+    assertTrue(!fs.exists(name));
+  }
+  
+  /**
+   * Test the capability of setting the working directory.
+   */
+  public void testWorkingDirectory() throws IOException {
+    Configuration conf = new Configuration();
+    FileSystem fileSys = FileSystem.getNamed("local", conf);
+    File origDir = fileSys.getWorkingDirectory();
+    File subdir = new File("build/test/data/work-dir/new subdir");
+    File subdirAbsolute = subdir.getAbsoluteFile();
+    try {
+      // make sure it doesn't already exist
+      assertTrue(!fileSys.exists(subdir));
+      // make it and check for it
+      fileSys.mkdirs(subdir);
+      assertTrue(fileSys.isDirectory(subdir));
+      
+      fileSys.setWorkingDirectory(subdir);
+      
+      // create a directory and check for it
+      File dir1 = new File("dir1");
+      File dir1Absolute = dir1.getAbsoluteFile();
+      fileSys.mkdirs(dir1);
+      assertTrue(fileSys.isDirectory(dir1));
+      assertTrue(fileSys.isDirectory(dir1Absolute));
+      
+      // delete the directory and make sure it went away
+      fileSys.delete(dir1);
+      assertTrue(!fileSys.exists(dir1));
+      assertTrue(!fileSys.exists(dir1Absolute));
+      
+      // create files and manipulate them.
+      File file1 = new File("file1");
+      File file2 = new File("sub/file2");
+      File file2_abs = file2.getAbsoluteFile();
+      assertEquals(file2_abs, new File(subdirAbsolute, file2.getPath()));
+      writeFile(fileSys, file1);
+      fileSys.copyFromLocalFile(file1, file2);
+      assertTrue(fileSys.exists(file1));
+      assertTrue(fileSys.isFile(file1));
+      cleanupFile(fileSys, file2_abs);
+      fileSys.copyToLocalFile(file1, file2);
+      cleanupFile(fileSys, file2_abs);
+      
+      // try a rename
+      fileSys.rename(file1, file2);
+      assertTrue(!fileSys.exists(file1));
+      assertTrue(fileSys.exists(file2_abs));
+      fileSys.rename(file2, file1);
+      
+      // try reading a file
+      InputStream stm = fileSys.openRaw(file1);
+      byte[] buffer = new byte[3];
+      int bytesRead = stm.read(buffer, 0, 3);
+      assertEquals("42\n", new String(buffer, 0, bytesRead));
+      stm.close();
+    } finally {
+      fileSys.setWorkingDirectory(origDir);
+      fileSys.delete(subdir);
+    }
+  }
+}

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/ipc/TestRPC.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/test/org/apache/hadoop/ipc/TestRPC.java?rev=388229&r1=388228&r2=388229&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/ipc/TestRPC.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/ipc/TestRPC.java Thu Mar 23 10:38:47 2006
@@ -40,6 +40,7 @@
 
   // quiet during testing, since output ends up on console
   static {
+    conf.setInt("ipc.client.timeout", 5000);
     LOG.setLevel(Level.WARNING);
     Client.LOG.setLevel(Level.WARNING);
     Server.LOG.setLevel(Level.WARNING);

Modified: lucene/hadoop/trunk/src/webapps/mapred/jobtracker.jsp
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/webapps/mapred/jobtracker.jsp?rev=388229&r1=388228&r2=388229&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/webapps/mapred/jobtracker.jsp (original)
+++ lucene/hadoop/trunk/src/webapps/mapred/jobtracker.jsp Thu Mar 23 10:38:47 2006
@@ -4,11 +4,13 @@
   import="javax.servlet.http.*"
   import="java.io.*"
   import="java.util.*"
+  import="java.text.DecimalFormat"
   import="org.apache.hadoop.mapred.*"
 %>
 <%!
   JobTracker tracker = JobTracker.getTracker();
  String trackerLabel = tracker.getJobTrackerMachine() + ":" + tracker.getTrackerPort();
+  private static DecimalFormat percentFormat = new DecimalFormat("##0.00");
 
   public void generateTaskTrackerTable(JspWriter out) throws IOException {
     Collection c = tracker.taskTrackers();
@@ -44,14 +46,16 @@
       out.print("<center>\n");
       out.print("<table border=\"2\" cellpadding=\"5\" cellspacing=\"2\">\n");
       out.print("<tr><td align=\"center\" colspan=\"8\"><b>" + label + " Jobs 
</b></td></tr>\n");
-
       if (jobs.size() > 0) {
-        out.print("<tr><td><b>Jobid</b></td><td><b>% complete</b></td><td><b>Required maps</b></td><td><b>maps completed</b></td><td><b>Required reduces</b></td><td><b>reduces completed</b></td></tr>\n");
+        out.print("<tr><td><b>Jobid</b></td><td><b>User</b></td>");
+        out.print("<td><b>% complete</b></td><td><b>Required maps</b></td>");
+        out.print("<td><b>maps completed</b></td>");
+        out.print("<td><b>Required reduces</b></td>");
+        out.print("<td><b>reduces completed</b></td></tr>\n");
         for (Iterator it = jobs.iterator(); it.hasNext(); ) {
           JobInProgress job = (JobInProgress) it.next();
           JobProfile profile = job.getProfile();
           JobStatus status = job.getStatus();
-
           String jobid = profile.getJobId();
           double completedRatio = (0.5 * (100 * status.mapProgress())) +
                                  (0.5 * (100 * status.reduceProgress()));
@@ -61,7 +65,12 @@
           int completedMaps = job.finishedMaps();
           int completedReduces = job.finishedReduces();
 
-          out.print("<tr><td><a href=\"jobdetails.jsp?jobid=" + jobid + "\">" + jobid + "</a></td><td>" + completedRatio + "%</td><td>" + desiredMaps + "</td><td>" + completedMaps + "</td><td>" + desiredReduces + "</td><td> " + completedReduces + "</td></tr>\n");
+          out.print("<tr><td><a href=\"jobdetails.jsp?jobid=" + jobid + "\">" +
+                    jobid + "</a></td><td>"+ profile.getUser() + "</td><td>" +
+                    percentFormat.format(completedRatio) + "%</td><td>" + 
+                    desiredMaps + "</td><td>" + completedMaps + "</td><td>" + 
+                    desiredReduces + "</td><td> " + completedReduces + 
+                    "</td></tr>\n");
         }
       } else {
         out.print("<tr><td align=\"center\" 
colspan=\"8\"><i>none</i></td></tr>\n");

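The DecimalFormat pattern "##0.00" keeps at least one integer digit and
exactly two fraction digits, which tidies the completion percentage that was
previously printed raw. For example:

    System.out.println(new java.text.DecimalFormat("##0.00").format(66.66666)); // 66.67
    System.out.println(new java.text.DecimalFormat("##0.00").format(0.5));      // 0.50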
