This is an automated email from the ASF dual-hosted git repository. jrhea pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git
commit 7324371957196b9bedd404ed607dbc7c8f9b296f Author: jonny rhea <[email protected]> AuthorDate: Mon Apr 22 18:10:49 2019 -0500 added message type to gossip header --- .../org/apache/tuweni/plumtree/MessageSender.java | 13 ++++++-- .../java/org/apache/tuweni/plumtree/State.java | 25 +++++++++------ .../tuweni/plumtree/vertx/VertxGossipServer.java | 10 +++--- .../java/org/apache/tuweni/plumtree/StateTest.java | 37 +++++++++++++++++----- .../plumtree/vertx/VertxGossipServerTest.java | 7 ++-- 5 files changed, 65 insertions(+), 27 deletions(-) diff --git a/plumtree/src/main/java/org/apache/tuweni/plumtree/MessageSender.java b/plumtree/src/main/java/org/apache/tuweni/plumtree/MessageSender.java index ea18d67..5ff8bb6 100644 --- a/plumtree/src/main/java/org/apache/tuweni/plumtree/MessageSender.java +++ b/plumtree/src/main/java/org/apache/tuweni/plumtree/MessageSender.java @@ -22,19 +22,28 @@ import javax.annotation.Nullable; public interface MessageSender { /** - * Types of message supported by the dialect + * Types of verbs supported by the dialect */ enum Verb { IHAVE, GRAFT, PRUNE, GOSSIP } /** + * Types of message supported by the dialect + */ + enum Type { + BLOCK, ATTESTATION, NADA + } + + /** * Sends bytes to a peer. * * @param verb the type of message + * @param type the type of message * @param peer the target of the message * @param hash the hash of the message * @param payload the bytes to send */ - void sendMessage(Verb verb, Peer peer, Bytes hash, @Nullable Bytes payload); + void sendMessage(Verb verb, Type type, Peer peer, Bytes hash, @Nullable Bytes payload); + } diff --git a/plumtree/src/main/java/org/apache/tuweni/plumtree/State.java b/plumtree/src/main/java/org/apache/tuweni/plumtree/State.java index b65fdbe..5b199e0 100644 --- a/plumtree/src/main/java/org/apache/tuweni/plumtree/State.java +++ b/plumtree/src/main/java/org/apache/tuweni/plumtree/State.java @@ -71,7 +71,7 @@ public final class State { * @param sender the sender - may be null if we are submitting this message to the network * @param message the payload to send to the network */ - void fullMessageReceived(@Nullable Peer sender, Bytes message) { + void fullMessageReceived(@Nullable Peer sender, MessageSender.Type type, Bytes message) { if (receivedFullMessage.compareAndSet(false, true)) { for (TimerTask task : tasks) { task.cancel(); @@ -80,7 +80,7 @@ public final class State { if (sender == null || messageValidator.validate(message, sender)) { for (Peer peer : peerRepository.eagerPushPeers()) { if (sender == null || !sender.equals(peer)) { - messageSender.sendMessage(MessageSender.Verb.GOSSIP, peer, hash, message); + messageSender.sendMessage(MessageSender.Verb.GOSSIP, type, peer, hash, message); } } lazyQueue.addAll( @@ -88,7 +88,9 @@ public final class State { .lazyPushPeers() .stream() .filter(p -> !lazyPeers.contains(p)) - .map(peer -> (Runnable) (() -> messageSender.sendMessage(MessageSender.Verb.IHAVE, peer, hash, null))) + .map( + peer -> (Runnable) (() -> messageSender + .sendMessage(MessageSender.Verb.IHAVE, MessageSender.Type.NADA, peer, hash, null))) .collect(Collectors.toList())); if (sender != null) { messageListener.accept(message); @@ -96,7 +98,7 @@ public final class State { } } else { if (sender != null) { - messageSender.sendMessage(MessageSender.Verb.PRUNE, sender, hash, null); + messageSender.sendMessage(MessageSender.Verb.PRUNE, MessageSender.Type.NADA, sender, hash, null); peerRepository.moveToLazy(sender); } } @@ -110,7 +112,8 @@ public final class State { if (newPeerIndex == lazyPeers.size()) { newPeerIndex = 0; } - messageSender.sendMessage(MessageSender.Verb.GRAFT, lazyPeers.get(index), hash, null); + messageSender + .sendMessage(MessageSender.Verb.GRAFT, MessageSender.Type.NADA, lazyPeers.get(index), hash, null); scheduleGraftMessage(newPeerIndex++); } }; @@ -203,12 +206,13 @@ public final class State { * Records a message was received in full from a peer. * * @param peer the peer that sent the message + * @param type message type of the message * @param message the hash of the message */ - public void receiveGossipMessage(Peer peer, Bytes message) { + public void receiveGossipMessage(Peer peer, MessageSender.Type type, Bytes message) { peerRepository.considerNewPeer(peer); MessageHandler handler = messageHandlers.computeIfAbsent(messageHashingFunction.hash(message), MessageHandler::new); - handler.fullMessageReceived(peer, message); + handler.fullMessageReceived(peer, type, message); } /** @@ -239,19 +243,20 @@ public final class State { */ public void receiveGraftMessage(Peer peer, Bytes messageHash) { peerRepository.moveToEager(peer); - messageSender.sendMessage(MessageSender.Verb.GOSSIP, peer, messageHash, null); + messageSender.sendMessage(MessageSender.Verb.GOSSIP, MessageSender.Type.NADA, peer, messageHash, null); } /** * Sends a gossip message to all peers, according to their status. * * @param message the message to propagate + * @param type message type of the message * @return The associated hash of the message */ - public Bytes sendGossipMessage(Bytes message) { + public Bytes sendGossipMessage(MessageSender.Type type, Bytes message) { Bytes messageHash = messageHashingFunction.hash(message); MessageHandler handler = messageHandlers.computeIfAbsent(messageHash, MessageHandler::new); - handler.fullMessageReceived(null, message); + handler.fullMessageReceived(null, type, message); return messageHash; } diff --git a/plumtree/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java b/plumtree/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java index fe2cf24..ae10f43 100644 --- a/plumtree/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java +++ b/plumtree/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java @@ -47,6 +47,7 @@ public final class VertxGossipServer { private static final class Message { public MessageSender.Verb verb; + public MessageSender.Type type; public String hash; public String payload; } @@ -77,7 +78,7 @@ public final class VertxGossipServer { state.receiveIHaveMessage(peer, Bytes.fromHexString(message.payload)); break; case GOSSIP: - state.receiveGossipMessage(peer, Bytes.fromHexString(message.payload)); + state.receiveGossipMessage(peer, message.type, Bytes.fromHexString(message.payload)); break; case GRAFT: state.receiveGraftMessage(peer, Bytes.fromHexString(message.payload)); @@ -136,9 +137,10 @@ public final class VertxGossipServer { if (res.failed()) { completion.completeExceptionally(res.cause()); } else { - state = new State(peerRepository, messageHashing, (verb, peer, hash, payload) -> { + state = new State(peerRepository, messageHashing, (verb, type, peer, hash, payload) -> { Message message = new Message(); message.verb = verb; + message.type = type; message.hash = hash.toHexString(); message.payload = payload == null ? null : payload.toHexString(); try { @@ -201,10 +203,10 @@ public final class VertxGossipServer { * * @param message the payload to propagate */ - public void gossip(Bytes message) { + public void gossip(MessageSender.Type type, Bytes message) { if (!started.get()) { throw new IllegalStateException("Server has not started"); } - state.sendGossipMessage(message); + state.sendGossipMessage(type, message); } } diff --git a/plumtree/src/test/java/org/apache/tuweni/plumtree/StateTest.java b/plumtree/src/test/java/org/apache/tuweni/plumtree/StateTest.java index be5f110..f245bbc 100644 --- a/plumtree/src/test/java/org/apache/tuweni/plumtree/StateTest.java +++ b/plumtree/src/test/java/org/apache/tuweni/plumtree/StateTest.java @@ -36,13 +36,15 @@ class StateTest { private static class MockMessageSender implements MessageSender { Verb verb; + Type type; Peer peer; Bytes hash; Bytes payload; @Override - public void sendMessage(Verb verb, Peer peer, Bytes hash, Bytes payload) { + public void sendMessage(Verb verb, Type type, Peer peer, Bytes hash, Bytes payload) { this.verb = verb; + this.type = type; this.peer = peer; this.hash = hash; this.payload = payload; @@ -73,6 +75,25 @@ class StateTest { } @Test + void firstRoundWithTwoPeers() { + EphemeralPeerRepository repo = new EphemeralPeerRepository(); + State state = new State(repo, Hash::keccak256, new MockMessageSender(), messageRef::set, (message, peer) -> true); + state.addPeer(new PeerImpl()); + state.addPeer(new PeerImpl()); + assertTrue(repo.lazyPushPeers().isEmpty()); + assertEquals(2, repo.eagerPushPeers().size()); + } + + @Test + void firstRoundWithOnePeer() { + EphemeralPeerRepository repo = new EphemeralPeerRepository(); + State state = new State(repo, Hash::keccak256, new MockMessageSender(), messageRef::set, (message, peer) -> true); + state.addPeer(new PeerImpl()); + assertTrue(repo.lazyPushPeers().isEmpty()); + assertEquals(1, repo.eagerPushPeers().size()); + } + + @Test void removePeer() { EphemeralPeerRepository repo = new EphemeralPeerRepository(); State state = new State(repo, Hash::keccak256, new MockMessageSender(), messageRef::set, (message, peer) -> true); @@ -119,7 +140,7 @@ class StateTest { Peer otherPeer = new PeerImpl(); state.addPeer(otherPeer); Bytes32 msg = Bytes32.random(); - state.receiveGossipMessage(peer, msg); + state.receiveGossipMessage(peer, MessageSender.Type.BLOCK, msg); assertEquals(msg, messageSender.payload); assertEquals(otherPeer, messageSender.peer); } @@ -137,7 +158,7 @@ class StateTest { Peer lazyPeer = new PeerImpl(); state.addPeer(lazyPeer); repo.moveToLazy(lazyPeer); - state.receiveGossipMessage(peer, msg); + state.receiveGossipMessage(peer, MessageSender.Type.BLOCK, msg); assertEquals(msg, messageSender.payload); assertEquals(otherPeer, messageSender.peer); state.processQueue(); @@ -157,7 +178,7 @@ class StateTest { state.addPeer(lazyPeer); repo.moveToLazy(lazyPeer); Bytes message = Bytes32.random(); - state.receiveGossipMessage(peer, message); + state.receiveGossipMessage(peer, MessageSender.Type.BLOCK, message); state.receiveIHaveMessage(lazyPeer, message); assertNull(messageSender.payload); assertNull(messageSender.peer); @@ -194,7 +215,7 @@ class StateTest { Bytes message = Bytes32.random(); state.receiveIHaveMessage(lazyPeer, Hash.keccak256(message)); Thread.sleep(100); - state.receiveGossipMessage(peer, message); + state.receiveGossipMessage(peer, MessageSender.Type.BLOCK, message); Thread.sleep(500); assertNull(messageSender.verb); assertNull(messageSender.payload); @@ -208,7 +229,7 @@ class StateTest { State state = new State(repo, Hash::keccak256, messageSender, messageRef::set, (message, peer) -> true); Peer peer = new PeerImpl(); Bytes message = Bytes32.random(); - state.receiveGossipMessage(peer, message); + state.receiveGossipMessage(peer, MessageSender.Type.BLOCK, message); assertEquals(1, repo.eagerPushPeers().size()); assertEquals(0, repo.lazyPushPeers().size()); assertEquals(peer, repo.eagerPushPeers().iterator().next()); @@ -222,8 +243,8 @@ class StateTest { Peer peer = new PeerImpl(); Peer secondPeer = new PeerImpl(); Bytes message = Bytes32.random(); - state.receiveGossipMessage(peer, message); - state.receiveGossipMessage(secondPeer, message); + state.receiveGossipMessage(peer, MessageSender.Type.BLOCK, message); + state.receiveGossipMessage(secondPeer, MessageSender.Type.BLOCK, message); assertEquals(1, repo.eagerPushPeers().size()); assertEquals(1, repo.lazyPushPeers().size()); assertNull(messageSender.payload); diff --git a/plumtree/src/test/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java b/plumtree/src/test/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java index d6e6106..e6ebe71 100644 --- a/plumtree/src/test/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java +++ b/plumtree/src/test/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java @@ -20,6 +20,7 @@ import org.apache.tuweni.bytes.Bytes; import org.apache.tuweni.junit.VertxExtension; import org.apache.tuweni.junit.VertxInstance; import org.apache.tuweni.plumtree.EphemeralPeerRepository; +import org.apache.tuweni.plumtree.MessageSender; import java.util.concurrent.atomic.AtomicReference; @@ -57,7 +58,7 @@ class VertxGossipServerTest { server2.start().join(); server1.connectTo("127.0.0.1", 10001).join(); - server1.gossip(Bytes.fromHexString("deadbeef")); + server1.gossip(MessageSender.Type.BLOCK, Bytes.fromHexString("deadbeef")); Thread.sleep(1000); assertEquals(Bytes.fromHexString("deadbeef"), messageReceived2.get()); @@ -103,7 +104,7 @@ class VertxGossipServerTest { server1.connectTo("127.0.0.1", 10001).join(); server3.connectTo("127.0.0.1", 10001).join(); - server1.gossip(Bytes.fromHexString("deadbeef")); + server1.gossip(MessageSender.Type.BLOCK, Bytes.fromHexString("deadbeef")); Thread.sleep(1000); assertEquals(Bytes.fromHexString("deadbeef"), messageReceived2.get()); assertEquals(Bytes.fromHexString("deadbeef"), messageReceived3.get()); @@ -153,7 +154,7 @@ class VertxGossipServerTest { server1.connectTo("127.0.0.1", 10001).join(); server2.connectTo("127.0.0.1", 10002).join(); server1.connectTo("127.0.0.1", 10002).join(); - server1.gossip(Bytes.fromHexString("deadbeef")); + server1.gossip(MessageSender.Type.BLOCK, Bytes.fromHexString("deadbeef")); Thread.sleep(1000); assertEquals(Bytes.fromHexString("deadbeef"), messageReceived2.get()); Thread.sleep(1000); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
