This is an automated email from the ASF dual-hosted git repository.

pifta pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 366c2fb57a HDDS-1059. Recover public key and certificate with private 
key present for DN and Recon (#5404)
366c2fb57a is described below

commit 366c2fb57ae15acbb4c357086328ddec2faeb4f3
Author: Sammi Chen <[email protected]>
AuthorDate: Thu Nov 16 09:32:04 2023 +0800

    HDDS-1059. Recover public key and certificate with private key present for 
DN and Recon (#5404)
---
 .../x509/certificate/client/CertificateClient.java |   2 +-
 .../apache/hadoop/ozone/HddsDatanodeService.java   |  27 +--
 .../hadoop/ozone/TestHddsSecureDatanodeInit.java   |  36 ++--
 .../client/DefaultCertificateClient.java           | 137 +++++++++++---
 .../certificate/client/SCMCertificateClient.java   | 176 ++++++++++++++++++
 .../client/CertificateClientTestImpl.java          |   3 +-
 .../client/TestDnCertificateClientInit.java        |   4 +-
 .../apache/hadoop/hdds/scm/ha/HASecurityUtils.java | 205 ++-------------------
 .../ozonesecure/public-key-cert-recovery-test.sh   |  35 ++++
 .../dist/src/main/compose/ozonesecure/test.sh      |   3 +
 .../apache/hadoop/ozone/TestDelegationToken.java   |  37 +++-
 .../hadoop/ozone/TestSecureOzoneCluster.java       | 121 +++++++-----
 .../hadoop/ozone/om/TestSecureOzoneManager.java    |   7 +-
 .../org/apache/hadoop/ozone/om/OzoneManager.java   |  35 ++--
 .../security/TestOmCertificateClientInit.java      |   5 +-
 .../TestOzoneDelegationTokenSecretManager.java     |   5 +-
 .../org/apache/hadoop/ozone/recon/ReconServer.java |  24 +--
 17 files changed, 488 insertions(+), 374 deletions(-)

diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/CertificateClient.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/CertificateClient.java
index aa1217598b..4798244985 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/CertificateClient.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/CertificateClient.java
@@ -248,7 +248,7 @@ public interface CertificateClient extends Closeable {
    * Initialize certificate client.
    *
    * */
-  InitResponse init() throws CertificateException;
+  void initWithRecovery() throws IOException;
 
   /**
    * Represents initialization response of client.
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java
index c9934c41bf..3cd0477ffd 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java
@@ -48,7 +48,6 @@ import 
org.apache.hadoop.hdds.security.symmetric.DefaultSecretKeyClient;
 import org.apache.hadoop.hdds.security.symmetric.SecretKeyClient;
 import 
org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
 import 
org.apache.hadoop.hdds.security.x509.certificate.client.DNCertificateClient;
-import 
org.apache.hadoop.hdds.security.x509.certificate.utils.CertificateSignRequest;
 import org.apache.hadoop.hdds.server.http.HttpConfig;
 import org.apache.hadoop.hdds.server.OzoneAdmins;
 import org.apache.hadoop.hdds.server.http.RatisDropwizardExports;
@@ -393,31 +392,7 @@ public class HddsDatanodeService extends GenericCli 
implements ServicePlugin {
           this::terminateDatanode);
       certClient = dnCertClient;
     }
-    CertificateClient.InitResponse response = certClient.init();
-    LOG.info("Init response: {}", response);
-    switch (response) {
-    case SUCCESS:
-      LOG.info("Initialization successful, case:{}.", response);
-      break;
-    case GETCERT:
-      CertificateSignRequest.Builder csrBuilder = certClient.getCSRBuilder();
-      String dnCertSerialId =
-          certClient.signAndStoreCertificate(csrBuilder.build());
-      // persist cert ID to VERSION file
-      datanodeDetails.setCertSerialId(dnCertSerialId);
-      persistDatanodeDetails(datanodeDetails);
-      LOG.info("Successfully stored SCM signed certificate, case:{}.",
-          response);
-      break;
-    case FAILURE:
-      LOG.error("DN security initialization failed, case:{}.", response);
-      throw new RuntimeException("DN security initialization failed.");
-    default:
-      LOG.error("DN security initialization failed. Init response: {}",
-          response);
-      throw new RuntimeException("DN security initialization failed.");
-    }
-
+    certClient.initWithRecovery();
     return certClient;
   }
 
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/TestHddsSecureDatanodeInit.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/TestHddsSecureDatanodeInit.java
index 2f592b4566..8c3558879a 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/TestHddsSecureDatanodeInit.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/TestHddsSecureDatanodeInit.java
@@ -52,7 +52,6 @@ import static 
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_X509_CA_ROTATION_ACK_TI
 import static 
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_X509_CA_ROTATION_CHECK_INTERNAL;
 import static 
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_X509_GRACE_DURATION_TOKEN_CHECKS_ENABLED;
 import static 
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_X509_RENEW_GRACE_DURATION;
-import static org.apache.hadoop.ozone.HddsDatanodeService.getLogger;
 import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SECURITY_ENABLED_KEY;
 import static org.mockito.ArgumentMatchers.anyObject;
 import static org.mockito.ArgumentMatchers.anyString;
@@ -116,7 +115,6 @@ public class TestHddsSecureDatanodeInit {
         return mock(SCMSecurityProtocolClientSideTranslatorPB.class);
       }
     };
-    dnLogs = GenericTestUtils.LogCapturer.captureLogs(getLogger());
     callQuietly(() -> {
       service.start(conf);
       return null;
@@ -125,6 +123,8 @@ public class TestHddsSecureDatanodeInit {
       service.initializeCertificateClient(service.getCertificateClient());
       return null;
     });
+    dnLogs = GenericTestUtils.LogCapturer.captureLogs(
+        ((DNCertificateClient)service.getCertificateClient()).getLogger());
     certCodec = new CertificateCodec(securityConfig, DN_COMPONENT);
     keyCodec = new KeyCodec(securityConfig, DN_COMPONENT);
     dnLogs.clearOutput();
@@ -157,7 +157,7 @@ public class TestHddsSecureDatanodeInit {
         securityConfig.getCertificateFileName()).toFile());
     dnLogs.clearOutput();
     client = new DNCertificateClient(securityConfig, scmClient, 
datanodeDetails,
-        certHolder.getSerialNumber().toString(), null, null);
+        certHolder.getSerialNumber().toString(), id -> { }, null);
   }
 
   @AfterEach
@@ -233,17 +233,31 @@ public class TestHddsSecureDatanodeInit {
   public void testSecureDnStartupCase4() throws Exception {
     // Case 4: When public key as well as certificate is missing.
     keyCodec.writePrivateKey(privateKey);
-    RuntimeException rteException = Assertions.assertThrows(
-        RuntimeException.class,
-        () -> service.initializeCertificateClient(client));
-    Assertions.assertTrue(rteException.getMessage()
-        .contains("DN security initialization failed"));
+    // provide a new valid SCMGetCertResponseProto
+    X509CertificateHolder newCertHolder = generateX509CertHolder(null, null,
+        Duration.ofSeconds(CERT_LIFETIME));
+    String pemCert = CertificateCodec.getPEMEncodedString(newCertHolder);
+    // Build a valid SCMGetCertResponseProto. setX509CACertificate(pemCert)
+    // is required; without it signAndStoreCert will throw an exception.
+    SCMSecurityProtocolProtos.SCMGetCertResponseProto responseProto =
+        SCMSecurityProtocolProtos.SCMGetCertResponseProto
+            .newBuilder().setResponseCode(SCMSecurityProtocolProtos
+                .SCMGetCertResponseProto.ResponseCode.success)
+            .setX509Certificate(pemCert)
+            .setX509CACertificate(pemCert)
+            .build();
+    when(scmClient.getDataNodeCertificateChain(anyObject(), anyString()))
+        .thenReturn(responseProto);
+    service.initializeCertificateClient(client);
     Assertions.assertNotNull(client.getPrivateKey());
-    Assertions.assertNull(client.getPublicKey());
-    Assertions.assertNull(client.getCertificate());
+    Assertions.assertNotNull(client.getPublicKey());
+    Assertions.assertNotNull(client.getCertificate());
     Assertions.assertTrue(dnLogs.getOutput()
-        .contains("Init response: FAILURE"));
+        .contains("Init response: GETCERT"));
     dnLogs.clearOutput();
+    // reset scmClient behavior
+    when(scmClient.getDataNodeCertificateChain(anyObject(), anyString()))
+        .thenReturn(null);
   }
 
   @Test
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/DefaultCertificateClient.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/DefaultCertificateClient.java
index e18a7934b9..403295aebf 100644
--- 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/DefaultCertificateClient.java
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/DefaultCertificateClient.java
@@ -28,6 +28,7 @@ import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.nio.file.StandardCopyOption;
 import java.security.InvalidKeyException;
+import java.security.KeyFactory;
 import java.security.KeyPair;
 import java.security.NoSuchAlgorithmException;
 import java.security.NoSuchProviderException;
@@ -38,7 +39,9 @@ import java.security.Signature;
 import java.security.SignatureException;
 import java.security.cert.CertPath;
 import java.security.cert.X509Certificate;
+import java.security.interfaces.RSAPrivateCrtKey;
 import java.security.spec.InvalidKeySpecException;
+import java.security.spec.RSAPublicKeySpec;
 import java.time.Duration;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
@@ -328,7 +331,7 @@ public abstract class DefaultCertificateClient implements 
CertificateClient {
    * @return public key or Null if there is no data.
    */
   @Override
-  public PublicKey getPublicKey() {
+  public synchronized PublicKey getPublicKey() {
     if (publicKey != null) {
       return publicKey;
     }
@@ -657,39 +660,48 @@ public abstract class DefaultCertificateClient implements 
CertificateClient {
    * 2. Generates and stores a keypair.
    * 3. Try to recover public key if private key and certificate is present
    *    but public key is missing.
+   * 4. Try to refetch certificate if public key and private key are present
+   *    but certificate is missing.
+   * 5. Try to recover public key from private key (RSA only) if private key
+   *    is present but public key and certificate are missing, and refetch
+   *    certificate.
    *
    * Truth table:
+   *  +--------------+---------------+--------------+---------------------+
+   *  | Private Key  | Public Keys   | Certificate  |   Result            |
+   *  +--------------+---------------+--------------+---------------------+
+   *  | False  (0)   | False   (0)   | False  (0)   |   GETCERT->SUCCESS  |
+   *  | False  (0)   | False   (0)   | True   (1)   |   FAILURE           |
+   *  | False  (0)   | True    (1)   | False  (0)   |   FAILURE           |
+   *  | False  (0)   | True    (1)   | True   (1)   |   FAILURE           |
+   *  | True   (1)   | False   (0)   | False  (0)   |   GETCERT->SUCCESS  |
+   *  | True   (1)   | False   (0)   | True   (1)   |   SUCCESS           |
+   *  | True   (1)   | True    (1)   | False  (0)   |   GETCERT->SUCCESS  |
+   *  | True   (1)   | True    (1)   | True   (1)   |   SUCCESS           |
    *  +--------------+-----------------+--------------+----------------+
-   *  | Private Key  | Public Keys     | Certificate  |   Result       |
-   *  +--------------+-----------------+--------------+----------------+
-   *  | False  (0)   | False   (0)     | False  (0)   |   GETCERT  000 |
-   *  | False  (0)   | False   (0)     | True   (1)   |   FAILURE  001 |
-   *  | False  (0)   | True    (1)     | False  (0)   |   FAILURE  010 |
-   *  | False  (0)   | True    (1)     | True   (1)   |   FAILURE  011 |
-   *  | True   (1)   | False   (0)     | False  (0)   |   FAILURE  100 |
-   *  | True   (1)   | False   (0)     | True   (1)   |   SUCCESS  101 |
-   *  | True   (1)   | True    (1)     | False  (0)   |   GETCERT  110 |
-   *  | True   (1)   | True    (1)     | True   (1)   |   SUCCESS  111 |
-   *  +--------------+-----------------+--------------+----------------+
-   *
-   * @return InitResponse
-   * Returns FAILURE in following cases:
-   * 1. If private key is missing but public key or certificate is available.
-   * 2. If public key and certificate is missing.
    *
-   * Returns SUCCESS in following cases:
+   * Success in following cases:
    * 1. If keypair as well certificate is available.
    * 2. If private key and certificate is available and public key is
    *    recovered successfully.
+   * 3. If private key and public key are present while certificate is
+   *    missing, certificate is refetched successfully.
+   * 4. If private key is present while public key and certificate are missing,
+   *    public key is recovered and certificate is refetched successfully.
    *
-   * Returns GETCERT in following cases:
-   * 1. First time when keypair and certificate is not available, keypair
-   *    will be generated and stored at configured location.
-   * 2. When keypair (public/private key) is available but certificate is
-   *    missing.
+   * Throws an exception in the following cases:
+   * 1. If private key is missing but public key or certificate is present.
+   * 2. If private key or certificate is present, public key is missing,
+   *    and public key cannot be recovered from private key or certificate.
+   * 3. If refetching the certificate fails.
    */
   @Override
-  public synchronized InitResponse init() throws CertificateException {
+  public synchronized void initWithRecovery() throws IOException {
+    recoverStateIfNeeded(init());
+  }
+
+  @VisibleForTesting
+  public synchronized InitResponse init() throws IOException {
     int initCase = 0;
     PrivateKey pvtKey = getPrivateKey();
     PublicKey pubKey = getPublicKey();
@@ -737,9 +749,11 @@ public abstract class DefaultCertificateClient implements 
CertificateClient {
     case PRIVATE_KEY:
       getLogger().info("Found private key but public key and certificate " +
           "is missing.");
-      // TODO: Recovering public key from private might be possible in some
-      //  cases.
-      return FAILURE;
+      if (recoverPublicKeyFromPrivateKey()) {
+        return GETCERT;
+      } else {
+        return FAILURE;
+      }
     case PUBLICKEY_CERT:
       getLogger().error("Found public key and certificate but private " +
           "key is missing.");
@@ -772,11 +786,43 @@ public abstract class DefaultCertificateClient implements 
CertificateClient {
     default:
       getLogger().error("Unexpected case: {} (private/public/cert)",
           Integer.toBinaryString(init.ordinal()));
-
       return FAILURE;
     }
   }
 
+
+  /**
+   * Recover the state if needed.
+   * */
+  protected void recoverStateIfNeeded(InitResponse state) throws IOException {
+    String upperCaseComponent = component.toUpperCase();
+    getLogger().info("Init response: {}", state);
+    switch (state) {
+    case SUCCESS:
+      getLogger().info("Initialization successful, case:{}.", state);
+      break;
+    case GETCERT:
+      String certId = signAndStoreCertificate(getCSRBuilder().build());
+      if (certIdSaveCallback != null) {
+        certIdSaveCallback.accept(certId);
+      } else {
+        throw new RuntimeException(upperCaseComponent + " doesn't have " +
+            "the certIdSaveCallback set. The new " +
+            "certificate ID " + certId + " cannot be persisted to " +
+            "the VERSION file");
+      }
+      getLogger().info("Successfully stored {} signed certificate, case:{}.",
+          upperCaseComponent, state);
+      break;
+    case FAILURE:
+    default:
+      getLogger().error("{} security initialization failed. " +
+          "Init response: {}", upperCaseComponent, state);
+      throw new RuntimeException(upperCaseComponent +
+          " security initialization failed.");
+    }
+  }
+
   /**
    * Validate keypair and certificate.
    * */
@@ -822,6 +868,39 @@ public abstract class DefaultCertificateClient implements 
CertificateClient {
     return true;
   }
 
+  /**
+   * Tries to recover public key from private key. Also validates recovered
+   * public key.
+   * */
+  protected boolean recoverPublicKeyFromPrivateKey()
+      throws CertificateException {
+    PrivateKey priKey = getPrivateKey();
+    try {
+      if (priKey != null && priKey instanceof RSAPrivateCrtKey) {
+        // if it's RSA private key
+        RSAPrivateCrtKey rsaCrtKey = (RSAPrivateCrtKey) priKey;
+        RSAPublicKeySpec rsaPublicKeySpec = new RSAPublicKeySpec(
+            rsaCrtKey.getModulus(), rsaCrtKey.getPublicExponent());
+        PublicKey pubKey = KeyFactory.getInstance(securityConfig.getKeyAlgo())
+            .generatePublic(rsaPublicKeySpec);
+        if (validateKeyPair(pubKey)) {
+          keyCodec.writePublicKey(pubKey);
+          publicKey = pubKey;
+          getLogger().info("Public key is recovered from the private key.");
+          return true;
+        }
+      }
+    } catch (InvalidKeySpecException | NoSuchAlgorithmException |
+             IOException e) {
+      throw new CertificateException("Error while trying to recover " +
+          "public key.", e, BOOTSTRAP_ERROR);
+    }
+
+    getLogger().error("Can't recover public key " +
+        "corresponding to private key.");
+    return false;
+  }
+
   /**
    * Validates public and private key of certificate client.
    *
@@ -831,7 +910,7 @@ public abstract class DefaultCertificateClient implements 
CertificateClient {
       throws CertificateException {
     byte[] challenge =
         RandomStringUtils.random(1000, 0, 0, false, false, null,
-                new SecureRandom()).getBytes(StandardCharsets.UTF_8);
+            new SecureRandom()).getBytes(StandardCharsets.UTF_8);
     return verifySignature(challenge, signData(challenge), pubKey);
   }
 
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/SCMCertificateClient.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/SCMCertificateClient.java
index b01efd31bb..c4c07fce83 100644
--- 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/SCMCertificateClient.java
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/SCMCertificateClient.java
@@ -25,26 +25,41 @@ import 
org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolClientSideTranslator
 import org.apache.hadoop.hdds.security.SecurityConfig;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.security.x509.certificate.authority.CAType;
+import 
org.apache.hadoop.hdds.security.x509.certificate.authority.CertificateServer;
+import 
org.apache.hadoop.hdds.security.x509.certificate.authority.CertificateStore;
+import 
org.apache.hadoop.hdds.security.x509.certificate.authority.DefaultCAServer;
+import 
org.apache.hadoop.hdds.security.x509.certificate.authority.profile.DefaultCAProfile;
+import 
org.apache.hadoop.hdds.security.x509.certificate.authority.profile.PKIProfile;
 import org.apache.hadoop.hdds.security.x509.certificate.utils.CertificateCodec;
 import 
org.apache.hadoop.hdds.security.x509.certificate.utils.CertificateSignRequest;
 import org.apache.hadoop.hdds.security.x509.exception.CertificateException;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.OzoneSecurityUtil;
+import org.bouncycastle.cert.X509CertificateHolder;
 import org.bouncycastle.pkcs.PKCS10CertificationRequest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.math.BigInteger;
+import java.net.InetAddress;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.security.KeyPair;
+import java.security.cert.CertPath;
 import java.security.cert.X509Certificate;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.function.Consumer;
 
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeType.SCM;
+import static 
org.apache.hadoop.hdds.security.x509.certificate.authority.CertificateApprover.ApprovalType.KERBEROS_TRUSTED;
 import static 
org.apache.hadoop.hdds.security.x509.certificate.utils.CertificateSignRequest.getEncodedString;
+import static org.apache.hadoop.ozone.OzoneConsts.SCM_ROOT_CA_COMPONENT_NAME;
+import static org.apache.hadoop.ozone.OzoneConsts.SCM_ROOT_CA_PREFIX;
 import static org.apache.hadoop.ozone.OzoneConsts.SCM_SUB_CA_PREFIX;
 
 /**
@@ -64,6 +79,22 @@ public class SCMCertificateClient extends 
DefaultCertificateClient {
   private String cId;
   private String scmHostname;
   private ExecutorService executorService;
+  private boolean isPrimarySCM = false;
+  private Consumer<String> saveCertIdCallback;
+
+  @SuppressWarnings("parameternumber")
+  public SCMCertificateClient(SecurityConfig securityConfig,
+      SCMSecurityProtocolClientSideTranslatorPB scmClient,
+      String scmId, String clusterId, String scmCertId, String hostname,
+      boolean isPrimarySCM, Consumer<String> saveCertId) {
+    super(securityConfig, scmClient, LOG, scmCertId, COMPONENT_NAME,
+        HddsUtils.threadNamePrefix(scmId), saveCertId, null);
+    this.scmId = scmId;
+    this.cId = clusterId;
+    this.scmHostname = hostname;
+    this.isPrimarySCM = isPrimarySCM;
+    this.saveCertIdCallback = saveCertId;
+  }
 
   public SCMCertificateClient(SecurityConfig securityConfig,
       SCMSecurityProtocolClientSideTranslatorPB scmClient,
@@ -248,4 +279,149 @@ public class SCMCertificateClient extends 
DefaultCertificateClient {
       executorService = null;
     }
   }
+
+  @Override
+  protected void recoverStateIfNeeded(InitResponse state) throws IOException {
+    LOG.info("Init response: {}", state);
+    switch (state) {
+    case SUCCESS:
+      LOG.info("Initialization successful.");
+      break;
+    case GETCERT:
+      if (!isPrimarySCM) {
+        getRootCASignedSCMCert();
+      } else {
+        getPrimarySCMSelfSignedCert();
+      }
+      LOG.info("Successfully stored SCM signed certificate.");
+      break;
+    case FAILURE:
+    default:
+      LOG.error("SCM security initialization failed. Init response: {}",
+          state);
+      throw new RuntimeException("SCM security initialization failed.");
+    }
+  }
+
+  /**
+   * For bootstrapped SCM get sub-ca signed certificate and root CA
+   * certificate using scm security client and store it using certificate
+   * client.
+   */
+  private void getRootCASignedSCMCert() {
+    try {
+      // Generate CSR.
+      PKCS10CertificationRequest csr = getCSRBuilder().build();
+      HddsProtos.ScmNodeDetailsProto scmNodeDetailsProto =
+          HddsProtos.ScmNodeDetailsProto.newBuilder()
+              .setClusterId(cId)
+              .setHostName(scmHostname)
+              .setScmNodeId(scmId).build();
+
+      // Get SCM sub CA cert.
+      SCMGetCertResponseProto response = getScmSecureClient().
+          getSCMCertChain(scmNodeDetailsProto, getEncodedString(csr), false);
+      String pemEncodedCert = response.getX509Certificate();
+
+      // Store SCM sub CA and root CA certificate.
+      if (response.hasX509CACertificate()) {
+        String pemEncodedRootCert = response.getX509CACertificate();
+        storeCertificate(pemEncodedRootCert, CAType.SUBORDINATE);
+        storeCertificate(pemEncodedCert, CAType.NONE);
+        //note: this does exactly the same as store certificate
+        persistSubCACertificate(pemEncodedCert);
+
+        X509Certificate certificate =
+            CertificateCodec.getX509Certificate(pemEncodedCert);
+        // Persist scm cert serial ID.
+        saveCertIdCallback.accept(certificate.getSerialNumber().toString());
+      } else {
+        throw new RuntimeException("Unable to retrieve SCM certificate chain");
+      }
+    } catch (IOException | java.security.cert.CertificateException e) {
+      LOG.error("Error while fetching/storing SCM signed certificate.", e);
+      throw new RuntimeException(e);
+    }
+  }
+
+
+  /**
+   * For primary SCM get sub-ca signed certificate and root CA certificate by
+   * root CA certificate server and store it using certificate client.
+   */
+  private void getPrimarySCMSelfSignedCert() {
+    try {
+      CertificateServer rootCAServer = initializeRootCertificateServer(
+          getSecurityConfig(), null, BigInteger.ONE,
+          new DefaultCAProfile(), SCM_ROOT_CA_COMPONENT_NAME);
+      CertPath rootCACertificatePath = rootCAServer.getCaCertPath();
+      String pemEncodedRootCert =
+          CertificateCodec.getPEMEncodedString(rootCACertificatePath);
+
+      PKCS10CertificationRequest csr = getCSRBuilder().build();
+      CertPath subSCMCertHolderList = rootCAServer.requestCertificate(
+          csr, KERBEROS_TRUSTED, SCM,
+              BigInteger.ONE.add(BigInteger.ONE).toString()).get();
+      String pemEncodedCert =
+          CertificateCodec.getPEMEncodedString(subSCMCertHolderList);
+
+      storeCertificate(pemEncodedRootCert, CAType.SUBORDINATE);
+      storeCertificate(pemEncodedCert, CAType.NONE);
+      //note: this does exactly the same as store certificate
+      persistSubCACertificate(pemEncodedCert);
+      X509Certificate cert =
+          (X509Certificate) subSCMCertHolderList.getCertificates().get(0);
+      X509CertificateHolder subSCMCertHolder =
+          CertificateCodec.getCertificateHolder(cert);
+
+      // Persist scm cert serial ID.
+      saveCertIdCallback.accept(subSCMCertHolder.getSerialNumber().toString());
+    } catch (InterruptedException | ExecutionException | IOException |
+             java.security.cert.CertificateException e) {
+      LOG.error("Error while fetching/storing SCM signed certificate.", e);
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * This function creates/initializes a certificate server as needed.
+   * This function is idempotent, so calling this again and again after the
+   * server is initialized is not a problem.
+   *
+   * @param config
+   * @param scmCertStore
+   * @param pkiProfile
+   * @param component
+   */
+  public CertificateServer initializeRootCertificateServer(
+      SecurityConfig config, CertificateStore scmCertStore,
+      BigInteger rootCertId, PKIProfile pkiProfile, String component)
+      throws IOException {
+    String subject = String.format(SCM_ROOT_CA_PREFIX, rootCertId) +
+        InetAddress.getLocalHost().getHostName();
+
+    DefaultCAServer rootCAServer = new DefaultCAServer(subject,
+        cId, scmId, scmCertStore, rootCertId, pkiProfile,
+        component);
+
+    rootCAServer.init(config, CAType.ROOT);
+    return rootCAServer;
+  }
+
+  /**
+   * Persists the sub SCM signed certificate to the location which can be
+   * read by sub CA Certificate server.
+   *
+   * @param certificateHolder
+   * @throws IOException
+   */
+  private void persistSubCACertificate(
+      String certificateHolder) throws IOException {
+    CertificateCodec certCodec =
+        new CertificateCodec(getSecurityConfig(), getComponentName());
+
+    certCodec.writeCertificate(certCodec.getLocation().toAbsolutePath(),
+        getSecurityConfig().getCertificateFileName(), certificateHolder);
+  }
 }
diff --git 
a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/client/CertificateClientTestImpl.java
 
b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/client/CertificateClientTestImpl.java
index 286dd154c7..7ba14ce491 100644
--- 
a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/client/CertificateClientTestImpl.java
+++ 
b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/client/CertificateClientTestImpl.java
@@ -260,8 +260,7 @@ public class CertificateClientTestImpl implements 
CertificateClient {
   }
 
   @Override
-  public InitResponse init() throws CertificateException {
-    return null;
+  public void initWithRecovery() throws IOException {
   }
 
   @Override
diff --git 
a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/client/TestDnCertificateClientInit.java
 
b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/client/TestDnCertificateClientInit.java
index 18cf33131c..987e841e51 100644
--- 
a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/client/TestDnCertificateClientInit.java
+++ 
b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/client/TestDnCertificateClientInit.java
@@ -61,7 +61,7 @@ public class TestDnCertificateClientInit {
 
   private KeyPair keyPair;
   private String certSerialId = "3284792342234";
-  private CertificateClient dnCertificateClient;
+  private DNCertificateClient dnCertificateClient;
   private HDDSKeyGenerator keyGenerator;
   private Path metaDirPath;
   private SecurityConfig securityConfig;
@@ -74,7 +74,7 @@ public class TestDnCertificateClientInit {
         arguments(false, false, false, GETCERT),
         arguments(false, false, true, FAILURE),
         arguments(false, true, false, FAILURE),
-        arguments(true, false, false, FAILURE),
+        arguments(true, false, false, GETCERT),
         arguments(false, true, true, FAILURE),
         arguments(true, true, false, GETCERT),
         arguments(true, false, true, SUCCESS),
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/HASecurityUtils.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/HASecurityUtils.java
index 41b314aba5..72f4c4dc87 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/HASecurityUtils.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/HASecurityUtils.java
@@ -17,8 +17,6 @@
 package org.apache.hadoop.hdds.scm.ha;
 
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ScmNodeDetailsProto;
-import 
org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetCertResponseProto;
 import 
org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdds.ratis.RatisHelper;
 import org.apache.hadoop.hdds.scm.proxy.SCMClientConfig;
@@ -30,13 +28,9 @@ import 
org.apache.hadoop.hdds.security.x509.certificate.authority.CAType;
 import 
org.apache.hadoop.hdds.security.x509.certificate.authority.CertificateServer;
 import 
org.apache.hadoop.hdds.security.x509.certificate.authority.CertificateStore;
 import 
org.apache.hadoop.hdds.security.x509.certificate.authority.DefaultCAServer;
-import 
org.apache.hadoop.hdds.security.x509.certificate.authority.profile.DefaultCAProfile;
 import 
org.apache.hadoop.hdds.security.x509.certificate.authority.profile.PKIProfile;
 import 
org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
-import 
org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient.InitResponse;
 import 
org.apache.hadoop.hdds.security.x509.certificate.client.SCMCertificateClient;
-import org.apache.hadoop.hdds.security.x509.certificate.utils.CertificateCodec;
-import 
org.apache.hadoop.hdds.security.x509.certificate.utils.CertificateSignRequest;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.conf.RaftProperties;
@@ -47,31 +41,20 @@ import org.apache.ratis.protocol.RaftGroup;
 import org.apache.ratis.retry.RetryPolicies;
 import org.apache.ratis.rpc.SupportedRpcType;
 import org.apache.ratis.util.TimeDuration;
-import org.bouncycastle.cert.X509CertificateHolder;
-import org.bouncycastle.pkcs.PKCS10CertificationRequest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.math.BigInteger;
 import java.net.InetAddress;
-import java.security.cert.CertPath;
-import java.security.cert.CertificateException;
 import java.security.cert.X509Certificate;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
-import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeType.SCM;
 import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_INFO_WAIT_DURATION;
 import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_INFO_WAIT_DURATION_DEFAULT;
-import static 
org.apache.hadoop.hdds.security.x509.certificate.authority.CertificateApprover.ApprovalType.KERBEROS_TRUSTED;
-import static 
org.apache.hadoop.hdds.security.x509.certificate.utils.CertificateSignRequest.getEncodedString;
-import static 
org.apache.hadoop.hdds.utils.HddsServerUtil.getScmSecurityClientWithMaxRetry;
 import static org.apache.hadoop.ozone.OzoneConsts.SCM_ROOT_CA_COMPONENT_NAME;
 import static org.apache.hadoop.ozone.OzoneConsts.SCM_ROOT_CA_PREFIX;
-import static org.apache.hadoop.ozone.OzoneConsts.SCM_SUB_CA_PREFIX;
-import static org.apache.hadoop.security.UserGroupInformation.getCurrentUser;
 
 /**
  * Utilities for SCM HA security.
@@ -99,144 +82,23 @@ public final class HASecurityUtils {
 
     SecurityConfig securityConfig = new SecurityConfig(conf);
     SCMSecurityProtocolClientSideTranslatorPB scmSecurityClient =
-        getScmSecurityClientWithMaxRetry(conf, getCurrentUser());
-    try (SCMCertificateClient certClient =
+        getScmSecurityClientWithFixedDuration(conf);
+    try (CertificateClient certClient =
         new SCMCertificateClient(securityConfig, scmSecurityClient,
             scmStorageConfig.getScmId(), scmStorageConfig.getClusterID(),
-            scmStorageConfig.getScmCertSerialId(), scmHostname)) {
-      InitResponse response = certClient.init();
-      LOG.info("Init response: {}", response);
-      switch (response) {
-      case SUCCESS:
-        LOG.info("Initialization successful.");
-        break;
-      case GETCERT:
-        if (!primaryscm) {
-          getRootCASignedSCMCert(conf, certClient, securityConfig,
-              scmStorageConfig, scmHostname);
-        } else {
-          getPrimarySCMSelfSignedCert(certClient, securityConfig,
-              scmStorageConfig, scmHostname);
-        }
-        LOG.info("Successfully stored SCM signed certificate.");
-        break;
-      case FAILURE:
-        LOG.error("SCM security initialization failed.");
-        throw new RuntimeException("OM security initialization failed.");
-      default:
-        LOG.error("SCM security initialization failed. Init response: {}",
-            response);
-        throw new RuntimeException("SCM security initialization failed.");
-      }
+            scmStorageConfig.getScmCertSerialId(), scmHostname, primaryscm,
+            certIDString -> {
+              try {
+                scmStorageConfig.setScmCertSerialId(certIDString);
+              } catch (IOException e) {
+                LOG.error("Failed to set new certificate ID", e);
+                throw new RuntimeException("Failed to set new certificate ID");
+              }
+            })) {
+      certClient.initWithRecovery();
     }
   }
 
-  /**
-   * For bootstrapped SCM get sub-ca signed certificate and root CA
-   * certificate using scm security client and store it using certificate
-   * client.
-   */
-  private static void getRootCASignedSCMCert(
-      OzoneConfiguration configuration, SCMCertificateClient client,
-      SecurityConfig securityConfig,
-      SCMStorageConfig scmStorageConfig, String scmHostname) {
-    try {
-      // Create SCM security client.
-      SCMSecurityProtocolClientSideTranslatorPB secureScmClient =
-          getScmSecurityClientWithFixedDuration(configuration);
-
-      // Generate CSR.
-      PKCS10CertificationRequest csr = generateCSR(client, scmStorageConfig,
-          securityConfig, scmHostname);
-
-      ScmNodeDetailsProto scmNodeDetailsProto =
-          ScmNodeDetailsProto.newBuilder()
-              .setClusterId(scmStorageConfig.getClusterID())
-              .setHostName(scmHostname)
-              .setScmNodeId(scmStorageConfig.getScmId()).build();
-
-      // Get SCM sub CA cert.
-      SCMGetCertResponseProto response = secureScmClient.
-          getSCMCertChain(scmNodeDetailsProto, getEncodedString(csr), false);
-      String pemEncodedCert = response.getX509Certificate();
-
-      // Store SCM sub CA and root CA certificate.
-      if (response.hasX509CACertificate()) {
-        String pemEncodedRootCert = response.getX509CACertificate();
-        client.storeCertificate(
-            pemEncodedRootCert, CAType.SUBORDINATE);
-        client.storeCertificate(pemEncodedCert, CAType.NONE);
-        //note: this does exactly the same as store certificate
-        persistSubCACertificate(securityConfig, client,
-            pemEncodedCert);
-
-        X509Certificate certificate =
-            CertificateCodec.getX509Certificate(pemEncodedCert);
-        // Persist scm cert serial ID.
-        scmStorageConfig.setScmCertSerialId(certificate.getSerialNumber()
-            .toString());
-      } else {
-        throw new RuntimeException("Unable to retrieve SCM certificate chain");
-      }
-    } catch (IOException | CertificateException e) {
-      LOG.error("Error while fetching/storing SCM signed certificate.", e);
-      throw new RuntimeException(e);
-    }
-  }
-
-
-  /**
-   * For primary SCM get sub-ca signed certificate and root CA certificate by
-   * root CA certificate server and store it using certificate client.
-   */
-  private static void getPrimarySCMSelfSignedCert(SCMCertificateClient client,
-      SecurityConfig config, SCMStorageConfig scmStorageConfig,
-      String scmHostname) {
-    try {
-      CertificateServer rootCAServer =
-          initializeRootCertificateServer(config, null, scmStorageConfig,
-              new DefaultCAProfile());
-
-      // First SCM sub CA certificate ID 2
-      String certId = BigInteger.ONE.add(BigInteger.ONE).toString();
-
-      PKCS10CertificationRequest csr = generateCSR(client, scmStorageConfig,
-          config, scmHostname);
-
-      CertPath subSCMCertHolderList = rootCAServer.
-          requestCertificate(csr, KERBEROS_TRUSTED, SCM, certId).get();
-
-      CertPath rootCACertificatePath =
-          rootCAServer.getCaCertPath();
-
-      String pemEncodedCert =
-          CertificateCodec.getPEMEncodedString(subSCMCertHolderList);
-
-      String pemEncodedRootCert =
-          CertificateCodec.getPEMEncodedString(rootCACertificatePath);
-
-      client.storeCertificate(
-          pemEncodedRootCert, CAType.SUBORDINATE);
-      client.storeCertificate(pemEncodedCert, CAType.NONE);
-      //note: this does exactly the same as store certificate
-      persistSubCACertificate(config, client, pemEncodedCert);
-      X509Certificate cert =
-          (X509Certificate) subSCMCertHolderList.getCertificates().get(0);
-      X509CertificateHolder subSCMCertHolder =
-          CertificateCodec.getCertificateHolder(cert);
-
-      // Persist scm cert serial ID.
-      scmStorageConfig.setScmCertSerialId(subSCMCertHolder.getSerialNumber()
-          .toString());
-    } catch (InterruptedException | ExecutionException | IOException |
-        CertificateException  e) {
-      LOG.error("Error while fetching/storing SCM signed certificate.", e);
-      Thread.currentThread().interrupt();
-      throw new RuntimeException(e);
-    }
-
-  }
-
   /**
    * This function creates/initializes a certificate server as needed.
    * This function is idempotent, so calling this again and again after the
@@ -284,49 +146,6 @@ public final class HASecurityUtils {
         SCM_ROOT_CA_COMPONENT_NAME);
   }
 
-  /**
-   * Generate CSR to obtain SCM sub CA certificate.
-   */
-  private static PKCS10CertificationRequest generateCSR(
-      SCMCertificateClient client, SCMStorageConfig scmStorageConfig,
-      SecurityConfig config, String scmHostname)
-      throws IOException {
-    CertificateSignRequest.Builder builder = client.getCSRBuilder();
-
-    // Get host name.
-    String subject = SCM_SUB_CA_PREFIX + scmHostname;
-
-    builder.setConfiguration(config)
-        .setScmID(scmStorageConfig.getScmId())
-        .setClusterID(scmStorageConfig.getClusterID())
-        .setSubject(subject);
-
-    LOG.info("Creating csr for SCM->hostName:{},scmId:{},clusterId:{}," +
-            "subject:{}", scmHostname, scmStorageConfig.getScmId(),
-        scmStorageConfig.getClusterID(), subject);
-
-    return builder.build();
-  }
-
-  /**
-   * Persists the sub SCM signed certificate to the location which can be
-   * read by sub CA Certificate server.
-   *
-   * @param config
-   * @param certificateClient
-   * @param certificateHolder
-   * @throws IOException
-   */
-  private static void persistSubCACertificate(SecurityConfig config,
-      CertificateClient certificateClient,
-      String certificateHolder) throws IOException {
-    CertificateCodec certCodec =
-        new CertificateCodec(config, certificateClient.getComponentName());
-
-    certCodec.writeCertificate(certCodec.getLocation().toAbsolutePath(),
-        config.getCertificateFileName(), certificateHolder);
-  }
-
   /**
    * Create GrpcTlsConfig.
    *
diff --git 
a/hadoop-ozone/dist/src/main/compose/ozonesecure/public-key-cert-recovery-test.sh
 
b/hadoop-ozone/dist/src/main/compose/ozonesecure/public-key-cert-recovery-test.sh
new file mode 100755
index 0000000000..01293c6685
--- /dev/null
+++ 
b/hadoop-ozone/dist/src/main/compose/ozonesecure/public-key-cert-recovery-test.sh
@@ -0,0 +1,35 @@
+#!/usr/bin/env bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+#suite:secure
+
+#test recon
+wait_for_port recon 9888 120
+wait_for_execute_command recon 60 "find /data/metadata/recon/keys/public.pem -delete && find /data/metadata/recon/certs/*.crt -delete"
+docker-compose stop recon
+docker-compose start recon
+wait_for_port recon 9888 120
+wait_for_execute_command recon 60 "find /data/metadata/recon/keys/public.pem && find /data/metadata/recon/certs/ROOTCA*.crt"
+
+#test DN
+wait_for_port datanode 9864 120
+wait_for_execute_command datanode 60 "find /data/metadata/dn/keys/public.pem -delete && find /data/metadata/dn/certs/*.crt -delete"
+docker-compose stop datanode
+docker-compose start datanode
+wait_for_port datanode 9864 120
+wait_for_execute_command datanode 60 "find /data/metadata/dn/keys/public.pem && find /data/metadata/dn/certs/ROOTCA*.crt"
+
diff --git a/hadoop-ozone/dist/src/main/compose/ozonesecure/test.sh 
b/hadoop-ozone/dist/src/main/compose/ozonesecure/test.sh
index cc8492ecdc..41f4fa9a58 100755
--- a/hadoop-ozone/dist/src/main/compose/ozonesecure/test.sh
+++ b/hadoop-ozone/dist/src/main/compose/ozonesecure/test.sh
@@ -54,3 +54,6 @@ docker-compose up -d --scale datanode=2
 execute_robot_test scm -v container:1 -v count:2 replication/wait.robot
 docker-compose up -d --scale datanode=3
 execute_robot_test scm -v container:1 -v count:3 replication/wait.robot
+
+#test public key and certificate recovery
+source "$COMPOSE_DIR/public-key-cert-recovery-test.sh"
\ No newline at end of file
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestDelegationToken.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestDelegationToken.java
index 1bb6fadf52..cb6bbc9dd0 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestDelegationToken.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestDelegationToken.java
@@ -26,12 +26,17 @@ import java.security.KeyPair;
 import java.security.PrivilegedExceptionAction;
 import java.util.Properties;
 import java.util.stream.Stream;
+import java.util.UUID;
 
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.HddsTestUtils;
 import org.apache.hadoop.hdds.scm.ScmConfig;
+import org.apache.hadoop.hdds.scm.ha.HASecurityUtils;
 import org.apache.hadoop.hdds.scm.server.SCMHTTPServerConfig;
+import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.hdds.security.SecurityConfig;
+import 
org.apache.hadoop.hdds.security.x509.certificate.authority.DefaultCAServer;
 import 
org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClientTestImpl;
 import org.apache.hadoop.hdds.security.x509.keys.HDDSKeyGenerator;
 import org.apache.hadoop.hdds.security.x509.keys.KeyCodec;
@@ -125,9 +130,8 @@ public final class TestDelegationToken {
   private StorageContainerManager scm;
   private OzoneManager om;
   private String host;
-  private String clusterId;
-  private String scmId;
-  private String omId;
+  private String clusterId = UUID.randomUUID().toString();
+  private String scmId = UUID.randomUUID().toString();
   private OzoneManagerProtocolClientSideTranslatorPB omClient;
 
   public static Stream<Boolean> options() {
@@ -245,6 +249,17 @@ public final class TestDelegationToken {
         spnegoKeytab.getAbsolutePath());
   }
 
+  private void initSCM() throws IOException {
+    DefaultCAServer.setTestSecureFlag(true);
+    SCMStorageConfig scmStore = new SCMStorageConfig(conf);
+    scmStore.setClusterId(clusterId);
+    scmStore.setScmId(scmId);
+    HASecurityUtils.initializeSecurity(scmStore, conf,
+        InetAddress.getLocalHost().getHostName(), true);
+    scmStore.setPrimaryScmNodeId(scmId);
+    // writes the version file properties
+    scmStore.initialize();
+  }
 
   /**
    * Performs following tests for delegation token.
@@ -258,6 +273,9 @@ public final class TestDelegationToken {
   @ParameterizedTest
   @MethodSource("options")
   public void testDelegationToken(boolean useIp) throws Exception {
+    initSCM();
+    scm = HddsTestUtils.getScmSimple(conf);
+    scm.start();
 
     // Capture logs for assertions
     LogCapturer logs = LogCapturer.captureLogs(Server.AUDITLOG);
@@ -297,6 +315,7 @@ public final class TestDelegationToken {
       om.start();
       UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
       String username = ugi.getUserName();
+      logs.clearOutput();
 
       // Get first OM client which will authenticate via Kerberos
       omClient = new OzoneManagerProtocolClientSideTranslatorPB(
@@ -309,7 +328,7 @@ public final class TestDelegationToken {
 
       // Case 1: Test successful delegation token.
       Token<OzoneTokenIdentifier> token = omClient
-          .getDelegationToken(new Text("om"));
+          .getDelegationToken(new Text("scm"));
 
       // Case 2: Test successful token renewal.
       long renewalTime = omClient.renewDelegationToken(token);
@@ -402,12 +421,18 @@ public final class TestDelegationToken {
 
   private void setupOm(OzoneConfiguration config) throws Exception {
     OMStorage omStore = new OMStorage(config);
-    omStore.setClusterId("testClusterId");
+    omStore.setClusterId(clusterId);
     omStore.setOmCertSerialId(OM_CERT_SERIAL_ID);
     // writes the version file properties
     omStore.initialize();
+
+    // OM uses scm/[email protected] to access SCM
+    config.set(OZONE_OM_KERBEROS_PRINCIPAL_KEY,
+        "scm/" + host + "@" + miniKdc.getRealm());
+    omKeyTab = new File(workDir, "scm.keytab");
+    config.set(OZONE_OM_KERBEROS_KEYTAB_FILE_KEY, omKeyTab.getAbsolutePath());
+
     OzoneManager.setTestSecureOmFlag(true);
     om = OzoneManager.createOm(config);
   }
-
 }
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecureOzoneCluster.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecureOzoneCluster.java
index 48b1f32842..939161ee84 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecureOzoneCluster.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecureOzoneCluster.java
@@ -466,6 +466,21 @@ final class TestSecureOzoneCluster {
           SCMHANodeDetails.loadSCMHAConfig(conf, scmStore)
                   .getLocalNodeDetails(), conf);
     }
+
+    /*
+     * As all these processes run inside the same JVM, there are issues around
+     * the Hadoop UGI if different processes run with different principals.
+     * In this test, the OM has to contact the SCM to download certs. SCM runs
+     * as scm/host@REALM, but the OM logs in as om/host@REALM, and then the test
+     * fails, and the OM is unable to contact the SCM due to kerberos login
+     * issues. To work around that, have the OM run as the same principal as the
+     * SCM, and then the test passes.
+     *
+     */
+    conf.set(HDDS_SCM_KERBEROS_PRINCIPAL_KEY,
+        "om/" + host + "@" + miniKdc.getRealm());
+    File keyTab = new File(workDir, "om.keytab");
+    conf.set(HDDS_SCM_KERBEROS_KEYTAB_FILE_KEY, keyTab.getAbsolutePath());
   }
 
   @Test
@@ -519,10 +534,17 @@ final class TestSecureOzoneCluster {
     initSCM();
     // Create a secure SCM instance as om client will connect to it
     scm = HddsTestUtils.getScmSimple(conf);
-    setupOm(conf);
-    conf.set(OZONE_OM_KERBEROS_PRINCIPAL_KEY,
-        "[email protected]");
-    testCommonKerberosFailures(() -> OzoneManager.createOm(conf));
+    try {
+      scm.start();
+      setupOm(conf);
+      conf.set(OZONE_OM_KERBEROS_PRINCIPAL_KEY,
+          "[email protected]");
+      testCommonKerberosFailures(() -> OzoneManager.createOm(conf));
+    } finally {
+      if (scm != null) {
+        scm.stop();
+      }
+    }
   }
 
   /**
@@ -536,25 +558,32 @@ final class TestSecureOzoneCluster {
     LogCapturer logs = LogCapturer.captureLogs(OzoneManager.getLogger());
     GenericTestUtils.setLogLevel(OzoneManager.getLogger(), INFO);
 
-    setupOm(conf);
     try {
+      scm.start();
+      setupOm(conf);
       om.start();
     } catch (Exception ex) {
       // Expects timeout failure from scmClient in om but om user login via
       // kerberos should succeed.
       assertTrue(logs.getOutput().contains("Ozone Manager login successful"));
+    } finally {
+      if (scm != null) {
+        scm.stop();
+      }
     }
   }
 
   @Test
   public void testAccessControlExceptionOnClient() throws Exception {
     initSCM();
-    // Create a secure SCM instance as om client will connect to it
-    scm = HddsTestUtils.getScmSimple(conf);
     LogCapturer logs = LogCapturer.captureLogs(OzoneManager.getLogger());
     GenericTestUtils.setLogLevel(OzoneManager.getLogger(), INFO);
-    setupOm(conf);
     try {
+      // Create a secure SCM instance as om client will connect to it
+      scm = HddsTestUtils.getScmSimple(conf);
+      scm.start();
+
+      setupOm(conf);
       om.setCertClient(new CertificateClientTestImpl(conf));
       om.start();
     } catch (Exception ex) {
@@ -614,15 +643,18 @@ final class TestSecureOzoneCluster {
         .setLogLevel(LoggerFactory.getLogger(Server.class.getName()), INFO);
     LogCapturer omLogs = LogCapturer.captureLogs(OzoneManager.getLogger());
 
-    // Setup secure OM for start.
-    OzoneConfiguration newConf = new OzoneConfiguration(conf);
-    int tokenMaxLifetime = 1000;
-    newConf.setLong(DELEGATION_TOKEN_MAX_LIFETIME_KEY, tokenMaxLifetime);
-    setupOm(newConf);
-    OzoneManager.setTestSecureOmFlag(true);
-    // Start OM
-
+    // Setup SCM
+    initSCM();
+    scm = HddsTestUtils.getScmSimple(conf);
     try {
+      // Start SCM
+      scm.start();
+
+      // Setup secure OM for start.
+      int tokenMaxLifetime = 1000;
+      conf.setLong(DELEGATION_TOKEN_MAX_LIFETIME_KEY, tokenMaxLifetime);
+      setupOm(conf);
+      OzoneManager.setTestSecureOmFlag(true);
       om.setCertClient(new CertificateClientTestImpl(conf));
       om.start();
 
@@ -683,13 +715,16 @@ final class TestSecureOzoneCluster {
       assertTrue(omLogs.getOutput().contains("can't be found in cache"));
       omLogs.clearOutput();
     } finally {
+      if (scm != null) {
+        scm.stop();
+      }
       IOUtils.closeQuietly(om);
     }
   }
 
   private void setupOm(OzoneConfiguration config) throws Exception {
     OMStorage omStore = new OMStorage(config);
-    omStore.setClusterId("testClusterId");
+    omStore.setClusterId(clusterId);
     omStore.setOmCertSerialId(OM_CERT_SERIAL_ID);
     // writes the version file properties
     omStore.initialize();
@@ -700,10 +735,13 @@ final class TestSecureOzoneCluster {
   @Test
   @Flaky("HDDS-9349")
   public void testGetSetRevokeS3Secret() throws Exception {
-
-    // Setup secure OM for start
-    setupOm(conf);
+    initSCM();
     try {
+      scm = HddsTestUtils.getScmSimple(conf);
+      scm.start();
+
+      // Setup secure OM for start
+      setupOm(conf);
       // Start OM
       om.setCertClient(new CertificateClientTestImpl(conf));
       om.start();
@@ -793,6 +831,9 @@ final class TestSecureOzoneCluster {
       }
 
     } finally {
+      if (scm != null) {
+        scm.stop();
+      }
       IOUtils.closeQuietly(om);
     }
   }
@@ -803,28 +844,9 @@ final class TestSecureOzoneCluster {
   @Test
   public void testSecureOmReInit() throws Exception {
     LogCapturer omLogs =
-        LogCapturer.captureLogs(OzoneManager.getLogger());
+        LogCapturer.captureLogs(OMCertificateClient.LOG);
     omLogs.clearOutput();
 
-    /*
-     * As all these processes run inside the same JVM, there are issues around
-     * the Hadoop UGI if different processes run with different principals.
-     * In this test, the OM has to contact the SCM to download certs. SCM runs
-     * as scm/host@REALM, but the OM logs in as om/host@REALM, and then the test
-     * fails, and the OM is unable to contact the SCM due to kerberos login
-     * issues. To work around that, have the OM run as the same principal as the
-     * SCM, and then the test passes.
-     *
-     * TODO: Need to look into this further to see if there is a better way to
-     *       address this problem.
-     */
-    String realm = miniKdc.getRealm();
-    conf.set(OZONE_OM_KERBEROS_PRINCIPAL_KEY,
-        "scm/" + host + "@" + realm);
-    omKeyTab = new File(workDir, "scm.keytab");
-    conf.set(OZONE_OM_KERBEROS_KEYTAB_FILE_KEY,
-        omKeyTab.getAbsolutePath());
-
     initSCM();
     try {
       scm = HddsTestUtils.getScmSimple(conf);
@@ -853,7 +875,7 @@ final class TestSecureOzoneCluster {
       assertNotNull(om.getCertificateClient().getCertificate());
       assertTrue(omLogs.getOutput().contains("Init response: GETCERT"));
       assertTrue(omLogs.getOutput().contains("Successfully stored " +
-          "SCM signed certificate"));
+          "OM signed certificate"));
       X509Certificate certificate = om.getCertificateClient().getCertificate();
       validateCertificate(certificate);
 
@@ -871,7 +893,7 @@ final class TestSecureOzoneCluster {
   @Test
   public void testSecureOmInitSuccess() throws Exception {
     LogCapturer omLogs =
-        LogCapturer.captureLogs(OzoneManager.getLogger());
+        LogCapturer.captureLogs(OMCertificateClient.LOG);
     omLogs.clearOutput();
     initSCM();
     try {
@@ -890,7 +912,7 @@ final class TestSecureOzoneCluster {
       assertEquals(3, om.getCertificateClient().getTrustChain().size());
       assertTrue(omLogs.getOutput().contains("Init response: GETCERT"));
       assertTrue(omLogs.getOutput().contains("Successfully stored " +
-          "SCM signed certificate"));
+          "OM signed certificate"));
       X509Certificate certificate = om.getCertificateClient().getCertificate();
       validateCertificate(certificate);
       String pemEncodedCACert =
@@ -958,10 +980,8 @@ final class TestSecureOzoneCluster {
     when(scmClient.getOMCertChain(any(), anyString()))
         .thenReturn(responseProto);
 
-    try (OMCertificateClient client =
-        new OMCertificateClient(
-            securityConfig, scmClient, omStorage, omInfo, "", scmId, null, null)
-    ) {
+    try (OMCertificateClient client = new OMCertificateClient(
+        securityConfig, scmClient, omStorage, omInfo, "", scmId, null, null)) {
       client.init();
 
       // create Ozone Manager instance, it will start the monitor task
@@ -1146,7 +1166,11 @@ final class TestSecureOzoneCluster {
    */
   @Test
   public void testDelegationTokenRenewCrossCertificateRenew() throws Exception {
+    initSCM();
     try {
+      scm = HddsTestUtils.getScmSimple(conf);
+      scm.start();
+
       // Setup secure OM for start.
       final int certLifetime = 40 * 1000; // 40s
       OzoneConfiguration newConf = new OzoneConfiguration(conf);
@@ -1210,6 +1234,9 @@ final class TestSecureOzoneCluster {
       assertTrue(expiryTime > 0);
       assertTrue(new Date(expiryTime).before(omCert.getNotAfter()));
     } finally {
+      if (scm != null) {
+        scm.stop();
+      }
       IOUtils.closeQuietly(om);
     }
   }
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestSecureOzoneManager.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestSecureOzoneManager.java
index 03477f851e..38d4ca1cff 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestSecureOzoneManager.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestSecureOzoneManager.java
@@ -124,7 +124,7 @@ public class TestSecureOzoneManager {
     // Case 1: When keypair as well as certificate is missing. Initial keypair
     // boot-up. Get certificate will fail when SCM is not running.
     SecurityConfig securityConfig = new SecurityConfig(conf);
-    CertificateClient client =
+    OMCertificateClient client =
         new OMCertificateClient(
             securityConfig, null, omStorage, omInfo, "", scmId, null, null);
     Assert.assertEquals(CertificateClient.InitResponse.GETCERT, client.init());
@@ -151,9 +151,9 @@ public class TestSecureOzoneManager {
             securityConfig, null, omStorage, omInfo, "", null, null, null);
     FileUtils.deleteQuietly(Paths.get(securityConfig.getKeyLocation(COMPONENT)
         .toString(), securityConfig.getPublicKeyFileName()).toFile());
-    Assert.assertEquals(CertificateClient.InitResponse.FAILURE, client.init());
+    Assert.assertEquals(CertificateClient.InitResponse.GETCERT, client.init());
     Assert.assertNotNull(client.getPrivateKey());
-    Assert.assertNull(client.getPublicKey());
+    Assert.assertNotNull(client.getPublicKey());
     Assert.assertNull(client.getCertificate());
     client.close();
 
@@ -162,7 +162,6 @@ public class TestSecureOzoneManager {
         new OMCertificateClient(
             securityConfig, null, omStorage, omInfo, "", null, null, null);
     KeyCodec keyCodec = new KeyCodec(securityConfig, COMPONENT);
-    keyCodec.writePublicKey(publicKey);
     FileUtils.deleteQuietly(Paths.get(securityConfig.getKeyLocation(COMPONENT)
         .toString(), securityConfig.getPrivateKeyFileName()).toFile());
     Assert.assertEquals(CertificateClient.InitResponse.FAILURE, client.init());
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index 387b06f9be..b1fb9f003f 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -113,7 +113,6 @@ import 
org.apache.hadoop.hdds.security.symmetric.SecretKeySignerClient;
 import org.apache.hadoop.hdds.security.symmetric.DefaultSecretKeySignerClient;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenSecretManager;
 import 
org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
-import 
org.apache.hadoop.hdds.security.x509.certificate.utils.CertificateSignRequest;
 import org.apache.hadoop.hdds.server.ServiceRuntimeInfoImpl;
 import org.apache.hadoop.hdds.server.http.RatisDropwizardExports;
 import org.apache.hadoop.hdds.utils.HAUtils;
@@ -1408,31 +1407,19 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
     SCMSecurityProtocolClientSideTranslatorPB scmSecurityClient =
         getScmSecurityClientWithMaxRetry(conf, getCurrentUser());
 
-    CertificateClient certClient =
+    OMCertificateClient certClient =
         new OMCertificateClient(
             new SecurityConfig(conf), scmSecurityClient, omStore, omInfo,
-            "", scmId, null, null);
-    CertificateClient.InitResponse response = certClient.init();
-    LOG.info("Init response: {}", response);
-    switch (response) {
-    case SUCCESS:
-      LOG.info("Initialization successful.");
-      break;
-    case GETCERT:
-      // Sign and persist OM cert.
-      CertificateSignRequest.Builder builder = certClient.getCSRBuilder();
-      omStore.setOmCertSerialId(
-          certClient.signAndStoreCertificate(builder.build()));
-      LOG.info("Successfully stored SCM signed certificate.");
-      break;
-    case FAILURE:
-      LOG.error("OM security initialization failed.");
-      throw new RuntimeException("OM security initialization failed.");
-    default:
-      LOG.error("OM security initialization failed. Init response: {}",
-          response);
-      throw new RuntimeException("OM security initialization failed.");
-    }
+            "", scmId,
+            certId -> {
+              try {
+                omStore.setOmCertSerialId(certId);
+              } catch (IOException e) {
+                LOG.error("Failed to set new certificate ID", e);
+                throw new RuntimeException("Failed to set new certificate ID");
+              }
+            }, null);
+    certClient.initWithRecovery();
   }
 
   private void initializeRatisDirs(OzoneConfiguration conf) throws IOException {
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/security/TestOmCertificateClientInit.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/security/TestOmCertificateClientInit.java
index ce480cd0ad..7aa0a0fcca 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/security/TestOmCertificateClientInit.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/security/TestOmCertificateClientInit.java
@@ -22,7 +22,6 @@ import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.security.SecurityConfig;
-import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
 import org.apache.hadoop.hdds.security.x509.certificate.utils.CertificateCodec;
 import org.apache.hadoop.hdds.security.x509.keys.HDDSKeyGenerator;
 import org.apache.hadoop.hdds.security.x509.keys.KeyCodec;
@@ -65,7 +64,7 @@ public class TestOmCertificateClientInit {
 
   private KeyPair keyPair;
   private String certSerialId = "3284792342234";
-  private CertificateClient omCertificateClient;
+  private OMCertificateClient omCertificateClient;
   private HDDSKeyGenerator keyGenerator;
   private Path metaDirPath;
   private SecurityConfig securityConfig;
@@ -78,7 +77,7 @@ public class TestOmCertificateClientInit {
         arguments(false, false, false, GETCERT),
         arguments(false, false, true, FAILURE),
         arguments(false, true, false, FAILURE),
-        arguments(true, false, false, FAILURE),
+        arguments(true, false, false, GETCERT),
         arguments(false, true, true, FAILURE),
         arguments(true, true, false, GETCERT),
         arguments(true, false, true, SUCCESS),
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/security/TestOzoneDelegationTokenSecretManager.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/security/TestOzoneDelegationTokenSecretManager.java
index 3beff0aa71..de00db6117 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/security/TestOzoneDelegationTokenSecretManager.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/security/TestOzoneDelegationTokenSecretManager.java
@@ -34,7 +34,6 @@ import java.util.UUID;
 import com.google.common.collect.ImmutableList;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.security.SecurityConfig;
-import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
 import org.apache.hadoop.hdds.security.x509.certificate.utils.CertificateCodec;
 import org.apache.hadoop.hdds.server.ServerUtils;
 import org.apache.hadoop.io.Text;
@@ -84,7 +83,7 @@ public class TestOzoneDelegationTokenSecretManager {
   private OzoneManager om;
   private OzoneDelegationTokenSecretManager secretManager;
   private SecurityConfig securityConfig;
-  private CertificateClient certificateClient;
+  private OMCertificateClient certificateClient;
   private long expiryTime;
   private Text serviceRpcAdd;
   private OzoneConfiguration conf;
@@ -138,7 +137,7 @@ public class TestOzoneDelegationTokenSecretManager {
   /**
    * Helper function to create certificate client.
    * */
-  private CertificateClient setupCertificateClient() throws Exception {
+  private OMCertificateClient setupCertificateClient() throws Exception {
     KeyPair keyPair = KeyStoreTestUtil.generateKeyPair("RSA");
     CertificateFactory fact = CertificateCodec.getCertFactory();
     X509Certificate singleCert = KeyStoreTestUtil
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServer.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServer.java
index 7a02f08375..bae3b3ea16 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServer.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServer.java
@@ -183,29 +183,7 @@ public class ReconServer extends GenericCli {
     SecurityConfig secConf = new SecurityConfig(configuration);
     certClient = new ReconCertificateClient(secConf, scmSecurityClient,
         reconStorage, this::saveNewCertId, this::terminateRecon);
-
-    CertificateClient.InitResponse response = certClient.init();
-    LOG.info("Init response: {}", response);
-    switch (response) {
-    case SUCCESS:
-      LOG.info("Initialization successful, case:{}.", response);
-      break;
-    case GETCERT:
-      String certId = certClient.signAndStoreCertificate(
-          certClient.getCSRBuilder().build());
-      reconStorage.setReconCertSerialId(certId);
-      reconStorage.persistCurrentState();
-      LOG.info("Successfully stored SCM signed certificate, case:{}.",
-          response);
-      break;
-    case FAILURE:
-      LOG.error("Recon security initialization failed, case:{}.", response);
-      throw new RuntimeException("Recon security initialization failed.");
-    default:
-      LOG.error("Recon security initialization failed. Init response: {}",
-          response);
-      throw new RuntimeException("Recon security initialization failed.");
-    }
+    certClient.initWithRecovery();
   }
 
   public void saveNewCertId(String newCertId) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to