Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java Wed Oct 30 22:21:59 2013
@@ -22,12 +22,15 @@ import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
 import java.security.InvalidParameterException;
 import java.util.EnumSet;
 import java.util.Iterator;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -45,12 +48,18 @@ import org.apache.hadoop.nfs.nfs3.Nfs3Co
 import org.apache.hadoop.nfs.nfs3.Nfs3FileAttributes;
 import org.apache.hadoop.nfs.nfs3.Nfs3Status;
 import org.apache.hadoop.nfs.nfs3.request.WRITE3Request;
+import org.apache.hadoop.nfs.nfs3.response.COMMIT3Response;
 import org.apache.hadoop.nfs.nfs3.response.WRITE3Response;
 import org.apache.hadoop.nfs.nfs3.response.WccAttr;
 import org.apache.hadoop.nfs.nfs3.response.WccData;
 import org.apache.hadoop.oncrpc.XDR;
+import org.apache.hadoop.oncrpc.security.VerifierNone;
+import org.apache.hadoop.util.Daemon;
 import org.jboss.netty.channel.Channel;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
 /**
  * OpenFileCtx saves the context of one HDFS file output stream. Access to it is
  * synchronized by its member lock.
@@ -58,34 +67,95 @@ import org.jboss.netty.channel.Channel;
 class OpenFileCtx {
   public static final Log LOG = LogFactory.getLog(OpenFileCtx.class);
   
-  /**
-   * Lock to synchronize OpenFileCtx changes. Thread should get this lock before
-   * any read/write operation to an OpenFileCtx object
-   */
-  private final ReentrantLock ctxLock;
+  // Pending writes water mark for dump, 1MB
+  private static long DUMP_WRITE_WATER_MARK = 1024 * 1024;
+
+  static enum COMMIT_STATUS {
+    COMMIT_FINISHED,
+    COMMIT_WAIT,
+    COMMIT_INACTIVE_CTX,
+    COMMIT_INACTIVE_WITH_PENDING_WRITE,
+    COMMIT_ERROR,
+    COMMIT_DO_SYNC;
+  }
 
+  private final DFSClient client;
+  private final IdUserGroup iug;
+  
   // The stream status. False means the stream is closed.
-  private boolean activeState;
+  private volatile boolean activeState;
   // The stream write-back status. True means one thread is doing write back.
-  private boolean asyncStatus;
+  private volatile boolean asyncStatus;
 
+  /**
+   * The current offset of the file in HDFS. All the content before this offset
+   * has been written back to HDFS.
+   */
+  private AtomicLong nextOffset;
   private final HdfsDataOutputStream fos;
-  private final Nfs3FileAttributes latestAttr;
-  private long nextOffset;
+  
+  // It's updated after each sync to HDFS
+  private Nfs3FileAttributes latestAttr;
 
-  private final SortedMap<OffsetRange, WriteCtx> pendingWrites;
+  private final ConcurrentNavigableMap<OffsetRange, WriteCtx> pendingWrites;
+  
+  private final ConcurrentNavigableMap<Long, CommitCtx> pendingCommits;
+
+  static class CommitCtx {
+    private final long offset;
+    private final Channel channel;
+    private final int xid;
+    private final Nfs3FileAttributes preOpAttr;
+
+    // Remember time for debug purpose
+    private final long startTime;
+
+    long getOffset() {
+      return offset;
+    }
+
+    Channel getChannel() {
+      return channel;
+    }
+
+    int getXid() {
+      return xid;
+    }
+
+    Nfs3FileAttributes getPreOpAttr() {
+      return preOpAttr;
+    }
+
+    long getStartTime() {
+      return startTime;
+    }
+
+    CommitCtx(long offset, Channel channel, int xid,
+        Nfs3FileAttributes preOpAttr) {
+      this.offset = offset;
+      this.channel = channel;
+      this.xid = xid;
+      this.preOpAttr = preOpAttr;
+      this.startTime = System.currentTimeMillis();
+    }
+
+    @Override
+    public String toString() {
+      return String.format("offset: %d xid: %d startTime: %d", offset, xid,
+          startTime);
+    }
+  }
   
   // The last write, commit request or write-back event. Updating time to keep
  // output stream alive.
   private long lastAccessTime;
   
-  // Pending writes water mark for dump, 1MB
-  private static int DUMP_WRITE_WATER_MARK = 1024 * 1024; 
+  private volatile boolean enabledDump;
   private FileOutputStream dumpOut;
-  private long nonSequentialWriteInMemory;
-  private boolean enabledDump;
+  private AtomicLong nonSequentialWriteInMemory;
   private RandomAccessFile raf;
   private final String dumpFilePath;
+  private Daemon dumpThread;
   
   private void updateLastAccessTime() {
     lastAccessTime = System.currentTimeMillis();
@@ -95,91 +165,57 @@ class OpenFileCtx {
     return System.currentTimeMillis() - lastAccessTime > streamTimeout;
   }
   
+  public long getNextOffset() {
+    return nextOffset.get();
+  }
+  
   // Increase or decrease the memory occupation of non-sequential writes
   private long updateNonSequentialWriteInMemory(long count) {
-    nonSequentialWriteInMemory += count;
+    long newValue = nonSequentialWriteInMemory.addAndGet(count);
     if (LOG.isDebugEnabled()) {
       LOG.debug("Update nonSequentialWriteInMemory by " + count + " new value:"
-          + nonSequentialWriteInMemory);
+          + newValue);
     }
 
-    if (nonSequentialWriteInMemory < 0) {
-      LOG.error("nonSequentialWriteInMemory is negative after update with 
count "
-          + count);
-      throw new IllegalArgumentException(
-          "nonSequentialWriteInMemory is negative after update with count "
-              + count);
-    }
-    return nonSequentialWriteInMemory;
+    Preconditions.checkState(newValue >= 0,
+        "nonSequentialWriteInMemory is negative after update with count "
+            + count);
+    return newValue;
   }
   
   OpenFileCtx(HdfsDataOutputStream fos, Nfs3FileAttributes latestAttr,
-      String dumpFilePath) {
+      String dumpFilePath, DFSClient client, IdUserGroup iug) {
     this.fos = fos;
     this.latestAttr = latestAttr;
-    pendingWrites = new TreeMap<OffsetRange, WriteCtx>();
+    // We use the ReverseComparatorOnMin as the comparator of the map. In this
+    // way, we first dump the data with larger offsets. Meanwhile, we
+    // retrieve the last element to write back to HDFS.
+    pendingWrites = new ConcurrentSkipListMap<OffsetRange, WriteCtx>(
+        OffsetRange.ReverseComparatorOnMin);
+    
+    pendingCommits = new ConcurrentSkipListMap<Long, CommitCtx>();
+    
     updateLastAccessTime();
     activeState = true;
     asyncStatus = false;
     dumpOut = null;
     raf = null;
-    nonSequentialWriteInMemory = 0;
+    nonSequentialWriteInMemory = new AtomicLong(0);
+  
     this.dumpFilePath = dumpFilePath;  
     enabledDump = dumpFilePath == null ? false: true;
-    nextOffset = latestAttr.getSize();
-    try {
-      assert(nextOffset == this.fos.getPos());
+    nextOffset = new AtomicLong();
+    nextOffset.set(latestAttr.getSize());
+    try {      
+      assert(nextOffset.get() == this.fos.getPos());
     } catch (IOException e) {}
-
-    ctxLock = new ReentrantLock(true);
-  }
-
-  private void lockCtx() {
-    if (LOG.isTraceEnabled()) {
-      StackTraceElement[] stacktrace = Thread.currentThread().getStackTrace();
-      StackTraceElement e = stacktrace[2];
-      String methodName = e.getMethodName();
-      LOG.trace("lock ctx, caller:" + methodName);
-    }
-    ctxLock.lock();
-  }
-
-  private void unlockCtx() {
-    ctxLock.unlock();
-    if (LOG.isTraceEnabled()) {
-      StackTraceElement[] stacktrace = Thread.currentThread().getStackTrace();
-      StackTraceElement e = stacktrace[2];
-      String methodName = e.getMethodName();
-      LOG.info("unlock ctx, caller:" + methodName);
-    }
-  }
-  
-  // Make a copy of the latestAttr
-  public Nfs3FileAttributes copyLatestAttr() {
-    Nfs3FileAttributes ret;
-    lockCtx();
-    try {
-      ret = new Nfs3FileAttributes(latestAttr);
-    } finally {
-      unlockCtx();
-    }
-    return ret;
-  }
-  
-  private long getNextOffsetUnprotected() {
-    assert(ctxLock.isLocked());
-    return nextOffset;
+    dumpThread = null;
+    this.client = client;
+    this.iug = iug;
   }
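
The constructor comment above relies on OffsetRange.ReverseComparatorOnMin, which is defined outside this file. For orientation, here is a minimal sketch of what such a comparator could look like; only getMin()/getMax() are taken from this patch, the body is an assumption. Ordering keys by descending min offset means iteration over pendingWrites visits the largest offsets first (dump order), while lastEntry() returns the smallest pending offset (the next candidate for sequential write-back).

import java.util.Comparator;

// Sketch only: a plausible ReverseComparatorOnMin for OffsetRange keys.
// Orders by descending min offset, breaking ties by descending max.
class OffsetRangeOrder {
  static final Comparator<OffsetRange> REVERSE_ON_MIN =
      new Comparator<OffsetRange>() {
        @Override
        public int compare(OffsetRange o1, OffsetRange o2) {
          if (o1.getMin() == o2.getMin()) {
            return o1.getMax() < o2.getMax() ? 1
                : (o1.getMax() > o2.getMax() ? -1 : 0);
          }
          return o1.getMin() < o2.getMin() ? 1 : -1;
        }
      };
}
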
 
-  public long getNextOffset() {
-    long ret;
-    lockCtx();
-    try {
-      ret = getNextOffsetUnprotected();
-    } finally {
-      unlockCtx();
-    }
-    return ret;
+  public Nfs3FileAttributes getLatestAttr() {
+    return latestAttr;
   }
   
   // Get flushed offset. Note that flushed data may not be persisted.
@@ -188,12 +224,7 @@ class OpenFileCtx {
   }
   
   // Check if need to dump the new writes
-  private void checkDump(long count) {
-    assert (ctxLock.isLocked());
-
-    // Always update the in memory count
-    updateNonSequentialWriteInMemory(count);
-
+  private void checkDump() {
     if (!enabledDump) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Do nothing, dump is disabled.");
@@ -201,66 +232,129 @@ class OpenFileCtx {
       return;
     }
 
-    if (nonSequentialWriteInMemory < DUMP_WRITE_WATER_MARK) {
+    if (nonSequentialWriteInMemory.get() < DUMP_WRITE_WATER_MARK) {
       return;
     }
 
-    // Create dump outputstream for the first time
-    if (dumpOut == null) {
-      LOG.info("Create dump file:" + dumpFilePath);
-      File dumpFile = new File(dumpFilePath);
-      try {
-        if (dumpFile.exists()) {
-          LOG.fatal("The dump file should not exist:" + dumpFilePath);
-          throw new RuntimeException("The dump file should not exist:"
-              + dumpFilePath);
+    // wake up the dumper thread to dump the data
+    synchronized (this) {
+      if (nonSequentialWriteInMemory.get() >= DUMP_WRITE_WATER_MARK) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Asking dumper to dump...");
+        }
+        if (dumpThread == null) {
+          dumpThread = new Daemon(new Dumper());
+          dumpThread.start();
+        } else {
+          this.notifyAll();          
         }
-        dumpOut = new FileOutputStream(dumpFile);
-      } catch (IOException e) {
-        LOG.error("Got failure when creating dump stream " + dumpFilePath
-            + " with error:" + e);
-        enabledDump = false;
-        IOUtils.cleanup(LOG, dumpOut);
-        return;
       }
     }
-    // Get raf for the first dump
-    if (raf == null) {
-      try {
-        raf = new RandomAccessFile(dumpFilePath, "r");
-      } catch (FileNotFoundException e) {
-        LOG.error("Can't get random access to file " + dumpFilePath);
-        // Disable dump
-        enabledDump = false;
-        return;
+  }
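
The producer side in checkDump() pairs with the Dumper's wait() loop further down: writes that push the in-memory count past the watermark call notifyAll(), and the dumper sleeps whenever the count falls back under it. A generic, self-contained sketch of that watermark wake-up pattern (hypothetical names, not the patch's code):

import java.util.concurrent.atomic.AtomicLong;

// Generic sketch of the watermark wake-up used by checkDump()/Dumper:
// producers bump an atomic counter and notify once it crosses the mark;
// the consumer drains, then waits while the counter is below the mark.
class WatermarkGate {
  private final AtomicLong pending = new AtomicLong();
  private final long watermark;

  WatermarkGate(long watermark) {
    this.watermark = watermark;
  }

  // Producer side, analogous to updateNonSequentialWriteInMemory + checkDump.
  void add(long count) {
    if (pending.addAndGet(count) >= watermark) {
      synchronized (this) {
        notifyAll(); // wake the consumer once past the watermark
      }
    }
  }

  // Consumer side, analogous to Dumper.run().
  void consumerLoop() throws InterruptedException {
    while (true) {
      drain();
      synchronized (this) {
        while (pending.get() < watermark) {
          wait(); // sleep until the watermark is crossed again
        }
      }
    }
  }

  private void drain() {
    // Placeholder: write buffered data out, then decrease the counter,
    // e.g. pending.addAndGet(-bytesDumped).
    pending.set(0);
  }
}
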
+
+  class Dumper implements Runnable {
+    /** Dump data into a file */
+    private void dump() {
+      // Create dump outputstream for the first time
+      if (dumpOut == null) {
+        LOG.info("Create dump file:" + dumpFilePath);
+        File dumpFile = new File(dumpFilePath);
+        try {
+          synchronized (this) {
+            // check if alive again
+            Preconditions.checkState(dumpFile.createNewFile(),
+                "The dump file should not exist: %s", dumpFilePath);
+            dumpOut = new FileOutputStream(dumpFile);
+          }
+        } catch (IOException e) {
+          LOG.error("Got failure when creating dump stream " + dumpFilePath, 
e);
+          enabledDump = false;
+          if (dumpOut != null) {
+            try {
+              dumpOut.close();
+            } catch (IOException e1) {
+              LOG.error("Can't close dump stream " + dumpFilePath, e);
+            }
+          }
+          return;
+        }
       }
-    }
-    
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Start dump, current write number:" + pendingWrites.size());
-    }
-    Iterator<OffsetRange> it = pendingWrites.keySet().iterator();
-    while (it.hasNext()) {
-      OffsetRange key = it.next();
-      WriteCtx writeCtx = pendingWrites.get(key);
-      try {
-        long dumpedDataSize = writeCtx.dumpData(dumpOut, raf);
-        if (dumpedDataSize > 0) {
-          updateNonSequentialWriteInMemory(-dumpedDataSize);
+
+      // Get raf for the first dump
+      if (raf == null) {
+        try {
+          raf = new RandomAccessFile(dumpFilePath, "r");
+        } catch (FileNotFoundException e) {
+          LOG.error("Can't get random access to file " + dumpFilePath);
+          // Disable dump
+          enabledDump = false;
+          return;
         }
-      } catch (IOException e) {
-        LOG.error("Dump data failed:" + writeCtx + " with error:" + e);
-        // Disable dump
-        enabledDump = false;
-        return;
+      }
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Start dump. Before dump, nonSequentialWriteInMemory == "
+            + nonSequentialWriteInMemory.get());
+      }
+
+      Iterator<OffsetRange> it = pendingWrites.keySet().iterator();
+      while (activeState && it.hasNext()
+          && nonSequentialWriteInMemory.get() > 0) {
+        OffsetRange key = it.next();
+        WriteCtx writeCtx = pendingWrites.get(key);
+        if (writeCtx == null) {
+          // This write was just deleted
+          continue;
+        }
+        try {
+          long dumpedDataSize = writeCtx.dumpData(dumpOut, raf);
+          if (dumpedDataSize > 0) {
+            updateNonSequentialWriteInMemory(-dumpedDataSize);
+          }
+        } catch (IOException e) {
+          LOG.error("Dump data failed:" + writeCtx + " with error:" + e
+              + " OpenFileCtx state:" + activeState);
+          // Disable dump
+          enabledDump = false;
+          return;
+        }
+      }
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("After dump, nonSequentialWriteInMemory == "
+            + nonSequentialWriteInMemory.get());
       }
     }
-    if (nonSequentialWriteInMemory != 0) {
-      LOG.fatal("After dump, nonSequentialWriteInMemory is not zero: "
-          + nonSequentialWriteInMemory);
-      throw new RuntimeException(
-          "After dump, nonSequentialWriteInMemory is not zero: "
-              + nonSequentialWriteInMemory);
+
+    @Override
+    public void run() {
+      while (activeState && enabledDump) {
+        try {
+          if (nonSequentialWriteInMemory.get() >= DUMP_WRITE_WATER_MARK) {
+            dump();
+          }
+          synchronized (OpenFileCtx.this) {
+            if (nonSequentialWriteInMemory.get() < DUMP_WRITE_WATER_MARK) {
+              try {
+                OpenFileCtx.this.wait();
+                if (LOG.isDebugEnabled()) {
+                  LOG.debug("Dumper woke up");
+                }
+              } catch (InterruptedException e) {
+                LOG.info("Dumper is interrupted, dumpFilePath= "
+                    + OpenFileCtx.this.dumpFilePath);
+              }
+            }
+          }
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Dumper checking OpenFileCtx activeState: " + activeState
+                + " enabledDump: " + enabledDump);
+          }
+        } catch (Throwable t) {
+          LOG.info("Dumper get Throwable: " + t + ". dumpFilePath: "
+              + OpenFileCtx.this.dumpFilePath);
+        }
+      }
     }
   }
   
@@ -284,143 +378,252 @@ class OpenFileCtx {
   public void receivedNewWrite(DFSClient dfsClient, WRITE3Request request,
       Channel channel, int xid, AsyncDataService asyncDataService,
       IdUserGroup iug) {
-
-    lockCtx();
-    try {
-      if (!activeState) {
-        LOG.info("OpenFileCtx is inactive, fileId:"
-            + request.getHandle().getFileId());
-        WccData fileWcc = new WccData(latestAttr.getWccAttr(), latestAttr);
-        WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO,
-            fileWcc, 0, request.getStableHow(), Nfs3Constant.WRITE_COMMIT_VERF);
-        Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
-      } else {
-        // Handle repeated write requests(same xid or not).
-        // If already replied, send reply again. If not replied, drop the
-        // repeated request.
-        WriteCtx existantWriteCtx = checkRepeatedWriteRequest(request, channel,
-            xid);
-        if (existantWriteCtx != null) {
-          if (!existantWriteCtx.getReplied()) {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Repeated write request which hasn't be served: xid="
-                  + xid + ", drop it.");
-            }
-          } else {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Repeated write request which is already served: xid="
-                  + xid + ", resend response.");
-            }
-            WccData fileWcc = new WccData(latestAttr.getWccAttr(), latestAttr);
-            WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
-                fileWcc, request.getCount(), request.getStableHow(),
-                Nfs3Constant.WRITE_COMMIT_VERF);
-            Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
+    
+    if (!activeState) {
+      LOG.info("OpenFileCtx is inactive, fileId:"
+          + request.getHandle().getFileId());
+      WccData fileWcc = new WccData(latestAttr.getWccAttr(), latestAttr);
+      WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO,
+          fileWcc, 0, request.getStableHow(), Nfs3Constant.WRITE_COMMIT_VERF);
+      Nfs3Utils.writeChannel(channel,
+          response.writeHeaderAndResponse(new XDR(), xid, new VerifierNone()),
+          xid);
+    } else {
+      // Update the write time first
+      updateLastAccessTime();
+      
+      // Handle repeated write requests (same xid or not).
+      // If already replied, send reply again. If not replied, drop the
+      // repeated request.
+      WriteCtx existantWriteCtx = checkRepeatedWriteRequest(request, channel,
+          xid);
+      if (existantWriteCtx != null) {
+        if (!existantWriteCtx.getReplied()) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Repeated write request which hasn't be served: xid="
+                + xid + ", drop it.");
           }
-          updateLastAccessTime();
-          
         } else {
-          receivedNewWriteInternal(dfsClient, request, channel, xid,
-              asyncDataService, iug);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Repeated write request which is already served: xid="
+                + xid + ", resend response.");
+          }
+          WccData fileWcc = new WccData(latestAttr.getWccAttr(), latestAttr);
+          WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
+              fileWcc, request.getCount(), request.getStableHow(),
+              Nfs3Constant.WRITE_COMMIT_VERF);
+          Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse(
+              new XDR(), xid, new VerifierNone()), xid);
         }
+      } else {
+        // not a repeated write request
+        receivedNewWriteInternal(dfsClient, request, channel, xid,
+            asyncDataService, iug);
       }
-
-    } finally {
-      unlockCtx();
     }
   }
 
-  private void receivedNewWriteInternal(DFSClient dfsClient,
-      WRITE3Request request, Channel channel, int xid,
-      AsyncDataService asyncDataService, IdUserGroup iug) {
+  @VisibleForTesting
+  public static void alterWriteRequest(WRITE3Request request, long cachedOffset) {
     long offset = request.getOffset();
     int count = request.getCount();
-    WriteStableHow stableHow = request.getStableHow();
-
-    // Get file length, fail non-append call
-    WccAttr preOpAttr = latestAttr.getWccAttr();
+    long smallerCount = offset + count - cachedOffset;
     if (LOG.isDebugEnabled()) {
-      LOG.debug("requesed offset=" + offset + " and current filesize="
-          + preOpAttr.getSize());
+      LOG.debug(String.format("Got overwrite with appended data (%d-%d),"
+          + " current offset %d," + " drop the overlapped section (%d-%d)"
+          + " and append new data (%d-%d).", offset, (offset + count - 1),
+          cachedOffset, offset, (cachedOffset - 1), cachedOffset, (offset
+              + count - 1)));
+    }
+    
+    ByteBuffer data = request.getData();
+    Preconditions.checkState(data.position() == 0,
+        "The write request data has non-zero position");
+    data.position((int) (cachedOffset - offset));
+    Preconditions.checkState(data.limit() - data.position() == smallerCount,
+        "The write request buffer has wrong limit/position regarding count");
+    
+    request.setOffset(cachedOffset);
+    request.setCount((int) smallerCount);
+  }
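
To make the offset arithmetic in alterWriteRequest concrete, here is a self-contained worked example using a plain ByteBuffer in place of the WRITE3Request payload; all values are made up for illustration:

import java.nio.ByteBuffer;

public class TrimOverlapExample {
  public static void main(String[] args) {
    long offset = 0;          // the client resent data starting at offset 0
    int count = 8192;         // 8 KB in the request
    long cachedOffset = 4096; // the server already wrote bytes 0-4095

    ByteBuffer data = ByteBuffer.allocate(count); // stands in for request data

    // Same arithmetic as alterWriteRequest: keep only the appended tail.
    long smallerCount = offset + count - cachedOffset;  // 4096 new bytes
    data.position((int) (cachedOffset - offset));       // skip bytes 0-4095

    // The request would now carry offset=4096, count=4096, and the buffer's
    // remaining bytes are exactly the non-overlapping section 4096-8191.
    System.out.println("offset=" + cachedOffset + " count=" + smallerCount
        + " remaining=" + data.remaining());
  }
}
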
+  
+  /**
+   * Creates and adds a WriteCtx into the pendingWrites map. This is a
+   * synchronized method to handle concurrent writes.
+   * 
+   * @return A non-null {@link WriteCtx} instance if the incoming write
+   *         request's offset >= nextOffset. Otherwise null.
+   */
+  private synchronized WriteCtx addWritesToCache(WRITE3Request request,
+      Channel channel, int xid) {
+    long offset = request.getOffset();
+    int count = request.getCount();
+    long cachedOffset = nextOffset.get();
+    int originalCount = WriteCtx.INVALID_ORIGINAL_COUNT;
+    
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("requesed offset=" + offset + " and current offset="
+          + cachedOffset);
     }
 
-    long nextOffset = getNextOffsetUnprotected();
-    if (offset == nextOffset) {
-      LOG.info("Add to the list, update nextOffset and notify the writer,"
-          + " nextOffset:" + nextOffset);
-      WriteCtx writeCtx = new WriteCtx(request.getHandle(),
-          request.getOffset(), request.getCount(), request.getStableHow(),
-          request.getData().array(), channel, xid, false, DataState.NO_DUMP);
-      addWrite(writeCtx);
+    // Handle a special case first
+    if ((offset < cachedOffset) && (offset + count > cachedOffset)) {
+      // One Linux client behavior: after a file is closed and reopened to
+      // write, the client sometimes combines previously written data (which
+      // could still be in the kernel buffer) with newly appended data in one
+      // write. This is usually the first write after the file is reopened. In
+      // this case, we log the event and drop the overlapped section.
+      LOG.warn(String.format("Got overwrite with appended data (%d-%d),"
+          + " current offset %d," + " drop the overlapped section (%d-%d)"
+          + " and append new data (%d-%d).", offset, (offset + count - 1),
+          cachedOffset, offset, (cachedOffset - 1), cachedOffset, (offset
+              + count - 1)));
+
+      if (!pendingWrites.isEmpty()) {
+        LOG.warn("There are other pending writes, fail this jumbo write");
+        return null;
+      }
       
-      // Create an async task and change openFileCtx status to indicate async
-      // task pending
+      LOG.warn("Modify this write to write only the appended data");
+      alterWriteRequest(request, cachedOffset);
+
+      // Update local variable
+      originalCount = count;
+      offset = request.getOffset();
+      count = request.getCount();
+    }
+    
+    // Fail non-append call
+    if (offset < cachedOffset) {
+      LOG.warn("(offset,count,nextOffset):" + "(" + offset + "," + count + ","
+          + nextOffset + ")");
+      return null;
+    } else {
+      DataState dataState = offset == cachedOffset ? WriteCtx.DataState.NO_DUMP
+          : WriteCtx.DataState.ALLOW_DUMP;
+      WriteCtx writeCtx = new WriteCtx(request.getHandle(),
+          request.getOffset(), request.getCount(), originalCount,
+          request.getStableHow(), request.getData(), channel, xid, false,
+          dataState);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Add new write to the list with nextOffset " + cachedOffset
+            + " and requesed offset=" + offset);
+      }
+      if (writeCtx.getDataState() == WriteCtx.DataState.ALLOW_DUMP) {
+        // update the memory size
+        updateNonSequentialWriteInMemory(count);
+      }
+      // check if there is a WriteCtx with the same range in pendingWrites
+      WriteCtx oldWriteCtx = checkRepeatedWriteRequest(request, channel, xid);
+      if (oldWriteCtx == null) {
+        addWrite(writeCtx);
+      } else {
+        LOG.warn("Got a repeated request, same range, with xid:"
+            + writeCtx.getXid());
+      }
+      return writeCtx;
+    }
+  }
+  
+  /** Process an overwrite write request */
+  private void processOverWrite(DFSClient dfsClient, WRITE3Request request,
+      Channel channel, int xid, IdUserGroup iug) {
+    WccData wccData = new WccData(latestAttr.getWccAttr(), null);
+    long offset = request.getOffset();
+    int count = request.getCount();
+    WriteStableHow stableHow = request.getStableHow();
+    WRITE3Response response;
+    long cachedOffset = nextOffset.get();
+    if (offset + count > cachedOffset) {
+      LOG.warn("Treat this jumbo write as a real random write, no support.");
+      response = new WRITE3Response(Nfs3Status.NFS3ERR_INVAL, wccData, 0,
+          WriteStableHow.UNSTABLE, Nfs3Constant.WRITE_COMMIT_VERF);
+    } else {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Process perfectOverWrite");
+      }
+      // TODO: let executor handle perfect overwrite
+      response = processPerfectOverWrite(dfsClient, offset, count, stableHow,
+          request.getData().array(),
+          Nfs3Utils.getFileIdPath(request.getHandle()), wccData, iug);
+    }
+    updateLastAccessTime();
+    Nfs3Utils.writeChannel(channel,
+        response.writeHeaderAndResponse(new XDR(), xid, new VerifierNone()),
+        xid);
+  }
+  
+  /**
+   * Check if we can start the write (back to HDFS) now. If there is no hole for
+   * writing, and no other thread is writing (i.e., asyncStatus is
+   * false), start the writing and set asyncStatus to true.
+   * 
+   * @return True if the new write is sequential and we can start writing
+   *         (including the case that there is already a thread writing).
+   */
+  private synchronized boolean checkAndStartWrite(
+      AsyncDataService asyncDataService, WriteCtx writeCtx) {
+    
+    if (writeCtx.getOffset() == nextOffset.get()) {
       if (!asyncStatus) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Trigger the write back task. Current nextOffset: "
+              + nextOffset.get());
+        }
         asyncStatus = true;
         asyncDataService.execute(new AsyncDataService.WriteBackTask(this));
+      } else {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("The write back thread is working.");
+        }
       }
-      
-      // Update the write time first
-      updateLastAccessTime();
-      Nfs3FileAttributes postOpAttr = new Nfs3FileAttributes(latestAttr);
-
-      // Send response immediately for unstable write
-      if (request.getStableHow() == WriteStableHow.UNSTABLE) {
-        WccData fileWcc = new WccData(preOpAttr, postOpAttr);
-        WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
-            fileWcc, count, stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
-        Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
-        writeCtx.setReplied(true);
-      }
-
-    } else if (offset > nextOffset) {
-      LOG.info("Add new write to the list but not update nextOffset:"
-          + nextOffset);
-      WriteCtx writeCtx = new WriteCtx(request.getHandle(),
-          request.getOffset(), request.getCount(), request.getStableHow(),
-          request.getData().array(), channel, xid, false, DataState.ALLOW_DUMP);
-      addWrite(writeCtx);
+      return true;
+    } else {
+      return false;
+    }
+  }
 
-      // Check if need to dump some pending requests to file
-      checkDump(request.getCount());
-      updateLastAccessTime();
-      Nfs3FileAttributes postOpAttr = new Nfs3FileAttributes(latestAttr);
-      
-      // In test, noticed some Linux client sends a batch (e.g., 1MB)
-      // of reordered writes and won't send more writes until it gets
-      // responses of the previous batch. So here send response immediately for
-      // unstable non-sequential write
-      if (request.getStableHow() == WriteStableHow.UNSTABLE) {
-        WccData fileWcc = new WccData(preOpAttr, postOpAttr);
-        WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
-            fileWcc, count, stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
-        Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
-        writeCtx.setReplied(true);
-      }
+  private void receivedNewWriteInternal(DFSClient dfsClient,
+      WRITE3Request request, Channel channel, int xid,
+      AsyncDataService asyncDataService, IdUserGroup iug) {
+    WriteStableHow stableHow = request.getStableHow();
+    WccAttr preOpAttr = latestAttr.getWccAttr();
+    int count = request.getCount();
 
-    } else {
+    WriteCtx writeCtx = addWritesToCache(request, channel, xid);
+    if (writeCtx == null) {
       // offset < nextOffset
-      LOG.warn("(offset,count,nextOffset):" + "(" + offset + "," + count + ","
-          + nextOffset + ")");
-      WccData wccData = new WccData(preOpAttr, null);
-      WRITE3Response response;
+      processOverWrite(dfsClient, request, channel, xid, iug);
+    } else {
+      // The write is added to pendingWrites.
+      // Check and start writing back if necessary
+      boolean startWriting = checkAndStartWrite(asyncDataService, writeCtx);
+      if (!startWriting) {
+        // offset > nextOffset. check if we need to dump data
+        checkDump();
+        
+        // In test, noticed some Linux client sends a batch (e.g., 1MB)
+        // of reordered writes and won't send more writes until it gets
+        // responses of the previous batch. So here send response immediately
+        // for unstable non-sequential write
+        if (stableHow != WriteStableHow.UNSTABLE) {
+          LOG.info("Have to change stable write to unstable write:"
+              + request.getStableHow());
+          stableHow = WriteStableHow.UNSTABLE;
+        }
 
-      if (offset + count > nextOffset) {
-        LOG.warn("Haven't noticed any partial overwrite out of a sequential 
file"
-            + "write requests, so treat it as a real random write, no 
support.");
-        response = new WRITE3Response(Nfs3Status.NFS3ERR_INVAL, wccData, 0,
-            WriteStableHow.UNSTABLE, 0);
-      } else {
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Process perfectOverWrite");
+          LOG.debug("UNSTABLE write request, send response for offset: "
+              + writeCtx.getOffset());
         }
-        response = processPerfectOverWrite(dfsClient, offset, count, stableHow,
-            request.getData().array(),
-            Nfs3Utils.getFileIdPath(request.getHandle()), wccData, iug);
+        WccData fileWcc = new WccData(preOpAttr, latestAttr);
+        WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
+            fileWcc, count, stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
+        Nfs3Utils
+            .writeChannel(channel, response.writeHeaderAndResponse(new XDR(),
+                xid, new VerifierNone()), xid);
+        writeCtx.setReplied(true);
       }
-      
-      updateLastAccessTime();
-      Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
     }
   }
   
@@ -432,7 +635,6 @@ class OpenFileCtx {
   private WRITE3Response processPerfectOverWrite(DFSClient dfsClient,
       long offset, int count, WriteStableHow stableHow, byte[] data,
       String path, WccData wccData, IdUserGroup iug) {
-    assert (ctxLock.isLocked());
     WRITE3Response response = null;
 
     // Read the content back
@@ -443,21 +645,30 @@ class OpenFileCtx {
     try {
       // Sync file data and length to avoid partial read failure
       fos.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
-      
+    } catch (ClosedChannelException closedException) {
+      LOG.info("The FSDataOutputStream has been closed. " +
+               "Continue processing the perfect overwrite.");
+    } catch (IOException e) {
+      LOG.info("hsync failed when processing possible perfect overwrite, path="
+          + path + " error:" + e);
+      return new WRITE3Response(Nfs3Status.NFS3ERR_IO, wccData, 0, stableHow,
+          Nfs3Constant.WRITE_COMMIT_VERF);
+    }
+    
+    try {
       fis = new FSDataInputStream(dfsClient.open(path));
       readCount = fis.read(offset, readbuffer, 0, count);
       if (readCount < count) {
         LOG.error("Can't read back " + count + " bytes, partial read size:"
             + readCount);
-        return response = new WRITE3Response(Nfs3Status.NFS3ERR_IO, wccData, 0,
-            stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
+        return new WRITE3Response(Nfs3Status.NFS3ERR_IO, wccData, 0, stableHow,
+            Nfs3Constant.WRITE_COMMIT_VERF);
       }
-
     } catch (IOException e) {
       LOG.info("Read failed when processing possible perfect overwrite, path="
           + path + " error:" + e);
-      return response = new WRITE3Response(Nfs3Status.NFS3ERR_IO, wccData, 0,
-          stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
+      return new WRITE3Response(Nfs3Status.NFS3ERR_IO, wccData, 0, stableHow,
+          Nfs3Constant.WRITE_COMMIT_VERF);
     } finally {
       IOUtils.cleanup(LOG, fis);
     }
@@ -467,7 +678,7 @@ class OpenFileCtx {
     if (comparator.compare(readbuffer, 0, readCount, data, 0, count) != 0) {
       LOG.info("Perfect overwrite has different content");
       response = new WRITE3Response(Nfs3Status.NFS3ERR_INVAL, wccData, 0,
-          stableHow, 0);
+          stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
     } else {
       LOG.info("Perfect overwrite has same content,"
           + " updating the mtime, then return success");
@@ -479,45 +690,63 @@ class OpenFileCtx {
         LOG.info("Got error when processing perfect overwrite, path=" + path
             + " error:" + e);
         return new WRITE3Response(Nfs3Status.NFS3ERR_IO, wccData, 0, stableHow,
-            0);
+            Nfs3Constant.WRITE_COMMIT_VERF);
       }
 
       wccData.setPostOpAttr(postOpAttr);
       response = new WRITE3Response(Nfs3Status.NFS3_OK, wccData, count,
-          stableHow, 0);
+          stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
     }
     return response;
   }
-  
-  public final static int COMMIT_FINISHED = 0;
-  public final static int COMMIT_WAIT = 1;
-  public final static int COMMIT_INACTIVE_CTX = 2;
-  public final static int COMMIT_ERROR = 3;
 
-  /**
-   * return one commit status: COMMIT_FINISHED, COMMIT_WAIT,
-   * COMMIT_INACTIVE_CTX, COMMIT_ERROR
-   */
-  public int checkCommit(long commitOffset) {
-    int ret = COMMIT_WAIT;
+  public COMMIT_STATUS checkCommit(DFSClient dfsClient, long commitOffset,
+      Channel channel, int xid, Nfs3FileAttributes preOpAttr) {
+    // Keep stream active
+    updateLastAccessTime();
+    Preconditions.checkState(commitOffset >= 0);
 
-    lockCtx();
-    try {
-      if (!activeState) {
-        ret = COMMIT_INACTIVE_CTX;
-      } else {
-        ret = checkCommitInternal(commitOffset);
+    COMMIT_STATUS ret = checkCommitInternal(commitOffset, channel, xid,
+        preOpAttr);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Got commit status: " + ret.name());
+    }
+    // Do the sync outside the lock
+    if (ret == COMMIT_STATUS.COMMIT_DO_SYNC
+        || ret == COMMIT_STATUS.COMMIT_FINISHED) {
+      try {
+        // Sync file data and length
+        fos.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
+        // Nothing to do for metadata since attr related change is pass-through
+      } catch (ClosedChannelException cce) {
+        if (pendingWrites.isEmpty()) {
+          ret = COMMIT_STATUS.COMMIT_FINISHED;
+        } else {
+          ret = COMMIT_STATUS.COMMIT_ERROR;
+        }
+      } catch (IOException e) {
+        LOG.error("Got stream error during data sync:" + e);
+        // Do nothing. Stream will be closed eventually by StreamMonitor.
+        // status = Nfs3Status.NFS3ERR_IO;
+        ret = COMMIT_STATUS.COMMIT_ERROR;
       }
-    } finally {
-      unlockCtx();
     }
     return ret;
   }
-  
-  private int checkCommitInternal(long commitOffset) {
-    if (commitOffset == 0) {
-      // Commit whole file
-      commitOffset = getNextOffsetUnprotected();
+
+  /**
+   * return one commit status: COMMIT_FINISHED, COMMIT_WAIT,
+   * COMMIT_INACTIVE_CTX, COMMIT_INACTIVE_WITH_PENDING_WRITE, COMMIT_ERROR
+   */
+  private synchronized COMMIT_STATUS checkCommitInternal(long commitOffset,
+      Channel channel, int xid, Nfs3FileAttributes preOpAttr) {
+    if (!activeState) {
+      if (pendingWrites.isEmpty()) {
+        return COMMIT_STATUS.COMMIT_INACTIVE_CTX;
+      } else {
+        // TODO: return success if already committed
+        return COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE;
+      }
     }
 
     long flushed = 0;
@@ -525,45 +754,52 @@ class OpenFileCtx {
       flushed = getFlushedOffset();
     } catch (IOException e) {
       LOG.error("Can't get flushed offset, error:" + e);
-      return COMMIT_ERROR;
+      return COMMIT_STATUS.COMMIT_ERROR;
     }
-    LOG.info("getFlushedOffset=" + flushed + " commitOffset=" + commitOffset);
-    if (flushed < commitOffset) {
-      // Keep stream active
-      updateLastAccessTime();
-      return COMMIT_WAIT;
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("getFlushedOffset=" + flushed + " commitOffset=" + 
commitOffset);
     }
 
-    int ret = COMMIT_WAIT;
-    try {
-      // Sync file data and length
-      fos.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
-      // Nothing to do for metadata since attr related change is pass-through
-      ret = COMMIT_FINISHED;
-    } catch (IOException e) {
-      LOG.error("Got stream error during data sync:" + e);
-      // Do nothing. Stream will be closed eventually by StreamMonitor.
-      ret = COMMIT_ERROR;
+    if (commitOffset > 0) {
+      if (commitOffset > flushed) {
+        CommitCtx commitCtx = new CommitCtx(commitOffset, channel, xid,
+            preOpAttr);
+        pendingCommits.put(commitOffset, commitCtx);
+        return COMMIT_STATUS.COMMIT_WAIT;
+      } else {
+        return COMMIT_STATUS.COMMIT_DO_SYNC;
+      }
     }
 
-    // Keep stream active
-    updateLastAccessTime();
-    return ret;
+    Entry<OffsetRange, WriteCtx> key = pendingWrites.firstEntry();
+
+    // Commit whole file, commitOffset == 0
+    if (pendingWrites.isEmpty()) {
+      // Note that there is no guarantee data is synced. TODO: We could still
+      // do a sync here though the output stream might be closed.
+      return COMMIT_STATUS.COMMIT_FINISHED;
+    } else {
+      // Insert commit
+      long maxOffset = key.getKey().getMax() - 1;
+      Preconditions.checkState(maxOffset > 0);
+      CommitCtx commitCtx = new CommitCtx(maxOffset, channel, xid, preOpAttr);
+      pendingCommits.put(maxOffset, commitCtx);
+      return COMMIT_STATUS.COMMIT_WAIT;
+    }
   }
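
Ignoring the inactive and error paths, the decision in checkCommitInternal boils down to a small pure function over the commit offset, the flushed offset, and whether writes are pending. This restatement is purely illustrative, assuming the COMMIT_STATUS enum from this patch is in scope:

// Illustrative restatement of the core decision in checkCommitInternal.
static COMMIT_STATUS decide(long commitOffset, long flushed,
    boolean hasPendingWrites) {
  if (commitOffset > 0) {
    // Explicit commit up to an offset: wait if the data has not been
    // flushed that far yet, otherwise sync immediately.
    return commitOffset > flushed ? COMMIT_STATUS.COMMIT_WAIT
        : COMMIT_STATUS.COMMIT_DO_SYNC;
  }
  // commitOffset == 0 means commit the whole file: finished if nothing is
  // pending, otherwise a commit is queued at the largest pending offset.
  return hasPendingWrites ? COMMIT_STATUS.COMMIT_WAIT
      : COMMIT_STATUS.COMMIT_FINISHED;
}
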
   
   private void addWrite(WriteCtx writeCtx) {
-    assert (ctxLock.isLocked());
     long offset = writeCtx.getOffset();
     int count = writeCtx.getCount();
+    // For the offset range (min, max), min is inclusive, and max is exclusive
     pendingWrites.put(new OffsetRange(offset, offset + count), writeCtx);
   }
   
-  
   /**
    * Check stream status to decide if it should be closed
    * @return true, remove stream; false, keep stream
    */
-  public boolean streamCleanup(long fileId, long streamTimeout) {
+  public synchronized boolean streamCleanup(long fileId, long streamTimeout) {
     if (streamTimeout < WriteManager.MINIMIUM_STREAM_TIMEOUT) {
       throw new InvalidParameterException("StreamTimeout" + streamTimeout
           + "ms is less than MINIMIUM_STREAM_TIMEOUT "
@@ -571,131 +807,191 @@ class OpenFileCtx {
     }
     
     boolean flag = false;
-    if (!ctxLock.tryLock()) {
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("Another thread is working on it" + ctxLock.toString());
-      }
-      return flag;
-    }
-    
-    try {
-      // Check the stream timeout
-      if (checkStreamTimeout(streamTimeout)) {
-        LOG.info("closing stream for fileId:" + fileId);
-        cleanup();
-        flag = true;
+    // Check the stream timeout
+    if (checkStreamTimeout(streamTimeout)) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("closing stream for fileId:" + fileId);
       }
-    } finally {
-      unlockCtx();
+      cleanup();
+      flag = true;
     }
     return flag;
   }
   
-  // Invoked by AsynDataService to do the write back
-  public void executeWriteBack() {
-    long nextOffset;
-    OffsetRange key;
-    WriteCtx writeCtx;
-
-    try {
-      // Don't lock OpenFileCtx for all writes to reduce the timeout of other
-      // client request to the same file
-      while (true) {
-        lockCtx();
-        if (!asyncStatus) {
-          // This should never happen. There should be only one thread working
-          // on one OpenFileCtx anytime.
-          LOG.fatal("The openFileCtx has false async status");
-          throw new RuntimeException("The openFileCtx has false async status");
-        }
-        // Any single write failure can change activeState to false, so do the
-        // check each loop.
-        if (pendingWrites.isEmpty()) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("The asyn write task has no pendding writes, fileId: "
-                + latestAttr.getFileId());
-          }
-          break;
+  /**
+   * Get (and remove) the next WriteCtx from {@link #pendingWrites} if possible.
+   * 
+   * @return Null if {@link #pendingWrites} is empty, or the next WriteCtx's
+   *         offset is larger than nextOffset.
+   */
+  private synchronized WriteCtx offerNextToWrite() {
+    if (pendingWrites.isEmpty()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("The asyn write task has no pending writes, fileId: "
+            + latestAttr.getFileId());
+      }
+      // process pending commit again to handle this race: a commit is added
+      // to pendingCommits map just after the last doSingleWrite returns.
+      // There is no pending write and the commit should be handled by the
+      // last doSingleWrite. Due to the race, the commit is left along and
+      // can't be processed until cleanup. Therefore, we should do another
+      // processCommits to fix the race issue.
+      processCommits(nextOffset.get()); // nextOffset has same value as
+                                        // flushedOffset
+      this.asyncStatus = false;
+      return null;
+    } 
+    
+      Entry<OffsetRange, WriteCtx> lastEntry = pendingWrites.lastEntry();
+      OffsetRange range = lastEntry.getKey();
+      WriteCtx toWrite = lastEntry.getValue();
+      
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("range.getMin()=" + range.getMin() + " nextOffset="
+            + nextOffset);
+      }
+      
+      long offset = nextOffset.get();
+      if (range.getMin() > offset) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("The next sequencial write has not arrived yet");
         }
-        if (!activeState) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("The openFileCtx is not active anymore, fileId: "
-                + latestAttr.getFileId());
-          }
-          break;
+        processCommits(nextOffset.get()); // handle race
+        this.asyncStatus = false;
+      } else if (range.getMin() < offset && range.getMax() > offset) {
+        // shouldn't happen since we do sync for overlapped concurrent writers
+        LOG.warn("Got a overlapping write (" + range.getMin() + ","
+            + range.getMax() + "), nextOffset=" + offset
+            + ". Silently drop it now");
+        pendingWrites.remove(range);
+        processCommits(nextOffset.get()); // handle race
+      } else {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Remove write(" + range.getMin() + "-" + range.getMax()
+              + ") from the list");
         }
-
-        // Get the next sequential write
-        nextOffset = getNextOffsetUnprotected();
-        key = pendingWrites.firstKey();
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("key.getMin()=" + key.getMin() + " nextOffset="
-              + nextOffset);
+        // after writing, remove the WriteCtx from cache 
+        pendingWrites.remove(range);
+        // update nextOffset
+        nextOffset.addAndGet(toWrite.getCount());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Change nextOffset to " + nextOffset.get());
         }
-
-        if (key.getMin() > nextOffset) {
-          if (LOG.isDebugEnabled()) {
-            LOG.info("The next sequencial write has not arrived yet");
-          }
-          break;
-
-        } else if (key.getMin() < nextOffset && key.getMax() > nextOffset) {
-          // Can't handle overlapping write. Didn't see it in tests yet.
-          LOG.fatal("Got a overlapping write (" + key.getMin() + ","
-              + key.getMax() + "), nextOffset=" + nextOffset);
-          throw new RuntimeException("Got a overlapping write (" + key.getMin()
-              + "," + key.getMax() + "), nextOffset=" + nextOffset);
-
-        } else {
-          if (LOG.isTraceEnabled()) {
-            LOG.trace("Remove write(" + key.getMin() + "-" + key.getMax()
-                + ") from the list");
-          }
-          writeCtx = pendingWrites.remove(key);
+        return toWrite;
+      }
+    
+    return null;
+  }
+  
+  /** Invoked by AsyncDataService to write back to HDFS */
+  void executeWriteBack() {
+    Preconditions.checkState(asyncStatus,
+        "The openFileCtx has false async status");
+    try {
+      while (activeState) {
+        WriteCtx toWrite = offerNextToWrite();
+        if (toWrite != null) {
           // Do the write
-          doSingleWrite(writeCtx);
+          doSingleWrite(toWrite);
           updateLastAccessTime();
+        } else {
+          break;
         }
-        
-        unlockCtx();
       }
-
+      
+      if (!activeState && LOG.isDebugEnabled()) {
+        LOG.debug("The openFileCtx is not active anymore, fileId: "
+            + latestAttr.getFileId());
+      }
     } finally {
-      // Always reset the async status so another async task can be created
-      // for this file
+      // make sure we reset asyncStatus to false
       asyncStatus = false;
-      if (ctxLock.isHeldByCurrentThread()) {
-        unlockCtx();
-      }
     }
   }
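
The asyncStatus handoff above (the flag is set under the object lock in checkAndStartWrite and cleared in offerNextToWrite or the finally block) is what guarantees a single writer per file. A stripped-down sketch of the same pattern, with hypothetical names and independent of the NFS types:

import java.util.concurrent.Executor;

// Sketch of the single-writer handoff: a flag toggled under the object
// lock ensures at most one drain task runs per file at any time.
class SingleWriterGate {
  private volatile boolean asyncStatus = false;

  synchronized boolean tryStart(Executor executor, Runnable drainTask) {
    if (asyncStatus) {
      return false;              // a write-back task is already running
    }
    asyncStatus = true;          // claim the writer role under the lock
    executor.execute(drainTask); // schedule exactly one drain task
    return true;
  }

  void finished() {
    asyncStatus = false;         // always reset so a new task can start
  }
}
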
 
+  private void processCommits(long offset) {
+    Preconditions.checkState(offset > 0);
+    long flushedOffset = 0;
+    Entry<Long, CommitCtx> entry = null;
+
+    int status = Nfs3Status.NFS3ERR_IO;
+    try {
+      flushedOffset = getFlushedOffset();
+      entry = pendingCommits.firstEntry();
+      if (entry == null || entry.getValue().offset > flushedOffset) {
+        return;
+      }
+
+      // Now do sync for the ready commits
+      // Sync file data and length
+      fos.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
+      status = Nfs3Status.NFS3_OK;
+    } catch (ClosedChannelException cce) {
+      if (!pendingWrites.isEmpty()) {
+        LOG.error("Can't sync for fileId: " + latestAttr.getFileId()
+            + ". Channel closed with writes pending");
+      }
+      status = Nfs3Status.NFS3ERR_IO;
+    } catch (IOException e) {
+      LOG.error("Got stream error during data sync:" + e);
+      // Do nothing. Stream will be closed eventually by StreamMonitor.
+      status = Nfs3Status.NFS3ERR_IO;
+    }
+
+    // Update latestAttr
+    try {
+      latestAttr = Nfs3Utils.getFileAttr(client,
+          Nfs3Utils.getFileIdPath(latestAttr.getFileId()), iug);
+    } catch (IOException e) {
+      LOG.error("Can't get new file attr for fileId: " + 
latestAttr.getFileId());
+      status = Nfs3Status.NFS3ERR_IO;
+    }
+
+    if (latestAttr.getSize() != offset) {
+      LOG.error("After sync, the expect file size: " + offset
+          + ", however actual file size is: " + latestAttr.getSize());
+      status = Nfs3Status.NFS3ERR_IO;
+    }
+    WccData wccData = new WccData(Nfs3Utils.getWccAttr(latestAttr), latestAttr);
+
+    // Send response for the ready commits
+    while (entry != null && entry.getValue().offset <= flushedOffset) {
+      pendingCommits.remove(entry.getKey());
+      CommitCtx commit = entry.getValue();
+
+      COMMIT3Response response = new COMMIT3Response(status, wccData,
+          Nfs3Constant.WRITE_COMMIT_VERF);
+      Nfs3Utils.writeChannelCommit(commit.getChannel(), response
+          .writeHeaderAndResponse(new XDR(), commit.getXid(),
+              new VerifierNone()), commit.getXid());
+      
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("FileId: " + latestAttr.getFileid() + " Service time:"
+            + (System.currentTimeMillis() - commit.getStartTime())
+            + "ms. Sent response for commit:" + commit);
+      }
+      entry = pendingCommits.firstEntry();
+    }
+  }
+  
   private void doSingleWrite(final WriteCtx writeCtx) {
-    assert(ctxLock.isLocked());
     Channel channel = writeCtx.getChannel();
     int xid = writeCtx.getXid();
 
     long offset = writeCtx.getOffset();
     int count = writeCtx.getCount();
     WriteStableHow stableHow = writeCtx.getStableHow();
-    byte[] data = null;
-    try {
-      data = writeCtx.getData();
-    } catch (IOException e1) {
-      LOG.error("Failed to get request data offset:" + offset + " count:"
-          + count + " error:" + e1);
-      // Cleanup everything
-      cleanup();
-      return;
-    }
-    assert (data.length == count);
-
+    
     FileHandle handle = writeCtx.getHandle();
-    LOG.info("do write, fileId: " + handle.getFileId() + " offset: " + offset
-        + " length:" + count + " stableHow:" + stableHow.getValue());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("do write, fileId: " + handle.getFileId() + " offset: "
+          + offset + " length:" + count + " stableHow:" + 
stableHow.getValue());
+    }
 
     try {
-      fos.write(data, 0, count);
+      // The write is not protected by a lock. asyncStatus is used to make sure
+      // there is only one thread doing write back at any time
+      writeCtx.writeData(fos);
       
       long flushedOffset = getFlushedOffset();
       if (flushedOffset != (offset + count)) {
@@ -703,27 +999,47 @@ class OpenFileCtx {
             + flushedOffset + " and nextOffset should be"
             + (offset + count));
       }
-      nextOffset = flushedOffset;
+      
 
       // Reduce memory occupation size if request was allowed dumped
-      if (writeCtx.getDataState() == DataState.ALLOW_DUMP) {
-        updateNonSequentialWriteInMemory(-count);
+      if (writeCtx.getDataState() == WriteCtx.DataState.ALLOW_DUMP) {
+        synchronized (writeCtx) {
+          if (writeCtx.getDataState() == WriteCtx.DataState.ALLOW_DUMP) {
+            writeCtx.setDataState(WriteCtx.DataState.NO_DUMP);
+            updateNonSequentialWriteInMemory(-count);
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("After writing " + handle.getFileId() + " at offset "
+                  + offset + ", updated the memory count, new value:"
+                  + nonSequentialWriteInMemory.get());
+            }
+          }
+        }
       }
       
       if (!writeCtx.getReplied()) {
         WccAttr preOpAttr = latestAttr.getWccAttr();
         WccData fileWcc = new WccData(preOpAttr, latestAttr);
+        if (writeCtx.getOriginalCount() != WriteCtx.INVALID_ORIGINAL_COUNT) {
+          LOG.warn("Return original count:" + writeCtx.getOriginalCount()
+              + " instead of real data count:" + count);
+          count = writeCtx.getOriginalCount();
+        }
         WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
             fileWcc, count, stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
-        Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
+        Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse(
+            new XDR(), xid, new VerifierNone()), xid);
       }
-
+      
+      // Handle the waiting commits without holding any lock
+      processCommits(writeCtx.getOffset() + writeCtx.getCount());
+     
     } catch (IOException e) {
       LOG.error("Error writing to fileId " + handle.getFileId() + " at offset "
-          + offset + " and length " + data.length, e);
+          + offset + " and length " + count, e);
       if (!writeCtx.getReplied()) {
         WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO);
-        Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
+        Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse(
+            new XDR(), xid, new VerifierNone()), xid);
        // Keep stream open. Either client retries or StreamMonitor closes it.
       }
 
@@ -733,9 +1049,21 @@ class OpenFileCtx {
     }
   }
 
-  private void cleanup() {
-    assert(ctxLock.isLocked());
+  private synchronized void cleanup() {
+    if (!activeState) {
+      LOG.info("Current OpenFileCtx is already inactive, no need to cleanup.");
+      return;
+    }
     activeState = false;
+
+    // stop the dump thread
+    if (dumpThread != null) {
+      dumpThread.interrupt();
+      try {
+        dumpThread.join(3000);
+      } catch (InterruptedException e) {
+      }
+    }
     
     // Close stream
     try {
@@ -753,36 +1081,62 @@ class OpenFileCtx {
     while (!pendingWrites.isEmpty()) {
       OffsetRange key = pendingWrites.firstKey();
       LOG.info("Fail pending write: (" + key.getMin() + "," + key.getMax()
-          + "), nextOffset=" + getNextOffsetUnprotected());
+          + "), nextOffset=" + nextOffset.get());
       
       WriteCtx writeCtx = pendingWrites.remove(key);
       if (!writeCtx.getReplied()) {
         WccData fileWcc = new WccData(preOpAttr, latestAttr);
         WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO,
            fileWcc, 0, writeCtx.getStableHow(), Nfs3Constant.WRITE_COMMIT_VERF);
-        Nfs3Utils.writeChannel(writeCtx.getChannel(),
-            response.send(new XDR(), writeCtx.getXid()));
+        Nfs3Utils.writeChannel(writeCtx.getChannel(), response
+            .writeHeaderAndResponse(new XDR(), writeCtx.getXid(),
+                new VerifierNone()), writeCtx.getXid());
       }
     }
     
     // Cleanup dump file
-    if (dumpOut!=null){
+    if (dumpOut != null) {
       try {
         dumpOut.close();
       } catch (IOException e) {
         e.printStackTrace();
       }
+      File dumpFile = new File(dumpFilePath);
+      if (dumpFile.exists() && !dumpFile.delete()) {
+        LOG.error("Failed to delete dumpfile: " + dumpFile);
+      }
     }
-    if (raf!=null) {
+    if (raf != null) {
       try {
         raf.close();
       } catch (IOException e) {
         e.printStackTrace();
       }
     }
-    File dumpFile = new File(dumpFilePath);
-    if (dumpFile.delete()) {
-      LOG.error("Failed to delete dumpfile: "+ dumpFile);
-    }
+  }
+  
+  @VisibleForTesting
+  ConcurrentNavigableMap<OffsetRange, WriteCtx> getPendingWritesForTest(){
+    return pendingWrites;
+  }
+  
+  @VisibleForTesting
+  ConcurrentNavigableMap<Long, CommitCtx> getPendingCommitsForTest(){
+    return pendingCommits;
+  }
+  
+  @VisibleForTesting
+  long getNextOffsetForTest() {
+    return nextOffset.get();
+  }
+  
+  @VisibleForTesting
+  void setNextOffsetForTest(long newValue) {
+    nextOffset.set(newValue);
+  }
+  
+  @VisibleForTesting
+  void setActiveStatusForTest(boolean activeState) {
+    this.activeState = activeState;
   }
 }