This is an automated email from the ASF dual-hosted git repository.

toulmean pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git


The following commit(s) were added to refs/heads/master by this push:
     new 9a90492  Add test covering basic ethstats functions
     new a37d34b  Merge pull request #117 from atoulme/ethstats_testing
9a90492 is described below

commit 9a90492a2c0fb080bf0a21ee694c80945709f236
Author: Antoine Toulme <[email protected]>
AuthorDate: Sun Jul 5 13:13:50 2020 -0700

    Add test covering basic ethstats functions
---
 .../tuweni/ethstats/EthStatsReporterTest.java      | 82 +++++++---------------
 .../apache/tuweni/ethstats/FakeEthStatsServer.java | 76 ++++++++++++++++++++
 .../apache/tuweni/ethstats/EthStatsReporter.java   | 42 +++++++----
 3 files changed, 127 insertions(+), 73 deletions(-)

diff --git a/ethstats/src/integrationTest/java/org/apache/tuweni/ethstats/EthStatsReporterTest.java b/ethstats/src/integrationTest/java/org/apache/tuweni/ethstats/EthStatsReporterTest.java
index de3884d..dd4a3d1 100644
--- a/ethstats/src/integrationTest/java/org/apache/tuweni/ethstats/EthStatsReporterTest.java
+++ b/ethstats/src/integrationTest/java/org/apache/tuweni/ethstats/EthStatsReporterTest.java
@@ -12,8 +12,13 @@
  */
 package org.apache.tuweni.ethstats;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
 import org.apache.tuweni.bytes.Bytes;
 import org.apache.tuweni.bytes.Bytes32;
+import org.apache.tuweni.concurrent.AsyncResult;
 import org.apache.tuweni.eth.Address;
 import org.apache.tuweni.eth.Hash;
 import org.apache.tuweni.junit.VertxExtension;
@@ -21,6 +26,7 @@ import org.apache.tuweni.junit.VertxInstance;
 import org.apache.tuweni.units.bigints.UInt256;
 
 import java.net.URI;
+import java.time.Instant;
 import java.util.Collections;
 
 import io.vertx.core.Vertx;
@@ -30,13 +36,15 @@ import org.junit.jupiter.api.extension.ExtendWith;
 @ExtendWith(VertxExtension.class)
 public class EthStatsReporterTest {
 
-  //@Disabled
   @Test
   void testConnectToLocalEthStats(@VertxInstance Vertx vertx) throws InterruptedException {
 
+    Instant now = Instant.EPOCH;
+    FakeEthStatsServer server = new FakeEthStatsServer(vertx, "127.0.0.1", 0);
     EthStatsReporter reporter = new EthStatsReporter(
         vertx,
-        Collections.singletonList(URI.create("ws://localhost:3000/api")),
+        "foo",
+        Collections.singletonList(URI.create("ws://localhost:" + server.getPort() + "/api")),
         "wat",
         "name",
         "node",
@@ -46,9 +54,17 @@ public class EthStatsReporterTest {
         "Windoz",
         "64",
         (blockNumbers) -> {
-        });
+        },
+        now::toEpochMilli);
 
+    AsyncResult<String> nextMessage = server.captureNextMessage();
+    reporter.start().join();
+    assertNotNull(server.getWebsocket());
+    assertEquals(
+        "{\"emit\":[\"hello\",{\"info\":{\"name\":\"name\",\"node\":\"node\",\"port\":33030,\"net\":\"10\",\"protocol\":\"eth/63\",\"api\":\"No\",\"os\":\"Windoz\",\"client\":\"0.1.0\",\"os_v\":\"64\",\"canUpdateHistory\":true},\"secret\":\"wat\",\"id\":\"foo\"}]}",
+        nextMessage.get());
 
+    nextMessage = server.captureNextMessage();
     reporter
         .sendNewHead(
             new BlockStats(
@@ -65,64 +81,14 @@ public class EthStatsReporterTest {
                 Hash.fromBytes(Bytes32.random()),
                 Hash.fromBytes(Bytes32.random()),
                 Collections.emptyList()));
-
+    assertTrue(nextMessage.get().startsWith("{\"emit\":[\"block\",{\"block\""), nextMessage.get());
+    nextMessage = server.captureNextMessage();
     reporter.sendNewNodeStats(new NodeStats(true, false, true, 42, 9, 4000, 100));
+    assertTrue(nextMessage.get().startsWith("{\"emit\":[\"stats\",{\"stats\":"), nextMessage.get());
+    nextMessage = server.captureNextMessage();
     reporter.sendNewPendingTransactionCount(42);
-    reporter.start();
+    assertEquals("{\"emit\":[\"pending\",{\"stats\":{\"pending\":42},\"id\":\"foo\"}]}", nextMessage.get());
 
-    Thread.sleep(1000);
-    reporter
-        .sendNewHead(
-            new BlockStats(
-                UInt256.valueOf(2),
-                Hash.fromBytes(Bytes32.random()),
-                Hash.fromBytes(Bytes32.random()),
-                3L,
-                Address.fromBytes(Bytes.random(20)),
-                42L,
-                43,
-                UInt256.valueOf(42L),
-                UInt256.valueOf(84L),
-                Collections.emptyList(),
-                Hash.fromBytes(Bytes32.random()),
-                Hash.fromBytes(Bytes32.random()),
-                Collections.emptyList()));
-    Thread.sleep(1000);
-    Thread.sleep(1000);
-    reporter
-        .sendNewHead(
-            new BlockStats(
-                UInt256.valueOf(3),
-                Hash.fromBytes(Bytes32.random()),
-                Hash.fromBytes(Bytes32.random()),
-                3L,
-                Address.fromBytes(Bytes.random(20)),
-                42L,
-                43,
-                UInt256.valueOf(42L),
-                UInt256.valueOf(84L),
-                Collections.emptyList(),
-                Hash.fromBytes(Bytes32.random()),
-                Hash.fromBytes(Bytes32.random()),
-                Collections.emptyList()));
-    Thread.sleep(1000);
-    reporter
-        .sendNewHead(
-            new BlockStats(
-                UInt256.valueOf(4),
-                Hash.fromBytes(Bytes32.random()),
-                Hash.fromBytes(Bytes32.random()),
-                3L,
-                Address.fromBytes(Bytes.random(20)),
-                42L,
-                43,
-                UInt256.valueOf(42L),
-                UInt256.valueOf(84L),
-                Collections.emptyList(),
-                Hash.fromBytes(Bytes32.random()),
-                Hash.fromBytes(Bytes32.random()),
-                Collections.emptyList()));
-    Thread.sleep(5000);
     reporter.stop();
   }
 }
diff --git a/ethstats/src/integrationTest/java/org/apache/tuweni/ethstats/FakeEthStatsServer.java b/ethstats/src/integrationTest/java/org/apache/tuweni/ethstats/FakeEthStatsServer.java
new file mode 100644
index 0000000..d9b54b2
--- /dev/null
+++ b/ethstats/src/integrationTest/java/org/apache/tuweni/ethstats/FakeEthStatsServer.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.tuweni.ethstats;
+
+import org.apache.tuweni.concurrent.AsyncCompletion;
+import org.apache.tuweni.concurrent.AsyncResult;
+import org.apache.tuweni.concurrent.CompletableAsyncCompletion;
+import org.apache.tuweni.concurrent.CompletableAsyncResult;
+
+import io.vertx.core.Vertx;
+import io.vertx.core.http.HttpServer;
+import io.vertx.core.http.ServerWebSocket;
+
+class FakeEthStatsServer {
+
+  private final HttpServer server;
+  private final String networkInterface;
+  private int port;
+  private ServerWebSocket websocket;
+  private CompletableAsyncResult<String> result;
+
+  FakeEthStatsServer(Vertx vertx, String networkInterface, int port) {
+    this.networkInterface = networkInterface;
+
+    server = vertx.createHttpServer();
+    server.websocketHandler(this::connect);
+    CompletableAsyncCompletion compl = AsyncCompletion.incomplete();
+    server.listen(port, networkInterface, result -> {
+      if (port == 0) {
+        FakeEthStatsServer.this.port = server.actualPort();
+      } else {
+        FakeEthStatsServer.this.port = port;
+      }
+      compl.complete();
+    });
+    try {
+      compl.join();
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  void connect(ServerWebSocket serverWebSocket) {
+    this.websocket = serverWebSocket;
+    websocket.accept();
+    websocket.writeTextMessage("{\"emit\":[\"ready\"]}");
+    websocket.handler(buffer -> {
+      if (result != null) {
+        result.complete(buffer.toString());
+      }
+    });
+  }
+
+  public ServerWebSocket getWebsocket() {
+    return websocket;
+  }
+
+  public int getPort() {
+    return port;
+  }
+
+  public AsyncResult<String> captureNextMessage() {
+    result = AsyncResult.incomplete();
+    return result;
+  }
+}
diff --git a/ethstats/src/main/java/org/apache/tuweni/ethstats/EthStatsReporter.java b/ethstats/src/main/java/org/apache/tuweni/ethstats/EthStatsReporter.java
index 8d15608..bd90be2 100644
--- a/ethstats/src/main/java/org/apache/tuweni/ethstats/EthStatsReporter.java
+++ b/ethstats/src/main/java/org/apache/tuweni/ethstats/EthStatsReporter.java
@@ -12,6 +12,8 @@
  */
 package org.apache.tuweni.ethstats;
 
+import org.apache.tuweni.concurrent.AsyncCompletion;
+import org.apache.tuweni.concurrent.CompletableAsyncCompletion;
 import org.apache.tuweni.eth.EthJsonModule;
 import org.apache.tuweni.units.bigints.UInt256;
 
@@ -24,10 +26,10 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
+import java.util.function.Supplier;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
@@ -75,6 +77,7 @@ public final class EthStatsReporter {
   private final String secret;
   private final AtomicReference<Integer> newTxCount = new AtomicReference<>();
   private final Consumer<List<UInt256>> historyRequester;
+  private final Supplier<Long> timeSupplier;
 
   private WorkerExecutor executor;
   private HttpClient client;
@@ -86,6 +89,7 @@ public final class EthStatsReporter {
    * Default constructor.
    *
    * @param vertx a Vert.x instance, externally managed.
+   * @param id the id of the ethstats reporter for communications
    * @param ethstatsServerURIs the URIs to connect to eth-netstats, such as ws://www.ethnetstats.org:3000/api. URIs are
    *        tried in sequence, and the first one to work is used.
    * @param secret the secret to use when we connect to eth-netstats
@@ -97,9 +101,11 @@ public final class EthStatsReporter {
    * @param os the operating system on which the node runs
    * @param osVer the version of the OS on which the node runs
    * @param historyRequester a hook for ethstats to request block information by number.
+   * @param timeSupplier a function supplying time in milliseconds since epoch.
    */
   public EthStatsReporter(
       Vertx vertx,
+      String id,
       List<URI> ethstatsServerURIs,
       String secret,
       String name,
@@ -109,21 +115,26 @@ public final class EthStatsReporter {
       String protocol,
       String os,
       String osVer,
-      Consumer<List<UInt256>> historyRequester) {
-    this.id = UUID.randomUUID().toString();
+      Consumer<List<UInt256>> historyRequester,
+      Supplier<Long> timeSupplier) {
+    this.id = id;
     this.vertx = vertx;
     this.ethstatsServerURIs = ethstatsServerURIs;
     this.secret = secret;
     this.nodeInfo = new NodeInfo(name, node, port, network, protocol, os, osVer);
     this.historyRequester = historyRequester;
+    this.timeSupplier = timeSupplier;
   }
 
-  public void start() {
+  public AsyncCompletion start() {
     if (started.compareAndSet(false, true)) {
       executor = vertx.createSharedWorkerExecutor("ethnetstats");
       client = vertx.createHttpClient(new HttpClientOptions().setLogActivity(true));
-      startInternal();
+      CompletableAsyncCompletion completion = AsyncCompletion.incomplete();
+      startInternal(completion);
+      return completion;
     }
+    return AsyncCompletion.COMPLETED;
   }
 
   public void stop() {
@@ -149,24 +160,25 @@ public final class EthStatsReporter {
     newHistory.set(blocks);
   }
 
-  private void startInternal() {
-    AtomicBoolean connectedOK = new AtomicBoolean(false);
+  private void startInternal(CompletableAsyncCompletion completion) {
     for (URI uri : ethstatsServerURIs) {
       executor.executeBlocking((Future<Boolean> handler) -> connect(handler, uri), result -> {
         logger.debug("Attempting to connect", result.cause());
-        connectedOK.set(!result.failed() && result.result());
+        if (result.succeeded() && result.result()) {
+          completion.complete();
+        }
       });
-      if (connectedOK.get()) {
+      if (completion.isDone()) {
         break;
       }
     }
-    if (!connectedOK.get() && started.get()) {
-      attemptConnect(null);
+    if (!completion.isDone() && started.get()) {
+      attemptConnect(completion);
     }
   }
 
-  private void attemptConnect(Void aVoid) {
-    vertx.setTimer(DELAY, handler -> this.startInternal());
+  private void attemptConnect(CompletableAsyncCompletion completion) {
+    vertx.setTimer(DELAY, handler -> this.startInternal(completion));
   }
 
   private void connect(Future<Boolean> result, URI uri) {
@@ -177,7 +189,7 @@ public final class EthStatsReporter {
             uri.toString(),
             MultiMap.caseInsensitiveMultiMap().add("origin", "http://localhost"),
             ws -> {
-              ws.closeHandler(this::attemptConnect);
+              ws.closeHandler((aVoid) -> attemptConnect(AsyncCompletion.incomplete()));
               ws.exceptionHandler(e -> {
                 logger.debug("Error while communicating with ethnetstats", e);
 
@@ -247,7 +259,7 @@ public final class EthStatsReporter {
 
   private void writePing(WebSocket ws) {
     waitingOnPong.set(true);
-    writeCommand(ws, "node-ping", "clientTime", Instant.now().toEpochMilli());
+    writeCommand(ws, "node-ping", "clientTime", timeSupplier.get());
   }
 
   private void reportPeriodically(WebSocket ws) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to