This is an automated email from the ASF dual-hosted git repository.

toulmean pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git


The following commit(s) were added to refs/heads/master by this push:
     new 91ce130  On a server ending a scuttlebutt RPC stream, the client should send a final close message to close its end of the stream.
     new 9bdf9a7  Merge pull request #25 from Happy0/stream_end
91ce130 is described below

commit 91ce130fb5825f72378f19a490683335b44b533c
Author: Gordon Martin <[email protected]>
AuthorDate: Fri Jul 12 11:26:24 2019 +0100

    On a server ending a scuttlebutt RPC stream, the client should send a final close message to close its end of the stream.
---
 .../org/apache/tuweni/scuttlebutt/rpc/RPCCodec.java   |  7 ++++++-
 .../apache/tuweni/scuttlebutt/rpc/mux/RPCHandler.java | 19 +++++++++++--------
 2 files changed, 17 insertions(+), 9 deletions(-)

diff --git a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCCodec.java b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCCodec.java
index 750dfd0..58ffc4c 100644
--- a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCCodec.java
+++ b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCCodec.java
@@ -143,7 +143,12 @@ public final class RPCCodec {
   public static Bytes encodeStreamEndRequest(int requestNumber) throws JsonProcessingException {
     Boolean bool = Boolean.TRUE;
     byte[] bytes = mapper.writeValueAsBytes(bool);
-    return encodeRequest(Bytes.wrap(bytes), requestNumber, RPCFlag.EndOrError.END);
+    return encodeRequest(
+        Bytes.wrap(bytes),
+        requestNumber,
+        RPCFlag.EndOrError.END,
+        RPCFlag.BodyType.JSON,
+        RPCFlag.Stream.STREAM);
   }
 
   /**
diff --git a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/RPCHandler.java b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/RPCHandler.java
index 4655862..b0f3639 100644
--- a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/RPCHandler.java
+++ b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/RPCHandler.java
@@ -122,14 +122,7 @@ public class RPCHandler implements Multiplexer, ClientHandler {
       Bytes requestBytes = RPCCodec.encodeRequest(message.body(), requestNumber, rpcFlags);
 
       Runnable closeStreamHandler = () -> {
-
-        try {
-          Bytes streamEnd = RPCCodec.encodeStreamEndRequest(requestNumber);
-          sendBytes(streamEnd);
-        } catch (JsonProcessingException e) {
-          logger.warn("Unexpectedly could not encode stream end message to JSON.");
-        }
-
+        endStream(requestNumber);
       };
 
       ScuttlebuttStreamHandler scuttlebuttStreamHandler = responseSink.apply(closeStreamHandler);
@@ -222,6 +215,7 @@ public class RPCHandler implements Multiplexer, ClientHandler {
 
         if (response.isSuccessfulLastMessage()) {
           streams.remove(requestNumber);
+          endStream(requestNumber);
           scuttlebuttStreamHandler.onStreamEnd();
         } else if (exception.isPresent()) {
           scuttlebuttStreamHandler.onStreamError(exception.get());
@@ -266,4 +260,13 @@ public class RPCHandler implements Multiplexer, ClientHandler {
     messageSender.accept(bytes);
   }
 
+  private void endStream(int requestNumber) {
+    try {
+      Bytes streamEnd = RPCCodec.encodeStreamEndRequest(requestNumber);
+      sendBytes(streamEnd);
+    } catch (JsonProcessingException e) {
+      logger.warn("Unexpectedly could not encode stream end message to JSON.");
+    }
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to