HBASE-20129 Add UT for serial replication checker

Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/8b61a061
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/8b61a061
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/8b61a061

Branch: refs/heads/HBASE-20046-branch-2
Commit: 8b61a061d3d8cdf9d4bd271cc6211d669e925e60
Parents: f29bf1d
Author: zhangduo <zhang...@apache.org>
Authored: Tue Mar 6 08:40:31 2018 +0800
Committer: zhangduo <zhang...@apache.org>
Committed: Mon Apr 9 15:18:44 2018 +0800

----------------------------------------------------------------------
 .../apache/hadoop/hbase/MetaTableAccessor.java  |  71 ++++++++--
 .../regionserver/SerialReplicationChecker.java  |  18 +++
 .../TestSerialReplicationChecker.java           | 133 ++++++++++++++++++-
 3 files changed, 208 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/8b61a061/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
index 109f2d0..2a88b56 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
@@ -19,12 +19,14 @@ package org.apache.hadoop.hbase;
 
 import edu.umd.cs.findbugs.annotations.NonNull;
 import edu.umd.cs.findbugs.annotations.Nullable;
+import java.io.ByteArrayOutputStream;
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -35,7 +37,6 @@ import java.util.TreeMap;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
-import java.util.stream.Stream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell.Type;
 import org.apache.hadoop.hbase.client.Connection;
@@ -150,11 +151,13 @@ public class MetaTableAccessor {
       META_REGION_PREFIX, 0, len);
   }
 
-  private static final byte[] REPLICATION_PARENT_QUALIFIER = 
Bytes.toBytes("parent");
+  @VisibleForTesting
+  public static final byte[] REPLICATION_PARENT_QUALIFIER = 
Bytes.toBytes("parent");
+
+  private static final byte ESCAPE_BYTE = (byte) 0xFF;
 
-  private static final String REPLICATION_PARENT_SEPARATOR = "|";
+  private static final byte SEPARATED_BYTE = 0x00;
 
-  private static final String REPLICATION_PARENT_SEPARATOR_REGEX = "\\|";
   /**
    * Lists all of the table regions currently in META.
    * Deprecated, keep there until some test use this.
@@ -1921,10 +1924,51 @@ public class MetaTableAccessor {
               .build());
   }
 
+  /**
+   * Appends {@code regionName} to {@code out}, writing every {@code ESCAPE_BYTE} it contains
+   * twice so that a literal escape byte can not be confused with the {@code ESCAPE_BYTE},
+   * {@code SEPARATED_BYTE} pair that {@code getParentsBytes} puts between region names.
+   */
+  private static void writeRegionName(ByteArrayOutputStream out, byte[] regionName) {
+    for (int pos = 0; pos < regionName.length; pos++) {
+      byte current = regionName[pos];
+      if (current == ESCAPE_BYTE) {
+        // emit the escape marker first, the byte itself is written below
+        out.write(ESCAPE_BYTE);
+      }
+      out.write(current);
+    }
+  }
+
+  /**
+   * Serializes the region names of {@code parents} into one byte array. Names are written in
+   * iteration order, each one escaped by {@code writeRegionName}, and consecutive names are
+   * joined with the two byte sequence {@code ESCAPE_BYTE}, {@code SEPARATED_BYTE}.
+   * @param parents the parent regions, may be empty
+   * @return the encoded region names, or an empty array when {@code parents} is empty
+   */
+  @VisibleForTesting
+  public static byte[] getParentsBytes(List<RegionInfo> parents) {
+    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+    Iterator<RegionInfo> iter = parents.iterator();
+    // Guard the first next() call, an empty list used to fail with NoSuchElementException.
+    if (!iter.hasNext()) {
+      return bos.toByteArray();
+    }
+    writeRegionName(bos, iter.next().getRegionName());
+    while (iter.hasNext()) {
+      // separator between two region names
+      bos.write(ESCAPE_BYTE);
+      bos.write(SEPARATED_BYTE);
+      writeRegionName(bos, iter.next().getRegionName());
+    }
+    return bos.toByteArray();
+  }
+
+  /**
+   * Decodes a value produced by {@code getParentsBytes} back into the individual region names.
+   * <p>
+   * {@code ESCAPE_BYTE} followed by {@code SEPARATED_BYTE} ends the current name, while
+   * {@code ESCAPE_BYTE} followed by any other byte stands for that other byte itself. Bytes left
+   * over after the last separator form the final name; nothing is added when none are left.
+   * @param bytes the encoded parent region names
+   * @return the decoded region names, in the order they appear in {@code bytes}
+   * @throws IllegalArgumentException if {@code bytes} ends with an unpaired {@code ESCAPE_BYTE}
+   */
+  private static List<byte[]> parseParentsBytes(byte[] bytes) {
+    List<byte[]> parents = new ArrayList<>();
+    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+    for (int i = 0; i < bytes.length; i++) {
+      if (bytes[i] == ESCAPE_BYTE) {
+        i++;
+        if (i == bytes.length) {
+          // A trailing escape byte escapes nothing, reading bytes[i] would be out of bounds.
+          throw new IllegalArgumentException(
+            "Malformed parents value, dangling escape byte at the end: " +
+              Bytes.toStringBinary(bytes));
+        }
+        if (bytes[i] == SEPARATED_BYTE) {
+          parents.add(bos.toByteArray());
+          bos.reset();
+          continue;
+        }
+        // fall through to append the byte
+      }
+      bos.write(bytes[i]);
+    }
+    if (bos.size() > 0) {
+      parents.add(bos.toByteArray());
+    }
+    return parents;
+  }
+
   private static void addReplicationParent(Put put, List<RegionInfo> parents) 
throws IOException {
-    byte[] value = 
parents.stream().map(RegionReplicaUtil::getRegionInfoForDefaultReplica)
-      .map(RegionInfo::getRegionNameAsString).collect(Collectors
-        .collectingAndThen(Collectors.joining(REPLICATION_PARENT_SEPARATOR), 
Bytes::toBytes));
+    byte[] value = getParentsBytes(parents);
     
put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(put.getRow())
       
.setFamily(HConstants.REPLICATION_BARRIER_FAMILY).setQualifier(REPLICATION_PARENT_QUALIFIER)
       
.setTimestamp(put.getTimeStamp()).setType(Type.Put).setValue(value).build());
@@ -1995,6 +2039,14 @@ public class MetaTableAccessor {
     public List<byte[]> getParentRegionNames() {
       return parentRegionNames;
     }
+
+    /**
+     * Renders the barriers, the region state and the parent region names (in binary-safe
+     * string form) for logging.
+     */
+    @Override
+    public String toString() {
+      String parentNames = parentRegionNames.stream().map(Bytes::toStringBinary)
+        .collect(Collectors.joining(", "));
+      StringBuilder sb = new StringBuilder("ReplicationBarrierResult [barriers=");
+      sb.append(Arrays.toString(barriers));
+      sb.append(", state=").append(state);
+      sb.append(", parentRegionNames=").append(parentNames);
+      sb.append("]");
+      return sb.toString();
+    }
   }
 
   private static long getReplicationBarrier(Cell c) {
@@ -2014,10 +2066,7 @@ public class MetaTableAccessor {
     byte[] parentRegionsBytes =
       result.getValue(HConstants.REPLICATION_BARRIER_FAMILY, 
REPLICATION_PARENT_QUALIFIER);
     List<byte[]> parentRegionNames =
-      parentRegionsBytes != null
-        ? 
Stream.of(Bytes.toString(parentRegionsBytes).split(REPLICATION_PARENT_SEPARATOR_REGEX))
-          .map(Bytes::toBytes).collect(Collectors.toList())
-        : Collections.emptyList();
+      parentRegionsBytes != null ? parseParentsBytes(parentRegionsBytes) : 
Collections.emptyList();
     return new ReplicationBarrierResult(barriers, state, parentRegionNames);
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/8b61a061/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationChecker.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationChecker.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationChecker.java
index 95f3868..9276359 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationChecker.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationChecker.java
@@ -35,6 +35,8 @@ import 
org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
 import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
@@ -108,6 +110,8 @@ import 
org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;
 @InterfaceAudience.Private
 class SerialReplicationChecker {
 
+  private static final Logger LOG = 
LoggerFactory.getLogger(SerialReplicationChecker.class);
+
   public static final String REPLICATION_SERIALLY_WAITING_KEY =
     "hbase.serial.replication.waiting.ms";
   public static final long REPLICATION_SERIALLY_WAITING_DEFAULT = 10000;
@@ -182,9 +186,11 @@ class SerialReplicationChecker {
     long seqId = entry.getKey().getSequenceId();
     ReplicationBarrierResult barrierResult = 
MetaTableAccessor.getReplicationBarrierResult(conn,
       entry.getKey().getTableName(), row, 
entry.getKey().getEncodedRegionName());
+    LOG.debug("Replication barrier for {}: {}", entry, barrierResult);
     long[] barriers = barrierResult.getBarriers();
     int index = Arrays.binarySearch(barriers, seqId);
     if (index == -1) {
+      LOG.debug("{} is before the first barrier, pass", entry);
       // This means we are in the range before the first record openSeqNum, 
this usually because the
       // wal is written before we enable serial replication for this table, 
just return true since
       // we can not guarantee the order.
@@ -203,22 +209,29 @@ class SerialReplicationChecker {
       // we are in the first range, check whether we have parents
       for (byte[] regionName : barrierResult.getParentRegionNames()) {
         if (!isParentFinished(regionName)) {
+          LOG.debug("Parent {} has not been finished yet for entry {}, give 
up",
+            Bytes.toStringBinary(regionName), entry);
           return false;
         }
       }
       if (isLastRangeAndOpening(barrierResult, index)) {
+        LOG.debug("{} is in the last range and the region is opening, give 
up", entry);
         return false;
       }
+      LOG.debug("{} is in the first range, pass", entry);
       recordCanPush(encodedNameAsString, seqId, barriers, 1);
       return true;
     }
     // check whether the previous range is finished
     if (!isRangeFinished(barriers[index - 1], encodedNameAsString)) {
+      LOG.debug("Previous range for {} has not been finished yet, give up", 
entry);
       return false;
     }
     if (isLastRangeAndOpening(barrierResult, index)) {
+      LOG.debug("{} is in the last range and the region is opening, give up", 
entry);
       return false;
     }
+    LOG.debug("The previous range for {} has been finished, pass", entry);
     recordCanPush(encodedNameAsString, seqId, barriers, index);
     return true;
   }
@@ -229,8 +242,11 @@ class SerialReplicationChecker {
     Long canReplicateUnderSeqId = 
canPushUnder.getIfPresent(encodedNameAsString);
     if (canReplicateUnderSeqId != null) {
       if (seqId < canReplicateUnderSeqId.longValue()) {
+        LOG.trace("{} is before the end barrier {}, pass", entry, 
canReplicateUnderSeqId);
         return true;
       }
+      LOG.debug("{} is beyond the previous end barrier {}, remove from cache", 
entry,
+        canReplicateUnderSeqId);
       // we are already beyond the last safe point, remove
       canPushUnder.invalidate(encodedNameAsString);
     }
@@ -239,6 +255,7 @@ class SerialReplicationChecker {
     // has been moved to another RS and then back, so we need to check the 
barrier.
     MutableLong previousPushedSeqId = pushed.getUnchecked(encodedNameAsString);
     if (seqId == previousPushedSeqId.longValue() + 1) {
+      LOG.trace("The sequence id for {} is continuous, pass", entry);
       previousPushedSeqId.increment();
       return true;
     }
@@ -249,6 +266,7 @@ class SerialReplicationChecker {
       throws IOException, InterruptedException {
     byte[] row = CellUtil.cloneRow(firstCellInEdit);
     while (!canPush(entry, row)) {
+      LOG.debug("Can not push {}, wait", entry);
       Thread.sleep(waitTimeMs);
     }
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/8b61a061/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java
index c8387c5..58e9543 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java
@@ -19,10 +19,16 @@ package org.apache.hadoop.hbase.replication.regionserver;
 
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.Cell.Type;
 import org.apache.hadoop.hbase.CellBuilderFactory;
@@ -30,8 +36,10 @@ import org.apache.hadoop.hbase.CellBuilderType;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionInfoBuilder;
@@ -54,6 +62,8 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
 
@@ -72,6 +82,8 @@ public class TestSerialReplicationChecker {
 
   private static String WAL_FILE_NAME = "test.wal";
 
+  private Connection conn;
+
   private SerialReplicationChecker checker;
 
   @Rule
@@ -98,8 +110,18 @@ public class TestSerialReplicationChecker {
     ReplicationSource source = mock(ReplicationSource.class);
     when(source.getPeerId()).thenReturn(PEER_ID);
     when(source.getQueueStorage()).thenReturn(QUEUE_STORAGE);
+    conn = mock(Connection.class);
+    when(conn.isClosed()).thenReturn(false);
+    doAnswer(new Answer<Table>() {
+
+      @Override
+      public Table answer(InvocationOnMock invocation) throws Throwable {
+        return UTIL.getConnection().getTable((TableName) 
invocation.getArgument(0));
+      }
+
+    }).when(conn).getTable(any(TableName.class));
     Server server = mock(Server.class);
-    when(server.getConnection()).thenReturn(UTIL.getConnection());
+    when(server.getConnection()).thenReturn(conn);
     when(source.getServer()).thenReturn(server);
     checker = new SerialReplicationChecker(UTIL.getConfiguration(), source);
     tableName = TableName.valueOf(name.getMethodName());
@@ -129,8 +151,10 @@ public class TestSerialReplicationChecker {
   private void addStateAndBarrier(RegionInfo region, RegionState.State state, 
long... barriers)
       throws IOException {
     Put put = new Put(region.getRegionName(), 
EnvironmentEdgeManager.currentTime());
-    put.addColumn(HConstants.CATALOG_FAMILY, HConstants.STATE_QUALIFIER,
-      Bytes.toBytes(state.name()));
+    if (state != null) {
+      put.addColumn(HConstants.CATALOG_FAMILY, HConstants.STATE_QUALIFIER,
+        Bytes.toBytes(state.name()));
+    }
     for (int i = 0; i < barriers.length; i++) {
       put.addColumn(HConstants.REPLICATION_BARRIER_FAMILY, 
HConstants.SEQNUM_QUALIFIER,
         put.getTimeStamp() - i, Bytes.toBytes(barriers[i]));
@@ -154,6 +178,15 @@ public class TestSerialReplicationChecker {
       PEER_ID, WAL_FILE_NAME, 10, ImmutableMap.of(region.getEncodedName(), 
seqId));
   }
 
+  /**
+   * Records {@code parents} as the replication parents of {@code region} by writing the encoded
+   * parent region names into the row keyed by the region name in the meta table.
+   */
+  private void addParents(RegionInfo region, List<RegionInfo> parents) throws IOException {
+    byte[] encodedParents = MetaTableAccessor.getParentsBytes(parents);
+    Put parentPut = new Put(region.getRegionName(), EnvironmentEdgeManager.currentTime());
+    parentPut.addColumn(HConstants.REPLICATION_BARRIER_FAMILY,
+      MetaTableAccessor.REPLICATION_PARENT_QUALIFIER, encodedParents);
+    try (Table meta = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) {
+      meta.put(parentPut);
+    }
+  }
+
   @Test
   public void testLastRegionAndOpeningCanNotPush() throws IOException, 
ReplicationException {
     RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build();
@@ -173,4 +206,98 @@ public class TestSerialReplicationChecker {
     setState(region, RegionState.State.OPENING);
     assertFalse(checker.canPush(createEntry(region, 104), cell));
   }
+
+  @Test
+  public void testCanPushUnder() throws IOException, ReplicationException {
+    RegionInfo openRegion = RegionInfoBuilder.newBuilder(tableName).build();
+    addStateAndBarrier(openRegion, RegionState.State.OPEN, 10, 100);
+    updatePushedSeqId(openRegion, 9);
+    Cell firstCell = createCell(openRegion);
+    // the first check is expected to fetch a table through the connection exactly once
+    assertTrue(checker.canPush(createEntry(openRegion, 20), firstCell));
+    verify(conn, times(1)).getTable(any(TableName.class));
+    // sequence ids with gaps, all still below the barrier at 100
+    for (int seqId = 22; seqId < 100; seqId += 2) {
+      assertTrue(checker.canPush(createEntry(openRegion, seqId), firstCell));
+    }
+    // still a single getTable call, i.e. no further trips to the meta table
+    verify(conn, times(1)).getTable(any(TableName.class));
+  }
+
+  @Test
+  public void testCanPushIfContinuous() throws IOException, ReplicationException {
+    RegionInfo openRegion = RegionInfoBuilder.newBuilder(tableName).build();
+    addStateAndBarrier(openRegion, RegionState.State.OPEN, 10);
+    updatePushedSeqId(openRegion, 9);
+    Cell firstCell = createCell(openRegion);
+    // the first check is expected to fetch a table through the connection exactly once
+    assertTrue(checker.canPush(createEntry(openRegion, 20), firstCell));
+    verify(conn, times(1)).getTable(any(TableName.class));
+    // every following sequence id is exactly the previous one plus one
+    for (int seqId = 21; seqId < 100; seqId++) {
+      assertTrue(checker.canPush(createEntry(openRegion, seqId), firstCell));
+    }
+    // still a single getTable call, i.e. no further trips to the meta table
+    verify(conn, times(1)).getTable(any(TableName.class));
+  }
+
+  @Test
+  public void testCanPushAfterMerge() throws IOException, ReplicationException {
+    // The shared boundary key contains 0xFF, the escape byte used when storing region names,
+    // so this also exercises the escaping.
+    byte[] boundaryKey = new byte[] { (byte) 0xFF, 0x00, (byte) 0xFF, (byte) 0xFF, 0x01 };
+    RegionInfo parentA =
+      RegionInfoBuilder.newBuilder(tableName).setEndKey(boundaryKey).setRegionId(1).build();
+    RegionInfo parentB =
+      RegionInfoBuilder.newBuilder(tableName).setStartKey(boundaryKey).setRegionId(2).build();
+    RegionInfo merged = RegionInfoBuilder.newBuilder(tableName).setRegionId(3).build();
+    addStateAndBarrier(parentA, null, 10, 100);
+    addStateAndBarrier(parentB, null, 20, 200);
+    addStateAndBarrier(merged, RegionState.State.OPEN, 200);
+    addParents(merged, Arrays.asList(parentA, parentB));
+    Cell firstCell = createCell(merged);
+    // neither parent has been finished, so the merged region must wait
+    assertFalse(checker.canPush(createEntry(merged, 300), firstCell));
+    updatePushedSeqId(parentB, 199);
+    // parentB is finished now, but parentA is still pending
+    assertFalse(checker.canPush(createEntry(merged, 300), firstCell));
+    updatePushedSeqId(parentA, 99);
+    // both parents are finished, the merged region may push
+    assertTrue(checker.canPush(createEntry(merged, 300), firstCell));
+  }
+
+  @Test
+  public void testCanPushAfterSplit() throws IOException, ReplicationException {
+    // The split key contains 0xFF, the escape byte used when storing region names, so this
+    // also exercises the escaping.
+    byte[] splitKey = new byte[] { (byte) 0xFF, 0x00, (byte) 0xFF, (byte) 0xFF, 0x01 };
+    RegionInfo parent = RegionInfoBuilder.newBuilder(tableName).setRegionId(1).build();
+    RegionInfo daughterA =
+      RegionInfoBuilder.newBuilder(tableName).setEndKey(splitKey).setRegionId(2).build();
+    RegionInfo daughterB =
+      RegionInfoBuilder.newBuilder(tableName).setStartKey(splitKey).setRegionId(3).build();
+    addStateAndBarrier(parent, null, 10, 100);
+    addStateAndBarrier(daughterA, RegionState.State.OPEN, 100, 200);
+    addStateAndBarrier(daughterB, RegionState.State.OPEN, 100, 300);
+    addParents(daughterA, Arrays.asList(parent));
+    addParents(daughterB, Arrays.asList(parent));
+    Cell firstCellA = createCell(daughterA);
+    Cell firstCellB = createCell(daughterB);
+    // the parent has not been finished, so neither daughter may push
+    assertFalse(checker.canPush(createEntry(daughterA, 150), firstCellA));
+    assertFalse(checker.canPush(createEntry(daughterB, 200), firstCellB));
+    updatePushedSeqId(parent, 99);
+    // the parent is finished, both daughters may push
+    assertTrue(checker.canPush(createEntry(daughterA, 150), firstCellA));
+    assertTrue(checker.canPush(createEntry(daughterB, 200), firstCellB));
+  }
+
+  @Test
+  public void testCanPushEqualsToBarrier() throws IOException, ReplicationException {
+    // A binary search returns a non-negative index when the key equals an element, so make
+    // sure a sequence id that is exactly equal to a barrier is handled correctly.
+    RegionInfo openRegion = RegionInfoBuilder.newBuilder(tableName).build();
+    addStateAndBarrier(openRegion, RegionState.State.OPEN, 10, 100);
+    Cell firstCell = createCell(openRegion);
+    assertTrue(checker.canPush(createEntry(openRegion, 10), firstCell));
+    assertFalse(checker.canPush(createEntry(openRegion, 100), firstCell));
+    updatePushedSeqId(openRegion, 99);
+    assertTrue(checker.canPush(createEntry(openRegion, 100), firstCell));
+  }
 }

Reply via email to