This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new cedcd2ad4 RATIS-1967. Do not store CommitInfoProto in CommitInfoCache.
(#988)
cedcd2ad4 is described below
commit cedcd2ad4cd2da13230aaa0d15678e5aee9b0729
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Wed Dec 13 14:03:04 2023 -0800
RATIS-1967. Do not store CommitInfoProto in CommitInfoCache. (#988)
---
.../apache/ratis/server/impl/CommitInfoCache.java | 33 +++++++++++++-----
.../ratis/server/impl/ConfigurationManager.java | 40 ++++++++++++++++------
.../apache/ratis/server/impl/RaftServerImpl.java | 26 ++++++++++----
.../org/apache/ratis/server/impl/ServerState.java | 6 +++-
.../server/metrics/RaftServerMetricsImpl.java | 15 ++++----
.../ratis/server/impl/TestRetryCacheMetrics.java | 20 +++++------
6 files changed, 93 insertions(+), 47 deletions(-)
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/CommitInfoCache.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/CommitInfoCache.java
index aa0bb05d9..04210d709 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/CommitInfoCache.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/CommitInfoCache.java
@@ -24,30 +24,45 @@ import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.ProtoUtils;
import java.util.Objects;
+import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/** Caching the commit information. */
class CommitInfoCache {
- private final ConcurrentMap<RaftPeerId, CommitInfoProto> map = new
ConcurrentHashMap<>();
+ private final ConcurrentMap<RaftPeerId, Long> map = new
ConcurrentHashMap<>();
- CommitInfoProto get(RaftPeerId id) {
- return map.get(id);
+ Optional<Long> get(RaftPeerId id) {
+ return Optional.ofNullable(map.get(id));
}
CommitInfoProto update(RaftPeer peer, long newCommitIndex) {
Objects.requireNonNull(peer, "peer == null");
- return map.compute(peer.getId(), (id, old) ->
- old == null || newCommitIndex > old.getCommitIndex()?
ProtoUtils.toCommitInfoProto(peer, newCommitIndex): old);
+ final long updated = update(peer.getId(), newCommitIndex);
+ return ProtoUtils.toCommitInfoProto(peer, updated);
}
- CommitInfoProto update(CommitInfoProto newInfo) {
- return map.compute(RaftPeerId.valueOf(newInfo.getServer().getId()),
- (id, old) -> old == null || newInfo.getCommitIndex() >
old.getCommitIndex()? newInfo: old);
+ long update(RaftPeerId peerId, long newCommitIndex) {
+ Objects.requireNonNull(peerId, "peerId == null");
+ return map.compute(peerId, (id, oldCommitIndex) -> {
+ if (oldCommitIndex != null) {
+ // get around BX_UNBOXING_IMMEDIATELY_REBOXED
+ final long old = oldCommitIndex;
+ if (old >= newCommitIndex) {
+ return old;
+ }
+ }
+ return newCommitIndex;
+ });
+ }
+
+ void update(CommitInfoProto newInfo) {
+ final RaftPeerId id = RaftPeerId.valueOf(newInfo.getServer().getId());
+ update(id, newInfo.getCommitIndex());
}
@Override
public String toString() {
- return JavaUtils.getClassSimpleName(getClass()) + ":" + map.values();
+ return JavaUtils.getClassSimpleName(getClass()) + ":" + map;
}
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ConfigurationManager.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ConfigurationManager.java
index ed51a657a..0e020b7e3 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ConfigurationManager.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ConfigurationManager.java
@@ -17,6 +17,9 @@
*/
package org.apache.ratis.server.impl;
+import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftConfiguration;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.Preconditions;
@@ -31,6 +34,7 @@ import java.util.*;
* entries.
*/
public class ConfigurationManager {
+ private final RaftPeerId id;
private final RaftConfigurationImpl initialConf;
private final NavigableMap<Long, RaftConfigurationImpl> configurations = new
TreeMap<>();
/**
@@ -38,10 +42,21 @@ public class ConfigurationManager {
* the last entry of the map. Otherwise is initialConf.
*/
private volatile RaftConfigurationImpl currentConf;
+ /** Cache the peer corresponding to {@link #id}. */
+ private volatile RaftPeer currentPeer;
- ConfigurationManager(RaftConfigurationImpl initialConf) {
+ ConfigurationManager(RaftPeerId id, RaftConfigurationImpl initialConf) {
+ this.id = id;
this.initialConf = initialConf;
- this.currentConf = initialConf;
+ setCurrentConf(initialConf);
+ }
+
+ private void setCurrentConf(RaftConfigurationImpl currentConf) {
+ this.currentConf = currentConf;
+ final RaftPeer peer = currentConf.getPeer(id, RaftPeerRole.FOLLOWER,
RaftPeerRole.LISTENER);
+ if (peer != null) {
+ this.currentPeer = peer;
+ }
}
synchronized void addConfiguration(RaftConfiguration conf) {
@@ -57,7 +72,7 @@ public class ConfigurationManager {
private void addRaftConfigurationImpl(long logIndex, RaftConfigurationImpl
conf) {
configurations.put(logIndex, conf);
if (logIndex == configurations.lastEntry().getKey()) {
- currentConf = conf;
+ setCurrentConf(conf);
}
}
@@ -65,21 +80,24 @@ public class ConfigurationManager {
return currentConf;
}
+ RaftPeer getCurrentPeer() {
+ return currentPeer;
+ }
+
/**
* Remove all the configurations whose log index is >= the given index.
+ *
* @param index The given index. All the configurations whose log index is >=
* this value will be removed.
- * @return The configuration with largest log index < the given index.
*/
- synchronized RaftConfiguration removeConfigurations(long index) {
+ synchronized void removeConfigurations(long index) {
// remove all configurations starting at the index
- for(final Iterator<?> iter =
configurations.tailMap(index).entrySet().iterator(); iter.hasNext();) {
- iter.next();
- iter.remove();
+ final SortedMap<Long, RaftConfigurationImpl> tail =
configurations.tailMap(index);
+ if (tail.isEmpty()) {
+ return;
}
- currentConf = configurations.isEmpty() ? initialConf :
- configurations.lastEntry().getValue();
- return currentConf;
+ tail.clear();
+ setCurrentConf(configurations.isEmpty() ? initialConf :
configurations.lastEntry().getValue());
}
synchronized int numOfConf() {
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 5e6c81215..ed5457b3d 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -275,7 +275,7 @@ class RaftServerImpl implements RaftServer.Division,
this.leaderElectionMetrics =
LeaderElectionMetrics.getLeaderElectionMetrics(
getMemberId(), state::getLastLeaderElapsedTimeMs);
this.raftServerMetrics =
RaftServerMetricsImpl.computeIfAbsentRaftServerMetrics(
- getMemberId(), () -> commitInfoCache::get, retryCache::getStatistics);
+ getMemberId(), this::getCommitIndex, retryCache::getStatistics);
this.startComplete = new AtomicBoolean(false);
this.threadGroup = new ThreadGroup(proxy.getThreadGroup(),
getMemberId().toString());
@@ -294,6 +294,10 @@ class RaftServerImpl implements RaftServer.Division,
id + "-client");
}
+ private long getCommitIndex(RaftPeerId id) {
+ return commitInfoCache.get(id).orElse(0L);
+ }
+
@Override
public DivisionProperties properties() {
return divisionProperties;
@@ -452,6 +456,12 @@ class RaftServerImpl implements RaftServer.Division,
return getState().getMemberId();
}
+ @Override
+ public RaftPeer getPeer() {
+ return Optional.ofNullable(getState().getCurrentPeer())
+ .orElseGet(() -> getRaftServer().getPeer());
+ }
+
@Override
public DivisionInfo getInfo() {
return info;
@@ -622,7 +632,8 @@ class RaftServerImpl implements RaftServer.Division,
public Collection<CommitInfoProto> getCommitInfos() {
final List<CommitInfoProto> infos = new ArrayList<>();
// add the commit info of this server
- infos.add(updateCommitInfoCache());
+ final long commitIndex = updateCommitInfoCache();
+ infos.add(ProtoUtils.toCommitInfoProto(getPeer(), commitIndex));
// add the commit infos of other servers
if (getInfo().isLeader()) {
@@ -633,9 +644,10 @@ class RaftServerImpl implements RaftServer.Division,
Stream.concat(
raftConf.getAllPeers(RaftPeerRole.FOLLOWER).stream(),
raftConf.getAllPeers(RaftPeerRole.LISTENER).stream())
- .map(RaftPeer::getId)
- .filter(id -> !id.equals(getId()))
- .map(commitInfoCache::get)
+ .filter(peer -> !peer.getId().equals(getId()))
+ .map(peer -> commitInfoCache.get(peer.getId())
+ .map(index -> ProtoUtils.toCommitInfoProto(peer, index))
+ .orElse(null))
.filter(Objects::nonNull)
.forEach(infos::add);
}
@@ -1534,8 +1546,8 @@ class RaftServerImpl implements RaftServer.Division,
}
}
- private CommitInfoProto updateCommitInfoCache() {
- return commitInfoCache.update(getPeer(),
state.getLog().getLastCommittedIndex());
+ private long updateCommitInfoCache() {
+ return commitInfoCache.update(getId(),
state.getLog().getLastCommittedIndex());
}
ExecutorService getServerExecutor() {
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index 07c0e6c4a..e21f63caa 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -109,7 +109,7 @@ class ServerState {
final RaftConfigurationImpl initialConf =
RaftConfigurationImpl.newBuilder()
.setConf(followerPeers, listenerPeers)
.build();
- configurationManager = new ConfigurationManager(initialConf);
+ configurationManager = new ConfigurationManager(id, initialConf);
LOG.info("{}: {}", getMemberId(), configurationManager);
final String storageDirName = group.getGroupId().getUuid().toString();
@@ -196,6 +196,10 @@ class ServerState {
return configurationManager.getCurrent();
}
+ RaftPeer getCurrentPeer() {
+ return configurationManager.getCurrentPeer();
+ }
+
long getCurrentTerm() {
return currentTerm.get();
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftServerMetricsImpl.java
b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftServerMetricsImpl.java
index 171451705..cdbce6ee2 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftServerMetricsImpl.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftServerMetricsImpl.java
@@ -18,11 +18,11 @@
package org.apache.ratis.server.metrics;
-import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;
+import java.util.function.ToLongFunction;
import org.apache.ratis.metrics.LongCounter;
import org.apache.ratis.metrics.Timekeeper;
@@ -30,7 +30,6 @@ import org.apache.ratis.metrics.Timekeeper;
import org.apache.ratis.metrics.MetricRegistryInfo;
import org.apache.ratis.metrics.RatisMetricRegistry;
import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
-import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto.TypeCase;
import org.apache.ratis.protocol.RaftClientRequest.Type;
import org.apache.ratis.protocol.RaftGroupMemberId;
@@ -98,7 +97,7 @@ public final class RaftServerMetricsImpl extends RatisMetrics
implements RaftSer
/** Follower Id -> heartbeat elapsed */
private final Map<RaftPeerId, Long> followerLastHeartbeatElapsedTimeMap =
new ConcurrentHashMap<>();
- private final Supplier<Function<RaftPeerId, CommitInfoProto>>
commitInfoCache;
+ private final ToLongFunction<RaftPeerId> commitInfoCache;
/** id -> metric */
private static final Map<RaftGroupMemberId, RaftServerMetricsImpl> METRICS =
new ConcurrentHashMap<>();
@@ -111,7 +110,7 @@ public final class RaftServerMetricsImpl extends
RatisMetrics implements RaftSer
}
public static RaftServerMetricsImpl
computeIfAbsentRaftServerMetrics(RaftGroupMemberId serverId,
- Supplier<Function<RaftPeerId, CommitInfoProto>> commitInfoCache,
+ ToLongFunction<RaftPeerId> commitInfoCache,
Supplier<RetryCache.Statistics> retryCacheStatistics) {
return METRICS.computeIfAbsent(serverId,
key -> new RaftServerMetricsImpl(serverId, commitInfoCache,
retryCacheStatistics));
@@ -122,7 +121,7 @@ public final class RaftServerMetricsImpl extends
RatisMetrics implements RaftSer
}
public RaftServerMetricsImpl(RaftGroupMemberId serverId,
- Supplier<Function<RaftPeerId, CommitInfoProto>> commitInfoCache,
+ ToLongFunction<RaftPeerId> commitInfoCache,
Supplier<RetryCache.Statistics> retryCacheStatistics) {
super(createRegistry(serverId.toString()));
this.commitInfoCache = commitInfoCache;
@@ -183,10 +182,8 @@ public final class RaftServerMetricsImpl extends
RatisMetrics implements RaftSer
* Register a commit index tracker for the peer in cluster.
*/
private void addPeerCommitIndexGauge(RaftPeerId peerId) {
- getRegistry().gauge(getPeerCommitIndexGaugeKey(peerId), () -> () ->
Optional.ofNullable(commitInfoCache.get())
- .map(cache -> cache.apply(peerId))
- .map(CommitInfoProto::getCommitIndex)
- .orElse(0L));
+ getRegistry().gauge(getPeerCommitIndexGaugeKey(peerId),
+ () -> () -> commitInfoCache.applyAsLong(peerId));
}
@VisibleForTesting
diff --git
a/ratis-server/src/test/java/org/apache/ratis/server/impl/TestRetryCacheMetrics.java
b/ratis-server/src/test/java/org/apache/ratis/server/impl/TestRetryCacheMetrics.java
index e5222d21b..b25a50bf9 100644
---
a/ratis-server/src/test/java/org/apache/ratis/server/impl/TestRetryCacheMetrics.java
+++
b/ratis-server/src/test/java/org/apache/ratis/server/impl/TestRetryCacheMetrics.java
@@ -52,10 +52,10 @@ public class TestRetryCacheMetrics {
retryCache = new
RetryCacheImpl(RaftServerConfigKeys.RetryCache.EXPIRY_TIME_DEFAULT, null);
final RaftServerMetricsImpl raftServerMetrics =
RaftServerMetricsImpl.computeIfAbsentRaftServerMetrics(
- raftGroupMemberId, () -> null, retryCache::getStatistics);
+ raftGroupMemberId, id -> 0L, retryCache::getStatistics);
ratisMetricRegistry = (RatisMetricRegistryImpl)
raftServerMetrics.getRegistry();
}
-
+
@After
public void tearDown() {
retryCache.close();
@@ -92,23 +92,23 @@ public class TestRetryCacheMetrics {
}
private static void checkHit(long count, double rate) {
- Long hitCount = (Long) ratisMetricRegistry.getGauges((s, metric) ->
+ final long hitCount = (Long) ratisMetricRegistry.getGauges((s, metric) ->
s.contains(RETRY_CACHE_HIT_COUNT_METRIC)).values().iterator().next().getValue();
- assertEquals(hitCount.longValue(), count);
+ assertEquals(hitCount, count);
- Double hitRate = (Double) ratisMetricRegistry.getGauges((s, metric) ->
+ final double hitRate = (Double) ratisMetricRegistry.getGauges((s,
metric) ->
s.contains(RETRY_CACHE_HIT_RATE_METRIC)).values().iterator().next().getValue();
- assertEquals(hitRate.doubleValue(), rate, 0.0);
+ assertEquals(hitRate, rate, 0.0);
}
private static void checkMiss(long count, double rate) {
- Long missCount = (Long) ratisMetricRegistry.getGauges((s, metric) ->
+ final long missCount = (Long) ratisMetricRegistry.getGauges((s, metric)
->
s.contains(RETRY_CACHE_MISS_COUNT_METRIC)).values().iterator().next().getValue();
- assertEquals(missCount.longValue(), count);
+ assertEquals(missCount, count);
- Double missRate = (Double) ratisMetricRegistry.getGauges((s, metric) ->
+ final double missRate = (Double) ratisMetricRegistry.getGauges((s,
metric) ->
s.contains(RETRY_CACHE_MISS_RATE_METRIC)).values().iterator().next().getValue();
- assertEquals(missRate.doubleValue(), rate, 0.0);
+ assertEquals(missRate, rate, 0.0);
}
private static void checkEntryCount(long expected) {