This is an automated email from the ASF dual-hosted git repository.

toulmean pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git

commit 55bdb39a3e18569f3b9a09bc11bba6a1cb1818d1
Author: Antoine Toulme <[email protected]>
AuthorDate: Mon Jun 10 16:35:34 2019 -0700

    Allow consuming message body alongside its attributes
---
 .../java/org/apache/tuweni/gossip/GossipApp.java   |  2 +-
 .../apache/tuweni/plumtree/MessageListener.java    | 29 +++++++++++
 .../java/org/apache/tuweni/plumtree/State.java     |  9 ++--
 .../tuweni/plumtree/vertx/VertxGossipServer.java   |  6 +--
 .../java/org/apache/tuweni/plumtree/StateTest.java | 40 +++++++++------
 .../plumtree/vertx/VertxGossipServerTest.java      | 60 +++++++++++++---------
 6 files changed, 95 insertions(+), 51 deletions(-)

diff --git a/gossip/src/main/java/org/apache/tuweni/gossip/GossipApp.java b/gossip/src/main/java/org/apache/tuweni/gossip/GossipApp.java
index 8866eee..8542997 100644
--- a/gossip/src/main/java/org/apache/tuweni/gossip/GossipApp.java
+++ b/gossip/src/main/java/org/apache/tuweni/gossip/GossipApp.java
@@ -100,7 +100,7 @@ public final class GossipApp {
         opts.listenPort(),
         Hash::keccak256,
         repository,
-        bytes -> readMessage(opts.messageLog(), errStream, bytes),
+        (bytes, attr) -> readMessage(opts.messageLog(), errStream, bytes),
         null,
         new CountingPeerPruningFunction(10),
         100,
diff --git a/plumtree/src/main/java/org/apache/tuweni/plumtree/MessageListener.java b/plumtree/src/main/java/org/apache/tuweni/plumtree/MessageListener.java
new file mode 100644
index 0000000..4077e22
--- /dev/null
+++ b/plumtree/src/main/java/org/apache/tuweni/plumtree/MessageListener.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.tuweni.plumtree;
+
+import org.apache.tuweni.bytes.Bytes;
+
+/**
+ * Listens to an incoming message, along with its attributes.
+ */
+public interface MessageListener {
+
+  /**
+   * Consumes a message
+   * 
+   * @param messageBody the body of the message
+   * @param attributes the attributes of the message
+   */
+  public void listen(Bytes messageBody, String attributes);
+}
diff --git a/plumtree/src/main/java/org/apache/tuweni/plumtree/State.java b/plumtree/src/main/java/org/apache/tuweni/plumtree/State.java
index 463583b..eab5c5d 100644
--- a/plumtree/src/main/java/org/apache/tuweni/plumtree/State.java
+++ b/plumtree/src/main/java/org/apache/tuweni/plumtree/State.java
@@ -24,7 +24,6 @@ import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Consumer;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 
@@ -46,7 +45,7 @@ public final class State {
       });
 
   private final MessageSender messageSender;
-  private final Consumer<Bytes> messageListener;
+  private final MessageListener messageListener;
   private final MessageValidator messageValidator;
   private final PeerPruning peerPruningFunction;
   final Queue<Runnable> lazyQueue = new ConcurrentLinkedQueue<>();
@@ -94,7 +93,7 @@ public final class State {
                           .sendMessage(MessageSender.Verb.IHAVE, null, peer, hash, null)))
                   .collect(Collectors.toList()));
           if (sender != null) {
-            messageListener.accept(message);
+            messageListener.listen(message, attributes);
           }
         }
       } else {
@@ -146,7 +145,7 @@ public final class State {
       PeerRepository peerRepository,
       MessageHashing messageHashingFunction,
       MessageSender messageSender,
-      Consumer<Bytes> messageListener,
+      MessageListener messageListener,
       MessageValidator messageValidator,
       PeerPruning peerPruningFunction) {
     this(
@@ -177,7 +176,7 @@ public final class State {
       PeerRepository peerRepository,
       MessageHashing messageHashingFunction,
       MessageSender messageSender,
-      Consumer<Bytes> messageListener,
+      MessageListener messageListener,
       MessageValidator messageValidator,
       PeerPruning peerPruningFunction,
       long graftDelay,
diff --git a/plumtree/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java b/plumtree/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java
index 09afd6f..a3041b3 100644
--- a/plumtree/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java
+++ b/plumtree/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java
@@ -16,6 +16,7 @@ import org.apache.tuweni.bytes.Bytes;
 import org.apache.tuweni.concurrent.AsyncCompletion;
 import org.apache.tuweni.concurrent.CompletableAsyncCompletion;
 import org.apache.tuweni.plumtree.MessageHashing;
+import org.apache.tuweni.plumtree.MessageListener;
 import org.apache.tuweni.plumtree.MessageSender;
 import org.apache.tuweni.plumtree.MessageValidator;
 import org.apache.tuweni.plumtree.Peer;
@@ -26,7 +27,6 @@ import org.apache.tuweni.plumtree.State;
 import java.io.IOException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Consumer;
 import javax.annotation.Nullable;
 
 import com.fasterxml.jackson.core.JsonParser;
@@ -108,7 +108,7 @@ public final class VertxGossipServer {
   private final int lazyQueueInterval;
   private final MessageHashing messageHashing;
   private final String networkInterface;
-  private final Consumer<Bytes> payloadListener;
+  private final MessageListener payloadListener;
   private final MessageValidator payloadValidator;
   private final PeerPruning peerPruningFunction;
   private final PeerRepository peerRepository;
@@ -124,7 +124,7 @@ public final class VertxGossipServer {
       int port,
       MessageHashing messageHashing,
       PeerRepository peerRepository,
-      Consumer<Bytes> payloadListener,
+      MessageListener payloadListener,
       @Nullable MessageValidator payloadValidator,
       @Nullable PeerPruning peerPruningFunction,
       int graftDelay,
diff --git a/plumtree/src/test/java/org/apache/tuweni/plumtree/StateTest.java b/plumtree/src/test/java/org/apache/tuweni/plumtree/StateTest.java
index 01a31cc..da5e592 100644
--- a/plumtree/src/test/java/org/apache/tuweni/plumtree/StateTest.java
+++ b/plumtree/src/test/java/org/apache/tuweni/plumtree/StateTest.java
@@ -21,8 +21,6 @@ import org.apache.tuweni.bytes.Bytes32;
 import org.apache.tuweni.crypto.Hash;
 import org.apache.tuweni.junit.BouncyCastleExtension;
 
-import java.util.concurrent.atomic.AtomicReference;
-
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 
@@ -51,7 +49,15 @@ class StateTest {
     }
   }
 
-  private static final AtomicReference<Bytes> messageRef = new AtomicReference<>();
+  private static final MessageListener messageListener = new MessageListener() {
+
+    public Bytes message;
+
+    @Override
+    public void listen(Bytes messageBody, String attributes) {
+      message = messageBody;
+    }
+  };
 
   @Test
   void testInitialState() {
@@ -60,7 +66,7 @@ class StateTest {
         repo,
         Hash::keccak256,
         new MockMessageSender(),
-        messageRef::set,
+        messageListener,
         (message, peer) -> true,
         (peer) -> true);
     assertTrue(repo.peers().isEmpty());
@@ -75,7 +81,7 @@ class StateTest {
         repo,
         Hash::keccak256,
         new MockMessageSender(),
-        messageRef::set,
+        messageListener,
         (message, peer) -> true,
         (peer) -> true);
     state.addPeer(new PeerImpl());
@@ -92,7 +98,7 @@ class StateTest {
         repo,
         Hash::keccak256,
         new MockMessageSender(),
-        messageRef::set,
+        messageListener,
         (message, peer) -> true,
         (peer) -> true);
     state.addPeer(new PeerImpl());
@@ -108,7 +114,7 @@ class StateTest {
         repo,
         Hash::keccak256,
         new MockMessageSender(),
-        messageRef::set,
+        messageListener,
         (message, peer) -> true,
         (peer) -> true);
     state.addPeer(new PeerImpl());
@@ -123,7 +129,7 @@ class StateTest {
         repo,
         Hash::keccak256,
         new MockMessageSender(),
-        messageRef::set,
+        messageListener,
         (message, peer) -> true,
         (peer) -> true);
     Peer peer = new PeerImpl();
@@ -141,7 +147,7 @@ class StateTest {
         repo,
         Hash::keccak256,
         new MockMessageSender(),
-        messageRef::set,
+        messageListener,
         (message, peer) -> true,
         (peer) -> true);
     Peer peer = new PeerImpl();
@@ -158,7 +164,7 @@ class StateTest {
         repo,
         Hash::keccak256,
         new MockMessageSender(),
-        messageRef::set,
+        messageListener,
         (message, peer) -> true,
         (peer) -> true);
     Peer peer = new PeerImpl();
@@ -176,7 +182,7 @@ class StateTest {
     EphemeralPeerRepository repo = new EphemeralPeerRepository();
     MockMessageSender messageSender = new MockMessageSender();
     State state =
-        new State(repo, Hash::keccak256, messageSender, messageRef::set, (message, peer) -> true, (peer) -> true);
+        new State(repo, Hash::keccak256, messageSender, messageListener, (message, peer) -> true, (peer) -> true);
     Peer peer = new PeerImpl();
     state.addPeer(peer);
     Peer otherPeer = new PeerImpl();
@@ -193,7 +199,7 @@ class StateTest {
     EphemeralPeerRepository repo = new EphemeralPeerRepository();
     MockMessageSender messageSender = new MockMessageSender();
     State state =
-        new State(repo, Hash::keccak256, messageSender, messageRef::set, (message, peer) -> true, (peer) -> true);
+        new State(repo, Hash::keccak256, messageSender, messageListener, (message, peer) -> true, (peer) -> true);
     Peer peer = new PeerImpl();
     state.addPeer(peer);
     Peer otherPeer = new PeerImpl();
@@ -218,7 +224,7 @@ class StateTest {
     EphemeralPeerRepository repo = new EphemeralPeerRepository();
     MockMessageSender messageSender = new MockMessageSender();
     State state =
-        new State(repo, Hash::keccak256, messageSender, messageRef::set, (message, peer) -> true, (peer) -> true);
+        new State(repo, Hash::keccak256, messageSender, messageListener, (message, peer) -> true, (peer) -> true);
     Peer peer = new PeerImpl();
     state.addPeer(peer);
     Peer lazyPeer = new PeerImpl();
@@ -240,7 +246,7 @@ class StateTest {
         repo,
         Hash::keccak256,
         messageSender,
-        messageRef::set,
+        messageListener,
         (message, peer) -> true,
         (peer) -> true,
         100,
@@ -266,7 +272,7 @@ class StateTest {
         repo,
         Hash::keccak256,
         messageSender,
-        messageRef::set,
+        messageListener,
         (message, peer) -> true,
         (peer) -> true,
         500,
@@ -292,7 +298,7 @@ class StateTest {
     EphemeralPeerRepository repo = new EphemeralPeerRepository();
     MockMessageSender messageSender = new MockMessageSender();
     State state =
-        new State(repo, Hash::keccak256, messageSender, messageRef::set, (message, peer) -> true, (peer) -> true);
+        new State(repo, Hash::keccak256, messageSender, messageListener, (message, peer) -> true, (peer) -> true);
     Peer peer = new PeerImpl();
     Bytes message = Bytes32.random();
     String attributes = "{\"message_type\": \"BLOCK\"}";
@@ -307,7 +313,7 @@ class StateTest {
     EphemeralPeerRepository repo = new EphemeralPeerRepository();
     MockMessageSender messageSender = new MockMessageSender();
     State state =
-        new State(repo, Hash::keccak256, messageSender, messageRef::set, (message, peer) -> true, (peer) -> true);
+        new State(repo, Hash::keccak256, messageSender, messageListener, (message, peer) -> true, (peer) -> true);
     Peer peer = new PeerImpl();
     Peer secondPeer = new PeerImpl();
     Bytes message = Bytes32.random();
diff --git a/plumtree/src/test/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java b/plumtree/src/test/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java
index 29ae06c..fe29581 100644
--- a/plumtree/src/test/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java
+++ b/plumtree/src/test/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java
@@ -20,8 +20,7 @@ import org.apache.tuweni.bytes.Bytes;
 import org.apache.tuweni.junit.VertxExtension;
 import org.apache.tuweni.junit.VertxInstance;
 import org.apache.tuweni.plumtree.EphemeralPeerRepository;
-
-import java.util.concurrent.atomic.AtomicReference;
+import org.apache.tuweni.plumtree.MessageListener;
 
 import io.vertx.core.Vertx;
 import org.junit.jupiter.api.Test;
@@ -30,11 +29,21 @@ import org.junit.jupiter.api.extension.ExtendWith;
 @ExtendWith(VertxExtension.class)
 class VertxGossipServerTest {
 
+  private static class MessageListenerImpl implements MessageListener {
+
+    public Bytes message;
+
+    @Override
+    public void listen(Bytes messageBody, String attributes) {
+      message = messageBody;
+    }
+  }
+
   @Test
   void gossipDeadBeefToOtherNode(@VertxInstance Vertx vertx) throws Exception {
 
-    AtomicReference<Bytes> messageReceived1 = new AtomicReference<>();
-    AtomicReference<Bytes> messageReceived2 = new AtomicReference<>();
+    MessageListenerImpl messageReceived1 = new MessageListenerImpl();
+    MessageListenerImpl messageReceived2 = new MessageListenerImpl();
 
     VertxGossipServer server1 = new VertxGossipServer(
         vertx,
@@ -42,7 +51,7 @@ class VertxGossipServerTest {
         10000,
         bytes -> bytes,
         new EphemeralPeerRepository(),
-        messageReceived1::set,
+        messageReceived1,
         (message, peer) -> true,
         null,
         200,
@@ -53,7 +62,7 @@ class VertxGossipServerTest {
         10001,
         bytes -> bytes,
         new EphemeralPeerRepository(),
-        messageReceived2::set,
+        messageReceived2,
         (message, peer) -> true,
         null,
         200,
@@ -67,11 +76,11 @@ class VertxGossipServerTest {
     server1.gossip(attributes, Bytes.fromHexString("deadbeef"));
     for (int i = 0; i < 10; i++) {
       Thread.sleep(500);
-      if (Bytes.fromHexString("deadbeef").equals(messageReceived2.get())) {
+      if (Bytes.fromHexString("deadbeef").equals(messageReceived2.message)) {
         break;
       }
     }
-    assertEquals(Bytes.fromHexString("deadbeef"), messageReceived2.get());
+    assertEquals(Bytes.fromHexString("deadbeef"), messageReceived2.message);
 
     server1.stop().join();
     server2.stop().join();
@@ -80,9 +89,9 @@ class VertxGossipServerTest {
   @Test
   void gossipDeadBeefToTwoOtherNodes(@VertxInstance Vertx vertx) throws Exception {
 
-    AtomicReference<Bytes> messageReceived1 = new AtomicReference<>();
-    AtomicReference<Bytes> messageReceived2 = new AtomicReference<>();
-    AtomicReference<Bytes> messageReceived3 = new AtomicReference<>();
+    MessageListenerImpl messageReceived1 = new MessageListenerImpl();
+    MessageListenerImpl messageReceived2 = new MessageListenerImpl();
+    MessageListenerImpl messageReceived3 = new MessageListenerImpl();
 
     VertxGossipServer server1 = new VertxGossipServer(
         vertx,
@@ -90,7 +99,7 @@ class VertxGossipServerTest {
         10000,
         bytes -> bytes,
         new EphemeralPeerRepository(),
-        messageReceived1::set,
+        messageReceived1,
         (message, peer) -> true,
         null,
         200,
@@ -101,7 +110,7 @@ class VertxGossipServerTest {
         10001,
         bytes -> bytes,
         new EphemeralPeerRepository(),
-        messageReceived2::set,
+        messageReceived2,
         (message, peer) -> true,
         null,
         200,
@@ -112,7 +121,7 @@ class VertxGossipServerTest {
         10002,
         bytes -> bytes,
         new EphemeralPeerRepository(),
-        messageReceived3::set,
+        messageReceived3,
         (message, peer) -> true,
         null,
         200,
@@ -128,23 +137,24 @@ class VertxGossipServerTest {
     server1.gossip(attributes, Bytes.fromHexString("deadbeef"));
     for (int i = 0; i < 10; i++) {
       Thread.sleep(500);
-      if (Bytes.fromHexString("deadbeef").equals(messageReceived2.get())
-          && Bytes.fromHexString("deadbeef").equals(messageReceived3.get())) {
+      if (Bytes.fromHexString("deadbeef").equals(messageReceived2.message)
+          && Bytes.fromHexString("deadbeef").equals(messageReceived3.message)) {
         break;
       }
     }
-    assertEquals(Bytes.fromHexString("deadbeef"), messageReceived2.get());
-    assertEquals(Bytes.fromHexString("deadbeef"), messageReceived3.get());
-    assertNull(messageReceived1.get());
+    assertEquals(Bytes.fromHexString("deadbeef"), messageReceived2.message);
+    assertEquals(Bytes.fromHexString("deadbeef"), messageReceived3.message);
+    assertNull(messageReceived1.message);
 
     server1.stop().join();
     server2.stop().join();
+    server3.stop().join();
   }
 
   @Test
   void gossipCollision(@VertxInstance Vertx vertx) throws Exception {
-    AtomicReference<Bytes> messageReceived1 = new AtomicReference<>();
-    AtomicReference<Bytes> messageReceived2 = new AtomicReference<>();
+    MessageListenerImpl messageReceived1 = new MessageListenerImpl();
+    MessageListenerImpl messageReceived2 = new MessageListenerImpl();
 
     EphemeralPeerRepository peerRepository1 = new EphemeralPeerRepository();
     EphemeralPeerRepository peerRepository3 = new EphemeralPeerRepository();
@@ -155,7 +165,7 @@ class VertxGossipServerTest {
         10000,
         bytes -> bytes,
         peerRepository1,
-        messageReceived1::set,
+        messageReceived1,
         (message, peer) -> true,
         null,
         200,
@@ -166,7 +176,7 @@ class VertxGossipServerTest {
         10001,
         bytes -> bytes,
         new EphemeralPeerRepository(),
-        messageReceived2::set,
+        messageReceived2,
         (message, peer) -> true,
         null,
         200,
@@ -177,7 +187,7 @@ class VertxGossipServerTest {
         10002,
         bytes -> bytes,
         peerRepository3,
-        messageReceived2::set,
+        messageReceived2,
         (message, peer) -> true,
         null,
         200,
@@ -194,7 +204,7 @@ class VertxGossipServerTest {
     String attributes = "{\"message_type\": \"BLOCK\"}";
     server1.gossip(attributes, Bytes.fromHexString("deadbeef"));
     Thread.sleep(1000);
-    assertEquals(Bytes.fromHexString("deadbeef"), messageReceived2.get());
+    assertEquals(Bytes.fromHexString("deadbeef"), messageReceived2.message);
     Thread.sleep(1000);
 
     assertTrue(peerRepository1.lazyPushPeers().size() > 0 || peerRepository3.lazyPushPeers().size() > 0);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to