This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new b0943d5f86 HDDS-11501. Improve logging in XceiverServerRatis (#7252)
b0943d5f86 is described below

commit b0943d5f869ff2cdcd5ebe256206bfb3ad62fbfd
Author: jianghuazhu <[email protected]>
AuthorDate: Tue Oct 1 12:37:02 2024 +0800

    HDDS-11501. Improve logging in XceiverServerRatis (#7252)
---
 .../server/ratis/ContainerStateMachine.java        |  5 +-
 .../transport/server/ratis/XceiverServerRatis.java | 88 ++++++++++------------
 2 files changed, 44 insertions(+), 49 deletions(-)

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index 873096a024..be566f84fc 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -83,6 +83,7 @@ import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.protocol.RaftGroupMemberId;
 import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.exceptions.StateMachineException;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.protocol.TermIndex;
@@ -1162,8 +1163,8 @@ public class ContainerStateMachine extends BaseStateMachine {
   }
 
   @Override
-  public void notifyFollowerSlowness(RoleInfoProto roleInfoProto) {
-    ratisServer.handleNodeSlowness(gid, roleInfoProto);
+  public void notifyFollowerSlowness(RoleInfoProto roleInfoProto, RaftPeer 
follower) {
+    ratisServer.handleFollowerSlowness(gid, roleInfoProto, follower);
   }
 
   @Override
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
index 7899cdcc0e..a4c1434398 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
@@ -104,6 +104,7 @@ import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.RaftServerRpc;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.storage.RaftStorage;
+import org.apache.ratis.util.Preconditions;
 import org.apache.ratis.util.SizeInBytes;
 import org.apache.ratis.util.TimeDuration;
 import org.apache.ratis.util.TraditionalBinaryPrefix;
@@ -161,19 +162,18 @@ public final class XceiverServerRatis implements XceiverServerSpi {
   private int clientPort;
   private int dataStreamPort;
   private final RaftServer server;
+  private final String name;
   private final List<ThreadPoolExecutor> chunkExecutors;
   private final ContainerDispatcher dispatcher;
   private final ContainerController containerController;
   private final ClientId clientId = ClientId.randomId();
   private final StateContext context;
-  private final long nodeFailureTimeoutMs;
   private boolean isStarted = false;
   private final DatanodeDetails datanodeDetails;
   private final ConfigurationSource conf;
   // TODO: Remove the gids set when Ratis supports an api to query active
   // pipelines
   private final ConcurrentMap<RaftGroupId, ActivePipelineContext> 
activePipelines = new ConcurrentHashMap<>();
-  private final RaftPeerId raftPeerId;
   // Timeout used while calling submitRequest directly.
   private final long requestTimeout;
   private final boolean shouldDeleteRatisLogDirectory;
@@ -197,14 +197,14 @@ public final class XceiverServerRatis implements XceiverServerSpi {
     this.context = context;
     this.dispatcher = dispatcher;
     this.containerController = containerController;
-    this.raftPeerId = RatisHelper.toRaftPeerId(dd);
     String threadNamePrefix = datanodeDetails.threadNamePrefix();
     chunkExecutors = createChunkExecutors(conf, threadNamePrefix);
-    nodeFailureTimeoutMs = ratisServerConfig.getFollowerSlownessTimeout();
     shouldDeleteRatisLogDirectory =
         ratisServerConfig.shouldDeleteRatisLogDirectory();
 
     RaftProperties serverProperties = newRaftProperties();
+    final RaftPeerId raftPeerId = RatisHelper.toRaftPeerId(dd);
+    this.name = getClass().getSimpleName() + "(" + raftPeerId + ")";
     this.server =
         RaftServer.newBuilder().setServerId(raftPeerId)
             .setProperties(serverProperties)
@@ -474,7 +474,7 @@ public final class XceiverServerRatis implements XceiverServerSpi {
 
     // NOTE : the default value for the retry count in ratis is -1,
     // which means retry indefinitely.
-    int syncTimeoutRetryDefault = (int) nodeFailureTimeoutMs /
+    final int syncTimeoutRetryDefault = (int) 
ratisServerConfig.getFollowerSlownessTimeout() /
         dataSyncTimeout.toIntExact(TimeUnit.MILLISECONDS);
     int numSyncRetries = conf.getInt(
         OzoneConfigKeys.HDDS_CONTAINER_RATIS_STATEMACHINEDATA_SYNC_RETRIES,
@@ -558,7 +558,7 @@ public final class XceiverServerRatis implements XceiverServerSpi {
   @Override
   public void start() throws IOException {
     if (!isStarted) {
-      LOG.info("Starting {} {}", getClass().getSimpleName(), server.getId());
+      LOG.info("Starting {}", name);
       for (ThreadPoolExecutor executor : chunkExecutors) {
         executor.prestartAllCoreThreads();
       }
@@ -581,11 +581,11 @@ public final class XceiverServerRatis implements XceiverServerSpi {
     }
   }
 
-  private int getRealPort(InetSocketAddress address, Port.Name name) {
+  private int getRealPort(InetSocketAddress address, Port.Name portName) {
     int realPort = address.getPort();
-    datanodeDetails.setPort(DatanodeDetails.newPort(name, realPort));
-    LOG.info("{} {} is started using port {} for {}",
-        getClass().getSimpleName(), server.getId(), realPort, name);
+    final Port port = DatanodeDetails.newPort(portName, realPort);
+    datanodeDetails.setPort(port);
+    LOG.info("{} is started using port {}", name, port);
     return realPort;
   }
 
@@ -593,7 +593,7 @@ public final class XceiverServerRatis implements XceiverServerSpi {
   public void stop() {
     if (isStarted) {
       try {
-        LOG.info("Stopping {} {}", getClass().getSimpleName(), server.getId());
+        LOG.info("Closing {}", name);
         // shutdown server before the executors as while shutting down,
         // some of the tasks would be executed using the executors.
         server.close();
@@ -602,7 +602,7 @@ public final class XceiverServerRatis implements XceiverServerSpi {
         }
         isStarted = false;
       } catch (IOException e) {
-        LOG.error("XceiverServerRatis Could not be stopped gracefully.", e);
+        LOG.error("Failed to close {}.", name, e);
       }
     }
   }
@@ -706,45 +706,40 @@ public final class XceiverServerRatis implements XceiverServerSpi {
         nextCallId());
   }
 
-  private void handlePipelineFailure(RaftGroupId groupId,
-      RoleInfoProto roleInfoProto) {
-    String msg;
-    UUID datanode = RatisHelper.toDatanodeId(roleInfoProto.getSelf());
-    RaftPeerId id = RaftPeerId.valueOf(roleInfoProto.getSelf().getId());
+  private void handlePipelineFailure(RaftGroupId groupId, RoleInfoProto 
roleInfoProto, String reason) {
+    final RaftPeerId raftPeerId = 
RaftPeerId.valueOf(roleInfoProto.getSelf().getId());
+    Preconditions.assertEquals(getServer().getId(), raftPeerId, "raftPeerId");
+    final StringBuilder b = new StringBuilder()
+        .append(name).append(" with datanodeId 
").append(RatisHelper.toDatanodeId(raftPeerId))
+        .append("handlePipelineFailure ").append(" for ").append(reason)
+        .append(": ").append(roleInfoProto.getRole())
+        .append(" elapsed 
time=").append(roleInfoProto.getRoleElapsedTimeMs()).append("ms");
+
     switch (roleInfoProto.getRole()) {
     case CANDIDATE:
-      msg = datanode + " is in candidate state for " +
-          roleInfoProto.getCandidateInfo().getLastLeaderElapsedTimeMs() + "ms";
+      final long lastLeaderElapsedTime = 
roleInfoProto.getCandidateInfo().getLastLeaderElapsedTimeMs();
+      b.append(", 
lastLeaderElapsedTime=").append(lastLeaderElapsedTime).append("ms");
       break;
     case FOLLOWER:
-      msg = datanode + " closes pipeline when installSnapshot from leader " +
-          "because leader snapshot doesn't contain any data to replay, " +
-          "all the log entries prior to the snapshot might have been purged." +
-          "So follower should not try to install snapshot from leader but" +
-          "can close the pipeline here. It's in follower state for " +
-          roleInfoProto.getRoleElapsedTimeMs() + "ms";
+      b.append(", 
outstandingOp=").append(roleInfoProto.getFollowerInfo().getOutstandingOp());
       break;
     case LEADER:
-      StringBuilder sb = new StringBuilder();
-      sb.append(datanode).append(" has not seen follower/s");
-      for (RaftProtos.ServerRpcProto follower : roleInfoProto.getLeaderInfo()
-          .getFollowerInfoList()) {
-        if (follower.getLastRpcElapsedTimeMs() > nodeFailureTimeoutMs) {
-          sb.append(" ").append(RatisHelper.toDatanodeId(follower.getId()))
-              .append(" for ").append(follower.getLastRpcElapsedTimeMs())
-              .append("ms");
-        }
+      final long followerSlownessTimeoutMs = 
ratisServerConfig.getFollowerSlownessTimeout();
+      for (RaftProtos.ServerRpcProto follower : 
roleInfoProto.getLeaderInfo().getFollowerInfoList()) {
+        final long lastRpcElapsedTimeMs = follower.getLastRpcElapsedTimeMs();
+        final boolean slow = lastRpcElapsedTimeMs > followerSlownessTimeoutMs;
+        final RaftPeerId followerId = 
RaftPeerId.valueOf(follower.getId().getId());
+        b.append("\n  Follower ").append(followerId)
+            .append(" with datanodeId 
").append(RatisHelper.toDatanodeId(followerId))
+            .append(" is ").append(slow ? "slow" : " responding")
+            .append(" with 
lastRpcElapsedTime=").append(lastRpcElapsedTimeMs).append("ms");
       }
-      msg = sb.toString();
       break;
     default:
-      LOG.error("unknown state: {}", roleInfoProto.getRole());
-      throw new IllegalStateException("node" + id + " is in illegal role "
-          + roleInfoProto.getRole());
+      throw new IllegalStateException("Unexpected role " + 
roleInfoProto.getRole());
     }
 
-    triggerPipelineClose(groupId, msg,
-        ClosePipelineInfo.Reason.PIPELINE_FAILED);
+    triggerPipelineClose(groupId, b.toString(), 
ClosePipelineInfo.Reason.PIPELINE_FAILED);
   }
 
   private void triggerPipelineClose(RaftGroupId groupId, String detail,
@@ -869,12 +864,12 @@ public final class XceiverServerRatis implements XceiverServerSpi {
     processReply(reply);
   }
 
-  void handleNodeSlowness(RaftGroupId groupId, RoleInfoProto roleInfoProto) {
-    handlePipelineFailure(groupId, roleInfoProto);
+  void handleFollowerSlowness(RaftGroupId groupId, RoleInfoProto 
roleInfoProto, RaftPeer follower) {
+    handlePipelineFailure(groupId, roleInfoProto, "slow follower " + 
follower.getId());
   }
 
   void handleNoLeader(RaftGroupId groupId, RoleInfoProto roleInfoProto) {
-    handlePipelineFailure(groupId, roleInfoProto);
+    handlePipelineFailure(groupId, roleInfoProto, "no leader");
   }
 
   void handleApplyTransactionFailure(RaftGroupId groupId,
@@ -901,10 +896,9 @@ public final class XceiverServerRatis implements XceiverServerSpi {
   void handleInstallSnapshotFromLeader(RaftGroupId groupId,
                                        RoleInfoProto roleInfoProto,
                                        TermIndex firstTermIndexInLog) {
-    LOG.warn("Install snapshot notification received from Leader with " +
-        "termIndex: {}, terminating pipeline: {}",
+    LOG.warn("handleInstallSnapshotFromLeader for firstTermIndexInLog={}, 
terminating pipeline: {}",
         firstTermIndexInLog, groupId);
-    handlePipelineFailure(groupId, roleInfoProto);
+    handlePipelineFailure(groupId, roleInfoProto, "install snapshot 
notification");
   }
 
   /**
@@ -950,7 +944,7 @@ public final class XceiverServerRatis implements XceiverServerSpi {
     LOG.info("Leader change notification received for group: {} with new " +
         "leaderId: {}", groupMemberId.getGroupId(), raftPeerId1);
     // Save the reported leader to be sent with the report to SCM
-    boolean leaderForGroup = this.raftPeerId.equals(raftPeerId1);
+    final boolean leaderForGroup = server.getId().equals(raftPeerId1);
     activePipelines.compute(groupMemberId.getGroupId(),
         (key, value) -> value == null ? new 
ActivePipelineContext(leaderForGroup, false) :
             new ActivePipelineContext(leaderForGroup, value.isPendingClose()));


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to