Author: brandonwilliams
Date: Wed Oct 20 19:17:06 2010
New Revision: 1025680

URL: http://svn.apache.org/viewvc?rev=1025680&view=rev
Log:
Add framing to hint destinations.  Patch by brandonwilliams, reviewed by 
jbellis for CASSANDRA-1617

Modified:
    cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java?rev=1025680&r1=1025679&r2=1025680&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java Wed Oct 20 19:17:06 2010
@@ -55,13 +55,12 @@ public class RowMutationVerbHandler impl
             if (hintedBytes != null)
             {
                 assert hintedBytes.length > 0;
-                ByteBuffer bb = ByteBuffer.wrap(hintedBytes);
-                byte[] addressBytes = new byte[FBUtilities.getLocalAddress().getHostAddress().getBytes(UTF_8).length];
-                while (bb.remaining() > 0)
+                DataInputStream dis = new DataInputStream(new ByteArrayInputStream(hintedBytes));
+                while (dis.available() > 0)
                 {
-                    bb.get(addressBytes);
+                    byte[] addressBytes = FBUtilities.readShortByteArray(dis);
                     if (logger_.isDebugEnabled())
-                        logger_.debug("Adding hint for " + InetAddress.getByAddress(addressBytes));
+                        logger_.debug("Adding hint for " + InetAddress.getByName(new String(addressBytes)));
                     RowMutation hintedMutation = new RowMutation(Table.SYSTEM_TABLE, addressBytes);
                     hintedMutation.addHints(rm);
                     hintedMutation.apply();

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1025680&r1=1025679&r2=1025680&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Wed Oct 20 19:17:06 2010
@@ -18,9 +18,7 @@
 
 package org.apache.cassandra.service;
 
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
+import java.io.*;
 import java.lang.management.ManagementFactory;
 import java.net.InetAddress;
 import java.util.*;
@@ -180,12 +178,17 @@ public class StorageProxy implements Sto
 
     }
 
-    private static void addHintHeader(Message message, InetAddress target)
+    private static void addHintHeader(Message message, InetAddress target) throws IOException
     {
-        byte[] oldHint = message.getHeader(RowMutation.HINT);
-        byte[] address = target.getHostAddress().getBytes(UTF_8);
-        byte[] hint = oldHint == null ? address : ArrayUtils.addAll(oldHint, address);
-        message.setHeader(RowMutation.HINT, hint);
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        DataOutputStream dos = new DataOutputStream(bos);
+        byte[] previousHints = message.getHeader(RowMutation.HINT);
+        if (previousHints != null)
+        {
+            dos.write(previousHints);
+        }
+        FBUtilities.writeShortByteArray(target.getHostAddress().getBytes(UTF_8), dos);
+        message.setHeader(RowMutation.HINT, bos.toByteArray());
     }
 
     private static void insertLocalMessage(final RowMutation rm, final IWriteResponseHandler responseHandler)


Reply via email to