HBASE-16824 Writer.flush() can be called on already closed streams in WAL roll


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/c5172262
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/c5172262
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/c5172262

Branch: refs/heads/branch-1.3
Commit: c51722629418b8b5e3a6e688219ee7d806f251c7
Parents: d38310a
Author: Enis Soztutar <e...@apache.org>
Authored: Tue Oct 18 18:46:02 2016 -0700
Committer: Enis Soztutar <e...@apache.org>
Committed: Tue Oct 18 19:16:31 2016 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/regionserver/wal/FSHLog.java   | 31 ++++++++++++--
 .../wal/TestLogRollingNoCluster.java            | 43 ++++++++++++++------
 2 files changed, 57 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/c5172262/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index 097101b..9993d62 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -1131,6 +1131,7 @@ public class FSHLog implements WAL {
     private volatile long sequence;
     // Keep around last exception thrown. Clear on successful sync.
     private final BlockingQueue<SyncFuture> syncFutures;
+    private volatile SyncFuture takeSyncFuture = null;
 
     /**
      * UPDATE!
@@ -1220,13 +1221,21 @@ public class FSHLog implements WAL {
       return sequence;
     }
 
+    boolean areSyncFuturesReleased() {
+      // check whether there is no sync futures offered, and no in-flight sync 
futures that is being
+      // processed.
+      return syncFutures.size() <= 0
+          && takeSyncFuture == null;
+    }
+
     public void run() {
       long currentSequence;
       while (!isInterrupted()) {
         int syncCount = 0;
-        SyncFuture takeSyncFuture;
+
         try {
           while (true) {
+            takeSyncFuture = null;
             // We have to process what we 'take' from the queue
             takeSyncFuture = this.syncFutures.take();
             currentSequence = this.sequence;
@@ -1737,9 +1746,21 @@ public class FSHLog implements WAL {
      * @return True if outstanding sync futures still
      */
     private boolean isOutstandingSyncs() {
+      // Look at SyncFutures in the EventHandler
       for (int i = 0; i < this.syncFuturesCount; i++) {
         if (!this.syncFutures[i].isDone()) return true;
       }
+
+      return false;
+    }
+
+    private boolean isOutstandingSyncsFromRunners() {
+      // Look at SyncFutures in the SyncRunners
+      for (SyncRunner syncRunner: syncRunners) {
+        if(syncRunner.isAlive() && !syncRunner.areSyncFuturesReleased()) {
+          return true;
+        }
+      }
       return false;
     }
 
@@ -1850,11 +1871,13 @@ public class FSHLog implements WAL {
         // Wait on outstanding syncers; wait for them to finish syncing 
(unless we've been
         // shutdown or unless our latch has been thrown because we have been 
aborted or unless
         // this WAL is broken and we can't get a sync/append to complete).
-        while (!this.shutdown && this.zigzagLatch.isCocked() &&
-            highestSyncedSequence.get() < currentSequence &&
+        while ((!this.shutdown && this.zigzagLatch.isCocked()
+            && highestSyncedSequence.get() < currentSequence &&
             // We could be in here and all syncs are failing or failed. Check 
for this. Otherwise
             // we'll just be stuck here for ever. In other words, ensure there 
syncs running.
-            isOutstandingSyncs()) {
+            isOutstandingSyncs())
+            // Wait for all SyncRunners to finish their work so that we can 
replace the writer
+            || isOutstandingSyncsFromRunners()) {
           synchronized (this.safePointWaiter) {
             this.safePointWaiter.wait(0, 1);
           }

http://git-wip-us.apache.org/repos/asf/hbase/blob/c5172262/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java
index 7ce3615..bca4a7e 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java
@@ -18,9 +18,9 @@
 package org.apache.hadoop.hbase.regionserver.wal;
 
 import static org.junit.Assert.assertFalse;
-
 import java.io.IOException;
-
+import java.util.TreeMap;
+import java.util.concurrent.ThreadLocalRandom;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSTableDescriptors;
 import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.wal.WALKey;
@@ -49,7 +50,18 @@ import org.junit.experimental.categories.Category;
 public class TestLogRollingNoCluster {
   private final static HBaseTestingUtility TEST_UTIL = new 
HBaseTestingUtility();
   private final static byte [] EMPTY_1K_ARRAY = new byte[1024];
-  private static final int THREAD_COUNT = 100; // Spin up this many threads
+  private static final int NUM_THREADS = 100; // Spin up this many threads
+  private static final int NUM_ENTRIES = 100; // How many entries to write
+
+  /** ProtobufLogWriter that simulates higher latencies in sync() call */
+  public static class HighLatencySyncWriter extends  ProtobufLogWriter {
+    @Override
+    public void sync() throws IOException {
+      Threads.sleep(ThreadLocalRandom.current().nextInt(10));
+      super.sync();
+      Threads.sleep(ThreadLocalRandom.current().nextInt(10));
+    }
+  }
 
   /**
    * Spin up a bunch of threads and have them all append to a WAL.  Roll the
@@ -58,37 +70,41 @@ public class TestLogRollingNoCluster {
    * @throws InterruptedException
    */
   @Test
-  public void testContendedLogRolling() throws IOException, 
InterruptedException {
-    Path dir = TEST_UTIL.getDataTestDir();
+  public void testContendedLogRolling() throws Exception {
+    TEST_UTIL.startMiniDFSCluster(3);
+    Path dir = TEST_UTIL.getDataTestDirOnTestFS();
+
     // The implementation needs to know the 'handler' count.
-    
TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 
THREAD_COUNT);
+    
TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 
NUM_THREADS);
     final Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
     FSUtils.setRootDir(conf, dir);
+    conf.set("hbase.regionserver.hlog.writer.impl", 
HighLatencySyncWriter.class.getName());
     final WALFactory wals = new WALFactory(conf, null, 
TestLogRollingNoCluster.class.getName());
     final WAL wal = wals.getWAL(new byte[]{}, null);
     
     Appender [] appenders = null;
 
-    final int count = THREAD_COUNT;
-    appenders = new Appender[count];
+    final int numThreads = NUM_THREADS;
+    appenders = new Appender[numThreads];
     try {
-      for (int i = 0; i < count; i++) {
+      for (int i = 0; i < numThreads; i++) {
         // Have each appending thread write 'count' entries
-        appenders[i] = new Appender(wal, i, count);
+        appenders[i] = new Appender(wal, i, NUM_ENTRIES);
       }
-      for (int i = 0; i < count; i++) {
+      for (int i = 0; i < numThreads; i++) {
         appenders[i].start();
       }
-      for (int i = 0; i < count; i++) {
+      for (int i = 0; i < numThreads; i++) {
         //ensure that all threads are joined before closing the wal
         appenders[i].join();
       }
     } finally {
       wals.close();
     }
-    for (int i = 0; i < count; i++) {
+    for (int i = 0; i < numThreads; i++) {
       assertFalse(appenders[i].isException());
     }
+    TEST_UTIL.shutdownMiniDFSCluster();
   }
 
   /**
@@ -137,6 +153,7 @@ public class TestLogRollingNoCluster {
           final HTableDescriptor htd = fts.get(TableName.META_TABLE_NAME);
           final long txid = wal.append(htd, hri, new 
WALKey(hri.getEncodedNameAsBytes(),
               TableName.META_TABLE_NAME, now, mvcc), edit, true);
+          Threads.sleep(ThreadLocalRandom.current().nextInt(5));
           wal.sync(txid);
         }
         String msg = getName() + " finished";

Reply via email to