Repository: hbase
Updated Branches:
  refs/heads/master 1339ff966 -> 773aff90f


HBASE-20417 Do not read wal entries when peer is disabled


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/773aff90
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/773aff90
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/773aff90

Branch: refs/heads/master
Commit: 773aff90fdf9edf4687b373d1c7ba859d166a437
Parents: 1339ff9
Author: zhangduo <zhang...@apache.org>
Authored: Sat Apr 14 22:00:15 2018 +0800
Committer: zhangduo <zhang...@apache.org>
Committed: Mon Apr 16 22:07:33 2018 +0800

----------------------------------------------------------------------
 .../regionserver/ReplicationSourceShipper.java  | 15 ++---
 .../ReplicationSourceWALReader.java             |  4 ++
 .../regionserver/TestWALEntryStream.java        | 59 +++++++++++++++++++-
 3 files changed, 68 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/773aff90/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
index 2097d00..11fd660 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
@@ -87,12 +87,11 @@ public class ReplicationSourceShipper extends Thread {
     setWorkerState(WorkerState.RUNNING);
     // Loop until we close down
     while (isActive()) {
-      int sleepMultiplier = 1;
       // Sleep until replication is enabled again
       if (!source.isPeerEnabled()) {
-        if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
-          sleepMultiplier++;
-        }
+        // The peer enabled check is in memory, not expensive, so do not need to increase the
+        // sleep interval as it may cause a long lag when we enable the peer.
+        sleepForRetries("Replication is disabled", 1);
         continue;
       }
       try {
@@ -188,8 +187,8 @@ public class ReplicationSourceShipper extends Thread {
         }
         break;
       } catch (Exception ex) {
-        LOG.warn(source.getReplicationEndpoint().getClass().getName() + " threw unknown exception:"
-            + org.apache.hadoop.util.StringUtils.stringifyException(ex));
+        LOG.warn("{} threw unknown exception:",
+          source.getReplicationEndpoint().getClass().getName(), ex);
         if (sleepForRetries("ReplicationEndpoint threw exception", sleepMultiplier)) {
           sleepMultiplier++;
         }
@@ -292,9 +291,7 @@ public class ReplicationSourceShipper extends Thread {
    */
   public boolean sleepForRetries(String msg, int sleepMultiplier) {
     try {
-      if (LOG.isTraceEnabled()) {
-        LOG.trace(msg + ", sleeping " + sleepForRetries + " times " + sleepMultiplier);
-      }
+      LOG.trace("{}, sleeping {} times {}", msg, sleepForRetries, sleepMultiplier);
       Thread.sleep(this.sleepForRetries * sleepMultiplier);
     } catch (InterruptedException e) {
       LOG.debug("Interrupted while sleeping between retries");

http://git-wip-us.apache.org/repos/asf/hbase/blob/773aff90/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index 7ba347f..64fd48d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -126,6 +126,10 @@ class ReplicationSourceWALReader extends Thread {
               source.getWALFileLengthProvider(), source.getServerWALsBelongTo(),
               source.getSourceMetrics())) {
         while (isReaderRunning()) { // loop here to keep reusing stream while we can
+          if (!source.isPeerEnabled()) {
+            Threads.sleep(sleepForRetries);
+            continue;
+          }
           if (!checkQuota()) {
             continue;
           }

http://git-wip-us.apache.org/repos/asf/hbase/blob/773aff90/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
index 2670756..35e4f82 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
@@ -30,7 +30,12 @@ import java.io.IOException;
 import java.util.NavigableMap;
 import java.util.OptionalLong;
 import java.util.TreeMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.Future;
 import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -328,7 +333,7 @@ public class TestWALEntryStream {
     }
   }
 
-  private ReplicationSourceWALReader createReader(boolean recovered, Configuration conf) {
+  private ReplicationSource mockReplicationSource(boolean recovered, Configuration conf) {
     ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class);
     when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
     Server mockServer = Mockito.mock(Server.class);
@@ -338,6 +343,12 @@ public class TestWALEntryStream {
     when(source.getWALFileLengthProvider()).thenReturn(log);
     when(source.getServer()).thenReturn(mockServer);
     when(source.isRecovered()).thenReturn(recovered);
+    return source;
+  }
+
+  private ReplicationSourceWALReader createReader(boolean recovered, Configuration conf) {
+    ReplicationSource source = mockReplicationSource(recovered, conf);
+    when(source.isPeerEnabled()).thenReturn(true);
     ReplicationSourceWALReader reader =
       new ReplicationSourceWALReader(fs, conf, walQueue, 0, getDummyFilter(), source);
     reader.start();
@@ -460,6 +471,52 @@ public class TestWALEntryStream {
     assertFalse(entryBatch.isEndOfFile());
   }
 
+  @Test
+  public void testReplicationSourceWALReaderDisabled()
+      throws IOException, InterruptedException, ExecutionException {
+    appendEntriesToLogAndSync(3);
+    // get ending position
+    long position;
+    try (WALEntryStream entryStream =
+      new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) {
+      entryStream.next();
+      entryStream.next();
+      entryStream.next();
+      position = entryStream.getPosition();
+    }
+
+    // start up a reader
+    Path walPath = walQueue.peek();
+    ReplicationSource source = mockReplicationSource(false, CONF);
+    AtomicInteger invokeCount = new AtomicInteger(0);
+    AtomicBoolean enabled = new AtomicBoolean(false);
+    when(source.isPeerEnabled()).then(i -> {
+      invokeCount.incrementAndGet();
+      return enabled.get();
+    });
+
+    ReplicationSourceWALReader reader =
+      new ReplicationSourceWALReader(fs, CONF, walQueue, 0, getDummyFilter(), source);
+    reader.start();
+    Future<WALEntryBatch> future = ForkJoinPool.commonPool().submit(() -> {
+      return reader.take();
+    });
+    // make sure that the isPeerEnabled has been called several times
+    TEST_UTIL.waitFor(30000, () -> invokeCount.get() >= 5);
+    // confirm that we can read nothing if the peer is disabled
+    assertFalse(future.isDone());
+    // then enable the peer, we should get the batch
+    enabled.set(true);
+    WALEntryBatch entryBatch = future.get();
+
+    // should've batched up our entries
+    assertNotNull(entryBatch);
+    assertEquals(3, entryBatch.getWalEntries().size());
+    assertEquals(position, entryBatch.getLastWalPosition());
+    assertEquals(walPath, entryBatch.getLastWalPath());
+    assertEquals(3, entryBatch.getNbRowKeys());
+  }
+
   private String getRow(WAL.Entry entry) {
     Cell cell = entry.getEdit().getCells().get(0);
     return Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());

Reply via email to