This is an automated email from the ASF dual-hosted git repository.
toulmean pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git
The following commit(s) were added to refs/heads/master by this push:
new 1978411 Add ability to keep peers alive explicitly
new 5836c33 Merge pull request #177 from atoulme/keep_alive_peers
1978411 is described below
commit 1978411331b245f751bf5e06e3d1bce6d4cec19a
Author: Antoine Toulme <[email protected]>
AuthorDate: Fri Nov 27 00:08:41 2020 -0800
Add ability to keep peers alive explicitly
---
.../apache/tuweni/les/LESSubProtocolHandlerTest.kt | 4 ++++
.../java/org/apache/tuweni/rlpx/RLPxService.java | 9 +++++++-
.../apache/tuweni/rlpx/vertx/VertxRLPxService.java | 27 +++++++++++++++++++++-
3 files changed, 38 insertions(+), 2 deletions(-)
diff --git
a/les/src/test/kotlin/org/apache/tuweni/les/LESSubProtocolHandlerTest.kt
b/les/src/test/kotlin/org/apache/tuweni/les/LESSubProtocolHandlerTest.kt
index 424090b..3af9c58 100644
--- a/les/src/test/kotlin/org/apache/tuweni/les/LESSubProtocolHandlerTest.kt
+++ b/les/src/test/kotlin/org/apache/tuweni/les/LESSubProtocolHandlerTest.kt
@@ -64,6 +64,7 @@ import org.junit.jupiter.api.extension.ExtendWith
import java.net.InetSocketAddress
import java.time.Instant
import java.time.temporal.ChronoUnit
+import java.util.Collections.emptyList
import java.util.UUID
@ExtendWith(BouncyCastleExtension::class, VertxExtension::class,
LuceneIndexWriterExtension::class)
@@ -111,6 +112,9 @@ internal class LESSubProtocolHandlerTest {
)
private class MyRLPxService : RLPxService {
+ override fun addToKeepAliveList(peerPublicKey: SECP256K1.PublicKey) {
+ TODO("not implemented") // To change body of created functions use File
| Settings | File Templates.
+ }
override fun actualPort(): Int {
TODO("not implemented") // To change body of created functions use File
| Settings | File Templates.
diff --git a/rlpx/src/main/java/org/apache/tuweni/rlpx/RLPxService.java
b/rlpx/src/main/java/org/apache/tuweni/rlpx/RLPxService.java
index 0710ca6..9bfb135 100644
--- a/rlpx/src/main/java/org/apache/tuweni/rlpx/RLPxService.java
+++ b/rlpx/src/main/java/org/apache/tuweni/rlpx/RLPxService.java
@@ -29,7 +29,7 @@ import java.net.InetSocketAddress;
public interface RLPxService {
/**
- * Connects to a remote peer
+ * Connects to a remote peer.
*
* @param peerPublicKey the peer public key
* @param peerAddress the peer host and port
@@ -62,6 +62,13 @@ public interface RLPxService {
int advertisedPort();
/**
+ * Adds a peer public key to the list of peers to keep alive.
+ *
+ * @param peerPublicKey the peer public key
+ */
+ void addToKeepAliveList(SECP256K1.PublicKey peerPublicKey);
+
+ /**
* Starts the service.
*
* @return a future handler tracking starting the service.
diff --git
a/rlpx/src/main/java/org/apache/tuweni/rlpx/vertx/VertxRLPxService.java
b/rlpx/src/main/java/org/apache/tuweni/rlpx/vertx/VertxRLPxService.java
index 7864d57..e7687d4 100644
--- a/rlpx/src/main/java/org/apache/tuweni/rlpx/vertx/VertxRLPxService.java
+++ b/rlpx/src/main/java/org/apache/tuweni/rlpx/vertx/VertxRLPxService.java
@@ -18,6 +18,7 @@ import org.apache.tuweni.concurrent.AsyncCompletion;
import org.apache.tuweni.concurrent.AsyncResult;
import org.apache.tuweni.concurrent.CompletableAsyncCompletion;
import org.apache.tuweni.concurrent.CompletableAsyncResult;
+import org.apache.tuweni.crypto.SECP256K1;
import org.apache.tuweni.crypto.SECP256K1.KeyPair;
import org.apache.tuweni.crypto.SECP256K1.PublicKey;
import org.apache.tuweni.rlpx.HandshakeMessage;
@@ -35,6 +36,7 @@ import org.apache.tuweni.rlpx.wire.SubProtocolIdentifier;
import org.apache.tuweni.rlpx.wire.WireConnection;
import java.net.InetSocketAddress;
+import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -68,13 +70,14 @@ public final class VertxRLPxService implements RLPxService {
private final KeyPair keyPair;
private final List<SubProtocol> subProtocols;
private final String clientId;
+ private final WireConnectionRepository repository;
private LinkedHashMap<SubProtocol, SubProtocolHandler> handlers;
private LinkedHashMap<SubProtocol, SubProtocolClient> clients;
private NetClient client;
private NetServer server;
- private final WireConnectionRepository repository;
+ private List<SECP256K1.PublicKey> keepAliveList = new ArrayList<>();
private static void checkPort(int port) {
if (port < 0 || port > 65536) {
@@ -146,6 +149,23 @@ public final class VertxRLPxService implements RLPxService
{
this.subProtocols = subProtocols;
this.clientId = clientId;
this.repository = repository;
+ repository.addDisconnectionListener(c -> {
+ if (keepAliveList.contains(c.peerPublicKey())) {
+
+ tryConnect(c.peerPublicKey(), new InetSocketAddress(c.peerHost(),
c.peerPort()));
+ }
+ });
+ }
+
+ private void tryConnect(SECP256K1.PublicKey peerPublicKey, InetSocketAddress
inetSocketAddress) {
+ vertx.runOnContext(event -> connectTo(peerPublicKey,
inetSocketAddress).whenComplete((result, e) -> {
+ if (e != null) {
+ logger.warn("Error reconnecting to peer {}@{}: {}", peerPublicKey,
inetSocketAddress, e);
+ tryConnect(peerPublicKey, inetSocketAddress);
+ } else {
+ logger.info("Connected successfully to keep alive peer {}@{}",
peerPublicKey, inetSocketAddress);
+ }
+ }));
}
@Override
@@ -395,4 +415,9 @@ public final class VertxRLPxService implements RLPxService {
wireConnection.handleConnectionStart();
return wireConnection;
}
+
+ @Override
+ public void addToKeepAliveList(SECP256K1.PublicKey peerPublicKey) {
+ keepAliveList.add(peerPublicKey);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]