This is an automated email from the ASF dual-hosted git repository.
reta pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/cxf.git
The following commit(s) were added to refs/heads/main by this push:
new a9351cd963 CXF-9110: Fix extensive memory usage with enabled LoggingFeature (#2278)
a9351cd963 is described below
commit a9351cd96339c9886058581d44ba92cb02a59713
Author: Andriy Redko <[email protected]>
AuthorDate: Thu Feb 27 20:18:55 2025 -0500
CXF-9110: Fix extensive memory usage with enabled LoggingFeature (#2278)
* CXF-9110: Fix extensive memory usage with enabled LoggingFeature
* CXF-9110: Memory leak in DelayedCleanerImpl.queue when LoggingFeature is enabled
---------
Co-authored-by: Yury Molchan <[email protected]>
---
.../java/org/apache/cxf/io/CachedOutputStream.java | 2 +
.../apache/cxf/io/CachedOutputStreamCleaner.java | 8 +++
.../cxf/io/DelayedCachedOutputStreamCleaner.java | 29 +++++++---
.../cxf/jaxrs/client/logging/RESTLoggingTest.java | 62 ++++++++++++++++++++++
.../cxf/jaxrs/client/logging/TestServiceRest.java | 6 +++
5 files changed, 100 insertions(+), 7 deletions(-)
diff --git a/core/src/main/java/org/apache/cxf/io/CachedOutputStream.java b/core/src/main/java/org/apache/cxf/io/CachedOutputStream.java
index 3ba937d04b..f5526fa57f 100644
--- a/core/src/main/java/org/apache/cxf/io/CachedOutputStream.java
+++ b/core/src/main/java/org/apache/cxf/io/CachedOutputStream.java
@@ -283,8 +283,10 @@ public class CachedOutputStream extends OutputStream {
}
} finally {
streamList.remove(currentStream);
+ // we are not backed by file anymore, unregister from the cleaner
if (cachedOutputStreamCleaner != null) {
cachedOutputStreamCleaner.unregister(currentStream);
+ cachedOutputStreamCleaner.unregister(this);
}
deleteTempFile();
inmem = true;
diff --git a/core/src/main/java/org/apache/cxf/io/CachedOutputStreamCleaner.java b/core/src/main/java/org/apache/cxf/io/CachedOutputStreamCleaner.java
index 3d6361a4c9..dbef316afb 100644
--- a/core/src/main/java/org/apache/cxf/io/CachedOutputStreamCleaner.java
+++ b/core/src/main/java/org/apache/cxf/io/CachedOutputStreamCleaner.java
@@ -40,4 +40,12 @@ public interface CachedOutputStreamCleaner {
* Unregister the stream instance from the clean up (closed properly)
*/
void register(Closeable closeable);
+
+ /**
+ * The exact or approximate (depending on the implementation) size of the cleaner queue
+ * @return exact or approximate (depending on the implementation) size of the cleaner queue
+ */
+ default int size() {
+ return 0;
+ }
}
diff --git a/core/src/main/java/org/apache/cxf/io/DelayedCachedOutputStreamCleaner.java b/core/src/main/java/org/apache/cxf/io/DelayedCachedOutputStreamCleaner.java
index 5c8a176814..4d76b65425 100644
--- a/core/src/main/java/org/apache/cxf/io/DelayedCachedOutputStreamCleaner.java
+++ b/core/src/main/java/org/apache/cxf/io/DelayedCachedOutputStreamCleaner.java
@@ -52,19 +52,24 @@ public final class DelayedCachedOutputStreamCleaner implements CachedOutputStrea
@Override
default void register(Closeable closeable) {
}
-
+
@Override
default void unregister(Closeable closeable) {
}
-
+
@Override
default void close() {
}
-
+
@Override
default void clean() {
}
-
+
+ @Override
+ default int size() {
+ return 0;
+ }
+
default void forceClean() {
}
}
@@ -101,18 +106,23 @@ public final class DelayedCachedOutputStreamCleaner implements CachedOutputStrea
queue.drainTo(closeables);
clean(closeables);
}
-
+
@Override
public void forceClean() {
clean(queue);
}
-
+
@Override
public void close() {
timer.cancel();
queue.clear();
}
-
+
+ @Override
+ public int size() {
+ return queue.size();
+ }
+
private void clean(Collection<DelayedCloseable> closeables) {
final Iterator<DelayedCloseable> iterator = closeables.iterator();
while (iterator.hasNext()) {
@@ -224,6 +234,11 @@ public final class DelayedCachedOutputStreamCleaner implements CachedOutputStrea
cleaner.unregister(closeable);
}
+ @Override
+ public int size() {
+ return cleaner.size();
+ }
+
@Override
public void clean() {
cleaner.clean();
diff --git a/rt/rs/client/src/test/java/org/apache/cxf/jaxrs/client/logging/RESTLoggingTest.java b/rt/rs/client/src/test/java/org/apache/cxf/jaxrs/client/logging/RESTLoggingTest.java
index 6c81b20d1d..01c188c96f 100644
--- a/rt/rs/client/src/test/java/org/apache/cxf/jaxrs/client/logging/RESTLoggingTest.java
+++ b/rt/rs/client/src/test/java/org/apache/cxf/jaxrs/client/logging/RESTLoggingTest.java
@@ -19,17 +19,21 @@
package org.apache.cxf.jaxrs.client.logging;
+import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.atomic.LongAdder;
+import org.apache.cxf.Bus;
import org.apache.cxf.endpoint.Server;
import org.apache.cxf.ext.logging.AbstractLoggingInterceptor;
import org.apache.cxf.ext.logging.LoggingFeature;
import org.apache.cxf.ext.logging.event.EventType;
import org.apache.cxf.ext.logging.event.LogEvent;
+import org.apache.cxf.io.CachedOutputStreamCleaner;
import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
import org.apache.cxf.jaxrs.client.JAXRSClientFactoryBean;
import org.apache.cxf.jaxrs.client.WebClient;
@@ -39,7 +43,10 @@ import org.junit.Assert;
import org.junit.Test;
import static org.awaitility.Awaitility.await;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertEquals;
public class RESTLoggingTest {
@@ -57,6 +64,61 @@ public class RESTLoggingTest {
Assert.assertEquals("test1", result);
}
+ @Test
+ public void testCacheCleanUp() throws Exception {
+ LoggingFeature loggingFeature = new LoggingFeature();
+ loggingFeature.setInMemThreshold(1); // To activate usage of the CachedOutputStream
+
+ Server server = createService(SERVICE_URI, new TestServiceRest(), loggingFeature);
+ server.start();
+
+ try {
+ final JAXRSClientFactoryBean bean = new JAXRSClientFactoryBean();
+ bean.setAddress(SERVICE_URI);
+ bean.setTransportId(LocalTransportFactory.TRANSPORT_ID);
+
+ final LongAdder registers = new LongAdder();
+ final WebClient client = bean.createWebClient();
+ final Bus bus = bean.getBus();
+
+ // See please https://issues.apache.org/jira/browse/CXF-9110
+ final CachedOutputStreamCleaner cleaner = bus.getExtension(CachedOutputStreamCleaner.class);
+ bus.setExtension(new CachedOutputStreamCleaner() {
+ @Override
+ public void clean() {
+ cleaner.clean();
+ }
+
+ @Override
+ public void unregister(Closeable closeable) {
+ cleaner.unregister(closeable);
+ }
+
+ @Override
+ public void register(Closeable closeable) {
+ cleaner.register(closeable);
+ registers.increment();
+ }
+
+ @Override
+ public int size() {
+ return cleaner.size();
+ }
+ }, CachedOutputStreamCleaner.class);
+
+ String response = null;
+ for (int i = 0; i < 1_000; i++) { // ~2...5 seconds of the execution
+ response = client.post("DATA", String.class);
+ }
+ assertEquals("DATA", response);
+
+ assertThat(registers.longValue(), equalTo(3000L));
+ assertThat(cleaner.size(), equalTo(0));
+ } finally {
+ server.destroy();
+ }
+ }
+
@Test
public void testBinary() throws IOException, InterruptedException {
LoggingFeature loggingFeature = new LoggingFeature();
diff --git a/rt/rs/client/src/test/java/org/apache/cxf/jaxrs/client/logging/TestServiceRest.java b/rt/rs/client/src/test/java/org/apache/cxf/jaxrs/client/logging/TestServiceRest.java
index ab237bc710..1ef575e970 100644
--- a/rt/rs/client/src/test/java/org/apache/cxf/jaxrs/client/logging/TestServiceRest.java
+++ b/rt/rs/client/src/test/java/org/apache/cxf/jaxrs/client/logging/TestServiceRest.java
@@ -19,6 +19,7 @@
package org.apache.cxf.jaxrs.client.logging;
import jakarta.ws.rs.GET;
+import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.PathParam;
@@ -29,4 +30,9 @@ public class TestServiceRest {
return msg;
}
+ @POST
+ public String post(String msg) {
+ return msg;
+ }
}
+