This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new d1aecfd3a82 HDDS-13954. Add localLease for followerRead (#9320)
d1aecfd3a82 is described below

commit d1aecfd3a82d6ff1f781e51ae736edd0f79ec644
Author: Symious <[email protected]>
AuthorDate: Mon Dec 8 20:26:23 2025 +0800

    HDDS-13954. Add localLease for followerRead (#9320)
---
 .../java/org/apache/hadoop/ozone/om/OmConfig.java  | 56 ++++++++++++++++++++++
 .../hadoop/ozone/shell/TestOzoneShellHA.java       | 10 ++--
 .../shell/TestOzoneShellHAWithFollowerRead.java    | 30 +++++++++++-
 .../java/org/apache/hadoop/ozone/om/OMMetrics.java | 28 +++++++++++
 ...OzoneManagerProtocolServerSideTranslatorPB.java | 54 +++++++++++++++++++++
 5 files changed, 174 insertions(+), 4 deletions(-)

diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OmConfig.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OmConfig.java
index 8f762a7dbd6..02ce1aec334 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OmConfig.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OmConfig.java
@@ -150,6 +150,38 @@ public class OmConfig extends ReconfigurableConfig {
   )
   private boolean allowLeaderSkipLinearizableRead;
 
+  @Config(key = "ozone.om.follower.read.local.lease.enabled",
+      defaultValue = "false",
+      reconfigurable = true,
+      type = ConfigType.BOOLEAN,
+      tags = {ConfigTag.OM, ConfigTag.PERFORMANCE, ConfigTag.HA, ConfigTag.RATIS},
+      description = "Whether follower OMs may serve read requests from their " +
+          "local state under a local lease. If enabled, a follower OM decides " +
+          "whether to return local data directly based on its log lag behind " +
+          "the leader and on the time elapsed since the last RPC from the leader."
+  )
+  private boolean followerReadLocalLeaseEnabled;
+
+  @Config(key = "ozone.om.follower.read.local.lease.lag.limit",
+      defaultValue = "10000",
+      reconfigurable = true,
+      type = ConfigType.LONG,
+      tags = {ConfigTag.OM, ConfigTag.PERFORMANCE, ConfigTag.HA, ConfigTag.RATIS},
+      description = "Maximum number of log entries by which a follower OM's " +
+          "last applied index may trail the leader's commit index while still " +
+          "serving reads under the local lease. If the lag is larger, the " +
+          "follower OM is considered not up-to-date."
+  )
+  private long followerReadLocalLeaseLagLimit;
+
+  @Config(key = "ozone.om.follower.read.local.lease.time.ms",
+      defaultValue = "5000",
+      reconfigurable = true,
+      type = ConfigType.LONG,
+      tags = {ConfigTag.OM, ConfigTag.PERFORMANCE, ConfigTag.HA, ConfigTag.RATIS},
+      description = "Maximum time in milliseconds since a follower OM last " +
+          "received an RPC from the leader while still serving reads under " +
+          "the local lease. If the elapsed time is larger, the follower OM is " +
+          "considered not up-to-date."
+  )
+  private long followerReadLocalLeaseTimeMs;
+
   public long getRatisBasedFinalizationTimeout() {
     return ratisBasedFinalizationTimeout;
   }
@@ -190,6 +222,30 @@ public void setAllowLeaderSkipLinearizableRead(boolean newValue) {
     allowLeaderSkipLinearizableRead = newValue;
   }
 
+  /** Returns whether the follower-read local lease is enabled. */
+  public boolean isFollowerReadLocalLeaseEnabled() {
+    return followerReadLocalLeaseEnabled;
+  }
+
+  /** Enables or disables the follower-read local lease. */
+  public void setFollowerReadLocalLeaseEnabled(boolean newValue) {
+    followerReadLocalLeaseEnabled = newValue;
+  }
+
+  /** Returns the maximum allowed log-index lag, in entries, for the local lease. */
+  public long getFollowerReadLocalLeaseLagLimit() {
+    return followerReadLocalLeaseLagLimit;
+  }
+
+  /** Sets the maximum allowed log-index lag, in entries; the value is not validated. */
+  public void setFollowerReadLocalLeaseLagLimit(long newValue) {
+    followerReadLocalLeaseLagLimit = newValue;
+  }
+
+  /** Returns the maximum allowed time since the last leader RPC, in milliseconds. */
+  public long getFollowerReadLocalLeaseTimeMs() {
+    return followerReadLocalLeaseTimeMs;
+  }
+
+  /** Sets the maximum allowed time since the last leader RPC, in ms; not validated. */
+  public void setFollowerReadLocalLeaseTimeMs(long newValue) {
+    followerReadLocalLeaseTimeMs = newValue;
+  }
+
   public void setMaxListSize(long newValue) {
     maxListSize = newValue;
     validate();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestOzoneShellHA.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestOzoneShellHA.java
index 4f111017a98..743214d72f4 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestOzoneShellHA.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestOzoneShellHA.java
@@ -230,7 +230,7 @@ public void reset() {
     System.setErr(OLD_ERR);
   }
 
-  private void execute(GenericCli shell, String[] args) {
+  protected void execute(GenericCli shell, String[] args) {
     LOG.info("Executing OzoneShell command with args {}", Arrays.asList(args));
     CommandLine cmd = shell.getCmd();
 
@@ -347,7 +347,7 @@ private String[] getHASetConfStrings(String[] existingArgs) {
   /**
    * Helper function to generate keys for testing shell command of keys.
    */
-  private void generateKeys(String volumeName, String bucketName,
+  protected void generateKeys(String volumeName, String bucketName,
                             String bucketLayout) {
     String[] args = new String[] {
         "volume", "create", "o3://" + omServiceId + volumeName};
@@ -372,7 +372,7 @@ private void generateKeys(String volumeName, String bucketName,
   /**
    * Counts the keys reported by a listing command by counting occurrences
    * of the substring "key" in the captured {@code out} buffer.
    * NOTE(review): assumes each listed key contributes exactly one "key"
    * occurrence in the output; also, String#split drops trailing empty
    * strings, so an occurrence at the very end of the output is not counted.
    */
-  private int getNumOfKeys() throws UnsupportedEncodingException {
+  protected int getNumOfKeys() throws UnsupportedEncodingException {
     return out.toString(DEFAULT_ENCODING).split("key").length - 1;
   }
 
@@ -2489,4 +2489,8 @@ private static String getKeyProviderURI(MiniKMS kms) {
   protected MiniOzoneHAClusterImpl getCluster() {
     return cluster;
   }
+
+  /**
+   * Returns the {@link OzoneShell} instance held by this test, so that
+   * subclasses can pass it to {@link #execute(GenericCli, String[])}.
+   */
+  protected OzoneShell getOzoneShell() {
+    return ozoneShell;
+  }
 }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestOzoneShellHAWithFollowerRead.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestOzoneShellHAWithFollowerRead.java
index 2abf1e020c1..8ea5319b97d 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestOzoneShellHAWithFollowerRead.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestOzoneShellHAWithFollowerRead.java
@@ -17,6 +17,9 @@
 
 package org.apache.hadoop.ozone.shell;
 
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServerConfig;
@@ -64,8 +67,33 @@ public void testAllowLeaderSkipLinearizableRead() throws Exception {
     super.testListAllKeysInternal("skipvol2");
 
     long curMetrics = 
getCluster().getOMLeader().getMetrics().getNumLeaderSkipLinearizableRead();
-    Assertions.assertEquals(lastMetrics, curMetrics);
+    assertEquals(lastMetrics, curMetrics);
 
     getCluster().getOMLeader().setConfiguration(oldConf);
   }
+
+  @Test
+  public void testAllowFollowerReadLocalLease() throws Exception {
+    OzoneConfiguration oldConf = getCluster().getConf();
+    // Lease enabled with the default limits: reads on this follower are
+    // expected to be served locally.
+    OzoneConfiguration enabledConf = new OzoneConfiguration(oldConf);
+    enabledConf.setBoolean("ozone.om.follower.read.local.lease.enabled", true);
+    // Lease enabled but with a negative time limit, so the lease time check
+    // is expected to fail on this follower.
+    OzoneConfiguration expiredConf = new OzoneConfiguration(enabledConf);
+    expiredConf.setLong("ozone.om.follower.read.local.lease.time.ms", -1000);
+
+    // The local lease only applies to followers, so pick two OMs that are not
+    // the current leader rather than hard-coding indexes 1 and 2.
+    // Assumes at least three OMs, as the hard-coded indexes did.
+    int leaderIndex = 0;
+    for (int i = 0; i < 3; i++) {
+      if (getCluster().getOzoneManager(i) == getCluster().getOMLeader()) {
+        leaderIndex = i;
+      }
+    }
+    final int leaseFollower = (leaderIndex + 1) % 3;
+    final int expiredFollower = (leaderIndex + 2) % 3;
+
+    try {
+      getCluster().getOzoneManager(leaseFollower).setConfiguration(enabledConf);
+      getCluster().getOzoneManager(expiredFollower).setConfiguration(expiredConf);
+
+      String[] args = new String[]{"volume", "list"};
+      for (int i = 0; i < 100; i++) {
+        execute(getOzoneShell(), args);
+      }
+      assertThat(getCluster().getOzoneManager(leaseFollower).getMetrics()
+          .getNumFollowerReadLocalLeaseSuccess()).isPositive();
+      assertEquals(0, getCluster().getOzoneManager(expiredFollower).getMetrics()
+          .getNumFollowerReadLocalLeaseSuccess());
+      assertThat(getCluster().getOzoneManager(expiredFollower).getMetrics()
+          .getNumFollowerReadLocalLeaseFailTime()).isPositive();
+    } finally {
+      getCluster().getOzoneManager(leaseFollower).setConfiguration(oldConf);
+      getCluster().getOzoneManager(expiredFollower).setConfiguration(oldConf);
+    }
+  }
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java
index a5001bbf626..3658ec96c65 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java
@@ -110,6 +110,10 @@ public class OMMetrics implements OmMetadataReaderMetrics {
   private @Metric MutableCounterLong numLinearizableRead;
   private @Metric MutableCounterLong numLeaderSkipLinearizableRead;
 
+  // Follower-read local lease outcomes: reads served locally, and lease
+  // rejections caused by log-index lag or by lease-time expiry respectively.
+  private @Metric MutableCounterLong numFollowerReadLocalLeaseSuccess;
+  private @Metric MutableCounterLong numFollowerReadLocalLeaseFailLog;
+  private @Metric MutableCounterLong numFollowerReadLocalLeaseFailTime;
+
   // Failure Metrics
   private @Metric MutableCounterLong numVolumeCreateFails;
   private @Metric MutableCounterLong numVolumeUpdateFails;
@@ -978,6 +982,30 @@ public long getNumLeaderSkipLinearizableRead() {
     return numLeaderSkipLinearizableRead.value();
   }
 
+  /** Records a read served from local state under the follower-read local lease. */
+  public void incNumFollowerReadLocalLeaseSuccess() {
+    numFollowerReadLocalLeaseSuccess.incr();
+  }
+
+  /** Records a local-lease rejection caused by the log-index lag limit. */
+  public void incNumFollowerReadLocalLeaseFailLog() {
+    numFollowerReadLocalLeaseFailLog.incr();
+  }
+
+  /** Records a local-lease rejection caused by the lease time limit. */
+  public void incNumFollowerReadLocalLeaseFailTime() {
+    numFollowerReadLocalLeaseFailTime.incr();
+  }
+
+  /** Returns how many reads were served under the follower-read local lease. */
+  public long getNumFollowerReadLocalLeaseSuccess() {
+    return numFollowerReadLocalLeaseSuccess.value();
+  }
+
+  /** Returns how many local-lease checks failed on the log-index lag limit. */
+  public long getNumFollowerReadLocalLeaseFailLog() {
+    return numFollowerReadLocalLeaseFailLog.value();
+  }
+
+  /** Returns how many local-lease checks failed on the lease time limit. */
+  public long getNumFollowerReadLocalLeaseFailTime() {
+    return numFollowerReadLocalLeaseFailTime.value();
+  }
+
   @VisibleForTesting
   public long getNumVolumeCreates() {
     return numVolumeCreates.value();
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
index 2c68ee9bb16..b0181a02bb7 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
@@ -48,7 +48,12 @@
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
 import org.apache.hadoop.ozone.security.S3SecurityUtil;
+import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
+import org.apache.ratis.proto.RaftProtos.FollowerInfoProto;
+import org.apache.ratis.proto.RaftProtos.RoleInfoProto;
+import org.apache.ratis.proto.RaftProtos.ServerRpcProto;
 import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.DivisionInfo;
+import org.apache.ratis.server.RaftServer.Division;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
@@ -184,6 +189,14 @@ public OMRequest getLastRequestToSubmit() {
 
   private OMResponse submitReadRequestToOM(OMRequest request)
       throws ServiceException {
+    // Follower-read local lease: if enabled and this OM passes the lease
+    // checks in allowFollowerReadLocalLease, serve the read directly from
+    // local state, skipping the leader / linearizable-read handling below.
+    if (ozoneManager.getConfig().isFollowerReadLocalLeaseEnabled() &&
+        allowFollowerReadLocalLease(omRatisServer.getServerDivision(),
+            ozoneManager.getConfig().getFollowerReadLocalLeaseLagLimit(),
+            ozoneManager.getConfig().getFollowerReadLocalLeaseTimeMs())) {
+      ozoneManager.getMetrics().incNumFollowerReadLocalLeaseSuccess();
+      return handler.handleReadRequest(request);
+    }
+
     // Get current OM's role
     RaftServerStatus raftServerStatus = omRatisServer.getLeaderStatus();
     // === 1. Follower linearizable read ===
@@ -211,6 +224,47 @@ private OMResponse submitReadRequestToOM(OMRequest request)
     }
   }
 
+  /**
+   * Decides whether this OM may serve a read from its local state under the
+   * follower-read local lease.
+   *
+   * <p>The lease is granted only when all of the following hold:
+   * <ul>
+   *   <li>this Ratis division is a follower and knows its leader;</li>
+   *   <li>the time elapsed since the last RPC from the leader does not
+   *       exceed {@code leaseTimeMsLimit};</li>
+   *   <li>the leader's commit index, as reported in this division's commit
+   *       infos, is at most {@code leaseLogLimit} entries ahead of this
+   *       division's last applied index.</li>
+   * </ul>
+   * Failures of the time check and of the log-lag check are counted in the
+   * OM metrics.
+   *
+   * @param ratisDivision    the Ratis division of this OM
+   * @param leaseLogLimit    maximum allowed lag, in log entries
+   * @param leaseTimeMsLimit maximum allowed time since the last leader RPC, in ms
+   * @return true if the read may be served from local state
+   */
+  boolean allowFollowerReadLocalLease(Division ratisDivision, long leaseLogLimit,
+      long leaseTimeMsLimit) {
+    final DivisionInfo divisionInfo = ratisDivision.getInfo();
+    final RoleInfoProto roleInfo = divisionInfo.getRoleInfoProto();
+    // Protobuf getters never return null (an unset message field yields a
+    // default instance), so presence must be tested with hasXxx(). Without
+    // this, a leader would pass every check below against its own state.
+    if (!roleInfo.hasFollowerInfo()) {
+      LOG.debug("FollowerRead Local Lease not allowed: Not a follower.");
+      return false;
+    }
+    final FollowerInfoProto followerInfo = roleInfo.getFollowerInfo();
+    final RaftPeerId leaderId = divisionInfo.getLeaderId();
+    if (leaderId == null || !followerInfo.hasLeaderInfo()) {
+      LOG.debug("FollowerRead Local Lease not allowed: No leader.");
+      return false;
+    }
+
+    final ServerRpcProto leaderInfo = followerInfo.getLeaderInfo();
+    final long elapsedMs = leaderInfo.getLastRpcElapsedTimeMs();
+    if (elapsedMs > leaseTimeMsLimit) {
+      LOG.debug("FollowerRead Local Lease not allowed: Local lease time expired ({} ms > {} ms).",
+          elapsedMs, leaseTimeMsLimit);
+      ozoneManager.getMetrics().incNumFollowerReadLocalLeaseFailTime();
+      return false;
+    }
+
+    Long leaderCommit = null;
+    for (CommitInfoProto info : ratisDivision.getCommitInfos()) {
+      if (info.getServer().getId().equals(leaderId.toByteString())) {
+        leaderCommit = info.getCommitIndex();
+        break;
+      }
+    }
+    if (leaderCommit == null) {
+      LOG.debug("FollowerRead Local Lease not allowed: Leader commit index not found.");
+      return false;
+    }
+
+    final long lastApplied = divisionInfo.getLastAppliedIndex();
+    // Compare the lag directly instead of adding leaseLogLimit to the applied
+    // index, which could overflow for a very large configured limit.
+    final boolean withinLag = leaderCommit - lastApplied <= leaseLogLimit;
+    if (!withinLag) {
+      ozoneManager.getMetrics().incNumFollowerReadLocalLeaseFailLog();
+      LOG.debug("FollowerRead Local Lease not allowed: Index lag exceeds limit ({} - {} > {}).",
+          leaderCommit, lastApplied, leaseLogLimit);
+    }
+    return withinLag;
+  }
+
   private ServiceException createLeaderErrorException(
       RaftServerStatus raftServerStatus) {
     if (raftServerStatus == NOT_LEADER) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to