This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch http-stream
in repository https://gitbox.apache.org/repos/asf/camel.git

commit d05f0eb81884ac1cc712210574995b2c6897d182
Author: Claus Ibsen <[email protected]>
AuthorDate: Wed Aug 21 12:12:58 2024 +0200

    CAMEL-20938: http producer with disableStreamCache=true should use the raw 
stream as-is
---
 .../apache/camel/component/http/HttpProducer.java  | 16 +++++----
 .../component/http/HttpDisableStreamCacheTest.java | 40 ++++++++++++++++------
 2 files changed, 38 insertions(+), 18 deletions(-)

diff --git 
a/components/camel-http/src/main/java/org/apache/camel/component/http/HttpProducer.java
 
b/components/camel-http/src/main/java/org/apache/camel/component/http/HttpProducer.java
index b8386a1b277..4004e680800 100644
--- 
a/components/camel-http/src/main/java/org/apache/camel/component/http/HttpProducer.java
+++ 
b/components/camel-http/src/main/java/org/apache/camel/component/http/HttpProducer.java
@@ -474,12 +474,16 @@ public class HttpProducer extends DefaultProducer {
      * @throws IOException can be thrown
      */
     protected <T> T executeMethod(HttpHost httpHost, HttpUriRequest 
httpRequest, HttpClientResponseHandler<T> handler)
-            throws IOException {
-        HttpContext localContext = HttpClientContext.create();
+            throws IOException, HttpException {
+        HttpContext localContext;
         if (httpContext != null) {
             localContext = new BasicHttpContext(httpContext);
+        } else {
+            localContext = HttpClientContext.create();
         }
-        return httpClient.execute(httpHost, httpRequest, localContext, 
handler);
+        // execute open that does not automatic close response input-stream 
(this is done in exchange on-completion by Camel)
+        ClassicHttpResponse res = httpClient.executeOpen(httpHost, 
httpRequest, localContext);
+        return handler.handleResponse(res);
     }
 
     /**
@@ -550,10 +554,8 @@ public class HttpProducer extends DefaultProducer {
         } else {
             if (entity.isStreaming()) {
                 if (getEndpoint().isDisableStreamCache()) {
-                    // write to in-memory buffer
-                    ByteArrayOutputStream bos = new 
ByteArrayOutputStream(BUFFER_SIZE);
-                    entity.writeTo(bos);
-                    return bos.toByteArray();
+                    // use the response as-is
+                    return is;
                 } else {
                     int max = 
getEndpoint().getComponent().getResponsePayloadStreamingThreshold();
                     if (max > 0) {
diff --git 
a/components/camel-http/src/test/java/org/apache/camel/component/http/HttpDisableStreamCacheTest.java
 
b/components/camel-http/src/test/java/org/apache/camel/component/http/HttpDisableStreamCacheTest.java
index 19cb4595405..daa89f482d3 100644
--- 
a/components/camel-http/src/test/java/org/apache/camel/component/http/HttpDisableStreamCacheTest.java
+++ 
b/components/camel-http/src/test/java/org/apache/camel/component/http/HttpDisableStreamCacheTest.java
@@ -16,14 +16,18 @@
  */
 package org.apache.camel.component.http;
 
-import org.apache.camel.Exchange;
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.http.handler.BasicValidationHandler;
+import org.apache.camel.util.IOHelper;
 import org.apache.hc.core5.http.impl.bootstrap.HttpServer;
 import org.apache.hc.core5.http.impl.bootstrap.ServerBootstrap;
 import org.junit.jupiter.api.Test;
 
 import static org.apache.camel.http.common.HttpMethods.GET;
-import static org.apache.camel.test.junit5.TestSupport.assertIsInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 
@@ -42,7 +46,6 @@ public class HttpDisableStreamCacheTest extends BaseHttpTest {
 
     @Override
     public void cleanupResources() throws Exception {
-
         if (localServer != null) {
             localServer.stop();
         }
@@ -50,15 +53,30 @@ public class HttpDisableStreamCacheTest extends 
BaseHttpTest {
 
     @Test
     public void httpDisableStreamCache() {
-        Exchange exchange = template.request("http://localhost:";
-                                             + localServer.getLocalPort() + 
"/test/?disableStreamCache=true",
-                exchange1 -> {
-                });
+        Object out = template.requestBody("direct:start", (String) null);
+        assertEquals("camel rocks!", 
context.getTypeConverter().convertTo(String.class, out));
+    }
 
-        byte[] arr = assertIsInstanceOf(byte[].class, 
exchange.getMessage().getBody());
-        assertNotNull(arr);
+    @Override
+    protected RoutesBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start").streamCache("false")
+                        .to("http://localhost:"; + localServer.getLocalPort() + 
"/test/?disableStreamCache=true")
+                        .process(e -> {
+                            InputStream is = (InputStream) 
e.getMessage().getBody();
+                            assertNotNull(is);
 
-        assertEquals("camel rocks!", 
context.getTypeConverter().convertTo(String.class, arr));
-    }
+                            // we can only read the raw http stream once
+                            ByteArrayOutputStream bos = new 
ByteArrayOutputStream();
+                            IOHelper.copy(is, bos);
+
+                            e.setVariable("newBody", bos.toString());
+                        })
+                        .setBody().variable("newBody");
 
+            }
+        };
+    }
 }

Reply via email to