http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestPayloadCarryingRpcController.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestPayloadCarryingRpcController.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestPayloadCarryingRpcController.java deleted file mode 100644 index b506b88..0000000 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestPayloadCarryingRpcController.java +++ /dev/null @@ -1,223 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hbase.ipc; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellScannable; -import org.apache.hadoop.hbase.CellScanner; -import org.apache.hadoop.hbase.testclassification.SmallTests; -import org.apache.hadoop.hbase.util.Bytes; -import org.junit.Test; -import org.junit.experimental.categories.Category; - - -@Category(SmallTests.class) -public class TestPayloadCarryingRpcController { - @Test - public void testListOfCellScannerables() throws IOException { - List<CellScannable> cells = new ArrayList<CellScannable>(); - final int count = 10; - for (int i = 0; i < count; i++) { - cells.add(createCell(i)); - } - PayloadCarryingRpcController controller = new PayloadCarryingRpcController(cells); - CellScanner cellScanner = controller.cellScanner(); - int index = 0; - for (; cellScanner.advance(); index++) { - Cell cell = cellScanner.current(); - byte [] indexBytes = Bytes.toBytes(index); - assertTrue("" + index, Bytes.equals(indexBytes, 0, indexBytes.length, cell.getValueArray(), - cell.getValueOffset(), cell.getValueLength())); - } - assertEquals(count, index); - } - - /** - * @param index - * @return A faked out 'Cell' that does nothing but return index as its value - */ - static CellScannable createCell(final int index) { - return new CellScannable() { - @Override - public CellScanner cellScanner() { - return new CellScanner() { - @Override - public Cell current() { - // Fake out a Cell. All this Cell has is a value that is an int in size and equal - // to the above 'index' param serialized as an int. 
- return new Cell() { - private final int i = index; - - @Override - public byte[] getRowArray() { - // unused - return null; - } - - @Override - public int getRowOffset() { - // unused - return 0; - } - - @Override - public short getRowLength() { - // unused - return 0; - } - - @Override - public byte[] getFamilyArray() { - // unused - return null; - } - - @Override - public int getFamilyOffset() { - // unused - return 0; - } - - @Override - public byte getFamilyLength() { - // unused - return 0; - } - - @Override - public byte[] getQualifierArray() { - // unused - return null; - } - - @Override - public int getQualifierOffset() { - // unused - return 0; - } - - @Override - public int getQualifierLength() { - // unused - return 0; - } - - @Override - public long getTimestamp() { - // unused - return 0; - } - - @Override - public byte getTypeByte() { - // unused - return 0; - } - - @Override - public long getMvccVersion() { - // unused - return 0; - } - - @Override - public long getSequenceId() { - // unused - return 0; - } - - @Override - public byte[] getValueArray() { - return Bytes.toBytes(this.i); - } - - @Override - public int getValueOffset() { - return 0; - } - - @Override - public int getValueLength() { - return Bytes.SIZEOF_INT; - } - - @Override - public int getTagsOffset() { - // unused - return 0; - } - - @Override - public int getTagsLength() { - // unused - return 0; - } - - @Override - public byte[] getTagsArray() { - // unused - return null; - } - - @Override - public byte[] getValue() { - // unused - return null; - } - - @Override - public byte[] getFamily() { - // unused - return null; - } - - @Override - public byte[] getQualifier() { - // unused - return null; - } - - @Override - public byte[] getRow() { - // unused - return null; - } - }; - } - - private boolean hasCell = true; - @Override - public boolean advance() { - // We have one Cell only so return true first time then false ever after. 
- if (!hasCell) return hasCell; - hasCell = false; - return true; - } - }; - } - }; - } -}
http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/test/java/org/apache/hadoop/hbase/security/TestHBaseSaslRpcClient.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/security/TestHBaseSaslRpcClient.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/security/TestHBaseSaslRpcClient.java new file mode 100644 index 0000000..12b3661 --- /dev/null +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/security/TestHBaseSaslRpcClient.java @@ -0,0 +1,309 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.security; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.google.common.base.Strings; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import javax.security.auth.callback.Callback; +import javax.security.auth.callback.CallbackHandler; +import javax.security.auth.callback.NameCallback; +import javax.security.auth.callback.PasswordCallback; +import javax.security.auth.callback.TextOutputCallback; +import javax.security.auth.callback.UnsupportedCallbackException; +import javax.security.sasl.RealmCallback; +import javax.security.sasl.RealmChoiceCallback; +import javax.security.sasl.Sasl; +import javax.security.sasl.SaslClient; + +import org.apache.hadoop.hbase.security.AbstractHBaseSaslRpcClient.SaslClientCallbackHandler; +import org.apache.hadoop.hbase.testclassification.SecurityTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.io.DataInputBuffer; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.ExpectedException; +import org.mockito.Mockito; + +@Category({SecurityTests.class, SmallTests.class}) +public class TestHBaseSaslRpcClient { + + static { + System.setProperty("java.security.krb5.realm", "DOMAIN.COM"); + System.setProperty("java.security.krb5.kdc", "DOMAIN.COM"); + } + + 
static final String DEFAULT_USER_NAME = "principal"; + static final String DEFAULT_USER_PASSWORD = "password"; + + private static final Logger LOG = Logger.getLogger(TestHBaseSaslRpcClient.class); + + + @Rule + public ExpectedException exception = ExpectedException.none(); + + @BeforeClass + public static void before() { + Logger.getRootLogger().setLevel(Level.DEBUG); + } + + @Test + public void testSaslClientUsesGivenRpcProtection() throws Exception { + Token<? extends TokenIdentifier> token = createTokenMockWithCredentials(DEFAULT_USER_NAME, + DEFAULT_USER_PASSWORD); + for (SaslUtil.QualityOfProtection qop : SaslUtil.QualityOfProtection.values()) { + String negotiatedQop = new HBaseSaslRpcClient(AuthMethod.DIGEST, token, + "principal/host@DOMAIN.COM", false, qop.name()) { + public String getQop() { + return saslProps.get(Sasl.QOP); + } + }.getQop(); + assertEquals(negotiatedQop, qop.getSaslQop()); + } + } + + @Test + public void testSaslClientCallbackHandler() throws UnsupportedCallbackException { + final Token<? 
extends TokenIdentifier> token = createTokenMock(); + when(token.getIdentifier()).thenReturn(DEFAULT_USER_NAME.getBytes()); + when(token.getPassword()).thenReturn(DEFAULT_USER_PASSWORD.getBytes()); + + final NameCallback nameCallback = mock(NameCallback.class); + final PasswordCallback passwordCallback = mock(PasswordCallback.class); + final RealmCallback realmCallback = mock(RealmCallback.class); + final RealmChoiceCallback realmChoiceCallback = mock(RealmChoiceCallback.class); + + Callback[] callbackArray = {nameCallback, passwordCallback, + realmCallback, realmChoiceCallback}; + final SaslClientCallbackHandler saslClCallbackHandler = new SaslClientCallbackHandler(token); + saslClCallbackHandler.handle(callbackArray); + verify(nameCallback).setName(anyString()); + verify(realmCallback).setText(anyString()); + verify(passwordCallback).setPassword(any(char[].class)); + } + + @Test + public void testSaslClientCallbackHandlerWithException() { + final Token<? extends TokenIdentifier> token = createTokenMock(); + when(token.getIdentifier()).thenReturn(DEFAULT_USER_NAME.getBytes()); + when(token.getPassword()).thenReturn(DEFAULT_USER_PASSWORD.getBytes()); + final SaslClientCallbackHandler saslClCallbackHandler = new SaslClientCallbackHandler(token); + try { + saslClCallbackHandler.handle(new Callback[] { mock(TextOutputCallback.class) }); + } catch (UnsupportedCallbackException expEx) { + //expected + } catch (Exception ex) { + fail("testSaslClientCallbackHandlerWithException error : " + ex.getMessage()); + } + } + + @Test + public void testHBaseSaslRpcClientCreation() throws Exception { + //creation kerberos principal check section + assertFalse(assertSuccessCreationKerberosPrincipal(null)); + assertFalse(assertSuccessCreationKerberosPrincipal("DOMAIN.COM")); + assertFalse(assertSuccessCreationKerberosPrincipal("principal/DOMAIN.COM")); + if (!assertSuccessCreationKerberosPrincipal("principal/localhost@DOMAIN.COM")) { + // XXX: This can fail if kerberos support in the OS 
is not sane, see HBASE-10107. + // For now, don't assert, just warn + LOG.warn("Could not create a SASL client with valid Kerberos credential"); + } + + //creation digest principal check section + assertFalse(assertSuccessCreationDigestPrincipal(null, null)); + assertFalse(assertSuccessCreationDigestPrincipal("", "")); + assertFalse(assertSuccessCreationDigestPrincipal("", null)); + assertFalse(assertSuccessCreationDigestPrincipal(null, "")); + assertTrue(assertSuccessCreationDigestPrincipal(DEFAULT_USER_NAME, DEFAULT_USER_PASSWORD)); + + //creation simple principal check section + assertFalse(assertSuccessCreationSimplePrincipal("", "")); + assertFalse(assertSuccessCreationSimplePrincipal(null, null)); + assertFalse(assertSuccessCreationSimplePrincipal(DEFAULT_USER_NAME, DEFAULT_USER_PASSWORD)); + + //exceptions check section + assertTrue(assertIOExceptionThenSaslClientIsNull(DEFAULT_USER_NAME, DEFAULT_USER_PASSWORD)); + assertTrue(assertIOExceptionWhenGetStreamsBeforeConnectCall( + DEFAULT_USER_NAME, DEFAULT_USER_PASSWORD)); + } + + @Test + public void testAuthMethodReadWrite() throws IOException { + DataInputBuffer in = new DataInputBuffer(); + DataOutputBuffer out = new DataOutputBuffer(); + + assertAuthMethodRead(in, AuthMethod.SIMPLE); + assertAuthMethodRead(in, AuthMethod.KERBEROS); + assertAuthMethodRead(in, AuthMethod.DIGEST); + + assertAuthMethodWrite(out, AuthMethod.SIMPLE); + assertAuthMethodWrite(out, AuthMethod.KERBEROS); + assertAuthMethodWrite(out, AuthMethod.DIGEST); + } + + private void assertAuthMethodRead(DataInputBuffer in, AuthMethod authMethod) + throws IOException { + in.reset(new byte[] {authMethod.code}, 1); + assertEquals(authMethod, AuthMethod.read(in)); + } + + private void assertAuthMethodWrite(DataOutputBuffer out, AuthMethod authMethod) + throws IOException { + authMethod.write(out); + assertEquals(authMethod.code, out.getData()[0]); + out.reset(); + } + + private boolean assertIOExceptionWhenGetStreamsBeforeConnectCall(String 
principal, + String password) throws IOException { + boolean inState = false; + boolean outState = false; + + HBaseSaslRpcClient rpcClient = new HBaseSaslRpcClient(AuthMethod.DIGEST, + createTokenMockWithCredentials(principal, password), principal, false) { + @Override + public SaslClient createDigestSaslClient(String[] mechanismNames, + String saslDefaultRealm, CallbackHandler saslClientCallbackHandler) + throws IOException { + return Mockito.mock(SaslClient.class); + } + + @Override + public SaslClient createKerberosSaslClient(String[] mechanismNames, + String userFirstPart, String userSecondPart) throws IOException { + return Mockito.mock(SaslClient.class); + } + }; + + try { + rpcClient.getInputStream(Mockito.mock(InputStream.class)); + } catch(IOException ex) { + //Sasl authentication exchange hasn't completed yet + inState = true; + } + + try { + rpcClient.getOutputStream(Mockito.mock(OutputStream.class)); + } catch(IOException ex) { + //Sasl authentication exchange hasn't completed yet + outState = true; + } + + return inState && outState; + } + + private boolean assertIOExceptionThenSaslClientIsNull(String principal, String password) { + try { + new HBaseSaslRpcClient(AuthMethod.DIGEST, + createTokenMockWithCredentials(principal, password), principal, false) { + @Override + public SaslClient createDigestSaslClient(String[] mechanismNames, + String saslDefaultRealm, CallbackHandler saslClientCallbackHandler) + throws IOException { + return null; + } + + @Override + public SaslClient createKerberosSaslClient(String[] mechanismNames, + String userFirstPart, String userSecondPart) throws IOException { + return null; + } + }; + return false; + } catch (IOException ex) { + return true; + } + } + + private boolean assertSuccessCreationKerberosPrincipal(String principal) { + HBaseSaslRpcClient rpcClient = null; + try { + rpcClient = createSaslRpcClientForKerberos(principal); + } catch(Exception ex) { + LOG.error(ex.getMessage(), ex); + } + return rpcClient != null; 
+ } + + private boolean assertSuccessCreationDigestPrincipal(String principal, String password) { + HBaseSaslRpcClient rpcClient = null; + try { + rpcClient = new HBaseSaslRpcClient(AuthMethod.DIGEST, + createTokenMockWithCredentials(principal, password), principal, false); + } catch(Exception ex) { + LOG.error(ex.getMessage(), ex); + } + return rpcClient != null; + } + + private boolean assertSuccessCreationSimplePrincipal(String principal, String password) { + HBaseSaslRpcClient rpcClient = null; + try { + rpcClient = createSaslRpcClientSimple(principal, password); + } catch(Exception ex) { + LOG.error(ex.getMessage(), ex); + } + return rpcClient != null; + } + + private HBaseSaslRpcClient createSaslRpcClientForKerberos(String principal) + throws IOException { + return new HBaseSaslRpcClient(AuthMethod.KERBEROS, createTokenMock(), principal, false); + } + + private Token<? extends TokenIdentifier> createTokenMockWithCredentials( + String principal, String password) + throws IOException { + Token<? extends TokenIdentifier> token = createTokenMock(); + if (!Strings.isNullOrEmpty(principal) && !Strings.isNullOrEmpty(password)) { + when(token.getIdentifier()).thenReturn(DEFAULT_USER_NAME.getBytes()); + when(token.getPassword()).thenReturn(DEFAULT_USER_PASSWORD.getBytes()); + } + return token; + } + + private HBaseSaslRpcClient createSaslRpcClientSimple(String principal, String password) + throws IOException { + return new HBaseSaslRpcClient(AuthMethod.SIMPLE, createTokenMock(), principal, false); + } + + @SuppressWarnings("unchecked") + private Token<? 
extends TokenIdentifier> createTokenMock() { + return mock(Token.class); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java ---------------------------------------------------------------------- diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java index 0a7ac8a..f41efc7 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java @@ -19,10 +19,17 @@ package org.apache.hadoop.hbase.ipc; import static org.apache.hadoop.hbase.ipc.RpcClient.SPECIFIC_WRITE_THREAD; +import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE; +import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import com.google.common.collect.Lists; +import com.google.protobuf.BlockingService; +import com.google.protobuf.Descriptors.MethodDescriptor; +import com.google.protobuf.Message; + import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; @@ -40,27 +47,17 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.codec.Codec; -import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto; -import 
org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto; -import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; -import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.testclassification.IntegrationTests; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; -import com.google.common.collect.Lists; -import com.google.protobuf.BlockingService; -import com.google.protobuf.Message; -import com.google.protobuf.RpcController; -import com.google.protobuf.ServiceException; -import com.google.protobuf.Descriptors.MethodDescriptor; @Category(IntegrationTests.class) public class IntegrationTestRpcClient { @@ -95,38 +92,13 @@ public class IntegrationTestRpcClient { } } - static final BlockingService SERVICE = - TestRpcServiceProtos.TestProtobufRpcProto - .newReflectiveBlockingService(new TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface() { - - @Override - public EmptyResponseProto ping(RpcController controller, EmptyRequestProto request) - throws ServiceException { - return null; - } - - @Override - public EmptyResponseProto error(RpcController controller, EmptyRequestProto request) - throws ServiceException { - return null; - } - - @Override - public EchoResponseProto echo(RpcController controller, EchoRequestProto request) - throws ServiceException { - return EchoResponseProto.newBuilder().setMessage(request.getMessage()).build(); - } - }); - - protected AbstractRpcClient createRpcClient(Configuration conf, boolean isSyncClient) { - return isSyncClient ? 
- new RpcClientImpl(conf, HConstants.CLUSTER_ID_DEFAULT) : - new AsyncRpcClient(conf) { - @Override - Codec getCodec() { - return null; - } - }; + protected AbstractRpcClient<?> createRpcClient(Configuration conf, boolean isSyncClient) { + return isSyncClient ? new BlockingRpcClient(conf) : new NettyRpcClient(conf) { + @Override + Codec getCodec() { + return null; + } + }; } static String BIG_PAYLOAD; @@ -283,7 +255,7 @@ public class IntegrationTestRpcClient { } static class SimpleClient extends Thread { - AbstractRpcClient rpcClient; + AbstractRpcClient<?> rpcClient; AtomicBoolean running = new AtomicBoolean(true); AtomicBoolean sending = new AtomicBoolean(false); AtomicReference<Throwable> exception = new AtomicReference<>(null); @@ -292,7 +264,7 @@ public class IntegrationTestRpcClient { long numCalls = 0; Random random = new Random(); - public SimpleClient(Cluster cluster, AbstractRpcClient rpcClient, String id) { + public SimpleClient(Cluster cluster, AbstractRpcClient<?> rpcClient, String id) { this.cluster = cluster; this.rpcClient = rpcClient; this.id = id; @@ -301,24 +273,16 @@ public class IntegrationTestRpcClient { @Override public void run() { - MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); - while (running.get()) { boolean isBigPayload = random.nextBoolean(); String message = isBigPayload ? 
BIG_PAYLOAD : id + numCalls; EchoRequestProto param = EchoRequestProto.newBuilder().setMessage(message).build(); - EchoResponseProto ret = EchoResponseProto.newBuilder().setMessage("foo").build(); - + EchoResponseProto ret; TestRpcServer server = cluster.getRandomServer(); try { - User user = User.getCurrent(); - InetSocketAddress address = server.getListenerAddress(); - if (address == null) { - throw new IOException("Listener channel is closed"); - } sending.set(true); - ret = (EchoResponseProto) - rpcClient.callBlockingMethod(md, null, param, ret, user, address); + BlockingInterface stub = newBlockingStub(rpcClient, server.getListenerAddress()); + ret = stub.echo(null, param); } catch (Exception e) { LOG.warn(e); continue; // expected in case connection is closing or closed @@ -360,7 +324,7 @@ public class IntegrationTestRpcClient { cluster.startServer(); conf.setBoolean(SPECIFIC_WRITE_THREAD, true); for(int i = 0; i <1000; i++) { - AbstractRpcClient rpcClient = createRpcClient(conf, true); + AbstractRpcClient<?> rpcClient = createRpcClient(conf, true); SimpleClient client = new SimpleClient(cluster, rpcClient, "Client1"); client.start(); while(!client.isSending()) { @@ -452,7 +416,7 @@ public class IntegrationTestRpcClient { ArrayList<SimpleClient> clients = new ArrayList<>(); // all threads should share the same rpc client - AbstractRpcClient rpcClient = createRpcClient(conf, isSyncClient); + AbstractRpcClient<?> rpcClient = createRpcClient(conf, isSyncClient); for (int i = 0; i < 30; i++) { String clientId = "client_" + i + "_"; http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index db73dfe..871ea65 100644 --- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -20,6 +20,15 @@ package org.apache.hadoop.hbase.ipc; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.google.protobuf.BlockingService; +import com.google.protobuf.CodedInputStream; +import com.google.protobuf.CodedOutputStream; +import com.google.protobuf.Descriptors.MethodDescriptor; +import com.google.protobuf.Message; +import com.google.protobuf.ServiceException; +import com.google.protobuf.TextFormat; + import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; @@ -58,7 +67,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -68,20 +76,17 @@ import javax.security.sasl.SaslServer; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.CallQueueTooBigException; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CallQueueTooBigException; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.Server; -import org.apache.hadoop.hbase.TableName; +import 
org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.client.NeedUnmanagedConnectionException; -import org.apache.hadoop.hbase.client.Operation; import org.apache.hadoop.hbase.client.VersionInfoUtil; import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.conf.ConfigurationObserver; @@ -90,7 +95,6 @@ import org.apache.hadoop.hbase.exceptions.RequestTooBigException; import org.apache.hadoop.hbase.io.BoundedByteBufferPool; import org.apache.hadoop.hbase.io.ByteBufferInputStream; import org.apache.hadoop.hbase.io.ByteBufferOutputStream; -import org.apache.hadoop.hbase.io.BoundedByteBufferPool; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; import org.apache.hadoop.hbase.monitoring.TaskMonitor; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; @@ -102,17 +106,16 @@ import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ExceptionResponse; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ResponseHeader; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation; -import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.RSRpcServices; import org.apache.hadoop.hbase.security.AccessDeniedException; import org.apache.hadoop.hbase.security.AuthMethod; import org.apache.hadoop.hbase.security.HBasePolicyProvider; import org.apache.hadoop.hbase.security.HBaseSaslRpcServer; -import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslDigestCallbackHandler; import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslGssCallbackHandler; import org.apache.hadoop.hbase.security.SaslStatus; import org.apache.hadoop.hbase.security.SaslUtil; +import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.UserProvider; 
import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager; import org.apache.hadoop.hbase.util.Bytes; @@ -134,17 +137,8 @@ import org.apache.hadoop.security.token.SecretManager; import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.util.StringUtils; -import org.codehaus.jackson.map.ObjectMapper; import org.apache.htrace.TraceInfo; - -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import com.google.protobuf.BlockingService; -import com.google.protobuf.CodedInputStream; -import com.google.protobuf.CodedOutputStream; -import com.google.protobuf.Descriptors.MethodDescriptor; -import com.google.protobuf.Message; -import com.google.protobuf.ServiceException; -import com.google.protobuf.TextFormat; +import org.codehaus.jackson.map.ObjectMapper; /** * An RPC server that hosts protobuf described Services. @@ -163,8 +157,6 @@ import com.google.protobuf.TextFormat; * * CallRunner#run executes the call. When done, asks the included Call to put itself on new * queue for Responder to pull from and return result to client. - * - * @see RpcClientImpl */ @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX}) @InterfaceStability.Evolving @@ -195,7 +187,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { */ private static final int DEFAULT_MAX_CALLQUEUE_SIZE = 1024 * 1024 * 1024; - private final IPCUtil ipcUtil; + private final CellBlockBuilder cellBlockBuilder; private static final String AUTH_FAILED_FOR = "Auth failed for "; private static final String AUTH_SUCCESSFUL_FOR = "Auth successful for "; @@ -468,7 +460,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { // Pass reservoir to buildCellBlock. Keep reference to returne so can add it back to the // reservoir when finished. 
This is hacky and the hack is not contained but benefits are // high when we can avoid a big buffer allocation on each rpc. - this.cellBlock = ipcUtil.buildCellBlock(this.connection.codec, + this.cellBlock = cellBlockBuilder.buildCellBlock(this.connection.codec, this.connection.compressionCodec, cells, reservoir); if (this.cellBlock != null) { CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder(); @@ -1175,6 +1167,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { * @throws IOException */ private boolean processResponse(final Call call) throws IOException { + LOG.info("processing " + call); boolean error = true; try { // Send as much data as we can in the non-blocking fashion @@ -1442,7 +1435,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { } saslServer = Sasl.createSaslServer(AuthMethod.DIGEST .getMechanismName(), null, SaslUtil.SASL_DEFAULT_REALM, - SaslUtil.SASL_PROPS, new SaslDigestCallbackHandler( + HBaseSaslRpcServer.getSaslProps(), new SaslDigestCallbackHandler( secretManager, this)); break; default: @@ -1462,7 +1455,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { public Object run() throws SaslException { saslServer = Sasl.createSaslServer(AuthMethod.KERBEROS .getMechanismName(), names[0], names[1], - SaslUtil.SASL_PROPS, new SaslGssCallbackHandler()); + HBaseSaslRpcServer.getSaslProps(), new SaslGssCallbackHandler()); return null; } }); @@ -1988,7 +1981,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { } if (header.hasCellBlockMeta()) { buf.position(offset); - cellScanner = ipcUtil.createCellScanner(this.codec, this.compressionCodec, buf); + cellScanner = cellBlockBuilder.createCellScanner(this.codec, this.compressionCodec, buf); } } catch (Throwable t) { InetSocketAddress address = getListenerAddress(); @@ -2194,7 +2187,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { 
this.tcpNoDelay = conf.getBoolean("hbase.ipc.server.tcpnodelay", true); this.tcpKeepAlive = conf.getBoolean("hbase.ipc.server.tcpkeepalive", true); - this.ipcUtil = new IPCUtil(conf); + this.cellBlockBuilder = new CellBlockBuilder(conf); // Create the responder here @@ -2346,7 +2339,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { status.setRPCPacket(param); status.resume("Servicing call"); //get an instance of the method arg type - PayloadCarryingRpcController controller = new PayloadCarryingRpcController(cellScanner); + HBaseRpcController controller = new HBaseRpcControllerImpl(cellScanner); controller.setCallTimeout(timeout); Message result = service.callBlockingMethod(md, controller, param); long endTime = System.currentTimeMillis(); http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java index 61790d0..71d03ce 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java @@ -18,6 +18,10 @@ */ package org.apache.hadoop.hbase.master; +import com.google.common.annotations.VisibleForTesting; +import com.google.protobuf.ByteString; +import com.google.protobuf.ServiceException; + import java.io.IOException; import java.net.ConnectException; import java.net.InetAddress; @@ -52,7 +56,7 @@ import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.RetriesExhaustedException; import org.apache.hadoop.hbase.ipc.FailedServerException; -import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; +import 
org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; @@ -66,25 +70,21 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ServerInfo; -import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest; import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds; import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.StoreSequenceId; +import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest; import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.RegionOpeningState; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Triple; import org.apache.hadoop.hbase.util.RetryCounter; import org.apache.hadoop.hbase.util.RetryCounterFactory; +import org.apache.hadoop.hbase.util.Triple; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.zookeeper.KeeperException; -import com.google.common.annotations.VisibleForTesting; -import com.google.protobuf.ByteString; -import com.google.protobuf.ServiceException; - /** * The ServerManager class manages info about region servers. 
* <p> @@ -871,7 +871,7 @@ public class ServerManager { } } - private PayloadCarryingRpcController newRpcController() { + private HBaseRpcController newRpcController() { return rpcControllerFactory == null ? null : rpcControllerFactory.newController(); } @@ -899,7 +899,7 @@ public class ServerManager { region.getRegionNameAsString() + " failed because no RPC connection found to this server"); } - PayloadCarryingRpcController controller = newRpcController(); + HBaseRpcController controller = newRpcController(); return ProtobufUtil.closeRegion(controller, admin, server, region.getRegionName(), versionOfClosingNode, dest, transitionInZK); } @@ -922,7 +922,7 @@ public class ServerManager { if (server == null) return; try { AdminService.BlockingInterface admin = getRsAdmin(server); - PayloadCarryingRpcController controller = newRpcController(); + HBaseRpcController controller = newRpcController(); ProtobufUtil.warmupRegion(controller, admin, region); } catch (IOException e) { LOG.error("Received exception in RPC for warmup server:" + @@ -938,7 +938,7 @@ public class ServerManager { public static void closeRegionSilentlyAndWait(ClusterConnection connection, ServerName server, HRegionInfo region, long timeout) throws IOException, InterruptedException { AdminService.BlockingInterface rs = connection.getAdmin(server); - PayloadCarryingRpcController controller = connection.getRpcControllerFactory().newController(); + HBaseRpcController controller = connection.getRpcControllerFactory().newController(); try { ProtobufUtil.closeRegion(controller, rs, server, region.getRegionName(), false); } catch (IOException e) { @@ -946,6 +946,7 @@ public class ServerManager { } long expiration = timeout + System.currentTimeMillis(); while (System.currentTimeMillis() < expiration) { + controller.reset(); try { HRegionInfo rsRegion = ProtobufUtil.getRegionInfo(controller, rs, region.getRegionName()); @@ -989,7 +990,7 @@ public class ServerManager { + region_b.getRegionNameAsString() + " failed 
because no RPC connection found to this server"); } - PayloadCarryingRpcController controller = newRpcController(); + HBaseRpcController controller = newRpcController(); ProtobufUtil.mergeRegions(controller, admin, region_a, region_b, forcible, user); } @@ -1008,7 +1009,7 @@ public class ServerManager { } } try { - PayloadCarryingRpcController controller = newRpcController(); + HBaseRpcController controller = newRpcController(); AdminService.BlockingInterface admin = getRsAdmin(server); if (admin != null) { ServerInfo info = ProtobufUtil.getServerInfo(controller, admin); http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java index 7a1031c..d13a79c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java @@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.protobuf; +import com.google.protobuf.ServiceException; + import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; @@ -35,7 +37,8 @@ import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.io.SizedCellScanner; -import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; +import org.apache.hadoop.hbase.ipc.HBaseRpcControllerImpl; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; 
@@ -46,8 +49,6 @@ import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WALKey; -import com.google.protobuf.ServiceException; - @InterfaceAudience.Private public class ReplicationProtbufUtil { /** @@ -66,7 +67,7 @@ public class ReplicationProtbufUtil { Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p = buildReplicateWALEntryRequest(entries, null, replicationClusterId, sourceBaseNamespaceDir, sourceHFileArchiveDir); - PayloadCarryingRpcController controller = new PayloadCarryingRpcController(p.getSecond()); + HBaseRpcController controller = new HBaseRpcControllerImpl(p.getSecond()); try { admin.replicateWALEntry(controller, p.getFirst()); } catch (ServiceException se) { http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 9c89260..49ce348 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -90,7 +90,7 @@ import org.apache.hadoop.hbase.exceptions.ScannerResetException; import org.apache.hadoop.hbase.filter.ByteArrayComparable; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler; -import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.ipc.PriorityFunction; import org.apache.hadoop.hbase.ipc.QosPriority; import org.apache.hadoop.hbase.ipc.RpcCallContext; @@ -392,7 +392,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } private void 
addResult(final MutateResponse.Builder builder, - final Result result, final PayloadCarryingRpcController rpcc) { + final Result result, final HBaseRpcController rpcc) { if (result == null) return; if (isClientCellBlockSupport()) { builder.setResult(ProtobufUtil.toResultNoData(result)); @@ -404,7 +404,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } private void addResults(final ScanResponse.Builder builder, final List<Result> results, - final PayloadCarryingRpcController controller, boolean isDefaultRegion) { + final HBaseRpcController controller, boolean isDefaultRegion) { builder.setStale(!isDefaultRegion); if (results.isEmpty()) { return; @@ -1795,7 +1795,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, public ReplicateWALEntryResponse replay(final RpcController controller, final ReplicateWALEntryRequest request) throws ServiceException { long before = EnvironmentEdgeManager.currentTime(); - CellScanner cells = ((PayloadCarryingRpcController) controller).cellScanner(); + CellScanner cells = ((HBaseRpcController) controller).cellScanner(); try { checkOpen(); List<WALEntry> entries = request.getEntryList(); @@ -1900,7 +1900,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, if (regionServer.replicationSinkHandler != null) { requestCount.increment(); List<WALEntry> entries = request.getEntryList(); - CellScanner cellScanner = ((PayloadCarryingRpcController)controller).cellScanner(); + CellScanner cellScanner = ((HBaseRpcController)controller).cellScanner(); regionServer.getRegionServerCoprocessorHost().preReplicateLogEntries(entries, cellScanner); regionServer.replicationSinkHandler.replicateLogEntries(entries, cellScanner, request.getReplicationClusterId(), request.getSourceBaseNamespaceDirPath(), @@ -2129,10 +2129,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } else if (r != null) { ClientProtos.Result pbr; RpcCallContext call = RpcServer.getCurrentCall(); - if (isClientCellBlockSupport(call) && 
controller instanceof PayloadCarryingRpcController + if (isClientCellBlockSupport(call) && controller instanceof HBaseRpcController && VersionInfoUtil.hasMinimumVersion(call.getClientVersionInfo(), 1, 3)) { pbr = ProtobufUtil.toResultNoData(r); - ((PayloadCarryingRpcController) controller) + ((HBaseRpcController) controller) .setCellScanner(CellUtil.createCellScanner(r.rawCells())); addSize(call, r, null); } else { @@ -2175,7 +2175,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, // rpc controller is how we bring in data via the back door; it is unprotobuf'ed data. // It is also the conduit via which we pass back data. - PayloadCarryingRpcController controller = (PayloadCarryingRpcController)rpcc; + HBaseRpcController controller = (HBaseRpcController)rpcc; CellScanner cellScanner = controller != null ? controller.cellScanner(): null; if (controller != null) { controller.setCellScanner(null); @@ -2305,7 +2305,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, final MutateRequest request) throws ServiceException { // rpc controller is how we bring in data via the back door; it is unprotobuf'ed data. // It is also the conduit via which we pass back data. - PayloadCarryingRpcController controller = (PayloadCarryingRpcController)rpcc; + HBaseRpcController controller = (HBaseRpcController)rpcc; CellScanner cellScanner = controller != null ? controller.cellScanner() : null; OperationQuota quota = null; RpcCallContext context = RpcServer.getCurrentCall(); @@ -2530,7 +2530,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } } - private long getTimeLimit(PayloadCarryingRpcController controller, + private long getTimeLimit(HBaseRpcController controller, boolean allowHeartbeatMessages) { // Set the time limit to be half of the more restrictive timeout value (one of the // timeout values must be positive). 
In the event that both values are positive, the @@ -2559,7 +2559,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } // return whether we have more results in region. - private boolean scan(PayloadCarryingRpcController controller, ScanRequest request, + private boolean scan(HBaseRpcController controller, ScanRequest request, RegionScannerHolder rsh, long maxQuotaResultSize, int maxResults, List<Result> results, ScanResponse.Builder builder, MutableObject lastBlock, RpcCallContext context) throws IOException { @@ -2714,9 +2714,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler, @Override public ScanResponse scan(final RpcController controller, final ScanRequest request) throws ServiceException { - if (controller != null && !(controller instanceof PayloadCarryingRpcController)) { + if (controller != null && !(controller instanceof HBaseRpcController)) { throw new UnsupportedOperationException( - "We only do PayloadCarryingRpcControllers! FIX IF A PROBLEM: " + controller); + "We only do HBaseRpcController! 
FIX IF A PROBLEM: " + controller); } if (!request.hasScannerId() && !request.hasScan()) { throw new ServiceException( @@ -2839,7 +2839,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } } if (!done) { - moreResultsInRegion = scan((PayloadCarryingRpcController) controller, request, rsh, + moreResultsInRegion = scan((HBaseRpcController) controller, request, rsh, maxQuotaResultSize, rows, results, builder, lastBlock, context); } } @@ -2858,7 +2858,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, // if we have reached the limit of rows moreResults = false; } - addResults(builder, results, (PayloadCarryingRpcController) controller, + addResults(builder, results, (HBaseRpcController) controller, RegionReplicaUtil.isDefaultReplica(region.getRegionInfo())); if (!moreResults || !moreResultsInRegion || closeScanner) { scannerClosed = true; http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java index 1314a4d..5f6fd45 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java @@ -18,6 +18,8 @@ */ package org.apache.hadoop.hbase.regionserver.wal; +import com.google.protobuf.ServiceException; + import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; @@ -27,7 +29,6 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import 
org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellScanner; @@ -35,10 +36,11 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.RegionServerCallable; import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory; -import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil; @@ -49,8 +51,6 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.WAL.Entry; -import com.google.protobuf.ServiceException; - /** * This class is responsible for replaying the edits coming from a failed region server. 
* <p> @@ -214,7 +214,7 @@ public class WALEditsReplaySink { Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p = ReplicationProtbufUtil.buildReplicateWALEntryRequest(entriesArray); - PayloadCarryingRpcController controller = rpcControllerFactory.newController(p.getSecond()); + HBaseRpcController controller = rpcControllerFactory.newController(p.getSecond()); try { remoteSvr.replay(controller, p.getFirst()); } catch (ServiceException se) { http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java index 0697013..235e27a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java @@ -18,6 +18,11 @@ package org.apache.hadoop.hbase.replication.regionserver; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.collect.Lists; +import com.google.protobuf.ServiceException; + import java.io.IOException; import java.io.InterruptedIOException; import java.util.ArrayList; @@ -34,7 +39,6 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.CellScanner; @@ -45,29 +49,22 @@ import org.apache.hadoop.hbase.HRegionInfo; import 
org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.RegionLocations; +import org.apache.hadoop.hbase.TableDescriptors; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotFoundException; -import org.apache.hadoop.hbase.TableDescriptors; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.RegionAdminServiceCallable; -import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.RegionReplicaUtil; import org.apache.hadoop.hbase.client.RetryingCallable; import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory; -import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse; -import org.apache.hadoop.hbase.wal.WALKey; -import org.apache.hadoop.hbase.wal.WALSplitter.PipelineController; -import org.apache.hadoop.hbase.wal.WALSplitter.RegionEntryBuffer; -import org.apache.hadoop.hbase.wal.WALSplitter.SinkWriter; -import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec; -import org.apache.hadoop.hbase.wal.WAL.Entry; -import org.apache.hadoop.hbase.wal.WALSplitter.EntryBuffers; -import org.apache.hadoop.hbase.wal.WALSplitter.OutputSink; import org.apache.hadoop.hbase.replication.BaseWALEntryFilter; import org.apache.hadoop.hbase.replication.ChainWALEntryFilter; import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint; @@ -76,13 +73,14 @@ import 
org.apache.hadoop.hbase.replication.WALEntryFilter; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hbase.wal.WAL.Entry; +import org.apache.hadoop.hbase.wal.WALSplitter.EntryBuffers; +import org.apache.hadoop.hbase.wal.WALSplitter.OutputSink; +import org.apache.hadoop.hbase.wal.WALSplitter.PipelineController; +import org.apache.hadoop.hbase.wal.WALSplitter.RegionEntryBuffer; +import org.apache.hadoop.hbase.wal.WALSplitter.SinkWriter; import org.apache.hadoop.util.StringUtils; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; -import com.google.common.collect.Lists; -import com.google.protobuf.ServiceException; - /** * A {@link ReplicationEndpoint} endpoint which receives the WAL edits from the * WAL, and sends the edits to replicas of regions. @@ -658,7 +656,7 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint { ReplicationProtbufUtil.buildReplicateWALEntryRequest(entriesArray, location.getRegionInfo().getEncodedNameAsBytes(), null, null, null); try { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(p.getSecond()); + HBaseRpcController controller = rpcControllerFactory.newController(p.getSecond()); controller.setCallTimeout(timeout); controller.setPriority(tableName); return stub.replay(controller, p.getFirst()); http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-server/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcServer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcServer.java index 8b0fa70..644a70d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcServer.java +++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcServer.java @@ -22,6 +22,7 @@ import java.io.ByteArrayInputStream; import java.io.DataInputStream; import java.io.IOException; import java.util.Locale; +import java.util.Map; import javax.security.auth.callback.Callback; import javax.security.auth.callback.CallbackHandler; @@ -33,14 +34,14 @@ import javax.security.sasl.RealmCallback; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.SecretManager; -import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.token.SecretManager.InvalidToken; +import org.apache.hadoop.security.token.TokenIdentifier; /** * A utility class for dealing with SASL on RPC server @@ -49,11 +50,17 @@ import org.apache.hadoop.security.token.SecretManager.InvalidToken; public class HBaseSaslRpcServer { private static final Log LOG = LogFactory.getLog(HBaseSaslRpcServer.class); + private static Map<String, String> saslProps = null; + public static void init(Configuration conf) { - SaslUtil.initSaslProperties(conf.get("hbase.rpc.protection", + saslProps = SaslUtil.initSaslProperties(conf.get("hbase.rpc.protection", QualityOfProtection.AUTHENTICATION.name().toLowerCase(Locale.ROOT))); } + public static Map<String, String> getSaslProps() { + return saslProps; + } + public static <T extends TokenIdentifier> T getIdentifier(String id, SecretManager<T> secretManager) throws InvalidToken { byte[] tokenId = SaslUtil.decodeIdentifier(id); 
http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessorNoCluster.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessorNoCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessorNoCluster.java index c537fe0..96ac5af 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessorNoCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessorNoCluster.java @@ -22,6 +22,9 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; + import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -32,7 +35,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.HConnectionTestingUtility; import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse; @@ -47,9 +50,6 @@ import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; -import com.google.protobuf.RpcController; -import com.google.protobuf.ServiceException; - /** * Test MetaTableAccessor but without spinning up a cluster. * We mock regionserver back and forth (we do spin up a zk cluster). 
@@ -163,7 +163,7 @@ public class TestMetaTableAccessorNoCluster { .thenThrow(new ServiceException("Server not running (3 of 3)")) .thenAnswer(new Answer<ScanResponse>() { public ScanResponse answer(InvocationOnMock invocation) throws Throwable { - ((PayloadCarryingRpcController) invocation.getArguments()[0]).setCellScanner(CellUtil + ((HBaseRpcController) invocation.getArguments()[0]).setCellScanner(CellUtil .createCellScanner(cellScannables)); return builder.setScannerId(1234567890L).setMoreResults(false).build(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableLocator.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableLocator.java index de80a7b..350dd84 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableLocator.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableLocator.java @@ -19,9 +19,12 @@ package org.apache.hadoop.hbase; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.assertFalse; + +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; import java.io.IOException; import java.net.ConnectException; @@ -30,8 +33,9 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.ClusterConnection; +import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HConnectionTestingUtility; -import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; import 
org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; import org.apache.hadoop.hbase.master.RegionState; @@ -53,9 +57,6 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import org.mockito.Mockito; -import com.google.protobuf.RpcController; -import com.google.protobuf.ServiceException; - /** * Test {@link org.apache.hadoop.hbase.zookeeper.MetaTableLocator} */ @@ -253,7 +254,7 @@ public class TestMetaTableLocator { thenReturn(implementation); RpcControllerFactory controllerFactory = Mockito.mock(RpcControllerFactory.class); Mockito.when(controllerFactory.newController()).thenReturn( - Mockito.mock(PayloadCarryingRpcController.class)); + Mockito.mock(HBaseRpcController.class)); Mockito.when(connection.getRpcControllerFactory()).thenReturn(controllerFactory); ServerName sn = ServerName.valueOf("example.com", 1234, System.currentTimeMillis()); http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java index a475e5a..711f520 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java @@ -22,40 +22,38 @@ package org.apache.hadoop.hbase.client; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import com.google.protobuf.BlockingRpcChannel; +import com.google.protobuf.Descriptors.MethodDescriptor; +import com.google.protobuf.Message; +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; + +import java.net.InetSocketAddress; import java.net.SocketAddress; import 
java.net.SocketTimeoutException; import java.net.UnknownHostException; import java.util.Random; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.MasterNotRunningException; -import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ipc.AbstractRpcClient; -import org.apache.hadoop.hbase.ipc.RpcClient; +import org.apache.hadoop.hbase.ipc.BlockingRpcClient; import org.apache.hadoop.hbase.ipc.RpcClientFactory; -import org.apache.hadoop.hbase.ipc.RpcClientImpl; import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; -import com.google.protobuf.BlockingRpcChannel; -import com.google.protobuf.Descriptors.MethodDescriptor; -import com.google.protobuf.Message; -import com.google.protobuf.RpcController; -import com.google.protobuf.ServiceException; - -@Category(MediumTests.class) +@Category({MediumTests.class, ClientTests.class}) public class TestClientTimeouts { - private static final Log LOG = LogFactory.getLog(TestClientTimeouts.class); private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); protected static int SLAVES = 1; @@ -86,7 +84,6 @@ public class TestClientTimeouts { */ @Test public void testAdminTimeout() throws Exception { - Connection lastConnection = null; boolean lastFailed = false; int initialInvocations = RandomTimeoutBlockingRpcChannel.invokations.get(); RandomTimeoutRpcClient rpcClient = 
(RandomTimeoutRpcClient) RpcClientFactory @@ -98,12 +95,11 @@ public class TestClientTimeouts { // Ensure the HBaseAdmin uses a new connection by changing Configuration. Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); conf.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1)); - HBaseAdmin admin = null; + Admin admin = null; + Connection connection = null; try { - admin = new HBaseAdmin(conf); - Connection connection = admin.getConnection(); - assertFalse(connection == lastConnection); - lastConnection = connection; + connection = ConnectionFactory.createConnection(conf); + admin = connection.getAdmin(); // run some admin commands HBaseAdmin.checkHBaseAvailable(conf); admin.setBalancerRunning(false, false); @@ -112,10 +108,15 @@ public class TestClientTimeouts { // a MasterNotRunningException. It's a bug if we get other exceptions. lastFailed = true; } finally { - admin.close(); - if (admin.getConnection().isClosed()) { - rpcClient = (RandomTimeoutRpcClient) RpcClientFactory - .createClient(TEST_UTIL.getConfiguration(), TEST_UTIL.getClusterKey()); + if(admin != null) { + admin.close(); + if (admin.getConnection().isClosed()) { + rpcClient = (RandomTimeoutRpcClient) RpcClientFactory + .createClient(TEST_UTIL.getConfiguration(), TEST_UTIL.getClusterKey()); + } + } + if(connection != null) { + connection.close(); } } } @@ -130,7 +131,7 @@ public class TestClientTimeouts { /** * Rpc Channel implementation with RandomTimeoutBlockingRpcChannel */ - public static class RandomTimeoutRpcClient extends RpcClientImpl { + public static class RandomTimeoutRpcClient extends BlockingRpcClient { public RandomTimeoutRpcClient(Configuration conf, String clusterId, SocketAddress localAddr, MetricsConnection metrics) { super(conf, clusterId, localAddr, metrics); @@ -153,9 +154,9 @@ public class TestClientTimeouts { public static final double CHANCE_OF_TIMEOUT = 0.3; private static AtomicInteger invokations = new AtomicInteger(); - 
RandomTimeoutBlockingRpcChannel(final RpcClientImpl rpcClient, final ServerName sn, - final User ticket, final int rpcTimeout) throws UnknownHostException { - super(rpcClient, sn, ticket, rpcTimeout); + RandomTimeoutBlockingRpcChannel(final BlockingRpcClient rpcClient, final ServerName sn, + final User ticket, final int rpcTimeout) { + super(rpcClient, new InetSocketAddress(sn.getHostname(), sn.getPort()), ticket, rpcTimeout); } @Override @@ -172,4 +173,4 @@ public class TestClientTimeouts { return super.callBlockingMethod(md, controller, param, returnType); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHBaseAdminNoCluster.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHBaseAdminNoCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHBaseAdminNoCluster.java index b788e35..ad406b4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHBaseAdminNoCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHBaseAdminNoCluster.java @@ -21,6 +21,9 @@ import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; + import java.io.IOException; import java.util.ArrayList; @@ -31,12 +34,11 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.MasterNotRunningException; import org.apache.hadoop.hbase.PleaseHoldException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.ZooKeeperConnectionException; 
-import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.BalanceRequest; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateTableRequest; @@ -49,6 +51,7 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MoveRegionRequest import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.OfflineRegionRequest; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RunCatalogScanRequest; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetBalancerRunningRequest; +import org.apache.hadoop.hbase.testclassification.SmallTests; import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -57,9 +60,6 @@ import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; -import com.google.protobuf.RpcController; -import com.google.protobuf.ServiceException; - @Category(SmallTests.class) public class TestHBaseAdminNoCluster { @@ -314,7 +314,7 @@ public class TestHBaseAdminNoCluster { RpcControllerFactory rpcControllerFactory = Mockito.mock(RpcControllerFactory.class); Mockito.when(connection.getRpcControllerFactory()).thenReturn(rpcControllerFactory); Mockito.when(rpcControllerFactory.newController()).thenReturn( - Mockito.mock(PayloadCarryingRpcController.class)); + Mockito.mock(HBaseRpcController.class)); // we need a real retrying caller RpcRetryingCallerFactory callerFactory = new RpcRetryingCallerFactory(configuration); http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java index f468c16..6622ae9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java @@ -18,6 +18,13 @@ */ package org.apache.hadoop.hbase.client; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + import com.google.common.collect.Lists; import java.io.IOException; @@ -61,7 +68,6 @@ import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.exceptions.RegionMovedException; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.FilterBase; -import org.apache.hadoop.hbase.ipc.CallTimeoutException; import org.apache.hadoop.hbase.ipc.RpcClient; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.ipc.ServerTooBusyException; @@ -86,13 +92,6 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.TestRule; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - /** * This class is for testing HBaseConnectionManager features */ @@ -726,8 +725,11 @@ public class TestHCM { c2.setInt(HConstants.HBASE_CLIENT_PAUSE, 1); // don't wait between retries. 
c2.setInt(RpcClient.FAILED_SERVER_EXPIRY_KEY, 0); // Server do not really expire c2.setBoolean(RpcClient.SPECIFIC_WRITE_THREAD, allowsInterrupt); - - final HTable table = new HTable(c2, tableName); + c2.setInt(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, 2000); + c2.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 1000); + ConnectionManager.HConnectionImplementation conn = + (ConnectionManager.HConnectionImplementation) ConnectionManager.createConnection(c2); + final HTable table = (HTable) conn.getTable(tableName); Put put = new Put(ROW); put.add(FAM_NAM, ROW, ROW); @@ -749,6 +751,7 @@ public class TestHCM { done++; if (done % 100 == 0) LOG.info("done=" + done); + Thread.sleep(100); } } catch (Throwable t) { failed.set(t); @@ -766,8 +769,6 @@ public class TestHCM { }); ServerName sn = table.getRegionLocation(ROW).getServerName(); - ConnectionManager.HConnectionImplementation conn = - (ConnectionManager.HConnectionImplementation) table.getConnection(); RpcClient rpcClient = conn.getRpcClient(); LOG.info("Going to cancel connections. 
connection=" + conn.toString() + ", sn=" + sn); http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java index eb989d2..1d49460 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java @@ -21,6 +21,8 @@ import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import com.google.common.collect.Lists; + import java.io.IOException; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; @@ -30,21 +32,19 @@ import org.apache.hadoop.hbase.CellScannable; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.coprocessor.ProtobufCoprocessorService; -import org.apache.hadoop.hbase.ipc.DelegatingPayloadCarryingRpcController; -import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; +import org.apache.hadoop.hbase.ipc.DelegatingHBaseRpcController; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; +import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; 
-import com.google.common.collect.Lists; - @Category(MediumTests.class) public class TestRpcControllerFactory { @@ -55,27 +55,27 @@ public class TestRpcControllerFactory { } @Override - public PayloadCarryingRpcController newController() { + public HBaseRpcController newController() { return new CountingRpcController(super.newController()); } @Override - public PayloadCarryingRpcController newController(final CellScanner cellScanner) { + public HBaseRpcController newController(final CellScanner cellScanner) { return new CountingRpcController(super.newController(cellScanner)); } @Override - public PayloadCarryingRpcController newController(final List<CellScannable> cellIterables) { + public HBaseRpcController newController(final List<CellScannable> cellIterables) { return new CountingRpcController(super.newController(cellIterables)); } } - public static class CountingRpcController extends DelegatingPayloadCarryingRpcController { + public static class CountingRpcController extends DelegatingHBaseRpcController { private static AtomicInteger INT_PRIORITY = new AtomicInteger(); private static AtomicInteger TABLE_PRIORITY = new AtomicInteger(); - public CountingRpcController(PayloadCarryingRpcController delegate) { + public CountingRpcController(HBaseRpcController delegate) { super(delegate); } http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/ProtobufCoprocessorService.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/ProtobufCoprocessorService.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/ProtobufCoprocessorService.java index cdda28a..da12683 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/ProtobufCoprocessorService.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/ProtobufCoprocessorService.java @@ -23,19 +23,24 @@ import 
com.google.protobuf.RpcController; import com.google.protobuf.Service; import org.apache.hadoop.hbase.Coprocessor; import org.apache.hadoop.hbase.CoprocessorEnvironment; +import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.AddrResponseProto; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.PauseRequestProto; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos; import org.apache.hadoop.hbase.protobuf.ResponseConverter; +import org.apache.hadoop.hbase.util.Threads; import java.io.IOException; /** * Test implementation of a coprocessor endpoint exposing the - * {@link TestRpcServiceProtos.TestProtobufRpcProto} service methods. For internal use by - * unit tests only. + * {@link TestRpcServiceProtos.TestProtobufRpcProto} service methods. For internal use by unit tests + * only. 
*/ -public class ProtobufCoprocessorService - extends TestRpcServiceProtos.TestProtobufRpcProto +public class ProtobufCoprocessorService extends TestRpcServiceProtos.TestProtobufRpcProto implements CoprocessorService, Coprocessor { public ProtobufCoprocessorService() { } @@ -47,31 +52,46 @@ public class ProtobufCoprocessorService @Override public void ping(RpcController controller, TestProtos.EmptyRequestProto request, - RpcCallback<TestProtos.EmptyResponseProto> done) { + RpcCallback<TestProtos.EmptyResponseProto> done) { done.run(TestProtos.EmptyResponseProto.getDefaultInstance()); } @Override public void echo(RpcController controller, TestProtos.EchoRequestProto request, - RpcCallback<TestProtos.EchoResponseProto> done) { + RpcCallback<TestProtos.EchoResponseProto> done) { String message = request.getMessage(); done.run(TestProtos.EchoResponseProto.newBuilder().setMessage(message).build()); } @Override public void error(RpcController controller, TestProtos.EmptyRequestProto request, - RpcCallback<TestProtos.EmptyResponseProto> done) { + RpcCallback<TestProtos.EmptyResponseProto> done) { ResponseConverter.setControllerException(controller, new IOException("Test exception")); done.run(null); } @Override + public void pause(RpcController controller, PauseRequestProto request, + RpcCallback<EmptyResponseProto> done) { + Threads.sleep(request.getMs()); + done.run(EmptyResponseProto.getDefaultInstance()); + } + + @Override + public void addr(RpcController controller, EmptyRequestProto request, + RpcCallback<AddrResponseProto> done) { + done.run(AddrResponseProto.newBuilder().setAddr(RpcServer.getRemoteAddress().getHostAddress()) + .build()); + } + + @Override public void start(CoprocessorEnvironment env) throws IOException { - //To change body of implemented methods use File | Settings | File Templates. + // To change body of implemented methods use File | Settings | File Templates. 
} @Override public void stop(CoprocessorEnvironment env) throws IOException { - //To change body of implemented methods use File | Settings | File Templates. + // To change body of implemented methods use File | Settings | File Templates. } + }
