This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new a698f48de7c HDDS-14379. Implement basic Hadoop OM client proxy 
provider to read from followers (#9641)
a698f48de7c is described below

commit a698f48de7c762d62e20126210f21faef92e0c4e
Author: Ivan Andika <[email protected]>
AuthorDate: Fri Jan 23 02:58:22 2026 +0800

    HDDS-14379. Implement basic Hadoop OM client proxy provider to read from 
followers (#9641)
---
 .../java/org/apache/hadoop/hdds/HddsUtils.java     |   4 +
 .../org/apache/hadoop/ozone/OzoneConfigKeys.java   |   5 +
 .../common/src/main/resources/ozone-default.xml    |   7 +
 .../main/java/org/apache/hadoop/ozone/OmUtils.java | 133 ++++-
 .../om/ha/HadoopRpcOMFailoverProxyProvider.java    |   9 +-
 ...doopRpcOMFollowerReadFailoverProxyProvider.java | 411 +++++++++++++++
 .../ozone/om/ha/OMFailoverProxyProviderBase.java   |   4 +
 .../ozone/om/protocolPB/Hadoop3OmTransport.java    |  37 +-
 .../om/protocolPB/OzoneManagerProtocolPB.java      |   7 +
 .../java/org/apache/hadoop/ozone/TestOmUtils.java  |  34 ++
 ...doopRpcOMFollowerReadFailoverProxyProvider.java | 581 +++++++++++++++++++++
 .../org/apache/hadoop/ozone/om/OmTestUtil.java     |  11 +
 .../ozone/om/TestOzoneManagerHAFollowerRead.java   | 436 ++++++++++++++++
 ...stOzoneManagerHAFollowerReadWithAllRunning.java | 464 ++++++++++++++++
 ...OzoneManagerHAFollowerReadWithStoppedNodes.java | 346 ++++++++++++
 .../ozone/om/TestOzoneManagerHAWithAllRunning.java |  67 ++-
 .../hadoop/fs/ozone/Hadoop27RpcTransport.java      |  28 +-
 17 files changed, 2574 insertions(+), 10 deletions(-)

diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
index d74cae83942..fce0f295e33 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
@@ -41,6 +41,7 @@
 import java.io.File;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
+import java.lang.reflect.UndeclaredThrowableException;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 import java.nio.file.Path;
@@ -615,6 +616,9 @@ public static Throwable getUnwrappedException(Exception ex) 
{
     if (t instanceof RemoteException) {
       t = ((RemoteException) t).unwrapRemoteException();
     }
+    if (t instanceof UndeclaredThrowableException) {
+      t = ((UndeclaredThrowableException) t).getUndeclaredThrowable();
+    }
     while (t != null) {
       if (t instanceof RpcException ||
           t instanceof AccessControlException ||
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index ceca7d0c882..2179414af38 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -488,6 +488,11 @@ public final class OzoneConfigKeys {
   public static final long OZONE_CLIENT_WAIT_BETWEEN_RETRIES_MILLIS_DEFAULT =
       2000;
 
+  // Ozone Client Follower Read
+  public static final String OZONE_CLIENT_FOLLOWER_READ_ENABLED_KEY =
+      "ozone.client.follower.read.enabled";
+  public static final boolean OZONE_CLIENT_FOLLOWER_READ_ENABLED_DEFAULT = 
false;
+
   public static final String OZONE_FREON_HTTP_ENABLED_KEY =
       "ozone.freon.http.enabled";
   public static final String OZONE_FREON_HTTP_BIND_HOST_KEY =
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml 
b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index b143b8786f4..d076e1e78d3 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -3141,6 +3141,13 @@
       wait time is introduced after all the OM proxies have been attempted 
once.
     </description>
   </property>
+  <property>
+    <name>ozone.client.follower.read.enabled</name>
+    <value>false</value>
+    <description>
+      Enable client to read from OM followers.
+    </description>
+  </property>
   <property>
     <name>ozone.om.admin.protocol.max.retries</name>
     <value>20</value>
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
index 240b6b89531..7e98b1279c0 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
@@ -78,6 +78,7 @@
 import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.ServiceInfo;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -228,8 +229,7 @@ public static int getOmRpcPort(ConfigurationSource conf) {
    * @param omRequest OMRequest proto
    * @return True if its readOnly, false otherwise.
    */
-  public static boolean isReadOnly(
-      OzoneManagerProtocolProtos.OMRequest omRequest) {
+  public static boolean isReadOnly(OMRequest omRequest) {
     OzoneManagerProtocolProtos.Type cmdType = omRequest.getCmdType();
     switch (cmdType) {
     case CheckVolumeAccess:
@@ -352,6 +352,135 @@ public static boolean isReadOnly(
     }
   }
 
+  /**
+   * Checks if the OM request should be sent to the follower or leader.
+   * <p>
+   * Note that this method is not equivalent to {@link 
OmUtils#isReadOnly(OMRequest)}
+   * since there are cases that a "read" requests (ones that do not go through 
Ratis) requires
+   * to be sent to the leader.
+   * @param omRequest OMRequest proto
+   * @return True if the request should be sent to the follower.
+   */
+  public static boolean shouldSendToFollower(OMRequest omRequest) {
+    OzoneManagerProtocolProtos.Type cmdType = omRequest.getCmdType();
+    switch (cmdType) {
+    case CheckVolumeAccess:
+    case InfoVolume:
+    case ListVolume:
+    case InfoBucket:
+    case ListBuckets:
+    case LookupKey:
+    case ListKeys:
+    case ListKeysLight:
+    case ListTrash:
+      // ListTrash is deprecated by HDDS-11251. Keeping this in here
+      // As protobuf currently doesn't support deprecating enum fields
+      // TODO: Remove once migrated to proto3 and mark fields in proto
+      // as deprecated
+    case ListOpenFiles:
+    case ListMultiPartUploadParts:
+    case GetFileStatus:
+    case LookupFile:
+    case ListStatus:
+    case ListStatusLight:
+    case GetAcl:
+    case ListMultipartUploads:
+    case FinalizeUpgradeProgress:
+    case PrepareStatus:
+    case GetS3VolumeContext:
+    case ListTenant:
+    case TenantGetUserInfo:
+    case TenantListUser:
+    case ListSnapshot:
+    case RefetchSecretKey:
+    case GetKeyInfo:
+    case GetSnapshotInfo:
+    case GetObjectTagging:
+      return true;
+    case CreateVolume:
+    case SetVolumeProperty:
+    case DeleteVolume:
+    case CreateBucket:
+    case SetBucketProperty:
+    case DeleteBucket:
+    case CreateKey:
+    case RenameKey:
+    case RenameKeys:
+    case DeleteKey:
+    case DeleteKeys:
+    case CommitKey:
+    case AllocateBlock:
+    case InitiateMultiPartUpload:
+    case CommitMultiPartUpload:
+    case CompleteMultiPartUpload:
+    case AbortMultiPartUpload:
+    case GetS3Secret:
+    case GetDelegationToken:
+    case RenewDelegationToken:
+    case CancelDelegationToken:
+    case CreateDirectory:
+    case CreateFile:
+    case RemoveAcl:
+    case SetAcl:
+    case AddAcl:
+    case PurgeKeys:
+    case RecoverTrash:
+      // RecoverTrash is deprecated by HDDS-11251. Keeping this in here
+      // As protobuf currently doesn't support deprecating enum fields
+      // TODO: Remove once migrated to proto3 and mark fields in proto
+      // as deprecated
+    case FinalizeUpgrade:
+    case Prepare:
+    case CancelPrepare:
+    case DeleteOpenKeys:
+    case SetS3Secret:
+    case RevokeS3Secret:
+    case PurgeDirectories:
+    case PurgePaths:
+    case CreateTenant:
+    case DeleteTenant:
+    case TenantAssignUserAccessId:
+    case TenantRevokeUserAccessId:
+    case TenantAssignAdmin:
+    case TenantRevokeAdmin:
+    case SetRangerServiceVersion:
+    case CreateSnapshot:
+    case DeleteSnapshot:
+    case RenameSnapshot:
+    case SnapshotMoveDeletedKeys:
+    case SnapshotMoveTableKeys:
+    case SnapshotPurge:
+    case RecoverLease:
+    case SetTimes:
+    case AbortExpiredMultiPartUploads:
+    case SetSnapshotProperty:
+    case QuotaRepair:
+    case PutObjectTagging:
+    case DeleteObjectTagging:
+    case ServiceList: // OM leader should have the most up-to-date OM service 
list info
+    case RangerBGSync: // Ranger Background Sync task is only run on leader
+    case SnapshotDiff:
+    case CancelSnapshotDiff:
+    case ListSnapshotDiffJobs:
+    case PrintCompactionLogDag:
+      // Snapshot diff is a local to a single OM node so we should not send it 
arbitrarily
+      // to any OM nodes
+    case TransferLeadership: // Transfer leadership should be initiated by the 
leader
+    case SetSafeMode: // SafeMode should be initiated by the leader
+    case StartQuotaRepair:
+    case GetQuotaRepairStatus:
+      // Quota repair lifecycle request should be initiated by the leader
+    case DBUpdates: // We are currently only interested on the leader DB info
+    case UnknownCommand:
+      return false;
+    case EchoRPC:
+      return omRequest.getEchoRPCRequest().getReadOnly();
+    default:
+      LOG.error("CmdType {} is not categorized to be sent to follower.", 
cmdType);
+      return false;
+    }
+  }
+
   public static byte[] getSHADigest() throws IOException {
     try {
       SRAND.nextBytes(randomBytes);
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/HadoopRpcOMFailoverProxyProvider.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/HadoopRpcOMFailoverProxyProvider.java
index 7d5e40e9f8d..4a3a4fc4715 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/HadoopRpcOMFailoverProxyProvider.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/HadoopRpcOMFailoverProxyProvider.java
@@ -45,7 +45,7 @@
 public class HadoopRpcOMFailoverProxyProvider<T> extends
       OMFailoverProxyProviderBase<T> {
 
-  private static final Logger LOG =
+  protected static final Logger LOG =
       LoggerFactory.getLogger(HadoopRpcOMFailoverProxyProvider.class);
 
   private final Text delegationTokenService;
@@ -123,7 +123,7 @@ public synchronized ProxyInfo<T> getProxy() {
   /**
    * Creates proxy object.
    */
-  protected ProxyInfo<T> createOMProxyIfNeeded(OMProxyInfo<T> omProxyInfo) {
+  protected synchronized ProxyInfo<T> createOMProxyIfNeeded(OMProxyInfo<T> 
omProxyInfo) {
     if (omProxyInfo.proxy == null) {
       try {
         omProxyInfo.proxy = createOMProxy(omProxyInfo.getAddress());
@@ -136,6 +136,11 @@ protected ProxyInfo<T> 
createOMProxyIfNeeded(OMProxyInfo<T> omProxyInfo) {
     return omProxyInfo;
   }
 
+  protected synchronized ProxyInfo<T> createOMProxyIfNeeded(String omNodeId) {
+    OMProxyInfo<T> omProxyInfo = getOmProxy(omNodeId);
+    return createOMProxyIfNeeded(omProxyInfo);
+  }
+
   public Text getCurrentProxyDelegationToken() {
     return delegationTokenService;
   }
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/HadoopRpcOMFollowerReadFailoverProxyProvider.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/HadoopRpcOMFollowerReadFailoverProxyProvider.java
new file mode 100644
index 00000000000..55f46fdb0d6
--- /dev/null
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/HadoopRpcOMFollowerReadFailoverProxyProvider.java
@@ -0,0 +1,411 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.ha;
+
+import static 
org.apache.hadoop.ozone.om.ha.OMFailoverProxyProviderBase.getLeaderNotReadyException;
+import static 
org.apache.hadoop.ozone.om.ha.OMFailoverProxyProviderBase.getNotLeaderException;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.List;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.io.retry.FailoverProxyProvider;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.ipc_.Client.ConnectionId;
+import org.apache.hadoop.ipc_.RPC;
+import org.apache.hadoop.ipc_.RpcInvocationHandler;
+import org.apache.hadoop.ipc_.RpcNoSuchProtocolException;
+import org.apache.hadoop.ozone.OmUtils;
+import org.apache.hadoop.ozone.om.exceptions.OMLeaderNotReadyException;
+import org.apache.hadoop.ozone.om.exceptions.OMNotLeaderException;
+import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link org.apache.hadoop.io.retry.FailoverProxyProvider} implementation
+ * that supports reading from follower OM(s) (i.e. non-leader OMs also includes
+ * OM listeners).
+ * <p>
+ * This constructs a wrapper proxy might send the read request to follower
+ * OM(s), if follower read is enabled. It will try to send read requests
+ * to the first OM node. If RPC failed, it will try to failover to the next OM 
node.
+ * It will fail back to the leader OM after it has exhausted all the OMs.
+ * TODO: Currently the logic does not prioritize forwarding to followers since
+ *  it requires an extra RPC latency to check the OM role info.
+ *  In the future, we can try to try to pick the followers before forwarding
+ *  the request to the leader (similar to ObserverReadProxyProvider).
+ * <p>
+ * Read and write requests will still be sent to leader OM if reading from
+ * follower is disabled.
+ */
+public class HadoopRpcOMFollowerReadFailoverProxyProvider<T> implements 
FailoverProxyProvider<T> {
+  @VisibleForTesting
+  public static final Logger LOG = 
LoggerFactory.getLogger(HadoopRpcOMFollowerReadFailoverProxyProvider.class);
+
+  private final Class<T> protocolClass;
+
+  /** The inner proxy provider used for leader-based failover. */
+  private final HadoopRpcOMFailoverProxyProvider<T> failoverProxy;
+
+  /** The combined proxy which redirects to other proxies as necessary. */
+  private final ProxyInfo<T> combinedProxy;
+
+  /**
+   * Whether reading from follower is enabled. If this is false, all read
+   * requests will still go to OM leader.
+   */
+  private volatile boolean useFollowerRead;
+
+  /**
+   * The current index of the underlying leader-based proxy provider's 
omNodesInOrder currently being used.
+   * Should only be accessed in synchronized methods.
+   */
+  private int currentIndex = -1;
+
+  /**
+   * The proxy currently being used to send the read request.
+   * Should only be accessed in synchronized methods.
+   */
+  private OMProxyInfo<T> currentProxy;
+
+  /** The last proxy that has been used. Only used for testing. */
+  private volatile OMProxyInfo<T> lastProxy = null;
+
+  public HadoopRpcOMFollowerReadFailoverProxyProvider(
+      ConfigurationSource configuration, UserGroupInformation ugi, String 
omServiceId, Class<T> protocol)
+      throws IOException {
+    this(omServiceId, protocol,
+        new HadoopRpcOMFailoverProxyProvider<>(configuration, ugi, 
omServiceId, protocol));
+  }
+
+  public HadoopRpcOMFollowerReadFailoverProxyProvider(String omServiceId, 
Class<T> protocol,
+      HadoopRpcOMFailoverProxyProvider<T> failoverProxy) {
+    this.protocolClass = protocol;
+    this.failoverProxy = failoverProxy;
+
+    // Create a wrapped proxy containing all the proxies. Since this combined
+    // proxy is just redirecting to other proxies, all invocations can share 
it.
+    final String combinedInfo = "[" + failoverProxy.getOMProxies().stream()
+        .map(a -> a.proxyInfo)
+        .reduce((a, b) -> a + ", " + b).orElse("") + "]";
+    @SuppressWarnings("unchecked")
+    T wrappedProxy = (T) Proxy.newProxyInstance(
+        FollowerReadInvocationHandler.class.getClassLoader(),
+        new Class<?>[] {protocol}, new FollowerReadInvocationHandler());
+    combinedProxy = new ProxyInfo<>(wrappedProxy, combinedInfo);
+
+    if (wrappedProxy instanceof OzoneManagerProtocolPB) {
+      this.useFollowerRead = true;
+    } else {
+      LOG.debug("Disabling follower reads for {} because the requested proxy "
+          + "class does not implement {}", omServiceId, 
OzoneManagerProtocolPB.class.getName());
+      this.useFollowerRead = false;
+    }
+  }
+
+  @Override
+  public Class<T> getInterface() {
+    return protocolClass;
+  }
+
+  @Override
+  public ProxyInfo<T> getProxy() {
+    return combinedProxy;
+  }
+
+  @Override
+  public void performFailover(T currProxy) {
+    // Since FollowerReadInvocationHandler might user or fallback to 
leader-based failover logic,
+    // we should delegate the failover logic to the leader's failover.
+    failoverProxy.performFailover(currProxy);
+  }
+
+  public RetryPolicy getRetryPolicy(int maxFailovers) {
+    // We use the OMFailoverProxyProviderBase's RetryPolicy instead of using 
our own retry policy
+    // for a few reasons
+    // 1. We want to ensure that the retry policy behavior remains the same 
when we use the leader proxy
+    //    (when follower read is disabled or using write request)
+    // 2. The FollowerInvocationHandler is also written so that the thrown 
exception is handled by the
+    //    OMFailoverProxyProviderbase's RetryPolicy
+    return failoverProxy.getRetryPolicy(maxFailovers);
+  }
+
+  /**
+   * Parse the OM request from the request args.
+   *
+   * @return parsed OM request.
+   */
+  private static OMRequest parseOMRequest(Object[] args) throws 
ServiceException {
+    String error = null;
+    if (args == null) {
+      error = "args == null";
+    } else if (args.length < 2) {
+      error = "args.length == " + args.length + " < 2";
+    } else if (args[1] == null) {
+      error = "args[1] == null";
+    } else if (!(args[1] instanceof OMRequest)) {
+      error = "Non-OMRequest: " + args[1].getClass();
+    }
+    if (error != null) {
+      // Throws a non-retriable exception to prevent retry and failover
+      // See the HddsUtils#shouldNotFailoverOnRpcException used in
+      // OMFailoverProxyProviderBase#shouldFailover
+      throwServiceException(new RpcNoSuchProtocolException("Failed to 
parseOMRequest: " + error));
+    }
+    return (OMRequest) args[1];
+  }
+
+  @VisibleForTesting
+  void setUseFollowerRead(boolean flag) {
+    this.useFollowerRead = flag;
+  }
+
+  @VisibleForTesting
+  public ProxyInfo<T> getLastProxy() {
+    return lastProxy;
+  }
+
+  /**
+   * Return the currently used proxy. If there is none, first calls
+   * {@link #changeProxy(OMProxyInfo)} to initialize one.
+   */
+  @VisibleForTesting
+  public OMProxyInfo<T> getCurrentProxy() {
+    return changeProxy(null);
+  }
+
+  /**
+   * Move to the next proxy in the proxy list. If the OMProxyInfo supplied by
+   * the caller does not match the current proxy, the call is ignored; this is
+   * to handle concurrent calls (to avoid changing the proxy multiple times).
+   * The service state of the newly selected proxy will be updated before
+   * returning.
+   *
+   * @param initial The expected current proxy
+   * @return The new proxy that should be used.
+   */
+  private synchronized OMProxyInfo<T> changeProxy(OMProxyInfo<T> initial) {
+    if (currentProxy != initial) {
+      // Must have been a concurrent modification; ignore the move request
+      return currentProxy;
+    }
+    currentIndex = (currentIndex + 1) % 
failoverProxy.getOmNodesInOrder().size();
+    String currentOmNodeId = 
failoverProxy.getOmNodesInOrder().get(currentIndex);
+    currentProxy = (OMProxyInfo<T>) 
failoverProxy.createOMProxyIfNeeded(currentOmNodeId);
+    LOG.debug("Changed current proxy from {} to {}",
+        initial == null ? "none" : initial.proxyInfo,
+        currentProxy.proxyInfo);
+    return currentProxy;
+  }
+
+  /**
+   * An InvocationHandler to handle incoming requests. This class's invoke
+   * method contains the primary logic for redirecting to followers.
+   * <p>
+   * If follower reads are enabled, attempt to send read operations to the
+   * current proxy which can be either a leader or follower. If the current
+   * proxy's OM node fails, adjust the current proxy and return on the next 
one.
+   * <p>
+   * Write requests are always forwarded to the leader.
+   */
+  private class FollowerReadInvocationHandler implements RpcInvocationHandler {
+
+    @Override
+    public Object invoke(Object proxy, final Method method, final Object[] 
args)
+        throws Throwable {
+      lastProxy = null;
+      if (method.getDeclaringClass() == Object.class) {
+        // If the method is not a OzoneManagerProtocolPB method (e.g. 
Object#toString()),
+        // we should invoke the method on the current proxy
+        return method.invoke(this, args);
+      }
+      Object retVal = null;
+      OMRequest omRequest = parseOMRequest(args);
+      if (useFollowerRead && OmUtils.shouldSendToFollower(omRequest)) {
+        int failedCount = 0;
+        for (int i = 0; useFollowerRead && i < 
failoverProxy.getOmNodesInOrder().size(); i++) {
+          OMProxyInfo<T> current = getCurrentProxy();
+          LOG.debug("Attempting to service {} with cmdType {} using proxy {}",
+              method.getName(), omRequest.getCmdType(), current.proxyInfo);
+          try {
+            retVal = method.invoke(current.proxy, args);
+            lastProxy = current;
+            LOG.debug("Invocation of {} with cmdType {} using {} was 
successful",
+                method.getName(), omRequest.getCmdType(), current.proxyInfo);
+            return retVal;
+          } catch (InvocationTargetException ite) {
+            LOG.debug("Invocation of {} with cmdType {} using proxy {} 
failed", method.getName(),
+                omRequest.getCmdType(), current.proxyInfo, ite);
+            if (!(ite.getCause() instanceof Exception)) {
+              throwServiceException(ite.getCause());
+            }
+            Exception e = (Exception) ite.getCause();
+            if (e instanceof InterruptedIOException ||
+                e instanceof InterruptedException) {
+              // If interrupted, do not retry.
+              LOG.warn("Invocation returned interrupted exception on [{}];",
+                  current.proxyInfo, e);
+              throwServiceException(e);
+            }
+
+            if (e instanceof ServiceException) {
+              OMNotLeaderException notLeaderException =
+                  getNotLeaderException(e);
+              if (notLeaderException != null) {
+                // We should disable follower read here since this means
+                // the OM follower does not support / disable follower read or 
something is misconfigured
+                LOG.debug("Encountered OMNotLeaderException from {}. " +
+                    "Disable OM follower read and retry OM leader directly.", 
current.proxyInfo);
+                useFollowerRead = false;
+                // Break here instead of throwing exception so that it is not 
counted
+                // as a failover
+                break;
+              }
+
+              OMLeaderNotReadyException leaderNotReadyException =
+                  getLeaderNotReadyException(e);
+              if (leaderNotReadyException != null) {
+                LOG.debug("Encountered OMLeaderNotReadyException from {}. " +
+                    "Directly throw the exception to trigger retry", 
current.proxyInfo);
+                // Throw here to trigger retry since we already communicate to 
the leader
+                // If we break here instead, we will retry the same leader 
again without waiting
+                throw e;
+              }
+            }
+
+            if (!failoverProxy.shouldFailover(e)) {
+              // We reuse the leader proxy provider failover since we want to 
ensure
+              // if the follower read proxy decides that the exception should 
be failed,
+              // the leader proxy provider failover retry policy (i.e. 
OMFailoverProxyProviderBase#getRetryPolicy)
+              // should also fail the call.
+              // Otherwise, if the follower read proxy decides the exception 
should be failed, but
+              // the leader decides to failover to the its next proxy, the 
follower read proxy remains
+              // unchanged and the next read calls might query the same 
failing OM node and
+              // fail indefinitely.
+              LOG.debug("Invocation with cmdType {} returned exception on [{}] 
that cannot be retried; " +
+                      "{} failure(s) so far",
+                  omRequest.getCmdType(), current.proxyInfo, failedCount, e);
+              throw e;
+            } else {
+              failedCount++;
+              LOG.warn(
+                  "Invocation with cmdType {} returned exception on [{}]; {} 
failure(s) so far",
+                  omRequest.getCmdType(), current.proxyInfo, failedCount, e);
+              changeProxy(current);
+            }
+          }
+        }
+
+        // Only log message if there are actual follower failures.
+        // Getting here with failedCount = 0 could
+        // be that there is simply no Follower node running at all.
+        if (failedCount > 0) {
+          // If we get here, it means all followers have failed.
+          LOG.warn("{} nodes have failed for read request {} with cmdType {}."
+                  + " Falling back to leader.", failedCount,
+              omRequest.getCmdType(), method.getName());
+        } else {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Read falling back to leader without follower read "
+                + "fail, is there no follower node running?");
+          }
+        }
+      }
+
+      // Either all followers have failed, follower reads are disabled,
+      // or this is a write request. In any case, forward the request to
+      // the leader OM.
+      LOG.debug("Using leader-based failoverProxy to service {}", 
method.getName());
+      OMProxyInfo<T> leaderProxy = (OMProxyInfo<T>) failoverProxy.getProxy();
+      try {
+        retVal = method.invoke(leaderProxy.proxy, args);
+      } catch (InvocationTargetException e) {
+        LOG.debug("Exception thrown from leader-based failoverProxy", 
e.getCause());
+        // This exception will be handled by the 
OMFailoverProxyProviderBase#getRetryPolicy
+        // (see getRetryPolicy). This ensures that the leader-only failover 
should still work.
+        throwServiceException(e.getCause());
+      }
+      lastProxy = leaderProxy;
+      return retVal;
+    }
+
+    @Override
+    public void close() throws IOException {
+
+    }
+
+    @Override
+    public ConnectionId getConnectionId() {
+      return RPC.getConnectionIdForProxy(useFollowerRead
+          ? getCurrentProxy().proxy : failoverProxy.getProxy().proxy);
+    }
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+    // All the proxies are stored in the underlying failoverProxy
+    // so we invoke close on the underlying failoverProxy
+    failoverProxy.close();
+  }
+
+  @VisibleForTesting
+  public boolean isUseFollowerRead() {
+    return useFollowerRead;
+  }
+
+  @VisibleForTesting
+  public List<OMProxyInfo<T>> getOMProxies() {
+    return failoverProxy.getOMProxies();
+  }
+
+  public synchronized void changeInitialProxyForTest(String initialOmNodeId) {
+    if (currentProxy != null && 
currentProxy.getNodeId().equals(initialOmNodeId)) {
+      return;
+    }
+
+    int indexOfTargetNodeId = 
failoverProxy.getOmNodesInOrder().indexOf(initialOmNodeId);
+    if (indexOfTargetNodeId == -1) {
+      return;
+    }
+
+    currentIndex = indexOfTargetNodeId;
+    currentProxy = (OMProxyInfo<T>) 
failoverProxy.createOMProxyIfNeeded(initialOmNodeId);
+  }
+
+  /**
+   * Throw the passed {@link Throwable} wrapped in {@link ServiceException}.
+   * This is required to prevent {@link 
java.lang.reflect.UndeclaredThrowableException} to be thrown
+   * since {@link OzoneManagerProtocolPB#submitRequest(RpcController, 
OMRequest)} only
+   * throws {@link ServiceException}.
+   * @param e exception to wrap in {@link ServiceException}.
+   * @throws ServiceException the exception that wraps the passed throwable.
+   */
+  private static void throwServiceException(Throwable e) throws 
ServiceException {
+    throw e instanceof ServiceException ? (ServiceException) e : new 
ServiceException(e);
+  }
+
+}
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProviderBase.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProviderBase.java
index 04cc40a6b0f..739999d39f8 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProviderBase.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProviderBase.java
@@ -416,6 +416,10 @@ public Map<String, OMProxyInfo<T>> getOMProxyMap() {
     return omProxies;
   }
 
+  protected synchronized OMProxyInfo<T> getOmProxy(String nodeId) {
+    return omProxies.get(nodeId);
+  }
+
   /**
    * Unwrap the exception and return the wrapped OMLeaderNotReadyException if 
any.
    *
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/Hadoop3OmTransport.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/Hadoop3OmTransport.java
index f774f884554..7de5af31a62 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/Hadoop3OmTransport.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/Hadoop3OmTransport.java
@@ -30,6 +30,7 @@
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.om.exceptions.OMNotLeaderException;
 import org.apache.hadoop.ozone.om.ha.HadoopRpcOMFailoverProxyProvider;
+import 
org.apache.hadoop.ozone.om.ha.HadoopRpcOMFollowerReadFailoverProxyProvider;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -45,6 +46,7 @@ public class Hadoop3OmTransport implements OmTransport {
   private static final RpcController NULL_RPC_CONTROLLER = null;
 
   private final HadoopRpcOMFailoverProxyProvider<OzoneManagerProtocolPB> 
omFailoverProxyProvider;
+  private HadoopRpcOMFollowerReadFailoverProxyProvider<OzoneManagerProtocolPB> 
followerReadFailoverProxyProvider;
 
   private final OzoneManagerProtocolPB rpcProxy;
 
@@ -58,10 +60,31 @@ public Hadoop3OmTransport(ConfigurationSource conf,
     this.omFailoverProxyProvider = new HadoopRpcOMFailoverProxyProvider<>(
             conf, ugi, omServiceId, OzoneManagerProtocolPB.class);
 
+    boolean followerReadEnabled = conf.getBoolean(
+        OzoneConfigKeys.OZONE_CLIENT_FOLLOWER_READ_ENABLED_KEY,
+        OzoneConfigKeys.OZONE_CLIENT_FOLLOWER_READ_ENABLED_DEFAULT
+    );
+
     int maxFailovers = conf.getInt(
         OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY,
         OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT);
-    this.rpcProxy = OzoneManagerProtocolPB.newProxy(omFailoverProxyProvider, 
maxFailovers);
+
+    // TODO: In the future, we might support more FollowerReadProxyProvider 
strategies depending on factors
+    //  like latency, applied index, etc.
+    //  So instead of enabling using follower read configuration, we can 
simply let user to configure the
+    //  failover proxy provider instead (similar to 
dfs.client.failover.proxy.provider.<nameservice>)
+    if (followerReadEnabled) {
+      this.followerReadFailoverProxyProvider = new 
HadoopRpcOMFollowerReadFailoverProxyProvider<>(
+          omServiceId, OzoneManagerProtocolPB.class, omFailoverProxyProvider
+      );
+      this.rpcProxy = 
OzoneManagerProtocolPB.newProxy(followerReadFailoverProxyProvider, 
maxFailovers);
+    } else {
+      // TODO: It should be possible to simply instantiate 
HadoopRpcOMFollowerReadFailoverProxyProvider
+      //  even if the follower read is not enabled. We can try this to ensure 
that the tests still pass which
+      //  suggests that the HadoopRpcOMFollowerReadFailoverProxyProvider is a 
indeed a superset of
+      //  HadoopRpcOMFollowerReadFailoverProxyProvider
+      this.rpcProxy = OzoneManagerProtocolPB.newProxy(omFailoverProxyProvider, 
maxFailovers);
+    }
   }
 
   @Override
@@ -99,8 +122,18 @@ public 
HadoopRpcOMFailoverProxyProvider<OzoneManagerProtocolPB> getOmFailoverPro
     return omFailoverProxyProvider;
   }
 
+  @VisibleForTesting
+  public HadoopRpcOMFollowerReadFailoverProxyProvider<OzoneManagerProtocolPB>
+      getOmFollowerReadFailoverProxyProvider() {
+    return followerReadFailoverProxyProvider;
+  }
+
   @Override
   public void close() throws IOException {
-    omFailoverProxyProvider.close();
+    if (followerReadFailoverProxyProvider != null) {
+      followerReadFailoverProxyProvider.close();
+    } else {
+      omFailoverProxyProvider.close();
+    }
   }
 }
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolPB.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolPB.java
index f4f7c54403a..16f663ec574 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolPB.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolPB.java
@@ -21,6 +21,7 @@
 import org.apache.hadoop.io.retry.RetryProxy;
 import org.apache.hadoop.ipc_.ProtocolInfo;
 import org.apache.hadoop.ozone.om.OMConfigKeys;
+import 
org.apache.hadoop.ozone.om.ha.HadoopRpcOMFollowerReadFailoverProxyProvider;
 import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProviderBase;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneManagerService;
 import org.apache.hadoop.ozone.security.OzoneDelegationTokenSelector;
@@ -44,4 +45,10 @@ static OzoneManagerProtocolPB 
newProxy(OMFailoverProxyProviderBase<OzoneManagerP
     return (OzoneManagerProtocolPB) 
RetryProxy.create(OzoneManagerProtocolPB.class, failoverProxyProvider,
         failoverProxyProvider.getRetryPolicy(maxFailovers));
   }
+  
+  static OzoneManagerProtocolPB 
newProxy(HadoopRpcOMFollowerReadFailoverProxyProvider<OzoneManagerProtocolPB>
+      followerReadFailoverProxyProvider, int maxFailovers) {
+    return (OzoneManagerProtocolPB) 
RetryProxy.create(OzoneManagerProtocolPB.class, 
followerReadFailoverProxyProvider,
+        followerReadFailoverProxyProvider.getRetryPolicy(maxFailovers));
+  }
 }
diff --git 
a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/TestOmUtils.java 
b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/TestOmUtils.java
index 9aea06fd796..b61bf330316 100644
--- a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/TestOmUtils.java
+++ b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/TestOmUtils.java
@@ -41,10 +41,16 @@
 import java.nio.file.Path;
 import java.util.Set;
 import java.util.TreeSet;
+import java.util.UUID;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.ozone.test.GenericTestUtils;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
+import org.slf4j.LoggerFactory;
 
 /**
  * Unit tests for {@link OmUtils}.
@@ -326,4 +332,32 @@ void testGetObjectIdFromTxIdValidation() {
 
   // Consistency checks between epoch and txId are covered by
   // testAddEpochToTxId() and testGetObjectIdFromTxId().
+
+  /**
+   * Test that all of {@link OzoneManagerProtocolProtos.Type} enum values are
+   * categorized in {@link OmUtils#shouldSendToFollower(OMRequest)}.
+   */
+  @Test
+  public void testShouldSendToFollowerFollowerReadCapturesAllCmdTypeEnums() {
+    GenericTestUtils.LogCapturer logCapturer =
+        GenericTestUtils.LogCapturer.captureLogs(
+            LoggerFactory.getLogger(OmUtils.class));
+    OzoneManagerProtocolProtos.Type[] cmdTypes =
+        OzoneManagerProtocolProtos.Type.values();
+    String clientId = UUID.randomUUID().toString();
+
+    for (OzoneManagerProtocolProtos.Type cmdType : cmdTypes) {
+      OMRequest request = OMRequest.newBuilder()
+          .setCmdType(cmdType)
+          .setClientId(clientId)
+          .build();
+      OmUtils.shouldSendToFollower(request);
+      Assertions.assertFalse(
+          logCapturer.getOutput().contains(
+              "CmdType " + cmdType + " is not categorized to be sent to 
follower."
+          )
+      );
+      logCapturer.clearOutput();
+    }
+  }
 }
diff --git 
a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/ha/TestHadoopRpcOMFollowerReadFailoverProxyProvider.java
 
b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/ha/TestHadoopRpcOMFollowerReadFailoverProxyProvider.java
new file mode 100644
index 00000000000..099d7497a3e
--- /dev/null
+++ 
b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/ha/TestHadoopRpcOMFollowerReadFailoverProxyProvider.java
@@ -0,0 +1,581 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.ha;
+
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CLIENT_WAIT_BETWEEN_RETRIES_MILLIS_KEY;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_NODES_KEY;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SERVICE_IDS_KEY;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+import java.io.IOException;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.StringJoiner;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.io.retry.RetryInvocationHandler;
+import org.apache.hadoop.io.retry.RetryProxy;
+import org.apache.hadoop.ipc_.RemoteException;
+import org.apache.hadoop.ipc_.RpcNoSuchProtocolException;
+import org.apache.hadoop.ozone.ClientVersion;
+import org.apache.hadoop.ozone.OmUtils;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.ha.ConfUtils;
+import org.apache.hadoop.ozone.om.exceptions.OMLeaderNotReadyException;
+import org.apache.hadoop.ozone.om.exceptions.OMNotLeaderException;
+import org.apache.hadoop.ozone.om.protocolPB.OMAdminProtocolPB;
+import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateKeyRequest;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.GetKeyInfoRequest;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyArgs;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Time;
+import org.apache.ratis.protocol.ClientId;
+import org.junit.jupiter.api.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/**
+ * Tests for {@link HadoopRpcOMFollowerReadFailoverProxyProvider}.
+ */
+public class TestHadoopRpcOMFollowerReadFailoverProxyProvider {
+  private static final long SLOW_RESPONSE_SLEEP_TIME = 
TimeUnit.SECONDS.toMillis(2);
+  private static final String OM_SERVICE_ID = "om-service-test1";
+  private static final String NODE_ID_BASE_STR = "omNode-";
+  private static final String DUMMY_NODE_ADDR = "0.0.0.0:8080";
+  private OzoneConfiguration conf;
+
+  private HadoopRpcOMFollowerReadFailoverProxyProvider<OzoneManagerProtocolPB> 
proxyProvider;
+  private OzoneManagerProtocolPB retryProxy;
+  private String[] omNodeIds;
+  private OMAnswer[] omNodeAnswers;
+
+  @Test
+  public void testWithNonClientProxy() throws Exception {
+    setupProxyProvider(2);
+    HadoopRpcOMFollowerReadFailoverProxyProvider<OMAdminProtocolPB> 
adminProxyProvider =
+        new HadoopRpcOMFollowerReadFailoverProxyProvider<>(conf,
+            UserGroupInformation.getCurrentUser(), OM_SERVICE_ID, 
OMAdminProtocolPB.class);
+    // follower read is only enabled for OzoneManagerProtocolPB and disabled 
otherwise
+    assertFalse(adminProxyProvider.isUseFollowerRead());
+  }
+
+  @Test
+  void testWriteOperationOnLeader() throws Exception {
+    setupProxyProvider(3);
+    omNodeAnswers[2].isLeader = true;
+
+    doWrite();
+
+    assertHandledBy(2);
+    assertTrue(proxyProvider.isUseFollowerRead());
+    // Although the write request is forwarded to the leader,
+    // the follower read proxy provider should still point to first OM follower
+    assertEquals(proxyProvider.getCurrentProxy().getNodeId(), omNodeIds[0]);
+  }
+
+  @Test
+  void testWriteOperationOnLeaderNotReady() throws Exception {
+    setupProxyProvider(3);
+    omNodeAnswers[0].isLeader = true;
+    omNodeAnswers[0].isLeaderReady = false;
+
+    new Thread(() -> {
+      try {
+        Thread.sleep(1000);
+        omNodeAnswers[0].isLeaderReady = true;
+      } catch (InterruptedException ignored) {
+      }
+    }).start();
+
+    long start = Time.monotonicNow();
+    doWrite();
+    long elapsed = Time.monotonicNow() - start;
+
+    assertTrue(elapsed > 1000,
+        "Write operation finished earlier than expected");
+
+    assertHandledBy(0);
+    assertTrue(proxyProvider.isUseFollowerRead());
+  }
+
+  @Test
+  void testWriteLeaderFailover() throws Exception {
+    setupProxyProvider(3);
+    omNodeAnswers[0].isLeader = true;
+
+    doWrite();
+    assertHandledBy(0);
+    // Read current proxy remain unchanged
+    assertEquals(proxyProvider.getCurrentProxy().getNodeId(), omNodeIds[0]);
+
+
+    // Leader failover from omNode-1 to omNode-2
+    omNodeAnswers[0].isLeader = false;
+    omNodeAnswers[1].isLeader = true;
+    doWrite();
+    assertHandledBy(1);
+    assertEquals(proxyProvider.getCurrentProxy().getNodeId(), omNodeIds[0]);
+
+    // Leader failover back from omNode-2 to omNode-1
+    omNodeAnswers[0].isLeader = true;
+    omNodeAnswers[1].isLeader = false;
+    doWrite();
+    assertHandledBy(0);
+    assertEquals(proxyProvider.getCurrentProxy().getNodeId(), omNodeIds[0]);
+  }
+
+  @Test
+  void testReadOperationOnFollower() throws Exception {
+    setupProxyProvider(3);
+    omNodeAnswers[0].isLeader = false;
+
+    doRead();
+
+    assertHandledBy(0);
+    assertTrue(proxyProvider.isUseFollowerRead());
+  }
+
+  @Test
+  void testReadOperationOnLeader() throws Exception {
+    setupProxyProvider(3);
+    omNodeAnswers[0].isLeader = true;
+
+    doRead();
+
+    // Follower read can still read from OM leader
+    assertHandledBy(0);
+    assertTrue(proxyProvider.isUseFollowerRead());
+  }
+
+  @Test
+  void testReadOperationOnLeaderNotReady() throws Exception {
+    setupProxyProvider(3);
+    omNodeAnswers[0].isLeader = true;
+    omNodeAnswers[0].isLeaderReady = false;
+
+    new Thread(() -> {
+      try {
+        Thread.sleep(1000);
+        omNodeAnswers[0].isLeaderReady = true;
+      } catch (InterruptedException ignored) {
+      }
+    }).start();
+
+    long start = Time.monotonicNow();
+    doRead();
+    long elapsed = Time.monotonicNow() - start;
+
+    assertTrue(elapsed > 1000,
+        "Read operation finished earlier than expected");
+
+    assertHandledBy(0);
+    assertTrue(proxyProvider.isUseFollowerRead());
+  }
+
+  @Test
+  void testReadOperationOnFollowerWhenFollowerReadUnsupported() throws 
Exception {
+    setupProxyProvider(3);
+    // Disable all follower reads from all OM nodes
+    for (OMAnswer omAnswer : omNodeAnswers) {
+      omAnswer.isFollowerReadSupported = false;
+    }
+    omNodeAnswers[1].isLeader = true;
+
+    doRead();
+    // The read request will be handled by the leader
+    assertHandledBy(1);
+    // Since OMNotLeaderException is thrown during follower read, the
+    // proxy will keep sending reads from the leader from now on
+    assertFalse(proxyProvider.isUseFollowerRead());
+
+    // Try to simulate leader change
+    omNodeAnswers[1].isLeader = false;
+    omNodeAnswers[2].isLeader = true;
+
+    doRead();
+    assertHandledBy(2);
+
+    assertFalse(proxyProvider.isUseFollowerRead());
+  }
+
+  @Test
+  void testUnreachableFollowers() throws Exception {
+    setupProxyProvider(3);
+    omNodeAnswers[2].isLeader = true;
+    // Mark the first follower as unreachable
+    omNodeAnswers[0].unreachable = true;
+
+    // It will be handled by the second follower since the first follower is
+    // unreachable
+    doRead();
+    assertHandledBy(1);
+
+    // Now make the second follower as unavailable
+    // All followers are unreachable now
+    omNodeAnswers[1].unreachable = true;
+
+    // Confirm that read still succeeds even though followers are not available
+    doRead();
+    // It will be handled by the leader
+    assertHandledBy(2);
+  }
+
+  @Test
+  void testReadOnSlowFollower() throws Exception {
+    setupProxyProvider(3);
+    omNodeAnswers[0].slowNode = true;
+
+    long start = Time.monotonicNow();
+    doRead();
+    long elapsed = Time.monotonicNow() - start;
+    assertHandledBy(0);
+    assertThat(elapsed)
+        .withFailMessage(() -> "Read operation finished earlier than expected")
+        .isGreaterThanOrEqualTo(SLOW_RESPONSE_SLEEP_TIME);
+
+  }
+
+  @Test
+  void testMixedWriteAndRead() throws Exception {
+    setupProxyProvider(3);
+    omNodeAnswers[1].isLeader = true;
+
+    doWrite();
+
+    // Write is handled by the leader
+    assertHandledBy(1);
+
+    doRead();
+
+    // Read is handled by the first follower
+    assertHandledBy(0);
+  }
+
+  @Test
+  void testWriteWithAllOMsUnreachable() throws Exception {
+    setupProxyProvider(3);
+    omNodeAnswers[0].unreachable = true;
+    omNodeAnswers[1].unreachable = true;
+    omNodeAnswers[2].unreachable = true;
+
+    ServiceException exception = assertThrows(ServiceException.class, 
this::doWrite);
+    assertInstanceOf(IOException.class, exception.getCause());
+  }
+
+  @Test
+  void testReadWithAllOMsUnreachable() throws Exception {
+    setupProxyProvider(3);
+    omNodeAnswers[0].unreachable = true;
+    omNodeAnswers[1].unreachable = true;
+    omNodeAnswers[2].unreachable = true;
+
+    ServiceException exception = assertThrows(ServiceException.class, 
this::doRead);
+    assertInstanceOf(IOException.class, exception.getCause());
+  }
+
+  @Test
+  void testObjectMethodsOnProxy() throws Exception {
+    setupProxyProvider(2);
+
+    assertNotNull(retryProxy.toString());
+    retryProxy.hashCode();
+    retryProxy.equals(retryProxy);
+  }
+
+  @Test
+  void testObjectMethodsDoNotSelectProxy() throws Exception {
+    setupProxyProvider(2);
+
+    assertNull(proxyProvider.getLastProxy());
+  }
+
+  @Test
+  void testShortArgsArrayDoesNotThrowArrayIndex() throws Exception {
+    setupProxyProvider(2);
+
+    Object combinedProxy = proxyProvider.getProxy().proxy;
+    InvocationHandler handler = Proxy.getInvocationHandler(combinedProxy);
+    Method submitRequest = OzoneManagerProtocolPB.class.getMethod(
+        "submitRequest", RpcController.class, OMRequest.class);
+
+    ServiceException exception = assertThrows(ServiceException.class,
+        () -> handler.invoke(combinedProxy, submitRequest, new Object[] 
{null}));
+    assertInstanceOf(RpcNoSuchProtocolException.class, exception.getCause());
+  }
+
+  @Test
+  void testNullRequest() throws Exception {
+    setupProxyProvider(2);
+    ServiceException exception = assertThrows(ServiceException.class,
+        () -> retryProxy.submitRequest(null, null));
+    assertInstanceOf(RpcNoSuchProtocolException.class, exception.getCause());
+  }
+
+  private void setupProxyProvider(int omNodeCount) throws Exception {
+    setupProxyProvider(omNodeCount, new OzoneConfiguration());
+  }
+
+  private void setupProxyProvider(int omNodeCount, OzoneConfiguration config) 
throws Exception {
+    omNodeIds = new String[omNodeCount];
+    omNodeAnswers = new OMAnswer[omNodeCount];
+    StringJoiner allNodeIds = new StringJoiner(",");
+    OzoneManagerProtocolPB[] proxies = new OzoneManagerProtocolPB[omNodeCount];
+    // Setup each OM node with the mocked proxy
+    Map<String, OzoneManagerProtocolPB> proxyMap = new HashMap<>();
+    for (int i = 0; i < omNodeCount; i++) {
+      String nodeId = NODE_ID_BASE_STR + (i + 1); // 1-th indexed
+      config.set(ConfUtils.addKeySuffixes(OZONE_OM_ADDRESS_KEY, OM_SERVICE_ID,
+          nodeId), DUMMY_NODE_ADDR);
+      allNodeIds.add(nodeId);
+      omNodeIds[i] = nodeId;
+      omNodeAnswers[i] = new OMAnswer();
+      proxies[i] = mock(OzoneManagerProtocolPB.class);
+      doAnswer(omNodeAnswers[i].clientAnswer)
+          .when(proxies[i]).submitRequest(any(), any());
+      doAnswer(omNodeAnswers[i].clientAnswer)
+          .when(proxies[i]).submitRequest(any(), any());
+      proxyMap.put(nodeId, proxies[i]);
+    }
+    config.set(ConfUtils.addKeySuffixes(OZONE_OM_NODES_KEY, OM_SERVICE_ID),
+        allNodeIds.toString());
+    config.set(OZONE_OM_SERVICE_IDS_KEY, OM_SERVICE_ID);
+    config.setInt(OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY, 2 * omNodeCount);
+    config.setLong(
+        OZONE_CLIENT_WAIT_BETWEEN_RETRIES_MILLIS_KEY, 500);
+
+    // Create a leader-based failover proxy provider using the mocked proxies
+    HadoopRpcOMFailoverProxyProvider<OzoneManagerProtocolPB> 
underlyingProxyProvider =
+        new HadoopRpcOMFailoverProxyProvider<OzoneManagerProtocolPB>(config, 
UserGroupInformation.getCurrentUser(),
+            OM_SERVICE_ID, OzoneManagerProtocolPB.class) {
+          @Override
+          protected synchronized ProxyInfo<OzoneManagerProtocolPB> 
createOMProxyIfNeeded(
+              OMProxyInfo<OzoneManagerProtocolPB> omProxyInfo) {
+            if (omProxyInfo.proxy == null) {
+              omProxyInfo.proxy = proxyMap.get(omProxyInfo.getNodeId());
+            }
+            return omProxyInfo;
+          }
+
+          @Override
+          protected void initOmProxiesFromConfigs(ConfigurationSource config, 
String omSvcId) throws IOException {
+            Map<String, OMProxyInfo<OzoneManagerProtocolPB>> omProxies = new 
HashMap<>();
+
+            List<String> omNodeIDList = new ArrayList<>();
+
+            Collection<String> activeOmNodeIds = 
OmUtils.getActiveOMNodeIds(config,
+                omSvcId);
+
+            for (String nodeId : 
OmUtils.emptyAsSingletonNull(activeOmNodeIds)) {
+
+              String rpcAddrKey = 
ConfUtils.addKeySuffixes(OZONE_OM_ADDRESS_KEY,
+                  omSvcId, nodeId);
+              String rpcAddrStr = OmUtils.getOmRpcAddress(config, rpcAddrKey);
+              if (rpcAddrStr == null) {
+                continue;
+              }
+
+              // ProxyInfo.proxy will be set during first time call to server.
+              OMProxyInfo<OzoneManagerProtocolPB> omProxyInfo = new 
OMProxyInfo<>(omSvcId, nodeId, rpcAddrStr);
+
+              if (omProxyInfo.getAddress() != null) {
+                // For a non-HA OM setup, nodeId might be null. If so, we 
assign it
+                // the default value
+                if (nodeId == null) {
+                  nodeId = OzoneConsts.OM_DEFAULT_NODE_ID;
+                }
+                omProxies.put(nodeId, omProxyInfo);
+                omNodeIDList.add(nodeId);
+              } else {
+                LOG.error("Failed to create OM proxy for {} at address {}",
+                    nodeId, rpcAddrStr);
+              }
+            }
+
+            if (omProxies.isEmpty()) {
+              throw new IllegalArgumentException("Could not find any 
configured " +
+                  "addresses for OM. Please configure the system with "
+                  + OZONE_OM_ADDRESS_KEY);
+            }
+            // By default, the omNodesInOrder is shuffled to reduce hotspot, 
but we can sort it here to
+            // make it easier to test
+            Collections.sort(omNodeIDList);
+            initOmProxies(omProxies, omNodeIDList);
+          }
+        };
+
+    // Wrap the leader-based failover proxy provider with follower read proxy 
provider
+    proxyProvider = new HadoopRpcOMFollowerReadFailoverProxyProvider<>(
+        OM_SERVICE_ID, OzoneManagerProtocolPB.class, underlyingProxyProvider);
+    assertTrue(proxyProvider.isUseFollowerRead());
+    // Wrap the follower read proxy provider in retry proxy to allow automatic 
failover
+    retryProxy = (OzoneManagerProtocolPB) RetryProxy.create(
+        OzoneManagerProtocolPB.class, proxyProvider,
+        proxyProvider.getRetryPolicy(2 * omNodeCount)
+    );
+    // This is currently added to prevent IllegalStateException in
+    // Client#setCallIdAndRetryCount since it seems that callId is set but not 
unset properly
+    RetryInvocationHandler.SET_CALL_ID_FOR_TEST.set(false);
+    conf = config;
+  }
+
+  private void doRead() throws Exception {
+    doRead(retryProxy);
+  }
+
+  private void doWrite() throws Exception {
+    doWrite(retryProxy);
+  }
+
+  private static void doWrite(OzoneManagerProtocolPB client) throws Exception {
+    CreateKeyRequest.Builder req = CreateKeyRequest.newBuilder();
+    KeyArgs keyArgs = KeyArgs.newBuilder()
+        .setVolumeName("volume")
+        .setBucketName("bucket")
+        .setKeyName("key")
+        .build();
+    req.setKeyArgs(keyArgs);
+
+    OMRequest omRequest = OMRequest.newBuilder()
+        .setVersion(ClientVersion.CURRENT_VERSION)
+        .setClientId(ClientId.randomId().toString())
+        .setCmdType(Type.CreateKey)
+        .setCreateKeyRequest(req)
+        .build();
+
+    client.submitRequest(null, omRequest);
+  }
+
+  private static void doRead(OzoneManagerProtocolPB client) throws Exception {
+    KeyArgs keyArgs = KeyArgs.newBuilder()
+        .setVolumeName("volume")
+        .setBucketName("bucket")
+        .setKeyName("key")
+        .build();
+    GetKeyInfoRequest.Builder req = GetKeyInfoRequest.newBuilder()
+        .setKeyArgs(keyArgs);
+
+    OMRequest omRequest = OMRequest.newBuilder()
+        .setVersion(ClientVersion.CURRENT_VERSION)
+        .setClientId(ClientId.randomId().toString())
+        .setCmdType(Type.GetKeyInfo)
+        .setGetKeyInfoRequest(req)
+        .build();
+
+    client.submitRequest(null, omRequest);
+  }
+
+  private void assertHandledBy(int omNodeIdx) {
+    OMProxyInfo<OzoneManagerProtocolPB> lastProxy =
+        (OMProxyInfo<OzoneManagerProtocolPB>) proxyProvider.getLastProxy();
+    assertEquals(omNodeIds[omNodeIdx], lastProxy.getNodeId());
+  }
+
+  private static class OMAnswer {
+
+    private volatile boolean unreachable = false;
+    private volatile boolean slowNode = false;
+
+    private volatile boolean isLeader = false;
+    private volatile boolean isLeaderReady = true;
+    private volatile boolean isFollowerReadSupported = true;
+
+    private OMProtocolAnswer clientAnswer = new OMProtocolAnswer();
+
+    private class OMProtocolAnswer implements Answer<OMResponse> {
+      @Override
+      public OMResponse answer(InvocationOnMock invocationOnMock) throws 
Throwable {
+        if (unreachable) {
+          throw new IOException("Unavailable");
+        }
+
+        // sleep to simulate slow rpc responses.
+        if (slowNode) {
+          Thread.sleep(SLOW_RESPONSE_SLEEP_TIME);
+        }
+        OMRequest omRequest = invocationOnMock.getArgument(1);
+        switch (omRequest.getCmdType()) {
+        case CreateKey:
+          if (!isLeader) {
+            throw new ServiceException(
+                new RemoteException(
+                    OMNotLeaderException.class.getCanonicalName(),
+                    "Write can only be done on leader"
+                )
+            );
+          }
+          if (isLeader && !isLeaderReady) {
+            throw new ServiceException(
+                new RemoteException(
+                    OMLeaderNotReadyException.class.getCanonicalName(),
+                    "Leader is not ready yet"
+                )
+            );
+          }
+          break;
+        case GetKeyInfo:
+          if (!isLeader && !isFollowerReadSupported) {
+            throw new ServiceException(
+                new RemoteException(
+                    OMNotLeaderException.class.getCanonicalName(),
+                    "OM follower read is not supported"
+                )
+            );
+          }
+          if (isLeader && !isLeaderReady) {
+            throw new ServiceException(
+                new RemoteException(
+                    OMLeaderNotReadyException.class.getCanonicalName(),
+                    "Leader is not ready yet"
+                )
+            );
+          }
+          break;
+        default:
+          throw new IllegalArgumentException("Unsupported cmdType");
+        }
+        return null;
+      }
+    }
+  }
+}
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/OmTestUtil.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/OmTestUtil.java
index a90aacd4b19..c5a52049128 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/OmTestUtil.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/OmTestUtil.java
@@ -19,6 +19,7 @@
 
 import org.apache.hadoop.ozone.client.ObjectStore;
 import org.apache.hadoop.ozone.om.ha.HadoopRpcOMFailoverProxyProvider;
+import 
org.apache.hadoop.ozone.om.ha.HadoopRpcOMFollowerReadFailoverProxyProvider;
 import org.apache.hadoop.ozone.om.protocolPB.Hadoop3OmTransport;
 import 
org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolClientSideTranslatorPB;
 import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
@@ -34,6 +35,16 @@ static 
HadoopRpcOMFailoverProxyProvider<OzoneManagerProtocolPB> getFailoverProxy
     return transport.getOmFailoverProxyProvider();
   }
 
+  static HadoopRpcOMFollowerReadFailoverProxyProvider<OzoneManagerProtocolPB> 
getFollowerReadFailoverProxyProvider(
+      ObjectStore store) {
+    OzoneManagerProtocolClientSideTranslatorPB ozoneManagerClient =
+        (OzoneManagerProtocolClientSideTranslatorPB) 
store.getClientProxy().getOzoneManagerClient();
+
+    Hadoop3OmTransport transport =
+        (Hadoop3OmTransport) ozoneManagerClient.getTransport();
+    return transport.getOmFollowerReadFailoverProxyProvider();
+  }
+
   static String getCurrentOmProxyNodeId(ObjectStore store) {
     return getFailoverProxyProvider(store).getCurrentProxyOMNodeId();
   }
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAFollowerRead.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAFollowerRead.java
new file mode 100644
index 00000000000..ddafde7103e
--- /dev/null
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAFollowerRead.java
@@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static 
org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY;
+import static 
org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_RETRY_INTERVAL_KEY;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS_WILDCARD;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CLIENT_FOLLOWER_READ_ENABLED_KEY;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DEFAULT_BUCKET_LAYOUT;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.io.IOException;
+import java.net.ConnectException;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.UUID;
+import java.util.concurrent.TimeoutException;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.hadoop.hdds.client.ReplicationFactor;
+import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ipc_.RemoteException;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.client.BucketArgs;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.OzoneKey;
+import org.apache.hadoop.ozone.client.OzoneKeyDetails;
+import org.apache.hadoop.ozone.client.OzoneVolume;
+import org.apache.hadoop.ozone.client.VolumeArgs;
+import org.apache.hadoop.ozone.client.io.OzoneInputStream;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServerConfig;
+import org.apache.hadoop.ozone.security.acl.OzoneObj;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+
+/**
+ * Base class for Ozone Manager HA tests.
+ */
+public abstract class TestOzoneManagerHAFollowerRead {
+
+  private static MiniOzoneHAClusterImpl cluster = null;
+  private static ObjectStore objectStore;
+  private static OzoneConfiguration conf;
+  private static String omServiceId;
+  private static int numOfOMs = 3;
+  private static final int LOG_PURGE_GAP = 50;
+  /* Reduce max number of retries to speed up unit test. */
+  private static final int OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS = 5;
+  private static final int IPC_CLIENT_CONNECT_MAX_RETRIES = 4;
+  private static final long SNAPSHOT_THRESHOLD = 50;
+  private static final Duration RETRY_CACHE_DURATION = Duration.ofSeconds(30);
+  private static OzoneClient client;
+
+  public MiniOzoneHAClusterImpl getCluster() {
+    return cluster;
+  }
+
+  public ObjectStore getObjectStore() {
+    return objectStore;
+  }
+
+  public static OzoneClient getClient() {
+    return client;
+  }
+
+  public OzoneConfiguration getConf() {
+    return conf;
+  }
+
+  public String getOmServiceId() {
+    return omServiceId;
+  }
+
+  public static int getLogPurgeGap() {
+    return LOG_PURGE_GAP;
+  }
+
+  public static long getSnapshotThreshold() {
+    return SNAPSHOT_THRESHOLD;
+  }
+
+  public static int getNumOfOMs() {
+    return numOfOMs;
+  }
+
+  public static int getOzoneClientFailoverMaxAttempts() {
+    return OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS;
+  }
+
+  public static Duration getRetryCacheDuration() {
+    return RETRY_CACHE_DURATION;
+  }
+
+  @BeforeAll
+  public static void init() throws Exception {
+    conf = new OzoneConfiguration();
+    omServiceId = "om-service-test1";
+    conf.setBoolean(OZONE_ACL_ENABLED, true);
+    conf.set(OzoneConfigKeys.OZONE_ADMINISTRATORS,
+        OZONE_ADMINISTRATORS_WILDCARD);
+    conf.setInt(OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY,
+        OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS);
+    conf.setInt(IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
+        IPC_CLIENT_CONNECT_MAX_RETRIES);
+    /* Reduce IPC retry interval to speed up unit test. */
+    conf.setInt(IPC_CLIENT_CONNECT_RETRY_INTERVAL_KEY, 200);
+    conf.setInt(OMConfigKeys.OZONE_OM_RATIS_LOG_PURGE_GAP, LOG_PURGE_GAP);
+    conf.setLong(
+        OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD_KEY,
+        SNAPSHOT_THRESHOLD);
+    // Enable filesystem snapshot feature for the test regardless of the 
default
+    conf.setBoolean(OMConfigKeys.OZONE_FILESYSTEM_SNAPSHOT_ENABLED_KEY, true);
+
+    // Some subclasses check RocksDB directly as part of their tests. These
+    // depend on OBS layout.
+    conf.set(OZONE_DEFAULT_BUCKET_LAYOUT,
+        OMConfigKeys.OZONE_BUCKET_LAYOUT_OBJECT_STORE);
+
+    OzoneManagerRatisServerConfig omHAConfig =
+        conf.getObject(OzoneManagerRatisServerConfig.class);
+
+    omHAConfig.setRetryCacheTimeout(RETRY_CACHE_DURATION);
+
+    // Enable the OM follower read
+    omHAConfig.setReadOption("LINEARIZABLE");
+    omHAConfig.setReadLeaderLeaseEnabled(true);
+
+    conf.setFromObject(omHAConfig);
+
+    // config for key deleting service.
+    conf.set(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, "10s");
+    conf.set(OZONE_KEY_DELETING_LIMIT_PER_TASK, "2");
+
+    MiniOzoneHAClusterImpl.Builder clusterBuilder = 
MiniOzoneCluster.newHABuilder(conf)
+        .setOMServiceId(omServiceId)
+        .setNumOfOzoneManagers(numOfOMs);
+
+    cluster = clusterBuilder.build();
+    cluster.waitForClusterToBeReady();
+
+    OzoneConfiguration clientConf = OzoneConfiguration.of(conf);
+    clientConf.setBoolean(OZONE_CLIENT_FOLLOWER_READ_ENABLED_KEY, true);
+    client = OzoneClientFactory.getRpcClient(omServiceId, clientConf);
+    objectStore = client.getObjectStore();
+  }
+
+  @AfterAll
+  public static void shutdown() {
+    IOUtils.closeQuietly(client);
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * Create a key in the bucket.
+   *
+   * @return the key name.
+   */
+  public static String createKey(OzoneBucket ozoneBucket) throws IOException {
+    String keyName = "key" + RandomStringUtils.secure().nextNumeric(5);
+    createKey(ozoneBucket, keyName);
+    return keyName;
+  }
+
+  public static void createKey(OzoneBucket ozoneBucket, String keyName) throws 
IOException {
+    String data = "data" + RandomStringUtils.secure().nextNumeric(5);
+    OzoneOutputStream ozoneOutputStream = ozoneBucket.createKey(keyName, 
data.length(), ReplicationType.RATIS,
+        ReplicationFactor.ONE, new HashMap<>());
+    ozoneOutputStream.write(data.getBytes(UTF_8), 0, data.length());
+    ozoneOutputStream.close();
+  }
+
+  public static String createPrefixName() {
+    return "prefix" + RandomStringUtils.secure().nextNumeric(5) + 
OZONE_URI_DELIMITER;
+  }
+
+  public static void createPrefix(OzoneObj prefixObj) throws IOException {
+    assertTrue(objectStore.setAcl(prefixObj, Collections.emptyList()));
+  }
+
+  protected OzoneBucket setupBucket() throws Exception {
+    String userName = "user" + RandomStringUtils.secure().nextNumeric(5);
+    String adminName = "admin" + RandomStringUtils.secure().nextNumeric(5);
+    String volumeName = "volume" + UUID.randomUUID();
+
+    VolumeArgs createVolumeArgs = VolumeArgs.newBuilder()
+        .setOwner(userName)
+        .setAdmin(adminName)
+        .build();
+
+    objectStore.createVolume(volumeName, createVolumeArgs);
+    OzoneVolume retVolumeinfo = objectStore.getVolume(volumeName);
+
+    assertEquals(volumeName, retVolumeinfo.getName());
+    assertEquals(userName, retVolumeinfo.getOwner());
+    assertEquals(adminName, retVolumeinfo.getAdmin());
+
+    String bucketName = UUID.randomUUID().toString();
+    retVolumeinfo.createBucket(bucketName);
+
+    OzoneBucket ozoneBucket = retVolumeinfo.getBucket(bucketName);
+
+    assertEquals(bucketName, ozoneBucket.getName());
+    assertEquals(volumeName, ozoneBucket.getVolumeName());
+
+    return ozoneBucket;
+  }
+
+  protected OzoneBucket linkBucket(OzoneBucket srcBuk) throws Exception {
+    String userName = "user" + RandomStringUtils.secure().nextNumeric(5);
+    String adminName = "admin" + RandomStringUtils.secure().nextNumeric(5);
+    String linkedVolName = "volume-link-" + 
RandomStringUtils.secure().nextNumeric(5);
+
+    VolumeArgs createVolumeArgs = VolumeArgs.newBuilder()
+        .setOwner(userName)
+        .setAdmin(adminName)
+        .build();
+
+    BucketArgs createBucketArgs = new BucketArgs.Builder()
+        .setSourceVolume(srcBuk.getVolumeName())
+        .setSourceBucket(srcBuk.getName())
+        .build();
+
+    objectStore.createVolume(linkedVolName, createVolumeArgs);
+    OzoneVolume linkedVolumeInfo = objectStore.getVolume(linkedVolName);
+
+    assertEquals(linkedVolName, linkedVolumeInfo.getName());
+    assertEquals(userName, linkedVolumeInfo.getOwner());
+    assertEquals(adminName, linkedVolumeInfo.getAdmin());
+
+    String linkedBucketName = UUID.randomUUID().toString();
+    linkedVolumeInfo.createBucket(linkedBucketName, createBucketArgs);
+
+    OzoneBucket linkedBucket = linkedVolumeInfo.getBucket(linkedBucketName);
+
+    assertEquals(linkedBucketName, linkedBucket.getName());
+    assertEquals(linkedVolName, linkedBucket.getVolumeName());
+    assertTrue(linkedBucket.isLink());
+
+    return linkedBucket;
+  }
+
+  /**
+   * Create a volume and test its attribute.
+   */
+  protected void createVolumeTest(boolean checkSuccess) throws Exception {
+    String userName = "user" + RandomStringUtils.secure().nextNumeric(5);
+    String adminName = "admin" + RandomStringUtils.secure().nextNumeric(5);
+    String volumeName = "volume" + RandomStringUtils.secure().nextNumeric(5);
+
+    VolumeArgs createVolumeArgs = VolumeArgs.newBuilder()
+        .setOwner(userName)
+        .setAdmin(adminName)
+        .build();
+
+    try {
+      objectStore.createVolume(volumeName, createVolumeArgs);
+
+      OzoneVolume retVolumeinfo = objectStore.getVolume(volumeName);
+
+      if (checkSuccess) {
+        assertEquals(volumeName, retVolumeinfo.getName());
+        assertEquals(userName, retVolumeinfo.getOwner());
+        assertEquals(adminName, retVolumeinfo.getAdmin());
+      } else {
+        // Verify that the request failed
+        fail("There is no quorum. Request should have failed");
+      }
+    } catch (IOException e) {
+      if (!checkSuccess) {
+        // If the last OM to be tried by the RetryProxy is down, we would get
+        // ConnectException. Otherwise, we would get a RemoteException from the
+        // last running OM as it would fail to get a quorum.
+        if (e instanceof RemoteException) {
+          assertThat(e).hasMessageContaining("is not the leader");
+        } else if (e instanceof ConnectException) {
+          assertThat(e).hasMessageContaining("Connection refused");
+        } else {
+          assertThat(e).hasMessageContaining("Could not determine or connect 
to OM Leader");
+        }
+      } else {
+        throw e;
+      }
+    }
+  }
+
+  /**
+   * This method createFile and verifies the file is successfully created or
+   * not.
+   *
+   * @param ozoneBucket
+   * @param keyName
+   * @param data
+   * @param recursive
+   * @param overwrite
+   * @throws Exception
+   */
+  protected void testCreateFile(OzoneBucket ozoneBucket, String keyName,
+      String data, boolean recursive,
+      boolean overwrite)
+      throws Exception {
+
+    OzoneOutputStream ozoneOutputStream = ozoneBucket.createFile(keyName,
+        data.length(), ReplicationType.RATIS, ReplicationFactor.ONE,
+        overwrite, recursive);
+
+    ozoneOutputStream.write(data.getBytes(UTF_8), 0, data.length());
+    ozoneOutputStream.close();
+
+    OzoneKeyDetails ozoneKeyDetails = ozoneBucket.getKey(keyName);
+
+    assertEquals(keyName, ozoneKeyDetails.getName());
+    assertEquals(ozoneBucket.getName(), ozoneKeyDetails.getBucketName());
+    assertEquals(ozoneBucket.getVolumeName(),
+        ozoneKeyDetails.getVolumeName());
+    assertEquals(data.length(), ozoneKeyDetails.getDataSize());
+    assertTrue(ozoneKeyDetails.isFile());
+
+    try (OzoneInputStream ozoneInputStream = ozoneBucket.readKey(keyName)) {
+      byte[] fileContent = new byte[data.getBytes(UTF_8).length];
+      IOUtils.readFully(ozoneInputStream, fileContent);
+      assertEquals(data, new String(fileContent, UTF_8));
+    }
+
+    Iterator<? extends OzoneKey> iterator = ozoneBucket.listKeys("/");
+    while (iterator.hasNext()) {
+      OzoneKey ozoneKey = iterator.next();
+      if (!ozoneKey.getName().endsWith(OM_KEY_PREFIX)) {
+        assertTrue(ozoneKey.isFile());
+      } else {
+        assertFalse(ozoneKey.isFile());
+      }
+    }
+  }
+
+  protected void createKeyTest(boolean checkSuccess) throws Exception {
+    String userName = "user" + RandomStringUtils.secure().nextNumeric(5);
+    String adminName = "admin" + RandomStringUtils.secure().nextNumeric(5);
+    String volumeName = "volume" + RandomStringUtils.secure().nextNumeric(5);
+
+    VolumeArgs createVolumeArgs = VolumeArgs.newBuilder()
+        .setOwner(userName)
+        .setAdmin(adminName)
+        .build();
+
+    try {
+      getObjectStore().createVolume(volumeName, createVolumeArgs);
+
+      OzoneVolume retVolumeinfo = getObjectStore().getVolume(volumeName);
+
+      assertEquals(volumeName, retVolumeinfo.getName());
+      assertEquals(userName, retVolumeinfo.getOwner());
+      assertEquals(adminName, retVolumeinfo.getAdmin());
+
+      String bucketName = UUID.randomUUID().toString();
+      String keyName = UUID.randomUUID().toString();
+      retVolumeinfo.createBucket(bucketName);
+
+      OzoneBucket ozoneBucket = retVolumeinfo.getBucket(bucketName);
+
+      assertEquals(bucketName, ozoneBucket.getName());
+      assertEquals(volumeName, ozoneBucket.getVolumeName());
+
+      String value = "random data";
+      OzoneOutputStream ozoneOutputStream = ozoneBucket.createKey(keyName,
+          value.length(), ReplicationType.RATIS,
+          ReplicationFactor.ONE, new HashMap<>());
+      ozoneOutputStream.write(value.getBytes(UTF_8), 0, value.length());
+      ozoneOutputStream.close();
+
+      try (OzoneInputStream ozoneInputStream = ozoneBucket.readKey(keyName)) {
+        byte[] fileContent = new byte[value.getBytes(UTF_8).length];
+        IOUtils.readFully(ozoneInputStream, fileContent);
+        assertEquals(value, new String(fileContent, UTF_8));
+      }
+
+    } catch (IOException e) {
+      if (!checkSuccess) {
+        // If the last OM to be tried by the RetryProxy is down, we would get
+        // ConnectException. Otherwise, we would get a RemoteException from the
+        // last running OM as it would fail to get a quorum.
+        if (e instanceof RemoteException) {
+          assertThat(e).hasMessageContaining("is not the leader");
+        } else if (e instanceof ConnectException) {
+          assertThat(e).hasMessageContaining("Connection refused");
+        } else {
+          assertThat(e).hasMessageContaining("Could not determine or connect 
to OM Leader");
+        }
+      } else {
+        throw e;
+      }
+    }
+  }
+
+  protected void waitForLeaderToBeReady()
+      throws InterruptedException, TimeoutException {
+    // Wait for Leader Election timeout
+    cluster.waitForLeaderOM();
+  }
+}
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAFollowerReadWithAllRunning.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAFollowerReadWithAllRunning.java
new file mode 100644
index 00000000000..6bafe2a6815
--- /dev/null
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAFollowerReadWithAllRunning.java
@@ -0,0 +1,464 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om;
+
+import static java.util.UUID.randomUUID;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CLIENT_FOLLOWER_READ_ENABLED_KEY;
+import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.DIRECTORY_NOT_FOUND;
+import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.FILE_ALREADY_EXISTS;
+import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.NOT_A_FILE;
+import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.PARTIAL_DELETE;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import com.google.protobuf.ServiceException;
+import java.net.InetSocketAddress;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.StorageType;
+import org.apache.hadoop.hdds.utils.IOUtils;
+import org.apache.hadoop.ozone.ClientVersion;
+import org.apache.hadoop.ozone.OzoneTestUtils;
+import org.apache.hadoop.ozone.client.BucketArgs;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.OzoneKey;
+import org.apache.hadoop.ozone.client.OzoneVolume;
+import org.apache.hadoop.ozone.client.VolumeArgs;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.exceptions.OMNotLeaderException;
+import org.apache.hadoop.ozone.om.ha.HadoopRpcOMFailoverProxyProvider;
+import 
org.apache.hadoop.ozone.om.ha.HadoopRpcOMFollowerReadFailoverProxyProvider;
+import org.apache.hadoop.ozone.om.ha.OMProxyInfo;
+import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
+import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateVolumeRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.VolumeInfo;
+import 
org.apache.hadoop.ozone.protocolPB.OzoneManagerProtocolServerSideTranslatorPB;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Ozone Manager HA follower read tests where all OMs are running throughout 
all tests.
+ * @see TestOzoneManagerHAFollowerReadWithAllRunning
+ */
+public class TestOzoneManagerHAFollowerReadWithAllRunning extends 
TestOzoneManagerHAFollowerRead {
+
+  @Test
+  void testOMFollowerReadProxyProviderInitialization() {
+    HadoopRpcOMFollowerReadFailoverProxyProvider<OzoneManagerProtocolPB> 
followerReadFailoverProxyProvider =
+        OmTestUtil.getFollowerReadFailoverProxyProvider(getObjectStore());
+
+    List<OMProxyInfo<OzoneManagerProtocolPB>> omProxies =
+        followerReadFailoverProxyProvider.getOMProxies();
+
+    assertEquals(getNumOfOMs(), omProxies.size());
+
+    for (int i = 0; i < getNumOfOMs(); i++) {
+      OzoneManager om = getCluster().getOzoneManager(i);
+      InetSocketAddress omRpcServerAddr = om.getOmRpcServerAddr();
+      boolean omClientProxyExists = false;
+      for (OMProxyInfo<OzoneManagerProtocolPB> omProxyInfo : omProxies) {
+        if (omProxyInfo.getAddress().equals(omRpcServerAddr)) {
+          omClientProxyExists = true;
+          break;
+        }
+      }
+      assertTrue(omClientProxyExists,
+          () -> "No Client Proxy for node " + om.getOMNodeId());
+    }
+  }
+
+  @Test
+  void testFollowerReadTargetsFollower() throws Exception {
+    ObjectStore objectStore = getObjectStore();
+    HadoopRpcOMFollowerReadFailoverProxyProvider<OzoneManagerProtocolPB> 
followerReadFailoverProxyProvider =
+        OmTestUtil.getFollowerReadFailoverProxyProvider(objectStore);
+
+    String leaderOMNodeId = getCluster().getOMLeader().getOMNodeId();
+    String followerOMNodeId = null;
+    for (OzoneManager om : getCluster().getOzoneManagersList()) {
+      if (!om.getOMNodeId().equals(leaderOMNodeId)) {
+        followerOMNodeId = om.getOMNodeId();
+        break;
+      }
+    }
+    assertNotNull(followerOMNodeId);
+
+    
followerReadFailoverProxyProvider.changeInitialProxyForTest(followerOMNodeId);
+    objectStore.getClientProxy().listVolumes(null, null, 10);
+
+    OMProxyInfo<OzoneManagerProtocolPB> lastProxy =
+        (OMProxyInfo<OzoneManagerProtocolPB>) 
followerReadFailoverProxyProvider.getLastProxy();
+    assertNotNull(lastProxy);
+    assertEquals(followerOMNodeId, lastProxy.getNodeId());
+  }
+
+  @Test
+  public void testOMProxyProviderFailoverToCurrentLeader() throws Exception {
+    ObjectStore objectStore = getObjectStore();
+    HadoopRpcOMFailoverProxyProvider<OzoneManagerProtocolPB> 
omFailoverProxyProvider =
+        OmTestUtil.getFailoverProxyProvider(objectStore);
+    HadoopRpcOMFollowerReadFailoverProxyProvider<OzoneManagerProtocolPB> 
followerReadFailoverProxyProvider =
+        OmTestUtil.getFollowerReadFailoverProxyProvider(objectStore);
+    String initialFollowerReadNodeId = 
followerReadFailoverProxyProvider.getCurrentProxy().getNodeId();
+
+    // Run couple of createVolume tests to discover the current Leader OM
+    createVolumeTest(true);
+    createVolumeTest(true);
+
+    // The oMFailoverProxyProvider will point to the current leader OM node.
+    String leaderOMNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId();
+
+    // Perform a manual failover of the proxy provider to move the
+    // currentProxyIndex to a node other than the leader OM.
+    omFailoverProxyProvider.selectNextOmProxy();
+    omFailoverProxyProvider.performFailover(null);
+
+    String newProxyNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId();
+    assertNotEquals(leaderOMNodeId, newProxyNodeId);
+
+    // Once another request is sent to this new proxy node, the leader
+    // information must be returned via the response and a failover must
+    // happen to the leader proxy node.
+    // This will also do some read operations where this might read from the 
follower.
+    createVolumeTest(true);
+    Thread.sleep(2000);
+
+    String newLeaderOMNodeId =
+        omFailoverProxyProvider.getCurrentProxyOMNodeId();
+
+    // The old and new Leader OM NodeId must match since there was no new
+    // election in the Ratis ring.
+    assertEquals(leaderOMNodeId, newLeaderOMNodeId);
+
+    // The follower read proxy should remain unchanged since the follower is 
not throwing exceptions
+    // The performFailover on the leader proxy should not affect the follower 
read proxy provider
+    String currentFollowerReadNodeId = 
followerReadFailoverProxyProvider.getCurrentProxy().getNodeId();
+    assertEquals(initialFollowerReadNodeId, currentFollowerReadNodeId);
+  }
+
+  /**
+   * Choose a follower to send the request, the returned exception should
+   * include the suggested leader node.
+   */
+  @Test
+  public void testFailoverWithSuggestedLeader() throws Exception {
+    HadoopRpcOMFailoverProxyProvider<OzoneManagerProtocolPB> 
omFailoverProxyProvider =
+        OmTestUtil.getFailoverProxyProvider(getObjectStore());
+
+    // Make sure All OMs are ready.
+    createVolumeTest(true);
+
+    String leaderOMNodeId = null;
+    OzoneManager followerOM = null;
+    for (OzoneManager om: getCluster().getOzoneManagersList()) {
+      if (om.isLeaderReady()) {
+        leaderOMNodeId = om.getOMNodeId();
+      } else if (followerOM == null) {
+        followerOM = om;
+      }
+    }
+    assertNotNull(followerOM);
+    assertNotNull(leaderOMNodeId);
+    String leaderOMAddress = ((OMProxyInfo)
+        omFailoverProxyProvider.getOMProxyMap().get(leaderOMNodeId))
+        .getAddress().getAddress().toString();
+    assertSame(OzoneManagerRatisServer.RaftServerStatus.NOT_LEADER,
+        followerOM.getOmRatisServer().getLeaderStatus());
+
+    CreateVolumeRequest.Builder req =
+        CreateVolumeRequest.newBuilder();
+    VolumeInfo volumeInfo = VolumeInfo.newBuilder()
+        .setVolume("testvolume")
+        .setAdminName("admin")
+        .setOwnerName("admin")
+        .build();
+    req.setVolumeInfo(volumeInfo);
+
+    OzoneManagerProtocolProtos.OMRequest writeRequest =
+        OzoneManagerProtocolProtos.OMRequest.newBuilder()
+            .setCmdType(Type.CreateVolume)
+            .setCreateVolumeRequest(req)
+            .setVersion(ClientVersion.CURRENT_VERSION)
+            .setClientId(randomUUID().toString())
+            .build();
+
+    OzoneManagerProtocolServerSideTranslatorPB omServerProtocol =
+        followerOM.getOmServerProtocol();
+    ServiceException ex = assertThrows(ServiceException.class,
+        () -> omServerProtocol.submitRequest(null, writeRequest));
+    assertThat(ex).hasCauseInstanceOf(OMNotLeaderException.class)
+        .hasMessageEndingWith("Suggested leader is OM:" + leaderOMNodeId + "[" 
+ leaderOMAddress + "].");
+  }
+
+  /**
+   * Test strong read-after-write consistency across clients. This means that
+   * after a single client has finished writing, another client should be able
+   * to immediately see the changes.
+   */
+  @Test
+  void testLinearizableReadConsistency() throws Exception {
+    // Setup another client
+    OzoneConfiguration clientConf = OzoneConfiguration.of(getConf());
+    clientConf.setBoolean(OZONE_CLIENT_FOLLOWER_READ_ENABLED_KEY, true);
+    OzoneClient anotherClient = null;
+    try {
+      anotherClient = OzoneClientFactory.getRpcClient(getOmServiceId(), 
clientConf);
+      ObjectStore anotherObjectStore = anotherClient.getObjectStore();
+
+      // Ensure that the proxy provider of the two clients are not shared
+      assertNotSame(
+          OmTestUtil.getFailoverProxyProvider(getObjectStore()),
+          OmTestUtil.getFailoverProxyProvider(anotherObjectStore));
+      HadoopRpcOMFollowerReadFailoverProxyProvider<OzoneManagerProtocolPB> 
otherClientFollowerReadProxyProvider =
+          OmTestUtil.getFollowerReadFailoverProxyProvider(anotherObjectStore);
+      assertNotSame(
+          OmTestUtil.getFollowerReadFailoverProxyProvider(getObjectStore()),
+          otherClientFollowerReadProxyProvider);
+      String initialProxyOmNodeId = 
otherClientFollowerReadProxyProvider.getCurrentProxy().getNodeId();
+
+      // Setup the bucket and create a key with the default client
+      OzoneBucket ozoneBucket = setupBucket();
+      String key = createKey(ozoneBucket);
+
+      // Immediately read using another client, this might or might
+      // not be sent to the leader. Regardless, the other client should be
+      // able to see the read immediately.
+      OzoneKey keyReadFromAnotherClient = 
anotherObjectStore.getClientProxy().headObject(
+          ozoneBucket.getVolumeName(), ozoneBucket.getName(), key);
+      assertEquals(key, keyReadFromAnotherClient.getName());
+
+      // Create a more keys
+      for (int i = 0; i < 100; i++) {
+        createKey(ozoneBucket);
+      }
+
+      List<OzoneKey> ozoneKeys = anotherObjectStore.getClientProxy().listKeys(
+          ozoneBucket.getVolumeName(), ozoneBucket.getName(),
+          null, null, 1000);
+      assertEquals(101, ozoneKeys.size());
+      // Since the OM node is normal, it should not failover
+      assertEquals(initialProxyOmNodeId, 
otherClientFollowerReadProxyProvider.getCurrentProxy().getNodeId());
+    } finally {
+      IOUtils.closeQuietly(anotherClient);
+    }
+  }
+
+  @Test
+  void testFileOperationsWithRecursive() throws Exception {
+    OzoneBucket ozoneBucket = setupBucket();
+
+    String data = "random data";
+
+    // one level key name
+    testCreateFile(ozoneBucket, randomUUID().toString(), data, true, false);
+
+    // multi level key name
+    String keyName = "dir1/dir2/dir3/file1";
+    testCreateFile(ozoneBucket, keyName, data, true, false);
+
+    String newData = "random data random data";
+
+    // multi level key name with overwrite set.
+    testCreateFile(ozoneBucket, keyName, newData, true, true);
+
+    OMException ex = assertThrows(OMException.class,
+        () -> testCreateFile(ozoneBucket, keyName, "any", true, false));
+    assertEquals(FILE_ALREADY_EXISTS, ex.getResult());
+
+    // Try now with a file name which is same as a directory.
+    String dir = "folder/folder2";
+    ozoneBucket.createDirectory(dir);
+    ex = assertThrows(OMException.class,
+        () -> testCreateFile(ozoneBucket, dir, "any", true, false));
+    assertEquals(NOT_A_FILE, ex.getResult());
+  }
+
+  @Test
+  void testKeysDelete() throws Exception {
+    OzoneBucket ozoneBucket = setupBucket();
+    String data = "random data";
+    String keyName1 = "dir/file1";
+    String keyName2 = "dir/file2";
+    String keyName3 = "dir/file3";
+    String keyName4 = "dir/file4";
+    List<String> keyList1 = new ArrayList<>();
+    keyList1.add(keyName2);
+    keyList1.add(keyName3);
+
+    testCreateFile(ozoneBucket, keyName1, data, true, false);
+    testCreateFile(ozoneBucket, keyName2, data, true, false);
+    testCreateFile(ozoneBucket, keyName3, data, true, false);
+    testCreateFile(ozoneBucket, keyName4, data, true, false);
+
+    // Delete keyName1 use deleteKey api.
+    ozoneBucket.deleteKey(keyName1);
+
+    // Delete keyName2 and keyName3 in keyList1 using the deleteKeys api.
+    ozoneBucket.deleteKeys(keyList1);
+
+    // In keyList2 keyName3 was previously deleted and KeyName4 exists .
+    List<String> keyList2 = new ArrayList<>();
+    keyList2.add(keyName3);
+    keyList2.add(keyName4);
+
+    // Because keyName3 has been deleted, there should be a KEY_NOT_FOUND
+    // exception. In this case, we test for deletion failure.
+    OMException ex = assertThrows(OMException.class,
+        () -> ozoneBucket.deleteKeys(keyList2));
+    // The expected exception PARTIAL_DELETE, as if not able to delete, we
+    // return error codee PARTIAL_DElETE.
+    assertEquals(PARTIAL_DELETE, ex.getResult());
+  }
+
+  @Test
+  void testFileOperationsWithNonRecursive() throws Exception {
+    OzoneBucket ozoneBucket = setupBucket();
+
+    String data = "random data";
+
+    // one level key name
+    testCreateFile(ozoneBucket, randomUUID().toString(), data, false, false);
+
+    // multi level key name
+    String keyName = "dir1/dir2/dir3/file1";
+
+    // Should fail, as this is non-recursive and no parent directories exist
+    try {
+      testCreateFile(ozoneBucket, keyName, data, false, false);
+    } catch (OMException ex) {
+      assertEquals(DIRECTORY_NOT_FOUND, ex.getResult());
+    }
+
+    // create directory, now this should pass.
+    ozoneBucket.createDirectory("dir1/dir2/dir3");
+    testCreateFile(ozoneBucket, keyName, data, false, false);
+    data = "random data random data";
+
+    // multi level key name with overwrite set.
+    testCreateFile(ozoneBucket, keyName, data, false, true);
+
+    OMException ex = assertThrows(OMException.class,
+        () -> testCreateFile(ozoneBucket, keyName, "any", false, false));
+    assertEquals(FILE_ALREADY_EXISTS, ex.getResult());
+
+    // Try now with a file which already exists under the path
+    ozoneBucket.createDirectory("folder1/folder2/folder3/folder4");
+
+    testCreateFile(ozoneBucket, "folder1/folder2/folder3/folder4/file1", data,
+        false, false);
+
+    testCreateFile(ozoneBucket, "folder1/folder2/folder3/file1", data, false,
+        false);
+
+    // Try now with a file under path already. This should fail.
+    String dir = "folder/folder2";
+    ozoneBucket.createDirectory(dir);
+    ex = assertThrows(OMException.class,
+        () -> testCreateFile(ozoneBucket, dir, "any", false, false)
+    );
+    assertEquals(NOT_A_FILE, ex.getResult());
+  }
+
+  private OzoneVolume createAndCheckVolume(String volumeName)
+      throws Exception {
+    String userName = "user" + RandomStringUtils.secure().nextNumeric(5);
+    String adminName = "admin" + RandomStringUtils.secure().nextNumeric(5);
+    VolumeArgs createVolumeArgs = VolumeArgs.newBuilder()
+        .setOwner(userName)
+        .setAdmin(adminName)
+        .build();
+
+    ObjectStore objectStore = getObjectStore();
+    objectStore.createVolume(volumeName, createVolumeArgs);
+
+    OzoneVolume retVolume = objectStore.getVolume(volumeName);
+
+    assertEquals(volumeName, retVolume.getName());
+    assertEquals(userName, retVolume.getOwner());
+    assertEquals(adminName, retVolume.getAdmin());
+
+    return retVolume;
+  }
+
+  @Test
+  public void testAllVolumeOperations() throws Exception {
+    String volumeName = "volume" + RandomStringUtils.secure().nextNumeric(5);
+
+    createAndCheckVolume(volumeName);
+
+    ObjectStore objectStore = getObjectStore();
+    objectStore.deleteVolume(volumeName);
+
+    OzoneTestUtils.expectOmException(OMException.ResultCodes.VOLUME_NOT_FOUND,
+        () -> objectStore.getVolume(volumeName));
+
+    OzoneTestUtils.expectOmException(OMException.ResultCodes.VOLUME_NOT_FOUND,
+        () -> objectStore.deleteVolume(volumeName));
+  }
+
+  @Test
+  public void testAllBucketOperations() throws Exception {
+    String volumeName = "volume" + RandomStringUtils.secure().nextNumeric(5);
+    String bucketName = "volume" + RandomStringUtils.secure().nextNumeric(5);
+
+    OzoneVolume retVolume = createAndCheckVolume(volumeName);
+
+    BucketArgs bucketArgs =
+        BucketArgs.newBuilder().setStorageType(StorageType.DISK)
+            .setVersioning(true).build();
+
+
+    retVolume.createBucket(bucketName, bucketArgs);
+
+
+    OzoneBucket ozoneBucket = retVolume.getBucket(bucketName);
+
+    assertEquals(volumeName, ozoneBucket.getVolumeName());
+    assertEquals(bucketName, ozoneBucket.getName());
+    assertTrue(ozoneBucket.getVersioning());
+    assertEquals(StorageType.DISK, ozoneBucket.getStorageType());
+    assertFalse(ozoneBucket.getCreationTime().isAfter(Instant.now()));
+
+
+    // Change versioning to false
+    ozoneBucket.setVersioning(false);
+
+    ozoneBucket = retVolume.getBucket(bucketName);
+    assertFalse(ozoneBucket.getVersioning());
+
+    retVolume.deleteBucket(bucketName);
+
+    OzoneTestUtils.expectOmException(OMException.ResultCodes.BUCKET_NOT_FOUND,
+        () -> retVolume.deleteBucket(bucketName));
+  }
+}
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAFollowerReadWithStoppedNodes.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAFollowerReadWithStoppedNodes.java
new file mode 100644
index 00000000000..a2e89a9e5b5
--- /dev/null
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAFollowerReadWithStoppedNodes.java
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static 
org.apache.hadoop.ozone.MiniOzoneHAClusterImpl.NODE_FAILURE_TIMEOUT;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CLIENT_WAIT_BETWEEN_RETRIES_MILLIS_DEFAULT;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.net.ConnectException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.hdds.client.ReplicationFactor;
+import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdfs.LogVerificationAppender;
+import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.io.OzoneInputStream;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.om.ha.HadoopRpcOMFailoverProxyProvider;
+import 
org.apache.hadoop.ozone.om.ha.HadoopRpcOMFollowerReadFailoverProxyProvider;
+import org.apache.hadoop.ozone.om.ha.OMProxyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
+import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
+import org.apache.log4j.Logger;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestMethodOrder;
+
+/**
+ * Ozone Manager HA follower read tests that stop/restart one or more OM nodes.
+ * @see TestOzoneManagerHAFollowerReadWithAllRunning
+ */
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
+public class TestOzoneManagerHAFollowerReadWithStoppedNodes extends 
TestOzoneManagerHAFollowerRead {
+
+  /**
+   * After restarting OMs we need to wait
+   * for a leader to be elected and ready.
+   */
+  @BeforeEach
+  void setup() throws Exception {
+    waitForLeaderToBeReady();
+  }
+
+  /**
+   * Restart all OMs after each test.
+   */
+  @AfterEach
+  void resetCluster() throws Exception {
+    MiniOzoneHAClusterImpl cluster = getCluster();
+    if (cluster != null) {
+      cluster.restartOzoneManager();
+    }
+  }
+
+  /**
+   * Test client request succeeds when one OM node is down.
+   */
+  @Test
+  void oneOMDown() throws Exception {
+    changeFollowerReadInitialProxy(1);
+
+    getCluster().stopOzoneManager(1);
+    Thread.sleep(NODE_FAILURE_TIMEOUT * 4);
+
+    createVolumeTest(true);
+    createKeyTest(true);
+  }
+
+  /**
+   * Test client request fails when 2 OMs are down.
+   */
+  @Test
+  void twoOMDown() throws Exception {
+    changeFollowerReadInitialProxy(1);
+
+    getCluster().stopOzoneManager(1);
+    getCluster().stopOzoneManager(2);
+    Thread.sleep(NODE_FAILURE_TIMEOUT * 4);
+
+    createVolumeTest(false);
+    createKeyTest(false);
+  }
+
+  @Test
+  void testMultipartUpload() throws Exception {
+
+    // Happy scenario when all OM's are up.
+    OzoneBucket ozoneBucket = setupBucket();
+
+    String keyName = UUID.randomUUID().toString();
+    String uploadID = initiateMultipartUpload(ozoneBucket, keyName);
+
+    createMultipartKeyAndReadKey(ozoneBucket, keyName, uploadID);
+
+    testMultipartUploadWithOneOmNodeDown();
+  }
+
+  private void testMultipartUploadWithOneOmNodeDown() throws Exception {
+    OzoneBucket ozoneBucket = setupBucket();
+
+    String keyName = UUID.randomUUID().toString();
+    String uploadID = initiateMultipartUpload(ozoneBucket, keyName);
+
+    // After initiate multipartupload, shutdown leader OM.
+    // Stop leader OM, to see when the OM leader changes
+    // multipart upload is happening successfully or not.
+
+    HadoopRpcOMFailoverProxyProvider<OzoneManagerProtocolPB> 
omFailoverProxyProvider =
+        OmTestUtil.getFailoverProxyProvider(getObjectStore());
+    HadoopRpcOMFollowerReadFailoverProxyProvider<OzoneManagerProtocolPB> 
followerReadFailoverProxyProvider =
+        OmTestUtil.getFollowerReadFailoverProxyProvider(getObjectStore());
+
+    // The omFailoverProxyProvider will point to the current leader OM node.
+    String leaderOMNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId();
+
+    // Also change the initial follower read proxy to the current leader OM 
node
+    changeFollowerReadInitialProxy(leaderOMNodeId);
+
+    // Stop one of the ozone manager, to see when the OM leader changes
+    // multipart upload is happening successfully or not.
+    getCluster().stopOzoneManager(leaderOMNodeId);
+    Thread.sleep(NODE_FAILURE_TIMEOUT * 4);
+
+    createMultipartKeyAndReadKey(ozoneBucket, keyName, uploadID);
+
+    String newLeaderOMNodeId =
+        omFailoverProxyProvider.getCurrentProxyOMNodeId();
+
+    assertNotEquals(leaderOMNodeId, newLeaderOMNodeId);
+    assertNotEquals(leaderOMNodeId, 
followerReadFailoverProxyProvider.getCurrentProxy().getNodeId());
+  }
+
+  private String initiateMultipartUpload(OzoneBucket ozoneBucket,
+      String keyName) throws Exception {
+
+    OmMultipartInfo omMultipartInfo =
+        ozoneBucket.initiateMultipartUpload(keyName,
+            ReplicationType.RATIS,
+            ReplicationFactor.ONE);
+
+    String uploadID = omMultipartInfo.getUploadID();
+    assertNotNull(uploadID);
+    return uploadID;
+  }
+
+  private void createMultipartKeyAndReadKey(OzoneBucket ozoneBucket,
+      String keyName, String uploadID) throws Exception {
+
+    String value = "random data";
+    OzoneOutputStream ozoneOutputStream = ozoneBucket.createMultipartKey(
+        keyName, value.length(), 1, uploadID);
+    ozoneOutputStream.write(value.getBytes(UTF_8), 0, value.length());
+    ozoneOutputStream.getMetadata().put(OzoneConsts.ETAG, 
DigestUtils.md5Hex(value));
+    ozoneOutputStream.close();
+
+
+    Map<Integer, String> partsMap = new HashMap<>();
+    partsMap.put(1, ozoneOutputStream.getCommitUploadPartInfo().getETag());
+    OmMultipartUploadCompleteInfo omMultipartUploadCompleteInfo =
+        ozoneBucket.completeMultipartUpload(keyName, uploadID, partsMap);
+
+    assertNotNull(omMultipartUploadCompleteInfo);
+    assertNotNull(omMultipartUploadCompleteInfo.getHash());
+
+
+    try (OzoneInputStream ozoneInputStream = ozoneBucket.readKey(keyName)) {
+      byte[] fileContent = new byte[value.getBytes(UTF_8).length];
+      IOUtils.readFully(ozoneInputStream, fileContent);
+      assertEquals(value, new String(fileContent, UTF_8));
+    }
+  }
+
+  @Test
+  void testLeaderOmProxyProviderFailoverOnConnectionFailure() throws Exception 
{
+    ObjectStore objectStore = getObjectStore();
+    HadoopRpcOMFailoverProxyProvider<OzoneManagerProtocolPB> 
omFailoverProxyProvider =
+        OmTestUtil.getFailoverProxyProvider(objectStore);
+    String firstProxyNodeId = 
omFailoverProxyProvider.getCurrentProxyOMNodeId();
+
+    createVolumeTest(true);
+
+    // On stopping the current OM Proxy, the next connection attempt should
+    // failover to a another OM proxy.
+    getCluster().stopOzoneManager(firstProxyNodeId);
+    Thread.sleep(OZONE_CLIENT_WAIT_BETWEEN_RETRIES_MILLIS_DEFAULT * 4);
+
+    // Next request to the proxy provider should result in a failover
+    createVolumeTest(true);
+    Thread.sleep(OZONE_CLIENT_WAIT_BETWEEN_RETRIES_MILLIS_DEFAULT);
+
+    // Get the new OM Proxy NodeId
+    String newProxyNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId();
+
+    // Verify that a failover occurred. the new proxy nodeId should be
+    // different from the old proxy nodeId.
+    assertNotEquals(firstProxyNodeId, newProxyNodeId);
+  }
+
+  @Test
+  void testFollowerReadOmProxyProviderFailoverOnConnectionFailure() throws 
Exception {
+    ObjectStore objectStore = getObjectStore();
+    HadoopRpcOMFollowerReadFailoverProxyProvider<OzoneManagerProtocolPB> 
followerReadFailoverProxyProvider =
+        OmTestUtil.getFollowerReadFailoverProxyProvider(objectStore);
+    String firstProxyNodeId = 
followerReadFailoverProxyProvider.getCurrentProxy().getNodeId();
+
+    objectStore.getClientProxy().listVolumes(null, null, 1000);
+
+    // On stopping the current OM Proxy, the next connection attempt should
+    // failover to another OM proxy.
+    getCluster().stopOzoneManager(firstProxyNodeId);
+
+    // Next request to the proxy provider should result in a failover
+    objectStore.getClientProxy().listVolumes(null, null, 1000);
+
+    // Get the new OM Proxy NodeId
+    String newProxyNodeId = 
followerReadFailoverProxyProvider.getCurrentProxy().getNodeId();
+
+    // Verify that a failover occurred. the new proxy nodeId should be
+    // different from the old proxy nodeId.
+    assertNotEquals(firstProxyNodeId, newProxyNodeId);
+    assertTrue(followerReadFailoverProxyProvider.isUseFollowerRead());
+  }
+
+  @Test
+  void testFollowerReadSkipsStoppedFollower() throws Exception {
+    ObjectStore objectStore = getObjectStore();
+    HadoopRpcOMFollowerReadFailoverProxyProvider<OzoneManagerProtocolPB> 
followerReadFailoverProxyProvider =
+        OmTestUtil.getFollowerReadFailoverProxyProvider(objectStore);
+
+    String leaderOMNodeId = getCluster().getOMLeader().getOMNodeId();
+    List<String> followerOmNodeIds = new ArrayList<>();
+    for (OzoneManager om : getCluster().getOzoneManagersList()) {
+      if (!om.getOMNodeId().equals(leaderOMNodeId)) {
+        followerOmNodeIds.add(om.getOMNodeId());
+      }
+    }
+    assertFalse(followerOmNodeIds.isEmpty());
+
+    String stoppedFollowerNodeId = followerOmNodeIds.get(0);
+    getCluster().stopOzoneManager(stoppedFollowerNodeId);
+    Thread.sleep(NODE_FAILURE_TIMEOUT * 4);
+
+    
followerReadFailoverProxyProvider.changeInitialProxyForTest(stoppedFollowerNodeId);
+    objectStore.getClientProxy().listVolumes(null, null, 10);
+
+    OMProxyInfo lastProxy =
+        (OMProxyInfo) followerReadFailoverProxyProvider.getLastProxy();
+    assertNotNull(lastProxy);
+    assertNotEquals(stoppedFollowerNodeId, lastProxy.getNodeId());
+  }
+
+  @Test
+  @Order(Integer.MAX_VALUE - 1)
+  void testIncrementalWaitTimeWithSameNodeFailover() throws Exception {
+    long waitBetweenRetries = getConf().getLong(
+        OzoneConfigKeys.OZONE_CLIENT_WAIT_BETWEEN_RETRIES_MILLIS_KEY,
+        OZONE_CLIENT_WAIT_BETWEEN_RETRIES_MILLIS_DEFAULT);
+    HadoopRpcOMFailoverProxyProvider<OzoneManagerProtocolPB> 
omFailoverProxyProvider =
+        OmTestUtil.getFailoverProxyProvider(getObjectStore());
+
+    // The omFailoverProxyProvider will point to the current leader OM node.
+    String leaderOMNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId();
+
+    getCluster().stopOzoneManager(leaderOMNodeId);
+    Thread.sleep(NODE_FAILURE_TIMEOUT * 4);
+    createKeyTest(true); // failover should happen to new node
+
+    long numTimesTriedToSameNode = omFailoverProxyProvider.getWaitTime()
+        / waitBetweenRetries;
+    omFailoverProxyProvider.setNextOmProxy(omFailoverProxyProvider.
+        getCurrentProxyOMNodeId());
+    assertEquals((numTimesTriedToSameNode + 1) * waitBetweenRetries,
+        omFailoverProxyProvider.getWaitTime());
+  }
+
+  @Test
+  void testOMRetryProxy() {
+    int maxFailoverAttempts = getOzoneClientFailoverMaxAttempts();
+    // Stop all the OMs.
+    for (int i = 0; i < getNumOfOMs(); i++) {
+      getCluster().stopOzoneManager(i);
+    }
+
+    final LogVerificationAppender appender = new LogVerificationAppender();
+    final Logger logger = Logger.getRootLogger();
+    logger.addAppender(appender);
+
+    // After making N (set maxRetries value) connection attempts to OMs,
+    // the RpcClient should give up.
+    assertThrows(ConnectException.class, () -> createVolumeTest(true));
+    assertEquals(1,
+        appender.countLinesWithMessage("Failed to connect to OMs:"));
+    assertEquals(maxFailoverAttempts,
+        appender.countLinesWithMessage("Trying to failover"));
+    assertEquals(1, appender.countLinesWithMessage("Attempted " +
+        maxFailoverAttempts + " failovers."));
+  }
+
+  private void changeFollowerReadInitialProxy(int omIndex) {
+    // Change the initial proxy to the OM to be stopped to test follower read 
failover
+    HadoopRpcOMFollowerReadFailoverProxyProvider<OzoneManagerProtocolPB> 
followerReadFailoverProxyProvider =
+        OmTestUtil.getFollowerReadFailoverProxyProvider(getObjectStore());
+    
followerReadFailoverProxyProvider.changeInitialProxyForTest(getCluster().getOzoneManager(omIndex).getOMNodeId());
+  }
+
+  private void changeFollowerReadInitialProxy(String omNodeId) {
+    HadoopRpcOMFollowerReadFailoverProxyProvider<OzoneManagerProtocolPB> 
followerReadFailoverProxyProvider =
+        OmTestUtil.getFollowerReadFailoverProxyProvider(getObjectStore());
+    followerReadFailoverProxyProvider.changeInitialProxyForTest(omNodeId);
+  }
+
+}
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithAllRunning.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithAllRunning.java
index 9e002899d55..06bec648196 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithAllRunning.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithAllRunning.java
@@ -20,6 +20,7 @@
 import static java.util.UUID.randomUUID;
 import static org.apache.hadoop.ozone.OzoneAcl.AclScope.ACCESS;
 import static org.apache.hadoop.ozone.OzoneAcl.AclScope.DEFAULT;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CLIENT_FOLLOWER_READ_ENABLED_KEY;
 import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.DIRECTORY_NOT_FOUND;
 import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.FILE_ALREADY_EXISTS;
 import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.NOT_A_FILE;
@@ -51,18 +52,23 @@
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.StorageType;
+import org.apache.hadoop.hdds.utils.IOUtils;
 import org.apache.hadoop.ozone.ClientVersion;
 import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.OzoneTestUtils;
 import org.apache.hadoop.ozone.client.BucketArgs;
 import org.apache.hadoop.ozone.client.ObjectStore;
 import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
 import org.apache.hadoop.ozone.client.OzoneVolume;
 import org.apache.hadoop.ozone.client.VolumeArgs;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.exceptions.OMNotLeaderException;
 import org.apache.hadoop.ozone.om.ha.HadoopRpcOMFailoverProxyProvider;
+import 
org.apache.hadoop.ozone.om.ha.HadoopRpcOMFollowerReadFailoverProxyProvider;
 import org.apache.hadoop.ozone.om.ha.OMProxyInfo;
 import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
 import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
@@ -371,7 +377,7 @@ public void testFailoverWithSuggestedLeader() throws 
Exception {
     assertSame(followerOM.getOmRatisServer().getLeaderStatus(),
         OzoneManagerRatisServer.RaftServerStatus.NOT_LEADER);
 
-    OzoneManagerProtocolProtos.OMRequest writeRequest =
+    OzoneManagerProtocolProtos.OMRequest readRequest =
         OzoneManagerProtocolProtos.OMRequest.newBuilder()
             .setCmdType(OzoneManagerProtocolProtos.Type.ListVolume)
             .setVersion(ClientVersion.CURRENT_VERSION)
@@ -381,7 +387,7 @@ public void testFailoverWithSuggestedLeader() throws 
Exception {
     OzoneManagerProtocolServerSideTranslatorPB omServerProtocol =
         followerOM.getOmServerProtocol();
     ServiceException ex = assertThrows(ServiceException.class,
-        () -> omServerProtocol.submitRequest(null, writeRequest));
+        () -> omServerProtocol.submitRequest(null, readRequest));
     assertThat(ex).hasCauseInstanceOf(OMNotLeaderException.class)
         .hasMessageEndingWith("Suggested leader is OM:" + leaderOMNodeId + "[" 
+ leaderOMAddress + "].");
   }
@@ -1144,4 +1150,61 @@ void testOMRatisSnapshot() throws Exception {
             "snapshot indices");
 
   }
+
+  @Test
+  void testOMFollowerReadWithClusterDisabled() throws Exception {
+    OzoneConfiguration clientConf = OzoneConfiguration.of(getConf());
+    clientConf.setBoolean(OZONE_CLIENT_FOLLOWER_READ_ENABLED_KEY, true);
+
+    String userName = "user" + RandomStringUtils.randomNumeric(5);
+    String adminName = "admin" + RandomStringUtils.randomNumeric(5);
+    String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
+
+    VolumeArgs createVolumeArgs = VolumeArgs.newBuilder()
+        .setOwner(userName)
+        .setAdmin(adminName)
+        .build();
+    OzoneClient ozoneClient = null;
+    try {
+      ozoneClient = OzoneClientFactory.getRpcClient(clientConf);
+      ObjectStore objectStore = ozoneClient.getObjectStore();
+      HadoopRpcOMFailoverProxyProvider<OzoneManagerProtocolPB> 
leaderFailoverProxyProvider =
+          OmTestUtil.getFailoverProxyProvider(objectStore);
+      HadoopRpcOMFollowerReadFailoverProxyProvider<OzoneManagerProtocolPB> 
followerReadFailoverProxyProvider =
+          OmTestUtil.getFollowerReadFailoverProxyProvider(objectStore);
+      assertNotNull(followerReadFailoverProxyProvider);
+      assertTrue(followerReadFailoverProxyProvider.isUseFollowerRead());
+
+
+      // Trigger write so that the leader failover proxy provider points to 
the leader
+      objectStore.createVolume(volumeName, createVolumeArgs);
+
+      // Get the leader OM node ID
+      String leaderOMNodeId = 
leaderFailoverProxyProvider.getCurrentProxyOMNodeId();
+
+      // Pick a follower and tigger read so that read failover proxy provider 
fall back to the leader-only read
+      // on encountering OMNotLeaderException
+      String followerOMNodeId = null;
+      for (OzoneManager om : getCluster().getOzoneManagersList()) {
+        if (!om.getOMNodeId().equals(leaderOMNodeId)) {
+          followerOMNodeId = om.getOMNodeId();
+          break;
+        }
+      }
+      assertNotNull(followerOMNodeId);
+      
followerReadFailoverProxyProvider.changeInitialProxyForTest(followerOMNodeId);
+      objectStore.getVolume(volumeName);
+
+      // Client follower read is disabled since it detected that the cluster 
does not
+      // support follower read
+      assertFalse(followerReadFailoverProxyProvider.isUseFollowerRead());
+      OMProxyInfo<OzoneManagerProtocolPB> lastProxy =
+          (OMProxyInfo<OzoneManagerProtocolPB>) 
followerReadFailoverProxyProvider.getLastProxy();
+      // The last read will be done on the leader
+      assertEquals(leaderFailoverProxyProvider.getCurrentProxyOMNodeId(), 
lastProxy.getNodeId());
+    } finally {
+      IOUtils.closeQuietly(ozoneClient);
+    }
+
+  }
 }
diff --git 
a/hadoop-ozone/ozonefs-hadoop2/src/main/java/org/apache/hadoop/fs/ozone/Hadoop27RpcTransport.java
 
b/hadoop-ozone/ozonefs-hadoop2/src/main/java/org/apache/hadoop/fs/ozone/Hadoop27RpcTransport.java
index 0d38d357b8a..f66dc091de6 100644
--- 
a/hadoop-ozone/ozonefs-hadoop2/src/main/java/org/apache/hadoop/fs/ozone/Hadoop27RpcTransport.java
+++ 
b/hadoop-ozone/ozonefs-hadoop2/src/main/java/org/apache/hadoop/fs/ozone/Hadoop27RpcTransport.java
@@ -29,6 +29,7 @@
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.om.exceptions.OMNotLeaderException;
 import org.apache.hadoop.ozone.om.ha.HadoopRpcOMFailoverProxyProvider;
+import 
org.apache.hadoop.ozone.om.ha.HadoopRpcOMFollowerReadFailoverProxyProvider;
 import org.apache.hadoop.ozone.om.protocolPB.OmTransport;
 import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
@@ -45,6 +46,7 @@ public class Hadoop27RpcTransport implements OmTransport {
   private final OzoneManagerProtocolPB rpcProxy;
 
   private final HadoopRpcOMFailoverProxyProvider<OzoneManagerProtocolPB> 
omFailoverProxyProvider;
+  private HadoopRpcOMFollowerReadFailoverProxyProvider<OzoneManagerProtocolPB> 
followerReadFailoverProxyProvider;
 
   public Hadoop27RpcTransport(ConfigurationSource conf,
       UserGroupInformation ugi, String omServiceId) throws IOException {
@@ -56,10 +58,28 @@ public Hadoop27RpcTransport(ConfigurationSource conf,
     this.omFailoverProxyProvider = new HadoopRpcOMFailoverProxyProvider<>(
             conf, ugi, omServiceId, OzoneManagerProtocolPB.class);
 
+    boolean followerReadEnabled = conf.getBoolean(
+        OzoneConfigKeys.OZONE_CLIENT_FOLLOWER_READ_ENABLED_KEY,
+        OzoneConfigKeys.OZONE_CLIENT_FOLLOWER_READ_ENABLED_DEFAULT
+    );
+
     int maxFailovers = conf.getInt(
         OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY,
         OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT);
-    this.rpcProxy = OzoneManagerProtocolPB.newProxy(omFailoverProxyProvider, 
maxFailovers);
+
+    if (followerReadEnabled) {
+      this.followerReadFailoverProxyProvider = new 
HadoopRpcOMFollowerReadFailoverProxyProvider<>(
+          omServiceId, OzoneManagerProtocolPB.class, omFailoverProxyProvider
+      );
+      this.rpcProxy = 
OzoneManagerProtocolPB.newProxy(followerReadFailoverProxyProvider, 
maxFailovers);
+    } else {
+      // TODO: It should be possible to simply instantiate 
HadoopRpcOMFollowerReadFailoverProxyProvider
+      //  even if the follower read is not enabled. We can try this to ensure 
that the tests still pass which
+      //  suggests that the HadoopRpcOMFollowerReadFailoverProxyProvider is a 
indeed a superset of
+      //  HadoopRpcOMFollowerReadFailoverProxyProvider
+      this.rpcProxy = OzoneManagerProtocolPB.newProxy(omFailoverProxyProvider, 
maxFailovers);
+    }
+
   }
 
   @Override
@@ -94,6 +114,10 @@ public Text getDelegationTokenService() {
 
   @Override
   public void close() throws IOException {
-    omFailoverProxyProvider.close();
+    if (followerReadFailoverProxyProvider != null) {
+      followerReadFailoverProxyProvider.close();
+    } else {
+      omFailoverProxyProvider.close();
+    }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to