Author: xedin
Date: Tue Aug 16 20:47:22 2011
New Revision: 1158439
URL: http://svn.apache.org/viewvc?rev=1158439&view=rev
Log:
Fix missing logging for some exceptions
patch by Jonathan Ellis; reviewed by Pavel Yaskevich for CASSANDRA-2061
Added:
cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableScheduledThreadPoolExecutor.java
Modified:
cassandra/trunk/CHANGES.txt
cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
cassandra/trunk/src/java/org/apache/cassandra/concurrent/RetryingScheduledThreadPoolExecutor.java
cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
cassandra/trunk/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java
Modified: cassandra/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1158439&r1=1158438&r2=1158439&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Tue Aug 16 20:47:22 2011
@@ -32,7 +32,7 @@
* refactoring of the secondary index api (CASSANDRA-2982)
* make CL > ONE reads wait for digest reconciliation before returning
(CASSANDRA-2494)
-
+ * fix missing logging for some exceptions (CASSANDRA-2061)
0.8.5
* fix NPE when encryption_options is unspecified (CASSANDRA-3007)
Added:
cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableScheduledThreadPoolExecutor.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableScheduledThreadPoolExecutor.java?rev=1158439&view=auto
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableScheduledThreadPoolExecutor.java
(added)
+++
cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableScheduledThreadPoolExecutor.java
Tue Aug 16 20:47:22 2011
@@ -0,0 +1,49 @@
+package org.apache.cassandra.concurrent;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+import java.util.concurrent.*;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DebuggableScheduledThreadPoolExecutor extends
ScheduledThreadPoolExecutor
+{
+ private static Logger logger =
LoggerFactory.getLogger(DebuggableScheduledThreadPoolExecutor.class);
+
+ public DebuggableScheduledThreadPoolExecutor(int corePoolSize, String
threadPoolName, int priority)
+ {
+ super(corePoolSize, new NamedThreadFactory(threadPoolName, priority));
+ }
+
+ public DebuggableScheduledThreadPoolExecutor(String threadPoolName)
+ {
+ this(1, threadPoolName, Thread.NORM_PRIORITY);
+ }
+
+ @Override
+ public void afterExecute(Runnable r, Throwable t)
+ {
+ super.afterExecute(r,t);
+ DebuggableThreadPoolExecutor.logExceptionsAfterExecute(r, t);
+ }
+}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java?rev=1158439&r1=1158438&r2=1158439&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
Tue Aug 16 20:47:22 2011
@@ -111,12 +111,16 @@ public class DebuggableThreadPoolExecuto
public static void logExceptionsAfterExecute(Runnable r, Throwable t)
{
- // exceptions wrapped by FutureTask
- if (r instanceof FutureTask<?>)
+ // Check for exceptions wrapped by FutureTask. We do this by calling
get(), which will
+ // cause it to throw any saved exception.
+ //
+ // Complicating things, calling get() on a periodic ScheduledFutureTask will block until
+ // the task is cancelled or fails, since it is never "done" otherwise. Hence, the extra isDone check beforehand.
+ if ((r instanceof Future<?>) && ((Future<?>) r).isDone())
{
try
{
- ((FutureTask<?>) r).get();
+ ((Future<?>) r).get();
}
catch (InterruptedException e)
{
Modified:
cassandra/trunk/src/java/org/apache/cassandra/concurrent/RetryingScheduledThreadPoolExecutor.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/concurrent/RetryingScheduledThreadPoolExecutor.java?rev=1158439&r1=1158438&r2=1158439&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/concurrent/RetryingScheduledThreadPoolExecutor.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/concurrent/RetryingScheduledThreadPoolExecutor.java
Tue Aug 16 20:47:22 2011
@@ -1,116 +0,0 @@
-package org.apache.cassandra.concurrent;
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-
-import java.util.concurrent.*;
-
-public class RetryingScheduledThreadPoolExecutor extends
ScheduledThreadPoolExecutor
-{
- public RetryingScheduledThreadPoolExecutor(String threadPoolName, int
priority)
- {
- this(1, threadPoolName, priority);
- }
-
- public RetryingScheduledThreadPoolExecutor(int corePoolSize, String
threadPoolName, int priority)
- {
- super(corePoolSize, new NamedThreadFactory(threadPoolName, priority));
- }
-
- public RetryingScheduledThreadPoolExecutor(String threadPoolName)
- {
- this(1, threadPoolName, Thread.NORM_PRIORITY);
- }
-
- @Override
- protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable,
RunnableScheduledFuture<V> task)
- {
- return new LoggingScheduledFuture<V>(task);
- }
-
- /**
- * Wraps RunnableScheduledFuture.run to log an error on exception rather
than kill the executor thread.
- * All the other methods just wrap the RSF counterpart.
- * @param <V>
- */
- private static class LoggingScheduledFuture<V> implements
RunnableScheduledFuture<V>
- {
- private final RunnableScheduledFuture<V> task;
-
- public LoggingScheduledFuture(RunnableScheduledFuture<V> task)
- {
- this.task = task;
- }
-
- public boolean isPeriodic()
- {
- return task.isPeriodic();
- }
-
- public long getDelay(TimeUnit unit)
- {
- return task.getDelay(unit);
- }
-
- public int compareTo(Delayed o)
- {
- return task.compareTo(o);
- }
-
- public void run()
- {
- try
- {
- task.run();
- }
- catch (Exception e)
- {
- if (Thread.getDefaultUncaughtExceptionHandler() != null)
-
Thread.getDefaultUncaughtExceptionHandler().uncaughtException(Thread.currentThread(),
e.getCause());
- }
- }
-
- public boolean cancel(boolean mayInterruptIfRunning)
- {
- return task.cancel(mayInterruptIfRunning);
- }
-
- public boolean isCancelled()
- {
- return task.isCancelled();
- }
-
- public boolean isDone()
- {
- return task.isDone();
- }
-
- public V get() throws InterruptedException, ExecutionException
- {
- return task.get();
- }
-
- public V get(long timeout, TimeUnit unit) throws InterruptedException,
ExecutionException, TimeoutException
- {
- return task.get(timeout, unit);
- }
- }
-}
Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java?rev=1158439&r1=1158438&r2=1158439&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java Tue Aug 16
20:47:22 2011
@@ -36,7 +36,7 @@ import org.cliffc.high_scale_lib.NonBloc
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.concurrent.RetryingScheduledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.net.Message;
@@ -56,7 +56,7 @@ import org.apache.cassandra.service.Stor
public class Gossiper implements IFailureDetectionEventListener
{
- private static final RetryingScheduledThreadPoolExecutor executor = new
RetryingScheduledThreadPoolExecutor("GossipTasks");
+ private static final DebuggableScheduledThreadPoolExecutor executor = new
DebuggableScheduledThreadPoolExecutor("GossipTasks");
static final ApplicationState[] STATES = ApplicationState.values();
static final List<String> DEAD_STATES =
Arrays.asList(VersionedValue.REMOVING_TOKEN, VersionedValue.REMOVED_TOKEN,
VersionedValue.STATUS_LEFT);
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=1158439&r1=1158438&r2=1158439&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
Tue Aug 16 20:47:22 2011
@@ -38,7 +38,7 @@ import org.apache.commons.lang.StringUti
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.concurrent.RetryingScheduledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.CFMetaData;
@@ -78,7 +78,7 @@ import org.apache.cassandra.utils.Wrappe
*/
public class StorageService implements IEndpointStateChangeSubscriber,
StorageServiceMBean
{
- private static Logger logger_ =
LoggerFactory.getLogger(StorageService.class);
+ private static Logger logger_ =
LoggerFactory.getLogger(StorageService.class);
public static final int RING_DELAY = 30 * 1000; // delay after which we
assume ring has stablized
@@ -151,12 +151,12 @@ public class StorageService implements I
/**
* This pool is used for periodic short (sub-second) tasks.
*/
- public static final RetryingScheduledThreadPoolExecutor scheduledTasks =
new RetryingScheduledThreadPoolExecutor("ScheduledTasks");
+ public static final DebuggableScheduledThreadPoolExecutor scheduledTasks
= new DebuggableScheduledThreadPoolExecutor("ScheduledTasks");
/**
* This pool is used by tasks that can have longer execution times, and
usually are non periodic.
*/
- public static final RetryingScheduledThreadPoolExecutor tasks = new
RetryingScheduledThreadPoolExecutor("NonPeriodicTasks");
+ public static final DebuggableScheduledThreadPoolExecutor tasks = new
DebuggableScheduledThreadPoolExecutor("NonPeriodicTasks");
/* This abstraction maintains the token/endpoint metadata information */
Modified:
cassandra/trunk/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java?rev=1158439&r1=1158438&r2=1158439&view=diff
==============================================================================
---
cassandra/trunk/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java
(original)
+++
cassandra/trunk/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java
Tue Aug 16 20:47:22 2011
@@ -35,7 +35,7 @@ public class DynamicEndpointSnitchTest
public void testSnitch() throws InterruptedException, IOException,
ConfigurationException
{
// do this because SS needs to be initialized before DES can work
properly.
- StorageService.instance.initClient();
+ StorageService.instance.initClient(0);
int sleeptime = 150;
DynamicEndpointSnitch dsnitch = new DynamicEndpointSnitch(new
SimpleSnitch());
InetAddress self = FBUtilities.getBroadcastAddress();