This is an automated email from the ASF dual-hosted git repository. toulmean pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git
commit 1da45edfe6114b0632690e00e1f4728c45572944 Author: Antoine Toulme <[email protected]> AuthorDate: Wed Apr 3 16:58:53 2019 -0700 Add a sample integration example to plumtree --- plumtree/build.gradle | 4 + .../{StateActor.java => MessageValidator.java} | 18 +- .../main/java/org/apache/tuweni/plumtree/Peer.java | 5 +- .../java/org/apache/tuweni/plumtree/State.java | 115 +++++++++--- .../SocketPeer.java} | 26 ++- .../tuweni/plumtree/vertx/VertxGossipServer.java | 208 +++++++++++++++++++++ .../java/org/apache/tuweni/plumtree/StateTest.java | 92 ++++----- .../plumtree/vertx/VertxGossipServerTest.java | 165 ++++++++++++++++ 8 files changed, 562 insertions(+), 71 deletions(-) diff --git a/plumtree/build.gradle b/plumtree/build.gradle index 9d6569f..041b8bb 100644 --- a/plumtree/build.gradle +++ b/plumtree/build.gradle @@ -2,9 +2,13 @@ description = 'Plumtree - Push-Lazy-pUsh Multicast TREE, an implementation of Ep dependencies { compile project(':bytes') + compile project(':concurrent') compile project(':crypto') + compileOnly 'io.vertx:vertx-core' + testCompile project(':junit') + testCompile 'io.vertx:vertx-core' testCompile 'org.bouncycastle:bcprov-jdk15on' testCompile 'org.junit.jupiter:junit-jupiter-api' testCompile 'org.junit.jupiter:junit-jupiter-params' diff --git a/plumtree/src/main/java/org/apache/tuweni/plumtree/StateActor.java b/plumtree/src/main/java/org/apache/tuweni/plumtree/MessageValidator.java similarity index 63% rename from plumtree/src/main/java/org/apache/tuweni/plumtree/StateActor.java rename to plumtree/src/main/java/org/apache/tuweni/plumtree/MessageValidator.java index dc5029d..d17132d 100644 --- a/plumtree/src/main/java/org/apache/tuweni/plumtree/StateActor.java +++ b/plumtree/src/main/java/org/apache/tuweni/plumtree/MessageValidator.java @@ -12,5 +12,21 @@ */ package org.apache.tuweni.plumtree; -public interface StateActor { +import org.apache.tuweni.bytes.Bytes; + +/** + * Validator for a message and a peer. + * + * This validator is called prior to gossiping the message from that peer to other peers. + */ +public interface MessageValidator { + + /** + * Validates that the message from the peer is valid. + * + * @param message the payload sent over the network + * @param peer the peer that sent the message + * @return true if the message is valid + */ + boolean validate(Bytes message, Peer peer); } diff --git a/plumtree/src/main/java/org/apache/tuweni/plumtree/Peer.java b/plumtree/src/main/java/org/apache/tuweni/plumtree/Peer.java index bf42d86..10e87b3 100644 --- a/plumtree/src/main/java/org/apache/tuweni/plumtree/Peer.java +++ b/plumtree/src/main/java/org/apache/tuweni/plumtree/Peer.java @@ -12,5 +12,8 @@ */ package org.apache.tuweni.plumtree; -public class Peer { +/** + * A peer part of the gossip system. + */ +public interface Peer { } diff --git a/plumtree/src/main/java/org/apache/tuweni/plumtree/State.java b/plumtree/src/main/java/org/apache/tuweni/plumtree/State.java index 36ecd37..f35f358 100644 --- a/plumtree/src/main/java/org/apache/tuweni/plumtree/State.java +++ b/plumtree/src/main/java/org/apache/tuweni/plumtree/State.java @@ -15,15 +15,18 @@ package org.apache.tuweni.plumtree; import org.apache.tuweni.bytes.Bytes; import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Queue; import java.util.Timer; import java.util.TimerTask; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; import java.util.stream.Collectors; +import javax.annotation.Nullable; /** * Local state to our peer, representing the make-up of the tree of peers. @@ -32,8 +35,19 @@ public final class State { private final PeerRepository peerRepository; private final MessageHashing messageHashingFunction; - private final Map<Bytes, MessageHandler> messageHandlers = new ConcurrentHashMap<>(); + private final int maxMessagesHandlers = 1000000; + private final Map<Bytes, MessageHandler> messageHandlers = + Collections.synchronizedMap(new LinkedHashMap<Bytes, MessageHandler>() { + + @Override + protected boolean removeEldestEntry(final Map.Entry<Bytes, MessageHandler> eldest) { + return super.size() > maxMessagesHandlers; + } + }); + private final MessageSender messageSender; + private final Consumer<Bytes> messageListener; + private final MessageValidator messageValidator; private final Queue<Runnable> lazyQueue = new ConcurrentLinkedQueue<>(); private final Timer timer = new Timer("plumtree", true); private final long delay; @@ -41,6 +55,7 @@ public final class State { final class MessageHandler { private final Bytes hash; + private final AtomicBoolean receivedFullMessage = new AtomicBoolean(false); private final AtomicBoolean requestingGraftMessage = new AtomicBoolean(false); private List<TimerTask> tasks = new ArrayList<>(); @@ -50,26 +65,38 @@ public final class State { this.hash = hash; } - void fullMessageReceived(Peer sender, Bytes message) { + /** + * Acts on receiving the full message + * + * @param sender the sender - may be null if we are submitting this message to the network + * @param message the payload to send to the network + */ + void fullMessageReceived(@Nullable Peer sender, Bytes message) { if (receivedFullMessage.compareAndSet(false, true)) { for (TimerTask task : tasks) { task.cancel(); } - for (Peer peer : peerRepository.eagerPushPeers()) { - if (!sender.equals(peer)) { - messageSender.sendMessage(MessageSender.Verb.GOSSIP, peer, message); + + if (sender == null || messageValidator.validate(message, sender)) { + for (Peer peer : peerRepository.eagerPushPeers()) { + if (sender == null || !sender.equals(peer)) { + messageSender.sendMessage(MessageSender.Verb.GOSSIP, peer, message); + } } + lazyQueue.addAll( + peerRepository + .lazyPushPeers() + .stream() + .filter(p -> !lazyPeers.contains(p)) + .map(peer -> (Runnable) (() -> messageSender.sendMessage(MessageSender.Verb.IHAVE, peer, hash))) + .collect(Collectors.toList())); + messageListener.accept(message); } - lazyQueue.addAll( - peerRepository - .lazyPushPeers() - .stream() - .filter(p -> !lazyPeers.contains(p)) - .map(peer -> (Runnable) (() -> messageSender.sendMessage(MessageSender.Verb.IHAVE, peer, hash))) - .collect(Collectors.toList())); } else { - messageSender.sendMessage(MessageSender.Verb.PRUNE, sender, null); - peerRepository.moveToLazy(sender); + if (sender != null) { + messageSender.sendMessage(MessageSender.Verb.PRUNE, sender, null); + peerRepository.moveToLazy(sender); + } } } @@ -99,19 +126,49 @@ public final class State { } } - public State(PeerRepository peerRepository, MessageHashing messageHashingFunction, MessageSender messageSender) { - this(peerRepository, messageHashingFunction, messageSender, 5000, 5000); + /** + * Constructor using default time constants. + * + * @param peerRepository the peer repository to use to store and access peer information. + * @param messageHashingFunction the function to use to hash messages into hashes to compare them. + * @param messageSender a function abstracting sending messages to other peers. + * @param messageListener a function consuming messages when they are gossiped. + * @param messageValidator a function validating messages before they are gossiped to other peers. + */ + public State( + PeerRepository peerRepository, + MessageHashing messageHashingFunction, + MessageSender messageSender, + Consumer<Bytes> messageListener, + MessageValidator messageValidator) { + this(peerRepository, messageHashingFunction, messageSender, messageListener, messageValidator, 5000, 5000); } + /** + * Constructor using default time constants. + * + * @param peerRepository the peer repository to use to store and access peer information. + * @param messageHashingFunction the function to use to hash messages into hashes to compare them. + * @param messageSender a function abstracting sending messages to other peers. + * @param messageListener a function consuming messages when they are gossiped. + * @param messageValidator a function validating messages before they are gossiped to other peers. + * @param graftDelay delay in milliseconds to apply before this peer grafts an other peer when it finds that peer has + * data it misses. + * @param lazyQueueInterval the interval in milliseconds between sending messages to lazy peers. + */ public State( PeerRepository peerRepository, MessageHashing messageHashingFunction, MessageSender messageSender, + Consumer<Bytes> messageListener, + MessageValidator messageValidator, long graftDelay, long lazyQueueInterval) { this.peerRepository = peerRepository; this.messageHashingFunction = messageHashingFunction; this.messageSender = messageSender; + this.messageListener = messageListener; + this.messageValidator = messageValidator; this.delay = graftDelay; timer.schedule(new TimerTask() { @Override @@ -132,7 +189,7 @@ public final class State { /** * Removes a peer from the collection of peers we are connected to. - * + * * @param peer the peer to remove */ public void removePeer(Peer peer) { @@ -141,31 +198,31 @@ public final class State { } /** - * Records a message was received in full from a peer + * Records a message was received in full from a peer. * * @param peer the peer that sent the message * @param message the hash of the message */ - public void receiveFullMessage(Peer peer, Bytes message) { + public void receiveGossipMessage(Peer peer, Bytes message) { peerRepository.considerNewPeer(peer); MessageHandler handler = messageHandlers.computeIfAbsent(messageHashingFunction.hash(message), MessageHandler::new); handler.fullMessageReceived(peer, message); } /** - * Records a message was partially received from a peer + * Records a message was partially received from a peer. * * @param peer the peer that sent the message * @param messageHash the hash of the message */ - public void receiveHeaderMessage(Peer peer, Bytes messageHash) { + public void receiveIHaveMessage(Peer peer, Bytes messageHash) { MessageHandler handler = messageHandlers.computeIfAbsent(messageHash, MessageHandler::new); handler.partialMessageReceived(peer); } /** * Requests a peer be pruned away from the eager peers into the lazy peers - * + * * @param peer the peer to move to lazy peers */ public void receivePruneMessage(Peer peer) { @@ -174,7 +231,7 @@ public final class State { /** * Requests a peer be grafted to the eager peers list - * + * * @param peer the peer to add to the eager peers * @param messageHash the hash of the message that triggers this grafting */ @@ -183,6 +240,16 @@ public final class State { messageSender.sendMessage(MessageSender.Verb.GOSSIP, peer, messageHash); } + /** + * Sends a gossip message to all peers, according to their status. + * + * @param message the message to propagate + */ + public void sendGossipMessage(Bytes message) { + MessageHandler handler = messageHandlers.computeIfAbsent(messageHashingFunction.hash(message), MessageHandler::new); + handler.fullMessageReceived(null, message); + } + void processQueue() { for (Runnable r : lazyQueue) { r.run(); diff --git a/plumtree/src/main/java/org/apache/tuweni/plumtree/StateActorFactory.java b/plumtree/src/main/java/org/apache/tuweni/plumtree/vertx/SocketPeer.java similarity index 63% rename from plumtree/src/main/java/org/apache/tuweni/plumtree/StateActorFactory.java rename to plumtree/src/main/java/org/apache/tuweni/plumtree/vertx/SocketPeer.java index 61d076b..d5322c2 100644 --- a/plumtree/src/main/java/org/apache/tuweni/plumtree/StateActorFactory.java +++ b/plumtree/src/main/java/org/apache/tuweni/plumtree/vertx/SocketPeer.java @@ -10,9 +10,29 @@ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. */ -package org.apache.tuweni.plumtree; +package org.apache.tuweni.plumtree.vertx; -public interface StateActorFactory { +import org.apache.tuweni.plumtree.Peer; - public StateActor create(State state); +import io.vertx.core.net.NetSocket; + +/** + * Vert.x gossip peer associated with a socket + */ +final class SocketPeer implements Peer { + + private final NetSocket socket; + + SocketPeer(NetSocket socket) { + this.socket = socket; + } + + NetSocket socket() { + return socket; + } + + @Override + public String toString() { + return socket.localAddress().toString(); + } } diff --git a/plumtree/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java b/plumtree/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java new file mode 100644 index 0000000..303ac60 --- /dev/null +++ b/plumtree/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.tuweni.plumtree.vertx; + +import org.apache.tuweni.bytes.Bytes; +import org.apache.tuweni.concurrent.AsyncCompletion; +import org.apache.tuweni.concurrent.CompletableAsyncCompletion; +import org.apache.tuweni.plumtree.MessageHashing; +import org.apache.tuweni.plumtree.MessageSender; +import org.apache.tuweni.plumtree.MessageValidator; +import org.apache.tuweni.plumtree.Peer; +import org.apache.tuweni.plumtree.PeerRepository; +import org.apache.tuweni.plumtree.State; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; + +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.vertx.core.Vertx; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.net.NetClient; +import io.vertx.core.net.NetServer; + +/** + * Vert.x implementation of the plumtree gossip. + * + * This implementation is provided as an example and relies on a simplistic JSON serialization of messages. + * + */ +public final class VertxGossipServer { + + private static final ObjectMapper mapper = new ObjectMapper(); + + private static final class Message { + + public MessageSender.Verb verb; + public String payload; + } + private final class SocketHandler { + + private final Peer peer; + + SocketHandler(Peer peer) { + this.peer = peer; + state.addPeer(peer); + } + + private Bytes buffer = Bytes.EMPTY; + + void handle(Buffer data) { + buffer = Bytes.concatenate(buffer, Bytes.wrapBuffer(data)); + Message message; + try { + JsonParser parser = mapper.getFactory().createParser(buffer.toArrayUnsafe()); + message = parser.readValueAs(Message.class); + buffer = buffer.slice((int) parser.getCurrentLocation().getByteOffset()); + } catch (IOException e) { + return; + } + + switch (message.verb) { + case IHAVE: + state.receiveIHaveMessage(peer, Bytes.fromHexString(message.payload)); + break; + case GOSSIP: + state.receiveGossipMessage(peer, Bytes.fromHexString(message.payload)); + break; + case GRAFT: + state.receiveGraftMessage(peer, Bytes.fromHexString(message.payload)); + break; + case PRUNE: + state.receivePruneMessage(peer); + break; + } + } + + void close(Void aVoid) { + state.removePeer(peer); + } + } + + private final AtomicBoolean started = new AtomicBoolean(false); + private final Vertx vertx; + private final int port; + private final MessageHashing messageHashing; + private final String networkInterface; + private final PeerRepository peerRepository; + private final Consumer<Bytes> payloadListener; + private final MessageValidator payloadValidator; + private State state; + private NetServer server; + private NetClient client; + + public VertxGossipServer( + Vertx vertx, + String networkInterface, + int port, + MessageHashing messageHashing, + PeerRepository peerRepository, + Consumer<Bytes> payloadListener, + MessageValidator payloadValidator) { + this.vertx = vertx; + this.networkInterface = networkInterface; + this.port = port; + this.messageHashing = messageHashing; + this.peerRepository = peerRepository; + this.payloadListener = payloadListener; + this.payloadValidator = payloadValidator; + } + + public AsyncCompletion start() { + if (started.compareAndSet(false, true)) { + CompletableAsyncCompletion completion = AsyncCompletion.incomplete(); + server = vertx.createNetServer(); + client = vertx.createNetClient(); + server.connectHandler(socket -> { + Peer peer = new SocketPeer(socket); + SocketHandler handler = new SocketHandler(peer); + socket.handler(handler::handle).closeHandler(handler::close); + }); + server.listen(port, networkInterface, res -> { + if (res.failed()) { + completion.completeExceptionally(res.cause()); + } else { + state = new State(peerRepository, messageHashing, (verb, peer, payload) -> { + Message message = new Message(); + message.verb = verb; + message.payload = payload == null ? null : payload.toHexString(); + try { + ((SocketPeer) peer).socket().write(Buffer.buffer(mapper.writeValueAsBytes(message))); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + }, payloadListener, payloadValidator); + + completion.complete(); + } + }); + + return completion; + } else { + return AsyncCompletion.completed(); + } + } + + public AsyncCompletion stop() { + if (started.compareAndSet(true, false)) { + CompletableAsyncCompletion completion = AsyncCompletion.incomplete(); + + state.stop(); + client.close(); + server.close(res -> { + if (res.failed()) { + completion.completeExceptionally(res.cause()); + } else { + completion.complete(); + } + }); + + return completion; + } + return AsyncCompletion.completed(); + } + + public AsyncCompletion connectTo(String host, int port) { + if (!started.get()) { + throw new IllegalStateException("Server has not started"); + } + CompletableAsyncCompletion completion = AsyncCompletion.incomplete(); + client.connect(port, host, res -> { + if (res.failed()) { + completion.completeExceptionally(res.cause()); + } else { + completion.complete(); + Peer peer = new SocketPeer(res.result()); + SocketHandler handler = new SocketHandler(peer); + res.result().handler(handler::handle).closeHandler(handler::close); + } + }); + + return completion; + } + + /** + * Gossip a message to all known peers. + * + * @param message the payload to propagate + */ + public void gossip(Bytes message) { + if (!started.get()) { + throw new IllegalStateException("Server has not started"); + } + state.sendGossipMessage(message); + } +} diff --git a/plumtree/src/test/java/org/apache/tuweni/plumtree/StateTest.java b/plumtree/src/test/java/org/apache/tuweni/plumtree/StateTest.java index ef4cade..3dfb206 100644 --- a/plumtree/src/test/java/org/apache/tuweni/plumtree/StateTest.java +++ b/plumtree/src/test/java/org/apache/tuweni/plumtree/StateTest.java @@ -21,12 +21,18 @@ import org.apache.tuweni.bytes.Bytes32; import org.apache.tuweni.crypto.Hash; import org.apache.tuweni.junit.BouncyCastleExtension; +import java.util.concurrent.atomic.AtomicReference; + import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @ExtendWith(BouncyCastleExtension.class) class StateTest { + private static class PeerImpl implements Peer { + + } + private static class MockMessageSender implements MessageSender { Verb verb; @@ -42,10 +48,12 @@ class StateTest { } } + private static final AtomicReference<Bytes> messageRef = new AtomicReference<>(); + @Test void testInitialState() { EphemeralPeerRepository repo = new EphemeralPeerRepository(); - State state = new State(repo, Hash::keccak256, new MockMessageSender()); + State state = new State(repo, Hash::keccak256, new MockMessageSender(), messageRef::set, (message, peer) -> true); assertTrue(repo.peers().isEmpty()); assertTrue(repo.lazyPushPeers().isEmpty()); assertTrue(repo.eagerPushPeers().isEmpty()); @@ -54,10 +62,10 @@ class StateTest { @Test void firstRoundWithThreePeers() { EphemeralPeerRepository repo = new EphemeralPeerRepository(); - State state = new State(repo, Hash::keccak256, new MockMessageSender()); - state.addPeer(new Peer()); - state.addPeer(new Peer()); - state.addPeer(new Peer()); + State state = new State(repo, Hash::keccak256, new MockMessageSender(), messageRef::set, (message, peer) -> true); + state.addPeer(new PeerImpl()); + state.addPeer(new PeerImpl()); + state.addPeer(new PeerImpl()); assertTrue(repo.lazyPushPeers().isEmpty()); assertEquals(3, repo.eagerPushPeers().size()); } @@ -65,8 +73,8 @@ class StateTest { @Test void removePeer() { EphemeralPeerRepository repo = new EphemeralPeerRepository(); - State state = new State(repo, Hash::keccak256, new MockMessageSender()); - Peer peer = new Peer(); + State state = new State(repo, Hash::keccak256, new MockMessageSender(), messageRef::set, (message, peer) -> true); + Peer peer = new PeerImpl(); state.addPeer(peer); state.removePeer(peer); assertTrue(repo.peers().isEmpty()); @@ -77,8 +85,8 @@ class StateTest { @Test void prunePeer() { EphemeralPeerRepository repo = new EphemeralPeerRepository(); - State state = new State(repo, Hash::keccak256, new MockMessageSender()); - Peer peer = new Peer(); + State state = new State(repo, Hash::keccak256, new MockMessageSender(), messageRef::set, (message, peer) -> true); + Peer peer = new PeerImpl(); state.addPeer(peer); state.receivePruneMessage(peer); assertEquals(0, repo.eagerPushPeers().size()); @@ -88,8 +96,8 @@ class StateTest { @Test void graftPeer() { EphemeralPeerRepository repo = new EphemeralPeerRepository(); - State state = new State(repo, Hash::keccak256, new MockMessageSender()); - Peer peer = new Peer(); + State state = new State(repo, Hash::keccak256, new MockMessageSender(), messageRef::set, (message, peer) -> true); + Peer peer = new PeerImpl(); state.addPeer(peer); state.receivePruneMessage(peer); assertEquals(0, repo.eagerPushPeers().size()); @@ -103,13 +111,13 @@ class StateTest { void receiveFullMessageFromEagerPeer() { EphemeralPeerRepository repo = new EphemeralPeerRepository(); MockMessageSender messageSender = new MockMessageSender(); - State state = new State(repo, Hash::keccak256, messageSender); - Peer peer = new Peer(); + State state = new State(repo, Hash::keccak256, messageSender, messageRef::set, (message, peer) -> true); + Peer peer = new PeerImpl(); state.addPeer(peer); - Peer otherPeer = new Peer(); + Peer otherPeer = new PeerImpl(); state.addPeer(otherPeer); Bytes32 msg = Bytes32.random(); - state.receiveFullMessage(peer, msg); + state.receiveGossipMessage(peer, msg); assertEquals(msg, messageSender.payload); assertEquals(otherPeer, messageSender.peer); } @@ -118,16 +126,16 @@ class StateTest { void receiveFullMessageFromEagerPeerWithALazyPeer() { EphemeralPeerRepository repo = new EphemeralPeerRepository(); MockMessageSender messageSender = new MockMessageSender(); - State state = new State(repo, Hash::keccak256, messageSender); - Peer peer = new Peer(); + State state = new State(repo, Hash::keccak256, messageSender, messageRef::set, (message, peer) -> true); + Peer peer = new PeerImpl(); state.addPeer(peer); - Peer otherPeer = new Peer(); + Peer otherPeer = new PeerImpl(); state.addPeer(otherPeer); Bytes32 msg = Bytes32.random(); - Peer lazyPeer = new Peer(); + Peer lazyPeer = new PeerImpl(); state.addPeer(lazyPeer); repo.moveToLazy(lazyPeer); - state.receiveFullMessage(peer, msg); + state.receiveGossipMessage(peer, msg); assertEquals(msg, messageSender.payload); assertEquals(otherPeer, messageSender.peer); state.processQueue(); @@ -140,15 +148,15 @@ class StateTest { void receiveFullMessageFromEagerPeerThenPartialMessageFromLazyPeer() { EphemeralPeerRepository repo = new EphemeralPeerRepository(); MockMessageSender messageSender = new MockMessageSender(); - State state = new State(repo, Hash::keccak256, messageSender); - Peer peer = new Peer(); + State state = new State(repo, Hash::keccak256, messageSender, messageRef::set, (message, peer) -> true); + Peer peer = new PeerImpl(); state.addPeer(peer); - Peer lazyPeer = new Peer(); + Peer lazyPeer = new PeerImpl(); state.addPeer(lazyPeer); repo.moveToLazy(lazyPeer); Bytes message = Bytes32.random(); - state.receiveFullMessage(peer, message); - state.receiveHeaderMessage(lazyPeer, message); + state.receiveGossipMessage(peer, message); + state.receiveIHaveMessage(lazyPeer, message); assertNull(messageSender.payload); assertNull(messageSender.peer); } @@ -157,14 +165,14 @@ class StateTest { void receivePartialMessageFromLazyPeerAndNoFullMessage() throws Exception { EphemeralPeerRepository repo = new EphemeralPeerRepository(); MockMessageSender messageSender = new MockMessageSender(); - State state = new State(repo, Hash::keccak256, messageSender, 100, 4000); - Peer peer = new Peer(); + State state = new State(repo, Hash::keccak256, messageSender, messageRef::set, (message, peer) -> true, 100, 4000); + Peer peer = new PeerImpl(); state.addPeer(peer); - Peer lazyPeer = new Peer(); + Peer lazyPeer = new PeerImpl(); state.addPeer(lazyPeer); repo.moveToLazy(lazyPeer); Bytes message = Bytes32.random(); - state.receiveHeaderMessage(lazyPeer, message); + state.receiveIHaveMessage(lazyPeer, message); Thread.sleep(200); assertEquals(message, messageSender.payload); assertEquals(lazyPeer, messageSender.peer); @@ -175,16 +183,16 @@ class StateTest { void receivePartialMessageFromLazyPeerAndThenFullMessage() throws Exception { EphemeralPeerRepository repo = new EphemeralPeerRepository(); MockMessageSender messageSender = new MockMessageSender(); - State state = new State(repo, Hash::keccak256, messageSender, 500, 4000); - Peer peer = new Peer(); + State state = new State(repo, Hash::keccak256, messageSender, messageRef::set, (message, peer) -> true, 500, 4000); + Peer peer = new PeerImpl(); state.addPeer(peer); - Peer lazyPeer = new Peer(); + Peer lazyPeer = new PeerImpl(); state.addPeer(lazyPeer); repo.moveToLazy(lazyPeer); Bytes message = Bytes32.random(); - state.receiveHeaderMessage(lazyPeer, Hash.keccak256(message)); + state.receiveIHaveMessage(lazyPeer, Hash.keccak256(message)); Thread.sleep(100); - state.receiveFullMessage(peer, message); + state.receiveGossipMessage(peer, message); Thread.sleep(500); assertNull(messageSender.verb); assertNull(messageSender.payload); @@ -195,10 +203,10 @@ class StateTest { void receiveFullMessageFromUnknownPeer() { EphemeralPeerRepository repo = new EphemeralPeerRepository(); MockMessageSender messageSender = new MockMessageSender(); - State state = new State(repo, Hash::keccak256, messageSender); - Peer peer = new Peer(); + State state = new State(repo, Hash::keccak256, messageSender, messageRef::set, (message, peer) -> true); + Peer peer = new PeerImpl(); Bytes message = Bytes32.random(); - state.receiveFullMessage(peer, message); + state.receiveGossipMessage(peer, message); assertEquals(1, repo.eagerPushPeers().size()); assertEquals(0, repo.lazyPushPeers().size()); assertEquals(peer, repo.eagerPushPeers().iterator().next()); @@ -208,12 +216,12 @@ class StateTest { void prunePeerWhenReceivingTwiceTheSameFullMessage() { EphemeralPeerRepository repo = new EphemeralPeerRepository(); MockMessageSender messageSender = new MockMessageSender(); - State state = new State(repo, Hash::keccak256, messageSender); - Peer peer = new Peer(); - Peer secondPeer = new Peer(); + State state = new State(repo, Hash::keccak256, messageSender, messageRef::set, (message, peer) -> true); + Peer peer = new PeerImpl(); + Peer secondPeer = new PeerImpl(); Bytes message = Bytes32.random(); - state.receiveFullMessage(peer, message); - state.receiveFullMessage(secondPeer, message); + state.receiveGossipMessage(peer, message); + state.receiveGossipMessage(secondPeer, message); assertEquals(1, repo.eagerPushPeers().size()); assertEquals(1, repo.lazyPushPeers().size()); assertNull(messageSender.payload); diff --git a/plumtree/src/test/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java b/plumtree/src/test/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java new file mode 100644 index 0000000..f465afb --- /dev/null +++ b/plumtree/src/test/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.tuweni.plumtree.vertx; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import org.apache.tuweni.bytes.Bytes; +import org.apache.tuweni.junit.VertxExtension; +import org.apache.tuweni.junit.VertxInstance; +import org.apache.tuweni.plumtree.EphemeralPeerRepository; + +import java.util.concurrent.atomic.AtomicReference; + +import io.vertx.core.Vertx; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +@ExtendWith(VertxExtension.class) +class VertxGossipServerTest { + + @Test + void gossipDeadBeefToOtherNode(@VertxInstance Vertx vertx) throws Exception { + + AtomicReference<Bytes> messageReceived1 = new AtomicReference<>(); + AtomicReference<Bytes> messageReceived2 = new AtomicReference<>(); + + VertxGossipServer server1 = new VertxGossipServer( + vertx, + "127.0.0.1", + 10000, + bytes -> bytes, + new EphemeralPeerRepository(), + messageReceived1::set, + (message, peer) -> true); + VertxGossipServer server2 = new VertxGossipServer( + vertx, + "127.0.0.1", + 10001, + bytes -> bytes, + new EphemeralPeerRepository(), + messageReceived2::set, + (message, peer) -> true); + + server1.start().join(); + server2.start().join(); + + server1.connectTo("127.0.0.1", 10001).join(); + server1.gossip(Bytes.fromHexString("deadbeef")); + Thread.sleep(1000); + assertEquals(Bytes.fromHexString("deadbeef"), messageReceived2.get()); + + server1.stop().join(); + server2.stop().join(); + } + + @Test + void gossipDeadBeefToTwoOtherNodes(@VertxInstance Vertx vertx) throws Exception { + + AtomicReference<Bytes> messageReceived1 = new AtomicReference<>(); + AtomicReference<Bytes> messageReceived2 = new AtomicReference<>(); + AtomicReference<Bytes> messageReceived3 = new AtomicReference<>(); + + VertxGossipServer server1 = new VertxGossipServer( + vertx, + "127.0.0.1", + 10000, + bytes -> bytes, + new EphemeralPeerRepository(), + messageReceived1::set, + (message, peer) -> true); + VertxGossipServer server2 = new VertxGossipServer( + vertx, + "127.0.0.1", + 10001, + bytes -> bytes, + new EphemeralPeerRepository(), + messageReceived2::set, + (message, peer) -> true); + VertxGossipServer server3 = new VertxGossipServer( + vertx, + "127.0.0.1", + 10002, + bytes -> bytes, + new EphemeralPeerRepository(), + messageReceived3::set, + (message, peer) -> true); + + server1.start().join(); + server2.start().join(); + server3.start().join(); + + server1.connectTo("127.0.0.1", 10001).join(); + server3.connectTo("127.0.0.1", 10001).join(); + server1.gossip(Bytes.fromHexString("deadbeef")); + Thread.sleep(1000); + assertEquals(Bytes.fromHexString("deadbeef"), messageReceived2.get()); + assertEquals(Bytes.fromHexString("deadbeef"), messageReceived3.get()); + + server1.stop().join(); + server2.stop().join(); + } + + @Test + void gossipCollision(@VertxInstance Vertx vertx) throws Exception { + AtomicReference<Bytes> messageReceived1 = new AtomicReference<>(); + AtomicReference<Bytes> messageReceived2 = new AtomicReference<>(); + + EphemeralPeerRepository peerRepository1 = new EphemeralPeerRepository(); + EphemeralPeerRepository peerRepository3 = new EphemeralPeerRepository(); + + VertxGossipServer server1 = new VertxGossipServer( + vertx, + "127.0.0.1", + 10000, + bytes -> bytes, + peerRepository1, + messageReceived1::set, + (message, peer) -> true); + VertxGossipServer server2 = new VertxGossipServer( + vertx, + "127.0.0.1", + 10001, + bytes -> bytes, + new EphemeralPeerRepository(), + messageReceived2::set, + (message, peer) -> true); + VertxGossipServer server3 = new VertxGossipServer( + vertx, + "127.0.0.1", + 10002, + bytes -> bytes, + peerRepository3, + messageReceived2::set, + (message, peer) -> true); + + server1.start().join(); + server2.start().join(); + server3.start().join(); + + server1.connectTo("127.0.0.1", 10001).join(); + server2.connectTo("127.0.0.1", 10002).join(); + server1.connectTo("127.0.0.1", 10002).join(); + server1.gossip(Bytes.fromHexString("deadbeef")); + Thread.sleep(1000); + assertEquals(Bytes.fromHexString("deadbeef"), messageReceived2.get()); + Thread.sleep(1000); + + assertTrue(peerRepository1.lazyPushPeers().size() == 1 || peerRepository3.lazyPushPeers().size() == 1); + + server1.stop().join(); + server2.stop().join(); + server3.stop().join(); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
