This is an automated email from the ASF dual-hosted git repository.

duong pushed a commit to branch HDDS-7733-Symmetric-Tokens
in repository https://gitbox.apache.org/repos/asf/ozone.git

commit 43aaa9545a6814c99a2ea5bc6af31133ebefc34c
Author: Duong Nguyen <[email protected]>
AuthorDate: Fri Mar 3 09:01:35 2023 -0800

    HDDS-7734. Implement symmetric SecretKeys lifecycle management in SCM 
(#4194)
---
 .../org/apache/hadoop/hdds/HddsConfigKeys.java     |  22 +++
 .../common/src/main/resources/ozone-default.xml    |  48 +++++
 .../security/symmetric/LocalSecretKeyStore.java    | 199 +++++++++++++++++++
 .../hdds/security/symmetric/ManagedSecretKey.java  | 122 ++++++++++++
 .../hdds/security/symmetric/SecretKeyConfig.java   |  99 ++++++++++
 .../hdds/security/symmetric/SecretKeyManager.java  | 155 +++++++++++++++
 .../hdds/security/symmetric/SecretKeyState.java    |  52 +++++
 .../security/symmetric/SecretKeyStateImpl.java     | 108 +++++++++++
 .../hdds/security/symmetric/SecretKeyStore.java    |  35 ++++
 .../hdds/security/symmetric/package-info.java      |  63 ++++++
 .../symmetric/LocalSecretKeyStoreTest.java         | 188 ++++++++++++++++++
 .../security/symmetric/SecretKeyManagerTest.java   | 215 +++++++++++++++++++++
 .../src/main/proto/SCMRatisProtocol.proto          |   1 +
 .../src/main/proto/ScmServerSecurityProtocol.proto |   9 +
 .../apache/hadoop/hdds/scm/ha/io/CodecFactory.java |   2 +
 .../apache/hadoop/hdds/scm/ha/io/ListCodec.java    |   6 +-
 .../hdds/scm/ha/io/ManagedSecretKeyCodec.java      |  44 +++++
 .../scm/security/ScmSecretKeyStateBuilder.java     |  60 ++++++
 .../hdds/scm/security/SecretKeyManagerService.java | 159 +++++++++++++++
 .../hadoop/hdds/scm/security/package-info.java     |  22 +++
 .../hdds/scm/server/StorageContainerManager.java   |   9 +
 .../src/main/compose/ozonesecure-ha/docker-config  |   5 +
 22 files changed, 1622 insertions(+), 1 deletion(-)

diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
index 2c6b3ee62a..4a55d6709b 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
@@ -216,6 +216,28 @@ public final class HddsConfigKeys {
   public static final String HDDS_X509_ROOTCA_PRIVATE_KEY_FILE_DEFAULT =
       "";
 
+  public static final String HDDS_SECRET_KEY_FILE =
+      "hdds.secret.key.file.name";
+  public static final String HDDS_SECRET_KEY_FILE_DEFAULT = "secret_keys.json";
+
+  public static final String HDDS_SECRET_KEY_EXPIRY_DURATION =
+      "hdds.secret.key.expiry.duration";
+  public static final String HDDS_SECRET_KEY_EXPIRY_DURATION_DEFAULT = "7d";
+
+  public static final String HDDS_SECRET_KEY_ROTATE_DURATION =
+      "hdds.secret.key.rotate.duration";
+  public static final String HDDS_SECRET_KEY_ROTATE_DURATION_DEFAULT = "1d";
+
+  public static final String HDDS_SECRET_KEY_ALGORITHM =
+      "hdds.secret.key.algorithm";
+  public static final String HDDS_SECRET_KEY_ALGORITHM_DEFAULT =
+      "HmacSHA256";
+
+  public static final String HDDS_SECRET_KEY_ROTATE_CHECK_DURATION =
+      "hdds.secret.key.rotate.check.duration";
+  public static final String HDDS_SECRET_KEY_ROTATE_CHECK_DURATION_DEFAULT
+      = "10m";
+
   /**
    * Do not instantiate.
    */
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml 
b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 56451ab83f..848b21e707 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -3799,4 +3799,52 @@
       Max numbers of keys changed allowed for a snapshot diff job.
     </description>
   </property>
+
+  <property>
+    <name>hdds.secret.key.file.name</name>
+    <value>secret_keys.json</value>
+    <tag>SCM, SECURITY</tag>
+    <description>
+      Name of file which stores symmetric secret keys for token signatures.
+    </description>
+  </property>
+  <property>
+    <name>hdds.secret.key.expiry.duration</name>
+    <value>7d</value>
+    <tag>SCM, SECURITY</tag>
+    <description>
+      The duration for which symmetric secret keys issued by SCM are valid.
+      This default value, in combination with 
hdds.secret.key.rotate.duration=1d, results in 7 secret keys (for the
+      last 7 days) being kept valid at any point in time.
+    </description>
+  </property>
+  <property>
+    <name>hdds.secret.key.rotate.duration</name>
+    <value>1d</value>
+    <tag>SCM, SECURITY</tag>
+    <description>
+      The interval at which SCM periodically generates a new symmetric secret key.
+    </description>
+  </property>
+  <property>
+    <name>hdds.secret.key.rotate.check.duration</name>
+    <value>10m</value>
+    <tag>SCM, SECURITY</tag>
+    <description>
+      The duration that SCM periodically checks if it's time to generate new 
symmetric secret keys.
+      This config has an impact on the practical correctness of secret key 
expiry and rotation period. For example,
+      if hdds.secret.key.rotate.duration=1d and 
hdds.secret.key.rotate.check.duration=10m, the actual key rotation
+      will happen every 1d +/- 10m.
+    </description>
+  </property>
+  <property>
+    <name>hdds.secret.key.algorithm</name>
+    <value>HmacSHA256</value>
+    <tag>SCM, SECURITY</tag>
+    <description>
+      The algorithm that SCM uses to generate symmetric secret keys.
+      A valid algorithm is the one supported by KeyGenerator, as described at
+      
https://docs.oracle.com/javase/8/docs/technotes/guides/security/StandardNames.html#KeyGenerator.
+    </description>
+  </property>
 </configuration>
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/LocalSecretKeyStore.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/LocalSecretKeyStore.java
new file mode 100644
index 0000000000..48cc633b67
--- /dev/null
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/LocalSecretKeyStore.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.security.symmetric;
+
+import com.fasterxml.jackson.databind.MappingIterator;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectReader;
+import com.fasterxml.jackson.databind.SequenceWriter;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.crypto.SecretKey;
+import javax.crypto.spec.SecretKeySpec;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.attribute.PosixFilePermission;
+import java.time.Instant;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import static com.google.common.collect.Sets.newHashSet;
+import static java.nio.file.Files.createDirectories;
+import static java.nio.file.Files.createFile;
+import static java.nio.file.Files.exists;
+import static java.nio.file.attribute.PosixFilePermission.OWNER_READ;
+import static java.nio.file.attribute.PosixFilePermission.OWNER_WRITE;
+import static java.util.Objects.requireNonNull;
+import static java.util.stream.Collectors.toList;
+
+/**
+ * A {@link SecretKeyStore} that saves and loads SecretKeys to/from a
+ * JSON file on local file system.
+ */
+public class LocalSecretKeyStore implements SecretKeyStore {
+  private static final Set<PosixFilePermission> SECRET_KEYS_PERMISSIONS =
+      newHashSet(OWNER_READ, OWNER_WRITE);
+  private static final Logger LOG =
+      LoggerFactory.getLogger(LocalSecretKeyStore.class);
+
+  private final Path secretKeysFile;
+  private final ObjectMapper mapper;
+
+  public LocalSecretKeyStore(Path secretKeysFile) {
+    this.secretKeysFile = requireNonNull(secretKeysFile);
+    this.mapper = new ObjectMapper()
+        .registerModule(new JavaTimeModule())
+        .configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
+  }
+
+  @Override
+  public synchronized List<ManagedSecretKey> load() {
+    if (!secretKeysFile.toFile().exists()) {
+      return Collections.emptyList();
+    }
+
+    ObjectReader reader = mapper.readerFor(ManagedSecretKeyDto.class);
+    try (MappingIterator<ManagedSecretKeyDto> iterator =
+             reader.readValues(secretKeysFile.toFile())) {
+      List<ManagedSecretKeyDto> dtos = iterator.readAll();
+      List<ManagedSecretKey> result = dtos.stream()
+          .map(ManagedSecretKeyDto::toObject)
+          .collect(toList());
+      LOG.info("Loaded {} from {}", result, secretKeysFile);
+      return result;
+    } catch (IOException e) {
+      throw new IllegalStateException("Error reading SecretKeys from "
+          + secretKeysFile, e);
+    }
+  }
+
+  @Override
+  public synchronized void save(Collection<ManagedSecretKey> secretKeys) {
+    createSecretKeyFiles();
+
+    List<ManagedSecretKeyDto> dtos = secretKeys.stream()
+        .map(ManagedSecretKeyDto::new)
+        .collect(toList());
+
+    try (SequenceWriter writer =
+             mapper.writer().writeValues(secretKeysFile.toFile())) {
+      writer.init(true);
+      writer.writeAll(dtos);
+    } catch (IOException e) {
+      throw new IllegalStateException("Error saving SecretKeys to file "
+          + secretKeysFile, e);
+    }
+    LOG.info("Saved {} to file {}", secretKeys, secretKeysFile);
+  }
+
+  private void createSecretKeyFiles() {
+    try {
+      if (!exists(secretKeysFile)) {
+        Path parent = secretKeysFile.getParent();
+        if (parent != null && !exists(parent)) {
+          createDirectories(parent);
+        }
+        createFile(secretKeysFile);
+      }
+      Files.setPosixFilePermissions(secretKeysFile, SECRET_KEYS_PERMISSIONS);
+    } catch (IOException e) {
+      throw new IllegalStateException("Error setting secret keys file" +
+          " permission: " + secretKeysFile, e);
+    }
+  }
+
+  /**
+   * Just a simple DTO that allows serializing/deserializing the immutable
+   * {@link ManagedSecretKey} objects.
+   */
+  private static class ManagedSecretKeyDto {
+    private UUID id;
+    private Instant creationTime;
+    private Instant expiryTime;
+    private String algorithm;
+    private byte[] encoded;
+
+    /**
+     * Used by Jackson when deserializing.
+     */
+    ManagedSecretKeyDto() {
+    }
+
+    ManagedSecretKeyDto(ManagedSecretKey object) {
+      id = object.getId();
+      creationTime = object.getCreationTime();
+      expiryTime = object.getExpiryTime();
+      algorithm = object.getSecretKey().getAlgorithm();
+      encoded = object.getSecretKey().getEncoded();
+    }
+
+    public ManagedSecretKey toObject() {
+      SecretKey secretKey = new SecretKeySpec(this.encoded, this.algorithm);
+      return new ManagedSecretKey(id, creationTime,
+          expiryTime, secretKey);
+    }
+
+    public UUID getId() {
+      return id;
+    }
+
+    public void setId(UUID id) {
+      this.id = id;
+    }
+
+    public Instant getCreationTime() {
+      return creationTime;
+    }
+
+    public void setCreationTime(Instant creationTime) {
+      this.creationTime = creationTime;
+    }
+
+    public Instant getExpiryTime() {
+      return expiryTime;
+    }
+
+    public void setExpiryTime(Instant expiryTime) {
+      this.expiryTime = expiryTime;
+    }
+
+    public String getAlgorithm() {
+      return algorithm;
+    }
+
+    public void setAlgorithm(String algorithm) {
+      this.algorithm = algorithm;
+    }
+
+    public byte[] getEncoded() {
+      return encoded;
+    }
+
+    public void setEncoded(byte[] encoded) {
+      this.encoded = encoded;
+    }
+  }
+}
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/ManagedSecretKey.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/ManagedSecretKey.java
new file mode 100644
index 0000000000..7e8aaacb48
--- /dev/null
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/ManagedSecretKey.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.security.symmetric;
+
+import com.google.protobuf.ByteString;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos;
+
+import javax.crypto.SecretKey;
+import javax.crypto.spec.SecretKeySpec;
+import java.io.Serializable;
+import java.time.Instant;
+import java.util.UUID;
+
+/**
+ * Encloses a symmetric {@link SecretKey} with additional data for life-cycle
+ * management.
+ */
+public final class ManagedSecretKey implements Serializable {
+  private final UUID id;
+  private final Instant creationTime;
+  private final Instant expiryTime;
+  private final SecretKey secretKey;
+
+  public ManagedSecretKey(UUID id,
+                          Instant creationTime,
+                          Instant expiryTime,
+                          SecretKey secretKey) {
+    this.id = id;
+    this.creationTime = creationTime;
+    this.expiryTime = expiryTime;
+    this.secretKey = secretKey;
+  }
+
+  public boolean isExpired() {
+    return expiryTime.isBefore(Instant.now());
+  }
+
+  public UUID getId() {
+    return id;
+  }
+
+  public SecretKey getSecretKey() {
+    return secretKey;
+  }
+
+  public Instant getCreationTime() {
+    return creationTime;
+  }
+
+  public Instant getExpiryTime() {
+    return expiryTime;
+  }
+
+  @Override
+  public int hashCode() {
+    return id.hashCode();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (!(obj instanceof ManagedSecretKey)) {
+      return false;
+    }
+    ManagedSecretKey that = (ManagedSecretKey) obj;
+    return this.id.equals(that.id);
+  }
+
+  @Override
+  public String toString() {
+    return "SecretKey(id = " + id + ", creation at: "
+        + creationTime + ", expire at: " + expiryTime + ")";
+  }
+
+  /**
+   * @return the protobuf message representing this object.
+   */
+  public SCMSecurityProtocolProtos.ManagedSecretKey toProtobuf() {
+    HddsProtos.UUID uuid = HddsProtos.UUID.newBuilder()
+        .setMostSigBits(this.id.getMostSignificantBits())
+        .setLeastSigBits(this.id.getLeastSignificantBits())
+        .build();
+
+    return SCMSecurityProtocolProtos.ManagedSecretKey.newBuilder()
+        .setId(uuid)
+        .setCreationTime(this.creationTime.toEpochMilli())
+        .setExpiryTime(this.expiryTime.toEpochMilli())
+        .setAlgorithm(this.secretKey.getAlgorithm())
+        .setEncoded(ByteString.copyFrom(this.secretKey.getEncoded()))
+        .build();
+  }
+
+  /**
+   * Create a {@link ManagedSecretKey} from a given protobuf message.
+   */
+  public static ManagedSecretKey fromProtobuf(
+      SCMSecurityProtocolProtos.ManagedSecretKey message) {
+    UUID id = new UUID(message.getId().getMostSigBits(),
+        message.getId().getLeastSigBits());
+    Instant creationTime = Instant.ofEpochMilli(message.getCreationTime());
+    Instant expiryTime = Instant.ofEpochMilli(message.getExpiryTime());
+    SecretKey secretKey = new SecretKeySpec(message.getEncoded().toByteArray(),
+        message.getAlgorithm());
+    return new ManagedSecretKey(id, creationTime, expiryTime, secretKey);
+  }
+}
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/SecretKeyConfig.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/SecretKeyConfig.java
new file mode 100644
index 0000000000..f2a9181051
--- /dev/null
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/SecretKeyConfig.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.security.symmetric;
+
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_KEY_DIR_NAME;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_KEY_DIR_NAME_DEFAULT;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_METADATA_DIR_NAME;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_SECRET_KEY_ALGORITHM;
+import static 
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_SECRET_KEY_ALGORITHM_DEFAULT;
+import static 
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_SECRET_KEY_EXPIRY_DURATION;
+import static 
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_SECRET_KEY_EXPIRY_DURATION_DEFAULT;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_SECRET_KEY_FILE;
+import static 
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_SECRET_KEY_FILE_DEFAULT;
+import static 
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_SECRET_KEY_ROTATE_CHECK_DURATION;
+import static 
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_SECRET_KEY_ROTATE_CHECK_DURATION_DEFAULT;
+import static 
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_SECRET_KEY_ROTATE_DURATION;
+import static 
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_SECRET_KEY_ROTATE_DURATION_DEFAULT;
+import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS;
+
+/**
+ * Configurations related to SecretKeys lifecycle management.
+ */
+public class SecretKeyConfig {
+  private final Path localSecretKeyFile;
+  private final Duration rotateDuration;
+  private final Duration expiryDuration;
+  private final String algorithm;
+  private final Duration rotationCheckDuration;
+
+  public SecretKeyConfig(ConfigurationSource conf, String component) {
+    String metadataDir = conf.get(HDDS_METADATA_DIR_NAME,
+        conf.get(OZONE_METADATA_DIRS));
+    String keyDir = conf.get(HDDS_KEY_DIR_NAME, HDDS_KEY_DIR_NAME_DEFAULT);
+    String fileName = conf.get(HDDS_SECRET_KEY_FILE,
+        HDDS_SECRET_KEY_FILE_DEFAULT);
+    localSecretKeyFile = Paths.get(metadataDir, component, keyDir, fileName);
+
+    long rotateDurationInMs = conf.getTimeDuration(
+        HDDS_SECRET_KEY_ROTATE_DURATION,
+        HDDS_SECRET_KEY_ROTATE_DURATION_DEFAULT, TimeUnit.MILLISECONDS);
+    this.rotateDuration = Duration.ofMillis(rotateDurationInMs);
+
+    long expiryDurationInMs = conf.getTimeDuration(
+        HDDS_SECRET_KEY_EXPIRY_DURATION,
+        HDDS_SECRET_KEY_EXPIRY_DURATION_DEFAULT, TimeUnit.MILLISECONDS);
+    this.expiryDuration = Duration.ofMillis(expiryDurationInMs);
+
+    this.algorithm = conf.get(HDDS_SECRET_KEY_ALGORITHM,
+        HDDS_SECRET_KEY_ALGORITHM_DEFAULT);
+
+    long rotationCheckInMs = conf.getTimeDuration(
+        HDDS_SECRET_KEY_ROTATE_CHECK_DURATION,
+        HDDS_SECRET_KEY_ROTATE_CHECK_DURATION_DEFAULT, TimeUnit.MILLISECONDS);
+    this.rotationCheckDuration = Duration.ofMillis(rotationCheckInMs);
+  }
+
+  public Path getLocalSecretKeyFile() {
+    return localSecretKeyFile;
+  }
+
+  public Duration getRotateDuration() {
+    return rotateDuration;
+  }
+
+  public Duration getExpiryDuration() {
+    return expiryDuration;
+  }
+
+  public String getAlgorithm() {
+    return algorithm;
+  }
+
+  public Duration getRotationCheckDuration() {
+    return rotationCheckDuration;
+  }
+}
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/SecretKeyManager.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/SecretKeyManager.java
new file mode 100644
index 0000000000..0dc5bf8902
--- /dev/null
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/SecretKeyManager.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.security.symmetric;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.crypto.KeyGenerator;
+import java.security.NoSuchAlgorithmException;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeoutException;
+
+import static java.time.Duration.between;
+import static java.util.Objects.requireNonNull;
+import static java.util.stream.Collectors.toList;
+
+/**
+ * This component manages symmetric SecretKey life-cycle, including generation,
+ * rotation and destruction.
+ */
+public class SecretKeyManager {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SecretKeyManager.class);
+
+  private final SecretKeyState state;
+  private final Duration rotationDuration;
+  private final Duration validityDuration;
+  private final SecretKeyStore keyStore;
+
+  private final KeyGenerator keyGenerator;
+
+  public SecretKeyManager(SecretKeyState state,
+                          SecretKeyStore keyStore,
+                          Duration rotationDuration,
+                          Duration validityDuration,
+                          String algorithm) {
+    this.state = requireNonNull(state);
+    this.rotationDuration = requireNonNull(rotationDuration);
+    this.validityDuration = requireNonNull(validityDuration);
+    this.keyStore = requireNonNull(keyStore);
+    this.keyGenerator = createKeyGenerator(algorithm);
+  }
+
+  public SecretKeyManager(SecretKeyState state,
+                          SecretKeyStore keyStore,
+                          SecretKeyConfig config) {
+    this(state, keyStore, config.getRotateDuration(),
+        config.getExpiryDuration(), config.getAlgorithm());
+  }
+
+  /**
+   * If the SecretKey state is not initialized, initialize it by loading
+   * SecretKeys from local file, or generate new keys if the file doesn't
+   * exist.
+   */
+  public synchronized void checkAndInitialize() throws TimeoutException {
+    if (isInitialized()) {
+      return;
+    }
+
+    LOG.info("Initializing SecretKeys.");
+
+    // Load and filter expired keys.
+    List<ManagedSecretKey> allKeys = keyStore.load()
+        .stream()
+        .filter(x -> !x.isExpired())
+        .collect(toList());
+
+    if (allKeys.isEmpty()) {
+      // If no valid key is present, generate a new key as the current key.
+      // This happens at first start or restart after being down for
+      // a significant time.
+      ManagedSecretKey newKey = generateSecretKey();
+      allKeys.add(newKey);
+      LOG.info("No valid key has been loaded. " +
+          "A new key is generated: {}", newKey);
+    } else {
+      LOG.info("Keys reloaded: {}", allKeys);
+    }
+
+    state.updateKeys(allKeys);
+  }
+
+  public boolean isInitialized() {
+    return state.getCurrentKey() != null;
+  }
+
+  /**
+   * Check and rotate the keys.
+   *
+   * @return true if rotation actually happens, false if it doesn't.
+   */
+  public synchronized boolean checkAndRotate() throws TimeoutException {
+    // Initialize the state if it's not initialized already.
+    checkAndInitialize();
+
+    ManagedSecretKey currentKey = state.getCurrentKey();
+    if (shouldRotate(currentKey)) {
+      ManagedSecretKey newCurrentKey = generateSecretKey();
+      List<ManagedSecretKey> updatedKeys = state.getSortedKeys()
+          .stream().filter(x -> !x.isExpired())
+          .collect(toList());
+      updatedKeys.add(newCurrentKey);
+
+      LOG.info("SecretKey rotation is happening, new key generated {}",
+          newCurrentKey);
+      state.updateKeys(updatedKeys);
+      return true;
+    }
+    return false;
+  }
+
+  private boolean shouldRotate(ManagedSecretKey currentKey) {
+    Duration established = between(currentKey.getCreationTime(), 
Instant.now());
+    return established.compareTo(rotationDuration) >= 0;
+  }
+
+  private ManagedSecretKey generateSecretKey() {
+    Instant now = Instant.now();
+    return new ManagedSecretKey(
+        UUID.randomUUID(),
+        now,
+        now.plus(validityDuration),
+        keyGenerator.generateKey()
+    );
+  }
+
+  private KeyGenerator createKeyGenerator(String algorithm) {
+    try {
+      return KeyGenerator.getInstance(algorithm);
+    } catch (NoSuchAlgorithmException e) {
+      throw new IllegalArgumentException("Error creating KeyGenerator for " +
+          "algorithm " + algorithm, e);
+    }
+  }
+}
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/SecretKeyState.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/SecretKeyState.java
new file mode 100644
index 0000000000..7be70b4b02
--- /dev/null
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/SecretKeyState.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.security.symmetric;
+
+import org.apache.hadoop.hdds.scm.metadata.Replicate;
+
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * This component holds the state of managed SecretKeys, including the
+ * current key and all active keys.
+ */
+public interface SecretKeyState {
+  /**
+   * Get the current active key, which is used for signing tokens. This is
+   * also the latest key managed by this state.
+   *
+   * @return the current active key, or null if the state is not initialized.
+   */
+  ManagedSecretKey getCurrentKey();
+
+  /**
+   * Get the keys that are managed by this state.
+   * The returned keys are sorted by creation time, in the order of latest
+   * to oldest.
+   */
+  List<ManagedSecretKey> getSortedKeys();
+
+  /**
+   * Update the SecretKeys.
+   * This method replicates SecretKeys across all SCM instances.
+   */
+  @Replicate
+  void updateKeys(List<ManagedSecretKey> newKeys) throws TimeoutException;
+}
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/SecretKeyStateImpl.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/SecretKeyStateImpl.java
new file mode 100644
index 0000000000..d5c886fd99
--- /dev/null
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/SecretKeyStateImpl.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.security.symmetric;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static java.util.Comparator.comparing;
+import static java.util.Objects.requireNonNull;
+import static java.util.stream.Collectors.toList;
+
+/**
+ * Default implementation of {@link SecretKeyState}.
+ * <p>
+ * The in-memory state (current key and sorted key list) is guarded by a
+ * {@link ReadWriteLock}: getters take the read lock, {@link #updateKeys}
+ * takes the write lock. Every successful update is also handed to the
+ * configured {@link SecretKeyStore}.
+ */
+public final class SecretKeyStateImpl implements SecretKeyState {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SecretKeyStateImpl.class);
+
+  private final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+  // Both fields stay null until the first successful updateKeys() call.
+  private List<ManagedSecretKey> sortedKeys;
+  private ManagedSecretKey currentKey;
+
+  private final SecretKeyStore keyStore;
+
+  /**
+   * Instantiate a state with no keys. This state object needs to be backed by
+   * a proper replication proxy so that the @Replicate method works.
+   *
+   * @param keyStore the store that receives the keys on every update; must
+   *                 not be null.
+   */
+  public SecretKeyStateImpl(SecretKeyStore keyStore) {
+    this.keyStore = requireNonNull(keyStore);
+  }
+
+  /**
+   * Get the current active key, which is used for signing tokens. This is
+   * also the latest key managed by this state.
+   *
+   * @return the current key, or null if no keys have been set yet.
+   */
+  @Override
+  public ManagedSecretKey getCurrentKey() {
+    lock.readLock().lock();
+    try {
+      return currentKey;
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Get the keys that are managed by this state.
+   * The returned keys are sorted by creation time, in the order of latest
+   * to oldest.
+   *
+   * @return an unmodifiable list of keys, or null if no keys have been set
+   *         yet.
+   */
+  @Override
+  public List<ManagedSecretKey> getSortedKeys() {
+    lock.readLock().lock();
+    try {
+      return sortedKeys;
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Update the SecretKeys.
+   * This method replicates SecretKeys across all SCM instances.
+   * <p>
+   * The input is validated and sorted before any field is touched, so a
+   * rejected update leaves the previous keys fully intact.
+   *
+   * @param newKeys the keys to manage from now on; must be non-null and
+   *                contain at least one key.
+   * @throws NullPointerException if {@code newKeys} is null.
+   * @throws IllegalArgumentException if {@code newKeys} is empty, since
+   *         there would be no key to promote to current key.
+   */
+  @Override
+  public void updateKeys(List<ManagedSecretKey> newKeys) {
+    requireNonNull(newKeys, "newKeys");
+    if (newKeys.isEmpty()) {
+      throw new IllegalArgumentException(
+          "Cannot update SecretKeys with an empty list of keys.");
+    }
+    LOG.info("Updating keys with {}", newKeys);
+
+    // Store sorted keys in order of latest to oldest and make it
+    // immutable so that can be used to answer queries directly.
+    final List<ManagedSecretKey> latestFirst = Collections.unmodifiableList(
+        newKeys.stream()
+            .sorted(comparing(ManagedSecretKey::getCreationTime).reversed())
+            .collect(toList())
+    );
+
+    lock.writeLock().lock();
+    try {
+      sortedKeys = latestFirst;
+      currentKey = latestFirst.get(0);
+      LOG.info("Current key updated {}", currentKey);
+      keyStore.save(sortedKeys);
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+}
\ No newline at end of file
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/SecretKeyStore.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/SecretKeyStore.java
new file mode 100644
index 0000000000..c851c3683d
--- /dev/null
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/SecretKeyStore.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.security.symmetric;
+
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Interface for SecretKey storage component, which is responsible for saving
+ * the SecretKeys states persistently to ensure they're not lost during
+ * restarts.
+ * <p>
+ * This interface allows new persistent storage to be plugged in easily.
+ * <p>
+ * Operations:
+ * <ul>
+ *   <li>{@link #load()} returns the stored keys as a list.</li>
+ *   <li>{@link #save(Collection)} stores the given keys.
+ *   NOTE(review): presumably a later save replaces what was stored before,
+ *   as the local store's overwrite test expects - confirm per
+ *   implementation.</li>
+ * </ul>
+ */
+public interface SecretKeyStore {
+  List<ManagedSecretKey> load();
+
+  void save(Collection<ManagedSecretKey> secretKeys);
+}
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/package-info.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/package-info.java
new file mode 100644
index 0000000000..2997fe0a26
--- /dev/null
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/package-info.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * In secure mode, Ozone uses symmetric key algorithm to sign all its issued
+ * tokens, such as block or container tokens. These tokens are then verified
+ * by datanodes to ensure their authenticity and integrity.
+ * <p>
+ *
+ * That process requires symmetric {@link javax.crypto.SecretKey} to be
+ * generated, managed, and distributed to different Ozone components.
+ * For example, the token signer (Ozone Manager and SCM) and the
+ * verifier (datanode) need to use the same SecretKey.
+ * <p>
+ *
+ * This package encloses the logic to manage the lifecycle of symmetric
+ * secret keys. In detail, it consists of the following components:
+ * <ul>
+ *   <li>
+ *     The definition of a managed secret key, which is shared between SCM,
+ *     OM and datanodes, see
+ *     {@link org.apache.hadoop.hdds.security.symmetric.ManagedSecretKey}.
+ *   </li>
+ *
+ *   <li>
+ *     The definition of secret key states, which is designed to get replicated
+ *     across all SCM instances, see
+ *     {@link org.apache.hadoop.hdds.security.symmetric.SecretKeyState}
+ *   </li>
+ *
+ *   <li>
+ *    The definition and implementation of secret key persistent storage, to
+ *    help retain SecretKey after restarts, see
+ *    {@link org.apache.hadoop.hdds.security.symmetric.SecretKeyStore} and
+ *    {@link org.apache.hadoop.hdds.security.symmetric.LocalSecretKeyStore}.
+ *   </li>
+ *
+ *   <li>
+ *     The basic logic to manage secret key lifecycle, see
+ *     {@link org.apache.hadoop.hdds.security.symmetric.SecretKeyManager}
+ *   </li>
+ * </ul>
+ *
+ * <p>
+ * The original overall design can be found at
+ * <a href="https://issues.apache.org/jira/browse/HDDS-7733">HDDS-7733</a>.
+ */
+package org.apache.hadoop.hdds.security.symmetric;
diff --git 
a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/symmetric/LocalSecretKeyStoreTest.java
 
b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/symmetric/LocalSecretKeyStoreTest.java
new file mode 100644
index 0000000000..c406ce2b08
--- /dev/null
+++ 
b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/symmetric/LocalSecretKeyStoreTest.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.security.symmetric;
+
+import com.google.common.collect.ImmutableList;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import javax.crypto.KeyGenerator;
+import javax.crypto.SecretKey;
+import javax.crypto.spec.SecretKeySpec;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.nio.file.attribute.PosixFilePermission;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Stream;
+
+import static com.google.common.collect.Lists.newArrayList;
+import static com.google.common.collect.Sets.newHashSet;
+import static java.nio.file.attribute.PosixFilePermission.OWNER_READ;
+import static java.nio.file.attribute.PosixFilePermission.OWNER_WRITE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test cases for {@link LocalSecretKeyStore}.
+ */
+public class LocalSecretKeyStoreTest {
+  private SecretKeyStore secretKeyStore;
+  private Path testSecretFile;
+
+  /**
+   * Creates a fresh backing file and store for every test.
+   * JUnit Jupiter lifecycle methods must not be private, hence public.
+   */
+  @BeforeEach
+  public void setup() throws Exception {
+    testSecretFile = Files.createTempFile("key-strore-test", ".json");
+    // Don't leave key material behind in the temp directory.
+    testSecretFile.toFile().deleteOnExit();
+    secretKeyStore = new LocalSecretKeyStore(testSecretFile);
+  }
+
+  /**
+   * Supplies the key lists used by {@link #testSaveAndLoad(List)}.
+   */
+  public static Stream<Arguments> saveAndLoadTestCases() throws Exception {
+    return Stream.of(
+        // empty
+        Arguments.of(ImmutableList.of()),
+        // single secret keys.
+        Arguments.of(newArrayList(
+            generateKey("HmacSHA256")
+        )),
+        // multiple secret keys.
+        Arguments.of(newArrayList(
+            generateKey("HmacSHA1"),
+            generateKey("HmacSHA256")
+        ))
+    );
+  }
+
+  /**
+   * Verifies that saved keys can be loaded back unchanged and that the
+   * backing file is only accessible to its owner.
+   */
+  @ParameterizedTest
+  @MethodSource("saveAndLoadTestCases")
+  public void testSaveAndLoad(List<ManagedSecretKey> keys) throws IOException {
+    secretKeyStore.save(keys);
+
+    // Ensure the intended file exists and is readable and writeable to
+    // file owner only.
+    File file = testSecretFile.toFile();
+    assertTrue(file.exists());
+    Set<PosixFilePermission> permissions =
+        Files.getPosixFilePermissions(file.toPath());
+    assertEquals(newHashSet(OWNER_READ, OWNER_WRITE), permissions);
+
+    List<ManagedSecretKey> reloadedKeys = secretKeyStore.load();
+    assertEqualKeys(keys, reloadedKeys);
+  }
+
+  /**
+   * Verifies that secret keys are overwritten by subsequent writes.
+   */
+  @Test
+  public void testOverwrite() throws Exception {
+    List<ManagedSecretKey> initialKeys =
+        newArrayList(generateKey("HmacSHA256"));
+    secretKeyStore.save(initialKeys);
+
+    List<ManagedSecretKey> updatedKeys = newArrayList(
+        generateKey("HmacSHA1"),
+        generateKey("HmacSHA256")
+    );
+    secretKeyStore.save(updatedKeys);
+
+    assertEqualKeys(updatedKeys, secretKeyStore.load());
+  }
+
+  /**
+   * This scenario verifies if an existing secret keys file can be loaded.
+   * The intention of this is to ensure a saved file can be loaded after
+   * future changes to {@link ManagedSecretKey} schema.
+   *
+   * Please don't just change the content of test json if this
+   * test fails, instead, analyse the backward-compatibility of the change.
+   */
+  @Test
+  public void testLoadExistingFile() throws Exception {
+    // copy test file content to the backing file.
+    String testJson = "[\n" +
+        "  {\n" +
+        "    \"id\":\"78864cfb-793b-4157-8ad6-714c9f950a16\",\n" +
+        "    \"creationTime\":\"2007-12-03T10:15:30Z\",\n" +
+        "    \"expiryTime\":\"2007-12-03T11:15:30Z\",\n" +
+        "    \"algorithm\":\"HmacSHA256\",\n" +
+        "    \"encoded\":\"YSeCdJRB4RclxoeE69ENmTe2Cv8ybyKhHP3mq4M1r8o=\"\n" +
+        "  }\n" +
+        "]";
+    Files.write(testSecretFile, Collections.singletonList(testJson),
+        StandardOpenOption.WRITE);
+
+    Instant date = Instant.parse("2007-12-03T10:15:30.00Z");
+    ManagedSecretKey secretKey = new ManagedSecretKey(
+        UUID.fromString("78864cfb-793b-4157-8ad6-714c9f950a16"),
+        date,
+        date.plus(Duration.ofHours(1)),
+        new SecretKeySpec(
+            Base64.getDecoder().decode(
+                "YSeCdJRB4RclxoeE69ENmTe2Cv8ybyKhHP3mq4M1r8o="),
+            "HmacSHA256"
+        ));
+
+    List<ManagedSecretKey> expectedKeys = newArrayList(secretKey);
+    assertEqualKeys(expectedKeys, secretKeyStore.load());
+  }
+
+  /**
+   * Asserts that both lists hold the same keys in the same order.
+   * Creation time is compared at millisecond granularity.
+   */
+  private void assertEqualKeys(List<ManagedSecretKey> expected,
+                               List<ManagedSecretKey> actual) {
+    assertEquals(expected.size(), actual.size());
+    for (int i = 0; i < expected.size(); i++) {
+      ManagedSecretKey expectedKey = expected.get(i);
+      ManagedSecretKey actualKey = actual.get(i);
+
+      assertEquals(expectedKey.getId(), actualKey.getId());
+      assertEquals(expectedKey.getCreationTime().toEpochMilli(),
+          actualKey.getCreationTime().toEpochMilli());
+      assertEquals(expectedKey.getExpiryTime(),
+          actualKey.getExpiryTime());
+      assertEquals(expectedKey.getSecretKey(), actualKey.getSecretKey());
+    }
+  }
+
+  /**
+   * Generates a key of the given algorithm that is created right now.
+   */
+  private static ManagedSecretKey generateKey(String algorithm)
+      throws Exception {
+    return generateKey(algorithm, Instant.now());
+  }
+
+  /**
+   * Generates a key of the given algorithm with the given creation time and
+   * an expiry time one hour later.
+   */
+  private static ManagedSecretKey generateKey(String algorithm,
+                                              Instant creationTime)
+      throws Exception {
+    KeyGenerator keyGen = KeyGenerator.getInstance(algorithm);
+    SecretKey secretKey = keyGen.generateKey();
+    return new ManagedSecretKey(
+        UUID.randomUUID(),
+        creationTime,
+        creationTime.plus(Duration.ofHours(1)),
+        secretKey
+    );
+  }
+}
diff --git 
a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/symmetric/SecretKeyManagerTest.java
 
b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/symmetric/SecretKeyManagerTest.java
new file mode 100644
index 0000000000..053148e28d
--- /dev/null
+++ 
b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/symmetric/SecretKeyManagerTest.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.security.symmetric;
+
+import com.google.common.collect.ImmutableList;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+import javax.crypto.KeyGenerator;
+import javax.crypto.SecretKey;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Stream;
+
+import static com.google.common.collect.Lists.newArrayList;
+import static java.time.Instant.now;
+import static java.time.temporal.ChronoUnit.DAYS;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.params.provider.Arguments.of;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests cases for {@link SecretKeyManager} implementation.
+ */
+public class SecretKeyManagerTest {
+  private static final Duration VALIDITY_DURATION = Duration.ofDays(3);
+  private static final Duration ROTATION_DURATION = Duration.ofDays(1);
+  private static final String ALGORITHM = "HmacSHA256";
+
+  private SecretKeyStore mockedKeyStore;
+
+  @BeforeEach
+  private void setup() {
+    mockedKeyStore = Mockito.mock(SecretKeyStore.class);
+  }
+
+  public static Stream<Arguments> loadSecretKeysTestCases() throws Exception {
+    ManagedSecretKey k0 = generateKey(now());
+    ManagedSecretKey k1 = generateKey(now().minus(1, DAYS));
+    ManagedSecretKey k2 = generateKey(now().minus(2, DAYS));
+    ManagedSecretKey k3 = generateKey(now().minus(3, DAYS));
+    ManagedSecretKey k4 = generateKey(now().minus(4, DAYS));
+    ManagedSecretKey k5 = generateKey(now().minus(5, DAYS));
+    return Stream.of(
+        // first start
+        of(ImmutableList.of(), null, null),
+
+        // restart => nothing is filtered
+        of(newArrayList(k0, k1, k2), k0, newArrayList(k0, k1, k2)),
+
+        // stop 1 day and start
+        of(newArrayList(k1, k2, k3), k1, newArrayList(k1, k2)),
+
+        // stop 2 day and start => expired keys are filtered
+        of(newArrayList(k2, k3, k4), k2, newArrayList(k2)),
+
+        // stop 3 day and start, all saved keys are filtered
+        of(newArrayList(k3, k4, k5), null, null)
+    );
+  }
+
+  /**
+   * Verify how SecretKeyManager initializes its keys under different 
scenarios,
+   * e.g. with or without the present of saved keys.
+   */
+  @ParameterizedTest
+  @MethodSource("loadSecretKeysTestCases")
+  public void testLoadSecretKeys(List<ManagedSecretKey> savedSecretKey,
+                                 ManagedSecretKey expectedCurrentKey,
+                                 List<ManagedSecretKey> expectedLoadedKeys)
+      throws Exception {
+    SecretKeyState state = new SecretKeyStateImpl(mockedKeyStore);
+    SecretKeyManager lifeCycleManager =
+        new SecretKeyManager(state, mockedKeyStore,
+            ROTATION_DURATION, VALIDITY_DURATION, ALGORITHM);
+
+    when(mockedKeyStore.load()).thenReturn(savedSecretKey);
+    lifeCycleManager.checkAndInitialize();
+
+    if (expectedCurrentKey != null) {
+      assertEquals(state.getCurrentKey(), expectedCurrentKey);
+      List<ManagedSecretKey> allKeys = state.getSortedKeys();
+      assertSameKeys(expectedLoadedKeys, allKeys);
+    } else {
+      // expect the current key is newly generated.
+      assertFalse(savedSecretKey.contains(state.getCurrentKey()));
+      assertEquals(1, state.getSortedKeys().size());
+      assertTrue(state.getSortedKeys().contains(
+          state.getCurrentKey()));
+    }
+  }
+
+  private static void assertSameKeys(Collection<ManagedSecretKey> expected,
+                                     Collection<ManagedSecretKey> actual) {
+    assertEquals(expected.size(), actual.size());
+    for (ManagedSecretKey expectedKey : expected) {
+      assertTrue(actual.contains(expectedKey));
+    }
+  }
+
+  public static Stream<Arguments> rotationTestCases() throws Exception {
+    ManagedSecretKey k0 = generateKey(now());
+    ManagedSecretKey k1 = generateKey(now().minus(1, DAYS));
+    ManagedSecretKey k2 = generateKey(now().minus(2, DAYS));
+    ManagedSecretKey k3 = generateKey(now().minus(3, DAYS));
+    ManagedSecretKey k4 = generateKey(now().minus(4, DAYS));
+    return Stream.of(
+
+        // Currentkey is new, not rotate.
+        of(newArrayList(k0, k1, k2), false, null),
+
+        // Current key just exceeds the rotation period.
+        of(newArrayList(k1, k2, k3), true, newArrayList(k1, k2)),
+
+        // Current key exceeds the rotation period for a significant time (2d).
+        of(newArrayList(k2, k3, k4), true, newArrayList(k2))
+    );
+  }
+
+  /**
+   * Verify rotation behavior under different scenarios.
+   */
+  @ParameterizedTest
+  @MethodSource("rotationTestCases")
+  public void testRotate(List<ManagedSecretKey> initialKeys,
+                         boolean expectRotate,
+                         List<ManagedSecretKey> expectedRetainedKeys)
+      throws TimeoutException {
+
+    SecretKeyState state = new SecretKeyStateImpl(mockedKeyStore);
+
+    SecretKeyManager lifeCycleManager =
+        new SecretKeyManager(state, mockedKeyStore,
+            ROTATION_DURATION, VALIDITY_DURATION, ALGORITHM);
+
+    // Set the initial state.
+    state.updateKeys(initialKeys);
+    ManagedSecretKey initialCurrentKey = state.getCurrentKey();
+    Mockito.reset(mockedKeyStore);
+
+    assertEquals(expectRotate, lifeCycleManager.checkAndRotate());
+
+    if (expectRotate) {
+      // Verify rotation behavior.
+
+      // 1. A new key is generated as current key.
+      ManagedSecretKey currentKey = state.getCurrentKey();
+      assertNotEquals(initialCurrentKey, currentKey);
+      assertFalse(initialKeys.contains(currentKey));
+
+      // 2. keys are correctly rotated, expired ones are excluded.
+      List<ManagedSecretKey> expectedAllKeys = expectedRetainedKeys;
+      expectedAllKeys.add(currentKey);
+      assertSameKeys(expectedAllKeys, state.getSortedKeys());
+
+      // 3. All keys are stored.
+      ArgumentCaptor<Collection<ManagedSecretKey>> storedKeyCaptor =
+          ArgumentCaptor.forClass(Collection.class);
+      verify(mockedKeyStore).save(storedKeyCaptor.capture());
+      assertSameKeys(expectedAllKeys, storedKeyCaptor.getValue());
+
+      // 4. The new generated key has correct data.
+      assertEquals(ALGORITHM, currentKey.getSecretKey().getAlgorithm());
+      assertEquals(0,
+          Duration.between(currentKey.getCreationTime(), now()).toMinutes());
+      Instant expectedExpiryTime = now().plus(VALIDITY_DURATION);
+      assertEquals(0,
+          Duration.between(currentKey.getExpiryTime(),
+              expectedExpiryTime).toMinutes());
+    } else {
+      assertEquals(initialCurrentKey, state.getCurrentKey());
+      assertSameKeys(initialKeys, state.getSortedKeys());
+    }
+  }
+
+  private static ManagedSecretKey generateKey(Instant creationTime)
+      throws Exception {
+    KeyGenerator keyGen = KeyGenerator.getInstance(ALGORITHM);
+    SecretKey secretKey = keyGen.generateKey();
+    return new ManagedSecretKey(
+        UUID.randomUUID(),
+        creationTime,
+        creationTime.plus(VALIDITY_DURATION),
+        secretKey
+    );
+  }
+}
diff --git a/hadoop-hdds/interface-server/src/main/proto/SCMRatisProtocol.proto 
b/hadoop-hdds/interface-server/src/main/proto/SCMRatisProtocol.proto
index 41da6a5468..4fb0737b39 100644
--- a/hadoop-hdds/interface-server/src/main/proto/SCMRatisProtocol.proto
+++ b/hadoop-hdds/interface-server/src/main/proto/SCMRatisProtocol.proto
@@ -29,6 +29,7 @@ enum RequestType {
     MOVE = 6;
     STATEFUL_SERVICE_CONFIG = 7;
     FINALIZE = 8;
+    SECRET_KEY = 9;
 }
 
 message Method {
diff --git 
a/hadoop-hdds/interface-server/src/main/proto/ScmServerSecurityProtocol.proto 
b/hadoop-hdds/interface-server/src/main/proto/ScmServerSecurityProtocol.proto
index dc6bcf986c..3621018fa8 100644
--- 
a/hadoop-hdds/interface-server/src/main/proto/ScmServerSecurityProtocol.proto
+++ 
b/hadoop-hdds/interface-server/src/main/proto/ScmServerSecurityProtocol.proto
@@ -249,3 +249,12 @@ message SCMRevokeCertificatesResponseProto {
 service SCMSecurityProtocolService {
     rpc submitRequest (SCMSecurityRequest) returns (SCMSecurityResponse);
 }
+
+message ManagedSecretKey {
+    required UUID id = 1; // identifier of the key
+    required uint64 creationTime = 2; // NOTE(review): presumably epoch millis - confirm against ManagedSecretKey#toProtobuf
+    required uint64 expiryTime = 3; // NOTE(review): presumably same unit as creationTime - confirm
+    required string algorithm = 4; // algorithm name of the secret key
+    required bytes encoded = 5; // NOTE(review): presumably the raw encoded key material; treat as sensitive - confirm
+}
+
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/io/CodecFactory.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/io/CodecFactory.java
index 6c75593be1..af77051509 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/io/CodecFactory.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/io/CodecFactory.java
@@ -23,6 +23,7 @@ import com.google.protobuf.InvalidProtocolBufferException;
 import com.google.protobuf.ProtocolMessageEnum;
 
 import org.apache.commons.lang3.ClassUtils;
+import org.apache.hadoop.hdds.security.symmetric.ManagedSecretKey;
 
 import java.math.BigInteger;
 import java.security.cert.X509Certificate;
@@ -49,6 +50,7 @@ public final class CodecFactory {
     codecs.put(BigInteger.class, new BigIntegerCodec());
     codecs.put(X509Certificate.class, new X509CertificateCodec());
     codecs.put(ByteString.class, new ByteStringCodec());
+    codecs.put(ManagedSecretKey.class, new ManagedSecretKeyCodec());
   }
 
   private CodecFactory() { }
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/io/ListCodec.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/io/ListCodec.java
index 0667b8776f..67d8d55227 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/io/ListCodec.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/io/ListCodec.java
@@ -23,6 +23,7 @@ import 
org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.ListArgument;
 import org.apache.hadoop.hdds.scm.ha.ReflectionUtil;
 
 import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
 import java.util.List;
 
 /**
@@ -51,8 +52,11 @@ public class ListCodec implements Codec {
   public Object deserialize(Class<?> type, ByteString value)
       throws InvalidProtocolBufferException {
     try {
+      // If argument type is the generic interface, then determine a
+      // concrete implementation.
+      Class<?> concreteType = (type == List.class) ? ArrayList.class : type;
 
-      List<Object> result = (List<Object>) type.newInstance();
+      List<Object> result = (List<Object>) concreteType.newInstance();
       final ListArgument listArgs = (ListArgument) ReflectionUtil
           .getMethod(ListArgument.class, "parseFrom", byte[].class)
           .invoke(null, (Object) value.toByteArray());
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/io/ManagedSecretKeyCodec.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/io/ManagedSecretKeyCodec.java
new file mode 100644
index 0000000000..384d818762
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/io/ManagedSecretKeyCodec.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.ha.io;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos;
+import org.apache.hadoop.hdds.security.symmetric.ManagedSecretKey;
+
+/**
+ * {@link Codec} that converts {@link ManagedSecretKey} objects to and from
+ * their protobuf wire representation.
+ */
+public class ManagedSecretKeyCodec implements Codec {
+  /**
+   * Serializes the given {@link ManagedSecretKey} via its protobuf message.
+   *
+   * @param object the key to serialize; must be a {@link ManagedSecretKey}.
+   * @return the bytes of the protobuf message.
+   */
+  @Override
+  public ByteString serialize(Object object)
+      throws InvalidProtocolBufferException {
+    final byte[] payload =
+        ((ManagedSecretKey) object).toProtobuf().toByteArray();
+    return ByteString.copyFrom(payload);
+  }
+
+  /**
+   * Parses the protobuf message and rebuilds the {@link ManagedSecretKey}.
+   *
+   * @param type  the requested type; not consulted by this codec.
+   * @param value the serialized protobuf message.
+   * @return the decoded {@link ManagedSecretKey}.
+   * @throws InvalidProtocolBufferException if the bytes cannot be parsed.
+   */
+  @Override
+  public Object deserialize(Class<?> type, ByteString value)
+      throws InvalidProtocolBufferException {
+    return ManagedSecretKey.fromProtobuf(
+        SCMSecurityProtocolProtos.ManagedSecretKey.parseFrom(value));
+  }
+}
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/security/ScmSecretKeyStateBuilder.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/security/ScmSecretKeyStateBuilder.java
new file mode 100644
index 0000000000..c689fd2db3
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/security/ScmSecretKeyStateBuilder.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.security;
+
+import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol;
+import org.apache.hadoop.hdds.scm.ha.SCMHAInvocationHandler;
+import org.apache.hadoop.hdds.scm.ha.SCMRatisServer;
+import org.apache.hadoop.hdds.security.symmetric.SecretKeyState;
+import org.apache.hadoop.hdds.security.symmetric.SecretKeyStateImpl;
+import org.apache.hadoop.hdds.security.symmetric.SecretKeyStore;
+
+import java.lang.reflect.Proxy;
+
+/**
+ * Assembles a {@link SecretKeyState} that is wrapped in a dynamic proxy backed
+ * by {@link SCMHAInvocationHandler}, which is what makes @Replicate happen.
+ */
+public class ScmSecretKeyStateBuilder {
+  private SecretKeyStore secretKeyStore;
+  private SCMRatisServer scmRatisServer;
+
+  /** Sets the store handed to the underlying {@link SecretKeyStateImpl}. */
+  public ScmSecretKeyStateBuilder setSecretKeyStore(
+      SecretKeyStore secretKeyStore) {
+    this.secretKeyStore = secretKeyStore;
+    return this;
+  }
+
+  /** Sets the Ratis server handed to the invocation handler. */
+  public ScmSecretKeyStateBuilder setRatisServer(
+      final SCMRatisServer ratisServer) {
+    this.scmRatisServer = ratisServer;
+    return this;
+  }
+
+  /**
+   * Creates the proxied state from the values configured so far.
+   *
+   * @return a dynamic proxy implementing {@link SecretKeyState}.
+   */
+  public SecretKeyState build() {
+    final SecretKeyState delegate = new SecretKeyStateImpl(secretKeyStore);
+    final SCMHAInvocationHandler handler = new SCMHAInvocationHandler(
+        SCMRatisProtocol.RequestType.SECRET_KEY, delegate, scmRatisServer);
+
+    final ClassLoader loader = SCMHAInvocationHandler.class.getClassLoader();
+    final Class<?>[] interfaces = {SecretKeyState.class};
+    return (SecretKeyState) Proxy.newProxyInstance(loader, interfaces, handler);
+  }
+}
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/security/SecretKeyManagerService.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/security/SecretKeyManagerService.java
new file mode 100644
index 0000000000..27ce30a8a1
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/security/SecretKeyManagerService.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.security;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.scm.ha.SCMRatisServer;
+import org.apache.hadoop.hdds.scm.ha.SCMService;
+import org.apache.hadoop.hdds.security.symmetric.LocalSecretKeyStore;
+import org.apache.hadoop.hdds.security.symmetric.SecretKeyConfig;
+import org.apache.hadoop.hdds.security.symmetric.SecretKeyManager;
+import org.apache.hadoop.hdds.security.symmetric.SecretKeyState;
+import org.apache.hadoop.hdds.security.symmetric.SecretKeyStore;
+import org.apache.hadoop.hdds.security.x509.SecurityConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.apache.hadoop.ozone.OzoneConsts.SCM_CA_CERT_STORAGE_DIR;
+
+/**
+ * A background service running in SCM to maintain the SecretKeys lifecycle.
+ */
+public class SecretKeyManagerService implements SCMService, Runnable {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(SecretKeyManagerService.class);
+
+  private final SCMContext scmContext;
+  private final SecretKeyManager secretKeyManager;
+  private final SecretKeyConfig secretKeyConfig;
+
+
+  /**
+   * SCMService related variables.
+   */
+  private final Lock serviceLock = new ReentrantLock();
+  private ServiceStatus serviceStatus = ServiceStatus.PAUSING;
+
+  private final ScheduledExecutorService scheduler;
+
+  @SuppressWarnings("parameternumber")
+  public SecretKeyManagerService(SCMContext scmContext,
+                                 ConfigurationSource conf,
+                                 SCMRatisServer ratisServer) {
+    this.scmContext = scmContext;
+
+    secretKeyConfig = new SecretKeyConfig(conf,
+        SCM_CA_CERT_STORAGE_DIR);
+    SecretKeyStore secretKeyStore = new LocalSecretKeyStore(
+        secretKeyConfig.getLocalSecretKeyFile());
+    SecretKeyState secretKeyState = new ScmSecretKeyStateBuilder()
+        .setSecretKeyStore(secretKeyStore)
+        .setRatisServer(ratisServer)
+        .build();
+    secretKeyManager = new SecretKeyManager(secretKeyState,
+        secretKeyStore, secretKeyConfig);
+
+    scheduler = Executors.newScheduledThreadPool(1,
+        new ThreadFactoryBuilder().setDaemon(true)
+            .setNameFormat(getServiceName())
+            .build());
+
+    start();
+  }
+
+  @Override
+  public void notifyStatusChanged() {
+    serviceLock.lock();
+    try {
+      if (scmContext.isLeaderReady()) {
+        // Asynchronously initialize SecretKeys for first time leader.
+        if (!secretKeyManager.isInitialized()) {
+          scheduler.schedule(() -> {
+            try {
+              secretKeyManager.checkAndInitialize();
+            } catch (TimeoutException e) {
+              throw new RuntimeException(
+                  "Timeout replicating initialized state.", e);
+            }
+          }, 0, TimeUnit.SECONDS);
+        }
+
+        serviceStatus = ServiceStatus.RUNNING;
+      } else {
+        serviceStatus = ServiceStatus.PAUSING;
+      }
+    } finally {
+      serviceLock.unlock();
+    }
+  }
+
+  @Override
+  public boolean shouldRun() {
+    serviceLock.lock();
+    try {
+      return serviceStatus == ServiceStatus.RUNNING;
+    } finally {
+      serviceLock.unlock();
+    }
+  }
+
+  @Override
+  public void run() {
+    if (!shouldRun()) {
+      return;
+    }
+
+    try {
+      secretKeyManager.checkAndRotate();
+    } catch (TimeoutException e) {
+      LOG.error("Error occurred when updating SecretKeys.", e);
+    }
+  }
+
+  @Override
+  public String getServiceName() {
+    return SecretKeyManagerService.class.getSimpleName();
+  }
+
+  @Override
+  public void start() {
+    LOG.info("Scheduling rotation checker with interval {}",
+        secretKeyConfig.getRotationCheckDuration());
+    scheduler.scheduleAtFixedRate(this, 0,
+        secretKeyConfig.getRotationCheckDuration().toMillis(),
+        TimeUnit.MILLISECONDS);
+  }
+
+  @Override
+  public void stop() {
+    scheduler.shutdownNow();
+  }
+
+  public static boolean isSecretKeyEnable(SecurityConfig conf) {
+    return conf.isSecurityEnabled() &&
+        (conf.isBlockTokenEnabled() || conf.isContainerTokenEnabled());
+  }
+}
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/security/package-info.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/security/package-info.java
new file mode 100644
index 0000000000..296e7f0883
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/security/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Encapsulate classes dealing with security concern in SCM.
+ */
+package org.apache.hadoop.hdds.scm.security;
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 7d0fd4bd78..1d1a6be253 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -67,6 +67,7 @@ import org.apache.hadoop.hdds.scm.ha.SCMHAUtils;
 import org.apache.hadoop.hdds.scm.ha.SequenceIdGenerator;
 import org.apache.hadoop.hdds.scm.ScmInfo;
 import org.apache.hadoop.hdds.scm.node.NodeAddressUpdateHandler;
+import org.apache.hadoop.hdds.scm.security.SecretKeyManagerService;
 import org.apache.hadoop.hdds.scm.server.upgrade.FinalizationManager;
 import org.apache.hadoop.hdds.scm.server.upgrade.FinalizationManagerImpl;
 import org.apache.hadoop.hdds.scm.ha.StatefulServiceStateManager;
@@ -189,6 +190,7 @@ import java.util.concurrent.TimeUnit;
 
 import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_EVENT_REPORT_EXEC_WAIT_THRESHOLD_DEFAULT;
 import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_EVENT_REPORT_QUEUE_WAIT_THRESHOLD_DEFAULT;
+import static 
org.apache.hadoop.hdds.scm.security.SecretKeyManagerService.isSecretKeyEnable;
 import static 
org.apache.hadoop.hdds.security.x509.certificate.authority.CertificateStore.CertType.VALID_CERTS;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS;
 import static org.apache.hadoop.ozone.OzoneConsts.CRL_SEQUENCE_ID_KEY;
@@ -741,6 +743,13 @@ public final class StorageContainerManager extends 
ServiceRuntimeInfoImpl
 
     serviceManager.register(expiredContainerReplicaOpScrubber);
 
+    if (isSecretKeyEnable(securityConfig)) {
+      SecretKeyManagerService secretKeyManagerService =
+          new SecretKeyManagerService(scmContext, conf,
+              scmHAManager.getRatisServer());
+      serviceManager.register(secretKeyManagerService);
+    }
+
     if (configurator.getContainerManager() != null) {
       containerManager = configurator.getContainerManager();
     } else {
diff --git a/hadoop-ozone/dist/src/main/compose/ozonesecure-ha/docker-config 
b/hadoop-ozone/dist/src/main/compose/ozonesecure-ha/docker-config
index 9be6e14a29..b57e5913e7 100644
--- a/hadoop-ozone/dist/src/main/compose/ozonesecure-ha/docker-config
+++ b/hadoop-ozone/dist/src/main/compose/ozonesecure-ha/docker-config
@@ -162,3 +162,8 @@ no_proxy=om,scm,recon,s3g,kdc,localhost,127.0.0.1
 
 # Explicitly enable filesystem snapshot feature for this Docker compose cluster
 OZONE-SITE.XML_ozone.filesystem.snapshot.enabled=true
+
+
+OZONE-SITE.XML_hdds.secret.key.rotate.duration=5m
+OZONE-SITE.XML_hdds.secret.key.rotate.check.duration=1m
+OZONE-SITE.XML_hdds.secret.key.expiry.duration=1h
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to