This is an automated email from the ASF dual-hosted git repository. toulmean pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git
commit 55bdb39a3e18569f3b9a09bc11bba6a1cb1818d1 Author: Antoine Toulme <[email protected]> AuthorDate: Mon Jun 10 16:35:34 2019 -0700 Allow consuming message body alongside its attributes --- .../java/org/apache/tuweni/gossip/GossipApp.java | 2 +- .../apache/tuweni/plumtree/MessageListener.java | 29 +++++++++++ .../java/org/apache/tuweni/plumtree/State.java | 9 ++-- .../tuweni/plumtree/vertx/VertxGossipServer.java | 6 +-- .../java/org/apache/tuweni/plumtree/StateTest.java | 40 +++++++++------ .../plumtree/vertx/VertxGossipServerTest.java | 60 +++++++++++++--------- 6 files changed, 95 insertions(+), 51 deletions(-) diff --git a/gossip/src/main/java/org/apache/tuweni/gossip/GossipApp.java b/gossip/src/main/java/org/apache/tuweni/gossip/GossipApp.java index 8866eee..8542997 100644 --- a/gossip/src/main/java/org/apache/tuweni/gossip/GossipApp.java +++ b/gossip/src/main/java/org/apache/tuweni/gossip/GossipApp.java @@ -100,7 +100,7 @@ public final class GossipApp { opts.listenPort(), Hash::keccak256, repository, - bytes -> readMessage(opts.messageLog(), errStream, bytes), + (bytes, attr) -> readMessage(opts.messageLog(), errStream, bytes), null, new CountingPeerPruningFunction(10), 100, diff --git a/plumtree/src/main/java/org/apache/tuweni/plumtree/MessageListener.java b/plumtree/src/main/java/org/apache/tuweni/plumtree/MessageListener.java new file mode 100644 index 0000000..4077e22 --- /dev/null +++ b/plumtree/src/main/java/org/apache/tuweni/plumtree/MessageListener.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.tuweni.plumtree; + +import org.apache.tuweni.bytes.Bytes; + +/** + * Listens to an incoming message, along with its attributes. + */ +public interface MessageListener { + + /** + * Consumes a message + * + * @param messageBody the body of the message + * @param attributes the attributes of the message + */ + public void listen(Bytes messageBody, String attributes); +} diff --git a/plumtree/src/main/java/org/apache/tuweni/plumtree/State.java b/plumtree/src/main/java/org/apache/tuweni/plumtree/State.java index 463583b..eab5c5d 100644 --- a/plumtree/src/main/java/org/apache/tuweni/plumtree/State.java +++ b/plumtree/src/main/java/org/apache/tuweni/plumtree/State.java @@ -24,7 +24,6 @@ import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Consumer; import java.util.stream.Collectors; import javax.annotation.Nullable; @@ -46,7 +45,7 @@ public final class State { }); private final MessageSender messageSender; - private final Consumer<Bytes> messageListener; + private final MessageListener messageListener; private final MessageValidator messageValidator; private final PeerPruning peerPruningFunction; final Queue<Runnable> lazyQueue = new ConcurrentLinkedQueue<>(); @@ -94,7 +93,7 @@ public final class State { .sendMessage(MessageSender.Verb.IHAVE, null, peer, hash, null))) .collect(Collectors.toList())); if (sender != null) { - messageListener.accept(message); + messageListener.listen(message, attributes); } } } else { @@ -146,7 +145,7 @@ public final class State { PeerRepository peerRepository, MessageHashing messageHashingFunction, MessageSender messageSender, - Consumer<Bytes> messageListener, + MessageListener messageListener, MessageValidator messageValidator, PeerPruning peerPruningFunction) { this( @@ -177,7 +176,7 @@ public final class State { PeerRepository peerRepository, MessageHashing messageHashingFunction, MessageSender messageSender, - Consumer<Bytes> messageListener, + MessageListener messageListener, MessageValidator messageValidator, PeerPruning peerPruningFunction, long graftDelay, diff --git a/plumtree/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java b/plumtree/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java index 09afd6f..a3041b3 100644 --- a/plumtree/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java +++ b/plumtree/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java @@ -16,6 +16,7 @@ import org.apache.tuweni.bytes.Bytes; import org.apache.tuweni.concurrent.AsyncCompletion; import org.apache.tuweni.concurrent.CompletableAsyncCompletion; import org.apache.tuweni.plumtree.MessageHashing; +import org.apache.tuweni.plumtree.MessageListener; import org.apache.tuweni.plumtree.MessageSender; import org.apache.tuweni.plumtree.MessageValidator; import org.apache.tuweni.plumtree.Peer; @@ -26,7 +27,6 @@ import org.apache.tuweni.plumtree.State; import java.io.IOException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Consumer; import javax.annotation.Nullable; import com.fasterxml.jackson.core.JsonParser; @@ -108,7 +108,7 @@ public final class VertxGossipServer { private final int lazyQueueInterval; private final MessageHashing messageHashing; private final String networkInterface; - private final Consumer<Bytes> payloadListener; + private final MessageListener payloadListener; private final MessageValidator payloadValidator; private final PeerPruning peerPruningFunction; private final PeerRepository peerRepository; @@ -124,7 +124,7 @@ public final class VertxGossipServer { int port, MessageHashing messageHashing, PeerRepository peerRepository, - Consumer<Bytes> payloadListener, + MessageListener payloadListener, @Nullable MessageValidator payloadValidator, @Nullable PeerPruning peerPruningFunction, int graftDelay, diff --git a/plumtree/src/test/java/org/apache/tuweni/plumtree/StateTest.java b/plumtree/src/test/java/org/apache/tuweni/plumtree/StateTest.java index 01a31cc..da5e592 100644 --- a/plumtree/src/test/java/org/apache/tuweni/plumtree/StateTest.java +++ b/plumtree/src/test/java/org/apache/tuweni/plumtree/StateTest.java @@ -21,8 +21,6 @@ import org.apache.tuweni.bytes.Bytes32; import org.apache.tuweni.crypto.Hash; import org.apache.tuweni.junit.BouncyCastleExtension; -import java.util.concurrent.atomic.AtomicReference; - import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -51,7 +49,15 @@ class StateTest { } } - private static final AtomicReference<Bytes> messageRef = new AtomicReference<>(); + private static final MessageListener messageListener = new MessageListener() { + + public Bytes message; + + @Override + public void listen(Bytes messageBody, String attributes) { + message = messageBody; + } + }; @Test void testInitialState() { @@ -60,7 +66,7 @@ class StateTest { repo, Hash::keccak256, new MockMessageSender(), - messageRef::set, + messageListener, (message, peer) -> true, (peer) -> true); assertTrue(repo.peers().isEmpty()); @@ -75,7 +81,7 @@ class StateTest { repo, Hash::keccak256, new MockMessageSender(), - messageRef::set, + messageListener, (message, peer) -> true, (peer) -> true); state.addPeer(new PeerImpl()); @@ -92,7 +98,7 @@ class StateTest { repo, Hash::keccak256, new MockMessageSender(), - messageRef::set, + messageListener, (message, peer) -> true, (peer) -> true); state.addPeer(new PeerImpl()); @@ -108,7 +114,7 @@ class StateTest { repo, Hash::keccak256, new MockMessageSender(), - messageRef::set, + messageListener, (message, peer) -> true, (peer) -> true); state.addPeer(new PeerImpl()); @@ -123,7 +129,7 @@ class StateTest { repo, Hash::keccak256, new MockMessageSender(), - messageRef::set, + messageListener, (message, peer) -> true, (peer) -> true); Peer peer = new PeerImpl(); @@ -141,7 +147,7 @@ class StateTest { repo, Hash::keccak256, new MockMessageSender(), - messageRef::set, + messageListener, (message, peer) -> true, (peer) -> true); Peer peer = new PeerImpl(); @@ -158,7 +164,7 @@ class StateTest { repo, Hash::keccak256, new MockMessageSender(), - messageRef::set, + messageListener, (message, peer) -> true, (peer) -> true); Peer peer = new PeerImpl(); @@ -176,7 +182,7 @@ class StateTest { EphemeralPeerRepository repo = new EphemeralPeerRepository(); MockMessageSender messageSender = new MockMessageSender(); State state = - new State(repo, Hash::keccak256, messageSender, messageRef::set, (message, peer) -> true, (peer) -> true); + new State(repo, Hash::keccak256, messageSender, messageListener, (message, peer) -> true, (peer) -> true); Peer peer = new PeerImpl(); state.addPeer(peer); Peer otherPeer = new PeerImpl(); @@ -193,7 +199,7 @@ class StateTest { EphemeralPeerRepository repo = new EphemeralPeerRepository(); MockMessageSender messageSender = new MockMessageSender(); State state = - new State(repo, Hash::keccak256, messageSender, messageRef::set, (message, peer) -> true, (peer) -> true); + new State(repo, Hash::keccak256, messageSender, messageListener, (message, peer) -> true, (peer) -> true); Peer peer = new PeerImpl(); state.addPeer(peer); Peer otherPeer = new PeerImpl(); @@ -218,7 +224,7 @@ class StateTest { EphemeralPeerRepository repo = new EphemeralPeerRepository(); MockMessageSender messageSender = new MockMessageSender(); State state = - new State(repo, Hash::keccak256, messageSender, messageRef::set, (message, peer) -> true, (peer) -> true); + new State(repo, Hash::keccak256, messageSender, messageListener, (message, peer) -> true, (peer) -> true); Peer peer = new PeerImpl(); state.addPeer(peer); Peer lazyPeer = new PeerImpl(); @@ -240,7 +246,7 @@ class StateTest { repo, Hash::keccak256, messageSender, - messageRef::set, + messageListener, (message, peer) -> true, (peer) -> true, 100, @@ -266,7 +272,7 @@ class StateTest { repo, Hash::keccak256, messageSender, - messageRef::set, + messageListener, (message, peer) -> true, (peer) -> true, 500, @@ -292,7 +298,7 @@ class StateTest { EphemeralPeerRepository repo = new EphemeralPeerRepository(); MockMessageSender messageSender = new MockMessageSender(); State state = - new State(repo, Hash::keccak256, messageSender, messageRef::set, (message, peer) -> true, (peer) -> true); + new State(repo, Hash::keccak256, messageSender, messageListener, (message, peer) -> true, (peer) -> true); Peer peer = new PeerImpl(); Bytes message = Bytes32.random(); String attributes = "{\"message_type\": \"BLOCK\"}"; @@ -307,7 +313,7 @@ class StateTest { EphemeralPeerRepository repo = new EphemeralPeerRepository(); MockMessageSender messageSender = new MockMessageSender(); State state = - new State(repo, Hash::keccak256, messageSender, messageRef::set, (message, peer) -> true, (peer) -> true); + new State(repo, Hash::keccak256, messageSender, messageListener, (message, peer) -> true, (peer) -> true); Peer peer = new PeerImpl(); Peer secondPeer = new PeerImpl(); Bytes message = Bytes32.random(); diff --git a/plumtree/src/test/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java b/plumtree/src/test/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java index 29ae06c..fe29581 100644 --- a/plumtree/src/test/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java +++ b/plumtree/src/test/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java @@ -20,8 +20,7 @@ import org.apache.tuweni.bytes.Bytes; import org.apache.tuweni.junit.VertxExtension; import org.apache.tuweni.junit.VertxInstance; import org.apache.tuweni.plumtree.EphemeralPeerRepository; - -import java.util.concurrent.atomic.AtomicReference; +import org.apache.tuweni.plumtree.MessageListener; import io.vertx.core.Vertx; import org.junit.jupiter.api.Test; @@ -30,11 +29,21 @@ import org.junit.jupiter.api.extension.ExtendWith; @ExtendWith(VertxExtension.class) class VertxGossipServerTest { + private static class MessageListenerImpl implements MessageListener { + + public Bytes message; + + @Override + public void listen(Bytes messageBody, String attributes) { + message = messageBody; + } + } + @Test void gossipDeadBeefToOtherNode(@VertxInstance Vertx vertx) throws Exception { - AtomicReference<Bytes> messageReceived1 = new AtomicReference<>(); - AtomicReference<Bytes> messageReceived2 = new AtomicReference<>(); + MessageListenerImpl messageReceived1 = new MessageListenerImpl(); + MessageListenerImpl messageReceived2 = new MessageListenerImpl(); VertxGossipServer server1 = new VertxGossipServer( vertx, @@ -42,7 +51,7 @@ class VertxGossipServerTest { 10000, bytes -> bytes, new EphemeralPeerRepository(), - messageReceived1::set, + messageReceived1, (message, peer) -> true, null, 200, @@ -53,7 +62,7 @@ class VertxGossipServerTest { 10001, bytes -> bytes, new EphemeralPeerRepository(), - messageReceived2::set, + messageReceived2, (message, peer) -> true, null, 200, @@ -67,11 +76,11 @@ class VertxGossipServerTest { server1.gossip(attributes, Bytes.fromHexString("deadbeef")); for (int i = 0; i < 10; i++) { Thread.sleep(500); - if (Bytes.fromHexString("deadbeef").equals(messageReceived2.get())) { + if (Bytes.fromHexString("deadbeef").equals(messageReceived2.message)) { break; } } - assertEquals(Bytes.fromHexString("deadbeef"), messageReceived2.get()); + assertEquals(Bytes.fromHexString("deadbeef"), messageReceived2.message); server1.stop().join(); server2.stop().join(); @@ -80,9 +89,9 @@ class VertxGossipServerTest { @Test void gossipDeadBeefToTwoOtherNodes(@VertxInstance Vertx vertx) throws Exception { - AtomicReference<Bytes> messageReceived1 = new AtomicReference<>(); - AtomicReference<Bytes> messageReceived2 = new AtomicReference<>(); - AtomicReference<Bytes> messageReceived3 = new AtomicReference<>(); + MessageListenerImpl messageReceived1 = new MessageListenerImpl(); + MessageListenerImpl messageReceived2 = new MessageListenerImpl(); + MessageListenerImpl messageReceived3 = new MessageListenerImpl(); VertxGossipServer server1 = new VertxGossipServer( vertx, @@ -90,7 +99,7 @@ class VertxGossipServerTest { 10000, bytes -> bytes, new EphemeralPeerRepository(), - messageReceived1::set, + messageReceived1, (message, peer) -> true, null, 200, @@ -101,7 +110,7 @@ class VertxGossipServerTest { 10001, bytes -> bytes, new EphemeralPeerRepository(), - messageReceived2::set, + messageReceived2, (message, peer) -> true, null, 200, @@ -112,7 +121,7 @@ class VertxGossipServerTest { 10002, bytes -> bytes, new EphemeralPeerRepository(), - messageReceived3::set, + messageReceived3, (message, peer) -> true, null, 200, @@ -128,23 +137,24 @@ class VertxGossipServerTest { server1.gossip(attributes, Bytes.fromHexString("deadbeef")); for (int i = 0; i < 10; i++) { Thread.sleep(500); - if (Bytes.fromHexString("deadbeef").equals(messageReceived2.get()) - && Bytes.fromHexString("deadbeef").equals(messageReceived3.get())) { + if (Bytes.fromHexString("deadbeef").equals(messageReceived2.message) + && Bytes.fromHexString("deadbeef").equals(messageReceived3.message)) { break; } } - assertEquals(Bytes.fromHexString("deadbeef"), messageReceived2.get()); - assertEquals(Bytes.fromHexString("deadbeef"), messageReceived3.get()); - assertNull(messageReceived1.get()); + assertEquals(Bytes.fromHexString("deadbeef"), messageReceived2.message); + assertEquals(Bytes.fromHexString("deadbeef"), messageReceived3.message); + assertNull(messageReceived1.message); server1.stop().join(); server2.stop().join(); + server3.stop().join(); } @Test void gossipCollision(@VertxInstance Vertx vertx) throws Exception { - AtomicReference<Bytes> messageReceived1 = new AtomicReference<>(); - AtomicReference<Bytes> messageReceived2 = new AtomicReference<>(); + MessageListenerImpl messageReceived1 = new MessageListenerImpl(); + MessageListenerImpl messageReceived2 = new MessageListenerImpl(); EphemeralPeerRepository peerRepository1 = new EphemeralPeerRepository(); EphemeralPeerRepository peerRepository3 = new EphemeralPeerRepository(); @@ -155,7 +165,7 @@ class VertxGossipServerTest { 10000, bytes -> bytes, peerRepository1, - messageReceived1::set, + messageReceived1, (message, peer) -> true, null, 200, @@ -166,7 +176,7 @@ class VertxGossipServerTest { 10001, bytes -> bytes, new EphemeralPeerRepository(), - messageReceived2::set, + messageReceived2, (message, peer) -> true, null, 200, @@ -177,7 +187,7 @@ class VertxGossipServerTest { 10002, bytes -> bytes, peerRepository3, - messageReceived2::set, + messageReceived2, (message, peer) -> true, null, 200, @@ -194,7 +204,7 @@ class VertxGossipServerTest { String attributes = "{\"message_type\": \"BLOCK\"}"; server1.gossip(attributes, Bytes.fromHexString("deadbeef")); Thread.sleep(1000); - assertEquals(Bytes.fromHexString("deadbeef"), messageReceived2.get()); + assertEquals(Bytes.fromHexString("deadbeef"), messageReceived2.message); Thread.sleep(1000); assertTrue(peerRepository1.lazyPushPeers().size() > 0 || peerRepository3.lazyPushPeers().size() > 0); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
