This is an automated email from the ASF dual-hosted git repository.
toulmean pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git
The following commit(s) were added to refs/heads/main by this push:
new 72ec242 nits to connect to peers non-stop
new af907db Merge pull request #278 from atoulme/report_status_on_all_connections
72ec242 is described below
commit 72ec2423b18cc856893af4d251b5735ce408116e
Author: Antoine Toulme <[email protected]>
AuthorDate: Mon Jun 14 10:17:05 2021 +0200
nits to connect to peers non-stop
---
.../kotlin/org/apache/tuweni/devp2p/Scraper.kt | 2 +-
.../org/apache/tuweni/eth/crawler/CrawlerApp.kt | 76 +++++++++++++++++-----
.../tuweni/eth/crawler/RelationalPeerRepository.kt | 10 +--
.../apache/tuweni/rlpx/vertx/VertxRLPxService.java | 19 ++++--
4 files changed, 79 insertions(+), 28 deletions(-)
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/Scraper.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/Scraper.kt
index 1492afa..463139a 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/Scraper.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/Scraper.kt
@@ -126,7 +126,7 @@ class Scraper(
}
}
- for (i in 1..10) {
+ while (started.get()) {
service?.lookupAsync(SECP256K1.KeyPair.random().publicKey())?.thenAccept
{
for (newNode in it) {
if (nodes.add(newNode)) {
diff --git a/eth-crawler/src/main/kotlin/org/apache/tuweni/eth/crawler/CrawlerApp.kt b/eth-crawler/src/main/kotlin/org/apache/tuweni/eth/crawler/CrawlerApp.kt
index e8af7e2..10fc606 100644
--- a/eth-crawler/src/main/kotlin/org/apache/tuweni/eth/crawler/CrawlerApp.kt
+++ b/eth-crawler/src/main/kotlin/org/apache/tuweni/eth/crawler/CrawlerApp.kt
@@ -20,10 +20,15 @@ import com.zaxxer.hikari.HikariDataSource
import io.opentelemetry.sdk.metrics.SdkMeterProvider
import io.vertx.core.Vertx
import io.vertx.core.net.SocketAddress
+import kotlinx.coroutines.CoroutineDispatcher
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.async
+import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
+import org.apache.tuweni.concurrent.AsyncCompletion
import org.apache.tuweni.concurrent.ExpiringSet
import org.apache.tuweni.concurrent.coroutines.await
import org.apache.tuweni.crypto.SECP256K1
@@ -44,6 +49,7 @@ import java.net.InetSocketAddress
import java.nio.file.Files
import java.nio.file.Paths
import java.security.Security
+import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicBoolean
/**
@@ -63,8 +69,12 @@ object CrawlerApp {
}
System.exit(1)
}
- run(vertx, config)
+ val app = CrawlerApplication()
+ app.run(vertx, config)
}
+}
+
+class CrawlerApplication(override val coroutineContext: CoroutineDispatcher =
Executors.newFixedThreadPool(20).asCoroutineDispatcher()) : CoroutineScope {
fun run(vertx: Vertx, config: CrawlerConfig) {
val ds = HikariDataSource()
@@ -81,7 +91,8 @@ object CrawlerApp {
vertx = vertx,
initialURIs = config.bootNodes(),
bindAddress = SocketAddress.inetSocketAddress(config.discoveryPort(),
config.discoveryNetworkInterface()),
- repository = repo
+ repository = repo,
+
)
val contents = if (config.network() == null) {
Files.readAllBytes(Paths.get(config.genesisFile()))
@@ -112,11 +123,23 @@ object CrawlerApp {
}
}
- val rlpxService = VertxRLPxService(vertx, 0, "127.0.0.1", 0,
SECP256K1.KeyPair.random(), listOf(ethHelloProtocol), "Apache Tuweni network
crawler", meter)
+ val rlpxService = VertxRLPxService(
+ vertx,
+ 30303,
+ "127.0.0.1",
+ 30303,
+ SECP256K1.KeyPair.random(),
+ listOf(ethHelloProtocol),
+ "Apache Tuweni network crawler",
+ meter
+ )
repo.addListener {
- connect(rlpxService, it.nodeId, InetSocketAddress(it.endpoint.address,
it.endpoint.tcpPort ?: 30303))
+ runBlocking {
+ connect(rlpxService, it.nodeId, InetSocketAddress(it.endpoint.address,
it.endpoint.tcpPort ?: 30303))
+ }
}
- val restService = CrawlerRESTService(port = config.restPort(),
networkInterface = config.restNetworkInterface(), repository = repo)
+ val restService =
+ CrawlerRESTService(port = config.restPort(), networkInterface =
config.restNetworkInterface(), repository = repo)
val refreshLoop = AtomicBoolean(true)
val ethstatsDataRepository = EthstatsDataRepository(ds)
val ethstatsServer = EthStatsServer(
@@ -146,10 +169,14 @@ object CrawlerApp {
while (refreshLoop.get()) {
try {
for (connectionInfo in repo.getPeers(System.currentTimeMillis() -
60 * 60 * 1000)) {
- connect(rlpxService, connectionInfo.nodeId,
InetSocketAddress(connectionInfo.host, if (connectionInfo.port == 0) 30303 else
connectionInfo.port))
+ connect(
+ rlpxService,
+ connectionInfo.nodeId,
+ InetSocketAddress(connectionInfo.host, if (connectionInfo.port
== 0) 30303 else connectionInfo.port)
+ )
}
} catch (e: Exception) {
- logger.error("Error connecting to pending peers", e)
+ logger.error("Error connecting to peers", e)
}
delay(5 * 60 * 1000)
}
@@ -157,28 +184,41 @@ object CrawlerApp {
rlpxService.start().await()
scraper.start()
ethstatsServer.start()
+ val peerSeen = ExpiringSet<PeerConnectionInfo>(5 * 60 * 1000)
launch {
while (refreshLoop.get()) {
try {
- for (connectionInfo in repo.getPendingPeers()) {
- connect(
- rlpxService,
- connectionInfo.nodeId,
- InetSocketAddress(connectionInfo.host, if (connectionInfo.port
== 0) 30303 else connectionInfo.port)
- )
+ launch {
+ logger.info("Requesting pending peers")
+ val pendingPeers = repo.getPendingPeers(1000)
+ logger.info("Requesting connections to ${pendingPeers.size}
peers")
+ pendingPeers.map { connectionInfo ->
+ async {
+ if (peerSeen.add(connectionInfo)) {
+ connect(
+ rlpxService,
+ connectionInfo.nodeId,
+ InetSocketAddress(
+ connectionInfo.host,
+ if (connectionInfo.port == 0) 30303 else
connectionInfo.port
+ )
+ ).await()
+ }
+ }
+ }.awaitAll()
}
} catch (e: Exception) {
logger.error("Error connecting to pending peers", e)
}
- delay(2 * 60 * 1000)
+ delay(10 * 1000)
}
}
}
}
-}
-fun connect(rlpxService: RLPxService, key: SECP256K1.PublicKey, address:
InetSocketAddress) {
- rlpxService.connectTo(key, address).thenAccept {
- rlpxService.disconnect(it, DisconnectReason.CLIENT_QUITTING)
+ fun connect(rlpxService: RLPxService, key: SECP256K1.PublicKey, address:
InetSocketAddress): AsyncCompletion {
+ return rlpxService.connectTo(key, address).thenAccept {
+ rlpxService.disconnect(it, DisconnectReason.CLIENT_QUITTING)
+ }
}
}
diff --git a/eth-crawler/src/main/kotlin/org/apache/tuweni/eth/crawler/RelationalPeerRepository.kt b/eth-crawler/src/main/kotlin/org/apache/tuweni/eth/crawler/RelationalPeerRepository.kt
index 8050155..1847f31 100644
--- a/eth-crawler/src/main/kotlin/org/apache/tuweni/eth/crawler/RelationalPeerRepository.kt
+++ b/eth-crawler/src/main/kotlin/org/apache/tuweni/eth/crawler/RelationalPeerRepository.kt
@@ -59,7 +59,7 @@ open class RelationalPeerRepository(
fun get(nodeId: SECP256K1.PublicKey, endpoint: Endpoint): Peer {
dataSource.connection.use { conn ->
- logger.info("Get peer with $nodeId")
+ logger.trace("Get peer with $nodeId")
val stmt = conn.prepareStatement("select id,publickey from identity
where publickey=?")
stmt.setBytes(1, nodeId.bytes().toArrayUnsafe())
try {
@@ -82,7 +82,7 @@ open class RelationalPeerRepository(
}
return newPeer
} else {
- logger.debug("Found existing peer with public key
${nodeId.toHexString()}")
+ logger.trace("Found existing peer with public key
${nodeId.toHexString()}")
val id = rs.getString(1)
val pubKey = rs.getBytes(2)
return
RepositoryPeer(SECP256K1.PublicKey.fromBytes(Bytes.wrap(pubKey)), id, endpoint,
dataSource)
@@ -194,17 +194,17 @@ open class RelationalPeerRepository(
}
}
- internal fun getPendingPeers(): Set<PeerConnectionInfo> {
+ internal fun getPendingPeers(from: Int = 0, limit: Int = 100):
List<PeerConnectionInfo> {
dataSource.connection.use { conn ->
val stmt =
conn.prepareStatement(
"select endpoint.host, endpoint.port, identity.publickey from
endpoint inner " +
- "join identity on (endpoint.identity = identity.id) where
endpoint.identity NOT IN (select identity from nodeinfo)"
+ "join identity on (endpoint.identity = identity.id) where
endpoint.identity NOT IN (select identity from nodeinfo) order by
endpoint.lastSeen desc limit $limit offset $from"
)
stmt.use {
// map results.
val rs = stmt.executeQuery()
- val result = mutableSetOf<PeerConnectionInfo>()
+ val result = mutableListOf<PeerConnectionInfo>()
while (rs.next()) {
val pubkey =
SECP256K1.PublicKey.fromBytes(Bytes.wrap(rs.getBytes(3)))
val port = rs.getInt(2)
diff --git a/rlpx/src/main/java/org/apache/tuweni/rlpx/vertx/VertxRLPxService.java b/rlpx/src/main/java/org/apache/tuweni/rlpx/vertx/VertxRLPxService.java
index c74c034..c5485b8 100644
--- a/rlpx/src/main/java/org/apache/tuweni/rlpx/vertx/VertxRLPxService.java
+++ b/rlpx/src/main/java/org/apache/tuweni/rlpx/vertx/VertxRLPxService.java
@@ -74,14 +74,15 @@ public final class VertxRLPxService implements RLPxService {
private final WireConnectionRepository repository;
private final LongCounter connectionsCreatedCounter;
private final LongCounter connectionsDisconnectedCounter;
+ private final List<SECP256K1.PublicKey> keepAliveList = new ArrayList<>();
+ private final int connectTimeout = 5 * 1000;
+ private final int idleTimeout = 30 * 1000;
private LinkedHashMap<SubProtocolIdentifier, SubProtocolHandler> handlers;
private LinkedHashMap<SubProtocolIdentifier, SubProtocolClient> clients;
private NetClient client;
private NetServer server;
- private List<SECP256K1.PublicKey> keepAliveList = new ArrayList<>();
-
private static void checkPort(int port) {
if (port < 0 || port > 65536) {
throw new IllegalArgumentException("Invalid port: " + port);
@@ -200,9 +201,19 @@ public final class VertxRLPxService implements RLPxService
{
}
}
- client = vertx.createNetClient(new NetClientOptions());
+ client = vertx
+ .createNetClient(
+ new NetClientOptions()
+ .setTcpKeepAlive(true)
+ .setConnectTimeout(connectTimeout)
+ .setIdleTimeout(idleTimeout));
server = vertx
- .createNetServer(new
NetServerOptions().setPort(listenPort).setHost(networkInterface).setTcpKeepAlive(true))
+ .createNetServer(
+ new NetServerOptions()
+ .setPort(listenPort)
+ .setHost(networkInterface)
+ .setTcpKeepAlive(true)
+ .setIdleTimeout(idleTimeout))
.connectHandler(this::receiveMessage);
CompletableAsyncCompletion complete = AsyncCompletion.incomplete();
server.listen(res -> {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]