This is an automated email from the ASF dual-hosted git repository. toulmean pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git
The following commit(s) were added to refs/heads/master by this push: new 39713a2 Add more flexibility to plumtree, allowing peer pruning management 39713a2 is described below commit 39713a2e638b419bc79d494528067eb8866af14f Author: Antoine Toulme <anto...@lunar-ocean.com> AuthorDate: Wed May 29 21:53:03 2019 -0700 Add more flexibility to plumtree, allowing peer pruning management --- .../tuweni/gossip/CountingPeerPruningFunction.java | 47 +++++++++ .../java/org/apache/tuweni/gossip/GossipApp.java | 46 +++++++-- .../tuweni/gossip/GossipCommandLineOptions.java | 94 ++++++++++++++++-- .../tuweni/gossip/LoggingPeerRepository.java | 27 +++-- .../gossip/GossipCommandLineOptionsTest.java | 47 +++++++-- .../tuweni/gossip/GossipIntegrationTest.java | 12 +++ ...sipIntegrationTest.java => GossipLoadTest.java} | 97 +++++++++++------- .../tuweni/plumtree/EphemeralPeerRepository.java | 3 +- .../org/apache/tuweni/plumtree/PeerPruning.java | 29 ++++++ .../org/apache/tuweni/plumtree/PeerRepository.java | 3 +- .../java/org/apache/tuweni/plumtree/State.java | 39 ++++++-- .../tuweni/plumtree/vertx/VertxGossipServer.java | 59 +++++++---- .../java/org/apache/tuweni/plumtree/StateTest.java | 109 ++++++++++++++++----- .../plumtree/vertx/VertxGossipServerTest.java | 40 ++++++-- 14 files changed, 518 insertions(+), 134 deletions(-) diff --git a/gossip/src/main/java/org/apache/tuweni/gossip/CountingPeerPruningFunction.java b/gossip/src/main/java/org/apache/tuweni/gossip/CountingPeerPruningFunction.java new file mode 100644 index 0000000..f7307f4 --- /dev/null +++ b/gossip/src/main/java/org/apache/tuweni/gossip/CountingPeerPruningFunction.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.tuweni.gossip; + +import org.apache.tuweni.plumtree.Peer; +import org.apache.tuweni.plumtree.PeerPruning; + +import java.util.Collections; +import java.util.Map; +import java.util.WeakHashMap; + +/** + * Function counting the number of times a peer showed. + */ +final class CountingPeerPruningFunction implements PeerPruning { + + private final int numberOfOccurrences; + private final Map<Peer, Integer> countingOccurrences = Collections.synchronizedMap(new WeakHashMap<>()); + + public CountingPeerPruningFunction(int numberOfOccurrences) { + this.numberOfOccurrences = numberOfOccurrences; + } + + @Override + public boolean prunePeer(Peer peer) { + Integer currentValue = countingOccurrences.putIfAbsent(peer, 1); + if (currentValue != null) { + if (currentValue + 1 >= numberOfOccurrences) { + countingOccurrences.remove(peer); + return true; + } + countingOccurrences.put(peer, currentValue + 1); + } + + return false; + } +} diff --git a/gossip/src/main/java/org/apache/tuweni/gossip/GossipApp.java b/gossip/src/main/java/org/apache/tuweni/gossip/GossipApp.java index 173ec7b..f69e12b 100644 --- a/gossip/src/main/java/org/apache/tuweni/gossip/GossipApp.java +++ b/gossip/src/main/java/org/apache/tuweni/gossip/GossipApp.java @@ -17,7 +17,6 @@ import org.apache.tuweni.bytes.Bytes; import org.apache.tuweni.concurrent.AsyncCompletion; import org.apache.tuweni.concurrent.CompletableAsyncCompletion; import org.apache.tuweni.crypto.Hash; -import org.apache.tuweni.plumtree.EphemeralPeerRepository; import org.apache.tuweni.plumtree.vertx.VertxGossipServer; import java.io.IOException; @@ -31,6 +30,9 @@ import java.security.Security; import java.time.Instant; import java.util.Collections; import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -68,8 +70,14 @@ public final class GossipApp { gossipApp.start(); } - - + private final ExecutorService senderThreadPool = Executors.newSingleThreadExecutor(new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(r, "sender"); + t.setDaemon(false); + return t; + } + }); private final GossipCommandLineOptions opts; private final Runnable terminateFunction; private final PrintStream errStream; @@ -83,7 +91,7 @@ public final class GossipApp { PrintStream errStream, PrintStream outStream, Runnable terminateFunction) { - EphemeralPeerRepository repository = new EphemeralPeerRepository(); + LoggingPeerRepository repository = new LoggingPeerRepository(outStream); outStream.println("Setting up server on " + opts.networkInterface() + ":" + opts.listenPort()); server = new VertxGossipServer( vertx, @@ -92,7 +100,10 @@ public final class GossipApp { Hash::keccak256, repository, bytes -> readMessage(opts.messageLog(), errStream, bytes), - null); + null, + new CountingPeerPruningFunction(10), + 100, + 100); this.opts = opts; this.errStream = errStream; this.outStream = outStream; @@ -135,13 +146,33 @@ public final class GossipApp { errStream.println("Server could not connect to other peers: " + e.getMessage()); } outStream.println("Gossip started"); + + if (opts.sending()) { + outStream.println("Start sending messages"); + senderThreadPool.submit(() -> { + for (int i = 0; i < opts.numberOfMessages(); i++) { + if (Thread.currentThread().isInterrupted()) { + return; + } + Bytes payload = Bytes.random(opts.payloadSize()); + publish(payload); + try { + Thread.sleep(opts.sendInterval()); + } catch (InterruptedException e) { + return; + } + } + }); + } } private void handleRPCRequest(HttpServerRequest httpServerRequest) { if (HttpMethod.POST.equals(httpServerRequest.method())) { if ("/publish".equals(httpServerRequest.path())) { httpServerRequest.bodyHandler(body -> { - publish(Bytes.wrapBuffer(body)); + Bytes message = Bytes.wrapBuffer(body); + outStream.println("Message to publish " + message.toHexString()); + publish(message); httpServerRequest.response().setStatusCode(200).end(); }); } else { @@ -153,6 +184,8 @@ public final class GossipApp { } void stop() { + outStream.println("Stopping sending"); + senderThreadPool.shutdown(); outStream.println("Stopping gossip"); try { server.stop().join(); @@ -196,7 +229,6 @@ public final class GossipApp { } public void publish(Bytes message) { - outStream.println("Message to publish " + message.toHexString()); server.gossip("", message); } } diff --git a/gossip/src/main/java/org/apache/tuweni/gossip/GossipCommandLineOptions.java b/gossip/src/main/java/org/apache/tuweni/gossip/GossipCommandLineOptions.java index 04a6f35..e1e776e 100644 --- a/gossip/src/main/java/org/apache/tuweni/gossip/GossipCommandLineOptions.java +++ b/gossip/src/main/java/org/apache/tuweni/gossip/GossipCommandLineOptions.java @@ -41,6 +41,14 @@ final class GossipCommandLineOptions { .addString("networkInterface", "0.0.0.0", "Network interface to bind to", null) .addListOfString("peers", Collections.emptyList(), "Static peers list", null) .addString("messagelog", "messages.log", "Log file where messages are stored", null) + .addBoolean("sending", false, "Whether this peer sends random messages to all other peers (load testing)", null) + .addInteger( + "sendInterval", + 1000, + "Interval to wait in between sending messages in milliseconds (load testing)", + null) + .addInteger("numberOfMessages", 100, "Number of messages to publish to other peers (load testing)", null) + .addInteger("payloadSize", 200, "Size of the random payload to send to other peers (load testing)", null) .toSchema(); } @@ -62,6 +70,21 @@ final class GossipCommandLineOptions { @CommandLine.Option(names = {"-m", "--messageLog"} , description = "Log file where messages are stored") private String messageLog; + @CommandLine.Option(names = {"--sendInterval"} , + description = "Interval to wait in between sending messages in milliseconds (load testing)") + private Integer sendInterval; + + @CommandLine.Option(names = {"--payloadSize"} , + description = "Size of the random payload to send to other peers (load testing)") + private Integer payloadSize; + + @CommandLine.Option(names = {"--numberOfMessages"} , description = "Number of messages to publish (load testing)") + private Integer numberOfMessages; + + @CommandLine.Option(names = {"--sending"} , + description = "Whether this peer sends random messages to all other peers (load testing)") + private Boolean sending; + @CommandLine.Option(names = {"-h", "--help"} , description = "Prints usage prompt") private boolean help; @@ -70,12 +93,19 @@ final class GossipCommandLineOptions { GossipCommandLineOptions() {} + /** + * Constructor used for testing. + */ GossipCommandLineOptions( String[] peers, Integer port, String networkInterface, String messageLog, Integer rpcPort, + Integer payloadSize, + Integer sendInterval, + Boolean sending, + Integer numberOfMessages, Configuration config) { this.peers = peers; this.port = port; @@ -83,6 +113,10 @@ final class GossipCommandLineOptions { this.messageLog = messageLog; this.rpcPort = rpcPort; this.config = config; + this.payloadSize = payloadSize; + this.sendInterval = sendInterval; + this.numberOfMessages = numberOfMessages; + this.sending = sending; } private Configuration config() { @@ -101,20 +135,12 @@ final class GossipCommandLineOptions { peerAddresses = new ArrayList<>(); if (peers != null) { for (String peer : peers) { - URI peerURI = URI.create(peer); - if (peerURI.getHost() == null) { - throw new IllegalArgumentException("Invalid peer URI " + peerURI); - } - peerAddresses.add(peerURI); + readPeerInfo(peer); } } else { if (config() != null) { for (String peer : config().getListOfString("peers")) { - URI peerURI = URI.create(peer); - if (peerURI.getHost() == null) { - throw new IllegalArgumentException("Invalid peer URI " + peerURI); - } - peerAddresses.add(peerURI); + readPeerInfo(peer); } } } @@ -122,6 +148,14 @@ final class GossipCommandLineOptions { return peerAddresses; } + private void readPeerInfo(String peer) { + URI peerURI = URI.create(peer); + if (peerURI.getHost() == null) { + throw new IllegalArgumentException("Invalid peer URI " + peerURI); + } + peerAddresses.add(peerURI); + } + void validate() { int listenPort = listenPort(); if (listenPort < 0 || listenPort > 65535) { @@ -187,6 +221,46 @@ final class GossipCommandLineOptions { return "messages.log"; } + boolean sending() { + if (sending != null) { + return sending; + } + if (config != null) { + return config.getBoolean("sending"); + } + return false; + } + + Integer payloadSize() { + if (payloadSize != null) { + return payloadSize; + } + if (config != null) { + return config.getInteger("payloadSize"); + } + return 200; + } + + Integer sendInterval() { + if (sendInterval != null) { + return sendInterval; + } + if (config != null) { + return config.getInteger("sendInterval"); + } + return 1000; + } + + Integer numberOfMessages() { + if (numberOfMessages != null) { + return numberOfMessages; + } + if (config != null) { + return config.getInteger("numberOfMessages"); + } + return 100; + } + boolean help() { return help; } diff --git a/plumtree/src/main/java/org/apache/tuweni/plumtree/EphemeralPeerRepository.java b/gossip/src/main/java/org/apache/tuweni/gossip/LoggingPeerRepository.java similarity index 75% copy from plumtree/src/main/java/org/apache/tuweni/plumtree/EphemeralPeerRepository.java copy to gossip/src/main/java/org/apache/tuweni/gossip/LoggingPeerRepository.java index 7d0f4f7..ad0a450 100644 --- a/plumtree/src/main/java/org/apache/tuweni/plumtree/EphemeralPeerRepository.java +++ b/gossip/src/main/java/org/apache/tuweni/gossip/LoggingPeerRepository.java @@ -10,21 +10,27 @@ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. */ -package org.apache.tuweni.plumtree; +package org.apache.tuweni.gossip; +import org.apache.tuweni.plumtree.Peer; +import org.apache.tuweni.plumtree.PeerRepository; + +import java.io.PrintStream; import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -/** - * In-memory peer repository. - */ -public final class EphemeralPeerRepository implements PeerRepository { +final class LoggingPeerRepository implements PeerRepository { private final Set<Peer> eagerPushPeers = ConcurrentHashMap.newKeySet(); private final Set<Peer> lazyPushPeers = ConcurrentHashMap.newKeySet(); + private final PrintStream logger; + + public LoggingPeerRepository(PrintStream logger) { + this.logger = logger; + } @Override public void addEager(Peer peer) { @@ -50,18 +56,22 @@ public final class EphemeralPeerRepository implements PeerRepository { @Override public void removePeer(Peer peer) { + logger.println("Removing peer " + peer); lazyPushPeers.remove(peer); eagerPushPeers.remove(peer); } @Override - public void moveToLazy(Peer peer) { + public boolean moveToLazy(Peer peer) { + logger.println("Move peer to lazy " + peer); eagerPushPeers.remove(peer); lazyPushPeers.add(peer); + return true; } @Override public void moveToEager(Peer peer) { + logger.println("Move peer to eager " + peer); lazyPushPeers.remove(peer); eagerPushPeers.add(peer); } @@ -69,8 +79,11 @@ public final class EphemeralPeerRepository implements PeerRepository { @Override public void considerNewPeer(Peer peer) { if (!lazyPushPeers.contains(peer)) { - eagerPushPeers.add(peer); + if (eagerPushPeers.add(peer)) { + logger.println("Added new peer " + peer); + } } } + } diff --git a/gossip/src/test/java/org/apache/tuweni/gossip/GossipCommandLineOptionsTest.java b/gossip/src/test/java/org/apache/tuweni/gossip/GossipCommandLineOptionsTest.java index cd6b885..64a8d77 100644 --- a/gossip/src/test/java/org/apache/tuweni/gossip/GossipCommandLineOptionsTest.java +++ b/gossip/src/test/java/org/apache/tuweni/gossip/GossipCommandLineOptionsTest.java @@ -26,20 +26,31 @@ class GossipCommandLineOptionsTest { @Test void testInvalidPort() { - GossipCommandLineOptions opts = new GossipCommandLineOptions(new String[0], -4, "0.0.0.0", null, 3, null); + GossipCommandLineOptions opts = + new GossipCommandLineOptions(new String[0], -4, "0.0.0.0", null, 3, 0, 0, false, 50, null); assertThrows(IllegalArgumentException.class, opts::validate); } @Test void testInvalidPeer() { - GossipCommandLineOptions opts = - new GossipCommandLineOptions(new String[] {"tcp://400.300.200.100:9000"}, 10, "0.0.0.0", null, 3, null); + GossipCommandLineOptions opts = new GossipCommandLineOptions( + new String[] {"tcp://400.300.200.100:9000"}, + 10, + "0.0.0.0", + null, + 3, + 0, + 0, + false, + 50, + null); assertThrows(IllegalArgumentException.class, opts::validate); } @Test void testInvalidNetworkInterface() { - GossipCommandLineOptions opts = new GossipCommandLineOptions(new String[] {}, 10, "400.300.200.100", null, 3, null); + GossipCommandLineOptions opts = + new GossipCommandLineOptions(new String[] {}, 10, "400.300.200.100", null, 3, 0, 0, false, 50, null); assertThrows(IllegalArgumentException.class, opts::validate); } @@ -52,7 +63,7 @@ class GossipCommandLineOptionsTest { + "networkInterface=\"127.0.0.1\"\n" + "messageLog=\"D:/Temp\"", GossipCommandLineOptions.createConfigFileSchema()); - GossipCommandLineOptions opts = new GossipCommandLineOptions(null, null, null, null, 3000, config); + GossipCommandLineOptions opts = new GossipCommandLineOptions(null, null, null, null, 3000, 0, 0, false, 50, config); opts.validate(); assertEquals(1080, opts.listenPort()); assertEquals(1, opts.peers().size()); @@ -70,7 +81,18 @@ class GossipCommandLineOptionsTest { + "networkInterface=\"127.0.0.1\"\n" + "messageLog=\"D:/Temp\"", GossipCommandLineOptions.createConfigFileSchema()); - GossipCommandLineOptions opts = new GossipCommandLineOptions(null, null, null, null, 3000, config); + GossipCommandLineOptions opts = new GossipCommandLineOptions( + null, + null, + null, + null, + 3000, + 0, + 0, + + false, + 50, + config); assertThrows(IllegalArgumentException.class, opts::validate); } @@ -82,8 +104,17 @@ class GossipCommandLineOptionsTest { + "listenPort=1080\n" + "networkInterface=\"127.0.0.1\"\n" + "messageLog=\"D:/Temp\""); - GossipCommandLineOptions opts = - new GossipCommandLineOptions(new String[] {"tcp://192.168.0.1:3000"}, 400, "0.0.0.0", "C:/Temp", 3000, config); + GossipCommandLineOptions opts = new GossipCommandLineOptions( + new String[] {"tcp://192.168.0.1:3000"}, + 400, + "0.0.0.0", + "C:/Temp", + 3000, + 0, + 0, + false, + 50, + config); assertEquals(400, opts.listenPort()); assertEquals(1, opts.peers().size()); assertEquals(URI.create("tcp://192.168.0.1:3000"), opts.peers().get(0)); diff --git a/gossip/src/test/java/org/apache/tuweni/gossip/GossipIntegrationTest.java b/gossip/src/test/java/org/apache/tuweni/gossip/GossipIntegrationTest.java index 8a0e31f..73a9d8d 100644 --- a/gossip/src/test/java/org/apache/tuweni/gossip/GossipIntegrationTest.java +++ b/gossip/src/test/java/org/apache/tuweni/gossip/GossipIntegrationTest.java @@ -48,6 +48,10 @@ class GossipIntegrationTest { "127.0.0.1", tempDir.resolve("log1.log").toString(), 10000, + 0, + 0, + false, + 50, null); GossipCommandLineOptions opts2 = new GossipCommandLineOptions( new String[] {"tcp://127.0.0.1:9000", "tcp://127.0.0.1:9002"}, @@ -55,6 +59,10 @@ class GossipIntegrationTest { "127.0.0.1", tempDir.resolve("log2.log").toString(), 10001, + 0, + 0, + false, + 50, null); GossipCommandLineOptions opts3 = new GossipCommandLineOptions( new String[] {"tcp://127.0.0.1:9000", "tcp://127.0.0.1:9001"}, @@ -62,6 +70,10 @@ class GossipIntegrationTest { "127.0.0.1", tempDir.resolve("log3.log").toString(), 10002, + 0, + 0, + false, + 50, null); AtomicBoolean terminationRan = new AtomicBoolean(false); diff --git a/gossip/src/test/java/org/apache/tuweni/gossip/GossipIntegrationTest.java b/gossip/src/test/java/org/apache/tuweni/gossip/GossipLoadTest.java similarity index 61% copy from gossip/src/test/java/org/apache/tuweni/gossip/GossipIntegrationTest.java copy to gossip/src/test/java/org/apache/tuweni/gossip/GossipLoadTest.java index 8a0e31f..81b23b2 100644 --- a/gossip/src/test/java/org/apache/tuweni/gossip/GossipIntegrationTest.java +++ b/gossip/src/test/java/org/apache/tuweni/gossip/GossipLoadTest.java @@ -15,14 +15,14 @@ package org.apache.tuweni.gossip; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; -import org.apache.tuweni.bytes.Bytes; -import org.apache.tuweni.bytes.Bytes32; -import org.apache.tuweni.junit.*; +import org.apache.tuweni.junit.BouncyCastleExtension; +import org.apache.tuweni.junit.TempDirectory; +import org.apache.tuweni.junit.TempDirectoryExtension; +import org.apache.tuweni.junit.VertxExtension; +import org.apache.tuweni.junit.VertxInstance; -import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; -import java.util.Collections; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -31,41 +31,68 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import io.vertx.core.Vertx; -import io.vertx.core.buffer.Buffer; -import io.vertx.core.http.HttpClient; -import io.vertx.core.http.HttpMethod; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +/** + * Very basic load test scenario in one JVM. + */ @ExtendWith({VertxExtension.class, TempDirectoryExtension.class, BouncyCastleExtension.class}) -class GossipIntegrationTest { +class GossipLoadTest { + @Disabled @Test - void threeGossipServersStarting(@VertxInstance Vertx vertx, @TempDirectory Path tempDir) throws Exception { + void fourGossipServersWithOneSender(@VertxInstance Vertx vertx, @TempDirectory Path tempDir) throws Exception { + int numberOfMessages = 10000; + int sendingInterval = 20; GossipCommandLineOptions opts1 = new GossipCommandLineOptions( - new String[] {"tcp://127.0.0.1:9001", "tcp://127.0.0.1:9002"}, + new String[] {"tcp://127.0.0.1:9001", "tcp://127.0.0.1:9002", "tcp://127.0.0.1:9003"}, 9000, "127.0.0.1", tempDir.resolve("log1.log").toString(), 10000, + 500, + 20, + true, + numberOfMessages, null); GossipCommandLineOptions opts2 = new GossipCommandLineOptions( - new String[] {"tcp://127.0.0.1:9000", "tcp://127.0.0.1:9002"}, + new String[] {}, 9001, "127.0.0.1", tempDir.resolve("log2.log").toString(), 10001, + 0, + 0, + false, + 0, null); GossipCommandLineOptions opts3 = new GossipCommandLineOptions( - new String[] {"tcp://127.0.0.1:9000", "tcp://127.0.0.1:9001"}, + new String[] {"tcp://127.0.0.1:9003"}, 9002, "127.0.0.1", tempDir.resolve("log3.log").toString(), 10002, + 0, + 0, + false, + 0, + null); + GossipCommandLineOptions opts4 = new GossipCommandLineOptions( + new String[] {}, + 9003, + "127.0.0.1", + tempDir.resolve("log4.log").toString(), + 10003, + 0, + 0, + false, + 0, null); AtomicBoolean terminationRan = new AtomicBoolean(false); - ExecutorService service = Executors.newFixedThreadPool(3); + ExecutorService service = Executors.newFixedThreadPool(4); Future<GossipApp> app1Future = service.submit(() -> { GossipApp app = new GossipApp(vertx, opts1, System.err, System.out, () -> { @@ -88,43 +115,37 @@ class GossipIntegrationTest { app.start(); return app; }); + Future<GossipApp> app4Future = service.submit(() -> { + GossipApp app = new GossipApp(vertx, opts4, System.err, System.out, () -> { + terminationRan.set(true); + }); + app.start(); + return app; + }); GossipApp app1 = app1Future.get(10, TimeUnit.SECONDS); GossipApp app2 = app2Future.get(10, TimeUnit.SECONDS); GossipApp app3 = app3Future.get(10, TimeUnit.SECONDS); + GossipApp app4 = app4Future.get(10, TimeUnit.SECONDS); assertFalse(terminationRan.get()); - HttpClient client = vertx.createHttpClient(); - - for (int i = 0; i < 20; i++) { - client.request(HttpMethod.POST, 10000, "127.0.0.1", "/publish").exceptionHandler(thr -> { - throw new RuntimeException(thr); - }).handler(resp -> { - - }).end(Buffer.buffer(Bytes32.rightPad(Bytes.ofUnsignedInt(i)).toHexString().getBytes(StandardCharsets.UTF_8))); - } - - List<String> receiver1 = Collections.emptyList(); - - int counter = 0; - do { - Thread.sleep(1000); - counter++; - if (Files.exists(tempDir.resolve("log2.log"))) { - receiver1 = Files.readAllLines(tempDir.resolve("log2.log")); - } - } while (receiver1.size() < 20 && counter < 20); + Thread.sleep((long) (numberOfMessages * sendingInterval * 1.33)); - client.close(); service.submit(app1::stop); service.submit(app2::stop); service.submit(app3::stop); + service.submit(app4::stop); + List<String> receiver2 = Files.readAllLines(tempDir.resolve("log2.log")); + List<String> receiver3 = Files.readAllLines(tempDir.resolve("log3.log")); + List<String> receiver4 = Files.readAllLines(tempDir.resolve("log4.log")); + System.out.println("n2: " + receiver2.size() + "\n3: " + receiver3.size() + "\n4: " + receiver4.size()); + assertEquals(numberOfMessages, receiver2.size()); + assertEquals(numberOfMessages, receiver3.size()); + assertEquals(numberOfMessages, receiver4.size()); - assertEquals(20, receiver1.size()); - List<String> receiver2 = Files.readAllLines(tempDir.resolve("log3.log")); - assertEquals(20, receiver2.size()); + assertFalse(tempDir.resolve("log1.log").toFile().exists()); service.shutdown(); diff --git a/plumtree/src/main/java/org/apache/tuweni/plumtree/EphemeralPeerRepository.java b/plumtree/src/main/java/org/apache/tuweni/plumtree/EphemeralPeerRepository.java index 7d0f4f7..883ae2a 100644 --- a/plumtree/src/main/java/org/apache/tuweni/plumtree/EphemeralPeerRepository.java +++ b/plumtree/src/main/java/org/apache/tuweni/plumtree/EphemeralPeerRepository.java @@ -55,9 +55,10 @@ public final class EphemeralPeerRepository implements PeerRepository { } @Override - public void moveToLazy(Peer peer) { + public boolean moveToLazy(Peer peer) { eagerPushPeers.remove(peer); lazyPushPeers.add(peer); + return true; } @Override diff --git a/plumtree/src/main/java/org/apache/tuweni/plumtree/PeerPruning.java b/plumtree/src/main/java/org/apache/tuweni/plumtree/PeerPruning.java new file mode 100644 index 0000000..f33eaf1 --- /dev/null +++ b/plumtree/src/main/java/org/apache/tuweni/plumtree/PeerPruning.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.tuweni.plumtree; + +/** + * Interface to decide whether to prune peers when they send messages late. + * + * Pruned peers become "lazy peers". They send message attestations (IHAVE). + */ +public interface PeerPruning { + + /** + * Decides whether to prune a peer + * + * @param peer the peer to consider + * @return true if the peer should be pruned + */ + boolean prunePeer(Peer peer); +} diff --git a/plumtree/src/main/java/org/apache/tuweni/plumtree/PeerRepository.java b/plumtree/src/main/java/org/apache/tuweni/plumtree/PeerRepository.java index 9f0419c..d0f7f5d 100644 --- a/plumtree/src/main/java/org/apache/tuweni/plumtree/PeerRepository.java +++ b/plumtree/src/main/java/org/apache/tuweni/plumtree/PeerRepository.java @@ -54,8 +54,9 @@ public interface PeerRepository { * Moves a peer to the list of lazy peers * * @param peer the peer to move + * @return true if the move was effective */ - void moveToLazy(Peer peer); + boolean moveToLazy(Peer peer); /** * Moves a peer to the list of eager peers. diff --git a/plumtree/src/main/java/org/apache/tuweni/plumtree/State.java b/plumtree/src/main/java/org/apache/tuweni/plumtree/State.java index 26832ad..463583b 100644 --- a/plumtree/src/main/java/org/apache/tuweni/plumtree/State.java +++ b/plumtree/src/main/java/org/apache/tuweni/plumtree/State.java @@ -48,6 +48,7 @@ public final class State { private final MessageSender messageSender; private final Consumer<Bytes> messageListener; private final MessageValidator messageValidator; + private final PeerPruning peerPruningFunction; final Queue<Runnable> lazyQueue = new ConcurrentLinkedQueue<>(); private final Timer timer = new Timer("plumtree", true); private final long delay; @@ -66,7 +67,7 @@ public final class State { } /** - * Acts on receiving the full message + * Acts on receiving the full message. * * @param sender the sender - may be null if we are submitting this message to the network * @param message the payload to send to the network @@ -98,8 +99,9 @@ public final class State { } } else { if (sender != null) { - messageSender.sendMessage(MessageSender.Verb.PRUNE, null, sender, hash, null); - peerRepository.moveToLazy(sender); + if (peerPruningFunction.prunePeer(sender)) { + messageSender.sendMessage(MessageSender.Verb.PRUNE, null, sender, hash, null); + } } } } @@ -138,24 +140,35 @@ public final class State { * @param messageSender a function abstracting sending messages to other peers. * @param messageListener a function consuming messages when they are gossiped. * @param messageValidator a function validating messages before they are gossiped to other peers. + * @param peerPruningFunction a function deciding whether to prune peers. */ public State( PeerRepository peerRepository, MessageHashing messageHashingFunction, MessageSender messageSender, Consumer<Bytes> messageListener, - MessageValidator messageValidator) { - this(peerRepository, messageHashingFunction, messageSender, messageListener, messageValidator, 5000, 5000); + MessageValidator messageValidator, + PeerPruning peerPruningFunction) { + this( + peerRepository, + messageHashingFunction, + messageSender, + messageListener, + messageValidator, + peerPruningFunction, + 5000, + 5000); } /** - * Constructor using default time constants. + * Default constructor. * * @param peerRepository the peer repository to use to store and access peer information. * @param messageHashingFunction the function to use to hash messages into hashes to compare them. * @param messageSender a function abstracting sending messages to other peers. * @param messageListener a function consuming messages when they are gossiped. * @param messageValidator a function validating messages before they are gossiped to other peers. + * @param peerPruningFunction a function deciding whether to prune peers. * @param graftDelay delay in milliseconds to apply before this peer grafts an other peer when it finds that peer has * data it misses. * @param lazyQueueInterval the interval in milliseconds between sending messages to lazy peers. @@ -166,6 +179,7 @@ public final class State { MessageSender messageSender, Consumer<Bytes> messageListener, MessageValidator messageValidator, + PeerPruning peerPruningFunction, long graftDelay, long lazyQueueInterval) { this.peerRepository = peerRepository; @@ -173,6 +187,7 @@ public final class State { this.messageSender = messageSender; this.messageListener = messageListener; this.messageValidator = messageValidator; + this.peerPruningFunction = peerPruningFunction; this.delay = graftDelay; timer.schedule(new TimerTask() { @Override @@ -208,9 +223,13 @@ public final class State { * @param attributes of the message * @param message the hash of the message */ - public void receiveGossipMessage(Peer peer, String attributes, Bytes message) { + public void receiveGossipMessage(Peer peer, String attributes, Bytes message, Bytes messageHash) { + Bytes checkHash = messageHashingFunction.hash(message); + if (!checkHash.equals(messageHash)) { + return; + } peerRepository.considerNewPeer(peer); - MessageHandler handler = messageHandlers.computeIfAbsent(messageHashingFunction.hash(message), MessageHandler::new); + MessageHandler handler = messageHandlers.computeIfAbsent(messageHash, MessageHandler::new); handler.fullMessageReceived(peer, attributes, message); } @@ -226,7 +245,7 @@ public final class State { } /** - * Requests a peer be pruned away from the eager peers into the lazy peers + * Requests a peer be pruned away from the eager peers into the lazy peers. * * @param peer the peer to move to lazy peers */ @@ -235,7 +254,7 @@ public final class State { } /** - * Requests a peer be grafted to the eager peers list + * Requests a peer be grafted to the eager peers list. * * @param peer the peer to add to the eager peers * @param messageHash the hash of the message that triggers this grafting diff --git a/plumtree/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java b/plumtree/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java index dce838d..5d8e75a 100644 --- a/plumtree/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java +++ b/plumtree/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java @@ -19,6 +19,7 @@ import org.apache.tuweni.plumtree.MessageHashing; import org.apache.tuweni.plumtree.MessageSender; import org.apache.tuweni.plumtree.MessageValidator; import org.apache.tuweni.plumtree.Peer; +import org.apache.tuweni.plumtree.PeerPruning; import org.apache.tuweni.plumtree.PeerRepository; import org.apache.tuweni.plumtree.State; @@ -78,10 +79,14 @@ public final class VertxGossipServer { switch (message.verb) { case IHAVE: - state.receiveIHaveMessage(peer, Bytes.fromHexString(message.payload)); + state.receiveIHaveMessage(peer, Bytes.fromHexString(message.hash)); break; case GOSSIP: - state.receiveGossipMessage(peer, message.attributes, Bytes.fromHexString(message.payload)); + state.receiveGossipMessage( + peer, + message.attributes, + Bytes.fromHexString(message.payload), + Bytes.fromHexString(message.hash)); break; case GRAFT: state.receiveGraftMessage(peer, Bytes.fromHexString(message.payload)); @@ -98,17 +103,20 @@ public final class VertxGossipServer { } } - private final AtomicBoolean started = new AtomicBoolean(false); - private final Vertx vertx; - private final int port; + private NetClient client; + private final int graftDelay; + private final int lazyQueueInterval; private final MessageHashing messageHashing; private final String networkInterface; - private final PeerRepository peerRepository; private final Consumer<Bytes> payloadListener; private final MessageValidator payloadValidator; - private State state; + private final PeerPruning peerPruningFunction; + private final PeerRepository peerRepository; + private final int port; private NetServer server; - private NetClient client; + private final AtomicBoolean started = new AtomicBoolean(false); + private State state; + private final Vertx vertx; public VertxGossipServer( Vertx vertx, @@ -117,7 +125,10 @@ public final class VertxGossipServer { MessageHashing messageHashing, PeerRepository peerRepository, Consumer<Bytes> payloadListener, - @Nullable MessageValidator payloadValidator) { + @Nullable MessageValidator payloadValidator, + @Nullable PeerPruning peerPruningFunction, + int graftDelay, + int lazyQueueInterval) { this.vertx = vertx; this.networkInterface = networkInterface; this.port = port; @@ -125,6 +136,9 @@ public final class VertxGossipServer { this.peerRepository = peerRepository; this.payloadListener = payloadListener; this.payloadValidator = payloadValidator == null ? (bytes, peer) -> true : payloadValidator; + this.peerPruningFunction = peerPruningFunction == null ? (peer) -> true : peerPruningFunction; + this.graftDelay = graftDelay; + this.lazyQueueInterval = lazyQueueInterval; } public AsyncCompletion start() { @@ -135,24 +149,27 @@ public final class VertxGossipServer { server.connectHandler(socket -> { Peer peer = new SocketPeer(socket); SocketHandler handler = new SocketHandler(peer); - socket.handler(handler::handle).closeHandler(handler::close); + socket.handler(handler::handle).closeHandler(handler::close).exceptionHandler(Throwable::printStackTrace); }); + server.exceptionHandler(Throwable::printStackTrace); server.listen(port, networkInterface, res -> { if (res.failed()) { completion.completeExceptionally(res.cause()); } else { state = new State(peerRepository, messageHashing, (verb, attributes, peer, hash, payload) -> { - Message message = new Message(); - message.verb = verb; - message.attributes = attributes; - message.hash = hash.toHexString(); - message.payload = payload == null ? null : payload.toHexString(); - try { - ((SocketPeer) peer).socket().write(Buffer.buffer(mapper.writeValueAsBytes(message))); - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } - }, payloadListener, payloadValidator); + vertx.runOnContext(handler -> { + Message message = new Message(); + message.verb = verb; + message.attributes = attributes; + message.hash = hash.toHexString(); + message.payload = payload == null ? null : payload.toHexString(); + try { + ((SocketPeer) peer).socket().write(Buffer.buffer(mapper.writeValueAsBytes(message))); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + }); + }, payloadListener, payloadValidator, peerPruningFunction, graftDelay, lazyQueueInterval); completion.complete(); } diff --git a/plumtree/src/test/java/org/apache/tuweni/plumtree/StateTest.java b/plumtree/src/test/java/org/apache/tuweni/plumtree/StateTest.java index b171ee8..01a31cc 100644 --- a/plumtree/src/test/java/org/apache/tuweni/plumtree/StateTest.java +++ b/plumtree/src/test/java/org/apache/tuweni/plumtree/StateTest.java @@ -56,7 +56,13 @@ class StateTest { @Test void testInitialState() { EphemeralPeerRepository repo = new EphemeralPeerRepository(); - State state = new State(repo, Hash::keccak256, new MockMessageSender(), messageRef::set, (message, peer) -> true); + State state = new State( + repo, + Hash::keccak256, + new MockMessageSender(), + messageRef::set, + (message, peer) -> true, + (peer) -> true); assertTrue(repo.peers().isEmpty()); assertTrue(repo.lazyPushPeers().isEmpty()); assertTrue(repo.eagerPushPeers().isEmpty()); @@ -65,7 +71,13 @@ class StateTest { @Test void firstRoundWithThreePeers() { EphemeralPeerRepository repo = new EphemeralPeerRepository(); - State state = new State(repo, Hash::keccak256, new MockMessageSender(), messageRef::set, (message, peer) -> true); + State state = new State( + repo, + Hash::keccak256, + new MockMessageSender(), + messageRef::set, + (message, peer) -> true, + (peer) -> true); state.addPeer(new PeerImpl()); state.addPeer(new PeerImpl()); state.addPeer(new PeerImpl()); @@ -76,7 +88,13 @@ class StateTest { @Test void firstRoundWithTwoPeers() { EphemeralPeerRepository repo = new EphemeralPeerRepository(); - State state = new State(repo, Hash::keccak256, new MockMessageSender(), messageRef::set, (message, peer) -> true); + State state = new State( + repo, + Hash::keccak256, + new MockMessageSender(), + messageRef::set, + (message, peer) -> true, + (peer) -> true); state.addPeer(new PeerImpl()); state.addPeer(new PeerImpl()); assertTrue(repo.lazyPushPeers().isEmpty()); @@ -86,7 +104,13 @@ class StateTest { @Test void firstRoundWithOnePeer() { EphemeralPeerRepository repo = new EphemeralPeerRepository(); - State state = new State(repo, Hash::keccak256, new MockMessageSender(), messageRef::set, (message, peer) -> true); + State state = new State( + repo, + Hash::keccak256, + new MockMessageSender(), + messageRef::set, + (message, peer) -> true, + (peer) -> true); state.addPeer(new PeerImpl()); assertTrue(repo.lazyPushPeers().isEmpty()); assertEquals(1, repo.eagerPushPeers().size()); @@ -95,7 +119,13 @@ class StateTest { @Test void removePeer() { EphemeralPeerRepository repo = new EphemeralPeerRepository(); - State state = new State(repo, Hash::keccak256, new MockMessageSender(), messageRef::set, (message, peer) -> true); + State state = new State( + repo, + Hash::keccak256, + new MockMessageSender(), + messageRef::set, + (message, peer) -> true, + (peer) -> true); Peer peer = new PeerImpl(); state.addPeer(peer); state.removePeer(peer); @@ -107,7 +137,13 @@ class StateTest { @Test void prunePeer() { EphemeralPeerRepository repo = new EphemeralPeerRepository(); - State state = new State(repo, Hash::keccak256, new MockMessageSender(), messageRef::set, (message, peer) -> true); + State state = new State( + repo, + Hash::keccak256, + new MockMessageSender(), + messageRef::set, + (message, peer) -> true, + (peer) -> true); Peer peer = new PeerImpl(); state.addPeer(peer); state.receivePruneMessage(peer); @@ -118,7 +154,13 @@ class StateTest { @Test void graftPeer() { EphemeralPeerRepository repo = new EphemeralPeerRepository(); - State state = new State(repo, Hash::keccak256, new MockMessageSender(), messageRef::set, (message, peer) -> true); + State state = new State( + repo, + Hash::keccak256, + new MockMessageSender(), + messageRef::set, + (message, peer) -> true, + (peer) -> true); Peer peer = new PeerImpl(); state.addPeer(peer); state.receivePruneMessage(peer); @@ -133,14 +175,15 @@ class StateTest { void receiveFullMessageFromEagerPeer() { EphemeralPeerRepository repo = new EphemeralPeerRepository(); MockMessageSender messageSender = new MockMessageSender(); - State state = new State(repo, Hash::keccak256, messageSender, messageRef::set, (message, peer) -> true); + State state = + new State(repo, Hash::keccak256, messageSender, messageRef::set, (message, peer) -> true, (peer) -> true); Peer peer = new PeerImpl(); state.addPeer(peer); Peer otherPeer = new PeerImpl(); state.addPeer(otherPeer); Bytes32 msg = Bytes32.random(); String attributes = "{\"message_type\": \"BLOCK\"}"; - state.receiveGossipMessage(peer, attributes, msg); + state.receiveGossipMessage(peer, attributes, msg, Hash.keccak256(msg)); assertEquals(msg, messageSender.payload); assertEquals(otherPeer, messageSender.peer); } @@ -149,7 +192,8 @@ class StateTest { void receiveFullMessageFromEagerPeerWithALazyPeer() { EphemeralPeerRepository repo = new EphemeralPeerRepository(); MockMessageSender messageSender = new MockMessageSender(); - State state = new State(repo, Hash::keccak256, messageSender, messageRef::set, (message, peer) -> true); + State state = + new State(repo, Hash::keccak256, messageSender, messageRef::set, (message, peer) -> true, (peer) -> true); Peer peer = new PeerImpl(); state.addPeer(peer); Peer otherPeer = new PeerImpl(); @@ -159,7 +203,7 @@ class StateTest { state.addPeer(lazyPeer); repo.moveToLazy(lazyPeer); String attributes = "{\"message_type\": \"BLOCK\"}"; - state.receiveGossipMessage(peer, attributes, msg); + state.receiveGossipMessage(peer, attributes, msg, Hash.keccak256(msg)); assertEquals(msg, messageSender.payload); assertEquals(otherPeer, messageSender.peer); state.processQueue(); @@ -173,7 +217,8 @@ class StateTest { void receiveFullMessageFromEagerPeerThenPartialMessageFromLazyPeer() { EphemeralPeerRepository repo = new EphemeralPeerRepository(); MockMessageSender messageSender = new MockMessageSender(); - State state = new State(repo, Hash::keccak256, messageSender, messageRef::set, (message, peer) -> true); + State state = + new State(repo, Hash::keccak256, messageSender, messageRef::set, (message, peer) -> true, (peer) -> true); Peer peer = new PeerImpl(); state.addPeer(peer); Peer lazyPeer = new PeerImpl(); @@ -181,7 +226,7 @@ class StateTest { repo.moveToLazy(lazyPeer); Bytes message = Bytes32.random(); String attributes = "{\"message_type\": \"BLOCK\"}"; - state.receiveGossipMessage(peer, attributes, message); + state.receiveGossipMessage(peer, attributes, message, Hash.keccak256(message)); state.receiveIHaveMessage(lazyPeer, message); assertNull(messageSender.payload); assertNull(messageSender.peer); @@ -191,7 +236,15 @@ class StateTest { void receivePartialMessageFromLazyPeerAndNoFullMessage() throws Exception { EphemeralPeerRepository repo = new EphemeralPeerRepository(); MockMessageSender messageSender = new MockMessageSender(); - State state = new State(repo, Hash::keccak256, messageSender, messageRef::set, (message, peer) -> true, 100, 4000); + State state = new State( + repo, + Hash::keccak256, + messageSender, + messageRef::set, + (message, peer) -> true, + (peer) -> true, + 100, + 4000); Peer peer = new PeerImpl(); state.addPeer(peer); Peer lazyPeer = new PeerImpl(); @@ -209,7 +262,15 @@ class StateTest { void receivePartialMessageFromLazyPeerAndThenFullMessage() throws Exception { EphemeralPeerRepository repo = new EphemeralPeerRepository(); MockMessageSender messageSender = new MockMessageSender(); - State state = new State(repo, Hash::keccak256, messageSender, messageRef::set, (message, peer) -> true, 500, 4000); + State state = new State( + repo, + Hash::keccak256, + messageSender, + messageRef::set, + (message, peer) -> true, + (peer) -> true, + 500, + 4000); Peer peer = new PeerImpl(); state.addPeer(peer); Peer lazyPeer = new PeerImpl(); @@ -219,7 +280,7 @@ class StateTest { state.receiveIHaveMessage(lazyPeer, Hash.keccak256(message)); Thread.sleep(100); String attributes = "{\"message_type\": \"BLOCK\"}"; - state.receiveGossipMessage(peer, attributes, message); + state.receiveGossipMessage(peer, attributes, message, Hash.keccak256(message)); Thread.sleep(500); assertNull(messageSender.verb); assertNull(messageSender.payload); @@ -230,11 +291,12 @@ class StateTest { void receiveFullMessageFromUnknownPeer() { EphemeralPeerRepository repo = new EphemeralPeerRepository(); MockMessageSender messageSender = new MockMessageSender(); - State state = new State(repo, Hash::keccak256, messageSender, messageRef::set, (message, peer) -> true); + State state = + new State(repo, Hash::keccak256, messageSender, messageRef::set, (message, peer) -> true, (peer) -> true); Peer peer = new PeerImpl(); Bytes message = Bytes32.random(); String attributes = "{\"message_type\": \"BLOCK\"}"; - state.receiveGossipMessage(peer, attributes, message); + state.receiveGossipMessage(peer, attributes, message, Hash.keccak256(message)); assertEquals(1, repo.eagerPushPeers().size()); assertEquals(0, repo.lazyPushPeers().size()); assertEquals(peer, repo.eagerPushPeers().iterator().next()); @@ -244,15 +306,16 @@ class StateTest { void prunePeerWhenReceivingTwiceTheSameFullMessage() { EphemeralPeerRepository repo = new EphemeralPeerRepository(); MockMessageSender messageSender = new MockMessageSender(); - State state = new State(repo, Hash::keccak256, messageSender, messageRef::set, (message, peer) -> true); + State state = + new State(repo, Hash::keccak256, messageSender, messageRef::set, (message, peer) -> true, (peer) -> true); Peer peer = new PeerImpl(); Peer secondPeer = new PeerImpl(); Bytes message = Bytes32.random(); String attributes = "{\"message_type\": \"BLOCK\"}"; - state.receiveGossipMessage(peer, attributes, message); - state.receiveGossipMessage(secondPeer, attributes, message); - assertEquals(1, repo.eagerPushPeers().size()); - assertEquals(1, repo.lazyPushPeers().size()); + state.receiveGossipMessage(peer, attributes, message, Hash.keccak256(message)); + state.receiveGossipMessage(secondPeer, attributes, message, Hash.keccak256(message)); + assertEquals(2, repo.eagerPushPeers().size()); + assertEquals(0, repo.lazyPushPeers().size()); assertNull(messageSender.payload); assertEquals(secondPeer, messageSender.peer); assertEquals(MessageSender.Verb.PRUNE, messageSender.verb); diff --git a/plumtree/src/test/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java b/plumtree/src/test/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java index d5dd5d8..29ae06c 100644 --- a/plumtree/src/test/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java +++ b/plumtree/src/test/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java @@ -43,7 +43,10 @@ class VertxGossipServerTest { bytes -> bytes, new EphemeralPeerRepository(), messageReceived1::set, - (message, peer) -> true); + (message, peer) -> true, + null, + 200, + 200); VertxGossipServer server2 = new VertxGossipServer( vertx, "127.0.0.1", @@ -51,7 +54,10 @@ class VertxGossipServerTest { bytes -> bytes, new EphemeralPeerRepository(), messageReceived2::set, - (message, peer) -> true); + (message, peer) -> true, + null, + 200, + 200); server1.start().join(); server2.start().join(); @@ -85,7 +91,10 @@ class VertxGossipServerTest { bytes -> bytes, new EphemeralPeerRepository(), messageReceived1::set, - (message, peer) -> true); + (message, peer) -> true, + null, + 200, + 200); VertxGossipServer server2 = new VertxGossipServer( vertx, "127.0.0.1", @@ -93,7 +102,10 @@ class VertxGossipServerTest { bytes -> bytes, new EphemeralPeerRepository(), messageReceived2::set, - (message, peer) -> true); + (message, peer) -> true, + null, + 200, + 200); VertxGossipServer server3 = new VertxGossipServer( vertx, "127.0.0.1", @@ -101,7 +113,10 @@ class VertxGossipServerTest { bytes -> bytes, new EphemeralPeerRepository(), messageReceived3::set, - (message, peer) -> true); + (message, peer) -> true, + null, + 200, + 200); server1.start().join(); server2.start().join(); @@ -141,7 +156,10 @@ class VertxGossipServerTest { bytes -> bytes, peerRepository1, messageReceived1::set, - (message, peer) -> true); + (message, peer) -> true, + null, + 200, + 200); VertxGossipServer server2 = new VertxGossipServer( vertx, "127.0.0.1", @@ -149,7 +167,10 @@ class VertxGossipServerTest { bytes -> bytes, new EphemeralPeerRepository(), messageReceived2::set, - (message, peer) -> true); + (message, peer) -> true, + null, + 200, + 200); VertxGossipServer server3 = new VertxGossipServer( vertx, "127.0.0.1", @@ -157,7 +178,10 @@ class VertxGossipServerTest { bytes -> bytes, peerRepository3, messageReceived2::set, - (message, peer) -> true); + (message, peer) -> true, + null, + 200, + 200); server1.start().join(); server2.start().join(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@tuweni.apache.org For additional commands, e-mail: commits-h...@tuweni.apache.org