HBASE-19929 Calling RS.stop on a session-expired RS may hang

Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/dcbb3317
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/dcbb3317
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/dcbb3317

Branch: refs/heads/HBASE-19064
Commit: dcbb331792c210a71e4cebe004c8477b34993770
Parents: d8b999e
Author: zhangduo <zhang...@apache.org>
Authored: Wed Feb 7 15:52:04 2018 +0800
Committer: zhangduo <zhang...@apache.org>
Committed: Wed Feb 7 15:52:04 2018 +0800

----------------------------------------------------------------------
 .../apache/hadoop/hbase/util/DrainBarrier.java  | 134 -------
 .../hadoop/hbase/util/TestDrainBarrier.java     | 127 -------
 .../hbase/regionserver/HRegionServer.java       |   2 +-
 .../hadoop/hbase/regionserver/LogRoller.java    |  40 +-
 .../hbase/regionserver/wal/AbstractFSWAL.java   |  45 +--
 .../hbase/regionserver/wal/AsyncFSWAL.java      |  42 ++-
 .../TestShutdownWhileWALBroken.java             | 164 ++++++++
 .../hbase/regionserver/TestWALLockup.java       | 378 +++++--------------
 8 files changed, 324 insertions(+), 608 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/dcbb3317/hbase-common/src/main/java/org/apache/hadoop/hbase/util/DrainBarrier.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/DrainBarrier.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/DrainBarrier.java
deleted file mode 100644
index b64ebdf..0000000
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/DrainBarrier.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.util;
-
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.yetus.audience.InterfaceAudience;
-
-/**
- * A simple barrier that can be used by classes that need to wait for some 
operations to
- * finish before stopping/closing/etc. forever.
- */
-@InterfaceAudience.Private
-public class DrainBarrier {
-  /**
-   * Contains the number of outstanding operations, as well as flags.
-   * Initially, the number of operations is 1. Each beginOp increments, and 
endOp decrements it.
-   * beginOp does not proceed when it sees the draining flag. When stop is 
called, it atomically
-   * decrements the number of operations (the initial 1) and sets the draining 
flag. If stop did
-   * the decrement to zero, that means there are no more operations 
outstanding, so stop is done.
-   * Otherwise, stop blocks, and the endOp that decrements the count to 0 
unblocks it.
-   */
-  private final AtomicLong valueAndFlags = new AtomicLong(inc(0));
-  private final static long DRAINING_FLAG = 0x1;
-  private final static int FLAG_BIT_COUNT = 1;
-
-  /**
-   * Tries to start an operation.
-   * @return false iff the stop is in progress, and the operation cannot be 
started.
-   */
-  public boolean beginOp() {
-    long oldValAndFlags;
-    do {
-      oldValAndFlags = valueAndFlags.get();
-      if (isDraining(oldValAndFlags)) return false;
-    } while (!valueAndFlags.compareAndSet(oldValAndFlags, 
inc(oldValAndFlags)));
-    return true;
-  }
-
-  /**
-   * Ends the operation. Unblocks the blocked caller of stop, if necessary.
-   */
-  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY",
-      justification="First, we do change the state before notify, 2nd, it 
doesn't even matter")
-  public void endOp() {
-    long oldValAndFlags;
-    do {
-      oldValAndFlags = valueAndFlags.get();
-      long unacceptableCount = isDraining(oldValAndFlags) ? 0 : 1;
-      if (getValue(oldValAndFlags) == unacceptableCount) {
-        throw new AssertionError("endOp called without corresponding beginOp 
call ("
-          + "the current count is " + unacceptableCount + ")");
-      }
-    } while (!valueAndFlags.compareAndSet(oldValAndFlags, 
dec(oldValAndFlags)));
-    if (getValue(oldValAndFlags) == 1) {
-      synchronized (this) { this.notifyAll(); }
-    }
-  }
-
-  /**
-   * Blocks new operations from starting, waits for the current ones to drain.
-   * If someone already called it, returns immediately, which is currently 
unavoidable as
-   * most of the users stop and close things right and left, and hope for the 
best.
-   * stopAndWaitForOpsOnce asserts instead.
-   * @throws InterruptedException the wait for operations has been interrupted.
-   */
-  public void stopAndDrainOps() throws InterruptedException {
-    stopAndDrainOps(true);
-  }
-
-  /**
-   * Blocks new operations from starting, waits for the current ones to drain.
-   * Can only be called once.
-   * @throws InterruptedException the wait for operations has been interrupted.
-   */
-  public void stopAndDrainOpsOnce() throws InterruptedException {
-    stopAndDrainOps(false);
-  }
-
-  /**
-   * @param ignoreRepeatedCalls If this is true and somebody already called 
stop, this method
-   *                            will return immediately if true; if this is 
false and somebody
-   *                            already called stop, it will assert.
-   */
-  // Justification for warnings - wait is not unconditional, and contrary to 
what WA_NOT_IN_LOOP
-  // description says we are not waiting on multiple conditions.
-  @edu.umd.cs.findbugs.annotations.SuppressWarnings({"UW_UNCOND_WAIT", 
"WA_NOT_IN_LOOP"})
-  private void stopAndDrainOps(boolean ignoreRepeatedCalls) throws 
InterruptedException {
-    long oldValAndFlags;
-    do {
-      oldValAndFlags = valueAndFlags.get();
-      if (isDraining(oldValAndFlags)) {
-        if (ignoreRepeatedCalls) return;
-        throw new AssertionError("stopAndWaitForOpsOnce called more than 
once");
-      }
-    } while (!valueAndFlags.compareAndSet(oldValAndFlags, dec(oldValAndFlags) 
| DRAINING_FLAG));
-    if (getValue(oldValAndFlags) == 1) return; // There were no operations 
outstanding.
-    synchronized (this) { this.wait(); }
-  }
-
-  // Helper methods.
-  private static final boolean isDraining(long valueAndFlags) {
-    return (valueAndFlags & DRAINING_FLAG) == DRAINING_FLAG;
-  }
-
-  private static final long getValue(long valueAndFlags) {
-    return valueAndFlags >> FLAG_BIT_COUNT;
-  }
-
-  private static final long inc(long valueAndFlags) {
-    return valueAndFlags + (1 << FLAG_BIT_COUNT); // Not checking for overflow.
-  }
-
-  private static final long dec(long valueAndFlags) {
-    return valueAndFlags - (1 << FLAG_BIT_COUNT); // Negative overflow checked 
outside.
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/dcbb3317/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestDrainBarrier.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestDrainBarrier.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestDrainBarrier.java
deleted file mode 100644
index 5c3d053..0000000
--- a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestDrainBarrier.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.util;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.testclassification.MiscTests;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-@Category({MiscTests.class, SmallTests.class})
-public class TestDrainBarrier {
-
-  @ClassRule
-  public static final HBaseClassTestRule CLASS_RULE =
-      HBaseClassTestRule.forClass(TestDrainBarrier.class);
-
-  @Test
-  public void testBeginEndStopWork() throws Exception {
-    DrainBarrier barrier = new DrainBarrier();
-    assertTrue(barrier.beginOp());
-    assertTrue(barrier.beginOp());
-    barrier.endOp();
-    barrier.endOp();
-    barrier.stopAndDrainOps();
-    assertFalse(barrier.beginOp());
-  }
-
-  @Test
-  public void testUnmatchedEndAssert() throws Exception {
-    DrainBarrier barrier = new DrainBarrier();
-    try {
-      barrier.endOp();
-      throw new Error("Should have asserted");
-    } catch (AssertionError e) {
-    }
-
-    barrier.beginOp();
-    barrier.beginOp();
-    barrier.endOp();
-    barrier.endOp();
-    try {
-      barrier.endOp();
-      throw new Error("Should have asserted");
-    } catch (AssertionError e) {
-    }
-  }
-
-  @Test
-  public void testStopWithoutOpsDoesntBlock() throws Exception {
-    DrainBarrier barrier = new DrainBarrier();
-    barrier.stopAndDrainOpsOnce();
-
-    barrier = new DrainBarrier();
-    barrier.beginOp();
-    barrier.endOp();
-    barrier.stopAndDrainOpsOnce();
-  }
-
-  @Test
-  /** This test tests blocking and can have false positives in very bad timing 
cases. */
-  public void testStopIsBlockedByOps() throws Exception {
-    final DrainBarrier barrier = new DrainBarrier();
-    barrier.beginOp();
-    barrier.beginOp();
-    barrier.beginOp();
-    barrier.endOp();
-
-    Thread stoppingThread = new Thread(new Runnable() {
-      @Override
-      public void run() {
-        try {
-          barrier.stopAndDrainOpsOnce();
-        } catch (InterruptedException e) {
-          fail("Should not have happened");
-        }
-      }
-    });
-    stoppingThread.start();
-
-    // First "end" should not unblock the thread, but the second should.
-    barrier.endOp();
-    stoppingThread.join(1000);
-    assertTrue(stoppingThread.isAlive());
-    barrier.endOp();
-    stoppingThread.join(30000); // When not broken, will be a very fast wait; 
set safe value.
-    assertFalse(stoppingThread.isAlive());
-  }
-
-  @Test
-  public void testMultipleStopOnceAssert() throws Exception {
-    DrainBarrier barrier = new DrainBarrier();
-    barrier.stopAndDrainOpsOnce();
-    try {
-      barrier.stopAndDrainOpsOnce();
-      throw new Error("Should have asserted");
-    } catch (AssertionError e) {
-    }
-  }
-
-  @Test
-  public void testMultipleSloppyStopsHaveNoEffect() throws Exception {
-    DrainBarrier barrier = new DrainBarrier();
-    barrier.stopAndDrainOps();
-    barrier.stopAndDrainOps();
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/dcbb3317/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 3a93c76..0d59b12 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -1181,7 +1181,7 @@ public class HRegionServer extends HasThread implements
 
   @VisibleForTesting
   protected void tryRegionServerReport(long reportStartTime, long 
reportEndTime)
-  throws IOException {
+      throws IOException {
     RegionServerStatusService.BlockingInterface rss = rssStub;
     if (rss == null) {
       // the current server could be stopping.

http://git-wip-us.apache.org/repos/asf/hbase/blob/dcbb3317/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
index 7a247cf..55c5219 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
@@ -132,6 +132,23 @@ public class LogRoller extends HasThread implements Closeable {
     }
   }
 
+  private void abort(String reason, Throwable cause) {
+    // Close all WALs before calling abort on the RS. AsyncFSWAL relies on us to roll a new
+    // writer in order to make progress, so if rolling failed it may be stuck; closing it lets the
+    // upper layer know that it is already broken. A failure to shut down one WAL must not prevent
+    // shutting down the others, nor the RS abort below.
+    for (WAL wal : walNeedsRoll.keySet()) {
+      // Shutdown rather than close here since we are going to abort the RS and the WALs need to
+      // be split during recovery.
+      try {
+        wal.shutdown();
+      } catch (IOException | RuntimeException e) {
+        LOG.warn("Failed to shutdown wal", e);
+      }
+    }
+    server.abort(reason, cause);
+  }
+
   @Override
   public void run() {
     while (running) {
@@ -153,10 +170,8 @@ public class LogRoller extends HasThread implements Closeable {
           continue;
         }
         // Time for periodic roll
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Wal roll period " + this.rollperiod + "ms elapsed");
-        }
-      } else if (LOG.isDebugEnabled()) {
+        LOG.debug("Wal roll period {} ms elapsed", this.rollperiod);
+      } else {
         LOG.debug("WAL roll requested");
       }
       rollLock.lock(); // FindBugs UL_UNRELEASED_LOCK_EXCEPTION_PATH
@@ -170,20 +185,22 @@ public class LogRoller extends HasThread implements Closeable {
               entry.getValue().booleanValue());
           walNeedsRoll.put(wal, Boolean.FALSE);
           if (regionsToFlush != null) {
-            for (byte [] r: regionsToFlush) scheduleFlush(r);
+            for (byte[] r : regionsToFlush) {
+              scheduleFlush(r);
+            }
           }
         }
       } catch (FailedLogCloseException e) {
-        server.abort("Failed log close in log roller", e);
+        abort("Failed log close in log roller", e);
       } catch (java.net.ConnectException e) {
-        server.abort("Failed log close in log roller", e);
+        abort("Failed log close in log roller", e);
       } catch (IOException ex) {
         // Abort if we get here.  We probably won't recover an IOE. HBASE-1132
-        server.abort("IOE in log roller",
+        abort("IOE in log roller",
           ex instanceof RemoteException ? ((RemoteException) 
ex).unwrapRemoteException() : ex);
       } catch (Exception ex) {
         LOG.error("Log rolling failed", ex);
-        server.abort("Log rolling failed", ex);
+        abort("Log rolling failed", ex);
       } finally {
         try {
           rollLog.set(false);
@@ -211,9 +228,8 @@ public class LogRoller extends HasThread implements Closeable {
       }
     }
     if (!scheduled) {
-      LOG.warn("Failed to schedule flush of " +
-        Bytes.toString(encodedRegionName) + ", region=" + r + ", requester=" +
-        requester);
+      LOG.warn("Failed to schedule flush of {}, region={}, requester={}",
+        Bytes.toString(encodedRegionName), r, requester);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/dcbb3317/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
index 54a5cd3..14fbe10 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
@@ -17,12 +17,11 @@
  */
 package org.apache.hadoop.hbase.regionserver.wal;
 
+import static 
org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER;
 import static 
org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkArgument;
 import static 
org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkNotNull;
-import static 
org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER;
 
 import com.lmax.disruptor.RingBuffer;
-
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InterruptedIOException;
@@ -46,7 +45,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantLock;
-
 import org.apache.commons.lang3.mutable.MutableLong;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -66,7 +64,6 @@ import org.apache.hadoop.hbase.trace.TraceUtil;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CollectionUtils;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
-import org.apache.hadoop.hbase.util.DrainBarrier;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Pair;
@@ -84,6 +81,7 @@ import org.apache.htrace.core.TraceScope;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import 
org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 
 /**
@@ -173,9 +171,6 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
    */
   protected final SequenceIdAccounting sequenceIdAccounting = new 
SequenceIdAccounting();
 
-  /** The barrier used to ensure that close() waits for all log rolls and 
flushes to finish. */
-  protected final DrainBarrier closeBarrier = new DrainBarrier();
-
   protected final long slowSyncNs;
 
   private final long walSyncTimeoutNs;
@@ -452,32 +447,22 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
 
   @Override
   public Long startCacheFlush(byte[] encodedRegionName, Set<byte[]> families) {
-    if (!closeBarrier.beginOp()) {
-      LOG.info("Flush not started for " + Bytes.toString(encodedRegionName) + 
"; server closing.");
-      return null;
-    }
     return this.sequenceIdAccounting.startCacheFlush(encodedRegionName, 
families);
   }
 
   @Override
   public Long startCacheFlush(byte[] encodedRegionName, Map<byte[], Long> 
familyToSeq) {
-    if (!closeBarrier.beginOp()) {
-      LOG.info("Flush not started for " + Bytes.toString(encodedRegionName) + 
"; server closing.");
-      return null;
-    }
     return this.sequenceIdAccounting.startCacheFlush(encodedRegionName, 
familyToSeq);
   }
 
   @Override
   public void completeCacheFlush(byte[] encodedRegionName) {
     this.sequenceIdAccounting.completeCacheFlush(encodedRegionName);
-    closeBarrier.endOp();
   }
 
   @Override
   public void abortCacheFlush(byte[] encodedRegionName) {
     this.sequenceIdAccounting.abortCacheFlush(encodedRegionName);
-    closeBarrier.endOp();
   }
 
   @Override
@@ -715,7 +700,11 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
     // Now we have published the ringbuffer, halt the current thread until we 
get an answer back.
     try {
       if (syncFuture != null) {
-        syncFuture.get(walSyncTimeoutNs);
+        if (closed) {
+          throw new IOException("WAL has been closed");
+        } else {
+          syncFuture.get(walSyncTimeoutNs);
+        }
       }
     } catch (TimeoutIOException tioe) {
       // SyncFuture reuse by thread, if TimeoutIOException happens, ringbuffer
@@ -755,10 +744,6 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
         LOG.debug("WAL closed. Skipping rolling of writer");
         return regionsToFlush;
       }
-      if (!closeBarrier.beginOp()) {
-        LOG.debug("WAL closing. Skipping rolling of writer");
-        return regionsToFlush;
-      }
       try (TraceScope scope = TraceUtil.createTrace("FSHLog.rollWriter")) {
         Path oldPath = getOldPath();
         Path newPath = getNewPath();
@@ -783,8 +768,6 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
         throw new IOException(
             "Underlying FileSystem can't meet stream requirements. See RS log 
" + "for details.",
             exception);
-      } finally {
-        closeBarrier.endOp();
       }
       return regionsToFlush;
     } finally {
@@ -818,20 +801,18 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
       return;
     }
     closed = true;
-    try {
-      // Prevent all further flushing and rolling.
-      closeBarrier.stopAndDrainOps();
-    } catch (InterruptedException e) {
-      LOG.error("Exception while waiting for cache flushes and log rolls", e);
-      Thread.currentThread().interrupt();
-    }
     // Tell our listeners that the log is closing
     if (!this.listeners.isEmpty()) {
       for (WALActionsListener i : this.listeners) {
         i.logCloseRequested();
       }
     }
-    doShutdown();
+    rollWriterLock.lock();
+    try {
+      doShutdown();
+    } finally {
+      rollWriterLock.unlock();
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/dcbb3317/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
index faf3b77..19d89df 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
@@ -608,19 +608,8 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
 
   @Override
   protected AsyncWriter createWriterInstance(Path path) throws IOException {
-    try {
-      return AsyncFSWALProvider.createAsyncWriter(conf, fs, path, false, 
eventLoopGroup,
-        channelClass);
-    } catch (IOException e) {
-      // this usually means master already think we are dead so let's fail all 
the pending
-      // syncs. The shutdown process of RS will wait for all regions to be 
closed before calling
-      // WAL.close so if we do not wake up the thread blocked by sync here it 
will cause dead
-      // lock.
-      if (e.getMessage().contains("Parent directory doesn't exist:")) {
-        syncFutures.forEach(f -> f.done(f.getTxid(), e));
-      }
-      throw e;
-    }
+    return AsyncFSWALProvider.createAsyncWriter(conf, fs, path, false, 
eventLoopGroup,
+      channelClass);
   }
 
   private void waitForSafePoint() {
@@ -675,17 +664,34 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
     closeExecutor.shutdown();
     try {
       if (!closeExecutor.awaitTermination(waitOnShutdownInSeconds, 
TimeUnit.SECONDS)) {
-        LOG.error("We have waited " + waitOnShutdownInSeconds + " seconds but"
-          + " the close of async writer doesn't complete."
-          + "Please check the status of underlying filesystem"
-          + " or increase the wait time by the config \""
-          + ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS + "\"");
+        LOG.error("We have waited " + waitOnShutdownInSeconds + " seconds but" 
+
+          " the close of async writer doesn't complete." +
+          "Please check the status of underlying filesystem" +
+          " or increase the wait time by the config \"" + 
ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS +
+          "\"");
       }
     } catch (InterruptedException e) {
       LOG.error("The wait for close of async writer is interrupted");
       Thread.currentThread().interrupt();
     }
     IOException error = new IOException("WAL has been closed");
+    long nextCursor = waitingConsumePayloadsGatingSequence.get() + 1;
+    // drain all the pending sync requests
+    for (long cursorBound = waitingConsumePayloads.getCursor(); nextCursor <= 
cursorBound;
+      nextCursor++) {
+      if (!waitingConsumePayloads.isPublished(nextCursor)) {
+        break;
+      }
+      RingBufferTruck truck = waitingConsumePayloads.get(nextCursor);
+      switch (truck.type()) {
+        case SYNC:
+          syncFutures.add(truck.unloadSync());
+          break;
+        default:
+          break;
+      }
+    }
+    // and fail them
     syncFutures.forEach(f -> f.done(f.getTxid(), error));
     if (!(consumeExecutor instanceof EventLoop)) {
       consumeExecutor.shutdown();

http://git-wip-us.apache.org/repos/asf/hbase/blob/dcbb3317/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestShutdownWhileWALBroken.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestShutdownWhileWALBroken.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestShutdownWhileWALBroken.java
new file mode 100644
index 0000000..6c9b5e3
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestShutdownWhileWALBroken.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
+import org.apache.hadoop.hbase.YouAreDeadException;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.zookeeper.KeeperException.SessionExpiredException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Stopping a session-expired RS whose WAL is broken must not hang. See HBASE-19929.
+ */
+@RunWith(Parameterized.class)
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestShutdownWhileWALBroken {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestShutdownWhileWALBroken.class);
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestShutdownWhileWALBroken.class);
+
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  private static final TableName TABLE_NAME = TableName.valueOf("TestShutdownWhileWALBroken");
+
+  private static final byte[] CF = Bytes.toBytes("CF");
+
+  @Parameter
+  public String walType;
+
+  @Parameters(name = "{index}: WAL={0}")
+  public static List<Object[]> params() {
+    return Arrays.asList(new Object[] { "asyncfs" }, new Object[] { "filesystem" });
+  }
+
+  public static final class MyRegionServer extends HRegionServer {
+
+    private final CountDownLatch latch = new CountDownLatch(1);
+
+    public MyRegionServer(Configuration conf) throws IOException {
+      super(conf);
+    }
+
+    @Override
+    protected void tryRegionServerReport(long reportStartTime, long reportEndTime)
+        throws IOException {
+      try {
+        super.tryRegionServerReport(reportStartTime, reportEndTime);
+      } catch (YouAreDeadException e) {
+        LOG.info("Caught YouAreDeadException, ignore", e);
+      }
+    }
+
+    @Override
+    public void abort(String reason, Throwable cause) {
+      if (cause instanceof SessionExpiredException) {
+        // From ZKWatcher: hold this abort until another caller aborts, so stop() is called first.
+        try {
+          latch.await();
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
+      } else {
+        // Abort from another class, usually LogRoller; release the pending ZKWatcher abort.
+        latch.countDown();
+      }
+      super.abort(reason, cause);
+    }
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    UTIL.getConfiguration().setClass(HConstants.REGION_SERVER_IMPL, MyRegionServer.class,
+      HRegionServer.class);
+    UTIL.getConfiguration().set(WALFactory.WAL_PROVIDER, walType);
+    UTIL.getConfiguration().set(WALFactory.META_WAL_PROVIDER, walType);
+    UTIL.startMiniCluster(2);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void test() throws Exception {
+    UTIL.createMultiRegionTable(TABLE_NAME, CF);
+    try (Table table = UTIL.getConnection().getTable(TABLE_NAME)) {
+      UTIL.loadTable(table, CF);
+    }
+    int numRegions = UTIL.getMiniHBaseCluster().getRegions(TABLE_NAME).size();
+    RegionServerThread rst0 = UTIL.getMiniHBaseCluster().getRegionServerThreads().get(0);
+    RegionServerThread rst1 = UTIL.getMiniHBaseCluster().getRegionServerThreads().get(1);
+    HRegionServer liveRS;
+    RegionServerThread toKillRSThread;
+    if (rst1.getRegionServer().getRegions(TableName.META_TABLE_NAME).isEmpty()) {
+      liveRS = rst0.getRegionServer();
+      toKillRSThread = rst1;
+    } else {
+      liveRS = rst1.getRegionServer();
+      toKillRSThread = rst0;
+    }
+    assertTrue(liveRS.getRegions(TABLE_NAME).size() < numRegions);
+    UTIL.expireSession(toKillRSThread.getRegionServer().getZooKeeper(), false);
+    UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
+
+      @Override
+      public boolean evaluate() throws Exception {
+        return liveRS.getRegions(TABLE_NAME).size() == numRegions;
+      }
+
+      @Override
+      public String explainFailure() throws Exception {
+        return "Failover is not finished yet";
+      }
+    });
+    toKillRSThread.getRegionServer().stop("Stop for test");
+    // Make sure that the RS thread actually exits instead of hanging.
+    toKillRSThread.join();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/dcbb3317/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java
index 1e59248b..ca65914 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java
@@ -33,7 +33,6 @@ import org.apache.hadoop.hbase.CoordinatedStateManager;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
@@ -43,9 +42,9 @@ import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.regionserver.wal.DamagedWALException;
 import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
-import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
 import org.apache.hadoop.hbase.util.Threads;
@@ -67,11 +66,12 @@ import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+
 /**
- * Testing for lock up of WAL subsystem.
- * Copied from TestHRegion.
+ * Testing for lock up of FSHLog.
  */
-@Category({MediumTests.class})
+@Category({ RegionServerTests.class, MediumTests.class })
 public class TestWALLockup {
 
   @ClassRule
@@ -79,14 +79,15 @@ public class TestWALLockup {
       HBaseClassTestRule.forClass(TestWALLockup.class);
 
   private static final Logger LOG = LoggerFactory.getLogger(TestWALLockup.class);
-  @Rule public TestName name = new TestName();
+
+  @Rule
+  public TestName name = new TestName();
 
   private static final String COLUMN_FAMILY = "MyCF";
   private static final byte [] COLUMN_FAMILY_BYTES = Bytes.toBytes(COLUMN_FAMILY);
 
   HRegion region = null;
-  // Do not run unit tests in parallel (? Why not?  It don't work?  Why not?  St.Ack)
-  private static HBaseTestingUtility TEST_UTIL;
+  private static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
   private static Configuration CONF ;
   private String dir;
 
@@ -95,7 +96,6 @@ public class TestWALLockup {
 
   @Before
   public void setup() throws IOException {
-    TEST_UTIL = HBaseTestingUtility.createLocalHTU();
     CONF = TEST_UTIL.getConfiguration();
     // Disable block cache.
     CONF.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f);
@@ -110,100 +110,99 @@ public class TestWALLockup {
     TEST_UTIL.cleanupTestDir();
   }
 
-  String getName() {
+  private String getName() {
     return name.getMethodName();
   }
 
-  /**
-   * Reproduce locking up that happens when we get an inopportune sync during setup for
-   * zigzaglatch wait. See HBASE-14317. If below is broken, we will see this test timeout because
-   * it is locked up.
-   * <p>First I need to set up some mocks for Server and RegionServerServices. I also need to
-   * set up a dodgy WAL that will throw an exception when we go to append to it.
-   */
-  @Test (timeout=20000)
-  public void testLockupWhenSyncInMiddleOfZigZagSetup() throws IOException {
-    // A WAL that we can have throw exceptions when a flag is set.
-    class DodgyFSLog extends FSHLog {
-      // Set this when want the WAL to start throwing exceptions.
-      volatile boolean throwException = false;
+  // A WAL that we can have throw exceptions when a flag is set.
+  private static final class DodgyFSLog extends FSHLog {
+    // Set this when want the WAL to start throwing exceptions.
+    volatile boolean throwException = false;
 
-      // Latch to hold up processing until after another operation has had time to run.
-      CountDownLatch latch = new CountDownLatch(1);
+    // Latch to hold up processing until after another operation has had time to run.
+    CountDownLatch latch = new CountDownLatch(1);
 
-      public DodgyFSLog(FileSystem fs, Path root, String logDir, Configuration conf)
-      throws IOException {
-        super(fs, root, logDir, conf);
-      }
+    public DodgyFSLog(FileSystem fs, Path root, String logDir, Configuration conf)
+        throws IOException {
+      super(fs, root, logDir, conf);
+    }
 
-      @Override
-      protected void afterCreatingZigZagLatch() {
-        // If throwException set, then append will throw an exception causing the WAL to be
-        // rolled. We'll come in here. Hold up processing until a sync can get in before
-        // the zigzag has time to complete its setup and get its own sync in. This is what causes
-        // the lock up we've seen in production.
-        if (throwException) {
-          try {
-            LOG.info("LATCHED");
-            // So, timing can have it that the test can run and the bad flush below happens
-            // before we get here. In this case, we'll be stuck waiting on this latch but there
-            // is nothing in the WAL pipeline to get us to the below beforeWaitOnSafePoint...
-            // because all WALs have rolled. In this case, just give up on test.
-            if (!this.latch.await(5, TimeUnit.SECONDS)) {
-              LOG.warn("GIVE UP! Failed waiting on latch...Test is ABORTED!");
-            }
-          } catch (InterruptedException e) {
-            // TODO Auto-generated catch block
-            e.printStackTrace();
+    @Override
+    protected void afterCreatingZigZagLatch() {
+      // If throwException set, then append will throw an exception causing the WAL to be
+      // rolled. We'll come in here. Hold up processing until a sync can get in before
+      // the zigzag has time to complete its setup and get its own sync in. This is what causes
+      // the lock up we've seen in production.
+      if (throwException) {
+        try {
+          LOG.info("LATCHED");
+          // So, timing can have it that the test can run and the bad flush below happens
+          // before we get here. In this case, we'll be stuck waiting on this latch but there
+          // is nothing in the WAL pipeline to get us to the below beforeWaitOnSafePoint...
+          // because all WALs have rolled. In this case, just give up on test.
+          if (!this.latch.await(5, TimeUnit.SECONDS)) {
+            LOG.warn("GIVE UP! Failed waiting on latch...Test is ABORTED!");
           }
+        } catch (InterruptedException e) {
         }
       }
+    }
 
-      @Override
-      protected void beforeWaitOnSafePoint() {
-        if (throwException) {
-          LOG.info("COUNTDOWN");
-          // Don't countdown latch until someone waiting on it otherwise, the above
-          // afterCreatingZigZagLatch will get to the latch and no one will ever free it and we'll
-          // be stuck; test won't go down
-          while (this.latch.getCount() <= 0) Threads.sleep(1);
-          this.latch.countDown();
-        }
+    @Override
+    protected void beforeWaitOnSafePoint() {
+      if (throwException) {
+        LOG.info("COUNTDOWN");
+        // Don't countdown latch until someone waiting on it otherwise, the above
+        // afterCreatingZigZagLatch will get to the latch and no one will ever free it and we'll
+        // be stuck; test won't go down
+        while (this.latch.getCount() <= 0)
+          Threads.sleep(1);
+        this.latch.countDown();
       }
+    }
 
-      @Override
-      protected Writer createWriterInstance(Path path) throws IOException {
-        final Writer w = super.createWriterInstance(path);
-        return new Writer() {
-          @Override
-          public void close() throws IOException {
-            w.close();
-          }
+    @Override
+    protected Writer createWriterInstance(Path path) throws IOException {
+      final Writer w = super.createWriterInstance(path);
+      return new Writer() {
+        @Override
+        public void close() throws IOException {
+          w.close();
+        }
 
-          @Override
-          public void sync() throws IOException {
-            if (throwException) {
-              throw new IOException("FAKE! Failed to replace a bad datanode...SYNC");
-            }
-            w.sync();
+        @Override
+        public void sync() throws IOException {
+          if (throwException) {
+            throw new IOException("FAKE! Failed to replace a bad datanode...SYNC");
           }
+          w.sync();
+        }
 
-          @Override
-          public void append(Entry entry) throws IOException {
-            if (throwException) {
-              throw new IOException("FAKE! Failed to replace a bad datanode...APPEND");
-            }
-            w.append(entry);
+        @Override
+        public void append(Entry entry) throws IOException {
+          if (throwException) {
+            throw new IOException("FAKE! Failed to replace a bad datanode...APPEND");
           }
+          w.append(entry);
+        }
 
-          @Override
-          public long getLength() {
-            return w.getLength();
-          }
-        };
-      }
+        @Override
+        public long getLength() {
+          return w.getLength();
+        }
+      };
     }
+  }
 
+  /**
+   * Reproduce locking up that happens when we get an inopportune sync during setup for
+   * zigzaglatch wait. See HBASE-14317. If below is broken, we will see this test timeout because
+   * it is locked up.
+   * <p>First I need to set up some mocks for Server and RegionServerServices. I also need to
+   * set up a dodgy WAL that will throw an exception when we go to append to it.
+   */
+  @Test
+  public void testLockupWhenSyncInMiddleOfZigZagSetup() throws IOException {
     // Mocked up server and regionserver services. Needed below.
     Server server = Mockito.mock(Server.class);
     Mockito.when(server.getConfiguration()).thenReturn(CONF);
@@ -222,7 +221,6 @@ public class TestWALLockup {
     // There is no 'stop' once a logRoller is running.. it just dies.
     logRoller.start();
     // Now get a region and start adding in edits.
-    HTableDescriptor htd = new HTableDescriptor(TableName.META_TABLE_NAME);
     final HRegion region = initHRegion(tableName, null, null, dodgyWAL);
     byte [] bytes = Bytes.toBytes(getName());
     NavigableMap<byte[], Integer> scopes = new TreeMap<>(
@@ -236,7 +234,7 @@ public class TestWALLockup {
       Put put = new Put(bytes);
       put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("1"), bytes);
       WALKeyImpl key = new WALKeyImpl(region.getRegionInfo().getEncodedNameAsBytes(),
-          htd.getTableName(), System.currentTimeMillis(), mvcc, scopes);
+        TableName.META_TABLE_NAME, System.currentTimeMillis(), mvcc, scopes);
       WALEdit edit = new WALEdit();
       CellScanner CellScanner = put.cellScanner();
       assertTrue(CellScanner.advance());
@@ -281,7 +279,9 @@ public class TestWALLockup {
       t.setDaemon(true);
       t.start();
       // Wait until
-      while (dodgyWAL.latch.getCount() > 0) Threads.sleep(1);
+      while (dodgyWAL.latch.getCount() > 0) {
+        Threads.sleep(1);
+      }
       // Now assert I got a new WAL file put in place even though loads of errors above.
       assertTrue(originalWAL != dodgyWAL.getCurrentFileName());
       // Can I append to it?
@@ -294,203 +294,13 @@ public class TestWALLockup {
     } finally {
       // To stop logRoller, its server has to say it is stopped.
       Mockito.when(server.isStopped()).thenReturn(true);
-      if (logRoller != null) logRoller.close();
-      try {
-        if (region != null) region.close();
-        if (dodgyWAL != null) dodgyWAL.close();
-      } catch (Exception e) {
-        LOG.info("On way out", e);
-      }
-    }
-  }
-
-  /**
-   * Reproduce locking up that happens when there's no further syncs after
-   * append fails, and causing an isolated sync then infinite wait. See
-   * HBASE-16960. If below is broken, we will see this test timeout because it
-   * is locked up.
-   * <p/>
-   * Steps for reproduce:<br/>
-   * 1. Trigger server abort through dodgyWAL1<br/>
-   * 2. Add a {@link DummyWALActionsListener} to dodgyWAL2 to cause ringbuffer
-   * event handler thread sleep for a while thus keeping {@code endOfBatch}
-   * false<br/>
-   * 3. Publish a sync then an append which will throw exception, check whether
-   * the sync could return
-   */
-  @Test(timeout = 20000)
-  public void testLockup16960() throws IOException {
-    // A WAL that we can have throw exceptions when a flag is set.
-    class DodgyFSLog extends FSHLog {
-      // Set this when want the WAL to start throwing exceptions.
-      volatile boolean throwException = false;
-
-      public DodgyFSLog(FileSystem fs, Path root, String logDir,
-          Configuration conf) throws IOException {
-        super(fs, root, logDir, conf);
-      }
-
-      @Override
-      protected Writer createWriterInstance(Path path) throws IOException {
-        final Writer w = super.createWriterInstance(path);
-        return new Writer() {
-          @Override
-          public void close() throws IOException {
-            w.close();
-          }
-
-          @Override
-          public void sync() throws IOException {
-            if (throwException) {
-              throw new IOException(
-                  "FAKE! Failed to replace a bad datanode...SYNC");
-            }
-            w.sync();
-          }
-
-          @Override
-          public void append(Entry entry) throws IOException {
-            if (throwException) {
-              throw new IOException(
-                  "FAKE! Failed to replace a bad datanode...APPEND");
-            }
-            w.append(entry);
-          }
-
-          @Override
-          public long getLength() {
-            return w.getLength();
-          }
-        };
-      }
-
-      @Override
-      protected long doReplaceWriter(Path oldPath, Path newPath,
-          Writer nextWriter) throws IOException {
-        if (throwException) {
-          throw new FailedLogCloseException("oldPath=" + oldPath + ", newPath="
-              + newPath);
-        }
-        long oldFileLen = 0L;
-        oldFileLen = super.doReplaceWriter(oldPath, newPath, nextWriter);
-        return oldFileLen;
-      }
-    }
-
-    // Mocked up server and regionserver services. Needed below.
-    Server server = new DummyServer(CONF, ServerName.valueOf(
-        "hostname1.example.org", 1234, 1L).toString());
-    RegionServerServices services = Mockito.mock(RegionServerServices.class);
-
-    CONF.setLong("hbase.regionserver.hlog.sync.timeout", 10000);
-
-    // OK. Now I have my mocked up Server & RegionServerServices and dodgy WAL,
-    // go ahead with test.
-    FileSystem fs = FileSystem.get(CONF);
-    Path rootDir = new Path(dir + getName());
-    DodgyFSLog dodgyWAL1 = new DodgyFSLog(fs, rootDir, getName(), CONF);
-
-    Path rootDir2 = new Path(dir + getName() + "2");
-    final DodgyFSLog dodgyWAL2 = new DodgyFSLog(fs, rootDir2, getName() + "2",
-        CONF);
-    // Add a listener to force ringbuffer event handler sleep for a while
-    dodgyWAL2.registerWALActionsListener(new DummyWALActionsListener());
-
-    // I need a log roller running.
-    LogRoller logRoller = new LogRoller(server, services);
-    logRoller.addWAL(dodgyWAL1);
-    logRoller.addWAL(dodgyWAL2);
-    // There is no 'stop' once a logRoller is running.. it just dies.
-    logRoller.start();
-    // Now get a region and start adding in edits.
-    HTableDescriptor htd = new HTableDescriptor(TableName.META_TABLE_NAME);
-    final HRegion region = initHRegion(tableName, null, null, dodgyWAL1);
-    byte[] bytes = Bytes.toBytes(getName());
-    NavigableMap<byte[], Integer> scopes = new TreeMap<>(
-        Bytes.BYTES_COMPARATOR);
-    scopes.put(COLUMN_FAMILY_BYTES, 0);
-    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
-    try {
-      Put put = new Put(bytes);
-      put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("1"), bytes);
-      WALKeyImpl key = new WALKeyImpl(region.getRegionInfo().getEncodedNameAsBytes(),
-          htd.getTableName(), System.currentTimeMillis(), mvcc, scopes);
-      WALEdit edit = new WALEdit();
-      CellScanner CellScanner = put.cellScanner();
-      assertTrue(CellScanner.advance());
-      edit.add(CellScanner.current());
-
-      LOG.info("SET throwing of exception on append");
-      dodgyWAL1.throwException = true;
-      // This append provokes a WAL roll request
-      dodgyWAL1.append(region.getRegionInfo(), key, edit, true);
-      boolean exception = false;
-      try {
-        dodgyWAL1.sync();
-      } catch (Exception e) {
-        exception = true;
-      }
-      assertTrue("Did not get sync exception", exception);
-
-      // LogRoller call dodgyWAL1.rollWriter get FailedLogCloseException and
-      // cause server abort.
-      try {
-        // wait LogRoller exit.
-        Thread.sleep(50);
-      } catch (InterruptedException e) {
-        e.printStackTrace();
-      }
-
-      final CountDownLatch latch = new CountDownLatch(1);
-
-      // make RingBufferEventHandler sleep 1s, so the following sync
-      // endOfBatch=false
-      key = new WALKeyImpl(region.getRegionInfo().getEncodedNameAsBytes(),
-          TableName.valueOf("sleep"), System.currentTimeMillis(), mvcc, scopes);
-      dodgyWAL2.append(region.getRegionInfo(), key, edit, true);
-
-      Thread t = new Thread("Sync") {
-        @Override
-        public void run() {
-          try {
-            dodgyWAL2.sync();
-          } catch (IOException e) {
-            LOG.info("In sync", e);
-          }
-          latch.countDown();
-          LOG.info("Sync exiting");
-        }
-      };
-      t.setDaemon(true);
-      t.start();
-      try {
-        // make sure sync have published.
-        Thread.sleep(100);
-      } catch (InterruptedException e1) {
-        e1.printStackTrace();
-      }
-      // make append throw DamagedWALException
-      key = new WALKeyImpl(region.getRegionInfo().getEncodedNameAsBytes(),
-          TableName.valueOf("DamagedWALException"), System.currentTimeMillis(), mvcc, scopes);
-      dodgyWAL2.append(region.getRegionInfo(), key, edit, true);
-
-      while (latch.getCount() > 0) {
-        Threads.sleep(100);
-      }
-      assertTrue(server.isAborted());
-    } finally {
-      if (logRoller != null) {
-        logRoller.close();
-      }
+      Closeables.close(logRoller, true);
       try {
         if (region != null) {
           region.close();
         }
-        if (dodgyWAL1 != null) {
-          dodgyWAL1.close();
-        }
-        if (dodgyWAL2 != null) {
-          dodgyWAL2.close();
+        if (dodgyWAL != null) {
+          dodgyWAL.close();
         }
       } catch (Exception e) {
         LOG.info("On way out", e);
@@ -606,11 +416,11 @@ public class TestWALLockup {
   }
 
   /**
-   * @return A region on which you must call
-   *         {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done.
+   * @return A region on which you must call {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)}
+   *         when done.
    */
-  public static HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey, WAL wal)
-  throws IOException {
+  private static HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey, WAL wal)
+      throws IOException {
     ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
     return TEST_UTIL.createLocalHRegion(tableName, startKey, stopKey, false, Durability.SYNC_WAL,
       wal, COLUMN_FAMILY_BYTES);

Reply via email to