This is an automated email from the ASF dual-hosted git repository.
toulmean pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git
The following commit(s) were added to refs/heads/master by this push:
new 9e7a41c Add a module to create a complete gossip application
new 5aead93 Merge pull request #9 from atoulme/gossip_app
9e7a41c is described below
commit 9e7a41ce4f6e4ed4a443190b0728faac332856de
Author: Antoine Toulme <[email protected]>
AuthorDate: Fri May 3 17:18:48 2019 -0700
Add a module to create a complete gossip application
---
dependency-versions.gradle | 1 +
dist/build.gradle | 24 ++-
settings.gradle => gossip/build.gradle | 55 +++---
.../java/org/apache/tuweni/gossip/GossipApp.java | 189 ++++++++++++++++++++
.../tuweni/gossip/GossipCommandLineOptions.java | 193 +++++++++++++++++++++
.../gossip/GossipCommandLineOptionsTest.java | 94 ++++++++++
.../tuweni/gossip/GossipIntegrationTest.java | 129 ++++++++++++++
gradle/check-licenses.gradle | 1 +
.../tuweni/plumtree/EphemeralPeerRepository.java | 7 +-
.../tuweni/plumtree/vertx/VertxGossipServer.java | 75 ++++----
.../plumtree/vertx/VertxGossipServerTest.java | 4 +-
settings.gradle | 1 +
12 files changed, 700 insertions(+), 73 deletions(-)
diff --git a/dependency-versions.gradle b/dependency-versions.gradle
index 4bae852..b8b62fb 100644
--- a/dependency-versions.gradle
+++ b/dependency-versions.gradle
@@ -24,6 +24,7 @@ dependencyManagement {
dependency('com.jolbox:bonecp:0.8.0.RELEASE')
dependency('com.squareup.okhttp3:okhttp:3.12.0')
dependency('com.winterbe:expekt:0.5.0')
+ dependency('info.picocli:picocli:4.0.0-alpha-2')
dependency('io.lettuce:lettuce-core:5.1.3.RELEASE')
dependency('io.vertx:vertx-core:3.6.2')
dependencySet(group: 'org.antlr', version: '4.7.1') {
diff --git a/dist/build.gradle b/dist/build.gradle
index 7304d37..82e12ad 100644
--- a/dist/build.gradle
+++ b/dist/build.gradle
@@ -75,22 +75,40 @@ distributions {
}
}
}
+ gossip {
+ contents {
+ into('bin') {
+ from { project(':gossip').startScripts.outputs.files }
+ fileMode = 0755
+ }
+ into('lib') {
+ def libs = []
+ libs << project(':gossip').configurations.runtime
+ from libs
+ from project(':gossip').jar
+ }
+ }
+ }
}
import org.gradle.crypto.checksum.Checksum
distTar{ compression = Compression.GZIP }
-
sourcesDistTar{ compression = Compression.GZIP }
+gossipDistTar{ compression = Compression.GZIP }
+
task createChecksums(type: Checksum, dependsOn: [
'distZip',
'distTar',
'sourcesDistZip',
- 'sourcesDistTar'
+ 'sourcesDistTar',
+ 'gossipDistZip',
+ 'gossipDistTar'
]) {
- files = distZip.outputs.files + distTar.outputs.files +
sourcesDistZip.outputs.files + sourcesDistTar.outputs.files
+ files = distZip.outputs.files + distTar.outputs.files +
sourcesDistZip.outputs.files + sourcesDistTar.outputs.files \
+ + gossipDistZip.outputs.files + gossipDistTar.outputs.files
outputDir = new File(project.buildDir, "distributions")
algorithm = Checksum.Algorithm.SHA512
}
diff --git a/settings.gradle b/gossip/build.gradle
similarity index 53%
copy from settings.gradle
copy to gossip/build.gradle
index 99c3ff0..f708004 100644
--- a/settings.gradle
+++ b/gossip/build.gradle
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding
copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not
use this file except in compliance with the
@@ -10,33 +10,26 @@
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
-rootProject.name='tuweni'
-include 'bytes'
-include 'concurrent'
-include 'concurrent-coroutines'
-include 'config'
-include 'crypto'
-include 'devp2p'
-include 'dist'
-include 'eth'
-include 'eth-reference-tests'
-include 'eth-repository'
-include 'io'
-include 'junit'
-include 'kademlia'
-include 'kv'
-include 'les'
-include 'merkle-trie'
-include 'net'
-include 'net-coroutines'
-include 'plumtree'
-include 'progpow'
-include 'rlp'
-include 'rlpx'
-include 'scuttlebutt'
-include 'scuttlebutt-discovery'
-include 'scuttlebutt-handshake'
-include 'scuttlebutt-rpc'
-include 'ssz'
-include 'toml'
-include 'units'
+plugins { id 'application' }
+
+description = 'Peer-to-peer gossip application.'
+
+
+dependencies {
+ compile 'com.fasterxml.jackson.core:jackson-databind'
+ compile 'com.google.guava:guava'
+ compile 'info.picocli:picocli'
+ compile 'io.vertx:vertx-core'
+ compile 'org.bouncycastle:bcprov-jdk15on'
+ compile project(':bytes')
+ compile project(':config')
+ compile project(':plumtree')
+
+ testCompile project(':bytes')
+ testCompile project(':junit')
+ testCompile 'org.junit.jupiter:junit-jupiter-api'
+ testCompile 'org.junit.jupiter:junit-jupiter-params'
+ testRuntime 'org.junit.jupiter:junit-jupiter-engine'
+}
+
+application { mainClassName = 'org.apache.tuweni.gossip.GossipApp' }
diff --git a/gossip/src/main/java/org/apache/tuweni/gossip/GossipApp.java
b/gossip/src/main/java/org/apache/tuweni/gossip/GossipApp.java
new file mode 100644
index 0000000..ad7293c
--- /dev/null
+++ b/gossip/src/main/java/org/apache/tuweni/gossip/GossipApp.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding
copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not
use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.tuweni.gossip;
+
+
+import org.apache.tuweni.bytes.Bytes;
+import org.apache.tuweni.concurrent.AsyncCompletion;
+import org.apache.tuweni.concurrent.CompletableAsyncCompletion;
+import org.apache.tuweni.crypto.Hash;
+import org.apache.tuweni.plumtree.EphemeralPeerRepository;
+import org.apache.tuweni.plumtree.vertx.VertxGossipServer;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.time.Instant;
+import java.util.Collections;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import io.vertx.core.Vertx;
+import io.vertx.core.http.HttpMethod;
+import io.vertx.core.http.HttpServer;
+import io.vertx.core.http.HttpServerRequest;
+import picocli.CommandLine;
+
+/**
+ * Application running a gossip client, taking configuration from command line
or a configuration file.
+ *
+ */
+public final class GossipApp {
+
+ /**
+  * Command line entry point: parses the options, prints usage when requested, validates the configuration and
+  * starts the application.
+  *
+  * @param args the command line arguments
+  */
+ public static void main(String[] args) {
+   GossipCommandLineOptions opts = CommandLine.populateCommand(new GossipCommandLineOptions(), args);
+   // Honor --help before validating, so usage can be displayed even when other options are invalid.
+   if (opts.help()) {
+     new CommandLine(opts).usage(System.out);
+     System.exit(0);
+   }
+   try {
+     opts.validate();
+   } catch (IllegalArgumentException e) {
+     System.err.println("Invalid configuration detected.\n\n" + e.getMessage());
+     new CommandLine(opts).usage(System.out);
+     System.exit(1);
+   }
+   GossipApp gossipApp = new GossipApp(Vertx.vertx(), opts, System.err, System.out, () -> System.exit(1));
+   Runtime.getRuntime().addShutdownHook(new Thread(gossipApp::stop));
+   gossipApp.start();
+ }
+
+ private final GossipCommandLineOptions opts;
+ private final Runnable terminateFunction;
+ private final PrintStream errStream;
+ private final PrintStream outStream;
+ private final VertxGossipServer server;
+ private final HttpServer rpcServer;
+
+ GossipApp(
+ Vertx vertx,
+ GossipCommandLineOptions opts,
+ PrintStream errStream,
+ PrintStream outStream,
+ Runnable terminateFunction) {
+ EphemeralPeerRepository repository = new EphemeralPeerRepository();
+ server = new VertxGossipServer(
+ vertx,
+ opts.networkInterface(),
+ opts.listenPort(),
+ Hash::keccak256,
+ repository,
+ bytes -> readMessage(opts.messageLog(), errStream, bytes),
+ null);
+ this.opts = opts;
+ this.errStream = errStream;
+ this.outStream = outStream;
+ this.terminateFunction = terminateFunction;
+ this.rpcServer = vertx.createHttpServer();
+ }
+
+ /**
+  * Starts the gossip server, then the RPC HTTP server, then connects to the configured static peers.
+  *
+  * <p>
+  * If either server fails to start, the failure is printed to the error stream, the terminate function is invoked
+  * and the remaining startup steps are skipped. A failure to connect to the static peers is reported but does not
+  * invoke the terminate function.
+  */
+ void start() {
+   try {
+     server.start().join();
+   } catch (CompletionException | InterruptedException e) {
+     if (e instanceof InterruptedException) {
+       // Preserve the interrupt status for callers further up the stack.
+       Thread.currentThread().interrupt();
+     }
+     errStream.println("Server could not start: " + e.getMessage());
+     terminateFunction.run();
+     // The terminate function may return (it does in tests): do not continue with a half-started application.
+     return;
+   }
+
+   CompletableAsyncCompletion rpcCompletion = AsyncCompletion.incomplete();
+   rpcServer.requestHandler(this::handleRPCRequest).listen(opts.rpcPort(), opts.networkInterface(), res -> {
+     if (res.failed()) {
+       rpcCompletion.completeExceptionally(res.cause());
+     } else {
+       rpcCompletion.complete();
+     }
+   });
+   try {
+     rpcCompletion.join();
+   } catch (CompletionException | InterruptedException e) {
+     if (e instanceof InterruptedException) {
+       Thread.currentThread().interrupt();
+     }
+     errStream.println("RPC server could not start: " + e.getMessage());
+     terminateFunction.run();
+     return;
+   }
+
+   try {
+     AsyncCompletion
+         .allOf(opts.peers().stream().map(peer -> server.connectTo(peer.getHost(), peer.getPort())))
+         .join(60, TimeUnit.SECONDS);
+   } catch (CompletionException | TimeoutException | InterruptedException e) {
+     // CompletionException is unchecked: without catching it, one unreachable peer would escape start().
+     if (e instanceof InterruptedException) {
+       Thread.currentThread().interrupt();
+     }
+     errStream.println("Server could not connect to other peers: " + e.getMessage());
+   }
+ }
+
+ private void handleRPCRequest(HttpServerRequest httpServerRequest) {
+ if (HttpMethod.POST.equals(httpServerRequest.method())) {
+ if ("/publish".equals(httpServerRequest.path())) {
+ httpServerRequest.bodyHandler(body -> {
+ publish(Bytes.wrapBuffer(body));
+ httpServerRequest.response().setStatusCode(200).end();
+ });
+ } else {
+ httpServerRequest.response().setStatusCode(404).end();
+ }
+ } else {
+ httpServerRequest.response().setStatusCode(405).end();
+ }
+ }
+
+ /**
+  * Stops the gossip server, then closes the RPC HTTP server.
+  *
+  * <p>
+  * A failure in either step is printed to the error stream and the terminate function is invoked; the RPC server
+  * is closed even if the gossip server failed to stop.
+  *
+  * <p>
+  * NOTE(review): {@code main} registers this method as a JVM shutdown hook with a terminate function that calls
+  * {@code System.exit(1)}. Calling {@code System.exit} while shutdown hooks are running blocks indefinitely, so a
+  * failure here during JVM shutdown may hang the process - confirm whether terminating is wanted on this path.
+  */
+ void stop() {
+   try {
+     server.stop().join();
+   } catch (CompletionException | InterruptedException e) {
+     // CompletionException is unchecked: catch it so the RPC server below still gets closed.
+     if (e instanceof InterruptedException) {
+       Thread.currentThread().interrupt();
+     }
+     errStream.println("Server could not stop: " + e.getMessage());
+     terminateFunction.run();
+   }
+
+   CompletableAsyncCompletion rpcCompletion = AsyncCompletion.incomplete();
+   rpcServer.close(res -> {
+     if (res.failed()) {
+       rpcCompletion.completeExceptionally(res.cause());
+     } else {
+       rpcCompletion.complete();
+     }
+   });
+   try {
+     rpcCompletion.join();
+   } catch (CompletionException | InterruptedException e) {
+     if (e instanceof InterruptedException) {
+       Thread.currentThread().interrupt();
+     }
+     errStream.println("RPC server could not stop: " + e.getMessage());
+     terminateFunction.run();
+   }
+ }
+
+ /**
+  * Appends a received gossip message to the message log as one JSON line with a {@code timestamp} (current
+  * instant) and a {@code value} (the payload decoded as UTF-8).
+  *
+  * @param messageLog path of the log file to append to
+  * @param err stream on which I/O failures are reported
+  * @param bytes the message payload
+  */
+ private void readMessage(String messageLog, PrintStream err, Bytes bytes) {
+   ObjectMapper mapper = new ObjectMapper();
+   ObjectNode node = mapper.createObjectNode();
+   node.put("timestamp", Instant.now().toString());
+   node.put("value", new String(bytes.toArrayUnsafe(), StandardCharsets.UTF_8));
+   try {
+     Path path = Paths.get(messageLog);
+     // CREATE + APPEND lets the file system create-or-append in one step. Checking Files.exists() first is racy:
+     // two messages arriving before the file exists would both open it without APPEND and overwrite each other.
+     Files.write(
+         path,
+         Collections.singletonList(mapper.writeValueAsString(node)),
+         StandardCharsets.UTF_8,
+         StandardOpenOption.CREATE,
+         StandardOpenOption.APPEND);
+   } catch (IOException e) {
+     err.println(e.getMessage());
+   }
+ }
+
+ public void publish(Bytes message) {
+ server.gossip("", message);
+ }
+}
diff --git
a/gossip/src/main/java/org/apache/tuweni/gossip/GossipCommandLineOptions.java
b/gossip/src/main/java/org/apache/tuweni/gossip/GossipCommandLineOptions.java
new file mode 100644
index 0000000..04a6f35
--- /dev/null
+++
b/gossip/src/main/java/org/apache/tuweni/gossip/GossipCommandLineOptions.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding
copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not
use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.tuweni.gossip;
+
+
+import org.apache.tuweni.config.Configuration;
+import org.apache.tuweni.config.ConfigurationError;
+import org.apache.tuweni.config.PropertyValidator;
+import org.apache.tuweni.config.Schema;
+import org.apache.tuweni.config.SchemaBuilder;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.URI;
+import java.net.UnknownHostException;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import picocli.CommandLine;
+
+final class GossipCommandLineOptions {
+
+ /**
+  * Creates the schema describing the keys accepted in the TOML configuration file, with their defaults.
+  *
+  * @return the configuration file schema
+  */
+ static final Schema createConfigFileSchema() {
+   return SchemaBuilder
+       .create()
+       .addInteger("listenPort", 0, "Port to listen on", PropertyValidator.inRange(0, 65536))
+       .addInteger("rpcPort", 0, "RPC port to listen on", PropertyValidator.inRange(0, 65536))
+       .addString("networkInterface", "0.0.0.0", "Network interface to bind to", null)
+       .addListOfString("peers", Collections.emptyList(), "Static peers list", null)
+       // The key must match the one read by messageLog(): it was previously declared as "messagelog".
+       .addString("messageLog", "messages.log", "Log file where messages are stored", null)
+       .toSchema();
+ }
+
+ @CommandLine.Option(names = {"-c", "--config"} , description =
"Configuration file.")
+ private Path configPath = null;
+
+ @CommandLine.Option(arity = "0..*" , names = {"-p", "--peer"} , description
= "Static peers list")
+ private String[] peers;
+
+ @CommandLine.Option(names = {"-l", "--listen"} , description = "Port to
listen on")
+ private Integer port;
+
+ @CommandLine.Option(names = {"-r", "--rpc"} , description = "RPC port to
listen on")
+ private Integer rpcPort;
+
+ @CommandLine.Option(names = {"-n", "--networkInterface"} , description =
"Network interface to bind to")
+ private String networkInterface = "0.0.0.0";
+
+ @CommandLine.Option(names = {"-m", "--messageLog"} , description = "Log file
where messages are stored")
+ private String messageLog;
+
+ @CommandLine.Option(names = {"-h", "--help"} , description = "Prints usage
prompt")
+ private boolean help;
+
+ private List<URI> peerAddresses;
+ private Configuration config;
+
+ GossipCommandLineOptions() {}
+
+ GossipCommandLineOptions(
+ String[] peers,
+ Integer port,
+ String networkInterface,
+ String messageLog,
+ Integer rpcPort,
+ Configuration config) {
+ this.peers = peers;
+ this.port = port;
+ this.networkInterface = networkInterface;
+ this.messageLog = messageLog;
+ this.rpcPort = rpcPort;
+ this.config = config;
+ }
+
+ private Configuration config() {
+ if (config == null && configPath != null) {
+ try {
+ config = Configuration.fromToml(configPath, createConfigFileSchema());
+ } catch (IOException e) {
+ throw new IllegalArgumentException(e);
+ }
+ }
+ return config;
+ }
+
+ /**
+  * Provides the static peers to connect to, parsed on first use and cached afterwards.
+  *
+  * <p>
+  * Peers given on the command line take precedence; otherwise the {@code peers} list of the configuration file is
+  * used if a configuration is available.
+  *
+  * @return the peer URIs, possibly empty
+  * @throws IllegalArgumentException if a peer is not a valid URI or has no host
+  */
+ List<URI> peers() {
+   if (peerAddresses == null) {
+     // Parse into a local list and publish it only on success, so that a failed parse does not leave a
+     // partially filled list behind that later calls would return without error.
+     List<URI> parsed = new ArrayList<>();
+     if (peers != null) {
+       for (String peer : peers) {
+         parsed.add(parsePeerURI(peer));
+       }
+     } else if (config() != null) {
+       for (String peer : config().getListOfString("peers")) {
+         parsed.add(parsePeerURI(peer));
+       }
+     }
+     peerAddresses = parsed;
+   }
+   return peerAddresses;
+ }
+
+ /** Parses a single peer address, rejecting URIs without a host component. */
+ private static URI parsePeerURI(String peer) {
+   URI peerURI = URI.create(peer);
+   if (peerURI.getHost() == null) {
+     throw new IllegalArgumentException("Invalid peer URI " + peerURI);
+   }
+   return peerURI;
+ }
+
+ /**
+  * Validates the effective options: port ranges, peer URIs, the network interface, and any errors reported by
+  * the configuration file.
+  *
+  * @throws IllegalArgumentException if any option is invalid
+  */
+ void validate() {
+   int listenPort = listenPort();
+   if (listenPort < 0 || listenPort > 65535) {
+     throw new IllegalArgumentException("Invalid port number " + listenPort);
+   }
+   int rpcPort = rpcPort();
+   if (rpcPort < 0 || rpcPort > 65535) {
+     throw new IllegalArgumentException("Invalid port number " + rpcPort);
+   }
+   // Parsing the peers throws if one of them is invalid.
+   peers();
+   try {
+     // Validate the effective value: the raw field is null when the interface comes from the configuration
+     // file, and InetAddress.getByName(null) resolves to the loopback address instead of failing.
+     InetAddress.getByName(networkInterface());
+   } catch (UnknownHostException e) {
+     throw new IllegalArgumentException("Invalid network interface", e);
+   }
+   if (config() != null) {
+     List<ConfigurationError> errors = config().errors();
+     if (errors.size() > 0) {
+       String message = errors
+           .stream()
+           .map(err -> "[" + err.position() + "] " + err.getMessage())
+           .collect(Collectors.joining("\n"));
+       throw new IllegalArgumentException(message);
+     }
+   }
+ }
+
+ int listenPort() {
+ if (port != null) {
+ return port;
+ }
+ if (config() != null) {
+ return config.getInteger("listenPort");
+ }
+ return 0;
+ }
+
+ int rpcPort() {
+ if (rpcPort != null) {
+ return rpcPort;
+ }
+ if (config() != null) {
+ return config.getInteger("rpcPort");
+ }
+ return 0;
+ }
+
+ String networkInterface() {
+ if (networkInterface != null) {
+ return networkInterface;
+ }
+ if (config() != null) {
+ return config.getString("networkInterface");
+ }
+ return "0.0.0.0";
+ }
+
+ /**
+  * Provides the path of the file where received messages are logged.
+  *
+  * @return the command line value if set, otherwise the {@code messageLog} value of the configuration if a
+  *         configuration is available, otherwise {@code messages.log}
+  */
+ String messageLog() {
+   if (messageLog != null) {
+     return messageLog;
+   }
+   // Go through config() like the other accessors, so a configuration file given with --config is loaded
+   // lazily even when this is the first accessor called.
+   if (config() != null) {
+     return config().getString("messageLog");
+   }
+   return "messages.log";
+ }
+
+ boolean help() {
+ return help;
+ }
+}
diff --git
a/gossip/src/test/java/org/apache/tuweni/gossip/GossipCommandLineOptionsTest.java
b/gossip/src/test/java/org/apache/tuweni/gossip/GossipCommandLineOptionsTest.java
new file mode 100644
index 0000000..cd6b885
--- /dev/null
+++
b/gossip/src/test/java/org/apache/tuweni/gossip/GossipCommandLineOptionsTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding
copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not
use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.tuweni.gossip;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import org.apache.tuweni.config.Configuration;
+
+import java.net.URI;
+import java.net.UnknownHostException;
+
+import org.junit.jupiter.api.Test;
+
+class GossipCommandLineOptionsTest {
+
+ @Test
+ void testInvalidPort() {
+ GossipCommandLineOptions opts = new GossipCommandLineOptions(new
String[0], -4, "0.0.0.0", null, 3, null);
+ assertThrows(IllegalArgumentException.class, opts::validate);
+ }
+
+ @Test
+ void testInvalidPeer() {
+ GossipCommandLineOptions opts =
+ new GossipCommandLineOptions(new String[]
{"tcp://400.300.200.100:9000"}, 10, "0.0.0.0", null, 3, null);
+ assertThrows(IllegalArgumentException.class, opts::validate);
+ }
+
+ @Test
+ void testInvalidNetworkInterface() {
+ GossipCommandLineOptions opts = new GossipCommandLineOptions(new String[]
{}, 10, "400.300.200.100", null, 3, null);
+ assertThrows(IllegalArgumentException.class, opts::validate);
+ }
+
+ @Test
+ void operateFromConfig() throws UnknownHostException {
+ Configuration config = Configuration.fromToml(
+ ""
+ + "peers=[\"tcp://127.0.0.1:2000\"]\n"
+ + "listenPort=1080\n"
+ + "networkInterface=\"127.0.0.1\"\n"
+ + "messageLog=\"D:/Temp\"",
+ GossipCommandLineOptions.createConfigFileSchema());
+ GossipCommandLineOptions opts = new GossipCommandLineOptions(null, null,
null, null, 3000, config);
+ opts.validate();
+ assertEquals(1080, opts.listenPort());
+ assertEquals(1, opts.peers().size());
+ assertEquals(URI.create("tcp://127.0.0.1:2000"), opts.peers().get(0));
+ assertEquals("127.0.0.1", opts.networkInterface());
+ assertEquals("D:/Temp", opts.messageLog());
+ }
+
+ @Test
+ void invalidConfigFilePort() throws UnknownHostException {
+ Configuration config = Configuration.fromToml(
+ ""
+ + "peers=[\"tcp://127.0.0.1:3000\"]\n"
+ + "listenPort=500000\n"
+ + "networkInterface=\"127.0.0.1\"\n"
+ + "messageLog=\"D:/Temp\"",
+ GossipCommandLineOptions.createConfigFileSchema());
+ GossipCommandLineOptions opts = new GossipCommandLineOptions(null, null,
null, null, 3000, config);
+ assertThrows(IllegalArgumentException.class, opts::validate);
+ }
+
+ @Test
+ void cliConfigOverConfigFile() throws UnknownHostException {
+ Configuration config = Configuration.fromToml(
+ ""
+ + "peers=\"tcp://127.0.0.1:3000\"\n"
+ + "listenPort=1080\n"
+ + "networkInterface=\"127.0.0.1\"\n"
+ + "messageLog=\"D:/Temp\"");
+ GossipCommandLineOptions opts =
+ new GossipCommandLineOptions(new String[] {"tcp://192.168.0.1:3000"},
400, "0.0.0.0", "C:/Temp", 3000, config);
+ assertEquals(400, opts.listenPort());
+ assertEquals(1, opts.peers().size());
+ assertEquals(URI.create("tcp://192.168.0.1:3000"), opts.peers().get(0));
+ assertEquals("0.0.0.0", opts.networkInterface());
+ assertEquals("C:/Temp", opts.messageLog());
+ }
+
+}
diff --git
a/gossip/src/test/java/org/apache/tuweni/gossip/GossipIntegrationTest.java
b/gossip/src/test/java/org/apache/tuweni/gossip/GossipIntegrationTest.java
new file mode 100644
index 0000000..7ff14d0
--- /dev/null
+++ b/gossip/src/test/java/org/apache/tuweni/gossip/GossipIntegrationTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding
copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not
use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.tuweni.gossip;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+import org.apache.tuweni.bytes.Bytes;
+import org.apache.tuweni.bytes.Bytes32;
+import org.apache.tuweni.junit.*;
+
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import io.vertx.core.Vertx;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.http.HttpClient;
+import io.vertx.core.http.HttpMethod;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+@ExtendWith({VertxExtension.class, TempDirectoryExtension.class,
BouncyCastleExtension.class})
+class GossipIntegrationTest {
+
+ @Test
+ void threeGossipServersStarting(@VertxInstance Vertx vertx, @TempDirectory
Path tempDir) throws Exception {
+ GossipCommandLineOptions opts1 = new GossipCommandLineOptions(
+ new String[] {"tcp://127.0.0.1:9001", "tcp://127.0.0.1:9002"},
+ 9000,
+ "127.0.0.1",
+ tempDir.resolve("log1.log").toString(),
+ 10000,
+ null);
+ GossipCommandLineOptions opts2 = new GossipCommandLineOptions(
+ new String[] {"tcp://127.0.0.1:9000", "tcp://127.0.0.1:9002"},
+ 9001,
+ "127.0.0.1",
+ tempDir.resolve("log2.log").toString(),
+ 10001,
+ null);
+ GossipCommandLineOptions opts3 = new GossipCommandLineOptions(
+ new String[] {"tcp://127.0.0.1:9000", "tcp://127.0.0.1:9001"},
+ 9002,
+ "127.0.0.1",
+ tempDir.resolve("log3.log").toString(),
+ 10002,
+ null);
+ AtomicBoolean terminationRan = new AtomicBoolean(false);
+
+ ExecutorService service = Executors.newFixedThreadPool(3);
+
+ Future<GossipApp> app1Future = service.submit(() -> {
+ GossipApp app = new GossipApp(vertx, opts1, System.err, System.out, ()
-> {
+ terminationRan.set(true);
+ });
+ app.start();
+ return app;
+ });
+ Future<GossipApp> app2Future = service.submit(() -> {
+ GossipApp app = new GossipApp(vertx, opts2, System.err, System.out, ()
-> {
+ terminationRan.set(true);
+ });
+ app.start();
+ return app;
+ });
+ Future<GossipApp> app3Future = service.submit(() -> {
+ GossipApp app = new GossipApp(vertx, opts3, System.err, System.out, ()
-> {
+ terminationRan.set(true);
+ });
+ app.start();
+ return app;
+ });
+ GossipApp app1 = app1Future.get(10, TimeUnit.SECONDS);
+ GossipApp app2 = app2Future.get(10, TimeUnit.SECONDS);
+ GossipApp app3 = app3Future.get(10, TimeUnit.SECONDS);
+
+ assertFalse(terminationRan.get());
+
+ HttpClient client = vertx.createHttpClient();
+
+ for (int i = 0; i < 20; i++) {
+ client.request(HttpMethod.POST, 10000, "127.0.0.1",
"/publish").exceptionHandler(thr -> {
+ throw new RuntimeException(thr);
+ }).handler(resp -> {
+
+
}).end(Buffer.buffer(Bytes32.rightPad(Bytes.ofUnsignedInt(i)).toHexString().getBytes(StandardCharsets.UTF_8)));
+ }
+
+ List<String> receiver1 = null;
+
+ int counter = 0;
+ do {
+ Thread.sleep(1000);
+ counter++;
+ receiver1 = Files.readAllLines(tempDir.resolve("log2.log"));
+ } while (receiver1.size() < 20 && counter < 20);
+
+ client.close();
+
+ service.submit(app1::stop);
+ service.submit(app2::stop);
+ service.submit(app3::stop);
+
+
+ assertEquals(20, receiver1.size());
+ List<String> receiver2 = Files.readAllLines(tempDir.resolve("log3.log"));
+ assertEquals(20, receiver2.size());
+
+ service.shutdown();
+
+ }
+}
diff --git a/gradle/check-licenses.gradle b/gradle/check-licenses.gradle
index e3fd319..8cc6d21 100644
--- a/gradle/check-licenses.gradle
+++ b/gradle/check-licenses.gradle
@@ -90,6 +90,7 @@ downloadLicenses {
'ASL, Version 2',
'The Apache License, Version 2.0',
'The Apache Software License, Version 2.0',
+ 'The Apache Software License, version 2.0'
],
(bsd): [
'Berkeley Software Distribution (BSD) License',
diff --git
a/plumtree/src/main/java/org/apache/tuweni/plumtree/EphemeralPeerRepository.java
b/plumtree/src/main/java/org/apache/tuweni/plumtree/EphemeralPeerRepository.java
index f75c2d8..acbdeec 100644
---
a/plumtree/src/main/java/org/apache/tuweni/plumtree/EphemeralPeerRepository.java
+++
b/plumtree/src/main/java/org/apache/tuweni/plumtree/EphemeralPeerRepository.java
@@ -14,15 +14,14 @@ package org.apache.tuweni.plumtree;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
public class EphemeralPeerRepository implements PeerRepository {
- private final Set<Peer> eagerPushPeers = Collections.synchronizedSet(new
HashSet<>());
- private final Set<Peer> lazyPushPeers = Collections.synchronizedSet(new
HashSet<>());
+ private final Set<Peer> eagerPushPeers = ConcurrentHashMap.newKeySet();
+ private final Set<Peer> lazyPushPeers = ConcurrentHashMap.newKeySet();
@Override
public void addEager(Peer peer) {
diff --git
a/plumtree/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java
b/plumtree/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java
index ba0a289..9cb2d38 100644
---
a/plumtree/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java
+++
b/plumtree/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java
@@ -24,7 +24,9 @@ import org.apache.tuweni.plumtree.State;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
+import javax.annotation.Nullable;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
@@ -64,28 +66,30 @@ public final class VertxGossipServer {
void handle(Buffer data) {
buffer = Bytes.concatenate(buffer, Bytes.wrapBuffer(data));
- Message message;
- try {
- JsonParser parser =
mapper.getFactory().createParser(buffer.toArrayUnsafe());
- message = parser.readValueAs(Message.class);
- buffer = buffer.slice((int)
parser.getCurrentLocation().getByteOffset());
- } catch (IOException e) {
- return;
- }
+ while (!buffer.isEmpty()) {
+ Message message;
+ try {
+ JsonParser parser =
mapper.getFactory().createParser(buffer.toArrayUnsafe());
+ message = parser.readValueAs(Message.class);
+ buffer = buffer.slice((int)
parser.getCurrentLocation().getByteOffset());
+ } catch (IOException e) {
+ return;
+ }
- switch (message.verb) {
- case IHAVE:
- state.receiveIHaveMessage(peer,
Bytes.fromHexString(message.payload));
- break;
- case GOSSIP:
- state.receiveGossipMessage(peer, message.attributes,
Bytes.fromHexString(message.payload));
- break;
- case GRAFT:
- state.receiveGraftMessage(peer,
Bytes.fromHexString(message.payload));
- break;
- case PRUNE:
- state.receivePruneMessage(peer);
- break;
+ switch (message.verb) {
+ case IHAVE:
+ state.receiveIHaveMessage(peer,
Bytes.fromHexString(message.payload));
+ break;
+ case GOSSIP:
+ state.receiveGossipMessage(peer, message.attributes,
Bytes.fromHexString(message.payload));
+ break;
+ case GRAFT:
+ state.receiveGraftMessage(peer,
Bytes.fromHexString(message.payload));
+ break;
+ case PRUNE:
+ state.receivePruneMessage(peer);
+ break;
+ }
}
}
@@ -113,14 +117,14 @@ public final class VertxGossipServer {
MessageHashing messageHashing,
PeerRepository peerRepository,
Consumer<Bytes> payloadListener,
- MessageValidator payloadValidator) {
+ @Nullable MessageValidator payloadValidator) {
this.vertx = vertx;
this.networkInterface = networkInterface;
this.port = port;
this.messageHashing = messageHashing;
this.peerRepository = peerRepository;
this.payloadListener = payloadListener;
- this.payloadValidator = payloadValidator;
+ this.payloadValidator = payloadValidator == null ? (bytes, peer) -> true :
payloadValidator;
}
public AsyncCompletion start() {
@@ -184,16 +188,21 @@ public final class VertxGossipServer {
throw new IllegalStateException("Server has not started");
}
CompletableAsyncCompletion completion = AsyncCompletion.incomplete();
- client.connect(port, host, res -> {
- if (res.failed()) {
- completion.completeExceptionally(res.cause());
- } else {
- completion.complete();
- Peer peer = new SocketPeer(res.result());
- SocketHandler handler = new SocketHandler(peer);
- res.result().handler(handler::handle).closeHandler(handler::close);
- }
- });
+ AtomicInteger counter = new AtomicInteger(0);
+ while (!completion.isDone()) {
+ client.connect(port, host, res -> {
+ if (res.failed()) {
+ if (counter.incrementAndGet() > 5) {
+ completion.completeExceptionally(res.cause());
+ }
+ } else {
+ completion.complete();
+ Peer peer = new SocketPeer(res.result());
+ SocketHandler handler = new SocketHandler(peer);
+ res.result().handler(handler::handle).closeHandler(handler::close);
+ }
+ });
+ }
return completion;
}
diff --git
a/plumtree/src/test/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java
b/plumtree/src/test/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java
index 384adb3..29801db 100644
---
a/plumtree/src/test/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java
+++
b/plumtree/src/test/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java
@@ -106,7 +106,7 @@ class VertxGossipServerTest {
server3.connectTo("127.0.0.1", 10001).join();
String attributes = "{\"message_type\": \"BLOCK\"}";
server1.gossip(attributes, Bytes.fromHexString("deadbeef"));
- Thread.sleep(1000);
+ Thread.sleep(2000);
assertEquals(Bytes.fromHexString("deadbeef"), messageReceived2.get());
assertEquals(Bytes.fromHexString("deadbeef"), messageReceived3.get());
assertNull(messageReceived1.get());
@@ -161,7 +161,7 @@ class VertxGossipServerTest {
assertEquals(Bytes.fromHexString("deadbeef"), messageReceived2.get());
Thread.sleep(1000);
- assertTrue(peerRepository1.lazyPushPeers().size() == 1 ||
peerRepository3.lazyPushPeers().size() == 1);
+ assertTrue(peerRepository1.lazyPushPeers().size() > 1 ||
peerRepository3.lazyPushPeers().size() > 1);
server1.stop().join();
server2.stop().join();
diff --git a/settings.gradle b/settings.gradle
index 99c3ff0..2dc5b3b 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -21,6 +21,7 @@ include 'dist'
include 'eth'
include 'eth-reference-tests'
include 'eth-repository'
+include 'gossip'
include 'io'
include 'junit'
include 'kademlia'
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]