This is an automated email from the ASF dual-hosted git repository.
ascherbakov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new e858e11 IGNITE-15038 Fixed a race in argument capturing. - Fixes #195.
e858e11 is described below
commit e858e116941edcf275743af21112763679ce528c
Author: Alexey Scherbakov <[email protected]>
AuthorDate: Fri Jul 2 19:09:50 2021 +0300
IGNITE-15038 Fixed a race in argument capturing. - Fixes #195.
Signed-off-by: Alexey Scherbakov <[email protected]>
---
.../org/apache/ignite/raft/jraft/JRaftUtils.java | 3 +-
.../ignite/raft/jraft/core/ReplicatorTest.java | 2 +-
.../raft/jraft/storage/SnapshotExecutorTest.java | 198 ++++++++++-----------
.../snapshot/local/LocalSnapshotCopierTest.java | 31 ++--
.../apache/ignite/raft/jraft/test/TestUtils.java | 17 ++
modules/transactions/README.md | 8 +-
6 files changed, 138 insertions(+), 121 deletions(-)
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/JRaftUtils.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/JRaftUtils.java
index a3bf263..5492af6 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/JRaftUtils.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/JRaftUtils.java
@@ -62,10 +62,11 @@ public final class JRaftUtils {
* @param prefix thread name prefix
* @param number thread number
* @return a new {@link ThreadPoolExecutor} instance
+ * @throws IllegalArgumentException If a number of threads is incorrect.
*/
public static ExecutorService createExecutor(final String prefix, final
int number) {
if (number <= 0) {
- return null;
+ throw new IllegalArgumentException();
}
return ThreadPoolUtil.newBuilder() //
.poolName(prefix) //
diff --git
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/ReplicatorTest.java
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/ReplicatorTest.java
index 736ce65..9353597 100644
---
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/ReplicatorTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/ReplicatorTest.java
@@ -362,7 +362,7 @@ public class ReplicatorTest {
Utils.monotonicMs());
assertEquals(r.statInfo.runningState, Replicator.RunningState.IDLE);
- this.id.unlock();
+ this.id.unlock(); // TODO asch fix bad unlock IGNITE-14832
assertEquals(11, Replicator.getNextIndex(this.id));
assertEquals(99, r.getWaitId());
latch.await(); //make sure catch up closure is invoked.
diff --git
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/SnapshotExecutorTest.java
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/SnapshotExecutorTest.java
index 5cd1974..75aeebd 100644
---
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/SnapshotExecutorTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/SnapshotExecutorTest.java
@@ -19,6 +19,8 @@ package org.apache.ignite.raft.jraft.storage;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.Future;
import org.apache.ignite.raft.jraft.FSMCaller;
import org.apache.ignite.raft.jraft.JRaftUtils;
import org.apache.ignite.raft.jraft.Status;
@@ -47,6 +49,7 @@ import
org.apache.ignite.raft.jraft.storage.snapshot.local.LocalSnapshotMetaTabl
import org.apache.ignite.raft.jraft.storage.snapshot.local.LocalSnapshotReader;
import
org.apache.ignite.raft.jraft.storage.snapshot.local.LocalSnapshotStorage;
import org.apache.ignite.raft.jraft.storage.snapshot.local.LocalSnapshotWriter;
+import org.apache.ignite.raft.jraft.test.TestUtils;
import org.apache.ignite.raft.jraft.util.ByteString;
import org.apache.ignite.raft.jraft.util.Endpoint;
import org.apache.ignite.raft.jraft.util.Utils;
@@ -90,174 +93,171 @@ public class SnapshotExecutorTest extends BaseStorageTest
{
@Mock
private LocalSnapshotStorage snapshotStorage;
private TimerManager timerManager;
+ private NodeOptions options;
@Override
@Before
public void setup() throws Exception {
super.setup();
- this.timerManager = new TimerManager(5);
- this.raftOptions = new RaftOptions();
- this.writer = new LocalSnapshotWriter(this.path, this.snapshotStorage,
this.raftOptions);
- this.reader = new LocalSnapshotReader(this.snapshotStorage, null, new
Endpoint("localhost", 8081),
- this.raftOptions, this.path);
-
-
Mockito.lenient().when(this.snapshotStorage.open()).thenReturn(this.reader);
-
Mockito.lenient().when(this.snapshotStorage.create(true)).thenReturn(this.writer);
-
- this.table = new LocalSnapshotMetaTable(this.raftOptions);
- this.table.addFile("testFile",
LocalFileMetaOutter.LocalFileMeta.newBuilder().setChecksum("test").build());
-
this.table.setMeta(RaftOutter.SnapshotMeta.newBuilder().setLastIncludedIndex(1).setLastIncludedTerm(1).build());
- this.uri = "remote://" + this.hostPort + "/" + this.readerId;
- this.copyOpts = new CopyOptions();
-
- Mockito.when(this.node.getRaftOptions()).thenReturn(new RaftOptions());
- NodeOptions options = new NodeOptions();
+ timerManager = new TimerManager(5);
+ raftOptions = new RaftOptions();
+ writer = new LocalSnapshotWriter(path, snapshotStorage, raftOptions);
+ reader = new LocalSnapshotReader(snapshotStorage, null, new
Endpoint("localhost", 8081),
+ raftOptions, path);
+
+ Mockito.lenient().when(snapshotStorage.open()).thenReturn(reader);
+
Mockito.lenient().when(snapshotStorage.create(true)).thenReturn(writer);
+
+ table = new LocalSnapshotMetaTable(raftOptions);
+ table.addFile("testFile",
LocalFileMetaOutter.LocalFileMeta.newBuilder().setChecksum("test").build());
+
table.setMeta(RaftOutter.SnapshotMeta.newBuilder().setLastIncludedIndex(1).setLastIncludedTerm(1).build());
+ uri = "remote://" + hostPort + "/" + readerId;
+ copyOpts = new CopyOptions();
+
+ Mockito.when(node.getRaftOptions()).thenReturn(new RaftOptions());
+ options = new NodeOptions();
options.setCommonExecutor(JRaftUtils.createExecutor("test-executor",
Utils.cpus()));
- Mockito.when(this.node.getOptions()).thenReturn(options);
-
Mockito.when(this.node.getRpcClientService()).thenReturn(this.raftClientService);
-
Mockito.when(this.node.getTimerManager()).thenReturn(this.timerManager);
- Mockito.when(this.node.getServiceFactory()).thenReturn(new
DefaultJRaftServiceFactory());
- this.executor = new SnapshotExecutorImpl();
+ Mockito.when(node.getOptions()).thenReturn(options);
+ Mockito.when(node.getRpcClientService()).thenReturn(raftClientService);
+ Mockito.when(node.getTimerManager()).thenReturn(timerManager);
+ Mockito.when(node.getServiceFactory()).thenReturn(new
DefaultJRaftServiceFactory());
+ executor = new SnapshotExecutorImpl();
final SnapshotExecutorOptions opts = new SnapshotExecutorOptions();
- opts.setFsmCaller(this.fSMCaller);
+ opts.setFsmCaller(fSMCaller);
opts.setInitTerm(0);
- opts.setNode(this.node);
- opts.setLogManager(this.logManager);
- opts.setUri(this.path);
- this.addr = new Endpoint("localhost", 8081);
- opts.setAddr(this.addr);
- assertTrue(this.executor.init(opts));
+ opts.setNode(node);
+ opts.setLogManager(logManager);
+ opts.setUri(path);
+ addr = new Endpoint("localhost", 8081);
+ opts.setAddr(addr);
+ assertTrue(executor.init(opts));
}
@Override
@After
public void teardown() throws Exception {
- this.executor.shutdown();
+ executor.shutdown();
super.teardown();
- this.timerManager.shutdown();
+ timerManager.shutdown();
+ options.getCommonExecutor().shutdown();
}
@Test
public void testInstallSnapshot() throws Exception {
final RpcRequests.InstallSnapshotRequest.Builder irb =
RpcRequests.InstallSnapshotRequest.newBuilder();
irb.setGroupId("test");
- irb.setPeerId(this.addr.toString());
+ irb.setPeerId(addr.toString());
irb.setServerId("localhost:8080");
irb.setUri("remote://localhost:8080/99");
irb.setTerm(0);
irb.setMeta(RaftOutter.SnapshotMeta.newBuilder().setLastIncludedIndex(1).setLastIncludedTerm(2).build());
- Mockito.when(this.raftClientService.connect(new Endpoint("localhost",
8080))).thenReturn(true);
+ Mockito.when(raftClientService.connect(new Endpoint("localhost",
8080))).thenReturn(true);
- final CompletableFuture<Message> future = new CompletableFuture<>();
+ final CompletableFuture<Message> fut = new CompletableFuture<>();
final RpcRequests.GetFileRequest.Builder rb =
RpcRequests.GetFileRequest.newBuilder().setReaderId(99)
.setFilename(Snapshot.JRAFT_SNAPSHOT_META_FILE).setCount(Integer.MAX_VALUE).setOffset(0)
.setReadPartly(true);
- //mock get metadata
+ // Mock get metadata
ArgumentCaptor<RpcResponseClosure> argument =
ArgumentCaptor.forClass(RpcResponseClosure.class);
- Mockito.when(
- this.raftClientService.getFile(eq(new Endpoint("localhost",
8080)), eq(rb.build()),
- eq(this.copyOpts.getTimeoutMs()),
argument.capture())).thenReturn(future);
-
- Utils.runInThread(Executors.newSingleThreadExecutor(), new Runnable() {
- @Override
- public void run() {
-
SnapshotExecutorTest.this.executor.installSnapshot(irb.build(),
RpcRequests.InstallSnapshotResponse
- .newBuilder(), new
RpcRequestClosure(SnapshotExecutorTest.this.asyncCtx));
- }
- });
-
- Thread.sleep(500);
+ Mockito.when(raftClientService.getFile(eq(new Endpoint("localhost",
8080)), eq(rb.build()),
+ eq(copyOpts.getTimeoutMs()),
argument.capture())).thenReturn(fut);
+
+ Future<?> snapFut = Utils.runInThread(ForkJoinPool.commonPool(), () ->
executor.installSnapshot(irb.build(),
+ RpcRequests.InstallSnapshotResponse.newBuilder(), new
RpcRequestClosure(asyncCtx)));
+
+ assertTrue(TestUtils.waitForArgumentCapture(argument, 5_000));
+
RpcResponseClosure<RpcRequests.GetFileResponse> closure =
argument.getValue();
- final ByteBuffer metaBuf = this.table.saveToByteBufferAsRemote();
+ final ByteBuffer metaBuf = table.saveToByteBufferAsRemote();
closure.setResponse(RpcRequests.GetFileResponse.newBuilder().setReadSize(metaBuf.remaining()).setEof(true)
.setData(new ByteString(metaBuf)).build());
- //mock get file
+ // Mock get file
argument = ArgumentCaptor.forClass(RpcResponseClosure.class);
rb.setFilename("testFile");
- rb.setCount(this.raftOptions.getMaxByteCountPerRpc());
- Mockito.when(
- this.raftClientService.getFile(eq(new Endpoint("localhost",
8080)), eq(rb.build()),
- eq(this.copyOpts.getTimeoutMs()),
argument.capture())).thenReturn(future);
+ rb.setCount(raftOptions.getMaxByteCountPerRpc());
+ Mockito.when(raftClientService.getFile(eq(new Endpoint("localhost",
8080)), eq(rb.build()),
+ eq(copyOpts.getTimeoutMs()), argument.capture())).thenReturn(fut);
closure.run(Status.OK());
- Thread.sleep(500);
+ assertTrue(TestUtils.waitForArgumentCapture(argument, 5_000));
+
closure = argument.getValue();
+
closure.setResponse(RpcRequests.GetFileResponse.newBuilder().setReadSize(100).setEof(true)
.setData(new ByteString(new byte[100])).build());
- final ArgumentCaptor<LoadSnapshotClosure> loadSnapshotArg =
ArgumentCaptor.forClass(LoadSnapshotClosure.class);
-
Mockito.when(this.fSMCaller.onSnapshotLoad(loadSnapshotArg.capture())).thenReturn(true);
+ ArgumentCaptor<LoadSnapshotClosure> loadSnapshotArg =
ArgumentCaptor.forClass(LoadSnapshotClosure.class);
+
Mockito.when(fSMCaller.onSnapshotLoad(loadSnapshotArg.capture())).thenReturn(true);
closure.run(Status.OK());
- Thread.sleep(500);
+
+ assertTrue(TestUtils.waitForArgumentCapture(loadSnapshotArg, 5_000));
+
final LoadSnapshotClosure done = loadSnapshotArg.getValue();
final SnapshotReader reader = done.start();
assertNotNull(reader);
assertEquals(1, reader.listFiles().size());
assertTrue(reader.listFiles().contains("testFile"));
done.run(Status.OK());
- this.executor.join();
- assertEquals(2, this.executor.getLastSnapshotTerm());
- assertEquals(1, this.executor.getLastSnapshotIndex());
+ executor.join();
+
+ assertTrue(snapFut.isDone());
+
+ assertEquals(2, executor.getLastSnapshotTerm());
+ assertEquals(1, executor.getLastSnapshotIndex());
}
@Test
public void testInterruptInstalling() throws Exception {
final RpcRequests.InstallSnapshotRequest.Builder irb =
RpcRequests.InstallSnapshotRequest.newBuilder();
irb.setGroupId("test");
- irb.setPeerId(this.addr.toString());
+ irb.setPeerId(addr.toString());
irb.setServerId("localhost:8080");
irb.setUri("remote://localhost:8080/99");
irb.setTerm(0);
irb.setMeta(RaftOutter.SnapshotMeta.newBuilder().setLastIncludedIndex(1).setLastIncludedTerm(1).build());
- Mockito.lenient().when(this.raftClientService.connect(new
Endpoint("localhost", 8080))).thenReturn(true);
+ Mockito.lenient().when(raftClientService.connect(new
Endpoint("localhost", 8080))).thenReturn(true);
final CompletableFuture<Message> future = new CompletableFuture<>();
final RpcRequests.GetFileRequest.Builder rb =
RpcRequests.GetFileRequest.newBuilder().setReaderId(99)
.setFilename(Snapshot.JRAFT_SNAPSHOT_META_FILE).setCount(Integer.MAX_VALUE).setOffset(0)
.setReadPartly(true);
- //mock get metadata
+ // Mock get metadata
final ArgumentCaptor<RpcResponseClosure> argument =
ArgumentCaptor.forClass(RpcResponseClosure.class);
Mockito.lenient().when(
- this.raftClientService.getFile(eq(new Endpoint("localhost",
8080)), eq(rb.build()),
- eq(this.copyOpts.getTimeoutMs()),
argument.capture())).thenReturn(future);
- Utils.runInThread(Executors.newSingleThreadExecutor(), new Runnable() {
- @Override
- public void run() {
-
SnapshotExecutorTest.this.executor.installSnapshot(irb.build(),
RpcRequests.InstallSnapshotResponse
- .newBuilder(), new
RpcRequestClosure(SnapshotExecutorTest.this.asyncCtx));
-
- }
- });
-
- this.executor.interruptDownloadingSnapshots(1);
- this.executor.join();
- assertEquals(0, this.executor.getLastSnapshotTerm());
- assertEquals(0, this.executor.getLastSnapshotIndex());
+ raftClientService.getFile(eq(new Endpoint("localhost", 8080)),
eq(rb.build()),
+ eq(copyOpts.getTimeoutMs()),
argument.capture())).thenReturn(future);
+ Utils.runInThread(Executors.newSingleThreadExecutor(), () ->
executor.installSnapshot(irb.build(), RpcRequests.InstallSnapshotResponse
+ .newBuilder(), new RpcRequestClosure(asyncCtx)));
+
+ executor.interruptDownloadingSnapshots(1);
+ executor.join();
+ assertEquals(0, executor.getLastSnapshotTerm());
+ assertEquals(0, executor.getLastSnapshotIndex());
}
@Test
public void testDoSnapshot() throws Exception {
- Mockito.when(this.fSMCaller.getLastAppliedIndex()).thenReturn(1L);
+ Mockito.when(fSMCaller.getLastAppliedIndex()).thenReturn(1L);
final ArgumentCaptor<SaveSnapshotClosure> saveSnapshotClosureArg =
ArgumentCaptor
.forClass(SaveSnapshotClosure.class);
-
Mockito.when(this.fSMCaller.onSnapshotSave(saveSnapshotClosureArg.capture())).thenReturn(true);
+
Mockito.when(fSMCaller.onSnapshotSave(saveSnapshotClosureArg.capture())).thenReturn(true);
final SynchronizedClosure done = new SynchronizedClosure();
- this.executor.doSnapshot(done);
+ executor.doSnapshot(done);
final SaveSnapshotClosure closure = saveSnapshotClosureArg.getValue();
assertNotNull(closure);
closure.start(RaftOutter.SnapshotMeta.newBuilder().setLastIncludedIndex(2).setLastIncludedTerm(1).build());
closure.run(Status.OK());
done.await();
- this.executor.join();
+ executor.join();
assertTrue(done.getStatus().isOk());
- assertEquals(1, this.executor.getLastSnapshotTerm());
- assertEquals(2, this.executor.getLastSnapshotIndex());
+ assertEquals(1, executor.getLastSnapshotTerm());
+ assertEquals(2, executor.getLastSnapshotIndex());
}
@Test
@@ -265,14 +265,13 @@ public class SnapshotExecutorTest extends BaseStorageTest
{
final NodeOptions nodeOptions = new NodeOptions();
nodeOptions.setSnapshotLogIndexMargin(10);
nodeOptions.setCommonExecutor(JRaftUtils.createExecutor("test-executor",
Utils.cpus()));
- Mockito.when(this.node.getOptions()).thenReturn(nodeOptions);
- Mockito.when(this.fSMCaller.getLastAppliedIndex()).thenReturn(1L);
- this.executor.doSnapshot(null);
- this.executor.join();
-
- assertEquals(0, this.executor.getLastSnapshotTerm());
- assertEquals(0, this.executor.getLastSnapshotIndex());
+ Mockito.when(node.getOptions()).thenReturn(nodeOptions);
+ Mockito.when(fSMCaller.getLastAppliedIndex()).thenReturn(1L);
+ executor.doSnapshot(null);
+ executor.join();
+ assertEquals(0, executor.getLastSnapshotTerm());
+ assertEquals(0, executor.getLastSnapshotIndex());
}
@Test
@@ -280,23 +279,22 @@ public class SnapshotExecutorTest extends BaseStorageTest
{
final NodeOptions nodeOptions = new NodeOptions();
nodeOptions.setSnapshotLogIndexMargin(5);
nodeOptions.setCommonExecutor(JRaftUtils.createExecutor("test-executor",
Utils.cpus()));
- Mockito.when(this.node.getOptions()).thenReturn(nodeOptions);
- Mockito.when(this.fSMCaller.getLastAppliedIndex()).thenReturn(6L);
+ Mockito.when(node.getOptions()).thenReturn(nodeOptions);
+ Mockito.when(fSMCaller.getLastAppliedIndex()).thenReturn(6L);
final ArgumentCaptor<SaveSnapshotClosure> saveSnapshotClosureArg =
ArgumentCaptor
.forClass(SaveSnapshotClosure.class);
-
Mockito.when(this.fSMCaller.onSnapshotSave(saveSnapshotClosureArg.capture())).thenReturn(true);
+
Mockito.when(fSMCaller.onSnapshotSave(saveSnapshotClosureArg.capture())).thenReturn(true);
final SynchronizedClosure done = new SynchronizedClosure();
- this.executor.doSnapshot(done);
+ executor.doSnapshot(done);
final SaveSnapshotClosure closure = saveSnapshotClosureArg.getValue();
assertNotNull(closure);
closure.start(RaftOutter.SnapshotMeta.newBuilder().setLastIncludedIndex(6).setLastIncludedTerm(1).build());
closure.run(Status.OK());
done.await();
- this.executor.join();
-
- assertEquals(1, this.executor.getLastSnapshotTerm());
- assertEquals(6, this.executor.getLastSnapshotIndex());
+ executor.join();
+ assertEquals(1, executor.getLastSnapshotTerm());
+ assertEquals(6, executor.getLastSnapshotIndex());
}
}
diff --git
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/snapshot/local/LocalSnapshotCopierTest.java
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/snapshot/local/LocalSnapshotCopierTest.java
index 72bf74d..a711c15 100644
---
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/snapshot/local/LocalSnapshotCopierTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/snapshot/local/LocalSnapshotCopierTest.java
@@ -18,7 +18,7 @@ package org.apache.ignite.raft.jraft.storage.snapshot.local;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executors;
+import java.util.concurrent.ForkJoinPool;
import org.apache.ignite.raft.jraft.JRaftUtils;
import org.apache.ignite.raft.jraft.Status;
import org.apache.ignite.raft.jraft.core.Scheduler;
@@ -37,6 +37,7 @@ import org.apache.ignite.raft.jraft.rpc.RpcResponseClosure;
import org.apache.ignite.raft.jraft.storage.BaseStorageTest;
import org.apache.ignite.raft.jraft.storage.snapshot.Snapshot;
import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotReader;
+import org.apache.ignite.raft.jraft.test.TestUtils;
import org.apache.ignite.raft.jraft.util.ByteString;
import org.apache.ignite.raft.jraft.util.Endpoint;
import org.apache.ignite.raft.jraft.util.Utils;
@@ -72,6 +73,7 @@ public class LocalSnapshotCopierTest extends BaseStorageTest {
@Mock
private LocalSnapshotStorage snapshotStorage;
private Scheduler timerManager;
+ private NodeOptions nodeOptions;
@Override
@Before
@@ -93,7 +95,7 @@ public class LocalSnapshotCopierTest extends BaseStorageTest {
this.copier = new LocalSnapshotCopier();
this.copyOpts = new CopyOptions();
Mockito.when(this.raftClientService.connect(new Endpoint("localhost",
8081))).thenReturn(true);
- NodeOptions nodeOptions = new NodeOptions();
+ nodeOptions = new NodeOptions();
nodeOptions.setCommonExecutor(JRaftUtils.createExecutor("test-executor",
Utils.cpus()));
assertTrue(this.copier.init(this.uri, new
SnapshotCopierOptions(this.raftClientService, this.timerManager,
this.raftOptions, nodeOptions)));
@@ -104,8 +106,9 @@ public class LocalSnapshotCopierTest extends
BaseStorageTest {
@After
public void teardown() throws Exception {
super.teardown();
- this.copier.close();
- this.timerManager.shutdown();
+ copier.close();
+ timerManager.shutdown();
+ nodeOptions.getCommonExecutor().shutdown();
}
@Test
@@ -117,12 +120,14 @@ public class LocalSnapshotCopierTest extends
BaseStorageTest {
.setReadPartly(true);
//mock get metadata
- final ArgumentCaptor<RpcResponseClosure> argument =
ArgumentCaptor.forClass(RpcResponseClosure.class);
+ ArgumentCaptor<RpcResponseClosure> argument =
ArgumentCaptor.forClass(RpcResponseClosure.class);
Mockito.when(
this.raftClientService.getFile(eq(new Endpoint("localhost",
8081)), eq(rb.build()),
eq(this.copyOpts.getTimeoutMs()),
argument.capture())).thenReturn(future);
this.copier.start();
- Thread.sleep(500);
+
+ assertTrue(TestUtils.waitForArgumentCapture(argument, 5_000));
+
final RpcResponseClosure<RpcRequests.GetFileResponse> closure =
argument.getValue();
closure.run(new Status(RaftError.ECANCELED, "test cancel"));
@@ -143,19 +148,13 @@ public class LocalSnapshotCopierTest extends
BaseStorageTest {
.setReadPartly(true);
//mock get metadata
- final ArgumentCaptor<RpcResponseClosure> argument =
ArgumentCaptor.forClass(RpcResponseClosure.class);
+ ArgumentCaptor<RpcResponseClosure> argument =
ArgumentCaptor.forClass(RpcResponseClosure.class);
Mockito.when(
this.raftClientService.getFile(eq(new Endpoint("localhost",
8081)), eq(rb.build()),
eq(this.copyOpts.getTimeoutMs()),
argument.capture())).thenReturn(future);
this.copier.start();
- Thread.sleep(10);
- Utils.runInThread(Executors.newSingleThreadExecutor(), new Runnable() {
- @Override
- public void run() {
- LocalSnapshotCopierTest.this.copier.cancel();
- }
- });
+ Utils.runInThread(ForkJoinPool.commonPool(), () ->
LocalSnapshotCopierTest.this.copier.cancel());
this.copier.join();
//start timer
@@ -179,7 +178,7 @@ public class LocalSnapshotCopierTest extends
BaseStorageTest {
this.raftClientService.getFile(eq(new Endpoint("localhost",
8081)), eq(rb.build()),
eq(this.copyOpts.getTimeoutMs()),
argument.capture())).thenReturn(future);
this.copier.start();
- Thread.sleep(500);
+ assertTrue(TestUtils.waitForArgumentCapture(argument, 5_000));
RpcResponseClosure<RpcRequests.GetFileResponse> closure =
argument.getValue();
final ByteBuffer metaBuf = this.table.saveToByteBufferAsRemote();
closure.setResponse(RpcRequests.GetFileResponse.newBuilder().setReadSize(metaBuf.remaining()).setEof(true)
@@ -195,7 +194,7 @@ public class LocalSnapshotCopierTest extends
BaseStorageTest {
closure.run(Status.OK());
- Thread.sleep(500);
+ assertTrue(TestUtils.waitForArgumentCapture(argument, 5_000));
closure = argument.getValue();
closure.setResponse(RpcRequests.GetFileResponse.newBuilder().setReadSize(100).setEof(true)
.setData(new ByteString(new byte[100])).build());
diff --git
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/test/TestUtils.java
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/test/TestUtils.java
index 001d1bd..d1eb72e 100644
---
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/test/TestUtils.java
+++
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/test/TestUtils.java
@@ -38,6 +38,7 @@ import org.apache.ignite.raft.jraft.entity.LogId;
import org.apache.ignite.raft.jraft.entity.PeerId;
import org.apache.ignite.raft.jraft.rpc.RpcRequests;
import org.apache.ignite.raft.jraft.util.Endpoint;
+import org.mockito.ArgumentCaptor;
import static java.lang.Thread.sleep;
@@ -174,4 +175,20 @@ public class TestUtils {
return false;
}
+
+ /**
+ * @param captor The captor.
+ * @param timeout The timeout.
+ * @return {@code True} if condition has happened within the timeout.
+ */
+ public static boolean waitForArgumentCapture(ArgumentCaptor<?> captor,
long timeout) {
+ return waitForCondition(() -> {
+ try {
+ return captor.getValue() != null;
+ }
+ catch (Exception e) {
+ return false;
+ }
+ }, timeout);
+ }
}
diff --git a/modules/transactions/README.md b/modules/transactions/README.md
index e840895..004008d 100644
--- a/modules/transactions/README.md
+++ b/modules/transactions/README.md
@@ -113,11 +113,13 @@ its locks. This means in the case of split-brain, some
keys will be locked until
tx recovery protocol will converge.
## Leaserholder fail
-An enlisted node asks a coordinator if it can commit or not.
+If a tx is not started to COMMIT, the coordinator reverts a transaction on
remaining leaseholders.
+Then a new leasholder is elected, it check for its pending transactions and
asks a coordinator if it's possible to commit.
## Coordinator fail
-Broadcast recovery is necessary (because we don't have full tx topology on
each enlisted node).
-All nodes are requested about local txs state. If at least one is commiting,
it's safe to commit.
+Broadcast recovery (various strategies are possible: via gossip or dedicated
node) is necessary (because we don't have
+full tx topology on each enlisted node - because it's unknown until commit).
All nodes are requested about local txs state.
+If at least one is commiting, it's safe to commit.
<em id="f1">[1]</em> CockroachDB: The Resilient Geo-Distributed SQL Database,
2020 [↩](#a1)<br/>
<em id="f2">[2]</em> Concurrency Control in Distributed Database Systems, 1981
[↩](#a2)