This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new cd9ec69bc RATIS-1757: Missing some metrics for listener. (#796)
cd9ec69bc is described below

commit cd9ec69bc9010ed3142e6ebafed49745a7fd61ef
Author: qian0817 <[email protected]>
AuthorDate: Thu Dec 8 21:40:05 2022 +0800

    RATIS-1757: Missing some metrics for listener. (#796)
---
 .../apache/ratis/server/impl/LeaderStateImpl.java  | 27 +++++++++-------------
 .../server/metrics/RaftServerMetricsImpl.java      |  5 ++--
 2 files changed, 13 insertions(+), 19 deletions(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index a23e8d44c..644b93fd0 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -366,11 +366,11 @@ class LeaderStateImpl implements LeaderState {
     placeHolderIndex = raftLog.getNextIndex();
 
     senders = new SenderList();
-    addSenders(others, placeHolderIndex, true, RaftPeerRole.FOLLOWER);
+    addSenders(others, placeHolderIndex, true);
 
     final Collection<RaftPeer> listeners = conf.getAllPeers(RaftPeerRole.LISTENER);
     if (!listeners.isEmpty()) {
-      addSenders(listeners, placeHolderIndex, false, RaftPeerRole.LISTENER);
+      addSenders(listeners, placeHolderIndex, false);
     }
   }
 
@@ -460,8 +460,8 @@ class LeaderStateImpl implements LeaderState {
       applyOldNewConf();
     } else {
       // update the LeaderState's sender list
-      addAndStartSenders(newPeers, RaftPeerRole.FOLLOWER);
-      addAndStartSenders(newListeners, RaftPeerRole.LISTENER);
+      addAndStartSenders(newPeers);
+      addAndStartSenders(newListeners);
     }
     return pending;
   }
@@ -569,23 +569,20 @@ class LeaderStateImpl implements LeaderState {
   /**
    * Update sender list for setConfiguration request
    */
-  void addAndStartSenders(Collection<RaftPeer> newPeers, RaftPeerRole role) {
+  void addAndStartSenders(Collection<RaftPeer> newPeers) {
     if (!newPeers.isEmpty()) {
-      addSenders(newPeers, RaftLog.LEAST_VALID_LOG_INDEX, false, role).forEach(LogAppender::start);
+      addSenders(newPeers, RaftLog.LEAST_VALID_LOG_INDEX, false).forEach(LogAppender::start);
     }
   }
 
-  Collection<LogAppender> addSenders(Collection<RaftPeer> newPeers, long nextIndex, boolean attendVote,
-      RaftPeerRole role) {
+  Collection<LogAppender> addSenders(Collection<RaftPeer> newPeers, long nextIndex, boolean attendVote) {
     final Timestamp t = Timestamp.currentTime().addTimeMs(-server.getMaxTimeoutMs());
     final List<LogAppender> newAppenders = newPeers.stream()
         .map(peer -> {
           final FollowerInfo f = new FollowerInfoImpl(server.getMemberId(), peer, t, nextIndex, attendVote);
           followerInfoMap.put(peer.getId(), f);
-          if (role == RaftPeerRole.FOLLOWER) {
-            raftServerMetrics.addFollower(peer.getId());
-            logAppenderMetrics.addFollowerGauges(peer.getId(), f::getNextIndex, f::getMatchIndex, f::getLastRpcTime);
-          }
+          raftServerMetrics.addFollower(peer.getId());
+          logAppenderMetrics.addFollowerGauges(peer.getId(), f::getNextIndex, f::getMatchIndex, f::getLastRpcTime);
           return server.newLogAppender(this, f);
         }).collect(Collectors.toList());
     senders.addAll(newAppenders);
@@ -606,10 +603,8 @@ class LeaderStateImpl implements LeaderState {
     senders.removeAll(Collections.singleton(sender));
 
     final RaftPeer peer = info.getPeer();
-    if (server.getRaftConf().containsInConf(peer.getId())) {
-      addAndStartSenders(Collections.singleton(peer), RaftPeerRole.FOLLOWER);
-    } else if (server.getRaftConf().containsInConf(peer.getId(), RaftPeerRole.LISTENER)) {
-      addAndStartSenders(Collections.singleton(peer), RaftPeerRole.LISTENER);
+    if (server.getRaftConf().containsInConf(peer.getId(), RaftPeerRole.FOLLOWER, RaftPeerRole.LISTENER)) {
+      addAndStartSenders(Collections.singleton(peer));
     }
   }
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftServerMetricsImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftServerMetricsImpl.java
index f37cfd464..171451705 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftServerMetricsImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftServerMetricsImpl.java
@@ -20,7 +20,6 @@ package org.apache.ratis.server.metrics;
 
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.function.Function;
 import java.util.function.Supplier;
@@ -98,7 +97,7 @@ public final class RaftServerMetricsImpl extends RatisMetrics implements RaftSer
       = newHeartbeatTimer(FOLLOWER_APPEND_ENTRIES_LATENCY);
 
   /** Follower Id -> heartbeat elapsed */
-  private final Map<RaftPeerId, Long> followerLastHeartbeatElapsedTimeMap = new HashMap<>();
+  private final Map<RaftPeerId, Long> followerLastHeartbeatElapsedTimeMap = new ConcurrentHashMap<>();
   private final Supplier<Function<RaftPeerId, CommitInfoProto>> commitInfoCache;
 
   /** id -> metric */
@@ -287,4 +286,4 @@ public final class RaftServerMetricsImpl extends RatisMetrics implements RaftSer
   public void onSnapshotInstalled() {
     numInstallSnapshot.inc();
   }
-}
\ No newline at end of file
+}

Reply via email to