This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 14e7ff1e6f HDDS-10036. OzoneManagerRatisServer.getServer() should
return Division. (#5892)
14e7ff1e6f is described below
commit 14e7ff1e6fb2bf11f1df054c63b6e1729e328286
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Mon Jan 1 04:23:59 2024 -0800
HDDS-10036. OzoneManagerRatisServer.getServer() should return Division.
(#5892)
---
.../hadoop/ozone/MiniOzoneHAClusterImpl.java | 8 +-
.../hadoop/ozone/om/TestAddRemoveOzoneManager.java | 15 ++-
.../ozone/om/TestOzoneManagerHAWithAllRunning.java | 2 +-
.../om/TestOzoneManagerHAWithStoppedNodes.java | 3 +-
.../hadoop/ozone/om/TestOzoneManagerPrepare.java | 9 +-
.../ozone/shell/TestTransferLeadershipShell.java | 13 +--
.../org/apache/hadoop/ozone/om/OzoneManager.java | 98 +++++++----------
.../ozone/om/ratis/OzoneManagerRatisServer.java | 116 ++++++++-------------
.../ozone/om/ratis/OzoneManagerStateMachine.java | 2 +-
.../ozone/om/request/upgrade/OMPrepareRequest.java | 4 +-
...OzoneManagerProtocolServerSideTranslatorPB.java | 24 +----
11 files changed, 115 insertions(+), 179 deletions(-)
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java
index 97803e76bd..d79f28ba15 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java
@@ -221,8 +221,12 @@ public class MiniOzoneHAClusterImpl extends
MiniOzoneClusterImpl {
@Override
public void restartOzoneManager() throws IOException {
- for (OzoneManager ozoneManager : omhaService.getActiveServices()) {
- stopOM(ozoneManager);
+ for (OzoneManager ozoneManager : omhaService.getServices()) {
+ try {
+ stopOM(ozoneManager);
+ } catch (Exception e) {
+ LOG.warn("Failed to stop OM: {}", ozoneManager.getOMServiceId());
+ }
}
omhaService.inactiveServices().forEachRemaining(omhaService::activate);
for (OzoneManager ozoneManager : omhaService.getServices()) {
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestAddRemoveOzoneManager.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestAddRemoveOzoneManager.java
index 632974475a..1fa93cf416 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestAddRemoveOzoneManager.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestAddRemoveOzoneManager.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -27,6 +27,8 @@ import java.util.Collection;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.utils.IOUtils;
@@ -48,6 +50,8 @@ import org.apache.hadoop.util.StringUtils;
import org.apache.ozone.test.GenericTestUtils;
import org.apache.ozone.test.tag.Flaky;
import org.apache.ratis.grpc.server.GrpcLogAppender;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.leader.FollowerInfo;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
@@ -124,6 +128,13 @@ public class TestAddRemoveOzoneManager {
}
}
+ static List<String> getCurrentPeersFromRaftConf(OzoneManagerRatisServer
omRatisServer) {
+ return
omRatisServer.getServerDivision().getRaftConf().getCurrentPeers().stream()
+ .map(RaftPeer::getId)
+ .map(RaftPeerId::toString)
+ .collect(Collectors.toList());
+ }
+
private void assertNewOMExistsInPeerList(String nodeId) throws Exception {
// Check that new peer exists in all OMs peers list and also in their Ratis
// server's peer list
@@ -132,7 +143,7 @@ public class TestAddRemoveOzoneManager {
+ " not present in Peer list of OM " + om.getOMNodeId());
assertTrue(om.getOmRatisServer().doesPeerExist(nodeId), "New OM node " +
nodeId
+ " not present in Peer list of OM " + om.getOMNodeId() + "
RatisServer");
- assertThat(om.getOmRatisServer().getCurrentPeersFromRaftConf())
+ assertThat(getCurrentPeersFromRaftConf(om.getOmRatisServer()))
.withFailMessage("New OM node " + nodeId + " not present in " +
om.getOMNodeId() + "'s RaftConf")
.contains(nodeId);
}
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithAllRunning.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithAllRunning.java
index 0a8c5a6786..d55f6fd527 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithAllRunning.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithAllRunning.java
@@ -461,7 +461,7 @@ class TestOzoneManagerHAWithAllRunning extends
TestOzoneManagerHA {
OzoneManagerRatisServer ozoneManagerRatisServer =
getCluster().getOzoneManager(currentLeaderNodeId).getOmRatisServer();
- RaftServer raftServer = ozoneManagerRatisServer.getServer();
+ final RaftServer raftServer =
ozoneManagerRatisServer.getServerDivision().getRaftServer();
ClientId clientId = ClientId.randomId();
long callId = 2000L;
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithStoppedNodes.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithStoppedNodes.java
index 3b90b1b74c..99b9cac544 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithStoppedNodes.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithStoppedNodes.java
@@ -387,7 +387,8 @@ public class TestOzoneManagerHAWithStoppedNodes extends
TestOzoneManagerHA {
final RaftProperties p = getCluster()
.getOzoneManager()
.getOmRatisServer()
- .getServer()
+ .getServerDivision()
+ .getRaftServer()
.getProperties();
final TimeDuration t = RaftServerConfigKeys.Log.Appender.waitTimeMin(p);
assertEquals(TimeDuration.ZERO, t,
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerPrepare.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerPrepare.java
index 4279055129..a154633697 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerPrepare.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerPrepare.java
@@ -55,6 +55,7 @@ import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Prepare
import org.apache.ozone.test.LambdaTestUtils;
import org.apache.ozone.test.tag.Slow;
import org.apache.ozone.test.tag.Unhealthy;
+import org.apache.ratis.server.RaftServer;
import org.apache.ratis.util.ExitUtils;
import org.apache.ozone.test.tag.Flaky;
import org.junit.jupiter.api.AfterEach;
@@ -338,11 +339,9 @@ public class TestOzoneManagerPrepare extends
TestOzoneManagerHA {
}
private boolean logFilesPresentInRatisPeer(OzoneManager om) {
- String ratisDir = om.getOmRatisServer().getServer().getProperties()
- .get("raft.server.storage.dir");
- String groupIdDirName =
- om.getOmRatisServer().getServer().getGroupIds().iterator()
- .next().getUuid().toString();
+ final RaftServer.Division server =
om.getOmRatisServer().getServerDivision();
+ final String ratisDir =
server.getRaftServer().getProperties().get("raft.server.storage.dir");
+ final String groupIdDirName =
server.getGroup().getGroupId().getUuid().toString();
File logDir = Paths.get(ratisDir, groupIdDirName, "current")
.toFile();
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestTransferLeadershipShell.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestTransferLeadershipShell.java
index fa2c24a12f..3e5377ce22 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestTransferLeadershipShell.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestTransferLeadershipShell.java
@@ -25,8 +25,6 @@ import
org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl;
import org.apache.hadoop.ozone.om.OzoneManager;
-import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
-import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
@@ -135,13 +133,10 @@ public class TestTransferLeadershipShell {
assertSCMResetPriorities();
}
- private void assertOMResetPriorities() throws IOException {
- OzoneManagerRatisServer ratisServer = cluster.getOMLeader()
- .getOmRatisServer();
- RaftGroupId raftGroupId = ratisServer.getRaftGroupId();
- Collection<RaftPeer> raftPeers = ratisServer
- .getServer()
- .getDivision(raftGroupId)
+ private void assertOMResetPriorities() {
+ final Collection<RaftPeer> raftPeers = cluster.getOMLeader()
+ .getOmRatisServer()
+ .getServerDivision()
.getGroup()
.getPeers();
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index 3b9e78b276..c014723db5 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -386,8 +386,6 @@ public final class OzoneManager extends
ServiceRuntimeInfoImpl
omClientProtocolMetrics;
private OzoneManagerHttpServer httpServer;
private final OMStorage omStorage;
- private final ScmBlockLocationProtocol scmBlockClient;
- private final StorageContainerLocationProtocol scmContainerClient;
private ObjectName omInfoBeanName;
private Timer metricsTimer;
private ScheduleOMMetricsWriteTask scheduleOMMetricsWriteTask;
@@ -603,9 +601,9 @@ public final class OzoneManager extends
ServiceRuntimeInfoImpl
// Honor property 'hadoop.security.token.service.use_ip'
omRpcAddressTxt = new Text(SecurityUtil.buildTokenService(omNodeRpcAddr));
- scmContainerClient = getScmContainerClient(configuration);
+ final StorageContainerLocationProtocol scmContainerClient =
getScmContainerClient(configuration);
// verifies that the SCM info in the OM Version file is correct.
- scmBlockClient = getScmBlockClient(configuration);
+ final ScmBlockLocationProtocol scmBlockClient =
getScmBlockClient(configuration);
this.scmClient = new ScmClient(scmBlockClient, scmContainerClient,
configuration);
this.ozoneLockProvider = new OzoneLockProvider(getKeyPathLockEnabled(),
@@ -1904,7 +1902,7 @@ public final class OzoneManager extends
ServiceRuntimeInfoImpl
* change could be to add or to remove an OM from the ring.
*/
public void updatePeerList(List<String> newPeers) {
- List<String> currentPeers = omRatisServer.getPeerIds();
+ final Set<String> currentPeers = omRatisServer.getPeerIds();
// NodeIds present in new node list and not in current peer list are the
// bootstapped OMs and should be added to the peer list
@@ -2411,16 +2409,15 @@ public final class OzoneManager extends
ServiceRuntimeInfoImpl
@Override
public void cancelDelegationToken(Token<OzoneTokenIdentifier> token)
throws OMException {
- OzoneTokenIdentifier id = null;
try {
String canceller = getRemoteUser().getUserName();
- id = delegationTokenMgr.cancelToken(token, canceller);
+ final OzoneTokenIdentifier id = delegationTokenMgr.cancelToken(token,
canceller);
LOG.trace("Delegation token cancelled for dt: {}", id);
} catch (OMException oex) {
throw oex;
} catch (IOException ex) {
LOG.error("Delegation token cancellation failed for dt id: {}, cause:
{}",
- id, ex.getMessage());
+ token.getIdentifier(), ex.getMessage());
throw new OMException("Delegation token renewal failed for dt: " + token,
ex, TOKEN_ERROR_OTHER);
}
@@ -3029,40 +3026,33 @@ public final class OzoneManager extends
ServiceRuntimeInfoImpl
return "" + omRpcAddress.getPort();
}
+ private static List<List<String>> getRatisRolesException(String
exceptionString) {
+ return
Collections.singletonList(Collections.singletonList(exceptionString));
+ }
+
@Override
public List<List<String>> getRatisRoles() {
- List<ServiceInfo> serviceList;
- List<List<String>> resultList = new ArrayList<>();
- List<String> messageException = new ArrayList<>();
int port = omNodeDetails.getRatisPort();
- RaftPeer leaderId;
- if (isRatisEnabled) {
- if (null == omRatisServer) {
- messageException.add("Server is shutting down");
- resultList.add(messageException);
- return resultList;
- }
- try {
- leaderId = omRatisServer.getLeader();
- if (leaderId == null) {
- LOG.error("No leader found");
- messageException.add("Exception: Not a Leader");
- resultList.add(messageException);
- return resultList;
- }
- serviceList = getServiceList();
- } catch (IOException e) {
- LOG.error("IO-Exception Occurred", e);
- messageException.add("IO-Exception Occurred, " + e.getMessage());
- resultList.add(messageException);
- return resultList;
- }
- return OmUtils.format(serviceList, port, leaderId.getId().toString());
- } else {
- messageException.add("Ratis Disabled");
- resultList.add(messageException);
- return resultList;
+ if (!isRatisEnabled) {
+ return getRatisRolesException("Ratis is disabled");
+ }
+ if (null == omRatisServer) {
+ return getRatisRolesException("Server is shutting down");
}
+ final RaftPeerId leaderId = omRatisServer.getLeaderId();
+ if (leaderId == null) {
+ LOG.error("No leader found");
+ return getRatisRolesException("No leader found");
+ }
+
+ final List<ServiceInfo> serviceList;
+ try {
+ serviceList = getServiceList();
+ } catch (IOException e) {
+ LOG.error("Failed to getServiceList", e);
+ return getRatisRolesException("IO-Exception Occurred, " +
e.getMessage());
+ }
+ return OmUtils.format(serviceList, port, leaderId.toString());
}
/**
@@ -3212,16 +3202,15 @@ public final class OzoneManager extends
ServiceRuntimeInfoImpl
Map<String, String> auditMap = new LinkedHashMap<>();
auditMap.put("newLeaderId", newLeaderId);
try {
- RaftGroupId groupID = omRatisServer.getRaftGroup().getGroupId();
- RaftServer.Division division = omRatisServer.getServer()
- .getDivision(groupID);
- RaftPeerId targetPeerId;
+ final RaftServer.Division division = omRatisServer.getServerDivision();
+ final RaftPeerId targetPeerId;
if (newLeaderId.isEmpty()) {
- RaftPeer curLeader = omRatisServer.getLeader();
- targetPeerId = division.getGroup()
- .getPeers().stream().filter(a -> !a.equals(curLeader)).findFirst()
- .map(RaftPeer::getId).orElseThrow(() -> new IOException("Cannot" +
- " find a new leader to transfer leadership."));
+ final RaftPeerId curLeader = omRatisServer.getLeaderId();
+ targetPeerId = division.getGroup().getPeers().stream()
+ .map(RaftPeer::getId)
+ .filter(id -> !id.equals(curLeader))
+ .findFirst()
+ .orElseThrow(() -> new IOException("Cannot find a new leader to
transfer leadership."));
} else {
targetPeerId = RaftPeerId.valueOf(newLeaderId);
}
@@ -4124,24 +4113,13 @@ public final class OzoneManager extends
ServiceRuntimeInfoImpl
OzoneManagerRatisServer.RaftServerStatus raftServerStatus =
omRatisServer.checkLeaderStatus();
RaftPeerId raftPeerId = omRatisServer.getRaftPeerId();
- RaftPeerId raftLeaderId = null;
- String raftLeaderAddress = null;
- RaftPeer leader = omRatisServer.getLeader();
- if (null != leader) {
- raftLeaderId = leader.getId();
- raftLeaderAddress = omRatisServer.getRaftLeaderAddress(leader);
- }
switch (raftServerStatus) {
case LEADER_AND_READY: return;
case LEADER_AND_NOT_READY:
- throw new OMLeaderNotReadyException(
- raftPeerId.toString() + " is Leader " +
- "but not ready to process request yet.");
+ throw new OMLeaderNotReadyException(raftPeerId + " is Leader but not
ready to process request yet.");
case NOT_LEADER:
- throw raftLeaderId == null ? new OMNotLeaderException(raftPeerId) :
- new OMNotLeaderException(raftPeerId, raftLeaderId,
- raftLeaderAddress);
+ throw omRatisServer.newOMNotLeaderException();
default: throw new IllegalStateException(
"Unknown Ratis Server state: " + raftServerStatus);
}
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
index f26a5e300a..d6a5b3810c 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
@@ -31,15 +31,17 @@ import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
@@ -86,8 +88,8 @@ import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.storage.RaftStorage;
-import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.util.LifeCycle;
+import org.apache.ratis.util.MemoizedSupplier;
import org.apache.ratis.util.SizeInBytes;
import org.apache.ratis.util.StringUtils;
import org.apache.ratis.util.TimeDuration;
@@ -104,12 +106,11 @@ import static
org.apache.hadoop.util.MetricUtil.captureLatencyNs;
* Creates a Ratis server endpoint for OM.
*/
public final class OzoneManagerRatisServer {
- private static final Logger LOG = LoggerFactory
- .getLogger(OzoneManagerRatisServer.class);
+ private static final Logger LOG =
LoggerFactory.getLogger(OzoneManagerRatisServer.class);
private final int port;
- private final InetSocketAddress omRatisAddress;
private final RaftServer server;
+ private final Supplier<RaftServer.Division> serverDivision;
private final RaftGroupId raftGroupId;
private final RaftGroup raftGroup;
private final RaftPeerId raftPeerId;
@@ -144,7 +145,6 @@ public final class OzoneManagerRatisServer {
SecurityConfig secConfig, CertificateClient certClient)
throws IOException {
this.ozoneManager = om;
- this.omRatisAddress = addr;
this.port = addr.getPort();
this.ratisStorageDir = OzoneManagerRatisUtils.getOMRatisDirectory(conf);
final RaftProperties serverProperties = newRaftProperties(
@@ -166,7 +166,7 @@ public final class OzoneManagerRatisServer {
raftPeersStr.append(", ").append(peer.getAddress());
}
LOG.info("Instantiating OM Ratis server with groupID: {} and peers: {}",
- raftGroupIdStr, raftPeersStr.toString().substring(2));
+ raftGroupIdStr, raftPeersStr.substring(2));
}
this.omStateMachine = getStateMachine(conf);
@@ -179,6 +179,13 @@ public final class OzoneManagerRatisServer {
.setStateMachine(omStateMachine)
.setOption(RaftStorage.StartupOption.RECOVER)
.build();
+ this.serverDivision = MemoizedSupplier.valueOf(() -> {
+ try {
+ return server.getDivision(raftGroupId);
+ } catch (IOException e) {
+ throw new IllegalStateException("Failed to getDivision for " +
raftGroupId, e);
+ }
+ });
this.perfMetrics = om.getPerfMetrics();
}
@@ -287,10 +294,9 @@ public final class OzoneManagerRatisServer {
}
private RaftClientRequest createRaftRequest(OMRequest omRequest) {
- RaftClientRequest raftClientRequest = captureLatencyNs(
+ return captureLatencyNs(
perfMetrics.getCreateRatisRequestLatencyNs(),
() -> createRaftRequestImpl(omRequest));
- return raftClientRequest;
}
/**
@@ -341,8 +347,7 @@ public final class OzoneManagerRatisServer {
" new OM peer {} to the Ratis group {}",
ozoneManager.getOMNodeId(),
newRaftPeer, raftGroup);
- List<RaftPeer> newPeersList = new ArrayList<>();
- newPeersList.addAll(raftPeerMap.values());
+ final List<RaftPeer> newPeersList = new ArrayList<>(raftPeerMap.values());
newPeersList.add(newRaftPeer);
checkLeaderStatus();
@@ -373,9 +378,10 @@ public final class OzoneManagerRatisServer {
"remove OM peer {} from Ratis group {}",
ozoneManager.getOMNodeId(),
removeNodeId, raftGroup);
- List<RaftPeer> newPeersList = new ArrayList<>();
- newPeersList.addAll(raftPeerMap.values());
- newPeersList.remove(raftPeerMap.get(removeNodeId));
+ final List<RaftPeer> newPeersList = raftPeerMap.entrySet().stream()
+ .filter(e -> !e.getKey().equals(removeNodeId))
+ .map(Map.Entry::getValue)
+ .collect(Collectors.toList());
checkLeaderStatus();
SetConfigurationRequest request = new SetConfigurationRequest(clientId,
@@ -397,10 +403,8 @@ public final class OzoneManagerRatisServer {
/**
* Return a list of peer NodeIds.
*/
- public List<String> getPeerIds() {
- List<String> peerIds = new ArrayList<>();
- peerIds.addAll(raftPeerMap.keySet());
- return peerIds;
+ public Set<String> getPeerIds() {
+ return Collections.unmodifiableSet(raftPeerMap.keySet());
}
/**
@@ -555,8 +559,8 @@ public final class OzoneManagerRatisServer {
}
@VisibleForTesting
- public RaftServer getServer() {
- return server;
+ public RaftServer.Division getServerDivision() {
+ return serverDivision.get();
}
/**
@@ -767,25 +771,18 @@ public final class OzoneManagerRatisServer {
.getPropsMatchPrefixAndTrimPrefix(OZONE_OM_HA_PREFIX + ".");
}
- public RaftPeer getLeader() {
- try {
- RaftServer.Division division = server.getDivision(raftGroupId);
- if (division != null) {
- if (division.getInfo().isLeader()) {
- return division.getPeer();
- } else {
- ByteString leaderId = division.getInfo().getRoleInfoProto()
- .getFollowerInfo().getLeaderInfo().getId().getId();
- return leaderId.isEmpty() ? null :
- division.getRaftConf().getPeer(RaftPeerId.valueOf(leaderId));
- }
- }
- } catch (IOException e) {
- // In this case we return not a leader.
- LOG.error("Fail to get RaftServer impl and therefore it's not clear " +
- "whether it's leader. ", e);
+ public RaftPeerId getLeaderId() {
+ return getServerDivision().getInfo().getLeaderId();
+ }
+
+ public OMNotLeaderException newOMNotLeaderException() {
+ final RaftPeerId leaderId = getLeaderId();
+ final RaftPeer leader = leaderId == null ? null :
getServerDivision().getRaftConf().getPeer(leaderId);
+ if (leader == null) {
+ return new OMNotLeaderException(raftPeerId);
}
- return null;
+ final String leaderAddress = getRaftLeaderAddress(leader);
+ return new OMNotLeaderException(raftPeerId, leader.getId(), leaderAddress);
}
/**
@@ -803,40 +800,15 @@ public final class OzoneManagerRatisServer {
* @return RaftServerStatus.
*/
public RaftServerStatus checkLeaderStatus() {
- try {
- RaftServer.Division division = server.getDivision(raftGroupId);
- if (division != null) {
- if (!division.getInfo().isLeader()) {
- return RaftServerStatus.NOT_LEADER;
- } else if (division.getInfo().isLeaderReady()) {
- return RaftServerStatus.LEADER_AND_READY;
- } else {
- return RaftServerStatus.LEADER_AND_NOT_READY;
- }
- }
- } catch (IOException ioe) {
- // In this case we return not a leader.
- LOG.error("Fail to get RaftServer impl and therefore it's not clear " +
- "whether it's leader. ", ioe);
- }
- return RaftServerStatus.NOT_LEADER;
- }
-
- /**
- * Get list of peer NodeIds from Ratis.
- * @return List of Peer NodeId's.
- */
- @VisibleForTesting
- public List<String> getCurrentPeersFromRaftConf() throws IOException {
- try {
- Collection<RaftPeer> currentPeers =
- server.getDivision(raftGroupId).getRaftConf().getCurrentPeers();
- List<String> currentPeerList = new ArrayList<>();
- currentPeers.forEach(e -> currentPeerList.add(e.getId().toString()));
- return currentPeerList;
- } catch (IOException e) {
- // In this case we return not a leader.
- throw new IOException("Failed to get peer information from Ratis.", e);
+ final RaftServer.Division division = getServerDivision();
+ if (division == null) {
+ return RaftServerStatus.NOT_LEADER;
+ } else if (!division.getInfo().isLeader()) {
+ return RaftServerStatus.NOT_LEADER;
+ } else if (division.getInfo().isLeaderReady()) {
+ return RaftServerStatus.LEADER_AND_READY;
+ } else {
+ return RaftServerStatus.LEADER_AND_NOT_READY;
}
}
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
index a25261ded9..46979b6e7f 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
@@ -237,7 +237,7 @@ public class OzoneManagerStateMachine extends
BaseStateMachine {
case SUCCESS:
case SNAPSHOT_UNAVAILABLE:
// Currently, only trigger for the one who installed snapshot
- if (ozoneManager.getOmRatisServer().getServer().getPeer().equals(peer)) {
+ if
(ozoneManager.getOmRatisServer().getServerDivision().getPeer().equals(peer)) {
ozoneManager.getOmSnapshotProvider().init();
}
break;
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/upgrade/OMPrepareRequest.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/upgrade/OMPrepareRequest.java
index 456da227b5..053daa1c7c 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/upgrade/OMPrepareRequest.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/upgrade/OMPrepareRequest.java
@@ -99,9 +99,7 @@ public class OMPrepareRequest extends OMClientRequest {
doubleBuffer.add(response, termIndex);
OzoneManagerRatisServer omRatisServer = ozoneManager.getOmRatisServer();
- RaftServer.Division division =
- omRatisServer.getServer()
- .getDivision(omRatisServer.getRaftGroup().getGroupId());
+ final RaftServer.Division division = omRatisServer.getServerDivision();
// Wait for outstanding double buffer entries to flush to disk,
// so they will not be purged from the log before being persisted to
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
index a33208a6d3..bebb2b4324 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
@@ -39,7 +39,6 @@ import org.apache.hadoop.ozone.om.OMPerformanceMetrics;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.exceptions.OMLeaderNotReadyException;
-import org.apache.hadoop.ozone.om.exceptions.OMNotLeaderException;
import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
import org.apache.hadoop.ozone.om.ratis.OzoneManagerDoubleBuffer;
import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
@@ -57,7 +56,6 @@ import com.google.protobuf.ProtocolMessageEnum;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.ozone.security.S3SecurityUtil;
-import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.util.ExitUtils;
import org.slf4j.Logger;
@@ -272,32 +270,12 @@ public class OzoneManagerProtocolServerSideTranslatorPB
implements
private ServiceException createLeaderErrorException(
RaftServerStatus raftServerStatus) {
if (raftServerStatus == NOT_LEADER) {
- return createNotLeaderException();
+ return new ServiceException(omRatisServer.newOMNotLeaderException());
} else {
return createLeaderNotReadyException();
}
}
- private ServiceException createNotLeaderException() {
- RaftPeerId raftPeerId = omRatisServer.getRaftPeerId();
- RaftPeerId raftLeaderId = null;
- String raftLeaderAddress = null;
- RaftPeer leader = omRatisServer.getLeader();
- if (null != leader) {
- raftLeaderId = leader.getId();
- raftLeaderAddress = omRatisServer.getRaftLeaderAddress(leader);
- }
-
- OMNotLeaderException notLeaderException =
- raftLeaderId == null ? new OMNotLeaderException(raftPeerId) :
- new OMNotLeaderException(raftPeerId, raftLeaderId,
- raftLeaderAddress);
-
- LOG.debug(notLeaderException.getMessage());
-
- return new ServiceException(notLeaderException);
- }
-
private ServiceException createLeaderNotReadyException() {
RaftPeerId raftPeerId = omRatisServer.getRaftPeerId();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org