This is an automated email from the ASF dual-hosted git repository.

toulmean pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git


The following commit(s) were added to refs/heads/master by this push:
     new 1978411  Add ability to keep peers alive explicitly
     new 5836c33  Merge pull request #177 from atoulme/keep_alive_peers
1978411 is described below

commit 1978411331b245f751bf5e06e3d1bce6d4cec19a
Author: Antoine Toulme <[email protected]>
AuthorDate: Fri Nov 27 00:08:41 2020 -0800

    Add ability to keep peers alive explicitly
---
 .../apache/tuweni/les/LESSubProtocolHandlerTest.kt |  4 ++++
 .../java/org/apache/tuweni/rlpx/RLPxService.java   |  9 +++++++-
 .../apache/tuweni/rlpx/vertx/VertxRLPxService.java | 27 +++++++++++++++++++++-
 3 files changed, 38 insertions(+), 2 deletions(-)

diff --git 
a/les/src/test/kotlin/org/apache/tuweni/les/LESSubProtocolHandlerTest.kt 
b/les/src/test/kotlin/org/apache/tuweni/les/LESSubProtocolHandlerTest.kt
index 424090b..3af9c58 100644
--- a/les/src/test/kotlin/org/apache/tuweni/les/LESSubProtocolHandlerTest.kt
+++ b/les/src/test/kotlin/org/apache/tuweni/les/LESSubProtocolHandlerTest.kt
@@ -64,6 +64,7 @@ import org.junit.jupiter.api.extension.ExtendWith
 import java.net.InetSocketAddress
 import java.time.Instant
 import java.time.temporal.ChronoUnit
+import java.util.Collections.emptyList
 import java.util.UUID
 
 @ExtendWith(BouncyCastleExtension::class, VertxExtension::class, 
LuceneIndexWriterExtension::class)
@@ -111,6 +112,9 @@ internal class LESSubProtocolHandlerTest {
   )
 
   private class MyRLPxService : RLPxService {
+    override fun addToKeepAliveList(peerPublicKey: SECP256K1.PublicKey) {
+      TODO("not implemented") // To change body of created functions use File 
| Settings | File Templates.
+    }
 
     override fun actualPort(): Int {
       TODO("not implemented") // To change body of created functions use File 
| Settings | File Templates.
diff --git a/rlpx/src/main/java/org/apache/tuweni/rlpx/RLPxService.java 
b/rlpx/src/main/java/org/apache/tuweni/rlpx/RLPxService.java
index 0710ca6..9bfb135 100644
--- a/rlpx/src/main/java/org/apache/tuweni/rlpx/RLPxService.java
+++ b/rlpx/src/main/java/org/apache/tuweni/rlpx/RLPxService.java
@@ -29,7 +29,7 @@ import java.net.InetSocketAddress;
 public interface RLPxService {
 
   /**
-   * Connects to a remote peer
+   * Connects to a remote peer.
    *
    * @param peerPublicKey the peer public key
    * @param peerAddress the peer host and port
@@ -62,6 +62,13 @@ public interface RLPxService {
   int advertisedPort();
 
   /**
+   * Adds a peer public key to the list of peers to keep alive.
+   *
+   * @param peerPublicKey the peer public key
+   */
+  void addToKeepAliveList(SECP256K1.PublicKey peerPublicKey);
+
+  /**
    * Starts the service.
    * 
    * @return a future handler tracking starting the service.
diff --git 
a/rlpx/src/main/java/org/apache/tuweni/rlpx/vertx/VertxRLPxService.java 
b/rlpx/src/main/java/org/apache/tuweni/rlpx/vertx/VertxRLPxService.java
index 7864d57..e7687d4 100644
--- a/rlpx/src/main/java/org/apache/tuweni/rlpx/vertx/VertxRLPxService.java
+++ b/rlpx/src/main/java/org/apache/tuweni/rlpx/vertx/VertxRLPxService.java
@@ -18,6 +18,7 @@ import org.apache.tuweni.concurrent.AsyncCompletion;
 import org.apache.tuweni.concurrent.AsyncResult;
 import org.apache.tuweni.concurrent.CompletableAsyncCompletion;
 import org.apache.tuweni.concurrent.CompletableAsyncResult;
+import org.apache.tuweni.crypto.SECP256K1;
 import org.apache.tuweni.crypto.SECP256K1.KeyPair;
 import org.apache.tuweni.crypto.SECP256K1.PublicKey;
 import org.apache.tuweni.rlpx.HandshakeMessage;
@@ -35,6 +36,7 @@ import org.apache.tuweni.rlpx.wire.SubProtocolIdentifier;
 import org.apache.tuweni.rlpx.wire.WireConnection;
 
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -68,13 +70,14 @@ public final class VertxRLPxService implements RLPxService {
   private final KeyPair keyPair;
   private final List<SubProtocol> subProtocols;
   private final String clientId;
+  private final WireConnectionRepository repository;
 
   private LinkedHashMap<SubProtocol, SubProtocolHandler> handlers;
   private LinkedHashMap<SubProtocol, SubProtocolClient> clients;
   private NetClient client;
   private NetServer server;
 
-  private final WireConnectionRepository repository;
+  private List<SECP256K1.PublicKey> keepAliveList = new ArrayList<>();
 
   private static void checkPort(int port) {
     if (port < 0 || port > 65536) {
@@ -146,6 +149,23 @@ public final class VertxRLPxService implements RLPxService 
{
     this.subProtocols = subProtocols;
     this.clientId = clientId;
     this.repository = repository;
+    repository.addDisconnectionListener(c -> {
+      if (keepAliveList.contains(c.peerPublicKey())) {
+
+        tryConnect(c.peerPublicKey(), new InetSocketAddress(c.peerHost(), 
c.peerPort()));
+      }
+    });
+  }
+
+  private void tryConnect(SECP256K1.PublicKey peerPublicKey, InetSocketAddress 
inetSocketAddress) {
+    vertx.runOnContext(event -> connectTo(peerPublicKey, 
inetSocketAddress).whenComplete((result, e) -> {
+      if (e != null) {
+        logger.warn("Error reconnecting to peer {}@{}: {}", peerPublicKey, 
inetSocketAddress, e);
+        tryConnect(peerPublicKey, inetSocketAddress);
+      } else {
+        logger.info("Connected successfully to keep alive peer {}@{}", 
peerPublicKey, inetSocketAddress);
+      }
+    }));
   }
 
   @Override
@@ -395,4 +415,9 @@ public final class VertxRLPxService implements RLPxService {
     wireConnection.handleConnectionStart();
     return wireConnection;
   }
+
+  @Override
+  public void addToKeepAliveList(SECP256K1.PublicKey peerPublicKey) {
+    keepAliveList.add(peerPublicKey);
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to