This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new fc39c38ea RATIS-2173. Fix zero-copy bugs for non-gRPC cases. (#1167)
fc39c38ea is described below

commit fc39c38eab115bd8349d753b2b9087c7f9398e0e
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Sun Oct 20 03:31:08 2024 -0700

    RATIS-2173. Fix zero-copy bugs for non-gRPC cases. (#1167)
---
 .../java/org/apache/ratis/TestMultiRaftGroup.java  | 11 +--
 .../ratis/server/leader/LogAppenderDefault.java    | 30 +++----
 .../ratis/server/raftlog/memory/MemoryRaftLog.java | 23 ++----
 .../test/java/org/apache/ratis/RaftBasicTests.java |  6 --
 .../test/java/org/apache/ratis/RaftTestUtil.java   |  7 +-
 .../ratis/server/impl/LeaderElectionTests.java     | 91 +++++++++-------------
 6 files changed, 66 insertions(+), 102 deletions(-)

diff --git a/ratis-examples/src/test/java/org/apache/ratis/TestMultiRaftGroup.java b/ratis-examples/src/test/java/org/apache/ratis/TestMultiRaftGroup.java
index 190f75858..ea3962c08 100644
--- a/ratis-examples/src/test/java/org/apache/ratis/TestMultiRaftGroup.java
+++ b/ratis-examples/src/test/java/org/apache/ratis/TestMultiRaftGroup.java
@@ -22,25 +22,20 @@ import org.apache.ratis.examples.ParameterizedBaseTest;
 import org.apache.ratis.examples.arithmetic.ArithmeticStateMachine;
 import org.apache.ratis.examples.arithmetic.TestArithmetic;
 import org.apache.ratis.protocol.RaftGroup;
-import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.impl.GroupManagementBaseTest;
 import org.apache.ratis.server.impl.MiniRaftCluster;
-import org.apache.ratis.util.Slf4jUtils;
 import org.apache.ratis.util.function.CheckedBiConsumer;
+import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.MethodSource;
-import org.slf4j.event.Level;
 
 import java.io.IOException;
 import java.util.Collection;
 import java.util.concurrent.atomic.AtomicInteger;
 
+@Timeout(value = 300)
 public class TestMultiRaftGroup extends BaseTest {
-  static {
-    Slf4jUtils.setLogLevel(RaftServer.Division.LOG, Level.DEBUG);
-  }
-
-  public static Collection<Object[]> data() throws IOException {
+  public static Collection<Object[]> data() {
     return 
ParameterizedBaseTest.getMiniRaftClusters(ArithmeticStateMachine.class, 0);
   }
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
index f75a80f82..8ec6c19db 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
@@ -59,23 +59,15 @@ class LogAppenderDefault extends LogAppenderBase {
   /** Send an appendEntries RPC; retry indefinitely. */
   private AppendEntriesReplyProto sendAppendEntriesWithRetries(AtomicLong 
requestFirstIndex)
       throws InterruptedException, InterruptedIOException, RaftLogIOException {
-    int retry = 0;
-
-    ReferenceCountedObject<AppendEntriesRequestProto> request = 
nextAppendEntriesRequest(
-        CallId.getAndIncrement(), false);
-    while (isRunning()) { // keep retrying for IOException
+    for(int retry = 0; isRunning(); retry++) {
+      final ReferenceCountedObject<AppendEntriesRequestProto> request = 
nextAppendEntriesRequest(
+          CallId.getAndIncrement(), false);
+      if (request == null) {
+        LOG.trace("{} no entries to send now, wait ...", this);
+        return null;
+      }
       try {
-        if (request == null || request.get().getEntriesCount() == 0) {
-          if (request != null) {
-            request.release();
-          }
-          request = nextAppendEntriesRequest(CallId.getAndIncrement(), false);
-        }
-
-        if (request == null) {
-          LOG.trace("{} no entries to send now, wait ...", this);
-          return null;
-        } else if (!isRunning()) {
+        if (!isRunning()) {
           LOG.info("{} is stopped. Skip appendEntries.", this);
           return null;
         }
@@ -84,17 +76,19 @@ class LogAppenderDefault extends LogAppenderBase {
         final AppendEntriesReplyProto reply = sendAppendEntries(proto);
         final long first = proto.getEntriesCount() > 0 ? 
proto.getEntries(0).getIndex() : RaftLog.INVALID_LOG_INDEX;
         requestFirstIndex.set(first);
-        request.release();
         return reply;
       } catch (InterruptedIOException | RaftLogIOException e) {
         throw e;
       } catch (IOException ioe) {
         // TODO should have more detailed retry policy here.
-        if (retry++ % 10 == 0) { // to reduce the number of messages
+        if (retry % 10 == 0) { // to reduce the number of messages
           LOG.warn("{}: Failed to appendEntries (retry={})", this, retry, ioe);
         }
         handleException(ioe);
+      } finally {
+        request.release();
       }
+
       if (isRunning()) {
         getServer().properties().rpcSleepTime().sleep();
       }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
index 2aac6c1b1..f4b6dc452 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
@@ -45,15 +45,10 @@ import java.util.function.LongSupplier;
  */
 public class MemoryRaftLog extends RaftLogBase {
   static class EntryList {
-    private final List<ReferenceCountedObject<LogEntryProto>> entries = new 
ArrayList<>();
-
-    ReferenceCountedObject<LogEntryProto> getRef(int i) {
-      return i >= 0 && i < entries.size() ? entries.get(i) : null;
-    }
+    private final List<LogEntryProto> entries = new ArrayList<>();
 
     LogEntryProto get(int i) {
-      final ReferenceCountedObject<LogEntryProto> ref = getRef(i);
-      return ref != null ? ref.get() : null;
+      return i >= 0 && i < entries.size() ? entries.get(i) : null;
     }
 
     TermIndex getTermIndex(int i) {
@@ -81,13 +76,10 @@ public class MemoryRaftLog extends RaftLogBase {
     }
 
     void clear(int from, int to) {
-      List<ReferenceCountedObject<LogEntryProto>> subList = 
entries.subList(from, to);
-      subList.forEach(ReferenceCountedObject::release);
-      subList.clear();
+      entries.subList(from, to).clear();
     }
 
-    void add(ReferenceCountedObject<LogEntryProto> entryRef) {
-      entryRef.retain();
+    void add(LogEntryProto entryRef) {
       entries.add(entryRef);
     }
   }
@@ -128,7 +120,8 @@ public class MemoryRaftLog extends RaftLogBase {
   public ReferenceCountedObject<LogEntryProto> retainLog(long index) {
     checkLogState();
     try (AutoCloseableLock readLock = readLock()) {
-      ReferenceCountedObject<LogEntryProto> ref = 
entries.getRef(Math.toIntExact(index));
+      final LogEntryProto entry = entries.get(Math.toIntExact(index));
+      final ReferenceCountedObject<LogEntryProto> ref = 
ReferenceCountedObject.wrap(entry);
       ref.retain();
       return ref;
     }
@@ -205,7 +198,7 @@ public class MemoryRaftLog extends RaftLogBase {
     LogEntryProto entry = entryRef.retain();
     try (AutoCloseableLock writeLock = writeLock()) {
       validateLogEntry(entry);
-      entries.add(entryRef);
+      entries.add(entry);
     } finally {
       entryRef.release();
     }
@@ -253,7 +246,7 @@ public class MemoryRaftLog extends RaftLogBase {
       }
       for (int i = index; i < logEntryProtos.size(); i++) {
         LogEntryProto logEntryProto = logEntryProtos.get(i);
-        entries.add(entriesRef.delegate(logEntryProto));
+        entries.add(LogProtoUtils.copy(logEntryProto));
         
futures.add(CompletableFuture.completedFuture(logEntryProto.getIndex()));
       }
       return futures;
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
index b16905d9a..dbb0dbcab 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
@@ -33,13 +33,11 @@ import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.impl.BlockRequestHandlingInjection;
 import org.apache.ratis.server.impl.MiniRaftCluster;
-import org.apache.ratis.server.impl.RaftServerTestUtil;
 import org.apache.ratis.server.impl.RetryCacheTestUtil;
 import org.apache.ratis.server.metrics.ServerMetricsTestUtils;
 import org.apache.ratis.server.raftlog.RaftLog;
 import org.apache.ratis.util.ExitUtils;
 import org.apache.ratis.util.JavaUtils;
-import org.apache.ratis.util.Slf4jUtils;
 import org.apache.ratis.util.TimeDuration;
 import org.apache.ratis.util.Timestamp;
 import org.junit.jupiter.api.Assertions;
@@ -47,7 +45,6 @@ import org.junit.jupiter.api.Assumptions;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 import org.slf4j.Logger;
-import org.slf4j.event.Level;
 
 import java.io.IOException;
 import java.util.List;
@@ -75,9 +72,6 @@ public abstract class RaftBasicTests<CLUSTER extends 
MiniRaftCluster>
     extends BaseTest
     implements MiniRaftCluster.Factory.Get<CLUSTER> {
   {
-    Slf4jUtils.setLogLevel(RaftServer.Division.LOG, Level.DEBUG);
-    RaftServerTestUtil.setStateMachineUpdaterLogLevel(Level.DEBUG);
-
     RaftServerConfigKeys.RetryCache.setExpiryTime(getProperties(), 
TimeDuration.valueOf(5, TimeUnit.SECONDS));
   }
 
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
index be8739ad8..733993701 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
@@ -187,8 +187,11 @@ public interface RaftTestUtil {
             
log.get(termIndices[idxEntries].getIndex()).getStateMachineLogEntry().getLogData().toByteArray()))
 {
           ++idxExpected;
         }
-      } catch (IOException e) {
-        throw new RuntimeException(e);
+      } catch (Exception e) {
+        throw new IllegalStateException("Failed logEntriesContains: 
startIndex=" + startIndex
+            + ", endIndex=" + endIndex
+            + ", #expectedMessages=" + expectedMessages.length
+            + ", log=" + log, e);
       }
       ++idxEntries;
     }
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
index 6a5c6387c..bda496c16 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
@@ -21,8 +21,8 @@ import org.apache.ratis.BaseTest;
 import org.apache.ratis.RaftTestUtil;
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.metrics.impl.RatisMetricRegistryImpl;
 import org.apache.ratis.metrics.impl.DefaultTimekeeperImpl;
+import org.apache.ratis.metrics.impl.RatisMetricRegistryImpl;
 import org.apache.ratis.proto.RaftProtos;
 import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftGroupId;
@@ -37,19 +37,18 @@ import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.metrics.LeaderElectionMetrics;
 import org.apache.ratis.server.protocol.TermIndex;
-import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogTestUtils;
-import org.apache.ratis.util.ExitUtils;
+import org.apache.ratis.thirdparty.com.codahale.metrics.Timer;
+import org.apache.ratis.util.CodeInjectionForTesting;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.LifeCycle;
 import org.apache.ratis.util.Slf4jUtils;
 import org.apache.ratis.util.TimeDuration;
 import org.apache.ratis.util.Timestamp;
 import org.apache.ratis.util.function.CheckedBiConsumer;
-import org.apache.ratis.util.CodeInjectionForTesting;
-import org.junit.Assert;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
+import org.slf4j.event.Level;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -66,18 +65,15 @@ import java.util.stream.Collectors;
 import static org.apache.ratis.RaftTestUtil.waitForLeader;
 import static 
org.apache.ratis.server.metrics.LeaderElectionMetrics.LAST_LEADER_ELECTION_ELAPSED_TIME;
 import static 
org.apache.ratis.server.metrics.LeaderElectionMetrics.LEADER_ELECTION_COUNT_METRIC;
-import static 
org.apache.ratis.server.metrics.LeaderElectionMetrics.LEADER_ELECTION_TIME_TAKEN;
 import static 
org.apache.ratis.server.metrics.LeaderElectionMetrics.LEADER_ELECTION_TIMEOUT_COUNT_METRIC;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.junit.Assert.assertNotNull;
+import static 
org.apache.ratis.server.metrics.LeaderElectionMetrics.LEADER_ELECTION_TIME_TAKEN;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-import org.apache.ratis.thirdparty.com.codahale.metrics.Timer;
-import org.slf4j.event.Level;
-
 public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster>
     extends BaseTest
     implements MiniRaftCluster.Factory.Get<CLUSTER> {
@@ -89,15 +85,16 @@ public abstract class LeaderElectionTests<CLUSTER extends 
MiniRaftCluster>
   @Test
   public void testBasicLeaderElection() throws Exception {
     LOG.info("Running testBasicLeaderElection");
-    final MiniRaftCluster cluster = newCluster(5);
-    cluster.start();
+    runWithNewCluster(5, this::runTestBasicLeaderElection);
+  }
+
+  void runTestBasicLeaderElection(MiniRaftCluster cluster) throws Exception {
     RaftTestUtil.waitAndKillLeader(cluster);
     RaftTestUtil.waitAndKillLeader(cluster);
     RaftTestUtil.waitAndKillLeader(cluster);
     testFailureCase("waitForLeader after killed a majority of servers",
         () -> RaftTestUtil.waitForLeader(cluster, null, false),
         IllegalStateException.class);
-    cluster.shutdown();
   }
 
   static class SleepCode implements CodeInjectionForTesting.Code {
@@ -124,9 +121,11 @@ public abstract class LeaderElectionTests<CLUSTER extends 
MiniRaftCluster>
     final int sleepMs = 1000 + ThreadLocalRandom.current().nextInt(1000);
     LOG.info("Running testWaitServerReady, sleep = {}ms", sleepMs);
     CodeInjectionForTesting.put(RaftServerImpl.START_COMPLETE, new 
SleepCode(sleepMs));
-    final MiniRaftCluster cluster = newCluster(1);
     final Timestamp startTime = Timestamp.currentTime();
-    cluster.start();
+    runWithNewCluster(1, c -> runTestWaitServerReady(c, sleepMs, startTime));
+  }
+
+  void runTestWaitServerReady(MiniRaftCluster cluster, int sleepMs, Timestamp 
startTime) throws Exception {
     LOG.info("Cluster started at {}ms", startTime.elapsedTimeMs());
     final RaftGroupId groupId = cluster.getGroupId();
     final RaftServerImpl server = (RaftServerImpl) 
cluster.getServers().iterator().next().getDivision(groupId);
@@ -138,16 +137,17 @@ public abstract class LeaderElectionTests<CLUSTER extends 
MiniRaftCluster>
     final long elapsedMs = startTime.elapsedTimeMs();
     // allow a small difference to tolerate system timer inaccuracy
     Assertions.assertTrue(elapsedMs > sleepMs - 10, () -> "elapseMs = " + 
elapsedMs + " but sleepMs = " + sleepMs);
-    cluster.shutdown();
     CodeInjectionForTesting.remove(RaftServerImpl.START_COMPLETE);
   }
 
   @Test
-  public void testAddServerForWaitReady() throws IOException, 
InterruptedException {
+  public void testAddServerForWaitReady() throws Exception {
     LOG.info("Running testAddServerForWaitReady");
     // normal startup cluster with 3 server
-    final MiniRaftCluster cluster = newCluster(3);
-    cluster.start();
+    runWithNewCluster(3, this::runTestAddServerForWaitReady);
+  }
+
+  void runTestAddServerForWaitReady(MiniRaftCluster cluster) throws Exception {
     RaftTestUtil.waitForLeader(cluster);
     try (RaftClient client = cluster.createClient()) {
       for (int i = 0; i < 10; ++i) {
@@ -162,7 +162,7 @@ public abstract class LeaderElectionTests<CLUSTER extends 
MiniRaftCluster>
       RaftClientReply reply = 
client.admin().setConfiguration(SetConfigurationRequest.Arguments.newBuilder()
               .setServersInNewConf(peerChanges.newPeers)
               .setMode(SetConfigurationRequest.Mode.ADD).build());
-      Assert.assertTrue(reply.isSuccess());
+      assertTrue(reply.isSuccess());
       for (RaftServer server : cluster.getServers()) {
         RaftServerProxy proxy = (RaftServerProxy) server;
         proxy.getImpls().forEach(s -> {
@@ -170,24 +170,20 @@ public abstract class LeaderElectionTests<CLUSTER extends 
MiniRaftCluster>
         });
       }
     }
-    cluster.shutdown();;
     CodeInjectionForTesting.remove(RaftServerImpl.START_COMPLETE);
   }
 
   @Test
   public void testChangeLeader() throws Exception {
-    SegmentedRaftLogTestUtils.setRaftLogWorkerLogLevel(Level.TRACE);
     LOG.info("Running testChangeLeader");
-    final MiniRaftCluster cluster = newCluster(3);
-    cluster.start();
+    runWithNewCluster(3, this::runTestChangeLeader);
+  }
 
+  void runTestChangeLeader(MiniRaftCluster cluster) throws Exception {
     RaftPeerId leader = RaftTestUtil.waitForLeader(cluster).getId();
     for(int i = 0; i < 10; i++) {
       leader = RaftTestUtil.changeLeader(cluster, leader, 
IllegalStateException::new);
-      ExitUtils.assertNotTerminated();
     }
-    SegmentedRaftLogTestUtils.setRaftLogWorkerLogLevel(Level.INFO);
-    cluster.shutdown();
   }
 
   @Test
@@ -273,8 +269,6 @@ public abstract class LeaderElectionTests<CLUSTER extends 
MiniRaftCluster>
         Assertions.assertEquals(newLeader.getId().toString(), 
reply.getReplierId());
         Assertions.assertTrue(reply.isSuccess());
       }
-
-      cluster.shutdown();
     }
   }
 
@@ -307,8 +301,6 @@ public abstract class LeaderElectionTests<CLUSTER extends 
MiniRaftCluster>
         Assertions.assertEquals(newLeader.getId().toString(), 
reply.getReplierId());
         Assertions.assertTrue(reply.isSuccess());
       }
-
-      cluster.shutdown();
     }
   }
 
@@ -366,8 +358,6 @@ public abstract class LeaderElectionTests<CLUSTER extends 
MiniRaftCluster>
 
         RaftTestUtil.deIsolate(cluster, newLeader.getId());
       }
-
-      cluster.shutdown();
     }
   }
 
@@ -405,14 +395,18 @@ public abstract class LeaderElectionTests<CLUSTER extends 
MiniRaftCluster>
 
   @Test
   public void testLateServerStart() throws Exception {
-    final int numServer = 3;
     LOG.info("Running testLateServerStart");
-    final MiniRaftCluster cluster = newCluster(numServer);
+    try (final MiniRaftCluster cluster = newCluster(3)) {
+      runTestLateServerStart(cluster);
+    }
+  }
+
+  void runTestLateServerStart(MiniRaftCluster cluster) throws Exception {
     cluster.initServers();
 
     // start all except one servers
     final Iterator<RaftServer> i = cluster.getServers().iterator();
-    for(int j = 1; j < numServer; j++) {
+    for(int j = 1; j < cluster.getNumServers(); j++) {
       i.next().start();
     }
 
@@ -430,7 +424,6 @@ public abstract class LeaderElectionTests<CLUSTER extends 
MiniRaftCluster>
         10, ONE_SECOND, "getLeaderId", LOG);
     LOG.info(cluster.printServers());
     Assertions.assertEquals(leader.getId(), lastServerLeaderId);
-    cluster.shutdown();
   }
 
   protected void testDisconnectLeader() throws Exception {
@@ -448,8 +441,6 @@ public abstract class LeaderElectionTests<CLUSTER extends 
MiniRaftCluster>
       } finally {
         RaftTestUtil.deIsolate(cluster, leader.getId());
       }
-
-      cluster.shutdown();
     }
   }
 
@@ -471,7 +462,6 @@ public abstract class LeaderElectionTests<CLUSTER extends 
MiniRaftCluster>
         Assertions.assertEquals(1, listener.size());
         Assertions.assertEquals(changes.newPeers[0].getId(), new 
ArrayList<>(listener).get(0).getId());
       }
-      cluster.shutdown();
     }
   }
 
@@ -497,7 +487,6 @@ public abstract class LeaderElectionTests<CLUSTER extends 
MiniRaftCluster>
         Assertions.assertEquals(1,
             
leader.getRaftConf().getAllPeers(RaftProtos.RaftPeerRole.LISTENER).size());
       }
-      cluster.shutdown();
     }
   }
 
@@ -516,7 +505,6 @@ public abstract class LeaderElectionTests<CLUSTER extends 
MiniRaftCluster>
         Assertions.assertTrue(reply.isSuccess());
         Assertions.assertEquals(0, 
leader.getRaftConf().getAllPeers(RaftProtos.RaftPeerRole.LISTENER).size());
       }
-      cluster.shutdown();
     }
   }
 
@@ -539,7 +527,6 @@ public abstract class LeaderElectionTests<CLUSTER extends 
MiniRaftCluster>
         Assertions.assertEquals(1, peer.size());
         Assertions.assertEquals(listeners.get(0).getId(), new 
ArrayList<>(peer).get(0).getId());
       }
-      cluster.shutdown();
     }
   }
 
@@ -558,15 +545,16 @@ public abstract class LeaderElectionTests<CLUSTER extends 
MiniRaftCluster>
         Collection<RaftPeer> peer = 
leader.getRaftConf().getAllPeers(RaftProtos.RaftPeerRole.LISTENER);
         Assertions.assertEquals(0, peer.size());
       }
-      cluster.shutdown();
     }
   }
 
   @Test
-  public void testLeaderElectionMetrics() throws IOException, 
InterruptedException {
+  public void testLeaderElectionMetrics() throws Exception {
+    runWithNewCluster(3, this::runTestLeaderElectionMetrics);
+  }
+
+  void runTestLeaderElectionMetrics(MiniRaftCluster cluster) throws Exception {
     Timestamp timestamp = Timestamp.currentTime();
-    final MiniRaftCluster cluster = newCluster(3);
-    cluster.start();
     final RaftServer.Division leaderServer = waitForLeader(cluster);
 
     final RatisMetricRegistryImpl ratisMetricRegistry = 
(RatisMetricRegistryImpl)
@@ -588,7 +576,6 @@ public abstract class LeaderElectionTests<CLUSTER extends 
MiniRaftCluster>
     Long leaderElectionLatency = (Long) ratisMetricRegistry.getGauges((s, 
metric) ->
         
s.contains(LAST_LEADER_ELECTION_ELAPSED_TIME)).values().iterator().next().getValue();
     assertTrue(leaderElectionLatency > 0L);
-    cluster.shutdown();
   }
 
   @Test
@@ -654,8 +641,6 @@ public abstract class LeaderElectionTests<CLUSTER extends 
MiniRaftCluster>
         reply = client.io().send(new RaftTestUtil.SimpleMessage("message"));
         Assertions.assertTrue(reply.isSuccess());
       }
-
-      cluster.shutdown();
     } catch (Exception e) {
       fail(e.getMessage());
     }

Reply via email to