This is an automated email from the ASF dual-hosted git repository.

weichiu pushed a commit to branch HDDS-7733-Symmetric-Tokens
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-7733-Symmetric-Tokens by this push:
     new 2f0d930505 HDDS-8164. Authorize secret key APIs (#4597)
2f0d930505 is described below

commit 2f0d930505e41844c8826d4f0bdd79cd8c22d0e2
Author: Duong Nguyen <[email protected]>
AuthorDate: Fri May 12 15:31:07 2023 -0700

    HDDS-8164. Authorize secret key APIs (#4597)
---
 .../org/apache/hadoop/hdds/HddsConfigKeys.java     |  12 +
 .../security/exception/SCMSecretKeyException.java  |  47 ++++
 .../security/exception/SCMSecurityException.java   |   4 +-
 .../common/src/main/resources/ozone-default.xml    |  27 ++
 .../apache/hadoop/ozone/HddsDatanodeService.java   |   6 +-
 .../hadoop/hdds/protocol/SCMSecurityProtocol.java  |  22 --
 .../hadoop/hdds/protocol/SecretKeyProtocol.java    |  55 ++++
 .../hdds/protocol/SecretKeyProtocolDatanode.java   |  34 +++
 .../hadoop/hdds/protocol/SecretKeyProtocolOm.java  |  32 +++
 .../hadoop/hdds/protocol/SecretKeyProtocolScm.java |  31 +++
 .../SCMSecurityProtocolClientSideTranslatorPB.java |  37 ---
 .../SecretKeyProtocolClientSideTranslatorPB.java   | 165 +++++++++++
 .../protocolPB/SecretKeyProtocolDatanodePB.java    |  40 +++
 .../hdds/protocolPB/SecretKeyProtocolOmPB.java     |  39 +++
 .../hdds/protocolPB/SecretKeyProtocolScmPB.java    |  38 +++
 .../SecretKeyProtocolFailoverProxyProvider.java    | 303 +++++++++++++++++++++
 ...a => SingleSecretKeyProtocolProxyProvider.java} |  19 +-
 .../security/symmetric/DefaultSecretKeyClient.java |  12 +-
 .../symmetric/DefaultSecretKeySignerClient.java    |  19 +-
 .../symmetric/DefaultSecretKeyVerifierClient.java  |  12 +-
 .../hdds/security/symmetric/ManagedSecretKey.java  |   8 +-
 .../apache/hadoop/hdds/utils/HddsServerUtil.java   |  69 ++++-
 .../src/main/proto/ScmSecretKeyProtocol.proto      | 111 ++++++++
 .../src/main/proto/ScmServerSecurityProtocol.proto |  42 +--
 .../hadoop/hdds/scm/ha/SCMHAManagerImpl.java       |  10 +-
 .../hdds/scm/ha/io/ManagedSecretKeyCodec.java      |   6 +-
 .../SCMSecurityProtocolServerSideTranslatorPB.java |  50 ----
 .../SecretKeyProtocolServerSideTranslatorPB.java   | 165 +++++++++++
 .../hadoop/hdds/scm/server/SCMPolicyProvider.java  |  15 +
 .../hdds/scm/server/SCMSecurityProtocolServer.java |  46 +++-
 .../org/apache/hadoop/ozone/TestSecretKeysApi.java | 125 ++++++---
 .../org/apache/hadoop/ozone/om/OzoneManager.java   |   5 +-
 32 files changed, 1337 insertions(+), 269 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
index 4a55d6709b..4615db5017 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
@@ -287,6 +287,18 @@ public final class HddsConfigKeys {
   public static final String HDDS_SECURITY_CLIENT_SCM_CERTIFICATE_PROTOCOL_ACL =
       "hdds.security.client.scm.certificate.protocol.acl";
 
+  public static final String
+      HDDS_SECURITY_CLIENT_SCM_SECRET_KEY_OM_PROTOCOL_ACL =
+      "hdds.security.client.scm.secretkey.om.protocol.acl";
+
+  public static final String
+      HDDS_SECURITY_CLIENT_SCM_SECRET_KEY_SCM_PROTOCOL_ACL =
+      "hdds.security.client.scm.secretkey.scm.protocol.acl";
+
+  public static final String
+      HDDS_SECURITY_CLIENT_SCM_SECRET_KEY_DATANODE_PROTOCOL_ACL =
+      "hdds.security.client.scm.secretkey.datanode.protocol.acl";
+
   // Determines if the Container Chunk Manager will write user data to disk
   // Set to false only for specific performance tests
   public static final String HDDS_CONTAINER_PERSISTDATA =
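
For illustration, a service would read the new ACL keys through the normal Hadoop configuration API; a minimal sketch (the class name and explicit default are illustrative, not part of this patch):

    import org.apache.hadoop.hdds.HddsConfigKeys;
    import org.apache.hadoop.hdds.conf.OzoneConfiguration;

    // Sketch: read the OM secret-key ACL; "*" mirrors the default that
    // ozone-default.xml ships below.
    public final class SecretKeyAclReader {
      public static void main(String[] args) {
        OzoneConfiguration conf = new OzoneConfiguration();
        String acl = conf.get(
            HddsConfigKeys.HDDS_SECURITY_CLIENT_SCM_SECRET_KEY_OM_PROTOCOL_ACL,
            "*");
        System.out.println("OM secret-key protocol ACL: " + acl);
      }
    }
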
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/exception/SCMSecretKeyException.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/exception/SCMSecretKeyException.java
new file mode 100644
index 0000000000..2b2b251869
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/exception/SCMSecretKeyException.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.security.exception;
+
+import java.io.IOException;
+
+/**
+ * Exception for all secret key related errors.
+ */
+public class SCMSecretKeyException extends IOException {
+  private final ErrorCode errorCode;
+
+  public SCMSecretKeyException(String message, ErrorCode errorCode) {
+    super(message);
+    this.errorCode = errorCode;
+  }
+
+  public ErrorCode getErrorCode() {
+    return errorCode;
+  }
+
+  /**
+   * Error codes to make it easy to decode these exceptions.
+   */
+  public enum ErrorCode {
+    OK,
+    INTERNAL_ERROR,
+    SECRET_KEY_NOT_ENABLED,
+    SECRET_KEY_NOT_INITIALIZED
+  }
+}
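
Callers can branch on the typed error code instead of parsing exception messages; a hedged sketch (the secretKeyClient variable is assumed to be a SecretKeyProtocol in scope):

    try {
      secretKeyClient.getCurrentSecretKey();
    } catch (SCMSecretKeyException e) {
      switch (e.getErrorCode()) {
      case SECRET_KEY_NOT_ENABLED:
        // SCM runs without token security; skip secret-key signing.
        break;
      case SECRET_KEY_NOT_INITIALIZED:
        // SCM has not generated its first key yet; retry later.
        break;
      default:
        throw e;
      }
    }
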
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/exception/SCMSecurityException.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/exception/SCMSecurityException.java
index 13b8395391..7e008afc41 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/exception/SCMSecurityException.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/exception/SCMSecurityException.java
@@ -109,8 +109,6 @@ public class SCMSecurityException extends IOException {
     MISSING_BLOCK_TOKEN,
     BLOCK_TOKEN_VERIFICATION_FAILED,
     GET_ROOT_CA_CERT_FAILED,
-    NOT_A_PRIMARY_SCM,
-    SECRET_KEY_NOT_ENABLED,
-    SECRET_KEY_NOT_INITIALIZED
+    NOT_A_PRIMARY_SCM
   }
 }
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 848b21e707..01a527130d 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -2484,6 +2484,33 @@
       client scm container protocol.
     </description>
   </property>
+  <property>
+    <name>hdds.security.client.scm.secretkey.om.protocol.acl</name>
+    <value>*</value>
+    <tag>SECURITY</tag>
+    <description>
+      Comma separated list of users and groups allowed to access
+      client scm secret key protocol for om.
+    </description>
+  </property>
+  <property>
+    <name>hdds.security.client.scm.secretkey.scm.protocol.acl</name>
+    <value>*</value>
+    <tag>SECURITY</tag>
+    <description>
+      Comma separated list of users and groups allowed to access
+      client scm secret key protocol for scm.
+    </description>
+  </property>
+  <property>
+    <name>hdds.security.client.scm.secretkey.datanode.protocol.acl</name>
+    <value>*</value>
+    <tag>SECURITY</tag>
+    <description>
+      Comma separated list of users and groups allowed to access
+      client scm secret key protocol for datanodes.
+    </description>
+  </property>
   <property>
     <name>ozone.om.security.client.protocol.acl</name>
     <value>*</value>
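
Operators who want to tighten access beyond the wildcard defaults would override these in ozone-site.xml; an illustrative (not shipped) example using the Hadoop service-ACL format of comma-separated users, a space, then comma-separated groups:

    <property>
      <name>hdds.security.client.scm.secretkey.om.protocol.acl</name>
      <!-- Example only: allow the "om" user and the "ozone" group. -->
      <value>om ozone</value>
    </property>
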
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java
index 16a2e219ae..f9dab4b455 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.datanode.metadata.DatanodeCRLStore;
 import org.apache.hadoop.hdds.datanode.metadata.DatanodeCRLStoreImpl;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.SecretKeyProtocol;
 import org.apache.hadoop.hdds.security.symmetric.DefaultSecretKeyClient;
 import org.apache.hadoop.hdds.security.symmetric.SecretKeyClient;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
@@ -298,7 +299,10 @@ public class HddsDatanodeService extends GenericCli implements ServicePlugin {
         dnCertClient = initializeCertificateClient(dnCertClient);
 
         if (secConf.isTokenEnabled()) {
-          secretKeyClient = DefaultSecretKeyClient.create(conf);
+          SecretKeyProtocol secretKeyProtocol =
+              HddsServerUtil.getSecretKeyClientForDatanode(conf);
+          secretKeyClient = DefaultSecretKeyClient.create(conf,
+              secretKeyProtocol);
           secretKeyClient.start(conf);
         }
       }
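
The new wiring in isolation: the datanode now builds its own authorized SecretKeyProtocol transport and hands it to the factory, instead of the factory creating a generic security client internally. A sketch (error handling omitted; getSecretKeyClientForDatanode is added to HddsServerUtil later in this patch):

    SecretKeyProtocol secretKeyProtocol =
        HddsServerUtil.getSecretKeyClientForDatanode(conf);
    SecretKeyClient secretKeyClient =
        DefaultSecretKeyClient.create(conf, secretKeyProtocol);
    secretKeyClient.start(conf);
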
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocol/SCMSecurityProtocol.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocol/SCMSecurityProtocol.java
index 1cfe568d8a..2354a9b8d8 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocol/SCMSecurityProtocol.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocol/SCMSecurityProtocol.java
@@ -18,7 +18,6 @@ package org.apache.hadoop.hdds.protocol;
 
 import java.io.IOException;
 import java.util.List;
-import java.util.UUID;
 
 import org.apache.hadoop.hdds.annotation.InterfaceAudience;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -27,7 +26,6 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.OzoneManagerDetailsProto
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ScmNodeDetailsProto;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeDetailsProto;
 import org.apache.hadoop.hdds.scm.ScmConfig;
-import org.apache.hadoop.hdds.security.symmetric.ManagedSecretKey;
 import org.apache.hadoop.hdds.security.x509.crl.CRLInfo;
 import org.apache.hadoop.security.KerberosInfo;
 
@@ -173,24 +171,4 @@ public interface SCMSecurityProtocol {
   String getCertificate(NodeDetailsProto nodeDetails,
       String certSignReq) throws IOException;
 
-
-  /**
-   * Get the current SecretKey that is used for signing tokens.
-   * @return ManagedSecretKey
-   */
-  ManagedSecretKey getCurrentSecretKey() throws IOException;
-
-  /**
-   * Get a particular SecretKey by ID.
-   *
-   * @param id the id to get SecretKey.
-   * @return ManagedSecretKey.
-   */
-  ManagedSecretKey getSecretKey(UUID id) throws IOException;
-
-  /**
-   * Get all the non-expired SecretKey managed by SCM.
-   * @return list of ManagedSecretKey.
-   */
-  List<ManagedSecretKey> getAllSecretKeys() throws IOException;
 }
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocol/SecretKeyProtocol.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocol/SecretKeyProtocol.java
new file mode 100644
index 0000000000..ec12efab53
--- /dev/null
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocol/SecretKeyProtocol.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.protocol;
+
+import org.apache.hadoop.hdds.annotation.InterfaceAudience;
+import org.apache.hadoop.hdds.scm.ScmConfig;
+import org.apache.hadoop.hdds.security.symmetric.ManagedSecretKey;
+import org.apache.hadoop.security.KerberosInfo;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
+
+/**
+ * The protocol used to expose secret keys in SCM.
+ */
+@KerberosInfo(
+    serverPrincipal = ScmConfig.ConfigStrings.HDDS_SCM_KERBEROS_PRINCIPAL_KEY)
[email protected]
+public interface SecretKeyProtocol {
+
+  /**
+   * Get the current SecretKey that is used for signing tokens.
+   * @return ManagedSecretKey
+   */
+  ManagedSecretKey getCurrentSecretKey() throws IOException;
+
+  /**
+   * Get a particular SecretKey by ID.
+   *
+   * @param id the id of the SecretKey to get.
+   * @return ManagedSecretKey.
+   */
+  ManagedSecretKey getSecretKey(UUID id) throws IOException;
+
+  /**
+   * Get all the non-expired SecretKeys managed by SCM.
+   * @return list of ManagedSecretKey.
+   */
+  List<ManagedSecretKey> getAllSecretKeys() throws IOException;
+}
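
Typical client-side use of the narrowed interface (a sketch; getId() on ManagedSecretKey is assumed from its protobuf mapping later in this patch):

    ManagedSecretKey current = secretKeyProtocol.getCurrentSecretKey();
    ManagedSecretKey same = secretKeyProtocol.getSecretKey(current.getId());
    List<ManagedSecretKey> valid = secretKeyProtocol.getAllSecretKeys();
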
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocol/SecretKeyProtocolDatanode.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocol/SecretKeyProtocolDatanode.java
new file mode 100644
index 0000000000..433c27bd8a
--- /dev/null
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocol/SecretKeyProtocolDatanode.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.protocol;
+
+import org.apache.hadoop.hdds.annotation.InterfaceAudience;
+import org.apache.hadoop.security.KerberosInfo;
+
+import static org.apache.hadoop.hdds.scm.ScmConfig.ConfigStrings.HDDS_SCM_KERBEROS_PRINCIPAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY;
+
+/**
+ * The protocol used by Datanodes to access secret keys from SCM.
+ */
+@KerberosInfo(
+    serverPrincipal = HDDS_SCM_KERBEROS_PRINCIPAL_KEY,
+    clientPrincipal = DFS_DATANODE_KERBEROS_PRINCIPAL_KEY
+)
[email protected]
+public interface SecretKeyProtocolDatanode extends SecretKeyProtocol {
+}
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocol/SecretKeyProtocolOm.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocol/SecretKeyProtocolOm.java
new file mode 100644
index 0000000000..9eebaecfe3
--- /dev/null
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocol/SecretKeyProtocolOm.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.protocol;
+
+import org.apache.hadoop.security.KerberosInfo;
+
+import static org.apache.hadoop.hdds.scm.ScmConfig.ConfigStrings.HDDS_SCM_KERBEROS_PRINCIPAL_KEY;
+
+/**
+ * The protocol used by OM to access secret keys from SCM.
+ */
+@KerberosInfo(
+    serverPrincipal = HDDS_SCM_KERBEROS_PRINCIPAL_KEY,
+    // TODO: move OMConfigKeys.OZONE_OM_KERBEROS_PRINCIPAL_KEY to hdds-common.
+    clientPrincipal = "ozone.om.kerberos.principal"
+)
+public interface SecretKeyProtocolOm extends SecretKeyProtocol {
+}
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocol/SecretKeyProtocolScm.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocol/SecretKeyProtocolScm.java
new file mode 100644
index 0000000000..9439c2ede0
--- /dev/null
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocol/SecretKeyProtocolScm.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.protocol;
+
+import org.apache.hadoop.security.KerberosInfo;
+
+import static org.apache.hadoop.hdds.scm.ScmConfig.ConfigStrings.HDDS_SCM_KERBEROS_PRINCIPAL_KEY;
+
+/**
+ * The protocol used by SCM nodes to access secret keys.
+ */
+@KerberosInfo(
+    serverPrincipal = HDDS_SCM_KERBEROS_PRINCIPAL_KEY,
+    clientPrincipal = HDDS_SCM_KERBEROS_PRINCIPAL_KEY
+)
+public interface SecretKeyProtocolScm extends SecretKeyProtocol {
+}
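
These three empty sub-interfaces exist so each caller role carries its own Kerberos client principal and can be bound to its own ACL key. A hedged sketch of how they would be registered with Hadoop service-level authorization (the real SCMPolicyProvider change is further down in this patch and may differ):

    import org.apache.hadoop.security.authorize.Service;

    // Sketch: one Service entry per role, keyed by the new ACL properties.
    private static final Service[] SECRET_KEY_SERVICES = new Service[] {
        new Service(
            HddsConfigKeys.HDDS_SECURITY_CLIENT_SCM_SECRET_KEY_OM_PROTOCOL_ACL,
            SecretKeyProtocolOm.class),
        new Service(
            HddsConfigKeys.HDDS_SECURITY_CLIENT_SCM_SECRET_KEY_SCM_PROTOCOL_ACL,
            SecretKeyProtocolScm.class),
        new Service(
            HddsConfigKeys
                .HDDS_SECURITY_CLIENT_SCM_SECRET_KEY_DATANODE_PROTOCOL_ACL,
            SecretKeyProtocolDatanode.class),
    };
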
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocolPB/SCMSecurityProtocolClientSideTranslatorPB.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocolPB/SCMSecurityProtocolClientSideTranslatorPB.java
index ab09061c44..d40551e71c 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocolPB/SCMSecurityProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocolPB/SCMSecurityProtocolClientSideTranslatorPB.java
@@ -22,9 +22,7 @@ import java.security.cert.CRLException;
 import java.security.cert.CertificateException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.UUID;
 import java.util.function.Consumer;
-import java.util.stream.Collectors;
 
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.protocol.SCMSecurityProtocol;
@@ -42,8 +40,6 @@ import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetCer
 import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetCrlsRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetDataNodeCertRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetCertRequestProto;
-import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetSecretKeyRequest;
-import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetSecretKeyResponse;
 import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMListCACertificateRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetLatestCrlIdRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMListCertificateRequestProto;
@@ -55,7 +51,6 @@ import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMSecuri
 import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.Type;
 import org.apache.hadoop.hdds.scm.proxy.SCMSecurityProtocolFailoverProxyProvider;
 import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
-import org.apache.hadoop.hdds.security.symmetric.ManagedSecretKey;
 import org.apache.hadoop.hdds.security.x509.crl.CRLInfo;
 import org.apache.hadoop.hdds.tracing.TracingUtil;
 import org.apache.hadoop.io.retry.RetryProxy;
@@ -195,38 +190,6 @@ public class SCMSecurityProtocolClientSideTranslatorPB implements
         .getX509Certificate();
   }
 
-  @Override
-  public ManagedSecretKey getCurrentSecretKey() throws IOException {
-    SCMSecurityProtocolProtos.ManagedSecretKey secretKeyProto =
-        submitRequest(Type.GetCurrentSecretKey, builder -> {
-        }).getCurrentSecretKeyResponseProto().getSecretKey();
-    return ManagedSecretKey.fromProtobuf(secretKeyProto);
-  }
-
-  @Override
-  public ManagedSecretKey getSecretKey(UUID id) throws IOException {
-    SCMGetSecretKeyRequest request = SCMGetSecretKeyRequest.newBuilder()
-        .setSecretKeyId(HddsProtos.UUID.newBuilder()
-            .setMostSigBits(id.getMostSignificantBits())
-            .setLeastSigBits(id.getLeastSignificantBits())).build();
-    SCMGetSecretKeyResponse response = submitRequest(Type.GetSecretKey,
-        builder -> builder.setGetSecretKeyRequest(request))
-        .getGetSecretKeyResponseProto();
-
-    return response.hasSecretKey() ?
-        ManagedSecretKey.fromProtobuf(response.getSecretKey()) : null;
-  }
-
-  @Override
-  public List<ManagedSecretKey> getAllSecretKeys() throws IOException {
-    List<SCMSecurityProtocolProtos.ManagedSecretKey> secretKeysList =
-        submitRequest(Type.GetAllSecretKeys, builder -> {
-        }).getSecretKeysListResponseProto().getSecretKeysList();
-    return secretKeysList.stream()
-        .map(ManagedSecretKey::fromProtobuf)
-        .collect(Collectors.toList());
-  }
-
   /**
    * Get signed certificate for SCM node.
    *
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocolPB/SecretKeyProtocolClientSideTranslatorPB.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocolPB/SecretKeyProtocolClientSideTranslatorPB.java
new file mode 100644
index 0000000000..a0206555e3
--- /dev/null
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocolPB/SecretKeyProtocolClientSideTranslatorPB.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.protocolPB;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.hdds.protocol.SecretKeyProtocol;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.SCMSecretKeyProtocolProtos;
+import org.apache.hadoop.hdds.protocol.proto.SCMSecretKeyProtocolProtos.SCMGetSecretKeyRequest;
+import org.apache.hadoop.hdds.protocol.proto.SCMSecretKeyProtocolProtos.SCMGetSecretKeyResponse;
+import org.apache.hadoop.hdds.protocol.proto.SCMSecretKeyProtocolProtos.SCMSecretKeyProtocolService.BlockingInterface;
+import org.apache.hadoop.hdds.protocol.proto.SCMSecretKeyProtocolProtos.SCMSecretKeyRequest;
+import org.apache.hadoop.hdds.protocol.proto.SCMSecretKeyProtocolProtos.SCMSecretKeyRequest.Builder;
+import org.apache.hadoop.hdds.protocol.proto.SCMSecretKeyProtocolProtos.SCMSecretKeyResponse;
+import org.apache.hadoop.hdds.protocol.proto.SCMSecretKeyProtocolProtos.Type;
+import org.apache.hadoop.hdds.scm.proxy.SecretKeyProtocolFailoverProxyProvider;
+import org.apache.hadoop.hdds.security.exception.SCMSecretKeyException;
+import org.apache.hadoop.hdds.security.symmetric.ManagedSecretKey;
+import org.apache.hadoop.hdds.tracing.TracingUtil;
+import org.apache.hadoop.io.retry.RetryProxy;
+import org.apache.hadoop.ipc.ProtobufHelper;
+import org.apache.hadoop.ipc.ProtocolTranslator;
+import org.apache.hadoop.ipc.RPC;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/**
+ * This class is the client-side translator that forwards requests for
+ * {@link SecretKeyProtocol} to the server proxy.
+ */
+public class SecretKeyProtocolClientSideTranslatorPB implements
+    SecretKeyProtocol, ProtocolTranslator, Closeable {
+
+  /**
+   * RpcController is not used and hence is set to null.
+   */
+  private static final RpcController NULL_RPC_CONTROLLER = null;
+  private final BlockingInterface rpcProxy;
+  private SecretKeyProtocolFailoverProxyProvider failoverProxyProvider;
+
+  public SecretKeyProtocolClientSideTranslatorPB(
+      SecretKeyProtocolFailoverProxyProvider<? extends BlockingInterface>
+          proxyProvider, Class<? extends BlockingInterface> proxyClazz) {
+    Preconditions.checkState(proxyProvider != null);
+    this.failoverProxyProvider = proxyProvider;
+    this.rpcProxy = (BlockingInterface) RetryProxy.create(
+        proxyClazz, failoverProxyProvider,
+        failoverProxyProvider.getRetryPolicy());
+  }
+
+  /**
+   * Helper method to wrap the request and send the message.
+   */
+  private SCMSecretKeyResponse submitRequest(
+      Type type,
+      Consumer<Builder> builderConsumer) throws IOException {
+    final SCMSecretKeyResponse response;
+    try {
+
+      Builder builder = SCMSecretKeyRequest.newBuilder()
+          .setCmdType(type)
+          .setTraceID(TracingUtil.exportCurrentSpan());
+      builderConsumer.accept(builder);
+      SCMSecretKeyRequest wrapper = builder.build();
+
+      response = rpcProxy.submitRequest(NULL_RPC_CONTROLLER, wrapper);
+
+      handleError(response);
+
+    } catch (ServiceException ex) {
+      throw ProtobufHelper.getRemoteException(ex);
+    }
+    return response;
+  }
+
+  private SCMSecretKeyResponse handleError(SCMSecretKeyResponse resp)
+      throws SCMSecretKeyException {
+    if (resp.getStatus() != SCMSecretKeyProtocolProtos.Status.OK) {
+      throw new SCMSecretKeyException(resp.getMessage(),
+          SCMSecretKeyException.ErrorCode.values()[resp.getStatus().ordinal()]);
+    }
+    return resp;
+  }
+
+  /**
+   * Closes this stream and releases any system resources associated
+   * with it. If the stream is already closed then invoking this
+   * method has no effect.
+   *
+   * <p> As noted in {@link AutoCloseable#close()}, cases where the
+   * close may fail require careful attention. It is strongly advised
+   * to relinquish the underlying resources and to internally
+   * <em>mark</em> the {@code Closeable} as closed, prior to throwing
+   * the {@code IOException}.
+   *
+   * @throws IOException if an I/O error occurs
+   */
+  @Override
+  public void close() throws IOException {
+    RPC.stopProxy(rpcProxy);
+  }
+
+  @Override
+  public ManagedSecretKey getCurrentSecretKey() throws IOException {
+    SCMSecretKeyProtocolProtos.ManagedSecretKey secretKeyProto =
+        submitRequest(Type.GetCurrentSecretKey, builder -> {
+        }).getCurrentSecretKeyResponseProto().getSecretKey();
+    return ManagedSecretKey.fromProtobuf(secretKeyProto);
+  }
+
+  @Override
+  public ManagedSecretKey getSecretKey(UUID id) throws IOException {
+    SCMGetSecretKeyRequest request = SCMGetSecretKeyRequest.newBuilder()
+        .setSecretKeyId(HddsProtos.UUID.newBuilder()
+            .setMostSigBits(id.getMostSignificantBits())
+            .setLeastSigBits(id.getLeastSignificantBits())).build();
+    SCMGetSecretKeyResponse response = submitRequest(Type.GetSecretKey,
+        builder -> builder.setGetSecretKeyRequest(request))
+        .getGetSecretKeyResponseProto();
+
+    return response.hasSecretKey() ?
+        ManagedSecretKey.fromProtobuf(response.getSecretKey()) : null;
+  }
+
+  @Override
+  public List<ManagedSecretKey> getAllSecretKeys() throws IOException {
+    List<SCMSecretKeyProtocolProtos.ManagedSecretKey> secretKeysList =
+        submitRequest(Type.GetAllSecretKeys, builder -> {
+        }).getSecretKeysListResponseProto().getSecretKeysList();
+    return secretKeysList.stream()
+        .map(ManagedSecretKey::fromProtobuf)
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Return the proxy object underlying this protocol translator.
+   *
+   * @return the proxy object underlying this protocol translator.
+   */
+  @Override
+  public Object getUnderlyingProxyObject() {
+    return rpcProxy;
+  }
+}
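
Putting the translator together with the failover provider, using the datanode role as the example (a sketch mirroring the constructor above; assumes a ConfigurationSource named conf and a method that may throw IOException):

    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
    SecretKeyProtocolFailoverProxyProvider<SecretKeyProtocolDatanodePB>
        provider = new SecretKeyProtocolFailoverProxyProvider<>(
            conf, ugi, SecretKeyProtocolDatanodePB.class);
    try (SecretKeyProtocolClientSideTranslatorPB client =
        new SecretKeyProtocolClientSideTranslatorPB(
            provider, SecretKeyProtocolDatanodePB.class)) {
      ManagedSecretKey current = client.getCurrentSecretKey();
    }
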
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocolPB/SecretKeyProtocolDatanodePB.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocolPB/SecretKeyProtocolDatanodePB.java
new file mode 100644
index 0000000000..57f53df879
--- /dev/null
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocolPB/SecretKeyProtocolDatanodePB.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.protocolPB;
+
+import org.apache.hadoop.hdds.protocol.proto.SCMSecretKeyProtocolProtos.SCMSecretKeyProtocolService;
+import org.apache.hadoop.ipc.ProtocolInfo;
+import org.apache.hadoop.security.KerberosInfo;
+
+import static org.apache.hadoop.hdds.scm.ScmConfig.ConfigStrings.HDDS_SCM_KERBEROS_PRINCIPAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY;
+
+/**
+ * Protocol for secret key related operations, to be used by datanode
+ * service role.
+ */
+@ProtocolInfo(protocolName =
+    "org.apache.hadoop.hdds.protocol.SecretKeyProtocolDatanode",
+    protocolVersion = 1)
+@KerberosInfo(
+    serverPrincipal = HDDS_SCM_KERBEROS_PRINCIPAL_KEY,
+    clientPrincipal = DFS_DATANODE_KERBEROS_PRINCIPAL_KEY
+)
+public interface SecretKeyProtocolDatanodePB extends
+    SCMSecretKeyProtocolService.BlockingInterface {
+
+}
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocolPB/SecretKeyProtocolOmPB.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocolPB/SecretKeyProtocolOmPB.java
new file mode 100644
index 0000000000..5865e67e46
--- /dev/null
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocolPB/SecretKeyProtocolOmPB.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.protocolPB;
+
+import org.apache.hadoop.hdds.protocol.proto.SCMSecretKeyProtocolProtos.SCMSecretKeyProtocolService;
+import org.apache.hadoop.ipc.ProtocolInfo;
+import org.apache.hadoop.security.KerberosInfo;
+
+import static org.apache.hadoop.hdds.scm.ScmConfig.ConfigStrings.HDDS_SCM_KERBEROS_PRINCIPAL_KEY;
+
+/**
+ * Protocol for secret key related operations, to be used by OM service role.
+ */
+@ProtocolInfo(protocolName =
+    "org.apache.hadoop.hdds.protocol.SecretKeyProtocolOm",
+    protocolVersion = 1)
+@KerberosInfo(
+    serverPrincipal = HDDS_SCM_KERBEROS_PRINCIPAL_KEY,
+    // TODO: move OMConfigKeys.OZONE_OM_KERBEROS_PRINCIPAL_KEY to hdds-common.
+    clientPrincipal = "ozone.om.kerberos.principal"
+)
+public interface SecretKeyProtocolOmPB extends
+    SCMSecretKeyProtocolService.BlockingInterface {
+
+}
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocolPB/SecretKeyProtocolScmPB.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocolPB/SecretKeyProtocolScmPB.java
new file mode 100644
index 0000000000..42a1b15683
--- /dev/null
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocolPB/SecretKeyProtocolScmPB.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.protocolPB;
+
+import org.apache.hadoop.hdds.protocol.proto.SCMSecretKeyProtocolProtos.SCMSecretKeyProtocolService;
+import org.apache.hadoop.ipc.ProtocolInfo;
+import org.apache.hadoop.security.KerberosInfo;
+
+import static org.apache.hadoop.hdds.scm.ScmConfig.ConfigStrings.HDDS_SCM_KERBEROS_PRINCIPAL_KEY;
+
+/**
+ * Protocol for secret key related operations, to be used by SCM service role.
+ */
+@ProtocolInfo(protocolName =
+    "org.apache.hadoop.hdds.protocol.SecretKeyProtocolScm",
+    protocolVersion = 1)
+@KerberosInfo(
+    serverPrincipal = HDDS_SCM_KERBEROS_PRINCIPAL_KEY,
+    clientPrincipal = HDDS_SCM_KERBEROS_PRINCIPAL_KEY
+)
+public interface SecretKeyProtocolScmPB extends
+    SCMSecretKeyProtocolService.BlockingInterface {
+
+}
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SecretKeyProtocolFailoverProxyProvider.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SecretKeyProtocolFailoverProxyProvider.java
new file mode 100644
index 0000000000..9e985e9427
--- /dev/null
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SecretKeyProtocolFailoverProxyProvider.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.proxy;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.conf.ConfigurationException;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.protocol.proto.SCMSecretKeyProtocolProtos.SCMSecretKeyProtocolService;
+import org.apache.hadoop.hdds.ratis.ServerNotLeaderException;
+import org.apache.hadoop.hdds.scm.ha.SCMHAUtils;
+import org.apache.hadoop.hdds.scm.ha.SCMNodeInfo;
+import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource;
+import org.apache.hadoop.io.retry.FailoverProxyProvider;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Failover proxy provider for SCMSecretKeyProtocolService server.
+ */
+public class SecretKeyProtocolFailoverProxyProvider
+    <T extends SCMSecretKeyProtocolService.BlockingInterface> implements
+    FailoverProxyProvider<T>, Closeable {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(SecretKeyProtocolFailoverProxyProvider.class);
+
+  // scmNodeId -> ProxyInfo<rpcProxy>
+  private final Map<String, ProxyInfo<T>> scmProxies;
+
+  // scmNodeId -> SCMProxyInfo
+  private final Map<String, SCMProxyInfo> scmProxyInfoMap;
+
+  private List<String> scmNodeIds;
+
+  // As the SCM client is shared across threads, performFailOver()
+  // updates currentProxySCMNodeId based on updatedLeaderNodeID, which is
+  // set via the retry policy in shouldRetry(). When 2 or more threads run
+  // in parallel, the RetryInvocationHandler checks the expectedFailOverCount
+  // and skips performFailOver() for all but one of them. The other thread(s)
+  // therefore do not call performFailOver(); they call getProxy(), which
+  // uses currentProxySCMNodeId and returns the proxy.
+  private volatile String currentProxySCMNodeId;
+  private volatile int currentProxyIndex;
+
+
+  private final ConfigurationSource conf;
+  private final SCMClientConfig scmClientConfig;
+  private final long scmVersion;
+
+  private String scmServiceId;
+
+  private final int maxRetryCount;
+  private final long retryInterval;
+
+  private final UserGroupInformation ugi;
+  private final Class<T> proxyClazz;
+
+  private String updatedLeaderNodeID = null;
+
+  /**
+   * Construct a fail-over proxy provider for the SCM secret key protocol
+   * server.
+   * @param conf configuration source
+   * @param userGroupInformation UGI of the calling client
+   * @param proxyClazz the protocol PB interface to create proxies for
+   */
+  public SecretKeyProtocolFailoverProxyProvider(ConfigurationSource conf,
+      UserGroupInformation userGroupInformation, Class<T> proxyClazz) {
+    Preconditions.checkNotNull(userGroupInformation);
+    this.ugi = userGroupInformation;
+    this.conf = conf;
+    this.proxyClazz = proxyClazz;
+    this.scmVersion = RPC.getProtocolVersion(proxyClazz);
+
+    this.scmProxies = new HashMap<>();
+    this.scmProxyInfoMap = new HashMap<>();
+    loadConfigs();
+
+    this.currentProxyIndex = 0;
+    currentProxySCMNodeId = scmNodeIds.get(currentProxyIndex);
+    scmClientConfig = conf.getObject(SCMClientConfig.class);
+    this.maxRetryCount = scmClientConfig.getRetryCount();
+    this.retryInterval = scmClientConfig.getRetryInterval();
+  }
+
+  protected synchronized void loadConfigs() {
+    List<SCMNodeInfo> scmNodeInfoList = SCMNodeInfo.buildNodeInfo(conf);
+    scmNodeIds = new ArrayList<>();
+
+    for (SCMNodeInfo scmNodeInfo : scmNodeInfoList) {
+      if (scmNodeInfo.getScmSecurityAddress() == null) {
+        throw new ConfigurationException("SCM Client Address could not " +
+            "be obtained from config. Config is not properly defined");
+      } else {
+        InetSocketAddress scmSecurityAddress =
+            NetUtils.createSocketAddr(scmNodeInfo.getScmSecurityAddress());
+
+        scmServiceId = scmNodeInfo.getServiceId();
+        String scmNodeId = scmNodeInfo.getNodeId();
+
+        scmNodeIds.add(scmNodeId);
+        SCMProxyInfo scmProxyInfo = new SCMProxyInfo(scmServiceId, scmNodeId,
+            scmSecurityAddress);
+        scmProxyInfoMap.put(scmNodeId, scmProxyInfo);
+      }
+    }
+  }
+
+  @Override
+  public synchronized ProxyInfo<T> getProxy() {
+    ProxyInfo<T> currentProxyInfo = scmProxies.get(getCurrentProxySCMNodeId());
+    if (currentProxyInfo == null) {
+      currentProxyInfo = createSCMProxy(getCurrentProxySCMNodeId());
+    }
+    return currentProxyInfo;
+  }
+
+  /**
+   * Creates proxy object.
+   */
+  private ProxyInfo<T> createSCMProxy(String nodeId) {
+    ProxyInfo<T> proxyInfo;
+    SCMProxyInfo scmProxyInfo = scmProxyInfoMap.get(nodeId);
+    InetSocketAddress address = scmProxyInfo.getAddress();
+    try {
+      T scmProxy = createSCMProxy(address);
+      // Create proxyInfo here, to make it work with all Hadoop versions.
+      proxyInfo = new ProxyInfo<T>(scmProxy, scmProxyInfo.toString());
+      scmProxies.put(nodeId, proxyInfo);
+      return proxyInfo;
+    } catch (IOException ioe) {
+      LOG.error("{} Failed to create RPC proxy to SCM at {}",
+          this.getClass().getSimpleName(), address, ioe);
+      throw new RuntimeException(ioe);
+    }
+  }
+
+  private T createSCMProxy(InetSocketAddress scmAddress)
+      throws IOException {
+    Configuration hadoopConf =
+        LegacyHadoopConfigurationSource.asHadoopConfiguration(conf);
+    RPC.setProtocolEngine(hadoopConf, proxyClazz,
+        ProtobufRpcEngine.class);
+
+    // FailoverOnNetworkException ensures that the IPC layer does not attempt
+    // retries on the same SCM in case of connection exception. This retry
+    // policy essentially results in TRY_ONCE_THEN_FAIL.
+
+    RetryPolicy connectionRetryPolicy = RetryPolicies
+        .failoverOnNetworkException(0);
+
+    return RPC.getProtocolProxy(proxyClazz,
+        scmVersion, scmAddress, ugi,
+        hadoopConf, NetUtils.getDefaultSocketFactory(hadoopConf),
+        (int)scmClientConfig.getRpcTimeOut(), connectionRetryPolicy).getProxy();
+  }
+
+
+  @Override
+  public synchronized void performFailover(T currentProxy) {
+    if (updatedLeaderNodeID != null) {
+      currentProxySCMNodeId = updatedLeaderNodeID;
+    } else {
+      nextProxyIndex();
+    }
+    LOG.debug("Failing over to next proxy. {}", getCurrentProxySCMNodeId());
+  }
+
+  public synchronized void performFailoverToAssignedLeader(String newLeader,
+      Exception e) {
+    ServerNotLeaderException snle =
+        (ServerNotLeaderException) SCMHAUtils.getServerNotLeaderException(e);
+    if (snle != null && snle.getSuggestedLeader() != null) {
+      Optional<SCMProxyInfo> matchedProxyInfo =
+          scmProxyInfoMap.values().stream().filter(
+              proxyInfo -> NetUtils.getHostPortString(proxyInfo.getAddress())
+                  .equals(snle.getSuggestedLeader())).findFirst();
+      if (matchedProxyInfo.isPresent()) {
+        newLeader = matchedProxyInfo.get().getNodeId();
+        LOG.debug("Performing failover to suggested leader {}, nodeId {}",
+            snle.getSuggestedLeader(), newLeader);
+      } else {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Suggested leader {} does not match with any of the " +
+                          "proxyInfo adress {}", snle.getSuggestedLeader(),
+                  Arrays.toString(scmProxyInfoMap.values().toArray()));
+        }
+      }
+    }
+    assignLeaderToNode(newLeader);
+  }
+
+
+  private synchronized void assignLeaderToNode(String newLeaderNodeId) {
+    if (!currentProxySCMNodeId.equals(newLeaderNodeId)) {
+      if (scmProxyInfoMap.containsKey(newLeaderNodeId)) {
+        updatedLeaderNodeID = newLeaderNodeId;
+        LOG.debug("Updated LeaderNodeID {}", updatedLeaderNodeID);
+      } else {
+        updatedLeaderNodeID = null;
+      }
+    }
+  }
+
+  /**
+   * Update the proxy index to the next proxy in the list, round robin.
+   */
+  private synchronized void nextProxyIndex() {
+    // round robin the next proxy
+    currentProxyIndex = (getCurrentProxyIndex() + 1) % scmProxyInfoMap.size();
+    currentProxySCMNodeId = scmNodeIds.get(currentProxyIndex);
+  }
+
+  public RetryPolicy getRetryPolicy() {
+    // Client will attempt up to maxFailovers number of failovers between
+    // available SCMs before throwing exception.
+
+    return (exception, retries, failovers, isIdempotentOrAtMostOnce) -> {
+
+      if (LOG.isDebugEnabled()) {
+        if (exception.getCause() != null) {
+          LOG.debug("RetryProxy: SCM Security Server {}: {}: {}",
+              getCurrentProxySCMNodeId(),
+              exception.getCause().getClass().getSimpleName(),
+              exception.getCause().getMessage());
+        } else {
+          LOG.debug("RetryProxy: SCM {}: {}", getCurrentProxySCMNodeId(),
+              exception.getMessage());
+        }
+      }
+
+      if (SCMHAUtils.checkRetriableWithNoFailoverException(exception)) {
+        setUpdatedLeaderNodeID();
+      } else {
+        performFailoverToAssignedLeader(null, exception);
+      }
+      return SCMHAUtils
+          .getRetryAction(failovers, retries, exception, maxRetryCount,
+              getRetryInterval());
+    };
+  }
+
+  public synchronized void setUpdatedLeaderNodeID() {
+    this.updatedLeaderNodeID = getCurrentProxySCMNodeId();
+  }
+
+  @Override
+  public Class<T> getInterface() {
+    return proxyClazz;
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+    for (ProxyInfo<T> proxyInfo : scmProxies.values()) {
+      if (proxyInfo.proxy != null) {
+        RPC.stopProxy(proxyInfo.proxy);
+      }
+    }
+  }
+
+  public synchronized String getCurrentProxySCMNodeId() {
+    return currentProxySCMNodeId;
+  }
+
+  public synchronized int getCurrentProxyIndex() {
+    return currentProxyIndex;
+  }
+
+  private long getRetryInterval() {
+    return retryInterval;
+  }
+}
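
The provider is designed to sit behind Hadoop's RetryProxy, exactly as the translator's constructor above does; the relevant pairing in isolation (a sketch, SCM-to-SCM role shown):

    SCMSecretKeyProtocolService.BlockingInterface proxy =
        (SCMSecretKeyProtocolService.BlockingInterface) RetryProxy.create(
            SecretKeyProtocolScmPB.class, provider,
            provider.getRetryPolicy());
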
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SingleSCMSecurityProtocolProxyProvider.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SingleSecretKeyProtocolProxyProvider.java
similarity index 72%
rename from hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SingleSCMSecurityProtocolProxyProvider.java
rename to hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SingleSecretKeyProtocolProxyProvider.java
index a69522d14f..f50f57a7a0 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SingleSCMSecurityProtocolProxyProvider.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SingleSecretKeyProtocolProxyProvider.java
@@ -17,23 +17,24 @@
 package org.apache.hadoop.hdds.scm.proxy;
 
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
-import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolPB;
+import org.apache.hadoop.hdds.protocol.proto.SCMSecretKeyProtocolProtos.SCMSecretKeyProtocolService;
 import org.apache.hadoop.security.UserGroupInformation;
 
 /**
- * Proxy provider for
- * {@link org.apache.hadoop.hdds.protocol.SCMSecurityProtocol} against a single
- * SCM node (no fail-over).
+ * Proxy provider for SCMSecretKeyProtocolService against a
+ * single SCM node (no fail-over).
  */
-public class SingleSCMSecurityProtocolProxyProvider
-    extends SCMSecurityProtocolFailoverProxyProvider {
+public class SingleSecretKeyProtocolProxyProvider
+    <T extends SCMSecretKeyProtocolService.BlockingInterface>
+    extends SecretKeyProtocolFailoverProxyProvider<T> {
   private final String scmNodeId;
 
-  public SingleSCMSecurityProtocolProxyProvider(
+  public SingleSecretKeyProtocolProxyProvider(
       ConfigurationSource conf,
       UserGroupInformation userGroupInformation,
+      Class<T> clazz,
       String scmNodeId) {
-    super(conf, userGroupInformation);
+    super(conf, userGroupInformation, clazz);
     this.scmNodeId = scmNodeId;
   }
 
@@ -43,7 +44,7 @@ public class SingleSCMSecurityProtocolProxyProvider
   }
 
   @Override
-  public synchronized void performFailover(SCMSecurityProtocolPB currentProxy) {
+  public synchronized void performFailover(T currentProxy) {
     // do nothing.
   }
 
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/DefaultSecretKeyClient.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/DefaultSecretKeyClient.java
index d77fee778e..030b0c7b68 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/DefaultSecretKeyClient.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/DefaultSecretKeyClient.java
@@ -18,14 +18,12 @@
 package org.apache.hadoop.hdds.security.symmetric;
 
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
-import org.apache.hadoop.hdds.protocol.SCMSecurityProtocol;
+import org.apache.hadoop.hdds.protocol.SecretKeyProtocol;
 import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
 
 import java.io.IOException;
 import java.util.UUID;
 
-import static org.apache.hadoop.hdds.utils.HddsServerUtil.getScmSecurityClient;
-
 /**
  * A composition of {@link DefaultSecretKeySignerClient} and
  * {@link DefaultSecretKeyVerifierClient} for components that need both APIs.
@@ -62,13 +60,13 @@ public class DefaultSecretKeyClient implements SecretKeyClient {
     return verifierClientDelegate.getSecretKey(id);
   }
 
-  public static SecretKeyClient create(ConfigurationSource conf)
+  public static SecretKeyClient create(ConfigurationSource conf,
+      SecretKeyProtocol secretKeyProtocol)
       throws IOException {
-    SCMSecurityProtocol securityProtocol = getScmSecurityClient(conf);
     SecretKeySignerClient signerClient =
-        new DefaultSecretKeySignerClient(securityProtocol);
+        new DefaultSecretKeySignerClient(secretKeyProtocol);
     SecretKeyVerifierClient verifierClient =
-        new DefaultSecretKeyVerifierClient(securityProtocol, conf);
+        new DefaultSecretKeyVerifierClient(secretKeyProtocol, conf);
     return new DefaultSecretKeyClient(signerClient, verifierClient);
   }
 }
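
With the factory change, each role passes its own authorized transport; e.g. for a datanode (sketch, reusing the helper shown earlier in this patch):

    SecretKeyClient client = DefaultSecretKeyClient.create(
        conf, HddsServerUtil.getSecretKeyClientForDatanode(conf));
    client.start(conf);
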
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/DefaultSecretKeySignerClient.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/DefaultSecretKeySignerClient.java
index e29254c753..daabf74e37 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/DefaultSecretKeySignerClient.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/DefaultSecretKeySignerClient.java
@@ -19,7 +19,7 @@ package org.apache.hadoop.hdds.security.symmetric;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
-import org.apache.hadoop.hdds.protocol.SCMSecurityProtocol;
+import org.apache.hadoop.hdds.protocol.SecretKeyProtocol;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,7 +34,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static java.util.Objects.requireNonNull;
-import static org.apache.hadoop.hdds.utils.HddsServerUtil.getScmSecurityClient;
 
 /**
  * Default implementation of {@link SecretKeySignerClient} that fetches
@@ -45,14 +44,14 @@ public class DefaultSecretKeySignerClient implements SecretKeySignerClient {
   private static final Logger LOG =
       LoggerFactory.getLogger(DefaultSecretKeySignerClient.class);
 
-  private final SCMSecurityProtocol scmSecurityProtocol;
+  private final SecretKeyProtocol secretKeyProtocol;
   private final AtomicReference<ManagedSecretKey> cache =
       new AtomicReference<>();
   private ScheduledExecutorService executorService;
 
   public DefaultSecretKeySignerClient(
-      SCMSecurityProtocol scmSecurityProtocol) {
-    this.scmSecurityProtocol = scmSecurityProtocol;
+      SecretKeyProtocol secretKeyProtocol) {
+    this.secretKeyProtocol = secretKeyProtocol;
   }
 
   @Override
@@ -64,7 +63,7 @@ public class DefaultSecretKeySignerClient implements SecretKeySignerClient {
   @Override
   public void start(ConfigurationSource conf) throws IOException {
     final ManagedSecretKey initialKey =
-        scmSecurityProtocol.getCurrentSecretKey();
+        secretKeyProtocol.getCurrentSecretKey();
     LOG.info("Initial secret key fetched from SCM: {}.", initialKey);
     cache.set(initialKey);
     scheduleSecretKeyPoller(conf, initialKey.getCreationTime());
@@ -111,7 +110,7 @@ public class DefaultSecretKeySignerClient implements SecretKeySignerClient {
     // from SCM.
     if (nextRotate.isBefore(Instant.now())) {
       try {
-        ManagedSecretKey newKey = scmSecurityProtocol.getCurrentSecretKey();
+        ManagedSecretKey newKey = secretKeyProtocol.getCurrentSecretKey();
         if (!newKey.equals(current)) {
           cache.set(newKey);
           LOG.info("New secret key fetched from SCM: {}.", newKey);
@@ -123,10 +122,4 @@ public class DefaultSecretKeySignerClient implements SecretKeySignerClient {
       }
     }
   }
-
-  public static DefaultSecretKeySignerClient create(ConfigurationSource conf)
-      throws IOException {
-    SCMSecurityProtocol securityProtocol = getScmSecurityClient(conf);
-    return new DefaultSecretKeySignerClient(securityProtocol);
-  }
 }
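
The refresh rule the signer's poller applies, condensed (a sketch; rotateDuration is assumed to come from SecretKeyConfig.parseRotateDuration, as the verifier below uses):

    Instant nextRotate = current.getCreationTime().plus(rotateDuration);
    if (nextRotate.isBefore(Instant.now())) {
      ManagedSecretKey newKey = secretKeyProtocol.getCurrentSecretKey();
      if (!newKey.equals(current)) {
        cache.set(newKey);  // swap only when SCM actually rotated keys
      }
    }
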
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/DefaultSecretKeyVerifierClient.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/DefaultSecretKeyVerifierClient.java
index c79ae6ef8b..8a223cc16f 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/DefaultSecretKeyVerifierClient.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/DefaultSecretKeyVerifierClient.java
@@ -21,6 +21,7 @@ import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.protocol.SecretKeyProtocol;
 import org.apache.hadoop.hdds.protocol.SCMSecurityProtocol;
 import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
 import org.slf4j.Logger;
@@ -35,7 +36,6 @@ import java.util.concurrent.TimeUnit;
 
 import static org.apache.hadoop.hdds.security.symmetric.SecretKeyConfig.parseExpiryDuration;
 import static org.apache.hadoop.hdds.security.symmetric.SecretKeyConfig.parseRotateDuration;
-import static org.apache.hadoop.hdds.utils.HddsServerUtil.getScmSecurityClient;
 
 /**
  * Default implementation of {@link SecretKeyVerifierClient} that fetches
@@ -47,7 +47,7 @@ public class DefaultSecretKeyVerifierClient implements SecretKeyVerifierClient {
 
   private final LoadingCache<UUID, Optional<ManagedSecretKey>> cache;
 
-  DefaultSecretKeyVerifierClient(SCMSecurityProtocol scmSecurityProtocol,
+  DefaultSecretKeyVerifierClient(SecretKeyProtocol secretKeyProtocol,
                                  ConfigurationSource conf) {
     Duration expiryDuration = parseExpiryDuration(conf);
     Duration rotateDuration = parseRotateDuration(conf);
@@ -68,7 +68,7 @@ public class DefaultSecretKeyVerifierClient implements 
SecretKeyVerifierClient {
         new CacheLoader<UUID, Optional<ManagedSecretKey>>() {
           @Override
           public Optional<ManagedSecretKey> load(UUID id) throws Exception {
-            ManagedSecretKey secretKey = scmSecurityProtocol.getSecretKey(id);
+            ManagedSecretKey secretKey = secretKeyProtocol.getSecretKey(id);
             LOG.info("Secret key fetched from SCM: {}", secretKey);
             return Optional.ofNullable(secretKey);
           }
@@ -102,10 +102,4 @@ public class DefaultSecretKeyVerifierClient implements 
SecretKeyVerifierClient {
           "key " + id + " from SCM", e.getCause());
     }
   }
-
-  public static DefaultSecretKeyVerifierClient create(ConfigurationSource conf)
-      throws IOException {
-    SCMSecurityProtocol securityProtocol = getScmSecurityClient(conf);
-    return new DefaultSecretKeyVerifierClient(securityProtocol, conf);
-  }
 }
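
The verifier's caching pattern, isolated as a sketch: a Guava LoadingCache
keyed by key ID that stores Optional.empty() for unknown IDs, so misses are
cached too. The expiryDuration and secretKeyProtocol references are assumed
from the constructor above:

    LoadingCache<UUID, Optional<ManagedSecretKey>> cache =
        CacheBuilder.newBuilder()
            .expireAfterWrite(expiryDuration.toMillis(), TimeUnit.MILLISECONDS)
            .build(new CacheLoader<UUID, Optional<ManagedSecretKey>>() {
              @Override
              public Optional<ManagedSecretKey> load(UUID id) throws Exception {
                // A null result from SCM is cached as Optional.empty().
                return Optional.ofNullable(secretKeyProtocol.getSecretKey(id));
              }
            });
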
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/ManagedSecretKey.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/ManagedSecretKey.java
index 78e4fc0b90..2ff44daf9b 100644
--- 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/ManagedSecretKey.java
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/ManagedSecretKey.java
@@ -19,7 +19,7 @@
 package org.apache.hadoop.hdds.security.symmetric;
 
 import com.google.protobuf.ByteString;
-import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos;
+import org.apache.hadoop.hdds.protocol.proto.SCMSecretKeyProtocolProtos;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.util.ProtobufUtils;
 
@@ -129,8 +129,8 @@ public final class ManagedSecretKey {
   /**
   * @return the protobuf message representing this object.
    */
-  public SCMSecurityProtocolProtos.ManagedSecretKey toProtobuf() {
-    return SCMSecurityProtocolProtos.ManagedSecretKey.newBuilder()
+  public SCMSecretKeyProtocolProtos.ManagedSecretKey toProtobuf() {
+    return SCMSecretKeyProtocolProtos.ManagedSecretKey.newBuilder()
         .setId(ProtobufUtils.toProtobuf(id))
         .setCreationTime(this.creationTime.toEpochMilli())
         .setExpiryTime(this.expiryTime.toEpochMilli())
@@ -143,7 +143,7 @@ public final class ManagedSecretKey {
    * Create a {@link ManagedSecretKey} from a given protobuf message.
    */
   public static ManagedSecretKey fromProtobuf(
-      SCMSecurityProtocolProtos.ManagedSecretKey message) {
+      SCMSecretKeyProtocolProtos.ManagedSecretKey message) {
     UUID id = ProtobufUtils.fromProtobuf(message.getId());
     Instant creationTime = Instant.ofEpochMilli(message.getCreationTime());
     Instant expiryTime = Instant.ofEpochMilli(message.getExpiryTime());
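
A round-trip sketch of the retargeted serialization, assuming an existing
ManagedSecretKey instance named key:

    SCMSecretKeyProtocolProtos.ManagedSecretKey message = key.toProtobuf();
    ManagedSecretKey restored = ManagedSecretKey.fromProtobuf(message);
    // ManagedSecretKey defines equals(), which the signer client relies on.
    assert restored.equals(key);
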
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java
index 7c896c0bf1..6dbcc661b8 100644
--- 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java
@@ -44,14 +44,19 @@ import org.apache.hadoop.hdds.DFSConfigKeysLegacy;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.SCMSecurityProtocol;
+import 
org.apache.hadoop.hdds.protocolPB.SecretKeyProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdds.protocolPB.SecretKeyProtocolDatanodePB;
+import org.apache.hadoop.hdds.protocolPB.SecretKeyProtocolOmPB;
 import 
org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdds.protocolPB.SecretKeyProtocolScmPB;
 import org.apache.hadoop.hdds.ratis.ServerNotLeaderException;
 import org.apache.hadoop.hdds.recon.ReconConfigKeys;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
 import org.apache.hadoop.hdds.scm.proxy.SCMClientConfig;
+import org.apache.hadoop.hdds.scm.proxy.SecretKeyProtocolFailoverProxyProvider;
 import 
org.apache.hadoop.hdds.scm.proxy.SCMSecurityProtocolFailoverProxyProvider;
-import org.apache.hadoop.hdds.scm.proxy.SingleSCMSecurityProtocolProxyProvider;
+import org.apache.hadoop.hdds.scm.proxy.SingleSecretKeyProtocolProxyProvider;
 import org.apache.hadoop.hdds.server.ServerUtils;
 import org.apache.hadoop.hdds.tracing.TracingUtil;
 import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
@@ -448,16 +453,6 @@ public final class HddsServerUtil {
             UserGroupInformation.getCurrentUser()));
   }
 
-  /**
-   * Create a scm security client to interact with a specific SCM node.
-   */
-  public static SCMSecurityProtocolClientSideTranslatorPB
-      getScmSecurityClientSingleNode(ConfigurationSource conf, String 
scmNodeId,
-      UserGroupInformation ugi) throws IOException {
-    return new SCMSecurityProtocolClientSideTranslatorPB(
-        new SingleSCMSecurityProtocolProxyProvider(conf, ugi, scmNodeId));
-  }
-
   public static SCMSecurityProtocolClientSideTranslatorPB
       getScmSecurityClientWithMaxRetry(OzoneConfiguration conf,
       UserGroupInformation ugi) throws IOException {
@@ -516,6 +511,58 @@ public final class HddsServerUtil {
         SCMSecurityProtocol.class, conf);
   }
 
+  /**
+   * Create a {@link org.apache.hadoop.hdds.protocol.SecretKeyProtocol} for
+   * the datanode service; use only when the caller is the Datanode identity.
+   */
+  public static SecretKeyProtocolClientSideTranslatorPB
+      getSecretKeyClientForDatanode(ConfigurationSource conf)
+      throws IOException {
+    return new SecretKeyProtocolClientSideTranslatorPB(
+        new SecretKeyProtocolFailoverProxyProvider(conf,
+            UserGroupInformation.getCurrentUser(),
+            SecretKeyProtocolDatanodePB.class),
+        SecretKeyProtocolDatanodePB.class);
+  }
+
+  /**
+   * Create a {@link org.apache.hadoop.hdds.protocol.SecretKeyProtocol} for
+   * the OM service; use only when the caller is the OM identity.
+   */
+  public static SecretKeyProtocolClientSideTranslatorPB
+      getSecretKeyClientForOm(ConfigurationSource conf) throws IOException {
+    return new SecretKeyProtocolClientSideTranslatorPB(
+        new SecretKeyProtocolFailoverProxyProvider(conf,
+            UserGroupInformation.getCurrentUser(),
+            SecretKeyProtocolOmPB.class),
+        SecretKeyProtocolOmPB.class);
+  }
+
+  public static SecretKeyProtocolClientSideTranslatorPB
+      getSecretKeyClientForDatanode(ConfigurationSource conf,
+      UserGroupInformation ugi) {
+    return new SecretKeyProtocolClientSideTranslatorPB(
+        new SecretKeyProtocolFailoverProxyProvider(conf, ugi,
+            SecretKeyProtocolDatanodePB.class),
+        SecretKeyProtocolDatanodePB.class);
+  }
+
+  /**
+   * Create a {@link org.apache.hadoop.hdds.protocol.SecretKeyProtocol} for
+   * the SCM service; use only when the caller is the SCM identity.
+   *
+   * The protocol returned by this method only targets a single destination
+   * SCM node.
+   */
+  public static SecretKeyProtocolClientSideTranslatorPB
+      getSecretKeyClientForScm(ConfigurationSource conf,
+      String scmNodeId, UserGroupInformation ugi) {
+    return new SecretKeyProtocolClientSideTranslatorPB(
+        new SingleSecretKeyProtocolProxyProvider(conf, ugi,
+            SecretKeyProtocolScmPB.class, scmNodeId),
+        SecretKeyProtocolScmPB.class);
+  }
+
   /**
    * Initialize hadoop metrics system for Ozone servers.
    * @param configuration OzoneConfiguration to use.
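
A usage sketch for the new factories, assuming the process is logged in with
the matching Kerberos identity; each service picks the factory for its own PB
interface, and therefore its own ACL:

    OzoneConfiguration conf = new OzoneConfiguration();
    // For OM; datanodes would call getSecretKeyClientForDatanode instead.
    SecretKeyProtocolClientSideTranslatorPB omClient =
        HddsServerUtil.getSecretKeyClientForOm(conf);
    ManagedSecretKey current = omClient.getCurrentSecretKey();
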
diff --git 
a/hadoop-hdds/interface-server/src/main/proto/ScmSecretKeyProtocol.proto 
b/hadoop-hdds/interface-server/src/main/proto/ScmSecretKeyProtocol.proto
new file mode 100644
index 0000000000..88b00ff7c3
--- /dev/null
+++ b/hadoop-hdds/interface-server/src/main/proto/ScmSecretKeyProtocol.proto
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * These .proto interfaces are private and unstable.
+ * Please see http://wiki.apache.org/hadoop/Compatibility
+ * for what changes are allowed for a *unstable* .proto interface.
+ */
+syntax = "proto2";
+
+option java_package = "org.apache.hadoop.hdds.protocol.proto";
+
+option java_outer_classname = "SCMSecretKeyProtocolProtos";
+
+option java_generic_services = true;
+
+option java_generate_equals_and_hash = true;
+
+package hadoop.hdds.security.symmetric;
+
+import "hdds.proto";
+
+/**
+All commands are sent as requests and all responses come back via the
+Response class. When adding new functions, please follow this protocol, since
+our tracing and visibility tools depend on this pattern.
+*/
+message SCMSecretKeyRequest {
+    required Type cmdType = 1; // Type of the command
+
+    optional string traceID = 2;
+
+    optional SCMGetSecretKeyRequest getSecretKeyRequest = 3;
+}
+
+message SCMSecretKeyResponse {
+    required Type cmdType = 1; // Type of the command
+
+    // A string that identifies this command. We generate the trace ID in the
+    // Ozone frontend, which allows us to trace the command all over Ozone.
+    optional string traceID = 2;
+
+    optional bool success = 3 [default = true];
+
+    optional string message = 4;
+
+    required Status status = 5;
+
+    optional SCMGetCurrentSecretKeyResponse currentSecretKeyResponseProto = 11;
+
+    optional SCMGetSecretKeyResponse getSecretKeyResponseProto = 12;
+
+    optional SCMSecretKeysListResponse secretKeysListResponseProto = 13;
+
+}
+
+enum Type {
+    GetCurrentSecretKey = 1;
+    GetSecretKey = 2;
+    GetAllSecretKeys = 3;
+}
+
+enum Status {
+    OK = 1;
+    INTERNAL_ERROR = 2;
+    SECRET_KEY_NOT_ENABLED = 3;
+    SECRET_KEY_NOT_INITIALIZED = 4;
+}
+
+service SCMSecretKeyProtocolService {
+    rpc submitRequest (SCMSecretKeyRequest) returns (SCMSecretKeyResponse);
+}
+
+message ManagedSecretKey {
+    required UUID id = 1;
+    required uint64 creationTime = 2;
+    required uint64 expiryTime = 3;
+    required string algorithm = 4;
+    required bytes encoded = 5;
+}
+
+message SCMGetSecretKeyRequest {
+    required UUID secretKeyId = 1;
+}
+
+message SCMGetCurrentSecretKeyResponse {
+    required ManagedSecretKey secretKey = 1;
+}
+
+message SCMGetSecretKeyResponse {
+    optional ManagedSecretKey secretKey = 1;
+}
+
+message SCMSecretKeysListResponse {
+    repeated ManagedSecretKey secretKeys = 1;
+}
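
A sketch of how a caller builds the envelope with the classes protoc
generates from this file; cmdType selects which optional payload field, if
any, is populated (GetCurrentSecretKey needs none):

    SCMSecretKeyProtocolProtos.SCMSecretKeyRequest request =
        SCMSecretKeyProtocolProtos.SCMSecretKeyRequest.newBuilder()
            .setCmdType(SCMSecretKeyProtocolProtos.Type.GetCurrentSecretKey)
            .setTraceID("trace-id-for-illustration")
            .build();
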
diff --git 
a/hadoop-hdds/interface-server/src/main/proto/ScmServerSecurityProtocol.proto 
b/hadoop-hdds/interface-server/src/main/proto/ScmServerSecurityProtocol.proto
index 27d1e3c1c3..1768444e07 100644
--- 
a/hadoop-hdds/interface-server/src/main/proto/ScmServerSecurityProtocol.proto
+++ 
b/hadoop-hdds/interface-server/src/main/proto/ScmServerSecurityProtocol.proto
@@ -56,7 +56,6 @@ message SCMSecurityRequest {
     optional SCMGetLatestCrlIdRequestProto getLatestCrlIdRequest = 11;
     optional SCMRevokeCertificatesRequestProto revokeCertificatesRequest = 12;
     optional SCMGetCertRequestProto getCertRequest = 13;
-    optional SCMGetSecretKeyRequest getSecretKeyRequest = 14;
 }
 
 message SCMSecurityResponse {
@@ -81,13 +80,6 @@ message SCMSecurityResponse {
     optional SCMGetLatestCrlIdResponseProto getLatestCrlIdResponseProto = 9;
 
     optional SCMRevokeCertificatesResponseProto 
revokeCertificatesResponseProto = 10;
-
-    optional SCMGetCurrentSecretKeyResponse currentSecretKeyResponseProto = 11;
-
-    optional SCMGetSecretKeyResponse getSecretKeyResponseProto = 12;
-
-    optional SCMSecretKeysListResponse secretKeysListResponseProto = 13;
-
 }
 
 enum Type {
@@ -103,9 +95,6 @@ enum Type {
     GetLatestCrlId = 10;
     RevokeCertificates = 11;
     GetCert = 12;
-    GetCurrentSecretKey = 13;
-    GetSecretKey = 14;
-    GetAllSecretKeys = 15;
 }
 
 enum Status {
@@ -126,8 +115,6 @@ enum Status {
     GET_ROOT_CA_CERTIFICATE_FAILED = 15;
     NOT_A_PRIMARY_SCM = 16;
     REVOKE_CERTIFICATE_FAILED = 17;
-    SECRET_KEY_NOT_ENABLED = 18;
-    SECRET_KEY_NOT_INITIALIZED = 19;
 }
 /**
* This message is sent by the data node to prove its identity and get an SCM
@@ -260,31 +247,4 @@ message SCMRevokeCertificatesResponseProto {
 
 service SCMSecurityProtocolService {
     rpc submitRequest (SCMSecurityRequest) returns (SCMSecurityResponse);
-}
-
-message ManagedSecretKey {
-    required UUID id = 1;
-    required uint64 creationTime = 2;
-    required uint64 expiryTime = 3;
-    required string algorithm = 4;
-    required bytes encoded = 5;
-}
-
-message SCMGetSecretKeyRequest {
-    required UUID secretKeyId = 1;
-}
-
-message SCMGetCurrentSecretKeyResponse {
-    required ManagedSecretKey secretKey = 1;
-}
-
-message SCMGetSecretKeyResponse {
-    optional ManagedSecretKey secretKey = 1;
-}
-
-message SCMSecretKeysListResponse {
-    repeated ManagedSecretKey secretKeys = 1;
-}
-
-
-
+}
\ No newline at end of file
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
index c8efe7b261..f677846f9c 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
@@ -20,7 +20,7 @@ package org.apache.hadoop.hdds.scm.ha;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import 
org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolClientSideTranslatorPB;
+import 
org.apache.hadoop.hdds.protocolPB.SecretKeyProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdds.scm.AddSCMRequest;
 import org.apache.hadoop.hdds.scm.RemoveSCMRequest;
 import org.apache.hadoop.hdds.scm.metadata.DBTransactionBuffer;
@@ -51,7 +51,7 @@ import java.io.IOException;
 import java.nio.file.Path;
 import java.util.List;
 
-import static 
org.apache.hadoop.hdds.utils.HddsServerUtil.getScmSecurityClientSingleNode;
+import static 
org.apache.hadoop.hdds.utils.HddsServerUtil.getSecretKeyClientForScm;
 
 /**
  * SCMHAManagerImpl uses Apache Ratis for HA implementation. We will have 2N+1
@@ -182,10 +182,10 @@ public class SCMHAManagerImpl implements SCMHAManager {
     }
 
     LOG.info("Getting secret keys from leader {}.", leaderID);
-    try (SCMSecurityProtocolClientSideTranslatorPB securityProtocol =
-             getScmSecurityClientSingleNode(conf, leaderID,
+    try (SecretKeyProtocolClientSideTranslatorPB secretKeyProtocol =
+             getSecretKeyClientForScm(conf, leaderID,
                  UserGroupInformation.getLoginUser())) {
-      return securityProtocol.getAllSecretKeys();
+      return secretKeyProtocol.getAllSecretKeys();
     }
   }
 
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/io/ManagedSecretKeyCodec.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/io/ManagedSecretKeyCodec.java
index 384d818762..32705bb2a7 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/io/ManagedSecretKeyCodec.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/io/ManagedSecretKeyCodec.java
@@ -20,7 +20,7 @@ package org.apache.hadoop.hdds.scm.ha.io;
 
 import com.google.protobuf.ByteString;
 import com.google.protobuf.InvalidProtocolBufferException;
-import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos;
+import org.apache.hadoop.hdds.protocol.proto.SCMSecretKeyProtocolProtos;
 import org.apache.hadoop.hdds.security.symmetric.ManagedSecretKey;
 
 /**
@@ -37,8 +37,8 @@ public class ManagedSecretKeyCodec implements Codec {
   @Override
   public Object deserialize(Class<?> type, ByteString value)
       throws InvalidProtocolBufferException {
-    SCMSecurityProtocolProtos.ManagedSecretKey message =
-        SCMSecurityProtocolProtos.ManagedSecretKey.parseFrom(value);
+    SCMSecretKeyProtocolProtos.ManagedSecretKey message =
+        SCMSecretKeyProtocolProtos.ManagedSecretKey.parseFrom(value);
     return ManagedSecretKey.fromProtobuf(message);
   }
 }
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/SCMSecurityProtocolServerSideTranslatorPB.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/SCMSecurityProtocolServerSideTranslatorPB.java
index 9b7f71bd8a..736aef15a0 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/SCMSecurityProtocolServerSideTranslatorPB.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/SCMSecurityProtocolServerSideTranslatorPB.java
@@ -18,7 +18,6 @@ package org.apache.hadoop.hdds.scm.protocol;
 
 import java.io.IOException;
 import java.util.List;
-import java.util.UUID;
 
 import org.apache.hadoop.hdds.protocol.SCMSecurityProtocol;
 import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos;
@@ -27,20 +26,16 @@ import 
org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetCer
 import 
org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetCertificateRequestProto;
 import 
org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetCrlsRequestProto;
 import 
org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetCrlsResponseProto;
-import 
org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetCurrentSecretKeyResponse;
 import 
org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetDataNodeCertRequestProto;
 import 
org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetCertRequestProto;
 import 
org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetLatestCrlIdRequestProto;
 import 
org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetLatestCrlIdResponseProto;
 import 
org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetOMCertRequestProto;
 import 
org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetSCMCertRequestProto;
-import 
org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetSecretKeyRequest;
-import 
org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetSecretKeyResponse;
 import 
org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMListCertificateRequestProto;
 import 
org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMListCertificateResponseProto;
 import 
org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMRevokeCertificatesRequestProto;
 import 
org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMRevokeCertificatesResponseProto;
-import 
org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMSecretKeysListResponse;
 import 
org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMSecurityRequest;
 import 
org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMSecurityResponse;
 import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.Status;
@@ -48,7 +43,6 @@ import 
org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolPB;
 import org.apache.hadoop.hdds.scm.ha.RatisUtil;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
-import org.apache.hadoop.hdds.security.symmetric.ManagedSecretKey;
 import org.apache.hadoop.hdds.security.x509.crl.CRLInfo;
 import org.apache.hadoop.hdds.server.OzoneProtocolMessageDispatcher;
 import org.apache.hadoop.hdds.utils.ProtocolMessageMetrics;
@@ -56,7 +50,6 @@ import org.apache.hadoop.hdds.utils.ProtocolMessageMetrics;
 import com.google.protobuf.ProtocolMessageEnum;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
-import org.apache.hadoop.util.ProtobufUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -158,21 +151,6 @@ public class SCMSecurityProtocolServerSideTranslatorPB
             getCertificate(request.getGetCertRequest()))
             .build();
 
-      case GetCurrentSecretKey:
-        return scmSecurityResponse
-            .setCurrentSecretKeyResponseProto(getCurrentSecretKey())
-            .build();
-
-      case GetSecretKey:
-        return scmSecurityResponse.setGetSecretKeyResponseProto(
-                getSecretKey(request.getGetSecretKeyRequest()))
-            .build();
-
-      case GetAllSecretKeys:
-        return scmSecurityResponse
-            .setSecretKeysListResponseProto(getAllSecretKeys())
-            .build();
-
       default:
         throw new IllegalArgumentException(
             "Unknown request type: " + request.getCmdType());
@@ -195,34 +173,6 @@ public class SCMSecurityProtocolServerSideTranslatorPB
     }
   }
 
-  private SCMSecretKeysListResponse getAllSecretKeys() throws IOException {
-    SCMSecretKeysListResponse.Builder builder =
-        SCMSecretKeysListResponse.newBuilder();
-    impl.getAllSecretKeys()
-        .stream().map(ManagedSecretKey::toProtobuf)
-        .forEach(builder::addSecretKeys);
-    return builder.build();
-  }
-
-  private SCMGetSecretKeyResponse getSecretKey(
-      SCMGetSecretKeyRequest getSecretKeyRequest) throws IOException {
-    SCMGetSecretKeyResponse.Builder builder =
-        SCMGetSecretKeyResponse.newBuilder();
-    UUID id = ProtobufUtils.fromProtobuf(getSecretKeyRequest.getSecretKeyId());
-    ManagedSecretKey secretKey = impl.getSecretKey(id);
-    if (secretKey != null) {
-      builder.setSecretKey(secretKey.toProtobuf());
-    }
-    return builder.build();
-  }
-
-  private SCMGetCurrentSecretKeyResponse getCurrentSecretKey()
-      throws IOException {
-    return SCMGetCurrentSecretKeyResponse.newBuilder()
-        .setSecretKey(impl.getCurrentSecretKey().toProtobuf())
-        .build();
-  }
-
   /**
   * Convert an exception to the corresponding status.
    * @param ex
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/SecretKeyProtocolServerSideTranslatorPB.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/SecretKeyProtocolServerSideTranslatorPB.java
new file mode 100644
index 0000000000..527d7a42fa
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/SecretKeyProtocolServerSideTranslatorPB.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.protocol;
+
+import com.google.protobuf.ProtocolMessageEnum;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.hdds.protocol.SecretKeyProtocol;
+import 
org.apache.hadoop.hdds.protocol.proto.SCMSecretKeyProtocolProtos.SCMGetCurrentSecretKeyResponse;
+import 
org.apache.hadoop.hdds.protocol.proto.SCMSecretKeyProtocolProtos.SCMGetSecretKeyRequest;
+import 
org.apache.hadoop.hdds.protocol.proto.SCMSecretKeyProtocolProtos.SCMGetSecretKeyResponse;
+import 
org.apache.hadoop.hdds.protocol.proto.SCMSecretKeyProtocolProtos.SCMSecretKeyRequest;
+import 
org.apache.hadoop.hdds.protocol.proto.SCMSecretKeyProtocolProtos.SCMSecretKeyResponse;
+import 
org.apache.hadoop.hdds.protocol.proto.SCMSecretKeyProtocolProtos.SCMSecretKeysListResponse;
+import org.apache.hadoop.hdds.protocol.proto.SCMSecretKeyProtocolProtos.Status;
+import org.apache.hadoop.hdds.protocolPB.SecretKeyProtocolDatanodePB;
+import org.apache.hadoop.hdds.protocolPB.SecretKeyProtocolOmPB;
+import org.apache.hadoop.hdds.scm.ha.RatisUtil;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.hdds.security.exception.SCMSecretKeyException;
+import org.apache.hadoop.hdds.security.symmetric.ManagedSecretKey;
+import org.apache.hadoop.hdds.server.OzoneProtocolMessageDispatcher;
+import org.apache.hadoop.hdds.utils.ProtocolMessageMetrics;
+import org.apache.hadoop.util.ProtobufUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.UUID;
+
+/**
+ * This class is the server-side translator that forwards requests received on
+ * {@link SecretKeyProtocolDatanodePB} and {@link SecretKeyProtocolOmPB} to
+ * the server implementation.
+ */
+public class SecretKeyProtocolServerSideTranslatorPB
+    implements SecretKeyProtocolDatanodePB, SecretKeyProtocolOmPB {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SecretKeyProtocolServerSideTranslatorPB.class);
+
+  private final SecretKeyProtocol impl;
+  private final StorageContainerManager scm;
+
+  private OzoneProtocolMessageDispatcher<SCMSecretKeyRequest,
+      SCMSecretKeyResponse, ProtocolMessageEnum> dispatcher;
+
+  public SecretKeyProtocolServerSideTranslatorPB(SecretKeyProtocol impl,
+      StorageContainerManager storageContainerManager,
+      ProtocolMessageMetrics messageMetrics) {
+    this.impl = impl;
+    this.scm = storageContainerManager;
+    this.dispatcher =
+        new OzoneProtocolMessageDispatcher<>("SCMSecretKeyProtocol",
+            messageMetrics, LOG);
+  }
+
+  @Override
+  public SCMSecretKeyResponse submitRequest(RpcController controller,
+      SCMSecretKeyRequest request) throws ServiceException {
+    if (!scm.checkLeader()) {
+      RatisUtil.checkRatisException(
+          scm.getScmHAManager().getRatisServer().triggerNotLeaderException(),
+          scm.getSecurityProtocolRpcPort(), scm.getScmId());
+    }
+    return dispatcher.processRequest(request, this::processRequest,
+        request.getCmdType(), request.getTraceID());
+  }
+
+  public SCMSecretKeyResponse processRequest(SCMSecretKeyRequest request)
+      throws ServiceException {
+    SCMSecretKeyResponse.Builder scmSecurityResponse =
+        SCMSecretKeyResponse.newBuilder().setCmdType(request.getCmdType())
+            .setStatus(Status.OK);
+    try {
+      switch (request.getCmdType()) {
+      case GetCurrentSecretKey:
+        return scmSecurityResponse
+            .setCurrentSecretKeyResponseProto(getCurrentSecretKey())
+            .build();
+
+      case GetSecretKey:
+        return scmSecurityResponse.setGetSecretKeyResponseProto(
+                getSecretKey(request.getGetSecretKeyRequest()))
+            .build();
+
+      case GetAllSecretKeys:
+        return scmSecurityResponse
+            .setSecretKeysListResponseProto(getAllSecretKeys())
+            .build();
+
+      default:
+        throw new IllegalArgumentException(
+            "Unknown request type: " + request.getCmdType());
+      }
+    } catch (IOException e) {
+      RatisUtil.checkRatisException(e, scm.getSecurityProtocolRpcPort(),
+          scm.getScmId());
+      scmSecurityResponse.setSuccess(false);
+      scmSecurityResponse.setStatus(exceptionToResponseStatus(e));
+      // Prefer the exception's own message; fall back to the cause's
+      // message when the exception itself has none.
+      if (e.getMessage() != null) {
+        scmSecurityResponse.setMessage(e.getMessage());
+      } else {
+        if (e.getCause() != null && e.getCause().getMessage() != null) {
+          scmSecurityResponse.setMessage(e.getCause().getMessage());
+        }
+      }
+      return scmSecurityResponse.build();
+    }
+  }
+
+  private SCMSecretKeysListResponse getAllSecretKeys()
+      throws IOException {
+    SCMSecretKeysListResponse.Builder builder =
+        SCMSecretKeysListResponse.newBuilder();
+    impl.getAllSecretKeys()
+        .stream().map(ManagedSecretKey::toProtobuf)
+        .forEach(builder::addSecretKeys);
+    return builder.build();
+  }
+
+  private SCMGetSecretKeyResponse getSecretKey(
+      SCMGetSecretKeyRequest getSecretKeyRequest) throws IOException {
+    SCMGetSecretKeyResponse.Builder builder =
+        SCMGetSecretKeyResponse.newBuilder();
+    UUID id = ProtobufUtils.fromProtobuf(getSecretKeyRequest.getSecretKeyId());
+    ManagedSecretKey secretKey = impl.getSecretKey(id);
+    if (secretKey != null) {
+      builder.setSecretKey(secretKey.toProtobuf());
+    }
+    return builder.build();
+  }
+
+  private SCMGetCurrentSecretKeyResponse getCurrentSecretKey()
+      throws IOException {
+    return SCMGetCurrentSecretKeyResponse.newBuilder()
+        .setSecretKey(impl.getCurrentSecretKey().toProtobuf())
+        .build();
+  }
+
+  private Status exceptionToResponseStatus(IOException ex) {
+    if (ex instanceof SCMSecretKeyException) {
+      return Status.values()[
+          ((SCMSecretKeyException) ex).getErrorCode().ordinal()];
+    } else {
+      return Status.INTERNAL_ERROR;
+    }
+  }
+
+}
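
For context, a hedged sketch of the inverse mapping the client-side
translator would perform (not the committed code): the ordinal-based
conversion only works while SCMSecretKeyException.ErrorCode and the proto
Status enum stay declared in the same order.

    if (response.getStatus() != Status.OK) {
      // Surface a non-OK status as the matching SCMSecretKeyException.
      throw new SCMSecretKeyException(response.getMessage(),
          SCMSecretKeyException.ErrorCode.values()[
              response.getStatus().ordinal()]);
    }
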
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMPolicyProvider.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMPolicyProvider.java
index 0602ba2341..ce0d5e25eb 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMPolicyProvider.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMPolicyProvider.java
@@ -22,7 +22,10 @@ import org.apache.hadoop.hdds.annotation.InterfaceAudience;
 import org.apache.hadoop.hdds.annotation.InterfaceAudience.Private;
 import org.apache.hadoop.hdds.annotation.InterfaceStability;
 import org.apache.hadoop.hdds.annotation.InterfaceStability.Unstable;
+import org.apache.hadoop.hdds.protocol.SecretKeyProtocolDatanode;
+import org.apache.hadoop.hdds.protocol.SecretKeyProtocolOm;
 import org.apache.hadoop.hdds.protocol.SCMSecurityProtocol;
+import org.apache.hadoop.hdds.protocol.SecretKeyProtocolScm;
 import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
 import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
 import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
@@ -35,6 +38,9 @@ import static 
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_SECURITY_CLIENT_DATANOD
 import static 
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_SECURITY_CLIENT_SCM_BLOCK_PROTOCOL_ACL;
 import static 
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_SECURITY_CLIENT_SCM_CERTIFICATE_PROTOCOL_ACL;
 import static 
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_SECURITY_CLIENT_SCM_CONTAINER_PROTOCOL_ACL;
+import static 
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_SECURITY_CLIENT_SCM_SECRET_KEY_DATANODE_PROTOCOL_ACL;
+import static 
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_SECURITY_CLIENT_SCM_SECRET_KEY_OM_PROTOCOL_ACL;
+import static 
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_SECURITY_CLIENT_SCM_SECRET_KEY_SCM_PROTOCOL_ACL;
 
 /**
  * {@link PolicyProvider} for SCM protocols.
@@ -72,6 +78,15 @@ public final class SCMPolicyProvider extends PolicyProvider {
           new Service(
               HDDS_SECURITY_CLIENT_SCM_CERTIFICATE_PROTOCOL_ACL,
               SCMSecurityProtocol.class),
+          new Service(
+              HDDS_SECURITY_CLIENT_SCM_SECRET_KEY_OM_PROTOCOL_ACL,
+              SecretKeyProtocolOm.class),
+          new Service(
+              HDDS_SECURITY_CLIENT_SCM_SECRET_KEY_SCM_PROTOCOL_ACL,
+              SecretKeyProtocolScm.class),
+          new Service(
+              HDDS_SECURITY_CLIENT_SCM_SECRET_KEY_DATANODE_PROTOCOL_ACL,
+              SecretKeyProtocolDatanode.class)
       };
 
   @SuppressFBWarnings("EI_EXPOSE_REP")
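
With hadoop.security.authorization enabled, each new protocol can be locked
to its intended identity through the matching ACL key. A configuration
sketch; the user names here are illustrative only:

    OzoneConfiguration conf = new OzoneConfiguration();
    conf.setBoolean("hadoop.security.authorization", true);
    // Standard Hadoop service ACL format: comma-separated users and groups.
    conf.set(HDDS_SECURITY_CLIENT_SCM_SECRET_KEY_OM_PROTOCOL_ACL, "om");
    conf.set(HDDS_SECURITY_CLIENT_SCM_SECRET_KEY_DATANODE_PROTOCOL_ACL, "dn");
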
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMSecurityProtocolServer.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMSecurityProtocolServer.java
index b1c25ae7a6..dd882eb938 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMSecurityProtocolServer.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMSecurityProtocolServer.java
@@ -38,21 +38,27 @@ import java.util.stream.Collectors;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.hdds.annotation.InterfaceAudience;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.SecretKeyProtocol;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeDetailsProto;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeType;
 import 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.OzoneManagerDetailsProto;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ScmNodeDetailsProto;
+import org.apache.hadoop.hdds.protocol.proto.SCMSecretKeyProtocolProtos;
 import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos;
+import org.apache.hadoop.hdds.protocolPB.SecretKeyProtocolDatanodePB;
+import org.apache.hadoop.hdds.protocolPB.SecretKeyProtocolOmPB;
 import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolPB;
+import org.apache.hadoop.hdds.protocolPB.SecretKeyProtocolScmPB;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes;
+import 
org.apache.hadoop.hdds.scm.protocol.SecretKeyProtocolServerSideTranslatorPB;
 import org.apache.hadoop.hdds.scm.update.server.SCMUpdateServiceGrpcServer;
 import org.apache.hadoop.hdds.scm.update.client.UpdateServiceConfig;
 import org.apache.hadoop.hdds.scm.update.server.SCMCRLStore;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import 
org.apache.hadoop.hdds.scm.protocol.SCMSecurityProtocolServerSideTranslatorPB;
+import org.apache.hadoop.hdds.security.exception.SCMSecretKeyException;
 import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
-import 
org.apache.hadoop.hdds.security.exception.SCMSecurityException.ErrorCode;
 import org.apache.hadoop.hdds.security.symmetric.ManagedSecretKey;
 import org.apache.hadoop.hdds.security.symmetric.SecretKeyManager;
 import org.apache.hadoop.hdds.security.x509.crl.CRLInfo;
@@ -75,6 +81,8 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
+import static 
org.apache.hadoop.hdds.security.exception.SCMSecretKeyException.ErrorCode.SECRET_KEY_NOT_ENABLED;
+import static 
org.apache.hadoop.hdds.security.exception.SCMSecretKeyException.ErrorCode.SECRET_KEY_NOT_INITIALIZED;
 import static 
org.apache.hadoop.hdds.security.exception.SCMSecurityException.ErrorCode.CERTIFICATE_NOT_FOUND;
 import static 
org.apache.hadoop.hdds.security.exception.SCMSecurityException.ErrorCode.GET_CA_CERT_FAILED;
 import static 
org.apache.hadoop.hdds.security.exception.SCMSecurityException.ErrorCode.GET_CERTIFICATE_FAILED;
@@ -86,7 +94,8 @@ import static 
org.apache.hadoop.hdds.security.x509.certificate.authority.Certifi
 @KerberosInfo(
     serverPrincipal = ScmConfig.ConfigStrings.HDDS_SCM_KERBEROS_PRINCIPAL_KEY)
 @InterfaceAudience.Private
-public class SCMSecurityProtocolServer implements SCMSecurityProtocol {
+public class SCMSecurityProtocolServer implements SCMSecurityProtocol,
+    SecretKeyProtocol {
 
   private static final Logger LOGGER = LoggerFactory
       .getLogger(SCMSecurityProtocolServer.class);
@@ -97,6 +106,7 @@ public class SCMSecurityProtocolServer implements 
SCMSecurityProtocol {
   private final SCMUpdateServiceGrpcServer grpcUpdateServer; // gRPC SERVER
   private final InetSocketAddress rpcAddress;
   private final ProtocolMessageMetrics metrics;
+  private final ProtocolMessageMetrics secretKeyMetrics;
   private final StorageContainerManager storageContainerManager;
 
   // SecretKey may not be enabled when neither block token nor container
@@ -125,11 +135,20 @@ public class SCMSecurityProtocolServer implements 
SCMSecurityProtocol {
     metrics = new ProtocolMessageMetrics("ScmSecurityProtocol",
         "SCM Security protocol metrics",
         SCMSecurityProtocolProtos.Type.values());
+    secretKeyMetrics = new ProtocolMessageMetrics("ScmSecretKeyProtocol",
+        "SCM SecretKey protocol metrics",
+        SCMSecretKeyProtocolProtos.Type.values());
     BlockingService secureProtoPbService =
         SCMSecurityProtocolProtos.SCMSecurityProtocolService
             .newReflectiveBlockingService(
                 new SCMSecurityProtocolServerSideTranslatorPB(this,
                     scm, metrics));
+    BlockingService secretKeyService =
+        SCMSecretKeyProtocolProtos.SCMSecretKeyProtocolService
+            .newReflectiveBlockingService(
+                new SecretKeyProtocolServerSideTranslatorPB(
+                    this, scm, secretKeyMetrics)
+        );
     this.rpcServer =
         StorageContainerManager.startRpcServer(
             conf,
@@ -137,6 +156,12 @@ public class SCMSecurityProtocolServer implements 
SCMSecurityProtocol {
             SCMSecurityProtocolPB.class,
             secureProtoPbService,
             handlerCount);
+    HddsServerUtil.addPBProtocol(conf, SecretKeyProtocolDatanodePB.class,
+        secretKeyService, rpcServer);
+    HddsServerUtil.addPBProtocol(conf, SecretKeyProtocolOmPB.class,
+        secretKeyService, rpcServer);
+    HddsServerUtil.addPBProtocol(conf, SecretKeyProtocolScmPB.class,
+        secretKeyService, rpcServer);
     if (conf.getBoolean(
         CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, false)) {
       rpcServer.refreshServiceAcl(conf, SCMPolicyProvider.getInstance());
@@ -176,33 +201,34 @@ public class SCMSecurityProtocolServer implements 
SCMSecurityProtocol {
   }
 
   @Override
-  public ManagedSecretKey getCurrentSecretKey() throws SCMSecurityException {
+  public ManagedSecretKey getCurrentSecretKey() throws SCMSecretKeyException {
     validateSecretKeyStatus();
     return secretKeyManager.getCurrentSecretKey();
   }
 
   @Override
-  public ManagedSecretKey getSecretKey(UUID id) throws SCMSecurityException {
+  public ManagedSecretKey getSecretKey(UUID id) throws SCMSecretKeyException {
     validateSecretKeyStatus();
     return secretKeyManager.getSecretKey(id);
   }
 
   @Override
-  public List<ManagedSecretKey> getAllSecretKeys() throws SCMSecurityException 
{
+  public List<ManagedSecretKey> getAllSecretKeys()
+      throws SCMSecretKeyException {
     validateSecretKeyStatus();
     return secretKeyManager.getSortedKeys();
   }
 
-  private void validateSecretKeyStatus() throws SCMSecurityException {
+  private void validateSecretKeyStatus() throws SCMSecretKeyException {
     if (secretKeyManager == null) {
-      throw new SCMSecurityException("Secret keys are not enabled.",
-          ErrorCode.SECRET_KEY_NOT_ENABLED);
+      throw new SCMSecretKeyException("Secret keys are not enabled.",
+          SECRET_KEY_NOT_ENABLED);
     }
 
     if (!secretKeyManager.isInitialized()) {
-      throw new SCMSecurityException(
+      throw new SCMSecretKeyException(
           "Secret key initialization is not finished yet.",
-          ErrorCode.SECRET_KEY_NOT_INITIALIZED);
+          SECRET_KEY_NOT_INITIALIZED);
     }
   }
 
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecretKeysApi.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecretKeysApi.java
index c2bfbb0e15..6a1166140c 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecretKeysApi.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecretKeysApi.java
@@ -20,16 +20,17 @@ package org.apache.hadoop.ozone;
 import org.apache.hadoop.hdds.annotation.InterfaceAudience;
 import org.apache.hadoop.hdds.conf.DefaultConfigManager;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.protocol.SCMSecurityProtocol;
-import org.apache.hadoop.hdds.scm.ScmConfig;
+import org.apache.hadoop.hdds.protocol.SecretKeyProtocol;
 import org.apache.hadoop.hdds.scm.server.SCMHTTPServerConfig;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
-import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
+import org.apache.hadoop.hdds.security.exception.SCMSecretKeyException;
 import org.apache.hadoop.hdds.security.symmetric.ManagedSecretKey;
-import org.apache.hadoop.hdds.utils.HddsServerUtil;
+import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.minikdc.MiniKdc;
 import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AuthorizationException;
+import org.apache.hadoop.util.ExitUtil;
 import org.apache.ozone.test.GenericTestUtils;
 import org.apache.ratis.util.ExitUtils;
 import org.jetbrains.annotations.NotNull;
@@ -50,6 +51,7 @@ import java.util.UUID;
 import java.util.concurrent.TimeoutException;
 
 import static 
org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
+import static 
org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION;
 import static 
org.apache.hadoop.hdds.DFSConfigKeysLegacy.DFS_DATANODE_KERBEROS_KEYTAB_FILE_KEY;
 import static 
org.apache.hadoop.hdds.DFSConfigKeysLegacy.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY;
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED;
@@ -61,7 +63,8 @@ import static 
org.apache.hadoop.hdds.scm.ScmConfig.ConfigStrings.HDDS_SCM_KERBER
 import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY;
 import static 
org.apache.hadoop.hdds.scm.server.SCMHTTPServerConfig.ConfigStrings.HDDS_SCM_HTTP_KERBEROS_KEYTAB_FILE_KEY;
 import static 
org.apache.hadoop.hdds.scm.server.SCMHTTPServerConfig.ConfigStrings.HDDS_SCM_HTTP_KERBEROS_PRINCIPAL_KEY;
-import static 
org.apache.hadoop.hdds.security.exception.SCMSecurityException.ErrorCode.SECRET_KEY_NOT_ENABLED;
+import static 
org.apache.hadoop.hdds.security.exception.SCMSecretKeyException.ErrorCode.SECRET_KEY_NOT_ENABLED;
+import static 
org.apache.hadoop.hdds.utils.HddsServerUtil.getSecretKeyClientForDatanode;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS;
 import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SECURITY_ENABLED_KEY;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_HTTP_KERBEROS_KEYTAB_FILE;
@@ -70,7 +73,6 @@ import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_KERBEROS_KEYTAB_F
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_KERBEROS_PRINCIPAL_KEY;
 import static 
org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod.KERBEROS;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThrows;
@@ -94,6 +96,7 @@ public final class TestSecretKeysApi {
   private File spnegoKeytab;
   private File testUserKeytab;
   private String testUserPrincipal;
+  private String ozonePrincipal;
   private String clusterId;
   private String scmId;
   private MiniOzoneHAClusterImpl cluster;
@@ -104,6 +107,7 @@ public final class TestSecretKeysApi {
     conf.set(OZONE_SCM_CLIENT_ADDRESS_KEY, "localhost");
 
     ExitUtils.disableSystemExit();
+    ExitUtil.disableSystemExit();
 
     workDir = GenericTestUtils.getTestDir(getClass().getSimpleName());
     clusterId = UUID.randomUUID().toString();
@@ -124,10 +128,9 @@ public final class TestSecretKeysApi {
   }
 
   private void createCredentialsInKDC() throws Exception {
-    ScmConfig scmConfig = conf.getObject(ScmConfig.class);
     SCMHTTPServerConfig httpServerConfig =
         conf.getObject(SCMHTTPServerConfig.class);
-    createPrincipal(ozoneKeytab, scmConfig.getKerberosPrincipal());
+    createPrincipal(ozoneKeytab, ozonePrincipal);
     createPrincipal(spnegoKeytab, httpServerConfig.getKerberosPrincipal());
     createPrincipal(testUserKeytab, testUserPrincipal);
   }
@@ -155,11 +158,12 @@ public final class TestSecretKeysApi {
 
     String realm = miniKdc.getRealm();
     String hostAndRealm = host + "@" + realm;
-    conf.set(HDDS_SCM_KERBEROS_PRINCIPAL_KEY, "scm/" + hostAndRealm);
+    ozonePrincipal = "scm/" + hostAndRealm;
+    conf.set(HDDS_SCM_KERBEROS_PRINCIPAL_KEY, ozonePrincipal);
     conf.set(HDDS_SCM_HTTP_KERBEROS_PRINCIPAL_KEY, "HTTP_SCM/" + hostAndRealm);
-    conf.set(OZONE_OM_KERBEROS_PRINCIPAL_KEY, "scm/" + hostAndRealm);
+    conf.set(OZONE_OM_KERBEROS_PRINCIPAL_KEY, ozonePrincipal);
     conf.set(OZONE_OM_HTTP_KERBEROS_PRINCIPAL_KEY, "HTTP_OM/" + hostAndRealm);
-    conf.set(DFS_DATANODE_KERBEROS_PRINCIPAL_KEY, "scm/" + hostAndRealm);
+    conf.set(DFS_DATANODE_KERBEROS_PRINCIPAL_KEY, ozonePrincipal);
 
     ozoneKeytab = new File(workDir, "scm.keytab");
     spnegoKeytab = new File(workDir, "http.keytab");
@@ -176,6 +180,8 @@ public final class TestSecretKeysApi {
         spnegoKeytab.getAbsolutePath());
     conf.set(DFS_DATANODE_KERBEROS_KEYTAB_FILE_KEY,
         ozoneKeytab.getAbsolutePath());
+
+    conf.setBoolean(HADOOP_SECURITY_AUTHORIZATION, true);
   }
 
   /**
@@ -190,23 +196,22 @@ public final class TestSecretKeysApi {
     conf.set(HDDS_SECRET_KEY_ROTATE_DURATION, "1s");
     conf.set(HDDS_SECRET_KEY_EXPIRY_DURATION, "3000ms");
 
-    startCluster();
-    SCMSecurityProtocol securityProtocol = getScmSecurityProtocol();
+    startCluster(3);
+    SecretKeyProtocol secretKeyProtocol = getSecretKeyProtocol();
 
     // start the test when keys are full.
     GenericTestUtils.waitFor(() -> {
       try {
-        return securityProtocol.getAllSecretKeys().size() >= 3;
+        return secretKeyProtocol.getAllSecretKeys().size() >= 3;
       } catch (IOException ex) {
         throw new RuntimeException(ex);
       }
     }, 100, 4_000);
 
-    ManagedSecretKey initialKey = securityProtocol.getCurrentSecretKey();
+    ManagedSecretKey initialKey = secretKeyProtocol.getCurrentSecretKey();
     assertNotNull(initialKey);
-    List<ManagedSecretKey> initialKeys = securityProtocol.getAllSecretKeys();
+    List<ManagedSecretKey> initialKeys = secretKeyProtocol.getAllSecretKeys();
     assertEquals(initialKey, initialKeys.get(0));
-    ManagedSecretKey lastKey = initialKeys.get(initialKeys.size() - 1);
 
     LOG.info("Initial active key: {}", initialKey);
     LOG.info("Initial keys: {}", initialKeys);
@@ -214,29 +219,27 @@ public final class TestSecretKeysApi {
     // wait for the next rotation.
     GenericTestUtils.waitFor(() -> {
       try {
-        ManagedSecretKey newCurrentKey = 
securityProtocol.getCurrentSecretKey();
+        ManagedSecretKey newCurrentKey =
+            secretKeyProtocol.getCurrentSecretKey();
         return !newCurrentKey.equals(initialKey);
       } catch (IOException ex) {
         throw new RuntimeException(ex);
       }
     }, 100, 1500);
-    ManagedSecretKey  updatedKey = securityProtocol.getCurrentSecretKey();
-    List<ManagedSecretKey>  updatedKeys = securityProtocol.getAllSecretKeys();
+    ManagedSecretKey  updatedKey = secretKeyProtocol.getCurrentSecretKey();
+    List<ManagedSecretKey>  updatedKeys = secretKeyProtocol.getAllSecretKeys();
 
     LOG.info("Updated active key: {}", updatedKey);
     LOG.info("Updated keys: {}", updatedKeys);
 
     assertEquals(updatedKey, updatedKeys.get(0));
     assertEquals(initialKey, updatedKeys.get(1));
-    // ensure the last key from the previous cycle no longer managed.
-    assertTrue(lastKey.isExpired());
-    assertFalse(updatedKeys.contains(lastKey));
 
     // assert getSecretKey by ID.
-    ManagedSecretKey keyById = securityProtocol.getSecretKey(
+    ManagedSecretKey keyById = secretKeyProtocol.getSecretKey(
         updatedKey.getId());
     assertNotNull(keyById);
-    ManagedSecretKey nonExisting = securityProtocol.getSecretKey(
+    ManagedSecretKey nonExisting = secretKeyProtocol.getSecretKey(
         UUID.randomUUID());
     assertNull(nonExisting);
   }
@@ -246,19 +249,19 @@ public final class TestSecretKeysApi {
    */
   @Test
   public void testSecretKeyApiNotEnabled() throws Exception {
-    startCluster();
-    SCMSecurityProtocol securityProtocol = getScmSecurityProtocol();
+    startCluster(1);
+    SecretKeyProtocol secretKeyProtocol = getSecretKeyProtocol();
 
-    SCMSecurityException ex = assertThrows(SCMSecurityException.class,
-            securityProtocol::getCurrentSecretKey);
+    SCMSecretKeyException ex = assertThrows(SCMSecretKeyException.class,
+            secretKeyProtocol::getCurrentSecretKey);
     assertEquals(SECRET_KEY_NOT_ENABLED, ex.getErrorCode());
 
-    ex = assertThrows(SCMSecurityException.class,
-        () -> securityProtocol.getSecretKey(UUID.randomUUID()));
+    ex = assertThrows(SCMSecretKeyException.class,
+        () -> secretKeyProtocol.getSecretKey(UUID.randomUUID()));
     assertEquals(SECRET_KEY_NOT_ENABLED, ex.getErrorCode());
 
-    ex = assertThrows(SCMSecurityException.class,
-        securityProtocol::getAllSecretKeys);
+    ex = assertThrows(SCMSecretKeyException.class,
+        secretKeyProtocol::getAllSecretKeys);
     assertEquals(SECRET_KEY_NOT_ENABLED, ex.getErrorCode());
   }
 
@@ -274,8 +277,8 @@ public final class TestSecretKeysApi {
     conf.set(HDDS_SECRET_KEY_ROTATE_DURATION, "1d");
     conf.set(HDDS_SECRET_KEY_EXPIRY_DURATION, "7d");
 
-    startCluster();
-    SCMSecurityProtocol securityProtocol = getScmSecurityProtocol();
+    startCluster(3);
+    SecretKeyProtocol securityProtocol = getSecretKeyProtocol();
     List<ManagedSecretKey> keysInitial = securityProtocol.getAllSecretKeys();
     LOG.info("Keys before fail over: {}.", keysInitial);
 
@@ -294,7 +297,40 @@ public final class TestSecretKeysApi {
     }
   }
 
-  private void startCluster()
+  @Test
+  public void testSecretKeyAuthorization() throws Exception {
+    enableBlockToken();
+    conf.setBoolean(HADOOP_SECURITY_AUTHORIZATION, true);
+    startCluster(1);
+
+    // When HADOOP_SECURITY_AUTHORIZATION is enabled, the SecretKey protocol
+    // is only available to the Datanode and OM identities; any other
+    // authenticated user can't access the protocol.
+    SecretKeyProtocol secretKeyProtocol =
+        getSecretKeyProtocol(testUserPrincipal, testUserKeytab);
+    RemoteException ex =
+        assertThrows(RemoteException.class,
+            secretKeyProtocol::getCurrentSecretKey);
+    assertEquals(AuthorizationException.class.getName(), ex.getClassName());
+    assertTrue(ex.getMessage().contains(
+        "User [email protected] (auth:KERBEROS) is not authorized " +
+            "for protocol"));
+  }
+
+  @Test
+  public void testSecretKeyWithoutAuthorization() throws Exception {
+    enableBlockToken();
+    conf.setBoolean(HADOOP_SECURITY_AUTHORIZATION, false);
+    startCluster(1);
+
+    // When HADOOP_SECURITY_AUTHORIZATION is disabled, any authenticated
+    // user can access the protocol.
+    SecretKeyProtocol secretKeyProtocol =
+        getSecretKeyProtocol(testUserPrincipal, testUserKeytab);
+    assertNotNull(secretKeyProtocol.getCurrentSecretKey());
+  }
+
+  private void startCluster(int numSCMs)
       throws IOException, TimeoutException, InterruptedException {
     OzoneManager.setTestSecureOmFlag(true);
     MiniOzoneCluster.Builder builder = MiniOzoneCluster.newHABuilder(conf)
@@ -302,7 +338,7 @@ public final class TestSecretKeysApi {
         .setSCMServiceId("TestSecretKey")
         .setScmId(scmId)
         .setNumDatanodes(3)
-        .setNumOfStorageContainerManagers(3)
+        .setNumOfStorageContainerManagers(numSCMs)
         .setNumOfOzoneManagers(1);
 
     cluster = (MiniOzoneHAClusterImpl) builder.build();
@@ -310,15 +346,18 @@ public final class TestSecretKeysApi {
   }
 
   @NotNull
-  private SCMSecurityProtocol getScmSecurityProtocol() throws IOException {
+  private SecretKeyProtocol getSecretKeyProtocol() throws IOException {
+    return getSecretKeyProtocol(ozonePrincipal, ozoneKeytab);
+  }
+
+  @NotNull
+  private SecretKeyProtocol getSecretKeyProtocol(
+      String user, File keyTab) throws IOException {
     UserGroupInformation ugi =
         UserGroupInformation.loginUserFromKeytabAndReturnUGI(
-            testUserPrincipal, testUserKeytab.getCanonicalPath());
+            user, keyTab.getCanonicalPath());
     ugi.setAuthenticationMethod(KERBEROS);
-    SCMSecurityProtocol scmSecurityProtocolClient =
-        HddsServerUtil.getScmSecurityClient(conf, ugi);
-    assertNotNull(scmSecurityProtocolClient);
-    return scmSecurityProtocolClient;
+    return getSecretKeyClientForDatanode(conf, ugi);
   }
 
   private void enableBlockToken() {
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index f8d6d3addc..b309d314d0 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -73,6 +73,7 @@ import org.apache.hadoop.hdds.conf.ConfigurationException;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.ReconfigureProtocol;
+import org.apache.hadoop.hdds.protocol.SecretKeyProtocol;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import 
org.apache.hadoop.hdds.protocol.proto.ReconfigureProtocolProtos.ReconfigureProtocolService;
 import org.apache.hadoop.hdds.protocolPB.ReconfigureProtocolPB;
@@ -627,7 +628,9 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
       certClient = new OMCertificateClient(secConfig, omStorage,
           scmInfo == null ? null : scmInfo.getScmId(), this::saveNewCertId,
           this::terminateOM);
-      secretKeyClient = DefaultSecretKeySignerClient.create(conf);
+      SecretKeyProtocol secretKeyProtocol =
+          HddsServerUtil.getSecretKeyClientForOm(conf);
+      secretKeyClient = new DefaultSecretKeySignerClient(secretKeyProtocol);
     }
     if (secConfig.isBlockTokenEnabled()) {
       blockTokenMgr = createBlockTokenSecretManager();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
