http://git-wip-us.apache.org/repos/asf/hbase/blob/c96b642f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
index 06318f0..24f195d 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
@@ -48,17 +48,20 @@ import org.apache.hadoop.hbase.wal.WALKey;
 class FSWALEntry extends Entry {
   // The below data members are denoted 'transient' just to highlight these 
are not persisted;
   // they are only in memory and held here while passing over the ring buffer.
-  private final transient long sequence;
+  private final transient long txid;
   private final transient boolean inMemstore;
   private final transient HRegionInfo hri;
-  private final Set<byte[]> familyNames;
+  private final transient Set<byte[]> familyNames;
+  // In the new WAL logic, we will rewrite failed WAL entries to new WAL file, 
so we need to avoid
+  // calling stampRegionSequenceId again.
+  private transient boolean stamped = false;
 
-  FSWALEntry(final long sequence, final WALKey key, final WALEdit edit,
+  FSWALEntry(final long txid, final WALKey key, final WALEdit edit,
       final HRegionInfo hri, final boolean inMemstore) {
     super(key, edit);
     this.inMemstore = inMemstore;
     this.hri = hri;
-    this.sequence = sequence;
+    this.txid = txid;
     if (inMemstore) {
       // construct familyNames here to reduce the work of log sinker.
       ArrayList<Cell> cells = this.getEdit().getCells();
@@ -80,7 +83,7 @@ class FSWALEntry extends Entry {
   }
 
   public String toString() {
-    return "sequence=" + this.sequence + ", " + super.toString();
+    return "sequence=" + this.txid + ", " + super.toString();
   };
 
   boolean isInMemstore() {
@@ -92,10 +95,10 @@ class FSWALEntry extends Entry {
   }
 
   /**
-   * @return The sequence on the ring buffer when this edit was added.
+   * @return The transaction id of this edit.
    */
-  long getSequence() {
-    return this.sequence;
+  long getTxid() {
+    return this.txid;
   }
 
   /**
@@ -103,9 +106,12 @@ class FSWALEntry extends Entry {
    * SIDE-EFFECT is our stamping the sequenceid into every Cell AND setting 
the sequenceid into the
    * MVCC WriteEntry!!!!
    * @return The sequenceid we stamped on this edit.
-   * @throws IOException
    */
   long stampRegionSequenceId() throws IOException {
+    if (stamped) {
+      return getKey().getSequenceId();
+    }
+    stamped = true;
     long regionSequenceId = WALKey.NO_SEQUENCE_ID;
     MultiVersionConcurrencyControl mvcc = getKey().getMvcc();
     MultiVersionConcurrencyControl.WriteEntry we = null;

http://git-wip-us.apache.org/repos/asf/hbase/blob/c96b642f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
index 42abeae..7161e1e 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
@@ -19,108 +19,42 @@
 
 package org.apache.hadoop.hbase.regionserver.wal;
 
-import static 
org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader.DEFAULT_WAL_TRAILER_WARN_SIZE;
-import static 
org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader.WAL_TRAILER_WARN_SIZE;
-
 import java.io.IOException;
+import java.io.OutputStream;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.codec.Codec;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALHeader;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer;
-import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.wal.DefaultWALProvider;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 
 /**
  * Writer for protobuf-based WAL.
  */
[email protected](HBaseInterfaceAudience.CONFIG)
-public class ProtobufLogWriter extends WriterBase {
-  private static final Log LOG = LogFactory.getLog(ProtobufLogWriter.class);
-  protected FSDataOutputStream output;
-  protected Codec.Encoder cellEncoder;
-  protected WALCellCodec.ByteStringCompressor compressor;
-  private boolean trailerWritten;
-  private WALTrailer trailer;
-  // maximum size of the wal Trailer in bytes. If a user writes/reads a 
trailer with size larger
-  // than this size, it is written/read respectively, with a WARN message in 
the log.
-  private int trailerWarnSize;
-
-  public ProtobufLogWriter() {
-    super();
-  }
-
-  protected WALCellCodec getCodec(Configuration conf, CompressionContext 
compressionContext)
-      throws IOException {
-    return WALCellCodec.create(conf, null, compressionContext);
-  }
-
-  protected WALHeader buildWALHeader(Configuration conf, WALHeader.Builder 
builder)
-      throws IOException {
-    if (!builder.hasWriterClsName()) {
-      builder.setWriterClsName(ProtobufLogWriter.class.getSimpleName());
-    }
-    if (!builder.hasCellCodecClsName()) {
-      builder.setCellCodecClsName(WALCellCodec.getWALCellCodecClass(conf));
-    }
-    return builder.build();
-  }
[email protected]
+public class ProtobufLogWriter extends AbstractProtobufLogWriter
+    implements DefaultWALProvider.Writer {
 
-  @Override
-  @SuppressWarnings("deprecation")
-  public void init(FileSystem fs, Path path, Configuration conf, boolean 
overwritable)
-  throws IOException {
-    super.init(fs, path, conf, overwritable);
-    assert this.output == null;
-    boolean doCompress = initializeCompressionContext(conf, path);
-    this.trailerWarnSize = conf.getInt(WAL_TRAILER_WARN_SIZE, 
DEFAULT_WAL_TRAILER_WARN_SIZE);
-    int bufferSize = FSUtils.getDefaultBufferSize(fs);
-    short replication = (short)conf.getInt(
-        "hbase.regionserver.hlog.replication", 
FSUtils.getDefaultReplication(fs, path));
-    long blockSize = conf.getLong("hbase.regionserver.hlog.blocksize",
-        FSUtils.getDefaultBlockSize(fs, path));
-    output = fs.createNonRecursive(path, overwritable, bufferSize, 
replication, blockSize, null);
-    output.write(ProtobufLogReader.PB_WAL_MAGIC);
-    boolean doTagCompress = doCompress
-        && conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, 
true);
-    buildWALHeader(conf,
-        
WALHeader.newBuilder().setHasCompression(doCompress).setHasTagCompression(doTagCompress))
-        .writeDelimitedTo(output);
-
-    initAfterHeader(doCompress);
-
-    // instantiate trailer to default value.
-    trailer = WALTrailer.newBuilder().build();
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("Initialized protobuf WAL=" + path + ", compression=" + 
doCompress);
-    }
-  }
+  private static final Log LOG = LogFactory.getLog(ProtobufLogWriter.class);
 
-  protected void initAfterHeader(boolean doCompress) throws IOException {
-    WALCellCodec codec = getCodec(conf, this.compressionContext);
-    this.cellEncoder = codec.getEncoder(this.output);
-    if (doCompress) {
-      this.compressor = codec.getByteStringCompressor();
-    }
-  }
+  protected FSDataOutputStream output;
 
   @Override
   public void append(Entry entry) throws IOException {
     entry.setCompressionContext(compressionContext);
-    entry.getKey().getBuilder(compressor).
-      
setFollowingKvCount(entry.getEdit().size()).build().writeDelimitedTo(output);
+    
entry.getKey().getBuilder(compressor).setFollowingKvCount(entry.getEdit().size()).build()
+        .writeDelimitedTo(output);
     for (Cell cell : entry.getEdit().getCells()) {
       // cellEncoder must assume little about the stream, since we write PB 
and cells in turn.
       cellEncoder.write(cell);
     }
+    length.set(output.getPos());
   }
 
   @Override
@@ -137,32 +71,6 @@ public class ProtobufLogWriter extends WriterBase {
     }
   }
 
-  WALTrailer buildWALTrailer(WALTrailer.Builder builder) {
-    return builder.build();
-  }
-
-  private void writeWALTrailer() {
-    try {
-      int trailerSize = 0;
-      if (this.trailer == null) {
-        // use default trailer.
-        LOG.warn("WALTrailer is null. Continuing with default.");
-        this.trailer = buildWALTrailer(WALTrailer.newBuilder());
-        trailerSize = this.trailer.getSerializedSize();
-      } else if ((trailerSize = this.trailer.getSerializedSize()) > 
this.trailerWarnSize) {
-        // continue writing after warning the user.
-        LOG.warn("Please investigate WALTrailer usage. Trailer size > maximum 
size : " +
-          trailerSize + " > " + this.trailerWarnSize);
-      }
-      this.trailer.writeTo(output);
-      output.writeInt(trailerSize);
-      output.write(ProtobufLogReader.PB_WAL_COMPLETE_MAGIC);
-      this.trailerWritten = true;
-    } catch (IOException ioe) {
-      LOG.warn("Failed to write trailer, non-fatal, continuing...", ioe);
-    }
-  }
-
   @Override
   public void sync() throws IOException {
     FSDataOutputStream fsdos = this.output;
@@ -171,21 +79,35 @@ public class ProtobufLogWriter extends WriterBase {
     fsdos.hflush();
   }
 
+  public FSDataOutputStream getStream() {
+    return this.output;
+  }
+
+  @SuppressWarnings("deprecation")
   @Override
-  public long getLength() throws IOException {
-    try {
-      return this.output.getPos();
-    } catch (NullPointerException npe) {
-      // Concurrent close...
-      throw new IOException(npe);
-    }
+  protected void initOutput(FileSystem fs, Path path, boolean overwritable, 
int bufferSize,
+      short replication, long blockSize) throws IOException {
+    this.output = fs.createNonRecursive(path, overwritable, bufferSize, 
replication, blockSize,
+      null);
   }
 
-  public FSDataOutputStream getStream() {
+  @Override
+  protected long writeMagicAndWALHeader(byte[] magic, WALHeader header) throws 
IOException {
+    output.write(magic);
+    header.writeDelimitedTo(output);
+    return output.getPos();
+  }
+
+  @Override
+  protected OutputStream getOutputStreamForCellEncoder() {
     return this.output;
   }
 
-  void setWALTrailer(WALTrailer walTrailer) {
-    this.trailer = walTrailer;
+  @Override
+  protected long writeWALTrailerAndMagic(WALTrailer trailer, byte[] magic) 
throws IOException {
+    trailer.writeTo(output);
+    output.writeInt(trailer.getSerializedSize());
+    output.write(magic);
+    return output.getPos();
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/c96b642f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java
index 7de8367..b5c9a2e 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java
@@ -18,33 +18,28 @@
 package org.apache.hadoop.hbase.regionserver.wal;
 
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.htrace.Span;
 
 /**
- * A Future on a filesystem sync call.  It given to a client or 'Handler' for 
it to wait on till
- * the sync completes.
- *
- * <p>Handlers coming in call append, append, append, and then do a flush/sync 
of
- * the edits they have appended the WAL before returning. Since sync takes a 
while to
- * complete, we give the Handlers back this sync future to wait on until the
- * actual HDFS sync completes. Meantime this sync future goes across the 
ringbuffer and into a
- * sync runner thread; when it completes, it finishes up the future, the 
handler get or failed
- * check completes and the Handler can then progress.
+ * A Future on a filesystem sync call. It given to a client or 'Handler' for 
it to wait on till the
+ * sync completes.
+ * <p>
+ * Handlers coming in call append, append, append, and then do a flush/sync of 
the edits they have
+ * appended the WAL before returning. Since sync takes a while to complete, we 
give the Handlers
+ * back this sync future to wait on until the actual HDFS sync completes. 
Meantime this sync future
+ * goes across a queue and is handled by a background thread; when it 
completes, it finishes up the
+ * future, the handler get or failed check completes and the Handler can then 
progress.
  * <p>
- * This is just a partial implementation of Future; we just implement get and
- * failure.  Unimplemented methods throw {@link UnsupportedOperationException}.
+ * This is just a partial implementation of Future; we just implement get and 
failure.
  * <p>
- * There is not a one-to-one correlation between dfs sync invocations and
- * instances of this class. A single dfs sync call may complete and mark many
- * SyncFutures as done; i.e. we batch up sync calls rather than do a dfs sync
- * call every time a Handler asks for it.
+ * There is not a one-to-one correlation between dfs sync invocations and 
instances of this class. A
+ * single dfs sync call may complete and mark many SyncFutures as done; i.e. 
we batch up sync calls
+ * rather than do a dfs sync call every time a Handler asks for it.
  * <p>
- * SyncFutures are immutable but recycled. Call #reset(long, Span) before use 
even
- * if it the first time, start the sync, then park the 'hitched' thread on a 
call to
- * #get().
+ * SyncFutures are immutable but recycled. Call #reset(long, Span) before use 
even if it the first
+ * time, start the sync, then park the 'hitched' thread on a call to #get().
  */
 @InterfaceAudience.Private
 class SyncFuture {
@@ -54,17 +49,17 @@ class SyncFuture {
   private static final long NOT_DONE = 0;
 
   /**
-   * The sequence at which we were added to the ring buffer.
+   * The transaction id of this operation, monotonically increases.
    */
-  private long ringBufferSequence;
+  private long txid;
 
   /**
-   * The sequence that was set in here when we were marked done. Should be 
equal
-   * or > ringBufferSequence.  Put this data member into the NOT_DONE state 
while this
-   * class is in use.  But for the first position on construction, let it be 
-1 so we can
-   * immediately call {@link #reset(long, Span)} below and it will work.
+   * The transaction id that was set in here when we were marked done. Should 
be equal or > txnId.
+   * Put this data member into the NOT_DONE state while this class is in use. 
But for the first
+   * position on construction, let it be -1 so we can immediately call {@link 
#reset(long, Span)}
+   * below and it will work.
    */
-  private long doneSequence = -1;
+  private long doneTxid = -1;
 
   /**
    * If error, the associated throwable. Set when the future is 'done'.
@@ -79,80 +74,83 @@ class SyncFuture {
   private Span span;
 
   /**
-   * Call this method to clear old usage and get it ready for new deploy. Call
-   * this method even if it is being used for the first time.
-   *
-   * @param sequence sequenceId from this Future's position in the RingBuffer
+   * Call this method to clear old usage and get it ready for new deploy. Call 
this method even if
+   * it is being used for the first time.
+   * @param txnId the new transaction id
    * @return this
    */
-  synchronized SyncFuture reset(final long sequence) {
-    return reset(sequence, null);
+  synchronized SyncFuture reset(final long txnId) {
+    return reset(txnId, null);
   }
 
   /**
-   * Call this method to clear old usage and get it ready for new deploy. Call
-   * this method even if it is being used for the first time.
-   *
+   * Call this method to clear old usage and get it ready for new deploy. Call 
this method even if
+   * it is being used for the first time.
    * @param sequence sequenceId from this Future's position in the RingBuffer
-   * @param span curren span, detached from caller. Don't forget to attach it 
when
-   *             resuming after a call to {@link #get()}.
+   * @param span curren span, detached from caller. Don't forget to attach it 
when resuming after a
+   *          call to {@link #get()}.
    * @return this
    */
-  synchronized SyncFuture reset(final long sequence, Span span) {
-    if (t != null && t != Thread.currentThread()) throw new 
IllegalStateException();
+  synchronized SyncFuture reset(final long txnId, Span span) {
+    if (t != null && t != Thread.currentThread()) {
+      throw new IllegalStateException();
+    }
     t = Thread.currentThread();
-    if (!isDone()) throw new IllegalStateException("" + sequence + " " + 
Thread.currentThread());
-    this.doneSequence = NOT_DONE;
-    this.ringBufferSequence = sequence;
+    if (!isDone()) {
+      throw new IllegalStateException("" + txnId + " " + 
Thread.currentThread());
+    }
+    this.doneTxid = NOT_DONE;
+    this.txid = txnId;
     this.span = span;
     return this;
   }
 
   @Override
   public synchronized String toString() {
-    return "done=" + isDone() + ", ringBufferSequence=" + 
this.ringBufferSequence;
+    return "done=" + isDone() + ", txid=" + this.txid;
   }
 
-  synchronized long getRingBufferSequence() {
-    return this.ringBufferSequence;
+  synchronized long getTxid() {
+    return this.txid;
   }
 
   /**
-   * Retrieve the {@code span} instance from this Future. EventHandler calls
-   * this method to continue the span. Thread waiting on this Future musn't 
call
-   * this method until AFTER calling {@link #get()} and the future has been
-   * released back to the originating thread.
+   * Retrieve the {@code span} instance from this Future. EventHandler calls 
this method to continue
+   * the span. Thread waiting on this Future musn't call this method until 
AFTER calling
+   * {@link #get()} and the future has been released back to the originating 
thread.
    */
   synchronized Span getSpan() {
     return this.span;
   }
 
   /**
-   * Used to re-attach a {@code span} to the Future. Called by the EventHandler
-   * after a it has completed processing and detached the span from its scope.
+   * Used to re-attach a {@code span} to the Future. Called by the 
EventHandler after a it has
+   * completed processing and detached the span from its scope.
    */
   synchronized void setSpan(Span span) {
     this.span = span;
   }
 
   /**
-   * @param sequence Sync sequence at which this future 'completed'.
-   * @param t Can be null.  Set if we are 'completing' on error (and this 't' 
is the error).
-   * @return True if we successfully marked this outstanding future as 
completed/done.
-   * Returns false if this future is already 'done' when this method called.
+   * @param txid the transaction id at which this future 'completed'.
+   * @param t Can be null. Set if we are 'completing' on error (and this 't' 
is the error).
+   * @return True if we successfully marked this outstanding future as 
completed/done. Returns false
+   *         if this future is already 'done' when this method called.
    */
-  synchronized boolean done(final long sequence, final Throwable t) {
-    if (isDone()) return false;
+  synchronized boolean done(final long txid, final Throwable t) {
+    if (isDone()) {
+      return false;
+    }
     this.throwable = t;
-    if (sequence < this.ringBufferSequence) {
+    if (txid < this.txid) {
       // Something badly wrong.
       if (throwable == null) {
-        this.throwable = new IllegalStateException("sequence=" + sequence +
-          ", ringBufferSequence=" + this.ringBufferSequence);
+        this.throwable =
+            new IllegalStateException("done txid=" + txid + ", my txid=" + 
this.txid);
       }
     }
     // Mark done.
-    this.doneSequence = sequence;
+    this.doneTxid = txid;
     // Wake up waiting threads.
     notify();
     return true;
@@ -166,21 +164,14 @@ class SyncFuture {
     while (!isDone()) {
       wait(1000);
     }
-    if (this.throwable != null) throw new ExecutionException(this.throwable);
-    return this.doneSequence;
-  }
-
-  public Long get(long timeout, TimeUnit unit)
-  throws InterruptedException, ExecutionException {
-    throw new UnsupportedOperationException();
-  }
-
-  public boolean isCancelled() {
-    throw new UnsupportedOperationException();
+    if (this.throwable != null) {
+      throw new ExecutionException(this.throwable);
+    }
+    return this.doneTxid;
   }
 
   synchronized boolean isDone() {
-    return this.doneSequence != NOT_DONE;
+    return this.doneTxid != NOT_DONE;
   }
 
   synchronized boolean isThrowable() {

http://git-wip-us.apache.org/repos/asf/hbase/blob/c96b642f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WriterBase.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WriterBase.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WriterBase.java
deleted file mode 100644
index 8188e02..0000000
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WriterBase.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver.wal;
-
-import java.io.IOException;
-
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.io.util.LRUDictionary;
-import org.apache.hadoop.hbase.util.FSUtils;
-
-import org.apache.hadoop.hbase.wal.DefaultWALProvider;
-
-/**
- * Context used by our wal dictionary compressor. Null if we're not to do our
- * custom dictionary compression.
- */
[email protected]
-public abstract class WriterBase implements DefaultWALProvider.Writer {
-
-  protected CompressionContext compressionContext;
-  protected Configuration conf;
-
-  @Override
-  public void init(FileSystem fs, Path path, Configuration conf, boolean 
overwritable) throws IOException {
-    this.conf = conf;
-  }
-
-  public boolean initializeCompressionContext(Configuration conf, Path path) 
throws IOException {
-    boolean doCompress = conf.getBoolean(HConstants.ENABLE_WAL_COMPRESSION, 
false);
-    if (doCompress) {
-      try {
-        this.compressionContext = new CompressionContext(LRUDictionary.class,
-            FSUtils.isRecoveredEdits(path), conf.getBoolean(
-                CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true));
-      } catch (Exception e) {
-        throw new IOException("Failed to initiate CompressionContext", e);
-      }
-    }
-    return doCompress;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/c96b642f/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutputHelper.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutputHelper.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutputHelper.java
index ea71701..32fe48b 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutputHelper.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutputHelper.java
@@ -72,6 +72,7 @@ import 
org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
 import org.apache.hadoop.io.EnumSetWritable;
+import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.DataChecksum;
 
@@ -417,7 +418,6 @@ public class FanOutOneBlockAsyncDFSOutputHelper {
     public boolean progress() {
       return DFS_CLIENT_ADAPTOR.isClientRunning(client);
     }
-
   }
 
   static {
@@ -579,6 +579,18 @@ public class FanOutOneBlockAsyncDFSOutputHelper {
     return futureList;
   }
 
+  /**
+   * Exception other than RemoteException thrown when calling create on 
namenode
+   */
+  public static class NameNodeException extends IOException {
+
+    private static final long serialVersionUID = 3143237406477095390L;
+
+    public NameNodeException(Throwable cause) {
+      super(cause);
+    }
+  }
+
   private static FanOutOneBlockAsyncDFSOutput 
createOutput(DistributedFileSystem dfs, String src,
       boolean overwrite, boolean createParent, short replication, long 
blockSize,
       EventLoop eventLoop) throws IOException {
@@ -587,11 +599,20 @@ public class FanOutOneBlockAsyncDFSOutputHelper {
     DFSClient client = dfs.getClient();
     String clientName = client.getClientName();
     ClientProtocol namenode = client.getNamenode();
-    HdfsFileStatus stat = FILE_CREATER.create(namenode, src,
-      FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)), 
clientName,
-      new EnumSetWritable<CreateFlag>(
-          overwrite ? EnumSet.of(CREATE, OVERWRITE) : EnumSet.of(CREATE)),
-      createParent, replication, blockSize);
+    HdfsFileStatus stat;
+    try {
+      stat = FILE_CREATER.create(namenode, src,
+        FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)), 
clientName,
+        new EnumSetWritable<CreateFlag>(
+            overwrite ? EnumSet.of(CREATE, OVERWRITE) : EnumSet.of(CREATE)),
+        createParent, replication, blockSize);
+    } catch (Exception e) {
+      if (e instanceof RemoteException) {
+        throw (RemoteException) e;
+      } else {
+        throw new NameNodeException(e);
+      }
+    }
     beginFileLease(client, src, stat.getFileId());
     boolean succ = false;
     LocatedBlock locatedBlock = null;
@@ -656,6 +677,13 @@ public class FanOutOneBlockAsyncDFSOutputHelper {
     }.resolve(dfs, f);
   }
 
+  public static boolean shouldRetryCreate(RemoteException e) {
+    // RetryStartFileException is introduced in HDFS 2.6+, so here we can only 
use the class name.
+    // For exceptions other than this, we just throw it out. This is same with
+    // DFSOutputStream.newStreamForCreate.
+    return e.getClassName().endsWith("RetryStartFileException");
+  }
+
   static void completeFile(DFSClient client, ClientProtocol namenode, String 
src, String clientName,
       ExtendedBlock block, long fileId) {
     for (int retry = 0;; retry++) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/c96b642f/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
new file mode 100644
index 0000000..2f5c299
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
@@ -0,0 +1,361 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.regex.Pattern;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
+import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Base class of a WAL Provider that returns a single thread safe WAL that 
writes to HDFS. By
+ * default, this implementation picks a directory in HDFS based on a 
combination of
+ * <ul>
+ * <li>the HBase root directory
+ * <li>HConstants.HREGION_LOGDIR_NAME
+ * <li>the given factory's factoryId (usually identifying the regionserver by 
host:port)
+ * </ul>
+ * It also uses the providerId to differentiate among files.
+ */
[email protected]
[email protected]
+public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> 
implements WALProvider {
+
+  private static final Log LOG = 
LogFactory.getLog(AbstractFSWALProvider.class);
+
+  // Only public so classes back in regionserver.wal can access
+  public interface Reader extends WAL.Reader {
+    /**
+     * @param fs File system.
+     * @param path Path.
+     * @param c Configuration.
+     * @param s Input stream that may have been pre-opened by the caller; may 
be null.
+     */
+    void init(FileSystem fs, Path path, Configuration c, FSDataInputStream s) 
throws IOException;
+  }
+
+  protected volatile T wal;
+  protected WALFactory factory = null;
+  protected Configuration conf = null;
+  protected List<WALActionsListener> listeners = null;
+  protected String providerId = null;
+  protected AtomicBoolean initialized = new AtomicBoolean(false);
+  // for default wal provider, logPrefix won't change
+  protected String logPrefix = null;
+
+  /**
+   * we synchronized on walCreateLock to prevent wal recreation in different 
threads
+   */
+  private final Object walCreateLock = new Object();
+
+  /**
+   * @param factory factory that made us, identity used for FS layout. may not 
be null
+   * @param conf may not be null
+   * @param listeners may be null
+   * @param providerId differentiate between providers from one facotry, used 
for FS layout. may be
+   *          null
+   */
+  @Override
+  public void init(WALFactory factory, Configuration conf, 
List<WALActionsListener> listeners,
+      String providerId) throws IOException {
+    if (!initialized.compareAndSet(false, true)) {
+      throw new IllegalStateException("WALProvider.init should only be called 
once.");
+    }
+    this.factory = factory;
+    this.conf = conf;
+    this.listeners = listeners;
+    this.providerId = providerId;
+    // get log prefix
+    StringBuilder sb = new StringBuilder().append(factory.factoryId);
+    if (providerId != null) {
+      if (providerId.startsWith(WAL_FILE_NAME_DELIMITER)) {
+        sb.append(providerId);
+      } else {
+        sb.append(WAL_FILE_NAME_DELIMITER).append(providerId);
+      }
+    }
+    logPrefix = sb.toString();
+    doInit(conf);
+  }
+
+  @Override
+  public WAL getWAL(byte[] identifier, byte[] namespace) throws IOException {
+    T walCopy = wal;
+    if (walCopy == null) {
+      // only lock when need to create wal, and need to lock since
+      // creating hlog on fs is time consuming
+      synchronized (walCreateLock) {
+        walCopy = wal;
+        if (walCopy == null) {
+          walCopy = createWAL();
+          wal = walCopy;
+        }
+      }
+    }
+    return walCopy;
+  }
+
+  protected abstract T createWAL() throws IOException;
+
+  protected abstract void doInit(Configuration conf) throws IOException;
+
+  @Override
+  public void shutdown() throws IOException {
+    T log = this.wal;
+    if (log != null) {
+      log.shutdown();
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    T log = this.wal;
+    if (log != null) {
+      log.close();
+    }
+  }
+
+  /**
+   * iff the given WALFactory is using the DefaultWALProvider for meta and/or 
non-meta, count the
+   * number of files (rolled and active). if either of them aren't, count 0 
for that provider.
+   */
+  @Override
+  public long getNumLogFiles() {
+    T log = this.wal;
+    return log == null ? 0 : log.getNumLogFiles();
+  }
+
+  /**
+   * iff the given WALFactory is using the DefaultWALProvider for meta and/or 
non-meta, count the
+   * size of files (rolled and active). if either of them aren't, count 0 for 
that provider.
+   */
+  @Override
+  public long getLogFileSize() {
+    T log = this.wal;
+    return log == null ? 0 : log.getLogFileSize();
+  }
+
+  /**
+   * returns the number of rolled WAL files.
+   */
+  @VisibleForTesting
+  public static int getNumRolledLogFiles(WAL wal) {
+    return ((AbstractFSWAL<?>) wal).getNumRolledLogFiles();
+  }
+
+  /**
+   * return the current filename from the current wal.
+   */
+  @VisibleForTesting
+  public static Path getCurrentFileName(final WAL wal) {
+    return ((AbstractFSWAL<?>) wal).getCurrentFileName();
+  }
+
+  /**
+   * request a log roll, but don't actually do it.
+   */
+  @VisibleForTesting
+  static void requestLogRoll(final WAL wal) {
+    ((AbstractFSWAL<?>) wal).requestLogRoll();
+  }
+
+  // should be package private; more visible for use in AbstractFSWAL
+  public static final String WAL_FILE_NAME_DELIMITER = ".";
+  /** The hbase:meta region's WAL filename extension */
+  @VisibleForTesting
+  public static final String META_WAL_PROVIDER_ID = ".meta";
+  static final String DEFAULT_PROVIDER_ID = "default";
+
+  // Implementation details that currently leak in tests or elsewhere follow
+  /** File Extension used while splitting an WAL into regions (HBASE-2312) */
+  public static final String SPLITTING_EXT = "-splitting";
+
+  /**
+   * It returns the file create timestamp from the file name. For name format 
see
+   * {@link #validateWALFilename(String)} public until remaining tests move to 
o.a.h.h.wal
+   * @param wal must not be null
+   * @return the file number that is part of the WAL file name
+   */
+  @VisibleForTesting
+  public static long extractFileNumFromWAL(final WAL wal) {
+    final Path walName = ((AbstractFSWAL<?>) wal).getCurrentFileName();
+    if (walName == null) {
+      throw new IllegalArgumentException("The WAL path couldn't be null");
+    }
+    final String[] walPathStrs = walName.toString().split("\\" + 
WAL_FILE_NAME_DELIMITER);
+    return Long.parseLong(walPathStrs[walPathStrs.length - 
(isMetaFile(walName) ? 2 : 1)]);
+  }
+
+  /**
+   * Pattern used to validate a WAL file name see {@link 
#validateWALFilename(String)} for
+   * description.
+   */
+  private static final Pattern pattern = Pattern
+      .compile(".*\\.\\d*(" + META_WAL_PROVIDER_ID + ")*");
+
+  /**
+   * A WAL file name is of the format: &lt;wal-name&gt;{@link 
#WAL_FILE_NAME_DELIMITER}
+   * &lt;file-creation-timestamp&gt;[.meta]. provider-name is usually made up 
of a server-name and a
+   * provider-id
+   * @param filename name of the file to validate
+   * @return <tt>true</tt> if the filename matches an WAL, <tt>false</tt> 
otherwise
+   */
+  public static boolean validateWALFilename(String filename) {
+    return pattern.matcher(filename).matches();
+  }
+
+  /**
+   * Construct the directory name for all WALs on a given server.
+   * @param serverName Server name formatted as described in {@link ServerName}
+   * @return the relative WAL directory name, e.g. 
<code>.logs/1.example.org,60030,12345</code> if
+   *         <code>serverName</code> passed is 
<code>1.example.org,60030,12345</code>
+   */
+  public static String getWALDirectoryName(final String serverName) {
+    StringBuilder dirName = new StringBuilder(HConstants.HREGION_LOGDIR_NAME);
+    dirName.append("/");
+    dirName.append(serverName);
+    return dirName.toString();
+  }
+
+  /**
+   * Pulls a ServerName out of a Path generated according to our layout rules. 
In the below layouts,
+   * this method ignores the format of the logfile component. Current format: 
[base directory for
+   * hbase]/hbase/.logs/ServerName/logfile or [base directory for
+   * hbase]/hbase/.logs/ServerName-splitting/logfile Expected to work for 
individual log files and
+   * server-specific directories.
+   * @return null if it's not a log file. Returns the ServerName of the region 
server that created
+   *         this log file otherwise.
+   */
+  public static ServerName getServerNameFromWALDirectoryName(Configuration 
conf, String path)
+      throws IOException {
+    if (path == null || path.length() <= 
HConstants.HREGION_LOGDIR_NAME.length()) {
+      return null;
+    }
+
+    if (conf == null) {
+      throw new IllegalArgumentException("parameter conf must be set");
+    }
+
+    final String rootDir = conf.get(HConstants.HBASE_DIR);
+    if (rootDir == null || rootDir.isEmpty()) {
+      throw new IllegalArgumentException(HConstants.HBASE_DIR + " key not 
found in conf.");
+    }
+
+    final StringBuilder startPathSB = new StringBuilder(rootDir);
+    if (!rootDir.endsWith("/")) {
+      startPathSB.append('/');
+    }
+    startPathSB.append(HConstants.HREGION_LOGDIR_NAME);
+    if (!HConstants.HREGION_LOGDIR_NAME.endsWith("/")) {
+      startPathSB.append('/');
+    }
+    final String startPath = startPathSB.toString();
+
+    String fullPath;
+    try {
+      fullPath = FileSystem.get(conf).makeQualified(new Path(path)).toString();
+    } catch (IllegalArgumentException e) {
+      LOG.info("Call to makeQualified failed on " + path + " " + 
e.getMessage());
+      return null;
+    }
+
+    if (!fullPath.startsWith(startPath)) {
+      return null;
+    }
+
+    final String serverNameAndFile = fullPath.substring(startPath.length());
+
+    if (serverNameAndFile.indexOf('/') < "a,0,0".length()) {
+      // Either it's a file (not a directory) or it's not a ServerName format
+      return null;
+    }
+
+    Path p = new Path(path);
+    return getServerNameFromWALDirectoryName(p);
+  }
+
+  /**
+   * This function returns region server name from a log file name which is in 
one of the following
+   * formats:
+   * <ul>
+   * <li>hdfs://&lt;name node&gt;/hbase/.logs/&lt;server 
name&gt;-splitting/...</li>
+   * <li>hdfs://&lt;name node&gt;/hbase/.logs/&lt;server name&gt;/...</li>
+   * </ul>
+   * @return null if the passed in logFile isn't a valid WAL file path
+   */
+  public static ServerName getServerNameFromWALDirectoryName(Path logFile) {
+    String logDirName = logFile.getParent().getName();
+    // We were passed the directory and not a file in it.
+    if (logDirName.equals(HConstants.HREGION_LOGDIR_NAME)) {
+      logDirName = logFile.getName();
+    }
+    ServerName serverName = null;
+    if (logDirName.endsWith(SPLITTING_EXT)) {
+      logDirName = logDirName.substring(0, logDirName.length() - 
SPLITTING_EXT.length());
+    }
+    try {
+      serverName = ServerName.parseServerName(logDirName);
+    } catch (IllegalArgumentException ex) {
+      serverName = null;
+      LOG.warn("Cannot parse a server name from path=" + logFile + "; " + 
ex.getMessage());
+    }
+    if (serverName != null && serverName.getStartcode() < 0) {
+      LOG.warn("Invalid log file path=" + logFile);
+      serverName = null;
+    }
+    return serverName;
+  }
+
+  public static boolean isMetaFile(Path p) {
+    return isMetaFile(p.getName());
+  }
+
+  public static boolean isMetaFile(String p) {
+    if (p != null && p.endsWith(META_WAL_PROVIDER_ID)) {
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Get prefix of the log from its name, assuming WAL name in format of
+   * log_prefix.filenumber.log_suffix
+   * @param name Name of the WAL to parse
+   * @return prefix of the log
+   * @see AbstractFSWAL#getCurrentFileName()
+   */
+  public static String getWALPrefixFromWALName(String name) {
+    int endIndex = name.replaceAll(META_WAL_PROVIDER_ID, "").lastIndexOf(".");
+    return name.substring(0, endIndex);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/c96b642f/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java
new file mode 100644
index 0000000..bc142ce
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.regionserver.wal.AsyncFSWAL;
+import org.apache.hadoop.hbase.regionserver.wal.AsyncProtobufLogWriter;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Threads;
+
+import io.netty.channel.EventLoop;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+
+/**
+ * A WAL provider that use {@link AsyncFSWAL}.
+ */
[email protected]
[email protected]
+public class AsyncFSWALProvider extends AbstractFSWALProvider<AsyncFSWAL> {
+
+  // Only public so classes back in regionserver.wal can access
+  public interface AsyncWriter extends WALProvider.AsyncWriter {
+    void init(FileSystem fs, Path path, Configuration c, boolean overwritable) 
throws IOException;
+  }
+
+  private EventLoopGroup eventLoopGroup = null;
+
+  @Override
+  protected AsyncFSWAL createWAL() throws IOException {
+    return new AsyncFSWAL(FileSystem.get(conf), FSUtils.getRootDir(conf),
+        getWALDirectoryName(factory.factoryId), 
HConstants.HREGION_OLDLOGDIR_NAME, conf, listeners,
+        true, logPrefix, META_WAL_PROVIDER_ID.equals(providerId) ? 
META_WAL_PROVIDER_ID : null,
+        eventLoopGroup.next());
+  }
+
+  @Override
+  protected void doInit(Configuration conf) throws IOException {
+    eventLoopGroup = new NioEventLoopGroup(1, 
Threads.newDaemonThreadFactory("AsyncFSWAL"));
+  }
+
+  /**
+   * public because of AsyncFSWAL. Should be package-private
+   */
+  public static AsyncWriter createAsyncWriter(Configuration conf, FileSystem 
fs, Path path,
+      boolean overwritable, EventLoop eventLoop) throws IOException {
+    AsyncWriter writer = new AsyncProtobufLogWriter(eventLoop);
+    writer.init(fs, path, conf, overwritable);
+    return writer;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/c96b642f/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DefaultWALProvider.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DefaultWALProvider.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DefaultWALProvider.java
index 027e7a2..9f0d0ea 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DefaultWALProvider.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DefaultWALProvider.java
@@ -19,357 +19,42 @@
 package org.apache.hadoop.hbase.wal;
 
 import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.regex.Pattern;
-
-import com.google.common.annotations.VisibleForTesting;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.util.FSUtils;
-
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
 // imports for things that haven't moved from regionserver.wal yet.
 import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
 import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter;
-import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.util.FSUtils;
 
 /**
- * A WAL Provider that returns a single thread safe WAL that writes to HDFS.
- * By default, this implementation picks a directory in HDFS based on a 
combination of
- * <ul>
- *   <li>the HBase root directory
- *   <li>HConstants.HREGION_LOGDIR_NAME
- *   <li>the given factory's factoryId (usually identifying the regionserver 
by host:port)
- * </ul>
- * It also uses the providerId to diffentiate among files.
- *
+ * A WAL provider that use {@link FSHLog}.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-public class DefaultWALProvider implements WALProvider {
-  private static final Log LOG = LogFactory.getLog(DefaultWALProvider.class);
+public class DefaultWALProvider extends AbstractFSWALProvider<FSHLog> {
 
-  // Only public so classes back in regionserver.wal can access
-  public interface Reader extends WAL.Reader {
-    /**
-     * @param fs File system.
-     * @param path Path.
-     * @param c Configuration.
-     * @param s Input stream that may have been pre-opened by the caller; may 
be null.
-     */
-    void init(FileSystem fs, Path path, Configuration c, FSDataInputStream s) 
throws IOException;
-  }
+  private static final Log LOG = LogFactory.getLog(DefaultWALProvider.class);
 
   // Only public so classes back in regionserver.wal can access
   public interface Writer extends WALProvider.Writer {
     void init(FileSystem fs, Path path, Configuration c, boolean overwritable) 
throws IOException;
   }
 
-  protected volatile FSHLog log = null;
-  private WALFactory factory = null;
-  private Configuration conf = null;
-  private List<WALActionsListener> listeners = null;
-  private String providerId = null;
-  private AtomicBoolean initialized = new AtomicBoolean(false);
-  // for default wal provider, logPrefix won't change
-  private String logPrefix = null;
-
-  /**
-   * we synchronized on walCreateLock to prevent wal recreation in different 
threads
-   */
-  private final Object walCreateLock = new Object();
-
-  /**
-   * @param factory factory that made us, identity used for FS layout. may not 
be null
-   * @param conf may not be null
-   * @param listeners may be null
-   * @param providerId differentiate between providers from one facotry, used 
for FS layout. may be
-   *                   null
-   */
-  @Override
-  public void init(final WALFactory factory, final Configuration conf,
-      final List<WALActionsListener> listeners, String providerId) throws 
IOException {
-    if (!initialized.compareAndSet(false, true)) {
-      throw new IllegalStateException("WALProvider.init should only be called 
once.");
-    }
-    this.factory = factory;
-    this.conf = conf;
-    this.listeners = listeners;
-    this.providerId = providerId;
-    // get log prefix
-    StringBuilder sb = new StringBuilder().append(factory.factoryId);
-    if (providerId != null) {
-      if (providerId.startsWith(WAL_FILE_NAME_DELIMITER)) {
-        sb.append(providerId);
-      } else {
-        sb.append(WAL_FILE_NAME_DELIMITER).append(providerId);
-      }
-    }
-    logPrefix = sb.toString();
-  }
-
-  @Override
-  public WAL getWAL(final byte[] identifier, byte[] namespace) throws 
IOException {
-    if (log == null) {
-      // only lock when need to create wal, and need to lock since
-      // creating hlog on fs is time consuming
-      synchronized (walCreateLock) {
-        if (log == null) {
-          log = new FSHLog(FileSystem.get(conf), FSUtils.getRootDir(conf),
-              getWALDirectoryName(factory.factoryId), 
HConstants.HREGION_OLDLOGDIR_NAME, conf,
-              listeners, true, logPrefix,
-              META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : 
null);
-        }
-      }
-    }
-    return log;
-  }
-
-  @Override
-  public void close() throws IOException {
-    if (log != null) log.close();
-  }
-
-  @Override
-  public void shutdown() throws IOException {
-    if (log != null) log.shutdown();
-  }
-
-  // should be package private; more visible for use in FSHLog
-  public static final String WAL_FILE_NAME_DELIMITER = ".";
-  /** The hbase:meta region's WAL filename extension */
-  @VisibleForTesting
-  public static final String META_WAL_PROVIDER_ID = ".meta";
-  static final String DEFAULT_PROVIDER_ID = "default";
-
-  // Implementation details that currently leak in tests or elsewhere follow
-  /** File Extension used while splitting an WAL into regions (HBASE-2312) */
-  public static final String SPLITTING_EXT = "-splitting";
-
-  /**
-   * iff the given WALFactory is using the DefaultWALProvider for meta and/or 
non-meta,
-   * count the number of files (rolled and active). if either of them aren't, 
count 0
-   * for that provider.
-   */
-  @Override
-  public long getNumLogFiles() {
-    return log == null ? 0 : this.log.getNumLogFiles();
-  }
-
-  /**
-   * iff the given WALFactory is using the DefaultWALProvider for meta and/or 
non-meta,
-   * count the size of files (rolled and active). if either of them aren't, 
count 0
-   * for that provider.
-   */
-  @Override
-  public long getLogFileSize() {
-    return log == null ? 0 : this.log.getLogFileSize();
-  }
-
-  /**
-   * returns the number of rolled WAL files.
-   */
-  @VisibleForTesting
-  public static int getNumRolledLogFiles(WAL wal) {
-    return ((FSHLog)wal).getNumRolledLogFiles();
-  }
-
-  /**
-   * return the current filename from the current wal.
-   */
-  @VisibleForTesting
-  public static Path getCurrentFileName(final WAL wal) {
-    return ((FSHLog)wal).getCurrentFileName();
-  }
-
-  /**
-   * request a log roll, but don't actually do it.
-   */
-  @VisibleForTesting
-  static void requestLogRoll(final WAL wal) {
-    ((FSHLog)wal).requestLogRoll();
-  }
-
-  /**
-   * It returns the file create timestamp from the file name.
-   * For name format see {@link #validateWALFilename(String)}
-   * public until remaining tests move to o.a.h.h.wal
-   * @param wal must not be null
-   * @return the file number that is part of the WAL file name
-   */
-  @VisibleForTesting
-  public static long extractFileNumFromWAL(final WAL wal) {
-    final Path walName = ((FSHLog)wal).getCurrentFileName();
-    if (walName == null) {
-      throw new IllegalArgumentException("The WAL path couldn't be null");
-    }
-    final String[] walPathStrs = walName.toString().split("\\" + 
WAL_FILE_NAME_DELIMITER);
-    return Long.parseLong(walPathStrs[walPathStrs.length - 
(isMetaFile(walName) ? 2:1)]);
-  }
-
-  /**
-   * Pattern used to validate a WAL file name
-   * see {@link #validateWALFilename(String)} for description.
-   */
-  private static final Pattern pattern = 
Pattern.compile(".*\\.\\d*("+META_WAL_PROVIDER_ID+")*");
-
-  /**
-   * A WAL file name is of the format:
-   * &lt;wal-name&gt;{@link 
#WAL_FILE_NAME_DELIMITER}&lt;file-creation-timestamp&gt;[.meta].
-   *
-   * provider-name is usually made up of a server-name and a provider-id
-   *
-   * @param filename name of the file to validate
-   * @return <tt>true</tt> if the filename matches an WAL, <tt>false</tt>
-   *         otherwise
-   */
-  public static boolean validateWALFilename(String filename) {
-    return pattern.matcher(filename).matches();
-  }
-
-  /**
-   * Construct the directory name for all WALs on a given server.
-   *
-   * @param serverName
-   *          Server name formatted as described in {@link ServerName}
-   * @return the relative WAL directory name, e.g.
-   *         <code>.logs/1.example.org,60030,12345</code> if
-   *         <code>serverName</code> passed is
-   *         <code>1.example.org,60030,12345</code>
-   */
-  public static String getWALDirectoryName(final String serverName) {
-    StringBuilder dirName = new StringBuilder(HConstants.HREGION_LOGDIR_NAME);
-    dirName.append("/");
-    dirName.append(serverName);
-    return dirName.toString();
-  }
-
-  /**
-   * Pulls a ServerName out of a Path generated according to our layout rules.
-   *
-   * In the below layouts, this method ignores the format of the logfile 
component.
-   *
-   * Current format:
-   *
-   * [base directory for hbase]/hbase/.logs/ServerName/logfile
-   *      or
-   * [base directory for hbase]/hbase/.logs/ServerName-splitting/logfile
-   *
-   * Expected to work for individual log files and server-specific directories.
-   *
-   * @return null if it's not a log file. Returns the ServerName of the region
-   *         server that created this log file otherwise.
-   */
-  public static ServerName getServerNameFromWALDirectoryName(Configuration 
conf, String path)
-      throws IOException {
-    if (path == null
-        || path.length() <= HConstants.HREGION_LOGDIR_NAME.length()) {
-      return null;
-    }
-
-    if (conf == null) {
-      throw new IllegalArgumentException("parameter conf must be set");
-    }
-
-    final String rootDir = conf.get(HConstants.HBASE_DIR);
-    if (rootDir == null || rootDir.isEmpty()) {
-      throw new IllegalArgumentException(HConstants.HBASE_DIR
-          + " key not found in conf.");
-    }
-
-    final StringBuilder startPathSB = new StringBuilder(rootDir);
-    if (!rootDir.endsWith("/"))
-      startPathSB.append('/');
-    startPathSB.append(HConstants.HREGION_LOGDIR_NAME);
-    if (!HConstants.HREGION_LOGDIR_NAME.endsWith("/"))
-      startPathSB.append('/');
-    final String startPath = startPathSB.toString();
-
-    String fullPath;
-    try {
-      fullPath = FileSystem.get(conf).makeQualified(new Path(path)).toString();
-    } catch (IllegalArgumentException e) {
-      LOG.info("Call to makeQualified failed on " + path + " " + 
e.getMessage());
-      return null;
-    }
-
-    if (!fullPath.startsWith(startPath)) {
-      return null;
-    }
-
-    final String serverNameAndFile = fullPath.substring(startPath.length());
-
-    if (serverNameAndFile.indexOf('/') < "a,0,0".length()) {
-      // Either it's a file (not a directory) or it's not a ServerName format
-      return null;
-    }
-
-    Path p = new Path(path);
-    return getServerNameFromWALDirectoryName(p);
-  }
-
-  /**
-   * This function returns region server name from a log file name which is in 
one of the following
-   * formats:
-   * <ul>
-   *   <li>hdfs://&lt;name node&gt;/hbase/.logs/&lt;server 
name&gt;-splitting/...</li>
-   *   <li>hdfs://&lt;name node&gt;/hbase/.logs/&lt;server name&gt;/...</li>
-   * </ul>
-   * @param logFile
-   * @return null if the passed in logFile isn't a valid WAL file path
-   */
-  public static ServerName getServerNameFromWALDirectoryName(Path logFile) {
-    String logDirName = logFile.getParent().getName();
-    // We were passed the directory and not a file in it.
-    if (logDirName.equals(HConstants.HREGION_LOGDIR_NAME)) {
-      logDirName = logFile.getName();
-    }
-    ServerName serverName = null;
-    if (logDirName.endsWith(SPLITTING_EXT)) {
-      logDirName = logDirName.substring(0, logDirName.length() - 
SPLITTING_EXT.length());
-    }
-    try {
-      serverName = ServerName.parseServerName(logDirName);
-    } catch (IllegalArgumentException ex) {
-      serverName = null;
-      LOG.warn("Cannot parse a server name from path=" + logFile + "; " + 
ex.getMessage());
-    }
-    if (serverName != null && serverName.getStartcode() < 0) {
-      LOG.warn("Invalid log file path=" + logFile);
-      serverName = null;
-    }
-    return serverName;
-  }
-
-  public static boolean isMetaFile(Path p) {
-    return isMetaFile(p.getName());
-  }
-
-  public static boolean isMetaFile(String p) {
-    if (p != null && p.endsWith(META_WAL_PROVIDER_ID)) {
-      return true;
-    }
-    return false;
-  }
-
   /**
    * public because of FSHLog. Should be package-private
    */
   public static Writer createWriter(final Configuration conf, final FileSystem 
fs, final Path path,
-      final boolean overwritable)
-      throws IOException {
+      final boolean overwritable) throws IOException {
     // Configuration already does caching for the Class lookup.
     Class<? extends Writer> logWriterClass = 
conf.getClass("hbase.regionserver.hlog.writer.impl",
-        ProtobufLogWriter.class, Writer.class);
+      ProtobufLogWriter.class, Writer.class);
     try {
       Writer writer = logWriterClass.newInstance();
       writer.init(fs, path, conf, overwritable);
@@ -380,15 +65,14 @@ public class DefaultWALProvider implements WALProvider {
     }
   }
 
-  /**
-   * Get prefix of the log from its name, assuming WAL name in format of
-   * log_prefix.filenumber.log_suffix @see {@link FSHLog#getCurrentFileName()}
-   * @param name Name of the WAL to parse
-   * @return prefix of the log
-   */
-  public static String getWALPrefixFromWALName(String name) {
-    int endIndex = name.replaceAll(META_WAL_PROVIDER_ID, "").lastIndexOf(".");
-    return name.substring(0, endIndex);
+  @Override
+  protected FSHLog createWAL() throws IOException {
+    return new FSHLog(FileSystem.get(conf), FSUtils.getRootDir(conf),
+        getWALDirectoryName(factory.factoryId), 
HConstants.HREGION_OLDLOGDIR_NAME, conf, listeners,
+        true, logPrefix, META_WAL_PROVIDER_ID.equals(providerId) ? 
META_WAL_PROVIDER_ID : null);
   }
 
+  @Override
+  protected void doInit(Configuration conf) throws IOException {
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/c96b642f/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
index c3d4b2c..028c60b 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
@@ -215,6 +215,10 @@ class DisabledWALProvider implements WALProvider {
     public String toString() {
       return "WAL disabled.";
     }
+
+    @Override
+    public void logRollerExited() {
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/c96b642f/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
index 0b83528..051ce54 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
@@ -199,6 +199,13 @@ public interface WAL {
   String toString();
 
   /**
+   * In some WAL implementations, we will write WAL entries to a new file if sync 
failed, which means
+   * failure recovery depends on the log roller. So here we tell the WAL that the 
log roller has
+   * already exited so the WAL can give up recovery.
+   */
+  void logRollerExited();
+
+  /**
    * When outside clients need to consume persisted WALs, they rely on a 
provided
    * Reader.
    */
@@ -268,7 +275,5 @@ public interface WAL {
     public String toString() {
       return this.key + "=" + this.edit;
     }
-
   }
-
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/c96b642f/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
index 08f42aa..a2761df 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
@@ -76,7 +76,8 @@ public class WALFactory {
   static enum Providers {
     defaultProvider(DefaultWALProvider.class),
     filesystem(DefaultWALProvider.class),
-    multiwal(RegionGroupingProvider.class);
+    multiwal(RegionGroupingProvider.class),
+    asyncfs(AsyncFSWALProvider.class);
 
     Class<? extends WALProvider> clazz;
     Providers(Class<? extends WALProvider> clazz) {
@@ -350,9 +351,10 @@ public class WALFactory {
 
   /**
    * Create a writer for the WAL.
+   * <p>
    * should be package-private. public only for tests and
    * {@link org.apache.hadoop.hbase.regionserver.wal.Compressor}
-   * @return A WAL writer.  Close when done with it.
+   * @return A WAL writer. Close when done with it.
    * @throws IOException
    */
   public Writer createWALWriter(final FileSystem fs, final Path path) throws 
IOException {

http://git-wip-us.apache.org/repos/asf/hbase/blob/c96b642f/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java
index 2c500dc..ad79485 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java
@@ -20,11 +20,11 @@ package org.apache.hadoop.hbase.wal;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.nio.channels.CompletionHandler;
 import java.util.List;
 
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
-
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 // imports for things that haven't moved from regionserver.wal yet.
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 
@@ -80,6 +80,12 @@ public interface WALProvider {
     long getLength() throws IOException;
   }
 
+  interface AsyncWriter extends Closeable {
+    <A> void sync(CompletionHandler<Long, A> handler, A attachment);
+    void append(WAL.Entry entry);
+    long getLength();
+  }
+
   /**
    * Get number of the log files this provider is managing
    */

http://git-wip-us.apache.org/repos/asf/hbase/blob/c96b642f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java
new file mode 100644
index 0000000..7abdef9
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java
@@ -0,0 +1,332 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.wal.DefaultWALProvider;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+/**
+ * Test log deletion as logs are rolled.
+ */
+public abstract class AbstractTestLogRolling  {
+  private static final Log LOG = 
LogFactory.getLog(AbstractTestLogRolling.class);
+  protected HRegionServer server;
+  protected String tableName;
+  protected byte[] value;
+  protected FileSystem fs;
+  protected MiniDFSCluster dfsCluster;
+  protected Admin admin;
+  protected MiniHBaseCluster cluster;
+  protected static final HBaseTestingUtility TEST_UTIL = new 
HBaseTestingUtility();
+  @Rule public final TestName name = new TestName();
+
+  public AbstractTestLogRolling()  {
+    this.server = null;
+    this.tableName = null;
+
+    String className = this.getClass().getName();
+    StringBuilder v = new StringBuilder(className);
+    while (v.length() < 1000) {
+      v.append(className);
+    }
+    this.value = Bytes.toBytes(v.toString());
+  }
+
+  // Need to override this setup so we can edit the config before it gets sent
+  // to the HDFS & HBase cluster startup.
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+
+
+    /**** configuration for testLogRolling ****/
+    // Force a region split after every 768KB
+    TEST_UTIL.getConfiguration().setLong(HConstants.HREGION_MAX_FILESIZE, 768L 
* 1024L);
+
+    // We roll the log after every 32 writes
+    TEST_UTIL.getConfiguration().setInt("hbase.regionserver.maxlogentries", 
32);
+
+    
TEST_UTIL.getConfiguration().setInt("hbase.regionserver.logroll.errors.tolerated",
 2);
+    TEST_UTIL.getConfiguration().setInt("hbase.rpc.timeout", 10 * 1000);
+
+    // For less frequently updated regions flush after every 2 flushes
+    
TEST_UTIL.getConfiguration().setInt("hbase.hregion.memstore.optionalflushcount",
 2);
+
+    // We flush the cache after every 8192 bytes
+    TEST_UTIL.getConfiguration().setInt(
+        HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 8192);
+
+    // Increase the amount of time between client retries
+    TEST_UTIL.getConfiguration().setLong("hbase.client.pause", 10 * 1000);
+
+    // Reduce thread wake frequency so that other threads can get
+    // a chance to run.
+    TEST_UTIL.getConfiguration().setInt(HConstants.THREAD_WAKE_FREQUENCY, 2 * 
1000);
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    TEST_UTIL.startMiniCluster(1, 1, 2);
+
+    cluster = TEST_UTIL.getHBaseCluster();
+    dfsCluster = TEST_UTIL.getDFSCluster();
+    fs = TEST_UTIL.getTestFileSystem();
+    admin = TEST_UTIL.getHBaseAdmin();
+
+    // disable region rebalancing (interferes with log watching)
+    cluster.getMaster().balanceSwitch(false);
+  }
+
+  @After
+  public void tearDown() throws Exception  {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  protected void startAndWriteData() throws IOException, InterruptedException {
+    // When the hbase:meta table can be opened, the region servers are running
+    TEST_UTIL.getConnection().getTable(TableName.META_TABLE_NAME);
+    this.server = cluster.getRegionServerThreads().get(0).getRegionServer();
+
+    Table table = createTestTable(this.tableName);
+
+    server = TEST_UTIL.getRSForFirstRegionInTable(table.getName());
+    for (int i = 1; i <= 256; i++) {    // 256 writes should cause 8 log rolls
+      doPut(table, i);
+      if (i % 32 == 0) {
+        // After every 32 writes sleep to let the log roller run
+        try {
+          Thread.sleep(2000);
+        } catch (InterruptedException e) {
+          // continue
+        }
+      }
+    }
+  }
+
+  /**
+   * Tests that log rolling doesn't hang when no data is written.
+   */
+  @Test(timeout=120000)
+  public void testLogRollOnNothingWritten() throws Exception {
+    final Configuration conf = TEST_UTIL.getConfiguration();
+    final WALFactory wals = new WALFactory(conf, null,
+        ServerName.valueOf("test.com",8080, 1).toString());
+    final WAL newLog = wals.getWAL(new byte[]{}, null);
+    try {
+      // Now roll the log before we write anything.
+      newLog.rollWriter(true);
+    } finally {
+      wals.close();
+    }
+  }
+
+  /**
+   * Tests that logs are deleted
+   * @throws IOException
+   * @throws org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException
+   */
+  @Test
+  public void testLogRolling() throws Exception {
+    this.tableName = getName();
+    // TODO: Why does this write data take forever?
+    startAndWriteData();
+    HRegionInfo region =
+        
server.getOnlineRegions(TableName.valueOf(tableName)).get(0).getRegionInfo();
+    final WAL log = server.getWAL(region);
+    LOG.info("after writing there are " + 
DefaultWALProvider.getNumRolledLogFiles(log) +
+        " log files");
+
+      // flush all regions
+      for (Region r: server.getOnlineRegionsLocalContext()) {
+        r.flush(true);
+      }
+
+      // Now roll the log
+      log.rollWriter();
+
+    int count = DefaultWALProvider.getNumRolledLogFiles(log);
+    LOG.info("after flushing all regions and rolling logs there are " + count 
+ " log files");
+      assertTrue(("actual count: " + count), count <= 2);
+  }
+
+  protected String getName() {
+    return "TestLogRolling-" + name.getMethodName();
+  }
+
+  void writeData(Table table, int rownum) throws IOException {
+    doPut(table, rownum);
+
+    // sleep to let the log roller run (if it needs to)
+    try {
+      Thread.sleep(2000);
+    } catch (InterruptedException e) {
+      // continue
+    }
+  }
+
+  void validateData(Table table, int rownum) throws IOException {
+    String row = "row" + String.format("%1$04d", rownum);
+    Get get = new Get(Bytes.toBytes(row));
+    get.addFamily(HConstants.CATALOG_FAMILY);
+    Result result = table.get(get);
+    assertTrue(result.size() == 1);
+    assertTrue(Bytes.equals(value,
+                result.getValue(HConstants.CATALOG_FAMILY, null)));
+    LOG.info("Validated row " + row);
+  }
+
+  void batchWriteAndWait(Table table, final FSHLog log, int start, boolean 
expect, int timeout)
+      throws IOException {
+    for (int i = 0; i < 10; i++) {
+      Put put = new Put(Bytes.toBytes("row"
+          + String.format("%1$04d", (start + i))));
+      put.addColumn(HConstants.CATALOG_FAMILY, null, value);
+      table.put(put);
+    }
+    Put tmpPut = new Put(Bytes.toBytes("tmprow"));
+    tmpPut.addColumn(HConstants.CATALOG_FAMILY, null, value);
+    long startTime = System.currentTimeMillis();
+    long remaining = timeout;
+    while (remaining > 0) {
+      if (log.isLowReplicationRollEnabled() == expect) {
+        break;
+      } else {
+        // Trigger calling FSHLog#checkLowReplication()
+        table.put(tmpPut);
+        try {
+          Thread.sleep(200);
+        } catch (InterruptedException e) {
+          // continue
+        }
+        remaining = timeout - (System.currentTimeMillis() - startTime);
+      }
+    }
+  }
+
+  /**
+   * Tests that logs are deleted when some region has a compaction
+   * record in WAL and no other records. See HBASE-8597.
+   */
+  @Test
+  public void testCompactionRecordDoesntBlockRolling() throws Exception {
+    Table table = null;
+
+    // When the hbase:meta table can be opened, the region servers are running
+    Table t = TEST_UTIL.getConnection().getTable(TableName.META_TABLE_NAME);
+    try {
+      table = createTestTable(getName());
+
+      server = TEST_UTIL.getRSForFirstRegionInTable(table.getName());
+      Region region = server.getOnlineRegions(table.getName()).get(0);
+      final WAL log = server.getWAL(region.getRegionInfo());
+      Store s = region.getStore(HConstants.CATALOG_FAMILY);
+
+      // have to flush namespace to ensure it doesn't affect WAL tests
+      admin.flush(TableName.NAMESPACE_TABLE_NAME);
+
+      // Put some stuff into table, to make sure we have some files to compact.
+      for (int i = 1; i <= 2; ++i) {
+        doPut(table, i);
+        admin.flush(table.getName());
+      }
+      doPut(table, 3); // don't flush yet, or compaction might trigger before 
we roll WAL
+      assertEquals("Should have no WAL after initial writes", 0,
+          DefaultWALProvider.getNumRolledLogFiles(log));
+      assertEquals(2, s.getStorefilesCount());
+
+      // Roll the log and compact table, to have compaction record in the 2nd 
WAL.
+      log.rollWriter();
+      assertEquals("Should have WAL; one table is not flushed", 1,
+          DefaultWALProvider.getNumRolledLogFiles(log));
+      admin.flush(table.getName());
+      region.compact(false);
+      // Wait for compaction in case if flush triggered it before us.
+      Assert.assertNotNull(s);
+      for (int waitTime = 3000; s.getStorefilesCount() > 1 && waitTime > 0; 
waitTime -= 200) {
+        Threads.sleepWithoutInterrupt(200);
+      }
+      assertEquals("Compaction didn't happen", 1, s.getStorefilesCount());
+
+      // Write some value to the table so the WAL cannot be deleted until 
table is flushed.
+      doPut(table, 0); // Now 2nd WAL will have both compaction and put record 
for table.
+      log.rollWriter(); // 1st WAL deleted, 2nd not deleted yet.
+      assertEquals("Should have WAL; one table is not flushed", 1,
+          DefaultWALProvider.getNumRolledLogFiles(log));
+
+      // Flush table to make latest WAL obsolete; write another record, and 
roll again.
+      admin.flush(table.getName());
+      doPut(table, 1);
+      log.rollWriter(); // Now 2nd WAL is deleted and 3rd is added.
+      assertEquals("Should have 1 WALs at the end", 1,
+          DefaultWALProvider.getNumRolledLogFiles(log));
+    } finally {
+      if (t != null) t.close();
+      if (table != null) table.close();
+    }
+  }
+
+  protected void doPut(Table table, int i) throws IOException {
+    Put put = new Put(Bytes.toBytes("row" + String.format("%1$04d", i)));
+    put.addColumn(HConstants.CATALOG_FAMILY, null, value);
+    table.put(put);
+  }
+
+  protected Table createTestTable(String tableName) throws IOException {
+    // Create the test table and open it
+    HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
+    desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
+    admin.createTable(desc);
+    return TEST_UTIL.getConnection().getTable(desc.getTableName());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/c96b642f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestProtobufLog.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestProtobufLog.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestProtobufLog.java
new file mode 100644
index 0000000..a4267a0
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestProtobufLog.java
@@ -0,0 +1,209 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.coprocessor.SampleRegionWALObserver;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+/**
+ * WAL tests that can be reused across providers.
+ */
+public abstract class AbstractTestProtobufLog<W extends Closeable> {
+  protected final static HBaseTestingUtility TEST_UTIL = new 
HBaseTestingUtility();
+
+  protected FileSystem fs;
+  protected Path dir;
+  protected WALFactory wals;
+
+  @Rule
+  public final TestName currentTest = new TestName();
+
+  @Before
+  public void setUp() throws Exception {
+    fs = TEST_UTIL.getDFSCluster().getFileSystem();
+    dir = new Path(TEST_UTIL.createRootDir(), currentTest.getMethodName());
+    wals = new WALFactory(TEST_UTIL.getConfiguration(), null, 
currentTest.getMethodName());
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    wals.close();
+    FileStatus[] entries = fs.listStatus(new Path("/"));
+    for (FileStatus dir : entries) {
+      fs.delete(dir.getPath(), true);
+    }
+  }
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    // Make block sizes small.
+    TEST_UTIL.getConfiguration().setInt("dfs.blocksize", 1024 * 1024);
+    // needed for testAppendClose()
+    TEST_UTIL.getConfiguration().setBoolean("dfs.support.broken.append", true);
+    TEST_UTIL.getConfiguration().setBoolean("dfs.support.append", true);
+    // quicker heartbeat interval for faster DN death notification
+    
TEST_UTIL.getConfiguration().setInt("dfs.namenode.heartbeat.recheck-interval", 
5000);
+    TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1);
+    TEST_UTIL.getConfiguration().setInt("dfs.client.socket-timeout", 5000);
+
+    // faster failover with cluster.shutdown();fs.close() idiom
+    TEST_UTIL.getConfiguration()
+        .setInt("hbase.ipc.client.connect.max.retries", 1);
+    TEST_UTIL.getConfiguration().setInt(
+        "dfs.client.block.recovery.retries", 1);
+    TEST_UTIL.getConfiguration().setInt(
+      "hbase.ipc.client.connection.maxidletime", 500);
+    TEST_UTIL.getConfiguration().set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY,
+        SampleRegionWALObserver.class.getName());
+    TEST_UTIL.startMiniDFSCluster(3);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * Reads the WAL with and without WALTrailer.
+   * @throws IOException
+   */
+  @Test
+  public void testWALTrailer() throws IOException {
+    // read With trailer.
+    doRead(true);
+    // read without trailer
+    doRead(false);
+  }
+
+  /**
+   * Appends entries in the WAL and reads it.
+   * @param withTrailer If 'withTrailer' is true, it calls a close on the 
WALwriter before reading
+   *          so that a trailer is appended to the WAL. Otherwise, it starts 
reading after the sync
+   *          call. This means that reader is not aware of the trailer. In 
this scenario, if the
+   *          reader tries to read the trailer in its next() call, it returns 
false from
+   *          ProtoBufLogReader.
+   * @throws IOException
+   */
+  private void doRead(boolean withTrailer) throws IOException {
+    final int columnCount = 5;
+    final int recordCount = 5;
+    final TableName tableName =
+        TableName.valueOf("tablename");
+    final byte[] row = Bytes.toBytes("row");
+    long timestamp = System.currentTimeMillis();
+    Path path = new Path(dir, "tempwal");
+    // delete the log if already exists, for test only
+    fs.delete(path, true);
+    W writer = null;
+    ProtobufLogReader reader = null;
+    try {
+      HRegionInfo hri = new HRegionInfo(tableName,
+          HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
+      HTableDescriptor htd = new HTableDescriptor(tableName);
+      fs.mkdirs(dir);
+      // Write log in pb format.
+      writer = createWriter(path);
+      for (int i = 0; i < recordCount; ++i) {
+        WALKey key = new WALKey(
+            hri.getEncodedNameAsBytes(), tableName, i, timestamp, 
HConstants.DEFAULT_CLUSTER_ID);
+        WALEdit edit = new WALEdit();
+        for (int j = 0; j < columnCount; ++j) {
+          if (i == 0) {
+            htd.addFamily(new HColumnDescriptor("column" + j));
+          }
+          String value = i + "" + j;
+          edit.add(new KeyValue(row, row, row, timestamp, 
Bytes.toBytes(value)));
+        }
+        append(writer, new WAL.Entry(key, edit));
+      }
+      sync(writer);
+      if (withTrailer) writer.close();
+
+      // Now read the log using standard means.
+      reader = (ProtobufLogReader) wals.createReader(fs, path);
+      if (withTrailer) {
+        assertNotNull(reader.trailer);
+      } else {
+        assertNull(reader.trailer);
+      }
+      for (int i = 0; i < recordCount; ++i) {
+        WAL.Entry entry = reader.next();
+        assertNotNull(entry);
+        assertEquals(columnCount, entry.getEdit().size());
+        assertArrayEquals(hri.getEncodedNameAsBytes(), 
entry.getKey().getEncodedRegionName());
+        assertEquals(tableName, entry.getKey().getTablename());
+        int idx = 0;
+        for (Cell val : entry.getEdit().getCells()) {
+          assertTrue(Bytes.equals(row, 0, row.length, val.getRowArray(), 
val.getRowOffset(),
+            val.getRowLength()));
+          String value = i + "" + idx;
+          assertArrayEquals(Bytes.toBytes(value), CellUtil.cloneValue(val));
+          idx++;
+        }
+      }
+      WAL.Entry entry = reader.next();
+      assertNull(entry);
+    } finally {
+      if (writer != null) {
+        writer.close();
+      }
+      if (reader != null) {
+        reader.close();
+      }
+    }
+  }
+
+  protected abstract W createWriter(Path path) throws IOException;
+
+  protected abstract void append(W writer, WAL.Entry entry) throws IOException;
+
+  protected abstract void sync(W writer) throws IOException;
+}

Reply via email to