Add a wrapper to the Runnables executed by DSTPE to prevent exceptions from killing the tasks. Replace ExpiringMap's timer with a DSTPE. patch by jbellis; reviewed by slebresne for CASSANDRA-3537
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/eac19fee Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/eac19fee Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/eac19fee Branch: refs/heads/trunk Commit: eac19fee0384fdab25ae6d4eaa065199ce04fb4d Parents: 257d36e Author: Jonathan Ellis <[email protected]> Authored: Fri Feb 24 09:24:19 2012 -0600 Committer: Jonathan Ellis <[email protected]> Committed: Fri Feb 24 09:25:39 2012 -0600 ---------------------------------------------------------------------- .../DebuggableScheduledThreadPoolExecutor.java | 45 ++++++++++++++- .../concurrent/DebuggableThreadPoolExecutor.java | 29 +++++++--- .../org/apache/cassandra/net/MessagingService.java | 2 +- .../cassandra/service/AbstractCassandraDaemon.java | 2 +- .../org/apache/cassandra/utils/ExpiringMap.java | 38 ++++++------- 5 files changed, 84 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/eac19fee/src/java/org/apache/cassandra/concurrent/DebuggableScheduledThreadPoolExecutor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/concurrent/DebuggableScheduledThreadPoolExecutor.java b/src/java/org/apache/cassandra/concurrent/DebuggableScheduledThreadPoolExecutor.java index f5f1565..fcbaf67 100644 --- a/src/java/org/apache/cassandra/concurrent/DebuggableScheduledThreadPoolExecutor.java +++ b/src/java/org/apache/cassandra/concurrent/DebuggableScheduledThreadPoolExecutor.java @@ -26,10 +26,15 @@ import java.util.concurrent.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * Like DebuggableThreadPoolExecutor, DebuggableScheduledThreadPoolExecutor always + * logs exceptions from the tasks it is given, even if Future.get is never called elsewhere. 
+ * + * DebuggableScheduledThreadPoolExecutor also catches exceptions during Task execution + * so that they don't suppress subsequent invocations of the task. + */ public class DebuggableScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor { - private static Logger logger = LoggerFactory.getLogger(DebuggableScheduledThreadPoolExecutor.class); - public DebuggableScheduledThreadPoolExecutor(int corePoolSize, String threadPoolName, int priority) { super(corePoolSize, new NamedThreadFactory(threadPoolName, priority)); @@ -40,10 +45,46 @@ public class DebuggableScheduledThreadPoolExecutor extends ScheduledThreadPoolEx this(1, threadPoolName, Thread.NORM_PRIORITY); } + // We need this as well as the wrapper for the benefit of non-repeating tasks @Override public void afterExecute(Runnable r, Throwable t) { super.afterExecute(r,t); DebuggableThreadPoolExecutor.logExceptionsAfterExecute(r, t); } + + // override scheduling to suppress exceptions that would cancel future executions + @Override + public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) + { + return super.scheduleAtFixedRate(new UncomplainingRunnable(command), initialDelay, period, unit); + } + + @Override + public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) + { + return super.scheduleWithFixedDelay(new UncomplainingRunnable(command), initialDelay, delay, unit); + } + + private static class UncomplainingRunnable implements Runnable + { + private final Runnable runnable; + + public UncomplainingRunnable(Runnable runnable) + { + this.runnable = runnable; + } + + public void run() + { + try + { + runnable.run(); + } + catch (Throwable e) + { + DebuggableThreadPoolExecutor.handleOrLog(e); + } + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/eac19fee/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java b/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java index 60265b1..29a457a 100644 --- a/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java +++ b/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java @@ -114,15 +114,26 @@ public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor logExceptionsAfterExecute(r, t); } + /** + * Send @param t and any exception wrapped by @param r to the default uncaught exception handler, + * or log them if none such is set up + */ public static void logExceptionsAfterExecute(Runnable r, Throwable t) { - if (t == null) - t = extractThrowable(r); - - if (t != null) + Throwable hiddenThrowable = extractThrowable(r); + if (hiddenThrowable != null) + handleOrLog(hiddenThrowable); + + // ThreadPoolExecutor will re-throw exceptions thrown by its Task (which will be seen by + // the default uncaught exception handler) so we only need to do anything if that handler + // isn't set up yet. + if (t != null && Thread.getDefaultUncaughtExceptionHandler() == null) handleOrLog(t); } + /** + * Send @param t to the default uncaught exception handler, or log it if none such is set up + */ public static void handleOrLog(Throwable t) { if (Thread.getDefaultUncaughtExceptionHandler() == null) @@ -131,18 +142,21 @@ public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor Thread.getDefaultUncaughtExceptionHandler().uncaughtException(Thread.currentThread(), t); } - public static Throwable extractThrowable(Runnable r) + /** + * @return any exception wrapped by @param runnable, i.e., if it is a FutureTask + */ + public static Throwable extractThrowable(Runnable runnable) { // Check for exceptions wrapped by FutureTask. We do this by calling get(), which will // cause it to throw any saved exception. 
// // Complicating things, calling get() on a ScheduledFutureTask will block until the task // is cancelled. Hence, the extra isDone check beforehand. - if ((r instanceof Future<?>) && ((Future<?>) r).isDone()) + if ((runnable instanceof Future<?>) && ((Future<?>) runnable).isDone()) { try { - ((Future<?>) r).get(); + ((Future<?>) runnable).get(); } catch (InterruptedException e) { @@ -160,5 +174,4 @@ public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor return null; } - } http://git-wip-us.apache.org/repos/asf/cassandra/blob/eac19fee/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index 0bb36fb..b0e7de9 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -543,7 +543,7 @@ public final class MessagingService implements MessagingServiceMBean assert !StageManager.getStage(Stage.MUTATION).isShutdown(); // the important part - callbacks.shutdown(); + callbacks.shutdownBlocking(); // attempt to humor tests that try to stop and restart MS try http://git-wip-us.apache.org/repos/asf/cassandra/blob/eac19fee/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java b/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java index 8cc85ae..f124f9f 100644 --- a/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java +++ b/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java @@ -131,7 +131,7 @@ public abstract class AbstractCassandraDaemon implements CassandraDaemon public void uncaughtException(Thread t, Throwable e) { exceptions.incrementAndGet(); - logger.error("Fatal exception in thread 
" + t, e); + logger.error("Exception in thread " + t, e); for (Throwable e2 = e; e2 != null; e2 = e2.getCause()) { // some code, like FileChannel.map, will wrap an OutOfMemoryError in another exception http://git-wip-us.apache.org/repos/asf/cassandra/blob/eac19fee/src/java/org/apache/cassandra/utils/ExpiringMap.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/ExpiringMap.java b/src/java/org/apache/cassandra/utils/ExpiringMap.java index 000af72..ff9f2da 100644 --- a/src/java/org/apache/cassandra/utils/ExpiringMap.java +++ b/src/java/org/apache/cassandra/utils/ExpiringMap.java @@ -18,13 +18,16 @@ package org.apache.cassandra.utils; -import java.util.*; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import com.google.common.base.Function; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor; import org.cliffc.high_scale_lib.NonBlockingHashMap; public class ExpiringMap<K, V> @@ -57,9 +60,10 @@ public class ExpiringMap<K, V> } } + // if we use more ExpiringMaps we may want to add multiple threads to this executor + private static final ScheduledExecutorService service = new DebuggableScheduledThreadPoolExecutor("EXPIRING-MAP-REAPER"); + private final NonBlockingHashMap<K, CacheableObject<V>> cache = new NonBlockingHashMap<K, CacheableObject<V>>(); - private final Timer timer; - private static int counter = 0; private final long defaultExpiration; public ExpiringMap(long defaultExpiration) @@ -80,8 +84,7 @@ public class ExpiringMap<K, V> throw new IllegalArgumentException("Argument specified must be a positive number"); } - timer = new Timer("EXPIRING-MAP-TIMER-" + (++counter), true); - TimerTask task = new TimerTask() + Runnable runnable = new Runnable() { public void run() { @@ -100,25 +103,20 @@ public class 
ExpiringMap<K, V> logger.trace("Expired {} entries", n); } }; - timer.schedule(task, defaultExpiration / 2, defaultExpiration / 2); + service.scheduleWithFixedDelay(runnable, defaultExpiration / 2, defaultExpiration / 2, TimeUnit.MILLISECONDS); } - public void shutdown() + public void shutdownBlocking() { - shutdown = true; - while (!cache.isEmpty()) + service.shutdown(); + try { - logger.trace("Waiting for {} entries before shutting down ExpiringMap", cache.size()); - try - { - Thread.sleep(100); - } - catch (InterruptedException e) - { - throw new AssertionError(e); - } + service.awaitTermination(defaultExpiration * 2, TimeUnit.MILLISECONDS); + } + catch (InterruptedException e) + { + throw new AssertionError(e); } - timer.cancel(); } public void reset()
