This is an automated email from the ASF dual-hosted git repository.
reta pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/cxf.git
The following commit(s) were added to refs/heads/main by this push:
new 1ce9fe1cbf7 CXF-9171: DelayedCachedOutputStreamCleaner thread
accumulation after CVE-2025-23184 fix (#2684)
1ce9fe1cbf7 is described below
commit 1ce9fe1cbf779629e77f7b5f4f62bdb8d4322bcb
Author: Andriy Redko <[email protected]>
AuthorDate: Sat Nov 1 12:36:38 2025 -0400
CXF-9171: DelayedCachedOutputStreamCleaner thread accumulation after
CVE-2025-23184 fix (#2684)
* CXF-9171: DelayedCachedOutputStreamCleaner thread accumulation after
CVE-2025-23184 fix
* Address code review comments
---
.../java/org/apache/cxf/io/CachedConstants.java | 9 ++
.../cxf/io/DelayedCachedOutputStreamCleaner.java | 164 +++++++++++++++++++--
.../io/DelayedCachedOutputStreamCleanerTest.java | 58 ++++++--
3 files changed, 206 insertions(+), 25 deletions(-)
diff --git a/core/src/main/java/org/apache/cxf/io/CachedConstants.java
b/core/src/main/java/org/apache/cxf/io/CachedConstants.java
index 7b45d0861b8..1f5ddb9ba01 100644
--- a/core/src/main/java/org/apache/cxf/io/CachedConstants.java
+++ b/core/src/main/java/org/apache/cxf/io/CachedConstants.java
@@ -87,6 +87,15 @@ public final class CachedConstants {
public static final String CLEANER_CLEAN_ON_SHUTDOWN_BUS_PROP =
"bus.io.CachedOutputStreamCleaner.CleanOnShutdown";
+ /**
+ * The strategy to be used for cleaning up unclosed {@code
CachedOutputStream} instances. By default,
+ * there cleaner implementation creates a timer per each {@link Bus}
instance. However, in certain
+ * deployments it could lead to excessive number of timers being created,
so there is an alternative
+ * strategy that uses single timer instance. The supported strategies are:
default, single-timer.
+ */
+ public static final String CLEANER_STRATEGY_BUS_PROP =
+ "bus.io.CachedOutputStreamCleaner.Strategy";
+
private CachedConstants() {
// complete
}
diff --git
a/core/src/main/java/org/apache/cxf/io/DelayedCachedOutputStreamCleaner.java
b/core/src/main/java/org/apache/cxf/io/DelayedCachedOutputStreamCleaner.java
index 77721d115ff..f1e79649236 100644
--- a/core/src/main/java/org/apache/cxf/io/DelayedCachedOutputStreamCleaner.java
+++ b/core/src/main/java/org/apache/cxf/io/DelayedCachedOutputStreamCleaner.java
@@ -30,6 +30,8 @@ import java.util.TimerTask;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
import java.util.logging.Logger;
import jakarta.annotation.Resource;
@@ -41,6 +43,8 @@ import org.apache.cxf.common.logging.LogUtils;
public final class DelayedCachedOutputStreamCleaner implements
CachedOutputStreamCleaner, BusLifeCycleListener {
private static final Logger LOG =
LogUtils.getL7dLogger(DelayedCachedOutputStreamCleaner.class);
private static final long MIN_DELAY = 2000; /* 2 seconds */
+ private static final String DEFAULT_STRATEGY = "default";
+ private static final String SINGLE_TIMER_STRATEGY = "single-timer";
private static final DelayedCleaner NOOP_CLEANER = new DelayedCleaner() {
// NOOP
};
@@ -74,25 +78,145 @@ public final class DelayedCachedOutputStreamCleaner
implements CachedOutputStrea
}
}
- private static final class DelayedCleanerImpl implements DelayedCleaner {
+ private static final class SingleTimerDelayedCleaner implements
DelayedCleaner {
+ private static final Timer TIMER = new
Timer("DelayedCachedOutputStreamCleaner", true);
+
private final long delay; /* default is 30 minutes, in milliseconds */
private final DelayQueue<DelayedCloseable> queue = new DelayQueue<>();
- private final Timer timer;
-
- DelayedCleanerImpl(final long delay) {
+ private final Object lock = new Object();
+ private TimerTask timerTask;
+
+ SingleTimerDelayedCleaner(final long delay) {
this.delay = delay;
- this.timer = new Timer("DelayedCachedOutputStreamCleaner", true);
- this.timer.scheduleAtFixedRate(new TimerTask() {
- @Override
- public void run() {
- clean();
+ }
+
+ @Override
+ public void register(final Closeable closeable) {
+ TimerTask newTimerTask = null;
+
+ synchronized (lock) {
+ queue.put(new DelayedCloseable(closeable, delay));
+ if (timerTask == null) {
+ timerTask = new TimerTask() {
+ @Override
+ public void run() {
+ clean();
+ }
+ };
+ newTimerTask = timerTask;
+ }
+ }
+
+ if (newTimerTask != null) {
+ TIMER.scheduleAtFixedRate(newTimerTask, 0, Math.max(MIN_DELAY,
delay >> 1));
+ }
+ }
+
+ @Override
+ public void unregister(final Closeable closeable) {
+ TimerTask oldTimerTask = null;
+ synchronized (lock) {
+ queue.remove(new DelayedCloseable(closeable, delay));
+ if (queue.isEmpty() && timerTask != null) {
+ oldTimerTask = timerTask;
+ timerTask = null;
+ }
+ }
+ if (oldTimerTask != null) {
+ oldTimerTask.cancel();
+ }
+ }
+
+ @Override
+ public void clean() {
+ final Collection<DelayedCloseable> closeables = new ArrayList<>();
+ TimerTask oldTimerTask = null;
+ synchronized (lock) {
+ queue.drainTo(closeables);
+ if (queue.isEmpty() && timerTask != null) {
+ oldTimerTask = timerTask;
+ timerTask = null;
+ }
+ }
+ if (oldTimerTask != null) {
+ oldTimerTask.cancel();
+ }
+ clean(closeables);
+ }
+
+ @Override
+ public void forceClean() {
+ TimerTask oldTimerTask = null;
+ synchronized (lock) {
+ clean(queue);
+ if (timerTask != null) {
+ oldTimerTask = timerTask;
+ timerTask = null;
+ }
+ }
+ if (oldTimerTask != null) {
+ oldTimerTask.cancel();
+ }
+ }
+
+ @Override
+ public void close() {
+ TimerTask oldTimerTask = null;
+ synchronized (lock) {
+ queue.clear();
+ if (timerTask != null) {
+ oldTimerTask = timerTask;
+ timerTask = null;
+ }
+ }
+ if (oldTimerTask != null) {
+ oldTimerTask.cancel();
+ }
+ }
+
+ @Override
+ public int size() {
+ return queue.size();
+ }
+
+ private void clean(Collection<DelayedCloseable> closeables) {
+ final Iterator<DelayedCloseable> iterator = closeables.iterator();
+ while (iterator.hasNext()) {
+ final DelayedCloseable next = iterator.next();
+ try {
+ iterator.remove();
+ LOG.warning("Unclosed (leaked?) stream detected: " +
next.closeable.hashCode());
+ next.closeable.close();
+ } catch (final IOException | RuntimeException ex) {
+ LOG.warning("Unable to close (leaked?) stream: " +
ex.getMessage());
}
- }, 0, Math.max(MIN_DELAY, delay >> 1));
+ }
+ }
+ }
+
+ private static final class DefaultDelayedCleaner implements DelayedCleaner
{
+ private final long delay; /* default is 30 minutes, in milliseconds */
+ private final DelayQueue<DelayedCloseable> queue = new DelayQueue<>();
+ private final AtomicBoolean initialized = new AtomicBoolean(false);
+ private volatile Timer timer;
+
+ DefaultDelayedCleaner(final long delay) {
+ this.delay = delay;
}
@Override
public void register(Closeable closeable) {
queue.put(new DelayedCloseable(closeable, delay));
+ // Initialize timer lazily only when at least one closeable is
registered
+ if (initialized.compareAndSet(false, true)) {
+ this.timer = new Timer("DelayedCachedOutputStreamCleaner",
true);
+ this.timer.scheduleAtFixedRate(new TimerTask() {
+ @Override
+ public void run() {
+ clean();
+ }
+ }, 0, Math.max(MIN_DELAY, delay >> 1));
+ }
}
@Override
@@ -114,7 +238,10 @@ public final class DelayedCachedOutputStreamCleaner
implements CachedOutputStrea
@Override
public void close() {
- timer.cancel();
+ final Timer t = timer;
+ if (t != null) {
+ t.cancel();
+ }
queue.clear();
}
@@ -186,11 +313,22 @@ public final class DelayedCachedOutputStreamCleaner
implements CachedOutputStrea
Number delayValue = null;
BusLifeCycleManager busLifeCycleManager = null;
Boolean cleanupOnShutdownValue = null;
+ Function<Long, DelayedCleaner> delayedCleanerStrategy =
DefaultDelayedCleaner::new;
if (bus != null) {
delayValue = (Number)
bus.getProperty(CachedConstants.CLEANER_DELAY_BUS_PROP);
cleanupOnShutdownValue = (Boolean)
bus.getProperty(CachedConstants.CLEANER_CLEAN_ON_SHUTDOWN_BUS_PROP);
busLifeCycleManager = bus.getExtension(BusLifeCycleManager.class);
+ final String strategy = (String)
bus.getProperty(CachedConstants.CLEANER_STRATEGY_BUS_PROP);
+ if (strategy == null ||
DEFAULT_STRATEGY.equalsIgnoreCase(strategy)) {
+ delayedCleanerStrategy = DefaultDelayedCleaner::new;
+ } else if (SINGLE_TIMER_STRATEGY.equalsIgnoreCase(strategy)) {
+ delayedCleanerStrategy = SingleTimerDelayedCleaner::new;
+ } else {
+ throw new IllegalArgumentException("The value of " +
CachedConstants.CLEANER_STRATEGY_BUS_PROP
+ + " property is invalid: " + strategy + " (should be " +
DEFAULT_STRATEGY + " or "
+ + SINGLE_TIMER_STRATEGY);
+ }
}
if (cleaner != null) {
@@ -205,11 +343,11 @@ public final class DelayedCachedOutputStreamCleaner
implements CachedOutputStrea
if (delayValue == null) {
// Default delay is set to 30 mins
- cleaner = new DelayedCleanerImpl(TimeUnit.MILLISECONDS.convert(30,
TimeUnit.MINUTES));
+ cleaner =
delayedCleanerStrategy.apply(TimeUnit.MILLISECONDS.convert(30,
TimeUnit.MINUTES));
} else {
final long value = delayValue.longValue();
if (value > 0 && value >= MIN_DELAY) {
- cleaner = new DelayedCleanerImpl(value); /* already in
milliseconds */
+ cleaner = delayedCleanerStrategy.apply(value); /* already in
milliseconds */
} else {
cleaner = NOOP_CLEANER;
if (value != 0) {
diff --git
a/core/src/test/java/org/apache/cxf/io/DelayedCachedOutputStreamCleanerTest.java
b/core/src/test/java/org/apache/cxf/io/DelayedCachedOutputStreamCleanerTest.java
index 562b3a18f25..6e408d66096 100644
---
a/core/src/test/java/org/apache/cxf/io/DelayedCachedOutputStreamCleanerTest.java
+++
b/core/src/test/java/org/apache/cxf/io/DelayedCachedOutputStreamCleanerTest.java
@@ -33,6 +33,8 @@ import org.apache.cxf.bus.extension.ExtensionManagerBus;
import org.junit.After;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.CoreMatchers.equalTo;
@@ -40,9 +42,20 @@ import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
+@RunWith(Parameterized.class)
public class DelayedCachedOutputStreamCleanerTest {
+ private final String strategy;
private Bus bus;
+ public DelayedCachedOutputStreamCleanerTest(final String strategy) {
+ this.strategy = strategy;
+ }
+
+ @Parameterized.Parameters(name = CachedConstants.CLEANER_STRATEGY_BUS_PROP
+ "= {0}")
+ public static String[] strategies() throws Exception {
+ return new String[] {"default", "single-timer", null};
+ }
+
@After
public void tearDown() {
if (bus != null) {
@@ -50,21 +63,30 @@ public class DelayedCachedOutputStreamCleanerTest {
bus = null;
}
}
-
+
@Test
public void testNoop() {
final Map<String, Object> properties =
Collections.singletonMap(CachedConstants.CLEANER_DELAY_BUS_PROP, 0);
- bus = new ExtensionManagerBus(new HashMap<>(), properties);
+ bus = createBus(properties);
final CachedOutputStreamCleaner cleaner =
bus.getExtension(CachedOutputStreamCleaner.class);
assertThat(cleaner,
instanceOf(DelayedCachedOutputStreamCleaner.class)); /* noop */
assertNoopCleaner(cleaner);
}
-
+
+ @Test
+ public void testNoTimer() {
+ final Map<String, Object> properties =
Collections.singletonMap(CachedConstants.CLEANER_DELAY_BUS_PROP, 2500);
+ bus = createBus(properties);
+
+ final CachedOutputStreamCleaner cleaner =
bus.getExtension(CachedOutputStreamCleaner.class);
+ assertThat(cleaner,
instanceOf(DelayedCachedOutputStreamCleaner.class));
+ }
+
@Test
public void testForceClean() throws InterruptedException {
- bus = new ExtensionManagerBus();
+ bus = createBus();
final CachedOutputStreamCleaner cleaner =
bus.getExtension(CachedOutputStreamCleaner.class);
assertThat(cleaner,
instanceOf(DelayedCachedOutputStreamCleaner.class));
@@ -88,7 +110,7 @@ public class DelayedCachedOutputStreamCleanerTest {
/* Delay of 2.5 seconds */
final Map<String, Object> properties =
Collections.singletonMap(CachedConstants.CLEANER_DELAY_BUS_PROP, 2500);
- bus = new ExtensionManagerBus(new HashMap<>(), properties);
+ bus = createBus(properties);
final CachedOutputStreamCleaner cleaner =
bus.getExtension(CachedOutputStreamCleaner.class);
cleaner.register(closeable1);
@@ -101,7 +123,7 @@ public class DelayedCachedOutputStreamCleanerTest {
@Test
public void testForceCleanForEmpty() throws InterruptedException {
- bus = new ExtensionManagerBus();
+ bus = createBus();
final CachedOutputStreamCleaner cleaner =
bus.getExtension(CachedOutputStreamCleaner.class);
assertThat(cleaner,
instanceOf(DelayedCachedOutputStreamCleaner.class));
@@ -121,7 +143,7 @@ public class DelayedCachedOutputStreamCleanerTest {
@Test
public void testForceCleanException() throws InterruptedException {
- bus = new ExtensionManagerBus();
+ bus = createBus();
final CachedOutputStreamCleaner cleaner =
bus.getExtension(CachedOutputStreamCleaner.class);
assertThat(cleaner,
instanceOf(DelayedCachedOutputStreamCleaner.class));
@@ -149,7 +171,7 @@ public class DelayedCachedOutputStreamCleanerTest {
public void testCleanOnShutdown() throws InterruptedException {
/* Delay of 5 seconds */
final Map<String, Object> properties =
Collections.singletonMap(CachedConstants.CLEANER_DELAY_BUS_PROP, 5000);
- bus = new ExtensionManagerBus(new HashMap<>(), properties);
+ bus = createBus(properties);
final AtomicBoolean latch = new AtomicBoolean();
final Closeable closeable = () -> latch.compareAndSet(false, true);
@@ -170,7 +192,7 @@ public class DelayedCachedOutputStreamCleanerTest {
final Map<String, Object> properties = new HashMap<>();
properties.put(CachedConstants.CLEANER_DELAY_BUS_PROP, 3000); /* 3
seconds */
properties.put(CachedConstants.CLEANER_CLEAN_ON_SHUTDOWN_BUS_PROP,
false);
- bus = new ExtensionManagerBus(new HashMap<>(), properties);
+ bus = createBus(properties);
final AtomicBoolean latch = new AtomicBoolean();
final Closeable closeable = () -> latch.compareAndSet(false, true);
@@ -188,7 +210,7 @@ public class DelayedCachedOutputStreamCleanerTest {
@Test
public void testNegativeDelay() throws InterruptedException {
final Map<String, Object> properties =
Collections.singletonMap(CachedConstants.CLEANER_DELAY_BUS_PROP, -1);
- bus = new ExtensionManagerBus(new HashMap<>(), properties);
+ bus = createBus(properties);
final CachedOutputStreamCleaner cleaner =
bus.getExtension(CachedOutputStreamCleaner.class);
assertThat(cleaner,
instanceOf(DelayedCachedOutputStreamCleaner.class)); /* noop */
@@ -199,7 +221,7 @@ public class DelayedCachedOutputStreamCleanerTest {
@Test
public void testTooSmallDelay() throws InterruptedException {
final Map<String, Object> properties =
Collections.singletonMap(CachedConstants.CLEANER_DELAY_BUS_PROP, 1500);
- bus = new ExtensionManagerBus(new HashMap<>(), properties);
+ bus = createBus(properties);
final CachedOutputStreamCleaner cleaner =
bus.getExtension(CachedOutputStreamCleaner.class);
assertThat(cleaner,
instanceOf(DelayedCachedOutputStreamCleaner.class)); /* noop */
@@ -215,7 +237,7 @@ public class DelayedCachedOutputStreamCleanerTest {
/* Delay of 5 seconds */
final Map<String, Object> properties =
Collections.singletonMap(CachedConstants.CLEANER_DELAY_BUS_PROP, 2500);
- bus = new ExtensionManagerBus(new HashMap<>(), properties);
+ bus = createBus(properties);
final CachedOutputStreamCleaner cleaner =
bus.getExtension(CachedOutputStreamCleaner.class);
cleaner.register(closeable1);
@@ -232,6 +254,18 @@ public class DelayedCachedOutputStreamCleanerTest {
assertThat(cleaner,
instanceOf(DelayedCachedOutputStreamCleaner.class));
}
+ private Bus createBus() {
+ return createBus(Collections.emptyMap());
+ }
+
+ private Bus createBus(Map<String, Object> properties) {
+ final Map<String, Object> combined = new HashMap<>(properties);
+ if (strategy != null) {
+ combined.put(CachedConstants.CLEANER_STRATEGY_BUS_PROP, strategy);
+ }
+ return new ExtensionManagerBus(new HashMap<>(), combined);
+ }
+
private void assertNoopCleaner(final CachedOutputStreamCleaner cleaner) {
final AtomicBoolean latch = new AtomicBoolean(false);
final Closeable closeable = () -> latch.compareAndSet(false, true);