This is an automated email from the ASF dual-hosted git repository. liubao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-java-chassis.git
commit 86c125647328771bc7176987d9ca96ee2ffdd57b Author: wujimin <[email protected]> AuthorDate: Wed Oct 31 02:56:58 2018 +0800 [SCB-1001] bug fix: client disconnect first before download finished, should close ReadStream --- .../vertx/stream/InputStreamToReadStream.java | 6 +++++- .../foundation/vertx/stream/PumpCommon.java | 18 ++++++++++++++++-- .../foundation/vertx/stream/TestPumpFromPart.java | 14 ++++++++++++++ 3 files changed, 35 insertions(+), 3 deletions(-) diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/InputStreamToReadStream.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/InputStreamToReadStream.java index 01d6586..6d66fb6 100644 --- a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/InputStreamToReadStream.java +++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/InputStreamToReadStream.java @@ -130,7 +130,7 @@ public class InputStreamToReadStream implements ReadStream<Buffer> { } } - private void handleException(Throwable e) { + public void handleException(Throwable e) { closeInputStream(); exceptionHandler.handle(e); } @@ -188,6 +188,10 @@ public class InputStreamToReadStream implements ReadStream<Buffer> { } private void closeInputStream() { + if (closed) { + return; + } + closed = true; if (!autoCloseInputStream) { return; diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/PumpCommon.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/PumpCommon.java index 156510a..77334b9 100644 --- a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/PumpCommon.java +++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/PumpCommon.java @@ -22,6 +22,7 @@ import org.apache.servicecomb.foundation.common.io.AsyncCloseable; import io.vertx.core.Context; import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.HttpClientResponse; import io.vertx.core.streams.Pump; import io.vertx.core.streams.ReadStream; import io.vertx.core.streams.WriteStream; @@ -41,12 +42,25 @@ public class PumpCommon { public CompletableFuture<Void> pump(Context context, ReadStream<Buffer> readStream, WriteStream<Buffer> writeStream) { CompletableFuture<Void> readFuture = new CompletableFuture<>(); - writeStream.exceptionHandler(readFuture::completeExceptionally); + writeStream.exceptionHandler(e -> { + // consumer -> producer + // 3rd consumer -> edge -> producer + // when download not finished, consumer stop download + // producer should stop download logic at once + if (readStream instanceof InputStreamToReadStream) { + ((InputStreamToReadStream) readStream).handleException(e); + } else if (readStream instanceof HttpClientResponse) { + // can not find a way to cancel/terminate request + // so can only close the connection. + ((HttpClientResponse) readStream).request().connection().close(); + } + readFuture.completeExceptionally(e); + }); readStream.exceptionHandler(readFuture::completeExceptionally); // just means read finished, not means write finished readStream.endHandler(readFuture::complete); - // if readStream(HttpClientResponse) and awriteStream(HttpServerResponse) + // if readStream(HttpClientResponse) and writeStream(HttpServerResponse) // belongs to difference eventloop // maybe will cause deadlock // if happened, vertx will print deadlock stacks diff --git a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/stream/TestPumpFromPart.java b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/stream/TestPumpFromPart.java index 96899e8..9ba8a28 100644 --- a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/stream/TestPumpFromPart.java +++ b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/stream/TestPumpFromPart.java @@ -133,7 +133,14 @@ public class TestPumpFromPart { }; pump_error(null); + Assert.assertTrue(inputStreamClosed); + Assert.assertTrue(outputStreamClosed); + + inputStreamClosed = false; + outputStreamClosed = false; pump_error(context); + Assert.assertTrue(inputStreamClosed); + Assert.assertTrue(outputStreamClosed); } @Test @@ -152,6 +159,13 @@ public class TestPumpFromPart { }; pump_error(null); + Assert.assertTrue(inputStreamClosed); + Assert.assertTrue(outputStreamClosed); + + inputStreamClosed = false; + outputStreamClosed = false; pump_error(context); + Assert.assertTrue(inputStreamClosed); + Assert.assertTrue(outputStreamClosed); } }
