This is an automated email from the ASF dual-hosted git repository.
licheng pushed a commit to branch HDDS-2823
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-2823 by this push:
new 0aa9ba3 HDDS-3988: DN can distinguish SCMCommand from stale leader
SCM (#1314)
0aa9ba3 is described below
commit 0aa9ba3e354193855e43b488d325cd64137c5648
Author: GlenGeng <[email protected]>
AuthorDate: Mon Dec 7 13:10:39 2020 +0800
HDDS-3988: DN can distinguish SCMCommand from stale leader SCM (#1314)
* HDDS-3988: DN can distinguish SCMCommand from stale leader SCM.
* HDDS-3988: fix comments
---
.../common/statemachine/StateContext.java | 94 +++++++++++++++++++++-
.../states/endpoint/HeartbeatEndpointTask.java | 18 +++++
.../ozone/container/ozoneimpl/OzoneContainer.java | 8 ++
.../hadoop/ozone/protocol/commands/SCMCommand.java | 22 ++++-
.../proto/ScmServerDatanodeHeartbeatProtocol.proto | 6 ++
.../apache/hadoop/hdds/scm/ha/SCMHAManager.java | 9 ++-
.../hadoop/hdds/scm/ha/SCMHAManagerImpl.java | 26 +++---
.../hadoop/hdds/scm/node/SCMNodeManager.java | 29 ++++++-
.../hdds/scm/pipeline/PipelineManagerV2Impl.java | 11 ++-
.../hdds/scm/server/SCMDatanodeProtocolServer.java | 5 ++
.../hdds/scm/server/StorageContainerManager.java | 4 +-
.../hadoop/hdds/scm/ha/MockSCMHAManager.java | 5 +-
pom.xml | 2 +-
13 files changed, 208 insertions(+), 31 deletions(-)
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
index 4cd769f..f39755f 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
@@ -23,6 +23,7 @@ import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -31,6 +32,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -81,6 +83,18 @@ public class StateContext {
private final AtomicLong threadPoolNotAvailableCount;
/**
+ * term of latest leader SCM, extract from SCMCommand.
+ *
+ * Only leader SCM (both latest and stale) can send out SCMCommand,
+ * which will save its term in SCMCommand. Since latest leader SCM
+ * always has the highest term, term can be used to detect SCMCommand
+ * from stale leader SCM.
+ *
+ * For non-HA mode, term of SCMCommand will be 0.
+ */
+ private Optional<Long> termOfLeaderSCM = Optional.empty();
+
+ /**
* Starting with a 2 sec heartbeat frequency which will be updated to the
* real HB frequency after scm registration. With this method the
* initial registration could be significant faster.
@@ -471,6 +485,65 @@ public class StateContext {
}
/**
+ * After startup, datanode needs to detect the latest leader SCM before handling
+ * any SCMCommand, so that it won't be disturbed by stale leader SCM.
+ *
+ * The rule is: after majority SCMs are in HEARTBEAT state and have
+ * heard from leader SCMs (commandQueue is not empty), datanode will init
+ * termOfLeaderSCM with the max term found in commandQueue.
+ *
+ * The init process also works for non-HA mode. In that case, term of all
+ * SCMCommands will be 0.
+ */
+ private void initTermOfLeaderSCM() {
+ // only init once
+ if (termOfLeaderSCM.isPresent()) {
+ return;
+ }
+
+ AtomicInteger scmNum = new AtomicInteger(0);
+ AtomicInteger activeScmNum = new AtomicInteger(0);
+
+ getParent().getConnectionManager().getValues()
+ .forEach(endpoint -> {
+ if (endpoint.isPassive()) {
+ return;
+ }
+ scmNum.incrementAndGet();
+ if (endpoint.getState()
+ == EndpointStateMachine.EndPointStates.HEARTBEAT) {
+ activeScmNum.incrementAndGet();
+ }
+ });
+
+ // majority SCMs should be in HEARTBEAT state.
+ if (activeScmNum.get() < scmNum.get() / 2 + 1) {
+ return;
+ }
+
+ // if commandQueue is not empty, init termOfLeaderSCM
+ // with the largest term found in commandQueue
+ commandQueue.stream()
+ .mapToLong(SCMCommand::getTerm)
+ .max()
+ .ifPresent(term -> termOfLeaderSCM = Optional.of(term));
+ }
+
+ /**
+ * monotonically increase termOfLeaderSCM.
+ * Always record the latest term that has been seen.
+ */
+ private void updateTermOfLeaderSCM(SCMCommand<?> command) {
+ if (!termOfLeaderSCM.isPresent()) {
+ LOG.error("should init termOfLeaderSCM before update it.");
+ return;
+ }
+
+ termOfLeaderSCM = Optional.of(
+ Long.max(termOfLeaderSCM.get(), command.getTerm()));
+ }
+
+ /**
* Returns the next command or null if it is empty.
*
* @return SCMCommand or Null.
@@ -478,7 +551,26 @@ public class StateContext {
public SCMCommand getNextCommand() {
lock.lock();
try {
- return commandQueue.poll();
+ initTermOfLeaderSCM();
+ if (!termOfLeaderSCM.isPresent()) {
+ return null; // not ready yet
+ }
+
+ while (true) {
+ SCMCommand<?> command = commandQueue.poll();
+ if (command == null) {
+ return null;
+ }
+
+ updateTermOfLeaderSCM(command);
+ if (command.getTerm() == termOfLeaderSCM.get()) {
+ return command;
+ }
+
+ LOG.warn("Detect and drop a SCMCommand {} from stale leader SCM," +
+ " stale term {}, latest term {}.",
+ command, command.getTerm(), termOfLeaderSCM.get());
+ }
} finally {
lock.unlock();
}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
index da2034d..eac7b37 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
@@ -272,6 +272,9 @@ public class HeartbeatEndpointTask
DeleteBlocksCommand db = DeleteBlocksCommand
.getFromProtobuf(
commandResponseProto.getDeleteBlocksCommandProto());
+ if (commandResponseProto.hasTerm()) {
+ db.setTerm(commandResponseProto.getTerm());
+ }
if (!db.blocksTobeDeleted().isEmpty()) {
if (LOG.isDebugEnabled()) {
LOG.debug(DeletedContainerBlocksSummary
@@ -285,6 +288,9 @@ public class HeartbeatEndpointTask
CloseContainerCommand closeContainer =
CloseContainerCommand.getFromProtobuf(
commandResponseProto.getCloseContainerCommandProto());
+ if (commandResponseProto.hasTerm()) {
+ closeContainer.setTerm(commandResponseProto.getTerm());
+ }
if (LOG.isDebugEnabled()) {
LOG.debug("Received SCM container close request for container {}",
closeContainer.getContainerID());
@@ -295,6 +301,9 @@ public class HeartbeatEndpointTask
ReplicateContainerCommand replicateContainerCommand =
ReplicateContainerCommand.getFromProtobuf(
commandResponseProto.getReplicateContainerCommandProto());
+ if (commandResponseProto.hasTerm()) {
+ replicateContainerCommand.setTerm(commandResponseProto.getTerm());
+ }
if (LOG.isDebugEnabled()) {
LOG.debug("Received SCM container replicate request for container {}",
replicateContainerCommand.getContainerID());
@@ -305,6 +314,9 @@ public class HeartbeatEndpointTask
DeleteContainerCommand deleteContainerCommand =
DeleteContainerCommand.getFromProtobuf(
commandResponseProto.getDeleteContainerCommandProto());
+ if (commandResponseProto.hasTerm()) {
+ deleteContainerCommand.setTerm(commandResponseProto.getTerm());
+ }
if (LOG.isDebugEnabled()) {
LOG.debug("Received SCM delete container request for container {}",
deleteContainerCommand.getContainerID());
@@ -315,6 +327,9 @@ public class HeartbeatEndpointTask
CreatePipelineCommand createPipelineCommand =
CreatePipelineCommand.getFromProtobuf(
commandResponseProto.getCreatePipelineCommandProto());
+ if (commandResponseProto.hasTerm()) {
+ createPipelineCommand.setTerm(commandResponseProto.getTerm());
+ }
if (LOG.isDebugEnabled()) {
LOG.debug("Received SCM create pipeline request {}",
createPipelineCommand.getPipelineID());
@@ -325,6 +340,9 @@ public class HeartbeatEndpointTask
ClosePipelineCommand closePipelineCommand =
ClosePipelineCommand.getFromProtobuf(
commandResponseProto.getClosePipelineCommandProto());
+ if (commandResponseProto.hasTerm()) {
+ closePipelineCommand.setTerm(commandResponseProto.getTerm());
+ }
if (LOG.isDebugEnabled()) {
LOG.debug("Received SCM close pipeline request {}",
closePipelineCommand.getPipelineID());
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
index a44ef38..5fd1690 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
@@ -25,6 +25,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
@@ -85,6 +86,7 @@ public class OzoneContainer {
private List<ContainerDataScanner> dataScanners;
private final BlockDeletingService blockDeletingService;
private final GrpcTlsConfig tlsClientConfig;
+ private final AtomicBoolean isStarted;
/**
* Construct OzoneContainer object.
@@ -152,6 +154,8 @@ public class OzoneContainer {
TimeUnit.MILLISECONDS, config);
tlsClientConfig = RatisHelper.createTlsClientConfig(
secConf, certClient != null ? certClient.getCACertificate() : null);
+
+ isStarted = new AtomicBoolean(false);
}
public GrpcTlsConfig getTlsClientConfig() {
@@ -240,6 +244,10 @@ public class OzoneContainer {
* @throws IOException
*/
public void start(String scmId) throws IOException {
+ if (!isStarted.compareAndSet(false, true)) {
+ LOG.info("Ignore. OzoneContainer already started.");
+ return;
+ }
LOG.info("Attempting to start container services.");
startContainerScrub();
writeChannel.start();
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java
index 3c4e05b..4d87bb0 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java
@@ -30,7 +30,13 @@ import
org.apache.hadoop.hdds.server.events.IdentifiableEventPayload;
*/
public abstract class SCMCommand<T extends GeneratedMessage> implements
IdentifiableEventPayload {
- private long id;
+ private final long id;
+
+ // Under HA mode, holds term of underlying RaftServer iff current
+ // SCM is a leader, otherwise, holds term 0.
+ // Note that the first elected leader is from term 1, term 0,
+ // as the initial value of currentTerm, is never used under HA mode.
+ private long term = 0;
SCMCommand() {
this.id = HddsIdFactory.getLongId();
@@ -59,4 +65,18 @@ public abstract class SCMCommand<T extends GeneratedMessage>
implements
return id;
}
+ /**
+ * Get term of this command.
+ * @return term
+ */
+ public long getTerm() {
+ return term;
+ }
+
+ /**
+ * Set term of this command.
+ */
+ public void setTerm(long term) {
+ this.term = term;
+ }
}
diff --git
a/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
b/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
index 4f610ff..973789a 100644
---
a/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
+++
b/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
@@ -303,6 +303,12 @@ message SCMCommandProto {
optional ReplicateContainerCommandProto replicateContainerCommandProto = 6;
optional CreatePipelineCommandProto createPipelineCommandProto = 7;
optional ClosePipelineCommandProto closePipelineCommandProto = 8;
+
+ // Under HA mode, holds term of underlying RaftServer iff current
+ // SCM is a leader, otherwise, holds term 0.
+ // Note that the first elected leader is from term 1, term 0,
+ // as the initial value of currentTerm, is never used under HA mode.
+ optional uint64 term = 15;
}
/**
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java
index 8ee26a2..0fd5e82 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java
@@ -22,6 +22,7 @@ import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.exceptions.NotLeaderException;
import java.io.IOException;
+import java.util.Optional;
/**
* SCMHAManager provides HA service for SCM.
@@ -34,9 +35,13 @@ public interface SCMHAManager {
void start() throws IOException;
/**
- * Returns true if the current SCM is the leader.
+ * For HA mode, return an Optional that holds term of the
+ * underlying RaftServer iff current SCM is in leader role.
+ * Otherwise, return an empty optional.
+ *
+ * For non-HA mode, return an Optional that holds term 0.
*/
- boolean isLeader();
+ Optional<Long> isLeader();
/**
* Returns RatisServer instance associated with the SCM instance.
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
index 33f408d..5271ac6 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
@@ -21,6 +21,7 @@ import com.google.common.base.Preconditions;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
@@ -32,6 +33,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.Optional;
/**
* SCMHAManagerImpl uses Apache Ratis for HA implementation. We will have 2N+1
@@ -70,29 +72,28 @@ public class SCMHAManagerImpl implements SCMHAManager {
* {@inheritDoc}
*/
@Override
- public boolean isLeader() {
+ public Optional<Long> isLeader() {
if (!SCMHAUtils.isSCMHAEnabled(conf)) {
// When SCM HA is not enabled, the current SCM is always the leader.
- return true;
+ return Optional.of((long)0);
}
RaftServer server = ratisServer.getServer();
Preconditions.checkState(server instanceof RaftServerProxy);
- RaftServerImpl serverImpl = null;
try {
// SCM only has one raft group.
- serverImpl = ((RaftServerProxy) server)
+ RaftServerImpl serverImpl = ((RaftServerProxy) server)
.getImpl(ratisServer.getRaftGroupId());
if (serverImpl != null) {
- // Only when it's sure the current SCM is the leader, otherwise
- // it should all return false.
- return serverImpl.isLeader();
+ RaftProtos.RoleInfoProto roleInfoProto = serverImpl.getRoleInfoProto();
+ return roleInfoProto.hasLeaderInfo()
+ ? Optional.of(roleInfoProto.getLeaderInfo().getTerm())
+ : Optional.empty();
}
} catch (IOException ioe) {
LOG.error("Fail to get RaftServer impl and therefore it's not clear " +
"whether it's leader. ", ioe);
}
-
- return false;
+ return Optional.empty();
}
/**
@@ -104,11 +105,6 @@ public class SCMHAManagerImpl implements SCMHAManager {
}
private RaftPeerId getPeerIdFromRoleInfo(RaftServerImpl serverImpl) {
- /*
- TODO: Fix Me
- Ratis API has changed.
- RaftServerImpl#getRoleInfoProto is no more public.
-
if (serverImpl.isLeader()) {
return RaftPeerId.getRaftPeerId(
serverImpl.getRoleInfoProto().getLeaderInfo().toString());
@@ -119,8 +115,6 @@ public class SCMHAManagerImpl implements SCMHAManager {
} else {
return null;
}
- */
- return null;
}
@Override
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index 328f271..89fd99e 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.Collections;
@@ -47,6 +48,7 @@ import org.apache.hadoop.hdds.scm.VersionInfo;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
+import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
import org.apache.hadoop.hdds.scm.net.NetworkTopology;
import org.apache.hadoop.hdds.scm.node.states.NodeAlreadyExistsException;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
@@ -106,13 +108,16 @@ public class SCMNodeManager implements NodeManager {
new ConcurrentHashMap<>();
private final int numPipelinesPerMetadataVolume;
private final int heavyNodeCriteria;
+ private final SCMHAManager scmhaManager;
/**
* Constructs SCM machine Manager.
*/
public SCMNodeManager(OzoneConfiguration conf,
- SCMStorageConfig scmStorageConfig, EventPublisher eventPublisher,
- NetworkTopology networkTopology) {
+ SCMStorageConfig scmStorageConfig,
+ EventPublisher eventPublisher,
+ NetworkTopology networkTopology,
+ SCMHAManager scmhaManager) {
this.nodeStateManager = new NodeStateManager(conf, eventPublisher);
this.version = VersionInfo.getLatestVersion();
this.commandQueue = new CommandQueue();
@@ -138,6 +143,14 @@ public class SCMNodeManager implements NodeManager {
ScmConfigKeys.OZONE_SCM_PIPELINE_PER_METADATA_VOLUME_DEFAULT);
String dnLimit = conf.get(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT);
this.heavyNodeCriteria = dnLimit == null ? 0 : Integer.parseInt(dnLimit);
+ this.scmhaManager = scmhaManager;
+ }
+
+ public SCMNodeManager(OzoneConfiguration conf,
+ SCMStorageConfig scmStorageConfig,
+ EventPublisher eventPublisher,
+ NetworkTopology networkTopology) {
+ this(conf, scmStorageConfig, eventPublisher, networkTopology, null);
}
private void registerMXBean() {
@@ -658,6 +671,18 @@ public class SCMNodeManager implements NodeManager {
// Refactor and remove all the usage of this method and delete this method.
@Override
public void addDatanodeCommand(UUID dnId, SCMCommand command) {
+ if (scmhaManager != null && command.getTerm() == 0) {
+ Optional<Long> termOpt = scmhaManager.isLeader();
+
+ if (!termOpt.isPresent()) {
+ LOG.warn("Not leader, drop SCMCommand {}.", command);
+ return;
+ }
+
+ LOG.warn("Help set term {} for SCMCommand {}. It is not an accurate " +
+ "way to set term of SCMCommand.", termOpt.get(), command);
+ command.setTerm(termOpt.get());
+ }
this.commandQueue.addCommand(dnId, command);
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java
index 041c941..48fbdbf 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java
@@ -49,6 +49,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -637,13 +638,15 @@ public final class PipelineManagerV2Impl implements
PipelineManager {
}
/**
- * Check if scm is current leader.
- * @throws NotLeaderException when it's not the current leader.
+ * return term of underlying RaftServer if role of SCM is leader.
+ * @throws NotLeaderException when it's not leader.
*/
- private void checkLeader() throws NotLeaderException {
- if (!scmhaManager.isLeader()) {
+ private long checkLeader() throws NotLeaderException {
+ Optional<Long> termOpt = scmhaManager.isLeader();
+ if (!termOpt.isPresent()) {
throw scmhaManager.triggerNotLeaderException();
}
+ return termOpt.get();
}
private void setBackgroundPipelineCreator(
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
index a295341..b71f906 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
@@ -292,6 +292,11 @@ public class SCMDatanodeProtocolServer implements
throws IOException {
SCMCommandProto.Builder builder =
SCMCommandProto.newBuilder();
+
+ // In HA mode, it is the term of current leader SCM.
+ // In non-HA mode, it is the default value 0.
+ builder.setTerm(cmd.getTerm());
+
switch (cmd.getType()) {
case reregisterCommand:
return builder
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 5843d5a..501472d 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -425,7 +425,7 @@ public final class StorageContainerManager extends
ServiceRuntimeInfoImpl
scmNodeManager = configurator.getScmNodeManager();
} else {
scmNodeManager = new SCMNodeManager(
- conf, scmStorageConfig, eventQueue, clusterMap);
+ conf, scmStorageConfig, eventQueue, clusterMap, scmHAManager);
}
placementMetrics = SCMContainerPlacementMetrics.create();
@@ -1027,7 +1027,7 @@ public final class StorageContainerManager extends
ServiceRuntimeInfoImpl
* @return - if the current scm is the leader.
*/
public boolean checkLeader() {
- return scmHAManager.isLeader();
+ return scmHAManager.isLeader().isPresent();
}
/**
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/MockSCMHAManager.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/MockSCMHAManager.java
index ac58438..ab329a5 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/MockSCMHAManager.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/MockSCMHAManager.java
@@ -24,6 +24,7 @@ import java.util.Arrays;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;
@@ -79,8 +80,8 @@ public final class MockSCMHAManager implements SCMHAManager {
* {@inheritDoc}
*/
@Override
- public boolean isLeader() {
- return isLeader;
+ public Optional<Long> isLeader() {
+ return isLeader ? Optional.of((long)0) : Optional.empty();
}
public void setIsLeader(boolean isLeader) {
diff --git a/pom.xml b/pom.xml
index 05c34d5..d7f9a06 100644
--- a/pom.xml
+++ b/pom.xml
@@ -79,7 +79,7 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xs
<declared.ozone.version>${ozone.version}</declared.ozone.version>
<!-- Apache Ratis version -->
- <ratis.version>1.1.0-913f5a4-SNAPSHOT</ratis.version>
+ <ratis.version>1.1.0-4573fb7-SNAPSHOT</ratis.version>
<!-- Apache Ratis thirdparty version -->
<ratis.thirdparty.version>0.6.0-SNAPSHOT</ratis.thirdparty.version>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]