Author: nkeywal
Date: Mon Apr 7 13:54:27 2014
New Revision: 1585484
URL: http://svn.apache.org/r1585484
Log:
HBASE-10817 Add some tests on a real cluster for replica: multi master,
replication
Added:
hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
Modified:
hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
Added:
hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
URL:
http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java?rev=1585484&view=auto
==============================================================================
---
hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
(added)
+++
hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
Mon Apr 7 13:54:27 2014
@@ -0,0 +1,371 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.hadoop.hbase.client;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
+import org.apache.hadoop.hbase.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
+import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+@Category(MediumTests.class)
+public class TestReplicaWithCluster {
+ private static final Log LOG =
LogFactory.getLog(TestReplicaWithCluster.class);
+
+ private static final int NB_SERVERS = 2;
+ private static final byte[] row =
TestReplicaWithCluster.class.getName().getBytes();
+ private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
+ private static final byte[] f = HConstants.CATALOG_FAMILY;
+
+ private final static int REFRESH_PERIOD = 1000;
+
+ /**
+ * This copro is used to synchronize the tests.
+ */
+ public static class SlowMeCopro extends BaseRegionObserver {
+ static final AtomicLong sleepTime = new AtomicLong(0);
+ static final AtomicReference<CountDownLatch> cdl =
+ new AtomicReference<CountDownLatch>(new CountDownLatch(0));
+
+ public SlowMeCopro() {
+ }
+
+ @Override
+ public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
+ final Get get, final List<Cell> results) throws
IOException {
+
+ if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() == 0) {
+ CountDownLatch latch = cdl.get();
+ try {
+ if (sleepTime.get() > 0) {
+ LOG.info("Sleeping for " + sleepTime.get() + " ms");
+ Thread.sleep(sleepTime.get());
+ } else if (latch.getCount() > 0) {
+ LOG.info("Waiting for the counterCountDownLatch");
+ latch.await(2, TimeUnit.MINUTES); // To help the tests to finish.
+ if (latch.getCount() > 0) {
+ throw new RuntimeException("Can't wait more");
+ }
+ }
+ } catch (InterruptedException e1) {
+ LOG.error(e1);
+ }
+ } else {
+ LOG.info("We're not the primary replicas.");
+ }
+ }
+ }
+
+ @BeforeClass
+ public static void beforeClass() throws Exception {
+ // enable store file refreshing
+ HTU.getConfiguration().setInt(
+ StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
REFRESH_PERIOD);
+
+ HTU.getConfiguration().setFloat("hbase.regionserver.logroll.multiplier",
0.0001f);
+ HTU.getConfiguration().setInt("replication.source.size.capacity", 10240);
+ HTU.getConfiguration().setLong("replication.source.sleepforretries", 100);
+ HTU.getConfiguration().setInt("hbase.regionserver.maxlogs", 2);
+ HTU.getConfiguration().setLong("hbase.master.logcleaner.ttl", 10);
+ HTU.getConfiguration().setInt("zookeeper.recovery.retry", 1);
+ HTU.getConfiguration().setInt("zookeeper.recovery.retry.intervalmill", 10);
+
+ HTU.startMiniCluster(NB_SERVERS);
+ HTU.getHBaseCluster().startMaster();
+ }
+
+ @AfterClass
+ public static void afterClass() throws Exception {
+ HTU.shutdownMiniCluster();
+ }
+
+ @Test
+ public void testCreateDeleteTable() throws IOException {
+ // Create table then get the single region for our new table.
+ HTableDescriptor hdt = HTU.createTableDescriptor("testCreateDeleteTable");
+ hdt.setRegionReplication(NB_SERVERS);
+ hdt.addCoprocessor(SlowMeCopro.class.getName());
+ HTable table = HTU.createTable(hdt, new byte[][]{f},
HTU.getConfiguration());
+
+ Put p = new Put(row);
+ p.add(f, row, row);
+ table.put(p);
+
+ Get g = new Get(row);
+ Result r = table.get(g);
+ Assert.assertFalse(r.isStale());
+
+ try {
+ // But if we ask for stale we will get it
+ SlowMeCopro.cdl.set(new CountDownLatch(1));
+ g = new Get(row);
+ g.setConsistency(Consistency.TIMELINE);
+ r = table.get(g);
+ Assert.assertTrue(r.isStale());
+ SlowMeCopro.cdl.get().countDown();
+ } finally {
+ SlowMeCopro.cdl.get().countDown();
+ SlowMeCopro.sleepTime.set(0);
+ }
+
+ HTU.getHBaseAdmin().disableTable(hdt.getTableName());
+ HTU.deleteTable(hdt.getTableName());
+ }
+
+ @Test
+ public void testChangeTable() throws Exception {
+ HTableDescriptor hdt = HTU.createTableDescriptor("testChangeTable");
+ hdt.setRegionReplication(NB_SERVERS);
+ hdt.addCoprocessor(SlowMeCopro.class.getName());
+ HTable table = HTU.createTable(hdt, new byte[][]{f},
HTU.getConfiguration());
+
+ // basic test: it should work.
+ Put p = new Put(row);
+ p.add(f, row, row);
+ table.put(p);
+
+ Get g = new Get(row);
+ Result r = table.get(g);
+ Assert.assertFalse(r.isStale());
+
+ // Add a CF, it should work.
+ HTableDescriptor bHdt =
HTU.getHBaseAdmin().getTableDescriptor(hdt.getTableName());
+ HColumnDescriptor hcd = new HColumnDescriptor(row);
+ hdt.addFamily(hcd);
+ HTU.getHBaseAdmin().disableTable(hdt.getTableName());
+ HTU.getHBaseAdmin().modifyTable(hdt.getTableName(), hdt);
+ HTU.getHBaseAdmin().enableTable(hdt.getTableName());
+ HTableDescriptor nHdt =
HTU.getHBaseAdmin().getTableDescriptor(hdt.getTableName());
+ Assert.assertEquals("fams=" + Arrays.toString(nHdt.getColumnFamilies()),
+ bHdt.getColumnFamilies().length + 1, nHdt.getColumnFamilies().length);
+
+ p = new Put(row);
+ p.add(row, row, row);
+ table.put(p);
+
+ g = new Get(row);
+ r = table.get(g);
+ Assert.assertFalse(r.isStale());
+
+ try {
+ SlowMeCopro.cdl.set(new CountDownLatch(1));
+ g = new Get(row);
+ g.setConsistency(Consistency.TIMELINE);
+ r = table.get(g);
+ Assert.assertTrue(r.isStale());
+ } finally {
+ SlowMeCopro.cdl.get().countDown();
+ SlowMeCopro.sleepTime.set(0);
+ }
+
+ HTU.getHBaseCluster().stopMaster(0);
+ HBaseAdmin admin = new HBaseAdmin(HTU.getConfiguration());
+ nHdt =admin.getTableDescriptor(hdt.getTableName());
+ Assert.assertEquals("fams=" + Arrays.toString(nHdt.getColumnFamilies()),
+ bHdt.getColumnFamilies().length + 1, nHdt.getColumnFamilies().length);
+
+ admin.disableTable(hdt.getTableName());
+ admin.deleteTable(hdt.getTableName());
+ HTU.getHBaseCluster().startMaster();
+ }
+
+ @Test
+ public void testReplicaAndReplication() throws Exception {
+ HTableDescriptor hdt =
HTU.createTableDescriptor("testReplicaAndReplication");
+ hdt.setRegionReplication(NB_SERVERS);
+
+ HColumnDescriptor fam = new HColumnDescriptor(row);
+ fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
+ hdt.addFamily(fam);
+
+ hdt.addCoprocessor(SlowMeCopro.class.getName());
+ HTU.getHBaseAdmin().createTable(hdt,
HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
+
+ Configuration conf2 = HBaseConfiguration.create(HTU.getConfiguration());
+ conf2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1));
+ conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
+ MiniZooKeeperCluster miniZK = HTU.getZkCluster();
+ HBaseTestingUtility HTU2 = new HBaseTestingUtility(conf2);
+ HTU2.setZkCluster(miniZK);
+ HTU2.startMiniCluster(NB_SERVERS);
+ LOG.info("Setup second Zk");
+ HTU2.getHBaseAdmin().createTable(hdt,
HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
+
+ ReplicationAdmin admin = new ReplicationAdmin(HTU.getConfiguration());
+ admin.addPeer("2", HTU2.getClusterKey());
+
+ Put p = new Put(row);
+ p.add(row, row, row);
+ final HTable table = new HTable(HTU.getConfiguration(),
hdt.getTableName());
+ table.put(p);
+
+ HTU.getHBaseAdmin().flush(table.getTableName());
+ LOG.info("Put & flush done on the first cluster. Now doing a get on the
same cluster.");
+
+ Waiter.waitFor(HTU.getConfiguration(), 1000, new
Waiter.Predicate<Exception>() {
+ @Override
+ public boolean evaluate() throws Exception {
+ try {
+ SlowMeCopro.cdl.set(new CountDownLatch(1));
+ Get g = new Get(row);
+ g.setConsistency(Consistency.TIMELINE);
+ Result r = table.get(g);
+ Assert.assertTrue(r.isStale());
+ return !r.isEmpty();
+ } finally {
+ SlowMeCopro.cdl.get().countDown();
+ SlowMeCopro.sleepTime.set(0);
+ } }
+ });
+
+ LOG.info("stale get on the first cluster done. Now for the second.");
+
+ final HTable table2 = new HTable(HTU.getConfiguration(),
hdt.getTableName());
+ Waiter.waitFor(HTU.getConfiguration(), 1000, new
Waiter.Predicate<Exception>() {
+ @Override
+ public boolean evaluate() throws Exception {
+ try {
+ SlowMeCopro.cdl.set(new CountDownLatch(1));
+ Get g = new Get(row);
+ g.setConsistency(Consistency.TIMELINE);
+ Result r = table2.get(g);
+ Assert.assertTrue(r.isStale());
+ return !r.isEmpty();
+ } finally {
+ SlowMeCopro.cdl.get().countDown();
+ SlowMeCopro.sleepTime.set(0);
+ } }
+ });
+
+ HTU.getHBaseAdmin().disableTable(hdt.getTableName());
+ HTU.deleteTable(hdt.getTableName());
+
+ HTU2.getHBaseAdmin().disableTable(hdt.getTableName());
+ HTU2.deleteTable(hdt.getTableName());
+
+ HTU2.shutdownMiniCluster();
+ }
+
+ @Test
+ public void testBulkLoad() throws IOException {
+ // Create table then get the single region for our new table.
+ LOG.debug("Creating test table");
+ HTableDescriptor hdt = HTU.createTableDescriptor("testBulkLoad");
+ hdt.setRegionReplication(NB_SERVERS);
+ hdt.addCoprocessor(SlowMeCopro.class.getName());
+ HTable table = HTU.createTable(hdt, new byte[][]{f},
HTU.getConfiguration());
+
+ // create hfiles to load.
+ LOG.debug("Creating test data");
+ Path dir = HTU.getDataTestDirOnTestFS("testBulkLoad");
+ final int numRows = 10;
+ final byte[] qual = Bytes.toBytes("qual");
+ final byte[] val = Bytes.toBytes("val");
+ final List<Pair<byte[], String>> famPaths = new ArrayList<Pair<byte[],
String>>();
+ for (HColumnDescriptor col : hdt.getColumnFamilies()) {
+ Path hfile = new Path(dir, col.getNameAsString());
+ TestHRegionServerBulkLoad.createHFile(HTU.getTestFileSystem(), hfile,
col.getName(),
+ qual, val, numRows);
+ famPaths.add(new Pair<byte[], String>(col.getName(), hfile.toString()));
+ }
+
+ // bulk load HFiles
+ LOG.debug("Loading test data");
+ final HConnection conn = HTU.getHBaseAdmin().getConnection();
+ RegionServerCallable<Void> callable = new RegionServerCallable<Void>(
+ conn, hdt.getTableName(), TestHRegionServerBulkLoad.rowkey(0)) {
+ @Override
+ public Void call() throws Exception {
+ LOG.debug("Going to connect to server " + getLocation() + " for row "
+ + Bytes.toStringBinary(getRow()));
+ byte[] regionName = getLocation().getRegionInfo().getRegionName();
+ BulkLoadHFileRequest request =
+ RequestConverter.buildBulkLoadHFileRequest(famPaths, regionName,
true);
+ getStub().bulkLoadHFile(null, request);
+ return null;
+ }
+ };
+ RpcRetryingCallerFactory factory = new
RpcRetryingCallerFactory(HTU.getConfiguration());
+ RpcRetryingCaller<Void> caller = factory.<Void> newCaller();
+ caller.callWithRetries(callable);
+
+ // verify we can read them from the primary
+ LOG.debug("Verifying data load");
+ for (int i = 0; i < numRows; i++) {
+ byte[] row = TestHRegionServerBulkLoad.rowkey(i);
+ Get g = new Get(row);
+ Result r = table.get(g);
+ Assert.assertFalse(r.isStale());
+ }
+
+ // verify we can read them from the replica
+ LOG.debug("Verifying replica queries");
+ try {
+ SlowMeCopro.cdl.set(new CountDownLatch(1));
+ for (int i = 0; i < numRows; i++) {
+ byte[] row = TestHRegionServerBulkLoad.rowkey(i);
+ Get g = new Get(row);
+ g.setConsistency(Consistency.TIMELINE);
+ Result r = table.get(g);
+ Assert.assertTrue(r.isStale());
+ }
+ SlowMeCopro.cdl.get().countDown();
+ } finally {
+ SlowMeCopro.cdl.get().countDown();
+ SlowMeCopro.sleepTime.set(0);
+ }
+
+ HTU.getHBaseAdmin().disableTable(hdt.getTableName());
+ HTU.deleteTable(hdt.getTableName());
+ }
+}
Modified:
hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
URL:
http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java?rev=1585484&r1=1585483&r2=1585484&view=diff
==============================================================================
---
hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
(original)
+++
hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
Mon Apr 7 13:54:27 2014
@@ -83,7 +83,11 @@ public class TestHRegionServerBulkLoad {
}
}
- static byte[] rowkey(int i) {
+ /**
+ * Create a rowkey compatible with
+ * {@link #createHFile(FileSystem, Path, byte[], byte[], byte[], int)}.
+ */
+ public static byte[] rowkey(int i) {
return Bytes.toBytes(String.format("row_%08d", i));
}