Author: jbellis
Date: Wed Jan 13 22:23:44 2010
New Revision: 898973
URL: http://svn.apache.org/viewvc?rev=898973&view=rev
Log:
assumption that all single-threaded executors have an unbounded queue is no
longer valid. provide a policy for dealing with single thread executors w/ a
full queue. patch by jbellis; tested by Ryan Daum for CASSANDRA-694
Added:
incubator/cassandra/branches/cassandra-0.5/test/unit/org/apache/cassandra/concurrent/
incubator/cassandra/branches/cassandra-0.5/test/unit/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutorTest.java
(with props)
Modified:
incubator/cassandra/branches/cassandra-0.5/CHANGES.txt
incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
Modified: incubator/cassandra/branches/cassandra-0.5/CHANGES.txt
URL:
http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.5/CHANGES.txt?rev=898973&r1=898972&r2=898973&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.5/CHANGES.txt (original)
+++ incubator/cassandra/branches/cassandra-0.5/CHANGES.txt Wed Jan 13 22:23:44
2010
@@ -1,6 +1,8 @@
0.5.0 final
* avoid attempting to delete temporary bootstrap files twice (CASSANDRA-681)
* fix bogus NaN in nodeprobe cfstats output (CASSANDRA-646)
+ * provide a policy for dealing with single thread executors w/ a full queue
+ (CASSANDRA-694)
0.5.0 RC3
Modified:
incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java?rev=898973&r1=898972&r2=898973&view=diff
==============================================================================
---
incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
(original)
+++
incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
Wed Jan 13 22:23:44 2010
@@ -65,23 +65,35 @@
if (maximumPoolSize > 1)
{
+ // clearly strict serialization is not a requirement. just make
the calling thread execute.
this.setRejectedExecutionHandler(new
ThreadPoolExecutor.CallerRunsPolicy());
}
else
{
// preserve task serialization. this is more complicated than it
needs to be,
- // since TPE rejects if queue.offer reports a full queue.
- // the easiest option (since most of TPE.execute deals with
private members)
- // appears to be to wrap the given queue class with one whose offer
- // simply delegates to put(). this would be ugly, since it
violates both
- // the spirit and letter of queue.offer, but effective.
- // so far, though, all our serialized executors use unbounded
queues,
- // so actually implementing this has not been necessary.
+ // since TPE rejects if queue.offer reports a full queue. we'll
just
+ // override this with a handler that retries until it gets in.
ugly, but effective.
+ // (there is an extensive analysis of the options here at
+ //
http://today.java.net/pub/a/today/2008/10/23/creating-a-notifying-blocking-thread-pool-executor.html)
this.setRejectedExecutionHandler(new RejectedExecutionHandler()
{
- public void rejectedExecution(Runnable r, ThreadPoolExecutor
executor)
+ public void rejectedExecution(Runnable task,
ThreadPoolExecutor executor)
{
- throw new AssertionError("Blocking serialized executor is
not yet implemented");
+ BlockingQueue<Runnable> queue = executor.getQueue();
+ while (true)
+ {
+ if (executor.isShutdown())
+ throw new
RejectedExecutionException("ThreadPoolExecutor has shut down");
+ try
+ {
+ if (queue.offer(task, 1000, TimeUnit.MILLISECONDS))
+ break;
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError(e);
+ }
+ }
}
});
}
Added:
incubator/cassandra/branches/cassandra-0.5/test/unit/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutorTest.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.5/test/unit/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutorTest.java?rev=898973&view=auto
==============================================================================
---
incubator/cassandra/branches/cassandra-0.5/test/unit/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutorTest.java
(added)
+++
incubator/cassandra/branches/cassandra-0.5/test/unit/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutorTest.java
Wed Jan 13 22:23:44 2010
@@ -0,0 +1,40 @@
+package org.apache.cassandra.concurrent;
+
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+
+import org.apache.cassandra.utils.WrappedRunnable;
+
+public class DebuggableThreadPoolExecutorTest
+{
+ @Test
+ public void testSerialization() throws InterruptedException
+ {
+ LinkedBlockingQueue<Runnable> q = new LinkedBlockingQueue<Runnable>(1);
+ DebuggableThreadPoolExecutor executor = new
DebuggableThreadPoolExecutor(1,
+
1,
+
Integer.MAX_VALUE,
+
TimeUnit.MILLISECONDS,
+
q,
+
new NamedThreadFactory("TEST"));
+ WrappedRunnable runnable = new WrappedRunnable()
+ {
+ public void runMayThrow() throws InterruptedException
+ {
+ Thread.sleep(50);
+ }
+ };
+ long start = System.currentTimeMillis();
+ for (int i = 0; i < 10; i++)
+ {
+ executor.submit(runnable);
+ }
+ assert q.size() > 0 : q.size();
+ while (executor.getCompletedTaskCount() < 10)
+ continue;
+ long delta = System.currentTimeMillis() - start;
+ assert delta >= 9 * 50 : delta;
+ }
+}
Propchange:
incubator/cassandra/branches/cassandra-0.5/test/unit/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutorTest.java
------------------------------------------------------------------------------
svn:eol-style = native