This is an automated email from the ASF dual-hosted git repository.

reta pushed a commit to branch 3.5.x-fixes
in repository https://gitbox.apache.org/repos/asf/cxf.git

commit 0fedc36c8c12ede90c7a08a3c27a4d0cee2448f7
Author: Andriy Redko <[email protected]>
AuthorDate: Thu Feb 27 20:18:55 2025 -0500

    CXF-9110: Fix extensive memory usage with enabled LoggingFeature (#2278)
    
    * CXF-9110: Fix extensive memory usage with enabled LoggingFeature
    
    * CXF-9110: Memory leak in DelayedCleanerImpl.queue when LoggingFeature is enabled
    
    ---------
    
    Co-authored-by: Yury Molchan <[email protected]>
    (cherry picked from commit a9351cd96339c9886058581d44ba92cb02a59713)
    (cherry picked from commit 2ec08fac115e6b287510da55eb72a7f8c1b72bdb)
    
    # Conflicts:
    #       rt/rs/client/src/test/java/org/apache/cxf/jaxrs/client/logging/TestServiceRest.java
    (cherry picked from commit 0532266fa62e686c56e23c4d08062ecfa8eec2a3)
---
 .../java/org/apache/cxf/io/CachedOutputStream.java |  2 +
 .../apache/cxf/io/CachedOutputStreamCleaner.java   |  8 +++
 .../cxf/io/DelayedCachedOutputStreamCleaner.java   | 29 +++++++---
 .../cxf/jaxrs/client/logging/RESTLoggingTest.java  | 62 ++++++++++++++++++++++
 .../cxf/jaxrs/client/logging/TestServiceRest.java  |  6 +++
 5 files changed, 100 insertions(+), 7 deletions(-)

diff --git a/core/src/main/java/org/apache/cxf/io/CachedOutputStream.java b/core/src/main/java/org/apache/cxf/io/CachedOutputStream.java
index 3ba937d04b..f5526fa57f 100644
--- a/core/src/main/java/org/apache/cxf/io/CachedOutputStream.java
+++ b/core/src/main/java/org/apache/cxf/io/CachedOutputStream.java
@@ -283,8 +283,10 @@ public class CachedOutputStream extends OutputStream {
                     }
                 } finally {
                     streamList.remove(currentStream);
+                    // we are not backed by file anymore, unregister from the cleaner
                     if (cachedOutputStreamCleaner != null) {
                         cachedOutputStreamCleaner.unregister(currentStream);
+                        cachedOutputStreamCleaner.unregister(this);
                     }
                     deleteTempFile();
                     inmem = true;
diff --git a/core/src/main/java/org/apache/cxf/io/CachedOutputStreamCleaner.java b/core/src/main/java/org/apache/cxf/io/CachedOutputStreamCleaner.java
index 3d6361a4c9..dbef316afb 100644
--- a/core/src/main/java/org/apache/cxf/io/CachedOutputStreamCleaner.java
+++ b/core/src/main/java/org/apache/cxf/io/CachedOutputStreamCleaner.java
@@ -40,4 +40,12 @@ public interface CachedOutputStreamCleaner {
      * Unregister the stream instance from the clean up (closed properly)
      */
     void register(Closeable closeable);
+
+    /**
+     * The exact or approximate (depending on the implementation) size of the cleaner queue
+     * @return exact or approximate (depending on the implementation) size of the cleaner queue
+     */
+    default int size() {
+        return 0;
+    }
 }
diff --git a/core/src/main/java/org/apache/cxf/io/DelayedCachedOutputStreamCleaner.java b/core/src/main/java/org/apache/cxf/io/DelayedCachedOutputStreamCleaner.java
index 65e85da0a6..524adc0a69 100644
--- a/core/src/main/java/org/apache/cxf/io/DelayedCachedOutputStreamCleaner.java
+++ b/core/src/main/java/org/apache/cxf/io/DelayedCachedOutputStreamCleaner.java
@@ -53,19 +53,24 @@ public final class DelayedCachedOutputStreamCleaner implements CachedOutputStrea
         @Override
         default void register(Closeable closeable) {
         }
-        
+
         @Override
         default void unregister(Closeable closeable) {
         }
-        
+
         @Override
         default void close() {
         }
-        
+
         @Override
         default void clean() {
         }
-        
+
+        @Override
+        default int size() {
+            return 0;
+        }
+
         default void forceClean() {
         }
     }
@@ -102,18 +107,23 @@ public final class DelayedCachedOutputStreamCleaner implements CachedOutputStrea
             queue.drainTo(closeables);
             clean(closeables);
         }
-        
+
         @Override
         public void forceClean() {
             clean(queue);
         }
-        
+
         @Override
         public void close()  {
             timer.cancel();
             queue.clear();
         }
-        
+
+        @Override
+        public int size() {
+            return queue.size();
+        }
+
         private void clean(Collection<DelayedCloseable> closeables) {
             final Iterator<DelayedCloseable> iterator = closeables.iterator();
             while (iterator.hasNext()) {
@@ -225,6 +235,11 @@ public final class DelayedCachedOutputStreamCleaner implements CachedOutputStrea
         cleaner.unregister(closeable);
     }
 
+    @Override
+    public int size() {
+        return cleaner.size();
+    }
+
     @Override
     public void clean() {
         cleaner.clean();
diff --git a/rt/rs/client/src/test/java/org/apache/cxf/jaxrs/client/logging/RESTLoggingTest.java b/rt/rs/client/src/test/java/org/apache/cxf/jaxrs/client/logging/RESTLoggingTest.java
index 6c81b20d1d..01c188c96f 100644
--- a/rt/rs/client/src/test/java/org/apache/cxf/jaxrs/client/logging/RESTLoggingTest.java
+++ b/rt/rs/client/src/test/java/org/apache/cxf/jaxrs/client/logging/RESTLoggingTest.java
@@ -19,17 +19,21 @@
 package org.apache.cxf.jaxrs.client.logging;
 
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.MalformedURLException;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.atomic.LongAdder;
 
+import org.apache.cxf.Bus;
 import org.apache.cxf.endpoint.Server;
 import org.apache.cxf.ext.logging.AbstractLoggingInterceptor;
 import org.apache.cxf.ext.logging.LoggingFeature;
 import org.apache.cxf.ext.logging.event.EventType;
 import org.apache.cxf.ext.logging.event.LogEvent;
+import org.apache.cxf.io.CachedOutputStreamCleaner;
 import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
 import org.apache.cxf.jaxrs.client.JAXRSClientFactoryBean;
 import org.apache.cxf.jaxrs.client.WebClient;
@@ -39,7 +43,10 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import static org.awaitility.Awaitility.await;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertEquals;
 
 public class RESTLoggingTest {
 
@@ -57,6 +64,61 @@ public class RESTLoggingTest {
         Assert.assertEquals("test1", result);
     }
 
+    @Test
+    public void testCacheCleanUp() throws Exception {
+        LoggingFeature loggingFeature = new LoggingFeature();
+        loggingFeature.setInMemThreshold(1); // To activate usage of the CachedOutputStream
+
+        Server server = createService(SERVICE_URI, new TestServiceRest(), loggingFeature);
+        server.start();
+
+        try {
+            final JAXRSClientFactoryBean bean = new JAXRSClientFactoryBean();
+            bean.setAddress(SERVICE_URI);
+            bean.setTransportId(LocalTransportFactory.TRANSPORT_ID);
+    
+            final LongAdder registers = new LongAdder();
+            final WebClient client = bean.createWebClient();
+            final Bus bus = bean.getBus();
+
+            // See please https://issues.apache.org/jira/browse/CXF-9110
+            final CachedOutputStreamCleaner cleaner = bus.getExtension(CachedOutputStreamCleaner.class);
+            bus.setExtension(new CachedOutputStreamCleaner() {
+                @Override
+                public void clean() {
+                    cleaner.clean();
+                }
+
+                @Override
+                public void unregister(Closeable closeable) {
+                    cleaner.unregister(closeable);
+                }
+
+                @Override
+                public void register(Closeable closeable) {
+                    cleaner.register(closeable);
+                    registers.increment();
+                }
+
+                @Override
+                public int size() {
+                    return cleaner.size();
+                }
+            }, CachedOutputStreamCleaner.class);
+
+            String response = null;
+            for (int i = 0; i < 1_000; i++) { // ~2...5 seconds of the execution
+                response = client.post("DATA", String.class);
+            }
+            assertEquals("DATA", response);
+
+            assertThat(registers.longValue(), equalTo(3000L));
+            assertThat(cleaner.size(), equalTo(0));
+        } finally {
+            server.destroy();
+        }
+    }
+
     @Test
     public void testBinary() throws IOException, InterruptedException {
         LoggingFeature loggingFeature = new LoggingFeature();
diff --git a/rt/rs/client/src/test/java/org/apache/cxf/jaxrs/client/logging/TestServiceRest.java b/rt/rs/client/src/test/java/org/apache/cxf/jaxrs/client/logging/TestServiceRest.java
index cf506a4f29..0d94fc1f71 100644
--- a/rt/rs/client/src/test/java/org/apache/cxf/jaxrs/client/logging/TestServiceRest.java
+++ b/rt/rs/client/src/test/java/org/apache/cxf/jaxrs/client/logging/TestServiceRest.java
@@ -19,6 +19,7 @@
 package org.apache.cxf.jaxrs.client.logging;
 
 import javax.ws.rs.GET;
+import javax.ws.rs.POST;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
 
@@ -29,4 +30,9 @@ public class TestServiceRest {
         return msg;
     }
 
+    @POST
+    public String post(String msg) {
+        return msg;
+    }
 }
+

Reply via email to