Author: nkeywal
Date: Mon Apr  7 13:54:27 2014
New Revision: 1585484

URL: http://svn.apache.org/r1585484
Log:
HBASE-10817 Add some tests on a real cluster for replica: multi master, replication

Added:
    hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
Modified:
    hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java

Added: hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java?rev=1585484&view=auto
==============================================================================
--- hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java (added)
+++ hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java Mon Apr  7 13:54:27 2014
@@ -0,0 +1,371 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.hadoop.hbase.client;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
+import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
+import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+@Category(MediumTests.class)
+public class TestReplicaWithCluster {
+  private static final Log LOG = LogFactory.getLog(TestReplicaWithCluster.class);
+
+  private static final int NB_SERVERS = 2;
+  private static final byte[] row = TestReplicaWithCluster.class.getName().getBytes();
+  private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
+  private static final byte[] f = HConstants.CATALOG_FAMILY;
+
+  private final static int REFRESH_PERIOD = 1000;
+
+  /**
+   * This coprocessor is used to synchronize the tests: it blocks or slows
+   * down the gets on the primary replica, so that timeline reads have to be
+   * served by a secondary replica.
+   */
+  public static class SlowMeCopro extends BaseRegionObserver {
+    static final AtomicLong sleepTime = new AtomicLong(0);
+    static final AtomicReference<CountDownLatch> cdl =
+        new AtomicReference<CountDownLatch>(new CountDownLatch(0));
+
+    public SlowMeCopro() {
+    }
+
+    @Override
+    public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
+                         final Get get, final List<Cell> results) throws IOException {
+
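+      // Delay or block only the primary replica (replica id 0): the client
+      // then falls back to a secondary replica for timeline-consistent reads.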
+      if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() == 0) {
+        CountDownLatch latch = cdl.get();
+        try {
+          if (sleepTime.get() > 0) {
+            LOG.info("Sleeping for " + sleepTime.get() + " ms");
+            Thread.sleep(sleepTime.get());
+          } else if (latch.getCount() > 0) {
+            LOG.info("Waiting for the CountDownLatch");
+            latch.await(2, TimeUnit.MINUTES); // To help the tests to finish.
+            if (latch.getCount() > 0) {
+              throw new RuntimeException("Can't wait any longer");
+            }
+          }
+        } catch (InterruptedException e1) {
+          LOG.error(e1);
+        }
+      } else {
+        LOG.info("We're not the primary replica.");
+      }
+    }
+  }
+
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    // enable store file refreshing
+    HTU.getConfiguration().setInt(
+        StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, REFRESH_PERIOD);
+
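+    // Aggressive WAL rolling and replication retry settings so that the
+    // replication test finishes quickly.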
+    HTU.getConfiguration().setFloat("hbase.regionserver.logroll.multiplier", 0.0001f);
+    HTU.getConfiguration().setInt("replication.source.size.capacity", 10240);
+    HTU.getConfiguration().setLong("replication.source.sleepforretries", 100);
+    HTU.getConfiguration().setInt("hbase.regionserver.maxlogs", 2);
+    HTU.getConfiguration().setLong("hbase.master.logcleaner.ttl", 10);
+    HTU.getConfiguration().setInt("zookeeper.recovery.retry", 1);
+    HTU.getConfiguration().setInt("zookeeper.recovery.retry.intervalmill", 10);
+
+    HTU.startMiniCluster(NB_SERVERS);
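+    // Start a backup master so that testChangeTable can kill the active one.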
+    HTU.getHBaseCluster().startMaster();
+  }
+
+  @AfterClass
+  public static void afterClass() throws Exception {
+    HTU.shutdownMiniCluster();
+  }
+
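+  /**
+   * Creates a table with region replicas, checks that a default get is not
+   * stale and that a timeline get issued while the primary is blocked is
+   * stale, then deletes the table.
+   */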
+  @Test
+  public void testCreateDeleteTable() throws IOException {
+    // Create table then get the single region for our new table.
+    HTableDescriptor hdt = HTU.createTableDescriptor("testCreateDeleteTable");
+    hdt.setRegionReplication(NB_SERVERS);
+    hdt.addCoprocessor(SlowMeCopro.class.getName());
+    HTable table = HTU.createTable(hdt, new byte[][]{f}, HTU.getConfiguration());
+
+    Put p = new Put(row);
+    p.add(f, row, row);
+    table.put(p);
+
+    Get g = new Get(row);
+    Result r = table.get(g);
+    Assert.assertFalse(r.isStale());
+
+    try {
+      // But if we block the primary and ask for timeline consistency, we get a stale result
+      SlowMeCopro.cdl.set(new CountDownLatch(1));
+      g = new Get(row);
+      g.setConsistency(Consistency.TIMELINE);
+      r = table.get(g);
+      Assert.assertTrue(r.isStale());
+      SlowMeCopro.cdl.get().countDown();
+    } finally {
+      SlowMeCopro.cdl.get().countDown();
+      SlowMeCopro.sleepTime.set(0);
+    }
+
+    HTU.getHBaseAdmin().disableTable(hdt.getTableName());
+    HTU.deleteTable(hdt.getTableName());
+  }
+
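+  /**
+   * Alters a table with region replicas (adds a column family) and checks
+   * that the new descriptor is visible, that reads still work, and that the
+   * descriptor survives the loss of the active master.
+   */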
+  @Test
+  public void testChangeTable() throws Exception {
+    HTableDescriptor hdt = HTU.createTableDescriptor("testChangeTable");
+    hdt.setRegionReplication(NB_SERVERS);
+    hdt.addCoprocessor(SlowMeCopro.class.getName());
+    HTable table = HTU.createTable(hdt, new byte[][]{f}, HTU.getConfiguration());
+
+    // basic test: it should work.
+    Put p = new Put(row);
+    p.add(f, row, row);
+    table.put(p);
+
+    Get g = new Get(row);
+    Result r = table.get(g);
+    Assert.assertFalse(r.isStale());
+
+    // Add a CF, it should work.
+    HTableDescriptor bHdt = HTU.getHBaseAdmin().getTableDescriptor(hdt.getTableName());
+    HColumnDescriptor hcd = new HColumnDescriptor(row);
+    hdt.addFamily(hcd);
+    HTU.getHBaseAdmin().disableTable(hdt.getTableName());
+    HTU.getHBaseAdmin().modifyTable(hdt.getTableName(), hdt);
+    HTU.getHBaseAdmin().enableTable(hdt.getTableName());
+    HTableDescriptor nHdt = HTU.getHBaseAdmin().getTableDescriptor(hdt.getTableName());
+    Assert.assertEquals("fams=" + Arrays.toString(nHdt.getColumnFamilies()),
+        bHdt.getColumnFamilies().length + 1, nHdt.getColumnFamilies().length);
+
+    p = new Put(row);
+    p.add(row, row, row);
+    table.put(p);
+
+    g = new Get(row);
+    r = table.get(g);
+    Assert.assertFalse(r.isStale());
+
+    try {
+      SlowMeCopro.cdl.set(new CountDownLatch(1));
+      g = new Get(row);
+      g.setConsistency(Consistency.TIMELINE);
+      r = table.get(g);
+      Assert.assertTrue(r.isStale());
+    } finally {
+      SlowMeCopro.cdl.get().countDown();
+      SlowMeCopro.sleepTime.set(0);
+    }
+
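+    // Kill the active master: the backup master started in beforeClass takes
+    // over, and a fresh client must still see the altered descriptor.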
+    HTU.getHBaseCluster().stopMaster(0);
+    HBaseAdmin admin = new HBaseAdmin(HTU.getConfiguration());
+    nHdt = admin.getTableDescriptor(hdt.getTableName());
+    Assert.assertEquals("fams=" + Arrays.toString(nHdt.getColumnFamilies()),
+        bHdt.getColumnFamilies().length + 1, nHdt.getColumnFamilies().length);
+
+    admin.disableTable(hdt.getTableName());
+    admin.deleteTable(hdt.getTableName());
+    HTU.getHBaseCluster().startMaster();
+  }
+
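+  /**
+   * Combines region replicas with cluster-to-cluster replication: a put on
+   * the first cluster must be readable, possibly stale, on the replicas of
+   * both clusters.
+   */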
+  @Test
+  public void testReplicaAndReplication() throws Exception {
+    HTableDescriptor hdt = HTU.createTableDescriptor("testReplicaAndReplication");
+    hdt.setRegionReplication(NB_SERVERS);
+
+    HColumnDescriptor fam = new HColumnDescriptor(row);
+    fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
+    hdt.addFamily(fam);
+
+    hdt.addCoprocessor(SlowMeCopro.class.getName());
+    HTU.getHBaseAdmin().createTable(hdt, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
+
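+    // Start a second cluster that shares the ZooKeeper ensemble of the first
+    // one, but under a different znode parent.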
+    Configuration conf2 = HBaseConfiguration.create(HTU.getConfiguration());
+    conf2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1));
+    conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
+    MiniZooKeeperCluster miniZK = HTU.getZkCluster();
+    HBaseTestingUtility HTU2 = new HBaseTestingUtility(conf2);
+    HTU2.setZkCluster(miniZK);
+    HTU2.startMiniCluster(NB_SERVERS);
+    LOG.info("Setup second Zk");
+    HTU2.getHBaseAdmin().createTable(hdt, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
+
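+    // Replicate the edits of the first cluster to the second one.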
+    ReplicationAdmin admin = new ReplicationAdmin(HTU.getConfiguration());
+    admin.addPeer("2", HTU2.getClusterKey());
+
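+    // Put on the first cluster, then flush so that the secondary replicas
+    // pick up the new store file when their refresher chore runs.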
+    Put p = new Put(row);
+    p.add(row, row, row);
+    final HTable table = new HTable(HTU.getConfiguration(), hdt.getTableName());
+    table.put(p);
+
+    HTU.getHBaseAdmin().flush(table.getTableName());
+    LOG.info("Put & flush done on the first cluster. Now doing a get on the 
same cluster.");
+
+    Waiter.waitFor(HTU.getConfiguration(), 1000, new Waiter.Predicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        try {
+          SlowMeCopro.cdl.set(new CountDownLatch(1));
+          Get g = new Get(row);
+          g.setConsistency(Consistency.TIMELINE);
+          Result r = table.get(g);
+          Assert.assertTrue(r.isStale());
+          return !r.isEmpty();
+        } finally {
+          SlowMeCopro.cdl.get().countDown();
+          SlowMeCopro.sleepTime.set(0);
+        }
+      }
+    });
+
+    LOG.info("stale get on the first cluster done. Now for the second.");
+
+    final HTable table2 = new HTable(HTU2.getConfiguration(), hdt.getTableName());
+    Waiter.waitFor(HTU.getConfiguration(), 1000, new Waiter.Predicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        try {
+          SlowMeCopro.cdl.set(new CountDownLatch(1));
+          Get g = new Get(row);
+          g.setConsistency(Consistency.TIMELINE);
+          Result r = table2.get(g);
+          Assert.assertTrue(r.isStale());
+          return !r.isEmpty();
+        } finally {
+          SlowMeCopro.cdl.get().countDown();
+          SlowMeCopro.sleepTime.set(0);
+        }
+      }
+    });
+
+    HTU.getHBaseAdmin().disableTable(hdt.getTableName());
+    HTU.deleteTable(hdt.getTableName());
+
+    HTU2.getHBaseAdmin().disableTable(hdt.getTableName());
+    HTU2.deleteTable(hdt.getTableName());
+
+    HTU2.shutdownMiniCluster();
+  }
+
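+  /**
+   * Bulk loads HFiles directly into a table with region replicas, then checks
+   * that the rows can be read from the primary and, stale, from a secondary.
+   */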
+  @Test
+  public void testBulkLoad() throws IOException {
+    // Create table then get the single region for our new table.
+    LOG.debug("Creating test table");
+    HTableDescriptor hdt = HTU.createTableDescriptor("testBulkLoad");
+    hdt.setRegionReplication(NB_SERVERS);
+    hdt.addCoprocessor(SlowMeCopro.class.getName());
+    HTable table = HTU.createTable(hdt, new byte[][]{f}, HTU.getConfiguration());
+
+    // create hfiles to load.
+    LOG.debug("Creating test data");
+    Path dir = HTU.getDataTestDirOnTestFS("testBulkLoad");
+    final int numRows = 10;
+    final byte[] qual = Bytes.toBytes("qual");
+    final byte[] val  = Bytes.toBytes("val");
+    final List<Pair<byte[], String>> famPaths = new ArrayList<Pair<byte[], String>>();
+    for (HColumnDescriptor col : hdt.getColumnFamilies()) {
+      Path hfile = new Path(dir, col.getNameAsString());
+      TestHRegionServerBulkLoad.createHFile(HTU.getTestFileSystem(), hfile, col.getName(),
+        qual, val, numRows);
+      famPaths.add(new Pair<byte[], String>(col.getName(), hfile.toString()));
+    }
+
+    // bulk load HFiles
+    LOG.debug("Loading test data");
+    final HConnection conn = HTU.getHBaseAdmin().getConnection();
+    RegionServerCallable<Void> callable = new RegionServerCallable<Void>(
+      conn, hdt.getTableName(), TestHRegionServerBulkLoad.rowkey(0)) {
+        @Override
+        public Void call() throws Exception {
+          LOG.debug("Going to connect to server " + getLocation() + " for row "
+            + Bytes.toStringBinary(getRow()));
+          byte[] regionName = getLocation().getRegionInfo().getRegionName();
+          BulkLoadHFileRequest request =
+            RequestConverter.buildBulkLoadHFileRequest(famPaths, regionName, true);
+          getStub().bulkLoadHFile(null, request);
+          return null;
+        }
+      };
+    RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(HTU.getConfiguration());
+    RpcRetryingCaller<Void> caller = factory.<Void> newCaller();
+    caller.callWithRetries(callable);
+
+    // verify we can read them from the primary
+    LOG.debug("Verifying data load");
+    for (int i = 0; i < numRows; i++) {
+      byte[] row = TestHRegionServerBulkLoad.rowkey(i);
+      Get g = new Get(row);
+      Result r = table.get(g);
+      Assert.assertFalse(r.isStale());
+    }
+
+    // verify we can read them from the replica
+    LOG.debug("Verifying replica queries");
+    try {
+      SlowMeCopro.cdl.set(new CountDownLatch(1));
+      for (int i = 0; i < numRows; i++) {
+        byte[] row = TestHRegionServerBulkLoad.rowkey(i);
+        Get g = new Get(row);
+        g.setConsistency(Consistency.TIMELINE);
+        Result r = table.get(g);
+        Assert.assertTrue(r.isStale());
+      }
+      SlowMeCopro.cdl.get().countDown();
+    } finally {
+      SlowMeCopro.cdl.get().countDown();
+      SlowMeCopro.sleepTime.set(0);
+    }
+
+    HTU.getHBaseAdmin().disableTable(hdt.getTableName());
+    HTU.deleteTable(hdt.getTableName());
+  }
+}

Modified: hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java?rev=1585484&r1=1585483&r2=1585484&view=diff
==============================================================================
--- hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java (original)
+++ hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java Mon Apr  7 13:54:27 2014
@@ -83,7 +83,11 @@ public class TestHRegionServerBulkLoad {
     }
   }
 
-  static byte[] rowkey(int i) {
+  /**
+   * Create a rowkey compatible with
+   * {@link #createHFile(FileSystem, Path, byte[], byte[], byte[], int)}.
+   */
+  public static byte[] rowkey(int i) {
     return Bytes.toBytes(String.format("row_%08d", i));
   }
 

