[
https://issues.apache.org/jira/browse/CXF-9192?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Oleksandr Zhelezniak updated CXF-9192:
--------------------------------------
Description:
Looks like web-client is using httpv2 concurrently unsafely to read the data
from the input stream.
Module: *cxf-rt-transports-http*
inside the method:
org.apache.cxf.transport.http.HttpClientHTTPConduit.HttpClientWrappedOutputStream#getInputStream
It checks if the body content exists by calling java.io.InputStream#available
But it doesn't block to wait for incoming byte buffers, and it checks for
available bytes, but the method doesn't really wait for the bytes to appear in
the blocking queue, it just checks if it's empty or not
jdk.internal.net.http.ResponseSubscribers.HttpResponseInputStream#available
(when "buffers.isEmpty" == true)
Sometimes you might get lucky, and the buffer will appear in the blocking queue
before you check the input.available() sometimes it's not.
This issue can be reproduced when the server answers with (status code:
sc >= 300 && sc < 500)
org.apache.cxf.transport.http.HttpClientHTTPConduit.HttpClientWrappedOutputStream#getInputStream
and when the header "content-length" is absent.
You can reproduce it with this code:
(apache-cxf + junit5 and jdk11+)
{code:java}
package org.talend.components.stewardship.source;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import jakarta.ws.rs.core.Response;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.lang.reflect.Field;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpClient.Version;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodyHandlers;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.cxf.jaxrs.client.ClientConfiguration;
import org.apache.cxf.jaxrs.client.WebClient;
import org.apache.cxf.transport.http.HTTPConduit;
import org.apache.cxf.transport.http.HTTPTransportFactory;
import org.junit.jupiter.api.Test;
import lombok.SneakyThrows;
class ReproduceConcurrentIssue {
@SneakyThrows
private static boolean getCxf(final String url) {
System.setProperty("org.apache.cxf.transport.http.forceURLConnection",
"false");
boolean isUsingHttpUrlConnection =
HTTPTransportFactory.isForceURLConnectionConduit();
if (isUsingHttpUrlConnection) {
Field forceURLConnectionConduitField =
HTTPTransportFactory.class.getDeclaredField("forceURLConnectionConduit");
forceURLConnectionConduitField.setAccessible(true);
forceURLConnectionConduitField.setBoolean((Object) null, false);
}
WebClient webClient = WebClient.create(url);
ClientConfiguration config = WebClient.getConfig(webClient);
final Map<String, Object> requestContext = config.getRequestContext();
requestContext.put(HTTPConduit.FORCE_HTTP_VERSION, "2");
requestContext.put("http.redirect.max.same.uri.count", 3);
requestContext.put("http.redirect.relative.uri", "true");
requestContext.put("http.redirect.same.host.only", "false");
final HTTPConduit conduit =
WebClient.getConfig(webClient).getHttpConduit();
// this enables redirects so the content-length header won't exists
conduit.getClient().setAutoRedirect(true);
final Response response = webClient.invoke("GET", null);
InputStream inputStream = (InputStream) response.getEntity();
if (inputStream == null) {
return false;
}
try (inputStream; ByteArrayOutputStream baos = new
ByteArrayOutputStream()) {
byte[] buffer = new byte[1024];
int bytesRead;
while ((bytesRead = inputStream.read(buffer)) != -1) {
baos.write(buffer, 0, bytesRead);
}
return baos.toByteArray().length > 0;
}
}
@SneakyThrows
private static boolean getJdk(final String url) {
final HttpClient client = HttpClient.newBuilder()
.version(Version.HTTP_2)
.build();
HttpRequest request = HttpRequest.newBuilder()
.uri(new URI(url))
.GET()
.build();
CompletableFuture<HttpResponse<InputStream>> future = client.sendAsync(
request,
BodyHandlers.ofInputStream());
final HttpResponse<InputStream> httpResponse = future.get(120_000,
MILLISECONDS);
final InputStream body = httpResponse.body();
if (body.available() <= 0) {
try (body) {
return false;
}
} else {
return true;
}
}
@Test
void testProblem() {
/* try to increase this one if you can't reproduce */
final int tryAmount = 100;
int available = 0;
int notAvailable = 0;
for (int i = 0; i < tryAmount; i++) {
if (getCxf("https://www.cloudflare.com/non-existent-path")) {
available++;
} else {
notAvailable++;
}
}
System.err.println("Response body was empty: " + notAvailable);
System.out.println("Response body is available: " + available);
}
}
{code}
was:
Looks like web-client is using httpv2 concurrently unsafely to read the data
from the input stream.
Module: *cxf-rt-transports-http*
inside the method:
org.apache.cxf.transport.http.HttpClientHTTPConduit.HttpClientWrappedOutputStream#getInputStream
It checks if the body content exists by calling java.io.InputStream#available
But it doesn't block to wait for incoming byte buffers, and it checks for
available bytes, but the method doesn't really wait for the bytes to appear in
the blocking queue, it just checks if it's empty or not
jdk.internal.net.http.ResponseSubscribers.HttpResponseInputStream#available
(when "buffers.isEmpty" == true)
Sometimes you might get lucky, and the buffer will appear in the blocking queue
before you check the input.available() sometimes it's not.
This issue can be reproduced when the server answers with (status code:
sc >= 300 && sc < 500)
org.apache.cxf.transport.http.HttpClientHTTPConduit.HttpClientWrappedOutputStream#getInputStream
and when the header "content-length" is absent.
You can reproduce it with this code:
(apache-cxf + junit5 and jdk11+)
{code:java}
package org.talend.components.stewardship.source;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import jakarta.ws.rs.core.Response;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.lang.reflect.Field;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpClient.Version;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodyHandlers;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.cxf.jaxrs.client.ClientConfiguration;
import org.apache.cxf.jaxrs.client.WebClient;
import org.apache.cxf.transport.http.HTTPConduit;
import org.apache.cxf.transport.http.HTTPTransportFactory;
import org.junit.jupiter.api.Test;
import lombok.SneakyThrows;
class ReproduceConcurrentIssue {
@SneakyThrows
private static boolean getCxf(final String url) {
System.setProperty("org.apache.cxf.transport.http.forceURLConnection",
"false");
boolean isUsingHttpUrlConnection =
HTTPTransportFactory.isForceURLConnectionConduit();
if (isUsingHttpUrlConnection) {
Field forceURLConnectionConduitField =
HTTPTransportFactory.class.getDeclaredField("forceURLConnectionConduit");
forceURLConnectionConduitField.setAccessible(true);
forceURLConnectionConduitField.setBoolean((Object) null, false);
}
WebClient webClient = WebClient.create(url);
ClientConfiguration config = WebClient.getConfig(webClient);
final Map<String, Object> requestContext = config.getRequestContext();
requestContext.put(HTTPConduit.FORCE_HTTP_VERSION, "2");
requestContext.put("http.redirect.max.same.uri.count", 3);
requestContext.put("http.redirect.relative.uri", "true");
requestContext.put("http.redirect.same.host.only", "false");
final HTTPConduit conduit =
WebClient.getConfig(webClient).getHttpConduit();
conduit.getClient().setAutoRedirect(true);
final Response response = webClient.invoke("GET", null);
InputStream inputStream = (InputStream) response.getEntity();
if (inputStream == null) {
return false;
}
try (inputStream; ByteArrayOutputStream baos = new
ByteArrayOutputStream()) {
byte[] buffer = new byte[1024];
int bytesRead;
while ((bytesRead = inputStream.read(buffer)) != -1) {
baos.write(buffer, 0, bytesRead);
}
return baos.toByteArray().length > 0;
}
}
@SneakyThrows
private static boolean getJdk(final String url) {
final HttpClient client = HttpClient.newBuilder()
.version(Version.HTTP_2)
.build();
HttpRequest request = HttpRequest.newBuilder()
.uri(new URI(url))
.GET()
.build();
CompletableFuture<HttpResponse<InputStream>> future = client.sendAsync(
request,
BodyHandlers.ofInputStream());
final HttpResponse<InputStream> httpResponse = future.get(120_000,
MILLISECONDS);
final InputStream body = httpResponse.body();
if (body.available() <= 0) {
try (body) {
return false;
}
} else {
return true;
}
}
@Test
void testProblem() {
/* try to increase this one if you can't reproduce */
final int tryAmount = 100;
int available = 0;
int notAvailable = 0;
for (int i = 0; i < tryAmount; i++) {
if (getCxf("https://www.cloudflare.com/non-existent-path")) {
available++;
} else {
notAvailable++;
}
}
System.err.println("Response body was empty: " + notAvailable);
System.out.println("Response body is available: " + available);
}
}
{code}
> Unsafe concurrent read of body by web-client when protocol HTTPv2
> -----------------------------------------------------------------
>
> Key: CXF-9192
> URL: https://issues.apache.org/jira/browse/CXF-9192
> Project: CXF
> Issue Type: Bug
> Components: Transports
> Affects Versions: 4.1.3
> Environment: JDK11+
> Reporter: Oleksandr Zhelezniak
> Priority: Major
>
> Looks like web-client is using httpv2 concurrently unsafely to read the data
> from the input stream.
>
> Module: *cxf-rt-transports-http*
> inside the method:
> org.apache.cxf.transport.http.HttpClientHTTPConduit.HttpClientWrappedOutputStream#getInputStream
>
> It checks if the body content exists by calling java.io.InputStream#available
> But it doesn't block to wait for incoming byte buffers, and it checks for
> available bytes, but the method doesn't really wait for the bytes to appear
> in the blocking queue, it just checks if it's empty or not
> jdk.internal.net.http.ResponseSubscribers.HttpResponseInputStream#available
> (when "buffers.isEmpty" == true)
> Sometimes you might get lucky, and the buffer will appear in the blocking
> queue before you check the input.available() sometimes it's not.
>
> This issue can be reproduced when the server answers with (status code:
> sc >= 300 && sc < 500)
> org.apache.cxf.transport.http.HttpClientHTTPConduit.HttpClientWrappedOutputStream#getInputStream
> and when the header "content-length" is absent.
>
> You can reproduce it with this code:
> (apache-cxf + junit5 and jdk11+)
> {code:java}
> package org.talend.components.stewardship.source;
> import static java.util.concurrent.TimeUnit.MILLISECONDS;
> import jakarta.ws.rs.core.Response;
> import java.io.ByteArrayOutputStream;
> import java.io.InputStream;
> import java.lang.reflect.Field;
> import java.net.URI;
> import java.net.http.HttpClient;
> import java.net.http.HttpClient.Version;
> import java.net.http.HttpRequest;
> import java.net.http.HttpResponse;
> import java.net.http.HttpResponse.BodyHandlers;
> import java.util.Map;
> import java.util.concurrent.CompletableFuture;
> import org.apache.cxf.jaxrs.client.ClientConfiguration;
> import org.apache.cxf.jaxrs.client.WebClient;
> import org.apache.cxf.transport.http.HTTPConduit;
> import org.apache.cxf.transport.http.HTTPTransportFactory;
> import org.junit.jupiter.api.Test;
> import lombok.SneakyThrows;
> class ReproduceConcurrentIssue {
> @SneakyThrows
> private static boolean getCxf(final String url) {
>
> System.setProperty("org.apache.cxf.transport.http.forceURLConnection",
> "false");
> boolean isUsingHttpUrlConnection =
> HTTPTransportFactory.isForceURLConnectionConduit();
> if (isUsingHttpUrlConnection) {
> Field forceURLConnectionConduitField =
>
> HTTPTransportFactory.class.getDeclaredField("forceURLConnectionConduit");
> forceURLConnectionConduitField.setAccessible(true);
> forceURLConnectionConduitField.setBoolean((Object) null, false);
> }
> WebClient webClient = WebClient.create(url);
> ClientConfiguration config = WebClient.getConfig(webClient);
> final Map<String, Object> requestContext = config.getRequestContext();
> requestContext.put(HTTPConduit.FORCE_HTTP_VERSION, "2");
> requestContext.put("http.redirect.max.same.uri.count", 3);
> requestContext.put("http.redirect.relative.uri", "true");
> requestContext.put("http.redirect.same.host.only", "false");
> final HTTPConduit conduit =
> WebClient.getConfig(webClient).getHttpConduit();
> // this enables redirects so the content-length header won't exists
> conduit.getClient().setAutoRedirect(true);
> final Response response = webClient.invoke("GET", null);
> InputStream inputStream = (InputStream) response.getEntity();
> if (inputStream == null) {
> return false;
> }
> try (inputStream; ByteArrayOutputStream baos = new
> ByteArrayOutputStream()) {
> byte[] buffer = new byte[1024];
> int bytesRead;
> while ((bytesRead = inputStream.read(buffer)) != -1) {
> baos.write(buffer, 0, bytesRead);
> }
> return baos.toByteArray().length > 0;
> }
> }
> @SneakyThrows
> private static boolean getJdk(final String url) {
> final HttpClient client = HttpClient.newBuilder()
> .version(Version.HTTP_2)
> .build();
> HttpRequest request = HttpRequest.newBuilder()
> .uri(new URI(url))
> .GET()
> .build();
> CompletableFuture<HttpResponse<InputStream>> future =
> client.sendAsync(
> request,
> BodyHandlers.ofInputStream());
> final HttpResponse<InputStream> httpResponse = future.get(120_000,
> MILLISECONDS);
> final InputStream body = httpResponse.body();
> if (body.available() <= 0) {
> try (body) {
> return false;
> }
> } else {
> return true;
> }
> }
> @Test
> void testProblem() {
> /* try to increase this one if you can't reproduce */
> final int tryAmount = 100;
> int available = 0;
> int notAvailable = 0;
> for (int i = 0; i < tryAmount; i++) {
> if (getCxf("https://www.cloudflare.com/non-existent-path")) {
> available++;
> } else {
> notAvailable++;
> }
> }
> System.err.println("Response body was empty: " + notAvailable);
> System.out.println("Response body is available: " + available);
> }
> }
> {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)