This is an automated email from the ASF dual-hosted git repository.

reta pushed a commit to branch 3.6.x-fixes
in repository https://gitbox.apache.org/repos/asf/cxf.git

commit fd7ce3c83098f61e35d27c121720700e2f06c829
Author: Andriy Redko <[email protected]>
AuthorDate: Sat Nov 1 12:36:38 2025 -0400

    CXF-9171: DelayedCachedOutputStreamCleaner thread accumulation after 
CVE-2025-23184 fix (#2684)
    
    * CXF-9171: DelayedCachedOutputStreamCleaner thread accumulation after 
CVE-2025-23184 fix
    
    * Address code review comments
    
    (cherry picked from commit 1ce9fe1cbf779629e77f7b5f4f62bdb8d4322bcb)
    (cherry picked from commit 38adf23f92af3e6115e431aef694377dcec5efd1)
---
 .../java/org/apache/cxf/io/CachedConstants.java    |   9 ++
 .../cxf/io/DelayedCachedOutputStreamCleaner.java   | 164 +++++++++++++++++++--
 .../io/DelayedCachedOutputStreamCleanerTest.java   |  58 ++++++--
 3 files changed, 206 insertions(+), 25 deletions(-)

diff --git a/core/src/main/java/org/apache/cxf/io/CachedConstants.java 
b/core/src/main/java/org/apache/cxf/io/CachedConstants.java
index 7b45d0861b8..1f5ddb9ba01 100644
--- a/core/src/main/java/org/apache/cxf/io/CachedConstants.java
+++ b/core/src/main/java/org/apache/cxf/io/CachedConstants.java
@@ -87,6 +87,15 @@ public final class CachedConstants {
     public static final String CLEANER_CLEAN_ON_SHUTDOWN_BUS_PROP =
         "bus.io.CachedOutputStreamCleaner.CleanOnShutdown";
 
+    /**
+     * The strategy to be used for cleaning up unclosed {@code 
CachedOutputStream} instances. By default,
+     * there cleaner implementation creates a timer per each {@link Bus} 
instance. However, in certain 
+     * deployments it could lead to excessive number of timers being created, 
so there is an alternative
+     * strategy that uses single timer instance. The supported strategies are: 
default, single-timer.
+     */
+    public static final String CLEANER_STRATEGY_BUS_PROP =
+        "bus.io.CachedOutputStreamCleaner.Strategy";
+
     private CachedConstants() {
         // complete
     }
diff --git 
a/core/src/main/java/org/apache/cxf/io/DelayedCachedOutputStreamCleaner.java 
b/core/src/main/java/org/apache/cxf/io/DelayedCachedOutputStreamCleaner.java
index b52d82b71da..d3e33be7852 100644
--- a/core/src/main/java/org/apache/cxf/io/DelayedCachedOutputStreamCleaner.java
+++ b/core/src/main/java/org/apache/cxf/io/DelayedCachedOutputStreamCleaner.java
@@ -30,6 +30,8 @@ import java.util.TimerTask;
 import java.util.concurrent.DelayQueue;
 import java.util.concurrent.Delayed;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
 import java.util.logging.Logger;
 
 import javax.annotation.Resource;
@@ -42,6 +44,8 @@ import org.apache.cxf.common.logging.LogUtils;
 public final class DelayedCachedOutputStreamCleaner implements 
CachedOutputStreamCleaner, BusLifeCycleListener {
     private static final Logger LOG = 
LogUtils.getL7dLogger(DelayedCachedOutputStreamCleaner.class);
     private static final long MIN_DELAY = 2000; /* 2 seconds */
+    private static final String DEFAULT_STRATEGY = "default";
+    private static final String SINGLE_TIMER_STRATEGY = "single-timer";
     private static final DelayedCleaner NOOP_CLEANER = new DelayedCleaner() {
         // NOOP
     };
@@ -75,25 +79,145 @@ public final class DelayedCachedOutputStreamCleaner 
implements CachedOutputStrea
         }
     }
 
-    private static final class DelayedCleanerImpl implements DelayedCleaner {
+    private static final class SingleTimerDelayedCleaner implements 
DelayedCleaner {
+        private static final Timer TIMER = new 
Timer("DelayedCachedOutputStreamCleaner", true);
+
         private final long delay; /* default is 30 minutes, in milliseconds */
         private final DelayQueue<DelayedCloseable> queue = new DelayQueue<>();
-        private final Timer timer;
-        
-        DelayedCleanerImpl(final long delay) {
+        private final Object lock = new Object();
+        private TimerTask timerTask;
+
+        SingleTimerDelayedCleaner(final long delay) {
             this.delay = delay;
-            this.timer = new Timer("DelayedCachedOutputStreamCleaner", true);
-            this.timer.scheduleAtFixedRate(new TimerTask() {
-                @Override
-                public void run() {
-                    clean();
+        }
+
+        @Override
+        public void register(final Closeable closeable) {
+            TimerTask newTimerTask = null;
+
+            synchronized (lock) {
+                queue.put(new DelayedCloseable(closeable, delay));
+                if (timerTask == null) {
+                    timerTask = new TimerTask() {
+                        @Override
+                        public void run() {
+                            clean();
+                        }
+                    };
+                    newTimerTask = timerTask;
+                }
+            }
+
+            if (newTimerTask != null) {
+                TIMER.scheduleAtFixedRate(newTimerTask, 0, Math.max(MIN_DELAY, 
delay >> 1));
+            }
+        }
+
+        @Override
+        public void unregister(final Closeable closeable) {
+            TimerTask oldTimerTask = null;
+            synchronized (lock) {
+                queue.remove(new DelayedCloseable(closeable, delay));
+                if (queue.isEmpty() && timerTask != null) {
+                    oldTimerTask = timerTask;
+                    timerTask = null;
+                }
+            }
+            if (oldTimerTask != null) {
+                oldTimerTask.cancel();
+            }
+        }
+
+        @Override
+        public void clean() {
+            final Collection<DelayedCloseable> closeables = new ArrayList<>();
+            TimerTask oldTimerTask = null;
+            synchronized (lock) {
+                queue.drainTo(closeables);
+                if (queue.isEmpty() && timerTask != null) {
+                    oldTimerTask = timerTask;
+                    timerTask = null;
+                }
+            }
+            if (oldTimerTask != null) {
+                oldTimerTask.cancel();
+            }
+            clean(closeables);
+        }
+
+        @Override
+        public void forceClean() {
+            TimerTask oldTimerTask = null;
+            synchronized (lock) {
+                clean(queue);
+                if (timerTask != null) {
+                    oldTimerTask = timerTask;
+                    timerTask = null;
+                }
+            }
+            if (oldTimerTask != null) {
+                oldTimerTask.cancel();
+            }
+        }
+
+        @Override
+        public void close()  {
+            TimerTask oldTimerTask = null;
+            synchronized (lock) {
+                queue.clear();
+                if (timerTask != null) {
+                    oldTimerTask = timerTask;
+                    timerTask = null;
+                }
+            }
+            if (oldTimerTask != null) {
+                oldTimerTask.cancel();
+            }
+        }
+
+        @Override
+        public int size() {
+            return queue.size();
+        }
+
+        private void clean(Collection<DelayedCloseable> closeables) {
+            final Iterator<DelayedCloseable> iterator = closeables.iterator();
+            while (iterator.hasNext()) {
+                final DelayedCloseable next = iterator.next();
+                try {
+                    iterator.remove();
+                    LOG.warning("Unclosed (leaked?) stream detected: " + 
next.closeable.hashCode());
+                    next.closeable.close();
+                } catch (final IOException | RuntimeException ex) {
+                    LOG.warning("Unable to close (leaked?) stream: " + 
ex.getMessage());
                 }
-            }, 0, Math.max(MIN_DELAY, delay >> 1));
+            }
+        }
+    }
+
+    private static final class DefaultDelayedCleaner implements DelayedCleaner 
{
+        private final long delay; /* default is 30 minutes, in milliseconds */
+        private final DelayQueue<DelayedCloseable> queue = new DelayQueue<>();
+        private final AtomicBoolean initialized = new AtomicBoolean(false);
+        private volatile Timer timer;
+
+        DefaultDelayedCleaner(final long delay) {
+            this.delay = delay;
         }
 
         @Override
         public void register(Closeable closeable) {
             queue.put(new DelayedCloseable(closeable, delay));
+            // Initialize timer lazily only when at least one closeable is 
registered
+            if (initialized.compareAndSet(false, true)) {
+                this.timer = new Timer("DelayedCachedOutputStreamCleaner", 
true);
+                this.timer.scheduleAtFixedRate(new TimerTask() {
+                    @Override
+                    public void run() {
+                        clean();
+                    }
+                }, 0, Math.max(MIN_DELAY, delay >> 1));
+            }
         }
 
         @Override
@@ -115,7 +239,10 @@ public final class DelayedCachedOutputStreamCleaner 
implements CachedOutputStrea
 
         @Override
         public void close()  {
-            timer.cancel();
+            final Timer t = timer;
+            if (t != null) {
+                t.cancel();
+            }
             queue.clear();
         }
 
@@ -187,11 +314,22 @@ public final class DelayedCachedOutputStreamCleaner 
implements CachedOutputStrea
         Number delayValue = null;
         BusLifeCycleManager busLifeCycleManager = null;
         Boolean cleanupOnShutdownValue = null;
+        Function<Long, DelayedCleaner> delayedCleanerStrategy = 
DefaultDelayedCleaner::new;
 
         if (bus != null) {
             delayValue = (Number) 
bus.getProperty(CachedConstants.CLEANER_DELAY_BUS_PROP);
             cleanupOnShutdownValue = (Boolean) 
bus.getProperty(CachedConstants.CLEANER_CLEAN_ON_SHUTDOWN_BUS_PROP);
             busLifeCycleManager = bus.getExtension(BusLifeCycleManager.class);
+            final String strategy = (String) 
bus.getProperty(CachedConstants.CLEANER_STRATEGY_BUS_PROP);
+            if (strategy == null || 
DEFAULT_STRATEGY.equalsIgnoreCase(strategy)) {
+                delayedCleanerStrategy = DefaultDelayedCleaner::new;
+            } else if (SINGLE_TIMER_STRATEGY.equalsIgnoreCase(strategy)) {
+                delayedCleanerStrategy = SingleTimerDelayedCleaner::new;
+            } else {
+                throw new IllegalArgumentException("The value of " + 
CachedConstants.CLEANER_STRATEGY_BUS_PROP 
+                    + " property is invalid: " + strategy + " (should be " + 
DEFAULT_STRATEGY + " or " 
+                        + SINGLE_TIMER_STRATEGY);
+            }
         }
 
         if (cleaner != null) {
@@ -206,11 +344,11 @@ public final class DelayedCachedOutputStreamCleaner 
implements CachedOutputStrea
 
         if (delayValue == null) {
             // Default delay is set to 30 mins
-            cleaner = new DelayedCleanerImpl(TimeUnit.MILLISECONDS.convert(30, 
TimeUnit.MINUTES));
+            cleaner = 
delayedCleanerStrategy.apply(TimeUnit.MILLISECONDS.convert(30, 
TimeUnit.MINUTES));
         } else {
             final long value = delayValue.longValue();
             if (value > 0 && value >= MIN_DELAY) {
-                cleaner = new DelayedCleanerImpl(value); /* already in 
milliseconds */
+                cleaner = delayedCleanerStrategy.apply(value); /* already in 
milliseconds */
             } else {
                 cleaner = NOOP_CLEANER;
                 if (value != 0) {
diff --git 
a/core/src/test/java/org/apache/cxf/io/DelayedCachedOutputStreamCleanerTest.java
 
b/core/src/test/java/org/apache/cxf/io/DelayedCachedOutputStreamCleanerTest.java
index 562b3a18f25..6e408d66096 100644
--- 
a/core/src/test/java/org/apache/cxf/io/DelayedCachedOutputStreamCleanerTest.java
+++ 
b/core/src/test/java/org/apache/cxf/io/DelayedCachedOutputStreamCleanerTest.java
@@ -33,6 +33,8 @@ import org.apache.cxf.bus.extension.ExtensionManagerBus;
 
 import org.junit.After;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import static org.awaitility.Awaitility.await;
 import static org.hamcrest.CoreMatchers.equalTo;
@@ -40,9 +42,20 @@ import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 
+@RunWith(Parameterized.class)
 public class DelayedCachedOutputStreamCleanerTest {
+    private final String strategy;
     private Bus bus;
 
+    public DelayedCachedOutputStreamCleanerTest(final String strategy) {
+        this.strategy = strategy;
+    }
+
+    @Parameterized.Parameters(name = CachedConstants.CLEANER_STRATEGY_BUS_PROP 
+ "= {0}")
+    public static String[] strategies() throws Exception {
+        return new String[] {"default", "single-timer", null};
+    }
+
     @After
     public void tearDown() {
         if (bus != null) {
@@ -50,21 +63,30 @@ public class DelayedCachedOutputStreamCleanerTest {
             bus = null;
         }
     }
-    
+
     @Test
     public void testNoop() {
         final Map<String, Object> properties = 
Collections.singletonMap(CachedConstants.CLEANER_DELAY_BUS_PROP, 0);
-        bus = new ExtensionManagerBus(new HashMap<>(), properties);
+        bus = createBus(properties);
         
         final CachedOutputStreamCleaner cleaner = 
bus.getExtension(CachedOutputStreamCleaner.class);
         assertThat(cleaner, 
instanceOf(DelayedCachedOutputStreamCleaner.class)); /* noop */
         
         assertNoopCleaner(cleaner);
     }
-    
+
+    @Test
+    public void testNoTimer() {
+        final Map<String, Object> properties = 
Collections.singletonMap(CachedConstants.CLEANER_DELAY_BUS_PROP, 2500);
+        bus = createBus(properties);
+        
+        final CachedOutputStreamCleaner cleaner = 
bus.getExtension(CachedOutputStreamCleaner.class);
+        assertThat(cleaner, 
instanceOf(DelayedCachedOutputStreamCleaner.class));
+    }
+
     @Test
     public void testForceClean() throws InterruptedException {
-        bus = new ExtensionManagerBus();
+        bus = createBus();
         
         final CachedOutputStreamCleaner cleaner = 
bus.getExtension(CachedOutputStreamCleaner.class);
         assertThat(cleaner, 
instanceOf(DelayedCachedOutputStreamCleaner.class));
@@ -88,7 +110,7 @@ public class DelayedCachedOutputStreamCleanerTest {
 
         /* Delay of 2.5 seconds */
         final Map<String, Object> properties = 
Collections.singletonMap(CachedConstants.CLEANER_DELAY_BUS_PROP, 2500);
-        bus = new ExtensionManagerBus(new HashMap<>(), properties);
+        bus = createBus(properties);
 
         final CachedOutputStreamCleaner cleaner = 
bus.getExtension(CachedOutputStreamCleaner.class);
         cleaner.register(closeable1);
@@ -101,7 +123,7 @@ public class DelayedCachedOutputStreamCleanerTest {
     
     @Test
     public void testForceCleanForEmpty() throws InterruptedException {
-        bus = new ExtensionManagerBus();
+        bus = createBus();
 
         final CachedOutputStreamCleaner cleaner = 
bus.getExtension(CachedOutputStreamCleaner.class);
         assertThat(cleaner, 
instanceOf(DelayedCachedOutputStreamCleaner.class));
@@ -121,7 +143,7 @@ public class DelayedCachedOutputStreamCleanerTest {
     
     @Test
     public void testForceCleanException() throws InterruptedException {
-        bus = new ExtensionManagerBus();
+        bus = createBus();
 
         final CachedOutputStreamCleaner cleaner = 
bus.getExtension(CachedOutputStreamCleaner.class);
         assertThat(cleaner, 
instanceOf(DelayedCachedOutputStreamCleaner.class));
@@ -149,7 +171,7 @@ public class DelayedCachedOutputStreamCleanerTest {
     public void testCleanOnShutdown() throws InterruptedException {
         /* Delay of 5 seconds */
         final Map<String, Object> properties = 
Collections.singletonMap(CachedConstants.CLEANER_DELAY_BUS_PROP, 5000);
-        bus = new ExtensionManagerBus(new HashMap<>(), properties);
+        bus = createBus(properties);
 
         final AtomicBoolean latch = new AtomicBoolean();
         final Closeable closeable = () -> latch.compareAndSet(false, true);
@@ -170,7 +192,7 @@ public class DelayedCachedOutputStreamCleanerTest {
         final Map<String, Object> properties = new HashMap<>();
         properties.put(CachedConstants.CLEANER_DELAY_BUS_PROP, 3000); /* 3 
seconds */
         properties.put(CachedConstants.CLEANER_CLEAN_ON_SHUTDOWN_BUS_PROP, 
false);
-        bus = new ExtensionManagerBus(new HashMap<>(), properties);
+        bus = createBus(properties);
 
         final AtomicBoolean latch = new AtomicBoolean();
         final Closeable closeable = () -> latch.compareAndSet(false, true);
@@ -188,7 +210,7 @@ public class DelayedCachedOutputStreamCleanerTest {
     @Test
     public void testNegativeDelay() throws InterruptedException {
         final Map<String, Object> properties = 
Collections.singletonMap(CachedConstants.CLEANER_DELAY_BUS_PROP, -1);
-        bus = new ExtensionManagerBus(new HashMap<>(), properties);
+        bus = createBus(properties);
 
         final CachedOutputStreamCleaner cleaner = 
bus.getExtension(CachedOutputStreamCleaner.class);
         assertThat(cleaner, 
instanceOf(DelayedCachedOutputStreamCleaner.class)); /* noop */
@@ -199,7 +221,7 @@ public class DelayedCachedOutputStreamCleanerTest {
     @Test
     public void testTooSmallDelay() throws InterruptedException {
         final Map<String, Object> properties = 
Collections.singletonMap(CachedConstants.CLEANER_DELAY_BUS_PROP, 1500);
-        bus = new ExtensionManagerBus(new HashMap<>(), properties);
+        bus = createBus(properties);
 
         final CachedOutputStreamCleaner cleaner = 
bus.getExtension(CachedOutputStreamCleaner.class);
         assertThat(cleaner, 
instanceOf(DelayedCachedOutputStreamCleaner.class)); /* noop */
@@ -215,7 +237,7 @@ public class DelayedCachedOutputStreamCleanerTest {
 
         /* Delay of 5 seconds */
         final Map<String, Object> properties = 
Collections.singletonMap(CachedConstants.CLEANER_DELAY_BUS_PROP, 2500);
-        bus = new ExtensionManagerBus(new HashMap<>(), properties);
+        bus = createBus(properties);
 
         final CachedOutputStreamCleaner cleaner = 
bus.getExtension(CachedOutputStreamCleaner.class);
         cleaner.register(closeable1);
@@ -232,6 +254,18 @@ public class DelayedCachedOutputStreamCleanerTest {
         assertThat(cleaner, 
instanceOf(DelayedCachedOutputStreamCleaner.class));
     }
 
+    private Bus createBus() {
+        return createBus(Collections.emptyMap());
+    }
+
+    private Bus createBus(Map<String, Object> properties) {
+        final Map<String, Object> combined = new HashMap<>(properties);
+        if (strategy != null) {
+            combined.put(CachedConstants.CLEANER_STRATEGY_BUS_PROP, strategy);
+        }
+        return new ExtensionManagerBus(new HashMap<>(), combined);
+    }
+
     private void assertNoopCleaner(final CachedOutputStreamCleaner cleaner) {
         final AtomicBoolean latch = new AtomicBoolean(false);
         final Closeable closeable = () -> latch.compareAndSet(false, true);

Reply via email to