This is an automated email from the ASF dual-hosted git repository.

duong pushed a commit to branch HDDS-7733-Symmetric-Tokens
in repository https://gitbox.apache.org/repos/asf/ozone.git

commit ba51dbacdff19371c3d2649f34645442046b604c
Author: Duong Nguyen <[email protected]>
AuthorDate: Tue May 9 11:08:24 2023 -0700

    HDDS-7945. Integrate secret keys to SCM snapshot (#4549)
---
 .../SCMSecurityProtocolFailoverProxyProvider.java  |   2 +-
 .../SingleSCMSecurityProtocolProxyProvider.java    |  55 ++++
 .../symmetric/DefaultSecretKeySignerClient.java    |  16 +-
 .../hdds/security/symmetric/SecretKeyManager.java  |   4 +
 .../hdds/security/symmetric/SecretKeyState.java    |   5 +
 .../security/symmetric/SecretKeyStateImpl.java     |   9 +
 .../apache/hadoop/hdds/utils/HddsServerUtil.java   |  11 +
 .../apache/hadoop/hdds/scm/ha/SCMHAManager.java    |   8 +
 .../hadoop/hdds/scm/ha/SCMHAManagerImpl.java       |  26 ++
 .../hadoop/hdds/scm/ha/SCMHAManagerStub.java       |   6 +
 .../apache/hadoop/hdds/scm/ha/SCMStateMachine.java |  28 +-
 .../hdds/scm/server/StorageContainerManager.java   |   2 +-
 .../hadoop/hdds/scm/ha/TestSCMHAManagerImpl.java   |   4 +-
 .../hadoop/hdds/scm/TestSCMInstallSnapshot.java    |   2 +-
 .../hadoop/ozone/MiniOzoneHAClusterImpl.java       |  10 +-
 .../org/apache/hadoop/ozone/TestSecretKeysApi.java |   2 +-
 .../ozone/scm/TestSCMInstallSnapshotWithHA.java    |   4 +-
 .../TestSecretKeySnapshot.java}                    | 306 +++++++++------------
 18 files changed, 309 insertions(+), 191 deletions(-)

diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMSecurityProtocolFailoverProxyProvider.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMSecurityProtocolFailoverProxyProvider.java
index 18d1be6b43..5a95e1ffd7 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMSecurityProtocolFailoverProxyProvider.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMSecurityProtocolFailoverProxyProvider.java
@@ -289,7 +289,7 @@ public class SCMSecurityProtocolFailoverProxyProvider implements
     for (Map.Entry<String, ProxyInfo<SCMSecurityProtocolPB>> proxy :
         scmProxies.entrySet()) {
       if (proxy.getValue() != null) {
-        RPC.stopProxy(proxy.getValue());
+        RPC.stopProxy(proxy.getValue().proxy);
       }
       scmProxies.remove(proxy.getKey());
     }
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SingleSCMSecurityProtocolProxyProvider.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SingleSCMSecurityProtocolProxyProvider.java
new file mode 100644
index 0000000000..a69522d14f
--- /dev/null
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SingleSCMSecurityProtocolProxyProvider.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.proxy;
+
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolPB;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * Proxy provider for
+ * {@link org.apache.hadoop.hdds.protocol.SCMSecurityProtocol} against a single
+ * SCM node (no fail-over).
+ */
+public class SingleSCMSecurityProtocolProxyProvider
+    extends SCMSecurityProtocolFailoverProxyProvider {
+  private final String scmNodeId;
+
+  public SingleSCMSecurityProtocolProxyProvider(
+      ConfigurationSource conf,
+      UserGroupInformation userGroupInformation,
+      String scmNodeId) {
+    super(conf, userGroupInformation);
+    this.scmNodeId = scmNodeId;
+  }
+
+  @Override
+  public synchronized String getCurrentProxySCMNodeId() {
+    return scmNodeId;
+  }
+
+  @Override
+  public synchronized void performFailover(SCMSecurityProtocolPB currentProxy) {
+    // do nothing.
+  }
+
+  @Override
+  public synchronized void performFailoverToAssignedLeader(String newLeader,
+      Exception e) {
+    // do nothing.
+  }
+}
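
For reference, the new provider is consumed through the
getScmSecurityClientSingleNode() helper added to HddsServerUtil further down
in this patch. A minimal usage sketch (placeholder variable names, no error
handling) for fetching all secret keys from one specific SCM node, with no
failover:

    // conf and scmNodeId are placeholders for the caller's configuration
    // and the id of the target SCM node.
    try (SCMSecurityProtocolClientSideTranslatorPB client =
             HddsServerUtil.getScmSecurityClientSingleNode(conf, scmNodeId,
                 UserGroupInformation.getLoginUser())) {
      List<ManagedSecretKey> keys = client.getAllSecretKeys();
    }
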
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/DefaultSecretKeySignerClient.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/DefaultSecretKeySignerClient.java
index a1056f9139..e29254c753 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/DefaultSecretKeySignerClient.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/DefaultSecretKeySignerClient.java
@@ -72,14 +72,16 @@ public class DefaultSecretKeySignerClient implements SecretKeySignerClient {
 
   @Override
   public void stop() {
-    executorService.shutdown();
-    try {
-      if (executorService.awaitTermination(1, TimeUnit.MINUTES)) {
-        executorService.shutdownNow();
+    if (executorService != null) {
+      executorService.shutdown();
+      try {
+        if (executorService.awaitTermination(1, TimeUnit.MINUTES)) {
+          executorService.shutdownNow();
+        }
+      } catch (InterruptedException e) {
+        LOG.error("Interrupted while shutting down executor service.", e);
+        Thread.currentThread().interrupt();
       }
-    } catch (InterruptedException e) {
-      LOG.error("Interrupted while shutting down executor service.", e);
-      Thread.currentThread().interrupt();
     }
   }
 
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/SecretKeyManager.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/SecretKeyManager.java
index f7a481cc05..8685a7fb52 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/SecretKeyManager.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/SecretKeyManager.java
@@ -143,6 +143,10 @@ public class SecretKeyManager implements SecretKeyClient {
     return state.getSortedKeys();
   }
 
+  public void reinitialize(List<ManagedSecretKey> secretKeys) {
+    state.reinitialize(secretKeys);
+  }
+
   private boolean shouldRotate(ManagedSecretKey currentKey) {
     Duration established = between(currentKey.getCreationTime(), Instant.now());
     return established.compareTo(rotationDuration) >= 0;
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/SecretKeyState.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/SecretKeyState.java
index 7b510a10b2..43518b901a 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/SecretKeyState.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/SecretKeyState.java
@@ -52,4 +52,9 @@ public interface SecretKeyState {
    */
   @Replicate
   void updateKeys(List<ManagedSecretKey> newKeys) throws TimeoutException;
+
+  /**
+   * Update SecretKeys from a snapshot from SCM leader.
+   */
+  void reinitialize(List<ManagedSecretKey> secretKeys);
 }
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/SecretKeyStateImpl.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/SecretKeyStateImpl.java
index 727b005d2b..b3f0ae55d9 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/SecretKeyStateImpl.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/SecretKeyStateImpl.java
@@ -106,6 +106,10 @@ public final class SecretKeyStateImpl implements SecretKeyState {
    */
   @Override
   public void updateKeys(List<ManagedSecretKey> newKeys) {
+    updateKeysInternal(newKeys);
+  }
+
+  private void updateKeysInternal(List<ManagedSecretKey> newKeys) {
     LOG.info("Updating keys with {}", newKeys);
     lock.writeLock().lock();
     try {
@@ -127,4 +131,9 @@ public final class SecretKeyStateImpl implements SecretKeyState {
       lock.writeLock().unlock();
     }
   }
+
+  @Override
+  public void reinitialize(List<ManagedSecretKey> secretKeys) {
+    updateKeysInternal(secretKeys);
+  }
 }
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java
index f120ffad33..7c896c0bf1 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
 import org.apache.hadoop.hdds.scm.proxy.SCMClientConfig;
 import org.apache.hadoop.hdds.scm.proxy.SCMSecurityProtocolFailoverProxyProvider;
+import org.apache.hadoop.hdds.scm.proxy.SingleSCMSecurityProtocolProxyProvider;
 import org.apache.hadoop.hdds.server.ServerUtils;
 import org.apache.hadoop.hdds.tracing.TracingUtil;
 import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
@@ -447,6 +448,16 @@ public final class HddsServerUtil {
             UserGroupInformation.getCurrentUser()));
   }
 
+  /**
+   * Create a scm security client to interact with a specific SCM node.
+   */
+  public static SCMSecurityProtocolClientSideTranslatorPB
+      getScmSecurityClientSingleNode(ConfigurationSource conf, String scmNodeId,
+      UserGroupInformation ugi) throws IOException {
+    return new SCMSecurityProtocolClientSideTranslatorPB(
+        new SingleSCMSecurityProtocolProxyProvider(conf, ugi, scmNodeId));
+  }
+
   public static SCMSecurityProtocolClientSideTranslatorPB
       getScmSecurityClientWithMaxRetry(OzoneConfiguration conf,
       UserGroupInformation ugi) throws IOException {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java
index 0a86de3b37..03f6ae293b 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java
@@ -20,10 +20,12 @@ package org.apache.hadoop.hdds.scm.ha;
 import org.apache.hadoop.hdds.scm.AddSCMRequest;
 import org.apache.hadoop.hdds.scm.RemoveSCMRequest;
 import org.apache.hadoop.hdds.scm.metadata.DBTransactionBuffer;
+import org.apache.hadoop.hdds.security.symmetric.ManagedSecretKey;
 import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
 import org.apache.ratis.server.protocol.TermIndex;
 
 import java.io.IOException;
+import java.util.List;
 
 /**
  * SCMHAManager provides HA service for SCM.
@@ -87,6 +89,12 @@ public interface SCMHAManager extends AutoCloseable {
    */
   DBCheckpoint downloadCheckpointFromLeader(String leaderId);
 
+  /**
+   * Get secret keys from SCM leader.
+   */
+  List<ManagedSecretKey> getSecretKeysFromLeader(String leaderID)
+      throws IOException;
+
   /**
    * Verify the SCM DB checkpoint downloaded from leader.
    *
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
index 6af0fb9919..c8efe7b261 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
@@ -20,12 +20,16 @@ package org.apache.hadoop.hdds.scm.ha;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdds.scm.AddSCMRequest;
 import org.apache.hadoop.hdds.scm.RemoveSCMRequest;
 import org.apache.hadoop.hdds.scm.metadata.DBTransactionBuffer;
 import org.apache.hadoop.hdds.scm.metadata.SCMDBTransactionBufferImpl;
 import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
+import org.apache.hadoop.hdds.scm.security.SecretKeyManagerService;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.hdds.security.symmetric.ManagedSecretKey;
+import org.apache.hadoop.hdds.security.x509.SecurityConfig;
 import org.apache.hadoop.hdds.utils.HAUtils;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -36,6 +40,7 @@ import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.OzoneSecurityUtil;
 import org.apache.hadoop.hdds.ExitManager;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.util.FileUtils;
 import org.slf4j.Logger;
@@ -44,6 +49,9 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Path;
+import java.util.List;
+
+import static org.apache.hadoop.hdds.utils.HddsServerUtil.getScmSecurityClientSingleNode;
 
 /**
  * SCMHAManagerImpl uses Apache Ratis for HA implementation. We will have 2N+1
@@ -60,6 +68,7 @@ public class SCMHAManagerImpl implements SCMHAManager {
 
   private final SCMRatisServer ratisServer;
   private final ConfigurationSource conf;
+  private final SecurityConfig securityConfig;
   private final DBTransactionBuffer transactionBuffer;
   private final SCMSnapshotProvider scmSnapshotProvider;
   private final StorageContainerManager scm;
@@ -72,8 +81,10 @@ public class SCMHAManagerImpl implements SCMHAManager {
    * Creates SCMHAManager instance.
    */
   public SCMHAManagerImpl(final ConfigurationSource conf,
+      final SecurityConfig securityConfig,
       final StorageContainerManager scm) throws IOException {
     this.conf = conf;
+    this.securityConfig = securityConfig;
     this.scm = scm;
     this.exitManager = new ExitManager();
     if (SCMHAUtils.isSCMHAEnabled(conf)) {
@@ -163,6 +174,21 @@ public class SCMHAManagerImpl implements SCMHAManager {
     return dBCheckpoint;
   }
 
+  @Override
+  public List<ManagedSecretKey> getSecretKeysFromLeader(String leaderID)
+      throws IOException {
+    if (!SecretKeyManagerService.isSecretKeyEnable(securityConfig)) {
+      return null;
+    }
+
+    LOG.info("Getting secret keys from leader {}.", leaderID);
+    try (SCMSecurityProtocolClientSideTranslatorPB securityProtocol =
+             getScmSecurityClientSingleNode(conf, leaderID,
+                 UserGroupInformation.getLoginUser())) {
+      return securityProtocol.getAllSecretKeys();
+    }
+  }
+
   @Override
   public TermIndex verifyCheckpointFromLeader(String leaderId,
                                               DBCheckpoint checkpoint) {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerStub.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerStub.java
index 6382df3483..c15d805fb1 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerStub.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerStub.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;
 import org.apache.hadoop.hdds.scm.AddSCMRequest;
 import org.apache.hadoop.hdds.scm.RemoveSCMRequest;
 import org.apache.hadoop.hdds.scm.metadata.DBTransactionBuffer;
+import org.apache.hadoop.hdds.security.symmetric.ManagedSecretKey;
 import org.apache.hadoop.hdds.utils.IOUtils;
 import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
 import org.apache.hadoop.hdds.utils.db.DBStore;
@@ -148,6 +149,11 @@ public final class SCMHAManagerStub implements SCMHAManager {
     return null;
   }
 
+  @Override
+  public List<ManagedSecretKey> getSecretKeysFromLeader(String leaderID) {
+    return null;
+  }
+
   @Override
   public TermIndex verifyCheckpointFromLeader(String leaderId,
                                               DBCheckpoint checkpoint) {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
index c88331db98..9384661537 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdds.scm.ha;
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.util.EnumMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Collection;
 import java.util.Optional;
@@ -37,6 +38,7 @@ import org.apache.hadoop.hdds.scm.block.DeletedBlockLogImpl;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.hdds.security.symmetric.ManagedSecretKey;
 import org.apache.hadoop.hdds.utils.TransactionInfo;
 import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
 import org.apache.hadoop.util.concurrent.HadoopExecutors;
@@ -62,6 +64,8 @@ import org.apache.ratis.util.LifeCycle;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static java.util.Objects.requireNonNull;
+
 /**
  * The SCMStateMachine is the state machine for SCMRatisServer. It is
  * responsible for applying ratis committed transactions to
@@ -83,6 +87,7 @@ public class SCMStateMachine extends BaseStateMachine {
   // ensures serializable between notifyInstallSnapshotFromLeader()
   // and reinitialize().
   private DBCheckpoint installingDBCheckpoint = null;
+  private List<ManagedSecretKey> installingSecretKeys = null;
 
   private AtomicLong currentLeaderTerm = new AtomicLong(-1L);
   private AtomicBoolean refreshedAfterLeaderReady = new AtomicBoolean(false);
@@ -243,12 +248,23 @@ public class SCMStateMachine extends BaseStateMachine {
             return null;
           }
 
+          List<ManagedSecretKey> secretKeys;
+          try {
+            secretKeys =
+                scm.getScmHAManager().getSecretKeysFromLeader(leaderNodeId);
+            LOG.info("Got secret keys from leaders {}", secretKeys);
+          } catch (IOException ex) {
+            LOG.error("Failed to get secret keys from SCM leader {}",
+                leaderNodeId, ex);
+            return null;
+          }
+
           TermIndex termIndex =
               scm.getScmHAManager().verifyCheckpointFromLeader(
                   leaderNodeId, checkpoint);
 
           if (termIndex != null) {
-            setInstallingDBCheckpoint(checkpoint);
+            setInstallingSnapshotData(checkpoint, secretKeys);
           }
           return termIndex;
         },
@@ -381,9 +397,11 @@ public class SCMStateMachine extends BaseStateMachine {
   public void reinitialize() throws IOException {
     Preconditions.checkNotNull(installingDBCheckpoint);
     DBCheckpoint checkpoint = installingDBCheckpoint;
+    List<ManagedSecretKey> secretKeys = installingSecretKeys;
 
     // explicitly set installingDBCheckpoint to be null
     installingDBCheckpoint = null;
+    installingSecretKeys = null;
 
     TermIndex termIndex = null;
     try {
@@ -402,6 +420,10 @@ public class SCMStateMachine extends BaseStateMachine {
       LOG.error("Failed to unpause ", ioe);
     }
 
+    if (secretKeys != null) {
+      requireNonNull(scm.getSecretKeyManager()).reinitialize(secretKeys);
+    }
+
     getLifeCycle().transition(LifeCycle.State.STARTING);
     getLifeCycle().transition(LifeCycle.State.RUNNING);
   }
@@ -425,8 +447,10 @@ public class SCMStateMachine extends BaseStateMachine {
   }
 
   @VisibleForTesting
-  public void setInstallingDBCheckpoint(DBCheckpoint checkpoint) {
+  public void setInstallingSnapshotData(DBCheckpoint checkpoint,
+      List<ManagedSecretKey> secretKeys) {
     Preconditions.checkArgument(installingDBCheckpoint == null);
     installingDBCheckpoint = checkpoint;
+    installingSecretKeys = secretKeys;
   }
 }
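
For reference, the follower-side sequence these SCMStateMachine changes wire
together, sketched after the way the updated tests drive it (checkpoint and
secretKeys stand in for the data fetched from the leader):

    SCMStateMachine sm =
        scm.getScmHAManager().getRatisServer().getSCMStateMachine();
    sm.pause();
    // Stash the DB checkpoint together with the leader's secret keys.
    sm.setInstallingSnapshotData(checkpoint, secretKeys);
    // Reloads the DB from the checkpoint and, when secretKeys is non-null,
    // calls scm.getSecretKeyManager().reinitialize(secretKeys).
    sm.reinitialize();
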
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 467a406f7e..a3f6f55493 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -632,7 +632,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
     if (configurator.getSCMHAManager() != null) {
       scmHAManager = configurator.getSCMHAManager();
     } else {
-      scmHAManager = new SCMHAManagerImpl(conf, this);
+      scmHAManager = new SCMHAManagerImpl(conf, securityConfig, this);
     }
 
     if (configurator.getLeaseManager() != null) {
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestSCMHAManagerImpl.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestSCMHAManagerImpl.java
index 11209217cc..8ca660491e 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestSCMHAManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestSCMHAManagerImpl.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hdds.scm.server.SCMConfigurator;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeProtocolServer;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.hdds.scm.server.upgrade.FinalizationManager;
+import org.apache.hadoop.hdds.security.x509.SecurityConfig;
 import org.apache.hadoop.hdds.security.x509.certificate.client
     .CertificateClient;
 import org.apache.hadoop.hdds.utils.TransactionInfo;
@@ -254,7 +255,8 @@ class TestSCMHAManagerImpl {
     when(nodeDetails.getRatisHostPortStr()).thenReturn("localhost:" +
         conf.get(ScmConfigKeys.OZONE_SCM_RATIS_PORT_KEY));
 
-    final SCMHAManager manager = new SCMHAManagerImpl(conf, scm);
+    final SCMHAManager manager = new SCMHAManagerImpl(conf,
+        new SecurityConfig(conf), scm);
     when(scm.getScmHAManager()).thenReturn(manager);
     return scm;
   }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMInstallSnapshot.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMInstallSnapshot.java
index cd8a7d2e68..407ae94664 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMInstallSnapshot.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMInstallSnapshot.java
@@ -151,7 +151,7 @@ public class TestSCMInstallSnapshot {
     SCMStateMachine sm =
         scm.getScmHAManager().getRatisServer().getSCMStateMachine();
     sm.pause();
-    sm.setInstallingDBCheckpoint(checkpoint);
+    sm.setInstallingSnapshotData(checkpoint, null);
     sm.reinitialize();
 
     Assert.assertNotNull(
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java
index 26d31717e3..717964e122 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java
@@ -171,6 +171,12 @@ public class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl {
     return this.scmhaService.getServiceByIndex(index);
   }
 
+  public StorageContainerManager getScmLeader() {
+    return getStorageContainerManagers().stream()
+        .filter(StorageContainerManager::checkLeader)
+        .findFirst().orElse(null);
+  }
+
   private OzoneManager getOMLeader(boolean waitForLeaderElection)
       throws TimeoutException, InterruptedException {
     if (waitForLeaderElection) {
@@ -701,9 +707,7 @@ public class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl {
         conf.set(ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY,
             "127.0.0.1:" + blockPort);
 
-        if (i <= numOfActiveSCMs) {
-          scmPorts.release(scmNodeId);
-        }
+        scmPorts.release(scmNodeId);
         scmRpcPorts.release(scmNodeId);
       }
 
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecretKeysApi.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecretKeysApi.java
index 217b08e728..c2bfbb0e15 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecretKeysApi.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecretKeysApi.java
@@ -85,7 +85,7 @@ public final class TestSecretKeysApi {
       .getLogger(TestSecretKeysApi.class);
 
   @Rule
-  public Timeout timeout = Timeout.seconds(180);
+  public Timeout timeout = Timeout.seconds(500);
 
   private MiniKdc miniKdc;
   private OzoneConfiguration conf;
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMInstallSnapshotWithHA.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMInstallSnapshotWithHA.java
index 3b8ad5faf3..74868bee2a 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMInstallSnapshotWithHA.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMInstallSnapshotWithHA.java
@@ -279,8 +279,8 @@ public class TestSCMInstallSnapshotWithHA {
         s == LifeCycle.State.NEW || s.isPausingOrPaused());
 
     // Verify correct reloading
-    followerSM.setInstallingDBCheckpoint(
-        new RocksDBCheckpoint(checkpointBackup.toPath()));
+    followerSM.setInstallingSnapshotData(
+        new RocksDBCheckpoint(checkpointBackup.toPath()), null);
     followerSM.reinitialize();
     Assert.assertEquals(followerSM.getLastAppliedTermIndex(),
         leaderCheckpointTrxnInfo.getTermIndex());
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecretKeysApi.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSecretKeySnapshot.java
similarity index 51%
copy from hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecretKeysApi.java
copy to hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSecretKeySnapshot.java
index 217b08e728..410fc5bd65 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecretKeysApi.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSecretKeySnapshot.java
@@ -15,35 +15,37 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.ozone;
+package org.apache.hadoop.ozone.scm;
 
-import org.apache.hadoop.hdds.annotation.InterfaceAudience;
+import org.apache.hadoop.hdds.client.RatisReplicationConfig;
 import org.apache.hadoop.hdds.conf.DefaultConfigManager;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.protocol.SCMSecurityProtocol;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.scm.ScmConfig;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.ha.SCMStateMachine;
 import org.apache.hadoop.hdds.scm.server.SCMHTTPServerConfig;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
-import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
 import org.apache.hadoop.hdds.security.symmetric.ManagedSecretKey;
-import org.apache.hadoop.hdds.utils.HddsServerUtil;
+import org.apache.hadoop.hdds.security.symmetric.SecretKeyManager;
 import org.apache.hadoop.minikdc.MiniKdc;
-import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.ozone.test.GenericTestUtils;
 import org.apache.ratis.util.ExitUtils;
-import org.jetbrains.annotations.NotNull;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.Timeout;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
 import java.net.InetAddress;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
 import java.util.UUID;
@@ -61,7 +63,6 @@ import static org.apache.hadoop.hdds.scm.ScmConfig.ConfigStrings.HDDS_SCM_KERBER
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY;
 import static org.apache.hadoop.hdds.scm.server.SCMHTTPServerConfig.ConfigStrings.HDDS_SCM_HTTP_KERBEROS_KEYTAB_FILE_KEY;
 import static org.apache.hadoop.hdds.scm.server.SCMHTTPServerConfig.ConfigStrings.HDDS_SCM_HTTP_KERBEROS_PRINCIPAL_KEY;
-import static org.apache.hadoop.hdds.security.exception.SCMSecurityException.ErrorCode.SECRET_KEY_NOT_ENABLED;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SECURITY_ENABLED_KEY;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_HTTP_KERBEROS_KEYTAB_FILE;
@@ -69,36 +70,36 @@ import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_HTTP_KERBEROS_PRI
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_KERBEROS_KEYTAB_FILE_KEY;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_KERBEROS_PRINCIPAL_KEY;
 import static org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod.KERBEROS;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertThrows;
-import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
- * Integration test to verify symmetric SecretKeys APIs in a secure cluster.
+ * Integration test to verify that symmetric secret keys are correctly
+ * synchronized from leader to follower during snapshot installation.
  */
[email protected]
-public final class TestSecretKeysApi {
+@Timeout(500)
+public final class TestSecretKeySnapshot {
   private static final Logger LOG = LoggerFactory
-      .getLogger(TestSecretKeysApi.class);
-
-  @Rule
-  public Timeout timeout = Timeout.seconds(180);
+      .getLogger(TestSecretKeySnapshot.class);
+  private static final long SNAPSHOT_THRESHOLD = 100;
+  private static final int LOG_PURGE_GAP = 100;
+  public static final int ROTATE_CHECK_DURATION_MS = 1_000;
+  public static final int ROTATE_DURATION_MS = 30_000;
+  public static final int EXPIRY_DURATION_MS = 61_000;
 
   private MiniKdc miniKdc;
   private OzoneConfiguration conf;
   private File workDir;
   private File ozoneKeytab;
   private File spnegoKeytab;
-  private File testUserKeytab;
-  private String testUserPrincipal;
+  private String host;
   private String clusterId;
   private String scmId;
   private MiniOzoneHAClusterImpl cluster;
 
-  @Before
+  @BeforeEach
   public void init() throws Exception {
     conf = new OzoneConfiguration();
     conf.set(OZONE_SCM_CLIENT_ADDRESS_KEY, "localhost");
@@ -112,9 +113,32 @@ public final class TestSecretKeysApi {
     startMiniKdc();
     setSecureConfig();
     createCredentialsInKDC();
+
+    conf.setBoolean(ScmConfigKeys.OZONE_SCM_HA_RAFT_LOG_PURGE_ENABLED, true);
+    conf.setInt(ScmConfigKeys.OZONE_SCM_HA_RAFT_LOG_PURGE_GAP, LOG_PURGE_GAP);
+    conf.setLong(ScmConfigKeys.OZONE_SCM_HA_RATIS_SNAPSHOT_THRESHOLD,
+        SNAPSHOT_THRESHOLD);
+
+    conf.set(HDDS_SECRET_KEY_ROTATE_CHECK_DURATION,
+        ROTATE_CHECK_DURATION_MS + "ms");
+    conf.set(HDDS_SECRET_KEY_ROTATE_DURATION, ROTATE_DURATION_MS + "ms");
+    conf.set(HDDS_SECRET_KEY_EXPIRY_DURATION, EXPIRY_DURATION_MS + "ms");
+
+    MiniOzoneCluster.Builder builder = MiniOzoneCluster.newHABuilder(conf)
+        .setClusterId(clusterId)
+        .setSCMServiceId("TestSecretKeySnapshot")
+        .setScmId(scmId)
+        .setSCMServiceId("SCMServiceId")
+        .setNumDatanodes(1)
+        .setNumOfStorageContainerManagers(3)
+        .setNumOfActiveSCMs(2)
+        .setNumOfOzoneManagers(1);
+
+    cluster = (MiniOzoneHAClusterImpl) builder.build();
+    cluster.waitForClusterToBeReady();
   }
 
-  @After
+  @AfterEach
   public void stop() {
     miniKdc.stop();
     if (cluster != null) {
@@ -129,7 +153,6 @@ public final class TestSecretKeysApi {
         conf.getObject(SCMHTTPServerConfig.class);
     createPrincipal(ozoneKeytab, scmConfig.getKerberosPrincipal());
     createPrincipal(spnegoKeytab, httpServerConfig.getKerberosPrincipal());
-    createPrincipal(testUserKeytab, testUserPrincipal);
   }
 
   private void createPrincipal(File keytab, String... principal)
@@ -145,7 +168,7 @@ public final class TestSecretKeysApi {
 
   private void setSecureConfig() throws IOException {
     conf.setBoolean(OZONE_SECURITY_ENABLED_KEY, true);
-    String host = InetAddress.getLocalHost().getCanonicalHostName()
+    host = InetAddress.getLocalHost().getCanonicalHostName()
         .toLowerCase();
 
     conf.set(HADOOP_SECURITY_AUTHENTICATION, KERBEROS.name());
@@ -163,8 +186,6 @@ public final class TestSecretKeysApi {
 
     ozoneKeytab = new File(workDir, "scm.keytab");
     spnegoKeytab = new File(workDir, "http.keytab");
-    testUserKeytab = new File(workDir, "testuser.keytab");
-    testUserPrincipal = "test@" + realm;
 
     conf.set(HDDS_SCM_KERBEROS_KEYTAB_FILE_KEY,
         ozoneKeytab.getAbsolutePath());
@@ -176,152 +197,93 @@ public final class TestSecretKeysApi {
         spnegoKeytab.getAbsolutePath());
     conf.set(DFS_DATANODE_KERBEROS_KEYTAB_FILE_KEY,
         ozoneKeytab.getAbsolutePath());
-  }
 
-  /**
-   * Test secret key apis in happy case.
-   */
-  @Test
-  public void testSecretKeyApiSuccess() throws Exception {
-    enableBlockToken();
-    // set a low rotation period, of 1s, expiry is 3s, expect 3 active keys
-    // at any moment.
-    conf.set(HDDS_SECRET_KEY_ROTATE_CHECK_DURATION, "100ms");
-    conf.set(HDDS_SECRET_KEY_ROTATE_DURATION, "1s");
-    conf.set(HDDS_SECRET_KEY_EXPIRY_DURATION, "3000ms");
-
-    startCluster();
-    SCMSecurityProtocol securityProtocol = getScmSecurityProtocol();
-
-    // start the test when keys are full.
-    GenericTestUtils.waitFor(() -> {
-      try {
-        return securityProtocol.getAllSecretKeys().size() >= 3;
-      } catch (IOException ex) {
-        throw new RuntimeException(ex);
-      }
-    }, 100, 4_000);
-
-    ManagedSecretKey initialKey = securityProtocol.getCurrentSecretKey();
-    assertNotNull(initialKey);
-    List<ManagedSecretKey> initialKeys = securityProtocol.getAllSecretKeys();
-    assertEquals(initialKey, initialKeys.get(0));
-    ManagedSecretKey lastKey = initialKeys.get(initialKeys.size() - 1);
-
-    LOG.info("Initial active key: {}", initialKey);
-    LOG.info("Initial keys: {}", initialKeys);
-
-    // wait for the next rotation.
-    GenericTestUtils.waitFor(() -> {
-      try {
-        ManagedSecretKey newCurrentKey = securityProtocol.getCurrentSecretKey();
-        return !newCurrentKey.equals(initialKey);
-      } catch (IOException ex) {
-        throw new RuntimeException(ex);
-      }
-    }, 100, 1500);
-    ManagedSecretKey  updatedKey = securityProtocol.getCurrentSecretKey();
-    List<ManagedSecretKey>  updatedKeys = securityProtocol.getAllSecretKeys();
-
-    LOG.info("Updated active key: {}", updatedKey);
-    LOG.info("Updated keys: {}", updatedKeys);
-
-    assertEquals(updatedKey, updatedKeys.get(0));
-    assertEquals(initialKey, updatedKeys.get(1));
-    // ensure the last key from the previous cycle no longer managed.
-    assertTrue(lastKey.isExpired());
-    assertFalse(updatedKeys.contains(lastKey));
-
-    // assert getSecretKey by ID.
-    ManagedSecretKey keyById = securityProtocol.getSecretKey(
-        updatedKey.getId());
-    assertNotNull(keyById);
-    ManagedSecretKey nonExisting = securityProtocol.getSecretKey(
-        UUID.randomUUID());
-    assertNull(nonExisting);
+    conf.setBoolean(HDDS_BLOCK_TOKEN_ENABLED, true);
   }
 
-  /**
-   * Verify API behavior when block token is not enable.
-   */
   @Test
-  public void testSecretKeyApiNotEnabled() throws Exception {
-    startCluster();
-    SCMSecurityProtocol securityProtocol = getScmSecurityProtocol();
-
-    SCMSecurityException ex = assertThrows(SCMSecurityException.class,
-            securityProtocol::getCurrentSecretKey);
-    assertEquals(SECRET_KEY_NOT_ENABLED, ex.getErrorCode());
+  public void testInstallSnapshot() throws Exception {
+    // Get the leader SCM
+    StorageContainerManager leaderSCM = cluster.getScmLeader();
+    assertNotNull(leaderSCM);
+    // Find the inactive SCM
+    String followerId = cluster.getInactiveSCM().next().getSCMNodeId();
+
+    StorageContainerManager followerSCM = cluster.getSCM(followerId);
+
+    // wait until leader SCM got enough secret keys.
+    SecretKeyManager leaderSecretKeyManager = leaderSCM.getSecretKeyManager();
+    GenericTestUtils.waitFor(
+        () -> leaderSecretKeyManager.getSortedKeys().size() >= 2,
+        ROTATE_CHECK_DURATION_MS, EXPIRY_DURATION_MS);
+
+    writeToIncreaseLogIndex(leaderSCM, 200);
+    ManagedSecretKey currentKeyInLeader =
+        leaderSecretKeyManager.getCurrentSecretKey();
+
+    // Start the inactive SCM. Install Snapshot will happen as part
+    // of setConfiguration() call to ratis leader and the follower will catch
+    // up
+    LOG.info("Starting follower...");
+    cluster.startInactiveSCM(followerId);
+
+    // The recently started follower should be lagging behind the leader.
+    SCMStateMachine followerSM =
+        followerSCM.getScmHAManager().getRatisServer().getSCMStateMachine();
+
+    // Wait & retry for follower to update transactions to leader
+    // snapshot index.
+    // Timeout error if follower does not load update within 3s
+    GenericTestUtils.waitFor(() ->
+        followerSM.getLastAppliedTermIndex().getIndex() >= 200,
+        100, 3000);
+    long followerLastAppliedIndex =
+        followerSM.getLastAppliedTermIndex().getIndex();
+    assertTrue(followerLastAppliedIndex >= 200);
+    assertFalse(followerSM.getLifeCycleState().isPausingOrPaused());
+
+    // Verify that the follower has the secret keys created
+    // while it was inactive.
+    SecretKeyManager followerSecretKeyManager =
+        followerSCM.getSecretKeyManager();
+    assertTrue(followerSecretKeyManager.isInitialized());
+    List<ManagedSecretKey> followerKeys =
+        followerSecretKeyManager.getSortedKeys();
+    LOG.info("Follower secret keys after snapshot: {}", followerKeys);
+    assertTrue(followerKeys.size() >= 2);
+    assertTrue(followerKeys.contains(currentKeyInLeader));
+    assertEquals(leaderSecretKeyManager.getSortedKeys(), followerKeys);
+
+    // Wait for the next rotation, assert that the updates can be synchronized
+    // normally post snapshot.
+    ManagedSecretKey currentKeyPostSnapshot =
+        leaderSecretKeyManager.getCurrentSecretKey();
+    GenericTestUtils.waitFor(() ->
+            !leaderSecretKeyManager.getCurrentSecretKey()
+                .equals(currentKeyPostSnapshot),
+        ROTATE_CHECK_DURATION_MS, ROTATE_DURATION_MS);
+    assertEquals(leaderSecretKeyManager.getSortedKeys(),
+        followerSecretKeyManager.getSortedKeys());
 
-    ex = assertThrows(SCMSecurityException.class,
-        () -> securityProtocol.getSecretKey(UUID.randomUUID()));
-    assertEquals(SECRET_KEY_NOT_ENABLED, ex.getErrorCode());
-
-    ex = assertThrows(SCMSecurityException.class,
-        securityProtocol::getAllSecretKeys);
-    assertEquals(SECRET_KEY_NOT_ENABLED, ex.getErrorCode());
   }
 
-  /**
-   * Verify API behavior when SCM leader fails.
-   */
-  @Test
-  public void testSecretKeyAfterSCMFailover() throws Exception {
-    enableBlockToken();
-    // set a long duration period, so that no rotation happens during SCM
-    // leader change.
-    conf.set(HDDS_SECRET_KEY_ROTATE_CHECK_DURATION, "10m");
-    conf.set(HDDS_SECRET_KEY_ROTATE_DURATION, "1d");
-    conf.set(HDDS_SECRET_KEY_EXPIRY_DURATION, "7d");
-
-    startCluster();
-    SCMSecurityProtocol securityProtocol = getScmSecurityProtocol();
-    List<ManagedSecretKey> keysInitial = securityProtocol.getAllSecretKeys();
-    LOG.info("Keys before fail over: {}.", keysInitial);
-
-    // turn the current SCM leader off.
-    StorageContainerManager activeSCM = cluster.getActiveSCM();
-    cluster.shutdownStorageContainerManager(activeSCM);
-    // wait for
-    cluster.waitForSCMToBeReady();
-
-    List<ManagedSecretKey> keysAfter = securityProtocol.getAllSecretKeys();
-    LOG.info("Keys after fail over: {}.", keysAfter);
-
-    assertEquals(keysInitial.size(), keysAfter.size());
-    for (int i = 0; i < keysInitial.size(); i++) {
-      assertEquals(keysInitial.get(i), keysAfter.get(i));
+  private List<ContainerInfo> writeToIncreaseLogIndex(
+      StorageContainerManager scm, long targetLogIndex)
+      throws IOException, InterruptedException, TimeoutException {
+    List<ContainerInfo> containers = new ArrayList<>();
+    SCMStateMachine stateMachine =
+        scm.getScmHAManager().getRatisServer().getSCMStateMachine();
+    long logIndex = scm.getScmHAManager().getRatisServer().getSCMStateMachine()
+        .getLastAppliedTermIndex().getIndex();
+    while (logIndex <= targetLogIndex) {
+      containers.add(scm.getContainerManager()
+          .allocateContainer(
+              RatisReplicationConfig.getInstance(ReplicationFactor.ONE),
+              this.getClass().getName()));
+      Thread.sleep(100);
+      logIndex = stateMachine.getLastAppliedTermIndex().getIndex();
     }
+    return containers;
   }
 
-  private void startCluster()
-      throws IOException, TimeoutException, InterruptedException {
-    OzoneManager.setTestSecureOmFlag(true);
-    MiniOzoneCluster.Builder builder = MiniOzoneCluster.newHABuilder(conf)
-        .setClusterId(clusterId)
-        .setSCMServiceId("TestSecretKey")
-        .setScmId(scmId)
-        .setNumDatanodes(3)
-        .setNumOfStorageContainerManagers(3)
-        .setNumOfOzoneManagers(1);
-
-    cluster = (MiniOzoneHAClusterImpl) builder.build();
-    cluster.waitForClusterToBeReady();
-  }
-
-  @NotNull
-  private SCMSecurityProtocol getScmSecurityProtocol() throws IOException {
-    UserGroupInformation ugi =
-        UserGroupInformation.loginUserFromKeytabAndReturnUGI(
-            testUserPrincipal, testUserKeytab.getCanonicalPath());
-    ugi.setAuthenticationMethod(KERBEROS);
-    SCMSecurityProtocol scmSecurityProtocolClient =
-        HddsServerUtil.getScmSecurityClient(conf, ugi);
-    assertNotNull(scmSecurityProtocolClient);
-    return scmSecurityProtocolClient;
-  }
-
-  private void enableBlockToken() {
-    conf.setBoolean(HDDS_BLOCK_TOKEN_ENABLED, true);
-  }
 }

