This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch branch-3.1.1_review
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit 6390a28bdf1d2c454d49a11dca117e5bbc482f54
Author: Haibo Sun <[email protected]>
AuthorDate: Tue Jul 2 14:42:21 2024 +0800

    RATIS-2116. Fix the issue where RaftServerImpl.appendEntries may be blocked indefinitely (#1116)
---
 .../org/apache/ratis/util/DataBlockingQueue.java   |  7 ++
 .../main/java/org/apache/ratis/util/DataQueue.java |  5 ++
 .../raftlog/segmented/SegmentedRaftLogWorker.java  |  6 +-
 .../raftlog/segmented/TestSegmentedRaftLog.java    | 84 ++++++++++++++++++++++
 4 files changed, 101 insertions(+), 1 deletion(-)

diff --git a/ratis-common/src/main/java/org/apache/ratis/util/DataBlockingQueue.java b/ratis-common/src/main/java/org/apache/ratis/util/DataBlockingQueue.java
index 842b8f154..e905893e5 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/DataBlockingQueue.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/DataBlockingQueue.java
@@ -162,4 +162,11 @@ public class DataBlockingQueue<E> extends DataQueue<E> {
       return results;
     }
   }
+
+  @Override
+  public E peek() {
+    try(AutoCloseableLock auto = AutoCloseableLock.acquire(lock)) { // hold {@code lock} while delegating to DataQueue.peek()
+      return super.peek();
+    }
+  }
 }
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/DataQueue.java b/ratis-common/src/main/java/org/apache/ratis/util/DataQueue.java
index 3db06f56e..38762caa1 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/DataQueue.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/DataQueue.java
@@ -154,6 +154,11 @@ public class DataQueue<E> implements Iterable<E> {
     return polled;
   }
 
+  /** Peek the head element from this queue; delegates to {@code peek()} of the backing queue {@code q}. */
+  public E peek() {
+    return q.peek();
+  }
+
   /** The same as {@link java.util.Collection#remove(Object)}. */
   public boolean remove(E e) {
     final boolean removed = q.remove(e);
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
index bfeca7f87..573270cca 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
@@ -63,6 +63,9 @@ class SegmentedRaftLogWorker {
 
   static final TimeDuration ONE_SECOND = TimeDuration.valueOf(1, 
TimeUnit.SECONDS);
 
+  private static final String CLASS_NAME = 
JavaUtils.getClassSimpleName(SegmentedRaftLogWorker.class);
+  static final String RUN_WORKER = CLASS_NAME + ".runWorker";
+
   static class StateMachineDataPolicy {
     private final boolean sync;
     private final TimeDuration syncTimeout;
@@ -297,6 +300,7 @@ class SegmentedRaftLogWorker {
     // if and when a log task encounters an exception
     RaftLogIOException logIOException = null;
 
+    CodeInjectionForTesting.execute(RUN_WORKER, server == null ? null : 
server.getId(), null, queue);
     while (running) {
       try {
         Task task = queue.poll(ONE_SECOND);
@@ -355,7 +359,7 @@ class SegmentedRaftLogWorker {
     } else if (pendingFlushNum >= forceSyncNum) {
       return true;
     }
-    return pendingFlushNum > 0 && queue.isEmpty();
+    return pendingFlushNum > 0 && !(queue.peek() instanceof WriteLog);
   }
 
   private void flushIfNecessary() throws IOException {
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
index b564501f3..93eb7db0e 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
@@ -40,6 +40,8 @@ import org.apache.ratis.server.storage.RaftStorageTestUtils;
 import org.apache.ratis.statemachine.impl.SimpleStateMachine4Testing;
 import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.statemachine.impl.BaseStateMachine;
+import org.apache.ratis.util.CodeInjectionForTesting;
+import org.apache.ratis.util.DataBlockingQueue;
 import org.apache.ratis.util.LifeCycle;
 import org.apache.ratis.util.Slf4jUtils;
 import org.apache.ratis.util.FileUtils;
@@ -55,8 +57,13 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.LongSupplier;
 import java.util.function.Supplier;
 import java.util.stream.Stream;
@@ -72,6 +79,7 @@ import org.slf4j.event.Level;
 
 import static java.lang.Boolean.FALSE;
 import static java.lang.Boolean.TRUE;
+import static 
org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogWorker.RUN_WORKER;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.params.provider.Arguments.arguments;
 
@@ -393,6 +401,82 @@ public class TestSegmentedRaftLog extends BaseTest {
     }
   }
 
+  @ParameterizedTest
+  @MethodSource("data")
+  public void testPurgeAfterAppendEntry(Boolean useAsyncFlush, Boolean 
smSyncFlush) throws Exception { // checks that a purge queued after appends completes, and so do the appends
+    RaftServerConfigKeys.Log.setAsyncFlushEnabled(properties, useAsyncFlush);
+    RaftServerConfigKeys.Log.StateMachineData.setSync(properties, smSyncFlush);
+    RaftServerConfigKeys.Log.setPurgeGap(properties, 1);
+    RaftServerConfigKeys.Log.setForceSyncNum(properties, 128);
+
+    int startTerm = 0;
+    int endTerm = 2;
+    int segmentSize = 10;
+    long endIndexOfClosedSegment = segmentSize * (endTerm - startTerm - 1);
+    long nextStartIndex = segmentSize * (endTerm - startTerm);
+
+    // append entries0 (joining each append future) and roll logSegment for later purge operation
+    List<SegmentRange> ranges0 = prepareRanges(startTerm, endTerm, 
segmentSize, 0);
+    List<LogEntryProto> entries0 = prepareLogEntries(ranges0, null);
+    try (SegmentedRaftLog raftLog = newSegmentedRaftLog()) {
+      raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
+      
entries0.stream().map(raftLog::appendEntry).forEach(CompletableFuture::join);
+    }
+
+    // test the pattern in the task queue of SegmentedRaftLogWorker: 
(WriteLog, ..., PurgeLog)
+    List<SegmentRange> ranges = prepareRanges(endTerm - 1, endTerm, 1, 
nextStartIndex);
+    List<LogEntryProto> entries = prepareLogEntries(ranges, null);
+
+    try (SegmentedRaftLog raftLog = newSegmentedRaftLog()) {
+      final CountDownLatch raftLogOpened = new CountDownLatch(1);
+      final CountDownLatch tasksAdded = new CountDownLatch(1);
+
+      // inject test code at the RUN_WORKER injection point to make the pattern (WriteLog, PurgeLog)
+      final ConcurrentLinkedQueue<CompletableFuture<Long>> appendFutures = new 
ConcurrentLinkedQueue<>();
+      final AtomicReference<CompletableFuture<Long>> purgeFuture = new 
AtomicReference<>();
+      final AtomicInteger tasksCount = new AtomicInteger(0);
+      CodeInjectionForTesting.put(RUN_WORKER, (localId, remoteId, args) -> {
+        // wait up to FIVE_SECONDS for raftLog to be opened; a timeout fails the injected code
+        try {
+          if(!raftLogOpened.await(FIVE_SECONDS.getDuration(), 
FIVE_SECONDS.getUnit())) {
+            throw new TimeoutException();
+          }
+        } catch (InterruptedException | TimeoutException e) { // NOTE(review): interrupt status is not restored here
+          LOG.error("an exception occurred", e);
+          throw new RuntimeException(e);
+        }
+
+        // add WriteLog and PurgeLog tasks: append every entry first, then request the purge
+        entries.stream().map(raftLog::appendEntry).forEach(appendFutures::add);
+        purgeFuture.set(raftLog.purge(endIndexOfClosedSegment));
+
+        tasksCount.set(((DataBlockingQueue<?>) args[0]).getNumElements()); // args[0]: queue passed at RUN_WORKER
+        tasksAdded.countDown();
+        return true;
+      });
+
+      // open raftLog, then release the injected code waiting on raftLogOpened
+      raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
+      raftLogOpened.countDown();
+
+      // wait up to FIVE_SECONDS for all tasks to be added
+      if(!tasksAdded.await(FIVE_SECONDS.getDuration(), 
FIVE_SECONDS.getUnit())) {
+        throw new TimeoutException();
+      }
+      Assertions.assertEquals(entries.size() + 1, tasksCount.get()); // one task per appended entry + the purge
+
+      // check if the purge task is executed (NOTE(review): this get() has no timeout)
+      final Long purged = purgeFuture.get().get();
+      LOG.info("purgeIndex = {}, purged = {}", endIndexOfClosedSegment, 
purged);
+      Assertions.assertEquals(endIndexOfClosedSegment, 
raftLog.getRaftLogCache().getStartIndex());
+
+      // check if the appendEntry futures are done within FIVE_SECONDS
+      JavaUtils.allOf(appendFutures).get(FIVE_SECONDS.getDuration(), 
FIVE_SECONDS.getUnit());
+    } finally {
+      CodeInjectionForTesting.put(RUN_WORKER, (localId, remoteId, args) -> 
false); // replace the injected code with a handler that only returns false
+    }
+  }
+
   @ParameterizedTest
   @MethodSource("data")
   public void testTruncate(Boolean useAsyncFlush, Boolean smSyncFlush) throws 
Exception {

Reply via email to