Author: cutting
Date: Mon May 15 14:26:04 2006
New Revision: 406746

URL: http://svn.apache.org/viewcvs?rev=406746&view=rev
Log:
HADOOP-212.  Permit alteration of the file block size in DFS.  Contributed by Owen.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/conf/hadoop-default.xml
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Block.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSFileInfo.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSck.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DfsPath.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataOutputStream.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormatBase.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/DistributedFSCheck.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/CHANGES.txt?rev=406746&r1=406745&r2=406746&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Mon May 15 14:26:04 2006
@@ -29,6 +29,12 @@
  7. HADOOP-219. Fix a NullPointerException when handling a checksum
     exception under SequenceFile.Sorter.sort().  (cutting & stack)
 
+ 8. HADOOP-212. Permit alteration of the file block size in DFS.  The
+    default block size for new files may now be specified in the
+    configuration with the dfs.block.size property.  The block size
+    may also be specified when files are opened.
+    (omalley via cutting)
+
 
 Release 0.2.1 - 2006-05-12
 

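For readers of this change, a rough usage sketch (not part of the patch): a client can now pick a block size per file through the new create() overload shown in FileSystem.java below, or set the cluster-wide default with dfs.block.size. The class name, path, and sizes here are made up.

    // Hypothetical usage sketch; file name, sizes and replication are illustrative only.
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class BlockSizeExample {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        // New overload: create(Path, overwrite, bufferSize, replication, blockSize)
        FSDataOutputStream out = fs.create(new Path("/example/data.out"),
                                           true,                // overwrite
                                           4096,                // io.file.buffer.size
                                           (short) 3,           // replication
                                           32L * 1024 * 1024);  // 32MB blocks for this file
        out.write(new byte[1024]);
        out.close();
      }
    }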
Modified: lucene/hadoop/trunk/conf/hadoop-default.xml
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/conf/hadoop-default.xml?rev=406746&r1=406745&r2=406746&view=diff
==============================================================================
--- lucene/hadoop/trunk/conf/hadoop-default.xml (original)
+++ lucene/hadoop/trunk/conf/hadoop-default.xml Mon May 15 14:26:04 2006
@@ -135,6 +135,12 @@
 </property>
 
 <property>
+  <name>dfs.block.size</name>
+  <value>67108864</value>
+  <description>The default block size for new files.</description>
+</property>
+
+<property>
   <name>dfs.df.interval</name>
   <value>3000</value>
   <description>Disk usage statistics refresh interval in msec.</description>

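A minimal sketch of how this property is consumed on the client side, mirroring the DFSClient change further down; the class and variable names are illustrative.

    // Sketch: resolving the configured defaults the way DFSClient now does.
    import org.apache.hadoop.conf.Configuration;

    public class DefaultBlockSize {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // 67108864 bytes == 64 * 1024 * 1024, matching the shipped default above.
        long defaultBlockSize = conf.getLong("dfs.block.size", 64 * 1024 * 1024);
        short defaultReplication = (short) conf.getInt("dfs.replication", 3);
        System.out.println(defaultBlockSize + " / " + defaultReplication);
      }
    }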
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Block.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Block.java?rev=406746&r1=406745&r2=406746&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Block.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Block.java Mon May 15 14:26:04 2006
@@ -110,10 +110,9 @@
     public void readFields(DataInput in) throws IOException {
         this.blkid = in.readLong();
         this.len = in.readLong();
-        if( len < 0 || len > FSConstants.BLOCK_SIZE )
-          throw new IOException("Unexpected block size: " + len 
-                                + ". System block size = " 
-                                + FSConstants.BLOCK_SIZE + ".");
+        if( len < 0 ) {
+          throw new IOException("Unexpected block size: " + len);
+        }
     }
 
     /////////////////////////////////////

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java?rev=406746&r1=406745&r2=406746&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java Mon May 15 14:26:04 2006
@@ -55,7 +55,8 @@
                                 String clientName, 
                                 String clientMachine, 
                                 boolean overwrite, 
-                                short replication
+                                short replication,
+                                long blockSize
                               ) throws IOException;
 
     /**
@@ -231,4 +232,12 @@
      * One DatanodeInfo object is returned for each DataNode.
      */
     public DatanodeInfo[] getDatanodeReport() throws IOException;
+
+    /**
+     * Get the block size for the given file.
+     * @param filename The name of the file
+     * @return The number of bytes in each block
+     * @throws IOException
+     */
+    public long getBlockSize(String filename) throws IOException;
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?rev=406746&r1=406745&r2=406746&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Mon May 15 14:26:04 2006
@@ -41,6 +41,7 @@
 class DFSClient implements FSConstants {
     public static final Logger LOG = LogFormatter.getLogger("org.apache.hadoop.fs.DFSClient");
     static int MAX_BLOCK_ACQUIRE_FAILURES = 3;
+    private static final long DEFAULT_BLOCK_SIZE = 64 * 1024 * 1024;
     ClientProtocol namenode;
     String localName;
     boolean running = true;
@@ -48,6 +49,8 @@
     String clientName;
     Daemon leaseChecker;
     private Configuration conf;
+    private long defaultBlockSize;
+    private short defaultReplication;
     
     // required for unknown reason to make WritableFactories work distributed
     static { new DFSFileInfo(); }
@@ -110,6 +113,8 @@
         } else {
             this.clientName = "DFSClient_" + r.nextInt();
         }
+        defaultBlockSize = conf.getLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
+        defaultReplication = (short) conf.getInt("dfs.replication", 3);
         this.leaseChecker = new Daemon(new LeaseChecker());
         this.leaseChecker.start();
     }
@@ -151,6 +156,37 @@
     }
 
     /**
+     * Get the default block size for this cluster
+     * @return the default block size in bytes
+     */
+    public long getDefaultBlockSize() {
+      return defaultBlockSize;
+    }
+    
+    public long getBlockSize(Path f) throws IOException {
+      // if we already know the answer, use it.
+      if (f instanceof DfsPath) {
+        return ((DfsPath) f).getBlockSize();
+      }
+      int retries = 4;
+      while (true) {
+        try {
+          return namenode.getBlockSize(f.toString());
+        } catch (IOException ie) {
+          LOG.info("Problem getting block size: " + 
+                   StringUtils.stringifyException(ie));
+          if (--retries == 0) {
+            throw ie;
+          }
+        }
+      }
+    }
+    
+    public short getDefaultReplication() {
+      return defaultReplication;
+    }
+    
+    /**
      * Get hints about the location of the indicated block(s).  The
      * array returned is as long as there are blocks in the indicated
      * range.  Each block may have one or more locations.
@@ -182,7 +218,7 @@
     public FSOutputStream create( UTF8 src, 
                                   boolean overwrite
                                 ) throws IOException {
-      return create( src, overwrite, (short)conf.getInt("dfs.replication", 3));
+      return create( src, overwrite, defaultReplication, defaultBlockSize);
     }
     
     /**
@@ -197,10 +233,12 @@
      */
     public FSOutputStream create( UTF8 src, 
                                   boolean overwrite, 
-                                  short replication
+                                  short replication,
+                                  long blockSize
                                 ) throws IOException {
       checkOpen();
-      FSOutputStream result = new DFSOutputStream(src, overwrite, replication);
+      FSOutputStream result = new DFSOutputStream(src, overwrite, 
+                                                  replication, blockSize);
       synchronized (pendingCreates) {
         pendingCreates.put(src.toString(), result);
       }
@@ -672,15 +710,19 @@
         private long filePos = 0;
         private int bytesWrittenToBlock = 0;
         private String datanodeName;
+        private long blockSize;
 
         /**
          * Create a new output stream to the given DataNode.
          */
-        public DFSOutputStream(UTF8 src, boolean overwrite, short replication) throws IOException {
+        public DFSOutputStream(UTF8 src, boolean overwrite, 
+                               short replication, long blockSize
+                               ) throws IOException {
             this.src = src;
             this.overwrite = overwrite;
             this.replication = replication;
             this.backupFile = newBackupFile();
+            this.blockSize = blockSize;
             this.backupStream = new FileOutputStream(backupFile);
         }
 
@@ -766,7 +808,7 @@
             while (true) {
               try {
                 return namenode.create(src.toString(), clientName.toString(),
-                    localName, overwrite, replication);
+                    localName, overwrite, replication, blockSize);
               } catch (RemoteException e) {
                if (--retries == 0 || 
                    "org.apache.hadoop.dfs.NameNode.AlreadyBeingCreatedException".
@@ -835,7 +877,7 @@
                 throw new IOException("Stream closed");
             }
 
-            if ((bytesWrittenToBlock + pos == BLOCK_SIZE) ||
+            if ((bytesWrittenToBlock + pos == blockSize) ||
                 (pos >= BUFFER_SIZE)) {
                 flush();
             }
@@ -861,7 +903,7 @@
               len -= toWrite;
               filePos += toWrite;
 
-              if ((bytesWrittenToBlock + pos >= BLOCK_SIZE) ||
+              if ((bytesWrittenToBlock + pos >= blockSize) ||
                   (pos == BUFFER_SIZE)) {
                 flush();
               }
@@ -877,10 +919,10 @@
                 throw new IOException("Stream closed");
             }
 
-            if (bytesWrittenToBlock + pos >= BLOCK_SIZE) {
-                flushData(BLOCK_SIZE - bytesWrittenToBlock);
+            if (bytesWrittenToBlock + pos >= blockSize) {
+                flushData((int) blockSize - bytesWrittenToBlock);
             }
-            if (bytesWrittenToBlock == BLOCK_SIZE) {
+            if (bytesWrittenToBlock == blockSize) {
                 endBlock();
             }
             flushData(pos);

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSFileInfo.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSFileInfo.java?rev=406746&r1=406745&r2=406746&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSFileInfo.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSFileInfo.java Mon May 15 14:26:04 2006
@@ -40,6 +40,7 @@
     long contentsLen;
     boolean isDir;
     short blockReplication;
+    long blockSize;
 
     /**
      */
@@ -58,6 +59,7 @@
         } else 
           this.len = this.contentsLen = node.computeFileLength();
         this.blockReplication = node.getReplication();
+        blockSize = node.getBlockSize();
     }
 
     /**
@@ -102,6 +104,14 @@
       return this.blockReplication;
     }
 
+    /**
+     * Get the block size of the file.
+     * @return the number of bytes
+     */
+    public long getBlockSize() {
+      return blockSize;
+    }
+    
     //////////////////////////////////////////////////
     // Writable
     //////////////////////////////////////////////////
@@ -111,6 +121,7 @@
         out.writeLong(contentsLen);
         out.writeBoolean(isDir);
         out.writeShort(blockReplication);
+        out.writeLong(blockSize);
     }
 
     public void readFields(DataInput in) throws IOException {
@@ -120,6 +131,7 @@
         this.contentsLen = in.readLong();
         this.isDir = in.readBoolean();
         this.blockReplication = in.readShort();
+        blockSize = in.readLong();
     }
 }
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSck.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSck.java?rev=406746&r1=406745&r2=406746&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSck.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSck.java Mon May 15 14:26:04 2006
@@ -106,7 +106,7 @@
   public Result fsck(String path) throws Exception {
     DFSFileInfo[] files = dfs.listPaths(new UTF8(path));
     Result res = new Result();
-    res.setReplication(conf.getInt("dfs.replication", 3));
+    res.setReplication(dfs.getDefaultReplication());
     for (int i = 0; i < files.length; i++) {
       check(files[i], res);
     }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DfsPath.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DfsPath.java?rev=406746&r1=406745&r2=406746&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DfsPath.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DfsPath.java Mon May 15 14:26:04 2006
@@ -18,7 +18,11 @@
 import org.apache.hadoop.fs.Path;
 
 
-/** DfsPath is a Path that's been annotated with some extra information. */
+/** 
+ * DfsPath is a Path that's been annotated with some extra information.
+ * The point of this class is to pass back the "common" metadata about 
+ * a file with the names in a directory listing to make accesses faster.
+ */
 class DfsPath extends Path {
     DFSFileInfo info;
 
@@ -41,5 +45,8 @@
     }
     public short getReplication() {
       return info.getReplication();
+    }
+    public long getBlockSize() {
+      return info.getBlockSize();
     }
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java?rev=406746&r1=406745&r2=406746&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java Mon May 15 14:26:04 2006
@@ -54,6 +54,18 @@
       return workingDir;
     }
     
+    public long getDefaultBlockSize() {
+      return dfs.getDefaultBlockSize();
+    }
+    
+    public long getBlockSize(Path f) throws IOException {
+      return dfs.getBlockSize(f);
+    }
+    
+    public short getDefaultReplication() {
+      return dfs.getDefaultReplication();
+    }
+    
     private Path makeAbsolute(Path f) {
       if (f.isAbsolute()) {
         return f;
@@ -78,9 +90,10 @@
       return dfs.open(getPath(f));
     }
 
-    public FSOutputStream createRaw(Path f, boolean overwrite, short 
replication)
+    public FSOutputStream createRaw(Path f, boolean overwrite, 
+                                    short replication, long blockSize)
       throws IOException {
-      return dfs.create(getPath(f), overwrite, replication);
+      return dfs.create(getPath(f), overwrite, replication, blockSize);
     }
 
     public boolean setReplicationRaw( Path src, 
@@ -203,10 +216,6 @@
       // FIXME: we should move the bad block(s) involved to a bad block
       // directory on their datanode, and then re-replicate the blocks, so that
       // no data is lost. a task may fail, but on retry it should succeed.
-    }
-
-    public long getBlockSize() {
-      return FSConstants.BLOCK_SIZE;
     }
 
     /** Return the total raw capacity of the filesystem, disregarding

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java?rev=406746&r1=406745&r2=406746&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java Mon May 15 14:26:04 2006
@@ -23,7 +23,6 @@
  * @author Mike Cafarella
  ************************************/
 interface FSConstants {
-    public static int BLOCK_SIZE = 32 * 1000 * 1000;
     public static int MIN_BLOCKS_FOR_WRITE = 5;
 
     public static final long WRITE_COMPLETE = 0xcafae11a;

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java?rev=406746&r1=406745&r2=406746&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java Mon May 15 14:26:04 2006
@@ -263,6 +263,7 @@
         if (isValidBlock(b)) {
             throw new IOException("Block " + b + " is valid, and cannot be 
written to.");
         }
+        long blockSize = b.getNumBytes();
 
         //
         // Serialize access to /tmp, and check if file already there.
@@ -279,7 +280,7 @@
             //
             // Check if we have too little space
             //
-            if (getRemaining() < BLOCK_SIZE) {
+            if (getRemaining() < blockSize) {
                 throw new IOException("Insufficient space for an additional 
block");
             }
 
@@ -288,7 +289,7 @@
             // 'reserved' size, & create file
             //
             ongoingCreates.add(b);
-            reserved += BLOCK_SIZE;
+            reserved += blockSize;
             f = getTmpFile(b);
            try {
                if (f.exists()) {
@@ -304,7 +305,7 @@
            } catch (IOException ie) {
                 System.out.println("Exception!  " + ie);
                ongoingCreates.remove(b);               
-               reserved -= BLOCK_SIZE;
+               reserved -= blockSize;
                 throw ie;
            }
         }
@@ -358,7 +359,7 @@
             if (! ongoingCreates.remove(b)) {
                 throw new IOException("Tried to finalize block " + b + ", but 
could not find it in ongoingCreates after file-move!");
             } 
-            reserved -= BLOCK_SIZE;
+            reserved -= b.getNumBytes();
         }
     }
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java?rev=406746&r1=406745&r2=406746&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java Mon May 15 14:26:04 2006
@@ -241,6 +241,18 @@
         }
 
         /**
+         * Get the block size of the first block
+         * @return the number of bytes
+         */
+        public long getBlockSize() {
+          if (blocks == null || blocks.length == 0) {
+            return 0;
+          } else {
+            return blocks[0].getNumBytes();
+          }
+        }
+        
+        /**
          */
         void listContents(Vector v) {
             if (parent != null && blocks != null) {
@@ -747,7 +759,27 @@
       }
       return fileBlocks;
     }
-                                 
+
+    /**
+     * Get the blocksize of a file
+     * @param filename the filename
+     * @return the number of bytes in the first block
+     * @throws IOException if it is a directory or does not exist.
+     */
+    public long getBlockSize(String filename) throws IOException {
+      synchronized (rootDir) {
+        INode fileNode = rootDir.getNode(filename);
+        if (fileNode == null) {
+          throw new IOException("Unknown file: " + filename);
+        }
+        if (fileNode.isDir()) {
+          throw new IOException("Getting block size of a directory: " + 
+                                filename);
+        }
+        return fileNode.getBlockSize();
+      }
+    }
+    
     /**
      * Remove the file from management, return blocks
      */

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?rev=406746&r1=406745&r2=406746&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Mon May 15 14:26:04 2006
@@ -37,10 +37,6 @@
  ***************************************************/
 class FSNamesystem implements FSConstants {
     public static final Logger LOG = LogFormatter.getLogger("org.apache.hadoop.fs.FSNamesystem");
-    static {
-      // for debugging the pending Creates problems
-      LOG.setLevel(Level.FINE);
-    }
 
     //
     // Stores the correct file name hierarchy
@@ -276,6 +272,10 @@
       return true;
     }
     
+    public long getBlockSize(String filename) throws IOException {
+      return dir.getBlockSize(filename);
+    }
+    
     /**
      * Check whether the replication parameter is within the range
      * determined by system configuration.
@@ -312,7 +312,8 @@
                                             UTF8 holder, 
                                             UTF8 clientMachine, 
                                             boolean overwrite,
-                                            short replication 
+                                            short replication,
+                                            long blockSize
                                           ) throws IOException {
       NameNode.stateChangeLog.fine("DIR* NameSystem.startFile: file "
             +src+" for "+holder+" at "+clientMachine);
@@ -340,7 +341,8 @@
         }
 
         // Get the array of replication targets 
-        DatanodeInfo targets[] = chooseTargets(replication, null, clientMachine);
+        DatanodeInfo targets[] = chooseTargets(replication, null, 
+                                               clientMachine, blockSize);
         if (targets.length < this.minReplication) {
             throw new IOException("failed to create file "+src
                     +" on client " + clientMachine
@@ -351,6 +353,7 @@
         // Reserve space for this pending file
         pendingCreates.put(src, 
                            new FileUnderConstruction(replication, 
+                                                     blockSize,
                                                      holder,
                                                      clientMachine));
         NameNode.stateChangeLog.finer( "DIR* NameSystem.startFile: "
@@ -421,7 +424,7 @@
         
         // Get the array of replication targets 
         DatanodeInfo targets[] = chooseTargets(pendingFile.getReplication(), 
-            null, pendingFile.getClientMachine());
+            null, pendingFile.getClientMachine(), pendingFile.getBlockSize());
         if (targets.length < this.minReplication) {
           throw new IOException("File " + src + " could only be replicated to 
" +
                                 targets.length + " nodes, instead of " +
@@ -1400,6 +1403,7 @@
                     }
 
                     Block block = (Block) it.next();
+                    long blockSize = block.getNumBytes();
                     FSDirectory.INode fileINode = dir.getFileByBlock(block);
                     if( fileINode == null ) { // block does not belong to any file
                         it.remove();
@@ -1411,7 +1415,13 @@
                         if (containingNodes.contains(srcNode) 
                               && ( excessBlocks == null 
                                || ! excessBlocks.contains(block))) {
-                            DatanodeInfo targets[] = chooseTargets(Math.min(fileINode.getReplication() - containingNodes.size(), this.maxReplicationStreams - xmitsInProgress), containingNodes, null);
+                            DatanodeInfo targets[] = 
+                              chooseTargets(Math.min(fileINode.getReplication() 
+                                                       - containingNodes.size(), 
+                                                     maxReplicationStreams
+                                                       - xmitsInProgress), 
+                                                     containingNodes, null, 
+                                                     blockSize);
                             if (targets.length > 0) {
                                 // Build items to return
                                 replicateBlocks.add(block);
@@ -1481,7 +1491,8 @@
      * considered targets.
      * @return array of DatanodeInfo instances uses as targets.
      */
-    DatanodeInfo[] chooseTargets(int desiredReplicates, TreeSet forbiddenNodes, UTF8 clientMachine) {
+    DatanodeInfo[] chooseTargets(int desiredReplicates, TreeSet forbiddenNodes,
+                                 UTF8 clientMachine, long blockSize) {
         if (desiredReplicates > datanodeMap.size()) {
           LOG.warning("Replication requested of "+desiredReplicates
                       +" is larger than cluster size ("+datanodeMap.size()
@@ -1493,7 +1504,8 @@
         Vector targets = new Vector();
 
         for (int i = 0; i < desiredReplicates; i++) {
-            DatanodeInfo target = chooseTarget(forbiddenNodes, alreadyChosen, clientMachine);
+            DatanodeInfo target = chooseTarget(forbiddenNodes, alreadyChosen, 
+                                               clientMachine, blockSize);
             if (target == null)
               break; // calling chooseTarget again won't help
             targets.add(target);
@@ -1514,7 +1526,8 @@
      * @return DatanodeInfo instance to use or null if something went wrong
      * (a log message is emitted if null is returned).
      */
-    DatanodeInfo chooseTarget(TreeSet forbidden1, TreeSet forbidden2, UTF8 clientMachine) {
+    DatanodeInfo chooseTarget(TreeSet forbidden1, TreeSet forbidden2, 
+                              UTF8 clientMachine, long blockSize) {
         //
         // Check if there are any available targets at all
         //
@@ -1565,7 +1578,7 @@
                 for (Iterator it = targetList.iterator(); it.hasNext(); ) {
                     DatanodeInfo node = (DatanodeInfo) it.next();
                     if (clientMachine.equals(node.getHost())) {
-                        if (node.getRemaining() > BLOCK_SIZE * MIN_BLOCKS_FOR_WRITE) {
+                        if (node.getRemaining() > blockSize * MIN_BLOCKS_FOR_WRITE) {
                             return node;
                         }
                     }
@@ -1577,7 +1590,7 @@
             //
             for (Iterator it = targetList.iterator(); it.hasNext(); ) {
                 DatanodeInfo node = (DatanodeInfo) it.next();
-                if (node.getRemaining() > BLOCK_SIZE * MIN_BLOCKS_FOR_WRITE) {
+                if (node.getRemaining() > blockSize * MIN_BLOCKS_FOR_WRITE) {
                     return node;
                 }
             }
@@ -1589,7 +1602,7 @@
             //
             for (Iterator it = targetList.iterator(); it.hasNext(); ) {
                 DatanodeInfo node = (DatanodeInfo) it.next();
-                if (node.getRemaining() > BLOCK_SIZE) {
+                if (node.getRemaining() > blockSize) {
                     return node;
                 }
             }
@@ -1615,14 +1628,17 @@
      */
     private class FileUnderConstruction {
       private short blockReplication; // file replication
+      private long blockSize;
       private Vector blocks;
       private UTF8 clientName;         // lease holder
       private UTF8 clientMachine;
       
       FileUnderConstruction(short replication,
+                            long blockSize,
                             UTF8 clientName,
                             UTF8 clientMachine) throws IOException {
         this.blockReplication = replication;
+        this.blockSize = blockSize;
         this.blocks = new Vector();
         this.clientName = clientName;
         this.clientMachine = clientMachine;
@@ -1630,6 +1646,10 @@
       
       public short getReplication() {
         return this.blockReplication;
+      }
+      
+      public long getBlockSize() {
+        return blockSize;
       }
       
       public Vector getBlocks() {

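The chooseTarget() changes above replace the fixed BLOCK_SIZE with the file's own block size when checking a datanode's free space. A small sketch of that check under made-up numbers (only the comparison itself comes from the patch):

    public class TargetCheck {
      public static void main(String[] args) {
        // Sketch only: the per-file space check chooseTarget() now applies.
        // MIN_BLOCKS_FOR_WRITE is 5 in FSConstants; the sizes are hypothetical.
        long blockSize = 128L * 1024 * 1024;            // requested block size for the file
        long remaining = 500L * 1024 * 1024;            // free space reported by a datanode
        boolean eligible = remaining > blockSize * 5;   // 500MB > 640MB -> false, node skipped
        System.out.println("eligible = " + eligible);
      }
    }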
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java?rev=406746&r1=406745&r2=406746&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java Mon May 15 14:26:04 2006
@@ -181,7 +181,8 @@
                                String clientName, 
                                String clientMachine, 
                                boolean overwrite,
-                               short replication
+                               short replication,
+                               long blockSize
     ) throws IOException {
        stateChangeLog.fine("*DIR* NameNode.create: file "
             +src+" for "+clientName+" at "+clientMachine);
@@ -189,7 +190,8 @@
                                                 new UTF8(clientName), 
                                                 new UTF8(clientMachine), 
                                                 overwrite,
-                                                replication);
+                                                replication,
+                                                blockSize);
         Block b = (Block) results[0];
         DatanodeInfo targets[] = (DatanodeInfo[]) results[1];
         return new LocatedBlock(b, targets);
@@ -279,6 +281,11 @@
             return results;
         }
     }
+    
+    public long getBlockSize(String filename) throws IOException {
+      return namesystem.getBlockSize(filename);
+    }
+    
     /**
      */
     public boolean rename(String src, String dst) throws IOException {

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataOutputStream.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataOutputStream.java?rev=406746&r1=406745&r2=406746&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataOutputStream.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataOutputStream.java Mon May 15 14:26:04 2006
@@ -38,12 +38,14 @@
                   Path file, 
                   boolean overwrite, 
                   short replication,
+                  long blockSize,
                   Configuration conf)
       throws IOException {
-      super(fs.createRaw(file, overwrite, replication));
+      super(fs.createRaw(file, overwrite, replication, blockSize));
       this.bytesPerSum = conf.getInt("io.bytes.per.checksum", 512);
       this.sums = new FSDataOutputStream(
-            fs.createRaw(FileSystem.getChecksumFile(file), true, replication), 
+            fs.createRaw(FileSystem.getChecksumFile(file), true, 
+                         replication, blockSize), 
             conf);
       sums.write(CHECKSUM_VERSION, 0, CHECKSUM_VERSION.length);
       sums.writeInt(this.bytesPerSum);
@@ -127,11 +129,11 @@
 
   public FSDataOutputStream(FileSystem fs, Path file,
                             boolean overwrite, Configuration conf,
-                            int bufferSize, short replication )
+                            int bufferSize, short replication, long blockSize )
   throws IOException {
     super(new Buffer(
             new PositionCache(
-                new Summer(fs, file, overwrite, replication, conf)), 
+                new Summer(fs, file, overwrite, replication, blockSize, conf)), 
             bufferSize));
   }
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java?rev=406746&r1=406745&r2=406746&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java Mon May 15 14:26:04 2006
@@ -175,7 +175,8 @@
     public FSDataOutputStream create(Path f) throws IOException {
       return create(f, true, 
                     getConf().getInt("io.file.buffer.size", 4096),
-                    (short)getConf().getInt("dfs.replication", 3));
+                    getDefaultReplication(),
+                    getDefaultBlockSize());
     }
 
     /**
@@ -186,7 +187,8 @@
       throws IOException {
       return create(f, true, 
                     getConf().getInt("io.file.buffer.size", 4096),
-                    replication);
+                    replication,
+                    getDefaultBlockSize());
     }
 
     /**
@@ -201,7 +203,8 @@
                                       int bufferSize
                                     ) throws IOException {
       return create( f, overwrite, bufferSize, 
-                    (short)getConf().getInt("dfs.replication", 3));
+                     getDefaultReplication(),
+                     getDefaultBlockSize());
     }
     
     /**
@@ -215,10 +218,11 @@
     public FSDataOutputStream create( Path f, 
                                       boolean overwrite,
                                       int bufferSize,
-                                      short replication
+                                      short replication,
+                                      long blockSize
                                     ) throws IOException {
       return new FSDataOutputStream(this, f, overwrite, getConf(), 
-                                    bufferSize, replication );
+                                    bufferSize, replication, blockSize );
     }
 
     /** Opens an OutputStream at the indicated Path.
@@ -227,7 +231,9 @@
      *   the file will be overwritten, and if false an error will be thrown.
      * @param replication required block replication for the file. 
      */
-    public abstract FSOutputStream createRaw(Path f, boolean overwrite, short replication)
+    public abstract FSOutputStream createRaw(Path f, boolean overwrite, 
+                                             short replication,
+                                             long blockSize)
       throws IOException;
 
     /** @deprecated Call {@link #createNewFile(Path)} instead. */
@@ -547,8 +553,20 @@
                                                long start, long length,
                                                int crc);
 
+    /**
+     * Get the size for a particular file.
+     * @param f the filename
+     * @return the number of bytes in a block
+     */
+    public abstract long getBlockSize(Path f) throws IOException;
+    
     /** Return the number of bytes that large input files should be optimally
      * be split into to minimize i/o time. */
-    public abstract long getBlockSize();
+    public abstract long getDefaultBlockSize();
+    
+    /**
+     * Get the default replication.
+     */
+    public abstract short getDefaultReplication();
 
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java?rev=406746&r1=406745&r2=406746&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java Mon May 15 14:26:04 2006
@@ -161,7 +161,8 @@
       }
     }
 
-    public FSOutputStream createRaw(Path f, boolean overwrite, short replication)
+    public FSOutputStream createRaw(Path f, boolean overwrite, 
+                                    short replication, long blockSize)
       throws IOException {
         if (exists(f) && ! overwrite) {
             throw new IOException("File already exists:"+f);
@@ -356,9 +357,18 @@
       }
     }
 
-    public long getBlockSize() {
+    public long getDefaultBlockSize() {
       // default to 32MB: large enough to minimize the impact of seeks
       return getConf().getLong("fs.local.block.size", 32 * 1024 * 1024);
+    }
+
+    public long getBlockSize(Path filename) {
+      // local doesn't really do blocks, so just use the global number
+      return getDefaultBlockSize();
+    }
+    
+    public short getDefaultReplication() {
+      return 1;
     }
 
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormatBase.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormatBase.java?rev=406746&r1=406745&r2=406746&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormatBase.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormatBase.java Mon May 15 14:26:04 2006
@@ -32,7 +32,7 @@
   public static final Logger LOG =
     LogFormatter.getLogger("org.apache.hadoop.mapred.InputFormatBase");
 
-  private static final double SPLIT_SLOP = 0.1;   // 10% slop
+  private static final double SPLIT_SLOP = 1.1;   // 10% slop
 
   private long minSplitSize = 1;
 
@@ -117,34 +117,22 @@
       totalSize += fs.getLength(files[i]);
     }
 
-    long bytesPerSplit = totalSize / numSplits;   // start w/ desired num splits
+    long goalSize = totalSize / numSplits;   // start w/ desired num splits
 
-    long fsBlockSize = fs.getBlockSize();
-    if (bytesPerSplit > fsBlockSize) {            // no larger than fs blocks
-      bytesPerSplit = fsBlockSize;
-    }
-
-    long configuredMinSplitSize = job.getLong("mapred.min.split.size", 0);
-    if( configuredMinSplitSize < minSplitSize )
-       configuredMinSplitSize = minSplitSize;
-    if (bytesPerSplit < configuredMinSplitSize) { // no smaller than min size
-      bytesPerSplit = configuredMinSplitSize;
-    }
-
-    long maxPerSplit = bytesPerSplit + (long)(bytesPerSplit*SPLIT_SLOP);
-
-    //LOG.info("bytesPerSplit = " + bytesPerSplit);
-    //LOG.info("maxPerSplit = " + maxPerSplit);
+    long minSize = Math.max(job.getLong("mapred.min.split.size", 1),
+                            minSplitSize);
 
     ArrayList splits = new ArrayList(numSplits);  // generate splits
     for (int i = 0; i < files.length; i++) {
       Path file = files[i];
       long length = fs.getLength(file);
+      long blockSize = fs.getBlockSize(file);
+      long splitSize = computeSplitSize(goalSize, minSize, blockSize);
 
       long bytesRemaining = length;
-      while (bytesRemaining >= maxPerSplit) {
-        splits.add(new FileSplit(file, length-bytesRemaining, bytesPerSplit));
-        bytesRemaining -= bytesPerSplit;
+      while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
+        splits.add(new FileSplit(file, length-bytesRemaining, splitSize));
+        bytesRemaining -= splitSize;
       }
       
       if (bytesRemaining != 0) {
@@ -156,5 +144,9 @@
     return (FileSplit[])splits.toArray(new FileSplit[splits.size()]);
   }
 
+  private static long computeSplitSize(long goalSize, long minSize,
+                                       long blockSize) {
+    return Math.max(minSize, Math.min(goalSize, blockSize));
+  }
 }
 

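A quick worked example of the new split sizing above, using made-up job numbers; only the formula itself comes from the patch:

    public class SplitSizeExample {
      public static void main(String[] args) {
        // Worked example: splitSize = max(minSize, min(goalSize, blockSize)).
        long goalSize  = 200L * 1024 * 1024;  // totalSize / numSplits for a job (hypothetical)
        long minSize   = 1;                   // mapred.min.split.size default
        long blockSize = 64L * 1024 * 1024;   // the file's block size
        long splitSize = Math.max(minSize, Math.min(goalSize, blockSize));
        System.out.println(splitSize);        // prints 67108864 (64MB)
        // With SPLIT_SLOP = 1.1, a trailing remainder of up to 1.1 * splitSize
        // stays in the final split rather than producing a tiny extra split.
      }
    }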
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/DistributedFSCheck.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/DistributedFSCheck.java?rev=406746&r1=406745&r2=406746&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/DistributedFSCheck.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/DistributedFSCheck.java Mon May 15 14:26:04 2006
@@ -104,7 +104,7 @@
     if( ! fs.isDirectory(rootFile) ) {
       nrFiles++;
       // For a regular file generate <fName,offset> pairs
-      long blockSize = fs.getBlockSize();
+      long blockSize = fs.getDefaultBlockSize();
       long fileLength = fs.getLength( rootFile );
       for( long offset = 0; offset < fileLength; offset += blockSize )
         writer.append(new UTF8(rootFile.toString()), new LongWritable(offset));
@@ -136,7 +136,7 @@
       in = new DataInputStream(fs.open(new Path(name)));
       long actualSize = 0;
       try {
-        long blockSize = fs.getBlockSize();
+        long blockSize = fs.getDefaultBlockSize();
         int curSize = bufferSize;
         for(  actualSize = 0; 
               curSize == bufferSize && actualSize < blockSize;

