This is an automated email from the ASF dual-hosted git repository.

licheng pushed a commit to branch HDDS-2823
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-2823 by this push:
     new 0aa9ba3  HDDS-3988: DN can distinguish SCMCommand from stale leader 
SCM (#1314)
0aa9ba3 is described below

commit 0aa9ba3e354193855e43b488d325cd64137c5648
Author: GlenGeng <[email protected]>
AuthorDate: Mon Dec 7 13:10:39 2020 +0800

    HDDS-3988: DN can distinguish SCMCommand from stale leader SCM (#1314)
    
    * HDDS-3988: DN can distinguish SCMCommand from stale leader SCM.
    
    * HDDS-3988: fix comments
---
 .../common/statemachine/StateContext.java          | 94 +++++++++++++++++++++-
 .../states/endpoint/HeartbeatEndpointTask.java     | 18 +++++
 .../ozone/container/ozoneimpl/OzoneContainer.java  |  8 ++
 .../hadoop/ozone/protocol/commands/SCMCommand.java | 22 ++++-
 .../proto/ScmServerDatanodeHeartbeatProtocol.proto |  6 ++
 .../apache/hadoop/hdds/scm/ha/SCMHAManager.java    |  9 ++-
 .../hadoop/hdds/scm/ha/SCMHAManagerImpl.java       | 26 +++---
 .../hadoop/hdds/scm/node/SCMNodeManager.java       | 29 ++++++-
 .../hdds/scm/pipeline/PipelineManagerV2Impl.java   | 11 ++-
 .../hdds/scm/server/SCMDatanodeProtocolServer.java |  5 ++
 .../hdds/scm/server/StorageContainerManager.java   |  4 +-
 .../hadoop/hdds/scm/ha/MockSCMHAManager.java       |  5 +-
 pom.xml                                            |  2 +-
 13 files changed, 208 insertions(+), 31 deletions(-)

diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
index 4cd769f..f39755f 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
@@ -23,6 +23,7 @@ import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -31,6 +32,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -81,6 +83,18 @@ public class StateContext {
   private final AtomicLong threadPoolNotAvailableCount;
 
   /**
+   * term of the latest leader SCM, extracted from SCMCommand.
+   *
+   * Only leader SCM (both latest and stale) can send out SCMCommand,
+   * which will save its term in SCMCommand. Since latest leader SCM
+   * always has the highest term, term can be used to detect SCMCommand
+   * from stale leader SCM.
+   *
+   * For non-HA mode, term of SCMCommand will be 0.
+   */
+  private Optional<Long> termOfLeaderSCM = Optional.empty();
+
+  /**
    * Starting with a 2 sec heartbeat frequency which will be updated to the
    * real HB frequency after scm registration. With this method the
    * initial registration could be significant faster.
@@ -471,6 +485,65 @@ public class StateContext {
   }
 
   /**
+   * After startup, the datanode needs to detect the latest leader SCM before
+   * handling any SCMCommand, so it won't be disturbed by a stale leader SCM.
+   *
+   * The rule is: after a majority of SCMs are in HEARTBEAT state and the
+   * datanode has heard from a leader SCM (commandQueue is not empty), the
+   * datanode will init termOfLeaderSCM with the max term found in commandQueue.
+   *
+   * The init process also works for non-HA mode. In that case, term of all
+   * SCMCommands will be 0.
+   */
+  private void initTermOfLeaderSCM() {
+    // only init once
+    if (termOfLeaderSCM.isPresent()) {
+      return;
+    }
+
+    AtomicInteger scmNum = new AtomicInteger(0);
+    AtomicInteger activeScmNum = new AtomicInteger(0);
+
+    getParent().getConnectionManager().getValues()
+        .forEach(endpoint -> {
+          if (endpoint.isPassive()) {
+            return;
+          }
+          scmNum.incrementAndGet();
+          if (endpoint.getState()
+              == EndpointStateMachine.EndPointStates.HEARTBEAT) {
+            activeScmNum.incrementAndGet();
+          }
+        });
+
+    // majority SCMs should be in HEARTBEAT state.
+    if (activeScmNum.get() < scmNum.get() / 2 + 1) {
+      return;
+    }
+
+    // if commandQueue is not empty, init termOfLeaderSCM
+    // with the largest term found in commandQueue
+    commandQueue.stream()
+        .mapToLong(SCMCommand::getTerm)
+        .max()
+        .ifPresent(term -> termOfLeaderSCM = Optional.of(term));
+  }
+
+  /**
+   * Monotonically increases termOfLeaderSCM.
+   * Always records the latest term that has been seen.
+   */
+  private void updateTermOfLeaderSCM(SCMCommand<?> command) {
+    if (!termOfLeaderSCM.isPresent()) {
+      LOG.error("should init termOfLeaderSCM before update it.");
+      return;
+    }
+
+    termOfLeaderSCM = Optional.of(
+        Long.max(termOfLeaderSCM.get(), command.getTerm()));
+  }
+
+  /**
    * Returns the next command or null if it is empty.
    *
    * @return SCMCommand or Null.
@@ -478,7 +551,26 @@ public class StateContext {
   public SCMCommand getNextCommand() {
     lock.lock();
     try {
-      return commandQueue.poll();
+      initTermOfLeaderSCM();
+      if (!termOfLeaderSCM.isPresent()) {
+        return null;      // not ready yet
+      }
+
+      while (true) {
+        SCMCommand<?> command = commandQueue.poll();
+        if (command == null) {
+          return null;
+        }
+
+        updateTermOfLeaderSCM(command);
+        if (command.getTerm() == termOfLeaderSCM.get()) {
+          return command;
+        }
+
+        LOG.warn("Detect and drop a SCMCommand {} from stale leader SCM," +
+            " stale term {}, latest term {}.",
+            command, command.getTerm(), termOfLeaderSCM.get());
+      }
     } finally {
       lock.unlock();
     }
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
index da2034d..eac7b37 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
@@ -272,6 +272,9 @@ public class HeartbeatEndpointTask
         DeleteBlocksCommand db = DeleteBlocksCommand
             .getFromProtobuf(
                 commandResponseProto.getDeleteBlocksCommandProto());
+        if (commandResponseProto.hasTerm()) {
+          db.setTerm(commandResponseProto.getTerm());
+        }
         if (!db.blocksTobeDeleted().isEmpty()) {
           if (LOG.isDebugEnabled()) {
             LOG.debug(DeletedContainerBlocksSummary
@@ -285,6 +288,9 @@ public class HeartbeatEndpointTask
         CloseContainerCommand closeContainer =
             CloseContainerCommand.getFromProtobuf(
                 commandResponseProto.getCloseContainerCommandProto());
+        if (commandResponseProto.hasTerm()) {
+          closeContainer.setTerm(commandResponseProto.getTerm());
+        }
         if (LOG.isDebugEnabled()) {
           LOG.debug("Received SCM container close request for container {}",
               closeContainer.getContainerID());
@@ -295,6 +301,9 @@ public class HeartbeatEndpointTask
         ReplicateContainerCommand replicateContainerCommand =
             ReplicateContainerCommand.getFromProtobuf(
                 commandResponseProto.getReplicateContainerCommandProto());
+        if (commandResponseProto.hasTerm()) {
+          replicateContainerCommand.setTerm(commandResponseProto.getTerm());
+        }
         if (LOG.isDebugEnabled()) {
           LOG.debug("Received SCM container replicate request for container 
{}",
               replicateContainerCommand.getContainerID());
@@ -305,6 +314,9 @@ public class HeartbeatEndpointTask
         DeleteContainerCommand deleteContainerCommand =
             DeleteContainerCommand.getFromProtobuf(
                 commandResponseProto.getDeleteContainerCommandProto());
+        if (commandResponseProto.hasTerm()) {
+          deleteContainerCommand.setTerm(commandResponseProto.getTerm());
+        }
         if (LOG.isDebugEnabled()) {
           LOG.debug("Received SCM delete container request for container {}",
               deleteContainerCommand.getContainerID());
@@ -315,6 +327,9 @@ public class HeartbeatEndpointTask
         CreatePipelineCommand createPipelineCommand =
             CreatePipelineCommand.getFromProtobuf(
                 commandResponseProto.getCreatePipelineCommandProto());
+        if (commandResponseProto.hasTerm()) {
+          createPipelineCommand.setTerm(commandResponseProto.getTerm());
+        }
         if (LOG.isDebugEnabled()) {
           LOG.debug("Received SCM create pipeline request {}",
               createPipelineCommand.getPipelineID());
@@ -325,6 +340,9 @@ public class HeartbeatEndpointTask
         ClosePipelineCommand closePipelineCommand =
             ClosePipelineCommand.getFromProtobuf(
                 commandResponseProto.getClosePipelineCommandProto());
+        if (commandResponseProto.hasTerm()) {
+          closePipelineCommand.setTerm(commandResponseProto.getTerm());
+        }
         if (LOG.isDebugEnabled()) {
           LOG.debug("Received SCM close pipeline request {}",
               closePipelineCommand.getPipelineID());
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
index a44ef38..5fd1690 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
@@ -25,6 +25,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
 
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
@@ -85,6 +86,7 @@ public class OzoneContainer {
   private List<ContainerDataScanner> dataScanners;
   private final BlockDeletingService blockDeletingService;
   private final GrpcTlsConfig tlsClientConfig;
+  private final AtomicBoolean isStarted;
 
   /**
    * Construct OzoneContainer object.
@@ -152,6 +154,8 @@ public class OzoneContainer {
             TimeUnit.MILLISECONDS, config);
     tlsClientConfig = RatisHelper.createTlsClientConfig(
         secConf, certClient != null ? certClient.getCACertificate() : null);
+
+    isStarted = new AtomicBoolean(false);
   }
 
   public GrpcTlsConfig getTlsClientConfig() {
@@ -240,6 +244,10 @@ public class OzoneContainer {
    * @throws IOException
    */
   public void start(String scmId) throws IOException {
+    if (!isStarted.compareAndSet(false, true)) {
+      LOG.info("Ignore. OzoneContainer already started.");
+      return;
+    }
     LOG.info("Attempting to start container services.");
     startContainerScrub();
     writeChannel.start();
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java
index 3c4e05b..4d87bb0 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java
@@ -30,7 +30,13 @@ import 
org.apache.hadoop.hdds.server.events.IdentifiableEventPayload;
  */
 public abstract class SCMCommand<T extends GeneratedMessage> implements
     IdentifiableEventPayload {
-  private long id;
+  private final long id;
+
+  // Under HA mode, holds term of underlying RaftServer iff current
+  // SCM is a leader, otherwise, holds term 0.
+  // Note that the first elected leader starts from term 1; term 0,
+  // as the initial value of currentTerm, is never used under HA mode.
+  private long term = 0;
 
   SCMCommand() {
     this.id = HddsIdFactory.getLongId();
@@ -59,4 +65,18 @@ public abstract class SCMCommand<T extends GeneratedMessage> 
implements
     return id;
   }
 
+  /**
+   * Get term of this command.
+   * @return term
+   */
+  public long getTerm() {
+    return term;
+  }
+
+  /**
+   * Set term of this command.
+   */
+  public void setTerm(long term) {
+    this.term = term;
+  }
 }
diff --git 
a/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
 
b/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
index 4f610ff..973789a 100644
--- 
a/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
+++ 
b/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
@@ -303,6 +303,12 @@ message SCMCommandProto {
   optional ReplicateContainerCommandProto replicateContainerCommandProto = 6;
   optional CreatePipelineCommandProto createPipelineCommandProto = 7;
   optional ClosePipelineCommandProto closePipelineCommandProto = 8;
+
+  // Under HA mode, holds term of underlying RaftServer iff current
+  // SCM is a leader, otherwise, holds term 0.
+  // Note that the first elected leader starts from term 1; term 0,
+  // as the initial value of currentTerm, is never used under HA mode.
+  optional uint64 term = 15;
 }
 
 /**
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java
index 8ee26a2..0fd5e82 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java
@@ -22,6 +22,7 @@ import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.exceptions.NotLeaderException;
 
 import java.io.IOException;
+import java.util.Optional;
 
 /**
  * SCMHAManager provides HA service for SCM.
@@ -34,9 +35,13 @@ public interface SCMHAManager {
   void start() throws IOException;
 
   /**
-   * Returns true if the current SCM is the leader.
+   * For HA mode, return an Optional that holds term of the
+   * underlying RaftServer iff current SCM is in leader role.
+   * Otherwise, return an empty optional.
+   *
+   * For non-HA mode, return an Optional that holds term 0.
    */
-  boolean isLeader();
+  Optional<Long> isLeader();
 
   /**
    * Returns RatisServer instance associated with the SCM instance.
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
index 33f408d..5271ac6 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
@@ -21,6 +21,7 @@ import com.google.common.base.Preconditions;
 import java.util.List;
 import java.util.stream.Collectors;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.ratis.proto.RaftProtos;
 import org.apache.ratis.protocol.RaftGroupMemberId;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
@@ -32,6 +33,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.Optional;
 
 /**
  * SCMHAManagerImpl uses Apache Ratis for HA implementation. We will have 2N+1
@@ -70,29 +72,28 @@ public class SCMHAManagerImpl implements SCMHAManager {
    * {@inheritDoc}
    */
   @Override
-  public boolean isLeader() {
+  public Optional<Long> isLeader() {
     if (!SCMHAUtils.isSCMHAEnabled(conf)) {
       // When SCM HA is not enabled, the current SCM is always the leader.
-      return true;
+      return Optional.of((long)0);
     }
     RaftServer server = ratisServer.getServer();
     Preconditions.checkState(server instanceof RaftServerProxy);
-    RaftServerImpl serverImpl = null;
     try {
       // SCM only has one raft group.
-      serverImpl = ((RaftServerProxy) server)
+      RaftServerImpl serverImpl = ((RaftServerProxy) server)
           .getImpl(ratisServer.getRaftGroupId());
       if (serverImpl != null) {
-        // Only when it's sure the current SCM is the leader, otherwise
-        // it should all return false.
-        return serverImpl.isLeader();
+        RaftProtos.RoleInfoProto roleInfoProto = serverImpl.getRoleInfoProto();
+        return roleInfoProto.hasLeaderInfo()
+            ? Optional.of(roleInfoProto.getLeaderInfo().getTerm())
+            : Optional.empty();
       }
     } catch (IOException ioe) {
       LOG.error("Fail to get RaftServer impl and therefore it's not clear " +
           "whether it's leader. ", ioe);
     }
-
-    return false;
+    return Optional.empty();
   }
 
   /**
@@ -104,11 +105,6 @@ public class SCMHAManagerImpl implements SCMHAManager {
   }
 
   private RaftPeerId getPeerIdFromRoleInfo(RaftServerImpl serverImpl) {
-    /*
-      TODO: Fix Me
-              Ratis API has changed.
-              RaftServerImpl#getRoleInfoProto is no more public.
-
     if (serverImpl.isLeader()) {
       return RaftPeerId.getRaftPeerId(
           serverImpl.getRoleInfoProto().getLeaderInfo().toString());
@@ -119,8 +115,6 @@ public class SCMHAManagerImpl implements SCMHAManager {
     } else {
       return null;
     }
-     */
-    return null;
   }
 
   @Override
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index 328f271..89fd99e 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 import java.util.Collections;
@@ -47,6 +48,7 @@ import org.apache.hadoop.hdds.scm.VersionInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
+import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
 import org.apache.hadoop.hdds.scm.net.NetworkTopology;
 import org.apache.hadoop.hdds.scm.node.states.NodeAlreadyExistsException;
 import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
@@ -106,13 +108,16 @@ public class SCMNodeManager implements NodeManager {
       new ConcurrentHashMap<>();
   private final int numPipelinesPerMetadataVolume;
   private final int heavyNodeCriteria;
+  private final SCMHAManager scmhaManager;
 
   /**
    * Constructs SCM machine Manager.
    */
   public SCMNodeManager(OzoneConfiguration conf,
-      SCMStorageConfig scmStorageConfig, EventPublisher eventPublisher,
-      NetworkTopology networkTopology) {
+                        SCMStorageConfig scmStorageConfig,
+                        EventPublisher eventPublisher,
+                        NetworkTopology networkTopology,
+                        SCMHAManager scmhaManager) {
     this.nodeStateManager = new NodeStateManager(conf, eventPublisher);
     this.version = VersionInfo.getLatestVersion();
     this.commandQueue = new CommandQueue();
@@ -138,6 +143,14 @@ public class SCMNodeManager implements NodeManager {
             ScmConfigKeys.OZONE_SCM_PIPELINE_PER_METADATA_VOLUME_DEFAULT);
     String dnLimit = conf.get(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT);
     this.heavyNodeCriteria = dnLimit == null ? 0 : Integer.parseInt(dnLimit);
+    this.scmhaManager = scmhaManager;
+  }
+
+  public SCMNodeManager(OzoneConfiguration conf,
+                        SCMStorageConfig scmStorageConfig,
+                        EventPublisher eventPublisher,
+                        NetworkTopology networkTopology) {
+    this(conf, scmStorageConfig, eventPublisher, networkTopology, null);
   }
 
   private void registerMXBean() {
@@ -658,6 +671,18 @@ public class SCMNodeManager implements NodeManager {
   // Refactor and remove all the usage of this method and delete this method.
   @Override
   public void addDatanodeCommand(UUID dnId, SCMCommand command) {
+    if (scmhaManager != null && command.getTerm() == 0) {
+      Optional<Long> termOpt = scmhaManager.isLeader();
+
+      if (!termOpt.isPresent()) {
+        LOG.warn("Not leader, drop SCMCommand {}.", command);
+        return;
+      }
+
+      LOG.warn("Help set term {} for SCMCommand {}. It is not an accurate " +
+          "way to set term of SCMCommand.", termOpt.get(), command);
+      command.setTerm(termOpt.get());
+    }
     this.commandQueue.addCommand(dnId, command);
   }
 
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java
index 041c941..48fbdbf 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java
@@ -49,6 +49,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -637,13 +638,15 @@ public final class PipelineManagerV2Impl implements 
PipelineManager {
   }
 
   /**
-   * Check if scm is current leader.
-   * @throws NotLeaderException when it's not the current leader.
+   * Returns the term of the underlying RaftServer if this SCM is the leader.
+   * @throws NotLeaderException when it's not the leader.
    */
-  private void checkLeader() throws NotLeaderException {
-    if (!scmhaManager.isLeader()) {
+  private long checkLeader() throws NotLeaderException {
+    Optional<Long> termOpt = scmhaManager.isLeader();
+    if (!termOpt.isPresent()) {
       throw scmhaManager.triggerNotLeaderException();
     }
+    return termOpt.get();
   }
 
   private void setBackgroundPipelineCreator(
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
index a295341..b71f906 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
@@ -292,6 +292,11 @@ public class SCMDatanodeProtocolServer implements
       throws IOException {
     SCMCommandProto.Builder builder =
         SCMCommandProto.newBuilder();
+
+    // In HA mode, it is the term of current leader SCM.
+    // In non-HA mode, it is the default value 0.
+    builder.setTerm(cmd.getTerm());
+
     switch (cmd.getType()) {
     case reregisterCommand:
       return builder
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 5843d5a..501472d 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -425,7 +425,7 @@ public final class StorageContainerManager extends 
ServiceRuntimeInfoImpl
       scmNodeManager = configurator.getScmNodeManager();
     } else {
       scmNodeManager = new SCMNodeManager(
-          conf, scmStorageConfig, eventQueue, clusterMap);
+          conf, scmStorageConfig, eventQueue, clusterMap, scmHAManager);
     }
 
     placementMetrics = SCMContainerPlacementMetrics.create();
@@ -1027,7 +1027,7 @@ public final class StorageContainerManager extends 
ServiceRuntimeInfoImpl
    * @return - if the current scm is the leader.
    */
   public boolean checkLeader() {
-    return scmHAManager.isLeader();
+    return scmHAManager.isLeader().isPresent();
   }
 
   /**
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/MockSCMHAManager.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/MockSCMHAManager.java
index ac58438..ab329a5 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/MockSCMHAManager.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/MockSCMHAManager.java
@@ -24,6 +24,7 @@ import java.util.Arrays;
 import java.util.EnumMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 import com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;
@@ -79,8 +80,8 @@ public final class MockSCMHAManager implements SCMHAManager {
    * {@inheritDoc}
    */
   @Override
-  public boolean isLeader() {
-    return isLeader;
+  public Optional<Long> isLeader() {
+    return isLeader ? Optional.of((long)0) : Optional.empty();
   }
 
   public void setIsLeader(boolean isLeader) {
diff --git a/pom.xml b/pom.xml
index 05c34d5..d7f9a06 100644
--- a/pom.xml
+++ b/pom.xml
@@ -79,7 +79,7 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xs
     <declared.ozone.version>${ozone.version}</declared.ozone.version>
 
     <!-- Apache Ratis version -->
-    <ratis.version>1.1.0-913f5a4-SNAPSHOT</ratis.version>
+    <ratis.version>1.1.0-4573fb7-SNAPSHOT</ratis.version>
 
     <!-- Apache Ratis thirdparty version -->
     <ratis.thirdparty.version>0.6.0-SNAPSHOT</ratis.thirdparty.version>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to