Author: cutting
Date: Mon May 15 14:26:04 2006
New Revision: 406746

URL: http://svn.apache.org/viewcvs?rev=406746&view=rev
Log:
HADOOP-212.  Permit alteration of the file block size in DFS.  Contributed by Owen.
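For orientation, a minimal sketch of how the new knobs are used from client code once this change is applied. The path, buffer size, replication, and block sizes below are illustrative values, not part of the commit:

    Configuration conf = new Configuration();
    // Cluster-wide default block size for new files, in bytes; the shipped
    // default in hadoop-default.xml below is 67108864 (64MB).
    conf.set("dfs.block.size", "134217728");
    FileSystem fs = FileSystem.get(conf);

    // Per-file override via the extended create() signature added below:
    // create(Path, overwrite, bufferSize, replication, blockSize).
    FSDataOutputStream out =
        fs.create(new Path("/example/data.seq"),            // hypothetical path
                  true,                                     // overwrite
                  conf.getInt("io.file.buffer.size", 4096), // buffer size
                  (short) 3,                                // replication
                  32L * 1024 * 1024);                       // 32MB blocks for this file
    out.writeBytes("hello");
    out.close();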
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/conf/hadoop-default.xml
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Block.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSFileInfo.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSck.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DfsPath.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataOutputStream.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormatBase.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/DistributedFSCheck.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/CHANGES.txt?rev=406746&r1=406745&r2=406746&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Mon May 15 14:26:04 2006
@@ -29,6 +29,12 @@
  7. HADOOP-219.  Fix a NullPointerException when handling a checksum
     exception under SequenceFile.Sorter.sort().  (cutting & stack)
 
+ 8. HADOOP-212.  Permit alteration of the file block size in DFS.  The
+    default block size for new files may now be specified in the
+    configuration with the dfs.block.size property.  The block size
+    may also be specified when files are opened.
+    (omalley via cutting)
+
 Release 0.2.1 - 2006-05-12

Modified: lucene/hadoop/trunk/conf/hadoop-default.xml
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/conf/hadoop-default.xml?rev=406746&r1=406745&r2=406746&view=diff
==============================================================================
--- lucene/hadoop/trunk/conf/hadoop-default.xml (original)
+++ lucene/hadoop/trunk/conf/hadoop-default.xml Mon May 15 14:26:04 2006
@@ -135,6 +135,12 @@
 </property>
 
 <property>
+  <name>dfs.block.size</name>
+  <value>67108864</value>
+  <description>The default block size for new files.</description>
+</property>
+
+<property>
   <name>dfs.df.interval</name>
   <value>3000</value>
   <description>Disk usage statistics refresh interval in msec.</description>

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Block.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Block.java?rev=406746&r1=406745&r2=406746&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Block.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Block.java Mon May 15 14:26:04 2006
@@ -110,10 +110,9 @@
     public void readFields(DataInput in) throws IOException {
         this.blkid = in.readLong();
         this.len = in.readLong();
-        if( len < 0 || len > FSConstants.BLOCK_SIZE )
-            throw new IOException("Unexpected block size: " + len
-                                  + ". System block size = "
-                                  + FSConstants.BLOCK_SIZE + ".");
+        if( len < 0 ) {
+            throw new IOException("Unexpected block size: " + len);
+        }
     }
 
     /////////////////////////////////////

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java?rev=406746&r1=406745&r2=406746&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java Mon May 15 14:26:04 2006
@@ -55,7 +55,8 @@
                                String clientName,
                                String clientMachine,
                                boolean overwrite,
-                               short replication
+                               short replication,
+                               long blockSize
                                ) throws IOException;
 
     /**
@@ -231,4 +232,12 @@
      * One DatanodeInfo object is returned for each DataNode.
      */
     public DatanodeInfo[] getDatanodeReport() throws IOException;
+
+    /**
+     * Get the block size for the given file.
+     * @param filename The name of the file
+     * @return The number of bytes in each block
+     * @throws IOException
+     */
+    public long getBlockSize(String filename) throws IOException;
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?rev=406746&r1=406745&r2=406746&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Mon May 15 14:26:04 2006
@@ -41,6 +41,7 @@
 class DFSClient implements FSConstants {
     public static final Logger LOG = LogFormatter.getLogger("org.apache.hadoop.fs.DFSClient");
     static int MAX_BLOCK_ACQUIRE_FAILURES = 3;
+    private static final long DEFAULT_BLOCK_SIZE = 64 * 1024 * 1024;
     ClientProtocol namenode;
     String localName;
     boolean running = true;
@@ -48,6 +49,8 @@
     String clientName;
     Daemon leaseChecker;
     private Configuration conf;
+    private long defaultBlockSize;
+    private short defaultReplication;
 
     // required for unknown reason to make WritableFactories work distributed
     static { new DFSFileInfo(); }
@@ -110,6 +113,8 @@
         } else {
             this.clientName = "DFSClient_" + r.nextInt();
         }
+        defaultBlockSize = conf.getLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
+        defaultReplication = (short) conf.getInt("dfs.replication", 3);
         this.leaseChecker = new Daemon(new LeaseChecker());
         this.leaseChecker.start();
     }
@@ -151,6 +156,37 @@
     }
 
     /**
+     * Get the default block size for this cluster
+     * @return the default block size in bytes
+     */
+    public long getDefaultBlockSize() {
+        return defaultBlockSize;
+    }
+
+    public long getBlockSize(Path f) throws IOException {
+      // if we already know the answer, use it.
+      if (f instanceof DfsPath) {
+        return ((DfsPath) f).getBlockSize();
+      }
+      int retries = 4;
+      while (true) {
+        try {
+          return namenode.getBlockSize(f.toString());
+        } catch (IOException ie) {
+          LOG.info("Problem getting block size: " +
+                   StringUtils.stringifyException(ie));
+          if (--retries == 0) {
+            throw ie;
+          }
+        }
+      }
+    }
+
+    public short getDefaultReplication() {
+      return defaultReplication;
+    }
+
+    /**
      * Get hints about the location of the indicated block(s).  The
      * array returned is as long as there are blocks in the indicated
      * range.  Each block may have one or more locations.
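As an aside on the hunk just above: DFSClient.getBlockSize(Path) answers locally when the Path is a DfsPath handed back by a directory listing (the listing already carries each file's block size in its DFSFileInfo), and otherwise asks the namenode with a small retry loop. A caller-side sketch, with an illustrative path:

    FileSystem fs = FileSystem.get(new Configuration());
    Path p = new Path("/example/data.seq");   // hypothetical path
    long blockSize = fs.getBlockSize(p);      // one namenode RPC, unless p is a
                                              // DfsPath from listPaths(), which
                                              // is answered from cached info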
@@ -182,7 +218,7 @@
     public FSOutputStream create( UTF8 src,
                                   boolean overwrite
                                 ) throws IOException {
-        return create( src, overwrite, (short)conf.getInt("dfs.replication", 3));
+        return create( src, overwrite, defaultReplication, defaultBlockSize);
     }
 
     /**
@@ -197,10 +233,12 @@
      */
     public FSOutputStream create( UTF8 src,
                                   boolean overwrite,
-                                  short replication
+                                  short replication,
+                                  long blockSize
                                 ) throws IOException {
         checkOpen();
-        FSOutputStream result = new DFSOutputStream(src, overwrite, replication);
+        FSOutputStream result = new DFSOutputStream(src, overwrite,
+                                                    replication, blockSize);
         synchronized (pendingCreates) {
             pendingCreates.put(src.toString(), result);
         }
@@ -672,15 +710,19 @@
         private long filePos = 0;
         private int bytesWrittenToBlock = 0;
         private String datanodeName;
+        private long blockSize;
 
         /**
          * Create a new output stream to the given DataNode.
          */
-        public DFSOutputStream(UTF8 src, boolean overwrite, short replication) throws IOException {
+        public DFSOutputStream(UTF8 src, boolean overwrite,
+                               short replication, long blockSize
+                               ) throws IOException {
            this.src = src;
            this.overwrite = overwrite;
            this.replication = replication;
            this.backupFile = newBackupFile();
+           this.blockSize = blockSize;
            this.backupStream = new FileOutputStream(backupFile);
        }
 
@@ -766,7 +808,7 @@
            while (true) {
                try {
                    return namenode.create(src.toString(), clientName.toString(),
-                                           localName, overwrite, replication);
+                                           localName, overwrite, replication, blockSize);
                } catch (RemoteException e) {
                    if (--retries == 0 ||
                        "org.apache.hadoop.dfs.NameNode.AlreadyBeingCreatedException".
@@ -835,7 +877,7 @@
                throw new IOException("Stream closed");
            }
 
-            if ((bytesWrittenToBlock + pos == BLOCK_SIZE) ||
+            if ((bytesWrittenToBlock + pos == blockSize) ||
                (pos >= BUFFER_SIZE)) {
                flush();
            }
@@ -861,7 +903,7 @@
                len -= toWrite;
                filePos += toWrite;
 
-                if ((bytesWrittenToBlock + pos >= BLOCK_SIZE) ||
+                if ((bytesWrittenToBlock + pos >= blockSize) ||
                    (pos == BUFFER_SIZE)) {
                    flush();
                }
@@ -877,10 +919,10 @@
                throw new IOException("Stream closed");
            }
 
-            if (bytesWrittenToBlock + pos >= BLOCK_SIZE) {
-                flushData(BLOCK_SIZE - bytesWrittenToBlock);
+            if (bytesWrittenToBlock + pos >= blockSize) {
+                flushData((int) blockSize - bytesWrittenToBlock);
            }
-            if (bytesWrittenToBlock == BLOCK_SIZE) {
+            if (bytesWrittenToBlock == blockSize) {
                endBlock();
            }
            flushData(pos);

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSFileInfo.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSFileInfo.java?rev=406746&r1=406745&r2=406746&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSFileInfo.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSFileInfo.java Mon May 15 14:26:04 2006
@@ -40,6 +40,7 @@
     long contentsLen;
     boolean isDir;
     short blockReplication;
+    long blockSize;
 
     /**
      */
@@ -58,6 +59,7 @@
         } else
             this.len = this.contentsLen = node.computeFileLength();
         this.blockReplication = node.getReplication();
+        blockSize = node.getBlockSize();
     }
 
     /**
@@ -102,6 +104,14 @@
         return this.blockReplication;
     }
 
+    /**
+     * Get the block size of the file.
+     * @return the number of bytes
+     */
+    public long getBlockSize() {
+      return blockSize;
+    }
+
     //////////////////////////////////////////////////
     // Writable
     //////////////////////////////////////////////////
@@ -111,6 +121,7 @@
     public void write(DataOutput out) throws IOException {
         out.writeLong(len);
         out.writeLong(contentsLen);
         out.writeBoolean(isDir);
         out.writeShort(blockReplication);
+        out.writeLong(blockSize);
     }
 
     public void readFields(DataInput in) throws IOException {
@@ -120,6 +131,7 @@
         this.contentsLen = in.readLong();
         this.isDir = in.readBoolean();
         this.blockReplication = in.readShort();
+        blockSize = in.readLong();
     }
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSck.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSck.java?rev=406746&r1=406745&r2=406746&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSck.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSck.java Mon May 15 14:26:04 2006
@@ -106,7 +106,7 @@
   public Result fsck(String path) throws Exception {
     DFSFileInfo[] files = dfs.listPaths(new UTF8(path));
     Result res = new Result();
-    res.setReplication(conf.getInt("dfs.replication", 3));
+    res.setReplication(dfs.getDefaultReplication());
     for (int i = 0; i < files.length; i++) {
       check(files[i], res);
     }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DfsPath.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DfsPath.java?rev=406746&r1=406745&r2=406746&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DfsPath.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DfsPath.java Mon May 15 14:26:04 2006
@@ -18,7 +18,11 @@
 
 import org.apache.hadoop.fs.Path;
 
-/** DfsPath is a Path that's been annotated with some extra information. */
+/**
+ * DfsPath is a Path that's been annotated with some extra information.
+ * The point of this class is to pass back the "common" metadata about
+ * a file with the names in a directory listing to make accesses faster.
+ */
 class DfsPath extends Path {
     DFSFileInfo info;
 
@@ -41,5 +45,8 @@
     }
     public short getReplication() {
       return info.getReplication();
+    }
+    public long getBlockSize() {
+      return info.getBlockSize();
     }
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java?rev=406746&r1=406745&r2=406746&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java Mon May 15 14:26:04 2006
@@ -54,6 +54,18 @@
         return workingDir;
     }
 
+    public long getDefaultBlockSize() {
+      return dfs.getDefaultBlockSize();
+    }
+
+    public long getBlockSize(Path f) throws IOException {
+      return dfs.getBlockSize(f);
+    }
+
+    public short getDefaultReplication() {
+      return dfs.getDefaultReplication();
+    }
+
     private Path makeAbsolute(Path f) {
         if (f.isAbsolute()) {
             return f;
@@ -78,9 +90,10 @@
         return dfs.open(getPath(f));
     }
 
-    public FSOutputStream createRaw(Path f, boolean overwrite, short replication)
+    public FSOutputStream createRaw(Path f, boolean overwrite,
+                                    short replication, long blockSize)
       throws IOException {
-        return dfs.create(getPath(f), overwrite, replication);
+        return dfs.create(getPath(f), overwrite, replication, blockSize);
     }
 
     public boolean setReplicationRaw( Path src,
@@ -203,10 +216,6 @@
         // FIXME: we should move the bad block(s) involved to a bad block
         // directory on their datanode, and then re-replicate the blocks, so that
        // no data is lost. a task may fail, but on retry it should succeed.
-    }
-
-    public long getBlockSize() {
-      return FSConstants.BLOCK_SIZE;
     }
 
     /** Return the total raw capacity of the filesystem, disregarding

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java?rev=406746&r1=406745&r2=406746&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java Mon May 15 14:26:04 2006
@@ -23,7 +23,6 @@
  * @author Mike Cafarella
 ************************************/
 interface FSConstants {
-    public static int BLOCK_SIZE = 32 * 1000 * 1000;
     public static int MIN_BLOCKS_FOR_WRITE = 5;
 
     public static final long WRITE_COMPLETE = 0xcafae11a;

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java?rev=406746&r1=406745&r2=406746&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java Mon May 15 14:26:04 2006
@@ -263,6 +263,7 @@
         if (isValidBlock(b)) {
             throw new IOException("Block " + b + " is valid, and cannot be written to.");
         }
+        long blockSize = b.getNumBytes();
 
         //
         // Serialize access to /tmp, and check if file already there.
@@ -279,7 +280,7 @@
             //
             // Check if we have too little space
             //
-            if (getRemaining() < BLOCK_SIZE) {
+            if (getRemaining() < blockSize) {
                 throw new IOException("Insufficient space for an additional block");
             }
 
@@ -288,7 +289,7 @@
             // 'reserved' size, & create file
             //
             ongoingCreates.add(b);
-            reserved += BLOCK_SIZE;
+            reserved += blockSize;
             f = getTmpFile(b);
             try {
                 if (f.exists()) {
@@ -304,7 +305,7 @@
             } catch (IOException ie) {
                 System.out.println("Exception! " + ie);
                 ongoingCreates.remove(b);
-                reserved -= BLOCK_SIZE;
+                reserved -= blockSize;
                 throw ie;
             }
         }
@@ -358,7 +359,7 @@
             if (! ongoingCreates.remove(b)) {
                 throw new IOException("Tried to finalize block " + b + ", but could not find it in ongoingCreates after file-move!");
             }
-            reserved -= BLOCK_SIZE;
+            reserved -= b.getNumBytes();
         }
     }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java?rev=406746&r1=406745&r2=406746&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java Mon May 15 14:26:04 2006
@@ -241,6 +241,18 @@
     }
 
     /**
+     * Get the block size of the first block
+     * @return the number of bytes
+     */
+    public long getBlockSize() {
+      if (blocks == null || blocks.length == 0) {
+        return 0;
+      } else {
+        return blocks[0].getNumBytes();
+      }
+    }
+
+    /**
      */
     void listContents(Vector v) {
         if (parent != null && blocks != null) {
@@ -747,7 +759,27 @@
         }
         return fileBlocks;
     }
-
+
+    /**
+     * Get the blocksize of a file
+     * @param filename the filename
+     * @return the number of bytes in the first block
+     * @throws IOException if it is a directory or does not exist.
+     */
+    public long getBlockSize(String filename) throws IOException {
+      synchronized (rootDir) {
+        INode fileNode = rootDir.getNode(filename);
+        if (fileNode == null) {
+          throw new IOException("Unknown file: " + filename);
+        }
+        if (fileNode.isDir()) {
+          throw new IOException("Getting block size of a directory: " +
+                                filename);
+        }
+        return fileNode.getBlockSize();
+      }
+    }
+
     /**
      * Remove the file from management, return blocks
      */

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?rev=406746&r1=406745&r2=406746&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Mon May 15 14:26:04 2006
@@ -37,10 +37,6 @@
 ***************************************************/
 class FSNamesystem implements FSConstants {
     public static final Logger LOG = LogFormatter.getLogger("org.apache.hadoop.fs.FSNamesystem");
-    static {
-        // for debugging the pending Creates problems
-        LOG.setLevel(Level.FINE);
-    }
 
     //
     // Stores the correct file name hierarchy
@@ -276,6 +272,10 @@
         return true;
     }
 
+    public long getBlockSize(String filename) throws IOException {
+      return dir.getBlockSize(filename);
+    }
+
     /**
      * Check whether the replication parameter is within the range
      * determined by system configuration.
@@ -312,7 +312,8 @@
                                     UTF8 holder,
                                     UTF8 clientMachine,
                                     boolean overwrite,
-                                    short replication
+                                    short replication,
+                                    long blockSize
                                     ) throws IOException {
         NameNode.stateChangeLog.fine("DIR* NameSystem.startFile: file "
             +src+" for "+holder+" at "+clientMachine);
@@ -340,7 +341,8 @@
         }
 
         // Get the array of replication targets
-        DatanodeInfo targets[] = chooseTargets(replication, null, clientMachine);
+        DatanodeInfo targets[] = chooseTargets(replication, null,
+                                               clientMachine, blockSize);
         if (targets.length < this.minReplication) {
             throw new IOException("failed to create file "+src
                     +" on client " + clientMachine
@@ -351,6 +353,7 @@
 
         // Reserve space for this pending file
         pendingCreates.put(src, new FileUnderConstruction(replication,
+                                                          blockSize,
                                                           holder,
                                                           clientMachine));
         NameNode.stateChangeLog.finer( "DIR* NameSystem.startFile: "
@@ -421,7 +424,7 @@
 
         // Get the array of replication targets
         DatanodeInfo targets[] = chooseTargets(pendingFile.getReplication(),
-            null, pendingFile.getClientMachine());
+            null, pendingFile.getClientMachine(), pendingFile.getBlockSize());
         if (targets.length < this.minReplication) {
             throw new IOException("File " + src + " could only be replicated to " +
                                   targets.length + " nodes, instead of " +
@@ -1400,6 +1403,7 @@
             }
 
             Block block = (Block) it.next();
+            long blockSize = block.getNumBytes();
             FSDirectory.INode fileINode = dir.getFileByBlock(block);
             if( fileINode == null ) { // block does not belong to any file
                 it.remove();
@@ -1411,7 +1415,13 @@
             if (containingNodes.contains(srcNode)
                 && ( excessBlocks == null || ! excessBlocks.contains(block))) {
-                DatanodeInfo targets[] = chooseTargets(Math.min(fileINode.getReplication() - containingNodes.size(), this.maxReplicationStreams - xmitsInProgress), containingNodes, null);
+                DatanodeInfo targets[] =
+                  chooseTargets(Math.min(fileINode.getReplication()
+                                         - containingNodes.size(),
+                                         maxReplicationStreams
+                                         - xmitsInProgress),
+                                containingNodes, null,
+                                blockSize);
                 if (targets.length > 0) {
                     // Build items to return
                     replicateBlocks.add(block);
@@ -1481,7 +1491,8 @@
      * considered targets.
      * @return array of DatanodeInfo instances uses as targets.
      */
-    DatanodeInfo[] chooseTargets(int desiredReplicates, TreeSet forbiddenNodes, UTF8 clientMachine) {
+    DatanodeInfo[] chooseTargets(int desiredReplicates, TreeSet forbiddenNodes,
+                                 UTF8 clientMachine, long blockSize) {
         if (desiredReplicates > datanodeMap.size()) {
             LOG.warning("Replication requested of "+desiredReplicates
                     +" is larger than cluster size ("+datanodeMap.size()
@@ -1493,7 +1504,8 @@
 
         Vector targets = new Vector();
         for (int i = 0; i < desiredReplicates; i++) {
-            DatanodeInfo target = chooseTarget(forbiddenNodes, alreadyChosen, clientMachine);
+            DatanodeInfo target = chooseTarget(forbiddenNodes, alreadyChosen,
+                                               clientMachine, blockSize);
             if (target == null)
                 break; // calling chooseTarget again won't help
             targets.add(target);
@@ -1514,7 +1526,8 @@
      * @return DatanodeInfo instance to use or null if something went wrong
      * (a log message is emitted if null is returned).
      */
-    DatanodeInfo chooseTarget(TreeSet forbidden1, TreeSet forbidden2, UTF8 clientMachine) {
+    DatanodeInfo chooseTarget(TreeSet forbidden1, TreeSet forbidden2,
+                              UTF8 clientMachine, long blockSize) {
         //
         // Check if there are any available targets at all
         //
@@ -1565,7 +1578,7 @@
         for (Iterator it = targetList.iterator(); it.hasNext(); ) {
             DatanodeInfo node = (DatanodeInfo) it.next();
             if (clientMachine.equals(node.getHost())) {
-                if (node.getRemaining() > BLOCK_SIZE * MIN_BLOCKS_FOR_WRITE) {
+                if (node.getRemaining() > blockSize * MIN_BLOCKS_FOR_WRITE) {
                     return node;
                 }
             }
@@ -1577,7 +1590,7 @@
         //
         for (Iterator it = targetList.iterator(); it.hasNext(); ) {
             DatanodeInfo node = (DatanodeInfo) it.next();
-            if (node.getRemaining() > BLOCK_SIZE * MIN_BLOCKS_FOR_WRITE) {
+            if (node.getRemaining() > blockSize * MIN_BLOCKS_FOR_WRITE) {
                 return node;
             }
         }
@@ -1589,7 +1602,7 @@
         //
         for (Iterator it = targetList.iterator(); it.hasNext(); ) {
             DatanodeInfo node = (DatanodeInfo) it.next();
-            if (node.getRemaining() > BLOCK_SIZE) {
+            if (node.getRemaining() > blockSize) {
                 return node;
             }
         }
@@ -1615,14 +1628,17 @@
      */
     private class FileUnderConstruction {
         private short blockReplication; // file replication
+        private long blockSize;
         private Vector blocks;
         private UTF8 clientName;         // lease holder
         private UTF8 clientMachine;
 
         FileUnderConstruction(short replication,
+                              long blockSize,
                               UTF8 clientName,
                               UTF8 clientMachine) throws IOException {
             this.blockReplication = replication;
+            this.blockSize = blockSize;
             this.blocks = new Vector();
             this.clientName = clientName;
             this.clientMachine = clientMachine;
@@ -1630,6 +1646,10 @@
 
         public short getReplication() {
             return this.blockReplication;
+        }
+
+        public long getBlockSize() {
+          return blockSize;
         }
 
         public Vector getBlocks() {

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java?rev=406746&r1=406745&r2=406746&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java Mon May 15 14:26:04 2006
@@ -181,7 +181,8 @@
                               String clientName,
                               String clientMachine,
                               boolean overwrite,
-                              short replication
+                              short replication,
+                              long blockSize
                               ) throws IOException {
         stateChangeLog.fine("*DIR* NameNode.create: file "
             +src+" for "+clientName+" at "+clientMachine);
@@ -189,7 +190,8 @@
             new UTF8(clientName),
             new UTF8(clientMachine),
             overwrite,
-            replication);
+            replication,
+            blockSize);
         Block b = (Block) results[0];
         DatanodeInfo targets[] = (DatanodeInfo[]) results[1];
         return new LocatedBlock(b, targets);
@@ -279,6 +281,11 @@
             return results;
         }
     }
+
+    public long getBlockSize(String filename) throws IOException {
+      return namesystem.getBlockSize(filename);
+    }
+
     /**
      */
     public boolean rename(String src, String dst) throws IOException {

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataOutputStream.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataOutputStream.java?rev=406746&r1=406745&r2=406746&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataOutputStream.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataOutputStream.java Mon May 15 14:26:04 2006
@@ -38,12 +38,14 @@
                   Path file,
                   boolean overwrite,
                   short replication,
+                  long blockSize,
                   Configuration conf)
       throws IOException {
-      super(fs.createRaw(file, overwrite, replication));
+      super(fs.createRaw(file, overwrite, replication, blockSize));
       this.bytesPerSum = conf.getInt("io.bytes.per.checksum", 512);
       this.sums = new FSDataOutputStream(
-            fs.createRaw(FileSystem.getChecksumFile(file), true, replication),
+            fs.createRaw(FileSystem.getChecksumFile(file), true,
+                         replication, blockSize),
             conf);
       sums.write(CHECKSUM_VERSION, 0, CHECKSUM_VERSION.length);
       sums.writeInt(this.bytesPerSum);
@@ -127,11 +129,11 @@
 
   public FSDataOutputStream(FileSystem fs, Path file, boolean overwrite,
                             Configuration conf,
-                            int bufferSize, short replication )
+                            int bufferSize, short replication, long blockSize )
     throws IOException {
     super(new Buffer(
             new PositionCache(
-                new Summer(fs, file, overwrite, replication, conf)),
+                new Summer(fs, file, overwrite, replication, blockSize, conf)),
             bufferSize));
   }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java?rev=406746&r1=406745&r2=406746&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java Mon May 15 14:26:04 2006
@@ -175,7 +175,8 @@
   public FSDataOutputStream create(Path f) throws IOException {
     return create(f, true,
                   getConf().getInt("io.file.buffer.size", 4096),
-                  (short)getConf().getInt("dfs.replication", 3));
+                  getDefaultReplication(),
+                  getDefaultBlockSize());
   }
 
   /**
@@ -186,7 +187,8 @@
     throws IOException {
     return create(f, true,
                   getConf().getInt("io.file.buffer.size", 4096),
-                  replication);
+                  replication,
+                  getDefaultBlockSize());
  }
 
   /**
@@ -201,7 +203,8 @@
                                    int bufferSize
                                    ) throws IOException {
     return create( f, overwrite, bufferSize,
-                   (short)getConf().getInt("dfs.replication", 3));
+                   getDefaultReplication(),
+                   getDefaultBlockSize());
  }
 
   /**
@@ -215,10 +218,11 @@
   public FSDataOutputStream create( Path f,
                                     boolean overwrite,
                                     int bufferSize,
-                                    short replication
+                                    short replication,
+                                    long blockSize
                                     ) throws IOException {
     return new FSDataOutputStream(this, f, overwrite, getConf(),
-                                  bufferSize, replication );
+                                  bufferSize, replication, blockSize );
   }
 
   /** Opens an OutputStream at the indicated Path.
    * @param f the file name to open
    * @param overwrite if a file with this name already exists, then if true,
    *   the file will be overwritten, and if false an error will be thrown.
    * @param replication required block replication for the file.
    */
-  public abstract FSOutputStream createRaw(Path f, boolean overwrite, short replication)
+  public abstract FSOutputStream createRaw(Path f, boolean overwrite,
+                                           short replication,
+                                           long blockSize)
     throws IOException;
 
   /** @deprecated Call {@link #createNewFile(Path)} instead. */
@@ -547,8 +553,20 @@
                                                  long start, long length,
                                                  int crc);
 
+  /**
+   * Get the size for a particular file.
+   * @param f the filename
+   * @return the number of bytes in a block
+   */
+  public abstract long getBlockSize(Path f) throws IOException;
+
   /** Return the number of bytes that large input files should be optimally
    * be split into to minimize i/o time. */
-  public abstract long getBlockSize();
+  public abstract long getDefaultBlockSize();
+
+  /**
+   * Get the default replication.
+   */
+  public abstract short getDefaultReplication();
 
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java?rev=406746&r1=406745&r2=406746&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java Mon May 15 14:26:04 2006
@@ -161,7 +161,8 @@
         }
     }
 
-    public FSOutputStream createRaw(Path f, boolean overwrite, short replication)
+    public FSOutputStream createRaw(Path f, boolean overwrite,
+                                    short replication, long blockSize)
       throws IOException {
         if (exists(f) && ! overwrite) {
             throw new IOException("File already exists:"+f);
@@ -356,9 +357,18 @@
         }
     }
 
-    public long getBlockSize() {
+    public long getDefaultBlockSize() {
       // default to 32MB: large enough to minimize the impact of seeks
       return getConf().getLong("fs.local.block.size", 32 * 1024 * 1024);
     }
+
+    public long getBlockSize(Path filename) {
+      // local doesn't really do blocks, so just use the global number
+      return getDefaultBlockSize();
+    }
+
+    public short getDefaultReplication() {
+      return 1;
+    }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormatBase.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormatBase.java?rev=406746&r1=406745&r2=406746&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormatBase.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormatBase.java Mon May 15 14:26:04 2006
@@ -32,7 +32,7 @@
   public static final Logger LOG =
     LogFormatter.getLogger("org.apache.hadoop.mapred.InputFormatBase");
 
-  private static final double SPLIT_SLOP = 0.1;   // 10% slop
+  private static final double SPLIT_SLOP = 1.1;   // 10% slop
 
   private long minSplitSize = 1;
 
@@ -117,34 +117,22 @@
       totalSize += fs.getLength(files[i]);
     }
 
-    long bytesPerSplit = totalSize / numSplits;   // start w/ desired num splits
+    long goalSize = totalSize / numSplits;   // start w/ desired num splits
 
-    long fsBlockSize = fs.getBlockSize();
-    if (bytesPerSplit > fsBlockSize) {            // no larger than fs blocks
-      bytesPerSplit = fsBlockSize;
-    }
-
-    long configuredMinSplitSize = job.getLong("mapred.min.split.size", 0);
-    if( configuredMinSplitSize < minSplitSize )
-      configuredMinSplitSize = minSplitSize;
-    if (bytesPerSplit < configuredMinSplitSize) { // no smaller than min size
-      bytesPerSplit = configuredMinSplitSize;
-    }
-
-    long maxPerSplit = bytesPerSplit + (long)(bytesPerSplit*SPLIT_SLOP);
-
-    //LOG.info("bytesPerSplit = " + bytesPerSplit);
-    //LOG.info("maxPerSplit = " + maxPerSplit);
+    long minSize = Math.max(job.getLong("mapred.min.split.size", 1),
+                            minSplitSize);
 
     ArrayList splits = new ArrayList(numSplits);  // generate splits
     for (int i = 0; i < files.length; i++) {
       Path file = files[i];
       long length = fs.getLength(file);
+      long blockSize = fs.getBlockSize(file);
+      long splitSize = computeSplitSize(goalSize, minSize, blockSize);
 
      long bytesRemaining = length;
-      while (bytesRemaining >= maxPerSplit) {
-        splits.add(new FileSplit(file, length-bytesRemaining, bytesPerSplit));
-        bytesRemaining -= bytesPerSplit;
+      while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
+        splits.add(new FileSplit(file, length-bytesRemaining, splitSize));
+        bytesRemaining -= splitSize;
      }
 
      if (bytesRemaining != 0) {
@@ -156,5 +144,9 @@
     return (FileSplit[])splits.toArray(new FileSplit[splits.size()]);
   }
 
+  private static long computeSplitSize(long goalSize, long minSize,
+                                       long blockSize) {
+    return Math.max(minSize, Math.min(goalSize, blockSize));
+  }
 }

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/DistributedFSCheck.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/DistributedFSCheck.java?rev=406746&r1=406745&r2=406746&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/DistributedFSCheck.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/DistributedFSCheck.java Mon May 15 14:26:04 2006
@@ -104,7 +104,7 @@
     if( ! fs.isDirectory(rootFile) ) {
       nrFiles++;
       // For a regular file generate <fName,offset> pairs
-      long blockSize = fs.getBlockSize();
+      long blockSize = fs.getDefaultBlockSize();
       long fileLength = fs.getLength( rootFile );
       for( long offset = 0; offset < fileLength; offset += blockSize )
         writer.append(new UTF8(rootFile.toString()), new LongWritable(offset));
@@ -136,7 +136,7 @@
     in = new DataInputStream(fs.open(new Path(name)));
     long actualSize = 0;
     try {
-      long blockSize = fs.getBlockSize();
+      long blockSize = fs.getDefaultBlockSize();
      int curSize = bufferSize;
      for( actualSize = 0; curSize == bufferSize && actualSize < blockSize;
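A worked example of the new split sizing in InputFormatBase above, with illustrative numbers: the split size is max(minSize, min(goalSize, blockSize)), and splits are emitted while more than SPLIT_SLOP (1.1) splits' worth of data remains, so a short tail is folded into the last split instead of becoming a sliver. SPLIT_SLOP changes from 0.1 to 1.1 because the loop now tests the ratio bytesRemaining/splitSize directly, rather than comparing bytesRemaining against splitSize plus 10%.

    // A 134MB file, 2 requested splits, 64MB blocks, minimum split size 1:
    long goalSize  = (134L << 20) / 2;                             // 67MB per split desired
    long splitSize = Math.max(1L, Math.min(goalSize, 64L << 20));  // = 64MB
    // remaining = 134MB: 134/64 = 2.09 > 1.1  -> emit a 64MB split
    // remaining =  70MB:  70/64 = 1.09 <= 1.1 -> stop looping
    // The nonzero remainder becomes the final split: [64MB, 70MB],
    // rather than [64MB, 64MB, 6MB] without the slop.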