This is an automated email from the ASF dual-hosted git repository.

ascherbakov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new e858e11  IGNITE-15038 Fixed a race in argument capturing. - Fixes #195.
e858e11 is described below

commit e858e116941edcf275743af21112763679ce528c
Author: Alexey Scherbakov <[email protected]>
AuthorDate: Fri Jul 2 19:09:50 2021 +0300

    IGNITE-15038 Fixed a race in argument capturing. - Fixes #195.
    
    Signed-off-by: Alexey Scherbakov <[email protected]>
---
 .../org/apache/ignite/raft/jraft/JRaftUtils.java   |   3 +-
 .../ignite/raft/jraft/core/ReplicatorTest.java     |   2 +-
 .../raft/jraft/storage/SnapshotExecutorTest.java   | 198 ++++++++++-----------
 .../snapshot/local/LocalSnapshotCopierTest.java    |  31 ++--
 .../apache/ignite/raft/jraft/test/TestUtils.java   |  17 ++
 modules/transactions/README.md                     |   8 +-
 6 files changed, 138 insertions(+), 121 deletions(-)

diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/JRaftUtils.java 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/JRaftUtils.java
index a3bf263..5492af6 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/JRaftUtils.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/JRaftUtils.java
@@ -62,10 +62,11 @@ public final class JRaftUtils {
      * @param prefix thread name prefix
      * @param number thread number
      * @return a new {@link ThreadPoolExecutor} instance
+     * @throws IllegalArgumentException If a number of threads is incorrect.
      */
     public static ExecutorService createExecutor(final String prefix, final 
int number) {
         if (number <= 0) {
-            return null;
+            throw new IllegalArgumentException();
         }
         return ThreadPoolUtil.newBuilder() //
             .poolName(prefix) //
diff --git 
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/ReplicatorTest.java
 
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/ReplicatorTest.java
index 736ce65..9353597 100644
--- 
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/ReplicatorTest.java
+++ 
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/ReplicatorTest.java
@@ -362,7 +362,7 @@ public class ReplicatorTest {
             Utils.monotonicMs());
 
         assertEquals(r.statInfo.runningState, Replicator.RunningState.IDLE);
-        this.id.unlock();
+        this.id.unlock(); // TODO asch fix bad unlock IGNITE-14832
         assertEquals(11, Replicator.getNextIndex(this.id));
         assertEquals(99, r.getWaitId());
         latch.await(); //make sure catch up closure is invoked.
diff --git 
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/SnapshotExecutorTest.java
 
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/SnapshotExecutorTest.java
index 5cd1974..75aeebd 100644
--- 
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/SnapshotExecutorTest.java
+++ 
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/SnapshotExecutorTest.java
@@ -19,6 +19,8 @@ package org.apache.ignite.raft.jraft.storage;
 import java.nio.ByteBuffer;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.Future;
 import org.apache.ignite.raft.jraft.FSMCaller;
 import org.apache.ignite.raft.jraft.JRaftUtils;
 import org.apache.ignite.raft.jraft.Status;
@@ -47,6 +49,7 @@ import 
org.apache.ignite.raft.jraft.storage.snapshot.local.LocalSnapshotMetaTabl
 import org.apache.ignite.raft.jraft.storage.snapshot.local.LocalSnapshotReader;
 import 
org.apache.ignite.raft.jraft.storage.snapshot.local.LocalSnapshotStorage;
 import org.apache.ignite.raft.jraft.storage.snapshot.local.LocalSnapshotWriter;
+import org.apache.ignite.raft.jraft.test.TestUtils;
 import org.apache.ignite.raft.jraft.util.ByteString;
 import org.apache.ignite.raft.jraft.util.Endpoint;
 import org.apache.ignite.raft.jraft.util.Utils;
@@ -90,174 +93,171 @@ public class SnapshotExecutorTest extends BaseStorageTest 
{
     @Mock
     private LocalSnapshotStorage snapshotStorage;
     private TimerManager timerManager;
+    private NodeOptions options;
 
     @Override
     @Before
     public void setup() throws Exception {
         super.setup();
-        this.timerManager = new TimerManager(5);
-        this.raftOptions = new RaftOptions();
-        this.writer = new LocalSnapshotWriter(this.path, this.snapshotStorage, 
this.raftOptions);
-        this.reader = new LocalSnapshotReader(this.snapshotStorage, null, new 
Endpoint("localhost", 8081),
-            this.raftOptions, this.path);
-
-        
Mockito.lenient().when(this.snapshotStorage.open()).thenReturn(this.reader);
-        
Mockito.lenient().when(this.snapshotStorage.create(true)).thenReturn(this.writer);
-
-        this.table = new LocalSnapshotMetaTable(this.raftOptions);
-        this.table.addFile("testFile", 
LocalFileMetaOutter.LocalFileMeta.newBuilder().setChecksum("test").build());
-        
this.table.setMeta(RaftOutter.SnapshotMeta.newBuilder().setLastIncludedIndex(1).setLastIncludedTerm(1).build());
-        this.uri = "remote://" + this.hostPort + "/" + this.readerId;
-        this.copyOpts = new CopyOptions();
-
-        Mockito.when(this.node.getRaftOptions()).thenReturn(new RaftOptions());
-        NodeOptions options = new NodeOptions();
+        timerManager = new TimerManager(5);
+        raftOptions = new RaftOptions();
+        writer = new LocalSnapshotWriter(path, snapshotStorage, raftOptions);
+        reader = new LocalSnapshotReader(snapshotStorage, null, new 
Endpoint("localhost", 8081),
+            raftOptions, path);
+
+        Mockito.lenient().when(snapshotStorage.open()).thenReturn(reader);
+        
Mockito.lenient().when(snapshotStorage.create(true)).thenReturn(writer);
+
+        table = new LocalSnapshotMetaTable(raftOptions);
+        table.addFile("testFile", 
LocalFileMetaOutter.LocalFileMeta.newBuilder().setChecksum("test").build());
+        
table.setMeta(RaftOutter.SnapshotMeta.newBuilder().setLastIncludedIndex(1).setLastIncludedTerm(1).build());
+        uri = "remote://" + hostPort + "/" + readerId;
+        copyOpts = new CopyOptions();
+
+        Mockito.when(node.getRaftOptions()).thenReturn(new RaftOptions());
+        options = new NodeOptions();
         options.setCommonExecutor(JRaftUtils.createExecutor("test-executor", 
Utils.cpus()));
-        Mockito.when(this.node.getOptions()).thenReturn(options);
-        
Mockito.when(this.node.getRpcClientService()).thenReturn(this.raftClientService);
-        
Mockito.when(this.node.getTimerManager()).thenReturn(this.timerManager);
-        Mockito.when(this.node.getServiceFactory()).thenReturn(new 
DefaultJRaftServiceFactory());
-        this.executor = new SnapshotExecutorImpl();
+        Mockito.when(node.getOptions()).thenReturn(options);
+        Mockito.when(node.getRpcClientService()).thenReturn(raftClientService);
+        Mockito.when(node.getTimerManager()).thenReturn(timerManager);
+        Mockito.when(node.getServiceFactory()).thenReturn(new 
DefaultJRaftServiceFactory());
+        executor = new SnapshotExecutorImpl();
         final SnapshotExecutorOptions opts = new SnapshotExecutorOptions();
-        opts.setFsmCaller(this.fSMCaller);
+        opts.setFsmCaller(fSMCaller);
         opts.setInitTerm(0);
-        opts.setNode(this.node);
-        opts.setLogManager(this.logManager);
-        opts.setUri(this.path);
-        this.addr = new Endpoint("localhost", 8081);
-        opts.setAddr(this.addr);
-        assertTrue(this.executor.init(opts));
+        opts.setNode(node);
+        opts.setLogManager(logManager);
+        opts.setUri(path);
+        addr = new Endpoint("localhost", 8081);
+        opts.setAddr(addr);
+        assertTrue(executor.init(opts));
     }
 
     @Override
     @After
     public void teardown() throws Exception {
-        this.executor.shutdown();
+        executor.shutdown();
         super.teardown();
-        this.timerManager.shutdown();
+        timerManager.shutdown();
+        options.getCommonExecutor().shutdown();
     }
 
     @Test
     public void testInstallSnapshot() throws Exception {
         final RpcRequests.InstallSnapshotRequest.Builder irb = 
RpcRequests.InstallSnapshotRequest.newBuilder();
         irb.setGroupId("test");
-        irb.setPeerId(this.addr.toString());
+        irb.setPeerId(addr.toString());
         irb.setServerId("localhost:8080");
         irb.setUri("remote://localhost:8080/99");
         irb.setTerm(0);
         
irb.setMeta(RaftOutter.SnapshotMeta.newBuilder().setLastIncludedIndex(1).setLastIncludedTerm(2).build());
 
-        Mockito.when(this.raftClientService.connect(new Endpoint("localhost", 
8080))).thenReturn(true);
+        Mockito.when(raftClientService.connect(new Endpoint("localhost", 
8080))).thenReturn(true);
 
-        final CompletableFuture<Message> future = new CompletableFuture<>();
+        final CompletableFuture<Message> fut = new CompletableFuture<>();
         final RpcRequests.GetFileRequest.Builder rb = 
RpcRequests.GetFileRequest.newBuilder().setReaderId(99)
             
.setFilename(Snapshot.JRAFT_SNAPSHOT_META_FILE).setCount(Integer.MAX_VALUE).setOffset(0)
             .setReadPartly(true);
 
-        //mock get metadata
+        // Mock get metadata
         ArgumentCaptor<RpcResponseClosure> argument = 
ArgumentCaptor.forClass(RpcResponseClosure.class);
-        Mockito.when(
-            this.raftClientService.getFile(eq(new Endpoint("localhost", 
8080)), eq(rb.build()),
-                eq(this.copyOpts.getTimeoutMs()), 
argument.capture())).thenReturn(future);
-
-        Utils.runInThread(Executors.newSingleThreadExecutor(), new Runnable() {
-            @Override
-            public void run() {
-                
SnapshotExecutorTest.this.executor.installSnapshot(irb.build(), 
RpcRequests.InstallSnapshotResponse
-                    .newBuilder(), new 
RpcRequestClosure(SnapshotExecutorTest.this.asyncCtx));
-            }
-        });
-
-        Thread.sleep(500);
+        Mockito.when(raftClientService.getFile(eq(new Endpoint("localhost", 
8080)), eq(rb.build()),
+                eq(copyOpts.getTimeoutMs()), 
argument.capture())).thenReturn(fut);
+
+        Future<?> snapFut = Utils.runInThread(ForkJoinPool.commonPool(), () -> 
executor.installSnapshot(irb.build(),
+            RpcRequests.InstallSnapshotResponse.newBuilder(), new 
RpcRequestClosure(asyncCtx)));
+
+        assertTrue(TestUtils.waitForArgumentCapture(argument, 5_000));
+
         RpcResponseClosure<RpcRequests.GetFileResponse> closure = 
argument.getValue();
-        final ByteBuffer metaBuf = this.table.saveToByteBufferAsRemote();
+        final ByteBuffer metaBuf = table.saveToByteBufferAsRemote();
         
closure.setResponse(RpcRequests.GetFileResponse.newBuilder().setReadSize(metaBuf.remaining()).setEof(true)
             .setData(new ByteString(metaBuf)).build());
 
-        //mock get file
+        // Mock get file
         argument = ArgumentCaptor.forClass(RpcResponseClosure.class);
         rb.setFilename("testFile");
-        rb.setCount(this.raftOptions.getMaxByteCountPerRpc());
-        Mockito.when(
-            this.raftClientService.getFile(eq(new Endpoint("localhost", 
8080)), eq(rb.build()),
-                eq(this.copyOpts.getTimeoutMs()), 
argument.capture())).thenReturn(future);
+        rb.setCount(raftOptions.getMaxByteCountPerRpc());
+        Mockito.when(raftClientService.getFile(eq(new Endpoint("localhost", 
8080)), eq(rb.build()),
+            eq(copyOpts.getTimeoutMs()), argument.capture())).thenReturn(fut);
 
         closure.run(Status.OK());
 
-        Thread.sleep(500);
+        assertTrue(TestUtils.waitForArgumentCapture(argument, 5_000));
+
         closure = argument.getValue();
+
         
closure.setResponse(RpcRequests.GetFileResponse.newBuilder().setReadSize(100).setEof(true)
             .setData(new ByteString(new byte[100])).build());
 
-        final ArgumentCaptor<LoadSnapshotClosure> loadSnapshotArg = 
ArgumentCaptor.forClass(LoadSnapshotClosure.class);
-        
Mockito.when(this.fSMCaller.onSnapshotLoad(loadSnapshotArg.capture())).thenReturn(true);
+        ArgumentCaptor<LoadSnapshotClosure> loadSnapshotArg = 
ArgumentCaptor.forClass(LoadSnapshotClosure.class);
+        
Mockito.when(fSMCaller.onSnapshotLoad(loadSnapshotArg.capture())).thenReturn(true);
         closure.run(Status.OK());
-        Thread.sleep(500);
+
+        assertTrue(TestUtils.waitForArgumentCapture(loadSnapshotArg, 5_000));
+
         final LoadSnapshotClosure done = loadSnapshotArg.getValue();
         final SnapshotReader reader = done.start();
         assertNotNull(reader);
         assertEquals(1, reader.listFiles().size());
         assertTrue(reader.listFiles().contains("testFile"));
         done.run(Status.OK());
-        this.executor.join();
-        assertEquals(2, this.executor.getLastSnapshotTerm());
-        assertEquals(1, this.executor.getLastSnapshotIndex());
+        executor.join();
+
+        assertTrue(snapFut.isDone());
+
+        assertEquals(2, executor.getLastSnapshotTerm());
+        assertEquals(1, executor.getLastSnapshotIndex());
     }
 
     @Test
     public void testInterruptInstalling() throws Exception {
         final RpcRequests.InstallSnapshotRequest.Builder irb = 
RpcRequests.InstallSnapshotRequest.newBuilder();
         irb.setGroupId("test");
-        irb.setPeerId(this.addr.toString());
+        irb.setPeerId(addr.toString());
         irb.setServerId("localhost:8080");
         irb.setUri("remote://localhost:8080/99");
         irb.setTerm(0);
         
irb.setMeta(RaftOutter.SnapshotMeta.newBuilder().setLastIncludedIndex(1).setLastIncludedTerm(1).build());
 
-        Mockito.lenient().when(this.raftClientService.connect(new 
Endpoint("localhost", 8080))).thenReturn(true);
+        Mockito.lenient().when(raftClientService.connect(new 
Endpoint("localhost", 8080))).thenReturn(true);
 
         final CompletableFuture<Message> future = new CompletableFuture<>();
         final RpcRequests.GetFileRequest.Builder rb = 
RpcRequests.GetFileRequest.newBuilder().setReaderId(99)
             
.setFilename(Snapshot.JRAFT_SNAPSHOT_META_FILE).setCount(Integer.MAX_VALUE).setOffset(0)
             .setReadPartly(true);
 
-        //mock get metadata
+        // Mock get metadata
         final ArgumentCaptor<RpcResponseClosure> argument = 
ArgumentCaptor.forClass(RpcResponseClosure.class);
         Mockito.lenient().when(
-            this.raftClientService.getFile(eq(new Endpoint("localhost", 
8080)), eq(rb.build()),
-                eq(this.copyOpts.getTimeoutMs()), 
argument.capture())).thenReturn(future);
-        Utils.runInThread(Executors.newSingleThreadExecutor(), new Runnable() {
-            @Override
-            public void run() {
-                
SnapshotExecutorTest.this.executor.installSnapshot(irb.build(), 
RpcRequests.InstallSnapshotResponse
-                    .newBuilder(), new 
RpcRequestClosure(SnapshotExecutorTest.this.asyncCtx));
-
-            }
-        });
-
-        this.executor.interruptDownloadingSnapshots(1);
-        this.executor.join();
-        assertEquals(0, this.executor.getLastSnapshotTerm());
-        assertEquals(0, this.executor.getLastSnapshotIndex());
+            raftClientService.getFile(eq(new Endpoint("localhost", 8080)), 
eq(rb.build()),
+                eq(copyOpts.getTimeoutMs()), 
argument.capture())).thenReturn(future);
+        Utils.runInThread(Executors.newSingleThreadExecutor(), () -> 
executor.installSnapshot(irb.build(), RpcRequests.InstallSnapshotResponse
+            .newBuilder(), new RpcRequestClosure(asyncCtx)));
+
+        executor.interruptDownloadingSnapshots(1);
+        executor.join();
+        assertEquals(0, executor.getLastSnapshotTerm());
+        assertEquals(0, executor.getLastSnapshotIndex());
     }
 
     @Test
     public void testDoSnapshot() throws Exception {
-        Mockito.when(this.fSMCaller.getLastAppliedIndex()).thenReturn(1L);
+        Mockito.when(fSMCaller.getLastAppliedIndex()).thenReturn(1L);
         final ArgumentCaptor<SaveSnapshotClosure> saveSnapshotClosureArg = 
ArgumentCaptor
             .forClass(SaveSnapshotClosure.class);
-        
Mockito.when(this.fSMCaller.onSnapshotSave(saveSnapshotClosureArg.capture())).thenReturn(true);
+        
Mockito.when(fSMCaller.onSnapshotSave(saveSnapshotClosureArg.capture())).thenReturn(true);
         final SynchronizedClosure done = new SynchronizedClosure();
-        this.executor.doSnapshot(done);
+        executor.doSnapshot(done);
         final SaveSnapshotClosure closure = saveSnapshotClosureArg.getValue();
         assertNotNull(closure);
         
closure.start(RaftOutter.SnapshotMeta.newBuilder().setLastIncludedIndex(2).setLastIncludedTerm(1).build());
         closure.run(Status.OK());
         done.await();
-        this.executor.join();
+        executor.join();
         assertTrue(done.getStatus().isOk());
-        assertEquals(1, this.executor.getLastSnapshotTerm());
-        assertEquals(2, this.executor.getLastSnapshotIndex());
+        assertEquals(1, executor.getLastSnapshotTerm());
+        assertEquals(2, executor.getLastSnapshotIndex());
     }
 
     @Test
@@ -265,14 +265,13 @@ public class SnapshotExecutorTest extends BaseStorageTest 
{
         final NodeOptions nodeOptions = new NodeOptions();
         nodeOptions.setSnapshotLogIndexMargin(10);
         
nodeOptions.setCommonExecutor(JRaftUtils.createExecutor("test-executor", 
Utils.cpus()));
-        Mockito.when(this.node.getOptions()).thenReturn(nodeOptions);
-        Mockito.when(this.fSMCaller.getLastAppliedIndex()).thenReturn(1L);
-        this.executor.doSnapshot(null);
-        this.executor.join();
-
-        assertEquals(0, this.executor.getLastSnapshotTerm());
-        assertEquals(0, this.executor.getLastSnapshotIndex());
+        Mockito.when(node.getOptions()).thenReturn(nodeOptions);
+        Mockito.when(fSMCaller.getLastAppliedIndex()).thenReturn(1L);
+        executor.doSnapshot(null);
+        executor.join();
 
+        assertEquals(0, executor.getLastSnapshotTerm());
+        assertEquals(0, executor.getLastSnapshotIndex());
     }
 
     @Test
@@ -280,23 +279,22 @@ public class SnapshotExecutorTest extends BaseStorageTest 
{
         final NodeOptions nodeOptions = new NodeOptions();
         nodeOptions.setSnapshotLogIndexMargin(5);
         
nodeOptions.setCommonExecutor(JRaftUtils.createExecutor("test-executor", 
Utils.cpus()));
-        Mockito.when(this.node.getOptions()).thenReturn(nodeOptions);
-        Mockito.when(this.fSMCaller.getLastAppliedIndex()).thenReturn(6L);
+        Mockito.when(node.getOptions()).thenReturn(nodeOptions);
+        Mockito.when(fSMCaller.getLastAppliedIndex()).thenReturn(6L);
 
         final ArgumentCaptor<SaveSnapshotClosure> saveSnapshotClosureArg = 
ArgumentCaptor
             .forClass(SaveSnapshotClosure.class);
-        
Mockito.when(this.fSMCaller.onSnapshotSave(saveSnapshotClosureArg.capture())).thenReturn(true);
+        
Mockito.when(fSMCaller.onSnapshotSave(saveSnapshotClosureArg.capture())).thenReturn(true);
         final SynchronizedClosure done = new SynchronizedClosure();
-        this.executor.doSnapshot(done);
+        executor.doSnapshot(done);
         final SaveSnapshotClosure closure = saveSnapshotClosureArg.getValue();
         assertNotNull(closure);
         
closure.start(RaftOutter.SnapshotMeta.newBuilder().setLastIncludedIndex(6).setLastIncludedTerm(1).build());
         closure.run(Status.OK());
         done.await();
-        this.executor.join();
-
-        assertEquals(1, this.executor.getLastSnapshotTerm());
-        assertEquals(6, this.executor.getLastSnapshotIndex());
+        executor.join();
 
+        assertEquals(1, executor.getLastSnapshotTerm());
+        assertEquals(6, executor.getLastSnapshotIndex());
     }
 }
diff --git 
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/snapshot/local/LocalSnapshotCopierTest.java
 
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/snapshot/local/LocalSnapshotCopierTest.java
index 72bf74d..a711c15 100644
--- 
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/snapshot/local/LocalSnapshotCopierTest.java
+++ 
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/snapshot/local/LocalSnapshotCopierTest.java
@@ -18,7 +18,7 @@ package org.apache.ignite.raft.jraft.storage.snapshot.local;
 
 import java.nio.ByteBuffer;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executors;
+import java.util.concurrent.ForkJoinPool;
 import org.apache.ignite.raft.jraft.JRaftUtils;
 import org.apache.ignite.raft.jraft.Status;
 import org.apache.ignite.raft.jraft.core.Scheduler;
@@ -37,6 +37,7 @@ import org.apache.ignite.raft.jraft.rpc.RpcResponseClosure;
 import org.apache.ignite.raft.jraft.storage.BaseStorageTest;
 import org.apache.ignite.raft.jraft.storage.snapshot.Snapshot;
 import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotReader;
+import org.apache.ignite.raft.jraft.test.TestUtils;
 import org.apache.ignite.raft.jraft.util.ByteString;
 import org.apache.ignite.raft.jraft.util.Endpoint;
 import org.apache.ignite.raft.jraft.util.Utils;
@@ -72,6 +73,7 @@ public class LocalSnapshotCopierTest extends BaseStorageTest {
     @Mock
     private LocalSnapshotStorage snapshotStorage;
     private Scheduler timerManager;
+    private NodeOptions nodeOptions;
 
     @Override
     @Before
@@ -93,7 +95,7 @@ public class LocalSnapshotCopierTest extends BaseStorageTest {
         this.copier = new LocalSnapshotCopier();
         this.copyOpts = new CopyOptions();
         Mockito.when(this.raftClientService.connect(new Endpoint("localhost", 
8081))).thenReturn(true);
-        NodeOptions nodeOptions = new NodeOptions();
+        nodeOptions = new NodeOptions();
         
nodeOptions.setCommonExecutor(JRaftUtils.createExecutor("test-executor", 
Utils.cpus()));
         assertTrue(this.copier.init(this.uri, new 
SnapshotCopierOptions(this.raftClientService, this.timerManager,
             this.raftOptions, nodeOptions)));
@@ -104,8 +106,9 @@ public class LocalSnapshotCopierTest extends 
BaseStorageTest {
     @After
     public void teardown() throws Exception {
         super.teardown();
-        this.copier.close();
-        this.timerManager.shutdown();
+        copier.close();
+        timerManager.shutdown();
+        nodeOptions.getCommonExecutor().shutdown();
     }
 
     @Test
@@ -117,12 +120,14 @@ public class LocalSnapshotCopierTest extends 
BaseStorageTest {
             .setReadPartly(true);
 
         //mock get metadata
-        final ArgumentCaptor<RpcResponseClosure> argument = 
ArgumentCaptor.forClass(RpcResponseClosure.class);
+        ArgumentCaptor<RpcResponseClosure> argument = 
ArgumentCaptor.forClass(RpcResponseClosure.class);
         Mockito.when(
             this.raftClientService.getFile(eq(new Endpoint("localhost", 
8081)), eq(rb.build()),
                 eq(this.copyOpts.getTimeoutMs()), 
argument.capture())).thenReturn(future);
         this.copier.start();
-        Thread.sleep(500);
+
+        assertTrue(TestUtils.waitForArgumentCapture(argument, 5_000));
+
         final RpcResponseClosure<RpcRequests.GetFileResponse> closure = 
argument.getValue();
 
         closure.run(new Status(RaftError.ECANCELED, "test cancel"));
@@ -143,19 +148,13 @@ public class LocalSnapshotCopierTest extends 
BaseStorageTest {
             .setReadPartly(true);
 
         //mock get metadata
-        final ArgumentCaptor<RpcResponseClosure> argument = 
ArgumentCaptor.forClass(RpcResponseClosure.class);
+        ArgumentCaptor<RpcResponseClosure> argument = 
ArgumentCaptor.forClass(RpcResponseClosure.class);
         Mockito.when(
             this.raftClientService.getFile(eq(new Endpoint("localhost", 
8081)), eq(rb.build()),
                 eq(this.copyOpts.getTimeoutMs()), 
argument.capture())).thenReturn(future);
         this.copier.start();
-        Thread.sleep(10);
 
-        Utils.runInThread(Executors.newSingleThreadExecutor(), new Runnable() {
-            @Override
-            public void run() {
-                LocalSnapshotCopierTest.this.copier.cancel();
-            }
-        });
+        Utils.runInThread(ForkJoinPool.commonPool(), () -> 
LocalSnapshotCopierTest.this.copier.cancel());
 
         this.copier.join();
         //start timer
@@ -179,7 +178,7 @@ public class LocalSnapshotCopierTest extends 
BaseStorageTest {
             this.raftClientService.getFile(eq(new Endpoint("localhost", 
8081)), eq(rb.build()),
                 eq(this.copyOpts.getTimeoutMs()), 
argument.capture())).thenReturn(future);
         this.copier.start();
-        Thread.sleep(500);
+        assertTrue(TestUtils.waitForArgumentCapture(argument, 5_000));
         RpcResponseClosure<RpcRequests.GetFileResponse> closure = 
argument.getValue();
         final ByteBuffer metaBuf = this.table.saveToByteBufferAsRemote();
         
closure.setResponse(RpcRequests.GetFileResponse.newBuilder().setReadSize(metaBuf.remaining()).setEof(true)
@@ -195,7 +194,7 @@ public class LocalSnapshotCopierTest extends 
BaseStorageTest {
 
         closure.run(Status.OK());
 
-        Thread.sleep(500);
+        assertTrue(TestUtils.waitForArgumentCapture(argument, 5_000));
         closure = argument.getValue();
         
closure.setResponse(RpcRequests.GetFileResponse.newBuilder().setReadSize(100).setEof(true)
             .setData(new ByteString(new byte[100])).build());
diff --git 
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/test/TestUtils.java 
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/test/TestUtils.java
index 001d1bd..d1eb72e 100644
--- 
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/test/TestUtils.java
+++ 
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/test/TestUtils.java
@@ -38,6 +38,7 @@ import org.apache.ignite.raft.jraft.entity.LogId;
 import org.apache.ignite.raft.jraft.entity.PeerId;
 import org.apache.ignite.raft.jraft.rpc.RpcRequests;
 import org.apache.ignite.raft.jraft.util.Endpoint;
+import org.mockito.ArgumentCaptor;
 
 import static java.lang.Thread.sleep;
 
@@ -174,4 +175,20 @@ public class TestUtils {
 
         return false;
     }
+
+    /**
+     * @param captor The captor.
+     * @param timeout The timeout.
+     * @return {@code True} if condition has happened within the timeout.
+     */
+    public static boolean waitForArgumentCapture(ArgumentCaptor<?> captor, 
long timeout) {
+        return waitForCondition(() -> {
+            try {
+                return captor.getValue() != null;
+            }
+            catch (Exception e) {
+                return false;
+            }
+        }, timeout);
+    }
 }
diff --git a/modules/transactions/README.md b/modules/transactions/README.md
index e840895..004008d 100644
--- a/modules/transactions/README.md
+++ b/modules/transactions/README.md
@@ -113,11 +113,13 @@ its locks. This means in the case of split-brain, some 
keys will be locked until
 tx recovery protocol will converge.
 
 ## Leaserholder fail
-An enlisted node asks a coordinator if it can commit or not.
+If a tx has not started to COMMIT, the coordinator reverts the transaction on 
remaining leaseholders.
+Then a new leaseholder is elected; it checks for its pending transactions and 
asks a coordinator if it's possible to commit.
 
 ## Coordinator fail
-Broadcast recovery is necessary (because we don't have full tx topology on 
each enlisted node).
-All nodes are requested about local txs state. If at least one is commiting, 
it's safe to commit.
+Broadcast recovery (various strategies are possible: via gossip or dedicated 
node) is necessary (because we don't have 
+full tx topology on each enlisted node - because it's unknown until commit). 
All nodes are queried about their local tx state. 
+If at least one is committing, it's safe to commit.
 
 <em id="f1">[1]</em> CockroachDB: The Resilient Geo-Distributed SQL Database, 
2020 [↩](#a1)<br/>
 <em id="f2">[2]</em> Concurrency Control in Distributed Database Systems, 1981 
[↩](#a2)

Reply via email to