This is an automated email from the ASF dual-hosted git repository. liubao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-java-chassis.git
commit 840d3a2c100db6ecd6e13526d359e9bc1098e21a Author: wujimin <[email protected]> AuthorDate: Sat Apr 21 12:45:16 2018 +0800 [SCB-486] create ReadStreamPart to support route download stream in edge --- .../foundation/vertx/http/ReadStreamPart.java | 63 ++++++++++ .../VertxServerResponseToHttpServletResponse.java | 15 ++- .../foundation/vertx/http/TestReadStreamPart.java | 131 +++++++++++++++++++++ ...stVertxServerResponseToHttpServletResponse.java | 16 +++ 4 files changed, 219 insertions(+), 6 deletions(-) diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/ReadStreamPart.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/ReadStreamPart.java new file mode 100644 index 0000000..4014609 --- /dev/null +++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/ReadStreamPart.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.servicecomb.foundation.vertx.http; + +import java.util.concurrent.CompletableFuture; + +import javax.servlet.http.Part; + +import org.apache.servicecomb.foundation.common.part.AbstractPart; + +import io.vertx.core.buffer.Buffer; +import io.vertx.core.streams.Pump; +import io.vertx.core.streams.ReadStream; +import io.vertx.core.streams.WriteStream; + +/** + * this is not a really part type, all method extend from AbstractPart is undefined except:<br> + * 1.getContentType<br> + * 2.getSubmittedFileName<br> + * extend from AbstractPart just because want to make it be Part type, + * so that can be sent by + * {@link org.apache.servicecomb.foundation.vertx.http.VertxServerResponseToHttpServletResponse#sendPart(Part) VertxServerResponseToHttpServletResponse.sendPart} + */ +public class ReadStreamPart extends AbstractPart { + private ReadStream<Buffer> readStream; + + public ReadStreamPart(ReadStream<Buffer> readStream) { + this.readStream = readStream; + + readStream.pause(); + } + + public CompletableFuture<Void> saveToWriteStream(WriteStream<Buffer> writeStream) { + CompletableFuture<Void> future = new CompletableFuture<>(); + + writeStream.exceptionHandler(future::completeExceptionally); + readStream.exceptionHandler(future::completeExceptionally); + readStream.endHandler(future::complete); + + // if readStream(HttpClientResponse) and writeStream(HttpServerResponse) + // belongs to difference eventloop + // maybe will cause deadlock + // if happened, vertx will print deadlock stacks + Pump.pump(readStream, writeStream).start(); + readStream.resume(); + + return future; + } +} diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/VertxServerResponseToHttpServletResponse.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/VertxServerResponseToHttpServletResponse.java index d467058..8f55886 100644 --- a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/VertxServerResponseToHttpServletResponse.java +++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/VertxServerResponseToHttpServletResponse.java @@ -132,23 +132,26 @@ public class VertxServerResponseToHttpServletResponse extends AbstractHttpServle @Override public CompletableFuture<Void> sendPart(Part part) { - CompletableFuture<Void> future = new CompletableFuture<Void>(); - prepareSendPartHeader(part); + if (ReadStreamPart.class.isInstance(part)) { + return ((ReadStreamPart) part).saveToWriteStream(this.serverResponse); + } + + CompletableFuture<Void> future = new CompletableFuture<Void>(); try { InputStream is = part.getInputStream(); context.runOnContext(v -> { - InputStreamToReadStream aa = new InputStreamToReadStream(context.owner(), is); - aa.exceptionHandler(t -> { + InputStreamToReadStream inputStreamToReadStream = new InputStreamToReadStream(context.owner(), is); + inputStreamToReadStream.exceptionHandler(t -> { clearPartResource(part, is); future.completeExceptionally(t); }); - aa.endHandler(V -> { + inputStreamToReadStream.endHandler(V -> { clearPartResource(part, is); future.complete(null); }); - Pump.pump(aa, serverResponse).start(); + Pump.pump(inputStreamToReadStream, serverResponse).start(); }); } catch (IOException e) { future.completeExceptionally(e); diff --git a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestReadStreamPart.java b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestReadStreamPart.java new file mode 100644 index 0000000..cffd964 --- /dev/null +++ b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestReadStreamPart.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.servicecomb.foundation.vertx.http; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.concurrent.ExecutionException; + +import org.apache.servicecomb.foundation.vertx.stream.InputStreamToReadStream; +import org.hamcrest.Matchers; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import io.vertx.core.AsyncResult; +import io.vertx.core.Future; +import io.vertx.core.Handler; +import io.vertx.core.Vertx; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.streams.WriteStream; +import mockit.Mock; +import mockit.MockUp; +import mockit.Mocked; + +public class TestReadStreamPart { + @Mocked + Vertx vertx; + + String src = "src"; + + InputStreamToReadStream readStream; + + ReadStreamPart part; + + InputStream inputStream = new ByteArrayInputStream(src.getBytes()); + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + + @Before + public void setup() { + readStream = new InputStreamToReadStream(vertx, inputStream); + part = new ReadStreamPart(readStream); + + new MockUp<Vertx>(vertx) { + @Mock + <T> void executeBlocking(Handler<Future<T>> blockingCodeHandler, boolean ordered, + Handler<AsyncResult<T>> resultHandler) { + Future<T> future = Future.future(); + blockingCodeHandler.handle(future); + future.setHandler(resultHandler); + } + }; + } + + @Test + public void saveToWriteStream() throws InterruptedException, ExecutionException { + Buffer buf = Buffer.buffer(); + WriteStream<Buffer> writeStream = new MockUp<WriteStream<Buffer>>() { + @Mock + WriteStream<Buffer> write(Buffer data) { + buf.appendBuffer(data); + return null; + } + }.getMockInstance(); + + part.saveToWriteStream(writeStream).get(); + + Assert.assertEquals(src, buf.toString()); + } + + @Test + public void saveToWriteStream_writeException() throws InterruptedException, ExecutionException { + Error error = new Error(); + WriteStream<Buffer> writeStream = new MockUp<WriteStream<Buffer>>() { + Handler<Throwable> exceptionHandler; + + @Mock + WriteStream<Buffer> exceptionHandler(Handler<Throwable> handler) { + this.exceptionHandler = handler; + return null; + } + + @Mock + WriteStream<Buffer> write(Buffer data) { + exceptionHandler.handle(error); + return null; + } + }.getMockInstance(); + + expectedException.expect(ExecutionException.class); + expectedException.expectCause(Matchers.sameInstance(error)); + + part.saveToWriteStream(writeStream).get(); + } + + @Test + public void saveToWrite_readException(@Mocked WriteStream<Buffer> writeStream) + throws InterruptedException, ExecutionException { + Error error = new Error(); + new MockUp<InputStream>(inputStream) { + @Mock + int read(byte b[]) throws IOException { + throw error; + } + }; + + expectedException.expect(ExecutionException.class); + expectedException.expectCause(Matchers.sameInstance(error)); + + part.saveToWriteStream(writeStream).get(); + } +} diff --git a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestVertxServerResponseToHttpServletResponse.java b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestVertxServerResponseToHttpServletResponse.java index fd6202c..f66dd1b 100644 --- a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestVertxServerResponseToHttpServletResponse.java +++ b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestVertxServerResponseToHttpServletResponse.java @@ -47,6 +47,7 @@ import io.vertx.core.Vertx; import io.vertx.core.buffer.Buffer; import io.vertx.core.http.HttpServerResponse; import io.vertx.core.impl.VertxImpl; +import io.vertx.core.streams.WriteStream; import mockit.Deencapsulation; import mockit.Expectations; import mockit.Mock; @@ -366,6 +367,21 @@ public class TestVertxServerResponseToHttpServletResponse { future.get(); } + @SuppressWarnings("unchecked") + @Test + public void sendPart_ReadStreamPart(@Mocked ReadStreamPart part) + throws IOException, InterruptedException, ExecutionException { + CompletableFuture<Void> future = new CompletableFuture<>(); + new Expectations() { + { + part.saveToWriteStream((WriteStream<Buffer>) any); + result = future; + } + }; + + Assert.assertSame(future, response.sendPart(part)); + } + @Test public void sendPart_succ(@Mocked Part part, @Mocked InputStream inputStream) throws IOException, InterruptedException, ExecutionException { -- To stop receiving notification emails like this one, please contact [email protected].
