HBASE-13318 RpcServer.getListenerAddress should handle when the accept channel is closed
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/efb82957 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/efb82957 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/efb82957 Branch: refs/heads/hbase-12439 Commit: efb82957da09ab06f5c887b3d62ad055bbba089f Parents: 928dade Author: Andrew Purtell <apurt...@apache.org> Authored: Mon Oct 26 14:24:42 2015 -0700 Committer: Andrew Purtell <apurt...@apache.org> Committed: Mon Oct 26 15:11:12 2015 -0700 ---------------------------------------------------------------------- .../hbase/ipc/IntegrationTestRpcClient.java | 22 ++++++++++++++++---- .../org/apache/hadoop/hbase/ipc/CallRunner.java | 11 ++++++---- .../org/apache/hadoop/hbase/ipc/RpcServer.java | 17 ++++++++++----- .../hbase/regionserver/RSRpcServices.java | 6 +++++- .../hadoop/hbase/ipc/AbstractTestIPC.java | 22 +++++++++++++++----- .../apache/hadoop/hbase/ipc/TestAsyncIPC.java | 9 ++++++++ .../apache/hadoop/hbase/ipc/TestDelayedRpc.java | 21 +++++++++++++------ .../org/apache/hadoop/hbase/ipc/TestIPC.java | 5 ++++- .../hadoop/hbase/ipc/TestProtoBufRpc.java | 6 +++++- .../hbase/ipc/TestRpcHandlerException.java | 7 +++++-- .../TestRSKilledWhenInitializing.java | 8 +++++-- .../hadoop/hbase/security/TestSecureRPC.java | 9 +++++--- .../security/token/TestTokenAuthentication.java | 6 +++++- 13 files changed, 114 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/efb82957/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java ---------------------------------------------------------------------- diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java index 09de871..c28f3e6 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java @@ -168,9 +168,13 @@ public class IntegrationTestRpcClient { TestRpcServer rpcServer = new TestRpcServer(conf); rpcServer.start(); - rpcServers.put(rpcServer.getListenerAddress(), rpcServer); + InetSocketAddress address = rpcServer.getListenerAddress(); + if (address == null) { + throw new IOException("Listener channel is closed"); + } + rpcServers.put(address, rpcServer); serverList.add(rpcServer); - LOG.info("Started server: " + rpcServer.getListenerAddress()); + LOG.info("Started server: " + address); return rpcServer; } finally { lock.writeLock().unlock(); @@ -187,7 +191,13 @@ public class IntegrationTestRpcClient { int size = rpcServers.size(); int rand = random.nextInt(size); rpcServer = serverList.remove(rand); - rpcServers.remove(rpcServer.getListenerAddress()); + InetSocketAddress address = rpcServer.getListenerAddress(); + if (address == null) { + // Throw exception here. We can't remove this instance from the server map because + // we no longer have access to its map key + throw new IOException("Listener channel is closed"); + } + rpcServers.remove(address); if (rpcServer != null) { stopServer(rpcServer); @@ -305,8 +315,12 @@ public class IntegrationTestRpcClient { TestRpcServer server = cluster.getRandomServer(); try { User user = User.getCurrent(); + InetSocketAddress address = server.getListenerAddress(); + if (address == null) { + throw new IOException("Listener channel is closed"); + } ret = (EchoResponseProto) - rpcClient.callBlockingMethod(md, null, param, ret, user, server.getListenerAddress()); + rpcClient.callBlockingMethod(md, null, param, ret, user, address); } catch (Exception e) { LOG.warn(e); continue; // expected in case connection is closing or closed http://git-wip-us.apache.org/repos/asf/hbase/blob/efb82957/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java index 38b7c91..ede4b4e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java @@ -16,6 +16,7 @@ package org.apache.hadoop.hbase.ipc; * See the License for the specific language governing permissions and * limitations under the License. */ +import java.net.InetSocketAddress; import java.nio.channels.ClosedChannelException; import org.apache.commons.logging.Log; @@ -96,8 +97,9 @@ public class CallRunner { TraceScope traceScope = null; try { if (!this.rpcServer.isStarted()) { - throw new ServerNotRunningYetException("Server " + rpcServer.getListenerAddress() - + " is not running yet"); + InetSocketAddress address = rpcServer.getListenerAddress(); + throw new ServerNotRunningYetException("Server " + + (address != null ? address : "(channel closed)") + " is not running yet"); } if (call.tinfo != null) { traceScope = Trace.startSpan(call.toTraceString(), call.tinfo); @@ -143,9 +145,10 @@ public class CallRunner { throw e; } } catch (ClosedChannelException cce) { + InetSocketAddress address = rpcServer.getListenerAddress(); RpcServer.LOG.warn(Thread.currentThread().getName() + ": caught a ClosedChannelException, " + - "this means that the server " + rpcServer.getListenerAddress() + " was processing a " + - "request but the client went away. The error message was: " + + "this means that the server " + (address != null ? address : "(channel closed)") + + " was processing a request but the client went away. The error message was: " + cce.getMessage()); } catch (Exception e) { RpcServer.LOG.warn(Thread.currentThread().getName() http://git-wip-us.apache.org/repos/asf/hbase/blob/efb82957/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index 8c08635..c20e972 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -1813,8 +1813,9 @@ public class RpcServer implements RpcServerInterface { responder, totalRequestSize, null, null); ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream(); metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION); + InetSocketAddress address = getListenerAddress(); setupResponse(responseBuffer, callTooBig, CALL_QUEUE_TOO_BIG_EXCEPTION, - "Call queue is full on " + getListenerAddress() + + "Call queue is full on " + (address != null ? address : "(channel closed)") + ", is hbase.ipc.server.max.callqueue.size too small?"); responder.doRespond(callTooBig); return; @@ -1842,8 +1843,9 @@ public class RpcServer implements RpcServerInterface { buf, offset, buf.length); } } catch (Throwable t) { - String msg = getListenerAddress() + " is unable to read call parameter from client " + - getHostAddress(); + InetSocketAddress address = getListenerAddress(); + String msg = (address != null ? address : "(channel closed)") + + " is unable to read call parameter from client " + getHostAddress(); LOG.warn(msg, t); metrics.exception(t); @@ -2266,11 +2268,16 @@ public class RpcServer implements RpcServerInterface { } /** - * Return the socket (ip+port) on which the RPC server is listening to. - * @return the socket (ip+port) on which the RPC server is listening to. + * Return the socket (ip+port) on which the RPC server is listening to. May return null if + * the listener channel is closed. + * @return the socket (ip+port) on which the RPC server is listening to, or null if this + * information cannot be determined */ @Override public synchronized InetSocketAddress getListenerAddress() { + if (listener == null) { + return null; + } return listener.getAddress(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/efb82957/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 38288ef..28bf069 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -972,8 +972,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler, REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA, DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA); + InetSocketAddress address = rpcServer.getListenerAddress(); + if (address == null) { + throw new IOException("Listener channel is closed"); + } // Set our address, however we need the final port that was given to rpcServer - isa = new InetSocketAddress(initialIsa.getHostName(), rpcServer.getListenerAddress().getPort()); + isa = new InetSocketAddress(initialIsa.getHostName(), address.getPort()); rpcServer.setErrorHandler(this); rs.setName(name); } http://git-wip-us.apache.org/repos/asf/hbase/blob/efb82957/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java index d427419..dffd8e9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java @@ -159,10 +159,13 @@ public abstract class AbstractTestIPC { TestRpcServer rpcServer = new TestRpcServer(); try { rpcServer.start(); - InetSocketAddress address = rpcServer.getListenerAddress(); MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); final String message = "hello"; EchoRequestProto param = EchoRequestProto.newBuilder().setMessage(message).build(); + InetSocketAddress address = rpcServer.getListenerAddress(); + if (address == null) { + throw new IOException("Listener channel is closed"); + } Pair<Message, CellScanner> r = client.call(null, md, param, md.getOutputType().toProto(), User.getCurrent(), address, new MetricsConnection.CallStats()); @@ -200,12 +203,14 @@ public abstract class AbstractTestIPC { TestRpcServer rpcServer = new TestRpcServer(); try { rpcServer.start(); - InetSocketAddress address = rpcServer.getListenerAddress(); MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); - PayloadCarryingRpcController pcrc = new PayloadCarryingRpcController(CellUtil.createCellScanner(cells)); + InetSocketAddress address = rpcServer.getListenerAddress(); + if (address == null) { + throw new IOException("Listener channel is closed"); + } Pair<Message, CellScanner> r = client.call(pcrc, md, param, md.getOutputType().toProto(), User.getCurrent(), address, new MetricsConnection.CallStats()); @@ -231,9 +236,12 @@ public abstract class AbstractTestIPC { AbstractRpcClient client = createRpcClientRTEDuringConnectionSetup(conf); try { rpcServer.start(); - InetSocketAddress address = rpcServer.getListenerAddress(); MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); + InetSocketAddress address = rpcServer.getListenerAddress(); + if (address == null) { + throw new IOException("Listener channel is closed"); + } client.call(null, md, param, null, User.getCurrent(), address, new MetricsConnection.CallStats()); fail("Expected an exception to have been thrown!"); @@ -258,10 +266,14 @@ public abstract class AbstractTestIPC { verify(scheduler).start(); MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); + InetSocketAddress address = rpcServer.getListenerAddress(); + if (address == null) { + throw new IOException("Listener channel is closed"); + } for (int i = 0; i < 10; i++) { client.call(new PayloadCarryingRpcController( CellUtil.createCellScanner(ImmutableList.<Cell> of(CELL))), md, param, - md.getOutputType().toProto(), User.getCurrent(), rpcServer.getListenerAddress(), + md.getOutputType().toProto(), User.getCurrent(), address, new MetricsConnection.CallStats()); } verify(scheduler, times(10)).dispatch((CallRunner) anyObject()); http://git-wip-us.apache.org/repos/asf/hbase/blob/efb82957/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java index d761ae9..b9d390a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java @@ -157,6 +157,9 @@ public class TestAsyncIPC extends AbstractTestIPC { try { rpcServer.start(); InetSocketAddress address = rpcServer.getListenerAddress(); + if (address == null) { + throw new IOException("Listener channel is closed"); + } MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); @@ -193,6 +196,9 @@ public class TestAsyncIPC extends AbstractTestIPC { try { rpcServer.start(); InetSocketAddress address = rpcServer.getListenerAddress(); + if (address == null) { + throw new IOException("Listener channel is closed"); + } MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); @@ -258,6 +264,9 @@ public class TestAsyncIPC extends AbstractTestIPC { try { rpcServer.start(); InetSocketAddress address = rpcServer.getListenerAddress(); + if (address == null) { + throw new IOException("Listener channel is closed"); + } long startTime = System.currentTimeMillis(); User user = User.getCurrent(); for (int i = 0; i < cycles; i++) { http://git-wip-us.apache.org/repos/asf/hbase/blob/efb82957/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java index 961001f..d379722 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java @@ -92,9 +92,12 @@ public class TestDelayedRpc { RpcClient rpcClient = RpcClientFactory.createClient( conf, HConstants.DEFAULT_CLUSTER_ID.toString()); try { + InetSocketAddress address = rpcServer.getListenerAddress(); + if (address == null) { + throw new IOException("Listener channel is closed"); + } BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel( - ServerName.valueOf(rpcServer.getListenerAddress().getHostName(), - rpcServer.getListenerAddress().getPort(), System.currentTimeMillis()), + ServerName.valueOf(address.getHostName(),address.getPort(), System.currentTimeMillis()), User.getCurrent(), RPC_CLIENT_TIMEOUT); TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub = TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel); @@ -174,9 +177,12 @@ public class TestDelayedRpc { RpcClient rpcClient = RpcClientFactory.createClient( conf, HConstants.DEFAULT_CLUSTER_ID.toString()); try { + InetSocketAddress address = rpcServer.getListenerAddress(); + if (address == null) { + throw new IOException("Listener channel is closed"); + } BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel( - ServerName.valueOf(rpcServer.getListenerAddress().getHostName(), - rpcServer.getListenerAddress().getPort(), System.currentTimeMillis()), + ServerName.valueOf(address.getHostName(),address.getPort(), System.currentTimeMillis()), User.getCurrent(), RPC_CLIENT_TIMEOUT); TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub = TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel); @@ -298,9 +304,12 @@ public class TestDelayedRpc { RpcClient rpcClient = RpcClientFactory.createClient( conf, HConstants.DEFAULT_CLUSTER_ID.toString()); try { + InetSocketAddress address = rpcServer.getListenerAddress(); + if (address == null) { + throw new IOException("Listener channel is closed"); + } BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel( - ServerName.valueOf(rpcServer.getListenerAddress().getHostName(), - rpcServer.getListenerAddress().getPort(), System.currentTimeMillis()), + ServerName.valueOf(address.getHostName(),address.getPort(), System.currentTimeMillis()), User.getCurrent(), 1000); TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub = TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel); http://git-wip-us.apache.org/repos/asf/hbase/blob/efb82957/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java index d1b8202..3fc1259 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java @@ -122,9 +122,12 @@ public class TestIPC extends AbstractTestIPC { rm.add(p); try { rpcServer.start(); - InetSocketAddress address = rpcServer.getListenerAddress(); long startTime = System.currentTimeMillis(); User user = User.getCurrent(); + InetSocketAddress address = rpcServer.getListenerAddress(); + if (address == null) { + throw new IOException("Listener channel is closed"); + } for (int i = 0; i < cycles; i++) { List<CellScannable> cells = new ArrayList<CellScannable>(); // Message param = RequestConverter.buildMultiRequest(HConstants.EMPTY_BYTE_ARRAY, rm); http://git-wip-us.apache.org/repos/asf/hbase/blob/efb82957/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java index cee459f..81869b4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java @@ -101,7 +101,11 @@ public class TestProtoBufRpc { Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(service, null)), new InetSocketAddress(ADDRESS, PORT), conf, new FifoRpcScheduler(conf, 10)); - this.isa = server.getListenerAddress(); + InetSocketAddress address = server.getListenerAddress(); + if (address == null) { + throw new IOException("Listener channel is closed"); + } + this.isa = address; this.server.start(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/efb82957/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java index a4e55d9..a37ba11 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java @@ -180,9 +180,12 @@ public class TestRpcHandlerException { EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); PayloadCarryingRpcController controller = new PayloadCarryingRpcController(CellUtil.createCellScanner(ImmutableList.of(CELL))); - + InetSocketAddress address = rpcServer.getListenerAddress(); + if (address == null) { + throw new IOException("Listener channel is closed"); + } client.call(controller, md, param, md.getOutputType().toProto(), User.getCurrent(), - rpcServer.getListenerAddress(), new MetricsConnection.CallStats()); + address, new MetricsConnection.CallStats()); } catch (Throwable e) { assert(abortable.isAborted() == true); } finally { http://git-wip-us.apache.org/repos/asf/hbase/blob/efb82957/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRSKilledWhenInitializing.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRSKilledWhenInitializing.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRSKilledWhenInitializing.java index 97e69b7..a3ac177 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRSKilledWhenInitializing.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRSKilledWhenInitializing.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.regionserver; import static org.junit.Assert.assertEquals; import java.io.IOException; +import java.net.InetSocketAddress; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; @@ -115,13 +116,16 @@ public class TestRSKilledWhenInitializing { @Override protected void handleReportForDutyResponse(RegionServerStartupResponse c) throws IOException { if (firstRS.getAndSet(false)) { + InetSocketAddress address = super.getRpcServer().getListenerAddress(); + if (address == null) { + throw new IOException("Listener channel is closed"); + } for (NameStringPair e : c.getMapEntriesList()) { String key = e.getName(); // The hostname the master sees us as. if (key.equals(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)) { String hostnameFromMasterPOV = e.getValue(); - assertEquals(super.getRpcServer().getListenerAddress().getHostName(), - hostnameFromMasterPOV); + assertEquals(address.getHostName(), hostnameFromMasterPOV); } } while (!masterActive) { http://git-wip-us.apache.org/repos/asf/hbase/blob/efb82957/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureRPC.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureRPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureRPC.java index 8eff063..66b8c75 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureRPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureRPC.java @@ -143,11 +143,14 @@ public class TestSecureRPC { RpcClient rpcClient = RpcClientFactory.createClient(conf, HConstants.DEFAULT_CLUSTER_ID.toString()); try { + InetSocketAddress address = rpcServer.getListenerAddress(); + if (address == null) { + throw new IOException("Listener channel is closed"); + } BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel( - ServerName.valueOf(rpcServer.getListenerAddress().getHostName(), rpcServer - .getListenerAddress().getPort(), System.currentTimeMillis()), User.getCurrent(), - 5000); + ServerName.valueOf(address.getHostName(), address.getPort(), + System.currentTimeMillis()), User.getCurrent(), 5000); TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub = TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel); List<Integer> results = new ArrayList<Integer>(); http://git-wip-us.apache.org/repos/asf/hbase/blob/efb82957/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java index c83e502..69c6e63 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java @@ -141,7 +141,11 @@ public class TestTokenAuthentication { AuthenticationProtos.AuthenticationService.BlockingInterface.class)); this.rpcServer = new RpcServer(this, "tokenServer", sai, initialIsa, conf, new FifoRpcScheduler(conf, 1)); - this.isa = this.rpcServer.getListenerAddress(); + InetSocketAddress address = rpcServer.getListenerAddress(); + if (address == null) { + throw new IOException("Listener channel is closed"); + } + this.isa = address; this.sleeper = new Sleeper(1000, this); }