This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit 875f9231eaf5281de8d2464b6ee3ebe5384bfa55
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Fri Jul 8 08:56:19 2022 -0700

    RATIS-1609. RaftStorageImpl should not lock the directory in the 
constructor. (#667)
---
 .../org/apache/ratis/util/ConcurrentUtils.java     | 33 ++++++---
 .../org/apache/ratis/util/MemoizedSupplier.java    |  5 ++
 .../apache/ratis/server/storage/RaftStorage.java   |  5 +-
 .../apache/ratis/server/impl/RaftServerImpl.java   |  4 +-
 .../apache/ratis/server/impl/RaftServerProxy.java  | 21 ++++--
 .../org/apache/ratis/server/impl/ServerState.java  | 79 ++++++++++++++--------
 .../server/storage/RaftStorageDirectoryImpl.java   |  1 +
 .../ratis/server/storage/RaftStorageImpl.java      | 53 ++++++++++-----
 .../ratis/server/storage/StorageImplUtils.java     |  2 -
 .../ratis/server/storage/RaftStorageTestUtils.java |  4 +-
 .../apache/ratis/grpc/TestRaftServerWithGrpc.java  | 12 ++--
 .../apache/ratis/server/ServerRestartTests.java    |  7 +-
 .../ratis/server/storage/TestRaftStorage.java      |  8 ++-
 13 files changed, 153 insertions(+), 81 deletions(-)

diff --git 
a/ratis-common/src/main/java/org/apache/ratis/util/ConcurrentUtils.java 
b/ratis-common/src/main/java/org/apache/ratis/util/ConcurrentUtils.java
index d0c87288..df214b38 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/ConcurrentUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/ConcurrentUtils.java
@@ -17,6 +17,7 @@
  */
 package org.apache.ratis.util;
 
+import org.apache.ratis.util.function.CheckedConsumer;
 import org.apache.ratis.util.function.CheckedFunction;
 
 import java.util.ArrayList;
@@ -138,31 +139,41 @@ public interface ConcurrentUtils {
 
   /**
    * The same as collection.parallelStream().forEach(action) except that
-   * (1) this method is asynchronous, and
-   * (2) an executor can be passed to this method.
+   * (1) this method is asynchronous,
+   * (2) an executor can be passed to this method, and
+   * (3) the action can throw a checked exception.
    *
    * @param collection The given collection.
    * @param action To act on each element in the collection.
    * @param executor To execute the action.
-   * @param <T> The element type.
+   * @param <E> The element type.
+   * @param <THROWABLE> the exception type.
    *
    * @return a {@link CompletableFuture} that is completed
    *         when the action is completed for each element in the collection.
+   *         When the action throws an exception, the future will be completed 
exceptionally.
    *
    * @see Collection#parallelStream()
    * @see java.util.stream.Stream#forEach(Consumer)
    */
-  static <T> CompletableFuture<Void> parallelForEachAsync(Collection<T> 
collection, Consumer<? super T> action,
-      Executor executor) {
-    final List<CompletableFuture<T>> futures = new 
ArrayList<>(collection.size());
+  static <E, THROWABLE extends Throwable> CompletableFuture<Void> 
parallelForEachAsync(
+      Collection<E> collection, CheckedConsumer<? super E, THROWABLE> action, 
Executor executor) {
+    final List<CompletableFuture<E>> futures = new 
ArrayList<>(collection.size());
     collection.forEach(element -> {
-      final CompletableFuture<T> f = new CompletableFuture<>();
+      final CompletableFuture<E> f = new CompletableFuture<>();
       futures.add(f);
-      executor.execute(() -> {
-        action.accept(element);
-        f.complete(element);
-      });
+      executor.execute(() -> accept(action, element, f));
     });
     return JavaUtils.allOf(futures);
   }
+
+  static <E, THROWABLE extends Throwable> void accept(
+      CheckedConsumer<? super E, THROWABLE> action, E element, 
CompletableFuture<E> f) {
+    try {
+      action.accept(element);
+      f.complete(element);
+    } catch (Throwable t) {
+      f.completeExceptionally(t);
+    }
+  }
 }
diff --git 
a/ratis-common/src/main/java/org/apache/ratis/util/MemoizedSupplier.java 
b/ratis-common/src/main/java/org/apache/ratis/util/MemoizedSupplier.java
index 1f45de28..f179d2dc 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/MemoizedSupplier.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/MemoizedSupplier.java
@@ -71,4 +71,9 @@ public final class MemoizedSupplier<T> implements Supplier<T> 
{
   public boolean isInitialized() {
     return value != null;
   }
+
+  @Override
+  public String toString() {
+    return isInitialized()? "Memoized:" + get(): "UNINITIALIZED";
+  }
 }
diff --git 
a/ratis-server-api/src/main/java/org/apache/ratis/server/storage/RaftStorage.java
 
b/ratis-server-api/src/main/java/org/apache/ratis/server/storage/RaftStorage.java
index bd2a59a2..dde3c31b 100644
--- 
a/ratis-server-api/src/main/java/org/apache/ratis/server/storage/RaftStorage.java
+++ 
b/ratis-server-api/src/main/java/org/apache/ratis/server/storage/RaftStorage.java
@@ -34,6 +34,9 @@ import java.lang.reflect.Method;
 public interface RaftStorage extends Closeable {
   Logger LOG = LoggerFactory.getLogger(RaftStorage.class);
 
+  /** Initialize the storage. */
+  void initialize() throws IOException;
+
   /** @return the storage directory. */
   RaftStorageDirectory getStorageDir();
 
@@ -43,7 +46,7 @@ public interface RaftStorage extends Closeable {
   /** @return the corruption policy for raft log. */
   CorruptionPolicy getLogCorruptionPolicy();
 
-   static Builder newBuilder() {
+  static Builder newBuilder() {
     return new Builder();
   }
 
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 32471dde..a24e25d8 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -302,10 +302,12 @@ class RaftServerImpl implements RaftServer.Division,
     this.role.transitionRole(newRole);
   }
 
-  boolean start() {
+  boolean start() throws IOException {
     if (!lifeCycle.compareAndTransition(NEW, STARTING)) {
       return false;
     }
+    state.initialize(stateMachine);
+
     final RaftConfigurationImpl conf = getRaftConf();
     if (conf != null && conf.containsInBothConfs(getId())) {
       LOG.info("{}: start as a follower, conf={}", getMemberId(), conf);
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index 01d71005..96f7efbe 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -376,13 +376,16 @@ class RaftServerProxy implements RaftServer {
 
   @Override
   public void start() throws IOException {
+    lifeCycle.startAndTransition(this::startImpl, IOException.class);
+  }
+
+  private void startImpl() throws IOException {
     ConcurrentUtils.parallelForEachAsync(getImpls(), RaftServerImpl::start, 
executor).join();
 
-    lifeCycle.startAndTransition(() -> {
-      LOG.info("{}: start RPC server", getId());
-      getServerRpc().start();
-      getDataStreamServerRpc().start();
-    }, IOException.class);
+    LOG.info("{}: start RPC server", getId());
+    getServerRpc().start();
+    getDataStreamServerRpc().start();
+
     pauseMonitor.start();
   }
 
@@ -480,8 +483,12 @@ class RaftServerProxy implements RaftServer {
     return impls.addNew(newGroup)
         .thenApplyAsync(newImpl -> {
           LOG.debug("{}: newImpl = {}", getId(), newImpl);
-          final boolean started = newImpl.start();
-          Preconditions.assertTrue(started, () -> getId()+ ": failed to start 
a new impl: " + newImpl);
+          try {
+            final boolean started = newImpl.start();
+            Preconditions.assertTrue(started, () -> getId()+ ": failed to 
start a new impl: " + newImpl);
+          } catch (IOException e) {
+            throw new CompletionException(e);
+          }
           return newImpl.newSuccessReply(request);
         }, implExecutor)
         .whenComplete((raftClientReply, throwable) -> {
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index 37c6fcd3..212b6934 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -35,6 +35,8 @@ import org.apache.ratis.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.statemachine.SnapshotInfo;
 import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.statemachine.TransactionContext;
+import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.MemoizedSupplier;
 import org.apache.ratis.util.TimeDuration;
 import org.apache.ratis.util.Timestamp;
 
@@ -64,11 +66,11 @@ class ServerState implements Closeable {
   private final RaftGroupMemberId memberId;
   private final RaftServerImpl server;
   /** Raft log */
-  private final RaftLog log;
+  private final MemoizedSupplier<RaftLog> log;
   /** Raft configuration */
   private final ConfigurationManager configurationManager;
   /** The thread that applies committed log entries to the state machine */
-  private final StateMachineUpdater stateMachineUpdater;
+  private final MemoizedSupplier<StateMachineUpdater> stateMachineUpdater;
   /** local storage for log and snapshot */
   private RaftStorageImpl storage;
   private final SnapshotManager snapshotManager;
@@ -137,12 +139,8 @@ class ServerState implements Closeable {
 
     snapshotManager = new SnapshotManager(storage, id);
 
-    stateMachine.initialize(server.getRaftServer(), group.getGroupId(), 
storage);
-    // read configuration from the storage
-    
Optional.ofNullable(storage.readRaftConfiguration()).ifPresent(this::setRaftConf);
-
     // On start the leader is null, start the clock now
-    leaderId = null;
+    this.leaderId = null;
     this.lastNoLeaderTime = Timestamp.currentTime();
     this.noLeaderTimeout = 
RaftServerConfigKeys.Notification.noLeaderTimeout(prop);
 
@@ -150,16 +148,23 @@ class ServerState implements Closeable {
         .map(SnapshotInfo::getIndex)
         .filter(i -> i >= 0)
         .orElse(RaftLog.INVALID_LOG_INDEX);
+    this.log = JavaUtils.memoize(() -> 
initRaftLog(getSnapshotIndexFromStateMachine, prop));
+    this.stateMachineUpdater = JavaUtils.memoize(() -> new StateMachineUpdater(
+        stateMachine, server, this, getLog().getSnapshotIndex(), prop));
+  }
+
+  void initialize(StateMachine stateMachine) throws IOException {
+    storage.initialize();
+    // read configuration from the storage
+    
Optional.ofNullable(storage.readRaftConfiguration()).ifPresent(this::setRaftConf);
+
+    stateMachine.initialize(server.getRaftServer(), 
getMemberId().getGroupId(), storage);
 
     // we cannot apply log entries to the state machine in this step, since we
     // do not know whether the local log entries have been committed.
-    this.log = initRaftLog(getMemberId(), server, storage, this::setRaftConf, 
getSnapshotIndexFromStateMachine, prop);
-
-    final RaftStorageMetadata metadata = log.loadMetadata();
+    final RaftStorageMetadata metadata = log.get().loadMetadata();
     currentTerm.set(metadata.getTerm());
     votedFor = metadata.getVotedFor();
-
-    stateMachineUpdater = new StateMachineUpdater(stateMachine, server, this, 
log.getSnapshotIndex(), prop);
   }
 
   RaftGroupMemberId getMemberId() {
@@ -195,7 +200,15 @@ class ServerState implements Closeable {
   }
 
   void start() {
-    stateMachineUpdater.start();
+    stateMachineUpdater.get().start();
+  }
+
+  private RaftLog initRaftLog(LongSupplier getSnapshotIndexFromStateMachine, 
RaftProperties prop) {
+    try {
+      return initRaftLog(getMemberId(), server, storage, this::setRaftConf, 
getSnapshotIndexFromStateMachine, prop);
+    } catch (IOException e) {
+      throw new IllegalStateException(getMemberId() + ": Failed to 
initRaftLog.", e);
+    }
   }
 
   private static RaftLog initRaftLog(RaftGroupMemberId memberId, 
RaftServerImpl server, RaftStorage storage,
@@ -260,7 +273,7 @@ class ServerState implements Closeable {
   }
 
   void persistMetadata() throws IOException {
-    log.persistMetadata(RaftStorageMetadata.valueOf(currentTerm.get(), 
votedFor));
+    getLog().persistMetadata(RaftStorageMetadata.valueOf(currentTerm.get(), 
votedFor));
   }
 
   RaftPeerId getVotedFor() {
@@ -312,8 +325,18 @@ class ServerState implements Closeable {
     setLeader(getMemberId().getPeerId(), "becomeLeader");
   }
 
+  StateMachineUpdater getStateMachineUpdater() {
+    if (!stateMachineUpdater.isInitialized()) {
+      throw new IllegalStateException(getMemberId() + ": stateMachineUpdater 
is uninitialized.");
+    }
+    return stateMachineUpdater.get();
+  }
+
   RaftLog getLog() {
-    return log;
+    if (!log.isInitialized()) {
+      throw new IllegalStateException(getMemberId() + ": log is 
uninitialized.");
+    }
+    return log.get();
   }
 
   TermIndex getLastEntry() {
@@ -330,7 +353,7 @@ class ServerState implements Closeable {
   }
 
   void appendLog(TransactionContext operation) throws StateMachineException {
-    log.append(currentTerm.get(), operation);
+    getLog().append(currentTerm.get(), operation);
     Objects.requireNonNull(operation.getLogEntry());
   }
 
@@ -406,33 +429,33 @@ class ServerState implements Closeable {
   }
 
   boolean updateCommitIndex(long majorityIndex, long curTerm, boolean 
isLeader) {
-    if (log.updateCommitIndex(majorityIndex, curTerm, isLeader)) {
-      stateMachineUpdater.notifyUpdater();
+    if (getLog().updateCommitIndex(majorityIndex, curTerm, isLeader)) {
+      getStateMachineUpdater().notifyUpdater();
       return true;
     }
     return false;
   }
 
   void notifyStateMachineUpdater() {
-    stateMachineUpdater.notifyUpdater();
+    getStateMachineUpdater().notifyUpdater();
   }
 
   void reloadStateMachine(long lastIndexInSnapshot) {
-    log.updateSnapshotIndex(lastIndexInSnapshot);
-    stateMachineUpdater.reloadStateMachine();
+    getLog().updateSnapshotIndex(lastIndexInSnapshot);
+    getStateMachineUpdater().reloadStateMachine();
   }
 
   @Override
   public void close() throws IOException {
     try {
-      stateMachineUpdater.stopAndJoin();
+      getStateMachineUpdater().stopAndJoin();
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
       LOG.warn("{}: Interrupted when joining stateMachineUpdater", 
getMemberId(), e);
     }
     LOG.info("{}: closes. applyIndex: {}", getMemberId(), 
getLastAppliedIndex());
 
-    log.close();
+    getLog().close();
     storage.close();
   }
 
@@ -449,7 +472,7 @@ class ServerState implements Closeable {
   }
 
   void updateInstalledSnapshotIndex(TermIndex lastTermIndexInSnapshot) {
-    log.onSnapshotInstalled(lastTermIndexInSnapshot.getIndex());
+    getLog().onSnapshotInstalled(lastTermIndexInSnapshot.getIndex());
     latestInstalledSnapshot.set(lastTermIndexInSnapshot);
   }
 
@@ -473,13 +496,13 @@ class ServerState implements Closeable {
   }
 
   long getNextIndex() {
-    final long logNextIndex = log.getNextIndex();
-    final long snapshotNextIndex = log.getSnapshotIndex() + 1;
+    final long logNextIndex = getLog().getNextIndex();
+    final long snapshotNextIndex = getLog().getSnapshotIndex() + 1;
     return Math.max(logNextIndex, snapshotNextIndex);
   }
 
   long getLastAppliedIndex() {
-    return stateMachineUpdater.getStateMachineLastAppliedIndex();
+    return getStateMachineUpdater().getStateMachineLastAppliedIndex();
   }
 
   boolean containsTermIndex(TermIndex ti) {
@@ -491,6 +514,6 @@ class ServerState implements Closeable {
     if 
(Optional.ofNullable(getLatestSnapshot()).map(SnapshotInfo::getTermIndex).filter(ti::equals).isPresent())
 {
       return true;
     }
-    return log.contains(ti);
+    return getLog().contains(ti);
   }
 }
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectoryImpl.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectoryImpl.java
index 7d27f7a0..a7fa13b5 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectoryImpl.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectoryImpl.java
@@ -42,6 +42,7 @@ class RaftStorageDirectoryImpl implements 
RaftStorageDirectory {
   private static final String JVM_NAME = 
ManagementFactory.getRuntimeMXBean().getName();
 
   enum StorageState {
+    UNINITIALIZED,
     NON_EXISTENT,
     NOT_FORMATTED,
     NO_SPACE,
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageImpl.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageImpl.java
index fdf1b0b1..6513efd3 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageImpl.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageImpl.java
@@ -23,7 +23,6 @@ import 
org.apache.ratis.server.RaftServerConfigKeys.Log.CorruptionPolicy;
 import org.apache.ratis.server.raftlog.LogProtoUtils;
 import org.apache.ratis.server.storage.RaftStorageDirectoryImpl.StorageState;
 import org.apache.ratis.util.JavaUtils;
-import org.apache.ratis.util.Preconditions;
 
 import java.io.File;
 import java.io.FileNotFoundException;
@@ -38,30 +37,48 @@ public class RaftStorageImpl implements RaftStorage {
 
   // TODO support multiple storage directories
   private final RaftStorageDirectoryImpl storageDir;
-  private final StorageState state;
+  private final StartupOption startupOption;
   private final CorruptionPolicy logCorruptionPolicy;
+  private volatile StorageState state = StorageState.UNINITIALIZED;
   private volatile RaftStorageMetadataFileImpl metaFile;
 
   RaftStorageImpl(File dir, CorruptionPolicy logCorruptionPolicy, 
StartupOption option,
-      long storageFeeSpaceMin) throws IOException {
+      long storageFeeSpaceMin) {
     this.storageDir = new RaftStorageDirectoryImpl(dir, storageFeeSpaceMin);
-    if (option == StartupOption.FORMAT) {
-      if (storageDir.analyzeStorage(false) == StorageState.NON_EXISTENT) {
-        throw new IOException("Cannot format " + storageDir);
-      }
-      storageDir.lock();
-      format();
-      state = storageDir.analyzeStorage(false);
-      Preconditions.assertTrue(state == StorageState.NORMAL);
-    } else {
-      state = analyzeAndRecoverStorage(true); // metaFile is initialized here
-      if (state != StorageState.NORMAL) {
-        storageDir.unlock();
-        throw new IOException("Cannot load " + storageDir
-            + ". Its state: " + state);
+    this.logCorruptionPolicy = 
Optional.ofNullable(logCorruptionPolicy).orElseGet(CorruptionPolicy::getDefault);
+    this.startupOption = option;
+  }
+
+  @Override
+  public void initialize() throws IOException {
+    try {
+      if (startupOption == StartupOption.FORMAT) {
+        if (storageDir.analyzeStorage(false) == StorageState.NON_EXISTENT) {
+          throw new IOException("Cannot format " + storageDir);
+        }
+        storageDir.lock();
+        format();
+        state = storageDir.analyzeStorage(false);
+      } else {
+        state = analyzeAndRecoverStorage(true); // metaFile is initialized here
       }
+    } catch (Throwable t) {
+      unlockOnFailure(storageDir);
+      throw t;
+    }
+
+    if (state != StorageState.NORMAL) {
+      unlockOnFailure(storageDir);
+      throw new IOException("Failed to load " + storageDir + ": " + state);
+    }
+  }
+
+  static void unlockOnFailure(RaftStorageDirectoryImpl dir) {
+    try {
+      dir.unlock();
+    } catch (Throwable t) {
+      LOG.warn("Failed to unlock " + dir, t);
     }
-    this.logCorruptionPolicy = 
Optional.ofNullable(logCorruptionPolicy).orElseGet(CorruptionPolicy::getDefault);
   }
 
   StorageState getState() {
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/storage/StorageImplUtils.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/storage/StorageImplUtils.java
index 10296657..aeff6014 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/storage/StorageImplUtils.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/storage/StorageImplUtils.java
@@ -48,8 +48,6 @@ public final class StorageImplUtils {
       Thread.currentThread().interrupt();
       throw IOUtils.toInterruptedIOException(
           "Interrupted when creating RaftStorage " + dir, e);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
     }
     return raftStorage;
   }
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java
 
b/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java
index d22bc5c4..bb4f6a07 100644
--- 
a/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java
+++ 
b/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java
@@ -34,11 +34,13 @@ import java.util.function.Consumer;
 
 public interface RaftStorageTestUtils {
   static RaftStorage newRaftStorage(File dir) throws IOException {
-    return RaftStorage.newBuilder()
+    final RaftStorage storage = RaftStorage.newBuilder()
         .setDirectory(dir)
         .setOption(RaftStorage.StartupOption.RECOVER)
         
.setStorageFreeSpaceMin(RaftServerConfigKeys.STORAGE_FREE_SPACE_MIN_DEFAULT)
         .build();
+    storage.initialize();
+    return storage;
   }
 
   static String getLogFlushTimeMetric(String memberId) {
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java 
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
index 509916db..54c65175 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
@@ -70,6 +70,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.SortedMap;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
@@ -125,13 +126,10 @@ public class TestRaftServerWithGrpc extends BaseTest 
implements MiniRaftClusterW
     // the raft server proxy created earlier. Raft server proxy should close
     // the rpc server on failure.
     RaftServerConfigKeys.setStorageDir(p, 
Collections.singletonList(cluster.getStorageDir(leaderId)));
-    try {
-      LOG.info("start a new server with the same address");
-      newRaftServer(cluster, leaderId, stateMachine, p).start();
-    } catch (IOException e) {
-      Assert.assertTrue(e.getCause() instanceof OverlappingFileLockException);
-      Assert.assertTrue(e.getMessage().contains("directory is already 
locked"));
-    }
+    testFailureCase("Starting a new server with the same address should fail",
+        () -> newRaftServer(cluster, leaderId, stateMachine, p).start(),
+        CompletionException.class, LOG, IOException.class, 
OverlappingFileLockException.class);
+
     // Try to start a raft server rpc at the leader address.
     
cluster.getServerFactory(leaderId).newRaftServerRpc(cluster.getServer(leaderId));
   }
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java 
b/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java
index 537293d7..0e5c82e2 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java
@@ -231,7 +231,7 @@ public abstract class ServerRestartTests<CLUSTER extends 
MiniRaftCluster>
   }
 
   void runTestRestartCommitIndex(MiniRaftCluster cluster) throws Exception {
-    final SimpleMessage[] messages = SimpleMessage.create(100);
+    final SimpleMessage[] messages = SimpleMessage.create(10);
     final List<CompletableFuture<Void>> futures = new 
ArrayList<>(messages.length);
     for(int i = 0; i < messages.length; i++) {
       final CompletableFuture<Void> f = new CompletableFuture<>();
@@ -248,6 +248,7 @@ public abstract class ServerRestartTests<CLUSTER extends 
MiniRaftCluster>
       }).start();
     }
     JavaUtils.allOf(futures).get();
+    LOG.info("sent {} messages.", messages.length);
 
     final List<RaftPeerId> ids = new ArrayList<>();
     final RaftServer.Division leader = cluster.getLeader();
@@ -343,7 +344,7 @@ public abstract class ServerRestartTests<CLUSTER extends 
MiniRaftCluster>
 
     testFailureCase("restart-fail-ChecksumException",
         () -> runWithNewCluster(1, this::runTestRestartWithCorruptedLogEntry),
-        CompletionException.class, ChecksumException.class);
+        CompletionException.class, IllegalStateException.class, 
ChecksumException.class);
 
     Log.setCorruptionPolicy(p, policy);
   }
@@ -354,7 +355,7 @@ public abstract class ServerRestartTests<CLUSTER extends 
MiniRaftCluster>
     final RaftPeerId id = leader.getId();
 
     // send a few messages
-    final SimpleMessage[] messages = SimpleMessage.create(10);
+    final SimpleMessage[] messages = SimpleMessage.create(100);
     final SimpleMessage lastMessage = messages[messages.length - 1];
     try (final RaftClient client = cluster.createClient()) {
       for (SimpleMessage m : messages) {
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java 
b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java
index 5a34264c..ffc8de59 100644
--- 
a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java
+++ 
b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java
@@ -49,11 +49,13 @@ import java.util.regex.Matcher;
  */
 public class TestRaftStorage extends BaseTest {
   static RaftStorageImpl newRaftStorage(File dir) throws IOException {
-    return (RaftStorageImpl) RaftStorage.newBuilder()
+    final RaftStorageImpl impl = (RaftStorageImpl) RaftStorage.newBuilder()
         .setDirectory(dir)
         .setOption(RaftStorage.StartupOption.RECOVER)
         
.setStorageFreeSpaceMin(RaftServerConfigKeys.STORAGE_FREE_SPACE_MIN_DEFAULT)
         .build();
+    impl.initialize();
+    return impl;
   }
 
   private File storageDir;
@@ -71,11 +73,13 @@ public class TestRaftStorage extends BaseTest {
   }
 
   static RaftStorageImpl formatRaftStorage(File dir) throws IOException {
-    return (RaftStorageImpl) RaftStorage.newBuilder()
+    final RaftStorageImpl impl = (RaftStorageImpl) RaftStorage.newBuilder()
         .setDirectory(dir)
         .setOption(RaftStorage.StartupOption.FORMAT)
         .setStorageFreeSpaceMin(SizeInBytes.valueOf(0))
         .build();
+    impl.initialize();
+    return impl;
   }
 
   @Test

Reply via email to