[
https://issues.apache.org/jira/browse/CXF-9192?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Andriy Redko resolved CXF-9192.
-------------------------------
Resolution: Fixed
> Inconsistent "best effort" read of body by web-client when protocol is HTTP/2
> -----------------------------------------------------------------------------
>
> Key: CXF-9192
> URL: https://issues.apache.org/jira/browse/CXF-9192
> Project: CXF
> Issue Type: Bug
> Components: Transports
> Affects Versions: 4.1.3
> Environment: JDK11+
> Reporter: Oleksandr Zhelezniak
> Assignee: Andriy Redko
> Priority: Major
> Fix For: 4.1.5, 3.6.10, 4.0.11
>
>
> Looks like web-client is using httpv2 concurrently unsafely to read the data
> from the input stream.
>
> Module: *cxf-rt-transports-http*
> inside the method:
> org.apache.cxf.transport.http.HttpClientHTTPConduit.HttpClientWrappedOutputStream#getInputStream
>
> It checks if the body content exists by calling java.io.InputStream#available
> But it doesn't block to wait for incoming byte buffers, and it checks for
> available bytes, but the method doesn't really wait for the bytes to appear
> in the blocking queue, it just checks if it's empty or not
> jdk.internal.net.http.ResponseSubscribers.HttpResponseInputStream#available
> (when "buffers.isEmpty" == true)
> Sometimes you might get lucky, and the buffer will appear in the blocking
> queue before you check the input.available() sometimes it's not.
>
> This issue can be reproduced when the server answers with (status code:
> sc >= 300 && sc < 500)
> org.apache.cxf.transport.http.HttpClientHTTPConduit.HttpClientWrappedOutputStream#getInputStream
> and when the header "content-length" is absent.
>
> You can reproduce it with this code:
> (apache-cxf + junit5 and jdk11+)
> {code:java}
> package org.talend.components.stewardship.source;
> import static java.util.concurrent.TimeUnit.MILLISECONDS;
> import jakarta.ws.rs.core.Response;
> import java.io.ByteArrayOutputStream;
> import java.io.InputStream;
> import java.lang.reflect.Field;
> import java.net.URI;
> import java.net.http.HttpClient;
> import java.net.http.HttpClient.Version;
> import java.net.http.HttpRequest;
> import java.net.http.HttpResponse;
> import java.net.http.HttpResponse.BodyHandlers;
> import java.util.Map;
> import java.util.concurrent.CompletableFuture;
> import org.apache.cxf.jaxrs.client.ClientConfiguration;
> import org.apache.cxf.jaxrs.client.WebClient;
> import org.apache.cxf.transport.http.HTTPConduit;
> import org.apache.cxf.transport.http.HTTPTransportFactory;
> import org.junit.jupiter.api.Test;
> import lombok.SneakyThrows;
> class ReproduceConcurrentIssue {
> @SneakyThrows
> private static boolean getCxf(final String url) {
>
> System.setProperty("org.apache.cxf.transport.http.forceURLConnection",
> "false");
> boolean isUsingHttpUrlConnection =
> HTTPTransportFactory.isForceURLConnectionConduit();
> if (isUsingHttpUrlConnection) {
> Field forceURLConnectionConduitField =
>
> HTTPTransportFactory.class.getDeclaredField("forceURLConnectionConduit");
> forceURLConnectionConduitField.setAccessible(true);
> forceURLConnectionConduitField.setBoolean((Object) null, false);
> }
> WebClient webClient = WebClient.create(url);
> ClientConfiguration config = WebClient.getConfig(webClient);
> final Map<String, Object> requestContext = config.getRequestContext();
> requestContext.put(HTTPConduit.FORCE_HTTP_VERSION, "2");
> requestContext.put("http.redirect.max.same.uri.count", 3);
> requestContext.put("http.redirect.relative.uri", "true");
> requestContext.put("http.redirect.same.host.only", "false");
> final HTTPConduit conduit =
> WebClient.getConfig(webClient).getHttpConduit();
> // this enables redirects so the content-length header won't exists
> conduit.getClient().setAutoRedirect(true);
> final Response response = webClient.invoke("GET", null);
> InputStream inputStream = (InputStream) response.getEntity();
> if (inputStream == null) {
> return false;
> }
> try (inputStream; ByteArrayOutputStream baos = new
> ByteArrayOutputStream()) {
> byte[] buffer = new byte[1024];
> int bytesRead;
> while ((bytesRead = inputStream.read(buffer)) != -1) {
> baos.write(buffer, 0, bytesRead);
> }
> return baos.toByteArray().length > 0;
> }
> }
> @SneakyThrows
> private static boolean getJdk(final String url) {
> final HttpClient client = HttpClient.newBuilder()
> .version(Version.HTTP_2)
> .build();
> HttpRequest request = HttpRequest.newBuilder()
> .uri(new URI(url))
> .GET()
> .build();
> CompletableFuture<HttpResponse<InputStream>> future =
> client.sendAsync(
> request,
> BodyHandlers.ofInputStream());
> final HttpResponse<InputStream> httpResponse = future.get(120_000,
> MILLISECONDS);
> final InputStream body = httpResponse.body();
> if (body.available() <= 0) {
> try (body) {
> return false;
> }
> } else {
> return true;
> }
> }
> @Test
> void testProblem() {
> /* try to increase this one if you can't reproduce */
> final int tryAmount = 100;
> int available = 0;
> int notAvailable = 0;
> for (int i = 0; i < tryAmount; i++) {
> if (getCxf("https://www.cloudflare.com/non-existent-path")) {
> available++;
> } else {
> notAvailable++;
> }
> }
> System.err.println("Response body was empty: " + notAvailable);
> System.out.println("Response body is available: " + available);
> }
> }
> {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)