This is an automated email from the ASF dual-hosted git repository.

toulmean pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git


The following commit(s) were added to refs/heads/master by this push:
     new 9e7a41c  Add a module to create a complete gossip application
     new 5aead93  Merge pull request #9 from atoulme/gossip_app
9e7a41c is described below

commit 9e7a41ce4f6e4ed4a443190b0728faac332856de
Author: Antoine Toulme <[email protected]>
AuthorDate: Fri May 3 17:18:48 2019 -0700

    Add a module to create a complete gossip application
---
 dependency-versions.gradle                         |   1 +
 dist/build.gradle                                  |  24 ++-
 settings.gradle => gossip/build.gradle             |  55 +++---
 .../java/org/apache/tuweni/gossip/GossipApp.java   | 189 ++++++++++++++++++++
 .../tuweni/gossip/GossipCommandLineOptions.java    | 193 +++++++++++++++++++++
 .../gossip/GossipCommandLineOptionsTest.java       |  94 ++++++++++
 .../tuweni/gossip/GossipIntegrationTest.java       | 129 ++++++++++++++
 gradle/check-licenses.gradle                       |   1 +
 .../tuweni/plumtree/EphemeralPeerRepository.java   |   7 +-
 .../tuweni/plumtree/vertx/VertxGossipServer.java   |  75 ++++----
 .../plumtree/vertx/VertxGossipServerTest.java      |   4 +-
 settings.gradle                                    |   1 +
 12 files changed, 700 insertions(+), 73 deletions(-)

diff --git a/dependency-versions.gradle b/dependency-versions.gradle
index 4bae852..b8b62fb 100644
--- a/dependency-versions.gradle
+++ b/dependency-versions.gradle
@@ -24,6 +24,7 @@ dependencyManagement {
     dependency('com.jolbox:bonecp:0.8.0.RELEASE')
     dependency('com.squareup.okhttp3:okhttp:3.12.0')
     dependency('com.winterbe:expekt:0.5.0')
+    dependency('info.picocli:picocli:4.0.0-alpha-2')
     dependency('io.lettuce:lettuce-core:5.1.3.RELEASE')
     dependency('io.vertx:vertx-core:3.6.2')
     dependencySet(group: 'org.antlr', version: '4.7.1') {
diff --git a/dist/build.gradle b/dist/build.gradle
index 7304d37..82e12ad 100644
--- a/dist/build.gradle
+++ b/dist/build.gradle
@@ -75,22 +75,40 @@ distributions {
       }
     }
   }
+  gossip {
+    contents {
+      into('bin') {
+        from { project(':gossip').startScripts.outputs.files }
+        fileMode = 0755
+      }
+      into('lib') {
+        def libs = []
+        libs << project(':gossip').configurations.runtime
+        from libs
+        from project(':gossip').jar
+      }
+    }
+  }
 }
 
 import org.gradle.crypto.checksum.Checksum
 
 distTar{ compression = Compression.GZIP }
 
-
 sourcesDistTar{ compression = Compression.GZIP }
 
+gossipDistTar{ compression = Compression.GZIP }
+
 task createChecksums(type: Checksum, dependsOn: [
   'distZip',
   'distTar',
   'sourcesDistZip',
-  'sourcesDistTar'
+  'sourcesDistTar',
+  'gossipDistZip',
+  'gossipDistTar'
 ]) {
-  files = distZip.outputs.files + distTar.outputs.files + 
sourcesDistZip.outputs.files + sourcesDistTar.outputs.files
+  files = distZip.outputs.files + distTar.outputs.files + 
sourcesDistZip.outputs.files + sourcesDistTar.outputs.files \
+      + gossipDistZip.outputs.files + gossipDistTar.outputs.files
   outputDir = new File(project.buildDir, "distributions")
   algorithm = Checksum.Algorithm.SHA512
 }
diff --git a/settings.gradle b/gossip/build.gradle
similarity index 53%
copy from settings.gradle
copy to gossip/build.gradle
index 99c3ff0..f708004 100644
--- a/settings.gradle
+++ b/gossip/build.gradle
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
  * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
  * to You under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
@@ -10,33 +10,26 @@
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
  */
-rootProject.name='tuweni'
-include 'bytes'
-include 'concurrent'
-include 'concurrent-coroutines'
-include 'config'
-include 'crypto'
-include 'devp2p'
-include 'dist'
-include 'eth'
-include 'eth-reference-tests'
-include 'eth-repository'
-include 'io'
-include 'junit'
-include 'kademlia'
-include 'kv'
-include 'les'
-include 'merkle-trie'
-include 'net'
-include 'net-coroutines'
-include 'plumtree'
-include 'progpow'
-include 'rlp'
-include 'rlpx'
-include 'scuttlebutt'
-include 'scuttlebutt-discovery'
-include 'scuttlebutt-handshake'
-include 'scuttlebutt-rpc'
-include 'ssz'
-include 'toml'
-include 'units'
+plugins { id 'application' }
+
+description = 'Peer-to-peer gossip application.'
+
+
+dependencies {
+  compile 'com.fasterxml.jackson.core:jackson-databind'
+  compile 'com.google.guava:guava'
+  compile 'info.picocli:picocli'
+  compile 'io.vertx:vertx-core'
+  compile 'org.bouncycastle:bcprov-jdk15on'
+  compile project(':bytes')
+  compile project(':config')
+  compile project(':plumtree')
+
+  testCompile project(':bytes')
+  testCompile project(':junit')
+  testCompile 'org.junit.jupiter:junit-jupiter-api'
+  testCompile 'org.junit.jupiter:junit-jupiter-params'
+  testRuntime 'org.junit.jupiter:junit-jupiter-engine'
+}
+
+application { mainClassName = 'org.apache.tuweni.gossip.GossipApp' }
diff --git a/gossip/src/main/java/org/apache/tuweni/gossip/GossipApp.java 
b/gossip/src/main/java/org/apache/tuweni/gossip/GossipApp.java
new file mode 100644
index 0000000..ad7293c
--- /dev/null
+++ b/gossip/src/main/java/org/apache/tuweni/gossip/GossipApp.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.tuweni.gossip;
+
+
+import org.apache.tuweni.bytes.Bytes;
+import org.apache.tuweni.concurrent.AsyncCompletion;
+import org.apache.tuweni.concurrent.CompletableAsyncCompletion;
+import org.apache.tuweni.crypto.Hash;
+import org.apache.tuweni.plumtree.EphemeralPeerRepository;
+import org.apache.tuweni.plumtree.vertx.VertxGossipServer;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.time.Instant;
+import java.util.Collections;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import io.vertx.core.Vertx;
+import io.vertx.core.http.HttpMethod;
+import io.vertx.core.http.HttpServer;
+import io.vertx.core.http.HttpServerRequest;
+import picocli.CommandLine;
+
+/**
+ * Application running a gossip client, taking configuration from command line 
or a configuration file.
+ *
+ */
+public final class GossipApp {
+
+  public static void main(String[] args) {
+    GossipCommandLineOptions opts = CommandLine.populateCommand(new 
GossipCommandLineOptions(), args);
+    try {
+      opts.validate();
+    } catch (IllegalArgumentException e) {
+      System.err.println("Invalid configuration detected.\n\n" + 
e.getMessage());
+      new CommandLine(opts).usage(System.out);
+      System.exit(1);
+    }
+    if (opts.help()) {
+      new CommandLine(opts).usage(System.out);
+      System.exit(0);
+    }
+    GossipApp gossipApp = new GossipApp(Vertx.vertx(), opts, System.err, 
System.out, () -> System.exit(1));
+    Runtime.getRuntime().addShutdownHook(new Thread(() -> gossipApp.stop()));
+    gossipApp.start();
+  }
+
+  private final GossipCommandLineOptions opts;
+  private final Runnable terminateFunction;
+  private final PrintStream errStream;
+  private final PrintStream outStream;
+  private final VertxGossipServer server;
+  private final HttpServer rpcServer;
+
+  GossipApp(
+      Vertx vertx,
+      GossipCommandLineOptions opts,
+      PrintStream errStream,
+      PrintStream outStream,
+      Runnable terminateFunction) {
+    EphemeralPeerRepository repository = new EphemeralPeerRepository();
+    server = new VertxGossipServer(
+        vertx,
+        opts.networkInterface(),
+        opts.listenPort(),
+        Hash::keccak256,
+        repository,
+        bytes -> readMessage(opts.messageLog(), errStream, bytes),
+        null);
+    this.opts = opts;
+    this.errStream = errStream;
+    this.outStream = outStream;
+    this.terminateFunction = terminateFunction;
+    this.rpcServer = vertx.createHttpServer();
+  }
+
+  void start() {
+    AsyncCompletion completion = server.start();
+    try {
+      completion.join();
+    } catch (CompletionException | InterruptedException e) {
+      errStream.println("Server could not start: " + e.getMessage());
+      terminateFunction.run();
+    }
+
+    CompletableAsyncCompletion rpcCompletion = AsyncCompletion.incomplete();
+    rpcServer.requestHandler(this::handleRPCRequest).listen(opts.rpcPort(), 
opts.networkInterface(), res -> {
+      if (res.failed()) {
+        rpcCompletion.completeExceptionally(res.cause());
+      } else {
+        rpcCompletion.complete();
+      }
+    });
+    try {
+      rpcCompletion.join();
+    } catch (CompletionException | InterruptedException e) {
+      errStream.println("RPC server could not start: " + e.getMessage());
+      terminateFunction.run();
+    }
+
+    try {
+      AsyncCompletion
+          .allOf(opts.peers().stream().map(peer -> 
server.connectTo(peer.getHost(), peer.getPort())))
+          .join(60, TimeUnit.SECONDS);
+    } catch (TimeoutException | InterruptedException e) {
+      errStream.println("Server could not connect to other peers: " + 
e.getMessage());
+    }
+  }
+
+  private void handleRPCRequest(HttpServerRequest httpServerRequest) {
+    if (HttpMethod.POST.equals(httpServerRequest.method())) {
+      if ("/publish".equals(httpServerRequest.path())) {
+        httpServerRequest.bodyHandler(body -> {
+          publish(Bytes.wrapBuffer(body));
+          httpServerRequest.response().setStatusCode(200).end();
+        });
+      } else {
+        httpServerRequest.response().setStatusCode(404).end();
+      }
+    } else {
+      httpServerRequest.response().setStatusCode(405).end();
+    }
+  }
+
+  void stop() {
+    try {
+      server.stop().join();
+    } catch (InterruptedException e) {
+      errStream.println("Server could not stop: " + e.getMessage());
+      terminateFunction.run();
+    }
+
+    CompletableAsyncCompletion rpcCompletion = AsyncCompletion.incomplete();
+    rpcServer.close(res -> {
+      if (res.failed()) {
+        rpcCompletion.completeExceptionally(res.cause());
+      } else {
+        rpcCompletion.complete();
+      }
+    });
+    try {
+      rpcCompletion.join();
+    } catch (CompletionException | InterruptedException e) {
+      errStream.println("RPC server could not stop: " + e.getMessage());
+      terminateFunction.run();
+    }
+  }
+
+  private void readMessage(String messageLog, PrintStream err, Bytes bytes) {
+    ObjectMapper mapper = new ObjectMapper();
+    ObjectNode node = mapper.createObjectNode();
+    node.put("timestamp", Instant.now().toString());
+    node.put("value", new String(bytes.toArrayUnsafe(), 
StandardCharsets.UTF_8));
+    try {
+      Path path = Paths.get(messageLog);
+      Files.write(
+          path,
+          Collections.singletonList(mapper.writeValueAsString(node)),
+          StandardCharsets.UTF_8,
+          Files.exists(path) ? StandardOpenOption.APPEND : 
StandardOpenOption.CREATE);
+    } catch (IOException e) {
+      err.println(e.getMessage());
+    }
+  }
+
+  public void publish(Bytes message) {
+    server.gossip("", message);
+  }
+}
diff --git 
a/gossip/src/main/java/org/apache/tuweni/gossip/GossipCommandLineOptions.java 
b/gossip/src/main/java/org/apache/tuweni/gossip/GossipCommandLineOptions.java
new file mode 100644
index 0000000..04a6f35
--- /dev/null
+++ 
b/gossip/src/main/java/org/apache/tuweni/gossip/GossipCommandLineOptions.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.tuweni.gossip;
+
+
+import org.apache.tuweni.config.Configuration;
+import org.apache.tuweni.config.ConfigurationError;
+import org.apache.tuweni.config.PropertyValidator;
+import org.apache.tuweni.config.Schema;
+import org.apache.tuweni.config.SchemaBuilder;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.URI;
+import java.net.UnknownHostException;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import picocli.CommandLine;
+
+final class GossipCommandLineOptions {
+
+  static final Schema createConfigFileSchema() {
+    return SchemaBuilder
+        .create()
+        .addInteger("listenPort", 0, "Port to listen on", 
PropertyValidator.inRange(0, 65536))
+        .addInteger("rpcPort", 0, "RPC port to listen on", 
PropertyValidator.inRange(0, 65536))
+        .addString("networkInterface", "0.0.0.0", "Network interface to bind 
to", null)
+        .addListOfString("peers", Collections.emptyList(), "Static peers 
list", null)
+        .addString("messagelog", "messages.log", "Log file where messages are 
stored", null)
+        .toSchema();
+  }
+
+  @CommandLine.Option(names = {"-c", "--config"} , description = 
"Configuration file.")
+  private Path configPath = null;
+
+  @CommandLine.Option(arity = "0..*" , names = {"-p", "--peer"} , description 
= "Static peers list")
+  private String[] peers;
+
+  @CommandLine.Option(names = {"-l", "--listen"} , description = "Port to 
listen on")
+  private Integer port;
+
+  @CommandLine.Option(names = {"-r", "--rpc"} , description = "RPC port to 
listen on")
+  private Integer rpcPort;
+
+  @CommandLine.Option(names = {"-n", "--networkInterface"} , description = 
"Network interface to bind to")
+  private String networkInterface = "0.0.0.0";
+
+  @CommandLine.Option(names = {"-m", "--messageLog"} , description = "Log file 
where messages are stored")
+  private String messageLog;
+
+  @CommandLine.Option(names = {"-h", "--help"} , description = "Prints usage 
prompt")
+  private boolean help;
+
+  private List<URI> peerAddresses;
+  private Configuration config;
+
+  GossipCommandLineOptions() {}
+
+  GossipCommandLineOptions(
+      String[] peers,
+      Integer port,
+      String networkInterface,
+      String messageLog,
+      Integer rpcPort,
+      Configuration config) {
+    this.peers = peers;
+    this.port = port;
+    this.networkInterface = networkInterface;
+    this.messageLog = messageLog;
+    this.rpcPort = rpcPort;
+    this.config = config;
+  }
+
+  private Configuration config() {
+    if (config == null && configPath != null) {
+      try {
+        config = Configuration.fromToml(configPath, createConfigFileSchema());
+      } catch (IOException e) {
+        throw new IllegalArgumentException(e);
+      }
+    }
+    return config;
+  }
+
+  List<URI> peers() {
+    if (peerAddresses == null) {
+      peerAddresses = new ArrayList<>();
+      if (peers != null) {
+        for (String peer : peers) {
+          URI peerURI = URI.create(peer);
+          if (peerURI.getHost() == null) {
+            throw new IllegalArgumentException("Invalid peer URI " + peerURI);
+          }
+          peerAddresses.add(peerURI);
+        }
+      } else {
+        if (config() != null) {
+          for (String peer : config().getListOfString("peers")) {
+            URI peerURI = URI.create(peer);
+            if (peerURI.getHost() == null) {
+              throw new IllegalArgumentException("Invalid peer URI " + 
peerURI);
+            }
+            peerAddresses.add(peerURI);
+          }
+        }
+      }
+    }
+    return peerAddresses;
+  }
+
+  void validate() {
+    int listenPort = listenPort();
+    if (listenPort < 0 || listenPort > 65535) {
+      throw new IllegalArgumentException("Invalid port number " + listenPort);
+    }
+    int rpcPort = rpcPort();
+    if (rpcPort < 0 || rpcPort > 65535) {
+      throw new IllegalArgumentException("Invalid port number" + rpcPort);
+    }
+    peers();
+    try {
+      InetAddress.getByName(networkInterface);
+    } catch (UnknownHostException e) {
+      throw new IllegalArgumentException("Invalid network interface");
+    }
+    if (config() != null) {
+      List<ConfigurationError> errors = config().errors();
+      if (errors.size() > 0) {
+        String message = errors.stream().map(err -> "[" + err.position() + "] 
" + err.getMessage()).collect(
+            Collectors.joining("\n"));
+        throw new IllegalArgumentException(message);
+      }
+    }
+  }
+
+  int listenPort() {
+    if (port != null) {
+      return port;
+    }
+    if (config() != null) {
+      return config.getInteger("listenPort");
+    }
+    return 0;
+  }
+
+  int rpcPort() {
+    if (rpcPort != null) {
+      return rpcPort;
+    }
+    if (config() != null) {
+      return config.getInteger("rpcPort");
+    }
+    return 0;
+  }
+
+  String networkInterface() {
+    if (networkInterface != null) {
+      return networkInterface;
+    }
+    if (config() != null) {
+      return config.getString("networkInterface");
+    }
+    return "0.0.0.0";
+  }
+
+  String messageLog() {
+    if (messageLog != null) {
+      return messageLog;
+    }
+    if (config != null) {
+      return config.getString("messageLog");
+    }
+    return "messages.log";
+  }
+
+  boolean help() {
+    return help;
+  }
+}
diff --git 
a/gossip/src/test/java/org/apache/tuweni/gossip/GossipCommandLineOptionsTest.java
 
b/gossip/src/test/java/org/apache/tuweni/gossip/GossipCommandLineOptionsTest.java
new file mode 100644
index 0000000..cd6b885
--- /dev/null
+++ 
b/gossip/src/test/java/org/apache/tuweni/gossip/GossipCommandLineOptionsTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.tuweni.gossip;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import org.apache.tuweni.config.Configuration;
+
+import java.net.URI;
+import java.net.UnknownHostException;
+
+import org.junit.jupiter.api.Test;
+
+class GossipCommandLineOptionsTest {
+
+  @Test
+  void testInvalidPort() {
+    GossipCommandLineOptions opts = new GossipCommandLineOptions(new 
String[0], -4, "0.0.0.0", null, 3, null);
+    assertThrows(IllegalArgumentException.class, opts::validate);
+  }
+
+  @Test
+  void testInvalidPeer() {
+    GossipCommandLineOptions opts =
+        new GossipCommandLineOptions(new String[] 
{"tcp://400.300.200.100:9000"}, 10, "0.0.0.0", null, 3, null);
+    assertThrows(IllegalArgumentException.class, opts::validate);
+  }
+
+  @Test
+  void testInvalidNetworkInterface() {
+    GossipCommandLineOptions opts = new GossipCommandLineOptions(new String[] 
{}, 10, "400.300.200.100", null, 3, null);
+    assertThrows(IllegalArgumentException.class, opts::validate);
+  }
+
+  @Test
+  void operateFromConfig() throws UnknownHostException {
+    Configuration config = Configuration.fromToml(
+        ""
+            + "peers=[\"tcp://127.0.0.1:2000\"]\n"
+            + "listenPort=1080\n"
+            + "networkInterface=\"127.0.0.1\"\n"
+            + "messageLog=\"D:/Temp\"",
+        GossipCommandLineOptions.createConfigFileSchema());
+    GossipCommandLineOptions opts = new GossipCommandLineOptions(null, null, 
null, null, 3000, config);
+    opts.validate();
+    assertEquals(1080, opts.listenPort());
+    assertEquals(1, opts.peers().size());
+    assertEquals(URI.create("tcp://127.0.0.1:2000"), opts.peers().get(0));
+    assertEquals("127.0.0.1", opts.networkInterface());
+    assertEquals("D:/Temp", opts.messageLog());
+  }
+
+  @Test
+  void invalidConfigFilePort() throws UnknownHostException {
+    Configuration config = Configuration.fromToml(
+        ""
+            + "peers=[\"tcp://127.0.0.1:3000\"]\n"
+            + "listenPort=500000\n"
+            + "networkInterface=\"127.0.0.1\"\n"
+            + "messageLog=\"D:/Temp\"",
+        GossipCommandLineOptions.createConfigFileSchema());
+    GossipCommandLineOptions opts = new GossipCommandLineOptions(null, null, 
null, null, 3000, config);
+    assertThrows(IllegalArgumentException.class, opts::validate);
+  }
+
+  @Test
+  void cliConfigOverConfigFile() throws UnknownHostException {
+    Configuration config = Configuration.fromToml(
+        ""
+            + "peers=\"tcp://127.0.0.1:3000\"\n"
+            + "listenPort=1080\n"
+            + "networkInterface=\"127.0.0.1\"\n"
+            + "messageLog=\"D:/Temp\"");
+    GossipCommandLineOptions opts =
+        new GossipCommandLineOptions(new String[] {"tcp://192.168.0.1:3000"}, 
400, "0.0.0.0", "C:/Temp", 3000, config);
+    assertEquals(400, opts.listenPort());
+    assertEquals(1, opts.peers().size());
+    assertEquals(URI.create("tcp://192.168.0.1:3000"), opts.peers().get(0));
+    assertEquals("0.0.0.0", opts.networkInterface());
+    assertEquals("C:/Temp", opts.messageLog());
+  }
+
+}
diff --git 
a/gossip/src/test/java/org/apache/tuweni/gossip/GossipIntegrationTest.java 
b/gossip/src/test/java/org/apache/tuweni/gossip/GossipIntegrationTest.java
new file mode 100644
index 0000000..7ff14d0
--- /dev/null
+++ b/gossip/src/test/java/org/apache/tuweni/gossip/GossipIntegrationTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.tuweni.gossip;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+import org.apache.tuweni.bytes.Bytes;
+import org.apache.tuweni.bytes.Bytes32;
+import org.apache.tuweni.junit.*;
+
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import io.vertx.core.Vertx;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.http.HttpClient;
+import io.vertx.core.http.HttpMethod;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+@ExtendWith({VertxExtension.class, TempDirectoryExtension.class, 
BouncyCastleExtension.class})
+class GossipIntegrationTest {
+
+  @Test
+  void threeGossipServersStarting(@VertxInstance Vertx vertx, @TempDirectory 
Path tempDir) throws Exception {
+    GossipCommandLineOptions opts1 = new GossipCommandLineOptions(
+        new String[] {"tcp://127.0.0.1:9001", "tcp://127.0.0.1:9002"},
+        9000,
+        "127.0.0.1",
+        tempDir.resolve("log1.log").toString(),
+        10000,
+        null);
+    GossipCommandLineOptions opts2 = new GossipCommandLineOptions(
+        new String[] {"tcp://127.0.0.1:9000", "tcp://127.0.0.1:9002"},
+        9001,
+        "127.0.0.1",
+        tempDir.resolve("log2.log").toString(),
+        10001,
+        null);
+    GossipCommandLineOptions opts3 = new GossipCommandLineOptions(
+        new String[] {"tcp://127.0.0.1:9000", "tcp://127.0.0.1:9001"},
+        9002,
+        "127.0.0.1",
+        tempDir.resolve("log3.log").toString(),
+        10002,
+        null);
+    AtomicBoolean terminationRan = new AtomicBoolean(false);
+
+    ExecutorService service = Executors.newFixedThreadPool(3);
+
+    Future<GossipApp> app1Future = service.submit(() -> {
+      GossipApp app = new GossipApp(vertx, opts1, System.err, System.out, () 
-> {
+        terminationRan.set(true);
+      });
+      app.start();
+      return app;
+    });
+    Future<GossipApp> app2Future = service.submit(() -> {
+      GossipApp app = new GossipApp(vertx, opts2, System.err, System.out, () 
-> {
+        terminationRan.set(true);
+      });
+      app.start();
+      return app;
+    });
+    Future<GossipApp> app3Future = service.submit(() -> {
+      GossipApp app = new GossipApp(vertx, opts3, System.err, System.out, () 
-> {
+        terminationRan.set(true);
+      });
+      app.start();
+      return app;
+    });
+    GossipApp app1 = app1Future.get(10, TimeUnit.SECONDS);
+    GossipApp app2 = app2Future.get(10, TimeUnit.SECONDS);
+    GossipApp app3 = app3Future.get(10, TimeUnit.SECONDS);
+
+    assertFalse(terminationRan.get());
+
+    HttpClient client = vertx.createHttpClient();
+
+    for (int i = 0; i < 20; i++) {
+      client.request(HttpMethod.POST, 10000, "127.0.0.1", 
"/publish").exceptionHandler(thr -> {
+        throw new RuntimeException(thr);
+      }).handler(resp -> {
+
+      
}).end(Buffer.buffer(Bytes32.rightPad(Bytes.ofUnsignedInt(i)).toHexString().getBytes(StandardCharsets.UTF_8)));
+    }
+
+    List<String> receiver1 = null;
+
+    int counter = 0;
+    do {
+      Thread.sleep(1000);
+      counter++;
+      receiver1 = Files.readAllLines(tempDir.resolve("log2.log"));
+    } while (receiver1.size() < 20 && counter < 20);
+
+    client.close();
+
+    service.submit(app1::stop);
+    service.submit(app2::stop);
+    service.submit(app3::stop);
+
+
+    assertEquals(20, receiver1.size());
+    List<String> receiver2 = Files.readAllLines(tempDir.resolve("log3.log"));
+    assertEquals(20, receiver2.size());
+
+    service.shutdown();
+
+  }
+}
diff --git a/gradle/check-licenses.gradle b/gradle/check-licenses.gradle
index e3fd319..8cc6d21 100644
--- a/gradle/check-licenses.gradle
+++ b/gradle/check-licenses.gradle
@@ -90,6 +90,7 @@ downloadLicenses {
       'ASL, Version 2',
       'The Apache License, Version 2.0',
       'The Apache Software License, Version 2.0',
+      'The Apache Software License, version 2.0'
     ],
     (bsd): [
       'Berkeley Software Distribution (BSD) License',
diff --git 
a/plumtree/src/main/java/org/apache/tuweni/plumtree/EphemeralPeerRepository.java
 
b/plumtree/src/main/java/org/apache/tuweni/plumtree/EphemeralPeerRepository.java
index f75c2d8..acbdeec 100644
--- 
a/plumtree/src/main/java/org/apache/tuweni/plumtree/EphemeralPeerRepository.java
+++ 
b/plumtree/src/main/java/org/apache/tuweni/plumtree/EphemeralPeerRepository.java
@@ -14,15 +14,14 @@ package org.apache.tuweni.plumtree;
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 
 public class EphemeralPeerRepository implements PeerRepository {
 
-  private final Set<Peer> eagerPushPeers = Collections.synchronizedSet(new 
HashSet<>());
-  private final Set<Peer> lazyPushPeers = Collections.synchronizedSet(new 
HashSet<>());
+  private final Set<Peer> eagerPushPeers = ConcurrentHashMap.newKeySet();
+  private final Set<Peer> lazyPushPeers = ConcurrentHashMap.newKeySet();
 
   @Override
   public void addEager(Peer peer) {
diff --git 
a/plumtree/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java
 
b/plumtree/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java
index ba0a289..9cb2d38 100644
--- 
a/plumtree/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java
+++ 
b/plumtree/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java
@@ -24,7 +24,9 @@ import org.apache.tuweni.plumtree.State;
 
 import java.io.IOException;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
+import javax.annotation.Nullable;
 
 import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.core.JsonProcessingException;
@@ -64,28 +66,30 @@ public final class VertxGossipServer {
 
     void handle(Buffer data) {
       buffer = Bytes.concatenate(buffer, Bytes.wrapBuffer(data));
-      Message message;
-      try {
-        JsonParser parser = 
mapper.getFactory().createParser(buffer.toArrayUnsafe());
-        message = parser.readValueAs(Message.class);
-        buffer = buffer.slice((int) 
parser.getCurrentLocation().getByteOffset());
-      } catch (IOException e) {
-        return;
-      }
+      while (!buffer.isEmpty()) {
+        Message message;
+        try {
+          JsonParser parser = 
mapper.getFactory().createParser(buffer.toArrayUnsafe());
+          message = parser.readValueAs(Message.class);
+          buffer = buffer.slice((int) 
parser.getCurrentLocation().getByteOffset());
+        } catch (IOException e) {
+          return;
+        }
 
-      switch (message.verb) {
-        case IHAVE:
-          state.receiveIHaveMessage(peer, 
Bytes.fromHexString(message.payload));
-          break;
-        case GOSSIP:
-          state.receiveGossipMessage(peer, message.attributes, 
Bytes.fromHexString(message.payload));
-          break;
-        case GRAFT:
-          state.receiveGraftMessage(peer, 
Bytes.fromHexString(message.payload));
-          break;
-        case PRUNE:
-          state.receivePruneMessage(peer);
-          break;
+        switch (message.verb) {
+          case IHAVE:
+            state.receiveIHaveMessage(peer, 
Bytes.fromHexString(message.payload));
+            break;
+          case GOSSIP:
+            state.receiveGossipMessage(peer, message.attributes, 
Bytes.fromHexString(message.payload));
+            break;
+          case GRAFT:
+            state.receiveGraftMessage(peer, 
Bytes.fromHexString(message.payload));
+            break;
+          case PRUNE:
+            state.receivePruneMessage(peer);
+            break;
+        }
       }
     }
 
@@ -113,14 +117,14 @@ public final class VertxGossipServer {
       MessageHashing messageHashing,
       PeerRepository peerRepository,
       Consumer<Bytes> payloadListener,
-      MessageValidator payloadValidator) {
+      @Nullable MessageValidator payloadValidator) {
     this.vertx = vertx;
     this.networkInterface = networkInterface;
     this.port = port;
     this.messageHashing = messageHashing;
     this.peerRepository = peerRepository;
     this.payloadListener = payloadListener;
-    this.payloadValidator = payloadValidator;
+    this.payloadValidator = payloadValidator == null ? (bytes, peer) -> true : 
payloadValidator;
   }
 
   public AsyncCompletion start() {
@@ -184,16 +188,21 @@ public final class VertxGossipServer {
       throw new IllegalStateException("Server has not started");
     }
     CompletableAsyncCompletion completion = AsyncCompletion.incomplete();
-    client.connect(port, host, res -> {
-      if (res.failed()) {
-        completion.completeExceptionally(res.cause());
-      } else {
-        completion.complete();
-        Peer peer = new SocketPeer(res.result());
-        SocketHandler handler = new SocketHandler(peer);
-        res.result().handler(handler::handle).closeHandler(handler::close);
-      }
-    });
+    AtomicInteger counter = new AtomicInteger(0);
+    while (!completion.isDone()) {
+      client.connect(port, host, res -> {
+        if (res.failed()) {
+          if (counter.incrementAndGet() > 5) {
+            completion.completeExceptionally(res.cause());
+          }
+        } else {
+          completion.complete();
+          Peer peer = new SocketPeer(res.result());
+          SocketHandler handler = new SocketHandler(peer);
+          res.result().handler(handler::handle).closeHandler(handler::close);
+        }
+      });
+    }
 
     return completion;
   }
diff --git 
a/plumtree/src/test/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java
 
b/plumtree/src/test/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java
index 384adb3..29801db 100644
--- 
a/plumtree/src/test/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java
+++ 
b/plumtree/src/test/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java
@@ -106,7 +106,7 @@ class VertxGossipServerTest {
     server3.connectTo("127.0.0.1", 10001).join();
     String attributes = "{\"message_type\": \"BLOCK\"}";
     server1.gossip(attributes, Bytes.fromHexString("deadbeef"));
-    Thread.sleep(1000);
+    Thread.sleep(2000);
     assertEquals(Bytes.fromHexString("deadbeef"), messageReceived2.get());
     assertEquals(Bytes.fromHexString("deadbeef"), messageReceived3.get());
     assertNull(messageReceived1.get());
@@ -161,7 +161,7 @@ class VertxGossipServerTest {
     assertEquals(Bytes.fromHexString("deadbeef"), messageReceived2.get());
     Thread.sleep(1000);
 
-    assertTrue(peerRepository1.lazyPushPeers().size() == 1 || 
peerRepository3.lazyPushPeers().size() == 1);
+    assertTrue(peerRepository1.lazyPushPeers().size() > 1 || 
peerRepository3.lazyPushPeers().size() > 1);
 
     server1.stop().join();
     server2.stop().join();
diff --git a/settings.gradle b/settings.gradle
index 99c3ff0..2dc5b3b 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -21,6 +21,7 @@ include 'dist'
 include 'eth'
 include 'eth-reference-tests'
 include 'eth-repository'
+include 'gossip'
 include 'io'
 include 'junit'
 include 'kademlia'


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to