Modified: hbase/trunk/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java?rev=1564832&r1=1564831&r2=1564832&view=diff ============================================================================== --- hbase/trunk/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java (original) +++ hbase/trunk/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java Wed Feb 5 17:08:33 2014 @@ -27,6 +27,9 @@ import org.apache.hadoop.hbase.HRegionIn import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.MediumTests; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture; +import org.apache.hadoop.hbase.client.coprocessor.Batch; +import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Threads; import org.junit.Assert; @@ -73,9 +76,33 @@ public class TestAsyncProcess { private static final String success = "success"; private static Exception failure = new Exception("failure"); - static class MyAsyncProcess<Res> extends AsyncProcess<Res> { + static class MyAsyncProcess extends AsyncProcess { final AtomicInteger nbMultiResponse = new AtomicInteger(); final AtomicInteger nbActions = new AtomicInteger(); + public List<AsyncRequestFuture> allReqs = new ArrayList<AsyncRequestFuture>(); + + @Override + protected <Res> AsyncRequestFutureImpl<Res> createAsyncRequestFuture(TableName tableName, + List<Action<Row>> actions, long nonceGroup, ExecutorService pool, + Batch.Callback<Res> callback, Object[] results, boolean needResults) { + // Test HTable has tableName of null, so pass DUMMY_TABLE + AsyncRequestFutureImpl<Res> r = super.createAsyncRequestFuture( + DUMMY_TABLE, actions, nonceGroup, pool, callback, results, needResults); + r.hardRetryLimit = new AtomicInteger(1); + allReqs.add(r); + return r; + } + + @SuppressWarnings("unchecked") + public long getRetriesRequested() { + long result = 0; + for (AsyncRequestFuture ars : allReqs) { + if (ars instanceof AsyncProcess.AsyncRequestFutureImpl) { + result += (1 - ((AsyncRequestFutureImpl<?>)ars).hardRetryLimit.get()); + } + } + return result; + } static class CountingThreadFactory implements ThreadFactory { final AtomicInteger nbThreads; @@ -91,15 +118,29 @@ public class TestAsyncProcess { } } - public MyAsyncProcess(HConnection hc, AsyncProcessCallback<Res> callback, Configuration conf) { - this(hc, callback, conf, new AtomicInteger()); + public MyAsyncProcess(HConnection hc, Configuration conf) { + this(hc, conf, new AtomicInteger()); } - public MyAsyncProcess(HConnection hc, AsyncProcessCallback<Res> callback, Configuration conf, - AtomicInteger nbThreads) { - super(hc, DUMMY_TABLE, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS, - new SynchronousQueue<Runnable>(), new CountingThreadFactory(nbThreads)), - callback, conf, new RpcRetryingCallerFactory(conf)); + public MyAsyncProcess(HConnection hc, Configuration conf, AtomicInteger nbThreads) { + super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS, + new SynchronousQueue<Runnable>(), new CountingThreadFactory(nbThreads)), + new RpcRetryingCallerFactory(conf), false); + } + + public MyAsyncProcess( + HConnection hc, Configuration conf, boolean useGlobalErrors) { + super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS, + new SynchronousQueue<Runnable>(), new CountingThreadFactory(new AtomicInteger())), + new RpcRetryingCallerFactory(conf), useGlobalErrors); + } + + @Override + public <Res> AsyncRequestFuture submit(TableName tableName, List<? extends Row> rows, + boolean atLeastOne, Callback<Res> callback, boolean needResults) + throws InterruptedIOException { + // We use results in tests to check things, so override to always save them. + return super.submit(DUMMY_TABLE, rows, atLeastOne, callback, true); } @Override @@ -112,7 +153,7 @@ public class TestAsyncProcess { throws IOException, RuntimeException { try { // sleep one second in order for threadpool to start another thread instead of reusing - // existing one. + // existing one. Thread.sleep(1000); } catch (InterruptedException e) { // ignore error @@ -144,7 +185,6 @@ public class TestAsyncProcess { * Returns our async process. */ static class MyConnectionImpl extends HConnectionManager.HConnectionImplementation { - MyAsyncProcess<?> ap; final AtomicInteger nbThreads = new AtomicInteger(0); final static Configuration c = new Configuration(); @@ -161,15 +201,6 @@ public class TestAsyncProcess { } @Override - protected <R> AsyncProcess createAsyncProcess(TableName tableName, - ExecutorService pool, - AsyncProcess.AsyncProcessCallback<R> callback, - Configuration confn ) { - ap = new MyAsyncProcess<R>(this, callback, c, nbThreads); - return ap; - } - - @Override public HRegionLocation locateRegion(final TableName tableName, final byte[] row) { return loc1; @@ -207,55 +238,57 @@ public class TestAsyncProcess { @Test public void testSubmit() throws Exception { HConnection hc = createHConnection(); - AsyncProcess ap = new MyAsyncProcess<Object>(hc, null, conf); + AsyncProcess ap = new MyAsyncProcess(hc, conf); List<Put> puts = new ArrayList<Put>(); puts.add(createPut(1, true)); - ap.submit(puts, false); + ap.submit(DUMMY_TABLE, puts, false, null, false); Assert.assertTrue(puts.isEmpty()); } @Test public void testSubmitWithCB() throws Exception { HConnection hc = createHConnection(); - MyCB mcb = new MyCB(); - AsyncProcess ap = new MyAsyncProcess<Object>(hc, mcb, conf); + final AtomicInteger updateCalled = new AtomicInteger(0); + Batch.Callback<Object> cb = new Batch.Callback<Object>() { + public void update(byte[] region, byte[] row, Object result) { + updateCalled.incrementAndGet(); + } + }; + AsyncProcess ap = new MyAsyncProcess(hc, conf); List<Put> puts = new ArrayList<Put>(); puts.add(createPut(1, true)); - ap.submit(puts, false); + final AsyncRequestFuture ars = ap.submit(DUMMY_TABLE, puts, false, cb, false); Assert.assertTrue(puts.isEmpty()); - - while (!(mcb.successCalled.get() == 1) && !ap.hasError()) { - Thread.sleep(1); - } - Assert.assertEquals(mcb.successCalled.get(), 1); + ars.waitUntilDone(); + Assert.assertEquals(updateCalled.get(), 1); } @Test public void testSubmitBusyRegion() throws Exception { HConnection hc = createHConnection(); - AsyncProcess ap = new MyAsyncProcess<Object>(hc, null, conf); + AsyncProcess ap = new MyAsyncProcess(hc, conf); List<Put> puts = new ArrayList<Put>(); puts.add(createPut(1, true)); ap.incTaskCounters(Arrays.asList(hri1.getRegionName()), sn); - ap.submit(puts, false); + ap.submit(DUMMY_TABLE, puts, false, null, false); Assert.assertEquals(puts.size(), 1); ap.decTaskCounters(Arrays.asList(hri1.getRegionName()), sn); - ap.submit(puts, false); - Assert.assertTrue(puts.isEmpty()); + ap.submit(DUMMY_TABLE, puts, false, null, false); + Assert.assertEquals(0, puts.size()); } @Test public void testSubmitBusyRegionServer() throws Exception { HConnection hc = createHConnection(); - AsyncProcess<Object> ap = new MyAsyncProcess<Object>(hc, null, conf); + AsyncProcess ap = new MyAsyncProcess(hc, conf); ap.taskCounterPerServer.put(sn2, new AtomicInteger(ap.maxConcurrentTasksPerServer)); @@ -265,80 +298,44 @@ public class TestAsyncProcess { puts.add(createPut(1, true)); // <== this one will make it, the region is already in puts.add(createPut(2, true)); // <== new region, but the rs is ok - ap.submit(puts, false); + ap.submit(DUMMY_TABLE, puts, false, null, false); Assert.assertEquals(" puts=" + puts, 1, puts.size()); ap.taskCounterPerServer.put(sn2, new AtomicInteger(ap.maxConcurrentTasksPerServer - 1)); - ap.submit(puts, false); + ap.submit(DUMMY_TABLE, puts, false, null, false); Assert.assertTrue(puts.isEmpty()); } @Test public void testFail() throws Exception { - HConnection hc = createHConnection(); - MyCB mcb = new MyCB(); - AsyncProcess ap = new MyAsyncProcess<Object>(hc, mcb, conf); + MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, false); List<Put> puts = new ArrayList<Put>(); Put p = createPut(1, false); puts.add(p); - ap.submit(puts, false); - Assert.assertTrue(puts.isEmpty()); - - while (!ap.hasError()) { - Thread.sleep(1); - } - - Assert.assertEquals(0, mcb.successCalled.get()); - Assert.assertEquals(2, mcb.retriableFailure.get()); - Assert.assertEquals(1, mcb.failureCalled.get()); - - Assert.assertEquals(1, ap.getErrors().exceptions.size()); - Assert.assertTrue("was: " + ap.getErrors().exceptions.get(0), - failure.equals(ap.getErrors().exceptions.get(0))); - Assert.assertTrue("was: " + ap.getErrors().exceptions.get(0), - failure.equals(ap.getErrors().exceptions.get(0))); - - Assert.assertEquals(1, ap.getFailedOperations().size()); - Assert.assertTrue("was: " + ap.getFailedOperations().get(0), - p.equals(ap.getFailedOperations().get(0))); + AsyncRequestFuture ars = ap.submit(DUMMY_TABLE, puts, false, null, true); + Assert.assertEquals(0, puts.size()); + ars.waitUntilDone(); + verifyResult(ars, false); + Assert.assertEquals(2L, ap.getRetriesRequested()); + + Assert.assertEquals(1, ars.getErrors().exceptions.size()); + Assert.assertTrue("was: " + ars.getErrors().exceptions.get(0), + failure.equals(ars.getErrors().exceptions.get(0))); + Assert.assertTrue("was: " + ars.getErrors().exceptions.get(0), + failure.equals(ars.getErrors().exceptions.get(0))); + + Assert.assertEquals(1, ars.getFailedOperations().size()); + Assert.assertTrue("was: " + ars.getFailedOperations().get(0), + p.equals(ars.getFailedOperations().get(0))); } - @Test - public void testWaitForNextTaskDone() throws IOException { - HConnection hc = createHConnection(); - MyCB mcb = new MyCB(); - final AsyncProcess ap = new MyAsyncProcess<Object>(hc, mcb, conf); - ap.tasksSent.incrementAndGet(); - - final AtomicBoolean checkPoint = new AtomicBoolean(false); - final AtomicBoolean checkPoint2 = new AtomicBoolean(false); - - Thread t = new Thread(){ - @Override - public void run(){ - Threads.sleep(1000); - Assert.assertFalse(checkPoint.get()); - ap.tasksDone.incrementAndGet(); - checkPoint2.set(true); - } - }; - - t.start(); - ap.waitForNextTaskDone(0); - checkPoint.set(true); - while (!checkPoint2.get()){ - Threads.sleep(1); - } - } @Test public void testSubmitTrue() throws IOException { - HConnection hc = createHConnection(); - MyCB mcb = new MyCB(); - final AsyncProcess<Object> ap = new MyAsyncProcess<Object>(hc, mcb, conf); - ap.tasksSent.incrementAndGet(); + final AsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, false); + ap.tasksInProgress.incrementAndGet(); final AtomicInteger ai = new AtomicInteger(1); ap.taskCounterPerRegion.put(hri1.getRegionName(), ai); @@ -349,9 +346,9 @@ public class TestAsyncProcess { @Override public void run(){ Threads.sleep(1000); - Assert.assertFalse(checkPoint.get()); + Assert.assertFalse(checkPoint.get()); // TODO: this is timing-dependent ai.decrementAndGet(); - ap.tasksDone.incrementAndGet(); + ap.tasksInProgress.decrementAndGet(); checkPoint2.set(true); } }; @@ -360,12 +357,12 @@ public class TestAsyncProcess { Put p = createPut(1, true); puts.add(p); - ap.submit(puts, false); + ap.submit(DUMMY_TABLE, puts, false, null, false); Assert.assertFalse(puts.isEmpty()); t.start(); - ap.submit(puts, true); + ap.submit(DUMMY_TABLE, puts, true, null, false); Assert.assertTrue(puts.isEmpty()); checkPoint.set(true); @@ -376,71 +373,50 @@ public class TestAsyncProcess { @Test public void testFailAndSuccess() throws Exception { - HConnection hc = createHConnection(); - MyCB mcb = new MyCB(); - AsyncProcess ap = new MyAsyncProcess<Object>(hc, mcb, conf); + MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, false); List<Put> puts = new ArrayList<Put>(); puts.add(createPut(1, false)); puts.add(createPut(1, true)); puts.add(createPut(1, true)); - ap.submit(puts, false); + AsyncRequestFuture ars = ap.submit(DUMMY_TABLE, puts, false, null, true); Assert.assertTrue(puts.isEmpty()); - - long cutoff = System.currentTimeMillis() + 60000; - while (!ap.hasError() && System.currentTimeMillis() < cutoff) { - Thread.sleep(1); - } - Assert.assertTrue(ap.hasError()); - ap.waitUntilDone(); - - Assert.assertEquals(mcb.successCalled.get(), 2); - Assert.assertEquals(mcb.retriableFailure.get(), 2); - Assert.assertEquals(mcb.failureCalled.get(), 1); - - Assert.assertEquals(1, ap.getErrors().actions.size()); - + ars.waitUntilDone(); + verifyResult(ars, false, true, true); + Assert.assertEquals(2, ap.getRetriesRequested()); + Assert.assertEquals(1, ars.getErrors().actions.size()); puts.add(createPut(1, true)); - ap.submit(puts, false); - Assert.assertTrue(puts.isEmpty()); - - while (mcb.successCalled.get() != 3) { - Thread.sleep(1); - } - Assert.assertEquals(mcb.retriableFailure.get(), 2); - Assert.assertEquals(mcb.failureCalled.get(), 1); - - ap.clearErrors(); - Assert.assertTrue(ap.getErrors().actions.isEmpty()); + // Wait for AP to be free. While ars might have the result, ap counters are decreased later. + ap.waitUntilDone(); + ars = ap.submit(DUMMY_TABLE, puts, false, null, true); + Assert.assertEquals(0, puts.size()); + ars.waitUntilDone(); + Assert.assertEquals(2, ap.getRetriesRequested()); + verifyResult(ars, true); } @Test public void testFlush() throws Exception { - HConnection hc = createHConnection(); - MyCB mcb = new MyCB(); - AsyncProcess ap = new MyAsyncProcess<Object>(hc, mcb, conf); + MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, false); List<Put> puts = new ArrayList<Put>(); puts.add(createPut(1, false)); puts.add(createPut(1, true)); puts.add(createPut(1, true)); - ap.submit(puts, false); - ap.waitUntilDone(); - - Assert.assertEquals(mcb.successCalled.get(), 2); - Assert.assertEquals(mcb.retriableFailure.get(), 2); - Assert.assertEquals(mcb.failureCalled.get(), 1); + AsyncRequestFuture ars = ap.submit(DUMMY_TABLE, puts, false, null, true); + ars.waitUntilDone(); + verifyResult(ars, false, true, true); + Assert.assertEquals(2, ap.getRetriesRequested()); - Assert.assertEquals(1, ap.getFailedOperations().size()); + Assert.assertEquals(1, ars.getFailedOperations().size()); } @Test public void testMaxTask() throws Exception { - HConnection hc = createHConnection(); - final AsyncProcess ap = new MyAsyncProcess<Object>(hc, null, conf); + final AsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, false); for (int i = 0; i < 1000; i++) { ap.incTaskCounters(Arrays.asList("dummy".getBytes()), sn); @@ -461,7 +437,7 @@ public class TestAsyncProcess { t.start(); try { - ap.submit(puts, false); + ap.submit(DUMMY_TABLE, puts, false, null, false); Assert.fail("We should have been interrupted."); } catch (InterruptedIOException expected) { } @@ -471,7 +447,7 @@ public class TestAsyncProcess { Thread t2 = new Thread() { public void run() { Threads.sleep(sleepTime); - while (ap.tasksDone.get() > 0) { + while (ap.tasksInProgress.get() > 0) { ap.decTaskCounters(Arrays.asList("dummy".getBytes()), sn); } } @@ -479,39 +455,13 @@ public class TestAsyncProcess { t2.start(); long start = System.currentTimeMillis(); - ap.submit(new ArrayList<Row>(), false); + ap.submit(DUMMY_TABLE, new ArrayList<Row>(), false, null, false); long end = System.currentTimeMillis(); //Adds 100 to secure us against approximate timing. Assert.assertTrue(start + 100L + sleepTime > end); } - - private class MyCB implements AsyncProcess.AsyncProcessCallback<Object> { - private final AtomicInteger successCalled = new AtomicInteger(0); - private final AtomicInteger failureCalled = new AtomicInteger(0); - private final AtomicInteger retriableFailure = new AtomicInteger(0); - - - @Override - public void success(int originalIndex, byte[] region, Row row, Object o) { - successCalled.incrementAndGet(); - } - - @Override - public boolean failure(int originalIndex, Row row, Throwable t) { - failureCalled.incrementAndGet(); - return true; - } - - @Override - public boolean retriableFailure(int originalIndex, Row row, Throwable exception) { - // We retry once only. - return (retriableFailure.incrementAndGet() < 2); - } - } - - private static HConnection createHConnection() throws IOException { HConnection hc = Mockito.mock(HConnection.class); @@ -535,14 +485,17 @@ public class TestAsyncProcess { Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE), Mockito.eq(FAILS))).thenReturn(loc2); + NonceGenerator ng = Mockito.mock(NonceGenerator.class); + Mockito.when(ng.getNonceGroup()).thenReturn(HConstants.NO_NONCE); + Mockito.when(hc.getNonceGenerator()).thenReturn(ng); + return hc; } @Test public void testHTablePutSuccess() throws Exception { HTable ht = Mockito.mock(HTable.class); - HConnection hc = createHConnection(); - ht.ap = new MyAsyncProcess<Object>(hc, null, conf); + ht.ap = new MyAsyncProcess(createHConnection(), conf, true); Put put = createPut(1, true); @@ -553,9 +506,8 @@ public class TestAsyncProcess { private void doHTableFailedPut(boolean bufferOn) throws Exception { HTable ht = new HTable(); - HConnection hc = createHConnection(); - MyCB mcb = new MyCB(); // This allows to have some hints on what's going on. - ht.ap = new MyAsyncProcess<Object>(hc, mcb, conf); + MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, true); + ht.ap = ap; ht.setAutoFlush(true, true); if (bufferOn) { ht.setWriteBufferSize(1024L * 1024L); @@ -575,9 +527,15 @@ public class TestAsyncProcess { } catch (RetriesExhaustedException expected) { } Assert.assertEquals(0L, ht.currentWriteBufferSize); - Assert.assertEquals(0, mcb.successCalled.get()); - Assert.assertEquals(2, mcb.retriableFailure.get()); - Assert.assertEquals(1, mcb.failureCalled.get()); + // The table should have sent one request, maybe after multiple attempts + AsyncRequestFuture ars = null; + for (AsyncRequestFuture someReqs : ap.allReqs) { + if (someReqs.getResults().length == 0) continue; + Assert.assertTrue(ars == null); + ars = someReqs; + } + Assert.assertTrue(ars != null); + verifyResult(ars, false); // This should not raise any exception, puts have been 'received' before by the catch. ht.close(); @@ -589,23 +547,22 @@ public class TestAsyncProcess { } @Test - public void doHTableFailedPutWithoutBuffer() throws Exception { + public void testHTableFailedPutWithoutBuffer() throws Exception { doHTableFailedPut(false); } @Test public void testHTableFailedPutAndNewPut() throws Exception { HTable ht = new HTable(); - HConnection hc = createHConnection(); - MyCB mcb = new MyCB(); // This allows to have some hints on what's going on. - ht.ap = new MyAsyncProcess<Object>(hc, mcb, conf); + MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, true); + ht.ap = ap; ht.setAutoFlush(false, true); ht.setWriteBufferSize(0); Put p = createPut(1, false); ht.put(p); - ht.ap.waitUntilDone(); // Let's do all the retries. + ap.waitUntilDone(); // Let's do all the retries. // We're testing that we're behaving as we were behaving in 0.94: sending exceptions in the // doPut if it fails. @@ -626,14 +583,13 @@ public class TestAsyncProcess { @Test public void testWithNoClearOnFail() throws IOException { HTable ht = new HTable(); - HConnection hc = createHConnection(); - MyCB mcb = new MyCB(); - ht.ap = new MyAsyncProcess<Object>(hc, mcb, conf); + ht.ap = new MyAsyncProcess(createHConnection(), conf, true); ht.setAutoFlush(false, false); Put p = createPut(1, false); ht.put(p); Assert.assertEquals(0, ht.writeAsyncBuffer.size()); + try { ht.flushCommits(); } catch (RetriesExhaustedWithDetailsException expected) { @@ -651,6 +607,7 @@ public class TestAsyncProcess { public void testBatch() throws IOException, InterruptedException { HTable ht = new HTable(); ht.connection = new MyConnectionImpl(); + ht.multiAp = new MyAsyncProcess(ht.connection, conf, false); List<Put> puts = new ArrayList<Put>(); puts.add(createPut(1, true)); @@ -686,10 +643,9 @@ public class TestAsyncProcess { // set default writeBufferSize ht.setWriteBufferSize(configuration.getLong("hbase.client.write.buffer", 2097152)); - MyConnectionImpl mci = new MyConnectionImpl(configuration); - ht.connection = mci; - ht.ap = new MyAsyncProcess<Object>(mci, null, configuration); - + ht.connection = new MyConnectionImpl(configuration); + MyAsyncProcess ap = new MyAsyncProcess(ht.connection, conf, true); + ht.ap = ap; Assert.assertNotNull(ht.ap.createServerErrorTracker()); Assert.assertTrue(ht.ap.serverTrackerTimeout > 200); @@ -705,7 +661,7 @@ public class TestAsyncProcess { } catch (RetriesExhaustedWithDetailsException expected) { } // Checking that the ErrorsServers came into play and didn't make us stop immediately - Assert.assertEquals(ht.ap.tasksSent.get(), 3); + Assert.assertEquals(2, ap.getRetriesRequested()); } /** @@ -730,11 +686,13 @@ public class TestAsyncProcess { HTable ht = new HTable(); MyConnectionImpl2 con = new MyConnectionImpl2(hrls); ht.connection = con; + MyAsyncProcess ap = new MyAsyncProcess(con, conf, con.nbThreads); + ht.multiAp = ap; ht.batch(gets); - Assert.assertEquals(con.ap.nbActions.get(), NB_REGS); - Assert.assertEquals("1 multi response per server", 2, con.ap.nbMultiResponse.get()); + Assert.assertEquals(ap.nbActions.get(), NB_REGS); + Assert.assertEquals("1 multi response per server", 2, ap.nbMultiResponse.get()); Assert.assertEquals("1 thread per server", 2, con.nbThreads.get()); int nbReg = 0; @@ -744,6 +702,13 @@ public class TestAsyncProcess { Assert.assertEquals("nbReg=" + nbReg, nbReg, NB_REGS); } + private void verifyResult(AsyncRequestFuture ars, boolean... expected) { + Object[] actual = ars.getResults(); + Assert.assertEquals(expected.length, actual.length); + for (int i = 0; i < expected.length; ++i) { + Assert.assertEquals(expected[i], !(actual[i] instanceof Throwable)); + } + } /** * @param regCnt the region: 1 to 3.
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java?rev=1564832&r1=1564831&r2=1564832&view=diff ============================================================================== --- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java (original) +++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java Wed Feb 5 17:08:33 2014 @@ -63,6 +63,7 @@ import com.google.protobuf.ServiceExcept @InterfaceAudience.Private @InterfaceStability.Evolving public class CoprocessorHConnection implements HConnection { + private static final NonceGenerator ng = new HConnectionManager.NoNonceGenerator(); /** * Create an unmanaged {@link HConnection} based on the environment in which we are running the @@ -388,6 +389,11 @@ public class CoprocessorHConnection impl @Override public NonceGenerator getNonceGenerator() { - return null; // don't use nonces for coprocessor connection + return ng; // don't use nonces for coprocessor connection + } + + @Override + public AsyncProcess getAsyncProcess() { + return delegate.getAsyncProcess(); } } \ No newline at end of file Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java?rev=1564832&r1=1564831&r2=1564832&view=diff ============================================================================== --- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java (original) +++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java Wed Feb 5 17:08:33 2014 @@ -117,6 +117,10 @@ public class HConnectionTestingUtility { Mockito.when(c.getClient(Mockito.any(ServerName.class))). thenReturn(client); } + NonceGenerator ng = Mockito.mock(NonceGenerator.class); + Mockito.when(c.getNonceGenerator()).thenReturn(ng); + Mockito.when(c.getAsyncProcess()).thenReturn(new AsyncProcess( + c, conf, null, RpcRetryingCallerFactory.instantiate(conf), false)); return c; } Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java?rev=1564832&r1=1564831&r2=1564832&view=diff ============================================================================== --- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java (original) +++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java Wed Feb 5 17:08:33 2014 @@ -62,8 +62,13 @@ import org.apache.hadoop.hbase.master.Ca import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionActionResult; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ResultOrException; import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; @@ -73,6 +78,8 @@ import org.apache.hadoop.hbase.zookeeper import org.junit.Test; import org.junit.experimental.categories.Category; import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import com.google.protobuf.RpcController; import com.google.protobuf.Service; @@ -105,6 +112,18 @@ public class TestCatalogJanitor { } catch (ServiceException se) { throw ProtobufUtil.getRemoteException(se); } + try { + Mockito.when(ri.multi( + (RpcController)Mockito.any(), (MultiRequest)Mockito.any())). + thenAnswer(new Answer<MultiResponse>() { + @Override + public MultiResponse answer(InvocationOnMock invocation) throws Throwable { + return buildMultiResponse( (MultiRequest)invocation.getArguments()[1]); + } + }); + } catch (ServiceException se) { + throw ProtobufUtil.getRemoteException(se); + } // Mock an HConnection and a AdminProtocol implementation. Have the // HConnection return the HRI. Have the HRI return a few mocked up responses // to make our test work. @@ -940,5 +959,23 @@ public class TestCatalogJanitor { return htd; } + private MultiResponse buildMultiResponse(MultiRequest req) { + MultiResponse.Builder builder = MultiResponse.newBuilder(); + RegionActionResult.Builder regionActionResultBuilder = + RegionActionResult.newBuilder(); + ResultOrException.Builder roeBuilder = ResultOrException.newBuilder(); + for (RegionAction regionAction: req.getRegionActionList()) { + regionActionResultBuilder.clear(); + for (ClientProtos.Action action: regionAction.getActionList()) { + roeBuilder.clear(); + roeBuilder.setResult(ClientProtos.Result.getDefaultInstance()); + roeBuilder.setIndex(action.getIndex()); + regionActionResultBuilder.addResultOrException(roeBuilder.build()); + } + builder.addRegionActionResult(regionActionResultBuilder.build()); + } + return builder.build(); + } + }