This is an automated email from the ASF dual-hosted git repository.

toulmean pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git


The following commit(s) were added to refs/heads/main by this push:
     new 72ec242  nits to connect to peers non-stop
     new af907db  Merge pull request #278 from atoulme/report_status_on_all_connections
72ec242 is described below

commit 72ec2423b18cc856893af4d251b5735ce408116e
Author: Antoine Toulme <[email protected]>
AuthorDate: Mon Jun 14 10:17:05 2021 +0200

    nits to connect to peers non-stop
---
 .../kotlin/org/apache/tuweni/devp2p/Scraper.kt     |  2 +-
 .../org/apache/tuweni/eth/crawler/CrawlerApp.kt    | 76 +++++++++++++++++-----
 .../tuweni/eth/crawler/RelationalPeerRepository.kt | 10 +--
 .../apache/tuweni/rlpx/vertx/VertxRLPxService.java | 19 ++++--
 4 files changed, 79 insertions(+), 28 deletions(-)

diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/Scraper.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/Scraper.kt
index 1492afa..463139a 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/Scraper.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/Scraper.kt
@@ -126,7 +126,7 @@ class Scraper(
       }
     }
 
-    for (i in 1..10) {
+    while (started.get()) {
       service?.lookupAsync(SECP256K1.KeyPair.random().publicKey())?.thenAccept {
         for (newNode in it) {
           if (nodes.add(newNode)) {
diff --git a/eth-crawler/src/main/kotlin/org/apache/tuweni/eth/crawler/CrawlerApp.kt b/eth-crawler/src/main/kotlin/org/apache/tuweni/eth/crawler/CrawlerApp.kt
index e8af7e2..10fc606 100644
--- a/eth-crawler/src/main/kotlin/org/apache/tuweni/eth/crawler/CrawlerApp.kt
+++ b/eth-crawler/src/main/kotlin/org/apache/tuweni/eth/crawler/CrawlerApp.kt
@@ -20,10 +20,15 @@ import com.zaxxer.hikari.HikariDataSource
 import io.opentelemetry.sdk.metrics.SdkMeterProvider
 import io.vertx.core.Vertx
 import io.vertx.core.net.SocketAddress
+import kotlinx.coroutines.CoroutineDispatcher
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.asCoroutineDispatcher
 import kotlinx.coroutines.async
+import kotlinx.coroutines.awaitAll
 import kotlinx.coroutines.delay
 import kotlinx.coroutines.launch
 import kotlinx.coroutines.runBlocking
+import org.apache.tuweni.concurrent.AsyncCompletion
 import org.apache.tuweni.concurrent.ExpiringSet
 import org.apache.tuweni.concurrent.coroutines.await
 import org.apache.tuweni.crypto.SECP256K1
@@ -44,6 +49,7 @@ import java.net.InetSocketAddress
 import java.nio.file.Files
 import java.nio.file.Paths
 import java.security.Security
+import java.util.concurrent.Executors
 import java.util.concurrent.atomic.AtomicBoolean
 
 /**
@@ -63,8 +69,12 @@ object CrawlerApp {
       }
       System.exit(1)
     }
-    run(vertx, config)
+    val app = CrawlerApplication()
+    app.run(vertx, config)
   }
+}
+
+class CrawlerApplication(override val coroutineContext: CoroutineDispatcher = Executors.newFixedThreadPool(20).asCoroutineDispatcher()) : CoroutineScope {
 
   fun run(vertx: Vertx, config: CrawlerConfig) {
     val ds = HikariDataSource()
@@ -81,7 +91,8 @@ object CrawlerApp {
       vertx = vertx,
       initialURIs = config.bootNodes(),
       bindAddress = SocketAddress.inetSocketAddress(config.discoveryPort(), config.discoveryNetworkInterface()),
-      repository = repo
+      repository = repo,
+
     )
     val contents = if (config.network() == null) {
       Files.readAllBytes(Paths.get(config.genesisFile()))
@@ -112,11 +123,23 @@ object CrawlerApp {
       }
     }
 
-    val rlpxService = VertxRLPxService(vertx, 0, "127.0.0.1", 0, SECP256K1.KeyPair.random(), listOf(ethHelloProtocol), "Apache Tuweni network crawler", meter)
+    val rlpxService = VertxRLPxService(
+      vertx,
+      30303,
+      "127.0.0.1",
+      30303,
+      SECP256K1.KeyPair.random(),
+      listOf(ethHelloProtocol),
+      "Apache Tuweni network crawler",
+      meter
+    )
     repo.addListener {
-      connect(rlpxService, it.nodeId, InetSocketAddress(it.endpoint.address, it.endpoint.tcpPort ?: 30303))
+      runBlocking {
+        connect(rlpxService, it.nodeId, InetSocketAddress(it.endpoint.address, it.endpoint.tcpPort ?: 30303))
+      }
     }
-    val restService = CrawlerRESTService(port = config.restPort(), networkInterface = config.restNetworkInterface(), repository = repo)
+    val restService =
+      CrawlerRESTService(port = config.restPort(), networkInterface = config.restNetworkInterface(), repository = repo)
     val refreshLoop = AtomicBoolean(true)
     val ethstatsDataRepository = EthstatsDataRepository(ds)
     val ethstatsServer = EthStatsServer(
@@ -146,10 +169,14 @@ object CrawlerApp {
         while (refreshLoop.get()) {
           try {
             for (connectionInfo in repo.getPeers(System.currentTimeMillis() - 60 * 60 * 1000)) {
-              connect(rlpxService, connectionInfo.nodeId, InetSocketAddress(connectionInfo.host, if (connectionInfo.port == 0) 30303 else connectionInfo.port))
+              connect(
+                rlpxService,
+                connectionInfo.nodeId,
+                InetSocketAddress(connectionInfo.host, if (connectionInfo.port == 0) 30303 else connectionInfo.port)
+              )
             }
           } catch (e: Exception) {
-            logger.error("Error connecting to pending peers", e)
+            logger.error("Error connecting to peers", e)
           }
           delay(5 * 60 * 1000)
         }
@@ -157,28 +184,41 @@ object CrawlerApp {
       rlpxService.start().await()
       scraper.start()
       ethstatsServer.start()
+      val peerSeen = ExpiringSet<PeerConnectionInfo>(5 * 60 * 1000)
       launch {
         while (refreshLoop.get()) {
           try {
-            for (connectionInfo in repo.getPendingPeers()) {
-              connect(
-                rlpxService,
-                connectionInfo.nodeId,
-                InetSocketAddress(connectionInfo.host, if (connectionInfo.port == 0) 30303 else connectionInfo.port)
-              )
+            launch {
+              logger.info("Requesting pending peers")
+              val pendingPeers = repo.getPendingPeers(1000)
+              logger.info("Requesting connections to ${pendingPeers.size} peers")
+              pendingPeers.map { connectionInfo ->
+                async {
+                  if (peerSeen.add(connectionInfo)) {
+                    connect(
+                      rlpxService,
+                      connectionInfo.nodeId,
+                      InetSocketAddress(
+                        connectionInfo.host,
+                        if (connectionInfo.port == 0) 30303 else connectionInfo.port
+                      )
+                    ).await()
+                  }
+                }
+              }.awaitAll()
             }
           } catch (e: Exception) {
             logger.error("Error connecting to pending peers", e)
           }
-          delay(2 * 60 * 1000)
+          delay(10 * 1000)
         }
       }
     }
   }
-}
 
-fun connect(rlpxService: RLPxService, key: SECP256K1.PublicKey, address: InetSocketAddress) {
-  rlpxService.connectTo(key, address).thenAccept {
-    rlpxService.disconnect(it, DisconnectReason.CLIENT_QUITTING)
+  fun connect(rlpxService: RLPxService, key: SECP256K1.PublicKey, address: InetSocketAddress): AsyncCompletion {
+    return rlpxService.connectTo(key, address).thenAccept {
+      rlpxService.disconnect(it, DisconnectReason.CLIENT_QUITTING)
+    }
   }
 }
diff --git a/eth-crawler/src/main/kotlin/org/apache/tuweni/eth/crawler/RelationalPeerRepository.kt b/eth-crawler/src/main/kotlin/org/apache/tuweni/eth/crawler/RelationalPeerRepository.kt
index 8050155..1847f31 100644
--- a/eth-crawler/src/main/kotlin/org/apache/tuweni/eth/crawler/RelationalPeerRepository.kt
+++ b/eth-crawler/src/main/kotlin/org/apache/tuweni/eth/crawler/RelationalPeerRepository.kt
@@ -59,7 +59,7 @@ open class RelationalPeerRepository(
 
   fun get(nodeId: SECP256K1.PublicKey, endpoint: Endpoint): Peer {
     dataSource.connection.use { conn ->
-      logger.info("Get peer with $nodeId")
+      logger.trace("Get peer with $nodeId")
       val stmt = conn.prepareStatement("select id,publickey from identity where publickey=?")
       stmt.setBytes(1, nodeId.bytes().toArrayUnsafe())
       try {
@@ -82,7 +82,7 @@ open class RelationalPeerRepository(
             }
             return newPeer
           } else {
-            logger.debug("Found existing peer with public key ${nodeId.toHexString()}")
+            logger.trace("Found existing peer with public key ${nodeId.toHexString()}")
             val id = rs.getString(1)
             val pubKey = rs.getBytes(2)
             return RepositoryPeer(SECP256K1.PublicKey.fromBytes(Bytes.wrap(pubKey)), id, endpoint, dataSource)
@@ -194,17 +194,17 @@ open class RelationalPeerRepository(
     }
   }
 
-  internal fun getPendingPeers(): Set<PeerConnectionInfo> {
+  internal fun getPendingPeers(from: Int = 0, limit: Int = 100): List<PeerConnectionInfo> {
     dataSource.connection.use { conn ->
       val stmt =
         conn.prepareStatement(
           "select endpoint.host, endpoint.port, identity.publickey from endpoint inner " +
-            "join identity on (endpoint.identity = identity.id) where endpoint.identity NOT IN (select identity from nodeinfo)"
+            "join identity on (endpoint.identity = identity.id) where endpoint.identity NOT IN (select identity from nodeinfo) order by endpoint.lastSeen desc limit $limit offset $from"
         )
       stmt.use {
         // map results.
         val rs = stmt.executeQuery()
-        val result = mutableSetOf<PeerConnectionInfo>()
+        val result = mutableListOf<PeerConnectionInfo>()
         while (rs.next()) {
           val pubkey = SECP256K1.PublicKey.fromBytes(Bytes.wrap(rs.getBytes(3)))
           val port = rs.getInt(2)
diff --git a/rlpx/src/main/java/org/apache/tuweni/rlpx/vertx/VertxRLPxService.java b/rlpx/src/main/java/org/apache/tuweni/rlpx/vertx/VertxRLPxService.java
index c74c034..c5485b8 100644
--- a/rlpx/src/main/java/org/apache/tuweni/rlpx/vertx/VertxRLPxService.java
+++ b/rlpx/src/main/java/org/apache/tuweni/rlpx/vertx/VertxRLPxService.java
@@ -74,14 +74,15 @@ public final class VertxRLPxService implements RLPxService {
   private final WireConnectionRepository repository;
   private final LongCounter connectionsCreatedCounter;
   private final LongCounter connectionsDisconnectedCounter;
+  private final List<SECP256K1.PublicKey> keepAliveList = new ArrayList<>();
+  private final int connectTimeout = 5 * 1000;
+  private final int idleTimeout = 30 * 1000;
 
   private LinkedHashMap<SubProtocolIdentifier, SubProtocolHandler> handlers;
   private LinkedHashMap<SubProtocolIdentifier, SubProtocolClient> clients;
   private NetClient client;
   private NetServer server;
 
-  private List<SECP256K1.PublicKey> keepAliveList = new ArrayList<>();
-
   private static void checkPort(int port) {
     if (port < 0 || port > 65536) {
       throw new IllegalArgumentException("Invalid port: " + port);
@@ -200,9 +201,19 @@ public final class VertxRLPxService implements RLPxService {
         }
       }
 
-      client = vertx.createNetClient(new NetClientOptions());
+      client = vertx
+          .createNetClient(
+              new NetClientOptions()
+                  .setTcpKeepAlive(true)
+                  .setConnectTimeout(connectTimeout)
+                  .setIdleTimeout(idleTimeout));
       server = vertx
-          .createNetServer(new NetServerOptions().setPort(listenPort).setHost(networkInterface).setTcpKeepAlive(true))
+          .createNetServer(
+              new NetServerOptions()
+                  .setPort(listenPort)
+                  .setHost(networkInterface)
+                  .setTcpKeepAlive(true)
+                  .setIdleTimeout(idleTimeout))
           .connectHandler(this::receiveMessage);
       CompletableAsyncCompletion complete = AsyncCompletion.incomplete();
       server.listen(res -> {

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to