This is an automated email from the ASF dual-hosted git repository.
toulmean pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git
The following commit(s) were added to refs/heads/master by this push:
new 9a90492 Add test covering basic ethstats functions
new a37d34b Merge pull request #117 from atoulme/ethstats_testing
9a90492 is described below
commit 9a90492a2c0fb080bf0a21ee694c80945709f236
Author: Antoine Toulme <[email protected]>
AuthorDate: Sun Jul 5 13:13:50 2020 -0700
Add test covering basic ethstats functions
---
.../tuweni/ethstats/EthStatsReporterTest.java | 82 +++++++---------------
.../apache/tuweni/ethstats/FakeEthStatsServer.java | 76 ++++++++++++++++++++
.../apache/tuweni/ethstats/EthStatsReporter.java | 42 +++++++----
3 files changed, 127 insertions(+), 73 deletions(-)
diff --git
a/ethstats/src/integrationTest/java/org/apache/tuweni/ethstats/EthStatsReporterTest.java
b/ethstats/src/integrationTest/java/org/apache/tuweni/ethstats/EthStatsReporterTest.java
index de3884d..dd4a3d1 100644
---
a/ethstats/src/integrationTest/java/org/apache/tuweni/ethstats/EthStatsReporterTest.java
+++
b/ethstats/src/integrationTest/java/org/apache/tuweni/ethstats/EthStatsReporterTest.java
@@ -12,8 +12,13 @@
*/
package org.apache.tuweni.ethstats;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.bytes.Bytes32;
+import org.apache.tuweni.concurrent.AsyncResult;
import org.apache.tuweni.eth.Address;
import org.apache.tuweni.eth.Hash;
import org.apache.tuweni.junit.VertxExtension;
@@ -21,6 +26,7 @@ import org.apache.tuweni.junit.VertxInstance;
import org.apache.tuweni.units.bigints.UInt256;
import java.net.URI;
+import java.time.Instant;
import java.util.Collections;
import io.vertx.core.Vertx;
@@ -30,13 +36,15 @@ import org.junit.jupiter.api.extension.ExtendWith;
@ExtendWith(VertxExtension.class)
public class EthStatsReporterTest {
- //@Disabled
@Test
void testConnectToLocalEthStats(@VertxInstance Vertx vertx) throws
InterruptedException {
+ Instant now = Instant.EPOCH;
+ FakeEthStatsServer server = new FakeEthStatsServer(vertx, "127.0.0.1", 0);
EthStatsReporter reporter = new EthStatsReporter(
vertx,
- Collections.singletonList(URI.create("ws://localhost:3000/api")),
+ "foo",
+ Collections.singletonList(URI.create("ws://localhost:" +
server.getPort() + "/api")),
"wat",
"name",
"node",
@@ -46,9 +54,17 @@ public class EthStatsReporterTest {
"Windoz",
"64",
(blockNumbers) -> {
- });
+ },
+ now::toEpochMilli);
+ AsyncResult<String> nextMessage = server.captureNextMessage();
+ reporter.start().join();
+ assertNotNull(server.getWebsocket());
+ assertEquals(
+
"{\"emit\":[\"hello\",{\"info\":{\"name\":\"name\",\"node\":\"node\",\"port\":33030,\"net\":\"10\",\"protocol\":\"eth/63\",\"api\":\"No\",\"os\":\"Windoz\",\"client\":\"0.1.0\",\"os_v\":\"64\",\"canUpdateHistory\":true},\"secret\":\"wat\",\"id\":\"foo\"}]}",
+ nextMessage.get());
+ nextMessage = server.captureNextMessage();
reporter
.sendNewHead(
new BlockStats(
@@ -65,64 +81,14 @@ public class EthStatsReporterTest {
Hash.fromBytes(Bytes32.random()),
Hash.fromBytes(Bytes32.random()),
Collections.emptyList()));
-
+
assertTrue(nextMessage.get().startsWith("{\"emit\":[\"block\",{\"block\""),
nextMessage.get());
+ nextMessage = server.captureNextMessage();
reporter.sendNewNodeStats(new NodeStats(true, false, true, 42, 9, 4000,
100));
+
assertTrue(nextMessage.get().startsWith("{\"emit\":[\"stats\",{\"stats\":"),
nextMessage.get());
+ nextMessage = server.captureNextMessage();
reporter.sendNewPendingTransactionCount(42);
- reporter.start();
+
assertEquals("{\"emit\":[\"pending\",{\"stats\":{\"pending\":42},\"id\":\"foo\"}]}",
nextMessage.get());
- Thread.sleep(1000);
- reporter
- .sendNewHead(
- new BlockStats(
- UInt256.valueOf(2),
- Hash.fromBytes(Bytes32.random()),
- Hash.fromBytes(Bytes32.random()),
- 3L,
- Address.fromBytes(Bytes.random(20)),
- 42L,
- 43,
- UInt256.valueOf(42L),
- UInt256.valueOf(84L),
- Collections.emptyList(),
- Hash.fromBytes(Bytes32.random()),
- Hash.fromBytes(Bytes32.random()),
- Collections.emptyList()));
- Thread.sleep(1000);
- Thread.sleep(1000);
- reporter
- .sendNewHead(
- new BlockStats(
- UInt256.valueOf(3),
- Hash.fromBytes(Bytes32.random()),
- Hash.fromBytes(Bytes32.random()),
- 3L,
- Address.fromBytes(Bytes.random(20)),
- 42L,
- 43,
- UInt256.valueOf(42L),
- UInt256.valueOf(84L),
- Collections.emptyList(),
- Hash.fromBytes(Bytes32.random()),
- Hash.fromBytes(Bytes32.random()),
- Collections.emptyList()));
- Thread.sleep(1000);
- reporter
- .sendNewHead(
- new BlockStats(
- UInt256.valueOf(4),
- Hash.fromBytes(Bytes32.random()),
- Hash.fromBytes(Bytes32.random()),
- 3L,
- Address.fromBytes(Bytes.random(20)),
- 42L,
- 43,
- UInt256.valueOf(42L),
- UInt256.valueOf(84L),
- Collections.emptyList(),
- Hash.fromBytes(Bytes32.random()),
- Hash.fromBytes(Bytes32.random()),
- Collections.emptyList()));
- Thread.sleep(5000);
reporter.stop();
}
}
diff --git
a/ethstats/src/integrationTest/java/org/apache/tuweni/ethstats/FakeEthStatsServer.java
b/ethstats/src/integrationTest/java/org/apache/tuweni/ethstats/FakeEthStatsServer.java
new file mode 100644
index 0000000..d9b54b2
--- /dev/null
+++
b/ethstats/src/integrationTest/java/org/apache/tuweni/ethstats/FakeEthStatsServer.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding
copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not
use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.tuweni.ethstats;
+
+import org.apache.tuweni.concurrent.AsyncCompletion;
+import org.apache.tuweni.concurrent.AsyncResult;
+import org.apache.tuweni.concurrent.CompletableAsyncCompletion;
+import org.apache.tuweni.concurrent.CompletableAsyncResult;
+
+import io.vertx.core.Vertx;
+import io.vertx.core.http.HttpServer;
+import io.vertx.core.http.ServerWebSocket;
+
+/**
+ * Minimal stand-in for an ethstats server, used by the integration tests.
+ *
+ * <p>
+ * It opens a Vert.x HTTP server with a websocket handler. Each accepted websocket is greeted with a
+ * {@code {"emit":["ready"]}} text message, and the text of every frame received afterwards is handed
+ * to the result most recently returned by {@link #captureNextMessage()}, if any.
+ *
+ * <p>
+ * The mutable fields are assigned from Vert.x callbacks and read by the code driving the test. The
+ * constructor blocks until the listen callback has run, so those callbacks execute on another thread;
+ * the fields are therefore {@code volatile}.
+ */
+class FakeEthStatsServer {
+
+  private final HttpServer server;
+  private final String networkInterface;
+  private volatile int port;
+  private volatile ServerWebSocket websocket;
+  private volatile CompletableAsyncResult<String> result;
+
+  /**
+   * Creates the server and blocks until it is listening.
+   *
+   * @param vertx the Vert.x instance used to create the HTTP server
+   * @param networkInterface the host or interface to bind to
+   * @param port the port to bind to; {@code 0} lets the server pick one, which is then reported by
+   *        {@link #getPort()}
+   * @throws RuntimeException if the calling thread is interrupted while waiting for the server to start
+   */
+  FakeEthStatsServer(Vertx vertx, String networkInterface, int port) {
+    this.networkInterface = networkInterface;
+
+    server = vertx.createHttpServer();
+    server.websocketHandler(this::connect);
+    CompletableAsyncCompletion compl = AsyncCompletion.incomplete();
+    server.listen(port, networkInterface, listenResult -> {
+      if (listenResult.failed()) {
+        // Surface bind errors to the constructor instead of reporting a bogus port.
+        compl.completeExceptionally(listenResult.cause());
+        return;
+      }
+      // With port 0 the actual port is only known once the server is bound.
+      this.port = port == 0 ? server.actualPort() : port;
+      compl.complete();
+    });
+    try {
+      compl.join();
+    } catch (InterruptedException e) {
+      // Keep the interrupt visible to callers further up the stack.
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Accepts an incoming websocket, sends the ready message and starts forwarding received frames to
+   * the pending capture.
+   *
+   * @param serverWebSocket the websocket offered by the HTTP server
+   */
+  void connect(ServerWebSocket serverWebSocket) {
+    this.websocket = serverWebSocket;
+    serverWebSocket.accept();
+    serverWebSocket.writeTextMessage("{\"emit\":[\"ready\"]}");
+    serverWebSocket.handler(buffer -> {
+      // Read the field once: captureNextMessage() may swap it while this handler runs.
+      CompletableAsyncResult<String> pending = result;
+      if (pending != null) {
+        pending.complete(buffer.toString());
+      }
+    });
+  }
+
+  /**
+   * @return the websocket most recently accepted, or {@code null} if no client has connected yet
+   */
+  public ServerWebSocket getWebsocket() {
+    return websocket;
+  }
+
+  /**
+   * @return the port the server is listening on
+   */
+  public int getPort() {
+    return port;
+  }
+
+  /**
+   * Arms a fresh capture, replacing any earlier one.
+   *
+   * <p>
+   * Frames received while no capture has been armed are dropped, so call this before triggering the
+   * message that should be observed.
+   *
+   * @return a result that is completed with the text of a message received after this call
+   */
+  public AsyncResult<String> captureNextMessage() {
+    CompletableAsyncResult<String> fresh = AsyncResult.incomplete();
+    result = fresh;
+    return fresh;
+  }
+}
diff --git
a/ethstats/src/main/java/org/apache/tuweni/ethstats/EthStatsReporter.java
b/ethstats/src/main/java/org/apache/tuweni/ethstats/EthStatsReporter.java
index 8d15608..bd90be2 100644
--- a/ethstats/src/main/java/org/apache/tuweni/ethstats/EthStatsReporter.java
+++ b/ethstats/src/main/java/org/apache/tuweni/ethstats/EthStatsReporter.java
@@ -12,6 +12,8 @@
*/
package org.apache.tuweni.ethstats;
+import org.apache.tuweni.concurrent.AsyncCompletion;
+import org.apache.tuweni.concurrent.CompletableAsyncCompletion;
import org.apache.tuweni.eth.EthJsonModule;
import org.apache.tuweni.units.bigints.UInt256;
@@ -24,10 +26,10 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
+import java.util.function.Supplier;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
@@ -75,6 +77,7 @@ public final class EthStatsReporter {
private final String secret;
private final AtomicReference<Integer> newTxCount = new AtomicReference<>();
private final Consumer<List<UInt256>> historyRequester;
+ private final Supplier<Long> timeSupplier;
private WorkerExecutor executor;
private HttpClient client;
@@ -86,6 +89,7 @@ public final class EthStatsReporter {
* Default constructor.
*
* @param vertx a Vert.x instance, externally managed.
+ * @param id the id of the ethstats reporter for communications
* @param ethstatsServerURIs the URIs to connect to eth-netstats, such as
ws://www.ethnetstats.org:3000/api. URIs are
* tried in sequence, and the first one to work is used.
* @param secret the secret to use when we connect to eth-netstats
@@ -97,9 +101,11 @@ public final class EthStatsReporter {
* @param os the operating system on which the node runs
* @param osVer the version of the OS on which the node runs
* @param historyRequester a hook for ethstats to request block information
by number.
+ * @param timeSupplier a function supplying time in milliseconds since epoch.
*/
public EthStatsReporter(
Vertx vertx,
+ String id,
List<URI> ethstatsServerURIs,
String secret,
String name,
@@ -109,21 +115,26 @@ public final class EthStatsReporter {
String protocol,
String os,
String osVer,
- Consumer<List<UInt256>> historyRequester) {
- this.id = UUID.randomUUID().toString();
+ Consumer<List<UInt256>> historyRequester,
+ Supplier<Long> timeSupplier) {
+ this.id = id;
this.vertx = vertx;
this.ethstatsServerURIs = ethstatsServerURIs;
this.secret = secret;
this.nodeInfo = new NodeInfo(name, node, port, network, protocol, os,
osVer);
this.historyRequester = historyRequester;
+ this.timeSupplier = timeSupplier;
}
- public void start() {
+ public AsyncCompletion start() {
if (started.compareAndSet(false, true)) {
executor = vertx.createSharedWorkerExecutor("ethnetstats");
client = vertx.createHttpClient(new
HttpClientOptions().setLogActivity(true));
- startInternal();
+ CompletableAsyncCompletion completion = AsyncCompletion.incomplete();
+ startInternal(completion);
+ return completion;
}
+ return AsyncCompletion.COMPLETED;
}
public void stop() {
@@ -149,24 +160,25 @@ public final class EthStatsReporter {
newHistory.set(blocks);
}
- private void startInternal() {
- AtomicBoolean connectedOK = new AtomicBoolean(false);
+ private void startInternal(CompletableAsyncCompletion completion) {
for (URI uri : ethstatsServerURIs) {
executor.executeBlocking((Future<Boolean> handler) -> connect(handler,
uri), result -> {
logger.debug("Attempting to connect", result.cause());
- connectedOK.set(!result.failed() && result.result());
+ if (result.succeeded() && result.result()) {
+ completion.complete();
+ }
});
- if (connectedOK.get()) {
+ if (completion.isDone()) {
break;
}
}
- if (!connectedOK.get() && started.get()) {
- attemptConnect(null);
+ if (!completion.isDone() && started.get()) {
+ attemptConnect(completion);
}
}
- private void attemptConnect(Void aVoid) {
- vertx.setTimer(DELAY, handler -> this.startInternal());
+ private void attemptConnect(CompletableAsyncCompletion completion) {
+ vertx.setTimer(DELAY, handler -> this.startInternal(completion));
}
private void connect(Future<Boolean> result, URI uri) {
@@ -177,7 +189,7 @@ public final class EthStatsReporter {
uri.toString(),
MultiMap.caseInsensitiveMultiMap().add("origin",
"http://localhost"),
ws -> {
- ws.closeHandler(this::attemptConnect);
+ ws.closeHandler((aVoid) ->
attemptConnect(AsyncCompletion.incomplete()));
ws.exceptionHandler(e -> {
logger.debug("Error while communicating with ethnetstats", e);
@@ -247,7 +259,7 @@ public final class EthStatsReporter {
private void writePing(WebSocket ws) {
waitingOnPong.set(true);
- writeCommand(ws, "node-ping", "clientTime", Instant.now().toEpochMilli());
+ writeCommand(ws, "node-ping", "clientTime", timeSupplier.get());
}
private void reportPeriodically(WebSocket ws) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]