Author: jbellis
Date: Tue May 17 22:14:35 2011
New Revision: 1104598
URL: http://svn.apache.org/viewvc?rev=1104598&view=rev
Log:
add per-callback timeouts to ExpiringMap
Modified:
cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/net/MessagingService.java
cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/utils/ExpiringMap.java
Modified:
cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/net/MessagingService.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/net/MessagingService.java?rev=1104598&r1=1104597&r2=1104598&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/net/MessagingService.java
(original)
+++
cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/net/MessagingService.java
Tue May 17 22:14:35 2011
@@ -83,6 +83,7 @@ public final class MessagingService impl
private final SimpleCondition listenGate;
private final Map<StorageService.Verb, AtomicInteger> droppedMessages =
new EnumMap<StorageService.Verb, AtomicInteger>(StorageService.Verb.class);
private final List<ILatencySubscriber> subscribers = new
ArrayList<ILatencySubscriber>();
+ private static final long DEFAULT_CALLBACK_TIMEOUT = (long) (1.1 *
DatabaseDescriptor.getRpcTimeout());
{
for (StorageService.Verb verb : StorageService.Verb.values())
@@ -121,7 +122,7 @@ public final class MessagingService impl
return null;
}
};
- callbacks = new ExpiringMap<String, Pair<InetAddress,
IMessageCallback>>((long) (1.1 * DatabaseDescriptor.getRpcTimeout()),
timeoutReporter);
+ callbacks = new ExpiringMap<String, Pair<InetAddress,
IMessageCallback>>(DEFAULT_CALLBACK_TIMEOUT, timeoutReporter);
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
try
@@ -256,7 +257,12 @@ public final class MessagingService impl
private void addCallback(IMessageCallback cb, String messageId,
InetAddress to)
{
- Pair<InetAddress, IMessageCallback> previous =
callbacks.put(messageId, new Pair<InetAddress, IMessageCallback>(to, cb));
+ addCallback(cb, messageId, to, DEFAULT_CALLBACK_TIMEOUT);
+ }
+
+ private void addCallback(IMessageCallback cb, String messageId,
InetAddress to, long timeout)
+ {
+ Pair<InetAddress, IMessageCallback> previous =
callbacks.put(messageId, new Pair<InetAddress, IMessageCallback>(to, cb),
timeout);
assert previous == null;
}
@@ -267,6 +273,14 @@ public final class MessagingService impl
return Integer.toString(idGen.incrementAndGet());
}
+ /*
+ * @see #sendRR(Message message, InetAddress to, IMessageCallback cb, long
timeout)
+ */
+ public String sendRR(Message message, InetAddress to, IMessageCallback cb)
+ {
+ return sendRR(message, to, cb, DEFAULT_CALLBACK_TIMEOUT);
+ }
+
/**
* Send a message to a given endpoint. This method specifies a callback
* which is invoked with the actual response.
@@ -275,12 +289,13 @@ public final class MessagingService impl
* @param cb callback interface which is used to pass the responses or
* suggest that a timeout occurred to the invoker of the send().
* suggest that a timeout occurred to the invoker of the send().
+ * @param timeout the timeout used for expiration
* @return an reference to message id used to match with the result
*/
- public String sendRR(Message message, InetAddress to, IMessageCallback cb)
+ public String sendRR(Message message, InetAddress to, IMessageCallback cb,
long timeout)
{
String id = nextId();
- addCallback(cb, id, to);
+ addCallback(cb, id, to, timeout);
sendOneWay(message, id, to);
return id;
}
@@ -624,4 +639,9 @@ public final class MessagingService impl
completedTasks.put(entry.getKey().getHostAddress(),
entry.getValue().ackCon.getCompletedMesssages());
return completedTasks;
}
+
+ public static long getDefaultCallbackTimeout()
+ {
+ return DEFAULT_CALLBACK_TIMEOUT;
+ }
}
Modified:
cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/utils/ExpiringMap.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/utils/ExpiringMap.java?rev=1104598&r1=1104597&r2=1104598&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/utils/ExpiringMap.java
(original)
+++
cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/utils/ExpiringMap.java
Tue May 17 22:14:35 2011
@@ -32,11 +32,13 @@ public class ExpiringMap<K, V>
{
private final T value;
private final long age;
+ private final long expiration;
- CacheableObject(T o)
+ CacheableObject(T o, long e)
{
assert o != null;
value = o;
+ expiration = e;
age = System.currentTimeMillis();
}
@@ -45,26 +47,21 @@ public class ExpiringMap<K, V>
return value;
}
- boolean isReadyToDie(long expiration)
+ boolean isReadyToDie(long start)
{
- return ((System.currentTimeMillis() - age) > expiration);
+ return ((start - age) > expiration);
}
}
private class CacheMonitor extends TimerTask
{
- private final long expiration;
-
- CacheMonitor(long expiration)
- {
- this.expiration = expiration;
- }
public void run()
{
+ long start = System.currentTimeMillis();
for (Map.Entry<K, CacheableObject<V>> entry : cache.entrySet())
{
- if (entry.getValue().isReadyToDie(expiration))
+ if (entry.getValue().isReadyToDie(start))
{
cache.remove(entry.getKey());
if (postExpireHook != null)
@@ -77,6 +74,7 @@ public class ExpiringMap<K, V>
private final NonBlockingHashMap<K, CacheableObject<V>> cache = new
NonBlockingHashMap<K, CacheableObject<V>>();
private final Timer timer;
private static int counter = 0;
+ private final long expiration;
public ExpiringMap(long expiration)
{
@@ -90,13 +88,15 @@ public class ExpiringMap<K, V>
public ExpiringMap(long expiration, Function<Pair<K,V>, ?> postExpireHook)
{
this.postExpireHook = postExpireHook;
+ this.expiration = expiration;
+
if (expiration <= 0)
{
throw new IllegalArgumentException("Argument specified must be a
positive number");
}
timer = new Timer("EXPIRING-MAP-TIMER-" + (++counter), true);
- timer.schedule(new CacheMonitor(expiration), expiration / 2,
expiration / 2);
+ timer.schedule(new CacheMonitor(), expiration / 2, expiration / 2);
}
public void shutdown()
@@ -106,7 +106,12 @@ public class ExpiringMap<K, V>
public V put(K key, V value)
{
- CacheableObject<V> previous = cache.put(key, new
CacheableObject<V>(value));
+ return put(key, value, this.expiration);
+ }
+
+ public V put(K key, V value, long timeout)
+ {
+ CacheableObject<V> previous = cache.put(key, new
CacheableObject<V>(value, timeout));
return (previous == null) ? null : previous.getValue();
}