This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 5560718ab RATIS-2011. When a log entry is truncated, remove TransactionContext. (#1029)
5560718ab is described below

commit 5560718aba3ec9cb1447f7305ba4e23b2567effa
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Thu Jan 25 08:07:23 2024 -0800

    RATIS-2011. When a log entry is truncated, remove TransactionContext. (#1029)
---
 .../org/apache/ratis/client/impl/OrderedAsync.java | 11 ++-
 .../java/org/apache/ratis/util/BatchLogger.java    | 12 ++-
 .../main/java/org/apache/ratis/util/JavaUtils.java | 16 +---
 .../apache/ratis/server/impl/RaftServerImpl.java   | 21 ++++-
 .../ratis/server/impl/TransactionManager.java      | 49 +++++++++--
 .../apache/ratis/server/raftlog/LogProtoUtils.java | 18 +++-
 .../apache/ratis/server/raftlog/RaftLogBase.java   |  3 +-
 .../test/java/org/apache/ratis/RaftTestUtil.java   | 45 +++++++++-
 .../ratis/server/impl/RaftServerTestUtil.java      | 13 +++
 .../ratis/server/impl/RetryCacheTestUtil.java      |  4 +
 .../org/apache/ratis/RaftLogTruncateTests.java     | 95 +++++++++++++++++-----
 11 files changed, 231 insertions(+), 56 deletions(-)

diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
index 34dc3be11..09c6cd4ac 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
@@ -32,6 +32,7 @@ import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.retry.RetryPolicy;
 import org.apache.ratis.rpc.CallId;
+import org.apache.ratis.util.BatchLogger;
 import org.apache.ratis.util.IOUtils;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.Preconditions;
@@ -50,6 +51,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.LongFunction;
 
@@ -57,6 +59,10 @@ import java.util.function.LongFunction;
 public final class OrderedAsync {
   public static final Logger LOG = LoggerFactory.getLogger(OrderedAsync.class);
 
+  private enum BatchLogKey implements BatchLogger.Key {
+    SEND_REQUEST_EXCEPTION
+  }
+
   static class PendingOrderedRequest extends PendingClientRequest
       implements SlidingWindow.ClientSideRequest<RaftClientReply> {
     private final long callId;
@@ -204,7 +210,10 @@ public final class OrderedAsync {
       getSlidingWindow(request).receiveReply(
           request.getSlidingWindowEntry().getSeqNum(), reply, 
this::sendRequestWithRetry);
     }).exceptionally(e -> {
-      LOG.error(client.getId() + ": Failed* " + request, e);
+      final Throwable exception = e;
+      final String key = client.getId() + "-" + request.getCallId() + "-" + 
exception;
+      final Consumer<String> op = suffix -> LOG.error("{} {}: Failed* {}", 
suffix, client.getId(), request, exception);
+      BatchLogger.warn(BatchLogKey.SEND_REQUEST_EXCEPTION, key, op);
       handleException(pending, request, e);
       return null;
     });
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/BatchLogger.java 
b/ratis-common/src/main/java/org/apache/ratis/util/BatchLogger.java
index 9ccd66ad7..38dad5c49 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/BatchLogger.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/BatchLogger.java
@@ -35,7 +35,13 @@ public final class BatchLogger {
   private BatchLogger() {
   }
 
-  public interface Key {}
+  public interface Key {
+    TimeDuration DEFAULT_DURATION = TimeDuration.valueOf(5, TimeUnit.SECONDS);
+
+    default TimeDuration getBatchDuration() {
+      return DEFAULT_DURATION;
+    }
+  }
 
   private static final class UniqueId {
     private final Key key;
@@ -93,6 +99,10 @@ public final class BatchLogger {
   private static final TimeoutExecutor SCHEDULER = 
TimeoutExecutor.getInstance();
   private static final ConcurrentMap<UniqueId, BatchedLogEntry> LOG_CACHE = 
new ConcurrentHashMap<>();
 
+  public static void warn(Key key, String name, Consumer<String> op) {
+    warn(key, name, op, key.getBatchDuration(), true);
+  }
+
   public static void warn(Key key, String name, Consumer<String> op, 
TimeDuration batchDuration) {
     warn(key, name, op, batchDuration, true);
   }
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java 
b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
index 00725903a..d3f899a7f 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
@@ -41,7 +41,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BiConsumer;
-import java.util.function.BooleanSupplier;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Supplier;
@@ -229,7 +228,7 @@ public interface JavaUtils {
         }
         if (log != null && log.isWarnEnabled()) {
           log.warn("FAILED \"" + name.get() + "\", attempt #" + i + "/" + 
numAttempts
-              + ": " + t + ", sleep " + sleepTime + " and then retry.", t);
+              + ", sleep " + sleepTime + " and then retry: " + t);
         }
       }
 
@@ -245,19 +244,6 @@ public interface JavaUtils {
     attemptRepeatedly(CheckedRunnable.asCheckedSupplier(runnable), 
numAttempts, sleepTime, name, log);
   }
 
-  /** Attempt to wait the given condition to return true multiple times. */
-  static void attemptUntilTrue(
-      BooleanSupplier condition, int numAttempts, TimeDuration sleepTime, 
String name, Logger log)
-      throws InterruptedException {
-    Objects.requireNonNull(condition, "condition == null");
-    attempt(() -> {
-      if (!condition.getAsBoolean()) {
-        throw new IllegalStateException("Condition " + name + " is false.");
-      }
-    }, numAttempts, sleepTime, name, log);
-  }
-
-
   static Timer runRepeatedly(Runnable runnable, long delay, long period, 
TimeUnit unit) {
     final Timer timer = new Timer(true);
     timer.schedule(new TimerTask() {
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 396243b1c..8f6c9273e 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -90,6 +90,7 @@ import org.apache.ratis.statemachine.SnapshotInfo;
 import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.statemachine.TransactionContext;
 import org.apache.ratis.statemachine.impl.TransactionContextImpl;
+import 
org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting;
 import 
org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.ratis.util.CodeInjectionForTesting;
 import org.apache.ratis.util.CollectionUtils;
@@ -179,7 +180,7 @@ class RaftServerImpl implements RaftServer.Division,
   private final DataStreamMap dataStreamMap;
   private final RaftServerConfigKeys.Read.Option readOption;
 
-  private final TransactionManager transactionManager = new 
TransactionManager();
+  private final TransactionManager transactionManager;
   private final RetryCacheImpl retryCache;
   private final CommitInfoCache commitInfoCache = new CommitInfoCache();
   private final WriteIndexCache writeIndexCache;
@@ -225,6 +226,7 @@ class RaftServerImpl implements RaftServer.Division,
     this.dataStreamMap = new DataStreamMapImpl(id);
     this.readOption = RaftServerConfigKeys.Read.option(properties);
     this.writeIndexCache = new WriteIndexCache(properties);
+    this.transactionManager = new TransactionManager(id);
 
     this.leaderElectionMetrics = 
LeaderElectionMetrics.getLeaderElectionMetrics(
         getMemberId(), state::getLastLeaderElapsedTimeMs);
@@ -1821,7 +1823,7 @@ class RaftServerImpl implements RaftServer.Division,
     }
 
     return stateMachineFuture.whenComplete((reply, exception) -> {
-      transactionManager.remove(termIndex);
+      getTransactionManager().remove(termIndex);
       final RaftClientReply.Builder b = newReplyBuilder(invocationId, 
termIndex.getIndex());
       final RaftClientReply r;
       if (exception == null) {
@@ -1839,6 +1841,15 @@ class RaftServerImpl implements RaftServer.Division,
     });
   }
 
+  TransactionManager getTransactionManager() {
+    return transactionManager;
+  }
+
+  @VisibleForTesting
+  Map<TermIndex, MemoizedSupplier<TransactionContext>> 
getTransactionContextMapForTesting() {
+    return getTransactionManager().getMap();
+  }
+
   TransactionContext getTransactionContext(LogEntryProto entry, Boolean 
createNew) {
     if (!entry.hasStateMachineLogEntry()) {
       return null;
@@ -1854,9 +1865,9 @@ class RaftServerImpl implements RaftServer.Division,
     }
 
     if (!createNew) {
-      return transactionManager.get(termIndex);
+      return getTransactionManager().get(termIndex);
     }
-    return transactionManager.computeIfAbsent(termIndex,
+    return getTransactionManager().computeIfAbsent(termIndex,
         // call startTransaction only once
         MemoizedSupplier.valueOf(() -> stateMachine.startTransaction(entry, 
getInfo().getCurrentRole())));
   }
@@ -1898,6 +1909,8 @@ class RaftServerImpl implements RaftServer.Division,
    */
   void notifyTruncatedLogEntry(LogEntryProto logEntry) {
     if (logEntry.hasStateMachineLogEntry()) {
+      getTransactionManager().remove(TermIndex.valueOf(logEntry));
+
       final ClientInvocationId invocationId = 
ClientInvocationId.valueOf(logEntry.getStateMachineLogEntry());
       final CacheEntry cacheEntry = getRetryCache().getIfPresent(invocationId);
       if (cacheEntry != null) {
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/TransactionManager.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/TransactionManager.java
index 283900fb6..c33bc26bc 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/TransactionManager.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/TransactionManager.java
@@ -19,27 +19,64 @@ package org.apache.ratis.server.impl;
 
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.statemachine.TransactionContext;
+import 
org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.MemoizedSupplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.util.Collections;
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.function.Supplier;
 
 /**
  * Managing {@link TransactionContext}.
  */
 class TransactionManager {
-  private final ConcurrentMap<TermIndex, Supplier<TransactionContext>> 
contexts = new ConcurrentHashMap<>();
+  static final Logger LOG = LoggerFactory.getLogger(TransactionManager.class);
+
+  private final String name;
+  private final ConcurrentMap<TermIndex, MemoizedSupplier<TransactionContext>> 
contexts = new ConcurrentHashMap<>();
+
+  TransactionManager(Object name) {
+    this.name = name + "-" + JavaUtils.getClassSimpleName(getClass());
+  }
+
+  @VisibleForTesting
+  Map<TermIndex, MemoizedSupplier<TransactionContext>> getMap() {
+    LOG.debug("{}", this);
+    return Collections.unmodifiableMap(contexts);
+  }
 
   TransactionContext get(TermIndex termIndex) {
-    return 
Optional.ofNullable(contexts.get(termIndex)).map(Supplier::get).orElse(null);
+    return 
Optional.ofNullable(contexts.get(termIndex)).map(MemoizedSupplier::get).orElse(null);
   }
 
-  TransactionContext computeIfAbsent(TermIndex termIndex, 
Supplier<TransactionContext> constructor) {
-    return contexts.computeIfAbsent(termIndex, i -> constructor).get();
+  TransactionContext computeIfAbsent(TermIndex termIndex, 
MemoizedSupplier<TransactionContext> constructor) {
+    final MemoizedSupplier<TransactionContext> m = 
contexts.computeIfAbsent(termIndex, i -> constructor);
+    if (!m.isInitialized()) {
+      LOG.debug("{}: {}", termIndex,  this);
+    }
+    return m.get();
   }
 
   void remove(TermIndex termIndex) {
-    contexts.remove(termIndex);
+    final MemoizedSupplier<TransactionContext> removed = 
contexts.remove(termIndex);
+    if (removed != null) {
+      LOG.debug("{}: {}", termIndex,  this);
+    }
+  }
+
+  @Override
+  public String toString() {
+    if (contexts.isEmpty()) {
+      return name + " <empty>";
+    }
+
+    final StringBuilder b = new StringBuilder(name);
+    contexts.forEach((k, v) -> b.append("\n  ").append(k).append(": 
initialized? ").append(v.isInitialized()));
+    return b.toString();
   }
 }
\ No newline at end of file
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/LogProtoUtils.java 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/LogProtoUtils.java
index b75777c9e..de06faf63 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/LogProtoUtils.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/LogProtoUtils.java
@@ -53,17 +53,27 @@ public final class LogProtoUtils {
       s = "(c:" + metadata.getCommitIndex() + ")";
     } else if (entry.hasConfigurationEntry()) {
       final RaftConfigurationProto config = entry.getConfigurationEntry();
-      s = "(current:" + 
config.getPeersList().stream().map(AbstractMessage::toString).collect(Collectors.joining(","))
 +
-          ", old:" + 
config.getOldPeersList().stream().map(AbstractMessage::toString).collect(Collectors.joining(","))
-          + ")";
+      s = "(current:" + peersToString(config.getPeersList())
+          + ", old:" + peersToString(config.getOldPeersList()) + ")";
     } else {
       s = "";
     }
     return TermIndex.valueOf(entry) + ", " + entry.getLogEntryBodyCase() + s;
   }
 
+  static String peersToString(List<RaftPeerProto> peers) {
+    return peers.stream().map(AbstractMessage::toString)
+        .map(s -> s.replace("\n", ""))
+        .map(s -> s.replace(" ", ""))
+        .collect(Collectors.joining(", "));
+  }
+
+  static String stateMachineLogEntryProtoToString(StateMachineLogEntryProto p) 
{
+    return "logData:" + p.getLogData() + ", stateMachineEntry:" + p.getType() 
+ ":" + p.getStateMachineEntry();
+  }
+
   public static String toLogEntryString(LogEntryProto entry) {
-    return toLogEntryString(entry, null);
+    return toLogEntryString(entry, 
LogProtoUtils::stateMachineLogEntryProtoToString);
   }
 
   public static String toLogEntriesString(List<LogEntryProto> entries) {
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
index b56b343bd..49e66e253 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
@@ -375,7 +375,8 @@ public abstract class RaftLogBase implements RaftLog {
 
   @Override
   public String toString() {
-    return getName() + ":" + state + ":c" + getLastCommittedIndex();
+    return getName() + ":" + state + ":c" + getLastCommittedIndex()
+        + (isOpened()? ":last" + getLastEntryTermIndex(): "");
   }
 
   public AutoCloseableLock readLock() {
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java 
b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
index 41a431149..8cd15aac1 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
@@ -45,6 +45,7 @@ import org.apache.ratis.util.ProtoUtils;
 import org.apache.ratis.util.TimeDuration;
 import org.junit.Assert;
 import org.junit.AssumptionViolatedException;
+import org.junit.jupiter.api.Assertions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,8 +53,12 @@ import java.io.IOException;
 import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.EnumMap;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
@@ -267,6 +272,43 @@ public interface RaftTestUtil {
     });
   }
 
+  Comparator<LogEntryProto> LOG_ENTRY_PROTO_COMPARATOR = Comparator.comparing(
+      e -> e.getStateMachineLogEntry().getLogData().asReadOnlyByteBuffer());
+  Comparator<SimpleMessage> SIMPLE_MESSAGE_COMPARATOR = Comparator.comparing(
+      m -> m.getContent().asReadOnlyByteBuffer());
+
+  /** @return a map of message-array-index to {@link LogEntryProto} for the 
entries found in the given log. */
+  static Map<Integer, LogEntryProto> getStateMachineLogEntries(RaftLog log, 
SimpleMessage[] messages) {
+    if (messages.length == 0) {
+      return Collections.emptyMap();
+    }
+    final List<LogEntryProto> entries = getStateMachineLogEntries(log, s -> 
{});
+    if (entries.isEmpty()) {
+      return Collections.emptyMap();
+    }
+
+    entries.sort(LOG_ENTRY_PROTO_COMPARATOR);
+    Arrays.sort(messages, SIMPLE_MESSAGE_COMPARATOR);
+
+    final Map<Integer, LogEntryProto> found = new HashMap<>();
+    for (int e = 0, m = 0; e < entries.size() && m < messages.length; ) {
+      final int diff = 
messages[m].getContent().asReadOnlyByteBuffer().compareTo(
+          
entries.get(e).getStateMachineLogEntry().getLogData().asReadOnlyByteBuffer());
+      if (diff == 0) {
+        found.put(m, entries.get(e));
+        m++;
+        e++;
+      } else if (diff < 0) {
+        m++; // message < entry
+      } else {
+        e++; // message > entry
+      }
+    }
+
+    Assertions.assertEquals(messages.length, found.size());
+    return found;
+  }
+
   static List<LogEntryProto> getStateMachineLogEntries(RaftLog log, 
Consumer<String> print) {
     final List<LogEntryProto> entries = new ArrayList<>();
     for (LogEntryProto e : getLogEntryProtos(log)) {
@@ -306,8 +348,7 @@ public interface RaftTestUtil {
       }
       Assert.assertTrue(e.getIndex() > logIndex);
       logIndex = e.getIndex();
-      Assert.assertArrayEquals(expectedMessages[i].getContent().toByteArray(),
-          e.getStateMachineLogEntry().getLogData().toByteArray());
+      Assert.assertEquals(expectedMessages[i].getContent(), 
e.getStateMachineLogEntry().getLogData());
     }
   }
 
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
 
b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
index 2927ec349..58a51e051 100644
--- 
a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
+++ 
b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
@@ -31,9 +31,12 @@ import org.apache.ratis.server.RaftConfiguration;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerRpc;
 import org.apache.ratis.server.leader.LogAppender;
+import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLog;
 import org.apache.ratis.server.storage.RaftStorage;
+import org.apache.ratis.statemachine.TransactionContext;
 import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.MemoizedSupplier;
 import org.apache.ratis.util.Slf4jUtils;
 import org.apache.ratis.util.TimeDuration;
 import org.apache.ratis.util.function.CheckedConsumer;
@@ -49,6 +52,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -138,6 +142,15 @@ public class RaftServerTestUtil {
     return (ConfigurationManager) 
RaftTestUtil.getDeclaredField(getState(server), "configurationManager");
   }
 
+  public static Logger getTransactionContextLog() {
+    return TransactionManager.LOG;
+  }
+
+  public static Map<TermIndex, MemoizedSupplier<TransactionContext>> 
getTransactionContextMap(
+      RaftServer.Division server) {
+    return ((RaftServerImpl)server).getTransactionContextMapForTesting();
+  }
+
   public static RaftConfiguration newRaftConfiguration(Collection<RaftPeer> 
peers) {
     return RaftConfigurationImpl.newBuilder().setConf(peers).build();
   }
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/server/impl/RetryCacheTestUtil.java
 
b/ratis-server/src/test/java/org/apache/ratis/server/impl/RetryCacheTestUtil.java
index f59958a94..e5a55e49c 100644
--- 
a/ratis-server/src/test/java/org/apache/ratis/server/impl/RetryCacheTestUtil.java
+++ 
b/ratis-server/src/test/java/org/apache/ratis/server/impl/RetryCacheTestUtil.java
@@ -80,7 +80,11 @@ public class RetryCacheTestUtil {
     final RaftServerImpl server = mock(RaftServerImpl.class);
     when(server.getRetryCache()).thenReturn((RetryCacheImpl) retryCache);
     when(server.getMemberId()).thenReturn(memberId);
+
+    final TransactionManager transactionManager = new 
TransactionManager(memberId.getPeerId());
+    when(server.getTransactionManager()).thenReturn(transactionManager);
     
doCallRealMethod().when(server).notifyTruncatedLogEntry(any(LogEntryProto.class));
+
     return SegmentedRaftLog.newBuilder()
         .setMemberId(memberId)
         .setServer(server)
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/RaftLogTruncateTests.java 
b/ratis-test/src/test/java/org/apache/ratis/RaftLogTruncateTests.java
index 80c57741c..c21110ea0 100644
--- a/ratis-test/src/test/java/org/apache/ratis/RaftLogTruncateTests.java
+++ b/ratis-test/src/test/java/org/apache/ratis/RaftLogTruncateTests.java
@@ -22,23 +22,32 @@ import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.client.RaftClientConfigKeys;
 import org.apache.ratis.client.impl.OrderedAsync;
 import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.impl.MiniRaftCluster;
+import org.apache.ratis.server.impl.RaftServerTestUtil;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.raftlog.RaftLog;
 import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLog;
 import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.statemachine.TransactionContext;
 import org.apache.ratis.statemachine.impl.SimpleStateMachine4Testing;
+import org.apache.ratis.util.MemoizedSupplier;
 import org.apache.ratis.util.Slf4jUtils;
 import org.apache.ratis.util.TimeDuration;
-import org.junit.Assert;
-import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
 import org.slf4j.event.Level;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -57,9 +66,12 @@ public abstract class RaftLogTruncateTests<CLUSTER extends 
MiniRaftCluster> exte
   }
 
   {
+    Slf4jUtils.setLogLevel(RaftServerTestUtil.getTransactionContextLog(), 
Level.TRACE);
+
     Slf4jUtils.setLogLevel(OrderedAsync.LOG, Level.ERROR);
     Slf4jUtils.setLogLevel(RaftServerConfigKeys.LOG, Level.ERROR);
     Slf4jUtils.setLogLevel(RaftClientConfigKeys.LOG, Level.ERROR);
+    Slf4jUtils.setLogLevel(RaftClientConfigKeys.LOG, Level.ERROR);
 
     final RaftProperties p = getProperties();
     p.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY, 
SimpleStateMachine4Testing.class, StateMachine.class);
@@ -88,8 +100,8 @@ public abstract class RaftLogTruncateTests<CLUSTER extends 
MiniRaftCluster> exte
     final List<RaftPeerId> remainingPeers = new ArrayList<>();
 
     final int majorityIndex = NUM_SERVERS / 2 + 1;
-    Assert.assertEquals(NUM_SERVERS - 1, oldFollowers.size());
-    Assert.assertTrue(majorityIndex < oldFollowers.size());
+    Assertions.assertEquals(NUM_SERVERS - 1, oldFollowers.size());
+    Assertions.assertTrue(majorityIndex < oldFollowers.size());
 
     for (int i = 0; i < majorityIndex; i++) {
       killedPeers.add(oldFollowers.get(i).getId());
@@ -113,18 +125,26 @@ public abstract class RaftLogTruncateTests<CLUSTER 
extends MiniRaftCluster> exte
     final List<Throwable> exceptions = Collections.synchronizedList(new 
ArrayList<>());
     final long oldLeaderTerm = oldLeader.getInfo().getCurrentTerm();
     LOG.info("oldLeader: {}, term={}", oldLeader.getId(), oldLeaderTerm);
+    LOG.info("killedPeers   : {}", killedPeers);
+    LOG.info("remainingPeers: {}", remainingPeers);
 
     final SimpleMessage[] firstBatch = SimpleMessage.create(5, "first");
     final SimpleMessage[] secondBatch = SimpleMessage.create(4, "second");
 
+    for (RaftPeer peer : cluster.getGroup().getPeers()) {
+      assertEmptyTransactionContextMap(cluster.getDivision(peer.getId()));
+    }
+
     try (final RaftClient client = cluster.createClient(oldLeader.getId())) {
       // send some messages
       for (SimpleMessage batch : firstBatch) {
         final RaftClientReply reply = client.io().send(batch);
-        Assert.assertTrue(reply.isSuccess());
+        Assertions.assertTrue(reply.isSuccess());
       }
-      for (RaftServer.Division f : cluster.getFollowers()) {
-        assertLogEntries(f, oldLeaderTerm, firstBatch);
+      for (RaftPeer peer : cluster.getGroup().getPeers()) {
+        final RaftServer.Division division = cluster.getDivision(peer.getId());
+        assertLogEntries(division, oldLeaderTerm, firstBatch);
+        assertEmptyTransactionContextMap(division);
       }
 
       // kill a majority of followers
@@ -148,17 +168,19 @@ public abstract class RaftLogTruncateTests<CLUSTER 
extends MiniRaftCluster> exte
       // check log messages
       final SimpleMessage[] expectedMessages = arraycopy(firstBatch, 
messagesToBeTruncated);
       for (RaftPeerId f : remainingPeers) {
-        assertLogEntries(cluster.getDivision(f), oldLeaderTerm, 
expectedMessages);
+        final RaftServer.Division division = cluster.getDivision(f);
+        assertLogEntries(division, oldLeaderTerm, expectedMessages);
+        if (!division.getId().equals(oldLeader.getId())) {
+          assertEntriesInTransactionContextMap(division, 
messagesToBeTruncated, firstBatch);
+        }
       }
       done.set(true);
       LOG.info("done");
     }
 
-    // kill the remaining servers
-    LOG.info("Before killServer {}: {}", remainingPeers, 
cluster.printServers());
-    for (RaftPeerId f : remainingPeers) {
-      cluster.killServer(f);
-    }
+    // kill the old leader
+    LOG.info("Before killServer {}: {}", oldLeader.getId(), 
cluster.printServers());
+    cluster.killServer(oldLeader.getId());
     LOG.info("After killServer {}: {}", remainingPeers, 
cluster.printServers());
 
     // restart the earlier followers
@@ -174,12 +196,10 @@ public abstract class RaftLogTruncateTests<CLUSTER 
extends MiniRaftCluster> exte
     final SegmentedRaftLog newLeaderLog = (SegmentedRaftLog) 
newLeader.getRaftLog();
     LOG.info("newLeader: {}, term {}, last={}", newLeader.getId(), 
newLeaderTerm,
         newLeaderLog.getLastEntryTermIndex());
-    Assert.assertTrue(killedPeers.contains(newLeader.getId()));
+    Assertions.assertTrue(killedPeers.contains(newLeader.getId()));
 
-    // restart the remaining servers
-    for (RaftPeerId f : remainingPeers) {
-      cluster.restartServer(f, false);
-    }
+    // restart the old leader
+    cluster.restartServer(oldLeader.getId(), false);
 
     // check RaftLog truncate
     for (RaftPeerId f : remainingPeers) {
@@ -190,14 +210,16 @@ public abstract class RaftLogTruncateTests<CLUSTER 
extends MiniRaftCluster> exte
       // send more messages
       for (SimpleMessage batch : secondBatch) {
         final RaftClientReply reply = client.io().send(batch);
-        Assert.assertTrue(reply.isSuccess());
+        Assertions.assertTrue(reply.isSuccess());
       }
     }
 
     // check log messages -- it should be truncated and then append the new 
messages
     final SimpleMessage[] expectedMessages = arraycopy(firstBatch, 
secondBatch);
-    for (RaftPeerId f : killedPeers) {
-      assertLogEntries(cluster.getDivision(f), oldLeaderTerm, 
expectedMessages);
+    for (RaftPeer peer : cluster.getGroup().getPeers()) {
+      final RaftServer.Division division = cluster.getDivision(peer.getId());
+      assertLogEntries(division, oldLeaderTerm, expectedMessages);
+      assertEmptyTransactionContextMap(division);
     }
 
     if (!exceptions.isEmpty()) {
@@ -205,7 +227,36 @@ public abstract class RaftLogTruncateTests<CLUSTER extends 
MiniRaftCluster> exte
       for(int i = 0 ; i < exceptions.size(); i++) {
         LOG.info("exception {})", i, exceptions.get(i));
       }
-      Assert.fail();
+      Assertions.fail();
+    }
+  }
+
+  static void assertEmptyTransactionContextMap(RaftServer.Division division) {
+    
Assertions.assertTrue(RaftServerTestUtil.getTransactionContextMap(division).isEmpty(),
+        () -> division.getId() + " TransactionContextMap is non-empty");
+  }
+
+  static void assertEntriesInTransactionContextMap(RaftServer.Division 
division,
+      SimpleMessage[] existing, SimpleMessage[] nonExisting) {
+    final RaftLog log = division.getRaftLog();
+    assertEntriesInTransactionContextMap(division,
+        RaftTestUtil.getStateMachineLogEntries(log, existing).values(),
+        RaftTestUtil.getStateMachineLogEntries(log, nonExisting).values());
+  }
+
+  static void assertEntriesInTransactionContextMap(RaftServer.Division 
division,
+      Collection<LogEntryProto> existing, Collection<LogEntryProto> 
nonExisting) {
+    final Map<TermIndex, MemoizedSupplier<TransactionContext>> map
+        = RaftServerTestUtil.getTransactionContextMap(division);
+    for(LogEntryProto e : existing) {
+      final TermIndex termIndex = TermIndex.valueOf(e);
+      Assertions.assertTrue(map.containsKey(termIndex),
+          () -> termIndex + " not found in " + division.getId());
+    }
+    for(LogEntryProto e : nonExisting) {
+      final TermIndex termIndex = TermIndex.valueOf(e);
+      Assertions.assertFalse(map.containsKey(termIndex),
+          () -> termIndex + " found in " + division.getId());
     }
   }
 

Reply via email to