This is an automated email from the ASF dual-hosted git repository.

toulmean pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git

commit 2bd29202c48d3336e3beab29ad142375c8f2c9a0
Author: Antoine Toulme <[email protected]>
AuthorDate: Fri May 29 23:14:14 2020 -0700

    various fixes to design of rlpx service, allow to return a future mapping 
to when the connection is complete
---
 .../kotlin/org/apache/tuweni/les/LESSubprotocol.kt |  2 +
 .../apache/tuweni/les/LESSubProtocolHandlerTest.kt |  3 +-
 .../java/org/apache/tuweni/rlpx/RLPxService.java   |  5 +-
 .../apache/tuweni/rlpx/vertx/VertxRLPxService.java | 31 ++++++----
 .../rlpx/wire/DefaultSubProtocolIdentifier.java    | 19 +++++-
 .../tuweni/rlpx/wire/DefaultWireConnection.java    | 67 +++++++++++++++++-----
 .../org/apache/tuweni/rlpx/wire/SubProtocol.java   |  9 +++
 .../tuweni/rlpx/wire/SubProtocolIdentifier.java    |  4 ++
 .../rlpx/wire/DefaultWireConnectionTest.java       | 15 ++---
 .../org/apache/tuweni/rlpx/wire/PingPongTest.java  |  6 +-
 10 files changed, 125 insertions(+), 36 deletions(-)

diff --git a/les/src/main/kotlin/org/apache/tuweni/les/LESSubprotocol.kt 
b/les/src/main/kotlin/org/apache/tuweni/les/LESSubprotocol.kt
index d79eb9a..726333f 100644
--- a/les/src/main/kotlin/org/apache/tuweni/les/LESSubprotocol.kt
+++ b/les/src/main/kotlin/org/apache/tuweni/les/LESSubprotocol.kt
@@ -51,6 +51,8 @@ class LESSubprotocol
    private val repo: BlockchainRepository
  ) : SubProtocol {
 
+  override fun getCapabilities(): MutableList<SubProtocolIdentifier> = 
mutableListOf(SubProtocolIdentifier.of("les", 2))
+
   override fun id(): SubProtocolIdentifier {
     return LES_ID
   }
diff --git 
a/les/src/test/kotlin/org/apache/tuweni/les/LESSubProtocolHandlerTest.kt 
b/les/src/test/kotlin/org/apache/tuweni/les/LESSubProtocolHandlerTest.kt
index 3619b12..89a1f02 100644
--- a/les/src/test/kotlin/org/apache/tuweni/les/LESSubProtocolHandlerTest.kt
+++ b/les/src/test/kotlin/org/apache/tuweni/les/LESSubProtocolHandlerTest.kt
@@ -44,6 +44,7 @@ import org.apache.tuweni.units.bigints.UInt256
 import org.apache.tuweni.units.ethereum.Gas
 import org.apache.tuweni.units.ethereum.Wei
 import org.apache.lucene.index.IndexWriter
+import org.apache.tuweni.concurrent.AsyncResult
 import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.Assertions.assertNotNull
 import org.junit.jupiter.api.Assertions.assertThrows
@@ -97,7 +98,7 @@ constructor() {
     var message: Bytes? = null
     var disconnectReason: DisconnectReason? = null
 
-    override fun connectTo(peerPublicKey: SECP256K1.PublicKey, peerAddress: 
InetSocketAddress): AsyncCompletion? {
+    override fun connectTo(peerPublicKey: SECP256K1.PublicKey, peerAddress: 
InetSocketAddress): AsyncResult<String>? {
       return null
     }
 
diff --git a/rlpx/src/main/java/org/apache/tuweni/rlpx/RLPxService.java 
b/rlpx/src/main/java/org/apache/tuweni/rlpx/RLPxService.java
index f4ecce9..f5e3b7c 100644
--- a/rlpx/src/main/java/org/apache/tuweni/rlpx/RLPxService.java
+++ b/rlpx/src/main/java/org/apache/tuweni/rlpx/RLPxService.java
@@ -14,6 +14,7 @@ package org.apache.tuweni.rlpx;
 
 import org.apache.tuweni.bytes.Bytes;
 import org.apache.tuweni.concurrent.AsyncCompletion;
+import org.apache.tuweni.concurrent.AsyncResult;
 import org.apache.tuweni.crypto.SECP256K1;
 import org.apache.tuweni.rlpx.wire.DisconnectReason;
 import org.apache.tuweni.rlpx.wire.SubProtocolIdentifier;
@@ -30,9 +31,9 @@ public interface RLPxService {
    *
    * @param peerPublicKey the peer public key
    * @param peerAddress the peer host and port
-   * @return a handle that completes if the peer connects successfully.
+   * @return a handle that completes if the peer connects successfully, 
providing the connection ID.
    */
-  AsyncCompletion connectTo(SECP256K1.PublicKey peerPublicKey, 
InetSocketAddress peerAddress);
+  AsyncResult<String> connectTo(SECP256K1.PublicKey peerPublicKey, 
InetSocketAddress peerAddress);
 
 
   /**
diff --git 
a/rlpx/src/main/java/org/apache/tuweni/rlpx/vertx/VertxRLPxService.java 
b/rlpx/src/main/java/org/apache/tuweni/rlpx/vertx/VertxRLPxService.java
index 5c72044..e96e7c5 100644
--- a/rlpx/src/main/java/org/apache/tuweni/rlpx/vertx/VertxRLPxService.java
+++ b/rlpx/src/main/java/org/apache/tuweni/rlpx/vertx/VertxRLPxService.java
@@ -15,7 +15,9 @@ package org.apache.tuweni.rlpx.vertx;
 import org.apache.tuweni.bytes.Bytes;
 import org.apache.tuweni.bytes.Bytes32;
 import org.apache.tuweni.concurrent.AsyncCompletion;
+import org.apache.tuweni.concurrent.AsyncResult;
 import org.apache.tuweni.concurrent.CompletableAsyncCompletion;
+import org.apache.tuweni.concurrent.CompletableAsyncResult;
 import org.apache.tuweni.crypto.SECP256K1.KeyPair;
 import org.apache.tuweni.crypto.SECP256K1.PublicKey;
 import org.apache.tuweni.rlpx.HandshakeMessage;
@@ -202,7 +204,7 @@ public final class VertxRLPxService implements RLPxService {
   }
 
   private void receiveMessage(NetSocket netSocket) {
-    netSocket.handler(new Handler<Buffer>() {
+    netSocket.handler(new Handler<>() {
 
       private RLPxConnection conn;
 
@@ -217,8 +219,7 @@ public final class VertxRLPxService implements RLPxService {
                   keyPair,
                   bytes -> 
netSocket.write(Buffer.buffer(bytes.toArrayUnsafe())));
           if (wireConnection == null) {
-            this.wireConnection = createConnection(conn, netSocket);
-            wireConnection.handleConnectionStart();
+            this.wireConnection = createConnection(conn, netSocket, 
AsyncResult.incomplete());
           }
         } else {
           conn.stream(Bytes.wrapBuffer(buffer), 
wireConnection::messageReceived);
@@ -283,11 +284,11 @@ public final class VertxRLPxService implements 
RLPxService {
   }
 
   @Override
-  public AsyncCompletion connectTo(PublicKey peerPublicKey, InetSocketAddress 
peerAddress) {
+  public AsyncResult<String> connectTo(PublicKey peerPublicKey, 
InetSocketAddress peerAddress) {
     if (!started.get()) {
       throw new IllegalStateException("The RLPx service is not active");
     }
-    CompletableAsyncCompletion connected = AsyncCompletion.incomplete();
+    CompletableAsyncResult<String> connected = AsyncResult.incomplete();
     logger.debug("Connecting to {} with public key {}", peerAddress, 
peerPublicKey);
     client
         .connect(
@@ -300,7 +301,14 @@ public final class VertxRLPxService implements RLPxService 
{
               logger.debug("Initiating handshake to {}", peerAddress);
               
netSocket.write(Buffer.buffer(initHandshakeMessage.toArrayUnsafe()));
 
-              netSocket.handler(new Handler<Buffer>() {
+              netSocket.closeHandler(event -> {
+                logger.debug("Connection {} closed", peerAddress);
+                if (!connected.isDone()) {
+                  connected.cancel();
+                }
+              });
+
+              netSocket.handler(new Handler<>() {
 
                 private RLPxConnection conn;
 
@@ -331,8 +339,7 @@ public final class VertxRLPxService implements RLPxService {
                               keyPair.publicKey(),
                               peerPublicKey);
 
-                      this.wireConnection = createConnection(conn, netSocket);
-                      connected.complete();
+                      this.wireConnection = createConnection(conn, netSocket, 
connected);
                       if (messageBytes.isEmpty()) {
                         return;
                       }
@@ -352,7 +359,10 @@ public final class VertxRLPxService implements RLPxService 
{
     return connected;
   }
 
-  private DefaultWireConnection createConnection(RLPxConnection conn, 
NetSocket netSocket) {
+  private DefaultWireConnection createConnection(
+      RLPxConnection conn,
+      NetSocket netSocket,
+      CompletableAsyncResult<String> ready) {
     String id = UUID.randomUUID().toString();
     DefaultWireConnection wireConnection =
         new DefaultWireConnection(id, conn.publicKey().bytes(), 
conn.peerPublicKey().bytes(), message -> {
@@ -360,8 +370,9 @@ public final class VertxRLPxService implements RLPxService {
             Bytes bytes = conn.write(message);
             vertx.eventBus().send(netSocket.writeHandlerID(), 
Buffer.buffer(bytes.toArrayUnsafe()));
           }
-        }, conn::configureAfterHandshake, netSocket::end, handlers, 
DEVP2P_VERSION, clientId, advertisedPort());
+        }, conn::configureAfterHandshake, netSocket::end, handlers, 
DEVP2P_VERSION, clientId, advertisedPort(), ready);
     repository.add(wireConnection);
+    wireConnection.handleConnectionStart();
     return wireConnection;
   }
 
diff --git 
a/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/DefaultSubProtocolIdentifier.java
 
b/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/DefaultSubProtocolIdentifier.java
index c67447d..85017d8 100644
--- 
a/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/DefaultSubProtocolIdentifier.java
+++ 
b/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/DefaultSubProtocolIdentifier.java
@@ -12,6 +12,8 @@
  */
 package org.apache.tuweni.rlpx.wire;
 
+import java.util.Objects;
+
 /**
  * Default implementation of a sub protocol identifier
  */
@@ -20,7 +22,7 @@ final class DefaultSubProtocolIdentifier implements 
SubProtocolIdentifier {
   private final String name;
   private final int version;
 
-  public DefaultSubProtocolIdentifier(String name, int version) {
+  DefaultSubProtocolIdentifier(String name, int version) {
     this.name = name;
     this.version = version;
   }
@@ -34,4 +36,19 @@ final class DefaultSubProtocolIdentifier implements 
SubProtocolIdentifier {
   public int version() {
     return version;
   }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o)
+      return true;
+    if (o == null || getClass() != o.getClass())
+      return false;
+    DefaultSubProtocolIdentifier that = (DefaultSubProtocolIdentifier) o;
+    return version == that.version && name.equals(that.name);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(name, version);
+  }
 }
diff --git 
a/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/DefaultWireConnection.java 
b/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/DefaultWireConnection.java
index f33c6d6..3a246c4 100644
--- a/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/DefaultWireConnection.java
+++ b/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/DefaultWireConnection.java
@@ -15,15 +15,16 @@ package org.apache.tuweni.rlpx.wire;
 import org.apache.tuweni.bytes.Bytes;
 import org.apache.tuweni.concurrent.AsyncCompletion;
 import org.apache.tuweni.concurrent.CompletableAsyncCompletion;
+import org.apache.tuweni.concurrent.CompletableAsyncResult;
 import org.apache.tuweni.rlpx.RLPxMessage;
 
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
-import com.google.common.collect.BoundType;
 import com.google.common.collect.Range;
 import com.google.common.collect.RangeMap;
 import com.google.common.collect.TreeRangeMap;
@@ -47,6 +48,7 @@ public final class DefaultWireConnection implements 
WireConnection {
   private final int p2pVersion;
   private final String clientId;
   private final int advertisedPort;
+  private final CompletableAsyncResult<String> ready;
 
   private CompletableAsyncCompletion awaitingPong;
   private HelloMessage myHelloMessage;
@@ -59,7 +61,6 @@ public final class DefaultWireConnection implements 
WireConnection {
    * @param id the id of the connection
    * @param nodeId the node id of this node
    * @param peerNodeId the node id of the peer
-   * @param logger a logger
    * @param writer the message writer
    * @param afterHandshakeListener a listener called after the handshake is 
complete with the peer hello message.
    * @param disconnectHandler the handler to run upon receiving a disconnect 
message
@@ -67,6 +68,7 @@ public final class DefaultWireConnection implements 
WireConnection {
    * @param p2pVersion the version of the devp2p protocol supported by this 
client
    * @param clientId the client ID to announce in HELLO messages
    * @param advertisedPort the port we listen to, to announce in HELLO messages
+   * @param ready a handle to complete when the connection is ready for use.
    */
   public DefaultWireConnection(
       String id,
@@ -78,7 +80,8 @@ public final class DefaultWireConnection implements 
WireConnection {
       LinkedHashMap<SubProtocol, SubProtocolHandler> subprotocols,
       int p2pVersion,
       String clientId,
-      int advertisedPort) {
+      int advertisedPort,
+      CompletableAsyncResult<String> ready) {
     this.id = id;
     this.nodeId = nodeId;
     this.peerNodeId = peerNodeId;
@@ -89,6 +92,7 @@ public final class DefaultWireConnection implements 
WireConnection {
     this.p2pVersion = p2pVersion;
     this.clientId = clientId;
     this.advertisedPort = advertisedPort;
+    this.ready = ready;
     logger.debug("New wire connection created");
   }
 
@@ -97,24 +101,27 @@ public final class DefaultWireConnection implements 
WireConnection {
       peerHelloMessage = HelloMessage.read(message.content());
       logger.debug("Received peer Hello message {}", peerHelloMessage);
       initSupportedRange(peerHelloMessage.capabilities());
-
       if (peerHelloMessage.nodeId() == null || 
peerHelloMessage.nodeId().isEmpty()) {
         disconnect(DisconnectReason.NULL_NODE_IDENTITY_RECEIVED);
+        ready.cancel();
         return;
       }
 
       if (!peerHelloMessage.nodeId().equals(peerNodeId)) {
         disconnect(DisconnectReason.UNEXPECTED_IDENTITY);
+        ready.cancel();
         return;
       }
 
       if (peerHelloMessage.nodeId().equals(nodeId)) {
         disconnect(DisconnectReason.CONNECTED_TO_SELF);
+        ready.cancel();
         return;
       }
 
       if (peerHelloMessage.p2pVersion() > p2pVersion) {
         disconnect(DisconnectReason.INCOMPATIBLE_DEVP2P_VERSION);
+        ready.cancel();
         return;
       }
 
@@ -124,13 +131,22 @@ public final class DefaultWireConnection implements 
WireConnection {
 
       afterHandshakeListener.accept(peerHelloMessage);
 
-      for (SubProtocol subProtocol : 
subprotocolRangeMap.asMapOfRanges().values()) {
-        subprotocols.get(subProtocol).handleNewPeerConnection(id);
-      }
+      AsyncCompletion allSubProtocols = AsyncCompletion
+          .allOf(
+              subprotocolRangeMap
+                  .asMapOfRanges()
+                  .values()
+                  .stream()
+                  .map(subprotocols::get)
+                  .map(handler -> handler.handleNewPeerConnection(id)));
+      allSubProtocols.thenRun(() -> ready.complete(id));
       return;
     } else if (message.messageId() == 1) {
       DisconnectMessage.read(message.content());
       disconnectHandler.run();
+      if (!ready.isDone()) {
+        ready.cancel();
+      }
       return;
     }
 
@@ -148,23 +164,42 @@ public final class DefaultWireConnection implements 
WireConnection {
     } else {
       Map.Entry<Range<Integer>, SubProtocol> subProtocolEntry = 
subprotocolRangeMap.getEntry(message.messageId());
       if (subProtocolEntry == null) {
+        logger.debug("Unknown message received {}", message.messageId());
         disconnect(DisconnectReason.PROTOCOL_BREACH);
+        if (!ready.isDone()) {
+          ready.cancel();
+        }
       } else {
         int offset = subProtocolEntry.getKey().lowerEndpoint();
+        logger.debug("Received message of type {}", message.messageId() - 
offset);
         subprotocols.get(subProtocolEntry.getValue()).handle(id, 
message.messageId() - offset, message.content());
       }
     }
   }
 
   private void initSupportedRange(List<Capability> capabilities) {
-    int startRange = 17;
+    int startRange = 16;
+    Map<String, Capability> pickedCapabilities = new HashMap<>();
+    for (SubProtocol sp : subprotocols.keySet()) {
+      for (Capability cap : capabilities) {
+        if (sp.supports(SubProtocolIdentifier.of(cap.name(), cap.version()))) {
+          Capability oldPick = pickedCapabilities.get(cap.name());
+          if (oldPick == null || oldPick.version() < cap.version()) {
+            pickedCapabilities.put(cap.name(), cap);
+          }
+        }
+      }
+    }
+
     for (Capability cap : capabilities) {
+      if (!pickedCapabilities.containsValue(cap)) {
+        continue;
+      }
       for (SubProtocol sp : subprotocols.keySet()) {
         if (sp.supports(SubProtocolIdentifier.of(cap.name(), cap.version()))) {
           int numberOfMessageTypes = sp.versionRange(cap.version());
-          subprotocolRangeMap
-              .put(Range.range(startRange, BoundType.CLOSED, startRange + 
numberOfMessageTypes, BoundType.CLOSED), sp);
-          startRange += numberOfMessageTypes + 1;
+          subprotocolRangeMap.put(Range.closedOpen(startRange, startRange + 
numberOfMessageTypes), sp);
+          startRange += numberOfMessageTypes;
           break;
         }
       }
@@ -209,7 +244,12 @@ public final class DefaultWireConnection implements 
WireConnection {
             subprotocols
                 .keySet()
                 .stream()
-                .map(sp -> new Capability(sp.id().name(), sp.id().version()))
+                .map(SubProtocol::getCapabilities)
+                .flatMap(subProtocolIdentifiers -> 
subProtocolIdentifiers.stream())
+                .map(
+                    subProtocolIdentifier -> new Capability(
+                        subProtocolIdentifier.name(),
+                        subProtocolIdentifier.version()))
                 .collect(Collectors.toList()));
     logger.debug("Sending hello message {}", myHelloMessage);
     writer.accept(new RLPxMessage(0, myHelloMessage.toBytes()));
@@ -220,8 +260,9 @@ public final class DefaultWireConnection implements 
WireConnection {
     return id;
   }
 
+  @SuppressWarnings("CatchAndPrintStackTrace")
   public void sendMessage(SubProtocolIdentifier subProtocolIdentifier, int 
messageType, Bytes message) {
-    logger.debug("Sending sub-protocol message {}", message);
+    logger.debug("Sending sub-protocol message {} {}", messageType, message);
     Integer offset = null;
     for (Map.Entry<Range<Integer>, SubProtocol> entry : 
subprotocolRangeMap.asMapOfRanges().entrySet()) {
       if (entry.getValue().supports(subProtocolIdentifier)) {
diff --git a/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/SubProtocol.java 
b/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/SubProtocol.java
index 535a478..fd951de 100644
--- a/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/SubProtocol.java
+++ b/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/SubProtocol.java
@@ -15,6 +15,8 @@ package org.apache.tuweni.rlpx.wire;
 
 import org.apache.tuweni.rlpx.RLPxService;
 
+import java.util.List;
+
 /**
  * Defines a subprotocol to be used for wire connections
  */
@@ -46,4 +48,11 @@ public interface SubProtocol {
    * @return a new handler for the subprotocol, bound to the service.
    */
   SubProtocolHandler createHandler(RLPxService service);
+
+  /**
+   * Provides the capabilities supported by the subprotocol.
+   * 
+   * @return the capabilities for this protocol, ordered.
+   */
+  List<SubProtocolIdentifier> getCapabilities();
 }
diff --git 
a/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/SubProtocolIdentifier.java 
b/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/SubProtocolIdentifier.java
index 0daf1d9..01402dd 100644
--- a/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/SubProtocolIdentifier.java
+++ b/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/SubProtocolIdentifier.java
@@ -12,12 +12,16 @@
  */
 package org.apache.tuweni.rlpx.wire;
 
+
+import static java.util.Objects.requireNonNull;
+
 /**
  * Identifier of a subprotocol, comprised of a name and version.
  */
 public interface SubProtocolIdentifier {
 
   static SubProtocolIdentifier of(String name, int version) {
+    requireNonNull(name);
     return new DefaultSubProtocolIdentifier(name, version);
   }
 
diff --git 
a/rlpx/src/test/java/org/apache/tuweni/rlpx/wire/DefaultWireConnectionTest.java 
b/rlpx/src/test/java/org/apache/tuweni/rlpx/wire/DefaultWireConnectionTest.java
index ad5ca8f..a9cd415 100644
--- 
a/rlpx/src/test/java/org/apache/tuweni/rlpx/wire/DefaultWireConnectionTest.java
+++ 
b/rlpx/src/test/java/org/apache/tuweni/rlpx/wire/DefaultWireConnectionTest.java
@@ -15,6 +15,7 @@ package org.apache.tuweni.rlpx.wire;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 import org.apache.tuweni.bytes.Bytes;
+import org.apache.tuweni.concurrent.AsyncResult;
 import org.apache.tuweni.crypto.SECP256K1;
 import org.apache.tuweni.junit.BouncyCastleExtension;
 import org.apache.tuweni.rlpx.RLPxMessage;
@@ -38,7 +39,7 @@ class DefaultWireConnectionTest {
     DefaultWireConnection conn =
         new DefaultWireConnection("abc", nodeId, peerNodeId, 
capturedDisconnect::set, helloMessage -> {
         }, () -> {
-        }, new LinkedHashMap<>(), 3, "abc", 10000);
+        }, new LinkedHashMap<>(), 3, "abc", 10000, AsyncResult.incomplete());
 
     conn.messageReceived(new RLPxMessage(45, Bytes.EMPTY));
     assertEquals(1, capturedDisconnect.get().messageId());
@@ -53,7 +54,7 @@ class DefaultWireConnectionTest {
     DefaultWireConnection conn =
         new DefaultWireConnection("abc", nodeId, peerNodeId, 
capturedDisconnect::set, helloMessage -> {
         }, () -> {
-        }, new LinkedHashMap<>(), 4, "abc", 10000);
+        }, new LinkedHashMap<>(), 4, "abc", 10000, AsyncResult.incomplete());
     conn.sendHello();
     conn.messageReceived(new RLPxMessage(45, Bytes.EMPTY));
     assertEquals(1, capturedDisconnect.get().messageId());
@@ -68,7 +69,7 @@ class DefaultWireConnectionTest {
     DefaultWireConnection conn =
         new DefaultWireConnection("abc", nodeId, peerNodeId, 
capturedDisconnect::set, helloMessage -> {
         }, () -> {
-        }, new LinkedHashMap<>(), 28, "abc", 10000);
+        }, new LinkedHashMap<>(), 28, "abc", 10000, AsyncResult.incomplete());
     conn.sendHello();
     conn
         .messageReceived(
@@ -90,7 +91,7 @@ class DefaultWireConnectionTest {
     DefaultWireConnection conn =
         new DefaultWireConnection("abc", nodeId, peerNodeId, 
capturedDisconnect::set, helloMessage -> {
         }, () -> {
-        }, new LinkedHashMap<>(), 32, "abc", 10000);
+        }, new LinkedHashMap<>(), 32, "abc", 10000, AsyncResult.incomplete());
     conn.sendHello();
     conn
         .messageReceived(
@@ -108,7 +109,7 @@ class DefaultWireConnectionTest {
     DefaultWireConnection conn =
         new DefaultWireConnection("abc", nodeId, peerNodeId, 
capturedDisconnect::set, helloMessage -> {
         }, () -> {
-        }, new LinkedHashMap<>(), 32, "abc", 10000);
+        }, new LinkedHashMap<>(), 32, "abc", 10000, AsyncResult.incomplete());
     conn.sendHello();
     conn
         .messageReceived(
@@ -128,7 +129,7 @@ class DefaultWireConnectionTest {
     DefaultWireConnection conn =
         new DefaultWireConnection("abc", nodeId, nodeId, 
capturedDisconnect::set, helloMessage -> {
         }, () -> {
-        }, new LinkedHashMap<>(), 33, "abc", 10000);
+        }, new LinkedHashMap<>(), 33, "abc", 10000, AsyncResult.incomplete());
     conn.sendHello();
     conn
         .messageReceived(
@@ -146,7 +147,7 @@ class DefaultWireConnectionTest {
     DefaultWireConnection conn =
         new DefaultWireConnection("abc", nodeId, peerNodeId, 
capturedDisconnect::set, helloMessage -> {
         }, () -> {
-        }, new LinkedHashMap<>(), 5, "abc", 10000);
+        }, new LinkedHashMap<>(), 5, "abc", 10000, AsyncResult.incomplete());
     conn.sendHello();
     conn
         .messageReceived(
diff --git a/rlpx/src/test/java/org/apache/tuweni/rlpx/wire/PingPongTest.java 
b/rlpx/src/test/java/org/apache/tuweni/rlpx/wire/PingPongTest.java
index 4769873..e72ca0a 100644
--- a/rlpx/src/test/java/org/apache/tuweni/rlpx/wire/PingPongTest.java
+++ b/rlpx/src/test/java/org/apache/tuweni/rlpx/wire/PingPongTest.java
@@ -19,6 +19,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import org.apache.tuweni.bytes.Bytes;
 import org.apache.tuweni.concurrent.AsyncCompletion;
+import org.apache.tuweni.concurrent.AsyncResult;
 import org.apache.tuweni.crypto.SECP256K1;
 import org.apache.tuweni.junit.BouncyCastleExtension;
 import org.apache.tuweni.rlpx.RLPxMessage;
@@ -51,7 +52,8 @@ class PingPongTest {
         new LinkedHashMap<>(),
         2,
         "abc",
-        10000);
+        10000,
+        AsyncResult.incomplete());
 
     AsyncCompletion completion = conn.sendPing();
     assertFalse(completion.isDone());
@@ -67,7 +69,7 @@ class PingPongTest {
     DefaultWireConnection conn =
         new DefaultWireConnection("abc", nodeId, peerNodeId, 
capturedPong::set, helloMessage -> {
         }, () -> {
-        }, new LinkedHashMap<>(), 1, "abc", 10000);
+        }, new LinkedHashMap<>(), 1, "abc", 10000, AsyncResult.incomplete());
 
     conn.messageReceived(new RLPxMessage(2, Bytes.EMPTY));
     assertNotNull(capturedPong.get());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to