This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 53a3eaa9 RATIS-1609. RaftStorageImpl should not lock the directory in
the constructor. (#667)
53a3eaa9 is described below
commit 53a3eaa9618f7323ccac3864a9bc86cdec883762
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Fri Jul 8 08:56:19 2022 -0700
RATIS-1609. RaftStorageImpl should not lock the directory in the
constructor. (#667)
---
.../org/apache/ratis/util/ConcurrentUtils.java | 33 ++++++---
.../org/apache/ratis/util/MemoizedSupplier.java | 5 ++
.../apache/ratis/server/storage/RaftStorage.java | 5 +-
.../apache/ratis/server/impl/RaftServerImpl.java | 4 +-
.../apache/ratis/server/impl/RaftServerProxy.java | 21 ++++--
.../org/apache/ratis/server/impl/ServerState.java | 79 ++++++++++++++--------
.../server/storage/RaftStorageDirectoryImpl.java | 1 +
.../ratis/server/storage/RaftStorageImpl.java | 53 ++++++++++-----
.../ratis/server/storage/StorageImplUtils.java | 2 -
.../ratis/server/storage/RaftStorageTestUtils.java | 4 +-
.../apache/ratis/grpc/TestRaftServerWithGrpc.java | 12 ++--
.../apache/ratis/server/ServerRestartTests.java | 7 +-
.../ratis/server/storage/TestRaftStorage.java | 8 ++-
13 files changed, 153 insertions(+), 81 deletions(-)
diff --git
a/ratis-common/src/main/java/org/apache/ratis/util/ConcurrentUtils.java
b/ratis-common/src/main/java/org/apache/ratis/util/ConcurrentUtils.java
index d0c87288..df214b38 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/ConcurrentUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/ConcurrentUtils.java
@@ -17,6 +17,7 @@
*/
package org.apache.ratis.util;
+import org.apache.ratis.util.function.CheckedConsumer;
import org.apache.ratis.util.function.CheckedFunction;
import java.util.ArrayList;
@@ -138,31 +139,41 @@ public interface ConcurrentUtils {
/**
* The same as collection.parallelStream().forEach(action) except that
- * (1) this method is asynchronous, and
- * (2) an executor can be passed to this method.
+ * (1) this method is asynchronous,
+ * (2) an executor can be passed to this method, and
+ * (3) the action can throw a checked exception.
*
* @param collection The given collection.
* @param action To act on each element in the collection.
* @param executor To execute the action.
- * @param <T> The element type.
+ * @param <E> The element type.
+ * @param <THROWABLE> the exception type.
*
* @return a {@link CompletableFuture} that is completed
* when the action is completed for each element in the collection.
+ * When the action throws an exception, the future will be completed
exceptionally.
*
* @see Collection#parallelStream()
* @see java.util.stream.Stream#forEach(Consumer)
*/
- static <T> CompletableFuture<Void> parallelForEachAsync(Collection<T>
collection, Consumer<? super T> action,
- Executor executor) {
- final List<CompletableFuture<T>> futures = new
ArrayList<>(collection.size());
+ static <E, THROWABLE extends Throwable> CompletableFuture<Void>
parallelForEachAsync(
+ Collection<E> collection, CheckedConsumer<? super E, THROWABLE> action,
Executor executor) {
+ final List<CompletableFuture<E>> futures = new
ArrayList<>(collection.size());
collection.forEach(element -> {
- final CompletableFuture<T> f = new CompletableFuture<>();
+ final CompletableFuture<E> f = new CompletableFuture<>();
futures.add(f);
- executor.execute(() -> {
- action.accept(element);
- f.complete(element);
- });
+ executor.execute(() -> accept(action, element, f));
});
return JavaUtils.allOf(futures);
}
+
+ static <E, THROWABLE extends Throwable> void accept(
+ CheckedConsumer<? super E, THROWABLE> action, E element,
CompletableFuture<E> f) {
+ try {
+ action.accept(element);
+ f.complete(element);
+ } catch (Throwable t) {
+ f.completeExceptionally(t);
+ }
+ }
}
diff --git
a/ratis-common/src/main/java/org/apache/ratis/util/MemoizedSupplier.java
b/ratis-common/src/main/java/org/apache/ratis/util/MemoizedSupplier.java
index 1f45de28..f179d2dc 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/MemoizedSupplier.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/MemoizedSupplier.java
@@ -71,4 +71,9 @@ public final class MemoizedSupplier<T> implements Supplier<T>
{
public boolean isInitialized() {
return value != null;
}
+
+ @Override
+ public String toString() {
+ return isInitialized()? "Memoized:" + get(): "UNINITIALIZED";
+ }
}
diff --git
a/ratis-server-api/src/main/java/org/apache/ratis/server/storage/RaftStorage.java
b/ratis-server-api/src/main/java/org/apache/ratis/server/storage/RaftStorage.java
index bd2a59a2..dde3c31b 100644
---
a/ratis-server-api/src/main/java/org/apache/ratis/server/storage/RaftStorage.java
+++
b/ratis-server-api/src/main/java/org/apache/ratis/server/storage/RaftStorage.java
@@ -34,6 +34,9 @@ import java.lang.reflect.Method;
public interface RaftStorage extends Closeable {
Logger LOG = LoggerFactory.getLogger(RaftStorage.class);
+ /** Initialize the storage. */
+ void initialize() throws IOException;
+
/** @return the storage directory. */
RaftStorageDirectory getStorageDir();
@@ -43,7 +46,7 @@ public interface RaftStorage extends Closeable {
/** @return the corruption policy for raft log. */
CorruptionPolicy getLogCorruptionPolicy();
- static Builder newBuilder() {
+ static Builder newBuilder() {
return new Builder();
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 32471dde..a24e25d8 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -302,10 +302,12 @@ class RaftServerImpl implements RaftServer.Division,
this.role.transitionRole(newRole);
}
- boolean start() {
+ boolean start() throws IOException {
if (!lifeCycle.compareAndTransition(NEW, STARTING)) {
return false;
}
+ state.initialize(stateMachine);
+
final RaftConfigurationImpl conf = getRaftConf();
if (conf != null && conf.containsInBothConfs(getId())) {
LOG.info("{}: start as a follower, conf={}", getMemberId(), conf);
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index 01d71005..96f7efbe 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -376,13 +376,16 @@ class RaftServerProxy implements RaftServer {
@Override
public void start() throws IOException {
+ lifeCycle.startAndTransition(this::startImpl, IOException.class);
+ }
+
+ private void startImpl() throws IOException {
ConcurrentUtils.parallelForEachAsync(getImpls(), RaftServerImpl::start,
executor).join();
- lifeCycle.startAndTransition(() -> {
- LOG.info("{}: start RPC server", getId());
- getServerRpc().start();
- getDataStreamServerRpc().start();
- }, IOException.class);
+ LOG.info("{}: start RPC server", getId());
+ getServerRpc().start();
+ getDataStreamServerRpc().start();
+
pauseMonitor.start();
}
@@ -480,8 +483,12 @@ class RaftServerProxy implements RaftServer {
return impls.addNew(newGroup)
.thenApplyAsync(newImpl -> {
LOG.debug("{}: newImpl = {}", getId(), newImpl);
- final boolean started = newImpl.start();
- Preconditions.assertTrue(started, () -> getId()+ ": failed to start
a new impl: " + newImpl);
+ try {
+ final boolean started = newImpl.start();
+ Preconditions.assertTrue(started, () -> getId()+ ": failed to
start a new impl: " + newImpl);
+ } catch (IOException e) {
+ throw new CompletionException(e);
+ }
return newImpl.newSuccessReply(request);
}, implExecutor)
.whenComplete((raftClientReply, throwable) -> {
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index 37c6fcd3..212b6934 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -35,6 +35,8 @@ import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.statemachine.SnapshotInfo;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.statemachine.TransactionContext;
+import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.MemoizedSupplier;
import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.Timestamp;
@@ -64,11 +66,11 @@ class ServerState implements Closeable {
private final RaftGroupMemberId memberId;
private final RaftServerImpl server;
/** Raft log */
- private final RaftLog log;
+ private final MemoizedSupplier<RaftLog> log;
/** Raft configuration */
private final ConfigurationManager configurationManager;
/** The thread that applies committed log entries to the state machine */
- private final StateMachineUpdater stateMachineUpdater;
+ private final MemoizedSupplier<StateMachineUpdater> stateMachineUpdater;
/** local storage for log and snapshot */
private RaftStorageImpl storage;
private final SnapshotManager snapshotManager;
@@ -137,12 +139,8 @@ class ServerState implements Closeable {
snapshotManager = new SnapshotManager(storage, id);
- stateMachine.initialize(server.getRaftServer(), group.getGroupId(),
storage);
- // read configuration from the storage
-
Optional.ofNullable(storage.readRaftConfiguration()).ifPresent(this::setRaftConf);
-
// On start the leader is null, start the clock now
- leaderId = null;
+ this.leaderId = null;
this.lastNoLeaderTime = Timestamp.currentTime();
this.noLeaderTimeout =
RaftServerConfigKeys.Notification.noLeaderTimeout(prop);
@@ -150,16 +148,23 @@ class ServerState implements Closeable {
.map(SnapshotInfo::getIndex)
.filter(i -> i >= 0)
.orElse(RaftLog.INVALID_LOG_INDEX);
+ this.log = JavaUtils.memoize(() ->
initRaftLog(getSnapshotIndexFromStateMachine, prop));
+ this.stateMachineUpdater = JavaUtils.memoize(() -> new StateMachineUpdater(
+ stateMachine, server, this, getLog().getSnapshotIndex(), prop));
+ }
+
+ void initialize(StateMachine stateMachine) throws IOException {
+ storage.initialize();
+ // read configuration from the storage
+
Optional.ofNullable(storage.readRaftConfiguration()).ifPresent(this::setRaftConf);
+
+ stateMachine.initialize(server.getRaftServer(),
getMemberId().getGroupId(), storage);
// we cannot apply log entries to the state machine in this step, since we
// do not know whether the local log entries have been committed.
- this.log = initRaftLog(getMemberId(), server, storage, this::setRaftConf,
getSnapshotIndexFromStateMachine, prop);
-
- final RaftStorageMetadata metadata = log.loadMetadata();
+ final RaftStorageMetadata metadata = log.get().loadMetadata();
currentTerm.set(metadata.getTerm());
votedFor = metadata.getVotedFor();
-
- stateMachineUpdater = new StateMachineUpdater(stateMachine, server, this,
log.getSnapshotIndex(), prop);
}
RaftGroupMemberId getMemberId() {
@@ -195,7 +200,15 @@ class ServerState implements Closeable {
}
void start() {
- stateMachineUpdater.start();
+ stateMachineUpdater.get().start();
+ }
+
+ private RaftLog initRaftLog(LongSupplier getSnapshotIndexFromStateMachine,
RaftProperties prop) {
+ try {
+ return initRaftLog(getMemberId(), server, storage, this::setRaftConf,
getSnapshotIndexFromStateMachine, prop);
+ } catch (IOException e) {
+ throw new IllegalStateException(getMemberId() + ": Failed to
initRaftLog.", e);
+ }
}
private static RaftLog initRaftLog(RaftGroupMemberId memberId,
RaftServerImpl server, RaftStorage storage,
@@ -260,7 +273,7 @@ class ServerState implements Closeable {
}
void persistMetadata() throws IOException {
- log.persistMetadata(RaftStorageMetadata.valueOf(currentTerm.get(),
votedFor));
+ getLog().persistMetadata(RaftStorageMetadata.valueOf(currentTerm.get(),
votedFor));
}
RaftPeerId getVotedFor() {
@@ -312,8 +325,18 @@ class ServerState implements Closeable {
setLeader(getMemberId().getPeerId(), "becomeLeader");
}
+ StateMachineUpdater getStateMachineUpdater() {
+ if (!stateMachineUpdater.isInitialized()) {
+ throw new IllegalStateException(getMemberId() + ": stateMachineUpdater
is uninitialized.");
+ }
+ return stateMachineUpdater.get();
+ }
+
RaftLog getLog() {
- return log;
+ if (!log.isInitialized()) {
+ throw new IllegalStateException(getMemberId() + ": log is
uninitialized.");
+ }
+ return log.get();
}
TermIndex getLastEntry() {
@@ -330,7 +353,7 @@ class ServerState implements Closeable {
}
void appendLog(TransactionContext operation) throws StateMachineException {
- log.append(currentTerm.get(), operation);
+ getLog().append(currentTerm.get(), operation);
Objects.requireNonNull(operation.getLogEntry());
}
@@ -406,33 +429,33 @@ class ServerState implements Closeable {
}
boolean updateCommitIndex(long majorityIndex, long curTerm, boolean
isLeader) {
- if (log.updateCommitIndex(majorityIndex, curTerm, isLeader)) {
- stateMachineUpdater.notifyUpdater();
+ if (getLog().updateCommitIndex(majorityIndex, curTerm, isLeader)) {
+ getStateMachineUpdater().notifyUpdater();
return true;
}
return false;
}
void notifyStateMachineUpdater() {
- stateMachineUpdater.notifyUpdater();
+ getStateMachineUpdater().notifyUpdater();
}
void reloadStateMachine(long lastIndexInSnapshot) {
- log.updateSnapshotIndex(lastIndexInSnapshot);
- stateMachineUpdater.reloadStateMachine();
+ getLog().updateSnapshotIndex(lastIndexInSnapshot);
+ getStateMachineUpdater().reloadStateMachine();
}
@Override
public void close() throws IOException {
try {
- stateMachineUpdater.stopAndJoin();
+ getStateMachineUpdater().stopAndJoin();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.warn("{}: Interrupted when joining stateMachineUpdater",
getMemberId(), e);
}
LOG.info("{}: closes. applyIndex: {}", getMemberId(),
getLastAppliedIndex());
- log.close();
+ getLog().close();
storage.close();
}
@@ -449,7 +472,7 @@ class ServerState implements Closeable {
}
void updateInstalledSnapshotIndex(TermIndex lastTermIndexInSnapshot) {
- log.onSnapshotInstalled(lastTermIndexInSnapshot.getIndex());
+ getLog().onSnapshotInstalled(lastTermIndexInSnapshot.getIndex());
latestInstalledSnapshot.set(lastTermIndexInSnapshot);
}
@@ -473,13 +496,13 @@ class ServerState implements Closeable {
}
long getNextIndex() {
- final long logNextIndex = log.getNextIndex();
- final long snapshotNextIndex = log.getSnapshotIndex() + 1;
+ final long logNextIndex = getLog().getNextIndex();
+ final long snapshotNextIndex = getLog().getSnapshotIndex() + 1;
return Math.max(logNextIndex, snapshotNextIndex);
}
long getLastAppliedIndex() {
- return stateMachineUpdater.getStateMachineLastAppliedIndex();
+ return getStateMachineUpdater().getStateMachineLastAppliedIndex();
}
boolean containsTermIndex(TermIndex ti) {
@@ -491,6 +514,6 @@ class ServerState implements Closeable {
if
(Optional.ofNullable(getLatestSnapshot()).map(SnapshotInfo::getTermIndex).filter(ti::equals).isPresent())
{
return true;
}
- return log.contains(ti);
+ return getLog().contains(ti);
}
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectoryImpl.java
b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectoryImpl.java
index 7d27f7a0..a7fa13b5 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectoryImpl.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectoryImpl.java
@@ -42,6 +42,7 @@ class RaftStorageDirectoryImpl implements
RaftStorageDirectory {
private static final String JVM_NAME =
ManagementFactory.getRuntimeMXBean().getName();
enum StorageState {
+ UNINITIALIZED,
NON_EXISTENT,
NOT_FORMATTED,
NO_SPACE,
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageImpl.java
b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageImpl.java
index fdf1b0b1..6513efd3 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageImpl.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageImpl.java
@@ -23,7 +23,6 @@ import
org.apache.ratis.server.RaftServerConfigKeys.Log.CorruptionPolicy;
import org.apache.ratis.server.raftlog.LogProtoUtils;
import org.apache.ratis.server.storage.RaftStorageDirectoryImpl.StorageState;
import org.apache.ratis.util.JavaUtils;
-import org.apache.ratis.util.Preconditions;
import java.io.File;
import java.io.FileNotFoundException;
@@ -38,30 +37,48 @@ public class RaftStorageImpl implements RaftStorage {
// TODO support multiple storage directories
private final RaftStorageDirectoryImpl storageDir;
- private final StorageState state;
+ private final StartupOption startupOption;
private final CorruptionPolicy logCorruptionPolicy;
+ private volatile StorageState state = StorageState.UNINITIALIZED;
private volatile RaftStorageMetadataFileImpl metaFile;
RaftStorageImpl(File dir, CorruptionPolicy logCorruptionPolicy,
StartupOption option,
- long storageFeeSpaceMin) throws IOException {
+ long storageFeeSpaceMin) {
this.storageDir = new RaftStorageDirectoryImpl(dir, storageFeeSpaceMin);
- if (option == StartupOption.FORMAT) {
- if (storageDir.analyzeStorage(false) == StorageState.NON_EXISTENT) {
- throw new IOException("Cannot format " + storageDir);
- }
- storageDir.lock();
- format();
- state = storageDir.analyzeStorage(false);
- Preconditions.assertTrue(state == StorageState.NORMAL);
- } else {
- state = analyzeAndRecoverStorage(true); // metaFile is initialized here
- if (state != StorageState.NORMAL) {
- storageDir.unlock();
- throw new IOException("Cannot load " + storageDir
- + ". Its state: " + state);
+ this.logCorruptionPolicy =
Optional.ofNullable(logCorruptionPolicy).orElseGet(CorruptionPolicy::getDefault);
+ this.startupOption = option;
+ }
+
+ @Override
+ public void initialize() throws IOException {
+ try {
+ if (startupOption == StartupOption.FORMAT) {
+ if (storageDir.analyzeStorage(false) == StorageState.NON_EXISTENT) {
+ throw new IOException("Cannot format " + storageDir);
+ }
+ storageDir.lock();
+ format();
+ state = storageDir.analyzeStorage(false);
+ } else {
+ state = analyzeAndRecoverStorage(true); // metaFile is initialized here
}
+ } catch (Throwable t) {
+ unlockOnFailure(storageDir);
+ throw t;
+ }
+
+ if (state != StorageState.NORMAL) {
+ unlockOnFailure(storageDir);
+ throw new IOException("Failed to load " + storageDir + ": " + state);
+ }
+ }
+
+ static void unlockOnFailure(RaftStorageDirectoryImpl dir) {
+ try {
+ dir.unlock();
+ } catch (Throwable t) {
+ LOG.warn("Failed to unlock " + dir, t);
}
- this.logCorruptionPolicy =
Optional.ofNullable(logCorruptionPolicy).orElseGet(CorruptionPolicy::getDefault);
}
StorageState getState() {
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/storage/StorageImplUtils.java
b/ratis-server/src/main/java/org/apache/ratis/server/storage/StorageImplUtils.java
index 10296657..aeff6014 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/storage/StorageImplUtils.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/storage/StorageImplUtils.java
@@ -48,8 +48,6 @@ public final class StorageImplUtils {
Thread.currentThread().interrupt();
throw IOUtils.toInterruptedIOException(
"Interrupted when creating RaftStorage " + dir, e);
- } catch (IOException e) {
- throw new RuntimeException(e);
}
return raftStorage;
}
diff --git
a/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java
b/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java
index d22bc5c4..bb4f6a07 100644
---
a/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java
+++
b/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java
@@ -34,11 +34,13 @@ import java.util.function.Consumer;
public interface RaftStorageTestUtils {
static RaftStorage newRaftStorage(File dir) throws IOException {
- return RaftStorage.newBuilder()
+ final RaftStorage storage = RaftStorage.newBuilder()
.setDirectory(dir)
.setOption(RaftStorage.StartupOption.RECOVER)
.setStorageFreeSpaceMin(RaftServerConfigKeys.STORAGE_FREE_SPACE_MIN_DEFAULT)
.build();
+ storage.initialize();
+ return storage;
}
static String getLogFlushTimeMetric(String memberId) {
diff --git
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
index 509916db..54c65175 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
@@ -70,6 +70,7 @@ import java.util.Collections;
import java.util.List;
import java.util.SortedMap;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
@@ -125,13 +126,10 @@ public class TestRaftServerWithGrpc extends BaseTest
implements MiniRaftClusterW
// the raft server proxy created earlier. Raft server proxy should close
// the rpc server on failure.
RaftServerConfigKeys.setStorageDir(p,
Collections.singletonList(cluster.getStorageDir(leaderId)));
- try {
- LOG.info("start a new server with the same address");
- newRaftServer(cluster, leaderId, stateMachine, p).start();
- } catch (IOException e) {
- Assert.assertTrue(e.getCause() instanceof OverlappingFileLockException);
- Assert.assertTrue(e.getMessage().contains("directory is already
locked"));
- }
+ testFailureCase("Starting a new server with the same address should fail",
+ () -> newRaftServer(cluster, leaderId, stateMachine, p).start(),
+ CompletionException.class, LOG, IOException.class,
OverlappingFileLockException.class);
+
// Try to start a raft server rpc at the leader address.
cluster.getServerFactory(leaderId).newRaftServerRpc(cluster.getServer(leaderId));
}
diff --git
a/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java
b/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java
index 537293d7..0e5c82e2 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java
@@ -231,7 +231,7 @@ public abstract class ServerRestartTests<CLUSTER extends
MiniRaftCluster>
}
void runTestRestartCommitIndex(MiniRaftCluster cluster) throws Exception {
- final SimpleMessage[] messages = SimpleMessage.create(100);
+ final SimpleMessage[] messages = SimpleMessage.create(10);
final List<CompletableFuture<Void>> futures = new
ArrayList<>(messages.length);
for(int i = 0; i < messages.length; i++) {
final CompletableFuture<Void> f = new CompletableFuture<>();
@@ -248,6 +248,7 @@ public abstract class ServerRestartTests<CLUSTER extends
MiniRaftCluster>
}).start();
}
JavaUtils.allOf(futures).get();
+ LOG.info("sent {} messages.", messages.length);
final List<RaftPeerId> ids = new ArrayList<>();
final RaftServer.Division leader = cluster.getLeader();
@@ -343,7 +344,7 @@ public abstract class ServerRestartTests<CLUSTER extends
MiniRaftCluster>
testFailureCase("restart-fail-ChecksumException",
() -> runWithNewCluster(1, this::runTestRestartWithCorruptedLogEntry),
- CompletionException.class, ChecksumException.class);
+ CompletionException.class, IllegalStateException.class,
ChecksumException.class);
Log.setCorruptionPolicy(p, policy);
}
@@ -354,7 +355,7 @@ public abstract class ServerRestartTests<CLUSTER extends
MiniRaftCluster>
final RaftPeerId id = leader.getId();
// send a few messages
- final SimpleMessage[] messages = SimpleMessage.create(10);
+ final SimpleMessage[] messages = SimpleMessage.create(100);
final SimpleMessage lastMessage = messages[messages.length - 1];
try (final RaftClient client = cluster.createClient()) {
for (SimpleMessage m : messages) {
diff --git
a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java
b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java
index 5a34264c..ffc8de59 100644
---
a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java
+++
b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java
@@ -49,11 +49,13 @@ import java.util.regex.Matcher;
*/
public class TestRaftStorage extends BaseTest {
static RaftStorageImpl newRaftStorage(File dir) throws IOException {
- return (RaftStorageImpl) RaftStorage.newBuilder()
+ final RaftStorageImpl impl = (RaftStorageImpl) RaftStorage.newBuilder()
.setDirectory(dir)
.setOption(RaftStorage.StartupOption.RECOVER)
.setStorageFreeSpaceMin(RaftServerConfigKeys.STORAGE_FREE_SPACE_MIN_DEFAULT)
.build();
+ impl.initialize();
+ return impl;
}
private File storageDir;
@@ -71,11 +73,13 @@ public class TestRaftStorage extends BaseTest {
}
static RaftStorageImpl formatRaftStorage(File dir) throws IOException {
- return (RaftStorageImpl) RaftStorage.newBuilder()
+ final RaftStorageImpl impl = (RaftStorageImpl) RaftStorage.newBuilder()
.setDirectory(dir)
.setOption(RaftStorage.StartupOption.FORMAT)
.setStorageFreeSpaceMin(SizeInBytes.valueOf(0))
.build();
+ impl.initialize();
+ return impl;
}
@Test