[ https://issues.apache.org/jira/browse/SCB-1001?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16671292#comment-16671292 ]
ASF GitHub Bot commented on SCB-1001: ------------------------------------- liubao68 closed pull request #977: [SCB-1001] Vertx fix download close read stream URL: https://github.com/apache/incubator-servicecomb-java-chassis/pull/977 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/AbstractRestInvocation.java b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/AbstractRestInvocation.java index 6d9582034..932aef4c3 100644 --- a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/AbstractRestInvocation.java +++ b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/AbstractRestInvocation.java @@ -17,7 +17,6 @@ package org.apache.servicecomb.common.rest; -import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.List; @@ -268,12 +267,18 @@ protected void onExecuteHttpServerFiltersFinish(Response response, Throwable e) try { responseEx.flushBuffer(); - } catch (IOException flushException) { + } catch (Throwable flushException) { LOGGER.error("Failed to flush rest response, operation:{}, request uri:{}", getMicroserviceQualifiedName(), requestEx.getRequestURI(), flushException); } - requestEx.getAsyncContext().complete(); + try { + requestEx.getAsyncContext().complete(); + } catch (Throwable completeException) { + LOGGER.error("Failed to complete async rest response, operation:{}, request uri:{}", + getMicroserviceQualifiedName(), requestEx.getRequestURI(), completeException); + } + // if failed to locate path, then will not create invocation // TODO: statistics this case if (invocation != null) { diff --git a/demo/demo-springmvc/springmvc-client/src/main/java/org/apache/servicecomb/demo/springmvc/client/CodeFirstRestTemplateSpringmvc.java b/demo/demo-springmvc/springmvc-client/src/main/java/org/apache/servicecomb/demo/springmvc/client/CodeFirstRestTemplateSpringmvc.java index a51588f1b..1c92669e7 100644 --- a/demo/demo-springmvc/springmvc-client/src/main/java/org/apache/servicecomb/demo/springmvc/client/CodeFirstRestTemplateSpringmvc.java +++ b/demo/demo-springmvc/springmvc-client/src/main/java/org/apache/servicecomb/demo/springmvc/client/CodeFirstRestTemplateSpringmvc.java @@ -70,8 +70,6 @@ private TestGeneric testGeneric = new TestGeneric(); - private TestDownload testDownload = new TestDownload(); - private TestRestTemplate testRestTemplate = new TestRestTemplate(); private TestContentType testContentType = new TestContentType(); @@ -80,8 +78,6 @@ @Override protected void testOnlyRest(RestTemplate template, String cseUrlPrefix) { - testDownload.runRest(); - try { testUpload(template, cseUrlPrefix); } catch (IOException e) { diff --git a/demo/demo-springmvc/springmvc-client/src/main/java/org/apache/servicecomb/demo/springmvc/client/DownloadSchemaIntf.java b/demo/demo-springmvc/springmvc-client/src/main/java/org/apache/servicecomb/demo/springmvc/client/DownloadSchemaIntf.java deleted file mode 100644 index 93e67ee7e..000000000 --- a/demo/demo-springmvc/springmvc-client/src/main/java/org/apache/servicecomb/demo/springmvc/client/DownloadSchemaIntf.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.servicecomb.demo.springmvc.client; - -import org.apache.servicecomb.foundation.vertx.http.ReadStreamPart; - -public interface DownloadSchemaIntf { - ReadStreamPart tempFileEntity(String content); - - ReadStreamPart tempFilePart(String content); - - ReadStreamPart file(String content); - - ReadStreamPart chineseAndSpaceFile(String content); - - ReadStreamPart resource(String content); - - ReadStreamPart entityResource(String content); - - ReadStreamPart entityInputStream(String content); - - ReadStreamPart bytes(String content); - - ReadStreamPart netInputStream(String content); -} diff --git a/demo/demo-springmvc/springmvc-server/src/main/java/org/apache/servicecomb/demo/springmvc/server/DownloadSchema.java b/demo/demo-springmvc/springmvc-server/src/main/java/org/apache/servicecomb/demo/springmvc/server/DownloadSchema.java deleted file mode 100644 index 563166327..000000000 --- a/demo/demo-springmvc/springmvc-server/src/main/java/org/apache/servicecomb/demo/springmvc/server/DownloadSchema.java +++ /dev/null @@ -1,186 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.servicecomb.demo.springmvc.server; - -import java.io.ByteArrayInputStream; -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.net.HttpURLConnection; -import java.net.URI; -import java.net.URL; -import java.net.URLDecoder; -import java.net.URLEncoder; -import java.nio.charset.StandardCharsets; -import java.util.UUID; - -import javax.servlet.http.Part; - -import org.apache.commons.io.FileUtils; -import org.apache.http.entity.StringEntity; -import org.apache.http.impl.bootstrap.HttpServer; -import org.apache.http.impl.bootstrap.ServerBootstrap; -import org.apache.servicecomb.foundation.common.part.FilePart; -import org.apache.servicecomb.provider.rest.common.RestSchema; -import org.springframework.core.io.ByteArrayResource; -import org.springframework.core.io.Resource; -import org.springframework.http.HttpHeaders; -import org.springframework.http.MediaType; -import org.springframework.http.ResponseEntity; -import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.RequestMapping; - -import io.swagger.annotations.ApiResponse; -import io.swagger.annotations.ApiResponses; - -@RestSchema(schemaId = "download") -@RequestMapping(path = "/download") -public class DownloadSchema { - File tempDir = new File("target/downloadTemp"); - - public DownloadSchema() throws IOException { - FileUtils.deleteQuietly(tempDir); - FileUtils.forceMkdir(tempDir); - - // for download from net stream case - HttpServer server = ServerBootstrap - .bootstrap() - .setListenerPort(9000) - .registerHandler("/download/netInputStream", (req, resp, context) -> { - String uri = req.getRequestLine().getUri(); - String query = URI.create(uri).getQuery(); - int idx = query.indexOf('='); - String content = query.substring(idx + 1); - content = URLDecoder.decode(content, StandardCharsets.UTF_8.name()); - resp.setEntity(new StringEntity(content, StandardCharsets.UTF_8.name())); - }) - .create(); - server.start(); - } - - protected File createTempFile(String content) throws IOException { - return createTempFile(null, content); - } - - protected File createTempFile(String name, String content) throws IOException { - if (name == null) { - name = "download-" + UUID.randomUUID().toString() + ".txt"; - } - File file = new File(tempDir, name); - FileUtils.write(file, content); - return file; - } - - // customize HttpHeaders.CONTENT_DISPOSITION to be "attachment;filename=tempFileEntity.txt" - @GetMapping(path = "/tempFileEntity") - public ResponseEntity<Part> tempFileEntity(String content) throws IOException { - File file = createTempFile(content); - - return ResponseEntity - .ok() - .header(HttpHeaders.CONTENT_DISPOSITION, "attachment;filename=tempFileEntity.txt") - .body(new FilePart(null, file) - .setDeleteAfterFinished(true)); - } - - // generate HttpHeaders.CONTENT_DISPOSITION to be "attachment;filename=tempFilePart.txt" automatically - @GetMapping(path = "/tempFilePart") - public Part tempFilePart(String content) throws IOException { - File file = createTempFile(content); - - return new FilePart(null, file) - .setDeleteAfterFinished(true) - .setSubmittedFileName("tempFilePart.txt"); - } - - @GetMapping(path = "/file") - public File file(String content) throws IOException { - return createTempFile("file.txt", content); - } - - @GetMapping(path = "/chineseAndSpaceFile") - public Part chineseAndSpaceFile(String content) throws IOException { - File file = createTempFile(content); - return new FilePart(null, file) - .setDeleteAfterFinished(true) - .setSubmittedFileName("测 试.test.txt"); - } - - @GetMapping(path = "/resource") - @ApiResponses({ - @ApiResponse(code = 200, response = File.class, message = ""), - }) - public Resource resource(String content) throws IOException { - return new ByteArrayResource(content.getBytes(StandardCharsets.UTF_8)) { - @Override - public String getFilename() { - return "resource.txt"; - } - }; - } - - @GetMapping(path = "/entityResource") - @ApiResponses({ - @ApiResponse(code = 200, response = File.class, message = ""), - }) - public ResponseEntity<Resource> entityResource(String content) throws IOException { - return ResponseEntity - .ok() - .header(HttpHeaders.CONTENT_TYPE, MediaType.TEXT_PLAIN_VALUE) - .header(HttpHeaders.CONTENT_DISPOSITION, "attachment;filename=entityResource.txt") - .body(new ByteArrayResource(content.getBytes(StandardCharsets.UTF_8))); - } - - @GetMapping(path = "/entityInputStream") - @ApiResponses({ - @ApiResponse(code = 200, response = File.class, message = ""), - }) - public ResponseEntity<InputStream> entityInputStream(String content) throws IOException { - return ResponseEntity - .ok() - .header(HttpHeaders.CONTENT_TYPE, MediaType.TEXT_PLAIN_VALUE) - .header(HttpHeaders.CONTENT_DISPOSITION, "attachment;filename=entityInputStream.txt") - .body(new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8))); - } - - @GetMapping(path = "/bytes") - @ApiResponses({ - @ApiResponse(code = 200, response = File.class, message = ""), - }) - public ResponseEntity<byte[]> bytes(String content) throws IOException { - return ResponseEntity - .ok() - .header(HttpHeaders.CONTENT_TYPE, MediaType.TEXT_PLAIN_VALUE) - .header(HttpHeaders.CONTENT_DISPOSITION, "attachment;filename=bytes.txt") - .body(content.getBytes(StandardCharsets.UTF_8)); - } - - @GetMapping(path = "/netInputStream") - @ApiResponses({ - @ApiResponse(code = 200, response = File.class, message = ""), - }) - public ResponseEntity<InputStream> netInputStream(String content) throws IOException { - URL url = new URL("http://localhost:9000/download/netInputStream?content=" - + URLEncoder.encode(content, StandardCharsets.UTF_8.name())); - HttpURLConnection conn = (HttpURLConnection) url.openConnection(); - return ResponseEntity - .ok() - .header(HttpHeaders.CONTENT_TYPE, MediaType.TEXT_PLAIN_VALUE) - .header(HttpHeaders.CONTENT_DISPOSITION, "attachment;filename=netInputStream.txt") - .body(conn.getInputStream()); - } -} diff --git a/demo/demo-springmvc/springmvc-server/src/main/java/org/apache/servicecomb/demo/springmvc/server/ProducerTestsAfterBootup.java b/demo/demo-springmvc/springmvc-server/src/main/java/org/apache/servicecomb/demo/springmvc/server/ProducerTestsAfterBootup.java index e19296faa..6023d1dd9 100644 --- a/demo/demo-springmvc/springmvc-server/src/main/java/org/apache/servicecomb/demo/springmvc/server/ProducerTestsAfterBootup.java +++ b/demo/demo-springmvc/springmvc-server/src/main/java/org/apache/servicecomb/demo/springmvc/server/ProducerTestsAfterBootup.java @@ -59,7 +59,7 @@ public void testSchemaNotChange() { } public void testRegisteredBasePath() { - TestMgr.check(13, RegistryUtils.getMicroservice().getPaths().size()); + TestMgr.check(12, RegistryUtils.getMicroservice().getPaths().size()); } private String getSwaggerContent(Swagger swagger) { diff --git a/demo/demo-springmvc/springmvc-server/src/main/resources/microservice.yaml b/demo/demo-springmvc/springmvc-server/src/main/resources/microservice.yaml index fba7554db..393dc5257 100644 --- a/demo/demo-springmvc/springmvc-server/src/main/resources/microservice.yaml +++ b/demo/demo-springmvc/springmvc-server/src/main/resources/microservice.yaml @@ -14,21 +14,20 @@ ## See the License for the specific language governing permissions and ## limitations under the License. ## --------------------------------------------------------------------------- -servicecomb: - microserviceVersionFactory: org.apache.servicecomb.core.definition.PrivateMicroserviceVersionMetaFactory APPLICATION_ID: springmvctest service_description: name: springmvc version: 0.0.3 paths: - - path: /test1/testpath - property: - checksession: false - - path: /test2/testpath - property: - checksession: true + - path: /test1/testpath + property: + checksession: false + - path: /test2/testpath + property: + checksession: true servicecomb: + microserviceVersionFactory: org.apache.servicecomb.core.definition.PrivateMicroserviceVersionMetaFactory service: registry: registerPath: true @@ -44,7 +43,7 @@ servicecomb: interval: 10 watch: false autodiscovery: true -# can download config center from https://cse-bucket.obs.myhwclouds.com/LocalCSE/Local-CSE-1.0.0.zip to test dynamic config + # can download config center from https://cse-bucket.obs.myhwclouds.com/LocalCSE/Local-CSE-1.0.0.zip to test dynamic config config: client: serverUri: http://127.0.0.1:30113 diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/InputStreamToReadStream.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/InputStreamToReadStream.java index 01d65869e..6d66fb60f 100644 --- a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/InputStreamToReadStream.java +++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/InputStreamToReadStream.java @@ -130,7 +130,7 @@ private synchronized void readInWorker(Future<ReadResult> future) { } } - private void handleException(Throwable e) { + public void handleException(Throwable e) { closeInputStream(); exceptionHandler.handle(e); } @@ -188,6 +188,10 @@ private synchronized void handleEnd() { } private void closeInputStream() { + if (closed) { + return; + } + closed = true; if (!autoCloseInputStream) { return; diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/PumpCommon.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/PumpCommon.java index 156510a94..77334b9c9 100644 --- a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/PumpCommon.java +++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/PumpCommon.java @@ -22,6 +22,7 @@ import io.vertx.core.Context; import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.HttpClientResponse; import io.vertx.core.streams.Pump; import io.vertx.core.streams.ReadStream; import io.vertx.core.streams.WriteStream; @@ -41,12 +42,25 @@ public CompletableFuture<Void> pump(Context context, ReadStream<Buffer> readStream, WriteStream<Buffer> writeStream) { CompletableFuture<Void> readFuture = new CompletableFuture<>(); - writeStream.exceptionHandler(readFuture::completeExceptionally); + writeStream.exceptionHandler(e -> { + // consumer -> producer + // 3rd consumer -> edge -> producer + // when download not finished, consumer stop download + // producer should stop download logic at once + if (readStream instanceof InputStreamToReadStream) { + ((InputStreamToReadStream) readStream).handleException(e); + } else if (readStream instanceof HttpClientResponse) { + // can not find a way to cancel/terminate request + // so can only close the connection. + ((HttpClientResponse) readStream).request().connection().close(); + } + readFuture.completeExceptionally(e); + }); readStream.exceptionHandler(readFuture::completeExceptionally); // just means read finished, not means write finished readStream.endHandler(readFuture::complete); - // if readStream(HttpClientResponse) and awriteStream(HttpServerResponse) + // if readStream(HttpClientResponse) and writeStream(HttpServerResponse) // belongs to difference eventloop // maybe will cause deadlock // if happened, vertx will print deadlock stacks diff --git a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/stream/TestPumpFromPart.java b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/stream/TestPumpFromPart.java index 96899e85a..9ba8a2864 100644 --- a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/stream/TestPumpFromPart.java +++ b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/stream/TestPumpFromPart.java @@ -133,7 +133,14 @@ void readInWorker(Future<ReadResult> future) { }; pump_error(null); + Assert.assertTrue(inputStreamClosed); + Assert.assertTrue(outputStreamClosed); + + inputStreamClosed = false; + outputStreamClosed = false; pump_error(context); + Assert.assertTrue(inputStreamClosed); + Assert.assertTrue(outputStreamClosed); } @Test @@ -152,6 +159,13 @@ void write(byte[] b) throws IOException { }; pump_error(null); + Assert.assertTrue(inputStreamClosed); + Assert.assertTrue(outputStreamClosed); + + inputStreamClosed = false; + outputStreamClosed = false; pump_error(context); + Assert.assertTrue(inputStreamClosed); + Assert.assertTrue(outputStreamClosed); } } diff --git a/integration-tests/it-consumer/pom.xml b/integration-tests/it-consumer/pom.xml index 9821cab1f..7e743a855 100644 --- a/integration-tests/it-consumer/pom.xml +++ b/integration-tests/it-consumer/pom.xml @@ -33,6 +33,12 @@ <groupId>org.apache.servicecomb.tests</groupId> <artifactId>it-common</artifactId> </dependency> + <dependency> + <groupId>com.squareup.okhttp3</groupId> + <artifactId>okhttp</artifactId> + <scope>compile</scope> + </dependency> + <dependency> <groupId>org.apache.maven</groupId> <artifactId>maven-model</artifactId> diff --git a/integration-tests/it-consumer/src/main/java/org/apache/servicecomb/it/ConsumerMain.java b/integration-tests/it-consumer/src/main/java/org/apache/servicecomb/it/ConsumerMain.java index 61db4463f..cbc00d707 100644 --- a/integration-tests/it-consumer/src/main/java/org/apache/servicecomb/it/ConsumerMain.java +++ b/integration-tests/it-consumer/src/main/java/org/apache/servicecomb/it/ConsumerMain.java @@ -19,6 +19,7 @@ import org.apache.servicecomb.core.SCBEngine; import org.apache.servicecomb.foundation.common.utils.BeanUtils; import org.apache.servicecomb.it.deploy.Deploys; +import org.apache.servicecomb.it.deploy.MicroserviceDeploy; import org.apache.servicecomb.it.junit.ITJUnitUtils; import org.apache.servicecomb.it.schema.TestApiOperation; import org.apache.servicecomb.it.testcase.TestAnnotatedAttribute; @@ -27,6 +28,8 @@ import org.apache.servicecomb.it.testcase.TestDataTypePrimitive; import org.apache.servicecomb.it.testcase.TestDefaultJsonValueJaxrsSchema; import org.apache.servicecomb.it.testcase.TestDefaultValue; +import org.apache.servicecomb.it.testcase.TestDownload; +import org.apache.servicecomb.it.testcase.TestDownloadSlowStreamEdge; import org.apache.servicecomb.it.testcase.TestIgnoreMethod; import org.apache.servicecomb.it.testcase.TestIgnoreStaticMethod; import org.apache.servicecomb.it.testcase.TestParamCodec; @@ -53,10 +56,12 @@ public static void main(String[] args) throws Throwable { BeanUtils.init(); ITUtils.waitBootFinished(); - run(); - - SCBEngine.getInstance().destroy(); - deploys.getServiceCenter().stop(); + try { + run(); + } finally { + SCBEngine.getInstance().destroy(); + deploys.getServiceCenter().stop(); + } resultPrinter.print(); @@ -70,51 +75,22 @@ protected static void run() throws Throwable { // if not ready, will start a new instance and wait for ready deploys.getEdge().ensureReady(); // deploys.getZuul().ensureReady(zuul); - ITJUnitUtils.run(TestIgnoreStaticMethod.class); - ITJUnitUtils.run(TestIgnoreMethod.class); - ITJUnitUtils.run(TestApiParam.class); - ITJUnitUtils.run(TestApiOperation.class); - - // 1.base test case - // include all extension point abnormal scenes test case - - // deploy standalone base-producer - // only run one case for "any" transport - // run highway - // run rest - // run native restTemplate to edge/zuul - // stop standalone base-producer - testStandalone(); - - // deploy tomcat base-producer - // run vertx-servlet - // run native restTemplate to edge/zuul - // stop tomcat base-producer - - // deploy spring boot base-producer - // run vertx-servlet - // run native restTemplate to edge/zuul - // stop spring boot base-producer - - // 2.complex test case - // 1)start new producer version - // consumer/edge/zuul should ...... - // 2)delete new producer version - // consumer/edge/zuul should ...... - // ...... - - // 3.deploy development mode producer - // ...... - - testSpringBoot2Standalone(); - - testHttp2CStandalone(); - - testSpringBoot2Servlet(); - //http2 - testHttp2Standalone(); - - deploys.getEdge().stop(); + + try { + ITJUnitUtils.run(TestIgnoreStaticMethod.class); + ITJUnitUtils.run(TestIgnoreMethod.class); + ITJUnitUtils.run(TestApiParam.class); + ITJUnitUtils.run(TestApiOperation.class); + + testOneProducer(deploys.getBaseProducer(), ConsumerMain::testStandalone); + testOneProducer(deploys.getBaseHttp2Producer(), ConsumerMain::testH2Standalone); + testOneProducer(deploys.getBaseHttp2CProducer(), ConsumerMain::testH2CStandalone); + + testOneProducer(deploys.getSpringBoot2StandaloneProducer(), ConsumerMain::testSpringBoot2Standalone); + testOneProducer(deploys.getSpringBoot2ServletProducer(), ConsumerMain::testSpringBoot2Servlet); + } finally { + deploys.getEdge().stop(); + } } private static void runShareTestCases() throws Throwable { @@ -125,6 +101,11 @@ private static void runShareTestCases() throws Throwable { // only rest support default value feature ITJUnitUtils.runWithRest(TestDefaultValue.class); + // currently have bug with http2 + if (!ITJUnitUtils.getProducerName().endsWith("-h2") && !ITJUnitUtils.getProducerName().endsWith("-h2c")) { + ITJUnitUtils.runWithRest(TestDownload.class); + } + ITJUnitUtils.runWithHighwayAndRest(TestTrace.class); ITJUnitUtils.run(TestTraceEdge.class); @@ -136,11 +117,23 @@ private static void runShareTestCases() throws Throwable { ITJUnitUtils.run(TestRestController.class); } - private static void testStandalone() throws Throwable { - deploys.getBaseProducer().ensureReady(); + interface ITTask { + void run() throws Throwable; + } - ITJUnitUtils.addProducer("it-producer"); + private static void testOneProducer(MicroserviceDeploy microserviceDeploy, ITTask task) throws Throwable { + microserviceDeploy.ensureReady(); + ITJUnitUtils.addProducer(microserviceDeploy.getMicroserviceDeployDefinition().getMicroserviceName()); + + try { + task.run(); + } finally { + ITJUnitUtils.popProducer(); + microserviceDeploy.stop(); + } + } + private static void testStandalone() throws Throwable { runShareTestCases(); // currently not support update 3rd url, so only test once @@ -150,65 +143,34 @@ private static void testStandalone() throws Throwable { ITJUnitUtils.runWithRest(TestRestServerConfig.class); ITJUnitUtils.run(TestRestServerConfigEdge.class); - ITJUnitUtils.popProducer(); - deploys.getBaseProducer().stop(); + // currently, only support vertx download + ITJUnitUtils.run(TestDownloadSlowStreamEdge.class); } - private static void testHttp2CStandalone() throws Throwable { - deploys.getBaseHttp2CProducer().ensureReady(); - - ITJUnitUtils.addProducer("it-producer"); - + private static void testH2CStandalone() throws Throwable { runShareTestCases(); - // currently not support update 3rd url, so only test once - ITJUnitUtils.run(Test3rdPartyInvocation.class); - //as setMaxInitialLineLength() is not work for http2, do not need // ITJUnitUtils.runWithRest(TestRestServerConfig.class) ITJUnitUtils.run(TestRestServerConfigEdge.class); - - ITJUnitUtils.popProducer(); - deploys.getBaseHttp2CProducer().stop(); } - private static void testHttp2Standalone() throws Throwable { - deploys.getBaseHttp2Producer().ensureReady(); - - ITJUnitUtils.addProducer("it-producer"); - + private static void testH2Standalone() throws Throwable { runShareTestCases(); - // currently not support update 3rd url, so only test once - ITJUnitUtils.run(Test3rdPartyInvocation.class); - //as setMaxInitialLineLength() is not work for http2, do not need // ITJUnitUtils.runWithRest(TestRestServerConfig.class) ITJUnitUtils.run(TestRestServerConfigEdge.class); - - ITJUnitUtils.popProducer(); - deploys.getBaseHttp2Producer().stop(); } private static void testSpringBoot2Standalone() throws Throwable { - deploys.getSpringBoot2StandaloneProducer().ensureReady(); - - ITJUnitUtils.addProducer("it-producer-deploy-springboot2-standalone"); - runShareTestCases(); - ITJUnitUtils.popProducer(); - deploys.getSpringBoot2StandaloneProducer().stop(); + // currently, only support vertx download + ITJUnitUtils.run(TestDownloadSlowStreamEdge.class); } private static void testSpringBoot2Servlet() throws Throwable { - deploys.getSpringBoot2ServletProducer().ensureReady(); - - ITJUnitUtils.addProducer("it-producer-deploy-springboot2-servlet"); - runShareTestCases(); - - ITJUnitUtils.popProducer(); - deploys.getSpringBoot2ServletProducer().stop(); } } diff --git a/integration-tests/it-consumer/src/main/java/org/apache/servicecomb/it/deploy/Deploys.java b/integration-tests/it-consumer/src/main/java/org/apache/servicecomb/it/deploy/Deploys.java index f71f1c793..0b5a2a2b1 100644 --- a/integration-tests/it-consumer/src/main/java/org/apache/servicecomb/it/deploy/Deploys.java +++ b/integration-tests/it-consumer/src/main/java/org/apache/servicecomb/it/deploy/Deploys.java @@ -217,7 +217,7 @@ private void initBaseHttp2Producer() { }); } definition.setAppId("integration-test"); - definition.setMicroserviceName("it-producer"); + definition.setMicroserviceName("it-producer-h2"); definition.setVersion(DEFAULT_MICROSERVICE_VERSION); initDeployDefinition(definition); @@ -231,7 +231,7 @@ private void initBaseHttp2CProducer() { definition.setCmd("it-producer"); definition.setArgs(new String[] {"-Dservicecomb.rest.address=0.0.0.0:0?protocol=http2"}); definition.setAppId("integration-test"); - definition.setMicroserviceName("it-producer"); + definition.setMicroserviceName("it-producer-h2c"); definition.setVersion(DEFAULT_MICROSERVICE_VERSION); initDeployDefinition(definition); diff --git a/integration-tests/it-consumer/src/main/java/org/apache/servicecomb/it/deploy/MicroserviceDeploy.java b/integration-tests/it-consumer/src/main/java/org/apache/servicecomb/it/deploy/MicroserviceDeploy.java index 94d118001..e24cfcbec 100644 --- a/integration-tests/it-consumer/src/main/java/org/apache/servicecomb/it/deploy/MicroserviceDeploy.java +++ b/integration-tests/it-consumer/src/main/java/org/apache/servicecomb/it/deploy/MicroserviceDeploy.java @@ -35,6 +35,10 @@ public MicroserviceDeploy(DeployDefinition deployDefinition) { this.microserviceDeployDefinition.setStartCompleteLog("ServiceComb is ready."); } + public MicroserviceDeployDefinition getMicroserviceDeployDefinition() { + return microserviceDeployDefinition; + } + @Override protected String[] createCmds() { // must set jar at the end of the cmds @@ -45,7 +49,8 @@ public MicroserviceDeploy(DeployDefinition deployDefinition) { protected String[] addArgs(String[] cmds) { // add jar return ArrayUtils.addAll(super.addArgs(cmds), - "-DselfController=" + RegistryUtils.getMicroserviceInstance().getInstanceId(), + "-DselfController=" + RegistryUtils.getMicroserviceInstance().getInstanceId(), + "-Dservice_description.name=" + microserviceDeployDefinition.getMicroserviceName(), deployDefinition.getCmd()); } @@ -75,18 +80,5 @@ public void stop() { sendCommand("ms-stop"); waitStop(); - - MicroserviceVersionRule microserviceVersionRule = RegistryUtils.getServiceRegistry().getAppManager() - .getOrCreateMicroserviceVersionRule(microserviceDeployDefinition.getAppId(), - microserviceDeployDefinition.getMicroserviceName(), - microserviceDeployDefinition.getVersion()); - while (microserviceVersionRule.getInstances().size() > 0) { - try { - LOGGER.info("{} not stop finished wait.", microserviceDeployDefinition.getDisplayName()); - Thread.sleep(3000); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } } } diff --git a/demo/demo-springmvc/springmvc-client/src/main/java/org/apache/servicecomb/demo/springmvc/client/TestDownload.java b/integration-tests/it-consumer/src/main/java/org/apache/servicecomb/it/testcase/TestDownload.java similarity index 68% rename from demo/demo-springmvc/springmvc-client/src/main/java/org/apache/servicecomb/demo/springmvc/client/TestDownload.java rename to integration-tests/it-consumer/src/main/java/org/apache/servicecomb/it/testcase/TestDownload.java index 0436382be..9a3947c5f 100644 --- a/demo/demo-springmvc/springmvc-client/src/main/java/org/apache/servicecomb/demo/springmvc/client/TestDownload.java +++ b/integration-tests/it-consumer/src/main/java/org/apache/servicecomb/it/testcase/TestDownload.java @@ -14,10 +14,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.servicecomb.demo.springmvc.client; +package org.apache.servicecomb.it.testcase; import java.io.File; import java.io.IOException; +import java.io.PrintWriter; +import java.io.StringWriter; import java.util.ArrayList; import java.util.List; import java.util.UUID; @@ -25,22 +27,19 @@ import java.util.concurrent.ExecutionException; import org.apache.commons.io.FileUtils; -import org.apache.servicecomb.demo.TestMgr; import org.apache.servicecomb.foundation.vertx.http.ReadStreamPart; -import org.apache.servicecomb.provider.pojo.Invoker; -import org.apache.servicecomb.provider.springmvc.reference.CseRestTemplate; -import org.springframework.web.client.RestTemplate; +import org.apache.servicecomb.it.Consumers; +import org.apache.servicecomb.it.testcase.support.DownloadSchemaIntf; +import org.junit.Assert; +import org.junit.Test; import com.google.common.collect.Iterables; public class TestDownload { private File dir = new File("target/download"); - private DownloadSchemaIntf intf = Invoker.createProxy("springmvc", "download", DownloadSchemaIntf.class); - - private RestTemplate restTemplate = new CseRestTemplate(); - - private String prefix = "cse://springmvc/download"; + private static Consumers<DownloadSchemaIntf> consumers = new Consumers<>("download", + DownloadSchemaIntf.class); private List<CompletableFuture<?>> futures = new ArrayList<>(); @@ -66,6 +65,12 @@ private String readFileToString(File file) { return checkFuture(future); } + private String getStackTrace(Throwable e) { + StringWriter writer = new StringWriter(); + e.printStackTrace(new PrintWriter(writer)); + return writer.toString(); + } + private <T> CompletableFuture<T> checkFuture(CompletableFuture<T> future) { Error error = new Error(); future.whenComplete((result, e) -> { @@ -78,53 +83,54 @@ private String readFileToString(File file) { value = new String((byte[]) value); } - TestMgr.check(content, value, error); + Assert.assertEquals(getStackTrace(error), content, value); }); return future; } private ReadStreamPart templateGet(String methodPath) { - return restTemplate - .getForObject(prefix + "/" + methodPath + "?content={content}", + return consumers.getSCBRestTemplate() + .getForObject("/" + methodPath + "?content={content}", ReadStreamPart.class, content); } + @Test @SuppressWarnings("unchecked") public void runRest() { - futures.add(checkFile(intf.tempFileEntity(content))); + futures.add(checkFile(consumers.getIntf().tempFileEntity(content))); futures.add(checkFuture(templateGet("tempFileEntity").saveAsBytes())); - futures.add(checkFile(intf.tempFilePart(content))); + futures.add(checkFile(consumers.getIntf().tempFilePart(content))); futures.add(checkFuture(templateGet("tempFilePart").saveAsString())); - futures.add(checkFile(intf.file(content))); + futures.add(checkFile(consumers.getIntf().file(content))); futures.add(checkFuture(templateGet("file").saveAsString())); { - ReadStreamPart part = intf.chineseAndSpaceFile(content); - TestMgr.check("测 试.test.txt", part.getSubmittedFileName()); + ReadStreamPart part = consumers.getIntf().chineseAndSpaceFile(content); + Assert.assertEquals("测 试.test.txt", part.getSubmittedFileName()); futures.add(checkFile(part)); part = templateGet("chineseAndSpaceFile"); - TestMgr.check("测 试.test.txt", part.getSubmittedFileName()); + Assert.assertEquals("测 试.test.txt", part.getSubmittedFileName()); futures.add(checkFuture(part.saveAsString())); } - futures.add(checkFile(intf.resource(content))); + futures.add(checkFile(consumers.getIntf().resource(content))); futures.add(checkFuture(templateGet("resource").saveAsString())); - futures.add(checkFile(intf.entityResource(content))); + futures.add(checkFile(consumers.getIntf().entityResource(content))); futures.add(checkFuture(templateGet("entityResource").saveAsString())); - futures.add(checkFile(intf.entityInputStream(content))); + futures.add(checkFile(consumers.getIntf().entityInputStream(content))); futures.add(checkFuture(templateGet("entityInputStream").saveAsString())); - futures.add(checkFile(intf.bytes(content))); + futures.add(checkFile(consumers.getIntf().bytes(content))); futures.add(checkFuture(templateGet("bytes").saveAsString())); - futures.add(checkFile(intf.netInputStream(content))); + futures.add(checkFile(consumers.getIntf().netInputStream(content))); futures.add(checkFuture(templateGet("netInputStream").saveAsString())); try { @@ -132,7 +138,7 @@ public void runRest() { .allOf(Iterables.toArray((List<CompletableFuture<Object>>) (Object) futures, CompletableFuture.class)) .get(); } catch (InterruptedException | ExecutionException e1) { - TestMgr.failed("test download failed.", e1); + Assert.fail("test download failed: " + getStackTrace(e1)); } } } diff --git a/integration-tests/it-consumer/src/main/java/org/apache/servicecomb/it/testcase/TestDownloadSlowStreamEdge.java b/integration-tests/it-consumer/src/main/java/org/apache/servicecomb/it/testcase/TestDownloadSlowStreamEdge.java new file mode 100644 index 000000000..8c9cfa554 --- /dev/null +++ b/integration-tests/it-consumer/src/main/java/org/apache/servicecomb/it/testcase/TestDownloadSlowStreamEdge.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.servicecomb.it.testcase; + +import java.io.IOException; + +import org.apache.servicecomb.it.extend.engine.GateRestTemplate; +import org.junit.Test; + +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; + +public class TestDownloadSlowStreamEdge { + static GateRestTemplate client = GateRestTemplate.createEdgeRestTemplate("download"); + + @Test + public void clearInputStreamAfterDisconnect() throws IOException { +// URL url = new URL(client.getUrlPrefix() + "/slowInputStream"); +// HttpURLConnection conn = (HttpURLConnection) url.openConnection(); +// conn.getInputStream().close(); +// conn.disconnect(); + + OkHttpClient httpClient = new OkHttpClient(); + Request request = new Request.Builder().url(client.getUrlPrefix() + "/slowInputStream") + .build(); + Response response = httpClient.newCall(request).execute(); + + response.body().byteStream(); + response.body().close(); + + client.getForObject("/waitSlowInputStreamClosed", Void.class); + } +} diff --git a/integration-tests/it-consumer/src/main/java/org/apache/servicecomb/it/testcase/base/TestDownload.java b/integration-tests/it-consumer/src/main/java/org/apache/servicecomb/it/testcase/base/TestDownload.java deleted file mode 100644 index 1c6588d06..000000000 --- a/integration-tests/it-consumer/src/main/java/org/apache/servicecomb/it/testcase/base/TestDownload.java +++ /dev/null @@ -1,140 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.servicecomb.it.testcase.base; - -import org.junit.Assert; -import org.junit.Test; - -public class TestDownload { - // private static DownloadSchemaIntf intf = Invoker - // .createProxy("it-producer", "download", DownloadSchemaIntf.class); - - static int x; - - @Test - public void test1() { - x++; - System.out.println("test" + x); -// intf.tempFileEntity("abc"); - } - - @Test - public void test2() { - x++; - System.out.println("test2"); - Assert.assertEquals(x, x + 1); - } -// private File dir = new File("target/download"); -// -// -// private RestTemplate restTemplate = new CseRestTemplate(); -// -// private String prefix = "cse://springmvc/download"; -// -// private List<CompletableFuture<?>> futures = new ArrayList<>(); -// -// private String content = "file content"; -// -// public TestDownload() { -// FileUtils.deleteQuietly(dir); -// } -// -// private String readFileToString(File file) { -// try { -// return FileUtils.readFileToString(file); -// } catch (IOException e) { -// return "read file failed:" + e.getMessage(); -// } -// } -// -// private CompletableFuture<File> checkFile(ReadStreamPart part) { -// CompletableFuture<File> future = part.saveToFile("target/download/" -// + UUID.randomUUID().toString() -// + "-" -// + part.getSubmittedFileName()); -// return checkFuture(future); -// } -// -// private <T> CompletableFuture<T> checkFuture(CompletableFuture<T> future) { -// Error error = new Error(); -// future.whenComplete((result, e) -> { -// Object value = result; -// if (File.class.isInstance(value)) { -// File file = (File) value; -// value = readFileToString(file); -// file.delete(); -// } else if (byte[].class.isInstance(value)) { -// value = new String((byte[]) value); -// } -// -// TestMgr.check(content, value, error); -// }); -// -// return future; -// } -// -// private ReadStreamPart templateGet(String methodPath) { -// return restTemplate -// .getForObject(prefix + "/" + methodPath + "?content={content}", -// ReadStreamPart.class, -// content); -// } -// -// public void runRest() { -// futures.add(checkFile(intf.tempFileEntity(content))); -// futures.add(checkFuture(templateGet("tempFileEntity").saveAsBytes())); -// -// futures.add(checkFile(intf.tempFilePart(content))); -// futures.add(checkFuture(templateGet("tempFilePart").saveAsString())); -// -// futures.add(checkFile(intf.file(content))); -// futures.add(checkFuture(templateGet("file").saveAsString())); -// -// { -// ReadStreamPart part = intf.chineseAndSpaceFile(content); -// TestMgr.check("测 试.test.txt", part.getSubmittedFileName()); -// futures.add(checkFile(part)); -// -// part = templateGet("chineseAndSpaceFile"); -// TestMgr.check("测 试.test.txt", part.getSubmittedFileName()); -// futures.add(checkFuture(part.saveAsString())); -// } -// -// futures.add(checkFile(intf.resource(content))); -// futures.add(checkFuture(templateGet("resource").saveAsString())); -// -// futures.add(checkFile(intf.entityResource(content))); -// futures.add(checkFuture(templateGet("entityResource").saveAsString())); -// -// futures.add(checkFile(intf.entityInputStream(content))); -// futures.add(checkFuture(templateGet("entityInputStream").saveAsString())); -// -// futures.add(checkFile(intf.bytes(content))); -// futures.add(checkFuture(templateGet("bytes").saveAsString())); -// -// futures.add(checkFile(intf.netInputStream(content))); -// futures.add(checkFuture(templateGet("netInputStream").saveAsString())); -// -// try { -// CompletableFuture -// .allOf(futures.toArray(new CompletableFuture[futures.size()])) -// .get(); -// } catch (InterruptedException | ExecutionException e1) { -// TestMgr.failed("test download failed.", e1); -// } -// } -} diff --git a/integration-tests/it-edge/src/main/resources/microservice.yaml b/integration-tests/it-edge/src/main/resources/microservice.yaml index e5c6ea928..888f1dd6f 100644 --- a/integration-tests/it-edge/src/main/resources/microservice.yaml +++ b/integration-tests/it-edge/src/main/resources/microservice.yaml @@ -25,27 +25,54 @@ servicecomb: default: loadbalance service: it-auth: loadbalance -# this is not the majority, not testing this feature by default -# rest: -# parameter: -# decodeAsObject: true -# query: -# ignoreDefaultValue: true -# emptyAsNull: true + # this is not the majority, not testing this feature by default + # rest: + # parameter: + # decodeAsObject: true + # query: + # ignoreDefaultValue: true + # emptyAsNull: true operation: it-producer: + download: + slowInputStream: + transport: rest + defaultJsonValueJaxrs: + jsonInput: + transport: rest + queryInput: + transport: rest + it-producer-h2: + download: + slowInputStream: + transport: rest + defaultJsonValueJaxrs: + jsonInput: + transport: rest + queryInput: + transport: rest + it-producer-h2c: + download: + slowInputStream: + transport: rest defaultJsonValueJaxrs: jsonInput: transport: rest queryInput: transport: rest it-producer-deploy-springboot2-servlet: + download: + slowInputStream: + transport: rest defaultJsonValueJaxrs: jsonInput: transport: rest queryInput: transport: rest it-producer-deploy-springboot2-standalone: + download: + slowInputStream: + transport: rest defaultJsonValueJaxrs: jsonInput: transport: rest diff --git a/integration-tests/it-producer/src/main/java/org/apache/servicecomb/it/schema/DownloadSchema.java b/integration-tests/it-producer/src/main/java/org/apache/servicecomb/it/schema/DownloadSchema.java index 651caaccb..b1123b60f 100644 --- a/integration-tests/it-producer/src/main/java/org/apache/servicecomb/it/schema/DownloadSchema.java +++ b/integration-tests/it-producer/src/main/java/org/apache/servicecomb/it/schema/DownloadSchema.java @@ -20,6 +20,9 @@ import java.io.File; import java.io.IOException; import java.io.InputStream; +import java.io.PipedInputStream; +import java.io.PipedOutputStream; +import java.lang.Thread.State; import java.net.HttpURLConnection; import java.net.URI; import java.net.URL; @@ -27,15 +30,18 @@ import java.net.URLEncoder; import java.nio.charset.StandardCharsets; import java.util.UUID; +import java.util.concurrent.TimeUnit; import javax.servlet.http.Part; import org.apache.commons.io.FileUtils; +import org.apache.commons.io.IOUtils; import org.apache.http.entity.StringEntity; import org.apache.http.impl.bootstrap.HttpServer; import org.apache.http.impl.bootstrap.ServerBootstrap; import org.apache.servicecomb.core.BootListener; import org.apache.servicecomb.foundation.common.part.FilePart; +import org.apache.servicecomb.it.ITUtils; import org.apache.servicecomb.provider.rest.common.RestSchema; import org.springframework.core.io.ByteArrayResource; import org.springframework.core.io.Resource; @@ -49,7 +55,7 @@ import io.swagger.annotations.ApiResponses; @RestSchema(schemaId = "download") -@RequestMapping(path = "/base/v1//download") +@RequestMapping(path = "/v1/download") public class DownloadSchema implements BootListener { File tempDir = new File("target/downloadTemp"); @@ -194,4 +200,45 @@ public String getFilename() { conn.disconnect(); return responseEntity; } + + private Thread slowInputStreamThread; + + @GetMapping(path = "/waitSlowInputStreamClosed") + public void waitSlowInputStreamClosed() { + while (!slowInputStreamThread.getState().equals(State.TERMINATED)) { + ITUtils.forceWait(TimeUnit.MILLISECONDS, 500); + } + } + + @ApiResponses({@ApiResponse(code = 200, response = File.class, message = "")}) + @GetMapping(path = "/slowInputStream") + public ResponseEntity<InputStream> slowInputStream() throws IOException { + PipedInputStream in = new PipedInputStream(); + PipedOutputStream out = new PipedOutputStream(); + in.connect(out); + + slowInputStreamThread = new Thread(() -> { + Thread.currentThread().setName("download thread"); + byte[] bytes = "1".getBytes(); + for (; ; ) { + try { + out.write(bytes); + out.flush(); + Thread.sleep(1000); + } catch (Throwable e) { + break; + } + } + + IOUtils.closeQuietly(out); + }); + slowInputStreamThread.start(); + + ResponseEntity<InputStream> responseEntity = ResponseEntity + .ok() + .header(HttpHeaders.CONTENT_TYPE, MediaType.TEXT_PLAIN_VALUE) + .header(HttpHeaders.CONTENT_DISPOSITION, "attachment;filename=slowInputStream.txt") + .body(in); + return responseEntity; + } } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > [vertx] downloading, client disconnect first, should close WriteStream right > now. > --------------------------------------------------------------------------------- > > Key: SCB-1001 > URL: https://issues.apache.org/jira/browse/SCB-1001 > Project: Apache ServiceComb > Issue Type: Sub-task > Components: Java-Chassis > Reporter: wujimin > Assignee: wujimin > Priority: Major > Fix For: java-chassis-1.1.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)