Author: brandonwilliams
Date: Wed Oct 20 19:17:06 2010
New Revision: 1025680
URL: http://svn.apache.org/viewvc?rev=1025680&view=rev Log: Add framing to hint destinations. Patch by brandonwilliams, reviewed by jbellis for CASSANDRA-1617 Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java?rev=1025680&r1=1025679&r2=1025680&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java Wed Oct 20 19:17:06 2010 @@ -55,13 +55,12 @@ public class RowMutationVerbHandler impl if (hintedBytes != null) { assert hintedBytes.length > 0; - ByteBuffer bb = ByteBuffer.wrap(hintedBytes); - byte[] addressBytes = new byte[FBUtilities.getLocalAddress().getHostAddress().getBytes(UTF_8).length]; - while (bb.remaining() > 0) + DataInputStream dis = new DataInputStream(new ByteArrayInputStream(hintedBytes)); + while (dis.available() > 0) { - bb.get(addressBytes); + byte[] addressBytes = FBUtilities.readShortByteArray(dis); if (logger_.isDebugEnabled()) - logger_.debug("Adding hint for " + InetAddress.getByAddress(addressBytes)); + logger_.debug("Adding hint for " + InetAddress.getByName(new String(addressBytes))); RowMutation hintedMutation = new RowMutation(Table.SYSTEM_TABLE, addressBytes); hintedMutation.addHints(rm); hintedMutation.apply(); Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1025680&r1=1025679&r2=1025680&view=diff ============================================================================== --- 
cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Wed Oct 20 19:17:06 2010 @@ -18,9 +18,7 @@ package org.apache.cassandra.service; -import java.io.ByteArrayInputStream; -import java.io.DataInputStream; -import java.io.IOException; +import java.io.*; import java.lang.management.ManagementFactory; import java.net.InetAddress; import java.util.*; @@ -180,12 +178,17 @@ public class StorageProxy implements Sto } - private static void addHintHeader(Message message, InetAddress target) + private static void addHintHeader(Message message, InetAddress target) throws IOException { - byte[] oldHint = message.getHeader(RowMutation.HINT); - byte[] address = target.getHostAddress().getBytes(UTF_8); - byte[] hint = oldHint == null ? address : ArrayUtils.addAll(oldHint, address); - message.setHeader(RowMutation.HINT, hint); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(bos); + byte[] previousHints = message.getHeader(RowMutation.HINT); + if (previousHints != null) + { + dos.write(previousHints); + } + FBUtilities.writeShortByteArray(target.getHostAddress().getBytes(UTF_8), dos); + message.setHeader(RowMutation.HINT, bos.toByteArray()); } private static void insertLocalMessage(final RowMutation rm, final IWriteResponseHandler responseHandler)
