This is an automated email from the ASF dual-hosted git repository.

toulmean pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git


The following commit(s) were added to refs/heads/master by this push:
     new 8f47c16  Prevent it being possible to close streams more than once.
     new 4a6fb03  Merge pull request #26 from Happy0/stream_close
8f47c16 is described below

commit 8f47c16d138e7468a4a5ada373a2e5f09a71dd34
Author: Gordon Martin <[email protected]>
AuthorDate: Thu Jul 18 15:40:38 2019 +0100

    Prevent it being possible to close streams more than once.
---
 .../tuweni/scuttlebutt/rpc/mux/RPCHandler.java     | 55 +++++++++++++++++++---
 1 file changed, 48 insertions(+), 7 deletions(-)

diff --git 
a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/RPCHandler.java
 
b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/RPCHandler.java
index b0f3639..eb3fadc 100644
--- 
a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/RPCHandler.java
+++ 
b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/RPCHandler.java
@@ -99,6 +99,7 @@ public class RPCHandler implements Multiplexer, ClientHandler 
{
 
         awaitingAsyncResponse.put(requestNumber, result);
         Bytes bytes = RPCCodec.encodeRequest(message.body(), requestNumber, 
request.getRPCFlags());
+        logOutgoingRequest(message);
         sendBytes(bytes);
       }
     };
@@ -122,7 +123,16 @@ public class RPCHandler implements Multiplexer, 
ClientHandler {
       Bytes requestBytes = RPCCodec.encodeRequest(message.body(), 
requestNumber, rpcFlags);
 
       Runnable closeStreamHandler = () -> {
-        endStream(requestNumber);
+
+        // Run on vertx context because this callback may be called from a 
different
+        // thread by the caller
+        vertx.runOnContext(new Handler<Void>() {
+          @Override
+          public void handle(Void event) {
+            endStream(requestNumber);
+          }
+        });
+
       };
 
       ScuttlebuttStreamHandler scuttlebuttStreamHandler = 
responseSink.apply(closeStreamHandler);
@@ -131,6 +141,7 @@ public class RPCHandler implements Multiplexer, 
ClientHandler {
         scuttlebuttStreamHandler.onStreamError(new 
ConnectionClosedException());
       } else {
         streams.put(requestNumber, scuttlebuttStreamHandler);
+        logOutgoingRequest(message);
         sendBytes(requestBytes);
       }
 
@@ -140,6 +151,15 @@ public class RPCHandler implements Multiplexer, 
ClientHandler {
     vertx.runOnContext(synchronizedRequest);
   }
 
+  private void logOutgoingRequest(RPCMessage rpcMessage) {
+    if (logger.isDebugEnabled()) {
+      String requestString = new String(rpcMessage.asString());
+      String logMessage = String.format("[%d] Outgoing request: %s", 
rpcMessage.requestNumber(), requestString);
+      logger.debug(logMessage);
+    }
+
+  }
+
   @Override
   public void close() {
     vertx.runOnContext((event) -> {
@@ -192,14 +212,14 @@ public class RPCHandler implements Multiplexer, 
ClientHandler {
   private void handleRequest(RPCMessage rpcMessage) {
     // Not yet implemented
     logger.warn("Received incoming request, but we do not yet handle any 
requests: " + rpcMessage.asString());
-
   }
 
   private void handleResponse(RPCMessage response) {
     int requestNumber = response.requestNumber() * -1;
 
     if (logger.isDebugEnabled()) {
-      logger.debug("Incoming response: " + response.asString());
+      String logMessage = String.format("[%d] incoming response: %s", 
requestNumber, response.asString());
+      logger.debug(logMessage);
     }
 
     byte rpcFlags = response.rpcFlags();
@@ -214,9 +234,8 @@ public class RPCHandler implements Multiplexer, 
ClientHandler {
       if (scuttlebuttStreamHandler != null) {
 
         if (response.isSuccessfulLastMessage()) {
-          streams.remove(requestNumber);
+          // Confirm our end of the stream close and inform the consumer of 
the stream that it is closed
           endStream(requestNumber);
-          scuttlebuttStreamHandler.onStreamEnd();
         } else if (exception.isPresent()) {
           scuttlebuttStreamHandler.onStreamError(exception.get());
         } else {
@@ -260,10 +279,32 @@ public class RPCHandler implements Multiplexer, 
ClientHandler {
     messageSender.accept(bytes);
   }
 
+  /**
+   * Sends an stream close message over the RPC channel to for the given 
request number if we have not already closed
+   * our end of the stream.
+   *
+   * Removes the stream handler from the state, so any newly incoming messages 
until the other side of the stream has
+   * closed its end will be ignored.
+   *
+   * @param requestNumber the request number of the stream to send a close 
message over RPC for
+   */
   private void endStream(int requestNumber) {
     try {
-      Bytes streamEnd = RPCCodec.encodeStreamEndRequest(requestNumber);
-      sendBytes(streamEnd);
+      ScuttlebuttStreamHandler streamHandler = streams.remove(requestNumber);
+
+      // Only send the message if the stream hasn't already been closed at our 
end
+      if (streamHandler != null) {
+        Bytes streamEnd = RPCCodec.encodeStreamEndRequest(requestNumber);
+
+        streamHandler.onStreamEnd();
+        if (logger.isDebugEnabled()) {
+          String logMessage = String.format("[%d] Sending close stream 
message.", requestNumber);
+          logger.debug(logMessage);
+        }
+
+        sendBytes(streamEnd);
+      }
+
     } catch (JsonProcessingException e) {
       logger.warn("Unexpectedly could not encode stream end message to JSON.");
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to