http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-client/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcClient.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcClient.java index ce32ed9..3f43f7f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcClient.java @@ -25,17 +25,8 @@ import java.io.DataOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.util.Map; -import javax.security.auth.callback.Callback; -import javax.security.auth.callback.CallbackHandler; -import javax.security.auth.callback.NameCallback; -import javax.security.auth.callback.PasswordCallback; -import javax.security.auth.callback.UnsupportedCallbackException; -import javax.security.sasl.RealmCallback; -import javax.security.sasl.RealmChoiceCallback; import javax.security.sasl.Sasl; -import javax.security.sasl.SaslClient; import javax.security.sasl.SaslException; import org.apache.commons.logging.Log; @@ -48,105 +39,23 @@ import org.apache.hadoop.security.SaslOutputStream; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; -import com.google.common.annotations.VisibleForTesting; - /** - * A utility class that encapsulates SASL logic for RPC client. - * Copied from <code>org.apache.hadoop.security</code> + * A utility class that encapsulates SASL logic for RPC client. 
Copied from + * <code>org.apache.hadoop.security</code> */ @InterfaceAudience.Private -public class HBaseSaslRpcClient { - private static final Log LOG = LogFactory.getLog(HBaseSaslRpcClient.class); +public class HBaseSaslRpcClient extends AbstractHBaseSaslRpcClient { - private final SaslClient saslClient; - private final boolean fallbackAllowed; - protected final Map<String, String> saslProps; - /** - * Create a HBaseSaslRpcClient for an authentication method - * - * @param method - * the requested authentication method - * @param token - * token to use if needed by the authentication method - * @param serverPrincipal - * the server principal that we are trying to set the connection up to - * @param fallbackAllowed - * does the client allow fallback to simple authentication - * @throws IOException - */ - public HBaseSaslRpcClient(AuthMethod method, - Token<? extends TokenIdentifier> token, String serverPrincipal, boolean fallbackAllowed) - throws IOException { - this(method, token, serverPrincipal, fallbackAllowed, "authentication"); - } - /** - * Create a HBaseSaslRpcClient for an authentication method - * - * @param method - * the requested authentication method - * @param token - * token to use if needed by the authentication method - * @param serverPrincipal - * the server principal that we are trying to set the connection up to - * @param fallbackAllowed - * does the client allow fallback to simple authentication - * @param rpcProtection - * the protection level ("authentication", "integrity" or "privacy") - * @throws IOException - */ - public HBaseSaslRpcClient(AuthMethod method, - Token<? 
extends TokenIdentifier> token, String serverPrincipal, boolean fallbackAllowed, - String rpcProtection) throws IOException { - this.fallbackAllowed = fallbackAllowed; - saslProps = SaslUtil.initSaslProperties(rpcProtection); - switch (method) { - case DIGEST: - if (LOG.isDebugEnabled()) - LOG.debug("Creating SASL " + AuthMethod.DIGEST.getMechanismName() - + " client to authenticate to service at " + token.getService()); - saslClient = createDigestSaslClient( - new String[] { AuthMethod.DIGEST.getMechanismName() }, - SaslUtil.SASL_DEFAULT_REALM, new SaslClientCallbackHandler(token)); - break; - case KERBEROS: - if (LOG.isDebugEnabled()) { - LOG - .debug("Creating SASL " + AuthMethod.KERBEROS.getMechanismName() - + " client. Server's Kerberos principal name is " - + serverPrincipal); - } - if (serverPrincipal == null || serverPrincipal.length() == 0) { - throw new IOException( - "Failed to specify server's Kerberos principal name"); - } - String[] names = SaslUtil.splitKerberosName(serverPrincipal); - if (names.length != 3) { - throw new IOException( - "Kerberos principal does not have the expected format: " - + serverPrincipal); - } - saslClient = createKerberosSaslClient( - new String[] { AuthMethod.KERBEROS.getMechanismName() }, - names[0], names[1]); - break; - default: - throw new IOException("Unknown authentication method " + method); - } - if (saslClient == null) - throw new IOException("Unable to find SASL client implementation"); - } + private static final Log LOG = LogFactory.getLog(HBaseSaslRpcClient.class); - protected SaslClient createDigestSaslClient(String[] mechanismNames, - String saslDefaultRealm, CallbackHandler saslClientCallbackHandler) - throws IOException { - return Sasl.createSaslClient(mechanismNames, null, null, saslDefaultRealm, - saslProps, saslClientCallbackHandler); + public HBaseSaslRpcClient(AuthMethod method, Token<? 
extends TokenIdentifier> token, + String serverPrincipal, boolean fallbackAllowed) throws IOException { + super(method, token, serverPrincipal, fallbackAllowed); } - protected SaslClient createKerberosSaslClient(String[] mechanismNames, - String userFirstPart, String userSecondPart) throws IOException { - return Sasl.createSaslClient(mechanismNames, null, userFirstPart, - userSecondPart, saslProps, null); + public HBaseSaslRpcClient(AuthMethod method, Token<? extends TokenIdentifier> token, + String serverPrincipal, boolean fallbackAllowed, String rpcProtection) throws IOException { + super(method, token, serverPrincipal, fallbackAllowed, rpcProtection); } private static void readStatus(DataInputStream inStream) throws IOException { @@ -158,72 +67,65 @@ public class HBaseSaslRpcClient { } /** - * Do client side SASL authentication with server via the given InputStream - * and OutputStream - * - * @param inS - * InputStream to use - * @param outS - * OutputStream to use - * @return true if connection is set up, or false if needs to switch - * to simple Auth. + * Do client side SASL authentication with server via the given InputStream and OutputStream + * @param inS InputStream to use + * @param outS OutputStream to use + * @return true if connection is set up, or false if needs to switch to simple Auth. 
* @throws IOException */ - public boolean saslConnect(InputStream inS, OutputStream outS) - throws IOException { + public boolean saslConnect(InputStream inS, OutputStream outS) throws IOException { DataInputStream inStream = new DataInputStream(new BufferedInputStream(inS)); - DataOutputStream outStream = new DataOutputStream(new BufferedOutputStream( - outS)); + DataOutputStream outStream = new DataOutputStream(new BufferedOutputStream(outS)); try { - byte[] saslToken = new byte[0]; - if (saslClient.hasInitialResponse()) - saslToken = saslClient.evaluateChallenge(saslToken); + byte[] saslToken = getInitialResponse(); if (saslToken != null) { outStream.writeInt(saslToken.length); outStream.write(saslToken, 0, saslToken.length); outStream.flush(); - if (LOG.isDebugEnabled()) - LOG.debug("Have sent token of size " + saslToken.length - + " from initSASLContext."); + if (LOG.isDebugEnabled()) { + LOG.debug("Have sent token of size " + saslToken.length + " from initSASLContext."); + } } - if (!saslClient.isComplete()) { + if (!isComplete()) { readStatus(inStream); int len = inStream.readInt(); if (len == SaslUtil.SWITCH_TO_SIMPLE_AUTH) { if (!fallbackAllowed) { - throw new IOException("Server asks us to fall back to SIMPLE auth, " + - "but this client is configured to only allow secure connections."); + throw new IOException("Server asks us to fall back to SIMPLE auth, " + + "but this client is configured to only allow secure connections."); } if (LOG.isDebugEnabled()) { LOG.debug("Server asks us to fall back to simple auth."); } - saslClient.dispose(); + dispose(); return false; } saslToken = new byte[len]; - if (LOG.isDebugEnabled()) + if (LOG.isDebugEnabled()) { LOG.debug("Will read input token of size " + saslToken.length + " for processing by initSASLContext"); + } inStream.readFully(saslToken); } - while (!saslClient.isComplete()) { - saslToken = saslClient.evaluateChallenge(saslToken); + while (!isComplete()) { + saslToken = evaluateChallenge(saslToken); if 
(saslToken != null) { - if (LOG.isDebugEnabled()) - LOG.debug("Will send token of size " + saslToken.length - + " from initSASLContext."); + if (LOG.isDebugEnabled()) { + LOG.debug("Will send token of size " + saslToken.length + " from initSASLContext."); + } outStream.writeInt(saslToken.length); outStream.write(saslToken, 0, saslToken.length); outStream.flush(); } - if (!saslClient.isComplete()) { + if (!isComplete()) { readStatus(inStream); saslToken = new byte[inStream.readInt()]; - if (LOG.isDebugEnabled()) + if (LOG.isDebugEnabled()) { LOG.debug("Will read input token of size " + saslToken.length + " for processing by initSASLContext"); + } inStream.readFully(saslToken); } } @@ -243,11 +145,8 @@ public class HBaseSaslRpcClient { } /** - * Get a SASL wrapped InputStream. Can be called only after saslConnect() has - * been called. - * - * @param in - * the InputStream to wrap + * Get a SASL wrapped InputStream. Can be called only after saslConnect() has been called. + * @param in the InputStream to wrap * @return a SASL wrapped InputStream * @throws IOException */ @@ -259,11 +158,8 @@ public class HBaseSaslRpcClient { } /** - * Get a SASL wrapped OutputStream. Can be called only after saslConnect() has - * been called. - * - * @param out - * the OutputStream to wrap + * Get a SASL wrapped OutputStream. Can be called only after saslConnect() has been called. + * @param out the OutputStream to wrap * @return a SASL wrapped OutputStream * @throws IOException */ @@ -273,58 +169,4 @@ public class HBaseSaslRpcClient { } return new SaslOutputStream(out, saslClient); } - - /** Release resources used by wrapped saslClient */ - public void dispose() throws SaslException { - saslClient.dispose(); - } - - @VisibleForTesting - static class SaslClientCallbackHandler implements CallbackHandler { - private final String userName; - private final char[] userPassword; - - public SaslClientCallbackHandler(Token<? 
extends TokenIdentifier> token) { - this.userName = SaslUtil.encodeIdentifier(token.getIdentifier()); - this.userPassword = SaslUtil.encodePassword(token.getPassword()); - } - - @Override - public void handle(Callback[] callbacks) - throws UnsupportedCallbackException { - NameCallback nc = null; - PasswordCallback pc = null; - RealmCallback rc = null; - for (Callback callback : callbacks) { - if (callback instanceof RealmChoiceCallback) { - continue; - } else if (callback instanceof NameCallback) { - nc = (NameCallback) callback; - } else if (callback instanceof PasswordCallback) { - pc = (PasswordCallback) callback; - } else if (callback instanceof RealmCallback) { - rc = (RealmCallback) callback; - } else { - throw new UnsupportedCallbackException(callback, - "Unrecognized SASL client callback"); - } - } - if (nc != null) { - if (LOG.isDebugEnabled()) - LOG.debug("SASL client callback: setting username: " + userName); - nc.setName(userName); - } - if (pc != null) { - if (LOG.isDebugEnabled()) - LOG.debug("SASL client callback: setting userPassword"); - pc.setPassword(userPassword); - } - if (rc != null) { - if (LOG.isDebugEnabled()) - LOG.debug("SASL client callback: setting realm: " - + rc.getDefaultText()); - rc.setText(rc.getDefaultText()); - } - } - } }
http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslChallengeDecoder.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslChallengeDecoder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslChallengeDecoder.java new file mode 100644 index 0000000..d818097 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslChallengeDecoder.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.security; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.ipc.RemoteException; + +/** + * Decode the sasl challenge sent by RpcServer. 
+ */ +@InterfaceAudience.Private +public class SaslChallengeDecoder extends ByteToMessageDecoder { + + private static final int MAX_CHALLENGE_SIZE = 1024 * 1024; // 1M + + private ByteBuf tryDecodeChallenge(ByteBuf in, int offset, int readableBytes) throws IOException { + if (readableBytes < 4) { + return null; + } + int len = in.getInt(offset); + if (len <= 0) { + // fall back to simple + in.readerIndex(offset + 4); + return in.retainedSlice(offset, 4); + } + if (len > MAX_CHALLENGE_SIZE) { + throw new IOException( + "Sasl challenge too large(" + len + "), max allowed is " + MAX_CHALLENGE_SIZE); + } + int totalLen = 4 + len; + if (readableBytes < totalLen) { + return null; + } + in.readerIndex(offset + totalLen); + return in.retainedSlice(offset, totalLen); + } + + // will throw a RemoteException out if data is enough, so do not need to return anything. + private void tryDecodeError(ByteBuf in, int offset, int readableBytes) throws IOException { + if (readableBytes < 4) { + return; + } + int classLen = in.getInt(offset); + if (classLen <= 0) { + throw new IOException("Invalid exception class name length " + classLen); + } + if (classLen > MAX_CHALLENGE_SIZE) { + throw new IOException("Exception class name length too large(" + classLen + + "), max allowed is " + MAX_CHALLENGE_SIZE); + } + if (readableBytes < 4 + classLen + 4) { + return; + } + int msgLen = in.getInt(offset + 4 + classLen); + if (msgLen <= 0) { + throw new IOException("Invalid exception message length " + msgLen); + } + if (msgLen > MAX_CHALLENGE_SIZE) { + throw new IOException("Exception message length too large(" + msgLen + "), max allowed is " + + MAX_CHALLENGE_SIZE); + } + int totalLen = classLen + msgLen + 8; + if (readableBytes < totalLen) { + return; + } + String className = in.toString(offset + 4, classLen, HConstants.UTF8_CHARSET); + String msg = in.toString(offset + classLen + 8, msgLen, HConstants.UTF8_CHARSET); + in.readerIndex(offset + totalLen); + throw new RemoteException(className, 
msg); + } + + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { + int readableBytes = in.readableBytes(); + if (readableBytes < 4) { + return; + } + int offset = in.readerIndex(); + int status = in.getInt(offset); + if (status == SaslStatus.SUCCESS.state) { + ByteBuf challenge = tryDecodeChallenge(in, offset + 4, readableBytes - 4); + if (challenge != null) { + out.add(challenge); + } + } else { + tryDecodeError(in, offset + 4, readableBytes - 4); + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslClientHandler.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslClientHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslClientHandler.java deleted file mode 100644 index d583e20..0000000 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslClientHandler.java +++ /dev/null @@ -1,382 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hbase.security; - -import io.netty.buffer.ByteBuf; -import io.netty.channel.Channel; -import io.netty.channel.ChannelDuplexHandler; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelPromise; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.ipc.RemoteException; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.security.token.TokenIdentifier; - -import javax.security.auth.callback.CallbackHandler; -import javax.security.sasl.Sasl; -import javax.security.sasl.SaslClient; -import javax.security.sasl.SaslException; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.charset.Charset; -import java.security.PrivilegedExceptionAction; -import java.util.Map; -import java.util.Random; - -/** - * Handles Sasl connections - */ -@InterfaceAudience.Private -public class SaslClientHandler extends ChannelDuplexHandler { - private static final Log LOG = LogFactory.getLog(SaslClientHandler.class); - - private final boolean fallbackAllowed; - - private final UserGroupInformation ticket; - - /** - * Used for client or server's token to send or receive from each other. 
- */ - private final SaslClient saslClient; - private final Map<String, String> saslProps; - private final SaslExceptionHandler exceptionHandler; - private final SaslSuccessfulConnectHandler successfulConnectHandler; - private byte[] saslToken; - private byte[] connectionHeader; - private boolean firstRead = true; - - private int retryCount = 0; - private Random random; - - /** - * @param ticket the ugi - * @param method auth method - * @param token for Sasl - * @param serverPrincipal Server's Kerberos principal name - * @param fallbackAllowed True if server may also fall back to less secure connection - * @param rpcProtection Quality of protection. Can be 'authentication', 'integrity' or - * 'privacy'. - * @throws java.io.IOException if handler could not be created - */ - public SaslClientHandler(UserGroupInformation ticket, AuthMethod method, - Token<? extends TokenIdentifier> token, String serverPrincipal, boolean fallbackAllowed, - String rpcProtection, byte[] connectionHeader, SaslExceptionHandler exceptionHandler, - SaslSuccessfulConnectHandler successfulConnectHandler) throws IOException { - this.ticket = ticket; - this.fallbackAllowed = fallbackAllowed; - this.connectionHeader = connectionHeader; - - this.exceptionHandler = exceptionHandler; - this.successfulConnectHandler = successfulConnectHandler; - - saslProps = SaslUtil.initSaslProperties(rpcProtection); - switch (method) { - case DIGEST: - if (LOG.isDebugEnabled()) - LOG.debug("Creating SASL " + AuthMethod.DIGEST.getMechanismName() - + " client to authenticate to service at " + token.getService()); - saslClient = createDigestSaslClient(new String[] { AuthMethod.DIGEST.getMechanismName() }, - SaslUtil.SASL_DEFAULT_REALM, new HBaseSaslRpcClient.SaslClientCallbackHandler(token)); - break; - case KERBEROS: - if (LOG.isDebugEnabled()) { - LOG.debug("Creating SASL " + AuthMethod.KERBEROS.getMechanismName() - + " client. 
Server's Kerberos principal name is " + serverPrincipal); - } - if (serverPrincipal == null || serverPrincipal.isEmpty()) { - throw new IOException("Failed to specify server's Kerberos principal name"); - } - String[] names = SaslUtil.splitKerberosName(serverPrincipal); - if (names.length != 3) { - throw new IOException( - "Kerberos principal does not have the expected format: " + serverPrincipal); - } - saslClient = createKerberosSaslClient(new String[] { AuthMethod.KERBEROS.getMechanismName() }, - names[0], names[1]); - break; - default: - throw new IOException("Unknown authentication method " + method); - } - if (saslClient == null) { - throw new IOException("Unable to find SASL client implementation"); - } - } - - /** - * Create a Digest Sasl client - */ - protected SaslClient createDigestSaslClient(String[] mechanismNames, String saslDefaultRealm, - CallbackHandler saslClientCallbackHandler) throws IOException { - return Sasl.createSaslClient(mechanismNames, null, null, saslDefaultRealm, saslProps, - saslClientCallbackHandler); - } - - /** - * Create Kerberos client - * - * @param userFirstPart first part of username - * @param userSecondPart second part of username - */ - protected SaslClient createKerberosSaslClient(String[] mechanismNames, String userFirstPart, - String userSecondPart) throws IOException { - return Sasl - .createSaslClient(mechanismNames, null, userFirstPart, userSecondPart, saslProps, - null); - } - - @Override - public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { - saslClient.dispose(); - } - - private byte[] evaluateChallenge(final byte[] challenge) throws Exception { - return ticket.doAs(new PrivilegedExceptionAction<byte[]>() { - - @Override - public byte[] run() throws Exception { - return saslClient.evaluateChallenge(challenge); - } - }); - } - - @Override - public void handlerAdded(final ChannelHandlerContext ctx) throws Exception { - saslToken = new byte[0]; - if (saslClient.hasInitialResponse()) { - 
saslToken = evaluateChallenge(saslToken); - } - if (saslToken != null) { - writeSaslToken(ctx, saslToken); - if (LOG.isDebugEnabled()) { - LOG.debug("Have sent token of size " + saslToken.length + " from initSASLContext."); - } - } - } - - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - ByteBuf in = (ByteBuf) msg; - - // If not complete, try to negotiate - if (!saslClient.isComplete()) { - while (!saslClient.isComplete() && in.isReadable()) { - readStatus(in); - int len = in.readInt(); - if (firstRead) { - firstRead = false; - if (len == SaslUtil.SWITCH_TO_SIMPLE_AUTH) { - if (!fallbackAllowed) { - throw new IOException("Server asks us to fall back to SIMPLE auth, " + "but this " - + "client is configured to only allow secure connections."); - } - if (LOG.isDebugEnabled()) { - LOG.debug("Server asks us to fall back to simple auth."); - } - saslClient.dispose(); - - ctx.pipeline().remove(this); - successfulConnectHandler.onSuccess(ctx.channel()); - return; - } - } - saslToken = new byte[len]; - if (LOG.isDebugEnabled()) { - LOG.debug("Will read input token of size " + saslToken.length - + " for processing by initSASLContext"); - } - in.readBytes(saslToken); - - saslToken = evaluateChallenge(saslToken); - if (saslToken != null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Will send token of size " + saslToken.length + " from initSASLContext."); - } - writeSaslToken(ctx, saslToken); - } - } - // release the memory - in.release(); - - if (saslClient.isComplete()) { - String qop = (String) saslClient.getNegotiatedProperty(Sasl.QOP); - - if (LOG.isDebugEnabled()) { - LOG.debug("SASL client context established. 
Negotiated QoP: " + qop); - } - - boolean useWrap = qop != null && !"auth".equalsIgnoreCase(qop); - - if (!useWrap) { - ctx.pipeline().remove(this); - successfulConnectHandler.onSuccess(ctx.channel()); - } else { - byte[] wrappedCH = saslClient.wrap(connectionHeader, 0, connectionHeader.length); - // write connection header - writeSaslToken(ctx, wrappedCH); - successfulConnectHandler.onSaslProtectionSucess(ctx.channel()); - } - } - } - // Normal wrapped reading - else { - try { - int length = in.readInt(); - if (LOG.isDebugEnabled()) { - LOG.debug("Actual length is " + length); - } - saslToken = new byte[length]; - in.readBytes(saslToken); - // release the memory - in.release(); - } catch (IndexOutOfBoundsException e) { - return; - } - try { - ByteBuf b = ctx.channel().alloc().buffer(saslToken.length); - - b.writeBytes(saslClient.unwrap(saslToken, 0, saslToken.length)); - ctx.fireChannelRead(b); - - } catch (SaslException se) { - try { - saslClient.dispose(); - } catch (SaslException ignored) { - LOG.debug("Ignoring SASL exception", ignored); - } - throw se; - } - } - } - - private void writeSaslToken(final ChannelHandlerContext ctx, byte[] saslToken) { - ByteBuf b = ctx.alloc().buffer(4 + saslToken.length); - b.writeInt(saslToken.length); - b.writeBytes(saslToken, 0, saslToken.length); - ctx.writeAndFlush(b).addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (!future.isSuccess()) { - exceptionCaught(ctx, future.cause()); - } - } - }); - } - - /** - * Get the read status - */ - private static void readStatus(ByteBuf inStream) throws RemoteException { - int status = inStream.readInt(); // read status - if (status != SaslStatus.SUCCESS.state) { - throw new RemoteException(inStream.toString(Charset.forName("UTF-8")), - inStream.toString(Charset.forName("UTF-8"))); - } - } - - @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) - throws Exception { - 
saslClient.dispose(); - - ctx.close(); - - if (this.random == null) { - this.random = new Random(); - } - exceptionHandler.handle(this.retryCount++, this.random, cause); - } - - @Override - public void write(final ChannelHandlerContext ctx, Object msg, ChannelPromise promise) - throws Exception { - // If not complete, try to negotiate - if (!saslClient.isComplete()) { - super.write(ctx, msg, promise); - } else { - ByteBuf in = (ByteBuf) msg; - byte[] unwrapped = new byte[in.readableBytes()]; - in.readBytes(unwrapped); - // release the memory - in.release(); - - try { - saslToken = saslClient.wrap(unwrapped, 0, unwrapped.length); - } catch (SaslException se) { - try { - saslClient.dispose(); - } catch (SaslException ignored) { - LOG.debug("Ignoring SASL exception", ignored); - } - promise.setFailure(se); - } - if (saslToken != null) { - ByteBuf out = ctx.channel().alloc().buffer(4 + saslToken.length); - out.writeInt(saslToken.length); - out.writeBytes(saslToken, 0, saslToken.length); - - ctx.write(out).addListener(new ChannelFutureListener() { - @Override public void operationComplete(ChannelFuture future) throws Exception { - if (!future.isSuccess()) { - exceptionCaught(ctx, future.cause()); - } - } - }); - - saslToken = null; - } - } - } - - /** - * Handler for exceptions during Sasl connection - */ - public interface SaslExceptionHandler { - /** - * Handle the exception - * - * @param retryCount current retry count - * @param random to create new backoff with - */ - public void handle(int retryCount, Random random, Throwable cause); - } - - /** - * Handler for successful connects - */ - public interface SaslSuccessfulConnectHandler { - /** - * Runs on success - * - * @param channel which is successfully authenticated - */ - public void onSuccess(Channel channel); - - /** - * Runs on success if data protection used in Sasl - * - * @param channel which is successfully authenticated - */ - public void onSaslProtectionSucess(Channel channel); - } -} 
http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslUnwrapHandler.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslUnwrapHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslUnwrapHandler.java
new file mode 100644
index 0000000..c2faf91
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslUnwrapHandler.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+
+import javax.security.sasl.SaslClient;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Unwrap sasl messages. Should be placed after a
+ * {@link io.netty.handler.codec.LengthFieldBasedFrameDecoder}, because every inbound
+ * {@link ByteBuf} is unwrapped as one complete sasl message.
+ */
+@InterfaceAudience.Private
+public class SaslUnwrapHandler extends SimpleChannelInboundHandler<ByteBuf> {
+
+  private final SaslClient saslClient;
+
+  /**
+   * @param saslClient the client used to unwrap inbound messages; it is disposed when the channel
+   *          becomes inactive
+   */
+  public SaslUnwrapHandler(SaslClient saslClient) {
+    this.saslClient = saslClient;
+  }
+
+  /**
+   * Dispose the sasl client and pass the event on to the next inbound handler.
+   */
+  @Override
+  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+    SaslUtil.safeDispose(saslClient);
+    // We override the default implementation, so we must forward the event ourselves, otherwise
+    // the handlers after us would never be told that the channel has gone inactive.
+    ctx.fireChannelInactive();
+  }
+
+  /**
+   * Copy the readable bytes of {@code msg} onto the heap, unwrap them and fire the plain content
+   * to the next inbound handler.
+   */
+  @Override
+  protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
+    byte[] bytes = new byte[msg.readableBytes()];
+    msg.readBytes(bytes);
+    ctx.fireChannelRead(Unpooled.wrappedBuffer(saslClient.unwrap(bytes, 0, bytes.length)));
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslUtil.java
index b505fc0..aaa9d7a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslUtil.java
@@ -22,6 +22,8 @@ import java.util.Map;
 import java.util.TreeMap;
 
 import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
 
 import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.logging.Log;
@@ -30,7 +32,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 
 @InterfaceAudience.Private
 public class SaslUtil {
-  private static final Log log = LogFactory.getLog(SaslUtil.class);
+  private static final Log LOG = LogFactory.getLog(SaslUtil.class);
   public static final String SASL_DEFAULT_REALM = "default";
   public static final int SWITCH_TO_SIMPLE_AUTH = -88;
 
@@ -51,7 +53,7 @@ public class SaslUtil {
 
     public boolean matches(String stringQop) {
       if (saslQop.equals(stringQop)) {
-        log.warn("Use authentication/integrity/privacy as value for rpc protection "
+        LOG.warn("Use authentication/integrity/privacy as value for rpc protection "
             + "configurations instead of auth/auth-int/auth-conf.");
         return true;
       }
@@ -113,4 +115,15 @@ public class SaslUtil {
     saslProps.put(Sasl.SERVER_AUTH, "true");
     return saslProps;
   }
+
+  /**
+   * Dispose the given sasl client, logging instead of propagating any {@link SaslException}.
+   */
+  static void safeDispose(SaslClient saslClient) {
+    try {
+      saslClient.dispose();
+    } catch (SaslException e) {
+      LOG.error("Error disposing of SASL client", e);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslWrapHandler.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslWrapHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslWrapHandler.java
new file mode 100644
index 0000000..ddb4ae9
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslWrapHandler.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.CoalescingBufferQueue;
+import io.netty.util.concurrent.PromiseCombiner;
+
+import javax.security.sasl.SaslClient;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Wrap sasl messages. Outbound {@link ByteBuf}s are queued by {@link #write} and, on
+ * {@link #flush}, wrapped together as one sasl message prefixed with its 4 byte length.
+ */
+@InterfaceAudience.Private
+public class SaslWrapHandler extends ChannelOutboundHandlerAdapter {
+
+  private final SaslClient saslClient;
+
+  // Holds the outbound buffers, and their promises, until the next flush.
+  private CoalescingBufferQueue queue;
+
+  /**
+   * @param saslClient the client used to wrap outbound messages
+   */
+  public SaslWrapHandler(SaslClient saslClient) {
+    this.saslClient = saslClient;
+  }
+
+  @Override
+  public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
+    queue = new CoalescingBufferQueue(ctx.channel());
+  }
+
+  /**
+   * Queue {@link ByteBuf} messages until the next flush; anything else is passed through as is.
+   */
+  @Override
+  public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
+      throws Exception {
+    if (msg instanceof ByteBuf) {
+      queue.add((ByteBuf) msg, promise);
+    } else {
+      ctx.write(msg, promise);
+    }
+  }
+
+  /**
+   * Wrap everything queued so far as a single sasl message, write its length and content to the
+   * next outbound handler, and then flush.
+   */
+  @Override
+  public void flush(ChannelHandlerContext ctx) throws Exception {
+    if (!queue.isEmpty()) {
+      ChannelPromise promise = ctx.newPromise();
+      int readableBytes = queue.readableBytes();
+      ByteBuf buf = queue.remove(readableBytes, promise);
+      byte[] bytes = new byte[readableBytes];
+      try {
+        buf.readBytes(bytes);
+      } finally {
+        // The queue handed the ownership of buf to us and we only need the copy in bytes, so
+        // release it here, otherwise the buffer is leaked.
+        buf.release();
+      }
+      byte[] wrapperBytes = saslClient.wrap(bytes, 0, bytes.length);
+      ChannelPromise lenPromise = ctx.newPromise();
+      ctx.write(ctx.alloc().buffer(4).writeInt(wrapperBytes.length), lenPromise);
+      ChannelPromise contentPromise = ctx.newPromise();
+      ctx.write(Unpooled.wrappedBuffer(wrapperBytes), contentPromise);
+      // promise, and with it the promises of the queued writes, completes once both the length
+      // and the content have been written.
+      PromiseCombiner combiner = new PromiseCombiner();
+      combiner.addAll(lenPromise, contentPromise);
+      combiner.finish(promise);
+    }
+    ctx.flush();
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestCellBlockBuilder.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestCellBlockBuilder.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestCellBlockBuilder.java index 60ef357..ccabe66 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestCellBlockBuilder.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestCellBlockBuilder.java @@ -27,7 +27,6 @@ import org.apache.commons.lang.time.StopWatch; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.impl.Log4JLogger; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CellUtil; @@ -53,11 +52,11 @@ public class TestCellBlockBuilder { private static final Log LOG = LogFactory.getLog(TestCellBlockBuilder.class); - CellBlockBuilder builder; + private CellBlockBuilder builder; @Before public void before() { - this.builder = new CellBlockBuilder(new Configuration()); + this.builder = new CellBlockBuilder(HBaseConfiguration.create()); } @Test @@ -164,9 +163,9 @@ public class TestCellBlockBuilder { + count + ", size=" + size + ", + took=" + timer.getTime() + "ms"); } - private static void timerTest(final CellBlockBuilder builder, final StopWatch timer, final int count, - final int size, final Codec codec, final CompressionCodec compressor, final boolean sized) - throws IOException { + private static void timerTest(final CellBlockBuilder builder, final StopWatch timer, + final int count, final int size, final Codec codec, final CompressionCodec compressor, + final boolean sized) throws IOException { doBuildCellBlockUndoCellBlock(builder, codec, compressor, count, size, sized); } @@ -187,10 +186,10 @@ 
public class TestCellBlockBuilder { usage(1); } } - CellBlockBuilder buildr = new CellBlockBuilder(HBaseConfiguration.create()); + CellBlockBuilder builder = new CellBlockBuilder(HBaseConfiguration.create()); ((Log4JLogger) CellBlockBuilder.LOG).getLogger().setLevel(Level.ALL); - timerTests(buildr, count, size, new KeyValueCodec(), null); - timerTests(buildr, count, size, new KeyValueCodec(), new DefaultCodec()); - timerTests(buildr, count, size, new KeyValueCodec(), new GzipCodec()); + timerTests(builder, count, size, new KeyValueCodec(), null); + timerTests(builder, count, size, new KeyValueCodec(), new DefaultCodec()); + timerTests(builder, count, size, new KeyValueCodec(), new GzipCodec()); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcClientDeprecatedNameMapping.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcClientDeprecatedNameMapping.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcClientDeprecatedNameMapping.java new file mode 100644 index 0000000..7ac5c2e --- /dev/null +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcClientDeprecatedNameMapping.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.junit.Assert.assertThat; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ ClientTests.class, SmallTests.class }) +public class TestRpcClientDeprecatedNameMapping { + + @Test + public void test() { + Configuration conf = HBaseConfiguration.create(); + conf.set(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, BlockingRpcClient.class.getName()); + try (RpcClient client = RpcClientFactory.createClient(conf, HConstants.CLUSTER_ID_DEFAULT)) { + assertThat(client, instanceOf(BlockingRpcClient.class)); + } + conf.set(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, + "org.apache.hadoop.hbase.ipc.RpcClientImpl"); + try (RpcClient client = RpcClientFactory.createClient(conf, HConstants.CLUSTER_ID_DEFAULT)) { + assertThat(client, instanceOf(BlockingRpcClient.class)); + } + conf.set(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, NettyRpcClient.class.getName()); + try (RpcClient client = RpcClientFactory.createClient(conf, HConstants.CLUSTER_ID_DEFAULT)) { + assertThat(client, instanceOf(NettyRpcClient.class)); + } + conf.set(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, + "org.apache.hadoop.hbase.ipc.AsyncRpcClient"); + try 
(RpcClient client = RpcClientFactory.createClient(conf, HConstants.CLUSTER_ID_DEFAULT)) { + assertThat(client, instanceOf(NettyRpcClient.class)); + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-client/src/test/java/org/apache/hadoop/hbase/security/TestHBaseSaslRpcClient.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/security/TestHBaseSaslRpcClient.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/security/TestHBaseSaslRpcClient.java index 0e3aeab..12b3661 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/security/TestHBaseSaslRpcClient.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/security/TestHBaseSaslRpcClient.java @@ -27,6 +27,8 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import com.google.common.base.Strings; + import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -37,14 +39,14 @@ import javax.security.auth.callback.NameCallback; import javax.security.auth.callback.PasswordCallback; import javax.security.auth.callback.TextOutputCallback; import javax.security.auth.callback.UnsupportedCallbackException; -import javax.security.sasl.Sasl; import javax.security.sasl.RealmCallback; import javax.security.sasl.RealmChoiceCallback; +import javax.security.sasl.Sasl; import javax.security.sasl.SaslClient; +import org.apache.hadoop.hbase.security.AbstractHBaseSaslRpcClient.SaslClientCallbackHandler; import org.apache.hadoop.hbase.testclassification.SecurityTests; import org.apache.hadoop.hbase.testclassification.SmallTests; -import org.apache.hadoop.hbase.security.HBaseSaslRpcClient.SaslClientCallbackHandler; import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.security.token.Token; @@ -58,8 +60,6 @@ import 
org.junit.experimental.categories.Category; import org.junit.rules.ExpectedException; import org.mockito.Mockito; -import com.google.common.base.Strings; - @Category({SecurityTests.class, SmallTests.class}) public class TestHBaseSaslRpcClient { http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java ---------------------------------------------------------------------- diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java index 28c19ad..f41efc7 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java @@ -47,7 +47,6 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto; @@ -93,15 +92,13 @@ public class IntegrationTestRpcClient { } } - protected AbstractRpcClient createRpcClient(Configuration conf, boolean isSyncClient) { - return isSyncClient ? - new RpcClientImpl(conf, HConstants.CLUSTER_ID_DEFAULT) : - new AsyncRpcClient(conf) { - @Override - Codec getCodec() { - return null; - } - }; + protected AbstractRpcClient<?> createRpcClient(Configuration conf, boolean isSyncClient) { + return isSyncClient ? 
new BlockingRpcClient(conf) : new NettyRpcClient(conf) { + @Override + Codec getCodec() { + return null; + } + }; } static String BIG_PAYLOAD; @@ -258,7 +255,7 @@ public class IntegrationTestRpcClient { } static class SimpleClient extends Thread { - AbstractRpcClient rpcClient; + AbstractRpcClient<?> rpcClient; AtomicBoolean running = new AtomicBoolean(true); AtomicBoolean sending = new AtomicBoolean(false); AtomicReference<Throwable> exception = new AtomicReference<>(null); @@ -267,7 +264,7 @@ public class IntegrationTestRpcClient { long numCalls = 0; Random random = new Random(); - public SimpleClient(Cluster cluster, AbstractRpcClient rpcClient, String id) { + public SimpleClient(Cluster cluster, AbstractRpcClient<?> rpcClient, String id) { this.cluster = cluster; this.rpcClient = rpcClient; this.id = id; @@ -327,7 +324,7 @@ public class IntegrationTestRpcClient { cluster.startServer(); conf.setBoolean(SPECIFIC_WRITE_THREAD, true); for(int i = 0; i <1000; i++) { - AbstractRpcClient rpcClient = createRpcClient(conf, true); + AbstractRpcClient<?> rpcClient = createRpcClient(conf, true); SimpleClient client = new SimpleClient(cluster, rpcClient, "Client1"); client.start(); while(!client.isSending()) { @@ -419,7 +416,7 @@ public class IntegrationTestRpcClient { ArrayList<SimpleClient> clients = new ArrayList<>(); // all threads should share the same rpc client - AbstractRpcClient rpcClient = createRpcClient(conf, isSyncClient); + AbstractRpcClient<?> rpcClient = createRpcClient(conf, isSyncClient); for (int i = 0; i < 30; i++) { String clientId = "client_" + i + "_"; http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index f611796..0df5097 100644 --- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -157,7 +157,7 @@ import com.google.protobuf.TextFormat; * CallRunner#run executes the call. When done, asks the included Call to put itself on new * queue for Responder to pull from and return result to client. * - * @see RpcClientImpl + * @see BlockingRpcClient */ @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX}) @InterfaceStability.Evolving http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java index 663535b..f97dfb4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java @@ -863,6 +863,7 @@ public class ServerManager { } long expiration = timeout + System.currentTimeMillis(); while (System.currentTimeMillis() < expiration) { + controller.reset(); try { HRegionInfo rsRegion = ProtobufUtil.getRegionInfo(controller, rs, region.getRegionName()); http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-server/src/main/java/org/apache/hadoop/hbase/util/MultiHConnection.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/MultiHConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/MultiHConnection.java index 1688874..5934e07 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/MultiHConnection.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/MultiHConnection.java @@ -39,7 +39,6 @@ import 
org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Row; import org.apache.hadoop.hbase.client.coprocessor.Batch; -import org.apache.hadoop.hbase.ipc.AsyncRpcClient; /** * Provides ability to create multiple Connection instances and allows to process a batch of http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java index f49c558..0349ca5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java @@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.client; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import java.net.InetSocketAddress; import java.net.SocketAddress; import java.net.SocketTimeoutException; import java.net.UnknownHostException; @@ -36,7 +37,7 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.ipc.AbstractRpcClient; import org.apache.hadoop.hbase.ipc.RpcClientFactory; -import org.apache.hadoop.hbase.ipc.RpcClientImpl; +import org.apache.hadoop.hbase.ipc.BlockingRpcClient; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; @@ -130,7 +131,7 @@ public class TestClientTimeouts { /** * Rpc Channel implementation with RandomTimeoutBlockingRpcChannel */ - public static class RandomTimeoutRpcClient extends RpcClientImpl { + public static class RandomTimeoutRpcClient extends 
BlockingRpcClient { public RandomTimeoutRpcClient(Configuration conf, String clusterId, SocketAddress localAddr, MetricsConnection metrics) { super(conf, clusterId, localAddr, metrics); @@ -153,9 +154,9 @@ public class TestClientTimeouts { public static final double CHANCE_OF_TIMEOUT = 0.3; private static AtomicInteger invokations = new AtomicInteger(); - RandomTimeoutBlockingRpcChannel(final RpcClientImpl rpcClient, final ServerName sn, - final User ticket, final int rpcTimeout) throws UnknownHostException { - super(rpcClient, sn, ticket, rpcTimeout); + RandomTimeoutBlockingRpcChannel(final BlockingRpcClient rpcClient, final ServerName sn, + final User ticket, final int rpcTimeout) { + super(rpcClient, new InetSocketAddress(sn.getHostname(), sn.getPort()), ticket, rpcTimeout); } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java index c1c9b1e..aac020d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java @@ -170,7 +170,7 @@ public class TestRpcControllerFactory { ResultScanner scan = table.getScanner(fam1); scan.next(); scan.close(); - counter = verifyCount(counter); + counter = verifyCount(counter + 2); Get g2 = new Get(row); table.get(Lists.newArrayList(g, g2)); @@ -189,7 +189,7 @@ public class TestRpcControllerFactory { // reversed, regular scanInfo.setSmall(false); - counter = doScan(table, scanInfo, counter); + counter = doScan(table, scanInfo, counter + 2); table.close(); connection.close(); 
http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java index 2dbe6b0..a8ea4ee 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java @@ -19,7 +19,9 @@ package org.apache.hadoop.hbase.ipc; import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE; import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub; +import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newStub; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @@ -50,9 +52,12 @@ import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.PauseRequestProto; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.Interface; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.util.Bytes; 
import org.apache.hadoop.io.compress.GzipCodec; @@ -93,7 +98,7 @@ public abstract class AbstractTestIPC { } } - protected abstract AbstractRpcClient createRpcClientNoCodec(Configuration conf); + protected abstract AbstractRpcClient<?> createRpcClientNoCodec(Configuration conf); /** * Ensure we do not HAVE TO HAVE a codec. @@ -102,7 +107,7 @@ public abstract class AbstractTestIPC { public void testNoCodec() throws IOException, ServiceException { Configuration conf = HBaseConfiguration.create(); TestRpcServer rpcServer = new TestRpcServer(); - try (AbstractRpcClient client = createRpcClientNoCodec(conf)) { + try (AbstractRpcClient<?> client = createRpcClientNoCodec(conf)) { rpcServer.start(); BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); HBaseRpcController pcrc = new HBaseRpcControllerImpl(); @@ -115,7 +120,7 @@ public abstract class AbstractTestIPC { } } - protected abstract AbstractRpcClient createRpcClient(Configuration conf); + protected abstract AbstractRpcClient<?> createRpcClient(Configuration conf); /** * It is hard to verify the compression is actually happening under the wraps. 
Hope that if @@ -132,7 +137,7 @@ public abstract class AbstractTestIPC { cells.add(CELL); } TestRpcServer rpcServer = new TestRpcServer(); - try (AbstractRpcClient client = createRpcClient(conf)) { + try (AbstractRpcClient<?> client = createRpcClient(conf)) { rpcServer.start(); BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); HBaseRpcController pcrc = new HBaseRpcControllerImpl(CellUtil.createCellScanner(cells)); @@ -152,14 +157,14 @@ public abstract class AbstractTestIPC { } } - protected abstract AbstractRpcClient createRpcClientRTEDuringConnectionSetup(Configuration conf) - throws IOException; + protected abstract AbstractRpcClient<?> createRpcClientRTEDuringConnectionSetup( + Configuration conf) throws IOException; @Test public void testRTEDuringConnectionSetup() throws Exception { Configuration conf = HBaseConfiguration.create(); TestRpcServer rpcServer = new TestRpcServer(); - try (AbstractRpcClient client = createRpcClientRTEDuringConnectionSetup(conf)) { + try (AbstractRpcClient<?> client = createRpcClientRTEDuringConnectionSetup(conf)) { rpcServer.start(); BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); stub.ping(null, EmptyRequestProto.getDefaultInstance()); @@ -180,7 +185,7 @@ public abstract class AbstractTestIPC { RpcScheduler scheduler = spy(new FifoRpcScheduler(CONF, 1)); RpcServer rpcServer = new TestRpcServer(scheduler, CONF); verify(scheduler).init((RpcScheduler.Context) anyObject()); - try (AbstractRpcClient client = createRpcClient(CONF)) { + try (AbstractRpcClient<?> client = createRpcClient(CONF)) { rpcServer.start(); verify(scheduler).start(); BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); @@ -201,7 +206,7 @@ public abstract class AbstractTestIPC { Configuration conf = new Configuration(CONF); conf.setInt(RpcServer.MAX_REQUEST_SIZE, 100); RpcServer rpcServer = new TestRpcServer(conf); - try (AbstractRpcClient client = createRpcClient(conf)) { 
+ try (AbstractRpcClient<?> client = createRpcClient(conf)) { rpcServer.start(); BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); StringBuilder message = new StringBuilder(120); @@ -232,7 +237,7 @@ public abstract class AbstractTestIPC { throws IOException, ServiceException { TestRpcServer rpcServer = new TestRpcServer(); InetSocketAddress localAddr = new InetSocketAddress("localhost", 0); - try (AbstractRpcClient client = createRpcClient(CONF)) { + try (AbstractRpcClient<?> client = createRpcClient(CONF)) { rpcServer.start(); BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); assertEquals(localAddr.getAddress().getHostAddress(), @@ -245,12 +250,12 @@ public abstract class AbstractTestIPC { @Test public void testRemoteError() throws IOException, ServiceException { TestRpcServer rpcServer = new TestRpcServer(); - try (AbstractRpcClient client = createRpcClient(CONF)) { + try (AbstractRpcClient<?> client = createRpcClient(CONF)) { rpcServer.start(); BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); stub.error(null, EmptyRequestProto.getDefaultInstance()); } catch (ServiceException e) { - LOG.info("Caught expected exception: " + e.getMessage()); + LOG.info("Caught expected exception: " + e); IOException ioe = ProtobufUtil.handleRemoteException(e); assertTrue(ioe instanceof DoNotRetryIOException); assertTrue(ioe.getMessage().contains("server error!")); @@ -262,7 +267,7 @@ public abstract class AbstractTestIPC { @Test public void testTimeout() throws IOException { TestRpcServer rpcServer = new TestRpcServer(); - try (AbstractRpcClient client = createRpcClient(CONF)) { + try (AbstractRpcClient<?> client = createRpcClient(CONF)) { rpcServer.start(); BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); HBaseRpcController pcrc = new HBaseRpcControllerImpl(); @@ -277,7 +282,7 @@ public abstract class AbstractTestIPC { } catch (ServiceException e) { 
long waitTime = (System.nanoTime() - startTime) / 1000000; // expected - LOG.info("Caught expected exception: " + e.getMessage()); + LOG.info("Caught expected exception: " + e); IOException ioe = ProtobufUtil.handleRemoteException(e); assertTrue(ioe.getCause() instanceof CallTimeoutException); // confirm that we got exception before the actual pause. @@ -327,7 +332,7 @@ public abstract class AbstractTestIPC { public void testConnectionCloseWithOutstandingRPCs() throws InterruptedException, IOException { Configuration conf = new Configuration(CONF); RpcServer rpcServer = new TestFailingRpcServer(conf); - try (AbstractRpcClient client = createRpcClient(conf)) { + try (AbstractRpcClient<?> client = createRpcClient(conf)) { rpcServer.start(); BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); @@ -339,4 +344,90 @@ public abstract class AbstractTestIPC { rpcServer.stop(); } } + + @Test + public void testAsyncEcho() throws IOException { + Configuration conf = HBaseConfiguration.create(); + TestRpcServer rpcServer = new TestRpcServer(); + try (AbstractRpcClient<?> client = createRpcClient(conf)) { + rpcServer.start(); + Interface stub = newStub(client, rpcServer.getListenerAddress()); + int num = 10; + List<HBaseRpcController> pcrcList = new ArrayList<>(); + List<BlockingRpcCallback<EchoResponseProto>> callbackList = new ArrayList<>(); + for (int i = 0; i < num; i++) { + HBaseRpcController pcrc = new HBaseRpcControllerImpl(); + BlockingRpcCallback<EchoResponseProto> done = new BlockingRpcCallback<>(); + stub.echo(pcrc, EchoRequestProto.newBuilder().setMessage("hello-" + i).build(), done); + pcrcList.add(pcrc); + callbackList.add(done); + } + for (int i = 0; i < num; i++) { + HBaseRpcController pcrc = pcrcList.get(i); + assertFalse(pcrc.failed()); + assertNull(pcrc.cellScanner()); + assertEquals("hello-" + i, callbackList.get(i).get().getMessage()); + } + } 
finally { + rpcServer.stop(); + } + } + + @Test + public void testAsyncRemoteError() throws IOException { + AbstractRpcClient<?> client = createRpcClient(CONF); + TestRpcServer rpcServer = new TestRpcServer(); + try { + rpcServer.start(); + Interface stub = newStub(client, rpcServer.getListenerAddress()); + BlockingRpcCallback<EmptyResponseProto> callback = new BlockingRpcCallback<>(); + HBaseRpcController pcrc = new HBaseRpcControllerImpl(); + stub.error(pcrc, EmptyRequestProto.getDefaultInstance(), callback); + assertNull(callback.get()); + assertTrue(pcrc.failed()); + LOG.info("Caught expected exception: " + pcrc.getFailed()); + IOException ioe = ProtobufUtil.handleRemoteException(pcrc.getFailed()); + assertTrue(ioe instanceof DoNotRetryIOException); + assertTrue(ioe.getMessage().contains("server error!")); + } finally { + client.close(); + rpcServer.stop(); + } + } + + @Test + public void testAsyncTimeout() throws IOException { + TestRpcServer rpcServer = new TestRpcServer(); + try (AbstractRpcClient<?> client = createRpcClient(CONF)) { + rpcServer.start(); + Interface stub = newStub(client, rpcServer.getListenerAddress()); + List<HBaseRpcController> pcrcList = new ArrayList<>(); + List<BlockingRpcCallback<EmptyResponseProto>> callbackList = new ArrayList<>(); + int ms = 1000; + int timeout = 100; + long startTime = System.nanoTime(); + for (int i = 0; i < 10; i++) { + HBaseRpcController pcrc = new HBaseRpcControllerImpl(); + pcrc.setCallTimeout(timeout); + BlockingRpcCallback<EmptyResponseProto> callback = new BlockingRpcCallback<>(); + stub.pause(pcrc, PauseRequestProto.newBuilder().setMs(ms).build(), callback); + pcrcList.add(pcrc); + callbackList.add(callback); + } + for (BlockingRpcCallback<?> callback : callbackList) { + assertNull(callback.get()); + } + long waitTime = (System.nanoTime() - startTime) / 1000000; + for (HBaseRpcController pcrc : pcrcList) { + assertTrue(pcrc.failed()); + LOG.info("Caught expected exception: " + pcrc.getFailed()); + 
IOException ioe = ProtobufUtil.handleRemoteException(pcrc.getFailed()); + assertTrue(ioe.getCause() instanceof CallTimeoutException); + } + // confirm that we got exception before the actual pause. + assertTrue(waitTime < ms); + } finally { + rpcServer.stop(); + } + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java deleted file mode 100644 index 565f5bf..0000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java +++ /dev/null @@ -1,113 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hbase.ipc; - -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.ChannelOutboundHandlerAdapter; -import io.netty.channel.ChannelPromise; -import io.netty.channel.epoll.EpollEventLoopGroup; -import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.channel.socket.SocketChannel; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.codec.Codec; -import org.apache.hadoop.hbase.testclassification.RPCTests; -import org.apache.hadoop.hbase.testclassification.SmallTests; -import org.junit.experimental.categories.Category; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameters; - -@RunWith(Parameterized.class) -@Category({ RPCTests.class, SmallTests.class }) -public class TestAsyncIPC extends AbstractTestIPC { - - @Parameters - public static Collection<Object[]> parameters() { - List<Object[]> paramList = new ArrayList<>(); - paramList.add(new Object[] { false, false }); - paramList.add(new Object[] { false, true }); - paramList.add(new Object[] { true, false }); - paramList.add(new Object[] { true, true }); - return paramList; - } - - private final boolean useNativeTransport; - - private final boolean useGlobalEventLoopGroup; - - public TestAsyncIPC(boolean useNativeTransport, boolean useGlobalEventLoopGroup) { - this.useNativeTransport = useNativeTransport; - this.useGlobalEventLoopGroup = useGlobalEventLoopGroup; - } - - private void setConf(Configuration conf) { - conf.setBoolean(AsyncRpcClient.USE_NATIVE_TRANSPORT, useNativeTransport); - conf.setBoolean(AsyncRpcClient.USE_GLOBAL_EVENT_LOOP_GROUP, useGlobalEventLoopGroup); - if (useGlobalEventLoopGroup && AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP != null) { - if (useNativeTransport - && !(AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP.getFirst() 
instanceof EpollEventLoopGroup) - || (!useNativeTransport && !(AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP - .getFirst() instanceof NioEventLoopGroup))) { - AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP.getFirst().shutdownGracefully(); - AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP = null; - } - } - } - - @Override - protected AsyncRpcClient createRpcClientNoCodec(Configuration conf) { - setConf(conf); - return new AsyncRpcClient(conf) { - - @Override - Codec getCodec() { - return null; - } - - }; - } - - @Override - protected AsyncRpcClient createRpcClient(Configuration conf) { - setConf(conf); - return new AsyncRpcClient(conf); - } - - @Override - protected AsyncRpcClient createRpcClientRTEDuringConnectionSetup(Configuration conf) { - setConf(conf); - return new AsyncRpcClient(conf, new ChannelInitializer<SocketChannel>() { - @Override - protected void initChannel(SocketChannel ch) throws Exception { - ch.pipeline().addFirst(new ChannelOutboundHandlerAdapter() { - @Override - public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) - throws Exception { - promise.setFailure(new RuntimeException("Injected fault")); - } - }); - } - }); - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestBlockingIPC.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestBlockingIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestBlockingIPC.java new file mode 100644 index 0000000..98efcfb --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestBlockingIPC.java @@ -0,0 +1,58 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.codec.Codec; +import org.apache.hadoop.hbase.testclassification.RPCTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.experimental.categories.Category; + +@Category({ RPCTests.class, SmallTests.class }) +public class TestBlockingIPC extends AbstractTestIPC { + + @Override + protected BlockingRpcClient createRpcClientNoCodec(Configuration conf) { + return new BlockingRpcClient(conf) { + @Override + Codec getCodec() { + return null; + } + }; + } + + @Override + protected BlockingRpcClient createRpcClient(Configuration conf) { + return new BlockingRpcClient(conf); + } + + @Override + protected BlockingRpcClient createRpcClientRTEDuringConnectionSetup(Configuration conf) + throws IOException { + return new BlockingRpcClient(conf) { + + @Override + boolean isTcpNoDelay() { + throw new RuntimeException("Injected fault"); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestGlobalEventLoopGroup.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestGlobalEventLoopGroup.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestGlobalEventLoopGroup.java deleted file mode 100644 index 12bc35c..0000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestGlobalEventLoopGroup.java +++ /dev/null @@ -1,53 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hbase.ipc; - -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNotSame; -import static org.junit.Assert.assertSame; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.testclassification.RPCTests; -import org.apache.hadoop.hbase.testclassification.SmallTests; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -@Category({ RPCTests.class, SmallTests.class }) -public class TestGlobalEventLoopGroup { - - @Test - public void test() { - Configuration conf = HBaseConfiguration.create(); - conf.setBoolean(AsyncRpcClient.USE_GLOBAL_EVENT_LOOP_GROUP, true); - AsyncRpcClient client = new AsyncRpcClient(conf); - assertNotNull(AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP); - AsyncRpcClient client1 = new AsyncRpcClient(conf); - assertSame(client.bootstrap.group(), client1.bootstrap.group()); - client1.close(); - assertFalse(client.bootstrap.group().isShuttingDown()); - - conf.setBoolean(AsyncRpcClient.USE_GLOBAL_EVENT_LOOP_GROUP, false); - AsyncRpcClient client2 = new AsyncRpcClient(conf); - assertNotSame(client.bootstrap.group(), client2.bootstrap.group()); - client2.close(); - - client.close(); - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java deleted file mode 100644 index b88cb7a..0000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java +++ /dev/null @@ -1,74 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.ipc; - -import static org.mockito.Matchers.anyInt; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.spy; - -import java.io.IOException; -import java.net.Socket; - -import javax.net.SocketFactory; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.codec.Codec; -import org.apache.hadoop.hbase.testclassification.RPCTests; -import org.apache.hadoop.hbase.testclassification.SmallTests; -import org.apache.hadoop.net.NetUtils; -import org.junit.experimental.categories.Category; -import org.mockito.Mockito; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; - -@Category({ RPCTests.class, SmallTests.class }) -public class TestIPC extends AbstractTestIPC { - - @Override - protected RpcClientImpl createRpcClientNoCodec(Configuration conf) { - return new RpcClientImpl(conf, HConstants.CLUSTER_ID_DEFAULT) { - @Override - Codec getCodec() { - return null; - } - }; - } - - @Override - protected RpcClientImpl createRpcClient(Configuration conf) { - return new RpcClientImpl(conf, HConstants.CLUSTER_ID_DEFAULT); - } - - @Override - protected RpcClientImpl createRpcClientRTEDuringConnectionSetup(Configuration conf) - throws 
IOException { - SocketFactory spyFactory = spy(NetUtils.getDefaultSocketFactory(conf)); - Mockito.doAnswer(new Answer<Socket>() { - @Override - public Socket answer(InvocationOnMock invocation) throws Throwable { - Socket s = spy((Socket) invocation.callRealMethod()); - doThrow(new RuntimeException("Injected fault")).when(s).setSoTimeout(anyInt()); - return s; - } - }).when(spyFactory).createSocket(); - - return new RpcClientImpl(conf, HConstants.CLUSTER_ID_DEFAULT, spyFactory); - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyIPC.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyIPC.java new file mode 100644 index 0000000..3b32383 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyIPC.java @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.ipc; + +import io.netty.channel.epoll.EpollEventLoopGroup; +import io.netty.channel.epoll.EpollSocketChannel; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioSocketChannel; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.codec.Codec; +import org.apache.hadoop.hbase.testclassification.RPCTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.JVM; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; + +@RunWith(Parameterized.class) +@Category({ RPCTests.class, SmallTests.class }) +public class TestNettyIPC extends AbstractTestIPC { + + @Parameters(name = "{index}: EventLoop={0}") + public static Collection<Object[]> parameters() { + List<Object[]> params = new ArrayList<>(); + params.add(new Object[] { "nio" }); + params.add(new Object[] { "perClientNio" }); + if (JVM.isLinux() && JVM.isAmd64()) { + params.add(new Object[] { "epoll" }); + } + return params; + } + + @Parameter + public String eventLoopType; + + private static NioEventLoopGroup NIO; + + private static EpollEventLoopGroup EPOLL; + + @BeforeClass + public static void setUpBeforeClass() { + NIO = new NioEventLoopGroup(); + if (JVM.isLinux() && JVM.isAmd64()) { + EPOLL = new EpollEventLoopGroup(); + } + } + + @AfterClass + public static void tearDownAfterClass() { + if (NIO != null) { + NIO.shutdownGracefully(); + } + if (EPOLL != null) { + EPOLL.shutdownGracefully(); + } + } + + private void setConf(Configuration conf) { + switch (eventLoopType) { + case "nio": + NettyRpcClientConfigHelper.setEventLoopConfig(conf, NIO, 
NioSocketChannel.class); + break; + case "epoll": + NettyRpcClientConfigHelper.setEventLoopConfig(conf, EPOLL, EpollSocketChannel.class); + break; + case "perClientNio": + NettyRpcClientConfigHelper.createEventLoopPerClient(conf); + break; + default: + break; + } + } + + @Override + protected NettyRpcClient createRpcClientNoCodec(Configuration conf) { + setConf(conf); + return new NettyRpcClient(conf) { + + @Override + Codec getCodec() { + return null; + } + + }; + } + + @Override + protected NettyRpcClient createRpcClient(Configuration conf) { + setConf(conf); + return new NettyRpcClient(conf); + } + + @Override + protected NettyRpcClient createRpcClientRTEDuringConnectionSetup(Configuration conf) { + setConf(conf); + return new NettyRpcClient(conf) { + + @Override + boolean isTcpNoDelay() { + throw new RuntimeException("Injected fault"); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtobufRpcServiceImpl.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtobufRpcServiceImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtobufRpcServiceImpl.java index ae658a3..6354123 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtobufRpcServiceImpl.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtobufRpcServiceImpl.java @@ -62,7 +62,7 @@ public class TestProtobufRpcServiceImpl implements BlockingInterface { } public static Interface newStub(RpcClient client, InetSocketAddress addr) throws IOException { - return TestProtobufRpcProto.newStub(client.createProtobufRpcChannel( + return TestProtobufRpcProto.newStub(client.createRpcChannel( ServerName.valueOf(addr.getHostName(), addr.getPort(), System.currentTimeMillis()), User.getCurrent(), 0)); }