This is an automated email from the ASF dual-hosted git repository. reta pushed a commit to branch 4.0.x-fixes in repository https://gitbox.apache.org/repos/asf/cxf.git
commit d0afd2cbf61adf169fdc46ebe50081bd55606e90 Author: Andriy Redko <[email protected]> AuthorDate: Mon May 12 20:40:38 2025 -0400 CXF-9132: HttpClientHTTPConduit releases the client while there are connection(s) still using it (#2398) (cherry picked from commit 395a5b8cfb643967a1e6722a9c2cf725fb25ba5c) --- .../cxf/transport/http/HttpClientHTTPConduit.java | 22 +++- .../cxf/systest/http/jaxws/JAXWSClientTest.java | 32 ++++++ .../systest/https/conduit/HTTPSConduitTest.java | 126 +++++++++++++++++++-- .../apache/cxf/systest/https/conduit/Server.java | 46 ++++---- .../apache/cxf/systest/https/conduit/Bethal.cxf | 13 +++ 5 files changed, 206 insertions(+), 33 deletions(-) diff --git a/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HttpClientHTTPConduit.java b/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HttpClientHTTPConduit.java index 1df103883a..baf4b49a3b 100644 --- a/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HttpClientHTTPConduit.java +++ b/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HttpClientHTTPConduit.java @@ -66,9 +66,11 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Queue; import java.util.Set; import java.util.TreeSet; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.Flow; import java.util.concurrent.Flow.Subscriber; @@ -102,13 +104,14 @@ import org.apache.cxf.ws.addressing.EndpointReferenceType; public class HttpClientHTTPConduit extends URLConnectionHTTPConduit { private static final String FORCE_URLCONNECTION_HTTP_CONDUIT = "force.urlconnection.http.conduit"; private static final String SHARE_HTTPCLIENT_CONDUIT = "share.httpclient.http.conduit"; + private static final String HTTPS_RESET_HTTPCLIENT_CONDUIT = "https.reset.httpclient.http.conduit"; private static final Set<String> RESTRICTED_HEADERS = getRestrictedHeaders(); private static final HttpClientCache CLIENTS_CACHE = new HttpClientCache(); volatile RefCount<HttpClient> clientRef; - volatile int lastTlsHash = -1; volatile URI sslURL; private final ReentrantLock initializationLock = new ReentrantLock(); + private final Queue<RefCount<HttpClient>> deferredClientRefs = new ConcurrentLinkedQueue<>(); private static final class RefCount<T extends HttpClient> { private final AtomicLong count = new AtomicLong(); @@ -270,6 +273,8 @@ public class HttpClientHTTPConduit extends URLConnectionHTTPConduit { clientRef.release(); clientRef = null; } + deferredClientRefs.forEach(RefCount::release); + deferredClientRefs.clear(); defaultAddress = null; super.close(); } @@ -351,9 +356,18 @@ public class HttpClientHTTPConduit extends URLConnectionHTTPConduit { if (sslURL != null && isSslTargetDifferent(sslURL, uri)) { sslURL = null; - if (clientRef != null) { - clientRef.release(); - clientRef = null; + + // Reset the client in case of HTTPS URL change + final boolean httpsResetHttpClient = MessageUtils.getContextualBoolean(message, + HTTPS_RESET_HTTPCLIENT_CONDUIT, true); + if (httpsResetHttpClient) { + final RefCount<HttpClient> ref = clientRef; + // Do not release client immediately since it could be in use, instead + // move it off to deferred release queue. + if (ref != null) { + deferredClientRefs.add(ref); + clientRef = null; + } } } // If the HTTP_REQUEST_METHOD is not set, the default is "POST". diff --git a/systests/transports/src/test/java/org/apache/cxf/systest/http/jaxws/JAXWSClientTest.java b/systests/transports/src/test/java/org/apache/cxf/systest/http/jaxws/JAXWSClientTest.java index 4f7ddff486..fbfed4483f 100644 --- a/systests/transports/src/test/java/org/apache/cxf/systest/http/jaxws/JAXWSClientTest.java +++ b/systests/transports/src/test/java/org/apache/cxf/systest/http/jaxws/JAXWSClientTest.java @@ -33,6 +33,7 @@ import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import jakarta.jws.WebService; +import jakarta.xml.ws.BindingProvider; import jakarta.xml.ws.soap.SOAPFaultException; import org.apache.cxf.configuration.security.AuthorizationPolicy; import org.apache.cxf.endpoint.Client; @@ -199,4 +200,35 @@ public class JAXWSClientTest extends AbstractBusClientServerTestBase { } } } + + @Test + public void testUpdateAddress() throws Exception { + // setup the feature by using JAXWS front-end API + final JaxWsProxyFactoryBean factory = new JaxWsProxyFactoryBean(); + factory.setAddress("http://localhost:" + PORT + "/SoapContext/GreeterPort"); + factory.setServiceClass(Greeter.class); + + final Greeter proxy = factory.create(Greeter.class); + final Collection<Future<String>> futures = new ArrayList<>(); + final ExecutorService executor = Executors.newFixedThreadPool(10); + + try { + for (int i = 0; i < 100; ++i) { + futures.add(executor.submit(() -> { + final BindingProvider bp = (BindingProvider)proxy; + updateAddressPort(bp, PORT); + return proxy.greetMe("Hi!"); + })); + } + + for (final Future<String> f: futures) { + assertThat(f.get(10, TimeUnit.SECONDS), equalTo("Hi!".toUpperCase())); + } + } finally { + executor.shutdown(); + if (!executor.awaitTermination(30, TimeUnit.SECONDS)) { + executor.shutdownNow(); + } + } + } } diff --git a/systests/transports/src/test/java/org/apache/cxf/systest/https/conduit/HTTPSConduitTest.java b/systests/transports/src/test/java/org/apache/cxf/systest/https/conduit/HTTPSConduitTest.java index ae69692507..df9290cc3c 100644 --- a/systests/transports/src/test/java/org/apache/cxf/systest/https/conduit/HTTPSConduitTest.java +++ b/systests/transports/src/test/java/org/apache/cxf/systest/https/conduit/HTTPSConduitTest.java @@ -31,9 +31,16 @@ import java.security.SecureRandom; import java.security.cert.Certificate; import java.security.cert.X509Certificate; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.TreeMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import javax.net.ssl.HostnameVerifier; import javax.net.ssl.KeyManager; @@ -47,6 +54,7 @@ import javax.net.ssl.TrustManagerFactory; import javax.net.ssl.X509ExtendedTrustManager; import javax.xml.namespace.QName; +import jakarta.xml.ws.BindingProvider; import org.apache.cxf.Bus; import org.apache.cxf.BusFactory; import org.apache.cxf.bus.spring.BusApplicationContext; @@ -76,6 +84,8 @@ import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -111,7 +121,7 @@ public class HTTPSConduitTest extends AbstractBusClientServerTestBase { private static TLSClientParameters tlsClientParameters = new TLSClientParameters(); private static List<String> servers = new ArrayList<>(); - private static Map<String, String> addrMap = new TreeMap<>(); + private static Map<String, Collection<String>> addrMap = new TreeMap<>(); static { try (InputStream key = ClassLoaderUtils.getResourceAsStream("keys/Morpit.jks", HTTPSConduitTest.class); @@ -152,12 +162,13 @@ public class HTTPSConduitTest extends AbstractBusClientServerTestBase { public static void allocatePorts() { BusServer.resetPortMap(); addrMap.clear(); - addrMap.put("Mortimer", "http://localhost:" + getPort("PORT0") + "/"); - addrMap.put("Tarpin", "https://localhost:" + getPort("PORT1") + "/"); - addrMap.put("Poltim", "https://localhost:" + getPort("PORT2") + "/"); - addrMap.put("Gordy", "https://localhost:" + getPort("PORT3") + "/"); - addrMap.put("Bethal", "https://localhost:" + getPort("PORT4") + "/"); - addrMap.put("Morpit", "https://localhost:" + getPort("PORT5") + "/"); + addrMap.put("Mortimer", List.of("http://localhost:" + getPort("PORT0") + "/")); + addrMap.put("Tarpin", List.of("https://localhost:" + getPort("PORT1") + "/")); + addrMap.put("Poltim", List.of("https://localhost:" + getPort("PORT2") + "/")); + addrMap.put("Gordy", List.of("https://localhost:" + getPort("PORT3") + "/")); + addrMap.put("Bethal", List.of("https://localhost:" + getPort("PORT4") + "/", + "https://localhost:" + getPort("PORT6") + "/")); + addrMap.put("Morpit", List.of("https://localhost:" + getPort("PORT5") + "/")); tlsClientParameters.setDisableCNCheck(true); servers.clear(); } @@ -184,7 +195,7 @@ public class HTTPSConduitTest extends AbstractBusClientServerTestBase { boolean server = launchServer(Server.class, null, new String[] { name, - addrMap.get(name), + addrMap.get(name).stream().collect(Collectors.joining(",")), serverC.toString() }, IN_PROCESS); if (server) { @@ -844,5 +855,104 @@ public class HTTPSConduitTest extends AbstractBusClientServerTestBase { } } + @Test + public void testUpdateAddress() throws Exception { + startServer("Bethal"); + + URL config = getClass().getResource("BethalClientConfig.cxf"); + + // We go through the back door, setting the default bus. + new DefaultBusFactory().createBus(config); + URL wsdl = getClass().getResource("greeting.wsdl"); + assertNotNull("WSDL is null", wsdl); + + SOAPService service = new SOAPService(wsdl, serviceName); + assertNotNull("Service is null", service); + + Greeter bethal = service.getPort(bethalQ, Greeter.class); + updateAddressPort(bethal, getPort("PORT4")); + verifyBethalClient(bethal); + + updateAddressPort(bethal, getPort("PORT6")); + verifyBethalClient(bethal); + + // setup the feature by using JAXWS front-end API + final Collection<Future<String>> futures = new ArrayList<>(); + final ExecutorService executor = Executors.newFixedThreadPool(10); + final Random random = new Random(); + + try { + for (int i = 0; i < 30; ++i) { + futures.add(executor.submit(() -> { + if (random.nextBoolean()) { + updateAddressPort(bethal, getPort("PORT4")); + } else { + updateAddressPort(bethal, getPort("PORT6")); + } + return bethal.greetMe("timeout!"); + })); + } + + for (final Future<String> f: futures) { + assertThat(f.get(10, TimeUnit.SECONDS), equalTo("Hello timeout!")); + } + } finally { + executor.shutdown(); + if (!executor.awaitTermination(30, TimeUnit.SECONDS)) { + executor.shutdownNow(); + } + } + } + + @Test + public void testUpdateAddressNoClientReset() throws Exception { + startServer("Bethal"); + + URL config = getClass().getResource("BethalClientConfig.cxf"); + + // We go through the back door, setting the default bus. + new DefaultBusFactory().createBus(config); + URL wsdl = getClass().getResource("greeting.wsdl"); + assertNotNull("WSDL is null", wsdl); + + SOAPService service = new SOAPService(wsdl, serviceName); + assertNotNull("Service is null", service); + + Greeter bethal = service.getPort(bethalQ, Greeter.class); + ((BindingProvider)bethal).getRequestContext().put("https.reset.httpclient.http.conduit", false); + + updateAddressPort(bethal, getPort("PORT4")); + verifyBethalClient(bethal); + + updateAddressPort(bethal, getPort("PORT6")); + verifyBethalClient(bethal); + + // setup the feature by using JAXWS front-end API + final Collection<Future<String>> futures = new ArrayList<>(); + final ExecutorService executor = Executors.newFixedThreadPool(10); + final Random random = new Random(); + + try { + for (int i = 0; i < 30; ++i) { + futures.add(executor.submit(() -> { + if (random.nextBoolean()) { + updateAddressPort(bethal, getPort("PORT4")); + } else { + updateAddressPort(bethal, getPort("PORT6")); + } + return bethal.greetMe("timeout!"); + })); + } + + for (final Future<String> f: futures) { + assertThat(f.get(10, TimeUnit.SECONDS), equalTo("Hello timeout!")); + } + } finally { + executor.shutdown(); + if (!executor.awaitTermination(30, TimeUnit.SECONDS)) { + executor.shutdownNow(); + } + } + } } diff --git a/systests/transports/src/test/java/org/apache/cxf/systest/https/conduit/Server.java b/systests/transports/src/test/java/org/apache/cxf/systest/https/conduit/Server.java index 6e54b1bc91..a1887d7a06 100644 --- a/systests/transports/src/test/java/org/apache/cxf/systest/https/conduit/Server.java +++ b/systests/transports/src/test/java/org/apache/cxf/systest/https/conduit/Server.java @@ -32,9 +32,9 @@ public class Server extends AbstractBusTestServerBase { public static final String PORT = allocatePort(Server.class); private String name; - private String address; + private String[] addresses; private URL configFileURL; - private EndpointImpl ep; + private EndpointImpl[] eps; public Server(String[] args) throws Exception { this(args[0], args[1], args[2]); @@ -42,17 +42,17 @@ public class Server extends AbstractBusTestServerBase { public Server(String n, String addr, String conf) throws Exception { name = n; - address = addr; + addresses = addr.split(","); configFileURL = new URL(conf); - //System.out.println("Starting " + name - // + " Server at " + address - // + " with config " + configFileURL); - } + public void tearDown() throws Exception { - if (ep != null) { - ep.stop(); - ep = null; + if (eps != null) { + for (EndpointImpl ep: eps) { + if (ep != null) { + ep.stop(); + } + } } } @@ -73,17 +73,21 @@ public class Server extends AbstractBusTestServerBase { // to match the WSDL file that we are using. Object implementor = new GreeterImpl(name); - // I don't know why this works. - ep = - new EndpointImpl( - getBus(), - implementor, - nullBindingID, - this.getClass().getResource("greeting.wsdl").toString()); - // How the hell do I know what the name of the - // http-destination is from using this call? - ep.setEndpointName(new QName("http://apache.org/hello_world", name)); - ep.publish(address); + eps = new EndpointImpl[addresses.length]; + for (int i = 0; i < addresses.length; ++i) { + // I don't know why this works. + final EndpointImpl ep = + new EndpointImpl( + getBus(), + implementor, + nullBindingID, + this.getClass().getResource("greeting.wsdl").toString()); + // How the hell do I know what the name of the + // http-destination is from using this call? + ep.setEndpointName(new QName("http://apache.org/hello_world", name)); + ep.publish(addresses[i]); + eps[i] = ep; + } } diff --git a/systests/transports/src/test/resources/org/apache/cxf/systest/https/conduit/Bethal.cxf b/systests/transports/src/test/resources/org/apache/cxf/systest/https/conduit/Bethal.cxf index 83e6d4a8df..8302e28a9f 100644 --- a/systests/transports/src/test/resources/org/apache/cxf/systest/https/conduit/Bethal.cxf +++ b/systests/transports/src/test/resources/org/apache/cxf/systest/https/conduit/Bethal.cxf @@ -60,6 +60,19 @@ <sec:clientAuthentication want="true" required="true"/> </httpj:tlsServerParameters> </httpj:engine> + <httpj:engine port="${testutil.ports.BusServer.6}"> + <httpj:tlsServerParameters> + <sec:keyManagers keyPassword="OBF:1v2j1uum1xtv1zej1zer1xtn1uvk1v1v"> + <sec:keyStore type="JKS" password="OBF:1v2j1uum1xtv1zej1zer1xtn1uvk1v1v" + resource="keys/Bethal.jks"/> + </sec:keyManagers> + <sec:trustManagers> + <sec:keyStore type="JKS" password="OBF:1v2j1uum1xtv1zej1zer1xtn1uvk1v1v" + resource="keys/Truststore.jks"/> + </sec:trustManagers> + <sec:clientAuthentication want="true" required="true"/> + </httpj:tlsServerParameters> + </httpj:engine> </httpj:engine-factory> <cxf:bus>
