Author: jing9
Date: Mon Sep 23 20:02:38 2013
New Revision: 1525681

URL: http://svn.apache.org/r1525681
Log: HDFS-4971. Move IO operations out of locking in OpenFileCtx. Contributed by Jing Zhao and Brandon Li.
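At its core, this change swaps OpenFileCtx's coarse ReentrantLock for concurrent data structures: pending writes move into a ConcurrentSkipListMap ordered by a reverse comparator on the range start, and the write-back cursor becomes an AtomicLong. The following is a minimal, self-contained sketch of that pattern; the class and method names are illustrative only, not the actual Hadoop code:

import java.util.Comparator;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicLong;

public class PendingWriteSketch {
  /** Range [min, max), sorted in reverse order of min (then max). */
  static final Comparator<long[]> REVERSE_ON_MIN = new Comparator<long[]>() {
    @Override
    public int compare(long[] r1, long[] r2) {
      if (r1[0] == r2[0]) {
        return r1[1] < r2[1] ? 1 : (r1[1] > r2[1] ? -1 : 0);
      }
      return r1[0] < r2[0] ? 1 : -1;
    }
  };

  // Reverse ordering means lastEntry() is the range with the smallest offset,
  // i.e. the next candidate to write back to HDFS, while iteration starts at
  // the largest offset, the preferred order for dumping to disk.
  private final ConcurrentNavigableMap<long[], byte[]> pendingWrites =
      new ConcurrentSkipListMap<long[], byte[]>(REVERSE_ON_MIN);
  private final AtomicLong nextOffset = new AtomicLong(0);

  void add(long offset, byte[] data) {
    pendingWrites.put(new long[] { offset, offset + data.length }, data);
  }

  /** @return the next sequential write, or null if it has not arrived yet */
  byte[] pollSequential() {
    Entry<long[], byte[]> last = pendingWrites.lastEntry();
    if (last == null || last.getKey()[0] > nextOffset.get()) {
      return null; // empty, or there is still a hole before the next range
    }
    pendingWrites.remove(last.getKey());
    nextOffset.addAndGet(last.getValue().length);
    return last.getValue();
  }
}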
Modified:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/AsyncDataService.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OffsetRange.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestOffsetRange.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/AsyncDataService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/AsyncDataService.java?rev=1525681&r1=1525680&r2=1525681&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/AsyncDataService.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/AsyncDataService.java Mon Sep 23 20:02:38 2013
@@ -97,7 +97,7 @@ public class AsyncDataService {
   void writeAsync(OpenFileCtx openFileCtx) {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Scheduling write back task for fileId: "
-          + openFileCtx.copyLatestAttr().getFileId());
+          + openFileCtx.getLatestAttr().getFileId());
     }
     WriteBackTask wbTask = new WriteBackTask(openFileCtx);
     execute(wbTask);
@@ -125,7 +125,7 @@ public class AsyncDataService {
     public String toString() {
       // Called in AsyncDataService.execute for displaying error messages.
       return "write back data for fileId"
-          + openFileCtx.copyLatestAttr().getFileId() + " with nextOffset "
+          + openFileCtx.getLatestAttr().getFileId() + " with nextOffset "
           + openFileCtx.getNextOffset();
     }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OffsetRange.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OffsetRange.java?rev=1525681&r1=1525680&r2=1525681&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OffsetRange.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OffsetRange.java Mon Sep 23 20:02:38 2013
@@ -17,19 +17,34 @@
  */
 package org.apache.hadoop.hdfs.nfs.nfs3;
 
+import java.util.Comparator;
+
+import com.google.common.base.Preconditions;
+
 /**
  * OffsetRange is the range of read/write request. A single point (e.g.,[5,5])
  * is not a valid range.
  */
-public class OffsetRange implements Comparable<OffsetRange> {
+public class OffsetRange {
+
+  public static final Comparator<OffsetRange> ReverseComparatorOnMin =
+      new Comparator<OffsetRange>() {
+    @Override
+    public int compare(OffsetRange o1, OffsetRange o2) {
+      if (o1.getMin() == o2.getMin()) {
+        return o1.getMax() < o2.getMax() ?
+            1 : (o1.getMax() > o2.getMax() ? -1 : 0);
+      } else {
+        return o1.getMin() < o2.getMin() ? 1 : -1;
+      }
+    }
+  };
+
   private final long min;
   private final long max;
 
   OffsetRange(long min, long max) {
-    if ((min >= max) || (min < 0) || (max < 0)) {
-      throw new IllegalArgumentException("Wrong offset range: (" + min + ","
-          + max + ")");
-    }
+    Preconditions.checkArgument(min >= 0 && max >= 0 && min < max);
    this.min = min;
    this.max = max;
  }
@@ -49,24 +64,10 @@ public class OffsetRange implements Comp
 
   @Override
   public boolean equals(Object o) {
-    assert (o instanceof OffsetRange);
-    OffsetRange range = (OffsetRange) o;
-    return (min == range.getMin()) && (max == range.getMax());
-  }
-
-  private static int compareTo(long left, long right) {
-    if (left < right) {
-      return -1;
-    } else if (left > right) {
-      return 1;
-    } else {
-      return 0;
+    if (o instanceof OffsetRange) {
+      OffsetRange range = (OffsetRange) o;
+      return (min == range.getMin()) && (max == range.getMax());
     }
-  }
-
-  @Override
-  public int compareTo(OffsetRange other) {
-    final int d = compareTo(min, other.getMin());
-    return d != 0 ? d : compareTo(max, other.getMax());
+    return false;
   }
 }
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java?rev=1525681&r1=1525680&r2=1525681&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java Mon Sep 23 20:02:38 2013
@@ -22,12 +22,14 @@ import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.RandomAccessFile;
+import java.nio.channels.ClosedChannelException;
 import java.security.InvalidParameterException;
 import java.util.EnumSet;
 import java.util.Iterator;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -50,8 +52,11 @@ import org.apache.hadoop.nfs.nfs3.respon
 import org.apache.hadoop.nfs.nfs3.response.WccData;
 import org.apache.hadoop.oncrpc.XDR;
 import org.apache.hadoop.oncrpc.security.VerifierNone;
+import org.apache.hadoop.util.Daemon;
 import org.jboss.netty.channel.Channel;
 
+import com.google.common.base.Preconditions;
+
 /**
  * OpenFileCtx saves the context of one HDFS file output stream. Access to it is
  * synchronized by its member lock.
@@ -59,34 +64,42 @@ import org.jboss.netty.channel.Channel;
 class OpenFileCtx {
   public static final Log LOG = LogFactory.getLog(OpenFileCtx.class);
 
-  /**
-   * Lock to synchronize OpenFileCtx changes. Thread should get this lock before
-   * any read/write operation to an OpenFileCtx object
-   */
-  private final ReentrantLock ctxLock;
+  // Pending writes water mark for dump, 1MB
+  private static long DUMP_WRITE_WATER_MARK = 1024 * 1024;
+
+  public final static int COMMIT_FINISHED = 0;
+  public final static int COMMIT_WAIT = 1;
+  public final static int COMMIT_INACTIVE_CTX = 2;
+  public final static int COMMIT_INACTIVE_WITH_PENDING_WRITE = 3;
+  public final static int COMMIT_ERROR = 4;
 
   // The stream status. False means the stream is closed.
-  private boolean activeState;
+  private volatile boolean activeState;
   // The stream write-back status. True means one thread is doing write back.
-  private boolean asyncStatus;
+  private volatile boolean asyncStatus;
 
+  /**
+   * The current offset of the file in HDFS. All the content before this offset
+   * has been written back to HDFS.
+   */
+  private AtomicLong nextOffset;
   private final HdfsDataOutputStream fos;
+
+  // TODO: make it mutable and update it after each writing back to HDFS
   private final Nfs3FileAttributes latestAttr;
-  private long nextOffset;
 
-  private final SortedMap<OffsetRange, WriteCtx> pendingWrites;
+  private final ConcurrentNavigableMap<OffsetRange, WriteCtx> pendingWrites;
 
   // The last write, commit request or write-back event. Updating time to keep
   // output steam alive.
   private long lastAccessTime;
 
-  // Pending writes water mark for dump, 1MB
-  private static int DUMP_WRITE_WATER_MARK = 1024 * 1024;
+  private volatile boolean enabledDump;
   private FileOutputStream dumpOut;
-  private long nonSequentialWriteInMemory;
-  private boolean enabledDump;
+  private AtomicLong nonSequentialWriteInMemory;
   private RandomAccessFile raf;
   private final String dumpFilePath;
+  private Daemon dumpThread;
 
   private void updateLastAccessTime() {
     lastAccessTime = System.currentTimeMillis();
@@ -96,89 +109,50 @@ class OpenFileCtx {
     return System.currentTimeMillis() - lastAccessTime > streamTimeout;
   }
 
+  public long getNextOffset() {
+    return nextOffset.get();
+  }
+
   // Increase or decrease the memory occupation of non-sequential writes
   private long updateNonSequentialWriteInMemory(long count) {
-    nonSequentialWriteInMemory += count;
+    long newValue = nonSequentialWriteInMemory.addAndGet(count);
     if (LOG.isDebugEnabled()) {
       LOG.debug("Update nonSequentialWriteInMemory by " + count + " new value:"
-          + nonSequentialWriteInMemory);
+          + newValue);
     }
 
-    if (nonSequentialWriteInMemory < 0) {
-      LOG.error("nonSequentialWriteInMemory is negative after update with count "
-          + count);
-      throw new IllegalArgumentException(
-          "nonSequentialWriteInMemory is negative after update with count "
-          + count);
-    }
-    return nonSequentialWriteInMemory;
+    Preconditions.checkState(newValue >= 0,
+        "nonSequentialWriteInMemory is negative after update with count "
+            + count);
+    return newValue;
   }
 
   OpenFileCtx(HdfsDataOutputStream fos, Nfs3FileAttributes latestAttr,
       String dumpFilePath) {
     this.fos = fos;
     this.latestAttr = latestAttr;
-    pendingWrites = new TreeMap<OffsetRange, WriteCtx>();
+    // We use the ReverseComparatorOnMin as the comparator of the map. In this
+    // way, we first dump the data with larger offset. In the meanwhile, we
+    // retrieve the last element to write back to HDFS.
+    pendingWrites = new ConcurrentSkipListMap<OffsetRange, WriteCtx>(
+        OffsetRange.ReverseComparatorOnMin);
     updateLastAccessTime();
     activeState = true;
     asyncStatus = false;
     dumpOut = null;
     raf = null;
-    nonSequentialWriteInMemory = 0;
+    nonSequentialWriteInMemory = new AtomicLong(0);
+
+    this.dumpFilePath = dumpFilePath;
     enabledDump = dumpFilePath == null ? false: true;
-    nextOffset = latestAttr.getSize();
-    assert(nextOffset == this.fos.getPos());
-
-    ctxLock = new ReentrantLock(true);
+    nextOffset = new AtomicLong();
+    nextOffset.set(latestAttr.getSize());
+    assert(nextOffset.get() == this.fos.getPos());
+    dumpThread = null;
   }
 
-  private void lockCtx() {
-    if (LOG.isTraceEnabled()) {
-      StackTraceElement[] stacktrace = Thread.currentThread().getStackTrace();
-      StackTraceElement e = stacktrace[2];
-      String methodName = e.getMethodName();
-      LOG.trace("lock ctx, caller:" + methodName);
-    }
-    ctxLock.lock();
-  }
-
-  private void unlockCtx() {
-    ctxLock.unlock();
-    if (LOG.isTraceEnabled()) {
-      StackTraceElement[] stacktrace = Thread.currentThread().getStackTrace();
-      StackTraceElement e = stacktrace[2];
-      String methodName = e.getMethodName();
-      LOG.info("unlock ctx, caller:" + methodName);
-    }
-  }
-
-  // Make a copy of the latestAttr
-  public Nfs3FileAttributes copyLatestAttr() {
-    Nfs3FileAttributes ret;
-    lockCtx();
-    try {
-      ret = new Nfs3FileAttributes(latestAttr);
-    } finally {
-      unlockCtx();
-    }
-    return ret;
-  }
-
-  private long getNextOffsetUnprotected() {
-    assert(ctxLock.isLocked());
-    return nextOffset;
-  }
-
-  public long getNextOffset() {
-    long ret;
-    lockCtx();
-    try {
-      ret = getNextOffsetUnprotected();
-    } finally {
-      unlockCtx();
-    }
-    return ret;
+  public Nfs3FileAttributes getLatestAttr() {
+    return latestAttr;
   }
 
   // Get flushed offset. Note that flushed data may not be persisted.
@@ -187,12 +161,7 @@ class OpenFileCtx {
   }
 
   // Check if need to dump the new writes
-  private void checkDump(long count) {
-    assert (ctxLock.isLocked());
-
-    // Always update the in memory count
-    updateNonSequentialWriteInMemory(count);
-
+  private void checkDump() {
     if (!enabledDump) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Do nothing, dump is disabled.");
@@ -200,66 +169,111 @@ class OpenFileCtx {
       return;
     }
 
-    if (nonSequentialWriteInMemory < DUMP_WRITE_WATER_MARK) {
+    if (nonSequentialWriteInMemory.get() < DUMP_WRITE_WATER_MARK) {
       return;
     }
 
-    // Create dump outputstream for the first time
-    if (dumpOut == null) {
-      LOG.info("Create dump file:" + dumpFilePath);
-      File dumpFile = new File(dumpFilePath);
-      try {
-        if (dumpFile.exists()) {
-          LOG.fatal("The dump file should not exist:" + dumpFilePath);
-          throw new RuntimeException("The dump file should not exist:"
-              + dumpFilePath);
+    // wake up the dumper thread to dump the data
+    synchronized (this) {
+      if (nonSequentialWriteInMemory.get() >= DUMP_WRITE_WATER_MARK) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Asking dumper to dump...");
+        }
+        if (dumpThread == null) {
+          dumpThread = new Daemon(new Dumper());
+          dumpThread.start();
+        } else {
+          this.notifyAll();
         }
-        dumpOut = new FileOutputStream(dumpFile);
-      } catch (IOException e) {
-        LOG.error("Got failure when creating dump stream " + dumpFilePath
-            + " with error:" + e);
-        enabledDump = false;
-        IOUtils.cleanup(LOG, dumpOut);
-        return;
       }
     }
-    // Get raf for the first dump
-    if (raf == null) {
-      try {
-        raf = new RandomAccessFile(dumpFilePath, "r");
-      } catch (FileNotFoundException e) {
-        LOG.error("Can't get random access to file " + dumpFilePath);
-        // Disable dump
-        enabledDump = false;
-        return;
+  }
+
+  class Dumper implements Runnable {
+    /** Dump data into a file */
+    private void dump() {
+      // Create dump outputstream for the first time
+      if (dumpOut == null) {
+        LOG.info("Create dump file:" + dumpFilePath);
+        File dumpFile = new File(dumpFilePath);
+        try {
+          synchronized (this) {
+            // check if alive again
+            Preconditions.checkState(dumpFile.createNewFile(),
+                "The dump file should not exist: %s", dumpFilePath);
+            dumpOut = new FileOutputStream(dumpFile);
+          }
+        } catch (IOException e) {
+          LOG.error("Got failure when creating dump stream " + dumpFilePath, e);
+          enabledDump = false;
+          if (dumpOut != null) {
+            try {
+              dumpOut.close();
+            } catch (IOException e1) {
+              LOG.error("Can't close dump stream " + dumpFilePath, e);
+            }
+          }
+          return;
+        }
       }
-    }
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Start dump, current write number:" + pendingWrites.size());
-    }
-    Iterator<OffsetRange> it = pendingWrites.keySet().iterator();
-    while (it.hasNext()) {
-      OffsetRange key = it.next();
-      WriteCtx writeCtx = pendingWrites.get(key);
-      try {
-        long dumpedDataSize = writeCtx.dumpData(dumpOut, raf);
-        if (dumpedDataSize > 0) {
-          updateNonSequentialWriteInMemory(-dumpedDataSize);
+
+      // Get raf for the first dump
+      if (raf == null) {
+        try {
+          raf = new RandomAccessFile(dumpFilePath, "r");
+        } catch (FileNotFoundException e) {
+          LOG.error("Can't get random access to file " + dumpFilePath);
+          // Disable dump
+          enabledDump = false;
+          return;
         }
-      } catch (IOException e) {
-        LOG.error("Dump data failed:" + writeCtx + " with error:" + e);
-        // Disable dump
-        enabledDump = false;
-        return;
+      }
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Start dump. Before dump, nonSequentialWriteInMemory == "
+            + nonSequentialWriteInMemory.get());
+      }
+
+      Iterator<OffsetRange> it = pendingWrites.keySet().iterator();
+      while (activeState && it.hasNext()
+          && nonSequentialWriteInMemory.get() > 0) {
+        OffsetRange key = it.next();
+        WriteCtx writeCtx = pendingWrites.get(key);
+        try {
+          long dumpedDataSize = writeCtx.dumpData(dumpOut, raf);
+          if (dumpedDataSize > 0) {
+            updateNonSequentialWriteInMemory(-dumpedDataSize);
+          }
+        } catch (IOException e) {
+          LOG.error("Dump data failed:" + writeCtx + " with error:" + e
+              + " OpenFileCtx state:" + activeState);
+          // Disable dump
          enabledDump = false;
+          return;
+        }
+      }
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("After dump, nonSequentialWriteInMemory == "
+            + nonSequentialWriteInMemory.get());
       }
     }
-    if (nonSequentialWriteInMemory != 0) {
-      LOG.fatal("After dump, nonSequentialWriteInMemory is not zero: "
-          + nonSequentialWriteInMemory);
-      throw new RuntimeException(
-          "After dump, nonSequentialWriteInMemory is not zero: "
-          + nonSequentialWriteInMemory);
+
+    @Override
+    public void run() {
+      while (activeState && enabledDump) {
+        if (nonSequentialWriteInMemory.get() >= DUMP_WRITE_WATER_MARK) {
+          dump();
+        }
+        synchronized (OpenFileCtx.this) {
+          if (nonSequentialWriteInMemory.get() < DUMP_WRITE_WATER_MARK) {
+            try {
+              OpenFileCtx.this.wait();
+            } catch (InterruptedException e) {
+            }
+          }
+        }
+      }
    }
  }
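The Dumper daemon above replaces the old inline dumping that ran while holding the context lock. Its protocol — an AtomicLong byte counter, a 1MB water mark, and wait()/notifyAll() on the owner's monitor — can be isolated as follows (a minimal sketch with illustrative names, not the actual Hadoop code; real file I/O and error handling omitted):

import java.util.concurrent.atomic.AtomicLong;

public class WaterMarkDumper implements Runnable {
  private static final long WATER_MARK = 1024 * 1024; // 1MB
  private final AtomicLong inMemory = new AtomicLong(0);
  private volatile boolean active = true;

  // Called by request threads after buffering `count` bytes in memory.
  public void added(long count) {
    if (inMemory.addAndGet(count) >= WATER_MARK) {
      synchronized (this) {
        this.notifyAll(); // wake the dumper instead of dumping inline
      }
    }
  }

  @Override
  public void run() {
    while (active) {
      while (active && inMemory.get() >= WATER_MARK) {
        long dumped = dumpSomeData();   // spill one buffered write to disk
        inMemory.addAndGet(-dumped);
      }
      synchronized (this) {
        if (inMemory.get() < WATER_MARK) {
          try {
            this.wait();                // sleep until the mark is crossed again
          } catch (InterruptedException e) {
            return;                     // interrupted during cleanup; exit
          }
        }
      }
    }
  }

  // Placeholder for writing one WriteCtx-like buffer to the dump file.
  private long dumpSomeData() {
    return WATER_MARK;
  }
}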
@@ -283,148 +297,196 @@ class OpenFileCtx {
   public void receivedNewWrite(DFSClient dfsClient, WRITE3Request request,
       Channel channel, int xid, AsyncDataService asyncDataService,
       IdUserGroup iug) {
-
-    lockCtx();
-    try {
-      if (!activeState) {
-        LOG.info("OpenFileCtx is inactive, fileId:"
-            + request.getHandle().getFileId());
-        WccData fileWcc = new WccData(latestAttr.getWccAttr(), latestAttr);
-        WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO,
-            fileWcc, 0, request.getStableHow(), Nfs3Constant.WRITE_COMMIT_VERF);
-        Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse(
-            new XDR(), xid, new VerifierNone()), xid);
-      } else {
-        // Handle repeated write requests(same xid or not).
-        // If already replied, send reply again. If not replied, drop the
-        // repeated request.
-        WriteCtx existantWriteCtx = checkRepeatedWriteRequest(request, channel,
-            xid);
-        if (existantWriteCtx != null) {
-          if (!existantWriteCtx.getReplied()) {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Repeated write request which hasn't be served: xid="
-                  + xid + ", drop it.");
-            }
-          } else {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Repeated write request which is already served: xid="
-                  + xid + ", resend response.");
-            }
-            WccData fileWcc = new WccData(latestAttr.getWccAttr(), latestAttr);
-            WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
-                fileWcc, request.getCount(), request.getStableHow(),
-                Nfs3Constant.WRITE_COMMIT_VERF);
-            Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse(
-                new XDR(), xid, new VerifierNone()), xid);
-          }
-          updateLastAccessTime();
-        } else {
-          receivedNewWriteInternal(dfsClient, request, channel, xid,
-              asyncDataService, iug);
-        }
-      }
-
-    } finally {
-      unlockCtx();
-    }
+
+    if (!activeState) {
+      LOG.info("OpenFileCtx is inactive, fileId:"
+          + request.getHandle().getFileId());
+      WccData fileWcc = new WccData(latestAttr.getWccAttr(), latestAttr);
+      WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO,
+          fileWcc, 0, request.getStableHow(), Nfs3Constant.WRITE_COMMIT_VERF);
+      Nfs3Utils.writeChannel(channel,
+          response.writeHeaderAndResponse(new XDR(), xid, new VerifierNone()),
+          xid);
+    } else {
+      // Update the write time first
+      updateLastAccessTime();
+
+      // Handle repeated write requests (same xid or not).
+      // If already replied, send reply again. If not replied, drop the
+      // repeated request.
+      WriteCtx existantWriteCtx = checkRepeatedWriteRequest(request, channel,
+          xid);
+      if (existantWriteCtx != null) {
+        if (!existantWriteCtx.getReplied()) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Repeated write request which hasn't be served: xid="
+                + xid + ", drop it.");
+          }
+        } else {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Repeated write request which is already served: xid="
+                + xid + ", resend response.");
+          }
+          WccData fileWcc = new WccData(latestAttr.getWccAttr(), latestAttr);
+          WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
+              fileWcc, request.getCount(), request.getStableHow(),
+              Nfs3Constant.WRITE_COMMIT_VERF);
+          Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse(
+              new XDR(), xid, new VerifierNone()), xid);
+        }
+      } else {
+        // not a repeated write request
+        receivedNewWriteInternal(dfsClient, request, channel, xid,
+            asyncDataService, iug);
+      }
+    }
   }
 
-  private void receivedNewWriteInternal(DFSClient dfsClient,
-      WRITE3Request request, Channel channel, int xid,
-      AsyncDataService asyncDataService, IdUserGroup iug) {
+  /**
+   * Creates and adds a WriteCtx into the pendingWrites map. This is a
+   * synchronized method to handle concurrent writes.
+   * 
+   * @return A non-null {@link WriteCtx} instance if the incoming write
+   *         request's offset >= nextOffset. Otherwise null.
+   */
+  private synchronized WriteCtx addWritesToCache(WRITE3Request request,
+      Channel channel, int xid) {
     long offset = request.getOffset();
     int count = request.getCount();
-    WriteStableHow stableHow = request.getStableHow();
+    long cachedOffset = nextOffset.get();
 
-    // Get file length, fail non-append call
-    WccAttr preOpAttr = latestAttr.getWccAttr();
     if (LOG.isDebugEnabled()) {
-      LOG.debug("requesed offset=" + offset + " and current filesize="
-          + preOpAttr.getSize());
+      LOG.debug("requesed offset=" + offset + " and current offset="
+          + cachedOffset);
     }
 
-    long nextOffset = getNextOffsetUnprotected();
-    if (offset == nextOffset) {
-      LOG.info("Add to the list, update nextOffset and notify the writer,"
-          + " nextOffset:" + nextOffset);
+    // Fail non-append call
+    if (offset < cachedOffset) {
+      LOG.warn("(offset,count,nextOffset):" + "(" + offset + "," + count + ","
+          + nextOffset + ")");
+      return null;
+    } else {
+      DataState dataState = offset == cachedOffset ? WriteCtx.DataState.NO_DUMP
+          : WriteCtx.DataState.ALLOW_DUMP;
       WriteCtx writeCtx = new WriteCtx(request.getHandle(),
           request.getOffset(), request.getCount(), request.getStableHow(),
-          request.getData().array(), channel, xid, false, DataState.NO_DUMP);
-      addWrite(writeCtx);
-
-      // Create an async task and change openFileCtx status to indicate async
-      // task pending
+          request.getData().array(), channel, xid, false, dataState);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Add new write to the list with nextOffset " + cachedOffset
+            + " and requesed offset=" + offset);
+      }
+      if (writeCtx.getDataState() == WriteCtx.DataState.ALLOW_DUMP) {
+        // update the memory size
+        updateNonSequentialWriteInMemory(count);
+      }
+      // check if there is a WriteCtx with the same range in pendingWrites
+      WriteCtx oldWriteCtx = checkRepeatedWriteRequest(request, channel, xid);
+      if (oldWriteCtx == null) {
+        addWrite(writeCtx);
+      } else {
+        LOG.warn("Got a repeated request, same range, with xid:"
+            + writeCtx.getXid());
+      }
+      return writeCtx;
+    }
+  }
+
+  /** Process an overwrite write request */
+  private void processOverWrite(DFSClient dfsClient, WRITE3Request request,
+      Channel channel, int xid, IdUserGroup iug) {
+    WccData wccData = new WccData(latestAttr.getWccAttr(), null);
+    long offset = request.getOffset();
+    int count = request.getCount();
+    WriteStableHow stableHow = request.getStableHow();
+    WRITE3Response response;
+    long cachedOffset = nextOffset.get();
+    if (offset + count > cachedOffset) {
+      LOG.warn("Haven't noticed any partial overwrite for a sequential file"
+          + " write requests. Treat it as a real random write, no support.");
+      response = new WRITE3Response(Nfs3Status.NFS3ERR_INVAL, wccData, 0,
+          WriteStableHow.UNSTABLE, 0);
+    } else {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Process perfectOverWrite");
+      }
+      // TODO: let executor handle perfect overwrite
+      response = processPerfectOverWrite(dfsClient, offset, count, stableHow,
+          request.getData().array(),
+          Nfs3Utils.getFileIdPath(request.getHandle()), wccData, iug);
+    }
+    updateLastAccessTime();
+    Nfs3Utils.writeChannel(channel,
+        response.writeHeaderAndResponse(new XDR(), xid, new VerifierNone()),
+        xid);
+  }
 
+  /**
+   * Check if we can start the write (back to HDFS) now. If there is no hole for
+   * writing, and there is no other threads writing (i.e., asyncStatus is
+   * false), start the writing and set asyncStatus to true.
+   * 
+   * @return True if the new write is sequencial and we can start writing
+   *         (including the case that there is already a thread writing).
+   */
+  private synchronized boolean checkAndStartWrite(
+      AsyncDataService asyncDataService, WriteCtx writeCtx) {
+
+    if (writeCtx.getOffset() == nextOffset.get()) {
       if (!asyncStatus) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Trigger the write back task. Current nextOffset: "
+              + nextOffset.get());
+        }
         asyncStatus = true;
         asyncDataService.execute(new AsyncDataService.WriteBackTask(this));
+      } else {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("The write back thread is working.");
+        }
       }
+      return true;
+    } else {
+      return false;
+    }
+  }
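checkAndStartWrite() is now the only synchronized gate on the request path: it admits a write whose offset matches the cursor, and the volatile asyncStatus flag guarantees that at most one write-back task is scheduled per open file. Stripped of the NFS details, the gate looks roughly like this (hypothetical names; a sketch, not the real method):

// Only the thread that observes busy == false schedules a write-back task,
// so at most one writer runs per open file at any time.
class WriteBackGate {
  private volatile boolean busy = false;

  synchronized boolean tryStart(long requestOffset, long cursor,
      Runnable writeBackTask) {
    if (requestOffset != cursor) {
      return false; // out-of-order write: keep it buffered, maybe dump later
    }
    if (!busy) {
      busy = true;  // claimed; the write-back loop resets this in a finally
      new Thread(writeBackTask).start();
    }
    return true;    // sequential; a writer is (now) running
  }

  void finished() {
    busy = false;   // always reset, mirroring executeWriteBack()'s finally
  }
}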
-
-      // Update the write time first
-      updateLastAccessTime();
-      Nfs3FileAttributes postOpAttr = new Nfs3FileAttributes(latestAttr);
-
-      // Send response immediately for unstable write
-      if (request.getStableHow() == WriteStableHow.UNSTABLE) {
-        WccData fileWcc = new WccData(preOpAttr, postOpAttr);
-        WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
-            fileWcc, count, stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
-        Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse(
-            new XDR(), xid, new VerifierNone()), xid);
-        writeCtx.setReplied(true);
-      }
-
-    } else if (offset > nextOffset) {
-      LOG.info("Add new write to the list but not update nextOffset:"
-          + nextOffset);
-      WriteCtx writeCtx = new WriteCtx(request.getHandle(),
-          request.getOffset(), request.getCount(), request.getStableHow(),
-          request.getData().array(), channel, xid, false, DataState.ALLOW_DUMP);
-      addWrite(writeCtx);
-
-      // Check if need to dump some pending requests to file
-      checkDump(request.getCount());
-      updateLastAccessTime();
-      Nfs3FileAttributes postOpAttr = new Nfs3FileAttributes(latestAttr);
-
-      // In test, noticed some Linux client sends a batch (e.g., 1MB)
-      // of reordered writes and won't send more writes until it gets
-      // responses of the previous batch. So here send response immediately for
-      // unstable non-sequential write
-      if (request.getStableHow() == WriteStableHow.UNSTABLE) {
-        WccData fileWcc = new WccData(preOpAttr, postOpAttr);
-        WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
-            fileWcc, count, stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
-        Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse(
-            new XDR(), xid, new VerifierNone()), xid);
-        writeCtx.setReplied(true);
-      }
-
-    } else {
+  private void receivedNewWriteInternal(DFSClient dfsClient,
+      WRITE3Request request, Channel channel, int xid,
+      AsyncDataService asyncDataService, IdUserGroup iug) {
+    WriteStableHow stableHow = request.getStableHow();
+    WccAttr preOpAttr = latestAttr.getWccAttr();
+    int count = request.getCount();
+
+    WriteCtx writeCtx = addWritesToCache(request, channel, xid);
+    if (writeCtx == null) {
       // offset < nextOffset
-      LOG.warn("(offset,count,nextOffset):" + "(" + offset + "," + count + ","
-          + nextOffset + ")");
-      WccData wccData = new WccData(preOpAttr, null);
-      WRITE3Response response;
-
-      if (offset + count > nextOffset) {
-        LOG.warn("Haven't noticed any partial overwrite out of a sequential file"
-            + "write requests, so treat it as a real random write, no support.");
-        response = new WRITE3Response(Nfs3Status.NFS3ERR_INVAL, wccData, 0,
-            WriteStableHow.UNSTABLE, 0);
-      } else {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Process perfectOverWrite");
+      processOverWrite(dfsClient, request, channel, xid, iug);
+    } else {
+      // The writes is added to pendingWrites.
+      // Check and start writing back if necessary
+      boolean startWriting = checkAndStartWrite(asyncDataService, writeCtx);
+      if (!startWriting) {
+        // offset > nextOffset. check if we need to dump data
+        checkDump();
+
+        // In test, noticed some Linux client sends a batch (e.g., 1MB)
+        // of reordered writes and won't send more writes until it gets
+        // responses of the previous batch. So here send response immediately
+        // for unstable non-sequential write
+        if (request.getStableHow() == WriteStableHow.UNSTABLE) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("UNSTABLE write request, send response for offset: "
+                + writeCtx.getOffset());
+          }
+          WccData fileWcc = new WccData(preOpAttr, latestAttr);
+          WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
+              fileWcc, count, stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
+          Nfs3Utils
+              .writeChannel(channel, response.writeHeaderAndResponse(new XDR(),
+                  xid, new VerifierNone()), xid);
+          writeCtx.setReplied(true);
         }
-        response = processPerfectOverWrite(dfsClient, offset, count, stableHow,
-            request.getData().array(),
-            Nfs3Utils.getFileIdPath(request.getHandle()), wccData, iug);
       }
-
-      updateLastAccessTime();
-      Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse(
-          new XDR(), xid, new VerifierNone()), xid);
     }
   }
 
@@ -436,7 +498,6 @@ class OpenFileCtx {
   private WRITE3Response processPerfectOverWrite(DFSClient dfsClient,
       long offset, int count, WriteStableHow stableHow, byte[] data,
       String path, WccData wccData, IdUserGroup iug) {
-    assert (ctxLock.isLocked());
     WRITE3Response response = null;
 
     // Read the content back
@@ -447,21 +508,30 @@ class OpenFileCtx {
     try {
       // Sync file data and length to avoid partial read failure
       fos.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
-
+    } catch (ClosedChannelException closedException) {
+      LOG.info("The FSDataOutputStream has been closed. "
+          + "Continue processing the perfect overwrite.");
+    } catch (IOException e) {
+      LOG.info("hsync failed when processing possible perfect overwrite, path="
+          + path + " error:" + e);
+      return new WRITE3Response(Nfs3Status.NFS3ERR_IO, wccData, 0, stableHow,
+          Nfs3Constant.WRITE_COMMIT_VERF);
+    }
+
+    try {
       fis = new FSDataInputStream(dfsClient.open(path));
       readCount = fis.read(offset, readbuffer, 0, count);
       if (readCount < count) {
         LOG.error("Can't read back " + count + " bytes, partial read size:"
             + readCount);
-        return response = new WRITE3Response(Nfs3Status.NFS3ERR_IO, wccData, 0,
-            stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
+        return new WRITE3Response(Nfs3Status.NFS3ERR_IO, wccData, 0, stableHow,
+            Nfs3Constant.WRITE_COMMIT_VERF);
       }
-
     } catch (IOException e) {
       LOG.info("Read failed when processing possible perfect overwrite, path="
          + path + " error:" + e);
-      return response = new WRITE3Response(Nfs3Status.NFS3ERR_IO, wccData, 0,
-          stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
+      return new WRITE3Response(Nfs3Status.NFS3ERR_IO, wccData, 0, stableHow,
+          Nfs3Constant.WRITE_COMMIT_VERF);
     } finally {
       IOUtils.cleanup(LOG, fis);
     }
@@ -492,40 +562,26 @@ class OpenFileCtx {
     }
     return response;
   }
-
-  public final static int COMMIT_FINISHED = 0;
-  public final static int COMMIT_WAIT = 1;
-  public final static int COMMIT_INACTIVE_CTX = 2;
-  public final static int COMMIT_ERROR = 3;
 
   /**
    * return one commit status: COMMIT_FINISHED, COMMIT_WAIT,
    * COMMIT_INACTIVE_CTX, COMMIT_ERROR
    */
   public int checkCommit(long commitOffset) {
-    int ret = COMMIT_WAIT;
-
-    lockCtx();
-    try {
-      if (!activeState) {
-        ret = COMMIT_INACTIVE_CTX;
-      } else {
-        ret = checkCommitInternal(commitOffset);
-      }
-    } finally {
-      unlockCtx();
-    }
-    return ret;
+    return activeState ? checkCommitInternal(commitOffset)
+        : COMMIT_INACTIVE_CTX;
   }
 
   private int checkCommitInternal(long commitOffset) {
     if (commitOffset == 0) {
       // Commit whole file
-      commitOffset = getNextOffsetUnprotected();
+      commitOffset = nextOffset.get();
     }
 
     long flushed = getFlushedOffset();
-    LOG.info("getFlushedOffset=" + flushed + " commitOffset=" + commitOffset);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("getFlushedOffset=" + flushed + " commitOffset=" + commitOffset);
+    }
     if (flushed < commitOffset) {
       // Keep stream active
       updateLastAccessTime();
@@ -538,6 +594,13 @@ class OpenFileCtx {
       fos.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
       // Nothing to do for metadata since attr related change is pass-through
       ret = COMMIT_FINISHED;
+    } catch (ClosedChannelException cce) {
+      ret = COMMIT_INACTIVE_CTX;
+      if (pendingWrites.isEmpty()) {
+        ret = COMMIT_INACTIVE_CTX;
+      } else {
+        ret = COMMIT_INACTIVE_WITH_PENDING_WRITE;
+      }
     } catch (IOException e) {
       LOG.error("Got stream error during data sync:" + e);
       // Do nothing. Stream will be closed eventually by StreamMonitor.
@@ -550,18 +613,16 @@ class OpenFileCtx {
   }
 
   private void addWrite(WriteCtx writeCtx) {
-    assert (ctxLock.isLocked());
     long offset = writeCtx.getOffset();
     int count = writeCtx.getCount();
     pendingWrites.put(new OffsetRange(offset, offset + count), writeCtx);
   }
-
+
   /**
    * Check stream status to decide if it should be closed
    * @return true, remove stream; false, keep stream
   */
-  public boolean streamCleanup(long fileId, long streamTimeout) {
+  public synchronized boolean streamCleanup(long fileId, long streamTimeout) {
     if (streamTimeout < WriteManager.MINIMIUM_STREAM_TIMEOUT) {
       throw new InvalidParameterException("StreamTimeout" + streamTimeout
           + "ms is less than MINIMIUM_STREAM_TIMEOUT "
@@ -569,107 +630,97 @@ class OpenFileCtx {
     }
 
     boolean flag = false;
-    if (!ctxLock.tryLock()) {
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("Another thread is working on it" + ctxLock.toString());
-      }
-      return flag;
-    }
-
-    try {
-      // Check the stream timeout
-      if (checkStreamTimeout(streamTimeout)) {
-        LOG.info("closing stream for fileId:" + fileId);
-        cleanup();
-        flag = true;
+    // Check the stream timeout
+    if (checkStreamTimeout(streamTimeout)) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("closing stream for fileId:" + fileId);
       }
-    } finally {
-      unlockCtx();
+      cleanup();
+      flag = true;
     }
     return flag;
   }
 
-  // Invoked by AsynDataService to do the write back
-  public void executeWriteBack() {
-    long nextOffset;
-    OffsetRange key;
-    WriteCtx writeCtx;
-
-    try {
-      // Don't lock OpenFileCtx for all writes to reduce the timeout of other
-      // client request to the same file
-      while (true) {
-        lockCtx();
-        if (!asyncStatus) {
-          // This should never happen. There should be only one thread working
-          // on one OpenFileCtx anytime.
-          LOG.fatal("The openFileCtx has false async status");
-          throw new RuntimeException("The openFileCtx has false async status");
-        }
-        // Any single write failure can change activeState to false, so do the
-        // check each loop.
-        if (pendingWrites.isEmpty()) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("The asyn write task has no pendding writes, fileId: "
-                + latestAttr.getFileId());
-          }
-          break;
-        }
-        if (!activeState) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("The openFileCtx is not active anymore, fileId: "
-                + latestAttr.getFileId());
-          }
-          break;
-        }
-
-        // Get the next sequential write
-        nextOffset = getNextOffsetUnprotected();
-        key = pendingWrites.firstKey();
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("key.getMin()=" + key.getMin() + " nextOffset="
-              + nextOffset);
-        }
-
-        if (key.getMin() > nextOffset) {
-          if (LOG.isDebugEnabled()) {
-            LOG.info("The next sequencial write has not arrived yet");
-          }
-          break;
-
-        } else if (key.getMin() < nextOffset && key.getMax() > nextOffset) {
-          // Can't handle overlapping write. Didn't see it in tests yet.
-          LOG.fatal("Got a overlapping write (" + key.getMin() + ","
-              + key.getMax() + "), nextOffset=" + nextOffset);
-          throw new RuntimeException("Got a overlapping write (" + key.getMin()
-              + "," + key.getMax() + "), nextOffset=" + nextOffset);
-
-        } else {
-          if (LOG.isTraceEnabled()) {
-            LOG.trace("Remove write(" + key.getMin() + "-" + key.getMax()
-                + ") from the list");
-          }
-          writeCtx = pendingWrites.remove(key);
-          // Do the write
-          doSingleWrite(writeCtx);
-          updateLastAccessTime();
-        }
-
-        unlockCtx();
-      }
-
-    } finally {
-      // Always reset the async status so another async task can be created
-      // for this file
-      asyncStatus = false;
-      if (ctxLock.isHeldByCurrentThread()) {
-        unlockCtx();
-      }
-    }
-  }
+  /**
+   * Get (and remove) the next WriteCtx from {@link #pendingWrites} if possible.
+   * 
+   * @return Null if {@link #pendingWrites} is null, or the next WriteCtx's
+   *         offset is larger than nextOffSet.
+   */
+  private synchronized WriteCtx offerNextToWrite() {
+    if (pendingWrites.isEmpty()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("The asyn write task has no pending writes, fileId: "
+            + latestAttr.getFileId());
+      }
+      this.asyncStatus = false;
+    } else {
+      Entry<OffsetRange, WriteCtx> lastEntry = pendingWrites.lastEntry();
+      OffsetRange range = lastEntry.getKey();
+      WriteCtx toWrite = lastEntry.getValue();
+
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("range.getMin()=" + range.getMin() + " nextOffset="
+            + nextOffset);
+      }
+
+      long offset = nextOffset.get();
+      if (range.getMin() > offset) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("The next sequencial write has not arrived yet");
+        }
+        this.asyncStatus = false;
+      } else if (range.getMin() < offset && range.getMax() > offset) {
+        // shouldn't happen since we do sync for overlapped concurrent writers
+        LOG.warn("Got a overlapping write (" + range.getMin() + ","
+            + range.getMax() + "), nextOffset=" + offset
+            + ". Silently drop it now");
+        pendingWrites.remove(range);
+      } else {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Remove write(" + range.getMin() + "-" + range.getMax()
+              + ") from the list");
+        }
+        // after writing, remove the WriteCtx from cache
+        pendingWrites.remove(range);
+        // update nextOffset
+        nextOffset.addAndGet(toWrite.getCount());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Change nextOffset to " + nextOffset.get());
+        }
+        return toWrite;
+      }
+    }
+    return null;
+  }
+
+  /** Invoked by AsynDataService to write back to HDFS */
+  void executeWriteBack() {
+    Preconditions.checkState(asyncStatus,
+        "The openFileCtx has false async status");
+    try {
+      while (activeState) {
+        WriteCtx toWrite = offerNextToWrite();
+        if (toWrite != null) {
+          // Do the write
+          doSingleWrite(toWrite);
+          updateLastAccessTime();
+        } else {
+          break;
+        }
+      }
+
+      if (!activeState && LOG.isDebugEnabled()) {
+        LOG.debug("The openFileCtx is not active anymore, fileId: "
+            + latestAttr.getFileId());
+      }
    } finally {
-      // Always reset the async status so another async task can be created
-      // for this file
+      // make sure we reset asyncStatus to false
       asyncStatus = false;
    }
  }
 
   private void doSingleWrite(final WriteCtx writeCtx) {
-    assert(ctxLock.isLocked());
     Channel channel = writeCtx.getChannel();
     int xid = writeCtx.getXid();
 
@@ -679,20 +730,25 @@ class OpenFileCtx {
     byte[] data = null;
     try {
       data = writeCtx.getData();
-    } catch (IOException e1) {
+    } catch (Exception e1) {
       LOG.error("Failed to get request data offset:" + offset + " count:"
           + count + " error:" + e1);
       // Cleanup everything
       cleanup();
       return;
     }
-    assert (data.length == count);
+
+    Preconditions.checkState(data.length == count);
 
     FileHandle handle = writeCtx.getHandle();
-    LOG.info("do write, fileId: " + handle.getFileId() + " offset: " + offset
-        + " length:" + count + " stableHow:" + stableHow.getValue());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("do write, fileId: " + handle.getFileId() + " offset: "
+          + offset + " length:" + count + " stableHow:" + stableHow.getValue());
+    }
 
     try {
+      // The write is not protected by lock. asyncState is used to make sure
+      // there is one thread doing write back at any time
       fos.write(data, 0, count);
 
       long flushedOffset = getFlushedOffset();
@@ -701,11 +757,20 @@ class OpenFileCtx {
            + flushedOffset + " and nextOffset should be" + (offset + count));
       }
-      nextOffset = flushedOffset;
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("After writing " + handle.getFileId() + " at offset "
+            + offset + ", update the memory count.");
+      }
 
       // Reduce memory occupation size if request was allowed dumped
-      if (writeCtx.getDataState() == DataState.ALLOW_DUMP) {
-        updateNonSequentialWriteInMemory(-count);
+      if (writeCtx.getDataState() == WriteCtx.DataState.ALLOW_DUMP) {
+        synchronized (writeCtx) {
+          if (writeCtx.getDataState() == WriteCtx.DataState.ALLOW_DUMP) {
+            writeCtx.setDataState(WriteCtx.DataState.NO_DUMP);
+            updateNonSequentialWriteInMemory(-count);
+          }
+        }
       }
 
       if (!writeCtx.getReplied()) {
@@ -716,7 +781,6 @@ class OpenFileCtx {
         Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse(
             new XDR(), xid, new VerifierNone()), xid);
       }
-
     } catch (IOException e) {
       LOG.error("Error writing to fileId " + handle.getFileId() + " at offset "
           + offset + " and length " + data.length, e);
@@ -733,9 +797,21 @@ class OpenFileCtx {
     }
   }
 
-  private void cleanup() {
-    assert(ctxLock.isLocked());
+  private synchronized void cleanup() {
+    if (!activeState) {
+      LOG.info("Current OpenFileCtx is already inactive, no need to cleanup.");
+      return;
+    }
     activeState = false;
+
+    // stop the dump thread
+    if (dumpThread != null) {
+      dumpThread.interrupt();
+      try {
+        dumpThread.join(3000);
+      } catch (InterruptedException e) {
+      }
+    }
 
     // Close stream
     try {
@@ -753,7 +829,7 @@ class OpenFileCtx {
     while (!pendingWrites.isEmpty()) {
       OffsetRange key = pendingWrites.firstKey();
       LOG.info("Fail pending write: (" + key.getMin() + "," + key.getMax()
-          + "), nextOffset=" + getNextOffsetUnprotected());
+          + "), nextOffset=" + nextOffset.get());
 
       WriteCtx writeCtx = pendingWrites.remove(key);
       if (!writeCtx.getReplied()) {
@@ -767,23 +843,23 @@ class OpenFileCtx {
     }
 
     // Cleanup dump file
-    if (dumpOut!=null){
+    if (dumpOut != null) {
       try {
         dumpOut.close();
       } catch (IOException e) {
         e.printStackTrace();
       }
+      File dumpFile = new File(dumpFilePath);
+      if (dumpFile.exists() && !dumpFile.delete()) {
+        LOG.error("Failed to delete dumpfile: " + dumpFile);
+      }
     }
-    if (raf!=null) {
+    if (raf != null) {
       try {
         raf.close();
       } catch (IOException e) {
         e.printStackTrace();
       }
     }
-    File dumpFile = new File(dumpFilePath);
-    if (dumpFile.delete()) {
-      LOG.error("Failed to delete dumpfile: "+ dumpFile);
-    }
   }
 }
\ No newline at end of file
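cleanup() stops the dumper with an interrupt followed by a bounded join, so a stuck dumper can never block stream cleanup indefinitely. In isolation the idiom is (a sketch; the real code above swallows the InterruptedException rather than re-asserting it):

// Interrupt the daemon (unblocking a wait() in its run loop), then wait a
// bounded time for it to exit before continuing with the rest of cleanup.
static void stopDaemon(Thread daemon) {
  if (daemon == null) {
    return;
  }
  daemon.interrupt();
  try {
    daemon.join(3000);                    // bounded wait; proceed regardless
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();   // preserve the caller's interrupt
  }
}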
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java?rev=1525681&r1=1525680&r2=1525681&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java Mon Sep 23 20:02:38 2013
@@ -27,6 +27,8 @@ import org.apache.hadoop.nfs.nfs3.FileHa
 import org.apache.hadoop.nfs.nfs3.Nfs3Constant.WriteStableHow;
 import org.jboss.netty.channel.Channel;
 
+import com.google.common.base.Preconditions;
+
 /**
  * WriteCtx saves the context of one write request, such as request, channel,
  * xid and reply status.
@@ -49,13 +51,21 @@ class WriteCtx {
   private final long offset;
   private final int count;
   private final WriteStableHow stableHow;
-  private byte[] data;
+  private volatile byte[] data;
 
   private final Channel channel;
   private final int xid;
   private boolean replied;
 
-  private DataState dataState;
+  /**
+   * Data belonging to the same {@link OpenFileCtx} may be dumped to a file.
+   * After being dumped to the file, the corresponding {@link WriteCtx} records
+   * the dump file and the offset.
+   */
+  private RandomAccessFile raf;
+  private long dumpFileOffset;
+
+  private volatile DataState dataState;
 
   public DataState getDataState() {
     return dataState;
@@ -64,12 +74,13 @@ class WriteCtx {
   public void setDataState(DataState dataState) {
     this.dataState = dataState;
   }
-
-  private RandomAccessFile raf;
-  private long dumpFileOffset;
 
-  // Return the dumped data size
-  public long dumpData(FileOutputStream dumpOut, RandomAccessFile raf)
+  /**
+   * Writing the data into a local file. After the writing, if
+   * {@link #dataState} is still ALLOW_DUMP, set {@link #data} to null and set
+   * {@link #dataState} to DUMPED.
+   */
+  long dumpData(FileOutputStream dumpOut, RandomAccessFile raf)
       throws IOException {
     if (dataState != DataState.ALLOW_DUMP) {
       if (LOG.isTraceEnabled()) {
@@ -84,48 +95,63 @@ class WriteCtx {
     if (LOG.isDebugEnabled()) {
       LOG.debug("After dump, new dumpFileOffset:" + dumpFileOffset);
     }
-    data = null;
-    dataState = DataState.DUMPED;
-    return count;
+    // it is possible that while we dump the data, the data is also being
+    // written back to HDFS. After dump, if the writing back has not finished
+    // yet, we change its flag to DUMPED and set the data to null. Otherwise
+    // this WriteCtx instance should have been removed from the buffer.
+    if (dataState == DataState.ALLOW_DUMP) {
+      synchronized (this) {
+        if (dataState == DataState.ALLOW_DUMP) {
+          data = null;
+          dataState = DataState.DUMPED;
+          return count;
+        }
+      }
+    }
+    return 0;
   }
 
-  public FileHandle getHandle() {
+  FileHandle getHandle() {
    return handle;
  }
 
-  public long getOffset() {
+  long getOffset() {
    return offset;
  }
 
-  public int getCount() {
+  int getCount() {
    return count;
  }
 
-  public WriteStableHow getStableHow() {
+  WriteStableHow getStableHow() {
    return stableHow;
  }
 
-  public byte[] getData() throws IOException {
+  byte[] getData() throws IOException {
     if (dataState != DataState.DUMPED) {
-      if (data == null) {
-        throw new IOException("Data is not dumpted but has null:" + this);
-      }
-    } else {
-      // read back
-      if (data != null) {
-        throw new IOException("Data is dumpted but not null");
-      }
-      data = new byte[count];
-      raf.seek(dumpFileOffset);
-      int size = raf.read(data, 0, count);
-      if (size != count) {
-        throw new IOException("Data count is " + count + ", but read back "
-            + size + "bytes");
+      synchronized (this) {
+        if (dataState != DataState.DUMPED) {
+          Preconditions.checkState(data != null);
+          return data;
+        }
       }
     }
+    // read back from dumped file
+    this.loadData();
    return data;
  }
 
+  private void loadData() throws IOException {
+    Preconditions.checkState(data == null);
+    data = new byte[count];
+    raf.seek(dumpFileOffset);
+    int size = raf.read(data, 0, count);
+    if (size != count) {
+      throw new IOException("Data count is " + count + ", but read back "
+          + size + "bytes");
+    }
+  }
+
   Channel getChannel() {
     return channel;
   }
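dumpData() and doSingleWrite() may race on the same WriteCtx: the dumper wants to drop the in-memory copy after spilling it, while the write-back thread wants to pin it in memory. Both sides therefore re-check the volatile dataState inside the object's monitor before acting. The essence of that idiom, reduced to a standalone class (illustrative, not the real WriteCtx):

class WriteStateSketch {
  enum DataState { ALLOW_DUMP, NO_DUMP, DUMPED }

  private volatile DataState dataState = DataState.ALLOW_DUMP;
  private volatile byte[] data = new byte[1024];

  // Dumper thread: only release the in-memory copy if no one claimed it first.
  long dumpData() {
    if (dataState == DataState.ALLOW_DUMP) {
      synchronized (this) {
        if (dataState == DataState.ALLOW_DUMP) {
          long released = data.length;
          data = null;                    // data now lives only in the dump file
          dataState = DataState.DUMPED;
          return released;
        }
      }
    }
    return 0; // the writer got here first; nothing was dumped
  }

  // Write-back thread: pin the data in memory so the dumper leaves it alone.
  byte[] claimForWrite() {
    if (dataState == DataState.ALLOW_DUMP) {
      synchronized (this) {
        if (dataState == DataState.ALLOW_DUMP) {
          dataState = DataState.NO_DUMP;
        }
      }
    }
    return data; // if already DUMPED, the real code reloads from the dump file
  }
}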
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java?rev=1525681&r1=1525680&r2=1525681&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java Mon Sep 23 20:02:38 2013
@@ -67,8 +67,8 @@ public class WriteManager {
    */
   private long streamTimeout;
 
-  public static final long DEFAULT_STREAM_TIMEOUT = 10 * 1000; // 10 second
-  public static final long MINIMIUM_STREAM_TIMEOUT = 1 * 1000; // 1 second
+  public static final long DEFAULT_STREAM_TIMEOUT = 10 * 60 * 1000; //10 minutes
+  public static final long MINIMIUM_STREAM_TIMEOUT = 10 * 1000; //10 seconds
 
   void addOpenFileStream(FileHandle h, OpenFileCtx ctx) {
     openFileMap.put(h, ctx);
@@ -215,6 +215,10 @@ public class WriteManager {
       LOG.info("Inactive stream, fileId=" + fileHandle.getFileId()
           + " commitOffset=" + commitOffset);
       return true;
+    } else if (ret == OpenFileCtx.COMMIT_INACTIVE_WITH_PENDING_WRITE) {
+      LOG.info("Inactive stream with pending writes, fileId="
+          + fileHandle.getFileId() + " commitOffset=" + commitOffset);
+      return false;
     }
     assert (ret == OpenFileCtx.COMMIT_WAIT || ret == OpenFileCtx.COMMIT_ERROR);
     if (ret == OpenFileCtx.COMMIT_ERROR) {

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestOffsetRange.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestOffsetRange.java?rev=1525681&r1=1525680&r2=1525681&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestOffsetRange.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestOffsetRange.java Mon Sep 23 20:02:38 2013
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.nfs.nfs3;
 
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
 
 import java.io.IOException;
 
@@ -51,8 +52,9 @@ public class TestOffsetRange {
     OffsetRange r3 = new OffsetRange(1, 3);
     OffsetRange r4 = new OffsetRange(3, 4);
 
-    assertTrue(r2.compareTo(r3) == 0);
-    assertTrue(r2.compareTo(r1) == 1);
-    assertTrue(r2.compareTo(r4) == -1);
+    assertEquals(0, OffsetRange.ReverseComparatorOnMin.compare(r2, r3));
+    assertEquals(0, OffsetRange.ReverseComparatorOnMin.compare(r2, r2));
+    assertTrue(OffsetRange.ReverseComparatorOnMin.compare(r2, r1) < 0);
+    assertTrue(OffsetRange.ReverseComparatorOnMin.compare(r2, r4) > 0);
   }
 }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1525681&r1=1525680&r2=1525681&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Mon Sep 23 20:02:38 2013
@@ -414,6 +414,9 @@ Release 2.1.1-beta - 2013-09-23
     HDFS-5212. Refactor RpcMessage and NFS3Response to support different 
     types of authentication information. (jing9)
 
+    HDFS-4971. Move IO operations out of locking in OpenFileCtx. (brandonli and
+    jing9)
+
   OPTIMIZATIONS
 
   BUG FIXES
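For reference, this is what ReverseComparatorOnMin buys in practice: iterating the key set walks from the largest offset down (the preferred dump order), while lastEntry() returns the smallest offset (the next sequential write). A small self-contained demonstration, reusing the comparator from the sketch near the top of this message, with long[]{min, max} pairs standing in for OffsetRange:

import java.util.concurrent.ConcurrentSkipListMap;

public class ReverseOrderDemo {
  public static void main(String[] args) {
    ConcurrentSkipListMap<long[], String> m =
        new ConcurrentSkipListMap<long[], String>(
            PendingWriteSketch.REVERSE_ON_MIN);
    m.put(new long[] { 0, 10 }, "first");
    m.put(new long[] { 10, 25 }, "second");
    m.put(new long[] { 40, 50 }, "out-of-order");

    // Dump order: iteration starts at the largest offset.
    for (long[] k : m.keySet()) {
      System.out.println("dump candidate: [" + k[0] + "," + k[1] + ")");
    }
    // prints [40,50), then [10,25), then [0,10)

    // Write-back order: lastEntry() is the smallest offset.
    System.out.println("next to write: " + m.lastEntry().getValue()); // first
  }
}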