Author: jbellis
Date: Tue May 17 22:14:35 2011
New Revision: 1104598

URL: http://svn.apache.org/viewvc?rev=1104598&view=rev
Log:
add per-callback timeouts to ExpiringMap

Modified:
    
cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/net/MessagingService.java
    
cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/utils/ExpiringMap.java

Modified: 
cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/net/MessagingService.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/net/MessagingService.java?rev=1104598&r1=1104597&r2=1104598&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/net/MessagingService.java
 (original)
+++ 
cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/net/MessagingService.java
 Tue May 17 22:14:35 2011
@@ -83,6 +83,7 @@ public final class MessagingService impl
     private final SimpleCondition listenGate;
     private final Map<StorageService.Verb, AtomicInteger> droppedMessages = 
new EnumMap<StorageService.Verb, AtomicInteger>(StorageService.Verb.class);
     private final List<ILatencySubscriber> subscribers = new 
ArrayList<ILatencySubscriber>();
+    private static final long DEFAULT_CALLBACK_TIMEOUT = (long) (1.1 * 
DatabaseDescriptor.getRpcTimeout());
 
     {
         for (StorageService.Verb verb : StorageService.Verb.values())
@@ -121,7 +122,7 @@ public final class MessagingService impl
                 return null;
             }
         };
-        callbacks = new ExpiringMap<String, Pair<InetAddress, 
IMessageCallback>>((long) (1.1 * DatabaseDescriptor.getRpcTimeout()), 
timeoutReporter);
+        callbacks = new ExpiringMap<String, Pair<InetAddress, 
IMessageCallback>>(DEFAULT_CALLBACK_TIMEOUT, timeoutReporter);
 
         MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
         try
@@ -256,7 +257,12 @@ public final class MessagingService impl
 
     private void addCallback(IMessageCallback cb, String messageId, 
InetAddress to)
     {
-        Pair<InetAddress, IMessageCallback> previous = 
callbacks.put(messageId, new Pair<InetAddress, IMessageCallback>(to, cb));
+        addCallback(cb, messageId, to, DEFAULT_CALLBACK_TIMEOUT);
+    }
+
+    private void addCallback(IMessageCallback cb, String messageId, 
InetAddress to, long timeout)
+    {
+        Pair<InetAddress, IMessageCallback> previous = 
callbacks.put(messageId, new Pair<InetAddress, IMessageCallback>(to, cb), 
timeout);
         assert previous == null;
     }
     
@@ -267,6 +273,14 @@ public final class MessagingService impl
         return Integer.toString(idGen.incrementAndGet());
     }
 
+    /*
+     * @see #sendRR(Message message, InetAddress to, IMessageCallback cb, long 
timeout)
+     */
+    public String sendRR(Message message, InetAddress to, IMessageCallback cb)
+    {
+        return sendRR(message, to, cb, DEFAULT_CALLBACK_TIMEOUT);
+    }
+
     /**
      * Send a message to a given endpoint. This method specifies a callback
      * which is invoked with the actual response.
@@ -275,12 +289,13 @@ public final class MessagingService impl
      * @param cb callback interface which is used to pass the responses or
      *           suggest that a timeout occurred to the invoker of the send().
      *           suggest that a timeout occurred to the invoker of the send().
+     * @param timeout the timeout used for expiration
      * @return an reference to message id used to match with the result
      */
-    public String sendRR(Message message, InetAddress to, IMessageCallback cb)
+    public String sendRR(Message message, InetAddress to, IMessageCallback cb, 
long timeout)
     {
         String id = nextId();
-        addCallback(cb, id, to);
+        addCallback(cb, id, to, timeout);
         sendOneWay(message, id, to);
         return id;
     }
@@ -624,4 +639,9 @@ public final class MessagingService impl
             completedTasks.put(entry.getKey().getHostAddress(), 
entry.getValue().ackCon.getCompletedMesssages());
         return completedTasks;
     }
+
+    public static long getDefaultCallbackTimeout()
+    {
+        return DEFAULT_CALLBACK_TIMEOUT;
+    }
 }

Modified: 
cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/utils/ExpiringMap.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/utils/ExpiringMap.java?rev=1104598&r1=1104597&r2=1104598&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/utils/ExpiringMap.java
 (original)
+++ 
cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/utils/ExpiringMap.java
 Tue May 17 22:14:35 2011
@@ -32,11 +32,13 @@ public class ExpiringMap<K, V>
     {
         private final T value;
         private final long age;
+        private final long expiration;
 
-        CacheableObject(T o)
+        CacheableObject(T o, long e)
         {
             assert o != null;
             value = o;
+            expiration = e;
             age = System.currentTimeMillis();
         }
 
@@ -45,26 +47,21 @@ public class ExpiringMap<K, V>
             return value;
         }
 
-        boolean isReadyToDie(long expiration)
+        boolean isReadyToDie(long start)
         {
-            return ((System.currentTimeMillis() - age) > expiration);
+            return ((start - age) > expiration);
         }
     }
 
     private class CacheMonitor extends TimerTask
     {
-        private final long expiration;
-
-        CacheMonitor(long expiration)
-        {
-            this.expiration = expiration;
-        }
 
         public void run()
         {
+            long start = System.currentTimeMillis();
             for (Map.Entry<K, CacheableObject<V>> entry : cache.entrySet())
             {
-                if (entry.getValue().isReadyToDie(expiration))
+                if (entry.getValue().isReadyToDie(start))
                 {
                     cache.remove(entry.getKey());
                     if (postExpireHook != null)
@@ -77,6 +74,7 @@ public class ExpiringMap<K, V>
     private final NonBlockingHashMap<K, CacheableObject<V>> cache = new 
NonBlockingHashMap<K, CacheableObject<V>>();
     private final Timer timer;
     private static int counter = 0;
+    private final long expiration;
 
     public ExpiringMap(long expiration)
     {
@@ -90,13 +88,15 @@ public class ExpiringMap<K, V>
     public ExpiringMap(long expiration, Function<Pair<K,V>, ?> postExpireHook)
     {
         this.postExpireHook = postExpireHook;
+        this.expiration = expiration;
+
         if (expiration <= 0)
         {
             throw new IllegalArgumentException("Argument specified must be a 
positive number");
         }
 
         timer = new Timer("EXPIRING-MAP-TIMER-" + (++counter), true);
-        timer.schedule(new CacheMonitor(expiration), expiration / 2, 
expiration / 2);
+        timer.schedule(new CacheMonitor(), expiration / 2, expiration / 2);
     }
 
     public void shutdown()
@@ -106,7 +106,12 @@ public class ExpiringMap<K, V>
 
     public V put(K key, V value)
     {
-        CacheableObject<V> previous = cache.put(key, new 
CacheableObject<V>(value));
+        return put(key, value, this.expiration);
+    }
+
+    public V put(K key, V value, long timeout)
+    {
+        CacheableObject<V> previous = cache.put(key, new 
CacheableObject<V>(value, timeout));
         return (previous == null) ? null : previous.getValue();
     }
 


Reply via email to