Author: jbellis
Date: Wed May 12 00:11:54 2010
New Revision: 943345
URL: http://svn.apache.org/viewvc?rev=943345&view=rev
Log:
clean up ExpiringMap and improve concurrency w/ NonBlockingHashMap (NBHM) instead of Hashtable
patch by jbellis; reviewed by gdusbabek for CASSANDRA-1077
Modified:
cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
cassandra/trunk/src/java/org/apache/cassandra/utils/ExpiringMap.java
Modified:
cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=943345&r1=943344&r2=943345&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Wed
May 12 00:11:54 2010
@@ -91,8 +91,8 @@ public class MessagingService implements
* which is the sum of the threads in the pool that adds shit into the
table and the
* pool that retrives the callback from here.
*/
- callbackMap_ = new ExpiringMap<String, IAsyncCallback>( 2 *
DatabaseDescriptor.getRpcTimeout() );
- taskCompletionMap_ = new ExpiringMap<String, IAsyncResult>( 2 *
DatabaseDescriptor.getRpcTimeout() );
+ callbackMap_ = new ExpiringMap<String, IAsyncCallback>((long) (1.1 *
DatabaseDescriptor.getRpcTimeout()));
+ taskCompletionMap_ = new ExpiringMap<String, IAsyncResult>((long) (1.1
* DatabaseDescriptor.getRpcTimeout()));
// read executor puts messages to deserialize on this.
messageDeserializerExecutor_ = new JMXEnabledThreadPoolExecutor(1,
Modified: cassandra/trunk/src/java/org/apache/cassandra/utils/ExpiringMap.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/ExpiringMap.java?rev=943345&r1=943344&r2=943345&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/utils/ExpiringMap.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/utils/ExpiringMap.java Wed
May 12 00:11:54 2010
@@ -20,6 +20,7 @@ package org.apache.cassandra.utils;
import java.util.*;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -27,83 +28,59 @@ public class ExpiringMap<K, V>
{
private static final Logger logger =
LoggerFactory.getLogger(ExpiringMap.class);
- private class CacheableObject
+ private static class CacheableObject<T>
{
- private V value_;
- private long age_;
+ private final T value;
+ private final long age;
- CacheableObject(V o)
+ CacheableObject(T o)
{
- value_ = o;
- age_ = System.currentTimeMillis();
+ value = o;
+ age = System.currentTimeMillis();
}
- @Override
- public boolean equals(Object o)
- {
- return value_.equals(o);
- }
-
- @Override
- public int hashCode()
+ T getValue()
{
- return value_.hashCode();
- }
-
- V getValue()
- {
- return value_;
+ return value;
}
boolean isReadyToDie(long expiration)
{
- return ((System.currentTimeMillis() - age_) > expiration);
+ return ((System.currentTimeMillis() - age) > expiration);
}
}
private class CacheMonitor extends TimerTask
{
- private long expiration_;
+ private final long expiration;
CacheMonitor(long expiration)
{
- expiration_ = expiration;
+ this.expiration = expiration;
}
@Override
public void run()
{
- synchronized (cache_)
+ synchronized (cache)
{
- Enumeration<K> e = cache_.keys();
+ Enumeration<K> e = cache.keys();
while (e.hasMoreElements())
{
K key = e.nextElement();
- CacheableObject co = cache_.get(key);
- if (co != null && co.isReadyToDie(expiration_))
+ CacheableObject co = cache.get(key);
+ if (co != null && co.isReadyToDie(expiration))
{
- cache_.remove(key);
+ cache.remove(key);
}
}
}
}
}
- private Hashtable<K, CacheableObject> cache_;
- private Timer timer_;
- private static int counter_ = 0;
-
- private void init(long expiration)
- {
- if (expiration <= 0)
- {
- throw new IllegalArgumentException("Argument specified must be a
positive number");
- }
-
- cache_ = new Hashtable<K, CacheableObject>();
- timer_ = new Timer("CACHETABLE-TIMER-" + (++counter_), true);
- timer_.schedule(new CacheMonitor(expiration), expiration, expiration);
- }
+ private final NonBlockingHashMap<K, CacheableObject> cache = new
NonBlockingHashMap<K, CacheableObject>();
+ private final Timer timer;
+ private static int counter = 0;
/*
* Specify the TTL for objects in the cache
@@ -111,23 +88,29 @@ public class ExpiringMap<K, V>
*/
public ExpiringMap(long expiration)
{
- init(expiration);
+ if (expiration <= 0)
+ {
+ throw new IllegalArgumentException("Argument specified must be a
positive number");
+ }
+
+ timer = new Timer("EXPIRING-MAP-TIMER-" + (++counter), true);
+ timer.schedule(new CacheMonitor(expiration), expiration / 2,
expiration / 2);
}
public void shutdown()
{
- timer_.cancel();
+ timer.cancel();
}
public void put(K key, V value)
{
- cache_.put(key, new CacheableObject(value));
+ cache.put(key, new CacheableObject<V>(value));
}
public V get(K key)
{
V result = null;
- CacheableObject co = cache_.get(key);
+ CacheableObject<V> co = cache.get(key);
if (co != null)
{
result = co.getValue();
@@ -137,7 +120,7 @@ public class ExpiringMap<K, V>
public V remove(K key)
{
- CacheableObject co = cache_.remove(key);
+ CacheableObject<V> co = cache.remove(key);
V result = null;
if (co != null)
{
@@ -148,21 +131,21 @@ public class ExpiringMap<K, V>
public int size()
{
- return cache_.size();
+ return cache.size();
}
public boolean containsKey(K key)
{
- return cache_.containsKey(key);
+ return cache.containsKey(key);
}
public boolean isEmpty()
{
- return cache_.isEmpty();
+ return cache.isEmpty();
}
public Set<K> keySet()
{
- return cache_.keySet();
+ return cache.keySet();
}
}