Author: brandonli Date: Wed Aug 28 17:23:55 2013 New Revision: 1518292 URL: http://svn.apache.org/r1518292 Log: HDFS-5078 Support file append in NFSv3 gateway to enable data streaming to HDFS. Contributed by Brandon Li
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java?rev=1518292&r1=1518291&r2=1518292&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java Wed Aug 28 17:23:55 2013 @@ -126,6 +126,9 @@ class OpenFileCtx { nonSequentialWriteInMemory = 0; this.dumpFilePath = dumpFilePath; enabledDump = dumpFilePath == null ? false: true; + nextOffset = latestAttr.getSize(); + assert(nextOffset == this.fos.getPos()); + ctxLock = new ReentrantLock(true); } @@ -685,12 +688,14 @@ class OpenFileCtx { try { fos.write(data, 0, count); - - if (fos.getPos() != (offset + count)) { + + long flushedOffset = getFlushedOffset(); + if (flushedOffset != (offset + count)) { throw new IOException("output stream is out of sync, pos=" - + fos.getPos() + " and nextOffset should be" + (offset + count)); + + flushedOffset + " and nextOffset should be" + + (offset + count)); } - nextOffset = fos.getPos(); + nextOffset = flushedOffset; // Reduce memory occupation size if request was allowed dumped if (writeCtx.getDataState() == DataState.ALLOW_DUMP) { Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java?rev=1518292&r1=1518291&r2=1518292&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java Wed Aug 28 17:23:55 2013 @@ -29,6 +29,7 @@ import org.apache.hadoop.conf.Configurat import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem.Statistics; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.FsStatus; import org.apache.hadoop.fs.Options; @@ -123,7 +124,7 @@ public class RpcProgramNfs3 extends RpcP private final Configuration config = new Configuration(); private final WriteManager writeManager; - private final IdUserGroup iug;// = new IdUserGroup(); + private final IdUserGroup iug; private final DFSClientCache clientCache; private final NfsExports exports; @@ -161,10 +162,14 @@ public class RpcProgramNfs3 extends RpcP DFSConfigKeys.DFS_REPLICATION_DEFAULT); blockSize = config.getLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT); - bufferSize = config.getInt("io.file.buffer.size", 4096); - - writeDumpDir = config.get("dfs.nfs3.dump.dir", "/tmp/.hdfs-nfs"); - boolean enableDump = config.getBoolean("dfs.nfs3.enableDump", true); + bufferSize = config.getInt( + CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY, + CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT); + + writeDumpDir = config.get(Nfs3Constant.FILE_DUMP_DIR_KEY, + Nfs3Constant.FILE_DUMP_DIR_DEFAULT); + boolean enableDump = config.getBoolean(Nfs3Constant.ENABLE_FILE_DUMP_KEY, + Nfs3Constant.ENABLE_FILE_DUMP_DEFAULT); if (!enableDump) { writeDumpDir = null; } else { @@ -1112,6 +1117,7 @@ public class RpcProgramNfs3 extends RpcP } } + @Override public SYMLINK3Response symlink(XDR xdr, RpcAuthSys authSys, InetAddress client) { return new SYMLINK3Response(Nfs3Status.NFS3ERR_NOTSUPP); Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java?rev=1518292&r1=1518291&r2=1518292&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java Wed Aug 28 17:23:55 2013 @@ -25,7 +25,9 @@ import java.util.concurrent.ConcurrentMa import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.hdfs.DFSClient; +import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; import org.apache.hadoop.nfs.NfsFileType; import org.apache.hadoop.nfs.nfs3.FileHandle; import org.apache.hadoop.nfs.nfs3.IdUserGroup; @@ -48,6 +50,7 @@ import com.google.common.collect.Maps; public class WriteManager { public static final Log LOG = LogFactory.getLog(WriteManager.class); + private final Configuration config; private final IdUserGroup iug; private final ConcurrentMap<FileHandle, OpenFileCtx> openFileMap = Maps .newConcurrentMap(); @@ -76,6 +79,7 @@ public class WriteManager { WriteManager(IdUserGroup iug, final Configuration config) { this.iug = iug; + this.config = config; streamTimeout = config.getLong("dfs.nfs3.stream.timeout", DEFAULT_STREAM_TIMEOUT); @@ -129,12 +133,41 @@ public class WriteManager { OpenFileCtx openFileCtx = openFileMap.get(fileHandle); if (openFileCtx == null) { LOG.info("No opened stream for fileId:" + fileHandle.getFileId()); - WccData fileWcc = new WccData(Nfs3Utils.getWccAttr(preOpAttr), preOpAttr); - WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO, - fileWcc, count, request.getStableHow(), - Nfs3Constant.WRITE_COMMIT_VERF); - Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid)); - return; + + String fileIdPath = Nfs3Utils.getFileIdPath(fileHandle.getFileId()); + HdfsDataOutputStream fos = null; + Nfs3FileAttributes latestAttr = null; + try { + int bufferSize = config.getInt( + CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY, + CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT); + + fos = dfsClient.append(fileIdPath, bufferSize, null, null); + + latestAttr = Nfs3Utils.getFileAttr(dfsClient, fileIdPath, iug); + } catch (IOException e) { + LOG.error("Can't apapend to file:" + fileIdPath + ", error:" + e); + if (fos != null) { + fos.close(); + } + WccData fileWcc = new WccData(Nfs3Utils.getWccAttr(preOpAttr), + preOpAttr); + WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO, + fileWcc, count, request.getStableHow(), + Nfs3Constant.WRITE_COMMIT_VERF); + Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid)); + return; + } + + // Add open stream + String writeDumpDir = config.get(Nfs3Constant.FILE_DUMP_DIR_KEY, + Nfs3Constant.FILE_DUMP_DIR_DEFAULT); + openFileCtx = new OpenFileCtx(fos, latestAttr, writeDumpDir + "/" + + fileHandle.getFileId()); + addOpenFileStream(fileHandle, openFileCtx); + if (LOG.isDebugEnabled()) { + LOG.debug("opened stream for file:" + fileHandle.getFileId()); + } } // Add write into the async job queue Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1518292&r1=1518291&r2=1518292&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Wed Aug 28 17:23:55 2013 @@ -307,6 +307,9 @@ Release 2.1.1-beta - UNRELEASED HDFS-4947 Add NFS server export table to control export by hostname or IP range (Jing Zhao via brandonli) + HDFS-5078 Support file append in NFSv3 gateway to enable data streaming + to HDFS (brandonli) + IMPROVEMENTS HDFS-4513. Clarify in the WebHDFS REST API that all JSON respsonses may