Oleksandr Zhelezniak created CXF-9192:
-----------------------------------------
Summary: Unsafe concurrent read of body by web-client when
protocol HTTPv2
Key: CXF-9192
URL: https://issues.apache.org/jira/browse/CXF-9192
Project: CXF
Issue Type: Bug
Components: Transports
Affects Versions: 4.1.3
Environment: JDK11+
Reporter: Oleksandr Zhelezniak
It looks like the web client reads the response body from the input stream in a
way that is not safe under concurrency when HTTP/2 is used.
Module: *cxf-rt-transports-http*
inside the method:
org.apache.cxf.transport.http.HttpClientHTTPConduit.HttpClientWrappedOutputStream#getInputStream
It checks whether the body content exists by calling java.io.InputStream#available.
However, that call does not block to wait for incoming byte buffers: it does not
wait for bytes to appear in the blocking queue, it only checks whether the queue
is currently empty. See
jdk.internal.net.http.ResponseSubscribers.HttpResponseInputStream#available
(when "buffers.isEmpty" == true).
Sometimes you get lucky and a buffer appears in the blocking queue before
input.available() is checked; sometimes it does not.
This issue can be reproduced when the server answers with a status code in the
range (sc >= 300 && sc < 500), see
org.apache.cxf.transport.http.HttpClientHTTPConduit.HttpClientWrappedOutputStream#getInputStream,
and when the "content-length" header is absent.
You can reproduce it with this code
(requires apache-cxf, JUnit 5, Lombok and JDK 11+):
{code:java}
package org.talend.components.stewardship.source;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import jakarta.ws.rs.core.Response;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.lang.reflect.Field;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpClient.Version;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodyHandlers;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.cxf.jaxrs.client.ClientConfiguration;
import org.apache.cxf.jaxrs.client.WebClient;
import org.apache.cxf.transport.http.HTTPConduit;
import org.apache.cxf.transport.http.HTTPTransportFactory;
import org.junit.jupiter.api.Test;
import lombok.SneakyThrows;
/**
 * Reproducer for CXF-9192: repeatedly issues an HTTP/2 GET through the CXF
 * {@code WebClient} and counts how often the response entity turns out to be
 * empty versus non-empty.
 *
 * <p>The test only prints the two counters; it makes no assertions, so the
 * result has to be read from the console output.
 */
class ReproduceConcurrentIssue {
/**
 * Performs a GET on {@code url} with the CXF {@code WebClient}, requesting
 * HTTP version "2" and automatic redirects, then drains the response entity.
 *
 * @param url the URL to request
 * @return {@code true} if at least one byte was read from the response
 *         entity; {@code false} if the entity was {@code null} or the stream
 *         yielded no bytes
 */
@SneakyThrows
private static boolean getCxf(final String url) {
// Ask CXF not to force the HttpURLConnection-based conduit.
System.setProperty("org.apache.cxf.transport.http.forceURLConnection",
"false");
boolean isUsingHttpUrlConnection =
HTTPTransportFactory.isForceURLConnectionConduit();
if (isUsingHttpUrlConnection) {
// The factory still reports the forced conduit, so overwrite its static
// flag via reflection.
// NOTE(review): presumably the flag was initialised before the system
// property above was set - confirm against HTTPTransportFactory.
Field forceURLConnectionConduitField =
HTTPTransportFactory.class.getDeclaredField("forceURLConnectionConduit");
forceURLConnectionConduitField.setAccessible(true);
forceURLConnectionConduitField.setBoolean((Object) null, false);
}
WebClient webClient = WebClient.create(url);
ClientConfiguration config = WebClient.getConfig(webClient);
final Map<String, Object> requestContext = config.getRequestContext();
// Request HTTP version "2" and configure the redirect-related properties.
requestContext.put(HTTPConduit.FORCE_HTTP_VERSION, "2");
requestContext.put("http.redirect.max.same.uri.count", 3);
requestContext.put("http.redirect.relative.uri", "true");
requestContext.put("http.redirect.same.host.only", "false");
final HTTPConduit conduit =
WebClient.getConfig(webClient).getHttpConduit();
conduit.getClient().setAutoRedirect(true);
final Response response = webClient.invoke("GET", null);
InputStream inputStream = (InputStream) response.getEntity();
// A null entity is counted as "no body".
if (inputStream == null) {
return false;
}
// Drain the whole stream into memory; both resources are closed by the
// try-with-resources statement.
try (inputStream; ByteArrayOutputStream baos = new
ByteArrayOutputStream()) {
byte[] buffer = new byte[1024];
int bytesRead;
while ((bytesRead = inputStream.read(buffer)) != -1) {
baos.write(buffer, 0, bytesRead);
}
return baos.toByteArray().length > 0;
}
}
/**
 * Performs the same GET with the plain JDK {@code HttpClient} over HTTP/2 and
 * reports whether {@code InputStream#available()} is positive right after
 * the response future completes.
 *
 * <p>This helper is not called by {@code testProblem()} in this snippet.
 *
 * @param url the URL to request
 * @return {@code true} if {@code available()} returned a positive value;
 *         {@code false} otherwise (the body stream is closed in that case)
 */
@SneakyThrows
private static boolean getJdk(final String url) {
final HttpClient client = HttpClient.newBuilder()
.version(Version.HTTP_2)
.build();
HttpRequest request = HttpRequest.newBuilder()
.uri(new URI(url))
.GET()
.build();
CompletableFuture<HttpResponse<InputStream>> future = client.sendAsync(
request,
BodyHandlers.ofInputStream());
// Wait at most 120 seconds for the response.
final HttpResponse<InputStream> httpResponse = future.get(120_000,
MILLISECONDS);
final InputStream body = httpResponse.body();
// Single non-blocking check of available(), with no read beforehand.
if (body.available() <= 0) {
try (body) {
return false;
}
} else {
return true;
}
}
/**
 * Calls {@code getCxf} {@code tryAmount} times against a URL that is expected
 * not to exist and prints how many responses had an empty body and how many
 * had a non-empty body.
 */
@Test
void testProblem() {
/* Number of requests to send; increase it if the problem does not reproduce. */
final int tryAmount = 100;
int available = 0;
int notAvailable = 0;
for (int i = 0; i < tryAmount; i++) {
if (getCxf("https://www.cloudflare.com/non-existent-path")) {
available++;
} else {
notAvailable++;
}
}
// Empty-body count goes to stderr, non-empty count to stdout.
System.err.println("Response body was empty: " + notAvailable);
System.out.println("Response body is available: " + available);
}
}
{code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)