This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 8fb1831214 IGNITE-18495 Fix RAFT snapshot installation hang due to response swap on retry (#1491)
8fb1831214 is described below
commit 8fb18312144bb45d7ee00aea5ce210b4df390d73
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Wed Jan 4 11:40:03 2023 +0400
IGNITE-18495 Fix RAFT snapshot installation hang due to response swap on retry (#1491)
---
.../storage/snapshot/SnapshotExecutorImpl.java | 4 +-
.../raft/jraft/storage/SnapshotExecutorTest.java | 122 ++++++++++++++++++++-
2 files changed, 119 insertions(+), 7 deletions(-)
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/SnapshotExecutorImpl.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/SnapshotExecutorImpl.java
index 992757f918..30d966add3 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/SnapshotExecutorImpl.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/SnapshotExecutorImpl.java
@@ -605,7 +605,7 @@ public class SnapshotExecutorImpl implements
SnapshotExecutor {
// this RPC.
saved = m;
this.downloadingSnapshot.set(ds);
- result = false;
+ result = true;
}
else if (m.request.meta().lastIncludedIndex() >
ds.request.meta().lastIncludedIndex()) {
// |is| is older
@@ -642,7 +642,7 @@ public class SnapshotExecutorImpl implements
SnapshotExecutor {
}
if (saved != null) {
// Respond replaced session
- LOG.warn("Register DownloadingSnapshot failed: interrupted by
retry installling request.");
+ LOG.warn("Register DownloadingSnapshot failed: interrupted by
retry installing request.");
saved.done.sendResponse(RaftRpcFactory.DEFAULT //
.newResponse(msgFactory, RaftError.EINTR,
"Interrupted by the retry InstallSnapshotRequest"));
diff --git
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/SnapshotExecutorTest.java
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/SnapshotExecutorTest.java
index 1bef8f1b03..7a5affe459 100644
---
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/SnapshotExecutorTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/SnapshotExecutorTest.java
@@ -16,14 +16,24 @@
*/
package org.apache.ignite.raft.jraft.storage;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.eq;
+
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.internal.raft.storage.impl.DefaultLogStorageFactory;
import org.apache.ignite.internal.raft.storage.impl.IgniteJraftServiceFactory;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.internal.testframework.IgniteTestUtils.RunnableX;
import org.apache.ignite.raft.jraft.FSMCaller;
import org.apache.ignite.raft.jraft.JRaftUtils;
import org.apache.ignite.raft.jraft.RaftMessagesFactory;
@@ -34,17 +44,20 @@ import
org.apache.ignite.raft.jraft.closure.SynchronizedClosure;
import org.apache.ignite.raft.jraft.core.NodeImpl;
import org.apache.ignite.raft.jraft.core.TimerManager;
import org.apache.ignite.raft.jraft.entity.PeerId;
+import org.apache.ignite.raft.jraft.error.RaftError;
import org.apache.ignite.raft.jraft.option.CopyOptions;
import org.apache.ignite.raft.jraft.option.NodeOptions;
import org.apache.ignite.raft.jraft.option.RaftOptions;
import org.apache.ignite.raft.jraft.option.SnapshotExecutorOptions;
import org.apache.ignite.raft.jraft.rpc.GetFileRequestBuilder;
+import org.apache.ignite.raft.jraft.rpc.InstallSnapshotRequestBuilder;
import org.apache.ignite.raft.jraft.rpc.Message;
import org.apache.ignite.raft.jraft.rpc.RaftClientService;
import org.apache.ignite.raft.jraft.rpc.RpcContext;
import org.apache.ignite.raft.jraft.rpc.RpcRequestClosure;
import org.apache.ignite.raft.jraft.rpc.RpcRequests;
import org.apache.ignite.raft.jraft.rpc.RpcResponseClosure;
+import org.apache.ignite.raft.jraft.rpc.impl.FutureImpl;
import org.apache.ignite.raft.jraft.storage.snapshot.Snapshot;
import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotExecutorImpl;
import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotReader;
@@ -52,6 +65,7 @@ import
org.apache.ignite.raft.jraft.storage.snapshot.local.LocalSnapshotMetaTabl
import org.apache.ignite.raft.jraft.storage.snapshot.local.LocalSnapshotReader;
import
org.apache.ignite.raft.jraft.storage.snapshot.local.LocalSnapshotStorage;
import org.apache.ignite.raft.jraft.storage.snapshot.local.LocalSnapshotWriter;
+import org.apache.ignite.raft.jraft.test.MockAsyncContext;
import org.apache.ignite.raft.jraft.test.TestUtils;
import org.apache.ignite.raft.jraft.util.ByteString;
import org.apache.ignite.raft.jraft.util.ExecutorServiceHelper;
@@ -63,14 +77,11 @@ import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.ArgumentMatchers.eq;
+import org.mockito.stubbing.Answer;
@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
@@ -145,6 +156,107 @@ public class SnapshotExecutorTest extends BaseStorageTest
{
ExecutorServiceHelper.shutdownAndAwaitTermination(executorService);
}
+ @Test
+ public void testRetryInstallSnapshot() throws Exception {
+ final InstallSnapshotRequestBuilder irb =
raftOptions.getRaftMessagesFactory().installSnapshotRequest();
+ irb.groupId("test");
+ irb.peerId(peerId);
+ irb.serverId("localhost-8080");
+ irb.uri("remote://localhost-8080/99");
+ irb.term(0);
+
irb.meta(raftOptions.getRaftMessagesFactory().snapshotMeta().lastIncludedIndex(1).lastIncludedTerm(2).build());
+
+ Mockito.when(this.raftClientService.connect(new
PeerId("localhost-8080"))).thenReturn(true);
+
+ final FutureImpl<Message> future = new FutureImpl<>();
+ final GetFileRequestBuilder rb =
raftOptions.getRaftMessagesFactory().getFileRequest().readerId(99)
+
.filename(Snapshot.JRAFT_SNAPSHOT_META_FILE).count(Integer.MAX_VALUE).offset(0)
+ .readPartly(true);
+
+ //mock get metadata
+ ArgumentCaptor<RpcResponseClosure> argument =
ArgumentCaptor.forClass(RpcResponseClosure.class);
+
+ final CountDownLatch retryLatch = new CountDownLatch(1);
+ final CountDownLatch answerLatch = new CountDownLatch(1);
+ Mockito.when(
+ this.raftClientService.getFile(eq(new PeerId("localhost-8080")),
eq(rb.build()),
+ eq(this.copyOpts.getTimeoutMs()),
argument.capture())).thenAnswer(new Answer<Future<Message>>() {
+ AtomicInteger count = new AtomicInteger(0);
+
+ @Override
+ public Future<Message> answer(InvocationOnMock invocation) throws
Throwable {
+ if (count.incrementAndGet() == 1) {
+ retryLatch.countDown();
+ answerLatch.await();
+ Thread.sleep(1000);
+ return future;
+ } else {
+ throw new IllegalStateException("shouldn't be called more
than once");
+ }
+ }
+ });
+
+ final MockAsyncContext installContext = new MockAsyncContext();
+ final MockAsyncContext retryInstallContext = new MockAsyncContext();
+ IgniteTestUtils.runAsync(new RunnableX() {
+ @Override
+ public void run() {
+ SnapshotExecutorTest.this.executor.installSnapshot(irb.build(),
+
raftOptions.getRaftMessagesFactory().installSnapshotResponse(), new
RpcRequestClosure(installContext, raftOptions.getRaftMessagesFactory()));
+ }
+ });
+
+ Thread.sleep(500);
+ assertTrue(retryLatch.await(5, TimeUnit.SECONDS));
+ IgniteTestUtils.runAsync(new RunnableX() {
+ @Override
+ public void run() {
+ answerLatch.countDown();
+ SnapshotExecutorTest.this.executor.installSnapshot(irb.build(),
+
raftOptions.getRaftMessagesFactory().installSnapshotResponse(), new
RpcRequestClosure(retryInstallContext,
+ options.getRaftMessagesFactory()));
+ }
+ });
+
+ RpcResponseClosure<RpcRequests.GetFileResponse> closure =
argument.getValue();
+ final ByteBuffer metaBuf = this.table.saveToByteBufferAsRemote();
+
closure.setResponse(raftOptions.getRaftMessagesFactory().getFileResponse().readSize(metaBuf.remaining()).eof(true)
+ .data(new ByteString(metaBuf).copy()).build());
+
+ //mock get file
+ argument = ArgumentCaptor.forClass(RpcResponseClosure.class);
+ rb.filename("testFile");
+ rb.count(this.raftOptions.getMaxByteCountPerRpc());
+ Mockito.when(
+ this.raftClientService.getFile(eq(new PeerId("localhost-8080")),
eq(rb.build()),
+ eq(this.copyOpts.getTimeoutMs()),
argument.capture())).thenReturn(future);
+
+ closure.run(Status.OK());
+ Thread.sleep(500);
+ closure = argument.getValue();
+
closure.setResponse(raftOptions.getRaftMessagesFactory().getFileResponse().readSize(100).eof(true)
+ .data(new ByteString(new byte[100]).copy()).build());
+
+ final ArgumentCaptor<LoadSnapshotClosure> loadSnapshotArg =
ArgumentCaptor.forClass(LoadSnapshotClosure.class);
+
Mockito.when(this.fSMCaller.onSnapshotLoad(loadSnapshotArg.capture())).thenReturn(true);
+ closure.run(Status.OK());
+ Thread.sleep(2000);
+ final LoadSnapshotClosure done = loadSnapshotArg.getValue();
+ final SnapshotReader reader = done.start();
+ assertNotNull(reader);
+ assertEquals(1, reader.listFiles().size());
+ assertTrue(reader.listFiles().contains("testFile"));
+ done.run(Status.OK());
+ this.executor.join();
+ assertEquals(2, this.executor.getLastSnapshotTerm());
+ assertEquals(1, this.executor.getLastSnapshotIndex());
+ assertNotNull(installContext.getResponseObject());
+ assertNotNull(retryInstallContext.getResponseObject());
+
assertEquals(installContext.as(RpcRequests.ErrorResponse.class).errorCode(),
RaftError.EINTR.getNumber());
+
assertTrue(retryInstallContext.as(RpcRequests.InstallSnapshotResponse.class).success());
+
+ }
+
@Test
public void testInstallSnapshot() throws Exception {
RaftMessagesFactory msgFactory = raftOptions.getRaftMessagesFactory();