This is an automated email from the ASF dual-hosted git repository. liubao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-java-chassis.git
commit be790e6b559700faac6dc677848d586f1e0a3af0 Author: wujimin <[email protected]> AuthorDate: Sun Apr 22 12:45:08 2018 +0800 [SCB-487] ReadStreamPart support saveAsBytes/saveAsString/saveToFile --- .../foundation/vertx/http/ReadStreamPart.java | 103 +++++++++++++- .../io/vertx/core/file/impl/AsyncFileUitls.java | 28 ++++ .../foundation/vertx/http/TestReadStreamPart.java | 158 ++++++++++++++++++++- .../rest/client/http/RestClientInvocation.java | 2 +- 4 files changed, 285 insertions(+), 6 deletions(-) diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/ReadStreamPart.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/ReadStreamPart.java index 4014609..ddc56c9 100644 --- a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/ReadStreamPart.java +++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/ReadStreamPart.java @@ -16,13 +16,26 @@ */ package org.apache.servicecomb.foundation.vertx.http; +import java.io.File; import java.util.concurrent.CompletableFuture; +import java.util.function.Function; import javax.servlet.http.Part; +import javax.ws.rs.core.HttpHeaders; +import org.apache.commons.lang.StringUtils; +import org.apache.servicecomb.foundation.common.http.HttpUtils; import org.apache.servicecomb.foundation.common.part.AbstractPart; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import io.vertx.core.AsyncResult; +import io.vertx.core.Context; +import io.vertx.core.Vertx; import io.vertx.core.buffer.Buffer; +import io.vertx.core.file.AsyncFile; +import io.vertx.core.file.OpenOptions; +import io.vertx.core.http.HttpClientResponse; import io.vertx.core.streams.Pump; import io.vertx.core.streams.ReadStream; import io.vertx.core.streams.WriteStream; @@ -36,9 +49,26 @@ import io.vertx.core.streams.WriteStream; * {@link org.apache.servicecomb.foundation.vertx.http.VertxServerResponseToHttpServletResponse#sendPart(Part) VertxServerResponseToHttpServletResponse.sendPart} */ public class ReadStreamPart extends AbstractPart { + private static final Logger LOGGER = LoggerFactory.getLogger(ReadStreamPart.class); + + private Context context; + private ReadStream<Buffer> readStream; - public ReadStreamPart(ReadStream<Buffer> readStream) { + public ReadStreamPart(Context context, HttpClientResponse httpClientResponse) { + this(context, (ReadStream<Buffer>) httpClientResponse); + + setSubmittedFileName( + HttpUtils.parseFileNameFromHeaderValue(httpClientResponse.getHeader(HttpHeaders.CONTENT_DISPOSITION))); + + String contentType = httpClientResponse.getHeader(HttpHeaders.CONTENT_TYPE); + if (StringUtils.isNotEmpty(contentType)) { + this.contentType(contentType); + } + } + + public ReadStreamPart(Context context, ReadStream<Buffer> readStream) { + this.context = context; this.readStream = readStream; readStream.pause(); @@ -60,4 +90,75 @@ public class ReadStreamPart extends AbstractPart { return future; } + + public CompletableFuture<byte[]> saveAsBytes() { + return saveAs(buf -> { + return buf.getBytes(); + }); + } + + public CompletableFuture<String> saveAsString() { + return saveAs(buf -> { + return buf.toString(); + }); + } + + public <T> CompletableFuture<T> saveAs(Function<Buffer, T> converter) { + CompletableFuture<T> future = new CompletableFuture<>(); + Buffer buffer = Buffer.buffer(); + + readStream.exceptionHandler(future::completeExceptionally); + readStream.handler(buffer::appendBuffer); + readStream.endHandler(v -> { + future.complete(converter.apply(buffer)); + }); + readStream.resume(); + + return future; + } + + public CompletableFuture<File> saveToFile(String fileName) { + File file = new File(fileName); + file.getParentFile().mkdirs(); + OpenOptions openOptions = new OpenOptions().setCreateNew(true); + return saveToFile(file, openOptions); + } + + public CompletableFuture<File> saveToFile(File file, OpenOptions openOptions) { + CompletableFuture<File> future = new CompletableFuture<>(); + + Vertx vertx = context.owner(); + vertx.fileSystem().open(file.getAbsolutePath(), openOptions, ar -> { + onFileOpened(file, ar, future); + }); + + return future; + } + + protected void onFileOpened(File file, AsyncResult<AsyncFile> ar, CompletableFuture<File> future) { + if (ar.failed()) { + future.completeExceptionally(ar.cause()); + return; + } + + AsyncFile asyncFile = ar.result(); + CompletableFuture<Void> saveFuture = saveToWriteStream(asyncFile); + saveFuture.whenComplete((v, saveException) -> { + asyncFile.close(closeAr -> { + if (closeAr.failed()) { + LOGGER.error("Failed to close file {}.", file); + } + + // whatever close success or failed + // will not affect to result + // result just only related to write + if (saveException == null) { + future.complete(file); + return; + } + + future.completeExceptionally(saveException); + }); + }); + } } diff --git a/foundations/foundation-vertx/src/test/java/io/vertx/core/file/impl/AsyncFileUitls.java b/foundations/foundation-vertx/src/test/java/io/vertx/core/file/impl/AsyncFileUitls.java new file mode 100644 index 0000000..a458f9b --- /dev/null +++ b/foundations/foundation-vertx/src/test/java/io/vertx/core/file/impl/AsyncFileUitls.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.vertx.core.file.impl; + +import io.vertx.core.file.AsyncFile; +import io.vertx.core.file.OpenOptions; +import io.vertx.core.impl.ContextImpl; +import io.vertx.core.impl.VertxInternal; + +public class AsyncFileUitls { + public static AsyncFile createAsyncFile(VertxInternal vertx, String path, OpenOptions options, ContextImpl context) { + return new AsyncFileImpl(vertx, path, options, context); + } +} diff --git a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestReadStreamPart.java b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestReadStreamPart.java index cffd964..0f9e730 100644 --- a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestReadStreamPart.java +++ b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestReadStreamPart.java @@ -17,10 +17,15 @@ package org.apache.servicecomb.foundation.vertx.http; import java.io.ByteArrayInputStream; +import java.io.File; import java.io.IOException; import java.io.InputStream; +import java.util.UUID; import java.util.concurrent.ExecutionException; +import javax.ws.rs.core.HttpHeaders; + +import org.apache.commons.io.FileUtils; import org.apache.servicecomb.foundation.vertx.stream.InputStreamToReadStream; import org.hamcrest.Matchers; import org.junit.Assert; @@ -30,18 +35,35 @@ import org.junit.Test; import org.junit.rules.ExpectedException; import io.vertx.core.AsyncResult; +import io.vertx.core.Context; import io.vertx.core.Future; import io.vertx.core.Handler; import io.vertx.core.Vertx; import io.vertx.core.buffer.Buffer; +import io.vertx.core.file.AsyncFile; +import io.vertx.core.file.FileSystem; +import io.vertx.core.file.FileSystemException; +import io.vertx.core.file.OpenOptions; +import io.vertx.core.file.impl.AsyncFileUitls; +import io.vertx.core.file.impl.FileSystemImpl; +import io.vertx.core.file.impl.WindowsFileSystem; +import io.vertx.core.http.HttpClientResponse; +import io.vertx.core.impl.ContextImpl; +import io.vertx.core.impl.EventLoopContext; +import io.vertx.core.impl.Utils; +import io.vertx.core.impl.VertxInternal; import io.vertx.core.streams.WriteStream; +import mockit.Expectations; import mockit.Mock; import mockit.MockUp; import mockit.Mocked; public class TestReadStreamPart { @Mocked - Vertx vertx; + VertxInternal vertx; + + // @Mocked + ContextImpl context; String src = "src"; @@ -54,14 +76,31 @@ public class TestReadStreamPart { @Rule public ExpectedException expectedException = ExpectedException.none(); + FileSystem fileSystem; + + protected FileSystem getFileSystem() { + return Utils.isWindows() ? new WindowsFileSystem(vertx) : new FileSystemImpl(vertx); + } @Before public void setup() { - readStream = new InputStreamToReadStream(vertx, inputStream); - part = new ReadStreamPart(readStream); - new MockUp<Vertx>(vertx) { @Mock + FileSystem fileSystem() { + return fileSystem; + } + + @Mock + ContextImpl getContext() { + return context; + } + + @Mock + ContextImpl getOrCreateContext() { + return context; + } + + @Mock <T> void executeBlocking(Handler<Future<T>> blockingCodeHandler, boolean ordered, Handler<AsyncResult<T>> resultHandler) { Future<T> future = Future.future(); @@ -69,6 +108,78 @@ public class TestReadStreamPart { future.setHandler(resultHandler); } }; + + context = new EventLoopContext(vertx, null, null, null, "id", null, null); + new MockUp<Context>(context) { + @Mock + Vertx owner() { + return vertx; + } + + @Mock + void runOnContext(Handler<Void> task) { + task.handle(null); + } + + @Mock + <T> void executeBlocking(Handler<Future<T>> blockingCodeHandler, Handler<AsyncResult<T>> resultHandler) { + Future<T> future = Future.future(); + blockingCodeHandler.handle(future); + future.setHandler(resultHandler); + } + }; + + fileSystem = getFileSystem(); + + readStream = new InputStreamToReadStream(vertx, inputStream); + part = new ReadStreamPart(context, readStream); + + new MockUp<FileSystem>(fileSystem) { + @Mock + FileSystem open(String path, OpenOptions options, Handler<AsyncResult<AsyncFile>> handler) { + try { + AsyncFile asyncFile = AsyncFileUitls.createAsyncFile(vertx, path, options, context); + handler.handle(Future.succeededFuture(asyncFile)); + } catch (Exception e) { + handler.handle(Future.failedFuture(e)); + } + return fileSystem; + } + }; + } + + @Test + public void constructFromHttpClientResponse_noContentType(@Mocked HttpClientResponse httpClientResponse) { + new Expectations() { + { + httpClientResponse.getHeader(HttpHeaders.CONTENT_DISPOSITION); + result = "xx;filename=name.txt"; + httpClientResponse.getHeader(HttpHeaders.CONTENT_TYPE); + result = null; + } + }; + + part = new ReadStreamPart(context, httpClientResponse); + + Assert.assertEquals("name.txt", part.getSubmittedFileName()); + Assert.assertEquals("text/plain", part.getContentType()); + } + + @Test + public void constructFromHttpClientResponse_hasContentType(@Mocked HttpClientResponse httpClientResponse) { + new Expectations() { + { + httpClientResponse.getHeader(HttpHeaders.CONTENT_DISPOSITION); + result = "xx;filename=name.txt"; + httpClientResponse.getHeader(HttpHeaders.CONTENT_TYPE); + result = "type"; + } + }; + + part = new ReadStreamPart(context, httpClientResponse); + + Assert.assertEquals("name.txt", part.getSubmittedFileName()); + Assert.assertEquals("type", part.getContentType()); } @Test @@ -128,4 +239,43 @@ public class TestReadStreamPart { part.saveToWriteStream(writeStream).get(); } + + @Test + public void saveAsBytes() throws InterruptedException, ExecutionException { + Assert.assertArrayEquals(src.getBytes(), part.saveAsBytes().get()); + } + + @Test + public void saveAsString() throws InterruptedException, ExecutionException { + Assert.assertEquals(src, part.saveAsString().get()); + } + + @Test + public void saveToFile() throws InterruptedException, ExecutionException, IOException { + File dir = new File("target/notExist-" + UUID.randomUUID().toString()); + File file = new File(dir, "a.txt"); + + Assert.assertFalse(dir.exists()); + + part.saveToFile(file.getAbsolutePath()).get(); + + Assert.assertEquals(src, FileUtils.readFileToString(file)); + + FileUtils.forceDelete(dir); + Assert.assertFalse(dir.exists()); + } + + @Test + public void saveToFile_notExist_notCreate() throws InterruptedException, ExecutionException, IOException { + File dir = new File("target/notExist-" + UUID.randomUUID().toString()); + File file = new File(dir, "a.txt"); + + Assert.assertFalse(dir.exists()); + + expectedException.expect(ExecutionException.class); + expectedException.expectCause(Matchers.instanceOf(FileSystemException.class)); + + OpenOptions openOptions = new OpenOptions().setCreateNew(false); + part.saveToFile(file, openOptions).get(); + } } diff --git a/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/http/RestClientInvocation.java b/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/http/RestClientInvocation.java index 9a586a1..db148dd 100644 --- a/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/http/RestClientInvocation.java +++ b/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/http/RestClientInvocation.java @@ -155,7 +155,7 @@ public class RestClientInvocation { if (HttpStatus.isSuccess(clientResponse.statusCode()) && Part.class.equals(invocation.getOperationMeta().getMethod().getReturnType())) { - ReadStreamPart part = new ReadStreamPart(httpClientResponse); + ReadStreamPart part = new ReadStreamPart(httpClientWithContext.context(), httpClientResponse); invocation.getHandlerContext().put(RestConst.READ_STREAM_PART, part); processResponseBody(null); return; -- To stop receiving notification emails like this one, please contact [email protected].
