This is an automated email from the ASF dual-hosted git repository.

liubao pushed a commit to branch master
in repository 
https://gitbox.apache.org/repos/asf/incubator-servicecomb-java-chassis.git

commit 86c125647328771bc7176987d9ca96ee2ffdd57b
Author: wujimin <[email protected]>
AuthorDate: Wed Oct 31 02:56:58 2018 +0800

    [SCB-1001] bug fix: when the client disconnects before the download has
    finished, the ReadStream should be closed
---
 .../vertx/stream/InputStreamToReadStream.java          |  6 +++++-
 .../foundation/vertx/stream/PumpCommon.java            | 18 ++++++++++++++++--
 .../foundation/vertx/stream/TestPumpFromPart.java      | 14 ++++++++++++++
 3 files changed, 35 insertions(+), 3 deletions(-)

diff --git 
a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/InputStreamToReadStream.java
 
b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/InputStreamToReadStream.java
index 01d6586..6d66fb6 100644
--- 
a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/InputStreamToReadStream.java
+++ 
b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/InputStreamToReadStream.java
@@ -130,7 +130,7 @@ public class InputStreamToReadStream implements 
ReadStream<Buffer> {
     }
   }
 
-  private void handleException(Throwable e) {
+  public void handleException(Throwable e) {
     closeInputStream();
     exceptionHandler.handle(e);
   }
@@ -188,6 +188,10 @@ public class InputStreamToReadStream implements 
ReadStream<Buffer> {
   }
 
   private void closeInputStream() {
+    if (closed) {
+      return;
+    }
+    
     closed = true;
     if (!autoCloseInputStream) {
       return;
diff --git 
a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/PumpCommon.java
 
b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/PumpCommon.java
index 156510a..77334b9 100644
--- 
a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/PumpCommon.java
+++ 
b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/PumpCommon.java
@@ -22,6 +22,7 @@ import 
org.apache.servicecomb.foundation.common.io.AsyncCloseable;
 
 import io.vertx.core.Context;
 import io.vertx.core.buffer.Buffer;
+import io.vertx.core.http.HttpClientResponse;
 import io.vertx.core.streams.Pump;
 import io.vertx.core.streams.ReadStream;
 import io.vertx.core.streams.WriteStream;
@@ -41,12 +42,25 @@ public class PumpCommon {
   public CompletableFuture<Void> pump(Context context, ReadStream<Buffer> 
readStream, WriteStream<Buffer> writeStream) {
     CompletableFuture<Void> readFuture = new CompletableFuture<>();
 
-    writeStream.exceptionHandler(readFuture::completeExceptionally);
+    writeStream.exceptionHandler(e -> {
+      // consumer -> producer
+      // 3rd consumer -> edge -> producer
+      // when the consumer stops before the download has finished,
+      // the producer should stop its download logic at once
+      if (readStream instanceof InputStreamToReadStream) {
+        ((InputStreamToReadStream) readStream).handleException(e);
+      } else if (readStream instanceof HttpClientResponse) {
+        // cannot find a way to cancel/terminate the request,
+        // so we can only close the connection.
+        ((HttpClientResponse) readStream).request().connection().close();
+      }
+      readFuture.completeExceptionally(e);
+    });
     readStream.exceptionHandler(readFuture::completeExceptionally);
     // just means read finished, not means write finished
     readStream.endHandler(readFuture::complete);
 
-    // if readStream(HttpClientResponse) and awriteStream(HttpServerResponse)
+    // if readStream(HttpClientResponse) and writeStream(HttpServerResponse)
     // belongs to difference eventloop
     // maybe will cause deadlock
     // if happened, vertx will print deadlock stacks
diff --git 
a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/stream/TestPumpFromPart.java
 
b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/stream/TestPumpFromPart.java
index 96899e8..9ba8a28 100644
--- 
a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/stream/TestPumpFromPart.java
+++ 
b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/stream/TestPumpFromPart.java
@@ -133,7 +133,14 @@ public class TestPumpFromPart {
     };
 
     pump_error(null);
+    Assert.assertTrue(inputStreamClosed);
+    Assert.assertTrue(outputStreamClosed);
+
+    inputStreamClosed = false;
+    outputStreamClosed = false;
     pump_error(context);
+    Assert.assertTrue(inputStreamClosed);
+    Assert.assertTrue(outputStreamClosed);
   }
 
   @Test
@@ -152,6 +159,13 @@ public class TestPumpFromPart {
     };
 
     pump_error(null);
+    Assert.assertTrue(inputStreamClosed);
+    Assert.assertTrue(outputStreamClosed);
+
+    inputStreamClosed = false;
+    outputStreamClosed = false;
     pump_error(context);
+    Assert.assertTrue(inputStreamClosed);
+    Assert.assertTrue(outputStreamClosed);
   }
 }

Reply via email to