This is an automated email from the ASF dual-hosted git repository.
ivandika pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 3990530379d HDDS-14426. OMResponse.leaderOMNodeId should not use
RaftClientMessage.serverId (#9682)
3990530379d is described below
commit 3990530379df6a17fa130509664c0bfbe80cb596
Author: Ivan Andika <[email protected]>
AuthorDate: Thu Jan 29 16:47:07 2026 +0800
HDDS-14426. OMResponse.leaderOMNodeId should not use
RaftClientMessage.serverId (#9682)
---
.../ozone/om/ha/OMFailoverProxyProviderBase.java | 5 ++
.../hadoop/ozone/om/helpers/OMRatisHelper.java | 8 +-
.../ozone/om/protocolPB/Hadoop3OmTransport.java | 13 +--
...stOzoneManagerHAFollowerReadWithAllRunning.java | 99 ++++++++++++----------
.../ozone/om/TestOzoneManagerHAWithAllRunning.java | 40 ---------
.../ozone/om/ratis/OzoneManagerRatisServer.java | 2 +-
.../hadoop/fs/ozone/Hadoop27RpcTransport.java | 13 +--
7 files changed, 67 insertions(+), 113 deletions(-)
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProviderBase.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProviderBase.java
index d52b4970e04..57d6caf823e 100644
---
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProviderBase.java
+++
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProviderBase.java
@@ -176,6 +176,11 @@ public synchronized String getCurrentProxyOMNodeId() {
return omProxies.getNodeId(currentProxyIndex);
}
+ @VisibleForTesting
+ public synchronized String getNextProxyOMNodeId() {
+ return omProxies.getNodeId(nextProxyIndex);
+ }
+
@VisibleForTesting
public RetryPolicy getRetryPolicy(int maxFailovers) {
// Client will attempt up to maxFailovers number of failovers between
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OMRatisHelper.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OMRatisHelper.java
index 3741abbbe87..a483fbc6870 100644
---
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OMRatisHelper.java
+++
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OMRatisHelper.java
@@ -26,6 +26,7 @@
import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.thirdparty.com.google.protobuf.UnsafeByteOperations;
import org.slf4j.Logger;
@@ -64,13 +65,14 @@ public static OMResponse
convertByteStringToOMResponse(ByteString bytes) throws
}
/** Convert the given reply with proto 3 {@link ByteString} to a proto 2
response. */
- public static OMResponse getOMResponseFromRaftClientReply(RaftClientReply
reply) throws IOException {
+ public static OMResponse getOMResponseFromRaftClientReply(RaftClientReply
reply, RaftPeerId leaderOMNodeId)
+ throws IOException {
final OMResponse response =
convertByteStringToOMResponse(reply.getMessage().getContent());
- if (reply.getReplierId().equals(response.getLeaderOMNodeId())) {
+ if (leaderOMNodeId == null) {
return response;
}
return OMResponse.newBuilder(response)
- .setLeaderOMNodeId(reply.getReplierId())
+ .setLeaderOMNodeId(leaderOMNodeId.toString())
.build();
}
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/Hadoop3OmTransport.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/Hadoop3OmTransport.java
index 7de5af31a62..9614e403f10 100644
---
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/Hadoop3OmTransport.java
+++
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/Hadoop3OmTransport.java
@@ -90,18 +90,7 @@ public Hadoop3OmTransport(ConfigurationSource conf,
@Override
public OMResponse submitRequest(OMRequest payload) throws IOException {
try {
- OMResponse omResponse =
- rpcProxy.submitRequest(NULL_RPC_CONTROLLER, payload);
-
- if (omResponse.hasLeaderOMNodeId() && omFailoverProxyProvider != null) {
- String leaderOmId = omResponse.getLeaderOMNodeId();
-
- // Failover to the OM node returned by OMResponse leaderOMNodeId if
- // current proxy is not pointing to that node.
- omFailoverProxyProvider.setNextOmProxy(leaderOmId);
- omFailoverProxyProvider.performFailover(null);
- }
- return omResponse;
+ return rpcProxy.submitRequest(NULL_RPC_CONTROLLER, payload);
} catch (ServiceException e) {
OMNotLeaderException notLeaderException =
HadoopRpcOMFailoverProxyProvider.getNotLeaderException(e);
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAFollowerReadWithAllRunning.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAFollowerReadWithAllRunning.java
index 6bafe2a6815..7ed84972142 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAFollowerReadWithAllRunning.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAFollowerReadWithAllRunning.java
@@ -26,7 +26,6 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertSame;
@@ -57,10 +56,15 @@
import org.apache.hadoop.ozone.om.ha.HadoopRpcOMFailoverProxyProvider;
import
org.apache.hadoop.ozone.om.ha.HadoopRpcOMFollowerReadFailoverProxyProvider;
import org.apache.hadoop.ozone.om.ha.OMProxyInfo;
+import org.apache.hadoop.ozone.om.protocolPB.OmTransport;
+import
org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolClientSideTranslatorPB;
import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateVolumeRequest;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ListVolumeRequest;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ListVolumeRequest.Scope;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.VolumeInfo;
import
org.apache.hadoop.ozone.protocolPB.OzoneManagerProtocolServerSideTranslatorPB;
@@ -122,50 +126,6 @@ void testFollowerReadTargetsFollower() throws Exception {
assertEquals(followerOMNodeId, lastProxy.getNodeId());
}
- @Test
- public void testOMProxyProviderFailoverToCurrentLeader() throws Exception {
- ObjectStore objectStore = getObjectStore();
- HadoopRpcOMFailoverProxyProvider<OzoneManagerProtocolPB>
omFailoverProxyProvider =
- OmTestUtil.getFailoverProxyProvider(objectStore);
- HadoopRpcOMFollowerReadFailoverProxyProvider<OzoneManagerProtocolPB>
followerReadFailoverProxyProvider =
- OmTestUtil.getFollowerReadFailoverProxyProvider(objectStore);
- String initialFollowerReadNodeId =
followerReadFailoverProxyProvider.getCurrentProxy().getNodeId();
-
- // Run couple of createVolume tests to discover the current Leader OM
- createVolumeTest(true);
- createVolumeTest(true);
-
- // The oMFailoverProxyProvider will point to the current leader OM node.
- String leaderOMNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId();
-
- // Perform a manual failover of the proxy provider to move the
- // currentProxyIndex to a node other than the leader OM.
- omFailoverProxyProvider.selectNextOmProxy();
- omFailoverProxyProvider.performFailover(null);
-
- String newProxyNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId();
- assertNotEquals(leaderOMNodeId, newProxyNodeId);
-
- // Once another request is sent to this new proxy node, the leader
- // information must be returned via the response and a failover must
- // happen to the leader proxy node.
- // This will also do some read operations where this might read from the
follower.
- createVolumeTest(true);
- Thread.sleep(2000);
-
- String newLeaderOMNodeId =
- omFailoverProxyProvider.getCurrentProxyOMNodeId();
-
- // The old and new Leader OM NodeId must match since there was no new
- // election in the Ratis ring.
- assertEquals(leaderOMNodeId, newLeaderOMNodeId);
-
- // The follower read proxy should remain unchanged since the follower is
not throwing exceptions
- // The performFailover on the leader proxy should not affect the follower
read proxy provider
- String currentFollowerReadNodeId =
followerReadFailoverProxyProvider.getCurrentProxy().getNodeId();
- assertEquals(initialFollowerReadNodeId, currentFollowerReadNodeId);
- }
-
/**
* Choose a follower to send the request, the returned exception should
* include the suggested leader node.
@@ -461,4 +421,53 @@ public void testAllBucketOperations() throws Exception {
OzoneTestUtils.expectOmException(OMException.ResultCodes.BUCKET_NOT_FOUND,
() -> retVolume.deleteBucket(bucketName));
}
+
+ @Test
+ void testOMResponseLeaderOmNodeId() throws Exception {
+ HadoopRpcOMFailoverProxyProvider<OzoneManagerProtocolPB>
omFailoverProxyProvider =
+ OmTestUtil.getFailoverProxyProvider(getObjectStore());
+ HadoopRpcOMFollowerReadFailoverProxyProvider<OzoneManagerProtocolPB>
followerReadFailoverProxyProvider =
+ OmTestUtil.getFollowerReadFailoverProxyProvider(getObjectStore());
+
+ // Make sure all OMs are ready
+ createVolumeTest(true);
+
+ // The OMFailoverProxyProvider will point to the current leader OM node.
+ String leaderOMNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId();
+ String initialNextProxyOmNodeId =
omFailoverProxyProvider.getNextProxyOMNodeId();
+ OzoneManager followerOM = null;
+ for (OzoneManager om: getCluster().getOzoneManagersList()) {
+ if (!om.isLeaderReady()) {
+ followerOM = om;
+ break;
+ }
+ }
+ assertNotNull(followerOM);
+ assertSame(OzoneManagerRatisServer.RaftServerStatus.NOT_LEADER,
+ followerOM.getOmRatisServer().getLeaderStatus());
+
+
+ ListVolumeRequest req =
+ ListVolumeRequest.newBuilder()
+ .setScope(Scope.VOLUMES_BY_USER)
+ .build();
+
+ OzoneManagerProtocolProtos.OMRequest readRequest =
+ OzoneManagerProtocolProtos.OMRequest.newBuilder()
+ .setCmdType(Type.ListVolume)
+ .setListVolumeRequest(req)
+ .setVersion(ClientVersion.CURRENT_VERSION)
+ .setClientId(randomUUID().toString())
+ .build();
+
+ OmTransport omTransport = ((OzoneManagerProtocolClientSideTranslatorPB)
+
getObjectStore().getClientProxy().getOzoneManagerClient()).getTransport();
+
followerReadFailoverProxyProvider.changeInitialProxyForTest(followerOM.getOMNodeId());
+ OMResponse omResponse = omTransport.submitRequest(readRequest);
+
+ // The leader OM node ID in the returned response should match the actual
leader OM node ID
+ assertEquals(leaderOMNodeId, omResponse.getLeaderOMNodeId());
+ // There should not be any change in the leader proxy's next proxy OM node
ID
+ assertEquals(initialNextProxyOmNodeId,
omFailoverProxyProvider.getNextProxyOMNodeId());
+ }
}
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithAllRunning.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithAllRunning.java
index 06bec648196..1baa2fe9bdc 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithAllRunning.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithAllRunning.java
@@ -33,7 +33,6 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -311,45 +310,6 @@ void testOMProxyProviderInitialization() {
}
}
- /**
- * Test HadoopRpcOMFailoverProxyProvider failover when current OM proxy is
not
- * the current OM Leader.
- */
- @Test
- public void testOMProxyProviderFailoverToCurrentLeader() throws Exception {
- ObjectStore objectStore = getObjectStore();
- final HadoopRpcOMFailoverProxyProvider<OzoneManagerProtocolPB>
omFailoverProxyProvider
- = OmTestUtil.getFailoverProxyProvider(objectStore);
-
- // Run couple of createVolume tests to discover the current Leader OM
- createVolumeTest(true);
- createVolumeTest(true);
-
- // The oMFailoverProxyProvider will point to the current leader OM node.
- String leaderOMNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId();
-
- // Perform a manual failover of the proxy provider to move the
- // currentProxyIndex to a node other than the leader OM.
- omFailoverProxyProvider.selectNextOmProxy();
- omFailoverProxyProvider.performFailover(null);
-
- String newProxyNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId();
- assertNotEquals(leaderOMNodeId, newProxyNodeId);
-
- // Once another request is sent to this new proxy node, the leader
- // information must be returned via the response and a failover must
- // happen to the leader proxy node.
- createVolumeTest(true);
- Thread.sleep(2000);
-
- String newLeaderOMNodeId =
- omFailoverProxyProvider.getCurrentProxyOMNodeId();
-
- // The old and new Leader OM NodeId must match since there was no new
- // election in the Ratis ring.
- assertEquals(leaderOMNodeId, newLeaderOMNodeId);
- }
-
/**
* Choose a follower to send the request, the returned exception should
* include the suggested leader node.
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
index aa2e42d72da..99cddaec748 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
@@ -617,7 +617,7 @@ private OMResponse createOmResponseImpl(OMRequest omRequest,
private OMResponse getOMResponse(RaftClientReply reply) throws
ServiceException {
try {
- return OMRatisHelper.getOMResponseFromRaftClientReply(reply);
+ return OMRatisHelper.getOMResponseFromRaftClientReply(reply,
getLeaderId());
} catch (IOException ex) {
if (ex.getMessage() != null) {
throw new ServiceException(ex.getMessage(), ex);
diff --git
a/hadoop-ozone/ozonefs-hadoop2/src/main/java/org/apache/hadoop/fs/ozone/Hadoop27RpcTransport.java
b/hadoop-ozone/ozonefs-hadoop2/src/main/java/org/apache/hadoop/fs/ozone/Hadoop27RpcTransport.java
index f66dc091de6..919e06dc33f 100644
---
a/hadoop-ozone/ozonefs-hadoop2/src/main/java/org/apache/hadoop/fs/ozone/Hadoop27RpcTransport.java
+++
b/hadoop-ozone/ozonefs-hadoop2/src/main/java/org/apache/hadoop/fs/ozone/Hadoop27RpcTransport.java
@@ -85,18 +85,7 @@ public Hadoop27RpcTransport(ConfigurationSource conf,
@Override
public OMResponse submitRequest(OMRequest payload) throws IOException {
try {
- OMResponse omResponse =
- rpcProxy.submitRequest(NULL_RPC_CONTROLLER, payload);
-
- if (omResponse.hasLeaderOMNodeId() && omFailoverProxyProvider != null) {
- String leaderOmId = omResponse.getLeaderOMNodeId();
-
- // Failover to the OM node returned by OMResponse leaderOMNodeId if
- // current proxy is not pointing to that node.
- omFailoverProxyProvider.setNextOmProxy(leaderOmId);
- omFailoverProxyProvider.performFailover(null);
- }
- return omResponse;
+ return rpcProxy.submitRequest(NULL_RPC_CONTROLLER, payload);
} catch (ServiceException e) {
OMNotLeaderException notLeaderException =
HadoopRpcOMFailoverProxyProvider.getNotLeaderException(e);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]