Author: jbellis
Date: Thu Dec 8 20:33:15 2011
New Revision: 1212092
URL: http://svn.apache.org/viewvc?rev=1212092&view=rev
Log:
improve UserInterruptedException encapsulation (and renamed to
CompactionInterruptedException)
patch by jbellis; reviewed by slebresne for CASSANDRA-3582
Added:
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionInterruptedException.java
Removed:
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/UserInterruptedException.java
Modified:
cassandra/trunk/CHANGES.txt
cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
cassandra/trunk/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java
Modified: cassandra/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1212092&r1=1212091&r2=1212092&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Thu Dec 8 20:33:15 2011
@@ -1,6 +1,6 @@
1.1-dev
* multi-dc replication optimization supporting CL > ONE (CASSANDRA-3577)
- * add command to stop compactions (CASSANDRA-1740, 3566)
+ * add command to stop compactions (CASSANDRA-1740, 3566, 3582)
* multithreaded streaming (CASSANDRA-3494)
* removed in-tree redhat spec (CASSANDRA-3567)
* "defragment" rows for name-based queries under STCS, again (CASSANDRA-2503)
Modified:
cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java?rev=1212092&r1=1212091&r2=1212092&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
Thu Dec 8 20:33:15 2011
@@ -23,7 +23,6 @@ package org.apache.cassandra.concurrent;
import java.util.concurrent.*;
-import org.apache.cassandra.db.compaction.UserInterruptedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -117,6 +116,23 @@ public class DebuggableThreadPoolExecuto
public static void logExceptionsAfterExecute(Runnable r, Throwable t)
{
+ if (t == null)
+ t = extractThrowable(r);
+
+ if (t != null)
+ handleOrLog(t);
+ }
+
+ public static void handleOrLog(Throwable t)
+ {
+ if (Thread.getDefaultUncaughtExceptionHandler() == null)
+ logger.error("Error in ThreadPoolExecutor", t);
+ else
+
Thread.getDefaultUncaughtExceptionHandler().uncaughtException(Thread.currentThread(),
t);
+ }
+
+ public static Throwable extractThrowable(Runnable r)
+ {
// Check for exceptions wrapped by FutureTask. We do this by calling
get(), which will
// cause it to throw any saved exception.
//
@@ -138,23 +154,11 @@ public class DebuggableThreadPoolExecuto
}
catch (ExecutionException e)
{
- Throwable actualException = e.getCause();
- if (actualException instanceof UserInterruptedException)
- logger.info("Task interrupted by user: " +
actualException);
- else if (Thread.getDefaultUncaughtExceptionHandler() == null)
- logger.error("Error in ThreadPoolExecutor",
actualException);
- else
-
Thread.getDefaultUncaughtExceptionHandler().uncaughtException(Thread.currentThread(),
actualException);
+ return e.getCause();
}
}
- // exceptions for non-FutureTask runnables [i.e., added via execute()
instead of submit()]
- if (t != null)
- {
- if (Thread.getDefaultUncaughtExceptionHandler() == null)
- logger.error("Error in ThreadPoolExecutor", t);
- else
-
Thread.getDefaultUncaughtExceptionHandler().uncaughtException(Thread.currentThread(),
t);
- }
+ return null;
}
+
}
Added:
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionInterruptedException.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionInterruptedException.java?rev=1212092&view=auto
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionInterruptedException.java
(added)
+++
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionInterruptedException.java
Thu Dec 8 20:33:15 2011
@@ -0,0 +1,11 @@
+package org.apache.cassandra.db.compaction;
+
+public class CompactionInterruptedException extends RuntimeException
+{
+ private static final long serialVersionUID = -8651427062512310398L;
+
+ public CompactionInterruptedException(CompactionInfo info)
+ {
+ super("Compaction interrupted: " + info);
+ }
+}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java?rev=1212092&r1=1212091&r2=1212092&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
Thu Dec 8 20:33:15 2011
@@ -488,7 +488,7 @@ public class CompactionManager implement
while (!dataFile.isEOF())
{
if (scrubInfo.isStopped())
- throw new
UserInterruptedException(scrubInfo.getCompactionInfo());
+ throw new
CompactionInterruptedException(scrubInfo.getCompactionInfo());
long rowStart = dataFile.getFilePointer();
if (logger.isDebugEnabled())
logger.debug("Reading row at " + rowStart);
@@ -696,7 +696,7 @@ public class CompactionManager implement
while (scanner.hasNext())
{
if (ci.isStopped())
- throw new
UserInterruptedException(ci.getCompactionInfo());
+ throw new
CompactionInterruptedException(ci.getCompactionInfo());
SSTableIdentityIterator row = (SSTableIdentityIterator)
scanner.next();
if (Range.isInRanges(row.getKey().token, ranges))
{
@@ -827,7 +827,7 @@ public class CompactionManager implement
while (nni.hasNext())
{
if (ci.isStopped())
- throw new UserInterruptedException(ci.getCompactionInfo());
+ throw new
CompactionInterruptedException(ci.getCompactionInfo());
AbstractCompactedRow row = nni.next();
validator.add(row);
}
@@ -975,7 +975,7 @@ public class CompactionManager implement
return CompactionExecutor.compactions.size();
}
- private static class CompactionExecutor extends
DebuggableThreadPoolExecutor implements CompactionExecutorStatsCollector
+ private static class CompactionExecutor extends ThreadPoolExecutor
implements CompactionExecutorStatsCollector
{
// a synchronized identity set of running tasks to their compaction
info
private static final Set<CompactionInfo.Holder> compactions =
Collections.synchronizedSet(Collections.newSetFromMap(new
IdentityHashMap<CompactionInfo.Holder, Boolean>()));
@@ -983,6 +983,7 @@ public class CompactionManager implement
protected CompactionExecutor(int minThreads, int maxThreads, String
name, BlockingQueue<Runnable> queue)
{
super(minThreads, maxThreads, 60, TimeUnit.SECONDS, queue, new
NamedThreadFactory(name, Thread.MIN_PRIORITY));
+ allowCoreThreadTimeOut(true);
}
private CompactionExecutor(int threadCount, String name)
@@ -1009,6 +1010,29 @@ public class CompactionManager implement
{
return new ArrayList<CompactionInfo.Holder>(compactions);
}
+
+ // modified from DebuggableThreadPoolExecutor so that
CompactionInterruptedExceptions are not logged
+ @Override
+ public void afterExecute(Runnable r, Throwable t)
+ {
+ super.afterExecute(r,t);
+
+ if (t == null)
+ t = DebuggableThreadPoolExecutor.extractThrowable(r);
+
+ if (t != null)
+ {
+ if (t instanceof CompactionInterruptedException)
+ {
+ logger.info(t.getMessage());
+ logger.debug("Full interruption stack trace:", t);
+ }
+ else
+ {
+ DebuggableThreadPoolExecutor.handleOrLog(t);
+ }
+ }
+ }
}
private static class ValidationExecutor extends CompactionExecutor
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java?rev=1212092&r1=1212091&r2=1212092&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
Thu Dec 8 20:33:15 2011
@@ -153,7 +153,7 @@ public class CompactionTask extends Abst
while (nni.hasNext())
{
if (ci.isStopped())
- throw new UserInterruptedException(ci.getCompactionInfo());
+ throw new
CompactionInterruptedException(ci.getCompactionInfo());
AbstractCompactedRow row = nni.next();
if (row.isEmpty())
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java?rev=1212092&r1=1212091&r2=1212092&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java
Thu Dec 8 20:33:15 2011
@@ -26,7 +26,7 @@ import org.apache.cassandra.db.Decorated
import org.apache.cassandra.db.Table;
import org.apache.cassandra.db.compaction.CompactionInfo;
import org.apache.cassandra.db.compaction.OperationType;
-import org.apache.cassandra.db.compaction.UserInterruptedException;
+import org.apache.cassandra.db.compaction.CompactionInterruptedException;
import org.apache.cassandra.io.sstable.ReducingKeyIterator;
/**
@@ -60,7 +60,7 @@ public class SecondaryIndexBuilder exten
while (iter.hasNext())
{
if (isStopped())
- throw new UserInterruptedException(getCompactionInfo());
+ throw new CompactionInterruptedException(getCompactionInfo());
DecoratedKey<?> key = iter.next();
Table.indexRow(key, cfs, columns);
}