This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 99f4132744c HDDS-14682. Unify OzoneManagerProtocolPB failover proxy 
provider (#9929)
99f4132744c is described below

commit 99f4132744cb373bfb1d3ff3e02e508c9d05c84b
Author: Ivan Andika <[email protected]>
AuthorDate: Fri Mar 27 21:22:19 2026 +0800

    HDDS-14682. Unify OzoneManagerProtocolPB failover proxy provider (#9929)
---
 ...doopRpcOMFollowerReadFailoverProxyProvider.java | 55 +++++++++----------
 .../ozone/om/protocolPB/Hadoop3OmTransport.java    | 61 ++++++----------------
 ...stOzoneManagerHAFollowerReadWithAllRunning.java |  5 +-
 .../hadoop/fs/ozone/Hadoop27RpcTransport.java      | 61 ++++++----------------
 4 files changed, 63 insertions(+), 119 deletions(-)

diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/HadoopRpcOMFollowerReadFailoverProxyProvider.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/HadoopRpcOMFollowerReadFailoverProxyProvider.java
index 305f5c51769..eec55683f32 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/HadoopRpcOMFollowerReadFailoverProxyProvider.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/HadoopRpcOMFollowerReadFailoverProxyProvider.java
@@ -71,7 +71,7 @@ public class HadoopRpcOMFollowerReadFailoverProxyProvider 
implements FailoverPro
   private static final Logger LOG = 
LoggerFactory.getLogger(HadoopRpcOMFollowerReadFailoverProxyProvider.class);
 
   /** The inner proxy provider used for leader-based failover. */
-  private final HadoopRpcOMFailoverProxyProvider<OzoneManagerProtocolPB> 
failoverProxy;
+  private final HadoopRpcOMFailoverProxyProvider<OzoneManagerProtocolPB> 
leaderProxy;
 
   /** The combined proxy which redirects to other proxies as necessary. */
   private final ProxyInfo<OzoneManagerProtocolPB> combinedProxy;
@@ -97,30 +97,31 @@ public class HadoopRpcOMFollowerReadFailoverProxyProvider 
implements FailoverPro
   private final ReadConsistencyHint leaderReadConsistency;
 
   public HadoopRpcOMFollowerReadFailoverProxyProvider(
-      HadoopRpcOMFailoverProxyProvider<OzoneManagerProtocolPB> failoverProxy
+      HadoopRpcOMFailoverProxyProvider<OzoneManagerProtocolPB> leaderProxy
   ) {
-    this(failoverProxy, ReadConsistency.LINEARIZABLE_ALLOW_FOLLOWER, 
ReadConsistency.DEFAULT);
+    this(leaderProxy, ReadConsistency.LINEARIZABLE_ALLOW_FOLLOWER, 
ReadConsistency.DEFAULT, true);
   }
 
   public HadoopRpcOMFollowerReadFailoverProxyProvider(
-      HadoopRpcOMFailoverProxyProvider<OzoneManagerProtocolPB> failoverProxy,
+      HadoopRpcOMFailoverProxyProvider<OzoneManagerProtocolPB> leaderProxy,
       ReadConsistency followerReadConsistencyType,
-      ReadConsistency leaderReadConsistencyType) {
+      ReadConsistency leaderReadConsistencyType,
+      boolean useFollowerRead) {
     Preconditions.assertTrue(followerReadConsistencyType.allowFollowerRead(),
         "Invalid follower read consistency " + followerReadConsistencyType);
     Preconditions.assertTrue(!leaderReadConsistencyType.allowFollowerRead(),
         "Invalid leader read consistency " + leaderReadConsistencyType);
-    this.failoverProxy = failoverProxy;
+    this.leaderProxy = leaderProxy;
     // Create a wrapped proxy containing all the proxies. Since this combined
     // proxy is just redirecting to other proxies, all invocations can share 
it.
-    final String combinedInfo = "[" + failoverProxy.getOMProxies().stream()
+    final String combinedInfo = "[" + leaderProxy.getOMProxies().stream()
         .map(a -> a.proxyInfo)
         .reduce((a, b) -> a + ", " + b).orElse("") + "]";
     OzoneManagerProtocolPB wrappedProxy = (OzoneManagerProtocolPB) 
Proxy.newProxyInstance(
         FollowerReadInvocationHandler.class.getClassLoader(),
         new Class<?>[] {OzoneManagerProtocolPB.class}, new 
FollowerReadInvocationHandler());
     combinedProxy = new ProxyInfo<>(wrappedProxy, combinedInfo);
-    this.useFollowerRead = true;
+    this.useFollowerRead = useFollowerRead;
     this.followerReadConsistency = followerReadConsistencyType.getHint();
     this.leaderReadConsistency = leaderReadConsistencyType.getHint();
   }
@@ -139,7 +140,7 @@ public ProxyInfo<OzoneManagerProtocolPB> getProxy() {
   public void performFailover(OzoneManagerProtocolPB currProxy) {
     // Since FollowerReadInvocationHandler might use or fall back to 
leader-based failover logic,
     // we should delegate the failover logic to the leader's failover.
-    failoverProxy.performFailover(currProxy);
+    leaderProxy.performFailover(currProxy);
   }
 
   public RetryPolicy getRetryPolicy(int maxFailovers) {
@@ -149,7 +150,7 @@ public RetryPolicy getRetryPolicy(int maxFailovers) {
     //    (when follower read is disabled or using write request)
     // 2. The FollowerReadInvocationHandler is also written so that the thrown 
exception is handled by the
     //    OMFailoverProxyProviderBase's RetryPolicy
-    return failoverProxy.getRetryPolicy(maxFailovers);
+    return leaderProxy.getRetryPolicy(maxFailovers);
   }
 
   /**
@@ -202,15 +203,15 @@ public OMProxyInfo<OzoneManagerProtocolPB> 
getCurrentProxy() {
    * @return The new proxy that should be used.
    */
   private synchronized OMProxyInfo<OzoneManagerProtocolPB> 
changeProxy(OMProxyInfo<OzoneManagerProtocolPB> initial) {
-    OMProxyInfo<OzoneManagerProtocolPB> currentProxy = 
failoverProxy.getOMProxyMap().get(currentIndex);
+    OMProxyInfo<OzoneManagerProtocolPB> currentProxy = 
leaderProxy.getOMProxyMap().get(currentIndex);
     if (currentProxy != initial) {
       // Must have been a concurrent modification; ignore the move request
       return currentProxy;
     }
-    final OMProxyInfo.OrderedMap<OzoneManagerProtocolPB> omProxies = 
failoverProxy.getOMProxyMap();
+    final OMProxyInfo.OrderedMap<OzoneManagerProtocolPB> omProxies = 
leaderProxy.getOMProxyMap();
     currentIndex = (currentIndex + 1) % omProxies.size();
     final String currentOmNodeId = omProxies.getNodeId(currentIndex);
-    currentProxy = failoverProxy.createOMProxyIfNeeded(currentOmNodeId);
+    currentProxy = leaderProxy.createOMProxyIfNeeded(currentOmNodeId);
     LOG.debug("Changed current proxy from {} to {}",
         initial == null ? "none" : initial.proxyInfo,
         currentProxy.proxyInfo);
@@ -258,7 +259,7 @@ public Object invoke(Object proxy, final Method method, 
final Object[] args)
 
       if (isFollowerReadEligible) {
         int failedCount = 0;
-        for (int i = 0; useFollowerRead && i < 
failoverProxy.getOMProxyMap().size(); i++) {
+        for (int i = 0; useFollowerRead && i < 
leaderProxy.getOMProxyMap().size(); i++) {
           OMProxyInfo<OzoneManagerProtocolPB> current = getCurrentProxy();
           LOG.debug("Attempting to service {} with cmdType {} using proxy {}",
               method.getName(), omRequest.getCmdType(), current.proxyInfo);
@@ -320,7 +321,7 @@ public Object invoke(Object proxy, final Method method, 
final Object[] args)
               }
             }
 
-            if (!failoverProxy.shouldFailover(e)) {
+            if (!leaderProxy.shouldFailover(e)) {
               // We reuse the leader proxy provider failover since we want to 
ensure
               // if the follower read proxy decides that the exception should 
be failed,
               // the leader proxy provider failover retry policy (i.e. 
OMFailoverProxyProviderBase#getRetryPolicy)
@@ -363,17 +364,17 @@ public Object invoke(Object proxy, final Method method, 
final Object[] args)
       // or this is a write request. In any case, forward the request to
       // the leader OM.
       LOG.debug("Using leader-based failoverProxy to service {}", 
method.getName());
-      final OMProxyInfo<OzoneManagerProtocolPB> leaderProxy = 
failoverProxy.getProxy();
+      final OMProxyInfo<OzoneManagerProtocolPB> currentLeaderProxy = 
leaderProxy.getProxy();
       Object retVal = null;
       try {
-        retVal = method.invoke(leaderProxy.getProxy(), args);
+        retVal = method.invoke(currentLeaderProxy.getProxy(), args);
       } catch (InvocationTargetException e) {
         LOG.debug("Exception thrown from leader-based failoverProxy", 
e.getCause());
         // This exception will be handled by the 
OMFailoverProxyProviderBase#getRetryPolicy
         // (see getRetryPolicy). This ensures that the leader-only failover 
should still work.
         throwServiceException(e.getCause());
       }
-      lastProxy = leaderProxy;
+      lastProxy = currentLeaderProxy;
       return retVal;
     }
 
@@ -385,15 +386,15 @@ public void close() throws IOException {
     @Override
     public ConnectionId getConnectionId() {
       return RPC.getConnectionIdForProxy(useFollowerRead
-          ? getCurrentProxy().proxy : failoverProxy.getProxy().getProxy());
+          ? getCurrentProxy().proxy : leaderProxy.getProxy().getProxy());
     }
   }
 
   @Override
   public synchronized void close() throws IOException {
-    // All the proxies are stored in the underlying failoverProxy
-    // so we invoke close on the underlying failoverProxy
-    failoverProxy.close();
+    // All the proxies are stored in the underlying leaderProxy
+    // so we invoke close on the underlying leaderProxy
+    leaderProxy.close();
   }
 
   @VisibleForTesting
@@ -403,22 +404,22 @@ public boolean isUseFollowerRead() {
 
   @VisibleForTesting
   public List<OMProxyInfo<OzoneManagerProtocolPB>> getOMProxies() {
-    return failoverProxy.getOMProxies();
+    return leaderProxy.getOMProxies();
   }
 
   public synchronized void changeInitialProxyForTest(String initialOmNodeId) {
-    final OMProxyInfo<OzoneManagerProtocolPB> currentProxy = 
failoverProxy.getOMProxyMap().get(currentIndex);
+    final OMProxyInfo<OzoneManagerProtocolPB> currentProxy = 
leaderProxy.getOMProxyMap().get(currentIndex);
     if (currentProxy != null && 
currentProxy.getNodeId().equals(initialOmNodeId)) {
       return;
     }
 
-    int indexOfTargetNodeId = 
failoverProxy.getOMProxyMap().indexOf(initialOmNodeId);
-    if (indexOfTargetNodeId == -1) {
+    Integer indexOfTargetNodeId = 
leaderProxy.getOMProxyMap().indexOf(initialOmNodeId);
+    if (indexOfTargetNodeId == null) {
       return;
     }
 
     currentIndex = indexOfTargetNodeId;
-    failoverProxy.createOMProxyIfNeeded(initialOmNodeId);
+    leaderProxy.createOMProxyIfNeeded(initialOmNodeId);
   }
 
   /**
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/Hadoop3OmTransport.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/Hadoop3OmTransport.java
index c5a6988dc7f..98fcd66e762 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/Hadoop3OmTransport.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/Hadoop3OmTransport.java
@@ -37,7 +37,6 @@
 import org.apache.hadoop.ozone.om.helpers.ReadConsistency;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
-import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ReadConsistencyHint;
 import org.apache.hadoop.security.UserGroupInformation;
 
 /**
@@ -53,9 +52,7 @@ public class Hadoop3OmTransport implements OmTransport {
   private final OzoneManagerProtocolPB rpcProxy;
 
   private final HadoopRpcOMFailoverProxyProvider<OzoneManagerProtocolPB> 
omFailoverProxyProvider;
-  private final boolean followerReadEnabled;
-  private final ReadConsistencyHint defaultLeaderReadConsistencyHint;
-  private HadoopRpcOMFollowerReadFailoverProxyProvider 
followerReadFailoverProxyProvider;
+  private final HadoopRpcOMFollowerReadFailoverProxyProvider 
followerReadFailoverProxyProvider;
 
   public Hadoop3OmTransport(ConfigurationSource conf,
       UserGroupInformation ugi, String omServiceId) throws IOException {
@@ -67,10 +64,9 @@ public Hadoop3OmTransport(ConfigurationSource conf,
     this.omFailoverProxyProvider = new HadoopRpcOMFailoverProxyProvider<>(
             conf, ugi, omServiceId, OzoneManagerProtocolPB.class);
 
-    followerReadEnabled = conf.getBoolean(
+    boolean followerReadEnabled = conf.getBoolean(
         OzoneConfigKeys.OZONE_CLIENT_FOLLOWER_READ_ENABLED_KEY,
-        OzoneConfigKeys.OZONE_CLIENT_FOLLOWER_READ_ENABLED_DEFAULT
-    );
+        OzoneConfigKeys.OZONE_CLIENT_FOLLOWER_READ_ENABLED_DEFAULT);
 
     int maxFailovers = conf.getInt(
         OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY,
@@ -79,37 +75,29 @@ public Hadoop3OmTransport(ConfigurationSource conf,
     String defaultLeaderReadConsistencyStr = 
conf.get(OZONE_CLIENT_LEADER_READ_DEFAULT_CONSISTENCY_KEY,
         OZONE_CLIENT_LEADER_READ_DEFAULT_CONSISTENCY_DEFAULT);
     ReadConsistency defaultLeaderReadConsistency = 
ReadConsistency.valueOf(defaultLeaderReadConsistencyStr);
-    defaultLeaderReadConsistencyHint = defaultLeaderReadConsistency.getHint();
 
     // TODO: In the future, we might support more FollowerReadProxyProvider 
strategies depending on factors
     //  like latency, applied index, etc.
     //  So instead of enabling using follower read configuration, we can 
simply let user to configure the
     //  failover proxy provider instead (similar to 
dfs.client.failover.proxy.provider.<nameservice>)
-    if (followerReadEnabled) {
-      String defaultFollowerReadConsistencyStr = conf.get(
-          OzoneConfigKeys.OZONE_CLIENT_FOLLOWER_READ_DEFAULT_CONSISTENCY_KEY,
-          
OzoneConfigKeys.OZONE_CLIENT_FOLLOWER_READ_DEFAULT_CONSISTENCY_DEFAULT
-      );
-      ReadConsistency defaultFollowerReadConsistency =
-          ReadConsistency.valueOf(defaultFollowerReadConsistencyStr);
-      this.followerReadFailoverProxyProvider =
-          new 
HadoopRpcOMFollowerReadFailoverProxyProvider(omFailoverProxyProvider,
-              defaultFollowerReadConsistency,
-              defaultLeaderReadConsistency);
-      this.rpcProxy = 
OzoneManagerProtocolPB.newProxy(followerReadFailoverProxyProvider, 
maxFailovers);
-    } else {
-      // TODO HDDS-14682: It should be possible to simply instantiate 
HadoopRpcOMFollowerReadFailoverProxyProvider
-      //  even if the follower read is not enabled. We can try this to ensure 
that the tests still pass which
-      //  suggests that the HadoopRpcOMFollowerReadFailoverProxyProvider is a 
indeed a superset of
-      //  HadoopRpcOMFollowerReadFailoverProxyProvider
-      this.rpcProxy = OzoneManagerProtocolPB.newProxy(omFailoverProxyProvider, 
maxFailovers);
-    }
+    String defaultFollowerReadConsistencyStr = conf.get(
+        OzoneConfigKeys.OZONE_CLIENT_FOLLOWER_READ_DEFAULT_CONSISTENCY_KEY,
+        OzoneConfigKeys.OZONE_CLIENT_FOLLOWER_READ_DEFAULT_CONSISTENCY_DEFAULT
+    );
+    ReadConsistency defaultFollowerReadConsistency =
+        ReadConsistency.valueOf(defaultFollowerReadConsistencyStr);
+    this.followerReadFailoverProxyProvider =
+        new 
HadoopRpcOMFollowerReadFailoverProxyProvider(omFailoverProxyProvider,
+            defaultFollowerReadConsistency,
+            defaultLeaderReadConsistency,
+            followerReadEnabled);
+    this.rpcProxy = 
OzoneManagerProtocolPB.newProxy(followerReadFailoverProxyProvider, 
maxFailovers);
   }
 
   @Override
   public OMResponse submitRequest(OMRequest payload) throws IOException {
     try {
-      return rpcProxy.submitRequest(NULL_RPC_CONTROLLER, 
getOMRequest(payload));
+      return rpcProxy.submitRequest(NULL_RPC_CONTROLLER, payload);
     } catch (ServiceException e) {
       OMNotLeaderException notLeaderException =
           HadoopRpcOMFailoverProxyProvider.getNotLeaderException(e);
@@ -120,23 +108,6 @@ public OMResponse submitRequest(OMRequest payload) throws 
IOException {
     }
   }
 
-  private OMRequest getOMRequest(OMRequest basePayload) {
-    // TODO HDDS-14682: We can remove this logic once we always use 
FollowerReadProxyProvider
-    if (followerReadEnabled) {
-      // Follower read uses FollowerReadInvocationHandler to set the 
invocation handler
-      // Return the request payload as is
-      return basePayload;
-    }
-    if (basePayload.hasReadConsistencyHint()) {
-      // If there is already user-defined read consistency hint, we should 
respect it
-      return basePayload;
-    }
-
-    return basePayload.toBuilder()
-        .setReadConsistencyHint(defaultLeaderReadConsistencyHint)
-        .build();
-  }
-
   @Override
   public Text getDelegationTokenService() {
     return omFailoverProxyProvider.getCurrentProxyDelegationToken();
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAFollowerReadWithAllRunning.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAFollowerReadWithAllRunning.java
index d984ced1fc2..9262da093a4 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAFollowerReadWithAllRunning.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAFollowerReadWithAllRunning.java
@@ -31,7 +31,6 @@
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNotSame;
-import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -487,7 +486,9 @@ void testClientWithFollowerReadDisabled() throws Exception {
       // This will trigger getServiceId so the client should point to the 
leader
       leaderOnlyClient = OzoneClientFactory.getRpcClient(getOmServiceId(), 
clientConf);
       ObjectStore leaderOnlyObjectStore = leaderOnlyClient.getObjectStore();
-      
assertNull(OmTestUtil.getFollowerReadFailoverProxyProvider(leaderOnlyObjectStore));
+      // FollowerReadProxyProvider is the unified proxy provider regardless of 
whether follower
+      // read is enabled or not
+      
assertNotNull(OmTestUtil.getFollowerReadFailoverProxyProvider(leaderOnlyObjectStore));
       HadoopRpcOMFailoverProxyProvider<OzoneManagerProtocolPB> 
leaderProxyProvider =
           OmTestUtil.getFailoverProxyProvider(leaderOnlyObjectStore);
 
diff --git 
a/hadoop-ozone/ozonefs-hadoop2/src/main/java/org/apache/hadoop/fs/ozone/Hadoop27RpcTransport.java
 
b/hadoop-ozone/ozonefs-hadoop2/src/main/java/org/apache/hadoop/fs/ozone/Hadoop27RpcTransport.java
index b1f0e5f1bb4..c03a5173806 100644
--- 
a/hadoop-ozone/ozonefs-hadoop2/src/main/java/org/apache/hadoop/fs/ozone/Hadoop27RpcTransport.java
+++ 
b/hadoop-ozone/ozonefs-hadoop2/src/main/java/org/apache/hadoop/fs/ozone/Hadoop27RpcTransport.java
@@ -38,7 +38,6 @@
 import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
-import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ReadConsistencyHint;
 import org.apache.hadoop.security.UserGroupInformation;
 
 /**
@@ -51,9 +50,7 @@ public class Hadoop27RpcTransport implements OmTransport {
   private final OzoneManagerProtocolPB rpcProxy;
 
   private final HadoopRpcOMFailoverProxyProvider<OzoneManagerProtocolPB> 
omFailoverProxyProvider;
-  private final boolean followerReadEnabled;
-  private final ReadConsistencyHint defaultLeaderReadConsistencyHint;
-  private HadoopRpcOMFollowerReadFailoverProxyProvider 
followerReadFailoverProxyProvider;
+  private final HadoopRpcOMFollowerReadFailoverProxyProvider 
followerReadFailoverProxyProvider;
 
   public Hadoop27RpcTransport(ConfigurationSource conf,
       UserGroupInformation ugi, String omServiceId) throws IOException {
@@ -65,10 +62,9 @@ public Hadoop27RpcTransport(ConfigurationSource conf,
     this.omFailoverProxyProvider = new HadoopRpcOMFailoverProxyProvider<>(
             conf, ugi, omServiceId, OzoneManagerProtocolPB.class);
 
-    followerReadEnabled = conf.getBoolean(
+    boolean followerReadEnabled = conf.getBoolean(
         OzoneConfigKeys.OZONE_CLIENT_FOLLOWER_READ_ENABLED_KEY,
-        OzoneConfigKeys.OZONE_CLIENT_FOLLOWER_READ_ENABLED_DEFAULT
-    );
+        OzoneConfigKeys.OZONE_CLIENT_FOLLOWER_READ_ENABLED_DEFAULT);
 
     int maxFailovers = conf.getInt(
         OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY,
@@ -77,34 +73,25 @@ public Hadoop27RpcTransport(ConfigurationSource conf,
     String defaultLeaderReadConsistencyStr = 
conf.get(OZONE_CLIENT_LEADER_READ_DEFAULT_CONSISTENCY_KEY,
         OZONE_CLIENT_LEADER_READ_DEFAULT_CONSISTENCY_DEFAULT);
     ReadConsistency defaultLeaderReadConsistency = 
ReadConsistency.valueOf(defaultLeaderReadConsistencyStr);
-    defaultLeaderReadConsistencyHint = defaultLeaderReadConsistency.getHint();
-
-    if (followerReadEnabled) {
-      String defaultFollowerReadConsistencyStr = conf.get(
-          OzoneConfigKeys.OZONE_CLIENT_FOLLOWER_READ_DEFAULT_CONSISTENCY_KEY,
-          
OzoneConfigKeys.OZONE_CLIENT_FOLLOWER_READ_DEFAULT_CONSISTENCY_DEFAULT
-      );
-      ReadConsistency defaultFollowerReadConsistency =
-          ReadConsistency.valueOf(defaultFollowerReadConsistencyStr);
-      this.followerReadFailoverProxyProvider =
-          new 
HadoopRpcOMFollowerReadFailoverProxyProvider(omFailoverProxyProvider,
-              defaultFollowerReadConsistency,
-              defaultLeaderReadConsistency);
-      this.rpcProxy = 
OzoneManagerProtocolPB.newProxy(followerReadFailoverProxyProvider, 
maxFailovers);
-    } else {
-      // TODO: It should be possible to simply instantiate 
HadoopRpcOMFollowerReadFailoverProxyProvider
-      //  even if the follower read is not enabled. We can try this to ensure 
that the tests still pass which
-      //  suggests that the HadoopRpcOMFollowerReadFailoverProxyProvider is a 
indeed a superset of
-      //  HadoopRpcOMFollowerReadFailoverProxyProvider
-      this.rpcProxy = OzoneManagerProtocolPB.newProxy(omFailoverProxyProvider, 
maxFailovers);
-    }
 
+    String defaultFollowerReadConsistencyStr = conf.get(
+        OzoneConfigKeys.OZONE_CLIENT_FOLLOWER_READ_DEFAULT_CONSISTENCY_KEY,
+        OzoneConfigKeys.OZONE_CLIENT_FOLLOWER_READ_DEFAULT_CONSISTENCY_DEFAULT
+    );
+    ReadConsistency defaultFollowerReadConsistency =
+        ReadConsistency.valueOf(defaultFollowerReadConsistencyStr);
+    this.followerReadFailoverProxyProvider =
+        new 
HadoopRpcOMFollowerReadFailoverProxyProvider(omFailoverProxyProvider,
+            defaultFollowerReadConsistency,
+            defaultLeaderReadConsistency,
+            followerReadEnabled);
+    this.rpcProxy = 
OzoneManagerProtocolPB.newProxy(followerReadFailoverProxyProvider, 
maxFailovers);
   }
 
   @Override
   public OMResponse submitRequest(OMRequest payload) throws IOException {
     try {
-      return rpcProxy.submitRequest(NULL_RPC_CONTROLLER, 
getOMRequest(payload));
+      return rpcProxy.submitRequest(NULL_RPC_CONTROLLER, payload);
     } catch (ServiceException e) {
       OMNotLeaderException notLeaderException =
           HadoopRpcOMFailoverProxyProvider.getNotLeaderException(e);
@@ -115,22 +102,6 @@ public OMResponse submitRequest(OMRequest payload) throws 
IOException {
     }
   }
 
-  private OMRequest getOMRequest(OMRequest basePayload) {
-    if (followerReadEnabled) {
-      // Follower read uses FollowerReadInvocationHandler to set the 
invocation handler
-      // Return the request payload as is
-      return basePayload;
-    }
-    if (basePayload.hasReadConsistencyHint()) {
-      // If there is already user-defined read consistency hint, we should 
respect it
-      return basePayload;
-    }
-
-    return basePayload.toBuilder()
-        .setReadConsistencyHint(defaultLeaderReadConsistencyHint)
-        .build();
-  }
-
   @Override
   public Text getDelegationTokenService() {
     return null;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to