Author: jbellis
Date: Thu Dec  2 17:39:36 2010
New Revision: 1041486

URL: http://svn.apache.org/viewvc?rev=1041486&view=rev
Log:
r/m out-of-date contrib/maven for CASSANDRA-1805

Removed:
    cassandra/branches/cassandra-0.7/contrib/maven/
Modified:
    
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowMutation.java
    
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
    
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java

Modified: 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowMutation.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowMutation.java?rev=1041486&r1=1041485&r2=1041486&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowMutation.java
 (original)
+++ 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowMutation.java
 Thu Dec  2 17:39:36 2010
@@ -47,6 +47,7 @@ public class RowMutation
 {
     private static ICompactSerializer<RowMutation> serializer_;
     public static final String HINT = "HINT";
+    public static final String FORWARD_HEADER = "FORWARD";
 
     static
     {

Modified: 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java?rev=1041486&r1=1041485&r2=1041486&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
 (original)
+++ 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
 Thu Dec  2 17:39:36 2010
@@ -18,25 +18,23 @@
 
 package org.apache.cassandra.db;
 
-import java.io.*;
-
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
 import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 
 import com.google.common.base.Charsets;
-
-import org.apache.cassandra.net.IVerbHandler;
-import org.apache.cassandra.net.Message;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.net.*;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 
-import static com.google.common.base.Charsets.UTF_8;
-
 
 public class RowMutationVerbHandler implements IVerbHandler
 {
@@ -69,6 +67,11 @@ public class RowMutationVerbHandler impl
                     hintedMutation.apply();
                 }
             }
+        
+            // Check if there were any forwarding headers in this message
+            byte[] forwardBytes = 
message.getHeader(RowMutation.FORWARD_HEADER);
+            if (forwardBytes != null)
+                forwardToLocalNodes(message, forwardBytes);
 
             Table.open(rm.getTable()).apply(rm, bytes, true);
 
@@ -82,5 +85,34 @@ public class RowMutationVerbHandler impl
         {
             logger_.error("Error in row mutation", e);
         }
+    }  
+    
+    /**
+     * Relays message to every address packed into forwardBytes (raw addresses,
+     * each as long as this node's own). getByAddress may throw the exception.
+     */
+    private void forwardToLocalNodes(Message message, byte[] forwardBytes) throws UnknownHostException
+    {
+        // remove fwds from message to avoid infinite loop
+        message.setHeader(RowMutation.FORWARD_HEADER, null);
+
+        int bytesPerInetAddress = FBUtilities.getLocalAddress().getAddress().length;
+        assert forwardBytes.length >= bytesPerInetAddress;
+        assert forwardBytes.length % bytesPerInetAddress == 0;
+
+        byte[] addressBytes = new byte[bytesPerInetAddress];
+        // Send a message to each of the addresses on our Forward List
+        for (int offset = 0; offset < forwardBytes.length; offset += bytesPerInetAddress)
+        {
+            System.arraycopy(forwardBytes, offset, addressBytes, 0, bytesPerInetAddress);
+            InetAddress address = InetAddress.getByAddress(addressBytes);
+
+            if (logger_.isDebugEnabled())
+                logger_.debug("Forwarding message to " + address);
+
+            // Send the original message to the address read from FORWARD_HEADER,
+            // not to message.getFrom(). Let the response go back to the coordinator
+            MessagingService.instance.sendOneWay(message, address);
+        }
     }
 }

Modified: 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1041486&r1=1041485&r2=1041486&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
 (original)
+++ 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
 Thu Dec  2 17:39:36 2010
@@ -27,20 +27,23 @@ import java.util.concurrent.*;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
+import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
-import static com.google.common.base.Charsets.UTF_8;
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.dht.*;
+import org.apache.cassandra.db.filter.QueryFilter;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.Bounds;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.locator.TokenMetadata;
@@ -53,7 +56,8 @@ import org.apache.cassandra.utils.FBUtil
 import org.apache.cassandra.utils.LatencyTracker;
 import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.WrappedRunnable;
-import org.apache.cassandra.db.filter.QueryFilter;
+
+import static com.google.common.base.Charsets.UTF_8;
 
 public class StorageProxy implements StorageProxyMBean
 {
@@ -90,13 +94,14 @@ public class StorageProxy implements Sto
      * @param mutations the mutations to be applied across the replicas
      * @param consistency_level the consistency level for the operation
     */
-    public static void mutate(List<RowMutation> mutations, ConsistencyLevel 
consistency_level) throws UnavailableException, TimeoutException
+    public static void mutate(List<RowMutation> mutations, ConsistencyLevel 
consistencyLevel) throws UnavailableException, TimeoutException
     {
         long startTime = System.nanoTime();
-        ArrayList<IWriteResponseHandler> responseHandlers = new 
ArrayList<IWriteResponseHandler>();
+        List<IWriteResponseHandler> responseHandlers = new 
ArrayList<IWriteResponseHandler>();
 
         RowMutation mostRecentRowMutation = null;
         StorageService ss = StorageService.instance;
+        String localDataCenter = getDataCenter(FBUtilities.getLocalAddress());
         
         try
         {
@@ -110,58 +115,67 @@ public class StorageProxy implements Sto
                 Collection<InetAddress> writeEndpoints = 
ss.getTokenMetadata().getWriteEndpoints(StorageService.getPartitioner().getToken(rm.key()),
 table, naturalEndpoints);
                 Multimap<InetAddress, InetAddress> hintedEndpoints = 
rs.getHintedEndpoints(writeEndpoints);
                 
-                // send out the writes, as in mutate() above, but this time 
with a callback that tracks responses
-                final IWriteResponseHandler responseHandler = 
rs.getWriteResponseHandler(writeEndpoints, hintedEndpoints, consistency_level);
+                final IWriteResponseHandler responseHandler = 
rs.getWriteResponseHandler(writeEndpoints, hintedEndpoints, consistencyLevel);
+                
+                // exit early if we can't fulfill the CL at this time
                 responseHandler.assureSufficientLiveNodes();
-
+                
                 responseHandlers.add(responseHandler);
-                Message unhintedMessage = null;
-                for (Map.Entry<InetAddress, Collection<InetAddress>> entry : 
hintedEndpoints.asMap().entrySet())
-                {
-                    InetAddress destination = entry.getKey();
-                    Collection<InetAddress> targets = entry.getValue();
+                
+                // Creates a Multimap that holds onto all the messages and 
addresses meant for a specific datacenter.
+                Multimap<String, Pair<Message, InetAddress>> dcMap = 
groupEndpointsByDataCenter(rm, hintedEndpoints, responseHandler);
+                               
+                               // Traverse all dataCenters where messages will 
be sent to.
+                for (Map.Entry<String, Collection<Pair<Message, InetAddress>>> 
entry : dcMap.asMap().entrySet())
+                {
+                    String dataCenter = entry.getKey();
+                    
+                    // Grab a set of all the messages bound for this 
dataCenter and create an iterator over this set. 
+                    Collection<Pair<Message, InetAddress>> 
messagesForDataCenter = entry.getValue();
+                    Iterator<Pair<Message, InetAddress>> iter = 
messagesForDataCenter.iterator();
+                    assert iter.hasNext();
+
+                    // First endpoint in list is the destination for this group
+                    Pair<Message, InetAddress> messageAndDestination = 
iter.next();
 
-                    if (targets.size() == 1 && 
targets.iterator().next().equals(destination))
+                    Message primaryMessage = messageAndDestination.left;
+                    InetAddress target = messageAndDestination.right;
+
+                    // Add all the other destinations that are bound for the 
same dataCenter as a header in the primary message.
+                    while (iter.hasNext())
                     {
-                        // unhinted writes
-                        if (destination.equals(FBUtilities.getLocalAddress()))
+                        messageAndDestination = iter.next();
+                        assert messageAndDestination.left == primaryMessage;
+                       
+                        if (dataCenter.equals(localDataCenter))
                         {
-                            insertLocalMessage(rm, responseHandler);
+                            // direct write to local DC
+                            assert 
primaryMessage.getHeader(RowMutation.FORWARD_HEADER) == null;
+                            
MessagingService.instance.sendOneWay(primaryMessage, target);
                         }
                         else
                         {
-                            // belongs on a different server.  send it there.
-                            if (unhintedMessage == null)
-                            {
-                                unhintedMessage = rm.makeRowMutationMessage();
-                                
MessagingService.instance.addCallback(responseHandler, 
unhintedMessage.getMessageId());
-                            }
-                            if (logger.isDebugEnabled())
-                                logger.debug("insert writing key " + 
FBUtilities.bytesToHex(rm.key()) + " to " + unhintedMessage.getMessageId() + 
"@" + destination);
-                            
MessagingService.instance.sendOneWay(unhintedMessage, destination);
-                        }
-                    }
-                    else
-                    {
-                        // hinted
-                        Message hintedMessage = rm.makeRowMutationMessage();
-                        for (InetAddress target : targets)
-                        {
-                            if (!target.equals(destination))
-                            {
-                                addHintHeader(hintedMessage, target);
-                                if (logger.isDebugEnabled())
-                                    logger.debug("insert writing key " + 
FBUtilities.bytesToHex(rm.key()) + " to " + hintedMessage.getMessageId() + "@" 
+ destination + " for " + target);
-                            }
+                            // group all nodes in this DC as forward headers 
on the primary message
+                            ByteArrayOutputStream bos = new 
ByteArrayOutputStream();
+                            DataOutputStream dos = new DataOutputStream(bos);
+
+                            // append to older addresses
+                            byte[] previousHints = 
primaryMessage.getHeader(RowMutation.FORWARD_HEADER);
+                            if (previousHints != null)
+                                dos.write(previousHints);
+
+                            
dos.write(messageAndDestination.right.getAddress());
+                            
primaryMessage.setHeader(RowMutation.FORWARD_HEADER, bos.toByteArray());
                         }
-                        responseHandler.addHintCallback(hintedMessage, 
destination);
-                        MessagingService.instance.sendOneWay(hintedMessage, 
destination);
-                    }
+                    }                                
+                    
+                    MessagingService.instance.sendOneWay(primaryMessage, 
target);
                 }
             }
+                        
             // wait for writes.  throws timeoutexception if necessary
             for (IWriteResponseHandler responseHandler : responseHandlers)
-            {
+            {                  
                 responseHandler.get();
             }
         }
@@ -178,6 +192,66 @@ public class StorageProxy implements Sto
         }
 
     }
+    
+    /**
+     * Groups one mutation's (message, destination) pairs by the destination's
+     * datacenter. A local unhinted write goes to insertLocalMessage and is not
+     * returned. Remote unhinted destinations share one Message reference; each
+     * hinted destination gets its own makeRowMutationMessage() result.
+     */
+    private static Multimap<String, Pair<Message, InetAddress>> 
groupEndpointsByDataCenter(RowMutation rm, Multimap<InetAddress, InetAddress> 
endpoints,  final IWriteResponseHandler responseHandler) throws IOException
+    {
+        Set<Map.Entry<InetAddress, Collection<InetAddress>>> endpointSet = 
endpoints.asMap().entrySet();
+        Multimap<String, Pair<Message, InetAddress>> dcMap = 
HashMultimap.create(endpointSet.size(), 10);
+        Message unhintedMessage = null;
+        
+        for (Map.Entry<InetAddress, Collection<InetAddress>> entry : 
endpointSet)
+        {
+            InetAddress destination = entry.getKey();
+            Collection<InetAddress> targets = entry.getValue();                
   
+
+            String dataCenter = getDataCenter(destination);
+            if (targets.size() == 1 && 
targets.iterator().next().equals(destination))
+            {
+                // unhinted write: the destination is its own only target
+                if (destination.equals(FBUtilities.getLocalAddress()))
+                {                          
+                    insertLocalMessage(rm, responseHandler);
+                }
+                else
+                {
+                    // belongs on a different server; message is built once and reused
+                    if (unhintedMessage == null)
+                    {
+                        unhintedMessage = rm.makeRowMutationMessage(); 
+                        MessagingService.instance.addCallback(responseHandler, 
unhintedMessage.getMessageId());
+                    }
+                    if (logger.isDebugEnabled())
+                        logger.debug("insert writing key " + 
FBUtilities.bytesToHex(rm.key()) + " to " + unhintedMessage.getMessageId() + 
"@" + destination);
+                    dcMap.put(dataCenter, new 
Pair<Message,InetAddress>(unhintedMessage, destination));
+                }
+            }
+            else
+            {
+                // hinted: add a hint header for every target other than destination
+                Message hintedMessage = rm.makeRowMutationMessage();
+                for (InetAddress target : targets)
+                {
+                    if (!target.equals(destination))
+                    {
+                        addHintHeader(hintedMessage, target);
+                        if (logger.isDebugEnabled())
+                            logger.debug("insert writing key " + 
FBUtilities.bytesToHex(rm.key()) + " to " + hintedMessage.getMessageId() + "@" 
+ destination + " for " + target);
+                    }
+                }
+                responseHandler.addHintCallback(hintedMessage, destination);
+                dcMap.put(dataCenter, new 
Pair<Message,InetAddress>(hintedMessage, destination));
+            }
+        }
+        
+        return dcMap;
+    }
+
 
     private static void addHintHeader(Message message, InetAddress target) 
throws IOException
     {
@@ -192,6 +266,22 @@ public class StorageProxy implements Sto
         message.setHeader(RowMutation.HINT, bos.toByteArray());
     }
 
+    /**
+     * Datacenter of addr per the endpoint snitch, or "default" if unsupported.
+     */
+    private static String getDataCenter(InetAddress addr)
+    {
+        try
+        {
+            return DatabaseDescriptor.getEndpointSnitch().getDatacenter(addr);
+        }
+        catch (UnsupportedOperationException e)
+        {
+            // SimpleSnitch throws this
+            return "default";
+        }
+    }
+    
     private static void insertLocalMessage(final RowMutation rm, final 
IWriteResponseHandler responseHandler)
     {
         if (logger.isDebugEnabled())


Reply via email to