Repository: hbase
Updated Branches:
  refs/heads/master d3decaab8 -> 0ae211eb3


http://git-wip-us.apache.org/repos/asf/hbase/blob/0ae211eb/hbase-protocol-shaded/src/main/protobuf/RPC.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol-shaded/src/main/protobuf/RPC.proto 
b/hbase-protocol-shaded/src/main/protobuf/RPC.proto
index 0cb234d..9cdf98c 100644
--- a/hbase-protocol-shaded/src/main/protobuf/RPC.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/RPC.proto
@@ -89,6 +89,14 @@ message ConnectionHeader {
   // Class must implement hadoop's CompressionCodec Interface.  Can't compress 
if no codec.
   optional string cell_block_compressor_class = 4;
   optional VersionInfo version_info = 5;
+  // the transformation for rpc AES encryption with Apache Commons Crypto
+  optional string rpc_crypto_cipher_transformation = 6;
+}
+
+// Sent by the rpc server in reply to the ConnectionHeader when there is data to negotiate
+message ConnectionHeaderResponse {
+  // Cipher metadata negotiated for rpc encryption with Apache Commons Crypto
+  optional CryptoCipherMeta crypto_cipher_meta = 1;
 }
 
 // Optional Cell block Message.  Included in client RequestHeader
@@ -112,6 +120,17 @@ message ExceptionResponse {
   optional bool do_not_retry = 5;
 }
 
+/**
+ * Cipher metadata for rpc encryption with Apache Commons Crypto.
+ * transformation is the cipher transformation string; the key/iv fields carry
+ * the key and iv bytes for each direction. The rpc server fills the in* fields
+ * from its own out* values and the out* fields from its own in* values, so the
+ * receiver gets a mirrored view of the server's settings.
+ */
+message CryptoCipherMeta {
+  required string transformation = 1;
+  optional bytes inKey = 2;
+  optional bytes inIv = 3;
+  optional bytes outKey = 4;
+  optional bytes outIv = 5;
+}
+
 // Header sent making a request.
 message RequestHeader {
   // Monotonically increasing call_id to keep track of RPC requests and their 
response

http://git-wip-us.apache.org/repos/asf/hbase/blob/0ae211eb/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index 836c4bc..00c7254 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -45,6 +45,7 @@ import java.nio.channels.Selector;
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
 import java.nio.channels.WritableByteChannel;
+import java.security.GeneralSecurityException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -53,6 +54,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
@@ -70,6 +72,9 @@ import javax.security.sasl.Sasl;
 import javax.security.sasl.SaslException;
 import javax.security.sasl.SaslServer;
 
+import org.apache.commons.crypto.cipher.CryptoCipherFactory;
+import org.apache.commons.crypto.random.CryptoRandom;
+import org.apache.commons.crypto.random.CryptoRandomFactory;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -91,10 +96,12 @@ import org.apache.hadoop.hbase.io.ByteBufferInputStream;
 import org.apache.hadoop.hbase.io.ByteBufferListOutputStream;
 import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
 import org.apache.hadoop.hbase.io.ByteBufferPool;
+import org.apache.hadoop.hbase.io.crypto.aes.CryptoAES;
 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.VersionInfo;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse;
@@ -134,6 +141,7 @@ import org.apache.hadoop.util.StringUtils;
 import org.apache.htrace.TraceInfo;
 import org.codehaus.jackson.map.ObjectMapper;
 
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedInputStream;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedOutputStream;
@@ -423,6 +431,12 @@ public class RpcServer implements RpcServerInterface, 
ConfigurationObserver {
       this.response = new BufferChain(responseBufs);
     }
 
+    /**
+     * Installs the given buffer as the sole content of this call's response.
+     * @param response the buffer holding the connection header response
+     */
+    protected synchronized void setConnectionHeaderResponse(ByteBuffer response) {
+      this.response = new BufferChain(new ByteBuffer[] { response });
+    }
+
     protected synchronized void setResponse(Object m, final CellScanner cells,
         Throwable t, String errorMsg) {
       if (this.isError) return;
@@ -565,9 +579,16 @@ public class RpcServer implements RpcServerInterface, 
ConfigurationObserver {
       byte [] responseBytes = bc.getBytes();
       byte [] token;
       // synchronization may be needed since there can be multiple Handler
-      // threads using saslServer to wrap responses.
-      synchronized (connection.saslServer) {
-        token = connection.saslServer.wrap(responseBytes, 0, 
responseBytes.length);
+      // threads using saslServer or Crypto AES to wrap responses.
+      if (connection.useCryptoAesWrap) {
+        // wrap with Crypto AES
+        synchronized (connection.cryptoAES) {
+          token = connection.cryptoAES.wrap(responseBytes, 0, 
responseBytes.length);
+        }
+      } else {
+        synchronized (connection.saslServer) {
+          token = connection.saslServer.wrap(responseBytes, 0, 
responseBytes.length);
+        }
       }
       if (LOG.isTraceEnabled()) {
         LOG.trace("Adding saslServer wrapped token of size " + token.length
@@ -1255,7 +1276,9 @@ public class RpcServer implements RpcServerInterface, 
ConfigurationObserver {
     private ByteBuffer unwrappedDataLengthBuffer = ByteBuffer.allocate(4);
     boolean useSasl;
     SaslServer saslServer;
+    private CryptoAES cryptoAES;
     private boolean useWrap = false;
+    private boolean useCryptoAesWrap = false;
     // Fake 'call' for failed authorization response
     private static final int AUTHORIZATION_FAILED_CALLID = -1;
     private final Call authFailedCall = new Call(AUTHORIZATION_FAILED_CALLID, 
null, null, null,
@@ -1266,6 +1289,10 @@ public class RpcServer implements RpcServerInterface, 
ConfigurationObserver {
     private static final int SASL_CALLID = -33;
     private final Call saslCall = new Call(SASL_CALLID, null, null, null, 
null, null, this, null,
         0, null, null, 0);
+    // Fake 'call' for connection header response
+    private static final int CONNECTION_HEADER_RESPONSE_CALLID = -34;
+    private final Call setConnectionHeaderResponseCall = new 
Call(CONNECTION_HEADER_RESPONSE_CALLID,
+        null, null, null, null, null, this, null, 0, null, null, 0);
 
     // was authentication allowed with a fallback to simple auth
     private boolean authenticatedWithFallback;
@@ -1376,7 +1403,13 @@ public class RpcServer implements RpcServerInterface, 
ConfigurationObserver {
           processOneRpc(saslToken);
         } else {
           byte[] b = saslToken.array();
-          byte [] plaintextData = saslServer.unwrap(b, saslToken.position(), 
saslToken.limit());
+          byte [] plaintextData;
+          if (useCryptoAesWrap) {
+            // unwrap with CryptoAES
+            plaintextData = cryptoAES.unwrap(b, saslToken.position(), 
saslToken.limit());
+          } else {
+            plaintextData = saslServer.unwrap(b, saslToken.position(), 
saslToken.limit());
+          }
           processUnwrappedData(plaintextData);
         }
       } else {
@@ -1503,6 +1536,31 @@ public class RpcServer implements RpcServerInterface, 
ConfigurationObserver {
       }
     }
 
+    /**
+     * Send the response for connection header: a 4-byte length followed by the
+     * wrapped cipher meta data, handed to the dedicated connection header response call.
+     * @param wrappedCipherMetaData the already wrapped cipher meta data bytes
+     * @throws IOException if writing or sending the response fails
+     */
+    private void doConnectionHeaderResponse(byte[] wrappedCipherMetaData) throws IOException {
+      // try-with-resources closes both streams even when one close() throws, and keeps
+      // the primary exception instead of masking it with a close() failure
+      try (ByteBufferOutputStream response =
+              new ByteBufferOutputStream(wrappedCipherMetaData.length + 4);
+          DataOutputStream out = new DataOutputStream(response)) {
+        out.writeInt(wrappedCipherMetaData.length);
+        out.write(wrappedCipherMetaData);
+
+        setConnectionHeaderResponseCall.setConnectionHeaderResponse(response.getByteBuffer());
+        setConnectionHeaderResponseCall.responder = responder;
+        setConnectionHeaderResponseCall.sendResponseIfReady();
+      }
+    }
+
     private void disposeSasl() {
       if (saslServer != null) {
         try {
@@ -1744,6 +1802,10 @@ public class RpcServer implements RpcServerInterface, 
ConfigurationObserver {
       this.service = getService(services, serviceName);
       if (this.service == null) throw new UnknownServiceException(serviceName);
       setupCellBlockCodecs(this.connectionHeader);
+      RPCProtos.ConnectionHeaderResponse.Builder chrBuilder =
+          RPCProtos.ConnectionHeaderResponse.newBuilder();
+      setupCryptoCipher(this.connectionHeader, chrBuilder);
+      responseConnectionHeader(chrBuilder);
       UserGroupInformation protocolUser = createUser(connectionHeader);
       if (!useSasl) {
         ugi = protocolUser;
@@ -1792,8 +1854,6 @@ public class RpcServer implements RpcServerInterface, 
ConfigurationObserver {
         AUDITLOG.info("Connection from " + this.hostAddress + " port: " + 
this.remotePort
             + " with unknown version info");
       }
-
-
     }
 
     /**
@@ -1820,6 +1880,92 @@ public class RpcServer implements RpcServerInterface, 
ConfigurationObserver {
       }
     }
 
+    /**
+     * Set up cipher for rpc encryption with Apache Commons Crypto
+     * @throws FatalConnectionException
+     */
+    private void setupCryptoCipher(final ConnectionHeader header,
+        RPCProtos.ConnectionHeaderResponse.Builder chrBuilder) throws 
FatalConnectionException {
+      // If simple auth, return
+      if (saslServer == null) return;
+      // check if rpc encryption with Crypto AES
+      String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP);
+      boolean isEncryption = SaslUtil.QualityOfProtection.PRIVACY
+          .getSaslQop().equalsIgnoreCase(qop);
+      boolean isCryptoAesEncryption = isEncryption && conf.getBoolean(
+          "hbase.rpc.crypto.encryption.aes.enabled", false);
+      if (!isCryptoAesEncryption) return;
+      if (!header.hasRpcCryptoCipherTransformation()) return;
+      String transformation = header.getRpcCryptoCipherTransformation();
+      if (transformation == null || transformation.length() == 0) return;
+       // Negotiates AES based on complete saslServer.
+       // The Crypto metadata need to be encrypted and send to client.
+      Properties properties = new Properties();
+      // the property for SecureRandomFactory
+      properties.setProperty(CryptoRandomFactory.CLASSES_KEY,
+          conf.get("hbase.crypto.sasl.encryption.aes.crypto.random",
+              "org.apache.commons.crypto.random.JavaCryptoRandom"));
+      // the property for cipher class
+      properties.setProperty(CryptoCipherFactory.CLASSES_KEY,
+          conf.get("hbase.rpc.crypto.encryption.aes.cipher.class",
+              "org.apache.commons.crypto.cipher.JceCipher"));
+
+      int cipherKeyBits = conf.getInt(
+          "hbase.rpc.crypto.encryption.aes.cipher.keySizeBits", 128);
+      // generate key and iv
+      if (cipherKeyBits % 8 != 0) {
+        throw new IllegalArgumentException("The AES cipher key size in bits" +
+            " should be a multiple of byte");
+      }
+      int len = cipherKeyBits / 8;
+      byte[] inKey = new byte[len];
+      byte[] outKey = new byte[len];
+      byte[] inIv = new byte[len];
+      byte[] outIv = new byte[len];
+
+      try {
+        // generate the cipher meta data with SecureRandom
+        CryptoRandom secureRandom = 
CryptoRandomFactory.getCryptoRandom(properties);
+        secureRandom.nextBytes(inKey);
+        secureRandom.nextBytes(outKey);
+        secureRandom.nextBytes(inIv);
+        secureRandom.nextBytes(outIv);
+
+        // create CryptoAES for server
+        cryptoAES = new CryptoAES(transformation, properties,
+            inKey, outKey, inIv, outIv);
+        // create SaslCipherMeta and send to client,
+        //  for client, the [inKey, outKey], [inIv, outIv] should be reversed
+        RPCProtos.CryptoCipherMeta.Builder ccmBuilder = 
RPCProtos.CryptoCipherMeta.newBuilder();
+        ccmBuilder.setTransformation(transformation);
+        ccmBuilder.setInIv(getByteString(outIv));
+        ccmBuilder.setInKey(getByteString(outKey));
+        ccmBuilder.setOutIv(getByteString(inIv));
+        ccmBuilder.setOutKey(getByteString(inKey));
+        chrBuilder.setCryptoCipherMeta(ccmBuilder);
+        useCryptoAesWrap = true;
+      } catch (GeneralSecurityException | IOException ex) {
+        throw new UnsupportedCryptoException(ex.getMessage(), ex);
+      }
+    }
+
+    /**
+     * Sends the connection header response to the client when Crypto AES cipher meta data
+     * has been set on the builder; otherwise does nothing.
+     * @param chrBuilder builder of the connection header response
+     * @throws FatalConnectionException if wrapping or sending the response fails
+     */
+    private void responseConnectionHeader(RPCProtos.ConnectionHeaderResponse.Builder chrBuilder)
+        throws FatalConnectionException {
+      // Response the connection header if Crypto AES is enabled
+      if (chrBuilder.hasCryptoCipherMeta()) {
+        try {
+          byte[] payload = chrBuilder.build().toByteArray();
+          int payloadLen = payload.length;
+          // frame as a 4-byte length followed by the serialized response
+          byte[] framed = new byte[payloadLen + 4];
+          byte[] lenBytes = Bytes.toBytes(payloadLen);
+          Bytes.putBytes(framed, 0, lenBytes, 0, 4);
+          Bytes.putBytes(framed, 4, payload, 0, payloadLen);
+          // encrypt the Crypto AES cipher meta data with sasl server, and send to client
+          byte[] wrapped = saslServer.wrap(framed, 0, framed.length);
+          doConnectionHeaderResponse(wrapped);
+        } catch (IOException ex) {
+          throw new UnsupportedCryptoException(ex.getMessage(), ex);
+        }
+      }
+    }
+
     private void processUnwrappedData(byte[] inBuf) throws IOException,
     InterruptedException {
       ReadableByteChannel ch = Channels.newChannel(new 
ByteArrayInputStream(inBuf));
@@ -1858,7 +2004,6 @@ public class RpcServer implements RpcServerInterface, 
ConfigurationObserver {
       }
     }
 
-
     private void processOneRpc(ByteBuffer buf) throws IOException, 
InterruptedException {
       if (connectionHeaderRead) {
         processRequest(buf);
@@ -1988,6 +2133,11 @@ public class RpcServer implements RpcServerInterface, 
ConfigurationObserver {
       }
     }
 
+    /**
+     * Converts a byte array to a ByteString, returning the shared empty instance for
+     * zero-length input to reduce object allocation.
+     * @param bytes the bytes to convert
+     * @return ByteString.EMPTY for an empty array, otherwise a copy of the bytes
+     */
+    private ByteString getByteString(byte[] bytes) {
+      if (bytes.length == 0) {
+        return ByteString.EMPTY;
+      }
+      return ByteString.copyFrom(bytes);
+    }
+
     private boolean authorizeConnection() throws IOException {
       try {
         // If auth method is DIGEST, the token was obtained by the

http://git-wip-us.apache.org/repos/asf/hbase/blob/0ae211eb/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureIPC.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureIPC.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureIPC.java
index 7bc97ca..b7d6f87 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureIPC.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureIPC.java
@@ -26,6 +26,7 @@ import static 
org.apache.hadoop.hbase.security.HBaseKerberosUtils.getSecuredConf
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertSame;
+import static org.junit.Assert.fail;
 
 import java.io.File;
 import java.io.IOException;
@@ -96,8 +97,8 @@ public class TestSecureIPC {
 
   @Parameters(name = "{index}: rpcClientImpl={0}")
   public static Collection<Object[]> parameters() {
-    return Arrays.asList(new Object[] { BlockingRpcClient.class.getName() },
-      new Object[] { NettyRpcClient.class.getName() });
+    return Arrays.asList(new Object[]{BlockingRpcClient.class.getName()},
+        new Object[]{NettyRpcClient.class.getName()});
   }
 
   @Parameter
@@ -192,6 +193,42 @@ public class TestSecureIPC {
     callRpcService(User.create(ugi));
   }
 
+  /**
+   * Test sasl encryption with Crypto AES.
+   * @throws Exception
+   */
+  @Test
+  public void testSaslWithCryptoAES() throws Exception {
+    setRpcProtection("privacy", "privacy");
+    setCryptoAES("true", "true");
+    callRpcService(User.create(ugi));
+  }
+
+  /**
+   * Test various combinations of Server and Client configuration for Crypto AES,
+   * with "privacy" protection set on both sides.
+   * Client disabled / server enabled: the rpc call is expected to succeed.
+   * Client enabled / server disabled: the rpc call is expected to throw
+   * (an rpc timeout, according to the failure message).
+   * @throws Exception
+   */
+  @Test
+  public void testDifferentConfWithCryptoAES() throws Exception {
+    setRpcProtection("privacy", "privacy");
+
+    setCryptoAES("false", "true");
+    callRpcService(User.create(ugi));
+
+    setCryptoAES("true", "false");
+    try {
+      callRpcService(User.create(ugi));
+      fail("The exception should be thrown out for the rpc timeout.");
+    } catch (Exception e) {
+      // ignore the expected exception
+    }
+  }
+
+  void setCryptoAES(String clientCryptoAES, String serverCryptoAES) {
+    clientConf.set("hbase.rpc.crypto.encryption.aes.enabled", clientCryptoAES);
+    serverConf.set("hbase.rpc.crypto.encryption.aes.enabled", serverCryptoAES);
+  }
+
   private UserGroupInformation loginKerberosPrincipal(String krbKeytab, String 
krbPrincipal)
       throws Exception {
     Configuration cnf = new Configuration();

http://git-wip-us.apache.org/repos/asf/hbase/blob/0ae211eb/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 35b9708..e08f64e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1228,6 +1228,7 @@
     <spy.version>2.11.6</spy.version>
     <bouncycastle.version>1.46</bouncycastle.version>
     <kerby.version>1.0.0-RC2</kerby.version>
+    <commons-crypto.version>1.0.0</commons-crypto.version>
     <!-- Plugin Dependencies -->
     <maven.assembly.version>2.4</maven.assembly.version>
     <maven.antrun.version>1.8</maven.antrun.version>
@@ -1806,6 +1807,17 @@
        <artifactId>kerb-simplekdc</artifactId>
        <version>${kerby.version}</version>
      </dependency>
+     <dependency>
+       <groupId>org.apache.commons</groupId>
+       <artifactId>commons-crypto</artifactId>
+       <version>${commons-crypto.version}</version>
+       <exclusions>
+         <exclusion>
+           <groupId>net.java.dev.jna</groupId>
+           <artifactId>jna</artifactId>
+         </exclusion>
+       </exclusions>
+     </dependency>
     </dependencies>
   </dependencyManagement>
   <!-- Dependencies needed by subprojects -->

Reply via email to