Author: jbellis
Date: Thu Jan 27 22:28:06 2011
New Revision: 1064340

URL: http://svn.apache.org/viewvc?rev=1064340&view=rev
Log:
merge from 0.6

Removed:
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/ILatencyPublisher.java
Modified:
    cassandra/branches/cassandra-0.7/   (props changed)
    cassandra/branches/cassandra-0.7/CHANGES.txt
    cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java   (props changed)
    cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java   (props changed)
    cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java   (props changed)
    cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java   (props changed)
    cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java   (props changed)
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManager.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/AsyncResult.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/IWriteResponseHandler.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/ExpiringMap.java

Propchange: cassandra/branches/cassandra-0.7/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Jan 27 22:28:06 2011
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6:922689-1055311,1056121,1057932
+/cassandra/branches/cassandra-0.6:922689-1055311,1056121,1057932,1064193
 /cassandra/branches/cassandra-0.7:1026516,1035666,1050269
 /cassandra/branches/cassandra-0.7.0:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689

Modified: cassandra/branches/cassandra-0.7/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1064340&r1=1064339&r2=1064340&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.7/CHANGES.txt Thu Jan 27 22:28:06 2011
@@ -4,6 +4,8 @@
  * allow nodes to be up without being part of  normal traffic (CASSANDRA-1951)
  * fix CLI "show keyspaces" with null options on NTS (CASSANDRA-2049)
  * fix possible ByteBuffer race conditions (CASSANDRA-2066)
+ * reduce garbage generated by MessagingService to prevent load spikes
+   (CASSANDRA-2058)
 
 
 0.7.1

Propchange: 
cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Jan 27 22:28:06 2011
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1055311,1056121,1057932
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1055311,1056121,1057932,1064193
 
/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516,1035666,1050269
 
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1053690-1055654
 
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689

Propchange: 
cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Jan 27 22:28:06 2011
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1055311,1056121,1057932
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1055311,1056121,1057932,1064193
 
/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516,1035666,1050269
 
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1053690-1055654
 
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689

Propchange: 
cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Jan 27 22:28:06 2011
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1055311,1056121,1057932
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1055311,1056121,1057932,1064193
 
/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516,1035666,1050269
 
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1053690-1055654
 
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689

Propchange: 
cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Jan 27 22:28:06 2011
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1055311,1056121,1057932
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1055311,1056121,1057932,1064193
 
/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516,1035666,1050269
 
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1053690-1055654
 
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689

Propchange: 
cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Jan 27 22:28:06 2011
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1055311,1056121,1057932
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1055311,1056121,1057932,1064193
 
/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516,1035666,1050269
 
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1053690-1055654
 
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689

Modified: 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=1064340&r1=1064339&r2=1064340&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManager.java
 (original)
+++ 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManager.java
 Thu Jan 27 22:28:06 2011
@@ -124,7 +124,7 @@ public class HintedHandOffManager
             rm.add(cf);
             Message message = rm.makeRowMutationMessage();
             IWriteResponseHandler responseHandler =  
WriteResponseHandler.create(endpoint);
-            MessagingService.instance().sendRR(message, 
Arrays.asList(endpoint), responseHandler);
+            MessagingService.instance().sendRR(message, endpoint, 
responseHandler);
             try
             {
                 responseHandler.get();

Modified: 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java?rev=1064340&r1=1064339&r2=1064340&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
 (original)
+++ 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
 Thu Jan 27 22:28:06 2011
@@ -208,10 +208,9 @@ public class DynamicEndpointSnitch exten
             return;
         if (!registered)
         {
-                   ILatencyPublisher handler = (ILatencyPublisher) 
MessagingService.instance().getVerbHandler(StorageService.Verb.REQUEST_RESPONSE);
-            if (handler != null)
+            if (MessagingService.instance() != null)
             {
-                handler.register(this);
+                MessagingService.instance().register(this);
                 registered = true;
             }
 

Modified: 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/AsyncResult.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/AsyncResult.java?rev=1064340&r1=1064339&r2=1064340&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/AsyncResult.java
 (original)
+++ 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/AsyncResult.java
 Thu Jan 27 22:28:06 2011
@@ -94,8 +94,6 @@ class AsyncResult implements IAsyncResul
         {
             lock.unlock();
         }        
-
-        
MessagingService.instance().removeRegisteredCallback(response.getMessageId());
     }
 
     public InetAddress getFrom()

Modified: 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java?rev=1064340&r1=1064339&r2=1064340&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java
 (original)
+++ 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java
 Thu Jan 27 22:28:06 2011
@@ -27,7 +27,6 @@ import java.nio.channels.AsynchronousClo
 import java.nio.channels.ServerSocketChannel;
 import java.security.MessageDigest;
 import java.util.*;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -45,21 +44,22 @@ import org.apache.cassandra.concurrent.S
 import org.apache.cassandra.config.ConfigurationException;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.util.DataOutputBuffer;
-import org.apache.cassandra.locator.ILatencyPublisher;
 import org.apache.cassandra.locator.ILatencySubscriber;
 import org.apache.cassandra.net.io.SerializerType;
 import org.apache.cassandra.net.sink.SinkManager;
 import org.apache.cassandra.service.GCInspector;
+import org.apache.cassandra.service.ReadCallback;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.streaming.FileStreamTask;
 import org.apache.cassandra.streaming.StreamHeader;
 import org.apache.cassandra.utils.ExpiringMap;
 import org.apache.cassandra.utils.GuidGenerator;
+import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.SimpleCondition;
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
 import org.cliffc.high_scale_lib.NonBlockingHashSet;
 
-public final class MessagingService implements MessagingServiceMBean, 
ILatencyPublisher
+public final class MessagingService implements MessagingServiceMBean
 {
     private static final int version_ = 1;
     //TODO: make this parameter dynamic somehow.  Not sure if config is 
appropriate.
@@ -69,8 +69,7 @@ public final class MessagingService impl
     private static final int PROTOCOL_MAGIC = 0xCA552DFA;
 
     /* This records all the results mapped by message Id */
-    private final ExpiringMap<String, IMessageCallback> callbacks;
-    private final ConcurrentMap<String, Collection<InetAddress>> targets = new 
NonBlockingHashMap<String, Collection<InetAddress>>();
+    private final ExpiringMap<String, Pair<InetAddress, IMessageCallback>> 
callbacks;
 
     /* Lookup table for registering message handlers based on the verb. */
     private final Map<StorageService.Verb, IVerbHandler> verbHandlers_;
@@ -116,24 +115,16 @@ public final class MessagingService impl
         };
         StorageService.scheduledTasks.scheduleWithFixedDelay(logDropped, 
LOG_DROPPED_INTERVAL_IN_MS, LOG_DROPPED_INTERVAL_IN_MS, TimeUnit.MILLISECONDS);
 
-        Function<String, ?> timeoutReporter = new Function<String, Object>()
+        Function<Pair<String, Pair<InetAddress, IMessageCallback>>, ?> 
timeoutReporter = new Function<Pair<String, Pair<InetAddress, 
IMessageCallback>>, Object>()
         {
-            public Object apply(String messageId)
+            public Object apply(Pair<String, Pair<InetAddress, 
IMessageCallback>> pair)
             {
-                Collection<InetAddress> addresses = targets.remove(messageId);
-                if (addresses == null)
-                    return null;
-
-                for (InetAddress address : addresses)
-                {
-                    for (ILatencySubscriber subscriber : subscribers)
-                        subscriber.receiveTiming(address, (double) 
DatabaseDescriptor.getRpcTimeout());
-                }
-
+                Pair<InetAddress, IMessageCallback> expiredValue = pair.right;
+                maybeAddLatency(expiredValue.right, expiredValue.left, 
(double) DatabaseDescriptor.getRpcTimeout());
                 return null;
             }
         };
-        callbacks = new ExpiringMap<String, IMessageCallback>((long) (1.1 * 
DatabaseDescriptor.getRpcTimeout()), timeoutReporter);
+        callbacks = new ExpiringMap<String, Pair<InetAddress, 
IMessageCallback>>((long) (1.1 * DatabaseDescriptor.getRpcTimeout()), 
timeoutReporter);
 
         MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
         try
@@ -146,6 +137,24 @@ public final class MessagingService impl
         }
     }
 
+    /**
+     * Track latency information for the dynamic snitch
+     * @param cb: the callback associated with this message -- this lets us 
know if it's a message type we're interested in
+     * @param address: the host that replied to the message
+     * @param latency
+     */
+    public void maybeAddLatency(IMessageCallback cb, InetAddress address, 
double latency)
+    {
+        if (cb instanceof ReadCallback || cb instanceof AsyncResult)
+            addLatency(address, latency);
+    }
+
+    public void addLatency(InetAddress address, double latency)
+    {
+        for (ILatencySubscriber subscriber : subscribers)
+            subscriber.receiveTiming(address, latency);
+    }
+
     public static byte[] hash(String type, byte data[])
     {
         byte result[];
@@ -247,49 +256,9 @@ public final class MessagingService impl
         return verbHandlers_.get(type);
     }
 
-    /**
-     * Send a message to a given endpoint.
-     * @param message message to be sent.
-     * @param to endpoint to which the message needs to be sent
-     * @return an reference to an IAsyncResult which can be queried for the
-     * response
-     */
-    public String sendRR(Message message, Collection<InetAddress> to, 
IAsyncCallback cb)
+    private void addCallback(IMessageCallback cb, String messageId, 
InetAddress to)
     {
-        String messageId = message.getMessageId();
-        addCallback(cb, messageId);
-        for (InetAddress endpoint : to)
-        {
-            putTarget(messageId, endpoint);
-            sendOneWay(message, endpoint);
-        }
-        return messageId;
-    }
-
-    private void putTarget(String messageId, InetAddress endpoint)
-    {
-        Collection<InetAddress> addresses = targets.get(messageId);
-        if (addresses == null)
-        {
-            addresses = new NonBlockingHashSet<InetAddress>();
-            Collection<InetAddress> oldAddresses = 
targets.putIfAbsent(messageId, addresses);
-            if (oldAddresses != null)
-                addresses = oldAddresses;
-        }
-        addresses.add(endpoint);
-    }
-
-    private void removeTarget(String messageId, InetAddress from)
-    {
-        Collection<InetAddress> addresses = targets.get(messageId);
-        // null is expected if we removed the callback or we got a reply after 
its timeout expired
-        if (addresses != null)
-            addresses.remove(from);
-    }
-
-    public void addCallback(IAsyncCallback cb, String messageId)
-    {
-        callbacks.put(messageId, cb);
+        callbacks.put(messageId, new Pair<InetAddress, IMessageCallback>(to, 
cb));
     }
 
     /**
@@ -305,8 +274,7 @@ public final class MessagingService impl
     public String sendRR(Message message, InetAddress to, IAsyncCallback cb)
     {        
         String messageId = message.getMessageId();
-        addCallback(cb, messageId);
-        putTarget(messageId, to);
+        addCallback(cb, messageId, to);
         sendOneWay(message, to);
         return messageId;
     }
@@ -358,8 +326,7 @@ public final class MessagingService impl
     public IAsyncResult sendRR(Message message, InetAddress to)
     {
         IAsyncResult iar = new AsyncResult();
-        callbacks.put(message.getMessageId(), iar);
-        putTarget(message.getMessageId(), to);
+        addCallback(iar, message.getMessageId(), to);
         sendOneWay(message, to);
         return iar;
     }
@@ -420,14 +387,8 @@ public final class MessagingService impl
         stage.execute(runnable);
     }
 
-    public IMessageCallback getRegisteredCallback(String messageId)
+    public Pair<InetAddress, IMessageCallback> removeRegisteredCallback(String 
messageId)
     {
-        return callbacks.get(messageId);
-    }
-    
-    public IMessageCallback removeRegisteredCallback(String messageId)
-    {
-        targets.remove(messageId);
         return callbacks.remove(messageId);
     }
 
@@ -436,11 +397,6 @@ public final class MessagingService impl
         return callbacks.getAge(messageId);
     }
 
-    public void responseReceivedFrom(String messageId, InetAddress from)
-    {
-        removeTarget(messageId, from);
-    }
-
     public static void validateMagic(int magic) throws IOException
     {
         if (magic != PROTOCOL_MAGIC)

Modified: 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/ResponseVerbHandler.java?rev=1064340&r1=1064339&r2=1064340&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
 (original)
+++ 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
 Thu Jan 27 22:28:06 2011
@@ -25,27 +25,22 @@ import java.util.List;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.locator.ILatencyPublisher;
-import org.apache.cassandra.locator.ILatencySubscriber;
+import org.apache.cassandra.utils.Pair;
 
-public class ResponseVerbHandler implements IVerbHandler, ILatencyPublisher
+public class ResponseVerbHandler implements IVerbHandler
 {
     private static final Logger logger_ = LoggerFactory.getLogger( 
ResponseVerbHandler.class );
-    private List<ILatencySubscriber>  subscribers = new 
ArrayList<ILatencySubscriber>();
-
 
     public void doVerb(Message message)
     {     
         String messageId = message.getMessageId();
-        MessagingService.instance().responseReceivedFrom(messageId, 
message.getFrom());
         double age = System.currentTimeMillis() - 
MessagingService.instance().getRegisteredCallbackAge(messageId);
-        IMessageCallback cb = 
MessagingService.instance().getRegisteredCallback(messageId);
-        if (cb == null)
+        Pair<InetAddress, IMessageCallback> pair = 
MessagingService.instance().removeRegisteredCallback(messageId);
+        if (pair == null)
             return;
 
-        // if cb is not null, then age will be valid
-        for (ILatencySubscriber subscriber : subscribers)
-            subscriber.receiveTiming(message.getFrom(), age);
+        IMessageCallback cb = pair.right;
+        MessagingService.instance().maybeAddLatency(cb, message.getFrom(), 
age);
 
         if (cb instanceof IAsyncCallback)
         {
@@ -60,9 +55,4 @@ public class ResponseVerbHandler impleme
             ((IAsyncResult) cb).result(message);
         }
     }
-
-    public void register(ILatencySubscriber subscriber)
-    {
-        subscribers.add(subscriber);
-    }
 }

Modified: 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java?rev=1064340&r1=1064339&r2=1064340&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
 (original)
+++ 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
 Thu Jan 27 22:28:06 2011
@@ -70,13 +70,6 @@ public abstract class AbstractWriteRespo
         }
     }
 
-    public void addHintCallback(Message hintedMessage, InetAddress destination)
-    {
-        // (non-destination hints are part of the callback and count towards 
consistency only under CL.ANY)
-        if (writeEndpoints.contains(destination) || consistencyLevel == 
ConsistencyLevel.ANY)
-            MessagingService.instance().addCallback(this, 
hintedMessage.getMessageId());
-    }
-
     /** null message means "response from local write" */
     public abstract void response(Message msg);
 

Modified: 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/IWriteResponseHandler.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/IWriteResponseHandler.java?rev=1064340&r1=1064339&r2=1064340&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/IWriteResponseHandler.java
 (original)
+++ 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/IWriteResponseHandler.java
 Thu Jan 27 22:28:06 2011
@@ -20,16 +20,13 @@ package org.apache.cassandra.service;
  *
  */
 
-import java.net.InetAddress;
 import java.util.concurrent.TimeoutException;
 
 import org.apache.cassandra.net.IAsyncCallback;
-import org.apache.cassandra.net.Message;
 import org.apache.cassandra.thrift.UnavailableException;
 
 public interface IWriteResponseHandler extends IAsyncCallback
 {
     public void get() throws TimeoutException;
-    public void addHintCallback(Message hintedMessage, InetAddress 
destination);
     public void assureSufficientLiveNodes() throws UnavailableException;
 }

Modified: 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1064340&r1=1064339&r2=1064340&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
 (original)
+++ 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
 Thu Jan 27 22:28:06 2011
@@ -131,7 +131,6 @@ public class StorageProxy implements Sto
                 
                 // Multimap that holds onto all the messages and addresses 
meant for a specific datacenter
                 Map<String, Multimap<Message, InetAddress>> dcMessages = new 
HashMap<String, Multimap<Message, InetAddress>>(hintedEndpoints.size());
-                Message unhintedMessage = null;
 
                 for (Map.Entry<InetAddress, Collection<InetAddress>> entry : 
hintedEndpoints.asMap().entrySet())
                 {
@@ -150,15 +149,10 @@ public class StorageProxy implements Sto
                         else
                         {
                             // belongs on a different server
-                            if (unhintedMessage == null)
-                            {
-                                unhintedMessage = rm.makeRowMutationMessage();
-                                
MessagingService.instance().addCallback(responseHandler, 
unhintedMessage.getMessageId());
-                            }
+                            Message unhintedMessage = 
rm.makeRowMutationMessage();
                             if (logger.isDebugEnabled())
                                 logger.debug("insert writing key " + 
ByteBufferUtil.bytesToHex(rm.key()) + " to " + unhintedMessage.getMessageId() + 
"@" + destination);
                             
-                            
                             Multimap<Message, InetAddress> messages = 
dcMessages.get(dc);
                             if (messages == null)
                             {
@@ -182,7 +176,12 @@ public class StorageProxy implements Sto
                                     logger.debug("insert writing key " + 
ByteBufferUtil.bytesToHex(rm.key()) + " to " + hintedMessage.getMessageId() + 
"@" + destination + " for " + target);
                             }
                         }
-                        responseHandler.addHintCallback(hintedMessage, 
destination);
+                        // (non-destination hints are part of the callback and 
count towards consistency only under CL.ANY)
+                        // (non-destination hints are part of the callback and 
count towards consistency only under CL.ANY)
+                        if (writeEndpoints.contains(destination) || 
consistency_level == ConsistencyLevel.ANY)
+                            MessagingService.instance().sendRR(hintedMessage, 
destination, responseHandler);
+                        else
+                            
MessagingService.instance().sendOneWay(hintedMessage, destination);
 
                         Multimap<Message, InetAddress> messages = 
dcMessages.get(dc);
                         
@@ -196,7 +195,7 @@ public class StorageProxy implements Sto
                     }
                 }
 
-                sendMessages(localDataCenter, dcMessages);
+                sendMessages(localDataCenter, dcMessages, responseHandler);
             }
                         
             // wait for writes.  throws timeoutexception if necessary
@@ -219,7 +218,7 @@ public class StorageProxy implements Sto
     /**
      * for each datacenter, send a message to one node to relay the write to 
other replicas
      */
-    private static void sendMessages(String localDataCenter, Map<String, 
Multimap<Message, InetAddress>> dcMessages)
+    private static void sendMessages(String localDataCenter, Map<String, 
Multimap<Message, InetAddress>> dcMessages, IWriteResponseHandler handler)
     throws IOException
     {
         for (Map.Entry<String, Multimap<Message, InetAddress>> entry: 
dcMessages.entrySet())
@@ -230,15 +229,12 @@ public class StorageProxy implements Sto
             for (Map.Entry<Message, Collection<InetAddress>> messages: 
entry.getValue().asMap().entrySet())
             {
                 Message message = messages.getKey();
-                // a single message object is used for unhinted writes, so 
clean out any forwards
-                // from previous loop iterations
-                message.removeHeader(RowMutation.FORWARD_HEADER);
 
                 if (dataCenter.equals(localDataCenter))
                 {
                     // direct writes to local DC
                     for (InetAddress destination : messages.getValue())
-                        MessagingService.instance().sendOneWay(message, 
destination);
+                        MessagingService.instance().sendRR(message, 
destination, handler);
                 }
                 else
                 {
@@ -262,7 +258,7 @@ public class StorageProxy implements Sto
                         message.setHeader(RowMutation.FORWARD_HEADER, 
bos.toByteArray());
                     }
                     // send the combined message + forward headers
-                    MessagingService.instance().sendOneWay(message, target);
+                    MessagingService.instance().sendRR(message, target, 
handler);
                 }
             }
         }
@@ -373,7 +369,7 @@ public class StorageProxy implements Sto
             {
                 if (logger.isDebugEnabled())
                     logger.debug("reading data for " + command + " locally");
-                StageManager.getStage(Stage.READ).submit(new 
WeakReadLocalRunnable(command, handler));
+                StageManager.getStage(Stage.READ).submit(new 
LocalReadRunnable(command, handler));
             }
             else
             {
@@ -392,7 +388,7 @@ public class StorageProxy implements Sto
                 {
                     if (logger.isDebugEnabled())
                         logger.debug("reading digest for " + command + " 
locally");
-                    StageManager.getStage(Stage.READ).submit(new 
WeakReadLocalRunnable(digestCommand, handler));
+                    StageManager.getStage(Stage.READ).submit(new 
LocalReadRunnable(digestCommand, handler));
                 }
                 else
                 {
@@ -461,12 +457,13 @@ public class StorageProxy implements Sto
         return rows;
     }
 
-    static class WeakReadLocalRunnable extends WrappedRunnable
+    static class LocalReadRunnable extends WrappedRunnable
     {
         private final ReadCommand command;
         private final ReadCallback<Row> handler;
+        private final long start = System.currentTimeMillis();
 
-        WeakReadLocalRunnable(ReadCommand command, ReadCallback<Row> handler)
+        LocalReadRunnable(ReadCommand command, ReadCallback<Row> handler)
         {
             this.command = command;
             this.handler = handler;
@@ -475,10 +472,11 @@ public class StorageProxy implements Sto
         protected void runMayThrow() throws IOException
         {
             if (logger.isDebugEnabled())
-                logger.debug("weakreadlocal reading " + command);
+                logger.debug("LocalReadRunnable reading " + command);
 
             Table table = Table.open(command.table);
             ReadResponse result = ReadVerbHandler.getResponse(command, command.getRow(table));
+            MessagingService.instance().addLatency(FBUtilities.getLocalAddress(), System.currentTimeMillis() - start);
             handler.response(result);
         }
     }
@@ -497,8 +495,11 @@ public class StorageProxy implements Sto
     {
         ReadResponseResolver resolver = new ReadResponseResolver(command.table, command.key);
         RepairCallback<Row> handler = new RepairCallback<Row>(resolver, endpoints);
-        Message messageRepair = command.makeReadMessage();
-        MessagingService.instance().sendRR(messageRepair, endpoints, handler);
+        for (InetAddress endpoint : endpoints)
+        {
+            Message messageRepair = command.makeReadMessage();
+            MessagingService.instance().sendRR(messageRepair, endpoint, handler);
+        }
         return handler;
     }
 
@@ -601,21 +602,26 @@ public class StorageProxy implements Sto
         final String myVersion = DatabaseDescriptor.getDefsVersion().toString();
         final Map<InetAddress, UUID> versions = new ConcurrentHashMap<InetAddress, UUID>();
         final Set<InetAddress> liveHosts = Gossiper.instance.getLiveMembers();
-        final Message msg = new Message(FBUtilities.getLocalAddress(), StorageService.Verb.SCHEMA_CHECK, ArrayUtils.EMPTY_BYTE_ARRAY);
         final CountDownLatch latch = new CountDownLatch(liveHosts.size());
-        // an empty message acts as a request to the SchemaCheckVerbHandler.
-        MessagingService.instance().sendRR(msg, liveHosts, new IAsyncCallback()
+
+        IAsyncCallback cb = new IAsyncCallback()
         {
-            public void response(Message msg)
+            public void response(Message message)
             {
                 // record the response from the remote node.
-                logger.debug("Received schema check response from " + msg.getFrom().getHostAddress());
-                UUID theirVersion = UUID.fromString(new String(msg.getMessageBody()));
-                versions.put(msg.getFrom(), theirVersion);
+                logger.debug("Received schema check response from " + message.getFrom().getHostAddress());
+                UUID theirVersion = UUID.fromString(new String(message.getMessageBody()));
+                versions.put(message.getFrom(), theirVersion);
                 latch.countDown();
             }
-        });
-        
+        };
+        // an empty message acts as a request to the SchemaCheckVerbHandler.
+        for (InetAddress endpoint : liveHosts)
+        {
+            Message message = new Message(FBUtilities.getLocalAddress(), StorageService.Verb.SCHEMA_CHECK, ArrayUtils.EMPTY_BYTE_ARRAY);
+            MessagingService.instance().sendRR(message, endpoint, cb);
+        }
+
         try
         {
             // wait for as long as possible. timeout-1s if possible.
@@ -895,8 +901,11 @@ public class StorageProxy implements Sto
         // Send out the truncate calls and track the responses with the callbacks.
         logger.debug("Starting to send truncate messages to hosts {}", allEndpoints);
         Truncation truncation = new Truncation(keyspace, cfname);
-        Message message = truncation.makeTruncationMessage();
-        MessagingService.instance().sendRR(message, allEndpoints, responseHandler);
+        for (InetAddress endpoint : allEndpoints)
+        {
+            Message message = truncation.makeTruncationMessage();
+            MessagingService.instance().sendRR(message, endpoint, responseHandler);
+        }
 
         // Wait for all
         logger.debug("Sent all truncate messages, now waiting for {} responses", blockFor);

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/ExpiringMap.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/ExpiringMap.java?rev=1064340&r1=1064339&r2=1064340&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/ExpiringMap.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/ExpiringMap.java Thu Jan 27 22:28:06 2011
@@ -18,10 +18,7 @@
 
 package org.apache.cassandra.utils;
 
-import java.util.Enumeration;
-import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
+import java.util.*;
 import java.util.concurrent.Callable;
 
 import com.google.common.base.Function;
@@ -33,7 +30,7 @@ import org.cliffc.high_scale_lib.NonBloc
 public class ExpiringMap<K, V>
 {
     private static final Logger logger = LoggerFactory.getLogger(ExpiringMap.class);
-    private final Function<K, ?> postExpireHook;
+    private final Function<Pair<K,V>, ?> postExpireHook;
 
     private static class CacheableObject<T>
     {
@@ -69,18 +66,12 @@ public class ExpiringMap<K, V>
         @Override
         public void run()
         {
-            synchronized (cache)
+            for (Map.Entry<K, CacheableObject> entry : cache.entrySet())
             {
-                Enumeration<K> e = cache.keys();
-                while (e.hasMoreElements())
+                if (entry.getValue().isReadyToDie(expiration))
                 {
-                    K key = e.nextElement();
-                    CacheableObject co = cache.get(key);
-                    if (co != null && co.isReadyToDie(expiration))
-                    {
-                        cache.remove(key);
-                        postExpireHook.apply(key);
-                    }
+                    cache.remove(entry.getKey());
+                    postExpireHook.apply(new Pair(entry.getKey(), entry.getValue().getValue()));
                 }
             }
         }
@@ -99,7 +90,7 @@ public class ExpiringMap<K, V>
      *
      * @param expiration the TTL for objects in the cache in milliseconds
      */
-    public ExpiringMap(long expiration, Function<K, ?> postExpireHook)
+    public ExpiringMap(long expiration, Function<Pair<K,V>, ?> postExpireHook)
     {
         this.postExpireHook = postExpireHook;
         if (expiration <= 0)


Reply via email to