Repository: hbase Updated Branches: refs/heads/master d3decaab8 -> 0ae211eb3
http://git-wip-us.apache.org/repos/asf/hbase/blob/0ae211eb/hbase-protocol-shaded/src/main/protobuf/RPC.proto ---------------------------------------------------------------------- diff --git a/hbase-protocol-shaded/src/main/protobuf/RPC.proto b/hbase-protocol-shaded/src/main/protobuf/RPC.proto index 0cb234d..9cdf98c 100644 --- a/hbase-protocol-shaded/src/main/protobuf/RPC.proto +++ b/hbase-protocol-shaded/src/main/protobuf/RPC.proto @@ -89,6 +89,14 @@ message ConnectionHeader { // Class must implement hadoop's CompressionCodec Interface. Can't compress if no codec. optional string cell_block_compressor_class = 4; optional VersionInfo version_info = 5; + // the transformation for rpc AES encryption with Apache Commons Crypto + optional string rpc_crypto_cipher_transformation = 6; +} + +// This is sent by rpc server to negotiate the data if necessary +message ConnectionHeaderResponse { + // To use Apache Commons Crypto, negotiate the metadata + optional CryptoCipherMeta crypto_cipher_meta = 1; } // Optional Cell block Message. Included in client RequestHeader @@ -112,6 +120,17 @@ message ExceptionResponse { optional bool do_not_retry = 5; } +/** + * Cipher meta for Crypto + */ +message CryptoCipherMeta { + required string transformation = 1; + optional bytes inKey = 2; + optional bytes inIv = 3; + optional bytes outKey = 4; + optional bytes outIv = 5; +} + // Header sent making a request. message RequestHeader { // Monotonically increasing call_id to keep track of RPC requests and their response http://git-wip-us.apache.org/repos/asf/hbase/blob/0ae211eb/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index 836c4bc..00c7254 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -45,6 +45,7 @@ import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.channels.WritableByteChannel; +import java.security.GeneralSecurityException; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Arrays; @@ -53,6 +54,7 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.Set; import java.util.Timer; import java.util.TimerTask; @@ -70,6 +72,9 @@ import javax.security.sasl.Sasl; import javax.security.sasl.SaslException; import javax.security.sasl.SaslServer; +import org.apache.commons.crypto.cipher.CryptoCipherFactory; +import org.apache.commons.crypto.random.CryptoRandom; +import org.apache.commons.crypto.random.CryptoRandomFactory; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -91,10 +96,12 @@ import org.apache.hadoop.hbase.io.ByteBufferInputStream; import org.apache.hadoop.hbase.io.ByteBufferListOutputStream; import org.apache.hadoop.hbase.io.ByteBufferOutputStream; import org.apache.hadoop.hbase.io.ByteBufferPool; +import org.apache.hadoop.hbase.io.crypto.aes.CryptoAES; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; import org.apache.hadoop.hbase.monitoring.TaskMonitor; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.VersionInfo; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse; @@ -134,6 +141,7 @@ import org.apache.hadoop.util.StringUtils; import org.apache.htrace.TraceInfo; import org.codehaus.jackson.map.ObjectMapper; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString; import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService; import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedInputStream; import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedOutputStream; @@ -423,6 +431,12 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { this.response = new BufferChain(responseBufs); } + protected synchronized void setConnectionHeaderResponse(ByteBuffer response) { + ByteBuffer[] responseBufs = new ByteBuffer[1]; + responseBufs[0] = response; + this.response = new BufferChain(responseBufs); + } + protected synchronized void setResponse(Object m, final CellScanner cells, Throwable t, String errorMsg) { if (this.isError) return; @@ -565,9 +579,16 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { byte [] responseBytes = bc.getBytes(); byte [] token; // synchronization may be needed since there can be multiple Handler - // threads using saslServer to wrap responses. - synchronized (connection.saslServer) { - token = connection.saslServer.wrap(responseBytes, 0, responseBytes.length); + // threads using saslServer or Crypto AES to wrap responses. + if (connection.useCryptoAesWrap) { + // wrap with Crypto AES + synchronized (connection.cryptoAES) { + token = connection.cryptoAES.wrap(responseBytes, 0, responseBytes.length); + } + } else { + synchronized (connection.saslServer) { + token = connection.saslServer.wrap(responseBytes, 0, responseBytes.length); + } } if (LOG.isTraceEnabled()) { LOG.trace("Adding saslServer wrapped token of size " + token.length @@ -1255,7 +1276,9 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { private ByteBuffer unwrappedDataLengthBuffer = ByteBuffer.allocate(4); boolean useSasl; SaslServer saslServer; + private CryptoAES cryptoAES; private boolean useWrap = false; + private boolean useCryptoAesWrap = false; // Fake 'call' for failed authorization response private static final int AUTHORIZATION_FAILED_CALLID = -1; private final Call authFailedCall = new Call(AUTHORIZATION_FAILED_CALLID, null, null, null, @@ -1266,6 +1289,10 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { private static final int SASL_CALLID = -33; private final Call saslCall = new Call(SASL_CALLID, null, null, null, null, null, this, null, 0, null, null, 0); + // Fake 'call' for connection header response + private static final int CONNECTION_HEADER_RESPONSE_CALLID = -34; + private final Call setConnectionHeaderResponseCall = new Call(CONNECTION_HEADER_RESPONSE_CALLID, + null, null, null, null, null, this, null, 0, null, null, 0); // was authentication allowed with a fallback to simple auth private boolean authenticatedWithFallback; @@ -1376,7 +1403,13 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { processOneRpc(saslToken); } else { byte[] b = saslToken.array(); - byte [] plaintextData = saslServer.unwrap(b, saslToken.position(), saslToken.limit()); + byte [] plaintextData; + if (useCryptoAesWrap) { + // unwrap with CryptoAES + plaintextData = cryptoAES.unwrap(b, saslToken.position(), saslToken.limit()); + } else { + plaintextData = saslServer.unwrap(b, saslToken.position(), saslToken.limit()); + } processUnwrappedData(plaintextData); } } else { @@ -1503,6 +1536,31 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { } } + /** + * Send the response for connection header + */ + private void doConnectionHeaderResponse(byte[] wrappedCipherMetaData) throws IOException { + ByteBufferOutputStream response = null; + DataOutputStream out = null; + try { + response = new ByteBufferOutputStream(wrappedCipherMetaData.length + 4); + out = new DataOutputStream(response); + out.writeInt(wrappedCipherMetaData.length); + out.write(wrappedCipherMetaData); + + setConnectionHeaderResponseCall.setConnectionHeaderResponse(response.getByteBuffer()); + setConnectionHeaderResponseCall.responder = responder; + setConnectionHeaderResponseCall.sendResponseIfReady(); + } finally { + if (out != null) { + out.close(); + } + if (response != null) { + response.close(); + } + } + } + private void disposeSasl() { if (saslServer != null) { try { @@ -1744,6 +1802,10 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { this.service = getService(services, serviceName); if (this.service == null) throw new UnknownServiceException(serviceName); setupCellBlockCodecs(this.connectionHeader); + RPCProtos.ConnectionHeaderResponse.Builder chrBuilder = + RPCProtos.ConnectionHeaderResponse.newBuilder(); + setupCryptoCipher(this.connectionHeader, chrBuilder); + responseConnectionHeader(chrBuilder); UserGroupInformation protocolUser = createUser(connectionHeader); if (!useSasl) { ugi = protocolUser; @@ -1792,8 +1854,6 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { AUDITLOG.info("Connection from " + this.hostAddress + " port: " + this.remotePort + " with unknown version info"); } - - } /** @@ -1820,6 +1880,92 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { } } + /** + * Set up cipher for rpc encryption with Apache Commons Crypto + * @throws FatalConnectionException + */ + private void setupCryptoCipher(final ConnectionHeader header, + RPCProtos.ConnectionHeaderResponse.Builder chrBuilder) throws FatalConnectionException { + // If simple auth, return + if (saslServer == null) return; + // check if rpc encryption with Crypto AES + String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP); + boolean isEncryption = SaslUtil.QualityOfProtection.PRIVACY + .getSaslQop().equalsIgnoreCase(qop); + boolean isCryptoAesEncryption = isEncryption && conf.getBoolean( + "hbase.rpc.crypto.encryption.aes.enabled", false); + if (!isCryptoAesEncryption) return; + if (!header.hasRpcCryptoCipherTransformation()) return; + String transformation = header.getRpcCryptoCipherTransformation(); + if (transformation == null || transformation.length() == 0) return; + // Negotiates AES based on complete saslServer. + // The Crypto metadata need to be encrypted and send to client. + Properties properties = new Properties(); + // the property for SecureRandomFactory + properties.setProperty(CryptoRandomFactory.CLASSES_KEY, + conf.get("hbase.crypto.sasl.encryption.aes.crypto.random", + "org.apache.commons.crypto.random.JavaCryptoRandom")); + // the property for cipher class + properties.setProperty(CryptoCipherFactory.CLASSES_KEY, + conf.get("hbase.rpc.crypto.encryption.aes.cipher.class", + "org.apache.commons.crypto.cipher.JceCipher")); + + int cipherKeyBits = conf.getInt( + "hbase.rpc.crypto.encryption.aes.cipher.keySizeBits", 128); + // generate key and iv + if (cipherKeyBits % 8 != 0) { + throw new IllegalArgumentException("The AES cipher key size in bits" + + " should be a multiple of byte"); + } + int len = cipherKeyBits / 8; + byte[] inKey = new byte[len]; + byte[] outKey = new byte[len]; + byte[] inIv = new byte[len]; + byte[] outIv = new byte[len]; + + try { + // generate the cipher meta data with SecureRandom + CryptoRandom secureRandom = CryptoRandomFactory.getCryptoRandom(properties); + secureRandom.nextBytes(inKey); + secureRandom.nextBytes(outKey); + secureRandom.nextBytes(inIv); + secureRandom.nextBytes(outIv); + + // create CryptoAES for server + cryptoAES = new CryptoAES(transformation, properties, + inKey, outKey, inIv, outIv); + // create SaslCipherMeta and send to client, + // for client, the [inKey, outKey], [inIv, outIv] should be reversed + RPCProtos.CryptoCipherMeta.Builder ccmBuilder = RPCProtos.CryptoCipherMeta.newBuilder(); + ccmBuilder.setTransformation(transformation); + ccmBuilder.setInIv(getByteString(outIv)); + ccmBuilder.setInKey(getByteString(outKey)); + ccmBuilder.setOutIv(getByteString(inIv)); + ccmBuilder.setOutKey(getByteString(inKey)); + chrBuilder.setCryptoCipherMeta(ccmBuilder); + useCryptoAesWrap = true; + } catch (GeneralSecurityException | IOException ex) { + throw new UnsupportedCryptoException(ex.getMessage(), ex); + } + } + + private void responseConnectionHeader(RPCProtos.ConnectionHeaderResponse.Builder chrBuilder) + throws FatalConnectionException { + // Response the connection header if Crypto AES is enabled + if (!chrBuilder.hasCryptoCipherMeta()) return; + try { + byte[] connectionHeaderResBytes = chrBuilder.build().toByteArray(); + // encrypt the Crypto AES cipher meta data with sasl server, and send to client + byte[] unwrapped = new byte[connectionHeaderResBytes.length + 4]; + Bytes.putBytes(unwrapped, 0, Bytes.toBytes(connectionHeaderResBytes.length), 0, 4); + Bytes.putBytes(unwrapped, 4, connectionHeaderResBytes, 0, connectionHeaderResBytes.length); + + doConnectionHeaderResponse(saslServer.wrap(unwrapped, 0, unwrapped.length)); + } catch (IOException ex) { + throw new UnsupportedCryptoException(ex.getMessage(), ex); + } + } + private void processUnwrappedData(byte[] inBuf) throws IOException, InterruptedException { ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(inBuf)); @@ -1858,7 +2004,6 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { } } - private void processOneRpc(ByteBuffer buf) throws IOException, InterruptedException { if (connectionHeaderRead) { processRequest(buf); @@ -1988,6 +2133,11 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { } } + private ByteString getByteString(byte[] bytes) { + // return singleton to reduce object allocation + return (bytes.length == 0) ? ByteString.EMPTY : ByteString.copyFrom(bytes); + } + private boolean authorizeConnection() throws IOException { try { // If auth method is DIGEST, the token was obtained by the http://git-wip-us.apache.org/repos/asf/hbase/blob/0ae211eb/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureIPC.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureIPC.java index 7bc97ca..b7d6f87 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureIPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureIPC.java @@ -26,6 +26,7 @@ import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getSecuredConf import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertSame; +import static org.junit.Assert.fail; import java.io.File; import java.io.IOException; @@ -96,8 +97,8 @@ public class TestSecureIPC { @Parameters(name = "{index}: rpcClientImpl={0}") public static Collection<Object[]> parameters() { - return Arrays.asList(new Object[] { BlockingRpcClient.class.getName() }, - new Object[] { NettyRpcClient.class.getName() }); + return Arrays.asList(new Object[]{BlockingRpcClient.class.getName()}, + new Object[]{NettyRpcClient.class.getName()}); } @Parameter @@ -192,6 +193,42 @@ public class TestSecureIPC { callRpcService(User.create(ugi)); } + /** + * Test sasl encryption with Crypto AES. + * @throws Exception + */ + @Test + public void testSaslWithCryptoAES() throws Exception { + setRpcProtection("privacy", "privacy"); + setCryptoAES("true", "true"); + callRpcService(User.create(ugi)); + } + + /** + * Test various combinations of Server and Client configuration for Crypto AES. + * @throws Exception + */ + @Test + public void testDifferentConfWithCryptoAES() throws Exception { + setRpcProtection("privacy", "privacy"); + + setCryptoAES("false", "true"); + callRpcService(User.create(ugi)); + + setCryptoAES("true", "false"); + try { + callRpcService(User.create(ugi)); + fail("The exception should be thrown out for the rpc timeout."); + } catch (Exception e) { + // ignore the expected exception + } + } + + void setCryptoAES(String clientCryptoAES, String serverCryptoAES) { + clientConf.set("hbase.rpc.crypto.encryption.aes.enabled", clientCryptoAES); + serverConf.set("hbase.rpc.crypto.encryption.aes.enabled", serverCryptoAES); + } + private UserGroupInformation loginKerberosPrincipal(String krbKeytab, String krbPrincipal) throws Exception { Configuration cnf = new Configuration(); http://git-wip-us.apache.org/repos/asf/hbase/blob/0ae211eb/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 35b9708..e08f64e 100644 --- a/pom.xml +++ b/pom.xml @@ -1228,6 +1228,7 @@ <spy.version>2.11.6</spy.version> <bouncycastle.version>1.46</bouncycastle.version> <kerby.version>1.0.0-RC2</kerby.version> + <commons-crypto.version>1.0.0</commons-crypto.version> <!-- Plugin Dependencies --> <maven.assembly.version>2.4</maven.assembly.version> <maven.antrun.version>1.8</maven.antrun.version> @@ -1806,6 +1807,17 @@ <artifactId>kerb-simplekdc</artifactId> <version>${kerby.version}</version> </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-crypto</artifactId> + <version>${commons-crypto.version}</version> + <exclusions> + <exclusion> + <groupId>net.java.dev.jna</groupId> + <artifactId>jna</artifactId> + </exclusion> + </exclusions> + </dependency> </dependencies> </dependencyManagement> <!-- Dependencies needed by subprojects -->
