Repository: cassandra Updated Branches: refs/heads/cassandra-2.0 17278b3a3 -> d4ec31f21 refs/heads/cassandra-2.1 f7e690d7f -> 8234bc15f refs/heads/trunk 5b85be04c -> 90387f63b
Don't shut ExpiringMap down. Patch by Benedict, reviewed by brandonwilliams for CASSANDRA-6948 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d4ec31f2 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d4ec31f2 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d4ec31f2 Branch: refs/heads/cassandra-2.0 Commit: d4ec31f21eb83502bd523c057bce2d3b249b3d80 Parents: 17278b3 Author: Brandon Williams <[email protected]> Authored: Wed Apr 2 14:05:14 2014 -0500 Committer: Brandon Williams <[email protected]> Committed: Wed Apr 2 14:14:30 2014 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/net/MessagingService.java | 13 +----- .../org/apache/cassandra/utils/ExpiringMap.java | 47 +++++--------------- .../apache/cassandra/service/RemoveTest.java | 1 - 4 files changed, 15 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4ec31f2/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 66196d0..9bbcf07 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.0.7 + * Don't shut ExpiringMap down (CASSANDRA-6948) * Restrict Windows to parallel repairs (CASSANDRA-6907) * (Hadoop) Allow manually specifying start/end tokens in CFIF (CASSANDRA-6436) * Fix NPE in MeteredFlusher (CASSANDRA-6820) http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4ec31f2/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index ad86bbd..094e861 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -392,7 +392,6 @@ public final class MessagingService implements MessagingServiceMBean */ public void listen(InetAddress localEp) throws ConfigurationException { - callbacks.reset(); // hack to allow tests to stop/restart MS for (ServerSocket ss : getServerSockets(localEp)) { SocketThread th = new SocketThread(ss, "ACCEPT-" + localEp); @@ -536,7 +535,7 @@ public final class MessagingService implements MessagingServiceMBean { assert message.verb != Verb.MUTATION; // mutations need to call the overload with a ConsistencyLevel int messageId = nextId(); - CallbackInfo previous = callbacks.put(messageId, new CallbackInfo(to, cb, callbackDeserializers.get(message.verb)), timeout); + ExpiringMap.CacheableObject<CallbackInfo> previous = callbacks.put(messageId, new CallbackInfo(to, cb, callbackDeserializers.get(message.verb)), timeout); assert previous == null : String.format("Callback already exists for id %d! (%s)", messageId, previous); return messageId; } @@ -545,7 +544,7 @@ public final class MessagingService implements MessagingServiceMBean { assert message.verb == Verb.MUTATION || message.verb == Verb.COUNTER_MUTATION; int messageId = nextId(); - CallbackInfo previous = callbacks.put(messageId, new WriteCallbackInfo(to, cb, message, callbackDeserializers.get(message.verb), consistencyLevel), timeout); + ExpiringMap.CacheableObject<CallbackInfo> previous = callbacks.put(messageId, new WriteCallbackInfo(to, cb, message, callbackDeserializers.get(message.verb), consistencyLevel), timeout); assert previous == null : String.format("Callback already exists for id %d! (%s)", messageId, previous); return messageId; } @@ -654,11 +653,6 @@ public final class MessagingService implements MessagingServiceMBean subscribers.add(subcriber); } - public void clearCallbacksUnsafe() - { - callbacks.reset(); - } - /** * Wait for callbacks and don't allow any more to be created (since they could require writing hints) */ @@ -668,9 +662,6 @@ public final class MessagingService implements MessagingServiceMBean // We may need to schedule hints on the mutation stage, so it's erroneous to shut down the mutation stage first assert !StageManager.getStage(Stage.MUTATION).isShutdown(); - // the important part - callbacks.shutdownBlocking(); - // attempt to humor tests that try to stop and restart MS try { http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4ec31f2/src/java/org/apache/cassandra/utils/ExpiringMap.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/ExpiringMap.java b/src/java/org/apache/cassandra/utils/ExpiringMap.java index 7eec40e..cbec808 100644 --- a/src/java/org/apache/cassandra/utils/ExpiringMap.java +++ b/src/java/org/apache/cassandra/utils/ExpiringMap.java @@ -21,6 +21,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -35,7 +36,6 @@ import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor; public class ExpiringMap<K, V> { private static final Logger logger = LoggerFactory.getLogger(ExpiringMap.class); - private volatile boolean shutdown; public static class CacheableObject<T> { @@ -55,12 +55,17 @@ public class ExpiringMap<K, V> { return atNano - createdAt > TimeUnit.MILLISECONDS.toNanos(timeout); } + + public String toString() + { + return "CacheableObject(obj=" + value.toString() + ", deltaFromTimeout=" + (System.nanoTime() - (createdAt + TimeUnit.MILLISECONDS.toNanos(timeout))) + "ns)"; + } } // if we use more ExpiringMaps we may want to add multiple threads to this executor private static final ScheduledExecutorService service = new DebuggableScheduledThreadPoolExecutor("EXPIRING-MAP-REAPER"); - private final ConcurrentMap<K, CacheableObject<V>> cache = new ConcurrentHashMap<K, CacheableObject<V>>(); + private final ConcurrentMap<K, CacheableObject<V>> cache = new ConcurrentHashMap<>(); private final long defaultExpiration; public ExpiringMap(long defaultExpiration) @@ -81,7 +86,7 @@ public class ExpiringMap<K, V> throw new IllegalArgumentException("Argument specified must be a positive number"); } - Runnable runnable = new Runnable() + Runnable reaperTask = new Runnable() { public void run() { @@ -100,45 +105,17 @@ public class ExpiringMap<K, V> logger.trace("Expired {} entries", n); } }; - service.scheduleWithFixedDelay(runnable, defaultExpiration / 2, defaultExpiration / 2, TimeUnit.MILLISECONDS); + service.scheduleWithFixedDelay(reaperTask, defaultExpiration / 2, defaultExpiration / 2, TimeUnit.MILLISECONDS); } - public void shutdownBlocking() - { - service.shutdown(); - try - { - service.awaitTermination(defaultExpiration * 2, TimeUnit.MILLISECONDS); - } - catch (InterruptedException e) - { - throw new AssertionError(e); - } - } - - public void reset() - { - shutdown = false; - cache.clear(); - } - - public V put(K key, V value) + public CacheableObject<V> put(K key, V value) { return put(key, value, this.defaultExpiration); } - public V put(K key, V value, long timeout) + public CacheableObject<V> put(K key, V value, long timeout) { - if (shutdown) - { - // StorageProxy isn't equipped to deal with "I'm nominally alive, but I can't send any messages out." - // So we'll just sit on this thread until the rest of the server shutdown completes. - // - // See comments in CustomTThreadPoolServer.serve, CASSANDRA-3335, and CASSANDRA-3727. - Uninterruptibles.sleepUninterruptibly(Long.MAX_VALUE, TimeUnit.NANOSECONDS); - } - CacheableObject<V> previous = cache.put(key, new CacheableObject<V>(value, timeout)); - return (previous == null) ? null : previous.value; + return cache.put(key, new CacheableObject<V>(value, timeout)); } public V get(K key) http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4ec31f2/test/unit/org/apache/cassandra/service/RemoveTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/RemoveTest.java b/test/unit/org/apache/cassandra/service/RemoveTest.java index 62dd636..82e5e9e 100644 --- a/test/unit/org/apache/cassandra/service/RemoveTest.java +++ b/test/unit/org/apache/cassandra/service/RemoveTest.java @@ -92,7 +92,6 @@ public class RemoveTest public void tearDown() { SinkManager.clear(); - MessagingService.instance().clearCallbacksUnsafe(); MessagingService.instance().shutdown(); }
