[ 
https://issues.apache.org/jira/browse/CXF-9192?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Oleksandr Zhelezniak updated CXF-9192:
--------------------------------------
    Description: 
Looks like web-client is using httpv2 concurrently unsafely to read the data 
from the input stream.

 

Module: *cxf-rt-transports-http*

inside the method: 
org.apache.cxf.transport.http.HttpClientHTTPConduit.HttpClientWrappedOutputStream#getInputStream
 

It checks if the body content exists by calling java.io.InputStream#available

But it doesn't block to wait for incoming byte buffers, and it checks for 
available bytes, but the method doesn't really wait for the bytes to appear in 
the blocking queue, it just checks if it's empty or not

jdk.internal.net.http.ResponseSubscribers.HttpResponseInputStream#available

(when "buffers.isEmpty" == true)

Sometimes you might get lucky, and the buffer will appear in the blocking queue 
before you check the input.available() sometimes it's not.

 

This issue can be reproduced when the server answers with (status code:
sc >= 300 && sc < 500)
org.apache.cxf.transport.http.HttpClientHTTPConduit.HttpClientWrappedOutputStream#getInputStream
and when the header "content-length" is absent. 

 

You can reproduce it with this code:

(apache-cxf + junit5 and jdk11+)

{code:java}
package org.talend.components.stewardship.source;

import static java.util.concurrent.TimeUnit.MILLISECONDS;

import jakarta.ws.rs.core.Response;

import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.lang.reflect.Field;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpClient.Version;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodyHandlers;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

import org.apache.cxf.jaxrs.client.ClientConfiguration;
import org.apache.cxf.jaxrs.client.WebClient;
import org.apache.cxf.transport.http.HTTPConduit;
import org.apache.cxf.transport.http.HTTPTransportFactory;
import org.junit.jupiter.api.Test;

import lombok.SneakyThrows;

class ReproduceConcurrentIssue {

    @SneakyThrows
    private static boolean getCxf(final String url) {
        System.setProperty("org.apache.cxf.transport.http.forceURLConnection", 
"false");

        boolean isUsingHttpUrlConnection = 
HTTPTransportFactory.isForceURLConnectionConduit();
        if (isUsingHttpUrlConnection) {
            Field forceURLConnectionConduitField =
                    
HTTPTransportFactory.class.getDeclaredField("forceURLConnectionConduit");
            forceURLConnectionConduitField.setAccessible(true);
            forceURLConnectionConduitField.setBoolean((Object) null, false);
        }

        WebClient webClient = WebClient.create(url);
        ClientConfiguration config = WebClient.getConfig(webClient);
        final Map<String, Object> requestContext = config.getRequestContext();
        requestContext.put(HTTPConduit.FORCE_HTTP_VERSION, "2");
        requestContext.put("http.redirect.max.same.uri.count", 3);
        requestContext.put("http.redirect.relative.uri", "true");
        requestContext.put("http.redirect.same.host.only", "false");
        final HTTPConduit conduit = 
WebClient.getConfig(webClient).getHttpConduit();
        // this enables redirects so the content-length header won't exists
        conduit.getClient().setAutoRedirect(true);

        final Response response = webClient.invoke("GET", null);

        InputStream inputStream = (InputStream) response.getEntity();
        if (inputStream == null) {
            return false;
        }

        try (inputStream; ByteArrayOutputStream baos = new 
ByteArrayOutputStream()) {
            byte[] buffer = new byte[1024];
            int bytesRead;

            while ((bytesRead = inputStream.read(buffer)) != -1) {
                baos.write(buffer, 0, bytesRead);
            }

            return baos.toByteArray().length > 0;
        }
    }

    @SneakyThrows
    private static boolean getJdk(final String url) {
        final HttpClient client = HttpClient.newBuilder()
                .version(Version.HTTP_2)
                .build();
        HttpRequest request = HttpRequest.newBuilder()
                .uri(new URI(url))
                .GET()
                .build();

        CompletableFuture<HttpResponse<InputStream>> future = client.sendAsync(
                request,
                BodyHandlers.ofInputStream());
        final HttpResponse<InputStream> httpResponse = future.get(120_000, 
MILLISECONDS);
        final InputStream body = httpResponse.body();
        if (body.available() <= 0) {
            try (body) {
                return false;
            }
        } else {
            return true;
        }
    }

    @Test
    void testProblem() {
        /* try to increase this one if you can't reproduce */
        final int tryAmount = 100;

        int available = 0;
        int notAvailable = 0;
        for (int i = 0; i < tryAmount; i++) {
            if (getCxf("https://www.cloudflare.com/non-existent-path";)) {
                available++;
            } else {
                notAvailable++;
            }
        }
        System.err.println("Response body was empty: " + notAvailable);
        System.out.println("Response body is available: " + available);
    }
}
 {code}

  was:
Looks like web-client is using httpv2 concurrently unsafely to read the data 
from the input stream.

 

Module: *cxf-rt-transports-http*

inside the method: 
org.apache.cxf.transport.http.HttpClientHTTPConduit.HttpClientWrappedOutputStream#getInputStream
 

It checks if the body content exists by calling java.io.InputStream#available

But it doesn't block to wait for incoming byte buffers, and it checks for 
available bytes, but the method doesn't really wait for the bytes to appear in 
the blocking queue, it just checks if it's empty or not

jdk.internal.net.http.ResponseSubscribers.HttpResponseInputStream#available

(when "buffers.isEmpty" == true)

Sometimes you might get lucky, and the buffer will appear in the blocking queue 
before you check the input.available() sometimes it's not.

 

This issue can be reproduced when the server answers with (status code:
sc >= 300 && sc < 500)
org.apache.cxf.transport.http.HttpClientHTTPConduit.HttpClientWrappedOutputStream#getInputStream
and when the header "content-length" is absent. 

 

You can reproduce it with this code:

(apache-cxf + junit5 and jdk11+)

{code:java}
package org.talend.components.stewardship.source;

import static java.util.concurrent.TimeUnit.MILLISECONDS;

import jakarta.ws.rs.core.Response;

import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.lang.reflect.Field;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpClient.Version;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodyHandlers;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

import org.apache.cxf.jaxrs.client.ClientConfiguration;
import org.apache.cxf.jaxrs.client.WebClient;
import org.apache.cxf.transport.http.HTTPConduit;
import org.apache.cxf.transport.http.HTTPTransportFactory;
import org.junit.jupiter.api.Test;

import lombok.SneakyThrows;

class ReproduceConcurrentIssue {

    @SneakyThrows
    private static boolean getCxf(final String url) {
        System.setProperty("org.apache.cxf.transport.http.forceURLConnection", 
"false");

        boolean isUsingHttpUrlConnection = 
HTTPTransportFactory.isForceURLConnectionConduit();
        if (isUsingHttpUrlConnection) {
            Field forceURLConnectionConduitField =
                    
HTTPTransportFactory.class.getDeclaredField("forceURLConnectionConduit");
            forceURLConnectionConduitField.setAccessible(true);
            forceURLConnectionConduitField.setBoolean((Object) null, false);
        }

        WebClient webClient = WebClient.create(url);
        ClientConfiguration config = WebClient.getConfig(webClient);
        final Map<String, Object> requestContext = config.getRequestContext();
        requestContext.put(HTTPConduit.FORCE_HTTP_VERSION, "2");
        requestContext.put("http.redirect.max.same.uri.count", 3);
        requestContext.put("http.redirect.relative.uri", "true");
        requestContext.put("http.redirect.same.host.only", "false");
        final HTTPConduit conduit = 
WebClient.getConfig(webClient).getHttpConduit();
        conduit.getClient().setAutoRedirect(true);

        final Response response = webClient.invoke("GET", null);

        InputStream inputStream = (InputStream) response.getEntity();
        if (inputStream == null) {
            return false;
        }

        try (inputStream; ByteArrayOutputStream baos = new 
ByteArrayOutputStream()) {
            byte[] buffer = new byte[1024];
            int bytesRead;

            while ((bytesRead = inputStream.read(buffer)) != -1) {
                baos.write(buffer, 0, bytesRead);
            }

            return baos.toByteArray().length > 0;
        }
    }

    @SneakyThrows
    private static boolean getJdk(final String url) {
        final HttpClient client = HttpClient.newBuilder()
                .version(Version.HTTP_2)
                .build();
        HttpRequest request = HttpRequest.newBuilder()
                .uri(new URI(url))
                .GET()
                .build();

        CompletableFuture<HttpResponse<InputStream>> future = client.sendAsync(
                request,
                BodyHandlers.ofInputStream());
        final HttpResponse<InputStream> httpResponse = future.get(120_000, 
MILLISECONDS);
        final InputStream body = httpResponse.body();
        if (body.available() <= 0) {
            try (body) {
                return false;
            }
        } else {
            return true;
        }
    }

    @Test
    void testProblem() {
        /* try to increase this one if you can't reproduce */
        final int tryAmount = 100;

        int available = 0;
        int notAvailable = 0;
        for (int i = 0; i < tryAmount; i++) {
            if (getCxf("https://www.cloudflare.com/non-existent-path";)) {
                available++;
            } else {
                notAvailable++;
            }
        }
        System.err.println("Response body was empty: " + notAvailable);
        System.out.println("Response body is available: " + available);
    }
}
 {code}


> Unsafe concurrent read of body by web-client when protocol HTTPv2
> -----------------------------------------------------------------
>
>                 Key: CXF-9192
>                 URL: https://issues.apache.org/jira/browse/CXF-9192
>             Project: CXF
>          Issue Type: Bug
>          Components: Transports
>    Affects Versions: 4.1.3
>         Environment: JDK11+
>            Reporter: Oleksandr Zhelezniak
>            Priority: Major
>
> Looks like web-client is using httpv2 concurrently unsafely to read the data 
> from the input stream.
>  
> Module: *cxf-rt-transports-http*
> inside the method: 
> org.apache.cxf.transport.http.HttpClientHTTPConduit.HttpClientWrappedOutputStream#getInputStream
>  
> It checks if the body content exists by calling java.io.InputStream#available
> But it doesn't block to wait for incoming byte buffers, and it checks for 
> available bytes, but the method doesn't really wait for the bytes to appear 
> in the blocking queue, it just checks if it's empty or not
> jdk.internal.net.http.ResponseSubscribers.HttpResponseInputStream#available
> (when "buffers.isEmpty" == true)
> Sometimes you might get lucky, and the buffer will appear in the blocking 
> queue before you check the input.available() sometimes it's not.
>  
> This issue can be reproduced when the server answers with (status code:
> sc >= 300 && sc < 500)
> org.apache.cxf.transport.http.HttpClientHTTPConduit.HttpClientWrappedOutputStream#getInputStream
> and when the header "content-length" is absent. 
>  
> You can reproduce it with this code:
> (apache-cxf + junit5 and jdk11+)
> {code:java}
> package org.talend.components.stewardship.source;
> import static java.util.concurrent.TimeUnit.MILLISECONDS;
> import jakarta.ws.rs.core.Response;
> import java.io.ByteArrayOutputStream;
> import java.io.InputStream;
> import java.lang.reflect.Field;
> import java.net.URI;
> import java.net.http.HttpClient;
> import java.net.http.HttpClient.Version;
> import java.net.http.HttpRequest;
> import java.net.http.HttpResponse;
> import java.net.http.HttpResponse.BodyHandlers;
> import java.util.Map;
> import java.util.concurrent.CompletableFuture;
> import org.apache.cxf.jaxrs.client.ClientConfiguration;
> import org.apache.cxf.jaxrs.client.WebClient;
> import org.apache.cxf.transport.http.HTTPConduit;
> import org.apache.cxf.transport.http.HTTPTransportFactory;
> import org.junit.jupiter.api.Test;
> import lombok.SneakyThrows;
> class ReproduceConcurrentIssue {
>     @SneakyThrows
>     private static boolean getCxf(final String url) {
>         
> System.setProperty("org.apache.cxf.transport.http.forceURLConnection", 
> "false");
>         boolean isUsingHttpUrlConnection = 
> HTTPTransportFactory.isForceURLConnectionConduit();
>         if (isUsingHttpUrlConnection) {
>             Field forceURLConnectionConduitField =
>                     
> HTTPTransportFactory.class.getDeclaredField("forceURLConnectionConduit");
>             forceURLConnectionConduitField.setAccessible(true);
>             forceURLConnectionConduitField.setBoolean((Object) null, false);
>         }
>         WebClient webClient = WebClient.create(url);
>         ClientConfiguration config = WebClient.getConfig(webClient);
>         final Map<String, Object> requestContext = config.getRequestContext();
>         requestContext.put(HTTPConduit.FORCE_HTTP_VERSION, "2");
>         requestContext.put("http.redirect.max.same.uri.count", 3);
>         requestContext.put("http.redirect.relative.uri", "true");
>         requestContext.put("http.redirect.same.host.only", "false");
>         final HTTPConduit conduit = 
> WebClient.getConfig(webClient).getHttpConduit();
>         // this enables redirects so the content-length header won't exists
>         conduit.getClient().setAutoRedirect(true);
>         final Response response = webClient.invoke("GET", null);
>         InputStream inputStream = (InputStream) response.getEntity();
>         if (inputStream == null) {
>             return false;
>         }
>         try (inputStream; ByteArrayOutputStream baos = new 
> ByteArrayOutputStream()) {
>             byte[] buffer = new byte[1024];
>             int bytesRead;
>             while ((bytesRead = inputStream.read(buffer)) != -1) {
>                 baos.write(buffer, 0, bytesRead);
>             }
>             return baos.toByteArray().length > 0;
>         }
>     }
>     @SneakyThrows
>     private static boolean getJdk(final String url) {
>         final HttpClient client = HttpClient.newBuilder()
>                 .version(Version.HTTP_2)
>                 .build();
>         HttpRequest request = HttpRequest.newBuilder()
>                 .uri(new URI(url))
>                 .GET()
>                 .build();
>         CompletableFuture<HttpResponse<InputStream>> future = 
> client.sendAsync(
>                 request,
>                 BodyHandlers.ofInputStream());
>         final HttpResponse<InputStream> httpResponse = future.get(120_000, 
> MILLISECONDS);
>         final InputStream body = httpResponse.body();
>         if (body.available() <= 0) {
>             try (body) {
>                 return false;
>             }
>         } else {
>             return true;
>         }
>     }
>     @Test
>     void testProblem() {
>         /* try to increase this one if you can't reproduce */
>         final int tryAmount = 100;
>         int available = 0;
>         int notAvailable = 0;
>         for (int i = 0; i < tryAmount; i++) {
>             if (getCxf("https://www.cloudflare.com/non-existent-path";)) {
>                 available++;
>             } else {
>                 notAvailable++;
>             }
>         }
>         System.err.println("Response body was empty: " + notAvailable);
>         System.out.println("Response body is available: " + available);
>     }
> }
>  {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to