This is an automated email from the ASF dual-hosted git repository.
toulmean pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git
The following commit(s) were added to refs/heads/master by this push:
new 7191c74 Hobbits implementation for TCP/HTTP/UDP/WS. Still WIP
7191c74 is described below
commit 7191c7498d3da3c15e21b57866dc7f8cff0595d4
Author: Antoine Toulme <[email protected]>
AuthorDate: Fri Jun 7 13:31:14 2019 -0700
Hobbits implementation for TCP/HTTP/UDP/WS. Still WIP
---
.../org/apache/tuweni/hobbits/HobbitsTransport.kt | 178 ++++++++++++---
.../kotlin/org/apache/tuweni/hobbits/Message.kt | 10 +
.../apache/tuweni/hobbits/HobbitsTransportTest.kt | 2 +-
.../org/apache/tuweni/hobbits/InteractionTest.kt | 246 +++++++++++++++++++++
4 files changed, 409 insertions(+), 27 deletions(-)
diff --git
a/hobbits/src/main/kotlin/org/apache/tuweni/hobbits/HobbitsTransport.kt
b/hobbits/src/main/kotlin/org/apache/tuweni/hobbits/HobbitsTransport.kt
index 4c1d16c..8f3ddf0 100644
--- a/hobbits/src/main/kotlin/org/apache/tuweni/hobbits/HobbitsTransport.kt
+++ b/hobbits/src/main/kotlin/org/apache/tuweni/hobbits/HobbitsTransport.kt
@@ -26,6 +26,7 @@ import io.vertx.core.net.NetClient
import io.vertx.core.net.NetServer
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
+import org.apache.tuweni.bytes.Bytes
import org.apache.tuweni.concurrent.AsyncCompletion
import org.apache.tuweni.concurrent.coroutines.await
import java.lang.IllegalArgumentException
@@ -38,7 +39,7 @@ import kotlin.coroutines.CoroutineContext
/**
* Hobbits is a peer-to-peer transport stack specified at
https://www.github.com/deltap2p/hobbits.
*
- * This class works as a transport mechanism that can leverage a variety of
network transport
+ * This class works as a transport mech anism that can leverage a variety of
network transport
* mechanisms, such as TCP, HTTP, UDP and Web sockets.
*
* It can be used to contact other Hobbits endpoints, or to expose endpoints
to the network.
@@ -56,27 +57,42 @@ class HobbitsTransport(
private val udpEndpoints = mutableMapOf<String, Endpoint>()
private val wsEndpoints = mutableMapOf<String, Endpoint>()
+ private var exceptionHandler: ((Throwable) -> Unit)? = { }
+
private var httpClient: HttpClient? = null
private var tcpClient: NetClient? = null
private var udpClient: DatagramSocket? = null
- private var httpServer: HttpServer? = null
- private var tcpServer: NetServer? = null
+ private val httpServers = mutableMapOf<String, HttpServer>()
+ private val tcpServers = mutableMapOf<String, NetServer>()
+ private val udpServers = mutableMapOf<String, DatagramSocket>()
+ private val wsServers = mutableMapOf<String, HttpServer>()
+
+ /**
+ * Sets an exception handler that will be called whenever an exception
occurs during transport.
+ */
+ fun exceptionHandler(handler: (Throwable) -> Unit) {
+ exceptionHandler = handler
+ }
/**
* Creates a new endpoint over http.
* @param networkInterface the network interface to bind the endpoint to
* @param port the port to serve traffic from
+ * @param requestURI the request URI path to match
* @param tls whether the endpoint should be secured using TLS
+ * @param handler function called when a message is received
*/
fun createHTTPEndpoint(
id: String = "default",
networkInterface: String = "0.0.0.0",
port: Int = 9337,
- tls: Boolean = false
+ requestURI: String? = null,
+ tls: Boolean = false,
+ handler: (Message) -> Unit
) {
checkNotStarted()
- httpEndpoints[id] = Endpoint(networkInterface, port, tls)
+ httpEndpoints[id] = Endpoint(networkInterface, port, requestURI, tls,
handler)
}
/**
@@ -84,15 +100,17 @@ class HobbitsTransport(
* @param networkInterface the network interface to bind the endpoint to
* @param port the port to serve traffic from
* @param tls whether the endpoint should be secured using TLS
+ * @param handler function called when a message is received
*/
fun createTCPEndpoint(
id: String = "default",
networkInterface: String = "0.0.0.0",
port: Int = 9237,
- tls: Boolean = false
+ tls: Boolean = false,
+ handler: (Message) -> Unit
) {
checkNotStarted()
- tcpEndpoints[id] = Endpoint(networkInterface, port, tls)
+ tcpEndpoints[id] = Endpoint(networkInterface, port, null, tls, handler)
}
/**
@@ -100,43 +118,54 @@ class HobbitsTransport(
* @param networkInterface the network interface to bind the endpoint to
* @param port the port to serve traffic from
* @param tls whether the endpoint should be secured using TLS
+ * @param handler function called when a message is received
*/
- fun createUDPEndpoint(id: String = "default", networkInterface: String =
"0.0.0.0", port: Int = 9137) {
+ fun createUDPEndpoint(
+ id: String = "default",
+ networkInterface: String = "0.0.0.0",
+ port: Int = 9137,
+ handler: (Message) -> Unit
+ ) {
checkNotStarted()
- udpEndpoints[id] = Endpoint(networkInterface, port, false)
+ udpEndpoints[id] = Endpoint(networkInterface, port, null, false, handler)
}
/**
* Creates a new endpoint over websocket connections.
* @param networkInterface the network interface to bind the endpoint to
* @param port the port to serve traffic from
+ * @param requestURI the request URI path to match
* @param tls whether the endpoint should be secured using TLS
+ * @param handler function called when a message is received
*/
fun createWSEndpoint(
id: String = "default",
networkInterface: String = "0.0.0.0",
port: Int = 9037,
- tls: Boolean = false
+ requestURI: String? = null,
+ tls: Boolean = false,
+ handler: (Message) -> Unit
) {
checkNotStarted()
- wsEndpoints[id] = Endpoint(networkInterface, port, tls)
+ wsEndpoints[id] = Endpoint(networkInterface, port, requestURI, tls,
handler)
}
/**
* Sends a message using the transport specified.
*
*/
- suspend fun sendMessage(message: Message, transport: Transport, host:
String, port: Int) {
+ suspend fun sendMessage(message: Message, transport: Transport, host:
String, port: Int, requestURI: String = "") {
checkStarted()
val completion = AsyncCompletion.incomplete()
when (transport) {
Transport.HTTP -> {
@Suppress("DEPRECATION")
- val req = httpClient!!.request(HttpMethod.POST, port, host,
"/").handler {
+ val req = httpClient!!.request(HttpMethod.POST, port, host, requestURI)
+ .exceptionHandler(exceptionHandler).handler {
if (it.statusCode() == 200) {
completion.complete()
} else {
- completion.completeExceptionally(RuntimeException())
+
completion.completeExceptionally(RuntimeException("${it.statusCode()}"))
}
}
req.end(Buffer.buffer(message.toBytes().toArrayUnsafe()))
@@ -146,16 +175,29 @@ class HobbitsTransport(
if (res.failed()) {
completion.completeExceptionally(res.cause())
} else {
- res.result().end(Buffer.buffer(message.toBytes().toArrayUnsafe()))
+
res.result().exceptionHandler(exceptionHandler).end(Buffer.buffer(message.toBytes().toArrayUnsafe()))
completion.complete()
}
}
}
Transport.UDP -> {
- TODO()
+ udpClient!!.send(Buffer.buffer(message.toBytes().toArrayUnsafe()),
port, host) { handler ->
+ if (handler.failed()) {
+ completion.completeExceptionally(handler.cause())
+ } else {
+ completion.complete()
+ }
+ }
}
Transport.WS -> {
- TODO()
+ httpClient!!.websocket(port, host, requestURI, { handler ->
+ handler.exceptionHandler(exceptionHandler)
+
.writeBinaryMessage(Buffer.buffer(message.toBytes().toArrayUnsafe())).end()
+ completion.complete()
+ },
+ { exception ->
+ completion.completeExceptionally(exception)
+ })
}
}
completion.await()
@@ -201,15 +243,45 @@ class HobbitsTransport(
if (started.compareAndSet(false, true)) {
httpClient = vertx.createHttpClient()
tcpClient = vertx.createNetClient()
- udpClient = vertx.createDatagramSocket()
-
- httpServer = vertx.createHttpServer()
- tcpServer = vertx.createNetServer()
+ udpClient =
vertx.createDatagramSocket().exceptionHandler(exceptionHandler)
val completions = mutableListOf<AsyncCompletion>()
- for (endpoint in httpEndpoints.values) {
+ for ((id, endpoint) in httpEndpoints) {
+ val completion = AsyncCompletion.incomplete()
+ val httpServer = vertx.createHttpServer()
+ httpServers[id] = httpServer
+
+ httpServer.requestHandler {
+ if (endpoint.requestURI == null ||
it.path().startsWith(endpoint.requestURI)) {
+ it.bodyHandler {
endpoint.handler(Message.readMessage(Bytes.wrapBuffer(it))!!) }
+ it.response().statusCode = 200
+ it.response().end()
+ } else {
+ it.response().statusCode = 404
+ it.response().end()
+ }
+ }.listen(endpoint.port, endpoint.networkInterface) {
+ if (it.failed()) {
+ completion.completeExceptionally(it.cause())
+ } else {
+ completion.complete()
+ }
+ }
+ completions.add(completion)
+ }
+ for ((id, endpoint) in tcpEndpoints) {
val completion = AsyncCompletion.incomplete()
- httpServer!!.listen(endpoint.port, endpoint.networkInterface) {
+ val tcpServer = vertx.createNetServer()
+ tcpServers[id] = tcpServer
+ tcpServer.connectHandler { connectHandler -> connectHandler.handler {
buffer ->
+ val message = Message.readMessage(Bytes.wrapBuffer(buffer))
+ if (message == null) {
+ TODO("Buffer not implemented yet")
+ } else {
+ endpoint.handler(message)
+ }
+ }
+ }.listen(endpoint.port, endpoint.networkInterface) {
if (it.failed()) {
completion.completeExceptionally(it.cause())
} else {
@@ -218,9 +290,48 @@ class HobbitsTransport(
}
completions.add(completion)
}
- for (endpoint in tcpEndpoints.values) {
+ for ((id, endpoint) in udpEndpoints) {
val completion = AsyncCompletion.incomplete()
- tcpServer!!.listen(endpoint.port, endpoint.networkInterface) {
+
+ val udpServer = vertx.createDatagramSocket()
+ udpServers[id] = udpServer
+
+ udpServer.handler { packet ->
+ val message = Message.readMessage(Bytes.wrapBuffer(packet.data()))
+ if (message == null) {
+ TODO("Buffer not implemented yet")
+ } else {
+ endpoint.handler(message)
+ }
+ }.listen(endpoint.port, endpoint.networkInterface) {
+ if (it.failed()) {
+ completion.completeExceptionally(it.cause())
+ } else {
+ completion.complete()
+ }
+ }
+ completions.add(completion)
+ }
+ for ((id, endpoint) in wsEndpoints) {
+ val completion = AsyncCompletion.incomplete()
+ val httpServer = vertx.createHttpServer()
+ wsServers[id] = httpServer
+
+ httpServer.websocketHandler {
+ if (endpoint.requestURI == null ||
it.path().startsWith(endpoint.requestURI)) {
+ it.accept()
+
+ it.binaryMessageHandler { buffer ->
+ try {
+
endpoint.handler(Message.readMessage(Bytes.wrapBuffer(buffer))!!)
+ } finally {
+ it.end()
+ }
+ }
+ } else {
+ it.reject()
+ }
+ }.listen(endpoint.port, endpoint.networkInterface) {
if (it.failed()) {
completion.completeExceptionally(it.cause())
} else {
@@ -238,6 +349,15 @@ class HobbitsTransport(
httpClient!!.close()
tcpClient!!.close()
udpClient!!.close()
+ for (server in httpServers.values) {
+ server.close()
+ }
+ for (server in tcpServers.values) {
+ server.close()
+ }
+ for (server in udpServers.values) {
+ server.close()
+ }
}
}
@@ -254,7 +374,13 @@ class HobbitsTransport(
}
}
-internal data class Endpoint(val networkInterface: String, val port: Int, val
tls: Boolean)
+internal data class Endpoint(
+ val networkInterface: String,
+ val port: Int,
+ val requestURI: String?,
+ val tls: Boolean,
+ val handler: (Message) -> Unit
+)
enum class Transport() {
HTTP,
diff --git a/hobbits/src/main/kotlin/org/apache/tuweni/hobbits/Message.kt
b/hobbits/src/main/kotlin/org/apache/tuweni/hobbits/Message.kt
index 14c2cc3..259da84 100644
--- a/hobbits/src/main/kotlin/org/apache/tuweni/hobbits/Message.kt
+++ b/hobbits/src/main/kotlin/org/apache/tuweni/hobbits/Message.kt
@@ -85,9 +85,19 @@ class Message(
/**
* Writes a message into bytes.
+ * @return the bytes of the message
*/
fun toBytes(): Bytes {
val requestLine = "$protocol $version $command ${headers.size()}
${body.size()}\n"
return Bytes.concatenate(Bytes.wrap(requestLine.toByteArray()), headers,
body)
}
+
+ /**
+ * Provides the size of the message
+ * @return the size of the message
+ */
+ fun size(): Int {
+ return protocol.length + 5 + version.length + command.length +
headers.size().toString().length +
+ body.size().toString().length + headers.size() + body.size()
+ }
}
diff --git
a/hobbits/src/test/kotlin/org/apache/tuweni/hobbits/HobbitsTransportTest.kt
b/hobbits/src/test/kotlin/org/apache/tuweni/hobbits/HobbitsTransportTest.kt
index 8ddac68..2984dd0 100644
--- a/hobbits/src/test/kotlin/org/apache/tuweni/hobbits/HobbitsTransportTest.kt
+++ b/hobbits/src/test/kotlin/org/apache/tuweni/hobbits/HobbitsTransportTest.kt
@@ -57,7 +57,7 @@ class HobbitsTransportTest {
val server = HobbitsTransport(vertx)
server.start()
val exception: IllegalStateException = assertThrows {
- server.createHTTPEndpoint()
+ server.createHTTPEndpoint(handler = {})
}
assertEquals("Server already started", exception.message)
}
diff --git
a/hobbits/src/test/kotlin/org/apache/tuweni/hobbits/InteractionTest.kt
b/hobbits/src/test/kotlin/org/apache/tuweni/hobbits/InteractionTest.kt
new file mode 100644
index 0000000..49eeb09
--- /dev/null
+++ b/hobbits/src/test/kotlin/org/apache/tuweni/hobbits/InteractionTest.kt
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuweni.hobbits
+
+import io.vertx.core.Vertx
+import kotlinx.coroutines.runBlocking
+import org.apache.tuweni.bytes.Bytes
+import org.apache.tuweni.junit.VertxExtension
+import org.apache.tuweni.junit.VertxInstance
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.extension.ExtendWith
+import java.util.concurrent.atomic.AtomicReference
+
+@ExtendWith(VertxExtension::class)
+class TCPPersistentTest {
+
+ @Test
+ fun testTwoTCPConnections(@VertxInstance vertx: Vertx) {
+ val ref = AtomicReference<Message>()
+ val client1 = HobbitsTransport(vertx)
+ val client2 = HobbitsTransport(vertx)
+ runBlocking {
+ client1.createTCPEndpoint("foo", port = 10000, handler = ref::set)
+ client1.start()
+ client2.start()
+ client2.sendMessage(
+ Message(command = "WHO", body = Bytes.fromHexString("deadbeef"),
headers = Bytes.random(16)),
+ Transport.TCP,
+ "0.0.0.0",
+ 10000
+ )
+ }
+ Thread.sleep(200)
+ assertEquals(Bytes.fromHexString("deadbeef"), ref.get().body)
+ client1.stop()
+ client2.stop()
+ }
+
+ @Test
+ fun testTwoEndpoints(@VertxInstance vertx: Vertx) {
+ val ref = AtomicReference<Message>()
+ val ref2 = AtomicReference<Message>()
+ val client1 = HobbitsTransport(vertx)
+ val client2 = HobbitsTransport(vertx)
+ runBlocking {
+ client1.createTCPEndpoint("foo", port = 10000, handler = ref::set)
+ client1.createTCPEndpoint("bar", port = 10001, handler = ref2::set)
+ client1.start()
+ client2.start()
+ client2.sendMessage(
+ Message(command = "WHO", body = Bytes.fromHexString("deadbeef"),
headers = Bytes.random(16)),
+ Transport.TCP,
+ "0.0.0.0",
+ 10000
+ )
+ client2.sendMessage(
+ Message(command = "WHO", body = Bytes.fromHexString("deadbeef"),
headers = Bytes.random(16)),
+ Transport.TCP,
+ "0.0.0.0",
+ 10001
+ )
+ }
+ Thread.sleep(200)
+ assertEquals(Bytes.fromHexString("deadbeef"), ref.get().body)
+ assertEquals(Bytes.fromHexString("deadbeef"), ref2.get().body)
+ client1.stop()
+ client2.stop()
+ }
+}
+
+@ExtendWith(VertxExtension::class)
+class HTTPTest {
+ @Test
+ fun testTwoHTTPConnections(@VertxInstance vertx: Vertx) {
+ val ref = AtomicReference<Message>()
+ val client1 = HobbitsTransport(vertx)
+ val client2 = HobbitsTransport(vertx)
+
+ runBlocking {
+ client1.createHTTPEndpoint("foo", port = 10000, handler = ref::set)
+ client1.start()
+ client2.start()
+ client2.sendMessage(Message(command = "WHO", body =
Bytes.fromHexString("deadbeef"),
+ headers = Bytes.random(16)), Transport.HTTP, "0.0.0.0", 10000)
+ }
+ Thread.sleep(200)
+ assertEquals(Bytes.fromHexString("deadbeef"), ref.get().body)
+ client1.stop()
+ client2.stop()
+ }
+
+ @Test
+ fun testTwoEndpoints(@VertxInstance vertx: Vertx) {
+ val ref = AtomicReference<Message>()
+ val ref2 = AtomicReference<Message>()
+ val client1 = HobbitsTransport(vertx)
+ val client2 = HobbitsTransport(vertx)
+ runBlocking {
+ client1.createHTTPEndpoint("foo", port = 10000, handler = ref::set)
+ client1.createHTTPEndpoint("bar", port = 10001, handler = ref2::set)
+ client1.start()
+ client2.start()
+ client2.sendMessage(
+ Message(command = "WHO", body = Bytes.fromHexString("deadbeef"),
headers = Bytes.random(16)),
+ Transport.HTTP,
+ "0.0.0.0",
+ 10000
+ )
+ client2.sendMessage(
+ Message(command = "WHO", body = Bytes.fromHexString("deadbeef"),
headers = Bytes.random(16)),
+ Transport.HTTP,
+ "0.0.0.0",
+ 10001
+ )
+ }
+ Thread.sleep(200)
+ assertEquals(Bytes.fromHexString("deadbeef"), ref.get().body)
+ assertEquals(Bytes.fromHexString("deadbeef"), ref2.get().body)
+ client1.stop()
+ client2.stop()
+ }
+}
+
+@ExtendWith(VertxExtension::class)
+class UDPTest {
+ @Test
+ fun testTwoUDPConnections(@VertxInstance vertx: Vertx) {
+ val ref = AtomicReference<Message>()
+ val client1 = HobbitsTransport(vertx)
+ val client2 = HobbitsTransport(vertx)
+
+ runBlocking {
+ client1.createUDPEndpoint("foo", port = 10000, handler = ref::set)
+ client1.start()
+ client2.start()
+ client2.sendMessage(Message(command = "WHO", body =
Bytes.fromHexString("deadbeef"),
+ headers = Bytes.random(16)), Transport.UDP, "0.0.0.0", 10000)
+ }
+ Thread.sleep(200)
+ assertEquals(Bytes.fromHexString("deadbeef"), ref.get().body)
+ client1.stop()
+ client2.stop()
+ }
+
+ @Test
+ fun testTwoEndpoints(@VertxInstance vertx: Vertx) {
+ val ref = AtomicReference<Message>()
+ val ref2 = AtomicReference<Message>()
+ val client1 = HobbitsTransport(vertx)
+ val client2 = HobbitsTransport(vertx)
+ runBlocking {
+ client1.createUDPEndpoint("foo", port = 10000, handler = ref::set)
+ client1.createUDPEndpoint("bar", port = 10001, handler = ref2::set)
+ client1.start()
+ client2.start()
+ client2.sendMessage(
+ Message(command = "WHO", body = Bytes.fromHexString("deadbeef"),
headers = Bytes.random(16)),
+ Transport.UDP,
+ "0.0.0.0",
+ 10000
+ )
+ client2.sendMessage(
+ Message(command = "WHO", body = Bytes.fromHexString("deadbeef"),
headers = Bytes.random(16)),
+ Transport.UDP,
+ "0.0.0.0",
+ 10001
+ )
+ }
+ Thread.sleep(200)
+ assertEquals(Bytes.fromHexString("deadbeef"), ref.get().body)
+ assertEquals(Bytes.fromHexString("deadbeef"), ref2.get().body)
+ client1.stop()
+ client2.stop()
+ }
+}
+
+@ExtendWith(VertxExtension::class)
+class WebSocketTest {
+ @Test
+ fun testTwoWSConnections(@VertxInstance vertx: Vertx) {
+ vertx.exceptionHandler { it.printStackTrace() }
+ val ref = AtomicReference<Message>()
+ val client1 = HobbitsTransport(vertx)
+ val client2 = HobbitsTransport(vertx)
+
+ runBlocking {
+ client1.createWSEndpoint("foo", port = 10000, handler = ref::set)
+ client1.start()
+ client2.start()
+ client2.sendMessage(Message(command = "WHO", body =
Bytes.fromHexString("deadbeef"),
+ headers = Bytes.random(16)), Transport.WS, "0.0.0.0", 10000)
+ }
+ Thread.sleep(200)
+ assertEquals(Bytes.fromHexString("deadbeef"), ref.get().body)
+ client1.stop()
+ client2.stop()
+ }
+
+ @Test
+ fun testTwoEndpoints(@VertxInstance vertx: Vertx) {
+ val ref = AtomicReference<Message>()
+ val ref2 = AtomicReference<Message>()
+ val client1 = HobbitsTransport(vertx)
+ val client2 = HobbitsTransport(vertx)
+ runBlocking {
+ client1.exceptionHandler { it.printStackTrace() }
+ client2.exceptionHandler { it.printStackTrace() }
+ client1.createWSEndpoint("foo", port = 11000, handler = ref::set)
+ client1.createWSEndpoint("bar", port = 11001, handler = ref2::set)
+ client1.start()
+ client2.start()
+ client2.sendMessage(
+ Message(command = "WHO", body = Bytes.fromHexString("deadbeef"),
headers = Bytes.random(16)),
+ Transport.WS,
+ "0.0.0.0",
+ 11000
+ )
+ client2.sendMessage(
+ Message(command = "WHO", body = Bytes.fromHexString("deadbeef"),
headers = Bytes.random(16)),
+ Transport.WS,
+ "0.0.0.0",
+ 11001
+ )
+ }
+ Thread.sleep(200)
+ assertEquals(Bytes.fromHexString("deadbeef"), ref.get().body)
+ assertEquals(Bytes.fromHexString("deadbeef"), ref2.get().body)
+ client1.stop()
+ client2.stop()
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]