Author: cutting
Date: Wed Feb 15 10:50:55 2006
New Revision: 378058

URL: http://svn.apache.org/viewcvs?rev=378058&view=rev
Log:
Fix HADOOP-38: Add FileSystem.getBlockSize() method and use it as the maximum 
split size.  Also change FileSystem to implement Configurable, and improve some 
javadoc, using inherited comments where possible and removing implementation 
details from public javadoc.

Modified:
    
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormatBase.java

Modified: 
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java
URL: 
http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java?rev=378058&r1=378057&r2=378058&view=diff
==============================================================================
--- 
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java 
(original)
+++ 
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java 
Wed Feb 15 10:50:55 2006
@@ -29,9 +29,6 @@
  * This object is the way end-user code interacts with a Hadoop
  * DistributedFileSystem.
  *
- * It's substantially a wrapper around the DFSClient class, with
- * a few extra functions.
- *
  * @author Mike Cafarella
  *****************************************************************/
 public class DistributedFileSystem extends FileSystem {
@@ -43,9 +40,7 @@
 
     DFSClient dfs;
 
-    /**
-     * Create the ShareSet automatically, and then go on to
-     * the regular constructor.
+    /** Construct a client for the filesystem at <code>namenode</code>.
      */
     public DistributedFileSystem(InetSocketAddress namenode, Configuration 
conf) throws IOException {
       super(conf);
@@ -90,27 +85,19 @@
         return dfs.delete(getPath(f));
     }
 
-    /**
-     */
     public boolean exists(File f) throws IOException {
         return dfs.exists(getPath(f));
     }
 
-    /**
-     */
     public boolean isDirectory(File f) throws IOException {
         return dfs.isDirectory(getPath(f));
     }
 
-    /**
-     */
     public long getLength(File f) throws IOException {
         DFSFileInfo info[] = dfs.listFiles(getPath(f));
         return info[0].getLen();
     }
 
-    /**
-     */
     public File[] listFilesRaw(File f) throws IOException {
         DFSFileInfo info[] = dfs.listFiles(getPath(f));
         if (info == null) {
@@ -124,36 +111,22 @@
         }
     }
 
-    /**
-     */
     public void mkdirs(File f) throws IOException {
         dfs.mkdirs(getPath(f));
     }
 
-    /**
-     * Obtain a filesystem lock at File f.
-     */
     public void lock(File f, boolean shared) throws IOException {
         dfs.lock(getPath(f), ! shared);
     }
 
-    /**
-     * Release a held lock
-     */
     public void release(File f) throws IOException {
         dfs.release(getPath(f));
     }
 
-    /**
-     * Remove the src when finished.
-     */
     public void moveFromLocalFile(File src, File dst) throws IOException {
         doFromLocalFile(src, dst, true);
     }
 
-    /**
-     * keep the src when finished.
-     */
     public void copyFromLocalFile(File src, File dst) throws IOException {
         doFromLocalFile(src, dst, false);
     }
@@ -177,7 +150,7 @@
                 doFromLocalFile(contents[i], new File(dst, 
contents[i].getName()), deleteSource);
             }
         } else {
-            byte buf[] = new byte[this.conf.getInt("io.file.buffer.size", 
4096)];
+            byte buf[] = new byte[getConf().getInt("io.file.buffer.size", 
4096)];
             InputStream in = new BufferedInputStream(new FileInputStream(src));
             try {
                 OutputStream out = create(dst);
@@ -198,10 +171,6 @@
             src.delete();
     }
 
-    /**
-     * Takes a hierarchy of files from the FS system and writes to
-     * the given local target.
-     */
     public void copyToLocalFile(File src, File dst) throws IOException {
         if (dst.exists()) {
             if (! dst.isDirectory()) {
@@ -222,10 +191,10 @@
                 copyToLocalFile(contents[i], new File(dst, 
contents[i].getName()));
             }
         } else {
-            byte buf[] = new byte[this.conf.getInt("io.file.buffer.size", 
4096)];
+            byte buf[] = new byte[getConf().getInt("io.file.buffer.size", 
4096)];
             InputStream in = open(src);
             try {
-                OutputStream out = FileSystem.getNamed("local", 
this.conf).create(dst);
+                OutputStream out = FileSystem.getNamed("local", 
getConf()).create(dst);
                 try {
                     int bytesRead = in.read(buf);
                     while (bytesRead >= 0) {
@@ -241,10 +210,6 @@
         }
     }
 
-    /**
-     * Output will go to the tmp working area.  There may be some source
-     * material that we obtain first.
-     */
     public File startLocalOutput(File fsOutputFile, File tmpLocalFile) throws 
IOException {
         if (exists(fsOutputFile)) {
             copyToLocalFile(fsOutputFile, tmpLocalFile);
@@ -272,25 +237,18 @@
      */
     public void completeLocalInput(File localFile) throws IOException {
         // Get rid of the local copy - we don't need it anymore.
-        FileUtil.fullyDelete(localFile, this.conf);
+        FileUtil.fullyDelete(localFile, getConf());
     }
 
-    /**
-     * Shut down the FS.  Not necessary for regular filesystem.
-     */
     public void close() throws IOException {
         dfs.close();
     }
 
-    /**
-     */
     public String toString() {
         return "DFS[" + dfs + "]";
     }
 
-    /**
-     */
-    public DFSClient getClient() {
+    DFSClient getClient() {
         return dfs;
     }
     
@@ -321,4 +279,9 @@
       // directory on their datanode, and then re-replicate the blocks, so that
       // no data is lost. a task may fail, but on retry it should succeed.
     }
+
+    public long getBlockSize() {
+      return dfs.BLOCK_SIZE;
+    }
+
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java
URL: 
http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java?rev=378058&r1=378057&r2=378058&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java Wed Feb 
15 10:50:55 2006
@@ -43,7 +43,7 @@
  * implementation is {@link DistributedFileSystem}.
  * @author Mike Cafarella
  *****************************************************************/
-public abstract class FileSystem {
+public abstract class FileSystem extends Configured {
     public static final Logger LOG = 
LogFormatter.getLogger("org.apache.hadoop.dfs.DistributedFileSystem");
 
     private static final HashMap NAME_TO_FS = new HashMap();
@@ -87,7 +87,6 @@
       return getNamed(conf.get("fs.default.name", "local"), conf);
     }
 
-    protected Configuration conf;
     /** Returns a name for this filesystem, suitable to pass to {@link
      * FileSystem#getNamed(String,Configuration)}.*/
     public abstract String getName();
@@ -96,7 +95,6 @@
      * host:port pair, naming an DFS name server.*/
     public static FileSystem getNamed(String name, Configuration conf) throws 
IOException {
       FileSystem fs = (FileSystem)NAME_TO_FS.get(name);
-      int ioFileBufferSize = conf.getInt("io.file.buffer.size", 4096);
       if (fs == null) {
         if ("local".equals(name)) {
           fs = new LocalFileSystem(conf);
@@ -122,10 +120,9 @@
     ///////////////////////////////////////////////////////////////
     // FileSystem
     ///////////////////////////////////////////////////////////////
-    /**
-     */
-    public FileSystem(Configuration conf) {
-        this.conf = conf;
+
+    protected FileSystem(Configuration conf) {
+      super(conf);
     }
 
     /**
@@ -146,7 +143,7 @@
      * @param bufferSize the size of the buffer to be used.
      */
     public FSDataInputStream open(File f, int bufferSize) throws IOException {
-      return new FSDataInputStream(this, f, bufferSize, this.conf);
+      return new FSDataInputStream(this, f, bufferSize, getConf());
     }
     
     /**
@@ -154,7 +151,7 @@
      * @param f the file to open
      */
     public FSDataInputStream open(File f) throws IOException {
-      return new FSDataInputStream(this, f, conf);
+      return new FSDataInputStream(this, f, getConf());
     }
 
     /**
@@ -168,7 +165,7 @@
      * Files are overwritten by default.
      */
     public FSDataOutputStream create(File f) throws IOException {
-      return create(f, true,this.conf.getInt("io.file.buffer.size", 4096));
+      return create(f, true, getConf().getInt("io.file.buffer.size", 4096));
     }
 
     /**
@@ -180,7 +177,7 @@
      */
     public FSDataOutputStream create(File f, boolean overwrite,
                                       int bufferSize) throws IOException {
-      return new FSDataOutputStream(this, f, overwrite, this.conf);
+      return new FSDataOutputStream(this, f, overwrite, getConf());
     }
 
     /** Opens an OutputStream at the indicated File.
@@ -256,8 +253,10 @@
      */
     public abstract boolean exists(File f) throws IOException;
 
+    /** True iff the named path is a directory. */
     public abstract boolean isDirectory(File f) throws IOException;
 
+    /** True iff the named path is a regular file. */
     public boolean isFile(File f) throws IOException {
         if (exists(f) && ! isDirectory(f)) {
             return true;
@@ -266,8 +265,10 @@
         }
     }
     
+    /** The number of bytes in a file. */
     public abstract long getLength(File f) throws IOException;
 
+    /** List files in a directory. */
     public File[] listFiles(File f) throws IOException {
       return listFiles(f, new FileFilter() {
           public boolean accept(File file) {
@@ -276,8 +277,10 @@
         });
     }
 
+    /** List files in a directory. */
     public abstract File[] listFilesRaw(File f) throws IOException;
 
+    /** Filter files in a directory. */
     public File[] listFiles(File f, FileFilter filter) throws IOException {
         Vector results = new Vector();
         File listing[] = listFilesRaw(f);
@@ -311,7 +314,6 @@
      * The src file is on the local disk.  Add it to FS at
      * the given dst name and the source is kept intact afterwards
      */
-    // not implemneted yet
     public abstract void copyFromLocalFile(File src, File dst) throws 
IOException;
 
     /**
@@ -382,5 +384,9 @@
     public abstract void reportChecksumFailure(File f, FSInputStream in,
                                                long start, long length,
                                                int crc);
+
+    /** Return the number of bytes that large input files should optimally
+     * be split into to minimize i/o time. */
+    public abstract long getBlockSize();
 
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java
URL: 
http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java?rev=378058&r1=378057&r2=378058&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java 
(original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java Wed 
Feb 15 10:50:55 2006
@@ -25,9 +25,7 @@
 import org.apache.hadoop.io.UTF8;
 
 /****************************************************************
- * Implement the FileSystem interface for the local disk.
- * This is pretty easy.  The interface exists so we can use either
- * remote or local Files very easily.
+ * Implement the FileSystem API for the native filesystem.
  *
  * @author Mike Cafarella
  *****************************************************************/
@@ -38,8 +36,7 @@
     // by default use copy/delete instead of rename
     boolean useCopyForRename = true;
     
-    /**
-     */
+    /** Construct a local filesystem client. */
     public LocalFileSystem(Configuration conf) throws IOException {
         super(conf);
         // if you find an OS which reliably supports non-POSIX
@@ -111,9 +108,6 @@
         public long skip(long n) throws IOException { return fis.skip(n); }
     }
     
-    /**
-     * Open the file at f
-     */
     public FSInputStream openRaw(File f) throws IOException {
         if (! f.exists()) {
             throw new FileNotFoundException(f.toString());
@@ -169,58 +163,39 @@
         return new LocalFSFileOutputStream(f);
     }
 
-    /**
-     * Rename files/dirs
-     */
     public boolean renameRaw(File src, File dst) throws IOException {
         if (useCopyForRename) {
-            FileUtil.copyContents(this, src, dst, true, conf);
+            FileUtil.copyContents(this, src, dst, true, getConf());
             return fullyDelete(src);
         } else return src.renameTo(dst);
     }
 
-    /**
-     * Get rid of File f, whether a true file or dir.
-     */
     public boolean deleteRaw(File f) throws IOException {
         if (f.isFile()) {
             return f.delete();
         } else return fullyDelete(f);
     }
 
-    /**
-     */
     public boolean exists(File f) throws IOException {
         return f.exists();
     }
 
-    /**
-     */
     public boolean isDirectory(File f) throws IOException {
         return f.isDirectory();
     }
 
-    /**
-     */
     public long getLength(File f) throws IOException {
         return f.length();
     }
 
-    /**
-     */
     public File[] listFilesRaw(File f) throws IOException {
         return f.listFiles();
     }
 
-    /**
-     */
     public void mkdirs(File f) throws IOException {
         f.mkdirs();
     }
 
-    /**
-     * Obtain a filesystem lock at File f.
-     */
     public synchronized void lock(File f, boolean shared) throws IOException {
         f.createNewFile();
 
@@ -237,9 +212,6 @@
         lockObjSet.put(f, lockObj);
     }
 
-    /**
-     * Release a held lock
-     */
     public synchronized void release(File f) throws IOException {
         FileLock lockObj = (FileLock) lockObjSet.get(f);
         FileInputStream sharedLockData = (FileInputStream) 
sharedLockDataSet.get(f);
@@ -263,71 +235,51 @@
         }
     }
 
-    /**
-     * In the case of the local filesystem, we can just rename the file.
-     */
+    // In the case of the local filesystem, we can just rename the file.
     public void moveFromLocalFile(File src, File dst) throws IOException {
         if (! src.equals(dst)) {
             if (useCopyForRename) {
-                FileUtil.copyContents(this, src, dst, true, this.conf);
+                FileUtil.copyContents(this, src, dst, true, getConf());
                 fullyDelete(src);
             } else src.renameTo(dst);
         }
     }
 
-    /**
-     * Similar to moveFromLocalFile(), except the source is kept intact.
-     */
+    // Similar to moveFromLocalFile(), except the source is kept intact.
     public void copyFromLocalFile(File src, File dst) throws IOException {
         if (! src.equals(dst)) {
-            FileUtil.copyContents(this, src, dst, true, this.conf);
+            FileUtil.copyContents(this, src, dst, true, getConf());
         }
     }
 
-    /**
-     * We can't delete the src file in this case.  Too bad.
-     */
+    // We can't delete the src file in this case.  Too bad.
     public void copyToLocalFile(File src, File dst) throws IOException {
         if (! src.equals(dst)) {
-            FileUtil.copyContents(this, src, dst, true, this.conf);
+            FileUtil.copyContents(this, src, dst, true, getConf());
         }
     }
 
-    /**
-     * We can write output directly to the final location
-     */
+    // We can write output directly to the final location
     public File startLocalOutput(File fsOutputFile, File tmpLocalFile) throws 
IOException {
         return fsOutputFile;
     }
 
-    /**
-     * It's in the right place - nothing to do.
-     */
+    // It's in the right place - nothing to do.
     public void completeLocalOutput(File fsWorkingFile, File tmpLocalFile) 
throws IOException {
     }
 
-    /**
-     * We can read directly from the real local fs.
-     */
+    // We can read directly from the real local fs.
     public File startLocalInput(File fsInputFile, File tmpLocalFile) throws 
IOException {
         return fsInputFile;
     }
 
-    /**
-     * We're done reading.  Nothing to clean up.
-     */
+    // We're done reading.  Nothing to clean up.
     public void completeLocalInput(File localFile) throws IOException {
         // Ignore the file, it's at the right destination!
     }
 
-    /**
-     * Shut down the FS.  Not necessary for regular filesystem.
-     */
-    public void close() throws IOException {
-    }
+    public void close() throws IOException {}
 
-    /**
-     */
     public String toString() {
         return "LocalFS";
     }
@@ -391,5 +343,11 @@
         LOG.warning("Error moving bad file " + f + ": " + e);
       }
     }
+
+    public long getBlockSize() {
+      // default to 32MB: large enough to minimize the impact of seeks
+      return getConf().getLong("fs.local.block.size", 32 * 1024 * 1024);
+    }
+
 
 }

Modified: 
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormatBase.java
URL: 
http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormatBase.java?rev=378058&r1=378057&r2=378058&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormatBase.java 
(original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormatBase.java 
Wed Feb 15 10:50:55 2006
@@ -105,7 +105,17 @@
       totalSize += fs.getLength(files[i]);
     }
 
-    long bytesPerSplit = Math.max(totalSize / numSplits, minSplitSize);
+    long bytesPerSplit = totalSize / numSplits;   // start w/ desired num 
splits
+
+    long fsBlockSize = fs.getBlockSize();
+    if (bytesPerSplit > fsBlockSize) {            // no larger than fs blocks
+      bytesPerSplit = fsBlockSize;
+    }
+
+    if (bytesPerSplit < minSplitSize) {           // no smaller than min size
+      bytesPerSplit = minSplitSize;
+    }
+
     long maxPerSplit = bytesPerSplit + (long)(bytesPerSplit*SPLIT_SLOP);
 
     //LOG.info("bytesPerSplit = " + bytesPerSplit);


Reply via email to