This is an automated email from the ASF dual-hosted git repository.

toulmean pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git


The following commit(s) were added to refs/heads/master by this push:
     new 39713a2  Add more flexibility to plumtree, allowing peer pruning management
39713a2 is described below

commit 39713a2e638b419bc79d494528067eb8866af14f
Author: Antoine Toulme <anto...@lunar-ocean.com>
AuthorDate: Wed May 29 21:53:03 2019 -0700

    Add more flexibility to plumtree, allowing peer pruning management
---
 .../tuweni/gossip/CountingPeerPruningFunction.java |  47 +++++++++
 .../java/org/apache/tuweni/gossip/GossipApp.java   |  46 +++++++--
 .../tuweni/gossip/GossipCommandLineOptions.java    |  94 ++++++++++++++++--
 .../tuweni/gossip/LoggingPeerRepository.java       |  27 +++--
 .../gossip/GossipCommandLineOptionsTest.java       |  47 +++++++--
 .../tuweni/gossip/GossipIntegrationTest.java       |  12 +++
 ...sipIntegrationTest.java => GossipLoadTest.java} |  97 +++++++++++-------
 .../tuweni/plumtree/EphemeralPeerRepository.java   |   3 +-
 .../org/apache/tuweni/plumtree/PeerPruning.java    |  29 ++++++
 .../org/apache/tuweni/plumtree/PeerRepository.java |   3 +-
 .../java/org/apache/tuweni/plumtree/State.java     |  39 ++++++--
 .../tuweni/plumtree/vertx/VertxGossipServer.java   |  59 +++++++----
 .../java/org/apache/tuweni/plumtree/StateTest.java | 109 ++++++++++++++++-----
 .../plumtree/vertx/VertxGossipServerTest.java      |  40 ++++++--
 14 files changed, 518 insertions(+), 134 deletions(-)

diff --git 
a/gossip/src/main/java/org/apache/tuweni/gossip/CountingPeerPruningFunction.java
 
b/gossip/src/main/java/org/apache/tuweni/gossip/CountingPeerPruningFunction.java
new file mode 100644
index 0000000..f7307f4
--- /dev/null
+++ 
b/gossip/src/main/java/org/apache/tuweni/gossip/CountingPeerPruningFunction.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.tuweni.gossip;
+
+import org.apache.tuweni.plumtree.Peer;
+import org.apache.tuweni.plumtree.PeerPruning;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.WeakHashMap;
+
+/**
+ * Function counting the number of times a peer showed up, pruning it once a configured count is reached.
+ */
+final class CountingPeerPruningFunction implements PeerPruning {
+
+  private final int numberOfOccurrences;
+  private final Map<Peer, Integer> countingOccurrences = 
Collections.synchronizedMap(new WeakHashMap<>());
+
+  public CountingPeerPruningFunction(int numberOfOccurrences) {
+    this.numberOfOccurrences = numberOfOccurrences;
+  }
+
+  @Override
+  public boolean prunePeer(Peer peer) {
+    Integer currentValue = countingOccurrences.putIfAbsent(peer, 1);
+    if (currentValue != null) {
+      if (currentValue + 1 >= numberOfOccurrences) {
+        countingOccurrences.remove(peer);
+        return true;
+      }
+      countingOccurrences.put(peer, currentValue + 1);
+    }
+
+    return false;
+  }
+}
diff --git a/gossip/src/main/java/org/apache/tuweni/gossip/GossipApp.java 
b/gossip/src/main/java/org/apache/tuweni/gossip/GossipApp.java
index 173ec7b..f69e12b 100644
--- a/gossip/src/main/java/org/apache/tuweni/gossip/GossipApp.java
+++ b/gossip/src/main/java/org/apache/tuweni/gossip/GossipApp.java
@@ -17,7 +17,6 @@ import org.apache.tuweni.bytes.Bytes;
 import org.apache.tuweni.concurrent.AsyncCompletion;
 import org.apache.tuweni.concurrent.CompletableAsyncCompletion;
 import org.apache.tuweni.crypto.Hash;
-import org.apache.tuweni.plumtree.EphemeralPeerRepository;
 import org.apache.tuweni.plumtree.vertx.VertxGossipServer;
 
 import java.io.IOException;
@@ -31,6 +30,9 @@ import java.security.Security;
 import java.time.Instant;
 import java.util.Collections;
 import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -68,8 +70,14 @@ public final class GossipApp {
     gossipApp.start();
   }
 
-
-
+  private final ExecutorService senderThreadPool = 
Executors.newSingleThreadExecutor(new ThreadFactory() {
+    @Override
+    public Thread newThread(Runnable r) {
+      Thread t = new Thread(r, "sender");
+      t.setDaemon(false);
+      return t;
+    }
+  });
   private final GossipCommandLineOptions opts;
   private final Runnable terminateFunction;
   private final PrintStream errStream;
@@ -83,7 +91,7 @@ public final class GossipApp {
       PrintStream errStream,
       PrintStream outStream,
       Runnable terminateFunction) {
-    EphemeralPeerRepository repository = new EphemeralPeerRepository();
+    LoggingPeerRepository repository = new LoggingPeerRepository(outStream);
     outStream.println("Setting up server on " + opts.networkInterface() + ":" 
+ opts.listenPort());
     server = new VertxGossipServer(
         vertx,
@@ -92,7 +100,10 @@ public final class GossipApp {
         Hash::keccak256,
         repository,
         bytes -> readMessage(opts.messageLog(), errStream, bytes),
-        null);
+        null,
+        new CountingPeerPruningFunction(10),
+        100,
+        100);
     this.opts = opts;
     this.errStream = errStream;
     this.outStream = outStream;
@@ -135,13 +146,33 @@ public final class GossipApp {
       errStream.println("Server could not connect to other peers: " + 
e.getMessage());
     }
     outStream.println("Gossip started");
+
+    if (opts.sending()) {
+      outStream.println("Start sending messages");
+      senderThreadPool.submit(() -> {
+        for (int i = 0; i < opts.numberOfMessages(); i++) {
+          if (Thread.currentThread().isInterrupted()) {
+            return;
+          }
+          Bytes payload = Bytes.random(opts.payloadSize());
+          publish(payload);
+          try {
+            Thread.sleep(opts.sendInterval());
+          } catch (InterruptedException e) {
+            return;
+          }
+        }
+      });
+    }
   }
 
   private void handleRPCRequest(HttpServerRequest httpServerRequest) {
     if (HttpMethod.POST.equals(httpServerRequest.method())) {
       if ("/publish".equals(httpServerRequest.path())) {
         httpServerRequest.bodyHandler(body -> {
-          publish(Bytes.wrapBuffer(body));
+          Bytes message = Bytes.wrapBuffer(body);
+          outStream.println("Message to publish " + message.toHexString());
+          publish(message);
           httpServerRequest.response().setStatusCode(200).end();
         });
       } else {
@@ -153,6 +184,8 @@ public final class GossipApp {
   }
 
   void stop() {
+    outStream.println("Stopping sending");
+    senderThreadPool.shutdown();
     outStream.println("Stopping gossip");
     try {
       server.stop().join();
@@ -196,7 +229,6 @@ public final class GossipApp {
   }
 
   public void publish(Bytes message) {
-    outStream.println("Message to publish " + message.toHexString());
     server.gossip("", message);
   }
 }
diff --git 
a/gossip/src/main/java/org/apache/tuweni/gossip/GossipCommandLineOptions.java 
b/gossip/src/main/java/org/apache/tuweni/gossip/GossipCommandLineOptions.java
index 04a6f35..e1e776e 100644
--- 
a/gossip/src/main/java/org/apache/tuweni/gossip/GossipCommandLineOptions.java
+++ 
b/gossip/src/main/java/org/apache/tuweni/gossip/GossipCommandLineOptions.java
@@ -41,6 +41,14 @@ final class GossipCommandLineOptions {
         .addString("networkInterface", "0.0.0.0", "Network interface to bind 
to", null)
         .addListOfString("peers", Collections.emptyList(), "Static peers 
list", null)
         .addString("messagelog", "messages.log", "Log file where messages are 
stored", null)
+        .addBoolean("sending", false, "Whether this peer sends random messages 
to all other peers (load testing)", null)
+        .addInteger(
+            "sendInterval",
+            1000,
+            "Interval to wait in between sending messages in milliseconds 
(load testing)",
+            null)
+        .addInteger("numberOfMessages", 100, "Number of messages to publish to 
other peers (load testing)", null)
+        .addInteger("payloadSize", 200, "Size of the random payload to send to 
other peers (load testing)", null)
         .toSchema();
   }
 
@@ -62,6 +70,21 @@ final class GossipCommandLineOptions {
   @CommandLine.Option(names = {"-m", "--messageLog"} , description = "Log file 
where messages are stored")
   private String messageLog;
 
+  @CommandLine.Option(names = {"--sendInterval"} ,
+      description = "Interval to wait in between sending messages in 
milliseconds (load testing)")
+  private Integer sendInterval;
+
+  @CommandLine.Option(names = {"--payloadSize"} ,
+      description = "Size of the random payload to send to other peers (load 
testing)")
+  private Integer payloadSize;
+
+  @CommandLine.Option(names = {"--numberOfMessages"} , description = "Number 
of messages to publish (load testing)")
+  private Integer numberOfMessages;
+
+  @CommandLine.Option(names = {"--sending"} ,
+      description = "Whether this peer sends random messages to all other 
peers (load testing)")
+  private Boolean sending;
+
   @CommandLine.Option(names = {"-h", "--help"} , description = "Prints usage 
prompt")
   private boolean help;
 
@@ -70,12 +93,19 @@ final class GossipCommandLineOptions {
 
   GossipCommandLineOptions() {}
 
+  /**
+   * Constructor used for testing.
+   */
   GossipCommandLineOptions(
       String[] peers,
       Integer port,
       String networkInterface,
       String messageLog,
       Integer rpcPort,
+      Integer payloadSize,
+      Integer sendInterval,
+      Boolean sending,
+      Integer numberOfMessages,
       Configuration config) {
     this.peers = peers;
     this.port = port;
@@ -83,6 +113,10 @@ final class GossipCommandLineOptions {
     this.messageLog = messageLog;
     this.rpcPort = rpcPort;
     this.config = config;
+    this.payloadSize = payloadSize;
+    this.sendInterval = sendInterval;
+    this.numberOfMessages = numberOfMessages;
+    this.sending = sending;
   }
 
   private Configuration config() {
@@ -101,20 +135,12 @@ final class GossipCommandLineOptions {
       peerAddresses = new ArrayList<>();
       if (peers != null) {
         for (String peer : peers) {
-          URI peerURI = URI.create(peer);
-          if (peerURI.getHost() == null) {
-            throw new IllegalArgumentException("Invalid peer URI " + peerURI);
-          }
-          peerAddresses.add(peerURI);
+          readPeerInfo(peer);
         }
       } else {
         if (config() != null) {
           for (String peer : config().getListOfString("peers")) {
-            URI peerURI = URI.create(peer);
-            if (peerURI.getHost() == null) {
-              throw new IllegalArgumentException("Invalid peer URI " + 
peerURI);
-            }
-            peerAddresses.add(peerURI);
+            readPeerInfo(peer);
           }
         }
       }
@@ -122,6 +148,14 @@ final class GossipCommandLineOptions {
     return peerAddresses;
   }
 
+  private void readPeerInfo(String peer) {
+    URI peerURI = URI.create(peer);
+    if (peerURI.getHost() == null) {
+      throw new IllegalArgumentException("Invalid peer URI " + peerURI);
+    }
+    peerAddresses.add(peerURI);
+  }
+
   void validate() {
     int listenPort = listenPort();
     if (listenPort < 0 || listenPort > 65535) {
@@ -187,6 +221,46 @@ final class GossipCommandLineOptions {
     return "messages.log";
   }
 
+  boolean sending() {
+    if (sending != null) {
+      return sending;
+    }
+    if (config != null) {
+      return config.getBoolean("sending");
+    }
+    return false;
+  }
+
+  Integer payloadSize() {
+    if (payloadSize != null) {
+      return payloadSize;
+    }
+    if (config != null) {
+      return config.getInteger("payloadSize");
+    }
+    return 200;
+  }
+
+  Integer sendInterval() {
+    if (sendInterval != null) {
+      return sendInterval;
+    }
+    if (config != null) {
+      return config.getInteger("sendInterval");
+    }
+    return 1000;
+  }
+
+  Integer numberOfMessages() {
+    if (numberOfMessages != null) {
+      return numberOfMessages;
+    }
+    if (config != null) {
+      return config.getInteger("numberOfMessages");
+    }
+    return 100;
+  }
+
   boolean help() {
     return help;
   }
diff --git 
a/plumtree/src/main/java/org/apache/tuweni/plumtree/EphemeralPeerRepository.java
 b/gossip/src/main/java/org/apache/tuweni/gossip/LoggingPeerRepository.java
similarity index 75%
copy from 
plumtree/src/main/java/org/apache/tuweni/plumtree/EphemeralPeerRepository.java
copy to gossip/src/main/java/org/apache/tuweni/gossip/LoggingPeerRepository.java
index 7d0f4f7..ad0a450 100644
--- 
a/plumtree/src/main/java/org/apache/tuweni/plumtree/EphemeralPeerRepository.java
+++ b/gossip/src/main/java/org/apache/tuweni/gossip/LoggingPeerRepository.java
@@ -10,21 +10,27 @@
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
  */
-package org.apache.tuweni.plumtree;
+package org.apache.tuweni.gossip;
 
+import org.apache.tuweni.plumtree.Peer;
+import org.apache.tuweni.plumtree.PeerRepository;
+
+import java.io.PrintStream;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
-/**
- * In-memory peer repository.
- */
-public final class EphemeralPeerRepository implements PeerRepository {
+final class LoggingPeerRepository implements PeerRepository {
 
   private final Set<Peer> eagerPushPeers = ConcurrentHashMap.newKeySet();
   private final Set<Peer> lazyPushPeers = ConcurrentHashMap.newKeySet();
+  private final PrintStream logger;
+
+  public LoggingPeerRepository(PrintStream logger) {
+    this.logger = logger;
+  }
 
   @Override
   public void addEager(Peer peer) {
@@ -50,18 +56,22 @@ public final class EphemeralPeerRepository implements 
PeerRepository {
 
   @Override
   public void removePeer(Peer peer) {
+    logger.println("Removing peer " + peer);
     lazyPushPeers.remove(peer);
     eagerPushPeers.remove(peer);
   }
 
   @Override
-  public void moveToLazy(Peer peer) {
+  public boolean moveToLazy(Peer peer) {
+    logger.println("Move peer to lazy " + peer);
     eagerPushPeers.remove(peer);
     lazyPushPeers.add(peer);
+    return true;
   }
 
   @Override
   public void moveToEager(Peer peer) {
+    logger.println("Move peer to eager " + peer);
     lazyPushPeers.remove(peer);
     eagerPushPeers.add(peer);
   }
@@ -69,8 +79,11 @@ public final class EphemeralPeerRepository implements 
PeerRepository {
   @Override
   public void considerNewPeer(Peer peer) {
     if (!lazyPushPeers.contains(peer)) {
-      eagerPushPeers.add(peer);
+      if (eagerPushPeers.add(peer)) {
+        logger.println("Added new peer " + peer);
+      }
     }
 
   }
+
 }
diff --git 
a/gossip/src/test/java/org/apache/tuweni/gossip/GossipCommandLineOptionsTest.java
 
b/gossip/src/test/java/org/apache/tuweni/gossip/GossipCommandLineOptionsTest.java
index cd6b885..64a8d77 100644
--- 
a/gossip/src/test/java/org/apache/tuweni/gossip/GossipCommandLineOptionsTest.java
+++ 
b/gossip/src/test/java/org/apache/tuweni/gossip/GossipCommandLineOptionsTest.java
@@ -26,20 +26,31 @@ class GossipCommandLineOptionsTest {
 
   @Test
   void testInvalidPort() {
-    GossipCommandLineOptions opts = new GossipCommandLineOptions(new 
String[0], -4, "0.0.0.0", null, 3, null);
+    GossipCommandLineOptions opts =
+        new GossipCommandLineOptions(new String[0], -4, "0.0.0.0", null, 3, 0, 
0, false, 50, null);
     assertThrows(IllegalArgumentException.class, opts::validate);
   }
 
   @Test
   void testInvalidPeer() {
-    GossipCommandLineOptions opts =
-        new GossipCommandLineOptions(new String[] 
{"tcp://400.300.200.100:9000"}, 10, "0.0.0.0", null, 3, null);
+    GossipCommandLineOptions opts = new GossipCommandLineOptions(
+        new String[] {"tcp://400.300.200.100:9000"},
+        10,
+        "0.0.0.0",
+        null,
+        3,
+        0,
+        0,
+        false,
+        50,
+        null);
     assertThrows(IllegalArgumentException.class, opts::validate);
   }
 
   @Test
   void testInvalidNetworkInterface() {
-    GossipCommandLineOptions opts = new GossipCommandLineOptions(new String[] 
{}, 10, "400.300.200.100", null, 3, null);
+    GossipCommandLineOptions opts =
+        new GossipCommandLineOptions(new String[] {}, 10, "400.300.200.100", 
null, 3, 0, 0, false, 50, null);
     assertThrows(IllegalArgumentException.class, opts::validate);
   }
 
@@ -52,7 +63,7 @@ class GossipCommandLineOptionsTest {
             + "networkInterface=\"127.0.0.1\"\n"
             + "messageLog=\"D:/Temp\"",
         GossipCommandLineOptions.createConfigFileSchema());
-    GossipCommandLineOptions opts = new GossipCommandLineOptions(null, null, 
null, null, 3000, config);
+    GossipCommandLineOptions opts = new GossipCommandLineOptions(null, null, 
null, null, 3000, 0, 0, false, 50, config);
     opts.validate();
     assertEquals(1080, opts.listenPort());
     assertEquals(1, opts.peers().size());
@@ -70,7 +81,18 @@ class GossipCommandLineOptionsTest {
             + "networkInterface=\"127.0.0.1\"\n"
             + "messageLog=\"D:/Temp\"",
         GossipCommandLineOptions.createConfigFileSchema());
-    GossipCommandLineOptions opts = new GossipCommandLineOptions(null, null, 
null, null, 3000, config);
+    GossipCommandLineOptions opts = new GossipCommandLineOptions(
+        null,
+        null,
+        null,
+        null,
+        3000,
+        0,
+        0,
+
+        false,
+        50,
+        config);
     assertThrows(IllegalArgumentException.class, opts::validate);
   }
 
@@ -82,8 +104,17 @@ class GossipCommandLineOptionsTest {
             + "listenPort=1080\n"
             + "networkInterface=\"127.0.0.1\"\n"
             + "messageLog=\"D:/Temp\"");
-    GossipCommandLineOptions opts =
-        new GossipCommandLineOptions(new String[] {"tcp://192.168.0.1:3000"}, 
400, "0.0.0.0", "C:/Temp", 3000, config);
+    GossipCommandLineOptions opts = new GossipCommandLineOptions(
+        new String[] {"tcp://192.168.0.1:3000"},
+        400,
+        "0.0.0.0",
+        "C:/Temp",
+        3000,
+        0,
+        0,
+        false,
+        50,
+        config);
     assertEquals(400, opts.listenPort());
     assertEquals(1, opts.peers().size());
     assertEquals(URI.create("tcp://192.168.0.1:3000"), opts.peers().get(0));
diff --git 
a/gossip/src/test/java/org/apache/tuweni/gossip/GossipIntegrationTest.java 
b/gossip/src/test/java/org/apache/tuweni/gossip/GossipIntegrationTest.java
index 8a0e31f..73a9d8d 100644
--- a/gossip/src/test/java/org/apache/tuweni/gossip/GossipIntegrationTest.java
+++ b/gossip/src/test/java/org/apache/tuweni/gossip/GossipIntegrationTest.java
@@ -48,6 +48,10 @@ class GossipIntegrationTest {
         "127.0.0.1",
         tempDir.resolve("log1.log").toString(),
         10000,
+        0,
+        0,
+        false,
+        50,
         null);
     GossipCommandLineOptions opts2 = new GossipCommandLineOptions(
         new String[] {"tcp://127.0.0.1:9000", "tcp://127.0.0.1:9002"},
@@ -55,6 +59,10 @@ class GossipIntegrationTest {
         "127.0.0.1",
         tempDir.resolve("log2.log").toString(),
         10001,
+        0,
+        0,
+        false,
+        50,
         null);
     GossipCommandLineOptions opts3 = new GossipCommandLineOptions(
         new String[] {"tcp://127.0.0.1:9000", "tcp://127.0.0.1:9001"},
@@ -62,6 +70,10 @@ class GossipIntegrationTest {
         "127.0.0.1",
         tempDir.resolve("log3.log").toString(),
         10002,
+        0,
+        0,
+        false,
+        50,
         null);
     AtomicBoolean terminationRan = new AtomicBoolean(false);
 
diff --git 
a/gossip/src/test/java/org/apache/tuweni/gossip/GossipIntegrationTest.java 
b/gossip/src/test/java/org/apache/tuweni/gossip/GossipLoadTest.java
similarity index 61%
copy from 
gossip/src/test/java/org/apache/tuweni/gossip/GossipIntegrationTest.java
copy to gossip/src/test/java/org/apache/tuweni/gossip/GossipLoadTest.java
index 8a0e31f..81b23b2 100644
--- a/gossip/src/test/java/org/apache/tuweni/gossip/GossipIntegrationTest.java
+++ b/gossip/src/test/java/org/apache/tuweni/gossip/GossipLoadTest.java
@@ -15,14 +15,14 @@ package org.apache.tuweni.gossip;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 
-import org.apache.tuweni.bytes.Bytes;
-import org.apache.tuweni.bytes.Bytes32;
-import org.apache.tuweni.junit.*;
+import org.apache.tuweni.junit.BouncyCastleExtension;
+import org.apache.tuweni.junit.TempDirectory;
+import org.apache.tuweni.junit.TempDirectoryExtension;
+import org.apache.tuweni.junit.VertxExtension;
+import org.apache.tuweni.junit.VertxInstance;
 
-import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
-import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -31,41 +31,68 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import io.vertx.core.Vertx;
-import io.vertx.core.buffer.Buffer;
-import io.vertx.core.http.HttpClient;
-import io.vertx.core.http.HttpMethod;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 
+/**
+ * Very basic load test scenario in one JVM.
+ */
 @ExtendWith({VertxExtension.class, TempDirectoryExtension.class, 
BouncyCastleExtension.class})
-class GossipIntegrationTest {
+class GossipLoadTest {
 
+  @Disabled
   @Test
-  void threeGossipServersStarting(@VertxInstance Vertx vertx, @TempDirectory 
Path tempDir) throws Exception {
+  void fourGossipServersWithOneSender(@VertxInstance Vertx vertx, 
@TempDirectory Path tempDir) throws Exception {
+    int numberOfMessages = 10000;
+    int sendingInterval = 20;
     GossipCommandLineOptions opts1 = new GossipCommandLineOptions(
-        new String[] {"tcp://127.0.0.1:9001", "tcp://127.0.0.1:9002"},
+        new String[] {"tcp://127.0.0.1:9001", "tcp://127.0.0.1:9002", 
"tcp://127.0.0.1:9003"},
         9000,
         "127.0.0.1",
         tempDir.resolve("log1.log").toString(),
         10000,
+        500,
+        20,
+        true,
+        numberOfMessages,
         null);
     GossipCommandLineOptions opts2 = new GossipCommandLineOptions(
-        new String[] {"tcp://127.0.0.1:9000", "tcp://127.0.0.1:9002"},
+        new String[] {},
         9001,
         "127.0.0.1",
         tempDir.resolve("log2.log").toString(),
         10001,
+        0,
+        0,
+        false,
+        0,
         null);
     GossipCommandLineOptions opts3 = new GossipCommandLineOptions(
-        new String[] {"tcp://127.0.0.1:9000", "tcp://127.0.0.1:9001"},
+        new String[] {"tcp://127.0.0.1:9003"},
         9002,
         "127.0.0.1",
         tempDir.resolve("log3.log").toString(),
         10002,
+        0,
+        0,
+        false,
+        0,
+        null);
+    GossipCommandLineOptions opts4 = new GossipCommandLineOptions(
+        new String[] {},
+        9003,
+        "127.0.0.1",
+        tempDir.resolve("log4.log").toString(),
+        10003,
+        0,
+        0,
+        false,
+        0,
         null);
     AtomicBoolean terminationRan = new AtomicBoolean(false);
 
-    ExecutorService service = Executors.newFixedThreadPool(3);
+    ExecutorService service = Executors.newFixedThreadPool(4);
 
     Future<GossipApp> app1Future = service.submit(() -> {
       GossipApp app = new GossipApp(vertx, opts1, System.err, System.out, () 
-> {
@@ -88,43 +115,37 @@ class GossipIntegrationTest {
       app.start();
       return app;
     });
+    Future<GossipApp> app4Future = service.submit(() -> {
+      GossipApp app = new GossipApp(vertx, opts4, System.err, System.out, () 
-> {
+        terminationRan.set(true);
+      });
+      app.start();
+      return app;
+    });
     GossipApp app1 = app1Future.get(10, TimeUnit.SECONDS);
     GossipApp app2 = app2Future.get(10, TimeUnit.SECONDS);
     GossipApp app3 = app3Future.get(10, TimeUnit.SECONDS);
+    GossipApp app4 = app4Future.get(10, TimeUnit.SECONDS);
 
     assertFalse(terminationRan.get());
 
-    HttpClient client = vertx.createHttpClient();
-
-    for (int i = 0; i < 20; i++) {
-      client.request(HttpMethod.POST, 10000, "127.0.0.1", 
"/publish").exceptionHandler(thr -> {
-        throw new RuntimeException(thr);
-      }).handler(resp -> {
-
-      
}).end(Buffer.buffer(Bytes32.rightPad(Bytes.ofUnsignedInt(i)).toHexString().getBytes(StandardCharsets.UTF_8)));
-    }
-
-    List<String> receiver1 = Collections.emptyList();
-
-    int counter = 0;
-    do {
-      Thread.sleep(1000);
-      counter++;
-      if (Files.exists(tempDir.resolve("log2.log"))) {
-        receiver1 = Files.readAllLines(tempDir.resolve("log2.log"));
-      }
-    } while (receiver1.size() < 20 && counter < 20);
+    Thread.sleep((long) (numberOfMessages * sendingInterval * 1.33));
 
-    client.close();
 
     service.submit(app1::stop);
     service.submit(app2::stop);
     service.submit(app3::stop);
+    service.submit(app4::stop);
 
+    List<String> receiver2 = Files.readAllLines(tempDir.resolve("log2.log"));
+    List<String> receiver3 = Files.readAllLines(tempDir.resolve("log3.log"));
+    List<String> receiver4 = Files.readAllLines(tempDir.resolve("log4.log"));
+    System.out.println("n2: " + receiver2.size() + "\n3: " + receiver3.size() 
+ "\n4: " + receiver4.size());
+    assertEquals(numberOfMessages, receiver2.size());
+    assertEquals(numberOfMessages, receiver3.size());
+    assertEquals(numberOfMessages, receiver4.size());
 
-    assertEquals(20, receiver1.size());
-    List<String> receiver2 = Files.readAllLines(tempDir.resolve("log3.log"));
-    assertEquals(20, receiver2.size());
+    assertFalse(tempDir.resolve("log1.log").toFile().exists());
 
     service.shutdown();
 
diff --git 
a/plumtree/src/main/java/org/apache/tuweni/plumtree/EphemeralPeerRepository.java
 
b/plumtree/src/main/java/org/apache/tuweni/plumtree/EphemeralPeerRepository.java
index 7d0f4f7..883ae2a 100644
--- 
a/plumtree/src/main/java/org/apache/tuweni/plumtree/EphemeralPeerRepository.java
+++ 
b/plumtree/src/main/java/org/apache/tuweni/plumtree/EphemeralPeerRepository.java
@@ -55,9 +55,10 @@ public final class EphemeralPeerRepository implements 
PeerRepository {
   }
 
   @Override
-  public void moveToLazy(Peer peer) {
+  public boolean moveToLazy(Peer peer) {
     eagerPushPeers.remove(peer);
     lazyPushPeers.add(peer);
+    return true;
   }
 
   @Override
diff --git a/plumtree/src/main/java/org/apache/tuweni/plumtree/PeerPruning.java 
b/plumtree/src/main/java/org/apache/tuweni/plumtree/PeerPruning.java
new file mode 100644
index 0000000..f33eaf1
--- /dev/null
+++ b/plumtree/src/main/java/org/apache/tuweni/plumtree/PeerPruning.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.tuweni.plumtree;
+
+/**
+ * Interface to decide whether to prune peers when they send messages late.
+ *
+ * Pruned peers become "lazy peers". They send message attestations (IHAVE).
+ */
+public interface PeerPruning {
+
+  /**
+   * Decides whether to prune a peer.
+   *
+   * @param peer the peer to consider
+   * @return true if the peer should be pruned
+   */
+  boolean prunePeer(Peer peer);
+}
diff --git 
a/plumtree/src/main/java/org/apache/tuweni/plumtree/PeerRepository.java 
b/plumtree/src/main/java/org/apache/tuweni/plumtree/PeerRepository.java
index 9f0419c..d0f7f5d 100644
--- a/plumtree/src/main/java/org/apache/tuweni/plumtree/PeerRepository.java
+++ b/plumtree/src/main/java/org/apache/tuweni/plumtree/PeerRepository.java
@@ -54,8 +54,9 @@ public interface PeerRepository {
    * Moves a peer to the list of lazy peers
    * 
    * @param peer the peer to move
+   * @return true if the move was effective
    */
-  void moveToLazy(Peer peer);
+  boolean moveToLazy(Peer peer);
 
   /**
    * Moves a peer to the list of eager peers.
diff --git a/plumtree/src/main/java/org/apache/tuweni/plumtree/State.java 
b/plumtree/src/main/java/org/apache/tuweni/plumtree/State.java
index 26832ad..463583b 100644
--- a/plumtree/src/main/java/org/apache/tuweni/plumtree/State.java
+++ b/plumtree/src/main/java/org/apache/tuweni/plumtree/State.java
@@ -48,6 +48,7 @@ public final class State {
   private final MessageSender messageSender;
   private final Consumer<Bytes> messageListener;
   private final MessageValidator messageValidator;
+  private final PeerPruning peerPruningFunction;
   final Queue<Runnable> lazyQueue = new ConcurrentLinkedQueue<>();
   private final Timer timer = new Timer("plumtree", true);
   private final long delay;
@@ -66,7 +67,7 @@ public final class State {
     }
 
     /**
-     * Acts on receiving the full message
+     * Acts on receiving the full message.
      * 
      * @param sender the sender - may be null if we are submitting this 
message to the network
      * @param message the payload to send to the network
@@ -98,8 +99,9 @@ public final class State {
         }
       } else {
         if (sender != null) {
-          messageSender.sendMessage(MessageSender.Verb.PRUNE, null, sender, 
hash, null);
-          peerRepository.moveToLazy(sender);
+          if (peerPruningFunction.prunePeer(sender)) {
+            messageSender.sendMessage(MessageSender.Verb.PRUNE, null, sender, 
hash, null);
+          }
         }
       }
     }
@@ -138,24 +140,35 @@ public final class State {
    * @param messageSender a function abstracting sending messages to other 
peers.
    * @param messageListener a function consuming messages when they are 
gossiped.
    * @param messageValidator a function validating messages before they are 
gossiped to other peers.
+   * @param peerPruningFunction a function deciding whether to prune peers.
    */
   public State(
       PeerRepository peerRepository,
       MessageHashing messageHashingFunction,
       MessageSender messageSender,
       Consumer<Bytes> messageListener,
-      MessageValidator messageValidator) {
-    this(peerRepository, messageHashingFunction, messageSender, 
messageListener, messageValidator, 5000, 5000);
+      MessageValidator messageValidator,
+      PeerPruning peerPruningFunction) {
+    this(
+        peerRepository,
+        messageHashingFunction,
+        messageSender,
+        messageListener,
+        messageValidator,
+        peerPruningFunction,
+        5000,
+        5000);
   }
 
   /**
-   * Constructor using default time constants.
+   * Constructor using explicit time constants.
    * 
    * @param peerRepository the peer repository to use to store and access peer 
information.
    * @param messageHashingFunction the function to use to hash messages into 
hashes to compare them.
    * @param messageSender a function abstracting sending messages to other 
peers.
    * @param messageListener a function consuming messages when they are 
gossiped.
    * @param messageValidator a function validating messages before they are 
gossiped to other peers.
+   * @param peerPruningFunction a function deciding whether to prune peers.
    * @param graftDelay delay in milliseconds to apply before this peer grafts 
an other peer when it finds that peer has
    *        data it misses.
    * @param lazyQueueInterval the interval in milliseconds between sending 
messages to lazy peers.
@@ -166,6 +179,7 @@ public final class State {
       MessageSender messageSender,
       Consumer<Bytes> messageListener,
       MessageValidator messageValidator,
+      PeerPruning peerPruningFunction,
       long graftDelay,
       long lazyQueueInterval) {
     this.peerRepository = peerRepository;
@@ -173,6 +187,7 @@ public final class State {
     this.messageSender = messageSender;
     this.messageListener = messageListener;
     this.messageValidator = messageValidator;
+    this.peerPruningFunction = peerPruningFunction;
     this.delay = graftDelay;
     timer.schedule(new TimerTask() {
       @Override
@@ -208,9 +223,13 @@ public final class State {
    * @param attributes of the message
    * @param message the hash of the message
    */
-  public void receiveGossipMessage(Peer peer, String attributes, Bytes 
message) {
+  public void receiveGossipMessage(Peer peer, String attributes, Bytes 
message, Bytes messageHash) {
+    Bytes checkHash = messageHashingFunction.hash(message);
+    if (!checkHash.equals(messageHash)) {
+      return;
+    }
     peerRepository.considerNewPeer(peer);
-    MessageHandler handler = 
messageHandlers.computeIfAbsent(messageHashingFunction.hash(message), 
MessageHandler::new);
+    MessageHandler handler = messageHandlers.computeIfAbsent(messageHash, 
MessageHandler::new);
     handler.fullMessageReceived(peer, attributes, message);
   }
 
@@ -226,7 +245,7 @@ public final class State {
   }
 
   /**
-   * Requests a peer be pruned away from the eager peers into the lazy peers
+   * Requests a peer be pruned away from the eager peers into the lazy peers.
    *
    * @param peer the peer to move to lazy peers
    */
@@ -235,7 +254,7 @@ public final class State {
   }
 
   /**
-   * Requests a peer be grafted to the eager peers list
+   * Requests a peer be grafted to the eager peers list.
    *
    * @param peer the peer to add to the eager peers
    * @param messageHash the hash of the message that triggers this grafting
diff --git 
a/plumtree/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java
 
b/plumtree/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java
index dce838d..5d8e75a 100644
--- 
a/plumtree/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java
+++ 
b/plumtree/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java
@@ -19,6 +19,7 @@ import org.apache.tuweni.plumtree.MessageHashing;
 import org.apache.tuweni.plumtree.MessageSender;
 import org.apache.tuweni.plumtree.MessageValidator;
 import org.apache.tuweni.plumtree.Peer;
+import org.apache.tuweni.plumtree.PeerPruning;
 import org.apache.tuweni.plumtree.PeerRepository;
 import org.apache.tuweni.plumtree.State;
 
@@ -78,10 +79,14 @@ public final class VertxGossipServer {
 
         switch (message.verb) {
           case IHAVE:
-            state.receiveIHaveMessage(peer, 
Bytes.fromHexString(message.payload));
+            state.receiveIHaveMessage(peer, Bytes.fromHexString(message.hash));
             break;
           case GOSSIP:
-            state.receiveGossipMessage(peer, message.attributes, 
Bytes.fromHexString(message.payload));
+            state.receiveGossipMessage(
+                peer,
+                message.attributes,
+                Bytes.fromHexString(message.payload),
+                Bytes.fromHexString(message.hash));
             break;
           case GRAFT:
             state.receiveGraftMessage(peer, 
Bytes.fromHexString(message.payload));
@@ -98,17 +103,20 @@ public final class VertxGossipServer {
     }
   }
 
-  private final AtomicBoolean started = new AtomicBoolean(false);
-  private final Vertx vertx;
-  private final int port;
+  private NetClient client;
+  private final int graftDelay;
+  private final int lazyQueueInterval;
   private final MessageHashing messageHashing;
   private final String networkInterface;
-  private final PeerRepository peerRepository;
   private final Consumer<Bytes> payloadListener;
   private final MessageValidator payloadValidator;
-  private State state;
+  private final PeerPruning peerPruningFunction;
+  private final PeerRepository peerRepository;
+  private final int port;
   private NetServer server;
-  private NetClient client;
+  private final AtomicBoolean started = new AtomicBoolean(false);
+  private State state;
+  private final Vertx vertx;
 
   public VertxGossipServer(
       Vertx vertx,
@@ -117,7 +125,10 @@ public final class VertxGossipServer {
       MessageHashing messageHashing,
       PeerRepository peerRepository,
       Consumer<Bytes> payloadListener,
-      @Nullable MessageValidator payloadValidator) {
+      @Nullable MessageValidator payloadValidator,
+      @Nullable PeerPruning peerPruningFunction,
+      int graftDelay,
+      int lazyQueueInterval) {
     this.vertx = vertx;
     this.networkInterface = networkInterface;
     this.port = port;
@@ -125,6 +136,9 @@ public final class VertxGossipServer {
     this.peerRepository = peerRepository;
     this.payloadListener = payloadListener;
     this.payloadValidator = payloadValidator == null ? (bytes, peer) -> true : 
payloadValidator;
+    this.peerPruningFunction = peerPruningFunction == null ? (peer) -> true : 
peerPruningFunction;
+    this.graftDelay = graftDelay;
+    this.lazyQueueInterval = lazyQueueInterval;
   }
 
   public AsyncCompletion start() {
@@ -135,24 +149,27 @@ public final class VertxGossipServer {
       server.connectHandler(socket -> {
         Peer peer = new SocketPeer(socket);
         SocketHandler handler = new SocketHandler(peer);
-        socket.handler(handler::handle).closeHandler(handler::close);
+        
socket.handler(handler::handle).closeHandler(handler::close).exceptionHandler(Throwable::printStackTrace);
       });
+      server.exceptionHandler(Throwable::printStackTrace);
       server.listen(port, networkInterface, res -> {
         if (res.failed()) {
           completion.completeExceptionally(res.cause());
         } else {
           state = new State(peerRepository, messageHashing, (verb, attributes, 
peer, hash, payload) -> {
-            Message message = new Message();
-            message.verb = verb;
-            message.attributes = attributes;
-            message.hash = hash.toHexString();
-            message.payload = payload == null ? null : payload.toHexString();
-            try {
-              ((SocketPeer) 
peer).socket().write(Buffer.buffer(mapper.writeValueAsBytes(message)));
-            } catch (JsonProcessingException e) {
-              throw new RuntimeException(e);
-            }
-          }, payloadListener, payloadValidator);
+            vertx.runOnContext(handler -> {
+              Message message = new Message();
+              message.verb = verb;
+              message.attributes = attributes;
+              message.hash = hash.toHexString();
+              message.payload = payload == null ? null : payload.toHexString();
+              try {
+                ((SocketPeer) 
peer).socket().write(Buffer.buffer(mapper.writeValueAsBytes(message)));
+              } catch (JsonProcessingException e) {
+                throw new RuntimeException(e);
+              }
+            });
+          }, payloadListener, payloadValidator, peerPruningFunction, 
graftDelay, lazyQueueInterval);
 
           completion.complete();
         }
diff --git a/plumtree/src/test/java/org/apache/tuweni/plumtree/StateTest.java 
b/plumtree/src/test/java/org/apache/tuweni/plumtree/StateTest.java
index b171ee8..01a31cc 100644
--- a/plumtree/src/test/java/org/apache/tuweni/plumtree/StateTest.java
+++ b/plumtree/src/test/java/org/apache/tuweni/plumtree/StateTest.java
@@ -56,7 +56,13 @@ class StateTest {
   @Test
   void testInitialState() {
     EphemeralPeerRepository repo = new EphemeralPeerRepository();
-    State state = new State(repo, Hash::keccak256, new MockMessageSender(), 
messageRef::set, (message, peer) -> true);
+    State state = new State(
+        repo,
+        Hash::keccak256,
+        new MockMessageSender(),
+        messageRef::set,
+        (message, peer) -> true,
+        (peer) -> true);
     assertTrue(repo.peers().isEmpty());
     assertTrue(repo.lazyPushPeers().isEmpty());
     assertTrue(repo.eagerPushPeers().isEmpty());
@@ -65,7 +71,13 @@ class StateTest {
   @Test
   void firstRoundWithThreePeers() {
     EphemeralPeerRepository repo = new EphemeralPeerRepository();
-    State state = new State(repo, Hash::keccak256, new MockMessageSender(), 
messageRef::set, (message, peer) -> true);
+    State state = new State(
+        repo,
+        Hash::keccak256,
+        new MockMessageSender(),
+        messageRef::set,
+        (message, peer) -> true,
+        (peer) -> true);
     state.addPeer(new PeerImpl());
     state.addPeer(new PeerImpl());
     state.addPeer(new PeerImpl());
@@ -76,7 +88,13 @@ class StateTest {
   @Test
   void firstRoundWithTwoPeers() {
     EphemeralPeerRepository repo = new EphemeralPeerRepository();
-    State state = new State(repo, Hash::keccak256, new MockMessageSender(), 
messageRef::set, (message, peer) -> true);
+    State state = new State(
+        repo,
+        Hash::keccak256,
+        new MockMessageSender(),
+        messageRef::set,
+        (message, peer) -> true,
+        (peer) -> true);
     state.addPeer(new PeerImpl());
     state.addPeer(new PeerImpl());
     assertTrue(repo.lazyPushPeers().isEmpty());
@@ -86,7 +104,13 @@ class StateTest {
   @Test
   void firstRoundWithOnePeer() {
     EphemeralPeerRepository repo = new EphemeralPeerRepository();
-    State state = new State(repo, Hash::keccak256, new MockMessageSender(), 
messageRef::set, (message, peer) -> true);
+    State state = new State(
+        repo,
+        Hash::keccak256,
+        new MockMessageSender(),
+        messageRef::set,
+        (message, peer) -> true,
+        (peer) -> true);
     state.addPeer(new PeerImpl());
     assertTrue(repo.lazyPushPeers().isEmpty());
     assertEquals(1, repo.eagerPushPeers().size());
@@ -95,7 +119,13 @@ class StateTest {
   @Test
   void removePeer() {
     EphemeralPeerRepository repo = new EphemeralPeerRepository();
-    State state = new State(repo, Hash::keccak256, new MockMessageSender(), 
messageRef::set, (message, peer) -> true);
+    State state = new State(
+        repo,
+        Hash::keccak256,
+        new MockMessageSender(),
+        messageRef::set,
+        (message, peer) -> true,
+        (peer) -> true);
     Peer peer = new PeerImpl();
     state.addPeer(peer);
     state.removePeer(peer);
@@ -107,7 +137,13 @@ class StateTest {
   @Test
   void prunePeer() {
     EphemeralPeerRepository repo = new EphemeralPeerRepository();
-    State state = new State(repo, Hash::keccak256, new MockMessageSender(), 
messageRef::set, (message, peer) -> true);
+    State state = new State(
+        repo,
+        Hash::keccak256,
+        new MockMessageSender(),
+        messageRef::set,
+        (message, peer) -> true,
+        (peer) -> true);
     Peer peer = new PeerImpl();
     state.addPeer(peer);
     state.receivePruneMessage(peer);
@@ -118,7 +154,13 @@ class StateTest {
   @Test
   void graftPeer() {
     EphemeralPeerRepository repo = new EphemeralPeerRepository();
-    State state = new State(repo, Hash::keccak256, new MockMessageSender(), 
messageRef::set, (message, peer) -> true);
+    State state = new State(
+        repo,
+        Hash::keccak256,
+        new MockMessageSender(),
+        messageRef::set,
+        (message, peer) -> true,
+        (peer) -> true);
     Peer peer = new PeerImpl();
     state.addPeer(peer);
     state.receivePruneMessage(peer);
@@ -133,14 +175,15 @@ class StateTest {
   void receiveFullMessageFromEagerPeer() {
     EphemeralPeerRepository repo = new EphemeralPeerRepository();
     MockMessageSender messageSender = new MockMessageSender();
-    State state = new State(repo, Hash::keccak256, messageSender, 
messageRef::set, (message, peer) -> true);
+    State state =
+        new State(repo, Hash::keccak256, messageSender, messageRef::set, 
(message, peer) -> true, (peer) -> true);
     Peer peer = new PeerImpl();
     state.addPeer(peer);
     Peer otherPeer = new PeerImpl();
     state.addPeer(otherPeer);
     Bytes32 msg = Bytes32.random();
     String attributes = "{\"message_type\": \"BLOCK\"}";
-    state.receiveGossipMessage(peer, attributes, msg);
+    state.receiveGossipMessage(peer, attributes, msg, Hash.keccak256(msg));
     assertEquals(msg, messageSender.payload);
     assertEquals(otherPeer, messageSender.peer);
   }
@@ -149,7 +192,8 @@ class StateTest {
   void receiveFullMessageFromEagerPeerWithALazyPeer() {
     EphemeralPeerRepository repo = new EphemeralPeerRepository();
     MockMessageSender messageSender = new MockMessageSender();
-    State state = new State(repo, Hash::keccak256, messageSender, 
messageRef::set, (message, peer) -> true);
+    State state =
+        new State(repo, Hash::keccak256, messageSender, messageRef::set, 
(message, peer) -> true, (peer) -> true);
     Peer peer = new PeerImpl();
     state.addPeer(peer);
     Peer otherPeer = new PeerImpl();
@@ -159,7 +203,7 @@ class StateTest {
     state.addPeer(lazyPeer);
     repo.moveToLazy(lazyPeer);
     String attributes = "{\"message_type\": \"BLOCK\"}";
-    state.receiveGossipMessage(peer, attributes, msg);
+    state.receiveGossipMessage(peer, attributes, msg, Hash.keccak256(msg));
     assertEquals(msg, messageSender.payload);
     assertEquals(otherPeer, messageSender.peer);
     state.processQueue();
@@ -173,7 +217,8 @@ class StateTest {
   void receiveFullMessageFromEagerPeerThenPartialMessageFromLazyPeer() {
     EphemeralPeerRepository repo = new EphemeralPeerRepository();
     MockMessageSender messageSender = new MockMessageSender();
-    State state = new State(repo, Hash::keccak256, messageSender, 
messageRef::set, (message, peer) -> true);
+    State state =
+        new State(repo, Hash::keccak256, messageSender, messageRef::set, 
(message, peer) -> true, (peer) -> true);
     Peer peer = new PeerImpl();
     state.addPeer(peer);
     Peer lazyPeer = new PeerImpl();
@@ -181,7 +226,7 @@ class StateTest {
     repo.moveToLazy(lazyPeer);
     Bytes message = Bytes32.random();
     String attributes = "{\"message_type\": \"BLOCK\"}";
-    state.receiveGossipMessage(peer, attributes, message);
+    state.receiveGossipMessage(peer, attributes, message, 
Hash.keccak256(message));
     state.receiveIHaveMessage(lazyPeer, message);
     assertNull(messageSender.payload);
     assertNull(messageSender.peer);
@@ -191,7 +236,15 @@ class StateTest {
   void receivePartialMessageFromLazyPeerAndNoFullMessage() throws Exception {
     EphemeralPeerRepository repo = new EphemeralPeerRepository();
     MockMessageSender messageSender = new MockMessageSender();
-    State state = new State(repo, Hash::keccak256, messageSender, 
messageRef::set, (message, peer) -> true, 100, 4000);
+    State state = new State(
+        repo,
+        Hash::keccak256,
+        messageSender,
+        messageRef::set,
+        (message, peer) -> true,
+        (peer) -> true,
+        100,
+        4000);
     Peer peer = new PeerImpl();
     state.addPeer(peer);
     Peer lazyPeer = new PeerImpl();
@@ -209,7 +262,15 @@ class StateTest {
   void receivePartialMessageFromLazyPeerAndThenFullMessage() throws Exception {
     EphemeralPeerRepository repo = new EphemeralPeerRepository();
     MockMessageSender messageSender = new MockMessageSender();
-    State state = new State(repo, Hash::keccak256, messageSender, 
messageRef::set, (message, peer) -> true, 500, 4000);
+    State state = new State(
+        repo,
+        Hash::keccak256,
+        messageSender,
+        messageRef::set,
+        (message, peer) -> true,
+        (peer) -> true,
+        500,
+        4000);
     Peer peer = new PeerImpl();
     state.addPeer(peer);
     Peer lazyPeer = new PeerImpl();
@@ -219,7 +280,7 @@ class StateTest {
     state.receiveIHaveMessage(lazyPeer, Hash.keccak256(message));
     Thread.sleep(100);
     String attributes = "{\"message_type\": \"BLOCK\"}";
-    state.receiveGossipMessage(peer, attributes, message);
+    state.receiveGossipMessage(peer, attributes, message, 
Hash.keccak256(message));
     Thread.sleep(500);
     assertNull(messageSender.verb);
     assertNull(messageSender.payload);
@@ -230,11 +291,12 @@ class StateTest {
   void receiveFullMessageFromUnknownPeer() {
     EphemeralPeerRepository repo = new EphemeralPeerRepository();
     MockMessageSender messageSender = new MockMessageSender();
-    State state = new State(repo, Hash::keccak256, messageSender, 
messageRef::set, (message, peer) -> true);
+    State state =
+        new State(repo, Hash::keccak256, messageSender, messageRef::set, 
(message, peer) -> true, (peer) -> true);
     Peer peer = new PeerImpl();
     Bytes message = Bytes32.random();
     String attributes = "{\"message_type\": \"BLOCK\"}";
-    state.receiveGossipMessage(peer, attributes, message);
+    state.receiveGossipMessage(peer, attributes, message, 
Hash.keccak256(message));
     assertEquals(1, repo.eagerPushPeers().size());
     assertEquals(0, repo.lazyPushPeers().size());
     assertEquals(peer, repo.eagerPushPeers().iterator().next());
@@ -244,15 +306,16 @@ class StateTest {
   void prunePeerWhenReceivingTwiceTheSameFullMessage() {
     EphemeralPeerRepository repo = new EphemeralPeerRepository();
     MockMessageSender messageSender = new MockMessageSender();
-    State state = new State(repo, Hash::keccak256, messageSender, 
messageRef::set, (message, peer) -> true);
+    State state =
+        new State(repo, Hash::keccak256, messageSender, messageRef::set, 
(message, peer) -> true, (peer) -> true);
     Peer peer = new PeerImpl();
     Peer secondPeer = new PeerImpl();
     Bytes message = Bytes32.random();
     String attributes = "{\"message_type\": \"BLOCK\"}";
-    state.receiveGossipMessage(peer, attributes, message);
-    state.receiveGossipMessage(secondPeer, attributes, message);
-    assertEquals(1, repo.eagerPushPeers().size());
-    assertEquals(1, repo.lazyPushPeers().size());
+    state.receiveGossipMessage(peer, attributes, message, 
Hash.keccak256(message));
+    state.receiveGossipMessage(secondPeer, attributes, message, 
Hash.keccak256(message));
+    assertEquals(2, repo.eagerPushPeers().size());
+    assertEquals(0, repo.lazyPushPeers().size());
     assertNull(messageSender.payload);
     assertEquals(secondPeer, messageSender.peer);
     assertEquals(MessageSender.Verb.PRUNE, messageSender.verb);
diff --git 
a/plumtree/src/test/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java
 
b/plumtree/src/test/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java
index d5dd5d8..29ae06c 100644
--- 
a/plumtree/src/test/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java
+++ 
b/plumtree/src/test/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java
@@ -43,7 +43,10 @@ class VertxGossipServerTest {
         bytes -> bytes,
         new EphemeralPeerRepository(),
         messageReceived1::set,
-        (message, peer) -> true);
+        (message, peer) -> true,
+        null,
+        200,
+        200);
     VertxGossipServer server2 = new VertxGossipServer(
         vertx,
         "127.0.0.1",
@@ -51,7 +54,10 @@ class VertxGossipServerTest {
         bytes -> bytes,
         new EphemeralPeerRepository(),
         messageReceived2::set,
-        (message, peer) -> true);
+        (message, peer) -> true,
+        null,
+        200,
+        200);
 
     server1.start().join();
     server2.start().join();
@@ -85,7 +91,10 @@ class VertxGossipServerTest {
         bytes -> bytes,
         new EphemeralPeerRepository(),
         messageReceived1::set,
-        (message, peer) -> true);
+        (message, peer) -> true,
+        null,
+        200,
+        200);
     VertxGossipServer server2 = new VertxGossipServer(
         vertx,
         "127.0.0.1",
@@ -93,7 +102,10 @@ class VertxGossipServerTest {
         bytes -> bytes,
         new EphemeralPeerRepository(),
         messageReceived2::set,
-        (message, peer) -> true);
+        (message, peer) -> true,
+        null,
+        200,
+        200);
     VertxGossipServer server3 = new VertxGossipServer(
         vertx,
         "127.0.0.1",
@@ -101,7 +113,10 @@ class VertxGossipServerTest {
         bytes -> bytes,
         new EphemeralPeerRepository(),
         messageReceived3::set,
-        (message, peer) -> true);
+        (message, peer) -> true,
+        null,
+        200,
+        200);
 
     server1.start().join();
     server2.start().join();
@@ -141,7 +156,10 @@ class VertxGossipServerTest {
         bytes -> bytes,
         peerRepository1,
         messageReceived1::set,
-        (message, peer) -> true);
+        (message, peer) -> true,
+        null,
+        200,
+        200);
     VertxGossipServer server2 = new VertxGossipServer(
         vertx,
         "127.0.0.1",
@@ -149,7 +167,10 @@ class VertxGossipServerTest {
         bytes -> bytes,
         new EphemeralPeerRepository(),
         messageReceived2::set,
-        (message, peer) -> true);
+        (message, peer) -> true,
+        null,
+        200,
+        200);
     VertxGossipServer server3 = new VertxGossipServer(
         vertx,
         "127.0.0.1",
@@ -157,7 +178,10 @@ class VertxGossipServerTest {
         bytes -> bytes,
         peerRepository3,
         messageReceived2::set,
-        (message, peer) -> true);
+        (message, peer) -> true,
+        null,
+        200,
+        200);
 
     server1.start().join();
     server2.start().join();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@tuweni.apache.org
For additional commands, e-mail: commits-h...@tuweni.apache.org

Reply via email to