This is an automated email from the ASF dual-hosted git repository.
toulmean pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git
The following commit(s) were added to refs/heads/master by this push:
new 1b14afb Fix gossip test - make sure writing to file is synchronized
1b14afb is described below
commit 1b14afb9c5fcad4a6d69526274fced75fe04db3c
Author: Antoine Toulme <[email protected]>
AuthorDate: Thu May 30 14:45:04 2019 -0700
Fix gossip test - make sure writing to file is synchronized
---
.../java/org/apache/tuweni/gossip/GossipApp.java | 30 ++++++++-------
.../tuweni/gossip/GossipIntegrationTest.java | 44 +++++++++++++++++-----
2 files changed, 51 insertions(+), 23 deletions(-)
diff --git a/gossip/src/main/java/org/apache/tuweni/gossip/GossipApp.java
b/gossip/src/main/java/org/apache/tuweni/gossip/GossipApp.java
index f69e12b..bcf0eb3 100644
--- a/gossip/src/main/java/org/apache/tuweni/gossip/GossipApp.java
+++ b/gossip/src/main/java/org/apache/tuweni/gossip/GossipApp.java
@@ -101,7 +101,7 @@ public final class GossipApp {
repository,
bytes -> readMessage(opts.messageLog(), errStream, bytes),
null,
- new CountingPeerPruningFunction(10),
+ new CountingPeerPruningFunction(10000000),
100,
100);
this.opts = opts;
@@ -212,19 +212,21 @@ public final class GossipApp {
}
private void readMessage(String messageLog, PrintStream err, Bytes bytes) {
- ObjectMapper mapper = new ObjectMapper();
- ObjectNode node = mapper.createObjectNode();
- node.put("timestamp", Instant.now().toString());
- node.put("value", new String(bytes.toArrayUnsafe(),
StandardCharsets.UTF_8));
- try {
- Path path = Paths.get(messageLog);
- Files.write(
- path,
- Collections.singletonList(mapper.writeValueAsString(node)),
- StandardCharsets.UTF_8,
- Files.exists(path) ? StandardOpenOption.APPEND :
StandardOpenOption.CREATE);
- } catch (IOException e) {
- err.println(e.getMessage());
+ synchronized (this) {
+ ObjectMapper mapper = new ObjectMapper();
+ ObjectNode node = mapper.createObjectNode();
+ node.put("timestamp", Instant.now().toString());
+ node.put("value", new String(bytes.toArrayUnsafe(),
StandardCharsets.UTF_8));
+ try {
+ Path path = Paths.get(messageLog);
+ Files.write(
+ path,
+ Collections.singletonList(mapper.writeValueAsString(node)),
+ StandardCharsets.UTF_8,
+ Files.exists(path) ? StandardOpenOption.APPEND :
StandardOpenOption.CREATE);
+ } catch (IOException e) {
+ err.println(e.getMessage());
+ }
}
}
diff --git
a/gossip/src/test/java/org/apache/tuweni/gossip/GossipIntegrationTest.java
b/gossip/src/test/java/org/apache/tuweni/gossip/GossipIntegrationTest.java
index 73a9d8d..70d9d23 100644
--- a/gossip/src/test/java/org/apache/tuweni/gossip/GossipIntegrationTest.java
+++ b/gossip/src/test/java/org/apache/tuweni/gossip/GossipIntegrationTest.java
@@ -22,6 +22,7 @@ import org.apache.tuweni.junit.*;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
@@ -29,6 +30,8 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
@@ -108,37 +111,60 @@ class GossipIntegrationTest {
HttpClient client = vertx.createHttpClient();
+ List<String> sent = new ArrayList<>();
+
for (int i = 0; i < 20; i++) {
+ String message = Bytes32.rightPad(Bytes.ofUnsignedInt(i)).toHexString();
+ sent.add(message);
+
+ Thread.sleep(100);
client.request(HttpMethod.POST, 10000, "127.0.0.1",
"/publish").exceptionHandler(thr -> {
throw new RuntimeException(thr);
}).handler(resp -> {
-
}).end(Buffer.buffer(Bytes32.rightPad(Bytes.ofUnsignedInt(i)).toHexString().getBytes(StandardCharsets.UTF_8)));
+ }).end(Buffer.buffer(message.getBytes(StandardCharsets.UTF_8)));
}
List<String> receiver1 = Collections.emptyList();
-
+ List<String> receiver2 = Collections.emptyList();
int counter = 0;
do {
- Thread.sleep(1000);
+ Thread.sleep(100);
counter++;
if (Files.exists(tempDir.resolve("log2.log"))) {
receiver1 = Files.readAllLines(tempDir.resolve("log2.log"));
}
- } while (receiver1.size() < 20 && counter < 20);
+ if (Files.exists(tempDir.resolve("log3.log"))) {
+ receiver2 = Files.readAllLines(tempDir.resolve("log3.log"));
+ }
+ } while ((receiver1.size() < 20 || receiver2.size() < 20) && counter <
100);
client.close();
service.submit(app1::stop);
service.submit(app2::stop);
service.submit(app3::stop);
+ service.shutdown();
+ List<String> receiver1Expected = new ArrayList<>(sent);
+ Pattern pattern = Pattern.compile("value\":\"(.*)\"}$");
+ for (String received : receiver1) {
+ Matcher match = pattern.matcher(received);
+ match.find();
+ String value = match.group(1);
+ receiver1Expected.remove(value);
+ }
+ List<String> receiver2Expected = new ArrayList<>(sent);
+ for (String received : receiver2) {
+ Matcher match = pattern.matcher(received);
+ match.find();
+ String value = match.group(1);
+ receiver2Expected.remove(value);
+ }
- assertEquals(20, receiver1.size());
- List<String> receiver2 = Files.readAllLines(tempDir.resolve("log3.log"));
+ receiver2 = Files.readAllLines(tempDir.resolve("log3.log"));
assertEquals(20, receiver2.size());
-
- service.shutdown();
-
+ receiver1 = Files.readAllLines(tempDir.resolve("log2.log"));
+ assertEquals(20, receiver1.size());
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]