kerneltime commented on code in PR #4194:
URL: https://github.com/apache/ozone/pull/4194#discussion_r1088504179


##########
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java:
##########
@@ -216,6 +216,28 @@ public final class HddsConfigKeys {
   public static final String HDDS_X509_ROOTCA_PRIVATE_KEY_FILE_DEFAULT =
       "";
 
+  public static final String HDDS_SECRET_KEY_FILE =

Review Comment:
   To make sure I understand the configurations: keys expire in 
`HDDS_SECRET_KEY_EXPIRY_DURATION`; new keys are generated every 
`HDDS_SECRET_KEY_ROTATE_DURATION` and are checked for rotation every 
`HDDS_SECRET_KEY_ROTATE_CHECK_DURATION`? In that case, the defaults say keys 
are checked for rotation every 10 minutes, are valid for 7 days and a new one 
is generated daily. Thus, seven keys are floating around that are valid at any 
given time. Could you add some comments explaining this?
   



##########
hadoop-hdds/common/src/main/resources/ozone-default.xml:
##########
@@ -3523,4 +3523,49 @@
       Interval in MINUTES by Recon to request SCM DB Snapshot.
     </description>
   </property>
+  <property>
+    <name>hdds.secret.key.file.name</name>
+    <value>secret_keys.json</value>
+    <tag>SCM, SECURITY</tag>
+    <description>
+      Name of file which stores symmetric secret keys for token signatures.
+    </description>
+  </property>
+  <property>
+    <name>hdds.secret.key.expiry.duration</name>
+    <value>P7D</value>
+    <tag>SCM, SECURITY</tag>
+    <description>
+      The duration for which symmetric secret keys issued by SCM are valid.
+      The formats accepted are based on the ISO-8601 duration format 
PnDTnHnMn.nS
+    </description>
+  </property>
+  <property>
+    <name>hdds.secret.key.rotate.duration</name>
+    <value>P1D</value>
+    <tag>SCM, SECURITY</tag>
+    <description>
+      The duration that SCM periodically generate a new symmetric secret keys.
+      The formats accepted are based on the ISO-8601 duration format 
PnDTnHnMn.nS
+    </description>
+  </property>
+  <property>
+    <name>hdds.secret.key.rotate.check.duration</name>
+    <value>PT10M</value>

Review Comment:
   This could be in hours, considering that the default rotation duration is a day.



##########
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/LocalSecretKeyStore.java:
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.security.symmetric;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.MappingIterator;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectReader;
+import com.fasterxml.jackson.databind.SequenceWriter;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import com.fasterxml.jackson.databind.ser.std.StdSerializer;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.crypto.SecretKey;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.attribute.PosixFilePermission;
+import java.time.Instant;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import static com.google.common.collect.Sets.newHashSet;
+import static java.nio.file.attribute.PosixFilePermission.OWNER_READ;
+import static java.nio.file.attribute.PosixFilePermission.OWNER_WRITE;
+import static java.util.stream.Collectors.toList;
+
+/**
+ * A {@link SecretKeyStore} that saves and loads SecretKeys from/to a
+ * JSON file on local file system.
+ */
+public class LocalSecretKeyStore implements SecretKeyStore {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(LocalSecretKeyStore.class);
+
+  private final Path secretKeysFile;
+  private final ObjectMapper mapper;
+
+  public LocalSecretKeyStore(Path secretKeysFile) {
+    this.secretKeysFile = secretKeysFile;
+    this.mapper = new ObjectMapper()
+        .registerModule(new JavaTimeModule())
+        .configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
+  }
+
+  @Override
+  public synchronized List<ManagedSecretKey> load() {
+    if (!secretKeysFile.toFile().exists()) {
+      return Collections.emptyList();
+    }
+
+    ObjectReader reader = mapper.readerFor(ManagedSecretKeyDto.class);
+    try (MappingIterator<ManagedSecretKeyDto> iterator =
+             reader.readValues(secretKeysFile.toFile())) {
+      List<ManagedSecretKeyDto> dtos = iterator.readAll();
+      List<ManagedSecretKey> result = dtos.stream()
+          .map(ManagedSecretKeyDto::toObject)
+          .collect(toList());
+      LOG.info("Loaded {} from {}", result, secretKeysFile);
+      return result;
+    } catch (IOException e) {
+      throw new IllegalStateException("Error reading SecretKeys from "
+          + secretKeysFile, e);
+    }
+  }
+
+  @Override
+  public synchronized void save(Collection<ManagedSecretKey> secretKeys) {
+    setFileOwnerPermissions();
+
+    List<ManagedSecretKeyDto> dtos = secretKeys.stream()
+        .map(ManagedSecretKeyDto::new)
+        .collect(toList());
+
+    try (SequenceWriter writer =
+             mapper.writer().writeValues(secretKeysFile.toFile())) {
+      writer.init(true);
+      for (ManagedSecretKeyDto dto : dtos) {
+        writer.write(dto);
+      }
+    } catch (IOException e) {
+      throw new IllegalStateException("Error saving SecretKeys to file "
+          + secretKeysFile, e);
+    }
+    LOG.info("Saved {} to file {}", secretKeys, secretKeysFile);
+  }
+
+  private void setFileOwnerPermissions() {
+    Set<PosixFilePermission> permissions = newHashSet(OWNER_READ, OWNER_WRITE);

Review Comment:
   This can be a static unmodifiable set. 



##########
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/LocalSecretKeyStore.java:
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.security.symmetric;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.MappingIterator;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectReader;
+import com.fasterxml.jackson.databind.SequenceWriter;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import com.fasterxml.jackson.databind.ser.std.StdSerializer;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.crypto.SecretKey;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.attribute.PosixFilePermission;
+import java.time.Instant;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import static com.google.common.collect.Sets.newHashSet;
+import static java.nio.file.attribute.PosixFilePermission.OWNER_READ;
+import static java.nio.file.attribute.PosixFilePermission.OWNER_WRITE;
+import static java.util.stream.Collectors.toList;
+
+/**
+ * A {@link SecretKeyStore} that saves and loads SecretKeys from/to a
+ * JSON file on local file system.
+ */
+public class LocalSecretKeyStore implements SecretKeyStore {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(LocalSecretKeyStore.class);
+
+  private final Path secretKeysFile;
+  private final ObjectMapper mapper;
+
+  public LocalSecretKeyStore(Path secretKeysFile) {
+    this.secretKeysFile = secretKeysFile;
+    this.mapper = new ObjectMapper()
+        .registerModule(new JavaTimeModule())
+        .configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
+  }
+
+  @Override
+  public synchronized List<ManagedSecretKey> load() {
+    if (!secretKeysFile.toFile().exists()) {
+      return Collections.emptyList();
+    }
+
+    ObjectReader reader = mapper.readerFor(ManagedSecretKeyDto.class);
+    try (MappingIterator<ManagedSecretKeyDto> iterator =
+             reader.readValues(secretKeysFile.toFile())) {
+      List<ManagedSecretKeyDto> dtos = iterator.readAll();
+      List<ManagedSecretKey> result = dtos.stream()
+          .map(ManagedSecretKeyDto::toObject)
+          .collect(toList());
+      LOG.info("Loaded {} from {}", result, secretKeysFile);
+      return result;
+    } catch (IOException e) {
+      throw new IllegalStateException("Error reading SecretKeys from "
+          + secretKeysFile, e);
+    }
+  }
+
+  @Override
+  public synchronized void save(Collection<ManagedSecretKey> secretKeys) {
+    setFileOwnerPermissions();

Review Comment:
   I think this can be changed to `createFile`, which also sets permissions. Right 
now this method is responsible for both creating the file and setting its permissions.



##########
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/SecretKeyManager.java:
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.security.symmetric;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.crypto.KeyGenerator;
+import java.security.NoSuchAlgorithmException;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeoutException;
+
+import static java.time.Duration.between;
+import static java.util.Comparator.comparing;
+import static java.util.Objects.requireNonNull;
+import static java.util.stream.Collectors.toList;
+
+/**
+ * This component manages symmetric SecretKey life-cycle, including generation,
+ * rotation and destruction.
+ */
+public class SecretKeyManager {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SecretKeyManager.class);
+
+  private final SecretKeyState state;
+  private boolean pendingInititializedState = false;
+  private final Duration rotationDuration;
+  private final Duration validityDuration;
+  private final SecretKeyStore keyStore;
+
+  private final KeyGenerator keyGenerator;
+
+  public SecretKeyManager(SecretKeyState state,
+                          SecretKeyStore keyStore,
+                          Duration rotationDuration,
+                          Duration validityDuration,
+                          String algorithm) {
+    this.state = requireNonNull(state);
+    this.rotationDuration = requireNonNull(rotationDuration);
+    this.validityDuration = requireNonNull(validityDuration);
+    this.keyStore = requireNonNull(keyStore);
+    this.keyGenerator = createKeyGenerator(algorithm);
+  }
+
+  public SecretKeyManager(SecretKeyState state,
+                          SecretKeyStore keyStore,
+                          SecretKeyConfig config) {
+    this(state, keyStore, config.getRotateDuration(),
+        config.getExpiryDuration(), config.getAlgorithm());
+  }
+
+  /**
+   * Initialize the state from by loading SecretKeys from local file, or
+   * generate new keys if the file doesn't exist.
+   *
+   * @throws TimeoutException can possibly occur when replicating the state.
+   */
+  public synchronized boolean initialize() {
+    if (state.getCurrentKey() != null) {
+      return false;
+    }
+
+    List<ManagedSecretKey> sortedKeys = keyStore.load()
+        .stream()
+        .filter(x -> !x.isExpired())
+        .sorted(comparing(ManagedSecretKey::getCreationTime))
+        .collect(toList());
+
+    ManagedSecretKey currentKey;
+    if (sortedKeys.isEmpty()) {
+      // First start, generate new key as the current key.
+      currentKey = generateSecretKey();
+      sortedKeys.add(currentKey);
+      LOG.info("No keys is loaded, generated new key: {}", currentKey);
+    } else {
+      // For restarts, reload allKeys and take the latest one as current.
+      currentKey = sortedKeys.get(sortedKeys.size() - 1);
+      LOG.info("Key reloaded, current key: {}, all keys: {}", currentKey,
+          sortedKeys);
+    }
+
+    // First, update the SecretKey state to make it visible immediately on the
+    // current instance.
+    state.updateKeysInternal(currentKey, sortedKeys);

Review Comment:
   Should the keys be agreed upon before serving them out?



##########
hadoop-hdds/common/src/main/resources/ozone-default.xml:
##########
@@ -3523,4 +3523,49 @@
       Interval in MINUTES by Recon to request SCM DB Snapshot.
     </description>
   </property>
+  <property>
+    <name>hdds.secret.key.file.name</name>
+    <value>secret_keys.json</value>
+    <tag>SCM, SECURITY</tag>
+    <description>
+      Name of file which stores symmetric secret keys for token signatures.
+    </description>
+  </property>
+  <property>
+    <name>hdds.secret.key.expiry.duration</name>
+    <value>P7D</value>

Review Comment:
   Nit: We follow a different convention in other places; it might make sense 
to stick with `TimeDurationUtil.java`, which is used in the same config file for 
other durations.



##########
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/LocalSecretKeyStore.java:
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.security.symmetric;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.MappingIterator;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectReader;
+import com.fasterxml.jackson.databind.SequenceWriter;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import com.fasterxml.jackson.databind.ser.std.StdSerializer;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.crypto.SecretKey;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.attribute.PosixFilePermission;
+import java.time.Instant;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import static com.google.common.collect.Sets.newHashSet;
+import static java.nio.file.attribute.PosixFilePermission.OWNER_READ;
+import static java.nio.file.attribute.PosixFilePermission.OWNER_WRITE;
+import static java.util.stream.Collectors.toList;
+
+/**
+ * A {@link SecretKeyStore} that saves and loads SecretKeys from/to a
+ * JSON file on local file system.
+ */
+public class LocalSecretKeyStore implements SecretKeyStore {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(LocalSecretKeyStore.class);
+
+  private final Path secretKeysFile;
+  private final ObjectMapper mapper;
+
+  public LocalSecretKeyStore(Path secretKeysFile) {
+    this.secretKeysFile = secretKeysFile;
+    this.mapper = new ObjectMapper()
+        .registerModule(new JavaTimeModule())
+        .configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
+  }
+
+  @Override
+  public synchronized List<ManagedSecretKey> load() {
+    if (!secretKeysFile.toFile().exists()) {
+      return Collections.emptyList();
+    }
+
+    ObjectReader reader = mapper.readerFor(ManagedSecretKeyDto.class);
+    try (MappingIterator<ManagedSecretKeyDto> iterator =
+             reader.readValues(secretKeysFile.toFile())) {
+      List<ManagedSecretKeyDto> dtos = iterator.readAll();
+      List<ManagedSecretKey> result = dtos.stream()
+          .map(ManagedSecretKeyDto::toObject)
+          .collect(toList());
+      LOG.info("Loaded {} from {}", result, secretKeysFile);
+      return result;
+    } catch (IOException e) {
+      throw new IllegalStateException("Error reading SecretKeys from "
+          + secretKeysFile, e);
+    }
+  }
+
+  @Override
+  public synchronized void save(Collection<ManagedSecretKey> secretKeys) {
+    setFileOwnerPermissions();
+
+    List<ManagedSecretKeyDto> dtos = secretKeys.stream()
+        .map(ManagedSecretKeyDto::new)
+        .collect(toList());
+
+    try (SequenceWriter writer =
+             mapper.writer().writeValues(secretKeysFile.toFile())) {
+      writer.init(true);
+      for (ManagedSecretKeyDto dto : dtos) {
+        writer.write(dto);
+      }
+    } catch (IOException e) {
+      throw new IllegalStateException("Error saving SecretKeys to file "
+          + secretKeysFile, e);
+    }
+    LOG.info("Saved {} to file {}", secretKeys, secretKeysFile);
+  }
+
+  private void setFileOwnerPermissions() {

Review Comment:
   ```suggestion
     private void createSecretKeysFile() {
   ```



##########
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/SecretKeyManager.java:
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.security.symmetric;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.crypto.KeyGenerator;
+import java.security.NoSuchAlgorithmException;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeoutException;
+
+import static java.time.Duration.between;
+import static java.util.Comparator.comparing;
+import static java.util.Objects.requireNonNull;
+import static java.util.stream.Collectors.toList;
+
+/**
+ * This component manages symmetric SecretKey life-cycle, including generation,
+ * rotation and destruction.
+ */
+public class SecretKeyManager {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SecretKeyManager.class);
+
+  private final SecretKeyState state;
+  private boolean pendingInititializedState = false;
+  private final Duration rotationDuration;
+  private final Duration validityDuration;
+  private final SecretKeyStore keyStore;
+
+  private final KeyGenerator keyGenerator;
+
+  public SecretKeyManager(SecretKeyState state,
+                          SecretKeyStore keyStore,
+                          Duration rotationDuration,
+                          Duration validityDuration,
+                          String algorithm) {
+    this.state = requireNonNull(state);
+    this.rotationDuration = requireNonNull(rotationDuration);
+    this.validityDuration = requireNonNull(validityDuration);
+    this.keyStore = requireNonNull(keyStore);
+    this.keyGenerator = createKeyGenerator(algorithm);
+  }
+
+  public SecretKeyManager(SecretKeyState state,
+                          SecretKeyStore keyStore,
+                          SecretKeyConfig config) {
+    this(state, keyStore, config.getRotateDuration(),
+        config.getExpiryDuration(), config.getAlgorithm());
+  }
+
+  /**
+   * Initialize the state from by loading SecretKeys from local file, or
+   * generate new keys if the file doesn't exist.
+   *
+   * @throws TimeoutException can possibly occur when replicating the state.
+   */
+  public synchronized boolean initialize() {
+    if (state.getCurrentKey() != null) {
+      return false;
+    }
+
+    List<ManagedSecretKey> sortedKeys = keyStore.load()
+        .stream()
+        .filter(x -> !x.isExpired())
+        .sorted(comparing(ManagedSecretKey::getCreationTime))
+        .collect(toList());
+
+    ManagedSecretKey currentKey;
+    if (sortedKeys.isEmpty()) {
+      // First start, generate new key as the current key.
+      currentKey = generateSecretKey();
+      sortedKeys.add(currentKey);
+      LOG.info("No keys is loaded, generated new key: {}", currentKey);
+    } else {
+      // For restarts, reload allKeys and take the latest one as current.
+      currentKey = sortedKeys.get(sortedKeys.size() - 1);
+      LOG.info("Key reloaded, current key: {}, all keys: {}", currentKey,
+          sortedKeys);
+    }
+
+    // First, update the SecretKey state to make it visible immediately on the
+    // current instance.
+    state.updateKeysInternal(currentKey, sortedKeys);
+    // Then, remember to replicate SecretKey states to all instances.
+    pendingInititializedState = true;
+    return true;
+  }
+
+  public synchronized void flushInitializedState() throws TimeoutException {

Review Comment:
   Add a Javadoc describing what this method is expected to do.



##########
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/LocalSecretKeyStore.java:
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.security.symmetric;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.MappingIterator;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectReader;
+import com.fasterxml.jackson.databind.SequenceWriter;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import com.fasterxml.jackson.databind.ser.std.StdSerializer;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.crypto.SecretKey;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.attribute.PosixFilePermission;
+import java.time.Instant;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import static com.google.common.collect.Sets.newHashSet;
+import static java.nio.file.attribute.PosixFilePermission.OWNER_READ;
+import static java.nio.file.attribute.PosixFilePermission.OWNER_WRITE;
+import static java.util.stream.Collectors.toList;
+
+/**
+ * A {@link SecretKeyStore} that saves and loads SecretKeys from/to a
+ * JSON file on local file system.
+ */
+public class LocalSecretKeyStore implements SecretKeyStore {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(LocalSecretKeyStore.class);
+
+  private final Path secretKeysFile;
+  private final ObjectMapper mapper;
+
+  public LocalSecretKeyStore(Path secretKeysFile) {
+    this.secretKeysFile = secretKeysFile;
+    this.mapper = new ObjectMapper()
+        .registerModule(new JavaTimeModule())
+        .configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
+  }
+
+  @Override
+  public synchronized List<ManagedSecretKey> load() {
+    if (!secretKeysFile.toFile().exists()) {
+      return Collections.emptyList();
+    }
+
+    ObjectReader reader = mapper.readerFor(ManagedSecretKeyDto.class);
+    try (MappingIterator<ManagedSecretKeyDto> iterator =
+             reader.readValues(secretKeysFile.toFile())) {
+      List<ManagedSecretKeyDto> dtos = iterator.readAll();
+      List<ManagedSecretKey> result = dtos.stream()
+          .map(ManagedSecretKeyDto::toObject)
+          .collect(toList());
+      LOG.info("Loaded {} from {}", result, secretKeysFile);
+      return result;
+    } catch (IOException e) {
+      throw new IllegalStateException("Error reading SecretKeys from "
+          + secretKeysFile, e);
+    }
+  }
+
+  @Override
+  public synchronized void save(Collection<ManagedSecretKey> secretKeys) {
+    setFileOwnerPermissions();
+
+    List<ManagedSecretKeyDto> dtos = secretKeys.stream()
+        .map(ManagedSecretKeyDto::new)
+        .collect(toList());
+
+    try (SequenceWriter writer =
+             mapper.writer().writeValues(secretKeysFile.toFile())) {
+      writer.init(true);
+      for (ManagedSecretKeyDto dto : dtos) {
+        writer.write(dto);
+      }
+    } catch (IOException e) {
+      throw new IllegalStateException("Error saving SecretKeys to file "
+          + secretKeysFile, e);
+    }
+    LOG.info("Saved {} to file {}", secretKeys, secretKeysFile);
+  }
+
+  private void setFileOwnerPermissions() {
+    Set<PosixFilePermission> permissions = newHashSet(OWNER_READ, OWNER_WRITE);
+    try {
+      if (!Files.exists(secretKeysFile)) {
+        if (!Files.exists(secretKeysFile.getParent())) {
+          Files.createDirectories(secretKeysFile.getParent());
+        }
+        Files.createFile(secretKeysFile);
+      }
+      Files.setPosixFilePermissions(secretKeysFile, permissions);
+    } catch (IOException e) {

Review Comment:
   Unlikely, but include `FileAlreadyExistsException`.



##########
hadoop-hdds/common/src/main/resources/ozone-default.xml:
##########
@@ -3523,4 +3523,49 @@
       Interval in MINUTES by Recon to request SCM DB Snapshot.
     </description>
   </property>
+  <property>
+    <name>hdds.secret.key.file.name</name>
+    <value>secret_keys.json</value>
+    <tag>SCM, SECURITY</tag>
+    <description>
+      Name of file which stores symmetric secret keys for token signatures.
+    </description>
+  </property>
+  <property>
+    <name>hdds.secret.key.expiry.duration</name>
+    <value>P7D</value>
+    <tag>SCM, SECURITY</tag>
+    <description>
+      The duration for which symmetric secret keys issued by SCM are valid.
+      The formats accepted are based on the ISO-8601 duration format 
PnDTnHnMn.nS
+    </description>
+  </property>
+  <property>
+    <name>hdds.secret.key.rotate.duration</name>
+    <value>P1D</value>
+    <tag>SCM, SECURITY</tag>
+    <description>
+      The duration that SCM periodically generate a new symmetric secret keys.
+      The formats accepted are based on the ISO-8601 duration format 
PnDTnHnMn.nS
+    </description>
+  </property>
+  <property>
+    <name>hdds.secret.key.rotate.check.duration</name>
+    <value>PT10M</value>
+    <tag>SCM, SECURITY</tag>
+    <description>
+      The duration that SCM periodically checks if it's time to generate new 
symmetric secret keys.
+      This must be smaller than hdds.secret.key.rotate.duration.
+      The formats accepted are based on the ISO-8601 duration format 
PnDTnHnMn.nS
+    </description>
+  </property>
+  <property>
+    <name>hdds.secret.key.algorithm</name>
+    <value>HmacSHA256</value>
+    <tag>SCM, SECURITY</tag>
+    <description>
+      The algorithm that SCM uses to generate symmetric secret keys.
+      The formats accepted are based on the ISO-8601 duration format 
PnDTnHnMn.nS

Review Comment:
   ```suggestion
         The algorithm that SCM uses to generate symmetric secret keys.
   ```



##########
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/LocalSecretKeyStore.java:
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.security.symmetric;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.MappingIterator;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectReader;
+import com.fasterxml.jackson.databind.SequenceWriter;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import com.fasterxml.jackson.databind.ser.std.StdSerializer;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.crypto.SecretKey;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.attribute.PosixFilePermission;
+import java.time.Instant;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import static com.google.common.collect.Sets.newHashSet;
+import static java.nio.file.attribute.PosixFilePermission.OWNER_READ;
+import static java.nio.file.attribute.PosixFilePermission.OWNER_WRITE;
+import static java.util.stream.Collectors.toList;
+
+/**
+ * A {@link SecretKeyStore} that saves and loads SecretKeys from/to a
+ * JSON file on local file system.
+ */
+public class LocalSecretKeyStore implements SecretKeyStore {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(LocalSecretKeyStore.class);
+
+  private final Path secretKeysFile;
+  private final ObjectMapper mapper;
+
+  public LocalSecretKeyStore(Path secretKeysFile) {
+    this.secretKeysFile = secretKeysFile;
+    this.mapper = new ObjectMapper()
+        .registerModule(new JavaTimeModule())
+        .configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
+  }
+
+  @Override
+  public synchronized List<ManagedSecretKey> load() {
+    if (!secretKeysFile.toFile().exists()) {
+      return Collections.emptyList();
+    }
+
+    ObjectReader reader = mapper.readerFor(ManagedSecretKeyDto.class);
+    try (MappingIterator<ManagedSecretKeyDto> iterator =
+             reader.readValues(secretKeysFile.toFile())) {
+      List<ManagedSecretKeyDto> dtos = iterator.readAll();
+      List<ManagedSecretKey> result = dtos.stream()
+          .map(ManagedSecretKeyDto::toObject)
+          .collect(toList());
+      LOG.info("Loaded {} from {}", result, secretKeysFile);
+      return result;
+    } catch (IOException e) {
+      throw new IllegalStateException("Error reading SecretKeys from "
+          + secretKeysFile, e);
+    }
+  }
+
+  @Override
+  public synchronized void save(Collection<ManagedSecretKey> secretKeys) {
+    setFileOwnerPermissions();
+
+    List<ManagedSecretKeyDto> dtos = secretKeys.stream()
+        .map(ManagedSecretKeyDto::new)
+        .collect(toList());
+
+    try (SequenceWriter writer =
+             mapper.writer().writeValues(secretKeysFile.toFile())) {
+      writer.init(true);
+      for (ManagedSecretKeyDto dto : dtos) {
+        writer.write(dto);
+      }
+    } catch (IOException e) {
+      throw new IllegalStateException("Error saving SecretKeys to file "
+          + secretKeysFile, e);
+    }
+    LOG.info("Saved {} to file {}", secretKeys, secretKeysFile);
+  }
+
+  private void setFileOwnerPermissions() {
+    Set<PosixFilePermission> permissions = newHashSet(OWNER_READ, OWNER_WRITE);
+    try {
+      if (!Files.exists(secretKeysFile)) {
+        if (!Files.exists(secretKeysFile.getParent())) {
+          Files.createDirectories(secretKeysFile.getParent());
+        }
+        Files.createFile(secretKeysFile);
+      }
+      Files.setPosixFilePermissions(secretKeysFile, permissions);
+    } catch (IOException e) {
+      throw new IllegalStateException("Error setting secret keys file" +
+          " permission: " + secretKeysFile, e);
+    }
+  }
+
+  private static class ManagedSecretKeyDto {
+    private UUID id;

Review Comment:
   Add a monotonically increasing counter to each key so that sorting the keys
   does not depend on wall-clock time.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to