Author: xedin Date: Tue Aug 16 20:47:22 2011 New Revision: 1158439 URL: http://svn.apache.org/viewvc?rev=1158439&view=rev Log: Fix missing logging for some exceptions. Patch by Jonathan Ellis; reviewed by Pavel Yaskevich for CASSANDRA-2061.
Added: cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableScheduledThreadPoolExecutor.java Modified: cassandra/trunk/CHANGES.txt cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java cassandra/trunk/src/java/org/apache/cassandra/concurrent/RetryingScheduledThreadPoolExecutor.java cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java cassandra/trunk/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java Modified: cassandra/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1158439&r1=1158438&r2=1158439&view=diff ============================================================================== --- cassandra/trunk/CHANGES.txt (original) +++ cassandra/trunk/CHANGES.txt Tue Aug 16 20:47:22 2011 @@ -32,7 +32,7 @@ * refactoring of the secondary index api (CASSANDRA-2982) * make CL > ONE reads wait for digest reconciliation before returning (CASSANDRA-2494) - + * fix missing logging for some exceptions (CASSANDRA-2061) 0.8.5 * fix NPE when encryption_options is unspecified (CASSANDRA-3007) Added: cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableScheduledThreadPoolExecutor.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableScheduledThreadPoolExecutor.java?rev=1158439&view=auto ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableScheduledThreadPoolExecutor.java (added) +++ cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableScheduledThreadPoolExecutor.java Tue Aug 16 20:47:22 2011 @@ -0,0 +1,49 @@ +package org.apache.cassandra.concurrent; +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +import java.util.concurrent.*; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DebuggableScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor +{ + private static Logger logger = LoggerFactory.getLogger(DebuggableScheduledThreadPoolExecutor.class); + + public DebuggableScheduledThreadPoolExecutor(int corePoolSize, String threadPoolName, int priority) + { + super(corePoolSize, new NamedThreadFactory(threadPoolName, priority)); + } + + public DebuggableScheduledThreadPoolExecutor(String threadPoolName) + { + this(1, threadPoolName, Thread.NORM_PRIORITY); + } + + @Override + public void afterExecute(Runnable r, Throwable t) + { + super.afterExecute(r,t); + DebuggableThreadPoolExecutor.logExceptionsAfterExecute(r, t); + } +} Modified: cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java?rev=1158439&r1=1158438&r2=1158439&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java (original) +++ 
cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java Tue Aug 16 20:47:22 2011 @@ -111,12 +111,16 @@ public class DebuggableThreadPoolExecuto public static void logExceptionsAfterExecute(Runnable r, Throwable t) { - // exceptions wrapped by FutureTask - if (r instanceof FutureTask<?>) + // Check for exceptions wrapped by FutureTask. We do this by calling get(), which will + // cause it to throw any saved exception. + // + // Complicating things, calling get() on a ScheduledFutureTask will block until the task + // is cancelled. Hence, the extra isDone check beforehand. + if ((r instanceof Future<?>) && ((Future<?>) r).isDone()) { try { - ((FutureTask<?>) r).get(); + ((Future<?>) r).get(); } catch (InterruptedException e) { Modified: cassandra/trunk/src/java/org/apache/cassandra/concurrent/RetryingScheduledThreadPoolExecutor.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/concurrent/RetryingScheduledThreadPoolExecutor.java?rev=1158439&r1=1158438&r2=1158439&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/concurrent/RetryingScheduledThreadPoolExecutor.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/concurrent/RetryingScheduledThreadPoolExecutor.java Tue Aug 16 20:47:22 2011 @@ -1,116 +0,0 @@ -package org.apache.cassandra.concurrent; -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - - -import java.util.concurrent.*; - -public class RetryingScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor -{ - public RetryingScheduledThreadPoolExecutor(String threadPoolName, int priority) - { - this(1, threadPoolName, priority); - } - - public RetryingScheduledThreadPoolExecutor(int corePoolSize, String threadPoolName, int priority) - { - super(corePoolSize, new NamedThreadFactory(threadPoolName, priority)); - } - - public RetryingScheduledThreadPoolExecutor(String threadPoolName) - { - this(1, threadPoolName, Thread.NORM_PRIORITY); - } - - @Override - protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task) - { - return new LoggingScheduledFuture<V>(task); - } - - /** - * Wraps RunnableScheduledFuture.run to log an error on exception rather than kill the executor thread. - * All the other methods just wrap the RSF counterpart. 
- * @param <V> - */ - private static class LoggingScheduledFuture<V> implements RunnableScheduledFuture<V> - { - private final RunnableScheduledFuture<V> task; - - public LoggingScheduledFuture(RunnableScheduledFuture<V> task) - { - this.task = task; - } - - public boolean isPeriodic() - { - return task.isPeriodic(); - } - - public long getDelay(TimeUnit unit) - { - return task.getDelay(unit); - } - - public int compareTo(Delayed o) - { - return task.compareTo(o); - } - - public void run() - { - try - { - task.run(); - } - catch (Exception e) - { - if (Thread.getDefaultUncaughtExceptionHandler() != null) - Thread.getDefaultUncaughtExceptionHandler().uncaughtException(Thread.currentThread(), e.getCause()); - } - } - - public boolean cancel(boolean mayInterruptIfRunning) - { - return task.cancel(mayInterruptIfRunning); - } - - public boolean isCancelled() - { - return task.isCancelled(); - } - - public boolean isDone() - { - return task.isDone(); - } - - public V get() throws InterruptedException, ExecutionException - { - return task.get(); - } - - public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException - { - return task.get(timeout, unit); - } - } -} Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java?rev=1158439&r1=1158438&r2=1158439&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java Tue Aug 16 20:47:22 2011 @@ -36,7 +36,7 @@ import org.cliffc.high_scale_lib.NonBloc import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.concurrent.RetryingScheduledThreadPoolExecutor; +import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor; import 
org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.net.Message; @@ -56,7 +56,7 @@ import org.apache.cassandra.service.Stor public class Gossiper implements IFailureDetectionEventListener { - private static final RetryingScheduledThreadPoolExecutor executor = new RetryingScheduledThreadPoolExecutor("GossipTasks"); + private static final DebuggableScheduledThreadPoolExecutor executor = new DebuggableScheduledThreadPoolExecutor("GossipTasks"); static final ApplicationState[] STATES = ApplicationState.values(); static final List<String> DEAD_STATES = Arrays.asList(VersionedValue.REMOVING_TOKEN, VersionedValue.REMOVED_TOKEN, VersionedValue.STATUS_LEFT); Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=1158439&r1=1158438&r2=1158439&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Tue Aug 16 20:47:22 2011 @@ -38,7 +38,7 @@ import org.apache.commons.lang.StringUti import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.concurrent.RetryingScheduledThreadPoolExecutor; +import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor; import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.concurrent.StageManager; import org.apache.cassandra.config.CFMetaData; @@ -78,7 +78,7 @@ import org.apache.cassandra.utils.Wrappe */ public class StorageService implements IEndpointStateChangeSubscriber, StorageServiceMBean { - private static Logger logger_ = LoggerFactory.getLogger(StorageService.class); + private static Logger logger_ = LoggerFactory.getLogger(StorageService.class); public static final int 
RING_DELAY = 30 * 1000; // delay after which we assume ring has stablized @@ -151,12 +151,12 @@ public class StorageService implements I /** * This pool is used for periodic short (sub-second) tasks. */ - public static final RetryingScheduledThreadPoolExecutor scheduledTasks = new RetryingScheduledThreadPoolExecutor("ScheduledTasks"); + public static final DebuggableScheduledThreadPoolExecutor scheduledTasks = new DebuggableScheduledThreadPoolExecutor("ScheduledTasks"); /** * This pool is used by tasks that can have longer execution times, and usually are non periodic. */ - public static final RetryingScheduledThreadPoolExecutor tasks = new RetryingScheduledThreadPoolExecutor("NonPeriodicTasks"); + public static final DebuggableScheduledThreadPoolExecutor tasks = new DebuggableScheduledThreadPoolExecutor("NonPeriodicTasks"); /* This abstraction maintains the token/endpoint metadata information */ Modified: cassandra/trunk/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java?rev=1158439&r1=1158438&r2=1158439&view=diff ============================================================================== --- cassandra/trunk/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java (original) +++ cassandra/trunk/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java Tue Aug 16 20:47:22 2011 @@ -35,7 +35,7 @@ public class DynamicEndpointSnitchTest public void testSnitch() throws InterruptedException, IOException, ConfigurationException { // do this because SS needs to be initialized before DES can work properly. - StorageService.instance.initClient(); + StorageService.instance.initClient(0); int sleeptime = 150; DynamicEndpointSnitch dsnitch = new DynamicEndpointSnitch(new SimpleSnitch()); InetAddress self = FBUtilities.getBroadcastAddress();