This is an automated email from the ASF dual-hosted git repository.

apurtell pushed a commit to branch branch-1
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-1 by this push:
     new 26b9e76  HBASE-22301 Consider rolling the WAL if the HDFS write pipeline is slow
26b9e76 is described below

commit 26b9e76bbf8f7737c8b9620e74a5b0bbebb7ae2a
Author: Andrew Purtell <[email protected]>
AuthorDate: Tue Apr 30 15:22:54 2019 -0700

    HBASE-22301 Consider rolling the WAL if the HDFS write pipeline is slow
---
 .../hbase/regionserver/wal/MetricsWALSource.java   |  19 +-
 .../regionserver/wal/MetricsWALSourceImpl.java     |  30 ++-
 .../hadoop/hbase/regionserver/LogRoller.java       |   2 +-
 .../hadoop/hbase/regionserver/wal/FSHLog.java      | 162 +++++++++++++---
 .../hadoop/hbase/regionserver/wal/MetricsWAL.java  |  19 +-
 .../hbase/regionserver/wal/WALActionsListener.java |  16 +-
 .../hadoop/hbase/wal/DisabledWALProvider.java      |   2 +-
 .../hbase/regionserver/wal/TestLogRolling.java     | 203 +++++++++++++++++++--
 .../hbase/regionserver/wal/TestMetricsWAL.java     |  16 +-
 9 files changed, 417 insertions(+), 52 deletions(-)

diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALSource.java
index 2be1d0d..5987642 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALSource.java
@@ -58,10 +58,19 @@ public interface MetricsWALSource extends BaseSource {
   String SYNC_TIME = "syncTime";
   String SYNC_TIME_DESC = "The time it took to sync the WAL to HDFS.";
   String ROLL_REQUESTED = "rollRequest";
-  String ROLL_REQUESTED_DESC = "How many times a log roll has been requested total";
+  String ROLL_REQUESTED_DESC = "How many times a roll has been requested total";
+  String ERROR_ROLL_REQUESTED = "errorRollRequest";
+  String ERROR_ROLL_REQUESTED_DESC =
+      "How many times a roll was requested due to I/O or other errors.";
   String LOW_REPLICA_ROLL_REQUESTED = "lowReplicaRollRequest";
   String LOW_REPLICA_ROLL_REQUESTED_DESC =
-      "How many times a log roll was requested due to too few DN's in the write pipeline.";
+      "How many times a roll was requested due to too few datanodes in the write pipeline.";
+  String SLOW_SYNC_ROLL_REQUESTED = "slowSyncRollRequest";
+  String SLOW_SYNC_ROLL_REQUESTED_DESC =
+      "How many times a roll was requested due to sync too slow on the write pipeline.";
+  String SIZE_ROLL_REQUESTED = "sizeRollRequest";
+  String SIZE_ROLL_REQUESTED_DESC =
+      "How many times a roll was requested due to file size roll threshold.";
   String WRITTEN_BYTES = "writtenBytes";
   String WRITTEN_BYTES_DESC = "Size (in bytes) of the data written to the WAL.";
 
@@ -92,8 +101,14 @@ public interface MetricsWALSource extends BaseSource {
 
   void incrementLogRollRequested();
 
+  void incrementErrorLogRoll();
+
   void incrementLowReplicationLogRoll();
 
+  void incrementSlowSyncLogRoll();
+
+  void incrementSizeLogRoll();
+
   void incrementWrittenBytes(long val);
 
   long getWrittenBytes();
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALSourceImpl.java
index 1299637..1900694 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALSourceImpl.java
@@ -39,7 +39,10 @@ public class MetricsWALSourceImpl extends BaseSourceImpl implements MetricsWALSo
   private final MutableFastCounter appendCount;
   private final MutableFastCounter slowAppendCount;
   private final MutableFastCounter logRollRequested;
-  private final MutableFastCounter lowReplicationLogRollRequested;
+  private final MutableFastCounter errorRollRequested;
+  private final MutableFastCounter lowReplicationRollRequested;
+  private final MutableFastCounter slowSyncRollRequested;
+  private final MutableFastCounter sizeRollRequested;
   private final MutableFastCounter writtenBytes;
 
   public MetricsWALSourceImpl() {
@@ -61,8 +64,14 @@ public class MetricsWALSourceImpl extends BaseSourceImpl implements MetricsWALSo
     syncTimeHisto = this.getMetricsRegistry().newTimeHistogram(SYNC_TIME, SYNC_TIME_DESC);
     logRollRequested =
         this.getMetricsRegistry().newCounter(ROLL_REQUESTED, ROLL_REQUESTED_DESC, 0L);
-    lowReplicationLogRollRequested = this.getMetricsRegistry()
+    errorRollRequested = this.getMetricsRegistry()
+        .newCounter(ERROR_ROLL_REQUESTED, ERROR_ROLL_REQUESTED_DESC, 0L);
+    lowReplicationRollRequested = this.getMetricsRegistry()
         .newCounter(LOW_REPLICA_ROLL_REQUESTED, LOW_REPLICA_ROLL_REQUESTED_DESC, 0L);
+    slowSyncRollRequested = this.getMetricsRegistry()
+        .newCounter(SLOW_SYNC_ROLL_REQUESTED, SLOW_SYNC_ROLL_REQUESTED_DESC, 0L);
+    sizeRollRequested = this.getMetricsRegistry()
+        .newCounter(SIZE_ROLL_REQUESTED, SIZE_ROLL_REQUESTED_DESC, 0L);
     writtenBytes = this.getMetricsRegistry().newCounter(WRITTEN_BYTES, WRITTEN_BYTES_DESC, 0l);
   }
 
@@ -97,8 +106,23 @@ public class MetricsWALSourceImpl extends BaseSourceImpl implements MetricsWALSo
   }
 
   @Override
+  public void incrementErrorLogRoll() {
+    errorRollRequested.incr();
+  }
+
+  @Override
   public void incrementLowReplicationLogRoll() {
-    lowReplicationLogRollRequested.incr();
+    lowReplicationRollRequested.incr();
+  }
+
+  @Override
+  public void incrementSlowSyncLogRoll() {
+    slowSyncRollRequested.incr();
+  }
+
+  @Override
+  public void incrementSizeLogRoll() {
+    sizeRollRequested.incr();
   }
 
   @Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
index 246c02c..fd208c2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
@@ -71,7 +71,7 @@ public class LogRoller extends HasThread {
     if (null == walNeedsRoll.putIfAbsent(wal, Boolean.FALSE)) {
       wal.registerWALActionsListener(new WALActionsListener.Base() {
         @Override
-        public void logRollRequested(boolean lowReplicas) {
+        public void logRollRequested(WALActionsListener.RollRequestReason reason) {
           walNeedsRoll.put(wal, Boolean.TRUE);
           // TODO logs will contend with each other here, replace with e.g. DelayedQueue
           synchronized(rollLog) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index b46ad0f..8f2803b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -17,6 +17,10 @@
  */
 package org.apache.hadoop.hbase.regionserver.wal;
 
+import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.ERROR;
+import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.LOW_REPLICATION;
+import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.SIZE;
+import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.SLOW_SYNC;
 import static org.apache.hadoop.hbase.wal.DefaultWALProvider.WAL_FILE_NAME_DELIMITER;
 
 import java.io.FileNotFoundException;
@@ -160,9 +164,18 @@ public class FSHLog implements WAL {
 
   private static final Log LOG = LogFactory.getLog(FSHLog.class);
 
-  private static final int DEFAULT_SLOW_SYNC_TIME_MS = 100; // in ms
+  static final String SLOW_SYNC_TIME_MS ="hbase.regionserver.wal.slowsync.ms";
+  static final int DEFAULT_SLOW_SYNC_TIME_MS = 100; // in ms
+  static final String ROLL_ON_SYNC_TIME_MS = "hbase.regionserver.wal.roll.on.sync.ms";
+  static final int DEFAULT_ROLL_ON_SYNC_TIME_MS = 10000; // in ms
+  static final String SLOW_SYNC_ROLL_THRESHOLD = "hbase.regionserver.wal.slowsync.roll.threshold";
+  static final int DEFAULT_SLOW_SYNC_ROLL_THRESHOLD = 100; // 100 slow sync warnings
+  static final String SLOW_SYNC_ROLL_INTERVAL_MS =
+    "hbase.regionserver.wal.slowsync.roll.interval.ms";
+  static final int DEFAULT_SLOW_SYNC_ROLL_INTERVAL_MS = 60 * 1000; // in ms, 1 minute
 
-  private static final int DEFAULT_WAL_SYNC_TIMEOUT_MS = 5 * 60 * 1000; // in ms, 5min
+  static final String WAL_SYNC_TIMEOUT_MS = "hbase.regionserver.wal.sync.timeout";
+  static final int DEFAULT_WAL_SYNC_TIMEOUT_MS = 5 * 60 * 1000; // in ms, 5min
 
   /**
    * The nexus at which all incoming handlers meet.  Does appends and sync with an ordering.
@@ -280,7 +293,10 @@ public class FSHLog implements WAL {
 
   private final boolean useHsync;
 
-  private final int slowSyncNs;
+  private final long slowSyncNs, rollOnSyncNs;
+  private final int slowSyncRollThreshold;
+  private final int slowSyncCheckInterval;
+  private final AtomicInteger slowSyncCount = new AtomicInteger();
 
   private final long walSyncTimeout;
 
@@ -350,9 +366,13 @@ public class FSHLog implements WAL {
 
   private final AtomicInteger closeErrorCount = new AtomicInteger();
 
+  protected volatile boolean rollRequested;
+
   // Last time to check low replication on hlog's pipeline
   private volatile long lastTimeCheckLowReplication = EnvironmentEdgeManager.currentTime();
 
+  // Last time we asked to roll the log due to a slow sync
+  private volatile long lastTimeCheckSlowSync = EnvironmentEdgeManager.currentTime();
 
   /**
    * WAL Comparator; it compares the timestamp (log filenum), present in the log file name.
@@ -540,11 +560,16 @@ public class FSHLog implements WAL {
     // rollWriter sets this.hdfs_out if it can.
     rollWriter();
 
-    this.slowSyncNs =
-        1000000 * conf.getInt("hbase.regionserver.hlog.slowsync.ms",
-          DEFAULT_SLOW_SYNC_TIME_MS);
-    this.walSyncTimeout = conf.getLong("hbase.regionserver.hlog.sync.timeout",
-        DEFAULT_WAL_SYNC_TIMEOUT_MS);
+    this.slowSyncNs = TimeUnit.MILLISECONDS.toNanos(conf.getInt(SLOW_SYNC_TIME_MS,
+      conf.getInt("hbase.regionserver.hlog.slowsync.ms", DEFAULT_SLOW_SYNC_TIME_MS)));
+    this.rollOnSyncNs = TimeUnit.MILLISECONDS.toNanos(conf.getInt(ROLL_ON_SYNC_TIME_MS,
+      DEFAULT_ROLL_ON_SYNC_TIME_MS));
+    this.slowSyncRollThreshold = conf.getInt(SLOW_SYNC_ROLL_THRESHOLD,
+      DEFAULT_SLOW_SYNC_ROLL_THRESHOLD);
+    this.slowSyncCheckInterval = conf.getInt(SLOW_SYNC_ROLL_INTERVAL_MS,
+      DEFAULT_SLOW_SYNC_ROLL_INTERVAL_MS);
+    this.walSyncTimeout = conf.getLong(WAL_SYNC_TIMEOUT_MS,
+      conf.getLong("hbase.regionserver.hlog.sync.timeout", DEFAULT_WAL_SYNC_TIMEOUT_MS));
 
     // This is the 'writer' -- a single threaded executor.  This single thread 'consumes' what is
     // put on the ring buffer.
@@ -720,6 +745,11 @@ public class FSHLog implements WAL {
         // NewPath could be equal to oldPath if replaceWriter fails.
         newPath = replaceWriter(oldPath, newPath, nextWriter, nextHdfsOut);
         tellListenersAboutPostLogRoll(oldPath, newPath);
+        // Reset rollRequested status
+        rollRequested = false;
+        // We got a new writer, so reset the slow sync count
+        lastTimeCheckSlowSync = EnvironmentEdgeManager.currentTime();
+        slowSyncCount.set(0);
         // Can we delete any of the old log files?
         if (getNumRolledLogFiles() > 0) {
           cleanOldLogs();
@@ -1303,8 +1333,11 @@ public class FSHLog implements WAL {
             syncCount += releaseSyncFuture(takeSyncFuture, currentSequence, lastException);
             // Can we release other syncs?
             syncCount += releaseSyncFutures(currentSequence, lastException);
-            if (lastException != null) requestLogRoll();
-            else checkLogRoll();
+            if (lastException != null) {
+              requestLogRoll();
+            } else {
+              checkLogRoll();
+            }
           }
           postSync(System.nanoTime() - start, syncCount);
         } catch (InterruptedException e) {
@@ -1321,24 +1354,37 @@ public class FSHLog implements WAL {
    * Schedule a log roll if needed.
    */
   public void checkLogRoll() {
+    // If we have already requested a roll, do nothing
+    if (isLogRollRequested()) {
+      return;
+    }
     // Will return immediately if we are in the middle of a WAL log roll currently.
-    if (!rollWriterLock.tryLock()) return;
-    boolean lowReplication;
-    try {
-      lowReplication = checkLowReplication();
-    } finally {
-      rollWriterLock.unlock();
+    if (!rollWriterLock.tryLock()) {
+      return;
     }
     try {
-      if (lowReplication || (writer != null && writer.getLength() > logrollsize)) {
-        requestLogRoll(lowReplication);
+      if (checkLowReplication()) {
+        LOG.warn("Requesting log roll because of low replication, current pipeline: " +
+            Arrays.toString(getPipeLine()));
+        requestLogRoll(LOW_REPLICATION);
+      } else if (writer != null && writer.getLength() > logrollsize) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Requesting log roll because of file size threshold; length=" +
+            writer.getLength() + ", logrollsize=" + logrollsize);
+        }
+        requestLogRoll(SIZE);
+      } else if (checkSlowSync()) {
+        // We log this already in checkSlowSync
+        requestLogRoll(SLOW_SYNC);
       }
     } catch (IOException e) {
       LOG.warn("Writer.getLength() failed; continuing", e);
+    } finally {
+      rollWriterLock.unlock();
     }
   }
 
-  /*
+  /**
    * @return true if number of replicas for the WAL is lower than threshold
    */
   private boolean checkLowReplication() {
@@ -1389,6 +1435,41 @@ public class FSHLog implements WAL {
     return logRollNeeded;
   }
 
+  /**
+   * @return true if we exceeded the slow sync roll threshold over the last check
+   *              interval
+   */
+  private boolean checkSlowSync() {
+    boolean result = false;
+    long now = EnvironmentEdgeManager.currentTime();
+    long elapsedTime = now - lastTimeCheckSlowSync;
+    if (elapsedTime >= slowSyncCheckInterval) {
+      if (slowSyncCount.get() >= slowSyncRollThreshold) {
+        if (elapsedTime >= (2 * slowSyncCheckInterval)) {
+          // If two or more slowSyncCheckInterval have elapsed this is a corner case
+          // where a train of slow syncs almost triggered us but then there was a long
+          // interval from then until the one more that pushed us over. If so, we
+          // should do nothing and let the count reset.
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("checkSlowSync triggered but we decided to ignore it; " +
+              "count=" + slowSyncCount.get() + ", threshold=" + slowSyncRollThreshold +
+              ", elapsedTime=" + elapsedTime +  " ms, slowSyncCheckInterval=" +
+              slowSyncCheckInterval + " ms");
+          }
+          // Fall through to count reset below
+        } else {
+          LOG.warn("Requesting log roll because we exceeded slow sync threshold; count=" +
+            slowSyncCount.get() + ", threshold=" + slowSyncRollThreshold +
+            ", current pipeline: " + Arrays.toString(getPipeLine()));
+          result = true;
+        }
+      }
+      lastTimeCheckSlowSync = now;
+      slowSyncCount.set(0);
+    }
+    return result;
+  }
+
   private SyncFuture publishSyncOnRingBuffer(long sequence) {
     return publishSyncOnRingBuffer(sequence, null, false);
   }
@@ -1452,10 +1533,23 @@ public class FSHLog implements WAL {
     if (timeInNanos > this.slowSyncNs) {
       String msg =
           new StringBuilder().append("Slow sync cost: ")
-              .append(timeInNanos / 1000000).append(" ms, current pipeline: ")
+              .append(TimeUnit.NANOSECONDS.toMillis(timeInNanos))
+              .append(" ms, current pipeline: ")
               .append(Arrays.toString(getPipeLine())).toString();
       Trace.addTimelineAnnotation(msg);
       LOG.info(msg);
+      // A single sync took too long.
+      // Elsewhere in checkSlowSync, called from checkLogRoll, we will look at cumulative
+      // effects. Here we have a single data point that indicates we should take immediate
+      // action, so do so.
+      if (timeInNanos > this.rollOnSyncNs) {
+        LOG.warn("Requesting log roll because we exceeded slow sync threshold; time=" +
+          TimeUnit.NANOSECONDS.toMillis(timeInNanos) + " ms, threshold=" +
+          TimeUnit.NANOSECONDS.toMillis(rollOnSyncNs) + " ms, current pipeline: " +
+          Arrays.toString(getPipeLine()));
+        requestLogRoll(SLOW_SYNC);
+      }
+      slowSyncCount.incrementAndGet(); // it's fine to unconditionally increment this
     }
     if (!listeners.isEmpty()) {
       for (WALActionsListener listener : listeners) {
@@ -1539,15 +1633,24 @@ public class FSHLog implements WAL {
     }
   }
 
+  protected boolean isLogRollRequested() {
+    return rollRequested;
+  }
+
   // public only until class moves to o.a.h.h.wal
   public void requestLogRoll() {
-    requestLogRoll(false);
+    requestLogRoll(ERROR);
   }
 
-  private void requestLogRoll(boolean tooFewReplicas) {
+  private void requestLogRoll(final WALActionsListener.RollRequestReason reason) {
+    // If we have already requested a roll, don't do it again
+    if (rollRequested) {
+      return;
+    }
     if (!this.listeners.isEmpty()) {
+      rollRequested = true; // No point to assert this unless there is a registered listener
       for (WALActionsListener i: this.listeners) {
-        i.logRollRequested(tooFewReplicas);
+        i.logRollRequested(reason);
       }
     }
   }
@@ -1599,8 +1702,7 @@ public class FSHLog implements WAL {
 
   public static final long FIXED_OVERHEAD = ClassSize.align(
     ClassSize.OBJECT + (5 * ClassSize.REFERENCE) +
-    ClassSize.ATOMIC_INTEGER + Bytes.SIZEOF_INT + (3 * Bytes.SIZEOF_LONG));
-
+    ClassSize.ATOMIC_INTEGER + (3 * Bytes.SIZEOF_INT) + (4 * Bytes.SIZEOF_LONG));
 
   private static void split(final Configuration conf, final Path p) throws IOException {
     FileSystem fs = FSUtils.getWALFileSystem(conf);
@@ -2083,4 +2185,14 @@ public class FSHLog implements WAL {
   public long getLastTimeCheckLowReplication() {
     return this.lastTimeCheckLowReplication;
   }
+
+  @VisibleForTesting
+  Writer getWriter() {
+    return this.writer;
+  }
+
+  @VisibleForTesting
+  void setWriter(Writer writer) {
+    this.writer = writer;
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWAL.java
index 69a31cd..c2b6996 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWAL.java
@@ -72,10 +72,23 @@ public class MetricsWAL extends WALActionsListener.Base {
   }
 
   @Override
-  public void logRollRequested(boolean underReplicated) {
+  public void logRollRequested(WALActionsListener.RollRequestReason reason) {
     source.incrementLogRollRequested();
-    if (underReplicated) {
-      source.incrementLowReplicationLogRoll();
+    switch (reason) {
+      case ERROR:
+        source.incrementErrorLogRoll();
+        break;
+      case LOW_REPLICATION:
+        source.incrementLowReplicationLogRoll();
+        break;
+      case SIZE:
+        source.incrementSizeLogRoll();
+        break;
+      case SLOW_SYNC:
+        source.incrementSlowSyncLogRoll();
+        break;
+      default:
+        break;
     }
   }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java
index 60ab7b8..78da53f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java
@@ -34,6 +34,18 @@ import org.apache.hadoop.hbase.wal.WALKey;
 @InterfaceAudience.Private
 public interface WALActionsListener {
 
+  /** The reason for the log roll request. */
+  static enum RollRequestReason {
+    /** The length of the log exceeds the roll size threshold. */
+    SIZE,
+    /** Too few replicas in the writer pipeline. */
+    LOW_REPLICATION,
+    /** Too much time spent waiting for sync. */
+    SLOW_SYNC,
+    /** I/O or other error. */
+    ERROR
+  };
+
   /**
    * The WAL is going to be rolled. The oldPath can be null if this is
    * the first log file from the regionserver.
@@ -67,7 +79,7 @@ public interface WALActionsListener {
   /**
    * A request was made that the WAL be rolled.
    */
-  void logRollRequested(boolean tooFewReplicas);
+  void logRollRequested(RollRequestReason reason);
 
   /**
    * The WAL is about to close.
@@ -130,7 +142,7 @@ public interface WALActionsListener {
     public void postLogArchive(Path oldPath, Path newPath) throws IOException {}
 
     @Override
-    public void logRollRequested(boolean tooFewReplicas) {}
+    public void logRollRequested(RollRequestReason reason) {}
 
     @Override
     public void logCloseRequested() {}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
index 8931c08..adbd450 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
@@ -119,7 +119,7 @@ class DisabledWALProvider implements WALProvider {
     public byte[][] rollWriter() {
       if (!listeners.isEmpty()) {
         for (WALActionsListener listener : listeners) {
-          listener.logRollRequested(false);
+          listener.logRollRequested(WALActionsListener.RollRequestReason.ERROR);
         }
         for (WALActionsListener listener : listeners) {
           try {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java
index 68200d2..f799786 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.assertTrue;
 
 import java.io.EOFException;
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
@@ -46,6 +47,7 @@ import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HTable;
@@ -64,7 +66,9 @@ import org.apache.hadoop.hbase.util.JVMClusterUtil;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.DefaultWALProvider;
 import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALProvider.Writer;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -146,9 +150,15 @@ public class TestLogRolling  {
     TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1);
     // the namenode might still try to choose the recently-dead datanode
     // for a pipeline, so try to a new pipeline multiple times
-     TEST_UTIL.getConfiguration().setInt("dfs.client.block.write.retries", 30);
+    TEST_UTIL.getConfiguration().setInt("dfs.client.block.write.retries", 30);
     TEST_UTIL.getConfiguration().setInt("hbase.regionserver.hlog.tolerable.lowreplication", 2);
     TEST_UTIL.getConfiguration().setInt("hbase.regionserver.hlog.lowreplication.rolllimit", 3);
+
+    // For slow sync threshold test: roll after 5 slow syncs in 10 seconds
+    TEST_UTIL.getConfiguration().setInt(FSHLog.SLOW_SYNC_ROLL_THRESHOLD, 5);
+    TEST_UTIL.getConfiguration().setInt(FSHLog.SLOW_SYNC_ROLL_INTERVAL_MS, 10 * 1000);
+    // For slow sync threshold test: roll once after a sync above this threshold
+    TEST_UTIL.getConfiguration().setInt(FSHLog.ROLL_ON_SYNC_TIME_MS, 5000);
   }
 
   @Before
@@ -223,19 +233,187 @@ public class TestLogRolling  {
     LOG.info("after writing there are " + DefaultWALProvider.getNumRolledLogFiles(log) +
         " log files");
 
-      // flush all regions
-      for (Region r: server.getOnlineRegionsLocalContext()) {
-        r.flush(true);
-      }
+    // flush all regions
+    for (Region r: server.getOnlineRegionsLocalContext()) {
+      r.flush(true);
+    }
 
-      // Now roll the log
-      log.rollWriter();
+    // Now roll the log
+    log.rollWriter();
 
     int count = DefaultWALProvider.getNumRolledLogFiles(log);
     LOG.info("after flushing all regions and rolling logs there are " + count + " log files");
       assertTrue(("actual count: " + count), count <= 2);
   }
 
+  @Test
+  public void testSlowSyncLogRolling() throws Exception {
+    // Create the test table
+    HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(getName()));
+    desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
+    admin.createTable(desc);
+    Table table = TEST_UTIL.getConnection().getTable(desc.getTableName());
+    int row = 1;
+    try {
+      assertTrue(((HTable) table).isAutoFlush());
+
+      // Get a reference to the FSHLog
+      server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName());
+      HRegionInfo region = server.getOnlineRegions(desc.getTableName()).get(0).getRegionInfo();
+      final FSHLog log = (FSHLog) server.getWAL(region);
+
+      // Register a WALActionsListener to observe if a SLOW_SYNC roll is requested
+
+      final AtomicBoolean slowSyncHookCalled = new AtomicBoolean();
+      log.registerWALActionsListener(new WALActionsListener.Base() {
+        @Override
+        public void logRollRequested(WALActionsListener.RollRequestReason reason) {
+          switch (reason) {
+            case SLOW_SYNC:
+              slowSyncHookCalled.lazySet(true);
+              break;
+            default:
+              break;
+          }
+        }
+      });
+
+      // Write some data
+
+      for (int i = 0; i < 10; i++) {
+        writeData(table, row++);
+      }
+
+      assertFalse("Should not have triggered log roll due to SLOW_SYNC",
+        slowSyncHookCalled.get());
+
+      // Set up for test
+      slowSyncHookCalled.set(false);
+
+      // Wrap the current writer with the anonymous class below that adds 200 ms of
+      // latency to any sync on the hlog. This should be more than sufficient to trigger
+      // slow sync warnings.
+      final Writer oldWriter1 = log.getWriter();
+      final Writer newWriter1 = new Writer() {
+        @Override
+        public void close() throws IOException {
+          oldWriter1.close();
+        }
+        @Override
+        public void sync(boolean forceSync) throws IOException {
+          try {
+            Thread.sleep(200);
+          } catch (InterruptedException e) {
+            InterruptedIOException ex = new InterruptedIOException();
+            ex.initCause(e);
+            throw ex;
+          }
+          oldWriter1.sync(forceSync);
+        }
+        @Override
+        public void append(Entry entry) throws IOException {
+          oldWriter1.append(entry);
+        }
+        @Override
+        public long getLength() throws IOException {
+          return oldWriter1.getLength();
+        }
+      };
+      log.setWriter(newWriter1);
+
+      // Write some data.
+      // We need to write at least 5 times, but double it. We should only request
+      // a SLOW_SYNC roll once in the current interval.
+      for (int i = 0; i < 10; i++) {
+        writeData(table, row++);
+      }
+
+      // Wait for our wait injecting writer to get rolled out, as needed.
+
+      TEST_UTIL.waitFor(10000, 100, new Waiter.ExplainingPredicate<Exception>() {
+        @Override
+        public boolean evaluate() throws Exception {
+          return log.getWriter() != newWriter1;
+        }
+        @Override
+        public String explainFailure() throws Exception {
+          return "Waited too long for our test writer to get rolled out";
+        }
+      });
+
+      assertTrue("Should have triggered log roll due to SLOW_SYNC",
+        slowSyncHookCalled.get());
+
+      // Set up for test
+      slowSyncHookCalled.set(false);
+
+      // Wrap the current writer with the anonymous class below that adds 5000 ms of
+      // latency to any sync on the hlog.
+      // This will trip the other threshold.
+      final Writer oldWriter2 = log.getWriter();
+      final Writer newWriter2 = new Writer() {
+        @Override
+        public void close() throws IOException {
+          oldWriter2.close();
+        }
+        @Override
+        public void sync(boolean forceSync) throws IOException {
+          try {
+            Thread.sleep(5000);
+          } catch (InterruptedException e) {
+            InterruptedIOException ex = new InterruptedIOException();
+            ex.initCause(e);
+            throw ex;
+          }
+          oldWriter2.sync(forceSync);
+        }
+        @Override
+        public void append(Entry entry) throws IOException {
+          oldWriter2.append(entry);
+        }
+        @Override
+        public long getLength() throws IOException {
+          return oldWriter2.getLength();
+        }
+      };
+      log.setWriter(newWriter2);
+
+      // Write some data. Should only take one sync.
+
+      writeData(table, row++);
+
+      // Wait for our wait injecting writer to get rolled out, as needed.
+
+      TEST_UTIL.waitFor(10000, 100, new Waiter.ExplainingPredicate<Exception>() {
+        @Override
+        public boolean evaluate() throws Exception {
+          return log.getWriter() != newWriter2;
+        }
+        @Override
+        public String explainFailure() throws Exception {
+          return "Waited too long for our test writer to get rolled out";
+        }
+      });
+
+      assertTrue("Should have triggered log roll due to SLOW_SYNC",
+        slowSyncHookCalled.get());
+
+      // Set up for test
+      slowSyncHookCalled.set(false);
+
+      // Write some data
+      for (int i = 0; i < 10; i++) {
+        writeData(table, row++);
+      }
+
+      assertFalse("Should not have triggered log roll due to SLOW_SYNC",
+        slowSyncHookCalled.get());
+
+    } finally {
+      table.close();
+    }
+  }
+
   private String getName() {
     return "TestLogRolling-" + name.getMethodName();
   }
@@ -316,12 +494,15 @@ public class TestLogRolling  {
     HRegionInfo region = server.getOnlineRegions(desc.getTableName()).get(0).getRegionInfo();
     final FSHLog log = (FSHLog) server.getWAL(region);
     final AtomicBoolean lowReplicationHookCalled = new AtomicBoolean(false);
-
     log.registerWALActionsListener(new WALActionsListener.Base() {
       @Override
-      public void logRollRequested(boolean lowReplication) {
-        if (lowReplication) {
-          lowReplicationHookCalled.lazySet(true);
+      public void logRollRequested(WALActionsListener.RollRequestReason reason) {
+        switch (reason) {
+          case LOW_REPLICATION:
+            lowReplicationHookCalled.lazySet(true);
+            break;
+          default:
+            break;
         }
       }
     });
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestMetricsWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestMetricsWAL.java
index 9a7d494..b36336c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestMetricsWAL.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestMetricsWAL.java
@@ -35,13 +35,21 @@ public class TestMetricsWAL {
   public void testLogRollRequested() throws Exception {
     MetricsWALSource source = mock(MetricsWALSourceImpl.class);
     MetricsWAL metricsWAL = new MetricsWAL(source);
-    metricsWAL.logRollRequested(false);
-    metricsWAL.logRollRequested(true);
+    metricsWAL.logRollRequested(WALActionsListener.RollRequestReason.ERROR);
+    metricsWAL.logRollRequested(WALActionsListener.RollRequestReason.LOW_REPLICATION);
+    metricsWAL.logRollRequested(WALActionsListener.RollRequestReason.SLOW_SYNC);
+    metricsWAL.logRollRequested(WALActionsListener.RollRequestReason.SIZE);
 
-    // Log roll was requested twice
-    verify(source, times(2)).incrementLogRollRequested();
+    // Log roll was requested four times
+    verify(source, times(4)).incrementLogRollRequested();
+    // One was because of an IO error.
+    verify(source, times(1)).incrementErrorLogRoll();
     // One was because of low replication on the hlog.
     verify(source, times(1)).incrementLowReplicationLogRoll();
+    // One was because of slow sync on the hlog.
+    verify(source, times(1)).incrementSlowSyncLogRoll();
+    // One was because of hlog file length limit.
+    verify(source, times(1)).incrementSizeLogRoll();
   }
 
   @Test
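
For readers who want to tune the new slow sync roll behavior, here is a minimal sketch
(illustrative only, not part of the patch above). The property keys and default values are
taken from the FSHLog constants added in this commit; the class name is hypothetical and the
values shown are simply the defaults.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    // Hypothetical example class, not part of this commit.
    public class SlowSyncRollDefaults {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        // A sync slower than this is logged and counted as a "slow sync".
        conf.setInt("hbase.regionserver.wal.slowsync.ms", 100);
        // Request a roll once this many slow syncs are seen ...
        conf.setInt("hbase.regionserver.wal.slowsync.roll.threshold", 100);
        // ... within this checking interval.
        conf.setInt("hbase.regionserver.wal.slowsync.roll.interval.ms", 60 * 1000);
        // Request a roll immediately if any single sync exceeds this time.
        conf.setInt("hbase.regionserver.wal.roll.on.sync.ms", 10 * 1000);
        // Overall WAL sync timeout; the old hbase.regionserver.hlog.sync.timeout
        // key is still honored as a fallback.
        conf.setLong("hbase.regionserver.wal.sync.timeout", 5 * 60 * 1000L);
      }
    }

With these defaults, a single sync slower than 10 seconds requests an immediate SLOW_SYNC
roll, while 100 syncs slower than 100 ms within a 60 second window request one as well.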
