This is an automated email from the ASF dual-hosted git repository.
toulmean pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git
The following commit(s) were added to refs/heads/main by this push:
new d6195d9 Add support for eth/66
new a831bc4 Merge pull request #231 from atoulme/eth66
d6195d9 is described below
commit d6195d9b9ccf468f7f61b2a0f7120a254f9aa8ab
Author: Antoine Toulme <[email protected]>
AuthorDate: Sun May 2 11:30:21 2021 -0700
Add support for eth/66
---
.../org/apache/tuweni/devp2p/eth/EthClient.kt | 11 +-
.../devp2p/eth/{EthClient.kt => EthClient66.kt} | 110 ++++++++-------
.../org/apache/tuweni/devp2p/eth/EthController.kt | 61 +++++++--
.../org/apache/tuweni/devp2p/eth/EthHandler.kt | 14 +-
.../devp2p/eth/{EthHandler.kt => EthHandler66.kt} | 150 ++++++++++++---------
.../tuweni/devp2p/eth/EthHelloSubprotocol.kt | 18 +--
.../apache/tuweni/devp2p/eth/EthRequestsManager.kt | 36 -----
.../org/apache/tuweni/devp2p/eth/EthSubprotocol.kt | 41 +++---
.../apache/tuweni/devp2p/eth/EthSubprotocolTest.kt | 23 ++--
.../apache/tuweni/devp2p/proxy/ProxySubprotocol.kt | 10 +-
.../org/apache/tuweni/ethclient/EthereumClient.kt | 60 +++++----
.../tuweni/ethclient/FromBestBlockSynchronizer.kt | 4 +-
.../ethclient/FromUnknownParentSynchronizer.kt | 4 +-
.../tuweni/ethclient/PeerStatusEthSynchronizer.kt | 4 +-
.../org/apache/tuweni/ethclient/Synchronizer.kt | 4 +-
.../org/apache/tuweni/les/LESSubProtocolHandler.kt | 4 +-
.../kotlin/org/apache/tuweni/les/LESSubprotocol.kt | 34 +----
.../java/org/apache/tuweni/rlp/BytesRLPReader.java | 9 ++
.../main/java/org/apache/tuweni/rlp/RLPReader.java | 7 +
.../org/apache/tuweni/rlp/BytesRLPReaderTest.java | 11 ++
.../tuweni/rlpx/vertx/VertxAcceptanceTest.java | 40 +++---
.../tuweni/rlpx/vertx/VertxRLPxServiceTest.java | 35 +++--
.../apache/tuweni/rlpx/vertx/VertxRLPxService.java | 23 ++--
.../rlpx/wire/DefaultSubProtocolIdentifier.java | 14 +-
.../tuweni/rlpx/wire/DefaultWireConnection.java | 56 +++++---
.../org/apache/tuweni/rlpx/wire/SubProtocol.java | 11 +-
.../tuweni/rlpx/wire/SubProtocolIdentifier.java | 13 +-
.../rlpx/wire/DefaultWireConnectionTest.java | 17 +++
28 files changed, 453 insertions(+), 371 deletions(-)
diff --git a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthClient.kt b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthClient.kt
index 0d3fa1c..5b9e3ed 100644
--- a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthClient.kt
+++ b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthClient.kt
@@ -172,7 +172,7 @@ class EthClient(
}
@Suppress("TYPE_INFERENCE_ONLY_INPUT_TYPES_WARNING")
- override fun wasRequested(
+ fun wasRequested(
connection: WireConnection,
headers: List<BlockHeader>
): CompletableAsyncResult<List<BlockHeader>>? {
@@ -184,16 +184,13 @@ class EthClient(
}
}
- override fun wasRequested(connection: WireConnection, bodies: List<BlockBody>): Request<List<BlockBody>>? =
+ fun wasRequested(connection: WireConnection): Request<List<BlockBody>>? =
bodiesRequests[connection.uri()]
- override fun nodeDataWasRequested(connection: WireConnection, elements: List<Bytes?>): Request<List<Bytes?>>? =
+ fun nodeDataWasRequested(connection: WireConnection): Request<List<Bytes?>>? =
nodeDataRequests[connection.uri()]
- override fun transactionReceiptsRequested(
- connection: WireConnection,
- transactionReceipts: List<List<TransactionReceipt>>
- ): Request<List<List<TransactionReceipt>>>? = transactionReceiptRequests[connection.uri()]
+ fun transactionReceiptsRequested(connection: WireConnection): Request<List<List<TransactionReceipt>>>? = transactionReceiptRequests[connection.uri()]
override suspend fun submitPooledTransaction(vararg tx: Transaction) {
for (t in tx) {
diff --git
a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthClient.kt
b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthClient66.kt
similarity index 68%
copy from devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthClient.kt
copy to devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthClient66.kt
index 0d3fa1c..7937686 100644
--- a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthClient.kt
+++ b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthClient66.kt
@@ -17,7 +17,6 @@
package org.apache.tuweni.devp2p.eth
import org.apache.tuweni.bytes.Bytes
-import org.apache.tuweni.bytes.Bytes32
import org.apache.tuweni.concurrent.AsyncResult
import org.apache.tuweni.concurrent.CompletableAsyncResult
import org.apache.tuweni.eth.Block
@@ -27,45 +26,47 @@ import org.apache.tuweni.eth.Hash
import org.apache.tuweni.eth.Transaction
import org.apache.tuweni.eth.TransactionReceipt
import org.apache.tuweni.eth.repository.TransactionPool
+import org.apache.tuweni.rlp.RLP
import org.apache.tuweni.rlpx.RLPxService
import org.apache.tuweni.rlpx.wire.SubProtocolClient
import org.apache.tuweni.rlpx.wire.WireConnection
import org.apache.tuweni.units.bigints.UInt256
-import org.slf4j.LoggerFactory
+import org.apache.tuweni.units.bigints.UInt64
-val logger = LoggerFactory.getLogger(EthClient::class.java)
/**
* Client of the ETH subprotocol, allowing to request block and node data
*/
-class EthClient(
+class EthClient66(
private val service: RLPxService,
private val pendingTransactionsPool: TransactionPool,
- private val connectionSelectionStrategy: ConnectionSelectionStrategy
+ private val connectionSelectionStrategy: ConnectionSelectionStrategy,
) :
EthRequestsManager, SubProtocolClient {
- private val headerRequests = mutableMapOf<Bytes32,
Request<List<BlockHeader>>>()
- private val bodiesRequests = HashMap<String, Request<List<BlockBody>>>()
- private val nodeDataRequests = HashMap<String, Request<List<Bytes?>>>()
- private val transactionReceiptRequests = HashMap<String,
Request<List<List<TransactionReceipt>>>>()
+ private val headerRequests = mutableMapOf<Bytes,
Request<List<BlockHeader>>>()
+ private val bodiesRequests = HashMap<Bytes, Request<List<BlockBody>>>()
+ private val nodeDataRequests = HashMap<Bytes, Request<List<Bytes?>>>()
+ private val transactionReceiptRequests = HashMap<Bytes,
Request<List<List<TransactionReceipt>>>>()
override fun connectionSelectionStrategy() = connectionSelectionStrategy
override fun requestTransactionReceipts(
blockHashes: List<Hash>,
- connection: WireConnection?
+ connection: WireConnection?,
): AsyncResult<List<List<TransactionReceipt>>> {
val conns = service.repository().asIterable(EthSubprotocol.ETH62)
val handle = AsyncResult.incomplete<List<List<TransactionReceipt>>>()
var done = false
conns.forEach { conn ->
-
- transactionReceiptRequests.computeIfAbsent(conn.uri()) {
+ transactionReceiptRequests.computeIfAbsent(UInt64.random().toBytes()) {
key ->
service.send(
EthSubprotocol.ETH62,
MessageType.GetReceipts.code,
conn,
- GetReceipts(blockHashes).toBytes()
+ RLP.encodeList {
+ it.writeValue(key)
+ it.writeRLP(GetReceipts(blockHashes).toBytes())
+ }
)
done = true
Request(conn.uri(), handle, blockHashes)
@@ -82,17 +83,20 @@ class EthClient(
maxHeaders: Long,
skip: Long,
reverse: Boolean,
- connection: WireConnection?
+ connection: WireConnection?,
): AsyncResult<List<BlockHeader>> {
logger.info("Requesting headers hash: $blockHash maxHeaders: $maxHeaders skip: $skip reverse: $reverse")
val conn = connectionSelectionStrategy.selectConnection()
val completion = AsyncResult.incomplete<List<BlockHeader>>()
- headerRequests.computeIfAbsent(blockHash) {
+ headerRequests.computeIfAbsent(UInt64.random().toBytes()) { key ->
service.send(
EthSubprotocol.ETH62,
MessageType.GetBlockHeaders.code,
conn!!,
- GetBlockHeaders(blockHash, maxHeaders, skip, reverse).toBytes()
+ RLP.encodeList {
+ it.writeValue(key)
+ it.writeRLP(GetBlockHeaders(blockHash, maxHeaders, skip,
reverse).toBytes())
+ }
)
Request(connectionId = conn.uri(), handle = completion, data = blockHash)
}
@@ -104,16 +108,20 @@ class EthClient(
maxHeaders: Long,
skip: Long,
reverse: Boolean,
- connection: WireConnection?
+ connection: WireConnection?,
): AsyncResult<List<BlockHeader>> {
- val blockNumberBytes = UInt256.valueOf(blockNumber).toBytes()
+ val blockNumberBytes = UInt256.valueOf(blockNumber)
val completion = AsyncResult.incomplete<List<BlockHeader>>()
- headerRequests.computeIfAbsent(blockNumberBytes) {
+ headerRequests.computeIfAbsent(UInt64.random().toBytes()) { key ->
service.send(
EthSubprotocol.ETH62,
MessageType.GetBlockHeaders.code,
connection!!,
- GetBlockHeaders(blockNumberBytes, maxHeaders, skip, reverse).toBytes()
+ RLP.encodeList {
+ it.writeValue(key)
+ it.writeRLP(GetBlockHeaders(blockNumberBytes, maxHeaders, skip,
reverse).toBytes())
+ }
+
)
Request(connectionId = connection.uri(), handle = completion, data =
blockNumber)
}
@@ -122,18 +130,21 @@ class EthClient(
override fun requestBlockHeaders(
blockHashes: List<Hash>,
- connection: WireConnection?
+ connection: WireConnection?,
): AsyncResult<List<BlockHeader>> {
return AsyncResult.combine(blockHashes.stream().map {
requestBlockHeader(it) })
}
override fun requestBlockHeader(blockHash: Hash, connection:
WireConnection?): AsyncResult<BlockHeader> {
- val request = headerRequests.computeIfAbsent(blockHash) {
+ val request = headerRequests.computeIfAbsent(UInt64.random().toBytes()) {
key ->
service.send(
EthSubprotocol.ETH62,
MessageType.GetBlockHeaders.code,
connection!!,
- GetBlockHeaders(blockHash, 1, 0, false).toBytes()
+ RLP.encodeList {
+ it.writeValue(key)
+ it.writeRLP(GetBlockHeaders(blockHash, 1, 0, false).toBytes())
+ }
)
val completion = AsyncResult.incomplete<List<BlockHeader>>()
Request(connectionId = connection.uri(), handle = completion, data =
blockHash)
@@ -143,12 +154,15 @@ class EthClient(
override fun requestBlockBodies(blockHashes: List<Hash>, connection:
WireConnection?): AsyncResult<List<BlockBody>> {
val handle = AsyncResult.incomplete<List<BlockBody>>()
- bodiesRequests.computeIfAbsent(connection!!.uri()) {
+ bodiesRequests.compute(UInt64.random().toBytes()) { key, _ ->
service.send(
EthSubprotocol.ETH62,
MessageType.GetBlockBodies.code,
- connection,
- GetBlockBodies(blockHashes).toBytes()
+ connection!!,
+ RLP.encodeList {
+ it.writeValue(key)
+ it.writeRLP(GetBlockBodies(blockHashes).toBytes())
+ }
)
Request(connection.uri(), handle, blockHashes)
}
@@ -171,38 +185,44 @@ class EthClient(
return result
}
- @Suppress("TYPE_INFERENCE_ONLY_INPUT_TYPES_WARNING")
- override fun wasRequested(
- connection: WireConnection,
- headers: List<BlockHeader>
+ fun headersRequested(
+ requestIdentifier: Bytes,
): CompletableAsyncResult<List<BlockHeader>>? {
- val request = headerRequests.remove(headers) ?: return null
- if (request.connectionId == connection.uri()) {
- return request.handle
- } else {
- return null
- }
+ val request = headerRequests.remove(requestIdentifier) ?: return null
+ return request.handle
}
- override fun wasRequested(connection: WireConnection, bodies:
List<BlockBody>): Request<List<BlockBody>>? =
- bodiesRequests[connection.uri()]
+ fun bodiesRequested(requestIdentifier: Bytes): Request<List<BlockBody>>? =
+ bodiesRequests[requestIdentifier]
- override fun nodeDataWasRequested(connection: WireConnection, elements:
List<Bytes?>): Request<List<Bytes?>>? =
- nodeDataRequests[connection.uri()]
+ fun nodeDataWasRequested(requestIdentifier: Bytes): Request<List<Bytes?>>? =
+ nodeDataRequests[requestIdentifier]
- override fun transactionReceiptsRequested(
- connection: WireConnection,
- transactionReceipts: List<List<TransactionReceipt>>
- ): Request<List<List<TransactionReceipt>>>? = transactionReceiptRequests[connection.uri()]
+ fun transactionReceiptsRequested(
+ requestIdentifier: Bytes,
+ ): Request<List<List<TransactionReceipt>>>? = transactionReceiptRequests[requestIdentifier]
override suspend fun submitPooledTransaction(vararg tx: Transaction) {
for (t in tx) {
pendingTransactionsPool.add(t)
}
val hashes = tx.map { it.hash }
- val conns = service.repository().asIterable(EthSubprotocol.ETH65)
+ val conns = service.repository().asIterable(EthSubprotocol.ETH66)
conns.forEach { conn ->
service.send(
+ EthSubprotocol.ETH66,
+ MessageType.NewPooledTransactionHashes.code,
+ conn,
+ RLP.encodeList {
+ it.writeValue(UInt64.random().toBytes())
+ it.writeRLP(GetBlockBodies(hashes).toBytes())
+ }
+ )
+ }
+
+ val conns65 = service.repository().asIterable(EthSubprotocol.ETH65)
+ conns65.forEach { conn ->
+ service.send(
EthSubprotocol.ETH65,
MessageType.NewPooledTransactionHashes.code,
conn,
diff --git
a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthController.kt
b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthController.kt
index 469f308..db94de5 100644
--- a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthController.kt
+++ b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthController.kt
@@ -34,7 +34,7 @@ class EthController(
val repository: BlockchainRepository,
val pendingTransactionsPool: TransactionPool,
val requestsManager: EthRequestsManager,
- val connectionsListener: (WireConnection, Status) -> Unit = { _, _ -> }
+ val connectionsListener: (WireConnection, Status) -> Unit = { _, _ -> },
) {
suspend fun findTransactionReceipts(hashes: List<Hash>):
List<List<TransactionReceipt>> {
@@ -104,15 +104,33 @@ class EthController(
requestsManager.requestBlockBodies(listOf(blockHash))
}
- suspend fun addNewBlockHeaders(connection: WireConnection, headers:
List<BlockHeader>) {
- val request = requestsManager.wasRequested(connection, headers)
- if (request != null) {
- request.complete(headers)
+ suspend fun addNewBlockHeaders(connection: WireConnection,
requestIdentifier: Bytes?, headers: List<BlockHeader>) {
+ val request = when (requestsManager) {
+ is EthClient -> {
+ requestsManager.wasRequested(connection, headers)
+ }
+ is EthClient66 -> {
+ requestsManager.headersRequested(requestIdentifier!!)
+ }
+ else -> {
+ throw IllegalArgumentException("Unsupported requestsManager")
+ }
}
+ request?.complete(headers)
}
- suspend fun addNewBlockBodies(connection: WireConnection, bodies:
List<BlockBody>) {
- val request = requestsManager.wasRequested(connection, bodies)
+ suspend fun addNewBlockBodies(connection: WireConnection, requestIdentifier:
Bytes?, bodies: List<BlockBody>) {
+ val request = when (requestsManager) {
+ is EthClient -> {
+ requestsManager.wasRequested(connection)
+ }
+ is EthClient66 -> {
+ requestsManager.bodiesRequested(requestIdentifier!!)
+ }
+ else -> {
+ throw IllegalArgumentException("Unsupported requestsManager")
+ }
+ }
if (request != null) {
val hashes = request.data as List<*>
for (i in 0..hashes.size) {
@@ -128,8 +146,18 @@ class EthController(
suspend fun findNodeData(hashes: List<Hash>) =
repository.retrieveNodeData(hashes)
- suspend fun addNewNodeData(connection: WireConnection, elements:
List<Bytes?>) {
- val request = requestsManager.nodeDataWasRequested(connection, elements)
+ suspend fun addNewNodeData(connection: WireConnection, requestIdentifier:
Bytes?, elements: List<Bytes?>) {
+ val request = when (requestsManager) {
+ is EthClient -> {
+ requestsManager.nodeDataWasRequested(connection)
+ }
+ is EthClient66 -> {
+ requestsManager.nodeDataWasRequested(requestIdentifier!!)
+ }
+ else -> {
+ throw IllegalArgumentException("Unsupported requestsManager")
+ }
+ }
if (request != null) {
val hashes = request.data as List<*>
for (i in 0..hashes.size) {
@@ -144,9 +172,20 @@ class EthController(
suspend fun addNewTransactionReceipts(
connection: WireConnection,
- transactionReceipts: List<List<TransactionReceipt>>
+ requestIdentifier: Bytes?,
+ transactionReceipts: List<List<TransactionReceipt>>,
) {
- val request = requestsManager.transactionReceiptsRequested(connection,
transactionReceipts)
+ val request = when (requestsManager) {
+ is EthClient -> {
+ requestsManager.transactionReceiptsRequested(connection)
+ }
+ is EthClient66 -> {
+ requestsManager.transactionReceiptsRequested(requestIdentifier!!)
+ }
+ else -> {
+ throw IllegalArgumentException("Unsupported requestsManager")
+ }
+ }
if (request != null) {
val hashes = request.data as List<*>
for (i in 0..hashes.size) {
diff --git
a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthHandler.kt
b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthHandler.kt
index 34614f7..d989fe7 100644
--- a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthHandler.kt
+++ b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthHandler.kt
@@ -37,7 +37,7 @@ internal class EthHandler(
override val coroutineContext: CoroutineContext = Dispatchers.Default,
private val blockchainInfo: BlockchainInformation,
private val service: RLPxService,
- private val controller: EthController
+ private val controller: EthController,
) : SubProtocolHandler, CoroutineScope {
private val pendingStatus = WeakHashMap<String, PeerInfo>()
@@ -99,7 +99,7 @@ internal class EthHandler(
}
private suspend fun handleNodeData(connection: WireConnection, read:
NodeData) {
- controller.addNewNodeData(connection, read.elements)
+ controller.addNewNodeData(connection, null, read.elements)
}
private suspend fun handleStatus(connection: WireConnection, status:
StatusMessage) {
@@ -153,7 +153,7 @@ internal class EthHandler(
}
private suspend fun handleReceipts(connection: WireConnection, receipts:
Receipts) {
- controller.addNewTransactionReceipts(connection,
receipts.transactionReceipts)
+ controller.addNewTransactionReceipts(connection, null,
receipts.transactionReceipts)
}
private suspend fun handleGetReceipts(connection: WireConnection,
getReceipts: GetReceipts) {
@@ -180,7 +180,7 @@ internal class EthHandler(
}
private suspend fun handleBlockBodies(connection: WireConnection, message:
BlockBodies) {
- controller.addNewBlockBodies(connection, message.bodies)
+ controller.addNewBlockBodies(connection, null, message.bodies)
}
private suspend fun handleGetBlockBodies(connection: WireConnection,
message: GetBlockBodies) {
@@ -193,7 +193,7 @@ internal class EthHandler(
}
private suspend fun handleHeaders(connection: WireConnection, headers:
BlockHeaders) {
- controller.addNewBlockHeaders(connection, headers.headers)
+ controller.addNewBlockHeaders(connection, null, headers.headers)
}
private suspend fun handleGetBlockHeaders(connection: WireConnection,
blockHeaderRequest: GetBlockHeaders) {
@@ -216,7 +216,7 @@ internal class EthHandler(
service.send(
EthSubprotocol.ETH64, MessageType.Status.code, connection,
StatusMessage(
- EthSubprotocol.ETH64.version(),
+ EthSubprotocol.ETH65.version(),
blockchainInfo.networkID(), blockchainInfo.totalDifficulty(),
blockchainInfo.bestHash(), blockchainInfo.genesisHash(),
blockchainInfo.getLatestForkHash(),
blockchainInfo.getLatestFork()
@@ -230,7 +230,7 @@ internal class EthHandler(
}
}
-internal class PeerInfo() {
+internal class PeerInfo {
val ready: CompletableAsyncCompletion = AsyncCompletion.incomplete()
diff --git
a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthHandler.kt
b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthHandler66.kt
similarity index 64%
copy from devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthHandler.kt
copy to devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthHandler66.kt
index 34614f7..f340d7f 100644
--- a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthHandler.kt
+++ b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthHandler66.kt
@@ -20,24 +20,25 @@ import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import org.apache.tuweni.bytes.Bytes
import org.apache.tuweni.concurrent.AsyncCompletion
-import org.apache.tuweni.concurrent.CompletableAsyncCompletion
import org.apache.tuweni.concurrent.coroutines.asyncCompletion
import org.apache.tuweni.eth.Hash
+import org.apache.tuweni.rlp.RLP
import org.apache.tuweni.rlpx.RLPxService
import org.apache.tuweni.rlpx.wire.DisconnectReason
import org.apache.tuweni.rlpx.wire.SubProtocolHandler
import org.apache.tuweni.rlpx.wire.WireConnection
+import org.apache.tuweni.units.bigints.UInt64
import org.slf4j.LoggerFactory
import java.util.WeakHashMap
import kotlin.collections.ArrayList
import kotlin.collections.set
import kotlin.coroutines.CoroutineContext
-internal class EthHandler(
+internal class EthHandler66(
override val coroutineContext: CoroutineContext = Dispatchers.Default,
private val blockchainInfo: BlockchainInformation,
private val service: RLPxService,
- private val controller: EthController
+ private val controller: EthController,
) : SubProtocolHandler, CoroutineScope {
private val pendingStatus = WeakHashMap<String, PeerInfo>()
@@ -48,32 +49,37 @@ internal class EthHandler(
val MAX_POOLED_TX = 256
}
- override fun handle(connection: WireConnection, messageType: Int, message:
Bytes) = asyncCompletion {
+ override fun handle(connection: WireConnection, messageType: Int, payload:
Bytes) = asyncCompletion {
logger.debug("Receiving message of type {}", messageType)
+ val pair = RLP.decode(payload) {
+ Pair(it.readValue(), it.readRemaining())
+ }
+ val requestIdentifier = pair.first
+ val message = pair.second
+
when (messageType) {
MessageType.Status.code -> handleStatus(connection,
StatusMessage.read(message))
MessageType.NewBlockHashes.code ->
handleNewBlockHashes(NewBlockHashes.read(message))
MessageType.Transactions.code ->
handleTransactions(Transactions.read(message))
- MessageType.GetBlockHeaders.code -> handleGetBlockHeaders(connection,
GetBlockHeaders.read(message))
- MessageType.BlockHeaders.code -> handleHeaders(connection,
BlockHeaders.read(message))
- MessageType.GetBlockBodies.code -> handleGetBlockBodies(connection,
GetBlockBodies.read(message))
- MessageType.BlockBodies.code -> handleBlockBodies(connection,
BlockBodies.read(message))
+ MessageType.GetBlockHeaders.code -> handleGetBlockHeaders(connection,
requestIdentifier, GetBlockHeaders.read(message))
+ MessageType.BlockHeaders.code -> handleHeaders(connection,
requestIdentifier, BlockHeaders.read(message))
+ MessageType.GetBlockBodies.code -> handleGetBlockBodies(connection,
requestIdentifier, GetBlockBodies.read(message))
+ MessageType.BlockBodies.code -> handleBlockBodies(connection,
requestIdentifier, BlockBodies.read(message))
MessageType.NewBlock.code -> handleNewBlock(NewBlock.read(message))
- MessageType.GetNodeData.code -> handleGetNodeData(connection,
GetNodeData.read(message))
- MessageType.NodeData.code -> handleNodeData(connection,
NodeData.read(message))
- MessageType.GetReceipts.code -> handleGetReceipts(connection,
GetReceipts.read(message))
- MessageType.Receipts.code -> handleReceipts(connection,
Receipts.read(message))
+ MessageType.GetNodeData.code -> handleGetNodeData(connection,
requestIdentifier, GetNodeData.read(message))
+ MessageType.NodeData.code -> handleNodeData(connection,
requestIdentifier, NodeData.read(message))
+ MessageType.GetReceipts.code -> handleGetReceipts(connection,
requestIdentifier, GetReceipts.read(message))
+ MessageType.Receipts.code -> handleReceipts(connection,
requestIdentifier, Receipts.read(message))
MessageType.NewPooledTransactionHashes.code ->
handleNewPooledTransactionHashes(
- connection,
- NewPooledTransactionHashes.read(message)
+ connection, NewPooledTransactionHashes.read(message)
)
MessageType.GetPooledTransactions.code -> handleGetPooledTransactions(
- connection,
+ connection, requestIdentifier,
GetPooledTransactions.read(message)
)
MessageType.PooledTransactions.code ->
handlePooledTransactions(PooledTransactions.read(message))
else -> {
- logger.warn("Unknown message type {}", messageType)
+ logger.warn("Unknown message type {} with request identifier {}",
messageType, requestIdentifier)
service.disconnect(connection, DisconnectReason.SUBPROTOCOL_REASON)
}
}
@@ -83,14 +89,17 @@ internal class EthHandler(
controller.addNewPooledTransactions(read.transactions)
}
- private suspend fun handleGetPooledTransactions(connection: WireConnection,
read: GetPooledTransactions) {
+ private suspend fun handleGetPooledTransactions(connection: WireConnection,
requestIdentifier: Bytes, read: GetPooledTransactions) {
val tx = controller.findPooledTransactions(read.hashes)
logger.debug("Responding to GetPooledTransactions with {} transactions",
tx.size)
service.send(
- EthSubprotocol.ETH65,
+ EthSubprotocol.ETH66,
MessageType.PooledTransactions.code,
connection,
- PooledTransactions(tx).toBytes()
+ RLP.encodeList {
+ it.writeValue(requestIdentifier)
+ it.writeRLP(PooledTransactions(tx).toBytes())
+ }
)
}
@@ -98,8 +107,8 @@ internal class EthHandler(
controller.addNewTransactions(transactions.transactions)
}
- private suspend fun handleNodeData(connection: WireConnection, read:
NodeData) {
- controller.addNewNodeData(connection, read.elements)
+ private suspend fun handleNodeData(connection: WireConnection,
requestIdentifier: Bytes, read: NodeData) {
+ controller.addNewNodeData(connection, requestIdentifier, read.elements)
}
private suspend fun handleStatus(connection: WireConnection, status:
StatusMessage) {
@@ -133,10 +142,13 @@ internal class EthHandler(
}
if (missingTx.size == MAX_POOLED_TX) {
service.send(
- EthSubprotocol.ETH65,
+ EthSubprotocol.ETH66,
MessageType.GetPooledTransactions.code,
connection,
- message.toBytes()
+ RLP.encodeList {
+ it.writeValue(UInt64.random().toBytes())
+ it.writeRLP(message.toBytes())
+ }
)
missingTx = ArrayList()
message = GetPooledTransactions(missingTx)
@@ -144,34 +156,44 @@ internal class EthHandler(
}
if (!missingTx.isEmpty()) {
service.send(
- EthSubprotocol.ETH65,
+ EthSubprotocol.ETH66,
MessageType.GetPooledTransactions.code,
connection,
- message.toBytes()
+ RLP.encodeList {
+ it.writeValue(UInt64.random().toBytes())
+ it.writeRLP(message.toBytes())
+ }
)
}
}
- private suspend fun handleReceipts(connection: WireConnection, receipts:
Receipts) {
- controller.addNewTransactionReceipts(connection,
receipts.transactionReceipts)
+ private suspend fun handleReceipts(connection: WireConnection,
requestIdentifier: Bytes, receipts: Receipts) {
+ controller.addNewTransactionReceipts(connection, requestIdentifier,
receipts.transactionReceipts)
}
- private suspend fun handleGetReceipts(connection: WireConnection,
getReceipts: GetReceipts) {
-
+ private suspend fun handleGetReceipts(connection: WireConnection,
requestIdentifier: Bytes, getReceipts: GetReceipts) {
+ val receipts = controller.findTransactionReceipts(getReceipts.hashes)
service.send(
- EthSubprotocol.ETH64,
+ EthSubprotocol.ETH66,
MessageType.Receipts.code,
connection,
- Receipts(controller.findTransactionReceipts(getReceipts.hashes)).toBytes()
+ RLP.encodeList {
+ it.writeValue(requestIdentifier)
+ it.writeRLP(Receipts(receipts).toBytes())
+ }
)
}
- private suspend fun handleGetNodeData(connection: WireConnection, nodeData:
GetNodeData) {
+ private suspend fun handleGetNodeData(connection: WireConnection,
requestIdentifier: Bytes, nodeData: GetNodeData) {
+ val data = controller.findNodeData(nodeData.hashes)
service.send(
- EthSubprotocol.ETH64,
+ EthSubprotocol.ETH66,
MessageType.NodeData.code,
connection,
- NodeData(controller.findNodeData(nodeData.hashes)).toBytes()
+ RLP.encodeList {
+ it.writeValue(requestIdentifier)
+ it.writeRLP(NodeData(data).toBytes())
+ }
)
}
@@ -179,31 +201,41 @@ internal class EthHandler(
controller.addNewBlock(read.block)
}
- private suspend fun handleBlockBodies(connection: WireConnection, message:
BlockBodies) {
- controller.addNewBlockBodies(connection, message.bodies)
+ private suspend fun handleBlockBodies(connection: WireConnection,
requestIdentifier: Bytes, message: BlockBodies) {
+ controller.addNewBlockBodies(connection, requestIdentifier, message.bodies)
}
- private suspend fun handleGetBlockBodies(connection: WireConnection,
message: GetBlockBodies) {
+ private suspend fun handleGetBlockBodies(connection: WireConnection,
requestIdentifier: Bytes, message: GetBlockBodies) {
+ val bodies = BlockBodies(controller.findBlockBodies(message.hashes))
service.send(
- EthSubprotocol.ETH64,
+ EthSubprotocol.ETH66,
MessageType.BlockBodies.code,
connection,
- BlockBodies(controller.findBlockBodies(message.hashes)).toBytes()
+ RLP.encodeList {
+ it.writeValue(requestIdentifier)
+ it.writeRLP(bodies.toBytes())
+ }
)
}
- private suspend fun handleHeaders(connection: WireConnection, headers:
BlockHeaders) {
- controller.addNewBlockHeaders(connection, headers.headers)
+ private suspend fun handleHeaders(connection: WireConnection,
requestIdentifier: Bytes, headers: BlockHeaders) {
+ controller.addNewBlockHeaders(connection, requestIdentifier,
headers.headers)
}
- private suspend fun handleGetBlockHeaders(connection: WireConnection,
blockHeaderRequest: GetBlockHeaders) {
+ private suspend fun handleGetBlockHeaders(connection: WireConnection,
requestIdentifier: Bytes, blockHeaderRequest: GetBlockHeaders) {
val headers = controller.findHeaders(
blockHeaderRequest.block,
blockHeaderRequest.maxHeaders,
blockHeaderRequest.skip,
blockHeaderRequest.reverse
)
- service.send(EthSubprotocol.ETH64, MessageType.BlockHeaders.code,
connection, BlockHeaders(headers).toBytes())
+ service.send(
+ EthSubprotocol.ETH66, MessageType.BlockHeaders.code, connection,
+ RLP.encodeList {
+ it.writeValue(requestIdentifier)
+ it.writeRLP(BlockHeaders(headers).toBytes())
+ }
+ )
}
private suspend fun handleNewBlockHashes(message: NewBlockHashes) {
@@ -214,13 +246,18 @@ internal class EthHandler(
val newPeer = PeerInfo()
pendingStatus[connection.uri()] = newPeer
service.send(
- EthSubprotocol.ETH64, MessageType.Status.code, connection,
- StatusMessage(
- EthSubprotocol.ETH64.version(),
- blockchainInfo.networkID(), blockchainInfo.totalDifficulty(),
- blockchainInfo.bestHash(), blockchainInfo.genesisHash(), blockchainInfo.getLatestForkHash(),
- blockchainInfo.getLatestFork()
- ).toBytes()
+ EthSubprotocol.ETH66, MessageType.Status.code, connection,
+ RLP.encodeList {
+ it.writeValue(UInt64.random().toBytes())
+ it.writeRLP(
+ StatusMessage(
+ EthSubprotocol.ETH66.version(),
+ blockchainInfo.networkID(), blockchainInfo.totalDifficulty(),
+ blockchainInfo.bestHash(), blockchainInfo.genesisHash(), blockchainInfo.getLatestForkHash(),
+ blockchainInfo.getLatestFork()
+ ).toBytes()
+ )
+ }
)
return newPeer.ready
@@ -229,16 +266,3 @@ internal class EthHandler(
override fun stop() = asyncCompletion {
}
}
-
-internal class PeerInfo() {
-
- val ready: CompletableAsyncCompletion = AsyncCompletion.incomplete()
-
- fun connect() {
- ready.complete()
- }
-
- fun cancel() {
- ready.cancel()
- }
-}
diff --git a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthHelloSubprotocol.kt b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthHelloSubprotocol.kt
index 5d6dbbc..e7dc548 100644
--- a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthHelloSubprotocol.kt
+++ b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthHelloSubprotocol.kt
@@ -32,10 +32,10 @@ class EthHelloSubprotocol(
) : SubProtocol {
companion object {
- val ETH62 = SubProtocolIdentifier.of("eth", 62)
- val ETH63 = SubProtocolIdentifier.of("eth", 63)
- val ETH64 = SubProtocolIdentifier.of("eth", 64)
- val ETH65 = SubProtocolIdentifier.of("eth", 65)
+ val ETH62 = SubProtocolIdentifier.of("eth", 62, 8)
+ val ETH63 = SubProtocolIdentifier.of("eth", 63, 17)
+ val ETH64 = SubProtocolIdentifier.of("eth", 64, 17)
+ val ETH65 = SubProtocolIdentifier.of("eth", 65, 17)
}
override fun id(): SubProtocolIdentifier = ETH65
@@ -48,14 +48,6 @@ class EthHelloSubprotocol(
)
}
- override fun versionRange(version: Int): Int {
- return if (version == ETH62.version()) {
- 8
- } else {
- 17
- }
- }
-
override fun createHandler(service: RLPxService, client: SubProtocolClient):
SubProtocolHandler {
val controller = EthHelloController(listener)
return EthHelloHandler(coroutineContext, blockchainInfo, service,
controller)
@@ -63,7 +55,7 @@ class EthHelloSubprotocol(
override fun getCapabilities() = mutableListOf(ETH62, ETH63, ETH64, ETH65)
- override fun createClient(service: RLPxService): SubProtocolClient {
+ override fun createClient(service: RLPxService, identifier:
SubProtocolIdentifier): SubProtocolClient {
return EthHelloClient(service)
}
}
diff --git
a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthRequestsManager.kt
b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthRequestsManager.kt
index f5192d3..b598af3 100644
---
a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthRequestsManager.kt
+++
b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthRequestsManager.kt
@@ -16,7 +16,6 @@
*/
package org.apache.tuweni.devp2p.eth
-import org.apache.tuweni.bytes.Bytes
import org.apache.tuweni.concurrent.AsyncResult
import org.apache.tuweni.concurrent.CompletableAsyncResult
import org.apache.tuweni.eth.Block
@@ -128,41 +127,6 @@ interface EthRequestsManager {
): AsyncResult<List<List<TransactionReceipt>>>
/**
- * Checks if a request was made to get block headers
- * @param connection the wire connection sending data
- * @params headers list of block headers just received
- * @return a handle to the completion of the operation, or null if no such
request was placed
- */
- fun wasRequested(connection: WireConnection, headers: List<BlockHeader>):
CompletableAsyncResult<List<BlockHeader>>?
-
- /**
- * Checks if a request was made to get block bodies
- * @param connection the wire connection sending data
- * @param bodies the bodies just received
- * @return a handle to the completion of the operation, with metadata, or
null if no such request was placed
- */
- fun wasRequested(connection: WireConnection, bodies: List<BlockBody>):
Request<List<BlockBody>>?
-
- /**
- * Checks if a request was made to get node data
- * @param connection the wire connection sending data
- * @param elements the data just received
- * @return a handle to the completion of the operation, with metadata, or
null if no such request was placed
- */
- fun nodeDataWasRequested(connection: WireConnection, elements:
List<Bytes?>): Request<List<Bytes?>>?
-
- /**
- * Checks if a request was made to get transaction receipts
- * @param connection the wire connection sending data
- * @param transactionReceipts the transaction receipts just received
- * @return a handle to the completion of the operation, with metadata, or
null if no such request was placed
- */
- fun transactionReceiptsRequested(
- connection: WireConnection,
- transactionReceipts: List<List<TransactionReceipt>>
- ): Request<List<List<TransactionReceipt>>>?
-
- /**
* Submits a new pending transaction to the transaction pool to be gossiped
to peers.
* @param tx a new transaction
*/
diff --git
a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthSubprotocol.kt
b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthSubprotocol.kt
index 100835c..83b91f6 100644
--- a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthSubprotocol.kt
+++ b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthSubprotocol.kt
@@ -39,38 +39,43 @@ class EthSubprotocol(
) : SubProtocol {
companion object {
- val ETH62 = SubProtocolIdentifier.of("eth", 62)
- val ETH63 = SubProtocolIdentifier.of("eth", 63)
- val ETH64 = SubProtocolIdentifier.of("eth", 64)
- val ETH65 = SubProtocolIdentifier.of("eth", 65)
+ val ETH62 = SubProtocolIdentifier.of("eth", 62, 8)
+ val ETH63 = SubProtocolIdentifier.of("eth", 63, 17)
+ val ETH64 = SubProtocolIdentifier.of("eth", 64, 17)
+ val ETH65 = SubProtocolIdentifier.of("eth", 65, 17)
+ val ETH66 = SubProtocolIdentifier.of("eth", 66, 17)
}
- override fun id(): SubProtocolIdentifier = ETH65
+ override fun id(): SubProtocolIdentifier = ETH66
override fun supports(subProtocolIdentifier: SubProtocolIdentifier): Boolean
{
return "eth".equals(subProtocolIdentifier.name()) && (
subProtocolIdentifier.version() == ETH62.version() ||
subProtocolIdentifier.version() == ETH63.version() ||
subProtocolIdentifier.version() == ETH64.version() ||
- subProtocolIdentifier.version() == ETH65.version()
+ subProtocolIdentifier.version() == ETH65.version() ||
subProtocolIdentifier.version() == ETH66.version()
)
}
- override fun versionRange(version: Int): Int {
- return if (version == ETH62.version()) {
- 8
- } else {
- 17
- }
- }
-
override fun createHandler(service: RLPxService, client: SubProtocolClient):
SubProtocolHandler {
val controller = EthController(repository, pendingTransactionsPool, client
as EthRequestsManager, listener)
- return EthHandler(coroutineContext, blockchainInfo, service, controller)
+ if (client is EthClient66) {
+ return EthHandler66(coroutineContext, blockchainInfo, service,
controller)
+ } else {
+ return EthHandler(coroutineContext, blockchainInfo, service, controller)
+ }
}
- override fun getCapabilities() = mutableListOf(ETH62, ETH63, ETH64, ETH65)
+ override fun getCapabilities() = mutableListOf(ETH62, ETH63, ETH64, ETH65,
ETH66)
- override fun createClient(service: RLPxService): SubProtocolClient {
- return EthClient(service, pendingTransactionsPool,
selectionStrategy(service.repository()))
+ override fun createClient(service: RLPxService, identifier:
SubProtocolIdentifier): SubProtocolClient {
+ if (identifier == ETH66) {
+ return EthClient66(
+ service,
+ pendingTransactionsPool,
+ selectionStrategy(service.repository())
+ )
+ } else {
+ return EthClient(service, pendingTransactionsPool,
selectionStrategy(service.repository()))
+ }
}
}
diff --git
a/devp2p-eth/src/test/kotlin/org/apache/tuweni/devp2p/eth/EthSubprotocolTest.kt
b/devp2p-eth/src/test/kotlin/org/apache/tuweni/devp2p/eth/EthSubprotocolTest.kt
index e407c38..0d3cacc 100644
---
a/devp2p-eth/src/test/kotlin/org/apache/tuweni/devp2p/eth/EthSubprotocolTest.kt
+++
b/devp2p-eth/src/test/kotlin/org/apache/tuweni/devp2p/eth/EthSubprotocolTest.kt
@@ -17,6 +17,11 @@
package org.apache.tuweni.devp2p.eth
import org.apache.tuweni.bytes.Bytes32
+import org.apache.tuweni.devp2p.eth.EthHelloSubprotocol.Companion.ETH63
+import org.apache.tuweni.devp2p.eth.EthHelloSubprotocol.Companion.ETH64
+import org.apache.tuweni.devp2p.eth.EthSubprotocol.Companion.ETH62
+import org.apache.tuweni.devp2p.eth.EthSubprotocol.Companion.ETH65
+import org.apache.tuweni.devp2p.eth.EthSubprotocol.Companion.ETH66
import org.apache.tuweni.eth.Hash
import org.apache.tuweni.eth.repository.BlockchainRepository
import org.apache.tuweni.eth.repository.MemoryTransactionPool
@@ -49,7 +54,7 @@ class EthSubprotocolTest {
repository = repository,
pendingTransactionsPool = MemoryTransactionPool()
)
- assertEquals(SubProtocolIdentifier.of("eth", 65), eth.id())
+ assertEquals(SubProtocolIdentifier.of("eth", 66), eth.id())
}
@Test
@@ -60,6 +65,7 @@ class EthSubprotocolTest {
repository = repository,
pendingTransactionsPool = MemoryTransactionPool()
)
+ assertTrue(eth.supports(SubProtocolIdentifier.of("eth", 66)))
assertTrue(eth.supports(SubProtocolIdentifier.of("eth", 65)))
assertTrue(eth.supports(SubProtocolIdentifier.of("eth", 64)))
assertTrue(eth.supports(SubProtocolIdentifier.of("eth", 63)))
@@ -70,15 +76,10 @@ class EthSubprotocolTest {
@Test
fun rangeCheck() {
- val repository = BlockchainRepository.inMemory()
- val eth = EthSubprotocol(
- blockchainInfo = blockchainInfo,
- repository = repository,
- pendingTransactionsPool = MemoryTransactionPool()
- )
- assertEquals(8, eth.versionRange(62))
- assertEquals(17, eth.versionRange(63))
- assertEquals(17, eth.versionRange(64))
- assertEquals(17, eth.versionRange(65))
+ assertEquals(8, ETH62.versionRange())
+ assertEquals(17, ETH63.versionRange())
+ assertEquals(17, ETH64.versionRange())
+ assertEquals(17, ETH65.versionRange())
+ assertEquals(17, ETH66.versionRange())
}
}
diff --git
a/devp2p-proxy/src/main/kotlin/org/apache/tuweni/devp2p/proxy/ProxySubprotocol.kt
b/devp2p-proxy/src/main/kotlin/org/apache/tuweni/devp2p/proxy/ProxySubprotocol.kt
index e415558..0ba26fa 100644
---
a/devp2p-proxy/src/main/kotlin/org/apache/tuweni/devp2p/proxy/ProxySubprotocol.kt
+++
b/devp2p-proxy/src/main/kotlin/org/apache/tuweni/devp2p/proxy/ProxySubprotocol.kt
@@ -21,20 +21,18 @@ import org.apache.tuweni.rlpx.wire.SubProtocol
import org.apache.tuweni.rlpx.wire.SubProtocolClient
import org.apache.tuweni.rlpx.wire.SubProtocolIdentifier
-class ProxySubprotocol() : SubProtocol {
+class ProxySubprotocol : SubProtocol {
companion object {
- val ID = SubProtocolIdentifier.of("pxy", 1)
+ val ID = SubProtocolIdentifier.of("pxy", 1, 3)
}
override fun id() = ID
- override fun supports(subProtocolIdentifier: SubProtocolIdentifier): Boolean
= subProtocolIdentifier.equals(ID)
-
- override fun versionRange(version: Int): Int = 3
+ override fun supports(subProtocolIdentifier: SubProtocolIdentifier): Boolean
= subProtocolIdentifier == ID
override fun createHandler(service: RLPxService, client: SubProtocolClient) =
ProxyHandler(service = service, client = client as ProxyClient)
- override fun createClient(service: RLPxService) = ProxyClient(service)
+ override fun createClient(service: RLPxService, subProtocolIdentifier:
SubProtocolIdentifier) = ProxyClient(service)
}
diff --git
a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/EthereumClient.kt
b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/EthereumClient.kt
index 493f82a..aa008be 100644
--- a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/EthereumClient.kt
+++ b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/EthereumClient.kt
@@ -27,8 +27,9 @@ import org.apache.lucene.store.NIOFSDirectory
import org.apache.tuweni.concurrent.AsyncCompletion
import org.apache.tuweni.concurrent.coroutines.await
import org.apache.tuweni.crypto.SECP256K1
-import org.apache.tuweni.devp2p.eth.EthClient
+import org.apache.tuweni.devp2p.eth.EthRequestsManager
import org.apache.tuweni.devp2p.eth.EthSubprotocol
+import org.apache.tuweni.devp2p.eth.EthSubprotocol.Companion.ETH66
import org.apache.tuweni.devp2p.eth.SimpleBlockchainInformation
import org.apache.tuweni.eth.genesis.GenesisFile
import org.apache.tuweni.eth.repository.BlockchainIndex
@@ -163,36 +164,37 @@ class EthereumClient(
adapter
)
services[rlpxConfig.getName()] = service
- peerRepository.addIdentityListener {
- service.connectTo(
- it.publicKey(),
- InetSocketAddress(it.networkInterface(), it.port())
+ service.start().thenRun {
+ logger.info("Started Ethereum client ${rlpxConfig.getName()}")
+ peerRepository.addIdentityListener {
+ service.connectTo(
+ it.publicKey(),
+ InetSocketAddress(it.networkInterface(), it.port())
+ )
+ }
+ val synchronizer = PeerStatusEthSynchronizer(
+ repository = repository,
+ client = service.getClient(ETH66) as EthRequestsManager,
+ peerRepository = peerRepository,
+ adapter = adapter
+ )
+ synchronizers[rlpxConfig.getName() + "status"] = synchronizer
+ synchronizer.start()
+ val parentSynchronizer = FromUnknownParentSynchronizer(
+ repository = repository,
+ client = service.getClient(ETH66) as EthRequestsManager,
+ peerRepository = peerRepository
+ )
+ synchronizers[rlpxConfig.getName() + "parent"] = parentSynchronizer
+ parentSynchronizer.start()
+ val bestSynchronizer = FromBestBlockSynchronizer(
+ repository = repository,
+ client = service.getClient(ETH66) as EthRequestsManager,
+ peerRepository = peerRepository
)
+ synchronizers[rlpxConfig.getName() + "best"] = bestSynchronizer
+ bestSynchronizer.start()
}
- val synchronizer = PeerStatusEthSynchronizer(
- repository = repository,
- client = ethSubprotocol.createClient(service) as EthClient,
- peerRepository = peerRepository,
- adapter = adapter
- )
- synchronizers[rlpxConfig.getName() + "status"] = synchronizer
- synchronizer.start()
- val parentSynchronizer = FromUnknownParentSynchronizer(
- repository = repository,
- client = ethSubprotocol.createClient(service) as EthClient,
- peerRepository = peerRepository
- )
- synchronizers[rlpxConfig.getName() + "parent"] = parentSynchronizer
- parentSynchronizer.start()
- val bestSynchronizer = FromBestBlockSynchronizer(
- repository = repository,
- client = ethSubprotocol.createClient(service) as EthClient,
- peerRepository = peerRepository
- )
- synchronizers[rlpxConfig.getName() + "best"] = bestSynchronizer
- bestSynchronizer.start()
- logger.info("Started Ethereum client ${rlpxConfig.getName()}")
- service.start()
}
).await()
diff --git
a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/FromBestBlockSynchronizer.kt
b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/FromBestBlockSynchronizer.kt
index 7337455..56d92ad 100644
---
a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/FromBestBlockSynchronizer.kt
+++
b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/FromBestBlockSynchronizer.kt
@@ -19,7 +19,7 @@ package org.apache.tuweni.ethclient
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
-import org.apache.tuweni.devp2p.eth.EthClient
+import org.apache.tuweni.devp2p.eth.EthRequestsManager
import org.apache.tuweni.eth.repository.BlockchainRepository
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
@@ -31,7 +31,7 @@ class FromBestBlockSynchronizer(
executor: ExecutorService = Executors.newSingleThreadExecutor(),
coroutineContext: CoroutineContext = executor.asCoroutineDispatcher(),
repository: BlockchainRepository,
- client: EthClient,
+ client: EthRequestsManager,
peerRepository: EthereumPeerRepository
) : Synchronizer(executor, coroutineContext, repository, client,
peerRepository) {
diff --git
a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/FromUnknownParentSynchronizer.kt
b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/FromUnknownParentSynchronizer.kt
index edccb21..f9d07cf 100644
---
a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/FromUnknownParentSynchronizer.kt
+++
b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/FromUnknownParentSynchronizer.kt
@@ -19,7 +19,7 @@ package org.apache.tuweni.ethclient
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
-import org.apache.tuweni.devp2p.eth.EthClient
+import org.apache.tuweni.devp2p.eth.EthRequestsManager
import org.apache.tuweni.eth.BlockHeader
import org.apache.tuweni.eth.repository.BlockchainRepository
import java.util.concurrent.ExecutorService
@@ -33,7 +33,7 @@ class FromUnknownParentSynchronizer(
executor: ExecutorService = Executors.newSingleThreadExecutor(),
coroutineContext: CoroutineContext = executor.asCoroutineDispatcher(),
repository: BlockchainRepository,
- client: EthClient,
+ client: EthRequestsManager,
peerRepository: EthereumPeerRepository
) : Synchronizer(executor, coroutineContext, repository, client,
peerRepository) {
diff --git
a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/PeerStatusEthSynchronizer.kt
b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/PeerStatusEthSynchronizer.kt
index 101eb6f..605aabe 100644
---
a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/PeerStatusEthSynchronizer.kt
+++
b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/PeerStatusEthSynchronizer.kt
@@ -18,7 +18,7 @@ package org.apache.tuweni.ethclient
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.launch
-import org.apache.tuweni.devp2p.eth.EthClient
+import org.apache.tuweni.devp2p.eth.EthRequestsManager
import org.apache.tuweni.eth.Hash
import org.apache.tuweni.eth.repository.BlockchainRepository
import java.util.concurrent.ExecutorService
@@ -37,7 +37,7 @@ class PeerStatusEthSynchronizer(
executor: ExecutorService = Executors.newSingleThreadExecutor(),
coroutineContext: CoroutineContext = executor.asCoroutineDispatcher(),
repository: BlockchainRepository,
- client: EthClient,
+ client: EthRequestsManager,
peerRepository: EthereumPeerRepository,
private val adapter: WireConnectionPeerRepositoryAdapter
) : Synchronizer(executor, coroutineContext, repository, client,
peerRepository) {
diff --git
a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/Synchronizer.kt
b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/Synchronizer.kt
index 8cb1e3c..4f8cb01 100644
--- a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/Synchronizer.kt
+++ b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/Synchronizer.kt
@@ -19,7 +19,7 @@ package org.apache.tuweni.ethclient
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.launch
-import org.apache.tuweni.devp2p.eth.EthClient
+import org.apache.tuweni.devp2p.eth.EthRequestsManager
import org.apache.tuweni.eth.BlockHeader
import org.apache.tuweni.eth.Hash
import org.apache.tuweni.eth.repository.BlockchainRepository
@@ -34,7 +34,7 @@ abstract class Synchronizer(
val executor: ExecutorService = Executors.newFixedThreadPool(1),
override val coroutineContext: CoroutineContext =
executor.asCoroutineDispatcher(),
val repository: BlockchainRepository,
- val client: EthClient,
+ val client: EthRequestsManager,
val peerRepository: EthereumPeerRepository
) : CoroutineScope {
abstract fun start()
diff --git a/les/src/main/kotlin/org/apache/tuweni/les/LESSubProtocolHandler.kt
b/les/src/main/kotlin/org/apache/tuweni/les/LESSubProtocolHandler.kt
index 2d9d442..ce7d592 100644
--- a/les/src/main/kotlin/org/apache/tuweni/les/LESSubProtocolHandler.kt
+++ b/les/src/main/kotlin/org/apache/tuweni/les/LESSubProtocolHandler.kt
@@ -128,11 +128,11 @@ internal class LESSubProtocolHandler(
conn: WireConnection,
blockBodiesMessage: BlockBodiesMessage
) {
- controller.addNewBlockBodies(conn, blockBodiesMessage.blockBodies)
+ controller.addNewBlockBodies(conn, null, blockBodiesMessage.blockBodies)
}
private suspend fun handleBlockHeadersMessage(connection: WireConnection,
blockHeadersMessage: BlockHeadersMessage) {
- controller.addNewBlockHeaders(connection, blockHeadersMessage.blockHeaders)
+ controller.addNewBlockHeaders(connection, null,
blockHeadersMessage.blockHeaders)
}
private suspend fun handleGetBlockHeaders(
diff --git a/les/src/main/kotlin/org/apache/tuweni/les/LESSubprotocol.kt
b/les/src/main/kotlin/org/apache/tuweni/les/LESSubprotocol.kt
index 09ad2fb..bf41f27 100644
--- a/les/src/main/kotlin/org/apache/tuweni/les/LESSubprotocol.kt
+++ b/les/src/main/kotlin/org/apache/tuweni/les/LESSubprotocol.kt
@@ -68,48 +68,18 @@ class LESSubprotocol(
private val listener: (WireConnection, Status) -> Unit = { _, _ -> }
) : SubProtocol {
- /**
- * Creates a new client for the subprotocol.
- *
- * @param service the rlpx service that will use the handler
- * @return a new client for the subprotocol, bound to the service.
- */
- override fun createClient(service: RLPxService): SubProtocolClient {
+ override fun createClient(service: RLPxService, subProtocolIdentifier:
SubProtocolIdentifier): SubProtocolClient {
return EthClient(service, pendingTransactionsPool,
connectionSelectionStrategy)
}
- /**
- * @return the identifier of the subprotocol
- */
override fun id(): SubProtocolIdentifier {
return LES_ID
}
- /**
- * @param subProtocolIdentifier the identifier of the subprotocol
- * @return true if the subprotocol ID and version are supported, false
otherwise
- */
override fun supports(subProtocolIdentifier: SubProtocolIdentifier): Boolean
{
return "les" == subProtocolIdentifier.name() &&
subProtocolIdentifier.version() == 2
}
- /**
- * Provides the length of the range of message types supported by the
subprotocol for a given version
- *
- * @param version the version of the subprotocol to associate with the range
- * @return the length of the range of message types supported by the
subprotocol for a given version
- */
- override fun versionRange(version: Int): Int {
- return 21
- }
-
- /**
- * Creates a new handler for the subprotocol.
- *
- * @param service the rlpx service that will use the handler
- * @param client the subprotocol client
- * @return a new handler for the subprotocol, bound to the service.
- */
override fun createHandler(service: RLPxService, client: SubProtocolClient):
SubProtocolHandler {
val controller = EthController(repo, pendingTransactionsPool, client as
EthRequestsManager, listener)
return LESSubProtocolHandler(
@@ -128,6 +98,6 @@ class LESSubprotocol(
}
companion object {
- internal val LES_ID = SubProtocolIdentifier.of("les", 2)
+ internal val LES_ID = SubProtocolIdentifier.of("les", 2, 21)
}
}
diff --git a/rlp/src/main/java/org/apache/tuweni/rlp/BytesRLPReader.java
b/rlp/src/main/java/org/apache/tuweni/rlp/BytesRLPReader.java
index 0d43c43..8a4bec0 100644
--- a/rlp/src/main/java/org/apache/tuweni/rlp/BytesRLPReader.java
+++ b/rlp/src/main/java/org/apache/tuweni/rlp/BytesRLPReader.java
@@ -33,6 +33,15 @@ final class BytesRLPReader implements RLPReader {
}
@Override
+ public Bytes readRemaining() {
+ int remaining = content.size() - index;
+ if (remaining == 0) {
+ return Bytes.EMPTY;
+ }
+ return content.slice(index); // slice(int) takes a start offset, not a length
+ }
+
+ @Override
public Bytes readValue(boolean lenient) {
int remaining = content.size() - index;
if (remaining == 0) {
diff --git a/rlp/src/main/java/org/apache/tuweni/rlp/RLPReader.java
b/rlp/src/main/java/org/apache/tuweni/rlp/RLPReader.java
index 05e279f..e8a38da 100644
--- a/rlp/src/main/java/org/apache/tuweni/rlp/RLPReader.java
+++ b/rlp/src/main/java/org/apache/tuweni/rlp/RLPReader.java
@@ -406,4 +406,11 @@ public interface RLPReader {
* @return current reader position
*/
int position();
+
+ /**
+ * Provides the remainder of the bytes that have not been read yet.
+ *
+ * @return the remainder of the input at current position
+ */
+ Bytes readRemaining();
}
diff --git a/rlp/src/test/java/org/apache/tuweni/rlp/BytesRLPReaderTest.java
b/rlp/src/test/java/org/apache/tuweni/rlp/BytesRLPReaderTest.java
index dfed5e0..6b0d245 100644
--- a/rlp/src/test/java/org/apache/tuweni/rlp/BytesRLPReaderTest.java
+++ b/rlp/src/test/java/org/apache/tuweni/rlp/BytesRLPReaderTest.java
@@ -219,4 +219,15 @@ class BytesRLPReaderTest {
});
assertEquals(expected, result);
}
+
+ @Test
+ void shouldReadRemaining() {
+ Bytes input = Bytes.fromHexString("83646f6783646f67");
+ RLP.decode(input, reader -> {
+ reader.readValue();
+ assertEquals(4, reader.position());
+ assertEquals(Bytes.fromHexString("83646f67"), reader.readRemaining());
+ return null;
+ });
+ }
}
diff --git
a/rlpx/src/integrationTest/java/org/apache/tuweni/rlpx/vertx/VertxAcceptanceTest.java
b/rlpx/src/integrationTest/java/org/apache/tuweni/rlpx/vertx/VertxAcceptanceTest.java
index a2c309e..bc2c074 100644
---
a/rlpx/src/integrationTest/java/org/apache/tuweni/rlpx/vertx/VertxAcceptanceTest.java
+++
b/rlpx/src/integrationTest/java/org/apache/tuweni/rlpx/vertx/VertxAcceptanceTest.java
@@ -13,6 +13,7 @@
package org.apache.tuweni.rlpx.vertx;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import org.apache.tuweni.bytes.Bytes;
@@ -79,13 +80,13 @@ class VertxAcceptanceTest {
}
}
- private static class MyCustomSubProtocol implements SubProtocol {
+ static class MyCustomSubProtocol implements SubProtocol {
public MyCustomSubProtocolHandler handler;
@Override
public SubProtocolIdentifier id() {
- return SubProtocolIdentifier.of("cus", 1);
+ return SubProtocolIdentifier.of("cus", 1, 1);
}
@Override
@@ -94,18 +95,13 @@ class VertxAcceptanceTest {
}
@Override
- public int versionRange(int version) {
- return 1;
- }
-
- @Override
public SubProtocolHandler createHandler(RLPxService service,
SubProtocolClient client) {
handler = new MyCustomSubProtocolHandler(service, id());
return handler;
}
@Override
- public SubProtocolClient createClient(RLPxService service) {
+ public SubProtocolClient createClient(RLPxService service,
SubProtocolIdentifier identifier) {
return null;
}
}
@@ -134,12 +130,14 @@ class VertxAcceptanceTest {
secondService.start().join();
try {
- service.connectTo(secondKp.publicKey(), new
InetSocketAddress("localhost", secondService.actualPort()));
-
- Thread.sleep(3000);
+ WireConnection conn =
+ service.connectTo(secondKp.publicKey(), new
InetSocketAddress("localhost", secondService.actualPort())).get();
+ assertNotNull(conn);
+ assertEquals(1, conn.agreedSubprotocols().size());
assertEquals(1, repository.asMap().size());
assertEquals(1, secondRepository.asMap().size());
+ Thread.sleep(1000);
assertEquals(1, sp.handler.messages.size());
assertEquals(1, secondSp.handler.messages.size());
@@ -176,11 +174,13 @@ class VertxAcceptanceTest {
secondService.start().join();
try {
- service.connectTo(secondKp.publicKey(), new
InetSocketAddress("localhost", secondService.actualPort()));
-
- Thread.sleep(3000);
+ WireConnection conn =
+ service.connectTo(secondKp.publicKey(), new
InetSocketAddress("localhost", secondService.actualPort())).get();
+ assertNotNull(conn);
+ assertEquals(1, conn.agreedSubprotocols().size());
assertEquals(1, repository.asMap().size());
assertEquals(1, secondRepository.asMap().size());
+ Thread.sleep(1000);
assertEquals(1, sp.handler.messages.size());
assertEquals(1, secondSp.handler.messages.size());
@@ -238,6 +238,11 @@ class VertxAcceptanceTest {
public int version() {
return 63;
}
+
+ @Override
+ public int versionRange() {
+ return 8;
+ }
};
}
@@ -247,17 +252,12 @@ class VertxAcceptanceTest {
}
@Override
- public int versionRange(int version) {
- return 0;
- }
-
- @Override
public SubProtocolHandler createHandler(RLPxService service,
SubProtocolClient client) {
return null;
}
@Override
- public SubProtocolClient createClient(RLPxService service) {
+ public SubProtocolClient createClient(RLPxService service,
SubProtocolIdentifier identifier) {
return null;
}
}), "Client 1", repository);
diff --git
a/rlpx/src/integrationTest/java/org/apache/tuweni/rlpx/vertx/VertxRLPxServiceTest.java
b/rlpx/src/integrationTest/java/org/apache/tuweni/rlpx/vertx/VertxRLPxServiceTest.java
index 0ed9795..6694ae3 100644
---
a/rlpx/src/integrationTest/java/org/apache/tuweni/rlpx/vertx/VertxRLPxServiceTest.java
+++
b/rlpx/src/integrationTest/java/org/apache/tuweni/rlpx/vertx/VertxRLPxServiceTest.java
@@ -37,8 +37,10 @@ import org.apache.tuweni.rlpx.wire.WireConnection;
import java.net.InetSocketAddress;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.CancellationException;
import java.util.concurrent.atomic.AtomicBoolean;
import io.vertx.core.Vertx;
@@ -112,7 +114,7 @@ class VertxRLPxServiceTest {
}
@Test
- void connectToOtherPeer(@VertxInstance Vertx vertx) throws Exception {
+ void connectToOtherPeerWithNoSubProtocols(@VertxInstance Vertx vertx) throws
Exception {
SECP256K1.KeyPair ourPair = SECP256K1.KeyPair.random();
SECP256K1.KeyPair peerPair = SECP256K1.KeyPair.random();
VertxRLPxService service = new VertxRLPxService(vertx, 0, "localhost",
10000, ourPair, new ArrayList<>(), "abc");
@@ -122,23 +124,23 @@ class VertxRLPxServiceTest {
new VertxRLPxService(vertx, 0, "localhost", 10000, peerPair, new
ArrayList<>(), "abc");
peerService.start().join();
- try {
+ assertThrows(CancellationException.class, () -> {
service.connectTo(peerPair.publicKey(), new
InetSocketAddress("127.0.0.1", peerService.actualPort())).get();
- } finally {
- service.stop();
- peerService.stop();
- }
+ });
+ service.stop();
+ peerService.stop();
}
@Test
void disconnectAfterStop(@VertxInstance Vertx vertx) throws Exception {
SECP256K1.KeyPair ourPair = SECP256K1.KeyPair.random();
SECP256K1.KeyPair peerPair = SECP256K1.KeyPair.random();
- VertxRLPxService service = new VertxRLPxService(vertx, 0, "localhost",
10000, ourPair, new ArrayList<>(), "abc");
+ List<SubProtocol> protocols = Arrays.asList(new
VertxAcceptanceTest.MyCustomSubProtocol());
+ VertxRLPxService service = new VertxRLPxService(vertx, 0, "localhost",
10000, ourPair, protocols, "abc");
service.start().join();
- VertxRLPxService peerService =
- new VertxRLPxService(vertx, 0, "localhost", 10000, peerPair, new
ArrayList<>(), "abc");
+
+ VertxRLPxService peerService = new VertxRLPxService(vertx, 0, "localhost",
10000, peerPair, protocols, "abc");
peerService.start().join();
WireConnection conn = null;
@@ -166,7 +168,7 @@ class VertxRLPxServiceTest {
List<SubProtocol> protocols = Collections.singletonList(new SubProtocol() {
@Override
public SubProtocolIdentifier id() {
- return SubProtocolIdentifier.of("eth", 63);
+ return SubProtocolIdentifier.of("eth", 63, 17);
}
@Override
@@ -175,19 +177,15 @@ class VertxRLPxServiceTest {
}
@Override
- public int versionRange(int version) {
- return 0;
- }
-
- @Override
public SubProtocolHandler createHandler(RLPxService service,
SubProtocolClient client) {
SubProtocolHandler handler = mock(SubProtocolHandler.class);
+
when(handler.handleNewPeerConnection(any())).thenReturn(AsyncCompletion.COMPLETED);
when(handler.stop()).thenReturn(AsyncCompletion.COMPLETED);
return handler;
}
@Override
- public SubProtocolClient createClient(RLPxService service) {
+ public SubProtocolClient createClient(RLPxService service,
SubProtocolIdentifier identifier) {
return mock(SubProtocolClient.class);
}
});
@@ -232,8 +230,9 @@ class VertxRLPxServiceTest {
void getClientWeCreate(@VertxInstance Vertx vertx) throws Exception {
SubProtocol sp = mock(SubProtocol.class);
SubProtocolClient client = mock(SubProtocolClient.class);
- when(sp.id()).thenReturn(SubProtocolIdentifier.of("foo", 1));
- when(sp.createClient(any())).thenReturn(client);
+ when(sp.id()).thenReturn(SubProtocolIdentifier.of("foo", 1, 2));
+
when(sp.getCapabilities()).thenReturn(Arrays.asList(SubProtocolIdentifier.of("foo",
1, 2)));
+ when(sp.createClient(any(), any())).thenReturn(client);
MemoryWireConnectionsRepository peerRepository = new
MemoryWireConnectionsRepository();
VertxRLPxService peerService = new VertxRLPxService(
vertx,
diff --git
a/rlpx/src/main/java/org/apache/tuweni/rlpx/vertx/VertxRLPxService.java
b/rlpx/src/main/java/org/apache/tuweni/rlpx/vertx/VertxRLPxService.java
index 8effb49..930b1be 100644
--- a/rlpx/src/main/java/org/apache/tuweni/rlpx/vertx/VertxRLPxService.java
+++ b/rlpx/src/main/java/org/apache/tuweni/rlpx/vertx/VertxRLPxService.java
@@ -39,7 +39,6 @@ import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
@@ -72,8 +71,8 @@ public final class VertxRLPxService implements RLPxService {
private final String clientId;
private final WireConnectionRepository repository;
- private LinkedHashMap<SubProtocol, SubProtocolHandler> handlers;
- private LinkedHashMap<SubProtocol, SubProtocolClient> clients;
+ private LinkedHashMap<SubProtocolIdentifier, SubProtocolHandler> handlers;
+ private LinkedHashMap<SubProtocolIdentifier, SubProtocolClient> clients;
private NetClient client;
private NetServer server;
@@ -175,9 +174,14 @@ public final class VertxRLPxService implements RLPxService
{
clients = new LinkedHashMap<>();
for (SubProtocol subProtocol : subProtocols) {
- SubProtocolClient client = subProtocol.createClient(this);
- clients.put(subProtocol, client);
- handlers.put(subProtocol, subProtocol.createHandler(this, client));
+ for (SubProtocolIdentifier identifier : subProtocol.getCapabilities())
{
+ if (identifier.versionRange() == 0) {
+ throw new IllegalArgumentException("Invalid subprotocol " +
identifier.toString());
+ }
+ SubProtocolClient client = subProtocol.createClient(this,
identifier);
+ clients.put(identifier, client);
+ handlers.put(identifier, subProtocol.createHandler(this, client));
+ }
}
client = vertx.createNetClient(new NetClientOptions());
@@ -303,12 +307,7 @@ public final class VertxRLPxService implements RLPxService
{
if (!started.get()) {
throw new IllegalStateException("The RLPx service is not active");
}
- for (Map.Entry<SubProtocol, SubProtocolClient> clientEntry :
clients.entrySet()) {
- if (clientEntry.getKey().id().equals(subProtocolIdentifier)) {
- return clientEntry.getValue();
- }
- }
- return null;
+ return clients.get(subProtocolIdentifier);
}
@Override
diff --git
a/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/DefaultSubProtocolIdentifier.java
b/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/DefaultSubProtocolIdentifier.java
index 85017d8..efd731d 100644
---
a/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/DefaultSubProtocolIdentifier.java
+++
b/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/DefaultSubProtocolIdentifier.java
@@ -21,10 +21,12 @@ final class DefaultSubProtocolIdentifier implements
SubProtocolIdentifier {
private final String name;
private final int version;
+ private final int range;
- DefaultSubProtocolIdentifier(String name, int version) {
+ DefaultSubProtocolIdentifier(String name, int version, int range) {
this.name = name;
this.version = version;
+ this.range = range;
}
@Override
@@ -38,6 +40,11 @@ final class DefaultSubProtocolIdentifier implements
SubProtocolIdentifier {
}
@Override
+ public int versionRange() {
+ return range;
+ }
+
+ @Override
public boolean equals(Object o) {
if (this == o)
return true;
@@ -51,4 +58,9 @@ final class DefaultSubProtocolIdentifier implements
SubProtocolIdentifier {
public int hashCode() {
return Objects.hash(name, version);
}
+
+ @Override
+ public String toString() {
+ return "DefaultSubProtocolIdentifier{" + "name='" + name + '\'' + ",
version=" + version + '}';
+ }
}
diff --git
a/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/DefaultWireConnection.java
b/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/DefaultWireConnection.java
index 73e5432..f05a607 100644
--- a/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/DefaultWireConnection.java
+++ b/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/DefaultWireConnection.java
@@ -46,7 +46,7 @@ public final class DefaultWireConnection implements
WireConnection {
private final Consumer<RLPxMessage> writer;
private final Consumer<HelloMessage> afterHandshakeListener;
private final Runnable disconnectHandler;
- private final LinkedHashMap<SubProtocol, SubProtocolHandler> subprotocols;
+ private final LinkedHashMap<SubProtocolIdentifier, SubProtocolHandler>
subprotocols;
private final int p2pVersion;
private final String clientId;
private final int advertisedPort;
@@ -57,7 +57,7 @@ public final class DefaultWireConnection implements
WireConnection {
private CompletableAsyncCompletion awaitingPong;
private HelloMessage myHelloMessage;
private HelloMessage peerHelloMessage;
- private RangeMap<Integer, SubProtocol> subprotocolRangeMap =
TreeRangeMap.create();
+ private RangeMap<Integer, SubProtocolIdentifier> subprotocolRangeMap =
TreeRangeMap.create();
private DisconnectReason disconnectReason;
private boolean disconnectRequested;
private boolean disconnectReceived;
@@ -85,7 +85,7 @@ public final class DefaultWireConnection implements
WireConnection {
Consumer<RLPxMessage> writer,
Consumer<HelloMessage> afterHandshakeListener,
Runnable disconnectHandler,
- LinkedHashMap<SubProtocol, SubProtocolHandler> subprotocols,
+ LinkedHashMap<SubProtocolIdentifier, SubProtocolHandler> subprotocols,
int p2pVersion,
String clientId,
int advertisedPort,
@@ -136,6 +136,17 @@ public final class DefaultWireConnection implements
WireConnection {
return;
}
+ if (subprotocolRangeMap.asMapOfRanges().isEmpty()) {
+ logger
+ .debug(
+ "Useless peer detected, caps {}, our caps {}",
+ peerHelloMessage.capabilities(),
+ subprotocols.keySet());
+ disconnect(DisconnectReason.USELESS_PEER);
+ ready.cancel();
+ return;
+ }
+
if (myHelloMessage == null) {
sendHello();
}
@@ -183,7 +194,8 @@ public final class DefaultWireConnection implements
WireConnection {
awaitingPong.complete();
}
} else {
- Map.Entry<Range<Integer>, SubProtocol> subProtocolEntry =
subprotocolRangeMap.getEntry(message.messageId());
+ Map.Entry<Range<Integer>, SubProtocolIdentifier> subProtocolEntry =
+ subprotocolRangeMap.getEntry(message.messageId());
if (subProtocolEntry == null) {
logger.debug("Unknown message received {}", message.messageId());
disconnect(DisconnectReason.PROTOCOL_BREACH);
@@ -203,27 +215,29 @@ public final class DefaultWireConnection implements
WireConnection {
}
}
- private void initSupportedRange(List<Capability> capabilities) {
+ void initSupportedRange(List<Capability> capabilities) {
int startRange = 16;
- Map<String, Capability> pickedCapabilities = new HashMap<>();
- for (SubProtocol sp : subprotocols.keySet()) {
+ Map<String, SubProtocolIdentifier> pickedCapabilities = new HashMap<>();
+ // for each capability name, pick the highest peer-advertised version that we also support
+ for (SubProtocolIdentifier sp : subprotocols.keySet()) {
for (Capability cap : capabilities) {
- if (sp.supports(SubProtocolIdentifier.of(cap.name(), cap.version()))) {
- Capability oldPick = pickedCapabilities.get(cap.name());
+ if (sp.equals(SubProtocolIdentifier.of(cap.name(), cap.version()))) {
+ SubProtocolIdentifier oldPick = pickedCapabilities.get(cap.name());
if (oldPick == null || oldPick.version() < cap.version()) {
- pickedCapabilities.put(cap.name(), cap);
+ pickedCapabilities.put(cap.name(), sp);
}
}
}
}
for (Capability cap : capabilities) {
- if (!pickedCapabilities.containsValue(cap)) {
+ SubProtocolIdentifier capSp = SubProtocolIdentifier.of(cap.name(),
cap.version());
+ if (!capSp.equals(pickedCapabilities.get(cap.name()))) { // get() is null when no local subprotocol matched this name
continue;
}
- for (SubProtocol sp : subprotocols.keySet()) {
- if (sp.supports(SubProtocolIdentifier.of(cap.name(), cap.version()))) {
- int numberOfMessageTypes = sp.versionRange(cap.version());
+ for (SubProtocolIdentifier sp : subprotocols.keySet()) {
+ if (sp.equals(capSp)) {
+ int numberOfMessageTypes = sp.versionRange();
subprotocolRangeMap.put(Range.closedOpen(startRange, startRange +
numberOfMessageTypes), sp);
startRange += numberOfMessageTypes;
break;
@@ -273,8 +287,6 @@ public final class DefaultWireConnection implements
WireConnection {
subprotocols
.keySet()
.stream()
- .map(SubProtocol::getCapabilities)
- .flatMap(subProtocolIdentifiers ->
subProtocolIdentifiers.stream())
.map(
subProtocolIdentifier -> new Capability(
subProtocolIdentifier.name(),
@@ -286,8 +298,8 @@ public final class DefaultWireConnection implements
WireConnection {
@Override
public boolean supports(SubProtocolIdentifier subProtocolIdentifier) {
- for (SubProtocol sp : subprotocolRangeMap.asMapOfRanges().values()) {
- if (sp.supports(subProtocolIdentifier)) {
+ for (SubProtocolIdentifier sp :
subprotocolRangeMap.asMapOfRanges().values()) {
+ if (sp.equals(subProtocolIdentifier)) {
return true;
}
}
@@ -297,8 +309,8 @@ public final class DefaultWireConnection implements
WireConnection {
@Override
public Collection<SubProtocolIdentifier> agreedSubprotocols() {
List<SubProtocolIdentifier> identifiers = new ArrayList<>();
- for (SubProtocol sp : subprotocolRangeMap.asMapOfRanges().values()) {
- identifiers.addAll(sp.getCapabilities());
+ for (SubProtocolIdentifier sp :
subprotocolRangeMap.asMapOfRanges().values()) {
+ identifiers.add(sp);
}
return identifiers;
}
@@ -306,8 +318,8 @@ public final class DefaultWireConnection implements
WireConnection {
public void sendMessage(SubProtocolIdentifier subProtocolIdentifier, int
messageType, Bytes message) {
logger.trace("Sending sub-protocol message {} {}", messageType, message);
Integer offset = null;
- for (Map.Entry<Range<Integer>, SubProtocol> entry :
subprotocolRangeMap.asMapOfRanges().entrySet()) {
- if (entry.getValue().supports(subProtocolIdentifier)) {
+ for (Map.Entry<Range<Integer>, SubProtocolIdentifier> entry :
subprotocolRangeMap.asMapOfRanges().entrySet()) {
+ if (entry.getValue().equals(subProtocolIdentifier)) {
offset = entry.getKey().lowerEndpoint();
break;
}
diff --git a/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/SubProtocol.java
b/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/SubProtocol.java
index d48b70d..daae3a2 100644
--- a/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/SubProtocol.java
+++ b/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/SubProtocol.java
@@ -39,14 +39,6 @@ public interface SubProtocol {
boolean supports(SubProtocolIdentifier subProtocolIdentifier);
/**
- * Provides the length of the range of message types supported by the
subprotocol for a given version
- *
- * @param version the version of the subprotocol to associate with the range
- * @return the length of the range of message types supported by the
subprotocol for a given version
- */
- int versionRange(int version);
-
- /**
* Creates a new handler for the subprotocol.
*
* @param service the rlpx service that will use the handler
@@ -59,9 +51,10 @@ public interface SubProtocol {
* Creates a new client for the subprotocol.
*
* @param service the rlpx service that will use the handler
+ * @param identifier the identifier (name and version) of the subprotocol the client is created for
* @return a new client for the subprotocol, bound to the service.
*/
- SubProtocolClient createClient(RLPxService service);
+ SubProtocolClient createClient(RLPxService service, SubProtocolIdentifier
identifier);
/**
* Provides the capabilities supported by the subprotocol.
diff --git
a/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/SubProtocolIdentifier.java
b/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/SubProtocolIdentifier.java
index 29cb628..8546789 100644
--- a/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/SubProtocolIdentifier.java
+++ b/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/SubProtocolIdentifier.java
@@ -21,8 +21,12 @@ import static java.util.Objects.requireNonNull;
public interface SubProtocolIdentifier {
static SubProtocolIdentifier of(String name, int version) {
+ return of(name, version, 0);
+ }
+
+ static SubProtocolIdentifier of(String name, int version, int range) {
requireNonNull(name);
- return new DefaultSubProtocolIdentifier(name, version);
+ return new DefaultSubProtocolIdentifier(name, version, range);
}
/**
@@ -38,4 +42,11 @@ public interface SubProtocolIdentifier {
* @return the version of the subprotocol
*/
int version();
+
+ /**
+ * Provides the length of the range of message types supported by this
version of the subprotocol
+ *
+ * @return the length of the range of message types supported by this
version of the subprotocol
+ */
+ int versionRange();
}
diff --git
a/rlpx/src/test/java/org/apache/tuweni/rlpx/wire/DefaultWireConnectionTest.java
b/rlpx/src/test/java/org/apache/tuweni/rlpx/wire/DefaultWireConnectionTest.java
index 9cf7719..f2d270e 100644
---
a/rlpx/src/test/java/org/apache/tuweni/rlpx/wire/DefaultWireConnectionTest.java
+++
b/rlpx/src/test/java/org/apache/tuweni/rlpx/wire/DefaultWireConnectionTest.java
@@ -13,6 +13,7 @@
package org.apache.tuweni.rlpx.wire;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.mock;
import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.concurrent.AsyncResult;
@@ -20,8 +21,10 @@ import org.apache.tuweni.crypto.SECP256K1;
import org.apache.tuweni.junit.BouncyCastleExtension;
import org.apache.tuweni.rlpx.RLPxMessage;
+import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.jupiter.api.Test;
@@ -170,4 +173,18 @@ class DefaultWireConnectionTest {
assertEquals(DisconnectReason.INCOMPATIBLE_DEVP2P_VERSION.code,
msg.reason());
}
+
+ @Test
+ void testCapabilitiesNegotiation() {
+ SubProtocolIdentifier cus = SubProtocolIdentifier.of("cus", 1, 1);
+ LinkedHashMap<SubProtocolIdentifier, SubProtocolHandler> subprotocols =
new LinkedHashMap<>();
+ subprotocols.put(cus, mock(SubProtocolHandler.class));
+ DefaultWireConnection conn = new DefaultWireConnection(nodeId, peerNodeId,
disconnect -> {
+ }, helloMessage -> {
+ }, () -> {
+ }, subprotocols, 5, "abc", 10000, AsyncResult.incomplete(), "127.0.0.1",
1234);
+ List<Capability> capabilityList = Arrays.asList(new Capability("cus", 1));
+ conn.initSupportedRange(capabilityList);
+ assertEquals(1, conn.agreedSubprotocols().size());
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]