Repository: hbase Updated Branches: refs/heads/branch-1 d965d14a6 -> a9c008344
http://git-wip-us.apache.org/repos/asf/hbase/blob/a9c00834/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java index a0a8747..09c7e86 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java @@ -22,7 +22,6 @@ package org.apache.hadoop.hbase.client; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.fail; import java.util.ArrayList; @@ -32,14 +31,14 @@ import java.util.Random; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos; +import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.junit.After; http://git-wip-us.apache.org/repos/asf/hbase/blob/a9c00834/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideNoCodec.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideNoCodec.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideNoCodec.java index ae96849..66fb69c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideNoCodec.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideNoCodec.java @@ -99,4 +99,4 @@ public class TestFromClientSideNoCodec { String codec = AbstractRpcClient.getDefaultCodec(c); assertTrue(codec == null || codec.length() == 0); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/a9c00834/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideWithCoprocessor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideWithCoprocessor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideWithCoprocessor.java index 2671af7..cd2409e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideWithCoprocessor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideWithCoprocessor.java @@ -27,7 +27,7 @@ import org.junit.experimental.categories.Category; /** * Test all client operations with a coprocessor that - * just implements the default flush/compact/scan policy + * just implements the default flush/compact/scan policy. 
*/ @Category(LargeTests.class) public class TestFromClientSideWithCoprocessor extends TestFromClientSide { http://git-wip-us.apache.org/repos/asf/hbase/blob/a9c00834/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIncrementFromClientSideWithCoprocessor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIncrementFromClientSideWithCoprocessor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIncrementFromClientSideWithCoprocessor.java new file mode 100644 index 0000000..a67cc45 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIncrementFromClientSideWithCoprocessor.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; +import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint; +import org.apache.hadoop.hbase.regionserver.NoOpScanPolicyObserver; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.junit.Before; +import org.junit.experimental.categories.Category; + +/** + * Test all {@link Increment} client operations with a coprocessor that + * just implements the default flush/compact/scan policy. + * + * This test takes a long time. The test it derives from is parameterized so we run through both + * options of the test. + */ +@Category(LargeTests.class) +public class TestIncrementFromClientSideWithCoprocessor extends TestIncrementsFromClientSide { + public TestIncrementFromClientSideWithCoprocessor(final boolean fast) { + super(fast); + } + + @Before + public void before() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, + MultiRowMutationEndpoint.class.getName(), NoOpScanPolicyObserver.class.getName()); + conf.setBoolean("hbase.table.sanity.checks", true); // enable for below tests + super.before(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/a9c00834/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIncrementsFromClientSide.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIncrementsFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIncrementsFromClientSide.java new file mode 100644 index 0000000..54a54a0 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIncrementsFromClientSide.java @@ -0,0 +1,433 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; +import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Threads; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; 
+import org.junit.runners.Parameterized.Parameters; + +/** + * Run Increment tests that use the HBase clients; {@link HTable}. + * + * Test is parameterized to run the slow and fast increment code paths. If fast, in the @before, we + * do a rolling restart of the single regionserver so that it can pick up the go fast configuration. + * Doing it this way should be faster than starting/stopping a cluster per test. + * + * Test takes a long time because we spin up a cluster between each run -- ugh. + */ +@RunWith(Parameterized.class) +@Category(LargeTests.class) +@SuppressWarnings ("deprecation") +public class TestIncrementsFromClientSide { + final Log LOG = LogFactory.getLog(getClass()); + protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static byte [] ROW = Bytes.toBytes("testRow"); + private static byte [] FAMILY = Bytes.toBytes("testFamily"); + // This test depends on there being only one slave running at a time. See the @Before + // method where we do rolling restart. 
+ protected static int SLAVES = 1; + private String oldINCREMENT_FAST_BUT_NARROW_CONSISTENCY_KEY; + @Rule public TestName name = new TestName(); + @Parameters(name = "fast={0}") + public static Collection<Object []> data() { + return Arrays.asList(new Object[] {Boolean.FALSE}, new Object [] {Boolean.TRUE}); + } + private final boolean fast; + + public TestIncrementsFromClientSide(final boolean fast) { + this.fast = fast; + } + + @BeforeClass + public static void beforeClass() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, + MultiRowMutationEndpoint.class.getName()); + conf.setBoolean("hbase.table.sanity.checks", true); // enable for below tests + // This test runs with a single region server; see SLAVES + TEST_UTIL.startMiniCluster(SLAVES); + } + + @Before + public void before() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + if (this.fast) { + // If fast is set, set our configuration and then do a rolling restart of the one + // regionserver so it picks up the new config. Doing this should be faster than starting + // and stopping a cluster for each test. 
+ this.oldINCREMENT_FAST_BUT_NARROW_CONSISTENCY_KEY = + conf.get(HRegion.INCREMENT_FAST_BUT_NARROW_CONSISTENCY_KEY); + conf.setBoolean(HRegion.INCREMENT_FAST_BUT_NARROW_CONSISTENCY_KEY, this.fast); + HRegionServer rs = + TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().get(0).getRegionServer(); + TEST_UTIL.getHBaseCluster().startRegionServer(); + rs.stop("Restart"); + while(!rs.isStopped()) { + Threads.sleep(100); + LOG.info("Restarting " + rs); + } + TEST_UTIL.waitUntilNoRegionsInTransition(10000); + } + } + + @After + public void after() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + if (this.fast) { + if (this.oldINCREMENT_FAST_BUT_NARROW_CONSISTENCY_KEY != null) { + conf.set(HRegion.INCREMENT_FAST_BUT_NARROW_CONSISTENCY_KEY, + this.oldINCREMENT_FAST_BUT_NARROW_CONSISTENCY_KEY); + } + } + } + + /** + * @throws java.lang.Exception + */ + @AfterClass + public static void afterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Test + public void testIncrementWithDeletes() throws Exception { + LOG.info("Starting " + this.name.getMethodName()); + final TableName TABLENAME = + TableName.valueOf(filterStringSoTableNameSafe(this.name.getMethodName())); + Table ht = TEST_UTIL.createTable(TABLENAME, FAMILY); + final byte[] COLUMN = Bytes.toBytes("column"); + + ht.incrementColumnValue(ROW, FAMILY, COLUMN, 5); + TEST_UTIL.flush(TABLENAME); + + Delete del = new Delete(ROW); + ht.delete(del); + + ht.incrementColumnValue(ROW, FAMILY, COLUMN, 5); + + Get get = new Get(ROW); + if (this.fast) get.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED); + Result r = ht.get(get); + assertEquals(1, r.size()); + assertEquals(5, Bytes.toLong(r.getValue(FAMILY, COLUMN))); + } + + @Test + public void testIncrementingInvalidValue() throws Exception { + LOG.info("Starting " + this.name.getMethodName()); + final TableName TABLENAME = + TableName.valueOf(filterStringSoTableNameSafe(this.name.getMethodName())); + Table ht = 
TEST_UTIL.createTable(TABLENAME, FAMILY); + final byte[] COLUMN = Bytes.toBytes("column"); + Put p = new Put(ROW); + // write an integer here (not a Long) + p.add(FAMILY, COLUMN, Bytes.toBytes(5)); + ht.put(p); + try { + ht.incrementColumnValue(ROW, FAMILY, COLUMN, 5); + fail("Should have thrown DoNotRetryIOException"); + } catch (DoNotRetryIOException iox) { + // success + } + Increment inc = new Increment(ROW); + inc.addColumn(FAMILY, COLUMN, 5); + try { + ht.increment(inc); + fail("Should have thrown DoNotRetryIOException"); + } catch (DoNotRetryIOException iox) { + // success + } + } + + @Test + public void testIncrementInvalidArguments() throws Exception { + LOG.info("Starting " + this.name.getMethodName()); + final TableName TABLENAME = + TableName.valueOf(filterStringSoTableNameSafe(this.name.getMethodName())); + Table ht = TEST_UTIL.createTable(TABLENAME, FAMILY); + final byte[] COLUMN = Bytes.toBytes("column"); + try { + // try null row + ht.incrementColumnValue(null, FAMILY, COLUMN, 5); + fail("Should have thrown IOException"); + } catch (IOException iox) { + // success + } + try { + // try null family + ht.incrementColumnValue(ROW, null, COLUMN, 5); + fail("Should have thrown IOException"); + } catch (IOException iox) { + // success + } + try { + // try null qualifier + ht.incrementColumnValue(ROW, FAMILY, null, 5); + fail("Should have thrown IOException"); + } catch (IOException iox) { + // success + } + // try null row + try { + Increment incNoRow = new Increment((byte [])null); + incNoRow.addColumn(FAMILY, COLUMN, 5); + fail("Should have thrown IllegalArgumentException"); + } catch (IllegalArgumentException iax) { + // success + } catch (NullPointerException npe) { + // success + } + // try null family + try { + Increment incNoFamily = new Increment(ROW); + incNoFamily.addColumn(null, COLUMN, 5); + fail("Should have thrown IllegalArgumentException"); + } catch (IllegalArgumentException iax) { + // success + } + // try null qualifier + try { + 
Increment incNoQualifier = new Increment(ROW); + incNoQualifier.addColumn(FAMILY, null, 5); + fail("Should have thrown IllegalArgumentException"); + } catch (IllegalArgumentException iax) { + // success + } + } + + @Test + public void testIncrementOutOfOrder() throws Exception { + LOG.info("Starting " + this.name.getMethodName()); + final TableName TABLENAME = + TableName.valueOf(filterStringSoTableNameSafe(this.name.getMethodName())); + Table ht = TEST_UTIL.createTable(TABLENAME, FAMILY); + + byte [][] QUALIFIERS = new byte [][] { + Bytes.toBytes("B"), Bytes.toBytes("A"), Bytes.toBytes("C") + }; + + Increment inc = new Increment(ROW); + for (int i=0; i<QUALIFIERS.length; i++) { + inc.addColumn(FAMILY, QUALIFIERS[i], 1); + } + ht.increment(inc); + + // Verify expected results + Get get = new Get(ROW); + if (this.fast) get.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED); + Result r = ht.get(get); + Cell [] kvs = r.rawCells(); + assertEquals(3, kvs.length); + assertIncrementKey(kvs[0], ROW, FAMILY, QUALIFIERS[1], 1); + assertIncrementKey(kvs[1], ROW, FAMILY, QUALIFIERS[0], 1); + assertIncrementKey(kvs[2], ROW, FAMILY, QUALIFIERS[2], 1); + + // Now try multiple columns again + inc = new Increment(ROW); + for (int i=0; i<QUALIFIERS.length; i++) { + inc.addColumn(FAMILY, QUALIFIERS[i], 1); + } + ht.increment(inc); + + // Verify + r = ht.get(get); + kvs = r.rawCells(); + assertEquals(3, kvs.length); + assertIncrementKey(kvs[0], ROW, FAMILY, QUALIFIERS[1], 2); + assertIncrementKey(kvs[1], ROW, FAMILY, QUALIFIERS[0], 2); + assertIncrementKey(kvs[2], ROW, FAMILY, QUALIFIERS[2], 2); + } + + @Test + public void testIncrementOnSameColumn() throws Exception { + LOG.info("Starting " + this.name.getMethodName()); + final byte[] TABLENAME = Bytes.toBytes(filterStringSoTableNameSafe(this.name.getMethodName())); + HTable ht = TEST_UTIL.createTable(TABLENAME, FAMILY); + + byte[][] QUALIFIERS = + new byte[][] { Bytes.toBytes("A"), Bytes.toBytes("B"), Bytes.toBytes("C") }; + + 
Increment inc = new Increment(ROW); + for (int i = 0; i < QUALIFIERS.length; i++) { + inc.addColumn(FAMILY, QUALIFIERS[i], 1); + inc.addColumn(FAMILY, QUALIFIERS[i], 1); + } + ht.increment(inc); + + // Verify expected results + Get get = new Get(ROW); + if (this.fast) get.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED); + Result r = ht.get(get); + Cell[] kvs = r.rawCells(); + assertEquals(3, kvs.length); + assertIncrementKey(kvs[0], ROW, FAMILY, QUALIFIERS[0], 1); + assertIncrementKey(kvs[1], ROW, FAMILY, QUALIFIERS[1], 1); + assertIncrementKey(kvs[2], ROW, FAMILY, QUALIFIERS[2], 1); + + // Now try multiple columns again + inc = new Increment(ROW); + for (int i = 0; i < QUALIFIERS.length; i++) { + inc.addColumn(FAMILY, QUALIFIERS[i], 1); + inc.addColumn(FAMILY, QUALIFIERS[i], 1); + } + ht.increment(inc); + + // Verify + r = ht.get(get); + kvs = r.rawCells(); + assertEquals(3, kvs.length); + assertIncrementKey(kvs[0], ROW, FAMILY, QUALIFIERS[0], 2); + assertIncrementKey(kvs[1], ROW, FAMILY, QUALIFIERS[1], 2); + assertIncrementKey(kvs[2], ROW, FAMILY, QUALIFIERS[2], 2); + + ht.close(); + } + + @Test + public void testIncrement() throws Exception { + LOG.info("Starting " + this.name.getMethodName()); + final TableName TABLENAME = + TableName.valueOf(filterStringSoTableNameSafe(this.name.getMethodName())); + Table ht = TEST_UTIL.createTable(TABLENAME, FAMILY); + + byte [][] ROWS = new byte [][] { + Bytes.toBytes("a"), Bytes.toBytes("b"), Bytes.toBytes("c"), + Bytes.toBytes("d"), Bytes.toBytes("e"), Bytes.toBytes("f"), + Bytes.toBytes("g"), Bytes.toBytes("h"), Bytes.toBytes("i") + }; + byte [][] QUALIFIERS = new byte [][] { + Bytes.toBytes("a"), Bytes.toBytes("b"), Bytes.toBytes("c"), + Bytes.toBytes("d"), Bytes.toBytes("e"), Bytes.toBytes("f"), + Bytes.toBytes("g"), Bytes.toBytes("h"), Bytes.toBytes("i") + }; + + // Do some simple single-column increments + + // First with old API + ht.incrementColumnValue(ROW, FAMILY, QUALIFIERS[0], 1); + 
ht.incrementColumnValue(ROW, FAMILY, QUALIFIERS[1], 2); + ht.incrementColumnValue(ROW, FAMILY, QUALIFIERS[2], 3); + ht.incrementColumnValue(ROW, FAMILY, QUALIFIERS[3], 4); + + // Now increment things incremented with old and do some new + Increment inc = new Increment(ROW); + inc.addColumn(FAMILY, QUALIFIERS[1], 1); + inc.addColumn(FAMILY, QUALIFIERS[3], 1); + inc.addColumn(FAMILY, QUALIFIERS[4], 1); + ht.increment(inc); + + // Verify expected results + Get get = new Get(ROW); + if (this.fast) get.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED); + Result r = ht.get(get); + Cell [] kvs = r.rawCells(); + assertEquals(5, kvs.length); + assertIncrementKey(kvs[0], ROW, FAMILY, QUALIFIERS[0], 1); + assertIncrementKey(kvs[1], ROW, FAMILY, QUALIFIERS[1], 3); + assertIncrementKey(kvs[2], ROW, FAMILY, QUALIFIERS[2], 3); + assertIncrementKey(kvs[3], ROW, FAMILY, QUALIFIERS[3], 5); + assertIncrementKey(kvs[4], ROW, FAMILY, QUALIFIERS[4], 1); + + // Now try multiple columns by different amounts + inc = new Increment(ROWS[0]); + for (int i=0;i<QUALIFIERS.length;i++) { + inc.addColumn(FAMILY, QUALIFIERS[i], i+1); + } + ht.increment(inc); + // Verify + get = new Get(ROWS[0]); + if (this.fast) get.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED); + r = ht.get(get); + kvs = r.rawCells(); + assertEquals(QUALIFIERS.length, kvs.length); + for (int i=0;i<QUALIFIERS.length;i++) { + assertIncrementKey(kvs[i], ROWS[0], FAMILY, QUALIFIERS[i], i+1); + } + + // Re-increment them + inc = new Increment(ROWS[0]); + for (int i=0;i<QUALIFIERS.length;i++) { + inc.addColumn(FAMILY, QUALIFIERS[i], i+1); + } + ht.increment(inc); + // Verify + r = ht.get(get); + kvs = r.rawCells(); + assertEquals(QUALIFIERS.length, kvs.length); + for (int i=0;i<QUALIFIERS.length;i++) { + assertIncrementKey(kvs[i], ROWS[0], FAMILY, QUALIFIERS[i], 2*(i+1)); + } + + // Verify that an Increment of an amount of zero, returns current count; i.e. same as for above + // test, that is: 2 * (i + 1). 
+ inc = new Increment(ROWS[0]); + for (int i = 0; i < QUALIFIERS.length; i++) { + inc.addColumn(FAMILY, QUALIFIERS[i], 0); + } + ht.increment(inc); + r = ht.get(get); + kvs = r.rawCells(); + assertEquals(QUALIFIERS.length, kvs.length); + for (int i = 0; i < QUALIFIERS.length; i++) { + assertIncrementKey(kvs[i], ROWS[0], FAMILY, QUALIFIERS[i], 2*(i+1)); + } + } + + + /** + * Call over to the adjacent class's method of same name. + */ + static void assertIncrementKey(Cell key, byte [] row, byte [] family, + byte [] qualifier, long value) throws Exception { + TestFromClientSide.assertIncrementKey(key, row, family, qualifier, value); + } + + public static String filterStringSoTableNameSafe(final String str) { + return str.replaceAll("\\[fast\\=(.*)\\]", ".FAST.is.$1"); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/a9c00834/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java index 8e451cd..ab53e3e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java @@ -22,7 +22,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import java.io.IOException; -import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.NavigableMap; @@ -32,7 +31,6 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.TableName; import 
org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; @@ -237,7 +235,8 @@ public abstract class TestTableInputFormatScanBase { ImmutableBytesWritable.class, ImmutableBytesWritable.class, job); job.setReducerClass(ScanReducer.class); job.setNumReduceTasks(1); // one to get final "first" and "last" key - FileOutputFormat.setOutputPath(job, new Path(job.getJobName())); + FileOutputFormat.setOutputPath(job, + new Path(TEST_UTIL.getDataTestDir(), job.getJobName())); LOG.info("Started " + job.getJobName()); assertTrue(job.waitForCompletion(true)); LOG.info("After map/reduce completion - job " + jobName); http://git-wip-us.apache.org/repos/asf/hbase/blob/a9c00834/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java index a49c62e..4763c55 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java @@ -19,8 +19,8 @@ package org.apache.hadoop.hbase.regionserver; import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1; import static org.apache.hadoop.hbase.HBaseTestingUtility.fam2; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.io.IOException; @@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Increment; +import org.apache.hadoop.hbase.client.IsolationLevel; import 
org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; @@ -62,7 +63,6 @@ import org.apache.hadoop.hbase.filter.BinaryComparator; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.hfile.BlockCache; -import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.wal.WAL; @@ -82,7 +82,7 @@ public class TestAtomicOperation { private static final Log LOG = LogFactory.getLog(TestAtomicOperation.class); @Rule public TestName name = new TestName(); - Region region = null; + HRegion region = null; private HBaseTestingUtility TEST_UTIL = HBaseTestingUtility.createLocalHTU(); // Test names @@ -177,17 +177,36 @@ public class TestAtomicOperation { } } + @Test + public void testIncrementMultiThreadsFastPath() throws IOException { + Configuration conf = TEST_UTIL.getConfiguration(); + String oldValue = conf.get(HRegion.INCREMENT_FAST_BUT_NARROW_CONSISTENCY_KEY); + conf.setBoolean(HRegion.INCREMENT_FAST_BUT_NARROW_CONSISTENCY_KEY, true); + try { + testIncrementMultiThreads(true); + } finally { + if (oldValue != null) conf.set(HRegion.INCREMENT_FAST_BUT_NARROW_CONSISTENCY_KEY, oldValue); + } + } + + /** + * Test multi-threaded increments. Take the slow but consistent path through HRegion. + */ + @Test + public void testIncrementMultiThreadsSlowPath() throws IOException { + testIncrementMultiThreads(false); + } + /** * Test multi-threaded increments. 
*/ - @Test - public void testIncrementMultiThreads() throws IOException { + private void testIncrementMultiThreads(final boolean fast) throws IOException { LOG.info("Starting test testIncrementMultiThreads"); // run a with mixed column families (1 and 3 versions) initHRegion(tableName, name.getMethodName(), new int[] {1,3}, fam1, fam2); - // create 25 threads, each will increment by its own quantity - int numThreads = 25; + // Create 100 threads, each will increment by its own quantity + int numThreads = 100; int incrementsPerThread = 1000; Incrementer[] all = new Incrementer[numThreads]; int expectedTotal = 0; @@ -210,9 +229,9 @@ public class TestAtomicOperation { LOG.info("Ignored", e); } } - assertICV(row, fam1, qual1, expectedTotal); - assertICV(row, fam1, qual2, expectedTotal*2); - assertICV(row, fam2, qual3, expectedTotal*3); + assertICV(row, fam1, qual1, expectedTotal, fast); + assertICV(row, fam1, qual2, expectedTotal*2, fast); + assertICV(row, fam2, qual3, expectedTotal*3, fast); LOG.info("testIncrementMultiThreads successfully verified that total is " + expectedTotal); } @@ -220,9 +239,11 @@ public class TestAtomicOperation { private void assertICV(byte [] row, byte [] familiy, byte[] qualifier, - long amount) throws IOException { + long amount, + boolean fast) throws IOException { // run a get and see? 
Get get = new Get(row); + if (fast) get.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED); get.addColumn(familiy, qualifier); Result result = region.get(get); assertEquals(1, result.size()); @@ -329,8 +350,10 @@ public class TestAtomicOperation { Get g = new Get(row); Result result = region.get(g); - assertEquals(result.getValue(fam1, qual1).length, result.getValue(fam1, qual2).length); - assertEquals(result.getValue(fam1, qual1).length, result.getValue(fam2, qual3).length); + assertEquals(result.getValue(fam1, qual1).length, + result.getValue(fam1, qual2).length); + assertEquals(result.getValue(fam1, qual1).length, + result.getValue(fam2, qual3).length); } catch (IOException e) { e.printStackTrace(); failures.incrementAndGet(); @@ -548,13 +571,13 @@ public class TestAtomicOperation { } public static class AtomicOperation extends Thread { - protected final Region region; + protected final HRegion region; protected final int numOps; protected final AtomicLong timeStamps; protected final AtomicInteger failures; protected final Random r = new Random(); - public AtomicOperation(Region region, int numOps, AtomicLong timeStamps, + public AtomicOperation(HRegion region, int numOps, AtomicLong timeStamps, AtomicInteger failures) { this.region = region; this.numOps = numOps; @@ -616,8 +639,8 @@ public class TestAtomicOperation { } private class PutThread extends TestThread { - private Region region; - PutThread(TestContext ctx, Region region) { + private HRegion region; + PutThread(TestContext ctx, HRegion region) { super(ctx); this.region = region; } @@ -633,8 +656,8 @@ public class TestAtomicOperation { } private class CheckAndPutThread extends TestThread { - private Region region; - CheckAndPutThread(TestContext ctx, Region region) { + private HRegion region; + CheckAndPutThread(TestContext ctx, HRegion region) { super(ctx); this.region = region; } 
http://git-wip-us.apache.org/repos/asf/hbase/blob/a9c00834/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionIncrement.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionIncrement.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionIncrement.java new file mode 100644 index 0000000..3d25c40 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionIncrement.java @@ -0,0 +1,253 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hbase.CategoryBasedTimeout; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.client.Increment; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.TestIncrementsFromClientSide; +import org.apache.hadoop.hbase.regionserver.wal.FSHLog; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.wal.WAL; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; +import org.junit.rules.TestRule; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + + +/** + * Increments with some concurrency against a region to ensure we get the right answer. + * Test is parameterized to run the fast and slow path increments; if fast, + * HRegion.INCREMENT_FAST_BUT_NARROW_CONSISTENCY_KEY is true. + * + * <p>There is similar test up in TestAtomicOperation. It does a test where it has 100 threads + * doing increments across two column families all on one row and the increments are connected to + * prove atomicity on row. 
+ */ +@Category(MediumTests.class) +@RunWith(Parameterized.class) +public class TestRegionIncrement { + private static final Log LOG = LogFactory.getLog(TestRegionIncrement.class); + @Rule public TestName name = new TestName(); + @Rule public final TestRule timeout = + CategoryBasedTimeout.builder().withTimeout(this.getClass()). + withLookingForStuckThread(true).build(); + private static HBaseTestingUtility TEST_UTIL; + private final static byte [] INCREMENT_BYTES = Bytes.toBytes("increment"); + private static final int THREAD_COUNT = 10; + private static final int INCREMENT_COUNT = 10000; + + @Parameters(name = "fast={0}") + public static Collection<Object []> data() { + return Arrays.asList(new Object[] {Boolean.FALSE}, new Object [] {Boolean.TRUE}); + } + + private final boolean fast; + + public TestRegionIncrement(final boolean fast) { + this.fast = fast; + } + + @Before + public void setUp() throws Exception { + TEST_UTIL = HBaseTestingUtility.createLocalHTU(); + if (this.fast) { + TEST_UTIL.getConfiguration(). + setBoolean(HRegion.INCREMENT_FAST_BUT_NARROW_CONSISTENCY_KEY, this.fast); + } + } + + @After + public void tearDown() throws Exception { + TEST_UTIL.cleanupTestDir(); + } + + private HRegion getRegion(final Configuration conf, final String tableName) throws IOException { + WAL wal = new FSHLog(FileSystem.get(conf), TEST_UTIL.getDataTestDir(), + TEST_UTIL.getDataTestDir().toString(), conf); + return (HRegion)TEST_UTIL.createLocalHRegion(Bytes.toBytes(tableName), + HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, tableName, conf, + false, Durability.SKIP_WAL, wal, INCREMENT_BYTES); + } + + private void closeRegion(final HRegion region) throws IOException { + region.close(); + region.getWAL().close(); + } + + /** + * Increments a single cell a bunch of times. 
+ */ + private static class SingleCellIncrementer extends Thread { + private final int count; + private final HRegion region; + private final Increment increment; + + SingleCellIncrementer(final int i, final int count, final HRegion region, + final Increment increment) { + super("" + i); + setDaemon(true); + this.count = count; + this.region = region; + this.increment = increment; + } + + @Override + public void run() { + for (int i = 0; i < this.count; i++) { + try { + this.region.increment(this.increment); + // LOG.info(getName() + " " + i); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + } + + /** + * Increments a random row's Cell <code>count</code> times. + */ + private static class CrossRowCellIncrementer extends Thread { + private final int count; + private final HRegion region; + private final Increment [] increments; + + CrossRowCellIncrementer(final int i, final int count, final HRegion region, final int range) { + super("" + i); + setDaemon(true); + this.count = count; + this.region = region; + this.increments = new Increment[range]; + for (int ii = 0; ii < range; ii++) { + this.increments[ii] = new Increment(Bytes.toBytes(i)); + this.increments[ii].addColumn(INCREMENT_BYTES, INCREMENT_BYTES, 1); + } + } + + @Override + public void run() { + for (int i = 0; i < this.count; i++) { + try { + int index = ThreadLocalRandom.current().nextInt(0, this.increments.length); + this.region.increment(this.increments[index]); + // LOG.info(getName() + " " + index); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + } + + /** + * Have each thread update its own Cell. Avoid contention with another thread. 
+ * @throws IOException + * @throws InterruptedException + */ + @Test + public void testUnContendedSingleCellIncrement() + throws IOException, InterruptedException { + final HRegion region = getRegion(TEST_UTIL.getConfiguration(), + TestIncrementsFromClientSide.filterStringSoTableNameSafe(this.name.getMethodName())); + long startTime = System.currentTimeMillis(); + try { + SingleCellIncrementer [] threads = new SingleCellIncrementer[THREAD_COUNT]; + for (int i = 0; i < threads.length; i++) { + byte [] rowBytes = Bytes.toBytes(i); + Increment increment = new Increment(rowBytes); + increment.addColumn(INCREMENT_BYTES, INCREMENT_BYTES, 1); + threads[i] = new SingleCellIncrementer(i, INCREMENT_COUNT, region, increment); + } + for (int i = 0; i < threads.length; i++) { + threads[i].start(); + } + for (int i = 0; i < threads.length; i++) { + threads[i].join(); + } + RegionScanner regionScanner = region.getScanner(new Scan()); + List<Cell> cells = new ArrayList<Cell>(THREAD_COUNT); + while(regionScanner.next(cells)) continue; + assertEquals(THREAD_COUNT, cells.size()); + long total = 0; + for (Cell cell: cells) total += Bytes.toLong(cell.getValue()); + assertEquals(INCREMENT_COUNT * THREAD_COUNT, total); + } finally { + closeRegion(region); + LOG.info(this.name.getMethodName() + " " + (System.currentTimeMillis() - startTime) + "ms"); + } + } + + /** + * Have each thread update its own Cell. Avoid contention with another thread. 
+ * @throws IOException + * @throws InterruptedException + */ + @Test + public void testContendedAcrossCellsIncrement() + throws IOException, InterruptedException { + final HRegion region = getRegion(TEST_UTIL.getConfiguration(), + TestIncrementsFromClientSide.filterStringSoTableNameSafe(this.name.getMethodName())); + long startTime = System.currentTimeMillis(); + try { + CrossRowCellIncrementer [] threads = new CrossRowCellIncrementer[THREAD_COUNT]; + for (int i = 0; i < threads.length; i++) { + threads[i] = new CrossRowCellIncrementer(i, INCREMENT_COUNT, region, THREAD_COUNT); + } + for (int i = 0; i < threads.length; i++) { + threads[i].start(); + } + for (int i = 0; i < threads.length; i++) { + threads[i].join(); + } + RegionScanner regionScanner = region.getScanner(new Scan()); + List<Cell> cells = new ArrayList<Cell>(100); + while(regionScanner.next(cells)) continue; + assertEquals(THREAD_COUNT, cells.size()); + long total = 0; + for (Cell cell: cells) total += Bytes.toLong(cell.getValue()); + assertEquals(INCREMENT_COUNT * THREAD_COUNT, total); + } finally { + closeRegion(region); + LOG.info(this.name.getMethodName() + " " + (System.currentTimeMillis() - startTime) + "ms"); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/a9c00834/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java index 94e2028..4c7a204 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java @@ -549,7 +549,7 @@ public class TestTags { public static class TestCoprocessorForTags extends BaseRegionObserver { - public static boolean checkTagPresence = false; + public static 
volatile boolean checkTagPresence = false; public static List<Tag> tags = null; @Override