This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new 81e1e2f  HBASE-21910 The nonce implementation is wrong for AsyncTable
81e1e2f is described below

commit 81e1e2f94363e1bf4abb6102a8e379de875fee44
Author: Duo Zhang <zhang...@apache.org>
AuthorDate: Fri Feb 15 17:07:21 2019 +0800

    HBASE-21910 The nonce implementation is wrong for AsyncTable
    
    Signed-off-by: Guanghao Zhang <zg...@apache.org>
---
 .../hadoop/hbase/client/RawAsyncTableImpl.java     | 20 +++--
 .../hbase/client/TestAsyncTableNoncedRetry.java    | 86 +++++++++++++---------
 2 files changed, 63 insertions(+), 43 deletions(-)

diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
index be94ca4..7562e6f 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
@@ -197,12 +197,10 @@ class RawAsyncTableImpl implements 
AsyncTable<AdvancedScanResultConsumer> {
     D convert(I info, S src, long nonceGroup, long nonce) throws IOException;
   }
 
-  private <REQ, RESP> CompletableFuture<RESP> noncedMutate(HBaseRpcController 
controller,
-      HRegionLocation loc, ClientService.Interface stub, REQ req,
+  private <REQ, RESP> CompletableFuture<RESP> noncedMutate(long nonceGroup, 
long nonce,
+      HBaseRpcController controller, HRegionLocation loc, 
ClientService.Interface stub, REQ req,
       NoncedConverter<MutateRequest, byte[], REQ> reqConvert,
       Converter<RESP, HBaseRpcController, MutateResponse> respConverter) {
-    long nonceGroup = conn.getNonceGenerator().getNonceGroup();
-    long nonce = conn.getNonceGenerator().newNonce();
     return mutate(controller, loc, stub, req,
       (info, src) -> reqConvert.convert(info, src, nonceGroup, nonce), 
respConverter);
   }
@@ -254,18 +252,24 @@ class RawAsyncTableImpl implements 
AsyncTable<AdvancedScanResultConsumer> {
   @Override
   public CompletableFuture<Result> append(Append append) {
     checkHasFamilies(append);
+    long nonceGroup = conn.getNonceGenerator().getNonceGroup();
+    long nonce = conn.getNonceGenerator().newNonce();
     return this.<Result> newCaller(append, rpcTimeoutNs)
-      .action((controller, loc, stub) -> this.<Append, Result> 
noncedMutate(controller, loc, stub,
-        append, RequestConverter::buildMutateRequest, 
RawAsyncTableImpl::toResult))
+      .action(
+        (controller, loc, stub) -> this.<Append, Result> 
noncedMutate(nonceGroup, nonce, controller,
+          loc, stub, append, RequestConverter::buildMutateRequest, 
RawAsyncTableImpl::toResult))
       .call();
   }
 
   @Override
   public CompletableFuture<Result> increment(Increment increment) {
     checkHasFamilies(increment);
+    long nonceGroup = conn.getNonceGenerator().getNonceGroup();
+    long nonce = conn.getNonceGenerator().newNonce();
     return this.<Result> newCaller(increment, rpcTimeoutNs)
-      .action((controller, loc, stub) -> this.<Increment, Result> 
noncedMutate(controller, loc,
-        stub, increment, RequestConverter::buildMutateRequest, 
RawAsyncTableImpl::toResult))
+      .action((controller, loc, stub) -> this.<Increment, Result> 
noncedMutate(nonceGroup, nonce,
+        controller, loc, stub, increment, RequestConverter::buildMutateRequest,
+        RawAsyncTableImpl::toResult))
       .call();
   }
 
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java
index 3008561..82a57f2 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java
@@ -21,15 +21,22 @@ import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 
 import java.io.IOException;
+import java.util.Optional;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Threads;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -44,7 +51,7 @@ public class TestAsyncTableNoncedRetry {
 
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
-      HBaseClassTestRule.forClass(TestAsyncTableNoncedRetry.class);
+    HBaseClassTestRule.forClass(TestAsyncTableNoncedRetry.class);
 
   private static final HBaseTestingUtility TEST_UTIL = new 
HBaseTestingUtility();
 
@@ -58,40 +65,50 @@ public class TestAsyncTableNoncedRetry {
 
   private static AsyncConnection ASYNC_CONN;
 
-  private static long NONCE = 1L;
+  @Rule
+  public TestName testName = new TestName();
+
+  private byte[] row;
+
+  private static AtomicInteger CALLED = new AtomicInteger();
 
-  private static NonceGenerator NONCE_GENERATOR = new NonceGenerator() {
+  private static long SLEEP_TIME = 2000;
+
+  public static final class SleepOnceCP implements RegionObserver, 
RegionCoprocessor {
 
     @Override
-    public long newNonce() {
-      return NONCE;
+    public Optional<RegionObserver> getRegionObserver() {
+      return Optional.of(this);
     }
 
     @Override
-    public long getNonceGroup() {
-      return 1L;
+    public Result postAppend(ObserverContext<RegionCoprocessorEnvironment> c, 
Append append,
+        Result result) throws IOException {
+      if (CALLED.getAndIncrement() == 0) {
+        Threads.sleepWithoutInterrupt(SLEEP_TIME);
+      }
+      return RegionObserver.super.postAppend(c, append, result);
     }
-  };
-
-  @Rule
-  public TestName testName = new TestName();
 
-  private byte[] row;
+    @Override
+    public Result postIncrement(ObserverContext<RegionCoprocessorEnvironment> 
c,
+        Increment increment, Result result) throws IOException {
+      if (CALLED.getAndIncrement() == 0) {
+        Threads.sleepWithoutInterrupt(SLEEP_TIME);
+      }
+      return RegionObserver.super.postIncrement(c, increment, result);
+    }
+  }
 
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     TEST_UTIL.startMiniCluster(1);
-    TEST_UTIL.createTable(TABLE_NAME, FAMILY);
+    TEST_UTIL.getAdmin()
+      .createTable(TableDescriptorBuilder.newBuilder(TABLE_NAME)
+        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY))
+        .setCoprocessor(SleepOnceCP.class.getName()).build());
     TEST_UTIL.waitTableAvailable(TABLE_NAME);
-    AsyncRegistry registry = 
AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
-    ASYNC_CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), 
registry,
-        registry.getClusterId().get(), User.getCurrent()) {
-
-      @Override
-      public NonceGenerator getNonceGenerator() {
-        return NONCE_GENERATOR;
-      }
-    };
+    ASYNC_CONN = 
ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
   }
 
   @AfterClass
@@ -103,28 +120,27 @@ public class TestAsyncTableNoncedRetry {
   @Before
   public void setUp() throws IOException, InterruptedException {
     row = Bytes.toBytes(testName.getMethodName().replaceAll("[^0-9A-Za-z]", 
"_"));
-    NONCE++;
+    CALLED.set(0);
   }
 
   @Test
   public void testAppend() throws InterruptedException, ExecutionException {
-    AsyncTable<?> table = ASYNC_CONN.getTable(TABLE_NAME);
+    assertEquals(0, CALLED.get());
+    AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
+      .setRpcTimeout(SLEEP_TIME / 2, TimeUnit.MILLISECONDS).build();
     Result result = table.append(new Append(row).addColumn(FAMILY, QUALIFIER, 
VALUE)).get();
-    assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER));
-    result = table.append(new Append(row).addColumn(FAMILY, QUALIFIER, 
VALUE)).get();
-    // the second call should have no effect as we always generate the same 
nonce.
-    assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER));
-    result = table.get(new Get(row)).get();
+    // make sure we called twice and the result is still correct
+    assertEquals(2, CALLED.get());
     assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER));
   }
 
   @Test
   public void testIncrement() throws InterruptedException, ExecutionException {
-    AsyncTable<?> table = ASYNC_CONN.getTable(TABLE_NAME);
-    assertEquals(1L, table.incrementColumnValue(row, FAMILY, QUALIFIER, 
1L).get().longValue());
-    // the second call should have no effect as we always generate the same 
nonce.
+    assertEquals(0, CALLED.get());
+    AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
+      .setRpcTimeout(SLEEP_TIME / 2, TimeUnit.MILLISECONDS).build();
     assertEquals(1L, table.incrementColumnValue(row, FAMILY, QUALIFIER, 
1L).get().longValue());
-    Result result = table.get(new Get(row)).get();
-    assertEquals(1L, Bytes.toLong(result.getValue(FAMILY, QUALIFIER)));
+    // make sure we called twice and the result is still correct
+    assertEquals(2, CALLED.get());
   }
 }

Reply via email to