Repository: hbase Updated Branches: refs/heads/branch-1.2 fd7234020 -> ab0651ed2
http://git-wip-us.apache.org/repos/asf/hbase/blob/930f68c0/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java index 6cef518..99c7f47 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java @@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Increment; +import org.apache.hadoop.hbase.client.IsolationLevel; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; @@ -82,7 +83,7 @@ public class TestAtomicOperation { private static final Log LOG = LogFactory.getLog(TestAtomicOperation.class); @Rule public TestName name = new TestName(); - Region region = null; + HRegion region = null; private HBaseTestingUtility TEST_UTIL = HBaseTestingUtility.createLocalHTU(); // Test names @@ -144,13 +145,13 @@ public class TestAtomicOperation { * Test multi-threaded increments. */ @Test - public void testIncrementMultiThreads() throws IOException { + public void testIncrementMultiThreads(final boolean fast) throws IOException { LOG.info("Starting test testIncrementMultiThreads"); // run a with mixed column families (1 and 3 versions) initHRegion(tableName, name.getMethodName(), new int[] {1,3}, fam1, fam2); - // create 25 threads, each will increment by its own quantity - int numThreads = 25; + // Create 100 threads, each will increment by its own quantity + int numThreads = 100; int incrementsPerThread = 1000; Incrementer[] all = new Incrementer[numThreads]; int expectedTotal = 0; @@ -173,9 +174,9 @@ public class TestAtomicOperation { LOG.info("Ignored", e); } } - assertICV(row, fam1, qual1, expectedTotal); - assertICV(row, fam1, qual2, expectedTotal*2); - assertICV(row, fam2, qual3, expectedTotal*3); + assertICV(row, fam1, qual1, expectedTotal, fast); + assertICV(row, fam1, qual2, expectedTotal*2, fast); + assertICV(row, fam2, qual3, expectedTotal*3, fast); LOG.info("testIncrementMultiThreads successfully verified that total is " + expectedTotal); } @@ -183,9 +184,11 @@ public class TestAtomicOperation { private void assertICV(byte [] row, byte [] familiy, byte[] qualifier, - long amount) throws IOException { + long amount, + boolean fast) throws IOException { // run a get and see? Get get = new Get(row); + if (fast) get.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED); get.addColumn(familiy, qualifier); Result result = region.get(get); assertEquals(1, result.size()); @@ -511,13 +514,13 @@ public class TestAtomicOperation { } public static class AtomicOperation extends Thread { - protected final Region region; + protected final HRegion region; protected final int numOps; protected final AtomicLong timeStamps; protected final AtomicInteger failures; protected final Random r = new Random(); - public AtomicOperation(Region region, int numOps, AtomicLong timeStamps, + public AtomicOperation(HRegion region, int numOps, AtomicLong timeStamps, AtomicInteger failures) { this.region = region; this.numOps = numOps; @@ -579,8 +582,8 @@ public class TestAtomicOperation { } private class PutThread extends TestThread { - private Region region; - PutThread(TestContext ctx, Region region) { + private HRegion region; + PutThread(TestContext ctx, HRegion region) { super(ctx); this.region = region; } @@ -596,8 +599,8 @@ public class TestAtomicOperation { } private class CheckAndPutThread extends TestThread { - private Region region; - CheckAndPutThread(TestContext ctx, Region region) { + private HRegion region; + CheckAndPutThread(TestContext ctx, HRegion region) { super(ctx); this.region = region; } http://git-wip-us.apache.org/repos/asf/hbase/blob/930f68c0/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionIncrement.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionIncrement.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionIncrement.java new file mode 100644 index 0000000..955da94 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionIncrement.java @@ -0,0 +1,254 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hbase.CategoryBasedTimeout; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.client.Increment; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.TestIncrementsFromClientSide; +import org.apache.hadoop.hbase.regionserver.wal.FSHLog; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.wal.WAL; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; +import org.junit.rules.TestRule; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + + +/** + * Increments with some concurrency against a region to ensure we get the right answer. + * Test is parameterized to run the fast and slow path increments; if fast, + * HRegion.INCREMENT_FAST_BUT_NARROW_CONSISTENCY_KEY is true. + * + * <p>There is similar test up in TestAtomicOperation. It does a test where it has 100 threads + * doing increments across two column families all on one row and the increments are connected to + * prove atomicity on row. + */ +@Category(MediumTests.class) +@RunWith(Parameterized.class) +public class TestRegionIncrement { + private static final Log LOG = LogFactory.getLog(TestRegionIncrement.class); + @Rule public TestName name = new TestName(); + @Rule public final TestRule timeout = + CategoryBasedTimeout.builder().withTimeout(this.getClass()). + withLookingForStuckThread(true).build(); + private static HBaseTestingUtility TEST_UTIL; + private final static byte [] INCREMENT_BYTES = Bytes.toBytes("increment"); + private static final int THREAD_COUNT = 10; + private static final int INCREMENT_COUNT = 10000; + + @Parameters(name = "fast={0}") + public static Collection<Object []> data() { + return Arrays.asList(new Object[] {Boolean.FALSE}, new Object [] {Boolean.TRUE}); + } + + private final boolean fast; + + public TestRegionIncrement(final boolean fast) { + this.fast = fast; + } + + @Before + public void setUp() throws Exception { + TEST_UTIL = HBaseTestingUtility.createLocalHTU(); + if (this.fast) { + TEST_UTIL.getConfiguration(). + setBoolean(HRegion.INCREMENT_FAST_BUT_NARROW_CONSISTENCY_KEY, this.fast); + } + } + + @After + public void tearDown() throws Exception { + TEST_UTIL.cleanupTestDir(); + } + + private HRegion getRegion(final Configuration conf, final String tableName) throws IOException { + WAL wal = new FSHLog(FileSystem.get(conf), TEST_UTIL.getDataTestDir(), + TEST_UTIL.getDataTestDir().toString(), conf); + return (HRegion)TEST_UTIL.createLocalHRegion(Bytes.toBytes(tableName), + HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, tableName, conf, + false, Durability.SKIP_WAL, wal, INCREMENT_BYTES); + } + + private void closeRegion(final HRegion region) throws IOException { + region.close(); + region.getWAL().close(); + } + + /** + * Increments a single cell a bunch of times. + */ + private static class SingleCellIncrementer extends Thread { + private final int count; + private final HRegion region; + private final Increment increment; + + SingleCellIncrementer(final int i, final int count, final HRegion region, + final Increment increment) { + super("" + i); + setDaemon(true); + this.count = count; + this.region = region; + this.increment = increment; + } + + @Override + public void run() { + for (int i = 0; i < this.count; i++) { + try { + this.region.increment(this.increment); + // LOG.info(getName() + " " + i); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + } + + /** + * Increments a random row's Cell <code>count</code> times. + */ + private static class CrossRowCellIncrementer extends Thread { + private final int count; + private final HRegion region; + private final Increment [] increments; + + CrossRowCellIncrementer(final int i, final int count, final HRegion region, final int range) { + super("" + i); + setDaemon(true); + this.count = count; + this.region = region; + this.increments = new Increment[range]; + for (int ii = 0; ii < range; ii++) { + this.increments[ii] = new Increment(Bytes.toBytes(i)); + this.increments[ii].addColumn(INCREMENT_BYTES, INCREMENT_BYTES, 1); + } + } + + @Override + public void run() { + for (int i = 0; i < this.count; i++) { + try { + int index = ThreadLocalRandom.current().nextInt(0, this.increments.length); + this.region.increment(this.increments[index]); + // LOG.info(getName() + " " + index); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + } + + /** + * Have each thread update its own Cell. Avoid contention with another thread. + * @throws IOException + * @throws InterruptedException + */ + @Test + public void testUnContendedSingleCellIncrement() + throws IOException, InterruptedException { + final HRegion region = getRegion(TEST_UTIL.getConfiguration(), + TestIncrementsFromClientSide.filterStringSoTableNameSafe(this.name.getMethodName())); + long startTime = System.currentTimeMillis(); + try { + SingleCellIncrementer [] threads = new SingleCellIncrementer[THREAD_COUNT]; + for (int i = 0; i < threads.length; i++) { + byte [] rowBytes = Bytes.toBytes(i); + Increment increment = new Increment(rowBytes); + increment.addColumn(INCREMENT_BYTES, INCREMENT_BYTES, 1); + threads[i] = new SingleCellIncrementer(i, INCREMENT_COUNT, region, increment); + } + for (int i = 0; i < threads.length; i++) { + threads[i].start(); + } + for (int i = 0; i < threads.length; i++) { + threads[i].join(); + } + RegionScanner regionScanner = region.getScanner(new Scan()); + List<Cell> cells = new ArrayList<Cell>(THREAD_COUNT); + while(regionScanner.next(cells)) continue; + assertEquals(THREAD_COUNT, cells.size()); + long total = 0; + for (Cell cell: cells) total += Bytes.toLong(cell.getValue()); + assertEquals(INCREMENT_COUNT * THREAD_COUNT, total); + } finally { + closeRegion(region); + LOG.info(this.name.getMethodName() + " " + (System.currentTimeMillis() - startTime) + "ms"); + } + } + + /** + * Have each thread update its own Cell. Avoid contention with another thread. + * This is + * @throws IOException + * @throws InterruptedException + */ + @Test + public void testContendedAcrossCellsIncrement() + throws IOException, InterruptedException { + final HRegion region = getRegion(TEST_UTIL.getConfiguration(), + TestIncrementsFromClientSide.filterStringSoTableNameSafe(this.name.getMethodName())); + long startTime = System.currentTimeMillis(); + try { + CrossRowCellIncrementer [] threads = new CrossRowCellIncrementer[THREAD_COUNT]; + for (int i = 0; i < threads.length; i++) { + threads[i] = new CrossRowCellIncrementer(i, INCREMENT_COUNT, region, THREAD_COUNT); + } + for (int i = 0; i < threads.length; i++) { + threads[i].start(); + } + for (int i = 0; i < threads.length; i++) { + threads[i].join(); + } + RegionScanner regionScanner = region.getScanner(new Scan()); + List<Cell> cells = new ArrayList<Cell>(100); + while(regionScanner.next(cells)) continue; + assertEquals(THREAD_COUNT, cells.size()); + long total = 0; + for (Cell cell: cells) total += Bytes.toLong(cell.getValue()); + assertEquals(INCREMENT_COUNT * THREAD_COUNT, total); + } finally { + closeRegion(region); + LOG.info(this.name.getMethodName() + " " + (System.currentTimeMillis() - startTime) + "ms"); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/930f68c0/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java index 94e2028..4c7a204 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java @@ -549,7 +549,7 @@ public class TestTags { public static class TestCoprocessorForTags extends BaseRegionObserver { - public static boolean checkTagPresence = false; + public static volatile boolean checkTagPresence = false; public static List<Tag> tags = null; @Override
