Author: dhruba Date: Wed Sep 19 15:12:49 2007 New Revision: 577456 URL: http://svn.apache.org/viewvc?rev=577456&view=rev Log: HADOOP-89. A client can access file data even before the creator has closed the file. Introduce a new dfs shell command "tail". (Dhruba Borthakur via dhruba)
Removed: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FileUnderConstruction.java lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/PendingCreates.java Modified: lucene/hadoop/trunk/CHANGES.txt lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/INode.java lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FsShell.java lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestFileCreation.java Modified: lucene/hadoop/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=577456&r1=577455&r2=577456&view=diff ============================================================================== --- lucene/hadoop/trunk/CHANGES.txt (original) +++ lucene/hadoop/trunk/CHANGES.txt Wed Sep 19 15:12:49 2007 @@ -35,6 +35,10 @@ NEW FEATURES + HADOOP-89. A client can access file data even before the creator + has closed the file. Introduce a new command "tail" from dfs shell. + (Dhruba Borthakur via dhruba) + HADOOP-1636. Allow configuration of the number of jobs kept in memory by the JobTracker. (Michael Bieniosek via omalley) Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java?rev=577456&r1=577455&r2=577456&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java Wed Sep 19 15:12:49 2007 @@ -25,6 +25,7 @@ import org.apache.hadoop.metrics.MetricsRecord; import org.apache.hadoop.metrics.MetricsUtil; import org.apache.hadoop.metrics.MetricsContext; +import org.apache.hadoop.dfs.BlocksMap.BlockInfo; /************************************************* * FSDirectory stores the filesystem directory state. 
@@ -114,31 +115,43 @@ /** * Add the given filename to the fs. */ - public boolean addFile(String path, Block[] blocks, short replication, - long preferredBlockSize) { + INode addFile(String path, + short replication, + long preferredBlockSize, + String clientName, + String clientMachine, + DatanodeDescriptor clientNode) + throws IOException { waitForReady(); // Always do an implicit mkdirs for parent directory tree. long modTime = FSNamesystem.now(); if (!mkdirs(new Path(path).getParent().toString(), modTime)) { - return false; + return null; + } + INodeFile newNode = new INodeFileUnderConstruction(replication, + preferredBlockSize, modTime, clientName, + clientMachine, clientNode); + synchronized (rootDir) { + try { + newNode = rootDir.addNode(path, newNode); + } catch (FileNotFoundException e) { + newNode = null; + } } - INodeFile newNode = (INodeFile)unprotectedAddFile(path, blocks, replication, - modTime, - preferredBlockSize); if (newNode == null) { NameNode.stateChangeLog.info("DIR* FSDirectory.addFile: " - +"failed to add "+path+" with " - +blocks.length+" blocks to the file system"); - return false; + +"failed to add "+path + +" to the file system"); + return null; } // add create file record to log fsImage.getEditLog().logCreateFile(path, newNode); NameNode.stateChangeLog.debug("DIR* FSDirectory.addFile: " - +path+" with "+blocks.length+" blocks is added to the file system"); - return true; + +path+" is added to the file system"); + return newNode; } - + /** */ INode unprotectedAddFile( String path, @@ -171,28 +184,64 @@ } /** - * Add blocks to the file. + * Add a block to the file. Returns a reference to the added block. 
+ */ + Block addBlock(String path, INode file, Block block) throws IOException { + waitForReady(); + + synchronized (rootDir) { + INodeFile fileNode = (INodeFile) file; + + // associate the new list of blocks with this file + namesystem.blocksMap.addINode(block, fileNode); + BlockInfo blockInfo = namesystem.blocksMap.getStoredBlock(block); + fileNode.addBlock(blockInfo); + + NameNode.stateChangeLog.debug("DIR* FSDirectory.addFile: " + + path + " with " + block + + " block is added to the in-memory " + + "file system"); + } + return block; + } + + /** + * Persist the block list for the inode. + */ + void persistBlocks(String path, INode file) throws IOException { + waitForReady(); + + synchronized (rootDir) { + INodeFile fileNode = (INodeFile) file; + + // create two transactions. The first one deletes the empty + // file and the second transaction recreates the same file + // with the appropriate set of blocks. + fsImage.getEditLog().logDelete(path, fileNode.getModificationTime()); + + // re-add create file record to log + fsImage.getEditLog().logCreateFile(path, fileNode); + NameNode.stateChangeLog.debug("DIR* FSDirectory.addFile: " + +path+" with "+ fileNode.getBlocks().length + +" blocks is persisted to the file system"); + } + } + + /** + * Remove a block to the file. 
*/ - boolean addBlocks(String path, Block[] blocks) throws IOException { + boolean removeBlock(String path, INode file, Block block) throws IOException { waitForReady(); synchronized (rootDir) { - INodeFile fileNode = this.getFileINode(path); + INodeFile fileNode = (INodeFile) file; if (fileNode == null) { throw new IOException("Unknown file: " + path); } - if (fileNode.getBlocks() != null && - fileNode.getBlocks().length != 0) { - throw new IOException("Cannot add new blocks to " + - "already existing file."); - } - // associate the new list of blocks with this file - fileNode.allocateBlocks(blocks.length); - for (int i = 0; i < blocks.length; i++) { - fileNode.setBlock(i, - namesystem.blocksMap.addINode(blocks[i], fileNode)); - } + // modify file-> block and blocksMap + fileNode.removeBlock(block); + namesystem.blocksMap.removeINode(block); // create two transactions. The first one deletes the empty // file and the second transaction recreates the same file @@ -202,8 +251,8 @@ // re-add create file record to log fsImage.getEditLog().logCreateFile(path, fileNode); NameNode.stateChangeLog.debug("DIR* FSDirectory.addFile: " - +path+" with "+blocks.length - +" blocks is added to the file system"); + +path+" with "+block + +" block is added to the file system"); } return true; } @@ -372,6 +421,28 @@ } return v.toArray(new Block[v.size()]); } + } + } + } + + /** + * Replaces the specified inode with the specified one. 
+ */ + void replaceNode(String path, INodeFile oldnode, INodeFile newnode) + throws IOException { + synchronized (rootDir) { + // + // Remove the node from the namespace + // + if (!oldnode.removeNode()) { + NameNode.stateChangeLog.warn("DIR* FSDirectory.replaceNode: " + + "failed to remove " + path); + throw new IOException("FSDirectory.replaceNode: " + + "failed to remove " + path); + } + rootDir.addNode(path, newnode); + for (Block b : newnode.getBlocks()) { + namesystem.blocksMap.addINode(b, newnode); } } } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?rev=577456&r1=577455&r2=577456&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Wed Sep 19 15:12:49 2007 @@ -104,12 +104,6 @@ new TreeMap<String, Collection<Block>>(); // - // Keeps track of files that are being created, plus the - // blocks that make them up. - // - PendingCreates pendingCreates = new PendingCreates(); - - // // Stats on overall usage // long totalCapacity = 0L, totalUsed=0L, totalRemaining = 0L; @@ -719,7 +713,15 @@ throw new IOException( text + " is less than the required minimum " + minReplication); } - + + void startFile(String src, String holder, String clientMachine, + boolean overwrite, short replication, long blockSize + ) throws IOException { + startFileInternal(src, holder, clientMachine, overwrite, + replication, blockSize); + getEditLog().logSync(); + } + /** * The client would like to create a new block for the indicated * filename. Return an array that consists of the block, plus a set @@ -731,7 +733,7 @@ * @throws IOException if the filename is invalid * {@link FSDirectory#isValidToCreate(String)}. 
*/ - synchronized void startFile(String src, + synchronized void startFileInternal(String src, String holder, String clientMachine, boolean overwrite, @@ -746,10 +748,11 @@ throw new IOException("Invalid file name: " + src); } try { - FileUnderConstruction pendingFile = pendingCreates.get(src); - if (pendingFile != null) { + INode myFile = dir.getFileINode(src); + if (myFile != null && (myFile instanceof INodeFileUnderConstruction)) { + INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction) myFile; // - // If the file exists in pendingCreate, then it must be in our + // If the file is under construction , then it must be in our // leases. Find the appropriate lease record. // Lease lease = getLease(holder); @@ -814,15 +817,6 @@ DatanodeDescriptor clientNode = host2DataNodeMap.getDatanodeByHost(clientMachine); - // Reserve space for this pending file - pendingCreates.put(src, - new FileUnderConstruction(replication, - blockSize, - holder, - clientMachine, - clientNode)); - NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: " - +"add "+src+" to pendingCreates for "+holder); synchronized (leases) { Lease lease = getLease(holder); if (lease == null) { @@ -836,20 +830,27 @@ } lease.startedCreate(src); } + + // + // Now we can add the name to the filesystem. This file has no + // blocks associated with it. + // + INode newNode = dir.addFile(src, replication, blockSize, + holder, + clientMachine, + clientNode); + if (newNode == null) { + throw new IOException("DIR* NameSystem.startFile: " + + "Unable to add file to namespace."); + } } catch (IOException ie) { NameNode.stateChangeLog.warn("DIR* NameSystem.startFile: " +ie.getMessage()); throw ie; } - // - // Now we can add the name to the filesystem. This file has no - // blocks associated with it. 
- // - if (!dir.addFile(src, new Block[0], replication, blockSize)) { - throw new IOException("DIR* NameSystem.startFile: " + - "Unable to add file to namespace."); - } + NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: " + +"add "+src+" to namespace for "+holder); } /** @@ -882,7 +883,7 @@ // // make sure that we still have the lease on this file // - FileUnderConstruction pendingFile = pendingCreates.get(src); + INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction) dir.getFileINode(src); if (pendingFile == null) { throw new LeaseExpiredException("No lease on " + src); } @@ -898,11 +899,11 @@ if (!checkFileProgress(pendingFile, false)) { throw new NotReplicatedYetException("Not replicated yet:" + src); } - fileLength = pendingFile.computeFileLength(); - blockSize = pendingFile.getBlockSize(); + fileLength = pendingFile.computeContentsLength(); + blockSize = pendingFile.getPreferredBlockSize(); clientNode = pendingFile.getClientNode(); replication = (int)pendingFile.getReplication(); - newBlock = allocateBlock(src); + newBlock = allocateBlock(src, pendingFile); } DatanodeDescriptor targets[] = replicator.chooseTarget(replication, @@ -928,13 +929,14 @@ // NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: " +b.getBlockName()+"of file "+src); - boolean status = pendingCreates.removeBlock(src, b); - if (status) { - NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: " + INode file = dir.getFileINode(src); + if (file != null) { + dir.removeBlock(src, file, b); + } + NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: " + b.getBlockName() + " is removed from pendingCreates"); - } - return status; + return true; } /** @@ -964,8 +966,7 @@ } /** - * Finalize the created file and make it world-accessible. The - * FSNamesystem will already know the blocks that make up the file. + * The FSNamesystem will already know the blocks that make up the file. 
* Before we return, we make sure that all the file's blocks have * been reported by datanodes and are replicated correctly. */ @@ -980,11 +981,11 @@ NameNode.stateChangeLog.debug("DIR* NameSystem.completeFile: " + src + " for " + holder); if (isInSafeMode()) throw new SafeModeException("Cannot complete file " + src, safeMode); - FileUnderConstruction pendingFile = pendingCreates.get(src); + INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction) dir.getFileINode(src); Block[] fileBlocks = dir.getFileBlocks(src); - if ((fileBlocks != null && fileBlocks.length > 0) || - pendingFile == null) { + if (fileBlocks == null || fileBlocks.length == 0 || + pendingFile == null) { NameNode.stateChangeLog.warn("DIR* NameSystem.completeFile: " + "failed to complete " + src + " because dir.getFileBlocks() is " + @@ -999,36 +1000,16 @@ return STILL_WAITING; } - Collection<Block> blocks = pendingFile.getBlocks(); - int nrBlocks = blocks.size(); - Block pendingBlocks[] = new Block[nrBlocks]; - - // - // We have the pending blocks, but they won't have - // length info in them (as they were allocated before - // data-write took place). Find the block stored in - // node descriptor. - // - int idx = 0; - for (Block b : blocks) { - Block storedBlock = blocksMap.getStoredBlock(b); - // according to checkFileProgress() every block is present & replicated - assert storedBlock != null : "Missing block " + b.getBlockName(); - pendingBlocks[idx++] = storedBlock; - } - - // - // add blocks to the file - // - if (!dir.addBlocks(src, pendingBlocks)) { - return OPERATION_FAILED; - } + // The file is no longer pending. 
+ // Create permanent INode, update blockmap + INodeFile newFile = pendingFile.convertToInodeFile(); + dir.replaceNode(src, pendingFile, newFile); - // The file is no longer pending - pendingCreates.remove(src); - NameNode.stateChangeLog.debug( - "DIR* NameSystem.completeFile: " + src - + " is removed from pendingCreates"); + // persist block allocations for this file + dir.persistBlocks(src, newFile); + + NameNode.stateChangeLog.debug("DIR* NameSystem.completeFile: " + src + + " blocklist persisted"); synchronized (leases) { Lease lease = getLease(holder); @@ -1051,6 +1032,8 @@ // Now that the file is real, we need to be sure to replicate // the blocks. int numExpectedReplicas = pendingFile.getReplication(); + Block[] pendingBlocks = pendingFile.getBlocks(); + int nrBlocks = pendingBlocks.length; for (int i = 0; i < nrBlocks; i++) { // filter out containingNodes that are marked for decommission. NumberReplicas number = countNodes(pendingBlocks[i]); @@ -1069,15 +1052,14 @@ /** * Allocate a block at the given pending filename */ - private Block allocateBlock(String src) throws IOException { + private Block allocateBlock(String src, INode file) throws IOException { Block b = null; do { b = new Block(FSNamesystem.randBlockId.nextLong(), 0); } while (isValidBlock(b)); - pendingCreates.addBlock(src, b); + b = dir.addBlock(src, file, b); NameNode.stateChangeLog.info("BLOCK* NameSystem.allocateBlock: " - +src+ ". "+b.getBlockName()+ - " is created and added to pendingCreates and pendingCreateBlocks"); + +src+ ". "+b.getBlockName()); return b; } @@ -1086,13 +1068,13 @@ * replicated. If not, return false. If checkall is true, then check * all blocks, otherwise check only penultimate block. */ - synchronized boolean checkFileProgress(FileUnderConstruction v, boolean checkall) { + synchronized boolean checkFileProgress(INodeFile v, boolean checkall) { if (checkall) { // // check all blocks of the file. 
// - for (Iterator<Block> it = v.getBlocks().iterator(); it.hasNext();) { - if (blocksMap.numNodes(it.next()) < this.minReplication) { + for (Block block: v.getBlocks()) { + if (blocksMap.numNodes(block) < this.minReplication) { return false; } } @@ -1490,21 +1472,36 @@ } /** - * Release a pending file creation lock. + * Move a file that is being written to be immutable. * @param src The filename * @param holder The datanode that was creating the file */ private void internalReleaseCreate(String src, String holder) throws IOException { - boolean status = pendingCreates.remove(src); - if (status) { - NameNode.stateChangeLog.debug("DIR* NameSystem.internalReleaseCreate: " + src - + " is removed from pendingCreates for " - + holder + " (failure)"); - } else { - NameNode.stateChangeLog.warn("DIR* NameSystem.internalReleaseCreate: " - + "attempt to release a create lock on "+ src - + " that was not in pedingCreates"); + INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction) dir.getFileINode(src); + + // The last block that was allocated migth not have been used by the + // client. In this case, the size of the last block would be 0. A fsck + // will report this block as a missing block because no datanodes have it. + // Delete this block. + Block[] blocks = pendingFile.getBlocks(); + if (blocks != null && blocks.length > 1) { + Block last = blocks[blocks.length - 1]; + if (last.getNumBytes() == 0) { + pendingFile.removeBlock(last); + } } + + // The file is no longer pending. + // Create permanent INode, update blockmap + INodeFile newFile = pendingFile.convertToInodeFile(); + dir.replaceNode(src, pendingFile, newFile); + + // persist block allocations for this file + dir.persistBlocks(src, newFile); + + NameNode.stateChangeLog.debug("DIR* NameSystem.internalReleaseCreate: " + + src + " is no longer written to by " + + holder); } /** @@ -2161,7 +2158,18 @@ Block storedBlock = blocksMap.getStoredBlock(block); //extra look up! 
if (storedBlock != null && block != storedBlock) { if (block.getNumBytes() > 0) { - storedBlock.setNumBytes(block.getNumBytes()); + long cursize = storedBlock.getNumBytes(); + if (cursize == 0) { + storedBlock.setNumBytes(block.getNumBytes()); + } else if (cursize != block.getNumBytes()) { + LOG.warn("Inconsistent size for block " + block + + " reported from " + node.getName() + + " current size is " + cursize + + " reported size is " + block.getNumBytes()); + // Accept this block even if there is a problem with its + // size. Clients should detect data corruption because of + // CRC mismatch. + } } block = storedBlock; } @@ -2185,8 +2193,13 @@ + block.getBlockName() + " on " + node.getName()); } - if (fileINode == null) // block does not belong to any file + // + // if file is being actively written to, then do not check + // replication-factor here. It will be checked when the file is closed. + // + if (fileINode == null || fileINode instanceof INodeFileUnderConstruction) { return block; + } // filter out containingNodes that are marked for decommission. NumberReplicas num = countNodes(block); @@ -3460,8 +3473,7 @@ * Returns whether the given block is one pointed-to by a file. 
*/ private boolean isValidBlock(Block b) { - return (blocksMap.getINode(b) != null || - pendingCreates.contains(b)); + return (blocksMap.getINode(b) != null); } // Distributed upgrade manager Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/INode.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/INode.java?rev=577456&r1=577455&r2=577456&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/INode.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/INode.java Wed Sep 19 15:12:49 2007 @@ -23,6 +23,7 @@ import java.util.Collections; import java.util.Arrays; import java.util.List; +import java.io.IOException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.dfs.BlocksMap.BlockInfo; @@ -472,6 +473,14 @@ allocateBlocks(nrBlocks); } + protected INodeFile(BlockInfo[] blklist, short replication, long modificationTime, + long preferredBlockSize) { + super(modificationTime); + this.blockReplication = replication; + this.preferredBlockSize = preferredBlockSize; + blocks = blklist; + } + boolean isDirectory() { return false; } @@ -492,7 +501,7 @@ * Get file blocks * @return file blocks */ - Block[] getBlocks() { + BlockInfo[] getBlocks() { return this.blocks; } @@ -505,6 +514,45 @@ } /** + * add a block to the block list + */ + void addBlock(BlockInfo newblock) { + if (this.blocks == null) { + this.blocks = new BlockInfo[1]; + this.blocks[0] = newblock; + } else { + int size = this.blocks.length; + BlockInfo[] newlist = new BlockInfo[size + 1]; + for (int i = 0; i < size; i++) { + newlist[i] = this.blocks[i]; + } + newlist[size] = newblock; + this.blocks = newlist; + } + } + + /** + * remove a block from the block list. This block should be + * the last one on the list. 
+ */ + void removeBlock(Block oldblock) throws IOException { + if (this.blocks == null) { + throw new IOException("Trying to delete non-existant block " + + oldblock); + } + int size = this.blocks.length; + if (!this.blocks[size-1].equals(oldblock)) { + throw new IOException("Trying to delete non-existant block " + + oldblock); + } + BlockInfo[] newlist = new BlockInfo[size - 1]; + for (int i = 0; i < size-1; i++) { + newlist[i] = this.blocks[i]; + } + this.blocks = newlist; + } + + /** * Set file block */ void setBlock(int idx, BlockInfo blk) { @@ -536,5 +584,58 @@ */ long getPreferredBlockSize() { return preferredBlockSize; + } + + /** + * Return the penultimate allocated block for this file. + */ + Block getPenultimateBlock() { + if (blocks == null || blocks.length <= 1) { + return null; + } + return blocks[blocks.length - 2]; + } +} + +class INodeFileUnderConstruction extends INodeFile { + protected StringBytesWritable clientName; // lease holder + protected StringBytesWritable clientMachine; + protected DatanodeDescriptor clientNode; // if client is a cluster node too. 
+ + INodeFileUnderConstruction(short replication, + long preferredBlockSize, + long modTime, + String clientName, + String clientMachine, + DatanodeDescriptor clientNode) + throws IOException { + super(0, replication, modTime, preferredBlockSize); + this.clientName = new StringBytesWritable(clientName); + this.clientMachine = new StringBytesWritable(clientMachine); + this.clientNode = clientNode; + } + + String getClientName() throws IOException { + return clientName.getString(); + } + + String getClientMachine() throws IOException { + return clientMachine.getString(); + } + + DatanodeDescriptor getClientNode() { + return clientNode; + } + + // + // converts a INodeFileUnderConstruction into a INodeFile + // + INodeFile convertToInodeFile() { + INodeFile obj = new INodeFile(getBlocks(), + getReplication(), + getModificationTime(), + getPreferredBlockSize()); + return obj; + } } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FsShell.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FsShell.java?rev=577456&r1=577455&r2=577456&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FsShell.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FsShell.java Wed Sep 19 15:12:49 2007 @@ -44,6 +44,7 @@ modifFmt.setTimeZone(TimeZone.getTimeZone("UTC")); } static final String SETREP_SHORT_USAGE="-setrep [-R] [-w] <rep> <path/file>"; + static final String TAIL_USAGE="-tail [-f] <file>"; private static final DecimalFormat decimalFormat = new DecimalFormat("#*0.0#*"); @@ -881,6 +882,54 @@ } /** + * Parse the incoming command string + * @param cmd + * @param pos ignore anything before this pos in cmd + * @throws IOException + */ + private void tail(String[] cmd, int pos) throws IOException { + CommandFormat c = new CommandFormat("tail", 1, 1, "f"); + String src = null; + Path path = null; + short rep = 0; + + try { + 
List<String> parameters = c.parse(cmd, pos); + src = parameters.get(0); + } catch(IllegalArgumentException iae) { + System.err.println("Usage: java FsShell " + TAIL_USAGE); + throw iae; + } + boolean foption = c.options.get("f") ? true: false; + path = new Path(src); + + if (fs.isDirectory(path)) { + throw new IOException("Source must be a file."); + } + + long fileSize = fs.getFileStatus(path).getLen(); + long offset = (fileSize > 1024) ? fileSize - 1024: 0; + + while (true) { + FSDataInputStream in = fs.open(path); + in.seek(offset); + IOUtils.copyBytes(in, System.out, 1024, false); + offset = in.getPos(); + in.close(); + if (!foption) { + break; + } + fileSize = fs.getFileStatus(path).getLen(); + offset = (fileSize > offset) ? offset: fileSize; + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + break; + } + } + } + + /** * Return an abbreviated English-language desc of the byte length */ public static String byteDesc(long len) { @@ -926,6 +975,7 @@ "[-copyToLocal <src><localdst>] [-moveToLocal <src> <localdst>]\n\t" + "[-mkdir <path>] [-report] [" + SETREP_SHORT_USAGE + "]\n\t" + "[-touchz <path>] [-test -[ezd] <path>] [-stat [format] <path>]\n\t" + + "[-tail [-f] <path>]\n\t" + "[-help [cmd]]\n"; String conf ="-conf <configuration file>: Specify an application configuration file."; @@ -1025,6 +1075,10 @@ "\t\tin the specified format. Format accepts filesize in blocks (%b), filename (%n),\n" + "\t\tblock size (%o), replication (%r), modification date (%y, %Y)\n"; + String tail = TAIL_USAGE + + ": Show the last 1KB of the file. \n" + + "\t\tThe -f option shows apended data as the file grows. 
\n"; + String help = "-help [cmd]: \tDisplays help for given command or all commands if none\n" + "\t\tis specified.\n"; @@ -1078,6 +1132,8 @@ System.out.println(test); } else if ("stat".equals(cmd)) { System.out.println(stat); + } else if ("tail".equals(cmd)) { + System.out.println(tail); } else if ("help".equals(cmd)) { System.out.println(help); } else { @@ -1214,6 +1270,8 @@ } else if ("-stat".equals(cmd)) { System.err.println("Usage: java FsShell" + " [-stat [format] <path>]"); + } else if ("-tail".equals(cmd)) { + System.err.println("Usage: java FsShell [" + TAIL_USAGE + "]"); } else { System.err.println("Usage: java FsShell"); System.err.println(" [-ls <path>]"); @@ -1238,6 +1296,7 @@ System.err.println(" [-touchz <path>]"); System.err.println(" [-test -[ezd] <path>]"); System.err.println(" [-stat [format] <path>]"); + System.err.println(" [" + TAIL_USAGE + "]"); System.err.println(" [-help [cmd]]"); System.err.println(); ToolRunner.printGenericCommandUsage(System.err); @@ -1370,6 +1429,8 @@ } else { printHelp(""); } + } else if ("-tail".equals(cmd)) { + tail(argv, i); } else { exitCode = -1; System.err.println(cmd.substring(1) + ": Unknown command"); Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestFileCreation.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestFileCreation.java?rev=577456&r1=577455&r2=577456&view=diff ============================================================================== --- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestFileCreation.java (original) +++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestFileCreation.java Wed Sep 19 15:12:49 2007 @@ -27,14 +27,16 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FSDataInputStream; /** - * This class tests the FileStatus API. 
+ * This class tests that a file need not be closed before its + * data can be read by another client. */ public class TestFileCreation extends TestCase { static final long seed = 0xDEADBEEFL; static final int blockSize = 8192; - static final int fileSize = 16384; + static final int fileSize = 2 * blockSize; private static String TEST_ROOT_DIR = new Path(System.getProperty("test.build.data","/tmp")) @@ -78,11 +80,27 @@ } } } + FSDataInputStream stm = fileSys.open(name); + byte[] expected = new byte[fileSize]; + Random rand = new Random(seed); + rand.nextBytes(expected); + // do a sanity check. Read the file + byte[] actual = new byte[fileSize]; + stm.readFully(0, actual); + checkData(actual, 0, expected, "Read 1"); } + private void checkData(byte[] actual, int from, byte[] expected, String message) { + for (int idx = 0; idx < actual.length; idx++) { + this.assertEquals(message+" byte "+(from+idx)+" differs. expected "+ + expected[from+idx]+" actual "+actual[idx], + actual[idx], expected[from+idx]); + actual[idx] = 0; + } + } /** - * Tests various options of File creation. + * Test that file data becomes available before file is closed. */ public void testFileCreation() throws IOException { Configuration conf = new Configuration(); @@ -115,9 +133,13 @@ // write to file writeFile(stm); - // close file. This makes all file data visible to clients. - stm.close(); + // verify that file size has changed + assertTrue(file1 + " should be of size " + fileSize, + fs.getFileStatus(file1).getLen() == fileSize); + + // Make sure a client can read it before it is closed. checkFile(fs, file1, 1); + stm.close(); } finally { fs.close();