szetszwo commented on a change in pull request #294:
URL: https://github.com/apache/incubator-ratis/pull/294#discussion_r528763724
##########
File path:
ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java
##########
@@ -50,32 +57,57 @@ public void testMultipleStreamsMultipleServers() throws
Exception {
void runTestDataStream(CLUSTER cluster) throws Exception {
RaftTestUtil.waitForLeader(cluster);
- final List<CompletableFuture<Void>> futures = new ArrayList<>();
- futures.add(CompletableFuture.runAsync(() -> runTestDataStream(cluster, 5,
10, 1_000_000, 10), executor));
- futures.add(CompletableFuture.runAsync(() -> runTestDataStream(cluster, 2,
20, 1_000, 10_000), executor));
- futures.forEach(CompletableFuture::join);
+
+ final List<CompletableFuture<Long>> futures = new ArrayList<>();
+ futures.add(CompletableFuture.supplyAsync(() -> runTestDataStream(cluster,
5, 10, 1_000_000, 10), executor));
+ futures.add(CompletableFuture.supplyAsync(() -> runTestDataStream(cluster,
2, 20, 1_000, 10_000), executor));
+ final long maxIndex = futures.stream()
+ .map(CompletableFuture::join)
+ .max(Long::compareTo)
+ .orElseThrow(IllegalStateException::new);
+
+ // wait for all servers to catch up
+ try (RaftClient client = cluster.createClient()) {
+ client.async().watch(maxIndex, ReplicationLevel.ALL).join();
+ }
+ // assert all streams are linked
+ for (RaftServerProxy proxy : cluster.getServers()) {
+ final RaftServerImpl impl = proxy.getImpl(cluster.getGroupId());
+ final MultiDataStreamStateMachine stateMachine =
(MultiDataStreamStateMachine) impl.getStateMachine();
+ for (SingleDataStream s : stateMachine.getStreams()) {
+ Assert.assertNotNull(s.getLogEntry());
+ }
+ }
}
- void runTestDataStream(CLUSTER cluster, int numClients, int numStreams, int
bufferSize, int bufferNum) {
- final List<CompletableFuture<Void>> futures = new ArrayList<>();
+ Long runTestDataStream(CLUSTER cluster, int numClients, int numStreams, int
bufferSize, int bufferNum) {
+ final List<CompletableFuture<Long>> futures = new ArrayList<>();
for (int j = 0; j < numClients; j++) {
- futures.add(CompletableFuture.runAsync(() -> runTestDataStream(cluster,
numStreams, bufferSize, bufferNum), executor));
+ futures.add(CompletableFuture.supplyAsync(() ->
runTestDataStream(cluster, numStreams, bufferSize, bufferNum), executor));
}
Assert.assertEquals(numClients, futures.size());
- futures.forEach(CompletableFuture::join);
+ return futures.stream()
+ .map(CompletableFuture::join)
+ .max(Long::compareTo)
+ .orElseThrow(IllegalStateException::new);
}
- void runTestDataStream(CLUSTER cluster, int numStreams, int bufferSize, int
bufferNum) {
+ long runTestDataStream(CLUSTER cluster, int numStreams, int bufferSize, int
bufferNum) {
final Iterable<RaftServer> servers =
CollectionUtils.as(cluster.getServers(), s -> s);
- final List<CompletableFuture<Void>> futures = new ArrayList<>();
+ final RaftPeerId leader = cluster.getLeader().getId();
+ final List<CompletableFuture<RaftClientReply>> futures = new ArrayList<>();
try(RaftClient client = cluster.createClient()) {
for (int i = 0; i < numStreams; i++) {
final DataStreamOutputImpl out = (DataStreamOutputImpl)
client.getDataStreamApi().stream();
- futures.add(CompletableFuture.runAsync(() ->
DataStreamTestUtils.writeAndCloseAndAssertReplies(
- servers, out, bufferSize, bufferNum), executor));
+ futures.add(CompletableFuture.supplyAsync(() ->
DataStreamTestUtils.writeAndCloseAndAssertReplies(
+ servers, leader, out, bufferSize, bufferNum).join(), executor));
Review comment:
@runzhiwang, FYI, this was the bug in the test -- join() was missing, so
the test could pass even if the returned CompletableFuture<RaftClientReply>
completed exceptionally. In fact, it always completed exceptionally, since
applyTransaction was not overridden.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]