This is an automated email from the ASF dual-hosted git repository.

reta pushed a commit to branch 3.5.x-fixes in repository https://gitbox.apache.org/repos/asf/cxf.git
commit 0fedc36c8c12ede90c7a08a3c27a4d0cee2448f7 Author: Andriy Redko <[email protected]> AuthorDate: Thu Feb 27 20:18:55 2025 -0500 CXF-9110: Fix extensive memory usage with enabled LoggingFeature (#2278) * CXF-9110: Fix extensive memory usage with enabled LoggingFeature * CXF-9110: Memory leak in DelayedCleanerImpl.queue when LoggingFeature is enabled --------- Co-authored-by: Yury Molchan <[email protected]> (cherry picked from commit a9351cd96339c9886058581d44ba92cb02a59713) (cherry picked from commit 2ec08fac115e6b287510da55eb72a7f8c1b72bdb) # Conflicts: # rt/rs/client/src/test/java/org/apache/cxf/jaxrs/client/logging/TestServiceRest.java (cherry picked from commit 0532266fa62e686c56e23c4d08062ecfa8eec2a3) --- .../java/org/apache/cxf/io/CachedOutputStream.java | 2 + .../apache/cxf/io/CachedOutputStreamCleaner.java | 8 +++ .../cxf/io/DelayedCachedOutputStreamCleaner.java | 29 +++++++--- .../cxf/jaxrs/client/logging/RESTLoggingTest.java | 62 ++++++++++++++++++++++ .../cxf/jaxrs/client/logging/TestServiceRest.java | 6 +++ 5 files changed, 100 insertions(+), 7 deletions(-) diff --git a/core/src/main/java/org/apache/cxf/io/CachedOutputStream.java b/core/src/main/java/org/apache/cxf/io/CachedOutputStream.java index 3ba937d04b..f5526fa57f 100644 --- a/core/src/main/java/org/apache/cxf/io/CachedOutputStream.java +++ b/core/src/main/java/org/apache/cxf/io/CachedOutputStream.java @@ -283,8 +283,10 @@ public class CachedOutputStream extends OutputStream { } } finally { streamList.remove(currentStream); + // we are not backed by file anymore, unregister from the cleaner if (cachedOutputStreamCleaner != null) { cachedOutputStreamCleaner.unregister(currentStream); + cachedOutputStreamCleaner.unregister(this); } deleteTempFile(); inmem = true; diff --git a/core/src/main/java/org/apache/cxf/io/CachedOutputStreamCleaner.java b/core/src/main/java/org/apache/cxf/io/CachedOutputStreamCleaner.java index 3d6361a4c9..dbef316afb 100644 --- 
a/core/src/main/java/org/apache/cxf/io/CachedOutputStreamCleaner.java +++ b/core/src/main/java/org/apache/cxf/io/CachedOutputStreamCleaner.java @@ -40,4 +40,12 @@ public interface CachedOutputStreamCleaner { * Unregister the stream instance from the clean up (closed properly) */ void register(Closeable closeable); + + /** + * The exact or approximate (depending on the implementation) size of the cleaner queue + * @return exact or approximate (depending on the implementation) size of the cleaner queue + */ + default int size() { + return 0; + } } diff --git a/core/src/main/java/org/apache/cxf/io/DelayedCachedOutputStreamCleaner.java b/core/src/main/java/org/apache/cxf/io/DelayedCachedOutputStreamCleaner.java index 65e85da0a6..524adc0a69 100644 --- a/core/src/main/java/org/apache/cxf/io/DelayedCachedOutputStreamCleaner.java +++ b/core/src/main/java/org/apache/cxf/io/DelayedCachedOutputStreamCleaner.java @@ -53,19 +53,24 @@ public final class DelayedCachedOutputStreamCleaner implements CachedOutputStrea @Override default void register(Closeable closeable) { } - + @Override default void unregister(Closeable closeable) { } - + @Override default void close() { } - + @Override default void clean() { } - + + @Override + default int size() { + return 0; + } + default void forceClean() { } } @@ -102,18 +107,23 @@ public final class DelayedCachedOutputStreamCleaner implements CachedOutputStrea queue.drainTo(closeables); clean(closeables); } - + @Override public void forceClean() { clean(queue); } - + @Override public void close() { timer.cancel(); queue.clear(); } - + + @Override + public int size() { + return queue.size(); + } + private void clean(Collection<DelayedCloseable> closeables) { final Iterator<DelayedCloseable> iterator = closeables.iterator(); while (iterator.hasNext()) { @@ -225,6 +235,11 @@ public final class DelayedCachedOutputStreamCleaner implements CachedOutputStrea cleaner.unregister(closeable); } + @Override + public int size() { + return cleaner.size(); 
+ } + @Override public void clean() { cleaner.clean(); diff --git a/rt/rs/client/src/test/java/org/apache/cxf/jaxrs/client/logging/RESTLoggingTest.java b/rt/rs/client/src/test/java/org/apache/cxf/jaxrs/client/logging/RESTLoggingTest.java index 6c81b20d1d..01c188c96f 100644 --- a/rt/rs/client/src/test/java/org/apache/cxf/jaxrs/client/logging/RESTLoggingTest.java +++ b/rt/rs/client/src/test/java/org/apache/cxf/jaxrs/client/logging/RESTLoggingTest.java @@ -19,17 +19,21 @@ package org.apache.cxf.jaxrs.client.logging; +import java.io.Closeable; import java.io.IOException; import java.io.InputStream; import java.net.MalformedURLException; import java.util.Collections; import java.util.List; +import java.util.concurrent.atomic.LongAdder; +import org.apache.cxf.Bus; import org.apache.cxf.endpoint.Server; import org.apache.cxf.ext.logging.AbstractLoggingInterceptor; import org.apache.cxf.ext.logging.LoggingFeature; import org.apache.cxf.ext.logging.event.EventType; import org.apache.cxf.ext.logging.event.LogEvent; +import org.apache.cxf.io.CachedOutputStreamCleaner; import org.apache.cxf.jaxrs.JAXRSServerFactoryBean; import org.apache.cxf.jaxrs.client.JAXRSClientFactoryBean; import org.apache.cxf.jaxrs.client.WebClient; @@ -39,7 +43,10 @@ import org.junit.Assert; import org.junit.Test; import static org.awaitility.Awaitility.await; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertEquals; public class RESTLoggingTest { @@ -57,6 +64,61 @@ public class RESTLoggingTest { Assert.assertEquals("test1", result); } + @Test + public void testCacheCleanUp() throws Exception { + LoggingFeature loggingFeature = new LoggingFeature(); + loggingFeature.setInMemThreshold(1); // To activate usage of the CachedOutputStream + + Server server = createService(SERVICE_URI, new TestServiceRest(), loggingFeature); + server.start(); + + try { + final 
JAXRSClientFactoryBean bean = new JAXRSClientFactoryBean(); + bean.setAddress(SERVICE_URI); + bean.setTransportId(LocalTransportFactory.TRANSPORT_ID); + + final LongAdder registers = new LongAdder(); + final WebClient client = bean.createWebClient(); + final Bus bus = bean.getBus(); + + // See please https://issues.apache.org/jira/browse/CXF-9110 + final CachedOutputStreamCleaner cleaner = bus.getExtension(CachedOutputStreamCleaner.class); + bus.setExtension(new CachedOutputStreamCleaner() { + @Override + public void clean() { + cleaner.clean(); + } + + @Override + public void unregister(Closeable closeable) { + cleaner.unregister(closeable); + } + + @Override + public void register(Closeable closeable) { + cleaner.register(closeable); + registers.increment(); + } + + @Override + public int size() { + return cleaner.size(); + } + }, CachedOutputStreamCleaner.class); + + String response = null; + for (int i = 0; i < 1_000; i++) { // ~2...5 seconds of the execution + response = client.post("DATA", String.class); + } + assertEquals("DATA", response); + + assertThat(registers.longValue(), equalTo(3000L)); + assertThat(cleaner.size(), equalTo(0)); + } finally { + server.destroy(); + } + } + @Test public void testBinary() throws IOException, InterruptedException { LoggingFeature loggingFeature = new LoggingFeature(); diff --git a/rt/rs/client/src/test/java/org/apache/cxf/jaxrs/client/logging/TestServiceRest.java b/rt/rs/client/src/test/java/org/apache/cxf/jaxrs/client/logging/TestServiceRest.java index cf506a4f29..0d94fc1f71 100644 --- a/rt/rs/client/src/test/java/org/apache/cxf/jaxrs/client/logging/TestServiceRest.java +++ b/rt/rs/client/src/test/java/org/apache/cxf/jaxrs/client/logging/TestServiceRest.java @@ -19,6 +19,7 @@ package org.apache.cxf.jaxrs.client.logging; import javax.ws.rs.GET; +import javax.ws.rs.POST; import javax.ws.rs.Path; import javax.ws.rs.PathParam; @@ -29,4 +30,9 @@ public class TestServiceRest { return msg; } + @POST + public String post(String 
msg) { + return msg; + } } +
