This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 32745c3c4 RATIS-2116. Fix the issue where RaftServerImpl.appendEntries may be blocked indefinitely (#1116)
32745c3c4 is described below
commit 32745c3c4ecb9777a6d44e703134ef1181ad2dd2
Author: Haibo Sun <[email protected]>
AuthorDate: Tue Jul 2 14:42:21 2024 +0800
RATIS-2116. Fix the issue where RaftServerImpl.appendEntries may be blocked indefinitely (#1116)
---
.../org/apache/ratis/util/DataBlockingQueue.java | 7 ++
.../main/java/org/apache/ratis/util/DataQueue.java | 5 ++
.../raftlog/segmented/SegmentedRaftLogWorker.java | 6 +-
.../raftlog/segmented/TestSegmentedRaftLog.java | 84 ++++++++++++++++++++++
4 files changed, 101 insertions(+), 1 deletion(-)
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/DataBlockingQueue.java b/ratis-common/src/main/java/org/apache/ratis/util/DataBlockingQueue.java
index 842b8f154..e905893e5 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/DataBlockingQueue.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/DataBlockingQueue.java
@@ -162,4 +162,11 @@ public class DataBlockingQueue<E> extends DataQueue<E> {
return results;
}
}
+
+ @Override
+ public E peek() {
+ try(AutoCloseableLock auto = AutoCloseableLock.acquire(lock)) {
+ return super.peek();
+ }
+ }
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/DataQueue.java b/ratis-common/src/main/java/org/apache/ratis/util/DataQueue.java
index 3db06f56e..38762caa1 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/DataQueue.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/DataQueue.java
@@ -154,6 +154,11 @@ public class DataQueue<E> implements Iterable<E> {
return polled;
}
+ /** Peek the head element from this queue. */
+ public E peek() {
+ return q.peek();
+ }
+
/** The same as {@link java.util.Collection#remove(Object)}. */
public boolean remove(E e) {
final boolean removed = q.remove(e);
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
index b9d1442a0..dedba2690 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
@@ -63,6 +63,9 @@ class SegmentedRaftLogWorker {
static final TimeDuration ONE_SECOND = TimeDuration.valueOf(1, TimeUnit.SECONDS);
+ private static final String CLASS_NAME = JavaUtils.getClassSimpleName(SegmentedRaftLogWorker.class);
+ static final String RUN_WORKER = CLASS_NAME + ".runWorker";
+
static class StateMachineDataPolicy {
private final boolean sync;
private final TimeDuration syncTimeout;
@@ -298,6 +301,7 @@ class SegmentedRaftLogWorker {
// if and when a log task encounters an exception
RaftLogIOException logIOException = null;
+ CodeInjectionForTesting.execute(RUN_WORKER, server == null ? null : server.getId(), null, queue);
while (running) {
try {
Task task = queue.poll(ONE_SECOND);
@@ -356,7 +360,7 @@ class SegmentedRaftLogWorker {
} else if (pendingFlushNum >= forceSyncNum) {
return true;
}
- return pendingFlushNum > 0 && queue.isEmpty();
+ return pendingFlushNum > 0 && !(queue.peek() instanceof WriteLog);
}
private void flushIfNecessary() throws IOException {
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
index 5779a9347..52942279b 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
@@ -41,6 +41,8 @@ import org.apache.ratis.statemachine.TransactionContext;
import org.apache.ratis.statemachine.impl.SimpleStateMachine4Testing;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.statemachine.impl.BaseStateMachine;
+import org.apache.ratis.util.CodeInjectionForTesting;
+import org.apache.ratis.util.DataBlockingQueue;
import org.apache.ratis.util.LifeCycle;
import org.apache.ratis.util.ReferenceCountedObject;
import org.apache.ratis.util.Slf4jUtils;
@@ -57,8 +59,13 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import java.util.stream.Stream;
@@ -74,6 +81,7 @@ import org.slf4j.event.Level;
import static java.lang.Boolean.FALSE;
import static java.lang.Boolean.TRUE;
+import static org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogWorker.RUN_WORKER;
import static org.apache.ratis.server.storage.RaftStorageTestUtils.getLogUnsafe;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.params.provider.Arguments.arguments;
@@ -396,6 +404,82 @@ public class TestSegmentedRaftLog extends BaseTest {
}
}
+ @ParameterizedTest
+ @MethodSource("data")
+ public void testPurgeAfterAppendEntry(Boolean useAsyncFlush, Boolean smSyncFlush) throws Exception {
+ RaftServerConfigKeys.Log.setAsyncFlushEnabled(properties, useAsyncFlush);
+ RaftServerConfigKeys.Log.StateMachineData.setSync(properties, smSyncFlush);
+ RaftServerConfigKeys.Log.setPurgeGap(properties, 1);
+ RaftServerConfigKeys.Log.setForceSyncNum(properties, 128);
+
+ int startTerm = 0;
+ int endTerm = 2;
+ int segmentSize = 10;
+ long endIndexOfClosedSegment = segmentSize * (endTerm - startTerm - 1);
+ long nextStartIndex = segmentSize * (endTerm - startTerm);
+
+ // append entries and roll logSegment for later purge operation
+ List<SegmentRange> ranges0 = prepareRanges(startTerm, endTerm, segmentSize, 0);
+ List<LogEntryProto> entries0 = prepareLogEntries(ranges0, null);
+ try (SegmentedRaftLog raftLog = newSegmentedRaftLog()) {
+ raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
+ entries0.stream().map(raftLog::appendEntry).forEach(CompletableFuture::join);
+ }
+
+ // test the pattern in the task queue of SegmentedRaftLogWorker: (WriteLog, ..., PurgeLog)
+ List<SegmentRange> ranges = prepareRanges(endTerm - 1, endTerm, 1, nextStartIndex);
+ List<LogEntryProto> entries = prepareLogEntries(ranges, null);
+
+ try (SegmentedRaftLog raftLog = newSegmentedRaftLog()) {
+ final CountDownLatch raftLogOpened = new CountDownLatch(1);
+ final CountDownLatch tasksAdded = new CountDownLatch(1);
+
+ // inject test code to make the pattern (WriteLog, PurgeLog)
+ final ConcurrentLinkedQueue<CompletableFuture<Long>> appendFutures = new ConcurrentLinkedQueue<>();
+ final AtomicReference<CompletableFuture<Long>> purgeFuture = new AtomicReference<>();
+ final AtomicInteger tasksCount = new AtomicInteger(0);
+ CodeInjectionForTesting.put(RUN_WORKER, (localId, remoteId, args) -> {
+ // wait for raftLog to be opened
+ try {
+ if(!raftLogOpened.await(FIVE_SECONDS.getDuration(), FIVE_SECONDS.getUnit())) {
+ throw new TimeoutException();
+ }
+ } catch (InterruptedException | TimeoutException e) {
+ LOG.error("an exception occurred", e);
+ throw new RuntimeException(e);
+ }
+
+ // add WriteLog and PurgeLog tasks
+ entries.stream().map(raftLog::appendEntry).forEach(appendFutures::add);
+ purgeFuture.set(raftLog.purge(endIndexOfClosedSegment));
+
+ tasksCount.set(((DataBlockingQueue<?>) args[0]).getNumElements());
+ tasksAdded.countDown();
+ return true;
+ });
+
+ // open raftLog
+ raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
+ raftLogOpened.countDown();
+
+ // wait for all tasks to be added
+ if(!tasksAdded.await(FIVE_SECONDS.getDuration(), FIVE_SECONDS.getUnit())) {
+ throw new TimeoutException();
+ }
+ Assertions.assertEquals(entries.size() + 1, tasksCount.get());
+
+ // check if the purge task is executed
+ final Long purged = purgeFuture.get().get();
+ LOG.info("purgeIndex = {}, purged = {}", endIndexOfClosedSegment, purged);
+ Assertions.assertEquals(endIndexOfClosedSegment, raftLog.getRaftLogCache().getStartIndex());
+
+ // check if the appendEntry futures are done
+ JavaUtils.allOf(appendFutures).get(FIVE_SECONDS.getDuration(), FIVE_SECONDS.getUnit());
+ } finally {
+ CodeInjectionForTesting.put(RUN_WORKER, (localId, remoteId, args) -> false);
+ }
+ }
+
@ParameterizedTest
@MethodSource("data")
public void testTruncate(Boolean useAsyncFlush, Boolean smSyncFlush) throws Exception {