Author: jbellis
Date: Thu Jan 14 03:18:34 2010
New Revision: 899042
URL: http://svn.apache.org/viewvc?rev=899042&view=rev
Log:
merge from 0.5 branch
Added:
incubator/cassandra/trunk/test/unit/org/apache/cassandra/concurrent/
- copied from r899039, incubator/cassandra/branches/cassandra-0.5/test/unit/org/apache/cassandra/concurrent/
Modified:
incubator/cassandra/trunk/ (props changed)
incubator/cassandra/trunk/CHANGES.txt
incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/Cassandra.java (props changed)
incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/Column.java (props changed)
incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java (props changed)
incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/NotFoundException.java (props changed)
incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/SuperColumn.java (props changed)
incubator/cassandra/trunk/src/java/org/ (props changed)
incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/WrappedRunnable.java
incubator/cassandra/trunk/test/unit/org/ (props changed)
Propchange: incubator/cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Jan 14 03:18:34 2010
@@ -1,3 +1,3 @@
/incubator/cassandra/branches/cassandra-0.3:774578-796573
/incubator/cassandra/branches/cassandra-0.4:810145-834239,834349-834350
-/incubator/cassandra/branches/cassandra-0.5:888872-898853
+/incubator/cassandra/branches/cassandra-0.5:888872-899039
Modified: incubator/cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/CHANGES.txt?rev=899042&r1=899041&r2=899042&view=diff
==============================================================================
--- incubator/cassandra/trunk/CHANGES.txt (original)
+++ incubator/cassandra/trunk/CHANGES.txt Thu Jan 14 03:18:34 2010
@@ -5,6 +5,8 @@
0.5.0 final
* avoid attempting to delete temporary bootstrap files twice (CASSANDRA-681)
* fix bogus NaN in nodeprobe cfstats output (CASSANDRA-646)
+ * provide a policy for dealing with single thread executors w/ a full queue
+ (CASSANDRA-694)
0.5.0 RC3
Propchange: incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Jan 14 03:18:34 2010
@@ -1,4 +1,4 @@
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Cassandra.java:810145-834239,834349-834350
-/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/Cassandra.java:888872-898853
+/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/Cassandra.java:888872-899039
/incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/Cassandra.java:749219-768588
Propchange: incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Jan 14 03:18:34 2010
@@ -1,5 +1,5 @@
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Column.java:810145-834239,834349-834350
-/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/Column.java:888872-898853
+/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/Column.java:888872-899039
/incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/Column.java:749219-794428
/incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/column_t.java:749219-768588
Propchange: incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Jan 14 03:18:34 2010
@@ -1,4 +1,4 @@
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:810145-834239,834349-834350
-/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:888872-898853
+/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:888872-899039
/incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:749219-768588
Propchange: incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Jan 14 03:18:34 2010
@@ -1,4 +1,4 @@
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:810145-834239,834349-834350
-/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:888872-898853
+/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:888872-899039
/incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:749219-768588
Propchange: incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Jan 14 03:18:34 2010
@@ -1,5 +1,5 @@
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/SuperColumn.java:810145-834239,834349-834350
-/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/SuperColumn.java:888872-898853
+/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/SuperColumn.java:888872-899039
/incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/SuperColumn.java:749219-794428
/incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:749219-768588
Propchange: incubator/cassandra/trunk/src/java/org/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Jan 14 03:18:34 2010
@@ -1,4 +1,4 @@
/incubator/cassandra/branches/cassandra-0.3/src/java/org:774578-796573
/incubator/cassandra/branches/cassandra-0.4/src/java/org:810145-834239,834349-834350
-/incubator/cassandra/branches/cassandra-0.5/src/java/org:888872-898853
+/incubator/cassandra/branches/cassandra-0.5/src/java/org:888872-899039
/incubator/cassandra/trunk/src/java/org:749219-769885
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java?rev=899042&r1=899041&r2=899042&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java Thu Jan 14 03:18:34 2010
@@ -1,71 +1,83 @@
-package org.apache.cassandra.concurrent;
-
-import java.util.concurrent.*;
-
-import org.apache.log4j.Logger;
-
-public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor
-{
- protected static Logger logger =
Logger.getLogger(JMXEnabledThreadPoolExecutor.class);
-
- public DebuggableThreadPoolExecutor(String threadPoolName)
- {
- this(1, 1, Integer.MAX_VALUE, TimeUnit.SECONDS, new
LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(threadPoolName));
- }
-
- public DebuggableThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory)
- {
- super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory);
-
- if (maximumPoolSize > 1)
- {
- this.setRejectedExecutionHandler(new
ThreadPoolExecutor.CallerRunsPolicy());
- }
- else
- {
- // preserve task serialization. this is more complicated than it
needs to be,
- // since TPE rejects if queue.offer reports a full queue.
- // the easiest option (since most of TPE.execute deals with
private members)
- // appears to be to wrap the given queue class with one whose offer
- // simply delegates to put(). this would be ugly, since it
violates both
- // the spirit and letter of queue.offer, but effective.
- // so far, though, all our serialized executors use unbounded
queues,
- // so actually implementing this has not been necessary.
- this.setRejectedExecutionHandler(new RejectedExecutionHandler()
- {
- public void rejectedExecution(Runnable r, ThreadPoolExecutor
executor)
- {
- throw new AssertionError("Blocking serialized executor is
not yet implemented");
- }
- });
- }
- }
-
- public void afterExecute(Runnable r, Throwable t)
- {
- super.afterExecute(r,t);
-
- // exceptions wrapped by FutureTask
- if (r instanceof FutureTask)
- {
- try
- {
- ((FutureTask) r).get();
- }
- catch (InterruptedException e)
- {
- throw new AssertionError(e);
- }
- catch (ExecutionException e)
- {
- logger.error("Error in executor futuretask", e);
- }
- }
-
- // exceptions for non-FutureTask runnables [i.e., added via execute()
instead of submit()]
- if (t != null)
- {
- logger.error("Error in ThreadPoolExecutor", t);
- }
- }
-}
+package org.apache.cassandra.concurrent;
+
+import java.util.concurrent.*;
+
+import org.apache.log4j.Logger;
+
+public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor
+{
+ protected static Logger logger =
Logger.getLogger(JMXEnabledThreadPoolExecutor.class);
+
+ public DebuggableThreadPoolExecutor(String threadPoolName)
+ {
+ this(1, 1, Integer.MAX_VALUE, TimeUnit.SECONDS, new
LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(threadPoolName));
+ }
+
+ public DebuggableThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory)
+ {
+ super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory);
+
+ if (maximumPoolSize > 1)
+ {
+ // clearly strict serialization is not a requirement. just make
the calling thread execute.
+ this.setRejectedExecutionHandler(new
ThreadPoolExecutor.CallerRunsPolicy());
+ }
+ else
+ {
+ // preserve task serialization. this is more complicated than it
needs to be,
+ // since TPE rejects if queue.offer reports a full queue. we'll
just
+ // override this with a handler that retries until it gets in.
ugly, but effective.
+ // (there is an extensive analysis of the options here at
+ //
http://today.java.net/pub/a/today/2008/10/23/creating-a-notifying-blocking-thread-pool-executor.html)
+ this.setRejectedExecutionHandler(new RejectedExecutionHandler()
+ {
+ public void rejectedExecution(Runnable task,
ThreadPoolExecutor executor)
+ {
+ BlockingQueue<Runnable> queue = executor.getQueue();
+ while (true)
+ {
+ if (executor.isShutdown())
+ throw new
RejectedExecutionException("ThreadPoolExecutor has shut down");
+ try
+ {
+ if (queue.offer(task, 1000, TimeUnit.MILLISECONDS))
+ break;
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError(e);
+ }
+ }
+ }
+ });
+ }
+ }
+
+ public void afterExecute(Runnable r, Throwable t)
+ {
+ super.afterExecute(r,t);
+
+ // exceptions wrapped by FutureTask
+ if (r instanceof FutureTask)
+ {
+ try
+ {
+ ((FutureTask) r).get();
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError(e);
+ }
+ catch (ExecutionException e)
+ {
+ logger.error("Error in executor futuretask", e);
+ }
+ }
+
+ // exceptions for non-FutureTask runnables [i.e., added via execute()
instead of submit()]
+ if (t != null)
+ {
+ logger.error("Error in ThreadPoolExecutor", t);
+ }
+ }
+}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=899042&r1=899041&r2=899042&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Thu Jan 14 03:18:34 2010
@@ -80,7 +80,7 @@
Runtime.getRuntime().availableProcessors(),
Integer.MAX_VALUE,
TimeUnit.SECONDS,
- new
LinkedBlockingQueue<Runnable>(2 * Runtime.getRuntime().availableProcessors()),
+ new
LinkedBlockingQueue<Runnable>(1 + 2 *
Runtime.getRuntime().availableProcessors()),
new
NamedThreadFactory("FLUSH-SORTER-POOL"));
private static ExecutorService flushWriter_
= new
JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getAllDataFileLocations().length,
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/WrappedRunnable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/WrappedRunnable.java?rev=899042&r1=899041&r2=899042&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/WrappedRunnable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/WrappedRunnable.java Thu Jan 14 03:18:34 2010
@@ -1,18 +1,39 @@
-package org.apache.cassandra.utils;
-
-public abstract class WrappedRunnable implements Runnable
-{
- public final void run()
- {
- try
- {
- runMayThrow();
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
- }
-
- abstract protected void runMayThrow() throws Exception;
-}
+package org.apache.cassandra.utils;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+public abstract class WrappedRunnable implements Runnable
+{
+ public final void run()
+ {
+ try
+ {
+ runMayThrow();
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ abstract protected void runMayThrow() throws Exception;
+}
Propchange: incubator/cassandra/trunk/test/unit/org/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Jan 14 03:18:34 2010
@@ -1,4 +1,4 @@
/incubator/cassandra/branches/cassandra-0.3/test/unit/org:774578-796573
/incubator/cassandra/branches/cassandra-0.4/test/unit/org:810145-834239,834349-834350
-/incubator/cassandra/branches/cassandra-0.5/test/unit/org:888872-898853
+/incubator/cassandra/branches/cassandra-0.5/test/unit/org:888872-899039
/incubator/cassandra/trunk/test/unit/org:749219-768583