This is an automated email from the ASF dual-hosted git repository.
toulmean pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git
The following commit(s) were added to refs/heads/main by this push:
new 65b0923 Ability to send a message to a peer
new 7a20940 Merge pull request #380 from atoulme/plumtree_rpc
65b0923 is described below
commit 65b0923cebbfc28b1d884964fc26302e58db1d08
Author: Antoine Toulme <[email protected]>
AuthorDate: Mon Mar 14 23:37:25 2022 -0700
Ability to send a message to a peer
---
.../java/org/apache/tuweni/gossip/GossipApp.java | 2 +-
.../plumtree/vertx/VertxGossipServerTest.java | 48 +++++++++++++++++++++-
.../apache/tuweni/plumtree/MessageListener.java | 3 +-
.../org/apache/tuweni/plumtree/MessageSender.java | 8 +++-
.../java/org/apache/tuweni/plumtree/State.java | 7 +++-
.../tuweni/plumtree/vertx/VertxGossipServer.java | 16 ++++++++
.../java/org/apache/tuweni/plumtree/StateTest.java | 2 +-
7 files changed, 79 insertions(+), 7 deletions(-)
diff --git a/gossip/src/main/java/org/apache/tuweni/gossip/GossipApp.java
b/gossip/src/main/java/org/apache/tuweni/gossip/GossipApp.java
index 4cfae4b..0d6509c 100644
--- a/gossip/src/main/java/org/apache/tuweni/gossip/GossipApp.java
+++ b/gossip/src/main/java/org/apache/tuweni/gossip/GossipApp.java
@@ -100,7 +100,7 @@ public final class GossipApp {
opts.listenPort(),
Hash::keccak256,
repository,
- (bytes, attr) -> readMessage(opts.messageLog(), errStream, bytes),
+ (bytes, attr, peer) -> readMessage(opts.messageLog(), errStream,
bytes),
null,
new CountingPeerPruningFunction(10),
100,
diff --git
a/plumtree/src/integrationTest/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java
b/plumtree/src/integrationTest/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java
index fe29581..7f64843 100644
---
a/plumtree/src/integrationTest/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java
+++
b/plumtree/src/integrationTest/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java
@@ -21,6 +21,7 @@ import org.apache.tuweni.junit.VertxExtension;
import org.apache.tuweni.junit.VertxInstance;
import org.apache.tuweni.plumtree.EphemeralPeerRepository;
import org.apache.tuweni.plumtree.MessageListener;
+import org.apache.tuweni.plumtree.Peer;
import io.vertx.core.Vertx;
import org.junit.jupiter.api.Test;
@@ -34,7 +35,7 @@ class VertxGossipServerTest {
public Bytes message;
@Override
- public void listen(Bytes messageBody, String attributes) {
+ public void listen(Bytes messageBody, String attributes, Peer peer) {
message = messageBody;
}
}
@@ -213,4 +214,49 @@ class VertxGossipServerTest {
server2.stop().join();
server3.stop().join();
}
+
+ @Test
+ void sendMessages(@VertxInstance Vertx vertx) throws Exception {
+ MessageListenerImpl messageReceived1 = new MessageListenerImpl();
+ MessageListenerImpl messageReceived2 = new MessageListenerImpl();
+
+ EphemeralPeerRepository peerRepository1 = new EphemeralPeerRepository();
+
+ VertxGossipServer server1 = new VertxGossipServer(
+ vertx,
+ "127.0.0.1",
+ 10000,
+ bytes -> bytes,
+ peerRepository1,
+ messageReceived1,
+ (message, peer) -> true,
+ null,
+ 200,
+ 200);
+ VertxGossipServer server2 = new VertxGossipServer(
+ vertx,
+ "127.0.0.1",
+ 10001,
+ bytes -> bytes,
+ new EphemeralPeerRepository(),
+ messageReceived2,
+ (message, peer) -> true,
+ null,
+ 200,
+ 200);
+
+ server1.start().join();
+ server2.start().join();
+
+ server1.connectTo("127.0.0.1", 10001).join();
+ assertEquals(1, peerRepository1.eagerPushPeers().size());
+ String attributes = "{\"message_type\": \"BLOCK\"}";
+ server1.send(peerRepository1.peers().iterator().next(), attributes,
Bytes.fromHexString("deadbeef"));
+ Thread.sleep(1000);
+ assertEquals(Bytes.fromHexString("deadbeef"), messageReceived2.message);
+ Thread.sleep(1000);
+
+ server1.stop().join();
+ server2.stop().join();
+ }
}
diff --git
a/plumtree/src/main/java/org/apache/tuweni/plumtree/MessageListener.java
b/plumtree/src/main/java/org/apache/tuweni/plumtree/MessageListener.java
index 4077e22..9cda77f 100644
--- a/plumtree/src/main/java/org/apache/tuweni/plumtree/MessageListener.java
+++ b/plumtree/src/main/java/org/apache/tuweni/plumtree/MessageListener.java
@@ -24,6 +24,7 @@ public interface MessageListener {
*
* @param messageBody the body of the message
* @param attributes the attributes of the message
+ * @param peer the peer we received the message from
*/
- public void listen(Bytes messageBody, String attributes);
+ public void listen(Bytes messageBody, String attributes, Peer peer);
}
diff --git
a/plumtree/src/main/java/org/apache/tuweni/plumtree/MessageSender.java
b/plumtree/src/main/java/org/apache/tuweni/plumtree/MessageSender.java
index dde092a..8a86580 100644
--- a/plumtree/src/main/java/org/apache/tuweni/plumtree/MessageSender.java
+++ b/plumtree/src/main/java/org/apache/tuweni/plumtree/MessageSender.java
@@ -38,9 +38,13 @@ public interface MessageSender {
*/
PRUNE,
/**
- * Send a message
+ * Gossip a message
*/
- GOSSIP
+ GOSSIP,
+ /**
+ * Send a direct message
+ */
+ SEND
}
/**
diff --git a/plumtree/src/main/java/org/apache/tuweni/plumtree/State.java
b/plumtree/src/main/java/org/apache/tuweni/plumtree/State.java
index 930bada..48e65b7 100644
--- a/plumtree/src/main/java/org/apache/tuweni/plumtree/State.java
+++ b/plumtree/src/main/java/org/apache/tuweni/plumtree/State.java
@@ -52,6 +52,11 @@ public final class State {
private final Timer timer = new Timer("plumtree", true);
private final long delay;
+ public void sendMessage(Peer peer, String attributes, Bytes message) {
+ Bytes messageHash = messageHashingFunction.hash(message);
+ messageSender.sendMessage(MessageSender.Verb.SEND, attributes, peer,
messageHash, message);
+ }
+
final class MessageHandler {
private final Bytes hash;
@@ -94,7 +99,7 @@ public final class State {
.sendMessage(MessageSender.Verb.IHAVE, null,
peer, hash, null)))
.collect(Collectors.toList()));
if (sender != null) {
- messageListener.listen(message, attributes);
+ messageListener.listen(message, attributes, sender);
}
}
} else {
diff --git
a/plumtree/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java
b/plumtree/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java
index a37d153..fb08b6e 100644
---
a/plumtree/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java
+++
b/plumtree/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java
@@ -95,6 +95,8 @@ public final class VertxGossipServer {
case PRUNE:
state.receivePruneMessage(peer);
break;
+ case SEND:
+ payloadListener.listen(Bytes.fromHexString(message.payload),
message.attributes, peer);
}
}
}
@@ -246,4 +248,18 @@ public final class VertxGossipServer {
}
state.sendGossipMessage(attributes, message);
}
+
+ /**
+ * Send a message to one peer specifically.
+ *
+ * @param peer the peer to send to
+ * @param attributes the payload to propagate
+ * @param message the payload to propagate
+ */
+ public void send(Peer peer, String attributes, Bytes message) {
+ if (!started.get()) {
+ throw new IllegalStateException("Server has not started");
+ }
+ state.sendMessage(peer, attributes, message);
+ }
}
diff --git a/plumtree/src/test/java/org/apache/tuweni/plumtree/StateTest.java
b/plumtree/src/test/java/org/apache/tuweni/plumtree/StateTest.java
index bd374ec..3de6278 100644
--- a/plumtree/src/test/java/org/apache/tuweni/plumtree/StateTest.java
+++ b/plumtree/src/test/java/org/apache/tuweni/plumtree/StateTest.java
@@ -54,7 +54,7 @@ class StateTest {
public Bytes message;
@Override
- public void listen(Bytes messageBody, String attributes) {
+ public void listen(Bytes messageBody, String attributes, Peer peer) {
message = messageBody;
}
};
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]