This is an automated email from the ASF dual-hosted git repository.

sammichen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 2d09598dd3 HDDS-8958. Handle trust chain changes in clients when 
rootCAs are rotated. (#5649)
2d09598dd3 is described below

commit 2d09598dd352a81fb6a59189b3fe9c6027bdf785
Author: Istvan Fajth <[email protected]>
AuthorDate: Mon Dec 11 09:56:42 2023 +0100

    HDDS-8958. Handle trust chain changes in clients when rootCAs are rotated. 
(#5649)
---
 .../hdds/scm/ClientCredentialInterceptor.java      |  65 ----
 .../hadoop/hdds/scm/ECXceiverClientGrpc.java       |   7 +-
 .../apache/hadoop/hdds/scm/XceiverClientGrpc.java  |  23 +-
 .../hadoop/hdds/scm/XceiverClientManager.java      |  19 +-
 .../apache/hadoop/hdds/scm/XceiverClientRatis.java |  13 +-
 .../hadoop/hdds/scm/client/ClientTrustManager.java | 225 +++++++++++
 .../storage/TestBlockOutputStreamCorrectness.java  |   5 -
 .../org/apache/hadoop/hdds/ratis/RatisHelper.java  |   8 +-
 .../apache/hadoop/hdds/scm/XceiverClientSpi.java   |   6 -
 .../certificate/client/CACertificateProvider.java  |  31 ++
 .../java/org/apache/hadoop/ozone/OzoneConsts.java  |  15 -
 .../reconstruction/ECContainerOperationClient.java |  15 +-
 .../client/CertificateClientTestImpl.java          |   1 -
 .../hdds/scm/cli/ContainerOperationClient.java     |   8 +-
 .../apache/hadoop/ozone/client/rpc/RpcClient.java  |  34 +-
 .../hadoop/ozone/client/MockXceiverClientSpi.java  |   5 -
 .../hadoop/ozone/client/TestOzoneClient.java       |   8 +-
 .../hadoop/ozone/client/TestOzoneECClient.java     |   7 +-
 .../checksum/TestReplicatedFileChecksumHelper.java |   8 +-
 .../hadoop/ozone/om/helpers/ServiceInfoEx.java     |  36 +-
 .../ozoneimpl/TestOzoneContainerWithTLS.java       | 430 ++++++++++++---------
 .../ozone/om/TestOmContainerLocationCache.java     |   4 +-
 22 files changed, 606 insertions(+), 367 deletions(-)

diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/ClientCredentialInterceptor.java
 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/ClientCredentialInterceptor.java
deleted file mode 100644
index 7a15808b2e..0000000000
--- 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/ClientCredentialInterceptor.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdds.scm;
-
-import org.apache.ratis.thirdparty.io.grpc.CallOptions;
-import org.apache.ratis.thirdparty.io.grpc.Channel;
-import org.apache.ratis.thirdparty.io.grpc.ClientCall;
-import org.apache.ratis.thirdparty.io.grpc.ClientInterceptor;
-import org.apache.ratis.thirdparty.io.grpc.ForwardingClientCall;
-import org.apache.ratis.thirdparty.io.grpc.Metadata;
-import org.apache.ratis.thirdparty.io.grpc.MethodDescriptor;
-
-import static org.apache.hadoop.ozone.OzoneConsts.OBT_METADATA_KEY;
-import static org.apache.hadoop.ozone.OzoneConsts.USER_METADATA_KEY;
-
-/**
- * GRPC client interceptor for ozone block token.
- */
-public class ClientCredentialInterceptor implements ClientInterceptor {
-
-  private final String user;
-  private final String token;
-
-  public ClientCredentialInterceptor(String user, String token) {
-    this.user = user;
-    this.token = token;
-  }
-
-  @Override
-  public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
-      MethodDescriptor<ReqT, RespT> method,
-      CallOptions callOptions,
-      Channel next) {
-
-    return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
-        next.newCall(method, callOptions)) {
-      @Override
-      public void start(Listener<RespT> responseListener, Metadata headers) {
-        if (token != null) {
-          headers.put(OBT_METADATA_KEY, token);
-        }
-        if (user != null) {
-          headers.put(USER_METADATA_KEY, user);
-        }
-        super.start(responseListener, headers);
-      }
-    };
-  }
-}
diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/ECXceiverClientGrpc.java
 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/ECXceiverClientGrpc.java
index dbf32c377b..224a79a59b 100644
--- 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/ECXceiverClientGrpc.java
+++ 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/ECXceiverClientGrpc.java
@@ -21,16 +21,15 @@ package org.apache.hadoop.hdds.scm;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.client.ClientTrustManager;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.ratis.thirdparty.io.grpc.Status;
 import org.apache.ratis.thirdparty.io.grpc.netty.NettyChannelBuilder;
 
 import java.io.IOException;
-import java.security.cert.X509Certificate;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
@@ -54,8 +53,8 @@ public class ECXceiverClientGrpc extends XceiverClientGrpc {
   public ECXceiverClientGrpc(
       Pipeline pipeline,
       ConfigurationSource config,
-      List<X509Certificate> caCerts) {
-    super(pipeline, config, caCerts);
+      ClientTrustManager trustManager) {
+    super(pipeline, config, trustManager);
     this.enableRetries = 
config.getBoolean(OZONE_CLIENT_EC_GRPC_RETRIES_ENABLED,
         OZONE_CLIENT_EC_GRPC_RETRIES_ENABLED_DEFAULT);
     setTimeout(config.getTimeDuration(OzoneConfigKeys.
diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
index 5f8502d194..0a38e66048 100644
--- 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
+++ 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hdds.scm;
 
 import java.io.IOException;
 import java.io.InterruptedIOException;
-import java.security.cert.X509Certificate;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -43,6 +42,7 @@ import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.DatanodeBl
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.XceiverClientProtocolServiceGrpc;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.XceiverClientProtocolServiceGrpc.XceiverClientProtocolServiceStub;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.client.ClientTrustManager;
 import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.security.SecurityConfig;
@@ -94,7 +94,7 @@ public class XceiverClientGrpc extends XceiverClientSpi {
   private long timeout;
   private final SecurityConfig secConfig;
   private final boolean topologyAwareRead;
-  private final List<X509Certificate> caCerts;
+  private final ClientTrustManager trustManager;
   // Cache the DN which returned the GetBlock command so that the ReadChunk
   // command can be sent to the same DN.
   private final Map<DatanodeBlockID, DatanodeDetails> getBlockDNcache;
@@ -107,10 +107,10 @@ public class XceiverClientGrpc extends XceiverClientSpi {
    *
    * @param pipeline - Pipeline that defines the machines.
    * @param config   -- Ozone Config
-   * @param caCerts   - SCM ca certificate.
+   * @param trustManager - a {@link ClientTrustManager} with proper CA 
handling.
    */
   public XceiverClientGrpc(Pipeline pipeline, ConfigurationSource config,
-      List<X509Certificate> caCerts) {
+      ClientTrustManager trustManager) {
     super();
     Preconditions.checkNotNull(pipeline);
     Preconditions.checkNotNull(config);
@@ -128,7 +128,7 @@ public class XceiverClientGrpc extends XceiverClientSpi {
     this.topologyAwareRead = config.getBoolean(
         OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_KEY,
         OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_DEFAULT);
-    this.caCerts = caCerts;
+    this.trustManager = trustManager;
     this.getBlockDNcache = new ConcurrentHashMap<>();
   }
 
@@ -157,15 +157,6 @@ public class XceiverClientGrpc extends XceiverClientSpi {
     connectToDatanode(dn);
   }
 
-  /**
-   * Token based auth is not currently supported, so this method works the same
-   * way as {@link #connect()}.
-   */
-  @Override
-  public void connect(String encodedToken) throws Exception {
-    connect();
-  }
-
   private synchronized void connectToDatanode(DatanodeDetails dn)
       throws IOException {
     if (isConnected(dn)) {
@@ -199,8 +190,8 @@ public class XceiverClientGrpc extends XceiverClientSpi {
             .intercept(new GrpcClientInterceptor());
     if (secConfig.isSecurityEnabled() && secConfig.isGrpcTlsEnabled()) {
       SslContextBuilder sslContextBuilder = GrpcSslContexts.forClient();
-      if (caCerts != null) {
-        sslContextBuilder.trustManager(caCerts);
+      if (trustManager != null) {
+        sslContextBuilder.trustManager(trustManager);
       }
       if (secConfig.useTestCert()) {
         channelBuilder.overrideAuthority("localhost");
diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
index 6b3e6cdbf4..46bd7e5345 100644
--- 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
+++ 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
@@ -20,8 +20,6 @@ package org.apache.hadoop.hdds.scm;
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.security.cert.X509Certificate;
-import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 
@@ -31,6 +29,7 @@ import org.apache.hadoop.hdds.conf.ConfigType;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.client.ClientTrustManager;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneSecurityUtil;
@@ -68,10 +67,9 @@ public class XceiverClientManager implements Closeable, 
XceiverClientFactory {
       LoggerFactory.getLogger(XceiverClientManager.class);
   //TODO : change this to SCM configuration class
   private final ConfigurationSource conf;
-  private final ScmClientConfig clientConfig;
   private final Cache<String, XceiverClientSpi> clientCache;
   private final CacheMetrics cacheMetrics;
-  private List<X509Certificate> caCerts;
+  private ClientTrustManager trustManager;
 
   private static XceiverClientMetrics metrics;
   private boolean isSecurityEnabled;
@@ -89,16 +87,15 @@ public class XceiverClientManager implements Closeable, 
XceiverClientFactory {
 
   public XceiverClientManager(ConfigurationSource conf,
       ScmClientConfig clientConf,
-      List<X509Certificate> caCerts) throws IOException {
+      ClientTrustManager trustManager) throws IOException {
     Preconditions.checkNotNull(clientConf);
     Preconditions.checkNotNull(conf);
-    this.clientConfig = clientConf;
     long staleThresholdMs = clientConf.getStaleThreshold(MILLISECONDS);
     this.conf = conf;
     this.isSecurityEnabled = OzoneSecurityUtil.isSecurityEnabled(conf);
     if (isSecurityEnabled) {
-      Preconditions.checkNotNull(caCerts);
-      this.caCerts = caCerts;
+      Preconditions.checkNotNull(trustManager);
+      this.trustManager = trustManager;
     }
 
     this.clientCache = CacheBuilder.newBuilder()
@@ -249,13 +246,13 @@ public class XceiverClientManager implements Closeable, 
XceiverClientFactory {
             switch (type) {
             case RATIS:
               client = XceiverClientRatis.newXceiverClientRatis(pipeline, conf,
-                  caCerts);
+                  trustManager);
               break;
             case STAND_ALONE:
-              client = new XceiverClientGrpc(pipeline, conf, caCerts);
+              client = new XceiverClientGrpc(pipeline, conf, trustManager);
               break;
             case EC:
-              client = new ECXceiverClientGrpc(pipeline, conf, caCerts);
+              client = new ECXceiverClientGrpc(pipeline, conf, trustManager);
               break;
             case CHAINED:
             default:
diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
index 72ed30b37f..aff0aa966a 100644
--- 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
+++ 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
@@ -19,9 +19,7 @@
 package org.apache.hadoop.hdds.scm;
 
 import java.io.IOException;
-import java.security.cert.X509Certificate;
 import java.util.Collection;
-import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
@@ -44,6 +42,7 @@ import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerC
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.ratis.ContainerCommandRequestMessage;
 import org.apache.hadoop.hdds.ratis.RatisHelper;
+import org.apache.hadoop.hdds.scm.client.ClientTrustManager;
 import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.security.SecurityConfig;
@@ -82,13 +81,13 @@ public final class XceiverClientRatis extends 
XceiverClientSpi {
 
   public static XceiverClientRatis newXceiverClientRatis(
       org.apache.hadoop.hdds.scm.pipeline.Pipeline pipeline,
-      ConfigurationSource ozoneConf, List<X509Certificate> caCerts) {
+      ConfigurationSource ozoneConf, ClientTrustManager trustManager) {
     final String rpcType = ozoneConf
         .get(ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY,
             ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
     final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(ozoneConf);
     final GrpcTlsConfig tlsConfig = RatisHelper.createTlsClientConfig(new
-        SecurityConfig(ozoneConf), caCerts);
+        SecurityConfig(ozoneConf), trustManager);
     return new XceiverClientRatis(pipeline,
         SupportedRpcType.valueOfIgnoreCase(rpcType),
         retryPolicy, tlsConfig, ozoneConf);
@@ -192,12 +191,6 @@ public final class XceiverClientRatis extends 
XceiverClientSpi {
     }
   }
 
-  @Override
-  public void connect(String encodedToken) throws Exception {
-    throw new UnsupportedOperationException("Block tokens are not " +
-        "implemented for Ratis clients.");
-  }
-
   @Override
   public void close() {
     final RaftClient c = client.getAndSet(null);
diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ClientTrustManager.java
 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ClientTrustManager.java
new file mode 100644
index 0000000000..0e297ae005
--- /dev/null
+++ 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ClientTrustManager.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.client;
+
+import 
org.apache.hadoop.hdds.security.x509.certificate.client.CACertificateProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.TrustManagerFactory;
+import javax.net.ssl.X509ExtendedTrustManager;
+import java.io.IOException;
+import java.net.Socket;
+import java.security.GeneralSecurityException;
+import java.security.KeyStore;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
+import java.util.Arrays;
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * A {@link javax.net.ssl.TrustManager} implementation for gRPC and Ratis
+ * clients.
+ *
+ * This TrustManager instance is holding a reference to an externally supplied
+ * TrustManager instance, and forwards all requests to that one.
+ * This class is designed within the context of XceiverClientManager, where
+ * we have the TrustManager initialized based on a ServiceInfo object, and
+ * later on if the root of trust expires we need to refresh the rootCA
+ * certificate for long-running clients to deal with a rootCA rotation if
+ * necessary.
+ *
+ * The broader context where this class is usable is generally any place, where
+ * clients are created based on a factory, and that factory can cache the
+ * initial root of trust in this TrustManager, and a refresh mechanism as
+ * necessary if the root of trust is expected to change for the certificate
+ * of the server side.
+ *
+ * Note that the in-memory provider is used to get the initial list of CA
+ * certificates, that will be used to verify server side certificates.
+ * In case of a certificate verification failure, the remote provider is used
+ * to fetch a new list of CA certificates that will be used to verify server
+ * side certificate from then on until the next failure.
+ * Failures are expected to happen only after a CA certificate that issued
+ * the certificates of the servers has expired, or when the servers, in
+ * preparation for this expiration event, renewed their certificates with a
+ * new CA.
+ *
+ * It is important to note that this logic, without additional efforts, is
+ * weak against a sophisticated attack, and should only be used with extra
+ * protection within the remote provider. In Ozone's case the supplied remote
+ * provider is an OzoneManager client, which verifies the identity of the
+ * server side via Kerberos and expects the other side to be identified as an
+ * Ozone Manager.
+ *
+ * The checkClientTrusted methods throw Unsupported operation exceptions,
+ * as this TrustManager instance is designed to be used only with client side
+ * SSL channels.
+ */
+public class ClientTrustManager extends X509ExtendedTrustManager {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ClientTrustManager.class);
+
+  private final CACertificateProvider remoteProvider;
+  private X509ExtendedTrustManager trustManager;
+
+  /**
+   * Creates a ClientTrustManager instance based on an in-memory and a remote
+   * trust anchor provider.
+   *
+   * The TrustManager first loads itself utilizing the in-memory provider to
+   * provide a trust anchor (CA certificate) to be present in the trust store.
+   *
+   * Once the trust cannot be established, it uses the remote trust anchor
+   * provider to refresh the locally known list of certificates.
+   *
+   * Any provider is allowed to be null, but not both.
+   * If any of them is null, then the mechanism will not be used to get the
+   * certificate list to be trusted.
+   *
+   * @param remoteProvider the provider to call once the root of trust has to
+   *                       be renewed potentially as certificate verification
+   *                       failed.
+   * @param inMemoryProvider the initial provider of the trusted certificates.
+   * @throws IOException in case an IO operation fails.
+   */
+  public ClientTrustManager(CACertificateProvider remoteProvider,
+      CACertificateProvider inMemoryProvider)
+      throws IOException {
+    checkArgument(remoteProvider != null || inMemoryProvider != null,
+        "Client trust configuration error, no mechanism present to find the" +
+            " rootCA certificate of the cluster.");
+    this.remoteProvider = remoteProvider;
+    try {
+      initialize(loadCerts(inMemoryProvider));
+    } catch (CertificateException e) {
+      throw new IOException(e);
+    }
+  }
+
+  private void initialize(List<X509Certificate> caCerts)
+      throws CertificateException {
+    try {
+      KeyStore ks = KeyStore.getInstance("jks");
+      ks.load(null);
+
+      for (X509Certificate cert : caCerts) {
+        String serial = cert.getSerialNumber().toString();
+        ks.setCertificateEntry(serial, cert);
+      }
+
+      TrustManagerFactory trustManagerFactory = 
TrustManagerFactory.getInstance(
+          TrustManagerFactory.getDefaultAlgorithm());
+      trustManagerFactory.init(ks);
+      trustManager = Arrays.stream(trustManagerFactory.getTrustManagers())
+          .filter(tm -> tm instanceof X509ExtendedTrustManager)
+          .map(tm -> (X509ExtendedTrustManager) tm)
+          .findFirst()
+          .orElse(null);
+      if (trustManager == null) {
+        throw new GeneralSecurityException("Could not load TrustManager.");
+      }
+    } catch (GeneralSecurityException | IOException e) {
+      throw new CertificateException(e);
+    }
+  }
+
+  private List<X509Certificate> loadCerts(CACertificateProvider 
caCertsProvider)
+      throws CertificateException {
+    try {
+      LOG.info("Loading certificates for client.");
+      if (caCertsProvider == null) {
+        return remoteProvider.provideCACerts();
+      }
+      return caCertsProvider.provideCACerts();
+    } catch (IOException e) {
+      throw new CertificateException(e);
+    }
+  }
+
+  @Override
+  public void checkServerTrusted(X509Certificate[] chain, String authType,
+      Socket socket) throws CertificateException {
+    try {
+      trustManager.checkServerTrusted(chain, authType, socket);
+    } catch (CertificateException e) {
+      LOG.info("CheckServerTrusted call failed, trying to re-fetch " +
+          "rootCA certificate", e);
+      initialize(loadCerts(remoteProvider));
+      trustManager.checkServerTrusted(chain, authType, socket);
+    }
+  }
+
+  @Override
+  public void checkServerTrusted(X509Certificate[] chain, String authType,
+      SSLEngine engine) throws CertificateException {
+    try {
+      trustManager.checkServerTrusted(chain, authType, engine);
+    } catch (CertificateException e) {
+      LOG.info("CheckServerTrusted call failed, trying to re-fetch " +
+          "rootCA certificate", e);
+      initialize(loadCerts(remoteProvider));
+      trustManager.checkServerTrusted(chain, authType, engine);
+    }
+  }
+
+  @Override
+  public void checkServerTrusted(X509Certificate[] chain, String authType)
+      throws CertificateException {
+    try {
+      trustManager.checkServerTrusted(chain, authType);
+    } catch (CertificateException e) {
+      LOG.info("CheckServerTrusted call failed, trying to re-fetch " +
+          "rootCA certificate", e);
+      initialize(loadCerts(remoteProvider));
+      trustManager.checkServerTrusted(chain, authType);
+    }
+  }
+
+  @Override
+  public X509Certificate[] getAcceptedIssuers() {
+    return trustManager.getAcceptedIssuers();
+  }
+
+  @Override
+  public void checkClientTrusted(X509Certificate[] chain, String authType,
+      Socket socket) throws CertificateException {
+    throw new CertificateException(
+        new UnsupportedOperationException("ClientTrustManager should not" +
+            " be used as a trust manager of a server socket."));
+  }
+
+  @Override
+  public void checkClientTrusted(X509Certificate[] chain, String authType,
+      SSLEngine engine) throws CertificateException {
+    throw new CertificateException(
+        new UnsupportedOperationException("ClientTrustManager should not" +
+            " be used as a trust manager of a server socket."));
+  }
+
+  @Override
+  public void checkClientTrusted(X509Certificate[] chain, String authType)
+      throws CertificateException {
+    throw new CertificateException(
+        new UnsupportedOperationException("ClientTrustManager should not" +
+            " be used as a trust manager of a server socket."));
+  }
+}
diff --git 
a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java
 
b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java
index 0c31a4941b..a2a0832bdb 100644
--- 
a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java
+++ 
b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java
@@ -135,11 +135,6 @@ public class TestBlockOutputStreamCorrectness {
 
     }
 
-    @Override
-    public void connect(String encodedToken) {
-
-    }
-
     @Override
     public void close() {
 
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java
index e543c39c95..b1d3e98e99 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hdds.ratis;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.security.cert.X509Certificate;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -69,6 +68,8 @@ import org.apache.ratis.util.JvmPauseMonitor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.net.ssl.TrustManager;
+
 /**
  * Ratis helper methods.
  */
@@ -406,11 +407,10 @@ public final class RatisHelper {
   // For External gRPC client to server with gRPC TLS.
  // No mTLS for external clients as the SCM CA does not issue certificates 
for them
   public static GrpcTlsConfig createTlsClientConfig(SecurityConfig conf,
-      List<X509Certificate> caCerts) {
+      TrustManager trustManager) {
     GrpcTlsConfig tlsConfig = null;
     if (conf.isSecurityEnabled() && conf.isGrpcTlsEnabled()) {
-      tlsConfig = new GrpcTlsConfig(null, null,
-          caCerts, false);
+      tlsConfig = new GrpcTlsConfig(null, trustManager, false);
     }
     return tlsConfig;
   }
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java
index 24137d762d..71d309dee6 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java
@@ -91,12 +91,6 @@ public abstract class XceiverClientSpi implements Closeable {
    */
   public abstract void connect() throws Exception;
 
-  /**
-   * Connects to the leader in the pipeline using encoded token. To be used
-   * in a secure cluster.
-   */
-  public abstract void connect(String encodedToken) throws Exception;
-
   @Override
   public abstract void close();
 
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/CACertificateProvider.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/CACertificateProvider.java
new file mode 100644
index 0000000000..4e600ef3fa
--- /dev/null
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/CACertificateProvider.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.hdds.security.x509.certificate.client;
+
+import java.io.IOException;
+import java.security.cert.X509Certificate;
+import java.util.List;
+
+/**
+ * An interface that defines a trust anchor (CA certificate) provider API.
+ */
+@FunctionalInterface
+public interface CACertificateProvider {
+  List<X509Certificate> provideCACerts() throws IOException;
+}
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
index 01fa7c8a52..4f0f800dfd 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
@@ -19,9 +19,6 @@
 package org.apache.hadoop.ozone;
 
 import org.apache.hadoop.hdds.annotation.InterfaceAudience;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.ratis.thirdparty.io.grpc.Context;
-import org.apache.ratis.thirdparty.io.grpc.Metadata;
 
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
@@ -29,8 +26,6 @@ import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.regex.Pattern;
 
-import static 
org.apache.ratis.thirdparty.io.grpc.Metadata.ASCII_STRING_MARSHALLER;
-
 /**
  * Set of constants used in Ozone implementation.
  */
@@ -383,16 +378,6 @@ public final class OzoneConsts {
   // https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
   public static final int MAXIMUM_NUMBER_OF_PARTS_PER_UPLOAD = 10000;
 
-  // GRPC block token metadata header and context key
-  public static final String OZONE_BLOCK_TOKEN = "blocktoken";
-  public static final Context.Key<UserGroupInformation> UGI_CTX_KEY =
-      Context.key("UGI");
-
-  public static final Metadata.Key<String> OBT_METADATA_KEY =
-      Metadata.Key.of(OZONE_BLOCK_TOKEN, ASCII_STRING_MARSHALLER);
-  public static final Metadata.Key<String> USER_METADATA_KEY =
-      Metadata.Key.of(OZONE_USER, ASCII_STRING_MARSHALLER);
-
   public static final String RPC_PORT = "RPC";
 
   // Default OMServiceID for OM Ratis servers to use as RaftGroupId
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECContainerOperationClient.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECContainerOperationClient.java
index a8ce297396..b862f832d7 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECContainerOperationClient.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECContainerOperationClient.java
@@ -25,11 +25,14 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
 import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.apache.hadoop.hdds.scm.client.ClientTrustManager;
+import 
org.apache.hadoop.hdds.security.x509.certificate.client.CACertificateProvider;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
 import 
org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
 import org.apache.hadoop.hdds.utils.HAUtils;
+import org.apache.hadoop.ozone.OzoneSecurityUtil;
 import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -69,12 +72,18 @@ public class ECContainerOperationClient implements 
Closeable {
   private static XceiverClientManager createClientManager(
       ConfigurationSource conf, CertificateClient certificateClient)
       throws IOException {
+    ClientTrustManager trustManager = null;
+    if (OzoneSecurityUtil.isSecurityEnabled(conf)) {
+      CACertificateProvider localCaCerts =
+          () -> HAUtils.buildCAX509List(certificateClient, conf);
+      CACertificateProvider remoteCacerts =
+          () -> HAUtils.buildCAX509List(null, conf);
+      trustManager = new ClientTrustManager(remoteCacerts, localCaCerts);
+    }
     return new XceiverClientManager(conf,
         new XceiverClientManager.XceiverClientManagerConfigBuilder()
             .setMaxCacheSize(256).setStaleThresholdMs(10 * 1000).build(),
-        certificateClient != null ?
-            HAUtils.buildCAX509List(certificateClient, conf) :
-            null);
+        trustManager);
   }
 
   public BlockData[] listBlock(long containerId, DatanodeDetails dn,
diff --git 
a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/client/CertificateClientTestImpl.java
 
b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/client/CertificateClientTestImpl.java
index 7ba14ce491..66d6ef893d 100644
--- 
a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/client/CertificateClientTestImpl.java
+++ 
b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/client/CertificateClientTestImpl.java
@@ -349,7 +349,6 @@ public class CertificateClientTestImpl implements 
CertificateClient {
     x509Certificate = newX509Certificate;
     certificateMap.put(x509Certificate.getSerialNumber().toString(),
         x509Certificate);
-    System.out.println(new Date() + " certificated is renewed");
 
     // notify notification receivers
     notificationReceivers.forEach(r -> r.notifyCertificateRenewed(this,
diff --git 
a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java
 
b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java
index 13e8aa77af..7aa91cec73 100644
--- 
a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java
+++ 
b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java
@@ -35,6 +35,8 @@ import org.apache.hadoop.hdds.scm.DatanodeAdminError;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
 import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.apache.hadoop.hdds.scm.client.ClientTrustManager;
+import 
org.apache.hadoop.hdds.security.x509.certificate.client.CACertificateProvider;
 import org.apache.hadoop.hdds.scm.client.ScmClient;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
@@ -53,7 +55,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.security.cert.X509Certificate;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -113,11 +114,10 @@ public class ContainerOperationClient implements 
ScmClient {
       throws IOException {
     XceiverClientManager manager;
     if (OzoneSecurityUtil.isSecurityEnabled(conf)) {
-      List<X509Certificate> caCertificates =
-          HAUtils.buildCAX509List(null, conf);
+      CACertificateProvider caCerts = () -> HAUtils.buildCAX509List(null, 
conf);
       manager = new XceiverClientManager(conf,
           conf.getObject(XceiverClientManager.ScmClientConfig.class),
-          caCertificates);
+          new ClientTrustManager(caCerts, null));
     } else {
       manager = new XceiverClientManager(conf);
     }
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index 6aed19fb9d..5302c88520 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -48,6 +48,8 @@ import org.apache.hadoop.hdds.scm.ContainerClientMetrics;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
+import org.apache.hadoop.hdds.scm.client.ClientTrustManager;
+import 
org.apache.hadoop.hdds.security.x509.certificate.client.CACertificateProvider;
 import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
@@ -151,8 +153,6 @@ import java.io.OutputStream;
 import java.net.URI;
 import java.security.InvalidKeyException;
 import java.security.PrivilegedExceptionAction;
-import java.security.cert.CertificateException;
-import java.security.cert.X509Certificate;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.BitSet;
@@ -244,7 +244,6 @@ public class RpcClient implements ClientProtocol {
         ozoneManagerProtocolClientSideTranslatorPB,
         OzoneManagerClientProtocol.class, conf);
     dtService = omTransport.getDelegationTokenService();
-    List<X509Certificate> x509Certificates = null;
     ServiceInfoEx serviceInfoEx = ozoneManagerClient.getServiceInfo();
     omVersion = getOmVersion(serviceInfoEx);
     if (OzoneSecurityUtil.isSecurityEnabled(conf)) {
@@ -271,26 +270,9 @@ public class RpcClient implements ClientProtocol {
               + " meet the criteria.");
         }
       }
-      String caCertPem = null;
-      List<String> caCertPems = null;
-      caCertPem = serviceInfoEx.getCaCertificate();
-      caCertPems = serviceInfoEx.getCaCertPemList();
-      if (caCertPems == null || caCertPems.isEmpty()) {
-        if (caCertPem == null) {
-          LOG.error("RpcClient received empty caCertPems from serviceInfo");
-          CertificateException ex = new CertificateException(
-              "No caCerts found; caCertPem can" +
-                  " not be null when caCertPems is empty or null"
-          );
-          throw new IOException(ex);
-        }
-        caCertPems = Collections.singletonList(caCertPem);
-      }
-      x509Certificates = OzoneSecurityUtil.convertToX509(caCertPems);
     }
 
-    this.xceiverClientManager =
-        createXceiverClientFactory(x509Certificates);
+    this.xceiverClientManager = createXceiverClientFactory(serviceInfoEx);
 
     unsafeByteBufferConversion = conf.getBoolean(
         OzoneConfigKeys.OZONE_UNSAFEBYTEOPERATIONS_ENABLED,
@@ -381,10 +363,16 @@ public class RpcClient implements ClientProtocol {
   @NotNull
   @VisibleForTesting
   protected XceiverClientFactory createXceiverClientFactory(
-      List<X509Certificate> x509Certificates) throws IOException {
+      ServiceInfoEx serviceInfo) throws IOException {
+    ClientTrustManager trustManager = null;
+    if (OzoneSecurityUtil.isSecurityEnabled(conf)) {
+      CACertificateProvider remoteCAProvider =
+          () -> ozoneManagerClient.getServiceInfo().provideCACerts();
+      trustManager = new ClientTrustManager(remoteCAProvider, serviceInfo);
+    }
     return new XceiverClientManager(conf,
         conf.getObject(XceiverClientManager.ScmClientConfig.class),
-        x509Certificates);
+        trustManager);
   }
 
   @VisibleForTesting
diff --git 
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientSpi.java
 
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientSpi.java
index 49eee44f6e..59eb49e555 100644
--- 
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientSpi.java
+++ 
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientSpi.java
@@ -64,11 +64,6 @@ public class MockXceiverClientSpi extends XceiverClientSpi {
 
   }
 
-  @Override
-  public void connect(String encodedToken) throws Exception {
-
-  }
-
   @Override
   public void close() {
 
diff --git 
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneClient.java
 
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneClient.java
index c7a09cd2ce..ac879b1543 100644
--- 
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneClient.java
+++ 
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneClient.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.client.rpc.RpcClient;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
+import org.apache.hadoop.ozone.om.helpers.ServiceInfoEx;
 import org.apache.hadoop.ozone.om.protocolPB.OmTransport;
 import org.apache.ozone.test.LambdaTestUtils.VoidCallable;
 import org.jetbrains.annotations.NotNull;
@@ -41,10 +42,8 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
-import java.security.cert.X509Certificate;
 import java.time.Instant;
 import java.util.HashMap;
-import java.util.List;
 import java.util.UUID;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
@@ -83,15 +82,14 @@ public class TestOzoneClient {
     client = new OzoneClient(config, new RpcClient(config, null) {
 
       @Override
-      protected OmTransport createOmTransport(String omServiceId)
-          throws IOException {
+      protected OmTransport createOmTransport(String omServiceId) {
         return new MockOmTransport(blkAllocator);
       }
 
       @NotNull
       @Override
       protected XceiverClientFactory createXceiverClientFactory(
-          List<X509Certificate> x509Certificates) throws IOException {
+          ServiceInfoEx serviceInfo) {
         return new MockXceiverClientFactory();
       }
     });
diff --git 
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
 
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
index 1981816d3e..3cfab0f94b 100644
--- 
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
+++ 
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.client.rpc.RpcClient;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
+import org.apache.hadoop.ozone.om.helpers.ServiceInfoEx;
 import org.apache.hadoop.ozone.om.protocolPB.OmTransport;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import org.apache.ozone.erasurecode.rawcoder.RSRawErasureCoderFactory;
@@ -54,7 +55,6 @@ import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.security.cert.X509Certificate;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -108,14 +108,13 @@ public class TestOzoneECClient {
     client = new OzoneClient(config, new RpcClient(config, null) {
 
       @Override
-      protected OmTransport createOmTransport(String omServiceId)
-          throws IOException {
+      protected OmTransport createOmTransport(String omServiceId) {
         return transport;
       }
 
       @Override
       protected XceiverClientFactory createXceiverClientFactory(
-          List<X509Certificate> x509Certificates) throws IOException {
+          ServiceInfoEx serviceInfo) {
         return factoryStub;
       }
     });
diff --git 
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/checksum/TestReplicatedFileChecksumHelper.java
 
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/checksum/TestReplicatedFileChecksumHelper.java
index a237380db9..5099bdb2ce 100644
--- 
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/checksum/TestReplicatedFileChecksumHelper.java
+++ 
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/checksum/TestReplicatedFileChecksumHelper.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.ozone.client.rpc.RpcClient;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
+import org.apache.hadoop.ozone.om.helpers.ServiceInfoEx;
 import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
 import org.apache.hadoop.ozone.om.protocolPB.OmTransport;
 import org.apache.hadoop.util.DataChecksum;
@@ -59,7 +60,6 @@ import org.mockito.ArgumentMatchers;
 import org.mockito.Mockito;
 
 import java.io.IOException;
-import java.security.cert.X509Certificate;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -97,16 +97,14 @@ public class TestReplicatedFileChecksumHelper {
 
       @Override
       protected OmTransport createOmTransport(
-          String omServiceId)
-          throws IOException {
+          String omServiceId) {
         return new MockOmTransport();
       }
 
       @NotNull
       @Override
       protected XceiverClientFactory createXceiverClientFactory(
-          List<X509Certificate> x509Certificates)
-          throws IOException {
+          ServiceInfoEx serviceInfo) {
         return new MockXceiverClientFactory();
       }
     };
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/ServiceInfoEx.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/ServiceInfoEx.java
index e7968b890e..d102bdac14 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/ServiceInfoEx.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/ServiceInfoEx.java
@@ -18,19 +18,26 @@
 
 package org.apache.hadoop.ozone.om.helpers;
 
+import 
org.apache.hadoop.hdds.security.x509.certificate.client.CACertificateProvider;
+import org.apache.hadoop.ozone.OzoneSecurityUtil;
+
+import java.io.IOException;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
+import java.util.Collections;
 import java.util.List;
 
 /**
  * Wrapper class for service discovery, design for broader usage such as
  * security, etc.
  */
-public class ServiceInfoEx {
+public class ServiceInfoEx implements CACertificateProvider {
 
-  private List<ServiceInfo> infoList;
+  private final List<ServiceInfo> infoList;
 
   // PEM encoded string of SCM CA certificate.
-  private String caCertificate;
-  private List<String> caCertPemList;
+  private final String caCertificate;
+  private final List<String> caCertPemList;
 
   public ServiceInfoEx(List<ServiceInfo> infoList,
       String caCertificate, List<String> caCertPemList) {
@@ -50,4 +57,25 @@ public class ServiceInfoEx {
   public List<String> getCaCertPemList() {
     return caCertPemList;
   }
+
+  @Override
+  public List<X509Certificate> provideCACerts() throws IOException {
+    String caCertPem = getCaCertificate();
+    List<String> caCertPems = getCaCertPemList();
+    if (caCertPems == null || caCertPems.isEmpty()) {
+      if (caCertPem == null) {
+        throw new IOException(new CertificateException(
+            "No caCerts found; caCertPem can" +
+                " not be null when caCertPems is empty or null"
+        ));
+      }
+      // In OM, if caCertPem is null, then it becomes empty string on the
+      // client side, in this case we do not want to add it to the caCertPems
+      // list. This happens during testing.
+      if (!caCertPem.isEmpty()) {
+        caCertPems = Collections.singletonList(caCertPem);
+      }
+    }
+    return OzoneSecurityUtil.convertToX509(caCertPems);
+  }
 }
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainerWithTLS.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainerWithTLS.java
index 06426ca922..841f344fc3 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainerWithTLS.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainerWithTLS.java
@@ -24,79 +24,94 @@ import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
-import org.apache.hadoop.hdds.scm.container.ContainerID;
-import org.apache.hadoop.hdds.scm.pipeline.MockPipeline;
+import org.apache.hadoop.hdds.scm.XceiverClientManager;
+import org.apache.hadoop.hdds.scm.XceiverClientManager.ScmClientConfig;
+import org.apache.hadoop.hdds.scm.client.ClientTrustManager;
 import org.apache.hadoop.hdds.security.symmetric.SecretKeyClient;
 import org.apache.hadoop.hdds.security.token.ContainerTokenIdentifier;
 import org.apache.hadoop.hdds.security.token.ContainerTokenSecretManager;
 import 
org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClientTestImpl;
-import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.ozone.client.SecretKeyTestClient;
-import org.apache.hadoop.ozone.container.ContainerTestHelper;
 import org.apache.hadoop.hdds.scm.XceiverClientGrpc;
 import org.apache.hadoop.hdds.scm.XceiverClientSpi;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
-import org.apache.hadoop.ozone.container.replication.ContainerDownloader;
+import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.apache.hadoop.ozone.container.replication.SimpleContainerDownloader;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
-import org.apache.ozone.test.GenericTestUtils;
+import org.apache.ozone.test.GenericTestUtils.LogCapturer;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
-import org.junit.jupiter.api.Timeout;
 
 import java.io.IOException;
 import java.nio.file.Path;
 import java.security.cert.CertificateExpiredException;
-import java.time.Duration;
+import java.security.cert.X509Certificate;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
+import java.util.ArrayList;
 import java.util.Date;
+import java.util.List;
 import java.util.UUID;
-import java.util.concurrent.TimeUnit;
 
+import static java.time.Duration.ofSeconds;
 import static java.util.Collections.singletonList;
-import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_KEY_LEN;
-import static 
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_X509_CA_ROTATION_ACK_TIMEOUT;
-import static 
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_X509_CA_ROTATION_CHECK_INTERNAL;
+import static 
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_EXPIRY_TIME;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_GRPC_TLS_ENABLED;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_GRPC_TLS_TEST_CERT;
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_X509_DEFAULT_DURATION;
 import static 
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_X509_GRACE_DURATION_TOKEN_CHECKS_ENABLED;
 import static 
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_X509_RENEW_GRACE_DURATION;
 import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_DATANODE_DIR_KEY;
+import static org.apache.hadoop.hdds.scm.container.ContainerID.valueOf;
+import static org.apache.hadoop.hdds.scm.pipeline.MockPipeline.createPipeline;
 import static 
org.apache.hadoop.ozone.OzoneConfigKeys.HDDS_DATANODE_CONTAINER_DB_DIR;
 import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SECURITY_ENABLED_KEY;
+import static 
org.apache.hadoop.ozone.container.ContainerTestHelper.getCloseContainer;
+import static 
org.apache.hadoop.ozone.container.ContainerTestHelper.getCreateContainerSecureRequest;
 import static 
org.apache.hadoop.ozone.container.ContainerTestHelper.getTestContainerID;
-import static 
org.apache.hadoop.ozone.container.common.ContainerTestUtils.getMockContext;
 import static 
org.apache.hadoop.ozone.container.replication.CopyContainerCompression.NO_COMPRESSION;
-import static org.hamcrest.CoreMatchers.containsString;
-import static org.hamcrest.MatcherAssert.assertThat;
+import static org.apache.ozone.test.GenericTestUtils.LogCapturer.captureLogs;
+import static org.apache.ozone.test.GenericTestUtils.waitFor;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+import static org.slf4j.LoggerFactory.getLogger;
 
 /**
  * Tests ozone containers via secure grpc/netty.
  */
 @Timeout(300)
-class TestOzoneContainerWithTLS {
+public class TestOzoneContainerWithTLS {
 
-  @TempDir
-  private Path tempFolder;
+  private static final int CERT_LIFETIME = 10; // seconds
+  private static final int ROOT_CERT_LIFE_TIME = 20; // seconds
 
+  private String clusterID;
   private OzoneConfiguration conf;
-  private ContainerTokenSecretManager secretManager;
+  private DatanodeDetails dn;
+  private Pipeline pipeline;
   private CertificateClientTestImpl caClient;
-  private SecretKeyClient secretKeyClient;
-  private final Duration certLifetime = Duration.ofSeconds(15);
+  private SecretKeyClient keyClient;
+  private ContainerTokenSecretManager secretManager;
+
+  @TempDir
+  private Path tempFolder;
 
   @BeforeEach
-  void setup() throws Exception {
+  public void setup() throws Exception {
     conf = new OzoneConfiguration();
 
     conf.set(OZONE_METADATA_DIRS,
@@ -107,73 +122,48 @@ class TestOzoneContainerWithTLS {
         tempFolder.resolve("containers").toString());
 
     conf.setBoolean(OZONE_SECURITY_ENABLED_KEY, true);
-    conf.setBoolean(HddsConfigKeys.HDDS_GRPC_TLS_ENABLED, true);
-
-    conf.setBoolean(HddsConfigKeys.HDDS_GRPC_TLS_TEST_CERT, true);
+    conf.setBoolean(HDDS_GRPC_TLS_ENABLED, true);
+    conf.setBoolean(HDDS_GRPC_TLS_TEST_CERT, true);
     conf.setBoolean(HDDS_X509_GRACE_DURATION_TOKEN_CHECKS_ENABLED, false);
-    conf.setInt(HDDS_KEY_LEN, 1024);
-
-    conf.set(HDDS_X509_DEFAULT_DURATION, certLifetime.toString());
-    conf.set(HDDS_X509_RENEW_GRACE_DURATION, "PT2S");
-    conf.set(HDDS_X509_CA_ROTATION_CHECK_INTERNAL, "PT1S"); // 1s
-    conf.set(HDDS_X509_CA_ROTATION_ACK_TIMEOUT, "PT1S"); // 1s
-
-    long expiryTime = conf.getTimeDuration(
-        HddsConfigKeys.HDDS_BLOCK_TOKEN_EXPIRY_TIME, "1s",
-        TimeUnit.MILLISECONDS);
-
-    caClient = new CertificateClientTestImpl(conf);
-    secretKeyClient = new SecretKeyTestClient();
-    secretManager = new ContainerTokenSecretManager(expiryTime,
-        secretKeyClient);
+    conf.set(HDDS_X509_DEFAULT_DURATION, ofSeconds(CERT_LIFETIME).toString());
+    conf.set(HddsConfigKeys.HDDS_X509_MAX_DURATION,
+        ofSeconds(ROOT_CERT_LIFE_TIME).toString());
+    conf.set(HDDS_X509_RENEW_GRACE_DURATION, ofSeconds(2).toString());
+    conf.setInt(HDDS_BLOCK_TOKEN_EXPIRY_TIME, 1000);
+
+    clusterID = UUID.randomUUID().toString();
+    caClient = new CertificateClientTestImpl(conf, false);
+    keyClient = new SecretKeyTestClient();
+    secretManager = new ContainerTokenSecretManager(1000, keyClient);
+
+    dn = aDatanode();
+    pipeline = createPipeline(singletonList(dn));
   }
 
   @Test
-  void testCertificateLifetime() {
-    Date afterExpiry = Date.from(LocalDateTime.now()
-        .plus(certLifetime)
+  public void testCertificateLifetime() {
+    Date atTimeAfterExpiry = Date.from(LocalDateTime.now()
+        .plusSeconds(CERT_LIFETIME)
         .atZone(ZoneId.systemDefault())
         .toInstant());
 
     assertThrows(CertificateExpiredException.class,
-        () -> caClient.getCertificate().checkValidity(afterExpiry));
+        () -> caClient.getCertificate().checkValidity(atTimeAfterExpiry));
   }
 
-  @ParameterizedTest
-  @ValueSource(booleans = {true, false})
-  void createContainer(boolean containerTokenEnabled) throws Exception {
+  @ParameterizedTest(name = "Container token enabled: {0}")
+  @ValueSource(booleans = {false, true})
+  public void createContainer(boolean containerTokenEnabled)
+      throws Exception {
     conf.setBoolean(HddsConfigKeys.HDDS_CONTAINER_TOKEN_ENABLED,
         containerTokenEnabled);
+    OzoneContainer container = createAndStartOzoneContainerInstance();
 
-    final long containerId = getTestContainerID();
-    DatanodeDetails dn = MockDatanodeDetails.randomDatanodeDetails();
-    OzoneContainer container = null;
-    try {
-      Pipeline pipeline = MockPipeline.createSingleNodePipeline();
-      conf.setInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
-          pipeline.getFirstNode().getPort(DatanodeDetails.Port.Name.STANDALONE)
-              .getValue());
-      conf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT, false);
-
-      container = new OzoneContainer(dn, conf, getMockContext(dn, conf),
-          caClient, secretKeyClient);
-      //Set scmId and manually start ozone container.
-      container.start(UUID.randomUUID().toString());
-
-      try (XceiverClientGrpc client = new XceiverClientGrpc(pipeline, conf,
-          singletonList(caClient.getCACertificate()))) {
-
-        if (containerTokenEnabled) {
-          client.connect();
-          createSecureContainer(client, containerId,
-              secretManager.generateToken(
-                  UserGroupInformation.getCurrentUser().getUserName(),
-                  ContainerID.valueOf(containerId)));
-        } else {
-          client.connect();
-          createContainer(client, containerId);
-        }
-      }
+    try (XceiverClientGrpc client =
+             new XceiverClientGrpc(pipeline, conf, aClientTrustManager())) {
+      client.connect();
+
+      createContainer(client, containerTokenEnabled, getTestContainerID());
     } finally {
       if (container != null) {
         container.stop();
@@ -181,134 +171,224 @@ class TestOzoneContainerWithTLS {
     }
   }
 
-  @ParameterizedTest
-  @ValueSource(booleans = {true, false})
-  void downloadContainer(boolean tokenEnabled) throws Exception {
-    DatanodeDetails dn = MockDatanodeDetails.createDatanodeDetails(
-        UUID.randomUUID().toString(), "localhost", "0.0.0.0",
-        "/default-rack");
-    Pipeline pipeline = MockPipeline.createSingleNodePipeline();
-    conf.setInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
-        pipeline.getFirstNode().getPort(DatanodeDetails.Port.Name.STANDALONE)
-            .getValue());
-    conf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT, false);
+  @ParameterizedTest(name = "Container token enabled: {0}")
+  @ValueSource(booleans = {false, true})
+  public void downloadContainer(boolean containerTokenEnabled)
+      throws Exception {
+    conf.setBoolean(HddsConfigKeys.HDDS_CONTAINER_TOKEN_ENABLED,
+        containerTokenEnabled);
+    OzoneContainer container = createAndStartOzoneContainerInstance();
 
-    OzoneContainer container = null;
+    ScmClientConfig scmClientConf = conf.getObject(ScmClientConfig.class);
+    XceiverClientManager clientManager =
+        new XceiverClientManager(conf, scmClientConf, aClientTrustManager());
+    XceiverClientSpi client = null;
     try {
-      container = new OzoneContainer(dn, conf,
-          getMockContext(dn, conf), caClient, secretKeyClient);
-
-      // Set scmId and manually start ozone container.
-      container.start(UUID.randomUUID().toString());
-
-      // Create containers
-      try (XceiverClientGrpc client = new XceiverClientGrpc(pipeline, conf,
-          singletonList(caClient.getCACertificate()))) {
-        client.connect();
+      client = clientManager.acquireClient(pipeline);
+      // at this point we have an established connection from the client to
+      // the container, and we do not expect a new SSL handshake while we are
+      // running container ops until the renewal, however it may happen, as
+      // the protocol can do a renegotiation at any time, so this dynamic
+      // introduces a very low chance of flakiness.
+      // The downloader client when it connects first, will do a failing
+      // handshake that we are expecting because before downloading, we wait
+      // for the expiration without renewing the certificate.
+      List<Long> containers = new ArrayList<>();
+      List<DatanodeDetails> sourceDatanodes = new ArrayList<>();
+      sourceDatanodes.add(dn);
+
+      containers.add(createAndCloseContainer(client, containerTokenEnabled));
+      letCertExpire();
+      containers.add(createAndCloseContainer(client, containerTokenEnabled));
+      assertDownloadContainerFails(containers.get(0), sourceDatanodes);
+
+      caClient.renewKey();
+      containers.add(createAndCloseContainer(client, containerTokenEnabled));
+      assertDownloadContainerWorks(containers, sourceDatanodes);
+    } finally {
+      if (container != null) {
+        container.stop();
+      }
+      if (client != null) {
+        clientManager.releaseClient(client, true);
+      }
+    }
+  }
 
-        final long containerId = getTestContainerID();
-        createAndCloseContainer(tokenEnabled, containerId, client);
+  @Test
+  public void testLongLivingClientWithCertRenews() throws Exception {
+    LogCapturer logs = captureLogs(getLogger(ClientTrustManager.class));
+    OzoneContainer container = createAndStartOzoneContainerInstance();
 
-        // Wait certificate to expire
-        GenericTestUtils.waitFor(
-            () -> caClient.getCertificate().getNotAfter().before(new Date()),
-            100, (int) certLifetime.toMillis());
+    ScmClientConfig scmClientConf = conf.getObject(ScmClientConfig.class);
+    scmClientConf.setStaleThreshold(500);
+    XceiverClientManager clientManager =
+        new XceiverClientManager(conf, scmClientConf, aClientTrustManager());
+    assertClientTrustManagerLoading(true, logs, "Client loaded certificates.");
 
-        // old client still function well after certificate expired
-        createAndCloseContainer(tokenEnabled, getTestContainerID(), client);
+    XceiverClientSpi client = null;
+    try {
+      client = clientManager.acquireClient(pipeline);
+      createAndCloseContainer(client, false);
+      assertClientTrustManagerLoading(false, logs,
+          "Check client did not reloaded certificates.");
+
+      letCACertExpire();
+
+      // works based off of initial SSL handshake
+      createAndCloseContainer(client, false);
+      assertClientTrustManagerLoading(false, logs,
+          "Check client did not reloaded certificates.");
+
+      clientManager.releaseClient(client, true);
+      client = clientManager.acquireClient(pipeline);
+      assertClientTrustManagerLoading(false, logs,
+          "Check second client creation does not reload certificates.");
+      // aim is to only load certs at first start, and in case of a verification
+      // failure
+
+      try {
+        // fails after a new client is created, as we just have expired certs
+        // in the source (caClient), so container ops still fail even though
+        // a retry/reload happens, but we expect it to happen before the failure
+        createAndCloseContainer(client, false);
+      } catch (Throwable e) {
+        assertClientTrustManagerFailedAndRetried(logs);
+        while (e.getCause() != null) {
+          e = e.getCause();
+        }
+        assertTrue((e instanceof CertificateExpiredException));
+      } finally {
+        clientManager.releaseClient(client, true);
+      }
 
-        // Download newly created container will fail because of cert expired
-        GenericTestUtils.LogCapturer logCapture = GenericTestUtils.LogCapturer
-            .captureLogs(SimpleContainerDownloader.LOG);
-        assertNull(downloadContainer(containerId, dn));
-        assertThat(logCapture.getOutput(),
-            containsString(CertificateExpiredException.class.getName()));
+      client = clientManager.acquireClient(pipeline);
+      caClient.renewRootCA();
+      caClient.renewKey();
+      Thread.sleep(1000); // wait for reloading trust managers to reload
 
-        // Renew the certificate
-        caClient.renewKey();
+      createAndCloseContainer(client, false);
+      assertClientTrustManagerFailedAndRetried(logs);
 
-        // old client still function well after certificate renewed
-        createAndCloseContainer(tokenEnabled, getTestContainerID(), client);
+    } finally {
+      if (client != null) {
+        clientManager.releaseClient(client, true);
+      }
+      if (container != null) {
+        container.stop();
+      }
+    }
+  }
 
-        // Wait keyManager and trustManager to reload
-        Thread.sleep(2000); // TODO replace
+  private void assertClientTrustManagerLoading(
+      boolean happened, LogCapturer logs, String msg) {
+    String loadedMsg = "Loading certificates for client.";
+    assertEquals(happened, logs.getOutput().contains(loadedMsg), msg);
+    logs.clearOutput();
+  }
 
-        // old client still function well after certificate reload
-        createAndCloseContainer(tokenEnabled, getTestContainerID(), client);
+  private void assertClientTrustManagerFailedAndRetried(LogCapturer logs) {
+    assertTrue(logs.getOutput().contains("trying to re-fetch rootCA"),
+        "Check client failed first, and initiates a reload.");
+    assertTrue(logs.getOutput().contains("Loading certificates for client."),
+        "Check client loaded certificates.");
+    logs.clearOutput();
+  }
 
-        // Download container should succeed after key and cert renewed
-        assertNotNull(downloadContainer(containerId, dn));
-      }
-    } finally {
+  private OzoneContainer createAndStartOzoneContainerInstance() {
+    OzoneContainer container = null;
+    try {
+      StateContext stateContext = ContainerTestUtils.getMockContext(dn, conf);
+      container = new OzoneContainer(
+          dn, conf, stateContext, caClient, keyClient);
+      container.start(clusterID);
+    } catch (Throwable e) {
       if (container != null) {
         container.stop();
       }
-      // TODO delete leftover hadoop-ozone/integration-test/container.db
+      fail(e);
     }
+    return container;
   }
 
-  private Path downloadContainer(long containerId, DatanodeDetails source)
-      throws IOException {
-    try (ContainerDownloader downloader = new SimpleContainerDownloader(
-        conf, caClient)) {
-      return downloader.getContainerDataFromReplicas(containerId,
-          singletonList(source), tempFolder.resolve("tmp"), NO_COMPRESSION);
-    }
+  private void assertDownloadContainerFails(long containerId,
+      List<DatanodeDetails> sourceDatanodes) {
+    LogCapturer logCapture = captureLogs(SimpleContainerDownloader.LOG);
+    SimpleContainerDownloader downloader =
+        new SimpleContainerDownloader(conf, caClient);
+    Path file = downloader.getContainerDataFromReplicas(containerId,
+        sourceDatanodes, tempFolder.resolve("tmp"), NO_COMPRESSION);
+    downloader.close();
+    assertNull(file);
+    assertTrue(logCapture.getOutput().contains(
+        "java.security.cert.CertificateExpiredException"));
   }
 
-  private void createAndCloseContainer(boolean containerTokenEnabled,
-      long containerId, XceiverClientGrpc client) throws Exception {
-    if (containerTokenEnabled) {
-      Token<ContainerTokenIdentifier> token = secretManager.generateToken(
-          UserGroupInformation.getCurrentUser().getUserName(),
-          ContainerID.valueOf(containerId));
-      createSecureContainer(client, containerId, token);
-      closeSecureContainer(client, containerId, token);
-    } else {
-      createContainer(client, containerId);
-      closeContainer(client, containerId);
+  private void assertDownloadContainerWorks(List<Long> containers,
+      List<DatanodeDetails> sourceDatanodes) {
+    for (Long cId : containers) {
+      SimpleContainerDownloader downloader =
+          new SimpleContainerDownloader(conf, caClient);
+      Path file = downloader.getContainerDataFromReplicas(cId, sourceDatanodes,
+          tempFolder.resolve("tmp"), NO_COMPRESSION);
+      downloader.close();
+      assertNotNull(file);
     }
   }
 
-  public static void createContainer(XceiverClientSpi client,
-      long containerID) throws Exception {
-    ContainerCommandRequestProto request = ContainerTestHelper
-        .getCreateContainerRequest(containerID, client.getPipeline());
+  private Token<ContainerTokenIdentifier> createContainer(
+      XceiverClientSpi client, boolean useToken, long id) throws IOException {
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+    Token<ContainerTokenIdentifier> token = useToken
+        ? secretManager.generateToken(ugi.getUserName(), valueOf(id))
+        : null;
+
+    ContainerCommandRequestProto request =
+        getCreateContainerSecureRequest(id, client.getPipeline(), token);
     ContainerCommandResponseProto response = client.sendCommand(request);
     assertNotNull(response);
-    assertSame(ContainerProtos.Result.SUCCESS, response.getResult());
+    assertSame(response.getResult(), ContainerProtos.Result.SUCCESS);
+    return token;
   }
 
-  public static void createSecureContainer(XceiverClientSpi client,
-      long containerID, Token<ContainerTokenIdentifier> token)
-      throws Exception {
-    ContainerCommandRequestProto request =
-        ContainerTestHelper.getCreateContainerSecureRequest(
-            containerID, client.getPipeline(), token);
-    ContainerCommandResponseProto response =
-        client.sendCommand(request);
-    assertNotNull(response);
-    assertSame(ContainerProtos.Result.SUCCESS, response.getResult());
+  private long createAndCloseContainer(
+      XceiverClientSpi client, boolean useToken) {
+    long id = getTestContainerID();
+    try {
+      Token<ContainerTokenIdentifier>
+          token = createContainer(client, useToken, id);
+
+      ContainerCommandRequestProto request =
+          getCloseContainer(client.getPipeline(), id, token);
+      ContainerCommandResponseProto response = client.sendCommand(request);
+      assertNotNull(response);
+      assertSame(response.getResult(), ContainerProtos.Result.SUCCESS);
+    } catch (Exception e) {
+      Assertions.fail(e);
+    }
+    return id;
   }
 
-  public static void closeContainer(XceiverClientSpi client,
-      long containerID) throws Exception {
-    ContainerCommandRequestProto request = ContainerTestHelper
-        .getCloseContainer(client.getPipeline(), containerID);
-    ContainerCommandResponseProto response = client.sendCommand(request);
-    assertNotNull(response);
-    assertSame(ContainerProtos.Result.SUCCESS, response.getResult());
+  private void letCertExpire() throws Exception {
+    Date expiry = caClient.getCertificate().getNotAfter();
+    waitFor(() -> expiry.before(new Date()), 100, CERT_LIFETIME * 1000);
   }
 
-  public static void closeSecureContainer(XceiverClientSpi client,
-      long containerID, Token<ContainerTokenIdentifier> token)
-      throws Exception {
-    ContainerCommandRequestProto request =
-        ContainerTestHelper.getCloseContainer(client.getPipeline(),
-            containerID, token);
-    ContainerCommandResponseProto response =
-        client.sendCommand(request);
-    assertNotNull(response);
-    assertSame(ContainerProtos.Result.SUCCESS, response.getResult());
+  private void letCACertExpire() throws Exception {
+    Date expiry = caClient.getCACertificate().getNotAfter();
+    waitFor(() -> expiry.before(new Date()), 100, ROOT_CERT_LIFE_TIME * 1000);
+  }
+
+  private ClientTrustManager aClientTrustManager() throws IOException {
+    X509Certificate firstCert = caClient.getCACertificate();
+    return new ClientTrustManager(
+        () -> singletonList(caClient.getCACertificate()),
+        () -> singletonList(firstCert));
+  }
+
+  private DatanodeDetails aDatanode() {
+    return MockDatanodeDetails.createDatanodeDetails(
+        UUID.randomUUID().toString(), "localhost", "0.0.0.0",
+        "/default-rack");
   }
 }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmContainerLocationCache.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmContainerLocationCache.java
index c9fd1176cc..62931e03b6 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmContainerLocationCache.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmContainerLocationCache.java
@@ -64,6 +64,7 @@ import org.apache.hadoop.ozone.client.rpc.RpcClient;
 import org.apache.hadoop.ozone.common.Checksum;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.om.helpers.ServiceInfoEx;
 import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
 import org.apache.ozone.test.GenericTestUtils;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
@@ -90,7 +91,6 @@ import org.mockito.Mockito;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
-import java.security.cert.X509Certificate;
 import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
@@ -175,7 +175,7 @@ public class TestOmContainerLocationCache {
       @NotNull
       @Override
       protected XceiverClientFactory createXceiverClientFactory(
-          List<X509Certificate> x509Certificates) throws IOException {
+          ServiceInfoEx serviceInfo) throws IOException {
         return mockDataNodeClientFactory();
       }
     };


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to