HDDS-9. Add GRPC protocol interceptors for Ozone Block Token. Contributed by Xiaoyu Yao.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/aaf3d717
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/aaf3d717
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/aaf3d717

Branch: refs/heads/HDDS-4
Commit: aaf3d717d2378eed665fc5614035ca3d82926578
Parents: dcbcae5
Author: Xiaoyu Yao <[email protected]>
Authored: Tue Nov 20 20:21:08 2018 -0800
Committer: Xiaoyu Yao <[email protected]>
Committed: Wed Nov 21 13:19:38 2018 -0800

----------------------------------------------------------------------
 .../hdds/scm/ClientCredentialInterceptor.java   |  65 ++++
 .../hadoop/hdds/scm/XceiverClientGrpc.java      |  64 +++-
 .../org/apache/hadoop/hdds/HddsConfigKeys.java  |   5 +-
 .../exception/SCMSecurityException.java         |  52 +++
 .../hdds/security/exception/package-info.java   |  23 ++
 .../security/token/BlockTokenException.java     |  53 ++++
 .../hdds/security/token/BlockTokenVerifier.java | 113 +++++++
 .../token/OzoneBlockTokenIdentifier.java        | 199 ++++++++++++
 .../security/token/OzoneBlockTokenSelector.java |  55 ++++
 .../hdds/security/token/TokenVerifier.java      |  38 +++
 .../hdds/security/token/package-info.java       |  22 ++
 .../hdds/security/x509/SecurityConfig.java      |  11 +
 .../authority/CertificateServer.java            |   2 +-
 .../certificate/client/CertificateClient.java   |  11 +
 .../certificates/CertificateSignRequest.java    |   2 +-
 .../certificates/SelfSignedCertificate.java     |   2 +-
 .../x509/exceptions/CertificateException.java   |  14 +-
 .../x509/exceptions/SCMSecurityException.java   |  64 ----
 .../org/apache/hadoop/ozone/OzoneConsts.java    |  14 +
 hadoop-hdds/common/src/main/proto/hdds.proto    |   2 +-
 .../token/TestOzoneBlockTokenIdentifier.java    | 313 +++++++++++++++++++
 .../hdds/security/token/package-info.java       |  22 ++
 .../TestCertificateSignRequest.java             |   2 +-
 .../x509/certificates/TestRootCertificate.java  |   2 +-
 .../server/ServerCredentialInterceptor.java     |  74 +++++
 .../transport/server/XceiverServerGrpc.java     |  23 +-
 .../security/OzoneBlockTokenIdentifier.java     | 178 -----------
 .../ozone/security/OzoneBlockTokenSelector.java |  55 ----
 .../ozoneimpl/TestSecureOzoneContainer.java     | 209 +++++++++++++
 .../security/TestOzoneBlockTokenIdentifier.java | 255 ---------------
 30 files changed, 1363 insertions(+), 581 deletions(-)
----------------------------------------------------------------------
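The diffstat above also lists a ServerCredentialInterceptor on the datanode side that is not included in the excerpt below. The following is only a rough sketch, under the assumption that the server side mirrors the client interceptor: it reads the user and block-token headers set by ClientCredentialInterceptor, runs them through a TokenVerifier, stores the resulting UGI in the gRPC Context, and closes the call as UNAUTHENTICATED on failure. The class name and wiring here are illustrative, not the code from this commit.

// Illustrative sketch only; not the ServerCredentialInterceptor added by this
// commit. Assumes the shaded grpc classes used elsewhere in this patch.
import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
import org.apache.hadoop.hdds.security.token.TokenVerifier;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.ratis.thirdparty.io.grpc.Context;
import org.apache.ratis.thirdparty.io.grpc.Contexts;
import org.apache.ratis.thirdparty.io.grpc.Metadata;
import org.apache.ratis.thirdparty.io.grpc.ServerCall;
import org.apache.ratis.thirdparty.io.grpc.ServerCallHandler;
import org.apache.ratis.thirdparty.io.grpc.ServerInterceptor;
import org.apache.ratis.thirdparty.io.grpc.Status;

import static org.apache.hadoop.ozone.OzoneConsts.OBT_METADATA_KEY;
import static org.apache.hadoop.ozone.OzoneConsts.UGI_CTX_KEY;
import static org.apache.hadoop.ozone.OzoneConsts.USER_METADATA_KEY;

public class ExampleServerTokenInterceptor implements ServerInterceptor {

  private final TokenVerifier verifier;

  public ExampleServerTokenInterceptor(TokenVerifier verifier) {
    this.verifier = verifier;
  }

  @Override
  public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
      ServerCall<ReqT, RespT> call, Metadata headers,
      ServerCallHandler<ReqT, RespT> next) {
    String user = headers.get(USER_METADATA_KEY);
    String token = headers.get(OBT_METADATA_KEY);
    try {
      // Verify the block token and expose the resulting UGI to the handlers.
      UserGroupInformation ugi = verifier.verify(user, token);
      Context ctx = Context.current().withValue(UGI_CTX_KEY, ugi);
      return Contexts.interceptCall(ctx, call, headers, next);
    } catch (SCMSecurityException e) {
      // Reject the call so the client sees Status.UNAUTHENTICATED.
      call.close(Status.UNAUTHENTICATED.withDescription(e.getMessage()),
          new Metadata());
      return new ServerCall.Listener<ReqT>() { };
    }
  }
}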


http://git-wip-us.apache.org/repos/asf/hadoop/blob/aaf3d717/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/ClientCredentialInterceptor.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/ClientCredentialInterceptor.java
 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/ClientCredentialInterceptor.java
new file mode 100644
index 0000000..7a15808
--- /dev/null
+++ 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/ClientCredentialInterceptor.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm;
+
+import org.apache.ratis.thirdparty.io.grpc.CallOptions;
+import org.apache.ratis.thirdparty.io.grpc.Channel;
+import org.apache.ratis.thirdparty.io.grpc.ClientCall;
+import org.apache.ratis.thirdparty.io.grpc.ClientInterceptor;
+import org.apache.ratis.thirdparty.io.grpc.ForwardingClientCall;
+import org.apache.ratis.thirdparty.io.grpc.Metadata;
+import org.apache.ratis.thirdparty.io.grpc.MethodDescriptor;
+
+import static org.apache.hadoop.ozone.OzoneConsts.OBT_METADATA_KEY;
+import static org.apache.hadoop.ozone.OzoneConsts.USER_METADATA_KEY;
+
+/**
+ * GRPC client interceptor for Ozone block token.
+ */
+public class ClientCredentialInterceptor implements ClientInterceptor {
+
+  private final String user;
+  private final String token;
+
+  public ClientCredentialInterceptor(String user, String token) {
+    this.user = user;
+    this.token = token;
+  }
+
+  @Override
+  public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
+      MethodDescriptor<ReqT, RespT> method,
+      CallOptions callOptions,
+      Channel next) {
+
+    return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
+        next.newCall(method, callOptions)) {
+      @Override
+      public void start(Listener<RespT> responseListener, Metadata headers) {
+        if (token != null) {
+          headers.put(OBT_METADATA_KEY, token);
+        }
+        if (user != null) {
+          headers.put(USER_METADATA_KEY, user);
+        }
+        super.start(responseListener, headers);
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aaf3d717/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
index a824c29..ccc9372 100644
--- 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
+++ 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
@@ -31,22 +31,32 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.XceiverClientProtocolServi
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenSelector;
+import org.apache.hadoop.hdds.security.x509.SecurityConfig;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Time;
 import org.apache.ratis.thirdparty.io.grpc.ManagedChannel;
+import org.apache.ratis.thirdparty.io.grpc.Status;
 import org.apache.ratis.thirdparty.io.grpc.netty.NettyChannelBuilder;
 import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-import java.util.UUID;
-import java.util.Map;
-import java.util.HashMap;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -93,7 +103,9 @@ public class XceiverClientGrpc extends XceiverClientSpi {
     connectToDatanode(dn);
   }
 
-  private void connectToDatanode(DatanodeDetails dn) {
+
+  private void connectToDatanode(DatanodeDetails dn) throws IOException,
+      SCMSecurityException {
     // read port from the data node, on failure use default configured
     // port.
     int port = dn.getPort(DatanodeDetails.Port.Name.STANDALONE).getValue();
@@ -101,16 +113,49 @@ public class XceiverClientGrpc extends XceiverClientSpi {
       port = config.getInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
           OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT);
     }
+
+    // Add credential context to the client call
+    String userName = UserGroupInformation.getCurrentUser()
+        .getShortUserName();
+
+    // Add a block token if block token (mutual auth) is required but the
+    // client does not have mTLS (a private key and CA-signed certificate).
+    String encodedToken = null;
+    SecurityConfig secConfig = new SecurityConfig(config);
+    if (secConfig.isGrpcBlockTokenEnabled()) {
+      InetSocketAddress addr = new InetSocketAddress(dn.getIpAddress(), port);
+      encodedToken = getEncodedBlockToken(addr);
+      if (encodedToken == null) {
+        throw new SCMSecurityException("No Block token available to access " +
+            "service at : " + addr.toString());
+      }
+    }
     LOG.debug("Connecting to server Port : " + dn.getIpAddress());
-    ManagedChannel channel =
-        NettyChannelBuilder.forAddress(dn.getIpAddress(), port).usePlaintext()
+    NettyChannelBuilder channelBuilder = NettyChannelBuilder.forAddress(dn
+            .getIpAddress(), port).usePlaintext()
             .maxInboundMessageSize(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE)
-            .build();
+            .intercept(new ClientCredentialInterceptor(userName, encodedToken));
+    ManagedChannel channel = channelBuilder.build();
     XceiverClientProtocolServiceStub asyncStub =
         XceiverClientProtocolServiceGrpc.newStub(channel);
     asyncStubs.put(dn.getUuid(), asyncStub);
     channels.put(dn.getUuid(), channel);
   }
+
+  private String getEncodedBlockToken(InetSocketAddress addr)
+      throws IOException {
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+    OzoneBlockTokenSelector tokenSelector = new OzoneBlockTokenSelector();
+    Text service = SecurityUtil.buildTokenService(addr);
+    Token<OzoneBlockTokenIdentifier> token = tokenSelector.selectToken(
+        service, ugi.getTokens());
+    if (token != null) {
+      token.setService(service);
+      return token.encodeToUrlString();
+    }
+    return null;
+  }
+
   /**
    * Returns if the xceiver client connects to all servers in the pipeline.
    *
@@ -173,6 +218,11 @@ public class XceiverClientGrpc extends XceiverClientSpi {
       } catch (ExecutionException | InterruptedException e) {
         LOG.warn("Failed to execute command " + request + " on datanode " + dn
             .getUuidString(), e);
+        if (Status.fromThrowable(e.getCause()).getCode()
+            == Status.UNAUTHENTICATED.getCode()) {
+          throw new SCMSecurityException("Failed to authenticate with " +
+              "GRPC XceiverServer with Ozone block token.");
+        }
       }
     }
 

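getEncodedBlockToken() above only finds a token if one of kind HDDS_BLOCK_TOKEN has already been added to the caller's UGI credentials. A minimal, hypothetical sketch of how a caller could attach such a token follows; the identifier and password bytes are placeholders for what the Ozone Manager would issue in the real flow.

import java.io.IOException;

import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;

public final class BlockTokenAttachExample {

  private BlockTokenAttachExample() {
  }

  // identifier/password are placeholders for the serialized
  // OzoneBlockTokenIdentifier and its signature issued by OM.
  public static void attach(byte[] identifier, byte[] password,
      String datanodeHostPort) throws IOException {
    Token<OzoneBlockTokenIdentifier> blockToken = new Token<>(
        identifier, password,
        new Text("HDDS_BLOCK_TOKEN"),   // must match OzoneBlockTokenIdentifier.KIND_NAME
        new Text(datanodeHostPort));    // service; re-set by getEncodedBlockToken()
    UserGroupInformation.getCurrentUser().addToken(blockToken);
  }
}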
http://git-wip-us.apache.org/repos/asf/hadoop/blob/aaf3d717/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
index afc699f..b4e4f8e 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
@@ -137,6 +137,7 @@ public final class HddsConfigKeys {
       "hdds.x509.signature.algorithm";
   public static final String HDDS_X509_SIGNATURE_ALGO_DEFAULT = "SHA256withRSA";
 
-
-
+  public static final String HDDS_GRPC_BLOCK_TOKEN_ENABLED = "hdds.grpc.block" +
+      ".token.enabled";
+  public static final boolean HDDS_GRPC_BLOCK_TOKEN_ENABLED_DEFAULT = false;
 }
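A small sketch of how the new key is expected to be consumed through SecurityConfig; the configuration object and the explicit setBoolean call are illustrative only.

import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.security.x509.SecurityConfig;

public final class BlockTokenConfigExample {
  public static void main(String[] args) {
    OzoneConfiguration conf = new OzoneConfiguration();
    // Equivalent to setting hdds.grpc.block.token.enabled=true in ozone-site.xml.
    conf.setBoolean(HddsConfigKeys.HDDS_GRPC_BLOCK_TOKEN_ENABLED, true);
    SecurityConfig secConfig = new SecurityConfig(conf);
    // This flag drives the interceptor/token path in XceiverClientGrpc
    // and BlockTokenVerifier.
    System.out.println("grpc block tokens enabled: "
        + secConfig.isGrpcBlockTokenEnabled());
  }
}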

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aaf3d717/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/exception/SCMSecurityException.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/exception/SCMSecurityException.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/exception/SCMSecurityException.java
new file mode 100644
index 0000000..21bc45b
--- /dev/null
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/exception/SCMSecurityException.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.security.exception;
+
+import java.io.IOException;
+
+/**
+ * Root exception class for all SCM security related exceptions.
+ */
+public class SCMSecurityException extends IOException {
+
+  /**
+   * Ctor.
+   * @param message - Error Message.
+   */
+  public SCMSecurityException(String message) {
+    super(message);
+  }
+
+  /**
+   * Ctor.
+   * @param message - Message.
+   * @param cause  - Actual cause.
+   */
+  public SCMSecurityException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  /**
+   * Ctor.
+   * @param cause - Base Exception.
+   */
+  public SCMSecurityException(Throwable cause) {
+    super(cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aaf3d717/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/exception/package-info.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/exception/package-info.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/exception/package-info.java
new file mode 100644
index 0000000..b980592
--- /dev/null
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/exception/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/**
+ * Exceptions thrown by SCM security classes.
+ */
+package org.apache.hadoop.hdds.security.exception;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aaf3d717/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/token/BlockTokenException.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/token/BlockTokenException.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/token/BlockTokenException.java
new file mode 100644
index 0000000..7ea0ebc
--- /dev/null
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/token/BlockTokenException.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.security.token;
+
+import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
+
+/**
+ * Block Token Exceptions from the SCM Security layer.
+ */
+public class BlockTokenException extends SCMSecurityException {
+
+  /**
+   * Ctor.
+   * @param message - Error Message.
+   */
+  public BlockTokenException(String message) {
+    super(message);
+  }
+
+  /**
+   * Ctor.
+   * @param message - Message.
+   * @param cause  - Actual cause.
+   */
+  public BlockTokenException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  /**
+   * Ctor.
+   * @param cause - Base Exception.
+   */
+  public BlockTokenException(Throwable cause) {
+    super(cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aaf3d717/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/token/BlockTokenVerifier.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/token/BlockTokenVerifier.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/token/BlockTokenVerifier.java
new file mode 100644
index 0000000..8c0d19e
--- /dev/null
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/token/BlockTokenVerifier.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.security.token;
+
+import com.google.common.base.Strings;
+import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
+import org.apache.hadoop.hdds.security.x509.SecurityConfig;
+import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.Time;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.security.cert.X509Certificate;
+
+
+/**
+ * Verify token and return a UGI with token if authenticated.
+ */
+public class BlockTokenVerifier implements TokenVerifier {
+
+  private final CertificateClient caClient;
+  private final SecurityConfig conf;
+
+  public BlockTokenVerifier(SecurityConfig conf, CertificateClient caClient) {
+    this.conf = conf;
+    this.caClient = caClient;
+  }
+
+  private boolean isExpired(long expiryDate) {
+    return Time.now() > expiryDate;
+  }
+
+  @Override
+  public UserGroupInformation verify(String user, String tokenStr)
+      throws SCMSecurityException {
+    if (conf.isGrpcBlockTokenEnabled()) {
+      if (Strings.isNullOrEmpty(tokenStr)) {
+        throw new BlockTokenException("Fail to find any token (empty or " +
+            "null.");
+      }
+      final Token<OzoneBlockTokenIdentifier> token = new Token<>();
+      OzoneBlockTokenIdentifier tokenId = new OzoneBlockTokenIdentifier();
+      try {
+        token.decodeFromUrlString(tokenStr);
+        ByteArrayInputStream buf = new ByteArrayInputStream(
+            token.getIdentifier());
+        DataInputStream in = new DataInputStream(buf);
+        tokenId.readFields(in);
+      } catch (IOException ex) {
+        throw new BlockTokenException("Failed to decode token : " + tokenStr);
+      }
+
+      // TODO: revisit this when caClient is ready, skip signature check now.
+      /**
+       * the final code should look like:
+       * if (caClient == null) {
+       *   throw new SCMSecurityException("Certificate client not available to
+       *       validate token");
+       * }
+       */
+      if (caClient != null) {
+        X509Certificate signerCert = caClient.queryCertificate(
+            "certId=" + tokenId.getOmCertSerialId());
+        if (signerCert == null) {
+          throw new BlockTokenException("Can't find signer certificate " +
+              "(OmCertSerialId: " + tokenId.getOmCertSerialId() +
+              ") of the block token for user: " + tokenId.getUser());
+        }
+        boolean validToken = caClient.verifySignature(tokenId.getBytes(),
+            token.getPassword(), signerCert);
+        if (!validToken) {
+          throw new BlockTokenException("Invalid block token for user: " +
+              tokenId.getUser());
+        }
+      }
+      // check expiration
+      if (isExpired(tokenId.getExpiryDate())) {
+        UserGroupInformation tokenUser = tokenId.getUser();
+        tokenUser.setAuthenticationMethod(
+            UserGroupInformation.AuthenticationMethod.TOKEN);
+        throw new BlockTokenException("Expired block token for user: " +
+            tokenUser);
+      }
+      // defer access mode, bcsid and maxLength check to container dispatcher
+      UserGroupInformation ugi = tokenId.getUser();
+      ugi.addToken(token);
+      ugi.setAuthenticationMethod(UserGroupInformation
+          .AuthenticationMethod.TOKEN);
+      return ugi;
+    } else {
+      return UserGroupInformation.createRemoteUser(user);
+    }
+  }
+}
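A hedged usage sketch of the verifier: with a null CertificateClient the signature check is skipped for now (per the TODO above), so only token decoding and expiry are enforced. The class and method names below are illustrative.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
import org.apache.hadoop.hdds.security.token.BlockTokenVerifier;
import org.apache.hadoop.hdds.security.token.TokenVerifier;
import org.apache.hadoop.hdds.security.x509.SecurityConfig;
import org.apache.hadoop.security.UserGroupInformation;

public final class BlockTokenVerifyExample {

  private BlockTokenVerifyExample() {
  }

  public static UserGroupInformation verify(Configuration conf, String user,
      String encodedToken) throws SCMSecurityException {
    // A null CertificateClient skips the signature check (see TODO above).
    TokenVerifier verifier =
        new BlockTokenVerifier(new SecurityConfig(conf), null);
    return verifier.verify(user, encodedToken);
  }
}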

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aaf3d717/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/token/OzoneBlockTokenIdentifier.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/token/OzoneBlockTokenIdentifier.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/token/OzoneBlockTokenIdentifier.java
new file mode 100644
index 0000000..89457fd
--- /dev/null
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/token/OzoneBlockTokenIdentifier.java
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.security.token;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.BlockTokenSecretProto;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.BlockTokenSecretProto.AccessModeProto;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.BlockTokenSecretProto.Builder;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.TokenIdentifier;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.EnumSet;
+
+/**
+ * Block token identifier for Ozone/HDDS. The Ozone block access token is
+ * similar to the HDFS block access token: it is meant to be lightweight and
+ * short-lived, so there is no need to renew or revoke it. When a cached block
+ * access token expires, the client simply gets a new one. Block access tokens
+ * should be cached only in memory and never written to disk.
+ */
[email protected]
+public class OzoneBlockTokenIdentifier extends TokenIdentifier {
+
+  static final Text KIND_NAME = new Text("HDDS_BLOCK_TOKEN");
+  private long expiryDate;
+  private String ownerId;
+  private String blockId;
+  private EnumSet<AccessModeProto> modes;
+  private String omCertSerialId;
+  private long maxLength;
+
+  public OzoneBlockTokenIdentifier() {
+  }
+
+  public OzoneBlockTokenIdentifier(String ownerId, String blockId,
+      EnumSet<AccessModeProto> modes, long expiryDate, String omCertSerialId,
+      long maxLength) {
+    this.ownerId = ownerId;
+    this.blockId = blockId;
+    this.expiryDate = expiryDate;
+    this.modes = modes == null ? EnumSet.noneOf(AccessModeProto.class) : modes;
+    this.omCertSerialId = omCertSerialId;
+    this.maxLength = maxLength;
+  }
+
+  @Override
+  public UserGroupInformation getUser() {
+    if (this.getOwnerId() == null || "".equals(this.getOwnerId())) {
+      return UserGroupInformation.createRemoteUser(blockId);
+    }
+    return UserGroupInformation.createRemoteUser(ownerId);
+  }
+
+  public long getExpiryDate() {
+    return expiryDate;
+  }
+
+  public String getOwnerId() {
+    return ownerId;
+  }
+
+  public String getBlockId() {
+    return blockId;
+  }
+
+  public EnumSet<AccessModeProto> getAccessModes() {
+    return modes;
+  }
+
+  public String getOmCertSerialId() {
+    return omCertSerialId;
+  }
+
+  public long getMaxLength() {
+    return maxLength;
+  }
+
+  @Override
+  public Text getKind() {
+    return KIND_NAME;
+  }
+
+  @Override
+  public String toString() {
+    return "block_token_identifier (expiryDate=" + this.getExpiryDate()
+        + ", ownerId=" + this.getOwnerId()
+        + ", omCertSerialId=" + this.getOmCertSerialId()
+        + ", blockId=" + this.getBlockId() + ", access modes="
+        + this.getAccessModes() + ", maxLength=" + this.getMaxLength() + ")";
+  }
+
+  static boolean isEqual(Object a, Object b) {
+    return a == null ? b == null : a.equals(b);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == this) {
+      return true;
+    }
+
+    if (obj instanceof OzoneBlockTokenIdentifier) {
+      OzoneBlockTokenIdentifier that = (OzoneBlockTokenIdentifier) obj;
+      return new EqualsBuilder()
+          .append(this.expiryDate, that.expiryDate)
+          .append(this.ownerId, that.ownerId)
+          .append(this.blockId, that.blockId)
+          .append(this.modes, that.modes)
+          .append(this.omCertSerialId, that.omCertSerialId)
+          .append(this.maxLength, that.maxLength)
+          .build();
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    return new HashCodeBuilder(133, 567)
+        .append(this.expiryDate)
+        .append(this.blockId)
+        .append(this.ownerId)
+        .append(this.modes)
+        .append(this.omCertSerialId)
+        .append(this.maxLength)
+        .build();
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    final DataInputStream dis = (DataInputStream) in;
+    if (!dis.markSupported()) {
+      throw new IOException("Could not peek first byte.");
+    }
+    BlockTokenSecretProto tokenProto =
+        BlockTokenSecretProto.parseFrom((DataInputStream) in);
+    this.ownerId = tokenProto.getOwnerId();
+    this.blockId = tokenProto.getBlockId();
+    this.modes = EnumSet.copyOf(tokenProto.getModesList());
+    this.expiryDate = tokenProto.getExpiryDate();
+    this.omCertSerialId = tokenProto.getOmCertSerialId();
+    this.maxLength = tokenProto.getMaxLength();
+  }
+
+  @VisibleForTesting
+  public static OzoneBlockTokenIdentifier readFieldsProtobuf(DataInput in)
+      throws IOException {
+    BlockTokenSecretProto tokenProto =
+        BlockTokenSecretProto.parseFrom((DataInputStream) in);
+    return new OzoneBlockTokenIdentifier(tokenProto.getOwnerId(),
+        tokenProto.getBlockId(), EnumSet.copyOf(tokenProto.getModesList()),
+        tokenProto.getExpiryDate(), tokenProto.getOmCertSerialId(),
+        tokenProto.getMaxLength());
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    writeProtobuf(out);
+  }
+
+  @VisibleForTesting
+  void writeProtobuf(DataOutput out) throws IOException {
+    Builder builder = BlockTokenSecretProto.newBuilder()
+        .setBlockId(this.getBlockId())
+        .setOwnerId(this.getOwnerId())
+        .setOmCertSerialId(this.getOmCertSerialId())
+        .setExpiryDate(this.getExpiryDate())
+        .setMaxLength(this.getMaxLength());
+    // Add access mode allowed
+    for (AccessModeProto mode : this.getAccessModes()) {
+      builder.addModes(AccessModeProto.valueOf(mode.name()));
+    }
+    out.write(builder.build().toByteArray());
+  }
+}
+
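For reference, a minimal sketch of building an identifier and wrapping it in a Hadoop Token; the signature bytes are a placeholder for what the Ozone Manager would produce by signing the identifier bytes (the new test below shows the full signing flow), and the service address is an example value.

import java.io.IOException;
import java.util.EnumSet;

import org.apache.hadoop.hdds.protocol.proto.HddsProtos.BlockTokenSecretProto.AccessModeProto;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Time;

public final class BlockTokenEncodeExample {

  private BlockTokenEncodeExample() {
  }

  // signature is a placeholder for OM's signature over the identifier bytes.
  public static String encode(byte[] signature) throws IOException {
    OzoneBlockTokenIdentifier id = new OzoneBlockTokenIdentifier(
        "testUser", "84940",
        EnumSet.allOf(AccessModeProto.class),
        Time.now() + 60L * 60L * 1000L,   // expires in one hour
        "1",                              // omCertSerialId (example value)
        1024L);                           // maxLength
    Token<OzoneBlockTokenIdentifier> token = new Token<>(
        id.getBytes(), signature, id.getKind(), new Text("127.0.0.1:9859"));
    return token.encodeToUrlString();
  }
}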

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aaf3d717/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/token/OzoneBlockTokenSelector.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/token/OzoneBlockTokenSelector.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/token/OzoneBlockTokenSelector.java
new file mode 100644
index 0000000..1943cce
--- /dev/null
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/token/OzoneBlockTokenSelector.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.security.token;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.TokenSelector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+
+/**
+ * A block token selector for Ozone.
+ */
[email protected]
+public class OzoneBlockTokenSelector implements
+    TokenSelector<OzoneBlockTokenIdentifier> {
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(OzoneBlockTokenSelector.class);
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public Token<OzoneBlockTokenIdentifier> selectToken(Text service,
+      Collection<Token<? extends TokenIdentifier>> tokens) {
+    if (service == null) {
+      return null;
+    }
+    for (Token<? extends TokenIdentifier> token : tokens) {
+      if (OzoneBlockTokenIdentifier.KIND_NAME.equals(token.getKind())) {
+        LOG.trace("Getting token for service:{}", service);
+        return (Token<OzoneBlockTokenIdentifier>) token;
+      }
+    }
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aaf3d717/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/token/TokenVerifier.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/token/TokenVerifier.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/token/TokenVerifier.java
new file mode 100644
index 0000000..d8170ab
--- /dev/null
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/token/TokenVerifier.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.security.token;
+
+import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * Ozone GRPC token header verifier.
+ */
+public interface TokenVerifier {
+  /**
+   * Given a user and tokenStr header, return a UGI object with token if
+   * verified.
+   * @param user user of the request
+   * @param tokenStr token str of the request
+   * @return UGI
+   * @throws SCMSecurityException
+   */
+  UserGroupInformation verify(String user, String tokenStr)
+      throws SCMSecurityException;
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aaf3d717/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/token/package-info.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/token/package-info.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/token/package-info.java
new file mode 100644
index 0000000..885bed5
--- /dev/null
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/token/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package contains the block token related classes.
+ */
+package org.apache.hadoop.hdds.security.token;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aaf3d717/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/SecurityConfig.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/SecurityConfig.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/SecurityConfig.java
index 9231c8a..97627ca 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/SecurityConfig.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/SecurityConfig.java
@@ -34,6 +34,8 @@ import java.time.Duration;
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DEFAULT_KEY_ALGORITHM;
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DEFAULT_KEY_LEN;
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DEFAULT_SECURITY_PROVIDER;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_GRPC_BLOCK_TOKEN_ENABLED;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_GRPC_BLOCK_TOKEN_ENABLED_DEFAULT;
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_KEY_ALGORITHM;
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_KEY_DIR_NAME;
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_KEY_DIR_NAME_DEFAULT;
@@ -70,6 +72,7 @@ public class SecurityConfig {
   private final String publicKeyFileName;
   private final Duration certDuration;
   private final String x509SignatureAlgo;
+  private final Boolean grpcBlockTokenEnabled;
 
   /**
    * Constructs a SecurityConfig.
@@ -106,6 +109,10 @@ public class SecurityConfig {
     this.x509SignatureAlgo = this.configuration.get(HDDS_X509_SIGNATURE_ALGO,
         HDDS_X509_SIGNATURE_ALGO_DEFAULT);
 
+    this.grpcBlockTokenEnabled = this.configuration.getBoolean(
+        HDDS_GRPC_BLOCK_TOKEN_ENABLED,
+        HDDS_GRPC_BLOCK_TOKEN_ENABLED_DEFAULT);
+
     // First Startup -- if the provider is null, check for the provider.
     if (SecurityConfig.provider == null) {
       synchronized (SecurityConfig.class) {
@@ -213,6 +220,10 @@ public class SecurityConfig {
     return this.certDuration;
   }
 
+  public Boolean isGrpcBlockTokenEnabled() {
+    return this.grpcBlockTokenEnabled;
+  }
+
   /**
    * Adds a security provider dynamically if it is not loaded already.
    *

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aaf3d717/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/CertificateServer.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/CertificateServer.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/CertificateServer.java
index 9332e5b..5b5deef 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/CertificateServer.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/CertificateServer.java
@@ -19,8 +19,8 @@
 
 package org.apache.hadoop.hdds.security.x509.certificate.authority;
 
+import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
 import org.apache.hadoop.hdds.security.x509.certificates.CertificateSignRequest;
-import org.apache.hadoop.hdds.security.x509.exceptions.SCMSecurityException;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
 import org.bouncycastle.cert.X509CertificateHolder;
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aaf3d717/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/CertificateClient.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/CertificateClient.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/CertificateClient.java
index 1b6f576..bfc0576 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/CertificateClient.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/CertificateClient.java
@@ -91,6 +91,17 @@ public interface CertificateClient {
       X509Certificate cert);
 
   /**
+   * Verifies a digital Signature, given the signature and the certificate of
+   * the signer.
+   * @param data - Data in byte array.
+   * @param signature - Byte Array containing the signature.
+   * @param cert - Certificate of the Signer.
+   * @return true if verified, false if not.
+   */
+  boolean verifySignature(byte[] data, byte[] signature,
+      X509Certificate cert);
+
+  /**
   * Returns a CSR builder that can be used to create a Certificate signing
    * request.
    *

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aaf3d717/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificates/CertificateSignRequest.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificates/CertificateSignRequest.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificates/CertificateSignRequest.java
index 2e1f9df..0e762a5 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificates/CertificateSignRequest.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificates/CertificateSignRequest.java
@@ -20,9 +20,9 @@ package org.apache.hadoop.hdds.security.x509.certificates;
 
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
 import org.apache.hadoop.hdds.security.x509.exceptions.CertificateException;
-import org.apache.hadoop.hdds.security.x509.exceptions.SCMSecurityException;
 import org.apache.hadoop.hdds.security.x509.keys.SecurityUtil;
 import org.apache.logging.log4j.util.Strings;
 import org.bouncycastle.asn1.DEROctetString;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aaf3d717/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificates/SelfSignedCertificate.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificates/SelfSignedCertificate.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificates/SelfSignedCertificate.java
index f221246..85fba9b 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificates/SelfSignedCertificate.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificates/SelfSignedCertificate.java
@@ -22,9 +22,9 @@ package org.apache.hadoop.hdds.security.x509.certificates;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
 import org.apache.hadoop.hdds.security.x509.exceptions.CertificateException;
-import org.apache.hadoop.hdds.security.x509.exceptions.SCMSecurityException;
 import org.apache.hadoop.util.Time;
 import org.apache.logging.log4j.util.Strings;
 import org.bouncycastle.asn1.x500.X500Name;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aaf3d717/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/exceptions/CertificateException.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/exceptions/CertificateException.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/exceptions/CertificateException.java
index 4d2dcfd..5a8bf86 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/exceptions/CertificateException.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/exceptions/CertificateException.java
@@ -19,6 +19,8 @@
 
 package org.apache.hadoop.hdds.security.x509.exceptions;
 
+import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
+
 /**
  * Certificate Exceptions from the SCM Security layer.
  */
@@ -48,16 +50,4 @@ public class CertificateException extends SCMSecurityException {
   public CertificateException(Throwable cause) {
     super(cause);
   }
-
-  /**
-   * Ctor.
-   * @param message - Error Message
-   * @param cause  - Cause
-   * @param enableSuppression - Enable suppression.
-   * @param writableStackTrace - Writable stack trace.
-   */
-  public CertificateException(String message, Throwable cause,
-      boolean enableSuppression, boolean writableStackTrace) {
-    super(message, cause, enableSuppression, writableStackTrace);
-  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aaf3d717/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/exceptions/SCMSecurityException.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/exceptions/SCMSecurityException.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/exceptions/SCMSecurityException.java
deleted file mode 100644
index e1f6296..0000000
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/exceptions/SCMSecurityException.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.hadoop.hdds.security.x509.exceptions;
-
-/**
- * Root Security Exception call for all Certificate related Execptions.
- */
-public class SCMSecurityException extends Exception {
-
-  /**
-   * Ctor.
-   * @param message - Error Message.
-   */
-  public SCMSecurityException(String message) {
-    super(message);
-  }
-
-  /**
-   * Ctor.
-   * @param message - Message.
-   * @param cause  - Actual cause.
-   */
-  public SCMSecurityException(String message, Throwable cause) {
-    super(message, cause);
-  }
-
-  /**
-   * Ctor.
-   * @param cause - Base Exception.
-   */
-  public SCMSecurityException(Throwable cause) {
-    super(cause);
-  }
-
-
-  /**
-   * Ctor.
-   * @param message - Error Message
-   * @param cause  - Cause
-   * @param enableSuppression - Enable suppression.
-   * @param writableStackTrace - Writable stack trace.
-   */
-  public SCMSecurityException(String message, Throwable cause,
-      boolean enableSuppression, boolean writableStackTrace) {
-    super(message, cause, enableSuppression, writableStackTrace);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aaf3d717/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
index e7c354c..dad3bfe 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
@@ -19,6 +19,11 @@
 package org.apache.hadoop.ozone;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.ratis.thirdparty.io.grpc.Context;
+import org.apache.ratis.thirdparty.io.grpc.Metadata;
+
+import static org.apache.ratis.thirdparty.io.grpc.Metadata.ASCII_STRING_MARSHALLER;
 
 /**
  * Set of constants used in Ozone implementation.
@@ -231,4 +236,13 @@ public final class OzoneConsts {
   public static final String OM_METRICS_FILE = "omMetrics";
   public static final String OM_METRICS_TEMP_FILE = OM_METRICS_FILE + ".tmp";
 
+  // GRPC block token metadata header and context key
+  public static final String OZONE_BLOCK_TOKEN = "blocktoken";
+  public static final Context.Key<UserGroupInformation> UGI_CTX_KEY =
+      Context.key("UGI");
+
+  public static final Metadata.Key<String> OBT_METADATA_KEY =
+      Metadata.Key.of(OZONE_BLOCK_TOKEN, ASCII_STRING_MARSHALLER);
+  public static final Metadata.Key<String> USER_METADATA_KEY =
+      Metadata.Key.of(OZONE_USER, ASCII_STRING_MARSHALLER);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aaf3d717/hadoop-hdds/common/src/main/proto/hdds.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/proto/hdds.proto 
b/hadoop-hdds/common/src/main/proto/hdds.proto
index 2592ddf..6f44dae 100644
--- a/hadoop-hdds/common/src/main/proto/hdds.proto
+++ b/hadoop-hdds/common/src/main/proto/hdds.proto
@@ -206,7 +206,7 @@ message BlockTokenSecretProto {
     required uint64 expiryDate = 3;
     required string omCertSerialId = 4;
     repeated AccessModeProto modes = 5;
-
+    required uint64 maxLength = 6;
 }
 
 message BlockID {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aaf3d717/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/security/token/TestOzoneBlockTokenIdentifier.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/security/token/TestOzoneBlockTokenIdentifier.java
 
b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/security/token/TestOzoneBlockTokenIdentifier.java
new file mode 100644
index 0000000..77a2cec
--- /dev/null
+++ 
b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/security/token/TestOzoneBlockTokenIdentifier.java
@@ -0,0 +1,313 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.security.token;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import java.security.InvalidKeyException;
+import java.security.KeyPair;
+import java.security.NoSuchAlgorithmException;
+import java.security.NoSuchProviderException;
+import java.security.PrivateKey;
+import java.security.Signature;
+import java.security.SignatureException;
+import java.security.cert.Certificate;
+import java.security.cert.CertificateEncodingException;
+import java.security.cert.X509Certificate;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import javax.crypto.KeyGenerator;
+import javax.crypto.Mac;
+import javax.crypto.SecretKey;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.Time;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test class for OzoneBlockTokenIdentifier.
+ */
+public class TestOzoneBlockTokenIdentifier {
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(TestOzoneBlockTokenIdentifier.class);
+  private static final String BASEDIR = GenericTestUtils
+      .getTempPath(TestOzoneBlockTokenIdentifier.class.getSimpleName());
+  private static final String KEYSTORES_DIR =
+      new File(BASEDIR).getAbsolutePath();
+  private static long expiryTime;
+  private static KeyPair keyPair;
+  private static X509Certificate cert;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    File base = new File(BASEDIR);
+    FileUtil.fullyDelete(base);
+    base.mkdirs();
+    expiryTime = Time.monotonicNow() + 60 * 60 * 24;
+
+    // Create Ozone Master key pair.
+    keyPair = KeyStoreTestUtil.generateKeyPair("RSA");
+    // Create Ozone Master certificate (SCM CA issued cert) and key store.
+    cert = KeyStoreTestUtil
+        .generateCertificate("CN=OzoneMaster", keyPair, 30, "SHA256withRSA");
+  }
+
+  @After
+  public void cleanUp() throws Exception {
+    // KeyStoreTestUtil.cleanupSSLConfig(KEYSTORES_DIR, sslConfsDir);
+  }
+
+  @Test
+  public void testSignToken() throws GeneralSecurityException, IOException {
+    String keystore = new File(KEYSTORES_DIR, "keystore.jks")
+        .getAbsolutePath();
+    String truststore = new File(KEYSTORES_DIR, "truststore.jks")
+        .getAbsolutePath();
+    String trustPassword = "trustPass";
+    String keyStorePassword = "keyStorePass";
+    String keyPassword = "keyPass";
+
+
+    KeyStoreTestUtil.createKeyStore(keystore, keyStorePassword, keyPassword,
+        "OzoneMaster", keyPair.getPrivate(), cert);
+
+    // Create trust store and put the certificate in the trust store
+    Map<String, X509Certificate> certs = Collections.singletonMap("server",
+        cert);
+    KeyStoreTestUtil.createTrustStore(truststore, trustPassword, certs);
+
+    // Sign the OzoneMaster Token with Ozone Master private key
+    PrivateKey privateKey = keyPair.getPrivate();
+    OzoneBlockTokenIdentifier tokenId = new OzoneBlockTokenIdentifier(
+        "testUser", "84940",
+        EnumSet.allOf(HddsProtos.BlockTokenSecretProto.AccessModeProto.class),
+        expiryTime, cert.getSerialNumber().toString(), 128L);
+    byte[] signedToken = signTokenAsymmetric(tokenId, privateKey);
+
+    // Verify a valid signed OzoneMaster Token with Ozone Master
+    // public key(certificate)
+    boolean isValidToken = verifyTokenAsymmetric(tokenId, signedToken, cert);
+    LOG.info("{} is {}", tokenId, isValidToken ? "valid." : "invalid.");
+
+    // Verify an invalid signed OzoneMaster Token with Ozone Master
+    // public key(certificate)
+    tokenId = new OzoneBlockTokenIdentifier("", "",
+        EnumSet.allOf(HddsProtos.BlockTokenSecretProto.AccessModeProto.class),
+        expiryTime, cert.getSerialNumber().toString(), 128L);
+    LOG.info("Unsigned token {} is {}", tokenId,
+        verifyTokenAsymmetric(tokenId, RandomUtils.nextBytes(128), cert));
+
+  }
+
+  @Test
+  public void testTokenSerialization() throws GeneralSecurityException,
+      IOException {
+    String keystore = new File(KEYSTORES_DIR, "keystore.jks")
+        .getAbsolutePath();
+    String truststore = new File(KEYSTORES_DIR, "truststore.jks")
+        .getAbsolutePath();
+    String trustPassword = "trustPass";
+    String keyStorePassword = "keyStorePass";
+    String keyPassword = "keyPass";
+    long maxLength = 128L;
+
+    KeyStoreTestUtil.createKeyStore(keystore, keyStorePassword, keyPassword,
+        "OzoneMaster", keyPair.getPrivate(), cert);
+
+    // Create trust store and put the certificate in the trust store
+    Map<String, X509Certificate> certs = Collections.singletonMap("server",
+        cert);
+    KeyStoreTestUtil.createTrustStore(truststore, trustPassword, certs);
+
+    // Sign the OzoneMaster Token with Ozone Master private key
+    PrivateKey privateKey = keyPair.getPrivate();
+    OzoneBlockTokenIdentifier tokenId = new OzoneBlockTokenIdentifier(
+        "testUser", "84940",
+        EnumSet.allOf(HddsProtos.BlockTokenSecretProto.AccessModeProto.class),
+        expiryTime, cert.getSerialNumber().toString(), maxLength);
+    byte[] signedToken = signTokenAsymmetric(tokenId, privateKey);
+
+    Token<BlockTokenIdentifier> token = new Token<>(tokenId.getBytes(),
+        signedToken, tokenId.getKind(), new Text("host:port"));
+
+    String encodeToUrlString = token.encodeToUrlString();
+
+    Token<BlockTokenIdentifier> decodedToken = new Token<>();
+    decodedToken.decodeFromUrlString(encodeToUrlString);
+
+    OzoneBlockTokenIdentifier decodedTokenId = new OzoneBlockTokenIdentifier();
+    decodedTokenId.readFields(new DataInputStream(
+        new ByteArrayInputStream(decodedToken.getIdentifier())));
+
+    Assert.assertEquals(tokenId, decodedTokenId);
+    Assert.assertEquals(maxLength, decodedTokenId.getMaxLength());
+
+    // Verify the decoded signed token with the public key (certificate).
+    boolean isValidToken = verifyTokenAsymmetric(decodedTokenId,
+        decodedToken.getPassword(), cert);
+    LOG.info("{} is {}", tokenId, isValidToken ? "valid." : "invalid.");
+    Assert.assertTrue("Decoded token should verify with the certificate.",
+        isValidToken);
+  }
+
+  public byte[] signTokenAsymmetric(OzoneBlockTokenIdentifier tokenId,
+      PrivateKey privateKey) throws NoSuchAlgorithmException,
+      InvalidKeyException, SignatureException {
+    Signature rsaSignature = Signature.getInstance("SHA256withRSA");
+    rsaSignature.initSign(privateKey);
+    rsaSignature.update(tokenId.getBytes());
+    byte[] signature = rsaSignature.sign();
+    return signature;
+  }
+
+  public boolean verifyTokenAsymmetric(OzoneBlockTokenIdentifier tokenId,
+      byte[] signature, Certificate certificate) throws InvalidKeyException,
+      NoSuchAlgorithmException, SignatureException {
+    Signature rsaSignature = Signature.getInstance("SHA256withRSA");
+    rsaSignature.initVerify(certificate);
+    rsaSignature.update(tokenId.getBytes());
+    boolean isValid = rsaSignature.verify(signature);
+    return isValid;
+  }
+
+  private byte[] signTokenSymmetric(OzoneBlockTokenIdentifier identifier,
+      Mac mac, SecretKey key) {
+    try {
+      mac.init(key);
+    } catch (InvalidKeyException ike) {
+      throw new IllegalArgumentException("Invalid key to HMAC computation",
+          ike);
+    }
+    return mac.doFinal(identifier.getBytes());
+  }
+
+  OzoneBlockTokenIdentifier generateTestToken() {
+    return new OzoneBlockTokenIdentifier(RandomStringUtils.randomAlphabetic(6),
+        RandomStringUtils.randomAlphabetic(5),
+        EnumSet.allOf(HddsProtos.BlockTokenSecretProto.AccessModeProto.class),
+        expiryTime, cert.getSerialNumber().toString(), 1024768L);
+  }
+
+  @Test
+  public void testAsymmetricTokenPerf() throws NoSuchAlgorithmException,
+      CertificateEncodingException, NoSuchProviderException,
+      InvalidKeyException, SignatureException {
+    final int testTokenCount = 1000;
+    List<OzoneBlockTokenIdentifier> tokenIds = new ArrayList<>();
+    List<byte[]> tokenPasswordAsym = new ArrayList<>();
+    for (int i = 0; i < testTokenCount; i++) {
+      tokenIds.add(generateTestToken());
+    }
+
+    KeyPair kp = KeyStoreTestUtil.generateKeyPair("RSA");
+
+    // Create Ozone Master certificate (SCM CA issued cert) and key store
+    X509Certificate certificate;
+    certificate = KeyStoreTestUtil.generateCertificate("CN=OzoneMaster",
+        kp, 30, "SHA256withRSA");
+
+    long startTime = Time.monotonicNowNanos();
+    for (int i = 0; i < testTokenCount; i++) {
+      tokenPasswordAsym.add(
+          signTokenAsymmetric(tokenIds.get(i), kp.getPrivate()));
+    }
+    long duration = Time.monotonicNowNanos() - startTime;
+    LOG.info("Average token sign time with HmacSha256(RSA/1024 key) is {} ns",
+        duration / testTokenCount);
+
+    startTime = Time.monotonicNowNanos();
+    for (int i = 0; i < testTokenCount; i++) {
+      verifyTokenAsymmetric(tokenIds.get(i), tokenPasswordAsym.get(i),
+          certificate);
+    }
+    duration = Time.monotonicNowNanos() - startTime;
+    LOG.info("Average token verify time with HmacSha256(RSA/1024 key) "
+        + "is {} ns", duration / testTokenCount);
+  }
+
+  @Test
+  public void testSymmetricTokenPerf() {
+    String hmacSHA1 = "HmacSHA1";
+    String hmacSHA256 = "HmacSHA256";
+
+    testSymmetricTokenPerfHelper(hmacSHA1, 64);
+    testSymmetricTokenPerfHelper(hmacSHA256, 1024);
+  }
+
+  public void testSymmetricTokenPerfHelper(String hmacAlgorithm, int keyLen) {
+    final int testTokenCount = 1000;
+    List<OzoneBlockTokenIdentifier> tokenIds = new ArrayList<>();
+    List<byte[]> tokenPasswordSym = new ArrayList<>();
+    for (int i = 0; i < testTokenCount; i++) {
+      tokenIds.add(generateTestToken());
+    }
+
+    KeyGenerator keyGen;
+    try {
+      keyGen = KeyGenerator.getInstance(hmacAlgorithm);
+      keyGen.init(keyLen);
+    } catch (NoSuchAlgorithmException nsa) {
+      throw new IllegalArgumentException("Can't find " + hmacAlgorithm +
+          " algorithm.");
+    }
+
+    Mac mac;
+    try {
+      mac = Mac.getInstance(hmacAlgorithm);
+    } catch (NoSuchAlgorithmException nsa) {
+      throw new IllegalArgumentException("Can't find " + hmacAlgorithm +
+          " algorithm.");
+    }
+
+    SecretKey secretKey = keyGen.generateKey();
+
+    long startTime = Time.monotonicNowNanos();
+    for (int i = 0; i < testTokenCount; i++) {
+      tokenPasswordSym.add(
+          signTokenSymmetric(tokenIds.get(i), mac, secretKey));
+    }
+    long duration = Time.monotonicNowNanos() - startTime;
+    LOG.info("Average token sign time with {}({} symmetric key) is {} ns",
+        hmacAlgorithm, keyLen, duration / testTokenCount);
+  }
+
+  // TODO: verify certificate with a trust store
+  public boolean verifyCert(Certificate certificate) {
+    return true;
+  }
+}
\ No newline at end of file
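
The sign/verify round trip exercised by this test is what the patch's new
BlockTokenVerifier (listed in the summary but not shown in this hunk) builds
on. As a rough illustration only, and not the patch's actual implementation,
verifying a URL-encoded block token against the OM certificate could look
like the sketch below; the class name and parameter names are illustrative
assumptions.

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.security.Signature;
import java.security.cert.X509Certificate;

import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Time;

public final class BlockTokenCheckSketch {
  private BlockTokenCheckSketch() {
  }

  /** Decodes a URL-encoded token, checks expiry and verifies its signature. */
  public static boolean check(String urlEncodedToken, X509Certificate omCert)
      throws IOException, GeneralSecurityException {
    Token<BlockTokenIdentifier> token = new Token<>();
    token.decodeFromUrlString(urlEncodedToken);

    // Rebuild the identifier exactly as testTokenSerialization() does.
    OzoneBlockTokenIdentifier id = new OzoneBlockTokenIdentifier();
    id.readFields(new DataInputStream(
        new ByteArrayInputStream(token.getIdentifier())));

    if (id.getExpiryDate() < Time.monotonicNow()) {
      return false; // token already expired
    }

    // Same asymmetric check as verifyTokenAsymmetric() in the test above.
    Signature rsa = Signature.getInstance("SHA256withRSA");
    rsa.initVerify(omCert);
    rsa.update(id.getBytes());
    return rsa.verify(token.getPassword());
  }
}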

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aaf3d717/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/security/token/package-info.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/security/token/package-info.java
 
b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/security/token/package-info.java
new file mode 100644
index 0000000..d056655
--- /dev/null
+++ 
b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/security/token/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package contains the block token related classes.
+ */
+package org.apache.hadoop.hdds.security.token;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aaf3d717/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/security/x509/certificates/TestCertificateSignRequest.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/security/x509/certificates/TestCertificateSignRequest.java
 
b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/security/x509/certificates/TestCertificateSignRequest.java
index 0b9ef31..e8de1ea 100644
--- 
a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/security/x509/certificates/TestCertificateSignRequest.java
+++ 
b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/security/x509/certificates/TestCertificateSignRequest.java
@@ -18,8 +18,8 @@
 package org.apache.hadoop.hdds.security.x509.certificates;
 
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
-import org.apache.hadoop.hdds.security.x509.exceptions.SCMSecurityException;
 import org.apache.hadoop.hdds.security.x509.keys.HDDSKeyGenerator;
 import org.apache.hadoop.hdds.security.x509.keys.SecurityUtil;
 import org.bouncycastle.asn1.ASN1Sequence;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aaf3d717/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/security/x509/certificates/TestRootCertificate.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/security/x509/certificates/TestRootCertificate.java
 
b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/security/x509/certificates/TestRootCertificate.java
index 3a00ca0..16245a6 100644
--- 
a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/security/x509/certificates/TestRootCertificate.java
+++ 
b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/security/x509/certificates/TestRootCertificate.java
@@ -20,8 +20,8 @@
 package org.apache.hadoop.hdds.security.x509.certificates;
 
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
-import org.apache.hadoop.hdds.security.x509.exceptions.SCMSecurityException;
 import org.apache.hadoop.hdds.security.x509.keys.HDDSKeyGenerator;
 import org.bouncycastle.asn1.x509.Extension;
 import org.bouncycastle.cert.X509CertificateHolder;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aaf3d717/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ServerCredentialInterceptor.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ServerCredentialInterceptor.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ServerCredentialInterceptor.java
new file mode 100644
index 0000000..968f0c8
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ServerCredentialInterceptor.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.transport.server;
+
+import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
+import org.apache.hadoop.hdds.security.token.TokenVerifier;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.ratis.thirdparty.io.grpc.Context;
+import org.apache.ratis.thirdparty.io.grpc.Contexts;
+import org.apache.ratis.thirdparty.io.grpc.Metadata;
+import org.apache.ratis.thirdparty.io.grpc.ServerCall;
+import org.apache.ratis.thirdparty.io.grpc.ServerCallHandler;
+import org.apache.ratis.thirdparty.io.grpc.ServerInterceptor;
+import org.apache.ratis.thirdparty.io.grpc.Status;
+
+import static org.apache.hadoop.ozone.OzoneConsts.OBT_METADATA_KEY;
+import static org.apache.hadoop.ozone.OzoneConsts.USER_METADATA_KEY;
+import static org.apache.hadoop.ozone.OzoneConsts.UGI_CTX_KEY;
+
+/**
+ * gRPC server interceptor that verifies the Ozone block token supplied in
+ * the call metadata and publishes the authenticated UGI in the gRPC Context.
+ */
+public class ServerCredentialInterceptor implements ServerInterceptor {
+
+  private static final ServerCall.Listener NOOP_LISTENER =
+      new ServerCall.Listener() {
+  };
+
+  private final TokenVerifier verifier;
+
+  ServerCredentialInterceptor(TokenVerifier verifier) {
+    this.verifier = verifier;
+  }
+
+  @Override
+  public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
+      ServerCall<ReqT, RespT> call, Metadata headers,
+      ServerCallHandler<ReqT, RespT> next) {
+    String token = headers.get(OBT_METADATA_KEY);
+    String user = headers.get(USER_METADATA_KEY);
+    Context ctx = Context.current();
+    try {
+      UserGroupInformation ugi = verifier.verify(user, token);
+      if (ugi == null) {
+        call.close(Status.UNAUTHENTICATED.withDescription("Missing Block " +
+            "Token from headers when block token is required."), headers);
+        return NOOP_LISTENER;
+      } else {
+        ctx = ctx.withValue(UGI_CTX_KEY, ugi);
+      }
+    } catch (SCMSecurityException e) {
+      call.close(Status.UNAUTHENTICATED.withDescription(e.getMessage())
+          .withCause(e), headers);
+      return NOOP_LISTENER;
+    }
+    return Contexts.interceptCall(ctx, call, headers, next);
+  }
+}
\ No newline at end of file
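
This server interceptor reads the two metadata keys that the client-side
ClientCredentialInterceptor (added earlier in this patch under
hadoop-hdds/client) attaches to outgoing calls. A minimal sketch of that
client-side pattern follows; it assumes OBT_METADATA_KEY and
USER_METADATA_KEY are Metadata.Key<String> constants in OzoneConsts, as
their use above implies, and the class and field names are illustrative
rather than the patch's actual code.

import org.apache.ratis.thirdparty.io.grpc.CallOptions;
import org.apache.ratis.thirdparty.io.grpc.Channel;
import org.apache.ratis.thirdparty.io.grpc.ClientCall;
import org.apache.ratis.thirdparty.io.grpc.ClientInterceptor;
import org.apache.ratis.thirdparty.io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
import org.apache.ratis.thirdparty.io.grpc.Metadata;
import org.apache.ratis.thirdparty.io.grpc.MethodDescriptor;

import static org.apache.hadoop.ozone.OzoneConsts.OBT_METADATA_KEY;
import static org.apache.hadoop.ozone.OzoneConsts.USER_METADATA_KEY;

/** Sketch of a client interceptor that sends the block token and user name. */
class BlockTokenClientInterceptorSketch implements ClientInterceptor {

  private final String encodedToken; // token.encodeToUrlString(), may be null
  private final String user;         // short user name

  BlockTokenClientInterceptorSketch(String encodedToken, String user) {
    this.encodedToken = encodedToken;
    this.user = user;
  }

  @Override
  public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
      MethodDescriptor<ReqT, RespT> method, CallOptions callOptions,
      Channel next) {
    return new SimpleForwardingClientCall<ReqT, RespT>(
        next.newCall(method, callOptions)) {
      @Override
      public void start(Listener<RespT> responseListener, Metadata headers) {
        // Attach the credentials the server-side interceptor expects.
        if (encodedToken != null) {
          headers.put(OBT_METADATA_KEY, encodedToken);
        }
        if (user != null) {
          headers.put(USER_METADATA_KEY, user);
        }
        super.start(responseListener, headers);
      }
    };
  }
}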

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aaf3d717/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
index 4f7799d..2a3c99b 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.ozone.container.common.transport.server;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -30,6 +31,9 @@ import org.apache.hadoop.hdds.protocol.proto
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.container.common.helpers.
     StorageContainerException;
+import org.apache.hadoop.hdds.security.token.BlockTokenVerifier;
+import org.apache.hadoop.hdds.security.x509.SecurityConfig;
+import 
org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
@@ -37,6 +41,7 @@ import 
org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
 import org.apache.ratis.thirdparty.io.grpc.BindableService;
 import org.apache.ratis.thirdparty.io.grpc.Server;
 import org.apache.ratis.thirdparty.io.grpc.ServerBuilder;
+import org.apache.ratis.thirdparty.io.grpc.ServerInterceptors;
 import org.apache.ratis.thirdparty.io.grpc.netty.NettyServerBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -92,8 +97,16 @@ public final class XceiverServerGrpc implements 
XceiverServerSpi {
         DatanodeDetails.newPort(DatanodeDetails.Port.Name.STANDALONE, port));
     NettyServerBuilder nettyServerBuilder =
         ((NettyServerBuilder) ServerBuilder.forPort(port))
-            .maxInboundMessageSize(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE)
-            .addService(new GrpcXceiverService(dispatcher));
+            .maxInboundMessageSize(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE);
+
+    // Populate UGI context via ServerCredentialInterceptor
+    SecurityConfig secConfig = new SecurityConfig(conf);
+    ServerCredentialInterceptor credInterceptor =
+        new ServerCredentialInterceptor(
+            new BlockTokenVerifier(secConfig, getCaClient()));
+    nettyServerBuilder.addService(ServerInterceptors.intercept(
+          new GrpcXceiverService(dispatcher), credInterceptor));
+
     for (BindableService service : additionalServices) {
       nettyServerBuilder.addService(service);
     }
@@ -101,6 +114,12 @@ public final class XceiverServerGrpc implements 
XceiverServerSpi {
     storageContainer = dispatcher;
   }
 
+  @VisibleForTesting
+  public CertificateClient getCaClient() {
+    // TODO: instantiate CertificateClient
+    return null;
+  }
+
   @Override
   public int getIPCPort() {
     return this.port;
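
Once the interceptor has run, handlers on this server can read the verified
caller identity from the gRPC Context. A small sketch, assuming UGI_CTX_KEY
is a Context.Key<UserGroupInformation> in OzoneConsts, as its use in
ServerCredentialInterceptor suggests; the class name here is illustrative.

import org.apache.hadoop.security.UserGroupInformation;
import org.apache.ratis.thirdparty.io.grpc.Context;

import static org.apache.hadoop.ozone.OzoneConsts.UGI_CTX_KEY;

final class UgiContextSketch {
  private UgiContextSketch() {
  }

  /** Caller verified by ServerCredentialInterceptor, or null if absent. */
  static UserGroupInformation callerUgi() {
    // The interceptor attached the UGI via Contexts.interceptCall(ctx, ...),
    // so it is visible to anything running inside that call's Context.
    return UGI_CTX_KEY.get(Context.current());
  }
}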

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aaf3d717/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/OzoneBlockTokenIdentifier.java
----------------------------------------------------------------------
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/OzoneBlockTokenIdentifier.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/OzoneBlockTokenIdentifier.java
deleted file mode 100644
index 92b6d16..0000000
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/OzoneBlockTokenIdentifier.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ozone.security;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.commons.lang3.builder.EqualsBuilder;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.BlockTokenSecretProto;
-import 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.BlockTokenSecretProto.AccessModeProto;
-import 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.BlockTokenSecretProto.Builder;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.TokenIdentifier;
-
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.EnumSet;
-
-/**
- * Block token identifier for Ozone/HDDS. Ozone block access token is similar
- * to HDFS block access token, which is meant to be lightweight and
- * short-lived. No need to renew or revoke a block access token. when a
- * cached block access token expires, the client simply get a new one.
- * Block access token should be cached only in memory and never write to disk.
- */
[email protected]
-public class OzoneBlockTokenIdentifier extends TokenIdentifier {
-
-  static final Text KIND_NAME = new Text("HDDS_BLOCK_TOKEN");
-  private long expiryDate;
-  private String ownerId;
-  private String blockId;
-  private final EnumSet<AccessModeProto> modes;
-  private final String omCertSerialId;
-
-  public OzoneBlockTokenIdentifier(String ownerId, String blockId,
-      EnumSet<AccessModeProto> modes, long expiryDate, String omCertSerialId) {
-    this.ownerId = ownerId;
-    this.blockId = blockId;
-    this.expiryDate = expiryDate;
-    this.modes = modes == null ? EnumSet.noneOf(AccessModeProto.class) : modes;
-    this.omCertSerialId = omCertSerialId;
-  }
-
-  @Override
-  public UserGroupInformation getUser() {
-    if (this.getOwnerId() == null || "".equals(this.getOwnerId())) {
-      return UserGroupInformation.createRemoteUser(blockId);
-    }
-    return UserGroupInformation.createRemoteUser(ownerId);
-  }
-
-  public long getExpiryDate() {
-    return expiryDate;
-  }
-
-  public String getOwnerId() {
-    return ownerId;
-  }
-
-  public String getBlockId() {
-    return blockId;
-  }
-
-  public EnumSet<AccessModeProto> getAccessModes() {
-    return modes;
-  }
-
-  public String getOmCertSerialId(){
-    return omCertSerialId;
-  }
-
-  @Override
-  public Text getKind() {
-    return KIND_NAME;
-  }
-
-  @Override
-  public String toString() {
-    return "block_token_identifier (expiryDate=" + this.getExpiryDate()
-        + ", ownerId=" + this.getOwnerId()
-        + ", omCertSerialId=" + this.getOmCertSerialId()
-        + ", blockId=" + this.getBlockId() + ", access modes="
-        + this.getAccessModes() + ")";
-  }
-
-  static boolean isEqual(Object a, Object b) {
-    return a == null ? b == null : a.equals(b);
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (obj == this) {
-      return true;
-    }
-
-    if (obj instanceof OzoneBlockTokenIdentifier) {
-      OzoneBlockTokenIdentifier that = (OzoneBlockTokenIdentifier) obj;
-      return new EqualsBuilder()
-          .append(this.expiryDate, that.expiryDate)
-          .append(this.ownerId, that.ownerId)
-          .append(this.blockId, that.blockId)
-          .append(this.modes, that.modes)
-          .append(this.omCertSerialId, that.omCertSerialId)
-          .build();
-    }
-    return false;
-  }
-
-  @Override
-  public int hashCode() {
-    return new HashCodeBuilder(133, 567)
-        .append(this.expiryDate)
-        .append(this.blockId)
-        .append(this.ownerId)
-        .append(this.modes)
-        .append(this.omCertSerialId)
-        .build();
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    final DataInputStream dis = (DataInputStream) in;
-    if (!dis.markSupported()) {
-      throw new IOException("Could not peek first byte.");
-    }
-    readFieldsProtobuf(dis);
-  }
-
-  @VisibleForTesting
-  public static OzoneBlockTokenIdentifier readFieldsProtobuf(DataInput in)
-      throws IOException {
-    BlockTokenSecretProto tokenPtoto =
-        BlockTokenSecretProto.parseFrom((DataInputStream) in);
-    return new OzoneBlockTokenIdentifier(tokenPtoto.getOwnerId(),
-        tokenPtoto.getBlockId(), EnumSet.copyOf(tokenPtoto.getModesList()),
-        tokenPtoto.getExpiryDate(), tokenPtoto.getOmCertSerialId());
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    writeProtobuf(out);
-  }
-
-  @VisibleForTesting
-  void writeProtobuf(DataOutput out) throws IOException {
-    Builder builder = BlockTokenSecretProto.newBuilder()
-        .setBlockId(this.getBlockId())
-        .setOwnerId(this.getOwnerId())
-        .setOmCertSerialId(this.getOmCertSerialId())
-        .setExpiryDate(this.getExpiryDate());
-    // Add access mode allowed
-    for (AccessModeProto mode : this.getAccessModes()) {
-      builder.addModes(AccessModeProto.valueOf(mode.name()));
-    }
-    out.write(builder.build().toByteArray());
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aaf3d717/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/OzoneBlockTokenSelector.java
----------------------------------------------------------------------
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/OzoneBlockTokenSelector.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/OzoneBlockTokenSelector.java
deleted file mode 100644
index 086e19b..0000000
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/OzoneBlockTokenSelector.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.ozone.security;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.security.token.TokenSelector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Collection;
-
-/**
- * A block token selector for Ozone.
- */
[email protected]
-public class OzoneBlockTokenSelector implements
-    TokenSelector<OzoneBlockTokenIdentifier> {
-
-  private static final Logger LOG = LoggerFactory
-      .getLogger(OzoneBlockTokenSelector.class);
-
-  @Override
-  @SuppressWarnings("unchecked")
-  public Token<OzoneBlockTokenIdentifier> selectToken(Text service,
-      Collection<Token<? extends TokenIdentifier>> tokens) {
-    if (service == null) {
-      return null;
-    }
-    for (Token<? extends TokenIdentifier> token : tokens) {
-      if (OzoneBlockTokenIdentifier.KIND_NAME.equals(token.getKind())) {
-        LOG.trace("Getting token for service:{}", service);
-        return (Token<OzoneBlockTokenIdentifier>) token;
-      }
-    }
-    return null;
-  }
-}

