Author: jbellis
Date: Tue Mar 2 21:22:21 2010
New Revision: 918186
URL: http://svn.apache.org/viewvc?rev=918186&view=rev
Log:
clean up hints/headers, add ability to hint multiple targets per message.
patch by jbellis; reviewed by Ryan King for CASSANDRA-822
Modified:
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/Header.java
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/Message.java
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamOut.java
Modified:
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java?rev=918186&r1=918185&r2=918186&view=diff
==============================================================================
---
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
(original)
+++
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
Tue Mar 2 21:22:21 2010
@@ -21,12 +21,15 @@
import java.io.*;
import java.net.InetAddress;
+import java.nio.ByteBuffer;
+
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
import org.apache.log4j.Logger;
import org.apache.cassandra.net.*;
+import org.apache.cassandra.utils.FBUtilities;
public class RowMutationVerbHandler implements IVerbHandler
{
@@ -45,15 +48,21 @@
/* Check if there were any hints in this message */
byte[] hintedBytes = message.getHeader(RowMutation.HINT);
- if ( hintedBytes != null && hintedBytes.length > 0 )
+ if (hintedBytes != null)
{
- InetAddress hint = InetAddress.getByAddress(hintedBytes);
- if (logger_.isDebugEnabled())
- logger_.debug("Adding hint for " + hint);
- /* add necessary hints to this mutation */
- RowMutation hintedMutation = new
RowMutation(Table.SYSTEM_TABLE, rm.getTable());
- hintedMutation.addHints(rm.key(), hintedBytes);
- hintedMutation.apply();
+ assert hintedBytes.length > 0;
+ ByteBuffer bb = ByteBuffer.wrap(hintedBytes);
+ byte[] addressBytes = new
byte[FBUtilities.getLocalAddress().getAddress().length];
+ while (bb.remaining() > 0)
+ {
+ bb.get(addressBytes);
+ InetAddress hint = InetAddress.getByAddress(addressBytes);
+ if (logger_.isDebugEnabled())
+ logger_.debug("Adding hint for " + hint);
+ RowMutation hintedMutation = new
RowMutation(Table.SYSTEM_TABLE, rm.getTable());
+ hintedMutation.addHints(rm.key(), addressBytes);
+ hintedMutation.apply();
+ }
}
Table.open(rm.getTable()).apply(rm, bytes, true);
Modified:
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/Header.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/Header.java?rev=918186&r1=918185&r2=918186&view=diff
==============================================================================
---
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/Header.java
(original)
+++
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/Header.java
Tue Mar 2 21:22:21 2010
@@ -99,35 +99,15 @@
messageId_ = id;
}
- void setMessageType(String type)
- {
- type_ = type;
- }
-
- void setMessageVerb(StorageService.Verb verb)
- {
- verb_ = verb;
- }
-
byte[] getDetail(Object key)
{
return details_.get(key);
}
-
- void removeDetail(Object key)
- {
- details_.remove(key);
- }
-
- void addDetail(String key, byte[] value)
+
+ void setDetail(String key, byte[] value)
{
details_.put(key, value);
}
-
- Map<String, byte[]> getDetails()
- {
- return details_;
- }
}
class HeaderSerializer implements ICompactSerializer<Header>
Modified:
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/Message.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/Message.java?rev=918186&r1=918185&r2=918186&view=diff
==============================================================================
---
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/Message.java
(original)
+++
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/Message.java
Tue Mar 2 21:22:21 2010
@@ -21,7 +21,6 @@
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
-import java.util.Map;
import java.net.InetAddress;
import org.apache.cassandra.concurrent.StageManager;
@@ -64,14 +63,9 @@
return header_.getDetail(key);
}
- public void addHeader(String key, byte[] value)
+ public void setHeader(String key, byte[] value)
{
- header_.addDetail(key, value);
- }
-
- public Map<String, byte[]> getHeaders()
- {
- return header_.getDetails();
+ header_.setDetail(key, value);
}
public byte[] getMessageBody()
Modified:
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java?rev=918186&r1=918185&r2=918186&view=diff
==============================================================================
---
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java
(original)
+++
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java
Tue Mar 2 21:22:21 2010
@@ -27,11 +27,10 @@
import java.util.concurrent.Future;
import java.lang.management.ManagementFactory;
+import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.StringUtils;
import com.google.common.collect.AbstractIterator;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Iterators;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.*;
import java.net.InetAddress;
@@ -145,7 +144,7 @@
else
{
Message hintedMessage =
rm.makeRowMutationMessage();
- hintedMessage.addHeader(RowMutation.HINT,
target.getAddress());
+ addHintHeader(hintedMessage, target);
if (logger.isDebugEnabled())
logger.debug("insert writing key " + rm.key()
+ " to " + hintedMessage.getMessageId() + "@" + hintedTarget + " for " +
target);
MessagingService.instance.sendOneWay(hintedMessage, hintedTarget);
@@ -163,7 +162,14 @@
writeStats.addNano(System.nanoTime() - startTime);
}
}
-
+
+ private static void addHintHeader(Message message, InetAddress target)
+ {
+ byte[] oldHint = message.getHeader(RowMutation.HINT);
+ byte[] hint = oldHint == null ? target.getAddress() :
ArrayUtils.addAll(oldHint, target.getAddress());
+ message.setHeader(RowMutation.HINT, hint);
+ }
+
public static void mutateBlocking(List<RowMutation> mutations,
ConsistencyLevel consistency_level) throws UnavailableException,
TimeoutException
{
long startTime = System.nanoTime();
@@ -214,7 +220,7 @@
else
{
Message hintedMessage = rm.makeRowMutationMessage();
- hintedMessage.addHeader(RowMutation.HINT,
naturalTarget.getAddress());
+ addHintHeader(hintedMessage, naturalTarget);
// (hints are part of the callback and count towards
consistency only under CL.ANY
if (consistency_level == ConsistencyLevel.ANY)
MessagingService.instance.addCallback(responseHandler,
hintedMessage.getMessageId());
@@ -343,7 +349,7 @@
if (logger.isDebugEnabled())
logger.debug("weakreadremote reading " + command + " from " +
message.getMessageId() + "@" + endPoint);
- message.addHeader(ReadCommand.DO_REPAIR,
ReadCommand.DO_REPAIR.getBytes());
+ message.setHeader(ReadCommand.DO_REPAIR,
ReadCommand.DO_REPAIR.getBytes());
iars.add(MessagingService.instance.sendRR(message, endPoint));
}
Modified:
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamOut.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamOut.java?rev=918186&r1=918185&r2=918186&view=diff
==============================================================================
---
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamOut.java
(original)
+++
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamOut.java
Tue Mar 2 21:22:21 2010
@@ -130,7 +130,7 @@
StreamOutManager.get(target).addFilesToStream(pendingFiles);
StreamInitiateMessage biMessage = new
StreamInitiateMessage(pendingFiles);
Message message =
StreamInitiateMessage.makeStreamInitiateMessage(biMessage);
- message.addHeader(StreamOut.TABLE_NAME, table.getBytes());
+ message.setHeader(StreamOut.TABLE_NAME, table.getBytes());
if (logger.isDebugEnabled())
logger.debug("Sending a stream initiate message to " + target + "
...");
MessagingService.instance.sendOneWay(message, target);