This is an automated email from the ASF dual-hosted git repository. toulmean pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git
commit ec647af92a1ba8537b2723e7f3d0b690eb1dc2d3 Author: Antoine Toulme <toulm...@apache.org> AuthorDate: Mon Apr 22 14:11:23 2019 -0700 Bring over latest changes on scuttlebutt --- scuttlebutt-rpc/build.gradle | 3 +- .../tuweni/scuttlebutt/rpc/RPCAsyncRequest.java | 12 +- .../tuweni/scuttlebutt/rpc/RPCErrorBody.java | 10 +- .../apache/tuweni/scuttlebutt/rpc/RPCFunction.java | 10 +- .../apache/tuweni/scuttlebutt/rpc/RPCMessage.java | 31 ++- .../tuweni/scuttlebutt/rpc/RPCRequestBody.java | 12 +- .../tuweni/scuttlebutt/rpc/RPCRequestType.java | 10 +- .../apache/tuweni/scuttlebutt/rpc/RPCResponse.java | 80 +++++++ .../tuweni/scuttlebutt/rpc/RPCStreamRequest.java | 12 +- .../tuweni/scuttlebutt/rpc/mux/Multiplexer.java | 22 +- .../tuweni/scuttlebutt/rpc/mux/RPCHandler.java | 243 +++++++++++---------- .../rpc/mux/ScuttlebuttStreamHandler.java | 14 +- .../mux/exceptions/ConnectionClosedException.java | 10 +- .../mux/exceptions/RPCRequestFailedException.java | 20 ++ .../scuttlebutt/rpc/PatchworkIntegrationTest.java | 8 - .../rpc/mux/PatchworkIntegrationTest.java | 99 ++++----- 16 files changed, 361 insertions(+), 235 deletions(-) diff --git a/scuttlebutt-rpc/build.gradle b/scuttlebutt-rpc/build.gradle index cf71a72..1c04ff4 100644 --- a/scuttlebutt-rpc/build.gradle +++ b/scuttlebutt-rpc/build.gradle @@ -1,9 +1,10 @@ -description = 'Scuttlebutt Handshake library' +description = 'Scuttlebutt RPC library' dependencies { compile project(':bytes') compile project(':concurrent') compile project(':crypto') + compileOnly 'io.vertx:vertx-core' compile project(':scuttlebutt') compile project(':scuttlebutt-handshake') compile 'org.logl:logl-api' diff --git a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCAsyncRequest.java b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCAsyncRequest.java index f4281d7..b94c224 100644 --- a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCAsyncRequest.java +++ 
b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCAsyncRequest.java @@ -1,8 +1,8 @@ /* - * Copyright 2019 ConsenSys AG. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * @@ -10,9 +10,9 @@ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. */ -package net.consensys.cava.scuttlebutt.rpc; +package org.apache.tuweni.scuttlebutt.rpc; -import net.consensys.cava.bytes.Bytes; +import org.apache.tuweni.bytes.Bytes; import java.util.List; diff --git a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCErrorBody.java b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCErrorBody.java index 87fa5b6..bd1e888 100644 --- a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCErrorBody.java +++ b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCErrorBody.java @@ -1,8 +1,8 @@ /* - * Copyright 2019 ConsenSys AG. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. 
See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * @@ -10,7 +10,7 @@ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. */ -package net.consensys.cava.scuttlebutt.rpc; +package org.apache.tuweni.scuttlebutt.rpc; /** * An RPC message response body which contains an error diff --git a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCFunction.java b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCFunction.java index 511a491..cef5071 100644 --- a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCFunction.java +++ b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCFunction.java @@ -1,8 +1,8 @@ /* - * Copyright 2019 ConsenSys AG. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * @@ -10,7 +10,7 @@ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the * specific language governing permissions and limitations under the License. */ -package net.consensys.cava.scuttlebutt.rpc; +package org.apache.tuweni.scuttlebutt.rpc; import java.util.ArrayList; import java.util.List; diff --git a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCMessage.java b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCMessage.java index 2f21645..7dade5d 100644 --- a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCMessage.java +++ b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCMessage.java @@ -15,11 +15,12 @@ package org.apache.tuweni.scuttlebutt.rpc; import static java.nio.charset.StandardCharsets.UTF_8; import org.apache.tuweni.bytes.Bytes; +import org.apache.tuweni.scuttlebutt.rpc.mux.exceptions.RPCRequestFailedException; import java.io.IOException; +import java.util.Optional; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Optional; /** * Decoded RPC message, making elements of the message available directly. @@ -104,17 +105,41 @@ public final class RPCMessage { if (!isErrorMessage()) { // If the body of the response is 'true' or the error flag isn't set, it's a successful end condition - return Optional.absent(); + return Optional.empty(); } else { try { return Optional.of(asJSON(objectMapper, RPCErrorBody.class)); } catch (IOException e) { - return Optional.absent(); + return Optional.empty(); } } } /** + * + * @param objectMapper the object mapper to deserialize the error with. 
+ * + * @return an exception if this represents an error RPC response, otherwise nothing + */ + public Optional<RPCRequestFailedException> getException(ObjectMapper objectMapper) { + if (isErrorMessage()) { + Optional<RPCRequestFailedException> exception = + getErrorBody(objectMapper).map(errorBody -> new RPCRequestFailedException(errorBody.getMessage())); + + if (!exception.isPresent()) { + // If we failed to deserialize into the RPCErrorBody type there may be a bug in the server implementation + // which prevented it returning the correct type, so we just print whatever it returned + return Optional.of(new RPCRequestFailedException(this.asString())); + } else { + return exception; + } + + } else { + return Optional.empty(); + } + } + + /** * Provides the type of the body of the message: a binary message, a UTF-8 string or a JSON message. * * @return the type of the body: a binary message, a UTF-8 string or a JSON message diff --git a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCRequestBody.java b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCRequestBody.java index 70ad4e3..471d5b9 100644 --- a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCRequestBody.java +++ b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCRequestBody.java @@ -1,8 +1,8 @@ /* - * Copyright 2019 ConsenSys AG. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. 
You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * @@ -10,9 +10,9 @@ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. */ -package net.consensys.cava.scuttlebutt.rpc; +package org.apache.tuweni.scuttlebutt.rpc; -import net.consensys.cava.bytes.Bytes; +import org.apache.tuweni.bytes.Bytes; import java.util.List; diff --git a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCRequestType.java b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCRequestType.java index 9703da4..85a18b2 100644 --- a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCRequestType.java +++ b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCRequestType.java @@ -1,8 +1,8 @@ /* - * Copyright 2019 ConsenSys AG. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * @@ -10,7 +10,7 @@ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. 
*/ -package net.consensys.cava.scuttlebutt.rpc; +package org.apache.tuweni.scuttlebutt.rpc; import com.fasterxml.jackson.annotation.JsonProperty; diff --git a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCResponse.java b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCResponse.java new file mode 100644 index 0000000..73c6e86 --- /dev/null +++ b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCResponse.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.tuweni.scuttlebutt.rpc; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import org.apache.tuweni.bytes.Bytes; +import org.apache.tuweni.scuttlebutt.rpc.RPCFlag.BodyType; + +import java.io.IOException; + +import com.fasterxml.jackson.databind.ObjectMapper; + +/** + * A successful RPC response. + */ +public class RPCResponse { + + private final Bytes body; + private final BodyType bodyType; + + /** + * A successful RPC response. + * + * @param body the body of the response in bytes + * @param bodyType the type of the response (e.g. JSON, UTF-8 or binary.) 
+ */ + public RPCResponse(Bytes body, BodyType bodyType) { + + this.body = body; + this.bodyType = bodyType; + } + + /** + * @return the RPC response body + */ + public Bytes body() { + return body; + } + + /** + * @return The type of the data contained in the body. + */ + public BodyType bodyType() { + return bodyType; + } + + /** + * Provides the body of the message as a UTF-8 string. + * + * @return the body of the message as a UTF-8 string + */ + public String asString() { + return new String(body().toArrayUnsafe(), UTF_8); + } + + /** + * Provides the body of the message, marshalled as a JSON object. + * + * @param objectMapper the object mapper to deserialize with + * @param clazz the JSON object class + * @param <T> the matching JSON object class + * @return a new instance of the JSON object class + * @throws IOException if an error occurs during marshalling + */ + public <T> T asJSON(ObjectMapper objectMapper, Class<T> clazz) throws IOException { + return objectMapper.readerFor(clazz).readValue(body().toArrayUnsafe()); + } + +} diff --git a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCStreamRequest.java b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCStreamRequest.java index ff198da..1249182 100644 --- a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCStreamRequest.java +++ b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCStreamRequest.java @@ -1,8 +1,8 @@ /* - * Copyright 2019 ConsenSys AG. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. 
The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * @@ -10,9 +10,9 @@ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. */ -package net.consensys.cava.scuttlebutt.rpc; +package org.apache.tuweni.scuttlebutt.rpc; -import net.consensys.cava.bytes.Bytes; +import org.apache.tuweni.bytes.Bytes; import java.util.List; diff --git a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/Multiplexer.java b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/Multiplexer.java index 722dcbf..66b7d74 100644 --- a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/Multiplexer.java +++ b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/Multiplexer.java @@ -1,8 +1,8 @@ /* - * Copyright 2019 ConsenSys AG. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * @@ -10,13 +10,13 @@ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. 
*/ -package net.consensys.cava.scuttlebutt.rpc.mux; +package org.apache.tuweni.scuttlebutt.rpc.mux; -import net.consensys.cava.concurrent.AsyncResult; -import net.consensys.cava.scuttlebutt.rpc.RPCAsyncRequest; -import net.consensys.cava.scuttlebutt.rpc.RPCMessage; -import net.consensys.cava.scuttlebutt.rpc.RPCStreamRequest; -import net.consensys.cava.scuttlebutt.rpc.mux.exceptions.ConnectionClosedException; +import org.apache.tuweni.concurrent.AsyncResult; +import org.apache.tuweni.scuttlebutt.rpc.RPCAsyncRequest; +import org.apache.tuweni.scuttlebutt.rpc.RPCResponse; +import org.apache.tuweni.scuttlebutt.rpc.RPCStreamRequest; +import org.apache.tuweni.scuttlebutt.rpc.mux.exceptions.ConnectionClosedException; import java.util.function.Function; @@ -35,7 +35,7 @@ public interface Multiplexer { * * @return an async result which will be completed with the result or an error if the request fails. */ - AsyncResult<RPCMessage> makeAsyncRequest(RPCAsyncRequest request); + AsyncResult<RPCResponse> makeAsyncRequest(RPCAsyncRequest request) throws JsonProcessingException; /** * Creates a request which opens a stream (e.g. a 'source' in the protocol docs.) diff --git a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/RPCHandler.java b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/RPCHandler.java index 0ac4583..4655862 100644 --- a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/RPCHandler.java +++ b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/RPCHandler.java @@ -1,8 +1,8 @@ /* - * Copyright 2019 ConsenSys AG. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. 
See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * @@ -10,41 +10,36 @@ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. */ -package net.consensys.cava.scuttlebutt.rpc.mux; - -import net.consensys.cava.bytes.Bytes; -import net.consensys.cava.concurrent.AsyncResult; -import net.consensys.cava.concurrent.CompletableAsyncResult; -import net.consensys.cava.scuttlebutt.handshake.vertx.ClientHandler; -import net.consensys.cava.scuttlebutt.rpc.RPCAsyncRequest; -import net.consensys.cava.scuttlebutt.rpc.RPCCodec; -import net.consensys.cava.scuttlebutt.rpc.RPCErrorBody; -import net.consensys.cava.scuttlebutt.rpc.RPCFlag; -import net.consensys.cava.scuttlebutt.rpc.RPCMessage; -import net.consensys.cava.scuttlebutt.rpc.RPCStreamRequest; -import net.consensys.cava.scuttlebutt.rpc.mux.exceptions.ConnectionClosedException; +package org.apache.tuweni.scuttlebutt.rpc.mux; + +import org.apache.tuweni.bytes.Bytes; +import org.apache.tuweni.concurrent.AsyncResult; +import org.apache.tuweni.concurrent.CompletableAsyncResult; +import org.apache.tuweni.scuttlebutt.handshake.vertx.ClientHandler; +import org.apache.tuweni.scuttlebutt.rpc.RPCAsyncRequest; +import org.apache.tuweni.scuttlebutt.rpc.RPCCodec; +import org.apache.tuweni.scuttlebutt.rpc.RPCFlag; +import org.apache.tuweni.scuttlebutt.rpc.RPCMessage; +import org.apache.tuweni.scuttlebutt.rpc.RPCResponse; +import org.apache.tuweni.scuttlebutt.rpc.RPCStreamRequest; +import org.apache.tuweni.scuttlebutt.rpc.mux.exceptions.ConnectionClosedException; +import 
org.apache.tuweni.scuttlebutt.rpc.mux.exceptions.RPCRequestFailedException; import java.util.HashMap; import java.util.Map; +import java.util.Optional; import java.util.function.Consumer; import java.util.function.Function; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Optional; +import io.vertx.core.Handler; +import io.vertx.core.Vertx; import org.logl.Logger; import org.logl.LoggerProvider; /** - * Handles RPC requests and responses from an active connection to a scuttlebutt node - * - * Note: the public methods on this class are synchronized so that a request is rejected if the connection has been - * closed before it begins and any 'in flight' requests are ended exceptionally with a 'connection closed' error without - * new incoming requests being added to the maps by threads. - * - * In the future,we could perhaps be carefully more fine grained about the locking if we require a high degree of - * concurrency. - * + * Handles RPC requests and responses from an active connection to a scuttlebutt node. 
*/ public class RPCHandler implements Multiplexer, ClientHandler { @@ -53,7 +48,13 @@ public class RPCHandler implements Multiplexer, ClientHandler { private final Runnable connectionCloser; private final ObjectMapper objectMapper; - private Map<Integer, CompletableAsyncResult<RPCMessage>> awaitingAsyncResponse = new HashMap<>(); + /** + * We run each update on the vertx event loop to update the request state synchronously, and to handle the + * underlying connection closing by failing the in progress requests and not accepting future requests + */ + private final Vertx vertx; + + private Map<Integer, CompletableAsyncResult<RPCResponse>> awaitingAsyncResponse = new HashMap<>(); private Map<Integer, ScuttlebuttStreamHandler> streams = new HashMap<>(); private boolean closed; @@ -61,16 +62,19 @@ public class RPCHandler implements Multiplexer, ClientHandler { /** * Makes RPC requests over a connection * + * @param vertx The vertx instance to queue requests with * @param messageSender sends the request to the node * @param terminationFn closes the connection * @param objectMapper the objectMapper to serialize and deserialize message request and response bodies * @param logger */ public RPCHandler( + Vertx vertx, Consumer<Bytes> messageSender, Runnable terminationFn, ObjectMapper objectMapper, LoggerProvider logger) { + this.vertx = vertx; this.messageSender = messageSender; this.connectionCloser = terminationFn; this.closed = false; @@ -80,86 +84,116 @@ public class RPCHandler implements Multiplexer, ClientHandler { } @Override - public synchronized AsyncResult<RPCMessage> makeAsyncRequest(RPCAsyncRequest request) { + public AsyncResult<RPCResponse> makeAsyncRequest(RPCAsyncRequest request) throws JsonProcessingException { - CompletableAsyncResult<RPCMessage> result = AsyncResult.incomplete(); + Bytes bodyBytes = request.toEncodedRpcMessage(objectMapper); - if (closed) { - result.completeExceptionally(new ConnectionClosedException()); - } + 
CompletableAsyncResult<RPCResponse> result = AsyncResult.incomplete(); - try { - RPCMessage message = new RPCMessage(request.toEncodedRpcMessage(objectMapper)); - int requestNumber = message.requestNumber(); - awaitingAsyncResponse.put(requestNumber, result); - Bytes bytes = RPCCodec.encodeRequest(message.body(), requestNumber, request.getRPCFlags()); - messageSender.accept(bytes); + Handler<Void> synchronizedAddRequest = (event) -> { + if (closed) { + result.completeExceptionally(new ConnectionClosedException()); + } else { + RPCMessage message = new RPCMessage(bodyBytes); + int requestNumber = message.requestNumber(); - } catch (JsonProcessingException e) { - result.completeExceptionally(e); - } + awaitingAsyncResponse.put(requestNumber, result); + Bytes bytes = RPCCodec.encodeRequest(message.body(), requestNumber, request.getRPCFlags()); + sendBytes(bytes); + } + }; + vertx.runOnContext(synchronizedAddRequest); return result; } @Override - public synchronized void openStream( - RPCStreamRequest request, - Function<Runnable, ScuttlebuttStreamHandler> responseSink) throws JsonProcessingException, - ConnectionClosedException { + public void openStream(RPCStreamRequest request, Function<Runnable, ScuttlebuttStreamHandler> responseSink) + throws JsonProcessingException { - if (closed) { - throw new ConnectionClosedException(); - } + Bytes bodyBytes = request.toEncodedRpcMessage(objectMapper); + + Handler<Void> synchronizedRequest = (event) -> { - try { RPCFlag[] rpcFlags = request.getRPCFlags(); - RPCMessage message = new RPCMessage(request.toEncodedRpcMessage(objectMapper)); + RPCMessage message = new RPCMessage(bodyBytes); int requestNumber = message.requestNumber(); - Bytes bytes = RPCCodec.encodeRequest(message.body(), requestNumber, rpcFlags); - messageSender.accept(bytes); - - Runnable closeStreamHandler = new Runnable() { - @Override - public void run() { + Bytes requestBytes = RPCCodec.encodeRequest(message.body(), requestNumber, rpcFlags); - try { - Bytes 
bytes = RPCCodec.encodeStreamEndRequest(requestNumber); - messageSender.accept(bytes); - } catch (JsonProcessingException e) { - logger.warn("Unexpectedly could not encode stream end message to JSON."); - } + Runnable closeStreamHandler = () -> { + try { + Bytes streamEnd = RPCCodec.encodeStreamEndRequest(requestNumber); + sendBytes(streamEnd); + } catch (JsonProcessingException e) { + logger.warn("Unexpectedly could not encode stream end message to JSON."); } + }; ScuttlebuttStreamHandler scuttlebuttStreamHandler = responseSink.apply(closeStreamHandler); - streams.put(requestNumber, scuttlebuttStreamHandler); - } catch (JsonProcessingException ex) { - throw ex; - } + if (closed) { + scuttlebuttStreamHandler.onStreamError(new ConnectionClosedException()); + } else { + streams.put(requestNumber, scuttlebuttStreamHandler); + sendBytes(requestBytes); + } + + + }; + + vertx.runOnContext(synchronizedRequest); } @Override - public synchronized void close() { - connectionCloser.run(); + public void close() { + vertx.runOnContext((event) -> { + connectionCloser.run(); + }); } @Override - public synchronized void receivedMessage(Bytes message) { + public void receivedMessage(Bytes message) { - RPCMessage rpcMessage = new RPCMessage(message); + Handler<Void> synchronizedHandleMessage = (event) -> { + RPCMessage rpcMessage = new RPCMessage(message); - // A negative request number indicates that this is a response, rather than a request that this node - // should service - if (rpcMessage.requestNumber() < 0) { - handleResponse(rpcMessage); - } else { - handleRequest(rpcMessage); - } + // A negative request number indicates that this is a response, rather than a request that this node + // should service + if (rpcMessage.requestNumber() < 0) { + handleResponse(rpcMessage); + } else { + handleRequest(rpcMessage); + } + }; + vertx.runOnContext(synchronizedHandleMessage); + } + + @Override + public void streamClosed() { + + Handler<Void> synchronizedCloseStream = (event) -> { + 
closed = true; + + streams.forEach((key, streamHandler) -> { + streamHandler.onStreamError(new ConnectionClosedException()); + }); + + streams.clear(); + + awaitingAsyncResponse.forEach((key, value) -> { + if (!value.isDone()) { + value.completeExceptionally(new ConnectionClosedException()); + } + }); + + awaitingAsyncResponse.clear(); + }; + + vertx.runOnContext(synchronizedCloseStream); } private void handleRequest(RPCMessage rpcMessage) { @@ -179,6 +213,8 @@ public class RPCHandler implements Multiplexer, ClientHandler { boolean isStream = RPCFlag.Stream.STREAM.isApplied(rpcFlags); + Optional<RPCRequestFailedException> exception = response.getException(objectMapper); + if (isStream) { ScuttlebuttStreamHandler scuttlebuttStreamHandler = streams.get(requestNumber); @@ -187,20 +223,11 @@ public class RPCHandler implements Multiplexer, ClientHandler { if (response.isSuccessfulLastMessage()) { streams.remove(requestNumber); scuttlebuttStreamHandler.onStreamEnd(); - } else if (response.isErrorMessage()) { - - Optional<RPCErrorBody> errorBody = response.getErrorBody(objectMapper); - - if (errorBody.isPresent()) { - scuttlebuttStreamHandler.onStreamError(new Exception(errorBody.get().getMessage())); - } else { - // This shouldn't happen, but for safety we fall back to just writing the whole body in the exception message - // if we fail to marshall it for whatever reason - scuttlebuttStreamHandler.onStreamError(new Exception(response.asString())); - } - + } else if (exception.isPresent()) { + scuttlebuttStreamHandler.onStreamError(exception.get()); } else { - scuttlebuttStreamHandler.onMessage(response); + RPCResponse successfulResponse = new RPCResponse(response.body(), response.bodyType()); + scuttlebuttStreamHandler.onMessage(successfulResponse); } } else { logger.warn( @@ -212,11 +239,18 @@ public class RPCHandler implements Multiplexer, ClientHandler { } else { - CompletableAsyncResult<RPCMessage> rpcMessageFuture = awaitingAsyncResponse.get(requestNumber); + 
CompletableAsyncResult<RPCResponse> rpcMessageFuture = awaitingAsyncResponse.remove(requestNumber); if (rpcMessageFuture != null) { - rpcMessageFuture.complete(response); - awaitingAsyncResponse.remove(requestNumber); + + if (exception.isPresent()) { + rpcMessageFuture.completeExceptionally(exception.get()); + } else { + RPCResponse successfulResponse = new RPCResponse(response.body(), response.bodyType()); + + rpcMessageFuture.complete(successfulResponse); + } + } else { logger.warn( "Couldn't find async handler for RPC response with request number " @@ -228,23 +262,8 @@ public class RPCHandler implements Multiplexer, ClientHandler { } - @Override - public void streamClosed() { - this.closed = true; - - streams.forEach((key, streamHandler) -> { - streamHandler.onStreamError(new ConnectionClosedException()); - }); - - streams.clear(); - - awaitingAsyncResponse.forEach((key, value) -> { - if (!value.isDone()) { - value.completeExceptionally(new ConnectionClosedException()); - } - - }); - - + private void sendBytes(Bytes bytes) { + messageSender.accept(bytes); } + } diff --git a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/ScuttlebuttStreamHandler.java b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/ScuttlebuttStreamHandler.java index d108663..d64b54a 100644 --- a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/ScuttlebuttStreamHandler.java +++ b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/ScuttlebuttStreamHandler.java @@ -1,8 +1,8 @@ /* - * Copyright 2019 ConsenSys AG. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. 
The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * @@ -10,9 +10,9 @@ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. */ -package net.consensys.cava.scuttlebutt.rpc.mux; +package org.apache.tuweni.scuttlebutt.rpc.mux; -import net.consensys.cava.scuttlebutt.rpc.RPCMessage; +import org.apache.tuweni.scuttlebutt.rpc.RPCResponse; /** * Handles incoming items from a result stream @@ -24,7 +24,7 @@ public interface ScuttlebuttStreamHandler { * * @param message */ - void onMessage(RPCMessage message); + void onMessage(RPCResponse message); /** * Invoked when the stream has been closed. diff --git a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/exceptions/ConnectionClosedException.java b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/exceptions/ConnectionClosedException.java index 160946c..a3c269c 100644 --- a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/exceptions/ConnectionClosedException.java +++ b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/exceptions/ConnectionClosedException.java @@ -1,8 +1,8 @@ /* - * Copyright 2019 ConsenSys AG. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. 
The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * @@ -10,7 +10,7 @@ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. */ -package net.consensys.cava.scuttlebutt.rpc.mux.exceptions; +package org.apache.tuweni.scuttlebutt.rpc.mux.exceptions; public class ConnectionClosedException extends Exception { diff --git a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/exceptions/RPCRequestFailedException.java b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/exceptions/RPCRequestFailedException.java new file mode 100644 index 0000000..5baefca --- /dev/null +++ b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/exceptions/RPCRequestFailedException.java @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ +package org.apache.tuweni.scuttlebutt.rpc.mux.exceptions; + +public final class RPCRequestFailedException extends RuntimeException { + + public RPCRequestFailedException(String errorMessage) { + super(errorMessage); + } +} diff --git a/scuttlebutt-rpc/src/test/java/org/apache/tuweni/scuttlebutt/rpc/PatchworkIntegrationTest.java b/scuttlebutt-rpc/src/test/java/org/apache/tuweni/scuttlebutt/rpc/PatchworkIntegrationTest.java index 3178fae..9d8527d 100644 --- a/scuttlebutt-rpc/src/test/java/org/apache/tuweni/scuttlebutt/rpc/PatchworkIntegrationTest.java +++ b/scuttlebutt-rpc/src/test/java/org/apache/tuweni/scuttlebutt/rpc/PatchworkIntegrationTest.java @@ -16,13 +16,11 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.junit.jupiter.api.Assumptions.assumeTrue; import org.apache.tuweni.bytes.Bytes; import org.apache.tuweni.bytes.Bytes32; import org.apache.tuweni.concurrent.AsyncResult; import org.apache.tuweni.crypto.sodium.Signature; -import org.apache.tuweni.crypto.sodium.Sodium; import org.apache.tuweni.io.Base64; import org.apache.tuweni.junit.VertxExtension; import org.apache.tuweni.junit.VertxInstance; @@ -43,7 +41,6 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Optional; import io.vertx.core.Vertx; -import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -60,11 +57,6 @@ import org.logl.vertx.LoglLogDelegateFactory; @ExtendWith(VertxExtension.class) class PatchworkIntegrationTest { - @BeforeAll - static void checkAvailable() { - assumeTrue(Sodium.isAvailable(), "Sodium native library is not available"); - } - public static class MyClientHandler implements 
ClientHandler { private final Consumer<Bytes> sender; diff --git a/scuttlebutt-rpc/src/test/java/org/apache/tuweni/scuttlebutt/rpc/mux/PatchworkIntegrationTest.java b/scuttlebutt-rpc/src/test/java/org/apache/tuweni/scuttlebutt/rpc/mux/PatchworkIntegrationTest.java index 293ac92..6581b40 100644 --- a/scuttlebutt-rpc/src/test/java/org/apache/tuweni/scuttlebutt/rpc/mux/PatchworkIntegrationTest.java +++ b/scuttlebutt-rpc/src/test/java/org/apache/tuweni/scuttlebutt/rpc/mux/PatchworkIntegrationTest.java @@ -1,8 +1,8 @@ /* - * Copyright 2019 ConsenSys AG. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * @@ -10,7 +10,7 @@ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. */ -package net.consensys.cava.scuttlebutt.rpc.mux; +package org.apache.tuweni.scuttlebutt.rpc.mux; /* * Copyright 2019 ConsenSys AG. 
@@ -26,22 +26,20 @@ package net.consensys.cava.scuttlebutt.rpc.mux; */ import static java.nio.charset.StandardCharsets.UTF_8; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; - -import net.consensys.cava.bytes.Bytes; -import net.consensys.cava.bytes.Bytes32; -import net.consensys.cava.concurrent.AsyncResult; -import net.consensys.cava.concurrent.CompletableAsyncResult; -import net.consensys.cava.crypto.sodium.Signature; -import net.consensys.cava.io.Base64; -import net.consensys.cava.junit.VertxExtension; -import net.consensys.cava.junit.VertxInstance; -import net.consensys.cava.scuttlebutt.handshake.vertx.SecureScuttlebuttVertxClient; -import net.consensys.cava.scuttlebutt.rpc.RPCAsyncRequest; -import net.consensys.cava.scuttlebutt.rpc.RPCFunction; -import net.consensys.cava.scuttlebutt.rpc.RPCMessage; -import net.consensys.cava.scuttlebutt.rpc.RPCStreamRequest; -import net.consensys.cava.scuttlebutt.rpc.mux.exceptions.ConnectionClosedException; + +import org.apache.tuweni.bytes.Bytes; +import org.apache.tuweni.bytes.Bytes32; +import org.apache.tuweni.concurrent.AsyncResult; +import org.apache.tuweni.concurrent.CompletableAsyncResult; +import org.apache.tuweni.crypto.sodium.Signature; +import org.apache.tuweni.io.Base64; +import org.apache.tuweni.junit.VertxExtension; +import org.apache.tuweni.junit.VertxInstance; +import org.apache.tuweni.scuttlebutt.handshake.vertx.SecureScuttlebuttVertxClient; +import org.apache.tuweni.scuttlebutt.rpc.RPCAsyncRequest; +import org.apache.tuweni.scuttlebutt.rpc.RPCFunction; +import org.apache.tuweni.scuttlebutt.rpc.RPCResponse; +import org.apache.tuweni.scuttlebutt.rpc.RPCStreamRequest; import java.io.BufferedWriter; import java.io.File; @@ -78,27 +76,21 @@ public class PatchworkIntegrationTest { RPCHandler rpcHandler = makeRPCHandler(vertx); - List<AsyncResult<RPCMessage>> results = new ArrayList<>(); + List<AsyncResult<RPCResponse>> results = new 
ArrayList<>(); for (int i = 0; i < 10; i++) { RPCFunction function = new RPCFunction("whoami"); RPCAsyncRequest asyncRequest = new RPCAsyncRequest(function, new ArrayList<>()); - AsyncResult<RPCMessage> res = rpcHandler.makeAsyncRequest(asyncRequest); + AsyncResult<RPCResponse> res = rpcHandler.makeAsyncRequest(asyncRequest); results.add(res); } - AsyncResult<List<RPCMessage>> allResults = AsyncResult.combine(results); - List<RPCMessage> rpcMessages = allResults.get(); + AsyncResult<List<RPCResponse>> allResults = AsyncResult.combine(results); + List<RPCResponse> rpcMessages = allResults.get(); assertEquals(10, rpcMessages.size()); - - rpcMessages.forEach(msg -> { - assertFalse(msg.lastMessageOrError()); - - }); - } @@ -158,7 +150,7 @@ public class PatchworkIntegrationTest { RPCHandler rpcHandler = makeRPCHandler(vertx); - List<AsyncResult<RPCMessage>> results = new ArrayList<>(); + List<AsyncResult<RPCResponse>> results = new ArrayList<>(); for (int i = 0; i < 20; i++) { // Note: in a real use case, this would more likely be a Java class with these fields @@ -168,13 +160,13 @@ public class PatchworkIntegrationTest { RPCAsyncRequest asyncRequest = new RPCAsyncRequest(new RPCFunction("publish"), Arrays.asList(params)); - AsyncResult<RPCMessage> rpcMessageAsyncResult = rpcHandler.makeAsyncRequest(asyncRequest); + AsyncResult<RPCResponse> rpcMessageAsyncResult = rpcHandler.makeAsyncRequest(asyncRequest); results.add(rpcMessageAsyncResult); } - List<RPCMessage> rpcMessages = AsyncResult.combine(results).get(); + List<RPCResponse> rpcMessages = AsyncResult.combine(results).get(); rpcMessages.forEach(msg -> System.out.println(msg.asString())); } @@ -196,7 +188,7 @@ public class PatchworkIntegrationTest { AsyncResult<RPCHandler> onConnect = secureScuttlebuttVertxClient.connectTo(port, host, keyPair.publicKey(), (sender, terminationFn) -> { - return new RPCHandler(sender, terminationFn, new ObjectMapper(), loggerProvider); + return new RPCHandler(vertx, sender, 
terminationFn, new ObjectMapper(), loggerProvider); }); return onConnect.get(); @@ -219,26 +211,23 @@ public class PatchworkIntegrationTest { RPCStreamRequest streamRequest = new RPCStreamRequest(new RPCFunction("createUserStream"), Arrays.asList(params)); - try { - handler.openStream(streamRequest, (closeStream) -> new ScuttlebuttStreamHandler() { - @Override - public void onMessage(RPCMessage message) { - System.out.print(message.asString()); - } - - @Override - public void onStreamEnd() { - streamEnded.complete(null); - } - - @Override - public void onStreamError(Exception ex) { - - } - }); - } catch (ConnectionClosedException e) { - throw e; - } + handler.openStream(streamRequest, (closeStream) -> new ScuttlebuttStreamHandler() { + @Override + public void onMessage(RPCResponse message) { + System.out.print(message.asString()); + } + + @Override + public void onStreamEnd() { + streamEnded.complete(null); + } + + @Override + public void onStreamError(Exception ex) { + + streamEnded.completeExceptionally(ex); + } + }); // Wait until the stream is complete streamEnded.get(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@tuweni.apache.org For additional commands, e-mail: commits-h...@tuweni.apache.org