This is an automated email from the ASF dual-hosted git repository. toulmean pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git
commit 2bd29202c48d3336e3beab29ad142375c8f2c9a0 Author: Antoine Toulme <[email protected]> AuthorDate: Fri May 29 23:14:14 2020 -0700 various fixes to design of rlpx service, allow to return a future mapping to when the connection is complete --- .../kotlin/org/apache/tuweni/les/LESSubprotocol.kt | 2 + .../apache/tuweni/les/LESSubProtocolHandlerTest.kt | 3 +- .../java/org/apache/tuweni/rlpx/RLPxService.java | 5 +- .../apache/tuweni/rlpx/vertx/VertxRLPxService.java | 31 ++++++---- .../rlpx/wire/DefaultSubProtocolIdentifier.java | 19 +++++- .../tuweni/rlpx/wire/DefaultWireConnection.java | 67 +++++++++++++++++----- .../org/apache/tuweni/rlpx/wire/SubProtocol.java | 9 +++ .../tuweni/rlpx/wire/SubProtocolIdentifier.java | 4 ++ .../rlpx/wire/DefaultWireConnectionTest.java | 15 ++--- .../org/apache/tuweni/rlpx/wire/PingPongTest.java | 6 +- 10 files changed, 125 insertions(+), 36 deletions(-) diff --git a/les/src/main/kotlin/org/apache/tuweni/les/LESSubprotocol.kt b/les/src/main/kotlin/org/apache/tuweni/les/LESSubprotocol.kt index d79eb9a..726333f 100644 --- a/les/src/main/kotlin/org/apache/tuweni/les/LESSubprotocol.kt +++ b/les/src/main/kotlin/org/apache/tuweni/les/LESSubprotocol.kt @@ -51,6 +51,8 @@ class LESSubprotocol private val repo: BlockchainRepository ) : SubProtocol { + override fun getCapabilities(): MutableList<SubProtocolIdentifier> = mutableListOf(SubProtocolIdentifier.of("les", 2)) + override fun id(): SubProtocolIdentifier { return LES_ID } diff --git a/les/src/test/kotlin/org/apache/tuweni/les/LESSubProtocolHandlerTest.kt b/les/src/test/kotlin/org/apache/tuweni/les/LESSubProtocolHandlerTest.kt index 3619b12..89a1f02 100644 --- a/les/src/test/kotlin/org/apache/tuweni/les/LESSubProtocolHandlerTest.kt +++ b/les/src/test/kotlin/org/apache/tuweni/les/LESSubProtocolHandlerTest.kt @@ -44,6 +44,7 @@ import org.apache.tuweni.units.bigints.UInt256 import org.apache.tuweni.units.ethereum.Gas import org.apache.tuweni.units.ethereum.Wei import org.apache.lucene.index.IndexWriter +import org.apache.tuweni.concurrent.AsyncResult import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Assertions.assertNotNull import org.junit.jupiter.api.Assertions.assertThrows @@ -97,7 +98,7 @@ constructor() { var message: Bytes? = null var disconnectReason: DisconnectReason? = null - override fun connectTo(peerPublicKey: SECP256K1.PublicKey, peerAddress: InetSocketAddress): AsyncCompletion? { + override fun connectTo(peerPublicKey: SECP256K1.PublicKey, peerAddress: InetSocketAddress): AsyncResult<String>? { return null } diff --git a/rlpx/src/main/java/org/apache/tuweni/rlpx/RLPxService.java b/rlpx/src/main/java/org/apache/tuweni/rlpx/RLPxService.java index f4ecce9..f5e3b7c 100644 --- a/rlpx/src/main/java/org/apache/tuweni/rlpx/RLPxService.java +++ b/rlpx/src/main/java/org/apache/tuweni/rlpx/RLPxService.java @@ -14,6 +14,7 @@ package org.apache.tuweni.rlpx; import org.apache.tuweni.bytes.Bytes; import org.apache.tuweni.concurrent.AsyncCompletion; +import org.apache.tuweni.concurrent.AsyncResult; import org.apache.tuweni.crypto.SECP256K1; import org.apache.tuweni.rlpx.wire.DisconnectReason; import org.apache.tuweni.rlpx.wire.SubProtocolIdentifier; @@ -30,9 +31,9 @@ public interface RLPxService { * * @param peerPublicKey the peer public key * @param peerAddress the peer host and port - * @return a handle that completes if the peer connects successfully. + * @return a handle that completes if the peer connects successfully, providing the connection ID. */ - AsyncCompletion connectTo(SECP256K1.PublicKey peerPublicKey, InetSocketAddress peerAddress); + AsyncResult<String> connectTo(SECP256K1.PublicKey peerPublicKey, InetSocketAddress peerAddress); /** diff --git a/rlpx/src/main/java/org/apache/tuweni/rlpx/vertx/VertxRLPxService.java b/rlpx/src/main/java/org/apache/tuweni/rlpx/vertx/VertxRLPxService.java index 5c72044..e96e7c5 100644 --- a/rlpx/src/main/java/org/apache/tuweni/rlpx/vertx/VertxRLPxService.java +++ b/rlpx/src/main/java/org/apache/tuweni/rlpx/vertx/VertxRLPxService.java @@ -15,7 +15,9 @@ package org.apache.tuweni.rlpx.vertx; import org.apache.tuweni.bytes.Bytes; import org.apache.tuweni.bytes.Bytes32; import org.apache.tuweni.concurrent.AsyncCompletion; +import org.apache.tuweni.concurrent.AsyncResult; import org.apache.tuweni.concurrent.CompletableAsyncCompletion; +import org.apache.tuweni.concurrent.CompletableAsyncResult; import org.apache.tuweni.crypto.SECP256K1.KeyPair; import org.apache.tuweni.crypto.SECP256K1.PublicKey; import org.apache.tuweni.rlpx.HandshakeMessage; @@ -202,7 +204,7 @@ public final class VertxRLPxService implements RLPxService { } private void receiveMessage(NetSocket netSocket) { - netSocket.handler(new Handler<Buffer>() { + netSocket.handler(new Handler<>() { private RLPxConnection conn; @@ -217,8 +219,7 @@ public final class VertxRLPxService implements RLPxService { keyPair, bytes -> netSocket.write(Buffer.buffer(bytes.toArrayUnsafe()))); if (wireConnection == null) { - this.wireConnection = createConnection(conn, netSocket); - wireConnection.handleConnectionStart(); + this.wireConnection = createConnection(conn, netSocket, AsyncResult.incomplete()); } } else { conn.stream(Bytes.wrapBuffer(buffer), wireConnection::messageReceived); @@ -283,11 +284,11 @@ public final class VertxRLPxService implements RLPxService { } @Override - public AsyncCompletion connectTo(PublicKey peerPublicKey, InetSocketAddress peerAddress) { + public AsyncResult<String> connectTo(PublicKey peerPublicKey, InetSocketAddress peerAddress) { if (!started.get()) { throw new IllegalStateException("The RLPx service is not active"); } - CompletableAsyncCompletion connected = AsyncCompletion.incomplete(); + CompletableAsyncResult<String> connected = AsyncResult.incomplete(); logger.debug("Connecting to {} with public key {}", peerAddress, peerPublicKey); client .connect( @@ -300,7 +301,14 @@ public final class VertxRLPxService implements RLPxService { logger.debug("Initiating handshake to {}", peerAddress); netSocket.write(Buffer.buffer(initHandshakeMessage.toArrayUnsafe())); - netSocket.handler(new Handler<Buffer>() { + netSocket.closeHandler(event -> { + logger.debug("Connection {} closed", peerAddress); + if (!connected.isDone()) { + connected.cancel(); + } + }); + + netSocket.handler(new Handler<>() { private RLPxConnection conn; @@ -331,8 +339,7 @@ public final class VertxRLPxService implements RLPxService { keyPair.publicKey(), peerPublicKey); - this.wireConnection = createConnection(conn, netSocket); - connected.complete(); + this.wireConnection = createConnection(conn, netSocket, connected); if (messageBytes.isEmpty()) { return; } @@ -352,7 +359,10 @@ public final class VertxRLPxService implements RLPxService { return connected; } - private DefaultWireConnection createConnection(RLPxConnection conn, NetSocket netSocket) { + private DefaultWireConnection createConnection( + RLPxConnection conn, + NetSocket netSocket, + CompletableAsyncResult<String> ready) { String id = UUID.randomUUID().toString(); DefaultWireConnection wireConnection = new DefaultWireConnection(id, conn.publicKey().bytes(), conn.peerPublicKey().bytes(), message -> { @@ -360,8 +370,9 @@ public final class VertxRLPxService implements RLPxService { Bytes bytes = conn.write(message); vertx.eventBus().send(netSocket.writeHandlerID(), Buffer.buffer(bytes.toArrayUnsafe())); } - }, conn::configureAfterHandshake, netSocket::end, handlers, DEVP2P_VERSION, clientId, advertisedPort()); + }, conn::configureAfterHandshake, netSocket::end, handlers, DEVP2P_VERSION, clientId, advertisedPort(), ready); repository.add(wireConnection); + wireConnection.handleConnectionStart(); return wireConnection; } diff --git a/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/DefaultSubProtocolIdentifier.java b/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/DefaultSubProtocolIdentifier.java index c67447d..85017d8 100644 --- a/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/DefaultSubProtocolIdentifier.java +++ b/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/DefaultSubProtocolIdentifier.java @@ -12,6 +12,8 @@ */ package org.apache.tuweni.rlpx.wire; +import java.util.Objects; + /** * Default implementation of a sub protocol identifier */ @@ -20,7 +22,7 @@ final class DefaultSubProtocolIdentifier implements SubProtocolIdentifier { private final String name; private final int version; - public DefaultSubProtocolIdentifier(String name, int version) { + DefaultSubProtocolIdentifier(String name, int version) { this.name = name; this.version = version; } @@ -34,4 +36,19 @@ final class DefaultSubProtocolIdentifier implements SubProtocolIdentifier { public int version() { return version; } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + DefaultSubProtocolIdentifier that = (DefaultSubProtocolIdentifier) o; + return version == that.version && name.equals(that.name); + } + + @Override + public int hashCode() { + return Objects.hash(name, version); + } } diff --git a/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/DefaultWireConnection.java b/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/DefaultWireConnection.java index f33c6d6..3a246c4 100644 --- a/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/DefaultWireConnection.java +++ b/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/DefaultWireConnection.java @@ -15,15 +15,16 @@ package org.apache.tuweni.rlpx.wire; import org.apache.tuweni.bytes.Bytes; import org.apache.tuweni.concurrent.AsyncCompletion; import org.apache.tuweni.concurrent.CompletableAsyncCompletion; +import org.apache.tuweni.concurrent.CompletableAsyncResult; import org.apache.tuweni.rlpx.RLPxMessage; +import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.function.Consumer; import java.util.stream.Collectors; -import com.google.common.collect.BoundType; import com.google.common.collect.Range; import com.google.common.collect.RangeMap; import com.google.common.collect.TreeRangeMap; @@ -47,6 +48,7 @@ public final class DefaultWireConnection implements WireConnection { private final int p2pVersion; private final String clientId; private final int advertisedPort; + private final CompletableAsyncResult<String> ready; private CompletableAsyncCompletion awaitingPong; private HelloMessage myHelloMessage; @@ -59,7 +61,6 @@ public final class DefaultWireConnection implements WireConnection { * @param id the id of the connection * @param nodeId the node id of this node * @param peerNodeId the node id of the peer - * @param logger a logger * @param writer the message writer * @param afterHandshakeListener a listener called after the handshake is complete with the peer hello message. * @param disconnectHandler the handler to run upon receiving a disconnect message @@ -67,6 +68,7 @@ public final class DefaultWireConnection implements WireConnection { * @param p2pVersion the version of the devp2p protocol supported by this client * @param clientId the client ID to announce in HELLO messages * @param advertisedPort the port we listen to, to announce in HELLO messages + * @param ready a handle to complete when the connection is ready for use. */ public DefaultWireConnection( String id, @@ -78,7 +80,8 @@ public final class DefaultWireConnection implements WireConnection { LinkedHashMap<SubProtocol, SubProtocolHandler> subprotocols, int p2pVersion, String clientId, - int advertisedPort) { + int advertisedPort, + CompletableAsyncResult<String> ready) { this.id = id; this.nodeId = nodeId; this.peerNodeId = peerNodeId; @@ -89,6 +92,7 @@ public final class DefaultWireConnection implements WireConnection { this.p2pVersion = p2pVersion; this.clientId = clientId; this.advertisedPort = advertisedPort; + this.ready = ready; logger.debug("New wire connection created"); } @@ -97,24 +101,27 @@ public final class DefaultWireConnection implements WireConnection { peerHelloMessage = HelloMessage.read(message.content()); logger.debug("Received peer Hello message {}", peerHelloMessage); initSupportedRange(peerHelloMessage.capabilities()); - if (peerHelloMessage.nodeId() == null || peerHelloMessage.nodeId().isEmpty()) { disconnect(DisconnectReason.NULL_NODE_IDENTITY_RECEIVED); + ready.cancel(); return; } if (!peerHelloMessage.nodeId().equals(peerNodeId)) { disconnect(DisconnectReason.UNEXPECTED_IDENTITY); + ready.cancel(); return; } if (peerHelloMessage.nodeId().equals(nodeId)) { disconnect(DisconnectReason.CONNECTED_TO_SELF); + ready.cancel(); return; } if (peerHelloMessage.p2pVersion() > p2pVersion) { disconnect(DisconnectReason.INCOMPATIBLE_DEVP2P_VERSION); + ready.cancel(); return; } @@ -124,13 +131,22 @@ public final class DefaultWireConnection implements WireConnection { afterHandshakeListener.accept(peerHelloMessage); - for (SubProtocol subProtocol : subprotocolRangeMap.asMapOfRanges().values()) { - subprotocols.get(subProtocol).handleNewPeerConnection(id); - } + AsyncCompletion allSubProtocols = AsyncCompletion + .allOf( + subprotocolRangeMap + .asMapOfRanges() + .values() + .stream() + .map(subprotocols::get) + .map(handler -> handler.handleNewPeerConnection(id))); + allSubProtocols.thenRun(() -> ready.complete(id)); return; } else if (message.messageId() == 1) { DisconnectMessage.read(message.content()); disconnectHandler.run(); + if (!ready.isDone()) { + ready.cancel(); + } return; } @@ -148,23 +164,42 @@ public final class DefaultWireConnection implements WireConnection { } else { Map.Entry<Range<Integer>, SubProtocol> subProtocolEntry = subprotocolRangeMap.getEntry(message.messageId()); if (subProtocolEntry == null) { + logger.debug("Unknown message received {}", message.messageId()); disconnect(DisconnectReason.PROTOCOL_BREACH); + if (!ready.isDone()) { + ready.cancel(); + } } else { int offset = subProtocolEntry.getKey().lowerEndpoint(); + logger.debug("Received message of type {}", message.messageId() - offset); subprotocols.get(subProtocolEntry.getValue()).handle(id, message.messageId() - offset, message.content()); } } } private void initSupportedRange(List<Capability> capabilities) { - int startRange = 17; + int startRange = 16; + Map<String, Capability> pickedCapabilities = new HashMap<>(); + for (SubProtocol sp : subprotocols.keySet()) { + for (Capability cap : capabilities) { + if (sp.supports(SubProtocolIdentifier.of(cap.name(), cap.version()))) { + Capability oldPick = pickedCapabilities.get(cap.name()); + if (oldPick == null || oldPick.version() < cap.version()) { + pickedCapabilities.put(cap.name(), cap); + } + } + } + } + for (Capability cap : capabilities) { + if (!pickedCapabilities.containsValue(cap)) { + continue; + } for (SubProtocol sp : subprotocols.keySet()) { if (sp.supports(SubProtocolIdentifier.of(cap.name(), cap.version()))) { int numberOfMessageTypes = sp.versionRange(cap.version()); - subprotocolRangeMap - .put(Range.range(startRange, BoundType.CLOSED, startRange + numberOfMessageTypes, BoundType.CLOSED), sp); - startRange += numberOfMessageTypes + 1; + subprotocolRangeMap.put(Range.closedOpen(startRange, startRange + numberOfMessageTypes), sp); + startRange += numberOfMessageTypes; break; } } @@ -209,7 +244,12 @@ public final class DefaultWireConnection implements WireConnection { subprotocols .keySet() .stream() - .map(sp -> new Capability(sp.id().name(), sp.id().version())) + .map(SubProtocol::getCapabilities) + .flatMap(subProtocolIdentifiers -> subProtocolIdentifiers.stream()) + .map( + subProtocolIdentifier -> new Capability( + subProtocolIdentifier.name(), + subProtocolIdentifier.version())) .collect(Collectors.toList())); logger.debug("Sending hello message {}", myHelloMessage); writer.accept(new RLPxMessage(0, myHelloMessage.toBytes())); @@ -220,8 +260,9 @@ public final class DefaultWireConnection implements WireConnection { return id; } + @SuppressWarnings("CatchAndPrintStackTrace") public void sendMessage(SubProtocolIdentifier subProtocolIdentifier, int messageType, Bytes message) { - logger.debug("Sending sub-protocol message {}", message); + logger.debug("Sending sub-protocol message {} {}", messageType, message); Integer offset = null; for (Map.Entry<Range<Integer>, SubProtocol> entry : subprotocolRangeMap.asMapOfRanges().entrySet()) { if (entry.getValue().supports(subProtocolIdentifier)) { diff --git a/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/SubProtocol.java b/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/SubProtocol.java index 535a478..fd951de 100644 --- a/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/SubProtocol.java +++ b/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/SubProtocol.java @@ -15,6 +15,8 @@ package org.apache.tuweni.rlpx.wire; import org.apache.tuweni.rlpx.RLPxService; +import java.util.List; + /** * Defines a subprotocol to be used for wire connections */ @@ -46,4 +48,11 @@ public interface SubProtocol { * @return a new handler for the subprotocol, bound to the service. */ SubProtocolHandler createHandler(RLPxService service); + + /** + * Provides the capabilities supported by the subprotocol. + * + * @return the capabilities for this protocol, ordered. + */ + List<SubProtocolIdentifier> getCapabilities(); } diff --git a/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/SubProtocolIdentifier.java b/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/SubProtocolIdentifier.java index 0daf1d9..01402dd 100644 --- a/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/SubProtocolIdentifier.java +++ b/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/SubProtocolIdentifier.java @@ -12,12 +12,16 @@ */ package org.apache.tuweni.rlpx.wire; + +import static java.util.Objects.requireNonNull; + /** * Identifier of a subprotocol, comprised of a name and version. */ public interface SubProtocolIdentifier { static SubProtocolIdentifier of(String name, int version) { + requireNonNull(name); return new DefaultSubProtocolIdentifier(name, version); } diff --git a/rlpx/src/test/java/org/apache/tuweni/rlpx/wire/DefaultWireConnectionTest.java b/rlpx/src/test/java/org/apache/tuweni/rlpx/wire/DefaultWireConnectionTest.java index ad5ca8f..a9cd415 100644 --- a/rlpx/src/test/java/org/apache/tuweni/rlpx/wire/DefaultWireConnectionTest.java +++ b/rlpx/src/test/java/org/apache/tuweni/rlpx/wire/DefaultWireConnectionTest.java @@ -15,6 +15,7 @@ package org.apache.tuweni.rlpx.wire; import static org.junit.jupiter.api.Assertions.assertEquals; import org.apache.tuweni.bytes.Bytes; +import org.apache.tuweni.concurrent.AsyncResult; import org.apache.tuweni.crypto.SECP256K1; import org.apache.tuweni.junit.BouncyCastleExtension; import org.apache.tuweni.rlpx.RLPxMessage; @@ -38,7 +39,7 @@ class DefaultWireConnectionTest { DefaultWireConnection conn = new DefaultWireConnection("abc", nodeId, peerNodeId, capturedDisconnect::set, helloMessage -> { }, () -> { - }, new LinkedHashMap<>(), 3, "abc", 10000); + }, new LinkedHashMap<>(), 3, "abc", 10000, AsyncResult.incomplete()); conn.messageReceived(new RLPxMessage(45, Bytes.EMPTY)); assertEquals(1, capturedDisconnect.get().messageId()); @@ -53,7 +54,7 @@ class DefaultWireConnectionTest { DefaultWireConnection conn = new DefaultWireConnection("abc", nodeId, peerNodeId, capturedDisconnect::set, helloMessage -> { }, () -> { - }, new LinkedHashMap<>(), 4, "abc", 10000); + }, new LinkedHashMap<>(), 4, "abc", 10000, AsyncResult.incomplete()); conn.sendHello(); conn.messageReceived(new RLPxMessage(45, Bytes.EMPTY)); assertEquals(1, capturedDisconnect.get().messageId()); @@ -68,7 +69,7 @@ class DefaultWireConnectionTest { DefaultWireConnection conn = new DefaultWireConnection("abc", nodeId, peerNodeId, capturedDisconnect::set, helloMessage -> { }, () -> { - }, new LinkedHashMap<>(), 28, "abc", 10000); + }, new LinkedHashMap<>(), 28, "abc", 10000, AsyncResult.incomplete()); conn.sendHello(); conn .messageReceived( @@ -90,7 +91,7 @@ class DefaultWireConnectionTest { DefaultWireConnection conn = new DefaultWireConnection("abc", nodeId, peerNodeId, capturedDisconnect::set, helloMessage -> { }, () -> { - }, new LinkedHashMap<>(), 32, "abc", 10000); + }, new LinkedHashMap<>(), 32, "abc", 10000, AsyncResult.incomplete()); conn.sendHello(); conn .messageReceived( @@ -108,7 +109,7 @@ class DefaultWireConnectionTest { DefaultWireConnection conn = new DefaultWireConnection("abc", nodeId, peerNodeId, capturedDisconnect::set, helloMessage -> { }, () -> { - }, new LinkedHashMap<>(), 32, "abc", 10000); + }, new LinkedHashMap<>(), 32, "abc", 10000, AsyncResult.incomplete()); conn.sendHello(); conn .messageReceived( @@ -128,7 +129,7 @@ class DefaultWireConnectionTest { DefaultWireConnection conn = new DefaultWireConnection("abc", nodeId, nodeId, capturedDisconnect::set, helloMessage -> { }, () -> { - }, new LinkedHashMap<>(), 33, "abc", 10000); + }, new LinkedHashMap<>(), 33, "abc", 10000, AsyncResult.incomplete()); conn.sendHello(); conn .messageReceived( @@ -146,7 +147,7 @@ class DefaultWireConnectionTest { DefaultWireConnection conn = new DefaultWireConnection("abc", nodeId, peerNodeId, capturedDisconnect::set, helloMessage -> { }, () -> { - }, new LinkedHashMap<>(), 5, "abc", 10000); + }, new LinkedHashMap<>(), 5, "abc", 10000, AsyncResult.incomplete()); conn.sendHello(); conn .messageReceived( diff --git a/rlpx/src/test/java/org/apache/tuweni/rlpx/wire/PingPongTest.java b/rlpx/src/test/java/org/apache/tuweni/rlpx/wire/PingPongTest.java index 4769873..e72ca0a 100644 --- a/rlpx/src/test/java/org/apache/tuweni/rlpx/wire/PingPongTest.java +++ b/rlpx/src/test/java/org/apache/tuweni/rlpx/wire/PingPongTest.java @@ -19,6 +19,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import org.apache.tuweni.bytes.Bytes; import org.apache.tuweni.concurrent.AsyncCompletion; +import org.apache.tuweni.concurrent.AsyncResult; import org.apache.tuweni.crypto.SECP256K1; import org.apache.tuweni.junit.BouncyCastleExtension; import org.apache.tuweni.rlpx.RLPxMessage; @@ -51,7 +52,8 @@ class PingPongTest { new LinkedHashMap<>(), 2, "abc", - 10000); + 10000, + AsyncResult.incomplete()); AsyncCompletion completion = conn.sendPing(); assertFalse(completion.isDone()); @@ -67,7 +69,7 @@ class PingPongTest { DefaultWireConnection conn = new DefaultWireConnection("abc", nodeId, peerNodeId, capturedPong::set, helloMessage -> { }, () -> { - }, new LinkedHashMap<>(), 1, "abc", 10000); + }, new LinkedHashMap<>(), 1, "abc", 10000, AsyncResult.incomplete()); conn.messageReceived(new RLPxMessage(2, Bytes.EMPTY)); assertNotNull(capturedPong.get()); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
