This is an automated email from the ASF dual-hosted git repository.

toulmean pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git

commit dc438454910df47c12ce172c23a6ff8e685e74bf
Author: Antoine Toulme <[email protected]>
AuthorDate: Tue May 28 00:21:26 2019 -0700

    Add support for EIP-868 to devp2p v4
---
 .../org/apache/tuweni/devp2p/DiscoveryService.kt   | 145 ++++++++++++++++++-
 .../org/apache/tuweni/devp2p/EthereumNodeRecord.kt |  12 ++
 .../main/kotlin/org/apache/tuweni/devp2p/Packet.kt | 158 +++++++++++++++++++--
 .../kotlin/org/apache/tuweni/devp2p/PacketType.kt  |  16 +++
 .../main/kotlin/org/apache/tuweni/devp2p/Peer.kt   |  15 ++
 .../org/apache/tuweni/devp2p/PeerRepository.kt     |  10 +-
 .../tuweni/devp2p/DiscoveryServiceJavaTest.java    |   3 +
 .../apache/tuweni/devp2p/DiscoveryServiceTest.kt   |   8 +-
 ...{PongPacketTest.kt => ENRResponsePacketTest.kt} |  38 ++---
 .../org/apache/tuweni/devp2p/PingPacketTest.kt     |  30 +++-
 .../org/apache/tuweni/devp2p/PongPacketTest.kt     |  28 +++-
 11 files changed, 409 insertions(+), 54 deletions(-)

diff --git 
a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/DiscoveryService.kt 
b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/DiscoveryService.kt
index 793eb91..9a09060 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/DiscoveryService.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/DiscoveryService.kt
@@ -40,6 +40,7 @@ import kotlinx.coroutines.launch
 import kotlinx.coroutines.withTimeout
 import kotlinx.coroutines.withTimeoutOrNull
 import kotlinx.coroutines.yield
+import org.apache.tuweni.bytes.Bytes
 import org.apache.tuweni.bytes.Bytes32
 import org.apache.tuweni.concurrent.AsyncCompletion
 import org.apache.tuweni.concurrent.AsyncResult
@@ -61,6 +62,7 @@ import java.net.SocketAddress
 import java.net.URI
 import java.nio.ByteBuffer
 import java.nio.channels.ClosedChannelException
+import java.time.Instant
 import java.util.concurrent.ConcurrentHashMap
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicLong
@@ -69,7 +71,9 @@ import kotlin.coroutines.CoroutineContext
 internal const val PACKET_EXPIRATION_PERIOD_MS: Long = (20 * 1000) // 20 
seconds
 internal const val PACKET_EXPIRATION_CHECK_GRACE_MS: Long = (5 * 1000) // 5 
seconds
 internal const val PEER_VERIFICATION_TIMEOUT_MS: Long = (22 * 1000) // 22 
seconds (packet expiration + a little)
+internal const val ENR_REQUEST_TIMEOUT_MS: Long = (22 * 1000) // 22 seconds 
(packet expiration + a little)
 internal const val PEER_VERIFICATION_RETRY_DELAY_MS: Long = (5 * 60 * 1000) // 
5 minutes
+internal const val ENR_REQUEST_RETRY_DELAY_MS: Long = (5 * 60 * 1000) // 5 
minutes
 internal const val BOOTSTRAP_PEER_VERIFICATION_TIMEOUT_MS: Long = (2 * 60 * 
1000) // 2 minutes
 internal const val REFRESH_INTERVAL_MS: Long = (60 * 1000) // 1 minute
 internal const val PING_RETRIES: Int = 20
@@ -84,7 +88,9 @@ internal const val LOOKUP_RESPONSE_TIMEOUT_MS: Long = 500 // 
500 milliseconds
 /**
  * An Ethereum ÐΞVp2p discovery service.
  *
- * @author Chris Leishman - https://cleishm.github.io/
+ * This service supports devp2p discovery v4, alongside support for EIP-868.
+ * http://eips.ethereum.org/EIPS/eip-868
+ *
  */
 interface DiscoveryService {
 
@@ -99,6 +105,8 @@ interface DiscoveryService {
      * @param port the port to listen on (defaults to `0`, which will cause a 
random free port to be chosen)
      * @param host the host name or IP address of the interface to bind to 
(defaults to `null`, which will cause the
      *         service to listen on all interfaces
+     * @param seq the sequence number of the Ethereum Node Record
+     * @param enrData the additional key/value pair entries to broadcast as an 
Ethereum Node Record (ENR).
      * @param bootstrapURIs the URIs for bootstrap nodes
      * @param peerRepository a [PeerRepository] for obtaining [Peer] instances
      * @param advertiseAddress the IP address to advertise to peers, or `null` 
if the address of the first bound
@@ -117,6 +125,8 @@ interface DiscoveryService {
       keyPair: SECP256K1.KeyPair,
       port: Int = 0,
       host: String? = null,
+      seq: Long = Instant.now().toEpochMilli(),
+      enrData: Map<String, Bytes> = emptyMap(),
       bootstrapURIs: List<URI> = emptyList(),
       peerRepository: PeerRepository = EphemeralPeerRepository(),
       advertiseAddress: InetAddress? = null,
@@ -133,6 +143,8 @@ interface DiscoveryService {
       return open(
         keyPair,
         bindAddress,
+        seq,
+        enrData,
         bootstrapURIs,
         peerRepository,
         advertiseAddress,
@@ -152,6 +164,8 @@ interface DiscoveryService {
      *
      * @param keyPair the local node's keypair
      * @param bindAddress the address to listen on
+     * @param seq the sequence number of the Ethereum Node Record
+     * @param enrData the additional key/value pair entries to broadcast as an 
Ethereum Node Record (ENR).
      * @param bootstrapURIs the URIs for bootstrap nodes
      * @param peerRepository a [PeerRepository] for obtaining [Peer] instances
      * @param advertiseAddress the IP address to advertise for incoming packets
@@ -168,6 +182,8 @@ interface DiscoveryService {
     fun open(
       keyPair: SECP256K1.KeyPair,
       bindAddress: InetSocketAddress,
+      seq: Long = Instant.now().toEpochMilli(),
+      enrData: Map<String, Bytes> = emptyMap(),
       bootstrapURIs: List<URI> = emptyList(),
       peerRepository: PeerRepository = EphemeralPeerRepository(),
       advertiseAddress: InetAddress? = null,
@@ -181,8 +197,8 @@ interface DiscoveryService {
       timeSupplier: () -> Long = CURRENT_TIME_SUPPLIER
     ): DiscoveryService {
       return CoroutineDiscoveryService(
-        keyPair, bindAddress, bootstrapURIs, advertiseAddress, 
advertiseUdpPort, advertiseTcpPort, peerRepository,
-        routingTable, packetFilter, loggerProvider, channelGroup, 
bufferAllocator, timeSupplier
+        keyPair, seq, enrData, bindAddress, bootstrapURIs, advertiseAddress, 
advertiseUdpPort, advertiseTcpPort,
+        peerRepository, routingTable, packetFilter, loggerProvider, 
channelGroup, bufferAllocator, timeSupplier
       )
     }
   }
@@ -259,10 +275,13 @@ interface DiscoveryService {
   val unvalidatedPeerPackets: Long
   val unexpectedPongs: Long
   val unexpectedNeighbors: Long
+  val unexpectedENRResponses: Long
 }
 
 internal class CoroutineDiscoveryService(
   private val keyPair: SECP256K1.KeyPair,
+  private val seq: Long = Instant.now().toEpochMilli(),
+  private val enrData: Map<String, Bytes>,
   bindAddress: InetSocketAddress,
   bootstrapURIs: List<URI> = emptyList(),
   advertiseAddress: InetAddress? = null,
@@ -280,8 +299,8 @@ internal class CoroutineDiscoveryService(
 
   private val logger = loggerProvider.getLogger(DiscoveryService::class.java)
   private val serviceDescriptor = "ÐΞVp2p discovery " + 
System.identityHashCode(this)
-
   private val selfEndpoint: Endpoint
+  private val enr: Bytes
 
   private val job = Job()
   // override the default exception handler, which dumps to stderr
@@ -306,7 +325,10 @@ internal class CoroutineDiscoveryService(
 
   private val verifyingEndpoints: Cache<InetSocketAddress, 
EndpointVerification> =
     
CacheBuilder.newBuilder().expireAfterAccess(PEER_VERIFICATION_RETRY_DELAY_MS, 
TimeUnit.MILLISECONDS).build()
+  private val requestingENRs: Cache<InetSocketAddress, ENRRequest> =
+    CacheBuilder.newBuilder().expireAfterAccess(ENR_REQUEST_RETRY_DELAY_MS, 
TimeUnit.MILLISECONDS).build()
   private val awaitingPongs = ConcurrentHashMap<Bytes32, 
EndpointVerification>()
+  private val awaitingENRs = ConcurrentHashMap<Bytes32, ENRRequest>()
   private val findNodeStates: Cache<SECP256K1.PublicKey, FindNodeState> =
     CacheBuilder.newBuilder().expireAfterAccess(FIND_NODES_CACHE_EXPIRY, 
TimeUnit.MILLISECONDS)
       .removalListener<SECP256K1.PublicKey, FindNodeState> { it.value.close() }
@@ -318,6 +340,7 @@ internal class CoroutineDiscoveryService(
   override var filteredPackets: Long by AtomicLong(0)
   override var unvalidatedPeerPackets: Long by AtomicLong(0)
   override var unexpectedPongs: Long by AtomicLong(0)
+  override var unexpectedENRResponses: Long by AtomicLong(0)
   override var unexpectedNeighbors: Long by AtomicLong(0)
 
   init {
@@ -329,6 +352,9 @@ internal class CoroutineDiscoveryService(
       advertiseTcpPort
     )
 
+    enr = EthereumNodeRecord.toRLP(keyPair, seq, enrData, 
selfEndpoint.address, selfEndpoint.tcpPort,
+      selfEndpoint.udpPort)
+
     val bootstrapping = bootstrapURIs.map { uri ->
       activityLatch.countUp()
       async {
@@ -535,6 +561,8 @@ internal class CoroutineDiscoveryService(
       is PongPacket -> handlePong(packet, address, arrivalTime)
       is FindNodePacket -> handleFindNode(packet, address, arrivalTime)
       is NeighborsPacket -> handleNeighbors(packet, address)
+      is ENRRequestPacket -> handleENRRequest(packet, address, arrivalTime)
+      is ENRResponsePacket -> handleENRResponse(packet, address, arrivalTime)
     }.let {} // guarantees "when" matching is exhaustive
   }
 
@@ -553,7 +581,7 @@ internal class CoroutineDiscoveryService(
       )
     }
 
-    val pong = PongPacket.create(keyPair, timeSupplier(), currentEndpoint, 
packet.hash)
+    val pong = PongPacket.create(keyPair, timeSupplier(), currentEndpoint, 
packet.hash, seq)
     sendPacket(from, pong)
     // 
https://github.com/ethereum/devp2p/blob/master/discv4.md#ping-packet-0x01 also 
suggests sending a ping
     // packet if the peer is unknown, however sending two packets in response 
to a single incoming would allow a
@@ -594,6 +622,13 @@ internal class CoroutineDiscoveryService(
     }
 
     pending.complete(VerificationResult(peer, endpoint))
+
+    if (packet.enrSeq != null) {
+      if (peer.enr == null || peer.enr!!.seq < packet.enrSeq) {
+        val now = timeSupplier()
+        withTimeoutOrNull(ENR_REQUEST_TIMEOUT_MS) { enrRequest(endpoint, 
peer).verify(now) }
+      }
+    }
   }
 
   private suspend fun handleFindNode(packet: FindNodePacket, from: 
InetSocketAddress, arrivalTime: Long) {
@@ -642,6 +677,47 @@ internal class CoroutineDiscoveryService(
     }
   }
 
+  /**
+   * Handles an incoming ENR request (EIP-868).
+   *
+   * The sender has to pass [ensurePeerIsValid]; otherwise the packet is counted in [unvalidatedPeerPackets]
+   * and dropped. A validated sender is answered with this node's own record.
+   */
+  private suspend fun handleENRRequest(packet: ENRRequestPacket, from: InetSocketAddress, arrivalTime: Long) {
+    val requester = peerRepository.get(packet.nodeId)
+    val validation = ensurePeerIsValid(requester, from, arrivalTime)
+    if (validation == null) {
+      logger.debug("{}: received enrRequest from {} which cannot be validated", serviceDescriptor, from)
+      ++unvalidatedPeerPackets
+      return
+    }
+    val (_, endpoint) = validation
+
+    logger.debug("{}: received enrRequest from {}", serviceDescriptor, from)
+
+    // answer on the validated endpoint's UDP address, passing the hash of the request along in the response
+    val replyTo = endpoint.udpSocketAddress
+    val response = ENRResponsePacket.create(keyPair, timeSupplier(), packet.hash, enr)
+    sendPacket(replyTo, response)
+  }
+
+  /**
+   * Handles an incoming ENR response (EIP-868).
+   *
+   * The response is matched to an outstanding request through [ENRResponsePacket.requestHash]; a response
+   * without a matching request is counted in [unexpectedENRResponses] and dropped. A record that passes
+   * validation is stored on the peer and handed to the pending request.
+   */
+  private suspend fun handleENRResponse(packet: ENRResponsePacket, from: InetSocketAddress, arrivalTime: Long) {
+    val pending = awaitingENRs.remove(packet.requestHash) ?: run {
+      logger.debug("{}: received unexpected or late enr response from {}", serviceDescriptor, from)
+      ++unexpectedENRResponses
+      return
+    }
+    val sender = pending.peer
+    // COMPATIBILITY: If the node-id's don't match, the response should probably be rejected. However, if a
+    // different peer is listening at the same address, it will answer the request with its own node-id.
+    // Instead of rejecting, accept the response and attach the record to the peer identified by the packet's
+    // node-id.
+    val peer = if (sender.nodeId == packet.nodeId) sender else peerRepository.get(packet.nodeId)
+
+    val enr = EthereumNodeRecord.fromRLP(packet.enr)
+    try {
+      enr.validate()
+    } catch (e: InvalidNodeRecordException) {
+      // NOTE(review): the pending request has already been removed from awaitingENRs and is not completed
+      // here, so anyone awaiting it only returns once their own timeout elapses.
+      logger.debug("Invalid ENR", e)
+      return
+    }
+
+    peer.updateENR(enr, arrivalTime)
+
+    pending.complete(ENRResult(peer, enr))
+  }
+
   private fun addToRoutingTable(peer: Peer) {
     routingTable.add(peer)?.let { contested ->
       launch {
@@ -673,7 +749,7 @@ internal class CoroutineDiscoveryService(
   private fun endpointVerification(endpoint: Endpoint, peer: Peer) =
     verifyingEndpoints.get(endpoint.udpSocketAddress) { 
EndpointVerification(endpoint, peer) }
 
-  // a representation of the state and current action for verifying and 
endpoint,
+  // a representation of the state and current action for verifying an 
endpoint,
   // to avoid concurrent attempts to verify the same endpoint
   private inner class EndpointVerification(val endpoint: Endpoint, val peer: 
Peer) {
     private val deferred = CompletableDeferred<VerificationResult?>()
@@ -718,7 +794,7 @@ internal class CoroutineDiscoveryService(
     }
 
     private suspend fun sendPing(now: Long = timeSupplier()) {
-      val pingPacket = PingPacket.create(keyPair, now, selfEndpoint, endpoint)
+      val pingPacket = PingPacket.create(keyPair, now, selfEndpoint, endpoint, 
seq)
 
       // create local references to be captured in the closure, rather than 
the whole packet instance
       val hash = pingPacket.hash
@@ -753,6 +829,61 @@ internal class CoroutineDiscoveryService(
     val endpoint: Endpoint
   )
 
+  /** Returns the [ENRRequest] cached for the endpoint's UDP address, creating it on first use. */
+  private fun enrRequest(endpoint: Endpoint, peer: Peer): ENRRequest {
+    val cacheKey = endpoint.udpSocketAddress
+    return requestingENRs.get(cacheKey) { ENRRequest(endpoint, peer) }
+  }
+
+  /**
+   * The state of an ENR request to a single peer endpoint.
+   *
+   * Instances are shared through the `requestingENRs` cache, so that concurrent callers interested in the same
+   * endpoint await the same result instead of each sending their own request.
+   */
+  private inner class ENRRequest(val endpoint: Endpoint, val peer: Peer) {
+    private val deferred = CompletableDeferred<ENRResult?>()
+    // earliest time at which another request may be sent; only accessed inside synchronized(this)
+    private var nextENRRequest: Long = 0
+
+    /**
+     * Sends an ENR request, unless one was already sent less than [RESEND_DELAY_MS] ago, and suspends until a
+     * result is delivered through [complete].
+     *
+     * This call does not time out by itself; callers are expected to bound it (e.g. with `withTimeoutOrNull`).
+     *
+     * @param now the current time, in milliseconds
+     * @return the result that was passed to [complete]
+     */
+    suspend fun verify(now: Long = timeSupplier()): ENRResult? {
+      if (!deferred.isCompleted) {
+        // if enough time has passed since the last request, send a single request
+        synchronized(this) {
+          if (now >= nextENRRequest) {
+            nextENRRequest = now + RESEND_DELAY_MS
+            launch { sendENRRequest(now) }
+          }
+        }
+      }
+      return deferred.await()
+    }
+
+    private suspend fun sendENRRequest(now: Long = timeSupplier()) {
+      val enrRequestPacket = ENRRequestPacket.create(keyPair, now)
+
+      // create local references to be captured in the closure, rather than the whole packet instance
+      val hash = enrRequestPacket.hash
+      val timeout = enrRequestPacket.expiration - now
+
+      // very unlikely that there is another ENR request packet created with the same hash yet a different
+      // ENRRequest instance, but if there is then the first will be waiting on a deferred that never completes
+      // and will eventually time out
+      if (awaitingENRs.put(hash, this) != this) {
+        // forget the request once the packet has expired; a response arriving later is treated as unexpected
+        launch {
+          delay(timeout)
+          awaitingENRs.remove(hash)
+        }
+        sendPacket(endpoint.udpSocketAddress, enrRequestPacket)
+      }
+    }
+
+    /**
+     * Delivers [result] to every caller suspended in [verify].
+     *
+     * @return `true` if this call completed the request, `false` if it had been completed before
+     */
+    fun complete(result: ENRResult?): Boolean = deferred.complete(result)
+  }
+
+  /** The outcome of a completed ENR request: the peer that answered and the record it supplied. */
+  private data class ENRResult(val peer: Peer, val enr: EthereumNodeRecord)
+
   @UseExperimental(ObsoleteCoroutinesApi::class)
   private suspend fun findNodes(peer: Peer, target: SECP256K1.PublicKey) {
     // consume all received nodes (and discard), thus suspending until 
completed
diff --git 
a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/EthereumNodeRecord.kt 
b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/EthereumNodeRecord.kt
index b40a9e0..776450e 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/EthereumNodeRecord.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/EthereumNodeRecord.kt
@@ -157,6 +157,18 @@ class EthereumNodeRecord(val signature: Bytes, val seq: 
Long, val data: Map<Stri
     val ecPoint = 
SECP256K1.Parameters.CURVE.getCurve().decodePoint(keyBytes.toArrayUnsafe())
     return 
SECP256K1.PublicKey.fromBytes(Bytes.wrap(ecPoint.getEncoded(false)).slice(1))
   }
+
+  /**
+   * The IP address stored under the "ip" key of this record.
+   *
+   * @throws NullPointerException if the record has no "ip" entry
+   */
+  fun ip(): InetAddress {
+    val rawAddress = data["ip"]!!
+    return InetAddress.getByAddress(rawAddress.toArrayUnsafe())
+  }
+
+  /**
+   * The TCP port stored under the "tcp" key of this record.
+   *
+   * @throws NullPointerException if the record has no "tcp" entry
+   */
+  fun tcp(): Int = data["tcp"]!!.toInt()
+
+  /**
+   * The UDP port stored under the "udp" key of this record.
+   *
+   * @throws NullPointerException if the record has no "udp" entry
+   */
+  fun udp(): Int = data["udp"]!!.toInt()
 }
 
 internal class InvalidNodeRecordException(message: String?) : 
RuntimeException(message)
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/Packet.kt 
b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/Packet.kt
index a46bad1..f1a6d88 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/Packet.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/Packet.kt
@@ -114,19 +114,20 @@ internal class PingPacket private constructor(
   hash: Bytes32,
   val from: Endpoint,
   val to: Endpoint,
-  expiration: Long
+  expiration: Long,
+  val enrSeq: Long?
 ) : Packet(nodeId, signature, hash, expiration) {
 
   companion object {
     private const val VERSION = 4
 
-    fun create(keyPair: SECP256K1.KeyPair, now: Long, from: Endpoint, to: 
Endpoint): PingPacket {
+    fun create(keyPair: SECP256K1.KeyPair, now: Long, from: Endpoint, to: 
Endpoint, seq: Long?): PingPacket {
       val expiration = expirationFor(now)
       val sigHash = createSignature(
         PacketType.PING,
         keyPair
       ) { writer ->
-        encodeTo(writer, from, to, expiration)
+        encodeTo(writer, from, to, expiration, seq)
       }
       return PingPacket(
         keyPair.publicKey(),
@@ -134,7 +135,8 @@ internal class PingPacket private constructor(
         sigHash.hash,
         from,
         to,
-        expiration
+        expiration,
+        seq
       )
     }
 
@@ -150,27 +152,36 @@ internal class PingPacket private constructor(
           val from = reader.readList { r -> Endpoint.readFrom(r) }
           val to = reader.readList { r -> Endpoint.readFrom(r) }
           val expiration = reader.readLong() // seconds
+          val seq: Long?
+          if (!reader.isComplete) {
+            seq = reader.readLong()
+          } else {
+            seq = null
+          }
 
           if (version < VERSION) {
             throw DecodingException("Unexpected version $VERSION in ping")
           }
-          PingPacket(publicKey, signature, hash, from, to, 
secToMsec(expiration))
+          PingPacket(publicKey, signature, hash, from, to, 
secToMsec(expiration), seq)
         }
       } catch (e: RLPException) {
         throw DecodingException("Invalid ping packet", e)
       }
     }
 
-    private fun encodeTo(writer: RLPWriter, from: Endpoint, to: Endpoint, 
expiration: Long) {
+    private fun encodeTo(writer: RLPWriter, from: Endpoint, to: Endpoint, 
expiration: Long, seq: Long?) {
       writer.writeInt(VERSION)
       writer.writeList { w -> from.writeTo(w) }
       writer.writeList { w -> to.writeTo(w) }
       writer.writeLong(msecToSec(expiration)) // write in seconds
+      seq?.let {
+        writer.writeLong(it)
+      }
     }
   }
 
   override fun encodeTo(dst: ByteBuffer) = encodeTo(dst, PacketType.PING) { 
writer ->
-    encodeTo(writer, from, to, expiration)
+    encodeTo(writer, from, to, expiration, enrSeq)
   }
 }
 
@@ -180,17 +191,18 @@ internal class PongPacket private constructor(
   hash: Bytes32,
   val to: Endpoint,
   val pingHash: Bytes32,
-  expiration: Long
+  expiration: Long,
+  val enrSeq: Long?
 ) : Packet(nodeId, signature, hash, expiration) {
 
   companion object {
-    fun create(keyPair: SECP256K1.KeyPair, now: Long, to: Endpoint, pingHash: 
Bytes32): PongPacket {
+    fun create(keyPair: SECP256K1.KeyPair, now: Long, to: Endpoint, pingHash: 
Bytes32, enrSeq: Long?): PongPacket {
       val expiration = expirationFor(now)
       val sigHash = createSignature(
         PacketType.PONG,
         keyPair
       ) { writer ->
-        encodeTo(writer, to, pingHash, expiration)
+        encodeTo(writer, to, pingHash, expiration, enrSeq)
       }
       return PongPacket(
         keyPair.publicKey(),
@@ -198,7 +210,8 @@ internal class PongPacket private constructor(
         sigHash.hash,
         to,
         pingHash,
-        expiration
+        expiration,
+        enrSeq
       )
     }
 
@@ -213,22 +226,29 @@ internal class PongPacket private constructor(
           val to = reader.readList { r -> Endpoint.readFrom(r) }
           val pingHash = Bytes32.wrap(reader.readValue())
           val expiration = reader.readLong() // seconds
-          PongPacket(publicKey, signature, hash, to, pingHash, 
secToMsec(expiration))
+          val seq: Long?
+          if (!reader.isComplete) {
+            seq = reader.readLong()
+          } else {
+            seq = null
+          }
+          PongPacket(publicKey, signature, hash, to, pingHash, 
secToMsec(expiration), seq)
         }
       } catch (e: RLPException) {
         throw DecodingException("Invalid pong packet", e)
       }
     }
 
-    private fun encodeTo(writer: RLPWriter, to: Endpoint, pingHash: Bytes32, 
expiration: Long) {
+    private fun encodeTo(writer: RLPWriter, to: Endpoint, pingHash: Bytes32, 
expiration: Long, enrSeq: Long?) {
       writer.writeList { w -> to.writeTo(w) }
       writer.writeValue(pingHash)
       writer.writeLong(msecToSec(expiration))
+      enrSeq?.let { writer.writeLong(it) }
     }
   }
 
   override fun encodeTo(dst: ByteBuffer) = encodeTo(dst, PacketType.PONG) { 
writer ->
-    encodeTo(writer, to, pingHash, expiration)
+    encodeTo(writer, to, pingHash, expiration, enrSeq)
   }
 }
 
@@ -366,3 +386,113 @@ internal class NeighborsPacket private constructor(
     encodeTo(writer, nodes, expiration)
   }
 }
+
+/**
+ * An ENR request packet (EIP-868). Its payload carries only an expiration time.
+ */
+internal class ENRRequestPacket private constructor(
+  nodeId: SECP256K1.PublicKey,
+  signature: SECP256K1.Signature,
+  hash: Bytes32,
+  expiration: Long
+) : Packet(nodeId, signature, hash, expiration) {
+
+  companion object {
+    /**
+     * Decodes the RLP payload of a received ENR request.
+     *
+     * @throws DecodingException if RLP decoding of the payload fails
+     */
+    fun decode(
+      payload: Bytes,
+      hash: Bytes32,
+      publicKey: SECP256K1.PublicKey,
+      signature: SECP256K1.Signature
+    ): ENRRequestPacket {
+      try {
+        return RLP.decodeList(payload) { reader ->
+          val expiration = reader.readLong() // seconds
+          ENRRequestPacket(publicKey, signature, hash, secToMsec(expiration))
+        }
+      } catch (e: RLPException) {
+        throw DecodingException("Invalid enr request packet", e)
+      }
+    }
+
+    private fun encodeTo(writer: RLPWriter, expiration: Long) {
+      writer.writeLong(msecToSec(expiration)) // write in seconds
+    }
+
+    /**
+     * Creates and signs a new ENR request whose expiration is derived from [now].
+     *
+     * @param keyPair the key pair of the local node, used to sign the packet
+     * @param now the current time, in milliseconds
+     */
+    fun create(keyPair: SECP256K1.KeyPair, now: Long): ENRRequestPacket {
+      val expiration = expirationFor(now)
+      // The packet type passed here has to be the one the instance encodeTo writes (ENRREQUEST).
+      // NOTE(review): createSignature presumably includes the type in the signed and hashed content - confirm.
+      val sigHash = createSignature(
+        PacketType.ENRREQUEST,
+        keyPair
+      ) { writer ->
+        ENRRequestPacket.encodeTo(writer, expiration)
+      }
+      return ENRRequestPacket(
+        keyPair.publicKey(),
+        sigHash.signature,
+        sigHash.hash,
+        expiration
+      )
+    }
+  }
+
+  override fun encodeTo(dst: ByteBuffer) = encodeTo(dst, PacketType.ENRREQUEST) { writer ->
+    ENRRequestPacket.encodeTo(writer, expiration)
+  }
+}
+
+/**
+ * An ENR response packet (EIP-868), sent in answer to an [ENRRequestPacket].
+ *
+ * @property requestHash the hash of the request packet being answered
+ * @property enr the RLP-encoded node record carried by the response
+ */
+internal class ENRResponsePacket private constructor(
+  nodeId: SECP256K1.PublicKey,
+  signature: SECP256K1.Signature,
+  hash: Bytes32,
+  expiration: Long,
+  val requestHash: Bytes,
+  val enr: Bytes
+) : Packet(nodeId, signature, hash, expiration) {
+
+  companion object {
+
+    /** Writes the payload fields in wire order: request hash, record, expiration (in seconds). */
+    private fun encodeTo(writer: RLPWriter, requestHash: Bytes, enr: Bytes, expiration: Long) {
+      writer.writeValue(requestHash)
+      writer.writeValue(enr)
+      writer.writeLong(msecToSec(expiration))
+    }
+
+    /**
+     * Creates and signs a new ENR response whose expiration is derived from [now].
+     */
+    fun create(keyPair: SECP256K1.KeyPair, now: Long, requestHash: Bytes, enr: Bytes): ENRResponsePacket {
+      val expiresAt = expirationFor(now)
+      val signed = createSignature(PacketType.ENRRESPONSE, keyPair) { rlp ->
+        encodeTo(rlp, requestHash, enr, expiresAt)
+      }
+      return ENRResponsePacket(keyPair.publicKey(), signed.signature, signed.hash, expiresAt, requestHash, enr)
+    }
+
+    /**
+     * Decodes the RLP payload of a received ENR response.
+     *
+     * @throws DecodingException if RLP decoding of the payload fails
+     */
+    fun decode(
+      payload: Bytes,
+      hash: Bytes32,
+      publicKey: SECP256K1.PublicKey,
+      signature: SECP256K1.Signature
+    ): ENRResponsePacket {
+      try {
+        return RLP.decodeList(payload) { rlp ->
+          // fields are read in the same order encodeTo writes them
+          val answeredHash = rlp.readValue()
+          val record = rlp.readValue()
+          val expirationSeconds = rlp.readLong()
+          ENRResponsePacket(publicKey, signature, hash, secToMsec(expirationSeconds), answeredHash, record)
+        }
+      } catch (e: RLPException) {
+        throw DecodingException("Invalid enr response packet", e)
+      }
+    }
+  }
+
+  override fun encodeTo(dst: ByteBuffer) = encodeTo(dst, PacketType.ENRRESPONSE) { rlp ->
+    ENRResponsePacket.encodeTo(rlp, requestHash, enr, expiration)
+  }
+}
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/PacketType.kt 
b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/PacketType.kt
index 55b91af..c46934c 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/PacketType.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/PacketType.kt
@@ -55,6 +55,22 @@ internal enum class PacketType(
       publicKey: SECP256K1.PublicKey,
       signature: SECP256K1.Signature
     ) = NeighborsPacket.decode(payload, hash, publicKey, signature)
+  },
+  ENRREQUEST(0x05) {
+    override fun decode(
+      payload: Bytes,
+      hash: Bytes32,
+      publicKey: SECP256K1.PublicKey,
+      signature: SECP256K1.Signature
+    ) = ENRRequestPacket.decode(payload, hash, publicKey, signature)
+  },
+  ENRRESPONSE(0x06) {
+    override fun decode(
+      payload: Bytes,
+      hash: Bytes32,
+      publicKey: SECP256K1.PublicKey,
+      signature: SECP256K1.Signature
+    ) = ENRResponsePacket.decode(payload, hash, publicKey, signature)
   };
 
   companion object {
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/Peer.kt 
b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/Peer.kt
index d281558..ae3b3ad 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/Peer.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/Peer.kt
@@ -34,6 +34,11 @@ interface Peer {
   val endpoint: Endpoint?
 
   /**
+   * The Ethereum Node Record associated with the peer, if known.
+   */
+  val enr: EthereumNodeRecord?
+
+  /**
    * The last time the current endpoint of this peer was verified, in 
milliseconds since the epoch.
    *
    * Endpoint is verified by a ping/pong cycle: 
https://github.com/ethereum/devp2p/blob/master/discv4.md#endpoint-proof
@@ -89,4 +94,14 @@ interface Peer {
    * @throws IllegalStateException if there is no endpoint for this peer
    */
   fun seenAt(time: Long)
+
+  /**
+   * Update the peer's ENR.
+   *
+   * Will only update if the peer has no record yet, or if the sequence number of [record] is larger than
+   * that of the record currently associated with the peer.
+   *
+   * @param record the ENR record associated with the peer
+   * @param time the time this ENR information was determined, in milliseconds since the epoch
+   */
+  fun updateENR(record: EthereumNodeRecord, time: Long)
 }
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/PeerRepository.kt 
b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/PeerRepository.kt
index 1271efc..5fb267d 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/PeerRepository.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/PeerRepository.kt
@@ -132,10 +132,10 @@ class EphemeralPeerRepository : PeerRepository {
     override val nodeId: SECP256K1.PublicKey,
     knownEndpoint: Endpoint? = null
   ) : Peer {
-
     @Volatile
     override var endpoint: Endpoint? = knownEndpoint
 
+    override var enr: EthereumNodeRecord? = null
     @Synchronized
     override fun getEndpoint(ifVerifiedOnOrAfter: Long): Endpoint? {
       if ((lastVerified ?: 0) >= ifVerifiedOnOrAfter) {
@@ -191,5 +191,13 @@ class EphemeralPeerRepository : PeerRepository {
         lastSeen = time
       }
     }
+
+    /**
+     * Stores [record] when the peer has no record yet or [record] carries a higher sequence number, then
+     * updates the peer's endpoint from the record's ip, udp and tcp entries. Otherwise does nothing.
+     */
+    @Synchronized
+    override fun updateENR(record: EthereumNodeRecord, time: Long) {
+      val current = enr
+      val isNewer = current == null || current.seq < record.seq
+      if (!isNewer) {
+        return
+      }
+      enr = record
+      val advertised = Endpoint(record.ip(), record.udp(), record.tcp())
+      updateEndpoint(advertised, time)
+    }
   }
 }
diff --git 
a/devp2p/src/test/java/org/apache/tuweni/devp2p/DiscoveryServiceJavaTest.java 
b/devp2p/src/test/java/org/apache/tuweni/devp2p/DiscoveryServiceJavaTest.java
index bcaae43..ef8e082 100644
--- 
a/devp2p/src/test/java/org/apache/tuweni/devp2p/DiscoveryServiceJavaTest.java
+++ 
b/devp2p/src/test/java/org/apache/tuweni/devp2p/DiscoveryServiceJavaTest.java
@@ -12,6 +12,7 @@
  */
 package org.apache.tuweni.devp2p;
 
+import static java.util.Collections.emptyMap;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -56,6 +57,8 @@ class DiscoveryServiceJavaTest {
         SECP256K1.KeyPair.random(),
         0,
         "localhost",
+        1,
+        emptyMap(),
         Collections.singletonList(URI.create("enode://" + 
peerKeyPair.publicKey().toHexString() + "@127.0.0.1:10000")),
         repository);
     AsyncResult<Peer> result = repository.getAsync(peerKeyPair.publicKey());
diff --git 
a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/DiscoveryServiceTest.kt 
b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/DiscoveryServiceTest.kt
index cd0c7dc..0eedbd0 100644
--- a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/DiscoveryServiceTest.kt
+++ b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/DiscoveryServiceTest.kt
@@ -80,7 +80,7 @@ internal class DiscoveryServiceTest {
     val clientKeyPair = SECP256K1.KeyPair.random()
     val client = CoroutineDatagramChannel.open()
     val clientEndpoint = Endpoint("192.168.1.1", 5678, 7654)
-    val ping = PingPacket.create(clientKeyPair, System.currentTimeMillis(), 
clientEndpoint, Endpoint(address))
+    val ping = PingPacket.create(clientKeyPair, System.currentTimeMillis(), 
clientEndpoint, Endpoint(address), null)
     client.send(ping, address)
 
     val pong = client.receivePacket() as PongPacket
@@ -119,7 +119,7 @@ internal class DiscoveryServiceTest {
     assertEquals(discoveryService.localPort, ping.from.udpPort)
     assertNull(ping.from.tcpPort)
 
-    val pong = PongPacket.create(bootstrapKeyPair, System.currentTimeMillis(), 
ping.from, ping.hash)
+    val pong = PongPacket.create(bootstrapKeyPair, System.currentTimeMillis(), 
ping.from, ping.hash, null)
     bootstrapClient.send(pong, address)
 
     val findNodes = bootstrapClient.receivePacket() as FindNodePacket
@@ -161,7 +161,7 @@ internal class DiscoveryServiceTest {
     assertEquals(discoveryService.localPort, ping.from.udpPort)
     assertNull(ping.from.tcpPort)
 
-    val pong = PongPacket.create(SECP256K1.KeyPair.random(), 
System.currentTimeMillis(), ping.from, ping.hash)
+    val pong = PongPacket.create(SECP256K1.KeyPair.random(), 
System.currentTimeMillis(), ping.from, ping.hash, null)
     bootstrapClient.send(pong, address)
 
     delay(1000)
@@ -247,7 +247,7 @@ internal class DiscoveryServiceTest {
     delay(500)
     assertNull(client.tryReceive(ByteBuffer.allocate(2048)))
 
-    val pong = PongPacket.create(clientKeyPair, System.currentTimeMillis(), 
ping.from, ping.hash)
+    val pong = PongPacket.create(clientKeyPair, System.currentTimeMillis(), 
ping.from, ping.hash, null)
     client.send(pong, address)
 
     val neighbors = client.receivePacket() as NeighborsPacket
diff --git a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/PongPacketTest.kt 
b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/ENRResponsePacketTest.kt
similarity index 58%
copy from devp2p/src/test/kotlin/org/apache/tuweni/devp2p/PongPacketTest.kt
copy to devp2p/src/test/kotlin/org/apache/tuweni/devp2p/ENRResponsePacketTest.kt
index 82b0f73..a061812 100644
--- a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/PongPacketTest.kt
+++ b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/ENRResponsePacketTest.kt
@@ -24,18 +24,21 @@ import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.Assertions.assertTrue
 import org.junit.jupiter.api.Test
 import org.junit.jupiter.api.extension.ExtendWith
+import java.net.InetAddress
 import java.nio.ByteBuffer
 
 @ExtendWith(BouncyCastleExtension::class)
-internal class PongPacketTest {
+internal class ENRResponsePacketTest {
 
   @Test
   fun shouldEncodeThenDecodePacket() {
     val keyPair = SECP256K1.KeyPair.random()
-    val to = Endpoint("10.0.0.54", 6543, 6543)
-    val pingHash = Bytes32.random()
+
+    val requestHash = Bytes32.random()
+    val enr = EthereumNodeRecord.toRLP(SECP256K1.KeyPair.random(), 2, emptyMap(),
+      InetAddress.getByName("example.com"), 3000, 12000)
     val now = System.currentTimeMillis()
-    val pong = PongPacket.create(keyPair, now, to, pingHash)
+    val pong = ENRResponsePacket.create(keyPair, now, requestHash, enr)
 
     val buffer = ByteBuffer.allocate(Packet.MAX_SIZE)
     pong.encodeTo(buffer)
@@ -43,27 +46,12 @@ internal class PongPacketTest {
 
     val datagram = Bytes.wrapByteBuffer(buffer)
     val packet = Packet.decodeFrom(datagram)
-    assertTrue(packet is PongPacket)
-
-    val pongPacket = packet as PongPacket
-    assertEquals(keyPair.publicKey(), pongPacket.nodeId)
-    assertEquals(Endpoint("10.0.0.54", 6543, 6543), pongPacket.to)
-    assertEquals(pingHash, pongPacket.pingHash)
-    assertEquals(((now + PACKET_EXPIRATION_PERIOD_MS + 999) / 1000) * 1000, pongPacket.expiration)
-  }
-
-  @Test
-  fun decodeReferencePacket1() {
-    // https://github.com/ethereum/EIPs/blob/master/EIPS/eip-8.md
-    val datagram = Bytes.fromHexString(
-      "09b2428d83348d27cdf7064ad9024f526cebc19e4958f0fdad87c15eb598dd61d08423e0bf66b206" +
-        "9869e1724125f820d851c136684082774f870e614d95a2855d000f05d1648b2d5945470bc187c2d2" +
-        "216fbe870f43ed0909009882e176a46b0102f846d79020010db885a308d313198a2e037073488208" +
-        "ae82823aa0fbc914b16819237dcd8801d7e53f69e9719adecb3cc0e790c57e91ca4461c9548443b9" +
-        "a355c6010203c2040506a0c969a58f6f9095004c0177a6b47f451530cab38966a25cca5cb58f0555" +
-        "42124e")
-    val packet = Packet.decodeFrom(datagram)
+    assertTrue(packet is ENRResponsePacket)
 
-    assertTrue(packet is PongPacket)
+    val responsePacket = packet as ENRResponsePacket
+    assertEquals(keyPair.publicKey(), responsePacket.nodeId)
+    assertEquals(enr, responsePacket.enr)
+    assertEquals(requestHash, responsePacket.requestHash)
+    assertEquals(((now + PACKET_EXPIRATION_PERIOD_MS + 999) / 1000) * 1000, responsePacket.expiration)
   }
 }
diff --git a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/PingPacketTest.kt b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/PingPacketTest.kt
index 5334cca..18f8fe1 100644
--- a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/PingPacketTest.kt
+++ b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/PingPacketTest.kt
@@ -21,6 +21,7 @@ import org.apache.tuweni.crypto.SECP256K1
 import org.apache.tuweni.junit.BouncyCastleExtension
 import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.Assertions.assertTrue
+import org.junit.jupiter.api.Disabled
 import org.junit.jupiter.api.Test
 import org.junit.jupiter.api.extension.ExtendWith
 import java.nio.ByteBuffer
@@ -34,7 +35,7 @@ internal class PingPacketTest {
     val from = Endpoint("10.0.0.54", 6543, 6543)
     val to = Endpoint("192.168.34.65", 9832, 1453)
     val now = System.currentTimeMillis()
-    val ping = PingPacket.create(keyPair, now, from, to)
+    val ping = PingPacket.create(keyPair, now, from, to, null)
 
     val buffer = ByteBuffer.allocate(Packet.MAX_SIZE)
     ping.encodeTo(buffer)
@@ -52,12 +53,36 @@ internal class PingPacketTest {
   }
 
   @Test
+  fun shouldEncodeThenDecodePacketWithEnrSeq() {
+    val keyPair = SECP256K1.KeyPair.random()
+    val from = Endpoint("10.0.0.54", 6543, 6543)
+    val to = Endpoint("192.168.34.65", 9832, 1453)
+    val now = System.currentTimeMillis()
+    val ping = PingPacket.create(keyPair, now, from, to, 64)
+
+    val buffer = ByteBuffer.allocate(Packet.MAX_SIZE)
+    ping.encodeTo(buffer)
+    buffer.flip()
+
+    val datagram = Bytes.wrapByteBuffer(buffer)
+    val packet = Packet.decodeFrom(datagram)
+    assertTrue(packet is PingPacket)
+
+    val pingPacket = packet as PingPacket
+    assertEquals(keyPair.publicKey(), pingPacket.nodeId)
+    assertEquals(Endpoint("10.0.0.54", 6543, 6543), pingPacket.from)
+    assertEquals(Endpoint("192.168.34.65", 9832, 1453), pingPacket.to)
+    assertEquals(((now + PACKET_EXPIRATION_PERIOD_MS + 999) / 1000) * 1000, pingPacket.expiration)
+    assertEquals(64L, pingPacket.enrSeq)
+  }
+
+  @Test
   fun shouldDecodePingPacketWithMissingEndpoint() {
     val keyPair = SECP256K1.KeyPair.random()
     val from = Endpoint("10.0.0.54", 6543, 6543)
     val to = Endpoint("192.168.34.65", 9832, 1453)
     val now = System.currentTimeMillis()
-    val ping = PingPacket.create(keyPair, now, from, to)
+    val ping = PingPacket.create(keyPair, now, from, to, null)
 
     val buffer = ByteBuffer.allocate(1024)
     ping.encodeTo(buffer)
@@ -87,6 +112,7 @@ internal class PingPacketTest {
     assertTrue(packet is PingPacket)
   }
 
+  @Disabled("EIP-868 supercedes EIP-8 behavior")
   @Test
   fun decodeReferencePacket2() {
     // https://github.com/ethereum/EIPs/blob/master/EIPS/eip-8.md
diff --git a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/PongPacketTest.kt b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/PongPacketTest.kt
index 82b0f73..5ce0a69 100644
--- a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/PongPacketTest.kt
+++ b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/PongPacketTest.kt
@@ -22,6 +22,7 @@ import org.apache.tuweni.crypto.SECP256K1
 import org.apache.tuweni.junit.BouncyCastleExtension
 import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.Assertions.assertTrue
+import org.junit.jupiter.api.Disabled
 import org.junit.jupiter.api.Test
 import org.junit.jupiter.api.extension.ExtendWith
 import java.nio.ByteBuffer
@@ -35,7 +36,7 @@ internal class PongPacketTest {
     val to = Endpoint("10.0.0.54", 6543, 6543)
     val pingHash = Bytes32.random()
     val now = System.currentTimeMillis()
-    val pong = PongPacket.create(keyPair, now, to, pingHash)
+    val pong = PongPacket.create(keyPair, now, to, pingHash, null)
 
     val buffer = ByteBuffer.allocate(Packet.MAX_SIZE)
     pong.encodeTo(buffer)
@@ -53,6 +54,31 @@ internal class PongPacketTest {
   }
 
   @Test
+  fun shouldEncodeThenDecodePacketWithSeq() {
+    val keyPair = SECP256K1.KeyPair.random()
+    val to = Endpoint("10.0.0.54", 6543, 6543)
+    val pingHash = Bytes32.random()
+    val now = System.currentTimeMillis()
+    val pong = PongPacket.create(keyPair, now, to, pingHash, 32)
+
+    val buffer = ByteBuffer.allocate(Packet.MAX_SIZE)
+    pong.encodeTo(buffer)
+    buffer.flip()
+
+    val datagram = Bytes.wrapByteBuffer(buffer)
+    val packet = Packet.decodeFrom(datagram)
+    assertTrue(packet is PongPacket)
+
+    val pongPacket = packet as PongPacket
+    assertEquals(keyPair.publicKey(), pongPacket.nodeId)
+    assertEquals(Endpoint("10.0.0.54", 6543, 6543), pongPacket.to)
+    assertEquals(pingHash, pongPacket.pingHash)
+    assertEquals(((now + PACKET_EXPIRATION_PERIOD_MS + 999) / 1000) * 1000, pongPacket.expiration)
+    assertEquals(32L, pongPacket.enrSeq)
+  }
+
+  @Disabled("EIP-868 supercedes EIP-8 behavior")
+  @Test
   fun decodeReferencePacket1() {
     // https://github.com/ethereum/EIPs/blob/master/EIPS/eip-8.md
     val datagram = Bytes.fromHexString(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to