HBASE-20125 Add UT for serial replication after region split and merge

Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/6b541275
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/6b541275
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/6b541275

Branch: refs/heads/branch-2
Commit: 6b5412759fdc2accc15f2fb4dd3ed0138f34e555
Parents: 8b61a06
Author: zhangduo <zhang...@apache.org>
Authored: Tue Mar 6 21:31:05 2018 +0800
Committer: zhangduo <zhang...@apache.org>
Committed: Mon Apr 9 15:18:44 2018 +0800

----------------------------------------------------------------------
 .../hbase/replication/WALEntryFilter.java       |  17 +-
 .../regionserver/ReplicationSourceShipper.java  |   4 +-
 .../ReplicationSourceWALReader.java             |  54 +++--
 .../regionserver/WALEntryStream.java            |  73 ++++---
 .../replication/TestSerialReplication.java      | 200 ++++++++++++++++---
 5 files changed, 270 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/6b541275/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALEntryFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALEntryFilter.java
index 417f868..cd3f1bd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALEntryFilter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALEntryFilter.java
@@ -15,7 +15,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.hadoop.hbase.replication;
 
 import org.apache.yetus.audience.InterfaceAudience;
@@ -35,12 +34,20 @@ import org.apache.hadoop.hbase.wal.WAL.Entry;
  */
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
 public interface WALEntryFilter {
+
   /**
-   * Applies the filter, possibly returning a different Entry instance.
-   * If null is returned, the entry will be skipped.
+   * <p>
+   * Applies the filter, possibly returning a different Entry instance. If null is returned, the
+   * entry will be skipped.
+   * </p>
+   * <p>
+   * Notice that you are free to modify the cell list of the given entry, but do not change the
+   * content of the cells; they may be used by others at the same time (and usually you cannot
+   * modify a cell unless you cast it to the implementation class, which is not a good idea).
+   * </p>
    * @param entry Entry to filter
-   * @return a (possibly modified) Entry to use. Returning null or an entry with
-   * no cells will cause the entry to be skipped for replication.
+   * @return a (possibly modified) Entry to use. Returning null or an entry with no cells will
+   *         cause the entry to be skipped for replication.
    */
   public Entry filter(Entry entry);
 }
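
A minimal filter honoring the clarified contract might look like the sketch below. The
class name and the excluded family are hypothetical, not part of this commit; the point
is that it only mutates the cell list (never the cells themselves) and returns null once
the entry is empty so the entry is skipped.

import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL.Entry;

// Hypothetical example, not part of this commit: drops all cells of one
// column family by editing the cell list, which the new javadoc permits.
public class DropFamilyWALEntryFilter implements WALEntryFilter {

  private static final byte[] EXCLUDED_FAMILY = Bytes.toBytes("excluded");

  @Override
  public Entry filter(Entry entry) {
    List<Cell> cells = entry.getEdit().getCells();
    // Allowed: modify the cell list. Not allowed: modify the cells it holds.
    cells.removeIf(cell -> CellUtil.matchingFamily(cell, EXCLUDED_FAMILY));
    // Returning null (or leaving no cells) causes the entry to be skipped.
    return cells.isEmpty() ? null : entry;
  }
}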

http://git-wip-us.apache.org/repos/asf/hbase/blob/6b541275/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
index d207d77..50aaf95 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
@@ -120,7 +120,7 @@ public class ReplicationSourceShipper extends Thread {
   /**
    * Do the shipping logic
    */
-  protected void shipEdits(WALEntryBatch entryBatch) {
+  protected final void shipEdits(WALEntryBatch entryBatch) {
     List<Entry> entries = entryBatch.getWalEntries();
     long lastReadPosition = entryBatch.getLastWalPosition();
     currentPath = entryBatch.getLastWalPath();
@@ -253,7 +253,7 @@ public class ReplicationSourceShipper extends Thread {
     return 0;
   }
 
-  protected boolean isActive() {
+  protected final boolean isActive() {
     return source.isSourceActive() && state == WorkerState.RUNNING && !isInterrupted();
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/6b541275/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index fe87aec..ad3baaf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -71,6 +71,13 @@ public class ReplicationSourceWALReader extends Thread {
   private final int maxRetriesMultiplier;
   private final boolean eofAutoRecovery;
 
+  // Used to store the first cell in an entry before filtering. This is because, if serial
+  // replication is enabled, we may find out that an entry cannot be pushed after filtering. And
+  // when we try the next time, the cells may be null since the entry has already been filtered,
+  // especially for region event wal entries. This can also be used to determine whether we can
+  // skip filtering.
+  private Cell firstCellInEntryBeforeFiltering;
+
   //Indicates whether this particular worker is running
   private boolean isReaderRunning = true;
 
@@ -162,37 +169,52 @@ public class ReplicationSourceWALReader extends Thread {
     }
   }
 
+  private void removeEntryFromStream(WALEntryStream entryStream, WALEntryBatch batch)
+      throws IOException {
+    entryStream.next();
+    firstCellInEntryBeforeFiltering = null;
+    batch.setLastWalPosition(entryStream.getPosition());
+  }
+
   private WALEntryBatch readWALEntries(WALEntryStream entryStream)
       throws IOException, InterruptedException {
     if (!entryStream.hasNext()) {
       return null;
     }
+    long positionBefore = entryStream.getPosition();
     WALEntryBatch batch =
       new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath());
     do {
       Entry entry = entryStream.peek();
-      batch.setLastWalPosition(entryStream.getPosition());
      boolean hasSerialReplicationScope = entry.getKey().hasSerialReplicationScope();
-      // Used to locate the region record in meta table. In WAL we only have the table name and
-      // encoded region name which can not be mapping to region name without scanning all the
-      // records for a table, so we need a start key, just like what we have done at client side
-      // when locating a region. For the markers, we will use the start key of the region as the row
-      // key for the edit. And we need to do this before filtering since all the cells may be
-      // filtered out, especially that for the markers.
-      Cell firstCellInEdit = null;
+      boolean doFiltering = true;
       if (hasSerialReplicationScope) {
-        assert !entry.getEdit().isEmpty() : "should not write empty edits";
-        firstCellInEdit = entry.getEdit().getCells().get(0);
+        if (firstCellInEntryBeforeFiltering == null) {
+          assert !entry.getEdit().isEmpty() : "should not write empty edits";
+          // Used to locate the region record in the meta table. In the WAL we only have the table
+          // name and encoded region name, which cannot be mapped to a region name without scanning
+          // all the records for a table, so we need a start key, just like what we have done at
+          // the client side when locating a region. For the markers, we will use the start key of
+          // the region as the row key for the edit. And we need to do this before filtering since
+          // all the cells may be filtered out, especially those for the markers.
+          firstCellInEntryBeforeFiltering = entry.getEdit().getCells().get(0);
+        } else {
+          // if this is not null then we know that the entry has already been filtered.
+          doFiltering = false;
+        }
+      }
+
+      if (doFiltering) {
+        entry = filterEntry(entry);
       }
-      entry = filterEntry(entry);
       if (entry != null) {
         if (hasSerialReplicationScope) {
-          if (!serialReplicationChecker.canPush(entry, firstCellInEdit)) {
-            if (batch.getNbEntries() > 0) {
+          if (!serialReplicationChecker.canPush(entry, firstCellInEntryBeforeFiltering)) {
+            if (batch.getLastWalPosition() > positionBefore) {
               // we have something that can push, break
               break;
             } else {
-              serialReplicationChecker.waitUntilCanPush(entry, firstCellInEdit);
+              serialReplicationChecker.waitUntilCanPush(entry, firstCellInEntryBeforeFiltering);
             }
           }
          // arrive here means we can push the entry, record the last sequence id
@@ -200,7 +222,7 @@ public class ReplicationSourceWALReader extends Thread {
             entry.getKey().getSequenceId());
         }
         // actually remove the entry.
-        entryStream.next();
+        removeEntryFromStream(entryStream, batch);
         WALEdit edit = entry.getEdit();
         if (edit != null && !edit.isEmpty()) {
           long entrySize = getEntrySize(entry);
@@ -215,7 +237,7 @@ public class ReplicationSourceWALReader extends Thread {
         }
       } else {
         // actually remove the entry.
-        entryStream.next();
+        removeEntryFromStream(entryStream, batch);
       }
     } while (entryStream.hasNext());
     return batch;
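
To make the new flow easier to review: the gate above, distilled into a single method.
This is a paraphrase using the same names as the patch, not the committed code;
filterEntry stands in for the reader's real filtering chain.

import java.io.IOException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.wal.WAL.Entry;

// Paraphrased sketch of the serial-replication gate in readWALEntries.
abstract class SerialFilterGateSketch {

  // Mirrors firstCellInEntryBeforeFiltering in ReplicationSourceWALReader.
  private Cell firstCellInEntryBeforeFiltering;

  // The real implementation lives in ReplicationSourceWALReader.
  protected abstract Entry filterEntry(Entry entry) throws IOException;

  Entry maybeFilter(Entry entry) throws IOException {
    boolean doFiltering = true;
    if (entry.getKey().hasSerialReplicationScope()) {
      if (firstCellInEntryBeforeFiltering == null) {
        // First visit: remember the row before filtering can strip it.
        firstCellInEntryBeforeFiltering = entry.getEdit().getCells().get(0);
      } else {
        // A non-null cache means this entry was already filtered on a
        // previous attempt that failed canPush; do not filter it again.
        doFiltering = false;
      }
    }
    return doFiltering ? filterEntry(entry) : entry;
  }
}

The cached cell is cleared in removeEntryFromStream once the entry is actually
consumed, so it only survives across retries of the same entry.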

http://git-wip-us.apache.org/repos/asf/hbase/blob/6b541275/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
index bcab9b4..c639a48 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
@@ -56,8 +56,12 @@ class WALEntryStream implements Closeable {
   private Path currentPath;
   // cache of next entry for hasNext()
   private Entry currentEntry;
+  // position for the current entry. As we now support peek, the upper layer may choose to return
+  // before reading the current entry, so it is not safe to return the value below in getPosition.
+  private long currentPositionOfEntry = 0;
   // position after reading current entry
-  private long currentPosition = 0;
+  private long currentPositionOfReader = 0;
   private final PriorityBlockingQueue<Path> logQueue;
   private final FileSystem fs;
   private final Configuration conf;
@@ -82,7 +86,7 @@ class WALEntryStream implements Closeable {
     this.logQueue = logQueue;
     this.fs = fs;
     this.conf = conf;
-    this.currentPosition = startPosition;
+    this.currentPositionOfEntry = startPosition;
     this.walFileLengthProvider = walFileLengthProvider;
     this.serverName = serverName;
     this.metrics = metrics;
@@ -110,6 +114,7 @@ class WALEntryStream implements Closeable {
    */
   public Entry next() throws IOException {
     Entry save = peek();
+    currentPositionOfEntry = currentPositionOfReader;
     currentEntry = null;
     return save;
   }
@@ -126,7 +131,7 @@ class WALEntryStream implements Closeable {
    * @return the position of the last Entry returned by next()
    */
   public long getPosition() {
-    return currentPosition;
+    return currentPositionOfEntry;
   }
 
   /**
@@ -140,7 +145,7 @@ class WALEntryStream implements Closeable {
     StringBuilder sb = new StringBuilder();
     if (currentPath != null) {
       sb.append("currently replicating from: ").append(currentPath).append(" 
at position: ")
-          .append(currentPosition).append("\n");
+          .append(currentPositionOfEntry).append("\n");
     } else {
       sb.append("no replication ongoing, waiting for new log");
     }
@@ -159,7 +164,7 @@ class WALEntryStream implements Closeable {
   }
 
   private void setPosition(long position) {
-    currentPosition = position;
+    currentPositionOfEntry = position;
   }
 
   private void setCurrentPath(Path path) {
@@ -168,19 +173,19 @@ class WALEntryStream implements Closeable {
 
   private void tryAdvanceEntry() throws IOException {
     if (checkReader()) {
-      boolean beingWritten = readNextEntryAndSetPosition();
+      boolean beingWritten = readNextEntryAndRecordReaderPosition();
       if (currentEntry == null && !beingWritten) {
        // no more entries in this log file, and the file is already closed, i.e, rolled
        // Before dequeueing, we should always get one more attempt at reading.
        // This is in case more entries came in after we opened the reader, and the log is rolled
        // while we were reading. See HBASE-6758
         resetReader();
-        readNextEntryAndSetPosition();
+        readNextEntryAndRecordReaderPosition();
         if (currentEntry == null) {
          if (checkAllBytesParsed()) { // now we're certain we're done with this log file
             dequeueCurrentLog();
             if (openNextLog()) {
-              readNextEntryAndSetPosition();
+              readNextEntryAndRecordReaderPosition();
             }
           }
         }
@@ -201,45 +206,49 @@ class WALEntryStream implements Closeable {
     try {
       stat = fs.getFileStatus(this.currentPath);
     } catch (IOException exception) {
-      LOG.warn("Couldn't get file length information about log " + 
this.currentPath + ", it "
-          + (trailerSize < 0 ? "was not" : "was") + " closed cleanly " + 
getCurrentPathStat());
+      LOG.warn("Couldn't get file length information about log {}, it {} 
closed cleanly {}",
+        currentPath, trailerSize < 0 ? "was not" : "was", 
getCurrentPathStat());
       metrics.incrUnknownFileLengthForClosedWAL();
     }
+    // Here we use currentPositionOfReader instead of currentPositionOfEntry.
+    // We only call this method when currentEntry is null, so usually they are the same, but there
+    // are two exceptions. One is when we have nothing in the file but a header; in this case
+    // currentPositionOfEntry will always be 0 since we have had no chance to update it. The other
+    // is when we reach the end of the file, where currentPositionOfEntry will point to the tail
+    // of the last valid entry while currentPositionOfReader will usually point to the end of the
+    // file.
     if (stat != null) {
       if (trailerSize < 0) {
-        if (currentPosition < stat.getLen()) {
-          final long skippedBytes = stat.getLen() - currentPosition;
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Reached the end of WAL file '" + currentPath
-                + "'. It was not closed cleanly, so we did not parse " + 
skippedBytes
-                + " bytes of data. This is normally ok.");
-          }
+        if (currentPositionOfReader < stat.getLen()) {
+          final long skippedBytes = stat.getLen() - currentPositionOfReader;
+          LOG.debug(
+            "Reached the end of WAL file '{}'. It was not closed cleanly," +
+              " so we did not parse {} bytes of data. This is normally ok.",
+            currentPath, skippedBytes);
           metrics.incrUncleanlyClosedWALs();
           metrics.incrBytesSkippedInUncleanlyClosedWALs(skippedBytes);
         }
-      } else if (currentPosition + trailerSize < stat.getLen()) {
-        LOG.warn("Processing end of WAL file '" + currentPath + "'. At 
position " + currentPosition
-            + ", which is too far away from reported file length " + 
stat.getLen()
-            + ". Restarting WAL reading (see HBASE-15983 for details). " + 
getCurrentPathStat());
+      } else if (currentPositionOfReader + trailerSize < stat.getLen()) {
+        LOG.warn(
+          "Processing end of WAL file '{}'. At position {}, which is too far 
away from" +
+            " reported file length {}. Restarting WAL reading (see HBASE-15983 
for details). {}",
+          currentPath, currentPositionOfReader, stat.getLen(), 
getCurrentPathStat());
         setPosition(0);
         resetReader();
         metrics.incrRestartedWALReading();
-        metrics.incrRepeatedFileBytes(currentPosition);
+        metrics.incrRepeatedFileBytes(currentPositionOfReader);
         return false;
       }
     }
     if (LOG.isTraceEnabled()) {
-      LOG.trace("Reached the end of log " + this.currentPath + ", and the 
length of the file is "
-          + (stat == null ? "N/A" : stat.getLen()));
+      LOG.trace("Reached the end of log " + this.currentPath + ", and the 
length of the file is " +
+        (stat == null ? "N/A" : stat.getLen()));
     }
     metrics.incrCompletedWAL();
     return true;
   }
 
   private void dequeueCurrentLog() throws IOException {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Reached the end of log " + currentPath);
-    }
+    LOG.debug("Reached the end of log {}", currentPath);
     closeReader();
     logQueue.remove();
     setPosition(0);
@@ -249,7 +258,7 @@ class WALEntryStream implements Closeable {
   /**
    * Returns whether the file is opened for writing.
    */
-  private boolean readNextEntryAndSetPosition() throws IOException {
+  private boolean readNextEntryAndRecordReaderPosition() throws IOException {
     Entry readEntry = reader.next();
     long readerPos = reader.getPosition();
     OptionalLong fileLength = walFileLengthProvider.getLogFileSizeIfBeingWritten(currentPath);
@@ -265,10 +274,10 @@ class WALEntryStream implements Closeable {
     }
     if (readEntry != null) {
       metrics.incrLogEditsRead();
-      metrics.incrLogReadInBytes(readerPos - currentPosition);
+      metrics.incrLogReadInBytes(readerPos - currentPositionOfEntry);
     }
     currentEntry = readEntry; // could be null
-    setPosition(readerPos);
+    this.currentPositionOfReader = readerPos;
     return fileLength.isPresent();
   }
 
@@ -401,8 +410,8 @@ class WALEntryStream implements Closeable {
   }
 
   private void seek() throws IOException {
-    if (currentPosition != 0) {
-      reader.seek(currentPosition);
+    if (currentPositionOfEntry != 0) {
+      reader.seek(currentPositionOfEntry);
     }
   }
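
The net effect on callers, as a hedged usage sketch (assuming an open stream with at
least one entry; WALEntryStream is package-private, so this would sit in the same
package):

import java.io.IOException;
import org.apache.hadoop.hbase.wal.WAL.Entry;

// Illustrative only: the peek/next position contract after this change.
final class WALEntryStreamContractSketch {

  static void illustrate(WALEntryStream stream) throws IOException {
    long before = stream.getPosition(); // last position handed to the caller
    Entry peeked = stream.peek();       // look ahead; getPosition() does not
                                        // move, so the caller may bail out and
                                        // safely resume from `before` later
    assert stream.getPosition() == before;
    Entry taken = stream.next();        // consumes the peeked entry; only now
    assert taken == peeked;             // does getPosition() advance, to the
                                        // recorded reader position
    assert stream.getPosition() >= before;
  }
}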
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/6b541275/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java
index 1408cf0..9d8e7fe 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java
@@ -18,11 +18,18 @@
 package org.apache.hadoop.hbase.replication;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
@@ -40,6 +47,7 @@ import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WALFactory;
@@ -124,6 +132,8 @@ public class TestSerialReplication {
   public static void setUpBeforeClass() throws Exception {
     UTIL.getConfiguration().setInt("replication.source.nb.capacity", 10);
     UTIL.startMiniCluster(3);
+    // disable balancer
+    UTIL.getAdmin().balancerSwitch(false, true);
     LOG_DIR = UTIL.getDataTestDirOnTestFS("replicated");
     FS = UTIL.getTestFileSystem();
     FS.mkdirs(LOG_DIR);
@@ -141,7 +151,6 @@ public class TestSerialReplication {
 
   @Before
   public void setUp() throws IOException, StreamLacksCapabilityException {
-    UTIL.ensureSomeRegionServersAvailable(3);
     logPath = new Path(LOG_DIR, name.getMethodName());
     WRITER = WALFactory.createWALWriter(FS, logPath, UTIL.getConfiguration());
     // add in disable state, so later when enabling it all sources will start push together.
@@ -152,35 +161,41 @@ public class TestSerialReplication {
   }
 
   @After
-  public void tearDown() throws IOException {
+  public void tearDown() throws Exception {
     UTIL.getAdmin().removeReplicationPeer(PEER_ID);
+    for (RegionServerThread t : UTIL.getMiniHBaseCluster().getLiveRegionServerThreads()) {
+      t.getRegionServer().getWalRoller().requestRollAll();
+    }
+    UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
+
+      @Override
+      public boolean evaluate() throws Exception {
+        return UTIL.getMiniHBaseCluster().getLiveRegionServerThreads().stream()
+          .map(t -> t.getRegionServer()).allMatch(HRegionServer::walRollRequestFinished);
+      }
+
+      @Override
+      public String explainFailure() throws Exception {
+        return "Log roll has not finished yet";
+      }
+    });
+    for (RegionServerThread t : UTIL.getMiniHBaseCluster().getLiveRegionServerThreads()) {
+      t.getRegionServer().getWalRoller().requestRollAll();
+    }
     if (WRITER != null) {
       WRITER.close();
       WRITER = null;
     }
   }
 
-  @Test
-  public void testRegionMove() throws Exception {
-    TableName tableName = TableName.valueOf(name.getMethodName());
-    UTIL.getAdmin().createTable(
-      TableDescriptorBuilder.newBuilder(tableName).addColumnFamily(ColumnFamilyDescriptorBuilder
-        .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_SERIAL).build()).build());
-    UTIL.waitTableAvailable(tableName);
-    try (Table table = UTIL.getConnection().getTable(tableName)) {
-      for (int i = 0; i < 100; i++) {
-        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
-      }
-    }
-    RegionInfo region = UTIL.getAdmin().getRegions(tableName).get(0);
-    HRegionServer rs = UTIL.getOtherRegionServer(UTIL.getRSForFirstRegionInTable(tableName));
+  private void moveRegion(RegionInfo region, HRegionServer rs) throws Exception {
     UTIL.getAdmin().move(region.getEncodedNameAsBytes(),
       Bytes.toBytes(rs.getServerName().getServerName()));
     UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
 
       @Override
       public boolean evaluate() throws Exception {
-        return !rs.getRegions(tableName).isEmpty();
+        return rs.getRegion(region.getEncodedName()) != null;
       }
 
       @Override
@@ -188,11 +203,9 @@ public class TestSerialReplication {
         return region + " is still not on " + rs;
       }
     });
-    try (Table table = UTIL.getConnection().getTable(tableName)) {
-      for (int i = 100; i < 200; i++) {
-        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
-      }
-    }
+  }
+
+  private void enablePeerAndWaitUntilReplicationDone(int expectedEntries) throws Exception {
     UTIL.getAdmin().enableReplicationPeer(PEER_ID);
     UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
 
@@ -203,7 +216,7 @@ public class TestSerialReplication {
           while (reader.next() != null) {
             count++;
           }
-          return count >= 200;
+          return count >= expectedEntries;
         } catch (IOException e) {
           return false;
         }
@@ -214,6 +227,29 @@ public class TestSerialReplication {
         return "Not enough entries replicated";
       }
     });
+  }
+
+  @Test
+  public void testRegionMove() throws Exception {
+    TableName tableName = TableName.valueOf(name.getMethodName());
+    UTIL.getAdmin().createTable(
+      TableDescriptorBuilder.newBuilder(tableName).addColumnFamily(ColumnFamilyDescriptorBuilder
+        .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_SERIAL).build()).build());
+    UTIL.waitTableAvailable(tableName);
+    try (Table table = UTIL.getConnection().getTable(tableName)) {
+      for (int i = 0; i < 100; i++) {
+        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
+      }
+    }
+    RegionInfo region = UTIL.getAdmin().getRegions(tableName).get(0);
+    HRegionServer rs = UTIL.getOtherRegionServer(UTIL.getRSForFirstRegionInTable(tableName));
+    moveRegion(region, rs);
+    try (Table table = UTIL.getConnection().getTable(tableName)) {
+      for (int i = 100; i < 200; i++) {
+        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
+      }
+    }
+    enablePeerAndWaitUntilReplicationDone(200);
     try (WAL.Reader reader =
       WALFactory.createReader(UTIL.getTestFileSystem(), logPath, UTIL.getConfiguration())) {
       long seqId = -1L;
@@ -231,4 +267,122 @@ public class TestSerialReplication {
       assertEquals(200, count);
     }
   }
+
+  @Test
+  public void testRegionSplit() throws Exception {
+    TableName tableName = TableName.valueOf(name.getMethodName());
+    UTIL.getAdmin().createTable(
+      TableDescriptorBuilder.newBuilder(tableName).addColumnFamily(ColumnFamilyDescriptorBuilder
+        .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_SERIAL).build()).build());
+    UTIL.waitTableAvailable(tableName);
+    try (Table table = UTIL.getConnection().getTable(tableName)) {
+      for (int i = 0; i < 100; i++) {
+        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
+      }
+    }
+    UTIL.flush(tableName);
+    RegionInfo region = UTIL.getAdmin().getRegions(tableName).get(0);
+    UTIL.getAdmin().splitRegionAsync(region.getEncodedNameAsBytes(), Bytes.toBytes(50)).get(30,
+      TimeUnit.SECONDS);
+    UTIL.waitUntilNoRegionsInTransition(30000);
+    List<RegionInfo> regions = UTIL.getAdmin().getRegions(tableName);
+    assertEquals(2, regions.size());
+    try (Table table = UTIL.getConnection().getTable(tableName)) {
+      for (int i = 0; i < 100; i++) {
+        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
+      }
+    }
+    enablePeerAndWaitUntilReplicationDone(200);
+    Map<String, Long> regionsToSeqId = new HashMap<>();
+    regionsToSeqId.put(region.getEncodedName(), -1L);
+    regions.stream().map(RegionInfo::getEncodedName).forEach(n -> regionsToSeqId.put(n, -1L));
+    try (WAL.Reader reader =
+      WALFactory.createReader(UTIL.getTestFileSystem(), logPath, UTIL.getConfiguration())) {
+      int count = 0;
+      for (Entry entry;;) {
+        entry = reader.next();
+        if (entry == null) {
+          break;
+        }
+        String encodedName = Bytes.toString(entry.getKey().getEncodedRegionName());
+        Long seqId = regionsToSeqId.get(encodedName);
+        assertNotNull(
+          "Unexpected entry " + entry + ", expected regions " + region + ", or " + regions, seqId);
+        assertTrue("Sequence id goes backwards from " + seqId + " to " +
+          entry.getKey().getSequenceId() + " for " + encodedName,
+          entry.getKey().getSequenceId() >= seqId.longValue());
+        if (count < 100) {
+          assertEquals(encodedName + " is pushed before parent " + region.getEncodedName(),
+            region.getEncodedName(), encodedName);
+        } else {
+          assertNotEquals(region.getEncodedName(), encodedName);
+        }
+        count++;
+      }
+      assertEquals(200, count);
+    }
+  }
+
+  @Test
+  public void testRegionMerge() throws Exception {
+    byte[] splitKey = Bytes.toBytes(50);
+    TableName tableName = TableName.valueOf(name.getMethodName());
+    UTIL.getAdmin().createTable(
+      TableDescriptorBuilder.newBuilder(tableName)
+        .addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(CF)
+          .setScope(HConstants.REPLICATION_SCOPE_SERIAL).build())
+        .build(),
+      new byte[][] { splitKey });
+    UTIL.waitTableAvailable(tableName);
+    try (Table table = UTIL.getConnection().getTable(tableName)) {
+      for (int i = 0; i < 100; i++) {
+        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
+      }
+    }
+    List<RegionInfo> regions = UTIL.getAdmin().getRegions(tableName);
+    UTIL.getAdmin()
+      .mergeRegionsAsync(
+        regions.stream().map(RegionInfo::getEncodedNameAsBytes).toArray(byte[][]::new), false)
+      .get(30, TimeUnit.SECONDS);
+    UTIL.waitUntilNoRegionsInTransition(30000);
+    List<RegionInfo> regionsAfterMerge = UTIL.getAdmin().getRegions(tableName);
+    assertEquals(1, regionsAfterMerge.size());
+    try (Table table = UTIL.getConnection().getTable(tableName)) {
+      for (int i = 0; i < 100; i++) {
+        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
+      }
+    }
+    enablePeerAndWaitUntilReplicationDone(200);
+    Map<String, Long> regionsToSeqId = new HashMap<>();
+    RegionInfo region = regionsAfterMerge.get(0);
+    regionsToSeqId.put(region.getEncodedName(), -1L);
+    regions.stream().map(RegionInfo::getEncodedName).forEach(n -> regionsToSeqId.put(n, -1L));
+    try (WAL.Reader reader =
+      WALFactory.createReader(UTIL.getTestFileSystem(), logPath, UTIL.getConfiguration())) {
+      int count = 0;
+      for (Entry entry;;) {
+        entry = reader.next();
+        if (entry == null) {
+          break;
+        }
+        String encodedName = Bytes.toString(entry.getKey().getEncodedRegionName());
+        Long seqId = regionsToSeqId.get(encodedName);
+        assertNotNull(
+          "Unexpected entry " + entry + ", expected regions " + region + ", or " + regions, seqId);
+        assertTrue("Sequence id goes backwards from " + seqId + " to " +
+          entry.getKey().getSequenceId() + " for " + encodedName,
+          entry.getKey().getSequenceId() >= seqId.longValue());
+        if (count < 100) {
+          assertNotEquals(
+            encodedName + " is pushed before parents " +
+              regions.stream().map(RegionInfo::getEncodedName).collect(Collectors.joining(" and ")),
+            region.getEncodedName(), encodedName);
+        } else {
+          assertEquals(region.getEncodedName(), encodedName);
+        }
+        count++;
+      }
+      assertEquals(200, count);
+    }
+  }
 }
