This is an automated email from the ASF dual-hosted git repository.

dkulp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/cxf.git


The following commit(s) were added to refs/heads/main by this push:
     new 1e9e61a376 [CXF-8895] ConnectionExceptions with larger payloads could cause a hang with new HttpClient based conduit
1e9e61a376 is described below

commit 1e9e61a3765318c7706a1886733fd45fe2f2f977
Author: Daniel Kulp <d...@kulp.com>
AuthorDate: Wed Jun 28 17:45:21 2023 -0400

    [CXF-8895] ConnectionExceptions with larger payloads could cause a hang with new HttpClient based conduit
---
 .../cxf/transport/http/HttpClientHTTPConduit.java  | 65 ++++++++++++++++++++--
 .../systest/dispatch/DispatchClientServerTest.java | 28 ++++++++++
 2 files changed, 88 insertions(+), 5 deletions(-)

diff --git 
a/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HttpClientHTTPConduit.java
 
b/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HttpClientHTTPConduit.java
index f836903431..a9e9f51830 100644
--- 
a/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HttpClientHTTPConduit.java
+++ 
b/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HttpClientHTTPConduit.java
@@ -74,6 +74,7 @@ import org.apache.cxf.common.util.PropertyUtils;
 import org.apache.cxf.configuration.jsse.TLSClientParameters;
 import org.apache.cxf.helpers.HttpHeaderHelper;
 import org.apache.cxf.helpers.JavaUtils;
+import org.apache.cxf.interceptor.Fault;
 import org.apache.cxf.io.CacheAndWriteOutputStream;
 import org.apache.cxf.message.Message;
 import org.apache.cxf.message.MessageUtils;
@@ -383,15 +384,60 @@ public class HttpClientHTTPConduit extends 
URLConnectionHTTPConduit {
         protected void setProtocolHeaders() throws IOException {
             HttpClient cl = outMessage.get(HttpClient.class);
             Address address = 
(Address)outMessage.get(KEY_HTTP_CONNECTION_ADDRESS);
-            HTTPClientPolicy csPolicy = getClient(outMessage);
+            final HTTPClientPolicy csPolicy = getClient(outMessage);
             String httpRequestMethod =
                 (String)outMessage.get(Message.HTTP_REQUEST_METHOD);
 
             pin = new PipedInputStream(csPolicy.getChunkLength() <= 0
                                         ? 4096 : csPolicy.getChunkLength());
-            pout = new PipedOutputStream(pin);
-            
-            
+            pout = new PipedOutputStream(pin) {
+                synchronized boolean canWrite() throws IOException {
+                    if (!connectionComplete) {
+                        // if we haven't connected yet, we'll see if an exception is the reason
+                        // why we haven't connected.  Otherwise, wait for the connection
+                        // to complete.
+                        if (future.isDone()) {
+                            try {
+                                future.get();
+                            } catch (InterruptedException | ExecutionException 
e) {
+                                if (e.getCause() instanceof IOException) {
+                                    throw new Fault("Could not send Message.", 
LOG, (IOException)e.getCause());
+                                }
+                            }
+                            return false;
+                        }                        
+                        try {
+                            wait(csPolicy.getConnectionTimeout());
+                        } catch (InterruptedException e) {
+                            //ignore
+                        }
+                        if (future.isDone()) {
+                            try {
+                                future.get();
+                            } catch (InterruptedException | ExecutionException 
e) {
+                                if (e.getCause() instanceof IOException) {
+                                    throw new Fault("Could not send Message.", 
LOG, (IOException)e.getCause());
+                                }
+                            }
+                            return false;
+                        }
+                    }                    
+                    return true;
+                }
+                @Override
+                public void write(int b) throws IOException {
+                    if (connectionComplete || canWrite()) {
+                        super.write(b);
+                    }
+                }
+                @Override
+                public void write(byte[] b, int off, int len) throws 
IOException {
+                    if (connectionComplete || canWrite()) {
+                        super.write(b, off, len);
+                    }
+                }
+                
+            };
             
             if (KNOWN_HTTP_VERBS_WITH_NO_CONTENT.contains(httpRequestMethod)
                 || 
PropertyUtils.isTrue(outMessage.get(Headers.EMPTY_REQUEST_PROPERTY))) {
@@ -402,6 +448,9 @@ public class HttpClientHTTPConduit extends 
URLConnectionHTTPConduit {
                 @Override
                 public void subscribe(Subscriber<? super ByteBuffer> 
subscriber) {
                     connectionComplete = true;
+                    synchronized(pout) {
+                        pout.notifyAll();
+                    }
                     BodyPublishers.ofInputStream(new Supplier<InputStream>() {
                         public InputStream get() {
                             return pin;
@@ -443,8 +492,14 @@ public class HttpClientHTTPConduit extends 
URLConnectionHTTPConduit {
             
             
             final BodyHandler<InputStream> handler =  
BodyHandlers.ofInputStream();
-            
+
             future = cl.sendAsync(request, handler);
+            future.exceptionally(ex -> {
+                synchronized (pout) {
+                    pout.notifyAll();
+                }
+                return null;
+            });
         }
         @Override
         protected void setupWrappedStream() throws IOException {
diff --git 
a/systests/jaxws/src/test/java/org/apache/cxf/systest/dispatch/DispatchClientServerTest.java
 
b/systests/jaxws/src/test/java/org/apache/cxf/systest/dispatch/DispatchClientServerTest.java
index b129808ec8..e682b6525e 100644
--- 
a/systests/jaxws/src/test/java/org/apache/cxf/systest/dispatch/DispatchClientServerTest.java
+++ 
b/systests/jaxws/src/test/java/org/apache/cxf/systest/dispatch/DispatchClientServerTest.java
@@ -25,6 +25,7 @@ import java.net.URL;
 import java.net.http.HttpConnectTimeoutException;
 import java.net.http.HttpTimeoutException;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
@@ -47,6 +48,7 @@ import org.xml.sax.InputSource;
 import jakarta.jws.WebService;
 import jakarta.xml.bind.JAXBContext;
 import jakarta.xml.soap.MessageFactory;
+import jakarta.xml.soap.SOAPElement;
 import jakarta.xml.soap.SOAPMessage;
 import jakarta.xml.ws.AsyncHandler;
 import jakarta.xml.ws.BindingProvider;
@@ -205,6 +207,32 @@ public class DispatchClientServerTest extends 
AbstractBusClientServerTestBase {
                        || ex.getCause() instanceof 
java.net.SocketTimeoutException
                        || ex.getCause()  instanceof 
HttpConnectTimeoutException);
         }
+        
+        try {            
+            //create a really big message to make sure the write gets called
+            Iterator<jakarta.xml.soap.Node> nodes = 
soapReqMsg.getSOAPBody().getChildElements();
+            while (nodes.hasNext()) {
+                jakarta.xml.soap.Node nd = nodes.next();
+                if (nd instanceof SOAPElement) {
+                    SOAPElement se = ((SOAPElement)nd);
+                    for (int x = 0; x < 100; x++) {
+                        
se.addTextNode("TestSoapMessageTestSoapMessageTestSoapMessageTestSoapMessage");
+                    }
+                }
+            }            
+            
+            disp.invoke(soapReqMsg);
+            fail("Should have faulted");
+        } catch (SOAPFaultException ex) {
+            fail("should not be a SOAPFaultException");
+        } catch (WebServiceException ex) {
+            //expected
+            assertTrue(ex.getCause().getClass().getName(),
+                       ex.getCause() instanceof java.net.ConnectException
+                       || ex.getCause() instanceof 
java.net.SocketTimeoutException
+                       || ex.getCause()  instanceof 
HttpConnectTimeoutException);
+        }
+        
         dispImpl.close();
 
     }

Reply via email to