Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java Wed Oct 30 22:21:59 2013
@@ -22,12 +22,15 @@ import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
 import java.security.InvalidParameterException;
 import java.util.EnumSet;
 import java.util.Iterator;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -45,12 +48,18 @@ import org.apache.hadoop.nfs.nfs3.Nfs3Co
 import org.apache.hadoop.nfs.nfs3.Nfs3FileAttributes;
 import org.apache.hadoop.nfs.nfs3.Nfs3Status;
 import org.apache.hadoop.nfs.nfs3.request.WRITE3Request;
+import org.apache.hadoop.nfs.nfs3.response.COMMIT3Response;
 import org.apache.hadoop.nfs.nfs3.response.WRITE3Response;
 import org.apache.hadoop.nfs.nfs3.response.WccAttr;
 import org.apache.hadoop.nfs.nfs3.response.WccData;
 import org.apache.hadoop.oncrpc.XDR;
+import org.apache.hadoop.oncrpc.security.VerifierNone;
+import org.apache.hadoop.util.Daemon;
 import org.jboss.netty.channel.Channel;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
 /**
  * OpenFileCtx saves the context of one HDFS file output stream. Access to it is
  * synchronized by its member lock.
@@ -58,34 +67,95 @@ import org.jboss.netty.channel.Channel;
 class OpenFileCtx {
   public static final Log LOG = LogFactory.getLog(OpenFileCtx.class);
 
-  /**
-   * Lock to synchronize OpenFileCtx changes. Thread should get this lock before
-   * any read/write operation to an OpenFileCtx object
-   */
-  private final ReentrantLock ctxLock;
+  // Pending writes water mark for dump, 1MB
+  private static long DUMP_WRITE_WATER_MARK = 1024 * 1024;
+
+  static enum COMMIT_STATUS {
+    COMMIT_FINISHED,
+    COMMIT_WAIT,
+    COMMIT_INACTIVE_CTX,
+    COMMIT_INACTIVE_WITH_PENDING_WRITE,
+    COMMIT_ERROR,
+    COMMIT_DO_SYNC;
+  }
 
+  private final DFSClient client;
+  private final IdUserGroup iug;
+
   // The stream status. False means the stream is closed.
-  private boolean activeState;
+  private volatile boolean activeState;
 
   // The stream write-back status. True means one thread is doing write back.
-  private boolean asyncStatus;
+  private volatile boolean asyncStatus;
 
+  /**
+   * The current offset of the file in HDFS. All the content before this offset
+   * has been written back to HDFS.
+   */
+  private AtomicLong nextOffset;
   private final HdfsDataOutputStream fos;
-  private final Nfs3FileAttributes latestAttr;
-  private long nextOffset;
+
+  // It's updated after each sync to HDFS
+  private Nfs3FileAttributes latestAttr;
 
-  private final SortedMap<OffsetRange, WriteCtx> pendingWrites;
+  private final ConcurrentNavigableMap<OffsetRange, WriteCtx> pendingWrites;
+
+  private final ConcurrentNavigableMap<Long, CommitCtx> pendingCommits;
+
+  static class CommitCtx {
+    private final long offset;
+    private final Channel channel;
+    private final int xid;
+    private final Nfs3FileAttributes preOpAttr;
+
+    // Remember time for debug purpose
+    private final long startTime;
+
+    long getOffset() {
+      return offset;
+    }
+
+    Channel getChannel() {
+      return channel;
+    }
+
+    int getXid() {
+      return xid;
+    }
+
+    Nfs3FileAttributes getPreOpAttr() {
+      return preOpAttr;
+    }
+
+    long getStartTime() {
+      return startTime;
+    }
+
+    CommitCtx(long offset, Channel channel, int xid,
+        Nfs3FileAttributes preOpAttr) {
+      this.offset = offset;
+      this.channel = channel;
+      this.xid = xid;
+      this.preOpAttr = preOpAttr;
+      this.startTime = System.currentTimeMillis();
+    }
+
+    @Override
+    public String toString() {
+      return String.format("offset: %d xid: %d startTime: %d", offset, xid,
+          startTime);
+    }
+  }
 
   // The last write, commit request or write-back event. Updating time to keep
   // output stream alive.
   private long lastAccessTime;
 
-  // Pending writes water mark for dump, 1MB
-  private static int DUMP_WRITE_WATER_MARK = 1024 * 1024;
+  private volatile boolean enabledDump;
   private FileOutputStream dumpOut;
-  private long nonSequentialWriteInMemory;
-  private boolean enabledDump;
+  private AtomicLong nonSequentialWriteInMemory;
   private RandomAccessFile raf;
   private final String dumpFilePath;
+  private Daemon dumpThread;
 
   private void updateLastAccessTime() {
     lastAccessTime = System.currentTimeMillis();
@@ -95,91 +165,57 @@ class OpenFileCtx {
     return System.currentTimeMillis() - lastAccessTime > streamTimeout;
   }
 
+  public long getNextOffset() {
+    return nextOffset.get();
+  }
+
   // Increase or decrease the memory occupation of non-sequential writes
   private long updateNonSequentialWriteInMemory(long count) {
-    nonSequentialWriteInMemory += count;
+    long newValue = nonSequentialWriteInMemory.addAndGet(count);
     if (LOG.isDebugEnabled()) {
       LOG.debug("Update nonSequentialWriteInMemory by " + count + " new value:"
-          + nonSequentialWriteInMemory);
+          + newValue);
     }
 
-    if (nonSequentialWriteInMemory < 0) {
-      LOG.error("nonSequentialWriteInMemory is negative after update with count "
-          + count);
-      throw new IllegalArgumentException(
-          "nonSequentialWriteInMemory is negative after update with count "
-          + count);
-    }
-    return nonSequentialWriteInMemory;
+    Preconditions.checkState(newValue >= 0,
+        "nonSequentialWriteInMemory is negative after update with count "
+            + count);
+    return newValue;
   }
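The switch from a lock-guarded long to an AtomicLong makes the counter update above a single atomic read-modify-write. A minimal sketch of the same pattern, using a plain IllegalStateException in place of Guava's Preconditions.checkState:

    import java.util.concurrent.atomic.AtomicLong;

    public class CounterSketch {
      private final AtomicLong nonSequentialWriteInMemory = new AtomicLong(0);

      // addAndGet() makes the read-modify-write atomic, so no explicit lock
      // is needed; the state check mirrors Preconditions.checkState.
      long update(long count) {
        long newValue = nonSequentialWriteInMemory.addAndGet(count);
        if (newValue < 0) {
          throw new IllegalStateException(
              "nonSequentialWriteInMemory is negative after update with count "
                  + count);
        }
        return newValue;
      }
    }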
 
   OpenFileCtx(HdfsDataOutputStream fos, Nfs3FileAttributes latestAttr,
-      String dumpFilePath) {
+      String dumpFilePath, DFSClient client, IdUserGroup iug) {
     this.fos = fos;
     this.latestAttr = latestAttr;
-    pendingWrites = new TreeMap<OffsetRange, WriteCtx>();
+    // We use the ReverseComparatorOnMin as the comparator of the map. In this
+    // way, we first dump the data with larger offset. In the meanwhile, we
+    // retrieve the last element to write back to HDFS.
+    pendingWrites = new ConcurrentSkipListMap<OffsetRange, WriteCtx>(
+        OffsetRange.ReverseComparatorOnMin);
+
+    pendingCommits = new ConcurrentSkipListMap<Long, CommitCtx>();
+
     updateLastAccessTime();
     activeState = true;
     asyncStatus = false;
     dumpOut = null;
     raf = null;
-    nonSequentialWriteInMemory = 0;
+    nonSequentialWriteInMemory = new AtomicLong(0);
+
     this.dumpFilePath = dumpFilePath;
     enabledDump = dumpFilePath == null ? false: true;
-    nextOffset = latestAttr.getSize();
-    try {
-      assert(nextOffset == this.fos.getPos());
+    nextOffset = new AtomicLong();
+    nextOffset.set(latestAttr.getSize());
+    try {
+      assert(nextOffset.get() == this.fos.getPos());
     } catch (IOException e) {}
-
-    ctxLock = new ReentrantLock(true);
-  }
-
-  private void lockCtx() {
-    if (LOG.isTraceEnabled()) {
-      StackTraceElement[] stacktrace = Thread.currentThread().getStackTrace();
-      StackTraceElement e = stacktrace[2];
-      String methodName = e.getMethodName();
-      LOG.trace("lock ctx, caller:" + methodName);
-    }
-    ctxLock.lock();
-  }
-
-  private void unlockCtx() {
-    ctxLock.unlock();
-    if (LOG.isTraceEnabled()) {
-      StackTraceElement[] stacktrace = Thread.currentThread().getStackTrace();
-      StackTraceElement e = stacktrace[2];
-      String methodName = e.getMethodName();
-      LOG.info("unlock ctx, caller:" + methodName);
-    }
-  }
-
-  // Make a copy of the latestAttr
-  public Nfs3FileAttributes copyLatestAttr() {
-    Nfs3FileAttributes ret;
-    lockCtx();
-    try {
-      ret = new Nfs3FileAttributes(latestAttr);
-    } finally {
-      unlockCtx();
-    }
-    return ret;
-  }
-
-  private long getNextOffsetUnprotected() {
-    assert(ctxLock.isLocked());
-    return nextOffset;
+    dumpThread = null;
+    this.client = client;
+    this.iug = iug;
   }
 
-  public long getNextOffset() {
-    long ret;
-    lockCtx();
-    try {
-      ret = getNextOffsetUnprotected();
-    } finally {
-      unlockCtx();
-    }
-    return ret;
+  public Nfs3FileAttributes getLatestAttr() {
+    return latestAttr;
   }
 
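The constructor's comparator choice does double duty: because pendingWrites is ordered by OffsetRange.ReverseComparatorOnMin, firstEntry() yields the write with the largest offset (the best dump candidate) while lastEntry() yields the smallest (the next sequential write-back candidate). A minimal sketch under that assumption, with a simplified Range key standing in for OffsetRange and String for WriteCtx:

    import java.util.Comparator;
    import java.util.concurrent.ConcurrentSkipListMap;

    public class ReverseOrderSketch {
      static class Range {
        final long min, max;
        Range(long min, long max) { this.min = min; this.max = max; }
        @Override public String toString() { return "(" + min + "," + max + ")"; }
      }

      // Larger min offsets sort first, as with OffsetRange.ReverseComparatorOnMin.
      static final Comparator<Range> REVERSE_ON_MIN = new Comparator<Range>() {
        @Override public int compare(Range a, Range b) {
          return a.min < b.min ? 1 : (a.min > b.min ? -1 : 0);
        }
      };

      public static void main(String[] args) {
        ConcurrentSkipListMap<Range, String> pendingWrites =
            new ConcurrentSkipListMap<Range, String>(REVERSE_ON_MIN);
        pendingWrites.put(new Range(0, 10), "w1");
        pendingWrites.put(new Range(20, 30), "w2");
        // firstEntry(): largest offset, the best candidate to dump to disk.
        System.out.println(pendingWrites.firstEntry().getKey()); // (20,30)
        // lastEntry(): smallest offset, the next sequential write-back candidate.
        System.out.println(pendingWrites.lastEntry().getKey());  // (0,10)
      }
    }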
   // Get flushed offset. Note that flushed data may not be persisted.
@@ -188,12 +224,7 @@ class OpenFileCtx {
   }
 
   // Check if need to dump the new writes
-  private void checkDump(long count) {
-    assert (ctxLock.isLocked());
-
-    // Always update the in memory count
-    updateNonSequentialWriteInMemory(count);
-
+  private void checkDump() {
     if (!enabledDump) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Do nothing, dump is disabled.");
@@ -201,66 +232,129 @@ class OpenFileCtx {
       return;
     }
 
-    if (nonSequentialWriteInMemory < DUMP_WRITE_WATER_MARK) {
+    if (nonSequentialWriteInMemory.get() < DUMP_WRITE_WATER_MARK) {
       return;
     }
 
-    // Create dump outputstream for the first time
-    if (dumpOut == null) {
-      LOG.info("Create dump file:" + dumpFilePath);
-      File dumpFile = new File(dumpFilePath);
-      try {
-        if (dumpFile.exists()) {
-          LOG.fatal("The dump file should not exist:" + dumpFilePath);
-          throw new RuntimeException("The dump file should not exist:"
-              + dumpFilePath);
+    // wake up the dumper thread to dump the data
+    synchronized (this) {
+      if (nonSequentialWriteInMemory.get() >= DUMP_WRITE_WATER_MARK) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Asking dumper to dump...");
+        }
+        if (dumpThread == null) {
+          dumpThread = new Daemon(new Dumper());
+          dumpThread.start();
+        } else {
+          this.notifyAll();
         }
-        dumpOut = new FileOutputStream(dumpFile);
-      } catch (IOException e) {
-        LOG.error("Got failure when creating dump stream " + dumpFilePath
-            + " with error:" + e);
-        enabledDump = false;
-        IOUtils.cleanup(LOG, dumpOut);
-        return;
       }
     }
-    // Get raf for the first dump
-    if (raf == null) {
-      try {
-        raf = new RandomAccessFile(dumpFilePath, "r");
-      } catch (FileNotFoundException e) {
-        LOG.error("Can't get random access to file " + dumpFilePath);
-        // Disable dump
-        enabledDump = false;
-        return;
+  }
+
+  class Dumper implements Runnable {
+    /** Dump data into a file */
+    private void dump() {
+      // Create dump outputstream for the first time
+      if (dumpOut == null) {
+        LOG.info("Create dump file:" + dumpFilePath);
+        File dumpFile = new File(dumpFilePath);
+        try {
+          synchronized (this) {
+            // check if alive again
+            Preconditions.checkState(dumpFile.createNewFile(),
+                "The dump file should not exist: %s", dumpFilePath);
+            dumpOut = new FileOutputStream(dumpFile);
+          }
+        } catch (IOException e) {
+          LOG.error("Got failure when creating dump stream " + dumpFilePath, e);
+          enabledDump = false;
+          if (dumpOut != null) {
+            try {
+              dumpOut.close();
+            } catch (IOException e1) {
+              LOG.error("Can't close dump stream " + dumpFilePath, e);
+            }
+          }
+          return;
+        }
       }
-    }
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Start dump, current write number:" + pendingWrites.size());
-    }
-    Iterator<OffsetRange> it = pendingWrites.keySet().iterator();
-    while (it.hasNext()) {
-      OffsetRange key = it.next();
-      WriteCtx writeCtx = pendingWrites.get(key);
-      try {
-        long dumpedDataSize = writeCtx.dumpData(dumpOut, raf);
-        if (dumpedDataSize > 0) {
-          updateNonSequentialWriteInMemory(-dumpedDataSize);
+
+      // Get raf for the first dump
+      if (raf == null) {
+        try {
+          raf = new RandomAccessFile(dumpFilePath, "r");
+        } catch (FileNotFoundException e) {
+          LOG.error("Can't get random access to file " + dumpFilePath);
+          // Disable dump
+          enabledDump = false;
+          return;
         }
-      } catch (IOException e) {
-        LOG.error("Dump data failed:" + writeCtx + " with error:" + e);
-        // Disable dump
-        enabledDump = false;
-        return;
+      }
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Start dump. Before dump, nonSequentialWriteInMemory == "
+            + nonSequentialWriteInMemory.get());
+      }
+
+      Iterator<OffsetRange> it = pendingWrites.keySet().iterator();
+      while (activeState && it.hasNext()
+          && nonSequentialWriteInMemory.get() > 0) {
+        OffsetRange key = it.next();
+        WriteCtx writeCtx = pendingWrites.get(key);
+        if (writeCtx == null) {
+          // This write was just deleted
+          continue;
+        }
+        try {
+          long dumpedDataSize = writeCtx.dumpData(dumpOut, raf);
+          if (dumpedDataSize > 0) {
+            updateNonSequentialWriteInMemory(-dumpedDataSize);
+          }
+        } catch (IOException e) {
+          LOG.error("Dump data failed:" + writeCtx + " with error:" + e
+              + " OpenFileCtx state:" + activeState);
+          // Disable dump
+          enabledDump = false;
+          return;
+        }
+      }
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("After dump, nonSequentialWriteInMemory == "
+            + nonSequentialWriteInMemory.get());
       }
     }
-    if (nonSequentialWriteInMemory != 0) {
-      LOG.fatal("After dump, nonSequentialWriteInMemory is not zero: "
-          + nonSequentialWriteInMemory);
-      throw new RuntimeException(
-          "After dump, nonSequentialWriteInMemory is not zero: "
-          + nonSequentialWriteInMemory);
+
+    @Override
+    public void run() {
+      while (activeState && enabledDump) {
+        try {
+          if (nonSequentialWriteInMemory.get() >= DUMP_WRITE_WATER_MARK) {
+            dump();
+          }
+          synchronized (OpenFileCtx.this) {
+            if (nonSequentialWriteInMemory.get() < DUMP_WRITE_WATER_MARK) {
+              try {
+                OpenFileCtx.this.wait();
+                if (LOG.isDebugEnabled()) {
+                  LOG.debug("Dumper woke up");
+                }
+              } catch (InterruptedException e) {
+                LOG.info("Dumper is interrupted, dumpFilePath= "
                    + OpenFileCtx.this.dumpFilePath);
+              }
+            }
+          }
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Dumper checking OpenFileCtx activeState: " + activeState
+                + " enabledDump: " + enabledDump);
+          }
+        } catch (Throwable t) {
+          LOG.info("Dumper get Throwable: " + t + ". dumpFilePath: "
+              + OpenFileCtx.this.dumpFilePath);
+        }
+      }
     }
   }
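checkDump() and Dumper.run() now form a watermark-based producer/consumer pair: write handlers bump the counter and notifyAll() once it crosses DUMP_WRITE_WATER_MARK, while the dumper daemon waits on the OpenFileCtx monitor whenever it is below the mark. A condensed sketch of that handshake, with illustrative names rather than the actual fields:

    import java.util.concurrent.atomic.AtomicLong;

    public class WatermarkSketch {
      static final long WATER_MARK = 1024 * 1024; // mirrors DUMP_WRITE_WATER_MARK
      private final AtomicLong inMemory = new AtomicLong();
      private volatile boolean active = true;

      // Writer side, as in checkDump(): bump the counter and wake the dumper
      // when the watermark is crossed.
      void onNonSequentialWrite(long count) {
        if (inMemory.addAndGet(count) >= WATER_MARK) {
          synchronized (this) {
            notifyAll();
          }
        }
      }

      // Dumper side, as in Dumper.run(): dump above the watermark, sleep below it.
      void dumperLoop() throws InterruptedException {
        while (active) {
          if (inMemory.get() >= WATER_MARK) {
            inMemory.set(0); // stands in for dump(), which drains the counter
          }
          synchronized (this) {
            while (active && inMemory.get() < WATER_MARK) {
              wait();
            }
          }
        }
      }
    }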
 
@@ -284,143 +378,252 @@ class OpenFileCtx {
   public void receivedNewWrite(DFSClient dfsClient, WRITE3Request request,
       Channel channel, int xid, AsyncDataService asyncDataService,
       IdUserGroup iug) {
-
-    lockCtx();
-    try {
-      if (!activeState) {
-        LOG.info("OpenFileCtx is inactive, fileId:"
-            + request.getHandle().getFileId());
-        WccData fileWcc = new WccData(latestAttr.getWccAttr(), latestAttr);
-        WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO,
-            fileWcc, 0, request.getStableHow(), Nfs3Constant.WRITE_COMMIT_VERF);
-        Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
-      } else {
-        // Handle repeated write requests(same xid or not).
-        // If already replied, send reply again. If not replied, drop the
-        // repeated request.
-        WriteCtx existantWriteCtx = checkRepeatedWriteRequest(request, channel,
-            xid);
-        if (existantWriteCtx != null) {
-          if (!existantWriteCtx.getReplied()) {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Repeated write request which hasn't be served: xid="
-                  + xid + ", drop it.");
-            }
-          } else {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Repeated write request which is already served: xid="
-                  + xid + ", resend response.");
-            }
-            WccData fileWcc = new WccData(latestAttr.getWccAttr(), latestAttr);
-            WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
-                fileWcc, request.getCount(), request.getStableHow(),
-                Nfs3Constant.WRITE_COMMIT_VERF);
-            Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
+
+    if (!activeState) {
+      LOG.info("OpenFileCtx is inactive, fileId:"
+          + request.getHandle().getFileId());
+      WccData fileWcc = new WccData(latestAttr.getWccAttr(), latestAttr);
+      WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO,
+          fileWcc, 0, request.getStableHow(), Nfs3Constant.WRITE_COMMIT_VERF);
+      Nfs3Utils.writeChannel(channel,
+          response.writeHeaderAndResponse(new XDR(), xid, new VerifierNone()),
+          xid);
+    } else {
+      // Update the write time first
+      updateLastAccessTime();
+
+      // Handle repeated write requests (same xid or not).
+      // If already replied, send reply again. If not replied, drop the
+      // repeated request.
+      WriteCtx existantWriteCtx = checkRepeatedWriteRequest(request, channel,
+          xid);
+      if (existantWriteCtx != null) {
+        if (!existantWriteCtx.getReplied()) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Repeated write request which hasn't been served: xid="
+                + xid + ", drop it.");
+          }
+        } else {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Repeated write request which is already served: xid="
+                + xid + ", resend response.");
          }
-          updateLastAccessTime();
-        } else {
-          receivedNewWriteInternal(dfsClient, request, channel, xid,
-              asyncDataService, iug);
+          WccData fileWcc = new WccData(latestAttr.getWccAttr(), latestAttr);
+          WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
+              fileWcc, request.getCount(), request.getStableHow(),
+              Nfs3Constant.WRITE_COMMIT_VERF);
+          Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse(
+              new XDR(), xid, new VerifierNone()), xid);
         }
+      } else {
+        // not a repeated write request
+        receivedNewWriteInternal(dfsClient, request, channel, xid,
+            asyncDataService, iug);
       }
-
-    } finally {
-      unlockCtx();
     }
   }
 
-  private void receivedNewWriteInternal(DFSClient dfsClient,
-      WRITE3Request request, Channel channel, int xid,
-      AsyncDataService asyncDataService, IdUserGroup iug) {
+  @VisibleForTesting
+  public static void alterWriteRequest(WRITE3Request request, long cachedOffset) {
     long offset = request.getOffset();
     int count = request.getCount();
-    WriteStableHow stableHow = request.getStableHow();
-
-    // Get file length, fail non-append call
-    WccAttr preOpAttr = latestAttr.getWccAttr();
+    long smallerCount = offset + count - cachedOffset;
     if (LOG.isDebugEnabled()) {
-      LOG.debug("requesed offset=" + offset + " and current filesize="
-          + preOpAttr.getSize());
+      LOG.debug(String.format("Got overwrite with appended data (%d-%d),"
+          + " current offset %d," + " drop the overlapped section (%d-%d)"
+          + " and append new data (%d-%d).", offset, (offset + count - 1),
+          cachedOffset, offset, (cachedOffset - 1), cachedOffset, (offset
+              + count - 1)));
     }
 
+    ByteBuffer data = request.getData();
+    Preconditions.checkState(data.position() == 0,
+        "The write request data has non-zero position");
+    data.position((int) (cachedOffset - offset));
+    Preconditions.checkState(data.limit() - data.position() == smallerCount,
+        "The write request buffer has wrong limit/position regarding count");
+
+    request.setOffset(cachedOffset);
+    request.setCount((int) smallerCount);
+  }
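alterWriteRequest() trims the overlapped prefix purely by advancing the ByteBuffer position, so no bytes are copied. A small self-contained sketch of the same arithmetic:

    import java.nio.ByteBuffer;

    public class TrimSketch {
      public static void main(String[] args) {
        long offset = 100, cachedOffset = 108;
        ByteBuffer data = ByteBuffer.wrap(new byte[16]); // a 16-byte write at 100
        // Skip the already-written bytes 100..107; no copy is made.
        data.position((int) (cachedOffset - offset));
        System.out.println("new offset=" + cachedOffset
            + " new count=" + data.remaining()); // new offset=108 new count=8
      }
    }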
 
+  /**
+   * Creates and adds a WriteCtx into the pendingWrites map. This is a
+   * synchronized method to handle concurrent writes.
+   *
+   * @return A non-null {@link WriteCtx} instance if the incoming write
+   *         request's offset >= nextOffset. Otherwise null.
+   */
+  private synchronized WriteCtx addWritesToCache(WRITE3Request request,
+      Channel channel, int xid) {
+    long offset = request.getOffset();
+    int count = request.getCount();
+    long cachedOffset = nextOffset.get();
+    int originalCount = WriteCtx.INVALID_ORIGINAL_COUNT;
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("requested offset=" + offset + " and current offset="
+          + cachedOffset);
+    }
 
-    long nextOffset = getNextOffsetUnprotected();
-    if (offset == nextOffset) {
-      LOG.info("Add to the list, update nextOffset and notify the writer,"
-          + " nextOffset:" + nextOffset);
-      WriteCtx writeCtx = new WriteCtx(request.getHandle(),
-          request.getOffset(), request.getCount(), request.getStableHow(),
-          request.getData().array(), channel, xid, false, DataState.NO_DUMP);
-      addWrite(writeCtx);
+    // Handle a special case first
+    if ((offset < cachedOffset) && (offset + count > cachedOffset)) {
+      // One Linux client behavior: after a file is closed and reopened to
+      // write, the client sometimes combines previous written data(could still
+      // be in kernel buffer) with newly appended data in one write. This is
+      // usually the first write after file reopened. In this
+      // case, we log the event and drop the overlapped section.
+      LOG.warn(String.format("Got overwrite with appended data (%d-%d),"
+          + " current offset %d," + " drop the overlapped section (%d-%d)"
+          + " and append new data (%d-%d).", offset, (offset + count - 1),
+          cachedOffset, offset, (cachedOffset - 1), cachedOffset, (offset
+              + count - 1)));
 
-      // Create an async task and change openFileCtx status to indicate async
-      // task pending
+      if (!pendingWrites.isEmpty()) {
+        LOG.warn("There are other pending writes, fail this jumbo write");
+        return null;
+      }
+
+      LOG.warn("Modify this write to write only the appended data");
+      alterWriteRequest(request, cachedOffset);
+
+      // Update local variable
+      originalCount = count;
+      offset = request.getOffset();
+      count = request.getCount();
+    }
+
+    // Fail non-append call
+    if (offset < cachedOffset) {
+      LOG.warn("(offset,count,nextOffset):" + "(" + offset + "," + count + ","
+          + nextOffset + ")");
+      return null;
+    } else {
+      DataState dataState = offset == cachedOffset ? WriteCtx.DataState.NO_DUMP
+          : WriteCtx.DataState.ALLOW_DUMP;
+      WriteCtx writeCtx = new WriteCtx(request.getHandle(),
+          request.getOffset(), request.getCount(), originalCount,
+          request.getStableHow(), request.getData(), channel, xid, false,
+          dataState);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Add new write to the list with nextOffset " + cachedOffset
+            + " and requested offset=" + offset);
+      }
+      if (writeCtx.getDataState() == WriteCtx.DataState.ALLOW_DUMP) {
+        // update the memory size
+        updateNonSequentialWriteInMemory(count);
+      }
+      // check if there is a WriteCtx with the same range in pendingWrites
+      WriteCtx oldWriteCtx = checkRepeatedWriteRequest(request, channel, xid);
+      if (oldWriteCtx == null) {
+        addWrite(writeCtx);
+      } else {
+        LOG.warn("Got a repeated request, same range, with xid:"
+            + writeCtx.getXid());
+      }
+      return writeCtx;
+    }
+  }
+
+  /** Process an overwrite write request */
+  private void processOverWrite(DFSClient dfsClient, WRITE3Request request,
+      Channel channel, int xid, IdUserGroup iug) {
+    WccData wccData = new WccData(latestAttr.getWccAttr(), null);
+    long offset = request.getOffset();
+    int count = request.getCount();
+    WriteStableHow stableHow = request.getStableHow();
+    WRITE3Response response;
+    long cachedOffset = nextOffset.get();
+    if (offset + count > cachedOffset) {
+      LOG.warn("Treat this jumbo write as a real random write, no support.");
+      response = new WRITE3Response(Nfs3Status.NFS3ERR_INVAL, wccData, 0,
+          WriteStableHow.UNSTABLE, Nfs3Constant.WRITE_COMMIT_VERF);
+    } else {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Process perfectOverWrite");
+      }
+      // TODO: let executor handle perfect overwrite
+      response = processPerfectOverWrite(dfsClient, offset, count, stableHow,
+          request.getData().array(),
+          Nfs3Utils.getFileIdPath(request.getHandle()), wccData, iug);
+    }
+    updateLastAccessTime();
+    Nfs3Utils.writeChannel(channel,
+        response.writeHeaderAndResponse(new XDR(), xid, new VerifierNone()),
+        xid);
+  }
+
+  /**
+   * Check if we can start the write (back to HDFS) now. If there is no hole for
+   * writing, and there is no other threads writing (i.e., asyncStatus is
+   * false), start the writing and set asyncStatus to true.
+   *
+   * @return True if the new write is sequential and we can start writing
+   *         (including the case that there is already a thread writing).
+   */
+  private synchronized boolean checkAndStartWrite(
+      AsyncDataService asyncDataService, WriteCtx writeCtx) {
+
+    if (writeCtx.getOffset() == nextOffset.get()) {
       if (!asyncStatus) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Trigger the write back task. Current nextOffset: "
Current nextOffset: " + + nextOffset.get()); + } asyncStatus = true; asyncDataService.execute(new AsyncDataService.WriteBackTask(this)); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("The write back thread is working."); + } } - - // Update the write time first - updateLastAccessTime(); - Nfs3FileAttributes postOpAttr = new Nfs3FileAttributes(latestAttr); - - // Send response immediately for unstable write - if (request.getStableHow() == WriteStableHow.UNSTABLE) { - WccData fileWcc = new WccData(preOpAttr, postOpAttr); - WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK, - fileWcc, count, stableHow, Nfs3Constant.WRITE_COMMIT_VERF); - Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid)); - writeCtx.setReplied(true); - } - - } else if (offset > nextOffset) { - LOG.info("Add new write to the list but not update nextOffset:" - + nextOffset); - WriteCtx writeCtx = new WriteCtx(request.getHandle(), - request.getOffset(), request.getCount(), request.getStableHow(), - request.getData().array(), channel, xid, false, DataState.ALLOW_DUMP); - addWrite(writeCtx); + return true; + } else { + return false; + } + } - // Check if need to dump some pending requests to file - checkDump(request.getCount()); - updateLastAccessTime(); - Nfs3FileAttributes postOpAttr = new Nfs3FileAttributes(latestAttr); - - // In test, noticed some Linux client sends a batch (e.g., 1MB) - // of reordered writes and won't send more writes until it gets - // responses of the previous batch. So here send response immediately for - // unstable non-sequential write - if (request.getStableHow() == WriteStableHow.UNSTABLE) { - WccData fileWcc = new WccData(preOpAttr, postOpAttr); - WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK, - fileWcc, count, stableHow, Nfs3Constant.WRITE_COMMIT_VERF); - Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid)); - writeCtx.setReplied(true); - } + private void receivedNewWriteInternal(DFSClient dfsClient, + WRITE3Request request, Channel channel, int xid, + AsyncDataService asyncDataService, IdUserGroup iug) { + WriteStableHow stableHow = request.getStableHow(); + WccAttr preOpAttr = latestAttr.getWccAttr(); + int count = request.getCount(); - } else { + WriteCtx writeCtx = addWritesToCache(request, channel, xid); + if (writeCtx == null) { // offset < nextOffset - LOG.warn("(offset,count,nextOffset):" + "(" + offset + "," + count + "," - + nextOffset + ")"); - WccData wccData = new WccData(preOpAttr, null); - WRITE3Response response; + processOverWrite(dfsClient, request, channel, xid, iug); + } else { + // The writes is added to pendingWrites. + // Check and start writing back if necessary + boolean startWriting = checkAndStartWrite(asyncDataService, writeCtx); + if (!startWriting) { + // offset > nextOffset. check if we need to dump data + checkDump(); + + // In test, noticed some Linux client sends a batch (e.g., 1MB) + // of reordered writes and won't send more writes until it gets + // responses of the previous batch. 
 
-      // Check if need to dump some pending requests to file
-      checkDump(request.getCount());
-      updateLastAccessTime();
-      Nfs3FileAttributes postOpAttr = new Nfs3FileAttributes(latestAttr);
-
-      // In test, noticed some Linux client sends a batch (e.g., 1MB)
-      // of reordered writes and won't send more writes until it gets
-      // responses of the previous batch. So here send response immediately for
-      // unstable non-sequential write
-      if (request.getStableHow() == WriteStableHow.UNSTABLE) {
-        WccData fileWcc = new WccData(preOpAttr, postOpAttr);
-        WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
-            fileWcc, count, stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
-        Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
-        writeCtx.setReplied(true);
-      }
+  private void receivedNewWriteInternal(DFSClient dfsClient,
+      WRITE3Request request, Channel channel, int xid,
+      AsyncDataService asyncDataService, IdUserGroup iug) {
+    WriteStableHow stableHow = request.getStableHow();
+    WccAttr preOpAttr = latestAttr.getWccAttr();
+    int count = request.getCount();
 
-    } else {
+    WriteCtx writeCtx = addWritesToCache(request, channel, xid);
+    if (writeCtx == null) {
       // offset < nextOffset
-      LOG.warn("(offset,count,nextOffset):" + "(" + offset + "," + count + ","
-          + nextOffset + ")");
-      WccData wccData = new WccData(preOpAttr, null);
-      WRITE3Response response;
+      processOverWrite(dfsClient, request, channel, xid, iug);
+    } else {
+      // The write is added to pendingWrites.
+      // Check and start writing back if necessary
+      boolean startWriting = checkAndStartWrite(asyncDataService, writeCtx);
+      if (!startWriting) {
+        // offset > nextOffset. check if we need to dump data
+        checkDump();
+
+        // In test, noticed some Linux client sends a batch (e.g., 1MB)
+        // of reordered writes and won't send more writes until it gets
+        // responses of the previous batch. So here send response immediately
+        // for unstable non-sequential write
+        if (stableHow != WriteStableHow.UNSTABLE) {
+          LOG.info("Have to change stable write to unstable write:"
+              + request.getStableHow());
+          stableHow = WriteStableHow.UNSTABLE;
+        }
 
-      if (offset + count > nextOffset) {
-        LOG.warn("Haven't noticed any partial overwrite out of a sequential file"
-            + "write requests, so treat it as a real random write, no support.");
-        response = new WRITE3Response(Nfs3Status.NFS3ERR_INVAL, wccData, 0,
-            WriteStableHow.UNSTABLE, 0);
-      } else {
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Process perfectOverWrite");
+          LOG.debug("UNSTABLE write request, send response for offset: "
+              + writeCtx.getOffset());
         }
-        response = processPerfectOverWrite(dfsClient, offset, count, stableHow,
-            request.getData().array(),
-            Nfs3Utils.getFileIdPath(request.getHandle()), wccData, iug);
+        WccData fileWcc = new WccData(preOpAttr, latestAttr);
+        WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
+            fileWcc, count, stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
+        Nfs3Utils
+            .writeChannel(channel, response.writeHeaderAndResponse(new XDR(),
+                xid, new VerifierNone()), xid);
+        writeCtx.setReplied(true);
       }
-
-      updateLastAccessTime();
-      Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
     }
   }
" + + "Continue processing the perfect overwrite."); + } catch (IOException e) { + LOG.info("hsync failed when processing possible perfect overwrite, path=" + + path + " error:" + e); + return new WRITE3Response(Nfs3Status.NFS3ERR_IO, wccData, 0, stableHow, + Nfs3Constant.WRITE_COMMIT_VERF); + } + + try { fis = new FSDataInputStream(dfsClient.open(path)); readCount = fis.read(offset, readbuffer, 0, count); if (readCount < count) { LOG.error("Can't read back " + count + " bytes, partial read size:" + readCount); - return response = new WRITE3Response(Nfs3Status.NFS3ERR_IO, wccData, 0, - stableHow, Nfs3Constant.WRITE_COMMIT_VERF); + return new WRITE3Response(Nfs3Status.NFS3ERR_IO, wccData, 0, stableHow, + Nfs3Constant.WRITE_COMMIT_VERF); } - } catch (IOException e) { LOG.info("Read failed when processing possible perfect overwrite, path=" + path + " error:" + e); - return response = new WRITE3Response(Nfs3Status.NFS3ERR_IO, wccData, 0, - stableHow, Nfs3Constant.WRITE_COMMIT_VERF); + return new WRITE3Response(Nfs3Status.NFS3ERR_IO, wccData, 0, stableHow, + Nfs3Constant.WRITE_COMMIT_VERF); } finally { IOUtils.cleanup(LOG, fis); } @@ -467,7 +678,7 @@ class OpenFileCtx { if (comparator.compare(readbuffer, 0, readCount, data, 0, count) != 0) { LOG.info("Perfect overwrite has different content"); response = new WRITE3Response(Nfs3Status.NFS3ERR_INVAL, wccData, 0, - stableHow, 0); + stableHow, Nfs3Constant.WRITE_COMMIT_VERF); } else { LOG.info("Perfect overwrite has same content," + " updating the mtime, then return success"); @@ -479,45 +690,63 @@ class OpenFileCtx { LOG.info("Got error when processing perfect overwrite, path=" + path + " error:" + e); return new WRITE3Response(Nfs3Status.NFS3ERR_IO, wccData, 0, stableHow, - 0); + Nfs3Constant.WRITE_COMMIT_VERF); } wccData.setPostOpAttr(postOpAttr); response = new WRITE3Response(Nfs3Status.NFS3_OK, wccData, count, - stableHow, 0); + stableHow, Nfs3Constant.WRITE_COMMIT_VERF); } return response; } - - public final static int COMMIT_FINISHED = 0; - public final static int COMMIT_WAIT = 1; - public final static int COMMIT_INACTIVE_CTX = 2; - public final static int COMMIT_ERROR = 3; - /** - * return one commit status: COMMIT_FINISHED, COMMIT_WAIT, - * COMMIT_INACTIVE_CTX, COMMIT_ERROR - */ - public int checkCommit(long commitOffset) { - int ret = COMMIT_WAIT; + public COMMIT_STATUS checkCommit(DFSClient dfsClient, long commitOffset, + Channel channel, int xid, Nfs3FileAttributes preOpAttr) { + // Keep stream active + updateLastAccessTime(); + Preconditions.checkState(commitOffset >= 0); - lockCtx(); - try { - if (!activeState) { - ret = COMMIT_INACTIVE_CTX; - } else { - ret = checkCommitInternal(commitOffset); + COMMIT_STATUS ret = checkCommitInternal(commitOffset, channel, xid, + preOpAttr); + if (LOG.isDebugEnabled()) { + LOG.debug("Got commit status: " + ret.name()); + } + // Do the sync outside the lock + if (ret == COMMIT_STATUS.COMMIT_DO_SYNC + || ret == COMMIT_STATUS.COMMIT_FINISHED) { + try { + // Sync file data and length + fos.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH)); + // Nothing to do for metadata since attr related change is pass-through + } catch (ClosedChannelException cce) { + if (pendingWrites.isEmpty()) { + ret = COMMIT_STATUS.COMMIT_FINISHED; + } else { + ret = COMMIT_STATUS.COMMIT_ERROR; + } + } catch (IOException e) { + LOG.error("Got stream error during data sync:" + e); + // Do nothing. Stream will be closed eventually by StreamMonitor. 
 
-  public final static int COMMIT_FINISHED = 0;
-  public final static int COMMIT_WAIT = 1;
-  public final static int COMMIT_INACTIVE_CTX = 2;
-  public final static int COMMIT_ERROR = 3;
 
-  /**
-   * return one commit status: COMMIT_FINISHED, COMMIT_WAIT,
-   * COMMIT_INACTIVE_CTX, COMMIT_ERROR
-   */
-  public int checkCommit(long commitOffset) {
-    int ret = COMMIT_WAIT;
+  public COMMIT_STATUS checkCommit(DFSClient dfsClient, long commitOffset,
+      Channel channel, int xid, Nfs3FileAttributes preOpAttr) {
+    // Keep stream active
+    updateLastAccessTime();
+    Preconditions.checkState(commitOffset >= 0);
 
-    lockCtx();
-    try {
-      if (!activeState) {
-        ret = COMMIT_INACTIVE_CTX;
-      } else {
-        ret = checkCommitInternal(commitOffset);
+    COMMIT_STATUS ret = checkCommitInternal(commitOffset, channel, xid,
+        preOpAttr);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Got commit status: " + ret.name());
+    }
+    // Do the sync outside the lock
+    if (ret == COMMIT_STATUS.COMMIT_DO_SYNC
+        || ret == COMMIT_STATUS.COMMIT_FINISHED) {
+      try {
+        // Sync file data and length
+        fos.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
+        // Nothing to do for metadata since attr related change is pass-through
+      } catch (ClosedChannelException cce) {
+        if (pendingWrites.isEmpty()) {
+          ret = COMMIT_STATUS.COMMIT_FINISHED;
+        } else {
+          ret = COMMIT_STATUS.COMMIT_ERROR;
+        }
+      } catch (IOException e) {
+        LOG.error("Got stream error during data sync:" + e);
+        // Do nothing. Stream will be closed eventually by StreamMonitor.
+        // status = Nfs3Status.NFS3ERR_IO;
+        ret = COMMIT_STATUS.COMMIT_ERROR;
      }
-    } finally {
-      unlockCtx();
    }
     return ret;
   }
 
-  private int checkCommitInternal(long commitOffset) {
-    if (commitOffset == 0) {
-      // Commit whole file
-      commitOffset = getNextOffsetUnprotected();
+  /**
+   * return one commit status: COMMIT_FINISHED, COMMIT_WAIT,
+   * COMMIT_INACTIVE_CTX, COMMIT_INACTIVE_WITH_PENDING_WRITE, COMMIT_ERROR
+   */
+  private synchronized COMMIT_STATUS checkCommitInternal(long commitOffset,
+      Channel channel, int xid, Nfs3FileAttributes preOpAttr) {
+    if (!activeState) {
+      if (pendingWrites.isEmpty()) {
+        return COMMIT_STATUS.COMMIT_INACTIVE_CTX;
+      } else {
+        // TODO: return success if already committed
+        return COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE;
+      }
     }
 
     long flushed = 0;
@@ -525,45 +754,52 @@ class OpenFileCtx {
       flushed = getFlushedOffset();
     } catch (IOException e) {
       LOG.error("Can't get flushed offset, error:" + e);
-      return COMMIT_ERROR;
+      return COMMIT_STATUS.COMMIT_ERROR;
     }
-    LOG.info("getFlushedOffset=" + flushed + " commitOffset=" + commitOffset);
-    if (flushed < commitOffset) {
-      // Keep stream active
-      updateLastAccessTime();
-      return COMMIT_WAIT;
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("getFlushedOffset=" + flushed + " commitOffset=" + commitOffset);
     }
 
-    int ret = COMMIT_WAIT;
-    try {
-      // Sync file data and length
-      fos.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
-      // Nothing to do for metadata since attr related change is pass-through
-      ret = COMMIT_FINISHED;
-    } catch (IOException e) {
-      LOG.error("Got stream error during data sync:" + e);
-      // Do nothing. Stream will be closed eventually by StreamMonitor.
-      ret = COMMIT_ERROR;
+    if (commitOffset > 0) {
+      if (commitOffset > flushed) {
+        CommitCtx commitCtx = new CommitCtx(commitOffset, channel, xid,
+            preOpAttr);
+        pendingCommits.put(commitOffset, commitCtx);
+        return COMMIT_STATUS.COMMIT_WAIT;
+      } else {
+        return COMMIT_STATUS.COMMIT_DO_SYNC;
+      }
     }
 
-    // Keep stream active
-    updateLastAccessTime();
-    return ret;
+    Entry<OffsetRange, WriteCtx> key = pendingWrites.firstEntry();
+
+    // Commit whole file, commitOffset == 0
+    if (pendingWrites.isEmpty()) {
+      // Note that, there is no guarantee data is synced. TODO: We could still
+      // do a sync here though the output stream might be closed.
+      return COMMIT_STATUS.COMMIT_FINISHED;
+    } else {
+      // Insert commit
+      long maxOffset = key.getKey().getMax() - 1;
+      Preconditions.checkState(maxOffset > 0);
+      CommitCtx commitCtx = new CommitCtx(maxOffset, channel, xid, preOpAttr);
+      pendingCommits.put(maxOffset, commitCtx);
+      return COMMIT_STATUS.COMMIT_WAIT;
+    }
   }
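Setting the error paths aside, the commit decision above reduces to a small pure function of the stream state, the pending-write set, and the flushed offset. This distillation is illustrative only, with a local enum mirroring COMMIT_STATUS:

    public class CommitDecisionSketch {
      enum Status { FINISHED, WAIT, INACTIVE_CTX, INACTIVE_WITH_PENDING_WRITE, DO_SYNC }

      // Distilled from checkCommitInternal() above.
      static Status decide(boolean active, boolean hasPendingWrites,
          long commitOffset, long flushedOffset) {
        if (!active) {
          return hasPendingWrites ? Status.INACTIVE_WITH_PENDING_WRITE
              : Status.INACTIVE_CTX;
        }
        if (commitOffset > 0) {
          // An explicit commit range either waits for the flush to catch up
          // or can sync right away.
          return commitOffset > flushedOffset ? Status.WAIT : Status.DO_SYNC;
        }
        // commitOffset == 0 commits the whole file.
        return hasPendingWrites ? Status.WAIT : Status.FINISHED;
      }
    }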
 
   private void addWrite(WriteCtx writeCtx) {
-    assert (ctxLock.isLocked());
     long offset = writeCtx.getOffset();
     int count = writeCtx.getCount();
+    // For the offset range (min, max), min is inclusive, and max is exclusive
     pendingWrites.put(new OffsetRange(offset, offset + count), writeCtx);
   }
-
+
   /**
    * Check stream status to decide if it should be closed
    * @return true, remove stream; false, keep stream
    */
-  public boolean streamCleanup(long fileId, long streamTimeout) {
+  public synchronized boolean streamCleanup(long fileId, long streamTimeout) {
     if (streamTimeout < WriteManager.MINIMIUM_STREAM_TIMEOUT) {
       throw new InvalidParameterException("StreamTimeout" + streamTimeout
           + "ms is less than MINIMIUM_STREAM_TIMEOUT "
@@ -571,131 +807,191 @@ class OpenFileCtx {
     }
 
     boolean flag = false;
-    if (!ctxLock.tryLock()) {
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("Another thread is working on it" + ctxLock.toString());
-      }
-      return flag;
-    }
-
-    try {
-      // Check the stream timeout
-      if (checkStreamTimeout(streamTimeout)) {
-        LOG.info("closing stream for fileId:" + fileId);
-        cleanup();
-        flag = true;
+    // Check the stream timeout
+    if (checkStreamTimeout(streamTimeout)) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("closing stream for fileId:" + fileId);
       }
-    } finally {
-      unlockCtx();
+      cleanup();
+      flag = true;
     }
     return flag;
   }
 
-  // Invoked by AsynDataService to do the write back
-  public void executeWriteBack() {
-    long nextOffset;
-    OffsetRange key;
-    WriteCtx writeCtx;
-
-    try {
-      // Don't lock OpenFileCtx for all writes to reduce the timeout of other
-      // client request to the same file
-      while (true) {
-        lockCtx();
-        if (!asyncStatus) {
-          // This should never happen. There should be only one thread working
-          // on one OpenFileCtx anytime.
-          LOG.fatal("The openFileCtx has false async status");
-          throw new RuntimeException("The openFileCtx has false async status");
-        }
-        // Any single write failure can change activeState to false, so do the
-        // check each loop.
-        if (pendingWrites.isEmpty()) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("The asyn write task has no pendding writes, fileId: "
-                + latestAttr.getFileId());
-          }
-          break;
+  /**
+   * Get (and remove) the next WriteCtx from {@link #pendingWrites} if possible.
+   *
+   * @return Null if {@link #pendingWrites} is empty, or the next WriteCtx's
+   *         offset is larger than nextOffset.
+   */
+  private synchronized WriteCtx offerNextToWrite() {
+    if (pendingWrites.isEmpty()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("The async write task has no pending writes, fileId: "
+            + latestAttr.getFileId());
+      }
+      // process pending commit again to handle this race: a commit is added
+      // to pendingCommits map just after the last doSingleWrite returns.
+      // There is no pending write and the commit should be handled by the
+      // last doSingleWrite. Due to the race, the commit is left alone and
+      // can't be processed until cleanup. Therefore, we should do another
+      // processCommits to fix the race issue.
+      processCommits(nextOffset.get()); // nextOffset has same value as
                                        // flushedOffset
+      this.asyncStatus = false;
+      return null;
+    }
+
+    Entry<OffsetRange, WriteCtx> lastEntry = pendingWrites.lastEntry();
+    OffsetRange range = lastEntry.getKey();
+    WriteCtx toWrite = lastEntry.getValue();
+
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("range.getMin()=" + range.getMin() + " nextOffset="
+          + nextOffset);
+    }
+
+    long offset = nextOffset.get();
+    if (range.getMin() > offset) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("The next sequential write has not arrived yet");
        }
-        if (!activeState) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("The openFileCtx is not active anymore, fileId: "
-                + latestAttr.getFileId());
-          }
-          break;
+      processCommits(nextOffset.get()); // handle race
+      this.asyncStatus = false;
+    } else if (range.getMin() < offset && range.getMax() > offset) {
+      // shouldn't happen since we do sync for overlapped concurrent writers
+      LOG.warn("Got an overlapping write (" + range.getMin() + ","
+          + range.getMax() + "), nextOffset=" + offset
+          + ". Silently drop it now");
+      pendingWrites.remove(range);
+      processCommits(nextOffset.get()); // handle race
+    } else {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Remove write(" + range.getMin() + "-" + range.getMax()
+            + ") from the list");
       }
-
-        // Get the next sequential write
-        nextOffset = getNextOffsetUnprotected();
-        key = pendingWrites.firstKey();
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("key.getMin()=" + key.getMin() + " nextOffset="
-              + nextOffset);
+      // after writing, remove the WriteCtx from cache
+      pendingWrites.remove(range);
+      // update nextOffset
+      nextOffset.addAndGet(toWrite.getCount());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Change nextOffset to " + nextOffset.get());
       }
-
-        if (key.getMin() > nextOffset) {
-          if (LOG.isDebugEnabled()) {
-            LOG.info("The next sequencial write has not arrived yet");
-          }
-          break;
-
-        } else if (key.getMin() < nextOffset && key.getMax() > nextOffset) {
-          // Can't handle overlapping write. Didn't see it in tests yet.
-          LOG.fatal("Got a overlapping write (" + key.getMin() + ","
-              + key.getMax() + "), nextOffset=" + nextOffset);
-          throw new RuntimeException("Got a overlapping write (" + key.getMin()
-              + "," + key.getMax() + "), nextOffset=" + nextOffset);
-
-        } else {
-          if (LOG.isTraceEnabled()) {
-            LOG.trace("Remove write(" + key.getMin() + "-" + key.getMax()
-                + ") from the list");
-          }
-          writeCtx = pendingWrites.remove(key);
+      return toWrite;
+    }
+
+    return null;
+  }
+
+  /** Invoked by AsynDataService to write back to HDFS */
+  void executeWriteBack() {
+    Preconditions.checkState(asyncStatus,
+        "The openFileCtx has false async status");
+    try {
+      while (activeState) {
+        WriteCtx toWrite = offerNextToWrite();
+        if (toWrite != null) {
          // Do the write
-          doSingleWrite(writeCtx);
+          doSingleWrite(toWrite);
          updateLastAccessTime();
+        } else {
+          break;
        }
-
-        unlockCtx();
      }
-
+
+      if (!activeState && LOG.isDebugEnabled()) {
+        LOG.debug("The openFileCtx is not active anymore, fileId: "
+            + latestAttr.getFileId());
+      }
    } finally {
-      // Always reset the async status so another async task can be created
-      // for this file
+      // make sure we reset asyncStatus to false
      asyncStatus = false;
-      if (ctxLock.isHeldByCurrentThread()) {
-        unlockCtx();
-      }
    }
  }
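offerNextToWrite() distinguishes three cases for the lowest pending range against the current nextOffset. A compact restatement of that decision, as an illustrative method rather than code from the patch:

    public class OfferDecisionSketch {
      // The three cases offerNextToWrite() handles for the lowest pending
      // range [min, max) against the current nextOffset.
      static String classify(long min, long max, long nextOffset) {
        if (min > nextOffset) {
          return "GAP";        // next sequential write not arrived: stop the task
        } else if (min < nextOffset && max > nextOffset) {
          return "OVERLAP";    // unexpected overlap: drop the write
        } else {
          return "SEQUENTIAL"; // write it and advance nextOffset by its count
        }
      }
    }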
- LOG.fatal("Got a overlapping write (" + key.getMin() + "," - + key.getMax() + "), nextOffset=" + nextOffset); - throw new RuntimeException("Got a overlapping write (" + key.getMin() - + "," + key.getMax() + "), nextOffset=" + nextOffset); - - } else { - if (LOG.isTraceEnabled()) { - LOG.trace("Remove write(" + key.getMin() + "-" + key.getMax() - + ") from the list"); - } - writeCtx = pendingWrites.remove(key); + return toWrite; + } + + return null; + } + + /** Invoked by AsynDataService to write back to HDFS */ + void executeWriteBack() { + Preconditions.checkState(asyncStatus, + "The openFileCtx has false async status"); + try { + while (activeState) { + WriteCtx toWrite = offerNextToWrite(); + if (toWrite != null) { // Do the write - doSingleWrite(writeCtx); + doSingleWrite(toWrite); updateLastAccessTime(); + } else { + break; } - - unlockCtx(); } - + + if (!activeState && LOG.isDebugEnabled()) { + LOG.debug("The openFileCtx is not active anymore, fileId: " + + latestAttr.getFileId()); + } } finally { - // Always reset the async status so another async task can be created - // for this file + // make sure we reset asyncStatus to false asyncStatus = false; - if (ctxLock.isHeldByCurrentThread()) { - unlockCtx(); - } } } + private void processCommits(long offset) { + Preconditions.checkState(offset > 0); + long flushedOffset = 0; + Entry<Long, CommitCtx> entry = null; + + int status = Nfs3Status.NFS3ERR_IO; + try { + flushedOffset = getFlushedOffset(); + entry = pendingCommits.firstEntry(); + if (entry == null || entry.getValue().offset > flushedOffset) { + return; + } + + // Now do sync for the ready commits + // Sync file data and length + fos.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH)); + status = Nfs3Status.NFS3_OK; + } catch (ClosedChannelException cce) { + if (!pendingWrites.isEmpty()) { + LOG.error("Can't sync for fileId: " + latestAttr.getFileId() + + ". Channel closed with writes pending"); + } + status = Nfs3Status.NFS3ERR_IO; + } catch (IOException e) { + LOG.error("Got stream error during data sync:" + e); + // Do nothing. Stream will be closed eventually by StreamMonitor. + status = Nfs3Status.NFS3ERR_IO; + } + + // Update latestAttr + try { + latestAttr = Nfs3Utils.getFileAttr(client, + Nfs3Utils.getFileIdPath(latestAttr.getFileId()), iug); + } catch (IOException e) { + LOG.error("Can't get new file attr for fileId: " + latestAttr.getFileId()); + status = Nfs3Status.NFS3ERR_IO; + } + + if (latestAttr.getSize() != offset) { + LOG.error("After sync, the expect file size: " + offset + + ", however actual file size is: " + latestAttr.getSize()); + status = Nfs3Status.NFS3ERR_IO; + } + WccData wccData = new WccData(Nfs3Utils.getWccAttr(latestAttr), latestAttr); + + // Send response for the ready commits + while (entry != null && entry.getValue().offset <= flushedOffset) { + pendingCommits.remove(entry.getKey()); + CommitCtx commit = entry.getValue(); + + COMMIT3Response response = new COMMIT3Response(status, wccData, + Nfs3Constant.WRITE_COMMIT_VERF); + Nfs3Utils.writeChannelCommit(commit.getChannel(), response + .writeHeaderAndResponse(new XDR(), commit.getXid(), + new VerifierNone()), commit.getXid()); + + if (LOG.isDebugEnabled()) { + LOG.debug("FileId: " + latestAttr.getFileid() + " Service time:" + + (System.currentTimeMillis() - commit.getStartTime()) + + "ms. 
   private void doSingleWrite(final WriteCtx writeCtx) {
-    assert(ctxLock.isLocked());
     Channel channel = writeCtx.getChannel();
     int xid = writeCtx.getXid();
     long offset = writeCtx.getOffset();
     int count = writeCtx.getCount();
     WriteStableHow stableHow = writeCtx.getStableHow();
-    byte[] data = null;
-    try {
-      data = writeCtx.getData();
-    } catch (IOException e1) {
-      LOG.error("Failed to get request data offset:" + offset + " count:"
-          + count + " error:" + e1);
-      // Cleanup everything
-      cleanup();
-      return;
-    }
-    assert (data.length == count);
-
+
     FileHandle handle = writeCtx.getHandle();
-    LOG.info("do write, fileId: " + handle.getFileId() + " offset: " + offset
-        + " length:" + count + " stableHow:" + stableHow.getValue());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("do write, fileId: " + handle.getFileId() + " offset: "
+          + offset + " length:" + count + " stableHow:" + stableHow.getValue());
+    }
 
     try {
-      fos.write(data, 0, count);
+      // The write is not protected by lock. asyncState is used to make sure
+      // there is one thread doing write back at any time
+      writeCtx.writeData(fos);
 
       long flushedOffset = getFlushedOffset();
       if (flushedOffset != (offset + count)) {
@@ -703,27 +999,47 @@ class OpenFileCtx {
             + flushedOffset + " and nextOffset should be" + (offset + count));
       }
-      nextOffset = flushedOffset;
 
       // Reduce memory occupation size if request was allowed dumped
-      if (writeCtx.getDataState() == DataState.ALLOW_DUMP) {
-        updateNonSequentialWriteInMemory(-count);
+      if (writeCtx.getDataState() == WriteCtx.DataState.ALLOW_DUMP) {
+        synchronized (writeCtx) {
+          if (writeCtx.getDataState() == WriteCtx.DataState.ALLOW_DUMP) {
+            writeCtx.setDataState(WriteCtx.DataState.NO_DUMP);
+            updateNonSequentialWriteInMemory(-count);
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("After writing " + handle.getFileId() + " at offset "
+                  + offset + ", updated the memory count, new value:"
                  + nonSequentialWriteInMemory.get());
+            }
+          }
+        }
       }
 
       if (!writeCtx.getReplied()) {
         WccAttr preOpAttr = latestAttr.getWccAttr();
         WccData fileWcc = new WccData(preOpAttr, latestAttr);
+        if (writeCtx.getOriginalCount() != WriteCtx.INVALID_ORIGINAL_COUNT) {
+          LOG.warn("Return original count:" + writeCtx.getOriginalCount()
+              + " instead of real data count:" + count);
+          count = writeCtx.getOriginalCount();
+        }
         WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
             fileWcc, count, stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
-        Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
+        Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse(
+            new XDR(), xid, new VerifierNone()), xid);
       }
-
+
+      // Handle the waiting commits without holding any lock
+      processCommits(writeCtx.getOffset() + writeCtx.getCount());
+
     } catch (IOException e) {
       LOG.error("Error writing to fileId " + handle.getFileId() + " at offset "
-          + offset + " and length " + data.length, e);
+          + offset + " and length " + count, e);
       if (!writeCtx.getReplied()) {
         WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO);
-        Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
+        Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse(
+            new XDR(), xid, new VerifierNone()), xid);
         // Keep stream open. Either client retries or StreamMonitor closes it.
} @@ -733,9 +1049,21 @@ class OpenFileCtx { } } - private void cleanup() { - assert(ctxLock.isLocked()); + private synchronized void cleanup() { + if (!activeState) { + LOG.info("Current OpenFileCtx is already inactive, no need to cleanup."); + return; + } activeState = false; + + // stop the dump thread + if (dumpThread != null) { + dumpThread.interrupt(); + try { + dumpThread.join(3000); + } catch (InterruptedException e) { + } + } // Close stream try { @@ -753,36 +1081,62 @@ class OpenFileCtx { while (!pendingWrites.isEmpty()) { OffsetRange key = pendingWrites.firstKey(); LOG.info("Fail pending write: (" + key.getMin() + "," + key.getMax() - + "), nextOffset=" + getNextOffsetUnprotected()); + + "), nextOffset=" + nextOffset.get()); WriteCtx writeCtx = pendingWrites.remove(key); if (!writeCtx.getReplied()) { WccData fileWcc = new WccData(preOpAttr, latestAttr); WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO, fileWcc, 0, writeCtx.getStableHow(), Nfs3Constant.WRITE_COMMIT_VERF); - Nfs3Utils.writeChannel(writeCtx.getChannel(), - response.send(new XDR(), writeCtx.getXid())); + Nfs3Utils.writeChannel(writeCtx.getChannel(), response + .writeHeaderAndResponse(new XDR(), writeCtx.getXid(), + new VerifierNone()), writeCtx.getXid()); } } // Cleanup dump file - if (dumpOut!=null){ + if (dumpOut != null) { try { dumpOut.close(); } catch (IOException e) { e.printStackTrace(); } + File dumpFile = new File(dumpFilePath); + if (dumpFile.exists() && !dumpFile.delete()) { + LOG.error("Failed to delete dumpfile: " + dumpFile); + } } - if (raf!=null) { + if (raf != null) { try { raf.close(); } catch (IOException e) { e.printStackTrace(); } } - File dumpFile = new File(dumpFilePath); - if (dumpFile.delete()) { - LOG.error("Failed to delete dumpfile: "+ dumpFile); - } + } + + @VisibleForTesting + ConcurrentNavigableMap<OffsetRange, WriteCtx> getPendingWritesForTest(){ + return pendingWrites; + } + + @VisibleForTesting + ConcurrentNavigableMap<Long, CommitCtx> getPendingCommitsForTest(){ + return pendingCommits; + } + + @VisibleForTesting + long getNextOffsetForTest() { + return nextOffset.get(); + } + + @VisibleForTesting + void setNextOffsetForTest(long newValue) { + nextOffset.set(newValue); + } + + @VisibleForTesting + void setActiveStatusForTest(boolean activeState) { + this.activeState = activeState; } }
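For context, a hypothetical caller in the spirit of WriteManager would dispatch on the returned COMMIT_STATUS roughly as follows; this is a sketch, not code from this commit, and the local enum mirrors OpenFileCtx.COMMIT_STATUS:

    public class CommitDispatchSketch {
      enum CommitStatus {
        FINISHED, WAIT, INACTIVE_CTX, INACTIVE_WITH_PENDING_WRITE, ERROR, DO_SYNC
      }

      // Hypothetical reply decision; NFS3_OK/NFS3ERR_IO stand in for the real
      // status codes written into the COMMIT3Response.
      static String reply(CommitStatus status) {
        switch (status) {
          case DO_SYNC:
          case FINISHED:
            return "NFS3_OK";    // data is durable, reply immediately
          case WAIT:
            return "DEFERRED";   // a CommitCtx was queued; processCommits()
                                 // answers once the data is flushed
          default:
            return "NFS3ERR_IO"; // inactive stream or sync failure
        }
      }
    }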