This is an automated email from the ASF dual-hosted git repository.

ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 8fb1831214 IGNITE-18495 Fix RAFT snapshot installation hang due to response swap on retry (#1491)
8fb1831214 is described below

commit 8fb18312144bb45d7ee00aea5ce210b4df390d73
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Wed Jan 4 11:40:03 2023 +0400

    IGNITE-18495 Fix RAFT snapshot installation hang due to response swap on retry (#1491)
---
 .../storage/snapshot/SnapshotExecutorImpl.java     |   4 +-
 .../raft/jraft/storage/SnapshotExecutorTest.java   | 122 ++++++++++++++++++++-
 2 files changed, 119 insertions(+), 7 deletions(-)

diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/SnapshotExecutorImpl.java
 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/SnapshotExecutorImpl.java
index 992757f918..30d966add3 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/SnapshotExecutorImpl.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/SnapshotExecutorImpl.java
@@ -605,7 +605,7 @@ public class SnapshotExecutorImpl implements 
SnapshotExecutor {
                 // this RPC.
                 saved = m;
                 this.downloadingSnapshot.set(ds);
-                result = false;
+                result = true;
             }
             else if (m.request.meta().lastIncludedIndex() > 
ds.request.meta().lastIncludedIndex()) {
                 // |is| is older
@@ -642,7 +642,7 @@ public class SnapshotExecutorImpl implements 
SnapshotExecutor {
         }
         if (saved != null) {
             // Respond replaced session
-            LOG.warn("Register DownloadingSnapshot failed: interrupted by 
retry installling request.");
+            LOG.warn("Register DownloadingSnapshot failed: interrupted by 
retry installing request.");
             saved.done.sendResponse(RaftRpcFactory.DEFAULT //
                 .newResponse(msgFactory, RaftError.EINTR,
                     "Interrupted by the retry InstallSnapshotRequest"));
diff --git 
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/SnapshotExecutorTest.java
 
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/SnapshotExecutorTest.java
index 1bef8f1b03..7a5affe459 100644
--- 
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/SnapshotExecutorTest.java
+++ 
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/SnapshotExecutorTest.java
@@ -16,14 +16,24 @@
  */
 package org.apache.ignite.raft.jraft.storage;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.eq;
+
 import java.nio.ByteBuffer;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.internal.raft.storage.impl.DefaultLogStorageFactory;
 import org.apache.ignite.internal.raft.storage.impl.IgniteJraftServiceFactory;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.internal.testframework.IgniteTestUtils.RunnableX;
 import org.apache.ignite.raft.jraft.FSMCaller;
 import org.apache.ignite.raft.jraft.JRaftUtils;
 import org.apache.ignite.raft.jraft.RaftMessagesFactory;
@@ -34,17 +44,20 @@ import 
org.apache.ignite.raft.jraft.closure.SynchronizedClosure;
 import org.apache.ignite.raft.jraft.core.NodeImpl;
 import org.apache.ignite.raft.jraft.core.TimerManager;
 import org.apache.ignite.raft.jraft.entity.PeerId;
+import org.apache.ignite.raft.jraft.error.RaftError;
 import org.apache.ignite.raft.jraft.option.CopyOptions;
 import org.apache.ignite.raft.jraft.option.NodeOptions;
 import org.apache.ignite.raft.jraft.option.RaftOptions;
 import org.apache.ignite.raft.jraft.option.SnapshotExecutorOptions;
 import org.apache.ignite.raft.jraft.rpc.GetFileRequestBuilder;
+import org.apache.ignite.raft.jraft.rpc.InstallSnapshotRequestBuilder;
 import org.apache.ignite.raft.jraft.rpc.Message;
 import org.apache.ignite.raft.jraft.rpc.RaftClientService;
 import org.apache.ignite.raft.jraft.rpc.RpcContext;
 import org.apache.ignite.raft.jraft.rpc.RpcRequestClosure;
 import org.apache.ignite.raft.jraft.rpc.RpcRequests;
 import org.apache.ignite.raft.jraft.rpc.RpcResponseClosure;
+import org.apache.ignite.raft.jraft.rpc.impl.FutureImpl;
 import org.apache.ignite.raft.jraft.storage.snapshot.Snapshot;
 import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotExecutorImpl;
 import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotReader;
@@ -52,6 +65,7 @@ import 
org.apache.ignite.raft.jraft.storage.snapshot.local.LocalSnapshotMetaTabl
 import org.apache.ignite.raft.jraft.storage.snapshot.local.LocalSnapshotReader;
 import 
org.apache.ignite.raft.jraft.storage.snapshot.local.LocalSnapshotStorage;
 import org.apache.ignite.raft.jraft.storage.snapshot.local.LocalSnapshotWriter;
+import org.apache.ignite.raft.jraft.test.MockAsyncContext;
 import org.apache.ignite.raft.jraft.test.TestUtils;
 import org.apache.ignite.raft.jraft.util.ByteString;
 import org.apache.ignite.raft.jraft.util.ExecutorServiceHelper;
@@ -63,14 +77,11 @@ import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
 import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
 import org.mockito.junit.jupiter.MockitoExtension;
 import org.mockito.junit.jupiter.MockitoSettings;
 import org.mockito.quality.Strictness;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.ArgumentMatchers.eq;
+import org.mockito.stubbing.Answer;
 
 @ExtendWith(MockitoExtension.class)
 @MockitoSettings(strictness = Strictness.LENIENT)
@@ -145,6 +156,107 @@ public class SnapshotExecutorTest extends BaseStorageTest 
{
         ExecutorServiceHelper.shutdownAndAwaitTermination(executorService);
     }
 
+    @Test
+    public void testRetryInstallSnapshot() throws Exception {
+        final InstallSnapshotRequestBuilder irb = 
raftOptions.getRaftMessagesFactory().installSnapshotRequest();
+        irb.groupId("test");
+        irb.peerId(peerId);
+        irb.serverId("localhost-8080");
+        irb.uri("remote://localhost-8080/99");
+        irb.term(0);
+        
irb.meta(raftOptions.getRaftMessagesFactory().snapshotMeta().lastIncludedIndex(1).lastIncludedTerm(2).build());
+
+        Mockito.when(this.raftClientService.connect(new 
PeerId("localhost-8080"))).thenReturn(true);
+
+        final FutureImpl<Message> future = new FutureImpl<>();
+        final GetFileRequestBuilder rb = 
raftOptions.getRaftMessagesFactory().getFileRequest().readerId(99)
+            
.filename(Snapshot.JRAFT_SNAPSHOT_META_FILE).count(Integer.MAX_VALUE).offset(0)
+            .readPartly(true);
+
+        //mock get metadata
+        ArgumentCaptor<RpcResponseClosure> argument = 
ArgumentCaptor.forClass(RpcResponseClosure.class);
+
+        final CountDownLatch retryLatch = new CountDownLatch(1);
+        final CountDownLatch answerLatch = new CountDownLatch(1);
+        Mockito.when(
+            this.raftClientService.getFile(eq(new PeerId("localhost-8080")), 
eq(rb.build()),
+                eq(this.copyOpts.getTimeoutMs()), 
argument.capture())).thenAnswer(new Answer<Future<Message>>() {
+            AtomicInteger count = new AtomicInteger(0);
+
+            @Override
+            public Future<Message> answer(InvocationOnMock invocation) throws 
Throwable {
+                if (count.incrementAndGet() == 1) {
+                    retryLatch.countDown();
+                    answerLatch.await();
+                    Thread.sleep(1000);
+                    return future;
+                } else {
+                    throw new IllegalStateException("shouldn't be called more 
than once");
+                }
+            }
+        });
+
+        final MockAsyncContext installContext = new MockAsyncContext();
+        final MockAsyncContext retryInstallContext = new MockAsyncContext();
+        IgniteTestUtils.runAsync(new RunnableX() {
+            @Override
+            public void run() {
+                SnapshotExecutorTest.this.executor.installSnapshot(irb.build(),
+                        
raftOptions.getRaftMessagesFactory().installSnapshotResponse(), new 
RpcRequestClosure(installContext, raftOptions.getRaftMessagesFactory()));
+            }
+        });
+
+        Thread.sleep(500);
+        assertTrue(retryLatch.await(5, TimeUnit.SECONDS));
+        IgniteTestUtils.runAsync(new RunnableX() {
+            @Override
+            public void run() {
+                answerLatch.countDown();
+                SnapshotExecutorTest.this.executor.installSnapshot(irb.build(),
+                        
raftOptions.getRaftMessagesFactory().installSnapshotResponse(), new 
RpcRequestClosure(retryInstallContext,
+                                options.getRaftMessagesFactory()));
+            }
+        });
+
+        RpcResponseClosure<RpcRequests.GetFileResponse> closure = 
argument.getValue();
+        final ByteBuffer metaBuf = this.table.saveToByteBufferAsRemote();
+        
closure.setResponse(raftOptions.getRaftMessagesFactory().getFileResponse().readSize(metaBuf.remaining()).eof(true)
+            .data(new ByteString(metaBuf).copy()).build());
+
+        //mock get file
+        argument = ArgumentCaptor.forClass(RpcResponseClosure.class);
+        rb.filename("testFile");
+        rb.count(this.raftOptions.getMaxByteCountPerRpc());
+        Mockito.when(
+            this.raftClientService.getFile(eq(new PeerId("localhost-8080")), 
eq(rb.build()),
+                eq(this.copyOpts.getTimeoutMs()), 
argument.capture())).thenReturn(future);
+
+        closure.run(Status.OK());
+        Thread.sleep(500);
+        closure = argument.getValue();
+        
closure.setResponse(raftOptions.getRaftMessagesFactory().getFileResponse().readSize(100).eof(true)
+            .data(new ByteString(new byte[100]).copy()).build());
+
+        final ArgumentCaptor<LoadSnapshotClosure> loadSnapshotArg = 
ArgumentCaptor.forClass(LoadSnapshotClosure.class);
+        
Mockito.when(this.fSMCaller.onSnapshotLoad(loadSnapshotArg.capture())).thenReturn(true);
+        closure.run(Status.OK());
+        Thread.sleep(2000);
+        final LoadSnapshotClosure done = loadSnapshotArg.getValue();
+        final SnapshotReader reader = done.start();
+        assertNotNull(reader);
+        assertEquals(1, reader.listFiles().size());
+        assertTrue(reader.listFiles().contains("testFile"));
+        done.run(Status.OK());
+        this.executor.join();
+        assertEquals(2, this.executor.getLastSnapshotTerm());
+        assertEquals(1, this.executor.getLastSnapshotIndex());
+        assertNotNull(installContext.getResponseObject());
+        assertNotNull(retryInstallContext.getResponseObject());
+        
assertEquals(installContext.as(RpcRequests.ErrorResponse.class).errorCode(), 
RaftError.EINTR.getNumber());
+        
assertTrue(retryInstallContext.as(RpcRequests.InstallSnapshotResponse.class).success());
+
+    }
+
     @Test
     public void testInstallSnapshot() throws Exception {
         RaftMessagesFactory msgFactory = raftOptions.getRaftMessagesFactory();

Reply via email to