This is an automated email from the ASF dual-hosted git repository.

duong pushed a commit to branch HDDS-7733-Symmetric-Tokens
in repository https://gitbox.apache.org/repos/asf/ozone.git

commit 42e2cb302e074b41e7ace927498c719a52a65558
Author: Duong Nguyen <[email protected]>
AuthorDate: Tue Mar 14 17:59:54 2023 -0700

    HDDS-7830. SCM API for OM and Datanode to get secret keys (#4345)
---
 .../security/exception/SCMSecurityException.java   |   4 +-
 .../java/org/apache/hadoop/util/ProtobufUtils.java |  41 +++
 .../hadoop/hdds/utils/TestProtobufUtils.java       |  48 +++
 .../hadoop/hdds/protocol/SCMSecurityProtocol.java  |  23 ++
 .../SCMSecurityProtocolClientSideTranslatorPB.java |  38 +++
 .../hdds/security/symmetric/ManagedSecretKey.java  |  12 +-
 .../hdds/security/symmetric/SecretKeyManager.java  |  12 +
 .../hdds/security/symmetric/SecretKeyState.java    |   3 +
 .../security/symmetric/SecretKeyStateImpl.java     |  23 ++
 .../src/main/proto/ScmServerSecurityProtocol.proto |  30 ++
 .../SCMSecurityProtocolServerSideTranslatorPB.java |  50 ++++
 .../hdds/scm/security/SecretKeyManagerService.java |   4 +
 .../hdds/scm/server/SCMSecurityProtocolServer.java |  48 ++-
 .../hdds/scm/server/StorageContainerManager.java   |  22 +-
 .../scm/server/TestSCMSecurityProtocolServer.java  |   2 +-
 .../hadoop/ozone/MiniOzoneHAClusterImpl.java       |   7 +-
 .../org/apache/hadoop/ozone/TestSecretKeysApi.java | 329 +++++++++++++++++++++
 17 files changed, 674 insertions(+), 22 deletions(-)

diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/exception/SCMSecurityException.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/exception/SCMSecurityException.java
index 7e008afc41..13b8395391 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/exception/SCMSecurityException.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/exception/SCMSecurityException.java
@@ -109,6 +109,8 @@ public class SCMSecurityException extends IOException {
     MISSING_BLOCK_TOKEN,
     BLOCK_TOKEN_VERIFICATION_FAILED,
     GET_ROOT_CA_CERT_FAILED,
-    NOT_A_PRIMARY_SCM
+    NOT_A_PRIMARY_SCM,
+    SECRET_KEY_NOT_ENABLED,
+    SECRET_KEY_NOT_INITIALIZED
   }
 }
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/util/ProtobufUtils.java 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/util/ProtobufUtils.java
new file mode 100644
index 0000000000..428157981e
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/util/ProtobufUtils.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util;
+
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+
+import java.util.UUID;
+
+/**
+ * Contains utilities to ease common protobuf to Java object conversions.
+ */
+public final class ProtobufUtils {
+  private ProtobufUtils() {
+  }
+
+  public static HddsProtos.UUID toProtobuf(UUID uuid) {
+    return HddsProtos.UUID.newBuilder()
+        .setMostSigBits(uuid.getMostSignificantBits())
+        .setLeastSigBits(uuid.getLeastSignificantBits())
+        .build();
+  }
+
+  public static UUID fromProtobuf(HddsProtos.UUID proto) {
+    return new UUID(proto.getMostSigBits(), proto.getLeastSigBits());
+  }
+}
diff --git 
a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/TestProtobufUtils.java
 
b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/TestProtobufUtils.java
new file mode 100644
index 0000000000..fe6a57846c
--- /dev/null
+++ 
b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/TestProtobufUtils.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.utils;
+
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.junit.jupiter.api.Test;
+
+import java.util.UUID;
+
+import static org.apache.hadoop.util.ProtobufUtils.fromProtobuf;
+import static org.apache.hadoop.util.ProtobufUtils.toProtobuf;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Test-cases for {@link org.apache.hadoop.util.ProtobufUtils}.
+ */
+public class TestProtobufUtils {
+  @Test
+  public void testUuidToProtobuf() {
+    UUID object = UUID.randomUUID();
+    HddsProtos.UUID protobuf = toProtobuf(object);
+    assertEquals(object.getLeastSignificantBits(), protobuf.getLeastSigBits());
+    assertEquals(object.getMostSignificantBits(), protobuf.getMostSigBits());
+  }
+
+  @Test
+  public void testUuidConversion() {
+    UUID original = UUID.randomUUID();
+    HddsProtos.UUID protobuf = toProtobuf(original);
+    UUID deserialized = fromProtobuf(protobuf);
+    assertEquals(original, deserialized);
+  }
+}
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocol/SCMSecurityProtocol.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocol/SCMSecurityProtocol.java
index 26107d54ac..1cfe568d8a 100644
--- 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocol/SCMSecurityProtocol.java
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocol/SCMSecurityProtocol.java
@@ -18,6 +18,7 @@ package org.apache.hadoop.hdds.protocol;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.UUID;
 
 import org.apache.hadoop.hdds.annotation.InterfaceAudience;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -26,6 +27,7 @@ import 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.OzoneManagerDetailsProto
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ScmNodeDetailsProto;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeDetailsProto;
 import org.apache.hadoop.hdds.scm.ScmConfig;
+import org.apache.hadoop.hdds.security.symmetric.ManagedSecretKey;
 import org.apache.hadoop.hdds.security.x509.crl.CRLInfo;
 import org.apache.hadoop.security.KerberosInfo;
 
@@ -170,4 +172,25 @@ public interface SCMSecurityProtocol {
    */
   String getCertificate(NodeDetailsProto nodeDetails,
       String certSignReq) throws IOException;
+
+
+  /**
+   * Get the current SecretKey that is used for signing tokens.
+   * @return ManagedSecretKey
+   */
+  ManagedSecretKey getCurrentSecretKey() throws IOException;
+
+  /**
+   * Get a particular SecretKey by ID.
+   *
+   * @param id the id of the SecretKey to get.
+   * @return ManagedSecretKey.
+   */
+  ManagedSecretKey getSecretKey(UUID id) throws IOException;
+
+  /**
+   * Get all the non-expired SecretKeys managed by SCM.
+   * @return list of ManagedSecretKey.
+   */
+  List<ManagedSecretKey> getAllSecretKeys() throws IOException;
 }
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocolPB/SCMSecurityProtocolClientSideTranslatorPB.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocolPB/SCMSecurityProtocolClientSideTranslatorPB.java
index 19bae372b4..ab09061c44 100644
--- 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocolPB/SCMSecurityProtocolClientSideTranslatorPB.java
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocolPB/SCMSecurityProtocolClientSideTranslatorPB.java
@@ -22,7 +22,9 @@ import java.security.cert.CRLException;
 import java.security.cert.CertificateException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.UUID;
 import java.util.function.Consumer;
+import java.util.stream.Collectors;
 
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.protocol.SCMSecurityProtocol;
@@ -40,6 +42,8 @@ import 
org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetCer
 import 
org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetCrlsRequestProto;
 import 
org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetDataNodeCertRequestProto;
 import 
org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetCertRequestProto;
+import 
org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetSecretKeyRequest;
+import 
org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetSecretKeyResponse;
 import 
org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMListCACertificateRequestProto;
 import 
org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetLatestCrlIdRequestProto;
 import 
org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMListCertificateRequestProto;
@@ -51,6 +55,7 @@ import 
org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMSecuri
 import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.Type;
 import 
org.apache.hadoop.hdds.scm.proxy.SCMSecurityProtocolFailoverProxyProvider;
 import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
+import org.apache.hadoop.hdds.security.symmetric.ManagedSecretKey;
 import org.apache.hadoop.hdds.security.x509.crl.CRLInfo;
 import org.apache.hadoop.hdds.tracing.TracingUtil;
 import org.apache.hadoop.io.retry.RetryProxy;
@@ -60,6 +65,7 @@ import org.apache.hadoop.ipc.RPC;
 
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
+
 import static 
org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetOMCertRequestProto;
 
 /**
@@ -189,6 +195,38 @@ public class SCMSecurityProtocolClientSideTranslatorPB 
implements
         .getX509Certificate();
   }
 
+  @Override
+  public ManagedSecretKey getCurrentSecretKey() throws IOException {
+    SCMSecurityProtocolProtos.ManagedSecretKey secretKeyProto =
+        submitRequest(Type.GetCurrentSecretKey, builder -> {
+        }).getCurrentSecretKeyResponseProto().getSecretKey();
+    return ManagedSecretKey.fromProtobuf(secretKeyProto);
+  }
+
+  @Override
+  public ManagedSecretKey getSecretKey(UUID id) throws IOException {
+    SCMGetSecretKeyRequest request = SCMGetSecretKeyRequest.newBuilder()
+        .setSecretKeyId(HddsProtos.UUID.newBuilder()
+            .setMostSigBits(id.getMostSignificantBits())
+            .setLeastSigBits(id.getLeastSignificantBits())).build();
+    SCMGetSecretKeyResponse response = submitRequest(Type.GetSecretKey,
+        builder -> builder.setGetSecretKeyRequest(request))
+        .getGetSecretKeyResponseProto();
+
+    return response.hasSecretKey() ?
+        ManagedSecretKey.fromProtobuf(response.getSecretKey()) : null;
+  }
+
+  @Override
+  public List<ManagedSecretKey> getAllSecretKeys() throws IOException {
+    List<SCMSecurityProtocolProtos.ManagedSecretKey> secretKeysList =
+        submitRequest(Type.GetAllSecretKeys, builder -> {
+        }).getSecretKeysListResponseProto().getSecretKeysList();
+    return secretKeysList.stream()
+        .map(ManagedSecretKey::fromProtobuf)
+        .collect(Collectors.toList());
+  }
+
   /**
    * Get signed certificate for SCM node.
    *
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/ManagedSecretKey.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/ManagedSecretKey.java
index 7e8aaacb48..3128265e9a 100644
--- 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/ManagedSecretKey.java
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/ManagedSecretKey.java
@@ -19,8 +19,8 @@
 package org.apache.hadoop.hdds.security.symmetric;
 
 import com.google.protobuf.ByteString;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos;
+import org.apache.hadoop.util.ProtobufUtils;
 
 import javax.crypto.SecretKey;
 import javax.crypto.spec.SecretKeySpec;
@@ -92,13 +92,8 @@ public final class ManagedSecretKey implements Serializable {
    * @return the protobuf message to deserialize this object.
    */
   public SCMSecurityProtocolProtos.ManagedSecretKey toProtobuf() {
-    HddsProtos.UUID uuid = HddsProtos.UUID.newBuilder()
-        .setMostSigBits(this.id.getMostSignificantBits())
-        .setLeastSigBits(this.id.getLeastSignificantBits())
-        .build();
-
     return SCMSecurityProtocolProtos.ManagedSecretKey.newBuilder()
-        .setId(uuid)
+        .setId(ProtobufUtils.toProtobuf(id))
         .setCreationTime(this.creationTime.toEpochMilli())
         .setExpiryTime(this.expiryTime.toEpochMilli())
         .setAlgorithm(this.secretKey.getAlgorithm())
@@ -111,8 +106,7 @@ public final class ManagedSecretKey implements Serializable 
{
    */
   public static ManagedSecretKey fromProtobuf(
       SCMSecurityProtocolProtos.ManagedSecretKey message) {
-    UUID id = new UUID(message.getId().getMostSigBits(),
-        message.getId().getLeastSigBits());
+    UUID id = ProtobufUtils.fromProtobuf(message.getId());
     Instant creationTime = Instant.ofEpochMilli(message.getCreationTime());
     Instant expiryTime = Instant.ofEpochMilli(message.getExpiryTime());
     SecretKey secretKey = new SecretKeySpec(message.getEncoded().toByteArray(),
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/SecretKeyManager.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/SecretKeyManager.java
index 0dc5bf8902..cb529e10d1 100644
--- 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/SecretKeyManager.java
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/SecretKeyManager.java
@@ -129,6 +129,18 @@ public class SecretKeyManager {
     return false;
   }
 
+  public ManagedSecretKey getCurrentKey() {
+    return state.getCurrentKey();
+  }
+
+  public ManagedSecretKey getKey(UUID id) {
+    return state.getKey(id);
+  }
+
+  public List<ManagedSecretKey> getSortedKeys() {
+    return state.getSortedKeys();
+  }
+
   private boolean shouldRotate(ManagedSecretKey currentKey) {
     Duration established = between(currentKey.getCreationTime(), 
Instant.now());
     return established.compareTo(rotationDuration) >= 0;
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/SecretKeyState.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/SecretKeyState.java
index 7be70b4b02..7b510a10b2 100644
--- 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/SecretKeyState.java
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/SecretKeyState.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hdds.security.symmetric;
 import org.apache.hadoop.hdds.scm.metadata.Replicate;
 
 import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.TimeoutException;
 
 /**
@@ -36,6 +37,8 @@ public interface SecretKeyState {
    */
   ManagedSecretKey getCurrentKey();
 
+  ManagedSecretKey getKey(UUID id);
+
   /**
   * Get the keys that are managed by this manager.
    * The returned keys are sorted by creation time, in the order of latest
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/SecretKeyStateImpl.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/SecretKeyStateImpl.java
index d5c886fd99..b1d66e1186 100644
--- 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/SecretKeyStateImpl.java
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/SecretKeyStateImpl.java
@@ -23,12 +23,16 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Function;
 
 import static java.util.Comparator.comparing;
 import static java.util.Objects.requireNonNull;
 import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
 
 /**
  * Default implementation of {@link SecretKeyState}.
@@ -41,6 +45,7 @@ public final class SecretKeyStateImpl implements 
SecretKeyState {
 
   private List<ManagedSecretKey> sortedKeys;
   private ManagedSecretKey currentKey;
+  private Map<UUID, ManagedSecretKey> keyById;
 
   private final SecretKeyStore keyStore;
 
@@ -66,6 +71,20 @@ public final class SecretKeyStateImpl implements 
SecretKeyState {
     }
   }
 
+  @Override
+  public ManagedSecretKey getKey(UUID id) {
+    lock.readLock().lock();
+    try {
+      // Return null if not initialized yet.
+      if (keyById == null) {
+        return null;
+      }
+      return keyById.get(id);
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
   /**
   * Get the keys that are managed by this manager.
    * The returned keys are sorted by creation time, in the order of latest
@@ -98,6 +117,10 @@ public final class SecretKeyStateImpl implements 
SecretKeyState {
               .collect(toList())
       );
       currentKey = sortedKeys.get(0);
+      keyById = newKeys.stream().collect(toMap(
+          ManagedSecretKey::getId,
+          Function.identity()
+      ));
       LOG.info("Current key updated {}", currentKey);
       keyStore.save(sortedKeys);
     } finally {
diff --git 
a/hadoop-hdds/interface-server/src/main/proto/ScmServerSecurityProtocol.proto 
b/hadoop-hdds/interface-server/src/main/proto/ScmServerSecurityProtocol.proto
index 3621018fa8..27d1e3c1c3 100644
--- 
a/hadoop-hdds/interface-server/src/main/proto/ScmServerSecurityProtocol.proto
+++ 
b/hadoop-hdds/interface-server/src/main/proto/ScmServerSecurityProtocol.proto
@@ -56,6 +56,7 @@ message SCMSecurityRequest {
     optional SCMGetLatestCrlIdRequestProto getLatestCrlIdRequest = 11;
     optional SCMRevokeCertificatesRequestProto revokeCertificatesRequest = 12;
     optional SCMGetCertRequestProto getCertRequest = 13;
+    optional SCMGetSecretKeyRequest getSecretKeyRequest = 14;
 }
 
 message SCMSecurityResponse {
@@ -81,6 +82,12 @@ message SCMSecurityResponse {
 
     optional SCMRevokeCertificatesResponseProto 
revokeCertificatesResponseProto = 10;
 
+    optional SCMGetCurrentSecretKeyResponse currentSecretKeyResponseProto = 11;
+
+    optional SCMGetSecretKeyResponse getSecretKeyResponseProto = 12;
+
+    optional SCMSecretKeysListResponse secretKeysListResponseProto = 13;
+
 }
 
 enum Type {
@@ -96,6 +103,9 @@ enum Type {
     GetLatestCrlId = 10;
     RevokeCertificates = 11;
     GetCert = 12;
+    GetCurrentSecretKey = 13;
+    GetSecretKey = 14;
+    GetAllSecretKeys = 15;
 }
 
 enum Status {
@@ -116,6 +126,8 @@ enum Status {
     GET_ROOT_CA_CERTIFICATE_FAILED = 15;
     NOT_A_PRIMARY_SCM = 16;
     REVOKE_CERTIFICATE_FAILED = 17;
+    SECRET_KEY_NOT_ENABLED = 18;
+    SECRET_KEY_NOT_INITIALIZED = 19;
 }
 /**
 * This message is send by data node to prove its identity and get an SCM
@@ -258,3 +270,21 @@ message ManagedSecretKey {
     required bytes encoded = 5;
 }
 
+message SCMGetSecretKeyRequest {
+    required UUID secretKeyId = 1;
+}
+
+message SCMGetCurrentSecretKeyResponse {
+    required ManagedSecretKey secretKey = 1;
+}
+
+message SCMGetSecretKeyResponse {
+    optional ManagedSecretKey secretKey = 1;
+}
+
+message SCMSecretKeysListResponse {
+    repeated ManagedSecretKey secretKeys = 1;
+}
+
+
+
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/SCMSecurityProtocolServerSideTranslatorPB.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/SCMSecurityProtocolServerSideTranslatorPB.java
index 736aef15a0..9b7f71bd8a 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/SCMSecurityProtocolServerSideTranslatorPB.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/SCMSecurityProtocolServerSideTranslatorPB.java
@@ -18,6 +18,7 @@ package org.apache.hadoop.hdds.scm.protocol;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.UUID;
 
 import org.apache.hadoop.hdds.protocol.SCMSecurityProtocol;
 import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos;
@@ -26,16 +27,20 @@ import 
org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetCer
 import 
org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetCertificateRequestProto;
 import 
org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetCrlsRequestProto;
 import 
org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetCrlsResponseProto;
+import 
org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetCurrentSecretKeyResponse;
 import 
org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetDataNodeCertRequestProto;
 import 
org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetCertRequestProto;
 import 
org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetLatestCrlIdRequestProto;
 import 
org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetLatestCrlIdResponseProto;
 import 
org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetOMCertRequestProto;
 import 
org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetSCMCertRequestProto;
+import 
org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetSecretKeyRequest;
+import 
org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetSecretKeyResponse;
 import 
org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMListCertificateRequestProto;
 import 
org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMListCertificateResponseProto;
 import 
org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMRevokeCertificatesRequestProto;
 import 
org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMRevokeCertificatesResponseProto;
+import 
org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMSecretKeysListResponse;
 import 
org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMSecurityRequest;
 import 
org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMSecurityResponse;
 import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.Status;
@@ -43,6 +48,7 @@ import 
org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolPB;
 import org.apache.hadoop.hdds.scm.ha.RatisUtil;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
+import org.apache.hadoop.hdds.security.symmetric.ManagedSecretKey;
 import org.apache.hadoop.hdds.security.x509.crl.CRLInfo;
 import org.apache.hadoop.hdds.server.OzoneProtocolMessageDispatcher;
 import org.apache.hadoop.hdds.utils.ProtocolMessageMetrics;
@@ -50,6 +56,7 @@ import org.apache.hadoop.hdds.utils.ProtocolMessageMetrics;
 import com.google.protobuf.ProtocolMessageEnum;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
+import org.apache.hadoop.util.ProtobufUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -151,6 +158,21 @@ public class SCMSecurityProtocolServerSideTranslatorPB
             getCertificate(request.getGetCertRequest()))
             .build();
 
+      case GetCurrentSecretKey:
+        return scmSecurityResponse
+            .setCurrentSecretKeyResponseProto(getCurrentSecretKey())
+            .build();
+
+      case GetSecretKey:
+        return scmSecurityResponse.setGetSecretKeyResponseProto(
+                getSecretKey(request.getGetSecretKeyRequest()))
+            .build();
+
+      case GetAllSecretKeys:
+        return scmSecurityResponse
+            .setSecretKeysListResponseProto(getAllSecretKeys())
+            .build();
+
       default:
         throw new IllegalArgumentException(
             "Unknown request type: " + request.getCmdType());
@@ -173,6 +195,34 @@ public class SCMSecurityProtocolServerSideTranslatorPB
     }
   }
 
+  private SCMSecretKeysListResponse getAllSecretKeys() throws IOException {
+    SCMSecretKeysListResponse.Builder builder =
+        SCMSecretKeysListResponse.newBuilder();
+    impl.getAllSecretKeys()
+        .stream().map(ManagedSecretKey::toProtobuf)
+        .forEach(builder::addSecretKeys);
+    return builder.build();
+  }
+
+  private SCMGetSecretKeyResponse getSecretKey(
+      SCMGetSecretKeyRequest getSecretKeyRequest) throws IOException {
+    SCMGetSecretKeyResponse.Builder builder =
+        SCMGetSecretKeyResponse.newBuilder();
+    UUID id = ProtobufUtils.fromProtobuf(getSecretKeyRequest.getSecretKeyId());
+    ManagedSecretKey secretKey = impl.getSecretKey(id);
+    if (secretKey != null) {
+      builder.setSecretKey(secretKey.toProtobuf());
+    }
+    return builder.build();
+  }
+
+  private SCMGetCurrentSecretKeyResponse getCurrentSecretKey()
+      throws IOException {
+    return SCMGetCurrentSecretKeyResponse.newBuilder()
+        .setSecretKey(impl.getCurrentSecretKey().toProtobuf())
+        .build();
+  }
+
   /**
   * Convert exception to corresponding status.
    * @param ex
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/security/SecretKeyManagerService.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/security/SecretKeyManagerService.java
index 27ce30a8a1..1761f97992 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/security/SecretKeyManagerService.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/security/SecretKeyManagerService.java
@@ -147,6 +147,10 @@ public class SecretKeyManagerService implements 
SCMService, Runnable {
         TimeUnit.MILLISECONDS);
   }
 
+  public SecretKeyManager getSecretKeyManager() {
+    return secretKeyManager;
+  }
+
   @Override
   public void stop() {
     scheduler.shutdownNow();
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMSecurityProtocolServer.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMSecurityProtocolServer.java
index 2a99415573..8d09b95296 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMSecurityProtocolServer.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMSecurityProtocolServer.java
@@ -30,6 +30,7 @@ import java.util.Date;
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.stream.Collectors;
@@ -44,12 +45,16 @@ import 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.OzoneManagerDetailsProto
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ScmNodeDetailsProto;
 import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos;
 import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolPB;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes;
 import org.apache.hadoop.hdds.scm.update.server.SCMUpdateServiceGrpcServer;
 import org.apache.hadoop.hdds.scm.update.client.UpdateServiceConfig;
 import org.apache.hadoop.hdds.scm.update.server.SCMCRLStore;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import 
org.apache.hadoop.hdds.scm.protocol.SCMSecurityProtocolServerSideTranslatorPB;
 import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
+import 
org.apache.hadoop.hdds.security.exception.SCMSecurityException.ErrorCode;
+import org.apache.hadoop.hdds.security.symmetric.ManagedSecretKey;
+import org.apache.hadoop.hdds.security.symmetric.SecretKeyManager;
 import org.apache.hadoop.hdds.security.x509.crl.CRLInfo;
 import org.apache.hadoop.hdds.utils.HddsServerUtil;
 import org.apache.hadoop.hdds.scm.ScmConfig;
@@ -68,6 +73,8 @@ import org.bouncycastle.asn1.x509.CRLReason;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import static 
org.apache.hadoop.hdds.security.exception.SCMSecurityException.ErrorCode.CERTIFICATE_NOT_FOUND;
 import static 
org.apache.hadoop.hdds.security.exception.SCMSecurityException.ErrorCode.GET_CA_CERT_FAILED;
 import static 
org.apache.hadoop.hdds.security.exception.SCMSecurityException.ErrorCode.GET_CERTIFICATE_FAILED;
@@ -92,15 +99,21 @@ public class SCMSecurityProtocolServer implements 
SCMSecurityProtocol {
   private final ProtocolMessageMetrics metrics;
   private final StorageContainerManager storageContainerManager;
 
+  // SecretKey may not be enabled when neither block token nor container
+  // token is enabled.
+  private final SecretKeyManager secretKeyManager;
+
   SCMSecurityProtocolServer(OzoneConfiguration conf,
       CertificateServer rootCertificateServer,
       CertificateServer scmCertificateServer,
-      X509Certificate rootCACert, StorageContainerManager scm)
+      X509Certificate rootCACert, StorageContainerManager scm,
+      @Nullable SecretKeyManager secretKeyManager)
       throws IOException {
     this.storageContainerManager = scm;
     this.rootCertificateServer = rootCertificateServer;
     this.scmCertificateServer = scmCertificateServer;
     this.rootCACertificate = rootCACert;
+    this.secretKeyManager = secretKeyManager;
     final int handlerCount =
         conf.getInt(ScmConfigKeys.OZONE_SCM_SECURITY_HANDLER_COUNT_KEY,
             ScmConfigKeys.OZONE_SCM_SECURITY_HANDLER_COUNT_DEFAULT);
@@ -162,6 +175,37 @@ public class SCMSecurityProtocolServer implements 
SCMSecurityProtocol {
     return getEncodedCertToString(certSignReq, nodeDetails.getNodeType());
   }
 
+  @Override
+  public ManagedSecretKey getCurrentSecretKey() throws SCMSecurityException {
+    validateSecretKeyStatus();
+    return secretKeyManager.getCurrentKey();
+  }
+
+  @Override
+  public ManagedSecretKey getSecretKey(UUID id) throws SCMSecurityException {
+    validateSecretKeyStatus();
+    return secretKeyManager.getKey(id);
+  }
+
+  @Override
+  public List<ManagedSecretKey> getAllSecretKeys() throws SCMSecurityException 
{
+    validateSecretKeyStatus();
+    return secretKeyManager.getSortedKeys();
+  }
+
+  private void validateSecretKeyStatus() throws SCMSecurityException {
+    if (secretKeyManager == null) {
+      throw new SCMSecurityException("Secret keys are not enabled.",
+          ErrorCode.SECRET_KEY_NOT_ENABLED);
+    }
+
+    if (!secretKeyManager.isInitialized()) {
+      throw new SCMSecurityException(
+          "Secret key initialization is not finished yet.",
+          ErrorCode.SECRET_KEY_NOT_INITIALIZED);
+    }
+  }
+
   /**
    * Get SCM signed certificate for OM.
    *
@@ -368,7 +412,7 @@ public class SCMSecurityProtocolServer implements 
SCMSecurityProtocol {
     } catch (InterruptedException | ExecutionException e) {
       Thread.currentThread().interrupt();
       throw new SCMException("Fail to revoke certs",
-          SCMException.ResultCodes.FAILED_TO_REVOKE_CERTIFICATES);
+          ResultCodes.FAILED_TO_REVOKE_CERTIFICATES);
     }
   }
 
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 1d1a6be253..e79e7a5536 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -75,6 +75,7 @@ import 
org.apache.hadoop.hdds.scm.ha.StatefulServiceStateManagerImpl;
 import org.apache.hadoop.hdds.scm.server.upgrade.SCMUpgradeFinalizationContext;
 import 
org.apache.hadoop.hdds.scm.server.upgrade.ScmHAUnfinalizedStateValidationAction;
 import org.apache.hadoop.hdds.scm.pipeline.WritableContainerFactory;
+import org.apache.hadoop.hdds.security.symmetric.SecretKeyManager;
 import org.apache.hadoop.hdds.security.token.ContainerTokenGenerator;
 import org.apache.hadoop.hdds.security.token.ContainerTokenSecretManager;
 import org.apache.hadoop.hdds.security.x509.certificate.authority.CAType;
@@ -303,6 +304,7 @@ public final class StorageContainerManager extends 
ServiceRuntimeInfoImpl
   // container replicas.
   private ContainerReplicaPendingOps containerReplicaPendingOps;
   private final AtomicBoolean isStopped = new AtomicBoolean(false);
+  private final SecretKeyManagerService secretKeyManagerService;
 
   /** A list of property that are reconfigurable at runtime. */
   private final SortedSet<String> reconfigurableProperties =
@@ -388,6 +390,14 @@ public final class StorageContainerManager extends 
ServiceRuntimeInfoImpl
 
     initializeSystemManagers(conf, configurator);
 
+    if (isSecretKeyEnable(securityConfig)) {
+      secretKeyManagerService = new SecretKeyManagerService(scmContext, conf,
+              scmHAManager.getRatisServer());
+      serviceManager.register(secretKeyManagerService);
+    } else {
+      secretKeyManagerService = null;
+    }
+
     // Authenticate SCM if security is enabled, this initialization can only
     // be done after the metadata store is initialized.
     if (OzoneSecurityUtil.isSecurityEnabled(conf)) {
@@ -743,13 +753,6 @@ public final class StorageContainerManager extends 
ServiceRuntimeInfoImpl
 
     serviceManager.register(expiredContainerReplicaOpScrubber);
 
-    if (isSecretKeyEnable(securityConfig)) {
-      SecretKeyManagerService secretKeyManagerService =
-          new SecretKeyManagerService(scmContext, conf,
-              scmHAManager.getRatisServer());
-      serviceManager.register(secretKeyManagerService);
-    }
-
     if (configurator.getContainerManager() != null) {
       containerManager = configurator.getContainerManager();
     } else {
@@ -879,6 +882,9 @@ public final class StorageContainerManager extends 
ServiceRuntimeInfoImpl
               certificateStore, scmStorageConfig, new DefaultProfile());
     }
 
+    SecretKeyManager secretKeyManager = secretKeyManagerService != null ?
+        secretKeyManagerService.getSecretKeyManager() : null;
+
     // We need to pass getCACertificate as rootCA certificate,
     // as for SCM CA is root-CA.
     securityProtocolServer = new SCMSecurityProtocolServer(conf,
@@ -886,7 +892,7 @@ public final class StorageContainerManager extends 
ServiceRuntimeInfoImpl
         scmCertificateClient == null ? null :
             scmCertificateClient.getRootCACertificate() != null ?
             scmCertificateClient.getRootCACertificate() :
-            scmCertificateClient.getCACertificate(), this);
+            scmCertificateClient.getCACertificate(), this, secretKeyManager);
 
     if (securityConfig.isContainerTokenEnabled()) {
       containerTokenMgr = createContainerTokenSecretManager(configuration);
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMSecurityProtocolServer.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMSecurityProtocolServer.java
index 5480caaf97..82f2cc182a 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMSecurityProtocolServer.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMSecurityProtocolServer.java
@@ -41,7 +41,7 @@ public class TestSCMSecurityProtocolServer {
     config.set(OZONE_SCM_SECURITY_SERVICE_ADDRESS_KEY,
         OZONE_SCM_SECURITY_SERVICE_BIND_HOST_DEFAULT + ":0");
     securityProtocolServer = new SCMSecurityProtocolServer(config, null,
-        null, null, null);
+        null, null, null, null);
   }
 
   @AfterEach
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java
index 4cb535d62f..26d31717e3 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java
@@ -383,7 +383,7 @@ public class MiniOzoneHAClusterImpl extends 
MiniOzoneClusterImpl {
     // StorageContainerManager constructors respectively).  So we need to 
manage
     // them separately, see initOMHAConfig() and initSCMHAConfig().
     private final ReservedPorts omPorts = new ReservedPorts(3);
-    private final ReservedPorts scmPorts = new ReservedPorts(3);
+    private final ReservedPorts scmPorts = new ReservedPorts(4);
 
     /**
      * Creates a new Builder.
@@ -673,12 +673,17 @@ public class MiniOzoneHAClusterImpl extends 
MiniOzoneClusterImpl {
             scmServiceId, scmNodeId);
         String scmGrpcPortKey = ConfUtils.addKeySuffixes(
             ScmConfigKeys.OZONE_SCM_GRPC_PORT_KEY, scmServiceId, scmNodeId);
+        String scmSecurityAddrKey = ConfUtils.addKeySuffixes(
+            ScmConfigKeys.OZONE_SCM_SECURITY_SERVICE_ADDRESS_KEY, scmServiceId,
+            scmNodeId);
 
         PrimitiveIterator.OfInt nodePorts = scmPorts.assign(scmNodeId);
         PrimitiveIterator.OfInt rpcPorts = scmRpcPorts.assign(scmNodeId);
         conf.set(scmAddrKey, "127.0.0.1");
         conf.set(scmHttpAddrKey, "127.0.0.1:" + nodePorts.nextInt());
         conf.set(scmHttpsAddrKey, "127.0.0.1:" + nodePorts.nextInt());
+        conf.set(scmSecurityAddrKey, "127.0.0.1:" + nodePorts.nextInt());
+        conf.set("ozone.scm.update.service.port", "0");
 
         int ratisPort = nodePorts.nextInt();
         conf.setInt(scmRatisPortKey, ratisPort);
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecretKeysApi.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecretKeysApi.java
new file mode 100644
index 0000000000..9ec8750d78
--- /dev/null
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecretKeysApi.java
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone;
+
+import org.apache.hadoop.hdds.annotation.InterfaceAudience;
+import org.apache.hadoop.hdds.conf.DefaultConfigManager;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.SCMSecurityProtocol;
+import org.apache.hadoop.hdds.scm.ScmConfig;
+import org.apache.hadoop.hdds.scm.server.SCMHTTPServerConfig;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
+import org.apache.hadoop.hdds.security.symmetric.ManagedSecretKey;
+import org.apache.hadoop.hdds.utils.HddsServerUtil;
+import org.apache.hadoop.minikdc.MiniKdc;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.ozone.test.GenericTestUtils;
+import org.apache.ratis.util.ExitUtils;
+import org.jetbrains.annotations.NotNull;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.TimeoutException;
+
+import static 
org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
+import static 
org.apache.hadoop.hdds.DFSConfigKeysLegacy.DFS_DATANODE_KERBEROS_KEYTAB_FILE_KEY;
+import static 
org.apache.hadoop.hdds.DFSConfigKeysLegacy.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED;
+import static 
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_SECRET_KEY_EXPIRY_DURATION;
+import static 
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_SECRET_KEY_ROTATE_CHECK_DURATION;
+import static 
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_SECRET_KEY_ROTATE_DURATION;
+import static 
org.apache.hadoop.hdds.scm.ScmConfig.ConfigStrings.HDDS_SCM_KERBEROS_KEYTAB_FILE_KEY;
+import static 
org.apache.hadoop.hdds.scm.ScmConfig.ConfigStrings.HDDS_SCM_KERBEROS_PRINCIPAL_KEY;
+import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY;
+import static 
org.apache.hadoop.hdds.scm.server.SCMHTTPServerConfig.ConfigStrings.HDDS_SCM_HTTP_KERBEROS_KEYTAB_FILE_KEY;
+import static 
org.apache.hadoop.hdds.scm.server.SCMHTTPServerConfig.ConfigStrings.HDDS_SCM_HTTP_KERBEROS_PRINCIPAL_KEY;
+import static 
org.apache.hadoop.hdds.security.exception.SCMSecurityException.ErrorCode.SECRET_KEY_NOT_ENABLED;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SECURITY_ENABLED_KEY;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_HTTP_KERBEROS_KEYTAB_FILE;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_HTTP_KERBEROS_PRINCIPAL_KEY;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_KERBEROS_KEYTAB_FILE_KEY;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_KERBEROS_PRINCIPAL_KEY;
+import static 
org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod.KERBEROS;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration test to verify symmetric SecretKeys APIs in a secure cluster.
+ */
[email protected]
+public final class TestSecretKeysApi {
+  private static final Logger LOG = LoggerFactory
+      .getLogger(TestSecretKeysApi.class);
+
+  @Rule
+  public Timeout timeout = Timeout.seconds(1600);
+
+  private MiniKdc miniKdc;
+  private OzoneConfiguration conf;
+  private File workDir;
+  private File ozoneKeytab;
+  private File spnegoKeytab;
+  private File testUserKeytab;
+  private String testUserPrincipal;
+  private String host;
+  private String clusterId;
+  private String scmId;
+  private MiniOzoneHAClusterImpl cluster;
+
+  @Before
+  public void init() throws Exception {
+    conf = new OzoneConfiguration();
+    conf.set(OZONE_SCM_CLIENT_ADDRESS_KEY, "localhost");
+
+    ExitUtils.disableSystemExit();
+
+    workDir = GenericTestUtils.getTestDir(getClass().getSimpleName());
+    clusterId = UUID.randomUUID().toString();
+    scmId = UUID.randomUUID().toString();
+
+    startMiniKdc();
+    setSecureConfig();
+    createCredentialsInKDC();
+  }
+
+  @After
+  public void stop() {
+    miniKdc.stop();
+    if (cluster != null) {
+      cluster.stop();
+    }
+    DefaultConfigManager.clearDefaultConfigs();
+  }
+
+  private void createCredentialsInKDC() throws Exception {
+    ScmConfig scmConfig = conf.getObject(ScmConfig.class);
+    SCMHTTPServerConfig httpServerConfig =
+        conf.getObject(SCMHTTPServerConfig.class);
+    createPrincipal(ozoneKeytab, scmConfig.getKerberosPrincipal());
+    createPrincipal(spnegoKeytab, httpServerConfig.getKerberosPrincipal());
+    createPrincipal(testUserKeytab, testUserPrincipal);
+  }
+
+  private void createPrincipal(File keytab, String... principal)
+      throws Exception {
+    miniKdc.createPrincipal(keytab, principal);
+  }
+
+  private void startMiniKdc() throws Exception {
+    Properties securityProperties = MiniKdc.createConf();
+    miniKdc = new MiniKdc(securityProperties, workDir);
+    miniKdc.start();
+  }
+
+  private void setSecureConfig() throws IOException {
+    conf.setBoolean(OZONE_SECURITY_ENABLED_KEY, true);
+    host = InetAddress.getLocalHost().getCanonicalHostName()
+        .toLowerCase();
+
+    conf.set(HADOOP_SECURITY_AUTHENTICATION, KERBEROS.name());
+
+    String curUser = UserGroupInformation.getCurrentUser().getUserName();
+    conf.set(OZONE_ADMINISTRATORS, curUser);
+
+    String realm = miniKdc.getRealm();
+    String hostAndRealm = host + "@" + realm;
+    conf.set(HDDS_SCM_KERBEROS_PRINCIPAL_KEY, "scm/" + hostAndRealm);
+    conf.set(HDDS_SCM_HTTP_KERBEROS_PRINCIPAL_KEY, "HTTP_SCM/" + hostAndRealm);
+    conf.set(OZONE_OM_KERBEROS_PRINCIPAL_KEY, "scm/" + hostAndRealm);
+    conf.set(OZONE_OM_HTTP_KERBEROS_PRINCIPAL_KEY, "HTTP_OM/" + hostAndRealm);
+    conf.set(DFS_DATANODE_KERBEROS_PRINCIPAL_KEY, "scm/" + hostAndRealm);
+
+    ozoneKeytab = new File(workDir, "scm.keytab");
+    spnegoKeytab = new File(workDir, "http.keytab");
+    testUserKeytab = new File(workDir, "testuser.keytab");
+    testUserPrincipal = "test@" + realm;
+
+    conf.set(HDDS_SCM_KERBEROS_KEYTAB_FILE_KEY,
+        ozoneKeytab.getAbsolutePath());
+    conf.set(HDDS_SCM_HTTP_KERBEROS_KEYTAB_FILE_KEY,
+        spnegoKeytab.getAbsolutePath());
+    conf.set(OZONE_OM_KERBEROS_KEYTAB_FILE_KEY,
+        ozoneKeytab.getAbsolutePath());
+    conf.set(OZONE_OM_HTTP_KERBEROS_KEYTAB_FILE,
+        spnegoKeytab.getAbsolutePath());
+    conf.set(DFS_DATANODE_KERBEROS_KEYTAB_FILE_KEY,
+        ozoneKeytab.getAbsolutePath());
+  }
+
+
+  /**
+   * Test secret key apis in happy case.
+   */
+  @Test
+  public void testSecretKeyApiSuccess() throws Exception {
+    enableBlockToken();
+    // set a low rotation period of 1s and an expiry of 3s, so we expect
+    // 3 active keys at any moment.
+    conf.set(HDDS_SECRET_KEY_ROTATE_CHECK_DURATION, "100ms");
+    conf.set(HDDS_SECRET_KEY_ROTATE_DURATION, "1s");
+    conf.set(HDDS_SECRET_KEY_EXPIRY_DURATION, "3000ms");
+
+    startCluster();
+    SCMSecurityProtocol securityProtocol = getScmSecurityProtocol();
+
+    // start the test when keys are full.
+    GenericTestUtils.waitFor(() -> {
+      try {
+        return securityProtocol.getAllSecretKeys().size() >= 3;
+      } catch (IOException ex) {
+        throw new RuntimeException(ex);
+      }
+    }, 100, 4_000);
+
+    ManagedSecretKey initialKey = securityProtocol.getCurrentSecretKey();
+    assertNotNull(initialKey);
+    List<ManagedSecretKey> initialKeys = securityProtocol.getAllSecretKeys();
+    assertEquals(initialKey, initialKeys.get(0));
+    ManagedSecretKey lastKey = initialKeys.get(initialKeys.size() - 1);
+
+    LOG.info("Initial active key: {}", initialKey);
+    LOG.info("Initial keys: {}", initialKeys);
+
+    // wait for the next rotation.
+    GenericTestUtils.waitFor(() -> {
+      try {
+        ManagedSecretKey newCurrentKey = 
securityProtocol.getCurrentSecretKey();
+        return !newCurrentKey.equals(initialKey);
+      } catch (IOException ex) {
+        throw new RuntimeException(ex);
+      }
+    }, 100, 1500);
+    ManagedSecretKey  updatedKey = securityProtocol.getCurrentSecretKey();
+    List<ManagedSecretKey>  updatedKeys = securityProtocol.getAllSecretKeys();
+
+    LOG.info("Updated active key: {}", updatedKey);
+    LOG.info("Updated keys: {}", updatedKeys);
+
+    assertEquals(updatedKey, updatedKeys.get(0));
+    assertEquals(initialKey, updatedKeys.get(1));
+    // ensure the last key from the previous cycle is no longer managed.
+    assertTrue(lastKey.isExpired());
+    assertFalse(updatedKeys.contains(lastKey));
+
+    // assert getSecretKey by ID.
+    ManagedSecretKey keyById = securityProtocol.getSecretKey(
+        updatedKey.getId());
+    assertNotNull(keyById);
+    ManagedSecretKey nonExisting = securityProtocol.getSecretKey(
+        UUID.randomUUID());
+    assertNull(nonExisting);
+  }
+
+  /**
+   * Verify API behavior when block token is not enabled.
+   */
+  @Test
+  public void testSecretKeyApiNotEnabled() throws Exception {
+    startCluster();
+    SCMSecurityProtocol securityProtocol = getScmSecurityProtocol();
+
+    SCMSecurityException ex = assertThrows(SCMSecurityException.class,
+            securityProtocol::getCurrentSecretKey);
+    assertEquals(SECRET_KEY_NOT_ENABLED, ex.getErrorCode());
+
+    ex = assertThrows(SCMSecurityException.class,
+        () -> securityProtocol.getSecretKey(UUID.randomUUID()));
+    assertEquals(SECRET_KEY_NOT_ENABLED, ex.getErrorCode());
+
+    ex = assertThrows(SCMSecurityException.class,
+        securityProtocol::getAllSecretKeys);
+    assertEquals(SECRET_KEY_NOT_ENABLED, ex.getErrorCode());
+  }
+
+  /**
+   * Verify API behavior when SCM leader fails.
+   */
+  @Test
+  public void testSecretKeyAfterSCMFailover() throws Exception {
+    enableBlockToken();
+    // set a long rotation period, so that no rotation happens during SCM
+    // leader change.
+    conf.set(HDDS_SECRET_KEY_ROTATE_CHECK_DURATION, "10m");
+    conf.set(HDDS_SECRET_KEY_ROTATE_DURATION, "1d");
+    conf.set(HDDS_SECRET_KEY_EXPIRY_DURATION, "7d");
+
+    startCluster();
+    SCMSecurityProtocol securityProtocol = getScmSecurityProtocol();
+    List<ManagedSecretKey> keysInitial = securityProtocol.getAllSecretKeys();
+    LOG.info("Keys before fail over: {}.", keysInitial);
+
+    // turn the current SCM leader off.
+    StorageContainerManager activeSCM = cluster.getActiveSCM();
+    cluster.shutdownStorageContainerManager(activeSCM);
+    // wait for a new SCM leader to be elected and ready.
+    cluster.waitForSCMToBeReady();
+
+    List<ManagedSecretKey> keysAfter = securityProtocol.getAllSecretKeys();
+    LOG.info("Keys after fail over: {}.", keysAfter);
+
+    assertEquals(keysInitial.size(), keysAfter.size());
+    for (int i = 0; i < keysInitial.size(); i++) {
+      assertEquals(keysInitial.get(i), keysAfter.get(i));
+    }
+  }
+
+  private void startCluster()
+      throws IOException, TimeoutException, InterruptedException {
+    OzoneManager.setTestSecureOmFlag(true);
+    MiniOzoneCluster.Builder builder = MiniOzoneCluster.newHABuilder(conf)
+        .setClusterId(clusterId)
+        .setSCMServiceId("TestSecretKey")
+        .setScmId(scmId)
+        .setNumDatanodes(3)
+        .setNumOfStorageContainerManagers(3)
+        .setNumOfOzoneManagers(1);
+
+    cluster = (MiniOzoneHAClusterImpl) builder.build();
+    cluster.waitForClusterToBeReady();
+  }
+
+  @NotNull
+  private SCMSecurityProtocol getScmSecurityProtocol() throws IOException {
+    UserGroupInformation ugi =
+        UserGroupInformation.loginUserFromKeytabAndReturnUGI(
+            testUserPrincipal, testUserKeytab.getCanonicalPath());
+    ugi.setAuthenticationMethod(KERBEROS);
+    SCMSecurityProtocol scmSecurityProtocolClient =
+        HddsServerUtil.getScmSecurityClient(conf, ugi);
+    assertNotNull(scmSecurityProtocolClient);
+    return scmSecurityProtocolClient;
+  }
+
+  private void enableBlockToken() {
+    conf.setBoolean(HDDS_BLOCK_TOKEN_ENABLED, true);
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to