This is an automated email from the ASF dual-hosted git repository. reta pushed a commit to branch 3.6.x-fixes in repository https://gitbox.apache.org/repos/asf/cxf.git
commit fd7ce3c83098f61e35d27c121720700e2f06c829 Author: Andriy Redko <[email protected]> AuthorDate: Sat Nov 1 12:36:38 2025 -0400 CXF-9171: DelayedCachedOutputStreamCleaner thread accumulation after CVE-2025-23184 fix (#2684) * CXF-9171: DelayedCachedOutputStreamCleaner thread accumulation after CVE-2025-23184 fix * Address code review comments (cherry picked from commit 1ce9fe1cbf779629e77f7b5f4f62bdb8d4322bcb) (cherry picked from commit 38adf23f92af3e6115e431aef694377dcec5efd1) --- .../java/org/apache/cxf/io/CachedConstants.java | 9 ++ .../cxf/io/DelayedCachedOutputStreamCleaner.java | 164 +++++++++++++++++++-- .../io/DelayedCachedOutputStreamCleanerTest.java | 58 ++++++-- 3 files changed, 206 insertions(+), 25 deletions(-) diff --git a/core/src/main/java/org/apache/cxf/io/CachedConstants.java b/core/src/main/java/org/apache/cxf/io/CachedConstants.java index 7b45d0861b8..1f5ddb9ba01 100644 --- a/core/src/main/java/org/apache/cxf/io/CachedConstants.java +++ b/core/src/main/java/org/apache/cxf/io/CachedConstants.java @@ -87,6 +87,15 @@ public final class CachedConstants { public static final String CLEANER_CLEAN_ON_SHUTDOWN_BUS_PROP = "bus.io.CachedOutputStreamCleaner.CleanOnShutdown"; + /** + * The strategy to be used for cleaning up unclosed {@code CachedOutputStream} instances. By default, + * the cleaner implementation creates a timer per {@link Bus} instance. However, in certain + * deployments it could lead to an excessive number of timers being created, so there is an alternative + * strategy that uses a single timer instance. The supported strategies are: default, single-timer. 
+ */ + public static final String CLEANER_STRATEGY_BUS_PROP = + "bus.io.CachedOutputStreamCleaner.Strategy"; + private CachedConstants() { // complete } diff --git a/core/src/main/java/org/apache/cxf/io/DelayedCachedOutputStreamCleaner.java b/core/src/main/java/org/apache/cxf/io/DelayedCachedOutputStreamCleaner.java index b52d82b71da..d3e33be7852 100644 --- a/core/src/main/java/org/apache/cxf/io/DelayedCachedOutputStreamCleaner.java +++ b/core/src/main/java/org/apache/cxf/io/DelayedCachedOutputStreamCleaner.java @@ -30,6 +30,8 @@ import java.util.TimerTask; import java.util.concurrent.DelayQueue; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; import java.util.logging.Logger; import javax.annotation.Resource; @@ -42,6 +44,8 @@ import org.apache.cxf.common.logging.LogUtils; public final class DelayedCachedOutputStreamCleaner implements CachedOutputStreamCleaner, BusLifeCycleListener { private static final Logger LOG = LogUtils.getL7dLogger(DelayedCachedOutputStreamCleaner.class); private static final long MIN_DELAY = 2000; /* 2 seconds */ + private static final String DEFAULT_STRATEGY = "default"; + private static final String SINGLE_TIMER_STRATEGY = "single-timer"; private static final DelayedCleaner NOOP_CLEANER = new DelayedCleaner() { // NOOP }; @@ -75,25 +79,145 @@ public final class DelayedCachedOutputStreamCleaner implements CachedOutputStrea } } - private static final class DelayedCleanerImpl implements DelayedCleaner { + private static final class SingleTimerDelayedCleaner implements DelayedCleaner { + private static final Timer TIMER = new Timer("DelayedCachedOutputStreamCleaner", true); + private final long delay; /* default is 30 minutes, in milliseconds */ private final DelayQueue<DelayedCloseable> queue = new DelayQueue<>(); - private final Timer timer; - - DelayedCleanerImpl(final long delay) { + private final Object lock = new Object(); 
+ private TimerTask timerTask; + + SingleTimerDelayedCleaner(final long delay) { this.delay = delay; - this.timer = new Timer("DelayedCachedOutputStreamCleaner", true); - this.timer.scheduleAtFixedRate(new TimerTask() { - @Override - public void run() { - clean(); + } + + @Override + public void register(final Closeable closeable) { + TimerTask newTimerTask = null; + + synchronized (lock) { + queue.put(new DelayedCloseable(closeable, delay)); + if (timerTask == null) { + timerTask = new TimerTask() { + @Override + public void run() { + clean(); + } + }; + newTimerTask = timerTask; + } + } + + if (newTimerTask != null) { + TIMER.scheduleAtFixedRate(newTimerTask, 0, Math.max(MIN_DELAY, delay >> 1)); + } + } + + @Override + public void unregister(final Closeable closeable) { + TimerTask oldTimerTask = null; + synchronized (lock) { + queue.remove(new DelayedCloseable(closeable, delay)); + if (queue.isEmpty() && timerTask != null) { + oldTimerTask = timerTask; + timerTask = null; + } + } + if (oldTimerTask != null) { + oldTimerTask.cancel(); + } + } + + @Override + public void clean() { + final Collection<DelayedCloseable> closeables = new ArrayList<>(); + TimerTask oldTimerTask = null; + synchronized (lock) { + queue.drainTo(closeables); + if (queue.isEmpty() && timerTask != null) { + oldTimerTask = timerTask; + timerTask = null; + } + } + if (oldTimerTask != null) { + oldTimerTask.cancel(); + } + clean(closeables); + } + + @Override + public void forceClean() { + TimerTask oldTimerTask = null; + synchronized (lock) { + clean(queue); + if (timerTask != null) { + oldTimerTask = timerTask; + timerTask = null; + } + } + if (oldTimerTask != null) { + oldTimerTask.cancel(); + } + } + + @Override + public void close() { + TimerTask oldTimerTask = null; + synchronized (lock) { + queue.clear(); + if (timerTask != null) { + oldTimerTask = timerTask; + timerTask = null; + } + } + if (oldTimerTask != null) { + oldTimerTask.cancel(); + } + } + + @Override + public int size() { + 
return queue.size(); + } + + private void clean(Collection<DelayedCloseable> closeables) { + final Iterator<DelayedCloseable> iterator = closeables.iterator(); + while (iterator.hasNext()) { + final DelayedCloseable next = iterator.next(); + try { + iterator.remove(); + LOG.warning("Unclosed (leaked?) stream detected: " + next.closeable.hashCode()); + next.closeable.close(); + } catch (final IOException | RuntimeException ex) { + LOG.warning("Unable to close (leaked?) stream: " + ex.getMessage()); } - }, 0, Math.max(MIN_DELAY, delay >> 1)); + } + } + } + + private static final class DefaultDelayedCleaner implements DelayedCleaner { + private final long delay; /* default is 30 minutes, in milliseconds */ + private final DelayQueue<DelayedCloseable> queue = new DelayQueue<>(); + private final AtomicBoolean initialized = new AtomicBoolean(false); + private volatile Timer timer; + + DefaultDelayedCleaner(final long delay) { + this.delay = delay; } @Override public void register(Closeable closeable) { queue.put(new DelayedCloseable(closeable, delay)); + // Initialize timer lazily only when at least one closeable is registered + if (initialized.compareAndSet(false, true)) { + this.timer = new Timer("DelayedCachedOutputStreamCleaner", true); + this.timer.scheduleAtFixedRate(new TimerTask() { + @Override + public void run() { + clean(); + } + }, 0, Math.max(MIN_DELAY, delay >> 1)); + } } @Override @@ -115,7 +239,10 @@ public final class DelayedCachedOutputStreamCleaner implements CachedOutputStrea @Override public void close() { - timer.cancel(); + final Timer t = timer; + if (t != null) { + t.cancel(); + } queue.clear(); } @@ -187,11 +314,22 @@ public final class DelayedCachedOutputStreamCleaner implements CachedOutputStrea Number delayValue = null; BusLifeCycleManager busLifeCycleManager = null; Boolean cleanupOnShutdownValue = null; + Function<Long, DelayedCleaner> delayedCleanerStrategy = DefaultDelayedCleaner::new; if (bus != null) { delayValue = (Number) 
bus.getProperty(CachedConstants.CLEANER_DELAY_BUS_PROP); cleanupOnShutdownValue = (Boolean) bus.getProperty(CachedConstants.CLEANER_CLEAN_ON_SHUTDOWN_BUS_PROP); busLifeCycleManager = bus.getExtension(BusLifeCycleManager.class); + final String strategy = (String) bus.getProperty(CachedConstants.CLEANER_STRATEGY_BUS_PROP); + if (strategy == null || DEFAULT_STRATEGY.equalsIgnoreCase(strategy)) { + delayedCleanerStrategy = DefaultDelayedCleaner::new; + } else if (SINGLE_TIMER_STRATEGY.equalsIgnoreCase(strategy)) { + delayedCleanerStrategy = SingleTimerDelayedCleaner::new; + } else { + throw new IllegalArgumentException("The value of " + CachedConstants.CLEANER_STRATEGY_BUS_PROP + + " property is invalid: " + strategy + " (should be " + DEFAULT_STRATEGY + " or " + + SINGLE_TIMER_STRATEGY); + } } if (cleaner != null) { @@ -206,11 +344,11 @@ public final class DelayedCachedOutputStreamCleaner implements CachedOutputStrea if (delayValue == null) { // Default delay is set to 30 mins - cleaner = new DelayedCleanerImpl(TimeUnit.MILLISECONDS.convert(30, TimeUnit.MINUTES)); + cleaner = delayedCleanerStrategy.apply(TimeUnit.MILLISECONDS.convert(30, TimeUnit.MINUTES)); } else { final long value = delayValue.longValue(); if (value > 0 && value >= MIN_DELAY) { - cleaner = new DelayedCleanerImpl(value); /* already in milliseconds */ + cleaner = delayedCleanerStrategy.apply(value); /* already in milliseconds */ } else { cleaner = NOOP_CLEANER; if (value != 0) { diff --git a/core/src/test/java/org/apache/cxf/io/DelayedCachedOutputStreamCleanerTest.java b/core/src/test/java/org/apache/cxf/io/DelayedCachedOutputStreamCleanerTest.java index 562b3a18f25..6e408d66096 100644 --- a/core/src/test/java/org/apache/cxf/io/DelayedCachedOutputStreamCleanerTest.java +++ b/core/src/test/java/org/apache/cxf/io/DelayedCachedOutputStreamCleanerTest.java @@ -33,6 +33,8 @@ import org.apache.cxf.bus.extension.ExtensionManagerBus; import org.junit.After; import org.junit.Test; +import 
org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import static org.awaitility.Awaitility.await; import static org.hamcrest.CoreMatchers.equalTo; @@ -40,9 +42,20 @@ import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; +@RunWith(Parameterized.class) public class DelayedCachedOutputStreamCleanerTest { + private final String strategy; private Bus bus; + public DelayedCachedOutputStreamCleanerTest(final String strategy) { + this.strategy = strategy; + } + + @Parameterized.Parameters(name = CachedConstants.CLEANER_STRATEGY_BUS_PROP + "= {0}") + public static String[] strategies() throws Exception { + return new String[] {"default", "single-timer", null}; + } + @After public void tearDown() { if (bus != null) { @@ -50,21 +63,30 @@ public class DelayedCachedOutputStreamCleanerTest { bus = null; } } - + @Test public void testNoop() { final Map<String, Object> properties = Collections.singletonMap(CachedConstants.CLEANER_DELAY_BUS_PROP, 0); - bus = new ExtensionManagerBus(new HashMap<>(), properties); + bus = createBus(properties); final CachedOutputStreamCleaner cleaner = bus.getExtension(CachedOutputStreamCleaner.class); assertThat(cleaner, instanceOf(DelayedCachedOutputStreamCleaner.class)); /* noop */ assertNoopCleaner(cleaner); } - + + @Test + public void testNoTimer() { + final Map<String, Object> properties = Collections.singletonMap(CachedConstants.CLEANER_DELAY_BUS_PROP, 2500); + bus = createBus(properties); + + final CachedOutputStreamCleaner cleaner = bus.getExtension(CachedOutputStreamCleaner.class); + assertThat(cleaner, instanceOf(DelayedCachedOutputStreamCleaner.class)); + } + @Test public void testForceClean() throws InterruptedException { - bus = new ExtensionManagerBus(); + bus = createBus(); final CachedOutputStreamCleaner cleaner = bus.getExtension(CachedOutputStreamCleaner.class); assertThat(cleaner, 
instanceOf(DelayedCachedOutputStreamCleaner.class)); @@ -88,7 +110,7 @@ public class DelayedCachedOutputStreamCleanerTest { /* Delay of 2.5 seconds */ final Map<String, Object> properties = Collections.singletonMap(CachedConstants.CLEANER_DELAY_BUS_PROP, 2500); - bus = new ExtensionManagerBus(new HashMap<>(), properties); + bus = createBus(properties); final CachedOutputStreamCleaner cleaner = bus.getExtension(CachedOutputStreamCleaner.class); cleaner.register(closeable1); @@ -101,7 +123,7 @@ public class DelayedCachedOutputStreamCleanerTest { @Test public void testForceCleanForEmpty() throws InterruptedException { - bus = new ExtensionManagerBus(); + bus = createBus(); final CachedOutputStreamCleaner cleaner = bus.getExtension(CachedOutputStreamCleaner.class); assertThat(cleaner, instanceOf(DelayedCachedOutputStreamCleaner.class)); @@ -121,7 +143,7 @@ public class DelayedCachedOutputStreamCleanerTest { @Test public void testForceCleanException() throws InterruptedException { - bus = new ExtensionManagerBus(); + bus = createBus(); final CachedOutputStreamCleaner cleaner = bus.getExtension(CachedOutputStreamCleaner.class); assertThat(cleaner, instanceOf(DelayedCachedOutputStreamCleaner.class)); @@ -149,7 +171,7 @@ public class DelayedCachedOutputStreamCleanerTest { public void testCleanOnShutdown() throws InterruptedException { /* Delay of 5 seconds */ final Map<String, Object> properties = Collections.singletonMap(CachedConstants.CLEANER_DELAY_BUS_PROP, 5000); - bus = new ExtensionManagerBus(new HashMap<>(), properties); + bus = createBus(properties); final AtomicBoolean latch = new AtomicBoolean(); final Closeable closeable = () -> latch.compareAndSet(false, true); @@ -170,7 +192,7 @@ public class DelayedCachedOutputStreamCleanerTest { final Map<String, Object> properties = new HashMap<>(); properties.put(CachedConstants.CLEANER_DELAY_BUS_PROP, 3000); /* 3 seconds */ properties.put(CachedConstants.CLEANER_CLEAN_ON_SHUTDOWN_BUS_PROP, false); - bus = new 
ExtensionManagerBus(new HashMap<>(), properties); + bus = createBus(properties); final AtomicBoolean latch = new AtomicBoolean(); final Closeable closeable = () -> latch.compareAndSet(false, true); @@ -188,7 +210,7 @@ public class DelayedCachedOutputStreamCleanerTest { @Test public void testNegativeDelay() throws InterruptedException { final Map<String, Object> properties = Collections.singletonMap(CachedConstants.CLEANER_DELAY_BUS_PROP, -1); - bus = new ExtensionManagerBus(new HashMap<>(), properties); + bus = createBus(properties); final CachedOutputStreamCleaner cleaner = bus.getExtension(CachedOutputStreamCleaner.class); assertThat(cleaner, instanceOf(DelayedCachedOutputStreamCleaner.class)); /* noop */ @@ -199,7 +221,7 @@ public class DelayedCachedOutputStreamCleanerTest { @Test public void testTooSmallDelay() throws InterruptedException { final Map<String, Object> properties = Collections.singletonMap(CachedConstants.CLEANER_DELAY_BUS_PROP, 1500); - bus = new ExtensionManagerBus(new HashMap<>(), properties); + bus = createBus(properties); final CachedOutputStreamCleaner cleaner = bus.getExtension(CachedOutputStreamCleaner.class); assertThat(cleaner, instanceOf(DelayedCachedOutputStreamCleaner.class)); /* noop */ @@ -215,7 +237,7 @@ public class DelayedCachedOutputStreamCleanerTest { /* Delay of 5 seconds */ final Map<String, Object> properties = Collections.singletonMap(CachedConstants.CLEANER_DELAY_BUS_PROP, 2500); - bus = new ExtensionManagerBus(new HashMap<>(), properties); + bus = createBus(properties); final CachedOutputStreamCleaner cleaner = bus.getExtension(CachedOutputStreamCleaner.class); cleaner.register(closeable1); @@ -232,6 +254,18 @@ public class DelayedCachedOutputStreamCleanerTest { assertThat(cleaner, instanceOf(DelayedCachedOutputStreamCleaner.class)); } + private Bus createBus() { + return createBus(Collections.emptyMap()); + } + + private Bus createBus(Map<String, Object> properties) { + final Map<String, Object> combined = new 
HashMap<>(properties); + if (strategy != null) { + combined.put(CachedConstants.CLEANER_STRATEGY_BUS_PROP, strategy); + } + return new ExtensionManagerBus(new HashMap<>(), combined); + } + private void assertNoopCleaner(final CachedOutputStreamCleaner cleaner) { final AtomicBoolean latch = new AtomicBoolean(false); final Closeable closeable = () -> latch.compareAndSet(false, true);
