Repository: hbase
Updated Branches:
  refs/heads/branch-2 0658252ed -> 2e4c1b628


HBASE-18845 TestReplicationSmallTests fails after HBASE-14004


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/2e4c1b62
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/2e4c1b62
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/2e4c1b62

Branch: refs/heads/branch-2
Commit: 2e4c1b62884026ba8fc2d743d33a7f9d9125393e
Parents: 0658252
Author: zhangduo <zhang...@apache.org>
Authored: Mon Sep 25 12:07:19 2017 +0800
Committer: zhangduo <zhang...@apache.org>
Committed: Fri Sep 29 14:32:33 2017 +0800

----------------------------------------------------------------------
 .../replication/TestReplicationSmallTests.java  | 115 +++++++++----------
 .../hbase/replication/TestReplicationBase.java  |   2 +-
 2 files changed, 56 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
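
The patch is mostly a mechanical migration of the test from deprecated HBase 1.x client calls to their current equivalents, plus a rework of the empty-WAL test described further below. An illustrative sketch of the API migration (not part of the diff; row, tableName and familyName stand in for the test's actual fields, and the imports are the ones added in the hunks below):

    // Table/column-family creation moves from HTableDescriptor/HColumnDescriptor
    // to the immutable builder API.
    TableDescriptor td = TableDescriptorBuilder.newBuilder(tableName)
        .addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(familyName)
            .setMaxVersions(100)
            .setScope(HConstants.REPLICATION_SCOPE_GLOBAL)
            .build())
        .build();

    // Read-versions setters get the new fluent names.
    Get get = new Get(row).readAllVersions();    // was get.setMaxVersions()
    Scan scan = new Scan().readVersions(100);    // was scan.setMaxVersions(100)

    // Peer management moves from ReplicationAdmin to Admin.
    hbaseAdmin.disableReplicationPeer(PEER_ID);  // was admin.disablePeer(PEER_ID)
    hbaseAdmin.enableReplicationPeer(PEER_ID);   // was admin.enablePeer(PEER_ID)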


http://git-wip-us.apache.org/repos/asf/hbase/blob/2e4c1b62/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
index 6105a0d..28bf249 100644
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
@@ -26,7 +26,6 @@ import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.NavigableMap;
 import java.util.TreeMap;
@@ -39,13 +38,13 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Delete;
@@ -57,10 +56,14 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.client.replication.TableCFs;
 import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
+import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
 import org.apache.hadoop.hbase.replication.regionserver.Replication;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
@@ -73,8 +76,8 @@ import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.JVMClusterUtil;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.WAL;
-import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.mapreduce.Job;
 import org.junit.Before;
 import org.junit.Rule;
@@ -162,7 +165,7 @@ public class TestReplicationSmallTests extends TestReplicationBase {
     htable1.put(put);
 
     Get get = new Get(row);
-    get.setMaxVersions();
+    get.readAllVersions();
     for (int i = 0; i < NB_RETRIES; i++) {
       if (i==NB_RETRIES-1) {
         fail("Waited too much time for put replication");
@@ -184,7 +187,7 @@ public class TestReplicationSmallTests extends TestReplicationBase {
     htable1.delete(d);
 
     get = new Get(row);
-    get.setMaxVersions();
+    get.readAllVersions();
     for (int i = 0; i < NB_RETRIES; i++) {
       if (i==NB_RETRIES-1) {
         fail("Waited too much time for put replication");
@@ -327,7 +330,7 @@ public class TestReplicationSmallTests extends TestReplicationBase {
   public void testDisableEnable() throws Exception {
 
     // Test disabling replication
-    admin.disablePeer(PEER_ID);
+    hbaseAdmin.disableReplicationPeer(PEER_ID);
 
     byte[] rowkey = Bytes.toBytes("disable enable");
     Put put = new Put(rowkey);
@@ -346,7 +349,7 @@ public class TestReplicationSmallTests extends TestReplicationBase {
     }
 
     // Test enable replication
-    admin.enablePeer(PEER_ID);
+    hbaseAdmin.enableReplicationPeer(PEER_ID);
 
     for (int i = 0; i < NB_RETRIES; i++) {
       Result res = htable2.get(get);
@@ -370,7 +373,7 @@ public class TestReplicationSmallTests extends TestReplicationBase {
   @Test(timeout=300000)
   public void testAddAndRemoveClusters() throws Exception {
     LOG.info("testAddAndRemoveClusters");
-    admin.removePeer(PEER_ID);
+    hbaseAdmin.removeReplicationPeer(PEER_ID);
     Thread.sleep(SLEEP_TIME);
     byte[] rowKey = Bytes.toBytes("Won't be replicated");
     Put put = new Put(rowKey);
@@ -392,7 +395,7 @@ public class TestReplicationSmallTests extends TestReplicationBase {
     }
     ReplicationPeerConfig rpc = new ReplicationPeerConfig();
     rpc.setClusterKey(utility2.getClusterKey());
-    admin.addPeer(PEER_ID, rpc, null);
+    hbaseAdmin.addReplicationPeer(PEER_ID, rpc);
     Thread.sleep(SLEEP_TIME);
     rowKey = Bytes.toBytes("do rep");
     put = new Put(rowKey);
@@ -525,13 +528,11 @@ public class TestReplicationSmallTests extends TestReplicationBase {
     Table lHtable2 = null;
 
     try {
-      HTableDescriptor table = new HTableDescriptor(tableName);
-      HColumnDescriptor fam = new HColumnDescriptor(familyname);
-      fam.setMaxVersions(100);
-      fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
-      table.addFamily(fam);
+      ColumnFamilyDescriptor fam = ColumnFamilyDescriptorBuilder.newBuilder(familyname)
+          .setMaxVersions(100).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build();
+      TableDescriptor table = TableDescriptorBuilder.newBuilder(tableName).addColumnFamily(fam).build();
       scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
-      for (HColumnDescriptor f : table.getColumnFamilies()) {
+      for (ColumnFamilyDescriptor f : table.getColumnFamilies()) {
         scopes.put(f.getName(), f.getScope());
       }
 
@@ -631,7 +632,7 @@ public class TestReplicationSmallTests extends TestReplicationBase {
     htable1.put(put);
 
     Scan scan = new Scan();
-    scan.setMaxVersions(100);
+    scan.readVersions(100);
     ResultScanner scanner1 = htable1.getScanner(scan);
     Result[] res1 = scanner1.next(1);
     scanner1.close();
@@ -641,7 +642,7 @@ public class TestReplicationSmallTests extends TestReplicationBase {
 
     for (int i = 0; i < NB_RETRIES; i++) {
       scan = new Scan();
-      scan.setMaxVersions(100);
+      scan.readVersions(100);
       scanner1 = htable2.getScanner(scan);
       res1 = scanner1.next(1);
       scanner1.close();
@@ -668,7 +669,7 @@ public class TestReplicationSmallTests extends TestReplicationBase {
     htable2.put(put);
 
     scan = new Scan();
-    scan.setMaxVersions(100);
+    scan.readVersions(100);
     scanner1 = htable2.getScanner(scan);
     res1 = scanner1.next(NB_ROWS_IN_BATCH);
     scanner1.close();
@@ -695,7 +696,7 @@ public class TestReplicationSmallTests extends TestReplicationBase {
     htable1.put(put);
 
     Scan scan = new Scan();
-    scan.setMaxVersions(100);
+    scan.readVersions(100);
     ResultScanner scanner1 = htable1.getScanner(scan);
     Result[] res1 = scanner1.next(1);
     scanner1.close();
@@ -705,7 +706,7 @@ public class TestReplicationSmallTests extends TestReplicationBase {
 
     for (int i = 0; i < NB_RETRIES; i++) {
       scan = new Scan();
-      scan.setMaxVersions(100);
+      scan.readVersions(100);
       scanner1 = htable2.getScanner(scan);
       res1 = scanner1.next(1);
       scanner1.close();
@@ -728,13 +729,13 @@ public class TestReplicationSmallTests extends TestReplicationBase {
 
     try {
      // Disabling replication and modifying the particular version of the cell to validate the feature.
-      admin.disablePeer(PEER_ID);
+      hbaseAdmin.disableReplicationPeer(PEER_ID);
       Put put2 = new Put(Bytes.toBytes("r1"));
       put2.addColumn(famName, qualifierName, ts +2, Bytes.toBytes("v99"));
       htable2.put(put2);
 
       scan = new Scan();
-      scan.setMaxVersions(100);
+      scan.readVersions(100);
       scanner1 = htable2.getScanner(scan);
       res1 = scanner1.next(NB_ROWS_IN_BATCH);
       scanner1.close();
@@ -745,7 +746,7 @@ public class TestReplicationSmallTests extends TestReplicationBase {
       runVerifyReplication(args, 0, 1);
       }
     finally {
-      admin.enablePeer(PEER_ID);
+      hbaseAdmin.enableReplicationPeer(PEER_ID);
     }
   }
 
@@ -786,21 +787,20 @@ public class TestReplicationSmallTests extends TestReplicationBase {
 
     // Create Tables
     for (int i = 0; i < numOfTables; i++) {
-      HTableDescriptor ht = new HTableDescriptor(TableName.valueOf(tName + i));
-      HColumnDescriptor cfd = new HColumnDescriptor(colFam);
-      cfd.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
-      ht.addFamily(cfd);
-      hadmin.createTable(ht);
+      hadmin.createTable(TableDescriptorBuilder.newBuilder(TableName.valueOf(tName + i))
+          .addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(colFam))
+              .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
+          .build());
     }
 
     // verify the result
-    List<HashMap<String, String>> replicationColFams = admin.listReplicated();
+    List<TableCFs> replicationColFams = hbaseAdmin.listReplicatedTableCFs();
     int[] match = new int[numOfTables]; // array of 3 with init value of zero
 
     for (int i = 0; i < replicationColFams.size(); i++) {
-      HashMap<String, String> replicationEntry = replicationColFams.get(i);
-      String tn = replicationEntry.get(ReplicationAdmin.TNAME);
-      if ((tn.startsWith(tName)) && replicationEntry.get(ReplicationAdmin.CFNAME).equals(colFam)) {
+      TableCFs replicationEntry = replicationColFams.get(i);
+      String tn = replicationEntry.getTable().getNameAsString();
+      if (tn.startsWith(tName) && replicationEntry.getColumnFamilyMap().containsKey(colFam)) {
         int m = Integer.parseInt(tn.substring(tn.length() - 1)); // get the last digit
         match[m]++; // should only increase once
       }
@@ -831,7 +831,7 @@ public class TestReplicationSmallTests extends TestReplicationBase {
     HRegion region = utility1.getMiniHBaseCluster().getRegions(tableName).get(0);
     RegionInfo hri = region.getRegionInfo();
     NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
-    for (byte[] fam : htable1.getTableDescriptor().getFamiliesKeys()) {
+    for (byte[] fam : htable1.getDescriptor().getColumnFamilyNames()) {
       scopes.put(fam, 1);
     }
     final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
@@ -918,14 +918,14 @@ public class TestReplicationSmallTests extends TestReplicationBase {
     Path rootDir = FSUtils.getRootDir(conf1);
     FileSystem fs = rootDir.getFileSystem(conf1);
     String sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis();
-    SnapshotTestingUtils.createSnapshotAndValidate(utility1.getHBaseAdmin(), tableName,
+    SnapshotTestingUtils.createSnapshotAndValidate(utility1.getAdmin(), tableName,
       new String(famName), sourceSnapshotName, rootDir, fs, true);
 
     // Take target snapshot
     Path peerRootDir = FSUtils.getRootDir(conf2);
     FileSystem peerFs = peerRootDir.getFileSystem(conf2);
     String peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis();
-    SnapshotTestingUtils.createSnapshotAndValidate(utility2.getHBaseAdmin(), tableName,
+    SnapshotTestingUtils.createSnapshotAndValidate(utility2.getAdmin(), tableName,
       new String(famName), peerSnapshotName, peerRootDir, peerFs, true);
 
     String peerFSAddress = peerFs.getUri().toString();
@@ -963,11 +963,11 @@ public class TestReplicationSmallTests extends TestReplicationBase {
     htable2.delete(delete);
 
     sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis();
-    SnapshotTestingUtils.createSnapshotAndValidate(utility1.getHBaseAdmin(), tableName,
+    SnapshotTestingUtils.createSnapshotAndValidate(utility1.getAdmin(), tableName,
       new String(famName), sourceSnapshotName, rootDir, fs, true);
 
     peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis();
-    SnapshotTestingUtils.createSnapshotAndValidate(utility2.getHBaseAdmin(), tableName,
+    SnapshotTestingUtils.createSnapshotAndValidate(utility2.getAdmin(), tableName,
       new String(famName), peerSnapshotName, peerRootDir, peerFs, true);
 
     args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName,
@@ -1006,27 +1006,23 @@ public class TestReplicationSmallTests extends TestReplicationBase {
       emptyWalPaths.add(emptyWalPath);
     }
 
-    // inject our empty wal into the replication queue
+    // inject our empty wal into the replication queue, and then roll the original wal, which
+    // enqueues a new wal behind our empty wal. We must roll the wal here as now we use the WAL to
+    // determine if the file being replicated currently is still opened for write, so just inject a
+    // new wal to the replication queue does not mean the previous file is closed.
     for (int i = 0; i < numRs; i++) {
-      Replication replicationService =
-          (Replication) utility1.getHBaseCluster().getRegionServer(i).getReplicationSourceService();
+      HRegionServer hrs = utility1.getHBaseCluster().getRegionServer(i);
+      Replication replicationService = (Replication) hrs.getReplicationSourceService();
       replicationService.preLogRoll(null, emptyWalPaths.get(i));
       replicationService.postLogRoll(null, emptyWalPaths.get(i));
-    }
-
-    // wait for ReplicationSource to start reading from our empty wal
-    waitForLogAdvance(numRs, emptyWalPaths, false);
-
-    // roll the original wal, which enqueues a new wal behind our empty wal
-    for (int i = 0; i < numRs; i++) {
       RegionInfo regionInfo =
           utility1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
-      WAL wal = utility1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
+      WAL wal = hrs.getWAL(regionInfo);
       wal.rollWriter(true);
     }
 
     // ReplicationSource should advance past the empty wal, or else the test will fail
-    waitForLogAdvance(numRs, emptyWalPaths, true);
+    waitForLogAdvance(numRs);
 
     // we're now writing to the new wal
     // if everything works, the source should've stopped reading from the empty wal, and start
@@ -1035,26 +1031,25 @@ public class TestReplicationSmallTests extends TestReplicationBase {
   }
 
   /**
-   * Waits for the ReplicationSource to start reading from the given paths
+   * Waits until there is only one log(the current writing one) in the replication queue
    * @param numRs number of regionservers
-   * @param emptyWalPaths path for each regionserver
-   * @param invert if true, waits until ReplicationSource is NOT reading from the given paths
    */
-  private void waitForLogAdvance(final int numRs, final List<Path> emptyWalPaths,
-      final boolean invert) throws Exception {
+  private void waitForLogAdvance(int numRs) throws Exception {
     Waiter.waitFor(conf1, 10000, new Waiter.Predicate<Exception>() {
       @Override
       public boolean evaluate() throws Exception {
         for (int i = 0; i < numRs; i++) {
+          HRegionServer hrs = utility1.getHBaseCluster().getRegionServer(i);
+          RegionInfo regionInfo =
+              utility1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
+          WAL wal = hrs.getWAL(regionInfo);
+          Path currentFile = ((AbstractFSWAL<?>) wal).getCurrentFileName();
           Replication replicationService = (Replication) utility1.getHBaseCluster()
               .getRegionServer(i).getReplicationSourceService();
           for (ReplicationSourceInterface rsi : replicationService.getReplicationManager()
               .getSources()) {
             ReplicationSource source = (ReplicationSource) rsi;
-            if (!invert && !emptyWalPaths.get(i).equals(source.getCurrentPath())) {
-              return false;
-            }
-            if (invert && emptyWalPaths.get(i).equals(source.getCurrentPath())) {
+            if (!currentFile.equals(source.getCurrentPath())) {
               return false;
             }
           }
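
The rewritten waitForLogAdvance above no longer tracks the injected empty WAL paths; it waits until every ReplicationSource is positioned on the region server's currently written WAL file, i.e. only the active log remains in the replication queue. A condensed sketch of that wait condition, assuming the same test fields (conf1, utility1, htable1, numRs) and the imports added by this patch:

    Waiter.waitFor(conf1, 10000, () -> {
      for (int i = 0; i < numRs; i++) {
        HRegionServer hrs = utility1.getHBaseCluster().getRegionServer(i);
        RegionInfo regionInfo =
            utility1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
        // The file the WAL is currently writing to.
        Path currentFile = ((AbstractFSWAL<?>) hrs.getWAL(regionInfo)).getCurrentFileName();
        Replication replication = (Replication) hrs.getReplicationSourceService();
        for (ReplicationSourceInterface rsi : replication.getReplicationManager().getSources()) {
          // A source still reading an older WAL (e.g. the injected empty one) has not advanced yet.
          if (!currentFile.equals(((ReplicationSource) rsi).getCurrentPath())) {
            return false;
          }
        }
      }
      return true;
    });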

http://git-wip-us.apache.org/repos/asf/hbase/blob/2e4c1b62/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
index 87918ee..206b500 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
@@ -63,7 +63,7 @@ public class TestReplicationBase {
   protected static ZooKeeperWatcher zkw2;
 
   protected static ReplicationAdmin admin;
-  private static Admin hbaseAdmin;
+  protected static Admin hbaseAdmin;
 
   protected static Table htable1;
   protected static Table htable2;

Reply via email to