This is an automated email from the ASF dual-hosted git repository. szetszwo pushed a commit to branch branch-2 in repository https://gitbox.apache.org/repos/asf/ratis.git
commit 875f9231eaf5281de8d2464b6ee3ebe5384bfa55 Author: Tsz-Wo Nicholas Sze <[email protected]> AuthorDate: Fri Jul 8 08:56:19 2022 -0700 RATIS-1609. RaftStorageImpl should not lock the directory in the constructor. (#667) --- .../org/apache/ratis/util/ConcurrentUtils.java | 33 ++++++--- .../org/apache/ratis/util/MemoizedSupplier.java | 5 ++ .../apache/ratis/server/storage/RaftStorage.java | 5 +- .../apache/ratis/server/impl/RaftServerImpl.java | 4 +- .../apache/ratis/server/impl/RaftServerProxy.java | 21 ++++-- .../org/apache/ratis/server/impl/ServerState.java | 79 ++++++++++++++-------- .../server/storage/RaftStorageDirectoryImpl.java | 1 + .../ratis/server/storage/RaftStorageImpl.java | 53 ++++++++++----- .../ratis/server/storage/StorageImplUtils.java | 2 - .../ratis/server/storage/RaftStorageTestUtils.java | 4 +- .../apache/ratis/grpc/TestRaftServerWithGrpc.java | 12 ++-- .../apache/ratis/server/ServerRestartTests.java | 7 +- .../ratis/server/storage/TestRaftStorage.java | 8 ++- 13 files changed, 153 insertions(+), 81 deletions(-) diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ConcurrentUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/ConcurrentUtils.java index d0c87288..df214b38 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/ConcurrentUtils.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/ConcurrentUtils.java @@ -17,6 +17,7 @@ */ package org.apache.ratis.util; +import org.apache.ratis.util.function.CheckedConsumer; import org.apache.ratis.util.function.CheckedFunction; import java.util.ArrayList; @@ -138,31 +139,41 @@ public interface ConcurrentUtils { /** * The same as collection.parallelStream().forEach(action) except that - * (1) this method is asynchronous, and - * (2) an executor can be passed to this method. + * (1) this method is asynchronous, + * (2) an executor can be passed to this method, and + * (3) the action can throw a checked exception. * * @param collection The given collection. * @param action To act on each element in the collection. * @param executor To execute the action. - * @param <T> The element type. + * @param <E> The element type. + * @param <THROWABLE> the exception type. * * @return a {@link CompletableFuture} that is completed * when the action is completed for each element in the collection. + * When the action throws an exception, the future will be completed exceptionally. * * @see Collection#parallelStream() * @see java.util.stream.Stream#forEach(Consumer) */ - static <T> CompletableFuture<Void> parallelForEachAsync(Collection<T> collection, Consumer<? super T> action, - Executor executor) { - final List<CompletableFuture<T>> futures = new ArrayList<>(collection.size()); + static <E, THROWABLE extends Throwable> CompletableFuture<Void> parallelForEachAsync( + Collection<E> collection, CheckedConsumer<? super E, THROWABLE> action, Executor executor) { + final List<CompletableFuture<E>> futures = new ArrayList<>(collection.size()); collection.forEach(element -> { - final CompletableFuture<T> f = new CompletableFuture<>(); + final CompletableFuture<E> f = new CompletableFuture<>(); futures.add(f); - executor.execute(() -> { - action.accept(element); - f.complete(element); - }); + executor.execute(() -> accept(action, element, f)); }); return JavaUtils.allOf(futures); } + + static <E, THROWABLE extends Throwable> void accept( + CheckedConsumer<? super E, THROWABLE> action, E element, CompletableFuture<E> f) { + try { + action.accept(element); + f.complete(element); + } catch (Throwable t) { + f.completeExceptionally(t); + } + } } diff --git a/ratis-common/src/main/java/org/apache/ratis/util/MemoizedSupplier.java b/ratis-common/src/main/java/org/apache/ratis/util/MemoizedSupplier.java index 1f45de28..f179d2dc 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/MemoizedSupplier.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/MemoizedSupplier.java @@ -71,4 +71,9 @@ public final class MemoizedSupplier<T> implements Supplier<T> { public boolean isInitialized() { return value != null; } + + @Override + public String toString() { + return isInitialized()? "Memoized:" + get(): "UNINITIALIZED"; + } } diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/storage/RaftStorage.java b/ratis-server-api/src/main/java/org/apache/ratis/server/storage/RaftStorage.java index bd2a59a2..dde3c31b 100644 --- a/ratis-server-api/src/main/java/org/apache/ratis/server/storage/RaftStorage.java +++ b/ratis-server-api/src/main/java/org/apache/ratis/server/storage/RaftStorage.java @@ -34,6 +34,9 @@ import java.lang.reflect.Method; public interface RaftStorage extends Closeable { Logger LOG = LoggerFactory.getLogger(RaftStorage.class); + /** Initialize the storage. */ + void initialize() throws IOException; + /** @return the storage directory. */ RaftStorageDirectory getStorageDir(); @@ -43,7 +46,7 @@ public interface RaftStorage extends Closeable { /** @return the corruption policy for raft log. */ CorruptionPolicy getLogCorruptionPolicy(); - static Builder newBuilder() { + static Builder newBuilder() { return new Builder(); } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 32471dde..a24e25d8 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -302,10 +302,12 @@ class RaftServerImpl implements RaftServer.Division, this.role.transitionRole(newRole); } - boolean start() { + boolean start() throws IOException { if (!lifeCycle.compareAndTransition(NEW, STARTING)) { return false; } + state.initialize(stateMachine); + final RaftConfigurationImpl conf = getRaftConf(); if (conf != null && conf.containsInBothConfs(getId())) { LOG.info("{}: start as a follower, conf={}", getMemberId(), conf); diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java index 01d71005..96f7efbe 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java @@ -376,13 +376,16 @@ class RaftServerProxy implements RaftServer { @Override public void start() throws IOException { + lifeCycle.startAndTransition(this::startImpl, IOException.class); + } + + private void startImpl() throws IOException { ConcurrentUtils.parallelForEachAsync(getImpls(), RaftServerImpl::start, executor).join(); - lifeCycle.startAndTransition(() -> { - LOG.info("{}: start RPC server", getId()); - getServerRpc().start(); - getDataStreamServerRpc().start(); - }, IOException.class); + LOG.info("{}: start RPC server", getId()); + getServerRpc().start(); + getDataStreamServerRpc().start(); + pauseMonitor.start(); } @@ -480,8 +483,12 @@ class RaftServerProxy implements RaftServer { return impls.addNew(newGroup) .thenApplyAsync(newImpl -> { LOG.debug("{}: newImpl = {}", getId(), newImpl); - final boolean started = newImpl.start(); - Preconditions.assertTrue(started, () -> getId()+ ": failed to start a new impl: " + newImpl); + try { + final boolean started = newImpl.start(); + Preconditions.assertTrue(started, () -> getId()+ ": failed to start a new impl: " + newImpl); + } catch (IOException e) { + throw new CompletionException(e); + } return newImpl.newSuccessReply(request); }, implExecutor) .whenComplete((raftClientReply, throwable) -> { diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java index 37c6fcd3..212b6934 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java @@ -35,6 +35,8 @@ import org.apache.ratis.proto.RaftProtos.LogEntryProto; import org.apache.ratis.statemachine.SnapshotInfo; import org.apache.ratis.statemachine.StateMachine; import org.apache.ratis.statemachine.TransactionContext; +import org.apache.ratis.util.JavaUtils; +import org.apache.ratis.util.MemoizedSupplier; import org.apache.ratis.util.TimeDuration; import org.apache.ratis.util.Timestamp; @@ -64,11 +66,11 @@ class ServerState implements Closeable { private final RaftGroupMemberId memberId; private final RaftServerImpl server; /** Raft log */ - private final RaftLog log; + private final MemoizedSupplier<RaftLog> log; /** Raft configuration */ private final ConfigurationManager configurationManager; /** The thread that applies committed log entries to the state machine */ - private final StateMachineUpdater stateMachineUpdater; + private final MemoizedSupplier<StateMachineUpdater> stateMachineUpdater; /** local storage for log and snapshot */ private RaftStorageImpl storage; private final SnapshotManager snapshotManager; @@ -137,12 +139,8 @@ class ServerState implements Closeable { snapshotManager = new SnapshotManager(storage, id); - stateMachine.initialize(server.getRaftServer(), group.getGroupId(), storage); - // read configuration from the storage - Optional.ofNullable(storage.readRaftConfiguration()).ifPresent(this::setRaftConf); - // On start the leader is null, start the clock now - leaderId = null; + this.leaderId = null; this.lastNoLeaderTime = Timestamp.currentTime(); this.noLeaderTimeout = RaftServerConfigKeys.Notification.noLeaderTimeout(prop); @@ -150,16 +148,23 @@ class ServerState implements Closeable { .map(SnapshotInfo::getIndex) .filter(i -> i >= 0) .orElse(RaftLog.INVALID_LOG_INDEX); + this.log = JavaUtils.memoize(() -> initRaftLog(getSnapshotIndexFromStateMachine, prop)); + this.stateMachineUpdater = JavaUtils.memoize(() -> new StateMachineUpdater( + stateMachine, server, this, getLog().getSnapshotIndex(), prop)); + } + + void initialize(StateMachine stateMachine) throws IOException { + storage.initialize(); + // read configuration from the storage + Optional.ofNullable(storage.readRaftConfiguration()).ifPresent(this::setRaftConf); + + stateMachine.initialize(server.getRaftServer(), getMemberId().getGroupId(), storage); // we cannot apply log entries to the state machine in this step, since we // do not know whether the local log entries have been committed. - this.log = initRaftLog(getMemberId(), server, storage, this::setRaftConf, getSnapshotIndexFromStateMachine, prop); - - final RaftStorageMetadata metadata = log.loadMetadata(); + final RaftStorageMetadata metadata = log.get().loadMetadata(); currentTerm.set(metadata.getTerm()); votedFor = metadata.getVotedFor(); - - stateMachineUpdater = new StateMachineUpdater(stateMachine, server, this, log.getSnapshotIndex(), prop); } RaftGroupMemberId getMemberId() { @@ -195,7 +200,15 @@ class ServerState implements Closeable { } void start() { - stateMachineUpdater.start(); + stateMachineUpdater.get().start(); + } + + private RaftLog initRaftLog(LongSupplier getSnapshotIndexFromStateMachine, RaftProperties prop) { + try { + return initRaftLog(getMemberId(), server, storage, this::setRaftConf, getSnapshotIndexFromStateMachine, prop); + } catch (IOException e) { + throw new IllegalStateException(getMemberId() + ": Failed to initRaftLog.", e); + } } private static RaftLog initRaftLog(RaftGroupMemberId memberId, RaftServerImpl server, RaftStorage storage, @@ -260,7 +273,7 @@ class ServerState implements Closeable { } void persistMetadata() throws IOException { - log.persistMetadata(RaftStorageMetadata.valueOf(currentTerm.get(), votedFor)); + getLog().persistMetadata(RaftStorageMetadata.valueOf(currentTerm.get(), votedFor)); } RaftPeerId getVotedFor() { @@ -312,8 +325,18 @@ class ServerState implements Closeable { setLeader(getMemberId().getPeerId(), "becomeLeader"); } + StateMachineUpdater getStateMachineUpdater() { + if (!stateMachineUpdater.isInitialized()) { + throw new IllegalStateException(getMemberId() + ": stateMachineUpdater is uninitialized."); + } + return stateMachineUpdater.get(); + } + RaftLog getLog() { - return log; + if (!log.isInitialized()) { + throw new IllegalStateException(getMemberId() + ": log is uninitialized."); + } + return log.get(); } TermIndex getLastEntry() { @@ -330,7 +353,7 @@ class ServerState implements Closeable { } void appendLog(TransactionContext operation) throws StateMachineException { - log.append(currentTerm.get(), operation); + getLog().append(currentTerm.get(), operation); Objects.requireNonNull(operation.getLogEntry()); } @@ -406,33 +429,33 @@ class ServerState implements Closeable { } boolean updateCommitIndex(long majorityIndex, long curTerm, boolean isLeader) { - if (log.updateCommitIndex(majorityIndex, curTerm, isLeader)) { - stateMachineUpdater.notifyUpdater(); + if (getLog().updateCommitIndex(majorityIndex, curTerm, isLeader)) { + getStateMachineUpdater().notifyUpdater(); return true; } return false; } void notifyStateMachineUpdater() { - stateMachineUpdater.notifyUpdater(); + getStateMachineUpdater().notifyUpdater(); } void reloadStateMachine(long lastIndexInSnapshot) { - log.updateSnapshotIndex(lastIndexInSnapshot); - stateMachineUpdater.reloadStateMachine(); + getLog().updateSnapshotIndex(lastIndexInSnapshot); + getStateMachineUpdater().reloadStateMachine(); } @Override public void close() throws IOException { try { - stateMachineUpdater.stopAndJoin(); + getStateMachineUpdater().stopAndJoin(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); LOG.warn("{}: Interrupted when joining stateMachineUpdater", getMemberId(), e); } LOG.info("{}: closes. applyIndex: {}", getMemberId(), getLastAppliedIndex()); - log.close(); + getLog().close(); storage.close(); } @@ -449,7 +472,7 @@ class ServerState implements Closeable { } void updateInstalledSnapshotIndex(TermIndex lastTermIndexInSnapshot) { - log.onSnapshotInstalled(lastTermIndexInSnapshot.getIndex()); + getLog().onSnapshotInstalled(lastTermIndexInSnapshot.getIndex()); latestInstalledSnapshot.set(lastTermIndexInSnapshot); } @@ -473,13 +496,13 @@ class ServerState implements Closeable { } long getNextIndex() { - final long logNextIndex = log.getNextIndex(); - final long snapshotNextIndex = log.getSnapshotIndex() + 1; + final long logNextIndex = getLog().getNextIndex(); + final long snapshotNextIndex = getLog().getSnapshotIndex() + 1; return Math.max(logNextIndex, snapshotNextIndex); } long getLastAppliedIndex() { - return stateMachineUpdater.getStateMachineLastAppliedIndex(); + return getStateMachineUpdater().getStateMachineLastAppliedIndex(); } boolean containsTermIndex(TermIndex ti) { @@ -491,6 +514,6 @@ class ServerState implements Closeable { if (Optional.ofNullable(getLatestSnapshot()).map(SnapshotInfo::getTermIndex).filter(ti::equals).isPresent()) { return true; } - return log.contains(ti); + return getLog().contains(ti); } } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectoryImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectoryImpl.java index 7d27f7a0..a7fa13b5 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectoryImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectoryImpl.java @@ -42,6 +42,7 @@ class RaftStorageDirectoryImpl implements RaftStorageDirectory { private static final String JVM_NAME = ManagementFactory.getRuntimeMXBean().getName(); enum StorageState { + UNINITIALIZED, NON_EXISTENT, NOT_FORMATTED, NO_SPACE, diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageImpl.java index fdf1b0b1..6513efd3 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageImpl.java @@ -23,7 +23,6 @@ import org.apache.ratis.server.RaftServerConfigKeys.Log.CorruptionPolicy; import org.apache.ratis.server.raftlog.LogProtoUtils; import org.apache.ratis.server.storage.RaftStorageDirectoryImpl.StorageState; import org.apache.ratis.util.JavaUtils; -import org.apache.ratis.util.Preconditions; import java.io.File; import java.io.FileNotFoundException; @@ -38,30 +37,48 @@ public class RaftStorageImpl implements RaftStorage { // TODO support multiple storage directories private final RaftStorageDirectoryImpl storageDir; - private final StorageState state; + private final StartupOption startupOption; private final CorruptionPolicy logCorruptionPolicy; + private volatile StorageState state = StorageState.UNINITIALIZED; private volatile RaftStorageMetadataFileImpl metaFile; RaftStorageImpl(File dir, CorruptionPolicy logCorruptionPolicy, StartupOption option, - long storageFeeSpaceMin) throws IOException { + long storageFeeSpaceMin) { this.storageDir = new RaftStorageDirectoryImpl(dir, storageFeeSpaceMin); - if (option == StartupOption.FORMAT) { - if (storageDir.analyzeStorage(false) == StorageState.NON_EXISTENT) { - throw new IOException("Cannot format " + storageDir); - } - storageDir.lock(); - format(); - state = storageDir.analyzeStorage(false); - Preconditions.assertTrue(state == StorageState.NORMAL); - } else { - state = analyzeAndRecoverStorage(true); // metaFile is initialized here - if (state != StorageState.NORMAL) { - storageDir.unlock(); - throw new IOException("Cannot load " + storageDir - + ". Its state: " + state); + this.logCorruptionPolicy = Optional.ofNullable(logCorruptionPolicy).orElseGet(CorruptionPolicy::getDefault); + this.startupOption = option; + } + + @Override + public void initialize() throws IOException { + try { + if (startupOption == StartupOption.FORMAT) { + if (storageDir.analyzeStorage(false) == StorageState.NON_EXISTENT) { + throw new IOException("Cannot format " + storageDir); + } + storageDir.lock(); + format(); + state = storageDir.analyzeStorage(false); + } else { + state = analyzeAndRecoverStorage(true); // metaFile is initialized here } + } catch (Throwable t) { + unlockOnFailure(storageDir); + throw t; + } + + if (state != StorageState.NORMAL) { + unlockOnFailure(storageDir); + throw new IOException("Failed to load " + storageDir + ": " + state); + } + } + + static void unlockOnFailure(RaftStorageDirectoryImpl dir) { + try { + dir.unlock(); + } catch (Throwable t) { + LOG.warn("Failed to unlock " + dir, t); } - this.logCorruptionPolicy = Optional.ofNullable(logCorruptionPolicy).orElseGet(CorruptionPolicy::getDefault); } StorageState getState() { diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/StorageImplUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/StorageImplUtils.java index 10296657..aeff6014 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/StorageImplUtils.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/StorageImplUtils.java @@ -48,8 +48,6 @@ public final class StorageImplUtils { Thread.currentThread().interrupt(); throw IOUtils.toInterruptedIOException( "Interrupted when creating RaftStorage " + dir, e); - } catch (IOException e) { - throw new RuntimeException(e); } return raftStorage; } diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java index d22bc5c4..bb4f6a07 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java @@ -34,11 +34,13 @@ import java.util.function.Consumer; public interface RaftStorageTestUtils { static RaftStorage newRaftStorage(File dir) throws IOException { - return RaftStorage.newBuilder() + final RaftStorage storage = RaftStorage.newBuilder() .setDirectory(dir) .setOption(RaftStorage.StartupOption.RECOVER) .setStorageFreeSpaceMin(RaftServerConfigKeys.STORAGE_FREE_SPACE_MIN_DEFAULT) .build(); + storage.initialize(); + return storage; } static String getLogFlushTimeMetric(String memberId) { diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java index 509916db..54c65175 100644 --- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java @@ -70,6 +70,7 @@ import java.util.Collections; import java.util.List; import java.util.SortedMap; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; @@ -125,13 +126,10 @@ public class TestRaftServerWithGrpc extends BaseTest implements MiniRaftClusterW // the raft server proxy created earlier. Raft server proxy should close // the rpc server on failure. RaftServerConfigKeys.setStorageDir(p, Collections.singletonList(cluster.getStorageDir(leaderId))); - try { - LOG.info("start a new server with the same address"); - newRaftServer(cluster, leaderId, stateMachine, p).start(); - } catch (IOException e) { - Assert.assertTrue(e.getCause() instanceof OverlappingFileLockException); - Assert.assertTrue(e.getMessage().contains("directory is already locked")); - } + testFailureCase("Starting a new server with the same address should fail", + () -> newRaftServer(cluster, leaderId, stateMachine, p).start(), + CompletionException.class, LOG, IOException.class, OverlappingFileLockException.class); + // Try to start a raft server rpc at the leader address. cluster.getServerFactory(leaderId).newRaftServerRpc(cluster.getServer(leaderId)); } diff --git a/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java b/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java index 537293d7..0e5c82e2 100644 --- a/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java +++ b/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java @@ -231,7 +231,7 @@ public abstract class ServerRestartTests<CLUSTER extends MiniRaftCluster> } void runTestRestartCommitIndex(MiniRaftCluster cluster) throws Exception { - final SimpleMessage[] messages = SimpleMessage.create(100); + final SimpleMessage[] messages = SimpleMessage.create(10); final List<CompletableFuture<Void>> futures = new ArrayList<>(messages.length); for(int i = 0; i < messages.length; i++) { final CompletableFuture<Void> f = new CompletableFuture<>(); @@ -248,6 +248,7 @@ public abstract class ServerRestartTests<CLUSTER extends MiniRaftCluster> }).start(); } JavaUtils.allOf(futures).get(); + LOG.info("sent {} messages.", messages.length); final List<RaftPeerId> ids = new ArrayList<>(); final RaftServer.Division leader = cluster.getLeader(); @@ -343,7 +344,7 @@ public abstract class ServerRestartTests<CLUSTER extends MiniRaftCluster> testFailureCase("restart-fail-ChecksumException", () -> runWithNewCluster(1, this::runTestRestartWithCorruptedLogEntry), - CompletionException.class, ChecksumException.class); + CompletionException.class, IllegalStateException.class, ChecksumException.class); Log.setCorruptionPolicy(p, policy); } @@ -354,7 +355,7 @@ public abstract class ServerRestartTests<CLUSTER extends MiniRaftCluster> final RaftPeerId id = leader.getId(); // send a few messages - final SimpleMessage[] messages = SimpleMessage.create(10); + final SimpleMessage[] messages = SimpleMessage.create(100); final SimpleMessage lastMessage = messages[messages.length - 1]; try (final RaftClient client = cluster.createClient()) { for (SimpleMessage m : messages) { diff --git a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java index 5a34264c..ffc8de59 100644 --- a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java +++ b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java @@ -49,11 +49,13 @@ import java.util.regex.Matcher; */ public class TestRaftStorage extends BaseTest { static RaftStorageImpl newRaftStorage(File dir) throws IOException { - return (RaftStorageImpl) RaftStorage.newBuilder() + final RaftStorageImpl impl = (RaftStorageImpl) RaftStorage.newBuilder() .setDirectory(dir) .setOption(RaftStorage.StartupOption.RECOVER) .setStorageFreeSpaceMin(RaftServerConfigKeys.STORAGE_FREE_SPACE_MIN_DEFAULT) .build(); + impl.initialize(); + return impl; } private File storageDir; @@ -71,11 +73,13 @@ public class TestRaftStorage extends BaseTest { } static RaftStorageImpl formatRaftStorage(File dir) throws IOException { - return (RaftStorageImpl) RaftStorage.newBuilder() + final RaftStorageImpl impl = (RaftStorageImpl) RaftStorage.newBuilder() .setDirectory(dir) .setOption(RaftStorage.StartupOption.FORMAT) .setStorageFreeSpaceMin(SizeInBytes.valueOf(0)) .build(); + impl.initialize(); + return impl; } @Test
