Author: brandonli
Date: Fri Nov 8 01:41:43 2013
New Revision: 1539897

URL: http://svn.apache.org/r1539897
Log:
HDFS-5364. Merging change r1539891 from branch-2
Added:
    hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtxCache.java
      - copied unchanged from r1539891, hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtxCache.java
    hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestOpenFileCtxCache.java
      - copied unchanged from r1539891, hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestOpenFileCtxCache.java
Modified:
    hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3.java
    hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java
    hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java
    hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java
    hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestMountd.java
    hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestOutOfOrderWrite.java
    hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java
    hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

Modified: hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3.java?rev=1539897&r1=1539896&r2=1539897&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3.java (original)
+++ hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3.java Fri Nov 8 01:41:43 2013
@@ -23,33 +23,47 @@ import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.nfs.mount.Mountd;
+import org.apache.hadoop.mount.MountdBase;
 import org.apache.hadoop.nfs.nfs3.Nfs3Base;
 import org.apache.hadoop.util.StringUtils;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * Nfs server. Supports NFS v3 using {@link RpcProgramNfs3}.
  * Currently Mountd program is also started inside this class.
  * Only TCP server is supported and UDP is not supported.
  */
 public class Nfs3 extends Nfs3Base {
+  private Mountd mountd;
+
   static {
     Configuration.addDefaultResource("hdfs-default.xml");
     Configuration.addDefaultResource("hdfs-site.xml");
   }
 
   public Nfs3(List<String> exports) throws IOException {
-    super(new Mountd(exports), new RpcProgramNfs3());
+    super(new RpcProgramNfs3());
+    mountd = new Mountd(exports);
   }
 
+  @VisibleForTesting
   public Nfs3(List<String> exports, Configuration config) throws IOException {
-    super(new Mountd(exports, config), new RpcProgramNfs3(config), config);
+    super(new RpcProgramNfs3(config), config);
+    mountd = new Mountd(exports, config);
   }
 
+  public Mountd getMountd() {
+    return mountd;
+  }
+
   public static void main(String[] args) throws IOException {
     StringUtils.startupShutdownMessage(Nfs3.class, args, LOG);
     List<String> exports = new ArrayList<String>();
     exports.add("/");
+    final Nfs3 nfsServer = new Nfs3(exports);
+    nfsServer.mountd.start(true); // Start mountd
     nfsServer.start(true);
   }
 }
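The Nfs3.java change above stops passing Mountd through the Nfs3Base constructor: the mount daemon becomes a field that callers start explicitly before the NFS3 program, as the new main() does. A minimal embedding sketch built only from the API visible in this diff (EmbeddedNfsGateway is a hypothetical caller, not part of this commit; the boolean register semantics are assumed from the start(true) calls above):

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.hdfs.nfs.nfs3.Nfs3;

    public class EmbeddedNfsGateway {
      public static void main(String[] args) throws IOException {
        List<String> exports = new ArrayList<String>();
        exports.add("/");                  // export the HDFS root
        Nfs3 nfsServer = new Nfs3(exports);
        nfsServer.getMountd().start(true); // start mountd before the NFS3 program
        nfsServer.start(true);             // then start the TCP NFS3 server
      }
    }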
Modified: hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java?rev=1539897&r1=1539896&r2=1539897&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java (original)
+++ hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java Fri Nov 8 01:41:43 2013
@@ -24,7 +24,6 @@ import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.nio.channels.ClosedChannelException;
-import java.security.InvalidParameterException;
 import java.util.EnumSet;
 import java.util.Iterator;
 import java.util.Map.Entry;
@@ -96,7 +95,7 @@ class OpenFileCtx {
 
   // It's updated after each sync to HDFS
   private Nfs3FileAttributes latestAttr;
-  
+
   private final ConcurrentNavigableMap<OffsetRange, WriteCtx> pendingWrites;
 
   private final ConcurrentNavigableMap<Long, CommitCtx> pendingCommits;
@@ -165,10 +164,22 @@ class OpenFileCtx {
     return System.currentTimeMillis() - lastAccessTime > streamTimeout;
   }
 
+  long getLastAccessTime() {
+    return lastAccessTime;
+  }
+
   public long getNextOffset() {
     return nextOffset.get();
   }
 
+  boolean getActiveState() {
+    return this.activeState;
+  }
+
+  boolean hasPendingWork() {
+    return (pendingWrites.size() != 0 || pendingCommits.size() != 0);
+  }
+
   // Increase or decrease the memory occupation of non-sequential writes
   private long updateNonSequentialWriteInMemory(long count) {
     long newValue = nonSequentialWriteInMemory.addAndGet(count);
@@ -800,19 +811,18 @@ class OpenFileCtx {
    * @return true, remove stream; false, keep stream
    */
   public synchronized boolean streamCleanup(long fileId, long streamTimeout) {
-    if (streamTimeout < WriteManager.MINIMIUM_STREAM_TIMEOUT) {
-      throw new InvalidParameterException("StreamTimeout" + streamTimeout
-          + "ms is less than MINIMIUM_STREAM_TIMEOUT "
-          + WriteManager.MINIMIUM_STREAM_TIMEOUT + "ms");
+    Preconditions
+        .checkState(streamTimeout >= Nfs3Constant.OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT);
+    if (!activeState) {
+      return true;
     }
 
     boolean flag = false;
     // Check the stream timeout
     if (checkStreamTimeout(streamTimeout)) {
       if (LOG.isDebugEnabled()) {
-        LOG.debug("closing stream for fileId:" + fileId);
+        LOG.debug("stream can be closed for fileId:" + fileId);
       }
-      cleanup();
       flag = true;
     }
     return flag;
@@ -985,7 +995,7 @@ class OpenFileCtx {
     FileHandle handle = writeCtx.getHandle();
     if (LOG.isDebugEnabled()) {
       LOG.debug("do write, fileId: " + handle.getFileId() + " offset: "
-          + offset + " length:" + count + " stableHow:" + stableHow.getValue());
+          + offset + " length:" + count + " stableHow:" + stableHow.name());
     }
 
     try {
@@ -1066,7 +1076,7 @@ class OpenFileCtx {
     }
   }
 
-  private synchronized void cleanup() {
+  synchronized void cleanup() {
     if (!activeState) {
       LOG.info("Current OpenFileCtx is already inactive, no need to cleanup.");
       return;
@@ -1074,7 +1084,7 @@ class OpenFileCtx {
     activeState = false;
 
     // stop the dump thread
-    if (dumpThread != null) {
+    if (dumpThread != null && dumpThread.isAlive()) {
       dumpThread.interrupt();
       try {
         dumpThread.join(3000);
@@ -1156,4 +1166,10 @@ class OpenFileCtx {
   void setActiveStatusForTest(boolean activeState) {
     this.activeState = activeState;
   }
+
+  @Override
+  public String toString() {
+    return String.format("activeState: %b asyncStatus: %b nextOffset: %d",
+        activeState, asyncStatus, nextOffset.get());
+  }
 }
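The OpenFileCtx change above splits decision from action: streamCleanup() now only reports whether a stream may be closed (inactive, or idle past streamTimeout) and no longer tears it down, while cleanup() becomes package-private so the new OpenFileCtxCache can invoke it after evicting the entry. An illustrative same-package scan loop under that contract (a sketch only; OpenFileCtxCache is copied in unchanged above and its real internals are not shown in this diff, and the ConcurrentMap backing store here is an assumption):

    import java.util.Map;
    import java.util.concurrent.ConcurrentMap;

    import org.apache.hadoop.nfs.nfs3.FileHandle;

    // Hypothetical helper, not part of this commit; must live in
    // org.apache.hadoop.hdfs.nfs.nfs3 to see the package-private methods.
    class IdleStreamScanner {
      void scanIdleStreams(ConcurrentMap<FileHandle, OpenFileCtx> map,
          long streamTimeout) {
        for (Map.Entry<FileHandle, OpenFileCtx> entry : map.entrySet()) {
          OpenFileCtx ctx = entry.getValue();
          // streamCleanup() only decides; it no longer closes anything itself.
          if (ctx.streamCleanup(entry.getKey().getFileId(), streamTimeout)) {
            map.remove(entry.getKey(), ctx); // evict first so no new writes attach
            ctx.cleanup();                   // then do the actual close/cleanup
          }
        }
      }
    }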
Modified: hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java?rev=1539897&r1=1539896&r2=1539897&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java (original)
+++ hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java Fri Nov 8 01:41:43 2013
@@ -214,6 +214,11 @@ public class RpcProgramNfs3 extends RpcP
     }
   }
 
+  @Override
+  public void startDaemons() {
+    writeManager.startAsyncDataSerivce();
+  }
+
   /******************************************************
    * RPC call handlers
    ******************************************************/
@@ -778,7 +783,8 @@ public class RpcProgramNfs3 extends RpcP
 
     int createMode = request.getMode();
     if ((createMode != Nfs3Constant.CREATE_EXCLUSIVE)
-        && request.getObjAttr().getUpdateFields().contains(SetAttrField.SIZE)) {
+        && request.getObjAttr().getUpdateFields().contains(SetAttrField.SIZE)
+        && request.getObjAttr().getSize() != 0) {
       LOG.error("Setting file size is not supported when creating file: "
           + fileName + " dir fileId:" + dirHandle.getFileId());
       return new CREATE3Response(Nfs3Status.NFS3ERR_INVAL);
@@ -831,6 +837,23 @@ public class RpcProgramNfs3 extends RpcP
       postOpObjAttr = Nfs3Utils.getFileAttr(dfsClient, fileIdPath, iug);
       dirWcc = Nfs3Utils.createWccData(Nfs3Utils.getWccAttr(preOpDirAttr),
           dfsClient, dirFileIdPath, iug);
+
+      // Add open stream
+      OpenFileCtx openFileCtx = new OpenFileCtx(fos, postOpObjAttr,
+          writeDumpDir + "/" + postOpObjAttr.getFileId(), dfsClient, iug);
+      fileHandle = new FileHandle(postOpObjAttr.getFileId());
+      if (!writeManager.addOpenFileStream(fileHandle, openFileCtx)) {
+        LOG.warn("Can't add more stream, close it."
+            + " Future write will become append");
+        fos.close();
+        fos = null;
+      } else {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Opened stream for file:" + fileName + ", fileId:"
+              + fileHandle.getFileId());
+        }
+      }
+
     } catch (IOException e) {
       LOG.error("Exception", e);
       if (fos != null) {
@@ -859,16 +882,6 @@ public class RpcProgramNfs3 extends RpcP
       }
     }
 
-    // Add open stream
-    OpenFileCtx openFileCtx = new OpenFileCtx(fos, postOpObjAttr, writeDumpDir
-        + "/" + postOpObjAttr.getFileId(), dfsClient, iug);
-    fileHandle = new FileHandle(postOpObjAttr.getFileId());
-    writeManager.addOpenFileStream(fileHandle, openFileCtx);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("open stream for file:" + fileName + ", fileId:"
-          + fileHandle.getFileId());
-    }
-
     return new CREATE3Response(Nfs3Status.NFS3_OK, fileHandle, postOpObjAttr,
         dirWcc);
   }
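Two behavioral changes in RpcProgramNfs3.java above are worth noting: CREATE now tolerates a requested size of 0 (the common truncate-on-create case) instead of rejecting any SIZE attribute, and the new stream is registered with the write manager inside the try block, with the stream closed when the cache refuses it so that a later WRITE reopens the file as an append. The relaxed validation, condensed into a sketch (rejectCreate is a hypothetical helper; SetAttr3 as the attribute type of getObjAttr() is assumed from the surrounding codebase):

    import org.apache.hadoop.nfs.nfs3.Nfs3Constant;
    import org.apache.hadoop.nfs.nfs3.request.SetAttr3;
    import org.apache.hadoop.nfs.nfs3.request.SetAttr3.SetAttrField;

    class CreateValidation {
      // Returns true when CREATE must fail with NFS3ERR_INVAL.
      static boolean rejectCreate(int createMode, SetAttr3 objAttr) {
        return createMode != Nfs3Constant.CREATE_EXCLUSIVE
            && objAttr.getUpdateFields().contains(SetAttrField.SIZE)
            && objAttr.getSize() != 0; // new: size == 0 is now allowed
      }
    }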
Modified: hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java?rev=1539897&r1=1539896&r2=1539897&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java (original)
+++ hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java Fri Nov 8 01:41:43 2013
@@ -18,8 +18,6 @@ package org.apache.hadoop.hdfs.nfs.nfs3;
 
 import java.io.IOException;
-import java.util.Iterator;
-import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentMap;
 
 import org.apache.commons.logging.Log;
@@ -29,11 +27,12 @@ import org.apache.hadoop.fs.CommonConfig
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.apache.hadoop.hdfs.nfs.nfs3.OpenFileCtx.COMMIT_STATUS;
+import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
+import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.nfs.NfsFileType;
 import org.apache.hadoop.nfs.nfs3.FileHandle;
 import org.apache.hadoop.nfs.nfs3.IdUserGroup;
 import org.apache.hadoop.nfs.nfs3.Nfs3Constant;
-import org.apache.hadoop.nfs.nfs3.Nfs3Constant.WriteStableHow;
 import org.apache.hadoop.nfs.nfs3.Nfs3FileAttributes;
 import org.apache.hadoop.nfs.nfs3.Nfs3Status;
 import org.apache.hadoop.nfs.nfs3.request.WRITE3Request;
@@ -56,69 +55,70 @@ public class WriteManager {
 
   private final Configuration config;
   private final IdUserGroup iug;
-  private final ConcurrentMap<FileHandle, OpenFileCtx> openFileMap = Maps
-      .newConcurrentMap();
-  
+
   private AsyncDataService asyncDataService;
   private boolean asyncDataServiceStarted = false;
 
-  private final StreamMonitor streamMonitor;
-  
+  private final int maxStreams;
+
   /**
    * The time limit to wait for accumulate reordered sequential writes to the
    * same file before the write is considered done.
    */
   private long streamTimeout;
-  
-  public static final long DEFAULT_STREAM_TIMEOUT = 10 * 60 * 1000; //10 minutes
-  public static final long MINIMIUM_STREAM_TIMEOUT = 10 * 1000; //10 seconds
-  
-  void addOpenFileStream(FileHandle h, OpenFileCtx ctx) {
-    openFileMap.put(h, ctx);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("After add the new stream " + h.getFileId()
-          + ", the stream number:" + openFileMap.size());
+
+  private final OpenFileCtxCache fileContextCache;
+
+  static public class MultipleCachedStreamException extends IOException {
+    private static final long serialVersionUID = 1L;
+
+    public MultipleCachedStreamException(String msg) {
+      super(msg);
     }
   }
 
+  boolean addOpenFileStream(FileHandle h, OpenFileCtx ctx) {
+    return fileContextCache.put(h, ctx);
+  }
+
   WriteManager(IdUserGroup iug, final Configuration config) {
     this.iug = iug;
     this.config = config;
-    
-    streamTimeout = config.getLong("dfs.nfs3.stream.timeout",
-        DEFAULT_STREAM_TIMEOUT);
+    streamTimeout = config.getLong(Nfs3Constant.OUTPUT_STREAM_TIMEOUT,
+        Nfs3Constant.OUTPUT_STREAM_TIMEOUT_DEFAULT);
     LOG.info("Stream timeout is " + streamTimeout + "ms.");
-    if (streamTimeout < MINIMIUM_STREAM_TIMEOUT) {
+    if (streamTimeout < Nfs3Constant.OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT) {
       LOG.info("Reset stream timeout to minimum value "
-          + MINIMIUM_STREAM_TIMEOUT + "ms.");
-      streamTimeout = MINIMIUM_STREAM_TIMEOUT;
+          + Nfs3Constant.OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT + "ms.");
+      streamTimeout = Nfs3Constant.OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT;
     }
-    
-    this.streamMonitor = new StreamMonitor();
+    maxStreams = config.getInt(Nfs3Constant.MAX_OPEN_FILES,
+        Nfs3Constant.MAX_OPEN_FILES_DEFAULT);
+    LOG.info("Maximum open streams is "+ maxStreams);
+    this.fileContextCache = new OpenFileCtxCache(config, streamTimeout);
   }
 
-  private void startAsyncDataSerivce() {
-    streamMonitor.start();
+  void startAsyncDataSerivce() {
+    if (asyncDataServiceStarted) {
+      return;
+    }
+    fileContextCache.start();
     this.asyncDataService = new AsyncDataService();
     asyncDataServiceStarted = true;
   }
 
-  private void shutdownAsyncDataService() {
-    asyncDataService.shutdown();
+  void shutdownAsyncDataService() {
+    if (!asyncDataServiceStarted) {
+      return;
+    }
     asyncDataServiceStarted = false;
-    streamMonitor.interrupt();
+    asyncDataService.shutdown();
+    fileContextCache.shutdown();
   }
 
   void handleWrite(DFSClient dfsClient, WRITE3Request request, Channel channel,
       int xid, Nfs3FileAttributes preOpAttr) throws IOException {
-    // First write request starts the async data service
-    if (!asyncDataServiceStarted) {
-      startAsyncDataSerivce();
-    }
-
-    long offset = request.getOffset();
     int count = request.getCount();
-    WriteStableHow stableHow = request.getStableHow();
     byte[] data = request.getData().array();
     if (data.length < count) {
       WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_INVAL);
@@ -129,13 +129,12 @@ public class WriteManager {
 
     FileHandle handle = request.getHandle();
     if (LOG.isDebugEnabled()) {
-      LOG.debug("handleWrite fileId: " + handle.getFileId() + " offset: "
-          + offset + " length:" + count + " stableHow:" + stableHow.getValue());
+      LOG.debug("handleWrite " + request);
     }
 
     // Check if there is a stream to write
     FileHandle fileHandle = request.getHandle();
-    OpenFileCtx openFileCtx = openFileMap.get(fileHandle);
+    OpenFileCtx openFileCtx = fileContextCache.get(fileHandle);
     if (openFileCtx == null) {
       LOG.info("No opened stream for fileId:" + fileHandle.getFileId());
 
@@ -150,6 +149,15 @@ public class WriteManager {
         fos = dfsClient.append(fileIdPath, bufferSize, null, null);
 
         latestAttr = Nfs3Utils.getFileAttr(dfsClient, fileIdPath, iug);
+      } catch (RemoteException e) {
+        IOException io = e.unwrapRemoteException();
+        if (io instanceof AlreadyBeingCreatedException) {
+          LOG.warn("Can't append file:" + fileIdPath
+              + ". Possibly the file is being closed. Drop the request:"
+              + request + ", wait for the client to retry...");
+          return;
+        }
+        throw e;
       } catch (IOException e) {
         LOG.error("Can't apapend to file:" + fileIdPath + ", error:" + e);
         if (fos != null) {
@@ -170,9 +178,26 @@ public class WriteManager {
           Nfs3Constant.FILE_DUMP_DIR_DEFAULT);
       openFileCtx = new OpenFileCtx(fos, latestAttr, writeDumpDir + "/"
          + fileHandle.getFileId(), dfsClient, iug);
-      addOpenFileStream(fileHandle, openFileCtx);
+
+      if (!addOpenFileStream(fileHandle, openFileCtx)) {
+        LOG.info("Can't add new stream. Close it. Tell client to retry.");
+        try {
+          fos.close();
+        } catch (IOException e) {
+          LOG.error("Can't close stream for fileId:" + handle.getFileId());
+        }
+        // Notify client to retry
+        WccData fileWcc = new WccData(latestAttr.getWccAttr(), latestAttr);
+        WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_JUKEBOX,
+            fileWcc, 0, request.getStableHow(), Nfs3Constant.WRITE_COMMIT_VERF);
+        Nfs3Utils.writeChannel(channel,
+            response.writeHeaderAndResponse(new XDR(), xid, new VerifierNone()),
+            xid);
+        return;
+      }
+
       if (LOG.isDebugEnabled()) {
-        LOG.debug("opened stream for file:" + fileHandle.getFileId());
+        LOG.debug("Opened stream for appending file:" + fileHandle.getFileId());
       }
     }
 
@@ -185,7 +210,7 @@ public class WriteManager {
   void handleCommit(DFSClient dfsClient, FileHandle fileHandle,
       long commitOffset, Channel channel, int xid, Nfs3FileAttributes preOpAttr) {
     int status;
-    OpenFileCtx openFileCtx = openFileMap.get(fileHandle);
+    OpenFileCtx openFileCtx = fileContextCache.get(fileHandle);
 
     if (openFileCtx == null) {
       LOG.info("No opened stream for fileId:" + fileHandle.getFileId()
@@ -238,7 +263,7 @@ public class WriteManager {
     String fileIdPath = Nfs3Utils.getFileIdPath(fileHandle);
     Nfs3FileAttributes attr = Nfs3Utils.getFileAttr(client, fileIdPath, iug);
     if (attr != null) {
-      OpenFileCtx openFileCtx = openFileMap.get(fileHandle);
+      OpenFileCtx openFileCtx = fileContextCache.get(fileHandle);
       if (openFileCtx != null) {
         attr.setSize(openFileCtx.getNextOffset());
         attr.setUsed(openFileCtx.getNextOffset());
@@ -253,8 +278,8 @@ public class WriteManager {
     Nfs3FileAttributes attr = Nfs3Utils.getFileAttr(client, fileIdPath, iug);
     if ((attr != null) && (attr.getType() == NfsFileType.NFSREG.toValue())) {
-      OpenFileCtx openFileCtx = openFileMap
-          .get(new FileHandle(attr.getFileId()));
+      OpenFileCtx openFileCtx = fileContextCache.get(new FileHandle(attr
+          .getFileId()));
 
       if (openFileCtx != null) {
         attr.setSize(openFileCtx.getNextOffset());
@@ -263,56 +288,9 @@ public class WriteManager {
     }
     return attr;
   }
-  
-  @VisibleForTesting
-  ConcurrentMap<FileHandle, OpenFileCtx> getOpenFileMap() {
-    return this.openFileMap;
-  }
-
-  /**
-   * StreamMonitor wakes up periodically to find and closes idle streams.
-   */
-  class StreamMonitor extends Daemon {
-    private int rotation = 5 * 1000; // 5 seconds
-    private long lastWakeupTime = 0;
-
-    @Override
-    public void run() {
-      while (true) {
-        Iterator<Entry<FileHandle, OpenFileCtx>> it = openFileMap.entrySet()
-            .iterator();
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("openFileMap size:" + openFileMap.size());
-        }
-        while (it.hasNext()) {
-          Entry<FileHandle, OpenFileCtx> pairs = it.next();
-          OpenFileCtx ctx = pairs.getValue();
-          if (ctx.streamCleanup((pairs.getKey()).getFileId(), streamTimeout)) {
-            it.remove();
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("After remove stream " + pairs.getKey().getFileId()
-                  + ", the stream number:" + openFileMap.size());
-            }
-          }
-        }
-
-        // Check if it can sleep
-        try {
-          long workedTime = System.currentTimeMillis() - lastWakeupTime;
-          if (workedTime < rotation) {
-            if (LOG.isTraceEnabled()) {
-              LOG.trace("StreamMonitor can still have a sleep:"
-                  + ((rotation - workedTime) / 1000));
-            }
-            Thread.sleep(rotation - workedTime);
-          }
-          lastWakeupTime = System.currentTimeMillis();
-        } catch (InterruptedException e) {
-          LOG.info("StreamMonitor got interrupted");
-          return;
-        }
-      }
-    }
+  @VisibleForTesting
+  OpenFileCtxCache getOpenFileCtxCache() {
+    return this.fileContextCache;
   }
 }
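With the WriteManager diff above, the per-manager openFileMap and StreamMonitor daemon give way to the shared OpenFileCtxCache, and a full cache is now surfaced to the NFS client as NFS3ERR_JUKEBOX, which tells the client to back off and retry instead of the write being silently dropped. A condensed sketch of the new cache wiring (CacheWiring is a hypothetical helper mirroring the constructor above; the constants' string keys and default values live in Nfs3Constant and are not reproduced in this diff; the maxStreams bound from Nfs3Constant.MAX_OPEN_FILES is read separately in the constructor above):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.nfs.nfs3.Nfs3Constant;

    class CacheWiring {
      static OpenFileCtxCache newCache(Configuration conf) {
        long streamTimeout = conf.getLong(Nfs3Constant.OUTPUT_STREAM_TIMEOUT,
            Nfs3Constant.OUTPUT_STREAM_TIMEOUT_DEFAULT);
        if (streamTimeout < Nfs3Constant.OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT) {
          // Clamp to the minimum, as the WriteManager constructor does.
          streamTimeout = Nfs3Constant.OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT;
        }
        return new OpenFileCtxCache(conf, streamTimeout);
      }
    }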
Modified: hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestMountd.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestMountd.java?rev=1539897&r1=1539896&r2=1539897&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestMountd.java (original)
+++ hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestMountd.java Fri Nov 8 01:41:43 2013
@@ -51,7 +51,7 @@ public class TestMountd {
     Nfs3 nfs3 = new Nfs3(exports, config);
     nfs3.start(false);
 
-    RpcProgramMountd mountd = (RpcProgramMountd) nfs3.getMountBase()
+    RpcProgramMountd mountd = (RpcProgramMountd) nfs3.getMountd()
         .getRpcProgram();
     mountd.nullOp(new XDR(), 1234, InetAddress.getByName("localhost"));
 
Modified: hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestOutOfOrderWrite.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestOutOfOrderWrite.java?rev=1539897&r1=1539896&r2=1539897&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestOutOfOrderWrite.java (original)
+++ hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestOutOfOrderWrite.java Fri Nov 8 01:41:43 2013
@@ -135,6 +135,7 @@ public class TestOutOfOrderWrite {
   @Override
   protected ChannelPipelineFactory setPipelineFactory() {
     this.pipelineFactory = new ChannelPipelineFactory() {
+      @Override
       public ChannelPipeline getPipeline() {
         return Channels.pipeline(
             RpcUtil.constructRpcFrameDecoder(),
 
Modified: hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java?rev=1539897&r1=1539896&r2=1539897&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java (original)
+++ hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java Fri Nov 8 01:41:43 2013
@@ -186,9 +186,8 @@ public class TestWrites {
   private void waitWrite(RpcProgramNfs3 nfsd, FileHandle handle,
       int maxWaitTime) throws InterruptedException {
     int waitedTime = 0;
-    ConcurrentMap<FileHandle, OpenFileCtx> openFileMap = nfsd.getWriteManager()
-        .getOpenFileMap();
-    OpenFileCtx ctx = openFileMap.get(handle);
+    OpenFileCtx ctx = nfsd.getWriteManager()
+        .getOpenFileCtxCache().get(handle);
     assertTrue(ctx != null);
     do {
       Thread.sleep(3000);
 
Modified: hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1539897&r1=1539896&r2=1539897&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Fri Nov 8 01:41:43 2013
@@ -93,6 +93,8 @@ Release 2.2.1 - UNRELEASED
 
     HDFS-5252. Stable write is not handled correctly in someplace. (brandonli)
 
+    HDFS-5364. Add OpenFileCtx cache. (brandonli)
+
 Release 2.2.0 - 2013-10-13
 
   INCOMPATIBLE CHANGES