Repository: hbase-site Updated Branches: refs/heads/asf-site dd8334d37 -> f55ebeaa5
http://git-wip-us.apache.org/repos/asf/hbase-site/blob/f55ebeaa/testdevapidocs/src-html/org/apache/hadoop/hbase/ipc/AbstractTestIPC.TestFailingRpcServer.FailingConnection.html ---------------------------------------------------------------------- diff --git a/testdevapidocs/src-html/org/apache/hadoop/hbase/ipc/AbstractTestIPC.TestFailingRpcServer.FailingConnection.html b/testdevapidocs/src-html/org/apache/hadoop/hbase/ipc/AbstractTestIPC.TestFailingRpcServer.FailingConnection.html index 8335bdf..e49c7e6 100644 --- a/testdevapidocs/src-html/org/apache/hadoop/hbase/ipc/AbstractTestIPC.TestFailingRpcServer.FailingConnection.html +++ b/testdevapidocs/src-html/org/apache/hadoop/hbase/ipc/AbstractTestIPC.TestFailingRpcServer.FailingConnection.html @@ -323,138 +323,139 @@ <span class="sourceLineNo">315</span> new InetSocketAddress("localhost", 0), conf, scheduler);<a name="line.315"></a> <span class="sourceLineNo">316</span> }<a name="line.316"></a> <span class="sourceLineNo">317</span><a name="line.317"></a> -<span class="sourceLineNo">318</span> class FailingConnection extends Connection {<a name="line.318"></a> -<span class="sourceLineNo">319</span> public FailingConnection(SocketChannel channel, long lastContact) {<a name="line.319"></a> -<span class="sourceLineNo">320</span> super(channel, lastContact);<a name="line.320"></a> -<span class="sourceLineNo">321</span> }<a name="line.321"></a> -<span class="sourceLineNo">322</span><a name="line.322"></a> -<span class="sourceLineNo">323</span> @Override<a name="line.323"></a> -<span class="sourceLineNo">324</span> public void processRequest(ByteBuff buf) throws IOException, InterruptedException {<a name="line.324"></a> -<span class="sourceLineNo">325</span> // this will throw exception after the connection header is read, and an RPC is sent<a name="line.325"></a> -<span class="sourceLineNo">326</span> // from client<a name="line.326"></a> -<span class="sourceLineNo">327</span> throw new DoNotRetryIOException("Failing for test");<a name="line.327"></a> -<span class="sourceLineNo">328</span> }<a name="line.328"></a> -<span class="sourceLineNo">329</span> }<a name="line.329"></a> -<span class="sourceLineNo">330</span><a name="line.330"></a> -<span class="sourceLineNo">331</span> @Override<a name="line.331"></a> -<span class="sourceLineNo">332</span> protected Connection getConnection(SocketChannel channel, long time) {<a name="line.332"></a> -<span class="sourceLineNo">333</span> return new FailingConnection(channel, time);<a name="line.333"></a> -<span class="sourceLineNo">334</span> }<a name="line.334"></a> -<span class="sourceLineNo">335</span> }<a name="line.335"></a> -<span class="sourceLineNo">336</span><a name="line.336"></a> -<span class="sourceLineNo">337</span> /** Tests that the connection closing is handled by the client with outstanding RPC calls */<a name="line.337"></a> -<span class="sourceLineNo">338</span> @Test<a name="line.338"></a> -<span class="sourceLineNo">339</span> public void testConnectionCloseWithOutstandingRPCs() throws InterruptedException, IOException {<a name="line.339"></a> -<span class="sourceLineNo">340</span> Configuration conf = new Configuration(CONF);<a name="line.340"></a> -<span class="sourceLineNo">341</span> RpcServer rpcServer = new TestFailingRpcServer(conf);<a name="line.341"></a> -<span class="sourceLineNo">342</span> try (AbstractRpcClient<?> client = createRpcClient(conf)) {<a name="line.342"></a> -<span class="sourceLineNo">343</span> rpcServer.start();<a name="line.343"></a> -<span class="sourceLineNo">344</span> BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());<a name="line.344"></a> -<span class="sourceLineNo">345</span> EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();<a name="line.345"></a> -<span class="sourceLineNo">346</span> stub.echo(null, param);<a name="line.346"></a> -<span class="sourceLineNo">347</span> fail("RPC should have failed because connection closed");<a name="line.347"></a> -<span class="sourceLineNo">348</span> } catch (ServiceException e) {<a name="line.348"></a> -<span class="sourceLineNo">349</span> LOG.info("Caught expected exception: " + e.toString());<a name="line.349"></a> -<span class="sourceLineNo">350</span> } finally {<a name="line.350"></a> -<span class="sourceLineNo">351</span> rpcServer.stop();<a name="line.351"></a> -<span class="sourceLineNo">352</span> }<a name="line.352"></a> -<span class="sourceLineNo">353</span> }<a name="line.353"></a> -<span class="sourceLineNo">354</span><a name="line.354"></a> -<span class="sourceLineNo">355</span> @Test<a name="line.355"></a> -<span class="sourceLineNo">356</span> public void testAsyncEcho() throws IOException {<a name="line.356"></a> -<span class="sourceLineNo">357</span> Configuration conf = HBaseConfiguration.create();<a name="line.357"></a> -<span class="sourceLineNo">358</span> RpcServer rpcServer = RpcServerFactory.createRpcServer(null,<a name="line.358"></a> -<span class="sourceLineNo">359</span> "testRpcServer", Lists.newArrayList(new BlockingServiceAndInterface(<a name="line.359"></a> -<span class="sourceLineNo">360</span> SERVICE, null)), new InetSocketAddress("localhost", 0), CONF,<a name="line.360"></a> -<span class="sourceLineNo">361</span> new FifoRpcScheduler(CONF, 1));<a name="line.361"></a> -<span class="sourceLineNo">362</span> try (AbstractRpcClient<?> client = createRpcClient(conf)) {<a name="line.362"></a> -<span class="sourceLineNo">363</span> rpcServer.start();<a name="line.363"></a> -<span class="sourceLineNo">364</span> Interface stub = newStub(client, rpcServer.getListenerAddress());<a name="line.364"></a> -<span class="sourceLineNo">365</span> int num = 10;<a name="line.365"></a> -<span class="sourceLineNo">366</span> List<HBaseRpcController> pcrcList = new ArrayList<>();<a name="line.366"></a> -<span class="sourceLineNo">367</span> List<BlockingRpcCallback<EchoResponseProto>> callbackList = new ArrayList<>();<a name="line.367"></a> -<span class="sourceLineNo">368</span> for (int i = 0; i < num; i++) {<a name="line.368"></a> -<span class="sourceLineNo">369</span> HBaseRpcController pcrc = new HBaseRpcControllerImpl();<a name="line.369"></a> -<span class="sourceLineNo">370</span> BlockingRpcCallback<EchoResponseProto> done = new BlockingRpcCallback<>();<a name="line.370"></a> -<span class="sourceLineNo">371</span> stub.echo(pcrc, EchoRequestProto.newBuilder().setMessage("hello-" + i).build(), done);<a name="line.371"></a> -<span class="sourceLineNo">372</span> pcrcList.add(pcrc);<a name="line.372"></a> -<span class="sourceLineNo">373</span> callbackList.add(done);<a name="line.373"></a> -<span class="sourceLineNo">374</span> }<a name="line.374"></a> -<span class="sourceLineNo">375</span> for (int i = 0; i < num; i++) {<a name="line.375"></a> -<span class="sourceLineNo">376</span> HBaseRpcController pcrc = pcrcList.get(i);<a name="line.376"></a> -<span class="sourceLineNo">377</span> assertFalse(pcrc.failed());<a name="line.377"></a> -<span class="sourceLineNo">378</span> assertNull(pcrc.cellScanner());<a name="line.378"></a> -<span class="sourceLineNo">379</span> assertEquals("hello-" + i, callbackList.get(i).get().getMessage());<a name="line.379"></a> -<span class="sourceLineNo">380</span> }<a name="line.380"></a> -<span class="sourceLineNo">381</span> } finally {<a name="line.381"></a> -<span class="sourceLineNo">382</span> rpcServer.stop();<a name="line.382"></a> -<span class="sourceLineNo">383</span> }<a name="line.383"></a> -<span class="sourceLineNo">384</span> }<a name="line.384"></a> -<span class="sourceLineNo">385</span><a name="line.385"></a> -<span class="sourceLineNo">386</span> @Test<a name="line.386"></a> -<span class="sourceLineNo">387</span> public void testAsyncRemoteError() throws IOException {<a name="line.387"></a> -<span class="sourceLineNo">388</span> AbstractRpcClient<?> client = createRpcClient(CONF);<a name="line.388"></a> -<span class="sourceLineNo">389</span> RpcServer rpcServer = RpcServerFactory.createRpcServer(null,<a name="line.389"></a> -<span class="sourceLineNo">390</span> "testRpcServer", Lists.newArrayList(new BlockingServiceAndInterface(<a name="line.390"></a> -<span class="sourceLineNo">391</span> SERVICE, null)), new InetSocketAddress("localhost", 0), CONF,<a name="line.391"></a> -<span class="sourceLineNo">392</span> new FifoRpcScheduler(CONF, 1));<a name="line.392"></a> -<span class="sourceLineNo">393</span> try {<a name="line.393"></a> -<span class="sourceLineNo">394</span> rpcServer.start();<a name="line.394"></a> -<span class="sourceLineNo">395</span> Interface stub = newStub(client, rpcServer.getListenerAddress());<a name="line.395"></a> -<span class="sourceLineNo">396</span> BlockingRpcCallback<EmptyResponseProto> callback = new BlockingRpcCallback<>();<a name="line.396"></a> -<span class="sourceLineNo">397</span> HBaseRpcController pcrc = new HBaseRpcControllerImpl();<a name="line.397"></a> -<span class="sourceLineNo">398</span> stub.error(pcrc, EmptyRequestProto.getDefaultInstance(), callback);<a name="line.398"></a> -<span class="sourceLineNo">399</span> assertNull(callback.get());<a name="line.399"></a> -<span class="sourceLineNo">400</span> assertTrue(pcrc.failed());<a name="line.400"></a> -<span class="sourceLineNo">401</span> LOG.info("Caught expected exception: " + pcrc.getFailed());<a name="line.401"></a> -<span class="sourceLineNo">402</span> IOException ioe = ProtobufUtil.handleRemoteException(pcrc.getFailed());<a name="line.402"></a> -<span class="sourceLineNo">403</span> assertTrue(ioe instanceof DoNotRetryIOException);<a name="line.403"></a> -<span class="sourceLineNo">404</span> assertTrue(ioe.getMessage().contains("server error!"));<a name="line.404"></a> -<span class="sourceLineNo">405</span> } finally {<a name="line.405"></a> -<span class="sourceLineNo">406</span> client.close();<a name="line.406"></a> -<span class="sourceLineNo">407</span> rpcServer.stop();<a name="line.407"></a> -<span class="sourceLineNo">408</span> }<a name="line.408"></a> -<span class="sourceLineNo">409</span> }<a name="line.409"></a> -<span class="sourceLineNo">410</span><a name="line.410"></a> -<span class="sourceLineNo">411</span> @Test<a name="line.411"></a> -<span class="sourceLineNo">412</span> public void testAsyncTimeout() throws IOException {<a name="line.412"></a> -<span class="sourceLineNo">413</span> RpcServer rpcServer = RpcServerFactory.createRpcServer(null,<a name="line.413"></a> -<span class="sourceLineNo">414</span> "testRpcServer", Lists.newArrayList(new BlockingServiceAndInterface(<a name="line.414"></a> -<span class="sourceLineNo">415</span> SERVICE, null)), new InetSocketAddress("localhost", 0), CONF,<a name="line.415"></a> -<span class="sourceLineNo">416</span> new FifoRpcScheduler(CONF, 1));<a name="line.416"></a> -<span class="sourceLineNo">417</span> try (AbstractRpcClient<?> client = createRpcClient(CONF)) {<a name="line.417"></a> -<span class="sourceLineNo">418</span> rpcServer.start();<a name="line.418"></a> -<span class="sourceLineNo">419</span> Interface stub = newStub(client, rpcServer.getListenerAddress());<a name="line.419"></a> -<span class="sourceLineNo">420</span> List<HBaseRpcController> pcrcList = new ArrayList<>();<a name="line.420"></a> -<span class="sourceLineNo">421</span> List<BlockingRpcCallback<EmptyResponseProto>> callbackList = new ArrayList<>();<a name="line.421"></a> -<span class="sourceLineNo">422</span> int ms = 1000;<a name="line.422"></a> -<span class="sourceLineNo">423</span> int timeout = 100;<a name="line.423"></a> -<span class="sourceLineNo">424</span> long startTime = System.nanoTime();<a name="line.424"></a> -<span class="sourceLineNo">425</span> for (int i = 0; i < 10; i++) {<a name="line.425"></a> -<span class="sourceLineNo">426</span> HBaseRpcController pcrc = new HBaseRpcControllerImpl();<a name="line.426"></a> -<span class="sourceLineNo">427</span> pcrc.setCallTimeout(timeout);<a name="line.427"></a> -<span class="sourceLineNo">428</span> BlockingRpcCallback<EmptyResponseProto> callback = new BlockingRpcCallback<>();<a name="line.428"></a> -<span class="sourceLineNo">429</span> stub.pause(pcrc, PauseRequestProto.newBuilder().setMs(ms).build(), callback);<a name="line.429"></a> -<span class="sourceLineNo">430</span> pcrcList.add(pcrc);<a name="line.430"></a> -<span class="sourceLineNo">431</span> callbackList.add(callback);<a name="line.431"></a> -<span class="sourceLineNo">432</span> }<a name="line.432"></a> -<span class="sourceLineNo">433</span> for (BlockingRpcCallback<?> callback : callbackList) {<a name="line.433"></a> -<span class="sourceLineNo">434</span> assertNull(callback.get());<a name="line.434"></a> -<span class="sourceLineNo">435</span> }<a name="line.435"></a> -<span class="sourceLineNo">436</span> long waitTime = (System.nanoTime() - startTime) / 1000000;<a name="line.436"></a> -<span class="sourceLineNo">437</span> for (HBaseRpcController pcrc : pcrcList) {<a name="line.437"></a> -<span class="sourceLineNo">438</span> assertTrue(pcrc.failed());<a name="line.438"></a> -<span class="sourceLineNo">439</span> LOG.info("Caught expected exception: " + pcrc.getFailed());<a name="line.439"></a> -<span class="sourceLineNo">440</span> IOException ioe = ProtobufUtil.handleRemoteException(pcrc.getFailed());<a name="line.440"></a> -<span class="sourceLineNo">441</span> assertTrue(ioe.getCause() instanceof CallTimeoutException);<a name="line.441"></a> -<span class="sourceLineNo">442</span> }<a name="line.442"></a> -<span class="sourceLineNo">443</span> // confirm that we got exception before the actual pause.<a name="line.443"></a> -<span class="sourceLineNo">444</span> assertTrue(waitTime < ms);<a name="line.444"></a> -<span class="sourceLineNo">445</span> } finally {<a name="line.445"></a> -<span class="sourceLineNo">446</span> rpcServer.stop();<a name="line.446"></a> -<span class="sourceLineNo">447</span> }<a name="line.447"></a> -<span class="sourceLineNo">448</span> }<a name="line.448"></a> -<span class="sourceLineNo">449</span>}<a name="line.449"></a> +<span class="sourceLineNo">318</span> class FailingConnection extends SimpleServerRpcConnection {<a name="line.318"></a> +<span class="sourceLineNo">319</span> public FailingConnection(TestFailingRpcServer rpcServer, SocketChannel channel,<a name="line.319"></a> +<span class="sourceLineNo">320</span> long lastContact) {<a name="line.320"></a> +<span class="sourceLineNo">321</span> super(rpcServer, channel, lastContact);<a name="line.321"></a> +<span class="sourceLineNo">322</span> }<a name="line.322"></a> +<span class="sourceLineNo">323</span><a name="line.323"></a> +<span class="sourceLineNo">324</span> @Override<a name="line.324"></a> +<span class="sourceLineNo">325</span> public void processRequest(ByteBuff buf) throws IOException, InterruptedException {<a name="line.325"></a> +<span class="sourceLineNo">326</span> // this will throw exception after the connection header is read, and an RPC is sent<a name="line.326"></a> +<span class="sourceLineNo">327</span> // from client<a name="line.327"></a> +<span class="sourceLineNo">328</span> throw new DoNotRetryIOException("Failing for test");<a name="line.328"></a> +<span class="sourceLineNo">329</span> }<a name="line.329"></a> +<span class="sourceLineNo">330</span> }<a name="line.330"></a> +<span class="sourceLineNo">331</span><a name="line.331"></a> +<span class="sourceLineNo">332</span> @Override<a name="line.332"></a> +<span class="sourceLineNo">333</span> protected SimpleServerRpcConnection getConnection(SocketChannel channel, long time) {<a name="line.333"></a> +<span class="sourceLineNo">334</span> return new FailingConnection(this, channel, time);<a name="line.334"></a> +<span class="sourceLineNo">335</span> }<a name="line.335"></a> +<span class="sourceLineNo">336</span> }<a name="line.336"></a> +<span class="sourceLineNo">337</span><a name="line.337"></a> +<span class="sourceLineNo">338</span> /** Tests that the connection closing is handled by the client with outstanding RPC calls */<a name="line.338"></a> +<span class="sourceLineNo">339</span> @Test<a name="line.339"></a> +<span class="sourceLineNo">340</span> public void testConnectionCloseWithOutstandingRPCs() throws InterruptedException, IOException {<a name="line.340"></a> +<span class="sourceLineNo">341</span> Configuration conf = new Configuration(CONF);<a name="line.341"></a> +<span class="sourceLineNo">342</span> RpcServer rpcServer = new TestFailingRpcServer(conf);<a name="line.342"></a> +<span class="sourceLineNo">343</span> try (AbstractRpcClient<?> client = createRpcClient(conf)) {<a name="line.343"></a> +<span class="sourceLineNo">344</span> rpcServer.start();<a name="line.344"></a> +<span class="sourceLineNo">345</span> BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());<a name="line.345"></a> +<span class="sourceLineNo">346</span> EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();<a name="line.346"></a> +<span class="sourceLineNo">347</span> stub.echo(null, param);<a name="line.347"></a> +<span class="sourceLineNo">348</span> fail("RPC should have failed because connection closed");<a name="line.348"></a> +<span class="sourceLineNo">349</span> } catch (ServiceException e) {<a name="line.349"></a> +<span class="sourceLineNo">350</span> LOG.info("Caught expected exception: " + e.toString());<a name="line.350"></a> +<span class="sourceLineNo">351</span> } finally {<a name="line.351"></a> +<span class="sourceLineNo">352</span> rpcServer.stop();<a name="line.352"></a> +<span class="sourceLineNo">353</span> }<a name="line.353"></a> +<span class="sourceLineNo">354</span> }<a name="line.354"></a> +<span class="sourceLineNo">355</span><a name="line.355"></a> +<span class="sourceLineNo">356</span> @Test<a name="line.356"></a> +<span class="sourceLineNo">357</span> public void testAsyncEcho() throws IOException {<a name="line.357"></a> +<span class="sourceLineNo">358</span> Configuration conf = HBaseConfiguration.create();<a name="line.358"></a> +<span class="sourceLineNo">359</span> RpcServer rpcServer = RpcServerFactory.createRpcServer(null,<a name="line.359"></a> +<span class="sourceLineNo">360</span> "testRpcServer", Lists.newArrayList(new BlockingServiceAndInterface(<a name="line.360"></a> +<span class="sourceLineNo">361</span> SERVICE, null)), new InetSocketAddress("localhost", 0), CONF,<a name="line.361"></a> +<span class="sourceLineNo">362</span> new FifoRpcScheduler(CONF, 1));<a name="line.362"></a> +<span class="sourceLineNo">363</span> try (AbstractRpcClient<?> client = createRpcClient(conf)) {<a name="line.363"></a> +<span class="sourceLineNo">364</span> rpcServer.start();<a name="line.364"></a> +<span class="sourceLineNo">365</span> Interface stub = newStub(client, rpcServer.getListenerAddress());<a name="line.365"></a> +<span class="sourceLineNo">366</span> int num = 10;<a name="line.366"></a> +<span class="sourceLineNo">367</span> List<HBaseRpcController> pcrcList = new ArrayList<>();<a name="line.367"></a> +<span class="sourceLineNo">368</span> List<BlockingRpcCallback<EchoResponseProto>> callbackList = new ArrayList<>();<a name="line.368"></a> +<span class="sourceLineNo">369</span> for (int i = 0; i < num; i++) {<a name="line.369"></a> +<span class="sourceLineNo">370</span> HBaseRpcController pcrc = new HBaseRpcControllerImpl();<a name="line.370"></a> +<span class="sourceLineNo">371</span> BlockingRpcCallback<EchoResponseProto> done = new BlockingRpcCallback<>();<a name="line.371"></a> +<span class="sourceLineNo">372</span> stub.echo(pcrc, EchoRequestProto.newBuilder().setMessage("hello-" + i).build(), done);<a name="line.372"></a> +<span class="sourceLineNo">373</span> pcrcList.add(pcrc);<a name="line.373"></a> +<span class="sourceLineNo">374</span> callbackList.add(done);<a name="line.374"></a> +<span class="sourceLineNo">375</span> }<a name="line.375"></a> +<span class="sourceLineNo">376</span> for (int i = 0; i < num; i++) {<a name="line.376"></a> +<span class="sourceLineNo">377</span> HBaseRpcController pcrc = pcrcList.get(i);<a name="line.377"></a> +<span class="sourceLineNo">378</span> assertFalse(pcrc.failed());<a name="line.378"></a> +<span class="sourceLineNo">379</span> assertNull(pcrc.cellScanner());<a name="line.379"></a> +<span class="sourceLineNo">380</span> assertEquals("hello-" + i, callbackList.get(i).get().getMessage());<a name="line.380"></a> +<span class="sourceLineNo">381</span> }<a name="line.381"></a> +<span class="sourceLineNo">382</span> } finally {<a name="line.382"></a> +<span class="sourceLineNo">383</span> rpcServer.stop();<a name="line.383"></a> +<span class="sourceLineNo">384</span> }<a name="line.384"></a> +<span class="sourceLineNo">385</span> }<a name="line.385"></a> +<span class="sourceLineNo">386</span><a name="line.386"></a> +<span class="sourceLineNo">387</span> @Test<a name="line.387"></a> +<span class="sourceLineNo">388</span> public void testAsyncRemoteError() throws IOException {<a name="line.388"></a> +<span class="sourceLineNo">389</span> AbstractRpcClient<?> client = createRpcClient(CONF);<a name="line.389"></a> +<span class="sourceLineNo">390</span> RpcServer rpcServer = RpcServerFactory.createRpcServer(null,<a name="line.390"></a> +<span class="sourceLineNo">391</span> "testRpcServer", Lists.newArrayList(new BlockingServiceAndInterface(<a name="line.391"></a> +<span class="sourceLineNo">392</span> SERVICE, null)), new InetSocketAddress("localhost", 0), CONF,<a name="line.392"></a> +<span class="sourceLineNo">393</span> new FifoRpcScheduler(CONF, 1));<a name="line.393"></a> +<span class="sourceLineNo">394</span> try {<a name="line.394"></a> +<span class="sourceLineNo">395</span> rpcServer.start();<a name="line.395"></a> +<span class="sourceLineNo">396</span> Interface stub = newStub(client, rpcServer.getListenerAddress());<a name="line.396"></a> +<span class="sourceLineNo">397</span> BlockingRpcCallback<EmptyResponseProto> callback = new BlockingRpcCallback<>();<a name="line.397"></a> +<span class="sourceLineNo">398</span> HBaseRpcController pcrc = new HBaseRpcControllerImpl();<a name="line.398"></a> +<span class="sourceLineNo">399</span> stub.error(pcrc, EmptyRequestProto.getDefaultInstance(), callback);<a name="line.399"></a> +<span class="sourceLineNo">400</span> assertNull(callback.get());<a name="line.400"></a> +<span class="sourceLineNo">401</span> assertTrue(pcrc.failed());<a name="line.401"></a> +<span class="sourceLineNo">402</span> LOG.info("Caught expected exception: " + pcrc.getFailed());<a name="line.402"></a> +<span class="sourceLineNo">403</span> IOException ioe = ProtobufUtil.handleRemoteException(pcrc.getFailed());<a name="line.403"></a> +<span class="sourceLineNo">404</span> assertTrue(ioe instanceof DoNotRetryIOException);<a name="line.404"></a> +<span class="sourceLineNo">405</span> assertTrue(ioe.getMessage().contains("server error!"));<a name="line.405"></a> +<span class="sourceLineNo">406</span> } finally {<a name="line.406"></a> +<span class="sourceLineNo">407</span> client.close();<a name="line.407"></a> +<span class="sourceLineNo">408</span> rpcServer.stop();<a name="line.408"></a> +<span class="sourceLineNo">409</span> }<a name="line.409"></a> +<span class="sourceLineNo">410</span> }<a name="line.410"></a> +<span class="sourceLineNo">411</span><a name="line.411"></a> +<span class="sourceLineNo">412</span> @Test<a name="line.412"></a> +<span class="sourceLineNo">413</span> public void testAsyncTimeout() throws IOException {<a name="line.413"></a> +<span class="sourceLineNo">414</span> RpcServer rpcServer = RpcServerFactory.createRpcServer(null,<a name="line.414"></a> +<span class="sourceLineNo">415</span> "testRpcServer", Lists.newArrayList(new BlockingServiceAndInterface(<a name="line.415"></a> +<span class="sourceLineNo">416</span> SERVICE, null)), new InetSocketAddress("localhost", 0), CONF,<a name="line.416"></a> +<span class="sourceLineNo">417</span> new FifoRpcScheduler(CONF, 1));<a name="line.417"></a> +<span class="sourceLineNo">418</span> try (AbstractRpcClient<?> client = createRpcClient(CONF)) {<a name="line.418"></a> +<span class="sourceLineNo">419</span> rpcServer.start();<a name="line.419"></a> +<span class="sourceLineNo">420</span> Interface stub = newStub(client, rpcServer.getListenerAddress());<a name="line.420"></a> +<span class="sourceLineNo">421</span> List<HBaseRpcController> pcrcList = new ArrayList<>();<a name="line.421"></a> +<span class="sourceLineNo">422</span> List<BlockingRpcCallback<EmptyResponseProto>> callbackList = new ArrayList<>();<a name="line.422"></a> +<span class="sourceLineNo">423</span> int ms = 1000;<a name="line.423"></a> +<span class="sourceLineNo">424</span> int timeout = 100;<a name="line.424"></a> +<span class="sourceLineNo">425</span> long startTime = System.nanoTime();<a name="line.425"></a> +<span class="sourceLineNo">426</span> for (int i = 0; i < 10; i++) {<a name="line.426"></a> +<span class="sourceLineNo">427</span> HBaseRpcController pcrc = new HBaseRpcControllerImpl();<a name="line.427"></a> +<span class="sourceLineNo">428</span> pcrc.setCallTimeout(timeout);<a name="line.428"></a> +<span class="sourceLineNo">429</span> BlockingRpcCallback<EmptyResponseProto> callback = new BlockingRpcCallback<>();<a name="line.429"></a> +<span class="sourceLineNo">430</span> stub.pause(pcrc, PauseRequestProto.newBuilder().setMs(ms).build(), callback);<a name="line.430"></a> +<span class="sourceLineNo">431</span> pcrcList.add(pcrc);<a name="line.431"></a> +<span class="sourceLineNo">432</span> callbackList.add(callback);<a name="line.432"></a> +<span class="sourceLineNo">433</span> }<a name="line.433"></a> +<span class="sourceLineNo">434</span> for (BlockingRpcCallback<?> callback : callbackList) {<a name="line.434"></a> +<span class="sourceLineNo">435</span> assertNull(callback.get());<a name="line.435"></a> +<span class="sourceLineNo">436</span> }<a name="line.436"></a> +<span class="sourceLineNo">437</span> long waitTime = (System.nanoTime() - startTime) / 1000000;<a name="line.437"></a> +<span class="sourceLineNo">438</span> for (HBaseRpcController pcrc : pcrcList) {<a name="line.438"></a> +<span class="sourceLineNo">439</span> assertTrue(pcrc.failed());<a name="line.439"></a> +<span class="sourceLineNo">440</span> LOG.info("Caught expected exception: " + pcrc.getFailed());<a name="line.440"></a> +<span class="sourceLineNo">441</span> IOException ioe = ProtobufUtil.handleRemoteException(pcrc.getFailed());<a name="line.441"></a> +<span class="sourceLineNo">442</span> assertTrue(ioe.getCause() instanceof CallTimeoutException);<a name="line.442"></a> +<span class="sourceLineNo">443</span> }<a name="line.443"></a> +<span class="sourceLineNo">444</span> // confirm that we got exception before the actual pause.<a name="line.444"></a> +<span class="sourceLineNo">445</span> assertTrue(waitTime < ms);<a name="line.445"></a> +<span class="sourceLineNo">446</span> } finally {<a name="line.446"></a> +<span class="sourceLineNo">447</span> rpcServer.stop();<a name="line.447"></a> +<span class="sourceLineNo">448</span> }<a name="line.448"></a> +<span class="sourceLineNo">449</span> }<a name="line.449"></a> +<span class="sourceLineNo">450</span>}<a name="line.450"></a>
