http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcClient.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcClient.java index bb6763f..3d88115 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcClient.java @@ -18,6 +18,17 @@ package org.apache.hadoop.hbase.security; +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import javax.security.sasl.Sasl; +import javax.security.sasl.SaslException; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -28,123 +39,23 @@ import org.apache.hadoop.security.SaslOutputStream; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; -import javax.security.auth.callback.Callback; -import javax.security.auth.callback.CallbackHandler; -import javax.security.auth.callback.NameCallback; -import javax.security.auth.callback.PasswordCallback; -import javax.security.auth.callback.UnsupportedCallbackException; -import javax.security.sasl.RealmCallback; -import javax.security.sasl.RealmChoiceCallback; -import javax.security.sasl.Sasl; -import javax.security.sasl.SaslClient; -import javax.security.sasl.SaslException; - -import java.io.BufferedInputStream; -import java.io.BufferedOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; - -import com.google.common.annotations.VisibleForTesting; - /** - * A utility class that encapsulates SASL logic for RPC client. - * Copied from <code>org.apache.hadoop.security</code> + * A utility class that encapsulates SASL logic for RPC client. Copied from + * <code>org.apache.hadoop.security</code> */ @InterfaceAudience.Private -public class HBaseSaslRpcClient { - private static final Log LOG = LogFactory.getLog(HBaseSaslRpcClient.class); +public class HBaseSaslRpcClient extends AbstractHBaseSaslRpcClient { - private final SaslClient saslClient; - private final boolean fallbackAllowed; - /** - * Create a HBaseSaslRpcClient for an authentication method - * - * @param method - * the requested authentication method - * @param token - * token to use if needed by the authentication method - * @param serverPrincipal - * the server principal that we are trying to set the connection up to - * @param fallbackAllowed - * does the client allow fallback to simple authentication - * @throws IOException - */ - public HBaseSaslRpcClient(AuthMethod method, - Token<? extends TokenIdentifier> token, String serverPrincipal, boolean fallbackAllowed) - throws IOException { - this(method, token, serverPrincipal, fallbackAllowed, "authentication"); - } - /** - * Create a HBaseSaslRpcClient for an authentication method - * - * @param method - * the requested authentication method - * @param token - * token to use if needed by the authentication method - * @param serverPrincipal - * the server principal that we are trying to set the connection up to - * @param fallbackAllowed - * does the client allow fallback to simple authentication - * @param rpcProtection - * the protection level ("authentication", "integrity" or "privacy") - * @throws IOException - */ - public HBaseSaslRpcClient(AuthMethod method, - Token<? extends TokenIdentifier> token, String serverPrincipal, boolean fallbackAllowed, - String rpcProtection) throws IOException { - this.fallbackAllowed = fallbackAllowed; - SaslUtil.initSaslProperties(rpcProtection); - switch (method) { - case DIGEST: - if (LOG.isDebugEnabled()) - LOG.debug("Creating SASL " + AuthMethod.DIGEST.getMechanismName() - + " client to authenticate to service at " + token.getService()); - saslClient = createDigestSaslClient( - new String[] { AuthMethod.DIGEST.getMechanismName() }, - SaslUtil.SASL_DEFAULT_REALM, new SaslClientCallbackHandler(token)); - break; - case KERBEROS: - if (LOG.isDebugEnabled()) { - LOG - .debug("Creating SASL " + AuthMethod.KERBEROS.getMechanismName() - + " client. Server's Kerberos principal name is " - + serverPrincipal); - } - if (serverPrincipal == null || serverPrincipal.length() == 0) { - throw new IOException( - "Failed to specify server's Kerberos principal name"); - } - String[] names = SaslUtil.splitKerberosName(serverPrincipal); - if (names.length != 3) { - throw new IOException( - "Kerberos principal does not have the expected format: " - + serverPrincipal); - } - saslClient = createKerberosSaslClient( - new String[] { AuthMethod.KERBEROS.getMechanismName() }, - names[0], names[1]); - break; - default: - throw new IOException("Unknown authentication method " + method); - } - if (saslClient == null) - throw new IOException("Unable to find SASL client implementation"); - } + private static final Log LOG = LogFactory.getLog(HBaseSaslRpcClient.class); - protected SaslClient createDigestSaslClient(String[] mechanismNames, - String saslDefaultRealm, CallbackHandler saslClientCallbackHandler) - throws IOException { - return Sasl.createSaslClient(mechanismNames, null, null, saslDefaultRealm, - SaslUtil.SASL_PROPS, saslClientCallbackHandler); + public HBaseSaslRpcClient(AuthMethod method, Token<? extends TokenIdentifier> token, + String serverPrincipal, boolean fallbackAllowed) throws IOException { + super(method, token, serverPrincipal, fallbackAllowed); } - protected SaslClient createKerberosSaslClient(String[] mechanismNames, - String userFirstPart, String userSecondPart) throws IOException { - return Sasl.createSaslClient(mechanismNames, null, userFirstPart, - userSecondPart, SaslUtil.SASL_PROPS, null); + public HBaseSaslRpcClient(AuthMethod method, Token<? extends TokenIdentifier> token, + String serverPrincipal, boolean fallbackAllowed, String rpcProtection) throws IOException { + super(method, token, serverPrincipal, fallbackAllowed, rpcProtection); } private static void readStatus(DataInputStream inStream) throws IOException { @@ -156,72 +67,65 @@ public class HBaseSaslRpcClient { } /** - * Do client side SASL authentication with server via the given InputStream - * and OutputStream - * - * @param inS - * InputStream to use - * @param outS - * OutputStream to use - * @return true if connection is set up, or false if needs to switch - * to simple Auth. + * Do client side SASL authentication with server via the given InputStream and OutputStream + * @param inS InputStream to use + * @param outS OutputStream to use + * @return true if connection is set up, or false if needs to switch to simple Auth. * @throws IOException */ - public boolean saslConnect(InputStream inS, OutputStream outS) - throws IOException { + public boolean saslConnect(InputStream inS, OutputStream outS) throws IOException { DataInputStream inStream = new DataInputStream(new BufferedInputStream(inS)); - DataOutputStream outStream = new DataOutputStream(new BufferedOutputStream( - outS)); + DataOutputStream outStream = new DataOutputStream(new BufferedOutputStream(outS)); try { - byte[] saslToken = new byte[0]; - if (saslClient.hasInitialResponse()) - saslToken = saslClient.evaluateChallenge(saslToken); + byte[] saslToken = getInitialResponse(); if (saslToken != null) { outStream.writeInt(saslToken.length); outStream.write(saslToken, 0, saslToken.length); outStream.flush(); - if (LOG.isDebugEnabled()) - LOG.debug("Have sent token of size " + saslToken.length - + " from initSASLContext."); + if (LOG.isDebugEnabled()) { + LOG.debug("Have sent token of size " + saslToken.length + " from initSASLContext."); + } } - if (!saslClient.isComplete()) { + if (!isComplete()) { readStatus(inStream); int len = inStream.readInt(); if (len == SaslUtil.SWITCH_TO_SIMPLE_AUTH) { if (!fallbackAllowed) { - throw new IOException("Server asks us to fall back to SIMPLE auth, " + - "but this client is configured to only allow secure connections."); + throw new IOException("Server asks us to fall back to SIMPLE auth, " + + "but this client is configured to only allow secure connections."); } if (LOG.isDebugEnabled()) { LOG.debug("Server asks us to fall back to simple auth."); } - saslClient.dispose(); + dispose(); return false; } saslToken = new byte[len]; - if (LOG.isDebugEnabled()) + if (LOG.isDebugEnabled()) { LOG.debug("Will read input token of size " + saslToken.length + " for processing by initSASLContext"); + } inStream.readFully(saslToken); } - while (!saslClient.isComplete()) { - saslToken = saslClient.evaluateChallenge(saslToken); + while (!isComplete()) { + saslToken = evaluateChallenge(saslToken); if (saslToken != null) { - if (LOG.isDebugEnabled()) - LOG.debug("Will send token of size " + saslToken.length - + " from initSASLContext."); + if (LOG.isDebugEnabled()) { + LOG.debug("Will send token of size " + saslToken.length + " from initSASLContext."); + } outStream.writeInt(saslToken.length); outStream.write(saslToken, 0, saslToken.length); outStream.flush(); } - if (!saslClient.isComplete()) { + if (!isComplete()) { readStatus(inStream); saslToken = new byte[inStream.readInt()]; - if (LOG.isDebugEnabled()) + if (LOG.isDebugEnabled()) { LOG.debug("Will read input token of size " + saslToken.length + " for processing by initSASLContext"); + } inStream.readFully(saslToken); } } @@ -241,11 +145,8 @@ public class HBaseSaslRpcClient { } /** - * Get a SASL wrapped InputStream. Can be called only after saslConnect() has - * been called. - * - * @param in - * the InputStream to wrap + * Get a SASL wrapped InputStream. Can be called only after saslConnect() has been called. + * @param in the InputStream to wrap * @return a SASL wrapped InputStream * @throws IOException */ @@ -257,11 +158,8 @@ public class HBaseSaslRpcClient { } /** - * Get a SASL wrapped OutputStream. Can be called only after saslConnect() has - * been called. - * - * @param out - * the OutputStream to wrap + * Get a SASL wrapped OutputStream. Can be called only after saslConnect() has been called. + * @param out the OutputStream to wrap * @return a SASL wrapped OutputStream * @throws IOException */ @@ -271,58 +169,4 @@ public class HBaseSaslRpcClient { } return new SaslOutputStream(out, saslClient); } - - /** Release resources used by wrapped saslClient */ - public void dispose() throws SaslException { - saslClient.dispose(); - } - - @VisibleForTesting - static class SaslClientCallbackHandler implements CallbackHandler { - private final String userName; - private final char[] userPassword; - - public SaslClientCallbackHandler(Token<? extends TokenIdentifier> token) { - this.userName = SaslUtil.encodeIdentifier(token.getIdentifier()); - this.userPassword = SaslUtil.encodePassword(token.getPassword()); - } - - @Override - public void handle(Callback[] callbacks) - throws UnsupportedCallbackException { - NameCallback nc = null; - PasswordCallback pc = null; - RealmCallback rc = null; - for (Callback callback : callbacks) { - if (callback instanceof RealmChoiceCallback) { - continue; - } else if (callback instanceof NameCallback) { - nc = (NameCallback) callback; - } else if (callback instanceof PasswordCallback) { - pc = (PasswordCallback) callback; - } else if (callback instanceof RealmCallback) { - rc = (RealmCallback) callback; - } else { - throw new UnsupportedCallbackException(callback, - "Unrecognized SASL client callback"); - } - } - if (nc != null) { - if (LOG.isDebugEnabled()) - LOG.debug("SASL client callback: setting username: " + userName); - nc.setName(userName); - } - if (pc != null) { - if (LOG.isDebugEnabled()) - LOG.debug("SASL client callback: setting userPassword"); - pc.setPassword(userPassword); - } - if (rc != null) { - if (LOG.isDebugEnabled()) - LOG.debug("SASL client callback: setting realm: " - + rc.getDefaultText()); - rc.setText(rc.getDefaultText()); - } - } - } -} +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClient.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClient.java new file mode 100644 index 0000000..f624608 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClient.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.security; + +import io.netty.channel.ChannelPipeline; +import io.netty.handler.codec.LengthFieldBasedFrameDecoder; + +import java.io.IOException; + +import javax.security.sasl.Sasl; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; + +/** + * Implement SASL logic for netty rpc client. + */ [email protected] +public class NettyHBaseSaslRpcClient extends AbstractHBaseSaslRpcClient { + private static final Log LOG = LogFactory.getLog(NettyHBaseSaslRpcClient.class); + + public NettyHBaseSaslRpcClient(AuthMethod method, Token<? extends TokenIdentifier> token, + String serverPrincipal, boolean fallbackAllowed, String rpcProtection) throws IOException { + super(method, token, serverPrincipal, fallbackAllowed, rpcProtection); + } + + public void setupSaslHandler(ChannelPipeline p) { + String qop = (String) saslClient.getNegotiatedProperty(Sasl.QOP); + if (LOG.isDebugEnabled()) { + LOG.debug("SASL client context established. Negotiated QoP: " + qop); + } + if (qop == null || "auth".equalsIgnoreCase(qop)) { + return; + } + // add wrap and unwrap handlers to pipeline. + p.addFirst(new SaslWrapHandler(saslClient), + new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4), + new SaslUnwrapHandler(saslClient)); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClientHandler.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClientHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClientHandler.java new file mode 100644 index 0000000..50609b4 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClientHandler.java @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.security; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.util.concurrent.Promise; + +import java.io.IOException; +import java.security.PrivilegedExceptionAction; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.ipc.FallbackDisallowedException; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; + +/** + * Implement SASL logic for netty rpc client. + */ [email protected] +public class NettyHBaseSaslRpcClientHandler extends SimpleChannelInboundHandler<ByteBuf> { + + private static final Log LOG = LogFactory.getLog(NettyHBaseSaslRpcClientHandler.class); + + private final Promise<Boolean> saslPromise; + + private final UserGroupInformation ugi; + + private final NettyHBaseSaslRpcClient saslRpcClient; + + /** + * @param saslPromise {@code true} if success, {@code false} if server tells us to fallback to + * simple. + */ + public NettyHBaseSaslRpcClientHandler(Promise<Boolean> saslPromise, UserGroupInformation ugi, + AuthMethod method, Token<? extends TokenIdentifier> token, String serverPrincipal, + boolean fallbackAllowed, String rpcProtection) throws IOException { + this.saslPromise = saslPromise; + this.ugi = ugi; + this.saslRpcClient = new NettyHBaseSaslRpcClient(method, token, serverPrincipal, + fallbackAllowed, rpcProtection); + } + + private void writeResponse(ChannelHandlerContext ctx, byte[] response) { + if (LOG.isDebugEnabled()) { + LOG.debug("Will send token of size " + response.length + " from initSASLContext."); + } + ctx.writeAndFlush( + ctx.alloc().buffer(4 + response.length).writeInt(response.length).writeBytes(response)); + } + + private void tryComplete(ChannelHandlerContext ctx) { + if (!saslRpcClient.isComplete()) { + return; + } + saslRpcClient.setupSaslHandler(ctx.pipeline()); + saslPromise.setSuccess(true); + } + + @Override + public void handlerAdded(ChannelHandlerContext ctx) { + try { + byte[] initialResponse = ugi.doAs(new PrivilegedExceptionAction<byte[]>() { + + @Override + public byte[] run() throws Exception { + return saslRpcClient.getInitialResponse(); + } + }); + if (initialResponse != null) { + writeResponse(ctx, initialResponse); + } + tryComplete(ctx); + } catch (Exception e) { + // the exception thrown by handlerAdded will not be passed to the exceptionCaught below + // because netty will remove a handler if handlerAdded throws an exception. + exceptionCaught(ctx, e); + } + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { + int len = msg.readInt(); + if (len == SaslUtil.SWITCH_TO_SIMPLE_AUTH) { + saslRpcClient.dispose(); + if (saslRpcClient.fallbackAllowed) { + saslPromise.trySuccess(false); + } else { + saslPromise.tryFailure(new FallbackDisallowedException()); + } + return; + } + if (LOG.isDebugEnabled()) { + LOG.debug("Will read input token of size " + len + " for processing by initSASLContext"); + } + final byte[] challenge = new byte[len]; + msg.readBytes(challenge); + byte[] response = ugi.doAs(new PrivilegedExceptionAction<byte[]>() { + + @Override + public byte[] run() throws Exception { + return saslRpcClient.evaluateChallenge(challenge); + } + }); + if (response != null) { + writeResponse(ctx, response); + } + tryComplete(ctx); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + saslRpcClient.dispose(); + saslPromise.tryFailure(new IOException("Connection closed")); + ctx.fireChannelInactive(); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + saslRpcClient.dispose(); + saslPromise.tryFailure(cause); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslChallengeDecoder.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslChallengeDecoder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslChallengeDecoder.java new file mode 100644 index 0000000..57bb36c --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslChallengeDecoder.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.security; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.ipc.RemoteException; + +/** + * Decode the sasl challenge sent by RpcServer. + */ [email protected] +public class SaslChallengeDecoder extends ByteToMessageDecoder { + + private static final int MAX_CHALLENGE_SIZE = 1024 * 1024; // 1M + + private ByteBuf tryDecodeChallenge(ByteBuf in, int offset, int readableBytes) throws IOException { + if (readableBytes < 4) { + return null; + } + int len = in.getInt(offset); + if (len <= 0) { + // fall back to simple + in.readerIndex(offset + 4); + return in.retainedSlice(offset, 4); + } + if (len > MAX_CHALLENGE_SIZE) { + throw new IOException( + "Sasl challenge too large(" + len + "), max allowed is " + MAX_CHALLENGE_SIZE); + } + int totalLen = 4 + len; + if (readableBytes < totalLen) { + return null; + } + in.readerIndex(offset + totalLen); + return in.retainedSlice(offset, totalLen); + } + + // will throw a RemoteException out if data is enough, so do not need to return anything. + private void tryDecodeError(ByteBuf in, int offset, int readableBytes) throws IOException { + if (readableBytes < 4) { + return; + } + int classLen = in.getInt(offset); + if (classLen <= 0) { + throw new IOException("Invalid exception class name length " + classLen); + } + if (classLen > MAX_CHALLENGE_SIZE) { + throw new IOException("Exception class name length too large(" + classLen + + "), max allowed is " + MAX_CHALLENGE_SIZE); + } + if (readableBytes < 4 + classLen + 4) { + return; + } + int msgLen = in.getInt(offset + 4 + classLen); + if (msgLen <= 0) { + throw new IOException("Invalid exception message length " + msgLen); + } + if (msgLen > MAX_CHALLENGE_SIZE) { + throw new IOException("Exception message length too large(" + msgLen + "), max allowed is " + + MAX_CHALLENGE_SIZE); + } + int totalLen = classLen + msgLen + 8; + if (readableBytes < totalLen) { + return; + } + String className = in.toString(offset + 4, classLen, HConstants.UTF8_CHARSET); + String msg = in.toString(offset + classLen + 8, msgLen, HConstants.UTF8_CHARSET); + in.readerIndex(offset + totalLen); + throw new RemoteException(className, msg); + } + + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { + int readableBytes = in.readableBytes(); + if (readableBytes < 4) { + return; + } + int offset = in.readerIndex(); + int status = in.getInt(offset); + if (status == SaslStatus.SUCCESS.state) { + ByteBuf challenge = tryDecodeChallenge(in, offset + 4, readableBytes - 4); + if (challenge != null) { + out.add(challenge); + } + } else { + tryDecodeError(in, offset + 4, readableBytes - 4); + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslClientHandler.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslClientHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslClientHandler.java deleted file mode 100644 index 0f11083..0000000 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslClientHandler.java +++ /dev/null @@ -1,401 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.security; - -import io.netty.buffer.ByteBuf; -import io.netty.channel.Channel; -import io.netty.channel.ChannelDuplexHandler; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelPromise; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.ipc.RemoteException; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.security.token.TokenIdentifier; - -import javax.security.auth.callback.CallbackHandler; -import javax.security.sasl.Sasl; -import javax.security.sasl.SaslClient; -import javax.security.sasl.SaslException; - -import java.io.IOException; -import java.nio.charset.Charset; -import java.security.PrivilegedExceptionAction; -import java.util.Random; - -/** - * Handles Sasl connections - */ [email protected] -public class SaslClientHandler extends ChannelDuplexHandler { - private static final Log LOG = LogFactory.getLog(SaslClientHandler.class); - - private final boolean fallbackAllowed; - - private final UserGroupInformation ticket; - - /** - * Used for client or server's token to send or receive from each other. - */ - private final SaslClient saslClient; - private final SaslExceptionHandler exceptionHandler; - private final SaslSuccessfulConnectHandler successfulConnectHandler; - private byte[] saslToken; - private byte[] connectionHeader; - private boolean firstRead = true; - - private int retryCount = 0; - private Random random; - - /** - * Constructor - * - * @param ticket the ugi - * @param method auth method - * @param token for Sasl - * @param serverPrincipal Server's Kerberos principal name - * @param fallbackAllowed True if server may also fall back to less secure connection - * @param rpcProtection Quality of protection. Can be 'authentication', 'integrity' or - * 'privacy'. - * @param exceptionHandler handler for exceptions - * @param successfulConnectHandler handler for succesful connects - * @throws java.io.IOException if handler could not be created - */ - public SaslClientHandler(UserGroupInformation ticket, AuthMethod method, - Token<? extends TokenIdentifier> token, String serverPrincipal, boolean fallbackAllowed, - String rpcProtection, byte[] connectionHeader, SaslExceptionHandler exceptionHandler, - SaslSuccessfulConnectHandler successfulConnectHandler) throws IOException { - this.ticket = ticket; - this.fallbackAllowed = fallbackAllowed; - this.connectionHeader = connectionHeader; - - this.exceptionHandler = exceptionHandler; - this.successfulConnectHandler = successfulConnectHandler; - - SaslUtil.initSaslProperties(rpcProtection); - switch (method) { - case DIGEST: - if (LOG.isDebugEnabled()) - LOG.debug("Creating SASL " + AuthMethod.DIGEST.getMechanismName() - + " client to authenticate to service at " + token.getService()); - saslClient = createDigestSaslClient(new String[] { AuthMethod.DIGEST.getMechanismName() }, - SaslUtil.SASL_DEFAULT_REALM, new HBaseSaslRpcClient.SaslClientCallbackHandler(token)); - break; - case KERBEROS: - if (LOG.isDebugEnabled()) { - LOG.debug("Creating SASL " + AuthMethod.KERBEROS.getMechanismName() - + " client. Server's Kerberos principal name is " + serverPrincipal); - } - if (serverPrincipal == null || serverPrincipal.isEmpty()) { - throw new IOException("Failed to specify server's Kerberos principal name"); - } - String[] names = SaslUtil.splitKerberosName(serverPrincipal); - if (names.length != 3) { - throw new IOException( - "Kerberos principal does not have the expected format: " + serverPrincipal); - } - saslClient = createKerberosSaslClient(new String[] { AuthMethod.KERBEROS.getMechanismName() }, - names[0], names[1]); - break; - default: - throw new IOException("Unknown authentication method " + method); - } - if (saslClient == null) { - throw new IOException("Unable to find SASL client implementation"); - } - } - - /** - * Create a Digest Sasl client - * - * @param mechanismNames names of mechanisms - * @param saslDefaultRealm default realm for sasl - * @param saslClientCallbackHandler handler for the client - * @return new SaslClient - * @throws java.io.IOException if creation went wrong - */ - protected SaslClient createDigestSaslClient(String[] mechanismNames, String saslDefaultRealm, - CallbackHandler saslClientCallbackHandler) throws IOException { - return Sasl.createSaslClient(mechanismNames, null, null, saslDefaultRealm, SaslUtil.SASL_PROPS, - saslClientCallbackHandler); - } - - /** - * Create Kerberos client - * - * @param mechanismNames names of mechanisms - * @param userFirstPart first part of username - * @param userSecondPart second part of username - * @return new SaslClient - * @throws java.io.IOException if fails - */ - protected SaslClient createKerberosSaslClient(String[] mechanismNames, String userFirstPart, - String userSecondPart) throws IOException { - return Sasl - .createSaslClient(mechanismNames, null, userFirstPart, userSecondPart, SaslUtil.SASL_PROPS, - null); - } - - @Override - public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { - saslClient.dispose(); - } - - private byte[] evaluateChallenge(final byte[] challenge) throws Exception { - return ticket.doAs(new PrivilegedExceptionAction<byte[]>() { - - @Override - public byte[] run() throws Exception { - return saslClient.evaluateChallenge(challenge); - } - }); - } - - @Override - public void handlerAdded(final ChannelHandlerContext ctx) throws Exception { - saslToken = new byte[0]; - if (saslClient.hasInitialResponse()) { - saslToken = evaluateChallenge(saslToken); - } - if (saslToken != null) { - writeSaslToken(ctx, saslToken); - if (LOG.isDebugEnabled()) { - LOG.debug("Have sent token of size " + saslToken.length + " from initSASLContext."); - } - } - } - - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - ByteBuf in = (ByteBuf) msg; - - // If not complete, try to negotiate - if (!saslClient.isComplete()) { - while (!saslClient.isComplete() && in.isReadable()) { - readStatus(in); - int len = in.readInt(); - if (firstRead) { - firstRead = false; - if (len == SaslUtil.SWITCH_TO_SIMPLE_AUTH) { - if (!fallbackAllowed) { - throw new IOException("Server asks us to fall back to SIMPLE auth, " + "but this " - + "client is configured to only allow secure connections."); - } - if (LOG.isDebugEnabled()) { - LOG.debug("Server asks us to fall back to simple auth."); - } - saslClient.dispose(); - - ctx.pipeline().remove(this); - successfulConnectHandler.onSuccess(ctx.channel()); - return; - } - } - saslToken = new byte[len]; - if (LOG.isDebugEnabled()) { - LOG.debug("Will read input token of size " + saslToken.length - + " for processing by initSASLContext"); - } - in.readBytes(saslToken); - - saslToken = evaluateChallenge(saslToken); - if (saslToken != null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Will send token of size " + saslToken.length + " from initSASLContext."); - } - writeSaslToken(ctx, saslToken); - } - } - // release the memory - in.release(); - - if (saslClient.isComplete()) { - String qop = (String) saslClient.getNegotiatedProperty(Sasl.QOP); - - if (LOG.isDebugEnabled()) { - LOG.debug("SASL client context established. Negotiated QoP: " + qop); - } - - boolean useWrap = qop != null && !"auth".equalsIgnoreCase(qop); - - if (!useWrap) { - ctx.pipeline().remove(this); - successfulConnectHandler.onSuccess(ctx.channel()); - } else { - byte[] wrappedCH = saslClient.wrap(connectionHeader, 0, connectionHeader.length); - // write connection header - writeSaslToken(ctx, wrappedCH); - successfulConnectHandler.onSaslProtectionSucess(ctx.channel()); - } - } - } - // Normal wrapped reading - else { - try { - int length = in.readInt(); - if (LOG.isDebugEnabled()) { - LOG.debug("Actual length is " + length); - } - saslToken = new byte[length]; - in.readBytes(saslToken); - // release the memory - in.release(); - } catch (IndexOutOfBoundsException e) { - return; - } - try { - ByteBuf b = ctx.channel().alloc().buffer(saslToken.length); - - b.writeBytes(saslClient.unwrap(saslToken, 0, saslToken.length)); - ctx.fireChannelRead(b); - - } catch (SaslException se) { - try { - saslClient.dispose(); - } catch (SaslException ignored) { - LOG.debug("Ignoring SASL exception", ignored); - } - throw se; - } - } - } - - /** - * Write SASL token - * @param ctx to write to - * @param saslToken to write - */ - private void writeSaslToken(final ChannelHandlerContext ctx, byte[] saslToken) { - ByteBuf b = ctx.alloc().buffer(4 + saslToken.length); - b.writeInt(saslToken.length); - b.writeBytes(saslToken, 0, saslToken.length); - ctx.writeAndFlush(b).addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (!future.isSuccess()) { - exceptionCaught(ctx, future.cause()); - } - } - }); - } - - /** - * Get the read status - * - * @param inStream to read - * @throws org.apache.hadoop.ipc.RemoteException if status was not success - */ - private static void readStatus(ByteBuf inStream) throws RemoteException { - int status = inStream.readInt(); // read status - if (status != SaslStatus.SUCCESS.state) { - throw new RemoteException(inStream.toString(Charset.forName("UTF-8")), - inStream.toString(Charset.forName("UTF-8"))); - } - } - - @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) - throws Exception { - saslClient.dispose(); - - ctx.close(); - - if (this.random == null) { - this.random = new Random(); - } - exceptionHandler.handle(this.retryCount++, this.random, cause); - } - - @Override - public void write(final ChannelHandlerContext ctx, Object msg, ChannelPromise promise) - throws Exception { - // If not complete, try to negotiate - if (!saslClient.isComplete()) { - super.write(ctx, msg, promise); - } else { - ByteBuf in = (ByteBuf) msg; - byte[] unwrapped = new byte[in.readableBytes()]; - in.readBytes(unwrapped); - // release the memory - in.release(); - - try { - saslToken = saslClient.wrap(unwrapped, 0, unwrapped.length); - } catch (SaslException se) { - try { - saslClient.dispose(); - } catch (SaslException ignored) { - LOG.debug("Ignoring SASL exception", ignored); - } - promise.setFailure(se); - } - if (saslToken != null) { - ByteBuf out = ctx.channel().alloc().buffer(4 + saslToken.length); - out.writeInt(saslToken.length); - out.writeBytes(saslToken, 0, saslToken.length); - - ctx.write(out).addListener(new ChannelFutureListener() { - @Override public void operationComplete(ChannelFuture future) throws Exception { - if (!future.isSuccess()) { - exceptionCaught(ctx, future.cause()); - } - } - }); - - saslToken = null; - } - } - } - - /** - * Handler for exceptions during Sasl connection - */ - public interface SaslExceptionHandler { - /** - * Handle the exception - * - * @param retryCount current retry count - * @param random to create new backoff with - * @param cause of fail - */ - public void handle(int retryCount, Random random, Throwable cause); - } - - /** - * Handler for successful connects - */ - public interface SaslSuccessfulConnectHandler { - /** - * Runs on success - * - * @param channel which is successfully authenticated - */ - public void onSuccess(Channel channel); - - /** - * Runs on success if data protection used in Sasl - * - * @param channel which is successfully authenticated - */ - public void onSaslProtectionSucess(Channel channel); - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslUnwrapHandler.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslUnwrapHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslUnwrapHandler.java new file mode 100644 index 0000000..e631478 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslUnwrapHandler.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.security; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; + +import javax.security.sasl.SaslClient; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * Unwrap sasl messages. Should be placed after a + * {@link io.netty.handler.codec.LengthFieldBasedFrameDecoder} + */ [email protected] +public class SaslUnwrapHandler extends SimpleChannelInboundHandler<ByteBuf> { + + private final SaslClient saslClient; + + public SaslUnwrapHandler(SaslClient saslClient) { + this.saslClient = saslClient; + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + SaslUtil.safeDispose(saslClient); + ctx.fireChannelInactive(); + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { + byte[] bytes = new byte[msg.readableBytes()]; + msg.readBytes(bytes); + ctx.fireChannelRead(Unpooled.wrappedBuffer(saslClient.unwrap(bytes, 0, bytes.length))); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslUtil.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslUtil.java index cfc4088..aaa9d7a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslUtil.java @@ -18,14 +18,12 @@ */ package org.apache.hadoop.hbase.security; -import org.apache.commons.codec.binary.Base64; -import org.apache.hadoop.hbase.classification.InterfaceAudience; - -import java.util.Locale; import java.util.Map; import java.util.TreeMap; import javax.security.sasl.Sasl; +import javax.security.sasl.SaslClient; +import javax.security.sasl.SaslException; import org.apache.commons.codec.binary.Base64; import org.apache.commons.logging.Log; @@ -34,26 +32,33 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; @InterfaceAudience.Private public class SaslUtil { - private static final Log log = LogFactory.getLog(SaslUtil.class); + private static final Log LOG = LogFactory.getLog(SaslUtil.class); public static final String SASL_DEFAULT_REALM = "default"; - public static final Map<String, String> SASL_PROPS = - new TreeMap<String, String>(); public static final int SWITCH_TO_SIMPLE_AUTH = -88; - public static enum QualityOfProtection { + public enum QualityOfProtection { AUTHENTICATION("auth"), INTEGRITY("auth-int"), PRIVACY("auth-conf"); - public final String saslQop; + private final String saslQop; - private QualityOfProtection(String saslQop) { + QualityOfProtection(String saslQop) { this.saslQop = saslQop; } public String getSaslQop() { return saslQop; } + + public boolean matches(String stringQop) { + if (saslQop.equals(stringQop)) { + LOG.warn("Use authentication/integrity/privacy as value for rpc protection " + + "configurations instead of auth/auth-int/auth-conf."); + return true; + } + return name().equalsIgnoreCase(stringQop); + } } /** Splitting fully qualified Kerberos name into parts */ @@ -75,40 +80,47 @@ public class SaslUtil { /** * Returns {@link org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection} - * corresponding to the given {@code stringQop} value. Returns null if value is - * invalid. + * corresponding to the given {@code stringQop} value. + * @throws IllegalArgumentException If stringQop doesn't match any QOP. */ public static QualityOfProtection getQop(String stringQop) { - QualityOfProtection qop = null; - if (QualityOfProtection.AUTHENTICATION.name().toLowerCase(Locale.ROOT).equals(stringQop) - || QualityOfProtection.AUTHENTICATION.saslQop.equals(stringQop)) { - qop = QualityOfProtection.AUTHENTICATION; - } else if (QualityOfProtection.INTEGRITY.name().toLowerCase(Locale.ROOT).equals(stringQop) - || QualityOfProtection.INTEGRITY.saslQop.equals(stringQop)) { - qop = QualityOfProtection.INTEGRITY; - } else if (QualityOfProtection.PRIVACY.name().toLowerCase(Locale.ROOT).equals(stringQop) - || QualityOfProtection.PRIVACY.saslQop.equals(stringQop)) { - qop = QualityOfProtection.PRIVACY; + for (QualityOfProtection qop : QualityOfProtection.values()) { + if (qop.matches(stringQop)) { + return qop; + } } - if (qop == null) { - throw new IllegalArgumentException("Invalid qop: " + stringQop - + ". It must be one of 'authentication', 'integrity', 'privacy'."); - } - if (QualityOfProtection.AUTHENTICATION.saslQop.equals(stringQop) - || QualityOfProtection.INTEGRITY.saslQop.equals(stringQop) - || QualityOfProtection.PRIVACY.saslQop.equals(stringQop)) { - log.warn("Use authentication/integrity/privacy as value for rpc protection " - + "configurations instead of auth/auth-int/auth-conf."); + throw new IllegalArgumentException("Invalid qop: " + stringQop + + ". It must be one of 'authentication', 'integrity', 'privacy'."); + } + + /** + * @param rpcProtection Value of 'hbase.rpc.protection' configuration. + * @return Map with values for SASL properties. + */ + static Map<String, String> initSaslProperties(String rpcProtection) { + String saslQop; + if (rpcProtection.isEmpty()) { + saslQop = QualityOfProtection.AUTHENTICATION.getSaslQop(); + } else { + String[] qops = rpcProtection.split(","); + StringBuilder saslQopBuilder = new StringBuilder(); + for (int i = 0; i < qops.length; ++i) { + QualityOfProtection qop = getQop(qops[i]); + saslQopBuilder.append(",").append(qop.getSaslQop()); + } + saslQop = saslQopBuilder.substring(1); // remove first ',' } - return qop; + Map<String, String> saslProps = new TreeMap<>(); + saslProps.put(Sasl.QOP, saslQop); + saslProps.put(Sasl.SERVER_AUTH, "true"); + return saslProps; } - static void initSaslProperties(String rpcProtection) { - QualityOfProtection saslQOP = getQop(rpcProtection); - if (saslQOP == null) { - saslQOP = QualityOfProtection.AUTHENTICATION; + static void safeDispose(SaslClient saslClient) { + try { + saslClient.dispose(); + } catch (SaslException e) { + LOG.error("Error disposing of SASL client", e); } - SaslUtil.SASL_PROPS.put(Sasl.QOP, saslQOP.getSaslQop()); - SaslUtil.SASL_PROPS.put(Sasl.SERVER_AUTH, "true"); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslWrapHandler.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslWrapHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslWrapHandler.java new file mode 100644 index 0000000..14ecf2e --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslWrapHandler.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.security; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelOutboundHandlerAdapter; +import io.netty.channel.ChannelPromise; +import io.netty.channel.CoalescingBufferQueue; +import io.netty.util.ReferenceCountUtil; +import io.netty.util.concurrent.PromiseCombiner; + +import java.io.IOException; + +import javax.security.sasl.SaslClient; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * wrap sasl messages. + */ [email protected] +public class SaslWrapHandler extends ChannelOutboundHandlerAdapter { + + private final SaslClient saslClient; + + private CoalescingBufferQueue queue; + + public SaslWrapHandler(SaslClient saslClient) { + this.saslClient = saslClient; + } + + @Override + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { + queue = new CoalescingBufferQueue(ctx.channel()); + } + + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) + throws Exception { + if (msg instanceof ByteBuf) { + queue.add((ByteBuf) msg, promise); + } else { + ctx.write(msg, promise); + } + } + + @Override + public void flush(ChannelHandlerContext ctx) throws Exception { + if (queue.isEmpty()) { + return; + } + ByteBuf buf = null; + try { + ChannelPromise promise = ctx.newPromise(); + int readableBytes = queue.readableBytes(); + buf = queue.remove(readableBytes, promise); + byte[] bytes = new byte[readableBytes]; + buf.readBytes(bytes); + byte[] wrapperBytes = saslClient.wrap(bytes, 0, bytes.length); + ChannelPromise lenPromise = ctx.newPromise(); + ctx.write(ctx.alloc().buffer(4).writeInt(wrapperBytes.length), lenPromise); + ChannelPromise contentPromise = ctx.newPromise(); + ctx.write(Unpooled.wrappedBuffer(wrapperBytes), contentPromise); + PromiseCombiner combiner = new PromiseCombiner(); + combiner.addAll(lenPromise, contentPromise); + combiner.finish(promise); + ctx.flush(); + } finally { + if (buf != null) { + ReferenceCountUtil.safeRelease(buf); + } + } + } + + @Override + public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { + if (!queue.isEmpty()) { + queue.releaseAndFailAll(new IOException("Connection closed")); + } + ctx.close(promise); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlClient.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlClient.java index 5799aaf..2f9d921 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlClient.java @@ -38,7 +38,7 @@ import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.security.SecurityCapability; import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; -import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos; import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService.BlockingInterface; @@ -99,7 +99,7 @@ public class AccessControlClient { public static void grant(Connection connection, final TableName tableName, final String userName, final byte[] family, final byte[] qual, boolean mergeExistingPermissions, final Permission.Action... actions) throws Throwable { - PayloadCarryingRpcController controller = + HBaseRpcController controller = ((ClusterConnection) connection).getRpcControllerFactory().newController(); controller.setPriority(tableName); try (Table table = connection.getTable(ACL_TABLE_NAME)) { @@ -139,7 +139,7 @@ public class AccessControlClient { public static void grant(final Connection connection, final String namespace, final String userName, boolean mergeExistingPermissions, final Permission.Action... actions) throws Throwable { - PayloadCarryingRpcController controller = + HBaseRpcController controller = ((ClusterConnection) connection).getRpcControllerFactory().newController(); try (Table table = connection.getTable(ACL_TABLE_NAME)) { @@ -174,7 +174,7 @@ public class AccessControlClient { */ public static void grant(final Connection connection, final String userName, boolean mergeExistingPermissions, final Permission.Action... actions) throws Throwable { - PayloadCarryingRpcController controller = + HBaseRpcController controller = ((ClusterConnection) connection).getRpcControllerFactory().newController(); try (Table table = connection.getTable(ACL_TABLE_NAME)) { ProtobufUtil.grant(controller, getAccessControlServiceStub(table), userName, @@ -214,7 +214,7 @@ public class AccessControlClient { public static void revoke(final Connection connection, final TableName tableName, final String username, final byte[] family, final byte[] qualifier, final Permission.Action... actions) throws Throwable { - PayloadCarryingRpcController controller + HBaseRpcController controller = ((ClusterConnection) connection).getRpcControllerFactory().newController(); controller.setPriority(tableName); try (Table table = connection.getTable(ACL_TABLE_NAME)) { @@ -233,7 +233,7 @@ public class AccessControlClient { */ public static void revoke(final Connection connection, final String namespace, final String userName, final Permission.Action... actions) throws Throwable { - PayloadCarryingRpcController controller + HBaseRpcController controller = ((ClusterConnection) connection).getRpcControllerFactory().newController(); try (Table table = connection.getTable(ACL_TABLE_NAME)) { ProtobufUtil.revoke(controller, getAccessControlServiceStub(table), userName, namespace, @@ -247,7 +247,7 @@ public class AccessControlClient { */ public static void revoke(final Connection connection, final String userName, final Permission.Action... actions) throws Throwable { - PayloadCarryingRpcController controller + HBaseRpcController controller = ((ClusterConnection) connection).getRpcControllerFactory().newController(); try (Table table = connection.getTable(ACL_TABLE_NAME)) { ProtobufUtil.revoke(controller, getAccessControlServiceStub(table), userName, actions); @@ -263,7 +263,7 @@ public class AccessControlClient { */ public static List<UserPermission> getUserPermissions(Connection connection, String tableRegex) throws Throwable { - PayloadCarryingRpcController controller + HBaseRpcController controller = ((ClusterConnection) connection).getRpcControllerFactory().newController(); List<UserPermission> permList = new ArrayList<UserPermission>(); try (Table table = connection.getTable(ACL_TABLE_NAME)) { http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java index 1630d83..f70e0de 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java @@ -17,6 +17,9 @@ */ package org.apache.hadoop.hbase.zookeeper; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.RpcController; + import java.io.EOFException; import java.io.IOException; import java.net.ConnectException; @@ -43,7 +46,6 @@ import org.apache.hadoop.hbase.client.RegionReplicaUtil; import org.apache.hadoop.hbase.client.RetriesExhaustedException; import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.ipc.FailedServerException; -import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; import org.apache.hadoop.hbase.master.RegionState; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; @@ -58,8 +60,6 @@ import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.ipc.RemoteException; import org.apache.zookeeper.KeeperException; -import com.google.protobuf.InvalidProtocolBufferException; - /** * Utility class to perform operation (get/wait for/verify/set/delete) on znode in ZooKeeper * which keeps hbase:meta region server location. @@ -319,7 +319,7 @@ public class MetaTableLocator { return false; } Throwable t; - PayloadCarryingRpcController controller = null; + RpcController controller = null; if (connection instanceof ClusterConnection) { controller = ((ClusterConnection) connection).getRpcControllerFactory().newController(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestSnapshotFromAdmin.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestSnapshotFromAdmin.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestSnapshotFromAdmin.java index 8aa8007..6385c27 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestSnapshotFromAdmin.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestSnapshotFromAdmin.java @@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.client; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import com.google.protobuf.RpcController; + import java.io.IOException; import org.apache.commons.logging.Log; @@ -27,21 +29,19 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneRequest; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneResponse; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SnapshotRequest; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SnapshotResponse; +import org.apache.hadoop.hbase.testclassification.SmallTests; import org.junit.Test; import org.junit.experimental.categories.Category; import org.mockito.Mockito; -import com.google.protobuf.RpcController; - /** * Test snapshot logic from the client */ @@ -88,7 +88,7 @@ public class TestSnapshotFromAdmin { RpcRetryingCallerFactory callerFactory = new RpcRetryingCallerFactory(conf); RpcControllerFactory controllerFactory = Mockito.mock(RpcControllerFactory.class); Mockito.when(controllerFactory.newController()).thenReturn( - Mockito.mock(PayloadCarryingRpcController.class)); + Mockito.mock(HBaseRpcController.class)); Mockito.when(mockConnection.getRpcRetryingCallerFactory()).thenReturn(callerFactory); Mockito.when(mockConnection.getRpcControllerFactory()).thenReturn(controllerFactory); // set the max wait time for the snapshot to complete @@ -136,7 +136,7 @@ public class TestSnapshotFromAdmin { RpcRetryingCallerFactory callerFactory = new RpcRetryingCallerFactory(conf); RpcControllerFactory controllerFactory = Mockito.mock(RpcControllerFactory.class); Mockito.when(controllerFactory.newController()).thenReturn( - Mockito.mock(PayloadCarryingRpcController.class)); + Mockito.mock(HBaseRpcController.class)); Mockito.when(mockConnection.getRpcRetryingCallerFactory()).thenReturn(callerFactory); Mockito.when(mockConnection.getRpcControllerFactory()).thenReturn(controllerFactory); Admin admin = new HBaseAdmin(mockConnection); http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestCellBlockBuilder.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestCellBlockBuilder.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestCellBlockBuilder.java new file mode 100644 index 0000000..6506347 --- /dev/null +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestCellBlockBuilder.java @@ -0,0 +1,195 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Arrays; + +import org.apache.commons.lang.time.StopWatch; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.commons.logging.impl.Log4JLogger; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.codec.Codec; +import org.apache.hadoop.hbase.codec.KeyValueCodec; +import org.apache.hadoop.hbase.io.SizedCellScanner; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.ClassSize; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.DefaultCodec; +import org.apache.hadoop.io.compress.GzipCodec; +import org.apache.log4j.Level; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ ClientTests.class, SmallTests.class }) +public class TestCellBlockBuilder { + + private static final Log LOG = LogFactory.getLog(TestCellBlockBuilder.class); + + private CellBlockBuilder builder; + + @Before + public void before() { + this.builder = new CellBlockBuilder(HBaseConfiguration.create()); + } + + @Test + public void testBuildCellBlock() throws IOException { + doBuildCellBlockUndoCellBlock(this.builder, new KeyValueCodec(), null); + doBuildCellBlockUndoCellBlock(this.builder, new KeyValueCodec(), new DefaultCodec()); + doBuildCellBlockUndoCellBlock(this.builder, new KeyValueCodec(), new GzipCodec()); + } + + static void doBuildCellBlockUndoCellBlock(final CellBlockBuilder builder, final Codec codec, + final CompressionCodec compressor) throws IOException { + doBuildCellBlockUndoCellBlock(builder, codec, compressor, 10, 1, false); + } + + static void doBuildCellBlockUndoCellBlock(final CellBlockBuilder builder, final Codec codec, + final CompressionCodec compressor, final int count, final int size, final boolean sized) + throws IOException { + Cell[] cells = getCells(count, size); + CellScanner cellScanner = sized ? getSizedCellScanner(cells) + : CellUtil.createCellScanner(Arrays.asList(cells).iterator()); + ByteBuffer bb = builder.buildCellBlock(codec, compressor, cellScanner); + cellScanner = builder.createCellScanner(codec, compressor, bb); + int i = 0; + while (cellScanner.advance()) { + i++; + } + assertEquals(count, i); + } + + static CellScanner getSizedCellScanner(final Cell[] cells) { + int size = -1; + for (Cell cell : cells) { + size += CellUtil.estimatedSerializedSizeOf(cell); + } + final int totalSize = ClassSize.align(size); + final CellScanner cellScanner = CellUtil.createCellScanner(cells); + return new SizedCellScanner() { + @Override + public long heapSize() { + return totalSize; + } + + @Override + public Cell current() { + return cellScanner.current(); + } + + @Override + public boolean advance() throws IOException { + return cellScanner.advance(); + } + }; + } + + static Cell[] getCells(final int howMany) { + return getCells(howMany, 1024); + } + + static Cell[] getCells(final int howMany, final int valueSize) { + Cell[] cells = new Cell[howMany]; + byte[] value = new byte[valueSize]; + for (int i = 0; i < howMany; i++) { + byte[] index = Bytes.toBytes(i); + KeyValue kv = new KeyValue(index, Bytes.toBytes("f"), index, value); + cells[i] = kv; + } + return cells; + } + + private static final String COUNT = "--count="; + private static final String SIZE = "--size="; + + /** + * Prints usage and then exits w/ passed <code>errCode</code> + * @param errCode + */ + private static void usage(final int errCode) { + System.out.println("Usage: IPCUtil [options]"); + System.out.println("Micro-benchmarking how changed sizes and counts work with buffer resizing"); + System.out.println(" --count Count of Cells"); + System.out.println(" --size Size of Cell values"); + System.out.println("Example: IPCUtil --count=1024 --size=1024"); + System.exit(errCode); + } + + private static void timerTests(final CellBlockBuilder builder, final int count, final int size, + final Codec codec, final CompressionCodec compressor) throws IOException { + final int cycles = 1000; + StopWatch timer = new StopWatch(); + timer.start(); + for (int i = 0; i < cycles; i++) { + timerTest(builder, timer, count, size, codec, compressor, false); + } + timer.stop(); + LOG.info("Codec=" + codec + ", compression=" + compressor + ", sized=" + false + ", count=" + + count + ", size=" + size + ", + took=" + timer.getTime() + "ms"); + timer.reset(); + timer.start(); + for (int i = 0; i < cycles; i++) { + timerTest(builder, timer, count, size, codec, compressor, true); + } + timer.stop(); + LOG.info("Codec=" + codec + ", compression=" + compressor + ", sized=" + true + ", count=" + + count + ", size=" + size + ", + took=" + timer.getTime() + "ms"); + } + + private static void timerTest(final CellBlockBuilder builder, final StopWatch timer, + final int count, final int size, final Codec codec, final CompressionCodec compressor, + final boolean sized) throws IOException { + doBuildCellBlockUndoCellBlock(builder, codec, compressor, count, size, sized); + } + + /** + * For running a few tests of methods herein. + * @param args + * @throws IOException + */ + public static void main(String[] args) throws IOException { + int count = 1024; + int size = 10240; + for (String arg : args) { + if (arg.startsWith(COUNT)) { + count = Integer.parseInt(arg.replace(COUNT, "")); + } else if (arg.startsWith(SIZE)) { + size = Integer.parseInt(arg.replace(SIZE, "")); + } else { + usage(1); + } + } + CellBlockBuilder builder = new CellBlockBuilder(HBaseConfiguration.create()); + ((Log4JLogger) CellBlockBuilder.LOG).getLogger().setLevel(Level.ALL); + timerTests(builder, count, size, new KeyValueCodec(), null); + timerTests(builder, count, size, new KeyValueCodec(), new DefaultCodec()); + timerTests(builder, count, size, new KeyValueCodec(), new GzipCodec()); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestHBaseRpcControllerImpl.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestHBaseRpcControllerImpl.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestHBaseRpcControllerImpl.java new file mode 100644 index 0000000..3b0a6b2 --- /dev/null +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestHBaseRpcControllerImpl.java @@ -0,0 +1,220 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.ipc; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellScannable; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ ClientTests.class, SmallTests.class }) +public class TestHBaseRpcControllerImpl { + + @Test + public void testListOfCellScannerables() throws IOException { + List<CellScannable> cells = new ArrayList<CellScannable>(); + final int count = 10; + for (int i = 0; i < count; i++) { + cells.add(createCell(i)); + } + HBaseRpcController controller = new HBaseRpcControllerImpl(cells); + CellScanner cellScanner = controller.cellScanner(); + int index = 0; + for (; cellScanner.advance(); index++) { + Cell cell = cellScanner.current(); + byte[] indexBytes = Bytes.toBytes(index); + assertTrue("" + index, Bytes.equals(indexBytes, 0, indexBytes.length, cell.getValueArray(), + cell.getValueOffset(), cell.getValueLength())); + } + assertEquals(count, index); + } + + /** + * @param index + * @return A faked out 'Cell' that does nothing but return index as its value + */ + static CellScannable createCell(final int index) { + return new CellScannable() { + @Override + public CellScanner cellScanner() { + return new CellScanner() { + @Override + public Cell current() { + // Fake out a Cell. All this Cell has is a value that is an int in size and equal + // to the above 'index' param serialized as an int. + return new Cell() { + private final int i = index; + + @Override + public byte[] getRowArray() { + // unused + return null; + } + + @Override + public int getRowOffset() { + // unused + return 0; + } + + @Override + public short getRowLength() { + // unused + return 0; + } + + @Override + public byte[] getFamilyArray() { + // unused + return null; + } + + @Override + public int getFamilyOffset() { + // unused + return 0; + } + + @Override + public byte getFamilyLength() { + // unused + return 0; + } + + @Override + public byte[] getQualifierArray() { + // unused + return null; + } + + @Override + public int getQualifierOffset() { + // unused + return 0; + } + + @Override + public int getQualifierLength() { + // unused + return 0; + } + + @Override + public long getTimestamp() { + // unused + return 0; + } + + @Override + public byte getTypeByte() { + // unused + return 0; + } + + @Override + public long getSequenceId() { + // unused + return 0; + } + + @Override + public byte[] getValueArray() { + return Bytes.toBytes(this.i); + } + + @Override + public int getValueOffset() { + return 0; + } + + @Override + public int getValueLength() { + return Bytes.SIZEOF_INT; + } + + @Override + public int getTagsOffset() { + // unused + return 0; + } + + @Override + public int getTagsLength() { + // unused + return 0; + } + + @Override + public byte[] getTagsArray() { + // unused + return null; + } + + @Override + public long getMvccVersion() { + return 0; + } + + @Override + public byte[] getValue() { + return Bytes.toBytes(this.i); + } + + @Override + public byte[] getFamily() { + return null; + } + + @Override + public byte[] getQualifier() { + return null; + } + + @Override + public byte[] getRow() { + return null; + } + }; + } + + private boolean hasCell = true; + + @Override + public boolean advance() { + // We have one Cell only so return true first time then false ever after. + if (!hasCell) return hasCell; + hasCell = false; + return true; + } + }; + } + }; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java index bb580c8..7c4ac02 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java @@ -17,180 +17,32 @@ */ package org.apache.hadoop.hbase.ipc; -import static org.junit.Assert.assertEquals; +import static org.apache.hadoop.hbase.ipc.IPCUtil.wrapException; +import static org.junit.Assert.assertTrue; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.Arrays; +import java.net.ConnectException; +import java.net.InetSocketAddress; +import java.net.SocketTimeoutException; -import org.apache.commons.lang.time.StopWatch; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.commons.logging.impl.Log4JLogger; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellScanner; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.exceptions.ConnectionClosingException; +import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.SmallTests; -import org.apache.hadoop.hbase.codec.Codec; -import org.apache.hadoop.hbase.codec.KeyValueCodec; -import org.apache.hadoop.hbase.io.SizedCellScanner; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.ClassSize; -import org.apache.hadoop.io.compress.CompressionCodec; -import org.apache.hadoop.io.compress.DefaultCodec; -import org.apache.hadoop.io.compress.GzipCodec; -import org.apache.log4j.Level; -import org.junit.Before; import org.junit.Test; import org.junit.experimental.categories.Category; -@Category(SmallTests.class) +@Category({ ClientTests.class, SmallTests.class }) public class TestIPCUtil { - private static final Log LOG = LogFactory.getLog(TestIPCUtil.class); - - IPCUtil util; - @Before - public void before() { - this.util = new IPCUtil(new Configuration()); - } - @Test - public void testBuildCellBlock() throws IOException { - doBuildCellBlockUndoCellBlock(this.util, new KeyValueCodec(), null); - doBuildCellBlockUndoCellBlock(this.util, new KeyValueCodec(), new DefaultCodec()); - doBuildCellBlockUndoCellBlock(this.util, new KeyValueCodec(), new GzipCodec()); - } - - static void doBuildCellBlockUndoCellBlock(final IPCUtil util, - final Codec codec, final CompressionCodec compressor) - throws IOException { - doBuildCellBlockUndoCellBlock(util, codec, compressor, 10, 1, false); - } - - static void doBuildCellBlockUndoCellBlock(final IPCUtil util, final Codec codec, - final CompressionCodec compressor, final int count, final int size, final boolean sized) - throws IOException { - Cell [] cells = getCells(count, size); - CellScanner cellScanner = sized? getSizedCellScanner(cells): - CellUtil.createCellScanner(Arrays.asList(cells).iterator()); - ByteBuffer bb = util.buildCellBlock(codec, compressor, cellScanner); - cellScanner = util.createCellScanner(codec, compressor, bb); - int i = 0; - while (cellScanner.advance()) { - i++; - } - assertEquals(count, i); - } - - static CellScanner getSizedCellScanner(final Cell [] cells) { - int size = -1; - for (Cell cell: cells) { - size += CellUtil.estimatedSerializedSizeOf(cell); - } - final int totalSize = ClassSize.align(size); - final CellScanner cellScanner = CellUtil.createCellScanner(cells); - return new SizedCellScanner() { - @Override - public long heapSize() { - return totalSize; - } - - @Override - public Cell current() { - return cellScanner.current(); - } - - @Override - public boolean advance() throws IOException { - return cellScanner.advance(); - } - }; - } - - static Cell [] getCells(final int howMany) { - return getCells(howMany, 1024); - } - - static Cell [] getCells(final int howMany, final int valueSize) { - Cell [] cells = new Cell[howMany]; - byte [] value = new byte[valueSize]; - for (int i = 0; i < howMany; i++) { - byte [] index = Bytes.toBytes(i); - KeyValue kv = new KeyValue(index, Bytes.toBytes("f"), index, value); - cells[i] = kv; - } - return cells; - } - - private static final String COUNT = "--count="; - private static final String SIZE = "--size="; - - /** - * Prints usage and then exits w/ passed <code>errCode</code> - * @param errCode - */ - private static void usage(final int errCode) { - System.out.println("Usage: IPCUtil [options]"); - System.out.println("Micro-benchmarking how changed sizes and counts work with buffer resizing"); - System.out.println(" --count Count of Cells"); - System.out.println(" --size Size of Cell values"); - System.out.println("Example: IPCUtil --count=1024 --size=1024"); - System.exit(errCode); - } - - private static void timerTests(final IPCUtil util, final int count, final int size, - final Codec codec, final CompressionCodec compressor) - throws IOException { - final int cycles = 1000; - StopWatch timer = new StopWatch(); - timer.start(); - for (int i = 0; i < cycles; i++) { - timerTest(util, timer, count, size, codec, compressor, false); - } - timer.stop(); - LOG.info("Codec=" + codec + ", compression=" + compressor + ", sized=" + false + - ", count=" + count + ", size=" + size + ", + took=" + timer.getTime() + "ms"); - timer.reset(); - timer.start(); - for (int i = 0; i < cycles; i++) { - timerTest(util, timer, count, size, codec, compressor, true); - } - timer.stop(); - LOG.info("Codec=" + codec + ", compression=" + compressor + ", sized=" + true + - ", count=" + count + ", size=" + size + ", + took=" + timer.getTime() + "ms"); - } - - private static void timerTest(final IPCUtil util, final StopWatch timer, final int count, - final int size, final Codec codec, final CompressionCodec compressor, final boolean sized) - throws IOException { - doBuildCellBlockUndoCellBlock(util, codec, compressor, count, size, sized); - } - - /** - * For running a few tests of methods herein. - * @param args - * @throws IOException - */ - public static void main(String[] args) throws IOException { - int count = 1024; - int size = 10240; - for (String arg: args) { - if (arg.startsWith(COUNT)) { - count = Integer.parseInt(arg.replace(COUNT, "")); - } else if (arg.startsWith(SIZE)) { - size = Integer.parseInt(arg.replace(SIZE, "")); - } else { - usage(1); - } - } - IPCUtil util = new IPCUtil(HBaseConfiguration.create()); - ((Log4JLogger)IPCUtil.LOG).getLogger().setLevel(Level.ALL); - timerTests(util, count, size, new KeyValueCodec(), null); - timerTests(util, count, size, new KeyValueCodec(), new DefaultCodec()); - timerTests(util, count, size, new KeyValueCodec(), new GzipCodec()); + public void testWrapException() throws Exception { + final InetSocketAddress address = InetSocketAddress.createUnresolved("localhost", 0); + assertTrue(wrapException(address, new ConnectException()) instanceof ConnectException); + assertTrue( + wrapException(address, new SocketTimeoutException()) instanceof SocketTimeoutException); + assertTrue(wrapException(address, new ConnectionClosingException( + "Test AbstractRpcClient#wrapException")) instanceof ConnectionClosingException); + assertTrue( + wrapException(address, new CallTimeoutException("Test AbstractRpcClient#wrapException")) + .getCause() instanceof CallTimeoutException); } }
