This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch snapshot-3
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit 59b40376ea62e584fb3eac04de592e923214f97d
Author: William Song <48054931+szywill...@users.noreply.github.com>
AuthorDate: Fri Feb 21 03:38:50 2025 +0800

    RATIS-2242 change consistency criteria of heartbeat during appendLog (#1215)
---
 .../apache/ratis/server/impl/RaftServerImpl.java   |  18 +++-
 .../apache/ratis/server/impl/RaftServerProxy.java  |  10 +-
 .../apache/ratis/server/impl/ServerImplUtils.java  | 110 +++++++++++++++++++++
 3 files changed, 130 insertions(+), 8 deletions(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 02e038ef8..b18960575 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -17,7 +17,6 @@
  */
 package org.apache.ratis.server.impl;
 
-import java.util.concurrent.CountDownLatch;
 import org.apache.ratis.client.impl.ClientProtoUtils;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.metrics.Timekeeper;
@@ -25,11 +24,11 @@ import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto;
 import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto.AppendResult;
 import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
 import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
-import org.apache.ratis.proto.RaftProtos.LogInfoProto;
 import org.apache.ratis.proto.RaftProtos.InstallSnapshotReplyProto;
 import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
 import org.apache.ratis.proto.RaftProtos.InstallSnapshotResult;
 import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.proto.RaftProtos.LogInfoProto;
 import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto;
 import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto.TypeCase;
 import org.apache.ratis.proto.RaftProtos.RaftConfigurationProto;
@@ -82,6 +81,8 @@ import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.RaftServerRpc;
 import org.apache.ratis.server.impl.LeaderElection.Phase;
 import org.apache.ratis.server.impl.RetryCacheImpl.CacheEntry;
+import org.apache.ratis.server.impl.ServerImplUtils.ConsecutiveIndices;
+import org.apache.ratis.server.impl.ServerImplUtils.NavigableIndices;
 import org.apache.ratis.server.leader.LeaderState;
 import org.apache.ratis.server.metrics.LeaderElectionMetrics;
 import org.apache.ratis.server.metrics.RaftServerMetricsImpl;
@@ -112,6 +113,7 @@ import org.apache.ratis.util.Preconditions;
 import org.apache.ratis.util.ProtoUtils;
 import org.apache.ratis.util.TimeDuration;
 import org.apache.ratis.util.function.CheckedSupplier;
+import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier;
 
 import java.io.File;
 import java.io.IOException;
@@ -126,6 +128,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ThreadLocalRandom;
@@ -250,6 +253,7 @@ class RaftServerImpl implements RaftServer.Division,
   private final ThreadGroup threadGroup;
 
   private final AtomicReference<CompletableFuture<Void>> appendLogFuture;
+  private final NavigableIndices appendLogTermIndices = new NavigableIndices();
 
   RaftServerImpl(RaftGroup group, StateMachine stateMachine, RaftServerProxy proxy, RaftStorage.StartupOption option)
       throws IOException {
@@ -1621,9 +1625,15 @@ class RaftServerImpl implements RaftServer.Division,
       return reply;
     });
   }
+
   private CompletableFuture<Void> appendLog(List<LogEntryProto> entries) {
+    final List<ConsecutiveIndices> entriesTermIndices = ConsecutiveIndices.convert(entries);
+    appendLogTermIndices.append(entriesTermIndices);
     return appendLogFuture.updateAndGet(f -> f.thenCompose(
-            ignored -> JavaUtils.allOf(state.getLog().append(entries))));
+            ignored -> JavaUtils.allOf(state.getLog().append(entries))))
+        .whenComplete((v, e) -> {
+          appendLogTermIndices.removeExisting(entriesTermIndices);
+        });
   }
 
   private long checkInconsistentAppendEntries(TermIndex previous, List<LogEntryProto> entries) {
@@ -1650,7 +1660,7 @@ class RaftServerImpl implements RaftServer.Division,
     }
 
     // Check if "previous" is contained in current state.
-    if (previous != null && !state.containsTermIndex(previous)) {
+    if (previous != null && !(appendLogTermIndices.contains(previous) || state.containsTermIndex(previous))) {
       final long replyNextIndex = Math.min(state.getNextIndex(), previous.getIndex());
       LOG.info("{}: Failed appendEntries as previous log entry ({}) is not found", getMemberId(), previous);
       return replyNextIndex;
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index 242bb377b..ac4ba64f8 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -640,10 +640,12 @@ class RaftServerProxy implements RaftServer {
   }
 
   @Override
-  public CompletableFuture<AppendEntriesReplyProto> appendEntriesAsync(AppendEntriesRequestProto request) {
-    final RaftGroupId groupId = ProtoUtils.toRaftGroupId(request.getServerRequest().getRaftGroupId());
-    return getImplFuture(groupId)
-        .thenCompose(impl -> impl.executeSubmitServerRequestAsync(() -> impl.appendEntriesAsync(request)));
+  public CompletableFuture<AppendEntriesReplyProto> appendEntriesAsync(
+      AppendEntriesRequestProto request) {
+      final RaftGroupId groupId = ProtoUtils.toRaftGroupId(request.getServerRequest().getRaftGroupId());
+      return getImplFuture(groupId)
+          .thenCompose(impl -> JavaUtils.callAsUnchecked(
+              () -> impl.appendEntriesAsync(request), CompletionException::new));
   }
 
   @Override
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
index e26c6e0ab..c5010a534 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
@@ -40,11 +40,121 @@ import org.apache.ratis.util.Preconditions;
 import org.apache.ratis.util.TimeDuration;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
 
 /** Server utilities for internal use. */
 public final class ServerImplUtils {
+  /** The consecutive indices within the same term. */
+  static class ConsecutiveIndices {
+    /** Convert the given entries to a list of {@link ConsecutiveIndices} */
+    static List<ConsecutiveIndices> convert(List<LogEntryProto> entries) {
+      if (entries == null || entries.isEmpty()) {
+        return Collections.emptyList();
+      }
+
+      List<ConsecutiveIndices> indices = null;
+
+      LogEntryProto previous = entries.get(0);
+      long startIndex = previous.getIndex();
+      int count = 1;
+
+      for (int i = 1; i < entries.size(); i++) {
+        final LogEntryProto current = entries.get(i);
+        // validate if the indices are consecutive
+        Preconditions.assertSame(previous.getIndex() + 1, current.getIndex(), "index");
+
+        if (current.getTerm() == previous.getTerm()) {
+          count++;
+        } else {
+          // validate if the terms are increasing
+          Preconditions.assertTrue(previous.getTerm() < current.getTerm(), "term");
+          if (indices == null) {
+            indices = new ArrayList<>();
+          }
+          indices.add(new ConsecutiveIndices(previous.getTerm(), startIndex, count));
+
+          startIndex = current.getIndex();
+          count = 1;
+        }
+        previous = current;
+      }
+
+      final ConsecutiveIndices last = new ConsecutiveIndices(previous.getTerm(), startIndex, count);
+      if (indices == null) {
+        return Collections.singletonList(last);
+      } else {
+        indices.add(last);
+        return indices;
+      }
+    }
+
+    private final long term;
+    private final long startIndex;
+    private final int count;
+
+    ConsecutiveIndices(long term, long startIndex, int count) {
+      Preconditions.assertTrue(count > 0, () -> "count = " + count + " <= 0 ");
+      this.term = term;
+      this.startIndex = startIndex;
+      this.count = count;
+    }
+
+    long getNextIndex() {
+      return startIndex + count;
+    }
+
+    Long getTerm(long index) {
+      final long diff = index - startIndex;
+      return diff < 0 || diff >= count ? null: term;
+    }
+  }
+
+  /** A data structure to support the {@link #contains(TermIndex)} method. */
+  static class NavigableIndices {
+    private final NavigableMap<Long, ConsecutiveIndices> map = new TreeMap<>();
+
+    boolean contains(TermIndex ti) {
+      final Long term = getTerm(ti.getIndex());
+      return term != null && term == ti.getTerm();
+    }
+
+    synchronized Long getTerm(long index) {
+      if (map.isEmpty()) {
+        return null;
+      }
+
+      final Map.Entry<Long, ConsecutiveIndices> floorEntry = map.floorEntry(index);
+      if (floorEntry == null) {
+        return null;
+      }
+      return floorEntry.getValue().getTerm(index);
+    }
+
+    synchronized void append(List<ConsecutiveIndices> entriesTermIndices) {
+      for(ConsecutiveIndices indices : entriesTermIndices) {
+        // validate startIndex
+        final Map.Entry<Long, ConsecutiveIndices> lastEntry = map.lastEntry();
+        if (lastEntry != null) {
+          Preconditions.assertSame(lastEntry.getValue().getNextIndex(), indices.startIndex, "startIndex");
+        }
+        map.put(indices.startIndex, indices);
+      }
+    }
+
+    synchronized void removeExisting(List<ConsecutiveIndices> entriesTermIndices) {
+      for(ConsecutiveIndices indices : entriesTermIndices) {
+        final ConsecutiveIndices removed = map.remove(indices.startIndex);
+        Preconditions.assertSame(indices, removed, "removed");
+      }
+    }
+  }
+
   private ServerImplUtils() {
     //Never constructed
   }

Reply via email to