This is an automated email from the ASF dual-hosted git repository.
toulmean pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git
The following commit(s) were added to refs/heads/master by this push:
new 39713a2 Add more flexibility to plumtree, allowing peer pruning
management
39713a2 is described below
commit 39713a2e638b419bc79d494528067eb8866af14f
Author: Antoine Toulme <[email protected]>
AuthorDate: Wed May 29 21:53:03 2019 -0700
Add more flexibility to plumtree, allowing peer pruning management
---
.../tuweni/gossip/CountingPeerPruningFunction.java | 47 +++++++++
.../java/org/apache/tuweni/gossip/GossipApp.java | 46 +++++++--
.../tuweni/gossip/GossipCommandLineOptions.java | 94 ++++++++++++++++--
.../tuweni/gossip/LoggingPeerRepository.java | 27 +++--
.../gossip/GossipCommandLineOptionsTest.java | 47 +++++++--
.../tuweni/gossip/GossipIntegrationTest.java | 12 +++
...sipIntegrationTest.java => GossipLoadTest.java} | 97 +++++++++++-------
.../tuweni/plumtree/EphemeralPeerRepository.java | 3 +-
.../org/apache/tuweni/plumtree/PeerPruning.java | 29 ++++++
.../org/apache/tuweni/plumtree/PeerRepository.java | 3 +-
.../java/org/apache/tuweni/plumtree/State.java | 39 ++++++--
.../tuweni/plumtree/vertx/VertxGossipServer.java | 59 +++++++----
.../java/org/apache/tuweni/plumtree/StateTest.java | 109 ++++++++++++++++-----
.../plumtree/vertx/VertxGossipServerTest.java | 40 ++++++--
14 files changed, 518 insertions(+), 134 deletions(-)
diff --git
a/gossip/src/main/java/org/apache/tuweni/gossip/CountingPeerPruningFunction.java
b/gossip/src/main/java/org/apache/tuweni/gossip/CountingPeerPruningFunction.java
new file mode 100644
index 0000000..f7307f4
--- /dev/null
+++
b/gossip/src/main/java/org/apache/tuweni/gossip/CountingPeerPruningFunction.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding
copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not
use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.tuweni.gossip;
+
+import org.apache.tuweni.plumtree.Peer;
+import org.apache.tuweni.plumtree.PeerPruning;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.WeakHashMap;
+
+/**
+ * Function counting how many times a peer was considered for pruning, pruning it once a threshold is reached.
+ */
+final class CountingPeerPruningFunction implements PeerPruning {
+
+ private final int numberOfOccurrences;
+ private final Map<Peer, Integer> countingOccurrences =
Collections.synchronizedMap(new WeakHashMap<>());
+
+ public CountingPeerPruningFunction(int numberOfOccurrences) {
+ this.numberOfOccurrences = numberOfOccurrences;
+ }
+
+ @Override
+ public boolean prunePeer(Peer peer) {
+ Integer currentValue = countingOccurrences.putIfAbsent(peer, 1);
+ if (currentValue != null) {
+ if (currentValue + 1 >= numberOfOccurrences) {
+ countingOccurrences.remove(peer);
+ return true;
+ }
+ countingOccurrences.put(peer, currentValue + 1);
+ }
+
+ return false;
+ }
+}
diff --git a/gossip/src/main/java/org/apache/tuweni/gossip/GossipApp.java
b/gossip/src/main/java/org/apache/tuweni/gossip/GossipApp.java
index 173ec7b..f69e12b 100644
--- a/gossip/src/main/java/org/apache/tuweni/gossip/GossipApp.java
+++ b/gossip/src/main/java/org/apache/tuweni/gossip/GossipApp.java
@@ -17,7 +17,6 @@ import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.concurrent.AsyncCompletion;
import org.apache.tuweni.concurrent.CompletableAsyncCompletion;
import org.apache.tuweni.crypto.Hash;
-import org.apache.tuweni.plumtree.EphemeralPeerRepository;
import org.apache.tuweni.plumtree.vertx.VertxGossipServer;
import java.io.IOException;
@@ -31,6 +30,9 @@ import java.security.Security;
import java.time.Instant;
import java.util.Collections;
import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -68,8 +70,14 @@ public final class GossipApp {
gossipApp.start();
}
-
-
+ private final ExecutorService senderThreadPool =
Executors.newSingleThreadExecutor(new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(r, "sender");
+ t.setDaemon(false);
+ return t;
+ }
+ });
private final GossipCommandLineOptions opts;
private final Runnable terminateFunction;
private final PrintStream errStream;
@@ -83,7 +91,7 @@ public final class GossipApp {
PrintStream errStream,
PrintStream outStream,
Runnable terminateFunction) {
- EphemeralPeerRepository repository = new EphemeralPeerRepository();
+ LoggingPeerRepository repository = new LoggingPeerRepository(outStream);
outStream.println("Setting up server on " + opts.networkInterface() + ":"
+ opts.listenPort());
server = new VertxGossipServer(
vertx,
@@ -92,7 +100,10 @@ public final class GossipApp {
Hash::keccak256,
repository,
bytes -> readMessage(opts.messageLog(), errStream, bytes),
- null);
+ null,
+ new CountingPeerPruningFunction(10),
+ 100,
+ 100);
this.opts = opts;
this.errStream = errStream;
this.outStream = outStream;
@@ -135,13 +146,33 @@ public final class GossipApp {
errStream.println("Server could not connect to other peers: " +
e.getMessage());
}
outStream.println("Gossip started");
+
+ if (opts.sending()) {
+ outStream.println("Start sending messages");
+ senderThreadPool.submit(() -> {
+ for (int i = 0; i < opts.numberOfMessages(); i++) {
+ if (Thread.currentThread().isInterrupted()) {
+ return;
+ }
+ Bytes payload = Bytes.random(opts.payloadSize());
+ publish(payload);
+ try {
+ Thread.sleep(opts.sendInterval());
+ } catch (InterruptedException e) {
+ return;
+ }
+ }
+ });
+ }
}
private void handleRPCRequest(HttpServerRequest httpServerRequest) {
if (HttpMethod.POST.equals(httpServerRequest.method())) {
if ("/publish".equals(httpServerRequest.path())) {
httpServerRequest.bodyHandler(body -> {
- publish(Bytes.wrapBuffer(body));
+ Bytes message = Bytes.wrapBuffer(body);
+ outStream.println("Message to publish " + message.toHexString());
+ publish(message);
httpServerRequest.response().setStatusCode(200).end();
});
} else {
@@ -153,6 +184,8 @@ public final class GossipApp {
}
void stop() {
+ outStream.println("Stopping sending");
+ senderThreadPool.shutdown();
outStream.println("Stopping gossip");
try {
server.stop().join();
@@ -196,7 +229,6 @@ public final class GossipApp {
}
public void publish(Bytes message) {
- outStream.println("Message to publish " + message.toHexString());
server.gossip("", message);
}
}
diff --git
a/gossip/src/main/java/org/apache/tuweni/gossip/GossipCommandLineOptions.java
b/gossip/src/main/java/org/apache/tuweni/gossip/GossipCommandLineOptions.java
index 04a6f35..e1e776e 100644
---
a/gossip/src/main/java/org/apache/tuweni/gossip/GossipCommandLineOptions.java
+++
b/gossip/src/main/java/org/apache/tuweni/gossip/GossipCommandLineOptions.java
@@ -41,6 +41,14 @@ final class GossipCommandLineOptions {
.addString("networkInterface", "0.0.0.0", "Network interface to bind
to", null)
.addListOfString("peers", Collections.emptyList(), "Static peers
list", null)
.addString("messagelog", "messages.log", "Log file where messages are
stored", null)
+ .addBoolean("sending", false, "Whether this peer sends random messages
to all other peers (load testing)", null)
+ .addInteger(
+ "sendInterval",
+ 1000,
+ "Interval to wait in between sending messages in milliseconds
(load testing)",
+ null)
+ .addInteger("numberOfMessages", 100, "Number of messages to publish to
other peers (load testing)", null)
+ .addInteger("payloadSize", 200, "Size of the random payload to send to
other peers (load testing)", null)
.toSchema();
}
@@ -62,6 +70,21 @@ final class GossipCommandLineOptions {
@CommandLine.Option(names = {"-m", "--messageLog"} , description = "Log file
where messages are stored")
private String messageLog;
+ @CommandLine.Option(names = {"--sendInterval"} ,
+ description = "Interval to wait in between sending messages in
milliseconds (load testing)")
+ private Integer sendInterval;
+
+ @CommandLine.Option(names = {"--payloadSize"} ,
+ description = "Size of the random payload to send to other peers (load
testing)")
+ private Integer payloadSize;
+
+ @CommandLine.Option(names = {"--numberOfMessages"} , description = "Number
of messages to publish (load testing)")
+ private Integer numberOfMessages;
+
+ @CommandLine.Option(names = {"--sending"} ,
+ description = "Whether this peer sends random messages to all other
peers (load testing)")
+ private Boolean sending;
+
@CommandLine.Option(names = {"-h", "--help"} , description = "Prints usage
prompt")
private boolean help;
@@ -70,12 +93,19 @@ final class GossipCommandLineOptions {
GossipCommandLineOptions() {}
+ /**
+ * Constructor used for testing.
+ */
GossipCommandLineOptions(
String[] peers,
Integer port,
String networkInterface,
String messageLog,
Integer rpcPort,
+ Integer payloadSize,
+ Integer sendInterval,
+ Boolean sending,
+ Integer numberOfMessages,
Configuration config) {
this.peers = peers;
this.port = port;
@@ -83,6 +113,10 @@ final class GossipCommandLineOptions {
this.messageLog = messageLog;
this.rpcPort = rpcPort;
this.config = config;
+ this.payloadSize = payloadSize;
+ this.sendInterval = sendInterval;
+ this.numberOfMessages = numberOfMessages;
+ this.sending = sending;
}
private Configuration config() {
@@ -101,20 +135,12 @@ final class GossipCommandLineOptions {
peerAddresses = new ArrayList<>();
if (peers != null) {
for (String peer : peers) {
- URI peerURI = URI.create(peer);
- if (peerURI.getHost() == null) {
- throw new IllegalArgumentException("Invalid peer URI " + peerURI);
- }
- peerAddresses.add(peerURI);
+ readPeerInfo(peer);
}
} else {
if (config() != null) {
for (String peer : config().getListOfString("peers")) {
- URI peerURI = URI.create(peer);
- if (peerURI.getHost() == null) {
- throw new IllegalArgumentException("Invalid peer URI " +
peerURI);
- }
- peerAddresses.add(peerURI);
+ readPeerInfo(peer);
}
}
}
@@ -122,6 +148,14 @@ final class GossipCommandLineOptions {
return peerAddresses;
}
+ private void readPeerInfo(String peer) {
+ URI peerURI = URI.create(peer);
+ if (peerURI.getHost() == null) {
+ throw new IllegalArgumentException("Invalid peer URI " + peerURI);
+ }
+ peerAddresses.add(peerURI);
+ }
+
void validate() {
int listenPort = listenPort();
if (listenPort < 0 || listenPort > 65535) {
@@ -187,6 +221,46 @@ final class GossipCommandLineOptions {
return "messages.log";
}
+ boolean sending() {
+ if (sending != null) {
+ return sending;
+ }
+ if (config != null) {
+ return config.getBoolean("sending");
+ }
+ return false;
+ }
+
+ Integer payloadSize() {
+ if (payloadSize != null) {
+ return payloadSize;
+ }
+ if (config != null) {
+ return config.getInteger("payloadSize");
+ }
+ return 200;
+ }
+
+ Integer sendInterval() {
+ if (sendInterval != null) {
+ return sendInterval;
+ }
+ if (config != null) {
+ return config.getInteger("sendInterval");
+ }
+ return 1000;
+ }
+
+ Integer numberOfMessages() {
+ if (numberOfMessages != null) {
+ return numberOfMessages;
+ }
+ if (config != null) {
+ return config.getInteger("numberOfMessages");
+ }
+ return 100;
+ }
+
boolean help() {
return help;
}
diff --git
a/plumtree/src/main/java/org/apache/tuweni/plumtree/EphemeralPeerRepository.java
b/gossip/src/main/java/org/apache/tuweni/gossip/LoggingPeerRepository.java
similarity index 75%
copy from
plumtree/src/main/java/org/apache/tuweni/plumtree/EphemeralPeerRepository.java
copy to gossip/src/main/java/org/apache/tuweni/gossip/LoggingPeerRepository.java
index 7d0f4f7..ad0a450 100644
---
a/plumtree/src/main/java/org/apache/tuweni/plumtree/EphemeralPeerRepository.java
+++ b/gossip/src/main/java/org/apache/tuweni/gossip/LoggingPeerRepository.java
@@ -10,21 +10,27 @@
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
-package org.apache.tuweni.plumtree;
+package org.apache.tuweni.gossip;
+import org.apache.tuweni.plumtree.Peer;
+import org.apache.tuweni.plumtree.PeerRepository;
+
+import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-/**
- * In-memory peer repository.
- */
-public final class EphemeralPeerRepository implements PeerRepository {
+final class LoggingPeerRepository implements PeerRepository {
private final Set<Peer> eagerPushPeers = ConcurrentHashMap.newKeySet();
private final Set<Peer> lazyPushPeers = ConcurrentHashMap.newKeySet();
+ private final PrintStream logger;
+
+ public LoggingPeerRepository(PrintStream logger) {
+ this.logger = logger;
+ }
@Override
public void addEager(Peer peer) {
@@ -50,18 +56,22 @@ public final class EphemeralPeerRepository implements
PeerRepository {
@Override
public void removePeer(Peer peer) {
+ logger.println("Removing peer " + peer);
lazyPushPeers.remove(peer);
eagerPushPeers.remove(peer);
}
@Override
- public void moveToLazy(Peer peer) {
+ public boolean moveToLazy(Peer peer) {
+ logger.println("Move peer to lazy " + peer);
eagerPushPeers.remove(peer);
lazyPushPeers.add(peer);
+ return true;
}
@Override
public void moveToEager(Peer peer) {
+ logger.println("Move peer to eager " + peer);
lazyPushPeers.remove(peer);
eagerPushPeers.add(peer);
}
@@ -69,8 +79,11 @@ public final class EphemeralPeerRepository implements
PeerRepository {
@Override
public void considerNewPeer(Peer peer) {
if (!lazyPushPeers.contains(peer)) {
- eagerPushPeers.add(peer);
+ if (eagerPushPeers.add(peer)) {
+ logger.println("Added new peer " + peer);
+ }
}
}
+
}
diff --git
a/gossip/src/test/java/org/apache/tuweni/gossip/GossipCommandLineOptionsTest.java
b/gossip/src/test/java/org/apache/tuweni/gossip/GossipCommandLineOptionsTest.java
index cd6b885..64a8d77 100644
---
a/gossip/src/test/java/org/apache/tuweni/gossip/GossipCommandLineOptionsTest.java
+++
b/gossip/src/test/java/org/apache/tuweni/gossip/GossipCommandLineOptionsTest.java
@@ -26,20 +26,31 @@ class GossipCommandLineOptionsTest {
@Test
void testInvalidPort() {
- GossipCommandLineOptions opts = new GossipCommandLineOptions(new
String[0], -4, "0.0.0.0", null, 3, null);
+ GossipCommandLineOptions opts =
+ new GossipCommandLineOptions(new String[0], -4, "0.0.0.0", null, 3, 0,
0, false, 50, null);
assertThrows(IllegalArgumentException.class, opts::validate);
}
@Test
void testInvalidPeer() {
- GossipCommandLineOptions opts =
- new GossipCommandLineOptions(new String[]
{"tcp://400.300.200.100:9000"}, 10, "0.0.0.0", null, 3, null);
+ GossipCommandLineOptions opts = new GossipCommandLineOptions(
+ new String[] {"tcp://400.300.200.100:9000"},
+ 10,
+ "0.0.0.0",
+ null,
+ 3,
+ 0,
+ 0,
+ false,
+ 50,
+ null);
assertThrows(IllegalArgumentException.class, opts::validate);
}
@Test
void testInvalidNetworkInterface() {
- GossipCommandLineOptions opts = new GossipCommandLineOptions(new String[]
{}, 10, "400.300.200.100", null, 3, null);
+ GossipCommandLineOptions opts =
+ new GossipCommandLineOptions(new String[] {}, 10, "400.300.200.100",
null, 3, 0, 0, false, 50, null);
assertThrows(IllegalArgumentException.class, opts::validate);
}
@@ -52,7 +63,7 @@ class GossipCommandLineOptionsTest {
+ "networkInterface=\"127.0.0.1\"\n"
+ "messageLog=\"D:/Temp\"",
GossipCommandLineOptions.createConfigFileSchema());
- GossipCommandLineOptions opts = new GossipCommandLineOptions(null, null,
null, null, 3000, config);
+ GossipCommandLineOptions opts = new GossipCommandLineOptions(null, null,
null, null, 3000, 0, 0, false, 50, config);
opts.validate();
assertEquals(1080, opts.listenPort());
assertEquals(1, opts.peers().size());
@@ -70,7 +81,18 @@ class GossipCommandLineOptionsTest {
+ "networkInterface=\"127.0.0.1\"\n"
+ "messageLog=\"D:/Temp\"",
GossipCommandLineOptions.createConfigFileSchema());
- GossipCommandLineOptions opts = new GossipCommandLineOptions(null, null,
null, null, 3000, config);
+ GossipCommandLineOptions opts = new GossipCommandLineOptions(
+ null,
+ null,
+ null,
+ null,
+ 3000,
+ 0,
+ 0,
+
+ false,
+ 50,
+ config);
assertThrows(IllegalArgumentException.class, opts::validate);
}
@@ -82,8 +104,17 @@ class GossipCommandLineOptionsTest {
+ "listenPort=1080\n"
+ "networkInterface=\"127.0.0.1\"\n"
+ "messageLog=\"D:/Temp\"");
- GossipCommandLineOptions opts =
- new GossipCommandLineOptions(new String[] {"tcp://192.168.0.1:3000"},
400, "0.0.0.0", "C:/Temp", 3000, config);
+ GossipCommandLineOptions opts = new GossipCommandLineOptions(
+ new String[] {"tcp://192.168.0.1:3000"},
+ 400,
+ "0.0.0.0",
+ "C:/Temp",
+ 3000,
+ 0,
+ 0,
+ false,
+ 50,
+ config);
assertEquals(400, opts.listenPort());
assertEquals(1, opts.peers().size());
assertEquals(URI.create("tcp://192.168.0.1:3000"), opts.peers().get(0));
diff --git
a/gossip/src/test/java/org/apache/tuweni/gossip/GossipIntegrationTest.java
b/gossip/src/test/java/org/apache/tuweni/gossip/GossipIntegrationTest.java
index 8a0e31f..73a9d8d 100644
--- a/gossip/src/test/java/org/apache/tuweni/gossip/GossipIntegrationTest.java
+++ b/gossip/src/test/java/org/apache/tuweni/gossip/GossipIntegrationTest.java
@@ -48,6 +48,10 @@ class GossipIntegrationTest {
"127.0.0.1",
tempDir.resolve("log1.log").toString(),
10000,
+ 0,
+ 0,
+ false,
+ 50,
null);
GossipCommandLineOptions opts2 = new GossipCommandLineOptions(
new String[] {"tcp://127.0.0.1:9000", "tcp://127.0.0.1:9002"},
@@ -55,6 +59,10 @@ class GossipIntegrationTest {
"127.0.0.1",
tempDir.resolve("log2.log").toString(),
10001,
+ 0,
+ 0,
+ false,
+ 50,
null);
GossipCommandLineOptions opts3 = new GossipCommandLineOptions(
new String[] {"tcp://127.0.0.1:9000", "tcp://127.0.0.1:9001"},
@@ -62,6 +70,10 @@ class GossipIntegrationTest {
"127.0.0.1",
tempDir.resolve("log3.log").toString(),
10002,
+ 0,
+ 0,
+ false,
+ 50,
null);
AtomicBoolean terminationRan = new AtomicBoolean(false);
diff --git
a/gossip/src/test/java/org/apache/tuweni/gossip/GossipIntegrationTest.java
b/gossip/src/test/java/org/apache/tuweni/gossip/GossipLoadTest.java
similarity index 61%
copy from
gossip/src/test/java/org/apache/tuweni/gossip/GossipIntegrationTest.java
copy to gossip/src/test/java/org/apache/tuweni/gossip/GossipLoadTest.java
index 8a0e31f..81b23b2 100644
--- a/gossip/src/test/java/org/apache/tuweni/gossip/GossipIntegrationTest.java
+++ b/gossip/src/test/java/org/apache/tuweni/gossip/GossipLoadTest.java
@@ -15,14 +15,14 @@ package org.apache.tuweni.gossip;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
-import org.apache.tuweni.bytes.Bytes;
-import org.apache.tuweni.bytes.Bytes32;
-import org.apache.tuweni.junit.*;
+import org.apache.tuweni.junit.BouncyCastleExtension;
+import org.apache.tuweni.junit.TempDirectory;
+import org.apache.tuweni.junit.TempDirectoryExtension;
+import org.apache.tuweni.junit.VertxExtension;
+import org.apache.tuweni.junit.VertxInstance;
-import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
-import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -31,41 +31,68 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import io.vertx.core.Vertx;
-import io.vertx.core.buffer.Buffer;
-import io.vertx.core.http.HttpClient;
-import io.vertx.core.http.HttpMethod;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
+/**
+ * Very basic load test scenario in one JVM.
+ */
@ExtendWith({VertxExtension.class, TempDirectoryExtension.class,
BouncyCastleExtension.class})
-class GossipIntegrationTest {
+class GossipLoadTest {
+ @Disabled
@Test
- void threeGossipServersStarting(@VertxInstance Vertx vertx, @TempDirectory
Path tempDir) throws Exception {
+ void fourGossipServersWithOneSender(@VertxInstance Vertx vertx,
@TempDirectory Path tempDir) throws Exception {
+ int numberOfMessages = 10000;
+ int sendingInterval = 20;
GossipCommandLineOptions opts1 = new GossipCommandLineOptions(
- new String[] {"tcp://127.0.0.1:9001", "tcp://127.0.0.1:9002"},
+ new String[] {"tcp://127.0.0.1:9001", "tcp://127.0.0.1:9002",
"tcp://127.0.0.1:9003"},
9000,
"127.0.0.1",
tempDir.resolve("log1.log").toString(),
10000,
+ 500,
+ 20,
+ true,
+ numberOfMessages,
null);
GossipCommandLineOptions opts2 = new GossipCommandLineOptions(
- new String[] {"tcp://127.0.0.1:9000", "tcp://127.0.0.1:9002"},
+ new String[] {},
9001,
"127.0.0.1",
tempDir.resolve("log2.log").toString(),
10001,
+ 0,
+ 0,
+ false,
+ 0,
null);
GossipCommandLineOptions opts3 = new GossipCommandLineOptions(
- new String[] {"tcp://127.0.0.1:9000", "tcp://127.0.0.1:9001"},
+ new String[] {"tcp://127.0.0.1:9003"},
9002,
"127.0.0.1",
tempDir.resolve("log3.log").toString(),
10002,
+ 0,
+ 0,
+ false,
+ 0,
+ null);
+ GossipCommandLineOptions opts4 = new GossipCommandLineOptions(
+ new String[] {},
+ 9003,
+ "127.0.0.1",
+ tempDir.resolve("log4.log").toString(),
+ 10003,
+ 0,
+ 0,
+ false,
+ 0,
null);
AtomicBoolean terminationRan = new AtomicBoolean(false);
- ExecutorService service = Executors.newFixedThreadPool(3);
+ ExecutorService service = Executors.newFixedThreadPool(4);
Future<GossipApp> app1Future = service.submit(() -> {
GossipApp app = new GossipApp(vertx, opts1, System.err, System.out, ()
-> {
@@ -88,43 +115,37 @@ class GossipIntegrationTest {
app.start();
return app;
});
+ Future<GossipApp> app4Future = service.submit(() -> {
+ GossipApp app = new GossipApp(vertx, opts4, System.err, System.out, ()
-> {
+ terminationRan.set(true);
+ });
+ app.start();
+ return app;
+ });
GossipApp app1 = app1Future.get(10, TimeUnit.SECONDS);
GossipApp app2 = app2Future.get(10, TimeUnit.SECONDS);
GossipApp app3 = app3Future.get(10, TimeUnit.SECONDS);
+ GossipApp app4 = app4Future.get(10, TimeUnit.SECONDS);
assertFalse(terminationRan.get());
- HttpClient client = vertx.createHttpClient();
-
- for (int i = 0; i < 20; i++) {
- client.request(HttpMethod.POST, 10000, "127.0.0.1",
"/publish").exceptionHandler(thr -> {
- throw new RuntimeException(thr);
- }).handler(resp -> {
-
-
}).end(Buffer.buffer(Bytes32.rightPad(Bytes.ofUnsignedInt(i)).toHexString().getBytes(StandardCharsets.UTF_8)));
- }
-
- List<String> receiver1 = Collections.emptyList();
-
- int counter = 0;
- do {
- Thread.sleep(1000);
- counter++;
- if (Files.exists(tempDir.resolve("log2.log"))) {
- receiver1 = Files.readAllLines(tempDir.resolve("log2.log"));
- }
- } while (receiver1.size() < 20 && counter < 20);
+ Thread.sleep((long) (numberOfMessages * sendingInterval * 1.33));
- client.close();
service.submit(app1::stop);
service.submit(app2::stop);
service.submit(app3::stop);
+ service.submit(app4::stop);
+ List<String> receiver2 = Files.readAllLines(tempDir.resolve("log2.log"));
+ List<String> receiver3 = Files.readAllLines(tempDir.resolve("log3.log"));
+ List<String> receiver4 = Files.readAllLines(tempDir.resolve("log4.log"));
+ System.out.println("n2: " + receiver2.size() + "\n3: " + receiver3.size()
+ "\n4: " + receiver4.size());
+ assertEquals(numberOfMessages, receiver2.size());
+ assertEquals(numberOfMessages, receiver3.size());
+ assertEquals(numberOfMessages, receiver4.size());
- assertEquals(20, receiver1.size());
- List<String> receiver2 = Files.readAllLines(tempDir.resolve("log3.log"));
- assertEquals(20, receiver2.size());
+ assertFalse(tempDir.resolve("log1.log").toFile().exists());
service.shutdown();
diff --git
a/plumtree/src/main/java/org/apache/tuweni/plumtree/EphemeralPeerRepository.java
b/plumtree/src/main/java/org/apache/tuweni/plumtree/EphemeralPeerRepository.java
index 7d0f4f7..883ae2a 100644
---
a/plumtree/src/main/java/org/apache/tuweni/plumtree/EphemeralPeerRepository.java
+++
b/plumtree/src/main/java/org/apache/tuweni/plumtree/EphemeralPeerRepository.java
@@ -55,9 +55,10 @@ public final class EphemeralPeerRepository implements
PeerRepository {
}
@Override
- public void moveToLazy(Peer peer) {
+ public boolean moveToLazy(Peer peer) {
eagerPushPeers.remove(peer);
lazyPushPeers.add(peer);
+ return true;
}
@Override
diff --git a/plumtree/src/main/java/org/apache/tuweni/plumtree/PeerPruning.java
b/plumtree/src/main/java/org/apache/tuweni/plumtree/PeerPruning.java
new file mode 100644
index 0000000..f33eaf1
--- /dev/null
+++ b/plumtree/src/main/java/org/apache/tuweni/plumtree/PeerPruning.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding
copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not
use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.tuweni.plumtree;
+
+/**
+ * Interface to decide whether to prune peers when they send messages late.
+ *
+ * Pruned peers become "lazy peers". They send message attestations (IHAVE).
+ */
+public interface PeerPruning {
+
+ /**
+ * Decides whether to prune a peer.
+ *
+ * @param peer the peer to consider
+ * @return true if the peer should be pruned
+ */
+ boolean prunePeer(Peer peer);
+}
diff --git
a/plumtree/src/main/java/org/apache/tuweni/plumtree/PeerRepository.java
b/plumtree/src/main/java/org/apache/tuweni/plumtree/PeerRepository.java
index 9f0419c..d0f7f5d 100644
--- a/plumtree/src/main/java/org/apache/tuweni/plumtree/PeerRepository.java
+++ b/plumtree/src/main/java/org/apache/tuweni/plumtree/PeerRepository.java
@@ -54,8 +54,9 @@ public interface PeerRepository {
* Moves a peer to the list of lazy peers
*
* @param peer the peer to move
+ * @return true if the move was effective
*/
- void moveToLazy(Peer peer);
+ boolean moveToLazy(Peer peer);
/**
* Moves a peer to the list of eager peers.
diff --git a/plumtree/src/main/java/org/apache/tuweni/plumtree/State.java
b/plumtree/src/main/java/org/apache/tuweni/plumtree/State.java
index 26832ad..463583b 100644
--- a/plumtree/src/main/java/org/apache/tuweni/plumtree/State.java
+++ b/plumtree/src/main/java/org/apache/tuweni/plumtree/State.java
@@ -48,6 +48,7 @@ public final class State {
private final MessageSender messageSender;
private final Consumer<Bytes> messageListener;
private final MessageValidator messageValidator;
+ private final PeerPruning peerPruningFunction;
final Queue<Runnable> lazyQueue = new ConcurrentLinkedQueue<>();
private final Timer timer = new Timer("plumtree", true);
private final long delay;
@@ -66,7 +67,7 @@ public final class State {
}
/**
- * Acts on receiving the full message
+ * Acts on receiving the full message.
*
* @param sender the sender - may be null if we are submitting this
message to the network
* @param message the payload to send to the network
@@ -98,8 +99,9 @@ public final class State {
}
} else {
if (sender != null) {
- messageSender.sendMessage(MessageSender.Verb.PRUNE, null, sender,
hash, null);
- peerRepository.moveToLazy(sender);
+ if (peerPruningFunction.prunePeer(sender)) {
+ messageSender.sendMessage(MessageSender.Verb.PRUNE, null, sender,
hash, null);
+ }
}
}
}
@@ -138,24 +140,35 @@ public final class State {
* @param messageSender a function abstracting sending messages to other
peers.
* @param messageListener a function consuming messages when they are
gossiped.
* @param messageValidator a function validating messages before they are
gossiped to other peers.
+ * @param peerPruningFunction a function deciding whether to prune peers.
*/
public State(
PeerRepository peerRepository,
MessageHashing messageHashingFunction,
MessageSender messageSender,
Consumer<Bytes> messageListener,
- MessageValidator messageValidator) {
- this(peerRepository, messageHashingFunction, messageSender,
messageListener, messageValidator, 5000, 5000);
+ MessageValidator messageValidator,
+ PeerPruning peerPruningFunction) {
+ this(
+ peerRepository,
+ messageHashingFunction,
+ messageSender,
+ messageListener,
+ messageValidator,
+ peerPruningFunction,
+ 5000,
+ 5000);
}
/**
- * Constructor using default time constants.
+ * Default constructor.
*
* @param peerRepository the peer repository to use to store and access peer
information.
* @param messageHashingFunction the function to use to hash messages into
hashes to compare them.
* @param messageSender a function abstracting sending messages to other
peers.
* @param messageListener a function consuming messages when they are
gossiped.
* @param messageValidator a function validating messages before they are
gossiped to other peers.
+ * @param peerPruningFunction a function deciding whether to prune peers.
* @param graftDelay delay in milliseconds to apply before this peer grafts
an other peer when it finds that peer has
* data it misses.
* @param lazyQueueInterval the interval in milliseconds between sending
messages to lazy peers.
@@ -166,6 +179,7 @@ public final class State {
MessageSender messageSender,
Consumer<Bytes> messageListener,
MessageValidator messageValidator,
+ PeerPruning peerPruningFunction,
long graftDelay,
long lazyQueueInterval) {
this.peerRepository = peerRepository;
@@ -173,6 +187,7 @@ public final class State {
this.messageSender = messageSender;
this.messageListener = messageListener;
this.messageValidator = messageValidator;
+ this.peerPruningFunction = peerPruningFunction;
this.delay = graftDelay;
timer.schedule(new TimerTask() {
@Override
@@ -208,9 +223,13 @@ public final class State {
* @param attributes of the message
* @param message the hash of the message
*/
- public void receiveGossipMessage(Peer peer, String attributes, Bytes
message) {
+ public void receiveGossipMessage(Peer peer, String attributes, Bytes
message, Bytes messageHash) {
+ Bytes checkHash = messageHashingFunction.hash(message);
+ if (!checkHash.equals(messageHash)) {
+ return;
+ }
peerRepository.considerNewPeer(peer);
- MessageHandler handler =
messageHandlers.computeIfAbsent(messageHashingFunction.hash(message),
MessageHandler::new);
+ MessageHandler handler = messageHandlers.computeIfAbsent(messageHash,
MessageHandler::new);
handler.fullMessageReceived(peer, attributes, message);
}
@@ -226,7 +245,7 @@ public final class State {
}
/**
- * Requests a peer be pruned away from the eager peers into the lazy peers
+ * Requests a peer be pruned away from the eager peers into the lazy peers.
*
* @param peer the peer to move to lazy peers
*/
@@ -235,7 +254,7 @@ public final class State {
}
/**
- * Requests a peer be grafted to the eager peers list
+ * Requests a peer be grafted to the eager peers list.
*
* @param peer the peer to add to the eager peers
* @param messageHash the hash of the message that triggers this grafting
diff --git
a/plumtree/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java
b/plumtree/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java
index dce838d..5d8e75a 100644
---
a/plumtree/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java
+++
b/plumtree/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java
@@ -19,6 +19,7 @@ import org.apache.tuweni.plumtree.MessageHashing;
import org.apache.tuweni.plumtree.MessageSender;
import org.apache.tuweni.plumtree.MessageValidator;
import org.apache.tuweni.plumtree.Peer;
+import org.apache.tuweni.plumtree.PeerPruning;
import org.apache.tuweni.plumtree.PeerRepository;
import org.apache.tuweni.plumtree.State;
@@ -78,10 +79,14 @@ public final class VertxGossipServer {
switch (message.verb) {
case IHAVE:
- state.receiveIHaveMessage(peer,
Bytes.fromHexString(message.payload));
+ state.receiveIHaveMessage(peer, Bytes.fromHexString(message.hash));
break;
case GOSSIP:
- state.receiveGossipMessage(peer, message.attributes,
Bytes.fromHexString(message.payload));
+ state.receiveGossipMessage(
+ peer,
+ message.attributes,
+ Bytes.fromHexString(message.payload),
+ Bytes.fromHexString(message.hash));
break;
case GRAFT:
state.receiveGraftMessage(peer,
Bytes.fromHexString(message.payload));
@@ -98,17 +103,20 @@ public final class VertxGossipServer {
}
}
- private final AtomicBoolean started = new AtomicBoolean(false);
- private final Vertx vertx;
- private final int port;
+ private NetClient client;
+ private final int graftDelay;
+ private final int lazyQueueInterval;
private final MessageHashing messageHashing;
private final String networkInterface;
- private final PeerRepository peerRepository;
private final Consumer<Bytes> payloadListener;
private final MessageValidator payloadValidator;
- private State state;
+ private final PeerPruning peerPruningFunction;
+ private final PeerRepository peerRepository;
+ private final int port;
private NetServer server;
- private NetClient client;
+ private final AtomicBoolean started = new AtomicBoolean(false);
+ private State state;
+ private final Vertx vertx;
public VertxGossipServer(
Vertx vertx,
@@ -117,7 +125,10 @@ public final class VertxGossipServer {
MessageHashing messageHashing,
PeerRepository peerRepository,
Consumer<Bytes> payloadListener,
- @Nullable MessageValidator payloadValidator) {
+ @Nullable MessageValidator payloadValidator,
+ @Nullable PeerPruning peerPruningFunction,
+ int graftDelay,
+ int lazyQueueInterval) {
this.vertx = vertx;
this.networkInterface = networkInterface;
this.port = port;
@@ -125,6 +136,9 @@ public final class VertxGossipServer {
this.peerRepository = peerRepository;
this.payloadListener = payloadListener;
this.payloadValidator = payloadValidator == null ? (bytes, peer) -> true :
payloadValidator;
+ this.peerPruningFunction = peerPruningFunction == null ? (peer) -> true :
peerPruningFunction;
+ this.graftDelay = graftDelay;
+ this.lazyQueueInterval = lazyQueueInterval;
}
public AsyncCompletion start() {
@@ -135,24 +149,27 @@ public final class VertxGossipServer {
server.connectHandler(socket -> {
Peer peer = new SocketPeer(socket);
SocketHandler handler = new SocketHandler(peer);
- socket.handler(handler::handle).closeHandler(handler::close);
+
socket.handler(handler::handle).closeHandler(handler::close).exceptionHandler(Throwable::printStackTrace);
});
+ server.exceptionHandler(Throwable::printStackTrace);
server.listen(port, networkInterface, res -> {
if (res.failed()) {
completion.completeExceptionally(res.cause());
} else {
state = new State(peerRepository, messageHashing, (verb, attributes,
peer, hash, payload) -> {
- Message message = new Message();
- message.verb = verb;
- message.attributes = attributes;
- message.hash = hash.toHexString();
- message.payload = payload == null ? null : payload.toHexString();
- try {
- ((SocketPeer)
peer).socket().write(Buffer.buffer(mapper.writeValueAsBytes(message)));
- } catch (JsonProcessingException e) {
- throw new RuntimeException(e);
- }
- }, payloadListener, payloadValidator);
+ vertx.runOnContext(handler -> {
+ Message message = new Message();
+ message.verb = verb;
+ message.attributes = attributes;
+ message.hash = hash.toHexString();
+ message.payload = payload == null ? null : payload.toHexString();
+ try {
+ ((SocketPeer)
peer).socket().write(Buffer.buffer(mapper.writeValueAsBytes(message)));
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }, payloadListener, payloadValidator, peerPruningFunction,
graftDelay, lazyQueueInterval);
completion.complete();
}
diff --git a/plumtree/src/test/java/org/apache/tuweni/plumtree/StateTest.java
b/plumtree/src/test/java/org/apache/tuweni/plumtree/StateTest.java
index b171ee8..01a31cc 100644
--- a/plumtree/src/test/java/org/apache/tuweni/plumtree/StateTest.java
+++ b/plumtree/src/test/java/org/apache/tuweni/plumtree/StateTest.java
@@ -56,7 +56,13 @@ class StateTest {
@Test
void testInitialState() {
EphemeralPeerRepository repo = new EphemeralPeerRepository();
- State state = new State(repo, Hash::keccak256, new MockMessageSender(),
messageRef::set, (message, peer) -> true);
+ State state = new State(
+ repo,
+ Hash::keccak256,
+ new MockMessageSender(),
+ messageRef::set,
+ (message, peer) -> true,
+ (peer) -> true);
assertTrue(repo.peers().isEmpty());
assertTrue(repo.lazyPushPeers().isEmpty());
assertTrue(repo.eagerPushPeers().isEmpty());
@@ -65,7 +71,13 @@ class StateTest {
@Test
void firstRoundWithThreePeers() {
EphemeralPeerRepository repo = new EphemeralPeerRepository();
- State state = new State(repo, Hash::keccak256, new MockMessageSender(),
messageRef::set, (message, peer) -> true);
+ State state = new State(
+ repo,
+ Hash::keccak256,
+ new MockMessageSender(),
+ messageRef::set,
+ (message, peer) -> true,
+ (peer) -> true);
state.addPeer(new PeerImpl());
state.addPeer(new PeerImpl());
state.addPeer(new PeerImpl());
@@ -76,7 +88,13 @@ class StateTest {
@Test
void firstRoundWithTwoPeers() {
EphemeralPeerRepository repo = new EphemeralPeerRepository();
- State state = new State(repo, Hash::keccak256, new MockMessageSender(),
messageRef::set, (message, peer) -> true);
+ State state = new State(
+ repo,
+ Hash::keccak256,
+ new MockMessageSender(),
+ messageRef::set,
+ (message, peer) -> true,
+ (peer) -> true);
state.addPeer(new PeerImpl());
state.addPeer(new PeerImpl());
assertTrue(repo.lazyPushPeers().isEmpty());
@@ -86,7 +104,13 @@ class StateTest {
@Test
void firstRoundWithOnePeer() {
EphemeralPeerRepository repo = new EphemeralPeerRepository();
- State state = new State(repo, Hash::keccak256, new MockMessageSender(),
messageRef::set, (message, peer) -> true);
+ State state = new State(
+ repo,
+ Hash::keccak256,
+ new MockMessageSender(),
+ messageRef::set,
+ (message, peer) -> true,
+ (peer) -> true);
state.addPeer(new PeerImpl());
assertTrue(repo.lazyPushPeers().isEmpty());
assertEquals(1, repo.eagerPushPeers().size());
@@ -95,7 +119,13 @@ class StateTest {
@Test
void removePeer() {
EphemeralPeerRepository repo = new EphemeralPeerRepository();
- State state = new State(repo, Hash::keccak256, new MockMessageSender(),
messageRef::set, (message, peer) -> true);
+ State state = new State(
+ repo,
+ Hash::keccak256,
+ new MockMessageSender(),
+ messageRef::set,
+ (message, peer) -> true,
+ (peer) -> true);
Peer peer = new PeerImpl();
state.addPeer(peer);
state.removePeer(peer);
@@ -107,7 +137,13 @@ class StateTest {
@Test
void prunePeer() {
EphemeralPeerRepository repo = new EphemeralPeerRepository();
- State state = new State(repo, Hash::keccak256, new MockMessageSender(),
messageRef::set, (message, peer) -> true);
+ State state = new State(
+ repo,
+ Hash::keccak256,
+ new MockMessageSender(),
+ messageRef::set,
+ (message, peer) -> true,
+ (peer) -> true);
Peer peer = new PeerImpl();
state.addPeer(peer);
state.receivePruneMessage(peer);
@@ -118,7 +154,13 @@ class StateTest {
@Test
void graftPeer() {
EphemeralPeerRepository repo = new EphemeralPeerRepository();
- State state = new State(repo, Hash::keccak256, new MockMessageSender(),
messageRef::set, (message, peer) -> true);
+ State state = new State(
+ repo,
+ Hash::keccak256,
+ new MockMessageSender(),
+ messageRef::set,
+ (message, peer) -> true,
+ (peer) -> true);
Peer peer = new PeerImpl();
state.addPeer(peer);
state.receivePruneMessage(peer);
@@ -133,14 +175,15 @@ class StateTest {
void receiveFullMessageFromEagerPeer() {
EphemeralPeerRepository repo = new EphemeralPeerRepository();
MockMessageSender messageSender = new MockMessageSender();
- State state = new State(repo, Hash::keccak256, messageSender,
messageRef::set, (message, peer) -> true);
+ State state =
+ new State(repo, Hash::keccak256, messageSender, messageRef::set,
(message, peer) -> true, (peer) -> true);
Peer peer = new PeerImpl();
state.addPeer(peer);
Peer otherPeer = new PeerImpl();
state.addPeer(otherPeer);
Bytes32 msg = Bytes32.random();
String attributes = "{\"message_type\": \"BLOCK\"}";
- state.receiveGossipMessage(peer, attributes, msg);
+ state.receiveGossipMessage(peer, attributes, msg, Hash.keccak256(msg));
assertEquals(msg, messageSender.payload);
assertEquals(otherPeer, messageSender.peer);
}
@@ -149,7 +192,8 @@ class StateTest {
void receiveFullMessageFromEagerPeerWithALazyPeer() {
EphemeralPeerRepository repo = new EphemeralPeerRepository();
MockMessageSender messageSender = new MockMessageSender();
- State state = new State(repo, Hash::keccak256, messageSender,
messageRef::set, (message, peer) -> true);
+ State state =
+ new State(repo, Hash::keccak256, messageSender, messageRef::set,
(message, peer) -> true, (peer) -> true);
Peer peer = new PeerImpl();
state.addPeer(peer);
Peer otherPeer = new PeerImpl();
@@ -159,7 +203,7 @@ class StateTest {
state.addPeer(lazyPeer);
repo.moveToLazy(lazyPeer);
String attributes = "{\"message_type\": \"BLOCK\"}";
- state.receiveGossipMessage(peer, attributes, msg);
+ state.receiveGossipMessage(peer, attributes, msg, Hash.keccak256(msg));
assertEquals(msg, messageSender.payload);
assertEquals(otherPeer, messageSender.peer);
state.processQueue();
@@ -173,7 +217,8 @@ class StateTest {
void receiveFullMessageFromEagerPeerThenPartialMessageFromLazyPeer() {
EphemeralPeerRepository repo = new EphemeralPeerRepository();
MockMessageSender messageSender = new MockMessageSender();
- State state = new State(repo, Hash::keccak256, messageSender,
messageRef::set, (message, peer) -> true);
+ State state =
+ new State(repo, Hash::keccak256, messageSender, messageRef::set,
(message, peer) -> true, (peer) -> true);
Peer peer = new PeerImpl();
state.addPeer(peer);
Peer lazyPeer = new PeerImpl();
@@ -181,7 +226,7 @@ class StateTest {
repo.moveToLazy(lazyPeer);
Bytes message = Bytes32.random();
String attributes = "{\"message_type\": \"BLOCK\"}";
- state.receiveGossipMessage(peer, attributes, message);
+ state.receiveGossipMessage(peer, attributes, message,
Hash.keccak256(message));
state.receiveIHaveMessage(lazyPeer, message);
assertNull(messageSender.payload);
assertNull(messageSender.peer);
@@ -191,7 +236,15 @@ class StateTest {
void receivePartialMessageFromLazyPeerAndNoFullMessage() throws Exception {
EphemeralPeerRepository repo = new EphemeralPeerRepository();
MockMessageSender messageSender = new MockMessageSender();
- State state = new State(repo, Hash::keccak256, messageSender,
messageRef::set, (message, peer) -> true, 100, 4000);
+ State state = new State(
+ repo,
+ Hash::keccak256,
+ messageSender,
+ messageRef::set,
+ (message, peer) -> true,
+ (peer) -> true,
+ 100,
+ 4000);
Peer peer = new PeerImpl();
state.addPeer(peer);
Peer lazyPeer = new PeerImpl();
@@ -209,7 +262,15 @@ class StateTest {
void receivePartialMessageFromLazyPeerAndThenFullMessage() throws Exception {
EphemeralPeerRepository repo = new EphemeralPeerRepository();
MockMessageSender messageSender = new MockMessageSender();
- State state = new State(repo, Hash::keccak256, messageSender,
messageRef::set, (message, peer) -> true, 500, 4000);
+ State state = new State(
+ repo,
+ Hash::keccak256,
+ messageSender,
+ messageRef::set,
+ (message, peer) -> true,
+ (peer) -> true,
+ 500,
+ 4000);
Peer peer = new PeerImpl();
state.addPeer(peer);
Peer lazyPeer = new PeerImpl();
@@ -219,7 +280,7 @@ class StateTest {
state.receiveIHaveMessage(lazyPeer, Hash.keccak256(message));
Thread.sleep(100);
String attributes = "{\"message_type\": \"BLOCK\"}";
- state.receiveGossipMessage(peer, attributes, message);
+ state.receiveGossipMessage(peer, attributes, message,
Hash.keccak256(message));
Thread.sleep(500);
assertNull(messageSender.verb);
assertNull(messageSender.payload);
@@ -230,11 +291,12 @@ class StateTest {
void receiveFullMessageFromUnknownPeer() {
EphemeralPeerRepository repo = new EphemeralPeerRepository();
MockMessageSender messageSender = new MockMessageSender();
- State state = new State(repo, Hash::keccak256, messageSender,
messageRef::set, (message, peer) -> true);
+ State state =
+ new State(repo, Hash::keccak256, messageSender, messageRef::set,
(message, peer) -> true, (peer) -> true);
Peer peer = new PeerImpl();
Bytes message = Bytes32.random();
String attributes = "{\"message_type\": \"BLOCK\"}";
- state.receiveGossipMessage(peer, attributes, message);
+ state.receiveGossipMessage(peer, attributes, message,
Hash.keccak256(message));
assertEquals(1, repo.eagerPushPeers().size());
assertEquals(0, repo.lazyPushPeers().size());
assertEquals(peer, repo.eagerPushPeers().iterator().next());
@@ -244,15 +306,16 @@ class StateTest {
void prunePeerWhenReceivingTwiceTheSameFullMessage() {
EphemeralPeerRepository repo = new EphemeralPeerRepository();
MockMessageSender messageSender = new MockMessageSender();
- State state = new State(repo, Hash::keccak256, messageSender,
messageRef::set, (message, peer) -> true);
+ State state =
+ new State(repo, Hash::keccak256, messageSender, messageRef::set,
(message, peer) -> true, (peer) -> true);
Peer peer = new PeerImpl();
Peer secondPeer = new PeerImpl();
Bytes message = Bytes32.random();
String attributes = "{\"message_type\": \"BLOCK\"}";
- state.receiveGossipMessage(peer, attributes, message);
- state.receiveGossipMessage(secondPeer, attributes, message);
- assertEquals(1, repo.eagerPushPeers().size());
- assertEquals(1, repo.lazyPushPeers().size());
+ state.receiveGossipMessage(peer, attributes, message,
Hash.keccak256(message));
+ state.receiveGossipMessage(secondPeer, attributes, message,
Hash.keccak256(message));
+ assertEquals(2, repo.eagerPushPeers().size());
+ assertEquals(0, repo.lazyPushPeers().size());
assertNull(messageSender.payload);
assertEquals(secondPeer, messageSender.peer);
assertEquals(MessageSender.Verb.PRUNE, messageSender.verb);
diff --git
a/plumtree/src/test/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java
b/plumtree/src/test/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java
index d5dd5d8..29ae06c 100644
---
a/plumtree/src/test/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java
+++
b/plumtree/src/test/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java
@@ -43,7 +43,10 @@ class VertxGossipServerTest {
bytes -> bytes,
new EphemeralPeerRepository(),
messageReceived1::set,
- (message, peer) -> true);
+ (message, peer) -> true,
+ null,
+ 200,
+ 200);
VertxGossipServer server2 = new VertxGossipServer(
vertx,
"127.0.0.1",
@@ -51,7 +54,10 @@ class VertxGossipServerTest {
bytes -> bytes,
new EphemeralPeerRepository(),
messageReceived2::set,
- (message, peer) -> true);
+ (message, peer) -> true,
+ null,
+ 200,
+ 200);
server1.start().join();
server2.start().join();
@@ -85,7 +91,10 @@ class VertxGossipServerTest {
bytes -> bytes,
new EphemeralPeerRepository(),
messageReceived1::set,
- (message, peer) -> true);
+ (message, peer) -> true,
+ null,
+ 200,
+ 200);
VertxGossipServer server2 = new VertxGossipServer(
vertx,
"127.0.0.1",
@@ -93,7 +102,10 @@ class VertxGossipServerTest {
bytes -> bytes,
new EphemeralPeerRepository(),
messageReceived2::set,
- (message, peer) -> true);
+ (message, peer) -> true,
+ null,
+ 200,
+ 200);
VertxGossipServer server3 = new VertxGossipServer(
vertx,
"127.0.0.1",
@@ -101,7 +113,10 @@ class VertxGossipServerTest {
bytes -> bytes,
new EphemeralPeerRepository(),
messageReceived3::set,
- (message, peer) -> true);
+ (message, peer) -> true,
+ null,
+ 200,
+ 200);
server1.start().join();
server2.start().join();
@@ -141,7 +156,10 @@ class VertxGossipServerTest {
bytes -> bytes,
peerRepository1,
messageReceived1::set,
- (message, peer) -> true);
+ (message, peer) -> true,
+ null,
+ 200,
+ 200);
VertxGossipServer server2 = new VertxGossipServer(
vertx,
"127.0.0.1",
@@ -149,7 +167,10 @@ class VertxGossipServerTest {
bytes -> bytes,
new EphemeralPeerRepository(),
messageReceived2::set,
- (message, peer) -> true);
+ (message, peer) -> true,
+ null,
+ 200,
+ 200);
VertxGossipServer server3 = new VertxGossipServer(
vertx,
"127.0.0.1",
@@ -157,7 +178,10 @@ class VertxGossipServerTest {
bytes -> bytes,
peerRepository3,
messageReceived2::set,
- (message, peer) -> true);
+ (message, peer) -> true,
+ null,
+ 200,
+ 200);
server1.start().join();
server2.start().join();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]