This is an automated email from the ASF dual-hosted git repository. toulmean pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git
commit dc438454910df47c12ce172c23a6ff8e685e74bf Author: Antoine Toulme <[email protected]> AuthorDate: Tue May 28 00:21:26 2019 -0700 Add support for EIP-868 to devp2p v4 --- .../org/apache/tuweni/devp2p/DiscoveryService.kt | 145 ++++++++++++++++++- .../org/apache/tuweni/devp2p/EthereumNodeRecord.kt | 12 ++ .../main/kotlin/org/apache/tuweni/devp2p/Packet.kt | 158 +++++++++++++++++++-- .../kotlin/org/apache/tuweni/devp2p/PacketType.kt | 16 +++ .../main/kotlin/org/apache/tuweni/devp2p/Peer.kt | 15 ++ .../org/apache/tuweni/devp2p/PeerRepository.kt | 10 +- .../tuweni/devp2p/DiscoveryServiceJavaTest.java | 3 + .../apache/tuweni/devp2p/DiscoveryServiceTest.kt | 8 +- ...{PongPacketTest.kt => ENRResponsePacketTest.kt} | 38 ++--- .../org/apache/tuweni/devp2p/PingPacketTest.kt | 30 +++- .../org/apache/tuweni/devp2p/PongPacketTest.kt | 28 +++- 11 files changed, 409 insertions(+), 54 deletions(-) diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/DiscoveryService.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/DiscoveryService.kt index 793eb91..9a09060 100644 --- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/DiscoveryService.kt +++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/DiscoveryService.kt @@ -40,6 +40,7 @@ import kotlinx.coroutines.launch import kotlinx.coroutines.withTimeout import kotlinx.coroutines.withTimeoutOrNull import kotlinx.coroutines.yield +import org.apache.tuweni.bytes.Bytes import org.apache.tuweni.bytes.Bytes32 import org.apache.tuweni.concurrent.AsyncCompletion import org.apache.tuweni.concurrent.AsyncResult @@ -61,6 +62,7 @@ import java.net.SocketAddress import java.net.URI import java.nio.ByteBuffer import java.nio.channels.ClosedChannelException +import java.time.Instant import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicLong @@ -69,7 +71,9 @@ import kotlin.coroutines.CoroutineContext internal const val PACKET_EXPIRATION_PERIOD_MS: Long = (20 * 1000) // 20 seconds internal const val PACKET_EXPIRATION_CHECK_GRACE_MS: Long = (5 * 1000) // 5 seconds internal const val PEER_VERIFICATION_TIMEOUT_MS: Long = (22 * 1000) // 22 seconds (packet expiration + a little) +internal const val ENR_REQUEST_TIMEOUT_MS: Long = (22 * 1000) // 22 seconds (packet expiration + a little) internal const val PEER_VERIFICATION_RETRY_DELAY_MS: Long = (5 * 60 * 1000) // 5 minutes +internal const val ENR_REQUEST_RETRY_DELAY_MS: Long = (5 * 60 * 1000) // 5 minutes internal const val BOOTSTRAP_PEER_VERIFICATION_TIMEOUT_MS: Long = (2 * 60 * 1000) // 2 minutes internal const val REFRESH_INTERVAL_MS: Long = (60 * 1000) // 1 minute internal const val PING_RETRIES: Int = 20 @@ -84,7 +88,9 @@ internal const val LOOKUP_RESPONSE_TIMEOUT_MS: Long = 500 // 500 milliseconds /** * An Ethereum ÐΞVp2p discovery service. * - * @author Chris Leishman - https://cleishm.github.io/ + * This service supports devp2p discovery v4, alongside support for EIP-868. + * http://eips.ethereum.org/EIPS/eip-868 + * */ interface DiscoveryService { @@ -99,6 +105,8 @@ interface DiscoveryService { * @param port the port to listen on (defaults to `0`, which will cause a random free port to be chosen) * @param host the host name or IP address of the interface to bind to (defaults to `null`, which will cause the * service to listen on all interfaces + * @param seq the sequence number of the Ethereum Node Record + * @param enrData the additional key/value pair entries to broadcast as an Ethereum Node Record (ENR). * @param bootstrapURIs the URIs for bootstrap nodes * @param peerRepository a [PeerRepository] for obtaining [Peer] instances * @param advertiseAddress the IP address to advertise to peers, or `null` if the address of the first bound @@ -117,6 +125,8 @@ interface DiscoveryService { keyPair: SECP256K1.KeyPair, port: Int = 0, host: String? = null, + seq: Long = Instant.now().toEpochMilli(), + enrData: Map<String, Bytes> = emptyMap(), bootstrapURIs: List<URI> = emptyList(), peerRepository: PeerRepository = EphemeralPeerRepository(), advertiseAddress: InetAddress? = null, @@ -133,6 +143,8 @@ interface DiscoveryService { return open( keyPair, bindAddress, + seq, + enrData, bootstrapURIs, peerRepository, advertiseAddress, @@ -152,6 +164,8 @@ interface DiscoveryService { * * @param keyPair the local node's keypair * @param bindAddress the address to listen on + * @param seq the sequence number of the Ethereum Node Record + * @param enrData the additional key/value pair entries to broadcast as an Ethereum Node Record (ENR). * @param bootstrapURIs the URIs for bootstrap nodes * @param peerRepository a [PeerRepository] for obtaining [Peer] instances * @param advertiseAddress the IP address to advertise for incoming packets @@ -168,6 +182,8 @@ interface DiscoveryService { fun open( keyPair: SECP256K1.KeyPair, bindAddress: InetSocketAddress, + seq: Long = Instant.now().toEpochMilli(), + enrData: Map<String, Bytes> = emptyMap(), bootstrapURIs: List<URI> = emptyList(), peerRepository: PeerRepository = EphemeralPeerRepository(), advertiseAddress: InetAddress? = null, @@ -181,8 +197,8 @@ interface DiscoveryService { timeSupplier: () -> Long = CURRENT_TIME_SUPPLIER ): DiscoveryService { return CoroutineDiscoveryService( - keyPair, bindAddress, bootstrapURIs, advertiseAddress, advertiseUdpPort, advertiseTcpPort, peerRepository, - routingTable, packetFilter, loggerProvider, channelGroup, bufferAllocator, timeSupplier + keyPair, seq, enrData, bindAddress, bootstrapURIs, advertiseAddress, advertiseUdpPort, advertiseTcpPort, + peerRepository, routingTable, packetFilter, loggerProvider, channelGroup, bufferAllocator, timeSupplier ) } } @@ -259,10 +275,13 @@ interface DiscoveryService { val unvalidatedPeerPackets: Long val unexpectedPongs: Long val unexpectedNeighbors: Long + val unexpectedENRResponses: Long } internal class CoroutineDiscoveryService( private val keyPair: SECP256K1.KeyPair, + private val seq: Long = Instant.now().toEpochMilli(), + private val enrData: Map<String, Bytes>, bindAddress: InetSocketAddress, bootstrapURIs: List<URI> = emptyList(), advertiseAddress: InetAddress? = null, @@ -280,8 +299,8 @@ internal class CoroutineDiscoveryService( private val logger = loggerProvider.getLogger(DiscoveryService::class.java) private val serviceDescriptor = "ÐΞVp2p discovery " + System.identityHashCode(this) - private val selfEndpoint: Endpoint + private val enr: Bytes private val job = Job() // override the default exception handler, which dumps to stderr @@ -306,7 +325,10 @@ internal class CoroutineDiscoveryService( private val verifyingEndpoints: Cache<InetSocketAddress, EndpointVerification> = CacheBuilder.newBuilder().expireAfterAccess(PEER_VERIFICATION_RETRY_DELAY_MS, TimeUnit.MILLISECONDS).build() + private val requestingENRs: Cache<InetSocketAddress, ENRRequest> = + CacheBuilder.newBuilder().expireAfterAccess(ENR_REQUEST_RETRY_DELAY_MS, TimeUnit.MILLISECONDS).build() private val awaitingPongs = ConcurrentHashMap<Bytes32, EndpointVerification>() + private val awaitingENRs = ConcurrentHashMap<Bytes32, ENRRequest>() private val findNodeStates: Cache<SECP256K1.PublicKey, FindNodeState> = CacheBuilder.newBuilder().expireAfterAccess(FIND_NODES_CACHE_EXPIRY, TimeUnit.MILLISECONDS) .removalListener<SECP256K1.PublicKey, FindNodeState> { it.value.close() } @@ -318,6 +340,7 @@ internal class CoroutineDiscoveryService( override var filteredPackets: Long by AtomicLong(0) override var unvalidatedPeerPackets: Long by AtomicLong(0) override var unexpectedPongs: Long by AtomicLong(0) + override var unexpectedENRResponses: Long by AtomicLong(0) override var unexpectedNeighbors: Long by AtomicLong(0) init { @@ -329,6 +352,9 @@ internal class CoroutineDiscoveryService( advertiseTcpPort ) + enr = EthereumNodeRecord.toRLP(keyPair, seq, enrData, selfEndpoint.address, selfEndpoint.tcpPort, + selfEndpoint.udpPort) + val bootstrapping = bootstrapURIs.map { uri -> activityLatch.countUp() async { @@ -535,6 +561,8 @@ internal class CoroutineDiscoveryService( is PongPacket -> handlePong(packet, address, arrivalTime) is FindNodePacket -> handleFindNode(packet, address, arrivalTime) is NeighborsPacket -> handleNeighbors(packet, address) + is ENRRequestPacket -> handleENRRequest(packet, address, arrivalTime) + is ENRResponsePacket -> handleENRResponse(packet, address, arrivalTime) }.let {} // guarantees "when" matching is exhaustive } @@ -553,7 +581,7 @@ internal class CoroutineDiscoveryService( ) } - val pong = PongPacket.create(keyPair, timeSupplier(), currentEndpoint, packet.hash) + val pong = PongPacket.create(keyPair, timeSupplier(), currentEndpoint, packet.hash, seq) sendPacket(from, pong) // https://github.com/ethereum/devp2p/blob/master/discv4.md#ping-packet-0x01 also suggests sending a ping // packet if the peer is unknown, however sending two packets in response to a single incoming would allow a @@ -594,6 +622,13 @@ internal class CoroutineDiscoveryService( } pending.complete(VerificationResult(peer, endpoint)) + + if (packet.enrSeq != null) { + if (peer.enr == null || peer.enr!!.seq < packet.enrSeq) { + val now = timeSupplier() + withTimeoutOrNull(ENR_REQUEST_TIMEOUT_MS) { enrRequest(endpoint, peer).verify(now) } + } + } } private suspend fun handleFindNode(packet: FindNodePacket, from: InetSocketAddress, arrivalTime: Long) { @@ -642,6 +677,47 @@ internal class CoroutineDiscoveryService( } } + private suspend fun handleENRRequest(packet: ENRRequestPacket, from: InetSocketAddress, arrivalTime: Long) { + val peer = peerRepository.get(packet.nodeId) + val (_, endpoint) = ensurePeerIsValid(peer, from, arrivalTime) ?: run { + logger.debug("{}: received enrRequest from {} which cannot be validated", serviceDescriptor, from) + ++unvalidatedPeerPackets + return + } + + logger.debug("{}: received enrRequest from {}", serviceDescriptor, from) + + val address = endpoint.udpSocketAddress + sendPacket(address, ENRResponsePacket.create(keyPair, timeSupplier(), packet.hash, enr)) + } + + private suspend fun handleENRResponse(packet: ENRResponsePacket, from: InetSocketAddress, arrivalTime: Long) { + val pending = awaitingENRs.remove(packet.requestHash) ?: run { + logger.debug("{}: received unexpected or late enr response from {}", serviceDescriptor, from) + ++unexpectedENRResponses + return + } + packet.requestHash + val sender = pending.peer + // COMPATIBILITY: If the node-id's don't match, the pong should probably be rejected. However, if a different + // peer is listening at the same address, it will respond to the ping with its node-id. Instead of rejecting, + // accept the pong and update the new peer record with the proven endpoint, preferring to keep its current + // tcpPort and otherwise keeping the tcpPort of the original peer. + val peer = if (sender.nodeId == packet.nodeId) sender else peerRepository.get(packet.nodeId) + + val enr = EthereumNodeRecord.fromRLP(packet.enr) + try { + enr.validate() + } catch (e: InvalidNodeRecordException) { + logger.debug("Invalid ENR", e) + return + } + + peer.updateENR(enr, arrivalTime) + + pending.complete(ENRResult(peer, enr)) + } + private fun addToRoutingTable(peer: Peer) { routingTable.add(peer)?.let { contested -> launch { @@ -673,7 +749,7 @@ internal class CoroutineDiscoveryService( private fun endpointVerification(endpoint: Endpoint, peer: Peer) = verifyingEndpoints.get(endpoint.udpSocketAddress) { EndpointVerification(endpoint, peer) } - // a representation of the state and current action for verifying and endpoint, + // a representation of the state and current action for verifying an endpoint, // to avoid concurrent attempts to verify the same endpoint private inner class EndpointVerification(val endpoint: Endpoint, val peer: Peer) { private val deferred = CompletableDeferred<VerificationResult?>() @@ -718,7 +794,7 @@ internal class CoroutineDiscoveryService( } private suspend fun sendPing(now: Long = timeSupplier()) { - val pingPacket = PingPacket.create(keyPair, now, selfEndpoint, endpoint) + val pingPacket = PingPacket.create(keyPair, now, selfEndpoint, endpoint, seq) // create local references to be captured in the closure, rather than the whole packet instance val hash = pingPacket.hash @@ -753,6 +829,61 @@ internal class CoroutineDiscoveryService( val endpoint: Endpoint ) + private fun enrRequest(endpoint: Endpoint, peer: Peer) = + requestingENRs.get(endpoint.udpSocketAddress) { ENRRequest(endpoint, peer) } + + // a representation of the state and current action for querying an ENR from a peer, + // to avoid concurrent attempts to request the same information. + private inner class ENRRequest(val endpoint: Endpoint, val peer: Peer) { + private val deferred = CompletableDeferred<ENRResult?>() + @Volatile + private var active: Job? = null + private var nextENRRequest: Long = 0 + private var retryDelay: Long = 0 + + suspend fun verify(now: Long = timeSupplier()): ENRResult? { + if (!deferred.isCompleted) { + // if not already actively requesting and enough time has passed since the last request, send a single request + synchronized(this) { + if (active?.isCompleted != false && now >= nextENRRequest) { + nextENRRequest = now + RESEND_DELAY_MS + launch { sendENRRequest(now) } + } + } + } + return deferred.await() + } + + private suspend fun sendENRRequest(now: Long = timeSupplier()) { + val enrRequestPacket = ENRRequestPacket.create(keyPair, now) + + // create local references to be captured in the closure, rather than the whole packet instance + val hash = enrRequestPacket.hash + val timeout = enrRequestPacket.expiration - now + + // very unlikely that there is another ping packet created with the same hash yet a different ENRRequest + // instance, but if there is then the first will be waiting on a deferred that never completes and will + // eventually time out + if (awaitingENRs.put(hash, this) != this) { + launch { + delay(timeout) + awaitingENRs.remove(hash) + } + sendPacket(endpoint.udpSocketAddress, enrRequestPacket) + } + } + + fun complete(result: ENRResult?): Boolean { + active?.cancel() + return deferred.complete(result) + } + } + + private data class ENRResult( + val peer: Peer, + val enr: EthereumNodeRecord + ) + @UseExperimental(ObsoleteCoroutinesApi::class) private suspend fun findNodes(peer: Peer, target: SECP256K1.PublicKey) { // consume all received nodes (and discard), thus suspending until completed diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/EthereumNodeRecord.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/EthereumNodeRecord.kt index b40a9e0..776450e 100644 --- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/EthereumNodeRecord.kt +++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/EthereumNodeRecord.kt @@ -157,6 +157,18 @@ class EthereumNodeRecord(val signature: Bytes, val seq: Long, val data: Map<Stri val ecPoint = SECP256K1.Parameters.CURVE.getCurve().decodePoint(keyBytes.toArrayUnsafe()) return SECP256K1.PublicKey.fromBytes(Bytes.wrap(ecPoint.getEncoded(false)).slice(1)) } + + fun ip(): InetAddress { + return InetAddress.getByAddress(data["ip"]!!.toArrayUnsafe()) + } + + fun tcp(): Int { + return data["tcp"]!!.toInt() + } + + fun udp(): Int { + return data["udp"]!!.toInt() + } } internal class InvalidNodeRecordException(message: String?) : RuntimeException(message) diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/Packet.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/Packet.kt index a46bad1..f1a6d88 100644 --- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/Packet.kt +++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/Packet.kt @@ -114,19 +114,20 @@ internal class PingPacket private constructor( hash: Bytes32, val from: Endpoint, val to: Endpoint, - expiration: Long + expiration: Long, + val enrSeq: Long? ) : Packet(nodeId, signature, hash, expiration) { companion object { private const val VERSION = 4 - fun create(keyPair: SECP256K1.KeyPair, now: Long, from: Endpoint, to: Endpoint): PingPacket { + fun create(keyPair: SECP256K1.KeyPair, now: Long, from: Endpoint, to: Endpoint, seq: Long?): PingPacket { val expiration = expirationFor(now) val sigHash = createSignature( PacketType.PING, keyPair ) { writer -> - encodeTo(writer, from, to, expiration) + encodeTo(writer, from, to, expiration, seq) } return PingPacket( keyPair.publicKey(), @@ -134,7 +135,8 @@ internal class PingPacket private constructor( sigHash.hash, from, to, - expiration + expiration, + seq ) } @@ -150,27 +152,36 @@ internal class PingPacket private constructor( val from = reader.readList { r -> Endpoint.readFrom(r) } val to = reader.readList { r -> Endpoint.readFrom(r) } val expiration = reader.readLong() // seconds + val seq: Long? + if (!reader.isComplete) { + seq = reader.readLong() + } else { + seq = null + } if (version < VERSION) { throw DecodingException("Unexpected version $VERSION in ping") } - PingPacket(publicKey, signature, hash, from, to, secToMsec(expiration)) + PingPacket(publicKey, signature, hash, from, to, secToMsec(expiration), seq) } } catch (e: RLPException) { throw DecodingException("Invalid ping packet", e) } } - private fun encodeTo(writer: RLPWriter, from: Endpoint, to: Endpoint, expiration: Long) { + private fun encodeTo(writer: RLPWriter, from: Endpoint, to: Endpoint, expiration: Long, seq: Long?) { writer.writeInt(VERSION) writer.writeList { w -> from.writeTo(w) } writer.writeList { w -> to.writeTo(w) } writer.writeLong(msecToSec(expiration)) // write in seconds + seq?.let { + writer.writeLong(it) + } } } override fun encodeTo(dst: ByteBuffer) = encodeTo(dst, PacketType.PING) { writer -> - encodeTo(writer, from, to, expiration) + encodeTo(writer, from, to, expiration, enrSeq) } } @@ -180,17 +191,18 @@ internal class PongPacket private constructor( hash: Bytes32, val to: Endpoint, val pingHash: Bytes32, - expiration: Long + expiration: Long, + val enrSeq: Long? ) : Packet(nodeId, signature, hash, expiration) { companion object { - fun create(keyPair: SECP256K1.KeyPair, now: Long, to: Endpoint, pingHash: Bytes32): PongPacket { + fun create(keyPair: SECP256K1.KeyPair, now: Long, to: Endpoint, pingHash: Bytes32, enrSeq: Long?): PongPacket { val expiration = expirationFor(now) val sigHash = createSignature( PacketType.PONG, keyPair ) { writer -> - encodeTo(writer, to, pingHash, expiration) + encodeTo(writer, to, pingHash, expiration, enrSeq) } return PongPacket( keyPair.publicKey(), @@ -198,7 +210,8 @@ internal class PongPacket private constructor( sigHash.hash, to, pingHash, - expiration + expiration, + enrSeq ) } @@ -213,22 +226,29 @@ internal class PongPacket private constructor( val to = reader.readList { r -> Endpoint.readFrom(r) } val pingHash = Bytes32.wrap(reader.readValue()) val expiration = reader.readLong() // seconds - PongPacket(publicKey, signature, hash, to, pingHash, secToMsec(expiration)) + val seq: Long? + if (!reader.isComplete) { + seq = reader.readLong() + } else { + seq = null + } + PongPacket(publicKey, signature, hash, to, pingHash, secToMsec(expiration), seq) } } catch (e: RLPException) { throw DecodingException("Invalid pong packet", e) } } - private fun encodeTo(writer: RLPWriter, to: Endpoint, pingHash: Bytes32, expiration: Long) { + private fun encodeTo(writer: RLPWriter, to: Endpoint, pingHash: Bytes32, expiration: Long, enrSeq: Long?) { writer.writeList { w -> to.writeTo(w) } writer.writeValue(pingHash) writer.writeLong(msecToSec(expiration)) + enrSeq?.let { writer.writeLong(it) } } } override fun encodeTo(dst: ByteBuffer) = encodeTo(dst, PacketType.PONG) { writer -> - encodeTo(writer, to, pingHash, expiration) + encodeTo(writer, to, pingHash, expiration, enrSeq) } } @@ -366,3 +386,113 @@ internal class NeighborsPacket private constructor( encodeTo(writer, nodes, expiration) } } + +internal class ENRRequestPacket private constructor( + nodeId: SECP256K1.PublicKey, + signature: SECP256K1.Signature, + hash: Bytes32, + expiration: Long +) : Packet(nodeId, signature, hash, expiration) { + + companion object { + fun decode( + payload: Bytes, + hash: Bytes32, + publicKey: SECP256K1.PublicKey, + signature: SECP256K1.Signature + ): ENRRequestPacket { + try { + return RLP.decodeList(payload) { reader -> + val expiration = reader.readLong() + ENRRequestPacket(publicKey, signature, hash, secToMsec(expiration)) + } + } catch (e: RLPException) { + throw DecodingException("Invalid enr request packet", e) + } + } + + private fun encodeTo(writer: RLPWriter, expiration: Long) { + writer.writeLong(msecToSec(expiration)) + } + + fun create(keyPair: SECP256K1.KeyPair, now: Long): Packet { + val expiration = expirationFor(now) + val sigHash = createSignature( + PacketType.ENRRESPONSE, + keyPair + ) { writer -> + ENRRequestPacket.encodeTo(writer, expiration) + } + return ENRRequestPacket( + keyPair.publicKey(), + sigHash.signature, + sigHash.hash, + expiration + ) + } + } + + override fun encodeTo(dst: ByteBuffer) = encodeTo(dst, PacketType.ENRREQUEST) { writer -> + ENRRequestPacket.encodeTo(writer, expiration) + } +} + +internal class ENRResponsePacket private constructor( + nodeId: SECP256K1.PublicKey, + signature: SECP256K1.Signature, + hash: Bytes32, + expiration: Long, + val requestHash: Bytes, + val enr: Bytes +) : Packet(nodeId, signature, hash, expiration) { + + companion object { + + fun create(keyPair: SECP256K1.KeyPair, now: Long, requestHash: Bytes, enr: Bytes): ENRResponsePacket { + val expiration = expirationFor(now) + val sigHash = createSignature( + PacketType.ENRRESPONSE, + keyPair + ) { writer -> + encodeTo(writer, requestHash, enr, expiration) + } + return ENRResponsePacket( + keyPair.publicKey(), + sigHash.signature, + sigHash.hash, + expiration, + requestHash, + enr + ) + } + + fun decode( + payload: Bytes, + hash: Bytes32, + publicKey: SECP256K1.PublicKey, + signature: SECP256K1.Signature + ): ENRResponsePacket { + try { + return RLP.decodeList(payload) { reader -> + //request-hash, ENR + val requestHash = reader.readValue() + val enr = reader.readValue() + val expiration = reader.readLong() + ENRResponsePacket(publicKey, signature, hash, secToMsec(expiration), requestHash, enr) + } + } catch (e: RLPException) { + throw DecodingException("Invalid enr response packet", e) + } + } + + private fun encodeTo(writer: RLPWriter, requestHash: Bytes, enr: Bytes, expiration: Long) { + writer.writeValue(requestHash) + writer.writeValue(enr) + writer.writeLong(msecToSec(expiration)) + } + } + + override fun encodeTo(dst: ByteBuffer) = encodeTo(dst, PacketType.ENRRESPONSE) { writer -> + ENRResponsePacket.encodeTo(writer, requestHash, enr, expiration) + } +} diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/PacketType.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/PacketType.kt index 55b91af..c46934c 100644 --- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/PacketType.kt +++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/PacketType.kt @@ -55,6 +55,22 @@ internal enum class PacketType( publicKey: SECP256K1.PublicKey, signature: SECP256K1.Signature ) = NeighborsPacket.decode(payload, hash, publicKey, signature) + }, + ENRREQUEST(0x05) { + override fun decode( + payload: Bytes, + hash: Bytes32, + publicKey: SECP256K1.PublicKey, + signature: SECP256K1.Signature + ) = ENRRequestPacket.decode(payload, hash, publicKey, signature) + }, + ENRRESPONSE(0x06) { + override fun decode( + payload: Bytes, + hash: Bytes32, + publicKey: SECP256K1.PublicKey, + signature: SECP256K1.Signature + ) = ENRResponsePacket.decode(payload, hash, publicKey, signature) }; companion object { diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/Peer.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/Peer.kt index d281558..ae3b3ad 100644 --- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/Peer.kt +++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/Peer.kt @@ -34,6 +34,11 @@ interface Peer { val endpoint: Endpoint? /** + * The Ethereum Node Record associated with the peer, if known. + */ + val enr: EthereumNodeRecord? + + /** * The last time the current endpoint of this peer was verified, in milliseconds since the epoch. * * Endpoint is verified by a ping/pong cycle: https://github.com/ethereum/devp2p/blob/master/discv4.md#endpoint-proof @@ -89,4 +94,14 @@ interface Peer { * @throws IllegalStateException if there is no endpoint for this peer */ fun seenAt(time: Long) + + /** + * Update the peer's ENR. + * + * Will only update if the [seq] is larger than the one associated with the peer. + * + * @param record the ENR record associated with the peer + * @param time the time this endpoint information was determined, in milliseconds since the epoch + */ + fun updateENR(record: EthereumNodeRecord, time: Long) } diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/PeerRepository.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/PeerRepository.kt index 1271efc..5fb267d 100644 --- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/PeerRepository.kt +++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/PeerRepository.kt @@ -132,10 +132,10 @@ class EphemeralPeerRepository : PeerRepository { override val nodeId: SECP256K1.PublicKey, knownEndpoint: Endpoint? = null ) : Peer { - @Volatile override var endpoint: Endpoint? = knownEndpoint + override var enr: EthereumNodeRecord? = null @Synchronized override fun getEndpoint(ifVerifiedOnOrAfter: Long): Endpoint? { if ((lastVerified ?: 0) >= ifVerifiedOnOrAfter) { @@ -191,5 +191,13 @@ class EphemeralPeerRepository : PeerRepository { lastSeen = time } } + + @Synchronized + override fun updateENR(record: EthereumNodeRecord, time: Long) { + if (enr == null || enr!!.seq < record.seq) { + enr = record + updateEndpoint(Endpoint(record.ip(), record.udp(), record.tcp()), time) + } + } } } diff --git a/devp2p/src/test/java/org/apache/tuweni/devp2p/DiscoveryServiceJavaTest.java b/devp2p/src/test/java/org/apache/tuweni/devp2p/DiscoveryServiceJavaTest.java index bcaae43..ef8e082 100644 --- a/devp2p/src/test/java/org/apache/tuweni/devp2p/DiscoveryServiceJavaTest.java +++ b/devp2p/src/test/java/org/apache/tuweni/devp2p/DiscoveryServiceJavaTest.java @@ -12,6 +12,7 @@ */ package org.apache.tuweni.devp2p; +import static java.util.Collections.emptyMap; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -56,6 +57,8 @@ class DiscoveryServiceJavaTest { SECP256K1.KeyPair.random(), 0, "localhost", + 1, + emptyMap(), Collections.singletonList(URI.create("enode://" + peerKeyPair.publicKey().toHexString() + "@127.0.0.1:10000")), repository); AsyncResult<Peer> result = repository.getAsync(peerKeyPair.publicKey()); diff --git a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/DiscoveryServiceTest.kt b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/DiscoveryServiceTest.kt index cd0c7dc..0eedbd0 100644 --- a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/DiscoveryServiceTest.kt +++ b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/DiscoveryServiceTest.kt @@ -80,7 +80,7 @@ internal class DiscoveryServiceTest { val clientKeyPair = SECP256K1.KeyPair.random() val client = CoroutineDatagramChannel.open() val clientEndpoint = Endpoint("192.168.1.1", 5678, 7654) - val ping = PingPacket.create(clientKeyPair, System.currentTimeMillis(), clientEndpoint, Endpoint(address)) + val ping = PingPacket.create(clientKeyPair, System.currentTimeMillis(), clientEndpoint, Endpoint(address), null) client.send(ping, address) val pong = client.receivePacket() as PongPacket @@ -119,7 +119,7 @@ internal class DiscoveryServiceTest { assertEquals(discoveryService.localPort, ping.from.udpPort) assertNull(ping.from.tcpPort) - val pong = PongPacket.create(bootstrapKeyPair, System.currentTimeMillis(), ping.from, ping.hash) + val pong = PongPacket.create(bootstrapKeyPair, System.currentTimeMillis(), ping.from, ping.hash, null) bootstrapClient.send(pong, address) val findNodes = bootstrapClient.receivePacket() as FindNodePacket @@ -161,7 +161,7 @@ internal class DiscoveryServiceTest { assertEquals(discoveryService.localPort, ping.from.udpPort) assertNull(ping.from.tcpPort) - val pong = PongPacket.create(SECP256K1.KeyPair.random(), System.currentTimeMillis(), ping.from, ping.hash) + val pong = PongPacket.create(SECP256K1.KeyPair.random(), System.currentTimeMillis(), ping.from, ping.hash, null) bootstrapClient.send(pong, address) delay(1000) @@ -247,7 +247,7 @@ internal class DiscoveryServiceTest { delay(500) assertNull(client.tryReceive(ByteBuffer.allocate(2048))) - val pong = PongPacket.create(clientKeyPair, System.currentTimeMillis(), ping.from, ping.hash) + val pong = PongPacket.create(clientKeyPair, System.currentTimeMillis(), ping.from, ping.hash, null) client.send(pong, address) val neighbors = client.receivePacket() as NeighborsPacket diff --git a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/PongPacketTest.kt b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/ENRResponsePacketTest.kt similarity index 58% copy from devp2p/src/test/kotlin/org/apache/tuweni/devp2p/PongPacketTest.kt copy to devp2p/src/test/kotlin/org/apache/tuweni/devp2p/ENRResponsePacketTest.kt index 82b0f73..a061812 100644 --- a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/PongPacketTest.kt +++ b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/ENRResponsePacketTest.kt @@ -24,18 +24,21 @@ import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Assertions.assertTrue import org.junit.jupiter.api.Test import org.junit.jupiter.api.extension.ExtendWith +import java.net.InetAddress import java.nio.ByteBuffer @ExtendWith(BouncyCastleExtension::class) -internal class PongPacketTest { +internal class ENRResponsePacketTest { @Test fun shouldEncodeThenDecodePacket() { val keyPair = SECP256K1.KeyPair.random() - val to = Endpoint("10.0.0.54", 6543, 6543) - val pingHash = Bytes32.random() + + val requestHash = Bytes32.random() + val enr = EthereumNodeRecord.toRLP(SECP256K1.KeyPair.random(), 2, emptyMap(), + InetAddress.getByName("example.com"), 3000, 12000) val now = System.currentTimeMillis() - val pong = PongPacket.create(keyPair, now, to, pingHash) + val pong = ENRResponsePacket.create(keyPair, now, requestHash, enr) val buffer = ByteBuffer.allocate(Packet.MAX_SIZE) pong.encodeTo(buffer) @@ -43,27 +46,12 @@ internal class PongPacketTest { val datagram = Bytes.wrapByteBuffer(buffer) val packet = Packet.decodeFrom(datagram) - assertTrue(packet is PongPacket) - - val pongPacket = packet as PongPacket - assertEquals(keyPair.publicKey(), pongPacket.nodeId) - assertEquals(Endpoint("10.0.0.54", 6543, 6543), pongPacket.to) - assertEquals(pingHash, pongPacket.pingHash) - assertEquals(((now + PACKET_EXPIRATION_PERIOD_MS + 999) / 1000) * 1000, pongPacket.expiration) - } - - @Test - fun decodeReferencePacket1() { - // https://github.com/ethereum/EIPs/blob/master/EIPS/eip-8.md - val datagram = Bytes.fromHexString( - "09b2428d83348d27cdf7064ad9024f526cebc19e4958f0fdad87c15eb598dd61d08423e0bf66b206" + - "9869e1724125f820d851c136684082774f870e614d95a2855d000f05d1648b2d5945470bc187c2d2" + - "216fbe870f43ed0909009882e176a46b0102f846d79020010db885a308d313198a2e037073488208" + - "ae82823aa0fbc914b16819237dcd8801d7e53f69e9719adecb3cc0e790c57e91ca4461c9548443b9" + - "a355c6010203c2040506a0c969a58f6f9095004c0177a6b47f451530cab38966a25cca5cb58f0555" + - "42124e") - val packet = Packet.decodeFrom(datagram) + assertTrue(packet is ENRResponsePacket) - assertTrue(packet is PongPacket) + val responsePacket = packet as ENRResponsePacket + assertEquals(keyPair.publicKey(), responsePacket.nodeId) + assertEquals(enr, responsePacket.enr) + assertEquals(requestHash, responsePacket.requestHash) + assertEquals(((now + PACKET_EXPIRATION_PERIOD_MS + 999) / 1000) * 1000, responsePacket.expiration) } } diff --git a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/PingPacketTest.kt b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/PingPacketTest.kt index 5334cca..18f8fe1 100644 --- a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/PingPacketTest.kt +++ b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/PingPacketTest.kt @@ -21,6 +21,7 @@ import org.apache.tuweni.crypto.SECP256K1 import org.apache.tuweni.junit.BouncyCastleExtension import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.api.Disabled import org.junit.jupiter.api.Test import org.junit.jupiter.api.extension.ExtendWith import java.nio.ByteBuffer @@ -34,7 +35,7 @@ internal class PingPacketTest { val from = Endpoint("10.0.0.54", 6543, 6543) val to = Endpoint("192.168.34.65", 9832, 1453) val now = System.currentTimeMillis() - val ping = PingPacket.create(keyPair, now, from, to) + val ping = PingPacket.create(keyPair, now, from, to, null) val buffer = ByteBuffer.allocate(Packet.MAX_SIZE) ping.encodeTo(buffer) @@ -52,12 +53,36 @@ internal class PingPacketTest { } @Test + fun shouldEncodeThenDecodePacketWithEnrSeq() { + val keyPair = SECP256K1.KeyPair.random() + val from = Endpoint("10.0.0.54", 6543, 6543) + val to = Endpoint("192.168.34.65", 9832, 1453) + val now = System.currentTimeMillis() + val ping = PingPacket.create(keyPair, now, from, to, 64) + + val buffer = ByteBuffer.allocate(Packet.MAX_SIZE) + ping.encodeTo(buffer) + buffer.flip() + + val datagram = Bytes.wrapByteBuffer(buffer) + val packet = Packet.decodeFrom(datagram) + assertTrue(packet is PingPacket) + + val pingPacket = packet as PingPacket + assertEquals(keyPair.publicKey(), pingPacket.nodeId) + assertEquals(Endpoint("10.0.0.54", 6543, 6543), pingPacket.from) + assertEquals(Endpoint("192.168.34.65", 9832, 1453), pingPacket.to) + assertEquals(((now + PACKET_EXPIRATION_PERIOD_MS + 999) / 1000) * 1000, pingPacket.expiration) + assertEquals(64L, pingPacket.enrSeq) + } + + @Test fun shouldDecodePingPacketWithMissingEndpoint() { val keyPair = SECP256K1.KeyPair.random() val from = Endpoint("10.0.0.54", 6543, 6543) val to = Endpoint("192.168.34.65", 9832, 1453) val now = System.currentTimeMillis() - val ping = PingPacket.create(keyPair, now, from, to) + val ping = PingPacket.create(keyPair, now, from, to, null) val buffer = ByteBuffer.allocate(1024) ping.encodeTo(buffer) @@ -87,6 +112,7 @@ internal class PingPacketTest { assertTrue(packet is PingPacket) } + @Disabled("EIP-868 supercedes EIP-8 behavior") @Test fun decodeReferencePacket2() { // https://github.com/ethereum/EIPs/blob/master/EIPS/eip-8.md diff --git a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/PongPacketTest.kt b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/PongPacketTest.kt index 82b0f73..5ce0a69 100644 --- a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/PongPacketTest.kt +++ b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/PongPacketTest.kt @@ -22,6 +22,7 @@ import org.apache.tuweni.crypto.SECP256K1 import org.apache.tuweni.junit.BouncyCastleExtension import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.api.Disabled import org.junit.jupiter.api.Test import org.junit.jupiter.api.extension.ExtendWith import java.nio.ByteBuffer @@ -35,7 +36,7 @@ internal class PongPacketTest { val to = Endpoint("10.0.0.54", 6543, 6543) val pingHash = Bytes32.random() val now = System.currentTimeMillis() - val pong = PongPacket.create(keyPair, now, to, pingHash) + val pong = PongPacket.create(keyPair, now, to, pingHash, null) val buffer = ByteBuffer.allocate(Packet.MAX_SIZE) pong.encodeTo(buffer) @@ -53,6 +54,31 @@ internal class PongPacketTest { } @Test + fun shouldEncodeThenDecodePacketWithSeq() { + val keyPair = SECP256K1.KeyPair.random() + val to = Endpoint("10.0.0.54", 6543, 6543) + val pingHash = Bytes32.random() + val now = System.currentTimeMillis() + val pong = PongPacket.create(keyPair, now, to, pingHash, 32) + + val buffer = ByteBuffer.allocate(Packet.MAX_SIZE) + pong.encodeTo(buffer) + buffer.flip() + + val datagram = Bytes.wrapByteBuffer(buffer) + val packet = Packet.decodeFrom(datagram) + assertTrue(packet is PongPacket) + + val pongPacket = packet as PongPacket + assertEquals(keyPair.publicKey(), pongPacket.nodeId) + assertEquals(Endpoint("10.0.0.54", 6543, 6543), pongPacket.to) + assertEquals(pingHash, pongPacket.pingHash) + assertEquals(((now + PACKET_EXPIRATION_PERIOD_MS + 999) / 1000) * 1000, pongPacket.expiration) + assertEquals(32L, pongPacket.enrSeq) + } + + @Disabled("EIP-868 supercedes EIP-8 behavior") + @Test fun decodeReferencePacket1() { // https://github.com/ethereum/EIPs/blob/master/EIPS/eip-8.md val datagram = Bytes.fromHexString( --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
