This is an automated email from the ASF dual-hosted git repository. jrhea pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git
commit b98d45931122a02a6582d62565601acad585cf0b Author: jonny rhea <[email protected]> AuthorDate: Mon Apr 22 22:25:37 2019 -0500 changed message_type to generic attribute string --- .../org/apache/tuweni/plumtree/MessageSender.java | 11 ++------- .../java/org/apache/tuweni/plumtree/State.java | 25 ++++++++++----------- .../tuweni/plumtree/vertx/VertxGossipServer.java | 14 ++++++------ .../java/org/apache/tuweni/plumtree/StateTest.java | 26 +++++++++++++--------- .../plumtree/vertx/VertxGossipServerTest.java | 10 +++++---- 5 files changed, 43 insertions(+), 43 deletions(-) diff --git a/plumtree/src/main/java/org/apache/tuweni/plumtree/MessageSender.java b/plumtree/src/main/java/org/apache/tuweni/plumtree/MessageSender.java index 5ff8bb6..dc59235 100644 --- a/plumtree/src/main/java/org/apache/tuweni/plumtree/MessageSender.java +++ b/plumtree/src/main/java/org/apache/tuweni/plumtree/MessageSender.java @@ -29,21 +29,14 @@ public interface MessageSender { } /** - * Types of message supported by the dialect - */ - enum Type { - BLOCK, ATTESTATION, NADA - } - - /** * Sends bytes to a peer. * * @param verb the type of message - * @param type the type of message + * @param attributes the attributes of message * @param peer the target of the message * @param hash the hash of the message * @param payload the bytes to send */ - void sendMessage(Verb verb, Type type, Peer peer, Bytes hash, @Nullable Bytes payload); + void sendMessage(Verb verb, String attributes, Peer peer, Bytes hash, @Nullable Bytes payload); } diff --git a/plumtree/src/main/java/org/apache/tuweni/plumtree/State.java b/plumtree/src/main/java/org/apache/tuweni/plumtree/State.java index 5b199e0..7151a45 100644 --- a/plumtree/src/main/java/org/apache/tuweni/plumtree/State.java +++ b/plumtree/src/main/java/org/apache/tuweni/plumtree/State.java @@ -71,7 +71,7 @@ public final class State { * @param sender the sender - may be null if we are submitting this message to the network * @param message the payload to send to the network */ - void fullMessageReceived(@Nullable Peer sender, MessageSender.Type type, Bytes message) { + void fullMessageReceived(@Nullable Peer sender, String attributes, Bytes message) { if (receivedFullMessage.compareAndSet(false, true)) { for (TimerTask task : tasks) { task.cancel(); @@ -80,7 +80,7 @@ public final class State { if (sender == null || messageValidator.validate(message, sender)) { for (Peer peer : peerRepository.eagerPushPeers()) { if (sender == null || !sender.equals(peer)) { - messageSender.sendMessage(MessageSender.Verb.GOSSIP, type, peer, hash, message); + messageSender.sendMessage(MessageSender.Verb.GOSSIP, attributes, peer, hash, message); } } lazyQueue.addAll( @@ -90,7 +90,7 @@ public final class State { .filter(p -> !lazyPeers.contains(p)) .map( peer -> (Runnable) (() -> messageSender - .sendMessage(MessageSender.Verb.IHAVE, MessageSender.Type.NADA, peer, hash, null))) + .sendMessage(MessageSender.Verb.IHAVE, null, peer, hash, null))) .collect(Collectors.toList())); if (sender != null) { messageListener.accept(message); @@ -98,7 +98,7 @@ public final class State { } } else { if (sender != null) { - messageSender.sendMessage(MessageSender.Verb.PRUNE, MessageSender.Type.NADA, sender, hash, null); + messageSender.sendMessage(MessageSender.Verb.PRUNE, null, sender, hash, null); peerRepository.moveToLazy(sender); } } @@ -112,8 +112,7 @@ public final class State { if (newPeerIndex == lazyPeers.size()) { newPeerIndex = 0; } - messageSender - .sendMessage(MessageSender.Verb.GRAFT, MessageSender.Type.NADA, lazyPeers.get(index), hash, null); + messageSender.sendMessage(MessageSender.Verb.GRAFT, null, lazyPeers.get(index), hash, null); scheduleGraftMessage(newPeerIndex++); } }; @@ -206,13 +205,13 @@ public final class State { * Records a message was received in full from a peer. * * @param peer the peer that sent the message - * @param type message type of the message + * @param attributes of the message * @param message the hash of the message */ - public void receiveGossipMessage(Peer peer, MessageSender.Type type, Bytes message) { + public void receiveGossipMessage(Peer peer, String attributes, Bytes message) { peerRepository.considerNewPeer(peer); MessageHandler handler = messageHandlers.computeIfAbsent(messageHashingFunction.hash(message), MessageHandler::new); - handler.fullMessageReceived(peer, type, message); + handler.fullMessageReceived(peer, attributes, message); } /** @@ -243,20 +242,20 @@ public final class State { */ public void receiveGraftMessage(Peer peer, Bytes messageHash) { peerRepository.moveToEager(peer); - messageSender.sendMessage(MessageSender.Verb.GOSSIP, MessageSender.Type.NADA, peer, messageHash, null); + messageSender.sendMessage(MessageSender.Verb.GOSSIP, null, peer, messageHash, null); } /** * Sends a gossip message to all peers, according to their status. * * @param message the message to propagate - * @param type message type of the message + * @param attributes of the message * @return The associated hash of the message */ - public Bytes sendGossipMessage(MessageSender.Type type, Bytes message) { + public Bytes sendGossipMessage(String attributes, Bytes message) { Bytes messageHash = messageHashingFunction.hash(message); MessageHandler handler = messageHandlers.computeIfAbsent(messageHash, MessageHandler::new); - handler.fullMessageReceived(null, type, message); + handler.fullMessageReceived(null, attributes, message); return messageHash; } diff --git a/plumtree/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java b/plumtree/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java index ae10f43..ddbb74c 100644 --- a/plumtree/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java +++ b/plumtree/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java @@ -47,7 +47,7 @@ public final class VertxGossipServer { private static final class Message { public MessageSender.Verb verb; - public MessageSender.Type type; + public String attributes; public String hash; public String payload; } @@ -78,7 +78,7 @@ public final class VertxGossipServer { state.receiveIHaveMessage(peer, Bytes.fromHexString(message.payload)); break; case GOSSIP: - state.receiveGossipMessage(peer, message.type, Bytes.fromHexString(message.payload)); + state.receiveGossipMessage(peer, message.attributes, Bytes.fromHexString(message.payload)); break; case GRAFT: state.receiveGraftMessage(peer, Bytes.fromHexString(message.payload)); @@ -137,10 +137,10 @@ public final class VertxGossipServer { if (res.failed()) { completion.completeExceptionally(res.cause()); } else { - state = new State(peerRepository, messageHashing, (verb, type, peer, hash, payload) -> { + state = new State(peerRepository, messageHashing, (verb, attributes, peer, hash, payload) -> { Message message = new Message(); message.verb = verb; - message.type = type; + message.attributes = attributes; message.hash = hash.toHexString(); message.payload = payload == null ? null : payload.toHexString(); try { @@ -200,13 +200,13 @@ public final class VertxGossipServer { /** * Gossip a message to all known peers. - * + * @param attributes the payload to propagate * @param message the payload to propagate */ - public void gossip(MessageSender.Type type, Bytes message) { + public void gossip(String attributes, Bytes message) { if (!started.get()) { throw new IllegalStateException("Server has not started"); } - state.sendGossipMessage(type, message); + state.sendGossipMessage(attributes, message); } } diff --git a/plumtree/src/test/java/org/apache/tuweni/plumtree/StateTest.java b/plumtree/src/test/java/org/apache/tuweni/plumtree/StateTest.java index f245bbc..5c03da3 100644 --- a/plumtree/src/test/java/org/apache/tuweni/plumtree/StateTest.java +++ b/plumtree/src/test/java/org/apache/tuweni/plumtree/StateTest.java @@ -36,15 +36,15 @@ class StateTest { private static class MockMessageSender implements MessageSender { Verb verb; - Type type; + String attributes; Peer peer; Bytes hash; Bytes payload; @Override - public void sendMessage(Verb verb, Type type, Peer peer, Bytes hash, Bytes payload) { + public void sendMessage(Verb verb, String attributes, Peer peer, Bytes hash, Bytes payload) { this.verb = verb; - this.type = type; + this.attributes = attributes; this.peer = peer; this.hash = hash; this.payload = payload; @@ -140,7 +140,8 @@ class StateTest { Peer otherPeer = new PeerImpl(); state.addPeer(otherPeer); Bytes32 msg = Bytes32.random(); - state.receiveGossipMessage(peer, MessageSender.Type.BLOCK, msg); + String attributes = "{\"message_type\": \"BLOCK\"}"; + state.receiveGossipMessage(peer, attributes, msg); assertEquals(msg, messageSender.payload); assertEquals(otherPeer, messageSender.peer); } @@ -158,7 +159,8 @@ class StateTest { Peer lazyPeer = new PeerImpl(); state.addPeer(lazyPeer); repo.moveToLazy(lazyPeer); - state.receiveGossipMessage(peer, MessageSender.Type.BLOCK, msg); + String attributes = "{\"message_type\": \"BLOCK\"}"; + state.receiveGossipMessage(peer, attributes, msg); assertEquals(msg, messageSender.payload); assertEquals(otherPeer, messageSender.peer); state.processQueue(); @@ -178,7 +180,8 @@ class StateTest { state.addPeer(lazyPeer); repo.moveToLazy(lazyPeer); Bytes message = Bytes32.random(); - state.receiveGossipMessage(peer, MessageSender.Type.BLOCK, message); + String attributes = "{\"message_type\": \"BLOCK\"}"; + state.receiveGossipMessage(peer, attributes, message); state.receiveIHaveMessage(lazyPeer, message); assertNull(messageSender.payload); assertNull(messageSender.peer); @@ -215,7 +218,8 @@ class StateTest { Bytes message = Bytes32.random(); state.receiveIHaveMessage(lazyPeer, Hash.keccak256(message)); Thread.sleep(100); - state.receiveGossipMessage(peer, MessageSender.Type.BLOCK, message); + String attributes = "{\"message_type\": \"BLOCK\"}"; + state.receiveGossipMessage(peer, attributes, message); Thread.sleep(500); assertNull(messageSender.verb); assertNull(messageSender.payload); @@ -229,7 +233,8 @@ class StateTest { State state = new State(repo, Hash::keccak256, messageSender, messageRef::set, (message, peer) -> true); Peer peer = new PeerImpl(); Bytes message = Bytes32.random(); - state.receiveGossipMessage(peer, MessageSender.Type.BLOCK, message); + String attributes = "{\"message_type\": \"BLOCK\"}"; + state.receiveGossipMessage(peer, attributes, message); assertEquals(1, repo.eagerPushPeers().size()); assertEquals(0, repo.lazyPushPeers().size()); assertEquals(peer, repo.eagerPushPeers().iterator().next()); @@ -243,8 +248,9 @@ class StateTest { Peer peer = new PeerImpl(); Peer secondPeer = new PeerImpl(); Bytes message = Bytes32.random(); - state.receiveGossipMessage(peer, MessageSender.Type.BLOCK, message); - state.receiveGossipMessage(secondPeer, MessageSender.Type.BLOCK, message); + String attributes = "{\"message_type\": \"BLOCK\"}"; + state.receiveGossipMessage(peer, attributes, message); + state.receiveGossipMessage(secondPeer, attributes, message); assertEquals(1, repo.eagerPushPeers().size()); assertEquals(1, repo.lazyPushPeers().size()); assertNull(messageSender.payload); diff --git a/plumtree/src/test/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java b/plumtree/src/test/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java index e6ebe71..384adb3 100644 --- a/plumtree/src/test/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java +++ b/plumtree/src/test/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java @@ -20,7 +20,6 @@ import org.apache.tuweni.bytes.Bytes; import org.apache.tuweni.junit.VertxExtension; import org.apache.tuweni.junit.VertxInstance; import org.apache.tuweni.plumtree.EphemeralPeerRepository; -import org.apache.tuweni.plumtree.MessageSender; import java.util.concurrent.atomic.AtomicReference; @@ -58,7 +57,8 @@ class VertxGossipServerTest { server2.start().join(); server1.connectTo("127.0.0.1", 10001).join(); - server1.gossip(MessageSender.Type.BLOCK, Bytes.fromHexString("deadbeef")); + String attributes = "{\"message_type\": \"BLOCK\"}"; + server1.gossip(attributes, Bytes.fromHexString("deadbeef")); Thread.sleep(1000); assertEquals(Bytes.fromHexString("deadbeef"), messageReceived2.get()); @@ -104,7 +104,8 @@ class VertxGossipServerTest { server1.connectTo("127.0.0.1", 10001).join(); server3.connectTo("127.0.0.1", 10001).join(); - server1.gossip(MessageSender.Type.BLOCK, Bytes.fromHexString("deadbeef")); + String attributes = "{\"message_type\": \"BLOCK\"}"; + server1.gossip(attributes, Bytes.fromHexString("deadbeef")); Thread.sleep(1000); assertEquals(Bytes.fromHexString("deadbeef"), messageReceived2.get()); assertEquals(Bytes.fromHexString("deadbeef"), messageReceived3.get()); @@ -154,7 +155,8 @@ class VertxGossipServerTest { server1.connectTo("127.0.0.1", 10001).join(); server2.connectTo("127.0.0.1", 10002).join(); server1.connectTo("127.0.0.1", 10002).join(); - server1.gossip(MessageSender.Type.BLOCK, Bytes.fromHexString("deadbeef")); + String attributes = "{\"message_type\": \"BLOCK\"}"; + server1.gossip(attributes, Bytes.fromHexString("deadbeef")); Thread.sleep(1000); assertEquals(Bytes.fromHexString("deadbeef"), messageReceived2.get()); Thread.sleep(1000); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
