Repository: hbase
Updated Branches:
  refs/heads/master c536c8511 -> 45af3831f


http://git-wip-us.apache.org/repos/asf/hbase/blob/45af3831/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java
index 7efe198..565f5bf 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java
@@ -17,8 +17,6 @@
  */
 package org.apache.hadoop.hbase.ipc;
 
-import com.google.protobuf.ByteString;
-import com.google.protobuf.Descriptors.MethodDescriptor;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOutboundHandlerAdapter;
@@ -27,33 +25,12 @@ import io.netty.channel.epoll.EpollEventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
 
-import java.io.IOException;
-import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.CellScannable;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.MetricsConnection;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.RowMutations;
 import org.apache.hadoop.hbase.codec.Codec;
-import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
-import org.apache.hadoop.hbase.protobuf.RequestConverter;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
-import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
-import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
-import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.testclassification.RPCTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.junit.experimental.categories.Category;
@@ -65,8 +42,6 @@ import org.junit.runners.Parameterized.Parameters;
 @Category({ RPCTests.class, SmallTests.class })
 public class TestAsyncIPC extends AbstractTestIPC {
 
-  private static final Log LOG = LogFactory.getLog(TestAsyncIPC.class);
-
   @Parameters
   public static Collection<Object[]> parameters() {
     List<Object[]> paramList = new ArrayList<>();
@@ -92,8 +67,8 @@ public class TestAsyncIPC extends AbstractTestIPC {
     if (useGlobalEventLoopGroup && AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP != null) {
       if (useNativeTransport
           && !(AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP.getFirst() instanceof EpollEventLoopGroup)
-          || (!useNativeTransport
-          && !(AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP.getFirst() instanceof NioEventLoopGroup))) {
+          || (!useNativeTransport && !(AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP
+              .getFirst() instanceof NioEventLoopGroup))) {
         AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP.getFirst().shutdownGracefully();
         AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP = null;
       }
@@ -123,80 +98,16 @@ public class TestAsyncIPC extends AbstractTestIPC {
   protected AsyncRpcClient createRpcClientRTEDuringConnectionSetup(Configuration conf) {
     setConf(conf);
     return new AsyncRpcClient(conf, new ChannelInitializer<SocketChannel>() {
+      @Override
+      protected void initChannel(SocketChannel ch) throws Exception {
+        ch.pipeline().addFirst(new ChannelOutboundHandlerAdapter() {
           @Override
-          protected void initChannel(SocketChannel ch) throws Exception {
-            ch.pipeline().addFirst(new ChannelOutboundHandlerAdapter() {
-              @Override
-              public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
-                  throws Exception {
-                promise.setFailure(new RuntimeException("Injected fault"));
-              }
-            });
+          public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
+              throws Exception {
+            promise.setFailure(new RuntimeException("Injected fault"));
           }
         });
-  }
-
-  public static void main(String[] args) throws IOException, SecurityException,
-      NoSuchMethodException, InterruptedException {
-    if (args.length != 2) {
-      System.out.println("Usage: TestAsyncIPC <CYCLES> <CELLS_PER_CYCLE>");
-      return;
-    }
-    // ((Log4JLogger)HBaseServer.LOG).getLogger().setLevel(Level.INFO);
-    // ((Log4JLogger)HBaseClient.LOG).getLogger().setLevel(Level.INFO);
-    int cycles = Integer.parseInt(args[0]);
-    int cellcount = Integer.parseInt(args[1]);
-    Configuration conf = HBaseConfiguration.create();
-    TestRpcServer rpcServer = new TestRpcServer();
-    MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
-    EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
-    KeyValue kv = BIG_CELL;
-    Put p = new Put(CellUtil.cloneRow(kv));
-    for (int i = 0; i < cellcount; i++) {
-      p.add(kv);
-    }
-    RowMutations rm = new RowMutations(CellUtil.cloneRow(kv));
-    rm.add(p);
-    try (AsyncRpcClient client = new AsyncRpcClient(conf)) {
-      rpcServer.start();
-      InetSocketAddress address = rpcServer.getListenerAddress();
-      if (address == null) {
-        throw new IOException("Listener channel is closed");
       }
-      long startTime = System.currentTimeMillis();
-      User user = User.getCurrent();
-      for (int i = 0; i < cycles; i++) {
-        List<CellScannable> cells = new ArrayList<>();
-        // Message param = RequestConverter.buildMultiRequest(HConstants.EMPTY_BYTE_ARRAY, rm);
-        ClientProtos.RegionAction.Builder builder =
-            RequestConverter.buildNoDataRegionAction(HConstants.EMPTY_BYTE_ARRAY, rm, cells,
-                RegionAction.newBuilder(), ClientProtos.Action.newBuilder(),
-                MutationProto.newBuilder());
-        builder.setRegion(RegionSpecifier
-            .newBuilder()
-            .setType(RegionSpecifierType.REGION_NAME)
-            .setValue(
-                ByteString.copyFrom(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes())));
-        if (i % 100000 == 0) {
-          LOG.info("" + i);
-          // Uncomment this for a thread dump every so often.
-          // ReflectionUtils.printThreadInfo(new PrintWriter(System.out),
-          // "Thread dump " + Thread.currentThread().getName());
-        }
-        PayloadCarryingRpcController pcrc =
-            new PayloadCarryingRpcController(CellUtil.createCellScanner(cells));
-        // Pair<Message, CellScanner> response =
-        client.call(pcrc, md, builder.build(), param, user, address,
-            new MetricsConnection.CallStats());
-        /*
-         * int count = 0; while (p.getSecond().advance()) { count++; } assertEquals(cells.size(),
-         * count);
-         */
-      }
-      LOG.info("Cycled " + cycles + " time(s) with " + cellcount + " cell(s) in "
-          + (System.currentTimeMillis() - startTime) + "ms");
-    } finally {
-      rpcServer.stop();
-    }
+    });
   }
 }
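
The reindented block above is the usual Netty trick for making connection setup fail deterministically: the first handler in the pipeline fails every outbound write. A self-contained sketch of the same pattern, assuming Netty 4 on the classpath (the class name FailingWriteInitializer is illustrative, not part of this patch):

    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOutboundHandlerAdapter;
    import io.netty.channel.ChannelPromise;
    import io.netty.channel.socket.SocketChannel;

    // Illustrative only: every outbound write is failed immediately, so the
    // client observes a RuntimeException while the connection is being set up.
    class FailingWriteInitializer extends ChannelInitializer<SocketChannel> {
      @Override
      protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addFirst(new ChannelOutboundHandlerAdapter() {
          @Override
          public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
              throws Exception {
            promise.setFailure(new RuntimeException("Injected fault"));
          }
        });
      }
    }

Passing such an initializer to the AsyncRpcClient constructor, as createRpcClientRTEDuringConnectionSetup() does above, is what drives the connection-setup failure test.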

http://git-wip-us.apache.org/repos/asf/hbase/blob/45af3831/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java
index 56de07d..b88cb7a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java
@@ -22,37 +22,14 @@ import static org.mockito.Matchers.anyInt;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.spy;
 
-import com.google.protobuf.ByteString;
-import com.google.protobuf.Descriptors.MethodDescriptor;
 import java.io.IOException;
-import java.net.InetSocketAddress;
 import java.net.Socket;
-import java.util.ArrayList;
-import java.util.List;
 
 import javax.net.SocketFactory;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.CellScannable;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.MetricsConnection;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.RowMutations;
 import org.apache.hadoop.hbase.codec.Codec;
-import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
-import org.apache.hadoop.hbase.protobuf.RequestConverter;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
-import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
-import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
-import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.testclassification.RPCTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.net.NetUtils;
@@ -64,8 +41,6 @@ import org.mockito.stubbing.Answer;
 @Category({ RPCTests.class, SmallTests.class })
 public class TestIPC extends AbstractTestIPC {
 
-  private static final Log LOG = LogFactory.getLog(TestIPC.class);
-
   @Override
   protected RpcClientImpl createRpcClientNoCodec(Configuration conf) {
     return new RpcClientImpl(conf, HConstants.CLUSTER_ID_DEFAULT) {
@@ -96,71 +71,4 @@ public class TestIPC extends AbstractTestIPC {
 
     return new RpcClientImpl(conf, HConstants.CLUSTER_ID_DEFAULT, spyFactory);
   }
-
-  public static void main(String[] args) throws IOException, SecurityException,
-      NoSuchMethodException, InterruptedException {
-    if (args.length != 2) {
-      System.out.println("Usage: TestIPC <CYCLES> <CELLS_PER_CYCLE>");
-      return;
-    }
-    // ((Log4JLogger)HBaseServer.LOG).getLogger().setLevel(Level.INFO);
-    // ((Log4JLogger)HBaseClient.LOG).getLogger().setLevel(Level.INFO);
-    int cycles = Integer.parseInt(args[0]);
-    int cellcount = Integer.parseInt(args[1]);
-    Configuration conf = HBaseConfiguration.create();
-    TestRpcServer rpcServer = new TestRpcServer();
-    MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
-    EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
-    RpcClientImpl client = new RpcClientImpl(conf, HConstants.CLUSTER_ID_DEFAULT);
-    KeyValue kv = BIG_CELL;
-    Put p = new Put(CellUtil.cloneRow(kv));
-    for (int i = 0; i < cellcount; i++) {
-      p.add(kv);
-    }
-    RowMutations rm = new RowMutations(CellUtil.cloneRow(kv));
-    rm.add(p);
-    try {
-      rpcServer.start();
-      long startTime = System.currentTimeMillis();
-      User user = User.getCurrent();
-      InetSocketAddress address = rpcServer.getListenerAddress();
-      if (address == null) {
-        throw new IOException("Listener channel is closed");
-      }
-      for (int i = 0; i < cycles; i++) {
-        List<CellScannable> cells = new ArrayList<>();
-        // Message param = RequestConverter.buildMultiRequest(HConstants.EMPTY_BYTE_ARRAY, rm);
-        ClientProtos.RegionAction.Builder builder =
-            RequestConverter.buildNoDataRegionAction(HConstants.EMPTY_BYTE_ARRAY, rm, cells,
-              RegionAction.newBuilder(), ClientProtos.Action.newBuilder(),
-              MutationProto.newBuilder());
-        builder.setRegion(RegionSpecifier
-            .newBuilder()
-            .setType(RegionSpecifierType.REGION_NAME)
-            .setValue(
-              ByteString.copyFrom(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes())));
-        if (i % 100000 == 0) {
-          LOG.info("" + i);
-          // Uncomment this for a thread dump every so often.
-          // ReflectionUtils.printThreadInfo(new PrintWriter(System.out),
-          // "Thread dump " + Thread.currentThread().getName());
-        }
-        PayloadCarryingRpcController pcrc =
-            new PayloadCarryingRpcController(CellUtil.createCellScanner(cells));
-        // Pair<Message, CellScanner> response =
-        client.call(pcrc, md, builder.build(), param, user, address,
-            new MetricsConnection.CallStats());
-        /*
-         * int count = 0; while (p.getSecond().advance()) { count++; } assertEquals(cells.size(),
-         * count);
-         */
-      }
-      LOG.info("Cycled " + cycles + " time(s) with " + cellcount + " cell(s) in "
-          + (System.currentTimeMillis() - startTime) + "ms");
-    } finally {
-      client.close();
-      rpcServer.stop();
-    }
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/45af3831/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java
index 81869b4..dcde844 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java
@@ -17,42 +17,39 @@
  */
 package org.apache.hadoop.hbase.ipc;
 
+import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
+import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import com.google.common.collect.Lists;
+import com.google.protobuf.ServiceException;
+
 import java.io.IOException;
 import java.net.InetSocketAddress;
 
-import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.testclassification.RPCTests;
-import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos;
-import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto;
-import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
-import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto;
-import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RPCTests;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.Before;
 import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import com.google.protobuf.BlockingRpcChannel;
-import com.google.protobuf.BlockingService;
-import com.google.protobuf.RpcController;
-import com.google.protobuf.ServiceException;
-
 /**
- * Test for testing protocol buffer based RPC mechanism.
- * This test depends on test.proto definition of types in <code>src/test/protobuf/test.proto</code>
- * and protobuf service definition from <code>src/test/protobuf/test_rpc_service.proto</code>
+ * Test for testing protocol buffer based RPC mechanism. This test depends on test.proto definition
+ * of types in <code>src/test/protobuf/test.proto</code> and protobuf service definition from
+ * <code>src/test/protobuf/test_rpc_service.proto</code>
  */
-@Category({RPCTests.class, MediumTests.class})
+@Category({ RPCTests.class, MediumTests.class })
 public class TestProtoBufRpc {
   public final static String ADDRESS = "localhost";
   public static int PORT = 0;
@@ -60,47 +57,18 @@ public class TestProtoBufRpc {
   private Configuration conf;
   private RpcServerInterface server;
 
-  /**
-   * Implementation of the test service defined out in TestRpcServiceProtos
-   */
-  static class PBServerImpl
-  implements TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface {
-    @Override
-    public EmptyResponseProto ping(RpcController unused,
-        EmptyRequestProto request) throws ServiceException {
-      return EmptyResponseProto.newBuilder().build();
-    }
-
-    @Override
-    public EchoResponseProto echo(RpcController unused, EchoRequestProto request)
-        throws ServiceException {
-      return EchoResponseProto.newBuilder().setMessage(request.getMessage())
-          .build();
-    }
-
-    @Override
-    public EmptyResponseProto error(RpcController unused,
-        EmptyRequestProto request) throws ServiceException {
-      throw new ServiceException("error", new IOException("error"));
-    }
-  }
-
   @Before
-  public  void setUp() throws IOException { // Setup server for both protocols
+  public void setUp() throws IOException { // Setup server for both protocols
     this.conf = HBaseConfiguration.create();
     Logger log = Logger.getLogger("org.apache.hadoop.ipc.HBaseServer");
     log.setLevel(Level.DEBUG);
     log = Logger.getLogger("org.apache.hadoop.ipc.HBaseServer.trace");
     log.setLevel(Level.TRACE);
     // Create server side implementation
-    PBServerImpl serverImpl = new PBServerImpl();
-    BlockingService service =
-      TestRpcServiceProtos.TestProtobufRpcProto.newReflectiveBlockingService(serverImpl);
     // Get RPC server for server side implementation
     this.server = new RpcServer(null, "testrpc",
-        Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(service, null)),
-        new InetSocketAddress(ADDRESS, PORT), conf,
-        new FifoRpcScheduler(conf, 10));
+        Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
+        new InetSocketAddress(ADDRESS, PORT), conf, new FifoRpcScheduler(conf, 10));
     InetSocketAddress address = server.getListenerAddress();
     if (address == null) {
       throw new IOException("Listener channel is closed");
@@ -118,25 +86,20 @@ public class TestProtoBufRpc {
   public void testProtoBufRpc() throws Exception {
     RpcClient rpcClient = RpcClientFactory.createClient(conf, HConstants.CLUSTER_ID_DEFAULT);
     try {
-      BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(
-          ServerName.valueOf(this.isa.getHostName(), this.isa.getPort(), System.currentTimeMillis()),
-        User.getCurrent(), 0);
-      TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub =
-        TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(channel);
+      BlockingInterface stub = newBlockingStub(rpcClient, this.isa);
       // Test ping method
-      TestProtos.EmptyRequestProto emptyRequest =
-        TestProtos.EmptyRequestProto.newBuilder().build();
+      TestProtos.EmptyRequestProto emptyRequest = TestProtos.EmptyRequestProto.newBuilder().build();
       stub.ping(null, emptyRequest);
 
       // Test echo method
       EchoRequestProto echoRequest = EchoRequestProto.newBuilder().setMessage("hello").build();
       EchoResponseProto echoResponse = stub.echo(null, echoRequest);
-      Assert.assertEquals(echoResponse.getMessage(), "hello");
+      assertEquals(echoResponse.getMessage(), "hello");
 
       // Test error method - error should be thrown as RemoteException
       try {
         stub.error(null, emptyRequest);
-        Assert.fail("Expected exception is not thrown");
+        fail("Expected exception is not thrown");
       } catch (ServiceException e) {
       }
     } finally {

http://git-wip-us.apache.org/repos/asf/hbase/blob/45af3831/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtobufRpcServiceImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtobufRpcServiceImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtobufRpcServiceImpl.java
new file mode 100644
index 0000000..ce7521e
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtobufRpcServiceImpl.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import com.google.protobuf.BlockingService;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.AddrResponseProto;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.PauseRequestProto;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.Interface;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.Threads;
+
[email protected]
+public class TestProtobufRpcServiceImpl implements BlockingInterface {
+
+  public static final BlockingService SERVICE = TestProtobufRpcProto
+      .newReflectiveBlockingService(new TestProtobufRpcServiceImpl());
+
+  public static BlockingInterface newBlockingStub(RpcClient client, InetSocketAddress addr)
+      throws IOException {
+    return newBlockingStub(client, addr, User.getCurrent());
+  }
+
+  public static BlockingInterface newBlockingStub(RpcClient client, InetSocketAddress addr,
+      User user) throws IOException {
+    return TestProtobufRpcProto.newBlockingStub(client.createBlockingRpcChannel(
+      ServerName.valueOf(addr.getHostName(), addr.getPort(), System.currentTimeMillis()), user, 0));
+  }
+
+  public static Interface newStub(RpcClient client, InetSocketAddress addr) throws IOException {
+    return TestProtobufRpcProto.newStub(client.createProtobufRpcChannel(
+      ServerName.valueOf(addr.getHostName(), addr.getPort(), System.currentTimeMillis()),
+      User.getCurrent(), 0));
+  }
+
+  @Override
+  public EmptyResponseProto ping(RpcController controller, EmptyRequestProto request)
+      throws ServiceException {
+    return EmptyResponseProto.getDefaultInstance();
+  }
+
+  @Override
+  public EchoResponseProto echo(RpcController controller, EchoRequestProto request)
+      throws ServiceException {
+    if (controller instanceof PayloadCarryingRpcController) {
+      PayloadCarryingRpcController pcrc = (PayloadCarryingRpcController) controller;
+      // If cells, scan them to check we are able to iterate what we were given and since this is an
+      // echo, just put them back on the controller creating a new block. Tests our block building.
+      CellScanner cellScanner = pcrc.cellScanner();
+      List<Cell> list = null;
+      if (cellScanner != null) {
+        list = new ArrayList<>();
+        try {
+          while (cellScanner.advance()) {
+            list.add(cellScanner.current());
+          }
+        } catch (IOException e) {
+          throw new ServiceException(e);
+        }
+      }
+      cellScanner = CellUtil.createCellScanner(list);
+      ((PayloadCarryingRpcController) controller).setCellScanner(cellScanner);
+    }
+    return EchoResponseProto.newBuilder().setMessage(request.getMessage()).build();
+  }
+
+  @Override
+  public EmptyResponseProto error(RpcController controller, EmptyRequestProto request)
+      throws ServiceException {
+    throw new ServiceException(new DoNotRetryIOException("server error!"));
+  }
+
+  @Override
+  public EmptyResponseProto pause(RpcController controller, PauseRequestProto request)
+      throws ServiceException {
+    Threads.sleepWithoutInterrupt(request.getMs());
+    return EmptyResponseProto.getDefaultInstance();
+  }
+
+  @Override
+  public AddrResponseProto addr(RpcController controller, EmptyRequestProto request)
+      throws ServiceException {
+    return AddrResponseProto.newBuilder().setAddr(RpcServer.getRemoteAddress().getHostAddress())
+        .build();
+  }
+
+}
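
The comment inside echo() above describes the interesting behaviour: any cells attached to a PayloadCarryingRpcController are drained and then re-attached, so a caller can round-trip a cell block through the service. A rough client-side sketch of that round trip, assuming a stub obtained via newBlockingStub(), a test method that declares throws Exception, and hypothetical byte[] test data (row, family, qualifier, value); whether the echoed cells surface on the same controller instance depends on the client-side channel plumbing, much like the low-level client.call() usage removed from TestIPC/TestAsyncIPC above:

    List<Cell> cells = new ArrayList<>();
    cells.add(new KeyValue(row, family, qualifier, value)); // hypothetical test data
    PayloadCarryingRpcController pcrc =
        new PayloadCarryingRpcController(CellUtil.createCellScanner(cells));
    stub.echo(pcrc, EchoRequestProto.newBuilder().setMessage("hello").build());
    // echo() re-attaches the cells, so the same number should come back off the controller.
    int count = 0;
    CellScanner returned = pcrc.cellScanner();
    while (returned.advance()) {
      count++;
    }
    assertEquals(cells.size(), count);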

http://git-wip-us.apache.org/repos/asf/hbase/blob/45af3831/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java
index a37ba11..749009f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java
@@ -17,108 +17,31 @@
  */
 package org.apache.hadoop.hbase.ipc;
 
-import com.google.common.collect.ImmutableList;
+import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
+import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub;
+import static org.mockito.Mockito.mock;
+
 import com.google.common.collect.Lists;
-import com.google.protobuf.BlockingService;
-import com.google.protobuf.Descriptors.MethodDescriptor;
-import com.google.protobuf.Message;
-import com.google.protobuf.RpcController;
-import com.google.protobuf.ServiceException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellScanner;
-import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.MetricsConnection;
 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
-import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto;
-import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
-import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto;
-import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
-import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
-import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
 import org.apache.hadoop.hbase.testclassification.RPCTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.apache.hadoop.hbase.util.Pair;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.mockito.Mockito;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.mockito.Mockito.mock;
 
-@Category({RPCTests.class, SmallTests.class})
+@Category({ RPCTests.class, SmallTests.class })
 public class TestRpcHandlerException {
-  private static final Log LOG = LogFactory.getLog(TestRpcHandlerException.class);
-  static String example = "xyz";
-  static byte[] CELL_BYTES = example.getBytes();
-  static Cell CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, CELL_BYTES);
 
   private final static Configuration CONF = HBaseConfiguration.create();
-  RpcExecutor rpcExecutor = Mockito.mock(RpcExecutor.class);
-
-  // We are using the test TestRpcServiceProtos generated classes and Service because they are
-  // available and basic with methods like 'echo', and ping. Below we make a blocking service
-  // by passing in implementation of blocking interface. We use this service in all tests that
-  // follow.
-  private static final BlockingService SERVICE =
-      TestRpcServiceProtos.TestProtobufRpcProto
-      .newReflectiveBlockingService(new TestRpcServiceProtos
-                 .TestProtobufRpcProto.BlockingInterface() {
-
-        @Override
-        public EmptyResponseProto ping(RpcController controller, EmptyRequestProto request)
-            throws ServiceException {
-          return null;
-        }
-
-        @Override
-        public EmptyResponseProto error(RpcController controller, EmptyRequestProto request)
-            throws ServiceException {
-          return null;
-        }
-
-        @Override
-        public EchoResponseProto echo(RpcController controller, EchoRequestProto request)
-            throws Error, RuntimeException {
-          if (controller instanceof PayloadCarryingRpcController) {
-            PayloadCarryingRpcController pcrc = (PayloadCarryingRpcController) controller;
-            // If cells, scan them to check we are able to iterate what we were given and since
-            // this is
-            // an echo, just put them back on the controller creating a new block. Tests our
-            // block
-            // building.
-            CellScanner cellScanner = pcrc.cellScanner();
-            List<Cell> list = null;
-            if (cellScanner != null) {
-               list = new ArrayList<Cell>();
-               try {
-                       while (cellScanner.advance()) {
-                               list.add(cellScanner.current());
-                               throw new StackOverflowError();
-                       }
-               } catch (StackOverflowError e) {
-                       throw e;
-               } catch (IOException e) {
-                       throw new RuntimeException(e);
-               }
-            }
-            cellScanner = CellUtil.createCellScanner(list);
-            ((PayloadCarryingRpcController) controller).setCellScanner(cellScanner);
-          }
-          return EchoResponseProto.newBuilder().setMessage(request.getMessage()).build();
-        }
-      });
 
   /**
    * Instance of server. We actually don't do anything speical in here so could just use
@@ -126,29 +49,18 @@ public class TestRpcHandlerException {
    */
   private static class TestRpcServer extends RpcServer {
 
-    TestRpcServer() throws IOException {
-      this(new FifoRpcScheduler(CONF, 1));
-    }
-
     TestRpcServer(RpcScheduler scheduler) throws IOException {
       super(null, "testRpcServer",
-                 Lists.newArrayList(new BlockingServiceAndInterface(SERVICE, null)),
-                 new InetSocketAddress("localhost", 0), CONF, scheduler);
-    }
-
-    @Override
-    public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
-      Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status)
-          throws IOException {
-      return super.call(service, md, param, cellScanner, receiveTime, status);
+          Lists.newArrayList(new BlockingServiceAndInterface(SERVICE, null)),
+          new InetSocketAddress("localhost", 0), CONF, scheduler);
     }
   }
 
-  /** Tests that the rpc scheduler is called when requests arrive.
-   *  When Rpc handler thread dies, the client will hang and the test will fail.
-   *  The test is meant to be a unit test to test the behavior.
-   *
-   * */
+  /**
+   * Tests that the rpc scheduler is called when requests arrive. When Rpc handler thread dies, the
+   * client will hang and the test will fail. The test is meant to be a unit test to test the
+   * behavior.
+   */
   private class AbortServer implements Abortable {
     private boolean aborted = false;
 
@@ -163,7 +75,8 @@ public class TestRpcHandlerException {
     }
   }
 
-  /* This is a unit test to make sure to abort region server when the number of Rpc handler thread
+  /*
+   * This is a unit test to make sure to abort region server when the number of Rpc handler thread
    * caught errors exceeds the threshold. Client will hang when RS aborts.
    */
   @Ignore
@@ -173,21 +86,12 @@ public class TestRpcHandlerException {
     Abortable abortable = new AbortServer();
     RpcScheduler scheduler = new SimpleRpcScheduler(CONF, 2, 0, 0, qosFunction, abortable, 0);
     RpcServer rpcServer = new TestRpcServer(scheduler);
-    RpcClientImpl client = new RpcClientImpl(CONF, HConstants.CLUSTER_ID_DEFAULT);
-    try {
+    try (RpcClientImpl client = new RpcClientImpl(CONF, HConstants.CLUSTER_ID_DEFAULT)) {
       rpcServer.start();
-      MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
-      EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
-      PayloadCarryingRpcController controller =
-          new PayloadCarryingRpcController(CellUtil.createCellScanner(ImmutableList.of(CELL)));
-      InetSocketAddress address = rpcServer.getListenerAddress();
-      if (address == null) {
-        throw new IOException("Listener channel is closed");
-      }
-      client.call(controller, md, param, md.getOutputType().toProto(), User.getCurrent(),
-          address, new MetricsConnection.CallStats());
+      BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
+      stub.echo(null, EchoRequestProto.newBuilder().setMessage("hello").build());
     } catch (Throwable e) {
-      assert(abortable.isAborted() == true);
+      assert (abortable.isAborted() == true);
     } finally {
       rpcServer.stop();
     }
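
With the shared helper in place, the client-side boilerplate is the same across TestProtoBufRpc, TestRpcHandlerException and AbstractTestSecureIPC: create an RpcClient, ask TestProtobufRpcServiceImpl for a stub, and call the generated methods. A compact sketch of that flow, assuming conf and a started rpcServer set up as in the tests above and an enclosing method that declares throws Exception:

    RpcClient rpcClient = RpcClientFactory.createClient(conf, HConstants.CLUSTER_ID_DEFAULT);
    try {
      BlockingInterface stub =
          TestProtobufRpcServiceImpl.newBlockingStub(rpcClient, rpcServer.getListenerAddress());
      // ping: empty request, empty response.
      stub.ping(null, TestProtos.EmptyRequestProto.newBuilder().build());
      // echo: the message should come back unchanged.
      EchoResponseProto reply =
          stub.echo(null, EchoRequestProto.newBuilder().setMessage("hello").build());
      assertEquals("hello", reply.getMessage());
    } finally {
      rpcClient.close();
    }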

http://git-wip-us.apache.org/repos/asf/hbase/blob/45af3831/hbase-server/src/test/java/org/apache/hadoop/hbase/security/AbstractTestSecureIPC.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/AbstractTestSecureIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/AbstractTestSecureIPC.java
index 385b7b0..c1b8de7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/AbstractTestSecureIPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/AbstractTestSecureIPC.java
@@ -18,6 +18,8 @@
  */
 package org.apache.hadoop.hbase.security;
 
+import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
+import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub;
 import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getKeytabFileForTesting;
 import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getPrincipalForTesting;
 import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getSecuredConfiguration;
@@ -25,6 +27,9 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertSame;
 
+import com.google.common.collect.Lists;
+import com.google.protobuf.ServiceException;
+
 import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -32,27 +37,21 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
-import java.util.concurrent.ThreadLocalRandom;
 
-import com.google.protobuf.RpcController;
-import com.google.protobuf.ServiceException;
+import javax.security.sasl.SaslException;
+
 import org.apache.commons.lang.RandomStringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellScanner;
-import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
-import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
 import org.apache.hadoop.hbase.ipc.RpcClient;
 import org.apache.hadoop.hbase.ipc.RpcClientFactory;
 import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.ipc.RpcServerInterface;
 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos;
-import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
 import org.apache.hadoop.minikdc.MiniKdc;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
@@ -64,12 +63,6 @@ import org.junit.Test;
 import org.junit.rules.ExpectedException;
 import org.mockito.Mockito;
 
-import com.google.common.collect.Lists;
-import com.google.protobuf.BlockingRpcChannel;
-import com.google.protobuf.BlockingService;
-
-import javax.security.sasl.SaslException;
-
 public abstract class AbstractTestSecureIPC {
 
   private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
@@ -77,55 +70,6 @@ public abstract class AbstractTestSecureIPC {
   private static final File KEYTAB_FILE = new File(TEST_UTIL.getDataTestDir("keytab").toUri()
       .getPath());
 
-  static final BlockingService SERVICE =
-      TestRpcServiceProtos.TestProtobufRpcProto.newReflectiveBlockingService(
-          new TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface() {
-
-            @Override
-            public TestProtos.EmptyResponseProto ping(RpcController controller,
-                                                      TestProtos.EmptyRequestProto request)
-                throws ServiceException {
-              return null;
-            }
-
-            @Override
-            public TestProtos.EmptyResponseProto error(RpcController controller,
-                                                       TestProtos.EmptyRequestProto request)
-                throws ServiceException {
-              return null;
-            }
-
-            @Override
-            public TestProtos.EchoResponseProto echo(RpcController controller,
-                                                     TestProtos.EchoRequestProto request)
-                throws ServiceException {
-              if (controller instanceof PayloadCarryingRpcController) {
-                PayloadCarryingRpcController pcrc = (PayloadCarryingRpcController) controller;
-                // If cells, scan them to check we are able to iterate what we were given and since
-                // this is
-                // an echo, just put them back on the controller creating a new block. Tests our
-                // block
-                // building.
-                CellScanner cellScanner = pcrc.cellScanner();
-                List<Cell> list = null;
-                if (cellScanner != null) {
-                  list = new ArrayList<Cell>();
-                  try {
-                    while (cellScanner.advance()) {
-                      list.add(cellScanner.current());
-                    }
-                  } catch (IOException e) {
-                    throw new ServiceException(e);
-                  }
-                }
-                cellScanner = CellUtil.createCellScanner(list);
-                ((PayloadCarryingRpcController) controller).setCellScanner(cellScanner);
-              }
-              return TestProtos.EchoResponseProto.newBuilder()
-                  .setMessage(request.getMessage()).build();
-            }
-          });
-
   private static MiniKdc KDC;
   private static String HOST = "localhost";
   private static String PRINCIPAL;
@@ -262,16 +206,8 @@ public abstract class AbstractTestSecureIPC {
     rpcServer.start();
     try (RpcClient rpcClient = RpcClientFactory.createClient(clientConf,
         HConstants.DEFAULT_CLUSTER_ID.toString())) {
-      InetSocketAddress address = rpcServer.getListenerAddress();
-      if (address == null) {
-        throw new IOException("Listener channel is closed");
-      }
-      BlockingRpcChannel channel =
-          rpcClient.createBlockingRpcChannel(
-              ServerName.valueOf(address.getHostName(), address.getPort(),
-                  System.currentTimeMillis()), clientUser, 0);
-      TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub =
-          TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(channel);
+      BlockingInterface stub = newBlockingStub(rpcClient, rpcServer.getListenerAddress(),
+        clientUser);
       List<String> results = new ArrayList<>();
       TestThread th1 = new TestThread(stub, results);
       final Throwable exception[] = new Throwable[1];
@@ -298,11 +234,11 @@ public abstract class AbstractTestSecureIPC {
   }
 
   public static class TestThread extends Thread {
-      private final TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub;
+      private final BlockingInterface stub;
 
       private final List<String> results;
 
-          public TestThread(TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub, List<String> results) {
+          public TestThread(BlockingInterface stub, List<String> results) {
           this.stub = stub;
           this.results = results;
         }
