This is an automated email from the ASF dual-hosted git repository.

toulmean pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git


The following commit(s) were added to refs/heads/main by this push:
     new d6195d9  Add support for eth/66
     new a831bc4  Merge pull request #231 from atoulme/eth66
d6195d9 is described below

commit d6195d9b9ccf468f7f61b2a0f7120a254f9aa8ab
Author: Antoine Toulme <[email protected]>
AuthorDate: Sun May 2 11:30:21 2021 -0700

    Add support for eth/66
---
 .../org/apache/tuweni/devp2p/eth/EthClient.kt      |  11 +-
 .../devp2p/eth/{EthClient.kt => EthClient66.kt}    | 110 ++++++++-------
 .../org/apache/tuweni/devp2p/eth/EthController.kt  |  61 +++++++--
 .../org/apache/tuweni/devp2p/eth/EthHandler.kt     |  14 +-
 .../devp2p/eth/{EthHandler.kt => EthHandler66.kt}  | 150 ++++++++++++---------
 .../tuweni/devp2p/eth/EthHelloSubprotocol.kt       |  18 +--
 .../apache/tuweni/devp2p/eth/EthRequestsManager.kt |  36 -----
 .../org/apache/tuweni/devp2p/eth/EthSubprotocol.kt |  41 +++---
 .../apache/tuweni/devp2p/eth/EthSubprotocolTest.kt |  23 ++--
 .../apache/tuweni/devp2p/proxy/ProxySubprotocol.kt |  10 +-
 .../org/apache/tuweni/ethclient/EthereumClient.kt  |  60 +++++----
 .../tuweni/ethclient/FromBestBlockSynchronizer.kt  |   4 +-
 .../ethclient/FromUnknownParentSynchronizer.kt     |   4 +-
 .../tuweni/ethclient/PeerStatusEthSynchronizer.kt  |   4 +-
 .../org/apache/tuweni/ethclient/Synchronizer.kt    |   4 +-
 .../org/apache/tuweni/les/LESSubProtocolHandler.kt |   4 +-
 .../kotlin/org/apache/tuweni/les/LESSubprotocol.kt |  34 +----
 .../java/org/apache/tuweni/rlp/BytesRLPReader.java |   9 ++
 .../main/java/org/apache/tuweni/rlp/RLPReader.java |   7 +
 .../org/apache/tuweni/rlp/BytesRLPReaderTest.java  |  11 ++
 .../tuweni/rlpx/vertx/VertxAcceptanceTest.java     |  40 +++---
 .../tuweni/rlpx/vertx/VertxRLPxServiceTest.java    |  35 +++--
 .../apache/tuweni/rlpx/vertx/VertxRLPxService.java |  23 ++--
 .../rlpx/wire/DefaultSubProtocolIdentifier.java    |  14 +-
 .../tuweni/rlpx/wire/DefaultWireConnection.java    |  56 +++++---
 .../org/apache/tuweni/rlpx/wire/SubProtocol.java   |  11 +-
 .../tuweni/rlpx/wire/SubProtocolIdentifier.java    |  13 +-
 .../rlpx/wire/DefaultWireConnectionTest.java       |  17 +++
 28 files changed, 453 insertions(+), 371 deletions(-)

diff --git a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthClient.kt b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthClient.kt
index 0d3fa1c..5b9e3ed 100644
--- a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthClient.kt
+++ b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthClient.kt
@@ -172,7 +172,7 @@ class EthClient(
   }
 
   @Suppress("TYPE_INFERENCE_ONLY_INPUT_TYPES_WARNING")
-  override fun wasRequested(
+  fun wasRequested(
     connection: WireConnection,
     headers: List<BlockHeader>
   ): CompletableAsyncResult<List<BlockHeader>>? {
@@ -184,16 +184,13 @@ class EthClient(
     }
   }
 
-  override fun wasRequested(connection: WireConnection, bodies: 
List<BlockBody>): Request<List<BlockBody>>? =
+  fun wasRequested(connection: WireConnection): Request<List<BlockBody>>? =
     bodiesRequests[connection.uri()]
 
-  override fun nodeDataWasRequested(connection: WireConnection, elements: 
List<Bytes?>): Request<List<Bytes?>>? =
+  fun nodeDataWasRequested(connection: WireConnection): Request<List<Bytes?>>? 
=
     nodeDataRequests[connection.uri()]
 
-  override fun transactionReceiptsRequested(
-    connection: WireConnection,
-    transactionReceipts: List<List<TransactionReceipt>>
-  ): Request<List<List<TransactionReceipt>>>? = 
transactionReceiptRequests[connection.uri()]
+  fun transactionReceiptsRequested(connection: WireConnection): 
Request<List<List<TransactionReceipt>>>? = 
transactionReceiptRequests[connection.uri()]
 
   override suspend fun submitPooledTransaction(vararg tx: Transaction) {
     for (t in tx) {
diff --git a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthClient.kt b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthClient66.kt
similarity index 68%
copy from devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthClient.kt
copy to devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthClient66.kt
index 0d3fa1c..7937686 100644
--- a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthClient.kt
+++ b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthClient66.kt
@@ -17,7 +17,6 @@
 package org.apache.tuweni.devp2p.eth
 
 import org.apache.tuweni.bytes.Bytes
-import org.apache.tuweni.bytes.Bytes32
 import org.apache.tuweni.concurrent.AsyncResult
 import org.apache.tuweni.concurrent.CompletableAsyncResult
 import org.apache.tuweni.eth.Block
@@ -27,45 +26,47 @@ import org.apache.tuweni.eth.Hash
 import org.apache.tuweni.eth.Transaction
 import org.apache.tuweni.eth.TransactionReceipt
 import org.apache.tuweni.eth.repository.TransactionPool
+import org.apache.tuweni.rlp.RLP
 import org.apache.tuweni.rlpx.RLPxService
 import org.apache.tuweni.rlpx.wire.SubProtocolClient
 import org.apache.tuweni.rlpx.wire.WireConnection
 import org.apache.tuweni.units.bigints.UInt256
-import org.slf4j.LoggerFactory
+import org.apache.tuweni.units.bigints.UInt64
 
-val logger = LoggerFactory.getLogger(EthClient::class.java)
 /**
  * Client of the ETH subprotocol, allowing to request block and node data
  */
-class EthClient(
+class EthClient66(
   private val service: RLPxService,
   private val pendingTransactionsPool: TransactionPool,
-  private val connectionSelectionStrategy: ConnectionSelectionStrategy
+  private val connectionSelectionStrategy: ConnectionSelectionStrategy,
 ) :
   EthRequestsManager, SubProtocolClient {
 
-  private val headerRequests = mutableMapOf<Bytes32, 
Request<List<BlockHeader>>>()
-  private val bodiesRequests = HashMap<String, Request<List<BlockBody>>>()
-  private val nodeDataRequests = HashMap<String, Request<List<Bytes?>>>()
-  private val transactionReceiptRequests = HashMap<String, 
Request<List<List<TransactionReceipt>>>>()
+  private val headerRequests = mutableMapOf<Bytes, 
Request<List<BlockHeader>>>()
+  private val bodiesRequests = HashMap<Bytes, Request<List<BlockBody>>>()
+  private val nodeDataRequests = HashMap<Bytes, Request<List<Bytes?>>>()
+  private val transactionReceiptRequests = HashMap<Bytes, 
Request<List<List<TransactionReceipt>>>>()
 
   override fun connectionSelectionStrategy() = connectionSelectionStrategy
 
   override fun requestTransactionReceipts(
     blockHashes: List<Hash>,
-    connection: WireConnection?
+    connection: WireConnection?,
   ): AsyncResult<List<List<TransactionReceipt>>> {
     val conns = service.repository().asIterable(EthSubprotocol.ETH62)
     val handle = AsyncResult.incomplete<List<List<TransactionReceipt>>>()
     var done = false
     conns.forEach { conn ->
-
-      transactionReceiptRequests.computeIfAbsent(conn.uri()) {
+      transactionReceiptRequests.computeIfAbsent(UInt64.random().toBytes()) { 
key ->
         service.send(
           EthSubprotocol.ETH62,
           MessageType.GetReceipts.code,
           conn,
-          GetReceipts(blockHashes).toBytes()
+          RLP.encodeList {
+            it.writeValue(key)
+            it.writeRLP(GetReceipts(blockHashes).toBytes())
+          }
         )
         done = true
         Request(conn.uri(), handle, blockHashes)
@@ -82,17 +83,20 @@ class EthClient(
     maxHeaders: Long,
     skip: Long,
     reverse: Boolean,
-    connection: WireConnection?
+    connection: WireConnection?,
   ): AsyncResult<List<BlockHeader>> {
     logger.info("Requesting headers hash: $blockHash maxHeaders: $maxHeaders 
skip: $skip reverse: $reverse")
     val conn = connectionSelectionStrategy.selectConnection()
     val completion = AsyncResult.incomplete<List<BlockHeader>>()
-    headerRequests.computeIfAbsent(blockHash) {
+    headerRequests.computeIfAbsent(UInt64.random().toBytes()) { key ->
       service.send(
         EthSubprotocol.ETH62,
         MessageType.GetBlockHeaders.code,
         conn!!,
-        GetBlockHeaders(blockHash, maxHeaders, skip, reverse).toBytes()
+        RLP.encodeList {
+          it.writeValue(key)
+          it.writeRLP(GetBlockHeaders(blockHash, maxHeaders, skip, 
reverse).toBytes())
+        }
       )
       Request(connectionId = conn.uri(), handle = completion, data = blockHash)
     }
@@ -104,16 +108,20 @@ class EthClient(
     maxHeaders: Long,
     skip: Long,
     reverse: Boolean,
-    connection: WireConnection?
+    connection: WireConnection?,
   ): AsyncResult<List<BlockHeader>> {
-    val blockNumberBytes = UInt256.valueOf(blockNumber).toBytes()
+    val blockNumberBytes = UInt256.valueOf(blockNumber)
     val completion = AsyncResult.incomplete<List<BlockHeader>>()
-    headerRequests.computeIfAbsent(blockNumberBytes) {
+    headerRequests.computeIfAbsent(UInt64.random().toBytes()) { key ->
       service.send(
         EthSubprotocol.ETH62,
         MessageType.GetBlockHeaders.code,
         connection!!,
-        GetBlockHeaders(blockNumberBytes, maxHeaders, skip, reverse).toBytes()
+        RLP.encodeList {
+          it.writeValue(key)
+          it.writeRLP(GetBlockHeaders(blockNumberBytes, maxHeaders, skip, 
reverse).toBytes())
+        }
+
       )
       Request(connectionId = connection.uri(), handle = completion, data = 
blockNumber)
     }
@@ -122,18 +130,21 @@ class EthClient(
 
   override fun requestBlockHeaders(
     blockHashes: List<Hash>,
-    connection: WireConnection?
+    connection: WireConnection?,
   ): AsyncResult<List<BlockHeader>> {
     return AsyncResult.combine(blockHashes.stream().map { 
requestBlockHeader(it) })
   }
 
   override fun requestBlockHeader(blockHash: Hash, connection: 
WireConnection?): AsyncResult<BlockHeader> {
-    val request = headerRequests.computeIfAbsent(blockHash) {
+    val request = headerRequests.computeIfAbsent(UInt64.random().toBytes()) { 
key ->
       service.send(
         EthSubprotocol.ETH62,
         MessageType.GetBlockHeaders.code,
         connection!!,
-        GetBlockHeaders(blockHash, 1, 0, false).toBytes()
+        RLP.encodeList {
+          it.writeValue(key)
+          it.writeRLP(GetBlockHeaders(blockHash, 1, 0, false).toBytes())
+        }
       )
       val completion = AsyncResult.incomplete<List<BlockHeader>>()
       Request(connectionId = connection.uri(), handle = completion, data = 
blockHash)
@@ -143,12 +154,15 @@ class EthClient(
 
   override fun requestBlockBodies(blockHashes: List<Hash>, connection: 
WireConnection?): AsyncResult<List<BlockBody>> {
     val handle = AsyncResult.incomplete<List<BlockBody>>()
-    bodiesRequests.computeIfAbsent(connection!!.uri()) {
+    bodiesRequests.compute(UInt64.random().toBytes()) { key, _ ->
       service.send(
         EthSubprotocol.ETH62,
         MessageType.GetBlockBodies.code,
-        connection,
-        GetBlockBodies(blockHashes).toBytes()
+        connection!!,
+        RLP.encodeList {
+          it.writeValue(key)
+          it.writeRLP(GetBlockBodies(blockHashes).toBytes())
+        }
       )
       Request(connection.uri(), handle, blockHashes)
     }
@@ -171,38 +185,44 @@ class EthClient(
     return result
   }
 
-  @Suppress("TYPE_INFERENCE_ONLY_INPUT_TYPES_WARNING")
-  override fun wasRequested(
-    connection: WireConnection,
-    headers: List<BlockHeader>
+  fun headersRequested(
+    requestIdentifier: Bytes,
   ): CompletableAsyncResult<List<BlockHeader>>? {
-    val request = headerRequests.remove(headers) ?: return null
-    if (request.connectionId == connection.uri()) {
-      return request.handle
-    } else {
-      return null
-    }
+    val request = headerRequests.remove(requestIdentifier) ?: return null
+    return request.handle
   }
 
-  override fun wasRequested(connection: WireConnection, bodies: 
List<BlockBody>): Request<List<BlockBody>>? =
-    bodiesRequests[connection.uri()]
+  fun bodiesRequested(requestIdentifier: Bytes): Request<List<BlockBody>>? =
+    bodiesRequests[requestIdentifier]
 
-  override fun nodeDataWasRequested(connection: WireConnection, elements: 
List<Bytes?>): Request<List<Bytes?>>? =
-    nodeDataRequests[connection.uri()]
+  fun nodeDataWasRequested(requestIdentifier: Bytes): Request<List<Bytes?>>? =
+    nodeDataRequests[requestIdentifier]
 
-  override fun transactionReceiptsRequested(
-    connection: WireConnection,
-    transactionReceipts: List<List<TransactionReceipt>>
-  ): Request<List<List<TransactionReceipt>>>? = 
transactionReceiptRequests[connection.uri()]
+  fun transactionReceiptsRequested(
+    requestIdentifier: Bytes,
+  ): Request<List<List<TransactionReceipt>>>? = 
transactionReceiptRequests[requestIdentifier]
 
   override suspend fun submitPooledTransaction(vararg tx: Transaction) {
     for (t in tx) {
       pendingTransactionsPool.add(t)
     }
     val hashes = tx.map { it.hash }
-    val conns = service.repository().asIterable(EthSubprotocol.ETH65)
+    val conns = service.repository().asIterable(EthSubprotocol.ETH66)
     conns.forEach { conn ->
       service.send(
+        EthSubprotocol.ETH66,
+        MessageType.NewPooledTransactionHashes.code,
+        conn,
+        RLP.encodeList {
+          it.writeValue(UInt64.random().toBytes())
+          it.writeRLP(GetBlockBodies(hashes).toBytes())
+        }
+      )
+    }
+
+    val conns65 = service.repository().asIterable(EthSubprotocol.ETH65)
+    conns65.forEach { conn ->
+      service.send(
         EthSubprotocol.ETH65,
         MessageType.NewPooledTransactionHashes.code,
         conn,
diff --git a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthController.kt b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthController.kt
index 469f308..db94de5 100644
--- a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthController.kt
+++ b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthController.kt
@@ -34,7 +34,7 @@ class EthController(
   val repository: BlockchainRepository,
   val pendingTransactionsPool: TransactionPool,
   val requestsManager: EthRequestsManager,
-  val connectionsListener: (WireConnection, Status) -> Unit = { _, _ -> }
+  val connectionsListener: (WireConnection, Status) -> Unit = { _, _ -> },
 ) {
 
   suspend fun findTransactionReceipts(hashes: List<Hash>): 
List<List<TransactionReceipt>> {
@@ -104,15 +104,33 @@ class EthController(
     requestsManager.requestBlockBodies(listOf(blockHash))
   }
 
-  suspend fun addNewBlockHeaders(connection: WireConnection, headers: 
List<BlockHeader>) {
-    val request = requestsManager.wasRequested(connection, headers)
-    if (request != null) {
-      request.complete(headers)
+  suspend fun addNewBlockHeaders(connection: WireConnection, 
requestIdentifier: Bytes?, headers: List<BlockHeader>) {
+    val request = when (requestsManager) {
+      is EthClient -> {
+        requestsManager.wasRequested(connection, headers)
+      }
+      is EthClient66 -> {
+        requestsManager.headersRequested(requestIdentifier!!)
+      }
+      else -> {
+        throw IllegalArgumentException("Unsupported requestsManager")
+      }
     }
+    request?.complete(headers)
   }
 
-  suspend fun addNewBlockBodies(connection: WireConnection, bodies: 
List<BlockBody>) {
-    val request = requestsManager.wasRequested(connection, bodies)
+  suspend fun addNewBlockBodies(connection: WireConnection, requestIdentifier: 
Bytes?, bodies: List<BlockBody>) {
+    val request = when (requestsManager) {
+      is EthClient -> {
+        requestsManager.wasRequested(connection)
+      }
+      is EthClient66 -> {
+        requestsManager.bodiesRequested(requestIdentifier!!)
+      }
+      else -> {
+        throw IllegalArgumentException("Unsupported requestsManager")
+      }
+    }
     if (request != null) {
       val hashes = request.data as List<*>
       for (i in 0..hashes.size) {
@@ -128,8 +146,18 @@ class EthController(
 
   suspend fun findNodeData(hashes: List<Hash>) = 
repository.retrieveNodeData(hashes)
 
-  suspend fun addNewNodeData(connection: WireConnection, elements: 
List<Bytes?>) {
-    val request = requestsManager.nodeDataWasRequested(connection, elements)
+  suspend fun addNewNodeData(connection: WireConnection, requestIdentifier: 
Bytes?, elements: List<Bytes?>) {
+    val request = when (requestsManager) {
+      is EthClient -> {
+        requestsManager.nodeDataWasRequested(connection)
+      }
+      is EthClient66 -> {
+        requestsManager.nodeDataWasRequested(requestIdentifier!!)
+      }
+      else -> {
+        throw IllegalArgumentException("Unsupported requestsManager")
+      }
+    }
     if (request != null) {
       val hashes = request.data as List<*>
       for (i in 0..hashes.size) {
@@ -144,9 +172,20 @@ class EthController(
 
   suspend fun addNewTransactionReceipts(
     connection: WireConnection,
-    transactionReceipts: List<List<TransactionReceipt>>
+    requestIdentifier: Bytes?,
+    transactionReceipts: List<List<TransactionReceipt>>,
   ) {
-    val request = requestsManager.transactionReceiptsRequested(connection, 
transactionReceipts)
+    val request = when (requestsManager) {
+      is EthClient -> {
+        requestsManager.transactionReceiptsRequested(connection)
+      }
+      is EthClient66 -> {
+        requestsManager.transactionReceiptsRequested(requestIdentifier!!)
+      }
+      else -> {
+        throw IllegalArgumentException("Unsupported requestsManager")
+      }
+    }
     if (request != null) {
       val hashes = request.data as List<*>
       for (i in 0..hashes.size) {
diff --git a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthHandler.kt b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthHandler.kt
index 34614f7..d989fe7 100644
--- a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthHandler.kt
+++ b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthHandler.kt
@@ -37,7 +37,7 @@ internal class EthHandler(
   override val coroutineContext: CoroutineContext = Dispatchers.Default,
   private val blockchainInfo: BlockchainInformation,
   private val service: RLPxService,
-  private val controller: EthController
+  private val controller: EthController,
 ) : SubProtocolHandler, CoroutineScope {
 
   private val pendingStatus = WeakHashMap<String, PeerInfo>()
@@ -99,7 +99,7 @@ internal class EthHandler(
   }
 
   private suspend fun handleNodeData(connection: WireConnection, read: 
NodeData) {
-    controller.addNewNodeData(connection, read.elements)
+    controller.addNewNodeData(connection, null, read.elements)
   }
 
   private suspend fun handleStatus(connection: WireConnection, status: 
StatusMessage) {
@@ -153,7 +153,7 @@ internal class EthHandler(
   }
 
   private suspend fun handleReceipts(connection: WireConnection, receipts: 
Receipts) {
-    controller.addNewTransactionReceipts(connection, 
receipts.transactionReceipts)
+    controller.addNewTransactionReceipts(connection, null, 
receipts.transactionReceipts)
   }
 
   private suspend fun handleGetReceipts(connection: WireConnection, 
getReceipts: GetReceipts) {
@@ -180,7 +180,7 @@ internal class EthHandler(
   }
 
   private suspend fun handleBlockBodies(connection: WireConnection, message: 
BlockBodies) {
-    controller.addNewBlockBodies(connection, message.bodies)
+    controller.addNewBlockBodies(connection, null, message.bodies)
   }
 
   private suspend fun handleGetBlockBodies(connection: WireConnection, 
message: GetBlockBodies) {
@@ -193,7 +193,7 @@ internal class EthHandler(
   }
 
   private suspend fun handleHeaders(connection: WireConnection, headers: 
BlockHeaders) {
-    controller.addNewBlockHeaders(connection, headers.headers)
+    controller.addNewBlockHeaders(connection, null, headers.headers)
   }
 
   private suspend fun handleGetBlockHeaders(connection: WireConnection, 
blockHeaderRequest: GetBlockHeaders) {
@@ -216,7 +216,7 @@ internal class EthHandler(
     service.send(
       EthSubprotocol.ETH64, MessageType.Status.code, connection,
       StatusMessage(
-        EthSubprotocol.ETH64.version(),
+        EthSubprotocol.ETH65.version(),
         blockchainInfo.networkID(), blockchainInfo.totalDifficulty(),
         blockchainInfo.bestHash(), blockchainInfo.genesisHash(), 
blockchainInfo.getLatestForkHash(),
         blockchainInfo.getLatestFork()
@@ -230,7 +230,7 @@ internal class EthHandler(
   }
 }
 
-internal class PeerInfo() {
+internal class PeerInfo {
 
   val ready: CompletableAsyncCompletion = AsyncCompletion.incomplete()
 
diff --git a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthHandler.kt b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthHandler66.kt
similarity index 64%
copy from devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthHandler.kt
copy to devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthHandler66.kt
index 34614f7..f340d7f 100644
--- a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthHandler.kt
+++ b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthHandler66.kt
@@ -20,24 +20,25 @@ import kotlinx.coroutines.CoroutineScope
 import kotlinx.coroutines.Dispatchers
 import org.apache.tuweni.bytes.Bytes
 import org.apache.tuweni.concurrent.AsyncCompletion
-import org.apache.tuweni.concurrent.CompletableAsyncCompletion
 import org.apache.tuweni.concurrent.coroutines.asyncCompletion
 import org.apache.tuweni.eth.Hash
+import org.apache.tuweni.rlp.RLP
 import org.apache.tuweni.rlpx.RLPxService
 import org.apache.tuweni.rlpx.wire.DisconnectReason
 import org.apache.tuweni.rlpx.wire.SubProtocolHandler
 import org.apache.tuweni.rlpx.wire.WireConnection
+import org.apache.tuweni.units.bigints.UInt64
 import org.slf4j.LoggerFactory
 import java.util.WeakHashMap
 import kotlin.collections.ArrayList
 import kotlin.collections.set
 import kotlin.coroutines.CoroutineContext
 
-internal class EthHandler(
+internal class EthHandler66(
   override val coroutineContext: CoroutineContext = Dispatchers.Default,
   private val blockchainInfo: BlockchainInformation,
   private val service: RLPxService,
-  private val controller: EthController
+  private val controller: EthController,
 ) : SubProtocolHandler, CoroutineScope {
 
   private val pendingStatus = WeakHashMap<String, PeerInfo>()
@@ -48,32 +49,37 @@ internal class EthHandler(
     val MAX_POOLED_TX = 256
   }
 
-  override fun handle(connection: WireConnection, messageType: Int, message: 
Bytes) = asyncCompletion {
+  override fun handle(connection: WireConnection, messageType: Int, payload: 
Bytes) = asyncCompletion {
     logger.debug("Receiving message of type {}", messageType)
+    val pair = RLP.decode(payload) {
+      Pair(it.readValue(), it.readRemaining())
+    }
+    val requestIdentifier = pair.first
+    val message = pair.second
+
     when (messageType) {
       MessageType.Status.code -> handleStatus(connection, 
StatusMessage.read(message))
       MessageType.NewBlockHashes.code -> 
handleNewBlockHashes(NewBlockHashes.read(message))
       MessageType.Transactions.code -> 
handleTransactions(Transactions.read(message))
-      MessageType.GetBlockHeaders.code -> handleGetBlockHeaders(connection, 
GetBlockHeaders.read(message))
-      MessageType.BlockHeaders.code -> handleHeaders(connection, 
BlockHeaders.read(message))
-      MessageType.GetBlockBodies.code -> handleGetBlockBodies(connection, 
GetBlockBodies.read(message))
-      MessageType.BlockBodies.code -> handleBlockBodies(connection, 
BlockBodies.read(message))
+      MessageType.GetBlockHeaders.code -> handleGetBlockHeaders(connection, 
requestIdentifier, GetBlockHeaders.read(message))
+      MessageType.BlockHeaders.code -> handleHeaders(connection, 
requestIdentifier, BlockHeaders.read(message))
+      MessageType.GetBlockBodies.code -> handleGetBlockBodies(connection, 
requestIdentifier, GetBlockBodies.read(message))
+      MessageType.BlockBodies.code -> handleBlockBodies(connection, 
requestIdentifier, BlockBodies.read(message))
       MessageType.NewBlock.code -> handleNewBlock(NewBlock.read(message))
-      MessageType.GetNodeData.code -> handleGetNodeData(connection, 
GetNodeData.read(message))
-      MessageType.NodeData.code -> handleNodeData(connection, 
NodeData.read(message))
-      MessageType.GetReceipts.code -> handleGetReceipts(connection, 
GetReceipts.read(message))
-      MessageType.Receipts.code -> handleReceipts(connection, 
Receipts.read(message))
+      MessageType.GetNodeData.code -> handleGetNodeData(connection, 
requestIdentifier, GetNodeData.read(message))
+      MessageType.NodeData.code -> handleNodeData(connection, 
requestIdentifier, NodeData.read(message))
+      MessageType.GetReceipts.code -> handleGetReceipts(connection, 
requestIdentifier, GetReceipts.read(message))
+      MessageType.Receipts.code -> handleReceipts(connection, 
requestIdentifier, Receipts.read(message))
       MessageType.NewPooledTransactionHashes.code -> 
handleNewPooledTransactionHashes(
-        connection,
-        NewPooledTransactionHashes.read(message)
+        connection, NewPooledTransactionHashes.read(message)
       )
       MessageType.GetPooledTransactions.code -> handleGetPooledTransactions(
-        connection,
+        connection, requestIdentifier,
         GetPooledTransactions.read(message)
       )
       MessageType.PooledTransactions.code -> 
handlePooledTransactions(PooledTransactions.read(message))
       else -> {
-        logger.warn("Unknown message type {}", messageType)
+        logger.warn("Unknown message type {} with request identifier {}", 
messageType, requestIdentifier)
         service.disconnect(connection, DisconnectReason.SUBPROTOCOL_REASON)
       }
     }
@@ -83,14 +89,17 @@ internal class EthHandler(
     controller.addNewPooledTransactions(read.transactions)
   }
 
-  private suspend fun handleGetPooledTransactions(connection: WireConnection, 
read: GetPooledTransactions) {
+  private suspend fun handleGetPooledTransactions(connection: WireConnection, 
requestIdentifier: Bytes, read: GetPooledTransactions) {
     val tx = controller.findPooledTransactions(read.hashes)
     logger.debug("Responding to GetPooledTransactions with {} transactions", 
tx.size)
     service.send(
-      EthSubprotocol.ETH65,
+      EthSubprotocol.ETH66,
       MessageType.PooledTransactions.code,
       connection,
-      PooledTransactions(tx).toBytes()
+      RLP.encodeList {
+        it.writeValue(requestIdentifier)
+        it.writeRLP(PooledTransactions(tx).toBytes())
+      }
     )
   }
 
@@ -98,8 +107,8 @@ internal class EthHandler(
     controller.addNewTransactions(transactions.transactions)
   }
 
-  private suspend fun handleNodeData(connection: WireConnection, read: 
NodeData) {
-    controller.addNewNodeData(connection, read.elements)
+  private suspend fun handleNodeData(connection: WireConnection, 
requestIdentifier: Bytes, read: NodeData) {
+    controller.addNewNodeData(connection, requestIdentifier, read.elements)
   }
 
   private suspend fun handleStatus(connection: WireConnection, status: 
StatusMessage) {
@@ -133,10 +142,13 @@ internal class EthHandler(
       }
       if (missingTx.size == MAX_POOLED_TX) {
         service.send(
-          EthSubprotocol.ETH65,
+          EthSubprotocol.ETH66,
           MessageType.GetPooledTransactions.code,
           connection,
-          message.toBytes()
+          RLP.encodeList {
+            it.writeValue(UInt64.random().toBytes())
+            it.writeRLP(message.toBytes())
+          }
         )
         missingTx = ArrayList()
         message = GetPooledTransactions(missingTx)
@@ -144,34 +156,44 @@ internal class EthHandler(
     }
     if (!missingTx.isEmpty()) {
       service.send(
-        EthSubprotocol.ETH65,
+        EthSubprotocol.ETH66,
         MessageType.GetPooledTransactions.code,
         connection,
-        message.toBytes()
+        RLP.encodeList {
+          it.writeValue(UInt64.random().toBytes())
+          it.writeRLP(message.toBytes())
+        }
       )
     }
   }
 
-  private suspend fun handleReceipts(connection: WireConnection, receipts: 
Receipts) {
-    controller.addNewTransactionReceipts(connection, 
receipts.transactionReceipts)
+  private suspend fun handleReceipts(connection: WireConnection, 
requestIdentifier: Bytes, receipts: Receipts) {
+    controller.addNewTransactionReceipts(connection, requestIdentifier, 
receipts.transactionReceipts)
   }
 
-  private suspend fun handleGetReceipts(connection: WireConnection, 
getReceipts: GetReceipts) {
-
+  private suspend fun handleGetReceipts(connection: WireConnection, 
requestIdentifier: Bytes, getReceipts: GetReceipts) {
+    val receipts = controller.findTransactionReceipts(getReceipts.hashes)
     service.send(
-      EthSubprotocol.ETH64,
+      EthSubprotocol.ETH66,
       MessageType.Receipts.code,
       connection,
-      
Receipts(controller.findTransactionReceipts(getReceipts.hashes)).toBytes()
+      RLP.encodeList {
+        it.writeValue(requestIdentifier)
+        it.writeRLP(Receipts(receipts).toBytes())
+      }
     )
   }
 
-  private suspend fun handleGetNodeData(connection: WireConnection, nodeData: 
GetNodeData) {
+  private suspend fun handleGetNodeData(connection: WireConnection, 
requestIdentifier: Bytes, nodeData: GetNodeData) {
+    val data = controller.findNodeData(nodeData.hashes)
     service.send(
-      EthSubprotocol.ETH64,
+      EthSubprotocol.ETH66,
       MessageType.NodeData.code,
       connection,
-      NodeData(controller.findNodeData(nodeData.hashes)).toBytes()
+      RLP.encodeList {
+        it.writeValue(requestIdentifier)
+        it.writeRLP(NodeData(data).toBytes())
+      }
     )
   }
 
@@ -179,31 +201,41 @@ internal class EthHandler(
     controller.addNewBlock(read.block)
   }
 
-  private suspend fun handleBlockBodies(connection: WireConnection, message: 
BlockBodies) {
-    controller.addNewBlockBodies(connection, message.bodies)
+  private suspend fun handleBlockBodies(connection: WireConnection, 
requestIdentifier: Bytes, message: BlockBodies) {
+    controller.addNewBlockBodies(connection, requestIdentifier, message.bodies)
   }
 
-  private suspend fun handleGetBlockBodies(connection: WireConnection, 
message: GetBlockBodies) {
+  private suspend fun handleGetBlockBodies(connection: WireConnection, 
requestIdentifier: Bytes, message: GetBlockBodies) {
+    val bodies = BlockBodies(controller.findBlockBodies(message.hashes))
     service.send(
-      EthSubprotocol.ETH64,
+      EthSubprotocol.ETH66,
       MessageType.BlockBodies.code,
       connection,
-      BlockBodies(controller.findBlockBodies(message.hashes)).toBytes()
+      RLP.encodeList {
+        it.writeValue(requestIdentifier)
+        it.writeRLP(bodies.toBytes())
+      }
     )
   }
 
-  private suspend fun handleHeaders(connection: WireConnection, headers: 
BlockHeaders) {
-    controller.addNewBlockHeaders(connection, headers.headers)
+  private suspend fun handleHeaders(connection: WireConnection, 
requestIdentifier: Bytes, headers: BlockHeaders) {
+    controller.addNewBlockHeaders(connection, requestIdentifier, 
headers.headers)
   }
 
-  private suspend fun handleGetBlockHeaders(connection: WireConnection, 
blockHeaderRequest: GetBlockHeaders) {
+  private suspend fun handleGetBlockHeaders(connection: WireConnection, 
requestIdentifier: Bytes, blockHeaderRequest: GetBlockHeaders) {
     val headers = controller.findHeaders(
       blockHeaderRequest.block,
       blockHeaderRequest.maxHeaders,
       blockHeaderRequest.skip,
       blockHeaderRequest.reverse
     )
-    service.send(EthSubprotocol.ETH64, MessageType.BlockHeaders.code, 
connection, BlockHeaders(headers).toBytes())
+    service.send(
+      EthSubprotocol.ETH66, MessageType.BlockHeaders.code, connection,
+      RLP.encodeList {
+        it.writeValue(requestIdentifier)
+        it.writeRLP(BlockHeaders(headers).toBytes())
+      }
+    )
   }
 
   private suspend fun handleNewBlockHashes(message: NewBlockHashes) {
@@ -214,13 +246,18 @@ internal class EthHandler(
     val newPeer = PeerInfo()
     pendingStatus[connection.uri()] = newPeer
     service.send(
-      EthSubprotocol.ETH64, MessageType.Status.code, connection,
-      StatusMessage(
-        EthSubprotocol.ETH64.version(),
-        blockchainInfo.networkID(), blockchainInfo.totalDifficulty(),
-        blockchainInfo.bestHash(), blockchainInfo.genesisHash(), 
blockchainInfo.getLatestForkHash(),
-        blockchainInfo.getLatestFork()
-      ).toBytes()
+      EthSubprotocol.ETH66, MessageType.Status.code, connection,
+      RLP.encodeList {
+        it.writeValue(UInt64.random().toBytes())
+        it.writeRLP(
+          StatusMessage(
+            EthSubprotocol.ETH66.version(),
+            blockchainInfo.networkID(), blockchainInfo.totalDifficulty(),
+            blockchainInfo.bestHash(), blockchainInfo.genesisHash(), 
blockchainInfo.getLatestForkHash(),
+            blockchainInfo.getLatestFork()
+          ).toBytes()
+        )
+      }
     )
 
     return newPeer.ready
@@ -229,16 +266,3 @@ internal class EthHandler(
   override fun stop() = asyncCompletion {
   }
 }
-
-internal class PeerInfo() {
-
-  val ready: CompletableAsyncCompletion = AsyncCompletion.incomplete()
-
-  fun connect() {
-    ready.complete()
-  }
-
-  fun cancel() {
-    ready.cancel()
-  }
-}
diff --git 
a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthHelloSubprotocol.kt
 
b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthHelloSubprotocol.kt
index 5d6dbbc..e7dc548 100644
--- 
a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthHelloSubprotocol.kt
+++ 
b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthHelloSubprotocol.kt
@@ -32,10 +32,10 @@ class EthHelloSubprotocol(
 ) : SubProtocol {
 
   companion object {
-    val ETH62 = SubProtocolIdentifier.of("eth", 62)
-    val ETH63 = SubProtocolIdentifier.of("eth", 63)
-    val ETH64 = SubProtocolIdentifier.of("eth", 64)
-    val ETH65 = SubProtocolIdentifier.of("eth", 65)
+    val ETH62 = SubProtocolIdentifier.of("eth", 62, 8)
+    val ETH63 = SubProtocolIdentifier.of("eth", 63, 17)
+    val ETH64 = SubProtocolIdentifier.of("eth", 64, 17)
+    val ETH65 = SubProtocolIdentifier.of("eth", 65, 17)
   }
 
   override fun id(): SubProtocolIdentifier = ETH65
@@ -48,14 +48,6 @@ class EthHelloSubprotocol(
       )
   }
 
-  override fun versionRange(version: Int): Int {
-    return if (version == ETH62.version()) {
-      8
-    } else {
-      17
-    }
-  }
-
   override fun createHandler(service: RLPxService, client: SubProtocolClient): 
SubProtocolHandler {
     val controller = EthHelloController(listener)
     return EthHelloHandler(coroutineContext, blockchainInfo, service, 
controller)
@@ -63,7 +55,7 @@ class EthHelloSubprotocol(
 
   override fun getCapabilities() = mutableListOf(ETH62, ETH63, ETH64, ETH65)
 
-  override fun createClient(service: RLPxService): SubProtocolClient {
+  override fun createClient(service: RLPxService, identifier: 
SubProtocolIdentifier): SubProtocolClient {
     return EthHelloClient(service)
   }
 }
diff --git 
a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthRequestsManager.kt 
b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthRequestsManager.kt
index f5192d3..b598af3 100644
--- 
a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthRequestsManager.kt
+++ 
b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthRequestsManager.kt
@@ -16,7 +16,6 @@
  */
 package org.apache.tuweni.devp2p.eth
 
-import org.apache.tuweni.bytes.Bytes
 import org.apache.tuweni.concurrent.AsyncResult
 import org.apache.tuweni.concurrent.CompletableAsyncResult
 import org.apache.tuweni.eth.Block
@@ -128,41 +127,6 @@ interface EthRequestsManager {
   ): AsyncResult<List<List<TransactionReceipt>>>
 
   /**
-   * Checks if a request was made to get block headers
-   * @param connection the wire connection sending data
-   * @params headers list of block headers just received
-   * @return a handle to the completion of the operation, or null if no such 
request was placed
-   */
-  fun wasRequested(connection: WireConnection, headers: List<BlockHeader>): 
CompletableAsyncResult<List<BlockHeader>>?
-
-  /**
-   * Checks if a request was made to get block bodies
-   * @param connection the wire connection sending data
-   * @param bodies the bodies just received
-   * @return a handle to the completion of the operation, with metadata, or 
null if no such request was placed
-   */
-  fun wasRequested(connection: WireConnection, bodies: List<BlockBody>): 
Request<List<BlockBody>>?
-
-  /**
-   * Checks if a request was made to get node data
-   * @param connection the wire connection sending data
-   * @param elements the data just received
-   * @return a handle to the completion of the operation, with metadata, or 
null if no such request was placed
-   */
-  fun nodeDataWasRequested(connection: WireConnection, elements: 
List<Bytes?>): Request<List<Bytes?>>?
-
-  /**
-   * Checks if a request was made to get transaction receipts
-   * @param connection the wire connection sending data
-   * @param transactionReceipts the transaction receipts just received
-   * @return a handle to the completion of the operation, with metadata, or 
null if no such request was placed
-   */
-  fun transactionReceiptsRequested(
-    connection: WireConnection,
-    transactionReceipts: List<List<TransactionReceipt>>
-  ): Request<List<List<TransactionReceipt>>>?
-
-  /**
    * Submits a new pending transaction to the transaction pool to be gossiped 
to peers.
    * @param tx a new transaction
    */
diff --git 
a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthSubprotocol.kt 
b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthSubprotocol.kt
index 100835c..83b91f6 100644
--- a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthSubprotocol.kt
+++ b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthSubprotocol.kt
@@ -39,38 +39,43 @@ class EthSubprotocol(
 ) : SubProtocol {
 
   companion object {
-    val ETH62 = SubProtocolIdentifier.of("eth", 62)
-    val ETH63 = SubProtocolIdentifier.of("eth", 63)
-    val ETH64 = SubProtocolIdentifier.of("eth", 64)
-    val ETH65 = SubProtocolIdentifier.of("eth", 65)
+    val ETH62 = SubProtocolIdentifier.of("eth", 62, 8)
+    val ETH63 = SubProtocolIdentifier.of("eth", 63, 17)
+    val ETH64 = SubProtocolIdentifier.of("eth", 64, 17)
+    val ETH65 = SubProtocolIdentifier.of("eth", 65, 17)
+    val ETH66 = SubProtocolIdentifier.of("eth", 66, 17)
   }
 
-  override fun id(): SubProtocolIdentifier = ETH65
+  override fun id(): SubProtocolIdentifier = ETH66
 
   override fun supports(subProtocolIdentifier: SubProtocolIdentifier): Boolean 
{
     return "eth".equals(subProtocolIdentifier.name()) && (
       subProtocolIdentifier.version() == ETH62.version() ||
         subProtocolIdentifier.version() == ETH63.version() || 
subProtocolIdentifier.version() == ETH64.version() ||
-        subProtocolIdentifier.version() == ETH65.version()
+        subProtocolIdentifier.version() == ETH65.version() || 
subProtocolIdentifier.version() == ETH66.version()
       )
   }
 
-  override fun versionRange(version: Int): Int {
-    return if (version == ETH62.version()) {
-      8
-    } else {
-      17
-    }
-  }
-
   override fun createHandler(service: RLPxService, client: SubProtocolClient): 
SubProtocolHandler {
     val controller = EthController(repository, pendingTransactionsPool, client 
as EthRequestsManager, listener)
-    return EthHandler(coroutineContext, blockchainInfo, service, controller)
+    if (client is EthClient66) {
+      return EthHandler66(coroutineContext, blockchainInfo, service, 
controller)
+    } else {
+      return EthHandler(coroutineContext, blockchainInfo, service, controller)
+    }
   }
 
-  override fun getCapabilities() = mutableListOf(ETH62, ETH63, ETH64, ETH65)
+  override fun getCapabilities() = mutableListOf(ETH62, ETH63, ETH64, ETH65, 
ETH66)
 
-  override fun createClient(service: RLPxService): SubProtocolClient {
-    return EthClient(service, pendingTransactionsPool, 
selectionStrategy(service.repository()))
+  override fun createClient(service: RLPxService, identifier: 
SubProtocolIdentifier): SubProtocolClient {
+    if (identifier == ETH66) {
+      return EthClient66(
+        service,
+        pendingTransactionsPool,
+        selectionStrategy(service.repository())
+      )
+    } else {
+      return EthClient(service, pendingTransactionsPool, 
selectionStrategy(service.repository()))
+    }
   }
 }
diff --git 
a/devp2p-eth/src/test/kotlin/org/apache/tuweni/devp2p/eth/EthSubprotocolTest.kt 
b/devp2p-eth/src/test/kotlin/org/apache/tuweni/devp2p/eth/EthSubprotocolTest.kt
index e407c38..0d3cacc 100644
--- 
a/devp2p-eth/src/test/kotlin/org/apache/tuweni/devp2p/eth/EthSubprotocolTest.kt
+++ 
b/devp2p-eth/src/test/kotlin/org/apache/tuweni/devp2p/eth/EthSubprotocolTest.kt
@@ -17,6 +17,11 @@
 package org.apache.tuweni.devp2p.eth
 
 import org.apache.tuweni.bytes.Bytes32
+import org.apache.tuweni.devp2p.eth.EthHelloSubprotocol.Companion.ETH63
+import org.apache.tuweni.devp2p.eth.EthHelloSubprotocol.Companion.ETH64
+import org.apache.tuweni.devp2p.eth.EthSubprotocol.Companion.ETH62
+import org.apache.tuweni.devp2p.eth.EthSubprotocol.Companion.ETH65
+import org.apache.tuweni.devp2p.eth.EthSubprotocol.Companion.ETH66
 import org.apache.tuweni.eth.Hash
 import org.apache.tuweni.eth.repository.BlockchainRepository
 import org.apache.tuweni.eth.repository.MemoryTransactionPool
@@ -49,7 +54,7 @@ class EthSubprotocolTest {
       repository = repository,
       pendingTransactionsPool = MemoryTransactionPool()
     )
-    assertEquals(SubProtocolIdentifier.of("eth", 65), eth.id())
+    assertEquals(SubProtocolIdentifier.of("eth", 66), eth.id())
   }
 
   @Test
@@ -60,6 +65,7 @@ class EthSubprotocolTest {
       repository = repository,
       pendingTransactionsPool = MemoryTransactionPool()
     )
+    assertTrue(eth.supports(SubProtocolIdentifier.of("eth", 66)))
     assertTrue(eth.supports(SubProtocolIdentifier.of("eth", 65)))
     assertTrue(eth.supports(SubProtocolIdentifier.of("eth", 64)))
     assertTrue(eth.supports(SubProtocolIdentifier.of("eth", 63)))
@@ -70,15 +76,10 @@ class EthSubprotocolTest {
 
   @Test
   fun rangeCheck() {
-    val repository = BlockchainRepository.inMemory()
-    val eth = EthSubprotocol(
-      blockchainInfo = blockchainInfo,
-      repository = repository,
-      pendingTransactionsPool = MemoryTransactionPool()
-    )
-    assertEquals(8, eth.versionRange(62))
-    assertEquals(17, eth.versionRange(63))
-    assertEquals(17, eth.versionRange(64))
-    assertEquals(17, eth.versionRange(65))
+    assertEquals(8, ETH62.versionRange())
+    assertEquals(17, ETH63.versionRange())
+    assertEquals(17, ETH64.versionRange())
+    assertEquals(17, ETH65.versionRange())
+    assertEquals(17, ETH66.versionRange())
   }
 }
diff --git 
a/devp2p-proxy/src/main/kotlin/org/apache/tuweni/devp2p/proxy/ProxySubprotocol.kt
 
b/devp2p-proxy/src/main/kotlin/org/apache/tuweni/devp2p/proxy/ProxySubprotocol.kt
index e415558..0ba26fa 100644
--- 
a/devp2p-proxy/src/main/kotlin/org/apache/tuweni/devp2p/proxy/ProxySubprotocol.kt
+++ 
b/devp2p-proxy/src/main/kotlin/org/apache/tuweni/devp2p/proxy/ProxySubprotocol.kt
@@ -21,20 +21,18 @@ import org.apache.tuweni.rlpx.wire.SubProtocol
 import org.apache.tuweni.rlpx.wire.SubProtocolClient
 import org.apache.tuweni.rlpx.wire.SubProtocolIdentifier
 
-class ProxySubprotocol() : SubProtocol {
+class ProxySubprotocol : SubProtocol {
 
   companion object {
-    val ID = SubProtocolIdentifier.of("pxy", 1)
+    val ID = SubProtocolIdentifier.of("pxy", 1, 3)
   }
 
   override fun id() = ID
 
-  override fun supports(subProtocolIdentifier: SubProtocolIdentifier): Boolean 
= subProtocolIdentifier.equals(ID)
-
-  override fun versionRange(version: Int): Int = 3
+  override fun supports(subProtocolIdentifier: SubProtocolIdentifier): Boolean 
= subProtocolIdentifier == ID
 
   override fun createHandler(service: RLPxService, client: SubProtocolClient) =
     ProxyHandler(service = service, client = client as ProxyClient)
 
-  override fun createClient(service: RLPxService) = ProxyClient(service)
+  override fun createClient(service: RLPxService, subProtocolIdentifier: 
SubProtocolIdentifier) = ProxyClient(service)
 }
diff --git 
a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/EthereumClient.kt 
b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/EthereumClient.kt
index 493f82a..aa008be 100644
--- a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/EthereumClient.kt
+++ b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/EthereumClient.kt
@@ -27,8 +27,9 @@ import org.apache.lucene.store.NIOFSDirectory
 import org.apache.tuweni.concurrent.AsyncCompletion
 import org.apache.tuweni.concurrent.coroutines.await
 import org.apache.tuweni.crypto.SECP256K1
-import org.apache.tuweni.devp2p.eth.EthClient
+import org.apache.tuweni.devp2p.eth.EthRequestsManager
 import org.apache.tuweni.devp2p.eth.EthSubprotocol
+import org.apache.tuweni.devp2p.eth.EthSubprotocol.Companion.ETH66
 import org.apache.tuweni.devp2p.eth.SimpleBlockchainInformation
 import org.apache.tuweni.eth.genesis.GenesisFile
 import org.apache.tuweni.eth.repository.BlockchainIndex
@@ -163,36 +164,37 @@ class EthereumClient(
           adapter
         )
         services[rlpxConfig.getName()] = service
-        peerRepository.addIdentityListener {
-          service.connectTo(
-            it.publicKey(),
-            InetSocketAddress(it.networkInterface(), it.port())
+        service.start().thenRun {
+          logger.info("Started Ethereum client ${rlpxConfig.getName()}")
+          peerRepository.addIdentityListener {
+            service.connectTo(
+              it.publicKey(),
+              InetSocketAddress(it.networkInterface(), it.port())
+            )
+          }
+          val synchronizer = PeerStatusEthSynchronizer(
+            repository = repository,
+            client = service.getClient(ETH66) as EthRequestsManager,
+            peerRepository = peerRepository,
+            adapter = adapter
+          )
+          synchronizers[rlpxConfig.getName() + "status"] = synchronizer
+          synchronizer.start()
+          val parentSynchronizer = FromUnknownParentSynchronizer(
+            repository = repository,
+            client = service.getClient(ETH66) as EthRequestsManager,
+            peerRepository = peerRepository
+          )
+          synchronizers[rlpxConfig.getName() + "parent"] = parentSynchronizer
+          parentSynchronizer.start()
+          val bestSynchronizer = FromBestBlockSynchronizer(
+            repository = repository,
+            client = service.getClient(ETH66) as EthRequestsManager,
+            peerRepository = peerRepository
           )
+          synchronizers[rlpxConfig.getName() + "best"] = bestSynchronizer
+          bestSynchronizer.start()
         }
-        val synchronizer = PeerStatusEthSynchronizer(
-          repository = repository,
-          client = ethSubprotocol.createClient(service) as EthClient,
-          peerRepository = peerRepository,
-          adapter = adapter
-        )
-        synchronizers[rlpxConfig.getName() + "status"] = synchronizer
-        synchronizer.start()
-        val parentSynchronizer = FromUnknownParentSynchronizer(
-          repository = repository,
-          client = ethSubprotocol.createClient(service) as EthClient,
-          peerRepository = peerRepository
-        )
-        synchronizers[rlpxConfig.getName() + "parent"] = parentSynchronizer
-        parentSynchronizer.start()
-        val bestSynchronizer = FromBestBlockSynchronizer(
-          repository = repository,
-          client = ethSubprotocol.createClient(service) as EthClient,
-          peerRepository = peerRepository
-        )
-        synchronizers[rlpxConfig.getName() + "best"] = bestSynchronizer
-        bestSynchronizer.start()
-        logger.info("Started Ethereum client ${rlpxConfig.getName()}")
-        service.start()
       }
     ).await()
 
diff --git 
a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/FromBestBlockSynchronizer.kt
 
b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/FromBestBlockSynchronizer.kt
index 7337455..56d92ad 100644
--- 
a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/FromBestBlockSynchronizer.kt
+++ 
b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/FromBestBlockSynchronizer.kt
@@ -19,7 +19,7 @@ package org.apache.tuweni.ethclient
 import kotlinx.coroutines.asCoroutineDispatcher
 import kotlinx.coroutines.delay
 import kotlinx.coroutines.launch
-import org.apache.tuweni.devp2p.eth.EthClient
+import org.apache.tuweni.devp2p.eth.EthRequestsManager
 import org.apache.tuweni.eth.repository.BlockchainRepository
 import java.util.concurrent.ExecutorService
 import java.util.concurrent.Executors
@@ -31,7 +31,7 @@ class FromBestBlockSynchronizer(
   executor: ExecutorService = Executors.newSingleThreadExecutor(),
   coroutineContext: CoroutineContext = executor.asCoroutineDispatcher(),
   repository: BlockchainRepository,
-  client: EthClient,
+  client: EthRequestsManager,
   peerRepository: EthereumPeerRepository
 ) : Synchronizer(executor, coroutineContext, repository, client, 
peerRepository) {
 
diff --git 
a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/FromUnknownParentSynchronizer.kt
 
b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/FromUnknownParentSynchronizer.kt
index edccb21..f9d07cf 100644
--- 
a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/FromUnknownParentSynchronizer.kt
+++ 
b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/FromUnknownParentSynchronizer.kt
@@ -19,7 +19,7 @@ package org.apache.tuweni.ethclient
 import kotlinx.coroutines.asCoroutineDispatcher
 import kotlinx.coroutines.delay
 import kotlinx.coroutines.launch
-import org.apache.tuweni.devp2p.eth.EthClient
+import org.apache.tuweni.devp2p.eth.EthRequestsManager
 import org.apache.tuweni.eth.BlockHeader
 import org.apache.tuweni.eth.repository.BlockchainRepository
 import java.util.concurrent.ExecutorService
@@ -33,7 +33,7 @@ class FromUnknownParentSynchronizer(
   executor: ExecutorService = Executors.newSingleThreadExecutor(),
   coroutineContext: CoroutineContext = executor.asCoroutineDispatcher(),
   repository: BlockchainRepository,
-  client: EthClient,
+  client: EthRequestsManager,
   peerRepository: EthereumPeerRepository
 ) : Synchronizer(executor, coroutineContext, repository, client, 
peerRepository) {
 
diff --git 
a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/PeerStatusEthSynchronizer.kt
 
b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/PeerStatusEthSynchronizer.kt
index 101eb6f..605aabe 100644
--- 
a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/PeerStatusEthSynchronizer.kt
+++ 
b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/PeerStatusEthSynchronizer.kt
@@ -18,7 +18,7 @@ package org.apache.tuweni.ethclient
 
 import kotlinx.coroutines.asCoroutineDispatcher
 import kotlinx.coroutines.launch
-import org.apache.tuweni.devp2p.eth.EthClient
+import org.apache.tuweni.devp2p.eth.EthRequestsManager
 import org.apache.tuweni.eth.Hash
 import org.apache.tuweni.eth.repository.BlockchainRepository
 import java.util.concurrent.ExecutorService
@@ -37,7 +37,7 @@ class PeerStatusEthSynchronizer(
   executor: ExecutorService = Executors.newSingleThreadExecutor(),
   coroutineContext: CoroutineContext = executor.asCoroutineDispatcher(),
   repository: BlockchainRepository,
-  client: EthClient,
+  client: EthRequestsManager,
   peerRepository: EthereumPeerRepository,
   private val adapter: WireConnectionPeerRepositoryAdapter
 ) : Synchronizer(executor, coroutineContext, repository, client, 
peerRepository) {
diff --git 
a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/Synchronizer.kt 
b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/Synchronizer.kt
index 8cb1e3c..4f8cb01 100644
--- a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/Synchronizer.kt
+++ b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/Synchronizer.kt
@@ -19,7 +19,7 @@ package org.apache.tuweni.ethclient
 import kotlinx.coroutines.CoroutineScope
 import kotlinx.coroutines.asCoroutineDispatcher
 import kotlinx.coroutines.launch
-import org.apache.tuweni.devp2p.eth.EthClient
+import org.apache.tuweni.devp2p.eth.EthRequestsManager
 import org.apache.tuweni.eth.BlockHeader
 import org.apache.tuweni.eth.Hash
 import org.apache.tuweni.eth.repository.BlockchainRepository
@@ -34,7 +34,7 @@ abstract class Synchronizer(
   val executor: ExecutorService = Executors.newFixedThreadPool(1),
   override val coroutineContext: CoroutineContext = 
executor.asCoroutineDispatcher(),
   val repository: BlockchainRepository,
-  val client: EthClient,
+  val client: EthRequestsManager,
   val peerRepository: EthereumPeerRepository
 ) : CoroutineScope {
   abstract fun start()
diff --git a/les/src/main/kotlin/org/apache/tuweni/les/LESSubProtocolHandler.kt 
b/les/src/main/kotlin/org/apache/tuweni/les/LESSubProtocolHandler.kt
index 2d9d442..ce7d592 100644
--- a/les/src/main/kotlin/org/apache/tuweni/les/LESSubProtocolHandler.kt
+++ b/les/src/main/kotlin/org/apache/tuweni/les/LESSubProtocolHandler.kt
@@ -128,11 +128,11 @@ internal class LESSubProtocolHandler(
     conn: WireConnection,
     blockBodiesMessage: BlockBodiesMessage
   ) {
-    controller.addNewBlockBodies(conn, blockBodiesMessage.blockBodies)
+    controller.addNewBlockBodies(conn, null, blockBodiesMessage.blockBodies)
   }
 
   private suspend fun handleBlockHeadersMessage(connection: WireConnection, 
blockHeadersMessage: BlockHeadersMessage) {
-    controller.addNewBlockHeaders(connection, blockHeadersMessage.blockHeaders)
+    controller.addNewBlockHeaders(connection, null, 
blockHeadersMessage.blockHeaders)
   }
 
   private suspend fun handleGetBlockHeaders(
diff --git a/les/src/main/kotlin/org/apache/tuweni/les/LESSubprotocol.kt 
b/les/src/main/kotlin/org/apache/tuweni/les/LESSubprotocol.kt
index 09ad2fb..bf41f27 100644
--- a/les/src/main/kotlin/org/apache/tuweni/les/LESSubprotocol.kt
+++ b/les/src/main/kotlin/org/apache/tuweni/les/LESSubprotocol.kt
@@ -68,48 +68,18 @@ class LESSubprotocol(
   private val listener: (WireConnection, Status) -> Unit = { _, _ -> }
 ) : SubProtocol {
 
-  /**
-   * Creates a new client for the subprotocol.
-   *
-   * @param service the rlpx service that will use the handler
-   * @return a new client for the subprotocol, bound to the service.
-   */
-  override fun createClient(service: RLPxService): SubProtocolClient {
+  override fun createClient(service: RLPxService, subProtocolIdentifier: 
SubProtocolIdentifier): SubProtocolClient {
     return EthClient(service, pendingTransactionsPool, 
connectionSelectionStrategy)
   }
 
-  /**
-   * @return the identifier of the subprotocol
-   */
   override fun id(): SubProtocolIdentifier {
     return LES_ID
   }
 
-  /**
-   * @param subProtocolIdentifier the identifier of the subprotocol
-   * @return true if the subprotocol ID and version are supported, false 
otherwise
-   */
   override fun supports(subProtocolIdentifier: SubProtocolIdentifier): Boolean 
{
     return "les" == subProtocolIdentifier.name() && 
subProtocolIdentifier.version() == 2
   }
 
-  /**
-   * Provides the length of the range of message types supported by the 
subprotocol for a given version
-   *
-   * @param version the version of the subprotocol to associate with the range
-   * @return the length of the range of message types supported by the 
subprotocol for a given version
-   */
-  override fun versionRange(version: Int): Int {
-    return 21
-  }
-
-  /**
-   * Creates a new handler for the subprotocol.
-   *
-   * @param service the rlpx service that will use the handler
-   * @param client the subprotocol client
-   * @return a new handler for the subprotocol, bound to the service.
-   */
   override fun createHandler(service: RLPxService, client: SubProtocolClient): 
SubProtocolHandler {
     val controller = EthController(repo, pendingTransactionsPool, client as 
EthRequestsManager, listener)
     return LESSubProtocolHandler(
@@ -128,6 +98,6 @@ class LESSubprotocol(
   }
 
   companion object {
-    internal val LES_ID = SubProtocolIdentifier.of("les", 2)
+    internal val LES_ID = SubProtocolIdentifier.of("les", 2, 21)
   }
 }
diff --git a/rlp/src/main/java/org/apache/tuweni/rlp/BytesRLPReader.java 
b/rlp/src/main/java/org/apache/tuweni/rlp/BytesRLPReader.java
index 0d43c43..8a4bec0 100644
--- a/rlp/src/main/java/org/apache/tuweni/rlp/BytesRLPReader.java
+++ b/rlp/src/main/java/org/apache/tuweni/rlp/BytesRLPReader.java
@@ -33,6 +33,15 @@ final class BytesRLPReader implements RLPReader {
   }
 
   @Override
+  public Bytes readRemaining() {
+    // The unread bytes begin at the read offset (index), not at the unread byte count.
+    if (content.size() == index) {
+      return Bytes.EMPTY;
+    }
+    return content.slice(index);
+  }
+
+  @Override
   public Bytes readValue(boolean lenient) {
     int remaining = content.size() - index;
     if (remaining == 0) {
diff --git a/rlp/src/main/java/org/apache/tuweni/rlp/RLPReader.java 
b/rlp/src/main/java/org/apache/tuweni/rlp/RLPReader.java
index 05e279f..e8a38da 100644
--- a/rlp/src/main/java/org/apache/tuweni/rlp/RLPReader.java
+++ b/rlp/src/main/java/org/apache/tuweni/rlp/RLPReader.java
@@ -406,4 +406,11 @@ public interface RLPReader {
    * @return current reader position
    */
   int position();
+
+  /**
+   * Provides the bytes of the input that have not been read yet.
+   * 
+   * @return the bytes from the current reader position to the end of the input
+   */
+  Bytes readRemaining();
 }
diff --git a/rlp/src/test/java/org/apache/tuweni/rlp/BytesRLPReaderTest.java 
b/rlp/src/test/java/org/apache/tuweni/rlp/BytesRLPReaderTest.java
index dfed5e0..6b0d245 100644
--- a/rlp/src/test/java/org/apache/tuweni/rlp/BytesRLPReaderTest.java
+++ b/rlp/src/test/java/org/apache/tuweni/rlp/BytesRLPReaderTest.java
@@ -219,4 +219,15 @@ class BytesRLPReaderTest {
     });
     assertEquals(expected, result);
   }
+
+  @Test
+  void shouldReadRemaining() {
+    Bytes input = Bytes.fromHexString("83646f6783646f67");
+    RLP.decode(input, reader -> {
+      reader.readValue();
+      assertEquals(4, reader.position());
+      assertEquals(Bytes.fromHexString("83646f67"), reader.readRemaining());
+      return null;
+    });
+  }
 }
diff --git 
a/rlpx/src/integrationTest/java/org/apache/tuweni/rlpx/vertx/VertxAcceptanceTest.java
 
b/rlpx/src/integrationTest/java/org/apache/tuweni/rlpx/vertx/VertxAcceptanceTest.java
index a2c309e..bc2c074 100644
--- 
a/rlpx/src/integrationTest/java/org/apache/tuweni/rlpx/vertx/VertxAcceptanceTest.java
+++ 
b/rlpx/src/integrationTest/java/org/apache/tuweni/rlpx/vertx/VertxAcceptanceTest.java
@@ -13,6 +13,7 @@
 package org.apache.tuweni.rlpx.vertx;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import org.apache.tuweni.bytes.Bytes;
@@ -79,13 +80,13 @@ class VertxAcceptanceTest {
     }
   }
 
-  private static class MyCustomSubProtocol implements SubProtocol {
+  static class MyCustomSubProtocol implements SubProtocol {
 
     public MyCustomSubProtocolHandler handler;
 
     @Override
     public SubProtocolIdentifier id() {
-      return SubProtocolIdentifier.of("cus", 1);
+      return SubProtocolIdentifier.of("cus", 1, 1);
     }
 
     @Override
@@ -94,18 +95,13 @@ class VertxAcceptanceTest {
     }
 
     @Override
-    public int versionRange(int version) {
-      return 1;
-    }
-
-    @Override
     public SubProtocolHandler createHandler(RLPxService service, 
SubProtocolClient client) {
       handler = new MyCustomSubProtocolHandler(service, id());
       return handler;
     }
 
     @Override
-    public SubProtocolClient createClient(RLPxService service) {
+    public SubProtocolClient createClient(RLPxService service, 
SubProtocolIdentifier identifier) {
       return null;
     }
   }
@@ -134,12 +130,14 @@ class VertxAcceptanceTest {
     secondService.start().join();
 
     try {
-      service.connectTo(secondKp.publicKey(), new 
InetSocketAddress("localhost", secondService.actualPort()));
-
-      Thread.sleep(3000);
+      WireConnection conn =
+          service.connectTo(secondKp.publicKey(), new 
InetSocketAddress("localhost", secondService.actualPort())).get();
+      assertNotNull(conn);
+      assertEquals(1, conn.agreedSubprotocols().size());
       assertEquals(1, repository.asMap().size());
       assertEquals(1, secondRepository.asMap().size());
 
+      Thread.sleep(1000);
       assertEquals(1, sp.handler.messages.size());
       assertEquals(1, secondSp.handler.messages.size());
 
@@ -176,11 +174,13 @@ class VertxAcceptanceTest {
     secondService.start().join();
 
     try {
-      service.connectTo(secondKp.publicKey(), new 
InetSocketAddress("localhost", secondService.actualPort()));
-
-      Thread.sleep(3000);
+      WireConnection conn =
+          service.connectTo(secondKp.publicKey(), new 
InetSocketAddress("localhost", secondService.actualPort())).get();
+      assertNotNull(conn);
+      assertEquals(1, conn.agreedSubprotocols().size());
       assertEquals(1, repository.asMap().size());
       assertEquals(1, secondRepository.asMap().size());
+      Thread.sleep(1000);
 
       assertEquals(1, sp.handler.messages.size());
       assertEquals(1, secondSp.handler.messages.size());
@@ -238,6 +238,11 @@ class VertxAcceptanceTest {
               public int version() {
                 return 63;
               }
+
+              @Override
+              public int versionRange() {
+                return 8;
+              }
             };
           }
 
@@ -247,17 +252,12 @@ class VertxAcceptanceTest {
           }
 
           @Override
-          public int versionRange(int version) {
-            return 0;
-          }
-
-          @Override
           public SubProtocolHandler createHandler(RLPxService service, 
SubProtocolClient client) {
             return null;
           }
 
           @Override
-          public SubProtocolClient createClient(RLPxService service) {
+          public SubProtocolClient createClient(RLPxService service, 
SubProtocolIdentifier identifier) {
             return null;
           }
         }), "Client 1", repository);
diff --git 
a/rlpx/src/integrationTest/java/org/apache/tuweni/rlpx/vertx/VertxRLPxServiceTest.java
 
b/rlpx/src/integrationTest/java/org/apache/tuweni/rlpx/vertx/VertxRLPxServiceTest.java
index 0ed9795..6694ae3 100644
--- 
a/rlpx/src/integrationTest/java/org/apache/tuweni/rlpx/vertx/VertxRLPxServiceTest.java
+++ 
b/rlpx/src/integrationTest/java/org/apache/tuweni/rlpx/vertx/VertxRLPxServiceTest.java
@@ -37,8 +37,10 @@ import org.apache.tuweni.rlpx.wire.WireConnection;
 
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import io.vertx.core.Vertx;
@@ -112,7 +114,7 @@ class VertxRLPxServiceTest {
   }
 
   @Test
-  void connectToOtherPeer(@VertxInstance Vertx vertx) throws Exception {
+  void connectToOtherPeerWithNoSubProtocols(@VertxInstance Vertx vertx) throws 
Exception {
     SECP256K1.KeyPair ourPair = SECP256K1.KeyPair.random();
     SECP256K1.KeyPair peerPair = SECP256K1.KeyPair.random();
     VertxRLPxService service = new VertxRLPxService(vertx, 0, "localhost", 
10000, ourPair, new ArrayList<>(), "abc");
@@ -122,23 +124,23 @@ class VertxRLPxServiceTest {
         new VertxRLPxService(vertx, 0, "localhost", 10000, peerPair, new 
ArrayList<>(), "abc");
     peerService.start().join();
 
-    try {
+    assertThrows(CancellationException.class, () -> {
       service.connectTo(peerPair.publicKey(), new 
InetSocketAddress("127.0.0.1", peerService.actualPort())).get();
-    } finally {
-      service.stop();
-      peerService.stop();
-    }
+    });
+    service.stop();
+    peerService.stop();
   }
 
   @Test
   void disconnectAfterStop(@VertxInstance Vertx vertx) throws Exception {
     SECP256K1.KeyPair ourPair = SECP256K1.KeyPair.random();
     SECP256K1.KeyPair peerPair = SECP256K1.KeyPair.random();
-    VertxRLPxService service = new VertxRLPxService(vertx, 0, "localhost", 
10000, ourPair, new ArrayList<>(), "abc");
+    List<SubProtocol> protocols = Arrays.asList(new 
VertxAcceptanceTest.MyCustomSubProtocol());
+    VertxRLPxService service = new VertxRLPxService(vertx, 0, "localhost", 
10000, ourPair, protocols, "abc");
     service.start().join();
 
-    VertxRLPxService peerService =
-        new VertxRLPxService(vertx, 0, "localhost", 10000, peerPair, new 
ArrayList<>(), "abc");
+
+    VertxRLPxService peerService = new VertxRLPxService(vertx, 0, "localhost", 
10000, peerPair, protocols, "abc");
     peerService.start().join();
 
     WireConnection conn = null;
@@ -166,7 +168,7 @@ class VertxRLPxServiceTest {
     List<SubProtocol> protocols = Collections.singletonList(new SubProtocol() {
       @Override
       public SubProtocolIdentifier id() {
-        return SubProtocolIdentifier.of("eth", 63);
+        return SubProtocolIdentifier.of("eth", 63, 17);
       }
 
       @Override
@@ -175,19 +177,15 @@ class VertxRLPxServiceTest {
       }
 
       @Override
-      public int versionRange(int version) {
-        return 0;
-      }
-
-      @Override
       public SubProtocolHandler createHandler(RLPxService service, 
SubProtocolClient client) {
         SubProtocolHandler handler = mock(SubProtocolHandler.class);
+        
when(handler.handleNewPeerConnection(any())).thenReturn(AsyncCompletion.COMPLETED);
         when(handler.stop()).thenReturn(AsyncCompletion.COMPLETED);
         return handler;
       }
 
       @Override
-      public SubProtocolClient createClient(RLPxService service) {
+      public SubProtocolClient createClient(RLPxService service, 
SubProtocolIdentifier identifier) {
         return mock(SubProtocolClient.class);
       }
     });
@@ -232,8 +230,9 @@ class VertxRLPxServiceTest {
   void getClientWeCreate(@VertxInstance Vertx vertx) throws Exception {
     SubProtocol sp = mock(SubProtocol.class);
     SubProtocolClient client = mock(SubProtocolClient.class);
-    when(sp.id()).thenReturn(SubProtocolIdentifier.of("foo", 1));
-    when(sp.createClient(any())).thenReturn(client);
+    when(sp.id()).thenReturn(SubProtocolIdentifier.of("foo", 1, 2));
+    
when(sp.getCapabilities()).thenReturn(Arrays.asList(SubProtocolIdentifier.of("foo",
 1, 2)));
+    when(sp.createClient(any(), any())).thenReturn(client);
     MemoryWireConnectionsRepository peerRepository = new 
MemoryWireConnectionsRepository();
     VertxRLPxService peerService = new VertxRLPxService(
         vertx,
diff --git a/rlpx/src/main/java/org/apache/tuweni/rlpx/vertx/VertxRLPxService.java b/rlpx/src/main/java/org/apache/tuweni/rlpx/vertx/VertxRLPxService.java
index 8effb49..930b1be 100644
--- a/rlpx/src/main/java/org/apache/tuweni/rlpx/vertx/VertxRLPxService.java
+++ b/rlpx/src/main/java/org/apache/tuweni/rlpx/vertx/VertxRLPxService.java
@@ -39,7 +39,6 @@ import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
@@ -72,8 +71,8 @@ public final class VertxRLPxService implements RLPxService {
   private final String clientId;
   private final WireConnectionRepository repository;
 
-  private LinkedHashMap<SubProtocol, SubProtocolHandler> handlers;
-  private LinkedHashMap<SubProtocol, SubProtocolClient> clients;
+  private LinkedHashMap<SubProtocolIdentifier, SubProtocolHandler> handlers;
+  private LinkedHashMap<SubProtocolIdentifier, SubProtocolClient> clients;
   private NetClient client;
   private NetServer server;
 
@@ -175,9 +174,14 @@ public final class VertxRLPxService implements RLPxService 
{
       clients = new LinkedHashMap<>();
 
       for (SubProtocol subProtocol : subProtocols) {
-        SubProtocolClient client = subProtocol.createClient(this);
-        clients.put(subProtocol, client);
-        handlers.put(subProtocol, subProtocol.createHandler(this, client));
+        for (SubProtocolIdentifier identifier : subProtocol.getCapabilities()) 
{
+          if (identifier.versionRange() == 0) {
+            throw new IllegalArgumentException("Invalid subprotocol " + 
identifier.toString());
+          }
+          SubProtocolClient client = subProtocol.createClient(this, 
identifier);
+          clients.put(identifier, client);
+          handlers.put(identifier, subProtocol.createHandler(this, client));
+        }
       }
 
       client = vertx.createNetClient(new NetClientOptions());
@@ -303,12 +307,7 @@ public final class VertxRLPxService implements RLPxService 
{
     if (!started.get()) {
       throw new IllegalStateException("The RLPx service is not active");
     }
-    for (Map.Entry<SubProtocol, SubProtocolClient> clientEntry : 
clients.entrySet()) {
-      if (clientEntry.getKey().id().equals(subProtocolIdentifier)) {
-        return clientEntry.getValue();
-      }
-    }
-    return null;
+    return clients.get(subProtocolIdentifier);
   }
 
   @Override
diff --git a/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/DefaultSubProtocolIdentifier.java b/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/DefaultSubProtocolIdentifier.java
index 85017d8..efd731d 100644
--- a/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/DefaultSubProtocolIdentifier.java
+++ b/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/DefaultSubProtocolIdentifier.java
@@ -21,10 +21,12 @@ final class DefaultSubProtocolIdentifier implements 
SubProtocolIdentifier {
 
   private final String name;
   private final int version;
+  private final int range;
 
-  DefaultSubProtocolIdentifier(String name, int version) {
+  DefaultSubProtocolIdentifier(String name, int version, int range) {
     this.name = name;
     this.version = version;
+    this.range = range;
   }
 
   @Override
@@ -38,6 +40,11 @@ final class DefaultSubProtocolIdentifier implements 
SubProtocolIdentifier {
   }
 
   @Override
+  public int versionRange() {
+    return range;
+  }
+
+  @Override
   public boolean equals(Object o) {
     if (this == o)
       return true;
@@ -51,4 +58,9 @@ final class DefaultSubProtocolIdentifier implements 
SubProtocolIdentifier {
   public int hashCode() {
     return Objects.hash(name, version);
   }
+
+  @Override
+  public String toString() {
+    return "DefaultSubProtocolIdentifier{" + "name='" + name + '\'' + ", 
version=" + version + '}';
+  }
 }
diff --git a/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/DefaultWireConnection.java b/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/DefaultWireConnection.java
index 73e5432..f05a607 100644
--- a/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/DefaultWireConnection.java
+++ b/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/DefaultWireConnection.java
@@ -46,7 +46,7 @@ public final class DefaultWireConnection implements 
WireConnection {
   private final Consumer<RLPxMessage> writer;
   private final Consumer<HelloMessage> afterHandshakeListener;
   private final Runnable disconnectHandler;
-  private final LinkedHashMap<SubProtocol, SubProtocolHandler> subprotocols;
+  private final LinkedHashMap<SubProtocolIdentifier, SubProtocolHandler> 
subprotocols;
   private final int p2pVersion;
   private final String clientId;
   private final int advertisedPort;
@@ -57,7 +57,7 @@ public final class DefaultWireConnection implements 
WireConnection {
   private CompletableAsyncCompletion awaitingPong;
   private HelloMessage myHelloMessage;
   private HelloMessage peerHelloMessage;
-  private RangeMap<Integer, SubProtocol> subprotocolRangeMap = 
TreeRangeMap.create();
+  private RangeMap<Integer, SubProtocolIdentifier> subprotocolRangeMap = 
TreeRangeMap.create();
   private DisconnectReason disconnectReason;
   private boolean disconnectRequested;
   private boolean disconnectReceived;
@@ -85,7 +85,7 @@ public final class DefaultWireConnection implements 
WireConnection {
       Consumer<RLPxMessage> writer,
       Consumer<HelloMessage> afterHandshakeListener,
       Runnable disconnectHandler,
-      LinkedHashMap<SubProtocol, SubProtocolHandler> subprotocols,
+      LinkedHashMap<SubProtocolIdentifier, SubProtocolHandler> subprotocols,
       int p2pVersion,
       String clientId,
       int advertisedPort,
@@ -136,6 +136,17 @@ public final class DefaultWireConnection implements 
WireConnection {
         return;
       }
 
+      if (subprotocolRangeMap.asMapOfRanges().isEmpty()) {
+        logger
+            .debug(
+                "Useless peer detected, caps {}, our caps {}",
+                peerHelloMessage.capabilities(),
+                subprotocols.keySet());
+        disconnect(DisconnectReason.USELESS_PEER);
+        ready.cancel();
+        return;
+      }
+
       if (myHelloMessage == null) {
         sendHello();
       }
@@ -183,7 +194,8 @@ public final class DefaultWireConnection implements 
WireConnection {
         awaitingPong.complete();
       }
     } else {
-      Map.Entry<Range<Integer>, SubProtocol> subProtocolEntry = 
subprotocolRangeMap.getEntry(message.messageId());
+      Map.Entry<Range<Integer>, SubProtocolIdentifier> subProtocolEntry =
+          subprotocolRangeMap.getEntry(message.messageId());
       if (subProtocolEntry == null) {
         logger.debug("Unknown message received {}", message.messageId());
         disconnect(DisconnectReason.PROTOCOL_BREACH);
@@ -203,27 +215,29 @@ public final class DefaultWireConnection implements 
WireConnection {
     }
   }
 
-  private void initSupportedRange(List<Capability> capabilities) {
+  void initSupportedRange(List<Capability> capabilities) {
     int startRange = 16;
-    Map<String, Capability> pickedCapabilities = new HashMap<>();
-    for (SubProtocol sp : subprotocols.keySet()) {
+    Map<String, SubProtocolIdentifier> pickedCapabilities = new HashMap<>();
+    // find the max capability supported by the subprotocol
+    for (SubProtocolIdentifier sp : subprotocols.keySet()) {
       for (Capability cap : capabilities) {
-        if (sp.supports(SubProtocolIdentifier.of(cap.name(), cap.version()))) {
-          Capability oldPick = pickedCapabilities.get(cap.name());
+        if (sp.equals(SubProtocolIdentifier.of(cap.name(), cap.version()))) {
+          SubProtocolIdentifier oldPick = pickedCapabilities.get(cap.name());
           if (oldPick == null || oldPick.version() < cap.version()) {
-            pickedCapabilities.put(cap.name(), cap);
+            pickedCapabilities.put(cap.name(), sp);
           }
         }
       }
     }
 
     for (Capability cap : capabilities) {
-      if (!pickedCapabilities.containsValue(cap)) {
+      SubProtocolIdentifier capSp = SubProtocolIdentifier.of(cap.name(), 
cap.version());
+      if (!pickedCapabilities.get(cap.name()).equals(capSp)) {
         continue;
       }
-      for (SubProtocol sp : subprotocols.keySet()) {
-        if (sp.supports(SubProtocolIdentifier.of(cap.name(), cap.version()))) {
-          int numberOfMessageTypes = sp.versionRange(cap.version());
+      for (SubProtocolIdentifier sp : subprotocols.keySet()) {
+        if (sp.equals(capSp)) {
+          int numberOfMessageTypes = sp.versionRange();
           subprotocolRangeMap.put(Range.closedOpen(startRange, startRange + 
numberOfMessageTypes), sp);
           startRange += numberOfMessageTypes;
           break;
@@ -273,8 +287,6 @@ public final class DefaultWireConnection implements 
WireConnection {
             subprotocols
                 .keySet()
                 .stream()
-                .map(SubProtocol::getCapabilities)
-                .flatMap(subProtocolIdentifiers -> 
subProtocolIdentifiers.stream())
                 .map(
                     subProtocolIdentifier -> new Capability(
                         subProtocolIdentifier.name(),
@@ -286,8 +298,8 @@ public final class DefaultWireConnection implements 
WireConnection {
 
   @Override
   public boolean supports(SubProtocolIdentifier subProtocolIdentifier) {
-    for (SubProtocol sp : subprotocolRangeMap.asMapOfRanges().values()) {
-      if (sp.supports(subProtocolIdentifier)) {
+    for (SubProtocolIdentifier sp : 
subprotocolRangeMap.asMapOfRanges().values()) {
+      if (sp.equals(subProtocolIdentifier)) {
         return true;
       }
     }
@@ -297,8 +309,8 @@ public final class DefaultWireConnection implements 
WireConnection {
   @Override
   public Collection<SubProtocolIdentifier> agreedSubprotocols() {
     List<SubProtocolIdentifier> identifiers = new ArrayList<>();
-    for (SubProtocol sp : subprotocolRangeMap.asMapOfRanges().values()) {
-      identifiers.addAll(sp.getCapabilities());
+    for (SubProtocolIdentifier sp : 
subprotocolRangeMap.asMapOfRanges().values()) {
+      identifiers.add(sp);
     }
     return identifiers;
   }
@@ -306,8 +318,8 @@ public final class DefaultWireConnection implements 
WireConnection {
   public void sendMessage(SubProtocolIdentifier subProtocolIdentifier, int 
messageType, Bytes message) {
     logger.trace("Sending sub-protocol message {} {}", messageType, message);
     Integer offset = null;
-    for (Map.Entry<Range<Integer>, SubProtocol> entry : 
subprotocolRangeMap.asMapOfRanges().entrySet()) {
-      if (entry.getValue().supports(subProtocolIdentifier)) {
+    for (Map.Entry<Range<Integer>, SubProtocolIdentifier> entry : 
subprotocolRangeMap.asMapOfRanges().entrySet()) {
+      if (entry.getValue().equals(subProtocolIdentifier)) {
         offset = entry.getKey().lowerEndpoint();
         break;
       }
diff --git a/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/SubProtocol.java b/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/SubProtocol.java
index d48b70d..daae3a2 100644
--- a/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/SubProtocol.java
+++ b/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/SubProtocol.java
@@ -39,14 +39,6 @@ public interface SubProtocol {
   boolean supports(SubProtocolIdentifier subProtocolIdentifier);
 
   /**
-   * Provides the length of the range of message types supported by the 
subprotocol for a given version
-   *
-   * @param version the version of the subprotocol to associate with the range
-   * @return the length of the range of message types supported by the 
subprotocol for a given version
-   */
-  int versionRange(int version);
-
-  /**
    * Creates a new handler for the subprotocol.
    *
    * @param service the rlpx service that will use the handler
@@ -59,9 +51,10 @@ public interface SubProtocol {
    * Creates a new client for the subprotocol.
    *
    * @param service the rlpx service that will use the handler
+   * @param identifier the version of the subprotocol
    * @return a new client for the subprotocol, bound to the service.
    */
-  SubProtocolClient createClient(RLPxService service);
+  SubProtocolClient createClient(RLPxService service, SubProtocolIdentifier 
identifier);
 
   /**
    * Provides the capabilities supported by the subprotocol.
diff --git a/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/SubProtocolIdentifier.java b/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/SubProtocolIdentifier.java
index 29cb628..8546789 100644
--- a/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/SubProtocolIdentifier.java
+++ b/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/SubProtocolIdentifier.java
@@ -21,8 +21,12 @@ import static java.util.Objects.requireNonNull;
 public interface SubProtocolIdentifier {
 
   static SubProtocolIdentifier of(String name, int version) {
+    return of(name, version, 0);
+  }
+
+  static SubProtocolIdentifier of(String name, int version, int range) {
     requireNonNull(name);
-    return new DefaultSubProtocolIdentifier(name, version);
+    return new DefaultSubProtocolIdentifier(name, version, range);
   }
 
   /**
@@ -38,4 +42,11 @@ public interface SubProtocolIdentifier {
    * @return the version of the subprotocol
    */
   int version();
+
+  /**
+   * Provides the length of the range of message types supported by the 
subprotocol for a given version
+   *
+   * @return the length of the range of message types supported by the 
subprotocol for a given version
+   */
+  int versionRange();
 }
diff --git a/rlpx/src/test/java/org/apache/tuweni/rlpx/wire/DefaultWireConnectionTest.java b/rlpx/src/test/java/org/apache/tuweni/rlpx/wire/DefaultWireConnectionTest.java
index 9cf7719..f2d270e 100644
--- a/rlpx/src/test/java/org/apache/tuweni/rlpx/wire/DefaultWireConnectionTest.java
+++ b/rlpx/src/test/java/org/apache/tuweni/rlpx/wire/DefaultWireConnectionTest.java
@@ -13,6 +13,7 @@
 package org.apache.tuweni.rlpx.wire;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.mock;
 
 import org.apache.tuweni.bytes.Bytes;
 import org.apache.tuweni.concurrent.AsyncResult;
@@ -20,8 +21,10 @@ import org.apache.tuweni.crypto.SECP256K1;
 import org.apache.tuweni.junit.BouncyCastleExtension;
 import org.apache.tuweni.rlpx.RLPxMessage;
 
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.junit.jupiter.api.Test;
@@ -170,4 +173,18 @@ class DefaultWireConnectionTest {
 
     assertEquals(DisconnectReason.INCOMPATIBLE_DEVP2P_VERSION.code, 
msg.reason());
   }
+
+  @Test
+  void testCapabilitiesNegotiation() {
+    SubProtocolIdentifier cus = SubProtocolIdentifier.of("cus", 1, 1);
+    LinkedHashMap<SubProtocolIdentifier, SubProtocolHandler> subprotocols = 
new LinkedHashMap<>();
+    subprotocols.put(cus, mock(SubProtocolHandler.class));
+    DefaultWireConnection conn = new DefaultWireConnection(nodeId, peerNodeId, 
disconnect -> {
+    }, helloMessage -> {
+    }, () -> {
+    }, subprotocols, 5, "abc", 10000, AsyncResult.incomplete(), "127.0.0.1", 
1234);
+    List<Capability> capabilityList = Arrays.asList(new Capability("cus", 1));
+    conn.initSupportedRange(capabilityList);
+    assertEquals(1, conn.agreedSubprotocols().size());
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to