This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new b0943d5f86 HDDS-11501. Improve logging in XceiverServerRatis (#7252)
b0943d5f86 is described below
commit b0943d5f869ff2cdcd5ebe256206bfb3ad62fbfd
Author: jianghuazhu <[email protected]>
AuthorDate: Tue Oct 1 12:37:02 2024 +0800
HDDS-11501. Improve logging in XceiverServerRatis (#7252)
---
.../server/ratis/ContainerStateMachine.java | 5 +-
.../transport/server/ratis/XceiverServerRatis.java | 88 ++++++++++------------
2 files changed, 44 insertions(+), 49 deletions(-)
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index 873096a024..be566f84fc 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -83,6 +83,7 @@ import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.exceptions.StateMachineException;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.protocol.TermIndex;
@@ -1162,8 +1163,8 @@ public class ContainerStateMachine extends BaseStateMachine {
}
@Override
- public void notifyFollowerSlowness(RoleInfoProto roleInfoProto) {
- ratisServer.handleNodeSlowness(gid, roleInfoProto);
+ public void notifyFollowerSlowness(RoleInfoProto roleInfoProto, RaftPeer follower) {
+ ratisServer.handleFollowerSlowness(gid, roleInfoProto, follower);
}
@Override
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
index 7899cdcc0e..a4c1434398 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
@@ -104,6 +104,7 @@ import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.RaftServerRpc;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.storage.RaftStorage;
+import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.SizeInBytes;
import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.TraditionalBinaryPrefix;
@@ -161,19 +162,18 @@ public final class XceiverServerRatis implements XceiverServerSpi {
private int clientPort;
private int dataStreamPort;
private final RaftServer server;
+ private final String name;
private final List<ThreadPoolExecutor> chunkExecutors;
private final ContainerDispatcher dispatcher;
private final ContainerController containerController;
private final ClientId clientId = ClientId.randomId();
private final StateContext context;
- private final long nodeFailureTimeoutMs;
private boolean isStarted = false;
private final DatanodeDetails datanodeDetails;
private final ConfigurationSource conf;
// TODO: Remove the gids set when Ratis supports an api to query active
// pipelines
private final ConcurrentMap<RaftGroupId, ActivePipelineContext>
activePipelines = new ConcurrentHashMap<>();
- private final RaftPeerId raftPeerId;
// Timeout used while calling submitRequest directly.
private final long requestTimeout;
private final boolean shouldDeleteRatisLogDirectory;
@@ -197,14 +197,14 @@ public final class XceiverServerRatis implements XceiverServerSpi {
this.context = context;
this.dispatcher = dispatcher;
this.containerController = containerController;
- this.raftPeerId = RatisHelper.toRaftPeerId(dd);
String threadNamePrefix = datanodeDetails.threadNamePrefix();
chunkExecutors = createChunkExecutors(conf, threadNamePrefix);
- nodeFailureTimeoutMs = ratisServerConfig.getFollowerSlownessTimeout();
shouldDeleteRatisLogDirectory =
ratisServerConfig.shouldDeleteRatisLogDirectory();
RaftProperties serverProperties = newRaftProperties();
+ final RaftPeerId raftPeerId = RatisHelper.toRaftPeerId(dd);
+ this.name = getClass().getSimpleName() + "(" + raftPeerId + ")";
this.server =
RaftServer.newBuilder().setServerId(raftPeerId)
.setProperties(serverProperties)
@@ -474,7 +474,7 @@ public final class XceiverServerRatis implements XceiverServerSpi {
// NOTE : the default value for the retry count in ratis is -1,
// which means retry indefinitely.
- int syncTimeoutRetryDefault = (int) nodeFailureTimeoutMs /
+ final int syncTimeoutRetryDefault = (int) ratisServerConfig.getFollowerSlownessTimeout() /
dataSyncTimeout.toIntExact(TimeUnit.MILLISECONDS);
int numSyncRetries = conf.getInt(
OzoneConfigKeys.HDDS_CONTAINER_RATIS_STATEMACHINEDATA_SYNC_RETRIES,
@@ -558,7 +558,7 @@ public final class XceiverServerRatis implements XceiverServerSpi {
@Override
public void start() throws IOException {
if (!isStarted) {
- LOG.info("Starting {} {}", getClass().getSimpleName(), server.getId());
+ LOG.info("Starting {}", name);
for (ThreadPoolExecutor executor : chunkExecutors) {
executor.prestartAllCoreThreads();
}
@@ -581,11 +581,11 @@ public final class XceiverServerRatis implements XceiverServerSpi {
}
}
- private int getRealPort(InetSocketAddress address, Port.Name name) {
+ private int getRealPort(InetSocketAddress address, Port.Name portName) {
int realPort = address.getPort();
- datanodeDetails.setPort(DatanodeDetails.newPort(name, realPort));
- LOG.info("{} {} is started using port {} for {}",
- getClass().getSimpleName(), server.getId(), realPort, name);
+ final Port port = DatanodeDetails.newPort(portName, realPort);
+ datanodeDetails.setPort(port);
+ LOG.info("{} is started using port {}", name, port);
return realPort;
}
@@ -593,7 +593,7 @@ public final class XceiverServerRatis implements XceiverServerSpi {
public void stop() {
if (isStarted) {
try {
- LOG.info("Stopping {} {}", getClass().getSimpleName(), server.getId());
+ LOG.info("Closing {}", name);
// shutdown server before the executors as while shutting down,
// some of the tasks would be executed using the executors.
server.close();
@@ -602,7 +602,7 @@ public final class XceiverServerRatis implements XceiverServerSpi {
}
isStarted = false;
} catch (IOException e) {
- LOG.error("XceiverServerRatis Could not be stopped gracefully.", e);
+ LOG.error("Failed to close {}.", name, e);
}
}
}
@@ -706,45 +706,40 @@ public final class XceiverServerRatis implements XceiverServerSpi {
nextCallId());
}
- private void handlePipelineFailure(RaftGroupId groupId,
- RoleInfoProto roleInfoProto) {
- String msg;
- UUID datanode = RatisHelper.toDatanodeId(roleInfoProto.getSelf());
- RaftPeerId id = RaftPeerId.valueOf(roleInfoProto.getSelf().getId());
+ private void handlePipelineFailure(RaftGroupId groupId, RoleInfoProto roleInfoProto, String reason) {
+ final RaftPeerId raftPeerId = RaftPeerId.valueOf(roleInfoProto.getSelf().getId());
+ Preconditions.assertEquals(getServer().getId(), raftPeerId, "raftPeerId");
+ final StringBuilder b = new StringBuilder()
+ .append(name).append(" with datanodeId ").append(RatisHelper.toDatanodeId(raftPeerId))
+ .append("handlePipelineFailure ").append(" for ").append(reason)
+ .append(": ").append(roleInfoProto.getRole())
+ .append(" elapsed time=").append(roleInfoProto.getRoleElapsedTimeMs()).append("ms");
+
switch (roleInfoProto.getRole()) {
case CANDIDATE:
- msg = datanode + " is in candidate state for " +
- roleInfoProto.getCandidateInfo().getLastLeaderElapsedTimeMs() + "ms";
+ final long lastLeaderElapsedTime = roleInfoProto.getCandidateInfo().getLastLeaderElapsedTimeMs();
+ b.append(", lastLeaderElapsedTime=").append(lastLeaderElapsedTime).append("ms");
break;
case FOLLOWER:
- msg = datanode + " closes pipeline when installSnapshot from leader " +
- "because leader snapshot doesn't contain any data to replay, " +
- "all the log entries prior to the snapshot might have been purged." +
- "So follower should not try to install snapshot from leader but" +
- "can close the pipeline here. It's in follower state for " +
- roleInfoProto.getRoleElapsedTimeMs() + "ms";
+ b.append(", outstandingOp=").append(roleInfoProto.getFollowerInfo().getOutstandingOp());
break;
case LEADER:
- StringBuilder sb = new StringBuilder();
- sb.append(datanode).append(" has not seen follower/s");
- for (RaftProtos.ServerRpcProto follower : roleInfoProto.getLeaderInfo()
- .getFollowerInfoList()) {
- if (follower.getLastRpcElapsedTimeMs() > nodeFailureTimeoutMs) {
- sb.append(" ").append(RatisHelper.toDatanodeId(follower.getId()))
- .append(" for ").append(follower.getLastRpcElapsedTimeMs())
- .append("ms");
- }
+ final long followerSlownessTimeoutMs = ratisServerConfig.getFollowerSlownessTimeout();
+ for (RaftProtos.ServerRpcProto follower : roleInfoProto.getLeaderInfo().getFollowerInfoList()) {
+ final long lastRpcElapsedTimeMs = follower.getLastRpcElapsedTimeMs();
+ final boolean slow = lastRpcElapsedTimeMs > followerSlownessTimeoutMs;
+ final RaftPeerId followerId = RaftPeerId.valueOf(follower.getId().getId());
+ b.append("\n Follower ").append(followerId)
+ .append(" with datanodeId ").append(RatisHelper.toDatanodeId(followerId))
+ .append(" is ").append(slow ? "slow" : " responding")
+ .append(" with lastRpcElapsedTime=").append(lastRpcElapsedTimeMs).append("ms");
}
- msg = sb.toString();
break;
default:
- LOG.error("unknown state: {}", roleInfoProto.getRole());
- throw new IllegalStateException("node" + id + " is in illegal role "
- + roleInfoProto.getRole());
+ throw new IllegalStateException("Unexpected role " + roleInfoProto.getRole());
}
- triggerPipelineClose(groupId, msg,
- ClosePipelineInfo.Reason.PIPELINE_FAILED);
+ triggerPipelineClose(groupId, b.toString(), ClosePipelineInfo.Reason.PIPELINE_FAILED);
}
private void triggerPipelineClose(RaftGroupId groupId, String detail,
@@ -869,12 +864,12 @@ public final class XceiverServerRatis implements XceiverServerSpi {
processReply(reply);
}
- void handleNodeSlowness(RaftGroupId groupId, RoleInfoProto roleInfoProto) {
- handlePipelineFailure(groupId, roleInfoProto);
+ void handleFollowerSlowness(RaftGroupId groupId, RoleInfoProto roleInfoProto, RaftPeer follower) {
+ handlePipelineFailure(groupId, roleInfoProto, "slow follower " + follower.getId());
}
void handleNoLeader(RaftGroupId groupId, RoleInfoProto roleInfoProto) {
- handlePipelineFailure(groupId, roleInfoProto);
+ handlePipelineFailure(groupId, roleInfoProto, "no leader");
}
void handleApplyTransactionFailure(RaftGroupId groupId,
@@ -901,10 +896,9 @@ public final class XceiverServerRatis implements XceiverServerSpi {
void handleInstallSnapshotFromLeader(RaftGroupId groupId,
RoleInfoProto roleInfoProto,
TermIndex firstTermIndexInLog) {
- LOG.warn("Install snapshot notification received from Leader with " +
- "termIndex: {}, terminating pipeline: {}",
+ LOG.warn("handleInstallSnapshotFromLeader for firstTermIndexInLog={}, terminating pipeline: {}",
firstTermIndexInLog, groupId);
- handlePipelineFailure(groupId, roleInfoProto);
+ handlePipelineFailure(groupId, roleInfoProto, "install snapshot notification");
}
/**
@@ -950,7 +944,7 @@ public final class XceiverServerRatis implements XceiverServerSpi {
LOG.info("Leader change notification received for group: {} with new " +
"leaderId: {}", groupMemberId.getGroupId(), raftPeerId1);
// Save the reported leader to be sent with the report to SCM
- boolean leaderForGroup = this.raftPeerId.equals(raftPeerId1);
+ final boolean leaderForGroup = server.getId().equals(raftPeerId1);
activePipelines.compute(groupMemberId.getGroupId(),
(key, value) -> value == null ? new
ActivePipelineContext(leaderForGroup, false) :
new ActivePipelineContext(leaderForGroup, value.isPendingClose()));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]