This is an automated email from the ASF dual-hosted git repository.

ivandika pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 3990530379d HDDS-14426. OMResponse.leaderOMNodeId should not use RaftClientMessage.serverId (#9682)
3990530379d is described below

commit 3990530379df6a17fa130509664c0bfbe80cb596
Author: Ivan Andika <[email protected]>
AuthorDate: Thu Jan 29 16:47:07 2026 +0800

    HDDS-14426. OMResponse.leaderOMNodeId should not use RaftClientMessage.serverId (#9682)
---
 .../ozone/om/ha/OMFailoverProxyProviderBase.java   |  5 ++
 .../hadoop/ozone/om/helpers/OMRatisHelper.java     |  8 +-
 .../ozone/om/protocolPB/Hadoop3OmTransport.java    | 13 +--
 ...stOzoneManagerHAFollowerReadWithAllRunning.java | 99 ++++++++++++----------
 .../ozone/om/TestOzoneManagerHAWithAllRunning.java | 40 ---------
 .../ozone/om/ratis/OzoneManagerRatisServer.java    |  2 +-
 .../hadoop/fs/ozone/Hadoop27RpcTransport.java      | 13 +--
 7 files changed, 67 insertions(+), 113 deletions(-)

diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProviderBase.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProviderBase.java
index d52b4970e04..57d6caf823e 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProviderBase.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProviderBase.java
@@ -176,6 +176,11 @@ public synchronized String getCurrentProxyOMNodeId() {
     return omProxies.getNodeId(currentProxyIndex);
   }
 
+  @VisibleForTesting
+  public synchronized String getNextProxyOMNodeId() {
+    return omProxies.getNodeId(nextProxyIndex);
+  }
+
   @VisibleForTesting
   public RetryPolicy getRetryPolicy(int maxFailovers) {
     // Client will attempt up to maxFailovers number of failovers between
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OMRatisHelper.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OMRatisHelper.java
index 3741abbbe87..a483fbc6870 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OMRatisHelper.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OMRatisHelper.java
@@ -26,6 +26,7 @@
 import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto;
 import org.apache.ratis.protocol.Message;
 import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.ratis.thirdparty.com.google.protobuf.UnsafeByteOperations;
 import org.slf4j.Logger;
@@ -64,13 +65,14 @@ public static OMResponse 
convertByteStringToOMResponse(ByteString bytes) throws
   }
 
   /** Convert the given reply with proto 3 {@link ByteString} to a proto 2 
response. */
-  public static OMResponse getOMResponseFromRaftClientReply(RaftClientReply 
reply) throws IOException {
+  public static OMResponse getOMResponseFromRaftClientReply(RaftClientReply 
reply, RaftPeerId leaderOMNodeId)
+      throws IOException {
     final OMResponse response = 
convertByteStringToOMResponse(reply.getMessage().getContent());
-    if (reply.getReplierId().equals(response.getLeaderOMNodeId())) {
+    if (leaderOMNodeId == null) {
       return response;
     }
     return OMResponse.newBuilder(response)
-        .setLeaderOMNodeId(reply.getReplierId())
+        .setLeaderOMNodeId(leaderOMNodeId.toString())
         .build();
   }
 
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/Hadoop3OmTransport.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/Hadoop3OmTransport.java
index 7de5af31a62..9614e403f10 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/Hadoop3OmTransport.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/Hadoop3OmTransport.java
@@ -90,18 +90,7 @@ public Hadoop3OmTransport(ConfigurationSource conf,
   @Override
   public OMResponse submitRequest(OMRequest payload) throws IOException {
     try {
-      OMResponse omResponse =
-          rpcProxy.submitRequest(NULL_RPC_CONTROLLER, payload);
-
-      if (omResponse.hasLeaderOMNodeId() && omFailoverProxyProvider != null) {
-        String leaderOmId = omResponse.getLeaderOMNodeId();
-
-        // Failover to the OM node returned by OMResponse leaderOMNodeId if
-        // current proxy is not pointing to that node.
-        omFailoverProxyProvider.setNextOmProxy(leaderOmId);
-        omFailoverProxyProvider.performFailover(null);
-      }
-      return omResponse;
+      return rpcProxy.submitRequest(NULL_RPC_CONTROLLER, payload);
     } catch (ServiceException e) {
       OMNotLeaderException notLeaderException =
           HadoopRpcOMFailoverProxyProvider.getNotLeaderException(e);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAFollowerReadWithAllRunning.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAFollowerReadWithAllRunning.java
index 6bafe2a6815..7ed84972142 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAFollowerReadWithAllRunning.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAFollowerReadWithAllRunning.java
@@ -26,7 +26,6 @@
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNotSame;
 import static org.junit.jupiter.api.Assertions.assertSame;
@@ -57,10 +56,15 @@
 import org.apache.hadoop.ozone.om.ha.HadoopRpcOMFailoverProxyProvider;
 import 
org.apache.hadoop.ozone.om.ha.HadoopRpcOMFollowerReadFailoverProxyProvider;
 import org.apache.hadoop.ozone.om.ha.OMProxyInfo;
+import org.apache.hadoop.ozone.om.protocolPB.OmTransport;
+import 
org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolClientSideTranslatorPB;
 import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
 import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateVolumeRequest;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ListVolumeRequest;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ListVolumeRequest.Scope;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.VolumeInfo;
 import 
org.apache.hadoop.ozone.protocolPB.OzoneManagerProtocolServerSideTranslatorPB;
@@ -122,50 +126,6 @@ void testFollowerReadTargetsFollower() throws Exception {
     assertEquals(followerOMNodeId, lastProxy.getNodeId());
   }
 
-  @Test
-  public void testOMProxyProviderFailoverToCurrentLeader() throws Exception {
-    ObjectStore objectStore = getObjectStore();
-    HadoopRpcOMFailoverProxyProvider<OzoneManagerProtocolPB> 
omFailoverProxyProvider =
-        OmTestUtil.getFailoverProxyProvider(objectStore);
-    HadoopRpcOMFollowerReadFailoverProxyProvider<OzoneManagerProtocolPB> 
followerReadFailoverProxyProvider =
-        OmTestUtil.getFollowerReadFailoverProxyProvider(objectStore);
-    String initialFollowerReadNodeId = 
followerReadFailoverProxyProvider.getCurrentProxy().getNodeId();
-
-    // Run couple of createVolume tests to discover the current Leader OM
-    createVolumeTest(true);
-    createVolumeTest(true);
-
-    // The oMFailoverProxyProvider will point to the current leader OM node.
-    String leaderOMNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId();
-
-    // Perform a manual failover of the proxy provider to move the
-    // currentProxyIndex to a node other than the leader OM.
-    omFailoverProxyProvider.selectNextOmProxy();
-    omFailoverProxyProvider.performFailover(null);
-
-    String newProxyNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId();
-    assertNotEquals(leaderOMNodeId, newProxyNodeId);
-
-    // Once another request is sent to this new proxy node, the leader
-    // information must be returned via the response and a failover must
-    // happen to the leader proxy node.
-    // This will also do some read operations where this might read from the 
follower.
-    createVolumeTest(true);
-    Thread.sleep(2000);
-
-    String newLeaderOMNodeId =
-        omFailoverProxyProvider.getCurrentProxyOMNodeId();
-
-    // The old and new Leader OM NodeId must match since there was no new
-    // election in the Ratis ring.
-    assertEquals(leaderOMNodeId, newLeaderOMNodeId);
-
-    // The follower read proxy should remain unchanged since the follower is 
not throwing exceptions
-    // The performFailover on the leader proxy should not affect the follower 
read proxy provider
-    String currentFollowerReadNodeId = 
followerReadFailoverProxyProvider.getCurrentProxy().getNodeId();
-    assertEquals(initialFollowerReadNodeId, currentFollowerReadNodeId);
-  }
-
   /**
    * Choose a follower to send the request, the returned exception should
    * include the suggested leader node.
@@ -461,4 +421,53 @@ public void testAllBucketOperations() throws Exception {
     OzoneTestUtils.expectOmException(OMException.ResultCodes.BUCKET_NOT_FOUND,
         () -> retVolume.deleteBucket(bucketName));
   }
+
+  @Test
+  void testOMResponseLeaderOmNodeId() throws Exception {
+    HadoopRpcOMFailoverProxyProvider<OzoneManagerProtocolPB> 
omFailoverProxyProvider =
+        OmTestUtil.getFailoverProxyProvider(getObjectStore());
+    HadoopRpcOMFollowerReadFailoverProxyProvider<OzoneManagerProtocolPB> 
followerReadFailoverProxyProvider =
+        OmTestUtil.getFollowerReadFailoverProxyProvider(getObjectStore());
+
+    // Make sure All OMs are ready
+    createVolumeTest(true);
+
+    // The OMFailoverProxyProvider will point to the current leader OM node.
+    String leaderOMNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId();
+    String initialNextProxyOmNodeId = 
omFailoverProxyProvider.getNextProxyOMNodeId();
+    OzoneManager followerOM = null;
+    for (OzoneManager om: getCluster().getOzoneManagersList()) {
+      if (!om.isLeaderReady()) {
+        followerOM = om;
+        break;
+      }
+    }
+    assertNotNull(followerOM);
+    assertSame(OzoneManagerRatisServer.RaftServerStatus.NOT_LEADER,
+        followerOM.getOmRatisServer().getLeaderStatus());
+
+
+    ListVolumeRequest req =
+        ListVolumeRequest.newBuilder()
+            .setScope(Scope.VOLUMES_BY_USER)
+            .build();
+
+    OzoneManagerProtocolProtos.OMRequest readRequest =
+        OzoneManagerProtocolProtos.OMRequest.newBuilder()
+            .setCmdType(Type.ListVolume)
+            .setListVolumeRequest(req)
+            .setVersion(ClientVersion.CURRENT_VERSION)
+            .setClientId(randomUUID().toString())
+            .build();
+
+    OmTransport omTransport = ((OzoneManagerProtocolClientSideTranslatorPB)
+        
getObjectStore().getClientProxy().getOzoneManagerClient()).getTransport();
+    
followerReadFailoverProxyProvider.changeInitialProxyForTest(followerOM.getOMNodeId());
+    OMResponse omResponse = omTransport.submitRequest(readRequest);
+
+    // The returned OM response should be the same as the actual leader OM 
node ID
+    assertEquals(leaderOMNodeId, omResponse.getLeaderOMNodeId());
+    // There should not be any change in the leader proxy's next proxy OM node 
ID
+    assertEquals(initialNextProxyOmNodeId, 
omFailoverProxyProvider.getNextProxyOMNodeId());
+  }
 }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithAllRunning.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithAllRunning.java
index 06bec648196..1baa2fe9bdc 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithAllRunning.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithAllRunning.java
@@ -33,7 +33,6 @@
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -311,45 +310,6 @@ void testOMProxyProviderInitialization() {
     }
   }
 
-  /**
-   * Test HadoopRpcOMFailoverProxyProvider failover when current OM proxy is 
not
-   * the current OM Leader.
-   */
-  @Test
-  public void testOMProxyProviderFailoverToCurrentLeader() throws Exception {
-    ObjectStore objectStore = getObjectStore();
-    final HadoopRpcOMFailoverProxyProvider<OzoneManagerProtocolPB> 
omFailoverProxyProvider
-        = OmTestUtil.getFailoverProxyProvider(objectStore);
-
-    // Run couple of createVolume tests to discover the current Leader OM
-    createVolumeTest(true);
-    createVolumeTest(true);
-
-    // The oMFailoverProxyProvider will point to the current leader OM node.
-    String leaderOMNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId();
-
-    // Perform a manual failover of the proxy provider to move the
-    // currentProxyIndex to a node other than the leader OM.
-    omFailoverProxyProvider.selectNextOmProxy();
-    omFailoverProxyProvider.performFailover(null);
-
-    String newProxyNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId();
-    assertNotEquals(leaderOMNodeId, newProxyNodeId);
-
-    // Once another request is sent to this new proxy node, the leader
-    // information must be returned via the response and a failover must
-    // happen to the leader proxy node.
-    createVolumeTest(true);
-    Thread.sleep(2000);
-
-    String newLeaderOMNodeId =
-        omFailoverProxyProvider.getCurrentProxyOMNodeId();
-
-    // The old and new Leader OM NodeId must match since there was no new
-    // election in the Ratis ring.
-    assertEquals(leaderOMNodeId, newLeaderOMNodeId);
-  }
-
   /**
    * Choose a follower to send the request, the returned exception should
    * include the suggested leader node.
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
index aa2e42d72da..99cddaec748 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
@@ -617,7 +617,7 @@ private OMResponse createOmResponseImpl(OMRequest omRequest,
 
   private OMResponse getOMResponse(RaftClientReply reply) throws 
ServiceException {
     try {
-      return OMRatisHelper.getOMResponseFromRaftClientReply(reply);
+      return OMRatisHelper.getOMResponseFromRaftClientReply(reply, 
getLeaderId());
     } catch (IOException ex) {
       if (ex.getMessage() != null) {
         throw new ServiceException(ex.getMessage(), ex);
diff --git a/hadoop-ozone/ozonefs-hadoop2/src/main/java/org/apache/hadoop/fs/ozone/Hadoop27RpcTransport.java b/hadoop-ozone/ozonefs-hadoop2/src/main/java/org/apache/hadoop/fs/ozone/Hadoop27RpcTransport.java
index f66dc091de6..919e06dc33f 100644
--- a/hadoop-ozone/ozonefs-hadoop2/src/main/java/org/apache/hadoop/fs/ozone/Hadoop27RpcTransport.java
+++ b/hadoop-ozone/ozonefs-hadoop2/src/main/java/org/apache/hadoop/fs/ozone/Hadoop27RpcTransport.java
@@ -85,18 +85,7 @@ public Hadoop27RpcTransport(ConfigurationSource conf,
   @Override
   public OMResponse submitRequest(OMRequest payload) throws IOException {
     try {
-      OMResponse omResponse =
-          rpcProxy.submitRequest(NULL_RPC_CONTROLLER, payload);
-
-      if (omResponse.hasLeaderOMNodeId() && omFailoverProxyProvider != null) {
-        String leaderOmId = omResponse.getLeaderOMNodeId();
-
-        // Failover to the OM node returned by OMResponse leaderOMNodeId if
-        // current proxy is not pointing to that node.
-        omFailoverProxyProvider.setNextOmProxy(leaderOmId);
-        omFailoverProxyProvider.performFailover(null);
-      }
-      return omResponse;
+      return rpcProxy.submitRequest(NULL_RPC_CONTROLLER, payload);
     } catch (ServiceException e) {
       OMNotLeaderException notLeaderException =
           HadoopRpcOMFailoverProxyProvider.getNotLeaderException(e);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to