This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 5560718ab RATIS-2011. When a log entry is truncated, remove TransactionContext. (#1029)
5560718ab is described below
commit 5560718aba3ec9cb1447f7305ba4e23b2567effa
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Thu Jan 25 08:07:23 2024 -0800
RATIS-2011. When a log entry is truncated, remove TransactionContext. (#1029)
---
.../org/apache/ratis/client/impl/OrderedAsync.java | 11 ++-
.../java/org/apache/ratis/util/BatchLogger.java | 12 ++-
.../main/java/org/apache/ratis/util/JavaUtils.java | 16 +---
.../apache/ratis/server/impl/RaftServerImpl.java | 21 ++++-
.../ratis/server/impl/TransactionManager.java | 49 +++++++++--
.../apache/ratis/server/raftlog/LogProtoUtils.java | 18 +++-
.../apache/ratis/server/raftlog/RaftLogBase.java | 3 +-
.../test/java/org/apache/ratis/RaftTestUtil.java | 45 +++++++++-
.../ratis/server/impl/RaftServerTestUtil.java | 13 +++
.../ratis/server/impl/RetryCacheTestUtil.java | 4 +
.../org/apache/ratis/RaftLogTruncateTests.java | 95 +++++++++++++++++-----
11 files changed, 231 insertions(+), 56 deletions(-)
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
index 34dc3be11..09c6cd4ac 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
@@ -32,6 +32,7 @@ import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.retry.RetryPolicy;
import org.apache.ratis.rpc.CallId;
+import org.apache.ratis.util.BatchLogger;
import org.apache.ratis.util.IOUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.Preconditions;
@@ -50,6 +51,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.LongFunction;
@@ -57,6 +59,10 @@ import java.util.function.LongFunction;
public final class OrderedAsync {
public static final Logger LOG = LoggerFactory.getLogger(OrderedAsync.class);
+ private enum BatchLogKey implements BatchLogger.Key {
+ SEND_REQUEST_EXCEPTION
+ }
+
static class PendingOrderedRequest extends PendingClientRequest
implements SlidingWindow.ClientSideRequest<RaftClientReply> {
private final long callId;
@@ -204,7 +210,10 @@ public final class OrderedAsync {
getSlidingWindow(request).receiveReply(
request.getSlidingWindowEntry().getSeqNum(), reply,
this::sendRequestWithRetry);
}).exceptionally(e -> {
- LOG.error(client.getId() + ": Failed* " + request, e);
+ final Throwable exception = e;
+ final String key = client.getId() + "-" + request.getCallId() + "-" +
exception;
+ final Consumer<String> op = suffix -> LOG.error("{} {}: Failed* {}",
suffix, client.getId(), request, exception);
+ BatchLogger.warn(BatchLogKey.SEND_REQUEST_EXCEPTION, key, op);
handleException(pending, request, e);
return null;
});
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/BatchLogger.java
b/ratis-common/src/main/java/org/apache/ratis/util/BatchLogger.java
index 9ccd66ad7..38dad5c49 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/BatchLogger.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/BatchLogger.java
@@ -35,7 +35,13 @@ public final class BatchLogger {
private BatchLogger() {
}
- public interface Key {}
+ public interface Key {
+ TimeDuration DEFAULT_DURATION = TimeDuration.valueOf(5, TimeUnit.SECONDS);
+
+ default TimeDuration getBatchDuration() {
+ return DEFAULT_DURATION;
+ }
+ }
private static final class UniqueId {
private final Key key;
@@ -93,6 +99,10 @@ public final class BatchLogger {
private static final TimeoutExecutor SCHEDULER =
TimeoutExecutor.getInstance();
private static final ConcurrentMap<UniqueId, BatchedLogEntry> LOG_CACHE =
new ConcurrentHashMap<>();
+ public static void warn(Key key, String name, Consumer<String> op) {
+ warn(key, name, op, key.getBatchDuration(), true);
+ }
+
public static void warn(Key key, String name, Consumer<String> op,
TimeDuration batchDuration) {
warn(key, name, op, batchDuration, true);
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
index 00725903a..d3f899a7f 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
@@ -41,7 +41,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
-import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
@@ -229,7 +228,7 @@ public interface JavaUtils {
}
if (log != null && log.isWarnEnabled()) {
log.warn("FAILED \"" + name.get() + "\", attempt #" + i + "/" +
numAttempts
- + ": " + t + ", sleep " + sleepTime + " and then retry.", t);
+ + ", sleep " + sleepTime + " and then retry: " + t);
}
}
@@ -245,19 +244,6 @@ public interface JavaUtils {
attemptRepeatedly(CheckedRunnable.asCheckedSupplier(runnable),
numAttempts, sleepTime, name, log);
}
- /** Attempt to wait the given condition to return true multiple times. */
- static void attemptUntilTrue(
- BooleanSupplier condition, int numAttempts, TimeDuration sleepTime,
String name, Logger log)
- throws InterruptedException {
- Objects.requireNonNull(condition, "condition == null");
- attempt(() -> {
- if (!condition.getAsBoolean()) {
- throw new IllegalStateException("Condition " + name + " is false.");
- }
- }, numAttempts, sleepTime, name, log);
- }
-
-
static Timer runRepeatedly(Runnable runnable, long delay, long period,
TimeUnit unit) {
final Timer timer = new Timer(true);
timer.schedule(new TimerTask() {
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 396243b1c..8f6c9273e 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -90,6 +90,7 @@ import org.apache.ratis.statemachine.SnapshotInfo;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.statemachine.TransactionContext;
import org.apache.ratis.statemachine.impl.TransactionContextImpl;
+import
org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting;
import
org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.ratis.util.CodeInjectionForTesting;
import org.apache.ratis.util.CollectionUtils;
@@ -179,7 +180,7 @@ class RaftServerImpl implements RaftServer.Division,
private final DataStreamMap dataStreamMap;
private final RaftServerConfigKeys.Read.Option readOption;
- private final TransactionManager transactionManager = new
TransactionManager();
+ private final TransactionManager transactionManager;
private final RetryCacheImpl retryCache;
private final CommitInfoCache commitInfoCache = new CommitInfoCache();
private final WriteIndexCache writeIndexCache;
@@ -225,6 +226,7 @@ class RaftServerImpl implements RaftServer.Division,
this.dataStreamMap = new DataStreamMapImpl(id);
this.readOption = RaftServerConfigKeys.Read.option(properties);
this.writeIndexCache = new WriteIndexCache(properties);
+ this.transactionManager = new TransactionManager(id);
this.leaderElectionMetrics =
LeaderElectionMetrics.getLeaderElectionMetrics(
getMemberId(), state::getLastLeaderElapsedTimeMs);
@@ -1821,7 +1823,7 @@ class RaftServerImpl implements RaftServer.Division,
}
return stateMachineFuture.whenComplete((reply, exception) -> {
- transactionManager.remove(termIndex);
+ getTransactionManager().remove(termIndex);
final RaftClientReply.Builder b = newReplyBuilder(invocationId,
termIndex.getIndex());
final RaftClientReply r;
if (exception == null) {
@@ -1839,6 +1841,15 @@ class RaftServerImpl implements RaftServer.Division,
});
}
+ TransactionManager getTransactionManager() {
+ return transactionManager;
+ }
+
+ @VisibleForTesting
+ Map<TermIndex, MemoizedSupplier<TransactionContext>>
getTransactionContextMapForTesting() {
+ return getTransactionManager().getMap();
+ }
+
TransactionContext getTransactionContext(LogEntryProto entry, Boolean
createNew) {
if (!entry.hasStateMachineLogEntry()) {
return null;
@@ -1854,9 +1865,9 @@ class RaftServerImpl implements RaftServer.Division,
}
if (!createNew) {
- return transactionManager.get(termIndex);
+ return getTransactionManager().get(termIndex);
}
- return transactionManager.computeIfAbsent(termIndex,
+ return getTransactionManager().computeIfAbsent(termIndex,
// call startTransaction only once
MemoizedSupplier.valueOf(() -> stateMachine.startTransaction(entry,
getInfo().getCurrentRole())));
}
@@ -1898,6 +1909,8 @@ class RaftServerImpl implements RaftServer.Division,
*/
void notifyTruncatedLogEntry(LogEntryProto logEntry) {
if (logEntry.hasStateMachineLogEntry()) {
+ getTransactionManager().remove(TermIndex.valueOf(logEntry));
+
final ClientInvocationId invocationId =
ClientInvocationId.valueOf(logEntry.getStateMachineLogEntry());
final CacheEntry cacheEntry = getRetryCache().getIfPresent(invocationId);
if (cacheEntry != null) {
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/TransactionManager.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/TransactionManager.java
index 283900fb6..c33bc26bc 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/TransactionManager.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/TransactionManager.java
@@ -19,27 +19,64 @@ package org.apache.ratis.server.impl;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.statemachine.TransactionContext;
+import
org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.MemoizedSupplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Collections;
+import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.function.Supplier;
/**
* Managing {@link TransactionContext}.
*/
class TransactionManager {
- private final ConcurrentMap<TermIndex, Supplier<TransactionContext>>
contexts = new ConcurrentHashMap<>();
+ static final Logger LOG = LoggerFactory.getLogger(TransactionManager.class);
+
+ private final String name;
+ private final ConcurrentMap<TermIndex, MemoizedSupplier<TransactionContext>>
contexts = new ConcurrentHashMap<>();
+
+ TransactionManager(Object name) {
+ this.name = name + "-" + JavaUtils.getClassSimpleName(getClass());
+ }
+
+ @VisibleForTesting
+ Map<TermIndex, MemoizedSupplier<TransactionContext>> getMap() {
+ LOG.debug("{}", this);
+ return Collections.unmodifiableMap(contexts);
+ }
TransactionContext get(TermIndex termIndex) {
- return
Optional.ofNullable(contexts.get(termIndex)).map(Supplier::get).orElse(null);
+ return
Optional.ofNullable(contexts.get(termIndex)).map(MemoizedSupplier::get).orElse(null);
}
- TransactionContext computeIfAbsent(TermIndex termIndex,
Supplier<TransactionContext> constructor) {
- return contexts.computeIfAbsent(termIndex, i -> constructor).get();
+ TransactionContext computeIfAbsent(TermIndex termIndex,
MemoizedSupplier<TransactionContext> constructor) {
+ final MemoizedSupplier<TransactionContext> m =
contexts.computeIfAbsent(termIndex, i -> constructor);
+ if (!m.isInitialized()) {
+ LOG.debug("{}: {}", termIndex, this);
+ }
+ return m.get();
}
void remove(TermIndex termIndex) {
- contexts.remove(termIndex);
+ final MemoizedSupplier<TransactionContext> removed =
contexts.remove(termIndex);
+ if (removed != null) {
+ LOG.debug("{}: {}", termIndex, this);
+ }
+ }
+
+ @Override
+ public String toString() {
+ if (contexts.isEmpty()) {
+ return name + " <empty>";
+ }
+
+ final StringBuilder b = new StringBuilder(name);
+ contexts.forEach((k, v) -> b.append("\n ").append(k).append(":
initialized? ").append(v.isInitialized()));
+ return b.toString();
}
}
\ No newline at end of file
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/LogProtoUtils.java
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/LogProtoUtils.java
index b75777c9e..de06faf63 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/LogProtoUtils.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/LogProtoUtils.java
@@ -53,17 +53,27 @@ public final class LogProtoUtils {
s = "(c:" + metadata.getCommitIndex() + ")";
} else if (entry.hasConfigurationEntry()) {
final RaftConfigurationProto config = entry.getConfigurationEntry();
- s = "(current:" +
config.getPeersList().stream().map(AbstractMessage::toString).collect(Collectors.joining(","))
+
- ", old:" +
config.getOldPeersList().stream().map(AbstractMessage::toString).collect(Collectors.joining(","))
- + ")";
+ s = "(current:" + peersToString(config.getPeersList())
+ + ", old:" + peersToString(config.getOldPeersList()) + ")";
} else {
s = "";
}
return TermIndex.valueOf(entry) + ", " + entry.getLogEntryBodyCase() + s;
}
+ static String peersToString(List<RaftPeerProto> peers) {
+ return peers.stream().map(AbstractMessage::toString)
+ .map(s -> s.replace("\n", ""))
+ .map(s -> s.replace(" ", ""))
+ .collect(Collectors.joining(", "));
+ }
+
+ static String stateMachineLogEntryProtoToString(StateMachineLogEntryProto p)
{
+ return "logData:" + p.getLogData() + ", stateMachineEntry:" + p.getType()
+ ":" + p.getStateMachineEntry();
+ }
+
public static String toLogEntryString(LogEntryProto entry) {
- return toLogEntryString(entry, null);
+ return toLogEntryString(entry,
LogProtoUtils::stateMachineLogEntryProtoToString);
}
public static String toLogEntriesString(List<LogEntryProto> entries) {
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
index b56b343bd..49e66e253 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
@@ -375,7 +375,8 @@ public abstract class RaftLogBase implements RaftLog {
@Override
public String toString() {
- return getName() + ":" + state + ":c" + getLastCommittedIndex();
+ return getName() + ":" + state + ":c" + getLastCommittedIndex()
+ + (isOpened()? ":last" + getLastEntryTermIndex(): "");
}
public AutoCloseableLock readLock() {
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
index 41a431149..8cd15aac1 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
@@ -45,6 +45,7 @@ import org.apache.ratis.util.ProtoUtils;
import org.apache.ratis.util.TimeDuration;
import org.junit.Assert;
import org.junit.AssumptionViolatedException;
+import org.junit.jupiter.api.Assertions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,8 +53,12 @@ import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
import java.util.EnumMap;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
@@ -267,6 +272,43 @@ public interface RaftTestUtil {
});
}
+ Comparator<LogEntryProto> LOG_ENTRY_PROTO_COMPARATOR = Comparator.comparing(
+ e -> e.getStateMachineLogEntry().getLogData().asReadOnlyByteBuffer());
+ Comparator<SimpleMessage> SIMPLE_MESSAGE_COMPARATOR = Comparator.comparing(
+ m -> m.getContent().asReadOnlyByteBuffer());
+
+ /** @return a map of message-array-index to {@link LogEntryProto} for the
entries found in the given log. */
+ static Map<Integer, LogEntryProto> getStateMachineLogEntries(RaftLog log,
SimpleMessage[] messages) {
+ if (messages.length == 0) {
+ return Collections.emptyMap();
+ }
+ final List<LogEntryProto> entries = getStateMachineLogEntries(log, s ->
{});
+ if (entries.isEmpty()) {
+ return Collections.emptyMap();
+ }
+
+ entries.sort(LOG_ENTRY_PROTO_COMPARATOR);
+ Arrays.sort(messages, SIMPLE_MESSAGE_COMPARATOR);
+
+ final Map<Integer, LogEntryProto> found = new HashMap<>();
+ for (int e = 0, m = 0; e < entries.size() && m < messages.length; ) {
+ final int diff =
messages[m].getContent().asReadOnlyByteBuffer().compareTo(
+
entries.get(e).getStateMachineLogEntry().getLogData().asReadOnlyByteBuffer());
+ if (diff == 0) {
+ found.put(m, entries.get(e));
+ m++;
+ e++;
+ } else if (diff < 0) {
+ m++; // message < entry
+ } else {
+ e++; // message > entry
+ }
+ }
+
+ Assertions.assertEquals(messages.length, found.size());
+ return found;
+ }
+
static List<LogEntryProto> getStateMachineLogEntries(RaftLog log,
Consumer<String> print) {
final List<LogEntryProto> entries = new ArrayList<>();
for (LogEntryProto e : getLogEntryProtos(log)) {
@@ -306,8 +348,7 @@ public interface RaftTestUtil {
}
Assert.assertTrue(e.getIndex() > logIndex);
logIndex = e.getIndex();
- Assert.assertArrayEquals(expectedMessages[i].getContent().toByteArray(),
- e.getStateMachineLogEntry().getLogData().toByteArray());
+ Assert.assertEquals(expectedMessages[i].getContent(),
e.getStateMachineLogEntry().getLogData());
}
}
diff --git
a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
index 2927ec349..58a51e051 100644
---
a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
+++
b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
@@ -31,9 +31,12 @@ import org.apache.ratis.server.RaftConfiguration;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerRpc;
import org.apache.ratis.server.leader.LogAppender;
+import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLog;
import org.apache.ratis.server.storage.RaftStorage;
+import org.apache.ratis.statemachine.TransactionContext;
import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.MemoizedSupplier;
import org.apache.ratis.util.Slf4jUtils;
import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.function.CheckedConsumer;
@@ -49,6 +52,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@@ -138,6 +142,15 @@ public class RaftServerTestUtil {
return (ConfigurationManager)
RaftTestUtil.getDeclaredField(getState(server), "configurationManager");
}
+ public static Logger getTransactionContextLog() {
+ return TransactionManager.LOG;
+ }
+
+ public static Map<TermIndex, MemoizedSupplier<TransactionContext>>
getTransactionContextMap(
+ RaftServer.Division server) {
+ return ((RaftServerImpl)server).getTransactionContextMapForTesting();
+ }
+
public static RaftConfiguration newRaftConfiguration(Collection<RaftPeer>
peers) {
return RaftConfigurationImpl.newBuilder().setConf(peers).build();
}
diff --git
a/ratis-server/src/test/java/org/apache/ratis/server/impl/RetryCacheTestUtil.java
b/ratis-server/src/test/java/org/apache/ratis/server/impl/RetryCacheTestUtil.java
index f59958a94..e5a55e49c 100644
---
a/ratis-server/src/test/java/org/apache/ratis/server/impl/RetryCacheTestUtil.java
+++
b/ratis-server/src/test/java/org/apache/ratis/server/impl/RetryCacheTestUtil.java
@@ -80,7 +80,11 @@ public class RetryCacheTestUtil {
final RaftServerImpl server = mock(RaftServerImpl.class);
when(server.getRetryCache()).thenReturn((RetryCacheImpl) retryCache);
when(server.getMemberId()).thenReturn(memberId);
+
+ final TransactionManager transactionManager = new
TransactionManager(memberId.getPeerId());
+ when(server.getTransactionManager()).thenReturn(transactionManager);
doCallRealMethod().when(server).notifyTruncatedLogEntry(any(LogEntryProto.class));
+
return SegmentedRaftLog.newBuilder()
.setMemberId(memberId)
.setServer(server)
diff --git
a/ratis-test/src/test/java/org/apache/ratis/RaftLogTruncateTests.java
b/ratis-test/src/test/java/org/apache/ratis/RaftLogTruncateTests.java
index 80c57741c..c21110ea0 100644
--- a/ratis-test/src/test/java/org/apache/ratis/RaftLogTruncateTests.java
+++ b/ratis-test/src/test/java/org/apache/ratis/RaftLogTruncateTests.java
@@ -22,23 +22,32 @@ import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.RaftClientConfigKeys;
import org.apache.ratis.client.impl.OrderedAsync;
import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.MiniRaftCluster;
+import org.apache.ratis.server.impl.RaftServerTestUtil;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLog;
import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.statemachine.TransactionContext;
import org.apache.ratis.statemachine.impl.SimpleStateMachine4Testing;
+import org.apache.ratis.util.MemoizedSupplier;
import org.apache.ratis.util.Slf4jUtils;
import org.apache.ratis.util.TimeDuration;
-import org.junit.Assert;
-import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
import org.slf4j.event.Level;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -57,9 +66,12 @@ public abstract class RaftLogTruncateTests<CLUSTER extends
MiniRaftCluster> exte
}
{
+ Slf4jUtils.setLogLevel(RaftServerTestUtil.getTransactionContextLog(),
Level.TRACE);
+
Slf4jUtils.setLogLevel(OrderedAsync.LOG, Level.ERROR);
Slf4jUtils.setLogLevel(RaftServerConfigKeys.LOG, Level.ERROR);
Slf4jUtils.setLogLevel(RaftClientConfigKeys.LOG, Level.ERROR);
+ Slf4jUtils.setLogLevel(RaftClientConfigKeys.LOG, Level.ERROR);
final RaftProperties p = getProperties();
p.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
SimpleStateMachine4Testing.class, StateMachine.class);
@@ -88,8 +100,8 @@ public abstract class RaftLogTruncateTests<CLUSTER extends
MiniRaftCluster> exte
final List<RaftPeerId> remainingPeers = new ArrayList<>();
final int majorityIndex = NUM_SERVERS / 2 + 1;
- Assert.assertEquals(NUM_SERVERS - 1, oldFollowers.size());
- Assert.assertTrue(majorityIndex < oldFollowers.size());
+ Assertions.assertEquals(NUM_SERVERS - 1, oldFollowers.size());
+ Assertions.assertTrue(majorityIndex < oldFollowers.size());
for (int i = 0; i < majorityIndex; i++) {
killedPeers.add(oldFollowers.get(i).getId());
@@ -113,18 +125,26 @@ public abstract class RaftLogTruncateTests<CLUSTER
extends MiniRaftCluster> exte
final List<Throwable> exceptions = Collections.synchronizedList(new
ArrayList<>());
final long oldLeaderTerm = oldLeader.getInfo().getCurrentTerm();
LOG.info("oldLeader: {}, term={}", oldLeader.getId(), oldLeaderTerm);
+ LOG.info("killedPeers : {}", killedPeers);
+ LOG.info("remainingPeers: {}", remainingPeers);
final SimpleMessage[] firstBatch = SimpleMessage.create(5, "first");
final SimpleMessage[] secondBatch = SimpleMessage.create(4, "second");
+ for (RaftPeer peer : cluster.getGroup().getPeers()) {
+ assertEmptyTransactionContextMap(cluster.getDivision(peer.getId()));
+ }
+
try (final RaftClient client = cluster.createClient(oldLeader.getId())) {
// send some messages
for (SimpleMessage batch : firstBatch) {
final RaftClientReply reply = client.io().send(batch);
- Assert.assertTrue(reply.isSuccess());
+ Assertions.assertTrue(reply.isSuccess());
}
- for (RaftServer.Division f : cluster.getFollowers()) {
- assertLogEntries(f, oldLeaderTerm, firstBatch);
+ for (RaftPeer peer : cluster.getGroup().getPeers()) {
+ final RaftServer.Division division = cluster.getDivision(peer.getId());
+ assertLogEntries(division, oldLeaderTerm, firstBatch);
+ assertEmptyTransactionContextMap(division);
}
// kill a majority of followers
@@ -148,17 +168,19 @@ public abstract class RaftLogTruncateTests<CLUSTER
extends MiniRaftCluster> exte
// check log messages
final SimpleMessage[] expectedMessages = arraycopy(firstBatch,
messagesToBeTruncated);
for (RaftPeerId f : remainingPeers) {
- assertLogEntries(cluster.getDivision(f), oldLeaderTerm,
expectedMessages);
+ final RaftServer.Division division = cluster.getDivision(f);
+ assertLogEntries(division, oldLeaderTerm, expectedMessages);
+ if (!division.getId().equals(oldLeader.getId())) {
+ assertEntriesInTransactionContextMap(division,
messagesToBeTruncated, firstBatch);
+ }
}
done.set(true);
LOG.info("done");
}
- // kill the remaining servers
- LOG.info("Before killServer {}: {}", remainingPeers,
cluster.printServers());
- for (RaftPeerId f : remainingPeers) {
- cluster.killServer(f);
- }
+ // kill the old leader
+ LOG.info("Before killServer {}: {}", oldLeader.getId(),
cluster.printServers());
+ cluster.killServer(oldLeader.getId());
LOG.info("After killServer {}: {}", remainingPeers,
cluster.printServers());
// restart the earlier followers
@@ -174,12 +196,10 @@ public abstract class RaftLogTruncateTests<CLUSTER
extends MiniRaftCluster> exte
final SegmentedRaftLog newLeaderLog = (SegmentedRaftLog)
newLeader.getRaftLog();
LOG.info("newLeader: {}, term {}, last={}", newLeader.getId(),
newLeaderTerm,
newLeaderLog.getLastEntryTermIndex());
- Assert.assertTrue(killedPeers.contains(newLeader.getId()));
+ Assertions.assertTrue(killedPeers.contains(newLeader.getId()));
- // restart the remaining servers
- for (RaftPeerId f : remainingPeers) {
- cluster.restartServer(f, false);
- }
+ // restart the old leader
+ cluster.restartServer(oldLeader.getId(), false);
// check RaftLog truncate
for (RaftPeerId f : remainingPeers) {
@@ -190,14 +210,16 @@ public abstract class RaftLogTruncateTests<CLUSTER
extends MiniRaftCluster> exte
// send more messages
for (SimpleMessage batch : secondBatch) {
final RaftClientReply reply = client.io().send(batch);
- Assert.assertTrue(reply.isSuccess());
+ Assertions.assertTrue(reply.isSuccess());
}
}
// check log messages -- it should be truncated and then append the new
messages
final SimpleMessage[] expectedMessages = arraycopy(firstBatch,
secondBatch);
- for (RaftPeerId f : killedPeers) {
- assertLogEntries(cluster.getDivision(f), oldLeaderTerm,
expectedMessages);
+ for (RaftPeer peer : cluster.getGroup().getPeers()) {
+ final RaftServer.Division division = cluster.getDivision(peer.getId());
+ assertLogEntries(division, oldLeaderTerm, expectedMessages);
+ assertEmptyTransactionContextMap(division);
}
if (!exceptions.isEmpty()) {
@@ -205,7 +227,36 @@ public abstract class RaftLogTruncateTests<CLUSTER extends
MiniRaftCluster> exte
for(int i = 0 ; i < exceptions.size(); i++) {
LOG.info("exception {})", i, exceptions.get(i));
}
- Assert.fail();
+ Assertions.fail();
+ }
+ }
+
+ static void assertEmptyTransactionContextMap(RaftServer.Division division) {
+
Assertions.assertTrue(RaftServerTestUtil.getTransactionContextMap(division).isEmpty(),
+ () -> division.getId() + " TransactionContextMap is non-empty");
+ }
+
+ static void assertEntriesInTransactionContextMap(RaftServer.Division
division,
+ SimpleMessage[] existing, SimpleMessage[] nonExisting) {
+ final RaftLog log = division.getRaftLog();
+ assertEntriesInTransactionContextMap(division,
+ RaftTestUtil.getStateMachineLogEntries(log, existing).values(),
+ RaftTestUtil.getStateMachineLogEntries(log, nonExisting).values());
+ }
+
+ static void assertEntriesInTransactionContextMap(RaftServer.Division
division,
+ Collection<LogEntryProto> existing, Collection<LogEntryProto>
nonExisting) {
+ final Map<TermIndex, MemoizedSupplier<TransactionContext>> map
+ = RaftServerTestUtil.getTransactionContextMap(division);
+ for(LogEntryProto e : existing) {
+ final TermIndex termIndex = TermIndex.valueOf(e);
+ Assertions.assertTrue(map.containsKey(termIndex),
+ () -> termIndex + " not found in " + division.getId());
+ }
+ for(LogEntryProto e : nonExisting) {
+ final TermIndex termIndex = TermIndex.valueOf(e);
+ Assertions.assertFalse(map.containsKey(termIndex),
+ () -> termIndex + " found in " + division.getId());
}
}