This is an automated email from the ASF dual-hosted git repository.

toulmean pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git


The following commit(s) were added to refs/heads/main by this push:
     new 65b0923  Ability to send a message to a peer
     new 7a20940  Merge pull request #380 from atoulme/plumtree_rpc
65b0923 is described below

commit 65b0923cebbfc28b1d884964fc26302e58db1d08
Author: Antoine Toulme <[email protected]>
AuthorDate: Mon Mar 14 23:37:25 2022 -0700

    Ability to send a message to a peer
---
 .../java/org/apache/tuweni/gossip/GossipApp.java   |  2 +-
 .../plumtree/vertx/VertxGossipServerTest.java      | 48 +++++++++++++++++++++-
 .../apache/tuweni/plumtree/MessageListener.java    |  3 +-
 .../org/apache/tuweni/plumtree/MessageSender.java  |  8 +++-
 .../java/org/apache/tuweni/plumtree/State.java     |  7 +++-
 .../tuweni/plumtree/vertx/VertxGossipServer.java   | 16 ++++++++
 .../java/org/apache/tuweni/plumtree/StateTest.java |  2 +-
 7 files changed, 79 insertions(+), 7 deletions(-)

diff --git a/gossip/src/main/java/org/apache/tuweni/gossip/GossipApp.java 
b/gossip/src/main/java/org/apache/tuweni/gossip/GossipApp.java
index 4cfae4b..0d6509c 100644
--- a/gossip/src/main/java/org/apache/tuweni/gossip/GossipApp.java
+++ b/gossip/src/main/java/org/apache/tuweni/gossip/GossipApp.java
@@ -100,7 +100,7 @@ public final class GossipApp {
         opts.listenPort(),
         Hash::keccak256,
         repository,
-        (bytes, attr) -> readMessage(opts.messageLog(), errStream, bytes),
+        (bytes, attr, peer) -> readMessage(opts.messageLog(), errStream, 
bytes),
         null,
         new CountingPeerPruningFunction(10),
         100,
diff --git 
a/plumtree/src/integrationTest/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java
 
b/plumtree/src/integrationTest/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java
index fe29581..7f64843 100644
--- 
a/plumtree/src/integrationTest/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java
+++ 
b/plumtree/src/integrationTest/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java
@@ -21,6 +21,7 @@ import org.apache.tuweni.junit.VertxExtension;
 import org.apache.tuweni.junit.VertxInstance;
 import org.apache.tuweni.plumtree.EphemeralPeerRepository;
 import org.apache.tuweni.plumtree.MessageListener;
+import org.apache.tuweni.plumtree.Peer;
 
 import io.vertx.core.Vertx;
 import org.junit.jupiter.api.Test;
@@ -34,7 +35,7 @@ class VertxGossipServerTest {
     public Bytes message;
 
     @Override
-    public void listen(Bytes messageBody, String attributes) {
+    public void listen(Bytes messageBody, String attributes, Peer peer) {
       message = messageBody;
     }
   }
@@ -213,4 +214,49 @@ class VertxGossipServerTest {
     server2.stop().join();
     server3.stop().join();
   }
+
+  @Test
+  void sendMessages(@VertxInstance Vertx vertx) throws Exception {
+    MessageListenerImpl messageReceived1 = new MessageListenerImpl();
+    MessageListenerImpl messageReceived2 = new MessageListenerImpl();
+
+    EphemeralPeerRepository peerRepository1 = new EphemeralPeerRepository();
+
+    VertxGossipServer server1 = new VertxGossipServer(
+        vertx,
+        "127.0.0.1",
+        10000,
+        bytes -> bytes,
+        peerRepository1,
+        messageReceived1,
+        (message, peer) -> true,
+        null,
+        200,
+        200);
+    VertxGossipServer server2 = new VertxGossipServer(
+        vertx,
+        "127.0.0.1",
+        10001,
+        bytes -> bytes,
+        new EphemeralPeerRepository(),
+        messageReceived2,
+        (message, peer) -> true,
+        null,
+        200,
+        200);
+
+    server1.start().join();
+    server2.start().join();
+
+    server1.connectTo("127.0.0.1", 10001).join();
+    assertEquals(1, peerRepository1.eagerPushPeers().size());
+    String attributes = "{\"message_type\": \"BLOCK\"}";
+    server1.send(peerRepository1.peers().iterator().next(), attributes, 
Bytes.fromHexString("deadbeef"));
+    Thread.sleep(1000);
+    assertEquals(Bytes.fromHexString("deadbeef"), messageReceived2.message);
+    Thread.sleep(1000);
+
+    server1.stop().join();
+    server2.stop().join();
+  }
 }
diff --git 
a/plumtree/src/main/java/org/apache/tuweni/plumtree/MessageListener.java 
b/plumtree/src/main/java/org/apache/tuweni/plumtree/MessageListener.java
index 4077e22..9cda77f 100644
--- a/plumtree/src/main/java/org/apache/tuweni/plumtree/MessageListener.java
+++ b/plumtree/src/main/java/org/apache/tuweni/plumtree/MessageListener.java
@@ -24,6 +24,7 @@ public interface MessageListener {
    * 
    * @param messageBody the body of the message
    * @param attributes the attributes of the message
+   * @param peer the peer we received the message from
    */
-  public void listen(Bytes messageBody, String attributes);
+  public void listen(Bytes messageBody, String attributes, Peer peer);
 }
diff --git 
a/plumtree/src/main/java/org/apache/tuweni/plumtree/MessageSender.java 
b/plumtree/src/main/java/org/apache/tuweni/plumtree/MessageSender.java
index dde092a..8a86580 100644
--- a/plumtree/src/main/java/org/apache/tuweni/plumtree/MessageSender.java
+++ b/plumtree/src/main/java/org/apache/tuweni/plumtree/MessageSender.java
@@ -38,9 +38,13 @@ public interface MessageSender {
      */
     PRUNE,
     /**
-     * Send a message
+     * Gossip a message
      */
-    GOSSIP
+    GOSSIP,
+    /**
+     * Send a direct message
+     */
+    SEND
   }
 
   /**
diff --git a/plumtree/src/main/java/org/apache/tuweni/plumtree/State.java 
b/plumtree/src/main/java/org/apache/tuweni/plumtree/State.java
index 930bada..48e65b7 100644
--- a/plumtree/src/main/java/org/apache/tuweni/plumtree/State.java
+++ b/plumtree/src/main/java/org/apache/tuweni/plumtree/State.java
@@ -52,6 +52,11 @@ public final class State {
   private final Timer timer = new Timer("plumtree", true);
   private final long delay;
 
+  public void sendMessage(Peer peer, String attributes, Bytes message) {
+    Bytes messageHash = messageHashingFunction.hash(message);
+    messageSender.sendMessage(MessageSender.Verb.SEND, attributes, peer, 
messageHash, message);
+  }
+
   final class MessageHandler {
 
     private final Bytes hash;
@@ -94,7 +99,7 @@ public final class State {
                               .sendMessage(MessageSender.Verb.IHAVE, null, 
peer, hash, null)))
                       .collect(Collectors.toList()));
           if (sender != null) {
-            messageListener.listen(message, attributes);
+            messageListener.listen(message, attributes, sender);
           }
         }
       } else {
diff --git 
a/plumtree/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java
 
b/plumtree/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java
index a37d153..fb08b6e 100644
--- 
a/plumtree/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java
+++ 
b/plumtree/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java
@@ -95,6 +95,8 @@ public final class VertxGossipServer {
           case PRUNE:
             state.receivePruneMessage(peer);
             break;
+          case SEND:
+            payloadListener.listen(Bytes.fromHexString(message.payload), 
message.attributes, peer);
         }
       }
     }
@@ -246,4 +248,18 @@ public final class VertxGossipServer {
     }
     state.sendGossipMessage(attributes, message);
   }
+
+  /**
+   * Send a message to one peer specifically.
+   *
+   * @param peer the peer to send to
+   * @param attributes the payload to propagate
+   * @param message the payload to propagate
+   */
+  public void send(Peer peer, String attributes, Bytes message) {
+    if (!started.get()) {
+      throw new IllegalStateException("Server has not started");
+    }
+    state.sendMessage(peer, attributes, message);
+  }
 }
diff --git a/plumtree/src/test/java/org/apache/tuweni/plumtree/StateTest.java 
b/plumtree/src/test/java/org/apache/tuweni/plumtree/StateTest.java
index bd374ec..3de6278 100644
--- a/plumtree/src/test/java/org/apache/tuweni/plumtree/StateTest.java
+++ b/plumtree/src/test/java/org/apache/tuweni/plumtree/StateTest.java
@@ -54,7 +54,7 @@ class StateTest {
     public Bytes message;
 
     @Override
-    public void listen(Bytes messageBody, String attributes) {
+    public void listen(Bytes messageBody, String attributes, Peer peer) {
       message = messageBody;
     }
   };

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to