This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new f30275ca60 HDDS-9823. Pipeline failure should trigger heartbeat immediately (#5725)
f30275ca60 is described below

commit f30275ca605c596b8675d7c3e35d248ee21de470
Author: Ivan Andika <[email protected]>
AuthorDate: Mon Jan 29 15:12:22 2024 +0800

    HDDS-9823. Pipeline failure should trigger heartbeat immediately (#5725)
---
 .../transport/server/ratis/XceiverServerRatis.java | 61 +++++++++++++++-------
 1 file changed, 42 insertions(+), 19 deletions(-)

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
index 6d119b17b3..fcc611ea3f 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
@@ -25,12 +25,11 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 import java.util.Objects;
-import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingDeque;
@@ -126,6 +125,27 @@ import static 
org.apache.ratis.util.Preconditions.assertTrue;
 public final class XceiverServerRatis implements XceiverServerSpi {
   private static final Logger LOG = LoggerFactory
       .getLogger(XceiverServerRatis.class);
+
+  private static class ActivePipelineContext {
+    /** The current datanode is the current leader of the pipeline. */
+    private final boolean isPipelineLeader;
+    /** The heartbeat containing pipeline close action has been triggered. */
+    private final boolean isPendingClose;
+
+    ActivePipelineContext(boolean isPipelineLeader, boolean isPendingClose) {
+      this.isPipelineLeader = isPipelineLeader;
+      this.isPendingClose = isPendingClose;
+    }
+
+    public boolean isPipelineLeader() {
+      return isPipelineLeader;
+    }
+
+    public boolean isPendingClose() {
+      return isPendingClose;
+    }
+  }
+
   private static final AtomicLong CALL_ID_COUNTER = new AtomicLong();
   private static final List<Integer> DEFAULT_PRIORITY_LIST =
       new ArrayList<>(
@@ -151,11 +171,8 @@ public final class XceiverServerRatis implements 
XceiverServerSpi {
   private final ConfigurationSource conf;
   // TODO: Remove the gids set when Ratis supports an api to query active
   // pipelines
-  private final Set<RaftGroupId> raftGids = ConcurrentHashMap.newKeySet();
+  private final ConcurrentMap<RaftGroupId, ActivePipelineContext> 
activePipelines = new ConcurrentHashMap<>();
   private final RaftPeerId raftPeerId;
-  // pipelines for which I am the leader
-  private final Map<RaftGroupId, Boolean> groupLeaderMap =
-      new ConcurrentHashMap<>();
   // Timeout used while calling submitRequest directly.
   private final long requestTimeout;
   private final boolean shouldDeleteRatisLogDirectory;
@@ -731,11 +748,11 @@ public final class XceiverServerRatis implements 
XceiverServerSpi {
     }
 
     triggerPipelineClose(groupId, msg,
-        ClosePipelineInfo.Reason.PIPELINE_FAILED, false);
+        ClosePipelineInfo.Reason.PIPELINE_FAILED);
   }
 
   private void triggerPipelineClose(RaftGroupId groupId, String detail,
-      ClosePipelineInfo.Reason reasonCode, boolean triggerHB) {
+      ClosePipelineInfo.Reason reasonCode) {
     PipelineID pipelineID = PipelineID.valueOf(groupId.getUuid());
     ClosePipelineInfo.Builder closePipelineInfo =
         ClosePipelineInfo.newBuilder()
@@ -749,9 +766,12 @@ public final class XceiverServerRatis implements 
XceiverServerSpi {
         .build();
     if (context != null) {
       context.addPipelineActionIfAbsent(action);
-      // wait for the next HB timeout or right away?
-      if (triggerHB) {
+      if (!activePipelines.get(groupId).isPendingClose()) {
+        // if pipeline close action has not been triggered before, we need 
trigger pipeline close immediately to
+        // prevent SCM to allocate blocks on the failed pipeline
         context.getParent().triggerHeartbeat();
+        activePipelines.computeIfPresent(groupId,
+            (key, value) -> new 
ActivePipelineContext(value.isPipelineLeader(), true));
       }
     }
     LOG.error("pipeline Action {} on pipeline {}.Reason : {}",
@@ -761,7 +781,7 @@ public final class XceiverServerRatis implements 
XceiverServerSpi {
 
   @Override
   public boolean isExist(HddsProtos.PipelineID pipelineId) {
-    return raftGids.contains(
+    return activePipelines.containsKey(
         RaftGroupId.valueOf(PipelineID.getFromProtobuf(pipelineId).getId()));
   }
 
@@ -785,9 +805,11 @@ public final class XceiverServerRatis implements 
XceiverServerSpi {
       for (RaftGroupId groupId : gids) {
         HddsProtos.PipelineID pipelineID = PipelineID
             .valueOf(groupId.getUuid()).getProtobuf();
+        boolean isLeader = activePipelines.getOrDefault(groupId,
+            new ActivePipelineContext(false, false)).isPipelineLeader();
         reports.add(PipelineReport.newBuilder()
             .setPipelineID(pipelineID)
-            .setIsLeader(groupLeaderMap.getOrDefault(groupId, Boolean.FALSE))
+            .setIsLeader(isLeader)
             .setBytesWritten(calculatePipelineBytesWritten(pipelineID))
             .build());
       }
@@ -877,7 +899,7 @@ public final class XceiverServerRatis implements 
XceiverServerSpi {
         "Ratis Transaction failure in datanode " + dnId + " with role " + role
             + " .Triggering pipeline close action.";
     triggerPipelineClose(groupId, msg,
-        ClosePipelineInfo.Reason.STATEMACHINE_TRANSACTION_FAILED, true);
+        ClosePipelineInfo.Reason.STATEMACHINE_TRANSACTION_FAILED);
   }
   /**
    * The fact that the snapshot contents cannot be used to actually catch up
@@ -913,7 +935,7 @@ public final class XceiverServerRatis implements 
XceiverServerSpi {
         : t.getMessage();
 
     triggerPipelineClose(groupId, msg,
-        ClosePipelineInfo.Reason.PIPELINE_LOG_FAILED, true);
+        ClosePipelineInfo.Reason.PIPELINE_LOG_FAILED);
   }
 
   public long getMinReplicatedIndex(PipelineID pipelineID) throws IOException {
@@ -930,13 +952,12 @@ public final class XceiverServerRatis implements 
XceiverServerSpi {
   }
 
   public void notifyGroupRemove(RaftGroupId gid) {
-    raftGids.remove(gid);
-    // Remove any entries for group leader map
-    groupLeaderMap.remove(gid);
+    // Remove Group ID entry from the active pipeline map
+    activePipelines.remove(gid);
   }
 
   void notifyGroupAdd(RaftGroupId gid) {
-    raftGids.add(gid);
+    activePipelines.put(gid, new ActivePipelineContext(false, false));
     sendPipelineReport();
   }
 
@@ -946,7 +967,9 @@ public final class XceiverServerRatis implements 
XceiverServerSpi {
         "leaderId: {}", groupMemberId.getGroupId(), raftPeerId1);
     // Save the reported leader to be sent with the report to SCM
     boolean leaderForGroup = this.raftPeerId.equals(raftPeerId1);
-    groupLeaderMap.put(groupMemberId.getGroupId(), leaderForGroup);
+    activePipelines.compute(groupMemberId.getGroupId(),
+        (key, value) -> value == null ? new 
ActivePipelineContext(leaderForGroup, false) :
+            new ActivePipelineContext(leaderForGroup, value.isPendingClose()));
     if (context != null && leaderForGroup) {
       // Publish new report from leader
       sendPipelineReport();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to