This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new f30275ca60 HDDS-9823. Pipeline failure should trigger heartbeat
immediately (#5725)
f30275ca60 is described below
commit f30275ca605c596b8675d7c3e35d248ee21de470
Author: Ivan Andika <[email protected]>
AuthorDate: Mon Jan 29 15:12:22 2024 +0800
HDDS-9823. Pipeline failure should trigger heartbeat immediately (#5725)
---
.../transport/server/ratis/XceiverServerRatis.java | 61 +++++++++++++++-------
1 file changed, 42 insertions(+), 19 deletions(-)
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
index 6d119b17b3..fcc611ea3f 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
@@ -25,12 +25,11 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
-import java.util.Map;
import java.util.Objects;
-import java.util.Set;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
@@ -126,6 +125,27 @@ import static
org.apache.ratis.util.Preconditions.assertTrue;
public final class XceiverServerRatis implements XceiverServerSpi {
private static final Logger LOG = LoggerFactory
.getLogger(XceiverServerRatis.class);
+
+ private static class ActivePipelineContext {
+ /** The current datanode is the current leader of the pipeline. */
+ private final boolean isPipelineLeader;
+ /** The heartbeat containing pipeline close action has been triggered. */
+ private final boolean isPendingClose;
+
+ ActivePipelineContext(boolean isPipelineLeader, boolean isPendingClose) {
+ this.isPipelineLeader = isPipelineLeader;
+ this.isPendingClose = isPendingClose;
+ }
+
+ public boolean isPipelineLeader() {
+ return isPipelineLeader;
+ }
+
+ public boolean isPendingClose() {
+ return isPendingClose;
+ }
+ }
+
private static final AtomicLong CALL_ID_COUNTER = new AtomicLong();
private static final List<Integer> DEFAULT_PRIORITY_LIST =
new ArrayList<>(
@@ -151,11 +171,8 @@ public final class XceiverServerRatis implements
XceiverServerSpi {
private final ConfigurationSource conf;
// TODO: Remove the gids set when Ratis supports an api to query active
// pipelines
- private final Set<RaftGroupId> raftGids = ConcurrentHashMap.newKeySet();
+ private final ConcurrentMap<RaftGroupId, ActivePipelineContext>
activePipelines = new ConcurrentHashMap<>();
private final RaftPeerId raftPeerId;
- // pipelines for which I am the leader
- private final Map<RaftGroupId, Boolean> groupLeaderMap =
- new ConcurrentHashMap<>();
// Timeout used while calling submitRequest directly.
private final long requestTimeout;
private final boolean shouldDeleteRatisLogDirectory;
@@ -731,11 +748,11 @@ public final class XceiverServerRatis implements
XceiverServerSpi {
}
triggerPipelineClose(groupId, msg,
- ClosePipelineInfo.Reason.PIPELINE_FAILED, false);
+ ClosePipelineInfo.Reason.PIPELINE_FAILED);
}
private void triggerPipelineClose(RaftGroupId groupId, String detail,
- ClosePipelineInfo.Reason reasonCode, boolean triggerHB) {
+ ClosePipelineInfo.Reason reasonCode) {
PipelineID pipelineID = PipelineID.valueOf(groupId.getUuid());
ClosePipelineInfo.Builder closePipelineInfo =
ClosePipelineInfo.newBuilder()
@@ -749,9 +766,12 @@ public final class XceiverServerRatis implements
XceiverServerSpi {
.build();
if (context != null) {
context.addPipelineActionIfAbsent(action);
- // wait for the next HB timeout or right away?
- if (triggerHB) {
+ if (!activePipelines.get(groupId).isPendingClose()) {
+ // if pipeline close action has not been triggered before, we need to
trigger pipeline close immediately to
+ // prevent SCM from allocating blocks on the failed pipeline
context.getParent().triggerHeartbeat();
+ activePipelines.computeIfPresent(groupId,
+ (key, value) -> new
ActivePipelineContext(value.isPipelineLeader(), true));
}
}
LOG.error("pipeline Action {} on pipeline {}.Reason : {}",
@@ -761,7 +781,7 @@ public final class XceiverServerRatis implements
XceiverServerSpi {
@Override
public boolean isExist(HddsProtos.PipelineID pipelineId) {
- return raftGids.contains(
+ return activePipelines.containsKey(
RaftGroupId.valueOf(PipelineID.getFromProtobuf(pipelineId).getId()));
}
@@ -785,9 +805,11 @@ public final class XceiverServerRatis implements
XceiverServerSpi {
for (RaftGroupId groupId : gids) {
HddsProtos.PipelineID pipelineID = PipelineID
.valueOf(groupId.getUuid()).getProtobuf();
+ boolean isLeader = activePipelines.getOrDefault(groupId,
+ new ActivePipelineContext(false, false)).isPipelineLeader();
reports.add(PipelineReport.newBuilder()
.setPipelineID(pipelineID)
- .setIsLeader(groupLeaderMap.getOrDefault(groupId, Boolean.FALSE))
+ .setIsLeader(isLeader)
.setBytesWritten(calculatePipelineBytesWritten(pipelineID))
.build());
}
@@ -877,7 +899,7 @@ public final class XceiverServerRatis implements
XceiverServerSpi {
"Ratis Transaction failure in datanode " + dnId + " with role " + role
+ " .Triggering pipeline close action.";
triggerPipelineClose(groupId, msg,
- ClosePipelineInfo.Reason.STATEMACHINE_TRANSACTION_FAILED, true);
+ ClosePipelineInfo.Reason.STATEMACHINE_TRANSACTION_FAILED);
}
/**
* The fact that the snapshot contents cannot be used to actually catch up
@@ -913,7 +935,7 @@ public final class XceiverServerRatis implements
XceiverServerSpi {
: t.getMessage();
triggerPipelineClose(groupId, msg,
- ClosePipelineInfo.Reason.PIPELINE_LOG_FAILED, true);
+ ClosePipelineInfo.Reason.PIPELINE_LOG_FAILED);
}
public long getMinReplicatedIndex(PipelineID pipelineID) throws IOException {
@@ -930,13 +952,12 @@ public final class XceiverServerRatis implements
XceiverServerSpi {
}
public void notifyGroupRemove(RaftGroupId gid) {
- raftGids.remove(gid);
- // Remove any entries for group leader map
- groupLeaderMap.remove(gid);
+ // Remove Group ID entry from the active pipeline map
+ activePipelines.remove(gid);
}
void notifyGroupAdd(RaftGroupId gid) {
- raftGids.add(gid);
+ activePipelines.put(gid, new ActivePipelineContext(false, false));
sendPipelineReport();
}
@@ -946,7 +967,9 @@ public final class XceiverServerRatis implements
XceiverServerSpi {
"leaderId: {}", groupMemberId.getGroupId(), raftPeerId1);
// Save the reported leader to be sent with the report to SCM
boolean leaderForGroup = this.raftPeerId.equals(raftPeerId1);
- groupLeaderMap.put(groupMemberId.getGroupId(), leaderForGroup);
+ activePipelines.compute(groupMemberId.getGroupId(),
+ (key, value) -> value == null ? new
ActivePipelineContext(leaderForGroup, false) :
+ new ActivePipelineContext(leaderForGroup, value.isPendingClose()));
if (context != null && leaderForGroup) {
// Publish new report from leader
sendPipelineReport();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]