This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch http-stream in repository https://gitbox.apache.org/repos/asf/camel.git
commit d05f0eb81884ac1cc712210574995b2c6897d182
Author: Claus Ibsen <[email protected]>
AuthorDate: Wed Aug 21 12:12:58 2024 +0200

    CAMEL-20938: http producer with disableStreamCache=true should use the raw stream as-is
---
 .../apache/camel/component/http/HttpProducer.java  | 16 +++++----
 .../component/http/HttpDisableStreamCacheTest.java | 40 ++++++++++++++++------
 2 files changed, 38 insertions(+), 18 deletions(-)

diff --git a/components/camel-http/src/main/java/org/apache/camel/component/http/HttpProducer.java b/components/camel-http/src/main/java/org/apache/camel/component/http/HttpProducer.java
index b8386a1b277..4004e680800 100644
--- a/components/camel-http/src/main/java/org/apache/camel/component/http/HttpProducer.java
+++ b/components/camel-http/src/main/java/org/apache/camel/component/http/HttpProducer.java
@@ -474,12 +474,16 @@ public class HttpProducer extends DefaultProducer {
      * @throws IOException can be thrown
      */
     protected <T> T executeMethod(HttpHost httpHost, HttpUriRequest httpRequest, HttpClientResponseHandler<T> handler)
-            throws IOException {
-        HttpContext localContext = HttpClientContext.create();
+            throws IOException, HttpException {
+        HttpContext localContext;
         if (httpContext != null) {
             localContext = new BasicHttpContext(httpContext);
+        } else {
+            localContext = HttpClientContext.create();
         }
-        return httpClient.execute(httpHost, httpRequest, localContext, handler);
+        // execute open that does not automatic close response input-stream (this is done in exchange on-completion by Camel)
+        ClassicHttpResponse res = httpClient.executeOpen(httpHost, httpRequest, localContext);
+        return handler.handleResponse(res);
     }
 
     /**
@@ -550,10 +554,8 @@ public class HttpProducer extends DefaultProducer {
             } else {
                 if (entity.isStreaming()) {
                     if (getEndpoint().isDisableStreamCache()) {
-                        // write to in-memory buffer
-                        ByteArrayOutputStream bos = new ByteArrayOutputStream(BUFFER_SIZE);
-                        entity.writeTo(bos);
-                        return bos.toByteArray();
+                        // use the response as-is
+                        return is;
                     } else {
                         int max = getEndpoint().getComponent().getResponsePayloadStreamingThreshold();
                         if (max > 0) {
diff --git a/components/camel-http/src/test/java/org/apache/camel/component/http/HttpDisableStreamCacheTest.java b/components/camel-http/src/test/java/org/apache/camel/component/http/HttpDisableStreamCacheTest.java
index 19cb4595405..daa89f482d3 100644
--- a/components/camel-http/src/test/java/org/apache/camel/component/http/HttpDisableStreamCacheTest.java
+++ b/components/camel-http/src/test/java/org/apache/camel/component/http/HttpDisableStreamCacheTest.java
@@ -16,14 +16,18 @@
  */
 package org.apache.camel.component.http;
 
-import org.apache.camel.Exchange;
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.http.handler.BasicValidationHandler;
+import org.apache.camel.util.IOHelper;
 import org.apache.hc.core5.http.impl.bootstrap.HttpServer;
 import org.apache.hc.core5.http.impl.bootstrap.ServerBootstrap;
 import org.junit.jupiter.api.Test;
 
 import static org.apache.camel.http.common.HttpMethods.GET;
-import static org.apache.camel.test.junit5.TestSupport.assertIsInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 
@@ -42,7 +46,6 @@ public class HttpDisableStreamCacheTest extends BaseHttpTest {
 
     @Override
     public void cleanupResources() throws Exception {
-
         if (localServer != null) {
             localServer.stop();
         }
@@ -50,15 +53,30 @@ public class HttpDisableStreamCacheTest extends BaseHttpTest {
 
     @Test
     public void httpDisableStreamCache() {
-        Exchange exchange = template.request("http://localhost:"
-                                             + localServer.getLocalPort() + "/test/?disableStreamCache=true",
-                exchange1 -> {
-                });
+        Object out = template.requestBody("direct:start", (String) null);
+        assertEquals("camel rocks!", context.getTypeConverter().convertTo(String.class, out));
+    }
 
-        byte[] arr = assertIsInstanceOf(byte[].class, exchange.getMessage().getBody());
-        assertNotNull(arr);
+    @Override
+    protected RoutesBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start").streamCache("false")
+                        .to("http://localhost:" + localServer.getLocalPort() + "/test/?disableStreamCache=true")
+                        .process(e -> {
+                            InputStream is = (InputStream) e.getMessage().getBody();
+                            assertNotNull(is);
 
-        assertEquals("camel rocks!", context.getTypeConverter().convertTo(String.class, arr));
-    }
+                            // we can only read the raw http stream once
+                            ByteArrayOutputStream bos = new ByteArrayOutputStream();
+                            IOHelper.copy(is, bos);
+
+                            e.setVariable("newBody", bos.toString());
+                        })
+                        .setBody().variable("newBody");
+            }
+        };
+    }
 
 }
