ivandika3 commented on code in PR #9796:
URL: https://github.com/apache/ozone/pull/9796#discussion_r2850703353


##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java:
##########
@@ -216,44 +221,152 @@ public OMRequest getLastRequestToSubmit() {
 
   private OMResponse submitReadRequestToOM(OMRequest request)
       throws ServiceException {
-    // Read from leader or followers using linearizable read
-    if (ozoneManager.getConfig().isFollowerReadLocalLeaseEnabled() &&
-        allowFollowerReadLocalLease(omRatisServer.getServerDivision(),
-            ozoneManager.getConfig().getFollowerReadLocalLeaseLagLimit(),
-            ozoneManager.getConfig().getFollowerReadLocalLeaseTimeMs())) {
-      ozoneManager.getMetrics().incNumFollowerReadLocalLeaseSuccess();
+    if (request.getCmdType().equals(PrepareStatus)) {
+      // PrepareStatus is an OM request that only target a single OM node.
+      // Therefore, all PrepareStatus requests should be served immediately without failover regardless
+      // of the OM node leadership or the read consistency. See PrepareSubCommand.
+      // The implementation is not ideal, but exists for compatibility reason.
       return handler.handleReadRequest(request);
-    } 
-    // Get current OM's role
-    RaftServerStatus raftServerStatus = omRatisServer.getLeaderStatus();
-    // === 1. Follower linearizable read ===
-    if (raftServerStatus == NOT_LEADER && omRatisServer.isLinearizableRead()) {
-      ozoneManager.getMetrics().incNumLinearizableRead();
-      return ozoneManager.getOmExecutionFlow().submit(request, false);
     }
-    // === 2. Leader local read (skip ReadIndex if allowed) ===
-    if (raftServerStatus == LEADER_AND_READY || request.getCmdType().equals(PrepareStatus)) {
-      if (ozoneManager.getConfig().isAllowLeaderSkipLinearizableRead()) {
-        ozoneManager.getMetrics().incNumLeaderSkipLinearizableRead();
-        // leader directly serves local committed data
+
+    if (!request.hasReadConsistencyHint() || !request.getReadConsistencyHint().hasReadConsistency() ||
+        request.getReadConsistencyHint().getReadConsistency() == UNKNOWN_READ_CONSISTENCY) {
+      // Read from leader or followers using linearizable read
+      if (ozoneManager.getConfig().isFollowerReadLocalLeaseEnabled() &&
+          allowFollowerReadLocalLease(omRatisServer.getServerDivision(),
+              ozoneManager.getConfig().getFollowerReadLocalLeaseLagLimit(),
+              ozoneManager.getConfig().getFollowerReadLocalLeaseTimeMs())) {
+        ozoneManager.getMetrics().incNumFollowerReadLocalLeaseSuccess();
         return handler.handleReadRequest(request);
       }
-      // otherwise use linearizable path when enabled
-      if (omRatisServer.isLinearizableRead()) {
+      // Get current OM's role
+      RaftServerStatus raftServerStatus = omRatisServer.getLeaderStatus();
+      // === 1. Follower linearizable read ===
+      if (raftServerStatus == NOT_LEADER && omRatisServer.isLinearizableRead()) {
         ozoneManager.getMetrics().incNumLinearizableRead();
         return ozoneManager.getOmExecutionFlow().submit(request, false);
       }
+      // === 2. Leader local read (skip ReadIndex if allowed) ===
+      if (raftServerStatus == LEADER_AND_READY) {
+        if (ozoneManager.getConfig().isAllowLeaderSkipLinearizableRead()) {
+          ozoneManager.getMetrics().incNumLeaderSkipLinearizableRead();
+          // leader directly serves local committed data
+          return handler.handleReadRequest(request);
+        }
+        // otherwise use linearizable path when enabled
+        if (omRatisServer.isLinearizableRead()) {
+          ozoneManager.getMetrics().incNumLinearizableRead();
+          return ozoneManager.getOmExecutionFlow().submit(request, false);
+        }
 
-      // fallback to local read
-      return handler.handleReadRequest(request);
+        // fallback to local read
+        return handler.handleReadRequest(request);
+      } else {
+        throw createLeaderErrorException(raftServerStatus);
+      }
     } else {
-      throw createLeaderErrorException(raftServerStatus);
+      // If read consistency hint is specified, we should try to respect it although
+      // there is no guarantee since it depends on the OM node configuration (e.g.
+      // whether OM Raft server enables linearizable read).
+      ReadConsistencyHint readConsistencyHint = request.getReadConsistencyHint();
+      ReadConsistencyProto readConsistency = readConsistencyHint.getReadConsistency();
+      RaftServerStatus raftServerStatus;
+      switch (readConsistency) {
+      case LOCAL_LEASE:
+        raftServerStatus = omRatisServer.getLeaderStatus();
+        switch (raftServerStatus) {
+        case NOT_LEADER:
+        case LEADER_AND_NOT_READY:
+          if (!ozoneManager.getConfig().isFollowerReadLocalLeaseEnabled()) {
+            throw createLeaderErrorException(raftServerStatus);
+          }
+          LocalLeaseContext localLeaseContext = readConsistencyHint.getLocalLeaseContext();
+          long localLeaseLagLimit = localLeaseContext.hasLagLimit() ?
+              localLeaseContext.getLagLimit() : ozoneManager.getConfig().getFollowerReadLocalLeaseLagLimit();
+          long localLeaseLeaseTimeMs = localLeaseContext.hasLeaseTimeMs() ?
+              localLeaseContext.getLeaseTimeMs() : ozoneManager.getConfig().getFollowerReadLocalLeaseTimeMs();
+          if (allowFollowerReadLocalLease(omRatisServer.getServerDivision(),
+              localLeaseLagLimit, localLeaseLeaseTimeMs)) {
+            ozoneManager.getMetrics().incNumFollowerReadLocalLeaseSuccess();
+            return handler.handleReadRequest(request);
+          }
+          // The LocalLease lag is too high, trigger failover
+          throw createLeaderErrorException(raftServerStatus);
+        case LEADER_AND_READY:
+          // Although local lease does not apply for leader (since leader is always up-to-date)
+          // We still add the local lease metrics for compatibility reasons
+          ozoneManager.getMetrics().incNumFollowerReadLocalLeaseSuccess();
+          return handler.handleReadRequest(request);
+        default:
+          throw createUnknownRaftServerStatusException(raftServerStatus);
+        }
+      case LINEARIZABLE_LEADER_ONLY:
+        raftServerStatus = omRatisServer.getLeaderStatus();
+        switch (raftServerStatus) {
+        case NOT_LEADER:
+        case LEADER_AND_NOT_READY:
+          throw createLeaderErrorException(raftServerStatus);
+        case LEADER_AND_READY:
+          if (omRatisServer.isLinearizableRead()) {
+            ozoneManager.getMetrics().incNumLinearizableRead();
+            return ozoneManager.getOmExecutionFlow().submit(request, false);
+          } else {
+            // If linearizable read is not enabled, fallback to leader read
+            return handler.handleReadRequest(request);
+          }
+        default:
+          throw createUnknownRaftServerStatusException(raftServerStatus);
+        }
+      case LINEARIZABLE_ALLOW_FOLLOWER:
+        raftServerStatus = omRatisServer.getLeaderStatus();
+        switch (raftServerStatus) {
+        case LEADER_AND_NOT_READY:
+          throw createLeaderErrorException(raftServerStatus);
+        case NOT_LEADER:
+          if (omRatisServer.isLinearizableRead()) {
+            ozoneManager.getMetrics().incNumLinearizableRead();
+            return ozoneManager.getOmExecutionFlow().submit(request, false);
+          } else {
+            throw createLeaderErrorException(raftServerStatus);
+          }
+        case LEADER_AND_READY:
+          if (ozoneManager.getConfig().isAllowLeaderSkipLinearizableRead()) {
+            ozoneManager.getMetrics().incNumLeaderSkipLinearizableRead();
+            // leader directly serves local committed data
+            return handler.handleReadRequest(request);
+          }
+
+          // If the Raft server read option is not LINEARIZABLE, this will
+          // use leader read
+          if (omRatisServer.isLinearizableRead()) {
+            ozoneManager.getMetrics().incNumLinearizableRead();
+          }
+          return ozoneManager.getOmExecutionFlow().submit(request, false);
+        default:
+          throw createUnknownRaftServerStatusException(raftServerStatus);
+        }
+      case DEFAULT:
+      default:
+        raftServerStatus = omRatisServer.getLeaderStatus();
+        switch (raftServerStatus) {
+        case LEADER_AND_READY:
+          return handler.handleReadRequest(request);
+        case LEADER_AND_NOT_READY:
+        case NOT_LEADER:
+          throw createLeaderErrorException(raftServerStatus);
+        default:
+          throw createUnknownRaftServerStatusException(raftServerStatus);
+        }
+      }
     }
   }
 
-  boolean allowFollowerReadLocalLease(Division ratisDivision, long leaseLogLimit, long leaseTimeMsLimit) {
+  boolean allowFollowerReadLocalLease(Division ratisDivision, long leaseLagLimit, long leaseTimeMsLimit) {

Review Comment:
   Thanks for catching this, updated. The initial change was made because of the
configuration, which led me to believe it was a typo.



##########
hadoop-ozone/ozonefs-hadoop2/src/main/java/org/apache/hadoop/fs/ozone/Hadoop27RpcTransport.java:
##########
@@ -46,6 +52,8 @@ public class Hadoop27RpcTransport implements OmTransport {
   private final OzoneManagerProtocolPB rpcProxy;
 
   private final HadoopRpcOMFailoverProxyProvider<OzoneManagerProtocolPB> omFailoverProxyProvider;
+  private final boolean followerReadEnabled;
+  private final ReadConsistencyProto defaultLeaderReadConsistency;

Review Comment:
   Thanks, updated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to