This is an automated email from the ASF dual-hosted git repository. dkulp pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/cxf.git
The following commit(s) were added to refs/heads/main by this push: new 1e9e61a376 [CXF-8895] ConnectionExceptions with larger payloads could cause a hang with new HttpClient based conduit 1e9e61a376 is described below commit 1e9e61a3765318c7706a1886733fd45fe2f2f977 Author: Daniel Kulp <d...@kulp.com> AuthorDate: Wed Jun 28 17:45:21 2023 -0400 [CXF-8895] ConnectionExceptions with larger payloads could cause a hang with new HttpClient based conduit --- .../cxf/transport/http/HttpClientHTTPConduit.java | 65 ++++++++++++++++++++-- .../systest/dispatch/DispatchClientServerTest.java | 28 ++++++++++ 2 files changed, 88 insertions(+), 5 deletions(-) diff --git a/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HttpClientHTTPConduit.java b/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HttpClientHTTPConduit.java index f836903431..a9e9f51830 100644 --- a/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HttpClientHTTPConduit.java +++ b/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HttpClientHTTPConduit.java @@ -74,6 +74,7 @@ import org.apache.cxf.common.util.PropertyUtils; import org.apache.cxf.configuration.jsse.TLSClientParameters; import org.apache.cxf.helpers.HttpHeaderHelper; import org.apache.cxf.helpers.JavaUtils; +import org.apache.cxf.interceptor.Fault; import org.apache.cxf.io.CacheAndWriteOutputStream; import org.apache.cxf.message.Message; import org.apache.cxf.message.MessageUtils; @@ -383,15 +384,60 @@ public class HttpClientHTTPConduit extends URLConnectionHTTPConduit { protected void setProtocolHeaders() throws IOException { HttpClient cl = outMessage.get(HttpClient.class); Address address = (Address)outMessage.get(KEY_HTTP_CONNECTION_ADDRESS); - HTTPClientPolicy csPolicy = getClient(outMessage); + final HTTPClientPolicy csPolicy = getClient(outMessage); String httpRequestMethod = (String)outMessage.get(Message.HTTP_REQUEST_METHOD); pin = new PipedInputStream(csPolicy.getChunkLength() <= 0 ? 4096 : csPolicy.getChunkLength()); - pout = new PipedOutputStream(pin); - - + pout = new PipedOutputStream(pin) { + synchronized boolean canWrite() throws IOException { + if (!connectionComplete) { + // if we haven't connected yet, we'll see if an exception is the reason + // why we haven't connected. Otherwise, wait for the connection + // to complete. + if (future.isDone()) { + try { + future.get(); + } catch (InterruptedException | ExecutionException e) { + if (e.getCause() instanceof IOException) { + throw new Fault("Could not send Message.", LOG, (IOException)e.getCause()); + } + } + return false; + } + try { + wait(csPolicy.getConnectionTimeout()); + } catch (InterruptedException e) { + //ignore + } + if (future.isDone()) { + try { + future.get(); + } catch (InterruptedException | ExecutionException e) { + if (e.getCause() instanceof IOException) { + throw new Fault("Could not send Message.", LOG, (IOException)e.getCause()); + } + } + return false; + } + } + return true; + } + @Override + public void write(int b) throws IOException { + if (connectionComplete || canWrite()) { + super.write(b); + } + } + @Override + public void write(byte[] b, int off, int len) throws IOException { + if (connectionComplete || canWrite()) { + super.write(b, off, len); + } + } + + }; if (KNOWN_HTTP_VERBS_WITH_NO_CONTENT.contains(httpRequestMethod) || PropertyUtils.isTrue(outMessage.get(Headers.EMPTY_REQUEST_PROPERTY))) { @@ -402,6 +448,9 @@ public class HttpClientHTTPConduit extends URLConnectionHTTPConduit { @Override public void subscribe(Subscriber<? super ByteBuffer> subscriber) { connectionComplete = true; + synchronized(pout) { + pout.notifyAll(); + } BodyPublishers.ofInputStream(new Supplier<InputStream>() { public InputStream get() { return pin; @@ -443,8 +492,14 @@ public class HttpClientHTTPConduit extends URLConnectionHTTPConduit { final BodyHandler<InputStream> handler = BodyHandlers.ofInputStream(); - + future = cl.sendAsync(request, handler); + future.exceptionally(ex -> { + synchronized (pout) { + pout.notifyAll(); + } + return null; + }); } @Override protected void setupWrappedStream() throws IOException { diff --git a/systests/jaxws/src/test/java/org/apache/cxf/systest/dispatch/DispatchClientServerTest.java b/systests/jaxws/src/test/java/org/apache/cxf/systest/dispatch/DispatchClientServerTest.java index b129808ec8..e682b6525e 100644 --- a/systests/jaxws/src/test/java/org/apache/cxf/systest/dispatch/DispatchClientServerTest.java +++ b/systests/jaxws/src/test/java/org/apache/cxf/systest/dispatch/DispatchClientServerTest.java @@ -25,6 +25,7 @@ import java.net.URL; import java.net.http.HttpConnectTimeoutException; import java.net.http.HttpTimeoutException; import java.util.HashMap; +import java.util.Iterator; import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @@ -47,6 +48,7 @@ import org.xml.sax.InputSource; import jakarta.jws.WebService; import jakarta.xml.bind.JAXBContext; import jakarta.xml.soap.MessageFactory; +import jakarta.xml.soap.SOAPElement; import jakarta.xml.soap.SOAPMessage; import jakarta.xml.ws.AsyncHandler; import jakarta.xml.ws.BindingProvider; @@ -205,6 +207,32 @@ public class DispatchClientServerTest extends AbstractBusClientServerTestBase { || ex.getCause() instanceof java.net.SocketTimeoutException || ex.getCause() instanceof HttpConnectTimeoutException); } + + try { + //create a really big message to make sure the write gets called + Iterator<jakarta.xml.soap.Node> nodes = soapReqMsg.getSOAPBody().getChildElements(); + while (nodes.hasNext()) { + jakarta.xml.soap.Node nd = nodes.next(); + if (nd instanceof SOAPElement) { + SOAPElement se = ((SOAPElement)nd); + for (int x = 0; x < 100; x++) { + se.addTextNode("TestSoapMessageTestSoapMessageTestSoapMessageTestSoapMessage"); + } + } + } + + disp.invoke(soapReqMsg); + fail("Should have faulted"); + } catch (SOAPFaultException ex) { + fail("should not be a SOAPFaultException"); + } catch (WebServiceException ex) { + //expected + assertTrue(ex.getCause().getClass().getName(), + ex.getCause() instanceof java.net.ConnectException + || ex.getCause() instanceof java.net.SocketTimeoutException + || ex.getCause() instanceof HttpConnectTimeoutException); + } + dispImpl.close(); }