Author: jbellis
Date: Tue Jan 25 20:12:15 2011
New Revision: 1063435
URL: http://svn.apache.org/viewvc?rev=1063435&view=rev
Log:
clean out forward headers from message between loops
patch by ivancso and jbellis; reviewed by tjake for CASSANDRA-2051
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1063435&r1=1063434&r2=1063435&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
Tue Jan 25 20:12:15 2011
@@ -226,31 +226,29 @@ public class StorageProxy implements Sto
{
String dataCenter = entry.getKey();
- // Grab a set of all the messages bound for this dataCenter and
create an iterator over this set.
- Map<Message, Collection<InetAddress>> messagesForDataCenter =
entry.getValue().asMap();
-
- for (Map.Entry<Message, Collection<InetAddress>> messages:
messagesForDataCenter.entrySet())
+ // send the messages corresponding to this datacenter
+ for (Map.Entry<Message, Collection<InetAddress>> messages:
entry.getValue().asMap().entrySet())
{
Message message = messages.getKey();
- Iterator<InetAddress> iter = messages.getValue().iterator();
- assert iter.hasNext();
-
- // First endpoint in list is the destination for this group
- InetAddress target = iter.next();
+ // a single message object is used for unhinted writes, so
clean out any forwards
+ // from previous loop iterations
+ message.removeHeader(RowMutation.FORWARD_HEADER);
- // Add all the other destinations that are bound for the same
dataCenter as a header in the primary message.
- while (iter.hasNext())
+ if (dataCenter.equals(localDataCenter))
{
- InetAddress destination = iter.next();
-
- if (dataCenter.equals(localDataCenter))
- {
- // direct write to local DC
- assert message.getHeader(RowMutation.FORWARD_HEADER)
== null;
+ // direct writes to local DC
+ for (InetAddress destination : messages.getValue())
MessagingService.instance().sendOneWay(message,
destination);
- }
- else
+ }
+ else
+ {
+ // Non-local DC. First endpoint in list is the destination
for this group
+ Iterator<InetAddress> iter =
messages.getValue().iterator();
+ InetAddress target = iter.next();
+ // Add all the other destinations of the same message as a
header in the primary message.
+ while (iter.hasNext())
{
+ InetAddress destination = iter.next();
// group all nodes in this DC as forward headers on
the primary message
ByteArrayOutputStream bos = new
ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
@@ -263,9 +261,9 @@ public class StorageProxy implements Sto
dos.write(destination.getAddress());
message.setHeader(RowMutation.FORWARD_HEADER,
bos.toByteArray());
}
+ // send the combined message + forward headers
+ MessagingService.instance().sendOneWay(message, target);
}
-
- MessagingService.instance().sendOneWay(message, target);
}
}
}