Modified: 
hbase/trunk/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
URL: 
http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java?rev=1564832&r1=1564831&r2=1564832&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java (original)
+++ hbase/trunk/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java Wed Feb  5 17:08:33 2014
@@ -27,6 +27,9 @@ import org.apache.hadoop.hbase.HRegionIn
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.MediumTests;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
+import org.apache.hadoop.hbase.client.coprocessor.Batch;
+import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Threads;
 import org.junit.Assert;
@@ -73,9 +76,33 @@ public class TestAsyncProcess {
   private static final String success = "success";
   private static Exception failure = new Exception("failure");
 
-  static class MyAsyncProcess<Res> extends AsyncProcess<Res> {
+  static class MyAsyncProcess extends AsyncProcess {
     final AtomicInteger nbMultiResponse = new AtomicInteger();
     final AtomicInteger nbActions = new AtomicInteger();
+    public List<AsyncRequestFuture> allReqs = new ArrayList<AsyncRequestFuture>();
+
+    @Override
+    protected <Res> AsyncRequestFutureImpl<Res> createAsyncRequestFuture(TableName tableName,
+        List<Action<Row>> actions, long nonceGroup, ExecutorService pool,
+        Batch.Callback<Res> callback, Object[] results, boolean needResults) {
+      // Test HTable has tableName of null, so pass DUMMY_TABLE
+      AsyncRequestFutureImpl<Res> r = super.createAsyncRequestFuture(
+          DUMMY_TABLE, actions, nonceGroup, pool, callback, results, needResults);
+      r.hardRetryLimit = new AtomicInteger(1);
+      allReqs.add(r);
+      return r;
+    }
+
+    @SuppressWarnings("unchecked")
+    public long getRetriesRequested() {
+      long result = 0;
+      for (AsyncRequestFuture ars : allReqs) {
+        if (ars instanceof AsyncProcess.AsyncRequestFutureImpl) {
+          result += (1 - ((AsyncRequestFutureImpl<?>)ars).hardRetryLimit.get());
+        }
+      }
+      return result;
+    }
 
     static class CountingThreadFactory implements ThreadFactory {
       final AtomicInteger nbThreads;
@@ -91,15 +118,29 @@ public class TestAsyncProcess {
       }
     }
 
-    public MyAsyncProcess(HConnection hc, AsyncProcessCallback<Res> callback, Configuration conf) {
-      this(hc, callback, conf, new AtomicInteger());
+    public MyAsyncProcess(HConnection hc, Configuration conf) {
+      this(hc, conf, new AtomicInteger());
     }
 
-      public MyAsyncProcess(HConnection hc, AsyncProcessCallback<Res> callback, Configuration conf,
-                          AtomicInteger nbThreads) {
-      super(hc, DUMMY_TABLE, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
-        new SynchronousQueue<Runnable>(), new CountingThreadFactory(nbThreads)),
-          callback, conf, new RpcRetryingCallerFactory(conf));
+    public MyAsyncProcess(HConnection hc, Configuration conf, AtomicInteger nbThreads) {
+      super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
+          new SynchronousQueue<Runnable>(), new CountingThreadFactory(nbThreads)),
+            new RpcRetryingCallerFactory(conf), false);
+    }
+
+    public MyAsyncProcess(
+          HConnection hc, Configuration conf, boolean useGlobalErrors) {
+      super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
+        new SynchronousQueue<Runnable>(), new CountingThreadFactory(new AtomicInteger())),
+          new RpcRetryingCallerFactory(conf), useGlobalErrors);
+    }
+
+    @Override
+    public <Res> AsyncRequestFuture submit(TableName tableName, List<? extends Row> rows,
+        boolean atLeastOne, Callback<Res> callback, boolean needResults)
+            throws InterruptedIOException {
+      // We use results in tests to check things, so override to always save them.
+      return super.submit(DUMMY_TABLE, rows, atLeastOne, callback, true);
     }
 
     @Override
@@ -112,7 +153,7 @@ public class TestAsyncProcess {
         throws IOException, RuntimeException {
           try {
             // sleep one second in order for threadpool to start another thread instead of reusing
-            // existing one. 
+            // existing one.
             Thread.sleep(1000);
           } catch (InterruptedException e) {
             // ignore error
@@ -144,7 +185,6 @@ public class TestAsyncProcess {
    * Returns our async process.
    */
   static class MyConnectionImpl extends HConnectionManager.HConnectionImplementation {
-    MyAsyncProcess<?> ap;
     final AtomicInteger nbThreads = new AtomicInteger(0);
     final static Configuration c = new Configuration();
 
@@ -161,15 +201,6 @@ public class TestAsyncProcess {
     }
 
     @Override
-    protected <R> AsyncProcess createAsyncProcess(TableName tableName,
-                                                  ExecutorService pool,
-                                                  AsyncProcess.AsyncProcessCallback<R> callback,
-                                                  Configuration confn ) {
-      ap = new MyAsyncProcess<R>(this, callback, c, nbThreads);
-      return ap;
-    }
-
-    @Override
     public HRegionLocation locateRegion(final TableName tableName,
                                         final byte[] row) {
       return loc1;
@@ -207,55 +238,57 @@ public class TestAsyncProcess {
   @Test
   public void testSubmit() throws Exception {
     HConnection hc = createHConnection();
-    AsyncProcess ap = new MyAsyncProcess<Object>(hc, null, conf);
+    AsyncProcess ap = new MyAsyncProcess(hc, conf);
 
     List<Put> puts = new ArrayList<Put>();
     puts.add(createPut(1, true));
 
-    ap.submit(puts, false);
+    ap.submit(DUMMY_TABLE, puts, false, null, false);
     Assert.assertTrue(puts.isEmpty());
   }
 
   @Test
   public void testSubmitWithCB() throws Exception {
     HConnection hc = createHConnection();
-    MyCB mcb = new MyCB();
-    AsyncProcess ap = new MyAsyncProcess<Object>(hc, mcb, conf);
+    final AtomicInteger updateCalled = new AtomicInteger(0);
+    Batch.Callback<Object> cb = new Batch.Callback<Object>() {
+      public void update(byte[] region, byte[] row, Object result) {
+        updateCalled.incrementAndGet();
+      }
+    };
+    AsyncProcess ap = new MyAsyncProcess(hc, conf);
 
     List<Put> puts = new ArrayList<Put>();
     puts.add(createPut(1, true));
 
-    ap.submit(puts, false);
+    final AsyncRequestFuture ars = ap.submit(DUMMY_TABLE, puts, false, cb, false);
     Assert.assertTrue(puts.isEmpty());
-
-    while (!(mcb.successCalled.get() == 1) && !ap.hasError()) {
-      Thread.sleep(1);
-    }
-    Assert.assertEquals(mcb.successCalled.get(), 1);
+    ars.waitUntilDone();
+    Assert.assertEquals(updateCalled.get(), 1);
   }
 
   @Test
   public void testSubmitBusyRegion() throws Exception {
     HConnection hc = createHConnection();
-    AsyncProcess ap = new MyAsyncProcess<Object>(hc, null, conf);
+    AsyncProcess ap = new MyAsyncProcess(hc, conf);
 
     List<Put> puts = new ArrayList<Put>();
     puts.add(createPut(1, true));
 
     ap.incTaskCounters(Arrays.asList(hri1.getRegionName()), sn);
-    ap.submit(puts, false);
+    ap.submit(DUMMY_TABLE, puts, false, null, false);
     Assert.assertEquals(puts.size(), 1);
 
     ap.decTaskCounters(Arrays.asList(hri1.getRegionName()), sn);
-    ap.submit(puts, false);
-    Assert.assertTrue(puts.isEmpty());
+    ap.submit(DUMMY_TABLE, puts, false, null, false);
+    Assert.assertEquals(0, puts.size());
   }
 
 
   @Test
   public void testSubmitBusyRegionServer() throws Exception {
     HConnection hc = createHConnection();
-    AsyncProcess<Object> ap = new MyAsyncProcess<Object>(hc, null, conf);
+    AsyncProcess ap = new MyAsyncProcess(hc, conf);
 
     ap.taskCounterPerServer.put(sn2, new AtomicInteger(ap.maxConcurrentTasksPerServer));
 
@@ -265,80 +298,44 @@ public class TestAsyncProcess {
     puts.add(createPut(1, true)); // <== this one will make it, the region is already in
     puts.add(createPut(2, true)); // <== new region, but the rs is ok
 
-    ap.submit(puts, false);
+    ap.submit(DUMMY_TABLE, puts, false, null, false);
     Assert.assertEquals(" puts=" + puts, 1, puts.size());
 
     ap.taskCounterPerServer.put(sn2, new AtomicInteger(ap.maxConcurrentTasksPerServer - 1));
-    ap.submit(puts, false);
+    ap.submit(DUMMY_TABLE, puts, false, null, false);
     Assert.assertTrue(puts.isEmpty());
   }
 
   @Test
   public void testFail() throws Exception {
-    HConnection hc = createHConnection();
-    MyCB mcb = new MyCB();
-    AsyncProcess ap = new MyAsyncProcess<Object>(hc, mcb, conf);
+    MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, false);
 
     List<Put> puts = new ArrayList<Put>();
     Put p = createPut(1, false);
     puts.add(p);
 
-    ap.submit(puts, false);
-    Assert.assertTrue(puts.isEmpty());
-
-    while (!ap.hasError()) {
-      Thread.sleep(1);
-    }
-
-    Assert.assertEquals(0, mcb.successCalled.get());
-    Assert.assertEquals(2, mcb.retriableFailure.get());
-    Assert.assertEquals(1, mcb.failureCalled.get());
-
-    Assert.assertEquals(1, ap.getErrors().exceptions.size());
-    Assert.assertTrue("was: " + ap.getErrors().exceptions.get(0),
-        failure.equals(ap.getErrors().exceptions.get(0)));
-    Assert.assertTrue("was: " + ap.getErrors().exceptions.get(0),
-        failure.equals(ap.getErrors().exceptions.get(0)));
-
-    Assert.assertEquals(1, ap.getFailedOperations().size());
-    Assert.assertTrue("was: " + ap.getFailedOperations().get(0),
-        p.equals(ap.getFailedOperations().get(0)));
+    AsyncRequestFuture ars = ap.submit(DUMMY_TABLE, puts, false, null, true);
+    Assert.assertEquals(0, puts.size());
+    ars.waitUntilDone();
+    verifyResult(ars, false);
+    Assert.assertEquals(2L, ap.getRetriesRequested());
+
+    Assert.assertEquals(1, ars.getErrors().exceptions.size());
+    Assert.assertTrue("was: " + ars.getErrors().exceptions.get(0),
+        failure.equals(ars.getErrors().exceptions.get(0)));
+    Assert.assertTrue("was: " + ars.getErrors().exceptions.get(0),
+        failure.equals(ars.getErrors().exceptions.get(0)));
+
+    Assert.assertEquals(1, ars.getFailedOperations().size());
+    Assert.assertTrue("was: " + ars.getFailedOperations().get(0),
+        p.equals(ars.getFailedOperations().get(0)));
   }
 
-  @Test
-  public void testWaitForNextTaskDone() throws IOException {
-    HConnection hc = createHConnection();
-    MyCB mcb = new MyCB();
-    final AsyncProcess ap = new MyAsyncProcess<Object>(hc, mcb, conf);
-    ap.tasksSent.incrementAndGet();
-
-    final AtomicBoolean checkPoint = new AtomicBoolean(false);
-    final AtomicBoolean checkPoint2 = new AtomicBoolean(false);
-
-    Thread t = new Thread(){
-      @Override
-      public void run(){
-        Threads.sleep(1000);
-        Assert.assertFalse(checkPoint.get());
-        ap.tasksDone.incrementAndGet();
-        checkPoint2.set(true);
-      }
-    };
-
-    t.start();
-    ap.waitForNextTaskDone(0);
-    checkPoint.set(true);
-    while (!checkPoint2.get()){
-      Threads.sleep(1);
-    }
-  }
 
   @Test
   public void testSubmitTrue() throws IOException {
-    HConnection hc = createHConnection();
-    MyCB mcb = new MyCB();
-    final AsyncProcess<Object> ap = new MyAsyncProcess<Object>(hc, mcb, conf);
-    ap.tasksSent.incrementAndGet();
+    final AsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, false);
+    ap.tasksInProgress.incrementAndGet();
     final AtomicInteger ai = new AtomicInteger(1);
     ap.taskCounterPerRegion.put(hri1.getRegionName(), ai);
 
@@ -349,9 +346,9 @@ public class TestAsyncProcess {
       @Override
       public void run(){
         Threads.sleep(1000);
-        Assert.assertFalse(checkPoint.get());
+        Assert.assertFalse(checkPoint.get()); // TODO: this is timing-dependent
         ai.decrementAndGet();
-        ap.tasksDone.incrementAndGet();
+        ap.tasksInProgress.decrementAndGet();
         checkPoint2.set(true);
       }
     };
@@ -360,12 +357,12 @@ public class TestAsyncProcess {
     Put p = createPut(1, true);
     puts.add(p);
 
-    ap.submit(puts, false);
+    ap.submit(DUMMY_TABLE, puts, false, null, false);
     Assert.assertFalse(puts.isEmpty());
 
     t.start();
 
-    ap.submit(puts, true);
+    ap.submit(DUMMY_TABLE, puts, true, null, false);
     Assert.assertTrue(puts.isEmpty());
 
     checkPoint.set(true);
@@ -376,71 +373,50 @@ public class TestAsyncProcess {
 
   @Test
   public void testFailAndSuccess() throws Exception {
-    HConnection hc = createHConnection();
-    MyCB mcb = new MyCB();
-    AsyncProcess ap = new MyAsyncProcess<Object>(hc, mcb, conf);
+    MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, false);
 
     List<Put> puts = new ArrayList<Put>();
     puts.add(createPut(1, false));
     puts.add(createPut(1, true));
     puts.add(createPut(1, true));
 
-    ap.submit(puts, false);
+    AsyncRequestFuture ars = ap.submit(DUMMY_TABLE, puts, false, null, true);
     Assert.assertTrue(puts.isEmpty());
-
-    long cutoff = System.currentTimeMillis() + 60000;
-    while (!ap.hasError() && System.currentTimeMillis() < cutoff) {
-      Thread.sleep(1);
-    }
-    Assert.assertTrue(ap.hasError());
-    ap.waitUntilDone();
- 
-    Assert.assertEquals(mcb.successCalled.get(), 2);
-    Assert.assertEquals(mcb.retriableFailure.get(), 2);
-    Assert.assertEquals(mcb.failureCalled.get(), 1);
-
-    Assert.assertEquals(1, ap.getErrors().actions.size());
-
+    ars.waitUntilDone();
+    verifyResult(ars, false, true, true);
+    Assert.assertEquals(2, ap.getRetriesRequested());
+    Assert.assertEquals(1, ars.getErrors().actions.size());
 
     puts.add(createPut(1, true));
-    ap.submit(puts, false);
-    Assert.assertTrue(puts.isEmpty());
-
-    while (mcb.successCalled.get() != 3) {
-      Thread.sleep(1);
-    }
-    Assert.assertEquals(mcb.retriableFailure.get(), 2);
-    Assert.assertEquals(mcb.failureCalled.get(), 1);
-
-    ap.clearErrors();
-    Assert.assertTrue(ap.getErrors().actions.isEmpty());
+    // Wait for AP to be free. While ars might have the result, ap counters are decreased later.
+    ap.waitUntilDone();
+    ars = ap.submit(DUMMY_TABLE, puts, false, null, true);
+    Assert.assertEquals(0, puts.size());
+    ars.waitUntilDone();
+    Assert.assertEquals(2, ap.getRetriesRequested());
+    verifyResult(ars, true);
   }
 
   @Test
   public void testFlush() throws Exception {
-    HConnection hc = createHConnection();
-    MyCB mcb = new MyCB();
-    AsyncProcess ap = new MyAsyncProcess<Object>(hc, mcb, conf);
+    MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, false);
 
     List<Put> puts = new ArrayList<Put>();
     puts.add(createPut(1, false));
     puts.add(createPut(1, true));
     puts.add(createPut(1, true));
 
-    ap.submit(puts, false);
-    ap.waitUntilDone();
-
-    Assert.assertEquals(mcb.successCalled.get(), 2);
-    Assert.assertEquals(mcb.retriableFailure.get(), 2);
-    Assert.assertEquals(mcb.failureCalled.get(), 1);
+    AsyncRequestFuture ars = ap.submit(DUMMY_TABLE, puts, false, null, true);
+    ars.waitUntilDone();
+    verifyResult(ars, false, true, true);
+    Assert.assertEquals(2, ap.getRetriesRequested());
 
-    Assert.assertEquals(1, ap.getFailedOperations().size());
+    Assert.assertEquals(1, ars.getFailedOperations().size());
   }
 
   @Test
   public void testMaxTask() throws Exception {
-    HConnection hc = createHConnection();
-    final AsyncProcess ap = new MyAsyncProcess<Object>(hc, null, conf);
+    final AsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, false);
 
     for (int i = 0; i < 1000; i++) {
       ap.incTaskCounters(Arrays.asList("dummy".getBytes()), sn);
@@ -461,7 +437,7 @@ public class TestAsyncProcess {
     t.start();
 
     try {
-      ap.submit(puts, false);
+      ap.submit(DUMMY_TABLE, puts, false, null, false);
       Assert.fail("We should have been interrupted.");
     } catch (InterruptedIOException expected) {
     }
@@ -471,7 +447,7 @@ public class TestAsyncProcess {
     Thread t2 = new Thread() {
       public void run() {
         Threads.sleep(sleepTime);
-        while (ap.tasksDone.get() > 0) {
+        while (ap.tasksInProgress.get() > 0) {
           ap.decTaskCounters(Arrays.asList("dummy".getBytes()), sn);
         }
       }
@@ -479,39 +455,13 @@ public class TestAsyncProcess {
     t2.start();
 
     long start = System.currentTimeMillis();
-    ap.submit(new ArrayList<Row>(), false);
+    ap.submit(DUMMY_TABLE, new ArrayList<Row>(), false, null, false);
     long end = System.currentTimeMillis();
 
     //Adds 100 to secure us against approximate timing.
     Assert.assertTrue(start + 100L + sleepTime > end);
   }
 
-
-  private class MyCB implements AsyncProcess.AsyncProcessCallback<Object> {
-    private final AtomicInteger successCalled = new AtomicInteger(0);
-    private final AtomicInteger failureCalled = new AtomicInteger(0);
-    private final AtomicInteger retriableFailure = new AtomicInteger(0);
-
-
-    @Override
-    public void success(int originalIndex, byte[] region, Row row, Object o) {
-      successCalled.incrementAndGet();
-    }
-
-    @Override
-    public boolean failure(int originalIndex, Row row, Throwable t) {
-      failureCalled.incrementAndGet();
-      return true;
-    }
-
-    @Override
-    public boolean retriableFailure(int originalIndex, Row row, Throwable exception) {
-      // We retry once only.
-      return (retriableFailure.incrementAndGet() < 2);
-    }
-  }
-
-
   private static HConnection createHConnection() throws IOException {
     HConnection hc = Mockito.mock(HConnection.class);
 
@@ -535,14 +485,17 @@ public class TestAsyncProcess {
     Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE),
         Mockito.eq(FAILS))).thenReturn(loc2);
 
+    NonceGenerator ng = Mockito.mock(NonceGenerator.class);
+    Mockito.when(ng.getNonceGroup()).thenReturn(HConstants.NO_NONCE);
+    Mockito.when(hc.getNonceGenerator()).thenReturn(ng);
+
     return hc;
   }
 
   @Test
   public void testHTablePutSuccess() throws Exception {
     HTable ht = Mockito.mock(HTable.class);
-    HConnection hc = createHConnection();
-    ht.ap = new MyAsyncProcess<Object>(hc, null, conf);
+    ht.ap = new MyAsyncProcess(createHConnection(), conf, true);
 
     Put put = createPut(1, true);
 
@@ -553,9 +506,8 @@ public class TestAsyncProcess {
 
   private void doHTableFailedPut(boolean bufferOn) throws Exception {
     HTable ht = new HTable();
-    HConnection hc = createHConnection();
-    MyCB mcb = new MyCB(); // This allows to have some hints on what's going on.
-    ht.ap = new MyAsyncProcess<Object>(hc, mcb, conf);
+    MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, true);
+    ht.ap = ap;
     ht.setAutoFlush(true, true);
     if (bufferOn) {
       ht.setWriteBufferSize(1024L * 1024L);
@@ -575,9 +527,15 @@ public class TestAsyncProcess {
     } catch (RetriesExhaustedException expected) {
     }
     Assert.assertEquals(0L, ht.currentWriteBufferSize);
-    Assert.assertEquals(0, mcb.successCalled.get());
-    Assert.assertEquals(2, mcb.retriableFailure.get());
-    Assert.assertEquals(1, mcb.failureCalled.get());
+    // The table should have sent one request, maybe after multiple attempts
+    AsyncRequestFuture ars = null;
+    for (AsyncRequestFuture someReqs : ap.allReqs) {
+      if (someReqs.getResults().length == 0) continue;
+      Assert.assertTrue(ars == null);
+      ars = someReqs;
+    }
+    Assert.assertTrue(ars != null);
+    verifyResult(ars, false);
 
     // This should not raise any exception, puts have been 'received' before by the catch.
     ht.close();
@@ -589,23 +547,22 @@ public class TestAsyncProcess {
   }
 
   @Test
-  public void doHTableFailedPutWithoutBuffer() throws Exception {
+  public void testHTableFailedPutWithoutBuffer() throws Exception {
     doHTableFailedPut(false);
   }
 
   @Test
   public void testHTableFailedPutAndNewPut() throws Exception {
     HTable ht = new HTable();
-    HConnection hc = createHConnection();
-    MyCB mcb = new MyCB(); // This allows to have some hints on what's going on.
-    ht.ap = new MyAsyncProcess<Object>(hc, mcb, conf);
+    MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, true);
+    ht.ap = ap;
     ht.setAutoFlush(false, true);
     ht.setWriteBufferSize(0);
 
     Put p = createPut(1, false);
     ht.put(p);
 
-    ht.ap.waitUntilDone(); // Let's do all the retries.
+    ap.waitUntilDone(); // Let's do all the retries.
 
     // We're testing that we're behaving as we were behaving in 0.94: sending exceptions in the
     //  doPut if it fails.
@@ -626,14 +583,13 @@ public class TestAsyncProcess {
   @Test
   public void testWithNoClearOnFail() throws IOException {
     HTable ht = new HTable();
-    HConnection hc = createHConnection();
-    MyCB mcb = new MyCB();
-    ht.ap = new MyAsyncProcess<Object>(hc, mcb, conf);
+    ht.ap = new MyAsyncProcess(createHConnection(), conf, true);
     ht.setAutoFlush(false, false);
 
     Put p = createPut(1, false);
     ht.put(p);
     Assert.assertEquals(0, ht.writeAsyncBuffer.size());
+
     try {
       ht.flushCommits();
     } catch (RetriesExhaustedWithDetailsException expected) {
@@ -651,6 +607,7 @@ public class TestAsyncProcess {
   public void testBatch() throws IOException, InterruptedException {
     HTable ht = new HTable();
     ht.connection = new MyConnectionImpl();
+    ht.multiAp = new MyAsyncProcess(ht.connection, conf, false);
 
     List<Put> puts = new ArrayList<Put>();
     puts.add(createPut(1, true));
@@ -686,10 +643,9 @@ public class TestAsyncProcess {
     // set default writeBufferSize
     ht.setWriteBufferSize(configuration.getLong("hbase.client.write.buffer", 2097152));
 
-    MyConnectionImpl mci = new MyConnectionImpl(configuration);
-    ht.connection = mci;
-    ht.ap = new MyAsyncProcess<Object>(mci, null, configuration);
-
+    ht.connection = new MyConnectionImpl(configuration);
+    MyAsyncProcess ap = new MyAsyncProcess(ht.connection, conf, true);
+    ht.ap = ap;
 
     Assert.assertNotNull(ht.ap.createServerErrorTracker());
     Assert.assertTrue(ht.ap.serverTrackerTimeout > 200);
@@ -705,7 +661,7 @@ public class TestAsyncProcess {
     } catch (RetriesExhaustedWithDetailsException expected) {
     }
     // Checking that the ErrorsServers came into play and didn't make us stop immediately
-    Assert.assertEquals(ht.ap.tasksSent.get(), 3);
+    Assert.assertEquals(2, ap.getRetriesRequested());
   }
 
   /**
@@ -730,11 +686,13 @@ public class TestAsyncProcess {
     HTable ht = new HTable();
     MyConnectionImpl2 con = new MyConnectionImpl2(hrls);
     ht.connection = con;
+    MyAsyncProcess ap = new MyAsyncProcess(con, conf, con.nbThreads);
+    ht.multiAp = ap;
 
     ht.batch(gets);
 
-    Assert.assertEquals(con.ap.nbActions.get(), NB_REGS);
-    Assert.assertEquals("1 multi response per server", 2, con.ap.nbMultiResponse.get());
+    Assert.assertEquals(ap.nbActions.get(), NB_REGS);
+    Assert.assertEquals("1 multi response per server", 2, ap.nbMultiResponse.get());
     Assert.assertEquals("1 thread per server", 2, con.nbThreads.get());
 
     int nbReg = 0;
@@ -744,6 +702,13 @@ public class TestAsyncProcess {
     Assert.assertEquals("nbReg=" + nbReg, nbReg, NB_REGS);
   }
 
+  private void verifyResult(AsyncRequestFuture ars, boolean... expected) {
+    Object[] actual = ars.getResults();
+    Assert.assertEquals(expected.length, actual.length);
+    for (int i = 0; i < expected.length; ++i) {
+      Assert.assertEquals(expected[i], !(actual[i] instanceof Throwable));
+    }
+  }
 
   /**
    * @param regCnt  the region: 1 to 3.

Modified: 
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java
URL: 
http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java?rev=1564832&r1=1564831&r2=1564832&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java Wed Feb  5 17:08:33 2014
@@ -63,6 +63,7 @@ import com.google.protobuf.ServiceExcept
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 public class CoprocessorHConnection implements HConnection {
+  private static final NonceGenerator ng = new HConnectionManager.NoNonceGenerator();
 
   /**
    * Create an unmanaged {@link HConnection} based on the environment in which we are running the
@@ -388,6 +389,11 @@ public class CoprocessorHConnection impl
 
   @Override
   public NonceGenerator getNonceGenerator() {
-    return null; // don't use nonces for coprocessor connection
+    return ng; // don't use nonces for coprocessor connection
+  }
+
+  @Override
+  public AsyncProcess getAsyncProcess() {
+    return delegate.getAsyncProcess();
   }
 }
\ No newline at end of file

Modified: 
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
URL: 
http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java?rev=1564832&r1=1564831&r2=1564832&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java Wed Feb  5 17:08:33 2014
@@ -117,6 +117,10 @@ public class HConnectionTestingUtility {
       Mockito.when(c.getClient(Mockito.any(ServerName.class))).
         thenReturn(client);
     }
+    NonceGenerator ng = Mockito.mock(NonceGenerator.class);
+    Mockito.when(c.getNonceGenerator()).thenReturn(ng);
+    Mockito.when(c.getAsyncProcess()).thenReturn(new AsyncProcess(
+        c, conf, null, RpcRetryingCallerFactory.instantiate(conf), false));
     return c;
   }
 

Modified: 
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
URL: 
http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java?rev=1564832&r1=1564831&r2=1564832&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java Wed Feb  5 17:08:33 2014
@@ -62,8 +62,13 @@ import org.apache.hadoop.hbase.master.Ca
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionActionResult;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ResultOrException;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
@@ -73,6 +78,8 @@ import org.apache.hadoop.hbase.zookeeper
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 import com.google.protobuf.RpcController;
 import com.google.protobuf.Service;
@@ -105,6 +112,18 @@ public class TestCatalogJanitor {
       } catch (ServiceException se) {
         throw ProtobufUtil.getRemoteException(se);
       }
+      try {
+        Mockito.when(ri.multi(
+          (RpcController)Mockito.any(), (MultiRequest)Mockito.any())).
+            thenAnswer(new Answer<MultiResponse>() {
+              @Override
+              public MultiResponse answer(InvocationOnMock invocation) throws Throwable {
+                return buildMultiResponse( (MultiRequest)invocation.getArguments()[1]);
+              }
+            });
+      } catch (ServiceException se) {
+        throw ProtobufUtil.getRemoteException(se);
+      }
       // Mock an HConnection and a AdminProtocol implementation.  Have the
       // HConnection return the HRI.  Have the HRI return a few mocked up responses
       // to make our test work.
@@ -940,5 +959,23 @@ public class TestCatalogJanitor {
     return htd;
   }
 
+  private MultiResponse buildMultiResponse(MultiRequest req) {
+    MultiResponse.Builder builder = MultiResponse.newBuilder();
+    RegionActionResult.Builder regionActionResultBuilder =
+        RegionActionResult.newBuilder();
+    ResultOrException.Builder roeBuilder = ResultOrException.newBuilder();
+    for (RegionAction regionAction: req.getRegionActionList()) {
+      regionActionResultBuilder.clear();
+      for (ClientProtos.Action action: regionAction.getActionList()) {
+        roeBuilder.clear();
+        roeBuilder.setResult(ClientProtos.Result.getDefaultInstance());
+        roeBuilder.setIndex(action.getIndex());
+        regionActionResultBuilder.addResultOrException(roeBuilder.build());
+      }
+      builder.addRegionActionResult(regionActionResultBuilder.build());
+    }
+    return builder.build();
+  }
+
 }
 


Reply via email to