This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new dfa8c5e HBASE-21910 The nonce implementation is wrong for AsyncTable dfa8c5e is described below commit dfa8c5e12fcb20ecb171f0d310c19f6e3440811c Author: Duo Zhang <zhang...@apache.org> AuthorDate: Fri Feb 15 17:07:21 2019 +0800 HBASE-21910 The nonce implementation is wrong for AsyncTable Signed-off-by: Guanghao Zhang <zg...@apache.org> --- .../hadoop/hbase/client/RawAsyncTableImpl.java | 20 +++-- .../hbase/client/TestAsyncTableNoncedRetry.java | 86 +++++++++++++--------- 2 files changed, 63 insertions(+), 43 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java index be94ca4..7562e6f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java @@ -197,12 +197,10 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> { D convert(I info, S src, long nonceGroup, long nonce) throws IOException; } - private <REQ, RESP> CompletableFuture<RESP> noncedMutate(HBaseRpcController controller, - HRegionLocation loc, ClientService.Interface stub, REQ req, + private <REQ, RESP> CompletableFuture<RESP> noncedMutate(long nonceGroup, long nonce, + HBaseRpcController controller, HRegionLocation loc, ClientService.Interface stub, REQ req, NoncedConverter<MutateRequest, byte[], REQ> reqConvert, Converter<RESP, HBaseRpcController, MutateResponse> respConverter) { - long nonceGroup = conn.getNonceGenerator().getNonceGroup(); - long nonce = conn.getNonceGenerator().newNonce(); return mutate(controller, loc, stub, req, (info, src) -> reqConvert.convert(info, src, nonceGroup, nonce), respConverter); } @@ -254,18 +252,24 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> { @Override public CompletableFuture<Result> 
append(Append append) { checkHasFamilies(append); + long nonceGroup = conn.getNonceGenerator().getNonceGroup(); + long nonce = conn.getNonceGenerator().newNonce(); return this.<Result> newCaller(append, rpcTimeoutNs) - .action((controller, loc, stub) -> this.<Append, Result> noncedMutate(controller, loc, stub, - append, RequestConverter::buildMutateRequest, RawAsyncTableImpl::toResult)) + .action( + (controller, loc, stub) -> this.<Append, Result> noncedMutate(nonceGroup, nonce, controller, + loc, stub, append, RequestConverter::buildMutateRequest, RawAsyncTableImpl::toResult)) .call(); } @Override public CompletableFuture<Result> increment(Increment increment) { checkHasFamilies(increment); + long nonceGroup = conn.getNonceGenerator().getNonceGroup(); + long nonce = conn.getNonceGenerator().newNonce(); return this.<Result> newCaller(increment, rpcTimeoutNs) - .action((controller, loc, stub) -> this.<Increment, Result> noncedMutate(controller, loc, - stub, increment, RequestConverter::buildMutateRequest, RawAsyncTableImpl::toResult)) + .action((controller, loc, stub) -> this.<Increment, Result> noncedMutate(nonceGroup, nonce, + controller, loc, stub, increment, RequestConverter::buildMutateRequest, + RawAsyncTableImpl::toResult)) .call(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java index 3008561..82a57f2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java @@ -21,15 +21,22 @@ import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import java.io.IOException; +import java.util.Optional; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import 
org.apache.commons.io.IOUtils; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.RegionObserver; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Threads; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; @@ -44,7 +51,7 @@ public class TestAsyncTableNoncedRetry { @ClassRule public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestAsyncTableNoncedRetry.class); + HBaseClassTestRule.forClass(TestAsyncTableNoncedRetry.class); private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); @@ -58,40 +65,50 @@ public class TestAsyncTableNoncedRetry { private static AsyncConnection ASYNC_CONN; - private static long NONCE = 1L; + @Rule + public TestName testName = new TestName(); + + private byte[] row; + + private static AtomicInteger CALLED = new AtomicInteger(); - private static NonceGenerator NONCE_GENERATOR = new NonceGenerator() { + private static long SLEEP_TIME = 2000; + + public static final class SleepOnceCP implements RegionObserver, RegionCoprocessor { @Override - public long newNonce() { - return NONCE; + public Optional<RegionObserver> getRegionObserver() { + return Optional.of(this); } @Override - public long getNonceGroup() { - return 1L; + public Result postAppend(ObserverContext<RegionCoprocessorEnvironment> c, Append append, + Result result) throws IOException { + if (CALLED.getAndIncrement() == 0) { + Threads.sleepWithoutInterrupt(SLEEP_TIME); + 
} + return RegionObserver.super.postAppend(c, append, result); } - }; - - @Rule - public TestName testName = new TestName(); - private byte[] row; + @Override + public Result postIncrement(ObserverContext<RegionCoprocessorEnvironment> c, + Increment increment, Result result) throws IOException { + if (CALLED.getAndIncrement() == 0) { + Threads.sleepWithoutInterrupt(SLEEP_TIME); + } + return RegionObserver.super.postIncrement(c, increment, result); + } + } @BeforeClass public static void setUpBeforeClass() throws Exception { TEST_UTIL.startMiniCluster(1); - TEST_UTIL.createTable(TABLE_NAME, FAMILY); + TEST_UTIL.getAdmin() + .createTable(TableDescriptorBuilder.newBuilder(TABLE_NAME) + .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)) + .setCoprocessor(SleepOnceCP.class.getName()).build()); TEST_UTIL.waitTableAvailable(TABLE_NAME); - AsyncRegistry registry = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration()); - ASYNC_CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry, - registry.getClusterId().get(), User.getCurrent()) { - - @Override - public NonceGenerator getNonceGenerator() { - return NONCE_GENERATOR; - } - }; + ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); } @AfterClass @@ -103,28 +120,27 @@ public class TestAsyncTableNoncedRetry { @Before public void setUp() throws IOException, InterruptedException { row = Bytes.toBytes(testName.getMethodName().replaceAll("[^0-9A-Za-z]", "_")); - NONCE++; + CALLED.set(0); } @Test public void testAppend() throws InterruptedException, ExecutionException { - AsyncTable<?> table = ASYNC_CONN.getTable(TABLE_NAME); + assertEquals(0, CALLED.get()); + AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME) + .setRpcTimeout(SLEEP_TIME / 2, TimeUnit.MILLISECONDS).build(); Result result = table.append(new Append(row).addColumn(FAMILY, QUALIFIER, VALUE)).get(); - assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER)); - result = table.append(new 
Append(row).addColumn(FAMILY, QUALIFIER, VALUE)).get(); - // the second call should have no effect as we always generate the same nonce. - assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER)); - result = table.get(new Get(row)).get(); + // make sure we called twice and the result is still correct + assertEquals(2, CALLED.get()); assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER)); } @Test public void testIncrement() throws InterruptedException, ExecutionException { - AsyncTable<?> table = ASYNC_CONN.getTable(TABLE_NAME); - assertEquals(1L, table.incrementColumnValue(row, FAMILY, QUALIFIER, 1L).get().longValue()); - // the second call should have no effect as we always generate the same nonce. + assertEquals(0, CALLED.get()); + AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME) + .setRpcTimeout(SLEEP_TIME / 2, TimeUnit.MILLISECONDS).build(); assertEquals(1L, table.incrementColumnValue(row, FAMILY, QUALIFIER, 1L).get().longValue()); - Result result = table.get(new Get(row)).get(); - assertEquals(1L, Bytes.toLong(result.getValue(FAMILY, QUALIFIER))); + // make sure we called twice and the result is still correct + assertEquals(2, CALLED.get()); } }